Load balancer:
- Tagging system + tweaks to display of offline servers in overviews - Fixed stream name decode + tag_adjust hiding - Added ?ingest=PERCENT to get best ingest point assuming "PERCENT" CPU usage is needed to ingest - Fixed name length bug - Fixed multi-add bug - Improved shutdown speed - Fixed add/remove race condition issues - Robustify
This commit is contained in:
parent
e884dc0c32
commit
4af7f3e9ed
1 changed files with 172 additions and 22 deletions
|
@ -16,12 +16,14 @@ Util::Config *cfg = 0;
|
||||||
std::string passphrase;
|
std::string passphrase;
|
||||||
std::string fallback;
|
std::string fallback;
|
||||||
bool localMode = false;
|
bool localMode = false;
|
||||||
|
tthread::mutex globalMutex;
|
||||||
|
|
||||||
size_t weight_cpu = 500;
|
size_t weight_cpu = 500;
|
||||||
size_t weight_ram = 500;
|
size_t weight_ram = 500;
|
||||||
size_t weight_bw = 1000;
|
size_t weight_bw = 1000;
|
||||||
size_t weight_geo = 1000;
|
size_t weight_geo = 1000;
|
||||||
size_t weight_bonus = 50;
|
size_t weight_bonus = 50;
|
||||||
|
std::map<std::string, int32_t> blankTags;
|
||||||
unsigned long hostsCounter = 0; // This is a pointer to guarantee atomic accesses.
|
unsigned long hostsCounter = 0; // This is a pointer to guarantee atomic accesses.
|
||||||
#define HOSTLOOP \
|
#define HOSTLOOP \
|
||||||
unsigned long i = 0; \
|
unsigned long i = 0; \
|
||||||
|
@ -40,6 +42,8 @@ unsigned long hostsCounter = 0; // This is a pointer to guarantee atomic accesse
|
||||||
const char *stateLookup[] ={"Offline", "Starting monitoring",
|
const char *stateLookup[] ={"Offline", "Starting monitoring",
|
||||||
"Monitored (error)", "Monitored (online)",
|
"Monitored (error)", "Monitored (online)",
|
||||||
"Requesting stop", "Requesting clean"};
|
"Requesting stop", "Requesting clean"};
|
||||||
|
#define HOSTNAMELEN 1024
|
||||||
|
#define MAXHOSTS 1000
|
||||||
|
|
||||||
struct streamDetails{
|
struct streamDetails{
|
||||||
uint64_t total;
|
uint64_t total;
|
||||||
|
@ -75,11 +79,34 @@ double geoDist(double lat1, double long1, double lat2, double long2){
|
||||||
return .31830988618379067153 * acos(dist);
|
return .31830988618379067153 * acos(dist);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t applyAdjustment(const std::set<std::string> & tags, const std::string & match, int32_t adj){
|
||||||
|
if (!match.size()){return 0;}
|
||||||
|
bool invert = false;
|
||||||
|
bool haveOne = false;
|
||||||
|
size_t prevPos = 0;
|
||||||
|
if (match[0] == '-'){
|
||||||
|
invert = true;
|
||||||
|
prevPos = 1;
|
||||||
|
}
|
||||||
|
//Check if any matches inside tags
|
||||||
|
size_t currPos = match.find(',', prevPos);
|
||||||
|
while (currPos != std::string::npos){
|
||||||
|
if (tags.count(match.substr(prevPos, currPos-prevPos))){haveOne = true;}
|
||||||
|
prevPos = currPos + 1;
|
||||||
|
currPos = match.find(',', prevPos);
|
||||||
|
}
|
||||||
|
if (tags.count(match.substr(prevPos))){haveOne = true;}
|
||||||
|
//If we have any match, apply adj, unless we're doing an inverted search, then return adj on zero matches
|
||||||
|
if (haveOne == !invert){return adj;}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
class hostDetails{
|
class hostDetails{
|
||||||
private:
|
private:
|
||||||
tthread::mutex *hostMutex;
|
tthread::mutex *hostMutex;
|
||||||
std::map<std::string, struct streamDetails> streams;
|
std::map<std::string, struct streamDetails> streams;
|
||||||
std::set<std::string> conf_streams;
|
std::set<std::string> conf_streams;
|
||||||
|
std::set<std::string> tags;
|
||||||
std::map<std::string, outUrl> outputs;
|
std::map<std::string, outUrl> outputs;
|
||||||
uint64_t cpu;
|
uint64_t cpu;
|
||||||
uint64_t ramMax;
|
uint64_t ramMax;
|
||||||
|
@ -151,6 +178,11 @@ public:
|
||||||
r["geo"]["lon"] = servLongi;
|
r["geo"]["lon"] = servLongi;
|
||||||
r["geo"]["loc"] = servLoc;
|
r["geo"]["loc"] = servLoc;
|
||||||
}
|
}
|
||||||
|
if (tags.size()){
|
||||||
|
for (std::set<std::string>::iterator it = tags.begin(); it != tags.end(); ++it){
|
||||||
|
r["tags"].append(*it);
|
||||||
|
}
|
||||||
|
}
|
||||||
if (ramMax && availBandwidth){
|
if (ramMax && availBandwidth){
|
||||||
r["score"]["cpu"] = (uint64_t)(weight_cpu - (cpu * weight_cpu) / 1000);
|
r["score"]["cpu"] = (uint64_t)(weight_cpu - (cpu * weight_cpu) / 1000);
|
||||||
r["score"]["ram"] = (uint64_t)(weight_ram - ((ramCurr * weight_ram) / ramMax));
|
r["score"]["ram"] = (uint64_t)(weight_ram - ((ramCurr * weight_ram) / ramMax));
|
||||||
|
@ -195,7 +227,7 @@ public:
|
||||||
}
|
}
|
||||||
/// Scores a potential new connection to this server
|
/// Scores a potential new connection to this server
|
||||||
/// 0 means not possible, the higher the better.
|
/// 0 means not possible, the higher the better.
|
||||||
uint64_t rate(std::string &s, double lati = 0, double longi = 0){
|
uint64_t rate(std::string &s, double lati = 0, double longi = 0, const std::map<std::string, int32_t> &tagAdjust = blankTags){
|
||||||
if (!hostMutex){hostMutex = new tthread::mutex();}
|
if (!hostMutex){hostMutex = new tthread::mutex();}
|
||||||
tthread::lock_guard<tthread::mutex> guard(*hostMutex);
|
tthread::lock_guard<tthread::mutex> guard(*hostMutex);
|
||||||
if (!ramMax || !availBandwidth){
|
if (!ramMax || !availBandwidth){
|
||||||
|
@ -221,19 +253,30 @@ public:
|
||||||
geo_score = weight_geo - weight_geo * geoDist(servLati, servLongi, lati, longi);
|
geo_score = weight_geo - weight_geo * geoDist(servLati, servLongi, lati, longi);
|
||||||
}
|
}
|
||||||
uint64_t score = cpu_score + ram_score + bw_score + geo_score + (streams.count(s) ? weight_bonus : 0);
|
uint64_t score = cpu_score + ram_score + bw_score + geo_score + (streams.count(s) ? weight_bonus : 0);
|
||||||
|
int64_t adjustment = 0;
|
||||||
|
if (tagAdjust.size()){
|
||||||
|
for (std::map<std::string, int32_t>::const_iterator it = tagAdjust.begin(); it != tagAdjust.end(); ++it){
|
||||||
|
adjustment += applyAdjustment(tags, it->first, it->second);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (adjustment >= 0 || -adjustment < score){
|
||||||
|
score += adjustment;
|
||||||
|
}else{
|
||||||
|
score = 0;
|
||||||
|
}
|
||||||
// Print info on host
|
// Print info on host
|
||||||
MEDIUM_MSG("%s: CPU %" PRIu64 ", RAM %" PRIu64 ", Stream %" PRIu64 ", BW %" PRIu64
|
MEDIUM_MSG("%s: CPU %" PRIu64 ", RAM %" PRIu64 ", Stream %" PRIu64 ", BW %" PRIu64
|
||||||
" (max %" PRIu64 " MB/s), Geo %" PRIu64 " -> %" PRIu64,
|
" (max %" PRIu64 " MB/s), Geo %" PRIu64 ", tag adjustment %" PRId64 " -> %" PRIu64,
|
||||||
host.c_str(), cpu_score, ram_score, streams.count(s) ? weight_bonus : 0, bw_score,
|
host.c_str(), cpu_score, ram_score, streams.count(s) ? weight_bonus : 0, bw_score,
|
||||||
availBandwidth / 1024 / 1024, geo_score, score);
|
availBandwidth / 1024 / 1024, geo_score, adjustment, score);
|
||||||
return score;
|
return score;
|
||||||
}
|
}
|
||||||
/// Scores this server as a source
|
/// Scores this server as a source
|
||||||
/// 0 means not possible, the higher the better.
|
/// 0 means not possible, the higher the better.
|
||||||
uint64_t source(std::string &s, double lati = 0, double longi = 0){
|
uint64_t source(const std::string &s, double lati, double longi, const std::map<std::string, int32_t> &tagAdjust, uint32_t minCpu){
|
||||||
if (!hostMutex){hostMutex = new tthread::mutex();}
|
if (!hostMutex){hostMutex = new tthread::mutex();}
|
||||||
tthread::lock_guard<tthread::mutex> guard(*hostMutex);
|
tthread::lock_guard<tthread::mutex> guard(*hostMutex);
|
||||||
if (!streams.count(s) || !streams[s].inputs){return 0;}
|
if (s.size() && (!streams.count(s) || !streams[s].inputs)){return 0;}
|
||||||
if (!ramMax || !availBandwidth){
|
if (!ramMax || !availBandwidth){
|
||||||
WARN_MSG("Host %s invalid: RAM %" PRIu64 ", BW %" PRIu64, host.c_str(), ramMax, availBandwidth);
|
WARN_MSG("Host %s invalid: RAM %" PRIu64 ", BW %" PRIu64, host.c_str(), ramMax, availBandwidth);
|
||||||
return 1;
|
return 1;
|
||||||
|
@ -244,6 +287,7 @@ public:
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
// Calculate score
|
// Calculate score
|
||||||
|
if (minCpu && cpu + minCpu >= 1000){return 0;}
|
||||||
uint64_t cpu_score = (weight_cpu - (cpu * weight_cpu) / 1000);
|
uint64_t cpu_score = (weight_cpu - (cpu * weight_cpu) / 1000);
|
||||||
uint64_t ram_score = (weight_ram - ((ramCurr * weight_ram) / ramMax));
|
uint64_t ram_score = (weight_ram - ((ramCurr * weight_ram) / ramMax));
|
||||||
uint64_t bw_score = (weight_bw - (((upSpeed + addBandwidth) * weight_bw) / availBandwidth));
|
uint64_t bw_score = (weight_bw - (((upSpeed + addBandwidth) * weight_bw) / availBandwidth));
|
||||||
|
@ -252,11 +296,22 @@ public:
|
||||||
geo_score = weight_geo - weight_geo * geoDist(servLati, servLongi, lati, longi);
|
geo_score = weight_geo - weight_geo * geoDist(servLati, servLongi, lati, longi);
|
||||||
}
|
}
|
||||||
uint64_t score = cpu_score + ram_score + bw_score + geo_score + 1;
|
uint64_t score = cpu_score + ram_score + bw_score + geo_score + 1;
|
||||||
|
int64_t adjustment = 0;
|
||||||
|
if (tagAdjust.size()){
|
||||||
|
for (std::map<std::string, int32_t>::const_iterator it = tagAdjust.begin(); it != tagAdjust.end(); ++it){
|
||||||
|
adjustment += applyAdjustment(tags, it->first, it->second);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (adjustment >= 0 || -adjustment < score){
|
||||||
|
score += adjustment;
|
||||||
|
}else{
|
||||||
|
score = 0;
|
||||||
|
}
|
||||||
// Print info on host
|
// Print info on host
|
||||||
MEDIUM_MSG("SOURCE %s: CPU %" PRIu64 ", RAM %" PRIu64 ", Stream %" PRIu64 ", BW %" PRIu64
|
MEDIUM_MSG("SOURCE %s: CPU %" PRIu64 ", RAM %" PRIu64 ", Stream %" PRIu64 ", BW %" PRIu64
|
||||||
" (max %" PRIu64 " MB/s), Geo %" PRIu64 " -> %" PRIu64,
|
" (max %" PRIu64 " MB/s), Geo %" PRIu64 ", tag adjustment %" PRId64 " -> %" PRIu64,
|
||||||
host.c_str(), cpu_score, ram_score, streams.count(s) ? weight_bonus : 0, bw_score,
|
host.c_str(), cpu_score, ram_score, streams.count(s) ? weight_bonus : 0, bw_score,
|
||||||
availBandwidth / 1024 / 1024, geo_score, score);
|
availBandwidth / 1024 / 1024, geo_score, adjustment, score);
|
||||||
return score;
|
return score;
|
||||||
}
|
}
|
||||||
std::string getUrl(std::string &s, std::string &proto){
|
std::string getUrl(std::string &s, std::string &proto){
|
||||||
|
@ -298,6 +353,14 @@ public:
|
||||||
int64_t nRamCur = d["mem_used"].asInt();
|
int64_t nRamCur = d["mem_used"].asInt();
|
||||||
int64_t nShmMax = d["shm_total"].asInt();
|
int64_t nShmMax = d["shm_total"].asInt();
|
||||||
int64_t nShmCur = d["shm_used"].asInt();
|
int64_t nShmCur = d["shm_used"].asInt();
|
||||||
|
if (d.isMember("tags") && d["tags"].isArray()){
|
||||||
|
std::set<std::string> newTags;
|
||||||
|
jsonForEach(d["tags"], tag){
|
||||||
|
std::string t = tag->asString();
|
||||||
|
if (t.size()){newTags.insert(t);}
|
||||||
|
}
|
||||||
|
if (newTags != tags){tags = newTags;}
|
||||||
|
}
|
||||||
if (!nRamMax){nRamMax = 1;}
|
if (!nRamMax){nRamMax = 1;}
|
||||||
if (!nShmMax){nShmMax = 1;}
|
if (!nShmMax){nShmMax = 1;}
|
||||||
if (((nRamCur + nShmCur) * 1000) / nRamMax > (nShmCur * 1000) / nShmMax){
|
if (((nRamCur + nShmCur) * 1000) / nRamMax > (nShmCur * 1000) / nShmMax){
|
||||||
|
@ -373,16 +436,24 @@ public:
|
||||||
/// Fixed-size struct for holding a host's name and details pointer
|
/// Fixed-size struct for holding a host's name and details pointer
|
||||||
struct hostEntry{
|
struct hostEntry{
|
||||||
uint8_t state; // 0 = off, 1 = booting, 2 = running, 3 = requesting shutdown, 4 = requesting clean
|
uint8_t state; // 0 = off, 1 = booting, 2 = running, 3 = requesting shutdown, 4 = requesting clean
|
||||||
char name[200]; // 200 chars should be enough
|
char name[HOSTNAMELEN]; // host+port for server
|
||||||
hostDetails *details; /// hostDetails pointer
|
hostDetails *details; /// hostDetails pointer
|
||||||
tthread::thread *thread; /// thread pointer
|
tthread::thread *thread; /// thread pointer
|
||||||
};
|
};
|
||||||
|
|
||||||
hostEntry hosts[1000]; /// Fixed-size array holding all hosts
|
hostEntry hosts[MAXHOSTS]; /// Fixed-size array holding all hosts
|
||||||
|
|
||||||
void initHost(hostEntry &H, const std::string &N);
|
void initHost(hostEntry &H, const std::string &N);
|
||||||
void cleanupHost(hostEntry &H);
|
void cleanupHost(hostEntry &H);
|
||||||
|
|
||||||
|
///Fills the given map with the given JSON string of tag adjustments
|
||||||
|
void fillTagAdjust(std::map<std::string, int32_t> & tags, const std::string & adjust){
|
||||||
|
JSON::Value adj = JSON::fromString(adjust);
|
||||||
|
jsonForEach(adj, t){
|
||||||
|
tags[t.key()] = t->asInt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int handleRequest(Socket::Connection &conn){
|
int handleRequest(Socket::Connection &conn){
|
||||||
HTTP::Parser H;
|
HTTP::Parser H;
|
||||||
while (conn){
|
while (conn){
|
||||||
|
@ -401,6 +472,7 @@ int handleRequest(Socket::Connection &conn){
|
||||||
std::string streamStats = H.GetVar("streamstats");
|
std::string streamStats = H.GetVar("streamstats");
|
||||||
std::string stream = H.GetVar("stream");
|
std::string stream = H.GetVar("stream");
|
||||||
std::string source = H.GetVar("source");
|
std::string source = H.GetVar("source");
|
||||||
|
std::string ingest = H.GetVar("ingest");
|
||||||
std::string fback = H.GetVar("fallback");
|
std::string fback = H.GetVar("fallback");
|
||||||
std::string lstserver = H.GetVar("lstserver");
|
std::string lstserver = H.GetVar("lstserver");
|
||||||
std::string addserver = H.GetVar("addserver");
|
std::string addserver = H.GetVar("addserver");
|
||||||
|
@ -431,7 +503,7 @@ int handleRequest(Socket::Connection &conn){
|
||||||
// Get server list
|
// Get server list
|
||||||
if (lstserver.size()){
|
if (lstserver.size()){
|
||||||
for (HOSTLOOP){
|
for (HOSTLOOP){
|
||||||
HOSTCHECK;
|
if (hosts[i].state == STATE_OFF){continue;}
|
||||||
ret[(std::string)hosts[i].name] = stateLookup[hosts[i].state];
|
ret[(std::string)hosts[i].name] = stateLookup[hosts[i].state];
|
||||||
}
|
}
|
||||||
H.SetBody(ret.toPrettyString());
|
H.SetBody(ret.toPrettyString());
|
||||||
|
@ -442,9 +514,10 @@ int handleRequest(Socket::Connection &conn){
|
||||||
}
|
}
|
||||||
// Remove server from list
|
// Remove server from list
|
||||||
if (delserver.size()){
|
if (delserver.size()){
|
||||||
|
tthread::lock_guard<tthread::mutex> globGuard(globalMutex);
|
||||||
ret = "Server not monitored - could not delete from monitored server list!";
|
ret = "Server not monitored - could not delete from monitored server list!";
|
||||||
for (HOSTLOOP){
|
for (HOSTLOOP){
|
||||||
HOSTCHECK;
|
if (hosts[i].state == STATE_OFF){continue;}
|
||||||
if ((std::string)hosts[i].name == delserver){
|
if ((std::string)hosts[i].name == delserver){
|
||||||
cleanupHost(hosts[i]);
|
cleanupHost(hosts[i]);
|
||||||
ret = stateLookup[hosts[i].state];
|
ret = stateLookup[hosts[i].state];
|
||||||
|
@ -458,7 +531,8 @@ int handleRequest(Socket::Connection &conn){
|
||||||
}
|
}
|
||||||
// Add server to list
|
// Add server to list
|
||||||
if (addserver.size()){
|
if (addserver.size()){
|
||||||
if (addserver.size() > 199){
|
tthread::lock_guard<tthread::mutex> globGuard(globalMutex);
|
||||||
|
if (addserver.size() >= HOSTNAMELEN){
|
||||||
H.SetBody("Host length too long for monitoring");
|
H.SetBody("Host length too long for monitoring");
|
||||||
H.setCORSHeaders();
|
H.setCORSHeaders();
|
||||||
H.SendResponse("200", "OK", conn);
|
H.SendResponse("200", "OK", conn);
|
||||||
|
@ -468,7 +542,7 @@ int handleRequest(Socket::Connection &conn){
|
||||||
bool stop = false;
|
bool stop = false;
|
||||||
hostEntry *newEntry = 0;
|
hostEntry *newEntry = 0;
|
||||||
for (HOSTLOOP){
|
for (HOSTLOOP){
|
||||||
HOSTCHECK;
|
if (hosts[i].state == STATE_OFF){continue;}
|
||||||
if ((std::string)hosts[i].name == addserver){
|
if ((std::string)hosts[i].name == addserver){
|
||||||
stop = true;
|
stop = true;
|
||||||
break;
|
break;
|
||||||
|
@ -482,6 +556,7 @@ int handleRequest(Socket::Connection &conn){
|
||||||
initHost(hosts[i], addserver);
|
initHost(hosts[i], addserver);
|
||||||
newEntry = &(hosts[i]);
|
newEntry = &(hosts[i]);
|
||||||
stop = true;
|
stop = true;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!stop){
|
if (!stop){
|
||||||
|
@ -500,7 +575,7 @@ int handleRequest(Socket::Connection &conn){
|
||||||
// Request viewer count
|
// Request viewer count
|
||||||
if (viewers.size()){
|
if (viewers.size()){
|
||||||
for (HOSTLOOP){
|
for (HOSTLOOP){
|
||||||
HOSTCHECK;
|
if (hosts[i].state == STATE_OFF){continue;}
|
||||||
HOST(i).details->fillStreams(ret);
|
HOST(i).details->fillStreams(ret);
|
||||||
}
|
}
|
||||||
H.SetBody(ret.toPrettyString());
|
H.SetBody(ret.toPrettyString());
|
||||||
|
@ -512,7 +587,7 @@ int handleRequest(Socket::Connection &conn){
|
||||||
// Request full stream statistics
|
// Request full stream statistics
|
||||||
if (streamStats.size()){
|
if (streamStats.size()){
|
||||||
for (HOSTLOOP){
|
for (HOSTLOOP){
|
||||||
HOSTCHECK;
|
if (hosts[i].state == STATE_OFF){continue;}
|
||||||
HOST(i).details->fillStreamStats(streamStats, ret);
|
HOST(i).details->fillStreamStats(streamStats, ret);
|
||||||
}
|
}
|
||||||
H.SetBody(ret.toPrettyString());
|
H.SetBody(ret.toPrettyString());
|
||||||
|
@ -524,7 +599,7 @@ int handleRequest(Socket::Connection &conn){
|
||||||
if (stream.size()){
|
if (stream.size()){
|
||||||
uint64_t count = 0;
|
uint64_t count = 0;
|
||||||
for (HOSTLOOP){
|
for (HOSTLOOP){
|
||||||
HOSTCHECK;
|
if (hosts[i].state == STATE_OFF){continue;}
|
||||||
count += HOST(i).details->getViewers(stream);
|
count += HOST(i).details->getViewers(stream);
|
||||||
}
|
}
|
||||||
H.SetBody(JSON::Value(count).asString());
|
H.SetBody(JSON::Value(count).asString());
|
||||||
|
@ -537,6 +612,21 @@ int handleRequest(Socket::Connection &conn){
|
||||||
if (source.size()){
|
if (source.size()){
|
||||||
INFO_MSG("Finding source for stream %s", source.c_str());
|
INFO_MSG("Finding source for stream %s", source.c_str());
|
||||||
std::string bestHost = "";
|
std::string bestHost = "";
|
||||||
|
std::map<std::string, int32_t> tagAdjust;
|
||||||
|
if (H.GetVar("tag_adjust") != ""){fillTagAdjust(tagAdjust, H.GetVar("tag_adjust"));}
|
||||||
|
if (H.hasHeader("X-Tag-Adjust")){fillTagAdjust(tagAdjust, H.GetHeader("X-Tag-Adjust"));}
|
||||||
|
double lat = 0;
|
||||||
|
double lon = 0;
|
||||||
|
if (H.GetVar("lat") != ""){
|
||||||
|
lat = atof(H.GetVar("lat").c_str());
|
||||||
|
H.SetVar("lat", "");
|
||||||
|
}
|
||||||
|
if (H.GetVar("lon") != ""){
|
||||||
|
lon = atof(H.GetVar("lon").c_str());
|
||||||
|
H.SetVar("lon", "");
|
||||||
|
}
|
||||||
|
if (H.hasHeader("X-Latitude")){lat = atof(H.GetHeader("X-Latitude").c_str());}
|
||||||
|
if (H.hasHeader("X-Longitude")){lon = atof(H.GetHeader("X-Longitude").c_str());}
|
||||||
uint64_t bestScore = 0;
|
uint64_t bestScore = 0;
|
||||||
for (HOSTLOOP){
|
for (HOSTLOOP){
|
||||||
HOSTCHECK;
|
HOSTCHECK;
|
||||||
|
@ -544,7 +634,7 @@ int handleRequest(Socket::Connection &conn){
|
||||||
INFO_MSG("Ignoring same-host entry %s", HOST(i).details->host.data());
|
INFO_MSG("Ignoring same-host entry %s", HOST(i).details->host.data());
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
uint64_t score = HOST(i).details->source(source);
|
uint64_t score = HOST(i).details->source(source, lat, lon, tagAdjust, 0);
|
||||||
if (score > bestScore){
|
if (score > bestScore){
|
||||||
bestHost = "dtsc://" + HOST(i).details->host;
|
bestHost = "dtsc://" + HOST(i).details->host;
|
||||||
bestScore = score;
|
bestScore = score;
|
||||||
|
@ -566,16 +656,65 @@ int handleRequest(Socket::Connection &conn){
|
||||||
H.Clean();
|
H.Clean();
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
// Find optimal ingest point
|
||||||
|
if (ingest.size()){
|
||||||
|
double cpuUse = atof(ingest.c_str());
|
||||||
|
INFO_MSG("Finding ingest point for CPU usage %.2f", cpuUse);
|
||||||
|
std::string bestHost = "";
|
||||||
|
std::map<std::string, int32_t> tagAdjust;
|
||||||
|
if (H.GetVar("tag_adjust") != ""){fillTagAdjust(tagAdjust, H.GetVar("tag_adjust"));}
|
||||||
|
if (H.hasHeader("X-Tag-Adjust")){fillTagAdjust(tagAdjust, H.GetHeader("X-Tag-Adjust"));}
|
||||||
|
double lat = 0;
|
||||||
|
double lon = 0;
|
||||||
|
if (H.GetVar("lat") != ""){
|
||||||
|
lat = atof(H.GetVar("lat").c_str());
|
||||||
|
H.SetVar("lat", "");
|
||||||
|
}
|
||||||
|
if (H.GetVar("lon") != ""){
|
||||||
|
lon = atof(H.GetVar("lon").c_str());
|
||||||
|
H.SetVar("lon", "");
|
||||||
|
}
|
||||||
|
if (H.hasHeader("X-Latitude")){lat = atof(H.GetHeader("X-Latitude").c_str());}
|
||||||
|
if (H.hasHeader("X-Longitude")){lon = atof(H.GetHeader("X-Longitude").c_str());}
|
||||||
|
uint64_t bestScore = 0;
|
||||||
|
for (HOSTLOOP){
|
||||||
|
HOSTCHECK;
|
||||||
|
uint64_t score = HOST(i).details->source("", lat, lon, tagAdjust, cpuUse * 10);
|
||||||
|
if (score > bestScore){
|
||||||
|
bestHost = HOST(i).details->host;
|
||||||
|
bestScore = score;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (bestScore == 0){
|
||||||
|
if (fback.size()){
|
||||||
|
bestHost = fback;
|
||||||
|
}else{
|
||||||
|
bestHost = fallback;
|
||||||
|
}
|
||||||
|
FAIL_MSG("No ingest point found!");
|
||||||
|
}else{
|
||||||
|
INFO_MSG("Winner: %s scores %" PRIu64, bestHost.c_str(), bestScore);
|
||||||
|
}
|
||||||
|
H.SetBody(bestHost);
|
||||||
|
H.setCORSHeaders();
|
||||||
|
H.SendResponse("200", "OK", conn);
|
||||||
|
H.Clean();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
// Find host(s) status
|
// Find host(s) status
|
||||||
if (!host.size()){
|
if (!host.size()){
|
||||||
for (HOSTLOOP){
|
for (HOSTLOOP){
|
||||||
|
if (hosts[i].state == STATE_OFF){continue;}
|
||||||
|
ret[HOST(i).details->host] = stateLookup[hosts[i].state];
|
||||||
HOSTCHECK;
|
HOSTCHECK;
|
||||||
HOST(i).details->fillState(ret[HOST(i).details->host]);
|
HOST(i).details->fillState(ret[HOST(i).details->host]);
|
||||||
}
|
}
|
||||||
}else{
|
}else{
|
||||||
for (HOSTLOOP){
|
for (HOSTLOOP){
|
||||||
HOSTCHECK;
|
if (hosts[i].state == STATE_OFF){continue;}
|
||||||
if (HOST(i).details->host == host){
|
if (HOST(i).details->host == host){
|
||||||
|
ret = stateLookup[hosts[i].state];
|
||||||
|
HOSTCHECK;
|
||||||
HOST(i).details->fillState(ret);
|
HOST(i).details->fillState(ret);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -588,8 +727,14 @@ int handleRequest(Socket::Connection &conn){
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// Balance given stream
|
// Balance given stream
|
||||||
std::string stream = H.url.substr(1);
|
std::string stream = HTTP::URL(H.url).path;
|
||||||
std::string proto = H.GetVar("proto");
|
std::string proto = H.GetVar("proto");
|
||||||
|
std::map<std::string, int32_t> tagAdjust;
|
||||||
|
if (H.GetVar("tag_adjust") != ""){
|
||||||
|
fillTagAdjust(tagAdjust, H.GetVar("tag_adjust"));
|
||||||
|
H.SetVar("tag_adjust", "");
|
||||||
|
}
|
||||||
|
if (H.hasHeader("X-Tag-Adjust")){fillTagAdjust(tagAdjust, H.GetHeader("X-Tag-Adjust"));}
|
||||||
H.SetVar("proto", "");
|
H.SetVar("proto", "");
|
||||||
double lat = 0;
|
double lat = 0;
|
||||||
double lon = 0;
|
double lon = 0;
|
||||||
|
@ -618,7 +763,7 @@ int handleRequest(Socket::Connection &conn){
|
||||||
uint64_t bestScore = 0;
|
uint64_t bestScore = 0;
|
||||||
for (HOSTLOOP){
|
for (HOSTLOOP){
|
||||||
HOSTCHECK;
|
HOSTCHECK;
|
||||||
uint64_t score = HOST(i).details->rate(stream, lat, lon);
|
uint64_t score = HOST(i).details->rate(stream, lat, lon, tagAdjust);
|
||||||
if (score > bestScore){
|
if (score > bestScore){
|
||||||
bestHost = &HOST(i);
|
bestHost = &HOST(i);
|
||||||
bestScore = score;
|
bestScore = score;
|
||||||
|
@ -706,7 +851,7 @@ void handleServer(void *hostEntryPointer){
|
||||||
|
|
||||||
int main(int argc, char **argv){
|
int main(int argc, char **argv){
|
||||||
Util::redirectLogsIfNeeded();
|
Util::redirectLogsIfNeeded();
|
||||||
memset(hosts, 0, sizeof(hosts)); // zero-fill the hosts list
|
memset(hosts, 0, sizeof(hostEntry)*MAXHOSTS); // zero-fill the hosts list
|
||||||
Util::Config conf(argv[0]);
|
Util::Config conf(argv[0]);
|
||||||
cfg = &conf;
|
cfg = &conf;
|
||||||
|
|
||||||
|
@ -828,6 +973,10 @@ int main(int argc, char **argv){
|
||||||
conf.is_active = false;
|
conf.is_active = false;
|
||||||
|
|
||||||
// Join all threads
|
// Join all threads
|
||||||
|
for (HOSTLOOP){
|
||||||
|
if (!HOST(i).name[0]){continue;}
|
||||||
|
HOST(i).state = STATE_GODOWN;
|
||||||
|
}
|
||||||
for (HOSTLOOP){cleanupHost(HOST(i));}
|
for (HOSTLOOP){cleanupHost(HOST(i));}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -836,6 +985,7 @@ void initHost(hostEntry &H, const std::string &N){
|
||||||
if (!N.size()){return;}
|
if (!N.size()){return;}
|
||||||
H.state = STATE_BOOT;
|
H.state = STATE_BOOT;
|
||||||
H.details = new hostDetails();
|
H.details = new hostDetails();
|
||||||
|
memset(H.name, 0, HOSTNAMELEN);
|
||||||
memcpy(H.name, N.data(), N.size());
|
memcpy(H.name, N.data(), N.size());
|
||||||
H.thread = new tthread::thread(handleServer, (void *)&H);
|
H.thread = new tthread::thread(handleServer, (void *)&H);
|
||||||
INFO_MSG("Starting monitoring %s", H.name);
|
INFO_MSG("Starting monitoring %s", H.name);
|
||||||
|
@ -853,6 +1003,6 @@ void cleanupHost(hostEntry &H){
|
||||||
// Clean up details
|
// Clean up details
|
||||||
delete H.details;
|
delete H.details;
|
||||||
H.details = 0;
|
H.details = 0;
|
||||||
memset(H.name, 0, sizeof(H.name));
|
memset(H.name, 0, HOSTNAMELEN);
|
||||||
H.state = STATE_OFF;
|
H.state = STATE_OFF;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue