Added singular mode override for inputs

This commit is contained in:
Ramoe 2018-04-04 14:03:49 +02:00 committed by Thulinma
parent b839a9f618
commit 33488da329
7 changed files with 56 additions and 35 deletions

View file

@ -244,7 +244,7 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir
FAIL_MSG("Forking process for stream %s failed: %s", streamname.c_str(), strerror(errno)); FAIL_MSG("Forking process for stream %s failed: %s", streamname.c_str(), strerror(errno));
return false; return false;
} }
if (pid && filename.substr(0, 21) == "push://INTERNAL_ONLY:"){ if (pid && overrides.count("singular")){
Util::Procs::setHandler(); Util::Procs::setHandler();
Util::Procs::remember(pid); Util::Procs::remember(pid);
} }

View file

@ -13,6 +13,7 @@
namespace Mist { namespace Mist {
Input * Input::singleton = NULL; Input * Input::singleton = NULL;
Util::Config * Input::config = NULL;
void Input::userCallback(char * data, size_t len, unsigned int id) { void Input::userCallback(char * data, size_t len, unsigned int id) {
for (int i = 0; i < SIMUL_TRACKS; i++) { for (int i = 0; i < SIMUL_TRACKS; i++) {
@ -343,45 +344,53 @@ namespace Mist {
/// - call getNext() in a loop, buffering packets /// - call getNext() in a loop, buffering packets
void Input::stream(){ void Input::stream(){
IPC::semaphore pullLock; IPC::semaphore pullLock;
pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); if(isSingular()){
if (!pullLock){ pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
FAIL_MSG("Could not open pull lock for stream '%s' - aborting!", streamName.c_str()); if (!pullLock){
return; FAIL_MSG("Could not open pull lock for stream '%s' - aborting!", streamName.c_str());
} return;
if (!pullLock.tryWait()){ }
WARN_MSG("A pull process for stream %s is already running", streamName.c_str());
pullLock.close(); if (!pullLock.tryWait()){
return; WARN_MSG("A pull process for stream %s is already running", streamName.c_str());
pullLock.close();
return;
}
if (Util::streamAlive(streamName)){
pullLock.post();
pullLock.close();
pullLock.unlink();
WARN_MSG("Stream already online, cancelling");
return;
}
} }
if (Util::streamAlive(streamName)){
pullLock.post();
pullLock.close();
pullLock.unlink();
WARN_MSG("Stream already online, cancelling");
return;
}
std::map<std::string, std::string> overrides; std::map<std::string, std::string> overrides;
overrides["throughboot"] = ""; overrides["throughboot"] = "";
if(isSingular()){
overrides["singular"] = "";
}
if (!Util::startInput(streamName, "push://INTERNAL_ONLY:"+config->getString("input"), true, true, overrides)) {//manually override stream url to start the buffer if (!Util::startInput(streamName, "push://INTERNAL_ONLY:"+config->getString("input"), true, true, overrides)) {//manually override stream url to start the buffer
pullLock.post(); if(isSingular()){
pullLock.close(); pullLock.post();
pullLock.unlink(); pullLock.close();
pullLock.unlink();
}
WARN_MSG("Could not start buffer, cancelling"); WARN_MSG("Could not start buffer, cancelling");
return; return;
} }
char userPageName[NAME_BUFFER_SIZE];
snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str());
nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true);
nProxy.userClient.countAsViewer = false;
INFO_MSG("Input for stream %s started", streamName.c_str()); INFO_MSG("Input for stream %s started", streamName.c_str());
if (!openStreamSource()){ if (!openStreamSource()){
FAIL_MSG("Unable to connect to source"); FAIL_MSG("Unable to connect to source");
pullLock.post(); if(isSingular()){
pullLock.close(); pullLock.post();
pullLock.close();
}
return; return;
} }
parseStreamHeader(); parseStreamHeader();
@ -389,12 +398,18 @@ namespace Mist {
if (myMeta.tracks.size() == 0){ if (myMeta.tracks.size() == 0){
nProxy.userClient.finish(); nProxy.userClient.finish();
finish(); finish();
pullLock.post(); if(isSingular()){
pullLock.close(); pullLock.post();
pullLock.unlink(); pullLock.close();
pullLock.unlink();
}
INFO_MSG("No tracks found, cancelling"); INFO_MSG("No tracks found, cancelling");
return; return;
} }
char userPageName[NAME_BUFFER_SIZE];
snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str());
nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true);
nProxy.userClient.countAsViewer = false; nProxy.userClient.countAsViewer = false;
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
@ -409,9 +424,11 @@ namespace Mist {
nProxy.userClient.finish(); nProxy.userClient.finish();
finish(); finish();
pullLock.post(); if(isSingular()){
pullLock.close(); pullLock.post();
pullLock.unlink(); pullLock.close();
pullLock.unlink();
}
INFO_MSG("Stream input %s closing clean; reason: %s", streamName.c_str(), reason.c_str()); INFO_MSG("Stream input %s closing clean; reason: %s", streamName.c_str(), reason.c_str());
return; return;
} }

View file

@ -25,12 +25,16 @@ namespace Mist {
virtual ~Input() {}; virtual ~Input() {};
virtual bool needsLock(){return true;} virtual bool needsLock(){return true;}
static Util::Config * config;
protected: protected:
static void callbackWrapper(char * data, size_t len, unsigned int id); static void callbackWrapper(char * data, size_t len, unsigned int id);
virtual bool checkArguments() = 0; virtual bool checkArguments() = 0;
virtual bool readHeader() = 0; virtual bool readHeader() = 0;
virtual bool needHeader(){return !readExistingHeader();} virtual bool needHeader(){return !readExistingHeader();}
virtual bool preRun(){return true;} virtual bool preRun(){return true;}
virtual bool isSingular(){return true;}
virtual bool readExistingHeader(); virtual bool readExistingHeader();
virtual bool atKeyFrame(); virtual bool atKeyFrame();
virtual void getNext(bool smart = true) {} virtual void getNext(bool smart = true) {}

View file

@ -7,7 +7,6 @@
#include "io.h" #include "io.h"
namespace Mist { namespace Mist {
Util::Config * InOutBase::config = NULL;
///Opens a shared memory page for the stream metadata. ///Opens a shared memory page for the stream metadata.
/// ///
///Assumes myMeta contains the metadata to write. ///Assumes myMeta contains the metadata to write.

View file

@ -78,7 +78,6 @@ namespace Mist {
bool standAlone; bool standAlone;
static Util::Config * config;
negotiationProxy nProxy; negotiationProxy nProxy;

View file

@ -16,6 +16,7 @@
namespace Mist{ namespace Mist{
JSON::Value Output::capa = JSON::Value(); JSON::Value Output::capa = JSON::Value();
Util::Config * Output::config = NULL;
int getDTSCLen(char * mapped, long long int offset){ int getDTSCLen(char * mapped, long long int offset){
return Bit::btohl(mapped + offset + 4); return Bit::btohl(mapped + offset + 4);

View file

@ -74,6 +74,7 @@ namespace Mist {
virtual void sendHeader(); virtual void sendHeader();
virtual void onFail(); virtual void onFail();
virtual void requestHandler(); virtual void requestHandler();
static Util::Config * config;
private://these *should* not be messed with in child classes. private://these *should* not be messed with in child classes.
std::map<unsigned long, unsigned int> currKeyOpen; std::map<unsigned long, unsigned int> currKeyOpen;
void loadPageForKey(long unsigned int trackId, long long int keyNum); void loadPageForKey(long unsigned int trackId, long long int keyNum);