Added load balancer input

This commit is contained in:
Thulinma 2017-01-15 23:42:56 +01:00
parent f5faa61ed4
commit 9cc2f75a8e
4 changed files with 108 additions and 0 deletions

View file

@ -354,6 +354,7 @@ makeInput(ISMV ismv)#LTS
makeInput(MP4 mp4)#LTS
makeInput(TS ts)#LTS
makeInput(Folder folder)#LTS
makeInput(Balancer balancer)#LTS
########################################
# MistServer - Outputs #

View file

@ -286,6 +286,11 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir
//finally, unlock the config semaphore
configLock.post();
if (isProvider){
//Set environment variable so we can know if we have a provider when re-exec'ing.
setenv("MISTPROVIDER", "1", 1);
}
INFO_MSG("Starting %s -s %s %s", player_bin.c_str(), streamname.c_str(), filename.c_str());
char * argv[30] = {(char *)player_bin.c_str(), (char *)"-s", (char *)streamname.c_str(), (char *)filename.c_str()};
int argNum = 3;

View file

@ -0,0 +1,86 @@
#include <mist/defines.h>
#include <mist/stream.h>
#include <mist/http_parser.h>
#include <mist/encode.h>
#include "input_balancer.h"
namespace Mist {
inputBalancer::inputBalancer(Util::Config * cfg) : Input(cfg) {
capa["name"] = "Balancer";
capa["desc"] = "Load balancer input, re-starts itself as the input a load balancer tells it it should be.";
capa["source_match"] = "balance:*";
capa["priority"] = 9ll;
capa["morphic"] = 1ll;
}
int inputBalancer::boot(int argc, char * argv[]){
if (!config->parseArgs(argc, argv)){return 1;}
if (config->getBool("json")){return Input::boot(argc, argv);}
streamName = config->getString("streamname");
std::string blncr = config->getString("input");
if (blncr.substr(0, 8) != "balance:"){
FAIL_MSG("Input must start with \"balance:\"");
return 1;
}
HTTP::URL url(blncr.substr(8));
if (url.protocol != "http"){
FAIL_MSG("Load balancer protocol %s is not supported", url.protocol.c_str());
return 1;
}
std::string source; //empty by default
//Parse fallback from URL arguments, if possible.
if (url.args.size()){
std::map<std::string, std::string> args;
HTTP::parseVars(url.args, args);
if (args.count("fallback")){source = args.at("fallback");}
}
Socket::Connection balConn(url.host, url.getPort(), true);
if (!balConn){
WARN_MSG("Failed to reach %s on port %lu", url.host.c_str(), url.getPort());
}else{
HTTP::Parser http;
http.url = "/" + url.path + "?source=" + Encodings::URL::encode(streamName);
if (source.size()){
http.url += "&fallback=" + Encodings::URL::encode(source);
}
http.method = "GET";
http.SetHeader("Host", url.host);
http.SetHeader("X-MistServer", PACKAGE_VERSION);
balConn.SendNow(http.BuildRequest());
http.Clean();
unsigned int startTime = Util::epoch();
while ((Util::epoch() - startTime < 10) && (balConn || balConn.Received().size())){
if (balConn.spool() || balConn.Received().size()){
if (http.Read(balConn.Received().get())){
source = http.body;
startTime = 0;//note success
break;//break out of while loop
}
}
}
if (startTime){
FAIL_MSG("Timeout while trying to contact load balancer at %s!", blncr.c_str()+8);
}
balConn.close();
}
if (!source.size()){
FAIL_MSG("Could not determine source to use for %s", streamName.c_str());
return 1;
}
//Attempt to boot the source we got
Util::startInput(streamName, source, false, getenv("MISTPROVIDER"));
return 1;
}
}

View file

@ -0,0 +1,16 @@
#include "input.h"
#include <mist/dtsc.h>
namespace Mist {
class inputBalancer : public Input {
public:
inputBalancer(Util::Config * cfg);
int boot(int argc, char * argv[]);
protected:
bool setup(){return false;};
bool readHeader(){return false;};
};
}
typedef Mist::inputBalancer mistIn;