From e4ac68db54e42aa24bd2415762b5075848d7a87a Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 21 Mar 2018 11:48:16 +0100 Subject: [PATCH] Improved statistics and stream status --- lib/defines.h | 1 + src/controller/controller_api.cpp | 6 + src/controller/controller_statistics.cpp | 176 +++++++++++++++++------ src/controller/controller_statistics.h | 3 + src/controller/controller_streams.cpp | 88 ++++++------ 5 files changed, 186 insertions(+), 88 deletions(-) diff --git a/lib/defines.h b/lib/defines.h index b39e8f2f..5e8c9fdf 100644 --- a/lib/defines.h +++ b/lib/defines.h @@ -97,6 +97,7 @@ static inline void show_stackframe(){} #ifndef STATS_DELAY #define STATS_DELAY 15 #endif +#define STATS_INPUT_DELAY 2 #ifndef INPUT_TIMEOUT #define INPUT_TIMEOUT STATS_DELAY diff --git a/src/controller/controller_api.cpp b/src/controller/controller_api.cpp index eb08d40e..66c1ea03 100644 --- a/src/controller/controller_api.cpp +++ b/src/controller/controller_api.cpp @@ -600,6 +600,12 @@ void Controller::handleAPICommands(JSON::Value & Request, JSON::Value & Response Controller::fillTotals(Request["totals"], Response["totals"]); } } + if (Request.isMember("active_streams")){ + Controller::fillActive(Request["active_streams"], Response["active_streams"], true); + } + if (Request.isMember("stats_streams")){ + Controller::fillActive(Request["stats_streams"], Response["stats_streams"]); + } Controller::configChanged = true; } diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index 397b86e4..3f0a95a1 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -1,6 +1,8 @@ #include #include #include +#include +#include #include #include "controller_statistics.h" #include "controller_storage.h" @@ -21,6 +23,8 @@ #define STAT_TOT_CLIENTS 1 #define STAT_TOT_BPS_DOWN 2 #define STAT_TOT_BPS_UP 4 +#define STAT_TOT_INPUTS 8 +#define STAT_TOT_OUTPUTS 16 #define STAT_TOT_ALL 0xFF #define COUNTABLE_BYTES 128*1024 @@ -268,6 +272,10 @@ void Controller::statSession::update(unsigned long index, IPC::statExchange & da } } +Controller::sessType Controller::statSession::getSessType(){ + return sessionType; +} + /// Archives the given connection. void Controller::statSession::wipeOld(unsigned long long cutOff){ if (firstSec > cutOff){ @@ -424,6 +432,11 @@ bool Controller::statSession::hasData(){ return false; } +/// Returns true if this session should count as a viewer on the given timestamp. +bool Controller::statSession::isViewerOn(unsigned long long t){ + return getUp(t) + getDown(t) > COUNTABLE_BYTES; +} + /// Returns the cumulative connected time for this session at timestamp t. long long Controller::statSession::getConnTime(unsigned long long t){ long long retVal = 0; @@ -777,64 +790,137 @@ void Controller::fillClients(JSON::Value & req, JSON::Value & rep){ //all done! return is by reference, so no need to return anything here. } +/// This takes a "active_streams" request, and fills in the response data. +/// +/// \api +/// `"active_streams"` and `"stats_streams"` requests may either be empty, in which case the response looks like this: +/// ~~~~~~~~~~~~~~~{.js} +/// [ +/// //Array of stream names +/// "streamA", +/// "streamB", +/// "streamC" +/// ] +/// ~~~~~~~~~~~~~~~ +/// `"stats_streams"` will list all streams that any statistics data is available for, and only those. `"active_streams"` only lists streams that are currently active, and only those. +/// If the request is an array, which may contain any of the following elements: +/// ~~~~~~~~~~~~~~~{.js} +/// [ +/// //Array of requested data types +/// "clients", //Current viewer count +/// "lastms" //Current position in the live buffer, if live +/// ] +/// ~~~~~~~~~~~~~~~ +/// In which case the response is changed into this format: +/// ~~~~~~~~~~~~~~~{.js} +/// { +/// //Object of stream names, containing arrays in the same order as the request, with the same data +/// "streamA":[ +/// 0, +/// 60000 +/// ] +/// "streamB":[ +/// //.... +/// ] +/// //... +/// } +/// ~~~~~~~~~~~~~~~ +/// All streams that any statistics data is available for are listed, and only those streams. +void Controller::fillActive(JSON::Value & req, JSON::Value & rep, bool onlyNow){ + //collect the data first + std::set streams; + std::map clients; + unsigned int tOut = Util::epoch() - STATS_DELAY; + unsigned int tIn = Util::epoch() - STATS_INPUT_DELAY; + //check all sessions + { + tthread::lock_guard guard(statsMutex); + if (sessions.size()){ + for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ + if (it->second.getSessType() == SESS_INPUT){ + if (!onlyNow || (it->second.hasDataFor(tIn) && it->second.isViewerOn(tIn))){ + streams.insert(it->first.streamName); + } + }else{ + if (!onlyNow || (it->second.hasDataFor(tOut) && it->second.isViewerOn(tOut))){ + streams.insert(it->first.streamName); + if (it->second.getSessType() == SESS_VIEWER){ + clients[it->first.streamName]++; + } + } + } + } + } + } + //Good, now output what we found... + rep.null(); + for (std::set::iterator it = streams.begin(); it != streams.end(); it++){ + if (req.isArray()){ + rep[*it].null(); + jsonForEach(req, j){ + if (j->asStringRef() == "clients"){ + rep[*it].append((long long)clients[*it]); + } + if (j->asStringRef() == "lastms"){ + char pageId[NAME_BUFFER_SIZE]; + IPC::sharedPage streamIndex; + snprintf(pageId, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, it->c_str()); + streamIndex.init(pageId, DEFAULT_STRM_PAGE_SIZE, false, false); + if (streamIndex.mapped){ + static char liveSemName[NAME_BUFFER_SIZE]; + snprintf(liveSemName, NAME_BUFFER_SIZE, SEM_LIVE, it->c_str()); + IPC::semaphore metaLocker(liveSemName, O_CREAT | O_RDWR, (S_IRWXU|S_IRWXG|S_IRWXO), 1); + metaLocker.wait(); + DTSC::Scan strm = DTSC::Packet(streamIndex.mapped, streamIndex.len, true).getScan(); + long long lms = 0; + DTSC::Scan trcks = strm.getMember("tracks"); + unsigned int trcks_ctr = trcks.getSize(); + for (unsigned int i = 0; i < trcks_ctr; ++i){ + if (trcks.getIndice(i).getMember("lastms").asInt() > lms){ + lms = trcks.getIndice(i).getMember("lastms").asInt(); + } + } + rep[*it].append(lms); + metaLocker.post(); + }else{ + rep[*it].append(-1ll); + } + } + } + }else{ + rep.append(*it); + } + } + //all done! return is by reference, so no need to return anything here. +} + class totalsData { public: totalsData(){ clients = 0; + inputs = 0; + outputs = 0; downbps = 0; upbps = 0; } - void add(unsigned int down, unsigned int up){ + void add(unsigned int down, unsigned int up, Controller::sessType sT){ + switch (sT){ + case Controller::SESS_VIEWER: clients++; break; + case Controller::SESS_INPUT: inputs++; break; + case Controller::SESS_OUTPUT: outputs++; break; + } clients++; downbps += down; upbps += up; } long long clients; + long long inputs; + long long outputs; long long downbps; long long upbps; }; /// This takes a "totals" request, and fills in the response data. -/// -/// \api -/// `"totals"` requests take the form of: -/// ~~~~~~~~~~~~~~~{.js} -/// { -/// //array of streamnames to accumulate. Empty means all. -/// "streams": ["streama", "streamb", "streamc"], -/// //array of protocols to accumulate. Empty means all. -/// "protocols": ["HLS", "HSS"], -/// //list of requested data fields. Empty means all. -/// "fields": ["clients", "downbps", "upbps"], -/// //unix timestamp of data start. Negative means X seconds ago. Empty means earliest available. -/// "start": 1234567 -/// //unix timestamp of data end. Negative means X seconds ago. Empty means latest available (usually 'now'). -/// "end": 1234567 -/// } -/// ~~~~~~~~~~~~~~~ -/// OR -/// ~~~~~~~~~~~~~~~{.js} -/// [ -/// {},//request object as above -/// {}//repeat the structure as many times as wanted -/// ] -/// ~~~~~~~~~~~~~~~ -/// and are responded to as: -/// ~~~~~~~~~~~~~~~{.js} -/// { -/// //unix timestamp of start of data. Always present, always absolute. -/// "start": 1234567, -/// //unix timestamp of end of data. Always present, always absolute. -/// "end": 1234567, -/// //array of actually represented data fields. -/// "fields": [...] -/// // Time between datapoints. Here: 10 points with each 5 seconds afterwards, followed by 10 points with each 1 second afterwards. -/// "interval": [[10, 5], [10, 1]], -/// //the data for the times as mentioned in the "interval" field, in the order they appear in the "fields" field. -/// "data": [[x, y, z], [x, y, z], [x, y, z]] -/// } -/// ~~~~~~~~~~~~~~~ -/// In case of the second method, the response is an array in the same order as the requests. void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){ tthread::lock_guard guard(statsMutex); //first, figure out the timestamps wanted @@ -863,6 +949,8 @@ void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){ if (req.isMember("fields") && req["fields"].size()){ jsonForEach(req["fields"], it) { if ((*it).asStringRef() == "clients"){fields |= STAT_TOT_CLIENTS;} + if ((*it).asStringRef() == "inputs"){fields |= STAT_TOT_INPUTS;} + if ((*it).asStringRef() == "outputs"){fields |= STAT_TOT_OUTPUTS;} if ((*it).asStringRef() == "downbps"){fields |= STAT_TOT_BPS_DOWN;} if ((*it).asStringRef() == "upbps"){fields |= STAT_TOT_BPS_UP;} } @@ -886,6 +974,8 @@ void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){ //output the selected fields rep["fields"].null(); if (fields & STAT_TOT_CLIENTS){rep["fields"].append("clients");} + if (fields & STAT_TOT_INPUTS){rep["fields"].append("inputs");} + if (fields & STAT_TOT_OUTPUTS){rep["fields"].append("outputs");} if (fields & STAT_TOT_BPS_DOWN){rep["fields"].append("downbps");} if (fields & STAT_TOT_BPS_UP){rep["fields"].append("upbps");} //start data collection @@ -898,7 +988,7 @@ void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){ if ((it->second.getEnd() >= (unsigned long long)reqStart || it->second.getStart() <= (unsigned long long)reqEnd) && (!streams.size() || streams.count(it->first.streamName)) && (!protos.size() || protos.count(it->first.connector))){ for (unsigned long long i = reqStart; i <= reqEnd; ++i){ if (it->second.hasDataFor(i)){ - totalsCount[i].add(it->second.getBpsDown(i), it->second.getBpsUp(i)); + totalsCount[i].add(it->second.getBpsDown(i), it->second.getBpsUp(i), it->second.getSessType()); } } } @@ -923,6 +1013,8 @@ void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){ for (std::map::iterator it = totalsCount.begin(); it != totalsCount.end(); it++){ JSON::Value d; if (fields & STAT_TOT_CLIENTS){d.append(it->second.clients);} + if (fields & STAT_TOT_INPUTS){d.append(it->second.inputs);} + if (fields & STAT_TOT_OUTPUTS){d.append(it->second.outputs);} if (fields & STAT_TOT_BPS_DOWN){d.append(it->second.downbps);} if (fields & STAT_TOT_BPS_UP){d.append(it->second.upbps);} rep["data"].append(d); diff --git a/src/controller/controller_statistics.h b/src/controller/controller_statistics.h index a5d90fdc..ebc3124d 100644 --- a/src/controller/controller_statistics.h +++ b/src/controller/controller_statistics.h @@ -71,6 +71,7 @@ namespace Controller { public: statSession(); std::map curConns; + sessType getSessType(); void wipeOld(unsigned long long); void finish(unsigned long index); void switchOverTo(statSession & newSess, unsigned long index); @@ -78,6 +79,7 @@ namespace Controller { void ping(const sessIndex & index, unsigned long long disconnectPoint); unsigned long long getStart(); unsigned long long getEnd(); + bool isViewerOn(unsigned long long time); bool hasDataFor(unsigned long long time); bool hasData(); long long getConnTime(unsigned long long time); @@ -100,6 +102,7 @@ namespace Controller { std::set getActiveStreams(const std::string & prefix = ""); void parseStatistics(char * data, size_t len, unsigned int id); void fillClients(JSON::Value & req, JSON::Value & rep); + void fillActive(JSON::Value & req, JSON::Value & rep, bool onlyNow = false); void fillTotals(JSON::Value & req, JSON::Value & rep); void SharedMemStats(void * config); bool hasViewers(std::string streamName); diff --git a/src/controller/controller_streams.cpp b/src/controller/controller_streams.cpp index 565eb769..a81394a9 100644 --- a/src/controller/controller_streams.cpp +++ b/src/controller/controller_streams.cpp @@ -38,9 +38,39 @@ namespace Controller { ///\param name The name of the stream ///\param data The corresponding configuration values. void checkStream(std::string name, JSON::Value & data){ + if (!data.isMember("name")){data["name"] = name;} std::string prevState = data["error"].asStringRef(); data["online"] = (std::string)"Checking..."; data.removeMember("error"); + switch (Util::getStreamStatus(name)){ + case STRMSTAT_OFF: + //Do nothing + break; + case STRMSTAT_INIT: + data["online"] = 2; + data["error"] = "Initializing..."; + return; + case STRMSTAT_BOOT: + data["online"] = 2; + data["error"] = "Loading..."; + return; + case STRMSTAT_WAIT: + data["online"] = 2; + data["error"] = "Waiting for data..."; + return; + case STRMSTAT_READY: + data["online"] = 1; + return; + case STRMSTAT_SHUTDOWN: + data["online"] = 2; + data["error"] = "Shutting down..."; + return; + default: + //Unknown state? + data["error"] = "Unrecognized stream state"; + break; + } + data["online"] = 0; std::string URL; if (data.isMember("channel") && data["channel"].isMember("URL")){ URL = data["channel"]["URL"].asString(); @@ -48,44 +78,28 @@ namespace Controller { if (data.isMember("source")){ URL = data["source"].asString(); } - if (URL == ""){ + if (!URL.size()){ data["error"] = "Stream offline: Missing source parameter!"; if (data["error"].asStringRef() != prevState){ Log("STRM", "Error for stream " + name + "! Source parameter missing."); } return; } - if (URL.substr(0, 1) != "/"){ - //push-style stream - return; - } - if (URL.substr(0, 1) == "/"){ - //vod-style stream - data.removeMember("error"); - struct stat fileinfo; - if (stat(URL.c_str(), &fileinfo) != 0 || S_ISDIR(fileinfo.st_mode)){ - data["error"] = "Stream offline: Not found: " + URL; - if (data["error"].asStringRef() != prevState){ - Log("BUFF", "Warning for VoD stream " + name + "! File not found: " + URL); - } - data["online"] = 0; - return; - } - if (!hasViewers(name)){ - if ( !data.isMember("error")){ - data["error"] = "Available"; - } - data["online"] = 2; - }else{ - data["online"] = 1; + //non-VoD stream + if (URL.substr(0, 1) != "/"){return;} + //VoD-style stream + struct stat fileinfo; + if (stat(URL.c_str(), &fileinfo) != 0 || S_ISDIR(fileinfo.st_mode)){ + data["error"] = "Stream offline: Not found: " + URL; + if (data["error"].asStringRef() != prevState){ + Log("BUFF", "Warning for VoD stream " + name + "! File not found: " + URL); } return; } - //not recognized - data["error"] = "Invalid source format"; - if (data["error"].asStringRef() != prevState){ - Log("STRM", "Invalid source format for stream " + name + "!"); + if ( !data.isMember("error")){ + data["error"] = "Available"; } + data["online"] = 2; return; } @@ -96,24 +110,6 @@ namespace Controller { long long int currTime = Util::epoch(); jsonForEach(data, jit) { checkStream(jit.key(), (*jit)); - if (!jit->isMember("name")){ - (*jit)["name"] = jit.key(); - } - if (!hasViewers(jit.key())){ - if (jit->isMember("source") && (*jit)["source"].asString().substr(0, 1) == "/" && jit->isMember("error") - && (*jit)["error"].asString().substr(0,15) != "Stream offline:"){ - (*jit)["online"] = 2; - }else{ - if (jit->isMember("error") && (*jit)["error"].asString() == "Available"){ - jit->removeMember("error"); - } - (*jit)["online"] = 0; - } - }else{ - // assume all is fine - jit->removeMember("error"); - (*jit)["online"] = 1; - } } //check for changes in config or streams