diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index 713dbef0..438b4d32 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -123,7 +123,11 @@ void Controller::SharedMemStats(void * config){ if (sessions.size()){ std::list mustWipe; unsigned long long cutOffPoint = Util::epoch() - STAT_CUTOFF; + unsigned long long disconnectPointIn = Util::epoch() - STATS_INPUT_DELAY; + unsigned long long disconnectPointOut = Util::epoch() - STATS_DELAY; for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ + unsigned long long dPoint = it->second.getSessType() == SESS_INPUT ? disconnectPointIn : disconnectPointOut; + it->second.ping(it->first, dPoint); it->second.wipeOld(cutOffPoint); if (!it->second.hasData()){ mustWipe.push_back(it->first); @@ -224,6 +228,10 @@ void Controller::statSession::update(unsigned long index, IPC::statExchange & da //store timestamp of last received data, if newer if (data.now() > lastSec){ lastSec = data.now(); + if (!tracked){ + tracked = true; + firstActive = firstSec; + } } long long currDown = getDown(); long long currUp = getUp(); @@ -285,6 +293,10 @@ void Controller::statSession::wipeOld(unsigned long long cutOff){ if (oldConns.size()){ for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ while (it->log.size() && it->log.begin()->first < cutOff){ + if (it->log.size() == 1){ + wipedDown += it->log.begin()->second.down; + wipedUp += it->log.begin()->second.up; + } it->log.erase(it->log.begin()); } if (it->log.size()){ @@ -312,6 +324,7 @@ void Controller::statSession::wipeOld(unsigned long long cutOff){ } void Controller::statSession::ping(const Controller::sessIndex & index, unsigned long long disconnectPoint){ + if (!tracked){return;} if (lastSec < disconnectPoint){ switch (sessionType){ case SESS_INPUT: @@ -324,9 +337,10 @@ void Controller::statSession::ping(const Controller::sessIndex & index, unsigned if (streamStats[index.streamName].currViews){streamStats[index.streamName].currViews--;} break; } - uint64_t duration = lastSec - firstSec; + uint64_t duration = lastSec - firstActive; if (duration < 1){duration = 1;} Controller::logAccess("", index.streamName, index.connector, index.host, duration, getUp(), getDown(), ""); + tracked = false; } } @@ -338,8 +352,12 @@ void Controller::statSession::finish(unsigned long index){ /// Constructs an empty session Controller::statSession::statSession(){ + firstActive = 0; + tracked = false; firstSec = 0xFFFFFFFFFFFFFFFFull; lastSec = 0; + wipedUp = 0; + wipedDown = 0; sessionType = SESS_UNSET; } @@ -437,6 +455,28 @@ bool Controller::statSession::isViewerOn(unsigned long long t){ return getUp(t) + getDown(t) > COUNTABLE_BYTES; } +/// Returns true if this session should count as a viewer +bool Controller::statSession::isViewer(){ + long long upTotal = wipedUp+wipedDown; + if (oldConns.size()){ + for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ + if (it->log.size()){ + upTotal += it->log.rbegin()->second.up + it->log.rbegin()->second.down; + if (upTotal > COUNTABLE_BYTES){return true;} + } + } + } + if (curConns.size()){ + for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ + if (it->second.log.size()){ + upTotal += it->second.log.rbegin()->second.up + it->second.log.rbegin()->second.down; + if (upTotal > COUNTABLE_BYTES){return true;} + } + } + } + return false; +} + /// Returns the cumulative connected time for this session at timestamp t. long long Controller::statSession::getConnTime(unsigned long long t){ long long retVal = 0; @@ -478,7 +518,7 @@ long long Controller::statSession::getLastSecond(unsigned long long t){ /// Returns the cumulative downloaded bytes for this session at timestamp t. long long Controller::statSession::getDown(unsigned long long t){ - long long retVal = 0; + long long retVal = wipedDown; if (oldConns.size()){ for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ if (it->hasDataFor(t)){ @@ -498,7 +538,7 @@ long long Controller::statSession::getDown(unsigned long long t){ /// Returns the cumulative uploaded bytes for this session at timestamp t. long long Controller::statSession::getUp(unsigned long long t){ - long long retVal = 0; + long long retVal = wipedUp; if (oldConns.size()){ for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ if (it->hasDataFor(t)){ @@ -518,7 +558,14 @@ long long Controller::statSession::getUp(unsigned long long t){ /// Returns the cumulative downloaded bytes for this session at timestamp t. long long Controller::statSession::getDown(){ - long long retVal = 0; + long long retVal = wipedDown; + if (oldConns.size()){ + for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ + if (it->log.size()){ + retVal += it->log.rbegin()->second.down; + } + } + } if (curConns.size()){ for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ if (it->second.log.size()){ @@ -531,7 +578,14 @@ long long Controller::statSession::getDown(){ /// Returns the cumulative uploaded bytes for this session at timestamp t. long long Controller::statSession::getUp(){ - long long retVal = 0; + long long retVal = wipedUp; + if (oldConns.size()){ + for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ + if (it->log.size()){ + retVal += it->log.rbegin()->second.up; + } + } + } if (curConns.size()){ for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ if (it->second.log.size()){ @@ -574,11 +628,6 @@ long long Controller::statSession::getBpsUp(unsigned long long t){ } } -Controller::statStorage::statStorage(){ - removeDown = 0; - removeUp = 0; -} - /// Returns true if there is data available for timestamp t. bool Controller::statStorage::hasDataFor(unsigned long long t) { if (!log.size()){return false;} @@ -608,13 +657,8 @@ void Controller::statStorage::update(IPC::statExchange & data) { statLog tmp; tmp.time = data.time(); tmp.lastSecond = data.lastSecond(); - tmp.down = data.down() - removeDown; - tmp.up = data.up() - removeUp; - if (!log.size() && tmp.down + tmp.up > COUNTABLE_BYTES){ - //substract the start values if they are too high - this is a resumed connection of some sort - removeDown = tmp.down; - removeUp = tmp.up; - } + tmp.down = data.down(); + tmp.up = data.up(); log[data.now()] = tmp; //wipe data older than approx. STAT_CUTOFF seconds /// \todo Remove least interesting data first. @@ -651,6 +695,10 @@ void Controller::parseStatistics(char * data, size_t len, unsigned int id){ INSANE_MSG("Ended connection: %lu as %s", id, idx.toStr().c_str()); sessions[idx].finish(id); connToSession.erase(id); + }else{ + if (sessions[idx].getSessType() != SESS_OUTPUT && sessions[idx].getSessType() != SESS_UNSET){ + std::string strmName = tmpEx.streamName(); + } } } diff --git a/src/controller/controller_statistics.h b/src/controller/controller_statistics.h index ebc3124d..31c6e4f0 100644 --- a/src/controller/controller_statistics.h +++ b/src/controller/controller_statistics.h @@ -49,11 +49,7 @@ namespace Controller { class statStorage { - private: - long long removeUp; - long long removeDown; public: - statStorage(); void update(IPC::statExchange & data); bool hasDataFor(unsigned long long); statLog & getDataFor(unsigned long long); @@ -64,10 +60,14 @@ namespace Controller { /// Allows for moving of connections to another session. class statSession { private: + uint64_t firstActive; unsigned long long firstSec; unsigned long long lastSec; + unsigned long long wipedUp; + unsigned long long wipedDown; std::deque oldConns; sessType sessionType; + bool tracked; public: statSession(); std::map curConns; @@ -80,6 +80,7 @@ namespace Controller { unsigned long long getStart(); unsigned long long getEnd(); bool isViewerOn(unsigned long long time); + bool isViewer(); bool hasDataFor(unsigned long long time); bool hasData(); long long getConnTime(unsigned long long time); diff --git a/src/output/output.h b/src/output/output.h index 009cc6d3..21391cd7 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -82,13 +82,15 @@ namespace Mist { int pageNumForKey(long unsigned int trackId, long long int keyNum); int pageNumMax(long unsigned int trackId); unsigned int lastStats;///