Load balancer now uses HTTP downloader
This commit is contained in:
		
							parent
							
								
									01ffc42f9f
								
							
						
					
					
						commit
						be450a183f
					
				
					 1 changed files with 40 additions and 69 deletions
				
			
		| 
						 | 
				
			
			@ -4,7 +4,7 @@
 | 
			
		|||
#include <iostream>
 | 
			
		||||
#include <mist/config.h>
 | 
			
		||||
#include <mist/defines.h>
 | 
			
		||||
#include <mist/http_parser.h>
 | 
			
		||||
#include <mist/downloader.h>
 | 
			
		||||
#include <mist/timing.h>
 | 
			
		||||
#include <mist/tinythread.h>
 | 
			
		||||
#include <set>
 | 
			
		||||
| 
						 | 
				
			
			@ -30,11 +30,11 @@ unsigned long hostsCounter = 0; // This is a pointer to guarantee atomic accesse
 | 
			
		|||
 | 
			
		||||
#define STATE_OFF 0
 | 
			
		||||
#define STATE_BOOT 1
 | 
			
		||||
#define STATE_ONLINE 2
 | 
			
		||||
#define STATE_GODOWN 3
 | 
			
		||||
#define STATE_REQCLEAN 4
 | 
			
		||||
const char *stateLookup[] ={"Offline", "Starting monitoring", "Monitored", "Requesting stop",
 | 
			
		||||
                             "Requesting clean"};
 | 
			
		||||
#define STATE_ERROR 2
 | 
			
		||||
#define STATE_ONLINE 3
 | 
			
		||||
#define STATE_GODOWN 4
 | 
			
		||||
#define STATE_REQCLEAN 5
 | 
			
		||||
const char *stateLookup[] ={"Offline", "Starting monitoring", "Monitored (error)", "Monitored (online)", "Requesting stop", "Requesting clean"};
 | 
			
		||||
 | 
			
		||||
struct streamDetails{
 | 
			
		||||
  uint32_t total;
 | 
			
		||||
| 
						 | 
				
			
			@ -536,81 +536,52 @@ int handleRequest(Socket::Connection &conn){
 | 
			
		|||
 | 
			
		||||
void handleServer(void *hostEntryPointer){
 | 
			
		||||
  hostEntry *entry = (hostEntry *)hostEntryPointer;
 | 
			
		||||
  std::string name = entry->name;
 | 
			
		||||
 | 
			
		||||
  HTTP::Parser H;
 | 
			
		||||
  std::string host;
 | 
			
		||||
  int port = 4242;
 | 
			
		||||
  JSON::Value bandwidth = 128 * 1024 * 1024ll; // assume 1G connection
 | 
			
		||||
 | 
			
		||||
  size_t slash = name.find('/');
 | 
			
		||||
  if (slash != std::string::npos){
 | 
			
		||||
    bandwidth = name.substr(slash + 1, std::string::npos);
 | 
			
		||||
  HTTP::URL url(entry->name);
 | 
			
		||||
  if (!url.protocol.size()){url.protocol = "http";}
 | 
			
		||||
  if (!url.port.size()){url.port = "4242";}
 | 
			
		||||
  if (url.path.size()){
 | 
			
		||||
    bandwidth = url.path;
 | 
			
		||||
    bandwidth = bandwidth.asInt() * 1024 * 1024;
 | 
			
		||||
    name = name.substr(0, slash);
 | 
			
		||||
    url.path.clear();
 | 
			
		||||
  }
 | 
			
		||||
  url.path = passphrase + ".json";
 | 
			
		||||
 | 
			
		||||
  size_t colon = name.find(':');
 | 
			
		||||
  if (colon != std::string::npos && colon != 0 && colon != name.size()){
 | 
			
		||||
    host = name.substr(0, colon);
 | 
			
		||||
    port = atoi(name.substr(colon + 1, std::string::npos).c_str());
 | 
			
		||||
  }else{
 | 
			
		||||
    host = name;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  INFO_MSG("Monitoring %s on port %d using passphrase %s", host.c_str(), port, passphrase.c_str());
 | 
			
		||||
  INFO_MSG("Monitoring %s", url.getUrl().c_str());
 | 
			
		||||
  entry->details->availBandwidth = bandwidth.asInt();
 | 
			
		||||
  entry->details->host = host;
 | 
			
		||||
  entry->state = STATE_ONLINE;
 | 
			
		||||
  entry->details->host = url.host;
 | 
			
		||||
  entry->state = STATE_BOOT;
 | 
			
		||||
  bool down = true;
 | 
			
		||||
 | 
			
		||||
  Socket::Connection servConn(host, port, false);
 | 
			
		||||
  while (cfg->is_active && (entry->state == STATE_ONLINE)){
 | 
			
		||||
    if (!servConn){
 | 
			
		||||
      HIGH_MSG("Reconnecting to %s", host.c_str());
 | 
			
		||||
      servConn = Socket::Connection(host, port, false);
 | 
			
		||||
    }
 | 
			
		||||
    if (!servConn){
 | 
			
		||||
      MEDIUM_MSG("Cannot reach server %s", host.c_str());
 | 
			
		||||
      entry->details->badNess();
 | 
			
		||||
      Util::wait(5000);
 | 
			
		||||
      down = true;
 | 
			
		||||
      continue;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // retrieve update information
 | 
			
		||||
    H.url = "/" + passphrase + ".json";
 | 
			
		||||
    H.method = "GET";
 | 
			
		||||
    H.SendRequest(servConn);
 | 
			
		||||
    H.Clean();
 | 
			
		||||
    unsigned int startTime = Util::epoch();
 | 
			
		||||
    while (cfg->is_active && servConn &&
 | 
			
		||||
           !((servConn.spool() || servConn.Received().size()) && H.Read(servConn))){
 | 
			
		||||
      if (Util::epoch() - startTime > 25){
 | 
			
		||||
        FAIL_MSG("Server %s timed out", host.c_str());
 | 
			
		||||
        servConn.close();
 | 
			
		||||
        H.Clean();
 | 
			
		||||
  HTTP::Downloader DL;
 | 
			
		||||
  while (cfg->is_active && (entry->state != STATE_GODOWN)){
 | 
			
		||||
    if (DL.get(url) && DL.isOk()){
 | 
			
		||||
      JSON::Value servData = JSON::fromString(DL.data());
 | 
			
		||||
      if (!servData){
 | 
			
		||||
        FAIL_MSG("Can't decode server %s load information", url.host.c_str());
 | 
			
		||||
        entry->details->badNess();
 | 
			
		||||
        DL.getSocket().close();
 | 
			
		||||
        down = true;
 | 
			
		||||
        entry->state = STATE_ERROR;
 | 
			
		||||
      }else{
 | 
			
		||||
        if (down){
 | 
			
		||||
          WARN_MSG("Connection established with %s", url.host.c_str());
 | 
			
		||||
          entry->state = STATE_ONLINE;
 | 
			
		||||
          down = false;
 | 
			
		||||
        }
 | 
			
		||||
        entry->details->update(servData);
 | 
			
		||||
      }
 | 
			
		||||
      Util::sleep(250);
 | 
			
		||||
    }
 | 
			
		||||
    JSON::Value servData = JSON::fromString(H.body);
 | 
			
		||||
    if (!servData){
 | 
			
		||||
      FAIL_MSG("Can't retrieve server %s load information", host.c_str());
 | 
			
		||||
      std::cerr << H.body << std::endl;
 | 
			
		||||
      down = true;
 | 
			
		||||
      entry->details->badNess();
 | 
			
		||||
      servConn.close();
 | 
			
		||||
    }else{
 | 
			
		||||
      if (down){
 | 
			
		||||
        WARN_MSG("Connection established with %s", host.c_str());
 | 
			
		||||
        down = false;
 | 
			
		||||
      }
 | 
			
		||||
      entry->details->update(servData);
 | 
			
		||||
      FAIL_MSG("Can't retrieve server %s load information", url.host.c_str());
 | 
			
		||||
      entry->details->badNess();
 | 
			
		||||
      DL.getSocket().close();
 | 
			
		||||
      down = true;
 | 
			
		||||
      entry->state = STATE_ERROR;
 | 
			
		||||
    }
 | 
			
		||||
    H.Clean();
 | 
			
		||||
    Util::wait(5000);
 | 
			
		||||
  }
 | 
			
		||||
  servConn.close();
 | 
			
		||||
  WARN_MSG("Monitoring thread for %s stopping", url.host.c_str());
 | 
			
		||||
  DL.getSocket().close();
 | 
			
		||||
  entry->state = STATE_REQCLEAN;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue