diff --git a/lib/defines.h b/lib/defines.h index 2d19340d..92c8d48b 100644 --- a/lib/defines.h +++ b/lib/defines.h @@ -170,6 +170,7 @@ static inline void show_stackframe(){} #define DEFAULT_FRAGMENT_DURATION 1900 +/// \TODO These values are hardcoded and that is dangerous and probably a very bad idea. I don't even know if they are currently correct...?! I doubt they are. #define META_META_OFFSET 104 #define META_META_RECORDSIZE 576 diff --git a/lib/dtsc.cpp b/lib/dtsc.cpp index 5f2ead11..0a8b64e1 100644 --- a/lib/dtsc.cpp +++ b/lib/dtsc.cpp @@ -1723,12 +1723,12 @@ namespace DTSC{ t.track.addField("channels", RAX_16UINT); t.track.addField("width", RAX_32UINT); t.track.addField("height", RAX_32UINT); + t.track.addField("fpks", RAX_16UINT); + t.track.addField("missedFrags", RAX_32UINT); t.track.addField("parts", RAX_NESTED, TRACK_PART_OFFSET + (TRACK_PART_RECORDSIZE * partCount)); t.track.addField("keys", RAX_NESTED, TRACK_KEY_OFFSET + (TRACK_KEY_RECORDSIZE * keyCount)); t.track.addField("fragments", RAX_NESTED, TRACK_FRAGMENT_OFFSET + (TRACK_FRAGMENT_RECORDSIZE * fragCount)); t.track.addField("pages", RAX_NESTED, TRACK_PAGE_OFFSET + (TRACK_PAGE_RECORDSIZE * pageCount)); - t.track.addField("fpks", RAX_16UINT); - t.track.addField("missedFrags", RAX_32UINT); t.track.setRCount(1); t.track.addRecords(1); diff --git a/src/output/output.cpp b/src/output/output.cpp index da70af24..c3322954 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -737,8 +737,6 @@ namespace Mist{ HIGH_MSG("Seeking to track %zu key %zu => time %" PRIu64, tid, keyNum, pos); if (actualKeyTime > pos){ if (M.getLive()){ - WARN_MSG("Actually seeking to %" PRIu64 ", for %" PRIu64 " is not available any more", - actualKeyTime, pos); pos = actualKeyTime; userSelect[tid].setKeyNum(keyNum); } @@ -784,7 +782,7 @@ namespace Mist{ stats(); } if (curPage[tid].mapped[tmp.offset]){return seek(tid, pos, getNextKey);} - FAIL_MSG("Track %zu no data (key %zu@%" PRIu64 ") - timeout", tid, keyNum + (getNextKey ? 1 : 0), tmp.offset); + FAIL_MSG("Track %zu no data (key %zu@%" PRIu64 ", page %s, time %" PRIu64 " -> %" PRIu64 ", next=%" PRIu64 ") - timeout", tid, keyNum + (getNextKey ? 1 : 0), tmp.offset, curPage[tid].name.c_str(), pos, actualKeyTime, keys.getTime(keyNum+1)); userSelect.erase(tid); firstTime = Util::bootMS() - (buffer.begin()->time * realTime / 1000); return false; @@ -1249,7 +1247,7 @@ namespace Mist{ uint32_t sleepTime = std::min(20ul, needsLookAhead); // wait at most double the look ahead time, plus ten seconds uint64_t timeoutTries = (needsLookAhead / sleepTime) * 2 + (10000 / sleepTime); - uint64_t needsTime = thisPacket.getTime() + needsLookAhead; + uint64_t needsTime = thisTime + needsLookAhead; bool firstTime = true; while (--timeoutTries && keepGoing()){ bool lookReady = true; @@ -1273,9 +1271,10 @@ namespace Mist{ //Make sure we stay responsive to requests and stats while waiting if (wantRequest){requestHandler();} stats(); + meta.reloadReplacedPagesIfNeeded(); } if (!timeoutTries){ - WARN_MSG("Waiting for lookahead timed out - resetting lookahead!"); + WARN_MSG("Waiting for lookahead (%zums in %zu tracks) timed out - resetting lookahead!", needsLookAhead, userSelect.size()); needsLookAhead = 0; } } diff --git a/src/output/output_cmaf.cpp b/src/output/output_cmaf.cpp index 4c48fd01..44d2925a 100644 --- a/src/output/output_cmaf.cpp +++ b/src/output/output_cmaf.cpp @@ -65,6 +65,7 @@ namespace Mist{ OutCMAF::OutCMAF(Socket::Connection &conn) : HTTPOutput(conn){ uaDelay = 0; + realTime = 0; if (config->getString("target").size()){ needsLookAhead = 5000; @@ -78,10 +79,7 @@ namespace Mist{ initialize(); initialSeek(); startPushOut(); - } else { - realTime = 0; } - INFO_MSG("Out of constructor now"); } //Properly end all tracks on shutdown. @@ -802,18 +800,19 @@ namespace Mist{ /// Function that waits at most `maxWait` ms (in steps of 100ms) for the next keyframe to become available. /// Uses thisIdx and thisPacket to determine track and current timestamp respectively. bool OutCMAF::waitForNextKey(uint64_t maxWait){ - size_t currentKey = M.getKeyIndexForTime(getMainSelectedTrack(), thisPacket.getTime()); - DTSC::Keys keys(M.keys(getMainSelectedTrack())); - size_t waitTimes = maxWait / 100; - for (size_t i = 0; i < waitTimes; ++i){ - if (keys.getEndValid() > currentKey + 1 && M.getLastms(thisIdx) > M.getTimeForKeyIndex(getMainSelectedTrack(), currentKey+1)){ + uint64_t mTrk = getMainSelectedTrack(); + size_t currentKey = M.getKeyIndexForTime(mTrk, thisTime); + uint64_t startTime = Util::bootMS(); + DTSC::Keys keys(M.keys(mTrk)); + while (startTime + maxWait > Util::bootMS() && keepGoing()){ + if (keys.getEndValid() > currentKey + 1 && M.getLastms(thisIdx) >= M.getTimeForKeyIndex(mTrk, currentKey+1)){ return true; } - Util::wait(100); - //Make sure we don't accidentally timeout while waiting - runs approximately every second. - if (i % 10 == 0){stats();} + Util::sleep(20); + meta.reloadReplacedPagesIfNeeded(); } - return (keys.getEndValid() > currentKey + 1 && M.getLastms(thisIdx) > M.getTimeForKeyIndex(getMainSelectedTrack(), currentKey+1)); + INFO_MSG("Timed out waiting for next key (track %" PRIu64 ", %zu+1, last is %zu, time is %" PRIu64 ")", mTrk, currentKey, keys.getEndValid()-1, M.getTimeForKeyIndex(getMainSelectedTrack(), currentKey+1)); + return (keys.getEndValid() > currentKey + 1 && M.getLastms(thisIdx) >= M.getTimeForKeyIndex(mTrk, currentKey+1)); } //Set up an empty connection to the target to make sure we can push data towards it. @@ -827,14 +826,15 @@ namespace Mist{ //CMAF Push output uses keyframe boundaries instead of fragment boundaries, to allow for lower latency void OutCMAF::pushNext() { + size_t mTrk = getMainSelectedTrack(); //Set up a new connection if this is a new track, or if we have been disconnected. if (!pushTracks.count(thisIdx) || !pushTracks.at(thisIdx).D.getSocket()){ if (pushTracks.count(thisIdx)){INFO_MSG("Reconnecting existing track: socket was disconnected");} CMAFPushTrack & track = pushTracks[thisIdx]; - size_t keyIndex = M.getKeyIndexForTime(getMainSelectedTrack(), thisPacket.getTime()); - track.headerFrom = M.getTimeForKeyIndex(getMainSelectedTrack(), keyIndex); + size_t keyIndex = M.getKeyIndexForTime(mTrk, thisPacket.getTime()); + track.headerFrom = M.getTimeForKeyIndex(mTrk, keyIndex); if (track.headerFrom < thisPacket.getTime()){ - track.headerFrom = M.getTimeForKeyIndex(getMainSelectedTrack(), keyIndex + 1); + track.headerFrom = M.getTimeForKeyIndex(mTrk, keyIndex + 1); } INFO_MSG("Starting track %zu at %" PRIu64 "ms into the stream, current packet at %" PRIu64 "ms", thisIdx, track.headerFrom, thisPacket.getTime()); @@ -846,14 +846,18 @@ namespace Mist{ CMAFPushTrack & track = pushTracks[thisIdx]; if (thisPacket.getTime() < track.headerFrom){return;} if (thisPacket.getTime() >= track.headerUntil){ - size_t keyIndex = M.getKeyIndexForTime(getMainSelectedTrack(), thisPacket.getTime()); - uint64_t keyTime = M.getTimeForKeyIndex(getMainSelectedTrack(), keyIndex); - if (keyTime > thisPacket.getTime()){ - WARN_MSG("Corruption probably occurred, initiating reconnect %" PRIu64 " != %" PRIu64, keyTime, thisPacket.getTime()); - onTrackEnd(thisIdx); - track.headerFrom = M.getTimeForKeyIndex(getMainSelectedTrack(), keyIndex + 1); - track.headerUntil = 0; - pushNext(); + size_t keyIndex = M.getKeyIndexForTime(mTrk, thisTime); + uint64_t keyTime = M.getTimeForKeyIndex(mTrk, keyIndex); + if (keyTime > thisTime){ + realTime = 1000; + if (!liveSeek()){ + WARN_MSG("Corruption probably occurred, initiating reconnect. Key %zu is time %" PRIu64 ", but packet is time %" PRIu64, keyIndex, keyTime, thisTime); + onTrackEnd(thisIdx); + track.headerFrom = M.getTimeForKeyIndex(mTrk, keyIndex + 1); + track.headerUntil = 0; + pushNext(); + } + realTime = 0; return; } track.headerFrom = keyTime; @@ -862,7 +866,7 @@ namespace Mist{ dropTrack(thisIdx, "No next keyframe available"); return; } - track.headerUntil = M.getTimeForKeyIndex(getMainSelectedTrack(), keyIndex + 1); + track.headerUntil = M.getTimeForKeyIndex(mTrk, keyIndex + 1); std::string keyHeader = CMAF::keyHeader(M, thisIdx, track.headerFrom, track.headerUntil, keyIndex+1, true, true); uint64_t mdatSize = 8 + CMAF::payloadSize(M, thisIdx, track.headerFrom, track.headerUntil); char mdatHeader[] ={0x00, 0x00, 0x00, 0x00, 'm', 'd', 'a', 't'}; diff --git a/src/output/output_cmaf.h b/src/output/output_cmaf.h index f24bfab3..70fb0c1a 100644 --- a/src/output/output_cmaf.h +++ b/src/output/output_cmaf.h @@ -69,7 +69,7 @@ namespace Mist{ HTTP::URL pushUrl; std::map pushTracks; void setupTrackObject(size_t idx); - bool waitForNextKey(uint64_t maxWait = 5000); + bool waitForNextKey(uint64_t maxWait = 15000); // End CMAF push out }; }// namespace Mist