From 6e4256f06b7f9e2db7522c2cd2082df95a25e22b Mon Sep 17 00:00:00 2001 From: Marco van Dijk Date: Sat, 11 May 2024 12:12:33 +0200 Subject: [PATCH] Finished fixes to HLS input --- src/input/input_hls.cpp | 368 +++++++++++++++++++--------------------- src/input/input_hls.h | 3 +- 2 files changed, 177 insertions(+), 194 deletions(-) diff --git a/src/input/input_hls.cpp b/src/input/input_hls.cpp index 86e5344a..e1e7cb04 100644 --- a/src/input/input_hls.cpp +++ b/src/input/input_hls.cpp @@ -148,7 +148,7 @@ static uint64_t ISO8601toUnixmillis(const std::string &ts){ namespace Mist{ /// Save playlist objects for manual reloading - static std::map playlistMapping; + std::map playlistMapping; /// Track which segment numbers have been parsed std::map parsedSegments; @@ -190,32 +190,33 @@ namespace Mist{ bool initOnly = false; if (((char *)ptr)[0] == ';'){initOnly = true;} - Playlist pls(initOnly ? ((char *)ptr) + 1 : (char *)ptr); + Playlist *pls = new Playlist(initOnly ? ((char *)ptr) + 1 : (char *)ptr); plsTotalCount++; + pls->id = plsTotalCount; + playlistMapping[pls->id] = pls; // signal that we have now copied the URL and no longer need it ((char *)ptr)[0] = 0; - if (!pls.uri.size()){ + if (!pls->uri.size()){ FAIL_MSG("Variant playlist URL is empty, aborting update thread."); return; } - pls.reload(); - playlistMapping[pls.id] = pls; + pls->reload(); plsInitCount++; if (initOnly){ - INFO_MSG("Thread for %s exiting", pls.uri.c_str()); + INFO_MSG("Thread for %s exiting", pls->uri.c_str()); return; }// Exit because init-only mode while (self->config->is_active && streamIsLive){ - if (pls.reloadNext > Util::bootSecs()){ + if (pls->reloadNext > Util::bootSecs()){ Util::sleep(1000); }else{ - pls.reload(); + pls->reload(); } } - INFO_MSG("Downloader thread for '%s' exiting", pls.uri.c_str()); + INFO_MSG("Downloader thread for '%s' exiting", pls->uri.c_str()); } Playlist::Playlist(const std::string &uriSource){ @@ -296,6 +297,7 @@ namespace Mist{ std::getline(input, line); {// Mutex scope + // Block the main thread from reading listEntries and firstIndex tthread::lock_guard guard(entryMutex); DONTEVEN_MSG("Reloading playlist '%s'", uri.c_str()); while (std::getline(input, line)){ @@ -458,8 +460,9 @@ namespace Mist{ } // check for already added segments - INFO_MSG("Current segment #%" PRIu64 ", last segment was #%" PRIu64 "", bposCounter, lastSegment); + VERYHIGH_MSG("Current segment #%" PRIu64 ", last segment was #%" PRIu64 "", bposCounter, lastSegment); if (bposCounter > lastSegment){ + INFO_MSG("Playlist #%u: Adding new segment #%" PRIu64 " to playlist entries", id, bposCounter); char ivec[16]; if (keyIV.size()){ parseKey(keyIV, ivec, 16); @@ -471,12 +474,12 @@ namespace Mist{ lastSegment = bposCounter; ++count; } + nextUTC = 0; + segDur = 0.0; + startByte = std::string::npos; + lenByte = 0; + ++bposCounter; }// Mutex scope - nextUTC = 0; - segDur = 0.0; - startByte = std::string::npos; - lenByte = 0; - ++bposCounter; } if (globalWaitTime < waitTime){globalWaitTime = waitTime;} @@ -565,7 +568,7 @@ namespace Mist{ /// Constructor of HLS Input InputHLS::InputHLS(Util::Config *cfg) : Input(cfg){ - zUTC = nUTC = 0; + zUTC = 0; self = this; streamIsLive = true; //< default to sliding window playlist streamIsVOD = false; //< default to sliding window playlist @@ -630,7 +633,7 @@ namespace Mist{ return false; } - if (!initPlaylist(config->getString("input"), true)){ + if (!initPlaylist(config->getString("input"), false)){ Util::logExitReason(ER_UNKNOWN, "Failed to load HLS playlist, aborting"); return false; } @@ -643,6 +646,14 @@ namespace Mist{ return true; } + void InputHLS::postHeader(){ + // Run continuous playlist updaters after the main thread is forked + if (!initPlaylist(config->getString("input"), true)){ + Util::logExitReason(ER_UNKNOWN, "Failed to load HLS playlist, aborting"); + config->is_active = false; + } + } + bool InputHLS::readExistingHeader(){ if (!Input::readExistingHeader()){ INFO_MSG("Could not read existing header, regenerating"); @@ -670,18 +681,18 @@ namespace Mist{ HTTP::URL root = HTTP::localURIResolver().link(config->getString("input")); jsonForEachConst(M.inputLocalVars["playlistEntries"], i){ uint64_t plNum = JSON::Value(i.key()).asInt(); - if (M.inputLocalVars["playlistEntries"][i.key()].size() < listEntries[plNum].size()){ + if (M.inputLocalVars["playlistEntries"][i.key()].size() > listEntries[plNum].size()){ INFO_MSG("Header needs update as the amount of segments in the playlist has decreased, regenerating header"); return false; } std::deque newList; jsonForEachConst(*i, j){ const JSON::Value & thisEntry = *j; - if (thisEntry[1u].asInt() < playlistMapping[plNum].firstIndex + 1){ + if (thisEntry[1u].asInt() < playlistMapping[plNum]->firstIndex + 1){ INFO_MSG("Skipping segment %lu which is present in the header, but no longer available in the playlist", thisEntry[1u].asInt()); continue; } - if (thisEntry[1u].asInt() > playlistMapping[plNum].firstIndex + listEntries[plNum].size()){ + if (thisEntry[1u].asInt() > playlistMapping[plNum]->firstIndex + listEntries[plNum].size()){ INFO_MSG("Header needs update as the segment index has decreased. The stream has likely restarted, regenerating"); return false; } @@ -727,12 +738,12 @@ namespace Mist{ uint64_t key = JSON::Value(i.key()).asInt(); uint64_t val = i->asInt(); // If there was a jump in MEDIA-SEQUENCE, start from there - if (val < playlistMapping[key].firstIndex){ - INFO_MSG("Detected a jump in MEDIA-SEQUENCE, adjusting segment counter from %lu to %lu", val, playlistMapping[key].firstIndex); - val = playlistMapping[key].firstIndex; + if (val < playlistMapping[key]->firstIndex){ + INFO_MSG("Detected a jump in MEDIA-SEQUENCE, adjusting segment counter from %lu to %lu", val, playlistMapping[key]->firstIndex); + val = playlistMapping[key]->firstIndex; } parsedSegments[key] = val; - playlistMapping[key].lastSegment = val; + playlistMapping[key]->lastSegment = val; INFO_MSG("Playlist %" PRIu64 " already parsed %" PRIu64 " segments", key, val); } @@ -770,7 +781,7 @@ namespace Mist{ std::string lastMapName; uint32_t entId = 0; bool foundAtLeastOnePacket = false; - VERYHIGH_MSG("Playlist %" PRIu32 " starts at media index %lu", pListIt->first, playlistMapping[pListIt->first].firstIndex); + VERYHIGH_MSG("Playlist %" PRIu32 " starts at media index %lu", pListIt->first, playlistMapping[pListIt->first]->firstIndex); for (std::deque::iterator entryIt = pListIt->second.begin(); entryIt != pListIt->second.end() && config->is_active; entryIt++){ @@ -778,7 +789,7 @@ namespace Mist{ if (entryIt->mapName != lastMapName){ lastMapName = entryIt->mapName; - segDowner.setInit(playlistMapping[pListIt->first].maps[lastMapName]); + segDowner.setInit(playlistMapping[pListIt->first]->maps[lastMapName]); } if (!loadSegment(segDowner, *entryIt)){ FAIL_MSG("Failed to load segment - skipping to next"); @@ -789,26 +800,26 @@ namespace Mist{ DTSC::Packet headerPack; while (config->is_active && readNext(segDowner, headerPack, entryIt->bytePos)){ if (!config->is_active){return false;} - if (headerPack){ - size_t tmpTrackId = headerPack.getTrackId(); - uint64_t packetId = getPacketID(pListIt->first, tmpTrackId); - uint64_t packetTime = getPacketTime(headerPack.getTime(), tmpTrackId, pListIt->first, entryIt->mUTC); - size_t idx = M.trackIDToIndex(packetId, getpid()); - if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){ - segDowner.initializeMetadata(meta, tmpTrackId, packetId); - idx = M.trackIDToIndex(packetId, getpid()); - } - if (!streamIsLive){ - headerPack.getString("data", data, dataLen); - - // keyframe data exists, so always add 19 bytes keyframedata. - uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0; - size_t packSendSize = 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11; - DONTEVEN_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx); - meta.update(packetTime, packOffset, idx, dataLen, entryIt->bytePos, headerPack.hasMember("keyframe"), packSendSize); - } - foundAtLeastOnePacket = true; + if (!headerPack){ + continue; } + size_t tmpTrackId = headerPack.getTrackId(); + uint64_t packetId = getPacketID(pListIt->first, tmpTrackId); + uint64_t packetTime = getPacketTime(headerPack.getTime(), tmpTrackId, pListIt->first, entryIt->mUTC); + size_t idx = M.trackIDToIndex(packetId, getpid()); + if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){ + segDowner.initializeMetadata(meta, tmpTrackId, packetId); + idx = M.trackIDToIndex(packetId, getpid()); + } + if (!streamIsLive){ + headerPack.getString("data", data, dataLen); + + // keyframe data exists, so always add 19 bytes keyframedata. + uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0; + INSANE_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx); + meta.update(packetTime, packOffset, idx, dataLen, entryIt->bytePos, headerPack.hasMember("keyframe")); + } + foundAtLeastOnePacket = true; } // Finally save the offset as part of the TS segment. This is required for bufferframe // to work correctly, since not every segment might have an UTC timestamp tag @@ -824,7 +835,7 @@ namespace Mist{ } // If live, don't actually parse anything. If non-live, we read all the segments - parsedSegments[pListIt->first] = playlistMapping[pListIt->first].firstIndex + (streamIsLive ? 0 : currentSegment); + parsedSegments[pListIt->first] = playlistMapping[pListIt->first]->firstIndex + (streamIsLive ? 0 : currentSegment); // For still-appending streams, only parse the first segment for each playlist if (streamIsLive && foundAtLeastOnePacket){break;} @@ -892,7 +903,7 @@ namespace Mist{ /// \param segmentIndex: the index of the segment in the current playlist /// \return True if the segment has been buffered successfully bool InputHLS::parseSegmentAsLive(uint64_t segmentIndex){ - bool hasOffset = false; + allowRemap = true; //< New segment, so allow timestamp remap uint64_t bufferTime = config->getInteger("pagetimeout"); if (config->hasOption("bufferTime")){ bufferTime = config->getInteger("bufferTime") / 1000; @@ -904,7 +915,7 @@ namespace Mist{ TS::Stream tsStream; char *data; size_t dataLen; - // Get the updated list of entries + // Get the updated list of entries. Safe access to listEntries is handled at a higher level std::deque &curList = listEntries[currentPlaylist]; if (curList.size() <= segmentIndex){ FAIL_MSG("Tried to load segment with index '%" PRIu64 "', but the playlist only contains '%zu' entries!", segmentIndex, curList.size()); @@ -913,7 +924,7 @@ namespace Mist{ playListEntries & ntry = curList.at(segmentIndex); if (ntry.mapName.size()){ - segDowner.setInit(playlistMapping[currentPlaylist].maps[ntry.mapName]); + segDowner.setInit(playlistMapping[currentPlaylist]->maps[ntry.mapName]); } if (!loadSegment(segDowner, ntry)){ FAIL_MSG("Failed to load segment"); @@ -921,42 +932,38 @@ namespace Mist{ } DTSC::Packet headerPack; - while (readNext(segDowner, headerPack, curList.at(segmentIndex).bytePos)){ - if (headerPack){ - size_t tmpTrackId = headerPack.getTrackId(); - uint64_t packetId = getPacketID(currentPlaylist, tmpTrackId); - uint64_t packetTime = headerPack.getTime(); - // Set segment offset and save it - if (!hasOffset && curList.at(segmentIndex).mUTC){ - hasOffset = true; - DVRTimeOffsets[currentPlaylist] = (curList.at(segmentIndex).mUTC - zUTC) - packetTime; - MEDIUM_MSG("Setting current live segment time offset to %" PRId64, DVRTimeOffsets[currentPlaylist]); - curList.at(segmentIndex).timeOffset = DVRTimeOffsets[currentPlaylist]; - } - if (hasOffset || DVRTimeOffsets.count(currentPlaylist)){ - hasOffset = true; - packetTime += DVRTimeOffsets[currentPlaylist]; - HIGH_MSG("Adjusting current packet timestamp %" PRIu64 " -> %" PRIu64, headerPack.getTime(), packetTime); - } - size_t idx = M.trackIDToIndex(packetId, getpid()); - if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){ - tsStream.initializeMetadata(meta, tmpTrackId, packetId); - idx = M.trackIDToIndex(packetId, getpid()); - } - playlistMapping[currentPlaylist].tracks[idx] = true; - - headerPack.getString("data", data, dataLen); - // keyframe data exists, so always add 19 bytes keyframedata. - uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0; - VERYHIGH_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx); - bufferLivePacket(packetTime, packOffset, idx, data, dataLen, curList.at(segmentIndex).bytePos, headerPack.hasMember("keyframe")); - if (isInitialRun){ - pageCounter[idx][getCurrentLivePage(idx)] = curTimeout; - }else{ - pageCounter[idx][getCurrentLivePage(idx)] = Util::bootSecs(); - } - tsStream.getEarliestPacket(headerPack); + while (config->is_active && readNext(segDowner, headerPack, curList.at(segmentIndex).bytePos)){ + if (!headerPack){ + continue; } + size_t tmpTrackId = headerPack.getTrackId(); + uint64_t packetId = getPacketID(currentPlaylist, tmpTrackId); + uint64_t packetTime = headerPack.getTime(); + + size_t idx = M.trackIDToIndex(packetId, getpid()); + if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){ + tsStream.initializeMetadata(meta, tmpTrackId, packetId); + idx = M.trackIDToIndex(packetId, getpid()); + } + if (ntry.timeOffset){ + packetTime += ntry.timeOffset; + }else{ + packetTime = getPacketTime(packetTime, idx, currentPlaylist, ntry.mUTC); + } + // Mark which tracks need to be checked for removing expired metadata + playlistMapping[currentPlaylist]->tracks[idx] = true; + + headerPack.getString("data", data, dataLen); + // keyframe data exists, so always add 19 bytes keyframedata. + uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0; + VERYHIGH_MSG("Adding packet (%zuB) at timestamp %" PRIu64 " -> %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, headerPack.getTime(), packetTime, packOffset, idx); + bufferLivePacket(packetTime, packOffset, idx, data, dataLen, curList.at(segmentIndex).bytePos, headerPack.hasMember("keyframe")); + if (isInitialRun){ + pageCounter[idx][getCurrentLivePage(idx)] = curTimeout; + }else{ + pageCounter[idx][getCurrentLivePage(idx)] = Util::bootSecs(); + } + tsStream.getEarliestPacket(headerPack); } return true; } @@ -969,8 +976,8 @@ namespace Mist{ return; } - for (std::map::iterator trackIdx = playlistMapping[currentPlaylist].tracks.begin(); - trackIdx != playlistMapping[currentPlaylist].tracks.end(); trackIdx++){ + for (std::map::iterator trackIdx = playlistMapping[currentPlaylist]->tracks.begin(); + trackIdx != playlistMapping[currentPlaylist]->tracks.end(); trackIdx++){ // Calc after how many MS segments are no longer part of the buffer window uint64_t bufferTime = config->getInteger("pagetimeout"); if (config->hasOption("bufferTime")){ @@ -979,7 +986,7 @@ namespace Mist{ // Remove keys which are not requestable anymore while (true) { DTSC::Keys keys = M.getKeys(trackIdx->first); - // Stop if the earliest key is still in the playlist + // Stop if the earliest key is still in the playlist. Safe access to listEntries is handled at a higher level if (listEntries[currentPlaylist].front().bytePos <= keys.getBpos(keys.getFirstValid())){ break; } @@ -997,45 +1004,58 @@ namespace Mist{ void InputHLS::parseLivePoint(){ uint64_t maxTime = Util::bootMS() + 500; - // Update all playlists to make sure listEntries contains all live segments - for (std::map::iterator pListIt = playlistMapping.begin(); + // Block playlist runners from updating listEntries while the main thread is accessing it + tthread::lock_guard guard(entryMutex); + // Iterate over all playlists, parse new segments as they've appeared in listEntries and remove expired entries in listEntries + for (std::map::iterator pListIt = playlistMapping.begin(); pListIt != playlistMapping.end(); pListIt++){ currentPlaylist = pListIt->first; - const uint64_t firstIdx = pListIt->second.firstIndex; - // If the segment counter decreases, reset counters and remove old segments from metadata - if (firstIdx < playlistMapping[currentPlaylist].lastSegment - listEntries[currentPlaylist].size()){ - WARN_MSG("Segment counter for playlist %lu has decreased to %lu. Exiting to reset stream", currentPlaylist, firstIdx); + if (!listEntries[currentPlaylist].size()){ + continue; + } + + // Remove segments from listEntries if they're no longer requestable + while (listEntries[currentPlaylist].front().bytePos < playlistMapping[currentPlaylist]->firstIndex + 1){ + INFO_MSG("Playlist #%" PRIu64 ": Segment #%" PRIu64 " no longer in the input playlist", currentPlaylist, listEntries[currentPlaylist].front().bytePos); + listEntries[currentPlaylist].pop_front(); + } + + uint64_t firstSegment = listEntries[currentPlaylist].front().bytePos; + uint64_t lastParsedSegment = parsedSegments[currentPlaylist]; + uint64_t lastSegment = listEntries[currentPlaylist].back().bytePos; + + // Skip ahead if we've missed segments which are no longer in the playlist + if (lastParsedSegment < firstSegment - 1){ + WARN_MSG("Playlist #%" PRIu64 ": Skipping from segment #%" PRIu64 " to segment #%" PRIu64 " since we've fallen behind", currentPlaylist, lastParsedSegment, firstSegment); + parsedSegments[currentPlaylist] = firstSegment - 1; + lastParsedSegment = parsedSegments[currentPlaylist]; + } + + // If the segment counter decreases, restart to reinit counters and metadata + if (lastParsedSegment > lastSegment){ + WARN_MSG("Playlist #%" PRIu64 ": Segment counter has decreased from %" PRIu64 " to %" PRIu64 ". Exiting to reset stream", currentPlaylist, lastParsedSegment, firstSegment); config->is_active = false; Util::logExitReason(ER_FORMAT_SPECIFIC, "Segment counter decreased. Exiting to reset stream"); return; } - // Remove segments from listEntries as soon as it is no longer requestable - while (listEntries[currentPlaylist].front().bytePos < firstIdx + 1){ - INFO_MSG("Segment #%" PRIu64 " no longer in the input playlist", listEntries[currentPlaylist].front().bytePos); - listEntries[currentPlaylist].pop_front(); - } - // Unload memory pages which are outside of the buffer window and not recently loaded removeUnused(); // Remove meta info for expired keys updateMeta(); - // Check for new segments - if (listEntries[currentPlaylist].size() != parsedSegments[currentPlaylist] - firstIdx){ - INFO_MSG("Playlist #%lu has parsed %" PRId64 "/%zu entries. Parsing new segments...", currentPlaylist, (int64_t)(parsedSegments[currentPlaylist] - firstIdx), listEntries[currentPlaylist].size()); + // Parse new segments in listEntries + if (lastParsedSegment < lastSegment){ + INFO_MSG("Playlist #%lu: Parsed %" PRIu64 "/%" PRIu64 " entries. Parsing new segments...", currentPlaylist, lastParsedSegment, lastSegment); }else if (isInitialRun){ isInitialRun = false; } - if (parsedSegments[currentPlaylist] < firstIdx){ - WARN_MSG("Skipping from segment #%lu to segment #%lu since we've fallen behind", parsedSegments[currentPlaylist], firstIdx + listEntries[currentPlaylist].size() - 1); - parsedSegments[currentPlaylist] = firstIdx + listEntries[currentPlaylist].size() - 1; - } - for(uint64_t entryIt = parsedSegments[currentPlaylist] - firstIdx; entryIt < listEntries[currentPlaylist].size(); entryIt++){ - INFO_MSG("Adding segment #%" PRIu64 " as live data", firstIdx + entryIt + 1); - if (parseSegmentAsLive(entryIt)){parsedSegments[currentPlaylist] = firstIdx + entryIt + 1;} + for(uint64_t entryIt = 1 + lastParsedSegment - firstSegment; entryIt < listEntries[currentPlaylist].size(); entryIt++){ + INFO_MSG("Playlist #%lu: Parsing segment #%" PRIu64 " as live data", currentPlaylist, firstSegment + entryIt); + if (parseSegmentAsLive(entryIt)){parsedSegments[currentPlaylist] = firstSegment + entryIt;} + // Rotate between playlists if there are lots of entries to parse if (Util::bootMS() > maxTime){break;} } } @@ -1045,7 +1065,6 @@ namespace Mist{ void InputHLS::userLeadOut(){ Input::userLeadOut(); if (streamIsLive){ - tthread::lock_guard guard(entryMutex); parseLivePoint(); } } @@ -1053,66 +1072,43 @@ namespace Mist{ bool InputHLS::openStreamSource(){return true;} void InputHLS::getNext(size_t idx){ - INSANE_MSG("Getting next"); uint32_t tid = 0; thisPacket.null(); + uint64_t segIdx = listEntries[currentPlaylist].at(currentIndex).bytePos; while (config->is_active && (needsLock() || keepAlive())){ // Check if we have a packet - if (readNext(segDowner, thisPacket, listEntries[currentPlaylist].at(currentIndex).bytePos)){ - if (thisPacket){ - tid = getOriginalTrackId(currentPlaylist, thisPacket.getTrackId()); - // Is it one we want? - if (idx == INVALID_TRACK_ID || getMappedTrackId(M.getID(idx)) == thisPacket.getTrackId()){ - uint64_t packetTime = thisPacket.getTime(); - if (listEntries[currentPlaylist].at(currentIndex).timeOffset){ - packetTime += listEntries[currentPlaylist].at(currentIndex).timeOffset; - }else{ - packetTime = getPacketTime(thisPacket.getTime(), tid, currentPlaylist, nUTC); - } - INSANE_MSG("Packet %" PRIu32 "@%" PRIu64 "ms -> %" PRIu64 "ms", tid, thisPacket.getTime(), packetTime); - // overwrite trackId on success - Bit::htobl(thisPacket.getData() + 8, tid); - Bit::htobll(thisPacket.getData() + 12, packetTime); - thisTime = packetTime; - thisIdx = tid; - return; // Success! - } + if (readNext(segDowner, thisPacket, segIdx)){ + if (!thisPacket){continue;} + tid = getOriginalTrackId(currentPlaylist, thisPacket.getTrackId()); + uint64_t packetTime = thisPacket.getTime(); + if (listEntries[currentPlaylist].at(currentIndex).timeOffset){ + packetTime += listEntries[currentPlaylist].at(currentIndex).timeOffset; + }else{ + packetTime = getPacketTime(thisPacket.getTime(), tid, currentPlaylist, listEntries[currentPlaylist].at(currentIndex).mUTC); + } + // Is it one we want? + if (idx == INVALID_TRACK_ID || getMappedTrackId(M.getID(idx)) == thisPacket.getTrackId()){ + INSANE_MSG("Packet %" PRIu32 "@%" PRIu64 "ms -> %" PRIu64 "ms", tid, thisPacket.getTime(), packetTime); + // overwrite trackId on success + Bit::htobl(thisPacket.getData() + 8, tid); + Bit::htobll(thisPacket.getData() + 12, packetTime); + thisTime = packetTime; + return; // Success! } continue; } // No? Then we want to try reading the next file. - - // No segments? Wait until next playlist reloading time. - if (idx != INVALID_TRACK_ID){ - currentPlaylist = getMappedTrackPlaylist(M.getID(idx)); - }else{ - currentPlaylist = firstSegment(); - } - if (currentPlaylist == 0){ - INFO_MSG("Waiting for segments..."); - Util::wait(500); - continue; - } - // Now that we know our playlist is up-to-date, actually try to read the file. VERYHIGH_MSG("Moving on to next TS segment (variant %" PRIu64 ")", currentPlaylist); if (readNextFile()){ + allowRemap = true; MEDIUM_MSG("Next segment read successfully"); + segIdx = listEntries[currentPlaylist].at(currentIndex).bytePos; continue; // Success! Continue regular parsing. - }else{ - if (userSelect.size() > 1){ - // failed to read segment for playlist, dropping it - WARN_MSG("Dropping variant %" PRIu64 " because we couldn't read anything from it", currentPlaylist); - tthread::lock_guard guard(entryMutex); - listEntries.erase(currentPlaylist); - if (listEntries.size()){continue;} - } } - // Nothing works! - // HLS input will now quit trying to prevent severe mental depression. - Util::logExitReason(ER_CLEAN_EOF, "No packets can be read - exhausted all playlists"); + // Reached the end of the playlist thisPacket.null(); return; } @@ -1124,8 +1120,7 @@ namespace Mist{ plsLastTime.clear(); plsInterval.clear(); segDowner.reset(); - uint64_t trackId = M.getID(idx); - currentPlaylist = getMappedTrackPlaylist(trackId); + currentPlaylist = getMappedTrackPlaylist(M.getID(idx)); unsigned long plistEntry = 0; DTSC::Keys keys = M.getKeys(idx); @@ -1135,11 +1130,11 @@ namespace Mist{ break; } // Keys can still be accessible in memory. Skip any segments we cannot seek to in the playlist - if (keys.getBpos(i) <= playlistMapping[currentPlaylist].firstIndex){ + if (keys.getBpos(i) <= playlistMapping[currentPlaylist]->firstIndex){ INSANE_MSG("Skipping segment #%lu (key %lu @ %lu ms) for seeking, as it is no longer available in the playlist", keys.getBpos(i) - 1, i, keys.getTime(i)); continue; } - plistEntry = keys.getBpos(i) - 1 - playlistMapping[currentPlaylist].firstIndex; + plistEntry = keys.getBpos(i) - 1 - playlistMapping[currentPlaylist]->firstIndex; INSANE_MSG("Found valid key with a time of %" PRIu64 " ms at playlist index %zu while seeking", keys.getTime(i), plistEntry); } currentIndex = plistEntry; @@ -1160,14 +1155,16 @@ namespace Mist{ } playListEntries & e = curPlaylist.at(currentIndex); if (e.mapName.size()){ - segDowner.setInit(playlistMapping[currentPlaylist].maps[e.mapName]); + segDowner.setInit(playlistMapping[currentPlaylist]->maps[e.mapName]); } loadSegment(segDowner, e); - // If we have an offset, load it - allowRemap = false; if (e.timeOffset){ + // If we have an offset, load it + allowRemap = false; HIGH_MSG("Setting time offset of this TS segment to %" PRId64, e.timeOffset); plsTimeOffset[currentPlaylist] = e.timeOffset; + }else{ + allowRemap = true; } } } @@ -1426,7 +1423,6 @@ namespace Mist{ size_t pos = line.find(":"); std::string val = line.c_str() + pos + 1; zUTC = ISO8601toUnixmillis(val) - uint64_t(timestampSum); - nUTC = zUTC; INFO_MSG("Setting program unix start time to '%s' (%" PRIu64 ")", line.substr(pos + 1).c_str(), zUTC); // store offset so that we can set it after reading the header streamOffset = zUTC - (Util::unixMS() - Util::bootMS()); @@ -1486,7 +1482,6 @@ namespace Mist{ bool InputHLS::readNextFile(){ segDowner.reset(); - playListEntries ntry; // This scope limiter prevents the recursion down below from deadlocking us { // Switch to next file @@ -1495,39 +1490,28 @@ namespace Mist{ std::deque &curList = listEntries[currentPlaylist]; HIGH_MSG("Current playlist contains %zu entries. Current index is %zu in playlist %" PRIu64, curList.size(), currentIndex, currentPlaylist); if (curList.size() <= currentIndex){ - if (streamIsLive){ - INFO_MSG("Reached last entry in playlist %" PRIu64 "; waiting for more segments", currentPlaylist); - if (Util::bootSecs() < ntry.timestamp){ - VERYHIGH_MSG("Slowing down to realtime..."); - while (Util::bootSecs() < ntry.timestamp){ - keepAlive(); - Util::wait(250); - } - } - }else{ - INFO_MSG("Reached last entry in playlist %" PRIu64, currentPlaylist); - } return false; } - ntry = curList[currentIndex]; - } + playListEntries & ntry = curList.at(currentIndex); - if (ntry.mapName.size()){ - segDowner.setInit(playlistMapping[currentPlaylist].maps[ntry.mapName]); + if (ntry.mapName.size()){ + segDowner.setInit(playlistMapping[currentPlaylist]->maps[ntry.mapName]); + } + if (!loadSegment(segDowner, ntry)){ + ERROR_MSG("Could not download segment: %s", ntry.filename.c_str()); + return false; + } + // If we have an offset, load it + if (ntry.timeOffset){ + allowRemap = false; + plsTimeOffset[currentPlaylist] = ntry.timeOffset; + // Else allow of the offset to be set by getPacketTime + }else{ + allowRemap = true; + } + return true; } - if (!loadSegment(segDowner, ntry)){ - ERROR_MSG("Could not download segment: %s", ntry.filename.c_str()); - return readNextFile(); // Attempt to read another, if possible. - } - allowRemap = false; - // If we have an offset, load it - if (ntry.timeOffset){ - plsTimeOffset[currentPlaylist] = ntry.timeOffset; - // Else allow of the offset to be set by getPacketTime - }else{ - nUTC = ntry.mUTC; - } - return true; + return false; } /// return the playlist id from which we need to read the first upcoming segment diff --git a/src/input/input_hls.h b/src/input/input_hls.h index ef324797..78b4aa75 100644 --- a/src/input/input_hls.h +++ b/src/input/input_hls.h @@ -107,7 +107,6 @@ namespace Mist{ protected: uint64_t zUTC; ///< Zero point in local millis, as UTC unix time millis - uint64_t nUTC; ///< Next packet timestamp in UTC unix time millis int64_t streamOffset; ///< bootMsOffset we need to set once we have parsed the header unsigned int startTime; SegmentReader segDowner; @@ -120,7 +119,6 @@ namespace Mist{ std::map pidMapping; std::map pidMappingR; std::map plsTimeOffset; - std::map DVRTimeOffsets; std::map plsLastTime; std::map plsInterval; @@ -146,6 +144,7 @@ namespace Mist{ bool preSetup(); bool readHeader(); bool readExistingHeader(); + void postHeader(); void getNext(size_t idx = INVALID_TRACK_ID); void seek(uint64_t seekTime, size_t idx = INVALID_TRACK_ID);