diff --git a/src/controller.cpp b/src/controller.cpp index 4007a3b9..0fc6dfbb 100644 --- a/src/controller.cpp +++ b/src/controller.cpp @@ -35,36 +35,11 @@ #define COMPILED_USERNAME "" #define COMPILED_PASSWORD "" -Socket::Server API_Socket; ///< Main connection socket. +namespace Connector{ + std::map lastBuffer; ///< Last moment of contact with all buffers. Auth keychecker; ///< Checks key authorization. -/// Basic signal handler. Disconnects the server_socket if it receives -/// a SIGINT, SIGHUP or SIGTERM signal, but does nothing for SIGPIPE. -/// Disconnecting the server_socket will terminate the main listening loop -/// and cleanly shut down the process. -void signal_handler (int signum){ - switch (signum){ - case SIGINT: - #if DEBUG >= 1 - fprintf(stderr, "Received SIGINT - closing server socket.\n"); - #endif - break; - case SIGHUP: - #if DEBUG >= 1 - fprintf(stderr, "Received SIGHUP - closing server socket.\n"); - #endif - break; - case SIGTERM: - #if DEBUG >= 1 - fprintf(stderr, "Received SIGTERM - closing server socket.\n"); - #endif - break; - default: return; break; - } - API_Socket.close(); -}//signal_handler - /// Wrapper function for openssl MD5 implementation std::string md5(std::string input){ char tmp[3]; @@ -110,6 +85,7 @@ void Log(std::string kind, std::string message){ m.append(kind); m.append(message); Storage["log"].append(m); + Storage["log"].shrink(100);//limit to 100 log messages std::cout << "[" << kind << "] " << message << std::endl; } @@ -131,7 +107,9 @@ void Authorize( JSON::Value & Request, JSON::Value & Response, ConnectedUser & c } } if (UserID != ""){ - Log("AUTH", "Failed login attempt "+UserID+" @ "+conn.C.getHost()); + if (Request["authorize"]["password"].asString() != "" && md5(Storage["account"][UserID]["password"].asString()) != Request["authorize"]["password"].asString()){ + Log("AUTH", "Failed login attempt "+UserID+" @ "+conn.C.getHost()); + } } conn.logins++; } @@ -238,17 +216,22 @@ void startStream(std::string name, JSON::Value & data){ std::string cmd1, cmd2, cmd3; if (URL.substr(0, 4) == "push"){ std::string pusher = URL.substr(7); - cmd2 = "MistBuffer "+name+" "+pusher; + cmd2 = "MistBuffer -s "+name+" "+pusher; Util::Procs::Start(name, cmd2); Log("BUFF", "(re)starting stream buffer "+name+" for push data from "+pusher); }else{ if (URL.substr(0, 1) == "/"){ + struct stat fileinfo; + if (stat(URL.c_str(), &fileinfo) != 0 || S_ISDIR(fileinfo.st_mode)){ + Log("BUFF", "*Not* starting stream "+name+"! File not found: "+URL); + return; + } cmd1 = "cat "+URL; }else{ cmd1 = "ffmpeg -re -async 2 -i "+URL+" "+preset+" -f flv -"; cmd2 = "MistFLV2DTSC"; } - cmd3 = "MistBuffer "+name; + cmd3 = "MistBuffer -s "+name; if (cmd2 != ""){ Util::Procs::Start(name, cmd1, cmd2, cmd3); Log("BUFF", "(re)starting stream buffer "+name+" for ffmpeg data: "+cmd1); @@ -308,16 +291,18 @@ void CheckStreams(JSON::Value & in, JSON::Value & out){ } } +}; //Connector namespace + int main(int argc, char ** argv){ - Storage = JSON::fromFile("config.json"); + Connector::Storage = JSON::fromFile("config.json"); JSON::Value stored_port = JSON::fromString("{\"long\":\"port\", \"short\":\"p\", \"arg\":\"integer\", \"help\":\"TCP port to listen on.\"}"); - stored_port["default"] = Storage["config"]["controller"]["port"]; + stored_port["default"] = Connector::Storage["config"]["controller"]["port"]; if (!stored_port["default"]){stored_port["default"] = 4242;} JSON::Value stored_interface = JSON::fromString("{\"long\":\"interface\", \"short\":\"i\", \"arg\":\"string\", \"help\":\"Interface address to listen on, or 0.0.0.0 for all available interfaces.\"}"); - stored_interface["default"] = Storage["config"]["controller"]["interface"]; + stored_interface["default"] = Connector::Storage["config"]["controller"]["interface"]; if (!stored_interface["default"]){stored_interface["default"] = "0.0.0.0";} JSON::Value stored_user = JSON::fromString("{\"long\":\"username\", \"short\":\"u\", \"arg\":\"string\", \"help\":\"Username to drop privileges to, or root to not drop provileges.\"}"); - stored_user["default"] = Storage["config"]["controller"]["username"]; + stored_user["default"] = Connector::Storage["config"]["controller"]["username"]; if (!stored_user["default"]){stored_user["default"] = "root";} Util::Config conf = Util::Config(argv[0], PACKAGE_VERSION); conf.addOption("listen_port", stored_port); @@ -325,6 +310,7 @@ int main(int argc, char ** argv){ conf.addOption("username", stored_user); conf.addOption("daemonize", JSON::fromString("{\"long\":\"daemon\", \"short\":\"d\", \"default\":1, \"long_off\":\"nodaemon\", \"short_off\":\"n\", \"help\":\"Whether or not to daemonize the process after starting.\"}")); conf.addOption("account", JSON::fromString("{\"long\":\"account\", \"short\":\"a\", \"arg\":\"string\" \"default\":\"\", \"help\":\"A username:password string to create a new account with.\"}")); + conf.addOption("uplink", JSON::fromString("{\"default\":0, \"help\":\"Enable MistSteward uplink.\", \"short\":\"U\", \"long\":\"uplink\"}")); conf.parseArgs(argc, argv); std::string account = conf.getString("account"); @@ -333,39 +319,38 @@ int main(int argc, char ** argv){ if (colon != std::string::npos && colon != 0 && colon != account.size()){ std::string uname = account.substr(0, colon); std::string pword = account.substr(colon + 1, std::string::npos); - Log("CONF", "Created account "+uname+" through commandline option"); - Storage["account"][uname]["password"] = md5(pword); + Connector::Log("CONF", "Created account "+uname+" through commandline option"); + Connector::Storage["account"][uname]["password"] = Connector::md5(pword); } } time_t lastuplink = 0; time_t processchecker = 0; - API_Socket = Socket::Server(conf.getInteger("listen_port"), conf.getString("listen_interface"), true); + Socket::Server API_Socket = Socket::Server(conf.getInteger("listen_port"), conf.getString("listen_interface"), true); mkdir("/tmp/mist", S_IRWXU | S_IRWXG | S_IRWXO);//attempt to create /tmp/mist/ - ignore failures Socket::Server Stats_Socket = Socket::Server("/tmp/mist/statistics", true); conf.activate(); Socket::Connection Incoming; - std::vector< ConnectedUser > users; + std::vector< Connector::ConnectedUser > users; std::vector buffers; JSON::Value Request; JSON::Value Response; std::string jsonp; - ConnectedUser * uplink = 0; - Log("CONF", "Controller started"); + Connector::ConnectedUser * uplink = 0; + Connector::Log("CONF", "Controller started"); + conf.activate(); while (API_Socket.connected() && conf.is_active){ usleep(100000); //sleep for 100 ms - prevents 100% CPU time if (time(0) - processchecker > 10){ processchecker = time(0); - CheckProtocols(Storage["config"]["protocols"]); - CheckAllStreams(Storage["streams"]); + Connector::CheckProtocols(Connector::Storage["config"]["protocols"]); + Connector::CheckAllStreams(Connector::Storage["streams"]); } - /// \todo Uplink disabled until gearbox is live. Simply comment this section back in to re-enable. - /* - if (time(0) - lastuplink > UPLINK_INTERVAL){ + if (conf.getBool("uplink") && time(0) - lastuplink > UPLINK_INTERVAL){ lastuplink = time(0); bool gotUplink = false; if (users.size() > 0){ - for( std::vector< ConnectedUser >::iterator it = users.end() - 1; it >= users.begin(); it--) { + for( std::vector< Connector::ConnectedUser >::iterator it = users.end() - 1; it >= users.begin(); it--) { if (!it->C.connected()){ it->C.close(); users.erase(it); @@ -385,23 +370,22 @@ int main(int argc, char ** argv){ } if (gotUplink){ Response.null(); //make sure no data leaks from previous requests - Response["config"] = Storage["config"]; - Response["streams"] = Storage["streams"]; - Response["log"] = Storage["log"]; - Response["statistics"] = Storage["statistics"]; + Response["config"] = Connector::Storage["config"]; + Response["streams"] = Connector::Storage["streams"]; + Response["log"] = Connector::Storage["log"]; + Response["statistics"] = Connector::Storage["statistics"]; Response["now"] = (unsigned int)lastuplink; uplink->H.Clean(); uplink->H.SetBody("command="+HTTP::Parser::urlencode(Response.toString())); uplink->H.BuildRequest(); uplink->C.Send(uplink->H.BuildResponse("200", "OK")); uplink->H.Clean(); - //Log("UPLK", "Sending server data to uplink."); + //Connector::Log("UPLK", "Sending server data to uplink."); }else{ - Log("UPLK", "Could not connect to uplink."); + Connector::Log("UPLK", "Could not connect to uplink."); } } - */ - + Incoming = API_Socket.accept(); if (Incoming.connected()){users.push_back(Incoming);} Incoming = Stats_Socket.accept(); @@ -416,17 +400,19 @@ int main(int argc, char ** argv){ if (it->spool()){ size_t newlines = it->Received().find("\n\n"); while (newlines != std::string::npos){ - Request = it->Received().substr(0, newlines); + Request = JSON::fromString(it->Received().substr(0, newlines)); if (Request.isMember("totals") && Request["totals"].isMember("buffer")){ std::string thisbuffer = Request["totals"]["buffer"]; - lastBuffer[thisbuffer] = time(0); - Storage["statistics"][thisbuffer]["curr"] = Request["curr"]; + Connector::lastBuffer[thisbuffer] = time(0); + Connector::Storage["statistics"][thisbuffer]["curr"] = Request["curr"]; std::stringstream st; st << (long long int)Request["totals"]["now"]; std::string nowstr = st.str(); - Storage["statistics"][thisbuffer]["totals"][nowstr] = Request["totals"]; + Connector::Storage["statistics"][thisbuffer]["totals"][nowstr] = Request["totals"]; + Connector::Storage["statistics"][thisbuffer]["totals"].shrink(600);//limit to 10 minutes of data for (JSON::ObjIter jit = Request["log"].ObjBegin(); jit != Request["log"].ObjEnd(); jit++){ - Storage["statistics"][thisbuffer]["log"].append(jit->second); + Connector::Storage["statistics"][thisbuffer]["log"].append(jit->second); + Connector::Storage["statistics"][thisbuffer]["log"].shrink(1000);//limit to 1000 users per buffer } } it->Received().erase(0, newlines+2); @@ -436,7 +422,7 @@ int main(int argc, char ** argv){ } } if (users.size() > 0){ - for( std::vector< ConnectedUser >::iterator it = users.begin(); it != users.end(); it++) { + for( std::vector< Connector::ConnectedUser >::iterator it = users.begin(); it != users.end(); it++) { if (!it->C.connected() || it->logins > 3){ it->C.close(); users.erase(it); @@ -454,57 +440,58 @@ int main(int argc, char ** argv){ if (Request["authorize"].isMember("challenge")){ it->logins++; if (it->logins > 2){ - Log("UPLK", "Max login attempts passed - dropping connection to uplink."); + Connector::Log("UPLK", "Max login attempts passed - dropping connection to uplink."); it->C.close(); }else{ - Response["config"] = Storage["config"]; - Response["streams"] = Storage["streams"]; - Response["log"] = Storage["log"]; - Response["statistics"] = Storage["statistics"]; + Response["config"] = Connector::Storage["config"]; + Response["streams"] = Connector::Storage["streams"]; + Response["log"] = Connector::Storage["log"]; + Response["statistics"] = Connector::Storage["statistics"]; Response["authorize"]["username"] = COMPILED_USERNAME; - Log("UPLK", "Responding to login challenge: " + (std::string)Request["authorize"]["challenge"]); - Response["authorize"]["password"] = md5(COMPILED_PASSWORD + (std::string)Request["authorize"]["challenge"]); + Connector::Log("UPLK", "Responding to login challenge: " + (std::string)Request["authorize"]["challenge"]); + Response["authorize"]["password"] = Connector::md5(COMPILED_PASSWORD + (std::string)Request["authorize"]["challenge"]); it->H.Clean(); it->H.SetBody("command="+HTTP::Parser::urlencode(Response.toString())); it->H.BuildRequest(); it->C.Send(it->H.BuildResponse("200", "OK")); it->H.Clean(); - Log("UPLK", "Attempting login to uplink."); + Connector::Log("UPLK", "Attempting login to uplink."); } } }else{ - if (Request.isMember("config")){CheckConfig(Request["config"], Storage["config"]);} - if (Request.isMember("streams")){CheckStreams(Request["streams"], Storage["streams"]);} + if (Request.isMember("config")){Connector::CheckConfig(Request["config"], Connector::Storage["config"]);} + if (Request.isMember("streams")){Connector::CheckStreams(Request["streams"], Connector::Storage["streams"]);} if (Request.isMember("clearstatlogs")){ - Storage["log"].null(); - Storage["statistics"].null(); + Connector::Storage["log"].null(); + Connector::Storage["statistics"].null(); } } }else{ Request = JSON::fromString(it->H.GetVar("command")); - std::cout << "Request: " << Request.toString() << std::endl; Authorize(Request, Response, (*it)); if (it->Authorized){ //Parse config and streams from the request. - if (Request.isMember("config")){CheckConfig(Request["config"], Storage["config"]);} - if (Request.isMember("streams")){CheckStreams(Request["streams"], Storage["streams"]);} + if (Request.isMember("config")){Connector::CheckConfig(Request["config"], Connector::Storage["config"]);} + if (Request.isMember("streams")){Connector::CheckStreams(Request["streams"], Connector::Storage["streams"]);} if (Request.isMember("save")){ - WriteFile("config.json", Storage.toString()); - Log("CONF", "Config written to file on request through API"); + Connector::WriteFile("config.json", Connector::Storage.toString()); + Connector::Log("CONF", "Config written to file on request through API"); } //sent current configuration, no matter if it was changed or not //Response["streams"] = Storage["streams"]; - Response["config"] = Storage["config"]; - Response["streams"] = Storage["streams"]; + Response["config"] = Connector::Storage["config"]; + Response["streams"] = Connector::Storage["streams"]; //add required data to the current unix time to the config, for syncing reasons Response["config"]["time"] = (long long int)time(0); if (!Response["config"].isMember("serverid")){Response["config"]["serverid"] = "";} //sent any available logs and statistics - Response["log"] = Storage["log"]; - Response["statistics"] = Storage["statistics"]; - //clear log and statistics to prevent useless data transfer - Storage["log"].null(); - Storage["statistics"].null(); + Response["log"] = Connector::Storage["log"]; + Response["statistics"] = Connector::Storage["statistics"]; + //clear log and statistics if requested + if (Request.isMember("clearstatlogs")){ + Connector::Storage["log"].null(); + Connector::Storage["statistics"].null(); + } } jsonp = ""; if (it->H.GetVar("callback") != ""){jsonp = it->H.GetVar("callback");} @@ -526,9 +513,9 @@ int main(int argc, char ** argv){ } } API_Socket.close(); - Log("CONF", "Controller shutting down"); + Connector::Log("CONF", "Controller shutting down"); Util::Procs::StopAll(); - WriteFile("config.json", Storage.toString()); + Connector::WriteFile("config.json", Connector::Storage.toString()); std::cout << "Killed all processes, wrote config to disk. Exiting." << std::endl; return 0; }