Implemented new active_streams API, backwards compatible with old API

This commit is contained in:
Thulinma 2021-07-26 22:50:22 +02:00
parent 64ad0ad4a9
commit bf2ce9a422
3 changed files with 155 additions and 14 deletions

View file

@ -1028,10 +1028,10 @@ void Controller::handleAPICommands(JSON::Value &Request, JSON::Value &Response){
}
}
if (Request.isMember("active_streams")){
Controller::fillActive(Request["active_streams"], Response["active_streams"], true);
Controller::fillActive(Request["active_streams"], Response["active_streams"]);
}
if (Request.isMember("stats_streams")){
Controller::fillActive(Request["stats_streams"], Response["stats_streams"]);
Controller::fillHasStats(Request["stats_streams"], Response["stats_streams"]);
}
if (Request.isMember("api_endpoint")){
@ -1067,7 +1067,7 @@ void Controller::handleAPICommands(JSON::Value &Request, JSON::Value &Response){
if (Request.isMember("no_unconfigured_streams")){
JSON::Value emptyRequest;
JSON::Value currStreams;
Controller::fillActive(emptyRequest, currStreams, true);
Controller::fillActive(emptyRequest, currStreams);
jsonForEach(currStreams, strm){
std::string S = strm->asStringRef();
//Remove wildcard, if any

View file

@ -1455,26 +1455,20 @@ void Controller::fillClients(JSON::Value &req, JSON::Value &rep){
///}
/// ~~~~~~~~~~~~~~~
/// All streams that any statistics data is available for are listed, and only those streams.
void Controller::fillActive(JSON::Value &req, JSON::Value &rep, bool onlyNow){
void Controller::fillHasStats(JSON::Value &req, JSON::Value &rep){
// collect the data first
std::set<std::string> streams;
std::map<std::string, uint64_t> clients;
uint64_t tOut = Util::bootSecs() - STATS_DELAY;
uint64_t tIn = Util::bootSecs() - STATS_INPUT_DELAY;
// check all sessions
{
tthread::lock_guard<tthread::mutex> guard(statsMutex);
if (sessions.size()){
for (std::map<sessIndex, statSession>::iterator it = sessions.begin(); it != sessions.end(); it++){
if (it->second.getSessType() == SESS_INPUT){
if (!onlyNow || (it->second.hasDataFor(tIn) && it->second.isViewerOn(tIn))){
streams.insert(it->first.streamName);
}
streams.insert(it->first.streamName);
}else{
if (!onlyNow || (it->second.hasDataFor(tOut) && it->second.isViewerOn(tOut))){
streams.insert(it->first.streamName);
if (it->second.getSessType() == SESS_VIEWER){clients[it->first.streamName]++;}
}
streams.insert(it->first.streamName);
if (it->second.getSessType() == SESS_VIEWER){clients[it->first.streamName]++;}
}
}
}
@ -1507,6 +1501,152 @@ void Controller::fillActive(JSON::Value &req, JSON::Value &rep, bool onlyNow){
// all done! return is by reference, so no need to return anything here.
}
void Controller::fillActive(JSON::Value &req, JSON::Value &rep){
//check what values we wanted to receive
JSON::Value fields;
JSON::Value streams;
bool objMode = false;
bool longForm = false;
if (req.isArray()){
fields = req;
}else if (req.isObject()){
objMode = true;
if (req.isMember("fields") && req["fields"].isArray()){
fields = req["fields"];
}
if (req.isMember("streams") && req["streams"].isArray()){
streams = req["streams"];
}
if (req.isMember("streams") && req["streams"].isString()){
streams.append(req["streams"]);
}
if (req.isMember("stream") && req["stream"].isString()){
streams.append(req["stream"]);
}
if (req.isMember("longform") && req["longform"].asBool()){
longForm = true;
}
if (!fields.size()){
fields.append("status");
fields.append("viewers");
fields.append("inputs");
fields.append("outputs");
fields.append("tracks");
fields.append("views");
fields.append("viewseconds");
fields.append("upbytes");
fields.append("downbytes");
fields.append("packsent");
fields.append("packloss");
fields.append("packretrans");
fields.append("firstms");
fields.append("lastms");
//fields.append("zerounix");
fields.append("health");
}
}
// collect the data first
rep.null();
if (objMode && !longForm){
rep["fields"] = fields;
}
DTSC::Meta M;
{
tthread::lock_guard<tthread::mutex> guard(statsMutex);
for (std::map<std::string, struct streamTotals>::iterator it = streamStats.begin(); it != streamStats.end(); ++it){
//If specific streams were requested, match and skip non-matching
if (streams.size()){
bool match = false;
jsonForEachConst(streams, s){
if (!s->isString()){continue;}
if (s->asStringRef() == it->first || (*(s->asStringRef().rbegin()) == '+' && it->first.substr(0, s->asStringRef().size()) == s->asStringRef())){
match = true;
break;
}
}
if (!match){continue;}
}
if (!fields.size()){
rep.append(it->first);
continue;
}
JSON::Value & S = (objMode && !longForm) ? (rep["data"][it->first]) : (rep[it->first]);
S.null();
jsonForEachConst(fields, j){
JSON::Value & F = longForm ? (S[j->asStringRef()]) : (S.append());
if (j->asStringRef() == "clients"){
F = it->second.currViews+it->second.currIns+it->second.currOuts;
}else if (j->asStringRef() == "viewers"){
F = it->second.currViews;
}else if (j->asStringRef() == "inputs"){
F = it->second.currIns;
}else if (j->asStringRef() == "outputs"){
F = it->second.currOuts;
}else if (j->asStringRef() == "views"){
F = it->second.viewers;
}else if (j->asStringRef() == "viewseconds"){
F = it->second.viewSeconds;
}else if (j->asStringRef() == "upbytes"){
F = it->second.upBytes;
}else if (j->asStringRef() == "downbytes"){
F = it->second.downBytes;
}else if (j->asStringRef() == "packsent"){
F = it->second.packSent;
}else if (j->asStringRef() == "packloss"){
F = it->second.packLoss;
}else if (j->asStringRef() == "packretrans"){
F = it->second.packRetrans;
}else if (j->asStringRef() == "firstms"){
if (!M || M.getStreamName() != it->first){M.reInit(it->first, false);}
if (M){
uint64_t fms = 0;
std::set<size_t> validTracks = M.getValidTracks();
for (std::set<size_t>::iterator jt = validTracks.begin(); jt != validTracks.end(); jt++){
if (M.getFirstms(*jt) < fms){fms = M.getFirstms(*jt);}
}
F = fms;
}
}else if (j->asStringRef() == "lastms"){
if (!M || M.getStreamName() != it->first){M.reInit(it->first, false);}
if (M){
uint64_t lms = 0;
std::set<size_t> validTracks = M.getValidTracks();
for (std::set<size_t>::iterator jt = validTracks.begin(); jt != validTracks.end(); jt++){
if (M.getLastms(*jt) > lms){lms = M.getLastms(*jt);}
}
F = lms;
}
}else if (j->asStringRef() == "zerounix"){
if (!M || M.getStreamName() != it->first){M.reInit(it->first, false);}
if (M && M.getLive()){
F = (M.getBootMsOffset() + (Util::unixMS() - Util::bootMS())) / 1000;
}
}else if (j->asStringRef() == "health"){
if (!M || M.getStreamName() != it->first){M.reInit(it->first, false);}
if (M){M.getHealthJSON(F);}
}else if (j->asStringRef() == "tracks"){
if (!M || M.getStreamName() != it->first){M.reInit(it->first, false);}
if (M){
F = M.getValidTracks().size();
}
}else if (j->asStringRef() == "status"){
uint8_t ss = Util::getStreamStatus(it->first);
switch (ss){
case STRMSTAT_OFF: F = "Offline"; break;
case STRMSTAT_INIT: F = "Initializing"; break;
case STRMSTAT_BOOT: F = "Input booting"; break;
case STRMSTAT_WAIT: F = "Waiting for data"; break;
case STRMSTAT_READY: F = "Online"; break;
case STRMSTAT_SHUTDOWN: F = "Shutting down"; break;
default: F = "Invalid / Unknown"; break;
}
}
}
}
}
// all done! return is by reference, so no need to return anything here.
}
class totalsData{
public:
totalsData(){

View file

@ -143,7 +143,8 @@ namespace Controller{
std::set<std::string> getActiveStreams(const std::string &prefix = "");
void killStatistics(char *data, size_t len, unsigned int id);
void fillClients(JSON::Value &req, JSON::Value &rep);
void fillActive(JSON::Value &req, JSON::Value &rep, bool onlyNow = false);
void fillActive(JSON::Value &req, JSON::Value &rep);
void fillHasStats(JSON::Value &req, JSON::Value &rep);
void fillTotals(JSON::Value &req, JSON::Value &rep);
void SharedMemStats(void *config);
void sessions_invalidate(const std::string &streamname);