From 9f97610e470c0c1ab849f851d3d534371056aa2b Mon Sep 17 00:00:00 2001 From: Marco van Dijk Date: Mon, 10 Jul 2023 15:20:01 +0200 Subject: [PATCH] Add configurable restart behaviour for processes --- src/input/input_buffer.cpp | 51 ++++++++++++++++++++++++++++++-- src/input/input_buffer.h | 2 ++ src/process/process_livepeer.cpp | 15 ++++++++++ 3 files changed, 65 insertions(+), 3 deletions(-) diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index d75b86f0..dc1ef4b0 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -655,6 +655,7 @@ namespace Mist{ allProcsRunning = true; if (!M.getValidTracks().size()){return;} std::set newProcs; + uint64_t now = Util::bootMS(); //< Used for delayed starts // used for building args int err = fileno(stderr); @@ -664,6 +665,7 @@ namespace Mist{ jsonForEachConst(procs, it){ JSON::Value tmp = *it; tmp["source"] = streamName; + std::string key = tmp.toString(); if (!M.getValidTracks().size() && (!tmp.isMember("source_track") && !tmp.isMember("track_select"))){ continue; @@ -683,9 +685,9 @@ namespace Mist{ M, std::string("audio=none&video=none&subtitle=none&") + tmp["track_inhibit"].asStringRef()); if (wouldSelect.size()){ // Inhibit if there is a match and we're not already running. - if (!runningProcs.count(tmp.toString())){continue;} + if (!runningProcs.count(key)){continue;} bool inhibited = false; - std::set myTracks = M.getMySourceTracks(runningProcs[tmp.toString()]); + std::set myTracks = M.getMySourceTracks(runningProcs[key]); // Also inhibit if there is a match with not-the-currently-running-process for (std::set::iterator it = wouldSelect.begin(); it != wouldSelect.end(); ++it){ if (!myTracks.count(*it)){inhibited = true;} @@ -693,7 +695,8 @@ namespace Mist{ if (inhibited){continue;} } } - newProcs.insert(tmp.toString()); + // Mark process as should-be-active + newProcs.insert(key); } // shut down deleted/changed processes @@ -706,6 +709,9 @@ namespace Mist{ Util::Procs::Stop(it->second); } runningProcs.erase(it); + // If we stop a process this way, reset it's counter and delayed start time + procBoots.erase(it->first); + procNextBoot.erase(it->first); if (!runningProcs.size()){break;} it = runningProcs.begin(); } @@ -718,6 +724,41 @@ namespace Mist{ const std::string & config = (*newProcs.begin()); JSON::Value args = JSON::fromString(config); if (!runningProcs.count(config) || !Util::Procs::isActive(runningProcs[config])){ + // Check restart behaviour - default to instant (re)starts + std::string restartType = "fixed"; + uint64_t restartDelay = 0; + if (args.isMember("restart_type")){ + restartType = args["restart_type"].asString(); + } + if (args.isMember("restart_delay")){ + restartDelay = args["restart_delay"].asInt(); + } + + // Skip if restarts are disabled and this buffer has already booted an instance of this process + if (restartType == "disabled" && procBoots[config]){ + VERYHIGH_MSG("Skipping process `%s`, as restarts are disabled", args["process"].asString().c_str()); + newProcs.erase(newProcs.begin()); + continue; + } + // Apply any delayed start time if we've booted before + if (restartDelay && procBoots[config] && !procNextBoot[config]){ + if (restartType == "fixed"){ + procNextBoot[config] = now + restartDelay; + }else if (restartType == "backoff"){ + uint64_t thisTries = procBoots[config]; + if (thisTries > 10){ + thisTries = 10; + } + procNextBoot[config] = now + Util::expBackoffMs(thisTries, 10, restartDelay); + } + } + // Skip if we have a delayed start time + if (procNextBoot[config] > now){ + VERYHIGH_MSG("Delaying start of process `%s`, %lu ms remaining", args["process"].asString().c_str(), procNextBoot[config] - now); + newProcs.erase(newProcs.begin()); + continue; + } + std::string procname = Util::getMyPath() + "MistProc" + JSON::fromString(config)["process"].asString(); argarr[0] = (char *)procname.c_str(); @@ -736,6 +777,10 @@ namespace Mist{ allProcsRunning = false; INFO_MSG("Starting process: %s %s", argarr[0], argarr[1]); runningProcs[*newProcs.begin()] = Util::Procs::StartPiped(argarr, 0, 0, &err); + // Increment per-process boot counter + procBoots[*newProcs.begin()]++; + // Remove the delayed start counter + procNextBoot.erase(*newProcs.begin()); } newProcs.erase(newProcs.begin()); } diff --git a/src/input/input_buffer.h b/src/input/input_buffer.h index 9e53b26b..52c2fd88 100644 --- a/src/input/input_buffer.h +++ b/src/input/input_buffer.h @@ -53,6 +53,8 @@ namespace Mist{ uint64_t findTrack(const std::string &trackVal); void checkProcesses(const JSON::Value &procs); // LTS std::map runningProcs; // LTS + std::map procBoots; + std::map procNextBoot; std::set generatePids; std::map sourcePids; diff --git a/src/process/process_livepeer.cpp b/src/process/process_livepeer.cpp index 54a77e37..e494567a 100644 --- a/src/process/process_livepeer.cpp +++ b/src/process/process_livepeer.cpp @@ -716,6 +716,21 @@ int main(int argc, char *argv[]){ capa["ainfo"]["sinkTime"]["name"] = "Sink timestamp"; capa["ainfo"]["sourceTime"]["name"] = "Source timestamp"; + capa["optional"]["restart_delay"]["name"] = "Restart delay"; + capa["optional"]["restart_delay"]["help"] = "The maximum amount of delay in milliseconds between restarts. If set to 0 it will restart immediately"; + capa["optional"]["restart_delay"]["type"] = "int"; + capa["optional"]["restart_delay"]["default"] = 0; + + capa["optional"]["restart_type"]["name"] = "Restart behaviour"; + capa["optional"]["restart_type"]["help"] = "When set to exponential backoff it will increase the delay up to the configured amount for each restart"; + capa["optional"]["restart_type"]["type"] = "select"; + capa["optional"]["restart_type"]["select"][0u][0u] = "fixed"; + capa["optional"]["restart_type"]["select"][0u][1u] = "Fixed Delay"; + capa["optional"]["restart_type"]["select"][1u][0u] = "backoff"; + capa["optional"]["restart_type"]["select"][1u][1u] = "Exponential Backoff"; + capa["optional"]["restart_type"]["select"][2u][0u] = "disabled"; + capa["optional"]["restart_type"]["select"][2u][1u] = "Disabled"; + capa["optional"]["restart_type"]["value"] = "fixed"; std::cout << capa.toString() << std::endl; return -1;