From bbc31722bdc83602d050d79b1897542cc5cdef7d Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 9 May 2018 10:37:56 +0200 Subject: [PATCH] Generalized stream variables implementation # Conflicts: # lib/stream.cpp --- lib/stream.cpp | 284 ++++++++++++++++++++++++++------------------ lib/stream.h | 7 +- src/input/input.cpp | 8 +- 3 files changed, 177 insertions(+), 122 deletions(-) diff --git a/lib/stream.cpp b/lib/stream.cpp index 1afccc59..af4b512f 100644 --- a/lib/stream.cpp +++ b/lib/stream.cpp @@ -122,7 +122,7 @@ void Util::sanitizeName(std::string & streamname) { } } -JSON::Value Util::getStreamConfig(std::string streamname){ +JSON::Value Util::getStreamConfig(const std::string & streamname){ JSON::Value result; if (streamname.size() > 100){ FAIL_MSG("Stream opening denied: %s is longer than 100 characters (%lu).", streamname.c_str(), streamname.size()); @@ -132,8 +132,6 @@ JSON::Value Util::getStreamConfig(std::string streamname){ IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); configLock.wait(); DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len); - - sanitizeName(streamname); std::string smp = streamname.substr(0, streamname.find_first_of("+ ")); //check if smp (everything before + or space) exists DTSC::Scan stream_cfg = config.getMember("streams").getMember(smp); @@ -146,6 +144,20 @@ JSON::Value Util::getStreamConfig(std::string streamname){ return result; } +DTSC::Meta Util::getStreamMeta(const std::string & streamname){ + DTSC::Meta ret; + char pageId[NAME_BUFFER_SIZE]; + snprintf(pageId, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamname.c_str()); + IPC::sharedPage mPage(pageId, DEFAULT_STRM_PAGE_SIZE); + if (!mPage.mapped){ + FAIL_MSG("Could not connect to metadata for %s", streamname.c_str()); + return ret; + } + DTSC::Packet tmpMeta(mPage.mapped, mPage.len, true); + if (tmpMeta.getVersion()){ret.reinit(tmpMeta);} + return ret; +} + /// Checks if the given streamname has an active input serving it. Returns true if this is the case. /// Assumes the streamname has already been through sanitizeName()! bool Util::streamAlive(std::string & streamname){ @@ -168,7 +180,7 @@ bool Util::streamAlive(std::string & streamname){ /// Then, checks if an input is already active by running streamAlive(). If yes, return true. /// If no, loads up the server configuration and attempts to start the given stream according to current configuration. /// At this point, fails and aborts if MistController isn't running. -bool Util::startInput(std::string streamname, std::string filename, bool forkFirst, bool isProvider) { +bool Util::startInput(std::string streamname, std::string filename, bool forkFirst, bool isProvider, const std::map & overrides, pid_t * spawn_pid ) { sanitizeName(streamname); if (streamname.size() > 100){ FAIL_MSG("Stream opening denied: %s is longer than 100 characters (%lu).", streamname.c_str(), streamname.size()); @@ -179,52 +191,53 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir //It's still possible a duplicate starts anyway, this is caught in the inputs initializer. //Note: this uses the _whole_ stream name, including + (if any). //This means "test+a" and "test+b" have separate locks and do not interact with each other. - if (streamAlive(streamname)){ - uint8_t streamStat = getStreamStatus(streamname); - while (streamStat == STRMSTAT_SHUTDOWN){ - Util::sleep(250); - streamStat = getStreamStatus(streamname); - } - if (streamStat != STRMSTAT_OFF){ - DEBUG_MSG(DLVL_MEDIUM, "Stream %s already active; continuing", streamname.c_str()); - return true; + uint8_t streamStat = getStreamStatus(streamname); + while (streamStat != STRMSTAT_OFF && streamStat != STRMSTAT_READY){ + if (streamStat == STRMSTAT_BOOT && overrides.count("throughboot")){ + break; } + Util::sleep(250); + streamStat = getStreamStatus(streamname); + } + if (streamAlive(streamname) && !overrides.count("alwaysStart")){ + DEBUG_MSG(DLVL_MEDIUM, "Stream %s already active; continuing", streamname.c_str()); + return true; } - //Attempt to load up configuration and find this stream - IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - //Lock the config to prevent race conditions and corruption issues while reading - configLock.wait(); - DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len); - //Abort if no config available - if (!config){ - FAIL_MSG("Configuration not available, aborting! Is MistController running?"); - configLock.post();//unlock the config semaphore - return false; - } /*LTS-START*/ - if (config.getMember("hardlimit_active")) { - configLock.post();//unlock the config semaphore - return false; + { + //Attempt to load up configuration and find this stream + IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); + IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); + //Lock the config to prevent race conditions and corruption issues while reading + configLock.wait(); + DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len); + //Abort if no config available + if (config){ + if (config.getMember("hardlimit_active")) { + configLock.post();//unlock the config semaphore + return false; + } + } + //unlock the config semaphore + configLock.post(); } /*LTS-END*/ + //Find stream base name std::string smp = streamname.substr(0, streamname.find_first_of("+ ")); //check if base name (everything before + or space) exists - DTSC::Scan stream_cfg = config.getMember("streams").getMember(smp); + const JSON::Value stream_cfg = getStreamConfig(streamname); if (!stream_cfg){ DEBUG_MSG(DLVL_HIGH, "Stream %s not configured - attempting to ignore", streamname.c_str()); } /*LTS-START*/ if (!filename.size()){ - if (stream_cfg && stream_cfg.getMember("hardlimit_active")) { - configLock.post();//unlock the config semaphore + if (stream_cfg && stream_cfg.isMember("hardlimit_active")) { return false; } if(Triggers::shouldTrigger("STREAM_LOAD", smp)){ if (!Triggers::doTrigger("STREAM_LOAD", streamname, smp)){ - configLock.post();//unlock the config semaphore return false; } } @@ -239,109 +252,57 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir if (!filename.size()){ if (!stream_cfg){ DEBUG_MSG(DLVL_MEDIUM, "Stream %s not configured, no source manually given, cannot start", streamname.c_str()); - configLock.post();//unlock the config semaphore return false; } - filename = stream_cfg.getMember("source").asString(); + filename = stream_cfg["source"].asStringRef(); } streamVariables(filename, streamname); - - //check in curConf for capabilities-inputs--priority/source_match - std::string player_bin; - bool selected = false; - long long int curPrio = -1; - DTSC::Scan inputs = config.getMember("capabilities").getMember("inputs"); - DTSC::Scan input; - unsigned int input_size = inputs.getSize(); - bool noProviderNoPick = false; - for (unsigned int i = 0; i < input_size; ++i){ - DTSC::Scan tmp_input = inputs.getIndice(i); - - //if match voor current stream && priority is hoger dan wat we al hebben - if (tmp_input.getMember("source_match") && curPrio < tmp_input.getMember("priority").asInt()){ - if (tmp_input.getMember("source_match").getSize()){ - for(unsigned int j = 0; j < tmp_input.getMember("source_match").getSize(); ++j){ - std::string source = tmp_input.getMember("source_match").getIndice(j).asString(); - std::string front = source.substr(0,source.find('*')); - std::string back = source.substr(source.find('*')+1); - MEDIUM_MSG("Checking input %s: %s (%s)", inputs.getIndiceName(i).c_str(), tmp_input.getMember("name").asString().c_str(), source.c_str()); - - if (filename.substr(0,front.size()) == front && filename.substr(filename.size()-back.size()) == back){ - if (tmp_input.getMember("non-provider") && !isProvider){ - noProviderNoPick = true; - continue; - } - player_bin = Util::getMyPath() + "MistIn" + tmp_input.getMember("name").asString(); - curPrio = tmp_input.getMember("priority").asInt(); - selected = true; - input = tmp_input; - } - } - }else{ - std::string source = tmp_input.getMember("source_match").asString(); - std::string front = source.substr(0,source.find('*')); - std::string back = source.substr(source.find('*')+1); - MEDIUM_MSG("Checking input %s: %s (%s)", inputs.getIndiceName(i).c_str(), tmp_input.getMember("name").asString().c_str(), source.c_str()); - - if (filename.substr(0,front.size()) == front && filename.substr(filename.size()-back.size()) == back){ - if (tmp_input.getMember("non-provider") && !isProvider){ - noProviderNoPick = true; - continue; - } - player_bin = Util::getMyPath() + "MistIn" + tmp_input.getMember("name").asString(); - curPrio = tmp_input.getMember("priority").asInt(); - selected = true; - input = tmp_input; - } - } - - } - } - - if (!selected){ - configLock.post();//unlock the config semaphore - if (noProviderNoPick){ - INFO_MSG("Not a media provider for stream %s: %s", streamname.c_str(), filename.c_str()); - }else{ - FAIL_MSG("No compatible input found for stream %s: %s", streamname.c_str(), filename.c_str()); - } - return false; - } + const JSON::Value input = getInputBySource(filename, isProvider); + if (!input){return false;} //copy the necessary arguments to separate storage so we can unlock the config semaphore safely std::map str_args; //check required parameters - DTSC::Scan required = input.getMember("required"); - unsigned int req_size = required.getSize(); - for (unsigned int i = 0; i < req_size; ++i){ - std::string opt = required.getIndiceName(i); - if (!stream_cfg.getMember(opt)){ - configLock.post();//unlock the config semaphore - FAIL_MSG("Required parameter %s for stream %s missing", opt.c_str(), streamname.c_str()); - return false; + if (input.isMember("required")){ + jsonForEachConst(input["required"], prm){ + const std::string opt = (*prm)["option"].asStringRef(); + //check for overrides + if (overrides.count(opt)){ + str_args[opt] = overrides.at(opt); + }else{ + if (!stream_cfg.isMember(prm.key())){ + FAIL_MSG("Required parameter %s for stream %s missing", prm.key().c_str(), streamname.c_str()); + return false; + } + str_args[opt] = stream_cfg[opt].asStringRef(); + } } - str_args[required.getIndice(i).getMember("option").asString()] = stream_cfg.getMember(opt).asString(); } //check optional parameters - DTSC::Scan optional = input.getMember("optional"); - unsigned int opt_size = optional.getSize(); - for (unsigned int i = 0; i < opt_size; ++i){ - std::string opt = optional.getIndiceName(i); - VERYHIGH_MSG("Checking optional %u: %s", i, opt.c_str()); - if (stream_cfg.getMember(opt)){ - str_args[optional.getIndice(i).getMember("option").asString()] = stream_cfg.getMember(opt).asString(); + if (input.isMember("optional")){ + jsonForEachConst(input["optional"], prm){ + const std::string opt = (*prm)["option"].asStringRef(); + //check for overrides + if (overrides.count(opt)){ + str_args[opt] = overrides.at(opt); + }else{ + if (stream_cfg.isMember(prm.key())){ + str_args[opt] = stream_cfg[prm.key()].asStringRef(); + } + } + if (!prm->isMember("type") && str_args.count(opt)){ + str_args[opt] = ""; + } } } - //finally, unlock the config semaphore - configLock.post(); - if (isProvider){ //Set environment variable so we can know if we have a provider when re-exec'ing. setenv("MISTPROVIDER", "1", 1); } + std::string player_bin = Util::getMyPath() + "MistIn" + input["name"].asStringRef(); INFO_MSG("Starting %s -s %s %s", player_bin.c_str(), streamname.c_str(), filename.c_str()); char * argv[30] = {(char *)player_bin.c_str(), (char *)"-s", (char *)streamname.c_str(), (char *)filename.c_str()}; int argNum = 3; @@ -353,11 +314,14 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir } for (std::map::iterator it = str_args.begin(); it != str_args.end(); ++it){ argv[++argNum] = (char *)it->first.c_str(); - argv[++argNum] = (char *)it->second.c_str(); - INFO_MSG(" Option %s = %s", it->first.c_str(), it->second.c_str()); + if (it->second.size()){ + argv[++argNum] = (char *)it->second.c_str(); + } } argv[++argNum] = (char *)0; + Util::Procs::setHandler(); + int pid = 0; if (forkFirst){ DEBUG_MSG(DLVL_DONTEVEN, "Forking"); @@ -381,16 +345,100 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir execvp(argv[0], argv); FAIL_MSG("Starting process %s for stream %s failed: %s", argv[0], streamname.c_str(), strerror(errno)); _exit(42); + }else if (spawn_pid != NULL){ + *spawn_pid = pid; } unsigned int waiting = 0; - while (!streamAlive(streamname) && ++waiting < 40){ + while (!streamAlive(streamname) && ++waiting < 240){ Util::wait(250); + if (!Util::Procs::isRunning(pid)){ + FAIL_MSG("Input process shut down before stream coming online, aborting."); + break; + } } return streamAlive(streamname); } +JSON::Value Util::getInputBySource(const std::string &filename, bool isProvider){ + JSON::Value ret; + + //Attempt to load up configuration and find this stream + IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); + IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); + //Lock the config to prevent race conditions and corruption issues while reading + configLock.wait(); + DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len); + //Abort if no config available + if (!config){ + FAIL_MSG("Configuration not available, aborting! Is MistController running?"); + configLock.post();//unlock the config semaphore + return false; + } + + //check in curConf for capabilities-inputs--priority/source_match + bool selected = false; + long long int curPrio = -1; + DTSC::Scan inputs = config.getMember("capabilities").getMember("inputs"); + DTSC::Scan input; + unsigned int input_size = inputs.getSize(); + bool noProviderNoPick = false; + for (unsigned int i = 0; i < input_size; ++i){ + DTSC::Scan tmp_input = inputs.getIndice(i); + + //if match voor current stream && priority is hoger dan wat we al hebben + if (tmp_input.getMember("source_match") && curPrio < tmp_input.getMember("priority").asInt()){ + if (tmp_input.getMember("source_match").getSize()){ + for(unsigned int j = 0; j < tmp_input.getMember("source_match").getSize(); ++j){ + std::string source = tmp_input.getMember("source_match").getIndice(j).asString(); + std::string front = source.substr(0,source.find('*')); + std::string back = source.substr(source.find('*')+1); + MEDIUM_MSG("Checking input %s: %s (%s)", inputs.getIndiceName(i).c_str(), tmp_input.getMember("name").asString().c_str(), source.c_str()); + + if (filename.substr(0,front.size()) == front && filename.substr(filename.size()-back.size()) == back){ + if (tmp_input.getMember("non-provider") && !isProvider){ + noProviderNoPick = true; + continue; + } + curPrio = tmp_input.getMember("priority").asInt(); + selected = true; + input = tmp_input; + } + } + }else{ + std::string source = tmp_input.getMember("source_match").asString(); + std::string front = source.substr(0,source.find('*')); + std::string back = source.substr(source.find('*')+1); + MEDIUM_MSG("Checking input %s: %s (%s)", inputs.getIndiceName(i).c_str(), tmp_input.getMember("name").asString().c_str(), source.c_str()); + + if (filename.substr(0,front.size()) == front && filename.substr(filename.size()-back.size()) == back){ + if (tmp_input.getMember("non-provider") && !isProvider){ + noProviderNoPick = true; + continue; + } + curPrio = tmp_input.getMember("priority").asInt(); + selected = true; + input = tmp_input; + } + } + + } + } + if (!selected){ + if (noProviderNoPick){ + INFO_MSG("Not a media provider for input: %s", filename.c_str()); + }else{ + FAIL_MSG("No compatible input found for: %s", filename.c_str()); + } + }else{ + ret = input.asJSON(); + } + configLock.post();//unlock the config semaphore + return ret; +} + + /// Attempt to start a push for streamname to target. /// streamname MUST be pre-sanitized /// target gets variables replaced and may be altered by the PUSH_OUT_START trigger response. diff --git a/lib/stream.h b/lib/stream.h index 2a0f33be..633df7be 100644 --- a/lib/stream.h +++ b/lib/stream.h @@ -5,15 +5,18 @@ #include #include "socket.h" #include "json.h" +#include "dtsc.h" namespace Util { void streamVariables(std::string &str, const std::string & streamname, const std::string & source = ""); std::string getTmpFolder(); void sanitizeName(std::string & streamname); bool streamAlive(std::string & streamname); - bool startInput(std::string streamname, std::string filename = "", bool forkFirst = true, bool isProvider = false); + bool startInput(std::string streamname, std::string filename = "", bool forkFirst = true, bool isProvider = false, const std::map & overrides = std::map(), pid_t * spawn_pid = NULL); int startPush(const std::string & streamname, std::string & target); - JSON::Value getStreamConfig(std::string streamname); + JSON::Value getStreamConfig(const std::string & streamname); + JSON::Value getInputBySource(const std::string & filename, bool isProvider = false); + DTSC::Meta getStreamMeta(const std::string & streamname); uint8_t getStreamStatus(const std::string & streamname); } diff --git a/src/input/input.cpp b/src/input/input.cpp index d71f50cb..f65473c9 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -222,6 +222,8 @@ namespace Mist { return 0; } + INFO_MSG("Booting input for stream %s", streamName.c_str()); + if (!checkArguments()) { FAIL_MSG("Setup failed - exiting"); return 0; @@ -274,7 +276,7 @@ namespace Mist { } //if the exit was clean, don't restart it if (WIFEXITED(status) && (WEXITSTATUS(status) == 0)){ - MEDIUM_MSG("Input for stream %s shut down cleanly", streamName.c_str()); + INFO_MSG("Input for stream %s shut down cleanly", streamName.c_str()); break; } char pageName[NAME_BUFFER_SIZE]; @@ -518,7 +520,9 @@ namespace Mist { WARN_MSG("Stream already online, cancelling"); return; } - if (!Util::startInput(streamName, "push://INTERNAL_ONLY:"+config->getString("input"), true, true)) {//manually override stream url to start the buffer + std::map overrides; + overrides["throughboot"] = ""; + if (!Util::startInput(streamName, "push://INTERNAL_ONLY:"+config->getString("input"), true, true, overrides)) {//manually override stream url to start the buffer pullLock.post(); pullLock.close(); pullLock.unlink();