Converted comms system entirely to being bitflag-based instead of integer state based

This commit is contained in:
Thulinma 2020-12-04 16:26:57 +01:00
parent 6e316663fc
commit 974380ab30
10 changed files with 28 additions and 24 deletions

View file

@ -16,7 +16,9 @@ namespace Comms{
} }
Comms::~Comms(){ Comms::~Comms(){
if (index != INVALID_RECORD_INDEX){setStatus(COMM_STATUS_DISCONNECT);} if (index != INVALID_RECORD_INDEX){
setStatus(COMM_STATUS_DISCONNECT | getStatus());
}
if (master){ if (master){
if (dataPage.mapped){ if (dataPage.mapped){
finishAll(); finishAll();
@ -69,13 +71,13 @@ namespace Comms{
do{ do{
keepGoing = false; keepGoing = false;
for (size_t i = 0; i < recordCount(); i++){ for (size_t i = 0; i < recordCount(); i++){
if (getStatus(i) == COMM_STATUS_INVALID || getStatus(i) == COMM_STATUS_DISCONNECT){continue;} if (getStatus(i) == COMM_STATUS_INVALID || (getStatus(i) & COMM_STATUS_DISCONNECT)){continue;}
uint64_t cPid = getPid(i); uint64_t cPid = getPid(i);
if (cPid > 1){ if (cPid > 1){
Util::Procs::Stop(cPid); // soft kill Util::Procs::Stop(cPid); // soft kill
keepGoing = true; keepGoing = true;
} }
setStatus(COMM_STATUS_REQDISCONNECT, i); setStatus(COMM_STATUS_REQDISCONNECT | getStatus(i), i);
} }
if (keepGoing){Util::sleep(250);} if (keepGoing){Util::sleep(250);}
}while (keepGoing && ++c < 8); }while (keepGoing && ++c < 8);
@ -83,7 +85,7 @@ namespace Comms{
Comms::operator bool() const{ Comms::operator bool() const{
if (master){return dataPage;} if (master){return dataPage;}
return dataPage && (getStatus() != COMM_STATUS_INVALID) && (getStatus() != COMM_STATUS_DISCONNECT); return dataPage && (getStatus() != COMM_STATUS_INVALID) && !(getStatus() & COMM_STATUS_DISCONNECT);
} }
void Comms::setMaster(bool _master){ void Comms::setMaster(bool _master){
@ -142,7 +144,9 @@ namespace Comms{
Statistics::Statistics() : Comms(){sem.open(SEM_STATISTICS, O_CREAT | O_RDWR, ACCESSPERMS, 1);} Statistics::Statistics() : Comms(){sem.open(SEM_STATISTICS, O_CREAT | O_RDWR, ACCESSPERMS, 1);}
void Statistics::unload(){ void Statistics::unload(){
if (index != INVALID_RECORD_INDEX){setStatus(COMM_STATUS_DISCONNECT);} if (index != INVALID_RECORD_INDEX){
setStatus(COMM_STATUS_DISCONNECT | getStatus());
}
index = INVALID_RECORD_INDEX; index = INVALID_RECORD_INDEX;
} }

View file

@ -3,12 +3,12 @@
#include "shared_memory.h" #include "shared_memory.h"
#include "util.h" #include "util.h"
#define COMM_STATUS_DONOTTRACK 0x40
#define COMM_STATUS_SOURCE 0x80 #define COMM_STATUS_SOURCE 0x80
#define COMM_STATUS_REQDISCONNECT 0xFD #define COMM_STATUS_DONOTTRACK 0x40
#define COMM_STATUS_DISCONNECT 0xFE #define COMM_STATUS_DISCONNECT 0x20
#define COMM_STATUS_INVALID 0x0 #define COMM_STATUS_REQDISCONNECT 0x10
#define COMM_STATUS_ACTIVE 0x1 #define COMM_STATUS_ACTIVE 0x1
#define COMM_STATUS_INVALID 0x0
#define COMM_LOOP(comm, onActive, onDisconnect) \ #define COMM_LOOP(comm, onActive, onDisconnect) \
@ -16,10 +16,10 @@
for (size_t id = 0; id < comm.recordCount(); id++){\ for (size_t id = 0; id < comm.recordCount(); id++){\
if (comm.getStatus(id) == COMM_STATUS_INVALID){continue;}\ if (comm.getStatus(id) == COMM_STATUS_INVALID){continue;}\
if (!Util::Procs::isRunning(comm.getPid(id))){\ if (!Util::Procs::isRunning(comm.getPid(id))){\
comm.setStatus(COMM_STATUS_DISCONNECT, id);\ comm.setStatus(COMM_STATUS_DISCONNECT | comm.getStatus(id), id);\
}\ }\
onActive;\ onActive;\
if (comm.getStatus(id) == COMM_STATUS_DISCONNECT){\ if (comm.getStatus(id) & COMM_STATUS_DISCONNECT){\
onDisconnect;\ onDisconnect;\
comm.setStatus(COMM_STATUS_INVALID, id);\ comm.setStatus(COMM_STATUS_INVALID, id);\
}\ }\

View file

@ -860,7 +860,7 @@ namespace Mist{
userSelect[idx].reload(streamName, idx, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); userSelect[idx].reload(streamName, idx, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK);
} }
while (thisPacket && config->is_active && userSelect[idx]){ while (thisPacket && config->is_active && userSelect[idx]){
if (userSelect[idx].getStatus() == COMM_STATUS_REQDISCONNECT){ if (userSelect[idx].getStatus() & COMM_STATUS_REQDISCONNECT){
Util::logExitReason("buffer requested shutdown"); Util::logExitReason("buffer requested shutdown");
break; break;
} }
@ -933,7 +933,7 @@ namespace Mist{
// Connect to stats for INPUT detection // Connect to stats for INPUT detection
if (!statComm){statComm.reload();} if (!statComm){statComm.reload();}
if (statComm){ if (statComm){
if (statComm.getStatus() == COMM_STATUS_REQDISCONNECT){ if (statComm.getStatus() & COMM_STATUS_REQDISCONNECT){
config->is_active = false; config->is_active = false;
Util::logExitReason("received shutdown request from controller"); Util::logExitReason("received shutdown request from controller");
return; return;

View file

@ -393,7 +393,7 @@ namespace Mist{
if (!(users.getStatus(i) & COMM_STATUS_SOURCE)){continue;} if (!(users.getStatus(i) & COMM_STATUS_SOURCE)){continue;}
if (users.getTrack(i) != tid){continue;} if (users.getTrack(i) != tid){continue;}
// We have found the right track here (pid matches, and COMM_STATUS_SOURCE set) // We have found the right track here (pid matches, and COMM_STATUS_SOURCE set)
users.setStatus(COMM_STATUS_REQDISCONNECT, i); users.setStatus(COMM_STATUS_REQDISCONNECT | users.getStatus(i), i);
break; break;
} }
@ -518,7 +518,7 @@ namespace Mist{
void inputBuffer::userOnActive(size_t id){ void inputBuffer::userOnActive(size_t id){
///\todo Add tracing of earliest watched keys, to prevent data going out of memory for ///\todo Add tracing of earliest watched keys, to prevent data going out of memory for
/// still-watching viewers /// still-watching viewers
if (users.getStatus(id) != COMM_STATUS_DISCONNECT && users.getStatus(id) & COMM_STATUS_SOURCE){ if (!(users.getStatus(id) & COMM_STATUS_DISCONNECT) && (users.getStatus(id) & COMM_STATUS_SOURCE)){
sourcePids[users.getPid(id)].insert(users.getTrack(id)); sourcePids[users.getPid(id)].insert(users.getTrack(id));
// GeneratePids holds the pids of the process that generate data, so ignore those for determining if a push is ingested. // GeneratePids holds the pids of the process that generate data, so ignore those for determining if a push is ingested.
if (M.trackValid(users.getTrack(id)) && !generatePids.count(users.getPid(id))){hasPush = true;} if (M.trackValid(users.getTrack(id)) && !generatePids.count(users.getPid(id))){hasPush = true;}

View file

@ -212,7 +212,7 @@ namespace Mist{
// Connect to stats for INPUT detection // Connect to stats for INPUT detection
statComm.reload(); statComm.reload();
if (statComm){ if (statComm){
if (statComm.getStatus() == COMM_STATUS_REQDISCONNECT){ if (statComm.getStatus() & COMM_STATUS_REQDISCONNECT){
config->is_active = false; config->is_active = false;
Util::logExitReason("received shutdown request from controller"); Util::logExitReason("received shutdown request from controller");
return; return;
@ -403,7 +403,7 @@ namespace Mist{
WARN_MSG("Reloading track %zu, index %zu", pkt.getTrackId(), idx); WARN_MSG("Reloading track %zu, index %zu", pkt.getTrackId(), idx);
userSelect[idx].reload(streamName, idx, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); userSelect[idx].reload(streamName, idx, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK);
} }
if (userSelect[idx].getStatus() == COMM_STATUS_REQDISCONNECT){ if (userSelect[idx].getStatus() & COMM_STATUS_REQDISCONNECT){
Util::logExitReason("buffer requested shutdown"); Util::logExitReason("buffer requested shutdown");
tcpCon.close(); tcpCon.close();
} }

View file

@ -146,7 +146,7 @@ void parseThread(void *mistIn){
threadTimer.erase(tid); threadTimer.erase(tid);
} }
liveStream.eraseTrack(tid); liveStream.eraseTrack(tid);
if (dataTrack && userConn){userConn.setStatus(COMM_STATUS_DISCONNECT);} if (dataTrack && userConn){userConn.setStatus(COMM_STATUS_DISCONNECT | userConn.getStatus());}
} }
namespace Mist{ namespace Mist{

View file

@ -1365,7 +1365,7 @@ namespace Mist{
/*LTS-END*/ /*LTS-END*/
stats(true); stats(true);
if (statComm){statComm.setStatus(COMM_STATUS_DISCONNECT);} if (statComm){statComm.setStatus(COMM_STATUS_DISCONNECT | statComm.getStatus());}
userSelect.clear(); userSelect.clear();
@ -1676,7 +1676,7 @@ namespace Mist{
HIGH_MSG("Writing stats: %s, %s, %u, %lu, %lu", getConnectedHost().c_str(), streamName.c_str(), HIGH_MSG("Writing stats: %s, %s, %u, %lu, %lu", getConnectedHost().c_str(), streamName.c_str(),
crc & 0xFFFFFFFFu, myConn.dataUp(), myConn.dataDown()); crc & 0xFFFFFFFFu, myConn.dataUp(), myConn.dataDown());
/*LTS-START*/ /*LTS-START*/
if (statComm.getStatus() == COMM_STATUS_REQDISCONNECT){ if (statComm.getStatus() & COMM_STATUS_REQDISCONNECT){
onFail("Shutting down on controller request"); onFail("Shutting down on controller request");
return; return;
} }
@ -1709,7 +1709,7 @@ namespace Mist{
if (isPushing()){ if (isPushing()){
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){ for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
if (it->second.getStatus() == COMM_STATUS_REQDISCONNECT){ if (it->second.getStatus() & COMM_STATUS_REQDISCONNECT){
if (dropPushTrack(it->second.getTrack(), "disconnect request from buffer")){break;} if (dropPushTrack(it->second.getTrack(), "disconnect request from buffer")){break;}
} }
if (!it->second){ if (!it->second){

View file

@ -227,7 +227,7 @@ namespace Mist{
streamName.c_str(), handler.c_str(), H.GetVar("stream").c_str()); streamName.c_str(), handler.c_str(), H.GetVar("stream").c_str());
streamName = H.GetVar("stream"); streamName = H.GetVar("stream");
userSelect.clear(); userSelect.clear();
if (statComm){statComm.setStatus(COMM_STATUS_DISCONNECT);} if (statComm){statComm.setStatus(COMM_STATUS_DISCONNECT | statComm.getStatus());}
reConnector(handler); reConnector(handler);
onFail("Server error - could not start connector", true); onFail("Server error - could not start connector", true);
return; return;

View file

@ -361,7 +361,7 @@ namespace Mist{
HTTP_S.SetHeader("Cache-Control", "no-cache"); HTTP_S.SetHeader("Cache-Control", "no-cache");
if (trackNo != INVALID_TRACK_ID){ if (trackNo != INVALID_TRACK_ID){
userSelect[trackNo].reload(streamName, trackNo); userSelect[trackNo].reload(streamName, trackNo);
if (isPushing()){userSelect[trackNo].setStatus(COMM_STATUS_SOURCE);} if (isPushing()){userSelect[trackNo].setStatus(COMM_STATUS_SOURCE | userSelect[trackNo].getStatus());}
SDP::Track &sdpTrack = sdpState.tracks[trackNo]; SDP::Track &sdpTrack = sdpState.tracks[trackNo];
if (sdpTrack.channel != -1){expectTCP = true;} if (sdpTrack.channel != -1){expectTCP = true;}
HTTP_S.SetHeader("Transport", sdpTrack.transportString); HTTP_S.SetHeader("Transport", sdpTrack.transportString);

View file

@ -138,7 +138,7 @@ int main(int argc, char **argv){
for (size_t i = 0; i < cleanUsers.recordCount(); ++i){ for (size_t i = 0; i < cleanUsers.recordCount(); ++i){
uint8_t status = cleanUsers.getStatus(i); uint8_t status = cleanUsers.getStatus(i);
cleanUsers.setStatus(COMM_STATUS_INVALID, i); cleanUsers.setStatus(COMM_STATUS_INVALID, i);
if (status != COMM_STATUS_INVALID && status != COMM_STATUS_DISCONNECT){ if (status != COMM_STATUS_INVALID && !(status & COMM_STATUS_DISCONNECT)){
pid_t pid = cleanUsers.getPid(i); pid_t pid = cleanUsers.getPid(i);
if (pid > 1){ if (pid > 1){
Util::Procs::Stop(pid); Util::Procs::Stop(pid);