diff --git a/src/utils/util_load.cpp b/src/utils/util_load.cpp index 9e9bb2a5..e7268bfb 100644 --- a/src/utils/util_load.cpp +++ b/src/utils/util_load.cpp @@ -16,12 +16,14 @@ Util::Config *cfg = 0; std::string passphrase; std::string fallback; bool localMode = false; +tthread::mutex globalMutex; size_t weight_cpu = 500; size_t weight_ram = 500; size_t weight_bw = 1000; size_t weight_geo = 1000; size_t weight_bonus = 50; +std::map blankTags; unsigned long hostsCounter = 0; // This is a pointer to guarantee atomic accesses. #define HOSTLOOP \ unsigned long i = 0; \ @@ -40,6 +42,8 @@ unsigned long hostsCounter = 0; // This is a pointer to guarantee atomic accesse const char *stateLookup[] ={"Offline", "Starting monitoring", "Monitored (error)", "Monitored (online)", "Requesting stop", "Requesting clean"}; +#define HOSTNAMELEN 1024 +#define MAXHOSTS 1000 struct streamDetails{ uint64_t total; @@ -75,11 +79,34 @@ double geoDist(double lat1, double long1, double lat2, double long2){ return .31830988618379067153 * acos(dist); } +int32_t applyAdjustment(const std::set & tags, const std::string & match, int32_t adj){ + if (!match.size()){return 0;} + bool invert = false; + bool haveOne = false; + size_t prevPos = 0; + if (match[0] == '-'){ + invert = true; + prevPos = 1; + } + //Check if any matches inside tags + size_t currPos = match.find(',', prevPos); + while (currPos != std::string::npos){ + if (tags.count(match.substr(prevPos, currPos-prevPos))){haveOne = true;} + prevPos = currPos + 1; + currPos = match.find(',', prevPos); + } + if (tags.count(match.substr(prevPos))){haveOne = true;} + //If we have any match, apply adj, unless we're doing an inverted search, then return adj on zero matches + if (haveOne == !invert){return adj;} + return 0; +} + class hostDetails{ private: tthread::mutex *hostMutex; std::map streams; std::set conf_streams; + std::set tags; std::map outputs; uint64_t cpu; uint64_t ramMax; @@ -151,6 +178,11 @@ public: r["geo"]["lon"] = servLongi; r["geo"]["loc"] = servLoc; } + if (tags.size()){ + for (std::set::iterator it = tags.begin(); it != tags.end(); ++it){ + r["tags"].append(*it); + } + } if (ramMax && availBandwidth){ r["score"]["cpu"] = (uint64_t)(weight_cpu - (cpu * weight_cpu) / 1000); r["score"]["ram"] = (uint64_t)(weight_ram - ((ramCurr * weight_ram) / ramMax)); @@ -195,7 +227,7 @@ public: } /// Scores a potential new connection to this server /// 0 means not possible, the higher the better. - uint64_t rate(std::string &s, double lati = 0, double longi = 0){ + uint64_t rate(std::string &s, double lati = 0, double longi = 0, const std::map &tagAdjust = blankTags){ if (!hostMutex){hostMutex = new tthread::mutex();} tthread::lock_guard guard(*hostMutex); if (!ramMax || !availBandwidth){ @@ -221,19 +253,30 @@ public: geo_score = weight_geo - weight_geo * geoDist(servLati, servLongi, lati, longi); } uint64_t score = cpu_score + ram_score + bw_score + geo_score + (streams.count(s) ? weight_bonus : 0); + int64_t adjustment = 0; + if (tagAdjust.size()){ + for (std::map::const_iterator it = tagAdjust.begin(); it != tagAdjust.end(); ++it){ + adjustment += applyAdjustment(tags, it->first, it->second); + } + } + if (adjustment >= 0 || -adjustment < score){ + score += adjustment; + }else{ + score = 0; + } // Print info on host MEDIUM_MSG("%s: CPU %" PRIu64 ", RAM %" PRIu64 ", Stream %" PRIu64 ", BW %" PRIu64 - " (max %" PRIu64 " MB/s), Geo %" PRIu64 " -> %" PRIu64, + " (max %" PRIu64 " MB/s), Geo %" PRIu64 ", tag adjustment %" PRId64 " -> %" PRIu64, host.c_str(), cpu_score, ram_score, streams.count(s) ? weight_bonus : 0, bw_score, - availBandwidth / 1024 / 1024, geo_score, score); + availBandwidth / 1024 / 1024, geo_score, adjustment, score); return score; } /// Scores this server as a source /// 0 means not possible, the higher the better. - uint64_t source(std::string &s, double lati = 0, double longi = 0){ + uint64_t source(const std::string &s, double lati, double longi, const std::map &tagAdjust, uint32_t minCpu){ if (!hostMutex){hostMutex = new tthread::mutex();} tthread::lock_guard guard(*hostMutex); - if (!streams.count(s) || !streams[s].inputs){return 0;} + if (s.size() && (!streams.count(s) || !streams[s].inputs)){return 0;} if (!ramMax || !availBandwidth){ WARN_MSG("Host %s invalid: RAM %" PRIu64 ", BW %" PRIu64, host.c_str(), ramMax, availBandwidth); return 1; @@ -244,6 +287,7 @@ public: return 1; } // Calculate score + if (minCpu && cpu + minCpu >= 1000){return 0;} uint64_t cpu_score = (weight_cpu - (cpu * weight_cpu) / 1000); uint64_t ram_score = (weight_ram - ((ramCurr * weight_ram) / ramMax)); uint64_t bw_score = (weight_bw - (((upSpeed + addBandwidth) * weight_bw) / availBandwidth)); @@ -252,11 +296,22 @@ public: geo_score = weight_geo - weight_geo * geoDist(servLati, servLongi, lati, longi); } uint64_t score = cpu_score + ram_score + bw_score + geo_score + 1; + int64_t adjustment = 0; + if (tagAdjust.size()){ + for (std::map::const_iterator it = tagAdjust.begin(); it != tagAdjust.end(); ++it){ + adjustment += applyAdjustment(tags, it->first, it->second); + } + } + if (adjustment >= 0 || -adjustment < score){ + score += adjustment; + }else{ + score = 0; + } // Print info on host MEDIUM_MSG("SOURCE %s: CPU %" PRIu64 ", RAM %" PRIu64 ", Stream %" PRIu64 ", BW %" PRIu64 - " (max %" PRIu64 " MB/s), Geo %" PRIu64 " -> %" PRIu64, + " (max %" PRIu64 " MB/s), Geo %" PRIu64 ", tag adjustment %" PRId64 " -> %" PRIu64, host.c_str(), cpu_score, ram_score, streams.count(s) ? weight_bonus : 0, bw_score, - availBandwidth / 1024 / 1024, geo_score, score); + availBandwidth / 1024 / 1024, geo_score, adjustment, score); return score; } std::string getUrl(std::string &s, std::string &proto){ @@ -298,6 +353,14 @@ public: int64_t nRamCur = d["mem_used"].asInt(); int64_t nShmMax = d["shm_total"].asInt(); int64_t nShmCur = d["shm_used"].asInt(); + if (d.isMember("tags") && d["tags"].isArray()){ + std::set newTags; + jsonForEach(d["tags"], tag){ + std::string t = tag->asString(); + if (t.size()){newTags.insert(t);} + } + if (newTags != tags){tags = newTags;} + } if (!nRamMax){nRamMax = 1;} if (!nShmMax){nShmMax = 1;} if (((nRamCur + nShmCur) * 1000) / nRamMax > (nShmCur * 1000) / nShmMax){ @@ -373,16 +436,24 @@ public: /// Fixed-size struct for holding a host's name and details pointer struct hostEntry{ uint8_t state; // 0 = off, 1 = booting, 2 = running, 3 = requesting shutdown, 4 = requesting clean - char name[200]; // 200 chars should be enough + char name[HOSTNAMELEN]; // host+port for server hostDetails *details; /// hostDetails pointer tthread::thread *thread; /// thread pointer }; -hostEntry hosts[1000]; /// Fixed-size array holding all hosts +hostEntry hosts[MAXHOSTS]; /// Fixed-size array holding all hosts void initHost(hostEntry &H, const std::string &N); void cleanupHost(hostEntry &H); +///Fills the given map with the given JSON string of tag adjustments +void fillTagAdjust(std::map & tags, const std::string & adjust){ + JSON::Value adj = JSON::fromString(adjust); + jsonForEach(adj, t){ + tags[t.key()] = t->asInt(); + } +} + int handleRequest(Socket::Connection &conn){ HTTP::Parser H; while (conn){ @@ -401,6 +472,7 @@ int handleRequest(Socket::Connection &conn){ std::string streamStats = H.GetVar("streamstats"); std::string stream = H.GetVar("stream"); std::string source = H.GetVar("source"); + std::string ingest = H.GetVar("ingest"); std::string fback = H.GetVar("fallback"); std::string lstserver = H.GetVar("lstserver"); std::string addserver = H.GetVar("addserver"); @@ -431,7 +503,7 @@ int handleRequest(Socket::Connection &conn){ // Get server list if (lstserver.size()){ for (HOSTLOOP){ - HOSTCHECK; + if (hosts[i].state == STATE_OFF){continue;} ret[(std::string)hosts[i].name] = stateLookup[hosts[i].state]; } H.SetBody(ret.toPrettyString()); @@ -442,9 +514,10 @@ int handleRequest(Socket::Connection &conn){ } // Remove server from list if (delserver.size()){ + tthread::lock_guard globGuard(globalMutex); ret = "Server not monitored - could not delete from monitored server list!"; for (HOSTLOOP){ - HOSTCHECK; + if (hosts[i].state == STATE_OFF){continue;} if ((std::string)hosts[i].name == delserver){ cleanupHost(hosts[i]); ret = stateLookup[hosts[i].state]; @@ -458,7 +531,8 @@ int handleRequest(Socket::Connection &conn){ } // Add server to list if (addserver.size()){ - if (addserver.size() > 199){ + tthread::lock_guard globGuard(globalMutex); + if (addserver.size() >= HOSTNAMELEN){ H.SetBody("Host length too long for monitoring"); H.setCORSHeaders(); H.SendResponse("200", "OK", conn); @@ -468,7 +542,7 @@ int handleRequest(Socket::Connection &conn){ bool stop = false; hostEntry *newEntry = 0; for (HOSTLOOP){ - HOSTCHECK; + if (hosts[i].state == STATE_OFF){continue;} if ((std::string)hosts[i].name == addserver){ stop = true; break; @@ -482,6 +556,7 @@ int handleRequest(Socket::Connection &conn){ initHost(hosts[i], addserver); newEntry = &(hosts[i]); stop = true; + break; } } if (!stop){ @@ -500,7 +575,7 @@ int handleRequest(Socket::Connection &conn){ // Request viewer count if (viewers.size()){ for (HOSTLOOP){ - HOSTCHECK; + if (hosts[i].state == STATE_OFF){continue;} HOST(i).details->fillStreams(ret); } H.SetBody(ret.toPrettyString()); @@ -512,7 +587,7 @@ int handleRequest(Socket::Connection &conn){ // Request full stream statistics if (streamStats.size()){ for (HOSTLOOP){ - HOSTCHECK; + if (hosts[i].state == STATE_OFF){continue;} HOST(i).details->fillStreamStats(streamStats, ret); } H.SetBody(ret.toPrettyString()); @@ -524,7 +599,7 @@ int handleRequest(Socket::Connection &conn){ if (stream.size()){ uint64_t count = 0; for (HOSTLOOP){ - HOSTCHECK; + if (hosts[i].state == STATE_OFF){continue;} count += HOST(i).details->getViewers(stream); } H.SetBody(JSON::Value(count).asString()); @@ -537,6 +612,21 @@ int handleRequest(Socket::Connection &conn){ if (source.size()){ INFO_MSG("Finding source for stream %s", source.c_str()); std::string bestHost = ""; + std::map tagAdjust; + if (H.GetVar("tag_adjust") != ""){fillTagAdjust(tagAdjust, H.GetVar("tag_adjust"));} + if (H.hasHeader("X-Tag-Adjust")){fillTagAdjust(tagAdjust, H.GetHeader("X-Tag-Adjust"));} + double lat = 0; + double lon = 0; + if (H.GetVar("lat") != ""){ + lat = atof(H.GetVar("lat").c_str()); + H.SetVar("lat", ""); + } + if (H.GetVar("lon") != ""){ + lon = atof(H.GetVar("lon").c_str()); + H.SetVar("lon", ""); + } + if (H.hasHeader("X-Latitude")){lat = atof(H.GetHeader("X-Latitude").c_str());} + if (H.hasHeader("X-Longitude")){lon = atof(H.GetHeader("X-Longitude").c_str());} uint64_t bestScore = 0; for (HOSTLOOP){ HOSTCHECK; @@ -544,7 +634,7 @@ int handleRequest(Socket::Connection &conn){ INFO_MSG("Ignoring same-host entry %s", HOST(i).details->host.data()); continue; } - uint64_t score = HOST(i).details->source(source); + uint64_t score = HOST(i).details->source(source, lat, lon, tagAdjust, 0); if (score > bestScore){ bestHost = "dtsc://" + HOST(i).details->host; bestScore = score; @@ -566,16 +656,65 @@ int handleRequest(Socket::Connection &conn){ H.Clean(); continue; } + // Find optimal ingest point + if (ingest.size()){ + double cpuUse = atof(ingest.c_str()); + INFO_MSG("Finding ingest point for CPU usage %.2f", cpuUse); + std::string bestHost = ""; + std::map tagAdjust; + if (H.GetVar("tag_adjust") != ""){fillTagAdjust(tagAdjust, H.GetVar("tag_adjust"));} + if (H.hasHeader("X-Tag-Adjust")){fillTagAdjust(tagAdjust, H.GetHeader("X-Tag-Adjust"));} + double lat = 0; + double lon = 0; + if (H.GetVar("lat") != ""){ + lat = atof(H.GetVar("lat").c_str()); + H.SetVar("lat", ""); + } + if (H.GetVar("lon") != ""){ + lon = atof(H.GetVar("lon").c_str()); + H.SetVar("lon", ""); + } + if (H.hasHeader("X-Latitude")){lat = atof(H.GetHeader("X-Latitude").c_str());} + if (H.hasHeader("X-Longitude")){lon = atof(H.GetHeader("X-Longitude").c_str());} + uint64_t bestScore = 0; + for (HOSTLOOP){ + HOSTCHECK; + uint64_t score = HOST(i).details->source("", lat, lon, tagAdjust, cpuUse * 10); + if (score > bestScore){ + bestHost = HOST(i).details->host; + bestScore = score; + } + } + if (bestScore == 0){ + if (fback.size()){ + bestHost = fback; + }else{ + bestHost = fallback; + } + FAIL_MSG("No ingest point found!"); + }else{ + INFO_MSG("Winner: %s scores %" PRIu64, bestHost.c_str(), bestScore); + } + H.SetBody(bestHost); + H.setCORSHeaders(); + H.SendResponse("200", "OK", conn); + H.Clean(); + continue; + } // Find host(s) status if (!host.size()){ for (HOSTLOOP){ + if (hosts[i].state == STATE_OFF){continue;} + ret[HOST(i).details->host] = stateLookup[hosts[i].state]; HOSTCHECK; HOST(i).details->fillState(ret[HOST(i).details->host]); } }else{ for (HOSTLOOP){ - HOSTCHECK; + if (hosts[i].state == STATE_OFF){continue;} if (HOST(i).details->host == host){ + ret = stateLookup[hosts[i].state]; + HOSTCHECK; HOST(i).details->fillState(ret); break; } @@ -588,8 +727,14 @@ int handleRequest(Socket::Connection &conn){ continue; } // Balance given stream - std::string stream = H.url.substr(1); + std::string stream = HTTP::URL(H.url).path; std::string proto = H.GetVar("proto"); + std::map tagAdjust; + if (H.GetVar("tag_adjust") != ""){ + fillTagAdjust(tagAdjust, H.GetVar("tag_adjust")); + H.SetVar("tag_adjust", ""); + } + if (H.hasHeader("X-Tag-Adjust")){fillTagAdjust(tagAdjust, H.GetHeader("X-Tag-Adjust"));} H.SetVar("proto", ""); double lat = 0; double lon = 0; @@ -618,7 +763,7 @@ int handleRequest(Socket::Connection &conn){ uint64_t bestScore = 0; for (HOSTLOOP){ HOSTCHECK; - uint64_t score = HOST(i).details->rate(stream, lat, lon); + uint64_t score = HOST(i).details->rate(stream, lat, lon, tagAdjust); if (score > bestScore){ bestHost = &HOST(i); bestScore = score; @@ -706,7 +851,7 @@ void handleServer(void *hostEntryPointer){ int main(int argc, char **argv){ Util::redirectLogsIfNeeded(); - memset(hosts, 0, sizeof(hosts)); // zero-fill the hosts list + memset(hosts, 0, sizeof(hostEntry)*MAXHOSTS); // zero-fill the hosts list Util::Config conf(argv[0]); cfg = &conf; @@ -828,6 +973,10 @@ int main(int argc, char **argv){ conf.is_active = false; // Join all threads + for (HOSTLOOP){ + if (!HOST(i).name[0]){continue;} + HOST(i).state = STATE_GODOWN; + } for (HOSTLOOP){cleanupHost(HOST(i));} } @@ -836,6 +985,7 @@ void initHost(hostEntry &H, const std::string &N){ if (!N.size()){return;} H.state = STATE_BOOT; H.details = new hostDetails(); + memset(H.name, 0, HOSTNAMELEN); memcpy(H.name, N.data(), N.size()); H.thread = new tthread::thread(handleServer, (void *)&H); INFO_MSG("Starting monitoring %s", H.name); @@ -853,6 +1003,6 @@ void cleanupHost(hostEntry &H){ // Clean up details delete H.details; H.details = 0; - memset(H.name, 0, sizeof(H.name)); + memset(H.name, 0, HOSTNAMELEN); H.state = STATE_OFF; }