diff --git a/src/controller/controller_api.cpp b/src/controller/controller_api.cpp index a96f193e..f7e15622 100644 --- a/src/controller/controller_api.cpp +++ b/src/controller/controller_api.cpp @@ -1028,10 +1028,10 @@ void Controller::handleAPICommands(JSON::Value &Request, JSON::Value &Response){ } } if (Request.isMember("active_streams")){ - Controller::fillActive(Request["active_streams"], Response["active_streams"], true); + Controller::fillActive(Request["active_streams"], Response["active_streams"]); } if (Request.isMember("stats_streams")){ - Controller::fillActive(Request["stats_streams"], Response["stats_streams"]); + Controller::fillHasStats(Request["stats_streams"], Response["stats_streams"]); } if (Request.isMember("api_endpoint")){ @@ -1067,7 +1067,7 @@ void Controller::handleAPICommands(JSON::Value &Request, JSON::Value &Response){ if (Request.isMember("no_unconfigured_streams")){ JSON::Value emptyRequest; JSON::Value currStreams; - Controller::fillActive(emptyRequest, currStreams, true); + Controller::fillActive(emptyRequest, currStreams); jsonForEach(currStreams, strm){ std::string S = strm->asStringRef(); //Remove wildcard, if any diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index 859bd44d..0ab6fcf6 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -1455,26 +1455,20 @@ void Controller::fillClients(JSON::Value &req, JSON::Value &rep){ ///} /// ~~~~~~~~~~~~~~~ /// All streams that any statistics data is available for are listed, and only those streams. -void Controller::fillActive(JSON::Value &req, JSON::Value &rep, bool onlyNow){ +void Controller::fillHasStats(JSON::Value &req, JSON::Value &rep){ // collect the data first std::set streams; std::map clients; - uint64_t tOut = Util::bootSecs() - STATS_DELAY; - uint64_t tIn = Util::bootSecs() - STATS_INPUT_DELAY; // check all sessions { tthread::lock_guard guard(statsMutex); if (sessions.size()){ for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ if (it->second.getSessType() == SESS_INPUT){ - if (!onlyNow || (it->second.hasDataFor(tIn) && it->second.isViewerOn(tIn))){ - streams.insert(it->first.streamName); - } + streams.insert(it->first.streamName); }else{ - if (!onlyNow || (it->second.hasDataFor(tOut) && it->second.isViewerOn(tOut))){ - streams.insert(it->first.streamName); - if (it->second.getSessType() == SESS_VIEWER){clients[it->first.streamName]++;} - } + streams.insert(it->first.streamName); + if (it->second.getSessType() == SESS_VIEWER){clients[it->first.streamName]++;} } } } @@ -1507,6 +1501,152 @@ void Controller::fillActive(JSON::Value &req, JSON::Value &rep, bool onlyNow){ // all done! return is by reference, so no need to return anything here. } +void Controller::fillActive(JSON::Value &req, JSON::Value &rep){ + //check what values we wanted to receive + JSON::Value fields; + JSON::Value streams; + bool objMode = false; + bool longForm = false; + if (req.isArray()){ + fields = req; + }else if (req.isObject()){ + objMode = true; + if (req.isMember("fields") && req["fields"].isArray()){ + fields = req["fields"]; + } + if (req.isMember("streams") && req["streams"].isArray()){ + streams = req["streams"]; + } + if (req.isMember("streams") && req["streams"].isString()){ + streams.append(req["streams"]); + } + if (req.isMember("stream") && req["stream"].isString()){ + streams.append(req["stream"]); + } + if (req.isMember("longform") && req["longform"].asBool()){ + longForm = true; + } + if (!fields.size()){ + fields.append("status"); + fields.append("viewers"); + fields.append("inputs"); + fields.append("outputs"); + fields.append("tracks"); + fields.append("views"); + fields.append("viewseconds"); + fields.append("upbytes"); + fields.append("downbytes"); + fields.append("packsent"); + fields.append("packloss"); + fields.append("packretrans"); + fields.append("firstms"); + fields.append("lastms"); + //fields.append("zerounix"); + fields.append("health"); + } + } + // collect the data first + rep.null(); + if (objMode && !longForm){ + rep["fields"] = fields; + } + DTSC::Meta M; + { + tthread::lock_guard guard(statsMutex); + for (std::map::iterator it = streamStats.begin(); it != streamStats.end(); ++it){ + //If specific streams were requested, match and skip non-matching + if (streams.size()){ + bool match = false; + jsonForEachConst(streams, s){ + if (!s->isString()){continue;} + if (s->asStringRef() == it->first || (*(s->asStringRef().rbegin()) == '+' && it->first.substr(0, s->asStringRef().size()) == s->asStringRef())){ + match = true; + break; + } + } + if (!match){continue;} + } + if (!fields.size()){ + rep.append(it->first); + continue; + } + JSON::Value & S = (objMode && !longForm) ? (rep["data"][it->first]) : (rep[it->first]); + S.null(); + jsonForEachConst(fields, j){ + JSON::Value & F = longForm ? (S[j->asStringRef()]) : (S.append()); + if (j->asStringRef() == "clients"){ + F = it->second.currViews+it->second.currIns+it->second.currOuts; + }else if (j->asStringRef() == "viewers"){ + F = it->second.currViews; + }else if (j->asStringRef() == "inputs"){ + F = it->second.currIns; + }else if (j->asStringRef() == "outputs"){ + F = it->second.currOuts; + }else if (j->asStringRef() == "views"){ + F = it->second.viewers; + }else if (j->asStringRef() == "viewseconds"){ + F = it->second.viewSeconds; + }else if (j->asStringRef() == "upbytes"){ + F = it->second.upBytes; + }else if (j->asStringRef() == "downbytes"){ + F = it->second.downBytes; + }else if (j->asStringRef() == "packsent"){ + F = it->second.packSent; + }else if (j->asStringRef() == "packloss"){ + F = it->second.packLoss; + }else if (j->asStringRef() == "packretrans"){ + F = it->second.packRetrans; + }else if (j->asStringRef() == "firstms"){ + if (!M || M.getStreamName() != it->first){M.reInit(it->first, false);} + if (M){ + uint64_t fms = 0; + std::set validTracks = M.getValidTracks(); + for (std::set::iterator jt = validTracks.begin(); jt != validTracks.end(); jt++){ + if (M.getFirstms(*jt) < fms){fms = M.getFirstms(*jt);} + } + F = fms; + } + }else if (j->asStringRef() == "lastms"){ + if (!M || M.getStreamName() != it->first){M.reInit(it->first, false);} + if (M){ + uint64_t lms = 0; + std::set validTracks = M.getValidTracks(); + for (std::set::iterator jt = validTracks.begin(); jt != validTracks.end(); jt++){ + if (M.getLastms(*jt) > lms){lms = M.getLastms(*jt);} + } + F = lms; + } + }else if (j->asStringRef() == "zerounix"){ + if (!M || M.getStreamName() != it->first){M.reInit(it->first, false);} + if (M && M.getLive()){ + F = (M.getBootMsOffset() + (Util::unixMS() - Util::bootMS())) / 1000; + } + }else if (j->asStringRef() == "health"){ + if (!M || M.getStreamName() != it->first){M.reInit(it->first, false);} + if (M){M.getHealthJSON(F);} + }else if (j->asStringRef() == "tracks"){ + if (!M || M.getStreamName() != it->first){M.reInit(it->first, false);} + if (M){ + F = M.getValidTracks().size(); + } + }else if (j->asStringRef() == "status"){ + uint8_t ss = Util::getStreamStatus(it->first); + switch (ss){ + case STRMSTAT_OFF: F = "Offline"; break; + case STRMSTAT_INIT: F = "Initializing"; break; + case STRMSTAT_BOOT: F = "Input booting"; break; + case STRMSTAT_WAIT: F = "Waiting for data"; break; + case STRMSTAT_READY: F = "Online"; break; + case STRMSTAT_SHUTDOWN: F = "Shutting down"; break; + default: F = "Invalid / Unknown"; break; + } + } + } + } + } + // all done! return is by reference, so no need to return anything here. +} + class totalsData{ public: totalsData(){ diff --git a/src/controller/controller_statistics.h b/src/controller/controller_statistics.h index 0af4e1b6..1cc4a82d 100644 --- a/src/controller/controller_statistics.h +++ b/src/controller/controller_statistics.h @@ -143,7 +143,8 @@ namespace Controller{ std::set getActiveStreams(const std::string &prefix = ""); void killStatistics(char *data, size_t len, unsigned int id); void fillClients(JSON::Value &req, JSON::Value &rep); - void fillActive(JSON::Value &req, JSON::Value &rep, bool onlyNow = false); + void fillActive(JSON::Value &req, JSON::Value &rep); + void fillHasStats(JSON::Value &req, JSON::Value &rep); void fillTotals(JSON::Value &req, JSON::Value &rep); void SharedMemStats(void *config); void sessions_invalidate(const std::string &streamname);