From 440596c117bb781f63909910f0bae4fc40e3e678 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Fri, 18 Nov 2016 15:33:25 +0100 Subject: [PATCH 1/4] Added crash handler for Pull-mode inputs --- src/input/input.cpp | 23 ++++++++++++++++++++--- src/input/input.h | 2 +- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/src/input/input.cpp b/src/input/input.cpp index 8a2ff609..786354a1 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -126,7 +126,18 @@ namespace Mist { return 0; } - void Input::convert(){ + /// Default crash handler, cleans up Pull semaphore on crashes + void Input::onCrash(){ + if (streamName.size() && !needsLock()) { + //we have a Pull semaphore to clean up, do it + IPC::semaphore pullLock; + pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); + pullLock.close(); + pullLock.unlink(); + } + } + + void Input::convert() { //check filename for no - if (config->getString("output") != "-"){ std::string filename = config->getString("output"); @@ -205,12 +216,18 @@ namespace Mist { } /// Main loop for stream-style inputs. - /// This loop will start the buffer without resume support, and then repeatedly call ..... followed by .... + /// This loop will do the following, in order: + /// - exit if another stream() input is already open for this streamname + /// - start a buffer in push mode + /// - connect to it + /// - run parseStreamHeader + /// - if there are tracks, register as a non-viewer on the user page of the buffer + /// - call getNext() in a loop, buffering packets void Input::stream(){ IPC::semaphore pullLock; pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); if (!pullLock.tryWait()){ - DEBUG_MSG(DLVL_DEVEL, "A pull process for stream %s is already running", streamName.c_str()); + WARN_MSG("A pull process for stream %s is already running", streamName.c_str()); pullLock.close(); return; } diff --git a/src/input/input.h b/src/input/input.h index 88d8ddd2..09403839 100644 --- a/src/input/input.h +++ b/src/input/input.h @@ -20,7 +20,7 @@ namespace Mist { public: Input(Util::Config * cfg); virtual int run(); - virtual void onCrash(){} + virtual void onCrash(); virtual void argumentsParsed(){} virtual ~Input() {}; From ee4e0461b4f48ffa321f90b0b67adcba7a479eb9 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 17 Nov 2016 13:45:06 +0100 Subject: [PATCH 2/4] Saver thisPacket handling in generic Output class, better documentation of internals --- src/output/output.cpp | 17 ++++++++++++----- src/output/output.h | 2 ++ 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/src/output/output.cpp b/src/output/output.cpp index a42128c2..b52dc721 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -418,7 +418,10 @@ namespace Mist{ } return highest; } - + + /// Loads the page for the given trackId and keyNum into memory. + /// Overwrites any existing page for the same trackId. + /// Automatically calls thisPacket.null() if necessary. void Output::loadPageForKey(long unsigned int trackId, long long int keyNum){ if (!myMeta.tracks.count(trackId) || !myMeta.tracks[trackId].keys.size()){ WARN_MSG("Load for track %lu key %lld aborted - track is empty", trackId, keyNum); @@ -469,6 +472,10 @@ namespace Mist{ if (currKeyOpen.count(trackId) && currKeyOpen[trackId] == (unsigned int)pageNum){ return; } + //If we're loading the track thisPacket is on, null it to prevent accesses. + if (thisPacket && thisPacket.getTrackId() == trackId){ + thisPacket.null(); + } char id[NAME_BUFFER_SIZE]; snprintf(id, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), trackId, pageNum); nProxy.curPage[trackId].init(id, DEFAULT_DATA_PAGE_SIZE); @@ -786,7 +793,7 @@ namespace Mist{ } ///Attempts to prepare a new packet for output. - ///If thisPacket evaluates to false, playback has completed. + ///If it returns true and thisPacket evaluates to false, playback has completed. ///Could be called repeatedly in a loop if you really really want a new packet. /// \returns true if thisPacket was filled with the next packet. /// \returns false if we could not reliably determine the next packet yet. @@ -844,7 +851,9 @@ namespace Mist{ //if we're going to read past the end of the data page, load the next page //this only happens for VoD if (nxt.offset >= nProxy.curPage[nxt.tid].len){ - nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, thisPacket.getTime()); + if (thisPacket){ + nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, thisPacket.getTime()); + } loadPageForKey(nxt.tid, ++nxtKeyNum[nxt.tid]); nxt.offset = 0; if (nProxy.curPage.count(nxt.tid) && nProxy.curPage[nxt.tid].mapped){ @@ -857,7 +866,6 @@ namespace Mist{ buffer.insert(nxt); } }else{ - thisPacket.null(); dropTrack(nxt.tid, "page load failure", true); } return false; @@ -898,7 +906,6 @@ namespace Mist{ //The next key showed up on another page! //We've simply reached the end of the page. Load the next key = next page. loadPageForKey(nxt.tid, ++nxtKeyNum[nxt.tid]); - thisPacket.null(); nxt.offset = 0; if (nProxy.curPage.count(nxt.tid) && nProxy.curPage[nxt.tid].mapped){ unsigned long long nextTime = getDTSCTime(nProxy.curPage[nxt.tid].mapped, nxt.offset); diff --git a/src/output/output.h b/src/output/output.h index 3d0da809..6e8fd266 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -58,6 +58,8 @@ namespace Mist { uint32_t currTrackCount() const; virtual bool isReadyForPlay(); //virtuals. The optional virtuals have default implementations that do as little as possible. + /// This function is called whenever a packet is ready for sending. + /// Inside it, thisPacket is guaranteed to contain a valid packet. virtual void sendNext() {}//REQUIRED! Others are optional. bool prepareNext(); virtual void dropTrack(uint32_t trackId, std::string reason, bool probablyBad = true); From bdb1578ba65cc472aefc5162c0c8bb6e98c1f7e4 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 17 Nov 2016 13:53:17 +0100 Subject: [PATCH 3/4] Robustified RTMP thisPacket handling --- src/output/output_rtmp.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp index 3cb81b3e..319bf191 100644 --- a/src/output/output_rtmp.cpp +++ b/src/output/output_rtmp.cpp @@ -131,7 +131,7 @@ namespace Mist { // erase & pos = nextpos + 1; } - if (trackSwitch){ + if (trackSwitch && thisPacket){ seek(thisPacket.getTime()); } } From 42eca60cfc7fe49749d0ef816b227b8fbfea3a5c Mon Sep 17 00:00:00 2001 From: Thulinma Date: Fri, 18 Nov 2016 00:04:17 +0100 Subject: [PATCH 4/4] Added forget and remember functions to process library --- lib/procs.cpp | 14 +++++++++++++- lib/procs.h | 2 ++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/lib/procs.cpp b/lib/procs.cpp index ba984b10..69e38299 100644 --- a/lib/procs.cpp +++ b/lib/procs.cpp @@ -444,6 +444,18 @@ int Util::Procs::Count() { /// Returns true if a process with this PID is currently active. bool Util::Procs::isActive(pid_t name) { tthread::lock_guard guard(plistMutex); - return (plist.count(name) == 1) && (kill(name, 0) == 0); + return (kill(name, 0) == 0); +} + +/// Forget about the given PID, keeping it running on shutdown. +void Util::Procs::forget(pid_t pid) { + tthread::lock_guard guard(plistMutex); + plist.erase(pid); +} + +/// Remember the given PID, killing it on shutdown. +void Util::Procs::remember(pid_t pid) { + tthread::lock_guard guard(plistMutex); + plist.insert(pid); } diff --git a/lib/procs.h b/lib/procs.h index 2c07cda8..667f570b 100644 --- a/lib/procs.h +++ b/lib/procs.h @@ -38,6 +38,8 @@ namespace Util { static int Count(); static bool isActive(pid_t name); static bool isRunning(pid_t pid); + static void forget(pid_t pid); + static void remember(pid_t pid); static std::set socketList; ///< Holds sockets that should be closed before forking }; }