From 7297336e468766f89eb22bf100005ddeafcf8a11 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Fri, 30 Oct 2020 22:38:29 +0100 Subject: [PATCH] Rewrite of Comms --- lib/comms.cpp | 319 +++++++++-------------- lib/comms.h | 70 ++--- lib/defines.h | 2 +- src/controller/controller_statistics.cpp | 5 +- src/input/input.cpp | 28 +- src/input/input_buffer.cpp | 8 +- src/input/input_dtsc.cpp | 2 +- src/input/input_rtsp.cpp | 5 +- src/input/input_ts.cpp | 10 +- src/output/output.cpp | 10 +- src/output/output_cmaf.cpp | 7 +- src/output/output_rtmp.cpp | 2 +- src/output/output_webrtc.cpp | 4 +- src/utils/util_nuke.cpp | 4 +- 14 files changed, 173 insertions(+), 303 deletions(-) diff --git a/lib/comms.cpp b/lib/comms.cpp index 5bfcaa72..21dfc810 100644 --- a/lib/comms.cpp +++ b/lib/comms.cpp @@ -27,35 +27,23 @@ namespace Comms{ sem.close(); } - void Comms::addCommonFields(){ + void Comms::addFields(){ dataAccX.addField("status", RAX_UINT); - dataAccX.addField("command", RAX_64UINT); - dataAccX.addField("timer", RAX_UINT); - dataAccX.addField("pid", RAX_32UINT); - dataAccX.addField("killtime", RAX_64UINT); + dataAccX.addField("pid", RAX_64UINT); } - void Comms::commonFieldAccess(){ + void Comms::nullFields(){ + setPid(getpid()); + } + + void Comms::fieldAccess(){ status = dataAccX.getFieldAccX("status"); - command = dataAccX.getFieldAccX("command"); - timer = dataAccX.getFieldAccX("timer"); pid = dataAccX.getFieldAccX("pid"); - killTime = dataAccX.getFieldAccX("killtime"); } - size_t Comms::firstValid() const{ - if (!master){return index;} - return dataAccX.getStartPos(); - } - - size_t Comms::endValid() const{ + size_t Comms::recordCount() const{ if (!master){return index + 1;} - return dataAccX.getEndPos(); - } - - void Comms::deleteFirst(){ - if (!master){return;} - dataAccX.deleteRecords(1); + return dataAccX.getRCount(); } uint8_t Comms::getStatus() const{return status.uint(index);} @@ -66,22 +54,6 @@ namespace Comms{ status.set(_status, idx); } - uint64_t Comms::getCommand() const{return command.uint(index);} - uint64_t Comms::getCommand(size_t idx) const{return (master ? command.uint(idx) : 0);} - void Comms::setCommand(uint64_t _cmd){command.set(_cmd, index);} - void Comms::setCommand(uint64_t _cmd, size_t idx){ - if (!master){return;} - command.set(_cmd, idx); - } - - uint8_t Comms::getTimer() const{return timer.uint(index);} - uint8_t Comms::getTimer(size_t idx) const{return (master ? timer.uint(idx) : 0);} - void Comms::setTimer(uint8_t _timer){timer.set(_timer, index);} - void Comms::setTimer(uint8_t _timer, size_t idx){ - if (!master){return;} - timer.set(_timer, idx); - } - uint32_t Comms::getPid() const{return pid.uint(index);} uint32_t Comms::getPid(size_t idx) const{return (master ? pid.uint(idx) : 0);} void Comms::setPid(uint32_t _pid){pid.set(_pid, index);} @@ -90,48 +62,28 @@ namespace Comms{ pid.set(_pid, idx); } - void Comms::kill(size_t idx, bool force){ - if (!master){return;} - if (force){ - Util::Procs::Murder(pid.uint(idx)); // hard kill - status.set(COMM_STATUS_INVALID, idx); - return; - } - uint64_t kTime = killTime.uint(idx); - uint64_t now = Util::bootSecs(); - if (!kTime){ - kTime = now; - killTime.set(kTime, idx); - } - if (now - kTime > 30){ - Util::Procs::Murder(pid.uint(idx)); // hard kill - status.set(COMM_STATUS_INVALID, idx); - }else{ - Util::Procs::Stop(pid.uint(idx)); // soft kill - } - } - void Comms::finishAll(){ if (!master){return;} size_t c = 0; + bool keepGoing = true; do{ - for (size_t i = firstValid(); i < endValid(); i++){ - if (getStatus(i) == COMM_STATUS_INVALID){continue;} + keepGoing = false; + for (size_t i = 0; i < recordCount(); i++){ + if (getStatus(i) == COMM_STATUS_INVALID || getStatus(i) == COMM_STATUS_DISCONNECT){continue;} + uint64_t cPid = getPid(i); + if (cPid > 1){ + Util::Procs::Stop(cPid); // soft kill + keepGoing = true; + } setStatus(COMM_STATUS_REQDISCONNECT, i); } - while (getStatus(firstValid()) == COMM_STATUS_INVALID){deleteFirst();} - }while (firstValid() < endValid() && ++c < 10); + if (keepGoing){Util::sleep(250);} + }while (keepGoing && ++c < 8); } - void Comms::keepAlive(){ - if (isAlive()){setTimer(0);} - } - - bool Comms::isAlive() const{ - if (!*this){return false;} - if (getStatus() == COMM_STATUS_INVALID){return false;} - if (getStatus() == COMM_STATUS_DISCONNECT){return false;} - return getTimer() < 126; + Comms::operator bool() const{ + if (master){return dataPage;} + return dataPage && (getStatus() != COMM_STATUS_INVALID) && (getStatus() != COMM_STATUS_DISCONNECT); } void Comms::setMaster(bool _master){ @@ -139,6 +91,54 @@ namespace Comms{ dataPage.master = _master; } + void Comms::reload(const std::string & prefix, size_t baseSize, bool _master, bool reIssue){ + master = _master; + if (!currentSize){currentSize = baseSize;} + + if (master){ + dataPage.init(prefix, currentSize, false, false); + if (dataPage){ + dataPage.master = true; + dataAccX = Util::RelAccX(dataPage.mapped); + fieldAccess(); + }else{ + dataPage.init(prefix, currentSize, true); + dataAccX = Util::RelAccX(dataPage.mapped, false); + addFields(); + fieldAccess(); + size_t reqCount = (currentSize - dataAccX.getOffset()) / dataAccX.getRSize(); + dataAccX.setRCount(reqCount); + dataAccX.setPresent(reqCount); + dataAccX.setReady(); + } + return; + } + + dataPage.init(prefix, currentSize, false); + if (!dataPage){ + WARN_MSG("Unable to open page %s", prefix.c_str()); + return; + } + dataAccX = Util::RelAccX(dataPage.mapped); + fieldAccess(); + if (index == INVALID_RECORD_INDEX || reIssue){ + size_t reqCount = dataAccX.getRCount(); + for (index = 0; index < reqCount; ++index){ + if (getStatus() == COMM_STATUS_INVALID){ + IPC::semGuard G(&sem); + if (getStatus() != COMM_STATUS_INVALID){continue;} + nullFields(); + setStatus(COMM_STATUS_ACTIVE); + break; + } + } + if (index >= reqCount){ + FAIL_MSG("Could not register entry on comm page!"); + dataPage.close(); + } + } + } + Statistics::Statistics() : Comms(){sem.open(SEM_STATISTICS, O_CREAT | O_RDWR, ACCESSPERMS, 1);} void Statistics::unload(){ @@ -147,76 +147,39 @@ namespace Comms{ } void Statistics::reload(bool _master, bool reIssue){ - master = _master; - bool setFields = true; + Comms::reload(COMMS_STATISTICS, COMMS_STATISTICS_INITSIZE, _master, reIssue); + } - if (!currentSize){currentSize = COMMS_STATISTICS_INITSIZE;} - dataPage.init(COMMS_STATISTICS, currentSize, false, false); - if (master){ - if (dataPage.mapped){ - setFields = false; - dataPage.master = true; - }else{ - dataPage.init(COMMS_STATISTICS, currentSize, true); - } - } - if (!dataPage.mapped){ - FAIL_MSG("Unable to open page " COMMS_STATISTICS); - return; - } + void Statistics::addFields(){ + Comms::addFields(); + dataAccX.addField("sync", RAX_UINT); + dataAccX.addField("now", RAX_64UINT); + dataAccX.addField("time", RAX_64UINT); + dataAccX.addField("lastsecond", RAX_64UINT); + dataAccX.addField("down", RAX_64UINT); + dataAccX.addField("up", RAX_64UINT); + dataAccX.addField("host", RAX_RAW, 16); + dataAccX.addField("stream", RAX_STRING, 100); + dataAccX.addField("connector", RAX_STRING, 20); + dataAccX.addField("crc", RAX_32UINT); + } - if (master){ - dataAccX = Util::RelAccX(dataPage.mapped, false); - if (setFields){ - addCommonFields(); - - dataAccX.addField("sync", RAX_UINT); - dataAccX.addField("now", RAX_64UINT); - dataAccX.addField("time", RAX_64UINT); - dataAccX.addField("lastsecond", RAX_64UINT); - dataAccX.addField("down", RAX_64UINT); - dataAccX.addField("up", RAX_64UINT); - dataAccX.addField("host", RAX_RAW, 16); - dataAccX.addField("stream", RAX_STRING, 100); - dataAccX.addField("connector", RAX_STRING, 20); - dataAccX.addField("crc", RAX_32UINT); - - dataAccX.setRCount((currentSize - dataAccX.getOffset()) / dataAccX.getRSize()); - dataAccX.setReady(); - } - - }else{ - dataAccX = Util::RelAccX(dataPage.mapped); - if (index == INVALID_RECORD_INDEX || reIssue){ - sem.wait(); - for (index = 0; index < dataAccX.getEndPos(); ++index){ - if (dataAccX.getInt("status", index) == COMM_STATUS_INVALID){ - // Reverse! clear entry and claim it. - dataAccX.setInt("crc", 0, index); - dataAccX.setString("connector", "", index); - dataAccX.setString("stream", "", index); - dataAccX.setString("host", "", index); - dataAccX.setInt("up", 0, index); - dataAccX.setInt("down", 0, index); - dataAccX.setInt("lastsecond", 0, index); - dataAccX.setInt("time", 0, index); - dataAccX.setInt("now", 0, index); - dataAccX.setInt("sync", 0, index); - dataAccX.setInt("killtime", 0, index); - dataAccX.setInt("pid", 0, index); - dataAccX.setInt("timer", 0, index); - dataAccX.setInt("command", 0, index); - dataAccX.setInt("status", 0, index); - break; - } - } - if (index == dataAccX.getEndPos()){dataAccX.addRecords(1);} - sem.post(); - } - } - - commonFieldAccess(); + void Statistics::nullFields(){ + Comms::nullFields(); + setCRC(0); + setConnector(""); + setStream(""); + setHost(""); + setUp(0); + setDown(0); + setLastSecond(0); + setTime(0); + setNow(0); + setSync(0); + } + void Statistics::fieldAccess(){ + Comms::fieldAccess(); sync = dataAccX.getFieldAccX("sync"); now = dataAccX.getFieldAccX("now"); time = dataAccX.getFieldAccX("time"); @@ -336,7 +299,7 @@ namespace Comms{ Users::Users() : Comms(){} Users::Users(const Users &rhs) : Comms(){ - if (rhs && rhs.isAlive()){ + if (rhs){ reload(rhs.streamName, (size_t)rhs.getTrack()); if (*this){ setKeyNum(rhs.getKeyNum()); @@ -352,79 +315,35 @@ namespace Comms{ snprintf(semName, NAME_BUFFER_SIZE, SEM_USERS, streamName.c_str()); sem.open(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1); - master = _master; - - if (!currentSize){currentSize = COMMS_USERS_INITSIZE;} - char userPageName[NAME_BUFFER_SIZE]; snprintf(userPageName, NAME_BUFFER_SIZE, COMMS_USERS, streamName.c_str()); - bool newPage = false; - if (master){ - dataPage.init(userPageName, currentSize, false, false); - if (dataPage){ - dataPage.master = true; - }else{ - dataPage.init(userPageName, currentSize, true); - newPage = true; - } - }else{ - dataPage.init(userPageName, currentSize, false); - } - if (!dataPage.mapped){ - HIGH_MSG("Unable to open page %s", userPageName); - return; - } + Comms::reload(userPageName, COMMS_USERS_INITSIZE, _master, reIssue); + } + + void Users::addFields(){ + Comms::addFields(); + dataAccX.addField("track", RAX_64UINT); + dataAccX.addField("keynum", RAX_64UINT); + } - if (master){ - if (newPage){ - dataAccX = Util::RelAccX(dataPage.mapped, false); - addCommonFields(); - - dataAccX.addField("track", RAX_32UINT); - dataAccX.addField("keynum", RAX_32UINT); - - dataAccX.setRCount((currentSize - dataAccX.getOffset()) / dataAccX.getRSize()); - dataAccX.setReady(); - }else{ - dataAccX = Util::RelAccX(dataPage.mapped); - } - - }else{ - dataAccX = Util::RelAccX(dataPage.mapped); - if (index == INVALID_RECORD_INDEX || reIssue){ - sem.wait(); - - for (index = 0; index < dataAccX.getEndPos(); ++index){ - if (dataAccX.getInt("status", index) == COMM_STATUS_INVALID){ - // Reverse! clear entry and claim it. - dataAccX.setInt("keynum", 0, index); - dataAccX.setInt("track", 0, index); - dataAccX.setInt("killtime", 0, index); - dataAccX.setInt("pid", 0, index); - dataAccX.setInt("timer", 0, index); - dataAccX.setInt("command", 0, index); - dataAccX.setInt("status", 0, index); - break; - } - } - if (index == dataAccX.getEndPos()){dataAccX.addRecords(1);} - sem.post(); - } - } - - commonFieldAccess(); + void Users::nullFields(){ + Comms::nullFields(); + setTrack(0); + setKeyNum(0); + } + void Users::fieldAccess(){ + Comms::fieldAccess(); track = dataAccX.getFieldAccX("track"); keyNum = dataAccX.getFieldAccX("keynum"); - - setPid(getpid()); } void Users::reload(const std::string &_streamName, size_t idx, uint8_t initialState){ reload(_streamName); - if (dataPage.mapped){ + if (dataPage){ setTrack(idx); + setKeyNum(0); setStatus(initialState); } } diff --git a/lib/comms.h b/lib/comms.h index 3235088f..7e863b2e 100644 --- a/lib/comms.h +++ b/lib/comms.h @@ -7,25 +7,21 @@ #define COMM_STATUS_SOURCE 0x80 #define COMM_STATUS_REQDISCONNECT 0xFD #define COMM_STATUS_DISCONNECT 0xFE -#define COMM_STATUS_INVALID 0xFF +#define COMM_STATUS_INVALID 0x0 +#define COMM_STATUS_ACTIVE 0x1 -#define COMM_LOOP(comm, onActive, onDisconnect) \ + +#define COMM_LOOP(comm, onActive, onDisconnect) \ {\ - for (size_t id = comm.firstValid(); id != comm.endValid(); id++){\ + for (size_t id = 0; id < comm.recordCount(); id++){\ if (comm.getStatus(id) == COMM_STATUS_INVALID){continue;}\ - onActive; \ if (!Util::Procs::isRunning(comm.getPid(id))){\ - comm.setStatus(COMM_STATUS_DISCONNECT, id); \ + comm.setStatus(COMM_STATUS_DISCONNECT, id);\ }\ - if ((comm.getTimer(id) & 0x7F) >= 126 || comm.getStatus(id) == COMM_STATUS_DISCONNECT){\ - onDisconnect; \ - comm.setStatus(COMM_STATUS_INVALID, id); \ - }\ - if ((comm.getTimer(id) & 0x7F) <= 124){\ - if ((comm.getTimer(id) & 0x7F) == 124){\ - HIGH_MSG("Timeout occurred for entry %zu, ignoring further timeout", id); \ - }\ - comm.setTimer(comm.getTimer(id) + 1, id); \ + onActive;\ + if (comm.getStatus(id) == COMM_STATUS_DISCONNECT){\ + onDisconnect;\ + comm.setStatus(COMM_STATUS_INVALID, id);\ }\ }\ } @@ -35,63 +31,37 @@ namespace Comms{ public: Comms(); ~Comms(); - - operator bool() const{return dataPage.mapped;} - - void addCommonFields(); - void commonFieldAccess(); - - size_t firstValid() const; - size_t endValid() const; - void deleteFirst(); - + operator bool() const; + void reload(const std::string & prefix, size_t baseSize, bool _master = false, bool reIssue = false); + virtual void addFields(); + virtual void nullFields(); + virtual void fieldAccess(); + size_t recordCount() const; uint8_t getStatus() const; uint8_t getStatus(size_t idx) const; void setStatus(uint8_t _status); void setStatus(uint8_t _status, size_t idx); - uint64_t getCommand() const; uint64_t getCommand(size_t idx) const; void setCommand(uint64_t _cmd); void setCommand(uint64_t _cmd, size_t idx); - - uint8_t getTimer() const; - uint8_t getTimer(size_t idx) const; - void setTimer(uint8_t _timer); - void setTimer(uint8_t _timer, size_t idx); - uint32_t getPid() const; uint32_t getPid(size_t idx) const; void setPid(uint32_t _pid); void setPid(uint32_t _pid, size_t idx); - - void kill(size_t idx, bool force = false); - void finishAll(); - - void keepAlive(); - bool isAlive() const; - void setMaster(bool _master); - const std::string &pageName() const{return dataPage.name;} protected: bool master; size_t index; - size_t currentSize; - IPC::semaphore sem; - IPC::sharedPage dataPage; Util::RelAccX dataAccX; - Util::FieldAccX status; - Util::FieldAccX command; - Util::FieldAccX timer; Util::FieldAccX pid; - Util::FieldAccX killTime; }; class Statistics : public Comms{ @@ -100,6 +70,9 @@ namespace Comms{ operator bool() const{return dataPage.mapped && (master || index != INVALID_RECORD_INDEX);} void unload(); void reload(bool _master = false, bool reIssue = false); + virtual void addFields(); + virtual void nullFields(); + virtual void fieldAccess(); uint8_t getSync() const; uint8_t getSync(size_t idx) const; @@ -172,7 +145,10 @@ namespace Comms{ Users(); Users(const Users &rhs); void reload(const std::string &_streamName = "", bool _master = false, bool reIssue = false); - void reload(const std::string &_streamName, size_t track, uint8_t initialState = 0x00); + void reload(const std::string &_streamName, size_t track, uint8_t initialState = COMM_STATUS_ACTIVE); + virtual void addFields(); + virtual void nullFields(); + virtual void fieldAccess(); operator bool() const{return dataPage.mapped;} diff --git a/lib/defines.h b/lib/defines.h index f2dfe10d..f686bc26 100644 --- a/lib/defines.h +++ b/lib/defines.h @@ -198,7 +198,7 @@ static inline void show_stackframe(){} #define COMMS_STATISTICS_INITSIZE 8 * 1024 * 1024 #define COMMS_USERS "MstUser%s" //%s stream name -#define COMMS_USERS_INITSIZE 8 * 1024 * 1024 +#define COMMS_USERS_INITSIZE 512 * 1024 #define SEM_STATISTICS "/MstStat" #define SEM_USERS "/MstUser%s" //%s stream name diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index 8abd6254..23f59f8d 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -490,10 +490,7 @@ void Controller::SharedMemStats(void *config){ }else{/*LTS-START*/ if (Controller::killOnExit){ WARN_MSG("Killing all connected clients to force full shutdown"); - for (uint32_t id = statComm.firstValid(); id != statComm.endValid(); id++){ - if (statComm.getStatus(id) == COMM_STATUS_INVALID){continue;} - statComm.kill(id, true); - } + statComm.finishAll(); } /*LTS-END*/ } diff --git a/src/input/input.cpp b/src/input/input.cpp index fd39e160..4ce51d33 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -857,15 +857,14 @@ namespace Mist{ tid = thisPacket.getTrackId(); idx = M.trackIDToIndex(tid, getpid()); if (thisPacket && !userSelect.count(idx)){ - userSelect[idx].reload(streamName, idx, COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); + userSelect[idx].reload(streamName, idx, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); } - while (thisPacket && config->is_active && userSelect[idx].isAlive()){ + while (thisPacket && config->is_active && userSelect[idx]){ if (userSelect[idx].getStatus() == COMM_STATUS_REQDISCONNECT){ Util::logExitReason("buffer requested shutdown"); break; } bufferLivePacket(thisPacket); - userSelect[idx].keepAlive(); getNext(); if (!thisPacket){ Util::logExitReason("invalid packet from getNext"); @@ -874,14 +873,14 @@ namespace Mist{ tid = thisPacket.getTrackId(); idx = M.trackIDToIndex(tid, getpid()); if (thisPacket && !userSelect.count(idx)){ - userSelect[idx].reload(streamName, idx, COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); + userSelect[idx].reload(streamName, idx, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); } if (Util::bootSecs() - statTimer > 1){ // Connect to stats for INPUT detection if (!statComm){statComm.reload();} if (statComm){ - if (!statComm.isAlive()){ + if (!statComm){ config->is_active = false; Util::logExitReason("received shutdown request from controller"); return; @@ -896,7 +895,6 @@ namespace Mist{ statComm.setTime(now - startTime); statComm.setLastSecond(0); statComm.setHost(getConnectedBinHost()); - statComm.keepAlive(); } statTimer = Util::bootSecs(); @@ -911,33 +909,31 @@ namespace Mist{ getNext(); if (thisPacket && !userSelect.count(thisPacket.getTrackId())){ size_t tid = thisPacket.getTrackId(); - userSelect[tid].reload(streamName, tid, COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); + userSelect[tid].reload(streamName, tid, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); } - while (thisPacket && config->is_active && userSelect[thisPacket.getTrackId()].isAlive()){ + while (thisPacket && config->is_active && userSelect[thisPacket.getTrackId()]){ thisPacket.nullMember("bpos"); - while (config->is_active && userSelect[thisPacket.getTrackId()].isAlive() && + while (config->is_active && userSelect[thisPacket.getTrackId()] && Util::bootMS() + SIMULATED_LIVE_BUFFER < (thisPacket.getTime() + timeOffset) + simStartTime){ Util::sleep(std::min(((thisPacket.getTime() + timeOffset) + simStartTime) - (Util::getMS() + SIMULATED_LIVE_BUFFER), (uint64_t)1000)); - userSelect[thisPacket.getTrackId()].keepAlive(); } uint64_t originalTime = thisPacket.getTime(); thisPacket.setTime(originalTime + timeOffset); bufferLivePacket(thisPacket); thisPacket.setTime(originalTime); - userSelect[thisPacket.getTrackId()].keepAlive(); getNext(); if (thisPacket && !userSelect.count(thisPacket.getTrackId())){ size_t tid = thisPacket.getTrackId(); - userSelect[tid].reload(streamName, tid, COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); + userSelect[tid].reload(streamName, tid, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); } if (Util::bootSecs() - statTimer > 1){ // Connect to stats for INPUT detection if (!statComm){statComm.reload();} if (statComm){ - if (!statComm.isAlive()){ + if (statComm.getStatus() == COMM_STATUS_REQDISCONNECT){ config->is_active = false; Util::logExitReason("received shutdown request from controller"); return; @@ -952,7 +948,6 @@ namespace Mist{ statComm.setTime(now - startTime); statComm.setLastSecond(0); statComm.setHost(getConnectedBinHost()); - statComm.keepAlive(); } statTimer = Util::bootSecs(); @@ -961,7 +956,7 @@ namespace Mist{ if (!thisPacket){ Util::logExitReason("invalid packet from getNext"); } - if (thisPacket && !userSelect[thisPacket.getTrackId()].isAlive()){ + if (thisPacket && !userSelect[thisPacket.getTrackId()]){ Util::logExitReason("buffer shutdown"); } } @@ -1411,8 +1406,7 @@ namespace Mist{ bool isAlive = false; for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ - if (it->second.isAlive()){isAlive = true;} - it->second.keepAlive(); + if (it->second){isAlive = true;} } return isAlive && config->is_active; } diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index 15e27066..01835d5c 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -155,9 +155,7 @@ namespace Mist{ { Comms::Users cleanUsers; cleanUsers.reload(streamName); - for (size_t i = cleanUsers.firstValid(); i < cleanUsers.endValid(); ++i){ - cleanUsers.setStatus(COMM_STATUS_INVALID, i); - } + cleanUsers.finishAll(); cleanUsers.setMaster(true); } // Delete the live stream semaphore, if any. @@ -389,8 +387,8 @@ namespace Mist{ } void inputBuffer::removeTrack(size_t tid){ - size_t lastUser = users.endValid(); - for (size_t i = users.firstValid(); i < lastUser; ++i){ + size_t lastUser = users.recordCount(); + for (size_t i = 0; i < lastUser; ++i){ if (users.getStatus(i) == COMM_STATUS_INVALID){continue;} if (!(users.getStatus(i) & COMM_STATUS_SOURCE)){continue;} if (users.getTrack(i) != tid){continue;} diff --git a/src/input/input_dtsc.cpp b/src/input/input_dtsc.cpp index 864ed909..3cf184d8 100644 --- a/src/input/input_dtsc.cpp +++ b/src/input/input_dtsc.cpp @@ -169,7 +169,7 @@ namespace Mist{ std::set validTracks = M.getMySourceTracks(getpid()); userSelect.clear(); for (std::set::iterator it = validTracks.begin(); it != validTracks.end(); ++it){ - userSelect[*it].reload(streamName, *it, COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); + userSelect[*it].reload(streamName, *it, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); } break; } diff --git a/src/input/input_rtsp.cpp b/src/input/input_rtsp.cpp index 6ab3eb6a..c14543ba 100644 --- a/src/input/input_rtsp.cpp +++ b/src/input/input_rtsp.cpp @@ -212,7 +212,7 @@ namespace Mist{ // Connect to stats for INPUT detection statComm.reload(); if (statComm){ - if (!statComm.isAlive()){ + if (statComm.getStatus() == COMM_STATUS_REQDISCONNECT){ config->is_active = false; Util::logExitReason("received shutdown request from controller"); return; @@ -227,7 +227,6 @@ namespace Mist{ statComm.setTime(now - startTime); statComm.setLastSecond(0); statComm.setHost(getConnectedBinHost()); - statComm.keepAlive(); } } } @@ -402,7 +401,7 @@ namespace Mist{ }else{ if (!userSelect.count(idx)){ WARN_MSG("Reloading track %zu, index %zu", pkt.getTrackId(), idx); - userSelect[idx].reload(streamName, idx, COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); + userSelect[idx].reload(streamName, idx, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); } if (userSelect[idx].getStatus() == COMM_STATUS_REQDISCONNECT){ Util::logExitReason("buffer requested shutdown"); diff --git a/src/input/input_ts.cpp b/src/input/input_ts.cpp index 266d4033..c32c13cf 100644 --- a/src/input/input_ts.cpp +++ b/src/input/input_ts.cpp @@ -58,8 +58,7 @@ void parseThread(void *mistIn){ threadTimer[tid] = Util::bootSecs(); } while (Util::bootSecs() - threadTimer[tid] < THREAD_TIMEOUT && cfgPointer->is_active && - (!dataTrack || (userConn ? userConn.isAlive() : true))){ - if (dataTrack){userConn.keepAlive();} + (!dataTrack || (userConn ? userConn : true))){ liveStream.parse(tid); if (!liveStream.hasPacket(tid)){ Util::sleep(100); @@ -95,7 +94,7 @@ void parseThread(void *mistIn){ idx = meta.trackIDToIndex(tid, getpid()); if (idx != INVALID_TRACK_ID){ //Successfully assigned a track index! Inform the buffer we're pushing - userConn.reload(globalStreamName, idx, COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); + userConn.reload(globalStreamName, idx, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); } //Any kind of failure? Retry later. if (idx == INVALID_TRACK_ID || !meta.trackValid(idx)){ @@ -137,7 +136,7 @@ void parseThread(void *mistIn){ std::string reason = "unknown reason"; if (!(Util::bootSecs() - threadTimer[tid] < THREAD_TIMEOUT)){reason = "thread timeout";} if (!cfgPointer->is_active){reason = "input shutting down";} - if (!(!liveStream.isDataTrack(tid) || userConn.isAlive())){ + if (!(!liveStream.isDataTrack(tid) || userConn)){ reason = "buffer disconnect"; cfgPointer->is_active = false; } @@ -588,7 +587,7 @@ namespace Mist{ // Connect to stats for INPUT detection statComm.reload(); if (statComm){ - if (!statComm.isAlive()){ + if (statComm.getStatus() == COMM_STATUS_REQDISCONNECT){ config->is_active = false; Util::logExitReason("received shutdown request from controller"); return; @@ -603,7 +602,6 @@ namespace Mist{ statComm.setTime(now - startTime); statComm.setLastSecond(0); statComm.setHost(getConnectedBinHost()); - statComm.keepAlive(); } std::set activeTracks = liveStream.getActiveTracks(); diff --git a/src/output/output.cpp b/src/output/output.cpp index 9ee0305b..e8771cc8 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -1605,7 +1605,7 @@ namespace Mist{ } emptyCount = 0; // valid packet - reset empty counter - if (!userSelect[nxt.tid].isAlive()){ + if (!userSelect[nxt.tid]){ INFO_MSG("Track %zu is not alive!", nxt.tid); return false; } @@ -1691,7 +1691,6 @@ namespace Mist{ statComm.setTime(now - myConn.connTime()); statComm.setLastSecond(thisPacket ? thisPacket.getTime() : 0); statComm.setPid(getpid()); - statComm.keepAlive(); /*LTS-START*/ // Tag the session with the user agent @@ -1710,21 +1709,16 @@ namespace Mist{ if (isPushing()){ for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ - it->second.keepAlive(); if (it->second.getStatus() == COMM_STATUS_REQDISCONNECT){ if (dropPushTrack(it->second.getTrack(), "disconnect request from buffer")){break;} } - if (!it->second.isAlive()){ + if (!it->second){ if (dropPushTrack(it->second.getTrack(), "track mapping no longer valid")){break;} } //if (Util::bootSecs() - M.getLastUpdated(it->first) > 5){ // if (dropPushTrack(it->second.getTrack(), "track updates being ignored by buffer")){break;} //} } - }else{ - for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ - it->second.keepAlive(); - } } } diff --git a/src/output/output_cmaf.cpp b/src/output/output_cmaf.cpp index d4408a2b..4c48fd01 100644 --- a/src/output/output_cmaf.cpp +++ b/src/output/output_cmaf.cpp @@ -811,12 +811,7 @@ namespace Mist{ } Util::wait(100); //Make sure we don't accidentally timeout while waiting - runs approximately every second. - if (i % 10 == 0){ - for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); ++it){ - it->second.keepAlive(); - stats(); - } - } + if (i % 10 == 0){stats();} } return (keys.getEndValid() > currentKey + 1 && M.getLastms(thisIdx) > M.getTimeForKeyIndex(getMainSelectedTrack(), currentKey+1)); } diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp index 78709c20..5c26f5a0 100644 --- a/src/output/output_rtmp.cpp +++ b/src/output/output_rtmp.cpp @@ -1406,7 +1406,7 @@ namespace Mist{ } uint64_t idx = reTrackToID[reTrack]; if (idx != INVALID_TRACK_ID && !userSelect.count(idx)){ - userSelect[idx].reload(streamName, idx, COMM_STATUS_SOURCE); + userSelect[idx].reload(streamName, idx, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE); } if (M.getCodec(idx) == "PCM" && M.getSize(idx) == 16){ char *ptr = F.getData(); diff --git a/src/output/output_webrtc.cpp b/src/output/output_webrtc.cpp index f2498576..6e1d2846 100644 --- a/src/output/output_webrtc.cpp +++ b/src/output/output_webrtc.cpp @@ -812,7 +812,7 @@ namespace Mist{ videoTrack.rtpToDTSC.setCallbacks(onDTSCConverterHasPacketCallback, onDTSCConverterHasInitDataCallback); videoTrack.sorter.setCallback(M.getID(vIdx), onRTPSorterHasPacketCallback); - userSelect[vIdx].reload(streamName, vIdx, COMM_STATUS_SOURCE); + userSelect[vIdx].reload(streamName, vIdx, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE); INFO_MSG("Video push received on track %zu", vIdx); } @@ -834,7 +834,7 @@ namespace Mist{ audioTrack.rtpToDTSC.setCallbacks(onDTSCConverterHasPacketCallback, onDTSCConverterHasInitDataCallback); audioTrack.sorter.setCallback(M.getID(aIdx), onRTPSorterHasPacketCallback); - userSelect[aIdx].reload(streamName, aIdx, COMM_STATUS_SOURCE); + userSelect[aIdx].reload(streamName, aIdx, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE); INFO_MSG("Audio push received on track %zu", aIdx); } diff --git a/src/utils/util_nuke.cpp b/src/utils/util_nuke.cpp index c338f6a3..1358783b 100644 --- a/src/utils/util_nuke.cpp +++ b/src/utils/util_nuke.cpp @@ -135,10 +135,10 @@ int main(int argc, char **argv){ Comms::Users cleanUsers; cleanUsers.reload(Util::streamName, true); std::set checkPids; - for (size_t i = cleanUsers.firstValid(); i < cleanUsers.endValid(); ++i){ + for (size_t i = 0; i < cleanUsers.recordCount(); ++i){ uint8_t status = cleanUsers.getStatus(i); cleanUsers.setStatus(COMM_STATUS_INVALID, i); - if (status != COMM_STATUS_INVALID && status != COMM_STATUS_DISCONNECT && cleanUsers.getTimer(i) < 126){ + if (status != COMM_STATUS_INVALID && status != COMM_STATUS_DISCONNECT){ pid_t pid = cleanUsers.getPid(i); if (pid > 1){ Util::Procs::Stop(pid);