Scheduled Pushes
Change-Id: I3d3a75f4a45d96e6a7849b14634caedd0899ac69
This commit is contained in:
parent
c66d236e58
commit
ea25d4b74b
6 changed files with 306 additions and 82 deletions
|
@ -274,10 +274,10 @@ std::string Util::Procs::getOutputOf(char *const *argv, uint64_t maxWait){
|
|||
/// \param maxWait amount of milliseconds to wait before shutting down the spawned process
|
||||
/// \param maxValBytes amount of Bytes allowed in the output before shutting down the spawned process
|
||||
std::string Util::Procs::getLimitedOutputOf(char *const *argv, uint64_t maxWait, uint32_t maxValBytes){
|
||||
int fin = 0, fout = -1, ferr = 0;
|
||||
int fout = -1;
|
||||
uint64_t waitedFor = 0;
|
||||
uint8_t tries = 0;
|
||||
pid_t myProc = StartPiped(argv, &fin, &fout, &ferr);
|
||||
pid_t myProc = StartPiped(argv, NULL, &fout, NULL);
|
||||
Socket::Connection O(-1, fout);
|
||||
O.setBlocking(false);
|
||||
Util::ResizeablePointer ret;
|
||||
|
@ -311,9 +311,9 @@ std::string Util::Procs::getLimitedOutputOf(char *const *argv, uint64_t maxWait,
|
|||
break;
|
||||
}
|
||||
}
|
||||
O.close();
|
||||
// Stop the process if it is still running
|
||||
if (childRunning(myProc)){
|
||||
close(fout);
|
||||
Stop(myProc);
|
||||
waitedFor = 0;
|
||||
}
|
||||
|
@ -469,11 +469,11 @@ pid_t Util::Procs::StartPiped(const char *const *argv, int *fdin, int *fdout, in
|
|||
uSock.SetDestination(UDP_API_HOST, UDP_API_PORT);
|
||||
uSock.SendNow(j.toString());
|
||||
std::cout << getenv("MIST_TRIG_DEF");
|
||||
exit(42);
|
||||
_exit(42);
|
||||
}
|
||||
/*LTS-END*/
|
||||
ERROR_MSG("execvp failed for process %s, reason: %s", argv[0], strerror(errno));
|
||||
exit(42);
|
||||
_exit(42);
|
||||
}else if (pid == -1){
|
||||
ERROR_MSG("fork failed for process %s, reason: %s", argv[0], strerror(errno));
|
||||
if (fdin && *fdin == -1){
|
||||
|
|
|
@ -291,7 +291,11 @@ int main_loop(int argc, char **argv){
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Set default delay before retry
|
||||
if (!Controller::Storage.isMember("push_settings")){
|
||||
Controller::Storage["push_settings"]["wait"] = 3;
|
||||
Controller::Storage["push_settings"]["maxspeed"] = 0;
|
||||
}
|
||||
if (Controller::conf.getOption("debug", true).size() > 1){
|
||||
Controller::Storage["config"]["debug"] = Controller::conf.getInteger("debug");
|
||||
}
|
||||
|
|
|
@ -1181,13 +1181,13 @@ void Controller::handleAPICommands(JSON::Value &Request, JSON::Value &Response){
|
|||
}
|
||||
}
|
||||
|
||||
if (Request.isMember("push_auto_add")){Controller::addPush(Request["push_auto_add"]);}
|
||||
if (Request.isMember("push_auto_add")){Controller::addPush(Request["push_auto_add"], Response["push_list"]);}
|
||||
|
||||
if (Request.isMember("push_auto_remove")){
|
||||
if (Request["push_auto_remove"].isArray()){
|
||||
jsonForEach(Request["push_auto_remove"], it){Controller::removePush(*it);}
|
||||
jsonForEach(Request["push_auto_remove"], it){Controller::removePush(*it, Response["push_list"]);}
|
||||
}else{
|
||||
Controller::removePush(Request["push_auto_remove"]);
|
||||
Controller::removePush(Request["push_auto_remove"], Response["push_list"]);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -169,6 +169,123 @@ namespace Controller{
|
|||
}
|
||||
}
|
||||
|
||||
/// \brief Evaluates <value of currentVariable> <operator> <matchedValue>
|
||||
/// Will apply numerical comparison if passed a numerical matchedValue
|
||||
// and apply lexical comparison if passed a nonnumerical matchedValue
|
||||
/// \param operator can be:
|
||||
/// 0: boolean true
|
||||
/// 1: boolean false
|
||||
/// 2: ==
|
||||
/// 3: !=
|
||||
/// 10: > (numerical comparison)
|
||||
/// 11: >= (numerical comparison)
|
||||
/// 12: < (numerical comparison)
|
||||
/// 13: <= (numerical comparison)
|
||||
/// 20 > (lexical comparison)
|
||||
/// 21: >= (lexical comparison)
|
||||
/// 22: < (lexical comparison)
|
||||
/// 23: <= (lexical comparison)
|
||||
bool checkCondition(const JSON::Value ¤tValue, const uint8_t &comparisonOperator, const JSON::Value &matchedValue){
|
||||
std::string currentValueAsString = currentValue.asStringRef();
|
||||
if (comparisonOperator == 0){
|
||||
return Util::stringToBool(currentValueAsString);
|
||||
}else if (comparisonOperator == 1){
|
||||
return !Util::stringToBool(currentValueAsString);
|
||||
}else if (comparisonOperator == 2){
|
||||
return currentValue == matchedValue;
|
||||
} else if (comparisonOperator == 3){
|
||||
return currentValue != matchedValue;
|
||||
}else if (comparisonOperator >= 10 && comparisonOperator < 20){
|
||||
return checkCondition(currentValue.asInt(), comparisonOperator, matchedValue.asInt());
|
||||
}else{
|
||||
return checkCondition(currentValueAsString, comparisonOperator, matchedValue.asStringRef());
|
||||
}
|
||||
}
|
||||
bool checkCondition(const int64_t ¤tValue, const uint8_t &comparisonOperator, const int64_t &matchedValue){
|
||||
switch (comparisonOperator){
|
||||
case 10:
|
||||
if (currentValue > matchedValue){return true;}
|
||||
break;
|
||||
case 11:
|
||||
if (currentValue >= matchedValue){return true;}
|
||||
break;
|
||||
case 12:
|
||||
if (currentValue < matchedValue){return true;}
|
||||
break;
|
||||
case 13:
|
||||
if (currentValue <= matchedValue){return true;}
|
||||
break;
|
||||
default:
|
||||
ERROR_MSG("Passed invalid comparison operator of type %u", comparisonOperator);
|
||||
break;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
bool checkCondition(const std::string ¤tValue, const uint8_t &comparisonOperator,const std::string &matchedValue){
|
||||
int lexCmpResult = strcmp(currentValue.c_str(), matchedValue.c_str());
|
||||
switch (comparisonOperator){
|
||||
case 20:
|
||||
if (lexCmpResult > 0){return true;}
|
||||
break;
|
||||
case 21:
|
||||
if (lexCmpResult >= 0){return true;}
|
||||
break;
|
||||
case 22:
|
||||
if (lexCmpResult < 0){return true;}
|
||||
break;
|
||||
case 23:
|
||||
if (lexCmpResult <= 0){return true;}
|
||||
break;
|
||||
default:
|
||||
ERROR_MSG("Passed invalid comparison operator of type %u", comparisonOperator);
|
||||
break;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/// \brief Returns true if a push should be active, false if it shouldn't be active
|
||||
bool checkPush(JSON::Value &thisPush){
|
||||
uint64_t startTime = thisPush[2u].asInt();
|
||||
std::string startVariableName = thisPush[4u].asString();
|
||||
std::string endVariableName = thisPush[7u].asString();
|
||||
// Get sanitized stream name
|
||||
std::string stream = thisPush[0u].asString();
|
||||
Util::sanitizeName(stream);
|
||||
// Skip if we have a start time which is in the future
|
||||
if (startTime && *(stream.rbegin()) != '+' && startTime > Util::epoch()){return false;}
|
||||
// Check if it supposed to stop
|
||||
if (endVariableName.size()){
|
||||
// Get current value of configured variable
|
||||
std::string currentValue = "$" + endVariableName;
|
||||
if (!Util::streamVariables(currentValue, stream)){
|
||||
WARN_MSG("Could not find a variable with name `%s`", endVariableName.c_str());
|
||||
return false;
|
||||
}
|
||||
// Get matched value and apply variable substitution
|
||||
std::string replacedMatchedValue = thisPush[9u].asString();
|
||||
if (replacedMatchedValue.size()){Util::streamVariables(replacedMatchedValue, stream);}
|
||||
JSON::Value matchedValue(replacedMatchedValue);
|
||||
// Finally indicate that the push should not be active if the end condition resolves to true
|
||||
if(checkCondition(JSON::Value(currentValue), thisPush[8u].asInt(), matchedValue)){return false;}
|
||||
}
|
||||
// Check if it is allowed to start
|
||||
if (startVariableName.size()){
|
||||
// Get current value of configured variable
|
||||
std::string currentValue = "$" + startVariableName;
|
||||
if (!Util::streamVariables(currentValue, stream)){
|
||||
WARN_MSG("Could not find a variable with name `%s`", startVariableName.c_str());
|
||||
return false;
|
||||
}
|
||||
// Get matched value and apply variable substitution
|
||||
std::string replacedMatchedValue = thisPush[6u].asString();
|
||||
if (replacedMatchedValue.size()){Util::streamVariables(replacedMatchedValue, stream);}
|
||||
JSON::Value matchedValue(replacedMatchedValue);
|
||||
// Finally indicate that the push should not be active if the end condition resolves to true
|
||||
return checkCondition(JSON::Value(currentValue), thisPush[5u].asInt(), matchedValue);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/// Loops, checking every second if any pushes need restarting.
|
||||
void pushCheckLoop(void *np){
|
||||
{
|
||||
|
@ -188,53 +305,53 @@ namespace Controller{
|
|||
long long waittime = Controller::Storage["push_settings"]["wait"].asInt();
|
||||
long long curCount = 0;
|
||||
jsonForEach(Controller::Storage["autopushes"], it){
|
||||
if (it->size() > 3 && (*it)[3u].asInt() < Util::epoch()){
|
||||
INFO_MSG("Deleting autopush from %s to %s because end time passed",
|
||||
(*it)[0u].asStringRef().c_str(), (*it)[1u].asStringRef().c_str());
|
||||
stopActivePushes((*it)[0u], (*it)[1u]);
|
||||
std::string stream = (*it)[0u].asStringRef();
|
||||
std::string target = (*it)[1u].asStringRef();
|
||||
uint64_t startTime = (*it)[2u].asInt();
|
||||
uint64_t endTime = (*it)[3u].asInt();
|
||||
// Stop any auto pushes which have an elapsed end time
|
||||
if (endTime && endTime < Util::epoch()){
|
||||
INFO_MSG("Deleting autopush from %s to %s because end time passed", stream.c_str(), target.c_str());
|
||||
stopActivePushes(stream, target);
|
||||
removePush(*it);
|
||||
break;
|
||||
}
|
||||
if (it->size() > 2 && *((*it)[0u].asStringRef().rbegin()) != '+'){
|
||||
if ((*it)[2u].asInt() <= Util::epoch()){
|
||||
std::string streamname = (*it)[0u];
|
||||
std::string target = (*it)[1u];
|
||||
if (!isPushActive(streamname, target)){
|
||||
if (waitingPushes[streamname][target]++ >= waittime && (curCount < maxspeed || !maxspeed)){
|
||||
waitingPushes[streamname].erase(target);
|
||||
if (!waitingPushes[streamname].size()){waitingPushes.erase(streamname);}
|
||||
startPush(streamname, target);
|
||||
curCount++;
|
||||
}
|
||||
}
|
||||
// Stop any active push if conditions are not met
|
||||
if (!checkPush(*it)){
|
||||
if (isPushActive(stream, target)){
|
||||
MEDIUM_MSG("Conditions of push `%s->%s` evaluate to false. Stopping push...", stream.c_str(), target.c_str());
|
||||
stopActivePushes(stream, target);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if (waittime || it->size() > 2){
|
||||
const std::string &pStr = (*it)[0u].asStringRef();
|
||||
std::set<std::string> activeStreams = Controller::getActiveStreams(pStr);
|
||||
// We can continue if it is already running
|
||||
if (isPushActive(stream, target)){continue;}
|
||||
// Start the push if conditions are met
|
||||
if (waittime || startTime){
|
||||
std::set<std::string> activeStreams = Controller::getActiveStreams(stream);
|
||||
if (activeStreams.size()){
|
||||
for (std::set<std::string>::iterator jt = activeStreams.begin();
|
||||
jt != activeStreams.end(); ++jt){
|
||||
jt != activeStreams.end(); ++jt){
|
||||
std::string streamname = *jt;
|
||||
std::string target = (*it)[1u];
|
||||
if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){
|
||||
if (stream == streamname || (*stream.rbegin() == '+' && streamname.substr(0, stream.size()) == stream)){
|
||||
if (!isPushActive(streamname, target)){
|
||||
if (waitingPushes[streamname][target]++ >= waittime && (curCount < maxspeed || !maxspeed)){
|
||||
waitingPushes[streamname].erase(target);
|
||||
if (!waitingPushes[streamname].size()){waitingPushes.erase(streamname);}
|
||||
MEDIUM_MSG("Conditions of push `%s->%s` evaluate to true. Starting push...", stream.c_str(), target.c_str());
|
||||
startPush(streamname, target);
|
||||
curCount++;
|
||||
// If no end time is given but there is a start time, remove the push after starting it
|
||||
if (startTime && !endTime){
|
||||
removePush(*it);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (it->size() == 3){
|
||||
removePush(*it);
|
||||
break;
|
||||
}
|
||||
}
|
||||
//Check if any pushes have ended, clean them up
|
||||
std::set<pid_t> toWipe;
|
||||
|
@ -283,73 +400,172 @@ namespace Controller{
|
|||
|
||||
/// Adds a push to the list of auto-pushes.
|
||||
/// Auto-starts currently active matches immediately.
|
||||
void addPush(JSON::Value &request){
|
||||
void addPush(JSON::Value &request, JSON::Value &response){
|
||||
JSON::Value newPush;
|
||||
if (request.isArray()){
|
||||
newPush = request;
|
||||
}else{
|
||||
if (!request.isMember("stream") || !request["stream"].isString()){
|
||||
ERROR_MSG("Automatic push not added: it does not contain a valid stream name");
|
||||
return;
|
||||
}
|
||||
newPush.append(request["stream"]);
|
||||
if (!request.isMember("target") || !request["target"].isString()){
|
||||
ERROR_MSG("Automatic push not added: it does not contain a valid target");
|
||||
return;
|
||||
}
|
||||
newPush.append(request["target"]);
|
||||
bool startTime = false;
|
||||
if (request.isMember("scheduletime") && request["scheduletime"].isInt()){
|
||||
newPush.append(request["scheduletime"]);
|
||||
startTime = true;
|
||||
}else{
|
||||
newPush.append(0u);
|
||||
}
|
||||
if (request.isMember("completetime") && request["completetime"].isInt()){
|
||||
if (!startTime){newPush.append(0u);}
|
||||
newPush.append(request["completetime"]);
|
||||
}else{
|
||||
newPush.append(0u);
|
||||
}
|
||||
if (request.isMember("startVariableName")){
|
||||
newPush.append(request["startVariableName"]);
|
||||
}else{
|
||||
newPush.append("");
|
||||
}
|
||||
if (request.isMember("startVariableOperator")){
|
||||
newPush.append(request["startVariableOperator"]);
|
||||
}else{
|
||||
newPush.append(0);
|
||||
}
|
||||
if (request.isMember("startVariableValue")){
|
||||
newPush.append(request["startVariableValue"]);
|
||||
}else{
|
||||
newPush.append("");
|
||||
}
|
||||
if (request.isMember("endVariableName")){
|
||||
newPush.append(request["endVariableName"]);
|
||||
}else{
|
||||
newPush.append("");
|
||||
}
|
||||
if (request.isMember("endVariableOperator")){
|
||||
newPush.append(request["endVariableOperator"]);
|
||||
}else{
|
||||
newPush.append(0);
|
||||
}
|
||||
if (request.isMember("endVariableValue")){
|
||||
newPush.append(request["endVariableValue"]);
|
||||
}else{
|
||||
newPush.append("");
|
||||
}
|
||||
}
|
||||
long long epo = Util::epoch();
|
||||
if (newPush.size() > 3 && newPush[3u].asInt() <= epo){
|
||||
WARN_MSG("Automatic push not added: removal time is in the past! (%" PRId64 " <= %" PRIu64 ")",
|
||||
newPush[3u].asInt(), Util::epoch());
|
||||
if (request.size() < 2){
|
||||
ERROR_MSG("Automatic push not added: should contain at least a stream name and target");
|
||||
return;
|
||||
}
|
||||
bool edited = false;
|
||||
jsonForEach(Controller::Storage["autopushes"], it){
|
||||
if ((*it)[0u] == newPush[0u] && (*it)[1u] == newPush[1u]){
|
||||
(*it) = newPush;
|
||||
edited = true;
|
||||
// Init optional fields if they were omitted from the addPush request
|
||||
// We only have a stream and target, so fill in the scheduletime and completetime
|
||||
while(newPush.size() < 4){newPush.append(0u);}
|
||||
// The request seems to be using variables and likely skipped the scheduletime and completetime set to 0
|
||||
if (newPush[2].isString()){
|
||||
JSON::Value modPush;
|
||||
modPush.append(newPush[0u]);
|
||||
modPush.append(newPush[1u]);
|
||||
modPush.append(0u);
|
||||
modPush.append(0u);
|
||||
for (uint8_t idx = 2; idx < newPush.size(); idx++){
|
||||
modPush.append(newPush[idx]);
|
||||
}
|
||||
newPush = modPush;
|
||||
}
|
||||
if (!edited && (newPush.size() != 3 || newPush[2u].asInt() > epo)){
|
||||
Controller::Storage["autopushes"].append(newPush);
|
||||
}
|
||||
if (newPush.size() < 3 || newPush[2u].asInt() <= epo){
|
||||
if (newPush.size() > 2 && *(newPush[0u].asStringRef().rbegin()) != '+'){
|
||||
std::string streamname = newPush[0u].asStringRef();
|
||||
std::string target = newPush[1u].asStringRef();
|
||||
startPush(streamname, target);
|
||||
// Variable conditions are used. We should have either 7 (only start variable condition) or 10 values (start + stop variable conditions)
|
||||
if (newPush.size() > 4){
|
||||
if (newPush.size() == 7){
|
||||
newPush.append("");
|
||||
newPush.append(0u);
|
||||
newPush.append("");
|
||||
} else if (newPush.size() != 10){
|
||||
ERROR_MSG("Automatic push not added: passed incomplete data for the start or stop variable");
|
||||
return;
|
||||
}
|
||||
const std::string &pStr = newPush[0u].asStringRef();
|
||||
std::set<std::string> activeStreams = Controller::getActiveStreams(pStr);
|
||||
if (activeStreams.size()){
|
||||
std::string target = newPush[1u].asStringRef();
|
||||
for (std::set<std::string>::iterator it = activeStreams.begin(); it != activeStreams.end(); ++it){
|
||||
std::string streamname = *it;
|
||||
if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){
|
||||
startPush(streamname, target);
|
||||
}
|
||||
}else{
|
||||
// Init the start and stop variable conditions
|
||||
newPush.append("");
|
||||
newPush.append(0u);
|
||||
newPush.append("");
|
||||
newPush.append("");
|
||||
newPush.append(0u);
|
||||
newPush.append("");
|
||||
}
|
||||
// Make sure all start variable values have been initialised
|
||||
if (newPush.size() == 7 && (!newPush[5u].isString() || !newPush[6u].isInt() || !newPush[7u].isString()));
|
||||
// Make sure all stop variable values have been initialised
|
||||
if (newPush.size() == 10 && (!newPush[8u].isString() || !newPush[9u].isInt() || !newPush[10u].isString()));
|
||||
// Final sanity checks on input
|
||||
std::string stream = newPush[0u].asStringRef();
|
||||
std::string target = newPush[1u].asStringRef();
|
||||
uint64_t startTime = newPush[2u].asInt();
|
||||
uint64_t endTime = newPush[3u].asInt();
|
||||
if (endTime && endTime <= epo){
|
||||
ERROR_MSG("Automatic push not added: removal time is in the past! (%" PRIu64 " <= %lld)", endTime, epo);
|
||||
return;
|
||||
}
|
||||
|
||||
// If we have an existing push: edit it
|
||||
bool shouldSave = true;
|
||||
jsonForEach(Controller::Storage["autopushes"], it){
|
||||
if ((*it)[0u] == stream && (*it)[1u] == target){
|
||||
(*it) = newPush;
|
||||
shouldSave = false;
|
||||
}
|
||||
}
|
||||
// If a newly added push only has a defined start time, immediately start it and never save it
|
||||
if (startTime && !endTime){
|
||||
INFO_MSG("Immediately starting push %s->%s as the added push only has a defined start time"
|
||||
, stream.c_str(), target.c_str());
|
||||
startPush(stream, target);
|
||||
// Return push list
|
||||
response["push_auto_list"] = Controller::Storage["autopushes"];
|
||||
return;
|
||||
}
|
||||
// Save as a new variable if we have not edited an existing variable
|
||||
if (shouldSave){
|
||||
Controller::Storage["autopushes"].append(newPush);
|
||||
}
|
||||
// and start it immediately if conditions are met
|
||||
if (!checkPush(newPush)){return;}
|
||||
std::set<std::string> activeStreams = Controller::getActiveStreams(stream);
|
||||
if (activeStreams.size()){
|
||||
for (std::set<std::string>::iterator jt = activeStreams.begin();
|
||||
jt != activeStreams.end(); ++jt){
|
||||
std::string streamname = *jt;
|
||||
if (stream == streamname || (*stream.rbegin() == '+' && streamname.substr(0, stream.size()) == stream)){
|
||||
startPush(streamname, target);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Return push list
|
||||
response["push_auto_list"] = Controller::Storage["autopushes"];
|
||||
}
|
||||
|
||||
/// Removes a push from the list of auto-pushes.
|
||||
/// Removes a push from the list of auto-pushes and returns the new list of pushes
|
||||
/// Does not stop currently active matching pushes.
|
||||
void removePush(const JSON::Value &request){
|
||||
void removePush(const JSON::Value &request, JSON::Value &response){
|
||||
removePush(request);
|
||||
// Return push list
|
||||
response["push_auto_list"] = Controller::Storage["autopushes"];
|
||||
}
|
||||
|
||||
/// Removes a push from the list of auto-pushes
|
||||
void removePush(const JSON::Value &pushInfo){
|
||||
JSON::Value delPush;
|
||||
if (request.isString()){
|
||||
removeAllPush(request.asStringRef());
|
||||
if (pushInfo.isString()){
|
||||
removeAllPush(pushInfo.asStringRef());
|
||||
return;
|
||||
}
|
||||
if (request.isArray()){
|
||||
delPush = request;
|
||||
if (pushInfo.isArray()){
|
||||
delPush = pushInfo;
|
||||
}else{
|
||||
delPush.append(request["stream"]);
|
||||
delPush.append(request["target"]);
|
||||
delPush.append(pushInfo["stream"]);
|
||||
delPush.append(pushInfo["target"]);
|
||||
}
|
||||
JSON::Value newautopushes;
|
||||
jsonForEach(Controller::Storage["autopushes"], it){
|
||||
|
@ -358,8 +574,7 @@ namespace Controller{
|
|||
Controller::Storage["autopushes"] = newautopushes;
|
||||
}
|
||||
|
||||
/// Removes a push from the list of auto-pushes.
|
||||
/// Does not stop currently active matching pushes.
|
||||
/// Removes all auto pushes of a given streamname
|
||||
void removeAllPush(const std::string &streamname){
|
||||
JSON::Value newautopushes;
|
||||
jsonForEach(Controller::Storage["autopushes"], it){
|
||||
|
@ -371,15 +586,15 @@ namespace Controller{
|
|||
/// Starts all configured auto pushes for the given stream.
|
||||
void doAutoPush(std::string &streamname){
|
||||
jsonForEach(Controller::Storage["autopushes"], it){
|
||||
if (it->size() > 2 && (*it)[2u].asInt() < Util::epoch()){continue;}
|
||||
if ((*it)[2u].asInt() && (*it)[2u].asInt() < Util::epoch()){continue;}
|
||||
const std::string &pStr = (*it)[0u].asStringRef();
|
||||
if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){
|
||||
std::string stream = streamname;
|
||||
Util::sanitizeName(stream);
|
||||
// Check variable condition if it exists
|
||||
if((*it)[4u].asStringRef().size() && !checkPush(*it)){continue;}
|
||||
std::string target = (*it)[1u];
|
||||
if (!isPushActive(stream, target)){
|
||||
startPush(stream, target);
|
||||
}
|
||||
startPush(stream, target);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,15 +13,19 @@ namespace Controller{
|
|||
bool isPushActive(uint64_t id);
|
||||
|
||||
// Functions for automated pushes, add/remove
|
||||
void addPush(JSON::Value &request);
|
||||
void removePush(const JSON::Value &request);
|
||||
void addPush(JSON::Value &request, JSON::Value &response);
|
||||
void removePush(const JSON::Value &request, JSON::Value &response);
|
||||
void removeAllPush(const std::string &streamname);
|
||||
|
||||
// internal use only
|
||||
void removePush(const JSON::Value &pushInfo);
|
||||
void doAutoPush(std::string &streamname);
|
||||
void pushCheckLoop(void *np);
|
||||
bool isPushActive(const std::string &streamname, const std::string &target);
|
||||
void stopActivePushes(const std::string &streamname, const std::string &target);
|
||||
bool checkCondition(const JSON::Value ¤tValue, const uint8_t &comparisonOperator, const JSON::Value &matchedValue);
|
||||
bool checkCondition(const std::string ¤tValue, const uint8_t &comparisonOperator, const std::string &matchedValue);
|
||||
bool checkCondition(const int64_t ¤tValue, const uint8_t &comparisonOperator, const int64_t &matchedValue);
|
||||
|
||||
// for storing/retrieving settings
|
||||
void pushSettings(const JSON::Value &request, JSON::Value &response);
|
||||
|
|
|
@ -1005,7 +1005,8 @@ namespace Mist{
|
|||
}
|
||||
}
|
||||
if (targetParams.count("recstart") && atoll(targetParams["recstart"].c_str()) != 0){
|
||||
uint64_t startRec = atoll(targetParams["recstart"].c_str());
|
||||
int64_t startRec = atoll(targetParams["recstart"].c_str());
|
||||
if (startRec < 0){startRec = 0;}
|
||||
if (startRec > endTime()){
|
||||
if (!M.getLive()){
|
||||
onFail("Recording start past end of non-live source", true);
|
||||
|
@ -1014,7 +1015,7 @@ namespace Mist{
|
|||
}
|
||||
if (startRec < startTime()){
|
||||
startRec = startTime();
|
||||
WARN_MSG("Record begin at %llu ms not available, starting at %" PRIu64
|
||||
WARN_MSG("Record begin at %lld ms not available, starting at %" PRIu64
|
||||
" ms instead", atoll(targetParams["recstart"].c_str()), startRec);
|
||||
targetParams["recstart"] = JSON::Value(startRec).asString();
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue