Added variable weights to the load balancer, improved timeout behaviour and verbosity
This commit is contained in:
parent
ba7467979a
commit
cbfb3d62bf
1 changed files with 71 additions and 24 deletions
|
@ -11,6 +11,10 @@
|
||||||
|
|
||||||
Util::Config * cfg = 0;
|
Util::Config * cfg = 0;
|
||||||
std::string passphrase;
|
std::string passphrase;
|
||||||
|
unsigned int weight_cpu = 500;
|
||||||
|
unsigned int weight_ram = 500;
|
||||||
|
unsigned int weight_bw = 1000;
|
||||||
|
unsigned int weight_bonus = 50;
|
||||||
|
|
||||||
struct streamDetails{
|
struct streamDetails{
|
||||||
unsigned int total;
|
unsigned int total;
|
||||||
|
@ -33,6 +37,7 @@ class hostDetails{
|
||||||
unsigned long long prevTime;
|
unsigned long long prevTime;
|
||||||
unsigned long long addBandwidth;
|
unsigned long long addBandwidth;
|
||||||
public:
|
public:
|
||||||
|
std::string host;
|
||||||
unsigned long long availBandwidth;
|
unsigned long long availBandwidth;
|
||||||
hostDetails(){
|
hostDetails(){
|
||||||
hostMutex = 0;
|
hostMutex = 0;
|
||||||
|
@ -77,9 +82,15 @@ class hostDetails{
|
||||||
r["cpu"] = (long long)(cpu/10);
|
r["cpu"] = (long long)(cpu/10);
|
||||||
if (ramMax){r["ram"] = (long long)((ramCurr*100) / ramMax);}
|
if (ramMax){r["ram"] = (long long)((ramCurr*100) / ramMax);}
|
||||||
r["up"] = (long long)upSpeed;
|
r["up"] = (long long)upSpeed;
|
||||||
|
r["up_add"] = (long long)addBandwidth;
|
||||||
r["down"] = (long long)downSpeed;
|
r["down"] = (long long)downSpeed;
|
||||||
r["streams"] = (long long)streams.size();
|
r["streams"] = (long long)streams.size();
|
||||||
r["viewers"] = (long long)total;
|
r["viewers"] = (long long)total;
|
||||||
|
if (ramMax && availBandwidth){
|
||||||
|
r["score"]["cpu"] = (long long)(weight_cpu - (cpu*weight_cpu)/1000);
|
||||||
|
r["score"]["ram"] = (long long)(weight_ram - ((ramCurr * weight_ram) / ramMax));
|
||||||
|
r["score"]["bw"] = (long long)(weight_bw - (((upSpeed + addBandwidth) * weight_bw) / availBandwidth));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
///Fills out a by reference given JSON::Value with current streams.
|
///Fills out a by reference given JSON::Value with current streams.
|
||||||
void fillStreams(JSON::Value & r){
|
void fillStreams(JSON::Value & r){
|
||||||
|
@ -89,30 +100,26 @@ class hostDetails{
|
||||||
r[jt->first] = r[jt->first].asInt() + jt->second.total;
|
r[jt->first] = r[jt->first].asInt() + jt->second.total;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
///Scores a potential new connection to this server, on a scale from 0 to 3200.
|
///Scores a potential new connection to this server
|
||||||
///0 is horrible, 3200 is perfect.
|
///0 means not possible, the higher the better.
|
||||||
unsigned int rate(std::string & s){
|
unsigned int rate(std::string & s){
|
||||||
if (!hostMutex){hostMutex = new tthread::mutex();}
|
if (!hostMutex){hostMutex = new tthread::mutex();}
|
||||||
tthread::lock_guard<tthread::mutex> guard(*hostMutex);
|
tthread::lock_guard<tthread::mutex> guard(*hostMutex);
|
||||||
unsigned int score = 0;
|
if (!ramMax || !availBandwidth){
|
||||||
if (!ramMax){
|
WARN_MSG("Host %s invalid: RAM %llu, BW %llu", host.c_str(), ramMax, availBandwidth);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
//First, add current CPU/RAM left to the score, on a scale from 0 to 1000.
|
if (upSpeed >= availBandwidth || (upSpeed + addBandwidth) >= availBandwidth){
|
||||||
score += (1000 - cpu) + (1000 - ((ramCurr * 1000) / ramMax));
|
INFO_MSG("Host %s over bandwidth: %llu+%llu >= %llu", host.c_str(), upSpeed, addBandwidth, availBandwidth);
|
||||||
//Next, we add 200 points if the stream is already available.
|
return 0;
|
||||||
if (streams.count(s)){score += 200;}
|
|
||||||
//Finally, account for bandwidth. We again scale from 0 to 1000 where 1000 is perfect.
|
|
||||||
long long bwscore = (1000 - ((upSpeed * 1000) / availBandwidth));
|
|
||||||
if (bwscore < 0){bwscore = 0;}
|
|
||||||
long long bw_sub = ((addBandwidth * 1000) / availBandwidth);
|
|
||||||
if (bwscore - bw_sub < 0){bw_sub = bwscore;}
|
|
||||||
if (bwscore - bw_sub > 0){
|
|
||||||
score += (bwscore - bw_sub);
|
|
||||||
}else{
|
|
||||||
score = 0;
|
|
||||||
}
|
}
|
||||||
MEDIUM_MSG("CPU %u, RAM %u, Stream %u, BW %u (-%u) (max %llu MB/s) -> %u", 1000-cpu, 1000-((ramCurr * 1000) / ramMax), streams.count(s)?200:0, bwscore, bw_sub, availBandwidth / 1024 / 1024, score);
|
//Calculate score
|
||||||
|
unsigned int cpu_score = (weight_cpu - (cpu*weight_cpu)/1000);
|
||||||
|
unsigned int ram_score = (weight_ram - ((ramCurr * weight_ram) / ramMax));
|
||||||
|
unsigned int bw_score = (weight_bw - (((upSpeed + addBandwidth) * weight_bw) / availBandwidth));
|
||||||
|
unsigned int score = cpu_score + ram_score + bw_score + (streams.count(s)?weight_bonus:0);
|
||||||
|
//Print info on host
|
||||||
|
MEDIUM_MSG("%s: CPU %u, RAM %u, Stream %u, BW %u (max %llu MB/s) -> %u", host.c_str(), cpu_score, ram_score, streams.count(s)?weight_bonus:0, bw_score, availBandwidth / 1024 / 1024, score);
|
||||||
return score;
|
return score;
|
||||||
}
|
}
|
||||||
void addViewer(std::string & s){
|
void addViewer(std::string & s){
|
||||||
|
@ -125,7 +132,7 @@ class hostDetails{
|
||||||
if (total){
|
if (total){
|
||||||
toAdd = (upSpeed + downSpeed) / total;
|
toAdd = (upSpeed + downSpeed) / total;
|
||||||
}else{
|
}else{
|
||||||
toAdd = (upSpeed + downSpeed) + 100000;
|
toAdd = 131072;//assume 1mbps
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
//ensure reasonable limits of bandwidth guesses
|
//ensure reasonable limits of bandwidth guesses
|
||||||
|
@ -199,7 +206,7 @@ class hostDetails{
|
||||||
}else{
|
}else{
|
||||||
streams.clear();
|
streams.clear();
|
||||||
}
|
}
|
||||||
addBandwidth *= 0.9;
|
addBandwidth *= 0.75;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -242,6 +249,12 @@ int handleRequest(Socket::Connection & conn){
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
std::string stream = H.url.substr(1);
|
std::string stream = H.url.substr(1);
|
||||||
|
if (stream == "favicon.ico"){
|
||||||
|
H.Clean();
|
||||||
|
H.SendResponse("404", "No favicon", conn);
|
||||||
|
H.Clean();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
INFO_MSG("Balancing stream %s", stream.c_str());
|
INFO_MSG("Balancing stream %s", stream.c_str());
|
||||||
H.Clean();
|
H.Clean();
|
||||||
H.SetHeader("Content-Type", "text/plain");
|
H.SetHeader("Content-Type", "text/plain");
|
||||||
|
@ -253,7 +266,6 @@ int handleRequest(Socket::Connection & conn){
|
||||||
bestHost = it->first;
|
bestHost = it->first;
|
||||||
bestScore = score;
|
bestScore = score;
|
||||||
}
|
}
|
||||||
INFO_MSG("%s scores %u", it->first.c_str(), score);
|
|
||||||
}
|
}
|
||||||
if (bestScore == 0){
|
if (bestScore == 0){
|
||||||
bestHost = "FULL";
|
bestHost = "FULL";
|
||||||
|
@ -295,6 +307,7 @@ void handleServer(void * servName){
|
||||||
}
|
}
|
||||||
|
|
||||||
hosts[host].availBandwidth = bandwidth.asInt();
|
hosts[host].availBandwidth = bandwidth.asInt();
|
||||||
|
hosts[host].host = host;
|
||||||
|
|
||||||
INFO_MSG("Monitoring %s on port %d.", host.c_str(), port, passphrase.c_str());
|
INFO_MSG("Monitoring %s on port %d.", host.c_str(), port, passphrase.c_str());
|
||||||
bool down = true;
|
bool down = true;
|
||||||
|
@ -302,11 +315,11 @@ void handleServer(void * servName){
|
||||||
Socket::Connection servConn(host, port, false);
|
Socket::Connection servConn(host, port, false);
|
||||||
while (cfg->is_active){
|
while (cfg->is_active){
|
||||||
if (!servConn){
|
if (!servConn){
|
||||||
WARN_MSG("Reconnecting to %s", host.c_str());
|
HIGH_MSG("Reconnecting to %s", host.c_str());
|
||||||
servConn = Socket::Connection(host, port, false);
|
servConn = Socket::Connection(host, port, false);
|
||||||
}
|
}
|
||||||
if (!servConn){
|
if (!servConn){
|
||||||
FAIL_MSG("Can't reach server %s", host.c_str());
|
MEDIUM_MSG("Can't reach server %s", host.c_str());
|
||||||
hosts[host].badNess();
|
hosts[host].badNess();
|
||||||
Util::wait(5000);
|
Util::wait(5000);
|
||||||
down = true;
|
down = true;
|
||||||
|
@ -320,7 +333,8 @@ void handleServer(void * servName){
|
||||||
H.Clean();
|
H.Clean();
|
||||||
unsigned int startTime = Util::epoch();
|
unsigned int startTime = Util::epoch();
|
||||||
while (cfg->is_active && servConn && !((servConn.spool() || servConn.Received().size()) && H.Read(servConn))){
|
while (cfg->is_active && servConn && !((servConn.spool() || servConn.Received().size()) && H.Read(servConn))){
|
||||||
if (Util::epoch() - startTime > 2){
|
if (Util::epoch() - startTime > 10){
|
||||||
|
FAIL_MSG("Server %s timed out", host.c_str());
|
||||||
servConn.close();
|
servConn.close();
|
||||||
H.Clean();
|
H.Clean();
|
||||||
}
|
}
|
||||||
|
@ -329,6 +343,7 @@ void handleServer(void * servName){
|
||||||
JSON::Value servData = JSON::fromString(H.body);
|
JSON::Value servData = JSON::fromString(H.body);
|
||||||
if (!servData){
|
if (!servData){
|
||||||
FAIL_MSG("Can't retrieve server %s load information", host.c_str());
|
FAIL_MSG("Can't retrieve server %s load information", host.c_str());
|
||||||
|
std::cerr << H.body << std::endl;
|
||||||
down = true;
|
down = true;
|
||||||
hosts[host].badNess();
|
hosts[host].badNess();
|
||||||
servConn.close();
|
servConn.close();
|
||||||
|
@ -384,9 +399,41 @@ int main(int argc, char ** argv){
|
||||||
opt["value"][0u] = "root";
|
opt["value"][0u] = "root";
|
||||||
conf.addOption("username", opt);
|
conf.addOption("username", opt);
|
||||||
|
|
||||||
|
opt["arg"] = "integer";
|
||||||
|
opt["short"] = "R";
|
||||||
|
opt["long"] = "ram";
|
||||||
|
opt["help"] = "Weight for RAM scoring";
|
||||||
|
opt["value"].append((long long)weight_ram);
|
||||||
|
conf.addOption("ram", opt);
|
||||||
|
|
||||||
|
opt["arg"] = "integer";
|
||||||
|
opt["short"] = "C";
|
||||||
|
opt["long"] = "cpu";
|
||||||
|
opt["help"] = "Weight for CPU scoring";
|
||||||
|
opt["value"].append((long long)weight_cpu);
|
||||||
|
conf.addOption("cpu", opt);
|
||||||
|
|
||||||
|
opt["arg"] = "integer";
|
||||||
|
opt["short"] = "B";
|
||||||
|
opt["long"] = "bw";
|
||||||
|
opt["help"] = "Weight for BW scoring";
|
||||||
|
opt["value"].append((long long)weight_bw);
|
||||||
|
conf.addOption("bw", opt);
|
||||||
|
|
||||||
|
opt["arg"] = "integer";
|
||||||
|
opt["short"] = "X";
|
||||||
|
opt["long"] = "extra";
|
||||||
|
opt["help"] = "Weight for extra scoring when stream exists";
|
||||||
|
opt["value"].append((long long)weight_bonus);
|
||||||
|
conf.addOption("extra", opt);
|
||||||
|
|
||||||
conf.parseArgs(argc, argv);
|
conf.parseArgs(argc, argv);
|
||||||
|
|
||||||
passphrase = conf.getOption("passphrase").asStringRef();
|
passphrase = conf.getOption("passphrase").asStringRef();
|
||||||
|
weight_ram = conf.getInteger("ram");
|
||||||
|
weight_cpu = conf.getInteger("cpu");
|
||||||
|
weight_bw = conf.getInteger("bw");
|
||||||
|
weight_bonus = conf.getInteger("extra");
|
||||||
|
|
||||||
JSON::Value & nodes = conf.getOption("server", true);
|
JSON::Value & nodes = conf.getOption("server", true);
|
||||||
WARN_MSG("Load balancer activating. Balancing between %llu nodes.", nodes.size());
|
WARN_MSG("Load balancer activating. Balancing between %llu nodes.", nodes.size());
|
||||||
|
|
Loading…
Add table
Reference in a new issue