Fix statistics in Free edition
This commit is contained in:
parent
dff150419b
commit
b2995ea2db
5 changed files with 88 additions and 31 deletions
|
@ -123,7 +123,11 @@ void Controller::SharedMemStats(void * config){
|
|||
if (sessions.size()){
|
||||
std::list<sessIndex> mustWipe;
|
||||
unsigned long long cutOffPoint = Util::epoch() - STAT_CUTOFF;
|
||||
unsigned long long disconnectPointIn = Util::epoch() - STATS_INPUT_DELAY;
|
||||
unsigned long long disconnectPointOut = Util::epoch() - STATS_DELAY;
|
||||
for (std::map<sessIndex, statSession>::iterator it = sessions.begin(); it != sessions.end(); it++){
|
||||
unsigned long long dPoint = it->second.getSessType() == SESS_INPUT ? disconnectPointIn : disconnectPointOut;
|
||||
it->second.ping(it->first, dPoint);
|
||||
it->second.wipeOld(cutOffPoint);
|
||||
if (!it->second.hasData()){
|
||||
mustWipe.push_back(it->first);
|
||||
|
@ -224,6 +228,10 @@ void Controller::statSession::update(unsigned long index, IPC::statExchange & da
|
|||
//store timestamp of last received data, if newer
|
||||
if (data.now() > lastSec){
|
||||
lastSec = data.now();
|
||||
if (!tracked){
|
||||
tracked = true;
|
||||
firstActive = firstSec;
|
||||
}
|
||||
}
|
||||
long long currDown = getDown();
|
||||
long long currUp = getUp();
|
||||
|
@ -285,6 +293,10 @@ void Controller::statSession::wipeOld(unsigned long long cutOff){
|
|||
if (oldConns.size()){
|
||||
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
|
||||
while (it->log.size() && it->log.begin()->first < cutOff){
|
||||
if (it->log.size() == 1){
|
||||
wipedDown += it->log.begin()->second.down;
|
||||
wipedUp += it->log.begin()->second.up;
|
||||
}
|
||||
it->log.erase(it->log.begin());
|
||||
}
|
||||
if (it->log.size()){
|
||||
|
@ -312,6 +324,7 @@ void Controller::statSession::wipeOld(unsigned long long cutOff){
|
|||
}
|
||||
|
||||
void Controller::statSession::ping(const Controller::sessIndex & index, unsigned long long disconnectPoint){
|
||||
if (!tracked){return;}
|
||||
if (lastSec < disconnectPoint){
|
||||
switch (sessionType){
|
||||
case SESS_INPUT:
|
||||
|
@ -324,9 +337,10 @@ void Controller::statSession::ping(const Controller::sessIndex & index, unsigned
|
|||
if (streamStats[index.streamName].currViews){streamStats[index.streamName].currViews--;}
|
||||
break;
|
||||
}
|
||||
uint64_t duration = lastSec - firstSec;
|
||||
uint64_t duration = lastSec - firstActive;
|
||||
if (duration < 1){duration = 1;}
|
||||
Controller::logAccess("", index.streamName, index.connector, index.host, duration, getUp(), getDown(), "");
|
||||
tracked = false;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -338,8 +352,12 @@ void Controller::statSession::finish(unsigned long index){
|
|||
|
||||
/// Constructs an empty session
|
||||
Controller::statSession::statSession(){
|
||||
firstActive = 0;
|
||||
tracked = false;
|
||||
firstSec = 0xFFFFFFFFFFFFFFFFull;
|
||||
lastSec = 0;
|
||||
wipedUp = 0;
|
||||
wipedDown = 0;
|
||||
sessionType = SESS_UNSET;
|
||||
}
|
||||
|
||||
|
@ -437,6 +455,28 @@ bool Controller::statSession::isViewerOn(unsigned long long t){
|
|||
return getUp(t) + getDown(t) > COUNTABLE_BYTES;
|
||||
}
|
||||
|
||||
/// Returns true if this session should count as a viewer
|
||||
bool Controller::statSession::isViewer(){
|
||||
long long upTotal = wipedUp+wipedDown;
|
||||
if (oldConns.size()){
|
||||
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
|
||||
if (it->log.size()){
|
||||
upTotal += it->log.rbegin()->second.up + it->log.rbegin()->second.down;
|
||||
if (upTotal > COUNTABLE_BYTES){return true;}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (curConns.size()){
|
||||
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
|
||||
if (it->second.log.size()){
|
||||
upTotal += it->second.log.rbegin()->second.up + it->second.log.rbegin()->second.down;
|
||||
if (upTotal > COUNTABLE_BYTES){return true;}
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/// Returns the cumulative connected time for this session at timestamp t.
|
||||
long long Controller::statSession::getConnTime(unsigned long long t){
|
||||
long long retVal = 0;
|
||||
|
@ -478,7 +518,7 @@ long long Controller::statSession::getLastSecond(unsigned long long t){
|
|||
|
||||
/// Returns the cumulative downloaded bytes for this session at timestamp t.
|
||||
long long Controller::statSession::getDown(unsigned long long t){
|
||||
long long retVal = 0;
|
||||
long long retVal = wipedDown;
|
||||
if (oldConns.size()){
|
||||
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
|
||||
if (it->hasDataFor(t)){
|
||||
|
@ -498,7 +538,7 @@ long long Controller::statSession::getDown(unsigned long long t){
|
|||
|
||||
/// Returns the cumulative uploaded bytes for this session at timestamp t.
|
||||
long long Controller::statSession::getUp(unsigned long long t){
|
||||
long long retVal = 0;
|
||||
long long retVal = wipedUp;
|
||||
if (oldConns.size()){
|
||||
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
|
||||
if (it->hasDataFor(t)){
|
||||
|
@ -518,7 +558,14 @@ long long Controller::statSession::getUp(unsigned long long t){
|
|||
|
||||
/// Returns the cumulative downloaded bytes for this session at timestamp t.
|
||||
long long Controller::statSession::getDown(){
|
||||
long long retVal = 0;
|
||||
long long retVal = wipedDown;
|
||||
if (oldConns.size()){
|
||||
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
|
||||
if (it->log.size()){
|
||||
retVal += it->log.rbegin()->second.down;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (curConns.size()){
|
||||
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
|
||||
if (it->second.log.size()){
|
||||
|
@ -531,7 +578,14 @@ long long Controller::statSession::getDown(){
|
|||
|
||||
/// Returns the cumulative uploaded bytes for this session at timestamp t.
|
||||
long long Controller::statSession::getUp(){
|
||||
long long retVal = 0;
|
||||
long long retVal = wipedUp;
|
||||
if (oldConns.size()){
|
||||
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
|
||||
if (it->log.size()){
|
||||
retVal += it->log.rbegin()->second.up;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (curConns.size()){
|
||||
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
|
||||
if (it->second.log.size()){
|
||||
|
@ -574,11 +628,6 @@ long long Controller::statSession::getBpsUp(unsigned long long t){
|
|||
}
|
||||
}
|
||||
|
||||
Controller::statStorage::statStorage(){
|
||||
removeDown = 0;
|
||||
removeUp = 0;
|
||||
}
|
||||
|
||||
/// Returns true if there is data available for timestamp t.
|
||||
bool Controller::statStorage::hasDataFor(unsigned long long t) {
|
||||
if (!log.size()){return false;}
|
||||
|
@ -608,13 +657,8 @@ void Controller::statStorage::update(IPC::statExchange & data) {
|
|||
statLog tmp;
|
||||
tmp.time = data.time();
|
||||
tmp.lastSecond = data.lastSecond();
|
||||
tmp.down = data.down() - removeDown;
|
||||
tmp.up = data.up() - removeUp;
|
||||
if (!log.size() && tmp.down + tmp.up > COUNTABLE_BYTES){
|
||||
//substract the start values if they are too high - this is a resumed connection of some sort
|
||||
removeDown = tmp.down;
|
||||
removeUp = tmp.up;
|
||||
}
|
||||
tmp.down = data.down();
|
||||
tmp.up = data.up();
|
||||
log[data.now()] = tmp;
|
||||
//wipe data older than approx. STAT_CUTOFF seconds
|
||||
/// \todo Remove least interesting data first.
|
||||
|
@ -651,6 +695,10 @@ void Controller::parseStatistics(char * data, size_t len, unsigned int id){
|
|||
INSANE_MSG("Ended connection: %lu as %s", id, idx.toStr().c_str());
|
||||
sessions[idx].finish(id);
|
||||
connToSession.erase(id);
|
||||
}else{
|
||||
if (sessions[idx].getSessType() != SESS_OUTPUT && sessions[idx].getSessType() != SESS_UNSET){
|
||||
std::string strmName = tmpEx.streamName();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -49,11 +49,7 @@ namespace Controller {
|
|||
|
||||
|
||||
class statStorage {
|
||||
private:
|
||||
long long removeUp;
|
||||
long long removeDown;
|
||||
public:
|
||||
statStorage();
|
||||
void update(IPC::statExchange & data);
|
||||
bool hasDataFor(unsigned long long);
|
||||
statLog & getDataFor(unsigned long long);
|
||||
|
@ -64,10 +60,14 @@ namespace Controller {
|
|||
/// Allows for moving of connections to another session.
|
||||
class statSession {
|
||||
private:
|
||||
uint64_t firstActive;
|
||||
unsigned long long firstSec;
|
||||
unsigned long long lastSec;
|
||||
unsigned long long wipedUp;
|
||||
unsigned long long wipedDown;
|
||||
std::deque<statStorage> oldConns;
|
||||
sessType sessionType;
|
||||
bool tracked;
|
||||
public:
|
||||
statSession();
|
||||
std::map<unsigned long, statStorage> curConns;
|
||||
|
@ -80,6 +80,7 @@ namespace Controller {
|
|||
unsigned long long getStart();
|
||||
unsigned long long getEnd();
|
||||
bool isViewerOn(unsigned long long time);
|
||||
bool isViewer();
|
||||
bool hasDataFor(unsigned long long time);
|
||||
bool hasData();
|
||||
long long getConnTime(unsigned long long time);
|
||||
|
|
|
@ -82,13 +82,15 @@ namespace Mist {
|
|||
int pageNumForKey(long unsigned int trackId, long long int keyNum);
|
||||
int pageNumMax(long unsigned int trackId);
|
||||
unsigned int lastStats;///<Time of last sending of stats.
|
||||
long long unsigned int firstTime;///< Time of first packet after last seek. Used for real-time sending.
|
||||
std::map<unsigned long, unsigned long> nxtKeyNum;///< Contains the number of the next key, for page seeking purposes.
|
||||
std::set<sortedPageInfo> buffer;///< A sorted list of next-to-be-loaded packets.
|
||||
bool sought;///<If a seek has been done, this is set to true. Used for seeking on prepareNext().
|
||||
protected://these are to be messed with by child classes
|
||||
bool pushing;
|
||||
std::string UA; ///< User Agent string, if known.
|
||||
uint16_t uaDelay;///<Seconds to wait before setting the UA.
|
||||
uint64_t lastRecv;
|
||||
long long unsigned int firstTime;///< Time of first packet after last seek. Used for real-time sending.
|
||||
virtual std::string getConnectedHost();
|
||||
virtual std::string getConnectedBinHost();
|
||||
virtual std::string getStatsName();
|
||||
|
|
|
@ -104,7 +104,11 @@ namespace Mist {
|
|||
}
|
||||
char lineBuf[400];
|
||||
|
||||
snprintf(lineBuf, 400, "#EXTINF:%f,\r\n%lld_%lld.ts\r\n", (double)duration/1000, starttime, starttime + duration);
|
||||
if (sessId.size()){
|
||||
snprintf(lineBuf, 400, "#EXTINF:%f,\r\n%lld_%lld.ts?sessId=%s\r\n", (double)duration/1000, starttime, starttime + duration, sessId.c_str());
|
||||
}else{
|
||||
snprintf(lineBuf, 400, "#EXTINF:%f,\r\n%lld_%lld.ts\r\n", (double)duration/1000, starttime, starttime + duration);
|
||||
}
|
||||
durs.push_back(duration);
|
||||
total_dur += duration;
|
||||
lines.push_back(lineBuf);
|
||||
|
|
|
@ -12,9 +12,6 @@ namespace Mist {
|
|||
if (config->getString("ip").size()){
|
||||
myConn.setHost(config->getString("ip"));
|
||||
}
|
||||
if (config->getString("streamname").size()){
|
||||
streamName = config->getString("streamname");
|
||||
}
|
||||
config->activate();
|
||||
}
|
||||
|
||||
|
@ -245,6 +242,9 @@ namespace Mist {
|
|||
|
||||
void HTTPOutput::onRequest(){
|
||||
while (H.Read(myConn)){
|
||||
if (H.hasHeader("User-Agent")){
|
||||
UA = H.GetHeader("User-Agent");
|
||||
}
|
||||
if (hasSessionIDs()){
|
||||
if (H.GetVar("sessId").size()){
|
||||
std::string ua = H.GetVar("sessId");
|
||||
|
@ -254,8 +254,8 @@ namespace Mist {
|
|||
crc = checksum::crc32(0, ua.data(), ua.size());
|
||||
}
|
||||
}else{
|
||||
std::string ua = H.GetHeader("User-Agent") + H.GetHeader("X-Playback-Session-Id");
|
||||
crc = checksum::crc32(0, ua.data(), ua.size());
|
||||
std::string mixed_ua = UA + H.GetHeader("X-Playback-Session-Id");
|
||||
crc = checksum::crc32(0, mixed_ua.data(), mixed_ua.size());
|
||||
}
|
||||
|
||||
INFO_MSG("Received request %s", H.getUrl().c_str());
|
||||
|
@ -327,6 +327,7 @@ namespace Mist {
|
|||
DTSC::Scan prots = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("config").getMember("protocols");
|
||||
unsigned int prots_ctr = prots.getSize();
|
||||
|
||||
JSON::Value p;//properties of protocol
|
||||
for (unsigned int i=0; i < prots_ctr; ++i){
|
||||
if (prots.getIndice(i).getMember("connector").asString() == connector) {
|
||||
id = i;
|
||||
|
@ -349,20 +350,21 @@ namespace Mist {
|
|||
return;
|
||||
}
|
||||
}
|
||||
//read options from found connector
|
||||
p = prots.getIndice(id).asJSON();
|
||||
|
||||
DEBUG_MSG(DLVL_HIGH, "Connector found: %s", connector.c_str());
|
||||
//build arguments for starting output process
|
||||
|
||||
std::string temphost=getConnectedHost();
|
||||
std::string debuglevel = JSON::Value((long long)Util::Config::printDebugLevel).asString();
|
||||
std::string tmparg = Util::getMyPath() + std::string("MistOut") + connector;
|
||||
|
||||
int argnum = 0;
|
||||
argarr[argnum++] = (char*)tmparg.c_str();
|
||||
JSON::Value p = prots.getIndice(id).asJSON();
|
||||
JSON::Value pipedCapa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors").getMember(connector).asJSON();
|
||||
configLock.post();
|
||||
configLock.close();
|
||||
std::string temphost=getConnectedHost();
|
||||
std::string debuglevel = JSON::Value((long long)Util::Config::printDebugLevel).asString();
|
||||
argarr[argnum++] = (char*)"--ip";
|
||||
argarr[argnum++] = (char*)(temphost.c_str());
|
||||
argarr[argnum++] = (char*)"--stream";
|
||||
|
|
Loading…
Add table
Reference in a new issue