diff --git a/src/controller.cpp b/src/controller.cpp index c7d87cba..7b27ff1d 100644 --- a/src/controller.cpp +++ b/src/controller.cpp @@ -35,7 +35,7 @@ #define COMPILED_USERNAME "" #define COMPILED_PASSWORD "" -namespace Connector{ +namespace Controller{ std::map lastBuffer; ///< Last moment of contact with all buffers. Auth keychecker; ///< Checks key authorization. @@ -279,6 +279,74 @@ void CheckStats(JSON::Value & stats){ } } +class cpudata { + public: + std::string model; + int cores; + int threads; + int mhz; + int id; + cpudata(){ + model = "Unknown"; + cores = 1; + threads = 1; + mhz = 0; + id = 0; + }; + void fill(char * data){ + int i; + i = 0; + if (sscanf(data, "model name : %n", &i) != EOF && i > 0){model = (data+i);} + if (sscanf(data, "cpu cores : %d", &i) == 1){cores = i;} + if (sscanf(data, "siblings : %d", &i) == 1){threads = i;} + if (sscanf(data, "physical id : %d", &i) == 1){id = i;} + if (sscanf(data, "cpu MHz : %d", &i) == 1){mhz = i;} + }; +}; + +void checkCapable(JSON::Value & capa){ + std::ifstream cpuinfo("/proc/cpuinfo"); + if (!cpuinfo){ + capa["cpu"].null(); + }else{ + std::map cpus; + char line[300]; + int proccount = -1; + while (cpuinfo.good()){ + cpuinfo.getline(line, 300); + if (cpuinfo.fail()){ + //empty lines? ignore them, clear flags, continue + if (!cpuinfo.eof()){ + cpuinfo.ignore(); + cpuinfo.clear(); + } + continue; + } + if (memcmp(line, "processor", 9) == 0){proccount++;} + cpus[proccount].fill(line); + } + //remove double physical IDs - we only want real CPUs. + std::set used_physids; + int total_speed = 0; + int total_threads = 0; + for (int i = 0; i <= proccount; ++i){ + if (!used_physids.count(cpus[i].id)){ + used_physids.insert(cpus[i].id); + JSON::Value thiscpu; + thiscpu["model"] = cpus[i].model; + thiscpu["cores"] = cpus[i].cores; + thiscpu["threads"] = cpus[i].threads; + thiscpu["mhz"] = cpus[i].mhz; + capa["cpu"].append(thiscpu); + total_speed += cpus[i].cores * cpus[i].mhz; + total_threads += cpus[i].threads; + } + } + capa["speed"] = total_speed; + capa["threads"] = total_threads; + } +} + void CheckAllStreams(JSON::Value & data){ long long int currTime = Util::epoch(); for (JSON::ObjIter jit = data.ObjBegin(); jit != data.ObjEnd(); jit++){ @@ -334,15 +402,15 @@ void CheckStreams(JSON::Value & in, JSON::Value & out){ }; //Connector namespace int main(int argc, char ** argv){ - Connector::Storage = JSON::fromFile("config.json"); + Controller::Storage = JSON::fromFile("config.json"); JSON::Value stored_port = JSON::fromString("{\"long\":\"port\", \"short\":\"p\", \"arg\":\"integer\", \"help\":\"TCP port to listen on.\"}"); - stored_port["default"] = Connector::Storage["config"]["controller"]["port"]; + stored_port["default"] = Controller::Storage["config"]["controller"]["port"]; if (!stored_port["default"]){stored_port["default"] = 4242;} JSON::Value stored_interface = JSON::fromString("{\"long\":\"interface\", \"short\":\"i\", \"arg\":\"string\", \"help\":\"Interface address to listen on, or 0.0.0.0 for all available interfaces.\"}"); - stored_interface["default"] = Connector::Storage["config"]["controller"]["interface"]; + stored_interface["default"] = Controller::Storage["config"]["controller"]["interface"]; if (!stored_interface["default"]){stored_interface["default"] = "0.0.0.0";} JSON::Value stored_user = JSON::fromString("{\"long\":\"username\", \"short\":\"u\", \"arg\":\"string\", \"help\":\"Username to drop privileges to, or root to not drop provileges.\"}"); - stored_user["default"] = Connector::Storage["config"]["controller"]["username"]; + stored_user["default"] = Controller::Storage["config"]["controller"]["username"]; if (!stored_user["default"]){stored_user["default"] = "root";} Util::Config conf = Util::Config(argv[0], PACKAGE_VERSION); conf.addOption("listen_port", stored_port); @@ -359,8 +427,8 @@ int main(int argc, char ** argv){ if (colon != std::string::npos && colon != 0 && colon != account.size()){ std::string uname = account.substr(0, colon); std::string pword = account.substr(colon + 1, std::string::npos); - Connector::Log("CONF", "Created account "+uname+" through commandline option"); - Connector::Storage["account"][uname]["password"] = Connector::md5(pword); + Controller::Log("CONF", "Created account "+uname+" through commandline option"); + Controller::Storage["account"][uname]["password"] = Controller::md5(pword); } } time_t lastuplink = 0; @@ -370,28 +438,28 @@ int main(int argc, char ** argv){ Socket::Server Stats_Socket = Socket::Server("/tmp/mist/statistics", true); conf.activate(); Socket::Connection Incoming; - std::vector< Connector::ConnectedUser > users; + std::vector< Controller::ConnectedUser > users; std::vector buffers; JSON::Value Request; JSON::Value Response; std::string jsonp; - Connector::ConnectedUser * uplink = 0; - Connector::Log("CONF", "Controller started"); + Controller::ConnectedUser * uplink = 0; + Controller::Log("CONF", "Controller started"); conf.activate(); while (API_Socket.connected() && conf.is_active){ usleep(10000); //sleep for 10 ms - prevents 100% CPU time if (Util::epoch() - processchecker > 10){ processchecker = Util::epoch(); - Connector::CheckProtocols(Connector::Storage["config"]["protocols"]); - Connector::CheckAllStreams(Connector::Storage["streams"]); - Connector::CheckStats(Connector::Storage["statistics"]); + Controller::CheckProtocols(Controller::Storage["config"]["protocols"]); + Controller::CheckAllStreams(Controller::Storage["streams"]); + Controller::CheckStats(Controller::Storage["statistics"]); } if (conf.getBool("uplink") && Util::epoch() - lastuplink > UPLINK_INTERVAL){ lastuplink = Util::epoch(); bool gotUplink = false; if (users.size() > 0){ - for( std::vector< Connector::ConnectedUser >::iterator it = users.end() - 1; it >= users.begin(); it--) { + for( std::vector< Controller::ConnectedUser >::iterator it = users.end() - 1; it >= users.begin(); it--) { if (!it->C.connected()){ it->C.close(); users.erase(it); @@ -411,19 +479,19 @@ int main(int argc, char ** argv){ } if (gotUplink){ Response.null(); //make sure no data leaks from previous requests - Response["config"] = Connector::Storage["config"]; - Response["streams"] = Connector::Storage["streams"]; - Response["log"] = Connector::Storage["log"]; - Response["statistics"] = Connector::Storage["statistics"]; + Response["config"] = Controller::Storage["config"]; + Response["streams"] = Controller::Storage["streams"]; + Response["log"] = Controller::Storage["log"]; + Response["statistics"] = Controller::Storage["statistics"]; Response["now"] = (unsigned int)lastuplink; uplink->H.Clean(); uplink->H.SetBody("command="+HTTP::Parser::urlencode(Response.toString())); uplink->H.BuildRequest(); uplink->C.Send(uplink->H.BuildResponse("200", "OK")); uplink->H.Clean(); - //Connector::Log("UPLK", "Sending server data to uplink."); + //Controller::Log("UPLK", "Sending server data to uplink."); }else{ - Connector::Log("UPLK", "Could not connect to uplink."); + Controller::Log("UPLK", "Could not connect to uplink."); } } @@ -445,43 +513,43 @@ int main(int argc, char ** argv){ it->Received().get().clear(); if (Request.isMember("buffer")){ std::string thisbuffer = Request["buffer"]; - Connector::lastBuffer[thisbuffer] = Util::epoch(); + Controller::lastBuffer[thisbuffer] = Util::epoch(); //if metadata is available, store it if (Request.isMember("meta")){ - Connector::Storage["streams"][thisbuffer]["meta"] = Request["meta"]; + Controller::Storage["streams"][thisbuffer]["meta"] = Request["meta"]; } if (Request.isMember("totals")){ - Connector::Storage["statistics"][thisbuffer]["curr"] = Request["curr"]; + Controller::Storage["statistics"][thisbuffer]["curr"] = Request["curr"]; std::string nowstr = Request["totals"]["now"].asString(); - Connector::Storage["statistics"][thisbuffer]["totals"][nowstr] = Request["totals"]; - Connector::Storage["statistics"][thisbuffer]["totals"][nowstr].removeMember("now"); - Connector::Storage["statistics"][thisbuffer]["totals"].shrink(600);//limit to 10 minutes of data + Controller::Storage["statistics"][thisbuffer]["totals"][nowstr] = Request["totals"]; + Controller::Storage["statistics"][thisbuffer]["totals"][nowstr].removeMember("now"); + Controller::Storage["statistics"][thisbuffer]["totals"].shrink(600);//limit to 10 minutes of data for (JSON::ObjIter jit = Request["log"].ObjBegin(); jit != Request["log"].ObjEnd(); jit++){ - Connector::Storage["statistics"][thisbuffer]["log"].append(jit->second); - Connector::Storage["statistics"][thisbuffer]["log"].shrink(1000);//limit to 1000 users per buffer + Controller::Storage["statistics"][thisbuffer]["log"].append(jit->second); + Controller::Storage["statistics"][thisbuffer]["log"].shrink(1000);//limit to 1000 users per buffer } } } if (Request.isMember("vod")){ std::string thisfile = Request["vod"]["filename"]; - for (JSON::ObjIter oit = Connector::Storage["streams"].ObjBegin(); oit != Connector::Storage["streams"].ObjEnd(); ++oit){ + for (JSON::ObjIter oit = Controller::Storage["streams"].ObjBegin(); oit != Controller::Storage["streams"].ObjEnd(); ++oit){ if (oit->second["channel"]["URL"].asString() == thisfile){ - Connector::lastBuffer[oit->first] = Util::epoch(); + Controller::lastBuffer[oit->first] = Util::epoch(); if (Request["vod"].isMember("meta")){ - Connector::Storage["streams"][oit->first]["meta"] = Request["vod"]["meta"]; + Controller::Storage["streams"][oit->first]["meta"] = Request["vod"]["meta"]; } JSON::Value sockit = (long long int)it->getSocket(); std::string nowstr = Request["vod"]["now"].asString(); - Connector::Storage["statistics"][oit->first]["curr"][sockit.asString()] = Request["vod"]; - Connector::Storage["statistics"][oit->first]["curr"][sockit.asString()].removeMember("meta"); + Controller::Storage["statistics"][oit->first]["curr"][sockit.asString()] = Request["vod"]; + Controller::Storage["statistics"][oit->first]["curr"][sockit.asString()].removeMember("meta"); JSON::Value nowtotal; - for (JSON::ObjIter u_it = Connector::Storage["statistics"][oit->first]["curr"].ObjBegin(); u_it != Connector::Storage["statistics"][oit->first]["curr"].ObjEnd(); ++u_it){ + for (JSON::ObjIter u_it = Controller::Storage["statistics"][oit->first]["curr"].ObjBegin(); u_it != Controller::Storage["statistics"][oit->first]["curr"].ObjEnd(); ++u_it){ nowtotal["up"] = nowtotal["up"].asInt() + u_it->second["up"].asInt(); nowtotal["down"] = nowtotal["down"].asInt() + u_it->second["down"].asInt(); nowtotal["count"] = nowtotal["count"].asInt() + 1; } - Connector::Storage["statistics"][oit->first]["totals"][nowstr] = nowtotal; - Connector::Storage["statistics"][oit->first]["totals"].shrink(600); + Controller::Storage["statistics"][oit->first]["totals"][nowstr] = nowtotal; + Controller::Storage["statistics"][oit->first]["totals"].shrink(600); } } } @@ -490,7 +558,7 @@ int main(int argc, char ** argv){ } } if (users.size() > 0){ - for( std::vector< Connector::ConnectedUser >::iterator it = users.begin(); it != users.end(); it++) { + for( std::vector< Controller::ConnectedUser >::iterator it = users.begin(); it != users.end(); it++) { if (!it->C.connected() || it->logins > 3){ it->C.close(); users.erase(it); @@ -518,30 +586,31 @@ int main(int argc, char ** argv){ if (Request["authorize"].isMember("challenge")){ it->logins++; if (it->logins > 2){ - Connector::Log("UPLK", "Max login attempts passed - dropping connection to uplink."); + Controller::Log("UPLK", "Max login attempts passed - dropping connection to uplink."); it->C.close(); }else{ - Response["config"] = Connector::Storage["config"]; - Response["streams"] = Connector::Storage["streams"]; - Response["log"] = Connector::Storage["log"]; - Response["statistics"] = Connector::Storage["statistics"]; + Response["config"] = Controller::Storage["config"]; + Response["streams"] = Controller::Storage["streams"]; + Response["log"] = Controller::Storage["log"]; + Response["statistics"] = Controller::Storage["statistics"]; Response["authorize"]["username"] = COMPILED_USERNAME; - Connector::Log("UPLK", "Responding to login challenge: " + Request["authorize"]["challenge"].asString()); - Response["authorize"]["password"] = Connector::md5(COMPILED_PASSWORD + Request["authorize"]["challenge"].asString()); + Controller::checkCapable(Response["capabilities"]); + Controller::Log("UPLK", "Responding to login challenge: " + Request["authorize"]["challenge"].asString()); + Response["authorize"]["password"] = Controller::md5(COMPILED_PASSWORD + Request["authorize"]["challenge"].asString()); it->H.Clean(); it->H.SetBody("command="+HTTP::Parser::urlencode(Response.toString())); it->H.BuildRequest(); it->C.Send(it->H.BuildResponse("200", "OK")); it->H.Clean(); - Connector::Log("UPLK", "Attempting login to uplink."); + Controller::Log("UPLK", "Attempting login to uplink."); } } }else{ - if (Request.isMember("config")){Connector::CheckConfig(Request["config"], Connector::Storage["config"]);} - if (Request.isMember("streams")){Connector::CheckStreams(Request["streams"], Connector::Storage["streams"]);} + if (Request.isMember("config")){Controller::CheckConfig(Request["config"], Controller::Storage["config"]);} + if (Request.isMember("streams")){Controller::CheckStreams(Request["streams"], Controller::Storage["streams"]);} if (Request.isMember("clearstatlogs")){ - Connector::Storage["log"].null(); - Connector::Storage["statistics"].null(); + Controller::Storage["log"].null(); + Controller::Storage["statistics"].null(); } } }else{ @@ -558,27 +627,28 @@ int main(int argc, char ** argv){ Authorize(Request, Response, (*it)); if (it->Authorized){ //Parse config and streams from the request. - if (Request.isMember("config")){Connector::CheckConfig(Request["config"], Connector::Storage["config"]);} - if (Request.isMember("streams")){Connector::CheckStreams(Request["streams"], Connector::Storage["streams"]);} + if (Request.isMember("config")){Controller::CheckConfig(Request["config"], Controller::Storage["config"]);} + if (Request.isMember("streams")){Controller::CheckStreams(Request["streams"], Controller::Storage["streams"]);} if (Request.isMember("save")){ - Connector::WriteFile("config.json", Connector::Storage.toString()); - Connector::Log("CONF", "Config written to file on request through API"); + Controller::WriteFile("config.json", Controller::Storage.toString()); + Controller::Log("CONF", "Config written to file on request through API"); } //sent current configuration, no matter if it was changed or not //Response["streams"] = Storage["streams"]; - Response["config"] = Connector::Storage["config"]; + Response["config"] = Controller::Storage["config"]; + Controller::checkCapable(Response["capabilities"]); Response["config"]["version"] = PACKAGE_VERSION "/" + Util::Config::libver; - Response["streams"] = Connector::Storage["streams"]; + Response["streams"] = Controller::Storage["streams"]; //add required data to the current unix time to the config, for syncing reasons Response["config"]["time"] = Util::epoch(); if (!Response["config"].isMember("serverid")){Response["config"]["serverid"] = "";} //sent any available logs and statistics - Response["log"] = Connector::Storage["log"]; - Response["statistics"] = Connector::Storage["statistics"]; + Response["log"] = Controller::Storage["log"]; + Response["statistics"] = Controller::Storage["statistics"]; //clear log and statistics if requested if (Request.isMember("clearstatlogs")){ - Connector::Storage["log"].null(); - Connector::Storage["statistics"].null(); + Controller::Storage["log"].null(); + Controller::Storage["statistics"].null(); } } jsonp = ""; @@ -601,9 +671,9 @@ int main(int argc, char ** argv){ } } API_Socket.close(); - Connector::Log("CONF", "Controller shutting down"); + Controller::Log("CONF", "Controller shutting down"); Util::Procs::StopAll(); - Connector::WriteFile("config.json", Connector::Storage.toString()); + Controller::WriteFile("config.json", Controller::Storage.toString()); std::cout << "Killed all processes, wrote config to disk. Exiting." << std::endl; return 0; }