Added support for scheduled pushes
This commit is contained in:
parent
014eed32e8
commit
fefbfe2375
2 changed files with 90 additions and 11 deletions
|
@ -59,6 +59,28 @@ namespace Controller{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Stops any pushes matching the stream name (pattern) and target
|
||||||
|
void stopActivePushes(const std::string &streamname, const std::string &target){
|
||||||
|
while (Controller::conf.is_active && !pushListRead){
|
||||||
|
Util::sleep(100);
|
||||||
|
}
|
||||||
|
std::set<pid_t> toWipe;
|
||||||
|
for (std::map<pid_t, JSON::Value>::iterator it = activePushes.begin(); it != activePushes.end(); ++it){
|
||||||
|
if (Util::Procs::isActive(it->first)){
|
||||||
|
if (it->second[2u].asStringRef() == target && (it->second[1u].asStringRef() == streamname || (*streamname.rbegin() == '+' && it->second[1u].asStringRef().substr(0, streamname.size()) == streamname))){
|
||||||
|
Util::Procs::Stop(it->first);
|
||||||
|
}
|
||||||
|
}else{
|
||||||
|
toWipe.insert(it->first);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
while (toWipe.size()){
|
||||||
|
activePushes.erase(*toWipe.begin());
|
||||||
|
mustWritePushList = true;
|
||||||
|
toWipe.erase(toWipe.begin());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Immediately stops a push with the given ID
|
/// Immediately stops a push with the given ID
|
||||||
void stopPush(unsigned int ID){
|
void stopPush(unsigned int ID){
|
||||||
if (ID > 1 && activePushes.count(ID)){Util::Procs::Stop(ID);}
|
if (ID > 1 && activePushes.count(ID)){Util::Procs::Stop(ID);}
|
||||||
|
@ -125,8 +147,29 @@ namespace Controller{
|
||||||
long long maxspeed = Controller::Storage["push_settings"]["maxspeed"].asInt();
|
long long maxspeed = Controller::Storage["push_settings"]["maxspeed"].asInt();
|
||||||
long long waittime = Controller::Storage["push_settings"]["wait"].asInt();
|
long long waittime = Controller::Storage["push_settings"]["wait"].asInt();
|
||||||
long long curCount = 0;
|
long long curCount = 0;
|
||||||
if (waittime){
|
jsonForEach(Controller::Storage["autopushes"], it){
|
||||||
jsonForEach(Controller::Storage["autopushes"], it){
|
if (it->size() > 3 && (*it)[3u].asInt() < Util::epoch()){
|
||||||
|
INFO_MSG("Deleting autopush from %s to %s because end time passed", (*it)[0u].asStringRef().c_str(), (*it)[1u].asStringRef().c_str());
|
||||||
|
stopActivePushes((*it)[0u], (*it)[1u]);
|
||||||
|
removePush(*it);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (it->size() > 2 && *((*it)[0u].asStringRef().rbegin()) != '+'){
|
||||||
|
if ((*it)[2u].asInt() <= Util::epoch()){
|
||||||
|
std::string streamname = (*it)[0u];
|
||||||
|
std::string target = (*it)[1u];
|
||||||
|
if (!isPushActive(streamname, target)){
|
||||||
|
if (waitingPushes[streamname][target]++ >= waittime && (curCount < maxspeed || !maxspeed)){
|
||||||
|
waitingPushes[streamname].erase(target);
|
||||||
|
if (!waitingPushes[streamname].size()){waitingPushes.erase(streamname);}
|
||||||
|
startPush(streamname, target);
|
||||||
|
curCount++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (waittime || it->size() > 2){
|
||||||
const std::string &pStr = (*it)[0u].asStringRef();
|
const std::string &pStr = (*it)[0u].asStringRef();
|
||||||
if (activeStreams.size()){
|
if (activeStreams.size()){
|
||||||
for (std::map<std::string, uint8_t>::iterator jt = activeStreams.begin(); jt != activeStreams.end(); ++jt){
|
for (std::map<std::string, uint8_t>::iterator jt = activeStreams.begin(); jt != activeStreams.end(); ++jt){
|
||||||
|
@ -145,6 +188,10 @@ namespace Controller{
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (it->size() == 3){
|
||||||
|
removePush(*it);
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (mustWritePushList && pushPage.mapped){
|
if (mustWritePushList && pushPage.mapped){
|
||||||
writePushList(pushPage.mapped);
|
writePushList(pushPage.mapped);
|
||||||
|
@ -190,16 +237,46 @@ namespace Controller{
|
||||||
}else{
|
}else{
|
||||||
newPush.append(request["stream"]);
|
newPush.append(request["stream"]);
|
||||||
newPush.append(request["target"]);
|
newPush.append(request["target"]);
|
||||||
|
bool startTime = false;
|
||||||
|
if (request.isMember("scheduletime") && request["scheduletime"].isInt()){
|
||||||
|
newPush.append(request["scheduletime"]);
|
||||||
|
startTime = true;
|
||||||
|
}
|
||||||
|
if (request.isMember("completetime") && request["completetime"].isInt()){
|
||||||
|
if (!startTime){newPush.append(0ll);}
|
||||||
|
newPush.append(request["completetime"]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Controller::Storage["autopushes"].append(newPush);
|
long long epo = Util::epoch();
|
||||||
if (activeStreams.size()){
|
if (newPush.size() > 3 && newPush[3u].asInt() <= epo){
|
||||||
const std::string &pStr = newPush[0u].asStringRef();
|
WARN_MSG("Automatic push not added: removal time is in the past! (%lld <= %lld)", newPush[3u].asInt(), Util::epoch());
|
||||||
std::string target = newPush[1u].asStringRef();
|
return;
|
||||||
for (std::map<std::string, uint8_t>::iterator it = activeStreams.begin(); it != activeStreams.end(); ++it){
|
}
|
||||||
std::string streamname = it->first;
|
bool edited = false;
|
||||||
if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){
|
jsonForEach(Controller::Storage["autopushes"], it){
|
||||||
|
if ((*it)[0u] == newPush[0u] && (*it)[1u] == newPush[1u]){
|
||||||
startPush(streamname, target);
|
(*it) = newPush;
|
||||||
|
edited = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!edited && (newPush.size() != 3 || newPush[2u].asInt() > epo)){
|
||||||
|
Controller::Storage["autopushes"].append(newPush);
|
||||||
|
}
|
||||||
|
if (newPush.size() < 3 || newPush[2u].asInt() <= epo){
|
||||||
|
if (newPush.size() > 2 && *(newPush[0u].asStringRef().rbegin()) != '+'){
|
||||||
|
std::string streamname = newPush[0u].asStringRef();
|
||||||
|
std::string target = newPush[1u].asStringRef();
|
||||||
|
startPush(streamname, target);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (activeStreams.size()){
|
||||||
|
const std::string &pStr = newPush[0u].asStringRef();
|
||||||
|
std::string target = newPush[1u].asStringRef();
|
||||||
|
for (std::map<std::string, uint8_t>::iterator it = activeStreams.begin(); it != activeStreams.end(); ++it){
|
||||||
|
std::string streamname = it->first;
|
||||||
|
if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){
|
||||||
|
startPush(streamname, target);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -239,6 +316,7 @@ namespace Controller{
|
||||||
/// Starts all configured auto pushes for the given stream.
|
/// Starts all configured auto pushes for the given stream.
|
||||||
void doAutoPush(std::string &streamname){
|
void doAutoPush(std::string &streamname){
|
||||||
jsonForEach(Controller::Storage["autopushes"], it){
|
jsonForEach(Controller::Storage["autopushes"], it){
|
||||||
|
if (it->size() > 2 || (*it)[2u].asInt() < Util::epoch()){continue;}
|
||||||
const std::string &pStr = (*it)[0u].asStringRef();
|
const std::string &pStr = (*it)[0u].asStringRef();
|
||||||
if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){
|
if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){
|
||||||
std::string stream = streamname;
|
std::string stream = streamname;
|
||||||
|
|
|
@ -18,6 +18,7 @@ namespace Controller{
|
||||||
void doAutoPush(std::string &streamname);
|
void doAutoPush(std::string &streamname);
|
||||||
void pushCheckLoop(void *np);
|
void pushCheckLoop(void *np);
|
||||||
bool isPushActive(const std::string &streamname, const std::string &target);
|
bool isPushActive(const std::string &streamname, const std::string &target);
|
||||||
|
void stopActivePushes(const std::string &streamname, const std::string &target);
|
||||||
|
|
||||||
// for storing/retrieving settings
|
// for storing/retrieving settings
|
||||||
void pushSettings(const JSON::Value &request, JSON::Value &response);
|
void pushSettings(const JSON::Value &request, JSON::Value &response);
|
||||||
|
|
Loading…
Add table
Reference in a new issue