New statistics code with sessions.

This commit is contained in:
Thulinma 2014-11-28 16:30:35 +01:00
parent baa29b55c5
commit 63c4e5d1e6
2 changed files with 350 additions and 157 deletions

View file

@ -2,9 +2,6 @@
#include <mist/config.h>
#include "controller_statistics.h"
/// The STAT_CUTOFF define sets how many seconds of statistics history is kept.
#define STAT_CUTOFF 600
// These are used to store "clients" field requests in a bitfield for speedup.
#define STAT_CLI_HOST 1
#define STAT_CLI_STREAM 2
@ -24,31 +21,23 @@
#define STAT_TOT_ALL 0xFF
std::multimap<unsigned long long int, Controller::statStorage> Controller::oldConns;///<Old connections, sorted on disconnect timestamp
std::map<unsigned long, Controller::statStorage> Controller::curConns;///<Connection storage, sorted on page location.
std::map<Controller::sessIndex, Controller::statSession> Controller::sessions; ///< list of sessions that have statistics data available
std::map<unsigned long, Controller::sessIndex> Controller::connToSession; ///< Map of socket IDs to session info.
/// This function runs as a thread and roughly once per second retrieves
/// statistics from all connected clients, as well as wipes
/// old statistics that have disconnected over 10 minutes ago.
void Controller::SharedMemStats(void * config){
DEBUG_MSG(DLVL_HIGH, "Starting stats thread");
IPC::sharedServer statServer("statistics", STAT_EX_SIZE, true);
while(((Util::Config*)config)->is_active){
//parse current users
statServer.parseEach(parseStatistics);
//wipe old statistics
while (oldConns.size() && oldConns.begin()->first < (unsigned long long)(Util::epoch() - STAT_CUTOFF)){
oldConns.erase(oldConns.begin());
}
Util::sleep(1000);
}
DEBUG_MSG(DLVL_HIGH, "Stopping stats thread");
Controller::sessIndex::sessIndex(std::string dhost, unsigned int dcrc, std::string dstreamName, std::string dconnector){
host = dhost;
crc = dcrc;
streamName = dstreamName;
connector = dconnector;
}
/// This function is called by parseStatistics.
/// It updates the internally saved statistics data.
void Controller::statStorage::update(IPC::statExchange & data) {
if (!streamName.size()){
Controller::sessIndex::sessIndex(){
crc = 0;
}
/// Initializes a sessIndex from a statExchange object, converting binary format IP addresses into strings.
/// This extracts the host, stream name, connector and crc field, ignoring everything else.
Controller::sessIndex::sessIndex(IPC::statExchange & data){
std::string tHost = data.host();
if (tHost.substr(0, 12) == std::string("\000\000\000\000\000\000\000\000\000\000\377\377", 12)){
char tmpstr[16];
@ -60,11 +49,251 @@ void Controller::statStorage::update(IPC::statExchange & data) {
host = tmpstr;
}
streamName = data.streamName();
}
if (!connector.size()){
connector = data.connector();
}
crc = data.crc();
}
bool Controller::sessIndex::operator== (const Controller::sessIndex &b) const{
return (host == b.host && crc == b.crc && streamName == b.streamName && connector == b.connector);
}
bool Controller::sessIndex::operator!= (const Controller::sessIndex &b) const{
return !(*this == b);
}
bool Controller::sessIndex::operator> (const Controller::sessIndex &b) const{
return host > b.host || (host == b.host && (crc > b.crc || (crc == b.crc && (streamName > b.streamName || (streamName == b.streamName && connector > b.connector)))));
}
bool Controller::sessIndex::operator< (const Controller::sessIndex &b) const{
return host < b.host || (host == b.host && (crc < b.crc || (crc == b.crc && (streamName < b.streamName || (streamName == b.streamName && connector < b.connector)))));
}
bool Controller::sessIndex::operator<= (const Controller::sessIndex &b) const{
return !(*this > b);
}
bool Controller::sessIndex::operator>= (const Controller::sessIndex &b) const{
return !(*this < b);
}
/// This function runs as a thread and roughly once per second retrieves
/// statistics from all connected clients, as well as wipes
/// old statistics that have disconnected over 10 minutes ago.
void Controller::SharedMemStats(void * config){
DEBUG_MSG(DLVL_HIGH, "Starting stats thread");
IPC::sharedServer statServer("statistics", STAT_EX_SIZE, true);
while(((Util::Config*)config)->is_active){
//parse current users
statServer.parseEach(parseStatistics);
//wipe old statistics
/// \todo Loop over all sessions and trigger erase function
Util::sleep(1000);
}
DEBUG_MSG(DLVL_HIGH, "Stopping stats thread");
}
/// Updates the given active connection with new stats data.
void Controller::statSession::update(unsigned long index, IPC::statExchange & data){
curConns[index].update(data);
//store timestamp of last received data, if newer
if (data.now() > lastSec){
lastSec = data.now();
}
//store timestamp of first received data, if not known yet or older
if (!firstSec || firstSec > data.now()){
firstSec = data.now();
}
}
/// Archives the given connection.
void Controller::statSession::finish(unsigned long index){
oldConns.push_back(curConns[index]);
curConns.erase(index);
}
/// Moves the given connection to the given session
void Controller::statSession::switchOverTo(statSession & newSess, unsigned long index){
newSess.curConns[index] = curConns[index];
//if this connection has data, update firstSec/lastSec if needed
if (curConns[index].log.size()){
if (!newSess.firstSec || newSess.firstSec > curConns[index].log.begin()->first){
newSess.firstSec = curConns[index].log.begin()->first;
}
if (newSess.lastSec < curConns[index].log.rbegin()->first){
newSess.lastSec = curConns[index].log.rbegin()->first;
}
/// \todo Correct local firstSec/lastSec - we may have just deleted either (or both) end(s) of the data for this session.
}
curConns.erase(index);
}
/// Returns the first measured timestamp in this session.
unsigned long long Controller::statSession::getStart(){
return firstSec;
}
/// Returns the last measured timestamp in this session.
unsigned long long Controller::statSession::getEnd(){
return lastSec;
}
/// Returns true if there is data for this session at timestamp t.
bool Controller::statSession::hasDataFor(unsigned long long t){
if (lastSec < t){return false;}
if (firstSec > t){return false;}
if (oldConns.size()){
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
if (it->hasDataFor(t)){return true;}
}
}
if (curConns.size()){
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
if (it->second.hasDataFor(t)){return true;}
}
}
return false;
}
/// Returns the cumulative connected time for this session at timestamp t.
long long Controller::statSession::getConnTime(unsigned long long t){
long long retVal = 0;
if (oldConns.size()){
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
if (it->hasDataFor(t)){
retVal += it->getDataFor(t).time;
}
}
}
if (curConns.size()){
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
if (it->second.hasDataFor(t)){
retVal += it->second.getDataFor(t).time;
}
}
}
return retVal;
}
/// Returns the last requested media timestamp for this session at timestamp t.
long long Controller::statSession::getLastSecond(unsigned long long t){
if (curConns.size()){
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
if (it->second.hasDataFor(t)){
return it->second.getDataFor(t).lastSecond;
}
}
}
if (oldConns.size()){
for (std::deque<statStorage>::reverse_iterator it = oldConns.rbegin(); it != oldConns.rend(); ++it){
if (it->hasDataFor(t)){
return it->getDataFor(t).lastSecond;
}
}
}
return 0;
}
/// Returns the cumulative downloaded bytes for this session at timestamp t.
long long Controller::statSession::getDown(unsigned long long t){
long long retVal = 0;
if (oldConns.size()){
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
if (it->hasDataFor(t)){
retVal += it->getDataFor(t).down;
}
}
}
if (curConns.size()){
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
if (it->second.hasDataFor(t)){
retVal += it->second.getDataFor(t).down;
}
}
}
return retVal;
}
/// Returns the cumulative uploaded bytes for this session at timestamp t.
long long Controller::statSession::getUp(unsigned long long t){
long long retVal = 0;
if (oldConns.size()){
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
if (it->hasDataFor(t)){
retVal += it->getDataFor(t).up;
}
}
}
if (curConns.size()){
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
if (it->second.hasDataFor(t)){
retVal += it->second.getDataFor(t).up;
}
}
}
return retVal;
}
/// Returns the cumulative downloaded bytes per second for this session at timestamp t.
long long Controller::statSession::getBpsDown(unsigned long long t){
unsigned long long aTime = t - 5;
if (aTime < firstSec){
aTime = firstSec;
}
long long valA = getDown(aTime);
long long valB = getDown(t);
if (t > aTime){
//INFO_MSG("Saying the speed from time %lli to %lli (being %lli - %lli) is %lli.", aTime, t, valB, valA, (valB - valA) / (t - aTime));
return (valB - valA) / (t - aTime);
}else{
//INFO_MSG("Saying the speed from time %lli to %lli (being %lli - %lli) is %lli.", aTime, t, valB, valA, 0);
return 0;
}
}
/// Returns the cumulative uploaded bytes per second for this session at timestamp t.
long long Controller::statSession::getBpsUp(unsigned long long t){
unsigned long long aTime = t - 5;
if (aTime < firstSec){
aTime = firstSec;
}
long long valA = getUp(aTime);
long long valB = getUp(t);
if (t > aTime){
return (valB - valA) / (t - aTime);
}else{
return 0;
}
}
/// Returns true if there is data available for timestamp t.
bool Controller::statStorage::hasDataFor(unsigned long long t) {
if (!log.size()){return false;}
return (t >= log.begin()->first);
}
/// Returns a reference to the most current data available at timestamp t.
Controller::statLog & Controller::statStorage::getDataFor(unsigned long long t) {
static statLog empty;
if (!log.size()){
empty.time = 0;
empty.lastSecond = 0;
empty.down = 0;
empty.up = 0;
return empty;
}
std::map<unsigned long long, statLog>::iterator it = log.upper_bound(t);
if (it != log.begin()){
it--;
}
return it->second;
}
/// This function is called by parseStatistics.
/// It updates the internally saved statistics data.
void Controller::statStorage::update(IPC::statExchange & data) {
statLog tmp;
tmp.time = data.time();
tmp.lastSecond = data.lastSecond();
@ -72,28 +301,40 @@ void Controller::statStorage::update(IPC::statExchange & data) {
tmp.up = data.up();
log[data.now()] = tmp;
//wipe data older than approx. STAT_CUTOFF seconds
/// \todo Remove least interesting data first.
if (log.size() > STAT_CUTOFF){
log.erase(log.begin());
}
}
/// This function is called by the shared memory page that holds statistics.
/// It updates the internally saved statistics data, archiving if neccessary.
/// It updates the internally saved statistics data, moving across sessions or archiving when neccessary.
void Controller::parseStatistics(char * data, size_t len, unsigned int id){
//retrieve stats data
IPC::statExchange tmpEx(data);
curConns[id].update(tmpEx);
//calculate the current session index, store as idx.
sessIndex idx(tmpEx);
//if the connection was already indexed and it has changed, move it
if (connToSession.count(id) && connToSession[id] != idx){
sessions[connToSession[id]].switchOverTo(sessions[idx], id);
}
//store the index for later comparison
connToSession[id] = idx;
//update the session with the latest data
sessions[idx].update(id, tmpEx);
//check validity of stats data
char counter = (*(data - 1));
if (counter == 126 || counter == 127 || counter == 254 || counter == 255){
oldConns.insert(std::pair<unsigned long long int, statStorage>(Util::epoch(), curConns[id]));
curConns.erase(id);
//the data is no longer valid - connection has gone away, store for later
sessions[idx].finish(id);
}
}
/// Returns true if this stream has at least one connected client.
bool Controller::hasViewers(std::string streamName){
if (curConns.size()){
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); it++){
if (it->second.streamName == streamName){
if (sessions.size()){
for (std::map<sessIndex, statSession>::iterator it = sessions.begin(); it != sessions.end(); it++){
if (it->first.streamName == streamName){
return true;
}
}
@ -137,8 +378,6 @@ bool Controller::hasViewers(std::string streamName){
/// ~~~~~~~~~~~~~~~
/// In case of the second method, the response is an array in the same order as the requests.
void Controller::fillClients(JSON::Value & req, JSON::Value & rep){
//memorize the current system time
long long currTime = Util::epoch();
//first, figure out the timestamp wanted
long long int reqTime = 0;
if (req.isMember("time")){
@ -198,88 +437,29 @@ void Controller::fillClients(JSON::Value & req, JSON::Value & rep){
if (fields & STAT_CLI_CRC){rep["fields"].append("crc");}
//output the data itself
rep["data"].null();
//start with current connections
if (curConns.size()){
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); it++){
if (!it->second.log.size()){continue;}
//ignore users that haven't been updated in the last 5 seconds.
if (it->second.log.rbegin()->first < currTime - 5){
continue;
}
//loop over all sessions
if (sessions.size()){
for (std::map<sessIndex, statSession>::iterator it = sessions.begin(); it != sessions.end(); it++){
unsigned long long time = reqTime;
if (now){time = it->second.log.rbegin()->first;}
if (now && reqTime - it->second.getEnd() < 5){time = it->second.getEnd();}
//data present and wanted? insert it!
if ((it->second.log.rbegin()->first >= time && it->second.log.begin()->first <= time) && (!streams.size() || streams.count(it->second.streamName)) && (!protos.size() || protos.count(it->second.connector))){
if ((it->second.getEnd() >= time && it->second.getStart() <= time) && (!streams.size() || streams.count(it->first.streamName)) && (!protos.size() || protos.count(it->first.connector))){
if (it->second.hasDataFor(time)){
JSON::Value d;
std::map<unsigned long long, statLog>::iterator statRef = it->second.log.lower_bound(time);
std::map<unsigned long long, statLog>::iterator prevRef = --(it->second.log.lower_bound(time));
if (fields & STAT_CLI_HOST){d.append(it->second.host);}
if (fields & STAT_CLI_STREAM){d.append(it->second.streamName);}
if (fields & STAT_CLI_PROTO){d.append(it->second.connector);}
if (fields & STAT_CLI_CONNTIME){d.append((long long)statRef->second.time);}
if (fields & STAT_CLI_POSITION){d.append((long long)statRef->second.lastSecond);}
if (fields & STAT_CLI_DOWN){d.append(statRef->second.down);}
if (fields & STAT_CLI_UP){d.append(statRef->second.up);}
if (fields & STAT_CLI_BPS_DOWN){
if (statRef != it->second.log.begin()){
unsigned int diff = statRef->first - prevRef->first;
d.append((statRef->second.down - prevRef->second.down) / diff);
}else{
d.append(statRef->second.down);
}
}
if (fields & STAT_CLI_BPS_UP){
if (statRef != it->second.log.begin()){
unsigned int diff = statRef->first - prevRef->first;
d.append((statRef->second.up - prevRef->second.up) / diff);
}else{
d.append(statRef->second.up);
}
}
if (fields & STAT_CLI_CRC){d.append((long long)it->second.crc);}
if (fields & STAT_CLI_HOST){d.append(it->first.host);}
if (fields & STAT_CLI_STREAM){d.append(it->first.streamName);}
if (fields & STAT_CLI_PROTO){d.append(it->first.connector);}
if (fields & STAT_CLI_CONNTIME){d.append(it->second.getConnTime(time));}
if (fields & STAT_CLI_POSITION){d.append(it->second.getLastSecond(time));}
if (fields & STAT_CLI_DOWN){d.append(it->second.getDown(time));}
if (fields & STAT_CLI_UP){d.append(it->second.getUp(time));}
if (fields & STAT_CLI_BPS_DOWN){d.append(it->second.getBpsDown(time));}
if (fields & STAT_CLI_BPS_UP){d.append(it->second.getBpsUp(time));}
if (fields & STAT_CLI_CRC){d.append((long long)it->first.crc);}
rep["data"].append(d);
}
}
}
//if we're only interested in current, don't even bother looking at history
if (now){
return;
}
//look at history
if (oldConns.size()){
for (std::map<unsigned long long int, statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); it++){
//data present and wanted? insert it!
if ((it->second.log.rbegin()->first >= (unsigned long long)reqTime && it->second.log.begin()->first <= (unsigned long long)reqTime) && (!streams.size() || streams.count(it->second.streamName)) && (!protos.size() || protos.count(it->second.connector))){
JSON::Value d;
std::map<unsigned long long, statLog>::iterator statRef = it->second.log.lower_bound(reqTime);
std::map<unsigned long long, statLog>::iterator prevRef = --(it->second.log.lower_bound(reqTime));
if (fields & STAT_CLI_HOST){d.append(it->second.host);}
if (fields & STAT_CLI_STREAM){d.append(it->second.streamName);}
if (fields & STAT_CLI_PROTO){d.append(it->second.connector);}
if (fields & STAT_CLI_CONNTIME){d.append((long long)statRef->second.time);}
if (fields & STAT_CLI_POSITION){d.append((long long)statRef->second.lastSecond);}
if (fields & STAT_CLI_DOWN){d.append(statRef->second.down);}
if (fields & STAT_CLI_UP){d.append(statRef->second.up);}
if (fields & STAT_CLI_BPS_DOWN){
if (statRef != it->second.log.begin()){
unsigned int diff = statRef->first - prevRef->first;
d.append((statRef->second.down - prevRef->second.down) / diff);
}else{
d.append(statRef->second.down);
}
}
if (fields & STAT_CLI_BPS_UP){
if (statRef != it->second.log.begin()){
unsigned int diff = statRef->first - prevRef->first;
d.append((statRef->second.up - prevRef->second.up) / diff);
}else{
d.append(statRef->second.up);
}
}
if (fields & STAT_CLI_CRC){d.append((long long)it->second.crc);}
rep["data"].append(d);
}
}
}
//all done! return is by reference, so no need to return anything here.
}
@ -396,50 +576,16 @@ void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){
if (fields & STAT_TOT_BPS_UP){rep["fields"].append("upbps");}
//start data collection
std::map<long long unsigned int, totalsData> totalsCount;
//start with current connections
if (curConns.size()){
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); it++){
//loop over all sessions
/// \todo Make the interval configurable instead of 1 second
if (sessions.size()){
for (std::map<sessIndex, statSession>::iterator it = sessions.begin(); it != sessions.end(); it++){
//data present and wanted? insert it!
if (it->second.log.size() > 1 && (it->second.log.rbegin()->first >= (unsigned long long)reqStart || it->second.log.begin()->first <= (unsigned long long)reqEnd) && (!streams.size() || streams.count(it->second.streamName)) && (!protos.size() || protos.count(it->second.connector))){
//keep track of the previous and current, starting at position 2 so there's always a delta down/up value.
std::map<unsigned long long, statLog>::iterator pi = it->second.log.begin();
for (std::map<unsigned long long, statLog>::iterator li = ++(it->second.log.begin()); li != it->second.log.end(); li++){
if (li->first < (unsigned long long)reqStart || pi->first > (unsigned long long)reqEnd){
continue;
if ((it->second.getEnd() >= (unsigned long long)reqStart || it->second.getStart() <= (unsigned long long)reqEnd) && (!streams.size() || streams.count(it->first.streamName)) && (!protos.size() || protos.count(it->first.connector))){
for (unsigned long long i = reqStart; i <= reqEnd; ++i){
if (it->second.hasDataFor(i)){
totalsCount[i].add(it->second.getBpsDown(i), it->second.getBpsUp(i));
}
unsigned int diff = li->first - pi->first;
unsigned int ddown = (li->second.down - pi->second.down) / diff;
unsigned int dup = (li->second.up - pi->second.up) / diff;
for (long long unsigned int t = pi->first; t < li->first; t++){
if (t >= (unsigned long long)reqStart && t <= (unsigned long long)reqEnd){
totalsCount[t].add(ddown, dup);
}
}
pi = li;//set previous iterator to log iterator
}
}
}
}
//look at history
if (oldConns.size()){
for (std::map<unsigned long long int, statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); it++){
//data present and wanted? insert it!
if (it->second.log.size() > 1 && (it->second.log.rbegin()->first >= (unsigned long long)reqStart || it->second.log.begin()->first <= (unsigned long long)reqEnd) && (!streams.size() || streams.count(it->second.streamName)) && (!protos.size() || protos.count(it->second.connector))){
//keep track of the previous and current, starting at position 2 so there's always a delta down/up value.
std::map<unsigned long long, statLog>::iterator pi = it->second.log.begin();
for (std::map<unsigned long long, statLog>::iterator li = ++(it->second.log.begin()); li != it->second.log.end(); li++){
if (li->first < (unsigned long long)reqStart || pi->first > (unsigned long long)reqEnd){
continue;
}
unsigned int diff = li->first - pi->first;
unsigned int ddown = (li->second.down - pi->second.down) / diff;
unsigned int dup = (li->second.up - pi->second.up) / diff;
for (long long unsigned int t = pi->first; t < li->first; t++){
if (t >= (unsigned long long)reqStart && t <= (unsigned long long)reqEnd){
totalsCount[t].add(ddown, dup);
}
}
pi = li;//set previous iterator to log iterator
}
}
}

View file

@ -5,6 +5,9 @@
#include <string>
#include <map>
/// The STAT_CUTOFF define sets how many seconds of statistics history is kept.
#define STAT_CUTOFF 600
namespace Controller {
struct statLog {
@ -14,19 +17,63 @@ namespace Controller {
long long up;
};
class statStorage {
/// This is a comparison and storage class that keeps sessions apart from each other.
/// Whenever two of these objects are not equal, it will create a new session.
class sessIndex {
public:
void update(IPC::statExchange & data);
sessIndex(std::string host, unsigned int crc, std::string streamName, std::string connector);
sessIndex(IPC::statExchange & data);
sessIndex();
std::string host;
unsigned int crc;
std::string streamName;
std::string connector;
std::map<unsigned long long, statLog> log;
bool operator== (const sessIndex &o) const;
bool operator!= (const sessIndex &o) const;
bool operator> (const sessIndex &o) const;
bool operator<= (const sessIndex &o) const;
bool operator< (const sessIndex &o) const;
bool operator>= (const sessIndex &o) const;
};
extern std::multimap<unsigned long long int, statStorage> oldConns;
extern std::map<unsigned long, statStorage> curConns;
class statStorage {
public:
void update(IPC::statExchange & data);
bool hasDataFor(unsigned long long);
statLog & getDataFor(unsigned long long);
std::map<unsigned long long, statLog> log;
};
/// A session class that keeps track of both current and archived connections.
/// Allows for moving of connections to another session.
class statSession {
private:
unsigned long long firstSec;
unsigned long long lastSec;
std::deque<statStorage> oldConns;
std::map<unsigned long, statStorage> curConns;
public:
void finish(unsigned long index);
void switchOverTo(statSession & newSess, unsigned long index);
void update(unsigned long index, IPC::statExchange & data);
unsigned long long getStart();
unsigned long long getEnd();
bool hasDataFor(unsigned long long time);
long long getConnTime(unsigned long long time);
long long getLastSecond(unsigned long long time);
long long getDown(unsigned long long time);
long long getUp(unsigned long long time);
long long getBpsDown(unsigned long long time);
long long getBpsUp(unsigned long long time);
long long getBpsDown(unsigned long long start, unsigned long long end);
long long getBpsUp(unsigned long long start, unsigned long long end);
};
extern std::map<sessIndex, statSession> sessions;
extern std::map<unsigned long, sessIndex> connToSession;
void parseStatistics(char * data, size_t len, unsigned int id);
void fillClients(JSON::Value & req, JSON::Value & rep);
void fillTotals(JSON::Value & req, JSON::Value & rep);