Finished fixes to HLS input

This commit is contained in:
Marco van Dijk 2024-05-11 12:12:33 +02:00 committed by Thulinma
parent e4a2da45d3
commit 6e4256f06b
2 changed files with 177 additions and 194 deletions

View file

@ -148,7 +148,7 @@ static uint64_t ISO8601toUnixmillis(const std::string &ts){
namespace Mist{ namespace Mist{
/// Save playlist objects for manual reloading /// Save playlist objects for manual reloading
static std::map<uint64_t, Playlist> playlistMapping; std::map<uint64_t, Playlist*> playlistMapping;
/// Track which segment numbers have been parsed /// Track which segment numbers have been parsed
std::map<uint64_t, uint64_t> parsedSegments; std::map<uint64_t, uint64_t> parsedSegments;
@ -190,32 +190,33 @@ namespace Mist{
bool initOnly = false; bool initOnly = false;
if (((char *)ptr)[0] == ';'){initOnly = true;} if (((char *)ptr)[0] == ';'){initOnly = true;}
Playlist pls(initOnly ? ((char *)ptr) + 1 : (char *)ptr); Playlist *pls = new Playlist(initOnly ? ((char *)ptr) + 1 : (char *)ptr);
plsTotalCount++; plsTotalCount++;
pls->id = plsTotalCount;
playlistMapping[pls->id] = pls;
// signal that we have now copied the URL and no longer need it // signal that we have now copied the URL and no longer need it
((char *)ptr)[0] = 0; ((char *)ptr)[0] = 0;
if (!pls.uri.size()){ if (!pls->uri.size()){
FAIL_MSG("Variant playlist URL is empty, aborting update thread."); FAIL_MSG("Variant playlist URL is empty, aborting update thread.");
return; return;
} }
pls.reload(); pls->reload();
playlistMapping[pls.id] = pls;
plsInitCount++; plsInitCount++;
if (initOnly){ if (initOnly){
INFO_MSG("Thread for %s exiting", pls.uri.c_str()); INFO_MSG("Thread for %s exiting", pls->uri.c_str());
return; return;
}// Exit because init-only mode }// Exit because init-only mode
while (self->config->is_active && streamIsLive){ while (self->config->is_active && streamIsLive){
if (pls.reloadNext > Util::bootSecs()){ if (pls->reloadNext > Util::bootSecs()){
Util::sleep(1000); Util::sleep(1000);
}else{ }else{
pls.reload(); pls->reload();
} }
} }
INFO_MSG("Downloader thread for '%s' exiting", pls.uri.c_str()); INFO_MSG("Downloader thread for '%s' exiting", pls->uri.c_str());
} }
Playlist::Playlist(const std::string &uriSource){ Playlist::Playlist(const std::string &uriSource){
@ -296,6 +297,7 @@ namespace Mist{
std::getline(input, line); std::getline(input, line);
{// Mutex scope {// Mutex scope
// Block the main thread from reading listEntries and firstIndex
tthread::lock_guard<tthread::mutex> guard(entryMutex); tthread::lock_guard<tthread::mutex> guard(entryMutex);
DONTEVEN_MSG("Reloading playlist '%s'", uri.c_str()); DONTEVEN_MSG("Reloading playlist '%s'", uri.c_str());
while (std::getline(input, line)){ while (std::getline(input, line)){
@ -458,8 +460,9 @@ namespace Mist{
} }
// check for already added segments // check for already added segments
INFO_MSG("Current segment #%" PRIu64 ", last segment was #%" PRIu64 "", bposCounter, lastSegment); VERYHIGH_MSG("Current segment #%" PRIu64 ", last segment was #%" PRIu64 "", bposCounter, lastSegment);
if (bposCounter > lastSegment){ if (bposCounter > lastSegment){
INFO_MSG("Playlist #%u: Adding new segment #%" PRIu64 " to playlist entries", id, bposCounter);
char ivec[16]; char ivec[16];
if (keyIV.size()){ if (keyIV.size()){
parseKey(keyIV, ivec, 16); parseKey(keyIV, ivec, 16);
@ -471,12 +474,12 @@ namespace Mist{
lastSegment = bposCounter; lastSegment = bposCounter;
++count; ++count;
} }
nextUTC = 0;
segDur = 0.0;
startByte = std::string::npos;
lenByte = 0;
++bposCounter;
}// Mutex scope }// Mutex scope
nextUTC = 0;
segDur = 0.0;
startByte = std::string::npos;
lenByte = 0;
++bposCounter;
} }
if (globalWaitTime < waitTime){globalWaitTime = waitTime;} if (globalWaitTime < waitTime){globalWaitTime = waitTime;}
@ -565,7 +568,7 @@ namespace Mist{
/// Constructor of HLS Input /// Constructor of HLS Input
InputHLS::InputHLS(Util::Config *cfg) : Input(cfg){ InputHLS::InputHLS(Util::Config *cfg) : Input(cfg){
zUTC = nUTC = 0; zUTC = 0;
self = this; self = this;
streamIsLive = true; //< default to sliding window playlist streamIsLive = true; //< default to sliding window playlist
streamIsVOD = false; //< default to sliding window playlist streamIsVOD = false; //< default to sliding window playlist
@ -630,7 +633,7 @@ namespace Mist{
return false; return false;
} }
if (!initPlaylist(config->getString("input"), true)){ if (!initPlaylist(config->getString("input"), false)){
Util::logExitReason(ER_UNKNOWN, "Failed to load HLS playlist, aborting"); Util::logExitReason(ER_UNKNOWN, "Failed to load HLS playlist, aborting");
return false; return false;
} }
@ -643,6 +646,14 @@ namespace Mist{
return true; return true;
} }
void InputHLS::postHeader(){
// Run continuous playlist updaters after the main thread is forked
if (!initPlaylist(config->getString("input"), true)){
Util::logExitReason(ER_UNKNOWN, "Failed to load HLS playlist, aborting");
config->is_active = false;
}
}
bool InputHLS::readExistingHeader(){ bool InputHLS::readExistingHeader(){
if (!Input::readExistingHeader()){ if (!Input::readExistingHeader()){
INFO_MSG("Could not read existing header, regenerating"); INFO_MSG("Could not read existing header, regenerating");
@ -670,18 +681,18 @@ namespace Mist{
HTTP::URL root = HTTP::localURIResolver().link(config->getString("input")); HTTP::URL root = HTTP::localURIResolver().link(config->getString("input"));
jsonForEachConst(M.inputLocalVars["playlistEntries"], i){ jsonForEachConst(M.inputLocalVars["playlistEntries"], i){
uint64_t plNum = JSON::Value(i.key()).asInt(); uint64_t plNum = JSON::Value(i.key()).asInt();
if (M.inputLocalVars["playlistEntries"][i.key()].size() < listEntries[plNum].size()){ if (M.inputLocalVars["playlistEntries"][i.key()].size() > listEntries[plNum].size()){
INFO_MSG("Header needs update as the amount of segments in the playlist has decreased, regenerating header"); INFO_MSG("Header needs update as the amount of segments in the playlist has decreased, regenerating header");
return false; return false;
} }
std::deque<playListEntries> newList; std::deque<playListEntries> newList;
jsonForEachConst(*i, j){ jsonForEachConst(*i, j){
const JSON::Value & thisEntry = *j; const JSON::Value & thisEntry = *j;
if (thisEntry[1u].asInt() < playlistMapping[plNum].firstIndex + 1){ if (thisEntry[1u].asInt() < playlistMapping[plNum]->firstIndex + 1){
INFO_MSG("Skipping segment %lu which is present in the header, but no longer available in the playlist", thisEntry[1u].asInt()); INFO_MSG("Skipping segment %lu which is present in the header, but no longer available in the playlist", thisEntry[1u].asInt());
continue; continue;
} }
if (thisEntry[1u].asInt() > playlistMapping[plNum].firstIndex + listEntries[plNum].size()){ if (thisEntry[1u].asInt() > playlistMapping[plNum]->firstIndex + listEntries[plNum].size()){
INFO_MSG("Header needs update as the segment index has decreased. The stream has likely restarted, regenerating"); INFO_MSG("Header needs update as the segment index has decreased. The stream has likely restarted, regenerating");
return false; return false;
} }
@ -727,12 +738,12 @@ namespace Mist{
uint64_t key = JSON::Value(i.key()).asInt(); uint64_t key = JSON::Value(i.key()).asInt();
uint64_t val = i->asInt(); uint64_t val = i->asInt();
// If there was a jump in MEDIA-SEQUENCE, start from there // If there was a jump in MEDIA-SEQUENCE, start from there
if (val < playlistMapping[key].firstIndex){ if (val < playlistMapping[key]->firstIndex){
INFO_MSG("Detected a jump in MEDIA-SEQUENCE, adjusting segment counter from %lu to %lu", val, playlistMapping[key].firstIndex); INFO_MSG("Detected a jump in MEDIA-SEQUENCE, adjusting segment counter from %lu to %lu", val, playlistMapping[key]->firstIndex);
val = playlistMapping[key].firstIndex; val = playlistMapping[key]->firstIndex;
} }
parsedSegments[key] = val; parsedSegments[key] = val;
playlistMapping[key].lastSegment = val; playlistMapping[key]->lastSegment = val;
INFO_MSG("Playlist %" PRIu64 " already parsed %" PRIu64 " segments", key, val); INFO_MSG("Playlist %" PRIu64 " already parsed %" PRIu64 " segments", key, val);
} }
@ -770,7 +781,7 @@ namespace Mist{
std::string lastMapName; std::string lastMapName;
uint32_t entId = 0; uint32_t entId = 0;
bool foundAtLeastOnePacket = false; bool foundAtLeastOnePacket = false;
VERYHIGH_MSG("Playlist %" PRIu32 " starts at media index %lu", pListIt->first, playlistMapping[pListIt->first].firstIndex); VERYHIGH_MSG("Playlist %" PRIu32 " starts at media index %lu", pListIt->first, playlistMapping[pListIt->first]->firstIndex);
for (std::deque<playListEntries>::iterator entryIt = pListIt->second.begin(); for (std::deque<playListEntries>::iterator entryIt = pListIt->second.begin();
entryIt != pListIt->second.end() && config->is_active; entryIt++){ entryIt != pListIt->second.end() && config->is_active; entryIt++){
@ -778,7 +789,7 @@ namespace Mist{
if (entryIt->mapName != lastMapName){ if (entryIt->mapName != lastMapName){
lastMapName = entryIt->mapName; lastMapName = entryIt->mapName;
segDowner.setInit(playlistMapping[pListIt->first].maps[lastMapName]); segDowner.setInit(playlistMapping[pListIt->first]->maps[lastMapName]);
} }
if (!loadSegment(segDowner, *entryIt)){ if (!loadSegment(segDowner, *entryIt)){
FAIL_MSG("Failed to load segment - skipping to next"); FAIL_MSG("Failed to load segment - skipping to next");
@ -789,26 +800,26 @@ namespace Mist{
DTSC::Packet headerPack; DTSC::Packet headerPack;
while (config->is_active && readNext(segDowner, headerPack, entryIt->bytePos)){ while (config->is_active && readNext(segDowner, headerPack, entryIt->bytePos)){
if (!config->is_active){return false;} if (!config->is_active){return false;}
if (headerPack){ if (!headerPack){
size_t tmpTrackId = headerPack.getTrackId(); continue;
uint64_t packetId = getPacketID(pListIt->first, tmpTrackId);
uint64_t packetTime = getPacketTime(headerPack.getTime(), tmpTrackId, pListIt->first, entryIt->mUTC);
size_t idx = M.trackIDToIndex(packetId, getpid());
if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){
segDowner.initializeMetadata(meta, tmpTrackId, packetId);
idx = M.trackIDToIndex(packetId, getpid());
}
if (!streamIsLive){
headerPack.getString("data", data, dataLen);
// keyframe data exists, so always add 19 bytes keyframedata.
uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0;
size_t packSendSize = 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11;
DONTEVEN_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx);
meta.update(packetTime, packOffset, idx, dataLen, entryIt->bytePos, headerPack.hasMember("keyframe"), packSendSize);
}
foundAtLeastOnePacket = true;
} }
size_t tmpTrackId = headerPack.getTrackId();
uint64_t packetId = getPacketID(pListIt->first, tmpTrackId);
uint64_t packetTime = getPacketTime(headerPack.getTime(), tmpTrackId, pListIt->first, entryIt->mUTC);
size_t idx = M.trackIDToIndex(packetId, getpid());
if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){
segDowner.initializeMetadata(meta, tmpTrackId, packetId);
idx = M.trackIDToIndex(packetId, getpid());
}
if (!streamIsLive){
headerPack.getString("data", data, dataLen);
// keyframe data exists, so always add 19 bytes keyframedata.
uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0;
INSANE_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx);
meta.update(packetTime, packOffset, idx, dataLen, entryIt->bytePos, headerPack.hasMember("keyframe"));
}
foundAtLeastOnePacket = true;
} }
// Finally save the offset as part of the TS segment. This is required for bufferframe // Finally save the offset as part of the TS segment. This is required for bufferframe
// to work correctly, since not every segment might have an UTC timestamp tag // to work correctly, since not every segment might have an UTC timestamp tag
@ -824,7 +835,7 @@ namespace Mist{
} }
// If live, don't actually parse anything. If non-live, we read all the segments // If live, don't actually parse anything. If non-live, we read all the segments
parsedSegments[pListIt->first] = playlistMapping[pListIt->first].firstIndex + (streamIsLive ? 0 : currentSegment); parsedSegments[pListIt->first] = playlistMapping[pListIt->first]->firstIndex + (streamIsLive ? 0 : currentSegment);
// For still-appending streams, only parse the first segment for each playlist // For still-appending streams, only parse the first segment for each playlist
if (streamIsLive && foundAtLeastOnePacket){break;} if (streamIsLive && foundAtLeastOnePacket){break;}
@ -892,7 +903,7 @@ namespace Mist{
/// \param segmentIndex: the index of the segment in the current playlist /// \param segmentIndex: the index of the segment in the current playlist
/// \return True if the segment has been buffered successfully /// \return True if the segment has been buffered successfully
bool InputHLS::parseSegmentAsLive(uint64_t segmentIndex){ bool InputHLS::parseSegmentAsLive(uint64_t segmentIndex){
bool hasOffset = false; allowRemap = true; //< New segment, so allow timestamp remap
uint64_t bufferTime = config->getInteger("pagetimeout"); uint64_t bufferTime = config->getInteger("pagetimeout");
if (config->hasOption("bufferTime")){ if (config->hasOption("bufferTime")){
bufferTime = config->getInteger("bufferTime") / 1000; bufferTime = config->getInteger("bufferTime") / 1000;
@ -904,7 +915,7 @@ namespace Mist{
TS::Stream tsStream; TS::Stream tsStream;
char *data; char *data;
size_t dataLen; size_t dataLen;
// Get the updated list of entries // Get the updated list of entries. Safe access to listEntries is handled at a higher level
std::deque<playListEntries> &curList = listEntries[currentPlaylist]; std::deque<playListEntries> &curList = listEntries[currentPlaylist];
if (curList.size() <= segmentIndex){ if (curList.size() <= segmentIndex){
FAIL_MSG("Tried to load segment with index '%" PRIu64 "', but the playlist only contains '%zu' entries!", segmentIndex, curList.size()); FAIL_MSG("Tried to load segment with index '%" PRIu64 "', but the playlist only contains '%zu' entries!", segmentIndex, curList.size());
@ -913,7 +924,7 @@ namespace Mist{
playListEntries & ntry = curList.at(segmentIndex); playListEntries & ntry = curList.at(segmentIndex);
if (ntry.mapName.size()){ if (ntry.mapName.size()){
segDowner.setInit(playlistMapping[currentPlaylist].maps[ntry.mapName]); segDowner.setInit(playlistMapping[currentPlaylist]->maps[ntry.mapName]);
} }
if (!loadSegment(segDowner, ntry)){ if (!loadSegment(segDowner, ntry)){
FAIL_MSG("Failed to load segment"); FAIL_MSG("Failed to load segment");
@ -921,42 +932,38 @@ namespace Mist{
} }
DTSC::Packet headerPack; DTSC::Packet headerPack;
while (readNext(segDowner, headerPack, curList.at(segmentIndex).bytePos)){ while (config->is_active && readNext(segDowner, headerPack, curList.at(segmentIndex).bytePos)){
if (headerPack){ if (!headerPack){
size_t tmpTrackId = headerPack.getTrackId(); continue;
uint64_t packetId = getPacketID(currentPlaylist, tmpTrackId);
uint64_t packetTime = headerPack.getTime();
// Set segment offset and save it
if (!hasOffset && curList.at(segmentIndex).mUTC){
hasOffset = true;
DVRTimeOffsets[currentPlaylist] = (curList.at(segmentIndex).mUTC - zUTC) - packetTime;
MEDIUM_MSG("Setting current live segment time offset to %" PRId64, DVRTimeOffsets[currentPlaylist]);
curList.at(segmentIndex).timeOffset = DVRTimeOffsets[currentPlaylist];
}
if (hasOffset || DVRTimeOffsets.count(currentPlaylist)){
hasOffset = true;
packetTime += DVRTimeOffsets[currentPlaylist];
HIGH_MSG("Adjusting current packet timestamp %" PRIu64 " -> %" PRIu64, headerPack.getTime(), packetTime);
}
size_t idx = M.trackIDToIndex(packetId, getpid());
if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){
tsStream.initializeMetadata(meta, tmpTrackId, packetId);
idx = M.trackIDToIndex(packetId, getpid());
}
playlistMapping[currentPlaylist].tracks[idx] = true;
headerPack.getString("data", data, dataLen);
// keyframe data exists, so always add 19 bytes keyframedata.
uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0;
VERYHIGH_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx);
bufferLivePacket(packetTime, packOffset, idx, data, dataLen, curList.at(segmentIndex).bytePos, headerPack.hasMember("keyframe"));
if (isInitialRun){
pageCounter[idx][getCurrentLivePage(idx)] = curTimeout;
}else{
pageCounter[idx][getCurrentLivePage(idx)] = Util::bootSecs();
}
tsStream.getEarliestPacket(headerPack);
} }
size_t tmpTrackId = headerPack.getTrackId();
uint64_t packetId = getPacketID(currentPlaylist, tmpTrackId);
uint64_t packetTime = headerPack.getTime();
size_t idx = M.trackIDToIndex(packetId, getpid());
if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){
tsStream.initializeMetadata(meta, tmpTrackId, packetId);
idx = M.trackIDToIndex(packetId, getpid());
}
if (ntry.timeOffset){
packetTime += ntry.timeOffset;
}else{
packetTime = getPacketTime(packetTime, idx, currentPlaylist, ntry.mUTC);
}
// Mark which tracks need to be checked for removing expired metadata
playlistMapping[currentPlaylist]->tracks[idx] = true;
headerPack.getString("data", data, dataLen);
// keyframe data exists, so always add 19 bytes keyframedata.
uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0;
VERYHIGH_MSG("Adding packet (%zuB) at timestamp %" PRIu64 " -> %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, headerPack.getTime(), packetTime, packOffset, idx);
bufferLivePacket(packetTime, packOffset, idx, data, dataLen, curList.at(segmentIndex).bytePos, headerPack.hasMember("keyframe"));
if (isInitialRun){
pageCounter[idx][getCurrentLivePage(idx)] = curTimeout;
}else{
pageCounter[idx][getCurrentLivePage(idx)] = Util::bootSecs();
}
tsStream.getEarliestPacket(headerPack);
} }
return true; return true;
} }
@ -969,8 +976,8 @@ namespace Mist{
return; return;
} }
for (std::map<size_t, bool>::iterator trackIdx = playlistMapping[currentPlaylist].tracks.begin(); for (std::map<size_t, bool>::iterator trackIdx = playlistMapping[currentPlaylist]->tracks.begin();
trackIdx != playlistMapping[currentPlaylist].tracks.end(); trackIdx++){ trackIdx != playlistMapping[currentPlaylist]->tracks.end(); trackIdx++){
// Calc after how many MS segments are no longer part of the buffer window // Calc after how many MS segments are no longer part of the buffer window
uint64_t bufferTime = config->getInteger("pagetimeout"); uint64_t bufferTime = config->getInteger("pagetimeout");
if (config->hasOption("bufferTime")){ if (config->hasOption("bufferTime")){
@ -979,7 +986,7 @@ namespace Mist{
// Remove keys which are not requestable anymore // Remove keys which are not requestable anymore
while (true) { while (true) {
DTSC::Keys keys = M.getKeys(trackIdx->first); DTSC::Keys keys = M.getKeys(trackIdx->first);
// Stop if the earliest key is still in the playlist // Stop if the earliest key is still in the playlist. Safe access to listEntries is handled at a higher level
if (listEntries[currentPlaylist].front().bytePos <= keys.getBpos(keys.getFirstValid())){ if (listEntries[currentPlaylist].front().bytePos <= keys.getBpos(keys.getFirstValid())){
break; break;
} }
@ -997,45 +1004,58 @@ namespace Mist{
void InputHLS::parseLivePoint(){ void InputHLS::parseLivePoint(){
uint64_t maxTime = Util::bootMS() + 500; uint64_t maxTime = Util::bootMS() + 500;
// Update all playlists to make sure listEntries contains all live segments // Block playlist runners from updating listEntries while the main thread is accessing it
for (std::map<uint64_t, Playlist>::iterator pListIt = playlistMapping.begin(); tthread::lock_guard<tthread::mutex> guard(entryMutex);
// Iterate over all playlists, parse new segments as they've appeared in listEntries and remove expired entries in listEntries
for (std::map<uint64_t, Playlist*>::iterator pListIt = playlistMapping.begin();
pListIt != playlistMapping.end(); pListIt++){ pListIt != playlistMapping.end(); pListIt++){
currentPlaylist = pListIt->first; currentPlaylist = pListIt->first;
const uint64_t firstIdx = pListIt->second.firstIndex;
// If the segment counter decreases, reset counters and remove old segments from metadata if (!listEntries[currentPlaylist].size()){
if (firstIdx < playlistMapping[currentPlaylist].lastSegment - listEntries[currentPlaylist].size()){ continue;
WARN_MSG("Segment counter for playlist %lu has decreased to %lu. Exiting to reset stream", currentPlaylist, firstIdx); }
// Remove segments from listEntries if they're no longer requestable
while (listEntries[currentPlaylist].front().bytePos < playlistMapping[currentPlaylist]->firstIndex + 1){
INFO_MSG("Playlist #%" PRIu64 ": Segment #%" PRIu64 " no longer in the input playlist", currentPlaylist, listEntries[currentPlaylist].front().bytePos);
listEntries[currentPlaylist].pop_front();
}
uint64_t firstSegment = listEntries[currentPlaylist].front().bytePos;
uint64_t lastParsedSegment = parsedSegments[currentPlaylist];
uint64_t lastSegment = listEntries[currentPlaylist].back().bytePos;
// Skip ahead if we've missed segments which are no longer in the playlist
if (lastParsedSegment < firstSegment - 1){
WARN_MSG("Playlist #%" PRIu64 ": Skipping from segment #%" PRIu64 " to segment #%" PRIu64 " since we've fallen behind", currentPlaylist, lastParsedSegment, firstSegment);
parsedSegments[currentPlaylist] = firstSegment - 1;
lastParsedSegment = parsedSegments[currentPlaylist];
}
// If the segment counter decreases, restart to reinit counters and metadata
if (lastParsedSegment > lastSegment){
WARN_MSG("Playlist #%" PRIu64 ": Segment counter has decreased from %" PRIu64 " to %" PRIu64 ". Exiting to reset stream", currentPlaylist, lastParsedSegment, firstSegment);
config->is_active = false; config->is_active = false;
Util::logExitReason(ER_FORMAT_SPECIFIC, "Segment counter decreased. Exiting to reset stream"); Util::logExitReason(ER_FORMAT_SPECIFIC, "Segment counter decreased. Exiting to reset stream");
return; return;
} }
// Remove segments from listEntries as soon as it is no longer requestable
while (listEntries[currentPlaylist].front().bytePos < firstIdx + 1){
INFO_MSG("Segment #%" PRIu64 " no longer in the input playlist", listEntries[currentPlaylist].front().bytePos);
listEntries[currentPlaylist].pop_front();
}
// Unload memory pages which are outside of the buffer window and not recently loaded // Unload memory pages which are outside of the buffer window and not recently loaded
removeUnused(); removeUnused();
// Remove meta info for expired keys // Remove meta info for expired keys
updateMeta(); updateMeta();
// Check for new segments // Parse new segments in listEntries
if (listEntries[currentPlaylist].size() != parsedSegments[currentPlaylist] - firstIdx){ if (lastParsedSegment < lastSegment){
INFO_MSG("Playlist #%lu has parsed %" PRId64 "/%zu entries. Parsing new segments...", currentPlaylist, (int64_t)(parsedSegments[currentPlaylist] - firstIdx), listEntries[currentPlaylist].size()); INFO_MSG("Playlist #%lu: Parsed %" PRIu64 "/%" PRIu64 " entries. Parsing new segments...", currentPlaylist, lastParsedSegment, lastSegment);
}else if (isInitialRun){ }else if (isInitialRun){
isInitialRun = false; isInitialRun = false;
} }
if (parsedSegments[currentPlaylist] < firstIdx){ for(uint64_t entryIt = 1 + lastParsedSegment - firstSegment; entryIt < listEntries[currentPlaylist].size(); entryIt++){
WARN_MSG("Skipping from segment #%lu to segment #%lu since we've fallen behind", parsedSegments[currentPlaylist], firstIdx + listEntries[currentPlaylist].size() - 1); INFO_MSG("Playlist #%lu: Parsing segment #%" PRIu64 " as live data", currentPlaylist, firstSegment + entryIt);
parsedSegments[currentPlaylist] = firstIdx + listEntries[currentPlaylist].size() - 1; if (parseSegmentAsLive(entryIt)){parsedSegments[currentPlaylist] = firstSegment + entryIt;}
} // Rotate between playlists if there are lots of entries to parse
for(uint64_t entryIt = parsedSegments[currentPlaylist] - firstIdx; entryIt < listEntries[currentPlaylist].size(); entryIt++){
INFO_MSG("Adding segment #%" PRIu64 " as live data", firstIdx + entryIt + 1);
if (parseSegmentAsLive(entryIt)){parsedSegments[currentPlaylist] = firstIdx + entryIt + 1;}
if (Util::bootMS() > maxTime){break;} if (Util::bootMS() > maxTime){break;}
} }
} }
@ -1045,7 +1065,6 @@ namespace Mist{
void InputHLS::userLeadOut(){ void InputHLS::userLeadOut(){
Input::userLeadOut(); Input::userLeadOut();
if (streamIsLive){ if (streamIsLive){
tthread::lock_guard<tthread::mutex> guard(entryMutex);
parseLivePoint(); parseLivePoint();
} }
} }
@ -1053,66 +1072,43 @@ namespace Mist{
bool InputHLS::openStreamSource(){return true;} bool InputHLS::openStreamSource(){return true;}
void InputHLS::getNext(size_t idx){ void InputHLS::getNext(size_t idx){
INSANE_MSG("Getting next");
uint32_t tid = 0; uint32_t tid = 0;
thisPacket.null(); thisPacket.null();
uint64_t segIdx = listEntries[currentPlaylist].at(currentIndex).bytePos;
while (config->is_active && (needsLock() || keepAlive())){ while (config->is_active && (needsLock() || keepAlive())){
// Check if we have a packet // Check if we have a packet
if (readNext(segDowner, thisPacket, listEntries[currentPlaylist].at(currentIndex).bytePos)){ if (readNext(segDowner, thisPacket, segIdx)){
if (thisPacket){ if (!thisPacket){continue;}
tid = getOriginalTrackId(currentPlaylist, thisPacket.getTrackId()); tid = getOriginalTrackId(currentPlaylist, thisPacket.getTrackId());
// Is it one we want? uint64_t packetTime = thisPacket.getTime();
if (idx == INVALID_TRACK_ID || getMappedTrackId(M.getID(idx)) == thisPacket.getTrackId()){ if (listEntries[currentPlaylist].at(currentIndex).timeOffset){
uint64_t packetTime = thisPacket.getTime(); packetTime += listEntries[currentPlaylist].at(currentIndex).timeOffset;
if (listEntries[currentPlaylist].at(currentIndex).timeOffset){ }else{
packetTime += listEntries[currentPlaylist].at(currentIndex).timeOffset; packetTime = getPacketTime(thisPacket.getTime(), tid, currentPlaylist, listEntries[currentPlaylist].at(currentIndex).mUTC);
}else{ }
packetTime = getPacketTime(thisPacket.getTime(), tid, currentPlaylist, nUTC); // Is it one we want?
} if (idx == INVALID_TRACK_ID || getMappedTrackId(M.getID(idx)) == thisPacket.getTrackId()){
INSANE_MSG("Packet %" PRIu32 "@%" PRIu64 "ms -> %" PRIu64 "ms", tid, thisPacket.getTime(), packetTime); INSANE_MSG("Packet %" PRIu32 "@%" PRIu64 "ms -> %" PRIu64 "ms", tid, thisPacket.getTime(), packetTime);
// overwrite trackId on success // overwrite trackId on success
Bit::htobl(thisPacket.getData() + 8, tid); Bit::htobl(thisPacket.getData() + 8, tid);
Bit::htobll(thisPacket.getData() + 12, packetTime); Bit::htobll(thisPacket.getData() + 12, packetTime);
thisTime = packetTime; thisTime = packetTime;
thisIdx = tid; return; // Success!
return; // Success!
}
} }
continue; continue;
} }
// No? Then we want to try reading the next file. // No? Then we want to try reading the next file.
// No segments? Wait until next playlist reloading time.
if (idx != INVALID_TRACK_ID){
currentPlaylist = getMappedTrackPlaylist(M.getID(idx));
}else{
currentPlaylist = firstSegment();
}
if (currentPlaylist == 0){
INFO_MSG("Waiting for segments...");
Util::wait(500);
continue;
}
// Now that we know our playlist is up-to-date, actually try to read the file. // Now that we know our playlist is up-to-date, actually try to read the file.
VERYHIGH_MSG("Moving on to next TS segment (variant %" PRIu64 ")", currentPlaylist); VERYHIGH_MSG("Moving on to next TS segment (variant %" PRIu64 ")", currentPlaylist);
if (readNextFile()){ if (readNextFile()){
allowRemap = true;
MEDIUM_MSG("Next segment read successfully"); MEDIUM_MSG("Next segment read successfully");
segIdx = listEntries[currentPlaylist].at(currentIndex).bytePos;
continue; // Success! Continue regular parsing. continue; // Success! Continue regular parsing.
}else{
if (userSelect.size() > 1){
// failed to read segment for playlist, dropping it
WARN_MSG("Dropping variant %" PRIu64 " because we couldn't read anything from it", currentPlaylist);
tthread::lock_guard<tthread::mutex> guard(entryMutex);
listEntries.erase(currentPlaylist);
if (listEntries.size()){continue;}
}
} }
// Nothing works! // Reached the end of the playlist
// HLS input will now quit trying to prevent severe mental depression.
Util::logExitReason(ER_CLEAN_EOF, "No packets can be read - exhausted all playlists");
thisPacket.null(); thisPacket.null();
return; return;
} }
@ -1124,8 +1120,7 @@ namespace Mist{
plsLastTime.clear(); plsLastTime.clear();
plsInterval.clear(); plsInterval.clear();
segDowner.reset(); segDowner.reset();
uint64_t trackId = M.getID(idx); currentPlaylist = getMappedTrackPlaylist(M.getID(idx));
currentPlaylist = getMappedTrackPlaylist(trackId);
unsigned long plistEntry = 0; unsigned long plistEntry = 0;
DTSC::Keys keys = M.getKeys(idx); DTSC::Keys keys = M.getKeys(idx);
@ -1135,11 +1130,11 @@ namespace Mist{
break; break;
} }
// Keys can still be accessible in memory. Skip any segments we cannot seek to in the playlist // Keys can still be accessible in memory. Skip any segments we cannot seek to in the playlist
if (keys.getBpos(i) <= playlistMapping[currentPlaylist].firstIndex){ if (keys.getBpos(i) <= playlistMapping[currentPlaylist]->firstIndex){
INSANE_MSG("Skipping segment #%lu (key %lu @ %lu ms) for seeking, as it is no longer available in the playlist", keys.getBpos(i) - 1, i, keys.getTime(i)); INSANE_MSG("Skipping segment #%lu (key %lu @ %lu ms) for seeking, as it is no longer available in the playlist", keys.getBpos(i) - 1, i, keys.getTime(i));
continue; continue;
} }
plistEntry = keys.getBpos(i) - 1 - playlistMapping[currentPlaylist].firstIndex; plistEntry = keys.getBpos(i) - 1 - playlistMapping[currentPlaylist]->firstIndex;
INSANE_MSG("Found valid key with a time of %" PRIu64 " ms at playlist index %zu while seeking", keys.getTime(i), plistEntry); INSANE_MSG("Found valid key with a time of %" PRIu64 " ms at playlist index %zu while seeking", keys.getTime(i), plistEntry);
} }
currentIndex = plistEntry; currentIndex = plistEntry;
@ -1160,14 +1155,16 @@ namespace Mist{
} }
playListEntries & e = curPlaylist.at(currentIndex); playListEntries & e = curPlaylist.at(currentIndex);
if (e.mapName.size()){ if (e.mapName.size()){
segDowner.setInit(playlistMapping[currentPlaylist].maps[e.mapName]); segDowner.setInit(playlistMapping[currentPlaylist]->maps[e.mapName]);
} }
loadSegment(segDowner, e); loadSegment(segDowner, e);
// If we have an offset, load it
allowRemap = false;
if (e.timeOffset){ if (e.timeOffset){
// If we have an offset, load it
allowRemap = false;
HIGH_MSG("Setting time offset of this TS segment to %" PRId64, e.timeOffset); HIGH_MSG("Setting time offset of this TS segment to %" PRId64, e.timeOffset);
plsTimeOffset[currentPlaylist] = e.timeOffset; plsTimeOffset[currentPlaylist] = e.timeOffset;
}else{
allowRemap = true;
} }
} }
} }
@ -1426,7 +1423,6 @@ namespace Mist{
size_t pos = line.find(":"); size_t pos = line.find(":");
std::string val = line.c_str() + pos + 1; std::string val = line.c_str() + pos + 1;
zUTC = ISO8601toUnixmillis(val) - uint64_t(timestampSum); zUTC = ISO8601toUnixmillis(val) - uint64_t(timestampSum);
nUTC = zUTC;
INFO_MSG("Setting program unix start time to '%s' (%" PRIu64 ")", line.substr(pos + 1).c_str(), zUTC); INFO_MSG("Setting program unix start time to '%s' (%" PRIu64 ")", line.substr(pos + 1).c_str(), zUTC);
// store offset so that we can set it after reading the header // store offset so that we can set it after reading the header
streamOffset = zUTC - (Util::unixMS() - Util::bootMS()); streamOffset = zUTC - (Util::unixMS() - Util::bootMS());
@ -1486,7 +1482,6 @@ namespace Mist{
bool InputHLS::readNextFile(){ bool InputHLS::readNextFile(){
segDowner.reset(); segDowner.reset();
playListEntries ntry;
// This scope limiter prevents the recursion down below from deadlocking us // This scope limiter prevents the recursion down below from deadlocking us
{ {
// Switch to next file // Switch to next file
@ -1495,39 +1490,28 @@ namespace Mist{
std::deque<playListEntries> &curList = listEntries[currentPlaylist]; std::deque<playListEntries> &curList = listEntries[currentPlaylist];
HIGH_MSG("Current playlist contains %zu entries. Current index is %zu in playlist %" PRIu64, curList.size(), currentIndex, currentPlaylist); HIGH_MSG("Current playlist contains %zu entries. Current index is %zu in playlist %" PRIu64, curList.size(), currentIndex, currentPlaylist);
if (curList.size() <= currentIndex){ if (curList.size() <= currentIndex){
if (streamIsLive){
INFO_MSG("Reached last entry in playlist %" PRIu64 "; waiting for more segments", currentPlaylist);
if (Util::bootSecs() < ntry.timestamp){
VERYHIGH_MSG("Slowing down to realtime...");
while (Util::bootSecs() < ntry.timestamp){
keepAlive();
Util::wait(250);
}
}
}else{
INFO_MSG("Reached last entry in playlist %" PRIu64, currentPlaylist);
}
return false; return false;
} }
ntry = curList[currentIndex]; playListEntries & ntry = curList.at(currentIndex);
}
if (ntry.mapName.size()){ if (ntry.mapName.size()){
segDowner.setInit(playlistMapping[currentPlaylist].maps[ntry.mapName]); segDowner.setInit(playlistMapping[currentPlaylist]->maps[ntry.mapName]);
}
if (!loadSegment(segDowner, ntry)){
ERROR_MSG("Could not download segment: %s", ntry.filename.c_str());
return false;
}
// If we have an offset, load it
if (ntry.timeOffset){
allowRemap = false;
plsTimeOffset[currentPlaylist] = ntry.timeOffset;
// Else allow of the offset to be set by getPacketTime
}else{
allowRemap = true;
}
return true;
} }
if (!loadSegment(segDowner, ntry)){ return false;
ERROR_MSG("Could not download segment: %s", ntry.filename.c_str());
return readNextFile(); // Attempt to read another, if possible.
}
allowRemap = false;
// If we have an offset, load it
if (ntry.timeOffset){
plsTimeOffset[currentPlaylist] = ntry.timeOffset;
// Else allow of the offset to be set by getPacketTime
}else{
nUTC = ntry.mUTC;
}
return true;
} }
/// return the playlist id from which we need to read the first upcoming segment /// return the playlist id from which we need to read the first upcoming segment

View file

@ -107,7 +107,6 @@ namespace Mist{
protected: protected:
uint64_t zUTC; ///< Zero point in local millis, as UTC unix time millis uint64_t zUTC; ///< Zero point in local millis, as UTC unix time millis
uint64_t nUTC; ///< Next packet timestamp in UTC unix time millis
int64_t streamOffset; ///< bootMsOffset we need to set once we have parsed the header int64_t streamOffset; ///< bootMsOffset we need to set once we have parsed the header
unsigned int startTime; unsigned int startTime;
SegmentReader segDowner; SegmentReader segDowner;
@ -120,7 +119,6 @@ namespace Mist{
std::map<uint64_t, uint64_t> pidMapping; std::map<uint64_t, uint64_t> pidMapping;
std::map<uint64_t, uint64_t> pidMappingR; std::map<uint64_t, uint64_t> pidMappingR;
std::map<int, int64_t> plsTimeOffset; std::map<int, int64_t> plsTimeOffset;
std::map<int, int64_t> DVRTimeOffsets;
std::map<int, uint64_t> plsLastTime; std::map<int, uint64_t> plsLastTime;
std::map<int, uint64_t> plsInterval; std::map<int, uint64_t> plsInterval;
@ -146,6 +144,7 @@ namespace Mist{
bool preSetup(); bool preSetup();
bool readHeader(); bool readHeader();
bool readExistingHeader(); bool readExistingHeader();
void postHeader();
void getNext(size_t idx = INVALID_TRACK_ID); void getNext(size_t idx = INVALID_TRACK_ID);
void seek(uint64_t seekTime, size_t idx = INVALID_TRACK_ID); void seek(uint64_t seekTime, size_t idx = INVALID_TRACK_ID);