Fixed various statistics bugs.

This commit is contained in:
Thulinma 2014-12-05 21:31:45 +01:00
parent 17a93fe927
commit 55046206fe
3 changed files with 104 additions and 9 deletions

View file

@ -23,6 +23,7 @@
std::map<Controller::sessIndex, Controller::statSession> Controller::sessions; ///< list of sessions that have statistics data available std::map<Controller::sessIndex, Controller::statSession> Controller::sessions; ///< list of sessions that have statistics data available
std::map<unsigned long, Controller::sessIndex> Controller::connToSession; ///< Map of socket IDs to session info. std::map<unsigned long, Controller::sessIndex> Controller::connToSession; ///< Map of socket IDs to session info.
tthread::mutex Controller::statsMutex;
Controller::sessIndex::sessIndex(std::string dhost, unsigned int dcrc, std::string dstreamName, std::string dconnector){ Controller::sessIndex::sessIndex(std::string dhost, unsigned int dcrc, std::string dstreamName, std::string dconnector){
host = dhost; host = dhost;
@ -86,10 +87,18 @@ void Controller::SharedMemStats(void * config){
DEBUG_MSG(DLVL_HIGH, "Starting stats thread"); DEBUG_MSG(DLVL_HIGH, "Starting stats thread");
IPC::sharedServer statServer("statistics", STAT_EX_SIZE, true); IPC::sharedServer statServer("statistics", STAT_EX_SIZE, true);
while(((Util::Config*)config)->is_active){ while(((Util::Config*)config)->is_active){
{
tthread::lock_guard<tthread::mutex> guard(statsMutex);
//parse current users //parse current users
statServer.parseEach(parseStatistics); statServer.parseEach(parseStatistics);
//wipe old statistics //wipe old statistics
/// \todo Loop over all sessions and trigger erase function if (sessions.size()){
unsigned long long cutOffPoint = Util::epoch() - STAT_CUTOFF;
for (std::map<sessIndex, statSession>::iterator it = sessions.begin(); it != sessions.end(); it++){
it->second.wipeOld(cutOffPoint);
}
}
}
Util::sleep(1000); Util::sleep(1000);
} }
DEBUG_MSG(DLVL_HIGH, "Stopping stats thread"); DEBUG_MSG(DLVL_HIGH, "Stopping stats thread");
@ -102,32 +111,91 @@ void Controller::statSession::update(unsigned long index, IPC::statExchange & da
if (data.now() > lastSec){ if (data.now() > lastSec){
lastSec = data.now(); lastSec = data.now();
} }
//store timestamp of first received data, if not known yet or older //store timestamp of first received data, if older
if (!firstSec || firstSec > data.now()){ if (firstSec > data.now()){
firstSec = data.now(); firstSec = data.now();
} }
} }
/// Archives the given connection.
void Controller::statSession::wipeOld(unsigned long long cutOff){
if (firstSec > cutOff){
return;
}
firstSec = 0xFFFFFFFFFFFFFFFFull;
if (oldConns.size()){
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
while (it->log.size() && it->log.begin()->first < cutOff){
it->log.erase(it->log.begin());
}
if (it->log.size()){
if (firstSec > it->log.begin()->first){
firstSec = it->log.begin()->first;
}
}
}
while (oldConns.size() && !oldConns.begin()->log.size()){
oldConns.pop_front();
}
}
}
/// Archives the given connection. /// Archives the given connection.
void Controller::statSession::finish(unsigned long index){ void Controller::statSession::finish(unsigned long index){
oldConns.push_back(curConns[index]); oldConns.push_back(curConns[index]);
curConns.erase(index); curConns.erase(index);
} }
/// Constructs an empty session
Controller::statSession::statSession(){
firstSec = 0xFFFFFFFFFFFFFFFFull;
lastSec = 0;
}
/// Moves the given connection to the given session /// Moves the given connection to the given session
void Controller::statSession::switchOverTo(statSession & newSess, unsigned long index){ void Controller::statSession::switchOverTo(statSession & newSess, unsigned long index){
//add to the given session first
newSess.curConns[index] = curConns[index]; newSess.curConns[index] = curConns[index];
//if this connection has data, update firstSec/lastSec if needed //if this connection has data, update firstSec/lastSec if needed
if (curConns[index].log.size()){ if (curConns[index].log.size()){
if (!newSess.firstSec || newSess.firstSec > curConns[index].log.begin()->first){ if (newSess.firstSec > curConns[index].log.begin()->first){
newSess.firstSec = curConns[index].log.begin()->first; newSess.firstSec = curConns[index].log.begin()->first;
} }
if (newSess.lastSec < curConns[index].log.rbegin()->first){ if (newSess.lastSec < curConns[index].log.rbegin()->first){
newSess.lastSec = curConns[index].log.rbegin()->first; newSess.lastSec = curConns[index].log.rbegin()->first;
} }
/// \todo Correct local firstSec/lastSec - we may have just deleted either (or both) end(s) of the data for this session.
} }
//remove from current session
curConns.erase(index); curConns.erase(index);
//if there was any data, recalculate this session's firstSec and lastSec.
if (newSess.curConns[index].log.size()){
firstSec = 0xFFFFFFFFFFFFFFFFull;
lastSec = 0;
if (oldConns.size()){
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
if (it->log.size()){
if (firstSec > it->log.begin()->first){
firstSec = it->log.begin()->first;
}
if (lastSec < it->log.rbegin()->first){
lastSec = it->log.rbegin()->first;
}
}
}
}
if (curConns.size()){
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
if (it->second.log.size()){
if (firstSec > it->second.log.begin()->first){
firstSec = it->second.log.begin()->first;
}
if (lastSec < it->second.log.rbegin()->first){
lastSec = it->second.log.rbegin()->first;
}
}
}
}
}
} }
/// Returns the first measured timestamp in this session. /// Returns the first measured timestamp in this session.
@ -157,6 +225,22 @@ bool Controller::statSession::hasDataFor(unsigned long long t){
return false; return false;
} }
/// Returns true if there is any data for this session.
bool Controller::statSession::hasData(){
if (!firstSec && !lastSec){return false;}
if (oldConns.size()){
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
if (it->log.size()){return true;}
}
}
if (curConns.size()){
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
if (it->second.log.size()){return true;}
}
}
return false;
}
/// Returns the cumulative connected time for this session at timestamp t. /// Returns the cumulative connected time for this session at timestamp t.
long long Controller::statSession::getConnTime(unsigned long long t){ long long Controller::statSession::getConnTime(unsigned long long t){
long long retVal = 0; long long retVal = 0;
@ -317,6 +401,9 @@ void Controller::parseStatistics(char * data, size_t len, unsigned int id){
//if the connection was already indexed and it has changed, move it //if the connection was already indexed and it has changed, move it
if (connToSession.count(id) && connToSession[id] != idx){ if (connToSession.count(id) && connToSession[id] != idx){
sessions[connToSession[id]].switchOverTo(sessions[idx], id); sessions[connToSession[id]].switchOverTo(sessions[idx], id);
if (!sessions[connToSession[id]].hasData()){
sessions.erase(connToSession[id]);
}
} }
//store the index for later comparison //store the index for later comparison
connToSession[id] = idx; connToSession[id] = idx;
@ -327,6 +414,7 @@ void Controller::parseStatistics(char * data, size_t len, unsigned int id){
if (counter == 126 || counter == 127 || counter == 254 || counter == 255){ if (counter == 126 || counter == 127 || counter == 254 || counter == 255){
//the data is no longer valid - connection has gone away, store for later //the data is no longer valid - connection has gone away, store for later
sessions[idx].finish(id); sessions[idx].finish(id);
connToSession.erase(id);
} }
} }
@ -378,6 +466,7 @@ bool Controller::hasViewers(std::string streamName){
/// ~~~~~~~~~~~~~~~ /// ~~~~~~~~~~~~~~~
/// In case of the second method, the response is an array in the same order as the requests. /// In case of the second method, the response is an array in the same order as the requests.
void Controller::fillClients(JSON::Value & req, JSON::Value & rep){ void Controller::fillClients(JSON::Value & req, JSON::Value & rep){
tthread::lock_guard<tthread::mutex> guard(statsMutex);
//first, figure out the timestamp wanted //first, figure out the timestamp wanted
long long int reqTime = 0; long long int reqTime = 0;
if (req.isMember("time")){ if (req.isMember("time")){
@ -523,6 +612,7 @@ class totalsData {
/// ~~~~~~~~~~~~~~~ /// ~~~~~~~~~~~~~~~
/// In case of the second method, the response is an array in the same order as the requests. /// In case of the second method, the response is an array in the same order as the requests.
void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){ void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){
tthread::lock_guard<tthread::mutex> guard(statsMutex);
//first, figure out the timestamps wanted //first, figure out the timestamps wanted
long long int reqStart = 0; long long int reqStart = 0;
long long int reqEnd = 0; long long int reqEnd = 0;

View file

@ -2,6 +2,7 @@
#include <mist/timing.h> #include <mist/timing.h>
#include <mist/defines.h> #include <mist/defines.h>
#include <mist/json.h> #include <mist/json.h>
#include <mist/tinythread.h>
#include <string> #include <string>
#include <map> #include <map>
@ -55,12 +56,15 @@ namespace Controller {
std::deque<statStorage> oldConns; std::deque<statStorage> oldConns;
std::map<unsigned long, statStorage> curConns; std::map<unsigned long, statStorage> curConns;
public: public:
statSession();
void wipeOld(unsigned long long);
void finish(unsigned long index); void finish(unsigned long index);
void switchOverTo(statSession & newSess, unsigned long index); void switchOverTo(statSession & newSess, unsigned long index);
void update(unsigned long index, IPC::statExchange & data); void update(unsigned long index, IPC::statExchange & data);
unsigned long long getStart(); unsigned long long getStart();
unsigned long long getEnd(); unsigned long long getEnd();
bool hasDataFor(unsigned long long time); bool hasDataFor(unsigned long long time);
bool hasData();
long long getConnTime(unsigned long long time); long long getConnTime(unsigned long long time);
long long getLastSecond(unsigned long long time); long long getLastSecond(unsigned long long time);
long long getDown(unsigned long long time); long long getDown(unsigned long long time);
@ -74,6 +78,7 @@ namespace Controller {
extern std::map<sessIndex, statSession> sessions; extern std::map<sessIndex, statSession> sessions;
extern std::map<unsigned long, sessIndex> connToSession; extern std::map<unsigned long, sessIndex> connToSession;
extern tthread::mutex statsMutex;
void parseStatistics(char * data, size_t len, unsigned int id); void parseStatistics(char * data, size_t len, unsigned int id);
void fillClients(JSON::Value & req, JSON::Value & rep); void fillClients(JSON::Value & req, JSON::Value & rep);
void fillTotals(JSON::Value & req, JSON::Value & rep); void fillTotals(JSON::Value & req, JSON::Value & rep);

View file

@ -34,7 +34,7 @@ namespace Mist {
Output::Output(Socket::Connection & conn) : myConn(conn) { Output::Output(Socket::Connection & conn) : myConn(conn) {
firstTime = 0; firstTime = 0;
crc = 0; crc = getpid();
parseData = false; parseData = false;
wantRequest = true; wantRequest = true;
sought = false; sought = false;