diff --git a/Buffer/main.cpp b/Buffer/main.cpp index fa5a40b6..180b9d1d 100644 --- a/Buffer/main.cpp +++ b/Buffer/main.cpp @@ -30,6 +30,44 @@ namespace Buffer{ FLV::Tag FLV; };//buffer + /// Converts a stats line to up, down, host, connector and conntime values. + class Stats{ + public: + unsigned int up; + unsigned int down; + std::string host; + std::string connector; + unsigned int conntime; + Stats(){ + up = 0; + down = 0; + conntime = 0; + } + Stats(std::string s){ + size_t f = s.find(' '); + if (f != std::string::npos){ + host = s.substr(0, f); + s.erase(0, f+1); + } + f = s.find(' '); + if (f != std::string::npos){ + connector = s.substr(0, f); + s.erase(0, f+1); + } + f = s.find(' '); + if (f != std::string::npos){ + conntime = atoi(s.substr(0, f).c_str()); + s.erase(0, f+1); + } + f = s.find(' '); + if (f != std::string::npos){ + up = atoi(s.substr(0, f).c_str()); + s.erase(0, f+1); + down = atoi(s.c_str()); + } + } + }; + /// Holds connected users. /// Keeps track of what buffer users are using and the connection status. class user{ @@ -39,6 +77,9 @@ namespace Buffer{ int MyBuffer_len; ///< Length in bytes of currently used buffer. int MyNum; ///< User ID of this user. int currsend; ///< Current amount of bytes sent. + Stats lastStats; ///< Holds last known stats for this connection. + unsigned int curr_up; ///< Holds the current estimated transfer speed up. + unsigned int curr_down; ///< Holds the current estimated transfer speed down. bool gotproperaudio; ///< Whether the user received proper audio yet. void * lastpointer; ///< Pointer to data part of current buffer. static int UserCount; ///< Global user counter. @@ -49,6 +90,8 @@ namespace Buffer{ S = fd; MyNum = UserCount++; gotproperaudio = false; + curr_up = 0; + curr_down = 0; std::cout << "User " << MyNum << " connected" << std::endl; }//constructor /// Disconnects the current user. Doesn't do anything if already disconnected. @@ -56,8 +99,8 @@ namespace Buffer{ void Disconnect(std::string reason) { if (S.connected()) { S.close(); - std::cout << "Disconnected user " << MyNum << ": " << reason << std::endl; } + std::cout << "Disconnected user " << MyNum << ": " << reason << ". " << lastStats.connector << " transferred " << lastStats.up << " up and " << lastStats.down << " down in " << lastStats.conntime << " seconds to " << lastStats.host << std::endl; }//Disconnect /// Tries to send the current buffer, returns true if success, false otherwise. /// Has a side effect of dropping the connection if send will never complete. @@ -160,6 +203,7 @@ namespace Buffer{ int current_buffer = 0; int lastproper = 0;//last properly finished buffer number unsigned int loopcount = 0; + unsigned int stattimer = 0; Socket::Connection incoming; Socket::Connection std_input(fileno(stdin)); @@ -170,6 +214,19 @@ namespace Buffer{ while((!feof(stdin) || ip_waiting) && !FLV::Parse_Error){ usleep(1000); //sleep for 1 ms, to prevent 100% CPU time + unsigned int now = time(0); + if (now != stattimer){ + stattimer = now; + unsigned int tot_up = 0, tot_down = 0, tot_count = 0; + if (users.size() > 0){ + for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ + tot_down += usersIt->curr_down; + tot_up += usersIt->curr_up; + tot_count++; + } + std::cout << "Stats: " << tot_count << " viewers, " << tot_up << " up, " << tot_down << " down" << std::endl; + } + } //invalidate the current buffer ringbuf[current_buffer]->number = -1; if ( @@ -262,6 +319,7 @@ namespace Buffer{ for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ //remove disconnected users if (!(*usersIt).S.connected()){ + (*usersIt).Disconnect("Closed"); users.erase(usersIt); break; }else{ if ((*usersIt).S.canRead()){ @@ -287,7 +345,12 @@ namespace Buffer{ } } if (tmp[0] == 'S'){ - /// \todo Parse and save stats + Stats tmpStats = Stats(tmp.substr(2)); + unsigned int secs = tmpStats.conntime - (*usersIt).lastStats.conntime; + if (secs < 1){secs = 1;} + (*usersIt).curr_up = (tmpStats.up - (*usersIt).lastStats.up) / secs; + (*usersIt).curr_down = (tmpStats.down - (*usersIt).lastStats.down) / secs; + (*usersIt).lastStats = tmpStats; } } } diff --git a/Connector_HTTP/main.cpp b/Connector_HTTP/main.cpp index 77e6109f..984c22d5 100644 --- a/Connector_HTTP/main.cpp +++ b/Connector_HTTP/main.cpp @@ -268,7 +268,7 @@ namespace Connector_HTTP{ unsigned int now = time(0); if (now != lastStats){ lastStats = now; - std::string stat = "S "+conn.getStats(); + std::string stat = "S "+conn.getStats("HTTP"); ss.write(stat); } } diff --git a/Connector_RTMP/main.cpp b/Connector_RTMP/main.cpp index a82f2e45..b55b563c 100644 --- a/Connector_RTMP/main.cpp +++ b/Connector_RTMP/main.cpp @@ -21,6 +21,7 @@ namespace Connector_RTMP{ //for connection to server bool ready4data = false; ///< Set to true when streaming starts. bool inited = false; ///< Set to true when ready to connect to Buffer. + bool nostats = false; ///< Set to true if no stats should be sent anymore (push mode). bool stopparsing = false; ///< Set to true when all parsing needs to be cancelled. Socket::Connection Socket; ///< Socket connected to user @@ -86,11 +87,11 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ #endif inited = true; } - if (inited){ + if (inited && !nostats){ unsigned int now = time(0); if (now != lastStats){ lastStats = now; - std::string stat = "S "+Socket.getStats(); + std::string stat = "S "+Socket.getStats("RTMP"); SS.write(stat); } } @@ -385,6 +386,7 @@ void Connector_RTMP::parseChunk(){ break; } SS.write("P "+Socket.getHost()+'\n'); + nostats = true; #if DEBUG >= 4 fprintf(stderr, "Connected to buffer, starting to sent data...\n"); #endif @@ -605,6 +607,7 @@ void Connector_RTMP::parseChunk(){ break; } SS.write("P "+Socket.getHost()+'\n'); + nostats = true; #if DEBUG >= 4 fprintf(stderr, "Connected to buffer, starting to send data...\n"); #endif diff --git a/util/socket.cpp b/util/socket.cpp index c7379470..29b02cd1 100644 --- a/util/socket.cpp +++ b/util/socket.cpp @@ -216,8 +216,9 @@ unsigned int Socket::Connection::dataDown(){ } /// Returns a std::string of stats, ended by a newline. -std::string Socket::Connection::getStats(){ - return getHost() + uint2string(time(0) - conntime) + " " + uint2string(up) + uint2string(down) + "\n"; +/// Requires the current connector name as an argument. +std::string Socket::Connection::getStats(std::string C){ + return getHost() + " " + C + " " + uint2string(time(0) - conntime) + " " + uint2string(up) + " " + uint2string(down) + "\n"; } /// Writes data to socket. This function blocks if the socket is blocking and all data cannot be written right away. diff --git a/util/socket.h b/util/socket.h index b1238abc..a7a16dc9 100644 --- a/util/socket.h +++ b/util/socket.h @@ -52,7 +52,7 @@ namespace Socket{ std::string getError(); ///< Returns a string describing the last error that occured. unsigned int dataUp(); ///< Returns total amount of bytes sent. unsigned int dataDown(); ///< Returns total amount of bytes received. - std::string getStats(); ///< Returns a std::string of stats, ended by a newline. + std::string getStats(std::string C); ///< Returns a std::string of stats, ended by a newline. friend class Server; bool Error; ///< Set to true if a socket error happened. bool Blocking; ///< Set to true if a socket is currently or wants to be blocking.