From 3fe345780ddefa63db02f3c11b7f3591a1c1f13e Mon Sep 17 00:00:00 2001
From: Thulinma <jaron@vietors.com>
Date: Wed, 17 Jul 2024 11:29:26 +0200
Subject: [PATCH] Allow setting processes as inconsequential, which means the
 stream will be considered fully active regardless of process status; add
 process start behaviour config group to all processes

---
 src/input/input_buffer.cpp       |  5 +--
 src/process/meson.build          |  8 ++---
 src/process/process.hpp          | 40 +++++++++++++++++++++
 src/process/process_av.cpp       | 10 ++----
 src/process/process_exec.cpp     | 12 ++-----
 src/process/process_ffmpeg.cpp   | 16 ++++-----
 src/process/process_livepeer.cpp | 62 ++++++++++++--------------------
 7 files changed, 79 insertions(+), 74 deletions(-)
 create mode 100644 src/process/process.hpp

diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp
index dc1ef4b0..9eac0ef8 100644
--- a/src/input/input_buffer.cpp
+++ b/src/input/input_buffer.cpp
@@ -682,7 +682,7 @@ namespace Mist{
       }
       if (tmp.isMember("track_inhibit")){
         std::set<size_t> wouldSelect = Util::wouldSelect(
-            M, std::string("audio=none&video=none&subtitle=none&") + tmp["track_inhibit"].asStringRef());
+            M, std::string("audio=none&video=none&subtitle=none&meta=none&") + tmp["track_inhibit"].asStringRef());
         if (wouldSelect.size()){
           // Inhibit if there is a match and we're not already running.
           if (!runningProcs.count(key)){continue;}
@@ -774,7 +774,8 @@ namespace Mist{
           argarr[3] = (char*)debugLvl.c_str();;
           argarr[4] = 0;
         }
-        allProcsRunning = false;
+        // Only count process as not-running if it's not inconsequential
+        if (!args.isMember("inconsequential") || !args["inconsequential"].asBool()){allProcsRunning = false;}
         INFO_MSG("Starting process: %s %s", argarr[0], argarr[1]);
         runningProcs[*newProcs.begin()] = Util::Procs::StartPiped(argarr, 0, 0, &err);
         // Increment per-process boot counter
diff --git a/src/process/meson.build b/src/process/meson.build
index c0fb1fb7..a553989e 100644
--- a/src/process/meson.build
+++ b/src/process/meson.build
@@ -14,7 +14,7 @@ process_common_dep = declare_dependency(link_with: process_common)
 executables += {
     'name' : 'MistProcFFMPEG',
     'sources' : [
-        files('process_ffmpeg.cpp'),
+        files('process.hpp', 'process_ffmpeg.cpp'),
         header_tgts
     ],
     'deps' :[libmist_dep, process_common_dep],
@@ -24,7 +24,7 @@ executables += {
 executables += {
     'name' : 'MistProcMKVExec',
     'sources' : [
-        files('process_exec.cpp'),
+        files('process.hpp', 'process_exec.cpp'),
         header_tgts
     ],
     'deps' :[libmist_dep, process_common_dep],
@@ -34,7 +34,7 @@ executables += {
 executables += {
     'name' : 'MistProcLivepeer',
     'sources' : [
-        files('process_livepeer.cpp'),
+        files('process.hpp', 'process_livepeer.cpp'),
         input_cpp,
         output_http_cpp,
         output_ts_base_cpp,
@@ -50,7 +50,7 @@ if get_option('WITH_AV')
     executables += {
         'name' : 'MistProcAV',
         'sources' : [
-            files('process_av.cpp'),
+            files('process.hpp', 'process_av.cpp'),
             header_tgts,
             io_cpp,
             input_cpp,
diff --git a/src/process/process.hpp b/src/process/process.hpp
new file mode 100644
index 00000000..61fdf7f7
--- /dev/null
+++ b/src/process/process.hpp
@@ -0,0 +1,40 @@
+#include <mist/json.h>
+
+void addGenericProcessOptions(JSON::Value & capa){
+  capa["optional"]["start_control"]["name"] = "Process start behaviour";
+  capa["optional"]["start_control"]["type"] = "group";
+  capa["optional"]["start_control"]["help"] = "Control when the process starts";
+  capa["optional"]["start_control"]["sort"] = "aaaa";
+  {
+    JSON::Value &grp = capa["optional"]["start_control"]["options"];
+    grp["restart_delay"]["name"] = "Restart delay";
+    grp["restart_delay"]["help"] = "The time in milliseconds between restarts. If set to 0 it will restart immediately";
+    grp["restart_delay"]["type"] = "uint";
+    grp["restart_delay"]["unit"] = "ms";
+    grp["restart_delay"]["default"] = 0;
+
+    grp["restart_type"]["name"] = "Restart behaviour";
+    grp["restart_type"]["help"] = "What to do when the process exits or fails for any reason. Fixed waits the restart delay every time. Exponential backoff will increase the delay up to max the configured delay for each restart. Disabled will not restart the process.";
+    grp["restart_type"]["type"] = "select";
+    grp["restart_type"]["select"][0u][0u] = "fixed";
+    grp["restart_type"]["select"][0u][1u] = "Fixed delay";
+    grp["restart_type"]["select"][1u][0u] = "backoff";
+    grp["restart_type"]["select"][1u][1u] = "Exponential backoff";
+    grp["restart_type"]["select"][2u][0u] = "disabled";
+    grp["restart_type"]["select"][2u][1u] = "Disabled";
+    grp["restart_type"]["value"] = "fixed";
+
+    grp["track_inhibit"]["name"] = "Track inhibitor(s)";
+    grp["track_inhibit"]["help"] =
+        "What tracks to use as inhibitors. If this track selector is able to select a track, the "
+        "process does not start. Defaults to none.";
+    grp["track_inhibit"]["type"] = "string";
+    grp["track_inhibit"]["validate"][0u] = "track_selector";
+    grp["track_inhibit"]["default"] = "audio=none&video=none&subtitle=none";
+
+    grp["inconsequential"]["name"] = "Inconsequential process";
+    grp["inconsequential"]["help"] = "If set, this process need not be running for a stream to be considered fully active.";
+    grp["inconsequential"]["default"] = false;
+  }
+}
+
diff --git a/src/process/process_av.cpp b/src/process/process_av.cpp
index 07be5211..ec58425a 100644
--- a/src/process/process_av.cpp
+++ b/src/process/process_av.cpp
@@ -1,4 +1,5 @@
 #include "process_av.h"
+#include "process.hpp"
 #include <mist/procs.h>
 #include <mist/h264.h>
 #include <mist/mp4_generic.h>
@@ -1788,6 +1789,7 @@ int main(int argc, char *argv[]){
     capa["name"] = "AV";
     capa["hrn"] = "Encoder: libav (ffmpeg library)";
     capa["desc"] = "Generic video encoder that directly integrates with the ffmpeg library rather than calling a binary and transmuxing twice";
+    addGenericProcessOptions(capa);
 
     capa["optional"]["source_mask"]["name"] = "Source track mask";
     capa["optional"]["source_mask"]["help"] = "What internal processes should have access to the source track(s)";
@@ -1844,14 +1846,6 @@ int main(int argc, char *argv[]){
     capa["optional"]["track_select"]["validate"][0u] = "track_selector";
     capa["optional"]["track_select"]["default"] = "audio=all&video=all";
 
-    capa["optional"]["track_inhibit"]["name"] = "Track inhibitor(s)";
-    capa["optional"]["track_inhibit"]["help"] =
-        "What tracks to use as inhibitors. If this track selector is able to select a track, the "
-        "process does not start. Defaults to none.";
-    capa["optional"]["track_inhibit"]["type"] = "string";
-    capa["optional"]["track_inhibit"]["validate"][0u] = "track_selector";
-    capa["optional"]["track_inhibit"]["default"] = "audio=none&video=none&subtitle=none";
-
     capa["optional"]["gopsize"]["name"] = "GOP Size";
     capa["optional"]["gopsize"]["help"] = "Amount of frames before a new keyframe is sent";
     capa["optional"]["gopsize"]["type"] = "uint";
diff --git a/src/process/process_exec.cpp b/src/process/process_exec.cpp
index 3d804cd1..6a6b27d5 100644
--- a/src/process/process_exec.cpp
+++ b/src/process/process_exec.cpp
@@ -1,6 +1,5 @@
 #include "process_exec.h"
-#include <algorithm> //for std::find
-#include <fstream>
+#include "process.hpp"
 #include <mist/procs.h>
 #include <mist/tinythread.h>
 #include <mist/util.h>
@@ -321,6 +320,7 @@ int main(int argc, char *argv[]){
 
     capa["name"] = "MKVExec";
     capa["desc"] = "Pipe MKV in, expect MKV out. You choose the executable in between yourself.";
+    addGenericProcessOptions(capa);
 
     capa["optional"]["source_mask"]["name"] = "Source track mask";
     capa["optional"]["source_mask"]["help"] = "What internal processes should have access to the source track(s)";
@@ -381,14 +381,6 @@ int main(int argc, char *argv[]){
     capa["optional"]["track_select"]["validate"][0u] = "track_selector";
     capa["optional"]["track_select"]["default"] = "audio=all&video=all";
 
-    capa["optional"]["track_inhibit"]["name"] = "Track inhibitor(s)";
-    capa["optional"]["track_inhibit"]["help"] =
-        "What tracks to use as inhibitors. If this track selector is able to select a track, the "
-        "process does not start. Defaults to none.";
-    capa["optional"]["track_inhibit"]["type"] = "string";
-    capa["optional"]["track_inhibit"]["validate"][0u] = "track_selector";
-    capa["optional"]["track_inhibit"]["default"] = "audio=none&video=none&subtitle=none";
-
     std::cout << capa.toString() << std::endl;
     return -1;
   }
diff --git a/src/process/process_ffmpeg.cpp b/src/process/process_ffmpeg.cpp
index 582cc3b6..75dc0419 100644
--- a/src/process/process_ffmpeg.cpp
+++ b/src/process/process_ffmpeg.cpp
@@ -1,6 +1,6 @@
 #include "process_ffmpeg.h"
+#include "process.hpp"
 #include <algorithm> //for std::find
-#include <fstream>
 #include <mist/defines.h>
 #include <mist/procs.h>
 #include <mist/stream.h>
@@ -235,6 +235,7 @@ int main(int argc, char *argv[]){
     capa["hrn"] = "Encoder: FFMPEG";                                     // human readable name
     capa["desc"] = "Use a local FFMPEG installed binary to do encoding"; // description
     capa["sort"] = "sort"; // sort the parameters by this key
+    addGenericProcessOptions(capa);
 
     capa["optional"]["source_mask"]["name"] = "Source track mask";
     capa["optional"]["source_mask"]["help"] = "What internal processes should have access to the source track(s)";
@@ -281,6 +282,10 @@ int main(int argc, char *argv[]){
     capa["optional"]["exit_unmask"]["default"] = false;
     capa["optional"]["exit_unmask"]["sort"] = "dda";
 
+    capa["optional"]["inconsequential"]["name"] = "Inconsequential process";
+    capa["optional"]["inconsequential"]["help"] = "If set, this process need not be running for a stream to be considered fully active.";
+    capa["optional"]["inconsequential"]["default"] = false;
+
     capa["required"]["x-LSP-kind"]["name"] = "Input type"; // human readable name of option
     capa["required"]["x-LSP-kind"]["help"] = "The type of input to use"; // extra information
     capa["required"]["x-LSP-kind"]["type"] = "select";          // type of input field to use
@@ -489,15 +494,6 @@ int main(int argc, char *argv[]){
     capa["optional"]["sources"]["sort"] = "baa";
     capa["optional"]["sources"]["dependent"]["x-LSP-kind"] = "video";
 
-    capa["optional"]["track_inhibit"]["name"] = "Track inhibitor(s)";
-    capa["optional"]["track_inhibit"]["sort"] = "aba";
-    capa["optional"]["track_inhibit"]["help"] =
-        "What tracks to use as inhibitors. If this track selector is able to select a track, the "
-        "process does not start. Defaults to none.";
-    capa["optional"]["track_inhibit"]["type"] = "string";
-    capa["optional"]["track_inhibit"]["validate"][0u] = "track_selector";
-    capa["optional"]["track_inhibit"]["default"] = "audio=none&video=none&subtitle=none";
-
     capa["codecs"][0u][0u].append("H264");
     capa["codecs"][0u][0u].append("HEVC");
     capa["codecs"][0u][0u].append("VP8");
diff --git a/src/process/process_livepeer.cpp b/src/process/process_livepeer.cpp
index e494567a..047fabdd 100644
--- a/src/process/process_livepeer.cpp
+++ b/src/process/process_livepeer.cpp
@@ -1,7 +1,6 @@
-#include <algorithm> //for std::find
-#include <fstream>
-#include <mist/timing.h>
 #include "process_livepeer.h"
+#include "process.hpp"
+#include <mist/timing.h>
 #include <mist/procs.h>
 #include <mist/util.h>
 #include <mist/downloader.h>
@@ -107,23 +106,25 @@ namespace Mist{
       }
       presegs[currPreSeg].data.append(tsData, len);
     };
-    virtual void initialSeek(){
+    virtual void initialSeek(bool dryRun = false){
       if (!meta){return;}
-      if (opt.isMember("source_mask") && !opt["source_mask"].isNull() && opt["source_mask"].asString() != ""){
-        uint64_t sourceMask = opt["source_mask"].asInt();
-        if (userSelect.size()){
-          for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
-            INFO_MSG("Masking source track %zu to %" PRIu64, it->first, sourceMask);
-            meta.validateTrack(it->first, sourceMask);
+      if (!dryRun){
+        if (opt.isMember("source_mask") && !opt["source_mask"].isNull() && opt["source_mask"].asString() != ""){
+          uint64_t sourceMask = opt["source_mask"].asInt();
+          if (userSelect.size()){
+            for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
+              INFO_MSG("Masking source track %zu to %" PRIu64, it->first, sourceMask);
+              meta.validateTrack(it->first, sourceMask);
+            }
           }
         }
+        if (!meta.getLive() || opt["leastlive"].asBool()){
+          INFO_MSG("Seeking to earliest point in stream");
+          seek(0);
+          return;
+        }
       }
-      if (!meta.getLive() || opt["leastlive"].asBool()){
-        INFO_MSG("Seeking to earliest point in stream");
-        seek(0);
-        return;
-      }
-      Output::initialSeek();
+      Output::initialSeek(dryRun);
     }
     void sendNext(){
       {
@@ -554,6 +555,7 @@ int main(int argc, char *argv[]){
 
     capa["name"] = "Livepeer";
     capa["desc"] = "Use livepeer to transcode video.";
+    addGenericProcessOptions(capa);
 
     capa["optional"]["source_mask"]["name"] = "Source track mask";
     capa["optional"]["source_mask"]["help"] = "What internal processes should have access to the source track(s)";
@@ -597,6 +599,10 @@ int main(int argc, char *argv[]){
     capa["optional"]["exit_unmask"]["help"] = "If/when the process exits or fails, the masks for input tracks will be reset to defaults. (NOT to previous value, but to defaults!)";
     capa["optional"]["exit_unmask"]["default"] = false;
 
+    capa["optional"]["inconsequential"]["name"] = "Inconsequential process";
+    capa["optional"]["inconsequential"]["help"] = "If set, this process need not be running for a stream to be considered fully active.";
+    capa["optional"]["inconsequential"]["default"] = false;
+
     capa["optional"]["sink"]["name"] = "Target stream";
     capa["optional"]["sink"]["help"] = "What stream the encoded track should be added to. Defaults "
                                        "to source stream. May contain variables.";
@@ -694,14 +700,6 @@ int main(int argc, char *argv[]){
       grp["track_inhibit"]["default"] = "audio=none&video=none&subtitle=none";
     }
 
-    capa["optional"]["track_inhibit"]["name"] = "Track inhibitor(s)";
-    capa["optional"]["track_inhibit"]["help"] =
-        "What tracks to use as inhibitors. If this track selector is able to select a track, the "
-        "process does not start. Defaults to none.";
-    capa["optional"]["track_inhibit"]["type"] = "string";
-    capa["optional"]["track_inhibit"]["validate"][0u] = "track_selector";
-    capa["optional"]["track_inhibit"]["default"] = "audio=none&video=none&subtitle=none";
-
     capa["optional"]["debug"]["name"] = "Debug level";
     capa["optional"]["debug"]["help"] = "The debug level at which messages need to be printed.";
     capa["optional"]["debug"]["type"] = "debug";
@@ -716,22 +714,6 @@ int main(int argc, char *argv[]){
     capa["ainfo"]["sinkTime"]["name"] = "Sink timestamp";
     capa["ainfo"]["sourceTime"]["name"] = "Source timestamp";
 
-    capa["optional"]["restart_delay"]["name"] = "Restart delay";
-    capa["optional"]["restart_delay"]["help"] = "The maximum amount of delay in milliseconds between restarts. If set to 0 it will restart immediately";
-    capa["optional"]["restart_delay"]["type"] = "int";
-    capa["optional"]["restart_delay"]["default"] = 0;
-
-    capa["optional"]["restart_type"]["name"] = "Restart behaviour";
-    capa["optional"]["restart_type"]["help"] = "When set to exponential backoff it will increase the delay up to the configured amount for each restart";
-    capa["optional"]["restart_type"]["type"] = "select";
-    capa["optional"]["restart_type"]["select"][0u][0u] = "fixed";
-    capa["optional"]["restart_type"]["select"][0u][1u] = "Fixed Delay";
-    capa["optional"]["restart_type"]["select"][1u][0u] = "backoff";
-    capa["optional"]["restart_type"]["select"][1u][1u] = "Exponential Backoff";
-    capa["optional"]["restart_type"]["select"][2u][0u] = "disabled";
-    capa["optional"]["restart_type"]["select"][2u][1u] = "Disabled";
-    capa["optional"]["restart_type"]["value"] = "fixed";
-
     std::cout << capa.toString() << std::endl;
     return -1;
   }