From f560b88bfed2f0f05d1712c97f3f8fee3d45576a Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 6 Jan 2022 12:52:47 +0100 Subject: [PATCH] Several fixes: - Fixed bug in stream health function causing loop if track not active - Fixed DTSC pulls ignoring data before the live point - Improved async buffers (deque mode) to spread the tracks more fairly - DTSC pull now implements "ping" and "error" commands - DTSC pulls report suspicious keyframe intervals to the origin and ask for confirmation - DTSC output now accepts these reports and disconnects if there is no match in keyframe intervals - Outputs in async mode now keep the seek point in all tracks when reselecting - Outputs in async mode now default to a starting position in each track that is at a keyframe roughly halfway in the buffer - Outputs in async mode now ignore playback rate (always fastest possible) - Removed code duplication in prepareNext function - Reordered the prepareNext function somewhat to be easier to follow for humans - DTSC output no longer overrides initialSeek function, now uses default implementation - Sanitycheck output now supports both sync and async modes, supports printing multiple timestamps for multiple tracks --- lib/dtsc.cpp | 13 +- lib/stream.cpp | 22 ++- lib/stream.h | 1 + src/input/input_dtsc.cpp | 69 +++++++-- src/output/output.cpp | 241 +++++++++++++++--------------- src/output/output_dtsc.cpp | 70 ++++----- src/output/output_dtsc.h | 1 - src/output/output_sanitycheck.cpp | 61 +++----- src/output/output_sanitycheck.h | 1 - 9 files changed, 257 insertions(+), 222 deletions(-) diff --git a/lib/dtsc.cpp b/lib/dtsc.cpp index 133038a6..4d186c8c 100644 --- a/lib/dtsc.cpp +++ b/lib/dtsc.cpp @@ -1384,8 +1384,13 @@ namespace DTSC{ setType(newIdx, M.getType(*it)); setCodec(newIdx, M.getCodec(*it)); setLang(newIdx, M.getLang(*it)); - setFirstms(newIdx, M.getFirstms(*it)); - setLastms(newIdx, M.getLastms(*it)); + if (copyData){ + setFirstms(newIdx, M.getFirstms(*it)); + setLastms(newIdx, M.getLastms(*it)); + }else{ + setFirstms(newIdx, 0); + setLastms(newIdx, 0); + } setBps(newIdx, M.getBps(*it)); setMaxBps(newIdx, M.getMaxBps(*it)); setFpks(newIdx, M.getFpks(*it)); @@ -3241,8 +3246,8 @@ namespace DTSC{ uint32_t longest_cnt = 0; DTSC::Keys Mkeys(keys(i)); uint32_t firstKey = Mkeys.getFirstValid(); - uint32_t endKey = Mkeys.getEndValid() - 1; - for (int k = firstKey; k < endKey; k++){ + uint32_t endKey = Mkeys.getEndValid(); + for (uint32_t k = firstKey; k+1 < endKey; k++){ uint64_t kDur = Mkeys.getDuration(k); uint64_t kParts = Mkeys.getParts(k); if (!kDur){continue;} diff --git a/lib/stream.cpp b/lib/stream.cpp index 1458a71c..61fc2097 100644 --- a/lib/stream.cpp +++ b/lib/stream.cpp @@ -280,12 +280,10 @@ void Util::packetSorter::dropTrack(size_t tid){ /// Removes the first packet from the sorter and inserts the given packet. void Util::packetSorter::replaceFirst(const sortedPageInfo &pInfo){ if (dequeMode){ + //in deque mode, insertion of the new packet is at the back + //this works, as a failure to retrieve a packet will swap the front entry to the back as well dequeBuffer.pop_front(); - if (dequeBuffer.size() && dequeBuffer.front().time > pInfo.time){ - dequeBuffer.push_front(pInfo); - }else{ - dequeBuffer.push_back(pInfo); - } + dequeBuffer.push_back(pInfo); }else{ setBuffer.erase(setBuffer.begin()); setBuffer.insert(pInfo); @@ -328,6 +326,20 @@ void Util::packetSorter::getTrackList(std::set &toFill) const{ } } +/// Fills toFill with track IDs and current playback position of tracks that are in the sorter. +void Util::packetSorter::getTrackList(std::map &toFill) const{ + toFill.clear(); + if (dequeMode){ + for (std::deque::const_iterator it = dequeBuffer.begin(); it != dequeBuffer.end(); ++it){ + toFill[it->tid] = it->time; + } + }else{ + for (std::set::const_iterator it = setBuffer.begin(); it != setBuffer.end(); ++it){ + toFill[it->tid] = it->time; + } + } +} + JSON::Value Util::getStreamConfig(const std::string &streamname){ JSON::Value result; if (streamname.size() > 100){ diff --git a/lib/stream.h b/lib/stream.h index fa760eeb..8b60c309 100644 --- a/lib/stream.h +++ b/lib/stream.h @@ -75,6 +75,7 @@ namespace Util{ void moveFirstToEnd(); bool hasEntry(size_t tid) const; void getTrackList(std::set &toFill) const; + void getTrackList(std::map &toFill) const; void setSyncMode(bool synced); bool getSyncMode() const; private: diff --git a/src/input/input_dtsc.cpp b/src/input/input_dtsc.cpp index 390c07fa..eacbe0fb 100644 --- a/src/input/input_dtsc.cpp +++ b/src/input/input_dtsc.cpp @@ -165,7 +165,7 @@ namespace Mist{ DTSC::Packet metaPack(dataPacket.data(), dataPacket.size()); DTSC::Meta nM("", metaPack.getScan()); meta.reInit(streamName, false); - meta.merge(nM); + meta.merge(nM, true, false); std::set validTracks = M.getMySourceTracks(getpid()); userSelect.clear(); for (std::set::iterator it = validTracks.begin(); it != validTracks.end(); ++it){ @@ -342,20 +342,39 @@ namespace Mist{ // userClient.keepAlive(); std::string cmd; thisPacket.getString("cmd", cmd); - if (cmd != "reset"){ + if (cmd == "reset"){ + // Read next packet thisPacket.reInit(srcConn); + if (thisPacket.getVersion() != DTSC::DTSC_HEAD){ + meta.clear(); + continue; + } + DTSC::Meta nM("", thisPacket.getScan()); + meta.merge(nM, true, false); + thisPacket.reInit(srcConn); // read the next packet before continuing + continue; // parse the next packet before returning + } + if (cmd == "error"){ + thisPacket.getString("msg", cmd); + Util::logExitReason("%s", cmd.c_str()); + thisPacket.null(); + return; + } + if (cmd == "ping"){ + thisPacket.reInit(srcConn); + JSON::Value prep; + prep["cmd"] = "ok"; + prep["msg"] = "Pong!"; + srcConn.SendNow("DTCM"); + char sSize[4] ={0, 0, 0, 0}; + Bit::htobl(sSize, prep.packedSize()); + srcConn.SendNow(sSize, 4); + prep.sendTo(srcConn); continue; } - // Read next packet + INFO_MSG("Unhandled command: %s", cmd.c_str()); thisPacket.reInit(srcConn); - if (thisPacket.getVersion() != DTSC::DTSC_HEAD){ - meta.clear(); - continue; - } - DTSC::Meta nM("", thisPacket.getScan()); - meta.merge(nM, true, false); - thisPacket.reInit(srcConn); // read the next packet before continuing - continue; // parse the next packet before returning + continue; } if (thisPacket.getVersion() == DTSC::DTSC_HEAD){ DTSC::Meta nM("", thisPacket.getScan()); @@ -364,7 +383,33 @@ namespace Mist{ continue; // parse the next packet before returning } thisTime = thisPacket.getTime(); - thisIdx = thisPacket.getTrackId(); + thisIdx = M.trackIDToIndex(thisPacket.getTrackId()); + if (thisPacket.getFlag("keyframe") && M.trackValid(thisIdx)){ + uint32_t shrtest_key = 0xFFFFFFFFul; + uint32_t longest_key = 0; + DTSC::Keys Mkeys(M.keys(thisIdx)); + uint32_t firstKey = Mkeys.getFirstValid(); + uint32_t endKey = Mkeys.getEndValid(); + uint32_t checkKey = (endKey-firstKey <= 3)?firstKey:endKey-3; + for (uint32_t k = firstKey; k+1 < endKey; k++){ + uint64_t kDur = Mkeys.getDuration(k); + if (!kDur){continue;} + if (kDur > longest_key && k >= checkKey){longest_key = kDur;} + if (kDur < shrtest_key){shrtest_key = kDur;} + } + if (longest_key > shrtest_key*2){ + JSON::Value prep; + prep["cmd"] = "check_key_duration"; + prep["id"] = thisPacket.getTrackId(); + prep["duration"] = longest_key; + srcConn.SendNow("DTCM"); + char sSize[4] ={0, 0, 0, 0}; + Bit::htobl(sSize, prep.packedSize()); + srcConn.SendNow(sSize, 4); + prep.sendTo(srcConn); + INFO_MSG("Key duration %" PRIu32 " is quite long - confirming with upstream source", longest_key); + } + } return; // we have a packet } } diff --git a/src/output/output.cpp b/src/output/output.cpp index 14d9002c..0a8588a1 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -463,7 +463,7 @@ namespace Mist{ meta.reloadReplacedPagesIfNeeded(); bool autoSeek = buffer.size(); - uint64_t seekTarget = currentTime(); + uint64_t seekTarget = buffer.getSyncMode()?currentTime():0; std::set newSelects = Util::wouldSelect(M, targetParams, capa, UA, autoSeek ? seekTarget : 0); @@ -482,9 +482,10 @@ namespace Mist{ } std::set oldSelects; - for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); ++it){ - oldSelects.insert(it->first); - } + buffer.getTrackList(oldSelects); + std::map seekTargets; + buffer.getTrackList(seekTargets); + //No changes? Abort and return false; if (oldSelects == newSelects){return false;} @@ -510,6 +511,7 @@ namespace Mist{ WARN_MSG("Could not select track %zu, dropping track", *it); newSelects.erase(*it); userSelect.erase(*it); + continue; } } @@ -521,10 +523,16 @@ namespace Mist{ //After attempting to add/remove tracks, now no changes? Abort and return false; if (oldSelects == newSelects){return false;} - if (autoSeek){ - INFO_MSG("Automatically seeking to position %" PRIu64 " to resume playback", seekTarget); - seek(seekTarget); + buffer.clear(); + INFO_MSG("Automatically seeking to resume playback"); + for (std::set::iterator it = newSelects.begin(); it != newSelects.end(); it++){ + if (seekTargets.count(*it)){ + seek(*it, seekTargets[*it], false); + }else{ + seek(*it, 0, false); + } + } } return true; } @@ -793,6 +801,11 @@ namespace Mist{ return false; } DTSC::Keys keys(M.keys(tid)); + if (M.getLive() && !pos && !buffer.getSyncMode()){ + uint64_t tmpTime = (M.getFirstms(tid) + M.getLastms(tid))/2; + uint32_t tmpKey = M.getKeyNumForTime(tid, tmpTime); + pos = keys.getTime(tmpKey); + } uint32_t keyNum = M.getKeyNumForTime(tid, pos); if (keyNum == INVALID_KEY_NUM){ FAIL_MSG("Attempted seek on empty track %zu", tid); @@ -827,8 +840,7 @@ namespace Mist{ tmp.time = tmpPack.getTime(); } if (tmpPack){ - HIGH_MSG("Sought to time %" PRIu64 " (yields a packet at %" PRIu64 "ms) in %s@%zu", tmp.time, - tmpPack.getTime(), streamName.c_str(), tid); + HIGH_MSG("Sought to time %" PRIu64 " in %s", tmp.time, curPage[tid].name.c_str()); tmp.partIndex = M.getPartIndex(tmpPack.getTime(), tmp.tid); buffer.insert(tmp); return true; @@ -862,7 +874,7 @@ namespace Mist{ void Output::initialSeek(){ if (!meta){return;} uint64_t seekPos = 0; - if (meta.getLive()){ + if (meta.getLive() && buffer.getSyncMode()){ size_t mainTrack = getMainSelectedTrack(); if (mainTrack == INVALID_TRACK_ID){return;} DTSC::Keys keys(M.keys(mainTrack)); @@ -1208,7 +1220,7 @@ namespace Mist{ /// Waits for the given amount of millis, increasing the realtime playback /// related times as needed to keep smooth playback intact. void Output::playbackSleep(uint64_t millis){ - if (realTime && M.getLive()){ + if (realTime && M.getLive() && buffer.getSyncMode()){ firstTime += millis; extraKeepAway += millis; } @@ -1321,7 +1333,7 @@ namespace Mist{ if (firstPacketTime == 0xFFFFFFFFFFFFFFFFull){firstPacketTime = lastPacketTime;} // slow down processing, if real time speed is wanted - if (realTime){ + if (realTime && buffer.getSyncMode()){ uint8_t i = 6; while (--i && thisPacket.getTime() > (((Util::bootMS() - firstTime) * 1000) / realTime + maxSkipAhead) && keepGoing()){ @@ -1476,9 +1488,14 @@ namespace Mist{ // depending on whether this is probably bad and the current debug level, print a message size_t printLevel = (probablyBad ? DLVL_WARN : DLVL_INFO); const Comms::Users &usr = userSelect.at(trackId); - DEBUG_MSG(printLevel, "Dropping %s track %zu@k%zu (nextP=%" PRIu64 ", lastP=%" PRIu64 "): %s", - meta.getCodec(trackId).c_str(), trackId, usr.getKeyNum() + 1, - pageNumForKey(trackId, usr.getKeyNum() + 1), pageNumMax(trackId), reason.c_str()); + if (!usr){ + DEBUG_MSG(printLevel, "Dropping %s track %zu (lastP=%" PRIu64 "): %s", + meta.getCodec(trackId).c_str(), trackId, pageNumMax(trackId), reason.c_str()); + }else{ + DEBUG_MSG(printLevel, "Dropping %s track %zu@k%zu (nextP=%" PRIu64 ", lastP=%" PRIu64 "): %s", + meta.getCodec(trackId).c_str(), trackId, usr.getKeyNum() + 1, + pageNumForKey(trackId, usr.getKeyNum() + 1), pageNumMax(trackId), reason.c_str()); + } // now actually drop the track from the buffer buffer.dropTrack(trackId); userSelect.erase(trackId); @@ -1576,42 +1593,12 @@ namespace Mist{ return false; } - Util::sortedPageInfo nxt = *(buffer.begin()); + Util::sortedPageInfo nxt; - if (meta.reloadReplacedPagesIfNeeded()){return false;} - if (!M.getValidTracks().count(nxt.tid)){ - dropTrack(nxt.tid, "disappeared from metadata"); - return false; - } - - // if we're going to read past the end of the data page, load the next page - // this only happens for VoD - if (nxt.offset >= curPage[nxt.tid].len || - (!memcmp(curPage[nxt.tid].mapped + nxt.offset, "\000\000\000\000", 4))){ - if (!M.getLive() && nxt.time >= M.getLastms(nxt.tid)){ - dropTrack(nxt.tid, "end of non-live track reached", false); - return false; - } - if (M.getPageNumberForTime(nxt.tid, nxt.time) != currentPage[nxt.tid]){ - loadPageForKey(nxt.tid, M.getPageNumberForTime(nxt.tid, nxt.time)); - nxt.offset = 0; - //Only read the next time if the page load succeeded and there is a packet to read from - if (curPage[nxt.tid].mapped && curPage[nxt.tid].mapped[0] == 'D'){ - nxt.time = getDTSCTime(curPage[nxt.tid].mapped, 0); - } - buffer.replaceFirst(nxt); - return false; - } - dropTrack(nxt.tid, "VoD page load failure"); - return false; - } - - // We know this packet will be valid, pre-load it so we know its length - DTSC::Packet preLoad(curPage[nxt.tid].mapped + nxt.offset, 0, true); - - uint64_t nextTime = 0; + uint64_t nextTime; + size_t trackTries = 0; //In case we're not in sync mode, we might have to retry a few times - for (size_t trackTries = 0; trackTries < buffer.size(); ++trackTries){ + for (; trackTries < buffer.size(); ++trackTries){ nxt = *(buffer.begin()); @@ -1639,8 +1626,13 @@ namespace Mist{ buffer.replaceFirst(nxt); return false; } - INFO_MSG("Invalid packet: no data @%" PRIu64 " for time %" PRIu64 " on track %zu", nxt.offset, nxt.time, nxt.tid); - dropTrack(nxt.tid, "VoD page load failure"); + if (nxt.offset >= curPage[nxt.tid].len){ + INFO_MSG("Reading past end of page %s: %" PRIu64 " > %" PRIu64 " for time %" PRIu64 " on track %zu", curPage[nxt.tid].name.c_str(), nxt.offset, curPage[nxt.tid].len, nxt.time, nxt.tid); + dropTrack(nxt.tid, "reading past end of page"); + }else{ + INFO_MSG("Invalid packet: no data @%" PRIu64 " in %s for time %" PRIu64 " on track %zu", nxt.offset, curPage[nxt.tid].name.c_str(), nxt.time, nxt.tid); + dropTrack(nxt.tid, "zero packet"); + } return false; } // We know this packet will be valid, pre-load it so we know its length @@ -1662,88 +1654,101 @@ namespace Mist{ dropTrack(nxt.tid, errMsg.str().c_str()); return false; } - }else{ - //no next packet yet! - //Check if this is the last packet of a VoD stream. Return success and drop the track. - if (!M.getLive() && nxt.time >= M.getLastms(nxt.tid)){ - thisPacket.reInit(curPage[nxt.tid].mapped + nxt.offset, 0, true); - thisIdx = nxt.tid; - dropTrack(nxt.tid, "end of non-live track reached", false); - return true; - } - uint32_t thisKey = M.getKeyNumForTime(nxt.tid, nxt.time); - //Check if there exists a different page for the next key - uint32_t nextKeyPage = INVALID_KEY_NUM; - //Make sure we only try to read the page for the next key if it actually should be available - DTSC::Keys keys(M.keys(nxt.tid)); - if (keys.getEndValid() >= thisKey+1){nextKeyPage = M.getPageNumberForKey(nxt.tid, thisKey + 1);} - if (nextKeyPage != INVALID_KEY_NUM && nextKeyPage != currentPage[nxt.tid]){ - // If so, the next key is our next packet - nextTime = keys.getTime(thisKey + 1); + break;//Packet valid! + } - //If the next packet should've been before the current packet, something is wrong. Abort, abort! - if (nextTime < nxt.time){ - std::stringstream errMsg; - errMsg << "next key (" << (thisKey+1) << ") time " << nextTime << " but current time " << nxt.time; - errMsg << "; currPage=" << currentPage[nxt.tid] << ", nxtPage=" << nextKeyPage; - errMsg << ", firstKey=" << keys.getFirstValid() << ", endKey=" << keys.getEndValid(); - dropTrack(nxt.tid, errMsg.str().c_str()); - return false; - } - }else{ - if (!buffer.getSyncMode() && trackTries < buffer.size()-1){ - //We shuffle the just-tried packet back to the end of the queue, then retry up to buffer.size() times - buffer.moveFirstToEnd(); - continue; - } - //Okay, there's no next page yet, and no next packet on this page either. - //That means we're waiting for data to show up, somewhere. - // after ~25 seconds, give up and drop the track. - if (++emptyCount >= dataWaitTimeout){ - dropTrack(nxt.tid, "EOP: data wait timeout"); - return false; - } - //every ~1 second, check if the stream is not offline - if (emptyCount % 100 == 0 && M.getLive() && Util::getStreamStatus(streamName) == STRMSTAT_OFF){ - Util::logExitReason("Stream source shut down"); - thisPacket.null(); - return true; - } - //every ~16 seconds, reconnect to metadata - if (emptyCount % 1600 == 0){ - INFO_MSG("Reconnecting to input; track %zu key %" PRIu32 " is on page %" PRIu32 " and we're currently serving %" PRIu32 " from %" PRIu32, nxt.tid, thisKey+1, nextKeyPage, thisKey, currentPage[nxt.tid]); - reconnect(); - if (!meta){ - onFail("Could not connect to stream data", true); - thisPacket.null(); - return true; - } - // if we don't have a connection to the metadata here, this means the stream has gone offline in the meanwhile. - if (!meta){ - Util::logExitReason("Attempted reconnect to source failed"); - thisPacket.null(); - return true; - } - return false;//no sleep after reconnect - } - //Fine! We didn't want a packet, anyway. Let's try again later. - playbackSleep(10); + //no next packet on the current page + + //Check if this is the last packet of a VoD stream. Return success and drop the track. + if (!M.getLive() && nxt.time >= M.getLastms(nxt.tid)){ + thisPacket.reInit(curPage[nxt.tid].mapped + nxt.offset, 0, true); + thisIdx = nxt.tid; + dropTrack(nxt.tid, "end of non-live track reached", false); + return true; + } + + //Check if there exists a different page for the next key + uint32_t thisKey = M.getKeyNumForTime(nxt.tid, nxt.time); + uint32_t nextKeyPage = INVALID_KEY_NUM; + //Make sure we only try to read the page for the next key if it actually should be available + DTSC::Keys keys(M.keys(nxt.tid)); + if (keys.getEndValid() >= thisKey+1){nextKeyPage = M.getPageNumberForKey(nxt.tid, thisKey + 1);} + if (nextKeyPage != INVALID_KEY_NUM && nextKeyPage != currentPage[nxt.tid]){ + // If so, the next key is our next packet + nextTime = keys.getTime(thisKey + 1); + + //If the next packet should've been before the current packet, something is wrong. Abort, abort! + if (nextTime < nxt.time){ + std::stringstream errMsg; + errMsg << "next key (" << (thisKey+1) << ") time " << nextTime << " but current time " << nxt.time; + errMsg << "; currPage=" << currentPage[nxt.tid] << ", nxtPage=" << nextKeyPage; + errMsg << ", firstKey=" << keys.getFirstValid() << ", endKey=" << keys.getEndValid(); + dropTrack(nxt.tid, errMsg.str().c_str()); return false; } + break;//Valid packet! } + + //Okay, there's no next page yet, and no next packet on this page either. + //That means we're waiting for data to show up, somewhere. + + //In non-sync mode, shuffle the just-tried packet to the end of queue and retry + if (!buffer.getSyncMode()){ + buffer.moveFirstToEnd(); + continue; + } + + // in sync mode, after ~25 seconds, give up and drop the track. + if (++emptyCount >= dataWaitTimeout){ + dropTrack(nxt.tid, "EOP: data wait timeout"); + return false; + } + //every ~1 second, check if the stream is not offline + if (emptyCount % 100 == 0 && M.getLive() && Util::getStreamStatus(streamName) == STRMSTAT_OFF){ + Util::logExitReason("Stream source shut down"); + thisPacket.null(); + return true; + } + //every ~16 seconds, reconnect to metadata + if (emptyCount % 1600 == 0){ + INFO_MSG("Reconnecting to input; track %zu key %" PRIu32 " is on page %" PRIu32 " and we're currently serving %" PRIu32 " from %" PRIu32, nxt.tid, thisKey+1, nextKeyPage, thisKey, currentPage[nxt.tid]); + reconnect(); + if (!meta){ + onFail("Could not connect to stream data", true); + thisPacket.null(); + return true; + } + // if we don't have a connection to the metadata here, this means the stream has gone offline in the meanwhile. + if (!meta){ + Util::logExitReason("Attempted reconnect to source failed"); + thisPacket.null(); + return true; + } + return false;//no sleep after reconnect + } + + //Fine! We didn't want a packet, anyway. Let's try again later. + playbackSleep(10); + return false; + } + + if (trackTries == buffer.size()){ + //Fine! We didn't want a packet, anyway. Let's try again later. + playbackSleep(10); + return false; } // we've handled all special cases - at this point the packet should exist // let's load it thisPacket.reInit(curPage[nxt.tid].mapped + nxt.offset, 0, true); - thisIdx = nxt.tid; - thisTime = thisPacket.getTime(); // if it failed, drop the track and continue if (!thisPacket){ dropTrack(nxt.tid, "packet load failure"); return false; } emptyCount = 0; // valid packet - reset empty counter + thisIdx = nxt.tid; + thisTime = thisPacket.getTime(); if (!userSelect[nxt.tid]){ dropTrack(nxt.tid, "track is not alive!"); diff --git a/src/output/output_dtsc.cpp b/src/output/output_dtsc.cpp index 8cc16a69..1e9265d4 100644 --- a/src/output/output_dtsc.cpp +++ b/src/output/output_dtsc.cpp @@ -115,47 +115,6 @@ namespace Mist{ std::string OutDTSC::getStatsName(){return (pushing ? "INPUT:DTSC" : "OUTPUT:DTSC");} - /// Seeks to the first sync'ed keyframe of the main track. - /// Aborts if there is no main track or it has no keyframes. - void OutDTSC::initialSeek(){ - uint64_t seekPos = 0; - if (M.getLive()){ - size_t mainTrack = getMainSelectedTrack(); - // cancel if there are no keys in the main track - if (mainTrack == INVALID_TRACK_ID){return;} - - DTSC::Keys keys(M.keys(mainTrack)); - if (!keys.getValidCount()){return;} - // seek to the oldest keyframe - std::set validTracks = M.getValidTracks(); - for (size_t i = keys.getFirstValid(); i < keys.getEndValid(); ++i){ - seekPos = keys.getTime(i); - bool good = true; - // check if all tracks have data for this point in time - for (std::map::iterator ti = userSelect.begin(); ti != userSelect.end(); ++ti){ - if (mainTrack == ti->first){continue;}// skip self - if (!validTracks.count(ti->first)){ - HIGH_MSG("Skipping track %zu, not in tracks", ti->first); - continue; - }// ignore missing tracks - if (M.getLastms(ti->first) == M.getFirstms(ti->first)){ - HIGH_MSG("Skipping track %zu, last equals first", ti->first); - continue; - }// ignore point-tracks - if (M.getFirstms(ti->first) > seekPos){ - good = false; - break; - } - HIGH_MSG("Track %zu is good", ti->first); - } - // if yes, seek here - if (good){break;} - } - } - MEDIUM_MSG("Initial seek to %" PRIu64 "ms", seekPos); - seek(seekPos); - } - void OutDTSC::sendNext(){ // If selectable tracks changed, set sentHeader to false to force it to send init data static uint64_t lastMeta = 0; @@ -167,7 +126,7 @@ namespace Mist{ return; } } - DTSC::Packet p(thisPacket, thisIdx + 1); + DTSC::Packet p(thisPacket, thisIdx+1); myConn.SendNow(p.getData(), p.getDataLen()); lastActive = Util::epoch(); } @@ -201,6 +160,10 @@ namespace Mist{ std::string dataPacket = myConn.Received().remove(rSize); DTSC::Scan dScan((char *)dataPacket.data(), rSize); HIGH_MSG("Received DTCM: %s", dScan.asJSON().toString().c_str()); + if (dScan.getMember("cmd").asString() == "ok"){ + INFO_MSG("Remote OK: %s", dScan.getMember("msg").asString().c_str()); + continue; + } if (dScan.getMember("cmd").asString() == "push"){ handlePush(dScan); continue; @@ -230,6 +193,29 @@ namespace Mist{ sendOk("Internal state reset"); continue; } + if (dScan.getMember("cmd").asString() == "check_key_duration"){ + size_t idx = dScan.getMember("id").asInt() - 1; + size_t dur = dScan.getMember("duration").asInt(); + if (!M.trackValid(idx)){ + ERROR_MSG("Cannot check key duration %zu for track %zu: not valid", dur, idx); + return; + } + uint32_t longest_key = 0; + DTSC::Keys Mkeys(M.keys(idx)); + uint32_t firstKey = Mkeys.getFirstValid(); + uint32_t endKey = Mkeys.getEndValid(); + for (uint32_t k = firstKey; k+1 < endKey; k++){ + uint64_t kDur = Mkeys.getDuration(k); + if (kDur > longest_key){longest_key = kDur;} + } + if (dur > longest_key*1.2){ + onFail("Key duration mismatch; disconnecting "+myConn.getHost()+" to recover ("+JSON::Value(longest_key).asString()+" -> "+JSON::Value(dur).asString()+")", true); + return; + }else{ + sendOk("Key duration matches upstream"); + } + continue; + } WARN_MSG("Unhandled DTCM command: '%s'", dScan.getMember("cmd").asString().c_str()); }else if (myConn.Received().copy(4) == "DTSC"){ // Header packet diff --git a/src/output/output_dtsc.h b/src/output/output_dtsc.h index 84920b9c..4b92ec82 100644 --- a/src/output/output_dtsc.h +++ b/src/output/output_dtsc.h @@ -11,7 +11,6 @@ namespace Mist{ void onRequest(); void sendNext(); void sendHeader(); - void initialSeek(); static bool listenMode(){return !(config->getString("target").size());} void onFail(const std::string &msg, bool critical = false); void stats(bool force = false); diff --git a/src/output/output_sanitycheck.cpp b/src/output/output_sanitycheck.cpp index bf64bc3b..d86f11d2 100644 --- a/src/output/output_sanitycheck.cpp +++ b/src/output/output_sanitycheck.cpp @@ -14,6 +14,11 @@ namespace Mist{ //} parseData = true; wantRequest = false; + if (config->getBool("sync")){ + setSyncMode(true); + }else{ + setSyncMode(false); + } initialize(); initialSeek(); sortSet.clear(); @@ -50,19 +55,7 @@ namespace Mist{ seek(seekPoint); } } - } - void OutSanityCheck::initialSeek(){ - if (M.getLive()){ - liveSeek(); - if (getKeyFrame() && thisPacket){ - sendNext(); - INFO_MSG("Initial sent!"); - } - firstTime = Util::getMS() - currentTime(); - }else{ - Output::initialSeek(); - } } void OutSanityCheck::init(Util::Config *cfg){ @@ -74,8 +67,11 @@ namespace Mist{ "\"stream\",\"help\":\"The name of the stream " "that this connector will transmit.\"}")); cfg->addOption( - "seek", JSON::fromString("{\"arg\":\"string\",\"short\":\"S\",\"long\":\"seek\",\"help\":" + "seek", JSON::fromString("{\"arg\":\"string\",\"short\":\"k\",\"long\":\"seek\",\"help\":" "\"Time in ms to check from - by default start of stream\"}")); + cfg->addOption( + "sync", JSON::fromString("{\"short\":\"y\",\"long\":\"sync\",\"help\":" + "\"Retrieve tracks in sync (default async)\"}")); cfg->addBasicConnectorOptions(capa); config = cfg; } @@ -89,36 +85,23 @@ namespace Mist{ } */ +#define printTime(t) std::setfill('0') << std::setw(2) << (t / 3600000) << ":" << std::setw(2) << ((t % 3600000) / 60000) << ":" << std::setw(2) << ((t % 60000) / 1000) << "." << std::setw(3) << (t % 1000) + void OutSanityCheck::sendNext(){ + static std::map trkTime; if (M.getLive()){ - static uint64_t prevTime = 0; - static size_t prevTrack = 0; - uint64_t t = thisPacket.getTime(); - if (t < prevTime){ - std::cout << "Time error: "; - std::cout << std::setfill('0') << std::setw(2) << (t / 3600000) << ":" << std::setw(2) - << ((t % 3600000) / 60000) << ":" << std::setw(2) << ((t % 60000) / 1000) << "." - << std::setw(3) << (t % 1000); - std::cout << " (" << thisIdx << ")"; - std::cout << " < "; - std::cout << std::setfill('0') << std::setw(2) << (prevTime / 3600000) << ":" - << std::setw(2) << ((prevTime % 3600000) / 60000) << ":" << std::setw(2) - << ((prevTime % 60000) / 1000) << "." << std::setw(3) << (prevTime % 1000); - std::cout << " (" << prevTrack << ")"; - std::cout << std::endl << std::endl; + if (thisTime < trkTime[thisIdx]){ + std::cout << "Time error in track " << thisIdx << ": "; + std::cout << printTime(thisTime) << " < " << printTime(trkTime[thisIdx]) << std::endl << std::endl; }else{ - prevTime = t; - prevTrack = thisIdx; + trkTime[thisIdx] = thisTime; } - std::cout << "\033[A" << std::setfill('0') << std::setw(2) << (t / 3600000) << ":" - << std::setw(2) << ((t % 3600000) / 60000) << ":" << std::setw(2) - << ((t % 60000) / 1000) << "." << std::setw(3) << (t % 1000) << " "; - uint32_t mainTrack = M.mainTrack(); - if (mainTrack == INVALID_TRACK_ID){return;} - t = M.getLastms(mainTrack); - std::cout << std::setfill('0') << std::setw(2) << (t / 3600000) << ":" << std::setw(2) - << ((t % 3600000) / 60000) << ":" << std::setw(2) << ((t % 60000) / 1000) << "." - << std::setw(3) << (t % 1000) << " " << std::endl; + std::cout << "\033[A"; + for (std::map::iterator it = trkTime.begin(); it != trkTime.end(); ++it){ + uint64_t t = M.getLastms(it->first); + std::cout << it->first << ":" << printTime(it->second) << "/" << printTime(t) << ", "; + } + std::cout << std::endl; return; } diff --git a/src/output/output_sanitycheck.h b/src/output/output_sanitycheck.h index 90273783..8dc211a4 100644 --- a/src/output/output_sanitycheck.h +++ b/src/output/output_sanitycheck.h @@ -27,7 +27,6 @@ namespace Mist{ OutSanityCheck(Socket::Connection &conn); static void init(Util::Config *cfg); void sendNext(); - void initialSeek(); static bool listenMode(){return false;} protected: