First version of capability detection, fixed Controller namespace label.

This commit is contained in:
Thulinma 2012-10-02 16:51:06 +02:00
parent c06184c212
commit 9cf33ed81f

View file

@ -35,7 +35,7 @@
#define COMPILED_USERNAME "" #define COMPILED_USERNAME ""
#define COMPILED_PASSWORD "" #define COMPILED_PASSWORD ""
namespace Connector{ namespace Controller{
std::map<std::string, int> lastBuffer; ///< Last moment of contact with all buffers. std::map<std::string, int> lastBuffer; ///< Last moment of contact with all buffers.
Auth keychecker; ///< Checks key authorization. Auth keychecker; ///< Checks key authorization.
@ -279,6 +279,74 @@ void CheckStats(JSON::Value & stats){
} }
} }
class cpudata {
public:
std::string model;
int cores;
int threads;
int mhz;
int id;
cpudata(){
model = "Unknown";
cores = 1;
threads = 1;
mhz = 0;
id = 0;
};
void fill(char * data){
int i;
i = 0;
if (sscanf(data, "model name : %n", &i) != EOF && i > 0){model = (data+i);}
if (sscanf(data, "cpu cores : %d", &i) == 1){cores = i;}
if (sscanf(data, "siblings : %d", &i) == 1){threads = i;}
if (sscanf(data, "physical id : %d", &i) == 1){id = i;}
if (sscanf(data, "cpu MHz : %d", &i) == 1){mhz = i;}
};
};
void checkCapable(JSON::Value & capa){
std::ifstream cpuinfo("/proc/cpuinfo");
if (!cpuinfo){
capa["cpu"].null();
}else{
std::map<int, cpudata> cpus;
char line[300];
int proccount = -1;
while (cpuinfo.good()){
cpuinfo.getline(line, 300);
if (cpuinfo.fail()){
//empty lines? ignore them, clear flags, continue
if (!cpuinfo.eof()){
cpuinfo.ignore();
cpuinfo.clear();
}
continue;
}
if (memcmp(line, "processor", 9) == 0){proccount++;}
cpus[proccount].fill(line);
}
//remove double physical IDs - we only want real CPUs.
std::set<int> used_physids;
int total_speed = 0;
int total_threads = 0;
for (int i = 0; i <= proccount; ++i){
if (!used_physids.count(cpus[i].id)){
used_physids.insert(cpus[i].id);
JSON::Value thiscpu;
thiscpu["model"] = cpus[i].model;
thiscpu["cores"] = cpus[i].cores;
thiscpu["threads"] = cpus[i].threads;
thiscpu["mhz"] = cpus[i].mhz;
capa["cpu"].append(thiscpu);
total_speed += cpus[i].cores * cpus[i].mhz;
total_threads += cpus[i].threads;
}
}
capa["speed"] = total_speed;
capa["threads"] = total_threads;
}
}
void CheckAllStreams(JSON::Value & data){ void CheckAllStreams(JSON::Value & data){
long long int currTime = Util::epoch(); long long int currTime = Util::epoch();
for (JSON::ObjIter jit = data.ObjBegin(); jit != data.ObjEnd(); jit++){ for (JSON::ObjIter jit = data.ObjBegin(); jit != data.ObjEnd(); jit++){
@ -334,15 +402,15 @@ void CheckStreams(JSON::Value & in, JSON::Value & out){
}; //Connector namespace }; //Connector namespace
int main(int argc, char ** argv){ int main(int argc, char ** argv){
Connector::Storage = JSON::fromFile("config.json"); Controller::Storage = JSON::fromFile("config.json");
JSON::Value stored_port = JSON::fromString("{\"long\":\"port\", \"short\":\"p\", \"arg\":\"integer\", \"help\":\"TCP port to listen on.\"}"); JSON::Value stored_port = JSON::fromString("{\"long\":\"port\", \"short\":\"p\", \"arg\":\"integer\", \"help\":\"TCP port to listen on.\"}");
stored_port["default"] = Connector::Storage["config"]["controller"]["port"]; stored_port["default"] = Controller::Storage["config"]["controller"]["port"];
if (!stored_port["default"]){stored_port["default"] = 4242;} if (!stored_port["default"]){stored_port["default"] = 4242;}
JSON::Value stored_interface = JSON::fromString("{\"long\":\"interface\", \"short\":\"i\", \"arg\":\"string\", \"help\":\"Interface address to listen on, or 0.0.0.0 for all available interfaces.\"}"); JSON::Value stored_interface = JSON::fromString("{\"long\":\"interface\", \"short\":\"i\", \"arg\":\"string\", \"help\":\"Interface address to listen on, or 0.0.0.0 for all available interfaces.\"}");
stored_interface["default"] = Connector::Storage["config"]["controller"]["interface"]; stored_interface["default"] = Controller::Storage["config"]["controller"]["interface"];
if (!stored_interface["default"]){stored_interface["default"] = "0.0.0.0";} if (!stored_interface["default"]){stored_interface["default"] = "0.0.0.0";}
JSON::Value stored_user = JSON::fromString("{\"long\":\"username\", \"short\":\"u\", \"arg\":\"string\", \"help\":\"Username to drop privileges to, or root to not drop provileges.\"}"); JSON::Value stored_user = JSON::fromString("{\"long\":\"username\", \"short\":\"u\", \"arg\":\"string\", \"help\":\"Username to drop privileges to, or root to not drop provileges.\"}");
stored_user["default"] = Connector::Storage["config"]["controller"]["username"]; stored_user["default"] = Controller::Storage["config"]["controller"]["username"];
if (!stored_user["default"]){stored_user["default"] = "root";} if (!stored_user["default"]){stored_user["default"] = "root";}
Util::Config conf = Util::Config(argv[0], PACKAGE_VERSION); Util::Config conf = Util::Config(argv[0], PACKAGE_VERSION);
conf.addOption("listen_port", stored_port); conf.addOption("listen_port", stored_port);
@ -359,8 +427,8 @@ int main(int argc, char ** argv){
if (colon != std::string::npos && colon != 0 && colon != account.size()){ if (colon != std::string::npos && colon != 0 && colon != account.size()){
std::string uname = account.substr(0, colon); std::string uname = account.substr(0, colon);
std::string pword = account.substr(colon + 1, std::string::npos); std::string pword = account.substr(colon + 1, std::string::npos);
Connector::Log("CONF", "Created account "+uname+" through commandline option"); Controller::Log("CONF", "Created account "+uname+" through commandline option");
Connector::Storage["account"][uname]["password"] = Connector::md5(pword); Controller::Storage["account"][uname]["password"] = Controller::md5(pword);
} }
} }
time_t lastuplink = 0; time_t lastuplink = 0;
@ -370,28 +438,28 @@ int main(int argc, char ** argv){
Socket::Server Stats_Socket = Socket::Server("/tmp/mist/statistics", true); Socket::Server Stats_Socket = Socket::Server("/tmp/mist/statistics", true);
conf.activate(); conf.activate();
Socket::Connection Incoming; Socket::Connection Incoming;
std::vector< Connector::ConnectedUser > users; std::vector< Controller::ConnectedUser > users;
std::vector<Socket::Connection> buffers; std::vector<Socket::Connection> buffers;
JSON::Value Request; JSON::Value Request;
JSON::Value Response; JSON::Value Response;
std::string jsonp; std::string jsonp;
Connector::ConnectedUser * uplink = 0; Controller::ConnectedUser * uplink = 0;
Connector::Log("CONF", "Controller started"); Controller::Log("CONF", "Controller started");
conf.activate(); conf.activate();
while (API_Socket.connected() && conf.is_active){ while (API_Socket.connected() && conf.is_active){
usleep(10000); //sleep for 10 ms - prevents 100% CPU time usleep(10000); //sleep for 10 ms - prevents 100% CPU time
if (Util::epoch() - processchecker > 10){ if (Util::epoch() - processchecker > 10){
processchecker = Util::epoch(); processchecker = Util::epoch();
Connector::CheckProtocols(Connector::Storage["config"]["protocols"]); Controller::CheckProtocols(Controller::Storage["config"]["protocols"]);
Connector::CheckAllStreams(Connector::Storage["streams"]); Controller::CheckAllStreams(Controller::Storage["streams"]);
Connector::CheckStats(Connector::Storage["statistics"]); Controller::CheckStats(Controller::Storage["statistics"]);
} }
if (conf.getBool("uplink") && Util::epoch() - lastuplink > UPLINK_INTERVAL){ if (conf.getBool("uplink") && Util::epoch() - lastuplink > UPLINK_INTERVAL){
lastuplink = Util::epoch(); lastuplink = Util::epoch();
bool gotUplink = false; bool gotUplink = false;
if (users.size() > 0){ if (users.size() > 0){
for( std::vector< Connector::ConnectedUser >::iterator it = users.end() - 1; it >= users.begin(); it--) { for( std::vector< Controller::ConnectedUser >::iterator it = users.end() - 1; it >= users.begin(); it--) {
if (!it->C.connected()){ if (!it->C.connected()){
it->C.close(); it->C.close();
users.erase(it); users.erase(it);
@ -411,19 +479,19 @@ int main(int argc, char ** argv){
} }
if (gotUplink){ if (gotUplink){
Response.null(); //make sure no data leaks from previous requests Response.null(); //make sure no data leaks from previous requests
Response["config"] = Connector::Storage["config"]; Response["config"] = Controller::Storage["config"];
Response["streams"] = Connector::Storage["streams"]; Response["streams"] = Controller::Storage["streams"];
Response["log"] = Connector::Storage["log"]; Response["log"] = Controller::Storage["log"];
Response["statistics"] = Connector::Storage["statistics"]; Response["statistics"] = Controller::Storage["statistics"];
Response["now"] = (unsigned int)lastuplink; Response["now"] = (unsigned int)lastuplink;
uplink->H.Clean(); uplink->H.Clean();
uplink->H.SetBody("command="+HTTP::Parser::urlencode(Response.toString())); uplink->H.SetBody("command="+HTTP::Parser::urlencode(Response.toString()));
uplink->H.BuildRequest(); uplink->H.BuildRequest();
uplink->C.Send(uplink->H.BuildResponse("200", "OK")); uplink->C.Send(uplink->H.BuildResponse("200", "OK"));
uplink->H.Clean(); uplink->H.Clean();
//Connector::Log("UPLK", "Sending server data to uplink."); //Controller::Log("UPLK", "Sending server data to uplink.");
}else{ }else{
Connector::Log("UPLK", "Could not connect to uplink."); Controller::Log("UPLK", "Could not connect to uplink.");
} }
} }
@ -445,43 +513,43 @@ int main(int argc, char ** argv){
it->Received().get().clear(); it->Received().get().clear();
if (Request.isMember("buffer")){ if (Request.isMember("buffer")){
std::string thisbuffer = Request["buffer"]; std::string thisbuffer = Request["buffer"];
Connector::lastBuffer[thisbuffer] = Util::epoch(); Controller::lastBuffer[thisbuffer] = Util::epoch();
//if metadata is available, store it //if metadata is available, store it
if (Request.isMember("meta")){ if (Request.isMember("meta")){
Connector::Storage["streams"][thisbuffer]["meta"] = Request["meta"]; Controller::Storage["streams"][thisbuffer]["meta"] = Request["meta"];
} }
if (Request.isMember("totals")){ if (Request.isMember("totals")){
Connector::Storage["statistics"][thisbuffer]["curr"] = Request["curr"]; Controller::Storage["statistics"][thisbuffer]["curr"] = Request["curr"];
std::string nowstr = Request["totals"]["now"].asString(); std::string nowstr = Request["totals"]["now"].asString();
Connector::Storage["statistics"][thisbuffer]["totals"][nowstr] = Request["totals"]; Controller::Storage["statistics"][thisbuffer]["totals"][nowstr] = Request["totals"];
Connector::Storage["statistics"][thisbuffer]["totals"][nowstr].removeMember("now"); Controller::Storage["statistics"][thisbuffer]["totals"][nowstr].removeMember("now");
Connector::Storage["statistics"][thisbuffer]["totals"].shrink(600);//limit to 10 minutes of data Controller::Storage["statistics"][thisbuffer]["totals"].shrink(600);//limit to 10 minutes of data
for (JSON::ObjIter jit = Request["log"].ObjBegin(); jit != Request["log"].ObjEnd(); jit++){ for (JSON::ObjIter jit = Request["log"].ObjBegin(); jit != Request["log"].ObjEnd(); jit++){
Connector::Storage["statistics"][thisbuffer]["log"].append(jit->second); Controller::Storage["statistics"][thisbuffer]["log"].append(jit->second);
Connector::Storage["statistics"][thisbuffer]["log"].shrink(1000);//limit to 1000 users per buffer Controller::Storage["statistics"][thisbuffer]["log"].shrink(1000);//limit to 1000 users per buffer
} }
} }
} }
if (Request.isMember("vod")){ if (Request.isMember("vod")){
std::string thisfile = Request["vod"]["filename"]; std::string thisfile = Request["vod"]["filename"];
for (JSON::ObjIter oit = Connector::Storage["streams"].ObjBegin(); oit != Connector::Storage["streams"].ObjEnd(); ++oit){ for (JSON::ObjIter oit = Controller::Storage["streams"].ObjBegin(); oit != Controller::Storage["streams"].ObjEnd(); ++oit){
if (oit->second["channel"]["URL"].asString() == thisfile){ if (oit->second["channel"]["URL"].asString() == thisfile){
Connector::lastBuffer[oit->first] = Util::epoch(); Controller::lastBuffer[oit->first] = Util::epoch();
if (Request["vod"].isMember("meta")){ if (Request["vod"].isMember("meta")){
Connector::Storage["streams"][oit->first]["meta"] = Request["vod"]["meta"]; Controller::Storage["streams"][oit->first]["meta"] = Request["vod"]["meta"];
} }
JSON::Value sockit = (long long int)it->getSocket(); JSON::Value sockit = (long long int)it->getSocket();
std::string nowstr = Request["vod"]["now"].asString(); std::string nowstr = Request["vod"]["now"].asString();
Connector::Storage["statistics"][oit->first]["curr"][sockit.asString()] = Request["vod"]; Controller::Storage["statistics"][oit->first]["curr"][sockit.asString()] = Request["vod"];
Connector::Storage["statistics"][oit->first]["curr"][sockit.asString()].removeMember("meta"); Controller::Storage["statistics"][oit->first]["curr"][sockit.asString()].removeMember("meta");
JSON::Value nowtotal; JSON::Value nowtotal;
for (JSON::ObjIter u_it = Connector::Storage["statistics"][oit->first]["curr"].ObjBegin(); u_it != Connector::Storage["statistics"][oit->first]["curr"].ObjEnd(); ++u_it){ for (JSON::ObjIter u_it = Controller::Storage["statistics"][oit->first]["curr"].ObjBegin(); u_it != Controller::Storage["statistics"][oit->first]["curr"].ObjEnd(); ++u_it){
nowtotal["up"] = nowtotal["up"].asInt() + u_it->second["up"].asInt(); nowtotal["up"] = nowtotal["up"].asInt() + u_it->second["up"].asInt();
nowtotal["down"] = nowtotal["down"].asInt() + u_it->second["down"].asInt(); nowtotal["down"] = nowtotal["down"].asInt() + u_it->second["down"].asInt();
nowtotal["count"] = nowtotal["count"].asInt() + 1; nowtotal["count"] = nowtotal["count"].asInt() + 1;
} }
Connector::Storage["statistics"][oit->first]["totals"][nowstr] = nowtotal; Controller::Storage["statistics"][oit->first]["totals"][nowstr] = nowtotal;
Connector::Storage["statistics"][oit->first]["totals"].shrink(600); Controller::Storage["statistics"][oit->first]["totals"].shrink(600);
} }
} }
} }
@ -490,7 +558,7 @@ int main(int argc, char ** argv){
} }
} }
if (users.size() > 0){ if (users.size() > 0){
for( std::vector< Connector::ConnectedUser >::iterator it = users.begin(); it != users.end(); it++) { for( std::vector< Controller::ConnectedUser >::iterator it = users.begin(); it != users.end(); it++) {
if (!it->C.connected() || it->logins > 3){ if (!it->C.connected() || it->logins > 3){
it->C.close(); it->C.close();
users.erase(it); users.erase(it);
@ -518,30 +586,31 @@ int main(int argc, char ** argv){
if (Request["authorize"].isMember("challenge")){ if (Request["authorize"].isMember("challenge")){
it->logins++; it->logins++;
if (it->logins > 2){ if (it->logins > 2){
Connector::Log("UPLK", "Max login attempts passed - dropping connection to uplink."); Controller::Log("UPLK", "Max login attempts passed - dropping connection to uplink.");
it->C.close(); it->C.close();
}else{ }else{
Response["config"] = Connector::Storage["config"]; Response["config"] = Controller::Storage["config"];
Response["streams"] = Connector::Storage["streams"]; Response["streams"] = Controller::Storage["streams"];
Response["log"] = Connector::Storage["log"]; Response["log"] = Controller::Storage["log"];
Response["statistics"] = Connector::Storage["statistics"]; Response["statistics"] = Controller::Storage["statistics"];
Response["authorize"]["username"] = COMPILED_USERNAME; Response["authorize"]["username"] = COMPILED_USERNAME;
Connector::Log("UPLK", "Responding to login challenge: " + Request["authorize"]["challenge"].asString()); Controller::checkCapable(Response["capabilities"]);
Response["authorize"]["password"] = Connector::md5(COMPILED_PASSWORD + Request["authorize"]["challenge"].asString()); Controller::Log("UPLK", "Responding to login challenge: " + Request["authorize"]["challenge"].asString());
Response["authorize"]["password"] = Controller::md5(COMPILED_PASSWORD + Request["authorize"]["challenge"].asString());
it->H.Clean(); it->H.Clean();
it->H.SetBody("command="+HTTP::Parser::urlencode(Response.toString())); it->H.SetBody("command="+HTTP::Parser::urlencode(Response.toString()));
it->H.BuildRequest(); it->H.BuildRequest();
it->C.Send(it->H.BuildResponse("200", "OK")); it->C.Send(it->H.BuildResponse("200", "OK"));
it->H.Clean(); it->H.Clean();
Connector::Log("UPLK", "Attempting login to uplink."); Controller::Log("UPLK", "Attempting login to uplink.");
} }
} }
}else{ }else{
if (Request.isMember("config")){Connector::CheckConfig(Request["config"], Connector::Storage["config"]);} if (Request.isMember("config")){Controller::CheckConfig(Request["config"], Controller::Storage["config"]);}
if (Request.isMember("streams")){Connector::CheckStreams(Request["streams"], Connector::Storage["streams"]);} if (Request.isMember("streams")){Controller::CheckStreams(Request["streams"], Controller::Storage["streams"]);}
if (Request.isMember("clearstatlogs")){ if (Request.isMember("clearstatlogs")){
Connector::Storage["log"].null(); Controller::Storage["log"].null();
Connector::Storage["statistics"].null(); Controller::Storage["statistics"].null();
} }
} }
}else{ }else{
@ -558,27 +627,28 @@ int main(int argc, char ** argv){
Authorize(Request, Response, (*it)); Authorize(Request, Response, (*it));
if (it->Authorized){ if (it->Authorized){
//Parse config and streams from the request. //Parse config and streams from the request.
if (Request.isMember("config")){Connector::CheckConfig(Request["config"], Connector::Storage["config"]);} if (Request.isMember("config")){Controller::CheckConfig(Request["config"], Controller::Storage["config"]);}
if (Request.isMember("streams")){Connector::CheckStreams(Request["streams"], Connector::Storage["streams"]);} if (Request.isMember("streams")){Controller::CheckStreams(Request["streams"], Controller::Storage["streams"]);}
if (Request.isMember("save")){ if (Request.isMember("save")){
Connector::WriteFile("config.json", Connector::Storage.toString()); Controller::WriteFile("config.json", Controller::Storage.toString());
Connector::Log("CONF", "Config written to file on request through API"); Controller::Log("CONF", "Config written to file on request through API");
} }
//sent current configuration, no matter if it was changed or not //sent current configuration, no matter if it was changed or not
//Response["streams"] = Storage["streams"]; //Response["streams"] = Storage["streams"];
Response["config"] = Connector::Storage["config"]; Response["config"] = Controller::Storage["config"];
Controller::checkCapable(Response["capabilities"]);
Response["config"]["version"] = PACKAGE_VERSION "/" + Util::Config::libver; Response["config"]["version"] = PACKAGE_VERSION "/" + Util::Config::libver;
Response["streams"] = Connector::Storage["streams"]; Response["streams"] = Controller::Storage["streams"];
//add required data to the current unix time to the config, for syncing reasons //add required data to the current unix time to the config, for syncing reasons
Response["config"]["time"] = Util::epoch(); Response["config"]["time"] = Util::epoch();
if (!Response["config"].isMember("serverid")){Response["config"]["serverid"] = "";} if (!Response["config"].isMember("serverid")){Response["config"]["serverid"] = "";}
//sent any available logs and statistics //sent any available logs and statistics
Response["log"] = Connector::Storage["log"]; Response["log"] = Controller::Storage["log"];
Response["statistics"] = Connector::Storage["statistics"]; Response["statistics"] = Controller::Storage["statistics"];
//clear log and statistics if requested //clear log and statistics if requested
if (Request.isMember("clearstatlogs")){ if (Request.isMember("clearstatlogs")){
Connector::Storage["log"].null(); Controller::Storage["log"].null();
Connector::Storage["statistics"].null(); Controller::Storage["statistics"].null();
} }
} }
jsonp = ""; jsonp = "";
@ -601,9 +671,9 @@ int main(int argc, char ** argv){
} }
} }
API_Socket.close(); API_Socket.close();
Connector::Log("CONF", "Controller shutting down"); Controller::Log("CONF", "Controller shutting down");
Util::Procs::StopAll(); Util::Procs::StopAll();
Connector::WriteFile("config.json", Connector::Storage.toString()); Controller::WriteFile("config.json", Controller::Storage.toString());
std::cout << "Killed all processes, wrote config to disk. Exiting." << std::endl; std::cout << "Killed all processes, wrote config to disk. Exiting." << std::endl;
return 0; return 0;
} }