Geo IP based load balancing (requires external coords)

This commit is contained in:
Thulinma 2017-12-19 00:05:31 +01:00
parent 75cefe9956
commit 3e303e0320

View file

@ -1,5 +1,6 @@
#include <stdint.h>
#include <cstdlib>
#include <cmath>
#include <fstream>
#include <iostream>
#include <mist/config.h>
@ -19,6 +20,7 @@ bool localMode = false;
unsigned int weight_cpu = 500;
unsigned int weight_ram = 500;
unsigned int weight_bw = 1000;
unsigned int weight_geo = 1000;
unsigned int weight_bonus = 50;
unsigned long hostsCounter = 0; // This is a pointer to guarantee atomic accesses.
#define HOSTLOOP \
@ -61,6 +63,17 @@ class outUrl{
}
};
inline double toRad(double degree){
return degree / 57.29577951308232087684;
}
double geoDist(double lat1, double long1, double lat2, double long2){
double dist;
dist = sin(toRad(lat1)) * sin(toRad(lat2)) +
cos(toRad(lat1)) * cos(toRad(lat2)) * cos(toRad(long1 - long2));
return .31830988618379067153 * acos(dist);
}
class hostDetails{
private:
tthread::mutex *hostMutex;
@ -81,6 +94,9 @@ private:
public:
std::string host;
unsigned long long availBandwidth;
JSON::Value geoDetails;
double servLati, servLongi;
std::string servLoc;
hostDetails(){
hostMutex = 0;
cpu = 1000;
@ -93,6 +109,8 @@ public:
prevTime = 0;
total = 0;
addBandwidth = 0;
servLati = 0;
servLongi = 0;
availBandwidth = 128 * 1024 * 1024; // assume 1G connections
}
~hostDetails(){
@ -129,6 +147,11 @@ public:
r["streams"] = (long long)streams.size();
r["viewers"] = (long long)total;
r["bwlimit"] = (long long)availBandwidth;
if (servLati || servLongi){
r["geo"]["lat"] = servLati;
r["geo"]["lon"] = servLongi;
r["geo"]["loc"] = servLoc;
}
if (ramMax && availBandwidth){
r["score"]["cpu"] = (long long)(weight_cpu - (cpu * weight_cpu) / 1000);
r["score"]["ram"] = (long long)(weight_ram - ((ramCurr * weight_ram) / ramMax));
@ -147,7 +170,7 @@ public:
}
/// Scores a potential new connection to this server
/// 0 means not possible, the higher the better.
unsigned int rate(std::string &s){
unsigned int rate(std::string &s, double lati = 0, double longi = 0){
if (!hostMutex){hostMutex = new tthread::mutex();}
tthread::lock_guard<tthread::mutex> guard(*hostMutex);
if (!ramMax || !availBandwidth){
@ -167,16 +190,20 @@ public:
unsigned int cpu_score = (weight_cpu - (cpu * weight_cpu) / 1000);
unsigned int ram_score = (weight_ram - ((ramCurr * weight_ram) / ramMax));
unsigned int bw_score = (weight_bw - (((upSpeed + addBandwidth) * weight_bw) / availBandwidth));
unsigned int score = cpu_score + ram_score + bw_score + (streams.count(s) ? weight_bonus : 0);
unsigned int geo_score = 0;
if (servLati && servLongi && lati && longi){
geo_score = weight_geo - weight_geo * geoDist(servLati, servLongi, lati, longi);
}
unsigned int score = cpu_score + ram_score + bw_score + geo_score + (streams.count(s) ? weight_bonus : 0);
// Print info on host
MEDIUM_MSG("%s: CPU %u, RAM %u, Stream %u, BW %u (max %llu MB/s) -> %u", host.c_str(),
MEDIUM_MSG("%s: CPU %u, RAM %u, Stream %u, BW %u (max %llu MB/s), Geo %u -> %u", host.c_str(),
cpu_score, ram_score, streams.count(s) ? weight_bonus : 0, bw_score,
availBandwidth / 1024 / 1024, score);
availBandwidth / 1024 / 1024, geo_score, score);
return score;
}
/// Scores this server as a source
/// 0 means not possible, the higher the better.
unsigned int source(std::string &s){
unsigned int source(std::string &s, double lati = 0, double longi = 0){
if (!hostMutex){hostMutex = new tthread::mutex();}
tthread::lock_guard<tthread::mutex> guard(*hostMutex);
if (!streams.count(s) || !streams[s].inputs){return 0;}
@ -193,11 +220,15 @@ public:
unsigned int cpu_score = (weight_cpu - (cpu * weight_cpu) / 1000);
unsigned int ram_score = (weight_ram - ((ramCurr * weight_ram) / ramMax));
unsigned int bw_score = (weight_bw - (((upSpeed + addBandwidth) * weight_bw) / availBandwidth));
unsigned int score = cpu_score + ram_score + bw_score + 1;
unsigned int geo_score = 0;
if (servLati && servLongi && lati && longi){
geo_score = weight_geo - weight_geo * geoDist(servLati, servLongi, lati, longi);
}
unsigned int score = cpu_score + ram_score + bw_score + geo_score + 1;
// Print info on host
MEDIUM_MSG("SOURCE %s: CPU %u, RAM %u, Stream %u, BW %u (max %llu MB/s) -> %u", host.c_str(),
MEDIUM_MSG("SOURCE %s: CPU %u, RAM %u, Stream %u, BW %u (max %llu MB/s), Geo %u -> %u", host.c_str(),
cpu_score, ram_score, streams.count(s) ? weight_bonus : 0, bw_score,
availBandwidth / 1024 / 1024, score);
availBandwidth / 1024 / 1024, geo_score, score);
return score;
}
std::string getUrl(std::string &s, std::string &proto){
@ -353,10 +384,12 @@ int handleRequest(Socket::Connection &conn){
if (newVals.isMember("cpu")){weight_cpu = newVals["cpu"].asInt();}
if (newVals.isMember("ram")){weight_ram = newVals["ram"].asInt();}
if (newVals.isMember("bw")){weight_bw = newVals["bw"].asInt();}
if (newVals.isMember("geo")){weight_geo = newVals["geo"].asInt();}
if (newVals.isMember("bonus")){weight_bonus = newVals["bonus"].asInt();}
ret["cpu"] = weight_cpu;
ret["ram"] = weight_ram;
ret["bw"] = weight_bw;
ret["geo"] = weight_geo;
ret["bonus"] = weight_bonus;
H.SetBody(ret.toString());
H.SendResponse("200", "OK", conn);
@ -491,6 +524,22 @@ int handleRequest(Socket::Connection &conn){
std::string stream = H.url.substr(1);
std::string proto = H.GetVar("proto");
H.SetVar("proto", "");
double lat = 0;
double lon = 0;
if (H.GetVar("lat") != ""){
lat = atof(H.GetVar("lat").c_str());
H.SetVar("lat", "");
}
if (H.GetVar("lon") != ""){
lon = atof(H.GetVar("lon").c_str());
H.SetVar("lon", "");
}
if (H.hasHeader("X-Latitude")){
lat = atof(H.GetHeader("X-Latitude").c_str());
}
if (H.hasHeader("X-Longitude")){
lon = atof(H.GetHeader("X-Longitude").c_str());
}
std::string vars = H.allVars();
if (stream == "favicon.ico"){
H.Clean();
@ -505,7 +554,7 @@ int handleRequest(Socket::Connection &conn){
unsigned int bestScore = 0;
for (HOSTLOOP){
HOSTCHECK;
unsigned int score = HOST(i).details->rate(stream);
unsigned int score = HOST(i).details->rate(stream, lat, lon);
if (score > bestScore){
bestHost = &HOST(i);
bestScore = score;
@ -556,6 +605,31 @@ void handleServer(void *hostEntryPointer){
bool down = true;
HTTP::Downloader DL;
if (DL.get(HTTP::URL("http://freegeoip.net/json/"+url.host)) && DL.isOk()){
JSON::Value &gDet = entry->details->geoDetails;
INFO_MSG("Location: %s", DL.data().c_str());
gDet = JSON::fromString(DL.data());
INFO_MSG("Location: %s", gDet.toString().c_str());
if (gDet && gDet.isMember("latitude") && gDet.isMember("longitude")){
entry->details->servLati = gDet["latitude"].asDouble();
entry->details->servLongi = gDet["longitude"].asDouble();
std::stringstream loc;
if (gDet.isMember("country_name")){
if (gDet.isMember("city") && gDet["city"].asString().size()){
entry->details->servLoc = gDet["city"].asString() + ", " + gDet["country_name"].asString();
}else{
entry->details->servLoc = gDet["country_name"].asString();
}
INFO_MSG("%s is in %s", url.host.c_str(), entry->details->servLoc.c_str());
}else{
INFO_MSG("%s is at %f, %f", url.host.c_str(), entry->details->servLati, entry->details->servLongi);
}
}
}else{
WARN_MSG("Could not reach location server for %s", url.host.c_str());
}
while (cfg->is_active && (entry->state != STATE_GODOWN)){
if (DL.get(url) && DL.isOk()){
JSON::Value servData = JSON::fromString(DL.data());
@ -656,6 +730,13 @@ int main(int argc, char **argv){
opt["value"].append((long long)weight_bw);
conf.addOption("bw", opt);
opt["arg"] = "integer";
opt["short"] = "G";
opt["long"] = "geo";
opt["help"] = "Weight for geo scoring";
opt["value"].append((long long)weight_geo);
conf.addOption("geo", opt);
opt["arg"] = "integer";
opt["short"] = "X";
opt["long"] = "extra";
@ -675,6 +756,7 @@ int main(int argc, char **argv){
weight_ram = conf.getInteger("ram");
weight_cpu = conf.getInteger("cpu");
weight_bw = conf.getInteger("bw");
weight_geo = conf.getInteger("geo");
weight_bonus = conf.getInteger("extra");
fallback = conf.getString("fallback");
localMode = conf.getBool("localmode");