From 66890c4564f4c337bf0ee265c3e79d3a0df3f1a4 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Sun, 30 Jun 2019 22:36:29 +0200 Subject: [PATCH] Robustified Socket::Connection, added debugging data for copies/assigns and Socket::Connetion::open() calls for proper socket re-use. --- lib/downloader.cpp | 8 +- lib/socket.cpp | 119 +++++++++++++++++++++++---- lib/socket.h | 9 ++ src/analysers/analyser_dash.cpp | 10 +-- src/analysers/analyser_dtsc.cpp | 2 +- src/analysers/analyser_rtsp.cpp | 2 +- src/analysers/dash_analyser.cpp | 4 +- src/controller/controller_uplink.cpp | 2 +- src/input/input_dtsc.cpp | 4 +- src/input/input_h264.cpp | 4 +- src/input/input_rtsp.cpp | 2 +- src/input/input_ts.cpp | 4 +- src/output/output_http_internal.cpp | 3 +- src/output/output_httpts.cpp | 2 +- src/output/output_push.cpp | 6 +- src/output/output_rtmp.cpp | 4 +- 16 files changed, 141 insertions(+), 44 deletions(-) diff --git a/lib/downloader.cpp b/lib/downloader.cpp index 8e46abc2..d6836c3e 100644 --- a/lib/downloader.cpp +++ b/lib/downloader.cpp @@ -81,12 +81,12 @@ namespace HTTP{ connectedPort = link.getPort(); #ifdef SSL if (needSSL){ - S = Socket::Connection(connectedHost, connectedPort, true, true); + S.open(connectedHost, connectedPort, true, true); }else{ - S = Socket::Connection(connectedHost, connectedPort, true); + S.open(connectedHost, connectedPort, true); } #else - S = Socket::Connection(connectedHost, connectedPort, true); + S.open(connectedHost, connectedPort, true); #endif } }else{ @@ -95,7 +95,7 @@ namespace HTTP{ getSocket().close(); connectedHost = proxyUrl.host; connectedPort = proxyUrl.getPort(); - S = Socket::Connection(connectedHost, connectedPort, true); + S.open(connectedHost, connectedPort, true); } } ssl = needSSL; diff --git a/lib/socket.cpp b/lib/socket.cpp index 6a3bb84f..47fdac60 100644 --- a/lib/socket.cpp +++ b/lib/socket.cpp @@ -402,19 +402,39 @@ void Socket::Connection::setBoundAddr(){ } } +//Cleans up the socket by dropping the connection. +//Does not call close because it calls shutdown, which would destroy any copies of this socket too. +Socket::Connection::~Connection(){ + drop(); +} + + /// Create a new base socket. This is a basic constructor for converting any valid socket to a /// Socket::Connection. \param sockNo Integer representing the socket to convert. Socket::Connection::Connection(int sockNo){ clear(); - sSend = sockNo; - isTrueSocket = Socket::checkTrueSocket(sSend); - setBoundAddr(); + open(sockNo, sockNo); }// Socket::Connection basic constructor +/// Open from existing socket connection. +/// Closes any existing connections and resets all internal values beforehand. +/// Simply calls open(sockNo, sockNo) internally. +void Socket::Connection::open(int sockNo){ + open(sockNo, sockNo); +} + /// Simulate a socket using two file descriptors. /// \param write The filedescriptor to write to. /// \param read The filedescriptor to read from. Socket::Connection::Connection(int write, int read){ + clear(); + open(write, read); +}// Socket::Connection basic constructor + +/// Open from two existing file descriptors. +/// Closes any existing connections and resets all internal values beforehand. +void Socket::Connection::open(int write, int read){ + drop(); clear(); sSend = write; if (write != read){ @@ -424,7 +444,7 @@ Socket::Connection::Connection(int write, int read){ } isTrueSocket = Socket::checkTrueSocket(sSend); setBoundAddr(); -}// Socket::Connection basic constructor +} void Socket::Connection::clear(){ sSend = -1; @@ -516,9 +536,21 @@ bool Socket::Connection::isBlocking(){ /// This function calls shutdown, thus making the socket unusable in all other /// processes as well. Do not use on shared sockets that are still in use. void Socket::Connection::close(){ + if (sSend != -1){shutdown(sSend, SHUT_RDWR);} + drop(); +}// Socket::Connection::close + +/// Close connection. The internal socket is closed and then set to -1. +/// If the connection is already closed, nothing happens. +/// This function does *not* call shutdown, allowing continued use in other +/// processes. +void Socket::Connection::drop(){ #ifdef SSL if (sslConnected){ DONTEVEN_MSG("SSL close"); + if (ssl){ + mbedtls_ssl_close_notify(ssl); + } if (server_fd){ mbedtls_net_free(server_fd); delete server_fd; @@ -548,15 +580,6 @@ void Socket::Connection::close(){ return; } #endif - if (sSend != -1){shutdown(sSend, SHUT_RDWR);} - drop(); -}// Socket::Connection::close - -/// Close connection. The internal socket is closed and then set to -1. -/// If the connection is already closed, nothing happens. -/// This function does *not* call shutdown, allowing continued use in other -/// processes. -void Socket::Connection::drop(){ if (connected()){ if (sSend != -1){ HIGH_MSG("Socket %d closed", sSend); @@ -596,6 +619,14 @@ std::string Socket::Connection::getError(){ /// \param address String containing the location of the Unix socket to connect to. /// \param nonblock Whether the socket should be nonblocking. False by default. Socket::Connection::Connection(std::string address, bool nonblock){ + clear(); + open(address, nonblock); +}// Socket::Connection Unix Constructor + +/// Open Unix connection. +/// Closes any existing connections and resets all internal values beforehand. +void Socket::Connection::open(std::string address, bool nonblock){ + drop(); clear(); isTrueSocket = true; sSend = socket(PF_UNIX, SOCK_STREAM, 0); @@ -619,7 +650,7 @@ Socket::Connection::Connection(std::string address, bool nonblock){ FAIL_MSG("Could not connect to %s! Error: %s", address.c_str(), remotehost.c_str()); close(); } -}// Socket::Connection Unix Constructor +} #ifdef SSL ///Local-only function for debugging SSL sockets @@ -635,6 +666,14 @@ static void my_debug(void *ctx, int level, const char *file, int line, const cha /// \param port String containing the port to connect to. /// \param nonblock Whether the socket should be nonblocking. Socket::Connection::Connection(std::string host, int port, bool nonblock, bool with_ssl){ + clear(); + open(host, port, nonblock, with_ssl); +} + +/// Open TCP connection. +/// Closes any existing connections and resets all internal values beforehand. +void Socket::Connection::open(std::string host, int port, bool nonblock, bool with_ssl){ + drop(); clear(); if (with_ssl){ #ifdef SSL @@ -743,7 +782,7 @@ Socket::Connection::Connection(std::string host, int port, bool nonblock, bool w setsockopt(sSend, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen); setBoundAddr(); } -}// Socket::Connection TCP Constructor +} /// Returns the connected-state for this socket. /// Note that this function might be slightly behind the real situation. @@ -1053,6 +1092,56 @@ Socket::Connection::operator bool() const{ return connected(); } +//Copy constructor +Socket::Connection::Connection(const Connection& rhs){ + clear(); + if (!rhs){return;} +#if DEBUG >= DLVL_DEVEL + INFO_MSG("Copying %s socket", rhs.sslConnected?"SSL":"regular"); + BACKTRACE; +#endif + conntime = rhs.conntime; + isTrueSocket = rhs.isTrueSocket; + remotehost = rhs.remotehost; + boundaddr = rhs.boundaddr; + up = rhs.up; + down = rhs.down; + downbuffer = rhs.downbuffer; + if (!rhs.sslConnected){ + if (rhs.sSend >= 0){sSend = dup(rhs.sSend);} + if (rhs.sRecv >= 0){sRecv = dup(rhs.sRecv);} +#if DEBUG >= DLVL_DEVEL + INFO_MSG("Socket original = (%d / %d), copy = (%d / %d)", rhs.sSend, rhs.sRecv, sSend, sRecv); +#endif + } +} + +//Assignment constructor +Socket::Connection& Socket::Connection::operator=(const Socket::Connection& rhs){ + drop(); + clear(); + if (!rhs){return *this;} +#if DEBUG >= DLVL_DEVEL + INFO_MSG("Assigning %s socket", rhs.sslConnected?"SSL":"regular"); + BACKTRACE; +#endif + conntime = rhs.conntime; + isTrueSocket = rhs.isTrueSocket; + remotehost = rhs.remotehost; + boundaddr = rhs.boundaddr; + up = rhs.up; + down = rhs.down; + downbuffer = rhs.downbuffer; + if (!rhs.sslConnected){ + if (rhs.sSend >= 0){sSend = dup(rhs.sSend);} + if (rhs.sRecv >= 0){sRecv = dup(rhs.sRecv);} +#if DEBUG >= DLVL_DEVEL + INFO_MSG("Socket original = (%d / %d), copy = (%d / %d)", rhs.sSend, rhs.sRecv, sSend, sRecv); +#endif + } + return *this; +} + /// Returns true if the given address can be matched with the remote host. /// Can no longer return true after any socket error have occurred. bool Socket::Connection::isAddress(const std::string &addr){ diff --git a/lib/socket.h b/lib/socket.h index 3b0ec143..d6ac4249 100644 --- a/lib/socket.h +++ b/lib/socket.h @@ -109,7 +109,16 @@ namespace Socket{ Connection(std::string hostname, int port, bool nonblock, bool with_ssl = false); ///< Create a new TCP socket. Connection(std::string adres, bool nonblock = false); ///< Create a new Unix Socket. Connection(int write, int read); ///< Simulate a socket using two file descriptors. + // copy/assignment constructors + Connection(const Connection& rhs); + Connection& operator=(const Connection& rhs); + // destructor + ~Connection(); // generic methods + void open(int sockNo);//Open from existing socket connection. + void open(std::string hostname, int port, bool nonblock, bool with_ssl = false);//Open TCP connection. + void open(std::string adres, bool nonblock = false);//Open Unix connection. + void open(int write, int read);//Open from two existing file descriptors. void close(); ///< Close connection. void drop(); ///< Close connection without shutdown. void setBlocking(bool blocking); ///< Set this socket to be blocking (true) or nonblocking (false). diff --git a/src/analysers/analyser_dash.cpp b/src/analysers/analyser_dash.cpp index a70ef069..34ec8b89 100644 --- a/src/analysers/analyser_dash.cpp +++ b/src/analysers/analyser_dash.cpp @@ -323,7 +323,7 @@ dashAnalyser::dashAnalyser(Util::Config conf) : analysers(conf) { startTime = Util::bootSecs(); abortTime = conf.getInteger("abort"); - conn = Socket::Connection(server, port, false); + conn.open(server, port, false); if(!conn.connected()) { @@ -336,7 +336,7 @@ dashAnalyser::dashAnalyser(Util::Config conf) : analysers(conf) { urlPrependStuff = url.substr(0, url.rfind("/") + 1); DEBUG_MSG(DLVL_INFO, "prepend stuff: %s", urlPrependStuff.c_str()); if (!conn) { - conn = Socket::Connection(server, port, false); + conn.open(server, port, false); } pos = 0; @@ -414,7 +414,7 @@ int dashAnalyser::doAnalyse() { for (unsigned int i = 0; i < streamData.size(); i++) { if (streamData[i].trackID == currentPos.begin()->trackID && streamData[i].adaptationSet == currentPos.begin()->adaptationSet) tempID = i; } - if (!conn) { conn = Socket::Connection(server, port, false); } + if (!conn) { conn.open(server, port, false); } HTTP::Parser H; H.url = urlPrependStuff; H.url.append(currentPos.begin()->url); @@ -530,7 +530,7 @@ int main2(int argc, char **argv) { DEBUG_MSG(DLVL_INFO, "url %s server: %s port: %d", url.c_str(), server.c_str(), port); std::string urlPrependStuff = url.substr(0, url.rfind("/") + 1); DEBUG_MSG(DLVL_INFO, "prepend stuff: %s", urlPrependStuff.c_str()); - if (!conn) { conn = Socket::Connection(server, port, false); } + if (!conn) { conn.open(server, port, false); } unsigned int pos = 0; HTTP::Parser H; H.url = url; @@ -593,7 +593,7 @@ int main2(int argc, char **argv) { for (unsigned int i = 0; i < streamData.size(); i++) { if (streamData[i].trackID == currentPos.begin()->trackID && streamData[i].adaptationSet == currentPos.begin()->adaptationSet) tempID = i; } - if (!conn) { conn = Socket::Connection(server, port, false); } + if (!conn) { conn.open(server, port, false); } HTTP::Parser H; H.url = urlPrependStuff; H.url.append(currentPos.begin()->url); diff --git a/src/analysers/analyser_dtsc.cpp b/src/analysers/analyser_dtsc.cpp index 044660be..ca810288 100644 --- a/src/analysers/analyser_dtsc.cpp +++ b/src/analysers/analyser_dtsc.cpp @@ -14,7 +14,7 @@ void AnalyserDTSC::init(Util::Config &conf){ bool AnalyserDTSC::open(const std::string &filename){ if (!Analyser::open(filename)){return false;} - conn = Socket::Connection(1, 0); + conn.open(1, 0); totalBytes = 0; return true; } diff --git a/src/analysers/analyser_rtsp.cpp b/src/analysers/analyser_rtsp.cpp index 0adf690b..caf55754 100644 --- a/src/analysers/analyser_rtsp.cpp +++ b/src/analysers/analyser_rtsp.cpp @@ -27,7 +27,7 @@ void AnalyserRTSP::incoming(const DTSC::Packet &pkt){ bool AnalyserRTSP::open(const std::string &filename){ if (!Analyser::open(filename)){return false;} - myConn = Socket::Connection(1, 0); + myConn.open(1, 0); return true; } diff --git a/src/analysers/dash_analyser.cpp b/src/analysers/dash_analyser.cpp index e9eabcfb..16220bb3 100644 --- a/src/analysers/dash_analyser.cpp +++ b/src/analysers/dash_analyser.cpp @@ -369,7 +369,7 @@ int main(int argc, char ** argv) { std::string urlPrependStuff= url.substr(0, url.rfind("/")+1); DEBUG_MSG(DLVL_INFO, "prepend stuff: %s", urlPrependStuff.c_str()); if (!conn) { - conn = Socket::Connection(server, port, false); + conn.open(server, port, false); } unsigned int pos = 0; HTTP::Parser H; @@ -434,7 +434,7 @@ int main(int argc, char ** argv) { if( streamData[i].trackID == currentPos.begin()->trackID && streamData[i].adaptationSet == currentPos.begin()->adaptationSet ) tempID=i; } if (!conn) { - conn = Socket::Connection(server,port, false); + conn.open(server,port, false); } HTTP::Parser H; H.url = urlPrependStuff; diff --git a/src/controller/controller_uplink.cpp b/src/controller/controller_uplink.cpp index d46488aa..5e102945 100644 --- a/src/controller/controller_uplink.cpp +++ b/src/controller/controller_uplink.cpp @@ -39,7 +39,7 @@ void Controller::uplinkConnection(void * np) { while (Controller::conf.is_active) { if (!uplink) { INFO_MSG("Connecting to uplink at %s:%u", uplink_host.c_str(), uplink_port); - uplink = Socket::Connection(uplink_host, uplink_port, true); + uplink.open(uplink_host, uplink_port, true); } if (uplink) { if (uplink.spool()) { diff --git a/src/input/input_dtsc.cpp b/src/input/input_dtsc.cpp index 58b94e1e..07f9d037 100644 --- a/src/input/input_dtsc.cpp +++ b/src/input/input_dtsc.cpp @@ -160,7 +160,7 @@ namespace Mist { bool inputDTSC::openStreamSource() { std::string source = config->getString("input"); if (source == "-"){ - srcConn = Socket::Connection(fileno(stdout),fileno(stdin)); + srcConn.open(fileno(stdout),fileno(stdin)); return true; } if (source.find("dtsc://") == 0) { @@ -175,7 +175,7 @@ namespace Mist { if (streamName == "") { streamName = givenStream; } - srcConn = Socket::Connection(host, port, true); + srcConn.open(host, port, true); if (!srcConn.connected()){ return false; } diff --git a/src/input/input_h264.cpp b/src/input/input_h264.cpp index f2af2d6d..563eeac2 100644 --- a/src/input/input_h264.cpp +++ b/src/input/input_h264.cpp @@ -44,9 +44,9 @@ namespace Mist{ int fin = -1, fout = -1; inputProcess = Util::Procs::StartPiped(args, &fin, &fout, 0); - myConn = Socket::Connection(-1, fout); + myConn.open(-1, fout); }else{ - myConn = Socket::Connection(fileno(stdout), fileno(stdin)); + myConn.open(fileno(stdout), fileno(stdin)); } myConn.Received().splitter.assign("\000\000\001", 3); myMeta.vod = false; diff --git a/src/input/input_rtsp.cpp b/src/input/input_rtsp.cpp index 3cf2eb5a..9b63c2ef 100755 --- a/src/input/input_rtsp.cpp +++ b/src/input/input_rtsp.cpp @@ -134,7 +134,7 @@ namespace Mist{ } bool InputRTSP::openStreamSource(){ - tcpCon = Socket::Connection(url.host, url.getPort(), false); + tcpCon.open(url.host, url.getPort(), false); mainConn = &tcpCon; return tcpCon; } diff --git a/src/input/input_ts.cpp b/src/input/input_ts.cpp index e496d3fc..83067a2a 100755 --- a/src/input/input_ts.cpp +++ b/src/input/input_ts.cpp @@ -212,13 +212,13 @@ namespace Mist { int fin = -1, fout = -1; inputProcess = Util::Procs::StartPiped(args, &fin, &fout, 0); - tcpCon = Socket::Connection(-1, fout); + tcpCon.open(-1, fout); return true; } //streamed file if (inpt.substr(0,9) == "stream://"){ inFile = fopen(inpt.c_str()+9, "r"); - tcpCon = Socket::Connection(-1, fileno(inFile)); + tcpCon.open(-1, fileno(inFile)); standAlone = false; return inFile; } diff --git a/src/output/output_http_internal.cpp b/src/output/output_http_internal.cpp index 9225dc0a..6753ceb9 100644 --- a/src/output/output_http_internal.cpp +++ b/src/output/output_http_internal.cpp @@ -41,8 +41,7 @@ namespace Mist { std::string host = getConnectedHost(); dup2(myConn.getSocket(), STDIN_FILENO); dup2(myConn.getSocket(), STDOUT_FILENO); - myConn.drop(); - myConn = Socket::Connection(STDOUT_FILENO, STDIN_FILENO); + myConn.open(STDOUT_FILENO, STDIN_FILENO); myConn.setHost(host); } if (config->getString("nostreamtext").size()){ diff --git a/src/output/output_httpts.cpp b/src/output/output_httpts.cpp index b25df9db..2c9ec91d 100644 --- a/src/output/output_httpts.cpp +++ b/src/output/output_httpts.cpp @@ -33,7 +33,7 @@ namespace Mist{ int fin = -1; Util::Procs::StartPiped(args, &fin, 0, 0); - myConn = Socket::Connection(fin, -1); + myConn.open(fin, -1); wantRequest = false; parseData = true; diff --git a/src/output/output_push.cpp b/src/output/output_push.cpp index c016bbe0..0208e0fb 100644 --- a/src/output/output_push.cpp +++ b/src/output/output_push.cpp @@ -143,7 +143,7 @@ void pushFirstElement(std::string qId) { proxyToPost(srcConn, srcLocation, dstConn, dstLocation); - srcConn = Socket::Connection(srcHost, srcPort, true); + srcConn.open(srcHost, srcPort, true); //Set the location to push to for the index containing this segment. //The index will contain (at most) the last PUSH_INDEX_SIZE segments. @@ -156,7 +156,7 @@ void pushFirstElement(std::string qId) { proxyToPost(srcConn, srcLocation, dstConn, dstLocation); - srcConn = Socket::Connection(srcHost, srcPort, true); + srcConn.open(srcHost, srcPort, true); //Set the location to push to for the global index containing all qualities. srcLocation = baseURL + "/push/index.m3u8"; @@ -255,7 +255,7 @@ namespace Mist { } //Reconnect when disconnected if (!listConn.connected()){ - listConn = Socket::Connection(srcHost, srcPort, true); + listConn.open(srcHost, srcPort, true); } //Request the push list if (listConn.connected()){ diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp index 6d76bf45..ba18a000 100644 --- a/src/output/output_rtmp.cpp +++ b/src/output/output_rtmp.cpp @@ -43,9 +43,9 @@ namespace Mist{ } initialize(); INFO_MSG("About to push stream %s out. Host: %s, port: %d, app: %s, stream: %s", streamName.c_str(), pushUrl.host.c_str(), pushUrl.getPort(), app.c_str(), streamOut.c_str()); - if (pushUrl.protocol == "rtmp"){myConn = Socket::Connection(pushUrl.host, pushUrl.getPort(), false);} + if (pushUrl.protocol == "rtmp"){myConn.open(pushUrl.host, pushUrl.getPort(), false);} #ifdef SSL - if (pushUrl.protocol == "rtmps"){myConn = Socket::Connection(pushUrl.host, pushUrl.getPort(), false, true);} + if (pushUrl.protocol == "rtmps"){myConn.open(pushUrl.host, pushUrl.getPort(), false, true);} #endif if (!myConn){ FAIL_MSG("Could not connect to %s:%d!", pushUrl.host.c_str(), pushUrl.getPort());