Summerizing of stats in Buffer done - closes #2

This commit is contained in:
Thulinma 2011-11-26 04:27:04 +01:00
parent 0716c319a6
commit a53b255e8f
5 changed files with 75 additions and 8 deletions

View file

@ -30,6 +30,44 @@ namespace Buffer{
FLV::Tag FLV; FLV::Tag FLV;
};//buffer };//buffer
/// Converts a stats line to up, down, host, connector and conntime values.
class Stats{
public:
unsigned int up;
unsigned int down;
std::string host;
std::string connector;
unsigned int conntime;
Stats(){
up = 0;
down = 0;
conntime = 0;
}
Stats(std::string s){
size_t f = s.find(' ');
if (f != std::string::npos){
host = s.substr(0, f);
s.erase(0, f+1);
}
f = s.find(' ');
if (f != std::string::npos){
connector = s.substr(0, f);
s.erase(0, f+1);
}
f = s.find(' ');
if (f != std::string::npos){
conntime = atoi(s.substr(0, f).c_str());
s.erase(0, f+1);
}
f = s.find(' ');
if (f != std::string::npos){
up = atoi(s.substr(0, f).c_str());
s.erase(0, f+1);
down = atoi(s.c_str());
}
}
};
/// Holds connected users. /// Holds connected users.
/// Keeps track of what buffer users are using and the connection status. /// Keeps track of what buffer users are using and the connection status.
class user{ class user{
@ -39,6 +77,9 @@ namespace Buffer{
int MyBuffer_len; ///< Length in bytes of currently used buffer. int MyBuffer_len; ///< Length in bytes of currently used buffer.
int MyNum; ///< User ID of this user. int MyNum; ///< User ID of this user.
int currsend; ///< Current amount of bytes sent. int currsend; ///< Current amount of bytes sent.
Stats lastStats; ///< Holds last known stats for this connection.
unsigned int curr_up; ///< Holds the current estimated transfer speed up.
unsigned int curr_down; ///< Holds the current estimated transfer speed down.
bool gotproperaudio; ///< Whether the user received proper audio yet. bool gotproperaudio; ///< Whether the user received proper audio yet.
void * lastpointer; ///< Pointer to data part of current buffer. void * lastpointer; ///< Pointer to data part of current buffer.
static int UserCount; ///< Global user counter. static int UserCount; ///< Global user counter.
@ -49,6 +90,8 @@ namespace Buffer{
S = fd; S = fd;
MyNum = UserCount++; MyNum = UserCount++;
gotproperaudio = false; gotproperaudio = false;
curr_up = 0;
curr_down = 0;
std::cout << "User " << MyNum << " connected" << std::endl; std::cout << "User " << MyNum << " connected" << std::endl;
}//constructor }//constructor
/// Disconnects the current user. Doesn't do anything if already disconnected. /// Disconnects the current user. Doesn't do anything if already disconnected.
@ -56,8 +99,8 @@ namespace Buffer{
void Disconnect(std::string reason) { void Disconnect(std::string reason) {
if (S.connected()) { if (S.connected()) {
S.close(); S.close();
std::cout << "Disconnected user " << MyNum << ": " << reason << std::endl;
} }
std::cout << "Disconnected user " << MyNum << ": " << reason << ". " << lastStats.connector << " transferred " << lastStats.up << " up and " << lastStats.down << " down in " << lastStats.conntime << " seconds to " << lastStats.host << std::endl;
}//Disconnect }//Disconnect
/// Tries to send the current buffer, returns true if success, false otherwise. /// Tries to send the current buffer, returns true if success, false otherwise.
/// Has a side effect of dropping the connection if send will never complete. /// Has a side effect of dropping the connection if send will never complete.
@ -160,6 +203,7 @@ namespace Buffer{
int current_buffer = 0; int current_buffer = 0;
int lastproper = 0;//last properly finished buffer number int lastproper = 0;//last properly finished buffer number
unsigned int loopcount = 0; unsigned int loopcount = 0;
unsigned int stattimer = 0;
Socket::Connection incoming; Socket::Connection incoming;
Socket::Connection std_input(fileno(stdin)); Socket::Connection std_input(fileno(stdin));
@ -170,6 +214,19 @@ namespace Buffer{
while((!feof(stdin) || ip_waiting) && !FLV::Parse_Error){ while((!feof(stdin) || ip_waiting) && !FLV::Parse_Error){
usleep(1000); //sleep for 1 ms, to prevent 100% CPU time usleep(1000); //sleep for 1 ms, to prevent 100% CPU time
unsigned int now = time(0);
if (now != stattimer){
stattimer = now;
unsigned int tot_up = 0, tot_down = 0, tot_count = 0;
if (users.size() > 0){
for (usersIt = users.begin(); usersIt != users.end(); usersIt++){
tot_down += usersIt->curr_down;
tot_up += usersIt->curr_up;
tot_count++;
}
std::cout << "Stats: " << tot_count << " viewers, " << tot_up << " up, " << tot_down << " down" << std::endl;
}
}
//invalidate the current buffer //invalidate the current buffer
ringbuf[current_buffer]->number = -1; ringbuf[current_buffer]->number = -1;
if ( if (
@ -262,6 +319,7 @@ namespace Buffer{
for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ for (usersIt = users.begin(); usersIt != users.end(); usersIt++){
//remove disconnected users //remove disconnected users
if (!(*usersIt).S.connected()){ if (!(*usersIt).S.connected()){
(*usersIt).Disconnect("Closed");
users.erase(usersIt); break; users.erase(usersIt); break;
}else{ }else{
if ((*usersIt).S.canRead()){ if ((*usersIt).S.canRead()){
@ -287,7 +345,12 @@ namespace Buffer{
} }
} }
if (tmp[0] == 'S'){ if (tmp[0] == 'S'){
/// \todo Parse and save stats Stats tmpStats = Stats(tmp.substr(2));
unsigned int secs = tmpStats.conntime - (*usersIt).lastStats.conntime;
if (secs < 1){secs = 1;}
(*usersIt).curr_up = (tmpStats.up - (*usersIt).lastStats.up) / secs;
(*usersIt).curr_down = (tmpStats.down - (*usersIt).lastStats.down) / secs;
(*usersIt).lastStats = tmpStats;
} }
} }
} }

View file

@ -268,7 +268,7 @@ namespace Connector_HTTP{
unsigned int now = time(0); unsigned int now = time(0);
if (now != lastStats){ if (now != lastStats){
lastStats = now; lastStats = now;
std::string stat = "S "+conn.getStats(); std::string stat = "S "+conn.getStats("HTTP");
ss.write(stat); ss.write(stat);
} }
} }

View file

@ -21,6 +21,7 @@ namespace Connector_RTMP{
//for connection to server //for connection to server
bool ready4data = false; ///< Set to true when streaming starts. bool ready4data = false; ///< Set to true when streaming starts.
bool inited = false; ///< Set to true when ready to connect to Buffer. bool inited = false; ///< Set to true when ready to connect to Buffer.
bool nostats = false; ///< Set to true if no stats should be sent anymore (push mode).
bool stopparsing = false; ///< Set to true when all parsing needs to be cancelled. bool stopparsing = false; ///< Set to true when all parsing needs to be cancelled.
Socket::Connection Socket; ///< Socket connected to user Socket::Connection Socket; ///< Socket connected to user
@ -86,11 +87,11 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
#endif #endif
inited = true; inited = true;
} }
if (inited){ if (inited && !nostats){
unsigned int now = time(0); unsigned int now = time(0);
if (now != lastStats){ if (now != lastStats){
lastStats = now; lastStats = now;
std::string stat = "S "+Socket.getStats(); std::string stat = "S "+Socket.getStats("RTMP");
SS.write(stat); SS.write(stat);
} }
} }
@ -385,6 +386,7 @@ void Connector_RTMP::parseChunk(){
break; break;
} }
SS.write("P "+Socket.getHost()+'\n'); SS.write("P "+Socket.getHost()+'\n');
nostats = true;
#if DEBUG >= 4 #if DEBUG >= 4
fprintf(stderr, "Connected to buffer, starting to sent data...\n"); fprintf(stderr, "Connected to buffer, starting to sent data...\n");
#endif #endif
@ -605,6 +607,7 @@ void Connector_RTMP::parseChunk(){
break; break;
} }
SS.write("P "+Socket.getHost()+'\n'); SS.write("P "+Socket.getHost()+'\n');
nostats = true;
#if DEBUG >= 4 #if DEBUG >= 4
fprintf(stderr, "Connected to buffer, starting to send data...\n"); fprintf(stderr, "Connected to buffer, starting to send data...\n");
#endif #endif

View file

@ -216,8 +216,9 @@ unsigned int Socket::Connection::dataDown(){
} }
/// Returns a std::string of stats, ended by a newline. /// Returns a std::string of stats, ended by a newline.
std::string Socket::Connection::getStats(){ /// Requires the current connector name as an argument.
return getHost() + uint2string(time(0) - conntime) + " " + uint2string(up) + uint2string(down) + "\n"; std::string Socket::Connection::getStats(std::string C){
return getHost() + " " + C + " " + uint2string(time(0) - conntime) + " " + uint2string(up) + " " + uint2string(down) + "\n";
} }
/// Writes data to socket. This function blocks if the socket is blocking and all data cannot be written right away. /// Writes data to socket. This function blocks if the socket is blocking and all data cannot be written right away.

View file

@ -52,7 +52,7 @@ namespace Socket{
std::string getError(); ///< Returns a string describing the last error that occured. std::string getError(); ///< Returns a string describing the last error that occured.
unsigned int dataUp(); ///< Returns total amount of bytes sent. unsigned int dataUp(); ///< Returns total amount of bytes sent.
unsigned int dataDown(); ///< Returns total amount of bytes received. unsigned int dataDown(); ///< Returns total amount of bytes received.
std::string getStats(); ///< Returns a std::string of stats, ended by a newline. std::string getStats(std::string C); ///< Returns a std::string of stats, ended by a newline.
friend class Server; friend class Server;
bool Error; ///< Set to true if a socket error happened. bool Error; ///< Set to true if a socket error happened.
bool Blocking; ///< Set to true if a socket is currently or wants to be blocking. bool Blocking; ///< Set to true if a socket is currently or wants to be blocking.