Added autostarting for processes through buffer

This commit is contained in:
Thulinma 2018-08-29 13:13:57 +02:00
parent 637c57be5e
commit fff62656ba
2 changed files with 108 additions and 0 deletions

View file

@ -8,6 +8,9 @@
#include <cstdio>
#include <string>
#include <mist/stream.h>
#include <mist/procs.h>
#include <mist/util.h>
#include <mist/langcodes.h>
#include <mist/defines.h>
#include <mist/triggers.h>
#include <mist/bitfields.h>
@ -1097,9 +1100,111 @@ namespace Mist {
INFO_MSG("Setting segmentSize from %u to new value of %lli", segmentSize, tmpNum);
segmentSize = tmpNum;
}
if (streamCfg){
JSON::Value configuredProcesses = streamCfg.getMember("processes").asJSON();
checkProcesses(configuredProcesses);
}
/*LTS-END*/
return true;
}
uint64_t inputBuffer::findTrack(const std::string &trackVal){
if (!trackVal.size() || trackVal == "0"){return 0;}//don't select anything in particular
if (trackVal.find(',') != std::string::npos){
//Comma-separated list, recurse.
std::stringstream ss(trackVal);
std::string item;
while (std::getline(ss, item, ',')){
uint64_t r = findTrack(item);
if (r){return r;}//return first match
}
return 0;//nothing found
}
size_t trackNo = JSON::Value(trackVal).asInt();
if (trackVal == JSON::Value(trackNo).asString()){
//It's an integer number
if (!myMeta.tracks.count(trackNo)){
return 0;
}
return trackNo;
}
std::string trackLow = trackVal;
Util::stringToLower(trackLow);
if (trackLow == "all" || trackLow == "*"){
//select all tracks of this type
if (!myMeta.tracks.size()){return 0;}
return myMeta.tracks.begin()->first;
}
//attempt to do language/codec matching
//convert 2-character language codes into 3-character language codes
if (trackLow.size() == 2){trackLow = Encodings::ISO639::twoToThree(trackLow);}
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
const DTSC::Track & Trk = it->second;
std::string codecLow = Trk.codec;
Util::stringToLower(codecLow);
if (Trk.lang == trackLow || trackLow == codecLow){
return it->first;
}
}
return 0;
}
/*LTS-START*/
///Checks if all processes are running, starts them if needed, stops them if needed
void inputBuffer::checkProcesses(const JSON::Value & procs){
std::set<std::string> newProcs;
// used for building args
int zero = 0;
int out = fileno(stdout);
int err = fileno(stderr);
char * argarr[2]; // approx max # of args (with a wide margin)
//Convert to strings
jsonForEachConst(procs, it){
JSON::Value tmp = *it;
tmp["source"] = streamName;
if (tmp.isMember("source_track") && findTrack(tmp["source_track"].asString()) == 0){
//No match - skip this process
continue;
}
newProcs.insert(tmp.toString());
}
//shut down deleted/changed processes
std::map<std::string, pid_t>::iterator it;
if (runningProcs.size()){
for (it = runningProcs.begin(); it != runningProcs.end(); it++){
if (!newProcs.count(it->first)){
if (Util::Procs::isActive(it->second)){
INFO_MSG("Stopping process %llu: %s", it->second, it->first.c_str());
Util::Procs::Stop(it->second);
}
runningProcs.erase(it);
if (!runningProcs.size()){
break;
}
it = runningProcs.begin();
}
}
}
//start up new/changed connectors
while (newProcs.size() && config->is_active){
std::string config = (*newProcs.begin());
if (!runningProcs.count(config) || !Util::Procs::isActive(runningProcs[config])){
std::string procname = Util::getMyPath() + "MistProc" + JSON::fromString(config)["process"].asString();
argarr[0] = (char*)procname.c_str();
argarr[1] = (char*)config.c_str();
argarr[2] = 0;
INFO_MSG("Starting process: %s %s", argarr[0], argarr[1]);
runningProcs[*newProcs.begin()] = Util::Procs::StartPiped(argarr, &zero, &out, &err);
}
newProcs.erase(newProcs.begin());
}
}
/*LTS-END*/
}

View file

@ -46,6 +46,9 @@ namespace Mist {
inputBuffer * singleton;
//This is used for an ugly fix to prevent metadata from disappearing in some cases.
std::map<unsigned long, std::string> initData;
uint64_t findTrack(const std::string &trackVal);
void checkProcesses(const JSON::Value & procs); //LTS
std::map<std::string, pid_t> runningProcs;//LTS
};
}