diff --git a/src/controller/controller_push.cpp b/src/controller/controller_push.cpp index 18f17ff5..a14b9f98 100644 --- a/src/controller/controller_push.cpp +++ b/src/controller/controller_push.cpp @@ -59,6 +59,28 @@ namespace Controller{ return false; } + /// Stops any pushes matching the stream name (pattern) and target + void stopActivePushes(const std::string &streamname, const std::string &target){ + while (Controller::conf.is_active && !pushListRead){ + Util::sleep(100); + } + std::set toWipe; + for (std::map::iterator it = activePushes.begin(); it != activePushes.end(); ++it){ + if (Util::Procs::isActive(it->first)){ + if (it->second[2u].asStringRef() == target && (it->second[1u].asStringRef() == streamname || (*streamname.rbegin() == '+' && it->second[1u].asStringRef().substr(0, streamname.size()) == streamname))){ + Util::Procs::Stop(it->first); + } + }else{ + toWipe.insert(it->first); + } + } + while (toWipe.size()){ + activePushes.erase(*toWipe.begin()); + mustWritePushList = true; + toWipe.erase(toWipe.begin()); + } + } + /// Immediately stops a push with the given ID void stopPush(unsigned int ID){ if (ID > 1 && activePushes.count(ID)){Util::Procs::Stop(ID);} @@ -125,8 +147,29 @@ namespace Controller{ long long maxspeed = Controller::Storage["push_settings"]["maxspeed"].asInt(); long long waittime = Controller::Storage["push_settings"]["wait"].asInt(); long long curCount = 0; - if (waittime){ - jsonForEach(Controller::Storage["autopushes"], it){ + jsonForEach(Controller::Storage["autopushes"], it){ + if (it->size() > 3 && (*it)[3u].asInt() < Util::epoch()){ + INFO_MSG("Deleting autopush from %s to %s because end time passed", (*it)[0u].asStringRef().c_str(), (*it)[1u].asStringRef().c_str()); + stopActivePushes((*it)[0u], (*it)[1u]); + removePush(*it); + break; + } + if (it->size() > 2 && *((*it)[0u].asStringRef().rbegin()) != '+'){ + if ((*it)[2u].asInt() <= Util::epoch()){ + std::string streamname = (*it)[0u]; + std::string target = (*it)[1u]; + if (!isPushActive(streamname, target)){ + if (waitingPushes[streamname][target]++ >= waittime && (curCount < maxspeed || !maxspeed)){ + waitingPushes[streamname].erase(target); + if (!waitingPushes[streamname].size()){waitingPushes.erase(streamname);} + startPush(streamname, target); + curCount++; + } + } + } + continue; + } + if (waittime || it->size() > 2){ const std::string &pStr = (*it)[0u].asStringRef(); if (activeStreams.size()){ for (std::map::iterator jt = activeStreams.begin(); jt != activeStreams.end(); ++jt){ @@ -145,6 +188,10 @@ namespace Controller{ } } } + if (it->size() == 3){ + removePush(*it); + break; + } } if (mustWritePushList && pushPage.mapped){ writePushList(pushPage.mapped); @@ -190,16 +237,46 @@ namespace Controller{ }else{ newPush.append(request["stream"]); newPush.append(request["target"]); + bool startTime = false; + if (request.isMember("scheduletime") && request["scheduletime"].isInt()){ + newPush.append(request["scheduletime"]); + startTime = true; + } + if (request.isMember("completetime") && request["completetime"].isInt()){ + if (!startTime){newPush.append(0ll);} + newPush.append(request["completetime"]); + } } - Controller::Storage["autopushes"].append(newPush); - if (activeStreams.size()){ - const std::string &pStr = newPush[0u].asStringRef(); - std::string target = newPush[1u].asStringRef(); - for (std::map::iterator it = activeStreams.begin(); it != activeStreams.end(); ++it){ - std::string streamname = it->first; - if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){ - - startPush(streamname, target); + long long epo = Util::epoch(); + if (newPush.size() > 3 && newPush[3u].asInt() <= epo){ + WARN_MSG("Automatic push not added: removal time is in the past! (%lld <= %lld)", newPush[3u].asInt(), Util::epoch()); + return; + } + bool edited = false; + jsonForEach(Controller::Storage["autopushes"], it){ + if ((*it)[0u] == newPush[0u] && (*it)[1u] == newPush[1u]){ + (*it) = newPush; + edited = true; + } + } + if (!edited && (newPush.size() != 3 || newPush[2u].asInt() > epo)){ + Controller::Storage["autopushes"].append(newPush); + } + if (newPush.size() < 3 || newPush[2u].asInt() <= epo){ + if (newPush.size() > 2 && *(newPush[0u].asStringRef().rbegin()) != '+'){ + std::string streamname = newPush[0u].asStringRef(); + std::string target = newPush[1u].asStringRef(); + startPush(streamname, target); + return; + } + if (activeStreams.size()){ + const std::string &pStr = newPush[0u].asStringRef(); + std::string target = newPush[1u].asStringRef(); + for (std::map::iterator it = activeStreams.begin(); it != activeStreams.end(); ++it){ + std::string streamname = it->first; + if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){ + startPush(streamname, target); + } } } } @@ -239,6 +316,7 @@ namespace Controller{ /// Starts all configured auto pushes for the given stream. void doAutoPush(std::string &streamname){ jsonForEach(Controller::Storage["autopushes"], it){ + if (it->size() > 2 || (*it)[2u].asInt() < Util::epoch()){continue;} const std::string &pStr = (*it)[0u].asStringRef(); if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){ std::string stream = streamname; diff --git a/src/controller/controller_push.h b/src/controller/controller_push.h index 8b71f839..18eb3782 100644 --- a/src/controller/controller_push.h +++ b/src/controller/controller_push.h @@ -18,6 +18,7 @@ namespace Controller{ void doAutoPush(std::string &streamname); void pushCheckLoop(void *np); bool isPushActive(const std::string &streamname, const std::string &target); + void stopActivePushes(const std::string &streamname, const std::string &target); // for storing/retrieving settings void pushSettings(const JSON::Value &request, JSON::Value &response);