diff --git a/src/utils/util_load.cpp b/src/utils/util_load.cpp index fdd3445c..3a49234c 100644 --- a/src/utils/util_load.cpp +++ b/src/utils/util_load.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -19,6 +20,7 @@ bool localMode = false; unsigned int weight_cpu = 500; unsigned int weight_ram = 500; unsigned int weight_bw = 1000; +unsigned int weight_geo = 1000; unsigned int weight_bonus = 50; unsigned long hostsCounter = 0; // This is a pointer to guarantee atomic accesses. #define HOSTLOOP \ @@ -61,6 +63,17 @@ class outUrl{ } }; +inline double toRad(double degree){ + return degree / 57.29577951308232087684; +} + +double geoDist(double lat1, double long1, double lat2, double long2){ + double dist; + dist = sin(toRad(lat1)) * sin(toRad(lat2)) + + cos(toRad(lat1)) * cos(toRad(lat2)) * cos(toRad(long1 - long2)); + return .31830988618379067153 * acos(dist); +} + class hostDetails{ private: tthread::mutex *hostMutex; @@ -81,6 +94,9 @@ private: public: std::string host; unsigned long long availBandwidth; + JSON::Value geoDetails; + double servLati, servLongi; + std::string servLoc; hostDetails(){ hostMutex = 0; cpu = 1000; @@ -93,6 +109,8 @@ public: prevTime = 0; total = 0; addBandwidth = 0; + servLati = 0; + servLongi = 0; availBandwidth = 128 * 1024 * 1024; // assume 1G connections } ~hostDetails(){ @@ -129,6 +147,11 @@ public: r["streams"] = (long long)streams.size(); r["viewers"] = (long long)total; r["bwlimit"] = (long long)availBandwidth; + if (servLati || servLongi){ + r["geo"]["lat"] = servLati; + r["geo"]["lon"] = servLongi; + r["geo"]["loc"] = servLoc; + } if (ramMax && availBandwidth){ r["score"]["cpu"] = (long long)(weight_cpu - (cpu * weight_cpu) / 1000); r["score"]["ram"] = (long long)(weight_ram - ((ramCurr * weight_ram) / ramMax)); @@ -147,7 +170,7 @@ public: } /// Scores a potential new connection to this server /// 0 means not possible, the higher the better. - unsigned int rate(std::string &s){ + unsigned int rate(std::string &s, double lati = 0, double longi = 0){ if (!hostMutex){hostMutex = new tthread::mutex();} tthread::lock_guard guard(*hostMutex); if (!ramMax || !availBandwidth){ @@ -167,16 +190,20 @@ public: unsigned int cpu_score = (weight_cpu - (cpu * weight_cpu) / 1000); unsigned int ram_score = (weight_ram - ((ramCurr * weight_ram) / ramMax)); unsigned int bw_score = (weight_bw - (((upSpeed + addBandwidth) * weight_bw) / availBandwidth)); - unsigned int score = cpu_score + ram_score + bw_score + (streams.count(s) ? weight_bonus : 0); + unsigned int geo_score = 0; + if (servLati && servLongi && lati && longi){ + geo_score = weight_geo - weight_geo * geoDist(servLati, servLongi, lati, longi); + } + unsigned int score = cpu_score + ram_score + bw_score + geo_score + (streams.count(s) ? weight_bonus : 0); // Print info on host - MEDIUM_MSG("%s: CPU %u, RAM %u, Stream %u, BW %u (max %llu MB/s) -> %u", host.c_str(), + MEDIUM_MSG("%s: CPU %u, RAM %u, Stream %u, BW %u (max %llu MB/s), Geo %u -> %u", host.c_str(), cpu_score, ram_score, streams.count(s) ? weight_bonus : 0, bw_score, - availBandwidth / 1024 / 1024, score); + availBandwidth / 1024 / 1024, geo_score, score); return score; } /// Scores this server as a source /// 0 means not possible, the higher the better. - unsigned int source(std::string &s){ + unsigned int source(std::string &s, double lati = 0, double longi = 0){ if (!hostMutex){hostMutex = new tthread::mutex();} tthread::lock_guard guard(*hostMutex); if (!streams.count(s) || !streams[s].inputs){return 0;} @@ -193,11 +220,15 @@ public: unsigned int cpu_score = (weight_cpu - (cpu * weight_cpu) / 1000); unsigned int ram_score = (weight_ram - ((ramCurr * weight_ram) / ramMax)); unsigned int bw_score = (weight_bw - (((upSpeed + addBandwidth) * weight_bw) / availBandwidth)); - unsigned int score = cpu_score + ram_score + bw_score + 1; + unsigned int geo_score = 0; + if (servLati && servLongi && lati && longi){ + geo_score = weight_geo - weight_geo * geoDist(servLati, servLongi, lati, longi); + } + unsigned int score = cpu_score + ram_score + bw_score + geo_score + 1; // Print info on host - MEDIUM_MSG("SOURCE %s: CPU %u, RAM %u, Stream %u, BW %u (max %llu MB/s) -> %u", host.c_str(), + MEDIUM_MSG("SOURCE %s: CPU %u, RAM %u, Stream %u, BW %u (max %llu MB/s), Geo %u -> %u", host.c_str(), cpu_score, ram_score, streams.count(s) ? weight_bonus : 0, bw_score, - availBandwidth / 1024 / 1024, score); + availBandwidth / 1024 / 1024, geo_score, score); return score; } std::string getUrl(std::string &s, std::string &proto){ @@ -353,10 +384,12 @@ int handleRequest(Socket::Connection &conn){ if (newVals.isMember("cpu")){weight_cpu = newVals["cpu"].asInt();} if (newVals.isMember("ram")){weight_ram = newVals["ram"].asInt();} if (newVals.isMember("bw")){weight_bw = newVals["bw"].asInt();} + if (newVals.isMember("geo")){weight_geo = newVals["geo"].asInt();} if (newVals.isMember("bonus")){weight_bonus = newVals["bonus"].asInt();} ret["cpu"] = weight_cpu; ret["ram"] = weight_ram; ret["bw"] = weight_bw; + ret["geo"] = weight_geo; ret["bonus"] = weight_bonus; H.SetBody(ret.toString()); H.SendResponse("200", "OK", conn); @@ -491,6 +524,22 @@ int handleRequest(Socket::Connection &conn){ std::string stream = H.url.substr(1); std::string proto = H.GetVar("proto"); H.SetVar("proto", ""); + double lat = 0; + double lon = 0; + if (H.GetVar("lat") != ""){ + lat = atof(H.GetVar("lat").c_str()); + H.SetVar("lat", ""); + } + if (H.GetVar("lon") != ""){ + lon = atof(H.GetVar("lon").c_str()); + H.SetVar("lon", ""); + } + if (H.hasHeader("X-Latitude")){ + lat = atof(H.GetHeader("X-Latitude").c_str()); + } + if (H.hasHeader("X-Longitude")){ + lon = atof(H.GetHeader("X-Longitude").c_str()); + } std::string vars = H.allVars(); if (stream == "favicon.ico"){ H.Clean(); @@ -505,7 +554,7 @@ int handleRequest(Socket::Connection &conn){ unsigned int bestScore = 0; for (HOSTLOOP){ HOSTCHECK; - unsigned int score = HOST(i).details->rate(stream); + unsigned int score = HOST(i).details->rate(stream, lat, lon); if (score > bestScore){ bestHost = &HOST(i); bestScore = score; @@ -556,6 +605,31 @@ void handleServer(void *hostEntryPointer){ bool down = true; HTTP::Downloader DL; + + if (DL.get(HTTP::URL("http://freegeoip.net/json/"+url.host)) && DL.isOk()){ + JSON::Value &gDet = entry->details->geoDetails; + INFO_MSG("Location: %s", DL.data().c_str()); + gDet = JSON::fromString(DL.data()); + INFO_MSG("Location: %s", gDet.toString().c_str()); + if (gDet && gDet.isMember("latitude") && gDet.isMember("longitude")){ + entry->details->servLati = gDet["latitude"].asDouble(); + entry->details->servLongi = gDet["longitude"].asDouble(); + std::stringstream loc; + if (gDet.isMember("country_name")){ + if (gDet.isMember("city") && gDet["city"].asString().size()){ + entry->details->servLoc = gDet["city"].asString() + ", " + gDet["country_name"].asString(); + }else{ + entry->details->servLoc = gDet["country_name"].asString(); + } + INFO_MSG("%s is in %s", url.host.c_str(), entry->details->servLoc.c_str()); + }else{ + INFO_MSG("%s is at %f, %f", url.host.c_str(), entry->details->servLati, entry->details->servLongi); + } + } + }else{ + WARN_MSG("Could not reach location server for %s", url.host.c_str()); + } + while (cfg->is_active && (entry->state != STATE_GODOWN)){ if (DL.get(url) && DL.isOk()){ JSON::Value servData = JSON::fromString(DL.data()); @@ -656,6 +730,13 @@ int main(int argc, char **argv){ opt["value"].append((long long)weight_bw); conf.addOption("bw", opt); + opt["arg"] = "integer"; + opt["short"] = "G"; + opt["long"] = "geo"; + opt["help"] = "Weight for geo scoring"; + opt["value"].append((long long)weight_geo); + conf.addOption("geo", opt); + opt["arg"] = "integer"; opt["short"] = "X"; opt["long"] = "extra"; @@ -675,6 +756,7 @@ int main(int argc, char **argv){ weight_ram = conf.getInteger("ram"); weight_cpu = conf.getInteger("cpu"); weight_bw = conf.getInteger("bw"); + weight_geo = conf.getInteger("geo"); weight_bonus = conf.getInteger("extra"); fallback = conf.getString("fallback"); localMode = conf.getBool("localmode");