Added ?streamstats=NAME to load balancer

This commit is contained in:
Thulinma 2020-05-14 02:05:40 +02:00
parent 49ee109b50
commit 0b6710e4eb

View file

@ -42,10 +42,12 @@ const char *stateLookup[] ={"Offline", "Starting monitoring",
"Requesting stop", "Requesting clean"};
struct streamDetails{
uint32_t total;
uint64_t total;
uint32_t inputs;
uint32_t bandwidth;
uint32_t prevTotal;
uint64_t prevTotal;
uint64_t bytesUp;
uint64_t bytesDown;
};
class outUrl{
@ -155,7 +157,7 @@ public:
r["score"]["bw"] = (uint64_t)(weight_bw - (((upSpeed + addBandwidth) * weight_bw) / availBandwidth));
}
}
/// Fills out a by reference given JSON::Value with current streams.
/// Fills out a by reference given JSON::Value with current streams viewer count.
void fillStreams(JSON::Value &r){
if (!hostMutex){hostMutex = new tthread::mutex();}
tthread::lock_guard<tthread::mutex> guard(*hostMutex);
@ -164,6 +166,26 @@ public:
r[jt->first] = r[jt->first].asInt() + jt->second.total;
}
}
/// Fills out a by reference given JSON::Value with current stream statistics.
void fillStreamStats(const std::string & s, JSON::Value &r){
if (!hostMutex){hostMutex = new tthread::mutex();}
tthread::lock_guard<tthread::mutex> guard(*hostMutex);
for (std::map<std::string, struct streamDetails>::iterator jt = streams.begin();
jt != streams.end(); ++jt){
const std::string & n = jt->first;
if (s != "*" && n != s && n.substr(0, s.size()+1) != s+"+"){continue;}
if (!r.isMember(n)){
r[n].append(jt->second.total);//viewers
r[n].append(jt->second.bandwidth);//bandwidth usage
r[n].append(jt->second.bytesUp);//total bytes up
r[n].append(jt->second.bytesDown);//total bytes down
}else{
r[n][0u] = r[n][0u].asInt() + jt->second.total;
r[n][2u] = r[n][2u].asInt() + jt->second.bytesUp;
r[n][3u] = r[n][3u].asInt() + jt->second.bytesDown;
}
}
}
/// Returns viewcount for the given stream
long long getViewers(const std::string &strm){
if (!hostMutex){hostMutex = new tthread::mutex();}
@ -304,7 +326,9 @@ public:
struct streamDetails &strm = streams[it.key()];
strm.total = (*it)["curr"][0u].asInt();
strm.inputs = (*it)["curr"][1u].asInt();
uint64_t currTotal = (*it)["bw"][0u].asInt() + (*it)["bw"][1u].asInt();
strm.bytesUp = (*it)["bw"][0u].asInt();
strm.bytesDown = (*it)["bw"][1u].asInt();
uint64_t currTotal = strm.bytesUp + strm.bytesDown;
if (timeDiff && count){
strm.bandwidth = ((currTotal - strm.prevTotal) / timeDiff) / count;
}else{
@ -369,6 +393,7 @@ int handleRequest(Socket::Connection &conn){
}
std::string host = H.GetVar("host");
std::string viewers = H.GetVar("viewers");
std::string streamStats = H.GetVar("streamstats");
std::string stream = H.GetVar("stream");
std::string source = H.GetVar("source");
std::string fback = H.GetVar("fallback");
@ -479,6 +504,18 @@ int handleRequest(Socket::Connection &conn){
H.Clean();
continue;
}
// Request full stream statistics
if (streamStats.size()){
for (HOSTLOOP){
HOSTCHECK;
HOST(i).details->fillStreamStats(streamStats, ret);
}
H.SetBody(ret.toPrettyString());
H.setCORSHeaders();
H.SendResponse("200", "OK", conn);
H.Clean();
continue;
}
if (stream.size()){
uint64_t count = 0;
for (HOSTLOOP){