diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index d85f3ca5..30d60ebe 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -33,6 +33,8 @@ #define STAT_TOT_CLIENTS 1 #define STAT_TOT_BPS_DOWN 2 #define STAT_TOT_BPS_UP 4 +#define STAT_TOT_INPUTS 8 +#define STAT_TOT_OUTPUTS 16 #define STAT_TOT_ALL 0xFF #define COUNTABLE_BYTES 128*1024 @@ -1333,60 +1335,29 @@ class totalsData { public: totalsData(){ clients = 0; + inputs = 0; + outputs = 0; downbps = 0; upbps = 0; } - void add(unsigned int down, unsigned int up){ + void add(unsigned int down, unsigned int up, Controller::sessType sT){ + switch (sT){ + case Controller::SESS_VIEWER: clients++; break; + case Controller::SESS_INPUT: inputs++; break; + case Controller::SESS_OUTPUT: outputs++; break; + } clients++; downbps += down; upbps += up; } long long clients; + long long inputs; + long long outputs; long long downbps; long long upbps; }; /// This takes a "totals" request, and fills in the response data. -/// -/// \api -/// `"totals"` requests take the form of: -/// ~~~~~~~~~~~~~~~{.js} -/// { -/// //array of streamnames to accumulate. Empty means all. -/// "streams": ["streama", "streamb", "streamc"], -/// //array of protocols to accumulate. Empty means all. -/// "protocols": ["HLS", "HSS"], -/// //list of requested data fields. Empty means all. -/// "fields": ["clients", "downbps", "upbps"], -/// //unix timestamp of data start. Negative means X seconds ago. Empty means earliest available. -/// "start": 1234567 -/// //unix timestamp of data end. Negative means X seconds ago. Empty means latest available (usually 'now'). -/// "end": 1234567 -/// } -/// ~~~~~~~~~~~~~~~ -/// OR -/// ~~~~~~~~~~~~~~~{.js} -/// [ -/// {},//request object as above -/// {}//repeat the structure as many times as wanted -/// ] -/// ~~~~~~~~~~~~~~~ -/// and are responded to as: -/// ~~~~~~~~~~~~~~~{.js} -/// { -/// //unix timestamp of start of data. Always present, always absolute. -/// "start": 1234567, -/// //unix timestamp of end of data. Always present, always absolute. -/// "end": 1234567, -/// //array of actually represented data fields. -/// "fields": [...] -/// // Time between datapoints. Here: 10 points with each 5 seconds afterwards, followed by 10 points with each 1 second afterwards. -/// "interval": [[10, 5], [10, 1]], -/// //the data for the times as mentioned in the "interval" field, in the order they appear in the "fields" field. -/// "data": [[x, y, z], [x, y, z], [x, y, z]] -/// } -/// ~~~~~~~~~~~~~~~ -/// In case of the second method, the response is an array in the same order as the requests. void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){ tthread::lock_guard guard(statsMutex); //first, figure out the timestamps wanted @@ -1415,6 +1386,8 @@ void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){ if (req.isMember("fields") && req["fields"].size()){ jsonForEach(req["fields"], it) { if ((*it).asStringRef() == "clients"){fields |= STAT_TOT_CLIENTS;} + if ((*it).asStringRef() == "inputs"){fields |= STAT_TOT_INPUTS;} + if ((*it).asStringRef() == "outputs"){fields |= STAT_TOT_OUTPUTS;} if ((*it).asStringRef() == "downbps"){fields |= STAT_TOT_BPS_DOWN;} if ((*it).asStringRef() == "upbps"){fields |= STAT_TOT_BPS_UP;} } @@ -1438,6 +1411,8 @@ void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){ //output the selected fields rep["fields"].null(); if (fields & STAT_TOT_CLIENTS){rep["fields"].append("clients");} + if (fields & STAT_TOT_INPUTS){rep["fields"].append("inputs");} + if (fields & STAT_TOT_OUTPUTS){rep["fields"].append("outputs");} if (fields & STAT_TOT_BPS_DOWN){rep["fields"].append("downbps");} if (fields & STAT_TOT_BPS_UP){rep["fields"].append("upbps");} //start data collection @@ -1450,7 +1425,7 @@ void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){ if ((it->second.getEnd() >= (unsigned long long)reqStart || it->second.getStart() <= (unsigned long long)reqEnd) && (!streams.size() || streams.count(it->first.streamName)) && (!protos.size() || protos.count(it->first.connector))){ for (unsigned long long i = reqStart; i <= reqEnd; ++i){ if (it->second.hasDataFor(i)){ - totalsCount[i].add(it->second.getBpsDown(i), it->second.getBpsUp(i)); + totalsCount[i].add(it->second.getBpsDown(i), it->second.getBpsUp(i), it->second.getSessType()); } } } @@ -1475,6 +1450,8 @@ void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){ for (std::map::iterator it = totalsCount.begin(); it != totalsCount.end(); it++){ JSON::Value d; if (fields & STAT_TOT_CLIENTS){d.append(it->second.clients);} + if (fields & STAT_TOT_INPUTS){d.append(it->second.inputs);} + if (fields & STAT_TOT_OUTPUTS){d.append(it->second.outputs);} if (fields & STAT_TOT_BPS_DOWN){d.append(it->second.downbps);} if (fields & STAT_TOT_BPS_UP){d.append(it->second.upbps);} rep["data"].append(d); diff --git a/src/controller/controller_statistics.h b/src/controller/controller_statistics.h index e54e08e0..726051b9 100644 --- a/src/controller/controller_statistics.h +++ b/src/controller/controller_statistics.h @@ -18,8 +18,6 @@ namespace Controller { extern bool killOnExit; extern unsigned int maxConnsPerIP; - //These keep track of which streams are currently active. - extern std::map activeStreams; ///This function is ran whenever a stream becomes active. void streamStarted(std::string stream); ///This function is ran whenever a stream becomes inactive. diff --git a/src/controller/controller_streams.cpp b/src/controller/controller_streams.cpp index a0498cf3..3900e0ad 100644 --- a/src/controller/controller_streams.cpp +++ b/src/controller/controller_streams.cpp @@ -41,9 +41,39 @@ namespace Controller { ///\param name The name of the stream ///\param data The corresponding configuration values. void checkStream(std::string name, JSON::Value & data){ + if (!data.isMember("name")){data["name"] = name;} std::string prevState = data["error"].asStringRef(); data["online"] = (std::string)"Checking..."; data.removeMember("error"); + switch (Util::getStreamStatus(name)){ + case STRMSTAT_OFF: + //Do nothing + break; + case STRMSTAT_INIT: + data["online"] = 2; + data["error"] = "Initializing..."; + return; + case STRMSTAT_BOOT: + data["online"] = 2; + data["error"] = "Loading..."; + return; + case STRMSTAT_WAIT: + data["online"] = 2; + data["error"] = "Waiting for data..."; + return; + case STRMSTAT_READY: + data["online"] = 1; + return; + case STRMSTAT_SHUTDOWN: + data["online"] = 2; + data["error"] = "Shutting down..."; + return; + default: + //Unknown state? + data["error"] = "Unrecognized stream state"; + break; + } + data["online"] = 0; std::string URL; if (data.isMember("channel") && data["channel"].isMember("URL")){ URL = data["channel"]["URL"].asString(); @@ -51,76 +81,56 @@ namespace Controller { if (data.isMember("source")){ URL = data["source"].asString(); } - if (URL == ""){ + if (!URL.size()){ data["error"] = "Stream offline: Missing source parameter!"; if (data["error"].asStringRef() != prevState){ Log("STRM", "Error for stream " + name + "! Source parameter missing."); } return; } - if (URL.substr(0, 1) != "/"){ - //non-file stream - //Old style always on - if (data.isMember("udpport") && data["udpport"].asStringRef().size() && (!inputProcesses.count(name) || !Util::Procs::isRunning(inputProcesses[name]))){ - const std::string & udpPort = data["udpport"].asStringRef(); - const std::string & multicast = data["multicastinterface"].asStringRef(); - URL = "tsudp://"+udpPort; - if (multicast.size()){ - URL.append("/"+multicast); - } - // False: start TS input - INFO_MSG("No TS input for stream %s, starting it: %s", name.c_str(), URL.c_str()); - std::deque command; - command.push_back(Util::getMyPath() + "MistInTS"); - command.push_back("-s"); - command.push_back(name); - command.push_back(URL); - int stdIn = 0; - int stdOut = 1; - int stdErr = 2; - pid_t program = Util::Procs::StartPiped(command, &stdIn, &stdOut, &stdErr); - if (program){ - inputProcesses[name] = program; - } - return; + //Old style always on + if (data.isMember("udpport") && data["udpport"].asStringRef().size() && (!inputProcesses.count(name) || !Util::Procs::isRunning(inputProcesses[name]))){ + const std::string & udpPort = data["udpport"].asStringRef(); + const std::string & multicast = data["multicastinterface"].asStringRef(); + URL = "tsudp://"+udpPort; + if (multicast.size()){ + URL.append("/"+multicast); } - //new style always on - if (data.isMember("always_on") && Util::getStreamStatus(name) == STRMSTAT_OFF){ - INFO_MSG("Starting always-on input %s: %s", name.c_str(), URL.c_str()); - Util::startInput(name, URL); - return; + // False: start TS input + INFO_MSG("No TS input for stream %s, starting it: %s", name.c_str(), URL.c_str()); + std::deque command; + command.push_back(Util::getMyPath() + "MistInTS"); + command.push_back("-s"); + command.push_back(name); + command.push_back(URL); + int stdIn = 0; + int stdOut = 1; + int stdErr = 2; + pid_t program = Util::Procs::StartPiped(command, &stdIn, &stdOut, &stdErr); + if (program){ + inputProcesses[name] = program; } - //non-automatics simply return - return; } - if (URL.substr(0, 1) == "/"){ - //vod-style stream - data.removeMember("error"); - struct stat fileinfo; + //new style always on + if (data.isMember("always_on")){ + INFO_MSG("Starting always-on input %s: %s", name.c_str(), URL.c_str()); + Util::startInput(name, URL); + } + //non-VoD stream + if (URL.substr(0, 1) != "/"){return;} + //VoD-style stream + struct stat fileinfo; if (stat(URL.c_str(), &fileinfo) != 0){ - data["error"] = "Stream offline: Not found: " + URL; - if (data["error"].asStringRef() != prevState){ - Log("BUFF", "Warning for VoD stream " + name + "! File not found: " + URL); - } - data["online"] = 0; - return; + data["error"] = "Stream offline: Not found: " + URL; + if (data["error"].asStringRef() != prevState){ + Log("BUFF", "Warning for VoD stream " + name + "! File not found: " + URL); } - if (!hasViewers(name)){ - if ( !data.isMember("error")){ - data["error"] = "Available"; - } - data["online"] = 2; - }else{ - data["online"] = 1; - } - checkServerLimits(); /*LTS*/ return; } - //not recognized - data["error"] = "Invalid source format"; - if (data["error"].asStringRef() != prevState){ - Log("STRM", "Invalid source format for stream " + name + "!"); + if ( !data.isMember("error")){ + data["error"] = "Available"; } + data["online"] = 2; return; } @@ -131,25 +141,6 @@ namespace Controller { long long int currTime = Util::epoch(); jsonForEach(data, jit) { checkStream(jit.key(), (*jit)); - if (!jit->isMember("name")){ - (*jit)["name"] = jit.key(); - } - if (!hasViewers(jit.key())){ - if (jit->isMember("source") && (*jit)["source"].asString().substr(0, 1) == "/" && jit->isMember("error") - && (*jit)["error"].asString().substr(0,15) != "Stream offline:"){ - (*jit)["online"] = 2; - }else{ - if (jit->isMember("error") && (*jit)["error"].asString() == "Available"){ - jit->removeMember("error"); - } - (*jit)["online"] = 0; - } - checkServerLimits(); /*LTS*/ - }else{ - // assume all is fine - jit->removeMember("error"); - (*jit)["online"] = 1; - } } //check for changes in config or streams