Socket library updates to support detecting sockets passed as FDs, removed use of peek() in HTTP request handler, fixed 100% CPU usage problem for unfinished HTTP requests
This commit is contained in:
		
							parent
							
								
									83665c4e3b
								
							
						
					
					
						commit
						ec0b19b92c
					
				
					 7 changed files with 148 additions and 165 deletions
				
			
		
							
								
								
									
										177
									
								
								lib/socket.cpp
									
										
									
									
									
								
							
							
						
						
									
										177
									
								
								lib/socket.cpp
									
										
									
									
									
								
							|  | @ -375,9 +375,11 @@ void Socket::Buffer::clear(){ | |||
| /// Create a new base socket. This is a basic constructor for converting any valid socket to a Socket::Connection.
 | ||||
| /// \param sockNo Integer representing the socket to convert.
 | ||||
| Socket::Connection::Connection(int sockNo){ | ||||
|   sock = sockNo; | ||||
|   pipes[0] = -1; | ||||
|   pipes[1] = -1; | ||||
|   sSend = sockNo; | ||||
|   sRecv = -1; | ||||
|   isTrueSocket = false; | ||||
|   struct stat sBuf; | ||||
|   if (sSend != -1 && !fstat(sSend, &sBuf)){isTrueSocket = S_ISSOCK(sBuf.st_mode);} | ||||
|   up = 0; | ||||
|   down = 0; | ||||
|   conntime = Util::epoch(); | ||||
|  | @ -390,9 +392,15 @@ Socket::Connection::Connection(int sockNo){ | |||
| /// \param write The filedescriptor to write to.
 | ||||
| /// \param read The filedescriptor to read from.
 | ||||
| Socket::Connection::Connection(int write, int read){ | ||||
|   sock = -1; | ||||
|   pipes[0] = write; | ||||
|   pipes[1] = read; | ||||
|   sSend = write; | ||||
|   if (write != read){ | ||||
|     sRecv = read; | ||||
|   }else{ | ||||
|     sRecv = -1; | ||||
|   } | ||||
|   isTrueSocket = false; | ||||
|   struct stat sBuf; | ||||
|   if (sSend != -1 && !fstat(sSend, &sBuf)){isTrueSocket = S_ISSOCK(sBuf.st_mode);} | ||||
|   up = 0; | ||||
|   down = 0; | ||||
|   conntime = Util::epoch(); | ||||
|  | @ -404,9 +412,9 @@ Socket::Connection::Connection(int write, int read){ | |||
| /// Create a new disconnected base socket. This is a basic constructor for placeholder purposes.
 | ||||
| /// A socket created like this is always disconnected and should/could be overwritten at some point.
 | ||||
| Socket::Connection::Connection(){ | ||||
|   sock = -1; | ||||
|   pipes[0] = -1; | ||||
|   pipes[1] = -1; | ||||
|   sSend = -1; | ||||
|   sRecv = -1; | ||||
|   isTrueSocket = false; | ||||
|   up = 0; | ||||
|   down = 0; | ||||
|   conntime = Util::epoch(); | ||||
|  | @ -447,16 +455,14 @@ bool isFDBlocking(int FD){ | |||
| 
 | ||||
| /// Set this socket to be blocking (true) or nonblocking (false).
 | ||||
| void Socket::Connection::setBlocking(bool blocking){ | ||||
|   if (sock >= 0){setFDBlocking(sock, blocking);} | ||||
|   if (pipes[0] >= 0){setFDBlocking(pipes[0], blocking);} | ||||
|   if (pipes[1] >= 0){setFDBlocking(pipes[1], blocking);} | ||||
|   if (sSend >= 0){setFDBlocking(sSend, blocking);} | ||||
|   if (sRecv >= 0 && sSend != sRecv){setFDBlocking(sRecv, blocking);} | ||||
| } | ||||
| 
 | ||||
| /// Set this socket to be blocking (true) or nonblocking (false).
 | ||||
| bool Socket::Connection::isBlocking(){ | ||||
|   if (sock >= 0){return isFDBlocking(sock);} | ||||
|   if (pipes[0] >= 0){return isFDBlocking(pipes[0]);} | ||||
|   if (pipes[1] >= 0){return isFDBlocking(pipes[1]);} | ||||
|   if (sSend >= 0){return isFDBlocking(sSend);} | ||||
|   if (sRecv >= 0){return isFDBlocking(sRecv);} | ||||
|   return false; | ||||
| } | ||||
| 
 | ||||
|  | @ -465,7 +471,7 @@ bool Socket::Connection::isBlocking(){ | |||
| /// This function calls shutdown, thus making the socket unusable in all other
 | ||||
| /// processes as well. Do not use on shared sockets that are still in use.
 | ||||
| void Socket::Connection::close(){ | ||||
|   if (sock != -1){shutdown(sock, SHUT_RDWR);} | ||||
|   if (sSend != -1){shutdown(sSend, SHUT_RDWR);} | ||||
|   drop(); | ||||
| }// Socket::Connection::close
 | ||||
| 
 | ||||
|  | @ -475,36 +481,31 @@ void Socket::Connection::close(){ | |||
| /// processes.
 | ||||
| void Socket::Connection::drop(){ | ||||
|   if (connected()){ | ||||
|     if (sock != -1){ | ||||
|       DEBUG_MSG(DLVL_HIGH, "Socket %d closed", sock); | ||||
|     if (sSend != -1){ | ||||
|       HIGH_MSG("Socket %d closed", sSend); | ||||
|       errno = EINTR; | ||||
|       while (::close(sock) != 0 && errno == EINTR){} | ||||
|       sock = -1; | ||||
|       while (::close(sSend) != 0 && errno == EINTR){} | ||||
|       if (sRecv == sSend){sRecv = -1;} | ||||
|       sSend = -1; | ||||
|     } | ||||
|     if (pipes[0] != -1){ | ||||
|     if (sRecv != -1){ | ||||
|       errno = EINTR; | ||||
|       while (::close(pipes[0]) != 0 && errno == EINTR){} | ||||
|       pipes[0] = -1; | ||||
|     } | ||||
|     if (pipes[1] != -1){ | ||||
|       errno = EINTR; | ||||
|       while (::close(pipes[1]) != 0 && errno == EINTR){} | ||||
|       pipes[1] = -1; | ||||
|       while (::close(sRecv) != 0 && errno == EINTR){} | ||||
|       sRecv = -1; | ||||
|     } | ||||
|   } | ||||
| }// Socket::Connection::drop
 | ||||
| 
 | ||||
| /// Returns internal socket number.
 | ||||
| int Socket::Connection::getSocket(){ | ||||
|   if (sock != -1){return sock;} | ||||
|   if (pipes[0] != -1){return pipes[0];} | ||||
|   if (pipes[1] != -1){return pipes[1];} | ||||
|   return -1; | ||||
|   if (sSend != -1){return sSend;} | ||||
|   return sRecv; | ||||
| } | ||||
| 
 | ||||
| /// Returns non-piped internal socket number.
 | ||||
| int Socket::Connection::getPureSocket(){ | ||||
|   return sock; | ||||
|   if (!isTrueSocket){return -1;} | ||||
|   return sSend; | ||||
| } | ||||
| 
 | ||||
| /// Returns a string describing the last error that occured.
 | ||||
|  | @ -518,12 +519,12 @@ std::string Socket::Connection::getError(){ | |||
| /// \param nonblock Whether the socket should be nonblocking. False by default.
 | ||||
| Socket::Connection::Connection(std::string address, bool nonblock){ | ||||
|   skipCount = 0; | ||||
|   pipes[0] = -1; | ||||
|   pipes[1] = -1; | ||||
|   sock = socket(PF_UNIX, SOCK_STREAM, 0); | ||||
|   if (sock < 0){ | ||||
|   sRecv = -1; | ||||
|   isTrueSocket = true; | ||||
|   sSend = socket(PF_UNIX, SOCK_STREAM, 0); | ||||
|   if (sSend < 0){ | ||||
|     remotehost = strerror(errno); | ||||
|     DEBUG_MSG(DLVL_FAIL, "Could not create socket! Error: %s", remotehost.c_str()); | ||||
|     FAIL_MSG("Could not create socket! Error: %s", remotehost.c_str()); | ||||
|     return; | ||||
|   } | ||||
|   Error = false; | ||||
|  | @ -534,19 +535,19 @@ Socket::Connection::Connection(std::string address, bool nonblock){ | |||
|   sockaddr_un addr; | ||||
|   addr.sun_family = AF_UNIX; | ||||
|   strncpy(addr.sun_path, address.c_str(), address.size() + 1); | ||||
|   int r = connect(sock, (sockaddr *)&addr, sizeof(addr)); | ||||
|   int r = connect(sSend, (sockaddr *)&addr, sizeof(addr)); | ||||
|   if (r == 0){ | ||||
|     if (nonblock){ | ||||
|       int flags = fcntl(sock, F_GETFL, 0); | ||||
|       int flags = fcntl(sSend, F_GETFL, 0); | ||||
|       flags |= O_NONBLOCK; | ||||
|       fcntl(sock, F_SETFL, flags); | ||||
|       fcntl(sSend, F_SETFL, flags); | ||||
|     } | ||||
|   }else{ | ||||
|     remotehost = strerror(errno); | ||||
|     DEBUG_MSG(DLVL_FAIL, "Could not connect to %s! Error: %s", address.c_str(), remotehost.c_str()); | ||||
|     FAIL_MSG("Could not connect to %s! Error: %s", address.c_str(), remotehost.c_str()); | ||||
|     close(); | ||||
|   } | ||||
| }// Socket::Connection Unix Contructor
 | ||||
| }// Socket::Connection Unix Constructor
 | ||||
| 
 | ||||
| /// Create a new TCP Socket. This socket will (try to) connect to the given host/port right away.
 | ||||
| /// \param host String containing the hostname to connect to.
 | ||||
|  | @ -554,8 +555,8 @@ Socket::Connection::Connection(std::string address, bool nonblock){ | |||
| /// \param nonblock Whether the socket should be nonblocking.
 | ||||
| Socket::Connection::Connection(std::string host, int port, bool nonblock){ | ||||
|   skipCount = 0; | ||||
|   pipes[0] = -1; | ||||
|   pipes[1] = -1; | ||||
|   sRecv = -1; | ||||
|   isTrueSocket = true; | ||||
|   struct addrinfo *result, *rp, hints; | ||||
|   Error = false; | ||||
|   Blocking = false; | ||||
|  | @ -578,11 +579,11 @@ Socket::Connection::Connection(std::string host, int port, bool nonblock){ | |||
| 
 | ||||
|   remotehost = ""; | ||||
|   for (rp = result; rp != NULL; rp = rp->ai_next){ | ||||
|     sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); | ||||
|     if (sock < 0){continue;} | ||||
|     if (connect(sock, rp->ai_addr, rp->ai_addrlen) == 0){break;} | ||||
|     sSend = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); | ||||
|     if (sSend < 0){continue;} | ||||
|     if (connect(sSend, rp->ai_addr, rp->ai_addrlen) == 0){break;} | ||||
|     remotehost += strerror(errno); | ||||
|     ::close(sock); | ||||
|     ::close(sSend); | ||||
|   } | ||||
|   freeaddrinfo(result); | ||||
| 
 | ||||
|  | @ -591,15 +592,15 @@ Socket::Connection::Connection(std::string host, int port, bool nonblock){ | |||
|     close(); | ||||
|   }else{ | ||||
|     if (nonblock){ | ||||
|       int flags = fcntl(sock, F_GETFL, 0); | ||||
|       int flags = fcntl(sSend, F_GETFL, 0); | ||||
|       flags |= O_NONBLOCK; | ||||
|       fcntl(sock, F_SETFL, flags); | ||||
|       fcntl(sSend, F_SETFL, flags); | ||||
|     } | ||||
|     int optval = 1; | ||||
|     int optlen = sizeof(optval); | ||||
|     setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen); | ||||
|     setsockopt(sSend, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen); | ||||
|   } | ||||
| }// Socket::Connection TCP Contructor
 | ||||
| }// Socket::Connection TCP Constructor
 | ||||
| 
 | ||||
| /// Returns the connected-state for this socket.
 | ||||
| /// Note that this function might be slightly behind the real situation.
 | ||||
|  | @ -607,7 +608,7 @@ Socket::Connection::Connection(std::string host, int port, bool nonblock){ | |||
| /// and when the socket is closed manually.
 | ||||
| /// \returns True if socket is connected, false otherwise.
 | ||||
| bool Socket::Connection::connected() const{ | ||||
|   return (sock >= 0) || ((pipes[0] >= 0) || (pipes[1] >= 0)); | ||||
|   return (sSend >= 0) || (sRecv >= 0); | ||||
| } | ||||
| 
 | ||||
| /// Returns the time this socket has been connected.
 | ||||
|  | @ -701,10 +702,10 @@ unsigned int Socket::Connection::iwrite(const void *buffer, int len){ | |||
|     } | ||||
|   } | ||||
|   int r; | ||||
|   if (sock >= 0){ | ||||
|     r = send(sock, buffer, len, 0); | ||||
|   if (isTrueSocket){ | ||||
|     r = send(sSend, buffer, len, 0); | ||||
|   }else{ | ||||
|     r = write(pipes[0], buffer, len); | ||||
|     r = write(sSend, buffer, len); | ||||
|   } | ||||
|   if (r < 0){ | ||||
|     switch (errno){ | ||||
|  | @ -717,7 +718,7 @@ unsigned int Socket::Connection::iwrite(const void *buffer, int len){ | |||
|       break; | ||||
|     } | ||||
|   } | ||||
|   if (r == 0 && (sock >= 0)){ | ||||
|   if (r == 0 && (sSend >= 0)){ | ||||
|     DONTEVEN_MSG("Socket closed by remote"); | ||||
|     close(); | ||||
|   } | ||||
|  | @ -734,11 +735,10 @@ unsigned int Socket::Connection::iwrite(const void *buffer, int len){ | |||
| int Socket::Connection::iread(void *buffer, int len, int flags){ | ||||
|   if (!connected() || len < 1){return 0;} | ||||
|   int r; | ||||
|   if (sock >= 0){ | ||||
|     r = recv(sock, buffer, len, flags); | ||||
|   if (sRecv != -1 || !isTrueSocket){ | ||||
|     r = read(sRecv, buffer, len); | ||||
|   }else{ | ||||
|     r = recv(pipes[1], buffer, len, flags); | ||||
|     if (r < 0 && errno == ENOTSOCK){r = read(pipes[1], buffer, len);} | ||||
|     r = recv(sSend, buffer, len, flags); | ||||
|   } | ||||
|   if (r < 0){ | ||||
|     switch (errno){ | ||||
|  | @ -822,13 +822,13 @@ void Socket::Connection::setHost(std::string host){ | |||
| /// Returns true if these sockets are the same socket.
 | ||||
| /// Does not check the internal stats - only the socket itself.
 | ||||
| bool Socket::Connection::operator==(const Connection &B) const{ | ||||
|   return sock == B.sock && pipes[0] == B.pipes[0] && pipes[1] == B.pipes[1]; | ||||
|   return sSend == B.sSend && sRecv == B.sRecv; | ||||
| } | ||||
| 
 | ||||
| /// Returns true if these sockets are not the same socket.
 | ||||
| /// Does not check the internal stats - only the socket itself.
 | ||||
| bool Socket::Connection::operator!=(const Connection &B) const{ | ||||
|   return sock != B.sock || pipes[0] != B.pipes[0] || pipes[1] != B.pipes[1]; | ||||
|   return sSend != B.sSend || sRecv != B.sRecv; | ||||
| } | ||||
| 
 | ||||
| /// Returns true if the socket is valid.
 | ||||
|  | @ -1025,7 +1025,7 @@ unsigned int Socket::SSLConnection::iwrite(const void *buffer, int len){ | |||
|       break; | ||||
|     } | ||||
|   } | ||||
|   if (r == 0 && (sock >= 0)){ | ||||
|   if (r == 0 && (sSend >= 0)){ | ||||
|     DONTEVEN_MSG("Socket closed by remote"); | ||||
|     close(); | ||||
|   } | ||||
|  | @ -1234,36 +1234,35 @@ Socket::Connection Socket::Server::accept(bool nonblock){ | |||
|   int r = ::accept(sock, (sockaddr *)&tmpaddr, &len); | ||||
|   // set the socket to be nonblocking, if requested.
 | ||||
|   // we could do this through accept4 with a flag, but that call is non-standard...
 | ||||
|   if ((r >= 0) && nonblock){ | ||||
|   if (r < 0){ | ||||
|     if ((errno != EWOULDBLOCK) && (errno != EAGAIN) && (errno != EINTR)){ | ||||
|       FAIL_MSG("Error during accept: %s. Closing server socket %d.", strerror(errno), sock); | ||||
|       close(); | ||||
|     } | ||||
|     return Socket::Connection(); | ||||
|   } | ||||
| 
 | ||||
|   if (nonblock){ | ||||
|     int flags = fcntl(r, F_GETFL, 0); | ||||
|     flags |= O_NONBLOCK; | ||||
|     fcntl(r, F_SETFL, flags); | ||||
|   } | ||||
|   if (r >= 0){ | ||||
|     int optval = 1; | ||||
|     int optlen = sizeof(optval); | ||||
|     setsockopt(r, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen); | ||||
|   } | ||||
|   int optval = 1; | ||||
|   int optlen = sizeof(optval); | ||||
|   setsockopt(r, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen); | ||||
|   Socket::Connection tmp(r); | ||||
|   tmp.remoteaddr = tmpaddr; | ||||
|   if (r < 0){ | ||||
|     if ((errno != EWOULDBLOCK) && (errno != EAGAIN) && (errno != EINTR)){ | ||||
|       DEBUG_MSG(DLVL_FAIL, "Error during accept - closing server socket %d.", sock); | ||||
|       close(); | ||||
|     } | ||||
|   }else{ | ||||
|     if (tmpaddr.sin6_family == AF_INET6){ | ||||
|       tmp.remotehost = inet_ntop(AF_INET6, &(tmpaddr.sin6_addr), addrconv, INET6_ADDRSTRLEN); | ||||
|       DEBUG_MSG(DLVL_HIGH, "IPv6 addr [%s]", tmp.remotehost.c_str()); | ||||
|     } | ||||
|     if (tmpaddr.sin6_family == AF_INET){ | ||||
|       tmp.remotehost = inet_ntop(AF_INET, &(((sockaddr_in *)&tmpaddr)->sin_addr), addrconv, INET6_ADDRSTRLEN); | ||||
|       DEBUG_MSG(DLVL_HIGH, "IPv4 addr [%s]", tmp.remotehost.c_str()); | ||||
|     } | ||||
|     if (tmpaddr.sin6_family == AF_UNIX){ | ||||
|       DEBUG_MSG(DLVL_HIGH, "Unix connection"); | ||||
|       tmp.remotehost = "UNIX_SOCKET"; | ||||
|     } | ||||
|   if (tmpaddr.sin6_family == AF_INET6){ | ||||
|     tmp.remotehost = inet_ntop(AF_INET6, &(tmpaddr.sin6_addr), addrconv, INET6_ADDRSTRLEN); | ||||
|     DEBUG_MSG(DLVL_HIGH, "IPv6 addr [%s]", tmp.remotehost.c_str()); | ||||
|   } | ||||
|   if (tmpaddr.sin6_family == AF_INET){ | ||||
|     tmp.remotehost = inet_ntop(AF_INET, &(((sockaddr_in *)&tmpaddr)->sin_addr), addrconv, INET6_ADDRSTRLEN); | ||||
|     DEBUG_MSG(DLVL_HIGH, "IPv4 addr [%s]", tmp.remotehost.c_str()); | ||||
|   } | ||||
|   if (tmpaddr.sin6_family == AF_UNIX){ | ||||
|     DEBUG_MSG(DLVL_HIGH, "Unix connection"); | ||||
|     tmp.remotehost = "UNIX_SOCKET"; | ||||
|   } | ||||
|   return tmp; | ||||
| } | ||||
|  |  | |||
|  | @ -66,10 +66,13 @@ namespace Socket{ | |||
|   // Buffer
 | ||||
| 
 | ||||
|   /// This class is for easy communicating through sockets, either TCP or Unix.
 | ||||
|   /// Internally, sSend and sRecv hold the file descriptor to read/write from/to.
 | ||||
|   /// If they are not identical and sRecv is closed but sSend still open, reading from sSend will be attempted.
 | ||||
|   class Connection{ | ||||
|   protected: | ||||
|     int sock;               ///< Internally saved socket number.
 | ||||
|     int pipes[2];           ///< Internally saved file descriptors for pipe socket simulation.
 | ||||
|     bool isTrueSocket; | ||||
|     int sSend;               ///< Write end of socket.
 | ||||
|     int sRecv;               ///< Read end of socket.
 | ||||
|     std::string remotehost; ///< Stores remote host address.
 | ||||
|     struct sockaddr_in6 remoteaddr;///< Stores remote host address.
 | ||||
|     uint64_t up; | ||||
|  |  | |||
|  | @ -335,7 +335,7 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir | |||
|    | ||||
|   if (pid == 0){ | ||||
|     Socket::Connection io(0, 1); | ||||
|     io.close(); | ||||
|     io.drop(); | ||||
|     DEBUG_MSG(DLVL_DONTEVEN, "execvp"); | ||||
|     execvp(argv[0], argv); | ||||
|     FAIL_MSG("Starting process %s for stream %s failed: %s", argv[0], streamname.c_str(), strerror(errno)); | ||||
|  |  | |||
|  | @ -760,6 +760,7 @@ namespace Mist{ | |||
|     } | ||||
|      | ||||
|     if (!keepGoing()){ | ||||
|       INFO_MSG("Aborting page load due to shutdown"); | ||||
|       return; | ||||
|     } | ||||
| 
 | ||||
|  |  | |||
|  | @ -14,6 +14,10 @@ namespace Mist { | |||
|     if (config->getString("ip").size()){ | ||||
|       myConn.setHost(config->getString("ip")); | ||||
|     } | ||||
|     firstRun = true; | ||||
|     if (config->getString("prequest").size()){ | ||||
|       myConn.Received().prepend(config->getString("prequest")); | ||||
|     } | ||||
|     config->activate(); | ||||
|   } | ||||
| 
 | ||||
|  | @ -35,14 +39,19 @@ namespace Mist { | |||
|     capa["forward"]["ip"]["help"] = "IP of forwarded connection."; | ||||
|     capa["forward"]["ip"]["type"] = "str"; | ||||
|     capa["forward"]["ip"]["option"] = "--ip"; | ||||
|     capa["forward"]["ip"]["name"] = "Previous request"; | ||||
|     capa["forward"]["ip"]["help"] = "Data to pretend arrived on the socket before parsing the socket."; | ||||
|     capa["forward"]["ip"]["type"] = "str"; | ||||
|     capa["forward"]["ip"]["option"] = "--prequest"; | ||||
|     cfg->addOption("streamname", JSON::fromString("{\"arg\":\"string\",\"short\":\"s\",\"long\":\"stream\",\"help\":\"The name of the stream that this connector will transmit.\"}")); | ||||
|     cfg->addOption("ip", JSON::fromString("{\"arg\":\"string\",\"short\":\"I\",\"long\":\"ip\",\"help\":\"IP address of connection on stdio.\"}")); | ||||
|     cfg->addOption("prequest", JSON::fromString("{\"arg\":\"string\",\"short\":\"R\",\"long\":\"prequest\",\"help\":\"Data to pretend arrived on the socket before parsing the socket.\"}")); | ||||
|     cfg->addBasicConnectorOptions(capa); | ||||
|     config = cfg; | ||||
|   } | ||||
|    | ||||
|   void HTTPOutput::onFail(const std::string & msg, bool critical){ | ||||
|     INFO_MSG("Failing '%s': %s: %s", streamName.c_str(), H.url.c_str(), msg.c_str()); | ||||
|     INFO_MSG("Failing '%s': %s", H.url.c_str(), msg.c_str()); | ||||
|     if (!webSock && !isRecording()){ | ||||
|       H.Clean(); //make sure no parts of old requests are left in any buffers
 | ||||
|       H.SetHeader("Server", "MistServer/" PACKAGE_VERSION); | ||||
|  | @ -167,83 +176,52 @@ namespace Mist { | |||
|   } | ||||
|    | ||||
|   void HTTPOutput::requestHandler(){ | ||||
|     //Handle onIdle function caller, if needed
 | ||||
|     if (idleInterval && (Util::bootMS() > idleLast + idleInterval)){ | ||||
|       onIdle(); | ||||
|       idleLast = Util::bootMS(); | ||||
|     } | ||||
|     //Handle websockets
 | ||||
|     if (webSock){ | ||||
|       if (webSock->readFrame()){ | ||||
|         onWebsocketFrame(); | ||||
|         idleLast = Util::bootMS(); | ||||
|       }else{ | ||||
|         if (!isBlocking && !parseData){ | ||||
|           Util::sleep(100); | ||||
|         } | ||||
|         return; | ||||
|       } | ||||
|       if (!isBlocking && !parseData){Util::sleep(100);} | ||||
|       return; | ||||
|     } | ||||
|     if (myConn.Received().size() && myConn.spool()){ | ||||
|       DEBUG_MSG(DLVL_DONTEVEN, "onRequest"); | ||||
|       onRequest(); | ||||
|     }else{ | ||||
|       if (!myConn.Received().size()){ | ||||
|         if (myConn.peek() && H.Read(myConn)){ | ||||
|           std::string handler = getHandler(); | ||||
|           /*LTS-START*/ | ||||
|           reqUrl = H.url + H.allVars(); | ||||
|           /*LTS-END*/ | ||||
|           DEBUG_MSG(DLVL_MEDIUM, "Received request: %s => %s (%s)", H.getUrl().c_str(), handler.c_str(), H.GetVar("stream").c_str()); | ||||
|           if (!handler.size()){ | ||||
|             H.Clean(); | ||||
|             H.SetHeader("Server", "MistServer/" PACKAGE_VERSION); | ||||
|             H.SetBody("<!DOCTYPE html><html><head><title>Unsupported Media Type</title></head><body><h1>Unsupported Media Type</h1>The server isn't quite sure what you wanted to receive from it.</body></html>"); | ||||
|             H.SendResponse("415", "Unsupported Media Type", myConn); | ||||
|             myConn.close(); | ||||
|             return; | ||||
|           } | ||||
|           if (handler != capa["name"].asStringRef() || H.GetVar("stream") != streamName){ | ||||
|             DEBUG_MSG(DLVL_MEDIUM, "Switching from %s (%s) to %s (%s)", capa["name"].asStringRef().c_str(), streamName.c_str(), handler.c_str(), H.GetVar("stream").c_str()); | ||||
|             streamName = H.GetVar("stream"); | ||||
|             nProxy.userClient.finish(); | ||||
|             statsPage.finish(); | ||||
|             reConnector(handler); | ||||
|             H.Clean(); | ||||
|             if (myConn.connected()){ | ||||
|               FAIL_MSG("Request failed - no connector started"); | ||||
|               myConn.close(); | ||||
|             } | ||||
|             return; | ||||
|           }else{ | ||||
|             H.Clean(); | ||||
|             myConn.Received().clear(); | ||||
|             myConn.spool(); | ||||
|             DEBUG_MSG(DLVL_DONTEVEN, "onRequest"); | ||||
|             onRequest(); | ||||
|           } | ||||
|         }else{ | ||||
|           H.Clean(); | ||||
|           if (myConn.Received().size()){ | ||||
|             myConn.Received().clear(); | ||||
|             myConn.spool(); | ||||
|             DEBUG_MSG(DLVL_DONTEVEN, "onRequest"); | ||||
|             onRequest(); | ||||
|           } | ||||
|           if (!myConn.Received().size()){ | ||||
|             if (!isBlocking && !parseData){ | ||||
|               Util::sleep(100); | ||||
|             } | ||||
|           } | ||||
|         } | ||||
|       }else{ | ||||
|         if (!isBlocking && !parseData){ | ||||
|           Util::sleep(100); | ||||
|         } | ||||
|       } | ||||
|     //If we can't read anything more and we're non-blocking, sleep some.
 | ||||
|     if (!firstRun && !myConn.spool()){ | ||||
|       if (!isBlocking && !parseData){Util::sleep(100);} | ||||
|       return; | ||||
|     } | ||||
|   } | ||||
|   | ||||
|   void HTTPOutput::onRequest(){ | ||||
|     firstRun = false; | ||||
| 
 | ||||
|     while (H.Read(myConn)){ | ||||
|       std::string handler = getHandler(); | ||||
|       INFO_MSG("Received request: %s => %s (%s)", H.getUrl().c_str(), handler.c_str(), H.GetVar("stream").c_str()); | ||||
|       if (!handler.size()){ | ||||
|         H.Clean(); | ||||
|         H.SetHeader("Server", "MistServer/" PACKAGE_VERSION); | ||||
|         H.SetBody("<!DOCTYPE html><html><head><title>Unsupported Media Type</title></head><body><h1>Unsupported Media Type</h1>The server isn't quite sure what you wanted to receive from it.</body></html>"); | ||||
|         H.SendResponse("415", "Unsupported Media Type", myConn); | ||||
|         myConn.close(); | ||||
|         return; | ||||
|       } | ||||
|       if (handler != capa["name"].asStringRef() || H.GetVar("stream") != streamName){ | ||||
|         MEDIUM_MSG("Switching from %s (%s) to %s (%s)", capa["name"].asStringRef().c_str(), streamName.c_str(), handler.c_str(), H.GetVar("stream").c_str()); | ||||
|         streamName = H.GetVar("stream"); | ||||
|         nProxy.userClient.finish(); | ||||
|         statsPage.finish(); | ||||
|         reConnector(handler); | ||||
|         onFail("Server error - could not start connector", true); | ||||
|         return; | ||||
|       } | ||||
| 
 | ||||
|       /*LTS-START*/ | ||||
|       reqUrl = H.url + H.allVars(); | ||||
|       /*LTS-END*/ | ||||
|       if (H.hasHeader("User-Agent")){ | ||||
|         UA = H.GetHeader("User-Agent"); | ||||
|       } | ||||
|  | @ -260,7 +238,6 @@ namespace Mist { | |||
|         crc = checksum::crc32(0, mixed_ua.data(), mixed_ua.size()); | ||||
|       } | ||||
| 
 | ||||
|       INFO_MSG("Received request %s", H.getUrl().c_str()); | ||||
|       if (H.GetVar("audio") != ""){targetParams["audio"] = H.GetVar("audio");} | ||||
|       if (H.GetVar("video") != ""){targetParams["video"] = H.GetVar("video");} | ||||
|       if (H.GetVar("subtitle") != ""){targetParams["subtitle"] = H.GetVar("subtitle");} | ||||
|  | @ -339,13 +316,11 @@ namespace Mist { | |||
|   } | ||||
|    | ||||
|   ///\brief Handles requests by starting a corresponding output process.
 | ||||
|   ///\param H The request to be handled
 | ||||
|   ///\param conn The connection to the client that issued the request.
 | ||||
|   ///\param connector The type of connector to be invoked.
 | ||||
|   void HTTPOutput::reConnector(std::string & connector){ | ||||
|     //taken from CheckProtocols (controller_connectors.cpp)
 | ||||
|     char * argarr[20]; | ||||
|     for (int i=0; i<20; i++){argarr[i] = 0;} | ||||
|     char * argarr[32]; | ||||
|     for (int i=0; i<32; i++){argarr[i] = 0;} | ||||
|     int id = -1; | ||||
|     JSON::Value pipedCapa; | ||||
|     JSON::Value p;//properties of protocol
 | ||||
|  | @ -398,6 +373,8 @@ namespace Mist { | |||
| 
 | ||||
|     //build arguments for starting output process
 | ||||
|     std::string tmparg = Util::getMyPath() + std::string("MistOut") + connector; | ||||
|     std::string tmpPrequest; | ||||
|     if (H.url.size()){tmpPrequest = H.BuildRequest();} | ||||
|     int argnum = 0; | ||||
|     argarr[argnum++] = (char*)tmparg.c_str(); | ||||
|     std::string temphost=getConnectedHost(); | ||||
|  | @ -406,6 +383,8 @@ namespace Mist { | |||
|     argarr[argnum++] = (char*)(temphost.c_str()); | ||||
|     argarr[argnum++] = (char*)"--stream"; | ||||
|     argarr[argnum++] = (char*)(streamName.c_str()); | ||||
|     argarr[argnum++] = (char*)"--prequest"; | ||||
|     argarr[argnum++] = (char*)(tmpPrequest.c_str()); | ||||
|     //set the debug level if non-default
 | ||||
|     if (Util::Config::printDebugLevel != DEBUG){ | ||||
|       argarr[argnum++] = (char*)"--debug"; | ||||
|  |  | |||
|  | @ -11,7 +11,6 @@ namespace Mist { | |||
|       HTTPOutput(Socket::Connection & conn); | ||||
|       virtual ~HTTPOutput(); | ||||
|       static void init(Util::Config * cfg); | ||||
|       void onRequest(); | ||||
|       virtual void onFail(const std::string & msg, bool critical = false); | ||||
|       virtual void onHTTP(){}; | ||||
|       virtual void onIdle(){}; | ||||
|  | @ -26,6 +25,7 @@ namespace Mist { | |||
|       std::string getHandler(); | ||||
|       bool parseRange(uint64_t & byteStart, uint64_t & byteEnd); | ||||
|   protected: | ||||
|       bool firstRun; | ||||
|       HTTP::Parser H; | ||||
|       HTTP::Websocket * webSock; | ||||
|       uint32_t idleInterval; | ||||
|  |  | |||
|  | @ -36,12 +36,13 @@ namespace Mist { | |||
| 
 | ||||
|   OutHTTP::OutHTTP(Socket::Connection & conn) : HTTPOutput(conn){ | ||||
|     stayConnected = false; | ||||
|     if (myConn.getPureSocket() >= 0){ | ||||
|     //If this connection is a socket and not already connected to stdio, connect it to stdio.
 | ||||
|     if (myConn.getPureSocket() != -1 && myConn.getSocket() != STDIN_FILENO && myConn.getSocket() != STDOUT_FILENO){ | ||||
|       std::string host = getConnectedHost(); | ||||
|       dup2(myConn.getSocket(), STDIN_FILENO); | ||||
|       dup2(myConn.getSocket(), STDOUT_FILENO); | ||||
|       myConn.drop(); | ||||
|       myConn = Socket::Connection(fileno(stdout),fileno(stdin) ); | ||||
|       myConn = Socket::Connection(STDOUT_FILENO, STDIN_FILENO); | ||||
|       myConn.setHost(host); | ||||
|     } | ||||
|     if (config->getString("nostreamtext").size()){ | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue
	
	 Thulinma
						Thulinma