diff --git a/lib/defines.h b/lib/defines.h index baa66eda..140f0c42 100644 --- a/lib/defines.h +++ b/lib/defines.h @@ -137,6 +137,9 @@ static inline void show_stackframe(){} #define SEM_CONF "/MstConfLock" #define SEM_SESSCACHE "/MstSessCacheLock" #define SHM_CONF "MstConf" +#define SHM_STATE_LOGS "MstStateLogs" +#define SHM_STATE_ACCS "MstStateAccs" +#define SHM_STATE_STREAMS "MstStateStreams" #define NAME_BUFFER_SIZE 200 //char buffer size for snprintf'ing shm filenames #define SHM_SESSIONS "/MstSess" #define SHM_SESSIONS_ITEM 165 //4 byte crc, 100b streamname, 20b connector, 40b host, 1b sync diff --git a/src/controller/controller_api.cpp b/src/controller/controller_api.cpp index 663c866e..968491da 100644 --- a/src/controller/controller_api.cpp +++ b/src/controller/controller_api.cpp @@ -105,6 +105,156 @@ bool Controller::authorize(JSON::Value & Request, JSON::Value & Response, Socket return false; }//Authorize +class streamStat{ + public: + streamStat(){ + status = 0; + viewers = 0; + inputs = 0; + outputs = 0; + } + streamStat(const Util::RelAccX & rlx, uint64_t entry){ + status = rlx.getInt("status", entry); + viewers = rlx.getInt("viewers", entry); + inputs = rlx.getInt("inputs", entry); + outputs = rlx.getInt("outputs", entry); + } + bool operator ==(const streamStat &b) const{ + return (status == b.status && viewers == b.viewers && inputs == b.inputs && outputs == b.outputs); + } + bool operator !=(const streamStat &b) const{ + return !(*this == b); + } + uint8_t status; + uint64_t viewers; + uint64_t inputs; + uint64_t outputs; +}; + +void Controller::handleWebSocket(HTTP::Parser & H, Socket::Connection & C){ + std::string logs = H.GetVar("logs"); + std::string accs = H.GetVar("accs"); + bool doStreams = H.GetVar("streams").size(); + HTTP::Websocket W(C, H); + if (!W){return;} + + IPC::sharedPage shmLogs(SHM_STATE_LOGS, 1024*1024); + IPC::sharedPage shmAccs(SHM_STATE_ACCS, 1024*1024); + IPC::sharedPage shmStreams(SHM_STATE_STREAMS, 1024*1024); + Util::RelAccX rlxStreams(shmStreams.mapped); + Util::RelAccX rlxLog(shmLogs.mapped); + Util::RelAccX rlxAccs(shmAccs.mapped); + if (!rlxStreams.isReady()){doStreams = false;} + uint64_t logPos = 0; + bool doLog = false; + uint64_t accsPos = 0; + bool doAccs = false; + if (logs.size() && rlxLog.isReady()){ + doLog = true; + logPos = rlxLog.getEndPos(); + if (logs.substr(0, 6) == "since:"){ + uint64_t startLogs = JSON::Value(logs.substr(6)).asInt(); + logPos = rlxLog.getDeleted(); + while (logPos < rlxLog.getEndPos() && rlxLog.getInt("time", logPos) < startLogs){++logPos;} + }else{ + uint64_t numLogs = JSON::Value(logs).asInt(); + if (logPos <= numLogs){ + logPos = rlxLog.getDeleted(); + }else{ + logPos -= numLogs; + } + } + } + if (accs.size() && rlxAccs.isReady()){ + doAccs = true; + accsPos = rlxAccs.getEndPos(); + if (accs.substr(0, 6) == "since:"){ + uint64_t startAccs = JSON::Value(accs.substr(6)).asInt(); + accsPos = rlxAccs.getDeleted(); + while (accsPos < rlxAccs.getEndPos() && rlxAccs.getInt("time", accsPos) < startAccs){++accsPos;} + }else{ + uint64_t numAccs = JSON::Value(accs).asInt(); + if (accsPos <= numAccs){ + accsPos = rlxAccs.getDeleted(); + }else{ + accsPos -= numAccs; + } + } + } + std::map lastStrmStat; + std::set strmRemove; + while (W){ + bool sent = false; + while (doLog && rlxLog.getEndPos() > logPos){ + sent = true; + JSON::Value tmp; + tmp[0u] = "log"; + tmp[1u].append((long long)rlxLog.getInt("time", logPos)); + tmp[1u].append(rlxLog.getPointer("kind", logPos)); + tmp[1u].append(rlxLog.getPointer("msg", logPos)); + W.sendFrame(tmp.toString()); + logPos++; + } + while (doAccs && rlxAccs.getEndPos() > accsPos){ + sent = true; + JSON::Value tmp; + tmp[0u] = "access"; + tmp[1u].append((long long)rlxAccs.getInt("time", accsPos)); + tmp[1u].append(rlxAccs.getPointer("session", accsPos)); + tmp[1u].append(rlxAccs.getPointer("stream", accsPos)); + tmp[1u].append(rlxAccs.getPointer("connector", accsPos)); + tmp[1u].append(rlxAccs.getPointer("host", accsPos)); + tmp[1u].append((long long)rlxAccs.getInt("duration", accsPos)); + tmp[1u].append((long long)rlxAccs.getInt("up", accsPos)); + tmp[1u].append((long long)rlxAccs.getInt("down", accsPos)); + tmp[1u].append(rlxAccs.getPointer("tags", accsPos)); + W.sendFrame(tmp.toString()); + accsPos++; + } + if (doStreams){ + for (std::map::iterator it = lastStrmStat.begin(); it != lastStrmStat.end(); ++it){ + strmRemove.insert(it->first); + } + uint64_t startPos = rlxStreams.getDeleted(); + uint64_t endPos = rlxStreams.getEndPos(); + for (uint64_t cPos = startPos; cPos < endPos; ++cPos){ + std::string strm = rlxStreams.getPointer("stream", cPos); + strmRemove.erase(strm); + streamStat tmpStat(rlxStreams, cPos); + if (lastStrmStat[strm] != tmpStat){ + lastStrmStat[strm] = tmpStat; + sent = true; + JSON::Value tmp; + tmp[0u] = "stream"; + tmp[1u].append(strm); + tmp[1u].append((long long)tmpStat.status); + tmp[1u].append((long long)tmpStat.viewers); + tmp[1u].append((long long)tmpStat.inputs); + tmp[1u].append((long long)tmpStat.outputs); + W.sendFrame(tmp.toString()); + } + } + while (strmRemove.size()){ + std::string strm = *strmRemove.begin(); + sent = true; + JSON::Value tmp; + tmp[0u] = "stream"; + tmp[1u].append(strm); + tmp[1u].append((long long)0); + tmp[1u].append((long long)0); + tmp[1u].append((long long)0); + tmp[1u].append((long long)0); + W.sendFrame(tmp.toString()); + strmRemove.erase(strm); + lastStrmStat.erase(strm); + } + } + if (!sent){ + Util::sleep(500); + } + } +} + /// Handles a single incoming API connection. /// Assumes the connection is unauthorized and will allow for 4 requests without authorization before disconnecting. int Controller::handleAPIConnection(Socket::Connection & conn){ @@ -145,6 +295,20 @@ int Controller::handleAPIConnection(Socket::Connection & conn){ } } } + //Catch websocket requests + if (H.url == "/ws"){ + if (!authorized){ + H.Clean(); + H.body = "Please login first or provide a valid token authentication."; + H.SetHeader("Server", "MistServer/" PACKAGE_VERSION); + H.SendResponse("403", "Not authorized", conn); + H.Clean(); + continue; + } + handleWebSocket(H, conn); + H.Clean(); + continue; + } //Catch prometheus requests if (Controller::prometheus.size()){ if (H.url == "/"+Controller::prometheus){ @@ -612,13 +776,12 @@ void Controller::handleAPICommands(JSON::Value & Request, JSON::Value & Response if (*stream.rbegin() != '+'){ startPush(stream, target); }else{ + std::set activeStreams = Controller::getActiveStreams(stream); if (activeStreams.size()){ - for (std::map::iterator jt = activeStreams.begin(); jt != activeStreams.end(); ++jt){ - if (jt->first.substr(0, stream.size()) == stream){ - std::string streamname = jt->first; - std::string target_tmp = target; - startPush(streamname, target_tmp); - } + for (std::set::iterator jt = activeStreams.begin(); jt != activeStreams.end(); ++jt){ + std::string streamname = *jt; + std::string target_tmp = target; + startPush(streamname, target_tmp); } } } diff --git a/src/controller/controller_api.h b/src/controller/controller_api.h index 53081916..6c348716 100644 --- a/src/controller/controller_api.h +++ b/src/controller/controller_api.h @@ -1,9 +1,12 @@ #include #include +#include +#include namespace Controller { bool authorize(JSON::Value & Request, JSON::Value & Response, Socket::Connection & conn); int handleAPIConnection(Socket::Connection & conn); void handleAPICommands(JSON::Value & Request, JSON::Value & Response); + void handleWebSocket(HTTP::Parser & H, Socket::Connection & C); void handleUDPAPI(void * np); } diff --git a/src/controller/controller_push.cpp b/src/controller/controller_push.cpp index 344634a6..031c0db9 100644 --- a/src/controller/controller_push.cpp +++ b/src/controller/controller_push.cpp @@ -171,12 +171,13 @@ namespace Controller{ } if (waittime || it->size() > 2){ const std::string &pStr = (*it)[0u].asStringRef(); + std::set activeStreams = Controller::getActiveStreams(pStr); if (activeStreams.size()){ - for (std::map::iterator jt = activeStreams.begin(); jt != activeStreams.end(); ++jt){ - std::string streamname = jt->first; + for (std::set::iterator jt = activeStreams.begin(); jt != activeStreams.end(); ++jt){ + std::string streamname = *jt; std::string target = (*it)[1u]; if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){ - if (!isPushActive(streamname, target) && Util::getStreamStatus(streamname) == STRMSTAT_READY){ + if (!isPushActive(streamname, target)){ if (waitingPushes[streamname][target]++ >= waittime && (curCount < maxspeed || !maxspeed)){ waitingPushes[streamname].erase(target); if (!waitingPushes[streamname].size()){waitingPushes.erase(streamname);} @@ -269,11 +270,12 @@ namespace Controller{ startPush(streamname, target); return; } + const std::string &pStr = newPush[0u].asStringRef(); + std::set activeStreams = Controller::getActiveStreams(pStr); if (activeStreams.size()){ - const std::string &pStr = newPush[0u].asStringRef(); std::string target = newPush[1u].asStringRef(); - for (std::map::iterator it = activeStreams.begin(); it != activeStreams.end(); ++it){ - std::string streamname = it->first; + for (std::set::iterator it = activeStreams.begin(); it != activeStreams.end(); ++it){ + std::string streamname = *it; if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){ std::string tmpName = streamname; std::string tmpTarget = target; diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index 1e453966..73d81ee0 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -42,7 +42,6 @@ std::map Controller::sessions; / std::map Controller::connToSession; ///< Map of socket IDs to session info. bool Controller::killOnExit = KILL_ON_EXIT; tthread::mutex Controller::statsMutex; -std::map Controller::activeStreams; unsigned int Controller::maxConnsPerIP = 0; char noBWCountMatches[1717]; uint64_t bwLimit = 128*1024*1024;//gigabit default limit @@ -81,7 +80,10 @@ struct streamTotals { unsigned long long inputs; unsigned long long outputs; unsigned long long viewers; - unsigned int timeout; + unsigned long long currIns; + unsigned long long currOuts; + unsigned long long currViews; + uint8_t status; }; static std::map streamStats; static unsigned long long servUpBytes = 0; @@ -322,6 +324,8 @@ void Controller::SharedMemStats(void * config){ cacheLock->unlink(); cacheLock->open(SEM_SESSCACHE, O_CREAT | O_RDWR, ACCESSPERMS, 1); std::set inactiveStreams; + Controller::initState(); + bool shiftWrites = true; while(((Util::Config*)config)->is_active){ { tthread::lock_guard guard(Controller::configMutex); @@ -352,12 +356,23 @@ void Controller::SharedMemStats(void * config){ mustWipe.pop_front(); } } - if (activeStreams.size()){ - for (std::map::iterator it = activeStreams.begin(); it != activeStreams.end(); ++it){ + Util::RelAccX * strmStats = streamsAccessor(); + if (!strmStats || !strmStats->isReady()){strmStats = 0;} + uint64_t strmPos = 0; + if (strmStats){ + if (shiftWrites || (strmStats->getEndPos() - strmStats->getDeleted() != streamStats.size())){ + shiftWrites = true; + strmPos = strmStats->getEndPos(); + }else{ + strmPos = strmStats->getDeleted(); + } + } + if (streamStats.size()){ + for (std::map::iterator it = streamStats.begin(); it != streamStats.end(); ++it){ uint8_t newState = Util::getStreamStatus(it->first); - uint8_t oldState = activeStreams[it->first]; + uint8_t oldState = it->second.status; if (newState != oldState){ - activeStreams[it->first] = newState; + it->second.status = newState; if (newState == STRMSTAT_READY){ streamStarted(it->first); }else{ @@ -369,12 +384,28 @@ void Controller::SharedMemStats(void * config){ if (newState == STRMSTAT_OFF){ inactiveStreams.insert(it->first); } + if (strmStats){ + if (shiftWrites){ + strmStats->setString("stream", it->first, strmPos); + } + strmStats->setInt("status", it->second.status, strmPos); + strmStats->setInt("viewers", it->second.currViews, strmPos); + strmStats->setInt("inputs", it->second.currIns, strmPos); + strmStats->setInt("outputs", it->second.currOuts, strmPos); + ++strmPos; + } } - while (inactiveStreams.size()){ - activeStreams.erase(*inactiveStreams.begin()); - streamStats.erase(*inactiveStreams.begin()); - inactiveStreams.erase(inactiveStreams.begin()); - } + } + if (strmStats && shiftWrites){ + shiftWrites = false; + uint64_t prevEnd = strmStats->getEndPos(); + strmStats->setEndPos(strmPos); + strmStats->setDeleted(prevEnd); + } + while (inactiveStreams.size()){ + streamStats.erase(*inactiveStreams.begin()); + inactiveStreams.erase(inactiveStreams.begin()); + shiftWrites = true; } /*LTS-START*/ Controller::writeSessionCache(); @@ -396,12 +427,36 @@ void Controller::SharedMemStats(void * config){ } /*LTS-END*/ } + Controller::deinitState(Controller::restarting); delete shmSessions; shmSessions = 0; delete cacheLock; cacheLock = 0; } +/// Gets a complete list of all streams currently in active state, with optional prefix matching +std::set Controller::getActiveStreams(const std::string & prefix){ + std::set ret; + Util::RelAccX * strmStats = streamsAccessor(); + if (!strmStats || !strmStats->isReady()){return ret;} + uint64_t endPos = strmStats->getEndPos(); + if (prefix.size()){ + for (uint64_t i = strmStats->getDeleted(); i < endPos; ++i){ + if (strmStats->getInt("status", i) != STRMSTAT_READY){continue;} + const char * S = strmStats->getPointer("stream", i); + if (!strncmp(S, prefix.data(), prefix.size())){ + ret.insert(S); + } + } + }else{ + for (uint64_t i = strmStats->getDeleted(); i < endPos; ++i){ + if (strmStats->getInt("status", i) != STRMSTAT_READY){continue;} + ret.insert(strmStats->getPointer("stream", i)); + } + } + return ret; +} + /// Forces a re-sync of the session /// Assumes the session cache will be updated separately - may not work correctly if this is forgotten! uint32_t Controller::statSession::invalidate(){ @@ -530,14 +585,17 @@ void Controller::statSession::update(unsigned long index, IPC::statExchange & da if (data.connector() == "INPUT"){ ++servInputs; streamStats[streamName].inputs++; + streamStats[streamName].currIns++; sessionType = SESS_INPUT; }else if (data.connector() == "OUTPUT"){ ++servOutputs; streamStats[streamName].outputs++; + streamStats[streamName].currOuts++; sessionType = SESS_OUTPUT; }else{ ++servViewers; streamStats[streamName].viewers++; + streamStats[streamName].currViews++; sessionType = SESS_VIEWER; } if (!streamName.size() || streamName[0] == 0){ @@ -612,18 +670,31 @@ void Controller::statSession::wipeOld(unsigned long long cutOff){ void Controller::statSession::ping(const Controller::sessIndex & index, unsigned long long disconnectPoint){ if (!tracked){return;} if (lastSec < disconnectPoint){ + switch (sessionType){ + case SESS_INPUT: + streamStats[index.streamName].currIns--; + break; + case SESS_OUTPUT: + streamStats[index.streamName].currOuts--; + break; + case SESS_VIEWER: + streamStats[index.streamName].currViews--; + break; + } + uint64_t duration = lastSec - firstActive; + if (duration < 1){duration = 1;} + std::stringstream tagStream; + if (tags.size()){ + for (std::set::iterator it = tags.begin(); it != tags.end(); ++it){ + tagStream << "[" << *it << "]"; + } + } + Controller::logAccess(index.ID, index.streamName, index.connector, index.host, duration, getUp(), getDown(), tagStream.str()); if (Controller::accesslog.size()){ - uint64_t duration = lastSec - firstActive; - if (duration < 1){duration = 1;} if (Controller::accesslog == "LOG"){ std::stringstream accessStr; accessStr << "Session <" << index.ID << "> " << index.streamName << " (" << index.connector << ") from " << index.host << " ended after " << duration << "s, avg " << getUp()/duration/1024 << "KB/s up " << getDown()/duration/1024 << "KB/s down."; - if (tags.size()){ - accessStr << " Tags: "; - for (std::set::iterator it = tags.begin(); it != tags.end(); ++it){ - accessStr << "[" << *it << "]"; - } - } + if (tags.size()){accessStr << " Tags: " << tagStream.str();} Controller::Log("ACCS", accessStr.str()); }else{ static std::ofstream accLogFile; @@ -645,11 +716,7 @@ void Controller::statSession::ping(const Controller::sessIndex & index, unsigned timeinfo = localtime(&rawtime); strftime(buffer, 100, "%F %H:%M:%S", timeinfo); accLogFile << buffer << ", " << index.ID << ", " << index.streamName << ", " << index.connector << ", " << index.host << ", " << duration << ", " << getUp()/duration/1024 << ", " << getDown()/duration/1024 << ", "; - if (tags.size()){ - for (std::set::iterator it = tags.begin(); it != tags.end(); ++it){ - accLogFile << "[" << *it << "]"; - } - } + if (tags.size()){accLogFile << tagStream.str();} accLogFile << std::endl; } } @@ -1019,11 +1086,6 @@ void Controller::parseStatistics(char * data, size_t len, unsigned int id){ }else{ if (sessions[idx].getSessType() != SESS_OUTPUT && sessions[idx].getSessType() != SESS_UNSET){ std::string strmName = tmpEx.streamName(); - if (strmName.size()){ - if (!activeStreams.count(strmName)){ - activeStreams[strmName] = 0; - } - } } } } @@ -1558,7 +1620,6 @@ void Controller::handlePrometheus(HTTP::Parser & H, Socket::Connection & conn, i {//Scope for shortest possible blocking of statsMutex tthread::lock_guard guard(statsMutex); //collect the data first - std::map streams; std::map outputs; unsigned long totViewers = 0, totInputs = 0, totOutputs = 0; unsigned int tOut = Util::epoch() - STATS_DELAY; @@ -1570,20 +1631,17 @@ void Controller::handlePrometheus(HTTP::Parser & H, Socket::Connection & conn, i case SESS_UNSET: case SESS_VIEWER: if (it->second.hasDataFor(tOut) && it->second.isViewerOn(tOut)){ - streams[it->first.streamName].viewers++; outputs[it->first.connector]++; totViewers++; } break; case SESS_INPUT: if (it->second.hasDataFor(tIn) && it->second.isViewerOn(tIn)){ - streams[it->first.streamName].inputs++; totInputs++; } break; case SESS_OUTPUT: if (it->second.hasDataFor(tOut) && it->second.isViewerOn(tOut)){ - streams[it->first.streamName].outputs++; totOutputs++; } break; @@ -1621,19 +1679,16 @@ void Controller::handlePrometheus(HTTP::Parser & H, Socket::Connection & conn, i response << "mist_bw_other{direction=\"down\"} " << servDownOtherBytes << "\n\n"; response << "mist_bw_limit " << bwLimit << "\n\n"; - response << "# HELP mist_viewers Number of sessions by type and stream active right now.\n"; + response << "\n# HELP mist_viewers Number of sessions by type and stream active right now.\n"; response << "# TYPE mist_viewers gauge\n"; - for (std::map::iterator it = streams.begin(); it != streams.end(); ++it){ - response << "mist_sessions{stream=\"" << it->first << "\",sessType=\"viewers\"} " << it->second.viewers << "\n"; - response << "mist_sessions{stream=\"" << it->first << "\",sessType=\"incoming\"} " << it->second.inputs << "\n"; - response << "mist_sessions{stream=\"" << it->first << "\",sessType=\"outgoing\"} " << it->second.outputs << "\n"; - } - - response << "\n# HELP mist_viewcount Count of unique viewer sessions since stream start, per stream.\n"; + response << "# HELP mist_viewcount Count of unique viewer sessions since stream start, per stream.\n"; response << "# TYPE mist_viewcount counter\n"; response << "# HELP mist_bw Count of bytes handled since stream start, by direction.\n"; response << "# TYPE mist_bw counter\n"; for (std::map::iterator it = streamStats.begin(); it != streamStats.end(); ++it){ + response << "mist_sessions{stream=\"" << it->first << "\",sessType=\"viewers\"} " << it->second.currViews << "\n"; + response << "mist_sessions{stream=\"" << it->first << "\",sessType=\"incoming\"} " << it->second.currIns << "\n"; + response << "mist_sessions{stream=\"" << it->first << "\",sessType=\"outgoing\"} " << it->second.currOuts << "\n"; response << "mist_viewcount{stream=\"" << it->first << "\"} " << it->second.viewers << "\n"; response << "mist_bw{stream=\"" << it->first << "\",direction=\"up\"} " << it->second.upBytes << "\n"; response << "mist_bw{stream=\"" << it->first << "\",direction=\"down\"} " << it->second.downBytes << "\n"; @@ -1652,7 +1707,6 @@ void Controller::handlePrometheus(HTTP::Parser & H, Socket::Connection & conn, i {//Scope for shortest possible blocking of statsMutex tthread::lock_guard guard(statsMutex); //collect the data first - std::map streams; std::map outputs; unsigned long totViewers = 0, totInputs = 0, totOutputs = 0; unsigned int tOut = Util::epoch() - STATS_DELAY; @@ -1664,20 +1718,17 @@ void Controller::handlePrometheus(HTTP::Parser & H, Socket::Connection & conn, i case SESS_UNSET: case SESS_VIEWER: if (it->second.hasDataFor(tOut) && it->second.isViewerOn(tOut)){ - streams[it->first.streamName].viewers++; outputs[it->first.connector]++; totViewers++; } break; case SESS_INPUT: if (it->second.hasDataFor(tIn) && it->second.isViewerOn(tIn)){ - streams[it->first.streamName].inputs++; totInputs++; } break; case SESS_OUTPUT: if (it->second.hasDataFor(tOut) && it->second.isViewerOn(tOut)){ - streams[it->first.streamName].outputs++; totOutputs++; } break; @@ -1707,11 +1758,9 @@ void Controller::handlePrometheus(HTTP::Parser & H, Socket::Connection & conn, i resp["streams"][it->first]["tot"].append((long long)it->second.outputs); resp["streams"][it->first]["bw"].append((long long)it->second.upBytes); resp["streams"][it->first]["bw"].append((long long)it->second.downBytes); - } - for (std::map::iterator it = streams.begin(); it != streams.end(); ++it){ - resp["streams"][it->first]["curr"].append((long long)it->second.viewers); - resp["streams"][it->first]["curr"].append((long long)it->second.inputs); - resp["streams"][it->first]["curr"].append((long long)it->second.outputs); + resp["streams"][it->first]["curr"].append((long long)it->second.currViews); + resp["streams"][it->first]["curr"].append((long long)it->second.currIns); + resp["streams"][it->first]["curr"].append((long long)it->second.currOuts); } for (std::map::iterator it = outputs.begin(); it != outputs.end(); ++it){ resp["outputs"][it->first] = (long long)it->second; diff --git a/src/controller/controller_statistics.h b/src/controller/controller_statistics.h index 86947bb8..e54e08e0 100644 --- a/src/controller/controller_statistics.h +++ b/src/controller/controller_statistics.h @@ -120,6 +120,8 @@ namespace Controller { extern std::map sessions; extern std::map connToSession; extern tthread::mutex statsMutex; + + std::set getActiveStreams(const std::string & prefix = ""); void parseStatistics(char * data, size_t len, unsigned int id); void killStatistics(char * data, size_t len, unsigned int id); void fillClients(JSON::Value & req, JSON::Value & rep); diff --git a/src/controller/controller_storage.cpp b/src/controller/controller_storage.cpp index 1834e2a0..d2d9c8ff 100644 --- a/src/controller/controller_storage.cpp +++ b/src/controller/controller_storage.cpp @@ -7,7 +7,6 @@ #include #include #include //LTS -#include #include "controller_storage.h" #include "controller_capabilities.h" @@ -25,24 +24,76 @@ namespace Controller{ bool restarting = false; bool isTerminal = false; bool isColorized = false; + uint32_t maxLogsRecs = 0; + uint32_t maxAccsRecs = 0; + uint64_t firstLog = 0; + IPC::sharedPage * shmLogs = 0; + Util::RelAccX * rlxLogs = 0; + IPC::sharedPage * shmAccs = 0; + Util::RelAccX * rlxAccs = 0; + IPC::sharedPage * shmStrm = 0; + Util::RelAccX * rlxStrm = 0; + + Util::RelAccX * logAccessor(){ + return rlxLogs; + } + + Util::RelAccX * accesslogAccessor(){ + return rlxAccs; + } + + Util::RelAccX * streamsAccessor(){ + return rlxStrm; + } ///\brief Store and print a log message. ///\param kind The type of message. ///\param message The message to be logged. void Log(std::string kind, std::string message, bool noWriteToLog){ - tthread::lock_guard guard(logMutex); - JSON::Value m; - m.append(Util::epoch()); - m.append(kind); - m.append(message); - Storage["log"].append(m); - Storage["log"].shrink(100); // limit to 100 log messages - logCounter++; - if (!noWriteToLog){ + if (noWriteToLog){ + tthread::lock_guard guard(logMutex); + JSON::Value m; + uint64_t logTime = Util::epoch(); + m.append((long long)logTime); + m.append(kind); + m.append(message); + Storage["log"].append(m); + Storage["log"].shrink(100); // limit to 100 log messages + logCounter++; + if (rlxLogs && rlxLogs->isReady()){ + if (!firstLog){ + firstLog = logCounter; + } + rlxLogs->setRCount(logCounter > maxLogsRecs ? maxLogsRecs : logCounter); + rlxLogs->setDeleted(logCounter > rlxLogs->getRCount() ? logCounter - rlxLogs->getRCount() : firstLog); + rlxLogs->setInt("time", logTime, logCounter-1); + rlxLogs->setString("kind", kind, logCounter-1); + rlxLogs->setString("msg", message, logCounter-1); + rlxLogs->setEndPos(logCounter); + } + }else{ std::cerr << kind << "|MistController|" << getpid() << "||" << message << "\n"; } } + void logAccess(const std::string & sessId, const std::string & strm, const std::string & conn, const std::string & host, uint64_t duration, uint64_t up, uint64_t down, const std::string & tags){ + if (rlxAccs && rlxAccs->isReady()){ + uint64_t newEndPos = rlxAccs->getEndPos(); + rlxAccs->setRCount(newEndPos+1 > maxLogsRecs ? maxAccsRecs : newEndPos+1); + rlxAccs->setDeleted(newEndPos + 1 > maxAccsRecs ? newEndPos + 1 - maxAccsRecs : 0); + rlxAccs->setInt("time", Util::epoch(), newEndPos); + rlxAccs->setString("session", sessId, newEndPos); + rlxAccs->setString("stream", strm, newEndPos); + rlxAccs->setString("connector", conn, newEndPos); + rlxAccs->setString("host", host, newEndPos); + rlxAccs->setInt("duration", duration, newEndPos); + rlxAccs->setInt("up", up, newEndPos); + rlxAccs->setInt("down", down, newEndPos); + rlxAccs->setString("tags", tags, newEndPos); + rlxAccs->setEndPos(newEndPos + 1); + } + } + ///\brief Write contents to Filename ///\param Filename The full path of the file to write to. ///\param contents The data to be written to the file. @@ -54,6 +105,92 @@ namespace Controller{ return File.good(); } + void initState(){ + tthread::lock_guard guard(logMutex); + shmLogs = new IPC::sharedPage(SHM_STATE_LOGS, 1024*1024, true);//max 1M of logs cached + if (!shmLogs->mapped){ + FAIL_MSG("Could not open memory page for logs buffer"); + return; + } + rlxLogs = new Util::RelAccX(shmLogs->mapped, false); + if (rlxLogs->isReady()){ + logCounter = rlxLogs->getEndPos(); + }else{ + rlxLogs->addField("time", RAX_64UINT); + rlxLogs->addField("kind", RAX_32STRING); + rlxLogs->addField("msg", RAX_512STRING); + rlxLogs->setReady(); + } + maxLogsRecs = (1024*1024 - rlxLogs->getOffset()) / rlxLogs->getRSize(); + + shmAccs = new IPC::sharedPage(SHM_STATE_ACCS, 1024*1024, true);//max 1M of accesslogs cached + if (!shmAccs->mapped){ + FAIL_MSG("Could not open memory page for access logs buffer"); + return; + } + rlxAccs = new Util::RelAccX(shmAccs->mapped, false); + if (!rlxAccs->isReady()){ + rlxAccs->addField("time", RAX_64UINT); + rlxAccs->addField("session", RAX_32STRING); + rlxAccs->addField("stream", RAX_128STRING); + rlxAccs->addField("connector", RAX_32STRING); + rlxAccs->addField("host", RAX_64STRING); + rlxAccs->addField("duration", RAX_32UINT); + rlxAccs->addField("up", RAX_64UINT); + rlxAccs->addField("down", RAX_64UINT); + rlxAccs->addField("tags", RAX_256STRING); + rlxAccs->setReady(); + } + maxAccsRecs = (1024*1024 - rlxAccs->getOffset()) / rlxAccs->getRSize(); + + shmStrm = new IPC::sharedPage(SHM_STATE_STREAMS, 1024*1024, true);//max 1M of stream data + if (!shmStrm->mapped){ + FAIL_MSG("Could not open memory page for stream data"); + return; + } + rlxStrm = new Util::RelAccX(shmStrm->mapped, false); + if (!rlxStrm->isReady()){ + rlxStrm->addField("stream", RAX_128STRING); + rlxStrm->addField("status", RAX_UINT, 1); + rlxStrm->addField("viewers", RAX_64UINT); + rlxStrm->addField("inputs", RAX_64UINT); + rlxStrm->addField("outputs", RAX_64UINT); + rlxStrm->setReady(); + } + rlxStrm->setRCount((1024*1024 - rlxStrm->getOffset()) / rlxStrm->getRSize()); + } + + void deinitState(bool leaveBehind){ + tthread::lock_guard guard(logMutex); + if (!leaveBehind){ + rlxLogs->setExit(); + shmLogs->master = true; + rlxAccs->setExit(); + shmAccs->master = true; + rlxStrm->setExit(); + shmStrm->master = true; + }else{ + shmLogs->master = false; + shmAccs->master = false; + shmStrm->master = false; + } + Util::RelAccX * tmp = rlxLogs; + rlxLogs = 0; + delete tmp; + delete shmLogs; + shmLogs = 0; + tmp = rlxAccs; + rlxAccs = 0; + delete tmp; + delete shmAccs; + shmAccs = 0; + tmp = rlxStrm; + rlxStrm = 0; + delete tmp; + delete shmStrm; + shmStrm = 0; + } + void handleMsg(void *err){ Util::logParser((long long)err, fileno(stdout), Controller::isColorized, &Log); } diff --git a/src/controller/controller_storage.h b/src/controller/controller_storage.h index 1f0c704b..1884ff83 100644 --- a/src/controller/controller_storage.h +++ b/src/controller/controller_storage.h @@ -2,6 +2,7 @@ #include #include #include +#include namespace Controller { extern std::string instanceId; ///