From 0716c319a6295bd6c34d9cdbc046efa1d3c55220 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Sat, 26 Nov 2011 03:16:28 +0100 Subject: [PATCH] Implemented stats logging on the connector level - closes #1 --- Buffer/main.cpp | 25 +++++++++++++++---------- Connector_HTTP/main.cpp | 9 +++++++++ Connector_RTMP/main.cpp | 14 ++++++++++++-- util/socket.cpp | 15 +++++++++++++++ util/socket.h | 2 ++ 5 files changed, 53 insertions(+), 12 deletions(-) diff --git a/Buffer/main.cpp b/Buffer/main.cpp index 677fce5d..fa5a40b6 100644 --- a/Buffer/main.cpp +++ b/Buffer/main.cpp @@ -271,18 +271,23 @@ namespace Buffer{ tmp += charbuf; } if (tmp != ""){ - std::cout << "Push attempt from IP " << tmp << std::endl; - if (tmp == waiting_ip){ - if (!ip_input.connected()){ - std::cout << "Push accepted!" << std::endl; - ip_input = (*usersIt).S; - users.erase(usersIt); - break; + if (tmp[0] == 'P'){ + std::cout << "Push attempt from IP " << tmp.substr(2) << std::endl; + if (tmp.substr(2) == waiting_ip){ + if (!ip_input.connected()){ + std::cout << "Push accepted!" << std::endl; + ip_input = (*usersIt).S; + users.erase(usersIt); + break; + }else{ + (*usersIt).Disconnect("Push denied - push already in progress!"); + } }else{ - (*usersIt).Disconnect("Push denied - push already in progress!"); + (*usersIt).Disconnect("Push denied - invalid IP address!"); } - }else{ - (*usersIt).Disconnect("Push denied - invalid IP address!"); + } + if (tmp[0] == 'S'){ + /// \todo Parse and save stats } } } diff --git a/Connector_HTTP/main.cpp b/Connector_HTTP/main.cpp index 965590fb..77e6109f 100644 --- a/Connector_HTTP/main.cpp +++ b/Connector_HTTP/main.cpp @@ -175,6 +175,7 @@ namespace Connector_HTTP{ int Segment = -1; int ReqFragment = -1; int temp; + unsigned int lastStats = 0; //int CurrentFragment = -1; later herbruiken? while (conn.connected() && !FLV::Parse_Error){ @@ -263,6 +264,14 @@ namespace Connector_HTTP{ fprintf(stderr, "Sending a video fragment. %i left in buffer, %i requested\n", (int)Flash_FragBuffer.size(), Flash_RequestPending); #endif } + if (inited){ + unsigned int now = time(0); + if (now != lastStats){ + lastStats = now; + std::string stat = "S "+conn.getStats(); + ss.write(stat); + } + } ss.canRead(); switch (ss.ready()){ case -1: diff --git a/Connector_RTMP/main.cpp b/Connector_RTMP/main.cpp index d9e8215a..a82f2e45 100644 --- a/Connector_RTMP/main.cpp +++ b/Connector_RTMP/main.cpp @@ -58,6 +58,8 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ return 0; } + unsigned int lastStats = 0; + while (Socket.connected() && !FLV::Parse_Error){ //only parse input if available or not yet init'ed //rightnow = getNowMS(); @@ -83,7 +85,15 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ fprintf(stderr, "Everything connected, starting to send video data...\n"); #endif inited = true; + } + if (inited){ + unsigned int now = time(0); + if (now != lastStats){ + lastStats = now; + std::string stat = "S "+Socket.getStats(); + SS.write(stat); } + } SS.canRead(); switch (SS.ready()){ case -1: @@ -374,7 +384,7 @@ void Connector_RTMP::parseChunk(){ Socket.close();//disconnect user break; } - SS.write(Socket.getHost()+'\n'); + SS.write("P "+Socket.getHost()+'\n'); #if DEBUG >= 4 fprintf(stderr, "Connected to buffer, starting to sent data...\n"); #endif @@ -594,7 +604,7 @@ void Connector_RTMP::parseChunk(){ Socket.close();//disconnect user break; } - SS.write(Socket.getHost()+'\n'); + SS.write("P "+Socket.getHost()+'\n'); #if DEBUG >= 4 fprintf(stderr, "Connected to buffer, starting to send data...\n"); #endif diff --git a/util/socket.cpp b/util/socket.cpp index 6482b0a3..c7379470 100644 --- a/util/socket.cpp +++ b/util/socket.cpp @@ -11,12 +11,19 @@ #include #endif +std::string uint2string(unsigned int i){ + std::stringstream st; + st << i; + return st.str(); +} + /// Create a new base socket. This is a basic constructor for converting any valid socket to a Socket::Connection. /// \param sockNo Integer representing the socket to convert. Socket::Connection::Connection(int sockNo){ sock = sockNo; up = 0; down = 0; + conntime = time(0); Error = false; Blocking = false; }//Socket::Connection basic constructor @@ -27,6 +34,7 @@ Socket::Connection::Connection(){ sock = -1; up = 0; down = 0; + conntime = time(0); Error = false; Blocking = false; }//Socket::Connection basic constructor @@ -64,6 +72,7 @@ Socket::Connection::Connection(std::string address, bool nonblock){ Blocking = false; up = 0; down = 0; + conntime = time(0); sockaddr_un addr; addr.sun_family = AF_UNIX; strncpy(addr.sun_path, address.c_str(), address.size()+1); @@ -92,6 +101,7 @@ Socket::Connection::Connection(std::string host, int port, bool nonblock){ Blocking = false; up = 0; down = 0; + conntime = time(0); std::stringstream ss; ss << port; @@ -205,6 +215,11 @@ unsigned int Socket::Connection::dataDown(){ return down; } +/// Returns a std::string of stats, ended by a newline. +std::string Socket::Connection::getStats(){ + return getHost() + uint2string(time(0) - conntime) + " " + uint2string(up) + uint2string(down) + "\n"; +} + /// Writes data to socket. This function blocks if the socket is blocking and all data cannot be written right away. /// If the socket is nonblocking and not all data can be written, this function sets internal variable Blocking to true /// and returns false. diff --git a/util/socket.h b/util/socket.h index 95c28d82..b1238abc 100644 --- a/util/socket.h +++ b/util/socket.h @@ -25,6 +25,7 @@ namespace Socket{ std::string remotehost; ///< Stores remote host address. unsigned int up; unsigned int down; + unsigned int conntime; public: Connection(); ///< Create a new disconnected base socket. Connection(int sockNo); ///< Create a new base socket. @@ -51,6 +52,7 @@ namespace Socket{ std::string getError(); ///< Returns a string describing the last error that occured. unsigned int dataUp(); ///< Returns total amount of bytes sent. unsigned int dataDown(); ///< Returns total amount of bytes received. + std::string getStats(); ///< Returns a std::string of stats, ended by a newline. friend class Server; bool Error; ///< Set to true if a socket error happened. bool Blocking; ///< Set to true if a socket is currently or wants to be blocking.