diff --git a/lib/stream.cpp b/lib/stream.cpp index 010246c4..eb06bf6e 100644 --- a/lib/stream.cpp +++ b/lib/stream.cpp @@ -244,7 +244,7 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir FAIL_MSG("Forking process for stream %s failed: %s", streamname.c_str(), strerror(errno)); return false; } - if (pid && filename.substr(0, 21) == "push://INTERNAL_ONLY:"){ + if (pid && overrides.count("singular")){ Util::Procs::setHandler(); Util::Procs::remember(pid); } diff --git a/src/input/input.cpp b/src/input/input.cpp index 19d9c2be..d6093bde 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -13,6 +13,7 @@ namespace Mist { Input * Input::singleton = NULL; + Util::Config * Input::config = NULL; void Input::userCallback(char * data, size_t len, unsigned int id) { for (int i = 0; i < SIMUL_TRACKS; i++) { @@ -343,45 +344,53 @@ namespace Mist { /// - call getNext() in a loop, buffering packets void Input::stream(){ IPC::semaphore pullLock; - pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); - if (!pullLock){ - FAIL_MSG("Could not open pull lock for stream '%s' - aborting!", streamName.c_str()); - return; - } - if (!pullLock.tryWait()){ - WARN_MSG("A pull process for stream %s is already running", streamName.c_str()); - pullLock.close(); - return; + if(isSingular()){ + pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); + if (!pullLock){ + FAIL_MSG("Could not open pull lock for stream '%s' - aborting!", streamName.c_str()); + return; + } + + if (!pullLock.tryWait()){ + WARN_MSG("A pull process for stream %s is already running", streamName.c_str()); + pullLock.close(); + return; + } + + if (Util::streamAlive(streamName)){ + pullLock.post(); + pullLock.close(); + pullLock.unlink(); + WARN_MSG("Stream already online, cancelling"); + return; + } } - if (Util::streamAlive(streamName)){ - pullLock.post(); - pullLock.close(); - pullLock.unlink(); - WARN_MSG("Stream already online, cancelling"); - return; - } std::map overrides; overrides["throughboot"] = ""; + if(isSingular()){ + overrides["singular"] = ""; + } + if (!Util::startInput(streamName, "push://INTERNAL_ONLY:"+config->getString("input"), true, true, overrides)) {//manually override stream url to start the buffer - pullLock.post(); - pullLock.close(); - pullLock.unlink(); + if(isSingular()){ + pullLock.post(); + pullLock.close(); + pullLock.unlink(); + } WARN_MSG("Could not start buffer, cancelling"); return; } - char userPageName[NAME_BUFFER_SIZE]; - snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); - nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true); - nProxy.userClient.countAsViewer = false; - + INFO_MSG("Input for stream %s started", streamName.c_str()); if (!openStreamSource()){ FAIL_MSG("Unable to connect to source"); - pullLock.post(); - pullLock.close(); + if(isSingular()){ + pullLock.post(); + pullLock.close(); + } return; } parseStreamHeader(); @@ -389,12 +398,18 @@ namespace Mist { if (myMeta.tracks.size() == 0){ nProxy.userClient.finish(); finish(); - pullLock.post(); - pullLock.close(); - pullLock.unlink(); + if(isSingular()){ + pullLock.post(); + pullLock.close(); + pullLock.unlink(); + } INFO_MSG("No tracks found, cancelling"); return; } + + char userPageName[NAME_BUFFER_SIZE]; + snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); + nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true); nProxy.userClient.countAsViewer = false; for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ @@ -409,9 +424,11 @@ namespace Mist { nProxy.userClient.finish(); finish(); - pullLock.post(); - pullLock.close(); - pullLock.unlink(); + if(isSingular()){ + pullLock.post(); + pullLock.close(); + pullLock.unlink(); + } INFO_MSG("Stream input %s closing clean; reason: %s", streamName.c_str(), reason.c_str()); return; } diff --git a/src/input/input.h b/src/input/input.h index 3bc837e2..30727947 100644 --- a/src/input/input.h +++ b/src/input/input.h @@ -25,12 +25,16 @@ namespace Mist { virtual ~Input() {}; virtual bool needsLock(){return true;} + + static Util::Config * config; + protected: static void callbackWrapper(char * data, size_t len, unsigned int id); virtual bool checkArguments() = 0; virtual bool readHeader() = 0; virtual bool needHeader(){return !readExistingHeader();} virtual bool preRun(){return true;} + virtual bool isSingular(){return true;} virtual bool readExistingHeader(); virtual bool atKeyFrame(); virtual void getNext(bool smart = true) {} diff --git a/src/io.cpp b/src/io.cpp index 68c7ac1f..0decc495 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -7,7 +7,6 @@ #include "io.h" namespace Mist { - Util::Config * InOutBase::config = NULL; ///Opens a shared memory page for the stream metadata. /// ///Assumes myMeta contains the metadata to write. diff --git a/src/io.h b/src/io.h index a04d4f3a..49765f74 100644 --- a/src/io.h +++ b/src/io.h @@ -78,7 +78,6 @@ namespace Mist { bool standAlone; - static Util::Config * config; negotiationProxy nProxy; diff --git a/src/output/output.cpp b/src/output/output.cpp index 646a7a2c..a7505e7c 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -16,6 +16,7 @@ namespace Mist{ JSON::Value Output::capa = JSON::Value(); + Util::Config * Output::config = NULL; int getDTSCLen(char * mapped, long long int offset){ return Bit::btohl(mapped + offset + 4); diff --git a/src/output/output.h b/src/output/output.h index b3e306ac..d6daab76 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -74,6 +74,7 @@ namespace Mist { virtual void sendHeader(); virtual void onFail(); virtual void requestHandler(); + static Util::Config * config; private://these *should* not be messed with in child classes. std::map currKeyOpen; void loadPageForKey(long unsigned int trackId, long long int keyNum);