Add configurable restart behaviour for processes

This commit is contained in:
Marco van Dijk 2023-07-10 15:20:01 +02:00 committed by Thulinma
parent 19f2388500
commit 9f97610e47
3 changed files with 65 additions and 3 deletions

View file

@ -655,6 +655,7 @@ namespace Mist{
allProcsRunning = true; allProcsRunning = true;
if (!M.getValidTracks().size()){return;} if (!M.getValidTracks().size()){return;}
std::set<std::string> newProcs; std::set<std::string> newProcs;
uint64_t now = Util::bootMS(); //< Used for delayed starts
// used for building args // used for building args
int err = fileno(stderr); int err = fileno(stderr);
@ -664,6 +665,7 @@ namespace Mist{
jsonForEachConst(procs, it){ jsonForEachConst(procs, it){
JSON::Value tmp = *it; JSON::Value tmp = *it;
tmp["source"] = streamName; tmp["source"] = streamName;
std::string key = tmp.toString();
if (!M.getValidTracks().size() && if (!M.getValidTracks().size() &&
(!tmp.isMember("source_track") && !tmp.isMember("track_select"))){ (!tmp.isMember("source_track") && !tmp.isMember("track_select"))){
continue; continue;
@ -683,9 +685,9 @@ namespace Mist{
M, std::string("audio=none&video=none&subtitle=none&") + tmp["track_inhibit"].asStringRef()); M, std::string("audio=none&video=none&subtitle=none&") + tmp["track_inhibit"].asStringRef());
if (wouldSelect.size()){ if (wouldSelect.size()){
// Inhibit if there is a match and we're not already running. // Inhibit if there is a match and we're not already running.
if (!runningProcs.count(tmp.toString())){continue;} if (!runningProcs.count(key)){continue;}
bool inhibited = false; bool inhibited = false;
std::set<size_t> myTracks = M.getMySourceTracks(runningProcs[tmp.toString()]); std::set<size_t> myTracks = M.getMySourceTracks(runningProcs[key]);
// Also inhibit if there is a match with not-the-currently-running-process // Also inhibit if there is a match with not-the-currently-running-process
for (std::set<size_t>::iterator it = wouldSelect.begin(); it != wouldSelect.end(); ++it){ for (std::set<size_t>::iterator it = wouldSelect.begin(); it != wouldSelect.end(); ++it){
if (!myTracks.count(*it)){inhibited = true;} if (!myTracks.count(*it)){inhibited = true;}
@ -693,7 +695,8 @@ namespace Mist{
if (inhibited){continue;} if (inhibited){continue;}
} }
} }
newProcs.insert(tmp.toString()); // Mark process as should-be-active
newProcs.insert(key);
} }
// shut down deleted/changed processes // shut down deleted/changed processes
@ -706,6 +709,9 @@ namespace Mist{
Util::Procs::Stop(it->second); Util::Procs::Stop(it->second);
} }
runningProcs.erase(it); runningProcs.erase(it);
// If we stop a process this way, reset it's counter and delayed start time
procBoots.erase(it->first);
procNextBoot.erase(it->first);
if (!runningProcs.size()){break;} if (!runningProcs.size()){break;}
it = runningProcs.begin(); it = runningProcs.begin();
} }
@ -718,6 +724,41 @@ namespace Mist{
const std::string & config = (*newProcs.begin()); const std::string & config = (*newProcs.begin());
JSON::Value args = JSON::fromString(config); JSON::Value args = JSON::fromString(config);
if (!runningProcs.count(config) || !Util::Procs::isActive(runningProcs[config])){ if (!runningProcs.count(config) || !Util::Procs::isActive(runningProcs[config])){
// Check restart behaviour - default to instant (re)starts
std::string restartType = "fixed";
uint64_t restartDelay = 0;
if (args.isMember("restart_type")){
restartType = args["restart_type"].asString();
}
if (args.isMember("restart_delay")){
restartDelay = args["restart_delay"].asInt();
}
// Skip if restarts are disabled and this buffer has already booted an instance of this process
if (restartType == "disabled" && procBoots[config]){
VERYHIGH_MSG("Skipping process `%s`, as restarts are disabled", args["process"].asString().c_str());
newProcs.erase(newProcs.begin());
continue;
}
// Apply any delayed start time if we've booted before
if (restartDelay && procBoots[config] && !procNextBoot[config]){
if (restartType == "fixed"){
procNextBoot[config] = now + restartDelay;
}else if (restartType == "backoff"){
uint64_t thisTries = procBoots[config];
if (thisTries > 10){
thisTries = 10;
}
procNextBoot[config] = now + Util::expBackoffMs(thisTries, 10, restartDelay);
}
}
// Skip if we have a delayed start time
if (procNextBoot[config] > now){
VERYHIGH_MSG("Delaying start of process `%s`, %lu ms remaining", args["process"].asString().c_str(), procNextBoot[config] - now);
newProcs.erase(newProcs.begin());
continue;
}
std::string procname = std::string procname =
Util::getMyPath() + "MistProc" + JSON::fromString(config)["process"].asString(); Util::getMyPath() + "MistProc" + JSON::fromString(config)["process"].asString();
argarr[0] = (char *)procname.c_str(); argarr[0] = (char *)procname.c_str();
@ -736,6 +777,10 @@ namespace Mist{
allProcsRunning = false; allProcsRunning = false;
INFO_MSG("Starting process: %s %s", argarr[0], argarr[1]); INFO_MSG("Starting process: %s %s", argarr[0], argarr[1]);
runningProcs[*newProcs.begin()] = Util::Procs::StartPiped(argarr, 0, 0, &err); runningProcs[*newProcs.begin()] = Util::Procs::StartPiped(argarr, 0, 0, &err);
// Increment per-process boot counter
procBoots[*newProcs.begin()]++;
// Remove the delayed start counter
procNextBoot.erase(*newProcs.begin());
} }
newProcs.erase(newProcs.begin()); newProcs.erase(newProcs.begin());
} }

View file

@ -53,6 +53,8 @@ namespace Mist{
uint64_t findTrack(const std::string &trackVal); uint64_t findTrack(const std::string &trackVal);
void checkProcesses(const JSON::Value &procs); // LTS void checkProcesses(const JSON::Value &procs); // LTS
std::map<std::string, pid_t> runningProcs; // LTS std::map<std::string, pid_t> runningProcs; // LTS
std::map<std::string, uint32_t> procBoots;
std::map<std::string, uint64_t> procNextBoot;
std::set<size_t> generatePids; std::set<size_t> generatePids;
std::map<size_t, size_t> sourcePids; std::map<size_t, size_t> sourcePids;

View file

@ -716,6 +716,21 @@ int main(int argc, char *argv[]){
capa["ainfo"]["sinkTime"]["name"] = "Sink timestamp"; capa["ainfo"]["sinkTime"]["name"] = "Sink timestamp";
capa["ainfo"]["sourceTime"]["name"] = "Source timestamp"; capa["ainfo"]["sourceTime"]["name"] = "Source timestamp";
capa["optional"]["restart_delay"]["name"] = "Restart delay";
capa["optional"]["restart_delay"]["help"] = "The maximum amount of delay in milliseconds between restarts. If set to 0 it will restart immediately";
capa["optional"]["restart_delay"]["type"] = "int";
capa["optional"]["restart_delay"]["default"] = 0;
capa["optional"]["restart_type"]["name"] = "Restart behaviour";
capa["optional"]["restart_type"]["help"] = "When set to exponential backoff it will increase the delay up to the configured amount for each restart";
capa["optional"]["restart_type"]["type"] = "select";
capa["optional"]["restart_type"]["select"][0u][0u] = "fixed";
capa["optional"]["restart_type"]["select"][0u][1u] = "Fixed Delay";
capa["optional"]["restart_type"]["select"][1u][0u] = "backoff";
capa["optional"]["restart_type"]["select"][1u][1u] = "Exponential Backoff";
capa["optional"]["restart_type"]["select"][2u][0u] = "disabled";
capa["optional"]["restart_type"]["select"][2u][1u] = "Disabled";
capa["optional"]["restart_type"]["value"] = "fixed";
std::cout << capa.toString() << std::endl; std::cout << capa.toString() << std::endl;
return -1; return -1;