From 7c89122a527cec8e23fe596dd6907b059c1a1470 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Fri, 10 Jun 2016 14:17:42 +0200 Subject: [PATCH 1/6] Improved output default seek position and completeKeyReady logic --- src/output/output.cpp | 75 ++++++++++++++++++++++++++++++++----------- src/output/output.h | 1 + 2 files changed, 57 insertions(+), 19 deletions(-) diff --git a/src/output/output.cpp b/src/output/output.cpp index 6c3c7ac2..9f5f7ed9 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -128,7 +128,8 @@ namespace Mist { if (!selectedTracks.size()){ selectDefaultTracks(); } - if (myMeta.tracks[getMainSelectedTrack()].keys.size() >= 2){ + unsigned int mainTrack = getMainSelectedTrack(); + if (mainTrack && myMeta.tracks.count(mainTrack) && myMeta.tracks[mainTrack].keys.size() >= 2){ return true; }else{ HIGH_MSG("NOT READY YET (%lu tracks, %lu = %lu keys)", myMeta.tracks.size(), getMainSelectedTrack(), myMeta.tracks[getMainSelectedTrack()].keys.size()); @@ -473,6 +474,7 @@ namespace Mist { bool Output::seek(unsigned int tid, unsigned long long pos, bool getNextKey){ if (myMeta.tracks[tid].lastms < pos){ INFO_MSG("Aborting seek to %llums in track %u: past end of track (= %llums).", pos, tid, myMeta.tracks[tid].lastms); + selectedTracks.erase(tid); return false; } unsigned int keyNum = getKeyForTime(tid, pos); @@ -485,6 +487,7 @@ namespace Mist { loadPageForKey(tid, keyNum + (getNextKey?1:0)); if (!nProxy.curPage.count(tid) || !nProxy.curPage[tid].mapped){ INFO_MSG("Aborting seek to %llums in track %u: not available.", pos, tid); + selectedTracks.erase(tid); return false; } sortedPageInfo tmp; @@ -519,9 +522,48 @@ namespace Mist { return seek(tid, pos, getNextKey); } } + selectedTracks.erase(tid); return false; } } + + /// This function decides where in the stream initial playback starts. + /// The default implementation calls seek(0) for VoD. + /// For live, it seeks to the last sync'ed keyframe of the main track. + /// Unless lastms < 5000, then it seeks to the first keyframe of the main track. + /// Aborts if there is no main track or it has no keyframes. + void Output::initialSeek(){ + unsigned long long seekPos = 0; + if (myMeta.live){ + long unsigned int mainTrack = getMainSelectedTrack(); + //cancel if there are no keys in the main track + if (!myMeta.tracks.count(mainTrack) || !myMeta.tracks[mainTrack].keys.size()){return;} + //seek to the newest keyframe, unless that is <5s, then seek to the oldest keyframe + for (std::deque::reverse_iterator it = myMeta.tracks[mainTrack].keys.rbegin(); it != myMeta.tracks[mainTrack].keys.rend(); ++it){ + seekPos = it->getTime(); + if (seekPos < 5000){continue;}//if we're near the start, skip back + bool good = true; + //check if all tracks have data for this point in time + for (std::set::iterator ti = selectedTracks.begin(); ti != selectedTracks.end(); ++ti){ + if (mainTrack == *ti){continue;}//skip self + if (!myMeta.tracks.count(*ti)){ + HIGH_MSG("Skipping track %lu, not in tracks", *ti); + continue; + }//ignore missing tracks + if (myMeta.tracks[*ti].lastms == myMeta.tracks[*ti].firstms){ + HIGH_MSG("Skipping track %lu, last equals first", *ti); + continue; + }//ignore point-tracks + if (myMeta.tracks[*ti].lastms < seekPos){good = false; break;} + HIGH_MSG("Track %lu is good", *ti); + } + //if yes, seek here + if (good){break;} + } + } + MEDIUM_MSG("Initial seek to %llums", seekPos); + seek(seekPos); + } void Output::requestHandler(){ static bool firstData = true;//only the first time, we call onRequest if there's data buffered already. @@ -551,19 +593,7 @@ namespace Mist { sendHeader(); } if (!sought){ - if (myMeta.live){ - long unsigned int mainTrack = getMainSelectedTrack(); - //cancel if there are no keys in the main track - if (!myMeta.tracks.count(mainTrack) || !myMeta.tracks[mainTrack].keys.size()){break;} - //seek to the newest keyframe, unless that is <5s, then seek to the oldest keyframe - unsigned long long seekPos = myMeta.tracks[mainTrack].keys.rbegin()->getTime(); - if (seekPos < 5000){ - seekPos = myMeta.tracks[mainTrack].keys.begin()->getTime(); - } - seek(seekPos); - }else{ - seek(0); - } + initialSeek(); } if (prepareNext()){ if (thisPacket){ @@ -587,12 +617,19 @@ namespace Mist { if (thisTimeoutTries > timeoutTries) timeoutTries = thisTimeoutTries; } } + unsigned int mTrack = getMainSelectedTrack(); while(!completeKeyReady && timeoutTries>0){ - completeKeyReady = true; - for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){ - if (!myMeta.tracks[*it].keys.size() || myMeta.tracks[*it].keys.rbegin()->getTime() + myMeta.tracks[*it].keys.rbegin()->getLength() <= thisPacket.getTime() ){ - completeKeyReady = false; - break; + if (!myMeta.tracks[mTrack].keys.size() || myMeta.tracks[mTrack].keys.rbegin()->getTime() <= thisPacket.getTime()){ + completeKeyReady = false; + }else{ + DTSC::Key & mustHaveKey = myMeta.tracks[mTrack].getKey(getKeyForTime(mTrack, thisPacket.getTime())); + unsigned long long mustHaveTime = mustHaveKey.getTime() + mustHaveKey.getLength(); + completeKeyReady = true; + for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){ + if (myMeta.tracks[*it].lastms < mustHaveTime){ + completeKeyReady = false; + break; + } } } if (!completeKeyReady){ diff --git a/src/output/output.h b/src/output/output.h index 17d95c60..bb7f734e 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -58,6 +58,7 @@ namespace Mist { bool prepareNext(); virtual void dropTrack(uint32_t trackId, std::string reason, bool probablyBad = true); virtual void onRequest(); + virtual void initialSeek(); virtual bool onFinish() { return false; } From a131557a43373f0bb4a9b95a823dd4e5f37c6df0 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 7 Jun 2016 23:24:52 +0200 Subject: [PATCH 2/6] Added proper buffering of not-yet-accepted tracks. --- src/io.cpp | 21 ++++++++++++++++----- src/io.h | 2 ++ 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/src/io.cpp b/src/io.cpp index 57fe1c71..cf6fe2b4 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -408,10 +408,25 @@ namespace Mist { //If the track is declined, stop here if (trackState[tid] == FILL_DEC) { INFO_MSG("Track %lu Declined", tid); + preBuffer[tid].clear(); return; } + //Not accepted yet? Buffer. + if (trackState[tid] != FILL_ACC) { + preBuffer[tid].push_back(packet); + }else{ + while (preBuffer[tid].size()){ + bufferSinglePacket(preBuffer[tid].front(), myMeta); + preBuffer[tid].pop_front(); + } + bufferSinglePacket(packet, myMeta); + } + } + + void negotiationProxy::bufferSinglePacket(DTSC::Packet & packet, DTSC::Meta & myMeta){ + //Store the trackid for easier access + unsigned long tid = packet.getTrackId(); //This update needs to happen whether the track is accepted or not. - ///\todo Figure out how to act with declined track here bool isKeyframe = false; if (myMeta.tracks[tid].type == "video") { if (packet.hasMember("keyframe") && packet.getFlag("keyframe")) { @@ -462,10 +477,6 @@ namespace Mist { if (!pagesByTrack.count(tid) || pagesByTrack[tid].size() == 0){ return; } - //At this point we can stop parsing when the track is not accepted - if (trackState[tid] != FILL_ACC) { - return; - } //Check if the correct page is opened if (!curPageNum.count(tid) || nextPageNum != curPageNum[tid]) { diff --git a/src/io.h b/src/io.h index 707484fb..813473d1 100644 --- a/src/io.h +++ b/src/io.h @@ -32,6 +32,7 @@ namespace Mist { void bufferNext(DTSC::Packet & pack, DTSC::Meta & myMeta); void bufferFinalize(unsigned long tid, DTSC::Meta &myMeta); void bufferLivePacket(DTSC::Packet & packet, DTSC::Meta & myMeta); + void bufferSinglePacket(DTSC::Packet & packet, DTSC::Meta & myMeta); bool isBuffered(unsigned long tid, unsigned long keyNum); unsigned long bufferedOnPage(unsigned long tid, unsigned long keyNum); @@ -47,6 +48,7 @@ namespace Mist { std::map metaPages;///< For each track, holds the page that describes which dataPages are mapped std::map curPageNum;///< For each track, holds the number page that is currently being written. std::map curPage;///< For each track, holds the page that is currently being written. + std::map > preBuffer;///< For each track, holds to-be-buffered packets. IPC::sharedClient userClient;///< Shared memory used for connection to Mixer process. From 055866e2fb33f5e23849931fffc36530989cf407 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Fri, 10 Jun 2016 14:20:17 +0200 Subject: [PATCH 3/6] Tweaked seek/isReadyForPlay implementations, fix HLS support for large key durations. --- src/input/input_buffer.cpp | 5 ++++- src/output/output.cpp | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index a7e9592b..d8313ce4 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -208,9 +208,12 @@ namespace Mist { } bool inputBuffer::removeKey(unsigned int tid) { - if ((myMeta.tracks[tid].keys.size() < 2 || myMeta.tracks[tid].fragments.size() < 2) && config->is_active) { + //Make sure we have at least 3 whole fragments at all times, + //unless we're shutting down the whole buffer right now + if (myMeta.tracks[tid].fragments.size() < 5 && config->is_active) { return false; } + //If we're shutting down, and this track is empty, abort if (!myMeta.tracks[tid].keys.size()) { return false; } diff --git a/src/output/output.cpp b/src/output/output.cpp index 9f5f7ed9..fe1dc432 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -129,7 +129,7 @@ namespace Mist { selectDefaultTracks(); } unsigned int mainTrack = getMainSelectedTrack(); - if (mainTrack && myMeta.tracks.count(mainTrack) && myMeta.tracks[mainTrack].keys.size() >= 2){ + if (mainTrack && myMeta.tracks.count(mainTrack) && (myMeta.tracks[mainTrack].keys.size() >= 2 || myMeta.tracks[mainTrack].lastms - myMeta.tracks[mainTrack].firstms > 5000)){ return true; }else{ HIGH_MSG("NOT READY YET (%lu tracks, %lu = %lu keys)", myMeta.tracks.size(), getMainSelectedTrack(), myMeta.tracks[getMainSelectedTrack()].keys.size()); From 722738ffb008f3425032712a01474f8039b67ed6 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Fri, 10 Jun 2016 14:22:06 +0200 Subject: [PATCH 4/6] Fixed VoD slow initial load problem. --- src/output/output.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/output/output.cpp b/src/output/output.cpp index fe1dc432..3f4b80ad 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -62,9 +62,9 @@ namespace Mist { if (nProxy.metaPages[0].mapped){ IPC::semaphore * liveSem = 0; if (!myMeta.vod){ - static char liveSemName[NAME_BUFFER_SIZE]; - snprintf(liveSemName, NAME_BUFFER_SIZE, SEM_LIVE, streamName.c_str()); - liveSem = new IPC::semaphore(liveSemName, O_RDWR, ACCESSPERMS, 1); + static char liveSemName[NAME_BUFFER_SIZE]; + snprintf(liveSemName, NAME_BUFFER_SIZE, SEM_LIVE, streamName.c_str()); + liveSem = new IPC::semaphore(liveSemName, O_RDWR, ACCESSPERMS, 1, !myMeta.live); if (*liveSem){ liveSem->wait(); }else{ From 903786df605ea7ed025899ce871ae4e192d296a9 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 9 Jun 2016 12:46:33 +0200 Subject: [PATCH 5/6] Add buffer/select mismatch check in Output class, tweak seek abort debug message levels to be WARN instead of INFO. --- src/output/output.cpp | 39 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 36 insertions(+), 3 deletions(-) diff --git a/src/output/output.cpp b/src/output/output.cpp index 3f4b80ad..096635a5 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -473,20 +473,20 @@ namespace Mist { bool Output::seek(unsigned int tid, unsigned long long pos, bool getNextKey){ if (myMeta.tracks[tid].lastms < pos){ - INFO_MSG("Aborting seek to %llums in track %u: past end of track (= %llums).", pos, tid, myMeta.tracks[tid].lastms); + WARN_MSG("Aborting seek to %llums in track %u: past end of track (= %llums).", pos, tid, myMeta.tracks[tid].lastms); selectedTracks.erase(tid); return false; } unsigned int keyNum = getKeyForTime(tid, pos); if (myMeta.tracks[tid].getKey(keyNum).getTime() > pos){ if (myMeta.live){ - INFO_MSG("Actually seeking to %d, for %d is not available any more", myMeta.tracks[tid].getKey(keyNum).getTime(), pos); + WARN_MSG("Actually seeking to %d, for %d is not available any more", myMeta.tracks[tid].getKey(keyNum).getTime(), pos); pos = myMeta.tracks[tid].getKey(keyNum).getTime(); } } loadPageForKey(tid, keyNum + (getNextKey?1:0)); if (!nProxy.curPage.count(tid) || !nProxy.curPage[tid].mapped){ - INFO_MSG("Aborting seek to %llums in track %u: not available.", pos, tid); + WARN_MSG("Aborting seek to %llums in track %u: not available.", pos, tid); selectedTracks.erase(tid); return false; } @@ -709,6 +709,39 @@ namespace Mist { INFO_MSG("Buffer completely played out"); return true; } + //check if we have a next seek point for every track that is selected + if (buffer.size() != selectedTracks.size()){ + std::set dropTracks; + if (buffer.size() < selectedTracks.size()){ + //prepare to drop any selectedTrack without buffe entry + for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); ++it){ + bool found = false; + for (std::set::iterator bi = buffer.begin(); bi != buffer.end(); ++bi){ + if (bi->tid == *it){ + found = true; + break; + } + } + if (!found){ + dropTracks.insert(*it); + } + } + }else{ + //prepare to drop any buffer entry without selectedTrack + for (std::set::iterator bi = buffer.begin(); bi != buffer.end(); ++bi){ + if (!selectedTracks.count(bi->tid)){ + dropTracks.insert(bi->tid); + } + } + } + //actually drop what we found. + //if both of the above cases occur, the next prepareNext iteration will take care of that + for (std::set::iterator it = dropTracks.begin(); it != dropTracks.end(); ++it){ + dropTrack(*it, "seek/select mismatch", true); + } + return false; + } + sortedPageInfo nxt = *(buffer.begin()); if (!myMeta.tracks.count(nxt.tid)){ From ea18815676b776766763491d2024ac45d13e73bd Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 9 Jun 2016 15:17:40 +0200 Subject: [PATCH 6/6] Fixed slow embed code (internal HTTP output) problem --- src/output/output.cpp | 1 + src/output/output_http_internal.cpp | 5 +++++ src/output/output_http_internal.h | 1 + 3 files changed, 7 insertions(+) diff --git a/src/output/output.cpp b/src/output/output.cpp index 096635a5..c772a88d 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -139,6 +139,7 @@ namespace Mist { } return false; } + /// Connects or reconnects to the stream. /// Assumes streamName class member has been set already. /// Will start input if not currently active, calls onFail() if this does not succeed. diff --git a/src/output/output_http_internal.cpp b/src/output/output_http_internal.cpp index 0fbcefe4..9ebb950d 100644 --- a/src/output/output_http_internal.cpp +++ b/src/output/output_http_internal.cpp @@ -38,6 +38,11 @@ namespace Mist { } Output::onFail(); } + + /// We assume it's ready to play if there is at least one track available + bool OutHTTP::isReadyForPlay() { + return myMeta.tracks.size(); + } void OutHTTP::init(Util::Config * cfg){ HTTPOutput::init(cfg); diff --git a/src/output/output_http_internal.h b/src/output/output_http_internal.h index 4f025afa..479d37a4 100644 --- a/src/output/output_http_internal.h +++ b/src/output/output_http_internal.h @@ -10,6 +10,7 @@ namespace Mist { static bool listenMode(); virtual void onFail(); void onHTTP(); + bool isReadyForPlay(); }; }