Support for text-based prometheus-compatible instrumentation.

This commit is contained in:
Thulinma 2016-04-21 15:29:52 +02:00
parent fd123c8c1f
commit dd46788d37
4 changed files with 197 additions and 8 deletions

View file

@ -174,6 +174,7 @@ int main(int argc, char ** argv){
Controller::conf.addOption("uplink", JSON::fromString("{\"default\":\"\", \"arg\":\"string\", \"help\":\"MistSteward uplink host and port.\", \"short\":\"U\", \"long\":\"uplink\"}")); /*LTS*/
Controller::conf.addOption("uplink-name", JSON::fromString("{\"default\":\"" COMPILED_USERNAME "\", \"arg\":\"string\", \"help\":\"MistSteward uplink username.\", \"short\":\"N\", \"long\":\"uplink-name\"}")); /*LTS*/
Controller::conf.addOption("uplink-pass", JSON::fromString("{\"default\":\"" COMPILED_PASSWORD "\", \"arg\":\"string\", \"help\":\"MistSteward uplink password.\", \"short\":\"P\", \"long\":\"uplink-pass\"}")); /*LTS*/
Controller::conf.addOption("prometheus", JSON::fromString("{\"long\":\"prometheus\", \"short\":\"S\", \"arg\":\"string\" \"default\":\"\", \"help\":\"If set, allows collecting of Prometheus-style stats on the given path over the API port.\"}"));
Controller::conf.parseArgs(argc, argv);
if(Controller::conf.getString("logfile")!= ""){
//open logfile, dup stdout to logfile
@ -224,6 +225,10 @@ int main(int argc, char ** argv){
if (Controller::Storage["config"]["controller"]["username"]){
Controller::conf.getOption("username", true)[0u] = Controller::Storage["config"]["controller"]["username"];
}
if (Controller::Storage["config"]["controller"]["prometheus"]){
Controller::conf.getOption("prometheus", true)[0u] = Controller::Storage["config"]["controller"]["prometheus"];
}
Controller::Storage["config"]["controller"]["prometheus"] = Controller::conf.getString("prometheus");
Controller::writeConfig();
Controller::checkAvailProtocols();
createAccount(Controller::conf.getString("account"));

View file

@ -167,6 +167,17 @@ int Controller::handleAPIConnection(Socket::Connection & conn){
//while connected and not past login attempt limit
while (conn && logins < 4){
if ((conn.spool() || conn.Received().size()) && H.Read(conn)){
//Catch prometheus requests
if (conf.getString("prometheus").size()){
if (H.url == "/"+Controller::conf.getString("prometheus")){
handlePrometheus(H, conn, PROMETHEUS_TEXT);
break;
}
if (H.url == "/"+Controller::conf.getString("prometheus")){
handlePrometheus(H, conn, PROMETHEUS_JSON);
break;
}
}
JSON::Value Response;
JSON::Value Request = JSON::fromString(H.GetVar("command"));
//invalid request? send the web interface, unless requested as "/api"

View file

@ -28,12 +28,25 @@
#define STAT_TOT_BPS_UP 4
#define STAT_TOT_ALL 0xFF
#define COUNTABLE_BYTES 128*1024
std::map<Controller::sessIndex, Controller::statSession> Controller::sessions; ///< list of sessions that have statistics data available
std::map<unsigned long, Controller::sessIndex> Controller::connToSession; ///< Map of socket IDs to session info.
bool Controller::killOnExit = KILL_ON_EXIT;
tthread::mutex Controller::statsMutex;
//For server-wide totals. Local to this file only.
struct streamTotals {
unsigned long long upBytes;
unsigned long long downBytes;
unsigned long long clients;
unsigned int timeout;
};
static std::map<std::string, struct streamTotals> streamStats;
static unsigned long long servUpBytes = 0;
static unsigned long long servDownBytes = 0;
static unsigned long long servClients = 0;
Controller::sessIndex::sessIndex(std::string dhost, unsigned int dcrc, std::string dstreamName, std::string dconnector){
host = dhost;
crc = dcrc;
@ -45,6 +58,12 @@ Controller::sessIndex::sessIndex(){
crc = 0;
}
std::string Controller::sessIndex::toStr(){
std::stringstream s;
s << host << " " << crc << " " << streamName << " " << connector;
return s.str();
}
/// Initializes a sessIndex from a statExchange object, converting binary format IP addresses into strings.
/// This extracts the host, stream name, connector and crc field, ignoring everything else.
Controller::sessIndex::sessIndex(IPC::statExchange & data){
@ -144,6 +163,8 @@ void Controller::statSession::update(unsigned long index, IPC::statExchange & da
sync = data.getSync();
}
}
long long prevDown = getDown();
long long prevUp = getUp();
curConns[index].update(data);
//store timestamp of last received data, if newer
if (data.now() > lastSec){
@ -153,6 +174,22 @@ void Controller::statSession::update(unsigned long index, IPC::statExchange & da
if (firstSec > data.now()){
firstSec = data.now();
}
long long currDown = getDown();
long long currUp = getUp();
servUpBytes += currUp - prevUp;
servDownBytes += currDown - prevDown;
if (currDown + currUp > COUNTABLE_BYTES){
std::string streamName = data.streamName();
if (prevUp + prevDown < COUNTABLE_BYTES){
++servClients;
streamStats[streamName].clients++;
streamStats[streamName].upBytes += currUp;
streamStats[streamName].downBytes += currDown;
}else{
streamStats[streamName].upBytes += currUp - prevUp;
streamStats[streamName].downBytes += currDown - prevDown;
}
}
}
/// Archives the given connection.
@ -164,6 +201,10 @@ void Controller::statSession::wipeOld(unsigned long long cutOff){
if (oldConns.size()){
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
while (it->log.size() && it->log.begin()->first < cutOff){
if (it->log.size() == 1){
wipedDown += it->log.begin()->second.down;
wipedUp += it->log.begin()->second.up;
}
it->log.erase(it->log.begin());
}
if (it->log.size()){
@ -189,6 +230,8 @@ Controller::statSession::statSession(){
firstSec = 0xFFFFFFFFFFFFFFFFull;
lastSec = 0;
sync = 1;
wipedUp = 0;
wipedDown = 0;
}
/// Moves the given connection to the given session
@ -282,25 +325,25 @@ bool Controller::statSession::hasData(){
/// Returns true if this session should count as a viewer on the given timestamp.
bool Controller::statSession::isViewerOn(unsigned long long t){
return getUp(t) > 128 * 1024;
return getUp(t) + getDown(t) > COUNTABLE_BYTES;
}
/// Returns true if this session should count as a viewer
bool Controller::statSession::isViewer(){
long long upTotal = 0;
long long upTotal = wipedUp+wipedDown;
if (oldConns.size()){
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
if (it->log.size()){
upTotal += it->log.rbegin()->second.up;
if (upTotal > 128*1024){return true;}
upTotal += it->log.rbegin()->second.up + it->log.rbegin()->second.down;
if (upTotal > COUNTABLE_BYTES){return true;}
}
}
}
if (curConns.size()){
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
if (it->second.log.size()){
upTotal += it->second.log.rbegin()->second.up;
if (upTotal > 128*1024){return true;}
upTotal += it->second.log.rbegin()->second.up + it->second.log.rbegin()->second.down;
if (upTotal > COUNTABLE_BYTES){return true;}
}
}
}
@ -348,7 +391,7 @@ long long Controller::statSession::getLastSecond(unsigned long long t){
/// Returns the cumulative downloaded bytes for this session at timestamp t.
long long Controller::statSession::getDown(unsigned long long t){
long long retVal = 0;
long long retVal = wipedDown;
if (oldConns.size()){
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
if (it->hasDataFor(t)){
@ -368,7 +411,7 @@ long long Controller::statSession::getDown(unsigned long long t){
/// Returns the cumulative uploaded bytes for this session at timestamp t.
long long Controller::statSession::getUp(unsigned long long t){
long long retVal = 0;
long long retVal = wipedUp;
if (oldConns.size()){
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
if (it->hasDataFor(t)){
@ -386,6 +429,46 @@ long long Controller::statSession::getUp(unsigned long long t){
return retVal;
}
/// Returns the cumulative downloaded bytes for this session at timestamp t.
long long Controller::statSession::getDown(){
long long retVal = wipedDown;
if (oldConns.size()){
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
if (it->log.size()){
retVal += it->log.rbegin()->second.down;
}
}
}
if (curConns.size()){
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
if (it->second.log.size()){
retVal += it->second.log.rbegin()->second.down;
}
}
}
return retVal;
}
/// Returns the cumulative uploaded bytes for this session at timestamp t.
long long Controller::statSession::getUp(){
long long retVal = wipedUp;
if (oldConns.size()){
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
if (it->log.size()){
retVal += it->log.rbegin()->second.up;
}
}
}
if (curConns.size()){
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
if (it->second.log.size()){
retVal += it->second.log.rbegin()->second.up;
}
}
}
return retVal;
}
/// Returns the cumulative downloaded bytes per second for this session at timestamp t.
long long Controller::statSession::getBpsDown(unsigned long long t){
unsigned long long aTime = t - 5;
@ -466,6 +549,7 @@ void Controller::parseStatistics(char * data, size_t len, unsigned int id){
sessIndex idx(tmpEx);
//if the connection was already indexed and it has changed, move it
if (connToSession.count(id) && connToSession[id] != idx){
INSANE_MSG("SWITCHING %s OVER TO %s", connToSession[id].toStr().c_str(), idx.toStr().c_str());
sessions[connToSession[id]].switchOverTo(sessions[idx], id);
if (!sessions[connToSession[id]].hasData()){
sessions.erase(connToSession[id]);
@ -889,3 +973,81 @@ void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){
}
//all done! return is by reference, so no need to return anything here.
}
void Controller::handlePrometheus(HTTP::Parser & H, Socket::Connection & conn, int mode){
switch (mode){
case PROMETHEUS_TEXT:
H.SetHeader("Content-Type", "text/plain; version=0.0.4");
break;
case PROMETHEUS_JSON:
H.SetHeader("Content-Type", "text/json");
break;
}
H.SetHeader("Server", "MistServer/" PACKAGE_VERSION);
H.StartResponse("200", "OK", H, conn);
std::stringstream response;
{//Scope for shortest possible blocking of statsMutex
tthread::lock_guard<tthread::mutex> guard(statsMutex);
response << "# HELP mist_sessions_cached Number of sessions active in the last ~10 minutes.\n";
response << "# TYPE mist_sessions_cached gauge\n";
response << "mist_sessions_cached " << sessions.size() << "\n\n";
//collect the data first
std::map<std::string, unsigned long> clients;
unsigned long totClients = 0;
unsigned int t = Util::epoch() - 15;
//check all sessions
if (sessions.size()){
for (std::map<sessIndex, statSession>::iterator it = sessions.begin(); it != sessions.end(); it++){
if (it->second.hasDataFor(t) && it->second.isViewerOn(t)){
clients[it->first.streamName]++;
totClients++;
}
}
}
response << "# HELP mist_sessions_current Number of sessions active right now, server-wide.\n";
response << "# TYPE mist_sessions_current gauge\n";
response << "mist_sessions_current " << totClients << "\n\n";
response << "# HELP mist_sessions_total Count of unique sessions since server start.\n";
response << "# TYPE mist_sessions_total counter\n";
response << "mist_sessions_total " << servClients << "\n\n";
response << "# HELP mist_upload_total Count of bytes uploaded since server start.\n";
response << "# TYPE mist_upload_total counter\n";
response << "mist_upload_total " << servUpBytes << "\n\n";
response << "# HELP mist_download_total Count of bytes downloaded since server start.\n";
response << "# TYPE mist_download_total counter\n";
response << "mist_download_total " << servDownBytes << "\n\n";
response << "# HELP mist_current Number of sessions for a given stream active right now.\n";
response << "# TYPE mist_current gauge\n";
for (std::map<std::string, unsigned long>::iterator it = clients.begin(); it != clients.end(); ++it){
response << "mist_current{stream=\"" << it->first << "\"} " << it->second << "\n";
}
response << "\n# HELP mist_sessions Count of unique sessions since stream start.\n";
response << "# TYPE mist_sessions counter\n";
response << "# HELP mist_upload Count of bytes uploaded since stream start.\n";
response << "# TYPE mist_upload counter\n";
response << "# HELP mist_download Count of bytes downloaded since stream start.\n";
response << "# TYPE mist_download counter\n";
std::set<std::string> mustWipe;
for (std::map<std::string, struct streamTotals>::iterator it = streamStats.begin(); it != streamStats.end(); ++it){
response << "mist_sessions{stream=\"" << it->first << "\"} " << it->second.clients << "\n";
response << "mist_upload{stream=\"" << it->first << "\"} " << it->second.upBytes << "\n";
response << "mist_download{stream=\"" << it->first << "\"} " << it->second.downBytes << "\n";
}
}
H.Chunkify(response.str(), conn);
H.Chunkify("", conn);
H.Clean();
}

View file

@ -3,6 +3,8 @@
#include <mist/defines.h>
#include <mist/json.h>
#include <mist/tinythread.h>
#include <mist/http_parser.h>
#include <mist/socket.h>
#include <string>
#include <map>
@ -39,6 +41,7 @@ namespace Controller {
bool operator<= (const sessIndex &o) const;
bool operator< (const sessIndex &o) const;
bool operator>= (const sessIndex &o) const;
std::string toStr();
};
@ -56,6 +59,8 @@ namespace Controller {
private:
unsigned long long firstSec;
unsigned long long lastSec;
unsigned long long wipedUp;
unsigned long long wipedDown;
std::deque<statStorage> oldConns;
std::map<unsigned long, statStorage> curConns;
char sync;
@ -74,6 +79,8 @@ namespace Controller {
long long getConnTime(unsigned long long time);
long long getLastSecond(unsigned long long time);
long long getDown(unsigned long long time);
long long getUp();
long long getDown();
long long getUp(unsigned long long time);
long long getBpsDown(unsigned long long time);
long long getBpsUp(unsigned long long time);
@ -92,5 +99,9 @@ namespace Controller {
void fillTotals(JSON::Value & req, JSON::Value & rep);
void SharedMemStats(void * config);
bool hasViewers(std::string streamName);
#define PROMETHEUS_TEXT 0
#define PROMETHEUS_JSON 1
void handlePrometheus(HTTP::Parser & H, Socket::Connection & conn, int mode);
}