Socket library updates to support detecting sockets passed as FDs, removed use of peek() in HTTP request handler, fixed 100% CPU usage problem for unfinished HTTP requests
This commit is contained in:
		
							parent
							
								
									3cb03392e1
								
							
						
					
					
						commit
						6a6dd5d7ed
					
				
					 7 changed files with 145 additions and 162 deletions
				
			
		
							
								
								
									
										177
									
								
								lib/socket.cpp
									
										
									
									
									
								
							
							
						
						
									
										177
									
								
								lib/socket.cpp
									
										
									
									
									
								
							| 
						 | 
				
			
			@ -375,9 +375,11 @@ void Socket::Buffer::clear(){
 | 
			
		|||
/// Create a new base socket. This is a basic constructor for converting any valid socket to a Socket::Connection.
 | 
			
		||||
/// \param sockNo Integer representing the socket to convert.
 | 
			
		||||
Socket::Connection::Connection(int sockNo){
 | 
			
		||||
  sock = sockNo;
 | 
			
		||||
  pipes[0] = -1;
 | 
			
		||||
  pipes[1] = -1;
 | 
			
		||||
  sSend = sockNo;
 | 
			
		||||
  sRecv = -1;
 | 
			
		||||
  isTrueSocket = false;
 | 
			
		||||
  struct stat sBuf;
 | 
			
		||||
  if (sSend != -1 && !fstat(sSend, &sBuf)){isTrueSocket = S_ISSOCK(sBuf.st_mode);}
 | 
			
		||||
  up = 0;
 | 
			
		||||
  down = 0;
 | 
			
		||||
  conntime = Util::epoch();
 | 
			
		||||
| 
						 | 
				
			
			@ -390,9 +392,15 @@ Socket::Connection::Connection(int sockNo){
 | 
			
		|||
/// \param write The filedescriptor to write to.
 | 
			
		||||
/// \param read The filedescriptor to read from.
 | 
			
		||||
Socket::Connection::Connection(int write, int read){
 | 
			
		||||
  sock = -1;
 | 
			
		||||
  pipes[0] = write;
 | 
			
		||||
  pipes[1] = read;
 | 
			
		||||
  sSend = write;
 | 
			
		||||
  if (write != read){
 | 
			
		||||
    sRecv = read;
 | 
			
		||||
  }else{
 | 
			
		||||
    sRecv = -1;
 | 
			
		||||
  }
 | 
			
		||||
  isTrueSocket = false;
 | 
			
		||||
  struct stat sBuf;
 | 
			
		||||
  if (sSend != -1 && !fstat(sSend, &sBuf)){isTrueSocket = S_ISSOCK(sBuf.st_mode);}
 | 
			
		||||
  up = 0;
 | 
			
		||||
  down = 0;
 | 
			
		||||
  conntime = Util::epoch();
 | 
			
		||||
| 
						 | 
				
			
			@ -404,9 +412,9 @@ Socket::Connection::Connection(int write, int read){
 | 
			
		|||
/// Create a new disconnected base socket. This is a basic constructor for placeholder purposes.
 | 
			
		||||
/// A socket created like this is always disconnected and should/could be overwritten at some point.
 | 
			
		||||
Socket::Connection::Connection(){
 | 
			
		||||
  sock = -1;
 | 
			
		||||
  pipes[0] = -1;
 | 
			
		||||
  pipes[1] = -1;
 | 
			
		||||
  sSend = -1;
 | 
			
		||||
  sRecv = -1;
 | 
			
		||||
  isTrueSocket = false;
 | 
			
		||||
  up = 0;
 | 
			
		||||
  down = 0;
 | 
			
		||||
  conntime = Util::epoch();
 | 
			
		||||
| 
						 | 
				
			
			@ -447,16 +455,14 @@ bool isFDBlocking(int FD){
 | 
			
		|||
 | 
			
		||||
/// Set this socket to be blocking (true) or nonblocking (false).
 | 
			
		||||
void Socket::Connection::setBlocking(bool blocking){
 | 
			
		||||
  if (sock >= 0){setFDBlocking(sock, blocking);}
 | 
			
		||||
  if (pipes[0] >= 0){setFDBlocking(pipes[0], blocking);}
 | 
			
		||||
  if (pipes[1] >= 0){setFDBlocking(pipes[1], blocking);}
 | 
			
		||||
  if (sSend >= 0){setFDBlocking(sSend, blocking);}
 | 
			
		||||
  if (sRecv >= 0 && sSend != sRecv){setFDBlocking(sRecv, blocking);}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Set this socket to be blocking (true) or nonblocking (false).
 | 
			
		||||
bool Socket::Connection::isBlocking(){
 | 
			
		||||
  if (sock >= 0){return isFDBlocking(sock);}
 | 
			
		||||
  if (pipes[0] >= 0){return isFDBlocking(pipes[0]);}
 | 
			
		||||
  if (pipes[1] >= 0){return isFDBlocking(pipes[1]);}
 | 
			
		||||
  if (sSend >= 0){return isFDBlocking(sSend);}
 | 
			
		||||
  if (sRecv >= 0){return isFDBlocking(sRecv);}
 | 
			
		||||
  return false;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -465,7 +471,7 @@ bool Socket::Connection::isBlocking(){
 | 
			
		|||
/// This function calls shutdown, thus making the socket unusable in all other
 | 
			
		||||
/// processes as well. Do not use on shared sockets that are still in use.
 | 
			
		||||
void Socket::Connection::close(){
 | 
			
		||||
  if (sock != -1){shutdown(sock, SHUT_RDWR);}
 | 
			
		||||
  if (sSend != -1){shutdown(sSend, SHUT_RDWR);}
 | 
			
		||||
  drop();
 | 
			
		||||
}// Socket::Connection::close
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -475,36 +481,31 @@ void Socket::Connection::close(){
 | 
			
		|||
/// processes.
 | 
			
		||||
void Socket::Connection::drop(){
 | 
			
		||||
  if (connected()){
 | 
			
		||||
    if (sock != -1){
 | 
			
		||||
      DEBUG_MSG(DLVL_HIGH, "Socket %d closed", sock);
 | 
			
		||||
    if (sSend != -1){
 | 
			
		||||
      HIGH_MSG("Socket %d closed", sSend);
 | 
			
		||||
      errno = EINTR;
 | 
			
		||||
      while (::close(sock) != 0 && errno == EINTR){}
 | 
			
		||||
      sock = -1;
 | 
			
		||||
      while (::close(sSend) != 0 && errno == EINTR){}
 | 
			
		||||
      if (sRecv == sSend){sRecv = -1;}
 | 
			
		||||
      sSend = -1;
 | 
			
		||||
    }
 | 
			
		||||
    if (pipes[0] != -1){
 | 
			
		||||
    if (sRecv != -1){
 | 
			
		||||
      errno = EINTR;
 | 
			
		||||
      while (::close(pipes[0]) != 0 && errno == EINTR){}
 | 
			
		||||
      pipes[0] = -1;
 | 
			
		||||
    }
 | 
			
		||||
    if (pipes[1] != -1){
 | 
			
		||||
      errno = EINTR;
 | 
			
		||||
      while (::close(pipes[1]) != 0 && errno == EINTR){}
 | 
			
		||||
      pipes[1] = -1;
 | 
			
		||||
      while (::close(sRecv) != 0 && errno == EINTR){}
 | 
			
		||||
      sRecv = -1;
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
}// Socket::Connection::drop
 | 
			
		||||
 | 
			
		||||
/// Returns internal socket number.
 | 
			
		||||
int Socket::Connection::getSocket(){
 | 
			
		||||
  if (sock != -1){return sock;}
 | 
			
		||||
  if (pipes[0] != -1){return pipes[0];}
 | 
			
		||||
  if (pipes[1] != -1){return pipes[1];}
 | 
			
		||||
  return -1;
 | 
			
		||||
  if (sSend != -1){return sSend;}
 | 
			
		||||
  return sRecv;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Returns non-piped internal socket number.
 | 
			
		||||
int Socket::Connection::getPureSocket(){
 | 
			
		||||
  return sock;
 | 
			
		||||
  if (!isTrueSocket){return -1;}
 | 
			
		||||
  return sSend;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Returns a string describing the last error that occured.
 | 
			
		||||
| 
						 | 
				
			
			@ -518,12 +519,12 @@ std::string Socket::Connection::getError(){
 | 
			
		|||
/// \param nonblock Whether the socket should be nonblocking. False by default.
 | 
			
		||||
Socket::Connection::Connection(std::string address, bool nonblock){
 | 
			
		||||
  skipCount = 0;
 | 
			
		||||
  pipes[0] = -1;
 | 
			
		||||
  pipes[1] = -1;
 | 
			
		||||
  sock = socket(PF_UNIX, SOCK_STREAM, 0);
 | 
			
		||||
  if (sock < 0){
 | 
			
		||||
  sRecv = -1;
 | 
			
		||||
  isTrueSocket = true;
 | 
			
		||||
  sSend = socket(PF_UNIX, SOCK_STREAM, 0);
 | 
			
		||||
  if (sSend < 0){
 | 
			
		||||
    remotehost = strerror(errno);
 | 
			
		||||
    DEBUG_MSG(DLVL_FAIL, "Could not create socket! Error: %s", remotehost.c_str());
 | 
			
		||||
    FAIL_MSG("Could not create socket! Error: %s", remotehost.c_str());
 | 
			
		||||
    return;
 | 
			
		||||
  }
 | 
			
		||||
  Error = false;
 | 
			
		||||
| 
						 | 
				
			
			@ -534,19 +535,19 @@ Socket::Connection::Connection(std::string address, bool nonblock){
 | 
			
		|||
  sockaddr_un addr;
 | 
			
		||||
  addr.sun_family = AF_UNIX;
 | 
			
		||||
  strncpy(addr.sun_path, address.c_str(), address.size() + 1);
 | 
			
		||||
  int r = connect(sock, (sockaddr *)&addr, sizeof(addr));
 | 
			
		||||
  int r = connect(sSend, (sockaddr *)&addr, sizeof(addr));
 | 
			
		||||
  if (r == 0){
 | 
			
		||||
    if (nonblock){
 | 
			
		||||
      int flags = fcntl(sock, F_GETFL, 0);
 | 
			
		||||
      int flags = fcntl(sSend, F_GETFL, 0);
 | 
			
		||||
      flags |= O_NONBLOCK;
 | 
			
		||||
      fcntl(sock, F_SETFL, flags);
 | 
			
		||||
      fcntl(sSend, F_SETFL, flags);
 | 
			
		||||
    }
 | 
			
		||||
  }else{
 | 
			
		||||
    remotehost = strerror(errno);
 | 
			
		||||
    DEBUG_MSG(DLVL_FAIL, "Could not connect to %s! Error: %s", address.c_str(), remotehost.c_str());
 | 
			
		||||
    FAIL_MSG("Could not connect to %s! Error: %s", address.c_str(), remotehost.c_str());
 | 
			
		||||
    close();
 | 
			
		||||
  }
 | 
			
		||||
}// Socket::Connection Unix Contructor
 | 
			
		||||
}// Socket::Connection Unix Constructor
 | 
			
		||||
 | 
			
		||||
/// Create a new TCP Socket. This socket will (try to) connect to the given host/port right away.
 | 
			
		||||
/// \param host String containing the hostname to connect to.
 | 
			
		||||
| 
						 | 
				
			
			@ -554,8 +555,8 @@ Socket::Connection::Connection(std::string address, bool nonblock){
 | 
			
		|||
/// \param nonblock Whether the socket should be nonblocking.
 | 
			
		||||
Socket::Connection::Connection(std::string host, int port, bool nonblock){
 | 
			
		||||
  skipCount = 0;
 | 
			
		||||
  pipes[0] = -1;
 | 
			
		||||
  pipes[1] = -1;
 | 
			
		||||
  sRecv = -1;
 | 
			
		||||
  isTrueSocket = true;
 | 
			
		||||
  struct addrinfo *result, *rp, hints;
 | 
			
		||||
  Error = false;
 | 
			
		||||
  Blocking = false;
 | 
			
		||||
| 
						 | 
				
			
			@ -578,11 +579,11 @@ Socket::Connection::Connection(std::string host, int port, bool nonblock){
 | 
			
		|||
 | 
			
		||||
  remotehost = "";
 | 
			
		||||
  for (rp = result; rp != NULL; rp = rp->ai_next){
 | 
			
		||||
    sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
 | 
			
		||||
    if (sock < 0){continue;}
 | 
			
		||||
    if (connect(sock, rp->ai_addr, rp->ai_addrlen) == 0){break;}
 | 
			
		||||
    sSend = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
 | 
			
		||||
    if (sSend < 0){continue;}
 | 
			
		||||
    if (connect(sSend, rp->ai_addr, rp->ai_addrlen) == 0){break;}
 | 
			
		||||
    remotehost += strerror(errno);
 | 
			
		||||
    ::close(sock);
 | 
			
		||||
    ::close(sSend);
 | 
			
		||||
  }
 | 
			
		||||
  freeaddrinfo(result);
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -591,15 +592,15 @@ Socket::Connection::Connection(std::string host, int port, bool nonblock){
 | 
			
		|||
    close();
 | 
			
		||||
  }else{
 | 
			
		||||
    if (nonblock){
 | 
			
		||||
      int flags = fcntl(sock, F_GETFL, 0);
 | 
			
		||||
      int flags = fcntl(sSend, F_GETFL, 0);
 | 
			
		||||
      flags |= O_NONBLOCK;
 | 
			
		||||
      fcntl(sock, F_SETFL, flags);
 | 
			
		||||
      fcntl(sSend, F_SETFL, flags);
 | 
			
		||||
    }
 | 
			
		||||
    int optval = 1;
 | 
			
		||||
    int optlen = sizeof(optval);
 | 
			
		||||
    setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen);
 | 
			
		||||
    setsockopt(sSend, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen);
 | 
			
		||||
  }
 | 
			
		||||
}// Socket::Connection TCP Contructor
 | 
			
		||||
}// Socket::Connection TCP Constructor
 | 
			
		||||
 | 
			
		||||
/// Returns the connected-state for this socket.
 | 
			
		||||
/// Note that this function might be slightly behind the real situation.
 | 
			
		||||
| 
						 | 
				
			
			@ -607,7 +608,7 @@ Socket::Connection::Connection(std::string host, int port, bool nonblock){
 | 
			
		|||
/// and when the socket is closed manually.
 | 
			
		||||
/// \returns True if socket is connected, false otherwise.
 | 
			
		||||
bool Socket::Connection::connected() const{
 | 
			
		||||
  return (sock >= 0) || ((pipes[0] >= 0) || (pipes[1] >= 0));
 | 
			
		||||
  return (sSend >= 0) || (sRecv >= 0);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Returns the time this socket has been connected.
 | 
			
		||||
| 
						 | 
				
			
			@ -701,10 +702,10 @@ unsigned int Socket::Connection::iwrite(const void *buffer, int len){
 | 
			
		|||
    }
 | 
			
		||||
  }
 | 
			
		||||
  int r;
 | 
			
		||||
  if (sock >= 0){
 | 
			
		||||
    r = send(sock, buffer, len, 0);
 | 
			
		||||
  if (isTrueSocket){
 | 
			
		||||
    r = send(sSend, buffer, len, 0);
 | 
			
		||||
  }else{
 | 
			
		||||
    r = write(pipes[0], buffer, len);
 | 
			
		||||
    r = write(sSend, buffer, len);
 | 
			
		||||
  }
 | 
			
		||||
  if (r < 0){
 | 
			
		||||
    switch (errno){
 | 
			
		||||
| 
						 | 
				
			
			@ -717,7 +718,7 @@ unsigned int Socket::Connection::iwrite(const void *buffer, int len){
 | 
			
		|||
      break;
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  if (r == 0 && (sock >= 0)){
 | 
			
		||||
  if (r == 0 && (sSend >= 0)){
 | 
			
		||||
    DONTEVEN_MSG("Socket closed by remote");
 | 
			
		||||
    close();
 | 
			
		||||
  }
 | 
			
		||||
| 
						 | 
				
			
			@ -734,11 +735,10 @@ unsigned int Socket::Connection::iwrite(const void *buffer, int len){
 | 
			
		|||
int Socket::Connection::iread(void *buffer, int len, int flags){
 | 
			
		||||
  if (!connected() || len < 1){return 0;}
 | 
			
		||||
  int r;
 | 
			
		||||
  if (sock >= 0){
 | 
			
		||||
    r = recv(sock, buffer, len, flags);
 | 
			
		||||
  if (sRecv != -1 || !isTrueSocket){
 | 
			
		||||
    r = read(sRecv, buffer, len);
 | 
			
		||||
  }else{
 | 
			
		||||
    r = recv(pipes[1], buffer, len, flags);
 | 
			
		||||
    if (r < 0 && errno == ENOTSOCK){r = read(pipes[1], buffer, len);}
 | 
			
		||||
    r = recv(sSend, buffer, len, flags);
 | 
			
		||||
  }
 | 
			
		||||
  if (r < 0){
 | 
			
		||||
    switch (errno){
 | 
			
		||||
| 
						 | 
				
			
			@ -822,13 +822,13 @@ void Socket::Connection::setHost(std::string host){
 | 
			
		|||
/// Returns true if these sockets are the same socket.
 | 
			
		||||
/// Does not check the internal stats - only the socket itself.
 | 
			
		||||
bool Socket::Connection::operator==(const Connection &B) const{
 | 
			
		||||
  return sock == B.sock && pipes[0] == B.pipes[0] && pipes[1] == B.pipes[1];
 | 
			
		||||
  return sSend == B.sSend && sRecv == B.sRecv;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Returns true if these sockets are not the same socket.
 | 
			
		||||
/// Does not check the internal stats - only the socket itself.
 | 
			
		||||
bool Socket::Connection::operator!=(const Connection &B) const{
 | 
			
		||||
  return sock != B.sock || pipes[0] != B.pipes[0] || pipes[1] != B.pipes[1];
 | 
			
		||||
  return sSend != B.sSend || sRecv != B.sRecv;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Returns true if the socket is valid.
 | 
			
		||||
| 
						 | 
				
			
			@ -1025,7 +1025,7 @@ unsigned int Socket::SSLConnection::iwrite(const void *buffer, int len){
 | 
			
		|||
      break;
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  if (r == 0 && (sock >= 0)){
 | 
			
		||||
  if (r == 0 && (sSend >= 0)){
 | 
			
		||||
    DONTEVEN_MSG("Socket closed by remote");
 | 
			
		||||
    close();
 | 
			
		||||
  }
 | 
			
		||||
| 
						 | 
				
			
			@ -1234,36 +1234,35 @@ Socket::Connection Socket::Server::accept(bool nonblock){
 | 
			
		|||
  int r = ::accept(sock, (sockaddr *)&tmpaddr, &len);
 | 
			
		||||
  // set the socket to be nonblocking, if requested.
 | 
			
		||||
  // we could do this through accept4 with a flag, but that call is non-standard...
 | 
			
		||||
  if ((r >= 0) && nonblock){
 | 
			
		||||
  if (r < 0){
 | 
			
		||||
    if ((errno != EWOULDBLOCK) && (errno != EAGAIN) && (errno != EINTR)){
 | 
			
		||||
      FAIL_MSG("Error during accept: %s. Closing server socket %d.", strerror(errno), sock);
 | 
			
		||||
      close();
 | 
			
		||||
    }
 | 
			
		||||
    return Socket::Connection();
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  if (nonblock){
 | 
			
		||||
    int flags = fcntl(r, F_GETFL, 0);
 | 
			
		||||
    flags |= O_NONBLOCK;
 | 
			
		||||
    fcntl(r, F_SETFL, flags);
 | 
			
		||||
  }
 | 
			
		||||
  if (r >= 0){
 | 
			
		||||
    int optval = 1;
 | 
			
		||||
    int optlen = sizeof(optval);
 | 
			
		||||
    setsockopt(r, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen);
 | 
			
		||||
  }
 | 
			
		||||
  int optval = 1;
 | 
			
		||||
  int optlen = sizeof(optval);
 | 
			
		||||
  setsockopt(r, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen);
 | 
			
		||||
  Socket::Connection tmp(r);
 | 
			
		||||
  tmp.remoteaddr = tmpaddr;
 | 
			
		||||
  if (r < 0){
 | 
			
		||||
    if ((errno != EWOULDBLOCK) && (errno != EAGAIN) && (errno != EINTR)){
 | 
			
		||||
      DEBUG_MSG(DLVL_FAIL, "Error during accept - closing server socket %d.", sock);
 | 
			
		||||
      close();
 | 
			
		||||
    }
 | 
			
		||||
  }else{
 | 
			
		||||
    if (tmpaddr.sin6_family == AF_INET6){
 | 
			
		||||
      tmp.remotehost = inet_ntop(AF_INET6, &(tmpaddr.sin6_addr), addrconv, INET6_ADDRSTRLEN);
 | 
			
		||||
      DEBUG_MSG(DLVL_HIGH, "IPv6 addr [%s]", tmp.remotehost.c_str());
 | 
			
		||||
    }
 | 
			
		||||
    if (tmpaddr.sin6_family == AF_INET){
 | 
			
		||||
      tmp.remotehost = inet_ntop(AF_INET, &(((sockaddr_in *)&tmpaddr)->sin_addr), addrconv, INET6_ADDRSTRLEN);
 | 
			
		||||
      DEBUG_MSG(DLVL_HIGH, "IPv4 addr [%s]", tmp.remotehost.c_str());
 | 
			
		||||
    }
 | 
			
		||||
    if (tmpaddr.sin6_family == AF_UNIX){
 | 
			
		||||
      DEBUG_MSG(DLVL_HIGH, "Unix connection");
 | 
			
		||||
      tmp.remotehost = "UNIX_SOCKET";
 | 
			
		||||
    }
 | 
			
		||||
  if (tmpaddr.sin6_family == AF_INET6){
 | 
			
		||||
    tmp.remotehost = inet_ntop(AF_INET6, &(tmpaddr.sin6_addr), addrconv, INET6_ADDRSTRLEN);
 | 
			
		||||
    DEBUG_MSG(DLVL_HIGH, "IPv6 addr [%s]", tmp.remotehost.c_str());
 | 
			
		||||
  }
 | 
			
		||||
  if (tmpaddr.sin6_family == AF_INET){
 | 
			
		||||
    tmp.remotehost = inet_ntop(AF_INET, &(((sockaddr_in *)&tmpaddr)->sin_addr), addrconv, INET6_ADDRSTRLEN);
 | 
			
		||||
    DEBUG_MSG(DLVL_HIGH, "IPv4 addr [%s]", tmp.remotehost.c_str());
 | 
			
		||||
  }
 | 
			
		||||
  if (tmpaddr.sin6_family == AF_UNIX){
 | 
			
		||||
    DEBUG_MSG(DLVL_HIGH, "Unix connection");
 | 
			
		||||
    tmp.remotehost = "UNIX_SOCKET";
 | 
			
		||||
  }
 | 
			
		||||
  return tmp;
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -66,10 +66,13 @@ namespace Socket{
 | 
			
		|||
  // Buffer
 | 
			
		||||
 | 
			
		||||
  /// This class is for easy communicating through sockets, either TCP or Unix.
 | 
			
		||||
  /// Internally, sSend and sRecv hold the file descriptor to read/write from/to.
 | 
			
		||||
  /// If they are not identical and sRecv is closed but sSend still open, reading from sSend will be attempted.
 | 
			
		||||
  class Connection{
 | 
			
		||||
  protected:
 | 
			
		||||
    int sock;               ///< Internally saved socket number.
 | 
			
		||||
    int pipes[2];           ///< Internally saved file descriptors for pipe socket simulation.
 | 
			
		||||
    bool isTrueSocket;
 | 
			
		||||
    int sSend;               ///< Write end of socket.
 | 
			
		||||
    int sRecv;               ///< Read end of socket.
 | 
			
		||||
    std::string remotehost; ///< Stores remote host address.
 | 
			
		||||
    struct sockaddr_in6 remoteaddr;///< Stores remote host address.
 | 
			
		||||
    uint64_t up;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -253,7 +253,7 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir
 | 
			
		|||
  
 | 
			
		||||
  if (pid == 0){
 | 
			
		||||
    Socket::Connection io(0, 1);
 | 
			
		||||
    io.close();
 | 
			
		||||
    io.drop();
 | 
			
		||||
    DEBUG_MSG(DLVL_DONTEVEN, "execvp");
 | 
			
		||||
    execvp(argv[0], argv);
 | 
			
		||||
    FAIL_MSG("Starting process %s for stream %s failed: %s", argv[0], streamname.c_str(), strerror(errno));
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -528,6 +528,7 @@ namespace Mist{
 | 
			
		|||
    }
 | 
			
		||||
    
 | 
			
		||||
    if (!keepGoing()){
 | 
			
		||||
      INFO_MSG("Aborting page load due to shutdown");
 | 
			
		||||
      return;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -12,6 +12,10 @@ namespace Mist {
 | 
			
		|||
    if (config->getString("ip").size()){
 | 
			
		||||
      myConn.setHost(config->getString("ip"));
 | 
			
		||||
    }
 | 
			
		||||
    firstRun = true;
 | 
			
		||||
    if (config->getString("prequest").size()){
 | 
			
		||||
      myConn.Received().prepend(config->getString("prequest"));
 | 
			
		||||
    }
 | 
			
		||||
    config->activate();
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -33,14 +37,19 @@ namespace Mist {
 | 
			
		|||
    capa["forward"]["ip"]["help"] = "IP of forwarded connection.";
 | 
			
		||||
    capa["forward"]["ip"]["type"] = "str";
 | 
			
		||||
    capa["forward"]["ip"]["option"] = "--ip";
 | 
			
		||||
    capa["forward"]["ip"]["name"] = "Previous request";
 | 
			
		||||
    capa["forward"]["ip"]["help"] = "Data to pretend arrived on the socket before parsing the socket.";
 | 
			
		||||
    capa["forward"]["ip"]["type"] = "str";
 | 
			
		||||
    capa["forward"]["ip"]["option"] = "--prequest";
 | 
			
		||||
    cfg->addOption("streamname", JSON::fromString("{\"arg\":\"string\",\"short\":\"s\",\"long\":\"stream\",\"help\":\"The name of the stream that this connector will transmit.\"}"));
 | 
			
		||||
    cfg->addOption("ip", JSON::fromString("{\"arg\":\"string\",\"short\":\"I\",\"long\":\"ip\",\"help\":\"IP address of connection on stdio.\"}"));
 | 
			
		||||
    cfg->addOption("prequest", JSON::fromString("{\"arg\":\"string\",\"short\":\"R\",\"long\":\"prequest\",\"help\":\"Data to pretend arrived on the socket before parsing the socket.\"}"));
 | 
			
		||||
    cfg->addBasicConnectorOptions(capa);
 | 
			
		||||
    config = cfg;
 | 
			
		||||
  }
 | 
			
		||||
  
 | 
			
		||||
  void HTTPOutput::onFail(const std::string & msg, bool critical){
 | 
			
		||||
    INFO_MSG("Failing '%s': %s: %s", streamName.c_str(), H.url.c_str(), msg.c_str());
 | 
			
		||||
    INFO_MSG("Failing '%s': %s", H.url.c_str(), msg.c_str());
 | 
			
		||||
    if (!webSock){
 | 
			
		||||
      H.Clean(); //make sure no parts of old requests are left in any buffers
 | 
			
		||||
      H.SetHeader("Server", "MistServer/" PACKAGE_VERSION);
 | 
			
		||||
| 
						 | 
				
			
			@ -165,80 +174,49 @@ namespace Mist {
 | 
			
		|||
  }
 | 
			
		||||
  
 | 
			
		||||
  void HTTPOutput::requestHandler(){
 | 
			
		||||
    //Handle onIdle function caller, if needed
 | 
			
		||||
    if (idleInterval && (Util::bootMS() > idleLast + idleInterval)){
 | 
			
		||||
      onIdle();
 | 
			
		||||
      idleLast = Util::bootMS();
 | 
			
		||||
    }
 | 
			
		||||
    //Handle websockets
 | 
			
		||||
    if (webSock){
 | 
			
		||||
      if (webSock->readFrame()){
 | 
			
		||||
        onWebsocketFrame();
 | 
			
		||||
        idleLast = Util::bootMS();
 | 
			
		||||
      }else{
 | 
			
		||||
        if (!isBlocking && !parseData){
 | 
			
		||||
          Util::sleep(100);
 | 
			
		||||
        }
 | 
			
		||||
        return;
 | 
			
		||||
      }
 | 
			
		||||
      if (!isBlocking && !parseData){Util::sleep(100);}
 | 
			
		||||
      return;
 | 
			
		||||
    }
 | 
			
		||||
    if (myConn.Received().size() && myConn.spool()){
 | 
			
		||||
      DEBUG_MSG(DLVL_DONTEVEN, "onRequest");
 | 
			
		||||
      onRequest();
 | 
			
		||||
    }else{
 | 
			
		||||
      if (!myConn.Received().size()){
 | 
			
		||||
        if (myConn.peek() && H.Read(myConn)){
 | 
			
		||||
          std::string handler = getHandler();
 | 
			
		||||
          DEBUG_MSG(DLVL_MEDIUM, "Received request: %s => %s (%s)", H.getUrl().c_str(), handler.c_str(), H.GetVar("stream").c_str());
 | 
			
		||||
          if (!handler.size()){
 | 
			
		||||
            H.Clean();
 | 
			
		||||
            H.SetHeader("Server", "MistServer/" PACKAGE_VERSION);
 | 
			
		||||
            H.SetBody("<!DOCTYPE html><html><head><title>Unsupported Media Type</title></head><body><h1>Unsupported Media Type</h1>The server isn't quite sure what you wanted to receive from it.</body></html>");
 | 
			
		||||
            H.SendResponse("415", "Unsupported Media Type", myConn);
 | 
			
		||||
            myConn.close();
 | 
			
		||||
            return;
 | 
			
		||||
          }
 | 
			
		||||
          if (handler != capa["name"].asStringRef() || H.GetVar("stream") != streamName){
 | 
			
		||||
            DEBUG_MSG(DLVL_MEDIUM, "Switching from %s (%s) to %s (%s)", capa["name"].asStringRef().c_str(), streamName.c_str(), handler.c_str(), H.GetVar("stream").c_str());
 | 
			
		||||
            streamName = H.GetVar("stream");
 | 
			
		||||
            nProxy.userClient.finish();
 | 
			
		||||
            statsPage.finish();
 | 
			
		||||
            reConnector(handler);
 | 
			
		||||
            H.Clean();
 | 
			
		||||
            if (myConn.connected()){
 | 
			
		||||
              FAIL_MSG("Request failed - no connector started");
 | 
			
		||||
              myConn.close();
 | 
			
		||||
            }
 | 
			
		||||
            return;
 | 
			
		||||
          }else{
 | 
			
		||||
            H.Clean();
 | 
			
		||||
            myConn.Received().clear();
 | 
			
		||||
            myConn.spool();
 | 
			
		||||
            DEBUG_MSG(DLVL_DONTEVEN, "onRequest");
 | 
			
		||||
            onRequest();
 | 
			
		||||
          }
 | 
			
		||||
        }else{
 | 
			
		||||
          H.Clean();
 | 
			
		||||
          if (myConn.Received().size()){
 | 
			
		||||
            myConn.Received().clear();
 | 
			
		||||
            myConn.spool();
 | 
			
		||||
            DEBUG_MSG(DLVL_DONTEVEN, "onRequest");
 | 
			
		||||
            onRequest();
 | 
			
		||||
          }
 | 
			
		||||
          if (!myConn.Received().size()){
 | 
			
		||||
            if (!isBlocking && !parseData){
 | 
			
		||||
              Util::sleep(100);
 | 
			
		||||
            }
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
      }else{
 | 
			
		||||
        if (!isBlocking && !parseData){
 | 
			
		||||
          Util::sleep(100);
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    //If we can't read anything more and we're non-blocking, sleep some.
 | 
			
		||||
    if (!firstRun && !myConn.spool()){
 | 
			
		||||
      if (!isBlocking && !parseData){Util::sleep(100);}
 | 
			
		||||
      return;
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  
 | 
			
		||||
  void HTTPOutput::onRequest(){
 | 
			
		||||
    firstRun = false;
 | 
			
		||||
 | 
			
		||||
    while (H.Read(myConn)){
 | 
			
		||||
      std::string handler = getHandler();
 | 
			
		||||
      INFO_MSG("Received request: %s => %s (%s)", H.getUrl().c_str(), handler.c_str(), H.GetVar("stream").c_str());
 | 
			
		||||
      if (!handler.size()){
 | 
			
		||||
        H.Clean();
 | 
			
		||||
        H.SetHeader("Server", "MistServer/" PACKAGE_VERSION);
 | 
			
		||||
        H.SetBody("<!DOCTYPE html><html><head><title>Unsupported Media Type</title></head><body><h1>Unsupported Media Type</h1>The server isn't quite sure what you wanted to receive from it.</body></html>");
 | 
			
		||||
        H.SendResponse("415", "Unsupported Media Type", myConn);
 | 
			
		||||
        myConn.close();
 | 
			
		||||
        return;
 | 
			
		||||
      }
 | 
			
		||||
      if (handler != capa["name"].asStringRef() || H.GetVar("stream") != streamName){
 | 
			
		||||
        MEDIUM_MSG("Switching from %s (%s) to %s (%s)", capa["name"].asStringRef().c_str(), streamName.c_str(), handler.c_str(), H.GetVar("stream").c_str());
 | 
			
		||||
        streamName = H.GetVar("stream");
 | 
			
		||||
        nProxy.userClient.finish();
 | 
			
		||||
        statsPage.finish();
 | 
			
		||||
        reConnector(handler);
 | 
			
		||||
        onFail("Server error - could not start connector", true);
 | 
			
		||||
        return;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if (H.hasHeader("User-Agent")){
 | 
			
		||||
        UA = H.GetHeader("User-Agent");
 | 
			
		||||
      }
 | 
			
		||||
| 
						 | 
				
			
			@ -255,7 +233,6 @@ namespace Mist {
 | 
			
		|||
        crc = checksum::crc32(0, mixed_ua.data(), mixed_ua.size());
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      INFO_MSG("Received request %s", H.getUrl().c_str());
 | 
			
		||||
      //Handle upgrade to websocket if the output supports it
 | 
			
		||||
      if (doesWebsockets() && H.GetHeader("Upgrade") == "websocket"){
 | 
			
		||||
        INFO_MSG("Switching to Websocket mode");
 | 
			
		||||
| 
						 | 
				
			
			@ -309,13 +286,11 @@ namespace Mist {
 | 
			
		|||
  }
 | 
			
		||||
  
 | 
			
		||||
  ///\brief Handles requests by starting a corresponding output process.
 | 
			
		||||
  ///\param H The request to be handled
 | 
			
		||||
  ///\param conn The connection to the client that issued the request.
 | 
			
		||||
  ///\param connector The type of connector to be invoked.
 | 
			
		||||
  void HTTPOutput::reConnector(std::string & connector){
 | 
			
		||||
    //taken from CheckProtocols (controller_connectors.cpp)
 | 
			
		||||
    char * argarr[20];
 | 
			
		||||
    for (int i=0; i<20; i++){argarr[i] = 0;}
 | 
			
		||||
    char * argarr[32];
 | 
			
		||||
    for (int i=0; i<32; i++){argarr[i] = 0;}
 | 
			
		||||
    int id = -1;
 | 
			
		||||
    JSON::Value pipedCapa;
 | 
			
		||||
    JSON::Value p;//properties of protocol
 | 
			
		||||
| 
						 | 
				
			
			@ -358,6 +333,8 @@ namespace Mist {
 | 
			
		|||
 | 
			
		||||
    //build arguments for starting output process
 | 
			
		||||
    std::string tmparg = Util::getMyPath() + std::string("MistOut") + connector;
 | 
			
		||||
    std::string tmpPrequest;
 | 
			
		||||
    if (H.url.size()){tmpPrequest = H.BuildRequest();}
 | 
			
		||||
    int argnum = 0;
 | 
			
		||||
    argarr[argnum++] = (char*)tmparg.c_str();
 | 
			
		||||
    std::string temphost=getConnectedHost();
 | 
			
		||||
| 
						 | 
				
			
			@ -366,6 +343,8 @@ namespace Mist {
 | 
			
		|||
    argarr[argnum++] = (char*)(temphost.c_str());
 | 
			
		||||
    argarr[argnum++] = (char*)"--stream";
 | 
			
		||||
    argarr[argnum++] = (char*)(streamName.c_str());
 | 
			
		||||
    argarr[argnum++] = (char*)"--prequest";
 | 
			
		||||
    argarr[argnum++] = (char*)(tmpPrequest.c_str());
 | 
			
		||||
    //set the debug level if non-default
 | 
			
		||||
    if (Util::Config::printDebugLevel != DEBUG){
 | 
			
		||||
      argarr[argnum++] = (char*)"--debug";
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -11,7 +11,6 @@ namespace Mist {
 | 
			
		|||
      HTTPOutput(Socket::Connection & conn);
 | 
			
		||||
      virtual ~HTTPOutput();
 | 
			
		||||
      static void init(Util::Config * cfg);
 | 
			
		||||
      void onRequest();
 | 
			
		||||
      virtual void onFail(const std::string & msg, bool critical = false);
 | 
			
		||||
      virtual void onHTTP(){};
 | 
			
		||||
      virtual void onIdle(){};
 | 
			
		||||
| 
						 | 
				
			
			@ -26,6 +25,7 @@ namespace Mist {
 | 
			
		|||
      std::string getHandler();
 | 
			
		||||
      bool parseRange(uint64_t & byteStart, uint64_t & byteEnd);
 | 
			
		||||
  protected:
 | 
			
		||||
      bool firstRun;
 | 
			
		||||
      HTTP::Parser H;
 | 
			
		||||
      HTTP::Websocket * webSock;
 | 
			
		||||
      uint32_t idleInterval;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -36,12 +36,13 @@ namespace Mist {
 | 
			
		|||
 | 
			
		||||
  OutHTTP::OutHTTP(Socket::Connection & conn) : HTTPOutput(conn){
 | 
			
		||||
    stayConnected = false;
 | 
			
		||||
    if (myConn.getPureSocket() >= 0){
 | 
			
		||||
    //If this connection is a socket and not already connected to stdio, connect it to stdio.
 | 
			
		||||
    if (myConn.getPureSocket() != -1 && myConn.getSocket() != STDIN_FILENO && myConn.getSocket() != STDOUT_FILENO){
 | 
			
		||||
      std::string host = getConnectedHost();
 | 
			
		||||
      dup2(myConn.getSocket(), STDIN_FILENO);
 | 
			
		||||
      dup2(myConn.getSocket(), STDOUT_FILENO);
 | 
			
		||||
      myConn.drop();
 | 
			
		||||
      myConn = Socket::Connection(fileno(stdout),fileno(stdin) );
 | 
			
		||||
      myConn = Socket::Connection(STDOUT_FILENO, STDIN_FILENO);
 | 
			
		||||
      myConn.setHost(host);
 | 
			
		||||
    }
 | 
			
		||||
    if (config->getOption("wrappers",true).size() == 0 || config->getString("wrappers") == ""){
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue