From cbfb3d62bf6409abce7e3aed6fddb04f48236efc Mon Sep 17 00:00:00 2001 From: Thulinma Date: Sat, 3 Sep 2016 19:01:59 +0200 Subject: [PATCH] Added variable weights to the load balancer, improved timeout behaviour and verbosity --- src/analysers/load_analyser.cpp | 95 ++++++++++++++++++++++++--------- 1 file changed, 71 insertions(+), 24 deletions(-) diff --git a/src/analysers/load_analyser.cpp b/src/analysers/load_analyser.cpp index c732dd8b..4a6d9038 100644 --- a/src/analysers/load_analyser.cpp +++ b/src/analysers/load_analyser.cpp @@ -11,6 +11,10 @@ Util::Config * cfg = 0; std::string passphrase; +unsigned int weight_cpu = 500; +unsigned int weight_ram = 500; +unsigned int weight_bw = 1000; +unsigned int weight_bonus = 50; struct streamDetails{ unsigned int total; @@ -33,6 +37,7 @@ class hostDetails{ unsigned long long prevTime; unsigned long long addBandwidth; public: + std::string host; unsigned long long availBandwidth; hostDetails(){ hostMutex = 0; @@ -77,9 +82,15 @@ class hostDetails{ r["cpu"] = (long long)(cpu/10); if (ramMax){r["ram"] = (long long)((ramCurr*100) / ramMax);} r["up"] = (long long)upSpeed; + r["up_add"] = (long long)addBandwidth; r["down"] = (long long)downSpeed; r["streams"] = (long long)streams.size(); r["viewers"] = (long long)total; + if (ramMax && availBandwidth){ + r["score"]["cpu"] = (long long)(weight_cpu - (cpu*weight_cpu)/1000); + r["score"]["ram"] = (long long)(weight_ram - ((ramCurr * weight_ram) / ramMax)); + r["score"]["bw"] = (long long)(weight_bw - (((upSpeed + addBandwidth) * weight_bw) / availBandwidth)); + } } ///Fills out a by reference given JSON::Value with current streams. void fillStreams(JSON::Value & r){ @@ -89,30 +100,26 @@ class hostDetails{ r[jt->first] = r[jt->first].asInt() + jt->second.total; } } - ///Scores a potential new connection to this server, on a scale from 0 to 3200. - ///0 is horrible, 3200 is perfect. + ///Scores a potential new connection to this server + ///0 means not possible, the higher the better. unsigned int rate(std::string & s){ if (!hostMutex){hostMutex = new tthread::mutex();} tthread::lock_guard guard(*hostMutex); - unsigned int score = 0; - if (!ramMax){ + if (!ramMax || !availBandwidth){ + WARN_MSG("Host %s invalid: RAM %llu, BW %llu", host.c_str(), ramMax, availBandwidth); return 0; } - //First, add current CPU/RAM left to the score, on a scale from 0 to 1000. - score += (1000 - cpu) + (1000 - ((ramCurr * 1000) / ramMax)); - //Next, we add 200 points if the stream is already available. - if (streams.count(s)){score += 200;} - //Finally, account for bandwidth. We again scale from 0 to 1000 where 1000 is perfect. - long long bwscore = (1000 - ((upSpeed * 1000) / availBandwidth)); - if (bwscore < 0){bwscore = 0;} - long long bw_sub = ((addBandwidth * 1000) / availBandwidth); - if (bwscore - bw_sub < 0){bw_sub = bwscore;} - if (bwscore - bw_sub > 0){ - score += (bwscore - bw_sub); - }else{ - score = 0; + if (upSpeed >= availBandwidth || (upSpeed + addBandwidth) >= availBandwidth){ + INFO_MSG("Host %s over bandwidth: %llu+%llu >= %llu", host.c_str(), upSpeed, addBandwidth, availBandwidth); + return 0; } - MEDIUM_MSG("CPU %u, RAM %u, Stream %u, BW %u (-%u) (max %llu MB/s) -> %u", 1000-cpu, 1000-((ramCurr * 1000) / ramMax), streams.count(s)?200:0, bwscore, bw_sub, availBandwidth / 1024 / 1024, score); + //Calculate score + unsigned int cpu_score = (weight_cpu - (cpu*weight_cpu)/1000); + unsigned int ram_score = (weight_ram - ((ramCurr * weight_ram) / ramMax)); + unsigned int bw_score = (weight_bw - (((upSpeed + addBandwidth) * weight_bw) / availBandwidth)); + unsigned int score = cpu_score + ram_score + bw_score + (streams.count(s)?weight_bonus:0); + //Print info on host + MEDIUM_MSG("%s: CPU %u, RAM %u, Stream %u, BW %u (max %llu MB/s) -> %u", host.c_str(), cpu_score, ram_score, streams.count(s)?weight_bonus:0, bw_score, availBandwidth / 1024 / 1024, score); return score; } void addViewer(std::string & s){ @@ -125,7 +132,7 @@ class hostDetails{ if (total){ toAdd = (upSpeed + downSpeed) / total; }else{ - toAdd = (upSpeed + downSpeed) + 100000; + toAdd = 131072;//assume 1mbps } } //ensure reasonable limits of bandwidth guesses @@ -199,7 +206,7 @@ class hostDetails{ }else{ streams.clear(); } - addBandwidth *= 0.9; + addBandwidth *= 0.75; } }; @@ -242,6 +249,12 @@ int handleRequest(Socket::Connection & conn){ } } std::string stream = H.url.substr(1); + if (stream == "favicon.ico"){ + H.Clean(); + H.SendResponse("404", "No favicon", conn); + H.Clean(); + continue; + } INFO_MSG("Balancing stream %s", stream.c_str()); H.Clean(); H.SetHeader("Content-Type", "text/plain"); @@ -253,7 +266,6 @@ int handleRequest(Socket::Connection & conn){ bestHost = it->first; bestScore = score; } - INFO_MSG("%s scores %u", it->first.c_str(), score); } if (bestScore == 0){ bestHost = "FULL"; @@ -295,6 +307,7 @@ void handleServer(void * servName){ } hosts[host].availBandwidth = bandwidth.asInt(); + hosts[host].host = host; INFO_MSG("Monitoring %s on port %d.", host.c_str(), port, passphrase.c_str()); bool down = true; @@ -302,11 +315,11 @@ void handleServer(void * servName){ Socket::Connection servConn(host, port, false); while (cfg->is_active){ if (!servConn){ - WARN_MSG("Reconnecting to %s", host.c_str()); + HIGH_MSG("Reconnecting to %s", host.c_str()); servConn = Socket::Connection(host, port, false); } if (!servConn){ - FAIL_MSG("Can't reach server %s", host.c_str()); + MEDIUM_MSG("Can't reach server %s", host.c_str()); hosts[host].badNess(); Util::wait(5000); down = true; @@ -320,7 +333,8 @@ void handleServer(void * servName){ H.Clean(); unsigned int startTime = Util::epoch(); while (cfg->is_active && servConn && !((servConn.spool() || servConn.Received().size()) && H.Read(servConn))){ - if (Util::epoch() - startTime > 2){ + if (Util::epoch() - startTime > 10){ + FAIL_MSG("Server %s timed out", host.c_str()); servConn.close(); H.Clean(); } @@ -329,6 +343,7 @@ void handleServer(void * servName){ JSON::Value servData = JSON::fromString(H.body); if (!servData){ FAIL_MSG("Can't retrieve server %s load information", host.c_str()); + std::cerr << H.body << std::endl; down = true; hosts[host].badNess(); servConn.close(); @@ -384,9 +399,41 @@ int main(int argc, char ** argv){ opt["value"][0u] = "root"; conf.addOption("username", opt); + opt["arg"] = "integer"; + opt["short"] = "R"; + opt["long"] = "ram"; + opt["help"] = "Weight for RAM scoring"; + opt["value"].append((long long)weight_ram); + conf.addOption("ram", opt); + + opt["arg"] = "integer"; + opt["short"] = "C"; + opt["long"] = "cpu"; + opt["help"] = "Weight for CPU scoring"; + opt["value"].append((long long)weight_cpu); + conf.addOption("cpu", opt); + + opt["arg"] = "integer"; + opt["short"] = "B"; + opt["long"] = "bw"; + opt["help"] = "Weight for BW scoring"; + opt["value"].append((long long)weight_bw); + conf.addOption("bw", opt); + + opt["arg"] = "integer"; + opt["short"] = "X"; + opt["long"] = "extra"; + opt["help"] = "Weight for extra scoring when stream exists"; + opt["value"].append((long long)weight_bonus); + conf.addOption("extra", opt); + conf.parseArgs(argc, argv); passphrase = conf.getOption("passphrase").asStringRef(); + weight_ram = conf.getInteger("ram"); + weight_cpu = conf.getInteger("cpu"); + weight_bw = conf.getInteger("bw"); + weight_bonus = conf.getInteger("extra"); JSON::Value & nodes = conf.getOption("server", true); WARN_MSG("Load balancer activating. Balancing between %llu nodes.", nodes.size());