From 19199cbff810e47c270164f01378e93cbcea0229 Mon Sep 17 00:00:00 2001 From: Phencys Date: Sun, 26 Jul 2020 16:19:14 +0200 Subject: [PATCH] TSSRT Support --- CMakeLists.txt | 11 +- lib/comms.cpp | 45 +- lib/comms.h | 18 + lib/config.cpp | 83 +++- lib/config.h | 7 +- lib/socket_srt.cpp | 548 +++++++++++++++++++++++ lib/socket_srt.h | 154 +++++++ lib/ts_stream.cpp | 3 +- src/controller/controller_statistics.cpp | 120 ++++- src/controller/controller_statistics.h | 12 + src/input/input.cpp | 12 +- src/input/input.h | 5 + src/input/input_tssrt.cpp | 217 +++++++++ src/input/input_tssrt.h | 48 ++ src/output/mist_out_srt.cpp | 58 +++ src/output/output_tssrt.cpp | 112 +++++ src/output/output_tssrt.h | 33 ++ 17 files changed, 1471 insertions(+), 15 deletions(-) create mode 100644 lib/socket_srt.cpp create mode 100644 lib/socket_srt.h create mode 100644 src/input/input_tssrt.cpp create mode 100644 src/input/input_tssrt.h create mode 100644 src/output/mist_out_srt.cpp create mode 100644 src/output/output_tssrt.cpp create mode 100644 src/output/output_tssrt.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 72a54e38..154db578 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -187,6 +187,7 @@ set(libHeaders lib/sdp_media.h lib/shared_memory.h lib/socket.h + lib/socket_srt.h lib/srtp.h lib/stream.h lib/stun.h @@ -249,6 +250,7 @@ add_library (mist lib/sdp_media.cpp lib/shared_memory.cpp lib/socket.cpp + lib/socket_srt.cpp lib/srtp.cpp lib/stream.cpp lib/stun.cpp @@ -274,6 +276,7 @@ endif() target_link_libraries(mist -lpthread ${LIBRT} + -lsrt ) if (NOT DEFINED NOSSL ) target_link_libraries(mist mbedtls mbedx509 mbedcrypto srtp2) @@ -436,6 +439,7 @@ makeInput(Balancer balancer)#LTS makeInput(RTSP rtsp)#LTS makeInput(SRT srt)#LTS +makeInput(TSSRT tssrt)#LTS ######################################## # MistServer - Outputs # @@ -443,12 +447,16 @@ makeInput(SRT srt)#LTS macro(makeOutput outputName format) #Parse all extra arguments, for http and ts flags SET (tsBaseClass Output) + SET (outBaseFile src/output/mist_out.cpp) if (";${ARGN};" MATCHES ";http;") SET(httpOutput src/output/output_http.cpp) if (";${ARGN};" MATCHES ";ts;") SET(tsBaseClass HTTPOutput) endif() endif() + if (";${ARGN};" MATCHES ";srt;") + SET(outBaseFile src/output/mist_out_srt.cpp) + endif() if (";${ARGN};" MATCHES ";ts;") SET(tsOutput src/output/output_ts_base.cpp) endif() @@ -456,7 +464,7 @@ macro(makeOutput outputName format) SET(tsOutput generated/noffmpeg.h generated/noh264.h) endif() add_executable(MistOut${outputName} - src/output/mist_out.cpp + ${outBaseFile} src/output/output.cpp src/output/output_${format}.cpp src/io.cpp @@ -493,6 +501,7 @@ if (DEFINED WITH_JPG ) makeOutput(JPG jpg http jpg) endif() makeOutput(TS ts ts) +makeOutput(TSSRT tssrt ts srt) makeOutput(HTTPTS httpts http ts) makeOutput(HLS hls http ts) makeOutput(CMAF cmaf http)#LTS diff --git a/lib/comms.cpp b/lib/comms.cpp index 30863071..108c686a 100644 --- a/lib/comms.cpp +++ b/lib/comms.cpp @@ -5,8 +5,8 @@ #include "encode.h" #include "procs.h" #include "timing.h" -#include #include +#include namespace Comms{ Comms::Comms(){ @@ -166,6 +166,9 @@ namespace Comms{ dataAccX.addField("stream", RAX_STRING, 100); dataAccX.addField("connector", RAX_STRING, 20); dataAccX.addField("crc", RAX_32UINT); + dataAccX.addField("pktcount", RAX_64UINT); + dataAccX.addField("pktloss", RAX_64UINT); + dataAccX.addField("pktretrans", RAX_64UINT); } void Statistics::nullFields(){ @@ -180,6 +183,9 @@ namespace Comms{ setTime(0); setNow(0); setSync(0); + setPacketCount(0); + setPacketLostCount(0); + setPacketRetransmitCount(0); } void Statistics::fieldAccess(){ @@ -194,6 +200,9 @@ namespace Comms{ stream = dataAccX.getFieldAccX("stream"); connector = dataAccX.getFieldAccX("connector"); crc = dataAccX.getFieldAccX("crc"); + pktcount = dataAccX.getFieldAccX("pktcount"); + pktloss = dataAccX.getFieldAccX("pktloss"); + pktretrans = dataAccX.getFieldAccX("pktretrans"); } uint8_t Statistics::getSync() const{return sync.uint(index);} @@ -246,9 +255,7 @@ namespace Comms{ up.set(_up, idx); } - std::string Statistics::getHost() const{ - return std::string(host.ptr(index), 16); - } + std::string Statistics::getHost() const{return std::string(host.ptr(index), 16);} std::string Statistics::getHost(size_t idx) const{ if (!master){return std::string((size_t)16, (char)'\000');} return std::string(host.ptr(idx), 16); @@ -285,6 +292,36 @@ namespace Comms{ crc.set(_crc, idx); } + uint64_t Statistics::getPacketCount() const{return pktcount.uint(index);} + uint64_t Statistics::getPacketCount(size_t idx) const{ + return (master ? pktcount.uint(idx) : 0); + } + void Statistics::setPacketCount(uint64_t _count){pktcount.set(_count, index);} + void Statistics::setPacketCount(uint64_t _count, size_t idx){ + if (!master){return;} + pktcount.set(_count, idx); + } + + uint64_t Statistics::getPacketLostCount() const{return pktloss.uint(index);} + uint64_t Statistics::getPacketLostCount(size_t idx) const{ + return (master ? pktloss.uint(idx) : 0); + } + void Statistics::setPacketLostCount(uint64_t _lost){pktloss.set(_lost, index);} + void Statistics::setPacketLostCount(uint64_t _lost, size_t idx){ + if (!master){return;} + pktloss.set(_lost, idx); + } + + uint64_t Statistics::getPacketRetransmitCount() const{return pktretrans.uint(index);} + uint64_t Statistics::getPacketRetransmitCount(size_t idx) const{ + return (master ? pktretrans.uint(idx) : 0); + } + void Statistics::setPacketRetransmitCount(uint64_t _retrans){pktretrans.set(_retrans, index);} + void Statistics::setPacketRetransmitCount(uint64_t _retrans, size_t idx){ + if (!master){return;} + pktretrans.set(_retrans, idx); + } + std::string Statistics::getSessId() const{return getSessId(index);} std::string Statistics::getSessId(size_t idx) const{ diff --git a/lib/comms.h b/lib/comms.h index dc1c5757..a3f1b7b5 100644 --- a/lib/comms.h +++ b/lib/comms.h @@ -124,6 +124,21 @@ namespace Comms{ void setCRC(uint32_t _crc); void setCRC(uint32_t _crc, size_t idx); + uint64_t getPacketCount() const; + uint64_t getPacketCount(size_t idx) const; + void setPacketCount(uint64_t _count); + void setPacketCount(uint64_t _count, size_t idx); + + uint64_t getPacketLostCount() const; + uint64_t getPacketLostCount(size_t idx) const; + void setPacketLostCount(uint64_t _lost); + void setPacketLostCount(uint64_t _lost, size_t idx); + + uint64_t getPacketRetransmitCount() const; + uint64_t getPacketRetransmitCount(size_t idx) const; + void setPacketRetransmitCount(uint64_t _retransmit); + void setPacketRetransmitCount(uint64_t _retransmit, size_t idx); + std::string getSessId() const; std::string getSessId(size_t index) const; @@ -138,6 +153,9 @@ namespace Comms{ Util::FieldAccX stream; Util::FieldAccX connector; Util::FieldAccX crc; + Util::FieldAccX pktcount; + Util::FieldAccX pktloss; + Util::FieldAccX pktretrans; }; class Users : public Comms{ diff --git a/lib/config.cpp b/lib/config.cpp index f950ea7c..0a9b1ce5 100644 --- a/lib/config.cpp +++ b/lib/config.cpp @@ -3,6 +3,7 @@ #include "config.h" #include "defines.h" +#include "lib/socket_srt.h" #include "stream.h" #include "timing.h" #include "tinythread.h" @@ -30,17 +31,18 @@ #include #include #include +#include // for va_list #include #include #include -#include // for va_list bool Util::Config::is_active = false; bool Util::Config::is_restarting = false; static Socket::Server *serv_sock_pointer = 0; +static Socket::SRTServer *serv_srt_sock_pointer = 0; ///< Holds a pointer to SRT Server, if it is connected uint32_t Util::printDebugLevel = DEBUG; std::string Util::streamName; -char Util::exitReason[256] = {0}; +char Util::exitReason[256] ={0}; void Util::logExitReason(const char *format, ...){ if (exitReason[0]){return;} @@ -53,6 +55,13 @@ void Util::logExitReason(const char *format, ...){ std::string Util::listenInterface; uint32_t Util::listenPort = 0; +// Sets pointer to the SRT Server, for proper cleanup later. +// +// Currently used for TSSRT Input only, as this doesn't use the config library to setup a listener +void Util::Config::registerSRTSockPtr(Socket::SRTServer *ptr){ + serv_srt_sock_pointer = ptr; +} + Util::Config::Config(){ // global options here vals["debug"]["long"] = "debug"; @@ -202,7 +211,9 @@ bool Util::Config::parseArgs(int &argc, char **&argv){ #endif #ifdef STAT_CUTOFF if (STAT_CUTOFF != 600){ - std::cout << "- Setting: Stats cutoff point " << STAT_CUTOFF << " seconds. Statistics and session cache are only kept for this long, as opposed to the default of 600 seconds." << std::endl; + std::cout << "- Setting: Stats cutoff point " + << STAT_CUTOFF << " seconds. Statistics and session cache are only kept for this long, as opposed to the default of 600 seconds." + << std::endl; } #endif #ifndef SSL @@ -320,6 +331,23 @@ struct callbackData{ int (*cb)(Socket::Connection &); }; +// As above, but using an SRT Connection +struct callbackSRTData{ + Socket::SRTConnection *sock; + int (*cb)(Socket::SRTConnection &); +}; + +// Callback for SRT-serving threads +static void callThreadCallbackSRT(void *cDataArg){ + INSANE_MSG("Thread for %p started", cDataArg); + callbackSRTData *cData = (callbackSRTData *)cDataArg; + cData->cb(*(cData->sock)); + cData->sock->close(); + delete cData->sock; + delete cData; + INSANE_MSG("Thread for %p ended", cDataArg); +} + static void callThreadCallback(void *cDataArg){ INSANE_MSG("Thread for %p started", cDataArg); callbackData *cData = (callbackData *)cDataArg; @@ -402,6 +430,53 @@ int Util::Config::serveThreadedSocket(int (*callback)(Socket::Connection &)){ return r; } +// This is a THREADED server!! Fork does not work as the SRT library itself already starts up a +// thread, and forking after thread creation messes up all control flow internal to the library. +int Util::Config::serveSRTSocket(int (*callback)(Socket::SRTConnection &S)){ + Socket::SRTServer server_socket; + if (vals.isMember("port") && vals.isMember("interface")){ + server_socket = Socket::SRTServer(getInteger("port"), getString("interface"), false, "output"); + } + if (!server_socket.connected()){ + DEVEL_MSG("Failure to open socket"); + return 1; + } + serv_srt_sock_pointer = &server_socket; + activate(); + if (server_socket.getSocket()){ + int oldSock = server_socket.getSocket(); + if (!dup2(oldSock, 0)){ + server_socket = Socket::SRTServer(0); + close(oldSock); + } + } + int r = SRTServer(server_socket, callback); + serv_srt_sock_pointer = 0; + return r; +} + +int Util::Config::SRTServer(Socket::SRTServer &server_socket, int (*callback)(Socket::SRTConnection &)){ + Util::Procs::socketList.insert(server_socket.getSocket()); + while (is_active && server_socket.connected()){ + Socket::SRTConnection S = server_socket.accept(false, "output"); + if (S.connected()){// check if the new connection is valid + callbackSRTData *cData = new callbackSRTData; + cData->sock = new Socket::SRTConnection(S); + cData->cb = callback; + // spawn a new thread for this connection + tthread::thread T(callThreadCallbackSRT, (void *)cData); + // detach it, no need to keep track of it anymore + T.detach(); + HIGH_MSG("Spawned new thread for socket %i", S.getSocket()); + }else{ + Util::sleep(10); // sleep 10ms + } + } + Util::Procs::socketList.erase(server_socket.getSocket()); + if (!is_restarting){server_socket.close();} + return 0; +} + int Util::Config::serveForkedSocket(int (*callback)(Socket::Connection &S)){ Socket::Server server_socket; if (Socket::checkTrueSocket(0)){ @@ -466,6 +541,8 @@ void Util::Config::signal_handler(int signum, siginfo_t *sigInfo, void *ignore){ case SIGHUP: case SIGTERM: if (serv_sock_pointer){serv_sock_pointer->close();} + // Close the srt server as well, if set + if (serv_srt_sock_pointer){serv_srt_sock_pointer->close();} #if DEBUG >= DLVL_DEVEL static int ctr = 0; if (!is_active && ++ctr > 4){BACKTRACE;} diff --git a/lib/config.h b/lib/config.h index 5a860828..af1e2834 100644 --- a/lib/config.h +++ b/lib/config.h @@ -8,6 +8,7 @@ #endif #include "json.h" +#include "socket_srt.h" #include #include @@ -16,7 +17,7 @@ namespace Util{ extern uint32_t printDebugLevel; extern std::string streamName; ///< Used by debug messages to identify the stream name extern char exitReason[256]; - void logExitReason(const char * format, ...); + void logExitReason(const char *format, ...); /// Deals with parsing configuration from commandline options. class Config{ @@ -41,6 +42,7 @@ namespace Util{ int64_t getInteger(std::string optname); bool getBool(std::string optname); void activate(); + void registerSRTSockPtr(Socket::SRTServer *ptr); int threadServer(Socket::Server &server_socket, int (*callback)(Socket::Connection &S)); int forkServer(Socket::Server &server_socket, int (*callback)(Socket::Connection &S)); int serveThreadedSocket(int (*callback)(Socket::Connection &S)); @@ -49,6 +51,9 @@ namespace Util{ void addOptionsFromCapabilities(const JSON::Value &capabilities); void addBasicConnectorOptions(JSON::Value &capabilities); void addConnectorOptions(int port, JSON::Value &capabilities); + + int serveSRTSocket(int (*callback)(Socket::SRTConnection &S)); + int SRTServer(Socket::SRTServer &server_socket, int (*callback)(Socket::SRTConnection &S)); }; /// The interface address the current serveSocket function is listening on diff --git a/lib/socket_srt.cpp b/lib/socket_srt.cpp new file mode 100644 index 00000000..fae3cdad --- /dev/null +++ b/lib/socket_srt.cpp @@ -0,0 +1,548 @@ +#include "defines.h" +#include "lib/http_parser.h" +#include "socket_srt.h" + +#include +#include + +#define INVALID_SRT_SOCKET -1 + +namespace Socket{ + namespace SRT{ + bool isInited = false; + + // Both Init and Cleanup functions are called implicitly if not done ourselves. + // SRT documentation states explicitly that this is unreliable behaviour + bool libraryInit(){ + if (!isInited){ + int res = srt_startup(); + if (res == -1){ERROR_MSG("Unable to initialize SRT Library!");} + isInited = (res != -1); + } + return isInited; + } + + bool libraryCleanup(){ + if (isInited){ + srt_cleanup(); + isInited = false; + } + return true; + } + }// namespace SRT + + template std::string asString(const T &val){ + std::stringstream x; + x << val; + return x.str(); + } + + sockaddr_in createInetAddr(const std::string &_host, int _port){ + sockaddr_in res; + memset(&res, 9, sizeof res); + res.sin_family = AF_INET; + res.sin_port = htons(_port); + + if (_host != ""){ + if (inet_pton(AF_INET, _host.c_str(), &res.sin_addr) == 1){return res;} + hostent *he = gethostbyname(_host.c_str()); + if (!he || he->h_addrtype != AF_INET){ERROR_MSG("Host not found %s", _host.c_str());} + res.sin_addr = *(in_addr *)he->h_addr_list[0]; + } + + return res; + } + + std::string interpretSRTMode(const std::string &_mode, const std::string &_host, const std::string &_adapter){ + if (_mode == "client" || _mode == "caller"){return "caller";} + if (_mode == "server" || _mode == "listener"){return "listener";} + if (_mode == "rendezvouz"){return "rendezvous";} + if (_mode != "default"){return "";} + if (_host == ""){return "listener";} + if (_adapter != ""){return "rendezvous";} + return "caller"; + } + + std::string interpretSRTMode(const HTTP::URL &u){ + paramList params; + HTTP::parseVars(u.args, params); + return interpretSRTMode(params.count("mode") ? params.at("mode") : "default", u.host, ""); + } + + SRTConnection::SRTConnection(){initializeEmpty();} + + SRTConnection::SRTConnection(const std::string &_host, int _port, const std::string &_direction, + const std::map &_params){ + connect(_host, _port, _direction, _params); + } + + std::string SRTConnection::getStreamName(){ + int sNameLen = 512; + char sName[sNameLen]; + int optRes = srt_getsockflag(sock, SRTO_STREAMID, (void *)sName, &sNameLen); + if (optRes != -1 && sNameLen){return sName;} + return ""; + } + + /// Updates the downbuffer internal variable. + /// Returns true if new data was received, false otherwise. + std::string SRTConnection::RecvNow(){ + char recvbuf[5000]; + + bool blockState = blocking; + setBlocking(true); + + SRT_MSGCTRL mc = srt_msgctrl_default; + int32_t receivedBytes = srt_recvmsg2(sock, recvbuf, 5000, &mc); + + if (prev_pktseq != 0 && (mc.pktseq - prev_pktseq > 1)){WARN_MSG("Packet lost");} + prev_pktseq = mc.pktseq; + + setBlocking(blockState); + if (receivedBytes == -1){ + ERROR_MSG("Unable to receive data over socket: %s", srt_getlasterror_str()); + if (srt_getsockstate(sock) != SRTS_CONNECTED){close();} + return ""; + } + + srt_bstats(sock, &performanceMonitor, false); + return std::string(recvbuf, receivedBytes); + } + + void SRTConnection::connect(const std::string &_host, int _port, const std::string &_direction, + const std::map &_params){ + initializeEmpty(); + + direction = _direction; + + handleConnectionParameters(_host, _params); + + HIGH_MSG("Opening SRT connection %s in %s mode on %s:%d", modeName.c_str(), direction.c_str(), + _host.c_str(), _port); + + sock = srt_create_socket(); + if (sock == SRT_ERROR){ + ERROR_MSG("Error creating an SRT socket"); + return; + } + if (modeName == "rendezvous"){ + bool v = true; + srt_setsockopt(sock, 0, SRTO_RENDEZVOUS, &v, sizeof v); + } + if (preConfigureSocket() == SRT_ERROR){ + ERROR_MSG("Error configuring SRT socket"); + return; + } + + if (modeName == "caller"){ + if (outgoing_port){setupAdapter("", outgoing_port);} + + sockaddr_in sa = createInetAddr(_host, _port); + sockaddr *psa = (sockaddr *)&sa; + + HIGH_MSG("Going to connect sock %d", sock); + if (srt_connect(sock, psa, sizeof sa) == SRT_ERROR){ + srt_close(sock); + ERROR_MSG("Can't connect SRT Socket"); + return; + } + HIGH_MSG("Connected sock %d", sock); + + if (postConfigureSocket() == SRT_ERROR){ + ERROR_MSG("Error during postconfigure socket"); + return; + } + INFO_MSG("Caller SRT socket %" PRId32 " success targetting %s:%u", sock, _host.c_str(), _port); + return; + } + if (modeName == "listener"){ + HIGH_MSG("Going to bind a server on %s:%u", _host.c_str(), _port); + + sockaddr_in sa = createInetAddr(_host, _port); + sockaddr *psa = (sockaddr *)&sa; + + if (srt_bind(sock, psa, sizeof sa) == SRT_ERROR){ + srt_close(sock); + ERROR_MSG("Can't connect SRT Socket"); + return; + } + if (srt_listen(sock, 1) == SRT_ERROR){ + srt_close(sock); + ERROR_MSG("Can not listen on Socket"); + } + INFO_MSG("Listener SRT socket sucess @ %s:%u", _host.c_str(), _port); + return; + } + if (modeName == "rendezvous"){ + int outport = (outgoing_port ? outgoing_port : _port); + HIGH_MSG("Going to bind a server on %s:%u", _host.c_str(), _port); + + sockaddr_in sa = createInetAddr(_host, outport); + sockaddr *psa = (sockaddr *)&sa; + + if (srt_bind(sock, psa, sizeof sa) == SRT_ERROR){ + srt_close(sock); + ERROR_MSG("Can't connect SRT Socket"); + return; + } + + sockaddr_in sb = createInetAddr(_host, outport); + sockaddr *psb = (sockaddr *)&sb; + + if (srt_connect(sock, psb, sizeof sb) == SRT_ERROR){ + srt_close(sock); + ERROR_MSG("Can't connect SRT Socket"); + return; + } + + if (postConfigureSocket() == SRT_ERROR){ + ERROR_MSG("Error during postconfigure socket"); + return; + } + INFO_MSG("Rendezvous SRT socket sucess @ %s:%u", _host.c_str(), _port); + return; + } + ERROR_MSG("Invalid mode parameter. Use 'client' or 'server'"); + } + + void SRTConnection::setupAdapter(const std::string &_host, int _port){ + sockaddr_in localsa = createInetAddr(_host, _port); + sockaddr *psa = (sockaddr *)&localsa; + if (srt_bind(sock, psa, sizeof localsa) == SRT_ERROR){ + ERROR_MSG("Unable to bind socket to %s:%u", _host.c_str(), _port); + } + } + + void SRTConnection::SendNow(const std::string &data){SendNow(data.data(), data.size());} + + void SRTConnection::SendNow(const char *data, size_t len){ + srt_clearlasterror(); + int res = srt_sendmsg2(sock, data, len, NULL); + + if (res == SRT_ERROR){ + ERROR_MSG("Unable to send data over socket %" PRId32 ": %s", sock, srt_getlasterror_str()); + if (srt_getsockstate(sock) != SRTS_CONNECTED){close();} + } + srt_bstats(sock, &performanceMonitor, false); + } + + unsigned int SRTConnection::connTime(){ + srt_bstats(sock, &performanceMonitor, false); + return performanceMonitor.msTimeStamp / 1000; + } + + uint64_t SRTConnection::dataUp(){return performanceMonitor.byteSentTotal;} + + uint64_t SRTConnection::dataDown(){return performanceMonitor.byteRecvTotal;} + + uint64_t SRTConnection::packetCount(){ + return (direction == "input" ? performanceMonitor.pktRecvTotal : performanceMonitor.pktSentTotal); + } + + uint64_t SRTConnection::packetLostCount(){ + return (direction == "input" ? performanceMonitor.pktRcvLossTotal : performanceMonitor.pktSndLossTotal); + } + + uint64_t SRTConnection::packetRetransmitCount(){ + //\todo This should be updated with pktRcvRetransTotal on the retrieving end once srt has implemented this. + return (direction == "input" ? 0 : performanceMonitor.pktRetransTotal); + } + + void SRTConnection::initializeEmpty(){ + prev_pktseq = 0; + sock = SRT_INVALID_SOCK; + outgoing_port = 0; + chunkTransmitSize = 1316; + blocking = false; + } + + void SRTConnection::setBlocking(bool _blocking){ + if (_blocking == blocking){return;} + // If we have an error setting the new blocking state, the state is unchanged so we return early. + if (srt_setsockopt(sock, 0, (direction == "output" ? SRTO_SNDSYN : SRTO_RCVSYN), &_blocking, + sizeof _blocking) == -1){ + return; + } + blocking = _blocking; + } + + bool SRTConnection::isBlocking(){return blocking;} + + void SRTConnection::handleConnectionParameters(const std::string &_host, + const std::map &_params){ + params = _params; + DONTEVEN_MSG("SRT Received parameters: "); + for (std::map::const_iterator it = params.begin(); it != params.end(); it++){ + DONTEVEN_MSG(" %s: %s", it->first.c_str(), it->second.c_str()); + } + + adapter = (params.count("adapter") ? params.at("adapter") : ""); + + modeName = interpretSRTMode((params.count("mode") ? params.at("mode") : "default"), _host, adapter); + if (modeName == ""){ + ERROR_MSG("Invalid SRT mode encountered"); + return; + } + + // Using strtol because the original code uses base 0 -> automatic detection of octal and hexadecimal systems. + timeout = (params.count("timeout") ? strtol(params.at("timeout").c_str(), 0, 0) : 0); + + if (adapter == "" && modeName == "listener"){adapter = _host;} + + tsbpdMode = ((params.count("tsbpd") && isFalseString(params.at("tsbpd"))) ? false : true); + + outgoing_port = (params.count("port") ? strtol(params.at("port").c_str(), 0, 0) : 0); + + if ((!params.count("transtype") || params.at("transtype") != "file") && chunkTransmitSize > SRT_LIVE_DEF_PLSIZE){ + if (chunkTransmitSize > SRT_LIVE_MAX_PLSIZE){ + ERROR_MSG("Chunk size in live mode exceeds 1456 bytes!"); + return; + } + } + params["payloadsize"] = asString(chunkTransmitSize); + } + + int SRTConnection::preConfigureSocket(){ + bool no = false; + if (!tsbpdMode){ + if (srt_setsockopt(sock, 0, SRTO_TSBPDMODE, &no, sizeof no) == -1){return -1;} + } + if (srt_setsockopt(sock, 0, SRTO_RCVSYN, &no, sizeof no) == -1){return -1;} + + if (params.count("linger")){ + linger lin; + lin.l_linger = atoi(params.at("linger").c_str()); + lin.l_onoff = lin.l_linger > 0 ? 1 : 0; + srt_setsockopt(sock, 0, SRTO_LINGER, &lin, sizeof(linger)); + } + + std::string errMsg = configureSocketLoop(SRT::SockOpt::PRE); + if (errMsg.size()){ + WARN_MSG("Failed to set the following options: %s", errMsg.c_str()); + return SRT_ERROR; + } + + if (direction == "output"){ + int v = 1; + if (srt_setsockopt(sock, 0, SRTO_SENDER, &v, sizeof v) == SRT_ERROR){return SRT_ERROR;} + } + + return 0; + } + + int SRTConnection::postConfigureSocket(){ + bool no = false; + if (srt_setsockopt(sock, 0, (direction == "output" ? SRTO_SNDSYN : SRTO_RCVSYN), &no, sizeof no) == -1){ + return -1; + } + if (timeout){ + if (srt_setsockopt(sock, 0, (direction == "output" ? SRTO_SNDTIMEO : SRTO_RCVTIMEO), &timeout, + sizeof timeout) == -1){ + return -1; + } + } + std::string errMsg = configureSocketLoop(SRT::SockOpt::POST); + if (errMsg.size()){ + WARN_MSG("Failed to set the following options: %s", errMsg.c_str()); + return SRT_ERROR; + } + return 0; + } + + std::string SRTConnection::configureSocketLoop(SRT::SockOpt::Binding _binding){ + std::string errMsg; + + std::vector allSrtOptions = srtOptions(); + for (std::vector::iterator it = allSrtOptions.begin(); it != allSrtOptions.end(); it++){ + if (it->binding == _binding && params.count(it->name)){ + std::string value = params.at(it->name); + if (!it->apply(sock, value)){errMsg += it->name + " ";} + } + } + return errMsg; + } + + void SRTConnection::close(){ + if (sock != -1){ + srt_close(sock); + sock = -1; + } + } + + SRTServer::SRTServer(){} + + SRTServer::SRTServer(int fromSock){conn = SRTConnection(fromSock);} + + SRTServer::SRTServer(int port, std::string hostname, bool nonblock, const std::string &_direction){ + // We always create a server as listening + std::map listenMode; + listenMode["mode"] = "listener"; + if (hostname == ""){hostname = "0.0.0.0";} + conn.connect(hostname, port, _direction, listenMode); + conn.setBlocking(true); + if (!conn){ + ERROR_MSG("Unable to create socket"); + return; + } + } + + SRTConnection SRTServer::accept(bool nonblock, const std::string &direction){ + if (!conn){return SRTConnection();} + struct sockaddr_in6 tmpaddr; + int len = sizeof(tmpaddr); + + SRTConnection r(srt_accept(conn.getSocket(), (sockaddr *)&tmpaddr, &len)); + if (!r){ + if (conn.getSocket() != -1 && srt_getlasterror(0) != SRT_EASYNCRCV){ + FAIL_MSG("Error during accept: %s. Closing server socket %d.", srt_getlasterror_str(), conn.getSocket()); + close(); + } + return r; + } + + r.direction = direction; + r.postConfigureSocket(); + r.setBlocking(!nonblock); + static char addrconv[INET6_ADDRSTRLEN]; + + r.remoteaddr = tmpaddr; + if (tmpaddr.sin6_family == AF_INET6){ + r.remotehost = inet_ntop(AF_INET6, &(tmpaddr.sin6_addr), addrconv, INET6_ADDRSTRLEN); + HIGH_MSG("IPv6 addr [%s]", r.remotehost.c_str()); + } + if (tmpaddr.sin6_family == AF_INET){ + r.remotehost = inet_ntop(AF_INET, &(((sockaddr_in *)&tmpaddr)->sin_addr), addrconv, INET6_ADDRSTRLEN); + HIGH_MSG("IPv4 addr [%s]", r.remotehost.c_str()); + } + INFO_MSG("Accepted a socket coming from %s", r.remotehost.c_str()); + return r; + } + + void SRTServer::setBlocking(bool blocking){conn.setBlocking(blocking);} + + bool SRTServer::isBlocking(){return (conn ? conn.isBlocking() : false);} + + void SRTServer::close(){conn.close();} + + bool SRTServer::connected() const{return conn.connected();} + + int SRTServer::getSocket(){return conn.getSocket();} + + inline int SocketOption::setSo(int socket, int proto, int sym, const void *data, size_t size, bool isSrtOpt){ + if (isSrtOpt){return srt_setsockopt(socket, 0, SRT_SOCKOPT(sym), data, (int)size);} + return ::setsockopt(socket, proto, sym, (const char *)data, (int)size); + } + + bool SocketOption::extract(const std::string &v, OptionValue &val, SRT::SockOpt::Type asType){ + switch (asType){ + case SRT::SockOpt::STRING: + val.s = v; + val.value = val.s.data(); + val.size = val.s.size(); + break; + case SRT::SockOpt::INT: + case SRT::SockOpt::INT64:{ + int64_t tmp = strtol(v.c_str(), 0, 0); + if (tmp == 0 && (!v.size() || v[0] != '0')){return false;} + if (asType == SRT::SockOpt::INT){ + val.i = tmp; + val.value = &val.i; + val.size = sizeof(val.i); + }else{ + val.l = tmp; + val.value = &val.l; + val.size = sizeof(val.l); + } + }break; + case SRT::SockOpt::BOOL:{ + bool tmp; + if (isFalseString(v)){ + tmp = true; + }else if (isTrueString(v)){ + tmp = true; + }else{ + return false; + } + val.b = tmp; + val.value = &val.b; + val.size = sizeof val.b; + }break; + case SRT::SockOpt::ENUM:{ + // Search value in the map. If found, set to o. + SockOptVals::const_iterator p = valmap.find(v); + if (p != valmap.end()){ + val.i = p->second; + val.value = &val.i; + val.size = sizeof val.i; + return true; + } + // Fallback: try interpreting it as integer. + return extract(v, val, SRT::SockOpt::INT); + } + } + + return true; + } + + bool SocketOption::apply(int socket, const std::string &value, bool isSrtOpt){ + OptionValue o; + int result = -1; + if (extract(value, o, type)){ + result = setSo(socket, protocol, symbol, o.value, o.size, isSrtOpt); + } + return result != -1; + } + + const std::map enummap_transtype; + + std::vector srtOptions(){ + + static std::map enummap_transtype; + if (!enummap_transtype.size()){ + enummap_transtype["live"] = SRTT_LIVE; + enummap_transtype["file"] = SRTT_FILE; + } + + static std::vector res; + if (res.size()){return res;} + res.push_back(SocketOption("transtype", 0, SRTO_TRANSTYPE, SRT::SockOpt::PRE, + SRT::SockOpt::ENUM, enummap_transtype)); + res.push_back(SocketOption("maxbw", 0, SRTO_MAXBW, SRT::SockOpt::PRE, SRT::SockOpt::INT64)); + res.push_back(SocketOption("pbkeylen", 0, SRTO_PBKEYLEN, SRT::SockOpt::PRE, SRT::SockOpt::INT)); + res.push_back(SocketOption("passphrase", 0, SRTO_PASSPHRASE, SRT::SockOpt::PRE, SRT::SockOpt::STRING)); + + res.push_back(SocketOption("mss", 0, SRTO_MSS, SRT::SockOpt::PRE, SRT::SockOpt::INT)); + res.push_back(SocketOption("fc", 0, SRTO_FC, SRT::SockOpt::PRE, SRT::SockOpt::INT)); + res.push_back(SocketOption("sndbuf", 0, SRTO_SNDBUF, SRT::SockOpt::PRE, SRT::SockOpt::INT)); + res.push_back(SocketOption("rcvbuf", 0, SRTO_RCVBUF, SRT::SockOpt::PRE, SRT::SockOpt::INT)); + // linger option is handled outside of the common loop, therefore commented out. + // res.push_back(SocketOption( "linger", 0, SRTO_LINGER, SRT::SockOpt::PRE, SRT::SockOpt::INT)); + res.push_back(SocketOption("ipttl", 0, SRTO_IPTTL, SRT::SockOpt::PRE, SRT::SockOpt::INT)); + res.push_back(SocketOption("iptos", 0, SRTO_IPTOS, SRT::SockOpt::PRE, SRT::SockOpt::INT)); + res.push_back(SocketOption("inputbw", 0, SRTO_INPUTBW, SRT::SockOpt::POST, SRT::SockOpt::INT64)); + res.push_back(SocketOption("oheadbw", 0, SRTO_OHEADBW, SRT::SockOpt::POST, SRT::SockOpt::INT)); + res.push_back(SocketOption("latency", 0, SRTO_LATENCY, SRT::SockOpt::PRE, SRT::SockOpt::INT)); + res.push_back(SocketOption("tsbpdmode", 0, SRTO_TSBPDMODE, SRT::SockOpt::PRE, SRT::SockOpt::BOOL)); + res.push_back(SocketOption("tlpktdrop", 0, SRTO_TLPKTDROP, SRT::SockOpt::PRE, SRT::SockOpt::BOOL)); + res.push_back(SocketOption("snddropdelay", 0, SRTO_SNDDROPDELAY, SRT::SockOpt::POST, SRT::SockOpt::INT)); + res.push_back(SocketOption("nakreport", 0, SRTO_NAKREPORT, SRT::SockOpt::PRE, SRT::SockOpt::BOOL)); + res.push_back(SocketOption("conntimeo", 0, SRTO_CONNTIMEO, SRT::SockOpt::PRE, SRT::SockOpt::INT)); + res.push_back(SocketOption("lossmaxttl", 0, SRTO_LOSSMAXTTL, SRT::SockOpt::PRE, SRT::SockOpt::INT)); + res.push_back(SocketOption("rcvlatency", 0, SRTO_RCVLATENCY, SRT::SockOpt::PRE, SRT::SockOpt::INT)); + res.push_back(SocketOption("peerlatency", 0, SRTO_PEERLATENCY, SRT::SockOpt::PRE, SRT::SockOpt::INT)); + res.push_back(SocketOption("minversion", 0, SRTO_MINVERSION, SRT::SockOpt::PRE, SRT::SockOpt::INT)); + res.push_back(SocketOption("streamid", 0, SRTO_STREAMID, SRT::SockOpt::PRE, SRT::SockOpt::STRING)); + res.push_back(SocketOption("congestion", 0, SRTO_CONGESTION, SRT::SockOpt::PRE, SRT::SockOpt::STRING)); + res.push_back(SocketOption("messageapi", 0, SRTO_MESSAGEAPI, SRT::SockOpt::PRE, SRT::SockOpt::BOOL)); + // res.push_back(SocketOption("payloadsize", 0, SRTO_PAYLOADSIZE, SRT::SockOpt::PRE, SRT::SockOpt::INT)); + res.push_back(SocketOption("kmrefreshrate", 0, SRTO_KMREFRESHRATE, SRT::SockOpt::PRE, SRT::SockOpt::INT)); + res.push_back(SocketOption("kmpreannounce", 0, SRTO_KMPREANNOUNCE, SRT::SockOpt::PRE, SRT::SockOpt::INT)); + res.push_back(SocketOption("enforcedencryption", 0, SRTO_ENFORCEDENCRYPTION, SRT::SockOpt::PRE, + SRT::SockOpt::BOOL)); + res.push_back(SocketOption("peeridletimeo", 0, SRTO_PEERIDLETIMEO, SRT::SockOpt::PRE, SRT::SockOpt::INT)); + res.push_back(SocketOption("packetfilter", 0, SRTO_PACKETFILTER, SRT::SockOpt::PRE, SRT::SockOpt::STRING)); + // res.push_back(SocketOption( "groupconnect", 0, SRTO_GROUPCONNECT, SRT::SockOpt::PRE, SRT::SockOpt::INT)); + // res.push_back(SocketOption( "groupstabtimeo", 0, SRTO_GROUPSTABTIMEO, SRT::SockOpt::PRE, SRT::SockOpt::INT)); + return res; + } +}// namespace Socket diff --git a/lib/socket_srt.h b/lib/socket_srt.h new file mode 100644 index 00000000..7995d938 --- /dev/null +++ b/lib/socket_srt.h @@ -0,0 +1,154 @@ +#pragma once + +#include "socket.h" +#include "url.h" + +#include +#include + +#include + +typedef std::map SockOptVals; +typedef std::map paramList; + +namespace Socket{ + std::string interpretSRTMode(const HTTP::URL &u); + + inline bool isFalseString(const std::string &_val){ + return _val == "0" || _val == "no" || _val == "off" || _val == "false"; + } + + inline bool isTrueString(const std::string &_val){ + return _val == "1" || _val == "yes" || _val == "on" || _val == "true"; + } + + sockaddr_in createInetAddr(const std::string &_host, int _port); + + namespace SRT{ + extern bool isInited; + bool libraryInit(); + bool libraryCleanup(); + + // By absence of class enum (c++11), moved enums to a separate namespace + namespace SockOpt{ + enum Type{STRING = 0, INT, INT64, BOOL, ENUM}; + enum Binding{PRE = 0, POST}; + }// namespace SockOpt + }// namespace SRT + + class SRTConnection{ + public: + SRTConnection(); + SRTConnection(SRTSOCKET alreadyConnected){sock = alreadyConnected;} + SRTConnection(const std::string &_host, int _port, const std::string &_direction = "input", + const paramList &_params = paramList()); + + void connect(const std::string &_host, int _port, const std::string &_direction = "input", + const paramList &_params = paramList()); + void close(); + bool connected() const{return sock != -1;} + operator bool() const{return connected();} + + void setBlocking(bool blocking); ///< Set this socket to be blocking (true) or nonblocking (false). + bool isBlocking(); ///< Check if this socket is blocking (true) or nonblocking (false). + + std::string RecvNow(); + void SendNow(const std::string &data); + void SendNow(const char *data, size_t len); + + SRTSOCKET getSocket(){return sock;} + + int postConfigureSocket(); + + std::string getStreamName(); + + unsigned int connTime(); + uint64_t dataUp(); + uint64_t dataDown(); + uint64_t packetCount(); + uint64_t packetLostCount(); + uint64_t packetRetransmitCount(); + + std::string direction; + + struct sockaddr_in6 remoteaddr; + std::string remotehost; + + private: + SRTSOCKET sock; + CBytePerfMon performanceMonitor; + + std::string host; + int outgoing_port; + int32_t prev_pktseq; + + uint32_t chunkTransmitSize; + + // From paramaeter parsing + std::string adapter; + std::string modeName; + int timeout; + bool tsbpdMode; + paramList params; + + void initializeEmpty(); + void handleConnectionParameters(const std::string &_host, const paramList &_params); + int preConfigureSocket(); + std::string configureSocketLoop(SRT::SockOpt::Binding _binding); + void setupAdapter(const std::string &_host, int _port); + + bool blocking; + }; + + /// This class is for easily setting up listening socket, either TCP or Unix. + class SRTServer{ + public: + SRTServer(); + SRTServer(int existingSock); + SRTServer(int port, std::string hostname, bool nonblock = false, const std::string &_direction = "input"); + SRTConnection accept(bool nonblock = false, const std::string &direction = "input"); + void setBlocking(bool blocking); + bool connected() const; + bool isBlocking(); + void close(); + int getSocket(); + + private: + SRTConnection conn; + std::string direction; + }; + + struct OptionValue{ + std::string s; + int i; + int64_t l; + bool b; + + const void *value; + size_t size; + }; + + class SocketOption{ + public: + //{"enforcedencryption", 0, SRTO_ENFORCEDENCRYPTION, SRT::SockOpt::PRE, SRT::SockOpt::BOOL, nullptr}, + SocketOption(const std::string &_name, int _protocol, int _symbol, SRT::SockOpt::Binding _binding, + SRT::SockOpt::Type _type, const SockOptVals &_values = SockOptVals()) + : name(_name), protocol(_protocol), symbol(_symbol), binding(_binding), type(_type), + valmap(_values){}; + + std::string name; + int protocol; + int symbol; + SRT::SockOpt::Binding binding; + SRT::SockOpt::Type type; + SockOptVals valmap; + + bool apply(int socket, const std::string &value, bool isSrtOpt = true); + + static int setSo(int socket, int protocol, int symbol, const void *data, size_t size, bool isSrtOpt = true); + + bool extract(const std::string &v, OptionValue &val, SRT::SockOpt::Type asType); + }; + + std::vector srtOptions(); +}// namespace Socket diff --git a/lib/ts_stream.cpp b/lib/ts_stream.cpp index 506c0613..90d45957 100644 --- a/lib/ts_stream.cpp +++ b/lib/ts_stream.cpp @@ -1015,7 +1015,8 @@ namespace TS{ ((adtsInfo[it->first].getFrequencyIndex() & 0x0E) >> 1); init[1] = ((adtsInfo[it->first].getFrequencyIndex() & 0x01) << 7) | ((adtsInfo[it->first].getChannelConfig() & 0x0F) << 3); - + // Wait with adding the track until we have init data + if (init[0] == 0 && init[1] == 0){addNewTrack = false;} type = "audio"; codec = "AAC"; size = 16; diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index 23f59f8d..f9f579db 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -32,6 +32,9 @@ #define STAT_CLI_BPS_UP 256 #define STAT_CLI_CRC 512 #define STAT_CLI_SESSID 1024 +#define STAT_CLI_PKTCOUNT 2048 +#define STAT_CLI_PKTLOST 4096 +#define STAT_CLI_PKTRETRANSMIT 8192 #define STAT_CLI_ALL 0xFFFF // These are used to store "totals" field requests in a bitfield for speedup. #define STAT_TOT_CLIENTS 1 @@ -713,6 +716,9 @@ void Controller::statSession::wipeOld(uint64_t cutOff){ if (it->log.size() == 1){ wipedDown += it->log.begin()->second.down; wipedUp += it->log.begin()->second.up; + wipedPktCount += it->log.begin()->second.pktCount; + wipedPktLost += it->log.begin()->second.pktCount; + wipedPktRetransmit += it->log.begin()->second.pktRetransmit; } it->log.erase(it->log.begin()); } @@ -800,6 +806,9 @@ void Controller::statSession::dropSession(const Controller::sessIndex &index){ lastSec = 0; wipedUp = 0; wipedDown = 0; + wipedPktCount = 0; + wipedPktLost = 0; + wipedPktRetransmit = 0; oldConns.clear(); sessionType = SESS_UNSET; } @@ -819,6 +828,9 @@ Controller::statSession::statSession(){ sync = 1; wipedUp = 0; wipedDown = 0; + wipedPktCount = 0; + wipedPktLost = 0; + wipedPktRetransmit = 0; sessionType = SESS_UNSET; noBWCount = 0; } @@ -1014,6 +1026,97 @@ uint64_t Controller::statSession::getUp(){ return retVal; } +uint64_t Controller::statSession::getPktCount(uint64_t t){ + uint64_t retVal = wipedPktCount; + if (oldConns.size()){ + for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ + if (it->hasDataFor(t)){retVal += it->getDataFor(t).pktCount;} + } + } + if (curConns.size()){ + for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ + if (it->second.hasDataFor(t)){retVal += it->second.getDataFor(t).pktCount;} + } + } + return retVal; +} + +/// Returns the cumulative uploaded bytes for this session at timestamp t. +uint64_t Controller::statSession::getPktCount(){ + uint64_t retVal = wipedPktCount; + if (oldConns.size()){ + for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ + if (it->log.size()){retVal += it->log.rbegin()->second.pktCount;} + } + } + if (curConns.size()){ + for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ + if (it->second.log.size()){retVal += it->second.log.rbegin()->second.pktCount;} + } + } + return retVal; +} +uint64_t Controller::statSession::getPktLost(uint64_t t){ + uint64_t retVal = wipedPktLost; + if (oldConns.size()){ + for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ + if (it->hasDataFor(t)){retVal += it->getDataFor(t).pktLost;} + } + } + if (curConns.size()){ + for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ + if (it->second.hasDataFor(t)){retVal += it->second.getDataFor(t).pktLost;} + } + } + return retVal; +} + +/// Returns the cumulative uploaded bytes for this session at timestamp t. +uint64_t Controller::statSession::getPktLost(){ + uint64_t retVal = wipedPktLost; + if (oldConns.size()){ + for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ + if (it->log.size()){retVal += it->log.rbegin()->second.pktLost;} + } + } + if (curConns.size()){ + for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ + if (it->second.log.size()){retVal += it->second.log.rbegin()->second.pktLost;} + } + } + return retVal; +} +uint64_t Controller::statSession::getPktRetransmit(uint64_t t){ + uint64_t retVal = wipedPktRetransmit; + if (oldConns.size()){ + for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ + if (it->hasDataFor(t)){retVal += it->getDataFor(t).pktRetransmit;} + } + } + if (curConns.size()){ + for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ + if (it->second.hasDataFor(t)){retVal += it->second.getDataFor(t).pktRetransmit;} + } + } + return retVal; +} + +/// Returns the cumulative uploaded bytes for this session at timestamp t. +uint64_t Controller::statSession::getPktRetransmit(){ + uint64_t retVal = wipedPktRetransmit; + if (oldConns.size()){ + for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ + if (it->log.size()){retVal += it->log.rbegin()->second.pktRetransmit;} + } + } + if (curConns.size()){ + for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ + if (it->second.log.size()){retVal += it->second.log.rbegin()->second.pktRetransmit;} + } + } + return retVal; +} + /// Returns the cumulative downloaded bytes per second for this session at timestamp t. uint64_t Controller::statSession::getBpsDown(uint64_t t){ uint64_t aTime = t - 5; @@ -1048,6 +1151,9 @@ Controller::statLog &Controller::statStorage::getDataFor(unsigned long long t){ empty.lastSecond = 0; empty.down = 0; empty.up = 0; + empty.pktCount = 0; + empty.pktLost = 0; + empty.pktRetransmit = 0; return empty; } std::map::iterator it = log.upper_bound(t); @@ -1063,6 +1169,9 @@ void Controller::statStorage::update(Comms::Statistics &statComm, size_t index){ tmp.lastSecond = statComm.getLastSecond(index); tmp.down = statComm.getDown(index); tmp.up = statComm.getUp(index); + tmp.pktCount = statComm.getPacketCount(index); + tmp.pktLost = statComm.getPacketLostCount(index); + tmp.pktRetransmit = statComm.getPacketRetransmitCount(index); log[statComm.getNow(index)] = tmp; // wipe data older than approx. STAT_CUTOFF seconds /// \todo Remove least interesting data first. @@ -1132,7 +1241,7 @@ bool Controller::hasViewers(std::string streamName){ /// //array of protocols to accumulate. Empty means all. /// "protocols": ["HLS", "HSS"], /// //list of requested data fields. Empty means all. -/// "fields": ["host", "stream", "protocol", "conntime", "position", "down", "up", "downbps", "upbps"], +/// "fields": ["host", "stream", "protocol", "conntime", "position", "down", "up", "downbps", "upbps","pktcount","pktlost","pktretransmit"], /// //unix timestamp of measuring moment. Negative means X seconds ago. Empty means now. /// "time": 1234567 ///} @@ -1186,6 +1295,9 @@ void Controller::fillClients(JSON::Value &req, JSON::Value &rep){ if ((*it).asStringRef() == "downbps"){fields |= STAT_CLI_BPS_DOWN;} if ((*it).asStringRef() == "upbps"){fields |= STAT_CLI_BPS_UP;} if ((*it).asStringRef() == "sessid"){fields |= STAT_CLI_SESSID;} + if ((*it).asStringRef() == "pktcount"){fields |= STAT_CLI_PKTCOUNT;} + if ((*it).asStringRef() == "pktlost"){fields |= STAT_CLI_PKTLOST;} + if ((*it).asStringRef() == "pktretransmit"){fields |= STAT_CLI_PKTRETRANSMIT;} } } // select all, if none selected @@ -1213,6 +1325,9 @@ void Controller::fillClients(JSON::Value &req, JSON::Value &rep){ if (fields & STAT_CLI_BPS_UP){rep["fields"].append("upbps");} if (fields & STAT_CLI_CRC){rep["fields"].append("crc");} if (fields & STAT_CLI_SESSID){rep["fields"].append("sessid");} + if (fields & STAT_CLI_PKTCOUNT){rep["fields"].append("pktcount");} + if (fields & STAT_CLI_PKTLOST){rep["fields"].append("pktlost");} + if (fields & STAT_CLI_PKTRETRANSMIT){rep["fields"].append("pktretransmit");} // output the data itself rep["data"].null(); // loop over all sessions @@ -1237,6 +1352,9 @@ void Controller::fillClients(JSON::Value &req, JSON::Value &rep){ if (fields & STAT_CLI_BPS_UP){d.append(it->second.getBpsUp(time));} if (fields & STAT_CLI_CRC){d.append(it->first.crc);} if (fields & STAT_CLI_SESSID){d.append(it->first.ID);} + if (fields & STAT_CLI_PKTCOUNT){d.append(it->second.getPktCount(time));} + if (fields & STAT_CLI_PKTLOST){d.append(it->second.getPktLost(time));} + if (fields & STAT_CLI_PKTRETRANSMIT){d.append(it->second.getPktRetransmit(time));} rep["data"].append(d); } } diff --git a/src/controller/controller_statistics.h b/src/controller/controller_statistics.h index eb224e82..0af4e1b6 100644 --- a/src/controller/controller_statistics.h +++ b/src/controller/controller_statistics.h @@ -32,6 +32,9 @@ namespace Controller{ uint64_t lastSecond; uint64_t down; uint64_t up; + uint64_t pktCount; + uint64_t pktLost; + uint64_t pktRetransmit; }; enum sessType{SESS_UNSET = 0, SESS_INPUT, SESS_OUTPUT, SESS_VIEWER}; @@ -74,6 +77,9 @@ namespace Controller{ uint64_t lastSec; uint64_t wipedUp; uint64_t wipedDown; + uint64_t wipedPktCount; + uint64_t wipedPktLost; + uint64_t wipedPktRetransmit; std::deque oldConns; sessType sessionType; bool tracked; @@ -104,6 +110,12 @@ namespace Controller{ uint64_t getUp(); uint64_t getDown(); uint64_t getUp(uint64_t time); + uint64_t getPktCount(); + uint64_t getPktCount(uint64_t time); + uint64_t getPktLost(); + uint64_t getPktLost(uint64_t time); + uint64_t getPktRetransmit(); + uint64_t getPktRetransmit(uint64_t time); uint64_t getBpsDown(uint64_t time); uint64_t getBpsUp(uint64_t time); uint64_t getBpsDown(uint64_t start, uint64_t end); diff --git a/src/input/input.cpp b/src/input/input.cpp index ce50bc78..28055928 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -679,12 +679,16 @@ namespace Mist{ // if not shutting down, wait 1 second before looping if (config->is_active){Util::wait(INPUT_USER_INTERVAL);} } - if (streamStatus){streamStatus.mapped[0] = STRMSTAT_SHUTDOWN;} - config->is_active = false; + if (!isThread()){ + if (streamStatus){streamStatus.mapped[0] = STRMSTAT_SHUTDOWN;} + config->is_active = false; + } finish(); INFO_MSG("Input closing clean, reason: %s", Util::exitReason); userSelect.clear(); - if (streamStatus){streamStatus.mapped[0] = STRMSTAT_OFF;} + if (!isThread()){ + if (streamStatus){streamStatus.mapped[0] = STRMSTAT_OFF;} + } } /// This function checks if an input in serve mode should keep running or not. @@ -729,7 +733,6 @@ namespace Mist{ /// - if there are tracks, register as a non-viewer on the user page of the buffer /// - call getNext() in a loop, buffering packets void Input::stream(){ - std::map overrides; overrides["throughboot"] = ""; if (config->getBool("realtime") || @@ -895,6 +898,7 @@ namespace Mist{ statComm.setTime(now - startTime); statComm.setLastSecond(0); statComm.setHost(getConnectedBinHost()); + handleLossyStats(statComm); } statTimer = Util::bootSecs(); diff --git a/src/input/input.h b/src/input/input.h index e0957fb8..d8aa7928 100644 --- a/src/input/input.h +++ b/src/input/input.h @@ -41,6 +41,7 @@ namespace Mist{ virtual bool readHeader(); virtual bool needHeader(){return !readExistingHeader();} virtual bool preRun(){return true;} + virtual bool isThread(){return false;} virtual bool isSingular(){return !config->getBool("realtime");} virtual bool readExistingHeader(); virtual bool atKeyFrame(); @@ -69,6 +70,10 @@ namespace Mist{ virtual void userOnDisconnect(size_t id); virtual void userLeadOut(); + virtual void handleLossyStats(Comms::Statistics & statComm){} + + virtual bool preventBufferStart() {return false;} + virtual void parseHeader(); bool bufferFrame(size_t track, uint32_t keyNum); diff --git a/src/input/input_tssrt.cpp b/src/input/input_tssrt.cpp new file mode 100644 index 00000000..ab42c036 --- /dev/null +++ b/src/input/input_tssrt.cpp @@ -0,0 +1,217 @@ +#include "input_tssrt.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +Util::Config *cfgPointer = NULL; +std::string baseStreamName; + +/// Global, so that all tracks stay in sync +int64_t timeStampOffset = 0; + +// We use threads here for multiple input pushes, because of the internals of the SRT Library +static void callThreadCallbackSRT(void *socknum){ + SRTSOCKET sock = *((SRTSOCKET *)socknum); + // use the accepted socket as the second parameter + Mist::inputTSSRT inp(cfgPointer, sock); + inp.setSingular(false); + inp.run(); +} + +namespace Mist{ + /// Constructor of TS Input + /// \arg cfg Util::Config that contains all current configurations. + inputTSSRT::inputTSSRT(Util::Config *cfg, SRTSOCKET s) : Input(cfg){ + capa["name"] = "TSSRT"; + capa["desc"] = "This input allows for processing MPEG2-TS-based SRT streams. Use mode=listener " + "for push input."; + capa["source_match"].append("srt://*"); + // These can/may be set to always-on mode + capa["always_match"].append("srt://*"); + capa["incoming_push_url"] = "srt://$host:$port"; + capa["incoming_push_url_match"] = "srt://*"; + capa["priority"] = 9; + capa["codecs"][0u][0u].append("H264"); + capa["codecs"][0u][0u].append("HEVC"); + capa["codecs"][0u][0u].append("MPEG2"); + capa["codecs"][0u][1u].append("AAC"); + capa["codecs"][0u][1u].append("AC3"); + capa["codecs"][0u][1u].append("MP2"); + + JSON::Value option; + option["arg"] = "integer"; + option["long"] = "buffer"; + option["short"] = "b"; + option["help"] = "DVR buffer time in ms"; + option["value"].append(50000); + config->addOption("bufferTime", option); + capa["optional"]["DVR"]["name"] = "Buffer time (ms)"; + capa["optional"]["DVR"]["help"] = + "The target available buffer time for this live stream, in milliseconds. This is the time " + "available to seek around in, and will automatically be extended to fit whole keyframes as " + "well as the minimum duration needed for stable playback."; + capa["optional"]["DVR"]["option"] = "--buffer"; + capa["optional"]["DVR"]["type"] = "uint"; + capa["optional"]["DVR"]["default"] = 50000; + + // Setup if we are called form with a thread for push-based input. + if (s != -1){ + srtConn = Socket::SRTConnection(s); + streamName = baseStreamName; + if (srtConn.getStreamName() != ""){streamName += "+" + srtConn.getStreamName();} + } + lastTimeStamp = 0; + singularFlag = true; + } + + inputTSSRT::~inputTSSRT(){} + + bool inputTSSRT::checkArguments(){return true;} + + /// Live Setup of SRT Input. Runs only if we are the "main" thread + bool inputTSSRT::preRun(){ + if (srtConn.getSocket() == -1){ + std::string source = config->getString("input"); + standAlone = false; + HTTP::URL u(source); + INFO_MSG("Parsed url: %s", u.getUrl().c_str()); + if (Socket::interpretSRTMode(u) == "listener"){ + sSock = Socket::SRTServer(u.getPort(), u.host, false); + config->registerSRTSockPtr(&sSock); + }else{ + INFO_MSG("A"); + std::map arguments; + HTTP::parseVars(u.args, arguments); + size_t connectCnt = 0; + do{ + srtConn.connect(u.host, u.getPort(), "input", arguments); + if (!srtConn){Util::sleep(1000);} + ++connectCnt; + }while (!srtConn && connectCnt < 10); + if (!srtConn){WARN_MSG("Connecting to %s timed out", u.getUrl().c_str());} + } + } + return true; + } + + // Retrieve the next packet to be played from the srt connection. + void inputTSSRT::getNext(size_t idx){ + thisPacket.null(); + bool hasPacket = tsStream.hasPacket(); + bool firstloop = true; + while (!hasPacket && srtConn.connected() && config->is_active){ + firstloop = false; + // Receive data from the socket. SRT Sockets handle some internal timing as well, based on the provided settings. + leftBuffer.append(srtConn.RecvNow()); + if (leftBuffer.size()){ + size_t offset = 0; + size_t garbage = 0; + while ((offset + 188) < leftBuffer.size()){ + if (leftBuffer[offset] != 0x47){ + ++garbage; + if (garbage % 100 == 0){INFO_MSG("Accumulated %zu bytes of garbage", garbage);} + ++offset; + continue; + } + if (garbage != 0){ + WARN_MSG("Thrown away %zu bytes of garbage data", garbage); + garbage = 0; + } + if (offset + 188 <= leftBuffer.size()){ + tsBuf.FromPointer(leftBuffer.data() + offset); + tsStream.parse(tsBuf, 0); + offset += 188; + } + } + leftBuffer.erase(0, offset); + hasPacket = tsStream.hasPacket(); + }else if (srtConn.connected()){ + // This should not happen as the SRT socket is read blocking and won't return until there is + // data. But if it does, wait before retry + Util::sleep(10); + } + } + if (hasPacket){tsStream.getEarliestPacket(thisPacket);} + + if (!thisPacket){ + INFO_MSG("Could not getNext TS packet!"); + return; + } + + tsStream.initializeMetadata(meta); + size_t thisIdx = M.trackIDToIndex(thisPacket.getTrackId(), getpid()); + if (thisIdx == INVALID_TRACK_ID){getNext(idx);} + + uint64_t adjustTime = thisPacket.getTime() + timeStampOffset; + if (lastTimeStamp || timeStampOffset){ + if (lastTimeStamp + 5000 < adjustTime || lastTimeStamp > adjustTime + 5000){ + INFO_MSG("Timestamp jump " PRETTY_PRINT_MSTIME " -> " PRETTY_PRINT_MSTIME ", compensating.", + PRETTY_ARG_MSTIME(lastTimeStamp), PRETTY_ARG_MSTIME(adjustTime)); + timeStampOffset += (lastTimeStamp - adjustTime); + adjustTime = thisPacket.getTime() + timeStampOffset; + } + } + lastTimeStamp = adjustTime; + thisPacket.setTime(adjustTime); + } + + bool inputTSSRT::openStreamSource(){return true;} + + void inputTSSRT::parseStreamHeader(){ + // Placeholder empty track to force normal code to continue despite no tracks available + tmpIdx = meta.addTrack(0, 0, 0, 0); + } + + void inputTSSRT::streamMainLoop(){ + meta.removeTrack(tmpIdx); + // If we do not have a srtConn here, we are the main thread and should start accepting pushes. + if (srtConn.getSocket() == -1){ + cfgPointer = config; + baseStreamName = streamName; + while (config->is_active && sSock.connected()){ + Socket::SRTConnection S = sSock.accept(); + if (S.connected()){// check if the new connection is valid + SRTSOCKET sock = S.getSocket(); + // spawn a new thread for this connection + tthread::thread T(callThreadCallbackSRT, (void *)&sock); + // detach it, no need to keep track of it anymore + T.detach(); + HIGH_MSG("Spawned new thread for socket %i", S.getSocket()); + } + } + return; + } + // If we are here: we have a proper connection (either accepted or pull input) and should start parsing it as such + Input::streamMainLoop(); + } + + bool inputTSSRT::needsLock(){return false;} + + void inputTSSRT::setSingular(bool newSingular){singularFlag = newSingular;} + + void inputTSSRT::handleLossyStats(Comms::Statistics &statComm){ + statComm.setPacketCount(srtConn.packetCount()); + statComm.setPacketLostCount(srtConn.packetLostCount()); + statComm.setPacketRetransmitCount(srtConn.packetRetransmitCount()); + } + +}// namespace Mist diff --git a/src/input/input_tssrt.h b/src/input/input_tssrt.h new file mode 100644 index 00000000..5af13ba7 --- /dev/null +++ b/src/input/input_tssrt.h @@ -0,0 +1,48 @@ +#include "input.h" +#include +#include +#include +#include +#include +#include +#include + +namespace Mist{ + /// This class contains all functions needed to implement TS Input + class inputTSSRT : public Input{ + public: + inputTSSRT(Util::Config *cfg, SRTSOCKET s = -1); + ~inputTSSRT(); + void setSingular(bool newSingular); + virtual bool needsLock(); + + protected: + // Private Functions + bool checkArguments(); + bool preRun(); + virtual void getNext(size_t idx = INVALID_TRACK_ID); + virtual bool needHeader(){return false;} + virtual bool preventBufferStart(){return srtConn.getSocket() == -1;} + virtual bool isSingular(){return singularFlag;} + virtual bool isThread(){return !singularFlag;} + + bool openStreamSource(); + void parseStreamHeader(); + void streamMainLoop(); + TS::Stream tsStream; ///< Used for parsing the incoming ts stream + TS::Packet tsBuf; + std::string leftBuffer; + uint64_t lastTimeStamp; + + Socket::SRTServer sSock; + Socket::SRTConnection srtConn; + bool singularFlag; + size_t tmpIdx; + virtual size_t streamByteCount(){ + return srtConn.dataDown(); + }; // For live streams: to update the stats with correct values. + virtual void handleLossyStats(Comms::Statistics &statComm); + }; +}// namespace Mist + +typedef Mist::inputTSSRT mistIn; diff --git a/src/output/mist_out_srt.cpp b/src/output/mist_out_srt.cpp new file mode 100644 index 00000000..ea73cf8d --- /dev/null +++ b/src/output/mist_out_srt.cpp @@ -0,0 +1,58 @@ +#include OUTPUTTYPE +#include +#include +#include +#include +#include + +int spawnForked(Socket::SRTConnection &S){ + int fds[2]; + pipe(fds); + Socket::Connection Sconn(fds[0], fds[1]); + + mistOut tmp(Sconn, S.getSocket()); + return tmp.run(); +} + +void handleUSR1(int signum, siginfo_t *sigInfo, void *ignore){ + HIGH_MSG("USR1 received - triggering rolling restart"); + Util::Config::is_restarting = true; + Util::logExitReason("signal USR1"); + Util::Config::is_active = false; +} + +int main(int argc, char *argv[]){ + DTSC::trackValidMask = TRACK_VALID_EXT_HUMAN; + Util::redirectLogsIfNeeded(); + Util::Config conf(argv[0]); + mistOut::init(&conf); + if (conf.parseArgs(argc, argv)){ + if (conf.getBool("json")){ + mistOut::capa["version"] = PACKAGE_VERSION; + std::cout << mistOut::capa.toString() << std::endl; + return -1; + } + conf.activate(); + if (mistOut::listenMode()){ + { + struct sigaction new_action; + new_action.sa_sigaction = handleUSR1; + sigemptyset(&new_action.sa_mask); + new_action.sa_flags = 0; + sigaction(SIGUSR1, &new_action, NULL); + } + mistOut::listener(conf, spawnForked); + if (conf.is_restarting && Socket::checkTrueSocket(0)){ + INFO_MSG("Reloading input while re-using server socket"); + execvp(argv[0], argv); + FAIL_MSG("Error reloading: %s", strerror(errno)); + } + }else{ + Socket::Connection S(fileno(stdout), fileno(stdin)); + mistOut tmp(S, -1); + return tmp.run(); + } + } + INFO_MSG("Exit reason: %s", Util::exitReason); + return 0; +} diff --git a/src/output/output_tssrt.cpp b/src/output/output_tssrt.cpp new file mode 100644 index 00000000..a18c9a4f --- /dev/null +++ b/src/output/output_tssrt.cpp @@ -0,0 +1,112 @@ +#include "mist/socket_srt.h" +#include "output_tssrt.h" +#include +#include +#include + +namespace Mist{ + OutTSSRT::OutTSSRT(Socket::Connection &conn, SRTSOCKET _srtSock) : TSOutput(conn){ + // NOTE: conn is useless for SRT, as it uses a different socket type. + sendRepeatingHeaders = 500; // PAT/PMT every 500ms (DVB spec) + streamName = config->getString("streamname"); + pushOut = false; + std::string tracks; + // Push output configuration + if (config->getString("target").size()){ + HTTP::URL target(config->getString("target")); + if (target.protocol != "srt"){ + FAIL_MSG("Target %s must begin with srt://, aborting", target.getUrl().c_str()); + onFail("Invalid srt target: doesn't start with srt://", true); + return; + } + if (!target.getPort()){ + FAIL_MSG("Target %s must contain a port, aborting", target.getUrl().c_str()); + onFail("Invalid srt target: missing port", true); + return; + } + pushOut = true; + if (targetParams.count("tracks")){tracks = targetParams["tracks"];} + size_t connectCnt = 0; + do{ + srtConn.connect(target.host, target.getPort(), "output"); + if (!srtConn){Util::sleep(1000);} + ++connectCnt; + }while (!srtConn && connectCnt < 10); + wantRequest = false; + parseData = true; + }else{ + // Pull output configuration, In this case we have an srt connection in the second constructor parameter. + srtConn = Socket::SRTConnection(_srtSock); + parseData = true; + wantRequest = false; + + // Handle override / append of streamname options + std::string sName = srtConn.getStreamName(); + if (sName != ""){ + streamName = sName; + HIGH_MSG("Requesting stream %s", streamName.c_str()); + } + } + initialize(); + } + + OutTSSRT::~OutTSSRT(){} + + void OutTSSRT::init(Util::Config *cfg){ + Output::init(cfg); + capa["name"] = "TSSRT"; + capa["friendly"] = "TS over SRT"; + capa["desc"] = "Real time streaming of TS data over SRT"; + capa["deps"] = ""; + capa["required"]["streamname"]["name"] = "Stream"; + capa["required"]["streamname"]["help"] = "What streamname to serve. For multiple streams, add " + "this protocol multiple times using different ports, " + "or use the streamid option on the srt connection"; + capa["required"]["streamname"]["type"] = "str"; + capa["required"]["streamname"]["option"] = "--stream"; + capa["required"]["streamname"]["short"] = "s"; + capa["codecs"][0u][0u].append("HEVC"); + capa["codecs"][0u][0u].append("H264"); + capa["codecs"][0u][0u].append("MPEG2"); + capa["codecs"][0u][1u].append("AAC"); + capa["codecs"][0u][1u].append("MP3"); + capa["codecs"][0u][1u].append("AC3"); + capa["codecs"][0u][1u].append("MP2"); + cfg->addConnectorOptions(8889, capa); + config = cfg; + capa["push_urls"].append("srt://*"); + + JSON::Value opt; + opt["arg"] = "string"; + opt["default"] = ""; + opt["arg_num"] = 1; + opt["help"] = "Target srt:// URL to push out towards."; + cfg->addOption("target", opt); + } + + // Buffer internally in the class, and send once we have over 1000 bytes of data. + void OutTSSRT::sendTS(const char *tsData, size_t len){ + if (packetBuffer.size() >= 1000){ + srtConn.SendNow(packetBuffer); + if (!srtConn){ + // Allow for proper disconnect + parseData = false; + } + packetBuffer.clear(); + } + packetBuffer.append(tsData, len); + } + + bool OutTSSRT::setAlternateConnectionStats(Comms::Statistics &statComm){ + statComm.setUp(srtConn.dataUp()); + statComm.setDown(srtConn.dataDown()); + statComm.setTime(Util::bootSecs() - srtConn.connTime()); + return true; + } + + void OutTSSRT::handleLossyStats(Comms::Statistics &statComm){ + statComm.setPacketCount(srtConn.packetCount()); + statComm.setPacketLostCount(srtConn.packetLostCount()); + statComm.setPacketRetransmitCount(srtConn.packetRetransmitCount()); + } +}// namespace Mist diff --git a/src/output/output_tssrt.h b/src/output/output_tssrt.h new file mode 100644 index 00000000..3f671dbf --- /dev/null +++ b/src/output/output_tssrt.h @@ -0,0 +1,33 @@ +#include "output_ts_base.h" +#include + +#include + +namespace Mist{ + class OutTSSRT : public TSOutput{ + public: + OutTSSRT(Socket::Connection &conn, SRTSOCKET _srtSock); + ~OutTSSRT(); + + static bool listenMode(){return !(config->getString("target").size());} + + static void init(Util::Config *cfg); + void sendTS(const char *tsData, size_t len = 188); + bool isReadyForPlay(){return true;} + + protected: + // Stats handling + virtual bool setAlternateConnectionStats(Comms::Statistics &statComm); + virtual void handleLossyStats(Comms::Statistics &statComm); + + private: + bool pushOut; + std::string packetBuffer; + Socket::UDPConnection pushSock; + TS::Stream tsIn; + + Socket::SRTConnection srtConn; + }; +}// namespace Mist + +typedef Mist::OutTSSRT mistOut;