diff --git a/lib/procs.cpp b/lib/procs.cpp index 36e0332e..d2a37a4f 100644 --- a/lib/procs.cpp +++ b/lib/procs.cpp @@ -274,10 +274,10 @@ std::string Util::Procs::getOutputOf(char *const *argv, uint64_t maxWait){ /// \param maxWait amount of milliseconds to wait before shutting down the spawned process /// \param maxValBytes amount of Bytes allowed in the output before shutting down the spawned process std::string Util::Procs::getLimitedOutputOf(char *const *argv, uint64_t maxWait, uint32_t maxValBytes){ - int fin = 0, fout = -1, ferr = 0; + int fout = -1; uint64_t waitedFor = 0; uint8_t tries = 0; - pid_t myProc = StartPiped(argv, &fin, &fout, &ferr); + pid_t myProc = StartPiped(argv, NULL, &fout, NULL); Socket::Connection O(-1, fout); O.setBlocking(false); Util::ResizeablePointer ret; @@ -311,9 +311,9 @@ std::string Util::Procs::getLimitedOutputOf(char *const *argv, uint64_t maxWait, break; } } + O.close(); // Stop the process if it is still running if (childRunning(myProc)){ - close(fout); Stop(myProc); waitedFor = 0; } @@ -469,11 +469,11 @@ pid_t Util::Procs::StartPiped(const char *const *argv, int *fdin, int *fdout, in uSock.SetDestination(UDP_API_HOST, UDP_API_PORT); uSock.SendNow(j.toString()); std::cout << getenv("MIST_TRIG_DEF"); - exit(42); + _exit(42); } /*LTS-END*/ ERROR_MSG("execvp failed for process %s, reason: %s", argv[0], strerror(errno)); - exit(42); + _exit(42); }else if (pid == -1){ ERROR_MSG("fork failed for process %s, reason: %s", argv[0], strerror(errno)); if (fdin && *fdin == -1){ diff --git a/src/controller/controller.cpp b/src/controller/controller.cpp index 695b2422..f8d69a4e 100644 --- a/src/controller/controller.cpp +++ b/src/controller/controller.cpp @@ -291,7 +291,11 @@ int main_loop(int argc, char **argv){ } } } - + // Set default delay before retry + if (!Controller::Storage.isMember("push_settings")){ + Controller::Storage["push_settings"]["wait"] = 3; + Controller::Storage["push_settings"]["maxspeed"] = 0; + } if (Controller::conf.getOption("debug", true).size() > 1){ Controller::Storage["config"]["debug"] = Controller::conf.getInteger("debug"); } diff --git a/src/controller/controller_api.cpp b/src/controller/controller_api.cpp index cf8808d3..e0b1d000 100644 --- a/src/controller/controller_api.cpp +++ b/src/controller/controller_api.cpp @@ -1181,13 +1181,13 @@ void Controller::handleAPICommands(JSON::Value &Request, JSON::Value &Response){ } } - if (Request.isMember("push_auto_add")){Controller::addPush(Request["push_auto_add"]);} + if (Request.isMember("push_auto_add")){Controller::addPush(Request["push_auto_add"], Response["push_list"]);} if (Request.isMember("push_auto_remove")){ if (Request["push_auto_remove"].isArray()){ - jsonForEach(Request["push_auto_remove"], it){Controller::removePush(*it);} + jsonForEach(Request["push_auto_remove"], it){Controller::removePush(*it, Response["push_list"]);} }else{ - Controller::removePush(Request["push_auto_remove"]); + Controller::removePush(Request["push_auto_remove"], Response["push_list"]); } } diff --git a/src/controller/controller_push.cpp b/src/controller/controller_push.cpp index 21799c3b..cdd2c580 100644 --- a/src/controller/controller_push.cpp +++ b/src/controller/controller_push.cpp @@ -169,6 +169,123 @@ namespace Controller{ } } + /// \brief Evaluates + /// Will apply numerical comparison if passed a numerical matchedValue + // and apply lexical comparison if passed a nonnumerical matchedValue + /// \param operator can be: + /// 0: boolean true + /// 1: boolean false + /// 2: == + /// 3: != + /// 10: > (numerical comparison) + /// 11: >= (numerical comparison) + /// 12: < (numerical comparison) + /// 13: <= (numerical comparison) + /// 20 > (lexical comparison) + /// 21: >= (lexical comparison) + /// 22: < (lexical comparison) + /// 23: <= (lexical comparison) + bool checkCondition(const JSON::Value ¤tValue, const uint8_t &comparisonOperator, const JSON::Value &matchedValue){ + std::string currentValueAsString = currentValue.asStringRef(); + if (comparisonOperator == 0){ + return Util::stringToBool(currentValueAsString); + }else if (comparisonOperator == 1){ + return !Util::stringToBool(currentValueAsString); + }else if (comparisonOperator == 2){ + return currentValue == matchedValue; + } else if (comparisonOperator == 3){ + return currentValue != matchedValue; + }else if (comparisonOperator >= 10 && comparisonOperator < 20){ + return checkCondition(currentValue.asInt(), comparisonOperator, matchedValue.asInt()); + }else{ + return checkCondition(currentValueAsString, comparisonOperator, matchedValue.asStringRef()); + } + } + bool checkCondition(const int64_t ¤tValue, const uint8_t &comparisonOperator, const int64_t &matchedValue){ + switch (comparisonOperator){ + case 10: + if (currentValue > matchedValue){return true;} + break; + case 11: + if (currentValue >= matchedValue){return true;} + break; + case 12: + if (currentValue < matchedValue){return true;} + break; + case 13: + if (currentValue <= matchedValue){return true;} + break; + default: + ERROR_MSG("Passed invalid comparison operator of type %u", comparisonOperator); + break; + } + return false; + } + bool checkCondition(const std::string ¤tValue, const uint8_t &comparisonOperator,const std::string &matchedValue){ + int lexCmpResult = strcmp(currentValue.c_str(), matchedValue.c_str()); + switch (comparisonOperator){ + case 20: + if (lexCmpResult > 0){return true;} + break; + case 21: + if (lexCmpResult >= 0){return true;} + break; + case 22: + if (lexCmpResult < 0){return true;} + break; + case 23: + if (lexCmpResult <= 0){return true;} + break; + default: + ERROR_MSG("Passed invalid comparison operator of type %u", comparisonOperator); + break; + } + return false; + } + + /// \brief Returns true if a push should be active, false if it shouldn't be active + bool checkPush(JSON::Value &thisPush){ + uint64_t startTime = thisPush[2u].asInt(); + std::string startVariableName = thisPush[4u].asString(); + std::string endVariableName = thisPush[7u].asString(); + // Get sanitized stream name + std::string stream = thisPush[0u].asString(); + Util::sanitizeName(stream); + // Skip if we have a start time which is in the future + if (startTime && *(stream.rbegin()) != '+' && startTime > Util::epoch()){return false;} + // Check if it supposed to stop + if (endVariableName.size()){ + // Get current value of configured variable + std::string currentValue = "$" + endVariableName; + if (!Util::streamVariables(currentValue, stream)){ + WARN_MSG("Could not find a variable with name `%s`", endVariableName.c_str()); + return false; + } + // Get matched value and apply variable substitution + std::string replacedMatchedValue = thisPush[9u].asString(); + if (replacedMatchedValue.size()){Util::streamVariables(replacedMatchedValue, stream);} + JSON::Value matchedValue(replacedMatchedValue); + // Finally indicate that the push should not be active if the end condition resolves to true + if(checkCondition(JSON::Value(currentValue), thisPush[8u].asInt(), matchedValue)){return false;} + } + // Check if it is allowed to start + if (startVariableName.size()){ + // Get current value of configured variable + std::string currentValue = "$" + startVariableName; + if (!Util::streamVariables(currentValue, stream)){ + WARN_MSG("Could not find a variable with name `%s`", startVariableName.c_str()); + return false; + } + // Get matched value and apply variable substitution + std::string replacedMatchedValue = thisPush[6u].asString(); + if (replacedMatchedValue.size()){Util::streamVariables(replacedMatchedValue, stream);} + JSON::Value matchedValue(replacedMatchedValue); + // Finally indicate that the push should not be active if the end condition resolves to true + return checkCondition(JSON::Value(currentValue), thisPush[5u].asInt(), matchedValue); + } + return true; + } + /// Loops, checking every second if any pushes need restarting. void pushCheckLoop(void *np){ { @@ -188,53 +305,53 @@ namespace Controller{ long long waittime = Controller::Storage["push_settings"]["wait"].asInt(); long long curCount = 0; jsonForEach(Controller::Storage["autopushes"], it){ - if (it->size() > 3 && (*it)[3u].asInt() < Util::epoch()){ - INFO_MSG("Deleting autopush from %s to %s because end time passed", - (*it)[0u].asStringRef().c_str(), (*it)[1u].asStringRef().c_str()); - stopActivePushes((*it)[0u], (*it)[1u]); + std::string stream = (*it)[0u].asStringRef(); + std::string target = (*it)[1u].asStringRef(); + uint64_t startTime = (*it)[2u].asInt(); + uint64_t endTime = (*it)[3u].asInt(); + // Stop any auto pushes which have an elapsed end time + if (endTime && endTime < Util::epoch()){ + INFO_MSG("Deleting autopush from %s to %s because end time passed", stream.c_str(), target.c_str()); + stopActivePushes(stream, target); removePush(*it); break; } - if (it->size() > 2 && *((*it)[0u].asStringRef().rbegin()) != '+'){ - if ((*it)[2u].asInt() <= Util::epoch()){ - std::string streamname = (*it)[0u]; - std::string target = (*it)[1u]; - if (!isPushActive(streamname, target)){ - if (waitingPushes[streamname][target]++ >= waittime && (curCount < maxspeed || !maxspeed)){ - waitingPushes[streamname].erase(target); - if (!waitingPushes[streamname].size()){waitingPushes.erase(streamname);} - startPush(streamname, target); - curCount++; - } - } + // Stop any active push if conditions are not met + if (!checkPush(*it)){ + if (isPushActive(stream, target)){ + MEDIUM_MSG("Conditions of push `%s->%s` evaluate to false. Stopping push...", stream.c_str(), target.c_str()); + stopActivePushes(stream, target); } continue; } - if (waittime || it->size() > 2){ - const std::string &pStr = (*it)[0u].asStringRef(); - std::set activeStreams = Controller::getActiveStreams(pStr); + // We can continue if it is already running + if (isPushActive(stream, target)){continue;} + // Start the push if conditions are met + if (waittime || startTime){ + std::set activeStreams = Controller::getActiveStreams(stream); if (activeStreams.size()){ for (std::set::iterator jt = activeStreams.begin(); - jt != activeStreams.end(); ++jt){ + jt != activeStreams.end(); ++jt){ std::string streamname = *jt; - std::string target = (*it)[1u]; - if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){ + if (stream == streamname || (*stream.rbegin() == '+' && streamname.substr(0, stream.size()) == stream)){ if (!isPushActive(streamname, target)){ if (waitingPushes[streamname][target]++ >= waittime && (curCount < maxspeed || !maxspeed)){ waitingPushes[streamname].erase(target); if (!waitingPushes[streamname].size()){waitingPushes.erase(streamname);} + MEDIUM_MSG("Conditions of push `%s->%s` evaluate to true. Starting push...", stream.c_str(), target.c_str()); startPush(streamname, target); curCount++; + // If no end time is given but there is a start time, remove the push after starting it + if (startTime && !endTime){ + removePush(*it); + break; + } } } } } } } - if (it->size() == 3){ - removePush(*it); - break; - } } //Check if any pushes have ended, clean them up std::set toWipe; @@ -283,73 +400,172 @@ namespace Controller{ /// Adds a push to the list of auto-pushes. /// Auto-starts currently active matches immediately. - void addPush(JSON::Value &request){ + void addPush(JSON::Value &request, JSON::Value &response){ JSON::Value newPush; if (request.isArray()){ newPush = request; }else{ + if (!request.isMember("stream") || !request["stream"].isString()){ + ERROR_MSG("Automatic push not added: it does not contain a valid stream name"); + return; + } newPush.append(request["stream"]); + if (!request.isMember("target") || !request["target"].isString()){ + ERROR_MSG("Automatic push not added: it does not contain a valid target"); + return; + } newPush.append(request["target"]); - bool startTime = false; if (request.isMember("scheduletime") && request["scheduletime"].isInt()){ newPush.append(request["scheduletime"]); - startTime = true; + }else{ + newPush.append(0u); } if (request.isMember("completetime") && request["completetime"].isInt()){ - if (!startTime){newPush.append(0u);} newPush.append(request["completetime"]); + }else{ + newPush.append(0u); + } + if (request.isMember("startVariableName")){ + newPush.append(request["startVariableName"]); + }else{ + newPush.append(""); + } + if (request.isMember("startVariableOperator")){ + newPush.append(request["startVariableOperator"]); + }else{ + newPush.append(0); + } + if (request.isMember("startVariableValue")){ + newPush.append(request["startVariableValue"]); + }else{ + newPush.append(""); + } + if (request.isMember("endVariableName")){ + newPush.append(request["endVariableName"]); + }else{ + newPush.append(""); + } + if (request.isMember("endVariableOperator")){ + newPush.append(request["endVariableOperator"]); + }else{ + newPush.append(0); + } + if (request.isMember("endVariableValue")){ + newPush.append(request["endVariableValue"]); + }else{ + newPush.append(""); } } long long epo = Util::epoch(); - if (newPush.size() > 3 && newPush[3u].asInt() <= epo){ - WARN_MSG("Automatic push not added: removal time is in the past! (%" PRId64 " <= %" PRIu64 ")", - newPush[3u].asInt(), Util::epoch()); + if (request.size() < 2){ + ERROR_MSG("Automatic push not added: should contain at least a stream name and target"); return; } - bool edited = false; - jsonForEach(Controller::Storage["autopushes"], it){ - if ((*it)[0u] == newPush[0u] && (*it)[1u] == newPush[1u]){ - (*it) = newPush; - edited = true; + // Init optional fields if they were omitted from the addPush request + // We only have a stream and target, so fill in the scheduletime and completetime + while(newPush.size() < 4){newPush.append(0u);} + // The request seems to be using variables and likely skipped the scheduletime and completetime set to 0 + if (newPush[2].isString()){ + JSON::Value modPush; + modPush.append(newPush[0u]); + modPush.append(newPush[1u]); + modPush.append(0u); + modPush.append(0u); + for (uint8_t idx = 2; idx < newPush.size(); idx++){ + modPush.append(newPush[idx]); } + newPush = modPush; } - if (!edited && (newPush.size() != 3 || newPush[2u].asInt() > epo)){ - Controller::Storage["autopushes"].append(newPush); - } - if (newPush.size() < 3 || newPush[2u].asInt() <= epo){ - if (newPush.size() > 2 && *(newPush[0u].asStringRef().rbegin()) != '+'){ - std::string streamname = newPush[0u].asStringRef(); - std::string target = newPush[1u].asStringRef(); - startPush(streamname, target); + // Variable conditions are used. We should have either 7 (only start variable condition) or 10 values (start + stop variable conditions) + if (newPush.size() > 4){ + if (newPush.size() == 7){ + newPush.append(""); + newPush.append(0u); + newPush.append(""); + } else if (newPush.size() != 10){ + ERROR_MSG("Automatic push not added: passed incomplete data for the start or stop variable"); return; } - const std::string &pStr = newPush[0u].asStringRef(); - std::set activeStreams = Controller::getActiveStreams(pStr); - if (activeStreams.size()){ - std::string target = newPush[1u].asStringRef(); - for (std::set::iterator it = activeStreams.begin(); it != activeStreams.end(); ++it){ - std::string streamname = *it; - if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){ - startPush(streamname, target); - } + }else{ + // Init the start and stop variable conditions + newPush.append(""); + newPush.append(0u); + newPush.append(""); + newPush.append(""); + newPush.append(0u); + newPush.append(""); + } + // Make sure all start variable values have been initialised + if (newPush.size() == 7 && (!newPush[5u].isString() || !newPush[6u].isInt() || !newPush[7u].isString())); + // Make sure all stop variable values have been initialised + if (newPush.size() == 10 && (!newPush[8u].isString() || !newPush[9u].isInt() || !newPush[10u].isString())); + // Final sanity checks on input + std::string stream = newPush[0u].asStringRef(); + std::string target = newPush[1u].asStringRef(); + uint64_t startTime = newPush[2u].asInt(); + uint64_t endTime = newPush[3u].asInt(); + if (endTime && endTime <= epo){ + ERROR_MSG("Automatic push not added: removal time is in the past! (%" PRIu64 " <= %lld)", endTime, epo); + return; + } + + // If we have an existing push: edit it + bool shouldSave = true; + jsonForEach(Controller::Storage["autopushes"], it){ + if ((*it)[0u] == stream && (*it)[1u] == target){ + (*it) = newPush; + shouldSave = false; + } + } + // If a newly added push only has a defined start time, immediately start it and never save it + if (startTime && !endTime){ + INFO_MSG("Immediately starting push %s->%s as the added push only has a defined start time" + , stream.c_str(), target.c_str()); + startPush(stream, target); + // Return push list + response["push_auto_list"] = Controller::Storage["autopushes"]; + return; + } + // Save as a new variable if we have not edited an existing variable + if (shouldSave){ + Controller::Storage["autopushes"].append(newPush); + } + // and start it immediately if conditions are met + if (!checkPush(newPush)){return;} + std::set activeStreams = Controller::getActiveStreams(stream); + if (activeStreams.size()){ + for (std::set::iterator jt = activeStreams.begin(); + jt != activeStreams.end(); ++jt){ + std::string streamname = *jt; + if (stream == streamname || (*stream.rbegin() == '+' && streamname.substr(0, stream.size()) == stream)){ + startPush(streamname, target); } } } + // Return push list + response["push_auto_list"] = Controller::Storage["autopushes"]; } - /// Removes a push from the list of auto-pushes. + /// Removes a push from the list of auto-pushes and returns the new list of pushes /// Does not stop currently active matching pushes. - void removePush(const JSON::Value &request){ + void removePush(const JSON::Value &request, JSON::Value &response){ + removePush(request); + // Return push list + response["push_auto_list"] = Controller::Storage["autopushes"]; + } + + /// Removes a push from the list of auto-pushes + void removePush(const JSON::Value &pushInfo){ JSON::Value delPush; - if (request.isString()){ - removeAllPush(request.asStringRef()); + if (pushInfo.isString()){ + removeAllPush(pushInfo.asStringRef()); return; } - if (request.isArray()){ - delPush = request; + if (pushInfo.isArray()){ + delPush = pushInfo; }else{ - delPush.append(request["stream"]); - delPush.append(request["target"]); + delPush.append(pushInfo["stream"]); + delPush.append(pushInfo["target"]); } JSON::Value newautopushes; jsonForEach(Controller::Storage["autopushes"], it){ @@ -358,8 +574,7 @@ namespace Controller{ Controller::Storage["autopushes"] = newautopushes; } - /// Removes a push from the list of auto-pushes. - /// Does not stop currently active matching pushes. + /// Removes all auto pushes of a given streamname void removeAllPush(const std::string &streamname){ JSON::Value newautopushes; jsonForEach(Controller::Storage["autopushes"], it){ @@ -371,15 +586,15 @@ namespace Controller{ /// Starts all configured auto pushes for the given stream. void doAutoPush(std::string &streamname){ jsonForEach(Controller::Storage["autopushes"], it){ - if (it->size() > 2 && (*it)[2u].asInt() < Util::epoch()){continue;} + if ((*it)[2u].asInt() && (*it)[2u].asInt() < Util::epoch()){continue;} const std::string &pStr = (*it)[0u].asStringRef(); if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){ std::string stream = streamname; Util::sanitizeName(stream); + // Check variable condition if it exists + if((*it)[4u].asStringRef().size() && !checkPush(*it)){continue;} std::string target = (*it)[1u]; - if (!isPushActive(stream, target)){ - startPush(stream, target); - } + startPush(stream, target); } } } diff --git a/src/controller/controller_push.h b/src/controller/controller_push.h index 070e3e64..8c3f64e2 100644 --- a/src/controller/controller_push.h +++ b/src/controller/controller_push.h @@ -13,15 +13,19 @@ namespace Controller{ bool isPushActive(uint64_t id); // Functions for automated pushes, add/remove - void addPush(JSON::Value &request); - void removePush(const JSON::Value &request); + void addPush(JSON::Value &request, JSON::Value &response); + void removePush(const JSON::Value &request, JSON::Value &response); void removeAllPush(const std::string &streamname); // internal use only + void removePush(const JSON::Value &pushInfo); void doAutoPush(std::string &streamname); void pushCheckLoop(void *np); bool isPushActive(const std::string &streamname, const std::string &target); void stopActivePushes(const std::string &streamname, const std::string &target); + bool checkCondition(const JSON::Value ¤tValue, const uint8_t &comparisonOperator, const JSON::Value &matchedValue); + bool checkCondition(const std::string ¤tValue, const uint8_t &comparisonOperator, const std::string &matchedValue); + bool checkCondition(const int64_t ¤tValue, const uint8_t &comparisonOperator, const int64_t &matchedValue); // for storing/retrieving settings void pushSettings(const JSON::Value &request, JSON::Value &response); diff --git a/src/output/output.cpp b/src/output/output.cpp index 30099c07..83b217c0 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -1005,7 +1005,8 @@ namespace Mist{ } } if (targetParams.count("recstart") && atoll(targetParams["recstart"].c_str()) != 0){ - uint64_t startRec = atoll(targetParams["recstart"].c_str()); + int64_t startRec = atoll(targetParams["recstart"].c_str()); + if (startRec < 0){startRec = 0;} if (startRec > endTime()){ if (!M.getLive()){ onFail("Recording start past end of non-live source", true); @@ -1014,7 +1015,7 @@ namespace Mist{ } if (startRec < startTime()){ startRec = startTime(); - WARN_MSG("Record begin at %llu ms not available, starting at %" PRIu64 + WARN_MSG("Record begin at %lld ms not available, starting at %" PRIu64 " ms instead", atoll(targetParams["recstart"].c_str()), startRec); targetParams["recstart"] = JSON::Value(startRec).asString(); }