Added STREAM_END trigger + viewerseconds stat in prometheus output

This commit is contained in:
Thulinma 2020-05-06 11:42:53 +02:00
parent 97357d0972
commit 19a4701be7
2 changed files with 35 additions and 3 deletions

View file

@ -109,6 +109,12 @@ namespace Controller{
trgs["STREAM_BUFFER"]["response"] = "ignored";
trgs["STREAM_BUFFER"]["response_action"] = "None.";
trgs["STREAM_END"]["when"] = "Every time a stream ends (no more viewers after a period of activity)";
trgs["STREAM_END"]["stream_specific"] = true;
trgs["STREAM_END"]["payload"] = "stream name (string)\ndownloaded bytes (integer)\nuploaded bytes (integer)\ntotal viewers (integer)\ntotal inputs (integer)\ntotal outputs (integer)\nviewer seconds (integer)";
trgs["STREAM_END"]["response"] = "ignored";
trgs["STREAM_END"]["response_action"] = "None.";
trgs["RTMP_PUSH_REWRITE"]["when"] =
"On incoming RTMP pushes, allows rewriting the RTMP URL to/from custom formatting";
trgs["RTMP_PUSH_REWRITE"]["stream_specific"] = false;

View file

@ -14,6 +14,7 @@
#include <mist/stream.h>
#include <mist/url.h>
#include <sys/statvfs.h> //for fstatvfs
#include <mist/triggers.h>
#ifndef KILL_ON_EXIT
#define KILL_ON_EXIT false
@ -105,6 +106,7 @@ struct streamTotals{
uint64_t currOuts;
uint64_t currViews;
uint8_t status;
uint64_t viewSeconds;
};
static std::map<std::string, struct streamTotals> streamStats;
static uint64_t servUpBytes = 0;
@ -114,6 +116,7 @@ static uint64_t servDownOtherBytes = 0;
static uint64_t servInputs = 0;
static uint64_t servOutputs = 0;
static uint64_t servViewers = 0;
static uint64_t servSeconds = 0;
Controller::sessIndex::sessIndex(){
crc = 0;
@ -388,10 +391,12 @@ void Controller::SharedMemStats(void *config){
servDownOtherBytes = 0;
servUpBytes = 0;
servDownBytes = 0;
servSeconds = 0;
for (std::map<std::string, struct streamTotals>::iterator it = streamStats.begin();
it != streamStats.end(); ++it){
it->second.upBytes = 0;
it->second.downBytes = 0;
it->second.viewSeconds = 0;
}
}
// wipe old statistics
@ -466,7 +471,14 @@ void Controller::SharedMemStats(void *config){
strmStats->setDeleted(prevEnd);
}
while (inactiveStreams.size()){
streamStats.erase(*inactiveStreams.begin());
const std::string & streamName = *inactiveStreams.begin();
const streamTotals & stats = streamStats.at(streamName);
if(Triggers::shouldTrigger("STREAM_END", streamName)){
std::stringstream payload;
payload << streamName+"\n" << stats.downBytes << "\n" << stats.upBytes << "\n" << stats.viewers << "\n" << stats.inputs << "\n" << stats.outputs << "\n" << stats.viewSeconds;
Triggers::doTrigger("STREAM_END", payload.str(), streamName);
}
streamStats.erase(streamName);
inactiveStreams.erase(inactiveStreams.begin());
shiftWrites = true;
}
@ -611,13 +623,18 @@ void Controller::statSession::update(uint64_t index, Comms::Statistics &statComm
curConns[index].update(statComm, index);
// store timestamp of first received data, if older
if (firstSec > statComm.getNow(index)){firstSec = statComm.getNow(index);}
uint64_t prevLastSec = lastSec;
uint64_t secIncr = 0;
// store timestamp of last received data, if newer
if (statComm.getNow(index) > lastSec){
lastSec = statComm.getNow(index);
if (!tracked){
tracked = true;
firstActive = firstSec;
}else{
secIncr = (statComm.getNow(index) - lastSec);
}
lastSec = statComm.getNow(index);
}
long long currDown = getDown();
long long currUp = getUp();
@ -678,6 +695,7 @@ void Controller::statSession::update(uint64_t index, Comms::Statistics &statComm
}else{
streamStats[myStream].upBytes += currUp;
streamStats[myStream].downBytes += currDown;
if (sessionType == SESS_VIEWER){streamStats[myStream].viewSeconds += lastSec - firstSec;}
}
}else{
if (!myStream.size() || myStream[0] == 0){
@ -685,6 +703,7 @@ void Controller::statSession::update(uint64_t index, Comms::Statistics &statComm
}else{
streamStats[myStream].upBytes += currUp - prevUp;
streamStats[myStream].downBytes += currDown - prevDown;
if (sessionType == SESS_VIEWER){streamStats[myStream].viewSeconds += secIncr;}
}
}
}
@ -1599,8 +1618,11 @@ void Controller::handlePrometheus(HTTP::Parser &H, Socket::Connection &conn, int
response << "mist_sessions_total{sessType=\"outgoing\"}" << totOutputs << "\n";
response << "mist_sessions_total{sessType=\"cached\"}" << sessions.size() << "\n\n";
response << "# HELP mist_outputs Number of viewers active right now, server-wide, by output "
"type.\n";
response << "# HELP mist_viewseconds_total Number of seconds any media was received by a viewer.\n";
response << "# TYPE mist_viewseconds_total counter\n";
response << "mist_viewseconds_total " << servSeconds << "\n";
response << "# HELP mist_outputs Number of viewers active right now, server-wide, by output type.\n";
response << "# TYPE mist_outputs gauge\n";
for (std::map<std::string, uint32_t>::iterator it = outputs.begin(); it != outputs.end(); ++it){
response << "mist_outputs{output=\"" << it->first << "\"}" << it->second << "\n";
@ -1631,6 +1653,9 @@ void Controller::handlePrometheus(HTTP::Parser &H, Socket::Connection &conn, int
response << "# TYPE mist_viewcount counter\n";
response << "# HELP mist_bw Count of bytes handled since stream start, by direction.\n";
response << "# TYPE mist_bw counter\n";
response << "# HELP mist_viewseconds Number of seconds any media was received by a viewer.\n";
response << "# TYPE mist_viewseconds counter\n";
response << "mist_viewseconds_total " << servSeconds << "\n";
for (std::map<std::string, struct streamTotals>::iterator it = streamStats.begin();
it != streamStats.end(); ++it){
response << "mist_sessions{stream=\"" << it->first << "\",sessType=\"viewers\"}"
@ -1640,6 +1665,7 @@ void Controller::handlePrometheus(HTTP::Parser &H, Socket::Connection &conn, int
response << "mist_sessions{stream=\"" << it->first << "\",sessType=\"outgoing\"}"
<< it->second.currOuts << "\n";
response << "mist_viewcount{stream=\"" << it->first << "\"}" << it->second.viewers << "\n";
response << "mist_viewseconds{stream=\"" << it->first << "\"} " << it->second.viewSeconds << "\n";
response << "mist_bw{stream=\"" << it->first << "\",direction=\"up\"}" << it->second.upBytes << "\n";
response << "mist_bw{stream=\"" << it->first << "\",direction=\"down\"}" << it->second.downBytes << "\n";
}