diff --git a/lib/socket.cpp b/lib/socket.cpp index fa88f87e..84239e0f 100644 --- a/lib/socket.cpp +++ b/lib/socket.cpp @@ -375,9 +375,11 @@ void Socket::Buffer::clear(){ /// Create a new base socket. This is a basic constructor for converting any valid socket to a Socket::Connection. /// \param sockNo Integer representing the socket to convert. Socket::Connection::Connection(int sockNo){ - sock = sockNo; - pipes[0] = -1; - pipes[1] = -1; + sSend = sockNo; + sRecv = -1; + isTrueSocket = false; + struct stat sBuf; + if (sSend != -1 && !fstat(sSend, &sBuf)){isTrueSocket = S_ISSOCK(sBuf.st_mode);} up = 0; down = 0; conntime = Util::epoch(); @@ -390,9 +392,15 @@ Socket::Connection::Connection(int sockNo){ /// \param write The filedescriptor to write to. /// \param read The filedescriptor to read from. Socket::Connection::Connection(int write, int read){ - sock = -1; - pipes[0] = write; - pipes[1] = read; + sSend = write; + if (write != read){ + sRecv = read; + }else{ + sRecv = -1; + } + isTrueSocket = false; + struct stat sBuf; + if (sSend != -1 && !fstat(sSend, &sBuf)){isTrueSocket = S_ISSOCK(sBuf.st_mode);} up = 0; down = 0; conntime = Util::epoch(); @@ -404,9 +412,9 @@ Socket::Connection::Connection(int write, int read){ /// Create a new disconnected base socket. This is a basic constructor for placeholder purposes. /// A socket created like this is always disconnected and should/could be overwritten at some point. Socket::Connection::Connection(){ - sock = -1; - pipes[0] = -1; - pipes[1] = -1; + sSend = -1; + sRecv = -1; + isTrueSocket = false; up = 0; down = 0; conntime = Util::epoch(); @@ -447,16 +455,14 @@ bool isFDBlocking(int FD){ /// Set this socket to be blocking (true) or nonblocking (false). void Socket::Connection::setBlocking(bool blocking){ - if (sock >= 0){setFDBlocking(sock, blocking);} - if (pipes[0] >= 0){setFDBlocking(pipes[0], blocking);} - if (pipes[1] >= 0){setFDBlocking(pipes[1], blocking);} + if (sSend >= 0){setFDBlocking(sSend, blocking);} + if (sRecv >= 0 && sSend != sRecv){setFDBlocking(sRecv, blocking);} } /// Set this socket to be blocking (true) or nonblocking (false). bool Socket::Connection::isBlocking(){ - if (sock >= 0){return isFDBlocking(sock);} - if (pipes[0] >= 0){return isFDBlocking(pipes[0]);} - if (pipes[1] >= 0){return isFDBlocking(pipes[1]);} + if (sSend >= 0){return isFDBlocking(sSend);} + if (sRecv >= 0){return isFDBlocking(sRecv);} return false; } @@ -465,7 +471,7 @@ bool Socket::Connection::isBlocking(){ /// This function calls shutdown, thus making the socket unusable in all other /// processes as well. Do not use on shared sockets that are still in use. void Socket::Connection::close(){ - if (sock != -1){shutdown(sock, SHUT_RDWR);} + if (sSend != -1){shutdown(sSend, SHUT_RDWR);} drop(); }// Socket::Connection::close @@ -475,36 +481,31 @@ void Socket::Connection::close(){ /// processes. void Socket::Connection::drop(){ if (connected()){ - if (sock != -1){ - DEBUG_MSG(DLVL_HIGH, "Socket %d closed", sock); + if (sSend != -1){ + HIGH_MSG("Socket %d closed", sSend); errno = EINTR; - while (::close(sock) != 0 && errno == EINTR){} - sock = -1; + while (::close(sSend) != 0 && errno == EINTR){} + if (sRecv == sSend){sRecv = -1;} + sSend = -1; } - if (pipes[0] != -1){ + if (sRecv != -1){ errno = EINTR; - while (::close(pipes[0]) != 0 && errno == EINTR){} - pipes[0] = -1; - } - if (pipes[1] != -1){ - errno = EINTR; - while (::close(pipes[1]) != 0 && errno == EINTR){} - pipes[1] = -1; + while (::close(sRecv) != 0 && errno == EINTR){} + sRecv = -1; } } }// Socket::Connection::drop /// Returns internal socket number. int Socket::Connection::getSocket(){ - if (sock != -1){return sock;} - if (pipes[0] != -1){return pipes[0];} - if (pipes[1] != -1){return pipes[1];} - return -1; + if (sSend != -1){return sSend;} + return sRecv; } /// Returns non-piped internal socket number. int Socket::Connection::getPureSocket(){ - return sock; + if (!isTrueSocket){return -1;} + return sSend; } /// Returns a string describing the last error that occured. @@ -518,12 +519,12 @@ std::string Socket::Connection::getError(){ /// \param nonblock Whether the socket should be nonblocking. False by default. Socket::Connection::Connection(std::string address, bool nonblock){ skipCount = 0; - pipes[0] = -1; - pipes[1] = -1; - sock = socket(PF_UNIX, SOCK_STREAM, 0); - if (sock < 0){ + sRecv = -1; + isTrueSocket = true; + sSend = socket(PF_UNIX, SOCK_STREAM, 0); + if (sSend < 0){ remotehost = strerror(errno); - DEBUG_MSG(DLVL_FAIL, "Could not create socket! Error: %s", remotehost.c_str()); + FAIL_MSG("Could not create socket! Error: %s", remotehost.c_str()); return; } Error = false; @@ -534,19 +535,19 @@ Socket::Connection::Connection(std::string address, bool nonblock){ sockaddr_un addr; addr.sun_family = AF_UNIX; strncpy(addr.sun_path, address.c_str(), address.size() + 1); - int r = connect(sock, (sockaddr *)&addr, sizeof(addr)); + int r = connect(sSend, (sockaddr *)&addr, sizeof(addr)); if (r == 0){ if (nonblock){ - int flags = fcntl(sock, F_GETFL, 0); + int flags = fcntl(sSend, F_GETFL, 0); flags |= O_NONBLOCK; - fcntl(sock, F_SETFL, flags); + fcntl(sSend, F_SETFL, flags); } }else{ remotehost = strerror(errno); - DEBUG_MSG(DLVL_FAIL, "Could not connect to %s! Error: %s", address.c_str(), remotehost.c_str()); + FAIL_MSG("Could not connect to %s! Error: %s", address.c_str(), remotehost.c_str()); close(); } -}// Socket::Connection Unix Contructor +}// Socket::Connection Unix Constructor /// Create a new TCP Socket. This socket will (try to) connect to the given host/port right away. /// \param host String containing the hostname to connect to. @@ -554,8 +555,8 @@ Socket::Connection::Connection(std::string address, bool nonblock){ /// \param nonblock Whether the socket should be nonblocking. Socket::Connection::Connection(std::string host, int port, bool nonblock){ skipCount = 0; - pipes[0] = -1; - pipes[1] = -1; + sRecv = -1; + isTrueSocket = true; struct addrinfo *result, *rp, hints; Error = false; Blocking = false; @@ -578,11 +579,11 @@ Socket::Connection::Connection(std::string host, int port, bool nonblock){ remotehost = ""; for (rp = result; rp != NULL; rp = rp->ai_next){ - sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); - if (sock < 0){continue;} - if (connect(sock, rp->ai_addr, rp->ai_addrlen) == 0){break;} + sSend = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); + if (sSend < 0){continue;} + if (connect(sSend, rp->ai_addr, rp->ai_addrlen) == 0){break;} remotehost += strerror(errno); - ::close(sock); + ::close(sSend); } freeaddrinfo(result); @@ -591,15 +592,15 @@ Socket::Connection::Connection(std::string host, int port, bool nonblock){ close(); }else{ if (nonblock){ - int flags = fcntl(sock, F_GETFL, 0); + int flags = fcntl(sSend, F_GETFL, 0); flags |= O_NONBLOCK; - fcntl(sock, F_SETFL, flags); + fcntl(sSend, F_SETFL, flags); } int optval = 1; int optlen = sizeof(optval); - setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen); + setsockopt(sSend, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen); } -}// Socket::Connection TCP Contructor +}// Socket::Connection TCP Constructor /// Returns the connected-state for this socket. /// Note that this function might be slightly behind the real situation. @@ -607,7 +608,7 @@ Socket::Connection::Connection(std::string host, int port, bool nonblock){ /// and when the socket is closed manually. /// \returns True if socket is connected, false otherwise. bool Socket::Connection::connected() const{ - return (sock >= 0) || ((pipes[0] >= 0) || (pipes[1] >= 0)); + return (sSend >= 0) || (sRecv >= 0); } /// Returns the time this socket has been connected. @@ -701,10 +702,10 @@ unsigned int Socket::Connection::iwrite(const void *buffer, int len){ } } int r; - if (sock >= 0){ - r = send(sock, buffer, len, 0); + if (isTrueSocket){ + r = send(sSend, buffer, len, 0); }else{ - r = write(pipes[0], buffer, len); + r = write(sSend, buffer, len); } if (r < 0){ switch (errno){ @@ -717,7 +718,7 @@ unsigned int Socket::Connection::iwrite(const void *buffer, int len){ break; } } - if (r == 0 && (sock >= 0)){ + if (r == 0 && (sSend >= 0)){ DONTEVEN_MSG("Socket closed by remote"); close(); } @@ -734,11 +735,10 @@ unsigned int Socket::Connection::iwrite(const void *buffer, int len){ int Socket::Connection::iread(void *buffer, int len, int flags){ if (!connected() || len < 1){return 0;} int r; - if (sock >= 0){ - r = recv(sock, buffer, len, flags); + if (sRecv != -1 || !isTrueSocket){ + r = read(sRecv, buffer, len); }else{ - r = recv(pipes[1], buffer, len, flags); - if (r < 0 && errno == ENOTSOCK){r = read(pipes[1], buffer, len);} + r = recv(sSend, buffer, len, flags); } if (r < 0){ switch (errno){ @@ -822,13 +822,13 @@ void Socket::Connection::setHost(std::string host){ /// Returns true if these sockets are the same socket. /// Does not check the internal stats - only the socket itself. bool Socket::Connection::operator==(const Connection &B) const{ - return sock == B.sock && pipes[0] == B.pipes[0] && pipes[1] == B.pipes[1]; + return sSend == B.sSend && sRecv == B.sRecv; } /// Returns true if these sockets are not the same socket. /// Does not check the internal stats - only the socket itself. bool Socket::Connection::operator!=(const Connection &B) const{ - return sock != B.sock || pipes[0] != B.pipes[0] || pipes[1] != B.pipes[1]; + return sSend != B.sSend || sRecv != B.sRecv; } /// Returns true if the socket is valid. @@ -1025,7 +1025,7 @@ unsigned int Socket::SSLConnection::iwrite(const void *buffer, int len){ break; } } - if (r == 0 && (sock >= 0)){ + if (r == 0 && (sSend >= 0)){ DONTEVEN_MSG("Socket closed by remote"); close(); } @@ -1234,36 +1234,35 @@ Socket::Connection Socket::Server::accept(bool nonblock){ int r = ::accept(sock, (sockaddr *)&tmpaddr, &len); // set the socket to be nonblocking, if requested. // we could do this through accept4 with a flag, but that call is non-standard... - if ((r >= 0) && nonblock){ + if (r < 0){ + if ((errno != EWOULDBLOCK) && (errno != EAGAIN) && (errno != EINTR)){ + FAIL_MSG("Error during accept: %s. Closing server socket %d.", strerror(errno), sock); + close(); + } + return Socket::Connection(); + } + + if (nonblock){ int flags = fcntl(r, F_GETFL, 0); flags |= O_NONBLOCK; fcntl(r, F_SETFL, flags); } - if (r >= 0){ - int optval = 1; - int optlen = sizeof(optval); - setsockopt(r, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen); - } + int optval = 1; + int optlen = sizeof(optval); + setsockopt(r, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen); Socket::Connection tmp(r); tmp.remoteaddr = tmpaddr; - if (r < 0){ - if ((errno != EWOULDBLOCK) && (errno != EAGAIN) && (errno != EINTR)){ - DEBUG_MSG(DLVL_FAIL, "Error during accept - closing server socket %d.", sock); - close(); - } - }else{ - if (tmpaddr.sin6_family == AF_INET6){ - tmp.remotehost = inet_ntop(AF_INET6, &(tmpaddr.sin6_addr), addrconv, INET6_ADDRSTRLEN); - DEBUG_MSG(DLVL_HIGH, "IPv6 addr [%s]", tmp.remotehost.c_str()); - } - if (tmpaddr.sin6_family == AF_INET){ - tmp.remotehost = inet_ntop(AF_INET, &(((sockaddr_in *)&tmpaddr)->sin_addr), addrconv, INET6_ADDRSTRLEN); - DEBUG_MSG(DLVL_HIGH, "IPv4 addr [%s]", tmp.remotehost.c_str()); - } - if (tmpaddr.sin6_family == AF_UNIX){ - DEBUG_MSG(DLVL_HIGH, "Unix connection"); - tmp.remotehost = "UNIX_SOCKET"; - } + if (tmpaddr.sin6_family == AF_INET6){ + tmp.remotehost = inet_ntop(AF_INET6, &(tmpaddr.sin6_addr), addrconv, INET6_ADDRSTRLEN); + DEBUG_MSG(DLVL_HIGH, "IPv6 addr [%s]", tmp.remotehost.c_str()); + } + if (tmpaddr.sin6_family == AF_INET){ + tmp.remotehost = inet_ntop(AF_INET, &(((sockaddr_in *)&tmpaddr)->sin_addr), addrconv, INET6_ADDRSTRLEN); + DEBUG_MSG(DLVL_HIGH, "IPv4 addr [%s]", tmp.remotehost.c_str()); + } + if (tmpaddr.sin6_family == AF_UNIX){ + DEBUG_MSG(DLVL_HIGH, "Unix connection"); + tmp.remotehost = "UNIX_SOCKET"; } return tmp; } diff --git a/lib/socket.h b/lib/socket.h index b9c50d02..cdc8b1b3 100644 --- a/lib/socket.h +++ b/lib/socket.h @@ -66,10 +66,13 @@ namespace Socket{ // Buffer /// This class is for easy communicating through sockets, either TCP or Unix. + /// Internally, sSend and sRecv hold the file descriptor to read/write from/to. + /// If they are not identical and sRecv is closed but sSend still open, reading from sSend will be attempted. class Connection{ protected: - int sock; ///< Internally saved socket number. - int pipes[2]; ///< Internally saved file descriptors for pipe socket simulation. + bool isTrueSocket; + int sSend; ///< Write end of socket. + int sRecv; ///< Read end of socket. std::string remotehost; ///< Stores remote host address. struct sockaddr_in6 remoteaddr;///< Stores remote host address. uint64_t up; diff --git a/lib/stream.cpp b/lib/stream.cpp index da09f5d5..879e1f51 100644 --- a/lib/stream.cpp +++ b/lib/stream.cpp @@ -253,7 +253,7 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir if (pid == 0){ Socket::Connection io(0, 1); - io.close(); + io.drop(); DEBUG_MSG(DLVL_DONTEVEN, "execvp"); execvp(argv[0], argv); FAIL_MSG("Starting process %s for stream %s failed: %s", argv[0], streamname.c_str(), strerror(errno)); diff --git a/src/output/output.cpp b/src/output/output.cpp index 1605f30d..b8c95fc9 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -528,6 +528,7 @@ namespace Mist{ } if (!keepGoing()){ + INFO_MSG("Aborting page load due to shutdown"); return; } diff --git a/src/output/output_http.cpp b/src/output/output_http.cpp index e52322f6..f885e3a2 100644 --- a/src/output/output_http.cpp +++ b/src/output/output_http.cpp @@ -12,6 +12,10 @@ namespace Mist { if (config->getString("ip").size()){ myConn.setHost(config->getString("ip")); } + firstRun = true; + if (config->getString("prequest").size()){ + myConn.Received().prepend(config->getString("prequest")); + } config->activate(); } @@ -33,14 +37,19 @@ namespace Mist { capa["forward"]["ip"]["help"] = "IP of forwarded connection."; capa["forward"]["ip"]["type"] = "str"; capa["forward"]["ip"]["option"] = "--ip"; + capa["forward"]["ip"]["name"] = "Previous request"; + capa["forward"]["ip"]["help"] = "Data to pretend arrived on the socket before parsing the socket."; + capa["forward"]["ip"]["type"] = "str"; + capa["forward"]["ip"]["option"] = "--prequest"; cfg->addOption("streamname", JSON::fromString("{\"arg\":\"string\",\"short\":\"s\",\"long\":\"stream\",\"help\":\"The name of the stream that this connector will transmit.\"}")); cfg->addOption("ip", JSON::fromString("{\"arg\":\"string\",\"short\":\"I\",\"long\":\"ip\",\"help\":\"IP address of connection on stdio.\"}")); + cfg->addOption("prequest", JSON::fromString("{\"arg\":\"string\",\"short\":\"R\",\"long\":\"prequest\",\"help\":\"Data to pretend arrived on the socket before parsing the socket.\"}")); cfg->addBasicConnectorOptions(capa); config = cfg; } void HTTPOutput::onFail(const std::string & msg, bool critical){ - INFO_MSG("Failing '%s': %s: %s", streamName.c_str(), H.url.c_str(), msg.c_str()); + INFO_MSG("Failing '%s': %s", H.url.c_str(), msg.c_str()); if (!webSock){ H.Clean(); //make sure no parts of old requests are left in any buffers H.SetHeader("Server", "MistServer/" PACKAGE_VERSION); @@ -165,80 +174,49 @@ namespace Mist { } void HTTPOutput::requestHandler(){ + //Handle onIdle function caller, if needed if (idleInterval && (Util::bootMS() > idleLast + idleInterval)){ onIdle(); idleLast = Util::bootMS(); } + //Handle websockets if (webSock){ if (webSock->readFrame()){ onWebsocketFrame(); idleLast = Util::bootMS(); - }else{ - if (!isBlocking && !parseData){ - Util::sleep(100); - } + return; } + if (!isBlocking && !parseData){Util::sleep(100);} return; } - if (myConn.Received().size() && myConn.spool()){ - DEBUG_MSG(DLVL_DONTEVEN, "onRequest"); - onRequest(); - }else{ - if (!myConn.Received().size()){ - if (myConn.peek() && H.Read(myConn)){ - std::string handler = getHandler(); - DEBUG_MSG(DLVL_MEDIUM, "Received request: %s => %s (%s)", H.getUrl().c_str(), handler.c_str(), H.GetVar("stream").c_str()); - if (!handler.size()){ - H.Clean(); - H.SetHeader("Server", "MistServer/" PACKAGE_VERSION); - H.SetBody("