
- Fixed segfault when attempting to initialseek on disconnected streams - Fix 100% CPU bug in controller's stats code - WebRTC UDP bind socket improvements - Several segfault fixes - Increased packet reordering buffer size from 30 to 150 packets - Tweaks to default output/buffer behaviour for incoming pushes - Added message for load balancer checks - Fixed HLS content type - Stats fixes - Exit reason fixes - Fixed socket IP address detection - Fixed non-string arguments for stream settings - Added caching for getConnectedBinHost() - Added WebRTC playback rate control - Added/completed VP8/VP9 support to WebRTC/RTSP - Added live seek option to WebRTC - Fixed seek to exactly newest timestamp - Fixed HLS input # Conflicts: # lib/defines.h # src/input/input.cpp
841 lines
28 KiB
C++
841 lines
28 KiB
C++
#include <cmath>
|
|
#include <cstdlib>
|
|
#include <fstream>
|
|
#include <iostream>
|
|
#include <mist/config.h>
|
|
#include <mist/defines.h>
|
|
#include <mist/downloader.h>
|
|
#include <mist/timing.h>
|
|
#include <mist/tinythread.h>
|
|
#include <mist/util.h>
|
|
#include <set>
|
|
#include <stdint.h>
|
|
#include <string>
|
|
|
|
/// Global configuration handle; assigned in main() and polled by the monitoring threads.
Util::Config *cfg = 0;
/// Passphrase used to build the "<passphrase>.json" path requested from each monitored server.
std::string passphrase;
/// Reply body used when no suitable server could be selected.
std::string fallback;
/// When true, configuration requests are refused unless the connection is local.
bool localMode = false;
|
|
|
|
// Scoring weights: each is the maximum number of points the matching factor can
// contribute to a host's score. They can be changed at runtime through the
// "weights" request and the command line options.
size_t weight_cpu = 500;   // CPU load
size_t weight_ram = 500;   // memory usage
size_t weight_bw = 1000;   // bandwidth usage
size_t weight_geo = 1000;  // geographic distance
size_t weight_bonus = 50;  // bonus when the host already has the stream

/// Number of leading slots of the hosts array that HOSTLOOP iterates over.
/// This is a plain integer (not a pointer); the declaration itself provides no synchronization.
unsigned long hostsCounter = 0;
|
|
/// Loop header walking index `i` over every used host slot; use as `for (HOSTLOOP){...}`.
#define HOSTLOOP \
  unsigned long i = 0; \
  i < hostsCounter; \
  ++i

/// Accesses host slot number `no`.
#define HOST(no) (hosts[no])

/// Inside a HOSTLOOP body: skips the current slot unless that host is online.
#define HOSTCHECK \
  if (hosts[i].state != STATE_ONLINE){continue;}
|
|
|
|
// Host slot states. The numeric values double as indexes into stateLookup below.
#define STATE_OFF 0      // slot unused
#define STATE_BOOT 1     // monitoring thread starting
#define STATE_ERROR 2    // monitored, last poll failed
#define STATE_ONLINE 3   // monitored, last poll succeeded
#define STATE_GODOWN 4   // monitoring thread asked to stop
#define STATE_REQCLEAN 5 // monitoring thread finished

/// Human-readable description for each STATE_* value.
const char *stateLookup[] ={
    "Offline",
    "Starting monitoring",
    "Monitored (error)",
    "Monitored (online)",
    "Requesting stop",
    "Requesting clean"};
|
|
|
|
/// Per-stream statistics kept for one monitored host.
struct streamDetails{
  uint32_t total;     ///< Current count taken from the first "curr" entry of the stream report.
  uint32_t inputs;    ///< Current count taken from the second "curr" entry of the stream report.
  uint32_t bandwidth; ///< Estimated bytes per second used by a single connection to this stream.
  /// Sum of the stream's two "bw" counters at the previous update.
  /// Must be 64 bits wide: the value stored here is a 64-bit sum, and a 32-bit field
  /// would truncate it once the counters pass 4 GiB, corrupting the next delta.
  uint64_t prevTotal;
};
|
|
|
|
/// URL template for one output, split around the '$' placeholder.
/// `pre` holds everything before the first '$', `post` everything after it.
class outUrl{
public:
  std::string pre, post;

  /// Creates an empty template.
  outUrl(){}

  /// Builds the template from `u`, substituting the first "HOST" occurrence with `host`.
  outUrl(const std::string &u, const std::string &host){
    std::string expanded(u);
    const size_t hostPos = u.find("HOST");
    if (hostPos != std::string::npos){
      expanded = u.substr(0, hostPos);
      expanded += host;
      expanded += u.substr(hostPos + 4);
    }
    const size_t marker = expanded.find('$');
    if (marker == std::string::npos){
      // No placeholder: the whole URL is the prefix and the suffix stays empty.
      pre = expanded;
    }else{
      pre = expanded.substr(0, marker);
      post = expanded.substr(marker + 1);
    }
  }
};
|
|
|
|
/// Converts an angle from degrees to radians.
inline double toRad(double degree){
  return degree / 57.29577951308232087684;
}

/// Angular distance between two latitude/longitude points (in degrees), computed
/// with the spherical law of cosines and scaled by 1/pi.
/// \return A value from 0 (same point) to 1 (opposite sides of the sphere).
double geoDist(double lat1, double long1, double lat2, double long2){
  double dist = sin(toRad(lat1)) * sin(toRad(lat2)) + cos(toRad(lat1)) * cos(toRad(lat2)) * cos(toRad(long1 - long2));
  // Floating point rounding can push the cosine marginally outside [-1, 1]
  // (typically for identical points), for which acos() would return NaN.
  if (dist > 1){dist = 1;}
  if (dist < -1){dist = -1;}
  return .31830988618379067153 * acos(dist);
}
|
|
|
|
class hostDetails{
|
|
private:
|
|
tthread::mutex *hostMutex;
|
|
std::map<std::string, struct streamDetails> streams;
|
|
std::set<std::string> conf_streams;
|
|
std::map<std::string, outUrl> outputs;
|
|
uint64_t cpu;
|
|
uint64_t ramMax;
|
|
uint64_t ramCurr;
|
|
uint64_t upSpeed;
|
|
uint64_t downSpeed;
|
|
uint64_t total;
|
|
uint64_t upPrev;
|
|
uint64_t downPrev;
|
|
uint64_t prevTime;
|
|
uint64_t addBandwidth;
|
|
|
|
public:
|
|
std::string host;
|
|
char binHost[16];
|
|
uint64_t availBandwidth;
|
|
JSON::Value geoDetails;
|
|
double servLati, servLongi;
|
|
std::string servLoc;
|
|
hostDetails(){
|
|
hostMutex = 0;
|
|
cpu = 1000;
|
|
ramMax = 0;
|
|
ramCurr = 0;
|
|
upSpeed = 0;
|
|
downSpeed = 0;
|
|
upPrev = 0;
|
|
downPrev = 0;
|
|
prevTime = 0;
|
|
total = 0;
|
|
addBandwidth = 0;
|
|
servLati = 0;
|
|
servLongi = 0;
|
|
availBandwidth = 128 * 1024 * 1024; // assume 1G connections
|
|
}
|
|
~hostDetails(){
|
|
if (hostMutex){
|
|
delete hostMutex;
|
|
hostMutex = 0;
|
|
}
|
|
}
|
|
void badNess(){
|
|
if (!hostMutex){hostMutex = new tthread::mutex();}
|
|
tthread::lock_guard<tthread::mutex> guard(*hostMutex);
|
|
addBandwidth += 1 * 1024 * 1024;
|
|
addBandwidth *= 1.2;
|
|
}
|
|
/// Returns the count of viewers for a given stream s.
|
|
size_t count(std::string &s){
|
|
if (!hostMutex){hostMutex = new tthread::mutex();}
|
|
tthread::lock_guard<tthread::mutex> guard(*hostMutex);
|
|
if (streams.count(s)){return streams[s].total;}
|
|
return 0;
|
|
}
|
|
/// Fills out a by reference given JSON::Value with current state.
|
|
void fillState(JSON::Value &r){
|
|
if (!hostMutex){hostMutex = new tthread::mutex();}
|
|
tthread::lock_guard<tthread::mutex> guard(*hostMutex);
|
|
r["cpu"] = (uint64_t)(cpu / 10);
|
|
if (ramMax){r["ram"] = (uint64_t)((ramCurr * 100) / ramMax);}
|
|
r["up"] = upSpeed;
|
|
r["up_add"] = addBandwidth;
|
|
r["down"] = downSpeed;
|
|
r["streams"] = streams.size();
|
|
r["viewers"] = total;
|
|
r["bwlimit"] = availBandwidth;
|
|
if (servLati || servLongi){
|
|
r["geo"]["lat"] = servLati;
|
|
r["geo"]["lon"] = servLongi;
|
|
r["geo"]["loc"] = servLoc;
|
|
}
|
|
if (ramMax && availBandwidth){
|
|
r["score"]["cpu"] = (uint64_t)(weight_cpu - (cpu * weight_cpu) / 1000);
|
|
r["score"]["ram"] = (uint64_t)(weight_ram - ((ramCurr * weight_ram) / ramMax));
|
|
r["score"]["bw"] = (uint64_t)(weight_bw - (((upSpeed + addBandwidth) * weight_bw) / availBandwidth));
|
|
}
|
|
}
|
|
/// Fills out a by reference given JSON::Value with current streams.
|
|
void fillStreams(JSON::Value &r){
|
|
if (!hostMutex){hostMutex = new tthread::mutex();}
|
|
tthread::lock_guard<tthread::mutex> guard(*hostMutex);
|
|
for (std::map<std::string, struct streamDetails>::iterator jt = streams.begin();
|
|
jt != streams.end(); ++jt){
|
|
r[jt->first] = r[jt->first].asInt() + jt->second.total;
|
|
}
|
|
}
|
|
/// Returns viewcount for the given stream
|
|
long long getViewers(const std::string &strm){
|
|
if (!hostMutex){hostMutex = new tthread::mutex();}
|
|
tthread::lock_guard<tthread::mutex> guard(*hostMutex);
|
|
if (!streams.count(strm)){return 0;}
|
|
return streams[strm].total;
|
|
}
|
|
/// Scores a potential new connection to this server
|
|
/// 0 means not possible, the higher the better.
|
|
uint64_t rate(std::string &s, double lati = 0, double longi = 0){
|
|
if (!hostMutex){hostMutex = new tthread::mutex();}
|
|
tthread::lock_guard<tthread::mutex> guard(*hostMutex);
|
|
if (!ramMax || !availBandwidth){
|
|
WARN_MSG("Host %s invalid: RAM %" PRIu64 ", BW %" PRIu64, host.c_str(), ramMax, availBandwidth);
|
|
return 0;
|
|
}
|
|
if (upSpeed >= availBandwidth || (upSpeed + addBandwidth) >= availBandwidth){
|
|
INFO_MSG("Host %s over bandwidth: %" PRIu64 "+%" PRIu64 " >= %" PRIu64, host.c_str(), upSpeed,
|
|
addBandwidth, availBandwidth);
|
|
return 0;
|
|
}
|
|
if (conf_streams.size() && !conf_streams.count(s) &&
|
|
!conf_streams.count(s.substr(0, s.find_first_of("+ ")))){
|
|
MEDIUM_MSG("Stream %s not available from %s", s.c_str(), host.c_str());
|
|
return 0;
|
|
}
|
|
// Calculate score
|
|
uint64_t cpu_score = (weight_cpu - (cpu * weight_cpu) / 1000);
|
|
uint64_t ram_score = (weight_ram - ((ramCurr * weight_ram) / ramMax));
|
|
uint64_t bw_score = (weight_bw - (((upSpeed + addBandwidth) * weight_bw) / availBandwidth));
|
|
uint64_t geo_score = 0;
|
|
if (servLati && servLongi && lati && longi){
|
|
geo_score = weight_geo - weight_geo * geoDist(servLati, servLongi, lati, longi);
|
|
}
|
|
uint64_t score = cpu_score + ram_score + bw_score + geo_score + (streams.count(s) ? weight_bonus : 0);
|
|
// Print info on host
|
|
MEDIUM_MSG("%s: CPU %" PRIu64 ", RAM %" PRIu64 ", Stream %" PRIu64 ", BW %" PRIu64
|
|
" (max %" PRIu64 " MB/s), Geo %" PRIu64 " -> %" PRIu64,
|
|
host.c_str(), cpu_score, ram_score, streams.count(s) ? weight_bonus : 0, bw_score,
|
|
availBandwidth / 1024 / 1024, geo_score, score);
|
|
return score;
|
|
}
|
|
/// Scores this server as a source
|
|
/// 0 means not possible, the higher the better.
|
|
uint64_t source(std::string &s, double lati = 0, double longi = 0){
|
|
if (!hostMutex){hostMutex = new tthread::mutex();}
|
|
tthread::lock_guard<tthread::mutex> guard(*hostMutex);
|
|
if (!streams.count(s) || !streams[s].inputs){return 0;}
|
|
if (!ramMax || !availBandwidth){
|
|
WARN_MSG("Host %s invalid: RAM %" PRIu64 ", BW %" PRIu64, host.c_str(), ramMax, availBandwidth);
|
|
return 1;
|
|
}
|
|
if (upSpeed >= availBandwidth || (upSpeed + addBandwidth) >= availBandwidth){
|
|
INFO_MSG("Host %s over bandwidth: %" PRIu64 "+%" PRIu64 " >= %" PRIu64, host.c_str(), upSpeed,
|
|
addBandwidth, availBandwidth);
|
|
return 1;
|
|
}
|
|
// Calculate score
|
|
uint64_t cpu_score = (weight_cpu - (cpu * weight_cpu) / 1000);
|
|
uint64_t ram_score = (weight_ram - ((ramCurr * weight_ram) / ramMax));
|
|
uint64_t bw_score = (weight_bw - (((upSpeed + addBandwidth) * weight_bw) / availBandwidth));
|
|
uint64_t geo_score = 0;
|
|
if (servLati && servLongi && lati && longi){
|
|
geo_score = weight_geo - weight_geo * geoDist(servLati, servLongi, lati, longi);
|
|
}
|
|
uint64_t score = cpu_score + ram_score + bw_score + geo_score + 1;
|
|
// Print info on host
|
|
MEDIUM_MSG("SOURCE %s: CPU %" PRIu64 ", RAM %" PRIu64 ", Stream %" PRIu64 ", BW %" PRIu64
|
|
" (max %" PRIu64 " MB/s), Geo %" PRIu64 " -> %" PRIu64,
|
|
host.c_str(), cpu_score, ram_score, streams.count(s) ? weight_bonus : 0, bw_score,
|
|
availBandwidth / 1024 / 1024, geo_score, score);
|
|
return score;
|
|
}
|
|
std::string getUrl(std::string &s, std::string &proto){
|
|
if (!hostMutex){hostMutex = new tthread::mutex();}
|
|
tthread::lock_guard<tthread::mutex> guard(*hostMutex);
|
|
if (!outputs.count(proto)){return "";}
|
|
const outUrl &o = outputs[proto];
|
|
return o.pre + s + o.post;
|
|
}
|
|
void addViewer(std::string &s){
|
|
if (!hostMutex){hostMutex = new tthread::mutex();}
|
|
tthread::lock_guard<tthread::mutex> guard(*hostMutex);
|
|
uint64_t toAdd = 0;
|
|
if (streams.count(s)){
|
|
toAdd = streams[s].bandwidth;
|
|
}else{
|
|
if (total){
|
|
toAdd = (upSpeed + downSpeed) / total;
|
|
}else{
|
|
toAdd = 131072; // assume 1mbps
|
|
}
|
|
}
|
|
// ensure reasonable limits of bandwidth guesses
|
|
if (toAdd < 64 * 1024){toAdd = 64 * 1024;}// minimum of 0.5 mbps
|
|
if (toAdd > 1024 * 1024){toAdd = 1024 * 1024;}// maximum of 8 mbps
|
|
addBandwidth += toAdd;
|
|
}
|
|
void update(JSON::Value &d){
|
|
if (!hostMutex){hostMutex = new tthread::mutex();}
|
|
tthread::lock_guard<tthread::mutex> guard(*hostMutex);
|
|
cpu = d["cpu"].asInt();
|
|
if (d.isMember("bwlimit") && d["bwlimit"].asInt()){availBandwidth = d["bwlimit"].asInt();}
|
|
int64_t nRamMax = d["mem_total"].asInt();
|
|
int64_t nRamCur = d["mem_used"].asInt();
|
|
int64_t nShmMax = d["shm_total"].asInt();
|
|
int64_t nShmCur = d["shm_used"].asInt();
|
|
if (!nRamMax){nRamMax = 1;}
|
|
if (!nShmMax){nShmMax = 1;}
|
|
if (((nRamCur + nShmCur) * 1000) / nRamMax > (nShmCur * 1000) / nShmMax){
|
|
ramMax = nRamMax;
|
|
ramCurr = nRamCur + nShmCur;
|
|
}else{
|
|
ramMax = nShmMax;
|
|
ramCurr = nShmCur;
|
|
}
|
|
total = d["curr"][0u].asInt();
|
|
uint64_t currUp = d["bw"][0u].asInt(), currDown = d["bw"][1u].asInt();
|
|
uint64_t timeDiff = 0;
|
|
if (prevTime){
|
|
timeDiff = time(0) - prevTime;
|
|
if (timeDiff){
|
|
upSpeed = (currUp - upPrev) / timeDiff;
|
|
downSpeed = (currDown - downPrev) / timeDiff;
|
|
}
|
|
}
|
|
prevTime = time(0);
|
|
upPrev = currUp;
|
|
downPrev = currDown;
|
|
|
|
if (d.isMember("streams") && d["streams"].size()){
|
|
jsonForEach(d["streams"], it){
|
|
uint64_t count = (*it)["curr"][0u].asInt() + (*it)["curr"][1u].asInt() + (*it)["curr"][2u].asInt();
|
|
if (!count){
|
|
if (streams.count(it.key())){streams.erase(it.key());}
|
|
continue;
|
|
}
|
|
struct streamDetails &strm = streams[it.key()];
|
|
strm.total = (*it)["curr"][0u].asInt();
|
|
strm.inputs = (*it)["curr"][1u].asInt();
|
|
uint64_t currTotal = (*it)["bw"][0u].asInt() + (*it)["bw"][1u].asInt();
|
|
if (timeDiff && count){
|
|
strm.bandwidth = ((currTotal - strm.prevTotal) / timeDiff) / count;
|
|
}else{
|
|
if (total){
|
|
strm.bandwidth = (upSpeed + downSpeed) / total;
|
|
}else{
|
|
strm.bandwidth = (upSpeed + downSpeed) + 100000;
|
|
}
|
|
}
|
|
strm.prevTotal = currTotal;
|
|
}
|
|
if (streams.size()){
|
|
std::set<std::string> eraseList;
|
|
for (std::map<std::string, struct streamDetails>::iterator it = streams.begin();
|
|
it != streams.end(); ++it){
|
|
if (!d["streams"].isMember(it->first)){eraseList.insert(it->first);}
|
|
}
|
|
for (std::set<std::string>::iterator it = eraseList.begin(); it != eraseList.end(); ++it){
|
|
streams.erase(*it);
|
|
}
|
|
}
|
|
}else{
|
|
streams.clear();
|
|
}
|
|
conf_streams.clear();
|
|
if (d.isMember("conf_streams") && d["conf_streams"].size()){
|
|
jsonForEach(d["conf_streams"], it){conf_streams.insert(it->asStringRef());}
|
|
}
|
|
outputs.clear();
|
|
if (d.isMember("outputs") && d["outputs"].size()){
|
|
jsonForEach(d["outputs"], op){outputs[op.key()] = outUrl(op->asStringRef(), host);}
|
|
}
|
|
addBandwidth *= 0.75;
|
|
}
|
|
};
|
|
|
|
/// Fixed-size struct for holding a host's name and details pointer
|
|
struct hostEntry{
|
|
uint8_t state; // 0 = off, 1 = booting, 2 = running, 3 = requesting shutdown, 4 = requesting clean
|
|
char name[200]; // 200 chars should be enough
|
|
hostDetails *details; /// hostDetails pointer
|
|
tthread::thread *thread; /// thread pointer
|
|
};
|
|
|
|
hostEntry hosts[1000]; /// Fixed-size array holding all hosts
|
|
|
|
void initHost(hostEntry &H, const std::string &N);
|
|
void cleanupHost(hostEntry &H);
|
|
|
|
int handleRequest(Socket::Connection &conn){
|
|
HTTP::Parser H;
|
|
while (conn){
|
|
if ((conn.spool() || conn.Received().size()) && H.Read(conn)){
|
|
// Special commands
|
|
if (H.url.size() == 1){
|
|
if (localMode && !conn.isLocal()){
|
|
H.SetBody("Configuration only accessible from local interfaces");
|
|
H.setCORSHeaders();
|
|
H.SendResponse("403", "Forbidden", conn);
|
|
H.Clean();
|
|
continue;
|
|
}
|
|
std::string host = H.GetVar("host");
|
|
std::string viewers = H.GetVar("viewers");
|
|
std::string stream = H.GetVar("stream");
|
|
std::string source = H.GetVar("source");
|
|
std::string fback = H.GetVar("fallback");
|
|
std::string lstserver = H.GetVar("lstserver");
|
|
std::string addserver = H.GetVar("addserver");
|
|
std::string delserver = H.GetVar("delserver");
|
|
std::string weights = H.GetVar("weights");
|
|
H.Clean();
|
|
H.SetHeader("Content-Type", "text/plain");
|
|
JSON::Value ret;
|
|
// Get/set weights
|
|
if (weights.size()){
|
|
JSON::Value newVals = JSON::fromString(weights);
|
|
if (newVals.isMember("cpu")){weight_cpu = newVals["cpu"].asInt();}
|
|
if (newVals.isMember("ram")){weight_ram = newVals["ram"].asInt();}
|
|
if (newVals.isMember("bw")){weight_bw = newVals["bw"].asInt();}
|
|
if (newVals.isMember("geo")){weight_geo = newVals["geo"].asInt();}
|
|
if (newVals.isMember("bonus")){weight_bonus = newVals["bonus"].asInt();}
|
|
ret["cpu"] = weight_cpu;
|
|
ret["ram"] = weight_ram;
|
|
ret["bw"] = weight_bw;
|
|
ret["geo"] = weight_geo;
|
|
ret["bonus"] = weight_bonus;
|
|
H.SetBody(ret.toString());
|
|
H.setCORSHeaders();
|
|
H.SendResponse("200", "OK", conn);
|
|
H.Clean();
|
|
continue;
|
|
}
|
|
// Get server list
|
|
if (lstserver.size()){
|
|
for (HOSTLOOP){
|
|
HOSTCHECK;
|
|
ret[(std::string)hosts[i].name] = stateLookup[hosts[i].state];
|
|
}
|
|
H.SetBody(ret.toPrettyString());
|
|
H.setCORSHeaders();
|
|
H.SendResponse("200", "OK", conn);
|
|
H.Clean();
|
|
continue;
|
|
}
|
|
// Remove server from list
|
|
if (delserver.size()){
|
|
ret = "Server not monitored - could not delete from monitored server list!";
|
|
for (HOSTLOOP){
|
|
HOSTCHECK;
|
|
if ((std::string)hosts[i].name == delserver){
|
|
cleanupHost(hosts[i]);
|
|
ret = stateLookup[hosts[i].state];
|
|
}
|
|
}
|
|
H.SetBody(ret.toPrettyString());
|
|
H.setCORSHeaders();
|
|
H.SendResponse("200", "OK", conn);
|
|
H.Clean();
|
|
continue;
|
|
}
|
|
// Add server to list
|
|
if (addserver.size()){
|
|
if (addserver.size() > 199){
|
|
H.SetBody("Host length too long for monitoring");
|
|
H.setCORSHeaders();
|
|
H.SendResponse("200", "OK", conn);
|
|
H.Clean();
|
|
continue;
|
|
}
|
|
bool stop = false;
|
|
hostEntry *newEntry = 0;
|
|
for (HOSTLOOP){
|
|
HOSTCHECK;
|
|
if ((std::string)hosts[i].name == addserver){
|
|
stop = true;
|
|
break;
|
|
}
|
|
}
|
|
if (stop){
|
|
ret = "Server already monitored - add request ignored";
|
|
}else{
|
|
for (HOSTLOOP){
|
|
if (hosts[i].state == STATE_OFF){
|
|
initHost(hosts[i], addserver);
|
|
newEntry = &(hosts[i]);
|
|
stop = true;
|
|
}
|
|
}
|
|
if (!stop){
|
|
initHost(HOST(hostsCounter), addserver);
|
|
newEntry = &HOST(hostsCounter);
|
|
++hostsCounter; // up the hosts counter
|
|
}
|
|
ret[addserver] = stateLookup[newEntry->state];
|
|
}
|
|
H.SetBody(ret.toPrettyString());
|
|
H.setCORSHeaders();
|
|
H.SendResponse("200", "OK", conn);
|
|
H.Clean();
|
|
continue;
|
|
}
|
|
// Request viewer count
|
|
if (viewers.size()){
|
|
for (HOSTLOOP){
|
|
HOSTCHECK;
|
|
HOST(i).details->fillStreams(ret);
|
|
}
|
|
H.SetBody(ret.toPrettyString());
|
|
H.setCORSHeaders();
|
|
H.SendResponse("200", "OK", conn);
|
|
H.Clean();
|
|
continue;
|
|
}
|
|
if (stream.size()){
|
|
uint64_t count = 0;
|
|
for (HOSTLOOP){
|
|
HOSTCHECK;
|
|
count += HOST(i).details->getViewers(stream);
|
|
}
|
|
H.SetBody(JSON::Value(count).asString());
|
|
H.setCORSHeaders();
|
|
H.SendResponse("200", "OK", conn);
|
|
H.Clean();
|
|
continue;
|
|
}
|
|
// Find source for given stream
|
|
if (source.size()){
|
|
INFO_MSG("Finding source for stream %s", source.c_str());
|
|
std::string bestHost = "";
|
|
uint64_t bestScore = 0;
|
|
for (HOSTLOOP){
|
|
HOSTCHECK;
|
|
if (Socket::matchIPv6Addr(std::string(HOST(i).details->binHost, 16), conn.getBinHost(), 0)){
|
|
INFO_MSG("Ignoring same-host entry %s", HOST(i).details->host.data());
|
|
continue;
|
|
}
|
|
uint64_t score = HOST(i).details->source(source);
|
|
if (score > bestScore){
|
|
bestHost = "dtsc://" + HOST(i).details->host;
|
|
bestScore = score;
|
|
}
|
|
}
|
|
if (bestScore == 0){
|
|
if (fback.size()){
|
|
bestHost = fback;
|
|
}else{
|
|
bestHost = fallback;
|
|
}
|
|
FAIL_MSG("No source for %s found!", source.c_str());
|
|
}else{
|
|
INFO_MSG("Winner: %s scores %" PRIu64, bestHost.c_str(), bestScore);
|
|
}
|
|
H.SetBody(bestHost);
|
|
H.setCORSHeaders();
|
|
H.SendResponse("200", "OK", conn);
|
|
H.Clean();
|
|
continue;
|
|
}
|
|
// Find host(s) status
|
|
if (!host.size()){
|
|
for (HOSTLOOP){
|
|
HOSTCHECK;
|
|
HOST(i).details->fillState(ret[HOST(i).details->host]);
|
|
}
|
|
}else{
|
|
for (HOSTLOOP){
|
|
HOSTCHECK;
|
|
if (HOST(i).details->host == host){
|
|
HOST(i).details->fillState(ret);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
H.SetBody(ret.toPrettyString());
|
|
H.setCORSHeaders();
|
|
H.SendResponse("200", "OK", conn);
|
|
H.Clean();
|
|
continue;
|
|
}
|
|
// Balance given stream
|
|
std::string stream = H.url.substr(1);
|
|
std::string proto = H.GetVar("proto");
|
|
H.SetVar("proto", "");
|
|
double lat = 0;
|
|
double lon = 0;
|
|
if (H.GetVar("lat") != ""){
|
|
lat = atof(H.GetVar("lat").c_str());
|
|
H.SetVar("lat", "");
|
|
}
|
|
if (H.GetVar("lon") != ""){
|
|
lon = atof(H.GetVar("lon").c_str());
|
|
H.SetVar("lon", "");
|
|
}
|
|
if (H.hasHeader("X-Latitude")){lat = atof(H.GetHeader("X-Latitude").c_str());}
|
|
if (H.hasHeader("X-Longitude")){lon = atof(H.GetHeader("X-Longitude").c_str());}
|
|
std::string vars = H.allVars();
|
|
if (stream == "favicon.ico"){
|
|
H.Clean();
|
|
H.SendResponse("404", "No favicon", conn);
|
|
H.Clean();
|
|
continue;
|
|
}
|
|
INFO_MSG("Balancing stream %s", stream.c_str());
|
|
H.Clean();
|
|
H.SetHeader("Content-Type", "text/plain");
|
|
H.setCORSHeaders();
|
|
hostEntry *bestHost = 0;
|
|
uint64_t bestScore = 0;
|
|
for (HOSTLOOP){
|
|
HOSTCHECK;
|
|
uint64_t score = HOST(i).details->rate(stream, lat, lon);
|
|
if (score > bestScore){
|
|
bestHost = &HOST(i);
|
|
bestScore = score;
|
|
}
|
|
}
|
|
if (!bestScore || !bestHost){
|
|
H.SetBody(fallback);
|
|
FAIL_MSG("All servers seem to be out of bandwidth!");
|
|
}else{
|
|
INFO_MSG("Winner: %s scores %" PRIu64, bestHost->details->host.c_str(), bestScore);
|
|
bestHost->details->addViewer(stream);
|
|
H.SetBody(bestHost->details->host);
|
|
}
|
|
if (proto != "" && bestHost && bestScore){
|
|
H.Clean();
|
|
H.setCORSHeaders();
|
|
H.SetHeader("Location", bestHost->details->getUrl(stream, proto) + vars);
|
|
H.SetBody(H.GetHeader("Location"));
|
|
H.SendResponse("307", "Redirecting", conn);
|
|
H.Clean();
|
|
}else{
|
|
H.SendResponse("200", "OK", conn);
|
|
H.Clean();
|
|
}
|
|
}// if HTTP request received
|
|
}
|
|
conn.close();
|
|
return 0;
|
|
}
|
|
|
|
void handleServer(void *hostEntryPointer){
|
|
hostEntry *entry = (hostEntry *)hostEntryPointer;
|
|
JSON::Value bandwidth = 128 * 1024 * 1024u; // assume 1G connection
|
|
HTTP::URL url(entry->name);
|
|
if (!url.protocol.size()){url.protocol = "http";}
|
|
if (!url.port.size()){url.port = "4242";}
|
|
if (url.path.size()){
|
|
bandwidth = url.path;
|
|
bandwidth = bandwidth.asInt() * 1024 * 1024;
|
|
url.path.clear();
|
|
}
|
|
url.path = passphrase + ".json";
|
|
|
|
INFO_MSG("Monitoring %s", url.getUrl().c_str());
|
|
entry->details->availBandwidth = bandwidth.asInt();
|
|
entry->details->host = url.host;
|
|
entry->state = STATE_BOOT;
|
|
bool down = true;
|
|
|
|
HTTP::Downloader DL;
|
|
|
|
if (DL.get(HTTP::URL("http://api.ipstack.com/" + url.host + "?access_key=05eb21db3983dec4cd6d22131ec0a40d&format=1")) &&
|
|
DL.isOk()){
|
|
JSON::Value &gDet = entry->details->geoDetails;
|
|
INFO_MSG("Location: %s", DL.data().c_str());
|
|
gDet = JSON::fromString(DL.data());
|
|
INFO_MSG("Location: %s", gDet.toString().c_str());
|
|
if (gDet && gDet.isMember("latitude") && gDet.isMember("longitude")){
|
|
entry->details->servLati = gDet["latitude"].asDouble();
|
|
entry->details->servLongi = gDet["longitude"].asDouble();
|
|
std::stringstream loc;
|
|
if (gDet.isMember("country_name")){
|
|
if (gDet.isMember("city") && gDet["city"].asString().size()){
|
|
entry->details->servLoc = gDet["city"].asString() + ", " + gDet["country_name"].asString();
|
|
}else{
|
|
entry->details->servLoc = gDet["country_name"].asString();
|
|
}
|
|
INFO_MSG("%s is in %s", url.host.c_str(), entry->details->servLoc.c_str());
|
|
}else{
|
|
INFO_MSG("%s is at %f, %f", url.host.c_str(), entry->details->servLati, entry->details->servLongi);
|
|
}
|
|
}
|
|
}else{
|
|
WARN_MSG("Could not reach location server for %s", url.host.c_str());
|
|
}
|
|
|
|
while (cfg->is_active && (entry->state != STATE_GODOWN)){
|
|
if (DL.get(url) && DL.isOk()){
|
|
JSON::Value servData = JSON::fromString(DL.data());
|
|
if (!servData){
|
|
FAIL_MSG("Can't decode server %s load information", url.host.c_str());
|
|
entry->details->badNess();
|
|
DL.getSocket().close();
|
|
down = true;
|
|
entry->state = STATE_ERROR;
|
|
}else{
|
|
if (down){
|
|
std::string ipStr;
|
|
Socket::hostBytesToStr(DL.getSocket().getBinHost().data(), 16, ipStr);
|
|
WARN_MSG("Connection established with %s (%s)", url.host.c_str(), ipStr.c_str());
|
|
memcpy(entry->details->binHost, DL.getSocket().getBinHost().data(), 16);
|
|
entry->state = STATE_ONLINE;
|
|
down = false;
|
|
}
|
|
entry->details->update(servData);
|
|
}
|
|
}else{
|
|
FAIL_MSG("Can't retrieve server %s load information", url.host.c_str());
|
|
entry->details->badNess();
|
|
DL.getSocket().close();
|
|
down = true;
|
|
entry->state = STATE_ERROR;
|
|
}
|
|
Util::wait(5000);
|
|
}
|
|
WARN_MSG("Monitoring thread for %s stopping", url.host.c_str());
|
|
DL.getSocket().close();
|
|
entry->state = STATE_REQCLEAN;
|
|
}
|
|
|
|
int main(int argc, char **argv){
|
|
Util::redirectLogsIfNeeded();
|
|
memset(hosts, 0, sizeof(hosts)); // zero-fill the hosts list
|
|
Util::Config conf(argv[0]);
|
|
cfg = &conf;
|
|
|
|
JSON::Value opt;
|
|
opt["arg"] = "string";
|
|
opt["short"] = "s";
|
|
opt["long"] = "server";
|
|
opt["help"] = "Address of a server to balance. Hostname or IP, optionally followed by API port.";
|
|
conf.addOption("server", opt);
|
|
|
|
opt["arg"] = "integer";
|
|
opt["short"] = "p";
|
|
opt["long"] = "port";
|
|
opt["help"] = "TCP port to listen on";
|
|
opt["value"].append(8042u);
|
|
conf.addOption("port", opt);
|
|
|
|
opt["arg"] = "string";
|
|
opt["short"] = "P";
|
|
opt["long"] = "passphrase";
|
|
opt["help"] = "Passphrase (prometheus option value) to use for data retrieval.";
|
|
opt["value"][0u] = "koekjes";
|
|
conf.addOption("passphrase", opt);
|
|
|
|
opt["arg"] = "string";
|
|
opt["short"] = "i";
|
|
opt["long"] = "interface";
|
|
opt["help"] = "Network interface to listen on";
|
|
opt["value"][0u] = "0.0.0.0";
|
|
conf.addOption("interface", opt);
|
|
|
|
opt["arg"] = "string";
|
|
opt["short"] = "u";
|
|
opt["long"] = "username";
|
|
opt["help"] = "Username to drop privileges to";
|
|
opt["value"][0u] = "root";
|
|
conf.addOption("username", opt);
|
|
|
|
opt["arg"] = "string";
|
|
opt["short"] = "F";
|
|
opt["long"] = "fallback";
|
|
opt["help"] = "Default reply if no servers are available";
|
|
opt["value"][0u] = "FULL";
|
|
conf.addOption("fallback", opt);
|
|
|
|
opt["arg"] = "integer";
|
|
opt["short"] = "R";
|
|
opt["long"] = "ram";
|
|
opt["help"] = "Weight for RAM scoring";
|
|
opt["value"].append(weight_ram);
|
|
conf.addOption("ram", opt);
|
|
|
|
opt["arg"] = "integer";
|
|
opt["short"] = "C";
|
|
opt["long"] = "cpu";
|
|
opt["help"] = "Weight for CPU scoring";
|
|
opt["value"].append(weight_cpu);
|
|
conf.addOption("cpu", opt);
|
|
|
|
opt["arg"] = "integer";
|
|
opt["short"] = "B";
|
|
opt["long"] = "bw";
|
|
opt["help"] = "Weight for BW scoring";
|
|
opt["value"].append(weight_bw);
|
|
conf.addOption("bw", opt);
|
|
|
|
opt["arg"] = "integer";
|
|
opt["short"] = "G";
|
|
opt["long"] = "geo";
|
|
opt["help"] = "Weight for geo scoring";
|
|
opt["value"].append(weight_geo);
|
|
conf.addOption("geo", opt);
|
|
|
|
opt["arg"] = "integer";
|
|
opt["short"] = "X";
|
|
opt["long"] = "extra";
|
|
opt["help"] = "Weight for extra scoring when stream exists";
|
|
opt["value"].append(weight_bonus);
|
|
conf.addOption("extra", opt);
|
|
|
|
opt.null();
|
|
opt["short"] = "L";
|
|
opt["long"] = "localmode";
|
|
opt["help"] = "Control only from local interfaces, request balance from all";
|
|
conf.addOption("localmode", opt);
|
|
|
|
conf.parseArgs(argc, argv);
|
|
|
|
passphrase = conf.getOption("passphrase").asStringRef();
|
|
weight_ram = conf.getInteger("ram");
|
|
weight_cpu = conf.getInteger("cpu");
|
|
weight_bw = conf.getInteger("bw");
|
|
weight_geo = conf.getInteger("geo");
|
|
weight_bonus = conf.getInteger("extra");
|
|
fallback = conf.getString("fallback");
|
|
localMode = conf.getBool("localmode");
|
|
INFO_MSG("Local control only mode is %s", localMode ? "on" : "off");
|
|
|
|
JSON::Value &nodes = conf.getOption("server", true);
|
|
conf.activate();
|
|
|
|
std::map<std::string, tthread::thread *> threads;
|
|
jsonForEach(nodes, it){
|
|
if (it->asStringRef().size() > 199){
|
|
FAIL_MSG("Host length too long for monitoring, skipped: %s", it->asStringRef().c_str());
|
|
continue;
|
|
}
|
|
initHost(HOST(hostsCounter), it->asStringRef());
|
|
++hostsCounter; // up the hosts counter
|
|
}
|
|
WARN_MSG("Load balancer activating. Balancing between %lu nodes.", hostsCounter);
|
|
|
|
conf.serveThreadedSocket(handleRequest);
|
|
if (!conf.is_active){
|
|
WARN_MSG("Load balancer shutting down; received shutdown signal");
|
|
}else{
|
|
WARN_MSG("Load balancer shutting down; socket problem");
|
|
}
|
|
conf.is_active = false;
|
|
|
|
// Join all threads
|
|
for (HOSTLOOP){cleanupHost(HOST(i));}
|
|
}
|
|
|
|
/// Starts monitoring host N using slot H: stores the name, creates the details
/// object and launches the monitoring thread.
/// Does nothing when N is empty or does not fit the slot's name buffer.
void initHost(hostEntry &H, const std::string &N){
  // Cancel if no name was given
  if (!N.size()){return;}
  // The name must fit including its terminating zero byte
  if (N.size() >= sizeof(H.name)){
    FAIL_MSG("Host length too long for monitoring, skipped: %s", N.c_str());
    return;
  }
  H.state = STATE_BOOT;
  H.details = new hostDetails();
  // Clear the buffer first so the stored name is always zero-terminated,
  // whatever the slot contained before.
  memset(H.name, 0, sizeof(H.name));
  memcpy(H.name, N.data(), N.size());
  // The thread reads the name and details, so it is started last.
  H.thread = new tthread::thread(handleServer, (void *)&H);
  INFO_MSG("Starting monitoring %s", H.name);
}
|
|
|
|
/// Stops monitoring the host in slot H: asks its thread to stop, waits for it,
/// frees the thread and details objects and marks the slot as unused.
/// Slots without a name are left untouched.
void cleanupHost(hostEntry &H){
  if (H.name[0] == 0){
    // Nothing is being monitored from this slot
    return;
  }

  // Request the monitoring thread to leave its loop
  H.state = STATE_GODOWN;
  INFO_MSG("Stopping monitoring %s", H.name);

  // Wait for the thread to finish before releasing anything it uses
  H.thread->join();
  delete H.thread;
  H.thread = 0;

  delete H.details;
  H.details = 0;

  // Wipe the name and hand the slot back for reuse
  memset(H.name, 0, sizeof(H.name));
  H.state = STATE_OFF;
}
|