Several fixes:

- Fixed bug in stream health function causing loop if track not active
- Fixed DTSC pulls ignoring data before the live point
- Improved async buffers (deque mode) to spread the tracks more fairly
- DTSC pull now implements "ping" and "error" commands
- DTSC pulls report suspicious keyframe intervals to the origin and ask for confirmation
- DTSC output now accepts these reports and disconnects if there is no match in keyframe intervals
- Outputs in async mode now keep the seek point in all tracks when reselecting
- Outputs in async mode now default to a starting position in each track that is at a keyframe roughly halfway in the buffer
- Outputs in async mode now ignore playback rate (always fastest possible)
- Removed code duplication in prepareNext function
- Reordered the prepareNext function somewhat to be easier to follow for humans
- DTSC output no longer overrides initialSeek function, now uses default implementation
- Sanitycheck output now supports both sync and async modes, supports printing multiple timestamps for multiple tracks
This commit is contained in:
Thulinma 2022-01-06 12:52:47 +01:00
parent b89875ea37
commit f560b88bfe
9 changed files with 257 additions and 222 deletions

View file

@ -463,7 +463,7 @@ namespace Mist{
meta.reloadReplacedPagesIfNeeded();
bool autoSeek = buffer.size();
uint64_t seekTarget = currentTime();
uint64_t seekTarget = buffer.getSyncMode()?currentTime():0;
std::set<size_t> newSelects =
Util::wouldSelect(M, targetParams, capa, UA, autoSeek ? seekTarget : 0);
@ -482,9 +482,10 @@ namespace Mist{
}
std::set<size_t> oldSelects;
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); ++it){
oldSelects.insert(it->first);
}
buffer.getTrackList(oldSelects);
std::map<size_t, uint64_t> seekTargets;
buffer.getTrackList(seekTargets);
//No changes? Abort and return false;
if (oldSelects == newSelects){return false;}
@ -510,6 +511,7 @@ namespace Mist{
WARN_MSG("Could not select track %zu, dropping track", *it);
newSelects.erase(*it);
userSelect.erase(*it);
continue;
}
}
@ -521,10 +523,16 @@ namespace Mist{
//After attempting to add/remove tracks, now no changes? Abort and return false;
if (oldSelects == newSelects){return false;}
if (autoSeek){
INFO_MSG("Automatically seeking to position %" PRIu64 " to resume playback", seekTarget);
seek(seekTarget);
buffer.clear();
INFO_MSG("Automatically seeking to resume playback");
for (std::set<size_t>::iterator it = newSelects.begin(); it != newSelects.end(); it++){
if (seekTargets.count(*it)){
seek(*it, seekTargets[*it], false);
}else{
seek(*it, 0, false);
}
}
}
return true;
}
@ -793,6 +801,11 @@ namespace Mist{
return false;
}
DTSC::Keys keys(M.keys(tid));
if (M.getLive() && !pos && !buffer.getSyncMode()){
uint64_t tmpTime = (M.getFirstms(tid) + M.getLastms(tid))/2;
uint32_t tmpKey = M.getKeyNumForTime(tid, tmpTime);
pos = keys.getTime(tmpKey);
}
uint32_t keyNum = M.getKeyNumForTime(tid, pos);
if (keyNum == INVALID_KEY_NUM){
FAIL_MSG("Attempted seek on empty track %zu", tid);
@ -827,8 +840,7 @@ namespace Mist{
tmp.time = tmpPack.getTime();
}
if (tmpPack){
HIGH_MSG("Sought to time %" PRIu64 " (yields a packet at %" PRIu64 "ms) in %s@%zu", tmp.time,
tmpPack.getTime(), streamName.c_str(), tid);
HIGH_MSG("Sought to time %" PRIu64 " in %s", tmp.time, curPage[tid].name.c_str());
tmp.partIndex = M.getPartIndex(tmpPack.getTime(), tmp.tid);
buffer.insert(tmp);
return true;
@ -862,7 +874,7 @@ namespace Mist{
void Output::initialSeek(){
if (!meta){return;}
uint64_t seekPos = 0;
if (meta.getLive()){
if (meta.getLive() && buffer.getSyncMode()){
size_t mainTrack = getMainSelectedTrack();
if (mainTrack == INVALID_TRACK_ID){return;}
DTSC::Keys keys(M.keys(mainTrack));
@ -1208,7 +1220,7 @@ namespace Mist{
/// Waits for the given amount of millis, increasing the realtime playback
/// related times as needed to keep smooth playback intact.
void Output::playbackSleep(uint64_t millis){
if (realTime && M.getLive()){
if (realTime && M.getLive() && buffer.getSyncMode()){
firstTime += millis;
extraKeepAway += millis;
}
@ -1321,7 +1333,7 @@ namespace Mist{
if (firstPacketTime == 0xFFFFFFFFFFFFFFFFull){firstPacketTime = lastPacketTime;}
// slow down processing, if real time speed is wanted
if (realTime){
if (realTime && buffer.getSyncMode()){
uint8_t i = 6;
while (--i && thisPacket.getTime() > (((Util::bootMS() - firstTime) * 1000) / realTime + maxSkipAhead) &&
keepGoing()){
@ -1476,9 +1488,14 @@ namespace Mist{
// depending on whether this is probably bad and the current debug level, print a message
size_t printLevel = (probablyBad ? DLVL_WARN : DLVL_INFO);
const Comms::Users &usr = userSelect.at(trackId);
DEBUG_MSG(printLevel, "Dropping %s track %zu@k%zu (nextP=%" PRIu64 ", lastP=%" PRIu64 "): %s",
meta.getCodec(trackId).c_str(), trackId, usr.getKeyNum() + 1,
pageNumForKey(trackId, usr.getKeyNum() + 1), pageNumMax(trackId), reason.c_str());
if (!usr){
DEBUG_MSG(printLevel, "Dropping %s track %zu (lastP=%" PRIu64 "): %s",
meta.getCodec(trackId).c_str(), trackId, pageNumMax(trackId), reason.c_str());
}else{
DEBUG_MSG(printLevel, "Dropping %s track %zu@k%zu (nextP=%" PRIu64 ", lastP=%" PRIu64 "): %s",
meta.getCodec(trackId).c_str(), trackId, usr.getKeyNum() + 1,
pageNumForKey(trackId, usr.getKeyNum() + 1), pageNumMax(trackId), reason.c_str());
}
// now actually drop the track from the buffer
buffer.dropTrack(trackId);
userSelect.erase(trackId);
@ -1576,42 +1593,12 @@ namespace Mist{
return false;
}
Util::sortedPageInfo nxt = *(buffer.begin());
Util::sortedPageInfo nxt;
if (meta.reloadReplacedPagesIfNeeded()){return false;}
if (!M.getValidTracks().count(nxt.tid)){
dropTrack(nxt.tid, "disappeared from metadata");
return false;
}
// if we're going to read past the end of the data page, load the next page
// this only happens for VoD
if (nxt.offset >= curPage[nxt.tid].len ||
(!memcmp(curPage[nxt.tid].mapped + nxt.offset, "\000\000\000\000", 4))){
if (!M.getLive() && nxt.time >= M.getLastms(nxt.tid)){
dropTrack(nxt.tid, "end of non-live track reached", false);
return false;
}
if (M.getPageNumberForTime(nxt.tid, nxt.time) != currentPage[nxt.tid]){
loadPageForKey(nxt.tid, M.getPageNumberForTime(nxt.tid, nxt.time));
nxt.offset = 0;
//Only read the next time if the page load succeeded and there is a packet to read from
if (curPage[nxt.tid].mapped && curPage[nxt.tid].mapped[0] == 'D'){
nxt.time = getDTSCTime(curPage[nxt.tid].mapped, 0);
}
buffer.replaceFirst(nxt);
return false;
}
dropTrack(nxt.tid, "VoD page load failure");
return false;
}
// We know this packet will be valid, pre-load it so we know its length
DTSC::Packet preLoad(curPage[nxt.tid].mapped + nxt.offset, 0, true);
uint64_t nextTime = 0;
uint64_t nextTime;
size_t trackTries = 0;
//In case we're not in sync mode, we might have to retry a few times
for (size_t trackTries = 0; trackTries < buffer.size(); ++trackTries){
for (; trackTries < buffer.size(); ++trackTries){
nxt = *(buffer.begin());
@ -1639,8 +1626,13 @@ namespace Mist{
buffer.replaceFirst(nxt);
return false;
}
INFO_MSG("Invalid packet: no data @%" PRIu64 " for time %" PRIu64 " on track %zu", nxt.offset, nxt.time, nxt.tid);
dropTrack(nxt.tid, "VoD page load failure");
if (nxt.offset >= curPage[nxt.tid].len){
INFO_MSG("Reading past end of page %s: %" PRIu64 " > %" PRIu64 " for time %" PRIu64 " on track %zu", curPage[nxt.tid].name.c_str(), nxt.offset, curPage[nxt.tid].len, nxt.time, nxt.tid);
dropTrack(nxt.tid, "reading past end of page");
}else{
INFO_MSG("Invalid packet: no data @%" PRIu64 " in %s for time %" PRIu64 " on track %zu", nxt.offset, curPage[nxt.tid].name.c_str(), nxt.time, nxt.tid);
dropTrack(nxt.tid, "zero packet");
}
return false;
}
// We know this packet will be valid, pre-load it so we know its length
@ -1662,88 +1654,101 @@ namespace Mist{
dropTrack(nxt.tid, errMsg.str().c_str());
return false;
}
}else{
//no next packet yet!
//Check if this is the last packet of a VoD stream. Return success and drop the track.
if (!M.getLive() && nxt.time >= M.getLastms(nxt.tid)){
thisPacket.reInit(curPage[nxt.tid].mapped + nxt.offset, 0, true);
thisIdx = nxt.tid;
dropTrack(nxt.tid, "end of non-live track reached", false);
return true;
}
uint32_t thisKey = M.getKeyNumForTime(nxt.tid, nxt.time);
//Check if there exists a different page for the next key
uint32_t nextKeyPage = INVALID_KEY_NUM;
//Make sure we only try to read the page for the next key if it actually should be available
DTSC::Keys keys(M.keys(nxt.tid));
if (keys.getEndValid() >= thisKey+1){nextKeyPage = M.getPageNumberForKey(nxt.tid, thisKey + 1);}
if (nextKeyPage != INVALID_KEY_NUM && nextKeyPage != currentPage[nxt.tid]){
// If so, the next key is our next packet
nextTime = keys.getTime(thisKey + 1);
break;//Packet valid!
}
//If the next packet should've been before the current packet, something is wrong. Abort, abort!
if (nextTime < nxt.time){
std::stringstream errMsg;
errMsg << "next key (" << (thisKey+1) << ") time " << nextTime << " but current time " << nxt.time;
errMsg << "; currPage=" << currentPage[nxt.tid] << ", nxtPage=" << nextKeyPage;
errMsg << ", firstKey=" << keys.getFirstValid() << ", endKey=" << keys.getEndValid();
dropTrack(nxt.tid, errMsg.str().c_str());
return false;
}
}else{
if (!buffer.getSyncMode() && trackTries < buffer.size()-1){
//We shuffle the just-tried packet back to the end of the queue, then retry up to buffer.size() times
buffer.moveFirstToEnd();
continue;
}
//Okay, there's no next page yet, and no next packet on this page either.
//That means we're waiting for data to show up, somewhere.
// after ~25 seconds, give up and drop the track.
if (++emptyCount >= dataWaitTimeout){
dropTrack(nxt.tid, "EOP: data wait timeout");
return false;
}
//every ~1 second, check if the stream is not offline
if (emptyCount % 100 == 0 && M.getLive() && Util::getStreamStatus(streamName) == STRMSTAT_OFF){
Util::logExitReason("Stream source shut down");
thisPacket.null();
return true;
}
//every ~16 seconds, reconnect to metadata
if (emptyCount % 1600 == 0){
INFO_MSG("Reconnecting to input; track %zu key %" PRIu32 " is on page %" PRIu32 " and we're currently serving %" PRIu32 " from %" PRIu32, nxt.tid, thisKey+1, nextKeyPage, thisKey, currentPage[nxt.tid]);
reconnect();
if (!meta){
onFail("Could not connect to stream data", true);
thisPacket.null();
return true;
}
// if we don't have a connection to the metadata here, this means the stream has gone offline in the meanwhile.
if (!meta){
Util::logExitReason("Attempted reconnect to source failed");
thisPacket.null();
return true;
}
return false;//no sleep after reconnect
}
//Fine! We didn't want a packet, anyway. Let's try again later.
playbackSleep(10);
//no next packet on the current page
//Check if this is the last packet of a VoD stream. Return success and drop the track.
if (!M.getLive() && nxt.time >= M.getLastms(nxt.tid)){
thisPacket.reInit(curPage[nxt.tid].mapped + nxt.offset, 0, true);
thisIdx = nxt.tid;
dropTrack(nxt.tid, "end of non-live track reached", false);
return true;
}
//Check if there exists a different page for the next key
uint32_t thisKey = M.getKeyNumForTime(nxt.tid, nxt.time);
uint32_t nextKeyPage = INVALID_KEY_NUM;
//Make sure we only try to read the page for the next key if it actually should be available
DTSC::Keys keys(M.keys(nxt.tid));
if (keys.getEndValid() >= thisKey+1){nextKeyPage = M.getPageNumberForKey(nxt.tid, thisKey + 1);}
if (nextKeyPage != INVALID_KEY_NUM && nextKeyPage != currentPage[nxt.tid]){
// If so, the next key is our next packet
nextTime = keys.getTime(thisKey + 1);
//If the next packet should've been before the current packet, something is wrong. Abort, abort!
if (nextTime < nxt.time){
std::stringstream errMsg;
errMsg << "next key (" << (thisKey+1) << ") time " << nextTime << " but current time " << nxt.time;
errMsg << "; currPage=" << currentPage[nxt.tid] << ", nxtPage=" << nextKeyPage;
errMsg << ", firstKey=" << keys.getFirstValid() << ", endKey=" << keys.getEndValid();
dropTrack(nxt.tid, errMsg.str().c_str());
return false;
}
break;//Valid packet!
}
//Okay, there's no next page yet, and no next packet on this page either.
//That means we're waiting for data to show up, somewhere.
//In non-sync mode, shuffle the just-tried packet to the end of queue and retry
if (!buffer.getSyncMode()){
buffer.moveFirstToEnd();
continue;
}
// in sync mode, after ~25 seconds, give up and drop the track.
if (++emptyCount >= dataWaitTimeout){
dropTrack(nxt.tid, "EOP: data wait timeout");
return false;
}
//every ~1 second, check if the stream is not offline
if (emptyCount % 100 == 0 && M.getLive() && Util::getStreamStatus(streamName) == STRMSTAT_OFF){
Util::logExitReason("Stream source shut down");
thisPacket.null();
return true;
}
//every ~16 seconds, reconnect to metadata
if (emptyCount % 1600 == 0){
INFO_MSG("Reconnecting to input; track %zu key %" PRIu32 " is on page %" PRIu32 " and we're currently serving %" PRIu32 " from %" PRIu32, nxt.tid, thisKey+1, nextKeyPage, thisKey, currentPage[nxt.tid]);
reconnect();
if (!meta){
onFail("Could not connect to stream data", true);
thisPacket.null();
return true;
}
// if we don't have a connection to the metadata here, this means the stream has gone offline in the meanwhile.
if (!meta){
Util::logExitReason("Attempted reconnect to source failed");
thisPacket.null();
return true;
}
return false;//no sleep after reconnect
}
//Fine! We didn't want a packet, anyway. Let's try again later.
playbackSleep(10);
return false;
}
if (trackTries == buffer.size()){
//Fine! We didn't want a packet, anyway. Let's try again later.
playbackSleep(10);
return false;
}
// we've handled all special cases - at this point the packet should exist
// let's load it
thisPacket.reInit(curPage[nxt.tid].mapped + nxt.offset, 0, true);
thisIdx = nxt.tid;
thisTime = thisPacket.getTime();
// if it failed, drop the track and continue
if (!thisPacket){
dropTrack(nxt.tid, "packet load failure");
return false;
}
emptyCount = 0; // valid packet - reset empty counter
thisIdx = nxt.tid;
thisTime = thisPacket.getTime();
if (!userSelect[nxt.tid]){
dropTrack(nxt.tid, "track is not alive!");