Improved statistics and stream status

This commit is contained in:
Thulinma 2018-03-21 11:48:16 +01:00
parent c475eb8d8c
commit e4ac68db54
5 changed files with 186 additions and 88 deletions

View file

@ -97,6 +97,7 @@ static inline void show_stackframe(){}
#ifndef STATS_DELAY
#define STATS_DELAY 15
#endif
#define STATS_INPUT_DELAY 2
#ifndef INPUT_TIMEOUT
#define INPUT_TIMEOUT STATS_DELAY

View file

@ -600,6 +600,12 @@ void Controller::handleAPICommands(JSON::Value & Request, JSON::Value & Response
Controller::fillTotals(Request["totals"], Response["totals"]);
}
}
if (Request.isMember("active_streams")){
Controller::fillActive(Request["active_streams"], Response["active_streams"], true);
}
if (Request.isMember("stats_streams")){
Controller::fillActive(Request["stats_streams"], Response["stats_streams"]);
}
Controller::configChanged = true;
}

View file

@ -1,6 +1,8 @@
#include <cstdio>
#include <list>
#include <mist/config.h>
#include <mist/shared_memory.h>
#include <mist/dtsc.h>
#include <mist/stream.h>
#include "controller_statistics.h"
#include "controller_storage.h"
@ -21,6 +23,8 @@
#define STAT_TOT_CLIENTS 1
#define STAT_TOT_BPS_DOWN 2
#define STAT_TOT_BPS_UP 4
#define STAT_TOT_INPUTS 8
#define STAT_TOT_OUTPUTS 16
#define STAT_TOT_ALL 0xFF
#define COUNTABLE_BYTES 128*1024
@ -268,6 +272,10 @@ void Controller::statSession::update(unsigned long index, IPC::statExchange & da
}
}
Controller::sessType Controller::statSession::getSessType(){
return sessionType;
}
/// Archives the given connection.
void Controller::statSession::wipeOld(unsigned long long cutOff){
if (firstSec > cutOff){
@ -424,6 +432,11 @@ bool Controller::statSession::hasData(){
return false;
}
/// Returns true if this session should count as a viewer on the given timestamp.
bool Controller::statSession::isViewerOn(unsigned long long t){
return getUp(t) + getDown(t) > COUNTABLE_BYTES;
}
/// Returns the cumulative connected time for this session at timestamp t.
long long Controller::statSession::getConnTime(unsigned long long t){
long long retVal = 0;
@ -777,64 +790,137 @@ void Controller::fillClients(JSON::Value & req, JSON::Value & rep){
//all done! return is by reference, so no need to return anything here.
}
/// This takes a "active_streams" request, and fills in the response data.
///
/// \api
/// `"active_streams"` and `"stats_streams"` requests may either be empty, in which case the response looks like this:
/// ~~~~~~~~~~~~~~~{.js}
/// [
/// //Array of stream names
/// "streamA",
/// "streamB",
/// "streamC"
/// ]
/// ~~~~~~~~~~~~~~~
/// `"stats_streams"` will list all streams that any statistics data is available for, and only those. `"active_streams"` only lists streams that are currently active, and only those.
/// If the request is an array, which may contain any of the following elements:
/// ~~~~~~~~~~~~~~~{.js}
/// [
/// //Array of requested data types
/// "clients", //Current viewer count
/// "lastms" //Current position in the live buffer, if live
/// ]
/// ~~~~~~~~~~~~~~~
/// In which case the response is changed into this format:
/// ~~~~~~~~~~~~~~~{.js}
/// {
/// //Object of stream names, containing arrays in the same order as the request, with the same data
/// "streamA":[
/// 0,
/// 60000
/// ]
/// "streamB":[
/// //....
/// ]
/// //...
/// }
/// ~~~~~~~~~~~~~~~
/// All streams that any statistics data is available for are listed, and only those streams.
void Controller::fillActive(JSON::Value & req, JSON::Value & rep, bool onlyNow){
//collect the data first
std::set<std::string> streams;
std::map<std::string, unsigned long> clients;
unsigned int tOut = Util::epoch() - STATS_DELAY;
unsigned int tIn = Util::epoch() - STATS_INPUT_DELAY;
//check all sessions
{
tthread::lock_guard<tthread::mutex> guard(statsMutex);
if (sessions.size()){
for (std::map<sessIndex, statSession>::iterator it = sessions.begin(); it != sessions.end(); it++){
if (it->second.getSessType() == SESS_INPUT){
if (!onlyNow || (it->second.hasDataFor(tIn) && it->second.isViewerOn(tIn))){
streams.insert(it->first.streamName);
}
}else{
if (!onlyNow || (it->second.hasDataFor(tOut) && it->second.isViewerOn(tOut))){
streams.insert(it->first.streamName);
if (it->second.getSessType() == SESS_VIEWER){
clients[it->first.streamName]++;
}
}
}
}
}
}
//Good, now output what we found...
rep.null();
for (std::set<std::string>::iterator it = streams.begin(); it != streams.end(); it++){
if (req.isArray()){
rep[*it].null();
jsonForEach(req, j){
if (j->asStringRef() == "clients"){
rep[*it].append((long long)clients[*it]);
}
if (j->asStringRef() == "lastms"){
char pageId[NAME_BUFFER_SIZE];
IPC::sharedPage streamIndex;
snprintf(pageId, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, it->c_str());
streamIndex.init(pageId, DEFAULT_STRM_PAGE_SIZE, false, false);
if (streamIndex.mapped){
static char liveSemName[NAME_BUFFER_SIZE];
snprintf(liveSemName, NAME_BUFFER_SIZE, SEM_LIVE, it->c_str());
IPC::semaphore metaLocker(liveSemName, O_CREAT | O_RDWR, (S_IRWXU|S_IRWXG|S_IRWXO), 1);
metaLocker.wait();
DTSC::Scan strm = DTSC::Packet(streamIndex.mapped, streamIndex.len, true).getScan();
long long lms = 0;
DTSC::Scan trcks = strm.getMember("tracks");
unsigned int trcks_ctr = trcks.getSize();
for (unsigned int i = 0; i < trcks_ctr; ++i){
if (trcks.getIndice(i).getMember("lastms").asInt() > lms){
lms = trcks.getIndice(i).getMember("lastms").asInt();
}
}
rep[*it].append(lms);
metaLocker.post();
}else{
rep[*it].append(-1ll);
}
}
}
}else{
rep.append(*it);
}
}
//all done! return is by reference, so no need to return anything here.
}
class totalsData {
public:
totalsData(){
clients = 0;
inputs = 0;
outputs = 0;
downbps = 0;
upbps = 0;
}
void add(unsigned int down, unsigned int up){
void add(unsigned int down, unsigned int up, Controller::sessType sT){
switch (sT){
case Controller::SESS_VIEWER: clients++; break;
case Controller::SESS_INPUT: inputs++; break;
case Controller::SESS_OUTPUT: outputs++; break;
}
clients++;
downbps += down;
upbps += up;
}
long long clients;
long long inputs;
long long outputs;
long long downbps;
long long upbps;
};
/// This takes a "totals" request, and fills in the response data.
///
/// \api
/// `"totals"` requests take the form of:
/// ~~~~~~~~~~~~~~~{.js}
/// {
/// //array of streamnames to accumulate. Empty means all.
/// "streams": ["streama", "streamb", "streamc"],
/// //array of protocols to accumulate. Empty means all.
/// "protocols": ["HLS", "HSS"],
/// //list of requested data fields. Empty means all.
/// "fields": ["clients", "downbps", "upbps"],
/// //unix timestamp of data start. Negative means X seconds ago. Empty means earliest available.
/// "start": 1234567
/// //unix timestamp of data end. Negative means X seconds ago. Empty means latest available (usually 'now').
/// "end": 1234567
/// }
/// ~~~~~~~~~~~~~~~
/// OR
/// ~~~~~~~~~~~~~~~{.js}
/// [
/// {},//request object as above
/// {}//repeat the structure as many times as wanted
/// ]
/// ~~~~~~~~~~~~~~~
/// and are responded to as:
/// ~~~~~~~~~~~~~~~{.js}
/// {
/// //unix timestamp of start of data. Always present, always absolute.
/// "start": 1234567,
/// //unix timestamp of end of data. Always present, always absolute.
/// "end": 1234567,
/// //array of actually represented data fields.
/// "fields": [...]
/// // Time between datapoints. Here: 10 points with each 5 seconds afterwards, followed by 10 points with each 1 second afterwards.
/// "interval": [[10, 5], [10, 1]],
/// //the data for the times as mentioned in the "interval" field, in the order they appear in the "fields" field.
/// "data": [[x, y, z], [x, y, z], [x, y, z]]
/// }
/// ~~~~~~~~~~~~~~~
/// In case of the second method, the response is an array in the same order as the requests.
void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){
tthread::lock_guard<tthread::mutex> guard(statsMutex);
//first, figure out the timestamps wanted
@ -863,6 +949,8 @@ void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){
if (req.isMember("fields") && req["fields"].size()){
jsonForEach(req["fields"], it) {
if ((*it).asStringRef() == "clients"){fields |= STAT_TOT_CLIENTS;}
if ((*it).asStringRef() == "inputs"){fields |= STAT_TOT_INPUTS;}
if ((*it).asStringRef() == "outputs"){fields |= STAT_TOT_OUTPUTS;}
if ((*it).asStringRef() == "downbps"){fields |= STAT_TOT_BPS_DOWN;}
if ((*it).asStringRef() == "upbps"){fields |= STAT_TOT_BPS_UP;}
}
@ -886,6 +974,8 @@ void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){
//output the selected fields
rep["fields"].null();
if (fields & STAT_TOT_CLIENTS){rep["fields"].append("clients");}
if (fields & STAT_TOT_INPUTS){rep["fields"].append("inputs");}
if (fields & STAT_TOT_OUTPUTS){rep["fields"].append("outputs");}
if (fields & STAT_TOT_BPS_DOWN){rep["fields"].append("downbps");}
if (fields & STAT_TOT_BPS_UP){rep["fields"].append("upbps");}
//start data collection
@ -898,7 +988,7 @@ void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){
if ((it->second.getEnd() >= (unsigned long long)reqStart || it->second.getStart() <= (unsigned long long)reqEnd) && (!streams.size() || streams.count(it->first.streamName)) && (!protos.size() || protos.count(it->first.connector))){
for (unsigned long long i = reqStart; i <= reqEnd; ++i){
if (it->second.hasDataFor(i)){
totalsCount[i].add(it->second.getBpsDown(i), it->second.getBpsUp(i));
totalsCount[i].add(it->second.getBpsDown(i), it->second.getBpsUp(i), it->second.getSessType());
}
}
}
@ -923,6 +1013,8 @@ void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){
for (std::map<long long unsigned int, totalsData>::iterator it = totalsCount.begin(); it != totalsCount.end(); it++){
JSON::Value d;
if (fields & STAT_TOT_CLIENTS){d.append(it->second.clients);}
if (fields & STAT_TOT_INPUTS){d.append(it->second.inputs);}
if (fields & STAT_TOT_OUTPUTS){d.append(it->second.outputs);}
if (fields & STAT_TOT_BPS_DOWN){d.append(it->second.downbps);}
if (fields & STAT_TOT_BPS_UP){d.append(it->second.upbps);}
rep["data"].append(d);

View file

@ -71,6 +71,7 @@ namespace Controller {
public:
statSession();
std::map<unsigned long, statStorage> curConns;
sessType getSessType();
void wipeOld(unsigned long long);
void finish(unsigned long index);
void switchOverTo(statSession & newSess, unsigned long index);
@ -78,6 +79,7 @@ namespace Controller {
void ping(const sessIndex & index, unsigned long long disconnectPoint);
unsigned long long getStart();
unsigned long long getEnd();
bool isViewerOn(unsigned long long time);
bool hasDataFor(unsigned long long time);
bool hasData();
long long getConnTime(unsigned long long time);
@ -100,6 +102,7 @@ namespace Controller {
std::set<std::string> getActiveStreams(const std::string & prefix = "");
void parseStatistics(char * data, size_t len, unsigned int id);
void fillClients(JSON::Value & req, JSON::Value & rep);
void fillActive(JSON::Value & req, JSON::Value & rep, bool onlyNow = false);
void fillTotals(JSON::Value & req, JSON::Value & rep);
void SharedMemStats(void * config);
bool hasViewers(std::string streamName);

View file

@ -38,9 +38,39 @@ namespace Controller {
///\param name The name of the stream
///\param data The corresponding configuration values.
void checkStream(std::string name, JSON::Value & data){
if (!data.isMember("name")){data["name"] = name;}
std::string prevState = data["error"].asStringRef();
data["online"] = (std::string)"Checking...";
data.removeMember("error");
switch (Util::getStreamStatus(name)){
case STRMSTAT_OFF:
//Do nothing
break;
case STRMSTAT_INIT:
data["online"] = 2;
data["error"] = "Initializing...";
return;
case STRMSTAT_BOOT:
data["online"] = 2;
data["error"] = "Loading...";
return;
case STRMSTAT_WAIT:
data["online"] = 2;
data["error"] = "Waiting for data...";
return;
case STRMSTAT_READY:
data["online"] = 1;
return;
case STRMSTAT_SHUTDOWN:
data["online"] = 2;
data["error"] = "Shutting down...";
return;
default:
//Unknown state?
data["error"] = "Unrecognized stream state";
break;
}
data["online"] = 0;
std::string URL;
if (data.isMember("channel") && data["channel"].isMember("URL")){
URL = data["channel"]["URL"].asString();
@ -48,44 +78,28 @@ namespace Controller {
if (data.isMember("source")){
URL = data["source"].asString();
}
if (URL == ""){
if (!URL.size()){
data["error"] = "Stream offline: Missing source parameter!";
if (data["error"].asStringRef() != prevState){
Log("STRM", "Error for stream " + name + "! Source parameter missing.");
}
return;
}
if (URL.substr(0, 1) != "/"){
//push-style stream
return;
}
if (URL.substr(0, 1) == "/"){
//vod-style stream
data.removeMember("error");
//non-VoD stream
if (URL.substr(0, 1) != "/"){return;}
//VoD-style stream
struct stat fileinfo;
if (stat(URL.c_str(), &fileinfo) != 0 || S_ISDIR(fileinfo.st_mode)){
data["error"] = "Stream offline: Not found: " + URL;
if (data["error"].asStringRef() != prevState){
Log("BUFF", "Warning for VoD stream " + name + "! File not found: " + URL);
}
data["online"] = 0;
return;
}
if (!hasViewers(name)){
if ( !data.isMember("error")){
data["error"] = "Available";
}
data["online"] = 2;
}else{
data["online"] = 1;
}
return;
}
//not recognized
data["error"] = "Invalid source format";
if (data["error"].asStringRef() != prevState){
Log("STRM", "Invalid source format for stream " + name + "!");
}
return;
}
@ -96,24 +110,6 @@ namespace Controller {
long long int currTime = Util::epoch();
jsonForEach(data, jit) {
checkStream(jit.key(), (*jit));
if (!jit->isMember("name")){
(*jit)["name"] = jit.key();
}
if (!hasViewers(jit.key())){
if (jit->isMember("source") && (*jit)["source"].asString().substr(0, 1) == "/" && jit->isMember("error")
&& (*jit)["error"].asString().substr(0,15) != "Stream offline:"){
(*jit)["online"] = 2;
}else{
if (jit->isMember("error") && (*jit)["error"].asString() == "Available"){
jit->removeMember("error");
}
(*jit)["online"] = 0;
}
}else{
// assume all is fine
jit->removeMember("error");
(*jit)["online"] = 1;
}
}
//check for changes in config or streams