From 6cf88f4cee7f7f87a0b88ee56a63c68589ff02e2 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 18 Oct 2011 09:10:19 +0200 Subject: [PATCH] Fixes --- DDV_Controller/main.cpp | 153 ++++++++++++++++++++++++++++++++++++---- util/http_parser.cpp | 5 +- util/socket.cpp | 31 +++++--- util/util.cpp | 9 +++ 4 files changed, 172 insertions(+), 26 deletions(-) diff --git a/DDV_Controller/main.cpp b/DDV_Controller/main.cpp index b71de94b..30157894 100644 --- a/DDV_Controller/main.cpp +++ b/DDV_Controller/main.cpp @@ -18,7 +18,7 @@ #include #include #include -#include +#include #include "../util/socket.h" #include "../util/http_parser.h" #include "../util/md5.h" @@ -33,6 +33,35 @@ #define STRINGIFY(x) #x #define TOSTRING(x) STRINGIFY(x) +Socket::Server API_Socket; ///< Main connection socket. + +/// Basic signal handler. Disconnects the server_socket if it receives +/// a SIGINT, SIGHUP or SIGTERM signal, but does nothing for SIGPIPE. +/// Disconnecting the server_socket will terminate the main listening loop +/// and cleanly shut down the process. +void signal_handler (int signum){ + switch (signum){ + case SIGINT: + #if DEBUG >= 1 + fprintf(stderr, "Received SIGINT - closing server socket.\n"); + #endif + break; + case SIGHUP: + #if DEBUG >= 1 + fprintf(stderr, "Received SIGHUP - closing server socket.\n"); + #endif + break; + case SIGTERM: + #if DEBUG >= 1 + fprintf(stderr, "Received SIGTERM - closing server socket.\n"); + #endif + break; + default: return; break; + } + API_Socket.close(); +}//signal_handler + + /// Needed for base64_encode function static const std::string base64_chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; @@ -137,7 +166,7 @@ void RSA_Load(){ /// Returns true if the data could be verified, false otherwise. bool RSA_check(std::string & data, std::string basesign){ std::string sign = base64_decode(basesign); - return (RSA_verify(NID_md5, (const unsigned char*)data.c_str(), data.size(), (const unsigned char*)sign.c_str(), sign.size(), pubkey) == 1); + return (RSA_verify(NID_md5, (unsigned char*)data.c_str(), data.size(), (unsigned char*)sign.c_str(), sign.size(), pubkey) == 1); } Json::Value Storage = Json::Value(Json::objectValue); ///< Global storage of data. @@ -223,11 +252,43 @@ void Authorize( Json::Value & Request, Json::Value & Response, ConnectedUser & c return; } +void CheckProtocols(Json::Value & p){ + static std::map connports; + bool seenHTTP = false; + bool seenRTMP = false; + std::string tmp; + Util::Procs::Stop("RTMP"); + for (Json::ValueIterator jit = p.begin(); jit != p.end(); jit++){ + if (jit.memberName() == std::string("HTTP")){ + tmp = p[jit.memberName()]["port"].asString(); + seenHTTP = true; + if (connports["HTTP"] != tmp){Util::Procs::Stop("HTTP");} + connports["HTTP"] = tmp; + if (!Util::Procs::isActive("HTTP")){ + Util::Procs::Start("HTTP", std::string("DDV_Conn_HTTP -p ")+tmp); + } + } + if (jit.memberName() == std::string("RTMP")){ + tmp = p[jit.memberName()]["port"].asString(); + seenRTMP = true; + if (connports["RTMP"] != tmp){Util::Procs::Stop("RTMP");} + connports["RTMP"] = tmp; + if (!Util::Procs::isActive("RTMP")){ + Util::Procs::Start("RTMP", std::string("DDV_Conn_RTMP -p ")+tmp); + } + } + } + if (!seenHTTP){Util::Procs::Stop("HTTP");} + if (!seenRTMP){Util::Procs::Stop("RTMP");} +} + void CheckConfig(Json::Value & in, Json::Value & out){ if (in.isObject() && (in.size() > 0)){ for (Json::ValueIterator jit = in.begin(); jit != in.end(); jit++){ if (out.isObject() && out.isMember(jit.memberName())){ - Log("CONF", std::string("Updated configuration value ")+jit.memberName()); + if (in[jit.memberName()] != out[jit.memberName()]){ + Log("CONF", std::string("Updated configuration value ")+jit.memberName()); + } }else{ Log("CONF", std::string("New configuration value ")+jit.memberName()); } @@ -244,20 +305,50 @@ void CheckConfig(Json::Value & in, Json::Value & out){ out["version"] = TOSTRING(VERSION); } +void startStream(std::string name, Json::Value & data){ + Log("BUFF", "(re)starting stream buffer "+name); + std::string URL = data["channel"]["URL"].asString(); + std::string preset = data["preset"]["cmd"].asString(); + std::string cmd1, cmd2; + if (URL.substr(0, 4) == "push"){ + std::string pusher = URL.substr(7); + cmd2 = "DDV_Buffer 500 "+name+" "+pusher; + Util::Procs::Start(name, cmd2); + }else{ + cmd1 = "ffmpeg -re -async 2 -i "+URL+" "+preset+" -f flv -"; + cmd2 = "DDV_Buffer 500 "+name; + Util::Procs::Start(name, cmd1, cmd2); + } +} + +void CheckAllStreams(Json::Value & data){ + for (Json::ValueIterator jit = data.begin(); jit != data.end(); jit++){ + if (!Util::Procs::isActive(jit.memberName())){ + startStream(jit.memberName(), data[jit.memberName()]); + } + } +} + void CheckStreams(Json::Value & in, Json::Value & out){ if (in.isObject() && (in.size() > 0)){ for (Json::ValueIterator jit = in.begin(); jit != in.end(); jit++){ if (out.isObject() && out.isMember(jit.memberName())){ - Log("STRM", std::string("Updated stream ")+jit.memberName()); + if (in[jit.memberName()] != out[jit.memberName()]){ + Log("STRM", std::string("Updated stream ")+jit.memberName()); + Util::Procs::Stop(jit.memberName()); + startStream(jit.memberName(), in[jit.memberName()]); + } }else{ Log("STRM", std::string("New stream ")+jit.memberName()); + startStream(jit.memberName(), in[jit.memberName()]); } } - if (out.isObject() && (out.size() > 0)){ - for (Json::ValueIterator jit = out.begin(); jit != out.end(); jit++){ - if (!in.isMember(jit.memberName())){ - Log("STRM", std::string("Deleted stream ")+jit.memberName()); - } + } + if (out.isObject() && (out.size() > 0)){ + for (Json::ValueIterator jit = out.begin(); jit != out.end(); jit++){ + if (!in.isMember(jit.memberName())){ + Log("STRM", std::string("Deleted stream ")+jit.memberName()); + Util::Procs::Stop(jit.memberName()); } } } @@ -265,13 +356,24 @@ void CheckStreams(Json::Value & in, Json::Value & out){ } int main(int argc, char ** argv){ + //setup signal handler + struct sigaction new_action; + new_action.sa_handler = signal_handler; + sigemptyset (&new_action.sa_mask); + new_action.sa_flags = 0; + sigaction(SIGINT, &new_action, NULL); + sigaction(SIGHUP, &new_action, NULL); + sigaction(SIGTERM, &new_action, NULL); + sigaction(SIGPIPE, &new_action, NULL); + RSA_Load(); // Load GearBox public key Util::Config C; C.confsection = "API"; C.parseArgs(argc, argv); C.parseFile(); time_t lastuplink = 0; - Socket::Server API_Socket = Socket::Server(C.listen_port, C.interface, true); + time_t processchecker = 0; + API_Socket = Socket::Server(C.listen_port, C.interface, true); Socket::Server Stats_Socket = Socket::Server("/tmp/ddv_statistics", true); Util::setUser(C.username); if (C.daemon_mode){ @@ -279,6 +381,7 @@ int main(int argc, char ** argv){ } Socket::Connection Incoming; std::vector< ConnectedUser > users; + std::vector buffers; Json::Value Request = Json::Value(Json::objectValue); Json::Value Response = Json::Value(Json::objectValue); Json::Reader JsonParse; @@ -291,6 +394,12 @@ int main(int argc, char ** argv){ while (API_Socket.connected()){ usleep(100000); //sleep for 100 ms - prevents 100% CPU time + if (time(0) - processchecker > 10){ + processchecker = time(0); + CheckProtocols(Storage["config"]["protocols"]); + CheckAllStreams(Storage["streams"]); + } + if (time(0) - lastuplink > UPLINK_INTERVAL){ lastuplink = time(0); bool gotUplink = false; @@ -324,7 +433,7 @@ int main(int argc, char ** argv){ uplink->H.BuildRequest(); uplink->writebuffer += uplink->H.BuildResponse("200", "OK"); uplink->H.Clean(); - Log("UPLK", "Sending server data to uplink."); + //Log("UPLK", "Sending server data to uplink."); }else{ Log("UPLK", "Could not connect to uplink."); } @@ -332,6 +441,18 @@ int main(int argc, char ** argv){ Incoming = API_Socket.accept(); if (Incoming.connected()){users.push_back(Incoming);} + Incoming = Stats_Socket.accept(); + if (Incoming.connected()){buffers.push_back(Incoming);} + if (buffers.size() > 0){ + for( std::vector< Socket::Connection >::iterator it = buffers.end() - 1; it >= buffers.begin(); it--) { + if (!it->connected()){ + it->close(); + buffers.erase(it); + break; + } + + } + } if (users.size() > 0){ for( std::vector< ConnectedUser >::iterator it = users.end() - 1; it >= users.begin(); it--) { if (!it->C.connected() || it->logins > 3){ @@ -349,7 +470,7 @@ int main(int argc, char ** argv){ // They are assumed to be authorized, but authorization to gearbox is still done. // This authorization uses the compiled-in username and password (account). if (!JsonParse.parse(it->H.body, Request, false)){ - Log("HTTP", "Failed to parse JSON: "+it->H.body); + Log("HTTP", "Failed to parse body JSON: "+it->H.body); Response["authorize"]["status"] = "INVALID"; }else{ if (Request["authorize"]["status"] != "OK"){ @@ -380,13 +501,13 @@ int main(int argc, char ** argv){ Storage["log"].clear(); Storage["statistics"].clear(); } - Log("UPLK", "Received data from uplink."); - WriteFile("config.json", Storage.toStyledString()); + //Log("UPLK", "Received data from uplink."); + //WriteFile("config.json", Storage.toStyledString()); } } }else{ if (!JsonParse.parse(it->H.GetVar("command"), Request, false)){ - Log("HTTP", "Failed to parse JSON: "+it->H.GetVar("command")); + Log("HTTP", "Failed to parse command JSON: "+it->H.GetVar("command")); Response["authorize"]["status"] = "INVALID"; }else{ std::cout << "Request: " << Request.toStyledString() << std::endl; @@ -428,6 +549,8 @@ int main(int argc, char ** argv){ } } } + Util::Procs::StopAll(); WriteFile("config.json", Storage.toStyledString()); + std::cout << "Killed all processes, wrote config to disk. Exiting." << std::endl; return 0; } diff --git a/util/http_parser.cpp b/util/http_parser.cpp index f46e5a2b..06bc713c 100644 --- a/util/http_parser.cpp +++ b/util/http_parser.cpp @@ -178,9 +178,12 @@ bool HTTP::Parser::parse(){ if (seenHeaders){ if (length > 0){ if (HTTPbuffer.length() >= length){ + if ((method != "HTTP/1.0") && (method != "HTTP/1.1")){ + body = HTTPbuffer.substr(0, length); + parseVars(body); //parse POST variables + } body = HTTPbuffer.substr(0, length); HTTPbuffer.erase(0, length); - parseVars(body); //parse POST variables return true; }else{ return false; diff --git a/util/socket.cpp b/util/socket.cpp index 5463e8de..522db714 100644 --- a/util/socket.cpp +++ b/util/socket.cpp @@ -29,7 +29,7 @@ Socket::Connection::Connection(){ /// Close connection. The internal socket is closed and then set to -1. void Socket::Connection::close(){ - #if DEBUG >= 4 + #if DEBUG >= 6 fprintf(stderr, "Socket closed.\n"); #endif shutdown(sock, SHUT_RDWR); @@ -81,14 +81,24 @@ Socket::Connection::Connection(std::string address, bool nonblock){ /// \param port String containing the port to connect to. /// \param nonblock Whether the socket should be nonblocking. Socket::Connection::Connection(std::string host, int port, bool nonblock){ - struct addrinfo *result, *rp; + struct addrinfo *result, *rp, hints; Error = false; Blocking = false; std::stringstream ss; ss << port; - if (getaddrinfo(host.c_str(), ss.str().c_str(), 0, &result) != 0){ + + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_ADDRCONFIG; + hints.ai_protocol = 0; + hints.ai_canonname = NULL; + hints.ai_addr = NULL; + hints.ai_next = NULL; + int s = getaddrinfo(host.c_str(), ss.str().c_str(), &hints, &result); + if (s != 0){ #if DEBUG >= 1 - fprintf(stderr, "Could not connect to %s:%i! Error: %s\n", host.c_str(), port, strerror(errno)); + fprintf(stderr, "Could not connect to %s:%i! Error: %s\n", host.c_str(), port, gai_strerror(s)); #endif close(); return; @@ -96,10 +106,11 @@ Socket::Connection::Connection(std::string host, int port, bool nonblock){ for (rp = result; rp != NULL; rp = rp->ai_next) { sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); - if (sock == -1){continue;} - if (connect(sock, rp->ai_addr, rp->ai_addrlen) != -1){break;} + if (sock < 0){continue;} + if (connect(sock, rp->ai_addr, rp->ai_addrlen) == 0){break;} ::close(sock); } + freeaddrinfo(result); if (rp == 0){ #if DEBUG >= 1 @@ -534,18 +545,18 @@ Socket::Connection Socket::Server::accept(bool nonblock){ }else{ if (addrinfo.sin6_family == AF_INET6){ tmp.remotehost = inet_ntop(AF_INET6, &(addrinfo.sin6_addr), addrconv, INET6_ADDRSTRLEN); - #if DEBUG >= 4 + #if DEBUG >= 6 fprintf(stderr,"IPv6 addr: %s\n", tmp.remotehost.c_str()); #endif } if (addrinfo.sin6_family == AF_INET){ tmp.remotehost = inet_ntop(AF_INET, &(((sockaddr_in*)&addrinfo)->sin_addr), addrconv, INET6_ADDRSTRLEN); - #if DEBUG >= 4 + #if DEBUG >= 6 fprintf(stderr,"IPv4 addr: %s\n", tmp.remotehost.c_str()); #endif } if (addrinfo.sin6_family == AF_UNIX){ - #if DEBUG >= 4 + #if DEBUG >= 6 tmp.remotehost = ((sockaddr_un*)&addrinfo)->sun_path; fprintf(stderr,"Unix socket, no address\n"); #endif @@ -557,7 +568,7 @@ Socket::Connection Socket::Server::accept(bool nonblock){ /// Close connection. The internal socket is closed and then set to -1. void Socket::Server::close(){ - #if DEBUG >= 4 + #if DEBUG >= 6 fprintf(stderr, "ServerSocket closed.\n"); #endif shutdown(sock, SHUT_RDWR); diff --git a/util/util.cpp b/util/util.cpp index 687dba58..06a1ace1 100644 --- a/util/util.cpp +++ b/util/util.cpp @@ -5,7 +5,12 @@ #include #include #include + +#ifdef __FreeBSD__ +#include +#else #include +#endif #include #include #include @@ -52,6 +57,7 @@ void Util::Procs::childsig_handler(int signum){ #if DEBUG >= 1 if (isActive(pname)){ std::cerr << "Process " << pname << " half-terminated." << std::endl; + Stop(pname); }else{ std::cerr << "Process " << pname << " fully terminated." << std::endl; } @@ -190,8 +196,11 @@ pid_t Util::Procs::Start(std::string name, std::string cmd, std::string cmd2){ /// Stops the named process, if running. /// \arg name (Internal) name of process to stop void Util::Procs::Stop(std::string name){ + int max = 5; while (isActive(name)){ Stop(getPid(name)); + max--; + if (max <= 0){return;} } }