MistInHLS improvements and speedups, part 4/2

This commit is contained in:
Thulinma 2023-11-13 14:13:05 +01:00
parent 382e1eec03
commit b9819eb40f
4 changed files with 101 additions and 45 deletions

View file

@ -1036,6 +1036,9 @@ namespace DTSC{
setID(tIdx, trak.getMember("trackid").asInt()); setID(tIdx, trak.getMember("trackid").asInt());
setFirstms(tIdx, trak.getMember("firstms").asInt()); setFirstms(tIdx, trak.getMember("firstms").asInt());
setLastms(tIdx, trak.getMember("lastms").asInt()); setLastms(tIdx, trak.getMember("lastms").asInt());
if (trak.hasMember("nowms")){
setNowms(tIdx, trak.getMember("nowms").asInt());
}
setBps(tIdx, trak.getMember("bps").asInt()); setBps(tIdx, trak.getMember("bps").asInt());
setMaxBps(tIdx, trak.getMember("maxbps").asInt()); setMaxBps(tIdx, trak.getMember("maxbps").asInt());
setSourceTrack(tIdx, INVALID_TRACK_ID); setSourceTrack(tIdx, INVALID_TRACK_ID);
@ -1774,6 +1777,7 @@ namespace DTSC{
t.track.addField("codec", RAX_STRING, 8); t.track.addField("codec", RAX_STRING, 8);
t.track.addField("firstms", RAX_64UINT); t.track.addField("firstms", RAX_64UINT);
t.track.addField("lastms", RAX_64UINT); t.track.addField("lastms", RAX_64UINT);
t.track.addField("nowms", RAX_64UINT);
t.track.addField("bps", RAX_32UINT); t.track.addField("bps", RAX_32UINT);
t.track.addField("maxbps", RAX_32UINT); t.track.addField("maxbps", RAX_32UINT);
t.track.addField("lang", RAX_STRING, 4); t.track.addField("lang", RAX_STRING, 4);
@ -2014,6 +2018,9 @@ namespace DTSC{
void Meta::setLastms(size_t trackIdx, uint64_t lastms){ void Meta::setLastms(size_t trackIdx, uint64_t lastms){
DTSC::Track &t = tracks.at(trackIdx); DTSC::Track &t = tracks.at(trackIdx);
t.track.setInt(t.trackLastmsField, lastms); t.track.setInt(t.trackLastmsField, lastms);
if (t.trackNowmsField && t.track.getInt(t.trackNowmsField) < lastms){
t.track.setInt(t.trackNowmsField, lastms);
}
} }
uint64_t Meta::getLastms(size_t trackIdx) const{ uint64_t Meta::getLastms(size_t trackIdx) const{
const DTSC::Track &t = tracks.find(trackIdx)->second; const DTSC::Track &t = tracks.find(trackIdx)->second;
@ -2539,6 +2546,7 @@ namespace DTSC{
t.fragments.setInt(t.fragmentSizeField, t.fragments.setInt(t.fragmentSizeField,
t.fragments.getInt(t.fragmentSizeField, lastFragNum) + packDataSize, lastFragNum); t.fragments.getInt(t.fragmentSizeField, lastFragNum) + packDataSize, lastFragNum);
t.track.setInt(t.trackLastmsField, packTime); t.track.setInt(t.trackLastmsField, packTime);
t.track.setInt(t.trackNowmsField, packTime);
markUpdated(tNumber); markUpdated(tNumber);
} }
@ -3202,7 +3210,7 @@ namespace DTSC{
const Util::RelAccX &keys = trk.keys; const Util::RelAccX &keys = trk.keys;
const Util::RelAccX &parts = trk.parts; const Util::RelAccX &parts = trk.parts;
if (!keys.getEndPos()){return INVALID_KEY_NUM;} if (!keys.getEndPos()){return INVALID_KEY_NUM;}
size_t res = keys.getStartPos(); size_t res = keys.getDeleted();
for (size_t i = res; i < keys.getEndPos(); i++){ for (size_t i = res; i < keys.getEndPos(); i++){
if (keys.getInt(trk.keyTimeField, i) > time){ if (keys.getInt(trk.keyTimeField, i) > time){
//It's possible we overshot our timestamp, but the previous key does not contain it. //It's possible we overshot our timestamp, but the previous key does not contain it.

View file

@ -192,6 +192,7 @@ namespace Mist{
Playlist::Playlist(const std::string &uriSource){ Playlist::Playlist(const std::string &uriSource){
nextUTC = 0; nextUTC = 0;
oUTC = 0;
id = 0; // to be set later id = 0; // to be set later
//If this is the copy constructor, just be silent. //If this is the copy constructor, just be silent.
std::string uriSrc; std::string uriSrc;
@ -521,7 +522,6 @@ namespace Mist{
DONTEVEN_MSG("Reloading playlist '%s'", uri.c_str()); DONTEVEN_MSG("Reloading playlist '%s'", uri.c_str());
while (std::getline(input, line)){ while (std::getline(input, line)){
if (input.eof()){break;} // Skip last line, might be incomplete
DONTEVEN_MSG("Parsing line '%s'", line.c_str()); DONTEVEN_MSG("Parsing line '%s'", line.c_str());
cleanLine(line); cleanLine(line);
if (line.empty()){continue;}// skip empty lines if (line.empty()){continue;}// skip empty lines
@ -579,19 +579,23 @@ namespace Mist{
if (val == "VOD"){ if (val == "VOD"){
streamIsVOD = true; streamIsVOD = true;
streamIsLive = false; streamIsLive = false;
INFO_MSG("SIL=F");
}else if (val == "LIVE"){ }else if (val == "LIVE"){
streamIsVOD = false; streamIsVOD = false;
streamIsLive = true; streamIsLive = true;
INFO_MSG("SIL=T");
}else if (val == "EVENT"){ }else if (val == "EVENT"){
streamIsVOD = true; streamIsVOD = true;
streamIsLive = true; streamIsLive = true;
INFO_MSG("SIL=T");
} }
} }
// Once we see this tag, the entire playlist becomes VOD // Once we see this tag, the entire playlist becomes VOD
if (key == "ENDLIST"){ if (key == "ENDLIST"){
streamIsVOD = true;
streamIsLive = false; streamIsLive = false;
INFO_MSG("SIL=F");
streamIsVOD = true;
} }
continue; continue;
} }
@ -667,7 +671,28 @@ namespace Mist{
if (entry.duration * 1000 > DTSC::veryUglyJitterOverride){ if (entry.duration * 1000 > DTSC::veryUglyJitterOverride){
DTSC::veryUglyJitterOverride = entry.duration * 1000; DTSC::veryUglyJitterOverride = entry.duration * 1000;
} }
{
tthread::lock_guard<tthread::mutex> guard(entryMutex);
if (id && listEntries[id].size()){
// If the UTC has gone backwards, shift forward.
playListEntries & prev = listEntries[id].back();
if (nextUTC && prev.mUTC && nextUTC < prev.mUTC + prev.duration * 1000){
WARN_MSG("UTC time went from %s to %s; adjusting!", Util::getUTCStringMillis(prev.mUTC + prev.duration * 1000).c_str(), Util::getUTCStringMillis(nextUTC).c_str());
// Reset UTC time, this will cause the next check to set it correctly
nextUTC = 0;
}
// If we ever had a UTC time, ensure it's set for all segments going forward
if (!nextUTC && prev.mUTC){
nextUTC = prev.mUTC + (uint64_t)(prev.duration * 1000);
}
}
}
entry.mUTC = nextUTC; entry.mUTC = nextUTC;
if (nextUTC && !oUTC){
oUTC = nextUTC - (lastTimestamp + startTime);
}
if (key.size() && iv.size()){ if (key.size() && iv.size()){
memcpy(entry.ivec, iv.data(), 16); memcpy(entry.ivec, iv.data(), 16);
@ -677,15 +702,13 @@ namespace Mist{
memset(entry.keyAES, 0, 16); memset(entry.keyAES, 0, 16);
} }
if (!isUrl()){ // Base timestamp of entry on UTC time if we have it, otherwise on a simple addition of the previous duration
std::ifstream fileSource; if (nextUTC && oUTC){
std::string test = root.link(entry.filename).getFilePath(); entry.timestamp = nextUTC - oUTC;
fileSource.open(test.c_str(), std::ios::ate | std::ios::binary); }else{
if (!fileSource.good()){WARN_MSG("file: %s, error: %s", test.c_str(), strerror(errno));} entry.timestamp = lastTimestamp + startTime;
} }
lastTimestamp = entry.timestamp - startTime + duration;
entry.timestamp = lastTimestamp + startTime;
lastTimestamp += duration;
{ {
tthread::lock_guard<tthread::mutex> guard(entryMutex); tthread::lock_guard<tthread::mutex> guard(entryMutex);
// Set a playlist ID if we haven't assigned one yet. // Set a playlist ID if we haven't assigned one yet.
@ -775,13 +798,9 @@ namespace Mist{
} }
// Segments can be added (and removed if VOD is false) // Segments can be added (and removed if VOD is false)
if (streamIsLive){ meta.setLive(streamIsLive);
meta.setLive(true);
}
// Segments can not be removed // Segments can not be removed
if (streamIsVOD){ meta.setVod(streamIsVOD);
meta.setVod(true);
}
return true; return true;
} }
@ -906,7 +925,7 @@ namespace Mist{
tsStream.clear(); tsStream.clear();
uint32_t entId = 0; uint32_t entId = 0;
bool foundAtLeastOnePacket = false; bool foundAtLeastOnePacket = false;
INFO_MSG("Playlist %" PRIu32 " starts at media index %lu", pListIt->first, playlistMapping[pListIt->first].firstIndex); VERYHIGH_MSG("Playlist %" PRIu32 " starts at media index %lu", pListIt->first, playlistMapping[pListIt->first].firstIndex);
for (std::deque<playListEntries>::iterator entryIt = pListIt->second.begin(); for (std::deque<playListEntries>::iterator entryIt = pListIt->second.begin();
entryIt != pListIt->second.end() && config->is_active; entryIt++){ entryIt != pListIt->second.end() && config->is_active; entryIt++){
@ -934,13 +953,15 @@ namespace Mist{
tsStream.initializeMetadata(meta, tmpTrackId, packetId); tsStream.initializeMetadata(meta, tmpTrackId, packetId);
idx = M.trackIDToIndex(packetId, getpid()); idx = M.trackIDToIndex(packetId, getpid());
} }
headerPack.getString("data", data, dataLen); if (!streamIsLive){
headerPack.getString("data", data, dataLen);
// keyframe data exists, so always add 19 bytes keyframedata. // keyframe data exists, so always add 19 bytes keyframedata.
uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0; uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0;
size_t packSendSize = 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11; size_t packSendSize = 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11;
DONTEVEN_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx); DONTEVEN_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx);
meta.update(packetTime, packOffset, idx, dataLen, entryIt->bytePos, headerPack.hasMember("keyframe"), packSendSize); meta.update(packetTime, packOffset, idx, dataLen, entryIt->bytePos, headerPack.hasMember("keyframe"), packSendSize);
}
tsStream.getEarliestPacket(headerPack); tsStream.getEarliestPacket(headerPack);
foundAtLeastOnePacket = true; foundAtLeastOnePacket = true;
} }
@ -965,12 +986,14 @@ namespace Mist{
idx = M.trackIDToIndex(packetId, getpid()); idx = M.trackIDToIndex(packetId, getpid());
} }
headerPack.getString("data", data, dataLen); if (!streamIsLive){
// keyframe data exists, so always add 19 bytes keyframedata. headerPack.getString("data", data, dataLen);
uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0; // keyframe data exists, so always add 19 bytes keyframedata.
size_t packSendSize = 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11; uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0;
DONTEVEN_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx); size_t packSendSize = 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11;
meta.update(packetTime, packOffset, idx, dataLen, entryIt->bytePos, headerPack.hasMember("keyframe"), packSendSize); DONTEVEN_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx);
meta.update(packetTime, packOffset, idx, dataLen, entryIt->bytePos, headerPack.hasMember("keyframe"), packSendSize);
}
tsStream.getEarliestPacket(headerPack); tsStream.getEarliestPacket(headerPack);
} }
// Finally save the offset as part of the TS segment. This is required for bufferframe // Finally save the offset as part of the TS segment. This is required for bufferframe
@ -988,11 +1011,18 @@ namespace Mist{
streamStatus.mapped[1] = (255 * currentSegment) / totalSegments; streamStatus.mapped[1] = (255 * currentSegment) / totalSegments;
} }
// Init segment counters to what was set to MEDIA-SEQUENCE // If live, don't actually parse anything.
parsedSegments[pListIt->first] = playlistMapping[pListIt->first].firstIndex + currentSegment; // If non-live, we read all the segments
if (streamIsLive){
parsedSegments[pListIt->first] = playlistMapping[pListIt->first].firstIndex;
}else{
parsedSegments[pListIt->first] = playlistMapping[pListIt->first].firstIndex + currentSegment;
}
// For non-vod, only parse the first segment for each playlist // For still-appending streams, only parse the first segment for each playlist
if (!streamIsVOD && foundAtLeastOnePacket){break;} if (streamIsLive && foundAtLeastOnePacket){
break;
}
} }
} }
if (!config->is_active){return false;} if (!config->is_active){return false;}
@ -1088,7 +1118,7 @@ namespace Mist{
if (!hasOffset && curList.at(segmentIndex).mUTC){ if (!hasOffset && curList.at(segmentIndex).mUTC){
hasOffset = true; hasOffset = true;
DVRTimeOffsets[currentPlaylist] = (curList.at(segmentIndex).mUTC - zUTC) - packetTime; DVRTimeOffsets[currentPlaylist] = (curList.at(segmentIndex).mUTC - zUTC) - packetTime;
MEDIUM_MSG("Setting current live segment time offset to %" PRId64, DVRTimeOffsets[currentPlaylist]); INFO_MSG("Setting current live segment time offset to %" PRId64, DVRTimeOffsets[currentPlaylist]);
curList.at(segmentIndex).timeOffset = DVRTimeOffsets[currentPlaylist]; curList.at(segmentIndex).timeOffset = DVRTimeOffsets[currentPlaylist];
} }
if (hasOffset || DVRTimeOffsets.count(currentPlaylist)){ if (hasOffset || DVRTimeOffsets.count(currentPlaylist)){
@ -1642,7 +1672,7 @@ namespace Mist{
if (codecSupported){ if (codecSupported){
ret = readPlaylist(playlistRootPath.link(line), line, fullInit); ret |= readPlaylist(playlistRootPath.link(line), line, fullInit);
}else{ }else{
INFO_MSG("skipping variant playlist %s, none of the codecs are supported", INFO_MSG("skipping variant playlist %s, none of the codecs are supported",
playlistRootPath.link(line).getUrl().c_str()); playlistRootPath.link(line).getUrl().c_str());
@ -1658,7 +1688,7 @@ namespace Mist{
int pos = line.find("URI"); int pos = line.find("URI");
if (pos != std::string::npos){ if (pos != std::string::npos){
mediafile = line.substr(pos + 5, line.length() - pos - 6); mediafile = line.substr(pos + 5, line.length() - pos - 6);
ret = readPlaylist(playlistRootPath.link(mediafile), mediafile, fullInit); ret |= readPlaylist(playlistRootPath.link(mediafile), mediafile, fullInit);
} }
} }
@ -1686,7 +1716,7 @@ namespace Mist{
} }
if (isRegularPls){ if (isRegularPls){
ret = readPlaylist(playlistRootPath.getUrl(), "", fullInit); ret |= readPlaylist(playlistRootPath.getUrl(), "", fullInit);
} }
if (!isUrl){fileSource.close();} if (!isUrl){fileSource.close();}
@ -1719,7 +1749,7 @@ namespace Mist{
else{ else{
urlBuffer = (fullInit ? "" : ";") + uri.getUrl() + "\n" + relurl; urlBuffer = (fullInit ? "" : ";") + uri.getUrl() + "\n" + relurl;
} }
INFO_MSG("Adding playlist(s): %s", urlBuffer.c_str()); VERYHIGH_MSG("Adding playlist(s): %s", urlBuffer.c_str());
tthread::thread runList(playlistRunner, (void *)urlBuffer.data()); tthread::thread runList(playlistRunner, (void *)urlBuffer.data());
runList.detach(); // Abandon the thread, it's now running independently runList.detach(); // Abandon the thread, it's now running independently
uint32_t timeout = 0; uint32_t timeout = 0;
@ -1740,7 +1770,7 @@ namespace Mist{
currentIndex++; currentIndex++;
tthread::lock_guard<tthread::mutex> guard(entryMutex); tthread::lock_guard<tthread::mutex> guard(entryMutex);
std::deque<playListEntries> &curList = listEntries[currentPlaylist]; std::deque<playListEntries> &curList = listEntries[currentPlaylist];
INFO_MSG("Current playlist contains %zu entries. Current index is %zu in playlist %" PRIu64, curList.size(), currentIndex, currentPlaylist); HIGH_MSG("Current playlist contains %zu entries. Current index is %zu in playlist %" PRIu64, curList.size(), currentIndex, currentPlaylist);
if (curList.size() <= currentIndex){ if (curList.size() <= currentIndex){
if (streamIsLive){ if (streamIsLive){
INFO_MSG("Reached last entry in playlist %" PRIu64 "; waiting for more segments", currentPlaylist); INFO_MSG("Reached last entry in playlist %" PRIu64 "; waiting for more segments", currentPlaylist);
@ -1801,7 +1831,7 @@ namespace Mist{
} }
void inputHLS::finish(){ void inputHLS::finish(){
if (!streamIsVOD){ //< Already generated from readHeader if (streamIsLive){ //< Already generated from readHeader
INFO_MSG("Writing updated header to disk"); INFO_MSG("Writing updated header to disk");
injectLocalVars(); injectLocalVars();
M.toFile(HTTP::localURIResolver().link(config->getString("input") + ".dtsh").getUrl()); M.toFile(HTTP::localURIResolver().link(config->getString("input") + ".dtsh").getUrl());

View file

@ -26,9 +26,9 @@ namespace Mist{
std::string relative_filename; std::string relative_filename;
uint64_t bytePos; uint64_t bytePos;
uint64_t mUTC; ///< UTC unix millis timestamp of first packet, if known uint64_t mUTC; ///< UTC unix millis timestamp of first packet, if known
float duration; float duration; ///< Duration of entry in seconds
uint64_t timestamp; uint64_t timestamp; ///< zUTC-based timestamp for this entry
int64_t timeOffset; int64_t timeOffset; ///< Value timestamps in the media are shifted by to get zUTC-based timestamps
uint64_t wait; uint64_t wait;
char ivec[16]; char ivec[16];
char keyAES[16]; char keyAES[16];
@ -96,6 +96,7 @@ namespace Mist{
uint64_t waitTime; uint64_t waitTime;
uint64_t lastTimestamp; uint64_t lastTimestamp;
uint64_t startTime; uint64_t startTime;
int64_t oUTC;
uint64_t nextUTC; ///< If non-zero, the UTC timestamp of the next segment on this playlist uint64_t nextUTC; ///< If non-zero, the UTC timestamp of the next segment on this playlist
char keyAES[16]; char keyAES[16];
std::map<std::string, std::string> keys; std::map<std::string, std::string> keys;

View file

@ -896,6 +896,23 @@ namespace Mist{
while (tmp.time < pos && tmpPack){ while (tmp.time < pos && tmpPack){
tmp.offset += tmpPack.getDataLen(); tmp.offset += tmpPack.getDataLen();
tmpPack.reInit(mpd + tmp.offset, 0, true); tmpPack.reInit(mpd + tmp.offset, 0, true);
if (!tmpPack){
nowMs = M.getNowms(tid);
if (M.getLastms(tid) <= tmp.time && nowMs > tmp.time){
// Okay, we're awaiting more data, let's insert a ghost packet instead.
break;
}
uint64_t timeOut = Util::bootMS() + 10000;
while (Util::bootMS() < timeOut && !tmpPack){
Util::sleep(50);
tmpPack.reInit(mpd + tmp.offset, 0, true);
}
if (!tmpPack){
WARN_MSG("Aborting seek to %" PRIu64 "ms in track %zu: timeout", pos, tid);
userSelect.erase(tid);
return false;
}
}
tmp.time = tmpPack.getTime(); tmp.time = tmpPack.getTime();
} }
if (tmpPack){ if (tmpPack){