From 221cd15b07d4cc66d499d6f17b39838d53839b9c Mon Sep 17 00:00:00 2001 From: Thulinma Date: Mon, 12 Aug 2013 13:16:29 +0200 Subject: [PATCH] Some minor optimalizations and fixes to controller. --- src/connectors/conn_http.cpp | 6 +- src/connectors/conn_http_live.cpp | 4 +- src/connectors/conn_http_progressive.cpp | 4 +- src/connectors/conn_rtmp.cpp | 4 +- src/connectors/conn_ts.cpp | 16 +- src/controller/controller.cpp | 6 +- src/controller/controller_connectors.cpp | 197 +++++++++-------------- src/controller/controller_connectors.h | 2 +- 8 files changed, 101 insertions(+), 138 deletions(-) diff --git a/src/connectors/conn_http.cpp b/src/connectors/conn_http.cpp index d8ef2986..a90b1c97 100644 --- a/src/connectors/conn_http.cpp +++ b/src/connectors/conn_http.cpp @@ -230,11 +230,11 @@ namespace Connector_HTTP { //find out which connectors are enabled std::set conns; for (JSON::ArrIter it = ServConf["config"]["protocols"].ArrBegin(); it != ServConf["config"]["protocols"].ArrEnd(); it++){ - conns.insert(( *it)["connector"].asString()); + conns.insert(( *it)["connector"].asStringRef()); } //first, see if we have RTMP working and output all the RTMP. for (JSON::ArrIter it = ServConf["config"]["protocols"].ArrBegin(); it != ServConf["config"]["protocols"].ArrEnd(); it++){ - if (( *it)["connector"].asString() == "RTMP"){ + if (( *it)["connector"].asStringRef() == "RTMP"){ if (( *it)["port"].asInt() == 0){ ( *it)["port"] = 1935ll; } @@ -247,7 +247,7 @@ namespace Connector_HTTP { /// \todo Add raw MPEG2 TS support here? //then, see if we have HTTP working and output all the HTTP. for (JSON::ArrIter it = ServConf["config"]["protocols"].ArrBegin(); it != ServConf["config"]["protocols"].ArrEnd(); it++){ - if (( *it)["connector"].asString() == "HTTP"){ + if (( *it)["connector"].asStringRef() == "HTTP"){ if (( *it)["port"].asInt() == 0){ ( *it)["port"] = 8080ll; } diff --git a/src/connectors/conn_http_live.cpp b/src/connectors/conn_http_live.cpp index b4ece795..722da958 100644 --- a/src/connectors/conn_http_live.cpp +++ b/src/connectors/conn_http_live.cpp @@ -36,14 +36,14 @@ namespace Connector_HTTP { std::string audioName; bool defAudio = false;//set default audio track; for (JSON::ObjIter trackIt = metadata["tracks"].ObjBegin(); trackIt != metadata["tracks"].ObjEnd(); trackIt++){ - if (trackIt->second["type"].asString() == "audio"){ + if (trackIt->second["type"].asStringRef() == "audio"){ audioId = trackIt->second["trackid"].asInt(); audioName = trackIt->first; break; } } for (JSON::ObjIter trackIt = metadata["tracks"].ObjBegin(); trackIt != metadata["tracks"].ObjEnd(); trackIt++){ - if (trackIt->second["type"].asString() == "video"){ + if (trackIt->second["type"].asStringRef() == "video"){ int bWidth = trackIt->second["maxbps"].asInt(); if (audioId != -1){ bWidth += (metadata["tracks"][audioName]["maxbps"].asInt() * 2); diff --git a/src/connectors/conn_http_progressive.cpp b/src/connectors/conn_http_progressive.cpp index 85dd8e7a..288c8803 100644 --- a/src/connectors/conn_http_progressive.cpp +++ b/src/connectors/conn_http_progressive.cpp @@ -126,10 +126,10 @@ namespace Connector_HTTP { } int byterate = 0; for (JSON::ObjIter objIt = Strm.metadata["tracks"].ObjBegin(); objIt != Strm.metadata["tracks"].ObjEnd(); objIt++){ - if (videoID == -1 && objIt->second["type"].asString() == "video"){ + if (videoID == -1 && objIt->second["type"].asStringRef() == "video"){ videoID = objIt->second["trackid"].asInt(); } - if (audioID == -1 && objIt->second["type"].asString() == "audio"){ + if (audioID == -1 && objIt->second["type"].asStringRef() == "audio"){ audioID = objIt->second["trackid"].asInt(); } } diff --git a/src/connectors/conn_rtmp.cpp b/src/connectors/conn_rtmp.cpp index a7ae12a2..9c734d7d 100644 --- a/src/connectors/conn_rtmp.cpp +++ b/src/connectors/conn_rtmp.cpp @@ -560,10 +560,10 @@ namespace Connector_RTMP { } //find first audio and video tracks for (JSON::ObjIter objIt = Strm.metadata["tracks"].ObjBegin(); objIt != Strm.metadata["tracks"].ObjEnd(); objIt++){ - if (videoID == -1 && objIt->second["type"].asString() == "video"){ + if (videoID == -1 && objIt->second["type"].asStringRef() == "video"){ videoID = objIt->second["trackid"].asInt(); } - if (audioID == -1 && objIt->second["type"].asString() == "audio"){ + if (audioID == -1 && objIt->second["type"].asStringRef() == "audio"){ audioID = objIt->second["trackid"].asInt(); } } diff --git a/src/connectors/conn_ts.cpp b/src/connectors/conn_ts.cpp index 9b429ce3..0eb5c4c5 100644 --- a/src/connectors/conn_ts.cpp +++ b/src/connectors/conn_ts.cpp @@ -203,14 +203,16 @@ int main(int argc, char ** argv){ JSON::Value capa; capa["desc"] = "Enables the raw MPEG Transport Stream protocol over TCP."; capa["deps"] = ""; - capa["required"]["args"]["name"] = "Stream"; - capa["required"]["args"]["help"] = "What streamname to serve. For multiple streams, add this protocol multiple times using different ports."; - capa["required"]["args"]["type"] = "str"; - capa["optional"]["tracks"]["name"] = "Tracks"; - capa["optional"]["tracks"]["help"] = "The track IDs of the stream that this connector will transmit separated by spaces"; - capa["optional"]["tracks"]["type"] = "str"; + capa["required"]["streamname"]["name"] = "Stream"; + capa["required"]["streamname"]["help"] = "What streamname to serve. For multiple streams, add this protocol multiple times using different ports."; + capa["required"]["streamname"]["type"] = "str"; + capa["required"]["streamname"]["option"] = "--stream"; + capa["required"]["tracks"]["name"] = "Tracks"; + capa["required"]["tracks"]["help"] = "The track IDs of the stream that this connector will transmit separated by spaces"; + capa["required"]["tracks"]["type"] = "str"; + capa["required"]["tracks"]["option"] = "--tracks"; conf.addOption("streamname", - JSON::fromString("{\"arg\":\"string\",\"arg_num\":1,\"help\":\"The name of the stream that this connector will transmit.\"}")); + JSON::fromString("{\"arg\":\"string\",\"arg_num\":1,\"short\":\"s\",\"long\":\"stream\",\"help\":\"The name of the stream that this connector will transmit.\"}")); conf.addOption("tracks", JSON::fromString("{\"arg\":\"string\",\"default\":\"\",\"short\": \"t\",\"long\":\"tracks\",\"help\":\"The track IDs of the stream that this connector will transmit separated by spaces.\"}")); conf.addConnectorOptions(8888, capa); diff --git a/src/controller/controller.cpp b/src/controller/controller.cpp index 7417629d..68576417 100644 --- a/src/controller/controller.cpp +++ b/src/controller/controller.cpp @@ -350,7 +350,7 @@ int main(int argc, char ** argv){ if (Util::epoch() - processchecker > 10){ processchecker = Util::epoch(); - Controller::CheckProtocols(Controller::Storage["config"]["protocols"]); + Controller::CheckProtocols(Controller::Storage["config"]["protocols"], capabilities); Controller::CheckAllStreams(Controller::Storage["streams"]); Controller::CheckStats(Controller::Storage["statistics"]); myConverter.updateStatus(); @@ -518,7 +518,7 @@ int main(int argc, char ** argv){ }else{ if (Request.isMember("config")){ Controller::CheckConfig(Request["config"], Controller::Storage["config"]); - Controller::CheckProtocols(Controller::Storage["config"]["protocols"]); + Controller::CheckProtocols(Controller::Storage["config"]["protocols"], capabilities); } if (Request.isMember("streams")){ Controller::CheckStreams(Request["streams"], Controller::Storage["streams"]); @@ -545,7 +545,7 @@ int main(int argc, char ** argv){ //Parse config and streams from the request. if (Request.isMember("config")){ Controller::CheckConfig(Request["config"], Controller::Storage["config"]); - Controller::CheckProtocols(Controller::Storage["config"]["protocols"]); + Controller::CheckProtocols(Controller::Storage["config"]["protocols"], capabilities); } if (Request.isMember("streams")){ Controller::CheckStreams(Request["streams"], Controller::Storage["streams"]); diff --git a/src/controller/controller_connectors.cpp b/src/controller/controller_connectors.cpp index 498a78b7..be22221a 100644 --- a/src/controller/controller_connectors.cpp +++ b/src/controller/controller_connectors.cpp @@ -8,99 +8,83 @@ #include "controller_storage.h" #include "controller_connectors.h" +#include + + ///\brief Holds everything unique to the controller. namespace Controller { - static std::map currentConnectors; /// currentConnectors; ///::iterator iter; + std::map::iterator iter; for (iter = currentConnectors.begin(); iter != currentConnectors.end(); iter++){ if (iter->second.substr(0, protocol.size()) == protocol){ Log("CONF", "Restarting connector for update: " + iter->second); - Util::Procs::Stop(iter->first); + Util::Procs::Stop(toConn(iter->first)); int i = 0; - while (Util::Procs::isActive(iter->first) && i < 30){ + while (Util::Procs::isActive(toConn(iter->first)) && i < 30){ Util::sleep(100); } if (i >= 30){ Log("WARN", "Connector still active 3 seconds after shutdown - delaying restart."); }else{ - Util::Procs::Start(iter->first, Util::getMyPath() + iter->second); + Util::Procs::Start(toConn(iter->first), Util::getMyPath() + iter->second); } return; } } } - - - - void buildPipedArguments(JSON::Value & p, std::string conn, char * argarr[]){ - int argnum = 2; //first two are progname and -n - std::string arg; - - std::string conname; - - for (JSON::ArrIter ait = p.ArrBegin(); ait != p.ArrEnd(); ait++){ - - conname = (std::string("MistConn") + ( *ait)["connector"].asString()); - conn = conn.substr(0, conn.find(" ") ); - - if ( !( *ait).isMember("connector") || ( *ait)["connector"].asString() == "" || conn != conname){ - continue; - } - - std::string tmppath = Util::getMyPath() + std::string("MistConn") + ( *ait)["connector"].asString(); - argarr[0] = new char[ tmppath.size() + 1]; std::strncpy(argarr[0], tmppath.c_str(), tmppath.size() + 1); - - argarr[1] = new char[3]; std::strncpy(argarr[1], "-n\0", 3); - - if (( *ait).isMember("port") && ( *ait)["port"].asInt() != 0){ - arg = ( *ait)["port"].asString(); - argarr[argnum] = new char[3]; std::strncpy(argarr[argnum], "-p\0", 3); argnum++; - argarr[argnum] = new char[arg.size() + 1]; std::strncpy(argarr[argnum], arg.c_str(), arg.size() + 1); argnum++; - } - - if (( *ait).isMember("interface") && ( *ait)["interface"].asString() != "" && ( *ait)["interface"].asString() != "0.0.0.0"){ - arg = ( *ait)["interface"].asString(); - argarr[argnum] = new char[3]; std::strncpy(argarr[argnum], "-i\0", 3); argnum++; - argarr[argnum] = new char[arg.size() + 1]; std::strncpy(argarr[argnum], arg.c_str(), arg.size() + 1 ); argnum++; - } - - if (( *ait).isMember("username") && ( *ait)["username"].asString() != "" && ( *ait)["username"].asString() != "root"){ - arg = ( *ait)["username"].asString(); - argarr[argnum] = new char[3]; std::strncpy(argarr[argnum], "-u\0", 3); argnum++; - argarr[argnum] = new char[arg.size() + 1]; std::strncpy(argarr[argnum], arg.c_str(), arg.size() + 1); argnum++; - } - - if (( *ait).isMember("tracks") && ( *ait)["tracks"].asString() != ""){ - arg = ( *ait)["tracks"].asString(); - argarr[argnum] = new char[3]; std::strncpy(argarr[argnum], "-t\0", 3); argnum++; - argarr[argnum] = new char[arg.size() + 1]; std::strncpy(argarr[argnum], arg.c_str(), arg.size() + 1); argnum++; - } - - if (( *ait).isMember("args") && ( *ait)["args"].asString() != ""){ - arg = ( *ait)["args"].asString(); - argarr[argnum] = new char[arg.size() + 1]; std::strncpy(argarr[argnum], arg.c_str(), arg.size() + 1); argnum++; + + static inline void builPipedPart(JSON::Value & p, char * argarr[], int & argnum, JSON::Value & argset){ + for (JSON::ObjIter it = argset.ObjBegin(); it != argset.ObjEnd(); ++it){ + if (it->second.isMember("option") && p.isMember(it->first)){ + if (it->second.isMember("type")){ + if (it->second["type"].asStringRef() == "str" && !p[it->first].isString()){ + p[it->first] = p[it->first].asString(); + } + if ((it->second["type"].asStringRef() == "uint" || it->second["type"].asStringRef() == "int") && !p[it->first].isInt()){ + p[it->first] = JSON::Value(p[it->first].asInt()).asString(); + } + } + if (p[it->first].asStringRef().size() > 0){ + argarr[argnum++] = (char*)(it->second["option"].c_str()); + argarr[argnum++] = (char*)(p[it->first].c_str()); + } } } - - argarr[argnum] = NULL; } - - + + static inline void buildPipedArguments(JSON::Value & p, char * argarr[], JSON::Value & capabilities){ + int argnum = 0; + static std::string tmparg; + tmparg = Util::getMyPath() + std::string("MistConn") + p["connector"].asStringRef(); + argarr[argnum++] = (char*)tmparg.c_str(); + argarr[argnum++] = (char*)"-n"; + JSON::Value & pipedCapa = capabilities["connectors"][p["connector"].asStringRef()]; + if (pipedCapa.isMember("required")){builPipedPart(p, argarr, argnum, pipedCapa["required"]);} + if (pipedCapa.isMember("optional")){builPipedPart(p, argarr, argnum, pipedCapa["optional"]);} + for (int i = 0; i < argnum; ++i){ + if (argarr[i] > 0){ + std::cerr << argarr[i] << " "; + } + } + std::cerr << std::endl; + } ///\brief Checks current protocol coguration, updates state of enabled connectors if neccesary. ///\param p An object containing all protocols. - void CheckProtocols(JSON::Value & p){ - std::map new_connectors; - std::map::iterator iter; - bool haveHTTPgeneric = false; - bool haveHTTPspecific = false; + ///\param capabilities An object containing the detected capabilities. + void CheckProtocols(JSON::Value & p, JSON::Value & capabilities){ + std::map new_connectors; + std::map::iterator iter; // used for building args int zero = 0; @@ -110,88 +94,65 @@ namespace Controller { int i; std::string tmp; - JSON::Value counter = (long long int)0; + long long counter = 0; for (JSON::ArrIter ait = p.ArrBegin(); ait != p.ArrEnd(); ait++){ - if ( !( *ait).isMember("connector") || ( *ait)["connector"].asString() == ""){ + ( *ait).removeMember("online"); + #define connName (*ait)["connector"].asStringRef() + if ( !(*ait).isMember("connector") || connName == ""){ continue; } - - tmp = std::string("MistConn") + ( *ait)["connector"].asString(); - tmp += std::string(" -n"); - - if (( *ait)["connector"].asString() == "HTTP"){ - haveHTTPgeneric = true; + + if ( !capabilities["connectors"].isMember(connName)){ + Log("WARN", connName + " connector is enabled but doesn't exist on system! Ignoring connector."); + continue; } - if (( *ait)["connector"].asString() != "HTTP" && ( *ait)["connector"].asString().substr(0, 4) == "HTTP"){ - haveHTTPspecific = true; + + #define connCapa capabilities["connectors"][connName] + if (connCapa.isMember("required")){ + bool gotAll = true; + for (JSON::ObjIter it = connCapa["required"].ObjBegin(); it != connCapa["required"].ObjEnd(); ++it){ + if ( !(*ait).isMember(it->first) || (*ait)[it->first].asStringRef().size() < 1){ + gotAll = false; + Log("WARN", connName + " connector is missing required parameter " + it->first + "! Ignoring connector."); + break; + } + } + if (!gotAll){continue;} } + + /// \TODO Check dependencies? - if (( *ait).isMember("port") && ( *ait)["port"].asInt() != 0){ - tmp += std::string(" -p ") + ( *ait)["port"].asString(); - } - - if (( *ait).isMember("interface") && ( *ait)["interface"].asString() != "" && ( *ait)["interface"].asString() != "0.0.0.0"){ - tmp += std::string(" -i ") + ( *ait)["interface"].asString(); - - } - - if (( *ait).isMember("username") && ( *ait)["username"].asString() != "" && ( *ait)["username"].asString() != "root"){ - tmp += std::string(" -u ") + ( *ait)["username"].asString(); - } - - if (( *ait).isMember("tracks") && ( *ait)["tracks"].asString() != ""){ - tmp += std::string(" -t \"") + ( *ait)["tracks"].asString() + "\""; - } - - if (( *ait).isMember("args") && ( *ait)["args"].asString() != ""){ - tmp += std::string(" ") + ( *ait)["args"].asString(); - } - - counter = counter.asInt() + 1; - new_connectors[std::string("Conn") + counter.asString()] = tmp; - if (Util::Procs::isActive(std::string("Conn") + counter.asString())){ + new_connectors[counter] = (*ait).toString(); + if (Util::Procs::isActive(toConn(counter))){ ( *ait)["online"] = 1; }else{ ( *ait)["online"] = 0; } + counter++; } //shut down deleted/changed connectors for (iter = currentConnectors.begin(); iter != currentConnectors.end(); iter++){ if (new_connectors.count(iter->first) != 1 || new_connectors[iter->first] != iter->second){ - Log("CONF", "Stopping connector: " + iter->second); - Util::Procs::Stop(iter->first); + Log("CONF", "Stopping connector " + iter->second); + Util::Procs::Stop(toConn(iter->first)); } } //start up new/changed connectors for (iter = new_connectors.begin(); iter != new_connectors.end(); iter++){ - if (currentConnectors.count(iter->first) != 1 || currentConnectors[iter->first] != iter->second || !Util::Procs::isActive(iter->first)){ + if (currentConnectors.count(iter->first) != 1 || currentConnectors[iter->first] != iter->second || !Util::Procs::isActive(toConn(iter->first))){ Log("CONF", "Starting connector: " + iter->second); - // clear out old args - for (i=0;i<15;i++) - { - argarr[i] = NULL; - } - + for (i=0; i<15; i++){argarr[i] = 0;} // get args for this connector - buildPipedArguments(p, iter->second, (char **)&argarr); - + buildPipedArguments(p[(long long unsigned)iter->first], (char **)&argarr, capabilities); // start piped w/ generated args - Util::Procs::StartPiped(iter->first, argarr, &zero, &out, &err); - + Util::Procs::StartPiped(toConn(iter->first), argarr, &zero, &out, &err); } } - if (haveHTTPgeneric && !haveHTTPspecific){ - Log("WARN", "HTTP Connector is enabled but no HTTP-based protocols are active!"); - } - if ( !haveHTTPgeneric && haveHTTPspecific){ - Log("WARN", "HTTP-based protocols will not work without the generic HTTP connector!"); - } - //store new state currentConnectors = new_connectors; } diff --git a/src/controller/controller_connectors.h b/src/controller/controller_connectors.h index bb0ad3c8..a90164f7 100644 --- a/src/controller/controller_connectors.h +++ b/src/controller/controller_connectors.h @@ -6,6 +6,6 @@ namespace Controller { void UpdateProtocol(std::string protocol); /// Checks current protocol configuration, updates state of enabled connectors if neccesary. - void CheckProtocols(JSON::Value & p); + void CheckProtocols(JSON::Value & p, JSON::Value & capabilities); }