From 974380ab30c7b75a464d79053be27d6b357053ee Mon Sep 17 00:00:00 2001
From: Thulinma <jaron@vietors.com>
Date: Fri, 4 Dec 2020 16:26:57 +0100
Subject: [PATCH] Converted comms system entirely to being bitflag-based
 instead of integer state based

---
 lib/comms.cpp              | 14 +++++++++-----
 lib/comms.h                | 12 ++++++------
 src/input/input.cpp        |  4 ++--
 src/input/input_buffer.cpp |  4 ++--
 src/input/input_rtsp.cpp   |  4 ++--
 src/input/input_ts.cpp     |  2 +-
 src/output/output.cpp      |  6 +++---
 src/output/output_http.cpp |  2 +-
 src/output/output_rtsp.cpp |  2 +-
 src/utils/util_nuke.cpp    |  2 +-
 10 files changed, 28 insertions(+), 24 deletions(-)

diff --git a/lib/comms.cpp b/lib/comms.cpp
index 21dfc810..30863071 100644
--- a/lib/comms.cpp
+++ b/lib/comms.cpp
@@ -16,7 +16,9 @@ namespace Comms{
   }
 
   Comms::~Comms(){
-    if (index != INVALID_RECORD_INDEX){setStatus(COMM_STATUS_DISCONNECT);}
+    if (index != INVALID_RECORD_INDEX){
+      setStatus(COMM_STATUS_DISCONNECT | getStatus());
+    }
     if (master){
       if (dataPage.mapped){
         finishAll();
@@ -69,13 +71,13 @@ namespace Comms{
     do{
       keepGoing = false;
       for (size_t i = 0; i < recordCount(); i++){
-        if (getStatus(i) == COMM_STATUS_INVALID || getStatus(i) == COMM_STATUS_DISCONNECT){continue;}
+        if (getStatus(i) == COMM_STATUS_INVALID || (getStatus(i) & COMM_STATUS_DISCONNECT)){continue;}
         uint64_t cPid = getPid(i);
         if (cPid > 1){
           Util::Procs::Stop(cPid); // soft kill
           keepGoing = true;
         }
-        setStatus(COMM_STATUS_REQDISCONNECT, i);
+        setStatus(COMM_STATUS_REQDISCONNECT | getStatus(i), i);
       }
       if (keepGoing){Util::sleep(250);}
     }while (keepGoing && ++c < 8);
@@ -83,7 +85,7 @@ namespace Comms{
 
   Comms::operator bool() const{
     if (master){return dataPage;}
-    return dataPage && (getStatus() != COMM_STATUS_INVALID) && (getStatus() != COMM_STATUS_DISCONNECT);
+    return dataPage && (getStatus() != COMM_STATUS_INVALID) && !(getStatus() & COMM_STATUS_DISCONNECT);
   }
 
   void Comms::setMaster(bool _master){
@@ -142,7 +144,9 @@ namespace Comms{
   Statistics::Statistics() : Comms(){sem.open(SEM_STATISTICS, O_CREAT | O_RDWR, ACCESSPERMS, 1);}
 
   void Statistics::unload(){
-    if (index != INVALID_RECORD_INDEX){setStatus(COMM_STATUS_DISCONNECT);}
+    if (index != INVALID_RECORD_INDEX){
+      setStatus(COMM_STATUS_DISCONNECT | getStatus());
+    }
     index = INVALID_RECORD_INDEX;
   }
 
diff --git a/lib/comms.h b/lib/comms.h
index 7e863b2e..dc1c5757 100644
--- a/lib/comms.h
+++ b/lib/comms.h
@@ -3,12 +3,12 @@
 #include "shared_memory.h"
 #include "util.h"
 
-#define COMM_STATUS_DONOTTRACK 0x40
 #define COMM_STATUS_SOURCE 0x80
-#define COMM_STATUS_REQDISCONNECT 0xFD
-#define COMM_STATUS_DISCONNECT 0xFE
-#define COMM_STATUS_INVALID 0x0
+#define COMM_STATUS_DONOTTRACK 0x40
+#define COMM_STATUS_DISCONNECT 0x20
+#define COMM_STATUS_REQDISCONNECT 0x10
 #define COMM_STATUS_ACTIVE 0x1
+#define COMM_STATUS_INVALID 0x0
 
 
 #define COMM_LOOP(comm, onActive, onDisconnect) \
@@ -16,10 +16,10 @@
     for (size_t id = 0; id < comm.recordCount(); id++){\
       if (comm.getStatus(id) == COMM_STATUS_INVALID){continue;}\
       if (!Util::Procs::isRunning(comm.getPid(id))){\
-        comm.setStatus(COMM_STATUS_DISCONNECT, id);\
+        comm.setStatus(COMM_STATUS_DISCONNECT | comm.getStatus(id), id);\
       }\
       onActive;\
-      if (comm.getStatus(id) == COMM_STATUS_DISCONNECT){\
+      if (comm.getStatus(id) & COMM_STATUS_DISCONNECT){\
         onDisconnect;\
         comm.setStatus(COMM_STATUS_INVALID, id);\
       }\
diff --git a/src/input/input.cpp b/src/input/input.cpp
index 4ce51d33..ce50bc78 100644
--- a/src/input/input.cpp
+++ b/src/input/input.cpp
@@ -860,7 +860,7 @@ namespace Mist{
       userSelect[idx].reload(streamName, idx, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK);
     }
     while (thisPacket && config->is_active && userSelect[idx]){
-      if (userSelect[idx].getStatus() == COMM_STATUS_REQDISCONNECT){
+      if (userSelect[idx].getStatus() & COMM_STATUS_REQDISCONNECT){
         Util::logExitReason("buffer requested shutdown");
         break;
       }
@@ -933,7 +933,7 @@ namespace Mist{
         // Connect to stats for INPUT detection
         if (!statComm){statComm.reload();}
         if (statComm){
-          if (statComm.getStatus() == COMM_STATUS_REQDISCONNECT){
+          if (statComm.getStatus() & COMM_STATUS_REQDISCONNECT){
             config->is_active = false;
             Util::logExitReason("received shutdown request from controller");
             return;
diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp
index 01835d5c..96b7e363 100644
--- a/src/input/input_buffer.cpp
+++ b/src/input/input_buffer.cpp
@@ -393,7 +393,7 @@ namespace Mist{
       if (!(users.getStatus(i) & COMM_STATUS_SOURCE)){continue;}
       if (users.getTrack(i) != tid){continue;}
       // We have found the right track here (pid matches, and COMM_STATUS_SOURCE set)
-      users.setStatus(COMM_STATUS_REQDISCONNECT, i);
+      users.setStatus(COMM_STATUS_REQDISCONNECT | users.getStatus(i), i);
       break;
     }
 
@@ -518,7 +518,7 @@ namespace Mist{
   void inputBuffer::userOnActive(size_t id){
     ///\todo Add tracing of earliest watched keys, to prevent data going out of memory for
     /// still-watching viewers
-    if (users.getStatus(id) != COMM_STATUS_DISCONNECT && users.getStatus(id) & COMM_STATUS_SOURCE){
+    if (!(users.getStatus(id) & COMM_STATUS_DISCONNECT) && (users.getStatus(id) & COMM_STATUS_SOURCE)){
       sourcePids[users.getPid(id)].insert(users.getTrack(id));
       // GeneratePids holds the pids of the process that generate data, so ignore those for determining if a push is ingested.
       if (M.trackValid(users.getTrack(id)) && !generatePids.count(users.getPid(id))){hasPush = true;}
diff --git a/src/input/input_rtsp.cpp b/src/input/input_rtsp.cpp
index c14543ba..ce48c0ef 100644
--- a/src/input/input_rtsp.cpp
+++ b/src/input/input_rtsp.cpp
@@ -212,7 +212,7 @@ namespace Mist{
         // Connect to stats for INPUT detection
         statComm.reload();
         if (statComm){
-          if (statComm.getStatus() == COMM_STATUS_REQDISCONNECT){
+          if (statComm.getStatus() & COMM_STATUS_REQDISCONNECT){
             config->is_active = false;
             Util::logExitReason("received shutdown request from controller");
             return;
@@ -403,7 +403,7 @@ namespace Mist{
         WARN_MSG("Reloading track %zu, index %zu", pkt.getTrackId(), idx);
         userSelect[idx].reload(streamName, idx, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK);
       }
-      if (userSelect[idx].getStatus() == COMM_STATUS_REQDISCONNECT){
+      if (userSelect[idx].getStatus() & COMM_STATUS_REQDISCONNECT){
         Util::logExitReason("buffer requested shutdown");
         tcpCon.close();
       }
diff --git a/src/input/input_ts.cpp b/src/input/input_ts.cpp
index c32c13cf..28ccc825 100644
--- a/src/input/input_ts.cpp
+++ b/src/input/input_ts.cpp
@@ -146,7 +146,7 @@ void parseThread(void *mistIn){
     threadTimer.erase(tid);
   }
   liveStream.eraseTrack(tid);
-  if (dataTrack && userConn){userConn.setStatus(COMM_STATUS_DISCONNECT);}
+  if (dataTrack && userConn){userConn.setStatus(COMM_STATUS_DISCONNECT | userConn.getStatus());}
 }
 
 namespace Mist{
diff --git a/src/output/output.cpp b/src/output/output.cpp
index e8771cc8..c8db21e0 100644
--- a/src/output/output.cpp
+++ b/src/output/output.cpp
@@ -1365,7 +1365,7 @@ namespace Mist{
     /*LTS-END*/
 
     stats(true);
-    if (statComm){statComm.setStatus(COMM_STATUS_DISCONNECT);}
+    if (statComm){statComm.setStatus(COMM_STATUS_DISCONNECT | statComm.getStatus());}
 
     userSelect.clear();
 
@@ -1676,7 +1676,7 @@ namespace Mist{
     HIGH_MSG("Writing stats: %s, %s, %u, %lu, %lu", getConnectedHost().c_str(), streamName.c_str(),
              crc & 0xFFFFFFFFu, myConn.dataUp(), myConn.dataDown());
     /*LTS-START*/
-    if (statComm.getStatus() == COMM_STATUS_REQDISCONNECT){
+    if (statComm.getStatus() & COMM_STATUS_REQDISCONNECT){
       onFail("Shutting down on controller request");
       return;
     }
@@ -1709,7 +1709,7 @@ namespace Mist{
 
     if (isPushing()){
       for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
-        if (it->second.getStatus() == COMM_STATUS_REQDISCONNECT){
+        if (it->second.getStatus() & COMM_STATUS_REQDISCONNECT){
           if (dropPushTrack(it->second.getTrack(), "disconnect request from buffer")){break;}
         }
         if (!it->second){
diff --git a/src/output/output_http.cpp b/src/output/output_http.cpp
index c6d8c802..8a1c3bd1 100644
--- a/src/output/output_http.cpp
+++ b/src/output/output_http.cpp
@@ -227,7 +227,7 @@ namespace Mist{
                    streamName.c_str(), handler.c_str(), H.GetVar("stream").c_str());
         streamName = H.GetVar("stream");
         userSelect.clear();
-        if (statComm){statComm.setStatus(COMM_STATUS_DISCONNECT);}
+        if (statComm){statComm.setStatus(COMM_STATUS_DISCONNECT | statComm.getStatus());}
         reConnector(handler);
         onFail("Server error - could not start connector", true);
         return;
diff --git a/src/output/output_rtsp.cpp b/src/output/output_rtsp.cpp
index 433a219d..4acd29fc 100644
--- a/src/output/output_rtsp.cpp
+++ b/src/output/output_rtsp.cpp
@@ -361,7 +361,7 @@ namespace Mist{
         HTTP_S.SetHeader("Cache-Control", "no-cache");
         if (trackNo != INVALID_TRACK_ID){
           userSelect[trackNo].reload(streamName, trackNo);
-          if (isPushing()){userSelect[trackNo].setStatus(COMM_STATUS_SOURCE);}
+          if (isPushing()){userSelect[trackNo].setStatus(COMM_STATUS_SOURCE | userSelect[trackNo].getStatus());}
           SDP::Track &sdpTrack = sdpState.tracks[trackNo];
           if (sdpTrack.channel != -1){expectTCP = true;}
           HTTP_S.SetHeader("Transport", sdpTrack.transportString);
diff --git a/src/utils/util_nuke.cpp b/src/utils/util_nuke.cpp
index 1358783b..c4fbc944 100644
--- a/src/utils/util_nuke.cpp
+++ b/src/utils/util_nuke.cpp
@@ -138,7 +138,7 @@ int main(int argc, char **argv){
     for (size_t i = 0; i < cleanUsers.recordCount(); ++i){
       uint8_t status = cleanUsers.getStatus(i);
       cleanUsers.setStatus(COMM_STATUS_INVALID, i);
-      if (status != COMM_STATUS_INVALID && status != COMM_STATUS_DISCONNECT){
+      if (status != COMM_STATUS_INVALID && !(status & COMM_STATUS_DISCONNECT)){
         pid_t pid = cleanUsers.getPid(i);
         if (pid > 1){
           Util::Procs::Stop(pid);