Implemented auto-re-push behaviour.

This commit is contained in:
Thulinma 2016-05-11 17:34:00 +02:00
parent b9f6107528
commit 34df78ce0b
4 changed files with 86 additions and 9 deletions

View file

@ -56,6 +56,7 @@
#include "controller_uplink.h" #include "controller_uplink.h"
/*LTS-END*/ /*LTS-END*/
#include "controller_api.h" #include "controller_api.h"
#include "controller_push.h"
#ifndef COMPILED_USERNAME #ifndef COMPILED_USERNAME
#define COMPILED_USERNAME "" #define COMPILED_USERNAME ""
@ -318,6 +319,8 @@ int main(int argc, char ** argv){
tthread::thread monitorThread(statusMonitor, 0); tthread::thread monitorThread(statusMonitor, 0);
//start monitoring thread /*LTS*/ //start monitoring thread /*LTS*/
tthread::thread uplinkThread(Controller::uplinkConnection, 0);/*LTS*/ tthread::thread uplinkThread(Controller::uplinkConnection, 0);/*LTS*/
//start push checking thread
tthread::thread pushThread(Controller::pushCheckLoop, 0);
//start main loop //start main loop
while (Controller::conf.is_active){/*LTS*/ while (Controller::conf.is_active){/*LTS*/
@ -352,6 +355,7 @@ int main(int argc, char ** argv){
statsThread.join(); statsThread.join();
monitorThread.join(); monitorThread.join();
uplinkThread.join();/*LTS*/ uplinkThread.join();/*LTS*/
pushThread.join();/*LTS*/
//write config //write config
tthread::lock_guard<tthread::mutex> guard(Controller::logMutex); tthread::lock_guard<tthread::mutex> guard(Controller::logMutex);
Controller::Storage.removeMember("log"); Controller::Storage.removeMember("log");

View file

@ -581,7 +581,19 @@ int Controller::handleAPIConnection(Socket::Connection & conn){
stream = Request["push_start"]["stream"].asStringRef(); stream = Request["push_start"]["stream"].asStringRef();
target = Request["push_start"]["target"].asStringRef(); target = Request["push_start"]["target"].asStringRef();
} }
Controller::startPush(stream, target); if (*stream.rbegin() != '+'){
Controller::startPush(stream, target);
}else{
if (activeStreams.size()){
for (std::map<std::string, unsigned int>::iterator jt = activeStreams.begin(); jt != activeStreams.end(); ++jt){
if (jt->first.substr(0, stream.size()) == stream){
std::string streamname = jt->first;
std::string target_tmp = target;
startPush(streamname, target_tmp);
}
}
}
}
} }
if (Request.isMember("push_list")){ if (Request.isMember("push_list")){

View file

@ -6,6 +6,7 @@
#include <mist/stream.h> #include <mist/stream.h>
#include "controller_storage.h" #include "controller_storage.h"
#include "controller_statistics.h" #include "controller_statistics.h"
#include "controller_push.h"
namespace Controller { namespace Controller {
@ -13,28 +14,84 @@ namespace Controller {
std::map<pid_t, JSON::Value> activePushes; std::map<pid_t, JSON::Value> activePushes;
/// Internal list of waiting pushes /// Internal list of waiting pushes
std::deque<JSON::Value> waitingPushes; std::map<std::string, std::map<std::string, unsigned int> > waitingPushes;
/// Immediately starts a push for the given stream to the given target. /// Immediately starts a push for the given stream to the given target.
/// Simply calls Util::startPush and stores the resulting PID in the local activePushes map. /// Simply calls Util::startPush and stores the resulting PID in the local activePushes map.
void startPush(std::string & stream, std::string & target){ void startPush(std::string & stream, std::string & target){
std::string originalTarget = target;
pid_t ret = Util::startPush(stream, target); pid_t ret = Util::startPush(stream, target);
if (ret){ if (ret){
JSON::Value push; JSON::Value push;
push.append((long long)ret); push.append((long long)ret);
push.append(stream); push.append(stream);
push.append(originalTarget);
push.append(target); push.append(target);
activePushes[ret] = push; activePushes[ret] = push;
} }
} }
/// Returns true if the push is currently active, false otherwise.
bool isPushActive(std::string & streamname, std::string & target){
std::set<pid_t> toWipe;
for (std::map<pid_t, JSON::Value>::iterator it = activePushes.begin(); it != activePushes.end(); ++it){
if (Util::Procs::isActive(it->first)){
if (it->second[1u].asStringRef() == streamname && it->second[2u].asStringRef() == target){
return true;
}
}else{
toWipe.insert(it->first);
}
}
while (toWipe.size()){
activePushes.erase(*toWipe.begin());
toWipe.erase(toWipe.begin());
}
return false;
}
/// Immediately stops a push with the given ID /// Immediately stops a push with the given ID
void stopPush(unsigned int ID){ void stopPush(unsigned int ID){
Util::Procs::Stop(ID); if (ID > 1 && activePushes.count(ID)){
Util::Procs::Stop(ID);
}
} }
/// Loops, checking every second if any pushes need restarting. /// Loops, checking every second if any pushes need restarting.
void pushCheckLoop(){ void pushCheckLoop(void * np){
while (Controller::conf.is_active){
//this scope prevents the configMutex from being locked constantly
{
tthread::lock_guard<tthread::mutex> guard(Controller::configMutex);
long long maxspeed = Controller::Storage["push_settings"]["maxspeed"].asInt();
long long waittime = Controller::Storage["push_settings"]["wait"].asInt();
long long curCount = 0;
if (waittime){
jsonForEach(Controller::Storage["autopushes"], it){
const std::string & pStr = (*it)[0u].asStringRef();
if (activeStreams.size()){
for (std::map<std::string, unsigned int>::iterator jt = activeStreams.begin(); jt != activeStreams.end(); ++jt){
std::string streamname = jt->first;
std::string target = (*it)[1u];
if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){
if (!isPushActive(streamname, target)){
if (waitingPushes[streamname][target]++ >= waittime && (curCount < maxspeed || !maxspeed)){
waitingPushes[streamname].erase(target);
if (!waitingPushes[streamname].size()){
waitingPushes.erase(streamname);
}
startPush(streamname, target);
curCount++;
}
}
}
}
}
}
}
}
Util::wait(1000);//wait at least 5 seconds
}
} }
/// Gives a list of all currently active pushes /// Gives a list of all currently active pushes
@ -82,7 +139,8 @@ namespace Controller {
void removePush(const JSON::Value & request){ void removePush(const JSON::Value & request){
JSON::Value delPush; JSON::Value delPush;
if (request.isString()){ if (request.isString()){
return removePush(request.asStringRef()); removeAllPush(request.asStringRef());
return;
} }
if (request.isArray()){ if (request.isArray()){
delPush = request; delPush = request;
@ -101,7 +159,7 @@ namespace Controller {
/// Removes a push from the list of auto-pushes. /// Removes a push from the list of auto-pushes.
/// Does not stop currently active matching pushes. /// Does not stop currently active matching pushes.
void removePush(const std::string & streamname){ void removeAllPush(const std::string & streamname){
JSON::Value newautopushes; JSON::Value newautopushes;
jsonForEach(Controller::Storage["autopushes"], it){ jsonForEach(Controller::Storage["autopushes"], it){
if ((*it)[0u] != streamname){ if ((*it)[0u] != streamname){
@ -118,7 +176,9 @@ namespace Controller {
if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){ if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){
std::string stream = streamname; std::string stream = streamname;
std::string target = (*it)[1u]; std::string target = (*it)[1u];
startPush(stream, target); if (!isPushActive(stream, target)){
startPush(stream, target);
}
} }
} }
} }

View file

@ -12,11 +12,12 @@ namespace Controller {
//Functions for automated pushes, add/remove //Functions for automated pushes, add/remove
void addPush(JSON::Value & request); void addPush(JSON::Value & request);
void removePush(const JSON::Value & request); void removePush(const JSON::Value & request);
void removePush(const std::string & streamname); void removeAllPush(const std::string & streamname);
//internal use only //internal use only
void doAutoPush(std::string & streamname); void doAutoPush(std::string & streamname);
void pushCheckLoop(); void pushCheckLoop(void * np);
bool isPushActive(std::string & streamname, std::string & target);
//for storing/retrieving settings //for storing/retrieving settings
void pushSettings(const JSON::Value & request, JSON::Value & response); void pushSettings(const JSON::Value & request, JSON::Value & response);