Migrate some older code to new style, reducing linecount quite a bit. Moved all non-main() code to a proper namespace. Removed log messages for non-encrypted auth attempts as well as missing-password auth. Update MistBuffer calling with the new -s commandline option. Check files for existance before catting them to buffers, added an error message in case this is not possible. Fixed statistics and logs being wiped for no reason. Set limits to stats/logs stored to limit memory and disk usage to sane amounts. Added new commandline option to turn on uplink, defaulting to off for now.

This commit is contained in:
Thulinma 2012-08-13 09:45:59 +02:00
parent 54276fd9bf
commit 6d7e4f5019

View file

@ -35,36 +35,11 @@
#define COMPILED_USERNAME "" #define COMPILED_USERNAME ""
#define COMPILED_PASSWORD "" #define COMPILED_PASSWORD ""
Socket::Server API_Socket; ///< Main connection socket. namespace Connector{
std::map<std::string, int> lastBuffer; ///< Last moment of contact with all buffers. std::map<std::string, int> lastBuffer; ///< Last moment of contact with all buffers.
Auth keychecker; ///< Checks key authorization. Auth keychecker; ///< Checks key authorization.
/// Basic signal handler. Disconnects the server_socket if it receives
/// a SIGINT, SIGHUP or SIGTERM signal, but does nothing for SIGPIPE.
/// Disconnecting the server_socket will terminate the main listening loop
/// and cleanly shut down the process.
void signal_handler (int signum){
switch (signum){
case SIGINT:
#if DEBUG >= 1
fprintf(stderr, "Received SIGINT - closing server socket.\n");
#endif
break;
case SIGHUP:
#if DEBUG >= 1
fprintf(stderr, "Received SIGHUP - closing server socket.\n");
#endif
break;
case SIGTERM:
#if DEBUG >= 1
fprintf(stderr, "Received SIGTERM - closing server socket.\n");
#endif
break;
default: return; break;
}
API_Socket.close();
}//signal_handler
/// Wrapper function for openssl MD5 implementation /// Wrapper function for openssl MD5 implementation
std::string md5(std::string input){ std::string md5(std::string input){
char tmp[3]; char tmp[3];
@ -110,6 +85,7 @@ void Log(std::string kind, std::string message){
m.append(kind); m.append(kind);
m.append(message); m.append(message);
Storage["log"].append(m); Storage["log"].append(m);
Storage["log"].shrink(100);//limit to 100 log messages
std::cout << "[" << kind << "] " << message << std::endl; std::cout << "[" << kind << "] " << message << std::endl;
} }
@ -131,7 +107,9 @@ void Authorize( JSON::Value & Request, JSON::Value & Response, ConnectedUser & c
} }
} }
if (UserID != ""){ if (UserID != ""){
Log("AUTH", "Failed login attempt "+UserID+" @ "+conn.C.getHost()); if (Request["authorize"]["password"].asString() != "" && md5(Storage["account"][UserID]["password"].asString()) != Request["authorize"]["password"].asString()){
Log("AUTH", "Failed login attempt "+UserID+" @ "+conn.C.getHost());
}
} }
conn.logins++; conn.logins++;
} }
@ -238,17 +216,22 @@ void startStream(std::string name, JSON::Value & data){
std::string cmd1, cmd2, cmd3; std::string cmd1, cmd2, cmd3;
if (URL.substr(0, 4) == "push"){ if (URL.substr(0, 4) == "push"){
std::string pusher = URL.substr(7); std::string pusher = URL.substr(7);
cmd2 = "MistBuffer "+name+" "+pusher; cmd2 = "MistBuffer -s "+name+" "+pusher;
Util::Procs::Start(name, cmd2); Util::Procs::Start(name, cmd2);
Log("BUFF", "(re)starting stream buffer "+name+" for push data from "+pusher); Log("BUFF", "(re)starting stream buffer "+name+" for push data from "+pusher);
}else{ }else{
if (URL.substr(0, 1) == "/"){ if (URL.substr(0, 1) == "/"){
struct stat fileinfo;
if (stat(URL.c_str(), &fileinfo) != 0 || S_ISDIR(fileinfo.st_mode)){
Log("BUFF", "*Not* starting stream "+name+"! File not found: "+URL);
return;
}
cmd1 = "cat "+URL; cmd1 = "cat "+URL;
}else{ }else{
cmd1 = "ffmpeg -re -async 2 -i "+URL+" "+preset+" -f flv -"; cmd1 = "ffmpeg -re -async 2 -i "+URL+" "+preset+" -f flv -";
cmd2 = "MistFLV2DTSC"; cmd2 = "MistFLV2DTSC";
} }
cmd3 = "MistBuffer "+name; cmd3 = "MistBuffer -s "+name;
if (cmd2 != ""){ if (cmd2 != ""){
Util::Procs::Start(name, cmd1, cmd2, cmd3); Util::Procs::Start(name, cmd1, cmd2, cmd3);
Log("BUFF", "(re)starting stream buffer "+name+" for ffmpeg data: "+cmd1); Log("BUFF", "(re)starting stream buffer "+name+" for ffmpeg data: "+cmd1);
@ -308,16 +291,18 @@ void CheckStreams(JSON::Value & in, JSON::Value & out){
} }
} }
}; //Connector namespace
int main(int argc, char ** argv){ int main(int argc, char ** argv){
Storage = JSON::fromFile("config.json"); Connector::Storage = JSON::fromFile("config.json");
JSON::Value stored_port = JSON::fromString("{\"long\":\"port\", \"short\":\"p\", \"arg\":\"integer\", \"help\":\"TCP port to listen on.\"}"); JSON::Value stored_port = JSON::fromString("{\"long\":\"port\", \"short\":\"p\", \"arg\":\"integer\", \"help\":\"TCP port to listen on.\"}");
stored_port["default"] = Storage["config"]["controller"]["port"]; stored_port["default"] = Connector::Storage["config"]["controller"]["port"];
if (!stored_port["default"]){stored_port["default"] = 4242;} if (!stored_port["default"]){stored_port["default"] = 4242;}
JSON::Value stored_interface = JSON::fromString("{\"long\":\"interface\", \"short\":\"i\", \"arg\":\"string\", \"help\":\"Interface address to listen on, or 0.0.0.0 for all available interfaces.\"}"); JSON::Value stored_interface = JSON::fromString("{\"long\":\"interface\", \"short\":\"i\", \"arg\":\"string\", \"help\":\"Interface address to listen on, or 0.0.0.0 for all available interfaces.\"}");
stored_interface["default"] = Storage["config"]["controller"]["interface"]; stored_interface["default"] = Connector::Storage["config"]["controller"]["interface"];
if (!stored_interface["default"]){stored_interface["default"] = "0.0.0.0";} if (!stored_interface["default"]){stored_interface["default"] = "0.0.0.0";}
JSON::Value stored_user = JSON::fromString("{\"long\":\"username\", \"short\":\"u\", \"arg\":\"string\", \"help\":\"Username to drop privileges to, or root to not drop provileges.\"}"); JSON::Value stored_user = JSON::fromString("{\"long\":\"username\", \"short\":\"u\", \"arg\":\"string\", \"help\":\"Username to drop privileges to, or root to not drop provileges.\"}");
stored_user["default"] = Storage["config"]["controller"]["username"]; stored_user["default"] = Connector::Storage["config"]["controller"]["username"];
if (!stored_user["default"]){stored_user["default"] = "root";} if (!stored_user["default"]){stored_user["default"] = "root";}
Util::Config conf = Util::Config(argv[0], PACKAGE_VERSION); Util::Config conf = Util::Config(argv[0], PACKAGE_VERSION);
conf.addOption("listen_port", stored_port); conf.addOption("listen_port", stored_port);
@ -325,6 +310,7 @@ int main(int argc, char ** argv){
conf.addOption("username", stored_user); conf.addOption("username", stored_user);
conf.addOption("daemonize", JSON::fromString("{\"long\":\"daemon\", \"short\":\"d\", \"default\":1, \"long_off\":\"nodaemon\", \"short_off\":\"n\", \"help\":\"Whether or not to daemonize the process after starting.\"}")); conf.addOption("daemonize", JSON::fromString("{\"long\":\"daemon\", \"short\":\"d\", \"default\":1, \"long_off\":\"nodaemon\", \"short_off\":\"n\", \"help\":\"Whether or not to daemonize the process after starting.\"}"));
conf.addOption("account", JSON::fromString("{\"long\":\"account\", \"short\":\"a\", \"arg\":\"string\" \"default\":\"\", \"help\":\"A username:password string to create a new account with.\"}")); conf.addOption("account", JSON::fromString("{\"long\":\"account\", \"short\":\"a\", \"arg\":\"string\" \"default\":\"\", \"help\":\"A username:password string to create a new account with.\"}"));
conf.addOption("uplink", JSON::fromString("{\"default\":0, \"help\":\"Enable MistSteward uplink.\", \"short\":\"U\", \"long\":\"uplink\"}"));
conf.parseArgs(argc, argv); conf.parseArgs(argc, argv);
std::string account = conf.getString("account"); std::string account = conf.getString("account");
@ -333,39 +319,38 @@ int main(int argc, char ** argv){
if (colon != std::string::npos && colon != 0 && colon != account.size()){ if (colon != std::string::npos && colon != 0 && colon != account.size()){
std::string uname = account.substr(0, colon); std::string uname = account.substr(0, colon);
std::string pword = account.substr(colon + 1, std::string::npos); std::string pword = account.substr(colon + 1, std::string::npos);
Log("CONF", "Created account "+uname+" through commandline option"); Connector::Log("CONF", "Created account "+uname+" through commandline option");
Storage["account"][uname]["password"] = md5(pword); Connector::Storage["account"][uname]["password"] = Connector::md5(pword);
} }
} }
time_t lastuplink = 0; time_t lastuplink = 0;
time_t processchecker = 0; time_t processchecker = 0;
API_Socket = Socket::Server(conf.getInteger("listen_port"), conf.getString("listen_interface"), true); Socket::Server API_Socket = Socket::Server(conf.getInteger("listen_port"), conf.getString("listen_interface"), true);
mkdir("/tmp/mist", S_IRWXU | S_IRWXG | S_IRWXO);//attempt to create /tmp/mist/ - ignore failures mkdir("/tmp/mist", S_IRWXU | S_IRWXG | S_IRWXO);//attempt to create /tmp/mist/ - ignore failures
Socket::Server Stats_Socket = Socket::Server("/tmp/mist/statistics", true); Socket::Server Stats_Socket = Socket::Server("/tmp/mist/statistics", true);
conf.activate(); conf.activate();
Socket::Connection Incoming; Socket::Connection Incoming;
std::vector< ConnectedUser > users; std::vector< Connector::ConnectedUser > users;
std::vector<Socket::Connection> buffers; std::vector<Socket::Connection> buffers;
JSON::Value Request; JSON::Value Request;
JSON::Value Response; JSON::Value Response;
std::string jsonp; std::string jsonp;
ConnectedUser * uplink = 0; Connector::ConnectedUser * uplink = 0;
Log("CONF", "Controller started"); Connector::Log("CONF", "Controller started");
conf.activate();
while (API_Socket.connected() && conf.is_active){ while (API_Socket.connected() && conf.is_active){
usleep(100000); //sleep for 100 ms - prevents 100% CPU time usleep(100000); //sleep for 100 ms - prevents 100% CPU time
if (time(0) - processchecker > 10){ if (time(0) - processchecker > 10){
processchecker = time(0); processchecker = time(0);
CheckProtocols(Storage["config"]["protocols"]); Connector::CheckProtocols(Connector::Storage["config"]["protocols"]);
CheckAllStreams(Storage["streams"]); Connector::CheckAllStreams(Connector::Storage["streams"]);
} }
/// \todo Uplink disabled until gearbox is live. Simply comment this section back in to re-enable. if (conf.getBool("uplink") && time(0) - lastuplink > UPLINK_INTERVAL){
/*
if (time(0) - lastuplink > UPLINK_INTERVAL){
lastuplink = time(0); lastuplink = time(0);
bool gotUplink = false; bool gotUplink = false;
if (users.size() > 0){ if (users.size() > 0){
for( std::vector< ConnectedUser >::iterator it = users.end() - 1; it >= users.begin(); it--) { for( std::vector< Connector::ConnectedUser >::iterator it = users.end() - 1; it >= users.begin(); it--) {
if (!it->C.connected()){ if (!it->C.connected()){
it->C.close(); it->C.close();
users.erase(it); users.erase(it);
@ -385,23 +370,22 @@ int main(int argc, char ** argv){
} }
if (gotUplink){ if (gotUplink){
Response.null(); //make sure no data leaks from previous requests Response.null(); //make sure no data leaks from previous requests
Response["config"] = Storage["config"]; Response["config"] = Connector::Storage["config"];
Response["streams"] = Storage["streams"]; Response["streams"] = Connector::Storage["streams"];
Response["log"] = Storage["log"]; Response["log"] = Connector::Storage["log"];
Response["statistics"] = Storage["statistics"]; Response["statistics"] = Connector::Storage["statistics"];
Response["now"] = (unsigned int)lastuplink; Response["now"] = (unsigned int)lastuplink;
uplink->H.Clean(); uplink->H.Clean();
uplink->H.SetBody("command="+HTTP::Parser::urlencode(Response.toString())); uplink->H.SetBody("command="+HTTP::Parser::urlencode(Response.toString()));
uplink->H.BuildRequest(); uplink->H.BuildRequest();
uplink->C.Send(uplink->H.BuildResponse("200", "OK")); uplink->C.Send(uplink->H.BuildResponse("200", "OK"));
uplink->H.Clean(); uplink->H.Clean();
//Log("UPLK", "Sending server data to uplink."); //Connector::Log("UPLK", "Sending server data to uplink.");
}else{ }else{
Log("UPLK", "Could not connect to uplink."); Connector::Log("UPLK", "Could not connect to uplink.");
} }
} }
*/
Incoming = API_Socket.accept(); Incoming = API_Socket.accept();
if (Incoming.connected()){users.push_back(Incoming);} if (Incoming.connected()){users.push_back(Incoming);}
Incoming = Stats_Socket.accept(); Incoming = Stats_Socket.accept();
@ -416,17 +400,19 @@ int main(int argc, char ** argv){
if (it->spool()){ if (it->spool()){
size_t newlines = it->Received().find("\n\n"); size_t newlines = it->Received().find("\n\n");
while (newlines != std::string::npos){ while (newlines != std::string::npos){
Request = it->Received().substr(0, newlines); Request = JSON::fromString(it->Received().substr(0, newlines));
if (Request.isMember("totals") && Request["totals"].isMember("buffer")){ if (Request.isMember("totals") && Request["totals"].isMember("buffer")){
std::string thisbuffer = Request["totals"]["buffer"]; std::string thisbuffer = Request["totals"]["buffer"];
lastBuffer[thisbuffer] = time(0); Connector::lastBuffer[thisbuffer] = time(0);
Storage["statistics"][thisbuffer]["curr"] = Request["curr"]; Connector::Storage["statistics"][thisbuffer]["curr"] = Request["curr"];
std::stringstream st; std::stringstream st;
st << (long long int)Request["totals"]["now"]; st << (long long int)Request["totals"]["now"];
std::string nowstr = st.str(); std::string nowstr = st.str();
Storage["statistics"][thisbuffer]["totals"][nowstr] = Request["totals"]; Connector::Storage["statistics"][thisbuffer]["totals"][nowstr] = Request["totals"];
Connector::Storage["statistics"][thisbuffer]["totals"].shrink(600);//limit to 10 minutes of data
for (JSON::ObjIter jit = Request["log"].ObjBegin(); jit != Request["log"].ObjEnd(); jit++){ for (JSON::ObjIter jit = Request["log"].ObjBegin(); jit != Request["log"].ObjEnd(); jit++){
Storage["statistics"][thisbuffer]["log"].append(jit->second); Connector::Storage["statistics"][thisbuffer]["log"].append(jit->second);
Connector::Storage["statistics"][thisbuffer]["log"].shrink(1000);//limit to 1000 users per buffer
} }
} }
it->Received().erase(0, newlines+2); it->Received().erase(0, newlines+2);
@ -436,7 +422,7 @@ int main(int argc, char ** argv){
} }
} }
if (users.size() > 0){ if (users.size() > 0){
for( std::vector< ConnectedUser >::iterator it = users.begin(); it != users.end(); it++) { for( std::vector< Connector::ConnectedUser >::iterator it = users.begin(); it != users.end(); it++) {
if (!it->C.connected() || it->logins > 3){ if (!it->C.connected() || it->logins > 3){
it->C.close(); it->C.close();
users.erase(it); users.erase(it);
@ -454,57 +440,58 @@ int main(int argc, char ** argv){
if (Request["authorize"].isMember("challenge")){ if (Request["authorize"].isMember("challenge")){
it->logins++; it->logins++;
if (it->logins > 2){ if (it->logins > 2){
Log("UPLK", "Max login attempts passed - dropping connection to uplink."); Connector::Log("UPLK", "Max login attempts passed - dropping connection to uplink.");
it->C.close(); it->C.close();
}else{ }else{
Response["config"] = Storage["config"]; Response["config"] = Connector::Storage["config"];
Response["streams"] = Storage["streams"]; Response["streams"] = Connector::Storage["streams"];
Response["log"] = Storage["log"]; Response["log"] = Connector::Storage["log"];
Response["statistics"] = Storage["statistics"]; Response["statistics"] = Connector::Storage["statistics"];
Response["authorize"]["username"] = COMPILED_USERNAME; Response["authorize"]["username"] = COMPILED_USERNAME;
Log("UPLK", "Responding to login challenge: " + (std::string)Request["authorize"]["challenge"]); Connector::Log("UPLK", "Responding to login challenge: " + (std::string)Request["authorize"]["challenge"]);
Response["authorize"]["password"] = md5(COMPILED_PASSWORD + (std::string)Request["authorize"]["challenge"]); Response["authorize"]["password"] = Connector::md5(COMPILED_PASSWORD + (std::string)Request["authorize"]["challenge"]);
it->H.Clean(); it->H.Clean();
it->H.SetBody("command="+HTTP::Parser::urlencode(Response.toString())); it->H.SetBody("command="+HTTP::Parser::urlencode(Response.toString()));
it->H.BuildRequest(); it->H.BuildRequest();
it->C.Send(it->H.BuildResponse("200", "OK")); it->C.Send(it->H.BuildResponse("200", "OK"));
it->H.Clean(); it->H.Clean();
Log("UPLK", "Attempting login to uplink."); Connector::Log("UPLK", "Attempting login to uplink.");
} }
} }
}else{ }else{
if (Request.isMember("config")){CheckConfig(Request["config"], Storage["config"]);} if (Request.isMember("config")){Connector::CheckConfig(Request["config"], Connector::Storage["config"]);}
if (Request.isMember("streams")){CheckStreams(Request["streams"], Storage["streams"]);} if (Request.isMember("streams")){Connector::CheckStreams(Request["streams"], Connector::Storage["streams"]);}
if (Request.isMember("clearstatlogs")){ if (Request.isMember("clearstatlogs")){
Storage["log"].null(); Connector::Storage["log"].null();
Storage["statistics"].null(); Connector::Storage["statistics"].null();
} }
} }
}else{ }else{
Request = JSON::fromString(it->H.GetVar("command")); Request = JSON::fromString(it->H.GetVar("command"));
std::cout << "Request: " << Request.toString() << std::endl;
Authorize(Request, Response, (*it)); Authorize(Request, Response, (*it));
if (it->Authorized){ if (it->Authorized){
//Parse config and streams from the request. //Parse config and streams from the request.
if (Request.isMember("config")){CheckConfig(Request["config"], Storage["config"]);} if (Request.isMember("config")){Connector::CheckConfig(Request["config"], Connector::Storage["config"]);}
if (Request.isMember("streams")){CheckStreams(Request["streams"], Storage["streams"]);} if (Request.isMember("streams")){Connector::CheckStreams(Request["streams"], Connector::Storage["streams"]);}
if (Request.isMember("save")){ if (Request.isMember("save")){
WriteFile("config.json", Storage.toString()); Connector::WriteFile("config.json", Connector::Storage.toString());
Log("CONF", "Config written to file on request through API"); Connector::Log("CONF", "Config written to file on request through API");
} }
//sent current configuration, no matter if it was changed or not //sent current configuration, no matter if it was changed or not
//Response["streams"] = Storage["streams"]; //Response["streams"] = Storage["streams"];
Response["config"] = Storage["config"]; Response["config"] = Connector::Storage["config"];
Response["streams"] = Storage["streams"]; Response["streams"] = Connector::Storage["streams"];
//add required data to the current unix time to the config, for syncing reasons //add required data to the current unix time to the config, for syncing reasons
Response["config"]["time"] = (long long int)time(0); Response["config"]["time"] = (long long int)time(0);
if (!Response["config"].isMember("serverid")){Response["config"]["serverid"] = "";} if (!Response["config"].isMember("serverid")){Response["config"]["serverid"] = "";}
//sent any available logs and statistics //sent any available logs and statistics
Response["log"] = Storage["log"]; Response["log"] = Connector::Storage["log"];
Response["statistics"] = Storage["statistics"]; Response["statistics"] = Connector::Storage["statistics"];
//clear log and statistics to prevent useless data transfer //clear log and statistics if requested
Storage["log"].null(); if (Request.isMember("clearstatlogs")){
Storage["statistics"].null(); Connector::Storage["log"].null();
Connector::Storage["statistics"].null();
}
} }
jsonp = ""; jsonp = "";
if (it->H.GetVar("callback") != ""){jsonp = it->H.GetVar("callback");} if (it->H.GetVar("callback") != ""){jsonp = it->H.GetVar("callback");}
@ -526,9 +513,9 @@ int main(int argc, char ** argv){
} }
} }
API_Socket.close(); API_Socket.close();
Log("CONF", "Controller shutting down"); Connector::Log("CONF", "Controller shutting down");
Util::Procs::StopAll(); Util::Procs::StopAll();
WriteFile("config.json", Storage.toString()); Connector::WriteFile("config.json", Connector::Storage.toString());
std::cout << "Killed all processes, wrote config to disk. Exiting." << std::endl; std::cout << "Killed all processes, wrote config to disk. Exiting." << std::endl;
return 0; return 0;
} }