From cac86fff575b5eceee8ad4c4edf2673aa32c7ed6 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 1 Dec 2022 10:13:46 +0100 Subject: [PATCH] Various small fixes to SRT sockets and SRT socket statistics --- lib/socket_srt.cpp | 39 +++++++++++++++++++++++++++++++++++-- lib/socket_srt.h | 7 +++++++ lib/ts_stream.cpp | 12 ++++++++++-- lib/ts_stream.h | 3 +++ src/input/input.cpp | 1 - src/input/input_tssrt.cpp | 13 +++++-------- src/input/input_tssrt.h | 2 +- src/output/output_tssrt.cpp | 37 ++++++++++++++++++++++++----------- 8 files changed, 89 insertions(+), 25 deletions(-) diff --git a/lib/socket_srt.cpp b/lib/socket_srt.cpp index ac08dd8c..dff28e74 100644 --- a/lib/socket_srt.cpp +++ b/lib/socket_srt.cpp @@ -76,6 +76,38 @@ namespace Socket{ lastGood = Util::bootMS(); } + // Copy constructor + SRTConnection::SRTConnection(const SRTConnection &rhs){ + initializeEmpty(); + *this = rhs; + } + + // Assignment constructor + SRTConnection &SRTConnection::operator=(const SRTConnection &rhs){ + close(); + initializeEmpty(); + if (!rhs){return *this;} + memcpy(&remoteaddr, &(rhs.remoteaddr), sizeof(sockaddr_in6)); + direction = rhs.direction; + remotehost = rhs.remotehost; + sock = rhs.sock; + performanceMonitor = rhs.performanceMonitor; + host = rhs.host; + outgoing_port = rhs.outgoing_port; + prev_pktseq = rhs.prev_pktseq; + lastGood = rhs.lastGood; + chunkTransmitSize = rhs.chunkTransmitSize; + adapter = rhs.adapter; + modeName = rhs.modeName; + timeout = rhs.timeout; + tsbpdMode = rhs.tsbpdMode; + params = rhs.params; + blocking = rhs.blocking; + getBinHost(); + return *this; + } + + SRTConnection::SRTConnection(const std::string &_host, int _port, const std::string &_direction, const std::map &_params){ connect(_host, _port, _direction, _params); @@ -101,7 +133,7 @@ namespace Socket{ memcpy(tmpBuffer + 12, &(reinterpret_cast(&remoteaddr)->sin_addr.s_addr), 4); break; case AF_INET6: memcpy(tmpBuffer, &(remoteaddr.sin6_addr.s6_addr), 16); break; - default: return ""; break; + default: memset(tmpBuffer, 0, 16); break; } return std::string(tmpBuffer, 16); } @@ -343,6 +375,7 @@ namespace Socket{ } void SRTConnection::initializeEmpty(){ + memset(&performanceMonitor, 0, sizeof(performanceMonitor)); prev_pktseq = 0; sock = SRT_INVALID_SOCK; outgoing_port = 0; @@ -494,11 +527,12 @@ namespace Socket{ } r.direction = direction; + r.params = conn.params; r.postConfigureSocket(); r.setBlocking(!nonblock); static char addrconv[INET6_ADDRSTRLEN]; - r.remoteaddr = tmpaddr; + memcpy(&(r.remoteaddr), &tmpaddr, sizeof(tmpaddr)); if (tmpaddr.sin6_family == AF_INET6){ r.remotehost = inet_ntop(AF_INET6, &(tmpaddr.sin6_addr), addrconv, INET6_ADDRSTRLEN); HIGH_MSG("IPv6 addr [%s]", r.remotehost.c_str()); @@ -508,6 +542,7 @@ namespace Socket{ HIGH_MSG("IPv4 addr [%s]", r.remotehost.c_str()); } INFO_MSG("Accepted a socket coming from %s", r.remotehost.c_str()); + r.getBinHost(); return r; } diff --git a/lib/socket_srt.h b/lib/socket_srt.h index 07e947f8..c25ec1da 100644 --- a/lib/socket_srt.h +++ b/lib/socket_srt.h @@ -24,9 +24,16 @@ namespace Socket{ }// namespace SockOpt }// namespace SRT + //Advance declaration so we can make it a friend of SRTConnection + class SRTServer; + class SRTConnection{ + friend class SRTServer; public: SRTConnection(); + // copy/assignment constructors + SRTConnection(const SRTConnection &rhs); + SRTConnection &operator=(const SRTConnection &rhs); SRTConnection(SRTSOCKET alreadyConnected); SRTConnection(const std::string &_host, int _port, const std::string &_direction = "input", const paramList &_params = paramList()); diff --git a/lib/ts_stream.cpp b/lib/ts_stream.cpp index 38b224f4..81011421 100644 --- a/lib/ts_stream.cpp +++ b/lib/ts_stream.cpp @@ -15,6 +15,14 @@ tthread::recursive_mutex tMutex; namespace TS{ + Assembler::Assembler(){ + isLive = false; + } + + void Assembler::setLive(bool live){ + isLive = live; + } + bool Assembler::assemble(Stream & TSStrm, const char * ptr, size_t len, bool parse, uint64_t bytePos){ bool ret = false; size_t offset = 0; @@ -30,7 +38,7 @@ namespace TS{ tsBuf.FromPointer(leftData); if (!ret && tsBuf.getUnitStart()){ret = true;} if (parse){ - TSStrm.parse(tsBuf, bytePos); + TSStrm.parse(tsBuf, isLive?0:bytePos); }else{ TSStrm.add(tsBuf); if (!TSStrm.isDataTrack(tsBuf.getPID())){TSStrm.parse(tsBuf.getPID());} @@ -59,7 +67,7 @@ namespace TS{ tsBuf.FromPointer(ptr + offset); if (!ret && tsBuf.getUnitStart()){ret = true;} if (parse){ - TSStrm.parse(tsBuf, bytePos); + TSStrm.parse(tsBuf, isLive?0:bytePos); }else{ TSStrm.add(tsBuf); if (!TSStrm.isDataTrack(tsBuf.getPID())){TSStrm.parse(tsBuf.getPID());} diff --git a/lib/ts_stream.h b/lib/ts_stream.h index aef91ed8..023dcd42 100644 --- a/lib/ts_stream.h +++ b/lib/ts_stream.h @@ -112,9 +112,12 @@ namespace TS{ class Assembler{ public: + Assembler(); bool assemble(Stream & TSStrm, const char * ptr, size_t len, bool parse = false, uint64_t bytePos = 0); void clear(); + void setLive(bool live = true); private: + bool isLive; Util::ResizeablePointer leftData; TS::Packet tsBuf; }; diff --git a/src/input/input.cpp b/src/input/input.cpp index c76bc185..ec75da5e 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -957,7 +957,6 @@ namespace Mist{ void Input::connStats(Comms::Connections &statComm){ statComm.setUp(0); statComm.setDown(streamByteCount()); - statComm.setHost(getConnectedBinHost()); } void Input::realtimeMainLoop(){ diff --git a/src/input/input_tssrt.cpp b/src/input/input_tssrt.cpp index 28342930..2edc3afa 100644 --- a/src/input/input_tssrt.cpp +++ b/src/input/input_tssrt.cpp @@ -39,9 +39,8 @@ void signal_handler(int signum, siginfo_t *sigInfo, void *ignore){ // We use threads here for multiple input pushes, because of the internals of the SRT Library static void callThreadCallbackSRT(void *socknum){ - SRTSOCKET sock = *((SRTSOCKET *)socknum); // use the accepted socket as the second parameter - Mist::inputTSSRT inp(cfgPointer, sock); + Mist::inputTSSRT inp(cfgPointer, *(Socket::SRTConnection *)socknum); inp.setSingular(false); inp.run(); } @@ -49,7 +48,7 @@ static void callThreadCallbackSRT(void *socknum){ namespace Mist{ /// Constructor of TS Input /// \arg cfg Util::Config that contains all current configurations. - inputTSSRT::inputTSSRT(Util::Config *cfg, SRTSOCKET s) : Input(cfg){ + inputTSSRT::inputTSSRT(Util::Config *cfg, Socket::SRTConnection s) : Input(cfg){ rawIdx = INVALID_TRACK_ID; lastRawPacket = 0; capa["name"] = "TSSRT"; @@ -118,8 +117,8 @@ namespace Mist{ config->addOption("raw", option); // Setup if we are called form with a thread for push-based input. - if (s != -1){ - srtConn = Socket::SRTConnection(s); + if (s.connected()){ + srtConn = s; streamName = baseStreamName; std::string streamid = srtConn.getStreamName(); int64_t acc = config->getInteger("acceptable"); @@ -263,9 +262,8 @@ namespace Mist{ while (config->is_active && sSock.connected()){ Socket::SRTConnection S = sSock.accept(); if (S.connected()){// check if the new connection is valid - SRTSOCKET sock = S.getSocket(); // spawn a new thread for this connection - tthread::thread T(callThreadCallbackSRT, (void *)&sock); + tthread::thread T(callThreadCallbackSRT, (void *)&S); // detach it, no need to keep track of it anymore T.detach(); HIGH_MSG("Spawned new thread for socket %i", S.getSocket()); @@ -285,7 +283,6 @@ namespace Mist{ void inputTSSRT::connStats(Comms::Connections &statComm){ statComm.setUp(srtConn.dataUp()); statComm.setDown(srtConn.dataDown()); - statComm.setHost(getConnectedBinHost()); statComm.setPacketCount(srtConn.packetCount()); statComm.setPacketLostCount(srtConn.packetLostCount()); statComm.setPacketRetransmitCount(srtConn.packetRetransmitCount()); diff --git a/src/input/input_tssrt.h b/src/input/input_tssrt.h index 143174cb..a70c8a07 100644 --- a/src/input/input_tssrt.h +++ b/src/input/input_tssrt.h @@ -11,7 +11,7 @@ namespace Mist{ class inputTSSRT : public Input{ public: - inputTSSRT(Util::Config *cfg, SRTSOCKET s = -1); + inputTSSRT(Util::Config *cfg, Socket::SRTConnection s = Socket::SRTConnection()); ~inputTSSRT(); void setSingular(bool newSingular); virtual bool needsLock(); diff --git a/src/output/output_tssrt.cpp b/src/output/output_tssrt.cpp index 3a3402e1..2ff1fc8c 100644 --- a/src/output/output_tssrt.cpp +++ b/src/output/output_tssrt.cpp @@ -7,6 +7,8 @@ #include #include +bool allowStreamNameOverride = true; + namespace Mist{ OutTSSRT::OutTSSRT(Socket::Connection &conn, Socket::SRTConnection & _srtSock) : TSOutput(conn), srtConn(_srtSock){ // NOTE: conn is useless for SRT, as it uses a different socket type. @@ -14,6 +16,7 @@ namespace Mist{ streamName = config->getString("streamname"); Util::setStreamName(streamName); pushOut = false; + assembler.setLive(); // Push output configuration if (config->getString("target").size()){ target = HTTP::URL(config->getString("target")); @@ -28,11 +31,7 @@ namespace Mist{ return; } pushOut = true; - std::map arguments; - HTTP::parseVars(target.args, arguments); - for (std::map::iterator it = arguments.begin(); it != arguments.end(); ++it){ - targetParams[it->first] = it->second; - } + HTTP::parseVars(target.args, targetParams); size_t connectCnt = 0; do{ srtConn.connect(target.host, target.getPort(), "output", targetParams); @@ -44,6 +43,9 @@ namespace Mist{ } ++connectCnt; }while (!srtConn && connectCnt < 5); + if (!srtConn){ + FAIL_MSG("Failed to connect to '%s'!", config->getString("target").c_str()); + } wantRequest = false; parseData = true; initialize(); @@ -51,10 +53,12 @@ namespace Mist{ // Pull output configuration, In this case we have an srt connection in the second constructor parameter. // Handle override / append of streamname options std::string sName = srtConn.getStreamName(); - if (sName != ""){ - streamName = sName; - Util::sanitizeName(streamName); - Util::setStreamName(streamName); + if (allowStreamNameOverride){ + if (sName != ""){ + streamName = sName; + Util::sanitizeName(streamName); + Util::setStreamName(streamName); + } } int64_t accTypes = config->getInteger("acceptable"); @@ -437,7 +441,8 @@ int main(int argc, char *argv[]){ int filelimit = conf.getInteger("filelimit"); Util::sysSetNrOpenFiles(filelimit); - if (!mistOut::listenMode()){ + std::string target = conf.getString("target"); + if (!mistOut::listenMode() && (!target.size() || Socket::interpretSRTMode(HTTP::URL(target)) != "listener")){ Socket::Connection S(fileno(stdout), fileno(stdin)); Socket::SRTConnection tmpSock; mistOut tmp(S, tmpSock); @@ -450,7 +455,17 @@ int main(int argc, char *argv[]){ new_action.sa_flags = 0; sigaction(SIGUSR1, &new_action, NULL); } - if (conf.getInteger("port") && conf.getString("interface").size()){ + if (target.size()){ + //Force acceptable option to 1 (outgoing only), since this is a push output and we can't accept incoming connections + conf.getOption("acceptable", true).append((uint64_t)1); + //Disable overriding streamname with streamid parameter on other side + allowStreamNameOverride = false; + HTTP::URL tgt(target); + std::map arguments; + HTTP::parseVars(tgt.args, arguments); + server_socket = Socket::SRTServer(tgt.getPort(), tgt.host, arguments, false, "output"); + conf.getOption("target", true).append(""); + }else{ std::map arguments; server_socket = Socket::SRTServer(conf.getInteger("port"), conf.getString("interface"), arguments, false, "output"); }