diff --git a/CMakeLists.txt b/CMakeLists.txt index d0aac6c7..c42300f3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -855,6 +855,7 @@ add_executable(MistController src/controller/controller_capabilities.h src/controller/controller_streams.h src/controller/controller_push.h + src/controller/controller_variables.h src/controller/controller.cpp src/controller/controller_updater.cpp src/controller/controller_streams.cpp @@ -866,6 +867,7 @@ add_executable(MistController src/controller/controller_uplink.cpp src/controller/controller_api.cpp src/controller/controller_push.cpp + src/controller/controller_variables.cpp generated/server.html.h ${BINARY_DIR}/mist/.headers ) diff --git a/lib/defines.h b/lib/defines.h index 7f09b00c..9f04f8e4 100644 --- a/lib/defines.h +++ b/lib/defines.h @@ -204,6 +204,8 @@ static inline void show_stackframe(){} #define COMMS_SESSIONS "MstSession%s" #define COMMS_SESSIONS_INITSIZE 8 * 1024 * 1024 +#define CUSTOM_VARIABLES_INITSIZE 64 * 1024 + #define SEM_STATISTICS "/MstStat" #define SEM_USERS "/MstUser%s" //%s stream name @@ -238,6 +240,7 @@ static inline void show_stackframe(){} #define SHM_STATE_LOGS "MstStateLogs" #define SHM_STATE_ACCS "MstStateAccs" #define SHM_STATE_STREAMS "MstStateStreams" +#define SHM_CUSTOM_VARIABLES "MstVars" #define NAME_BUFFER_SIZE 200 // char buffer size for snprintf'ing shm filenames #define SHM_SESSIONS "/MstSess" #define SHM_SESSIONS_ITEM 165 // 4 byte crc, 100b streamname, 20b connector, 40b host, 1b sync diff --git a/lib/procs.cpp b/lib/procs.cpp index 2ed48020..36e0332e 100644 --- a/lib/procs.cpp +++ b/lib/procs.cpp @@ -234,19 +234,96 @@ void Util::Procs::childsig_handler(int signum){ } /// Runs the given command and returns the stdout output as a string. -std::string Util::Procs::getOutputOf(char *const *argv){ +/// \param maxWait amount of milliseconds to wait for new output to come in over stdout before aborting +std::string Util::Procs::getOutputOf(char *const *argv, uint64_t maxWait){ int fin = 0, fout = -1, ferr = 0; + uint64_t waitedFor = 0; + uint8_t tries = 0; pid_t myProc = StartPiped(argv, &fin, &fout, &ferr); Socket::Connection O(-1, fout); + O.setBlocking(false); Util::ResizeablePointer ret; while (childRunning(myProc) || O){ if (O.spool() || O.Received().size()){ + waitedFor = 0; + tries = 0; while (O.Received().size()){ std::string & t = O.Received().get(); ret.append(t); t.clear(); } }else{ + if (maxWait && waitedFor > maxWait){ + WARN_MSG("Timeout while getting output of '%s', returning %luB of data", (char *)argv, ret.size()); + break; + } + else if(maxWait){ + uint64_t waitTime = Util::expBackoffMs(tries++, 10, maxWait); + Util::sleep(waitTime); + waitedFor += waitTime; + } + else{ + Util::sleep(50); + } + } + } + return std::string(ret, ret.size()); +} + +/// Runs the given command and returns the stdout output as a string. +/// \param maxWait amount of milliseconds to wait before shutting down the spawned process +/// \param maxValBytes amount of Bytes allowed in the output before shutting down the spawned process +std::string Util::Procs::getLimitedOutputOf(char *const *argv, uint64_t maxWait, uint32_t maxValBytes){ + int fin = 0, fout = -1, ferr = 0; + uint64_t waitedFor = 0; + uint8_t tries = 0; + pid_t myProc = StartPiped(argv, &fin, &fout, &ferr); + Socket::Connection O(-1, fout); + O.setBlocking(false); + Util::ResizeablePointer ret; + std::string fullCmd; + uint8_t idx = 0; + while (argv[idx]){ + fullCmd += argv[idx++]; + fullCmd += " "; + } + while (childRunning(myProc) || O){ + if (O.spool() || O.Received().size()){ + tries = 0; + while (O.Received().size()){ + std::string & t = O.Received().get(); + ret.append(t); + t.clear(); + } + }else{ + if (waitedFor > maxWait){ + WARN_MSG("Reached timeout of %lu ms. Killing process with command %s...", maxWait, fullCmd.c_str()); + break; + } + else { + uint64_t waitTime = Util::expBackoffMs(tries++, 10, maxWait); + Util::sleep(waitTime); + waitedFor += waitTime; + } + } + if (ret.size() > maxValBytes){ + WARN_MSG("Have a limit of %uB, but received %luB of data. Killing process with command %s...", maxValBytes, ret.size(), fullCmd.c_str()); + break; + } + } + // Stop the process if it is still running + if (childRunning(myProc)){ + close(fout); + Stop(myProc); + waitedFor = 0; + } + // Give it a few seconds, but then forcefully stop it + while (childRunning(myProc)){ + if (waitedFor > 2000){ + Murder(myProc); + break; + }else{ + waitedFor += 50; Util::sleep(50); } } @@ -261,10 +338,10 @@ char *const *Util::Procs::dequeToArgv(std::deque &argDeq){ return ret; } -std::string Util::Procs::getOutputOf(std::deque &argDeq){ +std::string Util::Procs::getOutputOf(std::deque &argDeq, uint64_t maxWait){ std::string ret; char *const *argv = dequeToArgv(argDeq); // Note: Do not edit deque before executing command - ret = getOutputOf(argv); + ret = getOutputOf(argv, maxWait); return ret; } diff --git a/lib/procs.h b/lib/procs.h index bf46036f..71641551 100644 --- a/lib/procs.h +++ b/lib/procs.h @@ -8,6 +8,7 @@ #include #include #include +#include /// Contains utility code, not directly related to streaming media namespace Util{ @@ -29,8 +30,9 @@ namespace Util{ static void fork_prepare(); static void fork_complete(); static void setHandler(); - static std::string getOutputOf(char *const *argv); - static std::string getOutputOf(std::deque &argDeq); + static std::string getOutputOf(char *const *argv, uint64_t maxWait = 0); + static std::string getOutputOf(std::deque &argDeq, uint64_t maxWait = 0); + static std::string getLimitedOutputOf(char *const *argv, uint64_t maxWait, uint32_t maxValBytes); static pid_t StartPiped(const char *const *argv, int *fdin, int *fdout, int *fderr); static pid_t StartPiped(std::deque &argDeq, int *fdin, int *fdout, int *fderr); static void Stop(pid_t name); diff --git a/lib/stream.cpp b/lib/stream.cpp index 693a3c74..e4c4f01d 100644 --- a/lib/stream.cpp +++ b/lib/stream.cpp @@ -107,35 +107,87 @@ std::string Util::codecString(const std::string &codec, const std::string &initD return ""; } +/// Local-only helper function that replaces a variable and returns the amount of replacements done +size_t replaceVar(std::string & input, const std::string & var, const std::string & rep){ + size_t count = 0; + const std::string withBraces = "${"+var+"}"; + const std::string noBraces = "$"+var; + count += Util::replace(input, withBraces, rep); + count += Util::replace(input, noBraces, rep); + return count; +} + +size_t Util::streamCustomVariables(std::string &str){ + size_t count = 0; + // Read shared memory page containing custom variables + static IPC::sharedPage variablePage(SHM_CUSTOM_VARIABLES, 0, false, false); + // Check if the page needs to be reopened + if (variablePage.mapped){ + Util::RelAccX varAccX(variablePage.mapped, false); + if (varAccX.isReload()){variablePage.close();} + } + // Reopen memory page if it has been closed + if (!variablePage.mapped){ + variablePage.init(SHM_CUSTOM_VARIABLES, 0, false, false); + if(!variablePage.mapped){ + ERROR_MSG("Unable to substitute custom variables, as memory page %s failed to open", SHM_CUSTOM_VARIABLES); + return 0; + } + } + // Extract variables + Util::RelAccX varAccX(variablePage.mapped, false); + for (size_t i = 0; i < varAccX.getEndPos(); i++){ + // Replace $thisName with $thisVal + if (varAccX.getPointer("name", i)){ + std::string thisName = "$" + std::string(varAccX.getPointer("name", i)); + std::string thisVal = std::string(varAccX.getPointer("lastVal", i)); + count += replaceVar(str, thisName, thisVal); + } + } + return count; +} + /// Replaces all stream-related variables in the given 'str' with their values. -void Util::streamVariables(std::string &str, const std::string &streamname, const std::string &source){ - Util::replace(str, "$source", source); - Util::replace(str, "$datetime", "$year.$month.$day.$hour.$minute.$second"); - Util::replace(str, "$day", strftime_now("%d")); - Util::replace(str, "$month", strftime_now("%m")); - Util::replace(str, "$year", strftime_now("%Y")); - Util::replace(str, "$hour", strftime_now("%H")); - Util::replace(str, "$minute", strftime_now("%M")); - Util::replace(str, "$second", strftime_now("%S")); - Util::replace(str, "$wday", strftime_now("%u")); // weekday, 1-7, monday=1 - Util::replace(str, "$yday", strftime_now("%j")); // yearday, 001-366 - Util::replace(str, "$week", strftime_now("%V")); // week number, 01-53 - Util::replace(str, "$stream", streamname); +size_t Util::streamVariables(std::string &str, const std::string &streamname, const std::string &source, uint8_t depth){ + size_t replaced = 0; + if (depth > 9){ + WARN_MSG("Reached a depth of %u when replacing stream variables", depth); + return 0; + } + // If there are no variables, abort + if (str.find('$') == std::string::npos){return 0;} + // Find and replace any custom variables + replaced += streamCustomVariables(str); + replaced += replaceVar(str, "source", source); + replaced += replaceVar(str, "datetime", "$year.$month.$day.$hour.$minute.$second"); + replaced += replaceVar(str, "day", strftime_now("%d")); + replaced += replaceVar(str, "month", strftime_now("%m")); + replaced += replaceVar(str, "year", strftime_now("%Y")); + replaced += replaceVar(str, "hour", strftime_now("%H")); + replaced += replaceVar(str, "minute", strftime_now("%M")); + replaced += replaceVar(str, "second", strftime_now("%S")); + replaced += replaceVar(str, "wday", strftime_now("%u")); // weekday, 1-7, monday=1 + replaced += replaceVar(str, "yday", strftime_now("%j")); // yearday, 001-366 + replaced += replaceVar(str, "week", strftime_now("%V")); // week number, 01-53 + replaced += replaceVar(str, "stream", streamname); if (streamname.find('+') != std::string::npos){ std::string strbase = streamname.substr(0, streamname.find('+')); std::string strext = streamname.substr(streamname.find('+') + 1); - Util::replace(str, "$basename", strbase); - Util::replace(str, "$wildcard", strext); + replaced += Util::replace(str, "basename", strbase); + replaced += Util::replace(str, "wildcard", strext); if (strext.size()){ - Util::replace(str, "$pluswildcard", "+" + strext); + replaced += Util::replace(str, "pluswildcard", "+" + strext); }else{ - Util::replace(str, "$pluswildcard", ""); + replaced += Util::replace(str, "pluswildcard", ""); } }else{ - Util::replace(str, "$basename", streamname); - Util::replace(str, "$wildcard", ""); - Util::replace(str, "$pluswildcard", ""); + replaced += Util::replace(str, "basename", streamname); + replaced += Util::replace(str, "wildcard", ""); + replaced += Util::replace(str, "pluswildcard", ""); } + // Continue recursively if we've replaced a variable which exposed another variable to be replaced + if (replaced && str.find('$') != std::string::npos){replaced += streamVariables(str, streamName, source, ++depth);} + return replaced; } std::string Util::getTmpFolder(){ diff --git a/lib/stream.h b/lib/stream.h index 8b60c309..4e07c3e7 100644 --- a/lib/stream.h +++ b/lib/stream.h @@ -13,7 +13,8 @@ const JSON::Value empty; namespace Util{ - void streamVariables(std::string &str, const std::string &streamname, const std::string &source = ""); + size_t streamCustomVariables(std::string &str); + size_t streamVariables(std::string &str, const std::string &streamname, const std::string &source = "", uint8_t depth = 0); std::string getTmpFolder(); void sanitizeName(std::string &streamname); bool streamAlive(std::string &streamname); diff --git a/lib/triggers.cpp b/lib/triggers.cpp index d206634b..de33d8ba 100644 --- a/lib/triggers.cpp +++ b/lib/triggers.cpp @@ -94,9 +94,9 @@ namespace Triggers{ while (Util::Procs::isActive(myProc) && counter < 150){ Util::sleep(100); ++counter; - if (counter >= 150){ - if (counter == 150){FAIL_MSG("Trigger taking too long - killing process");} - if (counter >= 250){ + if (counter >= 100){ + if (counter == 100){FAIL_MSG("Trigger taking too long - killing process");} + if (counter >= 140){ Util::Procs::Stop(myProc); }else{ Util::Procs::Murder(myProc); diff --git a/lib/util.cpp b/lib/util.cpp index 2edab36c..60b0bd65 100644 --- a/lib/util.cpp +++ b/lib/util.cpp @@ -150,14 +150,41 @@ namespace Util{ } } - /// Replaces any occurrences of 'from' with 'to' in 'str'. - void replace(std::string &str, const std::string &from, const std::string &to){ - if (from.empty()){return;} + /// Replaces any occurrences of 'from' with 'to' in 'str', returns how many replacements were made + size_t replace(std::string &str, const std::string &from, const std::string &to){ + if (from.empty()){return 0;} + size_t counter = 0; size_t start_pos = 0; while ((start_pos = str.find(from, start_pos)) != std::string::npos){ str.replace(start_pos, from.length(), to); + ++counter; start_pos += to.length(); } + return counter; + } + + /// \brief Removes whitespace from the beginning and end of a given string + void stringTrim(std::string &val){ + if (!val.size()){ return; } + uint64_t startPos = 0; + uint64_t length = 0; + // Set startPos to the first character which does not have value 09-13 + for (uint64_t i = 0; i < val.size(); i++){ + if (val[i] == 32){continue;} + if (val[i] < 9 || val[i] > 13){ + startPos = i; + break; + } + } + // Same thing in reverse for endPos + for (uint64_t i = val.size() - 1; i > 0 ; i--){ + if (val[i] == 32){continue;} + if (val[i] < 9 || val[i] > 13){ + length = i + 1 - startPos; + break; + } + } + val = val.substr(startPos, length); } //Returns the time to wait in milliseconds for exponential back-off waiting. diff --git a/lib/util.h b/lib/util.h index 0659b2c0..8fc39f13 100644 --- a/lib/util.h +++ b/lib/util.h @@ -13,7 +13,8 @@ namespace Util{ bool createPath(const std::string &path); bool stringScan(const std::string &src, const std::string &pattern, std::deque &result); void stringToLower(std::string &val); - void replace(std::string &str, const std::string &from, const std::string &to); + size_t replace(std::string &str, const std::string &from, const std::string &to); + void stringTrim(std::string &val); int64_t expBackoffMs(const size_t currIter, const size_t maxIter, const int64_t maxWait); diff --git a/src/controller/controller.cpp b/src/controller/controller.cpp index 8350b14d..695b2422 100644 --- a/src/controller/controller.cpp +++ b/src/controller/controller.cpp @@ -8,6 +8,7 @@ #include "controller_statistics.h" #include "controller_storage.h" #include "controller_streams.h" +#include "controller_variables.h" #include #include //for ram space check #include @@ -567,6 +568,8 @@ int main_loop(int argc, char **argv){ tthread::thread uplinkThread(Controller::uplinkConnection, 0); /*LTS*/ // start push checking thread tthread::thread pushThread(Controller::pushCheckLoop, 0); + // start variable checking thread + tthread::thread variableThread(Controller::variableCheckLoop, 0); #ifdef UPDATER // start updater thread tthread::thread updaterThread(Controller::updateThread, 0); @@ -620,6 +623,8 @@ int main_loop(int argc, char **argv){ uplinkThread.join(); HIGH_MSG("Joining push thread..."); pushThread.join(); + HIGH_MSG("Joining variable thread..."); + variableThread.join(); #ifdef UPDATER HIGH_MSG("Joining updater thread..."); updaterThread.join(); diff --git a/src/controller/controller_api.cpp b/src/controller/controller_api.cpp index 672ce24d..cf8808d3 100644 --- a/src/controller/controller_api.cpp +++ b/src/controller/controller_api.cpp @@ -19,6 +19,7 @@ /*LTS-START*/ #include "controller_limits.h" #include "controller_push.h" +#include "controller_variables.h" #include "controller_updater.h" /*LTS-END*/ @@ -1198,6 +1199,10 @@ void Controller::handleAPICommands(JSON::Value &Request, JSON::Value &Response){ Controller::pushSettings(Request["push_settings"], Response["push_settings"]); } + if (Request.isMember("variable_list")){Controller::listCustomVariables(Response["variable_list"]);} + if (Request.isMember("variable_add")){Controller::addVariable(Request["variable_add"], Response["variable_list"]);} + if (Request.isMember("variable_remove")){Controller::removeVariable(Request["variable_remove"], Response["variable_list"]);} + Controller::writeConfig(); Controller::configChanged = false; diff --git a/src/controller/controller_push.cpp b/src/controller/controller_push.cpp index f12c9221..21799c3b 100644 --- a/src/controller/controller_push.cpp +++ b/src/controller/controller_push.cpp @@ -77,8 +77,15 @@ namespace Controller{ std::set toWipe; for (std::map::iterator it = activePushes.begin(); it != activePushes.end(); ++it){ if (Util::Procs::isActive(it->first)){ - if (it->second[1u].asStringRef() == streamname && it->second[2u].asStringRef() == target){ - return true; + // Apply variable substitution to make sure another push target does not resolve to the same target + if (it->second[1u].asStringRef() == streamname){ + std::string activeTarget = it->second[2u].asStringRef(); + std::string cmpTarget = target; + Util::streamVariables(activeTarget, streamname); + Util::streamVariables(cmpTarget, streamname); + if (activeTarget == cmpTarget){ + return true; + } } }else{ toWipe.insert(it->first); @@ -323,9 +330,7 @@ namespace Controller{ for (std::set::iterator it = activeStreams.begin(); it != activeStreams.end(); ++it){ std::string streamname = *it; if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){ - std::string tmpName = streamname; - std::string tmpTarget = target; - startPush(tmpName, tmpTarget); + startPush(streamname, target); } } } @@ -372,7 +377,9 @@ namespace Controller{ std::string stream = streamname; Util::sanitizeName(stream); std::string target = (*it)[1u]; - startPush(stream, target); + if (!isPushActive(stream, target)){ + startPush(stream, target); + } } } } diff --git a/src/controller/controller_variables.cpp b/src/controller/controller_variables.cpp new file mode 100644 index 00000000..cc82709d --- /dev/null +++ b/src/controller/controller_variables.cpp @@ -0,0 +1,387 @@ +#include "controller_variables.h" +#include "controller_statistics.h" +#include "controller_storage.h" +#include +#include +#include +#include +#include +#include +#include + +namespace Controller{ + // Indicates whether the shared memory page is stale compared to the server config + static bool mutateShm = true; + // Size of the shared memory page + static uint64_t pageSize = CUSTOM_VARIABLES_INITSIZE; + tthread::mutex variableMutex; + + /// \brief Loops, checking every second if any variables require updating + void variableCheckLoop(void *np){ + while (Controller::conf.is_active){ + { + tthread::lock_guard guard(variableMutex); + uint64_t now = Util::epoch(); + // Check if any custom variable target needs to be run + IPC::sharedPage variablePage(SHM_CUSTOM_VARIABLES, 0, false, false); + if (variablePage.mapped){ + Util::RelAccX varAccX(variablePage.mapped, false); + if (varAccX.isReady()){ + for (size_t i = 0; i < varAccX.getEndPos(); i++){ + std::string name = varAccX.getPointer("name", i); + std::string target = varAccX.getPointer("target", i); + uint32_t interval = varAccX.getInt("interval", i); + uint64_t lastRun = varAccX.getInt("lastRun", i); + uint32_t waitTime = varAccX.getInt("waitTime", i); + if (target.size() && (!lastRun || (interval && (lastRun + interval < now)))){ + // Set the wait time to the interval, or 1 second if it is less than 1 second + if (!waitTime){ + waitTime = interval; + } + if (waitTime < 1){ + waitTime = 1; + } + runVariableTarget(name, target, waitTime); + } + } + } + } + // Write variables from the server config to shm if any data has changed + if (mutateShm){ + writeToShm(); + mutateShm = false; + } + } + Util::sleep(1000); + } + // Cleanup shared memory page + IPC::sharedPage variablesPage(SHM_CUSTOM_VARIABLES, pageSize, false, false); + if (variablesPage.mapped){ + variablesPage.master = true; + } + } + + /// \brief Writes custom variable from the server config to shared memory + void writeToShm(){ + uint64_t variableCount = Controller::Storage["variables"].size(); + IPC::sharedPage variablesPage(SHM_CUSTOM_VARIABLES, pageSize, false, false); + // If we have an existing page, set the reload flag + if (variablesPage.mapped){ + variablesPage.master = true; + Util::RelAccX varAccX = Util::RelAccX(variablesPage.mapped, false); + // Check if we need a bigger page + uint64_t sizeRequired = varAccX.getOffset() + varAccX.getRSize() * variableCount; + if (pageSize < sizeRequired){pageSize = sizeRequired;} + varAccX.setReload(); + } + // Close & unlink any existing page and create a new one + variablesPage.close(); + variablesPage.init(SHM_CUSTOM_VARIABLES, pageSize, true, false); + Util::RelAccX varAccX = Util::RelAccX(variablesPage.mapped, false); + varAccX = Util::RelAccX(variablesPage.mapped, false); + varAccX.addField("name", RAX_32STRING); + varAccX.addField("target", RAX_512STRING); + varAccX.addField("interval", RAX_32UINT); + varAccX.addField("lastRun", RAX_64UINT); + varAccX.addField("lastVal", RAX_128STRING); + varAccX.addField("waitTime", RAX_32UINT); + // Set amount of records that can fit and how many will be used by custom variables + uint64_t reqCount = (pageSize - varAccX.getOffset()) / varAccX.getRSize(); + varAccX.setRCount(reqCount); + varAccX.setPresent(reqCount); + varAccX.setEndPos(variableCount); + // Write the server config to shm + uint64_t index = 0; + jsonForEach(Controller::Storage["variables"], it){ + std::string name = (*it)[0u].asString(); + std::string target = (*it)[1u].asString(); + uint32_t interval = (*it)[2u].asInt(); + uint64_t lastRun = (*it)[3u].asInt(); + std::string lastVal = (*it)[4u].asString(); + uint32_t waitTime = (*it)[5u].asInt(); + varAccX.setString(varAccX.getFieldData("name"), name, index); + varAccX.setString(varAccX.getFieldData("target"), target, index); + varAccX.setInt(varAccX.getFieldData("interval"), interval, index); + varAccX.setInt(varAccX.getFieldData("lastRun"), lastRun, index); + varAccX.setString(varAccX.getFieldData("lastVal"), lastVal, index); + varAccX.setInt(varAccX.getFieldData("waitTime"), waitTime, index); + index++; + } + varAccX.setReady(); + // Leave the page in memory after returning + variablesPage.master = false; + } + + /// \brief Queues a new custom variable to be added + /// The request should contain a variable name, a target and an interval + void addVariable(const JSON::Value &request, JSON::Value &output){ + std::string name; + std::string target; + std::string value; + uint32_t interval; + uint32_t waitTime; + bool isNew = true; + if (request.isArray()){ + // With length 2 is a hardcoded custom variable of [name, value] + if (request.size() == 2){ + name = request[0u].asString(); + value = request[1u].asString(); + target = ""; + interval = 0; + waitTime = interval; + }else if(request.size() == 3){ + name = request[0u].asString(); + target = request[1u].asString(); + interval = request[2u].asInt(); + value = ""; + waitTime = interval; + }else if(request.size() == 4){ + name = request[0u].asString(); + target = request[1u].asString(); + interval = request[2u].asInt(); + value = request[3u].asString(); + waitTime = interval; + }else if(request.size() == 5){ + name = request[0u].asString(); + target = request[1u].asString(); + interval = request[2u].asInt(); + value = request[3u].asString(); + waitTime = request[4u].asInt(); + }else{ + ERROR_MSG("Cannot add custom variable, as the request contained %u variables", request.size()); + return; + } + }else{ + name = request["name"].asString(); + if (request.isMember("target")){ + target = request["target"].asString(); + }else{ + target = ""; + } + if (request.isMember("interval")){ + interval = request["interval"].asInt(); + }else{ + interval = 0; + } + if (request.isMember("value")){ + value = request["value"].asString(); + }else{ + value = ""; + } + if (request.isMember("waitTime")){ + waitTime = request["waitTime"].asInt(); + }else{ + waitTime = interval; + } + } + if (!name.size()){ + WARN_MSG("Unable to retrieve variable name from request"); + return; + } + if ((target.find("'") != std::string::npos) || (target.find('"') != std::string::npos)){ + ERROR_MSG("Cannot add custom variable, as the request contained a ' or \" character (got '%s')", target.c_str()); + return; + } + if (name.size() > 31){ + name = name.substr(0, 31); + WARN_MSG("Maximum name size is 31 characters, truncating name to '%s'", name.c_str()); + } + if (target.size() > 511){ + target = target.substr(0, 511); + WARN_MSG("Maximum target size is 511 characters, truncating target to '%s'", target.c_str()); + } + if (value.size() > 64){ + value = value.substr(0, 63); + WARN_MSG("Maximum value size is 63 characters, truncating value to '%s'", value.c_str()); + } + tthread::lock_guard guard(variableMutex); + // Check if we have an existing variable with the same name to modify + jsonForEach(Controller::Storage["variables"], it){ + if ((*it)[0u].asString() == name){ + INFO_MSG("Modifying existing custom variable '%s'", name.c_str()); + (*it)[1u] = target; + (*it)[2u] = interval; + // Reset lastRun so that the lastValue gets updated during the next iteration + (*it)[3u] = 0; + // If we received a value, overwrite it + if (value.size()){(*it)[4u] = value;} + (*it)[5u] = waitTime; + isNew = false; + } + } + // Else push a new custom variable to the list + if (isNew){ + INFO_MSG("Adding new custom variable '%s'", name.c_str()); + JSON::Value thisVar; + thisVar.append(name); + thisVar.append(target); + thisVar.append(interval); + thisVar.append(0); + thisVar.append(value); + thisVar.append(waitTime); + Controller::Storage["variables"].append(thisVar); + } + // Modify shm + writeToShm(); + // Return variable list + listCustomVariables(output); + } + + /// \brief Fills output with all defined custom variables + void listCustomVariables(JSON::Value &output){ + output.null(); + // First check shm for custom variables + IPC::sharedPage variablePage(SHM_CUSTOM_VARIABLES, 0, false, false); + if (variablePage.mapped){ + Util::RelAccX varAccX(variablePage.mapped, false); + if (varAccX.isReady()){ + for (size_t i = 0; i < varAccX.getEndPos(); i++){ + std::string name = varAccX.getPointer("name", i); + std::string target = varAccX.getPointer("target", i); + uint32_t interval = varAccX.getInt("interval", i); + uint64_t lastRun = varAccX.getInt("lastRun", i); + std::string lastVal = varAccX.getPointer("lastVal", i); + uint32_t waitTime = varAccX.getInt("waitTime", i); + // If there is no target, assume this is a static custom variable + if (target.size()){ + output[name].append(target); + output[name].append(interval); + output[name].append(lastRun); + output[name].append(lastVal); + output[name].append(waitTime); + }else{ + output[name] = lastVal; + } + } + return; + } + } + ERROR_MSG("Unable to list custom variables from shm. Retrying from server config"); + jsonForEach(Controller::Storage["variables"], it){ + std::string name = (*it)[0u].asString(); + std::string target = (*it)[1u].asString(); + uint32_t interval = (*it)[2u].asInt(); + uint64_t lastRun = (*it)[3u].asInt(); + std::string lastVal = (*it)[4u].asString(); + uint32_t waitTime = (*it)[5u].asInt(); + // If there is no target, assume this is a static custom variable + if (target.size()){ + output[name].append(target); + output[name].append(interval); + output[name].append(lastRun); + output[name].append(lastVal); + output[name].append(waitTime); + }else{ + output[name] = lastVal; + } + } + } + + /// \brief Removes the variable name contained in the request from shm and the sever config + void removeVariable(const JSON::Value &request, JSON::Value &output){ + if (request.isString()){ + removeVariableByName(request.asStringRef()); + listCustomVariables(output); + return; + } + if (request.isArray()){ + if (request[0u].size()){ + removeVariableByName(request[0u].asStringRef()); + listCustomVariables(output); + return; + } + }else{ + if (request.isMember("name")){ + removeVariableByName(request["name"].asStringRef()); + listCustomVariables(output); + return; + } + } + WARN_MSG("Received a request to remove a custom variable, but no name was given"); + } + + /// \brief Removes the variable with the given name from shm and the server config + void removeVariableByName(const std::string &name){ + tthread::lock_guard guard(variableMutex); + // Modify config + jsonForEach(Controller::Storage["variables"], it){ + if ((*it)[0u].asString() == name){ + INFO_MSG("Removing variable named `%s`", name.c_str()); + it.remove(); + } + } + // Modify shm + writeToShm(); + } + + /// \brief Runs the target of a specific variable and stores the result + /// \param name name of the variable we are running + /// \param target path or url to get results from + void runVariableTarget(const std::string &name, const std::string &target, const uint64_t &waitTime){ + HIGH_MSG("Updating custom variable '%s' <- '%s'", name.c_str(), target.c_str()); + // Post URL for data + if (target.substr(0, 7) == "http://" || target.substr(0, 8) == "https://"){ + HTTP::Downloader DL; + DL.setHeader("X-Custom-Variable", name); + DL.setHeader("Content-Type", "text/plain"); + HTTP::URL url(target); + if (DL.post(url, NULL, 0, true) && DL.isOk()){ + mutateVariable(name, DL.data()); + return; + } + ERROR_MSG("Custom variable target failed to execute (%s)", DL.getStatusText().c_str()); + return; + // Fork target executable and catch stdout + }else{ + // Rewrite target command + std::string tmpCmd = target; + char exec_cmd[10240]; + strncpy(exec_cmd, tmpCmd.c_str(), 10240); + HIGH_MSG("Executing command: %s", exec_cmd); + uint8_t argCnt = 0; + char *startCh = 0; + char *args[1280]; + for (char *i = exec_cmd; i - exec_cmd < 10240; ++i){ + if (!*i){ + if (startCh){args[argCnt++] = startCh;} + break; + } + if (*i == ' '){ + if (startCh){ + args[argCnt++] = startCh; + startCh = 0; + *i = 0; + } + }else{ + if (!startCh){startCh = i;} + } + } + args[argCnt] = 0; + // Run and get stdout + setenv("MIST_CUSTOM_VARIABLE", name.c_str(), 1); + std::string ret = Util::Procs::getLimitedOutputOf(args, waitTime * 1000, 128); + unsetenv("MIST_CUSTOM_VARIABLE"); + // Save output to custom variable + mutateVariable(name, ret); + return; + } + } + + /// \brief Modifies the lastVal of the given custom variable in shm and the server config + void mutateVariable(const std::string name, std::string &newVal){ + uint64_t lastRun = Util::epoch(); + Util::stringTrim(newVal); + if (newVal.size() > 127){ + WARN_MSG("Truncating response of custom variable %s to 127 bytes (received %lu bytes)", name.c_str(), newVal.size()); + newVal = newVal.substr(0, 127); + } + // Modify config + jsonForEach(Controller::Storage["variables"], it){ + if ((*it)[0u].asString() == name){ + (*it)[3u] = lastRun; + (*it)[4u] = newVal; + mutateShm = true; + } + } + } +}// namespace Controller diff --git a/src/controller/controller_variables.h b/src/controller/controller_variables.h new file mode 100644 index 00000000..bf1f424e --- /dev/null +++ b/src/controller/controller_variables.h @@ -0,0 +1,18 @@ +#include +#include +#include +#include + +namespace Controller{ + // API calls to manage custom variables + void addVariable(const JSON::Value &request, JSON::Value &output); + void listCustomVariables(JSON::Value &output); + void removeVariable(const JSON::Value &request, JSON::Value &output); + + // internal use only + void variableCheckLoop(void *np); + void writeToShm(); + void removeVariableByName(const std::string &name); + void runVariableTarget(const std::string &name, const std::string &target, const uint64_t &maxWait); + void mutateVariable(const std::string name, std::string &newVal); +}// namespace Controller diff --git a/src/controller/meson.build b/src/controller/meson.build index 54ac1f5e..0b91f005 100644 --- a/src/controller/meson.build +++ b/src/controller/meson.build @@ -12,7 +12,8 @@ executables += { 'controller_capabilities.cpp', 'controller_uplink.cpp', 'controller_api.cpp', - 'controller_push.cpp'), + 'controller_push.cpp', + 'controller_variables.cpp'), header_tgts, server_html], 'defines': [],