diff --git a/CMakeLists.txt b/CMakeLists.txt
index 154db578..3387afdb 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -17,6 +17,8 @@ set( CMAKE_EXPORT_COMPILE_COMMANDS ON ) #For YCM support
include_directories(${SOURCE_DIR})
include_directories(${BINARY_DIR} ${BINARY_DIR}/generated)
+option(BUILD_SHARED_LIBS "Build the libraries as shared (default = static)")
+
########################################
# Testing - Enable Tests #
########################################
@@ -59,80 +61,88 @@ string(STRIP "${PACKAGE_VERSION_RAW}" PACKAGE_VERSION)
set(PACKAGE_VERSION \"${PACKAGE_VERSION}\" )
########################################
-# Build Variables - Debug #
+# Build Variables - Everything else #
########################################
if (NOT DEBUG)
set(DEBUG 4)
endif()
-########################################
-# Build Variables - Shared Memory #
-########################################
-if (NOT DEFINED NOSHM )
+option(NOSHM "Disabled shared memory (falling back to shared temporary files)")
+if (NOT NOSHM)
add_definitions(-DSHM_ENABLED=1)
+else()
+ message("Shared memory use is turned OFF")
endif()
-
if (NOT DEFINED FILLER_DATA OR NOT DEFINED SHARED_SECRET OR NOT DEFINED SUPER_SECRET)#LTS
message(WARNING "Not all LTS variables have been set and this is an LTS build - are you sure about this?")#LTS
endif()#LTS
add_definitions(-DFILLER_DATA="${FILLER_DATA}" -DSHARED_SECRET="${SHARED_SECRET}" -DSUPER_SECRET="${SUPER_SECRET}")#LTS
-if (DEFINED GEOIP )
+
+option(GEOIP "Enable GeoIP capabilities (deprecated)")
+if (GEOIP)
add_definitions(-DGEOIP=1)
+ message("GeoIP is turned ON")
endif()
-if (DEFINED BIGMETA )
- add_definitions(-DBIGMETA=1)
-endif()
-if (NOT DEFINED NOSSL )
+
+option(NOSSL "Disable SSL/TLS support")
+if (NOT NOSSL)
add_definitions(-DSSL=1)
+else()
+ message("SSL/TLS support is turned OFF")
endif()
+
if (DEFINED DATASIZE )
add_definitions(-DSHM_DATASIZE=${DATASIZE})
endif()
+
if (DEFINED STAT_CUTOFF )
add_definitions(-DSTAT_CUTOFF=${STAT_CUTOFF})
endif()
-if (NOT DEFINED NOUPDATE )
+
+option(NOUPDATE "Disable the updater")
+if (NOT NOUPDATE)
add_definitions(-DUPDATER=1)
endif()
-if (NOT DEFINED PERPETUAL )
+
+option(PERPETUAL "Disable the licensing system")
+if (NOT PERPETUAL)
add_definitions(-DLICENSING=1)
endif()
-if (DEFINED NOAUTH )
+
+option(NOAUTH "Disable API authentication entirely (insecure!)")
+if (NOAUTH)
add_definitions(-DNOAUTH=1)
endif()
-if (DEFINED KILLONEXIT )
+
+option(KILLONEXIT "Kill all processes on exit, ensuring nothing is running anymore (disables rolling restart/update support)")
+if (KILLONEXIT)
add_definitions(-DKILLONEXIT=true)
endif()
+
if (DEFINED UDP_API_HOST )
add_definitions(-DUDP_API_HOST=${UDP_API_HOST})
endif()
+
if (DEFINED UDP_API_PORT )
add_definitions(-DUDP_API_PORT=${UDP_API_PORT})
endif()
-if (NOT DEFINED APPNAME )
- set(APPNAME "MistServer")
-endif()
+
+set(APPNAME "MistServer" CACHE STRING "Name of the application, as used in user agent strings and the like")
add_definitions(-DAPPNAME="${APPNAME}")
-########################################
-# Build Variables - Thread Names #
-########################################
-if (DEFINED WITH_THREADNAMES )
+option(WITH_THREADNAMES "Enable fancy names for threads (not supported on all platforms)")
+if (WITH_THREADNAMES)
add_definitions(-DWITH_THREADNAMES=1)
endif()
-########################################
-# Build Variables - No Crash Check #
-########################################
-if (DEFINED NOCRASHCHECK )
+option(NOCRASHCHECK "Disables the crash check in the controller stats and input userpages. Prevents killing processes that are stalled/stuck.")
+if (NOCRASHCHECK)
add_definitions(-DNOCRASHCHECK=1)
endif()
-########################################
-# Build Variables - Stats delay overrid#
-########################################
-if (DEFINED STATS_DELAY )
+
+if (DEFINED STATS_DELAY)
add_definitions(-DSTATS_DELAY=${STATS_DELAY})
endif()
@@ -143,6 +153,20 @@ message("Builing release ${RELEASE} for version ${PACKAGE_VERSION} @ debug level
add_definitions(-g -funsigned-char -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64 -DDEBUG=${DEBUG} -DPACKAGE_VERSION=${PACKAGE_VERSION} -DRELEASE=${RELEASE})
add_definitions(-Wall -Wno-sign-compare -Wparentheses)
+option(NOSRT "Disable building native SRT support, regardless of library being present (by default SRT is enabled if libraries are installed)")
+if (NOT NOSRT)
+ find_library(SRT_LIB srt)
+ if(SRT_LIB)
+ add_definitions(-DWITH_SRT=1)
+ message("Building with SRT")
+ else()
+ message("Building without native SRT support")
+ endif()
+else()
+ message("Building without native SRT support")
+endif()
+
+
########################################
# MistLib - Header Files #
########################################
@@ -187,7 +211,6 @@ set(libHeaders
lib/sdp_media.h
lib/shared_memory.h
lib/socket.h
- lib/socket_srt.h
lib/srtp.h
lib/stream.h
lib/stun.h
@@ -208,6 +231,10 @@ set(libHeaders
lib/urireader.h
)
+if(SRT_LIB)
+ list(APPEND libHeaders lib/socket_srt.h)
+endif()
+
########################################
# MistLib - Build #
########################################
@@ -250,7 +277,6 @@ add_library (mist
lib/sdp_media.cpp
lib/shared_memory.cpp
lib/socket.cpp
- lib/socket_srt.cpp
lib/srtp.cpp
lib/stream.cpp
lib/stun.cpp
@@ -276,9 +302,8 @@ endif()
target_link_libraries(mist
-lpthread
${LIBRT}
- -lsrt
)
-if (NOT DEFINED NOSSL )
+if (NOT NOSSL)
target_link_libraries(mist mbedtls mbedx509 mbedcrypto srtp2)
endif()
install(
@@ -290,6 +315,16 @@ install(
DESTINATION lib
)
+
+if(SRT_LIB)
+ add_library(mist_srt lib/socket_srt.h lib/socket_srt.cpp)
+ target_link_libraries(mist_srt mist srt)
+ install(
+ TARGETS mist_srt
+ DESTINATION lib
+ )
+endif()
+
########################################
# MistLib - Local Header Install #
########################################
@@ -376,7 +411,8 @@ makeUtil(RAX rax)
makeUtil(AMF amf)
makeUtil(Certbot certbot)
makeUtil(Nuke nuke)
-if (DEFINED LOAD_BALANCE )
+option(LOAD_BALANCE "Build the load balancer")
+if (LOAD_BALANCE)
makeUtil(Load load)
endif()
#LTS_END
@@ -400,6 +436,9 @@ macro(makeInput inputName format)
src/io.cpp
${BINARY_DIR}/mist/.headers
)
+ if (";${ARGN};" MATCHES ";with_srt;")
+ target_link_libraries(MistIn${inputName} mist_srt )
+ endif()
#Set compile definitions
unset(my_definitions)
@@ -409,9 +448,7 @@ macro(makeInput inputName format)
PROPERTIES COMPILE_DEFINITIONS "${my_definitions}"
)
- target_link_libraries(MistIn${inputName}
- mist
- )
+ target_link_libraries(MistIn${inputName} mist)
install(
TARGETS MistIn${inputName}
DESTINATION bin
@@ -422,7 +459,8 @@ makeInput(HLS hls)
makeInput(DTSC dtsc)
makeInput(MP3 mp3)
makeInput(FLV flv)
-if (DEFINED WITH_AV )
+option(WITH_AV "Build a generic libav-based input (not distributable!)")
+if (WITH_AV)
makeInput(AV av)
target_link_libraries(MistInAV avformat avcodec avutil)
endif()
@@ -437,9 +475,11 @@ makeInput(Folder folder)#LTS
makeInput(Playlist playlist)#LTS
makeInput(Balancer balancer)#LTS
makeInput(RTSP rtsp)#LTS
-
makeInput(SRT srt)#LTS
-makeInput(TSSRT tssrt)#LTS
+
+if(SRT_LIB)
+ makeInput(TSSRT tssrt with_srt)#LTS
+endif()
########################################
# MistServer - Outputs #
@@ -454,7 +494,7 @@ macro(makeOutput outputName format)
SET(tsBaseClass HTTPOutput)
endif()
endif()
- if (";${ARGN};" MATCHES ";srt;")
+ if (";${ARGN};" MATCHES ";with_srt;")
SET(outBaseFile src/output/mist_out_srt.cpp)
endif()
if (";${ARGN};" MATCHES ";ts;")
@@ -476,9 +516,10 @@ macro(makeOutput outputName format)
set_target_properties(MistOut${outputName}
PROPERTIES COMPILE_DEFINITIONS "OUTPUTTYPE=\"output_${format}.h\";TS_BASECLASS=${tsBaseClass}"
)
- target_link_libraries(MistOut${outputName}
- mist
- )
+ if (";${ARGN};" MATCHES ";with_srt;")
+ target_link_libraries(MistOut${outputName} mist_srt)
+ endif()
+ target_link_libraries(MistOut${outputName} mist )
install(
TARGETS MistOut${outputName}
DESTINATION bin
@@ -497,18 +538,20 @@ makeOutput(H264 h264 http)
makeOutput(HDS hds http)
makeOutput(SRT srt http)
makeOutput(JSON json http)
-if (DEFINED WITH_JPG )
-makeOutput(JPG jpg http jpg)
+option(WITH_JPG "Build JPG thumbnailer output support")
+if (WITH_JPG)
+ makeOutput(JPG jpg http jpg)
endif()
makeOutput(TS ts ts)
-makeOutput(TSSRT tssrt ts srt)
+if(SRT_LIB)
+ makeOutput(TSSRT tssrt ts with_srt)
+endif()
makeOutput(HTTPTS httpts http ts)
makeOutput(HLS hls http ts)
makeOutput(CMAF cmaf http)#LTS
makeOutput(EBML ebml)
makeOutput(RTSP rtsp)#LTS
makeOutput(WAV wav)#LTS
-makeOutput(WebRTC webrtc http)#LTS
add_executable(MistProcFFMPEG
${BINARY_DIR}/mist/.headers
@@ -545,11 +588,13 @@ add_executable(MistProcLivepeer
)
target_link_libraries(MistProcLivepeer mist)
-if (NOT DEFINED NOSSL )
+if (NOT NOSSL)
makeOutput(HTTPS https)#LTS
+ makeOutput(WebRTC webrtc http)#LTS
endif()
-if (DEFINED WITH_SANITY )
+option(WITH_SANITY "Enable MistOutSanityCheck output for testing purposes")
+if (WITH_SANITY)
makeOutput(SanityCheck sanitycheck)#LTS
endif()
@@ -739,7 +784,8 @@ set(lspSOURCES
)
-if (NOT DEFINED NOGA )
+option(NOGA "Disables Google Analytics entirely in the LSP")
+if (NOT NOGA)
list(APPEND lspSOURCES ${SOURCE_DIR}/lsp/analytics.js)
endif()
diff --git a/lib/config.cpp b/lib/config.cpp
index 0a9b1ce5..d9a8470b 100644
--- a/lib/config.cpp
+++ b/lib/config.cpp
@@ -3,7 +3,6 @@
#include "config.h"
#include "defines.h"
-#include "lib/socket_srt.h"
#include "stream.h"
#include "timing.h"
#include "tinythread.h"
@@ -39,7 +38,6 @@
bool Util::Config::is_active = false;
bool Util::Config::is_restarting = false;
static Socket::Server *serv_sock_pointer = 0;
-static Socket::SRTServer *serv_srt_sock_pointer = 0; ///< Holds a pointer to SRT Server, if it is connected
uint32_t Util::printDebugLevel = DEBUG;
std::string Util::streamName;
char Util::exitReason[256] ={0};
@@ -55,13 +53,6 @@ void Util::logExitReason(const char *format, ...){
std::string Util::listenInterface;
uint32_t Util::listenPort = 0;
-// Sets pointer to the SRT Server, for proper cleanup later.
-//
-// Currently used for TSSRT Input only, as this doesn't use the config library to setup a listener
-void Util::Config::registerSRTSockPtr(Socket::SRTServer *ptr){
- serv_srt_sock_pointer = ptr;
-}
-
Util::Config::Config(){
// global options here
vals["debug"]["long"] = "debug";
@@ -331,23 +322,6 @@ struct callbackData{
int (*cb)(Socket::Connection &);
};
-// As above, but using an SRT Connection
-struct callbackSRTData{
- Socket::SRTConnection *sock;
- int (*cb)(Socket::SRTConnection &);
-};
-
-// Callback for SRT-serving threads
-static void callThreadCallbackSRT(void *cDataArg){
- INSANE_MSG("Thread for %p started", cDataArg);
- callbackSRTData *cData = (callbackSRTData *)cDataArg;
- cData->cb(*(cData->sock));
- cData->sock->close();
- delete cData->sock;
- delete cData;
- INSANE_MSG("Thread for %p ended", cDataArg);
-}
-
static void callThreadCallback(void *cDataArg){
INSANE_MSG("Thread for %p started", cDataArg);
callbackData *cData = (callbackData *)cDataArg;
@@ -430,53 +404,6 @@ int Util::Config::serveThreadedSocket(int (*callback)(Socket::Connection &)){
return r;
}
-// This is a THREADED server!! Fork does not work as the SRT library itself already starts up a
-// thread, and forking after thread creation messes up all control flow internal to the library.
-int Util::Config::serveSRTSocket(int (*callback)(Socket::SRTConnection &S)){
- Socket::SRTServer server_socket;
- if (vals.isMember("port") && vals.isMember("interface")){
- server_socket = Socket::SRTServer(getInteger("port"), getString("interface"), false, "output");
- }
- if (!server_socket.connected()){
- DEVEL_MSG("Failure to open socket");
- return 1;
- }
- serv_srt_sock_pointer = &server_socket;
- activate();
- if (server_socket.getSocket()){
- int oldSock = server_socket.getSocket();
- if (!dup2(oldSock, 0)){
- server_socket = Socket::SRTServer(0);
- close(oldSock);
- }
- }
- int r = SRTServer(server_socket, callback);
- serv_srt_sock_pointer = 0;
- return r;
-}
-
-int Util::Config::SRTServer(Socket::SRTServer &server_socket, int (*callback)(Socket::SRTConnection &)){
- Util::Procs::socketList.insert(server_socket.getSocket());
- while (is_active && server_socket.connected()){
- Socket::SRTConnection S = server_socket.accept(false, "output");
- if (S.connected()){// check if the new connection is valid
- callbackSRTData *cData = new callbackSRTData;
- cData->sock = new Socket::SRTConnection(S);
- cData->cb = callback;
- // spawn a new thread for this connection
- tthread::thread T(callThreadCallbackSRT, (void *)cData);
- // detach it, no need to keep track of it anymore
- T.detach();
- HIGH_MSG("Spawned new thread for socket %i", S.getSocket());
- }else{
- Util::sleep(10); // sleep 10ms
- }
- }
- Util::Procs::socketList.erase(server_socket.getSocket());
- if (!is_restarting){server_socket.close();}
- return 0;
-}
-
int Util::Config::serveForkedSocket(int (*callback)(Socket::Connection &S)){
Socket::Server server_socket;
if (Socket::checkTrueSocket(0)){
@@ -541,8 +468,6 @@ void Util::Config::signal_handler(int signum, siginfo_t *sigInfo, void *ignore){
case SIGHUP:
case SIGTERM:
if (serv_sock_pointer){serv_sock_pointer->close();}
- // Close the srt server as well, if set
- if (serv_srt_sock_pointer){serv_srt_sock_pointer->close();}
#if DEBUG >= DLVL_DEVEL
static int ctr = 0;
if (!is_active && ++ctr > 4){BACKTRACE;}
diff --git a/lib/config.h b/lib/config.h
index af1e2834..f09eb3ee 100644
--- a/lib/config.h
+++ b/lib/config.h
@@ -8,7 +8,6 @@
#endif
#include "json.h"
-#include "socket_srt.h"
#include <signal.h>
#include <string>
@@ -42,7 +41,6 @@ namespace Util{
int64_t getInteger(std::string optname);
bool getBool(std::string optname);
void activate();
- void registerSRTSockPtr(Socket::SRTServer *ptr);
int threadServer(Socket::Server &server_socket, int (*callback)(Socket::Connection &S));
int forkServer(Socket::Server &server_socket, int (*callback)(Socket::Connection &S));
int serveThreadedSocket(int (*callback)(Socket::Connection &S));
@@ -51,9 +49,6 @@ namespace Util{
void addOptionsFromCapabilities(const JSON::Value &capabilities);
void addBasicConnectorOptions(JSON::Value &capabilities);
void addConnectorOptions(int port, JSON::Value &capabilities);
-
- int serveSRTSocket(int (*callback)(Socket::SRTConnection &S));
- int SRTServer(Socket::SRTServer &server_socket, int (*callback)(Socket::SRTConnection &S));
};
/// The interface address the current serveSocket function is listening on
diff --git a/lib/socket_srt.cpp b/lib/socket_srt.cpp
index fae3cdad..8562089f 100644
--- a/lib/socket_srt.cpp
+++ b/lib/socket_srt.cpp
@@ -1,6 +1,8 @@
#include "defines.h"
#include "lib/http_parser.h"
#include "socket_srt.h"
+#include "json.h"
+#include "timing.h"
#include <cstdlib>
#include <sstream>
@@ -69,13 +71,20 @@ namespace Socket{
return interpretSRTMode(params.count("mode") ? params.at("mode") : "default", u.host, "");
}
- SRTConnection::SRTConnection(){initializeEmpty();}
+ SRTConnection::SRTConnection(){
+ initializeEmpty();
+ lastGood = Util::bootMS();
+ }
SRTConnection::SRTConnection(const std::string &_host, int _port, const std::string &_direction,
const std::map<std::string, std::string> &_params){
connect(_host, _port, _direction, _params);
}
+ SRTConnection::SRTConnection(SRTSOCKET alreadyConnected){
+ sock = alreadyConnected;
+ }
+
std::string SRTConnection::getStreamName(){
int sNameLen = 512;
char sName[sNameLen];
@@ -84,29 +93,89 @@ namespace Socket{
return "";
}
- /// Updates the downbuffer internal variable.
- /// Returns true if new data was received, false otherwise.
- std::string SRTConnection::RecvNow(){
- char recvbuf[5000];
+ std::string SRTConnection::getBinHost(){
+ char tmpBuffer[17] = "\000\000\000\000\000\000\000\000\000\000\377\377\000\000\000\000";
+ switch (remoteaddr.sin6_family){
+ case AF_INET:
+ memcpy(tmpBuffer + 12, &(reinterpret_cast<const sockaddr_in *>(&remoteaddr)->sin_addr.s_addr), 4);
+ break;
+ case AF_INET6: memcpy(tmpBuffer, &(remoteaddr.sin6_addr.s6_addr), 16); break;
+ default: return ""; break;
+ }
+ return std::string(tmpBuffer, 16);
+ }
+
+ size_t SRTConnection::RecvNow(){
bool blockState = blocking;
- setBlocking(true);
+ if (!blockState){setBlocking(true);}
SRT_MSGCTRL mc = srt_msgctrl_default;
int32_t receivedBytes = srt_recvmsg2(sock, recvbuf, 5000, &mc);
- if (prev_pktseq != 0 && (mc.pktseq - prev_pktseq > 1)){WARN_MSG("Packet lost");}
+ //if (prev_pktseq != 0 && (mc.pktseq - prev_pktseq > 1)){WARN_MSG("Packet lost");}
prev_pktseq = mc.pktseq;
- setBlocking(blockState);
+ if (!blockState){setBlocking(blockState);}
if (receivedBytes == -1){
+ int err = srt_getlasterror(0);
+ if (err == SRT_ECONNLOST){
+ close();
+ return 0;
+ }
+ if (err == SRT_ENOCONN){
+ if (Util::bootMS() > lastGood + 5000){
+ ERROR_MSG("SRT connection timed out - closing");
+ close();
+ }
+ return 0;
+ }
ERROR_MSG("Unable to receive data over socket: %s", srt_getlasterror_str());
if (srt_getsockstate(sock) != SRTS_CONNECTED){close();}
- return "";
+ return 0;
+ }
+ if (receivedBytes == 0){
+ close();
+ }else{
+ lastGood = Util::bootMS();
}
srt_bstats(sock, &performanceMonitor, false);
- return std::string(recvbuf, receivedBytes);
+ return receivedBytes;
+ }
+
+ ///Attempts a read, obeying the current blocking setting.
+ ///May result in socket being disconnected when connection was lost during read.
+ ///Returns amount of bytes actually read
+ size_t SRTConnection::Recv(){
+ SRT_MSGCTRL mc = srt_msgctrl_default;
+ int32_t receivedBytes = srt_recvmsg2(sock, recvbuf, 5000, &mc);
+ prev_pktseq = mc.pktseq;
+ if (receivedBytes == -1){
+ int err = srt_getlasterror(0);
+ if (err == SRT_EASYNCRCV){return 0;}
+ if (err == SRT_ECONNLOST){
+ close();
+ return 0;
+ }
+ if (err == SRT_ENOCONN){
+ if (Util::bootMS() > lastGood + 5000){
+ ERROR_MSG("SRT connection timed out - closing");
+ close();
+ }
+ return 0;
+ }
+ ERROR_MSG("Unable to receive data over socket: %s", srt_getlasterror_str());
+ if (srt_getsockstate(sock) != SRTS_CONNECTED){close();}
+ return 0;
+ }
+ if (receivedBytes == 0){
+ close();
+ }else{
+ lastGood = Util::bootMS();
+ }
+ srt_bstats(sock, &performanceMonitor, false);
+ return receivedBytes;
}
void SRTConnection::connect(const std::string &_host, int _port, const std::string &_direction,
@@ -143,6 +212,7 @@ namespace Socket{
HIGH_MSG("Going to connect sock %d", sock);
if (srt_connect(sock, psa, sizeof sa) == SRT_ERROR){
srt_close(sock);
+ sock = -1;
ERROR_MSG("Can't connect SRT Socket");
return;
}
@@ -153,6 +223,7 @@ namespace Socket{
return;
}
INFO_MSG("Caller SRT socket %" PRId32 " success targetting %s:%u", sock, _host.c_str(), _port);
+ lastGood = Util::bootMS();
return;
}
if (modeName == "listener"){
@@ -163,14 +234,17 @@ namespace Socket{
if (srt_bind(sock, psa, sizeof sa) == SRT_ERROR){
srt_close(sock);
- ERROR_MSG("Can't connect SRT Socket");
+ sock = -1;
+ ERROR_MSG("Can't connect SRT Socket: %s", srt_getlasterror_str());
return;
}
if (srt_listen(sock, 1) == SRT_ERROR){
srt_close(sock);
+ sock = -1;
ERROR_MSG("Can not listen on Socket");
}
INFO_MSG("Listener SRT socket sucess @ %s:%u", _host.c_str(), _port);
+ lastGood = Util::bootMS();
return;
}
if (modeName == "rendezvous"){
@@ -182,6 +256,7 @@ namespace Socket{
if (srt_bind(sock, psa, sizeof sa) == SRT_ERROR){
srt_close(sock);
+ sock = -1;
ERROR_MSG("Can't connect SRT Socket");
return;
}
@@ -191,6 +266,7 @@ namespace Socket{
if (srt_connect(sock, psb, sizeof sb) == SRT_ERROR){
srt_close(sock);
+ sock = -1;
ERROR_MSG("Can't connect SRT Socket");
return;
}
@@ -200,6 +276,7 @@ namespace Socket{
return;
}
INFO_MSG("Rendezvous SRT socket sucess @ %s:%u", _host.c_str(), _port);
+ lastGood = Util::bootMS();
return;
}
ERROR_MSG("Invalid mode parameter. Use 'client' or 'server'");
@@ -220,8 +297,23 @@ namespace Socket{
int res = srt_sendmsg2(sock, data, len, NULL);
if (res == SRT_ERROR){
+ int err = srt_getlasterror(0);
+ //Do not report normal connection lost errors
+ if (err == SRT_ECONNLOST){
+ close();
+ return;
+ }
+ if (err == SRT_ENOCONN){
+ if (Util::bootMS() > lastGood + 5000){
+ ERROR_MSG("SRT connection timed out - closing");
+ close();
+ }
+ return;
+ }
ERROR_MSG("Unable to send data over socket %" PRId32 ": %s", sock, srt_getlasterror_str());
if (srt_getsockstate(sock) != SRTS_CONNECTED){close();}
+ }else{
+ lastGood = Util::bootMS();
}
srt_bstats(sock, &performanceMonitor, false);
}
@@ -236,16 +328,16 @@ namespace Socket{
uint64_t SRTConnection::dataDown(){return performanceMonitor.byteRecvTotal;}
uint64_t SRTConnection::packetCount(){
- return (direction == "input" ? performanceMonitor.pktRecvTotal : performanceMonitor.pktSentTotal);
+ return (direction == "output" ? performanceMonitor.pktSentTotal : performanceMonitor.pktRecvTotal);
}
uint64_t SRTConnection::packetLostCount(){
- return (direction == "input" ? performanceMonitor.pktRcvLossTotal : performanceMonitor.pktSndLossTotal);
+ return (direction == "output" ? performanceMonitor.pktSndLossTotal : performanceMonitor.pktRcvLossTotal);
}
uint64_t SRTConnection::packetRetransmitCount(){
//\todo This should be updated with pktRcvRetransTotal on the retrieving end once srt has implemented this.
- return (direction == "input" ? 0 : performanceMonitor.pktRetransTotal);
+ return (direction == "output" ? performanceMonitor.pktRetransTotal : 0);
}
void SRTConnection::initializeEmpty(){
@@ -259,10 +351,8 @@ namespace Socket{
void SRTConnection::setBlocking(bool _blocking){
if (_blocking == blocking){return;}
// If we have an error setting the new blocking state, the state is unchanged so we return early.
- if (srt_setsockopt(sock, 0, (direction == "output" ? SRTO_SNDSYN : SRTO_RCVSYN), &_blocking,
- sizeof _blocking) == -1){
- return;
- }
+ if (srt_setsockopt(sock, 0, SRTO_SNDSYN, &_blocking, sizeof _blocking) == -1){return;}
+ if (srt_setsockopt(sock, 0, SRTO_RCVSYN, &_blocking, sizeof _blocking) == -1){return;}
blocking = _blocking;
}
@@ -289,7 +379,7 @@ namespace Socket{
if (adapter == "" && modeName == "listener"){adapter = _host;}
- tsbpdMode = ((params.count("tsbpd") && isFalseString(params.at("tsbpd"))) ? false : true);
+ tsbpdMode = (params.count("tsbpd") && JSON::Value(params.at("tsbpd")).asBool());
outgoing_port = (params.count("port") ? strtol(params.at("port").c_str(), 0, 0) : 0);
@@ -332,14 +422,11 @@ namespace Socket{
int SRTConnection::postConfigureSocket(){
bool no = false;
- if (srt_setsockopt(sock, 0, (direction == "output" ? SRTO_SNDSYN : SRTO_RCVSYN), &no, sizeof no) == -1){
- return -1;
- }
+ if (srt_setsockopt(sock, 0, SRTO_SNDSYN, &no, sizeof no) == -1){return -1;}
+ if (srt_setsockopt(sock, 0, SRTO_RCVSYN, &no, sizeof no) == -1){return -1;}
if (timeout){
- if (srt_setsockopt(sock, 0, (direction == "output" ? SRTO_SNDTIMEO : SRTO_RCVTIMEO), &timeout,
- sizeof timeout) == -1){
- return -1;
- }
+ if (srt_setsockopt(sock, 0, SRTO_SNDTIMEO, &timeout, sizeof timeout) == -1){return -1;}
+ if (srt_setsockopt(sock, 0, SRTO_RCVTIMEO, &timeout, sizeof timeout) == -1){return -1;}
}
std::string errMsg = configureSocketLoop(SRT::SockOpt::POST);
if (errMsg.size()){
@@ -455,15 +542,7 @@ namespace Socket{
}
}break;
case SRT::SockOpt::BOOL:{
- bool tmp;
- if (isFalseString(v)){
- tmp = true;
- }else if (isTrueString(v)){
- tmp = true;
- }else{
- return false;
- }
- val.b = tmp;
+ val.b = JSON::Value(v).asBool();
val.value = &val.b;
val.size = sizeof val.b;
}break;
diff --git a/lib/socket_srt.h b/lib/socket_srt.h
index 7995d938..5101d38d 100644
--- a/lib/socket_srt.h
+++ b/lib/socket_srt.h
@@ -1,11 +1,8 @@
#pragma once
-
#include "socket.h"
#include "url.h"
-
#include <map>
#include <string>
-
#include <srt/srt.h>
typedef std::map<std::string, int> SockOptVals;
@@ -13,15 +10,6 @@ typedef std::map<std::string, std::string> paramList;
namespace Socket{
std::string interpretSRTMode(const HTTP::URL &u);
-
- inline bool isFalseString(const std::string &_val){
- return _val == "0" || _val == "no" || _val == "off" || _val == "false";
- }
-
- inline bool isTrueString(const std::string &_val){
- return _val == "1" || _val == "yes" || _val == "on" || _val == "true";
- }
-
sockaddr_in createInetAddr(const std::string &_host, int _port);
namespace SRT{
@@ -39,7 +27,7 @@ namespace Socket{
class SRTConnection{
public:
SRTConnection();
- SRTConnection(SRTSOCKET alreadyConnected){sock = alreadyConnected;}
+ SRTConnection(SRTSOCKET alreadyConnected);
SRTConnection(const std::string &_host, int _port, const std::string &_direction = "input",
const paramList &_params = paramList());
@@ -52,7 +40,10 @@ namespace Socket{
void setBlocking(bool blocking); ///< Set this socket to be blocking (true) or nonblocking (false).
bool isBlocking(); ///< Check if this socket is blocking (true) or nonblocking (false).
- std::string RecvNow();
+ size_t RecvNow();
+ size_t Recv();
+ char recvbuf[5000]; ///< Buffer where received data is stored in
+
void SendNow(const std::string &data);
void SendNow(const char *data, size_t len);
@@ -73,7 +64,7 @@ namespace Socket{
struct sockaddr_in6 remoteaddr;
std::string remotehost;
-
+ std::string getBinHost();
private:
SRTSOCKET sock;
CBytePerfMon performanceMonitor;
@@ -81,10 +72,11 @@ namespace Socket{
std::string host;
int outgoing_port;
int32_t prev_pktseq;
+ uint64_t lastGood;
uint32_t chunkTransmitSize;
- // From paramaeter parsing
+ // From parameter parsing
std::string adapter;
std::string modeName;
int timeout;
@@ -100,7 +92,7 @@ namespace Socket{
bool blocking;
};
- /// This class is for easily setting up listening socket, either TCP or Unix.
+ /// This class is for easily setting up a listening SRT socket
class SRTServer{
public:
SRTServer();
@@ -130,7 +122,6 @@ namespace Socket{
class SocketOption{
public:
- //{"enforcedencryption", 0, SRTO_ENFORCEDENCRYPTION, SRT::SockOpt::PRE, SRT::SockOpt::BOOL, nullptr},
SocketOption(const std::string &_name, int _protocol, int _symbol, SRT::SockOpt::Binding _binding,
SRT::SockOpt::Type _type, const SockOptVals &_values = SockOptVals())
: name(_name), protocol(_protocol), symbol(_symbol), binding(_binding), type(_type),
@@ -142,11 +133,8 @@ namespace Socket{
SRT::SockOpt::Binding binding;
SRT::SockOpt::Type type;
SockOptVals valmap;
-
bool apply(int socket, const std::string &value, bool isSrtOpt = true);
-
static int setSo(int socket, int protocol, int symbol, const void *data, size_t size, bool isSrtOpt = true);
-
bool extract(const std::string &v, OptionValue &val, SRT::SockOpt::Type asType);
};
diff --git a/lib/ts_stream.cpp b/lib/ts_stream.cpp
index 90d45957..af37c63e 100644
--- a/lib/ts_stream.cpp
+++ b/lib/ts_stream.cpp
@@ -15,6 +15,51 @@ tthread::recursive_mutex tMutex;
namespace TS{
+ bool Assembler::assemble(Stream & TSStrm, char * ptr, size_t len){
+ bool ret = false;
+ size_t offset = 0;
+ size_t amount = 188-leftData.size();
+ if (leftData.size() && len >= amount){
+ //Attempt to re-assemble a packet from the leftovers of last time + current head
+ if (len == amount || ptr[amount] == 0x47){
+ VERYHIGH_MSG("Assembled scrap packet");
+ //Success!
+ leftData.append(ptr, amount);
+ tsBuf.FromPointer(leftData);
+ TSStrm.add(tsBuf);
+ ret = true;
+ if (!TSStrm.isDataTrack(tsBuf.getPID())){TSStrm.parse(tsBuf.getPID());}
+ offset = amount;
+ leftData.assign(0,0);
+ }
+ //On failure, hope we might live to succeed another day
+ }
+ // Try to read full TS Packets
+ // Watch out! We push here to a global, in order for threads to be able to access it.
+ size_t junk = 0;
+ while (offset < len){
+ if (ptr[offset] == 0x47 && (offset+188 >= len || ptr[offset+188] == 0x47)){// check for sync byte
+ if (junk){
+ INFO_MSG("%zu bytes of non-sync-byte data received", junk);
+ junk = 0;
+ }
+ if (offset + 188 <= len){
+ tsBuf.FromPointer(ptr + offset);
+ TSStrm.add(tsBuf);
+ if (!TSStrm.isDataTrack(tsBuf.getPID())){TSStrm.parse(tsBuf.getPID());}
+ ret = true;
+ }else{
+ leftData.assign(ptr + offset, len - offset);
+ }
+ offset += 188;
+ }else{
+ ++junk;
+ ++offset;
+ }
+ }
+ return ret;
+ }
+
void ADTSRemainder::setRemainder(const aac::adts &p, const void *source, uint32_t avail, uint64_t bPos){
if (!p.getCompleteSize()){return;}
diff --git a/lib/ts_stream.h b/lib/ts_stream.h
index bdefb3c7..6ffaf024 100644
--- a/lib/ts_stream.h
+++ b/lib/ts_stream.h
@@ -106,4 +106,13 @@ namespace TS{
void parsePES(size_t tid, bool finished = false);
};
+
+ class Assembler{
+ public:
+ bool assemble(Stream & TSStrm, char * ptr, size_t len);
+ private:
+ Util::ResizeablePointer leftData;
+ TS::Packet tsBuf;
+ };
+
}// namespace TS
diff --git a/src/input/input.cpp b/src/input/input.cpp
index 28055928..d39e5dbf 100644
--- a/src/input/input.cpp
+++ b/src/input/input.cpp
@@ -893,18 +893,21 @@ namespace Mist{
statComm.setCRC(getpid());
statComm.setStream(streamName);
statComm.setConnector("INPUT:" + capa["name"].asStringRef());
- statComm.setUp(0);
- statComm.setDown(streamByteCount());
statComm.setTime(now - startTime);
statComm.setLastSecond(0);
- statComm.setHost(getConnectedBinHost());
- handleLossyStats(statComm);
+ connStats(statComm);
}
statTimer = Util::bootSecs();
}
}
}
+
+ void Input::connStats(Comms::Statistics &statComm){
+ statComm.setUp(0);
+ statComm.setDown(streamByteCount());
+ statComm.setHost(getConnectedBinHost());
+ }
void Input::realtimeMainLoop(){
uint64_t statTimer = 0;
diff --git a/src/input/input.h b/src/input/input.h
index d8aa7928..0e33c254 100644
--- a/src/input/input.h
+++ b/src/input/input.h
@@ -69,11 +69,7 @@ namespace Mist{
virtual void userOnActive(size_t id);
virtual void userOnDisconnect(size_t id);
virtual void userLeadOut();
-
- virtual void handleLossyStats(Comms::Statistics & statComm){}
-
- virtual bool preventBufferStart() {return false;}
-
+ virtual void connStats(Comms::Statistics & statComm);
virtual void parseHeader();
bool bufferFrame(size_t track, uint32_t keyNum);
diff --git a/src/input/input_ts.cpp b/src/input/input_ts.cpp
index 28ccc825..c9959142 100644
--- a/src/input/input_ts.cpp
+++ b/src/input/input_ts.cpp
@@ -191,6 +191,7 @@ namespace Mist{
inputProcess = 0;
isFinished = false;
+#ifndef WITH_SRT
{
pid_t srt_tx = -1;
const char *args[] ={"srt-live-transmit", 0};
@@ -199,12 +200,13 @@ namespace Mist{
capa["source_match"].append("srt://*");
capa["always_match"].append("srt://*");
capa["desc"] =
- capa["desc"].asStringRef() + " SRT support (srt://*) is installed and available.";
+ capa["desc"].asStringRef() + " Non-native SRT support (srt://*) is installed and available.";
}else{
capa["desc"] = capa["desc"].asStringRef() +
- " To enable SRT support, please install the srt-live-transmit binary.";
+ " To enable non-native SRT support, please install the srt-live-transmit binary.";
}
}
+#endif
capa["optional"]["DVR"]["name"] = "Buffer time (ms)";
capa["optional"]["DVR"]["help"] =
@@ -534,43 +536,7 @@ namespace Mist{
gettingData = true;
INFO_MSG("Now receiving UDP data...");
}
- size_t offset = 0;
- size_t amount = 188-leftData.size();
- if (leftData.size() && udpCon.data.size() >= amount){
- //Attempt to re-assemble a packet from the leftovers of last time + current head
- if (udpCon.data.size() == amount || udpCon.data[amount] == 0x47){
- VERYHIGH_MSG("Assembled scrap packet");
- //Success!
- leftData.append(udpCon.data, amount);
- liveStream.add(leftData);
- if (!liveStream.isDataTrack(tsBuf.getPID())){liveStream.parse(tsBuf.getPID());}
- offset = amount;
- leftData.assign(0,0);
- }
- //On failure, hope we might live to succeed another day
- }
- // Try to read full TS Packets
- // Watch out! We push here to a global, in order for threads to be able to access it.
- size_t junk = 0;
- while (offset < udpCon.data.size()){
- if (udpCon.data[offset] == 0x47 && (offset+188 >= udpCon.data.size() || udpCon.data[offset+188] == 0x47)){// check for sync byte
- if (junk){
- INFO_MSG("%zu bytes of non-sync-byte data received", junk);
- junk = 0;
- }
- if (offset + 188 <= udpCon.data.size()){
- tsBuf.FromPointer(udpCon.data + offset);
- liveStream.add(tsBuf);
- if (!liveStream.isDataTrack(tsBuf.getPID())){liveStream.parse(tsBuf.getPID());}
- }else{
- leftData.assign(udpCon.data + offset, udpCon.data.size() - offset);
- }
- offset += 188;
- }else{
- ++junk;
- ++offset;
- }
- }
+ assembler.assemble(liveStream, udpCon.data, udpCon.data.size());
}
if (!received){
Util::sleep(100);
diff --git a/src/input/input_ts.h b/src/input/input_ts.h
index af578c87..e810ddb4 100644
--- a/src/input/input_ts.h
+++ b/src/input/input_ts.h
@@ -33,7 +33,7 @@ namespace Mist{
void streamMainLoop();
void finish();
FILE *inFile; ///< The input file with ts data
- Util::ResizeablePointer leftData;
+ TS::Assembler assembler;
TS::Stream tsStream; ///< Used for parsing the incoming ts stream
Socket::UDPConnection udpCon;
Socket::Connection tcpCon;
diff --git a/src/input/input_tssrt.cpp b/src/input/input_tssrt.cpp
index ab42c036..f0bb48dc 100644
--- a/src/input/input_tssrt.cpp
+++ b/src/input/input_tssrt.cpp
@@ -24,9 +24,17 @@
Util::Config *cfgPointer = NULL;
std::string baseStreamName;
+Socket::SRTServer sSock;
+
+void (*oldSignal)(int, siginfo_t *,void *) = 0;
+
+void signal_handler(int signum, siginfo_t *sigInfo, void *ignore){
+ sSock.close();
+ if (oldSignal){
+ oldSignal(signum, sigInfo, ignore);
+ }
+}
-/// Global, so that all tracks stay in sync
-int64_t timeStampOffset = 0;
// We use threads here for multiple input pushes, because of the internals of the SRT Library
static void callThreadCallbackSRT(void *socknum){
@@ -54,8 +62,10 @@ namespace Mist{
capa["codecs"][0u][0u].append("HEVC");
capa["codecs"][0u][0u].append("MPEG2");
capa["codecs"][0u][1u].append("AAC");
+ capa["codecs"][0u][1u].append("MP3");
capa["codecs"][0u][1u].append("AC3");
capa["codecs"][0u][1u].append("MP2");
+ capa["codecs"][0u][1u].append("opus");
JSON::Value option;
option["arg"] = "integer";
@@ -64,6 +74,7 @@ namespace Mist{
option["help"] = "DVR buffer time in ms";
option["value"].append(50000);
config->addOption("bufferTime", option);
+ option.null();
capa["optional"]["DVR"]["name"] = "Buffer time (ms)";
capa["optional"]["DVR"]["help"] =
"The target available buffer time for this live stream, in milliseconds. This is the time "
@@ -73,13 +84,44 @@ namespace Mist{
capa["optional"]["DVR"]["type"] = "uint";
capa["optional"]["DVR"]["default"] = 50000;
+ option["arg"] = "integer";
+ option["long"] = "acceptable";
+ option["short"] = "T";
+ option["help"] = "Acceptable pushed streamids (0 = use streamid as wildcard, 1 = ignore all streamids, 2 = disallow non-matching streamids)";
+ option["value"].append(0);
+ config->addOption("acceptable", option);
+ capa["optional"]["acceptable"]["name"] = "Acceptable pushed streamids";
+ capa["optional"]["acceptable"]["help"] = "What to do with the streamids for incoming pushes, if this is a listener SRT connection";
+ capa["optional"]["acceptable"]["option"] = "--acceptable";
+ capa["optional"]["acceptable"]["short"] = "T";
+ capa["optional"]["acceptable"]["default"] = 0;
+ capa["optional"]["acceptable"]["type"] = "select";
+ capa["optional"]["acceptable"]["select"][0u][0u] = 0;
+ capa["optional"]["acceptable"]["select"][0u][1u] = "Set streamid as wildcard";
+ capa["optional"]["acceptable"]["select"][1u][0u] = 1;
+ capa["optional"]["acceptable"]["select"][1u][1u] = "Ignore all streamids";
+ capa["optional"]["acceptable"]["select"][2u][0u] = 2;
+ capa["optional"]["acceptable"]["select"][2u][1u] = "Disallow non-matching streamid";
+
+
// Setup if we are called form with a thread for push-based input.
if (s != -1){
srtConn = Socket::SRTConnection(s);
streamName = baseStreamName;
- if (srtConn.getStreamName() != ""){streamName += "+" + srtConn.getStreamName();}
+ std::string streamid = srtConn.getStreamName();
+ int64_t acc = config->getInteger("acceptable");
+ if (acc == 0){
+ if (streamid.size()){streamName += "+" + streamid;}
+ }else if(acc == 2){
+ if (streamName != streamid){
+ FAIL_MSG("Stream ID '%s' does not match stream name, push blocked", streamid.c_str());
+ srtConn.close();
+ }
+ }
+ Util::setStreamName(streamName);
}
lastTimeStamp = 0;
+ timeStampOffset = 0;
singularFlag = true;
}
@@ -96,9 +138,27 @@ namespace Mist{
INFO_MSG("Parsed url: %s", u.getUrl().c_str());
if (Socket::interpretSRTMode(u) == "listener"){
sSock = Socket::SRTServer(u.getPort(), u.host, false);
- config->registerSRTSockPtr(&sSock);
+ struct sigaction new_action;
+ struct sigaction cur_action;
+ new_action.sa_sigaction = signal_handler;
+ sigemptyset(&new_action.sa_mask);
+ new_action.sa_flags = SA_SIGINFO;
+ sigaction(SIGINT, &new_action, &cur_action);
+ if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){
+ if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");}
+ oldSignal = cur_action.sa_sigaction;
+ }
+ sigaction(SIGHUP, &new_action, &cur_action);
+ if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){
+ if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");}
+ oldSignal = cur_action.sa_sigaction;
+ }
+ sigaction(SIGTERM, &new_action, &cur_action);
+ if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){
+ if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");}
+ oldSignal = cur_action.sa_sigaction;
+ }
}else{
- INFO_MSG("A");
std::map<std::string, std::string> arguments;
HTTP::parseVars(u.args, arguments);
size_t connectCnt = 0;
@@ -117,34 +177,12 @@ namespace Mist{
void inputTSSRT::getNext(size_t idx){
thisPacket.null();
bool hasPacket = tsStream.hasPacket();
- bool firstloop = true;
- while (!hasPacket && srtConn.connected() && config->is_active){
- firstloop = false;
- // Receive data from the socket. SRT Sockets handle some internal timing as well, based on the provided settings.
- leftBuffer.append(srtConn.RecvNow());
- if (leftBuffer.size()){
- size_t offset = 0;
- size_t garbage = 0;
- while ((offset + 188) < leftBuffer.size()){
- if (leftBuffer[offset] != 0x47){
- ++garbage;
- if (garbage % 100 == 0){INFO_MSG("Accumulated %zu bytes of garbage", garbage);}
- ++offset;
- continue;
- }
- if (garbage != 0){
- WARN_MSG("Thrown away %zu bytes of garbage data", garbage);
- garbage = 0;
- }
- if (offset + 188 <= leftBuffer.size()){
- tsBuf.FromPointer(leftBuffer.data() + offset);
- tsStream.parse(tsBuf, 0);
- offset += 188;
- }
- }
- leftBuffer.erase(0, offset);
- hasPacket = tsStream.hasPacket();
- }else if (srtConn.connected()){
+ while (!hasPacket && srtConn && config->is_active){
+
+ size_t recvSize = srtConn.RecvNow();
+ if (recvSize){
+ if (assembler.assemble(tsStream, srtConn.recvbuf, recvSize, true)){hasPacket = tsStream.hasPacket();}
+ }else if (srtConn){
// This should not happen as the SRT socket is read blocking and won't return until there is
// data. But if it does, wait before retry
Util::sleep(10);
@@ -153,7 +191,12 @@ namespace Mist{
if (hasPacket){tsStream.getEarliestPacket(thisPacket);}
if (!thisPacket){
- INFO_MSG("Could not getNext TS packet!");
+ if (srtConn){
+ INFO_MSG("Could not getNext TS packet!");
+ Util::logExitReason("internal TS parser error");
+ }else{
+ Util::logExitReason("SRT connection close");
+ }
return;
}
@@ -208,7 +251,10 @@ namespace Mist{
void inputTSSRT::setSingular(bool newSingular){singularFlag = newSingular;}
- void inputTSSRT::handleLossyStats(Comms::Statistics &statComm){
+ void inputTSSRT::connStats(Comms::Statistics &statComm){
+ statComm.setUp(srtConn.dataUp());
+ statComm.setDown(srtConn.dataDown());
+ statComm.setHost(getConnectedBinHost());
statComm.setPacketCount(srtConn.packetCount());
statComm.setPacketLostCount(srtConn.packetLostCount());
statComm.setPacketRetransmitCount(srtConn.packetRetransmitCount());
diff --git a/src/input/input_tssrt.h b/src/input/input_tssrt.h
index 5af13ba7..c532c041 100644
--- a/src/input/input_tssrt.h
+++ b/src/input/input_tssrt.h
@@ -8,13 +8,17 @@
#include <string>
namespace Mist{
- /// This class contains all functions needed to implement TS Input
+
class inputTSSRT : public Input{
public:
inputTSSRT(Util::Config *cfg, SRTSOCKET s = -1);
~inputTSSRT();
void setSingular(bool newSingular);
virtual bool needsLock();
+ virtual std::string getConnectedBinHost(){
+ if (srtConn){return srtConn.getBinHost();}
+ return Input::getConnectedBinHost();
+ }
protected:
// Private Functions
@@ -22,7 +26,6 @@ namespace Mist{
bool preRun();
virtual void getNext(size_t idx = INVALID_TRACK_ID);
virtual bool needHeader(){return false;}
- virtual bool preventBufferStart(){return srtConn.getSocket() == -1;}
virtual bool isSingular(){return singularFlag;}
virtual bool isThread(){return !singularFlag;}
@@ -31,17 +34,14 @@ namespace Mist{
void streamMainLoop();
TS::Stream tsStream; ///< Used for parsing the incoming ts stream
TS::Packet tsBuf;
- std::string leftBuffer;
+ TS::Assembler assembler;
+ int64_t timeStampOffset;
uint64_t lastTimeStamp;
- Socket::SRTServer sSock;
Socket::SRTConnection srtConn;
bool singularFlag;
size_t tmpIdx;
- virtual size_t streamByteCount(){
- return srtConn.dataDown();
- }; // For live streams: to update the stats with correct values.
- virtual void handleLossyStats(Comms::Statistics &statComm);
+ virtual void connStats(Comms::Statistics &statComm);
};
}// namespace Mist
diff --git a/src/output/mist_out_srt.cpp b/src/output/mist_out_srt.cpp
index ea73cf8d..7fed8eec 100644
--- a/src/output/mist_out_srt.cpp
+++ b/src/output/mist_out_srt.cpp
@@ -5,20 +5,51 @@
#include <mist/socket_srt.h>
#include <mist/util.h>
-int spawnForked(Socket::SRTConnection &S){
- int fds[2];
- pipe(fds);
- Socket::Connection Sconn(fds[0], fds[1]);
+Socket::SRTServer server_socket;
+static uint64_t sockCount = 0;
- mistOut tmp(Sconn, S.getSocket());
- return tmp.run();
+void (*oldSignal)(int, siginfo_t *,void *) = 0;
+void signal_handler(int signum, siginfo_t *sigInfo, void *ignore){
+ server_socket.close();
+ if (oldSignal){
+ oldSignal(signum, sigInfo, ignore);
+ }
}
void handleUSR1(int signum, siginfo_t *sigInfo, void *ignore){
- HIGH_MSG("USR1 received - triggering rolling restart");
- Util::Config::is_restarting = true;
- Util::logExitReason("signal USR1");
- Util::Config::is_active = false;
+ if (!sockCount){
+ INFO_MSG("USR1 received - triggering rolling restart (no connections active)");
+ Util::Config::is_restarting = true;
+ Util::logExitReason("signal USR1, no connections");
+ server_socket.close();
+ Util::Config::is_active = false;
+ }else{
+ INFO_MSG("USR1 received - triggering rolling restart when connection count reaches zero");
+ Util::Config::is_restarting = true;
+ Util::logExitReason("signal USR1, after disconnect wait");
+ }
+}
+
+// Callback for SRT-serving threads
+static void callThreadCallbackSRT(void *srtPtr){
+ sockCount++;
+ Socket::SRTConnection & srtSock = *(Socket::SRTConnection*)srtPtr;
+ int fds[2];
+ pipe(fds);
+ Socket::Connection Sconn(fds[0], fds[1]);
+ HIGH_MSG("Started thread for socket %i", srtSock.getSocket());
+ mistOut tmp(Sconn,srtSock);
+ tmp.run();
+ HIGH_MSG("Closing thread for socket %i", srtSock.getSocket());
+ Sconn.close();
+ srtSock.close();
+ delete &srtSock;
+ sockCount--;
+ if (!sockCount && Util::Config::is_restarting){
+ server_socket.close();
+ Util::Config::is_active = false;
+ INFO_MSG("Last active connection closed; triggering rolling restart now!");
+ }
}
int main(int argc, char *argv[]){
@@ -41,15 +72,56 @@ int main(int argc, char *argv[]){
new_action.sa_flags = 0;
sigaction(SIGUSR1, &new_action, NULL);
}
- mistOut::listener(conf, spawnForked);
- if (conf.is_restarting && Socket::checkTrueSocket(0)){
- INFO_MSG("Reloading input while re-using server socket");
+ if (conf.getInteger("port") && conf.getString("interface").size()){
+ server_socket = Socket::SRTServer(conf.getInteger("port"), conf.getString("interface"), false, "output");
+ }
+ if (!server_socket.connected()){
+ DEVEL_MSG("Failure to open socket");
+ return 1;
+ }
+ struct sigaction new_action;
+ struct sigaction cur_action;
+ new_action.sa_sigaction = signal_handler;
+ sigemptyset(&new_action.sa_mask);
+ new_action.sa_flags = SA_SIGINFO;
+ sigaction(SIGINT, &new_action, &cur_action);
+ if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){
+ if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");}
+ oldSignal = cur_action.sa_sigaction;
+ }
+ sigaction(SIGHUP, &new_action, &cur_action);
+ if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){
+ if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");}
+ oldSignal = cur_action.sa_sigaction;
+ }
+ sigaction(SIGTERM, &new_action, &cur_action);
+ if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){
+ if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");}
+ oldSignal = cur_action.sa_sigaction;
+ }
+ Util::Procs::socketList.insert(server_socket.getSocket());
+ while (conf.is_active && server_socket.connected()){
+ Socket::SRTConnection S = server_socket.accept(false, "output");
+ if (S.connected()){// check if the new connection is valid
+ // spawn a new thread for this connection
+ tthread::thread T(callThreadCallbackSRT, (void *)new Socket::SRTConnection(S));
+ // detach it, no need to keep track of it anymore
+ T.detach();
+ }else{
+ Util::sleep(10); // sleep 10ms
+ }
+ }
+ Util::Procs::socketList.erase(server_socket.getSocket());
+ server_socket.close();
+ if (conf.is_restarting){
+ INFO_MSG("Reloading input...");
execvp(argv[0], argv);
FAIL_MSG("Error reloading: %s", strerror(errno));
}
}else{
Socket::Connection S(fileno(stdout), fileno(stdin));
- mistOut tmp(S, -1);
+ Socket::SRTConnection tmpSock;
+ mistOut tmp(S, tmpSock);
return tmp.run();
}
}
diff --git a/src/output/output.cpp b/src/output/output.cpp
index c8db21e0..bcdd97c0 100644
--- a/src/output/output.cpp
+++ b/src/output/output.cpp
@@ -1686,9 +1686,7 @@ namespace Mist{
statComm.setCRC(crc);
statComm.setStream(streamName);
statComm.setConnector(getStatsName());
- statComm.setUp(myConn.dataUp());
- statComm.setDown(myConn.dataDown());
- statComm.setTime(now - myConn.connTime());
+ connStats(now, statComm);
statComm.setLastSecond(thisPacket ? thisPacket.getTime() : 0);
statComm.setPid(getpid());
@@ -1722,6 +1720,12 @@ namespace Mist{
}
}
+ void Output::connStats(uint64_t now, Comms::Statistics &statComm){
+ statComm.setUp(myConn.dataUp());
+ statComm.setDown(myConn.dataDown());
+ statComm.setTime(now - myConn.connTime());
+ }
+
bool Output::dropPushTrack(uint32_t trackId, const std::string & dropReason){
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
if (it->second.getTrack() == trackId){
diff --git a/src/output/output.h b/src/output/output.h
index 060634e3..0873278a 100644
--- a/src/output/output.h
+++ b/src/output/output.h
@@ -64,7 +64,7 @@ namespace Mist{
/// This function is called whenever a packet is ready for sending.
/// Inside it, thisPacket is guaranteed to contain a valid packet.
virtual void sendNext(){}// REQUIRED! Others are optional.
- virtual bool dropPushTrack(uint32_t trackId, const std::string & dropReason);
+ virtual bool dropPushTrack(uint32_t trackId, const std::string &dropReason);
bool getKeyFrame();
bool prepareNext();
virtual void dropTrack(size_t trackId, const std::string &reason, bool probablyBad = true);
@@ -103,10 +103,10 @@ namespace Mist{
uint64_t lastStats; ///< Time of last sending of stats.
std::set<sortedPageInfo> buffer; ///< A sorted list of next-to-be-loaded packets.
- bool sought; ///< If a seek has been done, this is set to true. Used for seeking on
- ///< prepareNext().
+ bool sought; ///< If a seek has been done, this is set to true. Used for seeking on
+ ///< prepareNext().
std::string prevHost; ///< Old value for getConnectedBinHost, for caching
- protected: // these are to be messed with by child classes
+ protected: // these are to be messed with by child classes
virtual bool inlineRestartCapable() const{
return false;
}///< True if the output is capable of restarting mid-stream. This is used for swapping recording files
@@ -122,6 +122,8 @@ namespace Mist{
virtual std::string getStatsName();
virtual bool hasSessionIDs(){return false;}
+ virtual void connStats(uint64_t now, Comms::Statistics &statComm);
+
std::set<size_t> getSupportedTracks(const std::string &type = "") const;
inline virtual bool keepGoing(){return config->is_active && myConn;}
diff --git a/src/output/output_httpts.cpp b/src/output/output_httpts.cpp
index ddf9ee6f..1e5e051a 100644
--- a/src/output/output_httpts.cpp
+++ b/src/output/output_httpts.cpp
@@ -77,6 +77,7 @@ namespace Mist{
capa["push_urls"].append("/*.ts");
capa["push_urls"].append("ts-exec:*");
+#ifndef WITH_SRT
{
pid_t srt_tx = -1;
const char *args[] ={"srt-live-transmit", 0};
@@ -84,13 +85,14 @@ namespace Mist{
if (srt_tx > 1){
capa["push_urls"].append("srt://*");
capa["desc"] = capa["desc"].asStringRef() +
- ". SRT push output support (srt://*) is installed and available.";
+ ". Non-native SRT push output support (srt://*) is installed and available.";
}else{
capa["desc"] =
capa["desc"].asStringRef() +
- ". To enable SRT push output support, please install the srt-live-transmit binary.";
+ ". To enable non-native SRT push output support, please install the srt-live-transmit binary.";
}
}
+#endif
JSON::Value opt;
opt["arg"] = "string";
diff --git a/src/output/output_tssrt.cpp b/src/output/output_tssrt.cpp
index a18c9a4f..18a122b8 100644
--- a/src/output/output_tssrt.cpp
+++ b/src/output/output_tssrt.cpp
@@ -1,19 +1,22 @@
-#include "mist/socket_srt.h"
+#include <mist/socket_srt.h>
#include "output_tssrt.h"
#include <mist/defines.h>
#include <mist/http_parser.h>
#include <mist/url.h>
+#include <mist/encode.h>
+#include <mist/stream.h>
+#include <mist/triggers.h>
namespace Mist{
- OutTSSRT::OutTSSRT(Socket::Connection &conn, SRTSOCKET _srtSock) : TSOutput(conn){
+ OutTSSRT::OutTSSRT(Socket::Connection &conn, Socket::SRTConnection & _srtSock) : TSOutput(conn), srtConn(_srtSock){
// NOTE: conn is useless for SRT, as it uses a different socket type.
sendRepeatingHeaders = 500; // PAT/PMT every 500ms (DVB spec)
streamName = config->getString("streamname");
+ Util::setStreamName(streamName);
pushOut = false;
- std::string tracks;
// Push output configuration
if (config->getString("target").size()){
- HTTP::URL target(config->getString("target"));
+ target = HTTP::URL(config->getString("target"));
if (target.protocol != "srt"){
FAIL_MSG("Target %s must begin with srt://, aborting", target.getUrl().c_str());
onFail("Invalid srt target: doesn't start with srt://", true);
@@ -25,29 +28,108 @@ namespace Mist{
return;
}
pushOut = true;
- if (targetParams.count("tracks")){tracks = targetParams["tracks"];}
+ std::map<std::string, std::string> arguments;
+ HTTP::parseVars(target.args, arguments);
+ for (std::map<std::string, std::string>::iterator it = arguments.begin(); it != arguments.end(); ++it){
+ targetParams[it->first] = it->second;
+ }
size_t connectCnt = 0;
do{
- srtConn.connect(target.host, target.getPort(), "output");
- if (!srtConn){Util::sleep(1000);}
+ srtConn.connect(target.host, target.getPort(), "output", targetParams);
+ if (!srtConn){
+ Util::sleep(1000);
+ }else{
+ INFO_MSG("Connect success on attempt %zu", connectCnt+1);
+ break;
+ }
++connectCnt;
- }while (!srtConn && connectCnt < 10);
+ }while (!srtConn && connectCnt < 5);
wantRequest = false;
parseData = true;
+ initialize();
}else{
// Pull output configuration, In this case we have an srt connection in the second constructor parameter.
- srtConn = Socket::SRTConnection(_srtSock);
- parseData = true;
- wantRequest = false;
-
// Handle override / append of streamname options
std::string sName = srtConn.getStreamName();
if (sName != ""){
streamName = sName;
- HIGH_MSG("Requesting stream %s", streamName.c_str());
+ Util::sanitizeName(streamName);
+ Util::setStreamName(streamName);
}
+
+ int64_t accTypes = config->getInteger("acceptable");
+ if (accTypes == 0){//Allow both directions
+ srtConn.setBlocking(false);
+ //Try to read the socket 10 times. If any reads succeed, assume they are pushing in
+ size_t retries = 60;
+ while (!accTypes && srtConn && retries){
+ size_t recvSize = srtConn.Recv();
+ if (recvSize){
+ accTypes = 2;
+ INFO_MSG("Connection put into ingest mode");
+ assembler.assemble(tsIn, srtConn.recvbuf, recvSize, true);
+ }else{
+ Util::sleep(50);
+ }
+ --retries;
+ }
+ //If not, assume they are receiving.
+ if (!accTypes){
+ accTypes = 1;
+ INFO_MSG("Connection put into egress mode");
+ }
+ }
+ if (accTypes == 1){// Only allow outgoing
+ srtConn.setBlocking(true);
+ srtConn.direction = "output";
+ parseData = true;
+ wantRequest = false;
+ initialize();
+ }else if (accTypes == 2){//Only allow incoming
+ srtConn.setBlocking(false);
+ srtConn.direction = "input";
+ if (Triggers::shouldTrigger("PUSH_REWRITE")){
+ HTTP::URL reqUrl;
+ reqUrl.protocol = "srt";
+ reqUrl.port = config->getString("port");
+ reqUrl.host = config->getString("interface");
+ reqUrl.args = "streamid="+Encodings::URL::encode(sName);
+ std::string payload = reqUrl.getUrl() + "\n" + getConnectedHost();
+ std::string newUrl = "";
+ Triggers::doTrigger("PUSH_REWRITE", payload, "", false, newUrl);
+ if (!newUrl.size()){
+ FAIL_MSG("Push from %s to URL %s rejected - PUSH_REWRITE trigger blanked the URL",
+ getConnectedHost().c_str(), reqUrl.getUrl().c_str());
+ Util::logExitReason(
+ "Push from %s to URL %s rejected - PUSH_REWRITE trigger blanked the URL",
+ getConnectedHost().c_str(), reqUrl.getUrl().c_str());
+ onFinish();
+ return;
+ }
+ reqUrl = HTTP::URL(newUrl);
+ if (reqUrl.args.size()){
+ std::map<std::string, std::string> args;
+ HTTP::parseVars(reqUrl.args, args);
+ if (args.count("streamid")){
+ streamName = args["streamid"];
+ Util::sanitizeName(streamName);
+ Util::setStreamName(streamName);
+ }
+ }
+ }
+ myConn.setHost(srtConn.remotehost);
+ if (!allowPush("")){
+ onFinish();
+ srtConn.close();
+ return;
+ }
+ parseData = false;
+ wantRequest = true;
+ }
+
}
- initialize();
+ lastTimeStamp = 0;
+ timeStampOffset = 0;
}
OutTSSRT::~OutTSSRT(){}
@@ -58,13 +140,29 @@ namespace Mist{
capa["friendly"] = "TS over SRT";
capa["desc"] = "Real time streaming of TS data over SRT";
capa["deps"] = "";
- capa["required"]["streamname"]["name"] = "Stream";
- capa["required"]["streamname"]["help"] = "What streamname to serve. For multiple streams, add "
- "this protocol multiple times using different ports, "
- "or use the streamid option on the srt connection";
- capa["required"]["streamname"]["type"] = "str";
- capa["required"]["streamname"]["option"] = "--stream";
- capa["required"]["streamname"]["short"] = "s";
+
+ capa["optional"]["streamname"]["name"] = "Stream";
+ capa["optional"]["streamname"]["help"] = "What streamname to serve if no streamid is given by the other end of the connection";
+ capa["optional"]["streamname"]["type"] = "str";
+ capa["optional"]["streamname"]["option"] = "--stream";
+ capa["optional"]["streamname"]["short"] = "s";
+ capa["optional"]["streamname"]["default"] = "";
+
+ capa["optional"]["acceptable"]["name"] = "Acceptable connection types";
+ capa["optional"]["acceptable"]["help"] =
+ "Whether to allow only incoming pushes (2), only outgoing pulls (1), or both (0, default)";
+ capa["optional"]["acceptable"]["option"] = "--acceptable";
+ capa["optional"]["acceptable"]["short"] = "T";
+ capa["optional"]["acceptable"]["default"] = 0;
+ capa["optional"]["acceptable"]["type"] = "select";
+ capa["optional"]["acceptable"]["select"][0u][0u] = 0;
+ capa["optional"]["acceptable"]["select"][0u][1u] =
+ "Allow both incoming and outgoing connections";
+ capa["optional"]["acceptable"]["select"][1u][0u] = 1;
+ capa["optional"]["acceptable"]["select"][1u][1u] = "Allow only outgoing connections";
+ capa["optional"]["acceptable"]["select"][2u][0u] = 2;
+ capa["optional"]["acceptable"]["select"][2u][1u] = "Allow only incoming connections";
+
capa["codecs"][0u][0u].append("HEVC");
capa["codecs"][0u][0u].append("H264");
capa["codecs"][0u][0u].append("MPEG2");
@@ -72,6 +170,7 @@ namespace Mist{
capa["codecs"][0u][1u].append("MP3");
capa["codecs"][0u][1u].append("AC3");
capa["codecs"][0u][1u].append("MP2");
+ capa["codecs"][0u][1u].append("opus");
cfg->addConnectorOptions(8889, capa);
config = cfg;
capa["push_urls"].append("srt://*");
@@ -84,29 +183,88 @@ namespace Mist{
cfg->addOption("target", opt);
}
- // Buffer internally in the class, and send once we have over 1000 bytes of data.
+ // Buffers TS packets and sends after 7 are buffered.
void OutTSSRT::sendTS(const char *tsData, size_t len){
- if (packetBuffer.size() >= 1000){
- srtConn.SendNow(packetBuffer);
- if (!srtConn){
- // Allow for proper disconnect
- parseData = false;
- }
- packetBuffer.clear();
- }
packetBuffer.append(tsData, len);
+ if (packetBuffer.size() >= 1316){//7 whole TS packets
+ if (!srtConn){
+ if (config->getString("target").size()){
+ INFO_MSG("Reconnecting...");
+ srtConn.connect(target.host, target.getPort(), "output", targetParams);
+ if (!srtConn){Util::sleep(500);}
+ }else{
+ Util::logExitReason("SRT connection closed");
+ myConn.close();
+ parseData = false;
+ return;
+ }
+ }
+ if (srtConn){
+ srtConn.SendNow(packetBuffer, packetBuffer.size());
+ if (!srtConn){
+ if (!config->getString("target").size()){
+ Util::logExitReason("SRT connection closed");
+ myConn.close();
+ parseData = false;
+ }
+ }
+ }
+ packetBuffer.assign(0,0);
+ }
}
- bool OutTSSRT::setAlternateConnectionStats(Comms::Statistics &statComm){
+ void OutTSSRT::requestHandler(){
+ size_t recvSize = srtConn.Recv();
+ if (!recvSize){
+ if (!srtConn){
+ myConn.close();
+ wantRequest = false;
+ }else{
+ Util::sleep(50);
+ }
+ return;
+ }
+ lastRecv = Util::bootSecs();
+ if (!assembler.assemble(tsIn, srtConn.recvbuf, recvSize, true)){return;}
+ while (tsIn.hasPacket()){
+ tsIn.getEarliestPacket(thisPacket);
+ if (!thisPacket){
+ INFO_MSG("Could not get TS packet");
+ myConn.close();
+ wantRequest = false;
+ return;
+ }
+
+ tsIn.initializeMetadata(meta);
+ size_t thisIdx = M.trackIDToIndex(thisPacket.getTrackId(), getpid());
+ if (thisIdx == INVALID_TRACK_ID){return;}
+ if (!userSelect.count(thisIdx)){
+ userSelect[thisIdx].reload(streamName, thisIdx, COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK);
+ }
+
+ uint64_t adjustTime = thisPacket.getTime() + timeStampOffset;
+ if (lastTimeStamp || timeStampOffset){
+ if (lastTimeStamp + 5000 < adjustTime || lastTimeStamp > adjustTime + 5000){
+ INFO_MSG("Timestamp jump " PRETTY_PRINT_MSTIME " -> " PRETTY_PRINT_MSTIME ", compensating.",
+ PRETTY_ARG_MSTIME(lastTimeStamp), PRETTY_ARG_MSTIME(adjustTime));
+ timeStampOffset += (lastTimeStamp - adjustTime);
+ adjustTime = thisPacket.getTime() + timeStampOffset;
+ }
+ }
+ lastTimeStamp = adjustTime;
+ thisPacket.setTime(adjustTime);
+ bufferLivePacket(thisPacket);
+ }
+ }
+
+ void OutTSSRT::connStats(uint64_t now, Comms::Statistics &statComm){
+ if (!srtConn){return;}
statComm.setUp(srtConn.dataUp());
statComm.setDown(srtConn.dataDown());
- statComm.setTime(Util::bootSecs() - srtConn.connTime());
- return true;
- }
-
- void OutTSSRT::handleLossyStats(Comms::Statistics &statComm){
+ statComm.setTime(now - srtConn.connTime());
statComm.setPacketCount(srtConn.packetCount());
statComm.setPacketLostCount(srtConn.packetLostCount());
statComm.setPacketRetransmitCount(srtConn.packetRetransmitCount());
}
+
}// namespace Mist
diff --git a/src/output/output_tssrt.h b/src/output/output_tssrt.h
index 3f671dbf..1423af8d 100644
--- a/src/output/output_tssrt.h
+++ b/src/output/output_tssrt.h
@@ -1,12 +1,11 @@
#include "output_ts_base.h"
#include <mist/ts_stream.h>
-
#include <mist/socket_srt.h>
namespace Mist{
class OutTSSRT : public TSOutput{
public:
- OutTSSRT(Socket::Connection &conn, SRTSOCKET _srtSock);
+ OutTSSRT(Socket::Connection &conn, Socket::SRTConnection & _srtSock);
~OutTSSRT();
static bool listenMode(){return !(config->getString("target").size());}
@@ -14,19 +13,23 @@ namespace Mist{
static void init(Util::Config *cfg);
void sendTS(const char *tsData, size_t len = 188);
bool isReadyForPlay(){return true;}
-
+ virtual void requestHandler();
protected:
- // Stats handling
- virtual bool setAlternateConnectionStats(Comms::Statistics &statComm);
- virtual void handleLossyStats(Comms::Statistics &statComm);
+ virtual void connStats(uint64_t now, Comms::Statistics &statComm);
+ virtual std::string getConnectedHost(){return srtConn.remotehost;}
+ virtual std::string getConnectedBinHost(){return srtConn.getBinHost();}
private:
+ HTTP::URL target;
+ int64_t timeStampOffset;
+ uint64_t lastTimeStamp;
bool pushOut;
- std::string packetBuffer;
+ Util::ResizeablePointer packetBuffer;
Socket::UDPConnection pushSock;
TS::Stream tsIn;
+ TS::Assembler assembler;
- Socket::SRTConnection srtConn;
+ Socket::SRTConnection & srtConn;
};
}// namespace Mist