Merge branch 'newStatus' into Pro_newStatus

# Conflicts:
#	src/controller/controller_api.cpp
#	src/controller/controller_statistics.cpp
#	src/controller/controller_statistics.h
#	src/controller/controller_streams.cpp
This commit is contained in:
Thulinma 2018-03-21 14:50:04 +01:00
commit 62ae2929a3
3 changed files with 85 additions and 119 deletions

View file

@ -33,6 +33,8 @@
#define STAT_TOT_CLIENTS 1
#define STAT_TOT_BPS_DOWN 2
#define STAT_TOT_BPS_UP 4
#define STAT_TOT_INPUTS 8
#define STAT_TOT_OUTPUTS 16
#define STAT_TOT_ALL 0xFF
#define COUNTABLE_BYTES 128*1024
@ -1333,60 +1335,29 @@ class totalsData {
public:
totalsData(){
clients = 0;
inputs = 0;
outputs = 0;
downbps = 0;
upbps = 0;
}
void add(unsigned int down, unsigned int up){
void add(unsigned int down, unsigned int up, Controller::sessType sT){
switch (sT){
case Controller::SESS_VIEWER: clients++; break;
case Controller::SESS_INPUT: inputs++; break;
case Controller::SESS_OUTPUT: outputs++; break;
}
clients++;
downbps += down;
upbps += up;
}
long long clients;
long long inputs;
long long outputs;
long long downbps;
long long upbps;
};
/// This takes a "totals" request, and fills in the response data.
///
/// \api
/// `"totals"` requests take the form of:
/// ~~~~~~~~~~~~~~~{.js}
/// {
/// //array of streamnames to accumulate. Empty means all.
/// "streams": ["streama", "streamb", "streamc"],
/// //array of protocols to accumulate. Empty means all.
/// "protocols": ["HLS", "HSS"],
/// //list of requested data fields. Empty means all.
/// "fields": ["clients", "downbps", "upbps"],
/// //unix timestamp of data start. Negative means X seconds ago. Empty means earliest available.
/// "start": 1234567
/// //unix timestamp of data end. Negative means X seconds ago. Empty means latest available (usually 'now').
/// "end": 1234567
/// }
/// ~~~~~~~~~~~~~~~
/// OR
/// ~~~~~~~~~~~~~~~{.js}
/// [
/// {},//request object as above
/// {}//repeat the structure as many times as wanted
/// ]
/// ~~~~~~~~~~~~~~~
/// and are responded to as:
/// ~~~~~~~~~~~~~~~{.js}
/// {
/// //unix timestamp of start of data. Always present, always absolute.
/// "start": 1234567,
/// //unix timestamp of end of data. Always present, always absolute.
/// "end": 1234567,
/// //array of actually represented data fields.
/// "fields": [...]
/// // Time between datapoints. Here: 10 points with each 5 seconds afterwards, followed by 10 points with each 1 second afterwards.
/// "interval": [[10, 5], [10, 1]],
/// //the data for the times as mentioned in the "interval" field, in the order they appear in the "fields" field.
/// "data": [[x, y, z], [x, y, z], [x, y, z]]
/// }
/// ~~~~~~~~~~~~~~~
/// In case of the second method, the response is an array in the same order as the requests.
void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){
tthread::lock_guard<tthread::mutex> guard(statsMutex);
//first, figure out the timestamps wanted
@ -1415,6 +1386,8 @@ void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){
if (req.isMember("fields") && req["fields"].size()){
jsonForEach(req["fields"], it) {
if ((*it).asStringRef() == "clients"){fields |= STAT_TOT_CLIENTS;}
if ((*it).asStringRef() == "inputs"){fields |= STAT_TOT_INPUTS;}
if ((*it).asStringRef() == "outputs"){fields |= STAT_TOT_OUTPUTS;}
if ((*it).asStringRef() == "downbps"){fields |= STAT_TOT_BPS_DOWN;}
if ((*it).asStringRef() == "upbps"){fields |= STAT_TOT_BPS_UP;}
}
@ -1438,6 +1411,8 @@ void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){
//output the selected fields
rep["fields"].null();
if (fields & STAT_TOT_CLIENTS){rep["fields"].append("clients");}
if (fields & STAT_TOT_INPUTS){rep["fields"].append("inputs");}
if (fields & STAT_TOT_OUTPUTS){rep["fields"].append("outputs");}
if (fields & STAT_TOT_BPS_DOWN){rep["fields"].append("downbps");}
if (fields & STAT_TOT_BPS_UP){rep["fields"].append("upbps");}
//start data collection
@ -1450,7 +1425,7 @@ void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){
if ((it->second.getEnd() >= (unsigned long long)reqStart || it->second.getStart() <= (unsigned long long)reqEnd) && (!streams.size() || streams.count(it->first.streamName)) && (!protos.size() || protos.count(it->first.connector))){
for (unsigned long long i = reqStart; i <= reqEnd; ++i){
if (it->second.hasDataFor(i)){
totalsCount[i].add(it->second.getBpsDown(i), it->second.getBpsUp(i));
totalsCount[i].add(it->second.getBpsDown(i), it->second.getBpsUp(i), it->second.getSessType());
}
}
}
@ -1475,6 +1450,8 @@ void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){
for (std::map<long long unsigned int, totalsData>::iterator it = totalsCount.begin(); it != totalsCount.end(); it++){
JSON::Value d;
if (fields & STAT_TOT_CLIENTS){d.append(it->second.clients);}
if (fields & STAT_TOT_INPUTS){d.append(it->second.inputs);}
if (fields & STAT_TOT_OUTPUTS){d.append(it->second.outputs);}
if (fields & STAT_TOT_BPS_DOWN){d.append(it->second.downbps);}
if (fields & STAT_TOT_BPS_UP){d.append(it->second.upbps);}
rep["data"].append(d);

View file

@ -18,8 +18,6 @@ namespace Controller {
extern bool killOnExit;
extern unsigned int maxConnsPerIP;
//These keep track of which streams are currently active.
extern std::map<std::string, uint8_t> activeStreams;
///This function is ran whenever a stream becomes active.
void streamStarted(std::string stream);
///This function is ran whenever a stream becomes inactive.

View file

@ -41,9 +41,39 @@ namespace Controller {
///\param name The name of the stream
///\param data The corresponding configuration values.
void checkStream(std::string name, JSON::Value & data){
if (!data.isMember("name")){data["name"] = name;}
std::string prevState = data["error"].asStringRef();
data["online"] = (std::string)"Checking...";
data.removeMember("error");
switch (Util::getStreamStatus(name)){
case STRMSTAT_OFF:
//Do nothing
break;
case STRMSTAT_INIT:
data["online"] = 2;
data["error"] = "Initializing...";
return;
case STRMSTAT_BOOT:
data["online"] = 2;
data["error"] = "Loading...";
return;
case STRMSTAT_WAIT:
data["online"] = 2;
data["error"] = "Waiting for data...";
return;
case STRMSTAT_READY:
data["online"] = 1;
return;
case STRMSTAT_SHUTDOWN:
data["online"] = 2;
data["error"] = "Shutting down...";
return;
default:
//Unknown state?
data["error"] = "Unrecognized stream state";
break;
}
data["online"] = 0;
std::string URL;
if (data.isMember("channel") && data["channel"].isMember("URL")){
URL = data["channel"]["URL"].asString();
@ -51,15 +81,13 @@ namespace Controller {
if (data.isMember("source")){
URL = data["source"].asString();
}
if (URL == ""){
if (!URL.size()){
data["error"] = "Stream offline: Missing source parameter!";
if (data["error"].asStringRef() != prevState){
Log("STRM", "Error for stream " + name + "! Source parameter missing.");
}
return;
}
if (URL.substr(0, 1) != "/"){
//non-file stream
//Old style always on
if (data.isMember("udpport") && data["udpport"].asStringRef().size() && (!inputProcesses.count(name) || !Util::Procs::isRunning(inputProcesses[name]))){
const std::string & udpPort = data["udpport"].asStringRef();
@ -82,45 +110,27 @@ namespace Controller {
if (program){
inputProcesses[name] = program;
}
return;
}
//new style always on
if (data.isMember("always_on") && Util::getStreamStatus(name) == STRMSTAT_OFF){
if (data.isMember("always_on")){
INFO_MSG("Starting always-on input %s: %s", name.c_str(), URL.c_str());
Util::startInput(name, URL);
return;
}
//non-automatics simply return
return;
}
if (URL.substr(0, 1) == "/"){
//vod-style stream
data.removeMember("error");
//non-VoD stream
if (URL.substr(0, 1) != "/"){return;}
//VoD-style stream
struct stat fileinfo;
if (stat(URL.c_str(), &fileinfo) != 0){
data["error"] = "Stream offline: Not found: " + URL;
if (data["error"].asStringRef() != prevState){
Log("BUFF", "Warning for VoD stream " + name + "! File not found: " + URL);
}
data["online"] = 0;
return;
}
if (!hasViewers(name)){
if ( !data.isMember("error")){
data["error"] = "Available";
}
data["online"] = 2;
}else{
data["online"] = 1;
}
checkServerLimits(); /*LTS*/
return;
}
//not recognized
data["error"] = "Invalid source format";
if (data["error"].asStringRef() != prevState){
Log("STRM", "Invalid source format for stream " + name + "!");
}
return;
}
@ -131,25 +141,6 @@ namespace Controller {
long long int currTime = Util::epoch();
jsonForEach(data, jit) {
checkStream(jit.key(), (*jit));
if (!jit->isMember("name")){
(*jit)["name"] = jit.key();
}
if (!hasViewers(jit.key())){
if (jit->isMember("source") && (*jit)["source"].asString().substr(0, 1) == "/" && jit->isMember("error")
&& (*jit)["error"].asString().substr(0,15) != "Stream offline:"){
(*jit)["online"] = 2;
}else{
if (jit->isMember("error") && (*jit)["error"].asString() == "Available"){
jit->removeMember("error");
}
(*jit)["online"] = 0;
}
checkServerLimits(); /*LTS*/
}else{
// assume all is fine
jit->removeMember("error");
(*jit)["online"] = 1;
}
}
//check for changes in config or streams