From a247095c6d293daaeca31a88857b47622c1fa7c1 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 4 May 2016 18:07:03 +0200 Subject: [PATCH] Load balancer working. --- CMakeLists.txt | 5 + src/analysers/load_analyser.cpp | 301 ++++++++++++++++++++++++++++++++ 2 files changed, 306 insertions(+) create mode 100644 src/analysers/load_analyser.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 9a214c5d..e10467cc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -267,6 +267,11 @@ makeAnalyser(TS ts) #LTS makeAnalyser(TSStream tsstream) #LTS makeAnalyser(Stats stats) #LTS + +if (DEFINED LOAD_BALANCE ) + makeAnalyser(Load load) #LTS +endif() + ######################################## # MistServer - Inputs # ######################################## diff --git a/src/analysers/load_analyser.cpp b/src/analysers/load_analyser.cpp new file mode 100644 index 00000000..bc010c96 --- /dev/null +++ b/src/analysers/load_analyser.cpp @@ -0,0 +1,301 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +Util::Config * cfg = 0; +std::string passphrase; + +struct streamDetails{ + unsigned int total; + unsigned int bandwidth; + unsigned long long prevTotal; +}; + +class hostDetails{ + private: + tthread::mutex * hostMutex; + std::map streams; + unsigned int cpu; + unsigned int ramMax; + unsigned int ramCurr; + unsigned int upSpeed; + unsigned int downSpeed; + unsigned int total; + unsigned long long upPrev; + unsigned long long downPrev; + unsigned long long prevTime; + unsigned long long addBandwidth; + public: + unsigned long long availBandwidth; + hostDetails(){ + hostMutex = 0; + cpu = 1000; + ramMax = 0; + ramCurr = 0; + upSpeed = 0; + downSpeed = 0; + upPrev = 0; + downPrev = 0; + prevTime = 0; + addBandwidth = 0; + availBandwidth = 128 * 1024 * 1024;//assume 1G connections + } + ~hostDetails(){ + if (hostMutex){ + delete hostMutex; + hostMutex = 0; + } + } + void badNess(){ + if (!hostMutex){hostMutex = new tthread::mutex();} + tthread::lock_guard guard(*hostMutex); + addBandwidth += 1 * 1024 * 1024; + addBandwidth *= 1.2; + } + ///Scores a potential new connection to this server, on a scale from 0 to 3200. + ///0 is horrible, 3200 is perfect. + unsigned int rate(std::string & s){ + if (!hostMutex){hostMutex = new tthread::mutex();} + tthread::lock_guard guard(*hostMutex); + unsigned int score = 0; + if (!ramMax){ + return 0; + } + //First, add current CPU/RAM left to the score, on a scale from 0 to 1000. + score += (1000 - cpu) + (1000 - ((ramCurr * 1000) / ramMax)); + //Next, we add 200 points if the stream is already available. + if (streams.count(s)){score += 200;} + //Finally, account for bandwidth. We again scale from 0 to 1000 where 1000 is perfect. + score += (1000 - (((upSpeed + addBandwidth) * 1000) / availBandwidth)); + MEDIUM_MSG("Scores: CPU %u, RAM %u, Stream %u, BW %u (-%u) (%lluMB/s avail)", 1000-cpu, 1000-((ramCurr * 1000) / ramMax), streams.count(s)?200:0, (1000 - ((upSpeed * 1000) / availBandwidth)), (addBandwidth * 1000)/availBandwidth, availBandwidth / 1024 / 1024); + return score; + } + void addViewer(std::string & s){ + if (!hostMutex){hostMutex = new tthread::mutex();} + tthread::lock_guard guard(*hostMutex); + if (streams.count(s)){ + addBandwidth += streams[s].bandwidth; + }else{ + if (total){ + addBandwidth += (upSpeed + downSpeed) / total; + }else{ + addBandwidth += (upSpeed + downSpeed) + 100000; + } + } + } + void update(JSON::Value & d){ + if (!hostMutex){hostMutex = new tthread::mutex();} + tthread::lock_guard guard(*hostMutex); + cpu = d["cpu"].asInt(); + ramMax = d["mem_total"].asInt(); + ramCurr = d["mem_used"].asInt(); + total = d["sess_current"].asInt(); + unsigned long long currUp = d["upload"].asInt(), currDown = d["download"].asInt(); + unsigned int timeDiff = 0; + if (prevTime){ + timeDiff = time(0) - prevTime; + if (timeDiff){ + upSpeed = (currUp - upPrev) / timeDiff; + downSpeed = (currDown - downPrev) / timeDiff; + } + } + prevTime = time(0); + upPrev = currUp; + downPrev = currDown; + + if (d.isMember("streams") && d["streams"].size()){ + jsonForEach(d["streams"], it){ + unsigned int count =(*it)["sess_current"].asInt(); + if (!count){ + if (streams.count(it.key())){ + streams.erase(it.key()); + } + continue; + } + struct streamDetails & strm = streams[it.key()]; + strm.total = count; + unsigned long long currTotal = (*it)["download"].asInt() + (*it)["upload"].asInt(); + if (timeDiff && count){ + strm.bandwidth = ((currTotal - strm.prevTotal) / timeDiff) / count; + }else{ + if (total){ + strm.bandwidth = (upSpeed + downSpeed) / total; + }else{ + strm.bandwidth = (upSpeed + downSpeed) + 100000; + } + } + strm.prevTotal = currTotal; + } + } + addBandwidth *= 0.9; + } +}; + +std::map hosts; + + +int handleRequest(Socket::Connection & conn){ + HTTP::Parser H; + while (conn){ + if ((conn.spool() || conn.Received().size()) && H.Read(conn)){ + std::string stream = H.url.substr(1); + INFO_MSG("Balancing stream %s", stream.c_str()); + H.Clean(); + H.SetHeader("Content-Type", "text/plain"); + std::string bestHost = ""; + unsigned int bestScore = 0; + for (std::map::iterator it = hosts.begin(); it != hosts.end(); ++it){ + unsigned int score = it->second.rate(stream); + if (score > bestScore){ + bestHost = it->first; + bestScore = score; + } + INFO_MSG("%s scores %u", it->first.c_str(), score); + } + INFO_MSG("Winner: %s scores %u", bestHost.c_str(), bestScore); + hosts[bestHost].addViewer(stream); + H.SetBody(bestHost); + H.SendResponse("200", "OK", conn); + H.Clean(); + }//if HTTP request received + } + conn.close(); +} + +void handleServer(void * servName){ + std::string & name = *(std::string*)servName; + + HTTP::Parser H; + std::string host; + int port = 4242; + JSON::Value bandwidth = 128 * 1024 * 1024ll;//assume 1G connection + + size_t slash = name.find('/'); + if (slash != std::string::npos){ + bandwidth = name.substr(slash+1, std::string::npos); + bandwidth = bandwidth.asInt() * 1024 * 1024; + name = name.substr(0, slash); + } + + size_t colon = name.find(':'); + if (colon != std::string::npos && colon != 0 && colon != name.size()) { + host = name.substr(0, colon); + port = atoi(name.substr(colon + 1, std::string::npos).c_str()); + }else{ + host = name; + } + + hosts[host].availBandwidth = bandwidth.asInt(); + + INFO_MSG("Monitoring %s on port %d.", host.c_str(), port, passphrase.c_str()); + + Socket::Connection servConn(host, port, false); + while (cfg->is_active){ + if (!servConn){ + INFO_MSG("Reconnecting to %s", host.c_str()); + servConn = Socket::Connection(host, port, false); + } + if (!servConn){ + INFO_MSG("Can't reach server %s", host.c_str()); + hosts[host].badNess(); + Util::wait(5000); + continue; + } + + //retrieve update information + H.url = "/" + passphrase + ".json"; + H.method = "GET"; + H.SendRequest(servConn); + H.Clean(); + unsigned int startTime = Util::epoch(); + while (servConn && !((servConn.spool() || servConn.Received().size()) && H.Read(servConn))){ + Util::sleep(100); + } + JSON::Value servData = JSON::fromString(H.body); + if (!servData){ + INFO_MSG("Can't retrieve server %s load information", host.c_str()); + hosts[host].badNess(); + servConn.close(); + }else{ + hosts[host].update(servData); + } + H.Clean(); + Util::wait(5000); + } + servConn.close(); +} + +int main(int argc, char ** argv){ + Util::Config conf(argv[0]); + cfg = &conf; + + JSON::Value opt; + opt["arg"] = "string"; + opt["short"] = "s"; + opt["long"] = "server"; + opt["help"] = "Address of a server to balance. Hostname or IP, optionally followed by API port."; + conf.addOption("server", opt); + + opt["arg"] = "integer"; + opt["short"] = "p"; + opt["long"] = "port"; + opt["help"] = "TCP port to listen on"; + opt["value"].append((long long)8042); + conf.addOption("port", opt); + + opt["arg"] = "string"; + opt["short"] = "P"; + opt["long"] = "passphrase"; + opt["help"] = "Passphrase (prometheus option value) to use for data retrieval."; + opt["value"][0u] = "koekjes"; + conf.addOption("passphrase", opt); + + opt["arg"] = "string"; + opt["short"] = "i"; + opt["long"] = "interface"; + opt["help"] = "Network interface to listen on"; + opt["value"][0u] = "0.0.0.0"; + conf.addOption("interface", opt); + + opt["arg"] = "string"; + opt["short"] = "u"; + opt["long"] = "username"; + opt["help"] = "Username to drop privileges to"; + opt["value"][0u] = "root"; + conf.addOption("username", opt); + + conf.parseArgs(argc, argv); + + passphrase = conf.getOption("passphrase").asStringRef(); + + JSON::Value & nodes = conf.getOption("server", true); + INFO_MSG("Load balancer activating. Balancing between %llu nodes.", nodes.size()); + conf.activate(); + + std::map threads; + jsonForEach(nodes, it){ + threads[it->asStringRef()] = new tthread::thread(handleServer, (void*)&(it->asStringRef())); + } + + conf.serveThreadedSocket(handleRequest); + if (!conf.is_active){ + INFO_MSG("Load balancer shutting down; received shutdown signal"); + }else{ + INFO_MSG("Load balancer shutting down; socket problem"); + } + + if (threads.size()){ + for (std::map::iterator it = threads.begin(); it != threads.end(); ++it){ + it->second->join(); + } + } + +} +