Load balancer fixes/improvements made during paper writing.

This commit is contained in:
Thulinma 2016-07-20 11:07:34 +02:00
parent 8219ac15ca
commit b608f011e3

View file

@ -23,8 +23,8 @@ class hostDetails{
tthread::mutex * hostMutex; tthread::mutex * hostMutex;
std::map<std::string, struct streamDetails> streams; std::map<std::string, struct streamDetails> streams;
unsigned int cpu; unsigned int cpu;
unsigned int ramMax; unsigned long long ramMax;
unsigned int ramCurr; unsigned long long ramCurr;
unsigned int upSpeed; unsigned int upSpeed;
unsigned int downSpeed; unsigned int downSpeed;
unsigned int total; unsigned int total;
@ -95,29 +95,47 @@ class hostDetails{
//Next, we add 200 points if the stream is already available. //Next, we add 200 points if the stream is already available.
if (streams.count(s)){score += 200;} if (streams.count(s)){score += 200;}
//Finally, account for bandwidth. We again scale from 0 to 1000 where 1000 is perfect. //Finally, account for bandwidth. We again scale from 0 to 1000 where 1000 is perfect.
score += (1000 - (((upSpeed + addBandwidth) * 1000) / availBandwidth)); long long bwscore = (1000 - ((upSpeed * 1000) / availBandwidth));
MEDIUM_MSG("CPU %u, RAM %u, Stream %u, BW %u (-%u) (max %llu MB/s)", 1000-cpu, 1000-((ramCurr * 1000) / ramMax), streams.count(s)?200:0, (1000 - ((upSpeed * 1000) / availBandwidth)), (addBandwidth * 1000)/availBandwidth, availBandwidth / 1024 / 1024); if (bwscore < 0){bwscore = 0;}
long long bw_sub = ((addBandwidth * 1000) / availBandwidth);
if (bwscore - bw_sub < 0){bw_sub = bwscore;}
score += (bwscore - bw_sub);
MEDIUM_MSG("CPU %u, RAM %u, Stream %u, BW %u (-%u) (max %llu MB/s)", 1000-cpu, 1000-((ramCurr * 1000) / ramMax), streams.count(s)?200:0, bwscore, bw_sub, availBandwidth / 1024 / 1024);
return score; return score;
} }
void addViewer(std::string & s){ void addViewer(std::string & s){
if (!hostMutex){hostMutex = new tthread::mutex();} if (!hostMutex){hostMutex = new tthread::mutex();}
tthread::lock_guard<tthread::mutex> guard(*hostMutex); tthread::lock_guard<tthread::mutex> guard(*hostMutex);
unsigned long long toAdd = 0;
if (streams.count(s)){ if (streams.count(s)){
addBandwidth += streams[s].bandwidth; toAdd = streams[s].bandwidth;
}else{ }else{
if (total){ if (total){
addBandwidth += (upSpeed + downSpeed) / total; toAdd = (upSpeed + downSpeed) / total;
}else{ }else{
addBandwidth += (upSpeed + downSpeed) + 100000; toAdd = (upSpeed + downSpeed) + 100000;
} }
} }
//ensure reasonable limits of bandwidth guesses
if (toAdd < 64*1024){toAdd = 64*1024;}//minimum of 0.5 mbps
if (toAdd > 1024*1024){toAdd = 1024*1024;}//maximum of 8 mbps
addBandwidth += toAdd;
} }
void update(JSON::Value & d){ void update(JSON::Value & d){
if (!hostMutex){hostMutex = new tthread::mutex();} if (!hostMutex){hostMutex = new tthread::mutex();}
tthread::lock_guard<tthread::mutex> guard(*hostMutex); tthread::lock_guard<tthread::mutex> guard(*hostMutex);
cpu = d["cpu"].asInt(); cpu = d["cpu"].asInt();
ramMax = d["mem_total"].asInt(); long long nRamMax = d["mem_total"].asInt();
ramCurr = d["mem_used"].asInt(); long long nRamCur = d["mem_used"].asInt();
long long nShmMax = d["shm_total"].asInt();
long long nShmCur = d["shm_used"].asInt();
if (((nRamCur + nShmCur)*1000) / nRamMax > (nShmCur*1000) / nShmMax){
ramMax = nRamMax;
ramCurr = nRamCur + nShmCur;
}else{
ramMax = nShmMax;
ramCurr = nShmCur;
}
total = d["curr"][0u].asInt(); total = d["curr"][0u].asInt();
unsigned long long currUp = d["bw"][0u].asInt(), currDown = d["bw"][1u].asInt(); unsigned long long currUp = d["bw"][0u].asInt(), currDown = d["bw"][1u].asInt();
unsigned int timeDiff = 0; unsigned int timeDiff = 0;
@ -155,6 +173,19 @@ class hostDetails{
} }
strm.prevTotal = currTotal; strm.prevTotal = currTotal;
} }
if (streams.size()){
std::set<std::string> eraseList;
for (std::map<std::string, struct streamDetails>::iterator it = streams.begin(); it != streams.end(); ++it){
if (!d["streams"].isMember(it->first)){
eraseList.insert(it->first);
}
}
for (std::set<std::string>::iterator it = eraseList.begin(); it != eraseList.end(); ++it){
streams.erase(*it);
}
}
}else{
streams.clear();
} }
addBandwidth *= 0.9; addBandwidth *= 0.9;
} }
@ -215,6 +246,7 @@ int handleRequest(Socket::Connection & conn){
}//if HTTP request received }//if HTTP request received
} }
conn.close(); conn.close();
return 0;
} }
void handleServer(void * servName){ void handleServer(void * servName){
@ -263,8 +295,12 @@ void handleServer(void * servName){
H.SendRequest(servConn); H.SendRequest(servConn);
H.Clean(); H.Clean();
unsigned int startTime = Util::epoch(); unsigned int startTime = Util::epoch();
while (servConn && !((servConn.spool() || servConn.Received().size()) && H.Read(servConn))){ while (cfg->is_active && servConn && !((servConn.spool() || servConn.Received().size()) && H.Read(servConn))){
Util::sleep(100); if (Util::epoch() - startTime > 2){
servConn.close();
H.Clean();
}
Util::sleep(250);
} }
JSON::Value servData = JSON::fromString(H.body); JSON::Value servData = JSON::fromString(H.body);
if (!servData){ if (!servData){