From 49cae230866a363d96d08fe7f5c1b80ce1e742ac Mon Sep 17 00:00:00 2001 From: Thulinma Date: Sun, 4 Dec 2011 20:44:33 +0100 Subject: [PATCH] Now collects and summerizes stats in controller - sends to gearbox system. Closes #3 --- Buffer/Makefile | 2 +- Buffer/main.cpp | 41 +++++++++++++++++++++++++++++++++++++++-- DDV_Controller/main.cpp | 22 ++++++++++++++++++++-- 3 files changed, 60 insertions(+), 5 deletions(-) diff --git a/Buffer/Makefile b/Buffer/Makefile index 615507b1..20469803 100644 --- a/Buffer/Makefile +++ b/Buffer/Makefile @@ -1,4 +1,4 @@ -SRC = main.cpp ../util/socket.cpp ../util/flv_tag.cpp +SRC = main.cpp ../util/json/json_reader.cpp ../util/json/json_value.cpp ../util/json/json_writer.cpp ../util/socket.cpp ../util/flv_tag.cpp OBJ = $(SRC:.cpp=.o) OUT = DDV_Buffer INCLUDES = diff --git a/Buffer/main.cpp b/Buffer/main.cpp index 180b9d1d..2dd19ccb 100644 --- a/Buffer/main.cpp +++ b/Buffer/main.cpp @@ -10,12 +10,16 @@ #include #include #include +#include #include "../util/flv_tag.h" //FLV format parser #include "../util/socket.h" //Socket lib +#include "../util/json/json.h" /// Holds all code unique to the Buffer. namespace Buffer{ + Json::Value Storage = Json::Value(Json::objectValue); ///< Global storage of data. + ///A simple signal handler that ignores all signals. void termination_handler (int signum){ switch (signum){ @@ -76,6 +80,7 @@ namespace Buffer{ int MyBuffer_num; ///< Number of currently used buffer. int MyBuffer_len; ///< Length in bytes of currently used buffer. int MyNum; ///< User ID of this user. + std::string MyStr; ///< User ID of this user as a string. int currsend; ///< Current amount of bytes sent. Stats lastStats; ///< Holds last known stats for this connection. unsigned int curr_up; ///< Holds the current estimated transfer speed up. @@ -89,6 +94,9 @@ namespace Buffer{ user(Socket::Connection fd){ S = fd; MyNum = UserCount++; + std::stringstream st; + st << MyNum; + MyStr = st.str(); gotproperaudio = false; curr_up = 0; curr_down = 0; @@ -100,7 +108,14 @@ namespace Buffer{ if (S.connected()) { S.close(); } - std::cout << "Disconnected user " << MyNum << ": " << reason << ". " << lastStats.connector << " transferred " << lastStats.up << " up and " << lastStats.down << " down in " << lastStats.conntime << " seconds to " << lastStats.host << std::endl; + Storage["curr"].removeMember(MyStr); + Storage["log"][MyStr]["connector"] = lastStats.connector; + Storage["log"][MyStr]["up"] = lastStats.up; + Storage["log"][MyStr]["down"] = lastStats.down; + Storage["log"][MyStr]["conntime"] = lastStats.conntime; + Storage["log"][MyStr]["host"] = lastStats.host; + Storage["log"][MyStr]["start"] = (unsigned int)time(0) - lastStats.conntime; + std::cout << "Disconnected user " << MyStr << ": " << reason << ". " << lastStats.connector << " transferred " << lastStats.up << " up and " << lastStats.down << " down in " << lastStats.conntime << " seconds to " << lastStats.host << std::endl; }//Disconnect /// Tries to send the current buffer, returns true if success, false otherwise. /// Has a side effect of dropping the connection if send will never complete. @@ -206,7 +221,12 @@ namespace Buffer{ unsigned int stattimer = 0; Socket::Connection incoming; Socket::Connection std_input(fileno(stdin)); + Socket::Connection StatsSocket = Socket::Connection("/tmp/ddv_statistics", true); + Storage["log"] = Json::Value(Json::objectValue); + Storage["curr"] = Json::Value(Json::objectValue); + Storage["totals"] = Json::Value(Json::objectValue); + unsigned char packtype; bool gotVideoInfo = false; bool gotAudioInfo = false; @@ -224,7 +244,18 @@ namespace Buffer{ tot_up += usersIt->curr_up; tot_count++; } - std::cout << "Stats: " << tot_count << " viewers, " << tot_up << " up, " << tot_down << " down" << std::endl; + } + Storage["totals"]["down"] = tot_down; + Storage["totals"]["up"] = tot_up; + Storage["totals"]["count"] = tot_count; + Storage["totals"]["now"] = now; + Storage["totals"]["buffer"] = argv[2]; + if (!StatsSocket.connected()){ + StatsSocket = Socket::Connection("/tmp/ddv_statistics", true); + } + if (StatsSocket.connected()){ + StatsSocket.write(Storage.toStyledString()); + Storage["log"].clear(); } } //invalidate the current buffer @@ -351,6 +382,12 @@ namespace Buffer{ (*usersIt).curr_up = (tmpStats.up - (*usersIt).lastStats.up) / secs; (*usersIt).curr_down = (tmpStats.down - (*usersIt).lastStats.down) / secs; (*usersIt).lastStats = tmpStats; + Storage["curr"][(*usersIt).MyStr]["connector"] = tmpStats.connector; + Storage["curr"][(*usersIt).MyStr]["up"] = tmpStats.up; + Storage["curr"][(*usersIt).MyStr]["down"] = tmpStats.down; + Storage["curr"][(*usersIt).MyStr]["conntime"] = tmpStats.conntime; + Storage["curr"][(*usersIt).MyStr]["host"] = tmpStats.host; + Storage["curr"][(*usersIt).MyStr]["start"] = (unsigned int) time(0) - tmpStats.conntime; } } } diff --git a/DDV_Controller/main.cpp b/DDV_Controller/main.cpp index 5c898a65..70c922f3 100644 --- a/DDV_Controller/main.cpp +++ b/DDV_Controller/main.cpp @@ -446,7 +446,24 @@ int main(int argc, char ** argv){ buffers.erase(it); break; } - + it->spool(); + if (it->Received() != ""){ + size_t newlines = it->Received().find("\n\n"); + while (newlines != std::string::npos){ + if (!JsonParse.parse(it->Received().substr(0, newlines), Request, false)){ + if (Request.isMember("totals") && Request["totals"].isMember("buffer")){ + std::string thisbuffer = Request["totals"]["buffer"].asString(); + Storage["statistics"][thisbuffer]["curr"] = Request["curr"]; + Storage["statistics"][thisbuffer]["totals"][Request["totals"]["now"].asString()] = Request["totals"]; + for (Json::ValueIterator jit = Request["log"].begin(); jit != Request["log"].end(); jit++){ + Storage["statistics"][thisbuffer]["log"][jit.memberName()] = Request["log"][jit.memberName()]; + } + } + } + it->Received().erase(newlines+2); + newlines = it->Received().find("\n\n"); + } + } } } if (users.size() > 0){ @@ -495,7 +512,8 @@ int main(int argc, char ** argv){ if (Request.isMember("streams")){CheckStreams(Request["streams"], Storage["streams"]);} if (Request.isMember("clearstatlogs")){ Storage["log"].clear(); - Storage["statistics"].clear(); + /// \todo Uncomment this line after testing. + //Storage["statistics"].clear(); } //Log("UPLK", "Received data from uplink."); //WriteFile("config.json", Storage.toStyledString());