diff --git a/lib/stream.cpp b/lib/stream.cpp index 5df8f7f5..08dad389 100644 --- a/lib/stream.cpp +++ b/lib/stream.cpp @@ -331,7 +331,7 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir FAIL_MSG("Forking process for stream %s failed: %s", streamname.c_str(), strerror(errno)); return false; } - if (pid && filename.substr(0, 21) == "push://INTERNAL_ONLY:"){ + if (pid && overrides.count("singular")){ Util::Procs::setHandler(); Util::Procs::remember(pid); } diff --git a/src/input/input.cpp b/src/input/input.cpp index b5991fe0..339033ce 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -14,6 +14,7 @@ namespace Mist { Input * Input::singleton = NULL; + Util::Config * Input::config = NULL; void Input::userCallback(char * data, size_t len, unsigned int id) { for (int i = 0; i < SIMUL_TRACKS; i++) { @@ -502,45 +503,53 @@ namespace Mist { /// - call getNext() in a loop, buffering packets void Input::stream(){ IPC::semaphore pullLock; - pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); - if (!pullLock){ - FAIL_MSG("Could not open pull lock for stream '%s' - aborting!", streamName.c_str()); - return; - } - if (!pullLock.tryWait()){ - WARN_MSG("A pull process for stream %s is already running", streamName.c_str()); - pullLock.close(); - return; + if(isSingular()){ + pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); + if (!pullLock){ + FAIL_MSG("Could not open pull lock for stream '%s' - aborting!", streamName.c_str()); + return; + } + + if (!pullLock.tryWait()){ + WARN_MSG("A pull process for stream %s is already running", streamName.c_str()); + pullLock.close(); + return; + } + + if (Util::streamAlive(streamName)){ + pullLock.post(); + pullLock.close(); + pullLock.unlink(); + WARN_MSG("Stream already online, cancelling"); + return; + } } - if (Util::streamAlive(streamName)){ - pullLock.post(); - pullLock.close(); - pullLock.unlink(); - WARN_MSG("Stream already online, cancelling"); - return; - } std::map overrides; overrides["throughboot"] = ""; + if(isSingular()){ + overrides["singular"] = ""; + } + if (!Util::startInput(streamName, "push://INTERNAL_ONLY:"+config->getString("input"), true, true, overrides)) {//manually override stream url to start the buffer - pullLock.post(); - pullLock.close(); - pullLock.unlink(); + if(isSingular()){ + pullLock.post(); + pullLock.close(); + pullLock.unlink(); + } WARN_MSG("Could not start buffer, cancelling"); return; } - char userPageName[NAME_BUFFER_SIZE]; - snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); - nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true); - nProxy.userClient.countAsViewer = false; - + INFO_MSG("Input for stream %s started", streamName.c_str()); if (!openStreamSource()){ FAIL_MSG("Unable to connect to source"); - pullLock.post(); - pullLock.close(); + if(isSingular()){ + pullLock.post(); + pullLock.close(); + } return; } parseStreamHeader(); @@ -548,12 +557,18 @@ namespace Mist { if (myMeta.tracks.size() == 0){ nProxy.userClient.finish(); finish(); - pullLock.post(); - pullLock.close(); - pullLock.unlink(); + if(isSingular()){ + pullLock.post(); + pullLock.close(); + pullLock.unlink(); + } INFO_MSG("No tracks found, cancelling"); return; } + + char userPageName[NAME_BUFFER_SIZE]; + snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); + nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true); nProxy.userClient.countAsViewer = false; for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ @@ -568,9 +583,11 @@ namespace Mist { nProxy.userClient.finish(); finish(); - pullLock.post(); - pullLock.close(); - pullLock.unlink(); + if(isSingular()){ + pullLock.post(); + pullLock.close(); + pullLock.unlink(); + } INFO_MSG("Stream input %s closing clean; reason: %s", streamName.c_str(), reason.c_str()); return; } diff --git a/src/input/input.h b/src/input/input.h index 0f2b53b0..3ae390b8 100644 --- a/src/input/input.h +++ b/src/input/input.h @@ -26,12 +26,16 @@ namespace Mist { virtual ~Input() {}; virtual bool needsLock(){return true;} + + static Util::Config * config; + protected: static void callbackWrapper(char * data, size_t len, unsigned int id); virtual bool checkArguments() = 0; virtual bool readHeader() = 0; virtual bool needHeader(){return !readExistingHeader();} virtual bool preRun(){return true;} + virtual bool isSingular(){return true;} virtual bool readExistingHeader(); virtual bool atKeyFrame(); virtual void getNext(bool smart = true) {} diff --git a/src/input/input_ebml.cpp b/src/input/input_ebml.cpp index 91093852..bcbf1505 100644 --- a/src/input/input_ebml.cpp +++ b/src/input/input_ebml.cpp @@ -132,7 +132,12 @@ namespace Mist{ if (inFile == stdin){ lastClusterBPos = 0; }else{ - lastClusterBPos = Util::ftell(inFile); + int64_t bp = Util::ftell(inFile); + if(bp == -1 && errno == ESPIPE){ + lastClusterBPos = 0; + }else{ + lastClusterBPos = bp; + } } DONTEVEN_MSG("Found a cluster at position %llu", lastClusterBPos); } diff --git a/src/input/input_ebml.h b/src/input/input_ebml.h index 4b433f0c..057bcd8c 100644 --- a/src/input/input_ebml.h +++ b/src/input/input_ebml.h @@ -1,3 +1,4 @@ +#pragma once #include "input.h" #include diff --git a/src/io.cpp b/src/io.cpp index 54e0c7be..d9fd12d9 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -7,7 +7,6 @@ #include "io.h" namespace Mist { - Util::Config * InOutBase::config = NULL; ///Opens a shared memory page for the stream metadata. /// ///Assumes myMeta contains the metadata to write. diff --git a/src/io.h b/src/io.h index 10d36e1e..9154ac76 100644 --- a/src/io.h +++ b/src/io.h @@ -85,7 +85,6 @@ namespace Mist { bool standAlone; - static Util::Config * config; negotiationProxy nProxy; diff --git a/src/output/output.cpp b/src/output/output.cpp index fe2370d7..1d4d82f5 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -25,6 +25,7 @@ namespace Mist{ JSON::Value Output::capa = JSON::Value(); + Util::Config * Output::config = NULL; int getDTSCLen(char * mapped, long long int offset){ return Bit::btohl(mapped + offset + 4); diff --git a/src/output/output.h b/src/output/output.h index a5511d66..915ac290 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -80,6 +80,7 @@ namespace Mist { virtual void sendHeader(); virtual void onFail(); virtual void requestHandler(); + static Util::Config * config; private://these *should* not be messed with in child classes. /*LTS-START*/ void Log(std::string type, std::string message); diff --git a/src/output/output_ebml.h b/src/output/output_ebml.h index bb7f7de7..25dda375 100644 --- a/src/output/output_ebml.h +++ b/src/output/output_ebml.h @@ -1,3 +1,4 @@ +#pragma once #include "output_http.h" namespace Mist{ @@ -7,7 +8,7 @@ namespace Mist{ static void init(Util::Config *cfg); void onHTTP(); void sendNext(); - void sendHeader(); + virtual void sendHeader(); uint32_t clusterSize(uint64_t start, uint64_t end); private: