diff --git a/lib/comms.cpp b/lib/comms.cpp index 21dfc810..30863071 100644 --- a/lib/comms.cpp +++ b/lib/comms.cpp @@ -16,7 +16,9 @@ namespace Comms{ } Comms::~Comms(){ - if (index != INVALID_RECORD_INDEX){setStatus(COMM_STATUS_DISCONNECT);} + if (index != INVALID_RECORD_INDEX){ + setStatus(COMM_STATUS_DISCONNECT | getStatus()); + } if (master){ if (dataPage.mapped){ finishAll(); @@ -69,13 +71,13 @@ namespace Comms{ do{ keepGoing = false; for (size_t i = 0; i < recordCount(); i++){ - if (getStatus(i) == COMM_STATUS_INVALID || getStatus(i) == COMM_STATUS_DISCONNECT){continue;} + if (getStatus(i) == COMM_STATUS_INVALID || (getStatus(i) & COMM_STATUS_DISCONNECT)){continue;} uint64_t cPid = getPid(i); if (cPid > 1){ Util::Procs::Stop(cPid); // soft kill keepGoing = true; } - setStatus(COMM_STATUS_REQDISCONNECT, i); + setStatus(COMM_STATUS_REQDISCONNECT | getStatus(i), i); } if (keepGoing){Util::sleep(250);} }while (keepGoing && ++c < 8); @@ -83,7 +85,7 @@ namespace Comms{ Comms::operator bool() const{ if (master){return dataPage;} - return dataPage && (getStatus() != COMM_STATUS_INVALID) && (getStatus() != COMM_STATUS_DISCONNECT); + return dataPage && (getStatus() != COMM_STATUS_INVALID) && !(getStatus() & COMM_STATUS_DISCONNECT); } void Comms::setMaster(bool _master){ @@ -142,7 +144,9 @@ namespace Comms{ Statistics::Statistics() : Comms(){sem.open(SEM_STATISTICS, O_CREAT | O_RDWR, ACCESSPERMS, 1);} void Statistics::unload(){ - if (index != INVALID_RECORD_INDEX){setStatus(COMM_STATUS_DISCONNECT);} + if (index != INVALID_RECORD_INDEX){ + setStatus(COMM_STATUS_DISCONNECT | getStatus()); + } index = INVALID_RECORD_INDEX; } diff --git a/lib/comms.h b/lib/comms.h index 7e863b2e..dc1c5757 100644 --- a/lib/comms.h +++ b/lib/comms.h @@ -3,12 +3,12 @@ #include "shared_memory.h" #include "util.h" -#define COMM_STATUS_DONOTTRACK 0x40 #define COMM_STATUS_SOURCE 0x80 -#define COMM_STATUS_REQDISCONNECT 0xFD -#define COMM_STATUS_DISCONNECT 0xFE -#define COMM_STATUS_INVALID 0x0 +#define COMM_STATUS_DONOTTRACK 0x40 +#define COMM_STATUS_DISCONNECT 0x20 +#define COMM_STATUS_REQDISCONNECT 0x10 #define COMM_STATUS_ACTIVE 0x1 +#define COMM_STATUS_INVALID 0x0 #define COMM_LOOP(comm, onActive, onDisconnect) \ @@ -16,10 +16,10 @@ for (size_t id = 0; id < comm.recordCount(); id++){\ if (comm.getStatus(id) == COMM_STATUS_INVALID){continue;}\ if (!Util::Procs::isRunning(comm.getPid(id))){\ - comm.setStatus(COMM_STATUS_DISCONNECT, id);\ + comm.setStatus(COMM_STATUS_DISCONNECT | comm.getStatus(id), id);\ }\ onActive;\ - if (comm.getStatus(id) == COMM_STATUS_DISCONNECT){\ + if (comm.getStatus(id) & COMM_STATUS_DISCONNECT){\ onDisconnect;\ comm.setStatus(COMM_STATUS_INVALID, id);\ }\ diff --git a/src/input/input.cpp b/src/input/input.cpp index 4ce51d33..ce50bc78 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -860,7 +860,7 @@ namespace Mist{ userSelect[idx].reload(streamName, idx, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); } while (thisPacket && config->is_active && userSelect[idx]){ - if (userSelect[idx].getStatus() == COMM_STATUS_REQDISCONNECT){ + if (userSelect[idx].getStatus() & COMM_STATUS_REQDISCONNECT){ Util::logExitReason("buffer requested shutdown"); break; } @@ -933,7 +933,7 @@ namespace Mist{ // Connect to stats for INPUT detection if (!statComm){statComm.reload();} if (statComm){ - if (statComm.getStatus() == COMM_STATUS_REQDISCONNECT){ + if (statComm.getStatus() & COMM_STATUS_REQDISCONNECT){ config->is_active = false; Util::logExitReason("received shutdown request from controller"); return; diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index 01835d5c..96b7e363 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -393,7 +393,7 @@ namespace Mist{ if (!(users.getStatus(i) & COMM_STATUS_SOURCE)){continue;} if (users.getTrack(i) != tid){continue;} // We have found the right track here (pid matches, and COMM_STATUS_SOURCE set) - users.setStatus(COMM_STATUS_REQDISCONNECT, i); + users.setStatus(COMM_STATUS_REQDISCONNECT | users.getStatus(i), i); break; } @@ -518,7 +518,7 @@ namespace Mist{ void inputBuffer::userOnActive(size_t id){ ///\todo Add tracing of earliest watched keys, to prevent data going out of memory for /// still-watching viewers - if (users.getStatus(id) != COMM_STATUS_DISCONNECT && users.getStatus(id) & COMM_STATUS_SOURCE){ + if (!(users.getStatus(id) & COMM_STATUS_DISCONNECT) && (users.getStatus(id) & COMM_STATUS_SOURCE)){ sourcePids[users.getPid(id)].insert(users.getTrack(id)); // GeneratePids holds the pids of the process that generate data, so ignore those for determining if a push is ingested. if (M.trackValid(users.getTrack(id)) && !generatePids.count(users.getPid(id))){hasPush = true;} diff --git a/src/input/input_rtsp.cpp b/src/input/input_rtsp.cpp index c14543ba..ce48c0ef 100644 --- a/src/input/input_rtsp.cpp +++ b/src/input/input_rtsp.cpp @@ -212,7 +212,7 @@ namespace Mist{ // Connect to stats for INPUT detection statComm.reload(); if (statComm){ - if (statComm.getStatus() == COMM_STATUS_REQDISCONNECT){ + if (statComm.getStatus() & COMM_STATUS_REQDISCONNECT){ config->is_active = false; Util::logExitReason("received shutdown request from controller"); return; @@ -403,7 +403,7 @@ namespace Mist{ WARN_MSG("Reloading track %zu, index %zu", pkt.getTrackId(), idx); userSelect[idx].reload(streamName, idx, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); } - if (userSelect[idx].getStatus() == COMM_STATUS_REQDISCONNECT){ + if (userSelect[idx].getStatus() & COMM_STATUS_REQDISCONNECT){ Util::logExitReason("buffer requested shutdown"); tcpCon.close(); } diff --git a/src/input/input_ts.cpp b/src/input/input_ts.cpp index c32c13cf..28ccc825 100644 --- a/src/input/input_ts.cpp +++ b/src/input/input_ts.cpp @@ -146,7 +146,7 @@ void parseThread(void *mistIn){ threadTimer.erase(tid); } liveStream.eraseTrack(tid); - if (dataTrack && userConn){userConn.setStatus(COMM_STATUS_DISCONNECT);} + if (dataTrack && userConn){userConn.setStatus(COMM_STATUS_DISCONNECT | userConn.getStatus());} } namespace Mist{ diff --git a/src/output/output.cpp b/src/output/output.cpp index e8771cc8..c8db21e0 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -1365,7 +1365,7 @@ namespace Mist{ /*LTS-END*/ stats(true); - if (statComm){statComm.setStatus(COMM_STATUS_DISCONNECT);} + if (statComm){statComm.setStatus(COMM_STATUS_DISCONNECT | statComm.getStatus());} userSelect.clear(); @@ -1676,7 +1676,7 @@ namespace Mist{ HIGH_MSG("Writing stats: %s, %s, %u, %lu, %lu", getConnectedHost().c_str(), streamName.c_str(), crc & 0xFFFFFFFFu, myConn.dataUp(), myConn.dataDown()); /*LTS-START*/ - if (statComm.getStatus() == COMM_STATUS_REQDISCONNECT){ + if (statComm.getStatus() & COMM_STATUS_REQDISCONNECT){ onFail("Shutting down on controller request"); return; } @@ -1709,7 +1709,7 @@ namespace Mist{ if (isPushing()){ for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ - if (it->second.getStatus() == COMM_STATUS_REQDISCONNECT){ + if (it->second.getStatus() & COMM_STATUS_REQDISCONNECT){ if (dropPushTrack(it->second.getTrack(), "disconnect request from buffer")){break;} } if (!it->second){ diff --git a/src/output/output_http.cpp b/src/output/output_http.cpp index c6d8c802..8a1c3bd1 100644 --- a/src/output/output_http.cpp +++ b/src/output/output_http.cpp @@ -227,7 +227,7 @@ namespace Mist{ streamName.c_str(), handler.c_str(), H.GetVar("stream").c_str()); streamName = H.GetVar("stream"); userSelect.clear(); - if (statComm){statComm.setStatus(COMM_STATUS_DISCONNECT);} + if (statComm){statComm.setStatus(COMM_STATUS_DISCONNECT | statComm.getStatus());} reConnector(handler); onFail("Server error - could not start connector", true); return; diff --git a/src/output/output_rtsp.cpp b/src/output/output_rtsp.cpp index 433a219d..4acd29fc 100644 --- a/src/output/output_rtsp.cpp +++ b/src/output/output_rtsp.cpp @@ -361,7 +361,7 @@ namespace Mist{ HTTP_S.SetHeader("Cache-Control", "no-cache"); if (trackNo != INVALID_TRACK_ID){ userSelect[trackNo].reload(streamName, trackNo); - if (isPushing()){userSelect[trackNo].setStatus(COMM_STATUS_SOURCE);} + if (isPushing()){userSelect[trackNo].setStatus(COMM_STATUS_SOURCE | userSelect[trackNo].getStatus());} SDP::Track &sdpTrack = sdpState.tracks[trackNo]; if (sdpTrack.channel != -1){expectTCP = true;} HTTP_S.SetHeader("Transport", sdpTrack.transportString); diff --git a/src/utils/util_nuke.cpp b/src/utils/util_nuke.cpp index 1358783b..c4fbc944 100644 --- a/src/utils/util_nuke.cpp +++ b/src/utils/util_nuke.cpp @@ -138,7 +138,7 @@ int main(int argc, char **argv){ for (size_t i = 0; i < cleanUsers.recordCount(); ++i){ uint8_t status = cleanUsers.getStatus(i); cleanUsers.setStatus(COMM_STATUS_INVALID, i); - if (status != COMM_STATUS_INVALID && status != COMM_STATUS_DISCONNECT){ + if (status != COMM_STATUS_INVALID && !(status & COMM_STATUS_DISCONNECT)){ pid_t pid = cleanUsers.getPid(i); if (pid > 1){ Util::Procs::Stop(pid);