From 34df78ce0b564e28d912e8c2fbe58223f5c53e83 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 11 May 2016 17:34:00 +0200 Subject: [PATCH] Implemented auto-re-push behaviour. --- src/controller/controller.cpp | 4 ++ src/controller/controller_api.cpp | 14 +++++- src/controller/controller_push.cpp | 72 +++++++++++++++++++++++++++--- src/controller/controller_push.h | 5 ++- 4 files changed, 86 insertions(+), 9 deletions(-) diff --git a/src/controller/controller.cpp b/src/controller/controller.cpp index 90e413f4..3cd60a8b 100644 --- a/src/controller/controller.cpp +++ b/src/controller/controller.cpp @@ -56,6 +56,7 @@ #include "controller_uplink.h" /*LTS-END*/ #include "controller_api.h" +#include "controller_push.h" #ifndef COMPILED_USERNAME #define COMPILED_USERNAME "" @@ -318,6 +319,8 @@ int main(int argc, char ** argv){ tthread::thread monitorThread(statusMonitor, 0); //start monitoring thread /*LTS*/ tthread::thread uplinkThread(Controller::uplinkConnection, 0);/*LTS*/ + //start push checking thread + tthread::thread pushThread(Controller::pushCheckLoop, 0); //start main loop while (Controller::conf.is_active){/*LTS*/ @@ -352,6 +355,7 @@ int main(int argc, char ** argv){ statsThread.join(); monitorThread.join(); uplinkThread.join();/*LTS*/ + pushThread.join();/*LTS*/ //write config tthread::lock_guard guard(Controller::logMutex); Controller::Storage.removeMember("log"); diff --git a/src/controller/controller_api.cpp b/src/controller/controller_api.cpp index 0df116d6..7ece9a50 100644 --- a/src/controller/controller_api.cpp +++ b/src/controller/controller_api.cpp @@ -581,7 +581,19 @@ int Controller::handleAPIConnection(Socket::Connection & conn){ stream = Request["push_start"]["stream"].asStringRef(); target = Request["push_start"]["target"].asStringRef(); } - Controller::startPush(stream, target); + if (*stream.rbegin() != '+'){ + Controller::startPush(stream, target); + }else{ + if (activeStreams.size()){ + for (std::map::iterator jt = activeStreams.begin(); jt != activeStreams.end(); ++jt){ + if (jt->first.substr(0, stream.size()) == stream){ + std::string streamname = jt->first; + std::string target_tmp = target; + startPush(streamname, target_tmp); + } + } + } + } } if (Request.isMember("push_list")){ diff --git a/src/controller/controller_push.cpp b/src/controller/controller_push.cpp index 09a30d8b..655a8d5b 100644 --- a/src/controller/controller_push.cpp +++ b/src/controller/controller_push.cpp @@ -6,6 +6,7 @@ #include #include "controller_storage.h" #include "controller_statistics.h" +#include "controller_push.h" namespace Controller { @@ -13,28 +14,84 @@ namespace Controller { std::map activePushes; /// Internal list of waiting pushes - std::deque waitingPushes; + std::map > waitingPushes; /// Immediately starts a push for the given stream to the given target. /// Simply calls Util::startPush and stores the resulting PID in the local activePushes map. void startPush(std::string & stream, std::string & target){ + std::string originalTarget = target; pid_t ret = Util::startPush(stream, target); if (ret){ JSON::Value push; push.append((long long)ret); push.append(stream); + push.append(originalTarget); push.append(target); activePushes[ret] = push; } } + /// Returns true if the push is currently active, false otherwise. + bool isPushActive(std::string & streamname, std::string & target){ + std::set toWipe; + for (std::map::iterator it = activePushes.begin(); it != activePushes.end(); ++it){ + if (Util::Procs::isActive(it->first)){ + if (it->second[1u].asStringRef() == streamname && it->second[2u].asStringRef() == target){ + return true; + } + }else{ + toWipe.insert(it->first); + } + } + while (toWipe.size()){ + activePushes.erase(*toWipe.begin()); + toWipe.erase(toWipe.begin()); + } + return false; + } + /// Immediately stops a push with the given ID void stopPush(unsigned int ID){ - Util::Procs::Stop(ID); + if (ID > 1 && activePushes.count(ID)){ + Util::Procs::Stop(ID); + } } /// Loops, checking every second if any pushes need restarting. - void pushCheckLoop(){ + void pushCheckLoop(void * np){ + while (Controller::conf.is_active){ + //this scope prevents the configMutex from being locked constantly + { + tthread::lock_guard guard(Controller::configMutex); + long long maxspeed = Controller::Storage["push_settings"]["maxspeed"].asInt(); + long long waittime = Controller::Storage["push_settings"]["wait"].asInt(); + long long curCount = 0; + if (waittime){ + jsonForEach(Controller::Storage["autopushes"], it){ + const std::string & pStr = (*it)[0u].asStringRef(); + if (activeStreams.size()){ + for (std::map::iterator jt = activeStreams.begin(); jt != activeStreams.end(); ++jt){ + std::string streamname = jt->first; + std::string target = (*it)[1u]; + if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){ + if (!isPushActive(streamname, target)){ + if (waitingPushes[streamname][target]++ >= waittime && (curCount < maxspeed || !maxspeed)){ + waitingPushes[streamname].erase(target); + if (!waitingPushes[streamname].size()){ + waitingPushes.erase(streamname); + } + startPush(streamname, target); + curCount++; + } + } + } + } + } + } + } + } + Util::wait(1000);//wait at least 5 seconds + } } /// Gives a list of all currently active pushes @@ -82,7 +139,8 @@ namespace Controller { void removePush(const JSON::Value & request){ JSON::Value delPush; if (request.isString()){ - return removePush(request.asStringRef()); + removeAllPush(request.asStringRef()); + return; } if (request.isArray()){ delPush = request; @@ -101,7 +159,7 @@ namespace Controller { /// Removes a push from the list of auto-pushes. /// Does not stop currently active matching pushes. - void removePush(const std::string & streamname){ + void removeAllPush(const std::string & streamname){ JSON::Value newautopushes; jsonForEach(Controller::Storage["autopushes"], it){ if ((*it)[0u] != streamname){ @@ -118,7 +176,9 @@ namespace Controller { if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){ std::string stream = streamname; std::string target = (*it)[1u]; - startPush(stream, target); + if (!isPushActive(stream, target)){ + startPush(stream, target); + } } } } diff --git a/src/controller/controller_push.h b/src/controller/controller_push.h index 8b1e0c14..d53b8ad7 100644 --- a/src/controller/controller_push.h +++ b/src/controller/controller_push.h @@ -12,11 +12,12 @@ namespace Controller { //Functions for automated pushes, add/remove void addPush(JSON::Value & request); void removePush(const JSON::Value & request); - void removePush(const std::string & streamname); + void removeAllPush(const std::string & streamname); //internal use only void doAutoPush(std::string & streamname); - void pushCheckLoop(); + void pushCheckLoop(void * np); + bool isPushActive(std::string & streamname, std::string & target); //for storing/retrieving settings void pushSettings(const JSON::Value & request, JSON::Value & response);