diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp
index 4ef0ba11..9b62329b 100644
--- a/src/controller/controller_statistics.cpp
+++ b/src/controller/controller_statistics.cpp
@@ -2,9 +2,6 @@
 #include
 #include "controller_statistics.h"
 
-/// The STAT_CUTOFF define sets how many seconds of statistics history is kept.
-#define STAT_CUTOFF 600
-
 // These are used to store "clients" field requests in a bitfield for speedup.
 #define STAT_CLI_HOST 1
 #define STAT_CLI_STREAM 2
@@ -24,8 +21,63 @@
 #define STAT_TOT_ALL 0xFF
 
-std::multimap<unsigned long long, Controller::statStorage> Controller::oldConns;
-std::map<unsigned long, Controller::statStorage> Controller::curConns;
+std::map<Controller::sessIndex, Controller::statSession> Controller::sessions; ///< list of sessions that have statistics data available
+std::map<unsigned long, Controller::sessIndex> Controller::connToSession; ///< Map of socket IDs to session info.
+
+Controller::sessIndex::sessIndex(std::string dhost, unsigned int dcrc, std::string dstreamName, std::string dconnector){
+  host = dhost;
+  crc = dcrc;
+  streamName = dstreamName;
+  connector = dconnector;
+}
+
+Controller::sessIndex::sessIndex(){
+  crc = 0;
+}
+
+/// Initializes a sessIndex from a statExchange object, converting binary format IP addresses into strings.
+/// This extracts the host, stream name, connector and crc field, ignoring everything else.
+Controller::sessIndex::sessIndex(IPC::statExchange & data){
+  std::string tHost = data.host();
+  if (tHost.substr(0, 12) == std::string("\000\000\000\000\000\000\000\000\000\000\377\377", 12)){
+    char tmpstr[16];
+    snprintf(tmpstr, 16, "%hhu.%hhu.%hhu.%hhu", tHost[12], tHost[13], tHost[14], tHost[15]);
+    host = tmpstr;
+  }else{
+    char tmpstr[40];
+    snprintf(tmpstr, 40, "%0.2x%0.2x:%0.2x%0.2x:%0.2x%0.2x:%0.2x%0.2x:%0.2x%0.2x:%0.2x%0.2x:%0.2x%0.2x:%0.2x%0.2x", tHost[0], tHost[1], tHost[2], tHost[3], tHost[4], tHost[5], tHost[6], tHost[7], tHost[8], tHost[9], tHost[10], tHost[11], tHost[12], tHost[13], tHost[14], tHost[15]);
+    host = tmpstr;
+  }
+  streamName = data.streamName();
+  connector = data.connector();
+  crc = data.crc();
+}
+
+
+bool Controller::sessIndex::operator== (const Controller::sessIndex &b) const{
+  return (host == b.host && crc == b.crc && streamName == b.streamName && connector == b.connector);
+}
+
+bool Controller::sessIndex::operator!= (const Controller::sessIndex &b) const{
+  return !(*this == b);
+}
+
+bool Controller::sessIndex::operator> (const Controller::sessIndex &b) const{
+  return host > b.host || (host == b.host && (crc > b.crc || (crc == b.crc && (streamName > b.streamName || (streamName == b.streamName && connector > b.connector)))));
+}
+
+bool Controller::sessIndex::operator< (const Controller::sessIndex &b) const{
+  return host < b.host || (host == b.host && (crc < b.crc || (crc == b.crc && (streamName < b.streamName || (streamName == b.streamName && connector < b.connector)))));
+}
+
+bool Controller::sessIndex::operator<= (const Controller::sessIndex &b) const{
+  return !(*this > b);
+}
+
+bool Controller::sessIndex::operator>= (const Controller::sessIndex &b) const{
+  return !(*this < b);
+}
+
 /// This function runs as a thread and roughly once per second retrieves
 /// statistics from all connected clients, as well as wipes
@@ -37,34 +89,211 @@ void Controller::SharedMemStats(void * config){
     //parse current users
     statServer.parseEach(parseStatistics);
     //wipe old statistics
-    while (oldConns.size() && oldConns.begin()->first < (unsigned long long)(Util::epoch() - STAT_CUTOFF)){
-      oldConns.erase(oldConns.begin());
-    }
+    /// \todo Loop over all sessions and trigger erase function
     Util::sleep(1000);
   }
   DEBUG_MSG(DLVL_HIGH, "Stopping stats thread");
 }
 
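// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the patch: the chained comparison in
// sessIndex::operator< above is the usual lexicographic ordering on
// (host, crc, streamName, connector). The simplified stand-in type below
// (MiniIndex, a hypothetical name) expresses the same ordering with std::tie
// and shows that a std::map keyed on it groups equal tuples into one entry,
// which is how the `sessions` map groups connections into sessions.
// ---------------------------------------------------------------------------
#include <iostream>
#include <map>
#include <string>
#include <tuple>

struct MiniIndex {
  std::string host;
  unsigned int crc;
  std::string streamName;
  std::string connector;
  bool operator<(const MiniIndex & b) const {
    // Equivalent to the hand-written host/crc/streamName/connector chain.
    return std::tie(host, crc, streamName, connector)
         < std::tie(b.host, b.crc, b.streamName, b.connector);
  }
};

int main() {
  std::map<MiniIndex, int> sessions; // the int stands in for statSession
  MiniIndex a = {"10.0.0.1", 42, "live", "HTTP"};
  MiniIndex b = {"10.0.0.1", 42, "live", "HTTP"}; // same tuple -> same key
  MiniIndex c = {"10.0.0.1", 42, "live", "RTMP"}; // different connector -> new key
  sessions[a]++;
  sessions[b]++;
  sessions[c]++;
  std::cout << "distinct sessions: " << sessions.size() << "\n"; // prints 2
}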
"Stopping stats thread"); } +/// Updates the given active connection with new stats data. +void Controller::statSession::update(unsigned long index, IPC::statExchange & data){ + curConns[index].update(data); + //store timestamp of last received data, if newer + if (data.now() > lastSec){ + lastSec = data.now(); + } + //store timestamp of first received data, if not known yet or older + if (!firstSec || firstSec > data.now()){ + firstSec = data.now(); + } +} + +/// Archives the given connection. +void Controller::statSession::finish(unsigned long index){ + oldConns.push_back(curConns[index]); + curConns.erase(index); +} + +/// Moves the given connection to the given session +void Controller::statSession::switchOverTo(statSession & newSess, unsigned long index){ + newSess.curConns[index] = curConns[index]; + //if this connection has data, update firstSec/lastSec if needed + if (curConns[index].log.size()){ + if (!newSess.firstSec || newSess.firstSec > curConns[index].log.begin()->first){ + newSess.firstSec = curConns[index].log.begin()->first; + } + if (newSess.lastSec < curConns[index].log.rbegin()->first){ + newSess.lastSec = curConns[index].log.rbegin()->first; + } + /// \todo Correct local firstSec/lastSec - we may have just deleted either (or both) end(s) of the data for this session. + } + curConns.erase(index); +} + +/// Returns the first measured timestamp in this session. +unsigned long long Controller::statSession::getStart(){ + return firstSec; +} + +/// Returns the last measured timestamp in this session. +unsigned long long Controller::statSession::getEnd(){ + return lastSec; +} + +/// Returns true if there is data for this session at timestamp t. +bool Controller::statSession::hasDataFor(unsigned long long t){ + if (lastSec < t){return false;} + if (firstSec > t){return false;} + if (oldConns.size()){ + for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ + if (it->hasDataFor(t)){return true;} + } + } + if (curConns.size()){ + for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ + if (it->second.hasDataFor(t)){return true;} + } + } + return false; +} + +/// Returns the cumulative connected time for this session at timestamp t. +long long Controller::statSession::getConnTime(unsigned long long t){ + long long retVal = 0; + if (oldConns.size()){ + for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ + if (it->hasDataFor(t)){ + retVal += it->getDataFor(t).time; + } + } + } + if (curConns.size()){ + for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ + if (it->second.hasDataFor(t)){ + retVal += it->second.getDataFor(t).time; + } + } + } + return retVal; +} + +/// Returns the last requested media timestamp for this session at timestamp t. +long long Controller::statSession::getLastSecond(unsigned long long t){ + if (curConns.size()){ + for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ + if (it->second.hasDataFor(t)){ + return it->second.getDataFor(t).lastSecond; + } + } + } + if (oldConns.size()){ + for (std::deque::reverse_iterator it = oldConns.rbegin(); it != oldConns.rend(); ++it){ + if (it->hasDataFor(t)){ + return it->getDataFor(t).lastSecond; + } + } + } + return 0; +} + +/// Returns the cumulative downloaded bytes for this session at timestamp t. 
+/// Returns the cumulative downloaded bytes for this session at timestamp t.
+long long Controller::statSession::getDown(unsigned long long t){
+  long long retVal = 0;
+  if (oldConns.size()){
+    for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
+      if (it->hasDataFor(t)){
+        retVal += it->getDataFor(t).down;
+      }
+    }
+  }
+  if (curConns.size()){
+    for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
+      if (it->second.hasDataFor(t)){
+        retVal += it->second.getDataFor(t).down;
+      }
+    }
+  }
+  return retVal;
+}
+
+/// Returns the cumulative uploaded bytes for this session at timestamp t.
+long long Controller::statSession::getUp(unsigned long long t){
+  long long retVal = 0;
+  if (oldConns.size()){
+    for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
+      if (it->hasDataFor(t)){
+        retVal += it->getDataFor(t).up;
+      }
+    }
+  }
+  if (curConns.size()){
+    for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
+      if (it->second.hasDataFor(t)){
+        retVal += it->second.getDataFor(t).up;
+      }
+    }
+  }
+  return retVal;
+}
+
+/// Returns the average download speed in bytes per second for this session over the (at most) five seconds before timestamp t.
+long long Controller::statSession::getBpsDown(unsigned long long t){
+  unsigned long long aTime = t - 5;
+  if (aTime < firstSec){
+    aTime = firstSec;
+  }
+  long long valA = getDown(aTime);
+  long long valB = getDown(t);
+  if (t > aTime){
+    //INFO_MSG("Saying the speed from time %lli to %lli (being %lli - %lli) is %lli.", aTime, t, valB, valA, (valB - valA) / (t - aTime));
+    return (valB - valA) / (t - aTime);
+  }else{
+    //INFO_MSG("Saying the speed from time %lli to %lli (being %lli - %lli) is %lli.", aTime, t, valB, valA, 0);
+    return 0;
+  }
+}
+
+/// Returns the average upload speed in bytes per second for this session over the (at most) five seconds before timestamp t.
+long long Controller::statSession::getBpsUp(unsigned long long t){
+  unsigned long long aTime = t - 5;
+  if (aTime < firstSec){
+    aTime = firstSec;
+  }
+  long long valA = getUp(aTime);
+  long long valB = getUp(t);
+  if (t > aTime){
+    return (valB - valA) / (t - aTime);
+  }else{
+    return 0;
+  }
+}
+
+/// Returns true if there is data available for timestamp t.
+bool Controller::statStorage::hasDataFor(unsigned long long t) {
+  if (!log.size()){return false;}
+  return (t >= log.begin()->first);
+}
+
+/// Returns a reference to the most current data available at timestamp t.
+Controller::statLog & Controller::statStorage::getDataFor(unsigned long long t) {
+  static statLog empty;
+  if (!log.size()){
+    empty.time = 0;
+    empty.lastSecond = 0;
+    empty.down = 0;
+    empty.up = 0;
+    return empty;
+  }
+  std::map<unsigned long long, statLog>::iterator it = log.upper_bound(t);
+  if (it != log.begin()){
+    it--;
+  }
+  return it->second;
+}
+
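// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the patch: getDataFor() above finds the
// newest log entry at or before timestamp t with upper_bound() followed by a
// step back. The standalone snippet below shows that lookup pattern on a
// plain std::map keyed by timestamp (the sample values are made up).
// ---------------------------------------------------------------------------
#include <iostream>
#include <map>

int main() {
  std::map<unsigned long long, int> log; // time -> sample
  log[100] = 1;
  log[103] = 2;
  log[107] = 3;
  unsigned long long queries[] = {99, 100, 105, 200};
  for (int i = 0; i < 4; ++i) {
    unsigned long long t = queries[i];
    std::map<unsigned long long, int>::iterator it = log.upper_bound(t);
    if (it != log.begin()) { --it; } // newest entry with key <= t, if any
    std::cout << "t=" << t << " -> entry at " << it->first
              << " value " << it->second << "\n";
  }
  // Note: like getDataFor(), a query before the first entry (t=99) still
  // yields the first entry; hasDataFor() is what guards against that case.
}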
 /// This function is called by parseStatistics.
 /// It updates the internally saved statistics data.
 void Controller::statStorage::update(IPC::statExchange & data) {
-  if (!streamName.size()){
-    std::string tHost = data.host();
-    if (tHost.substr(0, 12) == std::string("\000\000\000\000\000\000\000\000\000\000\377\377", 12)){
-      char tmpstr[16];
-      snprintf(tmpstr, 16, "%hhu.%hhu.%hhu.%hhu", tHost[12], tHost[13], tHost[14], tHost[15]);
-      host = tmpstr;
-    }else{
-      char tmpstr[40];
-      snprintf(tmpstr, 40, "%0.2x%0.2x:%0.2x%0.2x:%0.2x%0.2x:%0.2x%0.2x:%0.2x%0.2x:%0.2x%0.2x:%0.2x%0.2x:%0.2x%0.2x", tHost[0], tHost[1], tHost[2], tHost[3], tHost[4], tHost[5], tHost[6], tHost[7], tHost[8], tHost[9], tHost[10], tHost[11], tHost[12], tHost[13], tHost[14], tHost[15]);
-      host = tmpstr;
-    }
-    streamName = data.streamName();
-  }
-  if (!connector.size()){
-    connector = data.connector();
-  }
-  crc = data.crc();
   statLog tmp;
   tmp.time = data.time();
   tmp.lastSecond = data.lastSecond();
@@ -72,28 +301,40 @@
   tmp.up = data.up();
   log[data.now()] = tmp;
   //wipe data older than approx. STAT_CUTOFF seconds
+  /// \todo Remove least interesting data first.
   if (log.size() > STAT_CUTOFF){
     log.erase(log.begin());
   }
 }
 
 /// This function is called by the shared memory page that holds statistics.
-/// It updates the internally saved statistics data, archiving if neccessary.
+/// It updates the internally saved statistics data, moving across sessions or archiving when necessary.
 void Controller::parseStatistics(char * data, size_t len, unsigned int id){
+  //retrieve stats data
   IPC::statExchange tmpEx(data);
-  curConns[id].update(tmpEx);
+  //calculate the current session index, store as idx.
+  sessIndex idx(tmpEx);
+  //if the connection was already indexed and it has changed, move it
+  if (connToSession.count(id) && connToSession[id] != idx){
+    sessions[connToSession[id]].switchOverTo(sessions[idx], id);
+  }
+  //store the index for later comparison
+  connToSession[id] = idx;
+  //update the session with the latest data
+  sessions[idx].update(id, tmpEx);
+  //check validity of stats data
   char counter = (*(data - 1));
   if (counter == 126 || counter == 127 || counter == 254 || counter == 255){
-    oldConns.insert(std::pair<unsigned long long, statStorage>(Util::epoch(), curConns[id]));
-    curConns.erase(id);
+    //the data is no longer valid - connection has gone away, store for later
+    sessions[idx].finish(id);
  }
 }
 
 /// Returns true if this stream has at least one connected client.
 bool Controller::hasViewers(std::string streamName){
-  if (curConns.size()){
-    for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); it++){
-      if (it->second.streamName == streamName){
+  if (sessions.size()){
+    for (std::map<sessIndex, statSession>::iterator it = sessions.begin(); it != sessions.end(); it++){
+      if (it->first.streamName == streamName){
         return true;
       }
     }
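// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the patch: parseStatistics() above keeps
// two maps in sync - connToSession (socket id -> session key) and sessions
// (session key -> session data). When the key computed from an incoming stats
// packet differs from the one stored for that socket, the connection is moved
// to the other session. The toy model below (hypothetical types and values)
// mirrors that flow.
// ---------------------------------------------------------------------------
#include <iostream>
#include <map>
#include <set>
#include <string>

typedef std::string Key; // stands in for sessIndex
struct ToySession { std::set<unsigned long> conns; };

std::map<unsigned long, Key> connToSession;
std::map<Key, ToySession> sessions;

void onStats(unsigned long id, const Key & idx) {
  // if the connection was already indexed and the key changed, move it
  if (connToSession.count(id) && connToSession[id] != idx) {
    sessions[connToSession[id]].conns.erase(id); // roughly what switchOverTo() does
  }
  connToSession[id] = idx;
  sessions[idx].conns.insert(id); // roughly what statSession::update() does
}

int main() {
  onStats(5, "10.0.0.1|live|HTTP");
  onStats(5, "10.0.0.1|live|HTTP"); // same key: stays in the same session
  onStats(5, "10.0.0.1|live|RTMP"); // key changed: moved to another session
  std::cout << sessions["10.0.0.1|live|HTTP"].conns.size() << " "
            << sessions["10.0.0.1|live|RTMP"].conns.size() << "\n"; // prints: 0 1
}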
@@ -137,8 +378,6 @@
 /// ~~~~~~~~~~~~~~~
 /// In case of the second method, the response is an array in the same order as the requests.
 void Controller::fillClients(JSON::Value & req, JSON::Value & rep){
-  //memorize the current system time
-  long long currTime = Util::epoch();
   //first, figure out the timestamp wanted
   long long int reqTime = 0;
   if (req.isMember("time")){
@@ -198,86 +437,27 @@
   if (fields & STAT_CLI_CRC){rep["fields"].append("crc");}
   //output the data itself
   rep["data"].null();
-  //start with current connections
-  if (curConns.size()){
-    for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); it++){
-      if (!it->second.log.size()){continue;}
-      //ignore users that haven't been updated in the last 5 seconds.
-      if (it->second.log.rbegin()->first < currTime - 5){
-        continue;
-      }
+  //loop over all sessions
+  if (sessions.size()){
+    for (std::map<sessIndex, statSession>::iterator it = sessions.begin(); it != sessions.end(); it++){
       unsigned long long time = reqTime;
-      if (now){time = it->second.log.rbegin()->first;}
+      if (now && reqTime - it->second.getEnd() < 5){time = it->second.getEnd();}
       //data present and wanted? insert it!
-      if ((it->second.log.rbegin()->first >= time && it->second.log.begin()->first <= time) && (!streams.size() || streams.count(it->second.streamName)) && (!protos.size() || protos.count(it->second.connector))){
-        JSON::Value d;
-        std::map<unsigned long long, statLog>::iterator statRef = it->second.log.lower_bound(time);
-        std::map<unsigned long long, statLog>::iterator prevRef = --(it->second.log.lower_bound(time));
-        if (fields & STAT_CLI_HOST){d.append(it->second.host);}
-        if (fields & STAT_CLI_STREAM){d.append(it->second.streamName);}
-        if (fields & STAT_CLI_PROTO){d.append(it->second.connector);}
-        if (fields & STAT_CLI_CONNTIME){d.append((long long)statRef->second.time);}
-        if (fields & STAT_CLI_POSITION){d.append((long long)statRef->second.lastSecond);}
-        if (fields & STAT_CLI_DOWN){d.append(statRef->second.down);}
-        if (fields & STAT_CLI_UP){d.append(statRef->second.up);}
-        if (fields & STAT_CLI_BPS_DOWN){
-          if (statRef != it->second.log.begin()){
-            unsigned int diff = statRef->first - prevRef->first;
-            d.append((statRef->second.down - prevRef->second.down) / diff);
-          }else{
-            d.append(statRef->second.down);
-          }
+      if ((it->second.getEnd() >= time && it->second.getStart() <= time) && (!streams.size() || streams.count(it->first.streamName)) && (!protos.size() || protos.count(it->first.connector))){
+        if (it->second.hasDataFor(time)){
+          JSON::Value d;
+          if (fields & STAT_CLI_HOST){d.append(it->first.host);}
+          if (fields & STAT_CLI_STREAM){d.append(it->first.streamName);}
+          if (fields & STAT_CLI_PROTO){d.append(it->first.connector);}
+          if (fields & STAT_CLI_CONNTIME){d.append(it->second.getConnTime(time));}
+          if (fields & STAT_CLI_POSITION){d.append(it->second.getLastSecond(time));}
+          if (fields & STAT_CLI_DOWN){d.append(it->second.getDown(time));}
+          if (fields & STAT_CLI_UP){d.append(it->second.getUp(time));}
+          if (fields & STAT_CLI_BPS_DOWN){d.append(it->second.getBpsDown(time));}
+          if (fields & STAT_CLI_BPS_UP){d.append(it->second.getBpsUp(time));}
+          if (fields & STAT_CLI_CRC){d.append((long long)it->first.crc);}
+          rep["data"].append(d);
        }
-        if (fields & STAT_CLI_BPS_UP){
-          if (statRef != it->second.log.begin()){
-            unsigned int diff = statRef->first - prevRef->first;
-            d.append((statRef->second.up - prevRef->second.up) / diff);
-          }else{
-            d.append(statRef->second.up);
-          }
-        }
-        if (fields & STAT_CLI_CRC){d.append((long long)it->second.crc);}
-        rep["data"].append(d);
-      }
-    }
-  }
-  //if we're only interested in current, don't even bother looking at history
-  if (now){
-    return;
-  }
-  //look at history
-  if (oldConns.size()){
-    for (std::map<unsigned long long, statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); it++){
-      //data present and wanted? insert it!
-      if ((it->second.log.rbegin()->first >= (unsigned long long)reqTime && it->second.log.begin()->first <= (unsigned long long)reqTime) && (!streams.size() || streams.count(it->second.streamName)) && (!protos.size() || protos.count(it->second.connector))){
-        JSON::Value d;
-        std::map<unsigned long long, statLog>::iterator statRef = it->second.log.lower_bound(reqTime);
-        std::map<unsigned long long, statLog>::iterator prevRef = --(it->second.log.lower_bound(reqTime));
-        if (fields & STAT_CLI_HOST){d.append(it->second.host);}
-        if (fields & STAT_CLI_STREAM){d.append(it->second.streamName);}
-        if (fields & STAT_CLI_PROTO){d.append(it->second.connector);}
-        if (fields & STAT_CLI_CONNTIME){d.append((long long)statRef->second.time);}
-        if (fields & STAT_CLI_POSITION){d.append((long long)statRef->second.lastSecond);}
-        if (fields & STAT_CLI_DOWN){d.append(statRef->second.down);}
-        if (fields & STAT_CLI_UP){d.append(statRef->second.up);}
-        if (fields & STAT_CLI_BPS_DOWN){
-          if (statRef != it->second.log.begin()){
-            unsigned int diff = statRef->first - prevRef->first;
-            d.append((statRef->second.down - prevRef->second.down) / diff);
-          }else{
-            d.append(statRef->second.down);
-          }
-        }
-        if (fields & STAT_CLI_BPS_UP){
-          if (statRef != it->second.log.begin()){
-            unsigned int diff = statRef->first - prevRef->first;
-            d.append((statRef->second.up - prevRef->second.up) / diff);
-          }else{
-            d.append(statRef->second.up);
-          }
-        }
-        if (fields & STAT_CLI_CRC){d.append((long long)it->second.crc);}
-        rep["data"].append(d);
       }
     }
   }
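// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the patch: fillClients() above encodes the
// requested "fields" as bits (the STAT_CLI_* defines at the top of this file)
// and then appends only the selected columns per session. The snippet below
// rebuilds that bitfield idea in isolation, with locally declared constants
// and made-up field names.
// ---------------------------------------------------------------------------
#include <iostream>
#include <string>
#include <vector>

#define SKETCH_CLI_HOST 1
#define SKETCH_CLI_STREAM 2
#define SKETCH_CLI_PROTO 4

int main() {
  std::vector<std::string> wanted;
  wanted.push_back("host");
  wanted.push_back("protocol");
  unsigned int fields = 0;
  for (unsigned long i = 0; i < wanted.size(); ++i) {
    if (wanted[i] == "host") { fields |= SKETCH_CLI_HOST; }
    if (wanted[i] == "stream") { fields |= SKETCH_CLI_STREAM; }
    if (wanted[i] == "protocol") { fields |= SKETCH_CLI_PROTO; }
  }
  // Emit one row, honouring the same bit checks fillClients() uses.
  if (fields & SKETCH_CLI_HOST) { std::cout << "host "; }
  if (fields & SKETCH_CLI_STREAM) { std::cout << "stream "; }
  if (fields & SKETCH_CLI_PROTO) { std::cout << "protocol "; }
  std::cout << "\n"; // prints: host protocol
}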
@@ -396,50 +576,16 @@ void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){
   if (fields & STAT_TOT_BPS_UP){rep["fields"].append("upbps");}
   //start data collection
   std::map<long long unsigned int, totalsData> totalsCount;
-  //start with current connections
-  if (curConns.size()){
-    for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); it++){
+  //loop over all sessions
+  /// \todo Make the interval configurable instead of 1 second
+  if (sessions.size()){
+    for (std::map<sessIndex, statSession>::iterator it = sessions.begin(); it != sessions.end(); it++){
       //data present and wanted? insert it!
-      if (it->second.log.size() > 1 && (it->second.log.rbegin()->first >= (unsigned long long)reqStart || it->second.log.begin()->first <= (unsigned long long)reqEnd) && (!streams.size() || streams.count(it->second.streamName)) && (!protos.size() || protos.count(it->second.connector))){
-        //keep track of the previous and current, starting at position 2 so there's always a delta down/up value.
-        std::map<unsigned long long, statLog>::iterator pi = it->second.log.begin();
-        for (std::map<unsigned long long, statLog>::iterator li = ++(it->second.log.begin()); li != it->second.log.end(); li++){
-          if (li->first < (unsigned long long)reqStart || pi->first > (unsigned long long)reqEnd){
-            continue;
+      if ((it->second.getEnd() >= (unsigned long long)reqStart || it->second.getStart() <= (unsigned long long)reqEnd) && (!streams.size() || streams.count(it->first.streamName)) && (!protos.size() || protos.count(it->first.connector))){
+        for (unsigned long long i = reqStart; i <= reqEnd; ++i){
+          if (it->second.hasDataFor(i)){
+            totalsCount[i].add(it->second.getBpsDown(i), it->second.getBpsUp(i));
           }
-          unsigned int diff = li->first - pi->first;
-          unsigned int ddown = (li->second.down - pi->second.down) / diff;
-          unsigned int dup = (li->second.up - pi->second.up) / diff;
-          for (long long unsigned int t = pi->first; t < li->first; t++){
-            if (t >= (unsigned long long)reqStart && t <= (unsigned long long)reqEnd){
-              totalsCount[t].add(ddown, dup);
-            }
-          }
-          pi = li;//set previous iterator to log iterator
-        }
-      }
-    }
-  }
-  //look at history
-  if (oldConns.size()){
-    for (std::map<unsigned long long, statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); it++){
-      //data present and wanted? insert it!
-      if (it->second.log.size() > 1 && (it->second.log.rbegin()->first >= (unsigned long long)reqStart || it->second.log.begin()->first <= (unsigned long long)reqEnd) && (!streams.size() || streams.count(it->second.streamName)) && (!protos.size() || protos.count(it->second.connector))){
-        //keep track of the previous and current, starting at position 2 so there's always a delta down/up value.
-        std::map<unsigned long long, statLog>::iterator pi = it->second.log.begin();
-        for (std::map<unsigned long long, statLog>::iterator li = ++(it->second.log.begin()); li != it->second.log.end(); li++){
-          if (li->first < (unsigned long long)reqStart || pi->first > (unsigned long long)reqEnd){
-            continue;
-          }
-          unsigned int diff = li->first - pi->first;
-          unsigned int ddown = (li->second.down - pi->second.down) / diff;
-          unsigned int dup = (li->second.up - pi->second.up) / diff;
-          for (long long unsigned int t = pi->first; t < li->first; t++){
-            if (t >= (unsigned long long)reqStart && t <= (unsigned long long)reqEnd){
-              totalsCount[t].add(ddown, dup);
-            }
-          }
-          pi = li;//set previous iterator to log iterator
         }
       }
     }
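// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the patch: the new fillTotals() loop above
// walks every second in [reqStart, reqEnd] and adds each session's per-second
// rates into a per-timestamp accumulator. The standalone model below does the
// same with made-up sessions that report a constant rate while active.
// ---------------------------------------------------------------------------
#include <iostream>
#include <map>
#include <vector>

struct ToyTotals {
  long long clients, down, up;
  ToyTotals() : clients(0), down(0), up(0) {}
  void add(long long d, long long u) { clients++; down += d; up += u; }
};

struct ToySession {
  unsigned long long start, end; // seconds during which the session is active
  long long bpsDown, bpsUp;      // pretend-constant rates
  bool hasDataFor(unsigned long long t) const { return t >= start && t <= end; }
};

int main() {
  std::vector<ToySession> sessions;
  ToySession a = {100, 104, 1000, 50};
  ToySession b = {102, 110, 500, 20};
  sessions.push_back(a);
  sessions.push_back(b);
  unsigned long long reqStart = 101, reqEnd = 103;
  std::map<unsigned long long, ToyTotals> totalsCount;
  for (unsigned long s = 0; s < sessions.size(); ++s) {
    for (unsigned long long i = reqStart; i <= reqEnd; ++i) {
      if (sessions[s].hasDataFor(i)) {
        totalsCount[i].add(sessions[s].bpsDown, sessions[s].bpsUp);
      }
    }
  }
  for (std::map<unsigned long long, ToyTotals>::iterator it = totalsCount.begin(); it != totalsCount.end(); ++it) {
    std::cout << it->first << ": " << it->second.clients << " clients, "
              << it->second.down << " B/s down\n";
  }
  // 101: 1 client (a); 102 and 103: 2 clients (a and b).
}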
diff --git a/src/controller/controller_statistics.h b/src/controller/controller_statistics.h
index fefa010c..e4fdeacb 100644
--- a/src/controller/controller_statistics.h
+++ b/src/controller/controller_statistics.h
@@ -5,6 +5,9 @@
 #include
 #include
 
+/// The STAT_CUTOFF define sets how many seconds of statistics history is kept.
+#define STAT_CUTOFF 600
+
 namespace Controller {
 
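// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the patch: STAT_CUTOFF (moved into this
// header) caps how many statistics entries each statStorage::log keeps -
// statStorage::update() erases the oldest entry once the map grows past it.
// The snippet below shows that size-capped std::map pattern with a tiny cap.
// ---------------------------------------------------------------------------
#include <iostream>
#include <map>

static const unsigned int SKETCH_CUTOFF = 3; // stand-in for STAT_CUTOFF (600)

int main() {
  std::map<unsigned long long, int> log; // time -> sample
  for (unsigned long long t = 1; t <= 6; ++t) {
    log[t] = (int)(t * 10);
    if (log.size() > SKETCH_CUTOFF) {
      log.erase(log.begin()); // drop the oldest entry, like statStorage::update()
    }
  }
  // Only the newest SKETCH_CUTOFF entries survive: 4 5 6
  for (std::map<unsigned long long, int>::iterator it = log.begin(); it != log.end(); ++it) {
    std::cout << it->first << " ";
  }
  std::cout << "\n";
}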
@@ -14,19 +17,63 @@ struct statLog {
     long long time;
     long long lastSecond;
     long long down;
     long long up;
   };
-  class statStorage {
+  /// This is a comparison and storage class that keeps sessions apart from each other.
+  /// Whenever two of these objects are not equal, it will create a new session.
+  class sessIndex {
     public:
-      void update(IPC::statExchange & data);
+      sessIndex(std::string host, unsigned int crc, std::string streamName, std::string connector);
+      sessIndex(IPC::statExchange & data);
+      sessIndex();
       std::string host;
       unsigned int crc;
       std::string streamName;
       std::string connector;
+
+      bool operator== (const sessIndex &o) const;
+      bool operator!= (const sessIndex &o) const;
+      bool operator> (const sessIndex &o) const;
+      bool operator<= (const sessIndex &o) const;
+      bool operator< (const sessIndex &o) const;
+      bool operator>= (const sessIndex &o) const;
+  };
+
+
+  class statStorage {
+    public:
+      void update(IPC::statExchange & data);
+      bool hasDataFor(unsigned long long);
+      statLog & getDataFor(unsigned long long);
       std::map<unsigned long long, statLog> log;
   };
+
+  /// A session class that keeps track of both current and archived connections.
+  /// Allows for moving of connections to another session.
+  class statSession {
+    private:
+      unsigned long long firstSec;
+      unsigned long long lastSec;
+      std::deque<statStorage> oldConns;
+      std::map<unsigned long, statStorage> curConns;
+    public:
+      void finish(unsigned long index);
+      void switchOverTo(statSession & newSess, unsigned long index);
+      void update(unsigned long index, IPC::statExchange & data);
+      unsigned long long getStart();
+      unsigned long long getEnd();
+      bool hasDataFor(unsigned long long time);
+      long long getConnTime(unsigned long long time);
+      long long getLastSecond(unsigned long long time);
+      long long getDown(unsigned long long time);
+      long long getUp(unsigned long long time);
+      long long getBpsDown(unsigned long long time);
+      long long getBpsUp(unsigned long long time);
+      long long getBpsDown(unsigned long long start, unsigned long long end);
+      long long getBpsUp(unsigned long long start, unsigned long long end);
+  };
-  extern std::multimap<unsigned long long, statStorage> oldConns;
-  extern std::map<unsigned long, statStorage> curConns;
+  extern std::map<sessIndex, statSession> sessions;
+  extern std::map<unsigned long, sessIndex> connToSession;
   void parseStatistics(char * data, size_t len, unsigned int id);
   void fillClients(JSON::Value & req, JSON::Value & rep);
   void fillTotals(JSON::Value & req, JSON::Value & rep);
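// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the patch: with connections grouped under
// a (host, crc, stream, connector) key, per-stream questions become a scan of
// the `sessions` map keys, much like hasViewers() above. The standalone model
// below counts sessions per stream the same way (hypothetical types and data).
// ---------------------------------------------------------------------------
#include <iostream>
#include <map>
#include <string>
#include <utility>

// Stand-in for sessIndex: only the fields the scan needs.
typedef std::pair<std::string, std::string> MiniKey; // (streamName, connector)

int main() {
  std::map<MiniKey, int> sessions; // the int stands in for statSession
  sessions[MiniKey("live", "HTTP")] = 1;
  sessions[MiniKey("live", "RTMP")] = 1;
  sessions[MiniKey("vod", "HTTP")] = 1;
  std::map<std::string, int> viewersPerStream;
  for (std::map<MiniKey, int>::iterator it = sessions.begin(); it != sessions.end(); ++it) {
    viewersPerStream[it->first.first]++;
  }
  for (std::map<std::string, int>::iterator it = viewersPerStream.begin(); it != viewersPerStream.end(); ++it) {
    std::cout << it->first << ": " << it->second << "\n"; // live: 2, vod: 1
  }
}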