From dd46788d379c6502e1b6b4ad35562a0072c00dc5 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 21 Apr 2016 15:29:52 +0200 Subject: [PATCH] Support for text-based prometheus-compatible instrumentation. --- src/controller/controller.cpp | 5 + src/controller/controller_api.cpp | 11 ++ src/controller/controller_statistics.cpp | 178 ++++++++++++++++++++++- src/controller/controller_statistics.h | 11 ++ 4 files changed, 197 insertions(+), 8 deletions(-) diff --git a/src/controller/controller.cpp b/src/controller/controller.cpp index 64c3425e..85ada5dc 100644 --- a/src/controller/controller.cpp +++ b/src/controller/controller.cpp @@ -174,6 +174,7 @@ int main(int argc, char ** argv){ Controller::conf.addOption("uplink", JSON::fromString("{\"default\":\"\", \"arg\":\"string\", \"help\":\"MistSteward uplink host and port.\", \"short\":\"U\", \"long\":\"uplink\"}")); /*LTS*/ Controller::conf.addOption("uplink-name", JSON::fromString("{\"default\":\"" COMPILED_USERNAME "\", \"arg\":\"string\", \"help\":\"MistSteward uplink username.\", \"short\":\"N\", \"long\":\"uplink-name\"}")); /*LTS*/ Controller::conf.addOption("uplink-pass", JSON::fromString("{\"default\":\"" COMPILED_PASSWORD "\", \"arg\":\"string\", \"help\":\"MistSteward uplink password.\", \"short\":\"P\", \"long\":\"uplink-pass\"}")); /*LTS*/ + Controller::conf.addOption("prometheus", JSON::fromString("{\"long\":\"prometheus\", \"short\":\"S\", \"arg\":\"string\" \"default\":\"\", \"help\":\"If set, allows collecting of Prometheus-style stats on the given path over the API port.\"}")); Controller::conf.parseArgs(argc, argv); if(Controller::conf.getString("logfile")!= ""){ //open logfile, dup stdout to logfile @@ -224,6 +225,10 @@ int main(int argc, char ** argv){ if (Controller::Storage["config"]["controller"]["username"]){ Controller::conf.getOption("username", true)[0u] = Controller::Storage["config"]["controller"]["username"]; } + if (Controller::Storage["config"]["controller"]["prometheus"]){ + Controller::conf.getOption("prometheus", true)[0u] = Controller::Storage["config"]["controller"]["prometheus"]; + } + Controller::Storage["config"]["controller"]["prometheus"] = Controller::conf.getString("prometheus"); Controller::writeConfig(); Controller::checkAvailProtocols(); createAccount(Controller::conf.getString("account")); diff --git a/src/controller/controller_api.cpp b/src/controller/controller_api.cpp index 425a31fa..7a81742c 100644 --- a/src/controller/controller_api.cpp +++ b/src/controller/controller_api.cpp @@ -167,6 +167,17 @@ int Controller::handleAPIConnection(Socket::Connection & conn){ //while connected and not past login attempt limit while (conn && logins < 4){ if ((conn.spool() || conn.Received().size()) && H.Read(conn)){ + //Catch prometheus requests + if (conf.getString("prometheus").size()){ + if (H.url == "/"+Controller::conf.getString("prometheus")){ + handlePrometheus(H, conn, PROMETHEUS_TEXT); + break; + } + if (H.url == "/"+Controller::conf.getString("prometheus")){ + handlePrometheus(H, conn, PROMETHEUS_JSON); + break; + } + } JSON::Value Response; JSON::Value Request = JSON::fromString(H.GetVar("command")); //invalid request? send the web interface, unless requested as "/api" diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index 58560a1a..d7a392fb 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -28,12 +28,25 @@ #define STAT_TOT_BPS_UP 4 #define STAT_TOT_ALL 0xFF +#define COUNTABLE_BYTES 128*1024 std::map Controller::sessions; ///< list of sessions that have statistics data available std::map Controller::connToSession; ///< Map of socket IDs to session info. bool Controller::killOnExit = KILL_ON_EXIT; tthread::mutex Controller::statsMutex; +//For server-wide totals. Local to this file only. +struct streamTotals { + unsigned long long upBytes; + unsigned long long downBytes; + unsigned long long clients; + unsigned int timeout; +}; +static std::map streamStats; +static unsigned long long servUpBytes = 0; +static unsigned long long servDownBytes = 0; +static unsigned long long servClients = 0; + Controller::sessIndex::sessIndex(std::string dhost, unsigned int dcrc, std::string dstreamName, std::string dconnector){ host = dhost; crc = dcrc; @@ -45,6 +58,12 @@ Controller::sessIndex::sessIndex(){ crc = 0; } +std::string Controller::sessIndex::toStr(){ + std::stringstream s; + s << host << " " << crc << " " << streamName << " " << connector; + return s.str(); +} + /// Initializes a sessIndex from a statExchange object, converting binary format IP addresses into strings. /// This extracts the host, stream name, connector and crc field, ignoring everything else. Controller::sessIndex::sessIndex(IPC::statExchange & data){ @@ -144,6 +163,8 @@ void Controller::statSession::update(unsigned long index, IPC::statExchange & da sync = data.getSync(); } } + long long prevDown = getDown(); + long long prevUp = getUp(); curConns[index].update(data); //store timestamp of last received data, if newer if (data.now() > lastSec){ @@ -153,6 +174,22 @@ void Controller::statSession::update(unsigned long index, IPC::statExchange & da if (firstSec > data.now()){ firstSec = data.now(); } + long long currDown = getDown(); + long long currUp = getUp(); + servUpBytes += currUp - prevUp; + servDownBytes += currDown - prevDown; + if (currDown + currUp > COUNTABLE_BYTES){ + std::string streamName = data.streamName(); + if (prevUp + prevDown < COUNTABLE_BYTES){ + ++servClients; + streamStats[streamName].clients++; + streamStats[streamName].upBytes += currUp; + streamStats[streamName].downBytes += currDown; + }else{ + streamStats[streamName].upBytes += currUp - prevUp; + streamStats[streamName].downBytes += currDown - prevDown; + } + } } /// Archives the given connection. @@ -164,6 +201,10 @@ void Controller::statSession::wipeOld(unsigned long long cutOff){ if (oldConns.size()){ for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ while (it->log.size() && it->log.begin()->first < cutOff){ + if (it->log.size() == 1){ + wipedDown += it->log.begin()->second.down; + wipedUp += it->log.begin()->second.up; + } it->log.erase(it->log.begin()); } if (it->log.size()){ @@ -189,6 +230,8 @@ Controller::statSession::statSession(){ firstSec = 0xFFFFFFFFFFFFFFFFull; lastSec = 0; sync = 1; + wipedUp = 0; + wipedDown = 0; } /// Moves the given connection to the given session @@ -282,25 +325,25 @@ bool Controller::statSession::hasData(){ /// Returns true if this session should count as a viewer on the given timestamp. bool Controller::statSession::isViewerOn(unsigned long long t){ - return getUp(t) > 128 * 1024; + return getUp(t) + getDown(t) > COUNTABLE_BYTES; } /// Returns true if this session should count as a viewer bool Controller::statSession::isViewer(){ - long long upTotal = 0; + long long upTotal = wipedUp+wipedDown; if (oldConns.size()){ for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ if (it->log.size()){ - upTotal += it->log.rbegin()->second.up; - if (upTotal > 128*1024){return true;} + upTotal += it->log.rbegin()->second.up + it->log.rbegin()->second.down; + if (upTotal > COUNTABLE_BYTES){return true;} } } } if (curConns.size()){ for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ if (it->second.log.size()){ - upTotal += it->second.log.rbegin()->second.up; - if (upTotal > 128*1024){return true;} + upTotal += it->second.log.rbegin()->second.up + it->second.log.rbegin()->second.down; + if (upTotal > COUNTABLE_BYTES){return true;} } } } @@ -348,7 +391,7 @@ long long Controller::statSession::getLastSecond(unsigned long long t){ /// Returns the cumulative downloaded bytes for this session at timestamp t. long long Controller::statSession::getDown(unsigned long long t){ - long long retVal = 0; + long long retVal = wipedDown; if (oldConns.size()){ for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ if (it->hasDataFor(t)){ @@ -368,7 +411,7 @@ long long Controller::statSession::getDown(unsigned long long t){ /// Returns the cumulative uploaded bytes for this session at timestamp t. long long Controller::statSession::getUp(unsigned long long t){ - long long retVal = 0; + long long retVal = wipedUp; if (oldConns.size()){ for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ if (it->hasDataFor(t)){ @@ -386,6 +429,46 @@ long long Controller::statSession::getUp(unsigned long long t){ return retVal; } +/// Returns the cumulative downloaded bytes for this session at timestamp t. +long long Controller::statSession::getDown(){ + long long retVal = wipedDown; + if (oldConns.size()){ + for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ + if (it->log.size()){ + retVal += it->log.rbegin()->second.down; + } + } + } + if (curConns.size()){ + for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ + if (it->second.log.size()){ + retVal += it->second.log.rbegin()->second.down; + } + } + } + return retVal; +} + +/// Returns the cumulative uploaded bytes for this session at timestamp t. +long long Controller::statSession::getUp(){ + long long retVal = wipedUp; + if (oldConns.size()){ + for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ + if (it->log.size()){ + retVal += it->log.rbegin()->second.up; + } + } + } + if (curConns.size()){ + for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ + if (it->second.log.size()){ + retVal += it->second.log.rbegin()->second.up; + } + } + } + return retVal; +} + /// Returns the cumulative downloaded bytes per second for this session at timestamp t. long long Controller::statSession::getBpsDown(unsigned long long t){ unsigned long long aTime = t - 5; @@ -466,6 +549,7 @@ void Controller::parseStatistics(char * data, size_t len, unsigned int id){ sessIndex idx(tmpEx); //if the connection was already indexed and it has changed, move it if (connToSession.count(id) && connToSession[id] != idx){ + INSANE_MSG("SWITCHING %s OVER TO %s", connToSession[id].toStr().c_str(), idx.toStr().c_str()); sessions[connToSession[id]].switchOverTo(sessions[idx], id); if (!sessions[connToSession[id]].hasData()){ sessions.erase(connToSession[id]); @@ -889,3 +973,81 @@ void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){ } //all done! return is by reference, so no need to return anything here. } + +void Controller::handlePrometheus(HTTP::Parser & H, Socket::Connection & conn, int mode){ + switch (mode){ + case PROMETHEUS_TEXT: + H.SetHeader("Content-Type", "text/plain; version=0.0.4"); + break; + case PROMETHEUS_JSON: + H.SetHeader("Content-Type", "text/json"); + break; + } + H.SetHeader("Server", "MistServer/" PACKAGE_VERSION); + H.StartResponse("200", "OK", H, conn); + std::stringstream response; + + {//Scope for shortest possible blocking of statsMutex + tthread::lock_guard guard(statsMutex); + response << "# HELP mist_sessions_cached Number of sessions active in the last ~10 minutes.\n"; + response << "# TYPE mist_sessions_cached gauge\n"; + response << "mist_sessions_cached " << sessions.size() << "\n\n"; + + //collect the data first + std::map clients; + unsigned long totClients = 0; + unsigned int t = Util::epoch() - 15; + //check all sessions + if (sessions.size()){ + for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ + if (it->second.hasDataFor(t) && it->second.isViewerOn(t)){ + clients[it->first.streamName]++; + totClients++; + } + } + } + + response << "# HELP mist_sessions_current Number of sessions active right now, server-wide.\n"; + response << "# TYPE mist_sessions_current gauge\n"; + response << "mist_sessions_current " << totClients << "\n\n"; + + response << "# HELP mist_sessions_total Count of unique sessions since server start.\n"; + response << "# TYPE mist_sessions_total counter\n"; + response << "mist_sessions_total " << servClients << "\n\n"; + + response << "# HELP mist_upload_total Count of bytes uploaded since server start.\n"; + response << "# TYPE mist_upload_total counter\n"; + response << "mist_upload_total " << servUpBytes << "\n\n"; + + response << "# HELP mist_download_total Count of bytes downloaded since server start.\n"; + response << "# TYPE mist_download_total counter\n"; + response << "mist_download_total " << servDownBytes << "\n\n"; + + response << "# HELP mist_current Number of sessions for a given stream active right now.\n"; + response << "# TYPE mist_current gauge\n"; + for (std::map::iterator it = clients.begin(); it != clients.end(); ++it){ + response << "mist_current{stream=\"" << it->first << "\"} " << it->second << "\n"; + } + + response << "\n# HELP mist_sessions Count of unique sessions since stream start.\n"; + response << "# TYPE mist_sessions counter\n"; + response << "# HELP mist_upload Count of bytes uploaded since stream start.\n"; + response << "# TYPE mist_upload counter\n"; + response << "# HELP mist_download Count of bytes downloaded since stream start.\n"; + response << "# TYPE mist_download counter\n"; + std::set mustWipe; + for (std::map::iterator it = streamStats.begin(); it != streamStats.end(); ++it){ + response << "mist_sessions{stream=\"" << it->first << "\"} " << it->second.clients << "\n"; + response << "mist_upload{stream=\"" << it->first << "\"} " << it->second.upBytes << "\n"; + response << "mist_download{stream=\"" << it->first << "\"} " << it->second.downBytes << "\n"; + } + + + + } + + H.Chunkify(response.str(), conn); + H.Chunkify("", conn); + H.Clean(); +} + diff --git a/src/controller/controller_statistics.h b/src/controller/controller_statistics.h index 5f43e6d5..e2a7971c 100644 --- a/src/controller/controller_statistics.h +++ b/src/controller/controller_statistics.h @@ -3,6 +3,8 @@ #include #include #include +#include +#include #include #include @@ -39,6 +41,7 @@ namespace Controller { bool operator<= (const sessIndex &o) const; bool operator< (const sessIndex &o) const; bool operator>= (const sessIndex &o) const; + std::string toStr(); }; @@ -56,6 +59,8 @@ namespace Controller { private: unsigned long long firstSec; unsigned long long lastSec; + unsigned long long wipedUp; + unsigned long long wipedDown; std::deque oldConns; std::map curConns; char sync; @@ -74,6 +79,8 @@ namespace Controller { long long getConnTime(unsigned long long time); long long getLastSecond(unsigned long long time); long long getDown(unsigned long long time); + long long getUp(); + long long getDown(); long long getUp(unsigned long long time); long long getBpsDown(unsigned long long time); long long getBpsUp(unsigned long long time); @@ -92,5 +99,9 @@ namespace Controller { void fillTotals(JSON::Value & req, JSON::Value & rep); void SharedMemStats(void * config); bool hasViewers(std::string streamName); + +#define PROMETHEUS_TEXT 0 +#define PROMETHEUS_JSON 1 + void handlePrometheus(HTTP::Parser & H, Socket::Connection & conn, int mode); }