From 0af85de22de08aa303baa48dd5f80e68c6b872a0 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Mon, 1 May 2023 02:09:21 +0200 Subject: [PATCH] UDP API port is now stored in and read from global config --- lib/procs.cpp | 5 ++--- lib/stream.cpp | 12 ++++++++++++ lib/stream.h | 1 + lib/triggers.cpp | 5 ++--- lib/url.cpp | 8 ++++++++ lib/url.h | 1 + src/controller/controller_api.cpp | 14 ++++++++++++-- src/controller/controller_storage.cpp | 7 ++++++- src/controller/controller_storage.h | 1 + src/output/output.cpp | 12 ++++-------- src/process/process_exec.cpp | 4 +--- src/process/process_livepeer.cpp | 4 +--- src/utils/util_certbot.cpp | 26 ++++++++++---------------- 13 files changed, 61 insertions(+), 39 deletions(-) diff --git a/lib/procs.cpp b/lib/procs.cpp index 2ed13fd8..5b81bd54 100644 --- a/lib/procs.cpp +++ b/lib/procs.cpp @@ -3,6 +3,7 @@ #include "defines.h" #include "procs.h" +#include "stream.h" #include #include #include @@ -465,9 +466,7 @@ pid_t Util::Procs::StartPiped(const char *const *argv, int *fdin, int *fdout, in ERROR_MSG("%s trigger failed to execute %s: %s", trggr, argv[0], strerror(errno)); JSON::Value j; j["trigger_fail"] = trggr; - Socket::UDPConnection uSock; - uSock.SetDestination(UDP_API_HOST, UDP_API_PORT); - uSock.SendNow(j.toString()); + Util::sendUDPApi(j); std::cout << getenv("MIST_TRIG_DEF"); _exit(42); } diff --git a/lib/stream.cpp b/lib/stream.cpp index 5549d2c5..8384ada2 100644 --- a/lib/stream.cpp +++ b/lib/stream.cpp @@ -744,6 +744,18 @@ JSON::Value Util::getInputBySource(const std::string &filename, bool isProvider) return ret; } +/// Sends a message to the local UDP API port +void Util::sendUDPApi(JSON::Value & cmd){ + HTTP::URL UDPAddr(getGlobalConfig("udpApi").asStringRef()); + if (UDPAddr.protocol != "udp"){ + FAIL_MSG("Local UDP API address not defined; can't send command to MistController!"); + return; + } + Socket::UDPConnection uSock; + uSock.SetDestination(UDPAddr.host, UDPAddr.getPort()); + uSock.SendNow(cmd.toString()); +} + /// Attempt to start a push for streamname to target. /// streamname MUST be pre-sanitized /// target gets variables replaced and may be altered by the PUSH_OUT_START trigger response. diff --git a/lib/stream.h b/lib/stream.h index 509c023c..f002a034 100644 --- a/lib/stream.h +++ b/lib/stream.h @@ -26,6 +26,7 @@ namespace Util{ JSON::Value getStreamConfig(const std::string &streamname); JSON::Value getGlobalConfig(const std::string &optionName); JSON::Value getInputBySource(const std::string &filename, bool isProvider = false); + void sendUDPApi(JSON::Value & cmd); uint8_t getStreamStatus(const std::string &streamname); uint8_t getStreamStatusPercentage(const std::string &streamname); bool checkException(const JSON::Value &ex, const std::string &useragent); diff --git a/lib/triggers.cpp b/lib/triggers.cpp index de33d8ba..46f2a87b 100644 --- a/lib/triggers.cpp +++ b/lib/triggers.cpp @@ -25,6 +25,7 @@ #include "triggers.h" #include "util.h" #include "json.h" +#include "stream.h" #include //for strncmp namespace Triggers{ @@ -34,9 +35,7 @@ namespace Triggers{ j["trigger_stat"]["name"] = trigger; j["trigger_stat"]["ms"] = Util::bootMS() - millis; j["trigger_stat"]["ok"] = ok; - Socket::UDPConnection uSock; - uSock.SetDestination(UDP_API_HOST, UDP_API_PORT); - uSock.SendNow(j.toString()); + Util::sendUDPApi(j); } ///\brief Handles a trigger by sending a payload to a destination. diff --git a/lib/url.cpp b/lib/url.cpp index 0978dc14..8e0fc311 100644 --- a/lib/url.cpp +++ b/lib/url.cpp @@ -4,6 +4,7 @@ #include "defines.h" #include "encode.h" #include "url.h" +#include /// Helper function to check if the given c-string is numeric or not static bool is_numeric(const char *str){ @@ -161,6 +162,13 @@ uint16_t HTTP::URL::getPort() const{ return atoi(port.c_str()); } +/// Sets the port in numeric format +void HTTP::URL::setPort(uint16_t newPort){ + std::stringstream st; + st << newPort; + port = st.str(); +} + /// Returns the default port for the protocol in numeric format uint16_t HTTP::URL::getDefaultPort() const{ if (protocol == "http"){return 80;} diff --git a/lib/url.h b/lib/url.h index 479e4378..68e760a6 100644 --- a/lib/url.h +++ b/lib/url.h @@ -14,6 +14,7 @@ namespace HTTP{ public: URL(const std::string &url = ""); uint16_t getPort() const; + void setPort(uint16_t newPort); uint16_t getDefaultPort() const; std::string getExt() const; std::string getUrl() const; diff --git a/src/controller/controller_api.cpp b/src/controller/controller_api.cpp index f5fd3494..c167b9c1 100644 --- a/src/controller/controller_api.cpp +++ b/src/controller/controller_api.cpp @@ -441,12 +441,22 @@ int Controller::handleAPIConnection(Socket::Connection &conn){ void Controller::handleUDPAPI(void *np){ Socket::UDPConnection uSock(true); - if (!uSock.bind(UDP_API_PORT, UDP_API_HOST)){ + uint16_t boundPort = uSock.bind(UDP_API_PORT, UDP_API_HOST); + if (!boundPort){ FAIL_MSG("Could not open local API UDP socket - not all functionality will be available"); return; } + HTTP::URL boundAddr; + boundAddr.protocol = "udp"; + boundAddr.setPort(boundPort); + boundAddr.host = uSock.getBoundAddress(); + { + tthread::lock_guard guard(configMutex); + udpApiBindAddr = boundAddr.getUrl(); + Controller::writeConfig(); + } Util::Procs::socketList.insert(uSock.getSock()); - uSock.SetDestination(UDP_API_HOST, UDP_API_PORT); + uSock.allocateDestination(); while (Controller::conf.is_active){ if (uSock.Receive()){ MEDIUM_MSG("UDP API: %s", (const char*)uSock.data); diff --git a/src/controller/controller_storage.cpp b/src/controller/controller_storage.cpp index 532ee4fe..c8422c43 100644 --- a/src/controller/controller_storage.cpp +++ b/src/controller/controller_storage.cpp @@ -16,6 +16,7 @@ namespace Controller{ std::string instanceId; /// instanceId (previously uniqId) is set in controller.cpp std::string prometheus; std::string accesslog; + std::string udpApiBindAddr; Util::Config conf; JSON::Value Storage; ///< Global storage of data. tthread::mutex configMutex; @@ -444,7 +445,9 @@ namespace Controller{ || !globAccX.getFieldAccX("sessionOutputMode") || !globAccX.getFieldAccX("sessionUnspecifiedMode") || !globAccX.getFieldAccX("sessionStreamInfoMode") - || !globAccX.getFieldAccX("tknMode")){ + || !globAccX.getFieldAccX("tknMode") + || !globAccX.getFieldAccX("udpApi") + ){ globAccX.setReload(); globCfg.master = true; globCfg.close(); @@ -461,6 +464,7 @@ namespace Controller{ globAccX.addField("sessionUnspecifiedMode", RAX_64UINT); globAccX.addField("sessionStreamInfoMode", RAX_64UINT); globAccX.addField("tknMode", RAX_64UINT); + globAccX.addField("udpApi", RAX_128STRING); globAccX.setRCount(1); globAccX.setEndPos(1); globAccX.setReady(); @@ -472,6 +476,7 @@ namespace Controller{ globAccX.setInt("sessionUnspecifiedMode", Storage["config"]["sessionUnspecifiedMode"].asInt()); globAccX.setInt("sessionStreamInfoMode", Storage["config"]["sessionStreamInfoMode"].asInt()); globAccX.setInt("tknMode", Storage["config"]["tknMode"].asInt()); + globAccX.setString("udpApi", udpApiBindAddr); globAccX.setInt("systemBoot", systemBoot); globCfg.master = false; // leave the page after closing } diff --git a/src/controller/controller_storage.h b/src/controller/controller_storage.h index 974d989b..833fe20f 100644 --- a/src/controller/controller_storage.h +++ b/src/controller/controller_storage.h @@ -8,6 +8,7 @@ namespace Controller{ extern std::string instanceId; ///< global storage of instanceId (previously uniqID) is set in controller.cpp extern std::string prometheus; ///< Prometheus access string extern std::string accesslog; ///< Where to write the access log + extern std::string udpApiBindAddr; ///< Bound address where the UDP API listens extern Util::Config conf; ///< Global storage of configuration. extern JSON::Value Storage; ///< Global storage of data. extern tthread::mutex logMutex; ///< Mutex for log thread. diff --git a/src/output/output.cpp b/src/output/output.cpp index a3b1eb6a..397f31a7 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -2199,9 +2199,7 @@ namespace Mist{ prevLosCount = pktLosNow; } pData["active_seconds"] = statComm.getTime(); - Socket::UDPConnection uSock; - uSock.SetDestination(UDP_API_HOST, UDP_API_PORT); - uSock.SendNow(pStat.toString()); + Util::sendUDPApi(pStat); lastPushUpdate = now; } } @@ -2240,11 +2238,9 @@ namespace Mist{ /*LTS-START*/ // Tag the session with the user agent if (newUA && ((now - myConn.connTime()) >= uaDelay || !myConn) && UA.size()){ - std::string APIcall = - "{\"tag_sessid\":{\"" + statComm.sessionId + "\":" + JSON::string_escape("UA:" + UA) + "}}"; - Socket::UDPConnection uSock; - uSock.SetDestination(UDP_API_HOST, UDP_API_PORT); - uSock.SendNow(APIcall); + JSON::Value APIcall; + APIcall["tag_sessid"][statComm.sessionId] = "UA:"+UA; + Util::sendUDPApi(APIcall); newUA = false; } /*LTS-END*/ diff --git a/src/process/process_exec.cpp b/src/process/process_exec.cpp index e89c2444..296aa747 100644 --- a/src/process/process_exec.cpp +++ b/src/process/process_exec.cpp @@ -226,9 +226,7 @@ namespace Mist{ pData["active_seconds"] = (Util::bootSecs() - startTime); pData["ainfo"]["sourceTime"] = statSourceMs; pData["ainfo"]["sinkTime"] = statSinkMs; - Socket::UDPConnection uSock; - uSock.SetDestination(UDP_API_HOST, UDP_API_PORT); - uSock.SendNow(pStat.toString()); + Util::sendUDPApi(pStat); lastProcUpdate = Util::bootSecs(); } } diff --git a/src/process/process_livepeer.cpp b/src/process/process_livepeer.cpp index ff331edd..21f3a177 100644 --- a/src/process/process_livepeer.cpp +++ b/src/process/process_livepeer.cpp @@ -948,9 +948,7 @@ int main(int argc, char *argv[]){ tthread::lock_guard guard(broadcasterMutex); pData["ainfo"]["bc"] = Mist::currBroadAddr; } - Socket::UDPConnection uSock; - uSock.SetDestination(UDP_API_HOST, UDP_API_PORT); - uSock.SendNow(pStat.toString()); + Util::sendUDPApi(pStat); lastProcUpdate = Util::bootSecs(); } } diff --git a/src/utils/util_certbot.cpp b/src/utils/util_certbot.cpp index ed2a6db6..0fd2416f 100644 --- a/src/utils/util_certbot.cpp +++ b/src/utils/util_certbot.cpp @@ -59,8 +59,6 @@ int main(int argc, char **argv){ std::string cbPath = getenv("RENEWED_LINEAGE"); std::string cbCert = cbPath + "/fullchain.pem"; std::string cbKey = cbPath + "/privkey.pem"; - Socket::UDPConnection uSock; - uSock.SetDestination(UDP_API_HOST, UDP_API_PORT); Util::DTSCShmReader rProto(SHM_PROTO); DTSC::Scan prtcls = rProto.getScan(); unsigned int pro_cnt = prtcls.getSize(); @@ -76,11 +74,11 @@ int main(int argc, char **argv){ cmd["updateprotocol"][1u]["cert"] = cbCert; cmd["updateprotocol"][1u]["key"] = cbKey; INFO_MSG("Executing: %s", cmd.toString().c_str()); - uSock.SendNow(cmd.toString()); + Util::sendUDPApi(cmd); Util::wait(500); - uSock.SendNow(cmd.toString()); + Util::sendUDPApi(cmd); Util::wait(500); - uSock.SendNow(cmd.toString()); + Util::sendUDPApi(cmd); } } if (!found){ @@ -91,11 +89,11 @@ int main(int argc, char **argv){ cmd["addprotocol"]["cert"] = cbCert; cmd["addprotocol"]["key"] = cbKey; INFO_MSG("Executing: %s", cmd.toString().c_str()); - uSock.SendNow(cmd.toString()); + Util::sendUDPApi(cmd); Util::wait(500); - uSock.SendNow(cmd.toString()); + Util::sendUDPApi(cmd); Util::wait(500); - uSock.SendNow(cmd.toString()); + Util::sendUDPApi(cmd); } Util::wait(5000); return 0; @@ -119,15 +117,13 @@ int main(int argc, char **argv){ cmd["addprotocol"]["connector"] = "HTTP"; cmd["addprotocol"]["port"] = 80; cmd["addprotocol"]["certbot"] = cbCombo; - Socket::UDPConnection uSock; - uSock.SetDestination(UDP_API_HOST, UDP_API_PORT); - uSock.SendNow(cmd.toString()); + Util::sendUDPApi(cmd); Util::wait(1000); int counter = 10; while (--counter && ((foundHTTP80 = checkPort80(currConf)) == -1 || currConf["certbot"].asStringRef() != cbCombo)){ INFO_MSG("Waiting for Controller to pick up new config..."); - uSock.SendNow(cmd.toString()); + Util::sendUDPApi(cmd); Util::wait(1000); } if (!counter){ @@ -146,15 +142,13 @@ int main(int argc, char **argv){ cmd["updateprotocol"].append(currConf); cmd["updateprotocol"].append(currConf); cmd["updateprotocol"][1u]["certbot"] = cbCombo; - Socket::UDPConnection uSock; - uSock.SetDestination(UDP_API_HOST, UDP_API_PORT); - uSock.SendNow(cmd.toString()); + Util::sendUDPApi(cmd); Util::wait(1000); int counter = 10; while (--counter && ((foundHTTP80 = checkPort80(currConf)) == -1 || currConf["certbot"].asStringRef() != cbCombo)){ INFO_MSG("Waiting for Controller to pick up new config..."); - uSock.SendNow(cmd.toString()); + Util::sendUDPApi(cmd); Util::wait(1000); } if (!counter){