diff --git a/src/controller/controller_capabilities.cpp b/src/controller/controller_capabilities.cpp index dbd8d4c6..1b4382fe 100644 --- a/src/controller/controller_capabilities.cpp +++ b/src/controller/controller_capabilities.cpp @@ -109,6 +109,12 @@ namespace Controller{ trgs["STREAM_BUFFER"]["response"] = "ignored"; trgs["STREAM_BUFFER"]["response_action"] = "None."; + trgs["STREAM_END"]["when"] = "Every time a stream ends (no more viewers after a period of activity)"; + trgs["STREAM_END"]["stream_specific"] = true; + trgs["STREAM_END"]["payload"] = "stream name (string)\ndownloaded bytes (integer)\nuploaded bytes (integer)\ntotal viewers (integer)\ntotal inputs (integer)\ntotal outputs (integer)\nviewer seconds (integer)"; + trgs["STREAM_END"]["response"] = "ignored"; + trgs["STREAM_END"]["response_action"] = "None."; + trgs["RTMP_PUSH_REWRITE"]["when"] = "On incoming RTMP pushes, allows rewriting the RTMP URL to/from custom formatting"; trgs["RTMP_PUSH_REWRITE"]["stream_specific"] = false; diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index 480c0081..78938487 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -14,6 +14,7 @@ #include #include #include //for fstatvfs +#include #ifndef KILL_ON_EXIT #define KILL_ON_EXIT false @@ -105,6 +106,7 @@ struct streamTotals{ uint64_t currOuts; uint64_t currViews; uint8_t status; + uint64_t viewSeconds; }; static std::map streamStats; static uint64_t servUpBytes = 0; @@ -114,6 +116,7 @@ static uint64_t servDownOtherBytes = 0; static uint64_t servInputs = 0; static uint64_t servOutputs = 0; static uint64_t servViewers = 0; +static uint64_t servSeconds = 0; Controller::sessIndex::sessIndex(){ crc = 0; @@ -388,10 +391,12 @@ void Controller::SharedMemStats(void *config){ servDownOtherBytes = 0; servUpBytes = 0; servDownBytes = 0; + servSeconds = 0; for (std::map::iterator it = streamStats.begin(); it != streamStats.end(); ++it){ it->second.upBytes = 0; it->second.downBytes = 0; + it->second.viewSeconds = 0; } } // wipe old statistics @@ -466,7 +471,14 @@ void Controller::SharedMemStats(void *config){ strmStats->setDeleted(prevEnd); } while (inactiveStreams.size()){ - streamStats.erase(*inactiveStreams.begin()); + const std::string & streamName = *inactiveStreams.begin(); + const streamTotals & stats = streamStats.at(streamName); + if(Triggers::shouldTrigger("STREAM_END", streamName)){ + std::stringstream payload; + payload << streamName+"\n" << stats.downBytes << "\n" << stats.upBytes << "\n" << stats.viewers << "\n" << stats.inputs << "\n" << stats.outputs << "\n" << stats.viewSeconds; + Triggers::doTrigger("STREAM_END", payload.str(), streamName); + } + streamStats.erase(streamName); inactiveStreams.erase(inactiveStreams.begin()); shiftWrites = true; } @@ -611,13 +623,18 @@ void Controller::statSession::update(uint64_t index, Comms::Statistics &statComm curConns[index].update(statComm, index); // store timestamp of first received data, if older if (firstSec > statComm.getNow(index)){firstSec = statComm.getNow(index);} + uint64_t prevLastSec = lastSec; + uint64_t secIncr = 0; // store timestamp of last received data, if newer if (statComm.getNow(index) > lastSec){ lastSec = statComm.getNow(index); if (!tracked){ tracked = true; firstActive = firstSec; + }else{ + secIncr = (statComm.getNow(index) - lastSec); } + lastSec = statComm.getNow(index); } long long currDown = getDown(); long long currUp = getUp(); @@ -678,6 +695,7 @@ void Controller::statSession::update(uint64_t index, Comms::Statistics &statComm }else{ streamStats[myStream].upBytes += currUp; streamStats[myStream].downBytes += currDown; + if (sessionType == SESS_VIEWER){streamStats[myStream].viewSeconds += lastSec - firstSec;} } }else{ if (!myStream.size() || myStream[0] == 0){ @@ -685,6 +703,7 @@ void Controller::statSession::update(uint64_t index, Comms::Statistics &statComm }else{ streamStats[myStream].upBytes += currUp - prevUp; streamStats[myStream].downBytes += currDown - prevDown; + if (sessionType == SESS_VIEWER){streamStats[myStream].viewSeconds += secIncr;} } } } @@ -1599,8 +1618,11 @@ void Controller::handlePrometheus(HTTP::Parser &H, Socket::Connection &conn, int response << "mist_sessions_total{sessType=\"outgoing\"}" << totOutputs << "\n"; response << "mist_sessions_total{sessType=\"cached\"}" << sessions.size() << "\n\n"; - response << "# HELP mist_outputs Number of viewers active right now, server-wide, by output " - "type.\n"; + response << "# HELP mist_viewseconds_total Number of seconds any media was received by a viewer.\n"; + response << "# TYPE mist_viewseconds_total counter\n"; + response << "mist_viewseconds_total " << servSeconds << "\n"; + + response << "# HELP mist_outputs Number of viewers active right now, server-wide, by output type.\n"; response << "# TYPE mist_outputs gauge\n"; for (std::map::iterator it = outputs.begin(); it != outputs.end(); ++it){ response << "mist_outputs{output=\"" << it->first << "\"}" << it->second << "\n"; @@ -1631,6 +1653,9 @@ void Controller::handlePrometheus(HTTP::Parser &H, Socket::Connection &conn, int response << "# TYPE mist_viewcount counter\n"; response << "# HELP mist_bw Count of bytes handled since stream start, by direction.\n"; response << "# TYPE mist_bw counter\n"; + response << "# HELP mist_viewseconds Number of seconds any media was received by a viewer.\n"; + response << "# TYPE mist_viewseconds counter\n"; + response << "mist_viewseconds_total " << servSeconds << "\n"; for (std::map::iterator it = streamStats.begin(); it != streamStats.end(); ++it){ response << "mist_sessions{stream=\"" << it->first << "\",sessType=\"viewers\"}" @@ -1640,6 +1665,7 @@ void Controller::handlePrometheus(HTTP::Parser &H, Socket::Connection &conn, int response << "mist_sessions{stream=\"" << it->first << "\",sessType=\"outgoing\"}" << it->second.currOuts << "\n"; response << "mist_viewcount{stream=\"" << it->first << "\"}" << it->second.viewers << "\n"; + response << "mist_viewseconds{stream=\"" << it->first << "\"} " << it->second.viewSeconds << "\n"; response << "mist_bw{stream=\"" << it->first << "\",direction=\"up\"}" << it->second.upBytes << "\n"; response << "mist_bw{stream=\"" << it->first << "\",direction=\"down\"}" << it->second.downBytes << "\n"; }