Load balancer can now on-the-fly add/remove servers and change weights, load balancer now reads bandwidth limits from servers, if provided

This commit is contained in:
Thulinma 2017-06-24 19:06:00 +02:00
parent 4a5599f802
commit 0fc40fb5a2

View file

@ -1,13 +1,14 @@
#include <cstdint>
#include <cstdlib> #include <cstdlib>
#include <iostream>
#include <fstream> #include <fstream>
#include <string> #include <iostream>
#include <set>
#include <mist/config.h> #include <mist/config.h>
#include <mist/defines.h> #include <mist/defines.h>
#include <mist/timing.h>
#include <mist/http_parser.h> #include <mist/http_parser.h>
#include <mist/timing.h>
#include <mist/tinythread.h> #include <mist/tinythread.h>
#include <set>
#include <string>
Util::Config *cfg = 0; Util::Config *cfg = 0;
std::string passphrase; std::string passphrase;
@ -17,6 +18,22 @@ unsigned int weight_cpu = 500;
unsigned int weight_ram = 500; unsigned int weight_ram = 500;
unsigned int weight_bw = 1000; unsigned int weight_bw = 1000;
unsigned int weight_bonus = 50; unsigned int weight_bonus = 50;
char *hostsCounter = 0; // This is a pointer to guarantee atomic accesses.
#define HOSTLOOP \
unsigned long i = 0; \
i < reinterpret_cast<std::uintptr_t>(hostsCounter); \
++i
#define HOST(no) (hosts[reinterpret_cast<std::uintptr_t>(no)])
#define HOSTCHECK \
if (hosts[i].state != STATE_ONLINE){continue;}
#define STATE_OFF 0
#define STATE_BOOT 1
#define STATE_ONLINE 2
#define STATE_GODOWN 3
#define STATE_REQCLEAN 4
const char *stateLookup[] ={"Offline", "Starting monitoring", "Monitored", "Requesting stop",
"Requesting clean"};
struct streamDetails{ struct streamDetails{
uint32_t total; uint32_t total;
@ -39,6 +56,7 @@ class hostDetails{
unsigned long long downPrev; unsigned long long downPrev;
unsigned long long prevTime; unsigned long long prevTime;
unsigned long long addBandwidth; unsigned long long addBandwidth;
public: public:
std::string host; std::string host;
unsigned long long availBandwidth; unsigned long long availBandwidth;
@ -89,17 +107,20 @@ class hostDetails{
r["down"] = (long long)downSpeed; r["down"] = (long long)downSpeed;
r["streams"] = (long long)streams.size(); r["streams"] = (long long)streams.size();
r["viewers"] = (long long)total; r["viewers"] = (long long)total;
r["bwlimit"] = (long long)availBandwidth;
if (ramMax && availBandwidth){ if (ramMax && availBandwidth){
r["score"]["cpu"] = (long long)(weight_cpu - (cpu * weight_cpu) / 1000); r["score"]["cpu"] = (long long)(weight_cpu - (cpu * weight_cpu) / 1000);
r["score"]["ram"] = (long long)(weight_ram - ((ramCurr * weight_ram) / ramMax)); r["score"]["ram"] = (long long)(weight_ram - ((ramCurr * weight_ram) / ramMax));
r["score"]["bw"] = (long long)(weight_bw - (((upSpeed + addBandwidth) * weight_bw) / availBandwidth)); r["score"]["bw"] =
(long long)(weight_bw - (((upSpeed + addBandwidth) * weight_bw) / availBandwidth));
} }
} }
/// Fills out a by reference given JSON::Value with current streams. /// Fills out a by reference given JSON::Value with current streams.
void fillStreams(JSON::Value &r){ void fillStreams(JSON::Value &r){
if (!hostMutex){hostMutex = new tthread::mutex();} if (!hostMutex){hostMutex = new tthread::mutex();}
tthread::lock_guard<tthread::mutex> guard(*hostMutex); tthread::lock_guard<tthread::mutex> guard(*hostMutex);
for (std::map<std::string, struct streamDetails>::iterator jt = streams.begin(); jt != streams.end(); ++jt){ for (std::map<std::string, struct streamDetails>::iterator jt = streams.begin();
jt != streams.end(); ++jt){
r[jt->first] = r[jt->first].asInt() + jt->second.total; r[jt->first] = r[jt->first].asInt() + jt->second.total;
} }
} }
@ -113,7 +134,8 @@ class hostDetails{
return 0; return 0;
} }
if (upSpeed >= availBandwidth || (upSpeed + addBandwidth) >= availBandwidth){ if (upSpeed >= availBandwidth || (upSpeed + addBandwidth) >= availBandwidth){
INFO_MSG("Host %s over bandwidth: %llu+%llu >= %llu", host.c_str(), upSpeed, addBandwidth, availBandwidth); INFO_MSG("Host %s over bandwidth: %llu+%llu >= %llu", host.c_str(), upSpeed, addBandwidth,
availBandwidth);
return 0; return 0;
} }
// Calculate score // Calculate score
@ -122,7 +144,9 @@ class hostDetails{
unsigned int bw_score = (weight_bw - (((upSpeed + addBandwidth) * weight_bw) / availBandwidth)); unsigned int bw_score = (weight_bw - (((upSpeed + addBandwidth) * weight_bw) / availBandwidth));
unsigned int score = cpu_score + ram_score + bw_score + (streams.count(s) ? weight_bonus : 0); unsigned int score = cpu_score + ram_score + bw_score + (streams.count(s) ? weight_bonus : 0);
// Print info on host // Print info on host
MEDIUM_MSG("%s: CPU %u, RAM %u, Stream %u, BW %u (max %llu MB/s) -> %u", host.c_str(), cpu_score, ram_score, streams.count(s)?weight_bonus:0, bw_score, availBandwidth / 1024 / 1024, score); MEDIUM_MSG("%s: CPU %u, RAM %u, Stream %u, BW %u (max %llu MB/s) -> %u", host.c_str(),
cpu_score, ram_score, streams.count(s) ? weight_bonus : 0, bw_score,
availBandwidth / 1024 / 1024, score);
return score; return score;
} }
/// Scores this server as a source /// Scores this server as a source
@ -136,7 +160,8 @@ class hostDetails{
return 1; return 1;
} }
if (upSpeed >= availBandwidth || (upSpeed + addBandwidth) >= availBandwidth){ if (upSpeed >= availBandwidth || (upSpeed + addBandwidth) >= availBandwidth){
INFO_MSG("Host %s over bandwidth: %llu+%llu >= %llu", host.c_str(), upSpeed, addBandwidth, availBandwidth); INFO_MSG("Host %s over bandwidth: %llu+%llu >= %llu", host.c_str(), upSpeed, addBandwidth,
availBandwidth);
return 1; return 1;
} }
// Calculate score // Calculate score
@ -145,7 +170,9 @@ class hostDetails{
unsigned int bw_score = (weight_bw - (((upSpeed + addBandwidth) * weight_bw) / availBandwidth)); unsigned int bw_score = (weight_bw - (((upSpeed + addBandwidth) * weight_bw) / availBandwidth));
unsigned int score = cpu_score + ram_score + bw_score + 1; unsigned int score = cpu_score + ram_score + bw_score + 1;
// Print info on host // Print info on host
MEDIUM_MSG("SOURCE %s: CPU %u, RAM %u, Stream %u, BW %u (max %llu MB/s) -> %u", host.c_str(), cpu_score, ram_score, streams.count(s)?weight_bonus:0, bw_score, availBandwidth / 1024 / 1024, score); MEDIUM_MSG("SOURCE %s: CPU %u, RAM %u, Stream %u, BW %u (max %llu MB/s) -> %u", host.c_str(),
cpu_score, ram_score, streams.count(s) ? weight_bonus : 0, bw_score,
availBandwidth / 1024 / 1024, score);
return score; return score;
} }
void addViewer(std::string &s){ void addViewer(std::string &s){
@ -170,6 +197,9 @@ class hostDetails{
if (!hostMutex){hostMutex = new tthread::mutex();} if (!hostMutex){hostMutex = new tthread::mutex();}
tthread::lock_guard<tthread::mutex> guard(*hostMutex); tthread::lock_guard<tthread::mutex> guard(*hostMutex);
cpu = d["cpu"].asInt(); cpu = d["cpu"].asInt();
if (d.isMember("bwlimit") && d["bwlimit"].asInt()){
availBandwidth = d["bwlimit"].asInt();
}
long long nRamMax = d["mem_total"].asInt(); long long nRamMax = d["mem_total"].asInt();
long long nRamCur = d["mem_used"].asInt(); long long nRamCur = d["mem_used"].asInt();
long long nShmMax = d["shm_total"].asInt(); long long nShmMax = d["shm_total"].asInt();
@ -199,11 +229,10 @@ class hostDetails{
if (d.isMember("streams") && d["streams"].size()){ if (d.isMember("streams") && d["streams"].size()){
jsonForEach(d["streams"], it){ jsonForEach(d["streams"], it){
unsigned int count = (*it)["curr"][0u].asInt() + (*it)["curr"][1u].asInt() + (*it)["curr"][2u].asInt(); unsigned int count =
(*it)["curr"][0u].asInt() + (*it)["curr"][1u].asInt() + (*it)["curr"][2u].asInt();
if (!count){ if (!count){
if (streams.count(it.key())){ if (streams.count(it.key())){streams.erase(it.key());}
streams.erase(it.key());
}
continue; continue;
} }
struct streamDetails &strm = streams[it.key()]; struct streamDetails &strm = streams[it.key()];
@ -223,10 +252,9 @@ class hostDetails{
} }
if (streams.size()){ if (streams.size()){
std::set<std::string> eraseList; std::set<std::string> eraseList;
for (std::map<std::string, struct streamDetails>::iterator it = streams.begin(); it != streams.end(); ++it){ for (std::map<std::string, struct streamDetails>::iterator it = streams.begin();
if (!d["streams"].isMember(it->first)){ it != streams.end(); ++it){
eraseList.insert(it->first); if (!d["streams"].isMember(it->first)){eraseList.insert(it->first);}
}
} }
for (std::set<std::string>::iterator it = eraseList.begin(); it != eraseList.end(); ++it){ for (std::set<std::string>::iterator it = eraseList.begin(); it != eraseList.end(); ++it){
streams.erase(*it); streams.erase(*it);
@ -239,39 +267,138 @@ class hostDetails{
} }
}; };
std::map<std::string, hostDetails> hosts; /// Fixed-size struct for holding a host's name and details pointer
struct hostEntry{
uint8_t state; // 0 = off, 1 = booting, 2 = running, 3 = requesting shutdown, 4 = requesting clean
char name[200]; // 200 chars should be enough
hostDetails *details; /// hostDetails pointer
tthread::thread *thread; /// thread pointer
};
hostEntry hosts[1000]; /// Fixed-size array holding all hosts
void initHost(hostEntry &H, const std::string &N);
void cleanupHost(hostEntry &H);
int handleRequest(Socket::Connection &conn){ int handleRequest(Socket::Connection &conn){
HTTP::Parser H; HTTP::Parser H;
while (conn){ while (conn){
if ((conn.spool() || conn.Received().size()) && H.Read(conn)){ if ((conn.spool() || conn.Received().size()) && H.Read(conn)){
// Special commands
if (H.url.size() == 1){ if (H.url.size() == 1){
std::string host = H.GetVar("host"); std::string host = H.GetVar("host");
std::string stream = H.GetVar("stream");
std::string viewers = H.GetVar("viewers"); std::string viewers = H.GetVar("viewers");
std::string source = H.GetVar("source"); std::string source = H.GetVar("source");
std::string fback = H.GetVar("fallback"); std::string fback = H.GetVar("fallback");
std::string lstserver = H.GetVar("lstserver");
std::string addserver = H.GetVar("addserver");
std::string delserver = H.GetVar("delserver");
std::string weights = H.GetVar("weights");
H.Clean(); H.Clean();
H.SetHeader("Content-Type", "text/plain"); H.SetHeader("Content-Type", "text/plain");
JSON::Value ret; JSON::Value ret;
if (viewers.size()){ //Get/set weights
for (std::map<std::string, hostDetails>::iterator it = hosts.begin(); it != hosts.end(); ++it){ if (weights.size()){
it->second.fillStreams(ret); JSON::Value newVals = JSON::fromString(weights);
if (newVals.isMember("cpu")){weight_cpu = newVals["cpu"].asInt();}
if (newVals.isMember("ram")){weight_ram = newVals["ram"].asInt();}
if (newVals.isMember("bw")){weight_bw = newVals["bw"].asInt();}
if (newVals.isMember("bonus")){weight_bonus = newVals["bonus"].asInt();}
ret["cpu"] = weight_cpu;
ret["ram"] = weight_ram;
ret["bw"] = weight_bw;
ret["bonus"] = weight_bonus;
H.SetBody(ret.toString());
H.SendResponse("200", "OK", conn);
H.Clean();
continue;
}
// Get server list
if (lstserver.size()){
for (HOSTLOOP){
HOSTCHECK;
ret[(std::string)hosts[i].name] = stateLookup[hosts[i].state];
} }
H.SetBody(ret.toPrettyString()); H.SetBody(ret.toPrettyString());
H.SendResponse("200", "OK", conn); H.SendResponse("200", "OK", conn);
H.Clean(); H.Clean();
continue; continue;
} }
// Remove server from list
if (delserver.size()){
ret = "Server not monitored - could not delete from monitored server list!";
for (HOSTLOOP){
HOSTCHECK;
if ((std::string)hosts[i].name == delserver){
cleanupHost(hosts[i]);
ret = stateLookup[hosts[i].state];
}
}
H.SetBody(ret.toPrettyString());
H.SendResponse("200", "OK", conn);
H.Clean();
continue;
}
// Add server to list
if (addserver.size()){
if (addserver.size() > 199){
H.SetBody("Host length too long for monitoring");
H.SendResponse("200", "OK", conn);
H.Clean();
continue;
}
bool stop = false;
hostEntry *newEntry = 0;
for (HOSTLOOP){
HOSTCHECK;
if ((std::string)hosts[i].name == addserver){
stop = true;
break;
}
}
if (stop){
ret = "Server already monitored - add request ignored";
}else{
for (HOSTLOOP){
if (hosts[i].state == STATE_OFF){
initHost(hosts[i], addserver);
newEntry = &(hosts[i]);
stop = true;
}
}
if (!stop){
initHost(HOST(hostsCounter), addserver);
newEntry = &HOST(hostsCounter);
++hostsCounter; // up the hosts counter
}
ret[addserver] = stateLookup[newEntry->state];
}
H.SetBody(ret.toPrettyString());
H.SendResponse("200", "OK", conn);
H.Clean();
continue;
}
// Request viewer count
if (viewers.size()){
for (HOSTLOOP){
HOSTCHECK;
HOST(i).details->fillStreams(ret);
}
H.SetBody(ret.toPrettyString());
H.SendResponse("200", "OK", conn);
H.Clean();
continue;
}
// Find source for given stream
if (source.size()){ if (source.size()){
INFO_MSG("Finding source for stream %s", source.c_str()); INFO_MSG("Finding source for stream %s", source.c_str());
std::string bestHost = ""; std::string bestHost = "";
unsigned int bestScore = 0; unsigned int bestScore = 0;
for (std::map<std::string, hostDetails>::iterator it = hosts.begin(); it != hosts.end(); ++it){ for (HOSTLOOP){
unsigned int score = it->second.source(source); HOSTCHECK;
unsigned int score = HOST(i).details->source(source);
if (score > bestScore){ if (score > bestScore){
bestHost = "dtsc://"+it->first; bestHost = "dtsc://" + HOST(i).details->host;
bestScore = score; bestScore = score;
} }
} }
@ -290,13 +417,19 @@ int handleRequest(Socket::Connection & conn){
H.Clean(); H.Clean();
continue; continue;
} }
if (!host.size() && !stream.size()){ // Find host(s) status
for (std::map<std::string, hostDetails>::iterator it = hosts.begin(); it != hosts.end(); ++it){ if (!host.size()){
it->second.fillState(ret[it->first]); for (HOSTLOOP){
HOSTCHECK;
HOST(i).details->fillState(ret[HOST(i).details->host]);
} }
}else{ }else{
if (hosts.count(host)){ for (HOSTLOOP){
hosts[host].fillState(ret); HOSTCHECK;
if (HOST(i).details->host == host){
HOST(i).details->fillState(ret);
break;
}
} }
} }
H.SetBody(ret.toPrettyString()); H.SetBody(ret.toPrettyString());
@ -304,6 +437,7 @@ int handleRequest(Socket::Connection & conn){
H.Clean(); H.Clean();
continue; continue;
} }
// Balance given stream
std::string stream = H.url.substr(1); std::string stream = H.url.substr(1);
if (stream == "favicon.ico"){ if (stream == "favicon.ico"){
H.Clean(); H.Clean();
@ -314,23 +448,24 @@ int handleRequest(Socket::Connection & conn){
INFO_MSG("Balancing stream %s", stream.c_str()); INFO_MSG("Balancing stream %s", stream.c_str());
H.Clean(); H.Clean();
H.SetHeader("Content-Type", "text/plain"); H.SetHeader("Content-Type", "text/plain");
std::string bestHost = ""; hostEntry *bestHost = 0;
unsigned int bestScore = 0; unsigned int bestScore = 0;
for (std::map<std::string, hostDetails>::iterator it = hosts.begin(); it != hosts.end(); ++it){ for (HOSTLOOP){
unsigned int score = it->second.rate(stream); HOSTCHECK;
unsigned int score = HOST(i).details->rate(stream);
if (score > bestScore){ if (score > bestScore){
bestHost = it->first; bestHost = &HOST(i);
bestScore = score; bestScore = score;
} }
} }
if (bestScore == 0){ if (!bestScore || !bestHost){
bestHost = fallback; H.SetBody(fallback);
FAIL_MSG("All servers seem to be out of bandwidth!"); FAIL_MSG("All servers seem to be out of bandwidth!");
}else{ }else{
INFO_MSG("Winner: %s scores %u", bestHost.c_str(), bestScore); INFO_MSG("Winner: %s scores %u", bestHost->details->host.c_str(), bestScore);
hosts[bestHost].addViewer(stream); bestHost->details->addViewer(stream);
H.SetBody(bestHost->details->host);
} }
H.SetBody(bestHost);
H.SendResponse("200", "OK", conn); H.SendResponse("200", "OK", conn);
H.Clean(); H.Clean();
}// if HTTP request received }// if HTTP request received
@ -339,8 +474,9 @@ int handleRequest(Socket::Connection & conn){
return 0; return 0;
} }
void handleServer(void * servName){ void handleServer(void *hostEntryPointer){
std::string & name = *(std::string*)servName; hostEntry *entry = (hostEntry *)hostEntryPointer;
std::string name = entry->name;
HTTP::Parser H; HTTP::Parser H;
std::string host; std::string host;
@ -362,21 +498,21 @@ void handleServer(void * servName){
host = name; host = name;
} }
hosts[host].availBandwidth = bandwidth.asInt(); INFO_MSG("Monitoring %s on port %d using passphrase %s", host.c_str(), port, passphrase.c_str());
hosts[host].host = host; entry->details->availBandwidth = bandwidth.asInt();
entry->details->host = host;
INFO_MSG("Monitoring %s on port %d.", host.c_str(), port, passphrase.c_str()); entry->state = STATE_ONLINE;
bool down = true; bool down = true;
Socket::Connection servConn(host, port, false); Socket::Connection servConn(host, port, false);
while (cfg->is_active){ while (cfg->is_active && (entry->state == STATE_ONLINE)){
if (!servConn){ if (!servConn){
HIGH_MSG("Reconnecting to %s", host.c_str()); HIGH_MSG("Reconnecting to %s", host.c_str());
servConn = Socket::Connection(host, port, false); servConn = Socket::Connection(host, port, false);
} }
if (!servConn){ if (!servConn){
MEDIUM_MSG("Can't reach server %s", host.c_str()); MEDIUM_MSG("Cannot reach server %s", host.c_str());
hosts[host].badNess(); entry->details->badNess();
Util::wait(5000); Util::wait(5000);
down = true; down = true;
continue; continue;
@ -388,7 +524,8 @@ void handleServer(void * servName){
H.SendRequest(servConn); H.SendRequest(servConn);
H.Clean(); H.Clean();
unsigned int startTime = Util::epoch(); unsigned int startTime = Util::epoch();
while (cfg->is_active && servConn && !((servConn.spool() || servConn.Received().size()) && H.Read(servConn))){ while (cfg->is_active && servConn &&
!((servConn.spool() || servConn.Received().size()) && H.Read(servConn))){
if (Util::epoch() - startTime > 10){ if (Util::epoch() - startTime > 10){
FAIL_MSG("Server %s timed out", host.c_str()); FAIL_MSG("Server %s timed out", host.c_str());
servConn.close(); servConn.close();
@ -401,22 +538,24 @@ void handleServer(void * servName){
FAIL_MSG("Can't retrieve server %s load information", host.c_str()); FAIL_MSG("Can't retrieve server %s load information", host.c_str());
std::cerr << H.body << std::endl; std::cerr << H.body << std::endl;
down = true; down = true;
hosts[host].badNess(); entry->details->badNess();
servConn.close(); servConn.close();
}else{ }else{
if (down){ if (down){
WARN_MSG("Connection established with %s", host.c_str()); WARN_MSG("Connection established with %s", host.c_str());
down = false; down = false;
} }
hosts[host].update(servData); entry->details->update(servData);
} }
H.Clean(); H.Clean();
Util::wait(5000); Util::wait(5000);
} }
servConn.close(); servConn.close();
entry->state = STATE_REQCLEAN;
} }
int main(int argc, char **argv){ int main(int argc, char **argv){
memset(hosts, 0, sizeof(hosts)); // zero-fill the hosts list
Util::Config conf(argv[0]); Util::Config conf(argv[0]);
cfg = &conf; cfg = &conf;
@ -500,13 +639,18 @@ int main(int argc, char ** argv){
fallback = conf.getString("fallback"); fallback = conf.getString("fallback");
JSON::Value &nodes = conf.getOption("server", true); JSON::Value &nodes = conf.getOption("server", true);
WARN_MSG("Load balancer activating. Balancing between %llu nodes.", nodes.size());
conf.activate(); conf.activate();
std::map<std::string, tthread::thread *> threads; std::map<std::string, tthread::thread *> threads;
jsonForEach(nodes, it){ jsonForEach(nodes, it){
threads[it->asStringRef()] = new tthread::thread(handleServer, (void*)&(it->asStringRef())); if (it->asStringRef().size() > 199){
FAIL_MSG("Host length too long for monitoring, skipped: %s", it->asStringRef().c_str());
continue;
} }
initHost(HOST(hostsCounter), it->asStringRef());
++hostsCounter; // up the hosts counter
}
WARN_MSG("Load balancer activating. Balancing between %lu nodes.", (unsigned long)hostsCounter);
conf.serveThreadedSocket(handleRequest); conf.serveThreadedSocket(handleRequest);
if (!conf.is_active){ if (!conf.is_active){
@ -514,12 +658,35 @@ int main(int argc, char ** argv){
}else{ }else{
WARN_MSG("Load balancer shutting down; socket problem"); WARN_MSG("Load balancer shutting down; socket problem");
} }
conf.is_active = false;
if (threads.size()){ // Join all threads
for (std::map<std::string, tthread::thread *>::iterator it = threads.begin(); it != threads.end(); ++it){ for (HOSTLOOP){cleanupHost(HOST(i));}
it->second->join();
}
} }
void initHost(hostEntry &H, const std::string &N){
// Cancel if this host has no name set
if (!N.size()){return;}
H.state = STATE_BOOT;
H.details = new hostDetails();
memcpy(H.name, N.data(), N.size());
H.thread = new tthread::thread(handleServer, (void *)&H);
INFO_MSG("Starting monitoring %s", H.name);
}
void cleanupHost(hostEntry &H){
// Cancel if this host has no name set
if (!H.name[0]){return;}
H.state = STATE_GODOWN;
INFO_MSG("Stopping monitoring %s", H.name);
// Clean up thread
H.thread->join();
delete H.thread;
H.thread = 0;
// Clean up details
delete H.details;
H.details = 0;
memset(H.name, 0, sizeof(H.name));
H.state = STATE_OFF;
} }