From 0dd602d5ca95e340afc8fe85f6554e253e25accc Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 20 Mar 2018 19:51:50 +0100 Subject: [PATCH] WebSocket API in controller --- lib/defines.h | 3 + src/controller/controller_api.cpp | 164 ++++++++++++++++++++ src/controller/controller_api.h | 3 + src/controller/controller_statistics.cpp | 184 ++++++++++++++++++++++- src/controller/controller_statistics.h | 15 +- src/controller/controller_storage.cpp | 158 +++++++++++++++++-- src/controller/controller_storage.h | 10 +- 7 files changed, 523 insertions(+), 14 deletions(-) diff --git a/lib/defines.h b/lib/defines.h index 5774b6ee..54561908 100644 --- a/lib/defines.h +++ b/lib/defines.h @@ -135,6 +135,9 @@ static inline void show_stackframe(){} #define SEM_INPUT "/MstInpt%s" //%s stream name #define SEM_CONF "/MstConfLock" #define SHM_CONF "MstConf" +#define SHM_STATE_LOGS "MstStateLogs" +#define SHM_STATE_ACCS "MstStateAccs" +#define SHM_STATE_STREAMS "MstStateStreams" #define NAME_BUFFER_SIZE 200 //char buffer size for snprintf'ing shm filenames #define SIMUL_TRACKS 20 diff --git a/src/controller/controller_api.cpp b/src/controller/controller_api.cpp index 9f4614ea..eb08d40e 100644 --- a/src/controller/controller_api.cpp +++ b/src/controller/controller_api.cpp @@ -97,6 +97,156 @@ bool Controller::authorize(JSON::Value & Request, JSON::Value & Response, Socket return false; }//Authorize +class streamStat{ + public: + streamStat(){ + status = 0; + viewers = 0; + inputs = 0; + outputs = 0; + } + streamStat(const Util::RelAccX & rlx, uint64_t entry){ + status = rlx.getInt("status", entry); + viewers = rlx.getInt("viewers", entry); + inputs = rlx.getInt("inputs", entry); + outputs = rlx.getInt("outputs", entry); + } + bool operator ==(const streamStat &b) const{ + return (status == b.status && viewers == b.viewers && inputs == b.inputs && outputs == b.outputs); + } + bool operator !=(const streamStat &b) const{ + return !(*this == b); + } + uint8_t status; + uint64_t viewers; + uint64_t inputs; + uint64_t outputs; +}; + +void Controller::handleWebSocket(HTTP::Parser & H, Socket::Connection & C){ + std::string logs = H.GetVar("logs"); + std::string accs = H.GetVar("accs"); + bool doStreams = H.GetVar("streams").size(); + HTTP::Websocket W(C, H); + if (!W){return;} + + IPC::sharedPage shmLogs(SHM_STATE_LOGS, 1024*1024); + IPC::sharedPage shmAccs(SHM_STATE_ACCS, 1024*1024); + IPC::sharedPage shmStreams(SHM_STATE_STREAMS, 1024*1024); + Util::RelAccX rlxStreams(shmStreams.mapped); + Util::RelAccX rlxLog(shmLogs.mapped); + Util::RelAccX rlxAccs(shmAccs.mapped); + if (!rlxStreams.isReady()){doStreams = false;} + uint64_t logPos = 0; + bool doLog = false; + uint64_t accsPos = 0; + bool doAccs = false; + if (logs.size() && rlxLog.isReady()){ + doLog = true; + logPos = rlxLog.getEndPos(); + if (logs.substr(0, 6) == "since:"){ + uint64_t startLogs = JSON::Value(logs.substr(6)).asInt(); + logPos = rlxLog.getDeleted(); + while (logPos < rlxLog.getEndPos() && rlxLog.getInt("time", logPos) < startLogs){++logPos;} + }else{ + uint64_t numLogs = JSON::Value(logs).asInt(); + if (logPos <= numLogs){ + logPos = rlxLog.getDeleted(); + }else{ + logPos -= numLogs; + } + } + } + if (accs.size() && rlxAccs.isReady()){ + doAccs = true; + accsPos = rlxAccs.getEndPos(); + if (accs.substr(0, 6) == "since:"){ + uint64_t startAccs = JSON::Value(accs.substr(6)).asInt(); + accsPos = rlxAccs.getDeleted(); + while (accsPos < rlxAccs.getEndPos() && rlxAccs.getInt("time", accsPos) < startAccs){++accsPos;} + }else{ + uint64_t numAccs = JSON::Value(accs).asInt(); + if (accsPos <= numAccs){ + accsPos = rlxAccs.getDeleted(); + }else{ + accsPos -= numAccs; + } + } + } + std::map lastStrmStat; + std::set strmRemove; + while (W){ + bool sent = false; + while (doLog && rlxLog.getEndPos() > logPos){ + sent = true; + JSON::Value tmp; + tmp[0u] = "log"; + tmp[1u].append((long long)rlxLog.getInt("time", logPos)); + tmp[1u].append(rlxLog.getPointer("kind", logPos)); + tmp[1u].append(rlxLog.getPointer("msg", logPos)); + W.sendFrame(tmp.toString()); + logPos++; + } + while (doAccs && rlxAccs.getEndPos() > accsPos){ + sent = true; + JSON::Value tmp; + tmp[0u] = "access"; + tmp[1u].append((long long)rlxAccs.getInt("time", accsPos)); + tmp[1u].append(rlxAccs.getPointer("session", accsPos)); + tmp[1u].append(rlxAccs.getPointer("stream", accsPos)); + tmp[1u].append(rlxAccs.getPointer("connector", accsPos)); + tmp[1u].append(rlxAccs.getPointer("host", accsPos)); + tmp[1u].append((long long)rlxAccs.getInt("duration", accsPos)); + tmp[1u].append((long long)rlxAccs.getInt("up", accsPos)); + tmp[1u].append((long long)rlxAccs.getInt("down", accsPos)); + tmp[1u].append(rlxAccs.getPointer("tags", accsPos)); + W.sendFrame(tmp.toString()); + accsPos++; + } + if (doStreams){ + for (std::map::iterator it = lastStrmStat.begin(); it != lastStrmStat.end(); ++it){ + strmRemove.insert(it->first); + } + uint64_t startPos = rlxStreams.getDeleted(); + uint64_t endPos = rlxStreams.getEndPos(); + for (uint64_t cPos = startPos; cPos < endPos; ++cPos){ + std::string strm = rlxStreams.getPointer("stream", cPos); + strmRemove.erase(strm); + streamStat tmpStat(rlxStreams, cPos); + if (lastStrmStat[strm] != tmpStat){ + lastStrmStat[strm] = tmpStat; + sent = true; + JSON::Value tmp; + tmp[0u] = "stream"; + tmp[1u].append(strm); + tmp[1u].append((long long)tmpStat.status); + tmp[1u].append((long long)tmpStat.viewers); + tmp[1u].append((long long)tmpStat.inputs); + tmp[1u].append((long long)tmpStat.outputs); + W.sendFrame(tmp.toString()); + } + } + while (strmRemove.size()){ + std::string strm = *strmRemove.begin(); + sent = true; + JSON::Value tmp; + tmp[0u] = "stream"; + tmp[1u].append(strm); + tmp[1u].append((long long)0); + tmp[1u].append((long long)0); + tmp[1u].append((long long)0); + tmp[1u].append((long long)0); + W.sendFrame(tmp.toString()); + strmRemove.erase(strm); + lastStrmStat.erase(strm); + } + } + if (!sent){ + Util::sleep(500); + } + } +} + /// Handles a single incoming API connection. /// Assumes the connection is unauthorized and will allow for 4 requests without authorization before disconnecting. int Controller::handleAPIConnection(Socket::Connection & conn){ @@ -137,6 +287,20 @@ int Controller::handleAPIConnection(Socket::Connection & conn){ } } } + //Catch websocket requests + if (H.url == "/ws"){ + if (!authorized){ + H.Clean(); + H.body = "Please login first or provide a valid token authentication."; + H.SetHeader("Server", "MistServer/" PACKAGE_VERSION); + H.SendResponse("403", "Not authorized", conn); + H.Clean(); + continue; + } + handleWebSocket(H, conn); + H.Clean(); + continue; + } JSON::Value Response; JSON::Value Request = JSON::fromString(H.GetVar("command")); //invalid request? send the web interface, unless requested as "/api" diff --git a/src/controller/controller_api.h b/src/controller/controller_api.h index b95dc53a..e274b178 100644 --- a/src/controller/controller_api.h +++ b/src/controller/controller_api.h @@ -1,8 +1,11 @@ #include #include +#include +#include namespace Controller { bool authorize(JSON::Value & Request, JSON::Value & Response, Socket::Connection & conn); int handleAPIConnection(Socket::Connection & conn); void handleAPICommands(JSON::Value & Request, JSON::Value & Response); + void handleWebSocket(HTTP::Parser & H, Socket::Connection & C); } diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index a16defde..397b86e4 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include "controller_statistics.h" #include "controller_storage.h" @@ -29,6 +30,20 @@ std::map Controller::sessions; / std::map Controller::connToSession; ///< Map of socket IDs to session info. tthread::mutex Controller::statsMutex; +//For server-wide totals. Local to this file only. +struct streamTotals { + unsigned long long upBytes; + unsigned long long downBytes; + unsigned long long inputs; + unsigned long long outputs; + unsigned long long viewers; + unsigned long long currIns; + unsigned long long currOuts; + unsigned long long currViews; + uint8_t status; +}; +static std::map streamStats; + Controller::sessIndex::sessIndex(std::string dhost, unsigned int dcrc, std::string dstreamName, std::string dconnector){ host = dhost; crc = dcrc; @@ -92,6 +107,8 @@ void Controller::SharedMemStats(void * config){ IPC::sharedServer statServer(SHM_STATISTICS, STAT_EX_SIZE, true); statPointer = &statServer; std::set inactiveStreams; + Controller::initState(); + bool shiftWrites = true; while(((Util::Config*)config)->is_active){ { tthread::lock_guard guard(Controller::configMutex); @@ -113,6 +130,50 @@ void Controller::SharedMemStats(void * config){ mustWipe.pop_front(); } } + Util::RelAccX * strmStats = streamsAccessor(); + if (!strmStats || !strmStats->isReady()){strmStats = 0;} + uint64_t strmPos = 0; + if (strmStats){ + if (shiftWrites || (strmStats->getEndPos() - strmStats->getDeleted() != streamStats.size())){ + shiftWrites = true; + strmPos = strmStats->getEndPos(); + }else{ + strmPos = strmStats->getDeleted(); + } + } + if (streamStats.size()){ + for (std::map::iterator it = streamStats.begin(); it != streamStats.end(); ++it){ + uint8_t newState = Util::getStreamStatus(it->first); + uint8_t oldState = it->second.status; + if (newState != oldState){ + it->second.status = newState; + } + if (newState == STRMSTAT_OFF){ + inactiveStreams.insert(it->first); + } + if (strmStats){ + if (shiftWrites){ + strmStats->setString("stream", it->first, strmPos); + } + strmStats->setInt("status", it->second.status, strmPos); + strmStats->setInt("viewers", it->second.currViews, strmPos); + strmStats->setInt("inputs", it->second.currIns, strmPos); + strmStats->setInt("outputs", it->second.currOuts, strmPos); + ++strmPos; + } + } + } + if (strmStats && shiftWrites){ + shiftWrites = false; + uint64_t prevEnd = strmStats->getEndPos(); + strmStats->setEndPos(strmPos); + strmStats->setDeleted(prevEnd); + } + while (inactiveStreams.size()){ + streamStats.erase(*inactiveStreams.begin()); + inactiveStreams.erase(inactiveStreams.begin()); + shiftWrites = true; + } } Util::wait(1000); } @@ -121,18 +182,89 @@ void Controller::SharedMemStats(void * config){ if (Controller::restarting){ statServer.abandon(); } + Controller::deinitState(Controller::restarting); +} + +/// Gets a complete list of all streams currently in active state, with optional prefix matching +std::set Controller::getActiveStreams(const std::string & prefix){ + std::set ret; + Util::RelAccX * strmStats = streamsAccessor(); + if (!strmStats || !strmStats->isReady()){return ret;} + uint64_t endPos = strmStats->getEndPos(); + if (prefix.size()){ + for (uint64_t i = strmStats->getDeleted(); i < endPos; ++i){ + if (strmStats->getInt("status", i) != STRMSTAT_READY){continue;} + const char * S = strmStats->getPointer("stream", i); + if (!strncmp(S, prefix.data(), prefix.size())){ + ret.insert(S); + } + } + }else{ + for (uint64_t i = strmStats->getDeleted(); i < endPos; ++i){ + if (strmStats->getInt("status", i) != STRMSTAT_READY){continue;} + ret.insert(strmStats->getPointer("stream", i)); + } + } + return ret; } /// Updates the given active connection with new stats data. void Controller::statSession::update(unsigned long index, IPC::statExchange & data){ + long long prevDown = getDown(); + long long prevUp = getUp(); curConns[index].update(data); + //store timestamp of first received data, if older + if (firstSec > data.now()){ + firstSec = data.now(); + } //store timestamp of last received data, if newer if (data.now() > lastSec){ lastSec = data.now(); } - //store timestamp of first received data, if older - if (firstSec > data.now()){ - firstSec = data.now(); + long long currDown = getDown(); + long long currUp = getUp(); + if (currUp - prevUp < 0 || currDown-prevDown < 0){ + INFO_MSG("Negative data usage! %lldu/%lldd (u%lld->%lld) in %s over %s, #%lu", currUp-prevUp, currDown-prevDown, prevUp, currUp, data.streamName().c_str(), data.connector().c_str(), index); + } + if (currDown + currUp > COUNTABLE_BYTES){ + std::string streamName = data.streamName(); + if (prevUp + prevDown < COUNTABLE_BYTES){ + if (data.connector() == "INPUT"){ + streamStats[streamName].inputs++; + streamStats[streamName].currIns++; + sessionType = SESS_INPUT; + }else if (data.connector() == "OUTPUT"){ + streamStats[streamName].outputs++; + streamStats[streamName].currOuts++; + sessionType = SESS_OUTPUT; + }else{ + streamStats[streamName].viewers++; + streamStats[streamName].currViews++; + sessionType = SESS_VIEWER; + } + if (!streamName.size() || streamName[0] == 0){ + if (streamStats.count(streamName)){streamStats.erase(streamName);} + }else{ + streamStats[streamName].upBytes += currUp; + streamStats[streamName].downBytes += currDown; + } + }else{ + if (!streamName.size() || streamName[0] == 0){ + if (streamStats.count(streamName)){streamStats.erase(streamName);} + }else{ + streamStats[streamName].upBytes += currUp - prevUp; + streamStats[streamName].downBytes += currDown - prevDown; + } + if (sessionType == SESS_UNSET){ + if (data.connector() == "INPUT"){ + sessionType = SESS_INPUT; + }else if (data.connector() == "OUTPUT"){ + sessionType = SESS_OUTPUT; + }else{ + sessionType = SESS_VIEWER; + } + } + } } } @@ -171,6 +303,25 @@ void Controller::statSession::wipeOld(unsigned long long cutOff){ } } +void Controller::statSession::ping(const Controller::sessIndex & index, unsigned long long disconnectPoint){ + if (lastSec < disconnectPoint){ + switch (sessionType){ + case SESS_INPUT: + streamStats[index.streamName].currIns--; + break; + case SESS_OUTPUT: + streamStats[index.streamName].currOuts--; + break; + case SESS_VIEWER: + streamStats[index.streamName].currViews--; + break; + } + uint64_t duration = lastSec - firstSec; + if (duration < 1){duration = 1;} + Controller::logAccess("", index.streamName, index.connector, index.host, duration, getUp(), getDown(), ""); + } +} + /// Archives the given connection. void Controller::statSession::finish(unsigned long index){ oldConns.push_back(curConns[index]); @@ -181,6 +332,7 @@ void Controller::statSession::finish(unsigned long index){ Controller::statSession::statSession(){ firstSec = 0xFFFFFFFFFFFFFFFFull; lastSec = 0; + sessionType = SESS_UNSET; } /// Moves the given connection to the given session @@ -351,6 +503,32 @@ long long Controller::statSession::getUp(unsigned long long t){ return retVal; } +/// Returns the cumulative downloaded bytes for this session at timestamp t. +long long Controller::statSession::getDown(){ + long long retVal = 0; + if (curConns.size()){ + for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ + if (it->second.log.size()){ + retVal += it->second.log.rbegin()->second.down; + } + } + } + return retVal; +} + +/// Returns the cumulative uploaded bytes for this session at timestamp t. +long long Controller::statSession::getUp(){ + long long retVal = 0; + if (curConns.size()){ + for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ + if (it->second.log.size()){ + retVal += it->second.log.rbegin()->second.up; + } + } + } + return retVal; +} + /// Returns the cumulative downloaded bytes per second for this session at timestamp t. long long Controller::statSession::getBpsDown(unsigned long long t){ unsigned long long aTime = t - 5; diff --git a/src/controller/controller_statistics.h b/src/controller/controller_statistics.h index 96121d0e..a5d90fdc 100644 --- a/src/controller/controller_statistics.h +++ b/src/controller/controller_statistics.h @@ -19,6 +19,13 @@ namespace Controller { long long up; }; + enum sessType { + SESS_UNSET = 0, + SESS_INPUT, + SESS_OUTPUT, + SESS_VIEWER + }; + /// This is a comparison and storage class that keeps sessions apart from each other. /// Whenever two of these objects are not equal, it will create a new session. class sessIndex { @@ -60,13 +67,15 @@ namespace Controller { unsigned long long firstSec; unsigned long long lastSec; std::deque oldConns; - std::map curConns; + sessType sessionType; public: statSession(); + std::map curConns; void wipeOld(unsigned long long); void finish(unsigned long index); void switchOverTo(statSession & newSess, unsigned long index); void update(unsigned long index, IPC::statExchange & data); + void ping(const sessIndex & index, unsigned long long disconnectPoint); unsigned long long getStart(); unsigned long long getEnd(); bool hasDataFor(unsigned long long time); @@ -74,6 +83,8 @@ namespace Controller { long long getConnTime(unsigned long long time); long long getLastSecond(unsigned long long time); long long getDown(unsigned long long time); + long long getUp(); + long long getDown(); long long getUp(unsigned long long time); long long getBpsDown(unsigned long long time); long long getBpsUp(unsigned long long time); @@ -85,6 +96,8 @@ namespace Controller { extern std::map sessions; extern std::map connToSession; extern tthread::mutex statsMutex; + + std::set getActiveStreams(const std::string & prefix = ""); void parseStatistics(char * data, size_t len, unsigned int id); void fillClients(JSON::Value & req, JSON::Value & rep); void fillTotals(JSON::Value & req, JSON::Value & rep); diff --git a/src/controller/controller_storage.cpp b/src/controller/controller_storage.cpp index 781d8053..1a6b0901 100644 --- a/src/controller/controller_storage.cpp +++ b/src/controller/controller_storage.cpp @@ -18,27 +18,81 @@ namespace Controller { JSON::Value Storage; ///< Global storage of data. tthread::mutex configMutex; tthread::mutex logMutex; + unsigned long long logCounter = 0; bool configChanged = false; bool restarting = false; bool isTerminal = false; bool isColorized = false; + uint32_t maxLogsRecs = 0; + uint32_t maxAccsRecs = 0; + uint64_t firstLog = 0; + IPC::sharedPage * shmLogs = 0; + Util::RelAccX * rlxLogs = 0; + IPC::sharedPage * shmAccs = 0; + Util::RelAccX * rlxAccs = 0; + IPC::sharedPage * shmStrm = 0; + Util::RelAccX * rlxStrm = 0; + + Util::RelAccX * logAccessor(){ + return rlxLogs; + } + + Util::RelAccX * accesslogAccessor(){ + return rlxAccs; + } + + Util::RelAccX * streamsAccessor(){ + return rlxStrm; + } ///\brief Store and print a log message. ///\param kind The type of message. ///\param message The message to be logged. void Log(std::string kind, std::string message, bool noWriteToLog){ - tthread::lock_guard guard(logMutex); - JSON::Value m; - m.append(Util::epoch()); - m.append(kind); - m.append(message); - Storage["log"].append(m); - Storage["log"].shrink(100); // limit to 100 log messages - if (!noWriteToLog){ + if (noWriteToLog){ + tthread::lock_guard guard(logMutex); + JSON::Value m; + uint64_t logTime = Util::epoch(); + m.append((long long)logTime); + m.append(kind); + m.append(message); + Storage["log"].append(m); + Storage["log"].shrink(100); // limit to 100 log messages + logCounter++; + if (rlxLogs && rlxLogs->isReady()){ + if (!firstLog){ + firstLog = logCounter; + } + rlxLogs->setRCount(logCounter > maxLogsRecs ? maxLogsRecs : logCounter); + rlxLogs->setDeleted(logCounter > rlxLogs->getRCount() ? logCounter - rlxLogs->getRCount() : firstLog); + rlxLogs->setInt("time", logTime, logCounter-1); + rlxLogs->setString("kind", kind, logCounter-1); + rlxLogs->setString("msg", message, logCounter-1); + rlxLogs->setEndPos(logCounter); + } + }else{ std::cerr << kind << "|MistController|" << getpid() << "||" << message << "\n"; } } + void logAccess(const std::string & sessId, const std::string & strm, const std::string & conn, const std::string & host, uint64_t duration, uint64_t up, uint64_t down, const std::string & tags){ + if (rlxAccs && rlxAccs->isReady()){ + uint64_t newEndPos = rlxAccs->getEndPos(); + rlxAccs->setRCount(newEndPos+1 > maxLogsRecs ? maxAccsRecs : newEndPos+1); + rlxAccs->setDeleted(newEndPos + 1 > maxAccsRecs ? newEndPos + 1 - maxAccsRecs : 0); + rlxAccs->setInt("time", Util::epoch(), newEndPos); + rlxAccs->setString("session", sessId, newEndPos); + rlxAccs->setString("stream", strm, newEndPos); + rlxAccs->setString("connector", conn, newEndPos); + rlxAccs->setString("host", host, newEndPos); + rlxAccs->setInt("duration", duration, newEndPos); + rlxAccs->setInt("up", up, newEndPos); + rlxAccs->setInt("down", down, newEndPos); + rlxAccs->setString("tags", tags, newEndPos); + rlxAccs->setEndPos(newEndPos + 1); + } + } + ///\brief Write contents to Filename ///\param Filename The full path of the file to write to. ///\param contents The data to be written to the file. @@ -49,7 +103,93 @@ namespace Controller { File.close(); return File.good(); } - + + void initState(){ + tthread::lock_guard guard(logMutex); + shmLogs = new IPC::sharedPage(SHM_STATE_LOGS, 1024*1024, true);//max 1M of logs cached + if (!shmLogs->mapped){ + FAIL_MSG("Could not open memory page for logs buffer"); + return; + } + rlxLogs = new Util::RelAccX(shmLogs->mapped, false); + if (rlxLogs->isReady()){ + logCounter = rlxLogs->getEndPos(); + }else{ + rlxLogs->addField("time", RAX_64UINT); + rlxLogs->addField("kind", RAX_32STRING); + rlxLogs->addField("msg", RAX_512STRING); + rlxLogs->setReady(); + } + maxLogsRecs = (1024*1024 - rlxLogs->getOffset()) / rlxLogs->getRSize(); + + shmAccs = new IPC::sharedPage(SHM_STATE_ACCS, 1024*1024, true);//max 1M of accesslogs cached + if (!shmAccs->mapped){ + FAIL_MSG("Could not open memory page for access logs buffer"); + return; + } + rlxAccs = new Util::RelAccX(shmAccs->mapped, false); + if (!rlxAccs->isReady()){ + rlxAccs->addField("time", RAX_64UINT); + rlxAccs->addField("session", RAX_32STRING); + rlxAccs->addField("stream", RAX_128STRING); + rlxAccs->addField("connector", RAX_32STRING); + rlxAccs->addField("host", RAX_64STRING); + rlxAccs->addField("duration", RAX_32UINT); + rlxAccs->addField("up", RAX_64UINT); + rlxAccs->addField("down", RAX_64UINT); + rlxAccs->addField("tags", RAX_256STRING); + rlxAccs->setReady(); + } + maxAccsRecs = (1024*1024 - rlxAccs->getOffset()) / rlxAccs->getRSize(); + + shmStrm = new IPC::sharedPage(SHM_STATE_STREAMS, 1024*1024, true);//max 1M of stream data + if (!shmStrm->mapped){ + FAIL_MSG("Could not open memory page for stream data"); + return; + } + rlxStrm = new Util::RelAccX(shmStrm->mapped, false); + if (!rlxStrm->isReady()){ + rlxStrm->addField("stream", RAX_128STRING); + rlxStrm->addField("status", RAX_UINT, 1); + rlxStrm->addField("viewers", RAX_64UINT); + rlxStrm->addField("inputs", RAX_64UINT); + rlxStrm->addField("outputs", RAX_64UINT); + rlxStrm->setReady(); + } + rlxStrm->setRCount((1024*1024 - rlxStrm->getOffset()) / rlxStrm->getRSize()); + } + + void deinitState(bool leaveBehind){ + tthread::lock_guard guard(logMutex); + if (!leaveBehind){ + rlxLogs->setExit(); + shmLogs->master = true; + rlxAccs->setExit(); + shmAccs->master = true; + rlxStrm->setExit(); + shmStrm->master = true; + }else{ + shmLogs->master = false; + shmAccs->master = false; + shmStrm->master = false; + } + Util::RelAccX * tmp = rlxLogs; + rlxLogs = 0; + delete tmp; + delete shmLogs; + shmLogs = 0; + tmp = rlxAccs; + rlxAccs = 0; + delete tmp; + delete shmAccs; + shmAccs = 0; + tmp = rlxStrm; + rlxStrm = 0; + delete tmp; + delete shmStrm; + shmStrm = 0; + } + void handleMsg(void *err){ Util::logParser((long long)err, fileno(stdout), Controller::isColorized, &Log); } diff --git a/src/controller/controller_storage.h b/src/controller/controller_storage.h index 13081752..525715ea 100644 --- a/src/controller/controller_storage.h +++ b/src/controller/controller_storage.h @@ -2,6 +2,7 @@ #include #include #include +#include namespace Controller { extern std::string instanceId; ///