From 22cd84fcd5cc60a2fa6f8a65ac911bacf7560ffd Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 11 Oct 2016 15:02:29 +0200 Subject: [PATCH 1/2] Added options to DTSC pull for controlling the buffer config. Made 8X target duration the default buffer size. Inputs now wait for INPUT_TIMEOUT *and* two biggestFragment durations, added a few helper functions to DTSC::Meta. Buffer now automatically sets segment size to min(configed_size, biggest_fragment/2) to prevent sudden reductions of fragment sizes by more than 50% --- lib/dtsc.h | 2 ++ lib/dtscmeta.cpp | 22 ++++++++++++++++++++++ src/input/input.cpp | 2 +- src/input/input_buffer.cpp | 8 ++++---- src/input/input_dtsc.cpp | 14 ++++++++++++++ 5 files changed, 43 insertions(+), 5 deletions(-) diff --git a/lib/dtsc.h b/lib/dtsc.h index 02b32f30..e09d44ee 100644 --- a/lib/dtsc.h +++ b/lib/dtsc.h @@ -326,6 +326,8 @@ namespace DTSC { void toPrettyString(std::ostream & str, int indent = 0, int verbosity = 0); //members: std::map tracks; + Track & mainTrack(); + uint32_t biggestFragment(); bool vod; bool live; bool merged; diff --git a/lib/dtscmeta.cpp b/lib/dtscmeta.cpp index 5f28ad1b..d2c1b371 100644 --- a/lib/dtscmeta.cpp +++ b/lib/dtscmeta.cpp @@ -1347,6 +1347,28 @@ namespace DTSC { } } + /// Returns a reference to the first video track, or the first track. + /// Beware: returns a reference to invalid memory if there are no tracks! + /// Will print a WARN-level message if this is the case. + Track & Meta::mainTrack(){ + if (!tracks.size()){ + WARN_MSG("Returning nonsense reference - crashing is likely"); + return tracks.begin()->second; + } + for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++) { + if (it->second.type == "video"){ + return it->second; + } + } + return tracks.begin()->second; + } + + /// Returns 0 if there are no tracks, otherwise calls mainTrack().biggestFragment(). + uint32_t Meta::biggestFragment(){ + if (!tracks.size()){return 0;} + return mainTrack().biggestFragment(); + } + ///\brief Converts a track to a human readable string ///\param str The stringstream to append to ///\param indent the amount of indentation needed diff --git a/src/input/input.cpp b/src/input/input.cpp index cb1ee0b9..601e2a40 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -170,7 +170,7 @@ namespace Mist { DEBUG_MSG(DLVL_DEVEL,"Input for stream %s started", streamName.c_str()); long long int activityCounter = Util::bootSecs(); - while ((Util::bootSecs() - activityCounter) < INPUT_TIMEOUT && config->is_active) { //15 second timeout + while (((Util::bootSecs() - activityCounter) < INPUT_TIMEOUT || (myMeta.live && (Util::bootSecs() - activityCounter) < myMeta.biggestFragment()/500)) && config->is_active) { //15 second timeout userPage.parseEach(callbackWrapper); removeUnused(); if (userPage.connectedUsers) { diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index 76095fb2..16795a58 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -27,7 +27,7 @@ namespace Mist { option["value"].append(50000LL); config->addOption("bufferTime", option); capa["optional"]["DVR"]["name"] = "Buffer time (ms)"; - capa["optional"]["DVR"]["help"] = "The target available buffer time for this live stream, in milliseconds. This is the time available to seek around in, and will automatically be extended to fit whole keyframes."; + capa["optional"]["DVR"]["help"] = "The target available buffer time for this live stream, in milliseconds. This is the time available to seek around in, and will automatically be extended to fit whole keyframes as well as the minimum duration needed for stable playback."; capa["optional"]["DVR"]["option"] = "--buffer"; capa["optional"]["DVR"]["type"] = "uint"; capa["optional"]["DVR"]["default"] = 50000LL; @@ -219,14 +219,14 @@ namespace Mist { return false; } if (config->is_active && Trk.fragments.size() > 2){ - ///Make sure we have at least 3X the target duration. + ///Make sure we have at least 8X the target duration. //The target duration is the biggest fragment, rounded up to whole seconds. uint32_t targetDuration = (Trk.biggestFragment() / 1000 + 1) * 1000; //The start is the third fragment's begin uint32_t fragStart = Trk.getKey((++(++Trk.fragments.begin()))->getNumber()).getTime(); //The end is the last fragment's begin uint32_t fragEnd = Trk.getKey(Trk.fragments.rbegin()->getNumber()).getTime(); - if ((fragEnd - fragStart) < targetDuration * 3){ + if ((fragEnd - fragStart) < targetDuration * 8){ return false; } } @@ -754,7 +754,7 @@ namespace Mist { } //if the new value is different, print a message and apply it if (resumeMode != (bool)tmpNum) { - DEBUG_MSG(DLVL_DEVEL, "Setting resume mode from %s to new value of %s", resumeMode ? "enabled" : "disabled", tmpNum ? "enabled" : "disabled"); + INFO_MSG("Setting resume mode from %s to new value of %s", resumeMode ? "enabled" : "disabled", tmpNum ? "enabled" : "disabled"); resumeMode = tmpNum; } diff --git a/src/input/input_dtsc.cpp b/src/input/input_dtsc.cpp index 180f0e36..236304d1 100644 --- a/src/input/input_dtsc.cpp +++ b/src/input/input_dtsc.cpp @@ -26,6 +26,20 @@ namespace Mist { capa["codecs"][0u][1u].append("AAC"); capa["codecs"][0u][1u].append("MP3"); capa["codecs"][0u][1u].append("vorbis"); + + + JSON::Value option; + option["arg"] = "integer"; + option["long"] = "buffer"; + option["short"] = "b"; + option["help"] = "Live stream DVR buffer time in ms"; + option["value"].append(50000LL); + config->addOption("bufferTime", option); + capa["optional"]["DVR"]["name"] = "Buffer time (ms)"; + capa["optional"]["DVR"]["help"] = "The target available buffer time for this live stream, in milliseconds. This is the time available to seek around in, and will automatically be extended to fit whole keyframes as well as the minimum duration needed for stable playback."; + capa["optional"]["DVR"]["option"] = "--buffer"; + capa["optional"]["DVR"]["type"] = "uint"; + capa["optional"]["DVR"]["default"] = 50000LL; } bool inputDTSC::needsLock(){ From f22d95b9744cd74a0897b86076348bca4d5a2560 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Sun, 25 Sep 2016 13:58:41 +0200 Subject: [PATCH 2/2] Prevent memory leak when receiving media loops at a page boundary --- src/io.cpp | 34 ++++++++++++++++++++++++++++------ 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/src/io.cpp b/src/io.cpp index d17934b5..a38f0d6c 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -146,6 +146,7 @@ namespace Mist { } if (!inserted){ FAIL_MSG("Could not insert page in track index. Aborting."); + curPage[tid].master = true;//set this page for instant-deletion when we're done with it return false; } } @@ -260,6 +261,7 @@ namespace Mist { static bool multiWrong = false; //Save the trackid of the track for easier access unsigned long tid = pack.getTrackId(); + //these checks were already done in bufferSinglePacket, but we check again just to be sure if (myMeta.live && pack.getTime() > 0xFFFF0000 && !myMeta.tracks[tid].lastms){ return;//ignore bullshit timestamps } @@ -370,14 +372,17 @@ namespace Mist { //Print a message about registering the page or not. if (!inserted) { - FAIL_MSG("Can't register page %lu on the metaPage of track %lu~>%lu, No empty spots left within 'should be' amount of slots", curPageNum[tid], tid, mapTid); + FAIL_MSG("Can't register %lu on the metaPage of %s track %lu~>%lu, No empty spots left. Deleting.", curPageNum[tid], streamName.c_str(), tid, mapTid); + //Since the buffer can't see it - we should delete it ourselves, now. + curPage[tid].master = true; } else { - HIGH_MSG("Succesfully registered page %lu on the metaPage of track %lu~>%lu.", curPageNum[tid], tid, mapTid); + HIGH_MSG("Registered %lu on the metaPage of %s track %lu~>%lu.", curPageNum[tid], streamName.c_str(), tid, mapTid); +#if defined(__CYGWIN__) || defined(_WIN32) + IPC::preservePage(curPage[tid].name); +#endif } //Close our link to the page. This will NOT destroy the shared page, as we've set master to false upon construction -#if defined(__CYGWIN__) || defined(_WIN32) - IPC::preservePage(curPage[tid].name); -#endif + //Note: if there was a registering failure above, this WILL destroy the shared page, to prevent a memory leak curPage.erase(tid); curPageNum.erase(tid); } @@ -457,6 +462,19 @@ namespace Mist { } } } + + //For live streams, ignore packets that make no sense + //This also happens in bufferNext, with the same rules + if (myMeta.live){ + if (packet.getTime() > 0xFFFF0000 && !myMeta.tracks[tid].lastms){ + return;//ignore bullshit timestamps + } + if (packet.getTime() < myMeta.tracks[tid].lastms){ + HIGH_MSG("Wrong order on track %lu ignored: %lu < %lu", tid, packet.getTime(), myMeta.tracks[tid].lastms); + return; + } + } + //Determine if we need to open the next page int nextPageNum = -1; if (isKeyframe && trackState[tid] == FILL_ACC) { @@ -501,7 +519,11 @@ namespace Mist { bufferFinalize(tid, myMeta); } //Open the new page - bufferStart(tid, nextPageNum, myMeta); + if (!bufferStart(tid, nextPageNum, myMeta)){ + //if this fails, return instantly without actually buffering the packet + WARN_MSG("Dropping packet %s:%llu@%llu", streamName.c_str(), tid, packet.getTime()); + return; + } } //Buffer the packet bufferNext(packet, myMeta);