Improved stream start handling
This commit is contained in:
parent
017258b1b9
commit
92d14f52ea
3 changed files with 163 additions and 114 deletions
248
lib/stream.cpp
248
lib/stream.cpp
|
@ -73,7 +73,7 @@ void Util::sanitizeName(std::string & streamname) {
|
|||
}
|
||||
}
|
||||
|
||||
JSON::Value Util::getStreamConfig(std::string streamname){
|
||||
JSON::Value Util::getStreamConfig(const std::string & streamname){
|
||||
JSON::Value result;
|
||||
if (streamname.size() > 100){
|
||||
FAIL_MSG("Stream opening denied: %s is longer than 100 characters (%lu).", streamname.c_str(), streamname.size());
|
||||
|
@ -83,8 +83,6 @@ JSON::Value Util::getStreamConfig(std::string streamname){
|
|||
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1);
|
||||
configLock.wait();
|
||||
DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len);
|
||||
|
||||
sanitizeName(streamname);
|
||||
std::string smp = streamname.substr(0, streamname.find_first_of("+ "));
|
||||
//check if smp (everything before + or space) exists
|
||||
DTSC::Scan stream_cfg = config.getMember("streams").getMember(smp);
|
||||
|
@ -97,6 +95,20 @@ JSON::Value Util::getStreamConfig(std::string streamname){
|
|||
return result;
|
||||
}
|
||||
|
||||
DTSC::Meta Util::getStreamMeta(const std::string & streamname){
|
||||
DTSC::Meta ret;
|
||||
char pageId[NAME_BUFFER_SIZE];
|
||||
snprintf(pageId, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamname.c_str());
|
||||
IPC::sharedPage mPage(pageId, DEFAULT_STRM_PAGE_SIZE);
|
||||
if (!mPage.mapped){
|
||||
FAIL_MSG("Could not connect to metadata for %s", streamname.c_str());
|
||||
return ret;
|
||||
}
|
||||
DTSC::Packet tmpMeta(mPage.mapped, mPage.len, true);
|
||||
if (tmpMeta.getVersion()){ret.reinit(tmpMeta);}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/// Checks if the given streamname has an active input serving it. Returns true if this is the case.
|
||||
/// Assumes the streamname has already been through sanitizeName()!
|
||||
bool Util::streamAlive(std::string & streamname){
|
||||
|
@ -119,7 +131,7 @@ bool Util::streamAlive(std::string & streamname){
|
|||
/// Then, checks if an input is already active by running streamAlive(). If yes, return true.
|
||||
/// If no, loads up the server configuration and attempts to start the given stream according to current configuration.
|
||||
/// At this point, fails and aborts if MistController isn't running.
|
||||
bool Util::startInput(std::string streamname, std::string filename, bool forkFirst, bool isProvider) {
|
||||
bool Util::startInput(std::string streamname, std::string filename, bool forkFirst, bool isProvider, const std::map<std::string, std::string> & overrides, pid_t * spawn_pid ) {
|
||||
sanitizeName(streamname);
|
||||
if (streamname.size() > 100){
|
||||
FAIL_MSG("Stream opening denied: %s is longer than 100 characters (%lu).", streamname.c_str(), streamname.size());
|
||||
|
@ -130,34 +142,23 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir
|
|||
//It's still possible a duplicate starts anyway, this is caught in the inputs initializer.
|
||||
//Note: this uses the _whole_ stream name, including + (if any).
|
||||
//This means "test+a" and "test+b" have separate locks and do not interact with each other.
|
||||
if (streamAlive(streamname)){
|
||||
uint8_t streamStat = getStreamStatus(streamname);
|
||||
while (streamStat == STRMSTAT_SHUTDOWN){
|
||||
while (streamStat != STRMSTAT_OFF && streamStat != STRMSTAT_READY){
|
||||
if (streamStat == STRMSTAT_BOOT && overrides.count("throughboot")){
|
||||
break;
|
||||
}
|
||||
Util::sleep(250);
|
||||
streamStat = getStreamStatus(streamname);
|
||||
}
|
||||
if (streamStat != STRMSTAT_OFF){
|
||||
if (streamAlive(streamname) && !overrides.count("alwaysStart")){
|
||||
DEBUG_MSG(DLVL_MEDIUM, "Stream %s already active; continuing", streamname.c_str());
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
//Attempt to load up configuration and find this stream
|
||||
IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE);
|
||||
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1);
|
||||
//Lock the config to prevent race conditions and corruption issues while reading
|
||||
configLock.wait();
|
||||
DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len);
|
||||
//Abort if no config available
|
||||
if (!config){
|
||||
FAIL_MSG("Configuration not available, aborting! Is MistController running?");
|
||||
configLock.post();//unlock the config semaphore
|
||||
return false;
|
||||
}
|
||||
//Find stream base name
|
||||
std::string smp = streamname.substr(0, streamname.find_first_of("+ "));
|
||||
//check if base name (everything before + or space) exists
|
||||
DTSC::Scan stream_cfg = config.getMember("streams").getMember(smp);
|
||||
const JSON::Value stream_cfg = getStreamConfig(streamname);
|
||||
if (!stream_cfg){
|
||||
DEBUG_MSG(DLVL_HIGH, "Stream %s not configured - attempting to ignore", streamname.c_str());
|
||||
}
|
||||
|
@ -166,102 +167,56 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir
|
|||
if (!filename.size()){
|
||||
if (!stream_cfg){
|
||||
DEBUG_MSG(DLVL_MEDIUM, "Stream %s not configured, no source manually given, cannot start", streamname.c_str());
|
||||
configLock.post();//unlock the config semaphore
|
||||
return false;
|
||||
}
|
||||
filename = stream_cfg.getMember("source").asString();
|
||||
filename = stream_cfg["source"].asStringRef();
|
||||
}
|
||||
|
||||
//check in curConf for capabilities-inputs-<naam>-priority/source_match
|
||||
std::string player_bin;
|
||||
bool selected = false;
|
||||
long long int curPrio = -1;
|
||||
DTSC::Scan inputs = config.getMember("capabilities").getMember("inputs");
|
||||
DTSC::Scan input;
|
||||
unsigned int input_size = inputs.getSize();
|
||||
bool noProviderNoPick = false;
|
||||
for (unsigned int i = 0; i < input_size; ++i){
|
||||
DTSC::Scan tmp_input = inputs.getIndice(i);
|
||||
|
||||
//if match voor current stream && priority is hoger dan wat we al hebben
|
||||
if (tmp_input.getMember("source_match") && curPrio < tmp_input.getMember("priority").asInt()){
|
||||
if (tmp_input.getMember("source_match").getSize()){
|
||||
for(unsigned int j = 0; j < tmp_input.getMember("source_match").getSize(); ++j){
|
||||
std::string source = tmp_input.getMember("source_match").getIndice(j).asString();
|
||||
std::string front = source.substr(0,source.find('*'));
|
||||
std::string back = source.substr(source.find('*')+1);
|
||||
MEDIUM_MSG("Checking input %s: %s (%s)", inputs.getIndiceName(i).c_str(), tmp_input.getMember("name").asString().c_str(), source.c_str());
|
||||
|
||||
if (filename.substr(0,front.size()) == front && filename.substr(filename.size()-back.size()) == back){
|
||||
if (tmp_input.getMember("non-provider") && !isProvider){
|
||||
noProviderNoPick = true;
|
||||
continue;
|
||||
}
|
||||
player_bin = Util::getMyPath() + "MistIn" + tmp_input.getMember("name").asString();
|
||||
curPrio = tmp_input.getMember("priority").asInt();
|
||||
selected = true;
|
||||
input = tmp_input;
|
||||
}
|
||||
}
|
||||
}else{
|
||||
std::string source = tmp_input.getMember("source_match").asString();
|
||||
std::string front = source.substr(0,source.find('*'));
|
||||
std::string back = source.substr(source.find('*')+1);
|
||||
MEDIUM_MSG("Checking input %s: %s (%s)", inputs.getIndiceName(i).c_str(), tmp_input.getMember("name").asString().c_str(), source.c_str());
|
||||
|
||||
if (filename.substr(0,front.size()) == front && filename.substr(filename.size()-back.size()) == back){
|
||||
if (tmp_input.getMember("non-provider") && !isProvider){
|
||||
noProviderNoPick = true;
|
||||
continue;
|
||||
}
|
||||
player_bin = Util::getMyPath() + "MistIn" + tmp_input.getMember("name").asString();
|
||||
curPrio = tmp_input.getMember("priority").asInt();
|
||||
selected = true;
|
||||
input = tmp_input;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
if (!selected){
|
||||
configLock.post();//unlock the config semaphore
|
||||
if (noProviderNoPick){
|
||||
INFO_MSG("Not a media provider for stream %s: %s", streamname.c_str(), filename.c_str());
|
||||
}else{
|
||||
FAIL_MSG("No compatible input found for stream %s: %s", streamname.c_str(), filename.c_str());
|
||||
}
|
||||
return false;
|
||||
}
|
||||
const JSON::Value input = getInputBySource(filename, isProvider);
|
||||
if (!input){return false;}
|
||||
|
||||
//copy the necessary arguments to separate storage so we can unlock the config semaphore safely
|
||||
std::map<std::string, std::string> str_args;
|
||||
//check required parameters
|
||||
DTSC::Scan required = input.getMember("required");
|
||||
unsigned int req_size = required.getSize();
|
||||
for (unsigned int i = 0; i < req_size; ++i){
|
||||
std::string opt = required.getIndiceName(i);
|
||||
if (!stream_cfg.getMember(opt)){
|
||||
configLock.post();//unlock the config semaphore
|
||||
FAIL_MSG("Required parameter %s for stream %s missing", opt.c_str(), streamname.c_str());
|
||||
if (input.isMember("required")){
|
||||
jsonForEachConst(input["required"], prm){
|
||||
const std::string opt = (*prm)["option"].asStringRef();
|
||||
//check for overrides
|
||||
if (overrides.count(opt)){
|
||||
str_args[opt] = overrides.at(opt);
|
||||
}else{
|
||||
if (!stream_cfg.isMember(prm.key())){
|
||||
FAIL_MSG("Required parameter %s for stream %s missing", prm.key().c_str(), streamname.c_str());
|
||||
return false;
|
||||
}
|
||||
str_args[required.getIndice(i).getMember("option").asString()] = stream_cfg.getMember(opt).asString();
|
||||
str_args[opt] = stream_cfg[opt].asStringRef();
|
||||
}
|
||||
}
|
||||
}
|
||||
//check optional parameters
|
||||
DTSC::Scan optional = input.getMember("optional");
|
||||
unsigned int opt_size = optional.getSize();
|
||||
for (unsigned int i = 0; i < opt_size; ++i){
|
||||
std::string opt = optional.getIndiceName(i);
|
||||
VERYHIGH_MSG("Checking optional %u: %s", i, opt.c_str());
|
||||
if (stream_cfg.getMember(opt)){
|
||||
str_args[optional.getIndice(i).getMember("option").asString()] = stream_cfg.getMember(opt).asString();
|
||||
if (input.isMember("optional")){
|
||||
jsonForEachConst(input["optional"], prm){
|
||||
const std::string opt = (*prm)["option"].asStringRef();
|
||||
//check for overrides
|
||||
if (overrides.count(opt)){
|
||||
str_args[opt] = overrides.at(opt);
|
||||
}else{
|
||||
if (stream_cfg.isMember(prm.key())){
|
||||
str_args[opt] = stream_cfg[prm.key()].asStringRef();
|
||||
}
|
||||
}
|
||||
if (!prm->isMember("type") && str_args.count(opt)){
|
||||
str_args[opt] = "";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//finally, unlock the config semaphore
|
||||
configLock.post();
|
||||
if (isProvider){
|
||||
//Set environment variable so we can know if we have a provider when re-exec'ing.
|
||||
setenv("MISTPROVIDER", "1", 1);
|
||||
}
|
||||
|
||||
std::string player_bin = Util::getMyPath() + "MistIn" + input["name"].asStringRef();
|
||||
INFO_MSG("Starting %s -s %s %s", player_bin.c_str(), streamname.c_str(), filename.c_str());
|
||||
char * argv[30] = {(char *)player_bin.c_str(), (char *)"-s", (char *)streamname.c_str(), (char *)filename.c_str()};
|
||||
int argNum = 3;
|
||||
|
@ -273,11 +228,14 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir
|
|||
}
|
||||
for (std::map<std::string, std::string>::iterator it = str_args.begin(); it != str_args.end(); ++it){
|
||||
argv[++argNum] = (char *)it->first.c_str();
|
||||
if (it->second.size()){
|
||||
argv[++argNum] = (char *)it->second.c_str();
|
||||
INFO_MSG(" Option %s = %s", it->first.c_str(), it->second.c_str());
|
||||
}
|
||||
}
|
||||
argv[++argNum] = (char *)0;
|
||||
|
||||
Util::Procs::setHandler();
|
||||
|
||||
int pid = 0;
|
||||
if (forkFirst){
|
||||
DEBUG_MSG(DLVL_DONTEVEN, "Forking");
|
||||
|
@ -301,15 +259,99 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir
|
|||
execvp(argv[0], argv);
|
||||
FAIL_MSG("Starting process %s for stream %s failed: %s", argv[0], streamname.c_str(), strerror(errno));
|
||||
_exit(42);
|
||||
}else if (spawn_pid != NULL){
|
||||
*spawn_pid = pid;
|
||||
}
|
||||
|
||||
unsigned int waiting = 0;
|
||||
while (!streamAlive(streamname) && ++waiting < 40){
|
||||
while (!streamAlive(streamname) && ++waiting < 240){
|
||||
Util::wait(250);
|
||||
if (!Util::Procs::isRunning(pid)){
|
||||
FAIL_MSG("Input process shut down before stream coming online, aborting.");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return streamAlive(streamname);
|
||||
}
|
||||
|
||||
JSON::Value Util::getInputBySource(const std::string &filename, bool isProvider){
|
||||
JSON::Value ret;
|
||||
|
||||
//Attempt to load up configuration and find this stream
|
||||
IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE);
|
||||
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1);
|
||||
//Lock the config to prevent race conditions and corruption issues while reading
|
||||
configLock.wait();
|
||||
DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len);
|
||||
//Abort if no config available
|
||||
if (!config){
|
||||
FAIL_MSG("Configuration not available, aborting! Is MistController running?");
|
||||
configLock.post();//unlock the config semaphore
|
||||
return false;
|
||||
}
|
||||
|
||||
//check in curConf for capabilities-inputs-<naam>-priority/source_match
|
||||
bool selected = false;
|
||||
long long int curPrio = -1;
|
||||
DTSC::Scan inputs = config.getMember("capabilities").getMember("inputs");
|
||||
DTSC::Scan input;
|
||||
unsigned int input_size = inputs.getSize();
|
||||
bool noProviderNoPick = false;
|
||||
for (unsigned int i = 0; i < input_size; ++i){
|
||||
DTSC::Scan tmp_input = inputs.getIndice(i);
|
||||
|
||||
//if match voor current stream && priority is hoger dan wat we al hebben
|
||||
if (tmp_input.getMember("source_match") && curPrio < tmp_input.getMember("priority").asInt()){
|
||||
if (tmp_input.getMember("source_match").getSize()){
|
||||
for(unsigned int j = 0; j < tmp_input.getMember("source_match").getSize(); ++j){
|
||||
std::string source = tmp_input.getMember("source_match").getIndice(j).asString();
|
||||
std::string front = source.substr(0,source.find('*'));
|
||||
std::string back = source.substr(source.find('*')+1);
|
||||
MEDIUM_MSG("Checking input %s: %s (%s)", inputs.getIndiceName(i).c_str(), tmp_input.getMember("name").asString().c_str(), source.c_str());
|
||||
|
||||
if (filename.substr(0,front.size()) == front && filename.substr(filename.size()-back.size()) == back){
|
||||
if (tmp_input.getMember("non-provider") && !isProvider){
|
||||
noProviderNoPick = true;
|
||||
continue;
|
||||
}
|
||||
curPrio = tmp_input.getMember("priority").asInt();
|
||||
selected = true;
|
||||
input = tmp_input;
|
||||
}
|
||||
}
|
||||
}else{
|
||||
std::string source = tmp_input.getMember("source_match").asString();
|
||||
std::string front = source.substr(0,source.find('*'));
|
||||
std::string back = source.substr(source.find('*')+1);
|
||||
MEDIUM_MSG("Checking input %s: %s (%s)", inputs.getIndiceName(i).c_str(), tmp_input.getMember("name").asString().c_str(), source.c_str());
|
||||
|
||||
if (filename.substr(0,front.size()) == front && filename.substr(filename.size()-back.size()) == back){
|
||||
if (tmp_input.getMember("non-provider") && !isProvider){
|
||||
noProviderNoPick = true;
|
||||
continue;
|
||||
}
|
||||
curPrio = tmp_input.getMember("priority").asInt();
|
||||
selected = true;
|
||||
input = tmp_input;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
if (!selected){
|
||||
if (noProviderNoPick){
|
||||
INFO_MSG("Not a media provider for input: %s", filename.c_str());
|
||||
}else{
|
||||
FAIL_MSG("No compatible input found for: %s", filename.c_str());
|
||||
}
|
||||
}else{
|
||||
ret = input.asJSON();
|
||||
}
|
||||
configLock.post();//unlock the config semaphore
|
||||
return ret;
|
||||
}
|
||||
|
||||
uint8_t Util::getStreamStatus(const std::string & streamname){
|
||||
char pageName[NAME_BUFFER_SIZE];
|
||||
snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamname.c_str());
|
||||
|
|
|
@ -5,13 +5,16 @@
|
|||
#include <string>
|
||||
#include "socket.h"
|
||||
#include "json.h"
|
||||
#include "dtsc.h"
|
||||
|
||||
namespace Util {
|
||||
std::string getTmpFolder();
|
||||
void sanitizeName(std::string & streamname);
|
||||
bool streamAlive(std::string & streamname);
|
||||
bool startInput(std::string streamname, std::string filename = "", bool forkFirst = true, bool isProvider = false);
|
||||
JSON::Value getStreamConfig(std::string streamname);
|
||||
bool startInput(std::string streamname, std::string filename = "", bool forkFirst = true, bool isProvider = false, const std::map<std::string, std::string> & overrides = std::map<std::string, std::string>(), pid_t * spawn_pid = NULL);
|
||||
JSON::Value getStreamConfig(const std::string & streamname);
|
||||
JSON::Value getInputBySource(const std::string & filename, bool isProvider = false);
|
||||
DTSC::Meta getStreamMeta(const std::string & streamname);
|
||||
uint8_t getStreamStatus(const std::string & streamname);
|
||||
}
|
||||
|
||||
|
|
|
@ -100,6 +100,8 @@ namespace Mist {
|
|||
return 0;
|
||||
}
|
||||
|
||||
INFO_MSG("Booting input for stream %s", streamName.c_str());
|
||||
|
||||
if (!checkArguments()) {
|
||||
FAIL_MSG("Setup failed - exiting");
|
||||
return 0;
|
||||
|
@ -152,7 +154,7 @@ namespace Mist {
|
|||
}
|
||||
//if the exit was clean, don't restart it
|
||||
if (WIFEXITED(status) && (WEXITSTATUS(status) == 0)){
|
||||
MEDIUM_MSG("Input for stream %s shut down cleanly", streamName.c_str());
|
||||
INFO_MSG("Input for stream %s shut down cleanly", streamName.c_str());
|
||||
break;
|
||||
}
|
||||
char pageName[NAME_BUFFER_SIZE];
|
||||
|
@ -359,7 +361,9 @@ namespace Mist {
|
|||
WARN_MSG("Stream already online, cancelling");
|
||||
return;
|
||||
}
|
||||
if (!Util::startInput(streamName, "push://INTERNAL_ONLY:"+config->getString("input"), true, true)) {//manually override stream url to start the buffer
|
||||
std::map<std::string, std::string> overrides;
|
||||
overrides["throughboot"] = "";
|
||||
if (!Util::startInput(streamName, "push://INTERNAL_ONLY:"+config->getString("input"), true, true, overrides)) {//manually override stream url to start the buffer
|
||||
pullLock.post();
|
||||
pullLock.close();
|
||||
pullLock.unlink();
|
||||
|
|
Loading…
Add table
Reference in a new issue