From 55046206fed6b905b7ab803fe7f0df9987820dd2 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Fri, 5 Dec 2014 21:31:45 +0100 Subject: [PATCH] Fixed various statistics bugs. --- src/controller/controller_statistics.cpp | 106 +++++++++++++++++++++-- src/controller/controller_statistics.h | 5 ++ src/output/output.cpp | 2 +- 3 files changed, 104 insertions(+), 9 deletions(-) diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index 9b62329b..b941aeda 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -23,6 +23,7 @@ std::map Controller::sessions; ///< list of sessions that have statistics data available std::map Controller::connToSession; ///< Map of socket IDs to session info. +tthread::mutex Controller::statsMutex; Controller::sessIndex::sessIndex(std::string dhost, unsigned int dcrc, std::string dstreamName, std::string dconnector){ host = dhost; @@ -86,10 +87,18 @@ void Controller::SharedMemStats(void * config){ DEBUG_MSG(DLVL_HIGH, "Starting stats thread"); IPC::sharedServer statServer("statistics", STAT_EX_SIZE, true); while(((Util::Config*)config)->is_active){ - //parse current users - statServer.parseEach(parseStatistics); - //wipe old statistics - /// \todo Loop over all sessions and trigger erase function + { + tthread::lock_guard guard(statsMutex); + //parse current users + statServer.parseEach(parseStatistics); + //wipe old statistics + if (sessions.size()){ + unsigned long long cutOffPoint = Util::epoch() - STAT_CUTOFF; + for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ + it->second.wipeOld(cutOffPoint); + } + } + } Util::sleep(1000); } DEBUG_MSG(DLVL_HIGH, "Stopping stats thread"); @@ -102,32 +111,91 @@ void Controller::statSession::update(unsigned long index, IPC::statExchange & da if (data.now() > lastSec){ lastSec = data.now(); } - //store timestamp of first received data, if not known yet or older - if (!firstSec || firstSec > data.now()){ + //store timestamp of first received data, if older + if (firstSec > data.now()){ firstSec = data.now(); } } +/// Archives the given connection. +void Controller::statSession::wipeOld(unsigned long long cutOff){ + if (firstSec > cutOff){ + return; + } + firstSec = 0xFFFFFFFFFFFFFFFFull; + if (oldConns.size()){ + for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ + while (it->log.size() && it->log.begin()->first < cutOff){ + it->log.erase(it->log.begin()); + } + if (it->log.size()){ + if (firstSec > it->log.begin()->first){ + firstSec = it->log.begin()->first; + } + } + } + while (oldConns.size() && !oldConns.begin()->log.size()){ + oldConns.pop_front(); + } + } +} + /// Archives the given connection. void Controller::statSession::finish(unsigned long index){ oldConns.push_back(curConns[index]); curConns.erase(index); } +/// Constructs an empty session +Controller::statSession::statSession(){ + firstSec = 0xFFFFFFFFFFFFFFFFull; + lastSec = 0; +} + /// Moves the given connection to the given session void Controller::statSession::switchOverTo(statSession & newSess, unsigned long index){ + //add to the given session first newSess.curConns[index] = curConns[index]; //if this connection has data, update firstSec/lastSec if needed if (curConns[index].log.size()){ - if (!newSess.firstSec || newSess.firstSec > curConns[index].log.begin()->first){ + if (newSess.firstSec > curConns[index].log.begin()->first){ newSess.firstSec = curConns[index].log.begin()->first; } if (newSess.lastSec < curConns[index].log.rbegin()->first){ newSess.lastSec = curConns[index].log.rbegin()->first; } - /// \todo Correct local firstSec/lastSec - we may have just deleted either (or both) end(s) of the data for this session. } + //remove from current session curConns.erase(index); + //if there was any data, recalculate this session's firstSec and lastSec. + if (newSess.curConns[index].log.size()){ + firstSec = 0xFFFFFFFFFFFFFFFFull; + lastSec = 0; + if (oldConns.size()){ + for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ + if (it->log.size()){ + if (firstSec > it->log.begin()->first){ + firstSec = it->log.begin()->first; + } + if (lastSec < it->log.rbegin()->first){ + lastSec = it->log.rbegin()->first; + } + } + } + } + if (curConns.size()){ + for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ + if (it->second.log.size()){ + if (firstSec > it->second.log.begin()->first){ + firstSec = it->second.log.begin()->first; + } + if (lastSec < it->second.log.rbegin()->first){ + lastSec = it->second.log.rbegin()->first; + } + } + } + } + } } /// Returns the first measured timestamp in this session. @@ -157,6 +225,22 @@ bool Controller::statSession::hasDataFor(unsigned long long t){ return false; } +/// Returns true if there is any data for this session. +bool Controller::statSession::hasData(){ + if (!firstSec && !lastSec){return false;} + if (oldConns.size()){ + for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ + if (it->log.size()){return true;} + } + } + if (curConns.size()){ + for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ + if (it->second.log.size()){return true;} + } + } + return false; +} + /// Returns the cumulative connected time for this session at timestamp t. long long Controller::statSession::getConnTime(unsigned long long t){ long long retVal = 0; @@ -317,6 +401,9 @@ void Controller::parseStatistics(char * data, size_t len, unsigned int id){ //if the connection was already indexed and it has changed, move it if (connToSession.count(id) && connToSession[id] != idx){ sessions[connToSession[id]].switchOverTo(sessions[idx], id); + if (!sessions[connToSession[id]].hasData()){ + sessions.erase(connToSession[id]); + } } //store the index for later comparison connToSession[id] = idx; @@ -327,6 +414,7 @@ void Controller::parseStatistics(char * data, size_t len, unsigned int id){ if (counter == 126 || counter == 127 || counter == 254 || counter == 255){ //the data is no longer valid - connection has gone away, store for later sessions[idx].finish(id); + connToSession.erase(id); } } @@ -378,6 +466,7 @@ bool Controller::hasViewers(std::string streamName){ /// ~~~~~~~~~~~~~~~ /// In case of the second method, the response is an array in the same order as the requests. void Controller::fillClients(JSON::Value & req, JSON::Value & rep){ + tthread::lock_guard guard(statsMutex); //first, figure out the timestamp wanted long long int reqTime = 0; if (req.isMember("time")){ @@ -523,6 +612,7 @@ class totalsData { /// ~~~~~~~~~~~~~~~ /// In case of the second method, the response is an array in the same order as the requests. void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){ + tthread::lock_guard guard(statsMutex); //first, figure out the timestamps wanted long long int reqStart = 0; long long int reqEnd = 0; diff --git a/src/controller/controller_statistics.h b/src/controller/controller_statistics.h index e4fdeacb..cebf1163 100644 --- a/src/controller/controller_statistics.h +++ b/src/controller/controller_statistics.h @@ -2,6 +2,7 @@ #include #include #include +#include #include #include @@ -55,12 +56,15 @@ namespace Controller { std::deque oldConns; std::map curConns; public: + statSession(); + void wipeOld(unsigned long long); void finish(unsigned long index); void switchOverTo(statSession & newSess, unsigned long index); void update(unsigned long index, IPC::statExchange & data); unsigned long long getStart(); unsigned long long getEnd(); bool hasDataFor(unsigned long long time); + bool hasData(); long long getConnTime(unsigned long long time); long long getLastSecond(unsigned long long time); long long getDown(unsigned long long time); @@ -74,6 +78,7 @@ namespace Controller { extern std::map sessions; extern std::map connToSession; + extern tthread::mutex statsMutex; void parseStatistics(char * data, size_t len, unsigned int id); void fillClients(JSON::Value & req, JSON::Value & rep); void fillTotals(JSON::Value & req, JSON::Value & rep); diff --git a/src/output/output.cpp b/src/output/output.cpp index a5269742..39898d77 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -34,7 +34,7 @@ namespace Mist { Output::Output(Socket::Connection & conn) : myConn(conn) { firstTime = 0; - crc = 0; + crc = getpid(); parseData = false; wantRequest = true; sought = false;