From be450a183f9d78561087475bdccc01ec7560ff4d Mon Sep 17 00:00:00 2001 From: Thulinma Date: Sun, 5 Nov 2017 15:47:51 +0100 Subject: [PATCH] Load balancer now uses HTTP downloader --- src/utils/util_load.cpp | 109 +++++++++++++++------------------------- 1 file changed, 40 insertions(+), 69 deletions(-) diff --git a/src/utils/util_load.cpp b/src/utils/util_load.cpp index 27e521f7..fdeb2b8c 100644 --- a/src/utils/util_load.cpp +++ b/src/utils/util_load.cpp @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include #include #include @@ -30,11 +30,11 @@ unsigned long hostsCounter = 0; // This is a pointer to guarantee atomic accesse #define STATE_OFF 0 #define STATE_BOOT 1 -#define STATE_ONLINE 2 -#define STATE_GODOWN 3 -#define STATE_REQCLEAN 4 -const char *stateLookup[] ={"Offline", "Starting monitoring", "Monitored", "Requesting stop", - "Requesting clean"}; +#define STATE_ERROR 2 +#define STATE_ONLINE 3 +#define STATE_GODOWN 4 +#define STATE_REQCLEAN 5 +const char *stateLookup[] ={"Offline", "Starting monitoring", "Monitored (error)", "Monitored (online)", "Requesting stop", "Requesting clean"}; struct streamDetails{ uint32_t total; @@ -536,81 +536,52 @@ int handleRequest(Socket::Connection &conn){ void handleServer(void *hostEntryPointer){ hostEntry *entry = (hostEntry *)hostEntryPointer; - std::string name = entry->name; - - HTTP::Parser H; - std::string host; - int port = 4242; JSON::Value bandwidth = 128 * 1024 * 1024ll; // assume 1G connection - - size_t slash = name.find('/'); - if (slash != std::string::npos){ - bandwidth = name.substr(slash + 1, std::string::npos); + HTTP::URL url(entry->name); + if (!url.protocol.size()){url.protocol = "http";} + if (!url.port.size()){url.port = "4242";} + if (url.path.size()){ + bandwidth = url.path; bandwidth = bandwidth.asInt() * 1024 * 1024; - name = name.substr(0, slash); + url.path.clear(); } + url.path = passphrase + ".json"; - size_t colon = name.find(':'); - if (colon != std::string::npos && colon != 0 && colon != name.size()){ - host = name.substr(0, colon); - port = atoi(name.substr(colon + 1, std::string::npos).c_str()); - }else{ - host = name; - } - - INFO_MSG("Monitoring %s on port %d using passphrase %s", host.c_str(), port, passphrase.c_str()); + INFO_MSG("Monitoring %s", url.getUrl().c_str()); entry->details->availBandwidth = bandwidth.asInt(); - entry->details->host = host; - entry->state = STATE_ONLINE; + entry->details->host = url.host; + entry->state = STATE_BOOT; bool down = true; - Socket::Connection servConn(host, port, false); - while (cfg->is_active && (entry->state == STATE_ONLINE)){ - if (!servConn){ - HIGH_MSG("Reconnecting to %s", host.c_str()); - servConn = Socket::Connection(host, port, false); - } - if (!servConn){ - MEDIUM_MSG("Cannot reach server %s", host.c_str()); - entry->details->badNess(); - Util::wait(5000); - down = true; - continue; - } - - // retrieve update information - H.url = "/" + passphrase + ".json"; - H.method = "GET"; - H.SendRequest(servConn); - H.Clean(); - unsigned int startTime = Util::epoch(); - while (cfg->is_active && servConn && - !((servConn.spool() || servConn.Received().size()) && H.Read(servConn))){ - if (Util::epoch() - startTime > 25){ - FAIL_MSG("Server %s timed out", host.c_str()); - servConn.close(); - H.Clean(); + HTTP::Downloader DL; + while (cfg->is_active && (entry->state != STATE_GODOWN)){ + if (DL.get(url) && DL.isOk()){ + JSON::Value servData = JSON::fromString(DL.data()); + if (!servData){ + FAIL_MSG("Can't decode server %s load information", url.host.c_str()); + entry->details->badNess(); + DL.getSocket().close(); + down = true; + entry->state = STATE_ERROR; + }else{ + if (down){ + WARN_MSG("Connection established with %s", url.host.c_str()); + entry->state = STATE_ONLINE; + down = false; + } + entry->details->update(servData); } - Util::sleep(250); - } - JSON::Value servData = JSON::fromString(H.body); - if (!servData){ - FAIL_MSG("Can't retrieve server %s load information", host.c_str()); - std::cerr << H.body << std::endl; - down = true; - entry->details->badNess(); - servConn.close(); }else{ - if (down){ - WARN_MSG("Connection established with %s", host.c_str()); - down = false; - } - entry->details->update(servData); + FAIL_MSG("Can't retrieve server %s load information", url.host.c_str()); + entry->details->badNess(); + DL.getSocket().close(); + down = true; + entry->state = STATE_ERROR; } - H.Clean(); Util::wait(5000); } - servConn.close(); + WARN_MSG("Monitoring thread for %s stopping", url.host.c_str()); + DL.getSocket().close(); entry->state = STATE_REQCLEAN; }