diff --git a/lib/urireader.cpp b/lib/urireader.cpp index ddf60011..92f288fe 100644 --- a/lib/urireader.cpp +++ b/lib/urireader.cpp @@ -288,19 +288,23 @@ namespace HTTP{ } void URIReader::readAll(size_t (*dataCallback)(const char *data, size_t len)){ - while (!isEOF()){readSome(dataCallback, 419430);} + while (!isEOF()){ + if (!readSome(dataCallback, 419430)){Util::sleep(50);} + } } /// Read all function, with use of callbacks void URIReader::readAll(Util::DataCallback &cb){ - while (!isEOF()){readSome(1048576, cb);} + while (!isEOF()){ + if (!readSome(1048576, cb)){Util::sleep(50);} + } } /// Read all blocking function, which internally uses the Nonblocking function. void URIReader::readAll(char *&dataPtr, size_t &dataLen){ if (getSize() != std::string::npos){allData.allocate(getSize());} while (!isEOF()){ - readSome(10046, *this); + if (!readSome(10046, *this)){Util::sleep(50);} bufPos = allData.size(); } dataPtr = allData; @@ -309,27 +313,29 @@ namespace HTTP{ void httpBodyCallback(const char *ptr, size_t size){INFO_MSG("callback");} - void URIReader::readSome(size_t (*dataCallback)(const char *data, size_t len), size_t wantedLen){ + size_t URIReader::readSome(size_t (*dataCallback)(const char *data, size_t len), size_t wantedLen){ /// TODO: Implement + return 0; } // readsome with callback - void URIReader::readSome(size_t wantedLen, Util::DataCallback &cb){ - if (isEOF()){return;} + size_t URIReader::readSome(size_t wantedLen, Util::DataCallback &cb){ + if (isEOF()){return 0;} // Files read from the memory-mapped file if (stateType == HTTP::File){ // Simple bounds check, don't read beyond the end of the file uint64_t dataLen = ((wantedLen + curPos) > totalSize) ? totalSize - curPos : wantedLen; cb.dataCallback(mapped + curPos, dataLen); curPos += dataLen; - return; + return dataLen; } // HTTP-based read from the Downloader if (stateType == HTTP::HTTP){ // Note: this function returns true if the full read was completed only. // It's the reason this function returns void rather than bool. + size_t prev = cb.getDataCallbackPos(); downer.continueNonBlocking(cb); - return; + return cb.getDataCallbackPos() - prev; } // Everything else uses the socket directly int s = downer.getSocket().Received().bytes(wantedLen); @@ -339,7 +345,7 @@ namespace HTTP{ s = downer.getSocket().Received().bytes(wantedLen); }else{ Util::sleep(50); - return; + return s; } } // Future optimization: augment the Socket::Buffer to handle a Util::DataCallback as argument. @@ -347,10 +353,11 @@ namespace HTTP{ Util::ResizeablePointer buf; downer.getSocket().Received().remove(buf, s); cb.dataCallback(buf, s); + return s; } /// Readsome blocking function. - void URIReader::readSome(char *&dataPtr, size_t &dataLen, size_t wantedLen){ + size_t URIReader::readSome(char *&dataPtr, size_t &dataLen, size_t wantedLen){ // Clear the buffer if we're finished with it if (allData.size() && bufPos == allData.size()){ allData.truncate(0); @@ -365,12 +372,13 @@ namespace HTTP{ dataPtr = allData + bufPos; dataLen = wantedLen; bufPos += wantedLen; - return; + return wantedLen; } // Ok, we have a short count. Return the amount we actually got. dataPtr = allData + bufPos; dataLen = allData.size() - bufPos; bufPos = allData.size(); + return dataLen; } void URIReader::close(){ diff --git a/lib/urireader.h b/lib/urireader.h index 1cca08b2..50604421 100644 --- a/lib/urireader.h +++ b/lib/urireader.h @@ -1,7 +1,6 @@ #pragma once #include "downloader.h" #include "util.h" -#include namespace HTTP{ enum URIType{Closed = 0, File, Stream, HTTP}; @@ -37,11 +36,11 @@ namespace HTTP{ void readAll(Util::DataCallback &cb); /// Reads wantedLen bytes of data from current position, calling the dataCallback whenever minLen/maxLen require it. - void readSome(size_t (*dataCallback)(const char *data, size_t len), size_t wantedLen); + size_t readSome(size_t (*dataCallback)(const char *data, size_t len), size_t wantedLen); /// Reads wantedLen bytes of data from current position, returning it in a single buffer. - void readSome(char *&dataPtr, size_t &dataLen, size_t wantedLen); + size_t readSome(char *&dataPtr, size_t &dataLen, size_t wantedLen); - void readSome(size_t wantedLen, Util::DataCallback &cb); + size_t readSome(size_t wantedLen, Util::DataCallback &cb); /// Closes the currently open URI. Does not change the internal URI value. void close(); diff --git a/src/input/input_hls.cpp b/src/input/input_hls.cpp index 275d7a7a..86e5344a 100644 --- a/src/input/input_hls.cpp +++ b/src/input/input_hls.cpp @@ -204,19 +204,18 @@ namespace Mist{ playlistMapping[pls.id] = pls; plsInitCount++; if (initOnly){ + INFO_MSG("Thread for %s exiting", pls.uri.c_str()); return; }// Exit because init-only mode - while (self->config->is_active){ - // If the timer has not expired yet, sleep up to a second. Otherwise, reload. - /// \TODO Sleep longer if that makes sense? + while (self->config->is_active && streamIsLive){ if (pls.reloadNext > Util::bootSecs()){ Util::sleep(1000); }else{ pls.reload(); } } - MEDIUM_MSG("Downloader thread for '%s' exiting", pls.uri.c_str()); + INFO_MSG("Downloader thread for '%s' exiting", pls.uri.c_str()); } Playlist::Playlist(const std::string &uriSource){ @@ -280,206 +279,199 @@ namespace Mist{ int count = 0; std::istringstream urlSource; - std::ifstream fileSource; - if (isUrl()){ - HTTP::URIReader plsDL; - plsDL.open(uri); - char * dataPtr; - size_t dataLen; - plsDL.readAll(dataPtr, dataLen); - if (!dataLen){ - FAIL_MSG("Could not download playlist '%s', aborting.", uri.c_str()); - reloadNext = Util::bootSecs() + waitTime; - return false; - } - urlSource.str(std::string(dataPtr, dataLen)); - }else{ - fileSource.open(uri.c_str()); - if (!fileSource.good()){ - FAIL_MSG("Could not open playlist (%s): %s", strerror(errno), uri.c_str()); - reloadNext = Util::bootSecs() + waitTime; - return false; - } + HTTP::URIReader plsDL; + plsDL.open(uri); + char * dataPtr; + size_t dataLen; + plsDL.readAll(dataPtr, dataLen); + if (!dataLen){ + FAIL_MSG("Could not download playlist '%s', aborting.", uri.c_str()); + reloadNext = Util::bootSecs() + waitTime; + return false; } + urlSource.str(std::string(dataPtr, dataLen)); - std::istream &input = (isUrl() ? (std::istream &)urlSource : (std::istream &)fileSource); + std::istream &input = (std::istream &)urlSource; std::getline(input, line); - DONTEVEN_MSG("Reloading playlist '%s'", uri.c_str()); - while (std::getline(input, line)){ - DONTEVEN_MSG("Parsing line '%s'", line.c_str()); - cleanLine(line); - if (line.empty()){continue;}// skip empty lines + {// Mutex scope + tthread::lock_guard guard(entryMutex); + DONTEVEN_MSG("Reloading playlist '%s'", uri.c_str()); + while (std::getline(input, line)){ + DONTEVEN_MSG("Parsing line '%s'", line.c_str()); + cleanLine(line); + if (line.empty()){continue;}// skip empty lines - if (line.compare(0, 7, "#EXTINF") == 0){ - segDur = atof(line.c_str() + 8); - continue; - } - if (line.compare(0, 7, "#EXT-X-") == 0){ - size_t pos = line.find(":"); - key = line.substr(7, pos - 7); - val = line.c_str() + pos + 1; + if (line.compare(0, 7, "#EXTINF") == 0){ + segDur = atof(line.c_str() + 8); + continue; + } + if (line.compare(0, 7, "#EXT-X-") == 0){ + size_t pos = line.find(":"); + key = line.substr(7, pos - 7); + val = line.c_str() + pos + 1; - if (key == "KEY"){ - size_t tmpPos = val.find("METHOD="); - size_t tmpPos2 = val.substr(tmpPos).find(","); - keyMethod = val.substr(tmpPos + 7, tmpPos2 - tmpPos - 7); + if (key == "KEY"){ + size_t tmpPos = val.find("METHOD="); + size_t tmpPos2 = val.substr(tmpPos).find(","); + keyMethod = val.substr(tmpPos + 7, tmpPos2 - tmpPos - 7); - tmpPos = val.find("URI=\""); - tmpPos2 = val.substr(tmpPos + 5).find("\""); - keyUri = val.substr(tmpPos + 5, tmpPos2); + tmpPos = val.find("URI=\""); + tmpPos2 = val.substr(tmpPos + 5).find("\""); + keyUri = val.substr(tmpPos + 5, tmpPos2); - tmpPos = val.find("IV="); - if (tmpPos != std::string::npos){keyIV = val.substr(tmpPos + 5, 32);} + tmpPos = val.find("IV="); + if (tmpPos != std::string::npos){keyIV = val.substr(tmpPos + 5, 32);} - // when key not found, download and store it in the map - if (!keys.count(keyUri)){ - HTTP::URIReader keyDL; - if (!keyDL.open(root.link(keyUri)) || !keyDL){ - FAIL_MSG("Could not retrieve decryption key from '%s'", root.link(keyUri).getUrl().c_str()); - continue; + // when key not found, download and store it in the map + if (!keys.count(keyUri)){ + HTTP::URIReader keyDL; + if (!keyDL.open(root.link(keyUri)) || !keyDL){ + FAIL_MSG("Could not retrieve decryption key from '%s'", root.link(keyUri).getUrl().c_str()); + continue; + } + char *keyPtr; + size_t keyLen; + keyDL.readAll(keyPtr, keyLen); + if (!keyLen){ + FAIL_MSG("Could not retrieve decryption key from '%s'", root.link(keyUri).getUrl().c_str()); + continue; + } + keys.insert(std::pair(keyUri, std::string(keyPtr, keyLen))); } - char *keyPtr; - size_t keyLen; - keyDL.readAll(keyPtr, keyLen); - if (!keyLen){ - FAIL_MSG("Could not retrieve decryption key from '%s'", root.link(keyUri).getUrl().c_str()); - continue; - } - keys.insert(std::pair(keyUri, std::string(keyPtr, keyLen))); + continue; } - continue; - } - if (key == "BYTERANGE"){ - size_t atSign = val.find('@'); - if (atSign != std::string::npos){ - std::string len = val.substr(0, atSign); - std::string pos = val.substr(atSign+1); - lenByte = atoll(len.c_str()); - startByte = atoll(pos.c_str()); - }else{ - lenByte = atoll(val.c_str()); - } - continue; - } - - if (key == "TARGETDURATION"){ - waitTime = atoi(val.c_str()) / 2; - if (waitTime < 2){waitTime = 2;} - continue; - } - - // Assuming this always comes before any segment - if (key == "MEDIA-SEQUENCE"){ - // Reinit the segment counter - firstIndex = atoll(val.c_str()); - bposCounter = firstIndex + 1; - continue; - } - - if (key == "PROGRAM-DATE-TIME"){ - nextUTC = ISO8601toUnixmillis(val); - continue; - } - - if (key == "MAP"){ - size_t mapLen = 0, mapOffset = 0; - size_t tmpPos = val.find("BYTERANGE=\""); - if (tmpPos != std::string::npos){ - size_t tmpPos2 = val.substr(tmpPos).find('"'); - mapRange = val.substr(tmpPos + 11, tmpPos2 - tmpPos - 11); - - size_t atSign = mapRange.find('@'); + if (key == "BYTERANGE"){ + size_t atSign = val.find('@'); if (atSign != std::string::npos){ - std::string len = mapRange.substr(0, atSign); - std::string pos = mapRange.substr(atSign+1); - mapLen = atoll(len.c_str()); - mapOffset = atoll(pos.c_str()); + std::string len = val.substr(0, atSign); + std::string pos = val.substr(atSign+1); + lenByte = atoll(len.c_str()); + startByte = atoll(pos.c_str()); }else{ - mapLen = atoll(val.c_str()); + lenByte = atoll(val.c_str()); } + continue; } - tmpPos = val.find("URI=\""); - if (tmpPos != std::string::npos){ - size_t tmpPos2 = val.substr(tmpPos + 5).find('"'); - mapUri = val.substr(tmpPos + 5, tmpPos2); + if (key == "TARGETDURATION"){ + waitTime = atoi(val.c_str()) / 2; + if (waitTime < 2){waitTime = 2;} + continue; } - // when key not found, download and store it in the map - if (!maps.count(mapUri+mapRange)){ - HTTP::URIReader mapDL; - if (!mapDL.open(root.link(mapUri)) || !mapDL){ - FAIL_MSG("Could not retrieve map from '%s'", root.link(mapUri).getUrl().c_str()); - continue; - } - char *mapPtr; - size_t mapPLen; - mapDL.readAll(mapPtr, mapPLen); - if (mapOffset){ - if (mapOffset <= mapPLen){ - mapPtr += mapOffset; - mapPLen -= mapOffset; + // Assuming this always comes before any segment + if (key == "MEDIA-SEQUENCE"){ + // Reinit the segment counter + firstIndex = atoll(val.c_str()); + bposCounter = firstIndex + 1; + continue; + } + + if (key == "PROGRAM-DATE-TIME"){ + nextUTC = ISO8601toUnixmillis(val); + continue; + } + + if (key == "MAP"){ + size_t mapLen = 0, mapOffset = 0; + size_t tmpPos = val.find("BYTERANGE=\""); + if (tmpPos != std::string::npos){ + size_t tmpPos2 = val.substr(tmpPos).find('"'); + mapRange = val.substr(tmpPos + 11, tmpPos2 - tmpPos - 11); + + size_t atSign = mapRange.find('@'); + if (atSign != std::string::npos){ + std::string len = mapRange.substr(0, atSign); + std::string pos = mapRange.substr(atSign+1); + mapLen = atoll(len.c_str()); + mapOffset = atoll(pos.c_str()); }else{ - mapPLen = 0; + mapLen = atoll(val.c_str()); } } - if (!mapLen){mapLen = mapPLen;} - if (mapLen < mapPLen){mapPLen = mapLen;} - if (!mapPLen){ - FAIL_MSG("Could not retrieve map from '%s'", root.link(mapUri).getUrl().c_str()); - continue; + + tmpPos = val.find("URI=\""); + if (tmpPos != std::string::npos){ + size_t tmpPos2 = val.substr(tmpPos + 5).find('"'); + mapUri = val.substr(tmpPos + 5, tmpPos2); } - maps.insert(std::pair(mapUri+mapRange, std::string(mapPtr, mapPLen))); + + // when key not found, download and store it in the map + if (!maps.count(mapUri+mapRange)){ + HTTP::URIReader mapDL; + if (!mapDL.open(root.link(mapUri)) || !mapDL){ + FAIL_MSG("Could not retrieve map from '%s'", root.link(mapUri).getUrl().c_str()); + continue; + } + char *mapPtr; + size_t mapPLen; + mapDL.readAll(mapPtr, mapPLen); + if (mapOffset){ + if (mapOffset <= mapPLen){ + mapPtr += mapOffset; + mapPLen -= mapOffset; + }else{ + mapPLen = 0; + } + } + if (!mapLen){mapLen = mapPLen;} + if (mapLen < mapPLen){mapPLen = mapLen;} + if (!mapPLen){ + FAIL_MSG("Could not retrieve map from '%s'", root.link(mapUri).getUrl().c_str()); + continue; + } + maps.insert(std::pair(mapUri+mapRange, std::string(mapPtr, mapPLen))); + } + continue; } - continue; - } - if (key == "PLAYLIST-TYPE"){ - if (val == "VOD"){ - streamIsVOD = true; + if (key == "PLAYLIST-TYPE"){ + if (val == "VOD"){ + streamIsVOD = true; + streamIsLive = false; + }else if (val == "LIVE"){ + streamIsVOD = false; + streamIsLive = true; + }else if (val == "EVENT"){ + streamIsVOD = true; + streamIsLive = true; + } + continue; + } + + // Once we see this tag, the entire playlist becomes VOD + if (key == "ENDLIST"){ streamIsLive = false; - }else if (val == "LIVE"){ - streamIsVOD = false; - streamIsLive = true; - }else if (val == "EVENT"){ streamIsVOD = true; - streamIsLive = true; + continue; } + VERYHIGH_MSG("ignoring line: %s.", line.c_str()); + continue; + } + if (line[0] == '#'){ + VERYHIGH_MSG("ignoring line: %s.", line.c_str()); continue; } - // Once we see this tag, the entire playlist becomes VOD - if (key == "ENDLIST"){ - streamIsLive = false; - streamIsVOD = true; - continue; + // check for already added segments + INFO_MSG("Current segment #%" PRIu64 ", last segment was #%" PRIu64 "", bposCounter, lastSegment); + if (bposCounter > lastSegment){ + char ivec[16]; + if (keyIV.size()){ + parseKey(keyIV, ivec, 16); + }else{ + memset(ivec, 0, 16); + Bit::htobll(ivec + 8, bposCounter); + } + addEntry(root.link(line).getUrl(), line, segDur, bposCounter, keys[keyUri], std::string(ivec, 16), mapUri+mapRange, startByte, lenByte); + lastSegment = bposCounter; + ++count; } - VERYHIGH_MSG("ignoring line: %s.", line.c_str()); - continue; - } - if (line[0] == '#'){ - VERYHIGH_MSG("ignoring line: %s.", line.c_str()); - continue; - } - - // check for already added segments - DONTEVEN_MSG("Current segment #%" PRIu64 ", last segment was #%" PRIu64 "", bposCounter, lastSegment); - if (bposCounter > lastSegment){ - char ivec[16]; - if (keyIV.size()){ - parseKey(keyIV, ivec, 16); - }else{ - memset(ivec, 0, 16); - Bit::htobll(ivec + 8, bposCounter); - } - addEntry(root.link(line).getUrl(), line, segDur, bposCounter, keys[keyUri], std::string(ivec, 16), mapUri+mapRange, startByte, lenByte); - lastSegment = bposCounter; - ++count; - } + }// Mutex scope nextUTC = 0; segDur = 0.0; startByte = std::string::npos; @@ -487,11 +479,6 @@ namespace Mist{ ++bposCounter; } - // VOD over HTTP needs to be processed as LIVE. - if (!isUrl()){ - fileSource.close(); - } - if (globalWaitTime < waitTime){globalWaitTime = waitTime;} reloadNext = Util::bootSecs() + waitTime; @@ -512,28 +499,25 @@ namespace Mist{ DTSC::veryUglyJitterOverride = entry.duration * 1000; } - { - tthread::lock_guard guard(entryMutex); - if (id && listEntries[id].size()){ - // If the UTC has gone backwards, shift forward. - playListEntries & prev = listEntries[id].back(); - if (nextUTC && prev.mUTC && nextUTC < prev.mUTC + prev.duration * 1000){ - WARN_MSG("UTC time went from %s to %s; adjusting!", Util::getUTCStringMillis(prev.mUTC + prev.duration * 1000).c_str(), Util::getUTCStringMillis(nextUTC).c_str()); - // Reset UTC time, this will cause the next check to set it correctly - nextUTC = 0; - } - // If we ever had a UTC time, ensure it's set for all segments going forward - if (!nextUTC && prev.mUTC){ - nextUTC = prev.mUTC + (uint64_t)(prev.duration * 1000); - } - // If startByte unknown and we have a length, calculate it from previous entry - if (startByte == std::string::npos && lenByte){ - if (filename == prev.relative_filename){startByte = prev.stopAtByte;} - } - }else{ - // If startByte unknown and we have a length, set to zero - if (startByte == std::string::npos && lenByte){startByte = 0;} + if (id && listEntries[id].size()){ + // If the UTC has gone backwards, shift forward. + playListEntries & prev = listEntries[id].back(); + if (nextUTC && prev.mUTC && nextUTC < prev.mUTC + prev.duration * 1000){ + WARN_MSG("UTC time went from %s to %s; adjusting!", Util::getUTCStringMillis(prev.mUTC + prev.duration * 1000).c_str(), Util::getUTCStringMillis(nextUTC).c_str()); + // Reset UTC time, this will cause the next check to set it correctly + nextUTC = 0; } + // If we ever had a UTC time, ensure it's set for all segments going forward + if (!nextUTC && prev.mUTC){ + nextUTC = prev.mUTC + (uint64_t)(prev.duration * 1000); + } + // If startByte unknown and we have a length, calculate it from previous entry + if (startByte == std::string::npos && lenByte){ + if (filename == prev.relative_filename){startByte = prev.stopAtByte;} + } + }else{ + // If startByte unknown and we have a length, set to zero + if (startByte == std::string::npos && lenByte){startByte = 0;} } if ((lenByte && startByte == std::string::npos) || (!lenByte && startByte != std::string::npos)){ WARN_MSG("Invalid byte range entry for segment: %s", filename.c_str()); @@ -565,20 +549,18 @@ namespace Mist{ entry.timestamp = lastTimestamp + startTime; } lastTimestamp = entry.timestamp - startTime + duration; - { - tthread::lock_guard guard(entryMutex); - // Set a playlist ID if we haven't assigned one yet. - // Note: This method requires never removing playlists, only adding. - // The mutex assures we have a unique count/number. - if (!id){id = listEntries.size() + 1;} - if (entry.startAtByte){ - HIGH_MSG("Adding entry '%s' (%" PRIu64 "-%" PRIu64 ") to ID %u", filename.c_str(), entry.startAtByte, entry.stopAtByte, id); - }else{ - HIGH_MSG("Adding entry '%s' to ID %u", filename.c_str(), id); - } - playlist_urls[JSON::Value(id).asString()] = relurl; - listEntries[id].push_back(entry); + + // Set a playlist ID if we haven't assigned one yet. + // Note: This method requires never removing playlists, only adding. + // The mutex assures we have a unique count/number. + if (!id){id = listEntries.size() + 1;} + if (entry.startAtByte){ + HIGH_MSG("Adding entry '%s' (%" PRIu64 "-%" PRIu64 ") to ID %u", filename.c_str(), entry.startAtByte, entry.stopAtByte, id); + }else{ + HIGH_MSG("Adding entry '%s' to ID %u", filename.c_str(), id); } + playlist_urls[JSON::Value(id).asString()] = relurl; + listEntries[id].push_back(entry); } /// Constructor of HLS Input @@ -648,7 +630,7 @@ namespace Mist{ return false; } - if (!initPlaylist(config->getString("input"), false)){ + if (!initPlaylist(config->getString("input"), true)){ Util::logExitReason(ER_UNKNOWN, "Failed to load HLS playlist, aborting"); return false; } @@ -979,10 +961,6 @@ namespace Mist{ return true; } - void InputHLS::streamMainLoop(){ - parseLivePoint(); - } - // Removes any metadata which is no longer and the playlist or buffered in memory void InputHLS::updateMeta(){ // EVENT and VOD type playlists should never segments disappear from the start @@ -1009,6 +987,7 @@ namespace Mist{ if (listEntries[currentPlaylist].back().timestamp - listEntries[currentPlaylist].front().timestamp < bufferTime){ break; } + if (keys.getValidCount() <= 3){break;} // First key could still be in memory, but is no longer seekable: drop it HIGH_MSG("Removing key %lu @%lu ms on track %lu from metadata", M.getKeys(trackIdx->first).getFirstValid(), M.getFirstms(trackIdx->first), trackIdx->first); meta.removeFirstKey(trackIdx->first); @@ -1021,11 +1000,8 @@ namespace Mist{ // Update all playlists to make sure listEntries contains all live segments for (std::map::iterator pListIt = playlistMapping.begin(); pListIt != playlistMapping.end(); pListIt++){ - if (pListIt->second.reloadNext < Util::bootSecs()){ - pListIt->second.reload(); - } currentPlaylist = pListIt->first; - const uint64_t firstIdx = playlistMapping[currentPlaylist].firstIndex; + const uint64_t firstIdx = pListIt->second.firstIndex; // If the segment counter decreases, reset counters and remove old segments from metadata if (firstIdx < playlistMapping[currentPlaylist].lastSegment - listEntries[currentPlaylist].size()){ @@ -1036,12 +1012,9 @@ namespace Mist{ } // Remove segments from listEntries as soon as it is no longer requestable - { - tthread::lock_guard guard(entryMutex); - while (listEntries[currentPlaylist].front().bytePos < firstIdx + 1){ - MEDIUM_MSG("Segment #%lu no longer in the input playlist", firstIdx + 1); - listEntries[currentPlaylist].pop_front(); - } + while (listEntries[currentPlaylist].front().bytePos < firstIdx + 1){ + INFO_MSG("Segment #%" PRIu64 " no longer in the input playlist", listEntries[currentPlaylist].front().bytePos); + listEntries[currentPlaylist].pop_front(); } // Unload memory pages which are outside of the buffer window and not recently loaded @@ -1052,20 +1025,18 @@ namespace Mist{ // Check for new segments if (listEntries[currentPlaylist].size() != parsedSegments[currentPlaylist] - firstIdx){ - INFO_MSG("Playlist #%lu has parsed %" PRIu64 "/%zu entries. Parsing new segments...", currentPlaylist, parsedSegments[currentPlaylist] - firstIdx, listEntries[currentPlaylist].size()); + INFO_MSG("Playlist #%lu has parsed %" PRId64 "/%zu entries. Parsing new segments...", currentPlaylist, (int64_t)(parsedSegments[currentPlaylist] - firstIdx), listEntries[currentPlaylist].size()); }else if (isInitialRun){ isInitialRun = false; } if (parsedSegments[currentPlaylist] < firstIdx){ - WARN_MSG("Skipping from segment #%lu to segment #%lu since we've fallen behind", parsedSegments[currentPlaylist], firstIdx); - parsedSegments[currentPlaylist] = firstIdx; + WARN_MSG("Skipping from segment #%lu to segment #%lu since we've fallen behind", parsedSegments[currentPlaylist], firstIdx + listEntries[currentPlaylist].size() - 1); + parsedSegments[currentPlaylist] = firstIdx + listEntries[currentPlaylist].size() - 1; } for(uint64_t entryIt = parsedSegments[currentPlaylist] - firstIdx; entryIt < listEntries[currentPlaylist].size(); entryIt++){ - MEDIUM_MSG("Adding segment #%" PRIu64 " as live data", firstIdx + entryIt + 1); - if (parseSegmentAsLive(entryIt)){ - parsedSegments[currentPlaylist] = firstIdx + entryIt + 1; - } - if (Util::bootMS() > maxTime){return;} + INFO_MSG("Adding segment #%" PRIu64 " as live data", firstIdx + entryIt + 1); + if (parseSegmentAsLive(entryIt)){parsedSegments[currentPlaylist] = firstIdx + entryIt + 1;} + if (Util::bootMS() > maxTime){break;} } } } @@ -1074,6 +1045,7 @@ namespace Mist{ void InputHLS::userLeadOut(){ Input::userLeadOut(); if (streamIsLive){ + tthread::lock_guard guard(entryMutex); parseLivePoint(); } } @@ -1118,7 +1090,7 @@ namespace Mist{ currentPlaylist = firstSegment(); } if (currentPlaylist == 0){ - VERYHIGH_MSG("Waiting for segments..."); + INFO_MSG("Waiting for segments..."); Util::wait(500); continue; } diff --git a/src/input/input_hls.h b/src/input/input_hls.h index 4603cd75..ef324797 100644 --- a/src/input/input_hls.h +++ b/src/input/input_hls.h @@ -156,7 +156,6 @@ namespace Mist{ void parseStreamHeader(); void parseLivePoint(); - void streamMainLoop(); uint32_t getMappedTrackId(uint64_t id); uint32_t getMappedTrackPlaylist(uint64_t id);