From dfc41cc59678b06306622ef8e9295347a95c4b09 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Sun, 15 May 2016 00:20:53 +0200 Subject: [PATCH] Prometheus stats split over viewers, incoming and outgoing. Load balancer updated to use new split stats and provide info per host and/or stream if requested over its port. --- src/analysers/load_analyser.cpp | 60 ++++++++- src/controller/controller_statistics.cpp | 165 +++++++++++++++-------- src/controller/controller_statistics.h | 9 ++ src/controller/controller_storage.cpp | 2 + src/controller/controller_storage.h | 1 + src/output/output.cpp | 13 +- src/output/output.h | 2 +- src/output/output_rtmp.cpp | 8 ++ src/output/output_rtmp.h | 1 + 9 files changed, 199 insertions(+), 62 deletions(-) diff --git a/src/analysers/load_analyser.cpp b/src/analysers/load_analyser.cpp index bc010c96..650803fb 100644 --- a/src/analysers/load_analyser.cpp +++ b/src/analysers/load_analyser.cpp @@ -44,6 +44,7 @@ class hostDetails{ upPrev = 0; downPrev = 0; prevTime = 0; + total = 0; addBandwidth = 0; availBandwidth = 128 * 1024 * 1024;//assume 1G connections } @@ -59,6 +60,27 @@ class hostDetails{ addBandwidth += 1 * 1024 * 1024; addBandwidth *= 1.2; } + ///Returns the count of viewers for a given stream s. + unsigned long long count(std::string & s){ + if (!hostMutex){hostMutex = new tthread::mutex();} + tthread::lock_guard guard(*hostMutex); + if (streams.count(s)){ + return streams[s].total; + }else{ + return 0; + } + } + ///Fills out a by reference given JSON::Value with current state. + void fillState(JSON::Value & r){ + if (!hostMutex){hostMutex = new tthread::mutex();} + tthread::lock_guard guard(*hostMutex); + r["cpu"] = (long long)(cpu/10); + if (ramMax){r["ram"] = (long long)((ramCurr*100) / ramMax);} + r["up"] = (long long)upSpeed; + r["down"] = (long long)downSpeed; + r["streams"] = (long long)streams.size(); + r["viewers"] = (long long)total; + } ///Scores a potential new connection to this server, on a scale from 0 to 3200. ///0 is horrible, 3200 is perfect. unsigned int rate(std::string & s){ @@ -74,7 +96,7 @@ class hostDetails{ if (streams.count(s)){score += 200;} //Finally, account for bandwidth. We again scale from 0 to 1000 where 1000 is perfect. score += (1000 - (((upSpeed + addBandwidth) * 1000) / availBandwidth)); - MEDIUM_MSG("Scores: CPU %u, RAM %u, Stream %u, BW %u (-%u) (%lluMB/s avail)", 1000-cpu, 1000-((ramCurr * 1000) / ramMax), streams.count(s)?200:0, (1000 - ((upSpeed * 1000) / availBandwidth)), (addBandwidth * 1000)/availBandwidth, availBandwidth / 1024 / 1024); + MEDIUM_MSG("CPU %u, RAM %u, Stream %u, BW %u (-%u) (max %llu MB/s)", 1000-cpu, 1000-((ramCurr * 1000) / ramMax), streams.count(s)?200:0, (1000 - ((upSpeed * 1000) / availBandwidth)), (addBandwidth * 1000)/availBandwidth, availBandwidth / 1024 / 1024); return score; } void addViewer(std::string & s){ @@ -96,8 +118,8 @@ class hostDetails{ cpu = d["cpu"].asInt(); ramMax = d["mem_total"].asInt(); ramCurr = d["mem_used"].asInt(); - total = d["sess_current"].asInt(); - unsigned long long currUp = d["upload"].asInt(), currDown = d["download"].asInt(); + total = d["curr"][0u].asInt(); + unsigned long long currUp = d["bw"][0u].asInt(), currDown = d["bw"][1u].asInt(); unsigned int timeDiff = 0; if (prevTime){ timeDiff = time(0) - prevTime; @@ -112,7 +134,7 @@ class hostDetails{ if (d.isMember("streams") && d["streams"].size()){ jsonForEach(d["streams"], it){ - unsigned int count =(*it)["sess_current"].asInt(); + unsigned int count = (*it)["curr"][0u].asInt() + (*it)["curr"][1u].asInt() + (*it)["curr"][2u].asInt(); if (!count){ if (streams.count(it.key())){ streams.erase(it.key()); @@ -120,8 +142,8 @@ class hostDetails{ continue; } struct streamDetails & strm = streams[it.key()]; - strm.total = count; - unsigned long long currTotal = (*it)["download"].asInt() + (*it)["upload"].asInt(); + strm.total = (*it)["curr"][0u].asInt(); + unsigned long long currTotal = (*it)["bw"][0u].asInt() + (*it)["bw"][1u].asInt(); if (timeDiff && count){ strm.bandwidth = ((currTotal - strm.prevTotal) / timeDiff) / count; }else{ @@ -145,6 +167,32 @@ int handleRequest(Socket::Connection & conn){ HTTP::Parser H; while (conn){ if ((conn.spool() || conn.Received().size()) && H.Read(conn)){ + if (H.url.size() == 1){ + std::string host = H.GetVar("host"); + std::string stream = H.GetVar("stream"); + H.Clean(); + H.SetHeader("Content-Type", "text/plain"); + JSON::Value ret; + if (!host.size() && !stream.size()){ + for (std::map::iterator it = hosts.begin(); it != hosts.end(); ++it){ + it->second.fillState(ret[it->first]); + } + }else{ + if (stream.size()){ + unsigned long long strTot = 0; + for (std::map::iterator it = hosts.begin(); it != hosts.end(); ++it){ + strTot += it->second.count(stream); + } + ret = (long long)strTot; + }else if (hosts.count(host)){ + hosts[host].fillState(ret); + } + } + H.SetBody(ret.toPrettyString()); + H.SendResponse("200", "OK", conn); + H.Clean(); + continue; + } std::string stream = H.url.substr(1); INFO_MSG("Balancing stream %s", stream.c_str()); H.Clean(); diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index 56667556..8913316a 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -7,6 +7,7 @@ #include "controller_statistics.h" #include "controller_limits.h" #include "controller_push.h" +#include "controller_storage.h" #ifndef KILL_ON_EXIT #define KILL_ON_EXIT false @@ -43,13 +44,17 @@ std::map Controller::activeStreams; struct streamTotals { unsigned long long upBytes; unsigned long long downBytes; - unsigned long long clients; + unsigned long long inputs; + unsigned long long outputs; + unsigned long long viewers; unsigned int timeout; }; static std::map streamStats; static unsigned long long servUpBytes = 0; static unsigned long long servDownBytes = 0; -static unsigned long long servClients = 0; +static unsigned long long servInputs = 0; +static unsigned long long servOutputs = 0; +static unsigned long long servViewers = 0; Controller::sessIndex::sessIndex(std::string dhost, unsigned int dcrc, std::string dstreamName, std::string dconnector){ host = dhost; @@ -211,8 +216,19 @@ void Controller::statSession::update(unsigned long index, IPC::statExchange & da if (currDown + currUp > COUNTABLE_BYTES){ std::string streamName = data.streamName(); if (prevUp + prevDown < COUNTABLE_BYTES){ - ++servClients; - streamStats[streamName].clients++; + if (data.connector() == "INPUT"){ + ++servInputs; + streamStats[streamName].inputs++; + sessionType = SESS_INPUT; + }else if (data.connector() == "OUTPUT"){ + ++servOutputs; + streamStats[streamName].outputs++; + sessionType = SESS_OUTPUT; + }else{ + ++servViewers; + streamStats[streamName].viewers++; + sessionType = SESS_VIEWER; + } streamStats[streamName].upBytes += currUp; streamStats[streamName].downBytes += currDown; }else{ @@ -222,6 +238,10 @@ void Controller::statSession::update(unsigned long index, IPC::statExchange & da } } +Controller::sessType Controller::statSession::getSessType(){ + return sessionType; +} + /// Archives the given connection. void Controller::statSession::wipeOld(unsigned long long cutOff){ if (firstSec > cutOff){ @@ -262,6 +282,7 @@ Controller::statSession::statSession(){ sync = 1; wipedUp = 0; wipedDown = 0; + sessionType = SESS_UNSET; } /// Moves the given connection to the given session @@ -1083,6 +1104,9 @@ void Controller::handlePrometheus(HTTP::Parser & H, Socket::Connection & conn, i if (mode == PROMETHEUS_TEXT){ std::stringstream response; + response << "# HELP mist_logs Count of log messages since server start.\n"; + response << "# TYPE mist_logs counter\n"; + response << "mist_logs " << Controller::logCounter << "\n\n"; response << "# HELP mist_cpu Total CPU usage in tenths of percent.\n"; response << "# TYPE mist_cpu gauge\n"; response << "mist_cpu " << cpu_use << "\n\n"; @@ -1095,57 +1119,68 @@ void Controller::handlePrometheus(HTTP::Parser & H, Socket::Connection & conn, i {//Scope for shortest possible blocking of statsMutex tthread::lock_guard guard(statsMutex); - response << "# HELP mist_sessions_cached Number of sessions active in the last ~10 minutes.\n"; - response << "# TYPE mist_sessions_cached gauge\n"; - response << "mist_sessions_cached " << sessions.size() << "\n\n"; - //collect the data first - std::map clients; - unsigned long totClients = 0; + std::map streams; + unsigned long totViewers = 0, totInputs = 0, totOutputs = 0; unsigned int t = Util::epoch() - STATS_DELAY; //check all sessions if (sessions.size()){ for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ if (it->second.hasDataFor(t) && it->second.isViewerOn(t)){ - clients[it->first.streamName]++; - totClients++; + switch (it->second.getSessType()){ + case SESS_UNSET: + case SESS_VIEWER: + streams[it->first.streamName].viewers++; + totViewers++; + break; + case SESS_INPUT: + streams[it->first.streamName].inputs++; + totInputs++; + break; + case SESS_OUTPUT: + streams[it->first.streamName].outputs++; + totOutputs++; + break; + } + } } } - response << "# HELP mist_sessions_current Number of sessions active right now, server-wide.\n"; - response << "# TYPE mist_sessions_current gauge\n"; - response << "mist_sessions_current " << totClients << "\n\n"; + response << "# HELP mist_sessions_total Number of sessions active right now, server-wide, by type.\n"; + response << "# TYPE mist_sessions_total gauge\n"; + response << "mist_sessions_total{sessType=\"viewers\"} " << totViewers << "\n"; + response << "mist_sessions_total{sessType=\"incoming\"} " << totInputs << "\n"; + response << "mist_sessions_total{sessType=\"outgoing\"} " << totOutputs << "\n"; + response << "mist_sessions_total{sessType=\"cached\"} " << sessions.size() << "\n\n"; - response << "# HELP mist_sessions_total Count of unique sessions since server start.\n"; - response << "# TYPE mist_sessions_total counter\n"; - response << "mist_sessions_total " << servClients << "\n\n"; + response << "# HELP mist_sessions_count Counts of unique sessions by type since server start.\n"; + response << "# TYPE mist_sessions_count counter\n"; + response << "mist_sessions_count{sessType=\"viewers\"} " << servViewers << "\n"; + response << "mist_sessions_count{sessType=\"incoming\"} " << servInputs << "\n"; + response << "mist_sessions_count{sessType=\"outgoing\"} " << servOutputs << "\n\n"; - response << "# HELP mist_upload_total Count of bytes uploaded since server start.\n"; - response << "# TYPE mist_upload_total counter\n"; - response << "mist_upload_total " << servUpBytes << "\n\n"; + response << "# HELP mist_bw_total Count of bytes handled since server start, by direction.\n"; + response << "# TYPE mist_bw_total counter\n"; + response << "mist_bw_total{direction=\"up\"} " << servUpBytes << "\n"; + response << "mist_bw_total{direction=\"down\"} " << servDownBytes << "\n\n"; - response << "# HELP mist_download_total Count of bytes downloaded since server start.\n"; - response << "# TYPE mist_download_total counter\n"; - response << "mist_download_total " << servDownBytes << "\n\n"; - - response << "# HELP mist_current Number of sessions for a given stream active right now.\n"; - response << "# TYPE mist_current gauge\n"; - for (std::map::iterator it = clients.begin(); it != clients.end(); ++it){ - response << "mist_current{stream=\"" << it->first << "\"} " << it->second << "\n"; + response << "# HELP mist_viewers Number of sessions by type and stream active right now.\n"; + response << "# TYPE mist_viewers gauge\n"; + for (std::map::iterator it = streams.begin(); it != streams.end(); ++it){ + response << "mist_sessions{stream=\"" << it->first << "\",sessType=\"viewers\"} " << it->second.viewers << "\n"; + response << "mist_sessions{stream=\"" << it->first << "\",sessType=\"incoming\"} " << it->second.inputs << "\n"; + response << "mist_sessions{stream=\"" << it->first << "\",sessType=\"outgoing\"} " << it->second.outputs << "\n"; } - response << "\n# HELP mist_sessions Count of unique sessions since stream start.\n"; - response << "# TYPE mist_sessions counter\n"; - response << "# HELP mist_upload Count of bytes uploaded since stream start.\n"; - response << "# TYPE mist_upload counter\n"; - response << "# HELP mist_download Count of bytes downloaded since stream start.\n"; - response << "# TYPE mist_download counter\n"; - std::set mustWipe; + response << "\n# HELP mist_viewcount Count of unique viewer sessions since stream start, per stream.\n"; + response << "# TYPE mist_viewcount counter\n"; + response << "# HELP mist_bw Count of bytes handled since stream start, by direction.\n"; + response << "# TYPE mist_bw counter\n"; for (std::map::iterator it = streamStats.begin(); it != streamStats.end(); ++it){ - response << "mist_sessions{stream=\"" << it->first << "\"} " << it->second.clients << "\n"; - response << "mist_upload{stream=\"" << it->first << "\"} " << it->second.upBytes << "\n"; - response << "mist_download{stream=\"" << it->first << "\"} " << it->second.downBytes << "\n"; + response << "mist_viewcount{stream=\"" << it->first << "\"} " << it->second.viewers << "\n"; + response << "mist_bw{stream=\"" << it->first << "\",direction=\"up\"} " << it->second.upBytes << "\n"; + response << "mist_bw{stream=\"" << it->first << "\",direction=\"down\"} " << it->second.downBytes << "\n"; } } H.Chunkify(response.str(), conn); @@ -1155,37 +1190,59 @@ void Controller::handlePrometheus(HTTP::Parser & H, Socket::Connection & conn, i resp["cpu"] = cpu_use; resp["mem_total"] = mem_total; resp["mem_used"] = (mem_total - mem_free - mem_bufcache); + resp["logs"] = (long long)Controller::logCounter; {//Scope for shortest possible blocking of statsMutex tthread::lock_guard guard(statsMutex); //collect the data first - std::map clients; - unsigned long totClients = 0; + std::map streams; + unsigned long totViewers = 0, totInputs = 0, totOutputs = 0; unsigned int t = Util::epoch() - STATS_DELAY; //check all sessions if (sessions.size()){ for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ if (it->second.hasDataFor(t) && it->second.isViewerOn(t)){ - clients[it->first.streamName]++; - totClients++; + switch (it->second.getSessType()){ + case SESS_UNSET: + case SESS_VIEWER: + streams[it->first.streamName].viewers++; + totViewers++; + break; + case SESS_INPUT: + streams[it->first.streamName].inputs++; + totInputs++; + break; + case SESS_OUTPUT: + streams[it->first.streamName].outputs++; + totOutputs++; + break; + } + } } } - resp["sess_cached"] = (long long)sessions.size(); - resp["sess_current"] = (long long)totClients; - resp["sess_total"] = (long long)servClients; - resp["upload"] = (long long)servUpBytes; - resp["download"] = (long long)servDownBytes; + resp["curr"].append((long long)totViewers); + resp["curr"].append((long long)totInputs); + resp["curr"].append((long long)totOutputs); + resp["curr"].append((long long)sessions.size()); + resp["tot"].append((long long)servViewers); + resp["tot"].append((long long)servInputs); + resp["tot"].append((long long)servOutputs); + resp["bw"].append((long long)servUpBytes); + resp["bw"].append((long long)servDownBytes); - for (std::map::iterator it = clients.begin(); it != clients.end(); ++it){ - resp["streams"][it->first]["sess_current"] = (long long)it->second; + for (std::map::iterator it = streams.begin(); it != streams.end(); ++it){ + resp["streams"][it->first]["curr"].append((long long)it->second.viewers); + resp["streams"][it->first]["curr"].append((long long)it->second.inputs); + resp["streams"][it->first]["curr"].append((long long)it->second.outputs); } - std::set mustWipe; for (std::map::iterator it = streamStats.begin(); it != streamStats.end(); ++it){ - resp["streams"][it->first]["sess_total"] = (long long)it->second.clients; - resp["streams"][it->first]["upload"] = (long long)it->second.upBytes; - resp["streams"][it->first]["download"] = (long long)it->second.downBytes; + resp["streams"][it->first]["tot"].append((long long)it->second.viewers); + resp["streams"][it->first]["tot"].append((long long)it->second.inputs); + resp["streams"][it->first]["tot"].append((long long)it->second.outputs); + resp["streams"][it->first]["bw"].append((long long)it->second.upBytes); + resp["streams"][it->first]["bw"].append((long long)it->second.downBytes); } } H.Chunkify(resp.toString(), conn); diff --git a/src/controller/controller_statistics.h b/src/controller/controller_statistics.h index f9d97e42..92d620c2 100644 --- a/src/controller/controller_statistics.h +++ b/src/controller/controller_statistics.h @@ -31,6 +31,13 @@ namespace Controller { long long up; }; + enum sessType { + SESS_UNSET = 0, + SESS_INPUT, + SESS_OUTPUT, + SESS_VIEWER + }; + /// This is a comparison and storage class that keeps sessions apart from each other. /// Whenever two of these objects are not equal, it will create a new session. class sessIndex { @@ -72,7 +79,9 @@ namespace Controller { std::deque oldConns; std::map curConns; char sync; + sessType sessionType; public: + sessType getSessType(); statSession(); void wipeOld(unsigned long long); void finish(unsigned long index); diff --git a/src/controller/controller_storage.cpp b/src/controller/controller_storage.cpp index e1099fb6..7e20dd5d 100644 --- a/src/controller/controller_storage.cpp +++ b/src/controller/controller_storage.cpp @@ -16,6 +16,7 @@ namespace Controller { JSON::Value Storage; ///< Global storage of data. tthread::mutex configMutex; tthread::mutex logMutex; + unsigned long long logCounter = 0; bool configChanged = false; ///\brief Store and print a log message. @@ -36,6 +37,7 @@ namespace Controller { timeinfo = localtime (&rawtime); strftime(buffer,100,"%F %H:%M:%S",timeinfo); std::cout << "[" << buffer << "] " << kind << ": " << message << std::endl; + logCounter++; } ///\brief Write contents to Filename diff --git a/src/controller/controller_storage.h b/src/controller/controller_storage.h index bd65fff6..00e28174 100644 --- a/src/controller/controller_storage.h +++ b/src/controller/controller_storage.h @@ -9,6 +9,7 @@ namespace Controller { extern tthread::mutex logMutex;///< Mutex for log thread. extern tthread::mutex configMutex;///< Mutex for server config access. extern bool configChanged; ///< Bool that indicates config must be written to SHM. + extern unsigned long long logCounter; ///hasOption("target") && config->getString("target").size()){ + return "OUTPUT"; + }else{ + return capa["name"].asStringRef(); + } + } + void Output::stats(){ if (!isInitialized){ return; @@ -1144,7 +1155,7 @@ namespace Mist { } tmpEx.crc(crc); tmpEx.streamName(streamName); - tmpEx.connector(capa["name"].asString()); + tmpEx.connector(getStatsName()); tmpEx.up(myConn.dataUp()); tmpEx.down(myConn.dataDown()); tmpEx.time(now - myConn.connTime()); diff --git a/src/output/output.h b/src/output/output.h index a54da52e..5539bb4d 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -107,7 +107,7 @@ namespace Mist { protected://these are to be messed with by child classes virtual std::string getConnectedHost(); virtual std::string getConnectedBinHost(); - + virtual std::string getStatsName(); IPC::sharedClient statsPage;///< Shared memory used for statistics reporting. bool isBlocking;///< If true, indicates that myConn is blocking. diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp index a29920c0..80f847c7 100644 --- a/src/output/output_rtmp.cpp +++ b/src/output/output_rtmp.cpp @@ -167,6 +167,14 @@ namespace Mist { return false; } + std::string OutRTMP::getStatsName(){ + if (isPushing){ + return "INPUT"; + }else{ + return Output::getStatsName(); + } + } + void OutRTMP::parseVars(std::string data){ std::string varname; std::string varval; diff --git a/src/output/output_rtmp.h b/src/output/output_rtmp.h index 87154c22..a6d2ad45 100644 --- a/src/output/output_rtmp.h +++ b/src/output/output_rtmp.h @@ -23,6 +23,7 @@ namespace Mist { void parseChunk(Socket::Buffer & inputBuffer); void parseAMFCommand(AMF::Object & amfData, int messageType, int streamId); void sendCommand(AMF::Object & amfReply, int messageType, int streamId); + virtual std::string getStatsName(); }; }