Load balancer improvements / source selection
This commit is contained in:
parent
055c87a2b3
commit
2c5b5c0460
1 changed files with 64 additions and 12 deletions
|
@ -19,9 +19,10 @@ unsigned int weight_bw = 1000;
|
||||||
unsigned int weight_bonus = 50;
|
unsigned int weight_bonus = 50;
|
||||||
|
|
||||||
struct streamDetails{
|
struct streamDetails{
|
||||||
unsigned int total;
|
uint32_t total;
|
||||||
unsigned int bandwidth;
|
uint32_t inputs;
|
||||||
unsigned long long prevTotal;
|
uint32_t bandwidth;
|
||||||
|
uint32_t prevTotal;
|
||||||
};
|
};
|
||||||
|
|
||||||
class hostDetails{
|
class hostDetails{
|
||||||
|
@ -124,6 +125,29 @@ class hostDetails{
|
||||||
MEDIUM_MSG("%s: CPU %u, RAM %u, Stream %u, BW %u (max %llu MB/s) -> %u", host.c_str(), cpu_score, ram_score, streams.count(s)?weight_bonus:0, bw_score, availBandwidth / 1024 / 1024, score);
|
MEDIUM_MSG("%s: CPU %u, RAM %u, Stream %u, BW %u (max %llu MB/s) -> %u", host.c_str(), cpu_score, ram_score, streams.count(s)?weight_bonus:0, bw_score, availBandwidth / 1024 / 1024, score);
|
||||||
return score;
|
return score;
|
||||||
}
|
}
|
||||||
|
///Scores this server as a source
|
||||||
|
///0 means not possible, the higher the better.
|
||||||
|
unsigned int source(std::string & s){
|
||||||
|
if (!hostMutex){hostMutex = new tthread::mutex();}
|
||||||
|
tthread::lock_guard<tthread::mutex> guard(*hostMutex);
|
||||||
|
if (!streams.count(s) || !streams[s].inputs){return 0;}
|
||||||
|
if (!ramMax || !availBandwidth){
|
||||||
|
WARN_MSG("Host %s invalid: RAM %llu, BW %llu", host.c_str(), ramMax, availBandwidth);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
if (upSpeed >= availBandwidth || (upSpeed + addBandwidth) >= availBandwidth){
|
||||||
|
INFO_MSG("Host %s over bandwidth: %llu+%llu >= %llu", host.c_str(), upSpeed, addBandwidth, availBandwidth);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
//Calculate score
|
||||||
|
unsigned int cpu_score = (weight_cpu - (cpu*weight_cpu)/1000);
|
||||||
|
unsigned int ram_score = (weight_ram - ((ramCurr * weight_ram) / ramMax));
|
||||||
|
unsigned int bw_score = (weight_bw - (((upSpeed + addBandwidth) * weight_bw) / availBandwidth));
|
||||||
|
unsigned int score = cpu_score + ram_score + bw_score + 1;
|
||||||
|
//Print info on host
|
||||||
|
MEDIUM_MSG("SOURCE %s: CPU %u, RAM %u, Stream %u, BW %u (max %llu MB/s) -> %u", host.c_str(), cpu_score, ram_score, streams.count(s)?weight_bonus:0, bw_score, availBandwidth / 1024 / 1024, score);
|
||||||
|
return score;
|
||||||
|
}
|
||||||
void addViewer(std::string & s){
|
void addViewer(std::string & s){
|
||||||
if (!hostMutex){hostMutex = new tthread::mutex();}
|
if (!hostMutex){hostMutex = new tthread::mutex();}
|
||||||
tthread::lock_guard<tthread::mutex> guard(*hostMutex);
|
tthread::lock_guard<tthread::mutex> guard(*hostMutex);
|
||||||
|
@ -184,6 +208,7 @@ class hostDetails{
|
||||||
}
|
}
|
||||||
struct streamDetails & strm = streams[it.key()];
|
struct streamDetails & strm = streams[it.key()];
|
||||||
strm.total = (*it)["curr"][0u].asInt();
|
strm.total = (*it)["curr"][0u].asInt();
|
||||||
|
strm.inputs = (*it)["curr"][1u].asInt();
|
||||||
unsigned long long currTotal = (*it)["bw"][0u].asInt() + (*it)["bw"][1u].asInt();
|
unsigned long long currTotal = (*it)["bw"][0u].asInt() + (*it)["bw"][1u].asInt();
|
||||||
if (timeDiff && count){
|
if (timeDiff && count){
|
||||||
strm.bandwidth = ((currTotal - strm.prevTotal) / timeDiff) / count;
|
strm.bandwidth = ((currTotal - strm.prevTotal) / timeDiff) / count;
|
||||||
|
@ -225,6 +250,8 @@ int handleRequest(Socket::Connection & conn){
|
||||||
std::string host = H.GetVar("host");
|
std::string host = H.GetVar("host");
|
||||||
std::string stream = H.GetVar("stream");
|
std::string stream = H.GetVar("stream");
|
||||||
std::string viewers = H.GetVar("viewers");
|
std::string viewers = H.GetVar("viewers");
|
||||||
|
std::string source = H.GetVar("source");
|
||||||
|
std::string fback = H.GetVar("fallback");
|
||||||
H.Clean();
|
H.Clean();
|
||||||
H.SetHeader("Content-Type", "text/plain");
|
H.SetHeader("Content-Type", "text/plain");
|
||||||
JSON::Value ret;
|
JSON::Value ret;
|
||||||
|
@ -236,7 +263,33 @@ int handleRequest(Socket::Connection & conn){
|
||||||
H.SendResponse("200", "OK", conn);
|
H.SendResponse("200", "OK", conn);
|
||||||
H.Clean();
|
H.Clean();
|
||||||
continue;
|
continue;
|
||||||
|
}
|
||||||
|
if (source.size()){
|
||||||
|
INFO_MSG("Finding source for stream %s", source.c_str());
|
||||||
|
std::string bestHost = "";
|
||||||
|
unsigned int bestScore = 0;
|
||||||
|
for (std::map<std::string, hostDetails>::iterator it = hosts.begin(); it != hosts.end(); ++it){
|
||||||
|
unsigned int score = it->second.source(source);
|
||||||
|
if (score > bestScore){
|
||||||
|
bestHost = "dtsc://"+it->first;
|
||||||
|
bestScore = score;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (bestScore == 0){
|
||||||
|
if (fback.size()){
|
||||||
|
bestHost = fback;
|
||||||
}else{
|
}else{
|
||||||
|
bestHost = fallback;
|
||||||
|
}
|
||||||
|
FAIL_MSG("No source for %s found!", source.c_str());
|
||||||
|
}else{
|
||||||
|
INFO_MSG("Winner: %s scores %u", bestHost.c_str(), bestScore);
|
||||||
|
}
|
||||||
|
H.SetBody(bestHost);
|
||||||
|
H.SendResponse("200", "OK", conn);
|
||||||
|
H.Clean();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if (!host.size() && !stream.size()){
|
if (!host.size() && !stream.size()){
|
||||||
for (std::map<std::string, hostDetails>::iterator it = hosts.begin(); it != hosts.end(); ++it){
|
for (std::map<std::string, hostDetails>::iterator it = hosts.begin(); it != hosts.end(); ++it){
|
||||||
it->second.fillState(ret[it->first]);
|
it->second.fillState(ret[it->first]);
|
||||||
|
@ -251,7 +304,6 @@ int handleRequest(Socket::Connection & conn){
|
||||||
H.Clean();
|
H.Clean();
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
std::string stream = H.url.substr(1);
|
std::string stream = H.url.substr(1);
|
||||||
if (stream == "favicon.ico"){
|
if (stream == "favicon.ico"){
|
||||||
H.Clean();
|
H.Clean();
|
||||||
|
|
Loading…
Add table
Reference in a new issue