Robustified Socket::Connection, added debugging data for copies/assigns and Socket::Connetion::open() calls for proper socket re-use.

This commit is contained in:
Thulinma 2019-06-30 22:36:29 +02:00
parent 8fe1dbb618
commit 66890c4564
16 changed files with 141 additions and 44 deletions

View file

@ -81,12 +81,12 @@ namespace HTTP{
connectedPort = link.getPort(); connectedPort = link.getPort();
#ifdef SSL #ifdef SSL
if (needSSL){ if (needSSL){
S = Socket::Connection(connectedHost, connectedPort, true, true); S.open(connectedHost, connectedPort, true, true);
}else{ }else{
S = Socket::Connection(connectedHost, connectedPort, true); S.open(connectedHost, connectedPort, true);
} }
#else #else
S = Socket::Connection(connectedHost, connectedPort, true); S.open(connectedHost, connectedPort, true);
#endif #endif
} }
}else{ }else{
@ -95,7 +95,7 @@ namespace HTTP{
getSocket().close(); getSocket().close();
connectedHost = proxyUrl.host; connectedHost = proxyUrl.host;
connectedPort = proxyUrl.getPort(); connectedPort = proxyUrl.getPort();
S = Socket::Connection(connectedHost, connectedPort, true); S.open(connectedHost, connectedPort, true);
} }
} }
ssl = needSSL; ssl = needSSL;

View file

@ -402,19 +402,39 @@ void Socket::Connection::setBoundAddr(){
} }
} }
//Cleans up the socket by dropping the connection.
//Does not call close because it calls shutdown, which would destroy any copies of this socket too.
Socket::Connection::~Connection(){
drop();
}
/// Create a new base socket. This is a basic constructor for converting any valid socket to a /// Create a new base socket. This is a basic constructor for converting any valid socket to a
/// Socket::Connection. \param sockNo Integer representing the socket to convert. /// Socket::Connection. \param sockNo Integer representing the socket to convert.
Socket::Connection::Connection(int sockNo){ Socket::Connection::Connection(int sockNo){
clear(); clear();
sSend = sockNo; open(sockNo, sockNo);
isTrueSocket = Socket::checkTrueSocket(sSend);
setBoundAddr();
}// Socket::Connection basic constructor }// Socket::Connection basic constructor
/// Open from existing socket connection.
/// Closes any existing connections and resets all internal values beforehand.
/// Simply calls open(sockNo, sockNo) internally.
void Socket::Connection::open(int sockNo){
open(sockNo, sockNo);
}
/// Simulate a socket using two file descriptors. /// Simulate a socket using two file descriptors.
/// \param write The filedescriptor to write to. /// \param write The filedescriptor to write to.
/// \param read The filedescriptor to read from. /// \param read The filedescriptor to read from.
Socket::Connection::Connection(int write, int read){ Socket::Connection::Connection(int write, int read){
clear();
open(write, read);
}// Socket::Connection basic constructor
/// Open from two existing file descriptors.
/// Closes any existing connections and resets all internal values beforehand.
void Socket::Connection::open(int write, int read){
drop();
clear(); clear();
sSend = write; sSend = write;
if (write != read){ if (write != read){
@ -424,7 +444,7 @@ Socket::Connection::Connection(int write, int read){
} }
isTrueSocket = Socket::checkTrueSocket(sSend); isTrueSocket = Socket::checkTrueSocket(sSend);
setBoundAddr(); setBoundAddr();
}// Socket::Connection basic constructor }
void Socket::Connection::clear(){ void Socket::Connection::clear(){
sSend = -1; sSend = -1;
@ -516,9 +536,21 @@ bool Socket::Connection::isBlocking(){
/// This function calls shutdown, thus making the socket unusable in all other /// This function calls shutdown, thus making the socket unusable in all other
/// processes as well. Do not use on shared sockets that are still in use. /// processes as well. Do not use on shared sockets that are still in use.
void Socket::Connection::close(){ void Socket::Connection::close(){
if (sSend != -1){shutdown(sSend, SHUT_RDWR);}
drop();
}// Socket::Connection::close
/// Close connection. The internal socket is closed and then set to -1.
/// If the connection is already closed, nothing happens.
/// This function does *not* call shutdown, allowing continued use in other
/// processes.
void Socket::Connection::drop(){
#ifdef SSL #ifdef SSL
if (sslConnected){ if (sslConnected){
DONTEVEN_MSG("SSL close"); DONTEVEN_MSG("SSL close");
if (ssl){
mbedtls_ssl_close_notify(ssl);
}
if (server_fd){ if (server_fd){
mbedtls_net_free(server_fd); mbedtls_net_free(server_fd);
delete server_fd; delete server_fd;
@ -548,15 +580,6 @@ void Socket::Connection::close(){
return; return;
} }
#endif #endif
if (sSend != -1){shutdown(sSend, SHUT_RDWR);}
drop();
}// Socket::Connection::close
/// Close connection. The internal socket is closed and then set to -1.
/// If the connection is already closed, nothing happens.
/// This function does *not* call shutdown, allowing continued use in other
/// processes.
void Socket::Connection::drop(){
if (connected()){ if (connected()){
if (sSend != -1){ if (sSend != -1){
HIGH_MSG("Socket %d closed", sSend); HIGH_MSG("Socket %d closed", sSend);
@ -596,6 +619,14 @@ std::string Socket::Connection::getError(){
/// \param address String containing the location of the Unix socket to connect to. /// \param address String containing the location of the Unix socket to connect to.
/// \param nonblock Whether the socket should be nonblocking. False by default. /// \param nonblock Whether the socket should be nonblocking. False by default.
Socket::Connection::Connection(std::string address, bool nonblock){ Socket::Connection::Connection(std::string address, bool nonblock){
clear();
open(address, nonblock);
}// Socket::Connection Unix Constructor
/// Open Unix connection.
/// Closes any existing connections and resets all internal values beforehand.
void Socket::Connection::open(std::string address, bool nonblock){
drop();
clear(); clear();
isTrueSocket = true; isTrueSocket = true;
sSend = socket(PF_UNIX, SOCK_STREAM, 0); sSend = socket(PF_UNIX, SOCK_STREAM, 0);
@ -619,7 +650,7 @@ Socket::Connection::Connection(std::string address, bool nonblock){
FAIL_MSG("Could not connect to %s! Error: %s", address.c_str(), remotehost.c_str()); FAIL_MSG("Could not connect to %s! Error: %s", address.c_str(), remotehost.c_str());
close(); close();
} }
}// Socket::Connection Unix Constructor }
#ifdef SSL #ifdef SSL
///Local-only function for debugging SSL sockets ///Local-only function for debugging SSL sockets
@ -635,6 +666,14 @@ static void my_debug(void *ctx, int level, const char *file, int line, const cha
/// \param port String containing the port to connect to. /// \param port String containing the port to connect to.
/// \param nonblock Whether the socket should be nonblocking. /// \param nonblock Whether the socket should be nonblocking.
Socket::Connection::Connection(std::string host, int port, bool nonblock, bool with_ssl){ Socket::Connection::Connection(std::string host, int port, bool nonblock, bool with_ssl){
clear();
open(host, port, nonblock, with_ssl);
}
/// Open TCP connection.
/// Closes any existing connections and resets all internal values beforehand.
void Socket::Connection::open(std::string host, int port, bool nonblock, bool with_ssl){
drop();
clear(); clear();
if (with_ssl){ if (with_ssl){
#ifdef SSL #ifdef SSL
@ -743,7 +782,7 @@ Socket::Connection::Connection(std::string host, int port, bool nonblock, bool w
setsockopt(sSend, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen); setsockopt(sSend, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen);
setBoundAddr(); setBoundAddr();
} }
}// Socket::Connection TCP Constructor }
/// Returns the connected-state for this socket. /// Returns the connected-state for this socket.
/// Note that this function might be slightly behind the real situation. /// Note that this function might be slightly behind the real situation.
@ -1053,6 +1092,56 @@ Socket::Connection::operator bool() const{
return connected(); return connected();
} }
//Copy constructor
Socket::Connection::Connection(const Connection& rhs){
clear();
if (!rhs){return;}
#if DEBUG >= DLVL_DEVEL
INFO_MSG("Copying %s socket", rhs.sslConnected?"SSL":"regular");
BACKTRACE;
#endif
conntime = rhs.conntime;
isTrueSocket = rhs.isTrueSocket;
remotehost = rhs.remotehost;
boundaddr = rhs.boundaddr;
up = rhs.up;
down = rhs.down;
downbuffer = rhs.downbuffer;
if (!rhs.sslConnected){
if (rhs.sSend >= 0){sSend = dup(rhs.sSend);}
if (rhs.sRecv >= 0){sRecv = dup(rhs.sRecv);}
#if DEBUG >= DLVL_DEVEL
INFO_MSG("Socket original = (%d / %d), copy = (%d / %d)", rhs.sSend, rhs.sRecv, sSend, sRecv);
#endif
}
}
//Assignment constructor
Socket::Connection& Socket::Connection::operator=(const Socket::Connection& rhs){
drop();
clear();
if (!rhs){return *this;}
#if DEBUG >= DLVL_DEVEL
INFO_MSG("Assigning %s socket", rhs.sslConnected?"SSL":"regular");
BACKTRACE;
#endif
conntime = rhs.conntime;
isTrueSocket = rhs.isTrueSocket;
remotehost = rhs.remotehost;
boundaddr = rhs.boundaddr;
up = rhs.up;
down = rhs.down;
downbuffer = rhs.downbuffer;
if (!rhs.sslConnected){
if (rhs.sSend >= 0){sSend = dup(rhs.sSend);}
if (rhs.sRecv >= 0){sRecv = dup(rhs.sRecv);}
#if DEBUG >= DLVL_DEVEL
INFO_MSG("Socket original = (%d / %d), copy = (%d / %d)", rhs.sSend, rhs.sRecv, sSend, sRecv);
#endif
}
return *this;
}
/// Returns true if the given address can be matched with the remote host. /// Returns true if the given address can be matched with the remote host.
/// Can no longer return true after any socket error have occurred. /// Can no longer return true after any socket error have occurred.
bool Socket::Connection::isAddress(const std::string &addr){ bool Socket::Connection::isAddress(const std::string &addr){

View file

@ -109,7 +109,16 @@ namespace Socket{
Connection(std::string hostname, int port, bool nonblock, bool with_ssl = false); ///< Create a new TCP socket. Connection(std::string hostname, int port, bool nonblock, bool with_ssl = false); ///< Create a new TCP socket.
Connection(std::string adres, bool nonblock = false); ///< Create a new Unix Socket. Connection(std::string adres, bool nonblock = false); ///< Create a new Unix Socket.
Connection(int write, int read); ///< Simulate a socket using two file descriptors. Connection(int write, int read); ///< Simulate a socket using two file descriptors.
// copy/assignment constructors
Connection(const Connection& rhs);
Connection& operator=(const Connection& rhs);
// destructor
~Connection();
// generic methods // generic methods
void open(int sockNo);//Open from existing socket connection.
void open(std::string hostname, int port, bool nonblock, bool with_ssl = false);//Open TCP connection.
void open(std::string adres, bool nonblock = false);//Open Unix connection.
void open(int write, int read);//Open from two existing file descriptors.
void close(); ///< Close connection. void close(); ///< Close connection.
void drop(); ///< Close connection without shutdown. void drop(); ///< Close connection without shutdown.
void setBlocking(bool blocking); ///< Set this socket to be blocking (true) or nonblocking (false). void setBlocking(bool blocking); ///< Set this socket to be blocking (true) or nonblocking (false).

View file

@ -323,7 +323,7 @@ dashAnalyser::dashAnalyser(Util::Config conf) : analysers(conf) {
startTime = Util::bootSecs(); startTime = Util::bootSecs();
abortTime = conf.getInteger("abort"); abortTime = conf.getInteger("abort");
conn = Socket::Connection(server, port, false); conn.open(server, port, false);
if(!conn.connected()) if(!conn.connected())
{ {
@ -336,7 +336,7 @@ dashAnalyser::dashAnalyser(Util::Config conf) : analysers(conf) {
urlPrependStuff = url.substr(0, url.rfind("/") + 1); urlPrependStuff = url.substr(0, url.rfind("/") + 1);
DEBUG_MSG(DLVL_INFO, "prepend stuff: %s", urlPrependStuff.c_str()); DEBUG_MSG(DLVL_INFO, "prepend stuff: %s", urlPrependStuff.c_str());
if (!conn) { if (!conn) {
conn = Socket::Connection(server, port, false); conn.open(server, port, false);
} }
pos = 0; pos = 0;
@ -414,7 +414,7 @@ int dashAnalyser::doAnalyse() {
for (unsigned int i = 0; i < streamData.size(); i++) { for (unsigned int i = 0; i < streamData.size(); i++) {
if (streamData[i].trackID == currentPos.begin()->trackID && streamData[i].adaptationSet == currentPos.begin()->adaptationSet) tempID = i; if (streamData[i].trackID == currentPos.begin()->trackID && streamData[i].adaptationSet == currentPos.begin()->adaptationSet) tempID = i;
} }
if (!conn) { conn = Socket::Connection(server, port, false); } if (!conn) { conn.open(server, port, false); }
HTTP::Parser H; HTTP::Parser H;
H.url = urlPrependStuff; H.url = urlPrependStuff;
H.url.append(currentPos.begin()->url); H.url.append(currentPos.begin()->url);
@ -530,7 +530,7 @@ int main2(int argc, char **argv) {
DEBUG_MSG(DLVL_INFO, "url %s server: %s port: %d", url.c_str(), server.c_str(), port); DEBUG_MSG(DLVL_INFO, "url %s server: %s port: %d", url.c_str(), server.c_str(), port);
std::string urlPrependStuff = url.substr(0, url.rfind("/") + 1); std::string urlPrependStuff = url.substr(0, url.rfind("/") + 1);
DEBUG_MSG(DLVL_INFO, "prepend stuff: %s", urlPrependStuff.c_str()); DEBUG_MSG(DLVL_INFO, "prepend stuff: %s", urlPrependStuff.c_str());
if (!conn) { conn = Socket::Connection(server, port, false); } if (!conn) { conn.open(server, port, false); }
unsigned int pos = 0; unsigned int pos = 0;
HTTP::Parser H; HTTP::Parser H;
H.url = url; H.url = url;
@ -593,7 +593,7 @@ int main2(int argc, char **argv) {
for (unsigned int i = 0; i < streamData.size(); i++) { for (unsigned int i = 0; i < streamData.size(); i++) {
if (streamData[i].trackID == currentPos.begin()->trackID && streamData[i].adaptationSet == currentPos.begin()->adaptationSet) tempID = i; if (streamData[i].trackID == currentPos.begin()->trackID && streamData[i].adaptationSet == currentPos.begin()->adaptationSet) tempID = i;
} }
if (!conn) { conn = Socket::Connection(server, port, false); } if (!conn) { conn.open(server, port, false); }
HTTP::Parser H; HTTP::Parser H;
H.url = urlPrependStuff; H.url = urlPrependStuff;
H.url.append(currentPos.begin()->url); H.url.append(currentPos.begin()->url);

View file

@ -14,7 +14,7 @@ void AnalyserDTSC::init(Util::Config &conf){
bool AnalyserDTSC::open(const std::string &filename){ bool AnalyserDTSC::open(const std::string &filename){
if (!Analyser::open(filename)){return false;} if (!Analyser::open(filename)){return false;}
conn = Socket::Connection(1, 0); conn.open(1, 0);
totalBytes = 0; totalBytes = 0;
return true; return true;
} }

View file

@ -27,7 +27,7 @@ void AnalyserRTSP::incoming(const DTSC::Packet &pkt){
bool AnalyserRTSP::open(const std::string &filename){ bool AnalyserRTSP::open(const std::string &filename){
if (!Analyser::open(filename)){return false;} if (!Analyser::open(filename)){return false;}
myConn = Socket::Connection(1, 0); myConn.open(1, 0);
return true; return true;
} }

View file

@ -369,7 +369,7 @@ int main(int argc, char ** argv) {
std::string urlPrependStuff= url.substr(0, url.rfind("/")+1); std::string urlPrependStuff= url.substr(0, url.rfind("/")+1);
DEBUG_MSG(DLVL_INFO, "prepend stuff: %s", urlPrependStuff.c_str()); DEBUG_MSG(DLVL_INFO, "prepend stuff: %s", urlPrependStuff.c_str());
if (!conn) { if (!conn) {
conn = Socket::Connection(server, port, false); conn.open(server, port, false);
} }
unsigned int pos = 0; unsigned int pos = 0;
HTTP::Parser H; HTTP::Parser H;
@ -434,7 +434,7 @@ int main(int argc, char ** argv) {
if( streamData[i].trackID == currentPos.begin()->trackID && streamData[i].adaptationSet == currentPos.begin()->adaptationSet ) tempID=i; if( streamData[i].trackID == currentPos.begin()->trackID && streamData[i].adaptationSet == currentPos.begin()->adaptationSet ) tempID=i;
} }
if (!conn) { if (!conn) {
conn = Socket::Connection(server,port, false); conn.open(server,port, false);
} }
HTTP::Parser H; HTTP::Parser H;
H.url = urlPrependStuff; H.url = urlPrependStuff;

View file

@ -39,7 +39,7 @@ void Controller::uplinkConnection(void * np) {
while (Controller::conf.is_active) { while (Controller::conf.is_active) {
if (!uplink) { if (!uplink) {
INFO_MSG("Connecting to uplink at %s:%u", uplink_host.c_str(), uplink_port); INFO_MSG("Connecting to uplink at %s:%u", uplink_host.c_str(), uplink_port);
uplink = Socket::Connection(uplink_host, uplink_port, true); uplink.open(uplink_host, uplink_port, true);
} }
if (uplink) { if (uplink) {
if (uplink.spool()) { if (uplink.spool()) {

View file

@ -160,7 +160,7 @@ namespace Mist {
bool inputDTSC::openStreamSource() { bool inputDTSC::openStreamSource() {
std::string source = config->getString("input"); std::string source = config->getString("input");
if (source == "-"){ if (source == "-"){
srcConn = Socket::Connection(fileno(stdout),fileno(stdin)); srcConn.open(fileno(stdout),fileno(stdin));
return true; return true;
} }
if (source.find("dtsc://") == 0) { if (source.find("dtsc://") == 0) {
@ -175,7 +175,7 @@ namespace Mist {
if (streamName == "") { if (streamName == "") {
streamName = givenStream; streamName = givenStream;
} }
srcConn = Socket::Connection(host, port, true); srcConn.open(host, port, true);
if (!srcConn.connected()){ if (!srcConn.connected()){
return false; return false;
} }

View file

@ -44,9 +44,9 @@ namespace Mist{
int fin = -1, fout = -1; int fin = -1, fout = -1;
inputProcess = Util::Procs::StartPiped(args, &fin, &fout, 0); inputProcess = Util::Procs::StartPiped(args, &fin, &fout, 0);
myConn = Socket::Connection(-1, fout); myConn.open(-1, fout);
}else{ }else{
myConn = Socket::Connection(fileno(stdout), fileno(stdin)); myConn.open(fileno(stdout), fileno(stdin));
} }
myConn.Received().splitter.assign("\000\000\001", 3); myConn.Received().splitter.assign("\000\000\001", 3);
myMeta.vod = false; myMeta.vod = false;

View file

@ -134,7 +134,7 @@ namespace Mist{
} }
bool InputRTSP::openStreamSource(){ bool InputRTSP::openStreamSource(){
tcpCon = Socket::Connection(url.host, url.getPort(), false); tcpCon.open(url.host, url.getPort(), false);
mainConn = &tcpCon; mainConn = &tcpCon;
return tcpCon; return tcpCon;
} }

View file

@ -212,13 +212,13 @@ namespace Mist {
int fin = -1, fout = -1; int fin = -1, fout = -1;
inputProcess = Util::Procs::StartPiped(args, &fin, &fout, 0); inputProcess = Util::Procs::StartPiped(args, &fin, &fout, 0);
tcpCon = Socket::Connection(-1, fout); tcpCon.open(-1, fout);
return true; return true;
} }
//streamed file //streamed file
if (inpt.substr(0,9) == "stream://"){ if (inpt.substr(0,9) == "stream://"){
inFile = fopen(inpt.c_str()+9, "r"); inFile = fopen(inpt.c_str()+9, "r");
tcpCon = Socket::Connection(-1, fileno(inFile)); tcpCon.open(-1, fileno(inFile));
standAlone = false; standAlone = false;
return inFile; return inFile;
} }

View file

@ -41,8 +41,7 @@ namespace Mist {
std::string host = getConnectedHost(); std::string host = getConnectedHost();
dup2(myConn.getSocket(), STDIN_FILENO); dup2(myConn.getSocket(), STDIN_FILENO);
dup2(myConn.getSocket(), STDOUT_FILENO); dup2(myConn.getSocket(), STDOUT_FILENO);
myConn.drop(); myConn.open(STDOUT_FILENO, STDIN_FILENO);
myConn = Socket::Connection(STDOUT_FILENO, STDIN_FILENO);
myConn.setHost(host); myConn.setHost(host);
} }
if (config->getString("nostreamtext").size()){ if (config->getString("nostreamtext").size()){

View file

@ -33,7 +33,7 @@ namespace Mist{
int fin = -1; int fin = -1;
Util::Procs::StartPiped(args, &fin, 0, 0); Util::Procs::StartPiped(args, &fin, 0, 0);
myConn = Socket::Connection(fin, -1); myConn.open(fin, -1);
wantRequest = false; wantRequest = false;
parseData = true; parseData = true;

View file

@ -143,7 +143,7 @@ void pushFirstElement(std::string qId) {
proxyToPost(srcConn, srcLocation, dstConn, dstLocation); proxyToPost(srcConn, srcLocation, dstConn, dstLocation);
srcConn = Socket::Connection(srcHost, srcPort, true); srcConn.open(srcHost, srcPort, true);
//Set the location to push to for the index containing this segment. //Set the location to push to for the index containing this segment.
//The index will contain (at most) the last PUSH_INDEX_SIZE segments. //The index will contain (at most) the last PUSH_INDEX_SIZE segments.
@ -156,7 +156,7 @@ void pushFirstElement(std::string qId) {
proxyToPost(srcConn, srcLocation, dstConn, dstLocation); proxyToPost(srcConn, srcLocation, dstConn, dstLocation);
srcConn = Socket::Connection(srcHost, srcPort, true); srcConn.open(srcHost, srcPort, true);
//Set the location to push to for the global index containing all qualities. //Set the location to push to for the global index containing all qualities.
srcLocation = baseURL + "/push/index.m3u8"; srcLocation = baseURL + "/push/index.m3u8";
@ -255,7 +255,7 @@ namespace Mist {
} }
//Reconnect when disconnected //Reconnect when disconnected
if (!listConn.connected()){ if (!listConn.connected()){
listConn = Socket::Connection(srcHost, srcPort, true); listConn.open(srcHost, srcPort, true);
} }
//Request the push list //Request the push list
if (listConn.connected()){ if (listConn.connected()){

View file

@ -43,9 +43,9 @@ namespace Mist{
} }
initialize(); initialize();
INFO_MSG("About to push stream %s out. Host: %s, port: %d, app: %s, stream: %s", streamName.c_str(), pushUrl.host.c_str(), pushUrl.getPort(), app.c_str(), streamOut.c_str()); INFO_MSG("About to push stream %s out. Host: %s, port: %d, app: %s, stream: %s", streamName.c_str(), pushUrl.host.c_str(), pushUrl.getPort(), app.c_str(), streamOut.c_str());
if (pushUrl.protocol == "rtmp"){myConn = Socket::Connection(pushUrl.host, pushUrl.getPort(), false);} if (pushUrl.protocol == "rtmp"){myConn.open(pushUrl.host, pushUrl.getPort(), false);}
#ifdef SSL #ifdef SSL
if (pushUrl.protocol == "rtmps"){myConn = Socket::Connection(pushUrl.host, pushUrl.getPort(), false, true);} if (pushUrl.protocol == "rtmps"){myConn.open(pushUrl.host, pushUrl.getPort(), false, true);}
#endif #endif
if (!myConn){ if (!myConn){
FAIL_MSG("Could not connect to %s:%d!", pushUrl.host.c_str(), pushUrl.getPort()); FAIL_MSG("Could not connect to %s:%d!", pushUrl.host.c_str(), pushUrl.getPort());