UDP API port is now stored in and read from global config
This commit is contained in:
parent
40df03439c
commit
0af85de22d
13 changed files with 61 additions and 39 deletions
|
@ -3,6 +3,7 @@
|
||||||
|
|
||||||
#include "defines.h"
|
#include "defines.h"
|
||||||
#include "procs.h"
|
#include "procs.h"
|
||||||
|
#include "stream.h"
|
||||||
#include <signal.h>
|
#include <signal.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <sys/types.h>
|
#include <sys/types.h>
|
||||||
|
@ -465,9 +466,7 @@ pid_t Util::Procs::StartPiped(const char *const *argv, int *fdin, int *fdout, in
|
||||||
ERROR_MSG("%s trigger failed to execute %s: %s", trggr, argv[0], strerror(errno));
|
ERROR_MSG("%s trigger failed to execute %s: %s", trggr, argv[0], strerror(errno));
|
||||||
JSON::Value j;
|
JSON::Value j;
|
||||||
j["trigger_fail"] = trggr;
|
j["trigger_fail"] = trggr;
|
||||||
Socket::UDPConnection uSock;
|
Util::sendUDPApi(j);
|
||||||
uSock.SetDestination(UDP_API_HOST, UDP_API_PORT);
|
|
||||||
uSock.SendNow(j.toString());
|
|
||||||
std::cout << getenv("MIST_TRIG_DEF");
|
std::cout << getenv("MIST_TRIG_DEF");
|
||||||
_exit(42);
|
_exit(42);
|
||||||
}
|
}
|
||||||
|
|
|
@ -744,6 +744,18 @@ JSON::Value Util::getInputBySource(const std::string &filename, bool isProvider)
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Sends a message to the local UDP API port
|
||||||
|
void Util::sendUDPApi(JSON::Value & cmd){
|
||||||
|
HTTP::URL UDPAddr(getGlobalConfig("udpApi").asStringRef());
|
||||||
|
if (UDPAddr.protocol != "udp"){
|
||||||
|
FAIL_MSG("Local UDP API address not defined; can't send command to MistController!");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Socket::UDPConnection uSock;
|
||||||
|
uSock.SetDestination(UDPAddr.host, UDPAddr.getPort());
|
||||||
|
uSock.SendNow(cmd.toString());
|
||||||
|
}
|
||||||
|
|
||||||
/// Attempt to start a push for streamname to target.
|
/// Attempt to start a push for streamname to target.
|
||||||
/// streamname MUST be pre-sanitized
|
/// streamname MUST be pre-sanitized
|
||||||
/// target gets variables replaced and may be altered by the PUSH_OUT_START trigger response.
|
/// target gets variables replaced and may be altered by the PUSH_OUT_START trigger response.
|
||||||
|
|
|
@ -26,6 +26,7 @@ namespace Util{
|
||||||
JSON::Value getStreamConfig(const std::string &streamname);
|
JSON::Value getStreamConfig(const std::string &streamname);
|
||||||
JSON::Value getGlobalConfig(const std::string &optionName);
|
JSON::Value getGlobalConfig(const std::string &optionName);
|
||||||
JSON::Value getInputBySource(const std::string &filename, bool isProvider = false);
|
JSON::Value getInputBySource(const std::string &filename, bool isProvider = false);
|
||||||
|
void sendUDPApi(JSON::Value & cmd);
|
||||||
uint8_t getStreamStatus(const std::string &streamname);
|
uint8_t getStreamStatus(const std::string &streamname);
|
||||||
uint8_t getStreamStatusPercentage(const std::string &streamname);
|
uint8_t getStreamStatusPercentage(const std::string &streamname);
|
||||||
bool checkException(const JSON::Value &ex, const std::string &useragent);
|
bool checkException(const JSON::Value &ex, const std::string &useragent);
|
||||||
|
|
|
@ -25,6 +25,7 @@
|
||||||
#include "triggers.h"
|
#include "triggers.h"
|
||||||
#include "util.h"
|
#include "util.h"
|
||||||
#include "json.h"
|
#include "json.h"
|
||||||
|
#include "stream.h"
|
||||||
#include <string.h> //for strncmp
|
#include <string.h> //for strncmp
|
||||||
|
|
||||||
namespace Triggers{
|
namespace Triggers{
|
||||||
|
@ -34,9 +35,7 @@ namespace Triggers{
|
||||||
j["trigger_stat"]["name"] = trigger;
|
j["trigger_stat"]["name"] = trigger;
|
||||||
j["trigger_stat"]["ms"] = Util::bootMS() - millis;
|
j["trigger_stat"]["ms"] = Util::bootMS() - millis;
|
||||||
j["trigger_stat"]["ok"] = ok;
|
j["trigger_stat"]["ok"] = ok;
|
||||||
Socket::UDPConnection uSock;
|
Util::sendUDPApi(j);
|
||||||
uSock.SetDestination(UDP_API_HOST, UDP_API_PORT);
|
|
||||||
uSock.SendNow(j.toString());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
///\brief Handles a trigger by sending a payload to a destination.
|
///\brief Handles a trigger by sending a payload to a destination.
|
||||||
|
|
|
@ -4,6 +4,7 @@
|
||||||
#include "defines.h"
|
#include "defines.h"
|
||||||
#include "encode.h"
|
#include "encode.h"
|
||||||
#include "url.h"
|
#include "url.h"
|
||||||
|
#include <sstream>
|
||||||
|
|
||||||
/// Helper function to check if the given c-string is numeric or not
|
/// Helper function to check if the given c-string is numeric or not
|
||||||
static bool is_numeric(const char *str){
|
static bool is_numeric(const char *str){
|
||||||
|
@ -161,6 +162,13 @@ uint16_t HTTP::URL::getPort() const{
|
||||||
return atoi(port.c_str());
|
return atoi(port.c_str());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Sets the port in numeric format
|
||||||
|
void HTTP::URL::setPort(uint16_t newPort){
|
||||||
|
std::stringstream st;
|
||||||
|
st << newPort;
|
||||||
|
port = st.str();
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns the default port for the protocol in numeric format
|
/// Returns the default port for the protocol in numeric format
|
||||||
uint16_t HTTP::URL::getDefaultPort() const{
|
uint16_t HTTP::URL::getDefaultPort() const{
|
||||||
if (protocol == "http"){return 80;}
|
if (protocol == "http"){return 80;}
|
||||||
|
|
|
@ -14,6 +14,7 @@ namespace HTTP{
|
||||||
public:
|
public:
|
||||||
URL(const std::string &url = "");
|
URL(const std::string &url = "");
|
||||||
uint16_t getPort() const;
|
uint16_t getPort() const;
|
||||||
|
void setPort(uint16_t newPort);
|
||||||
uint16_t getDefaultPort() const;
|
uint16_t getDefaultPort() const;
|
||||||
std::string getExt() const;
|
std::string getExt() const;
|
||||||
std::string getUrl() const;
|
std::string getUrl() const;
|
||||||
|
|
|
@ -441,12 +441,22 @@ int Controller::handleAPIConnection(Socket::Connection &conn){
|
||||||
|
|
||||||
void Controller::handleUDPAPI(void *np){
|
void Controller::handleUDPAPI(void *np){
|
||||||
Socket::UDPConnection uSock(true);
|
Socket::UDPConnection uSock(true);
|
||||||
if (!uSock.bind(UDP_API_PORT, UDP_API_HOST)){
|
uint16_t boundPort = uSock.bind(UDP_API_PORT, UDP_API_HOST);
|
||||||
|
if (!boundPort){
|
||||||
FAIL_MSG("Could not open local API UDP socket - not all functionality will be available");
|
FAIL_MSG("Could not open local API UDP socket - not all functionality will be available");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
HTTP::URL boundAddr;
|
||||||
|
boundAddr.protocol = "udp";
|
||||||
|
boundAddr.setPort(boundPort);
|
||||||
|
boundAddr.host = uSock.getBoundAddress();
|
||||||
|
{
|
||||||
|
tthread::lock_guard<tthread::mutex> guard(configMutex);
|
||||||
|
udpApiBindAddr = boundAddr.getUrl();
|
||||||
|
Controller::writeConfig();
|
||||||
|
}
|
||||||
Util::Procs::socketList.insert(uSock.getSock());
|
Util::Procs::socketList.insert(uSock.getSock());
|
||||||
uSock.SetDestination(UDP_API_HOST, UDP_API_PORT);
|
uSock.allocateDestination();
|
||||||
while (Controller::conf.is_active){
|
while (Controller::conf.is_active){
|
||||||
if (uSock.Receive()){
|
if (uSock.Receive()){
|
||||||
MEDIUM_MSG("UDP API: %s", (const char*)uSock.data);
|
MEDIUM_MSG("UDP API: %s", (const char*)uSock.data);
|
||||||
|
|
|
@ -16,6 +16,7 @@ namespace Controller{
|
||||||
std::string instanceId; /// instanceId (previously uniqId) is set in controller.cpp
|
std::string instanceId; /// instanceId (previously uniqId) is set in controller.cpp
|
||||||
std::string prometheus;
|
std::string prometheus;
|
||||||
std::string accesslog;
|
std::string accesslog;
|
||||||
|
std::string udpApiBindAddr;
|
||||||
Util::Config conf;
|
Util::Config conf;
|
||||||
JSON::Value Storage; ///< Global storage of data.
|
JSON::Value Storage; ///< Global storage of data.
|
||||||
tthread::mutex configMutex;
|
tthread::mutex configMutex;
|
||||||
|
@ -444,7 +445,9 @@ namespace Controller{
|
||||||
|| !globAccX.getFieldAccX("sessionOutputMode")
|
|| !globAccX.getFieldAccX("sessionOutputMode")
|
||||||
|| !globAccX.getFieldAccX("sessionUnspecifiedMode")
|
|| !globAccX.getFieldAccX("sessionUnspecifiedMode")
|
||||||
|| !globAccX.getFieldAccX("sessionStreamInfoMode")
|
|| !globAccX.getFieldAccX("sessionStreamInfoMode")
|
||||||
|| !globAccX.getFieldAccX("tknMode")){
|
|| !globAccX.getFieldAccX("tknMode")
|
||||||
|
|| !globAccX.getFieldAccX("udpApi")
|
||||||
|
){
|
||||||
globAccX.setReload();
|
globAccX.setReload();
|
||||||
globCfg.master = true;
|
globCfg.master = true;
|
||||||
globCfg.close();
|
globCfg.close();
|
||||||
|
@ -461,6 +464,7 @@ namespace Controller{
|
||||||
globAccX.addField("sessionUnspecifiedMode", RAX_64UINT);
|
globAccX.addField("sessionUnspecifiedMode", RAX_64UINT);
|
||||||
globAccX.addField("sessionStreamInfoMode", RAX_64UINT);
|
globAccX.addField("sessionStreamInfoMode", RAX_64UINT);
|
||||||
globAccX.addField("tknMode", RAX_64UINT);
|
globAccX.addField("tknMode", RAX_64UINT);
|
||||||
|
globAccX.addField("udpApi", RAX_128STRING);
|
||||||
globAccX.setRCount(1);
|
globAccX.setRCount(1);
|
||||||
globAccX.setEndPos(1);
|
globAccX.setEndPos(1);
|
||||||
globAccX.setReady();
|
globAccX.setReady();
|
||||||
|
@ -472,6 +476,7 @@ namespace Controller{
|
||||||
globAccX.setInt("sessionUnspecifiedMode", Storage["config"]["sessionUnspecifiedMode"].asInt());
|
globAccX.setInt("sessionUnspecifiedMode", Storage["config"]["sessionUnspecifiedMode"].asInt());
|
||||||
globAccX.setInt("sessionStreamInfoMode", Storage["config"]["sessionStreamInfoMode"].asInt());
|
globAccX.setInt("sessionStreamInfoMode", Storage["config"]["sessionStreamInfoMode"].asInt());
|
||||||
globAccX.setInt("tknMode", Storage["config"]["tknMode"].asInt());
|
globAccX.setInt("tknMode", Storage["config"]["tknMode"].asInt());
|
||||||
|
globAccX.setString("udpApi", udpApiBindAddr);
|
||||||
globAccX.setInt("systemBoot", systemBoot);
|
globAccX.setInt("systemBoot", systemBoot);
|
||||||
globCfg.master = false; // leave the page after closing
|
globCfg.master = false; // leave the page after closing
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,6 +8,7 @@ namespace Controller{
|
||||||
extern std::string instanceId; ///< global storage of instanceId (previously uniqID) is set in controller.cpp
|
extern std::string instanceId; ///< global storage of instanceId (previously uniqID) is set in controller.cpp
|
||||||
extern std::string prometheus; ///< Prometheus access string
|
extern std::string prometheus; ///< Prometheus access string
|
||||||
extern std::string accesslog; ///< Where to write the access log
|
extern std::string accesslog; ///< Where to write the access log
|
||||||
|
extern std::string udpApiBindAddr; ///< Bound address where the UDP API listens
|
||||||
extern Util::Config conf; ///< Global storage of configuration.
|
extern Util::Config conf; ///< Global storage of configuration.
|
||||||
extern JSON::Value Storage; ///< Global storage of data.
|
extern JSON::Value Storage; ///< Global storage of data.
|
||||||
extern tthread::mutex logMutex; ///< Mutex for log thread.
|
extern tthread::mutex logMutex; ///< Mutex for log thread.
|
||||||
|
|
|
@ -2199,9 +2199,7 @@ namespace Mist{
|
||||||
prevLosCount = pktLosNow;
|
prevLosCount = pktLosNow;
|
||||||
}
|
}
|
||||||
pData["active_seconds"] = statComm.getTime();
|
pData["active_seconds"] = statComm.getTime();
|
||||||
Socket::UDPConnection uSock;
|
Util::sendUDPApi(pStat);
|
||||||
uSock.SetDestination(UDP_API_HOST, UDP_API_PORT);
|
|
||||||
uSock.SendNow(pStat.toString());
|
|
||||||
lastPushUpdate = now;
|
lastPushUpdate = now;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2240,11 +2238,9 @@ namespace Mist{
|
||||||
/*LTS-START*/
|
/*LTS-START*/
|
||||||
// Tag the session with the user agent
|
// Tag the session with the user agent
|
||||||
if (newUA && ((now - myConn.connTime()) >= uaDelay || !myConn) && UA.size()){
|
if (newUA && ((now - myConn.connTime()) >= uaDelay || !myConn) && UA.size()){
|
||||||
std::string APIcall =
|
JSON::Value APIcall;
|
||||||
"{\"tag_sessid\":{\"" + statComm.sessionId + "\":" + JSON::string_escape("UA:" + UA) + "}}";
|
APIcall["tag_sessid"][statComm.sessionId] = "UA:"+UA;
|
||||||
Socket::UDPConnection uSock;
|
Util::sendUDPApi(APIcall);
|
||||||
uSock.SetDestination(UDP_API_HOST, UDP_API_PORT);
|
|
||||||
uSock.SendNow(APIcall);
|
|
||||||
newUA = false;
|
newUA = false;
|
||||||
}
|
}
|
||||||
/*LTS-END*/
|
/*LTS-END*/
|
||||||
|
|
|
@ -226,9 +226,7 @@ namespace Mist{
|
||||||
pData["active_seconds"] = (Util::bootSecs() - startTime);
|
pData["active_seconds"] = (Util::bootSecs() - startTime);
|
||||||
pData["ainfo"]["sourceTime"] = statSourceMs;
|
pData["ainfo"]["sourceTime"] = statSourceMs;
|
||||||
pData["ainfo"]["sinkTime"] = statSinkMs;
|
pData["ainfo"]["sinkTime"] = statSinkMs;
|
||||||
Socket::UDPConnection uSock;
|
Util::sendUDPApi(pStat);
|
||||||
uSock.SetDestination(UDP_API_HOST, UDP_API_PORT);
|
|
||||||
uSock.SendNow(pStat.toString());
|
|
||||||
lastProcUpdate = Util::bootSecs();
|
lastProcUpdate = Util::bootSecs();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -948,9 +948,7 @@ int main(int argc, char *argv[]){
|
||||||
tthread::lock_guard<tthread::mutex> guard(broadcasterMutex);
|
tthread::lock_guard<tthread::mutex> guard(broadcasterMutex);
|
||||||
pData["ainfo"]["bc"] = Mist::currBroadAddr;
|
pData["ainfo"]["bc"] = Mist::currBroadAddr;
|
||||||
}
|
}
|
||||||
Socket::UDPConnection uSock;
|
Util::sendUDPApi(pStat);
|
||||||
uSock.SetDestination(UDP_API_HOST, UDP_API_PORT);
|
|
||||||
uSock.SendNow(pStat.toString());
|
|
||||||
lastProcUpdate = Util::bootSecs();
|
lastProcUpdate = Util::bootSecs();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,8 +59,6 @@ int main(int argc, char **argv){
|
||||||
std::string cbPath = getenv("RENEWED_LINEAGE");
|
std::string cbPath = getenv("RENEWED_LINEAGE");
|
||||||
std::string cbCert = cbPath + "/fullchain.pem";
|
std::string cbCert = cbPath + "/fullchain.pem";
|
||||||
std::string cbKey = cbPath + "/privkey.pem";
|
std::string cbKey = cbPath + "/privkey.pem";
|
||||||
Socket::UDPConnection uSock;
|
|
||||||
uSock.SetDestination(UDP_API_HOST, UDP_API_PORT);
|
|
||||||
Util::DTSCShmReader rProto(SHM_PROTO);
|
Util::DTSCShmReader rProto(SHM_PROTO);
|
||||||
DTSC::Scan prtcls = rProto.getScan();
|
DTSC::Scan prtcls = rProto.getScan();
|
||||||
unsigned int pro_cnt = prtcls.getSize();
|
unsigned int pro_cnt = prtcls.getSize();
|
||||||
|
@ -76,11 +74,11 @@ int main(int argc, char **argv){
|
||||||
cmd["updateprotocol"][1u]["cert"] = cbCert;
|
cmd["updateprotocol"][1u]["cert"] = cbCert;
|
||||||
cmd["updateprotocol"][1u]["key"] = cbKey;
|
cmd["updateprotocol"][1u]["key"] = cbKey;
|
||||||
INFO_MSG("Executing: %s", cmd.toString().c_str());
|
INFO_MSG("Executing: %s", cmd.toString().c_str());
|
||||||
uSock.SendNow(cmd.toString());
|
Util::sendUDPApi(cmd);
|
||||||
Util::wait(500);
|
Util::wait(500);
|
||||||
uSock.SendNow(cmd.toString());
|
Util::sendUDPApi(cmd);
|
||||||
Util::wait(500);
|
Util::wait(500);
|
||||||
uSock.SendNow(cmd.toString());
|
Util::sendUDPApi(cmd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!found){
|
if (!found){
|
||||||
|
@ -91,11 +89,11 @@ int main(int argc, char **argv){
|
||||||
cmd["addprotocol"]["cert"] = cbCert;
|
cmd["addprotocol"]["cert"] = cbCert;
|
||||||
cmd["addprotocol"]["key"] = cbKey;
|
cmd["addprotocol"]["key"] = cbKey;
|
||||||
INFO_MSG("Executing: %s", cmd.toString().c_str());
|
INFO_MSG("Executing: %s", cmd.toString().c_str());
|
||||||
uSock.SendNow(cmd.toString());
|
Util::sendUDPApi(cmd);
|
||||||
Util::wait(500);
|
Util::wait(500);
|
||||||
uSock.SendNow(cmd.toString());
|
Util::sendUDPApi(cmd);
|
||||||
Util::wait(500);
|
Util::wait(500);
|
||||||
uSock.SendNow(cmd.toString());
|
Util::sendUDPApi(cmd);
|
||||||
}
|
}
|
||||||
Util::wait(5000);
|
Util::wait(5000);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -119,15 +117,13 @@ int main(int argc, char **argv){
|
||||||
cmd["addprotocol"]["connector"] = "HTTP";
|
cmd["addprotocol"]["connector"] = "HTTP";
|
||||||
cmd["addprotocol"]["port"] = 80;
|
cmd["addprotocol"]["port"] = 80;
|
||||||
cmd["addprotocol"]["certbot"] = cbCombo;
|
cmd["addprotocol"]["certbot"] = cbCombo;
|
||||||
Socket::UDPConnection uSock;
|
Util::sendUDPApi(cmd);
|
||||||
uSock.SetDestination(UDP_API_HOST, UDP_API_PORT);
|
|
||||||
uSock.SendNow(cmd.toString());
|
|
||||||
Util::wait(1000);
|
Util::wait(1000);
|
||||||
int counter = 10;
|
int counter = 10;
|
||||||
while (--counter && ((foundHTTP80 = checkPort80(currConf)) == -1 ||
|
while (--counter && ((foundHTTP80 = checkPort80(currConf)) == -1 ||
|
||||||
currConf["certbot"].asStringRef() != cbCombo)){
|
currConf["certbot"].asStringRef() != cbCombo)){
|
||||||
INFO_MSG("Waiting for Controller to pick up new config...");
|
INFO_MSG("Waiting for Controller to pick up new config...");
|
||||||
uSock.SendNow(cmd.toString());
|
Util::sendUDPApi(cmd);
|
||||||
Util::wait(1000);
|
Util::wait(1000);
|
||||||
}
|
}
|
||||||
if (!counter){
|
if (!counter){
|
||||||
|
@ -146,15 +142,13 @@ int main(int argc, char **argv){
|
||||||
cmd["updateprotocol"].append(currConf);
|
cmd["updateprotocol"].append(currConf);
|
||||||
cmd["updateprotocol"].append(currConf);
|
cmd["updateprotocol"].append(currConf);
|
||||||
cmd["updateprotocol"][1u]["certbot"] = cbCombo;
|
cmd["updateprotocol"][1u]["certbot"] = cbCombo;
|
||||||
Socket::UDPConnection uSock;
|
Util::sendUDPApi(cmd);
|
||||||
uSock.SetDestination(UDP_API_HOST, UDP_API_PORT);
|
|
||||||
uSock.SendNow(cmd.toString());
|
|
||||||
Util::wait(1000);
|
Util::wait(1000);
|
||||||
int counter = 10;
|
int counter = 10;
|
||||||
while (--counter && ((foundHTTP80 = checkPort80(currConf)) == -1 ||
|
while (--counter && ((foundHTTP80 = checkPort80(currConf)) == -1 ||
|
||||||
currConf["certbot"].asStringRef() != cbCombo)){
|
currConf["certbot"].asStringRef() != cbCombo)){
|
||||||
INFO_MSG("Waiting for Controller to pick up new config...");
|
INFO_MSG("Waiting for Controller to pick up new config...");
|
||||||
uSock.SendNow(cmd.toString());
|
Util::sendUDPApi(cmd);
|
||||||
Util::wait(1000);
|
Util::wait(1000);
|
||||||
}
|
}
|
||||||
if (!counter){
|
if (!counter){
|
||||||
|
|
Loading…
Add table
Reference in a new issue