Prometheus stats split over viewers, incoming and outgoing.

Load balancer updated to use new split stats and provide info per host and/or stream if requested over its port.
This commit is contained in:
Thulinma 2016-05-15 00:20:53 +02:00
parent 39a61b6380
commit dfc41cc596
9 changed files with 199 additions and 62 deletions

View file

@ -44,6 +44,7 @@ class hostDetails{
upPrev = 0;
downPrev = 0;
prevTime = 0;
total = 0;
addBandwidth = 0;
availBandwidth = 128 * 1024 * 1024;//assume 1G connections
}
@ -59,6 +60,27 @@ class hostDetails{
addBandwidth += 1 * 1024 * 1024;
addBandwidth *= 1.2;
}
///Returns the count of viewers for a given stream s.
unsigned long long count(std::string & s){
if (!hostMutex){hostMutex = new tthread::mutex();}
tthread::lock_guard<tthread::mutex> guard(*hostMutex);
if (streams.count(s)){
return streams[s].total;
}else{
return 0;
}
}
///Fills out a by reference given JSON::Value with current state.
void fillState(JSON::Value & r){
if (!hostMutex){hostMutex = new tthread::mutex();}
tthread::lock_guard<tthread::mutex> guard(*hostMutex);
r["cpu"] = (long long)(cpu/10);
if (ramMax){r["ram"] = (long long)((ramCurr*100) / ramMax);}
r["up"] = (long long)upSpeed;
r["down"] = (long long)downSpeed;
r["streams"] = (long long)streams.size();
r["viewers"] = (long long)total;
}
///Scores a potential new connection to this server, on a scale from 0 to 3200.
///0 is horrible, 3200 is perfect.
unsigned int rate(std::string & s){
@ -74,7 +96,7 @@ class hostDetails{
if (streams.count(s)){score += 200;}
//Finally, account for bandwidth. We again scale from 0 to 1000 where 1000 is perfect.
score += (1000 - (((upSpeed + addBandwidth) * 1000) / availBandwidth));
MEDIUM_MSG("Scores: CPU %u, RAM %u, Stream %u, BW %u (-%u) (%lluMB/s avail)", 1000-cpu, 1000-((ramCurr * 1000) / ramMax), streams.count(s)?200:0, (1000 - ((upSpeed * 1000) / availBandwidth)), (addBandwidth * 1000)/availBandwidth, availBandwidth / 1024 / 1024);
MEDIUM_MSG("CPU %u, RAM %u, Stream %u, BW %u (-%u) (max %llu MB/s)", 1000-cpu, 1000-((ramCurr * 1000) / ramMax), streams.count(s)?200:0, (1000 - ((upSpeed * 1000) / availBandwidth)), (addBandwidth * 1000)/availBandwidth, availBandwidth / 1024 / 1024);
return score;
}
void addViewer(std::string & s){
@ -96,8 +118,8 @@ class hostDetails{
cpu = d["cpu"].asInt();
ramMax = d["mem_total"].asInt();
ramCurr = d["mem_used"].asInt();
total = d["sess_current"].asInt();
unsigned long long currUp = d["upload"].asInt(), currDown = d["download"].asInt();
total = d["curr"][0u].asInt();
unsigned long long currUp = d["bw"][0u].asInt(), currDown = d["bw"][1u].asInt();
unsigned int timeDiff = 0;
if (prevTime){
timeDiff = time(0) - prevTime;
@ -112,7 +134,7 @@ class hostDetails{
if (d.isMember("streams") && d["streams"].size()){
jsonForEach(d["streams"], it){
unsigned int count =(*it)["sess_current"].asInt();
unsigned int count = (*it)["curr"][0u].asInt() + (*it)["curr"][1u].asInt() + (*it)["curr"][2u].asInt();
if (!count){
if (streams.count(it.key())){
streams.erase(it.key());
@ -120,8 +142,8 @@ class hostDetails{
continue;
}
struct streamDetails & strm = streams[it.key()];
strm.total = count;
unsigned long long currTotal = (*it)["download"].asInt() + (*it)["upload"].asInt();
strm.total = (*it)["curr"][0u].asInt();
unsigned long long currTotal = (*it)["bw"][0u].asInt() + (*it)["bw"][1u].asInt();
if (timeDiff && count){
strm.bandwidth = ((currTotal - strm.prevTotal) / timeDiff) / count;
}else{
@ -145,6 +167,32 @@ int handleRequest(Socket::Connection & conn){
HTTP::Parser H;
while (conn){
if ((conn.spool() || conn.Received().size()) && H.Read(conn)){
if (H.url.size() == 1){
std::string host = H.GetVar("host");
std::string stream = H.GetVar("stream");
H.Clean();
H.SetHeader("Content-Type", "text/plain");
JSON::Value ret;
if (!host.size() && !stream.size()){
for (std::map<std::string, hostDetails>::iterator it = hosts.begin(); it != hosts.end(); ++it){
it->second.fillState(ret[it->first]);
}
}else{
if (stream.size()){
unsigned long long strTot = 0;
for (std::map<std::string, hostDetails>::iterator it = hosts.begin(); it != hosts.end(); ++it){
strTot += it->second.count(stream);
}
ret = (long long)strTot;
}else if (hosts.count(host)){
hosts[host].fillState(ret);
}
}
H.SetBody(ret.toPrettyString());
H.SendResponse("200", "OK", conn);
H.Clean();
continue;
}
std::string stream = H.url.substr(1);
INFO_MSG("Balancing stream %s", stream.c_str());
H.Clean();

View file

@ -7,6 +7,7 @@
#include "controller_statistics.h"
#include "controller_limits.h"
#include "controller_push.h"
#include "controller_storage.h"
#ifndef KILL_ON_EXIT
#define KILL_ON_EXIT false
@ -43,13 +44,17 @@ std::map<std::string, unsigned int> Controller::activeStreams;
struct streamTotals {
unsigned long long upBytes;
unsigned long long downBytes;
unsigned long long clients;
unsigned long long inputs;
unsigned long long outputs;
unsigned long long viewers;
unsigned int timeout;
};
static std::map<std::string, struct streamTotals> streamStats;
static unsigned long long servUpBytes = 0;
static unsigned long long servDownBytes = 0;
static unsigned long long servClients = 0;
static unsigned long long servInputs = 0;
static unsigned long long servOutputs = 0;
static unsigned long long servViewers = 0;
Controller::sessIndex::sessIndex(std::string dhost, unsigned int dcrc, std::string dstreamName, std::string dconnector){
host = dhost;
@ -211,8 +216,19 @@ void Controller::statSession::update(unsigned long index, IPC::statExchange & da
if (currDown + currUp > COUNTABLE_BYTES){
std::string streamName = data.streamName();
if (prevUp + prevDown < COUNTABLE_BYTES){
++servClients;
streamStats[streamName].clients++;
if (data.connector() == "INPUT"){
++servInputs;
streamStats[streamName].inputs++;
sessionType = SESS_INPUT;
}else if (data.connector() == "OUTPUT"){
++servOutputs;
streamStats[streamName].outputs++;
sessionType = SESS_OUTPUT;
}else{
++servViewers;
streamStats[streamName].viewers++;
sessionType = SESS_VIEWER;
}
streamStats[streamName].upBytes += currUp;
streamStats[streamName].downBytes += currDown;
}else{
@ -222,6 +238,10 @@ void Controller::statSession::update(unsigned long index, IPC::statExchange & da
}
}
Controller::sessType Controller::statSession::getSessType(){
return sessionType;
}
/// Archives the given connection.
void Controller::statSession::wipeOld(unsigned long long cutOff){
if (firstSec > cutOff){
@ -262,6 +282,7 @@ Controller::statSession::statSession(){
sync = 1;
wipedUp = 0;
wipedDown = 0;
sessionType = SESS_UNSET;
}
/// Moves the given connection to the given session
@ -1083,6 +1104,9 @@ void Controller::handlePrometheus(HTTP::Parser & H, Socket::Connection & conn, i
if (mode == PROMETHEUS_TEXT){
std::stringstream response;
response << "# HELP mist_logs Count of log messages since server start.\n";
response << "# TYPE mist_logs counter\n";
response << "mist_logs " << Controller::logCounter << "\n\n";
response << "# HELP mist_cpu Total CPU usage in tenths of percent.\n";
response << "# TYPE mist_cpu gauge\n";
response << "mist_cpu " << cpu_use << "\n\n";
@ -1095,57 +1119,68 @@ void Controller::handlePrometheus(HTTP::Parser & H, Socket::Connection & conn, i
{//Scope for shortest possible blocking of statsMutex
tthread::lock_guard<tthread::mutex> guard(statsMutex);
response << "# HELP mist_sessions_cached Number of sessions active in the last ~10 minutes.\n";
response << "# TYPE mist_sessions_cached gauge\n";
response << "mist_sessions_cached " << sessions.size() << "\n\n";
//collect the data first
std::map<std::string, unsigned long> clients;
unsigned long totClients = 0;
std::map<std::string, struct streamTotals> streams;
unsigned long totViewers = 0, totInputs = 0, totOutputs = 0;
unsigned int t = Util::epoch() - STATS_DELAY;
//check all sessions
if (sessions.size()){
for (std::map<sessIndex, statSession>::iterator it = sessions.begin(); it != sessions.end(); it++){
if (it->second.hasDataFor(t) && it->second.isViewerOn(t)){
clients[it->first.streamName]++;
totClients++;
switch (it->second.getSessType()){
case SESS_UNSET:
case SESS_VIEWER:
streams[it->first.streamName].viewers++;
totViewers++;
break;
case SESS_INPUT:
streams[it->first.streamName].inputs++;
totInputs++;
break;
case SESS_OUTPUT:
streams[it->first.streamName].outputs++;
totOutputs++;
break;
}
}
}
}
response << "# HELP mist_sessions_current Number of sessions active right now, server-wide.\n";
response << "# TYPE mist_sessions_current gauge\n";
response << "mist_sessions_current " << totClients << "\n\n";
response << "# HELP mist_sessions_total Number of sessions active right now, server-wide, by type.\n";
response << "# TYPE mist_sessions_total gauge\n";
response << "mist_sessions_total{sessType=\"viewers\"} " << totViewers << "\n";
response << "mist_sessions_total{sessType=\"incoming\"} " << totInputs << "\n";
response << "mist_sessions_total{sessType=\"outgoing\"} " << totOutputs << "\n";
response << "mist_sessions_total{sessType=\"cached\"} " << sessions.size() << "\n\n";
response << "# HELP mist_sessions_total Count of unique sessions since server start.\n";
response << "# TYPE mist_sessions_total counter\n";
response << "mist_sessions_total " << servClients << "\n\n";
response << "# HELP mist_sessions_count Counts of unique sessions by type since server start.\n";
response << "# TYPE mist_sessions_count counter\n";
response << "mist_sessions_count{sessType=\"viewers\"} " << servViewers << "\n";
response << "mist_sessions_count{sessType=\"incoming\"} " << servInputs << "\n";
response << "mist_sessions_count{sessType=\"outgoing\"} " << servOutputs << "\n\n";
response << "# HELP mist_upload_total Count of bytes uploaded since server start.\n";
response << "# TYPE mist_upload_total counter\n";
response << "mist_upload_total " << servUpBytes << "\n\n";
response << "# HELP mist_bw_total Count of bytes handled since server start, by direction.\n";
response << "# TYPE mist_bw_total counter\n";
response << "mist_bw_total{direction=\"up\"} " << servUpBytes << "\n";
response << "mist_bw_total{direction=\"down\"} " << servDownBytes << "\n\n";
response << "# HELP mist_download_total Count of bytes downloaded since server start.\n";
response << "# TYPE mist_download_total counter\n";
response << "mist_download_total " << servDownBytes << "\n\n";
response << "# HELP mist_current Number of sessions for a given stream active right now.\n";
response << "# TYPE mist_current gauge\n";
for (std::map<std::string, unsigned long>::iterator it = clients.begin(); it != clients.end(); ++it){
response << "mist_current{stream=\"" << it->first << "\"} " << it->second << "\n";
response << "# HELP mist_viewers Number of sessions by type and stream active right now.\n";
response << "# TYPE mist_viewers gauge\n";
for (std::map<std::string, struct streamTotals>::iterator it = streams.begin(); it != streams.end(); ++it){
response << "mist_sessions{stream=\"" << it->first << "\",sessType=\"viewers\"} " << it->second.viewers << "\n";
response << "mist_sessions{stream=\"" << it->first << "\",sessType=\"incoming\"} " << it->second.inputs << "\n";
response << "mist_sessions{stream=\"" << it->first << "\",sessType=\"outgoing\"} " << it->second.outputs << "\n";
}
response << "\n# HELP mist_sessions Count of unique sessions since stream start.\n";
response << "# TYPE mist_sessions counter\n";
response << "# HELP mist_upload Count of bytes uploaded since stream start.\n";
response << "# TYPE mist_upload counter\n";
response << "# HELP mist_download Count of bytes downloaded since stream start.\n";
response << "# TYPE mist_download counter\n";
std::set<std::string> mustWipe;
response << "\n# HELP mist_viewcount Count of unique viewer sessions since stream start, per stream.\n";
response << "# TYPE mist_viewcount counter\n";
response << "# HELP mist_bw Count of bytes handled since stream start, by direction.\n";
response << "# TYPE mist_bw counter\n";
for (std::map<std::string, struct streamTotals>::iterator it = streamStats.begin(); it != streamStats.end(); ++it){
response << "mist_sessions{stream=\"" << it->first << "\"} " << it->second.clients << "\n";
response << "mist_upload{stream=\"" << it->first << "\"} " << it->second.upBytes << "\n";
response << "mist_download{stream=\"" << it->first << "\"} " << it->second.downBytes << "\n";
response << "mist_viewcount{stream=\"" << it->first << "\"} " << it->second.viewers << "\n";
response << "mist_bw{stream=\"" << it->first << "\",direction=\"up\"} " << it->second.upBytes << "\n";
response << "mist_bw{stream=\"" << it->first << "\",direction=\"down\"} " << it->second.downBytes << "\n";
}
}
H.Chunkify(response.str(), conn);
@ -1155,37 +1190,59 @@ void Controller::handlePrometheus(HTTP::Parser & H, Socket::Connection & conn, i
resp["cpu"] = cpu_use;
resp["mem_total"] = mem_total;
resp["mem_used"] = (mem_total - mem_free - mem_bufcache);
resp["logs"] = (long long)Controller::logCounter;
{//Scope for shortest possible blocking of statsMutex
tthread::lock_guard<tthread::mutex> guard(statsMutex);
//collect the data first
std::map<std::string, unsigned long> clients;
unsigned long totClients = 0;
std::map<std::string, struct streamTotals> streams;
unsigned long totViewers = 0, totInputs = 0, totOutputs = 0;
unsigned int t = Util::epoch() - STATS_DELAY;
//check all sessions
if (sessions.size()){
for (std::map<sessIndex, statSession>::iterator it = sessions.begin(); it != sessions.end(); it++){
if (it->second.hasDataFor(t) && it->second.isViewerOn(t)){
clients[it->first.streamName]++;
totClients++;
switch (it->second.getSessType()){
case SESS_UNSET:
case SESS_VIEWER:
streams[it->first.streamName].viewers++;
totViewers++;
break;
case SESS_INPUT:
streams[it->first.streamName].inputs++;
totInputs++;
break;
case SESS_OUTPUT:
streams[it->first.streamName].outputs++;
totOutputs++;
break;
}
}
}
}
resp["sess_cached"] = (long long)sessions.size();
resp["sess_current"] = (long long)totClients;
resp["sess_total"] = (long long)servClients;
resp["upload"] = (long long)servUpBytes;
resp["download"] = (long long)servDownBytes;
resp["curr"].append((long long)totViewers);
resp["curr"].append((long long)totInputs);
resp["curr"].append((long long)totOutputs);
resp["curr"].append((long long)sessions.size());
resp["tot"].append((long long)servViewers);
resp["tot"].append((long long)servInputs);
resp["tot"].append((long long)servOutputs);
resp["bw"].append((long long)servUpBytes);
resp["bw"].append((long long)servDownBytes);
for (std::map<std::string, unsigned long>::iterator it = clients.begin(); it != clients.end(); ++it){
resp["streams"][it->first]["sess_current"] = (long long)it->second;
for (std::map<std::string, struct streamTotals>::iterator it = streams.begin(); it != streams.end(); ++it){
resp["streams"][it->first]["curr"].append((long long)it->second.viewers);
resp["streams"][it->first]["curr"].append((long long)it->second.inputs);
resp["streams"][it->first]["curr"].append((long long)it->second.outputs);
}
std::set<std::string> mustWipe;
for (std::map<std::string, struct streamTotals>::iterator it = streamStats.begin(); it != streamStats.end(); ++it){
resp["streams"][it->first]["sess_total"] = (long long)it->second.clients;
resp["streams"][it->first]["upload"] = (long long)it->second.upBytes;
resp["streams"][it->first]["download"] = (long long)it->second.downBytes;
resp["streams"][it->first]["tot"].append((long long)it->second.viewers);
resp["streams"][it->first]["tot"].append((long long)it->second.inputs);
resp["streams"][it->first]["tot"].append((long long)it->second.outputs);
resp["streams"][it->first]["bw"].append((long long)it->second.upBytes);
resp["streams"][it->first]["bw"].append((long long)it->second.downBytes);
}
}
H.Chunkify(resp.toString(), conn);

View file

@ -31,6 +31,13 @@ namespace Controller {
long long up;
};
enum sessType {
SESS_UNSET = 0,
SESS_INPUT,
SESS_OUTPUT,
SESS_VIEWER
};
/// This is a comparison and storage class that keeps sessions apart from each other.
/// Whenever two of these objects are not equal, it will create a new session.
class sessIndex {
@ -72,7 +79,9 @@ namespace Controller {
std::deque<statStorage> oldConns;
std::map<unsigned long, statStorage> curConns;
char sync;
sessType sessionType;
public:
sessType getSessType();
statSession();
void wipeOld(unsigned long long);
void finish(unsigned long index);

View file

@ -16,6 +16,7 @@ namespace Controller {
JSON::Value Storage; ///< Global storage of data.
tthread::mutex configMutex;
tthread::mutex logMutex;
unsigned long long logCounter = 0;
bool configChanged = false;
///\brief Store and print a log message.
@ -36,6 +37,7 @@ namespace Controller {
timeinfo = localtime (&rawtime);
strftime(buffer,100,"%F %H:%M:%S",timeinfo);
std::cout << "[" << buffer << "] " << kind << ": " << message << std::endl;
logCounter++;
}
///\brief Write contents to Filename

View file

@ -9,6 +9,7 @@ namespace Controller {
extern tthread::mutex logMutex;///< Mutex for log thread.
extern tthread::mutex configMutex;///< Mutex for server config access.
extern bool configChanged; ///< Bool that indicates config must be written to SHM.
extern unsigned long long logCounter; ///<Count of logged messages since boot
/// Store and print a log message.
void Log(std::string kind, std::string message);

View file

@ -1123,6 +1123,17 @@ namespace Mist {
stats();
}
/// Returns the name as it should be used in statistics.
/// Outputs used as an input should return INPUT, outputs used for automation should return OUTPUT, others should return their proper name.
/// The default implementation is usually good enough for all the non-INPUT types.
std::string Output::getStatsName(){
if (config->hasOption("target") && config->getString("target").size()){
return "OUTPUT";
}else{
return capa["name"].asStringRef();
}
}
void Output::stats(){
if (!isInitialized){
return;
@ -1144,7 +1155,7 @@ namespace Mist {
}
tmpEx.crc(crc);
tmpEx.streamName(streamName);
tmpEx.connector(capa["name"].asString());
tmpEx.connector(getStatsName());
tmpEx.up(myConn.dataUp());
tmpEx.down(myConn.dataDown());
tmpEx.time(now - myConn.connTime());

View file

@ -107,7 +107,7 @@ namespace Mist {
protected://these are to be messed with by child classes
virtual std::string getConnectedHost();
virtual std::string getConnectedBinHost();
virtual std::string getStatsName();
IPC::sharedClient statsPage;///< Shared memory used for statistics reporting.
bool isBlocking;///< If true, indicates that myConn is blocking.

View file

@ -167,6 +167,14 @@ namespace Mist {
return false;
}
std::string OutRTMP::getStatsName(){
if (isPushing){
return "INPUT";
}else{
return Output::getStatsName();
}
}
void OutRTMP::parseVars(std::string data){
std::string varname;
std::string varval;

View file

@ -23,6 +23,7 @@ namespace Mist {
void parseChunk(Socket::Buffer & inputBuffer);
void parseAMFCommand(AMF::Object & amfData, int messageType, int streamId);
void sendCommand(AMF::Object & amfReply, int messageType, int streamId);
virtual std::string getStatsName();
};
}