From 0bd5d742f6f2eb422f07f05eb3a436b16674415a Mon Sep 17 00:00:00 2001 From: Thulinma Date: Fri, 28 Aug 2020 00:42:38 +0200 Subject: [PATCH] SRT improvements: - Made SRT support optional - Make build options visible in cmake-gui - Improved generic connection stats for outputs - Added streamid handling configuration for MistInTSSRT - Push input support over SRT - Fixed support for SRT settings in push outputs - Fix parsing of SRT-passed stream names - Fixed hostnames in MistOutTSSRT, fixed PUSH_REWRITE trigger payload - Opus support in TS-SRT - Fixed SRT socket stats, fixed SRT socket address logic, improved SRT socket rolling restart support - Fixed SRT push deny --- CMakeLists.txt | 150 +++++++++++++++-------- lib/config.cpp | 75 ------------ lib/config.h | 5 - lib/socket_srt.cpp | 149 +++++++++++++++++------ lib/socket_srt.h | 30 ++--- lib/ts_stream.cpp | 45 +++++++ lib/ts_stream.h | 9 ++ src/input/input.cpp | 11 +- src/input/input.h | 6 +- src/input/input_ts.cpp | 44 +------ src/input/input_ts.h | 2 +- src/input/input_tssrt.cpp | 116 ++++++++++++------ src/input/input_tssrt.h | 16 +-- src/output/mist_out_srt.cpp | 100 ++++++++++++--- src/output/output.cpp | 10 +- src/output/output.h | 10 +- src/output/output_httpts.cpp | 6 +- src/output/output_tssrt.cpp | 230 +++++++++++++++++++++++++++++------ src/output/output_tssrt.h | 19 +-- 19 files changed, 686 insertions(+), 347 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 154db578..3387afdb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -17,6 +17,8 @@ set( CMAKE_EXPORT_COMPILE_COMMANDS ON ) #For YCM support include_directories(${SOURCE_DIR}) include_directories(${BINARY_DIR} ${BINARY_DIR}/generated) +option(BUILD_SHARED_LIBS "Build the libraries as shared (default = static)") + ######################################## # Testing - Enable Tests # ######################################## @@ -59,80 +61,88 @@ string(STRIP "${PACKAGE_VERSION_RAW}" PACKAGE_VERSION) set(PACKAGE_VERSION \"${PACKAGE_VERSION}\" ) ######################################## -# Build Variables - Debug # +# Build Variables - Everything else # ######################################## if (NOT DEBUG) set(DEBUG 4) endif() -######################################## -# Build Variables - Shared Memory # -######################################## -if (NOT DEFINED NOSHM ) +option(NOSHM "Disabled shared memory (falling back to shared temporary files)") +if (NOT NOSHM) add_definitions(-DSHM_ENABLED=1) +else() + message("Shared memory use is turned OFF") endif() - if (NOT DEFINED FILLER_DATA OR NOT DEFINED SHARED_SECRET OR NOT DEFINED SUPER_SECRET)#LTS message(WARNING "Not all LTS variables have been set and this is an LTS build - are you sure about this?")#LTS endif()#LTS add_definitions(-DFILLER_DATA="${FILLER_DATA}" -DSHARED_SECRET="${SHARED_SECRET}" -DSUPER_SECRET="${SUPER_SECRET}")#LTS -if (DEFINED GEOIP ) + +option(GEOIP "Enable GeoIP capabilities (deprecated)") +if (GEOIP) add_definitions(-DGEOIP=1) + message("GeoIP is turned ON") endif() -if (DEFINED BIGMETA ) - add_definitions(-DBIGMETA=1) -endif() -if (NOT DEFINED NOSSL ) + +option(NOSSL "Disable SSL/TLS support") +if (NOT NOSSL) add_definitions(-DSSL=1) +else() + message("SSL/TLS support is turned OFF") endif() + if (DEFINED DATASIZE ) add_definitions(-DSHM_DATASIZE=${DATASIZE}) endif() + if (DEFINED STAT_CUTOFF ) add_definitions(-DSTAT_CUTOFF=${STAT_CUTOFF}) endif() -if (NOT DEFINED NOUPDATE ) + +option(NOUPDATE "Disable the updater") +if (NOT NOUPDATE) add_definitions(-DUPDATER=1) endif() -if (NOT DEFINED PERPETUAL ) + +option(PERPETUAL "Disable the licensing system") +if (NOT PERPETUAL) add_definitions(-DLICENSING=1) endif() -if (DEFINED NOAUTH ) + +option(NOAUTH "Disable API authentication entirely (insecure!)") +if (NOAUTH) add_definitions(-DNOAUTH=1) endif() -if (DEFINED KILLONEXIT ) + +option(KILLONEXIT "Kill all processes on exit, ensuring nothing is running anymore (disables rolling restart/update support)") +if (KILLONEXIT) add_definitions(-DKILLONEXIT=true) endif() + if (DEFINED UDP_API_HOST ) add_definitions(-DUDP_API_HOST=${UDP_API_HOST}) endif() + if (DEFINED UDP_API_PORT ) add_definitions(-DUDP_API_PORT=${UDP_API_PORT}) endif() -if (NOT DEFINED APPNAME ) - set(APPNAME "MistServer") -endif() + +set(APPNAME "MistServer" CACHE STRING "Name of the application, as used in user agent strings and the like") add_definitions(-DAPPNAME="${APPNAME}") -######################################## -# Build Variables - Thread Names # -######################################## -if (DEFINED WITH_THREADNAMES ) +option(WITH_THREADNAMES "Enable fancy names for threads (not supported on all platforms)") +if (WITH_THREADNAMES) add_definitions(-DWITH_THREADNAMES=1) endif() -######################################## -# Build Variables - No Crash Check # -######################################## -if (DEFINED NOCRASHCHECK ) +option(NOCRASHCHECK "Disables the crash check in the controller stats and input userpages. Prevents killing processes that are stalled/stuck.") +if (NOCRASHCHECK) add_definitions(-DNOCRASHCHECK=1) endif() -######################################## -# Build Variables - Stats delay overrid# -######################################## -if (DEFINED STATS_DELAY ) + +if (DEFINED STATS_DELAY) add_definitions(-DSTATS_DELAY=${STATS_DELAY}) endif() @@ -143,6 +153,20 @@ message("Builing release ${RELEASE} for version ${PACKAGE_VERSION} @ debug level add_definitions(-g -funsigned-char -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64 -DDEBUG=${DEBUG} -DPACKAGE_VERSION=${PACKAGE_VERSION} -DRELEASE=${RELEASE}) add_definitions(-Wall -Wno-sign-compare -Wparentheses) +option(NOSRT "Disable building native SRT support, regardless of library being present (by default SRT is enabled if libraries are installed)") +if (NOT NOSRT) + find_library(SRT_LIB srt) + if(SRT_LIB) + add_definitions(-DWITH_SRT=1) + message("Building with SRT") + else() + message("Building without native SRT support") + endif() +else() + message("Building without native SRT support") +endif() + + ######################################## # MistLib - Header Files # ######################################## @@ -187,7 +211,6 @@ set(libHeaders lib/sdp_media.h lib/shared_memory.h lib/socket.h - lib/socket_srt.h lib/srtp.h lib/stream.h lib/stun.h @@ -208,6 +231,10 @@ set(libHeaders lib/urireader.h ) +if(SRT_LIB) + list(APPEND libHeaders lib/socket_srt.h) +endif() + ######################################## # MistLib - Build # ######################################## @@ -250,7 +277,6 @@ add_library (mist lib/sdp_media.cpp lib/shared_memory.cpp lib/socket.cpp - lib/socket_srt.cpp lib/srtp.cpp lib/stream.cpp lib/stun.cpp @@ -276,9 +302,8 @@ endif() target_link_libraries(mist -lpthread ${LIBRT} - -lsrt ) -if (NOT DEFINED NOSSL ) +if (NOT NOSSL) target_link_libraries(mist mbedtls mbedx509 mbedcrypto srtp2) endif() install( @@ -290,6 +315,16 @@ install( DESTINATION lib ) + +if(SRT_LIB) + add_library(mist_srt lib/socket_srt.h lib/socket_srt.cpp) + target_link_libraries(mist_srt mist srt) + install( + TARGETS mist_srt + DESTINATION lib + ) +endif() + ######################################## # MistLib - Local Header Install # ######################################## @@ -376,7 +411,8 @@ makeUtil(RAX rax) makeUtil(AMF amf) makeUtil(Certbot certbot) makeUtil(Nuke nuke) -if (DEFINED LOAD_BALANCE ) +option(LOAD_BALANCE "Build the load balancer") +if (LOAD_BALANCE) makeUtil(Load load) endif() #LTS_END @@ -400,6 +436,9 @@ macro(makeInput inputName format) src/io.cpp ${BINARY_DIR}/mist/.headers ) + if (";${ARGN};" MATCHES ";with_srt;") + target_link_libraries(MistIn${inputName} mist_srt ) + endif() #Set compile definitions unset(my_definitions) @@ -409,9 +448,7 @@ macro(makeInput inputName format) PROPERTIES COMPILE_DEFINITIONS "${my_definitions}" ) - target_link_libraries(MistIn${inputName} - mist - ) + target_link_libraries(MistIn${inputName} mist) install( TARGETS MistIn${inputName} DESTINATION bin @@ -422,7 +459,8 @@ makeInput(HLS hls) makeInput(DTSC dtsc) makeInput(MP3 mp3) makeInput(FLV flv) -if (DEFINED WITH_AV ) +option(WITH_AV "Build a generic libav-based input (not distributable!)") +if (WITH_AV) makeInput(AV av) target_link_libraries(MistInAV avformat avcodec avutil) endif() @@ -437,9 +475,11 @@ makeInput(Folder folder)#LTS makeInput(Playlist playlist)#LTS makeInput(Balancer balancer)#LTS makeInput(RTSP rtsp)#LTS - makeInput(SRT srt)#LTS -makeInput(TSSRT tssrt)#LTS + +if(SRT_LIB) + makeInput(TSSRT tssrt with_srt)#LTS +endif() ######################################## # MistServer - Outputs # @@ -454,7 +494,7 @@ macro(makeOutput outputName format) SET(tsBaseClass HTTPOutput) endif() endif() - if (";${ARGN};" MATCHES ";srt;") + if (";${ARGN};" MATCHES ";with_srt;") SET(outBaseFile src/output/mist_out_srt.cpp) endif() if (";${ARGN};" MATCHES ";ts;") @@ -476,9 +516,10 @@ macro(makeOutput outputName format) set_target_properties(MistOut${outputName} PROPERTIES COMPILE_DEFINITIONS "OUTPUTTYPE=\"output_${format}.h\";TS_BASECLASS=${tsBaseClass}" ) - target_link_libraries(MistOut${outputName} - mist - ) + if (";${ARGN};" MATCHES ";with_srt;") + target_link_libraries(MistOut${outputName} mist_srt) + endif() + target_link_libraries(MistOut${outputName} mist ) install( TARGETS MistOut${outputName} DESTINATION bin @@ -497,18 +538,20 @@ makeOutput(H264 h264 http) makeOutput(HDS hds http) makeOutput(SRT srt http) makeOutput(JSON json http) -if (DEFINED WITH_JPG ) -makeOutput(JPG jpg http jpg) +option(WITH_JPG "Build JPG thumbnailer output support") +if (WITH_JPG) + makeOutput(JPG jpg http jpg) endif() makeOutput(TS ts ts) -makeOutput(TSSRT tssrt ts srt) +if(SRT_LIB) + makeOutput(TSSRT tssrt ts with_srt) +endif() makeOutput(HTTPTS httpts http ts) makeOutput(HLS hls http ts) makeOutput(CMAF cmaf http)#LTS makeOutput(EBML ebml) makeOutput(RTSP rtsp)#LTS makeOutput(WAV wav)#LTS -makeOutput(WebRTC webrtc http)#LTS add_executable(MistProcFFMPEG ${BINARY_DIR}/mist/.headers @@ -545,11 +588,13 @@ add_executable(MistProcLivepeer ) target_link_libraries(MistProcLivepeer mist) -if (NOT DEFINED NOSSL ) +if (NOT NOSSL) makeOutput(HTTPS https)#LTS + makeOutput(WebRTC webrtc http)#LTS endif() -if (DEFINED WITH_SANITY ) +option(WITH_SANITY "Enable MistOutSanityCheck output for testing purposes") +if (WITH_SANITY) makeOutput(SanityCheck sanitycheck)#LTS endif() @@ -739,7 +784,8 @@ set(lspSOURCES ) -if (NOT DEFINED NOGA ) +option(NOGA "Disables Google Analytics entirely in the LSP") +if (NOT NOGA) list(APPEND lspSOURCES ${SOURCE_DIR}/lsp/analytics.js) endif() diff --git a/lib/config.cpp b/lib/config.cpp index 0a9b1ce5..d9a8470b 100644 --- a/lib/config.cpp +++ b/lib/config.cpp @@ -3,7 +3,6 @@ #include "config.h" #include "defines.h" -#include "lib/socket_srt.h" #include "stream.h" #include "timing.h" #include "tinythread.h" @@ -39,7 +38,6 @@ bool Util::Config::is_active = false; bool Util::Config::is_restarting = false; static Socket::Server *serv_sock_pointer = 0; -static Socket::SRTServer *serv_srt_sock_pointer = 0; ///< Holds a pointer to SRT Server, if it is connected uint32_t Util::printDebugLevel = DEBUG; std::string Util::streamName; char Util::exitReason[256] ={0}; @@ -55,13 +53,6 @@ void Util::logExitReason(const char *format, ...){ std::string Util::listenInterface; uint32_t Util::listenPort = 0; -// Sets pointer to the SRT Server, for proper cleanup later. -// -// Currently used for TSSRT Input only, as this doesn't use the config library to setup a listener -void Util::Config::registerSRTSockPtr(Socket::SRTServer *ptr){ - serv_srt_sock_pointer = ptr; -} - Util::Config::Config(){ // global options here vals["debug"]["long"] = "debug"; @@ -331,23 +322,6 @@ struct callbackData{ int (*cb)(Socket::Connection &); }; -// As above, but using an SRT Connection -struct callbackSRTData{ - Socket::SRTConnection *sock; - int (*cb)(Socket::SRTConnection &); -}; - -// Callback for SRT-serving threads -static void callThreadCallbackSRT(void *cDataArg){ - INSANE_MSG("Thread for %p started", cDataArg); - callbackSRTData *cData = (callbackSRTData *)cDataArg; - cData->cb(*(cData->sock)); - cData->sock->close(); - delete cData->sock; - delete cData; - INSANE_MSG("Thread for %p ended", cDataArg); -} - static void callThreadCallback(void *cDataArg){ INSANE_MSG("Thread for %p started", cDataArg); callbackData *cData = (callbackData *)cDataArg; @@ -430,53 +404,6 @@ int Util::Config::serveThreadedSocket(int (*callback)(Socket::Connection &)){ return r; } -// This is a THREADED server!! Fork does not work as the SRT library itself already starts up a -// thread, and forking after thread creation messes up all control flow internal to the library. -int Util::Config::serveSRTSocket(int (*callback)(Socket::SRTConnection &S)){ - Socket::SRTServer server_socket; - if (vals.isMember("port") && vals.isMember("interface")){ - server_socket = Socket::SRTServer(getInteger("port"), getString("interface"), false, "output"); - } - if (!server_socket.connected()){ - DEVEL_MSG("Failure to open socket"); - return 1; - } - serv_srt_sock_pointer = &server_socket; - activate(); - if (server_socket.getSocket()){ - int oldSock = server_socket.getSocket(); - if (!dup2(oldSock, 0)){ - server_socket = Socket::SRTServer(0); - close(oldSock); - } - } - int r = SRTServer(server_socket, callback); - serv_srt_sock_pointer = 0; - return r; -} - -int Util::Config::SRTServer(Socket::SRTServer &server_socket, int (*callback)(Socket::SRTConnection &)){ - Util::Procs::socketList.insert(server_socket.getSocket()); - while (is_active && server_socket.connected()){ - Socket::SRTConnection S = server_socket.accept(false, "output"); - if (S.connected()){// check if the new connection is valid - callbackSRTData *cData = new callbackSRTData; - cData->sock = new Socket::SRTConnection(S); - cData->cb = callback; - // spawn a new thread for this connection - tthread::thread T(callThreadCallbackSRT, (void *)cData); - // detach it, no need to keep track of it anymore - T.detach(); - HIGH_MSG("Spawned new thread for socket %i", S.getSocket()); - }else{ - Util::sleep(10); // sleep 10ms - } - } - Util::Procs::socketList.erase(server_socket.getSocket()); - if (!is_restarting){server_socket.close();} - return 0; -} - int Util::Config::serveForkedSocket(int (*callback)(Socket::Connection &S)){ Socket::Server server_socket; if (Socket::checkTrueSocket(0)){ @@ -541,8 +468,6 @@ void Util::Config::signal_handler(int signum, siginfo_t *sigInfo, void *ignore){ case SIGHUP: case SIGTERM: if (serv_sock_pointer){serv_sock_pointer->close();} - // Close the srt server as well, if set - if (serv_srt_sock_pointer){serv_srt_sock_pointer->close();} #if DEBUG >= DLVL_DEVEL static int ctr = 0; if (!is_active && ++ctr > 4){BACKTRACE;} diff --git a/lib/config.h b/lib/config.h index af1e2834..f09eb3ee 100644 --- a/lib/config.h +++ b/lib/config.h @@ -8,7 +8,6 @@ #endif #include "json.h" -#include "socket_srt.h" #include #include @@ -42,7 +41,6 @@ namespace Util{ int64_t getInteger(std::string optname); bool getBool(std::string optname); void activate(); - void registerSRTSockPtr(Socket::SRTServer *ptr); int threadServer(Socket::Server &server_socket, int (*callback)(Socket::Connection &S)); int forkServer(Socket::Server &server_socket, int (*callback)(Socket::Connection &S)); int serveThreadedSocket(int (*callback)(Socket::Connection &S)); @@ -51,9 +49,6 @@ namespace Util{ void addOptionsFromCapabilities(const JSON::Value &capabilities); void addBasicConnectorOptions(JSON::Value &capabilities); void addConnectorOptions(int port, JSON::Value &capabilities); - - int serveSRTSocket(int (*callback)(Socket::SRTConnection &S)); - int SRTServer(Socket::SRTServer &server_socket, int (*callback)(Socket::SRTConnection &S)); }; /// The interface address the current serveSocket function is listening on diff --git a/lib/socket_srt.cpp b/lib/socket_srt.cpp index fae3cdad..8562089f 100644 --- a/lib/socket_srt.cpp +++ b/lib/socket_srt.cpp @@ -1,6 +1,8 @@ #include "defines.h" #include "lib/http_parser.h" #include "socket_srt.h" +#include "json.h" +#include "timing.h" #include #include @@ -69,13 +71,20 @@ namespace Socket{ return interpretSRTMode(params.count("mode") ? params.at("mode") : "default", u.host, ""); } - SRTConnection::SRTConnection(){initializeEmpty();} + SRTConnection::SRTConnection(){ + initializeEmpty(); + lastGood = Util::bootMS(); + } SRTConnection::SRTConnection(const std::string &_host, int _port, const std::string &_direction, const std::map &_params){ connect(_host, _port, _direction, _params); } + SRTConnection::SRTConnection(SRTSOCKET alreadyConnected){ + sock = alreadyConnected; + } + std::string SRTConnection::getStreamName(){ int sNameLen = 512; char sName[sNameLen]; @@ -84,29 +93,89 @@ namespace Socket{ return ""; } - /// Updates the downbuffer internal variable. - /// Returns true if new data was received, false otherwise. - std::string SRTConnection::RecvNow(){ - char recvbuf[5000]; + std::string SRTConnection::getBinHost(){ + char tmpBuffer[17] = "\000\000\000\000\000\000\000\000\000\000\377\377\000\000\000\000"; + switch (remoteaddr.sin6_family){ + case AF_INET: + memcpy(tmpBuffer + 12, &(reinterpret_cast(&remoteaddr)->sin_addr.s_addr), 4); + break; + case AF_INET6: memcpy(tmpBuffer, &(remoteaddr.sin6_addr.s6_addr), 16); break; + default: return ""; break; + } + return std::string(tmpBuffer, 16); + } + + size_t SRTConnection::RecvNow(){ bool blockState = blocking; - setBlocking(true); + if (!blockState){setBlocking(true);} SRT_MSGCTRL mc = srt_msgctrl_default; int32_t receivedBytes = srt_recvmsg2(sock, recvbuf, 5000, &mc); - if (prev_pktseq != 0 && (mc.pktseq - prev_pktseq > 1)){WARN_MSG("Packet lost");} + //if (prev_pktseq != 0 && (mc.pktseq - prev_pktseq > 1)){WARN_MSG("Packet lost");} prev_pktseq = mc.pktseq; - setBlocking(blockState); + if (!blockState){setBlocking(blockState);} if (receivedBytes == -1){ + int err = srt_getlasterror(0); + if (err == SRT_ECONNLOST){ + close(); + return 0; + } + if (err == SRT_ENOCONN){ + if (Util::bootMS() > lastGood + 5000){ + ERROR_MSG("SRT connection timed out - closing"); + close(); + } + return 0; + } ERROR_MSG("Unable to receive data over socket: %s", srt_getlasterror_str()); if (srt_getsockstate(sock) != SRTS_CONNECTED){close();} - return ""; + return 0; + } + if (receivedBytes == 0){ + close(); + }else{ + lastGood = Util::bootMS(); } srt_bstats(sock, &performanceMonitor, false); - return std::string(recvbuf, receivedBytes); + return receivedBytes; + } + + ///Attempts a read, obeying the current blocking setting. + ///May result in socket being disconnected when connection was lost during read. + ///Returns amount of bytes actually read + size_t SRTConnection::Recv(){ + SRT_MSGCTRL mc = srt_msgctrl_default; + int32_t receivedBytes = srt_recvmsg2(sock, recvbuf, 5000, &mc); + prev_pktseq = mc.pktseq; + if (receivedBytes == -1){ + int err = srt_getlasterror(0); + if (err == SRT_EASYNCRCV){return 0;} + if (err == SRT_ECONNLOST){ + close(); + return 0; + } + if (err == SRT_ENOCONN){ + if (Util::bootMS() > lastGood + 5000){ + ERROR_MSG("SRT connection timed out - closing"); + close(); + } + return 0; + } + ERROR_MSG("Unable to receive data over socket: %s", srt_getlasterror_str()); + if (srt_getsockstate(sock) != SRTS_CONNECTED){close();} + return 0; + } + if (receivedBytes == 0){ + close(); + }else{ + lastGood = Util::bootMS(); + } + srt_bstats(sock, &performanceMonitor, false); + return receivedBytes; } void SRTConnection::connect(const std::string &_host, int _port, const std::string &_direction, @@ -143,6 +212,7 @@ namespace Socket{ HIGH_MSG("Going to connect sock %d", sock); if (srt_connect(sock, psa, sizeof sa) == SRT_ERROR){ srt_close(sock); + sock = -1; ERROR_MSG("Can't connect SRT Socket"); return; } @@ -153,6 +223,7 @@ namespace Socket{ return; } INFO_MSG("Caller SRT socket %" PRId32 " success targetting %s:%u", sock, _host.c_str(), _port); + lastGood = Util::bootMS(); return; } if (modeName == "listener"){ @@ -163,14 +234,17 @@ namespace Socket{ if (srt_bind(sock, psa, sizeof sa) == SRT_ERROR){ srt_close(sock); - ERROR_MSG("Can't connect SRT Socket"); + sock = -1; + ERROR_MSG("Can't connect SRT Socket: %s", srt_getlasterror_str()); return; } if (srt_listen(sock, 1) == SRT_ERROR){ srt_close(sock); + sock = -1; ERROR_MSG("Can not listen on Socket"); } INFO_MSG("Listener SRT socket sucess @ %s:%u", _host.c_str(), _port); + lastGood = Util::bootMS(); return; } if (modeName == "rendezvous"){ @@ -182,6 +256,7 @@ namespace Socket{ if (srt_bind(sock, psa, sizeof sa) == SRT_ERROR){ srt_close(sock); + sock = -1; ERROR_MSG("Can't connect SRT Socket"); return; } @@ -191,6 +266,7 @@ namespace Socket{ if (srt_connect(sock, psb, sizeof sb) == SRT_ERROR){ srt_close(sock); + sock = -1; ERROR_MSG("Can't connect SRT Socket"); return; } @@ -200,6 +276,7 @@ namespace Socket{ return; } INFO_MSG("Rendezvous SRT socket sucess @ %s:%u", _host.c_str(), _port); + lastGood = Util::bootMS(); return; } ERROR_MSG("Invalid mode parameter. Use 'client' or 'server'"); @@ -220,8 +297,23 @@ namespace Socket{ int res = srt_sendmsg2(sock, data, len, NULL); if (res == SRT_ERROR){ + int err = srt_getlasterror(0); + //Do not report normal connection lost errors + if (err == SRT_ECONNLOST){ + close(); + return; + } + if (err == SRT_ENOCONN){ + if (Util::bootMS() > lastGood + 5000){ + ERROR_MSG("SRT connection timed out - closing"); + close(); + } + return; + } ERROR_MSG("Unable to send data over socket %" PRId32 ": %s", sock, srt_getlasterror_str()); if (srt_getsockstate(sock) != SRTS_CONNECTED){close();} + }else{ + lastGood = Util::bootMS(); } srt_bstats(sock, &performanceMonitor, false); } @@ -236,16 +328,16 @@ namespace Socket{ uint64_t SRTConnection::dataDown(){return performanceMonitor.byteRecvTotal;} uint64_t SRTConnection::packetCount(){ - return (direction == "input" ? performanceMonitor.pktRecvTotal : performanceMonitor.pktSentTotal); + return (direction == "output" ? performanceMonitor.pktSentTotal : performanceMonitor.pktRecvTotal); } uint64_t SRTConnection::packetLostCount(){ - return (direction == "input" ? performanceMonitor.pktRcvLossTotal : performanceMonitor.pktSndLossTotal); + return (direction == "output" ? performanceMonitor.pktSndLossTotal : performanceMonitor.pktRcvLossTotal); } uint64_t SRTConnection::packetRetransmitCount(){ //\todo This should be updated with pktRcvRetransTotal on the retrieving end once srt has implemented this. - return (direction == "input" ? 0 : performanceMonitor.pktRetransTotal); + return (direction == "output" ? performanceMonitor.pktRetransTotal : 0); } void SRTConnection::initializeEmpty(){ @@ -259,10 +351,8 @@ namespace Socket{ void SRTConnection::setBlocking(bool _blocking){ if (_blocking == blocking){return;} // If we have an error setting the new blocking state, the state is unchanged so we return early. - if (srt_setsockopt(sock, 0, (direction == "output" ? SRTO_SNDSYN : SRTO_RCVSYN), &_blocking, - sizeof _blocking) == -1){ - return; - } + if (srt_setsockopt(sock, 0, SRTO_SNDSYN, &_blocking, sizeof _blocking) == -1){return;} + if (srt_setsockopt(sock, 0, SRTO_RCVSYN, &_blocking, sizeof _blocking) == -1){return;} blocking = _blocking; } @@ -289,7 +379,7 @@ namespace Socket{ if (adapter == "" && modeName == "listener"){adapter = _host;} - tsbpdMode = ((params.count("tsbpd") && isFalseString(params.at("tsbpd"))) ? false : true); + tsbpdMode = (params.count("tsbpd") && JSON::Value(params.at("tsbpd")).asBool()); outgoing_port = (params.count("port") ? strtol(params.at("port").c_str(), 0, 0) : 0); @@ -332,14 +422,11 @@ namespace Socket{ int SRTConnection::postConfigureSocket(){ bool no = false; - if (srt_setsockopt(sock, 0, (direction == "output" ? SRTO_SNDSYN : SRTO_RCVSYN), &no, sizeof no) == -1){ - return -1; - } + if (srt_setsockopt(sock, 0, SRTO_SNDSYN, &no, sizeof no) == -1){return -1;} + if (srt_setsockopt(sock, 0, SRTO_RCVSYN, &no, sizeof no) == -1){return -1;} if (timeout){ - if (srt_setsockopt(sock, 0, (direction == "output" ? SRTO_SNDTIMEO : SRTO_RCVTIMEO), &timeout, - sizeof timeout) == -1){ - return -1; - } + if (srt_setsockopt(sock, 0, SRTO_SNDTIMEO, &timeout, sizeof timeout) == -1){return -1;} + if (srt_setsockopt(sock, 0, SRTO_RCVTIMEO, &timeout, sizeof timeout) == -1){return -1;} } std::string errMsg = configureSocketLoop(SRT::SockOpt::POST); if (errMsg.size()){ @@ -455,15 +542,7 @@ namespace Socket{ } }break; case SRT::SockOpt::BOOL:{ - bool tmp; - if (isFalseString(v)){ - tmp = true; - }else if (isTrueString(v)){ - tmp = true; - }else{ - return false; - } - val.b = tmp; + val.b = JSON::Value(v).asBool(); val.value = &val.b; val.size = sizeof val.b; }break; diff --git a/lib/socket_srt.h b/lib/socket_srt.h index 7995d938..5101d38d 100644 --- a/lib/socket_srt.h +++ b/lib/socket_srt.h @@ -1,11 +1,8 @@ #pragma once - #include "socket.h" #include "url.h" - #include #include - #include typedef std::map SockOptVals; @@ -13,15 +10,6 @@ typedef std::map paramList; namespace Socket{ std::string interpretSRTMode(const HTTP::URL &u); - - inline bool isFalseString(const std::string &_val){ - return _val == "0" || _val == "no" || _val == "off" || _val == "false"; - } - - inline bool isTrueString(const std::string &_val){ - return _val == "1" || _val == "yes" || _val == "on" || _val == "true"; - } - sockaddr_in createInetAddr(const std::string &_host, int _port); namespace SRT{ @@ -39,7 +27,7 @@ namespace Socket{ class SRTConnection{ public: SRTConnection(); - SRTConnection(SRTSOCKET alreadyConnected){sock = alreadyConnected;} + SRTConnection(SRTSOCKET alreadyConnected); SRTConnection(const std::string &_host, int _port, const std::string &_direction = "input", const paramList &_params = paramList()); @@ -52,7 +40,10 @@ namespace Socket{ void setBlocking(bool blocking); ///< Set this socket to be blocking (true) or nonblocking (false). bool isBlocking(); ///< Check if this socket is blocking (true) or nonblocking (false). - std::string RecvNow(); + size_t RecvNow(); + size_t Recv(); + char recvbuf[5000]; ///< Buffer where received data is stored in + void SendNow(const std::string &data); void SendNow(const char *data, size_t len); @@ -73,7 +64,7 @@ namespace Socket{ struct sockaddr_in6 remoteaddr; std::string remotehost; - + std::string getBinHost(); private: SRTSOCKET sock; CBytePerfMon performanceMonitor; @@ -81,10 +72,11 @@ namespace Socket{ std::string host; int outgoing_port; int32_t prev_pktseq; + uint64_t lastGood; uint32_t chunkTransmitSize; - // From paramaeter parsing + // From parameter parsing std::string adapter; std::string modeName; int timeout; @@ -100,7 +92,7 @@ namespace Socket{ bool blocking; }; - /// This class is for easily setting up listening socket, either TCP or Unix. + /// This class is for easily setting up a listening SRT socket class SRTServer{ public: SRTServer(); @@ -130,7 +122,6 @@ namespace Socket{ class SocketOption{ public: - //{"enforcedencryption", 0, SRTO_ENFORCEDENCRYPTION, SRT::SockOpt::PRE, SRT::SockOpt::BOOL, nullptr}, SocketOption(const std::string &_name, int _protocol, int _symbol, SRT::SockOpt::Binding _binding, SRT::SockOpt::Type _type, const SockOptVals &_values = SockOptVals()) : name(_name), protocol(_protocol), symbol(_symbol), binding(_binding), type(_type), @@ -142,11 +133,8 @@ namespace Socket{ SRT::SockOpt::Binding binding; SRT::SockOpt::Type type; SockOptVals valmap; - bool apply(int socket, const std::string &value, bool isSrtOpt = true); - static int setSo(int socket, int protocol, int symbol, const void *data, size_t size, bool isSrtOpt = true); - bool extract(const std::string &v, OptionValue &val, SRT::SockOpt::Type asType); }; diff --git a/lib/ts_stream.cpp b/lib/ts_stream.cpp index 90d45957..af37c63e 100644 --- a/lib/ts_stream.cpp +++ b/lib/ts_stream.cpp @@ -15,6 +15,51 @@ tthread::recursive_mutex tMutex; namespace TS{ + bool Assembler::assemble(Stream & TSStrm, char * ptr, size_t len){ + bool ret = false; + size_t offset = 0; + size_t amount = 188-leftData.size(); + if (leftData.size() && len >= amount){ + //Attempt to re-assemble a packet from the leftovers of last time + current head + if (len == amount || ptr[amount] == 0x47){ + VERYHIGH_MSG("Assembled scrap packet"); + //Success! + leftData.append(ptr, amount); + tsBuf.FromPointer(leftData); + TSStrm.add(tsBuf); + ret = true; + if (!TSStrm.isDataTrack(tsBuf.getPID())){TSStrm.parse(tsBuf.getPID());} + offset = amount; + leftData.assign(0,0); + } + //On failure, hope we might live to succeed another day + } + // Try to read full TS Packets + // Watch out! We push here to a global, in order for threads to be able to access it. + size_t junk = 0; + while (offset < len){ + if (ptr[offset] == 0x47 && (offset+188 >= len || ptr[offset+188] == 0x47)){// check for sync byte + if (junk){ + INFO_MSG("%zu bytes of non-sync-byte data received", junk); + junk = 0; + } + if (offset + 188 <= len){ + tsBuf.FromPointer(ptr + offset); + TSStrm.add(tsBuf); + if (!TSStrm.isDataTrack(tsBuf.getPID())){TSStrm.parse(tsBuf.getPID());} + ret = true; + }else{ + leftData.assign(ptr + offset, len - offset); + } + offset += 188; + }else{ + ++junk; + ++offset; + } + } + return ret; + } + void ADTSRemainder::setRemainder(const aac::adts &p, const void *source, uint32_t avail, uint64_t bPos){ if (!p.getCompleteSize()){return;} diff --git a/lib/ts_stream.h b/lib/ts_stream.h index bdefb3c7..6ffaf024 100644 --- a/lib/ts_stream.h +++ b/lib/ts_stream.h @@ -106,4 +106,13 @@ namespace TS{ void parsePES(size_t tid, bool finished = false); }; + + class Assembler{ + public: + bool assemble(Stream & TSStrm, char * ptr, size_t len); + private: + Util::ResizeablePointer leftData; + TS::Packet tsBuf; + }; + }// namespace TS diff --git a/src/input/input.cpp b/src/input/input.cpp index 28055928..d39e5dbf 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -893,18 +893,21 @@ namespace Mist{ statComm.setCRC(getpid()); statComm.setStream(streamName); statComm.setConnector("INPUT:" + capa["name"].asStringRef()); - statComm.setUp(0); - statComm.setDown(streamByteCount()); statComm.setTime(now - startTime); statComm.setLastSecond(0); - statComm.setHost(getConnectedBinHost()); - handleLossyStats(statComm); + connStats(statComm); } statTimer = Util::bootSecs(); } } } + + void Input::connStats(Comms::Statistics &statComm){ + statComm.setUp(0); + statComm.setDown(streamByteCount()); + statComm.setHost(getConnectedBinHost()); + } void Input::realtimeMainLoop(){ uint64_t statTimer = 0; diff --git a/src/input/input.h b/src/input/input.h index d8aa7928..0e33c254 100644 --- a/src/input/input.h +++ b/src/input/input.h @@ -69,11 +69,7 @@ namespace Mist{ virtual void userOnActive(size_t id); virtual void userOnDisconnect(size_t id); virtual void userLeadOut(); - - virtual void handleLossyStats(Comms::Statistics & statComm){} - - virtual bool preventBufferStart() {return false;} - + virtual void connStats(Comms::Statistics & statComm); virtual void parseHeader(); bool bufferFrame(size_t track, uint32_t keyNum); diff --git a/src/input/input_ts.cpp b/src/input/input_ts.cpp index 28ccc825..c9959142 100644 --- a/src/input/input_ts.cpp +++ b/src/input/input_ts.cpp @@ -191,6 +191,7 @@ namespace Mist{ inputProcess = 0; isFinished = false; +#ifndef WITH_SRT { pid_t srt_tx = -1; const char *args[] ={"srt-live-transmit", 0}; @@ -199,12 +200,13 @@ namespace Mist{ capa["source_match"].append("srt://*"); capa["always_match"].append("srt://*"); capa["desc"] = - capa["desc"].asStringRef() + " SRT support (srt://*) is installed and available."; + capa["desc"].asStringRef() + " Non-native SRT support (srt://*) is installed and available."; }else{ capa["desc"] = capa["desc"].asStringRef() + - " To enable SRT support, please install the srt-live-transmit binary."; + " To enable non-native SRT support, please install the srt-live-transmit binary."; } } +#endif capa["optional"]["DVR"]["name"] = "Buffer time (ms)"; capa["optional"]["DVR"]["help"] = @@ -534,43 +536,7 @@ namespace Mist{ gettingData = true; INFO_MSG("Now receiving UDP data..."); } - size_t offset = 0; - size_t amount = 188-leftData.size(); - if (leftData.size() && udpCon.data.size() >= amount){ - //Attempt to re-assemble a packet from the leftovers of last time + current head - if (udpCon.data.size() == amount || udpCon.data[amount] == 0x47){ - VERYHIGH_MSG("Assembled scrap packet"); - //Success! - leftData.append(udpCon.data, amount); - liveStream.add(leftData); - if (!liveStream.isDataTrack(tsBuf.getPID())){liveStream.parse(tsBuf.getPID());} - offset = amount; - leftData.assign(0,0); - } - //On failure, hope we might live to succeed another day - } - // Try to read full TS Packets - // Watch out! We push here to a global, in order for threads to be able to access it. - size_t junk = 0; - while (offset < udpCon.data.size()){ - if (udpCon.data[offset] == 0x47 && (offset+188 >= udpCon.data.size() || udpCon.data[offset+188] == 0x47)){// check for sync byte - if (junk){ - INFO_MSG("%zu bytes of non-sync-byte data received", junk); - junk = 0; - } - if (offset + 188 <= udpCon.data.size()){ - tsBuf.FromPointer(udpCon.data + offset); - liveStream.add(tsBuf); - if (!liveStream.isDataTrack(tsBuf.getPID())){liveStream.parse(tsBuf.getPID());} - }else{ - leftData.assign(udpCon.data + offset, udpCon.data.size() - offset); - } - offset += 188; - }else{ - ++junk; - ++offset; - } - } + assembler.assemble(liveStream, udpCon.data, udpCon.data.size()); } if (!received){ Util::sleep(100); diff --git a/src/input/input_ts.h b/src/input/input_ts.h index af578c87..e810ddb4 100644 --- a/src/input/input_ts.h +++ b/src/input/input_ts.h @@ -33,7 +33,7 @@ namespace Mist{ void streamMainLoop(); void finish(); FILE *inFile; ///< The input file with ts data - Util::ResizeablePointer leftData; + TS::Assembler assembler; TS::Stream tsStream; ///< Used for parsing the incoming ts stream Socket::UDPConnection udpCon; Socket::Connection tcpCon; diff --git a/src/input/input_tssrt.cpp b/src/input/input_tssrt.cpp index ab42c036..f0bb48dc 100644 --- a/src/input/input_tssrt.cpp +++ b/src/input/input_tssrt.cpp @@ -24,9 +24,17 @@ Util::Config *cfgPointer = NULL; std::string baseStreamName; +Socket::SRTServer sSock; + +void (*oldSignal)(int, siginfo_t *,void *) = 0; + +void signal_handler(int signum, siginfo_t *sigInfo, void *ignore){ + sSock.close(); + if (oldSignal){ + oldSignal(signum, sigInfo, ignore); + } +} -/// Global, so that all tracks stay in sync -int64_t timeStampOffset = 0; // We use threads here for multiple input pushes, because of the internals of the SRT Library static void callThreadCallbackSRT(void *socknum){ @@ -54,8 +62,10 @@ namespace Mist{ capa["codecs"][0u][0u].append("HEVC"); capa["codecs"][0u][0u].append("MPEG2"); capa["codecs"][0u][1u].append("AAC"); + capa["codecs"][0u][1u].append("MP3"); capa["codecs"][0u][1u].append("AC3"); capa["codecs"][0u][1u].append("MP2"); + capa["codecs"][0u][1u].append("opus"); JSON::Value option; option["arg"] = "integer"; @@ -64,6 +74,7 @@ namespace Mist{ option["help"] = "DVR buffer time in ms"; option["value"].append(50000); config->addOption("bufferTime", option); + option.null(); capa["optional"]["DVR"]["name"] = "Buffer time (ms)"; capa["optional"]["DVR"]["help"] = "The target available buffer time for this live stream, in milliseconds. This is the time " @@ -73,13 +84,44 @@ namespace Mist{ capa["optional"]["DVR"]["type"] = "uint"; capa["optional"]["DVR"]["default"] = 50000; + option["arg"] = "integer"; + option["long"] = "acceptable"; + option["short"] = "T"; + option["help"] = "Acceptable pushed streamids (0 = use streamid as wildcard, 1 = ignore all streamids, 2 = disallow non-matching streamids)"; + option["value"].append(0); + config->addOption("acceptable", option); + capa["optional"]["acceptable"]["name"] = "Acceptable pushed streamids"; + capa["optional"]["acceptable"]["help"] = "What to do with the streamids for incoming pushes, if this is a listener SRT connection"; + capa["optional"]["acceptable"]["option"] = "--acceptable"; + capa["optional"]["acceptable"]["short"] = "T"; + capa["optional"]["acceptable"]["default"] = 0; + capa["optional"]["acceptable"]["type"] = "select"; + capa["optional"]["acceptable"]["select"][0u][0u] = 0; + capa["optional"]["acceptable"]["select"][0u][1u] = "Set streamid as wildcard"; + capa["optional"]["acceptable"]["select"][1u][0u] = 1; + capa["optional"]["acceptable"]["select"][1u][1u] = "Ignore all streamids"; + capa["optional"]["acceptable"]["select"][2u][0u] = 2; + capa["optional"]["acceptable"]["select"][2u][1u] = "Disallow non-matching streamid"; + + // Setup if we are called form with a thread for push-based input. if (s != -1){ srtConn = Socket::SRTConnection(s); streamName = baseStreamName; - if (srtConn.getStreamName() != ""){streamName += "+" + srtConn.getStreamName();} + std::string streamid = srtConn.getStreamName(); + int64_t acc = config->getInteger("acceptable"); + if (acc == 0){ + if (streamid.size()){streamName += "+" + streamid;} + }else if(acc == 2){ + if (streamName != streamid){ + FAIL_MSG("Stream ID '%s' does not match stream name, push blocked", streamid.c_str()); + srtConn.close(); + } + } + Util::setStreamName(streamName); } lastTimeStamp = 0; + timeStampOffset = 0; singularFlag = true; } @@ -96,9 +138,27 @@ namespace Mist{ INFO_MSG("Parsed url: %s", u.getUrl().c_str()); if (Socket::interpretSRTMode(u) == "listener"){ sSock = Socket::SRTServer(u.getPort(), u.host, false); - config->registerSRTSockPtr(&sSock); + struct sigaction new_action; + struct sigaction cur_action; + new_action.sa_sigaction = signal_handler; + sigemptyset(&new_action.sa_mask); + new_action.sa_flags = SA_SIGINFO; + sigaction(SIGINT, &new_action, &cur_action); + if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){ + if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");} + oldSignal = cur_action.sa_sigaction; + } + sigaction(SIGHUP, &new_action, &cur_action); + if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){ + if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");} + oldSignal = cur_action.sa_sigaction; + } + sigaction(SIGTERM, &new_action, &cur_action); + if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){ + if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");} + oldSignal = cur_action.sa_sigaction; + } }else{ - INFO_MSG("A"); std::map arguments; HTTP::parseVars(u.args, arguments); size_t connectCnt = 0; @@ -117,34 +177,12 @@ namespace Mist{ void inputTSSRT::getNext(size_t idx){ thisPacket.null(); bool hasPacket = tsStream.hasPacket(); - bool firstloop = true; - while (!hasPacket && srtConn.connected() && config->is_active){ - firstloop = false; - // Receive data from the socket. SRT Sockets handle some internal timing as well, based on the provided settings. - leftBuffer.append(srtConn.RecvNow()); - if (leftBuffer.size()){ - size_t offset = 0; - size_t garbage = 0; - while ((offset + 188) < leftBuffer.size()){ - if (leftBuffer[offset] != 0x47){ - ++garbage; - if (garbage % 100 == 0){INFO_MSG("Accumulated %zu bytes of garbage", garbage);} - ++offset; - continue; - } - if (garbage != 0){ - WARN_MSG("Thrown away %zu bytes of garbage data", garbage); - garbage = 0; - } - if (offset + 188 <= leftBuffer.size()){ - tsBuf.FromPointer(leftBuffer.data() + offset); - tsStream.parse(tsBuf, 0); - offset += 188; - } - } - leftBuffer.erase(0, offset); - hasPacket = tsStream.hasPacket(); - }else if (srtConn.connected()){ + while (!hasPacket && srtConn && config->is_active){ + + size_t recvSize = srtConn.RecvNow(); + if (recvSize){ + if (assembler.assemble(tsStream, srtConn.recvbuf, recvSize, true)){hasPacket = tsStream.hasPacket();} + }else if (srtConn){ // This should not happen as the SRT socket is read blocking and won't return until there is // data. But if it does, wait before retry Util::sleep(10); @@ -153,7 +191,12 @@ namespace Mist{ if (hasPacket){tsStream.getEarliestPacket(thisPacket);} if (!thisPacket){ - INFO_MSG("Could not getNext TS packet!"); + if (srtConn){ + INFO_MSG("Could not getNext TS packet!"); + Util::logExitReason("internal TS parser error"); + }else{ + Util::logExitReason("SRT connection close"); + } return; } @@ -208,7 +251,10 @@ namespace Mist{ void inputTSSRT::setSingular(bool newSingular){singularFlag = newSingular;} - void inputTSSRT::handleLossyStats(Comms::Statistics &statComm){ + void inputTSSRT::connStats(Comms::Statistics &statComm){ + statComm.setUp(srtConn.dataUp()); + statComm.setDown(srtConn.dataDown()); + statComm.setHost(getConnectedBinHost()); statComm.setPacketCount(srtConn.packetCount()); statComm.setPacketLostCount(srtConn.packetLostCount()); statComm.setPacketRetransmitCount(srtConn.packetRetransmitCount()); diff --git a/src/input/input_tssrt.h b/src/input/input_tssrt.h index 5af13ba7..c532c041 100644 --- a/src/input/input_tssrt.h +++ b/src/input/input_tssrt.h @@ -8,13 +8,17 @@ #include namespace Mist{ - /// This class contains all functions needed to implement TS Input + class inputTSSRT : public Input{ public: inputTSSRT(Util::Config *cfg, SRTSOCKET s = -1); ~inputTSSRT(); void setSingular(bool newSingular); virtual bool needsLock(); + virtual std::string getConnectedBinHost(){ + if (srtConn){return srtConn.getBinHost();} + return Input::getConnectedBinHost(); + } protected: // Private Functions @@ -22,7 +26,6 @@ namespace Mist{ bool preRun(); virtual void getNext(size_t idx = INVALID_TRACK_ID); virtual bool needHeader(){return false;} - virtual bool preventBufferStart(){return srtConn.getSocket() == -1;} virtual bool isSingular(){return singularFlag;} virtual bool isThread(){return !singularFlag;} @@ -31,17 +34,14 @@ namespace Mist{ void streamMainLoop(); TS::Stream tsStream; ///< Used for parsing the incoming ts stream TS::Packet tsBuf; - std::string leftBuffer; + TS::Assembler assembler; + int64_t timeStampOffset; uint64_t lastTimeStamp; - Socket::SRTServer sSock; Socket::SRTConnection srtConn; bool singularFlag; size_t tmpIdx; - virtual size_t streamByteCount(){ - return srtConn.dataDown(); - }; // For live streams: to update the stats with correct values. - virtual void handleLossyStats(Comms::Statistics &statComm); + virtual void connStats(Comms::Statistics &statComm); }; }// namespace Mist diff --git a/src/output/mist_out_srt.cpp b/src/output/mist_out_srt.cpp index ea73cf8d..7fed8eec 100644 --- a/src/output/mist_out_srt.cpp +++ b/src/output/mist_out_srt.cpp @@ -5,20 +5,51 @@ #include #include -int spawnForked(Socket::SRTConnection &S){ - int fds[2]; - pipe(fds); - Socket::Connection Sconn(fds[0], fds[1]); +Socket::SRTServer server_socket; +static uint64_t sockCount = 0; - mistOut tmp(Sconn, S.getSocket()); - return tmp.run(); +void (*oldSignal)(int, siginfo_t *,void *) = 0; +void signal_handler(int signum, siginfo_t *sigInfo, void *ignore){ + server_socket.close(); + if (oldSignal){ + oldSignal(signum, sigInfo, ignore); + } } void handleUSR1(int signum, siginfo_t *sigInfo, void *ignore){ - HIGH_MSG("USR1 received - triggering rolling restart"); - Util::Config::is_restarting = true; - Util::logExitReason("signal USR1"); - Util::Config::is_active = false; + if (!sockCount){ + INFO_MSG("USR1 received - triggering rolling restart (no connections active)"); + Util::Config::is_restarting = true; + Util::logExitReason("signal USR1, no connections"); + server_socket.close(); + Util::Config::is_active = false; + }else{ + INFO_MSG("USR1 received - triggering rolling restart when connection count reaches zero"); + Util::Config::is_restarting = true; + Util::logExitReason("signal USR1, after disconnect wait"); + } +} + +// Callback for SRT-serving threads +static void callThreadCallbackSRT(void *srtPtr){ + sockCount++; + Socket::SRTConnection & srtSock = *(Socket::SRTConnection*)srtPtr; + int fds[2]; + pipe(fds); + Socket::Connection Sconn(fds[0], fds[1]); + HIGH_MSG("Started thread for socket %i", srtSock.getSocket()); + mistOut tmp(Sconn,srtSock); + tmp.run(); + HIGH_MSG("Closing thread for socket %i", srtSock.getSocket()); + Sconn.close(); + srtSock.close(); + delete &srtSock; + sockCount--; + if (!sockCount && Util::Config::is_restarting){ + server_socket.close(); + Util::Config::is_active = false; + INFO_MSG("Last active connection closed; triggering rolling restart now!"); + } } int main(int argc, char *argv[]){ @@ -41,15 +72,56 @@ int main(int argc, char *argv[]){ new_action.sa_flags = 0; sigaction(SIGUSR1, &new_action, NULL); } - mistOut::listener(conf, spawnForked); - if (conf.is_restarting && Socket::checkTrueSocket(0)){ - INFO_MSG("Reloading input while re-using server socket"); + if (conf.getInteger("port") && conf.getString("interface").size()){ + server_socket = Socket::SRTServer(conf.getInteger("port"), conf.getString("interface"), false, "output"); + } + if (!server_socket.connected()){ + DEVEL_MSG("Failure to open socket"); + return 1; + } + struct sigaction new_action; + struct sigaction cur_action; + new_action.sa_sigaction = signal_handler; + sigemptyset(&new_action.sa_mask); + new_action.sa_flags = SA_SIGINFO; + sigaction(SIGINT, &new_action, &cur_action); + if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){ + if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");} + oldSignal = cur_action.sa_sigaction; + } + sigaction(SIGHUP, &new_action, &cur_action); + if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){ + if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");} + oldSignal = cur_action.sa_sigaction; + } + sigaction(SIGTERM, &new_action, &cur_action); + if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){ + if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");} + oldSignal = cur_action.sa_sigaction; + } + Util::Procs::socketList.insert(server_socket.getSocket()); + while (conf.is_active && server_socket.connected()){ + Socket::SRTConnection S = server_socket.accept(false, "output"); + if (S.connected()){// check if the new connection is valid + // spawn a new thread for this connection + tthread::thread T(callThreadCallbackSRT, (void *)new Socket::SRTConnection(S)); + // detach it, no need to keep track of it anymore + T.detach(); + }else{ + Util::sleep(10); // sleep 10ms + } + } + Util::Procs::socketList.erase(server_socket.getSocket()); + server_socket.close(); + if (conf.is_restarting){ + INFO_MSG("Reloading input..."); execvp(argv[0], argv); FAIL_MSG("Error reloading: %s", strerror(errno)); } }else{ Socket::Connection S(fileno(stdout), fileno(stdin)); - mistOut tmp(S, -1); + Socket::SRTConnection tmpSock; + mistOut tmp(S, tmpSock); return tmp.run(); } } diff --git a/src/output/output.cpp b/src/output/output.cpp index c8db21e0..bcdd97c0 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -1686,9 +1686,7 @@ namespace Mist{ statComm.setCRC(crc); statComm.setStream(streamName); statComm.setConnector(getStatsName()); - statComm.setUp(myConn.dataUp()); - statComm.setDown(myConn.dataDown()); - statComm.setTime(now - myConn.connTime()); + connStats(now, statComm); statComm.setLastSecond(thisPacket ? thisPacket.getTime() : 0); statComm.setPid(getpid()); @@ -1722,6 +1720,12 @@ namespace Mist{ } } + void Output::connStats(uint64_t now, Comms::Statistics &statComm){ + statComm.setUp(myConn.dataUp()); + statComm.setDown(myConn.dataDown()); + statComm.setTime(now - myConn.connTime()); + } + bool Output::dropPushTrack(uint32_t trackId, const std::string & dropReason){ for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ if (it->second.getTrack() == trackId){ diff --git a/src/output/output.h b/src/output/output.h index 060634e3..0873278a 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -64,7 +64,7 @@ namespace Mist{ /// This function is called whenever a packet is ready for sending. /// Inside it, thisPacket is guaranteed to contain a valid packet. virtual void sendNext(){}// REQUIRED! Others are optional. - virtual bool dropPushTrack(uint32_t trackId, const std::string & dropReason); + virtual bool dropPushTrack(uint32_t trackId, const std::string &dropReason); bool getKeyFrame(); bool prepareNext(); virtual void dropTrack(size_t trackId, const std::string &reason, bool probablyBad = true); @@ -103,10 +103,10 @@ namespace Mist{ uint64_t lastStats; ///< Time of last sending of stats. std::set buffer; ///< A sorted list of next-to-be-loaded packets. - bool sought; ///< If a seek has been done, this is set to true. Used for seeking on - ///< prepareNext(). + bool sought; ///< If a seek has been done, this is set to true. Used for seeking on + ///< prepareNext(). std::string prevHost; ///< Old value for getConnectedBinHost, for caching - protected: // these are to be messed with by child classes + protected: // these are to be messed with by child classes virtual bool inlineRestartCapable() const{ return false; }///< True if the output is capable of restarting mid-stream. This is used for swapping recording files @@ -122,6 +122,8 @@ namespace Mist{ virtual std::string getStatsName(); virtual bool hasSessionIDs(){return false;} + virtual void connStats(uint64_t now, Comms::Statistics &statComm); + std::set getSupportedTracks(const std::string &type = "") const; inline virtual bool keepGoing(){return config->is_active && myConn;} diff --git a/src/output/output_httpts.cpp b/src/output/output_httpts.cpp index ddf9ee6f..1e5e051a 100644 --- a/src/output/output_httpts.cpp +++ b/src/output/output_httpts.cpp @@ -77,6 +77,7 @@ namespace Mist{ capa["push_urls"].append("/*.ts"); capa["push_urls"].append("ts-exec:*"); +#ifndef WITH_SRT { pid_t srt_tx = -1; const char *args[] ={"srt-live-transmit", 0}; @@ -84,13 +85,14 @@ namespace Mist{ if (srt_tx > 1){ capa["push_urls"].append("srt://*"); capa["desc"] = capa["desc"].asStringRef() + - ". SRT push output support (srt://*) is installed and available."; + ". Non-native SRT push output support (srt://*) is installed and available."; }else{ capa["desc"] = capa["desc"].asStringRef() + - ". To enable SRT push output support, please install the srt-live-transmit binary."; + ". To enable non-native SRT push output support, please install the srt-live-transmit binary."; } } +#endif JSON::Value opt; opt["arg"] = "string"; diff --git a/src/output/output_tssrt.cpp b/src/output/output_tssrt.cpp index a18c9a4f..18a122b8 100644 --- a/src/output/output_tssrt.cpp +++ b/src/output/output_tssrt.cpp @@ -1,19 +1,22 @@ -#include "mist/socket_srt.h" +#include #include "output_tssrt.h" #include #include #include +#include +#include +#include namespace Mist{ - OutTSSRT::OutTSSRT(Socket::Connection &conn, SRTSOCKET _srtSock) : TSOutput(conn){ + OutTSSRT::OutTSSRT(Socket::Connection &conn, Socket::SRTConnection & _srtSock) : TSOutput(conn), srtConn(_srtSock){ // NOTE: conn is useless for SRT, as it uses a different socket type. sendRepeatingHeaders = 500; // PAT/PMT every 500ms (DVB spec) streamName = config->getString("streamname"); + Util::setStreamName(streamName); pushOut = false; - std::string tracks; // Push output configuration if (config->getString("target").size()){ - HTTP::URL target(config->getString("target")); + target = HTTP::URL(config->getString("target")); if (target.protocol != "srt"){ FAIL_MSG("Target %s must begin with srt://, aborting", target.getUrl().c_str()); onFail("Invalid srt target: doesn't start with srt://", true); @@ -25,29 +28,108 @@ namespace Mist{ return; } pushOut = true; - if (targetParams.count("tracks")){tracks = targetParams["tracks"];} + std::map arguments; + HTTP::parseVars(target.args, arguments); + for (std::map::iterator it = arguments.begin(); it != arguments.end(); ++it){ + targetParams[it->first] = it->second; + } size_t connectCnt = 0; do{ - srtConn.connect(target.host, target.getPort(), "output"); - if (!srtConn){Util::sleep(1000);} + srtConn.connect(target.host, target.getPort(), "output", targetParams); + if (!srtConn){ + Util::sleep(1000); + }else{ + INFO_MSG("Connect success on attempt %zu", connectCnt+1); + break; + } ++connectCnt; - }while (!srtConn && connectCnt < 10); + }while (!srtConn && connectCnt < 5); wantRequest = false; parseData = true; + initialize(); }else{ // Pull output configuration, In this case we have an srt connection in the second constructor parameter. - srtConn = Socket::SRTConnection(_srtSock); - parseData = true; - wantRequest = false; - // Handle override / append of streamname options std::string sName = srtConn.getStreamName(); if (sName != ""){ streamName = sName; - HIGH_MSG("Requesting stream %s", streamName.c_str()); + Util::sanitizeName(streamName); + Util::setStreamName(streamName); } + + int64_t accTypes = config->getInteger("acceptable"); + if (accTypes == 0){//Allow both directions + srtConn.setBlocking(false); + //Try to read the socket 10 times. If any reads succeed, assume they are pushing in + size_t retries = 60; + while (!accTypes && srtConn && retries){ + size_t recvSize = srtConn.Recv(); + if (recvSize){ + accTypes = 2; + INFO_MSG("Connection put into ingest mode"); + assembler.assemble(tsIn, srtConn.recvbuf, recvSize, true); + }else{ + Util::sleep(50); + } + --retries; + } + //If not, assume they are receiving. + if (!accTypes){ + accTypes = 1; + INFO_MSG("Connection put into egress mode"); + } + } + if (accTypes == 1){// Only allow outgoing + srtConn.setBlocking(true); + srtConn.direction = "output"; + parseData = true; + wantRequest = false; + initialize(); + }else if (accTypes == 2){//Only allow incoming + srtConn.setBlocking(false); + srtConn.direction = "input"; + if (Triggers::shouldTrigger("PUSH_REWRITE")){ + HTTP::URL reqUrl; + reqUrl.protocol = "srt"; + reqUrl.port = config->getString("port"); + reqUrl.host = config->getString("interface"); + reqUrl.args = "streamid="+Encodings::URL::encode(sName); + std::string payload = reqUrl.getUrl() + "\n" + getConnectedHost(); + std::string newUrl = ""; + Triggers::doTrigger("PUSH_REWRITE", payload, "", false, newUrl); + if (!newUrl.size()){ + FAIL_MSG("Push from %s to URL %s rejected - PUSH_REWRITE trigger blanked the URL", + getConnectedHost().c_str(), reqUrl.getUrl().c_str()); + Util::logExitReason( + "Push from %s to URL %s rejected - PUSH_REWRITE trigger blanked the URL", + getConnectedHost().c_str(), reqUrl.getUrl().c_str()); + onFinish(); + return; + } + reqUrl = HTTP::URL(newUrl); + if (reqUrl.args.size()){ + std::map args; + HTTP::parseVars(reqUrl.args, args); + if (args.count("streamid")){ + streamName = args["streamid"]; + Util::sanitizeName(streamName); + Util::setStreamName(streamName); + } + } + } + myConn.setHost(srtConn.remotehost); + if (!allowPush("")){ + onFinish(); + srtConn.close(); + return; + } + parseData = false; + wantRequest = true; + } + } - initialize(); + lastTimeStamp = 0; + timeStampOffset = 0; } OutTSSRT::~OutTSSRT(){} @@ -58,13 +140,29 @@ namespace Mist{ capa["friendly"] = "TS over SRT"; capa["desc"] = "Real time streaming of TS data over SRT"; capa["deps"] = ""; - capa["required"]["streamname"]["name"] = "Stream"; - capa["required"]["streamname"]["help"] = "What streamname to serve. For multiple streams, add " - "this protocol multiple times using different ports, " - "or use the streamid option on the srt connection"; - capa["required"]["streamname"]["type"] = "str"; - capa["required"]["streamname"]["option"] = "--stream"; - capa["required"]["streamname"]["short"] = "s"; + + capa["optional"]["streamname"]["name"] = "Stream"; + capa["optional"]["streamname"]["help"] = "What streamname to serve if no streamid is given by the other end of the connection"; + capa["optional"]["streamname"]["type"] = "str"; + capa["optional"]["streamname"]["option"] = "--stream"; + capa["optional"]["streamname"]["short"] = "s"; + capa["optional"]["streamname"]["default"] = ""; + + capa["optional"]["acceptable"]["name"] = "Acceptable connection types"; + capa["optional"]["acceptable"]["help"] = + "Whether to allow only incoming pushes (2), only outgoing pulls (1), or both (0, default)"; + capa["optional"]["acceptable"]["option"] = "--acceptable"; + capa["optional"]["acceptable"]["short"] = "T"; + capa["optional"]["acceptable"]["default"] = 0; + capa["optional"]["acceptable"]["type"] = "select"; + capa["optional"]["acceptable"]["select"][0u][0u] = 0; + capa["optional"]["acceptable"]["select"][0u][1u] = + "Allow both incoming and outgoing connections"; + capa["optional"]["acceptable"]["select"][1u][0u] = 1; + capa["optional"]["acceptable"]["select"][1u][1u] = "Allow only outgoing connections"; + capa["optional"]["acceptable"]["select"][2u][0u] = 2; + capa["optional"]["acceptable"]["select"][2u][1u] = "Allow only incoming connections"; + capa["codecs"][0u][0u].append("HEVC"); capa["codecs"][0u][0u].append("H264"); capa["codecs"][0u][0u].append("MPEG2"); @@ -72,6 +170,7 @@ namespace Mist{ capa["codecs"][0u][1u].append("MP3"); capa["codecs"][0u][1u].append("AC3"); capa["codecs"][0u][1u].append("MP2"); + capa["codecs"][0u][1u].append("opus"); cfg->addConnectorOptions(8889, capa); config = cfg; capa["push_urls"].append("srt://*"); @@ -84,29 +183,88 @@ namespace Mist{ cfg->addOption("target", opt); } - // Buffer internally in the class, and send once we have over 1000 bytes of data. + // Buffers TS packets and sends after 7 are buffered. void OutTSSRT::sendTS(const char *tsData, size_t len){ - if (packetBuffer.size() >= 1000){ - srtConn.SendNow(packetBuffer); - if (!srtConn){ - // Allow for proper disconnect - parseData = false; - } - packetBuffer.clear(); - } packetBuffer.append(tsData, len); + if (packetBuffer.size() >= 1316){//7 whole TS packets + if (!srtConn){ + if (config->getString("target").size()){ + INFO_MSG("Reconnecting..."); + srtConn.connect(target.host, target.getPort(), "output", targetParams); + if (!srtConn){Util::sleep(500);} + }else{ + Util::logExitReason("SRT connection closed"); + myConn.close(); + parseData = false; + return; + } + } + if (srtConn){ + srtConn.SendNow(packetBuffer, packetBuffer.size()); + if (!srtConn){ + if (!config->getString("target").size()){ + Util::logExitReason("SRT connection closed"); + myConn.close(); + parseData = false; + } + } + } + packetBuffer.assign(0,0); + } } - bool OutTSSRT::setAlternateConnectionStats(Comms::Statistics &statComm){ + void OutTSSRT::requestHandler(){ + size_t recvSize = srtConn.Recv(); + if (!recvSize){ + if (!srtConn){ + myConn.close(); + wantRequest = false; + }else{ + Util::sleep(50); + } + return; + } + lastRecv = Util::bootSecs(); + if (!assembler.assemble(tsIn, srtConn.recvbuf, recvSize, true)){return;} + while (tsIn.hasPacket()){ + tsIn.getEarliestPacket(thisPacket); + if (!thisPacket){ + INFO_MSG("Could not get TS packet"); + myConn.close(); + wantRequest = false; + return; + } + + tsIn.initializeMetadata(meta); + size_t thisIdx = M.trackIDToIndex(thisPacket.getTrackId(), getpid()); + if (thisIdx == INVALID_TRACK_ID){return;} + if (!userSelect.count(thisIdx)){ + userSelect[thisIdx].reload(streamName, thisIdx, COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); + } + + uint64_t adjustTime = thisPacket.getTime() + timeStampOffset; + if (lastTimeStamp || timeStampOffset){ + if (lastTimeStamp + 5000 < adjustTime || lastTimeStamp > adjustTime + 5000){ + INFO_MSG("Timestamp jump " PRETTY_PRINT_MSTIME " -> " PRETTY_PRINT_MSTIME ", compensating.", + PRETTY_ARG_MSTIME(lastTimeStamp), PRETTY_ARG_MSTIME(adjustTime)); + timeStampOffset += (lastTimeStamp - adjustTime); + adjustTime = thisPacket.getTime() + timeStampOffset; + } + } + lastTimeStamp = adjustTime; + thisPacket.setTime(adjustTime); + bufferLivePacket(thisPacket); + } + } + + void OutTSSRT::connStats(uint64_t now, Comms::Statistics &statComm){ + if (!srtConn){return;} statComm.setUp(srtConn.dataUp()); statComm.setDown(srtConn.dataDown()); - statComm.setTime(Util::bootSecs() - srtConn.connTime()); - return true; - } - - void OutTSSRT::handleLossyStats(Comms::Statistics &statComm){ + statComm.setTime(now - srtConn.connTime()); statComm.setPacketCount(srtConn.packetCount()); statComm.setPacketLostCount(srtConn.packetLostCount()); statComm.setPacketRetransmitCount(srtConn.packetRetransmitCount()); } + }// namespace Mist diff --git a/src/output/output_tssrt.h b/src/output/output_tssrt.h index 3f671dbf..1423af8d 100644 --- a/src/output/output_tssrt.h +++ b/src/output/output_tssrt.h @@ -1,12 +1,11 @@ #include "output_ts_base.h" #include - #include namespace Mist{ class OutTSSRT : public TSOutput{ public: - OutTSSRT(Socket::Connection &conn, SRTSOCKET _srtSock); + OutTSSRT(Socket::Connection &conn, Socket::SRTConnection & _srtSock); ~OutTSSRT(); static bool listenMode(){return !(config->getString("target").size());} @@ -14,19 +13,23 @@ namespace Mist{ static void init(Util::Config *cfg); void sendTS(const char *tsData, size_t len = 188); bool isReadyForPlay(){return true;} - + virtual void requestHandler(); protected: - // Stats handling - virtual bool setAlternateConnectionStats(Comms::Statistics &statComm); - virtual void handleLossyStats(Comms::Statistics &statComm); + virtual void connStats(uint64_t now, Comms::Statistics &statComm); + virtual std::string getConnectedHost(){return srtConn.remotehost;} + virtual std::string getConnectedBinHost(){return srtConn.getBinHost();} private: + HTTP::URL target; + int64_t timeStampOffset; + uint64_t lastTimeStamp; bool pushOut; - std::string packetBuffer; + Util::ResizeablePointer packetBuffer; Socket::UDPConnection pushSock; TS::Stream tsIn; + TS::Assembler assembler; - Socket::SRTConnection srtConn; + Socket::SRTConnection & srtConn; }; }// namespace Mist