Now collects and summerizes stats in controller - sends to gearbox system. Closes #3

This commit is contained in:
Thulinma 2011-12-04 20:44:33 +01:00
parent fd16bf494e
commit 49cae23086
3 changed files with 60 additions and 5 deletions

View file

@ -1,4 +1,4 @@
SRC = main.cpp ../util/socket.cpp ../util/flv_tag.cpp SRC = main.cpp ../util/json/json_reader.cpp ../util/json/json_value.cpp ../util/json/json_writer.cpp ../util/socket.cpp ../util/flv_tag.cpp
OBJ = $(SRC:.cpp=.o) OBJ = $(SRC:.cpp=.o)
OUT = DDV_Buffer OUT = DDV_Buffer
INCLUDES = INCLUDES =

View file

@ -10,12 +10,16 @@
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
#include <signal.h> #include <signal.h>
#include <sstream>
#include "../util/flv_tag.h" //FLV format parser #include "../util/flv_tag.h" //FLV format parser
#include "../util/socket.h" //Socket lib #include "../util/socket.h" //Socket lib
#include "../util/json/json.h"
/// Holds all code unique to the Buffer. /// Holds all code unique to the Buffer.
namespace Buffer{ namespace Buffer{
Json::Value Storage = Json::Value(Json::objectValue); ///< Global storage of data.
///A simple signal handler that ignores all signals. ///A simple signal handler that ignores all signals.
void termination_handler (int signum){ void termination_handler (int signum){
switch (signum){ switch (signum){
@ -76,6 +80,7 @@ namespace Buffer{
int MyBuffer_num; ///< Number of currently used buffer. int MyBuffer_num; ///< Number of currently used buffer.
int MyBuffer_len; ///< Length in bytes of currently used buffer. int MyBuffer_len; ///< Length in bytes of currently used buffer.
int MyNum; ///< User ID of this user. int MyNum; ///< User ID of this user.
std::string MyStr; ///< User ID of this user as a string.
int currsend; ///< Current amount of bytes sent. int currsend; ///< Current amount of bytes sent.
Stats lastStats; ///< Holds last known stats for this connection. Stats lastStats; ///< Holds last known stats for this connection.
unsigned int curr_up; ///< Holds the current estimated transfer speed up. unsigned int curr_up; ///< Holds the current estimated transfer speed up.
@ -89,6 +94,9 @@ namespace Buffer{
user(Socket::Connection fd){ user(Socket::Connection fd){
S = fd; S = fd;
MyNum = UserCount++; MyNum = UserCount++;
std::stringstream st;
st << MyNum;
MyStr = st.str();
gotproperaudio = false; gotproperaudio = false;
curr_up = 0; curr_up = 0;
curr_down = 0; curr_down = 0;
@ -100,7 +108,14 @@ namespace Buffer{
if (S.connected()) { if (S.connected()) {
S.close(); S.close();
} }
std::cout << "Disconnected user " << MyNum << ": " << reason << ". " << lastStats.connector << " transferred " << lastStats.up << " up and " << lastStats.down << " down in " << lastStats.conntime << " seconds to " << lastStats.host << std::endl; Storage["curr"].removeMember(MyStr);
Storage["log"][MyStr]["connector"] = lastStats.connector;
Storage["log"][MyStr]["up"] = lastStats.up;
Storage["log"][MyStr]["down"] = lastStats.down;
Storage["log"][MyStr]["conntime"] = lastStats.conntime;
Storage["log"][MyStr]["host"] = lastStats.host;
Storage["log"][MyStr]["start"] = (unsigned int)time(0) - lastStats.conntime;
std::cout << "Disconnected user " << MyStr << ": " << reason << ". " << lastStats.connector << " transferred " << lastStats.up << " up and " << lastStats.down << " down in " << lastStats.conntime << " seconds to " << lastStats.host << std::endl;
}//Disconnect }//Disconnect
/// Tries to send the current buffer, returns true if success, false otherwise. /// Tries to send the current buffer, returns true if success, false otherwise.
/// Has a side effect of dropping the connection if send will never complete. /// Has a side effect of dropping the connection if send will never complete.
@ -206,7 +221,12 @@ namespace Buffer{
unsigned int stattimer = 0; unsigned int stattimer = 0;
Socket::Connection incoming; Socket::Connection incoming;
Socket::Connection std_input(fileno(stdin)); Socket::Connection std_input(fileno(stdin));
Socket::Connection StatsSocket = Socket::Connection("/tmp/ddv_statistics", true);
Storage["log"] = Json::Value(Json::objectValue);
Storage["curr"] = Json::Value(Json::objectValue);
Storage["totals"] = Json::Value(Json::objectValue);
unsigned char packtype; unsigned char packtype;
bool gotVideoInfo = false; bool gotVideoInfo = false;
bool gotAudioInfo = false; bool gotAudioInfo = false;
@ -224,7 +244,18 @@ namespace Buffer{
tot_up += usersIt->curr_up; tot_up += usersIt->curr_up;
tot_count++; tot_count++;
} }
std::cout << "Stats: " << tot_count << " viewers, " << tot_up << " up, " << tot_down << " down" << std::endl; }
Storage["totals"]["down"] = tot_down;
Storage["totals"]["up"] = tot_up;
Storage["totals"]["count"] = tot_count;
Storage["totals"]["now"] = now;
Storage["totals"]["buffer"] = argv[2];
if (!StatsSocket.connected()){
StatsSocket = Socket::Connection("/tmp/ddv_statistics", true);
}
if (StatsSocket.connected()){
StatsSocket.write(Storage.toStyledString());
Storage["log"].clear();
} }
} }
//invalidate the current buffer //invalidate the current buffer
@ -351,6 +382,12 @@ namespace Buffer{
(*usersIt).curr_up = (tmpStats.up - (*usersIt).lastStats.up) / secs; (*usersIt).curr_up = (tmpStats.up - (*usersIt).lastStats.up) / secs;
(*usersIt).curr_down = (tmpStats.down - (*usersIt).lastStats.down) / secs; (*usersIt).curr_down = (tmpStats.down - (*usersIt).lastStats.down) / secs;
(*usersIt).lastStats = tmpStats; (*usersIt).lastStats = tmpStats;
Storage["curr"][(*usersIt).MyStr]["connector"] = tmpStats.connector;
Storage["curr"][(*usersIt).MyStr]["up"] = tmpStats.up;
Storage["curr"][(*usersIt).MyStr]["down"] = tmpStats.down;
Storage["curr"][(*usersIt).MyStr]["conntime"] = tmpStats.conntime;
Storage["curr"][(*usersIt).MyStr]["host"] = tmpStats.host;
Storage["curr"][(*usersIt).MyStr]["start"] = (unsigned int) time(0) - tmpStats.conntime;
} }
} }
} }

View file

@ -446,7 +446,24 @@ int main(int argc, char ** argv){
buffers.erase(it); buffers.erase(it);
break; break;
} }
it->spool();
if (it->Received() != ""){
size_t newlines = it->Received().find("\n\n");
while (newlines != std::string::npos){
if (!JsonParse.parse(it->Received().substr(0, newlines), Request, false)){
if (Request.isMember("totals") && Request["totals"].isMember("buffer")){
std::string thisbuffer = Request["totals"]["buffer"].asString();
Storage["statistics"][thisbuffer]["curr"] = Request["curr"];
Storage["statistics"][thisbuffer]["totals"][Request["totals"]["now"].asString()] = Request["totals"];
for (Json::ValueIterator jit = Request["log"].begin(); jit != Request["log"].end(); jit++){
Storage["statistics"][thisbuffer]["log"][jit.memberName()] = Request["log"][jit.memberName()];
}
}
}
it->Received().erase(newlines+2);
newlines = it->Received().find("\n\n");
}
}
} }
} }
if (users.size() > 0){ if (users.size() > 0){
@ -495,7 +512,8 @@ int main(int argc, char ** argv){
if (Request.isMember("streams")){CheckStreams(Request["streams"], Storage["streams"]);} if (Request.isMember("streams")){CheckStreams(Request["streams"], Storage["streams"]);}
if (Request.isMember("clearstatlogs")){ if (Request.isMember("clearstatlogs")){
Storage["log"].clear(); Storage["log"].clear();
Storage["statistics"].clear(); /// \todo Uncomment this line after testing.
//Storage["statistics"].clear();
} }
//Log("UPLK", "Received data from uplink."); //Log("UPLK", "Received data from uplink.");
//WriteFile("config.json", Storage.toStyledString()); //WriteFile("config.json", Storage.toStyledString());