Added duplicate checking for manual pushes

This commit is contained in:
Thulinma 2016-11-17 16:40:27 +01:00
parent acbdede296
commit 6254c285d6
5 changed files with 58 additions and 77 deletions

View file

@ -325,14 +325,10 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir
}
/// Attempt to start a push for streamname to target.
/// Both streamname and target may be changed by this function:
/// - streamname is sanitized to a permissible streamname
/// - target gets variables replaced and may be altered by the PUSH_OUT_START trigger response.
/// streamname MUST be pre-sanitized
/// target gets variables replaced and may be altered by the PUSH_OUT_START trigger response.
/// Attempts to match the altered target to an output that can push to it.
pid_t Util::startPush(std::string & streamname, std::string & target) {
sanitizeName(streamname);
pid_t Util::startPush(const std::string & streamname, std::string & target) {
if (Triggers::shouldTrigger("PUSH_OUT_START", streamname)) {
std::string payload = streamname+"\n"+target;
std::string filepath_response;

View file

@ -11,7 +11,7 @@ namespace Util {
void sanitizeName(std::string & streamname);
bool streamAlive(std::string & streamname);
bool startInput(std::string streamname, std::string filename = "", bool forkFirst = true);
int startPush(std::string & streamname, std::string & target);
int startPush(const std::string & streamname, std::string & target);
JSON::Value getStreamConfig(std::string streamname);
}

View file

@ -2,6 +2,7 @@
#include <sys/stat.h> //for browse API call
#include <mist/http_parser.h>
#include <mist/auth.h>
#include <mist/stream.h>
#include <mist/config.h>
#include <mist/defines.h>
#include <mist/timing.h>
@ -607,8 +608,9 @@ int Controller::handleAPIConnection(Socket::Connection & conn){
stream = Request["push_start"]["stream"].asStringRef();
target = Request["push_start"]["target"].asStringRef();
}
Util::sanitizeName(stream);
if (*stream.rbegin() != '+'){
Controller::startPush(stream, target);
startPush(stream, target);
}else{
if (activeStreams.size()){
for (std::map<std::string, unsigned int>::iterator jt = activeStreams.begin(); jt != activeStreams.end(); ++jt){

View file

@ -1,24 +1,26 @@
#include <string>
#include <mist/json.h>
#include "controller_push.h"
#include "controller_statistics.h"
#include "controller_storage.h"
#include <mist/config.h>
#include <mist/tinythread.h>
#include <mist/json.h>
#include <mist/procs.h>
#include <mist/stream.h>
#include "controller_storage.h"
#include "controller_statistics.h"
#include "controller_push.h"
#include <mist/tinythread.h>
#include <string>
namespace Controller {
namespace Controller{
/// Internal list of currently active pushes
std::map<pid_t, JSON::Value> activePushes;
/// Internal list of waiting pushes
std::map<std::string, std::map<std::string, unsigned int> > waitingPushes;
std::map<std::string, std::map<std::string, unsigned int>> waitingPushes;
/// Immediately starts a push for the given stream to the given target.
/// Simply calls Util::startPush and stores the resulting PID in the local activePushes map.
void startPush(std::string & stream, std::string & target){
void startPush(const std::string &stream, std::string &target){
//Cancel if already active
if (isPushActive(stream, target)){return;}
std::string originalTarget = target;
pid_t ret = Util::startPush(stream, target);
if (ret){
@ -32,13 +34,11 @@ namespace Controller {
}
/// Returns true if the push is currently active, false otherwise.
bool isPushActive(std::string & streamname, std::string & target){
bool isPushActive(const std::string &streamname, const std::string &target){
std::set<pid_t> toWipe;
for (std::map<pid_t, JSON::Value>::iterator it = activePushes.begin(); it != activePushes.end(); ++it){
if (Util::Procs::isActive(it->first)){
if (it->second[1u].asStringRef() == streamname && it->second[2u].asStringRef() == target){
return true;
}
if (it->second[1u].asStringRef() == streamname && it->second[2u].asStringRef() == target){return true;}
}else{
toWipe.insert(it->first);
}
@ -52,15 +52,13 @@ namespace Controller {
/// Immediately stops a push with the given ID
void stopPush(unsigned int ID){
if (ID > 1 && activePushes.count(ID)){
Util::Procs::Stop(ID);
}
if (ID > 1 && activePushes.count(ID)){Util::Procs::Stop(ID);}
}
/// Loops, checking every second if any pushes need restarting.
void pushCheckLoop(void * np){
void pushCheckLoop(void *np){
while (Controller::conf.is_active){
//this scope prevents the configMutex from being locked constantly
// this scope prevents the configMutex from being locked constantly
{
tthread::lock_guard<tthread::mutex> guard(Controller::configMutex);
long long maxspeed = Controller::Storage["push_settings"]["maxspeed"].asInt();
@ -68,7 +66,7 @@ namespace Controller {
long long curCount = 0;
if (waittime){
jsonForEach(Controller::Storage["autopushes"], it){
const std::string & pStr = (*it)[0u].asStringRef();
const std::string &pStr = (*it)[0u].asStringRef();
if (activeStreams.size()){
for (std::map<std::string, unsigned int>::iterator jt = activeStreams.begin(); jt != activeStreams.end(); ++jt){
std::string streamname = jt->first;
@ -77,9 +75,7 @@ namespace Controller {
if (!isPushActive(streamname, target)){
if (waitingPushes[streamname][target]++ >= waittime && (curCount < maxspeed || !maxspeed)){
waitingPushes[streamname].erase(target);
if (!waitingPushes[streamname].size()){
waitingPushes.erase(streamname);
}
if (!waitingPushes[streamname].size()){waitingPushes.erase(streamname);}
startPush(streamname, target);
curCount++;
}
@ -90,12 +86,12 @@ namespace Controller {
}
}
}
Util::wait(1000);//wait at least 5 seconds
Util::wait(1000); // wait at least a second
}
}
/// Gives a list of all currently active pushes
void listPush(JSON::Value & output){
void listPush(JSON::Value &output){
output.null();
std::set<pid_t> toWipe;
for (std::map<pid_t, JSON::Value>::iterator it = activePushes.begin(); it != activePushes.end(); ++it){
@ -113,7 +109,7 @@ namespace Controller {
/// Adds a push to the list of auto-pushes.
/// Auto-starts currently active matches immediately.
void addPush(JSON::Value & request){
void addPush(JSON::Value &request){
JSON::Value newPush;
if (request.isArray()){
newPush = request;
@ -123,23 +119,21 @@ namespace Controller {
}
Controller::Storage["autopushes"].append(newPush);
if (activeStreams.size()){
const std::string & pStr = newPush[0u].asStringRef();
const std::string &pStr = newPush[0u].asStringRef();
std::string target = newPush[1u].asStringRef();
for (std::map<std::string, unsigned int>::iterator it = activeStreams.begin(); it != activeStreams.end(); ++it){
std::string streamname = it->first;
if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){
if (!isPushActive(streamname, target)){
startPush(streamname, target);
}
}
}
}
}
/// Removes a push from the list of auto-pushes.
/// Does not stop currently active matching pushes.
void removePush(const JSON::Value & request){
void removePush(const JSON::Value &request){
JSON::Value delPush;
if (request.isString()){
removeAllPush(request.asStringRef());
@ -153,51 +147,40 @@ namespace Controller {
}
JSON::Value newautopushes;
jsonForEach(Controller::Storage["autopushes"], it){
if ((*it) != delPush){
newautopushes.append(*it);
}
if ((*it) != delPush){newautopushes.append(*it);}
}
Controller::Storage["autopushes"] = newautopushes;
}
/// Removes a push from the list of auto-pushes.
/// Does not stop currently active matching pushes.
void removeAllPush(const std::string & streamname){
void removeAllPush(const std::string &streamname){
JSON::Value newautopushes;
jsonForEach(Controller::Storage["autopushes"], it){
if ((*it)[0u] != streamname){
newautopushes.append(*it);
}
if ((*it)[0u] != streamname){newautopushes.append(*it);}
}
Controller::Storage["autopushes"] = newautopushes;
}
/// Starts all configured auto pushes for the given stream.
void doAutoPush(std::string & streamname){
void doAutoPush(std::string &streamname){
jsonForEach(Controller::Storage["autopushes"], it){
const std::string & pStr = (*it)[0u].asStringRef();
const std::string &pStr = (*it)[0u].asStringRef();
if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){
std::string stream = streamname;
Util::sanitizeName(stream);
std::string target = (*it)[1u];
if (!isPushActive(stream, target)){
startPush(stream, target);
}
}
}
}
void pushSettings(const JSON::Value & request, JSON::Value & response){
void pushSettings(const JSON::Value &request, JSON::Value &response){
if (request.isObject()){
if (request.isMember("wait")){
Controller::Storage["push_settings"]["wait"] = request["wait"].asInt();
}
if (request.isMember("maxspeed")){
Controller::Storage["push_settings"]["maxspeed"] = request["maxspeed"].asInt();
}
if (request.isMember("wait")){Controller::Storage["push_settings"]["wait"] = request["wait"].asInt();}
if (request.isMember("maxspeed")){Controller::Storage["push_settings"]["maxspeed"] = request["maxspeed"].asInt();}
}
response = Controller::Storage["push_settings"];
}
}

View file

@ -1,25 +1,25 @@
#include <string>
#include <mist/json.h>
#include <mist/config.h>
#include <mist/json.h>
#include <mist/tinythread.h>
#include <string>
namespace Controller {
//Functions for current pushes, start/stop/list
void startPush(std::string & streamname, std::string & target);
namespace Controller{
// Functions for current pushes, start/stop/list
void startPush(const std::string &streamname, std::string &target);
void stopPush(unsigned int ID);
void listPush(JSON::Value & output);
void listPush(JSON::Value &output);
//Functions for automated pushes, add/remove
void addPush(JSON::Value & request);
void removePush(const JSON::Value & request);
void removeAllPush(const std::string & streamname);
// Functions for automated pushes, add/remove
void addPush(JSON::Value &request);
void removePush(const JSON::Value &request);
void removeAllPush(const std::string &streamname);
//internal use only
void doAutoPush(std::string & streamname);
void pushCheckLoop(void * np);
bool isPushActive(std::string & streamname, std::string & target);
// internal use only
void doAutoPush(std::string &streamname);
void pushCheckLoop(void *np);
bool isPushActive(const std::string &streamname, const std::string &target);
//for storing/retrieving settings
void pushSettings(const JSON::Value & request, JSON::Value & response);
// for storing/retrieving settings
void pushSettings(const JSON::Value &request, JSON::Value &response);
}