From 9cc2f75a8e3aced34b7dfac7c3f3586a081eb5ac Mon Sep 17 00:00:00 2001 From: Thulinma Date: Sun, 15 Jan 2017 23:42:56 +0100 Subject: [PATCH] Added load balancer input --- CMakeLists.txt | 1 + lib/stream.cpp | 5 +++ src/input/input_balancer.cpp | 86 ++++++++++++++++++++++++++++++++++++ src/input/input_balancer.h | 16 +++++++ 4 files changed, 108 insertions(+) create mode 100644 src/input/input_balancer.cpp create mode 100644 src/input/input_balancer.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 0aadfee6..359295a4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -354,6 +354,7 @@ makeInput(ISMV ismv)#LTS makeInput(MP4 mp4)#LTS makeInput(TS ts)#LTS makeInput(Folder folder)#LTS +makeInput(Balancer balancer)#LTS ######################################## # MistServer - Outputs # diff --git a/lib/stream.cpp b/lib/stream.cpp index f7ed3d04..e711a17b 100644 --- a/lib/stream.cpp +++ b/lib/stream.cpp @@ -286,6 +286,11 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir //finally, unlock the config semaphore configLock.post(); + if (isProvider){ + //Set environment variable so we can know if we have a provider when re-exec'ing. + setenv("MISTPROVIDER", "1", 1); + } + INFO_MSG("Starting %s -s %s %s", player_bin.c_str(), streamname.c_str(), filename.c_str()); char * argv[30] = {(char *)player_bin.c_str(), (char *)"-s", (char *)streamname.c_str(), (char *)filename.c_str()}; int argNum = 3; diff --git a/src/input/input_balancer.cpp b/src/input/input_balancer.cpp new file mode 100644 index 00000000..710bb477 --- /dev/null +++ b/src/input/input_balancer.cpp @@ -0,0 +1,86 @@ +#include +#include +#include +#include +#include "input_balancer.h" + +namespace Mist { + inputBalancer::inputBalancer(Util::Config * cfg) : Input(cfg) { + capa["name"] = "Balancer"; + capa["desc"] = "Load balancer input, re-starts itself as the input a load balancer tells it it should be."; + capa["source_match"] = "balance:*"; + capa["priority"] = 9ll; + capa["morphic"] = 1ll; + } + + int inputBalancer::boot(int argc, char * argv[]){ + if (!config->parseArgs(argc, argv)){return 1;} + if (config->getBool("json")){return Input::boot(argc, argv);} + + streamName = config->getString("streamname"); + + std::string blncr = config->getString("input"); + if (blncr.substr(0, 8) != "balance:"){ + FAIL_MSG("Input must start with \"balance:\""); + return 1; + } + + HTTP::URL url(blncr.substr(8)); + if (url.protocol != "http"){ + FAIL_MSG("Load balancer protocol %s is not supported", url.protocol.c_str()); + return 1; + } + + std::string source; //empty by default + + //Parse fallback from URL arguments, if possible. + if (url.args.size()){ + std::map args; + HTTP::parseVars(url.args, args); + if (args.count("fallback")){source = args.at("fallback");} + } + + + Socket::Connection balConn(url.host, url.getPort(), true); + if (!balConn){ + WARN_MSG("Failed to reach %s on port %lu", url.host.c_str(), url.getPort()); + }else{ + HTTP::Parser http; + http.url = "/" + url.path + "?source=" + Encodings::URL::encode(streamName); + if (source.size()){ + http.url += "&fallback=" + Encodings::URL::encode(source); + } + http.method = "GET"; + http.SetHeader("Host", url.host); + http.SetHeader("X-MistServer", PACKAGE_VERSION); + balConn.SendNow(http.BuildRequest()); + http.Clean(); + + unsigned int startTime = Util::epoch(); + while ((Util::epoch() - startTime < 10) && (balConn || balConn.Received().size())){ + if (balConn.spool() || balConn.Received().size()){ + if (http.Read(balConn.Received().get())){ + source = http.body; + startTime = 0;//note success + break;//break out of while loop + } + } + } + if (startTime){ + FAIL_MSG("Timeout while trying to contact load balancer at %s!", blncr.c_str()+8); + } + balConn.close(); + } + + if (!source.size()){ + FAIL_MSG("Could not determine source to use for %s", streamName.c_str()); + return 1; + } + + //Attempt to boot the source we got + Util::startInput(streamName, source, false, getenv("MISTPROVIDER")); + return 1; + } + +} + diff --git a/src/input/input_balancer.h b/src/input/input_balancer.h new file mode 100644 index 00000000..a048b6e1 --- /dev/null +++ b/src/input/input_balancer.h @@ -0,0 +1,16 @@ +#include "input.h" +#include + +namespace Mist { + class inputBalancer : public Input { + public: + inputBalancer(Util::Config * cfg); + int boot(int argc, char * argv[]); + protected: + bool setup(){return false;}; + bool readHeader(){return false;}; + }; +} + +typedef Mist::inputBalancer mistIn; +