From b839a9f618149bd119b9dac3feb0a4305f9bf86e Mon Sep 17 00:00:00 2001 From: Ramoe Date: Mon, 27 Aug 2018 14:10:07 +0200 Subject: [PATCH 1/2] EBML tweaks --- src/input/input_ebml.cpp | 7 ++++++- src/input/input_ebml.h | 1 + src/output/output_ebml.h | 3 ++- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/input/input_ebml.cpp b/src/input/input_ebml.cpp index b7b88fd6..8401c0c8 100644 --- a/src/input/input_ebml.cpp +++ b/src/input/input_ebml.cpp @@ -131,7 +131,12 @@ namespace Mist{ if (inFile == stdin){ lastClusterBPos = 0; }else{ - lastClusterBPos = Util::ftell(inFile); + int64_t bp = Util::ftell(inFile); + if(bp == -1 && errno == ESPIPE){ + lastClusterBPos = 0; + }else{ + lastClusterBPos = bp; + } } DONTEVEN_MSG("Found a cluster at position %llu", lastClusterBPos); } diff --git a/src/input/input_ebml.h b/src/input/input_ebml.h index 4b433f0c..057bcd8c 100644 --- a/src/input/input_ebml.h +++ b/src/input/input_ebml.h @@ -1,3 +1,4 @@ +#pragma once #include "input.h" #include diff --git a/src/output/output_ebml.h b/src/output/output_ebml.h index 37242bdf..35943a3e 100644 --- a/src/output/output_ebml.h +++ b/src/output/output_ebml.h @@ -1,3 +1,4 @@ +#pragma once #include "output_http.h" namespace Mist{ @@ -7,7 +8,7 @@ namespace Mist{ static void init(Util::Config *cfg); void onHTTP(); void sendNext(); - void sendHeader(); + virtual void sendHeader(); uint32_t clusterSize(uint64_t start, uint64_t end); private: From 33488da329e1f4476432e09ba65edb4024e9504a Mon Sep 17 00:00:00 2001 From: Ramoe Date: Wed, 4 Apr 2018 14:03:49 +0200 Subject: [PATCH 2/2] Added singular mode override for inputs --- lib/stream.cpp | 2 +- src/input/input.cpp | 81 ++++++++++++++++++++++++++----------------- src/input/input.h | 4 +++ src/io.cpp | 1 - src/io.h | 1 - src/output/output.cpp | 1 + src/output/output.h | 1 + 7 files changed, 56 insertions(+), 35 deletions(-) diff --git a/lib/stream.cpp b/lib/stream.cpp index 010246c4..eb06bf6e 100644 --- a/lib/stream.cpp +++ b/lib/stream.cpp @@ -244,7 +244,7 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir FAIL_MSG("Forking process for stream %s failed: %s", streamname.c_str(), strerror(errno)); return false; } - if (pid && filename.substr(0, 21) == "push://INTERNAL_ONLY:"){ + if (pid && overrides.count("singular")){ Util::Procs::setHandler(); Util::Procs::remember(pid); } diff --git a/src/input/input.cpp b/src/input/input.cpp index 19d9c2be..d6093bde 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -13,6 +13,7 @@ namespace Mist { Input * Input::singleton = NULL; + Util::Config * Input::config = NULL; void Input::userCallback(char * data, size_t len, unsigned int id) { for (int i = 0; i < SIMUL_TRACKS; i++) { @@ -343,45 +344,53 @@ namespace Mist { /// - call getNext() in a loop, buffering packets void Input::stream(){ IPC::semaphore pullLock; - pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); - if (!pullLock){ - FAIL_MSG("Could not open pull lock for stream '%s' - aborting!", streamName.c_str()); - return; - } - if (!pullLock.tryWait()){ - WARN_MSG("A pull process for stream %s is already running", streamName.c_str()); - pullLock.close(); - return; + if(isSingular()){ + pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); + if (!pullLock){ + FAIL_MSG("Could not open pull lock for stream '%s' - aborting!", streamName.c_str()); + return; + } + + if (!pullLock.tryWait()){ + WARN_MSG("A pull process for stream %s is already running", streamName.c_str()); + pullLock.close(); + return; + } + + if (Util::streamAlive(streamName)){ + pullLock.post(); + pullLock.close(); + pullLock.unlink(); + WARN_MSG("Stream already online, cancelling"); + return; + } } - if (Util::streamAlive(streamName)){ - pullLock.post(); - pullLock.close(); - pullLock.unlink(); - WARN_MSG("Stream already online, cancelling"); - return; - } std::map overrides; overrides["throughboot"] = ""; + if(isSingular()){ + overrides["singular"] = ""; + } + if (!Util::startInput(streamName, "push://INTERNAL_ONLY:"+config->getString("input"), true, true, overrides)) {//manually override stream url to start the buffer - pullLock.post(); - pullLock.close(); - pullLock.unlink(); + if(isSingular()){ + pullLock.post(); + pullLock.close(); + pullLock.unlink(); + } WARN_MSG("Could not start buffer, cancelling"); return; } - char userPageName[NAME_BUFFER_SIZE]; - snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); - nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true); - nProxy.userClient.countAsViewer = false; - + INFO_MSG("Input for stream %s started", streamName.c_str()); if (!openStreamSource()){ FAIL_MSG("Unable to connect to source"); - pullLock.post(); - pullLock.close(); + if(isSingular()){ + pullLock.post(); + pullLock.close(); + } return; } parseStreamHeader(); @@ -389,12 +398,18 @@ namespace Mist { if (myMeta.tracks.size() == 0){ nProxy.userClient.finish(); finish(); - pullLock.post(); - pullLock.close(); - pullLock.unlink(); + if(isSingular()){ + pullLock.post(); + pullLock.close(); + pullLock.unlink(); + } INFO_MSG("No tracks found, cancelling"); return; } + + char userPageName[NAME_BUFFER_SIZE]; + snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); + nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true); nProxy.userClient.countAsViewer = false; for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ @@ -409,9 +424,11 @@ namespace Mist { nProxy.userClient.finish(); finish(); - pullLock.post(); - pullLock.close(); - pullLock.unlink(); + if(isSingular()){ + pullLock.post(); + pullLock.close(); + pullLock.unlink(); + } INFO_MSG("Stream input %s closing clean; reason: %s", streamName.c_str(), reason.c_str()); return; } diff --git a/src/input/input.h b/src/input/input.h index 3bc837e2..30727947 100644 --- a/src/input/input.h +++ b/src/input/input.h @@ -25,12 +25,16 @@ namespace Mist { virtual ~Input() {}; virtual bool needsLock(){return true;} + + static Util::Config * config; + protected: static void callbackWrapper(char * data, size_t len, unsigned int id); virtual bool checkArguments() = 0; virtual bool readHeader() = 0; virtual bool needHeader(){return !readExistingHeader();} virtual bool preRun(){return true;} + virtual bool isSingular(){return true;} virtual bool readExistingHeader(); virtual bool atKeyFrame(); virtual void getNext(bool smart = true) {} diff --git a/src/io.cpp b/src/io.cpp index 68c7ac1f..0decc495 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -7,7 +7,6 @@ #include "io.h" namespace Mist { - Util::Config * InOutBase::config = NULL; ///Opens a shared memory page for the stream metadata. /// ///Assumes myMeta contains the metadata to write. diff --git a/src/io.h b/src/io.h index a04d4f3a..49765f74 100644 --- a/src/io.h +++ b/src/io.h @@ -78,7 +78,6 @@ namespace Mist { bool standAlone; - static Util::Config * config; negotiationProxy nProxy; diff --git a/src/output/output.cpp b/src/output/output.cpp index 646a7a2c..a7505e7c 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -16,6 +16,7 @@ namespace Mist{ JSON::Value Output::capa = JSON::Value(); + Util::Config * Output::config = NULL; int getDTSCLen(char * mapped, long long int offset){ return Bit::btohl(mapped + offset + 4); diff --git a/src/output/output.h b/src/output/output.h index b3e306ac..d6daab76 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -74,6 +74,7 @@ namespace Mist { virtual void sendHeader(); virtual void onFail(); virtual void requestHandler(); + static Util::Config * config; private://these *should* not be messed with in child classes. std::map currKeyOpen; void loadPageForKey(long unsigned int trackId, long long int keyNum);