TSSRT Support

This commit is contained in:
Phencys 2020-07-26 16:19:14 +02:00 committed by Thulinma
parent 974380ab30
commit 19199cbff8
17 changed files with 1471 additions and 15 deletions

View file

@ -187,6 +187,7 @@ set(libHeaders
lib/sdp_media.h
lib/shared_memory.h
lib/socket.h
lib/socket_srt.h
lib/srtp.h
lib/stream.h
lib/stun.h
@ -249,6 +250,7 @@ add_library (mist
lib/sdp_media.cpp
lib/shared_memory.cpp
lib/socket.cpp
lib/socket_srt.cpp
lib/srtp.cpp
lib/stream.cpp
lib/stun.cpp
@ -274,6 +276,7 @@ endif()
target_link_libraries(mist
-lpthread
${LIBRT}
-lsrt
)
if (NOT DEFINED NOSSL )
target_link_libraries(mist mbedtls mbedx509 mbedcrypto srtp2)
@ -436,6 +439,7 @@ makeInput(Balancer balancer)#LTS
makeInput(RTSP rtsp)#LTS
makeInput(SRT srt)#LTS
makeInput(TSSRT tssrt)#LTS
########################################
# MistServer - Outputs #
@ -443,12 +447,16 @@ makeInput(SRT srt)#LTS
macro(makeOutput outputName format)
#Parse all extra arguments, for http and ts flags
SET (tsBaseClass Output)
SET (outBaseFile src/output/mist_out.cpp)
if (";${ARGN};" MATCHES ";http;")
SET(httpOutput src/output/output_http.cpp)
if (";${ARGN};" MATCHES ";ts;")
SET(tsBaseClass HTTPOutput)
endif()
endif()
if (";${ARGN};" MATCHES ";srt;")
SET(outBaseFile src/output/mist_out_srt.cpp)
endif()
if (";${ARGN};" MATCHES ";ts;")
SET(tsOutput src/output/output_ts_base.cpp)
endif()
@ -456,7 +464,7 @@ macro(makeOutput outputName format)
SET(tsOutput generated/noffmpeg.h generated/noh264.h)
endif()
add_executable(MistOut${outputName}
src/output/mist_out.cpp
${outBaseFile}
src/output/output.cpp
src/output/output_${format}.cpp
src/io.cpp
@ -493,6 +501,7 @@ if (DEFINED WITH_JPG )
makeOutput(JPG jpg http jpg)
endif()
makeOutput(TS ts ts)
makeOutput(TSSRT tssrt ts srt)
makeOutput(HTTPTS httpts http ts)
makeOutput(HLS hls http ts)
makeOutput(CMAF cmaf http)#LTS

View file

@ -5,8 +5,8 @@
#include "encode.h"
#include "procs.h"
#include "timing.h"
#include <string.h>
#include <fcntl.h>
#include <string.h>
namespace Comms{
Comms::Comms(){
@ -166,6 +166,9 @@ namespace Comms{
dataAccX.addField("stream", RAX_STRING, 100);
dataAccX.addField("connector", RAX_STRING, 20);
dataAccX.addField("crc", RAX_32UINT);
dataAccX.addField("pktcount", RAX_64UINT);
dataAccX.addField("pktloss", RAX_64UINT);
dataAccX.addField("pktretrans", RAX_64UINT);
}
void Statistics::nullFields(){
@ -180,6 +183,9 @@ namespace Comms{
setTime(0);
setNow(0);
setSync(0);
setPacketCount(0);
setPacketLostCount(0);
setPacketRetransmitCount(0);
}
void Statistics::fieldAccess(){
@ -194,6 +200,9 @@ namespace Comms{
stream = dataAccX.getFieldAccX("stream");
connector = dataAccX.getFieldAccX("connector");
crc = dataAccX.getFieldAccX("crc");
pktcount = dataAccX.getFieldAccX("pktcount");
pktloss = dataAccX.getFieldAccX("pktloss");
pktretrans = dataAccX.getFieldAccX("pktretrans");
}
uint8_t Statistics::getSync() const{return sync.uint(index);}
@ -246,9 +255,7 @@ namespace Comms{
up.set(_up, idx);
}
std::string Statistics::getHost() const{
return std::string(host.ptr(index), 16);
}
std::string Statistics::getHost() const{return std::string(host.ptr(index), 16);}
std::string Statistics::getHost(size_t idx) const{
if (!master){return std::string((size_t)16, (char)'\000');}
return std::string(host.ptr(idx), 16);
@ -285,6 +292,36 @@ namespace Comms{
crc.set(_crc, idx);
}
uint64_t Statistics::getPacketCount() const{return pktcount.uint(index);}
uint64_t Statistics::getPacketCount(size_t idx) const{
return (master ? pktcount.uint(idx) : 0);
}
void Statistics::setPacketCount(uint64_t _count){pktcount.set(_count, index);}
void Statistics::setPacketCount(uint64_t _count, size_t idx){
if (!master){return;}
pktcount.set(_count, idx);
}
uint64_t Statistics::getPacketLostCount() const{return pktloss.uint(index);}
uint64_t Statistics::getPacketLostCount(size_t idx) const{
return (master ? pktloss.uint(idx) : 0);
}
void Statistics::setPacketLostCount(uint64_t _lost){pktloss.set(_lost, index);}
void Statistics::setPacketLostCount(uint64_t _lost, size_t idx){
if (!master){return;}
pktloss.set(_lost, idx);
}
uint64_t Statistics::getPacketRetransmitCount() const{return pktretrans.uint(index);}
uint64_t Statistics::getPacketRetransmitCount(size_t idx) const{
return (master ? pktretrans.uint(idx) : 0);
}
void Statistics::setPacketRetransmitCount(uint64_t _retrans){pktretrans.set(_retrans, index);}
void Statistics::setPacketRetransmitCount(uint64_t _retrans, size_t idx){
if (!master){return;}
pktretrans.set(_retrans, idx);
}
std::string Statistics::getSessId() const{return getSessId(index);}
std::string Statistics::getSessId(size_t idx) const{

View file

@ -124,6 +124,21 @@ namespace Comms{
void setCRC(uint32_t _crc);
void setCRC(uint32_t _crc, size_t idx);
uint64_t getPacketCount() const;
uint64_t getPacketCount(size_t idx) const;
void setPacketCount(uint64_t _count);
void setPacketCount(uint64_t _count, size_t idx);
uint64_t getPacketLostCount() const;
uint64_t getPacketLostCount(size_t idx) const;
void setPacketLostCount(uint64_t _lost);
void setPacketLostCount(uint64_t _lost, size_t idx);
uint64_t getPacketRetransmitCount() const;
uint64_t getPacketRetransmitCount(size_t idx) const;
void setPacketRetransmitCount(uint64_t _retransmit);
void setPacketRetransmitCount(uint64_t _retransmit, size_t idx);
std::string getSessId() const;
std::string getSessId(size_t index) const;
@ -138,6 +153,9 @@ namespace Comms{
Util::FieldAccX stream;
Util::FieldAccX connector;
Util::FieldAccX crc;
Util::FieldAccX pktcount;
Util::FieldAccX pktloss;
Util::FieldAccX pktretrans;
};
class Users : public Comms{

View file

@ -3,6 +3,7 @@
#include "config.h"
#include "defines.h"
#include "lib/socket_srt.h"
#include "stream.h"
#include "timing.h"
#include "tinythread.h"
@ -30,17 +31,18 @@
#include <iostream>
#include <pwd.h>
#include <signal.h>
#include <stdarg.h> // for va_list
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdarg.h> // for va_list
bool Util::Config::is_active = false;
bool Util::Config::is_restarting = false;
static Socket::Server *serv_sock_pointer = 0;
static Socket::SRTServer *serv_srt_sock_pointer = 0; ///< Holds a pointer to SRT Server, if it is connected
uint32_t Util::printDebugLevel = DEBUG;
std::string Util::streamName;
char Util::exitReason[256] = {0};
char Util::exitReason[256] ={0};
void Util::logExitReason(const char *format, ...){
if (exitReason[0]){return;}
@ -53,6 +55,13 @@ void Util::logExitReason(const char *format, ...){
std::string Util::listenInterface;
uint32_t Util::listenPort = 0;
// Sets pointer to the SRT Server, for proper cleanup later.
//
// Currently used for TSSRT Input only, as this doesn't use the config library to setup a listener
void Util::Config::registerSRTSockPtr(Socket::SRTServer *ptr){
serv_srt_sock_pointer = ptr;
}
Util::Config::Config(){
// global options here
vals["debug"]["long"] = "debug";
@ -202,7 +211,9 @@ bool Util::Config::parseArgs(int &argc, char **&argv){
#endif
#ifdef STAT_CUTOFF
if (STAT_CUTOFF != 600){
std::cout << "- Setting: Stats cutoff point " << STAT_CUTOFF << " seconds. Statistics and session cache are only kept for this long, as opposed to the default of 600 seconds." << std::endl;
std::cout << "- Setting: Stats cutoff point "
<< STAT_CUTOFF << " seconds. Statistics and session cache are only kept for this long, as opposed to the default of 600 seconds."
<< std::endl;
}
#endif
#ifndef SSL
@ -320,6 +331,23 @@ struct callbackData{
int (*cb)(Socket::Connection &);
};
// As above, but using an SRT Connection
struct callbackSRTData{
Socket::SRTConnection *sock;
int (*cb)(Socket::SRTConnection &);
};
// Callback for SRT-serving threads
static void callThreadCallbackSRT(void *cDataArg){
INSANE_MSG("Thread for %p started", cDataArg);
callbackSRTData *cData = (callbackSRTData *)cDataArg;
cData->cb(*(cData->sock));
cData->sock->close();
delete cData->sock;
delete cData;
INSANE_MSG("Thread for %p ended", cDataArg);
}
static void callThreadCallback(void *cDataArg){
INSANE_MSG("Thread for %p started", cDataArg);
callbackData *cData = (callbackData *)cDataArg;
@ -402,6 +430,53 @@ int Util::Config::serveThreadedSocket(int (*callback)(Socket::Connection &)){
return r;
}
// This is a THREADED server!! Fork does not work as the SRT library itself already starts up a
// thread, and forking after thread creation messes up all control flow internal to the library.
int Util::Config::serveSRTSocket(int (*callback)(Socket::SRTConnection &S)){
Socket::SRTServer server_socket;
if (vals.isMember("port") && vals.isMember("interface")){
server_socket = Socket::SRTServer(getInteger("port"), getString("interface"), false, "output");
}
if (!server_socket.connected()){
DEVEL_MSG("Failure to open socket");
return 1;
}
serv_srt_sock_pointer = &server_socket;
activate();
if (server_socket.getSocket()){
int oldSock = server_socket.getSocket();
if (!dup2(oldSock, 0)){
server_socket = Socket::SRTServer(0);
close(oldSock);
}
}
int r = SRTServer(server_socket, callback);
serv_srt_sock_pointer = 0;
return r;
}
int Util::Config::SRTServer(Socket::SRTServer &server_socket, int (*callback)(Socket::SRTConnection &)){
Util::Procs::socketList.insert(server_socket.getSocket());
while (is_active && server_socket.connected()){
Socket::SRTConnection S = server_socket.accept(false, "output");
if (S.connected()){// check if the new connection is valid
callbackSRTData *cData = new callbackSRTData;
cData->sock = new Socket::SRTConnection(S);
cData->cb = callback;
// spawn a new thread for this connection
tthread::thread T(callThreadCallbackSRT, (void *)cData);
// detach it, no need to keep track of it anymore
T.detach();
HIGH_MSG("Spawned new thread for socket %i", S.getSocket());
}else{
Util::sleep(10); // sleep 10ms
}
}
Util::Procs::socketList.erase(server_socket.getSocket());
if (!is_restarting){server_socket.close();}
return 0;
}
int Util::Config::serveForkedSocket(int (*callback)(Socket::Connection &S)){
Socket::Server server_socket;
if (Socket::checkTrueSocket(0)){
@ -466,6 +541,8 @@ void Util::Config::signal_handler(int signum, siginfo_t *sigInfo, void *ignore){
case SIGHUP:
case SIGTERM:
if (serv_sock_pointer){serv_sock_pointer->close();}
// Close the srt server as well, if set
if (serv_srt_sock_pointer){serv_srt_sock_pointer->close();}
#if DEBUG >= DLVL_DEVEL
static int ctr = 0;
if (!is_active && ++ctr > 4){BACKTRACE;}

View file

@ -8,6 +8,7 @@
#endif
#include "json.h"
#include "socket_srt.h"
#include <signal.h>
#include <string>
@ -16,7 +17,7 @@ namespace Util{
extern uint32_t printDebugLevel;
extern std::string streamName; ///< Used by debug messages to identify the stream name
extern char exitReason[256];
void logExitReason(const char * format, ...);
void logExitReason(const char *format, ...);
/// Deals with parsing configuration from commandline options.
class Config{
@ -41,6 +42,7 @@ namespace Util{
int64_t getInteger(std::string optname);
bool getBool(std::string optname);
void activate();
void registerSRTSockPtr(Socket::SRTServer *ptr);
int threadServer(Socket::Server &server_socket, int (*callback)(Socket::Connection &S));
int forkServer(Socket::Server &server_socket, int (*callback)(Socket::Connection &S));
int serveThreadedSocket(int (*callback)(Socket::Connection &S));
@ -49,6 +51,9 @@ namespace Util{
void addOptionsFromCapabilities(const JSON::Value &capabilities);
void addBasicConnectorOptions(JSON::Value &capabilities);
void addConnectorOptions(int port, JSON::Value &capabilities);
int serveSRTSocket(int (*callback)(Socket::SRTConnection &S));
int SRTServer(Socket::SRTServer &server_socket, int (*callback)(Socket::SRTConnection &S));
};
/// The interface address the current serveSocket function is listening on

548
lib/socket_srt.cpp Normal file
View file

@ -0,0 +1,548 @@
#include "defines.h"
#include "lib/http_parser.h"
#include "socket_srt.h"
#include <cstdlib>
#include <sstream>
#define INVALID_SRT_SOCKET -1
namespace Socket{
namespace SRT{
bool isInited = false;
// Both Init and Cleanup functions are called implicitly if not done ourselves.
// SRT documentation states explicitly that this is unreliable behaviour
bool libraryInit(){
if (!isInited){
int res = srt_startup();
if (res == -1){ERROR_MSG("Unable to initialize SRT Library!");}
isInited = (res != -1);
}
return isInited;
}
bool libraryCleanup(){
if (isInited){
srt_cleanup();
isInited = false;
}
return true;
}
}// namespace SRT
template <typename T> std::string asString(const T &val){
std::stringstream x;
x << val;
return x.str();
}
sockaddr_in createInetAddr(const std::string &_host, int _port){
sockaddr_in res;
memset(&res, 9, sizeof res);
res.sin_family = AF_INET;
res.sin_port = htons(_port);
if (_host != ""){
if (inet_pton(AF_INET, _host.c_str(), &res.sin_addr) == 1){return res;}
hostent *he = gethostbyname(_host.c_str());
if (!he || he->h_addrtype != AF_INET){ERROR_MSG("Host not found %s", _host.c_str());}
res.sin_addr = *(in_addr *)he->h_addr_list[0];
}
return res;
}
std::string interpretSRTMode(const std::string &_mode, const std::string &_host, const std::string &_adapter){
if (_mode == "client" || _mode == "caller"){return "caller";}
if (_mode == "server" || _mode == "listener"){return "listener";}
if (_mode == "rendezvouz"){return "rendezvous";}
if (_mode != "default"){return "";}
if (_host == ""){return "listener";}
if (_adapter != ""){return "rendezvous";}
return "caller";
}
std::string interpretSRTMode(const HTTP::URL &u){
paramList params;
HTTP::parseVars(u.args, params);
return interpretSRTMode(params.count("mode") ? params.at("mode") : "default", u.host, "");
}
SRTConnection::SRTConnection(){initializeEmpty();}
SRTConnection::SRTConnection(const std::string &_host, int _port, const std::string &_direction,
const std::map<std::string, std::string> &_params){
connect(_host, _port, _direction, _params);
}
std::string SRTConnection::getStreamName(){
int sNameLen = 512;
char sName[sNameLen];
int optRes = srt_getsockflag(sock, SRTO_STREAMID, (void *)sName, &sNameLen);
if (optRes != -1 && sNameLen){return sName;}
return "";
}
/// Updates the downbuffer internal variable.
/// Returns true if new data was received, false otherwise.
std::string SRTConnection::RecvNow(){
char recvbuf[5000];
bool blockState = blocking;
setBlocking(true);
SRT_MSGCTRL mc = srt_msgctrl_default;
int32_t receivedBytes = srt_recvmsg2(sock, recvbuf, 5000, &mc);
if (prev_pktseq != 0 && (mc.pktseq - prev_pktseq > 1)){WARN_MSG("Packet lost");}
prev_pktseq = mc.pktseq;
setBlocking(blockState);
if (receivedBytes == -1){
ERROR_MSG("Unable to receive data over socket: %s", srt_getlasterror_str());
if (srt_getsockstate(sock) != SRTS_CONNECTED){close();}
return "";
}
srt_bstats(sock, &performanceMonitor, false);
return std::string(recvbuf, receivedBytes);
}
void SRTConnection::connect(const std::string &_host, int _port, const std::string &_direction,
const std::map<std::string, std::string> &_params){
initializeEmpty();
direction = _direction;
handleConnectionParameters(_host, _params);
HIGH_MSG("Opening SRT connection %s in %s mode on %s:%d", modeName.c_str(), direction.c_str(),
_host.c_str(), _port);
sock = srt_create_socket();
if (sock == SRT_ERROR){
ERROR_MSG("Error creating an SRT socket");
return;
}
if (modeName == "rendezvous"){
bool v = true;
srt_setsockopt(sock, 0, SRTO_RENDEZVOUS, &v, sizeof v);
}
if (preConfigureSocket() == SRT_ERROR){
ERROR_MSG("Error configuring SRT socket");
return;
}
if (modeName == "caller"){
if (outgoing_port){setupAdapter("", outgoing_port);}
sockaddr_in sa = createInetAddr(_host, _port);
sockaddr *psa = (sockaddr *)&sa;
HIGH_MSG("Going to connect sock %d", sock);
if (srt_connect(sock, psa, sizeof sa) == SRT_ERROR){
srt_close(sock);
ERROR_MSG("Can't connect SRT Socket");
return;
}
HIGH_MSG("Connected sock %d", sock);
if (postConfigureSocket() == SRT_ERROR){
ERROR_MSG("Error during postconfigure socket");
return;
}
INFO_MSG("Caller SRT socket %" PRId32 " success targetting %s:%u", sock, _host.c_str(), _port);
return;
}
if (modeName == "listener"){
HIGH_MSG("Going to bind a server on %s:%u", _host.c_str(), _port);
sockaddr_in sa = createInetAddr(_host, _port);
sockaddr *psa = (sockaddr *)&sa;
if (srt_bind(sock, psa, sizeof sa) == SRT_ERROR){
srt_close(sock);
ERROR_MSG("Can't connect SRT Socket");
return;
}
if (srt_listen(sock, 1) == SRT_ERROR){
srt_close(sock);
ERROR_MSG("Can not listen on Socket");
}
INFO_MSG("Listener SRT socket sucess @ %s:%u", _host.c_str(), _port);
return;
}
if (modeName == "rendezvous"){
int outport = (outgoing_port ? outgoing_port : _port);
HIGH_MSG("Going to bind a server on %s:%u", _host.c_str(), _port);
sockaddr_in sa = createInetAddr(_host, outport);
sockaddr *psa = (sockaddr *)&sa;
if (srt_bind(sock, psa, sizeof sa) == SRT_ERROR){
srt_close(sock);
ERROR_MSG("Can't connect SRT Socket");
return;
}
sockaddr_in sb = createInetAddr(_host, outport);
sockaddr *psb = (sockaddr *)&sb;
if (srt_connect(sock, psb, sizeof sb) == SRT_ERROR){
srt_close(sock);
ERROR_MSG("Can't connect SRT Socket");
return;
}
if (postConfigureSocket() == SRT_ERROR){
ERROR_MSG("Error during postconfigure socket");
return;
}
INFO_MSG("Rendezvous SRT socket sucess @ %s:%u", _host.c_str(), _port);
return;
}
ERROR_MSG("Invalid mode parameter. Use 'client' or 'server'");
}
void SRTConnection::setupAdapter(const std::string &_host, int _port){
sockaddr_in localsa = createInetAddr(_host, _port);
sockaddr *psa = (sockaddr *)&localsa;
if (srt_bind(sock, psa, sizeof localsa) == SRT_ERROR){
ERROR_MSG("Unable to bind socket to %s:%u", _host.c_str(), _port);
}
}
void SRTConnection::SendNow(const std::string &data){SendNow(data.data(), data.size());}
void SRTConnection::SendNow(const char *data, size_t len){
srt_clearlasterror();
int res = srt_sendmsg2(sock, data, len, NULL);
if (res == SRT_ERROR){
ERROR_MSG("Unable to send data over socket %" PRId32 ": %s", sock, srt_getlasterror_str());
if (srt_getsockstate(sock) != SRTS_CONNECTED){close();}
}
srt_bstats(sock, &performanceMonitor, false);
}
unsigned int SRTConnection::connTime(){
srt_bstats(sock, &performanceMonitor, false);
return performanceMonitor.msTimeStamp / 1000;
}
uint64_t SRTConnection::dataUp(){return performanceMonitor.byteSentTotal;}
uint64_t SRTConnection::dataDown(){return performanceMonitor.byteRecvTotal;}
uint64_t SRTConnection::packetCount(){
return (direction == "input" ? performanceMonitor.pktRecvTotal : performanceMonitor.pktSentTotal);
}
uint64_t SRTConnection::packetLostCount(){
return (direction == "input" ? performanceMonitor.pktRcvLossTotal : performanceMonitor.pktSndLossTotal);
}
uint64_t SRTConnection::packetRetransmitCount(){
//\todo This should be updated with pktRcvRetransTotal on the retrieving end once srt has implemented this.
return (direction == "input" ? 0 : performanceMonitor.pktRetransTotal);
}
void SRTConnection::initializeEmpty(){
prev_pktseq = 0;
sock = SRT_INVALID_SOCK;
outgoing_port = 0;
chunkTransmitSize = 1316;
blocking = false;
}
void SRTConnection::setBlocking(bool _blocking){
if (_blocking == blocking){return;}
// If we have an error setting the new blocking state, the state is unchanged so we return early.
if (srt_setsockopt(sock, 0, (direction == "output" ? SRTO_SNDSYN : SRTO_RCVSYN), &_blocking,
sizeof _blocking) == -1){
return;
}
blocking = _blocking;
}
bool SRTConnection::isBlocking(){return blocking;}
void SRTConnection::handleConnectionParameters(const std::string &_host,
const std::map<std::string, std::string> &_params){
params = _params;
DONTEVEN_MSG("SRT Received parameters: ");
for (std::map<std::string, std::string>::const_iterator it = params.begin(); it != params.end(); it++){
DONTEVEN_MSG(" %s: %s", it->first.c_str(), it->second.c_str());
}
adapter = (params.count("adapter") ? params.at("adapter") : "");
modeName = interpretSRTMode((params.count("mode") ? params.at("mode") : "default"), _host, adapter);
if (modeName == ""){
ERROR_MSG("Invalid SRT mode encountered");
return;
}
// Using strtol because the original code uses base 0 -> automatic detection of octal and hexadecimal systems.
timeout = (params.count("timeout") ? strtol(params.at("timeout").c_str(), 0, 0) : 0);
if (adapter == "" && modeName == "listener"){adapter = _host;}
tsbpdMode = ((params.count("tsbpd") && isFalseString(params.at("tsbpd"))) ? false : true);
outgoing_port = (params.count("port") ? strtol(params.at("port").c_str(), 0, 0) : 0);
if ((!params.count("transtype") || params.at("transtype") != "file") && chunkTransmitSize > SRT_LIVE_DEF_PLSIZE){
if (chunkTransmitSize > SRT_LIVE_MAX_PLSIZE){
ERROR_MSG("Chunk size in live mode exceeds 1456 bytes!");
return;
}
}
params["payloadsize"] = asString(chunkTransmitSize);
}
int SRTConnection::preConfigureSocket(){
bool no = false;
if (!tsbpdMode){
if (srt_setsockopt(sock, 0, SRTO_TSBPDMODE, &no, sizeof no) == -1){return -1;}
}
if (srt_setsockopt(sock, 0, SRTO_RCVSYN, &no, sizeof no) == -1){return -1;}
if (params.count("linger")){
linger lin;
lin.l_linger = atoi(params.at("linger").c_str());
lin.l_onoff = lin.l_linger > 0 ? 1 : 0;
srt_setsockopt(sock, 0, SRTO_LINGER, &lin, sizeof(linger));
}
std::string errMsg = configureSocketLoop(SRT::SockOpt::PRE);
if (errMsg.size()){
WARN_MSG("Failed to set the following options: %s", errMsg.c_str());
return SRT_ERROR;
}
if (direction == "output"){
int v = 1;
if (srt_setsockopt(sock, 0, SRTO_SENDER, &v, sizeof v) == SRT_ERROR){return SRT_ERROR;}
}
return 0;
}
int SRTConnection::postConfigureSocket(){
bool no = false;
if (srt_setsockopt(sock, 0, (direction == "output" ? SRTO_SNDSYN : SRTO_RCVSYN), &no, sizeof no) == -1){
return -1;
}
if (timeout){
if (srt_setsockopt(sock, 0, (direction == "output" ? SRTO_SNDTIMEO : SRTO_RCVTIMEO), &timeout,
sizeof timeout) == -1){
return -1;
}
}
std::string errMsg = configureSocketLoop(SRT::SockOpt::POST);
if (errMsg.size()){
WARN_MSG("Failed to set the following options: %s", errMsg.c_str());
return SRT_ERROR;
}
return 0;
}
std::string SRTConnection::configureSocketLoop(SRT::SockOpt::Binding _binding){
std::string errMsg;
std::vector<SocketOption> allSrtOptions = srtOptions();
for (std::vector<SocketOption>::iterator it = allSrtOptions.begin(); it != allSrtOptions.end(); it++){
if (it->binding == _binding && params.count(it->name)){
std::string value = params.at(it->name);
if (!it->apply(sock, value)){errMsg += it->name + " ";}
}
}
return errMsg;
}
void SRTConnection::close(){
if (sock != -1){
srt_close(sock);
sock = -1;
}
}
SRTServer::SRTServer(){}
SRTServer::SRTServer(int fromSock){conn = SRTConnection(fromSock);}
SRTServer::SRTServer(int port, std::string hostname, bool nonblock, const std::string &_direction){
// We always create a server as listening
std::map<std::string, std::string> listenMode;
listenMode["mode"] = "listener";
if (hostname == ""){hostname = "0.0.0.0";}
conn.connect(hostname, port, _direction, listenMode);
conn.setBlocking(true);
if (!conn){
ERROR_MSG("Unable to create socket");
return;
}
}
SRTConnection SRTServer::accept(bool nonblock, const std::string &direction){
if (!conn){return SRTConnection();}
struct sockaddr_in6 tmpaddr;
int len = sizeof(tmpaddr);
SRTConnection r(srt_accept(conn.getSocket(), (sockaddr *)&tmpaddr, &len));
if (!r){
if (conn.getSocket() != -1 && srt_getlasterror(0) != SRT_EASYNCRCV){
FAIL_MSG("Error during accept: %s. Closing server socket %d.", srt_getlasterror_str(), conn.getSocket());
close();
}
return r;
}
r.direction = direction;
r.postConfigureSocket();
r.setBlocking(!nonblock);
static char addrconv[INET6_ADDRSTRLEN];
r.remoteaddr = tmpaddr;
if (tmpaddr.sin6_family == AF_INET6){
r.remotehost = inet_ntop(AF_INET6, &(tmpaddr.sin6_addr), addrconv, INET6_ADDRSTRLEN);
HIGH_MSG("IPv6 addr [%s]", r.remotehost.c_str());
}
if (tmpaddr.sin6_family == AF_INET){
r.remotehost = inet_ntop(AF_INET, &(((sockaddr_in *)&tmpaddr)->sin_addr), addrconv, INET6_ADDRSTRLEN);
HIGH_MSG("IPv4 addr [%s]", r.remotehost.c_str());
}
INFO_MSG("Accepted a socket coming from %s", r.remotehost.c_str());
return r;
}
void SRTServer::setBlocking(bool blocking){conn.setBlocking(blocking);}
bool SRTServer::isBlocking(){return (conn ? conn.isBlocking() : false);}
void SRTServer::close(){conn.close();}
bool SRTServer::connected() const{return conn.connected();}
int SRTServer::getSocket(){return conn.getSocket();}
inline int SocketOption::setSo(int socket, int proto, int sym, const void *data, size_t size, bool isSrtOpt){
if (isSrtOpt){return srt_setsockopt(socket, 0, SRT_SOCKOPT(sym), data, (int)size);}
return ::setsockopt(socket, proto, sym, (const char *)data, (int)size);
}
bool SocketOption::extract(const std::string &v, OptionValue &val, SRT::SockOpt::Type asType){
switch (asType){
case SRT::SockOpt::STRING:
val.s = v;
val.value = val.s.data();
val.size = val.s.size();
break;
case SRT::SockOpt::INT:
case SRT::SockOpt::INT64:{
int64_t tmp = strtol(v.c_str(), 0, 0);
if (tmp == 0 && (!v.size() || v[0] != '0')){return false;}
if (asType == SRT::SockOpt::INT){
val.i = tmp;
val.value = &val.i;
val.size = sizeof(val.i);
}else{
val.l = tmp;
val.value = &val.l;
val.size = sizeof(val.l);
}
}break;
case SRT::SockOpt::BOOL:{
bool tmp;
if (isFalseString(v)){
tmp = true;
}else if (isTrueString(v)){
tmp = true;
}else{
return false;
}
val.b = tmp;
val.value = &val.b;
val.size = sizeof val.b;
}break;
case SRT::SockOpt::ENUM:{
// Search value in the map. If found, set to o.
SockOptVals::const_iterator p = valmap.find(v);
if (p != valmap.end()){
val.i = p->second;
val.value = &val.i;
val.size = sizeof val.i;
return true;
}
// Fallback: try interpreting it as integer.
return extract(v, val, SRT::SockOpt::INT);
}
}
return true;
}
bool SocketOption::apply(int socket, const std::string &value, bool isSrtOpt){
OptionValue o;
int result = -1;
if (extract(value, o, type)){
result = setSo(socket, protocol, symbol, o.value, o.size, isSrtOpt);
}
return result != -1;
}
const std::map<std::string, int> enummap_transtype;
std::vector<SocketOption> srtOptions(){
static std::map<std::string, int> enummap_transtype;
if (!enummap_transtype.size()){
enummap_transtype["live"] = SRTT_LIVE;
enummap_transtype["file"] = SRTT_FILE;
}
static std::vector<SocketOption> res;
if (res.size()){return res;}
res.push_back(SocketOption("transtype", 0, SRTO_TRANSTYPE, SRT::SockOpt::PRE,
SRT::SockOpt::ENUM, enummap_transtype));
res.push_back(SocketOption("maxbw", 0, SRTO_MAXBW, SRT::SockOpt::PRE, SRT::SockOpt::INT64));
res.push_back(SocketOption("pbkeylen", 0, SRTO_PBKEYLEN, SRT::SockOpt::PRE, SRT::SockOpt::INT));
res.push_back(SocketOption("passphrase", 0, SRTO_PASSPHRASE, SRT::SockOpt::PRE, SRT::SockOpt::STRING));
res.push_back(SocketOption("mss", 0, SRTO_MSS, SRT::SockOpt::PRE, SRT::SockOpt::INT));
res.push_back(SocketOption("fc", 0, SRTO_FC, SRT::SockOpt::PRE, SRT::SockOpt::INT));
res.push_back(SocketOption("sndbuf", 0, SRTO_SNDBUF, SRT::SockOpt::PRE, SRT::SockOpt::INT));
res.push_back(SocketOption("rcvbuf", 0, SRTO_RCVBUF, SRT::SockOpt::PRE, SRT::SockOpt::INT));
// linger option is handled outside of the common loop, therefore commented out.
// res.push_back(SocketOption( "linger", 0, SRTO_LINGER, SRT::SockOpt::PRE, SRT::SockOpt::INT));
res.push_back(SocketOption("ipttl", 0, SRTO_IPTTL, SRT::SockOpt::PRE, SRT::SockOpt::INT));
res.push_back(SocketOption("iptos", 0, SRTO_IPTOS, SRT::SockOpt::PRE, SRT::SockOpt::INT));
res.push_back(SocketOption("inputbw", 0, SRTO_INPUTBW, SRT::SockOpt::POST, SRT::SockOpt::INT64));
res.push_back(SocketOption("oheadbw", 0, SRTO_OHEADBW, SRT::SockOpt::POST, SRT::SockOpt::INT));
res.push_back(SocketOption("latency", 0, SRTO_LATENCY, SRT::SockOpt::PRE, SRT::SockOpt::INT));
res.push_back(SocketOption("tsbpdmode", 0, SRTO_TSBPDMODE, SRT::SockOpt::PRE, SRT::SockOpt::BOOL));
res.push_back(SocketOption("tlpktdrop", 0, SRTO_TLPKTDROP, SRT::SockOpt::PRE, SRT::SockOpt::BOOL));
res.push_back(SocketOption("snddropdelay", 0, SRTO_SNDDROPDELAY, SRT::SockOpt::POST, SRT::SockOpt::INT));
res.push_back(SocketOption("nakreport", 0, SRTO_NAKREPORT, SRT::SockOpt::PRE, SRT::SockOpt::BOOL));
res.push_back(SocketOption("conntimeo", 0, SRTO_CONNTIMEO, SRT::SockOpt::PRE, SRT::SockOpt::INT));
res.push_back(SocketOption("lossmaxttl", 0, SRTO_LOSSMAXTTL, SRT::SockOpt::PRE, SRT::SockOpt::INT));
res.push_back(SocketOption("rcvlatency", 0, SRTO_RCVLATENCY, SRT::SockOpt::PRE, SRT::SockOpt::INT));
res.push_back(SocketOption("peerlatency", 0, SRTO_PEERLATENCY, SRT::SockOpt::PRE, SRT::SockOpt::INT));
res.push_back(SocketOption("minversion", 0, SRTO_MINVERSION, SRT::SockOpt::PRE, SRT::SockOpt::INT));
res.push_back(SocketOption("streamid", 0, SRTO_STREAMID, SRT::SockOpt::PRE, SRT::SockOpt::STRING));
res.push_back(SocketOption("congestion", 0, SRTO_CONGESTION, SRT::SockOpt::PRE, SRT::SockOpt::STRING));
res.push_back(SocketOption("messageapi", 0, SRTO_MESSAGEAPI, SRT::SockOpt::PRE, SRT::SockOpt::BOOL));
// res.push_back(SocketOption("payloadsize", 0, SRTO_PAYLOADSIZE, SRT::SockOpt::PRE, SRT::SockOpt::INT));
res.push_back(SocketOption("kmrefreshrate", 0, SRTO_KMREFRESHRATE, SRT::SockOpt::PRE, SRT::SockOpt::INT));
res.push_back(SocketOption("kmpreannounce", 0, SRTO_KMPREANNOUNCE, SRT::SockOpt::PRE, SRT::SockOpt::INT));
res.push_back(SocketOption("enforcedencryption", 0, SRTO_ENFORCEDENCRYPTION, SRT::SockOpt::PRE,
SRT::SockOpt::BOOL));
res.push_back(SocketOption("peeridletimeo", 0, SRTO_PEERIDLETIMEO, SRT::SockOpt::PRE, SRT::SockOpt::INT));
res.push_back(SocketOption("packetfilter", 0, SRTO_PACKETFILTER, SRT::SockOpt::PRE, SRT::SockOpt::STRING));
// res.push_back(SocketOption( "groupconnect", 0, SRTO_GROUPCONNECT, SRT::SockOpt::PRE, SRT::SockOpt::INT));
// res.push_back(SocketOption( "groupstabtimeo", 0, SRTO_GROUPSTABTIMEO, SRT::SockOpt::PRE, SRT::SockOpt::INT));
return res;
}
}// namespace Socket

154
lib/socket_srt.h Normal file
View file

@ -0,0 +1,154 @@
#pragma once
#include "socket.h"
#include "url.h"
#include <map>
#include <string>
#include <srt/srt.h>
typedef std::map<std::string, int> SockOptVals;
typedef std::map<std::string, std::string> paramList;
namespace Socket{
std::string interpretSRTMode(const HTTP::URL &u);
inline bool isFalseString(const std::string &_val){
return _val == "0" || _val == "no" || _val == "off" || _val == "false";
}
inline bool isTrueString(const std::string &_val){
return _val == "1" || _val == "yes" || _val == "on" || _val == "true";
}
sockaddr_in createInetAddr(const std::string &_host, int _port);
namespace SRT{
extern bool isInited;
bool libraryInit();
bool libraryCleanup();
// By absence of class enum (c++11), moved enums to a separate namespace
namespace SockOpt{
enum Type{STRING = 0, INT, INT64, BOOL, ENUM};
enum Binding{PRE = 0, POST};
}// namespace SockOpt
}// namespace SRT
class SRTConnection{
public:
SRTConnection();
SRTConnection(SRTSOCKET alreadyConnected){sock = alreadyConnected;}
SRTConnection(const std::string &_host, int _port, const std::string &_direction = "input",
const paramList &_params = paramList());
void connect(const std::string &_host, int _port, const std::string &_direction = "input",
const paramList &_params = paramList());
void close();
bool connected() const{return sock != -1;}
operator bool() const{return connected();}
void setBlocking(bool blocking); ///< Set this socket to be blocking (true) or nonblocking (false).
bool isBlocking(); ///< Check if this socket is blocking (true) or nonblocking (false).
std::string RecvNow();
void SendNow(const std::string &data);
void SendNow(const char *data, size_t len);
SRTSOCKET getSocket(){return sock;}
int postConfigureSocket();
std::string getStreamName();
unsigned int connTime();
uint64_t dataUp();
uint64_t dataDown();
uint64_t packetCount();
uint64_t packetLostCount();
uint64_t packetRetransmitCount();
std::string direction;
struct sockaddr_in6 remoteaddr;
std::string remotehost;
private:
SRTSOCKET sock;
CBytePerfMon performanceMonitor;
std::string host;
int outgoing_port;
int32_t prev_pktseq;
uint32_t chunkTransmitSize;
// From paramaeter parsing
std::string adapter;
std::string modeName;
int timeout;
bool tsbpdMode;
paramList params;
void initializeEmpty();
void handleConnectionParameters(const std::string &_host, const paramList &_params);
int preConfigureSocket();
std::string configureSocketLoop(SRT::SockOpt::Binding _binding);
void setupAdapter(const std::string &_host, int _port);
bool blocking;
};
/// This class is for easily setting up listening socket, either TCP or Unix.
class SRTServer{
public:
SRTServer();
SRTServer(int existingSock);
SRTServer(int port, std::string hostname, bool nonblock = false, const std::string &_direction = "input");
SRTConnection accept(bool nonblock = false, const std::string &direction = "input");
void setBlocking(bool blocking);
bool connected() const;
bool isBlocking();
void close();
int getSocket();
private:
SRTConnection conn;
std::string direction;
};
struct OptionValue{
std::string s;
int i;
int64_t l;
bool b;
const void *value;
size_t size;
};
class SocketOption{
public:
//{"enforcedencryption", 0, SRTO_ENFORCEDENCRYPTION, SRT::SockOpt::PRE, SRT::SockOpt::BOOL, nullptr},
SocketOption(const std::string &_name, int _protocol, int _symbol, SRT::SockOpt::Binding _binding,
SRT::SockOpt::Type _type, const SockOptVals &_values = SockOptVals())
: name(_name), protocol(_protocol), symbol(_symbol), binding(_binding), type(_type),
valmap(_values){};
std::string name;
int protocol;
int symbol;
SRT::SockOpt::Binding binding;
SRT::SockOpt::Type type;
SockOptVals valmap;
bool apply(int socket, const std::string &value, bool isSrtOpt = true);
static int setSo(int socket, int protocol, int symbol, const void *data, size_t size, bool isSrtOpt = true);
bool extract(const std::string &v, OptionValue &val, SRT::SockOpt::Type asType);
};
std::vector<SocketOption> srtOptions();
}// namespace Socket

View file

@ -1015,7 +1015,8 @@ namespace TS{
((adtsInfo[it->first].getFrequencyIndex() & 0x0E) >> 1);
init[1] = ((adtsInfo[it->first].getFrequencyIndex() & 0x01) << 7) |
((adtsInfo[it->first].getChannelConfig() & 0x0F) << 3);
// Wait with adding the track until we have init data
if (init[0] == 0 && init[1] == 0){addNewTrack = false;}
type = "audio";
codec = "AAC";
size = 16;

View file

@ -32,6 +32,9 @@
#define STAT_CLI_BPS_UP 256
#define STAT_CLI_CRC 512
#define STAT_CLI_SESSID 1024
#define STAT_CLI_PKTCOUNT 2048
#define STAT_CLI_PKTLOST 4096
#define STAT_CLI_PKTRETRANSMIT 8192
#define STAT_CLI_ALL 0xFFFF
// These are used to store "totals" field requests in a bitfield for speedup.
#define STAT_TOT_CLIENTS 1
@ -713,6 +716,9 @@ void Controller::statSession::wipeOld(uint64_t cutOff){
if (it->log.size() == 1){
wipedDown += it->log.begin()->second.down;
wipedUp += it->log.begin()->second.up;
wipedPktCount += it->log.begin()->second.pktCount;
wipedPktLost += it->log.begin()->second.pktCount;
wipedPktRetransmit += it->log.begin()->second.pktRetransmit;
}
it->log.erase(it->log.begin());
}
@ -800,6 +806,9 @@ void Controller::statSession::dropSession(const Controller::sessIndex &index){
lastSec = 0;
wipedUp = 0;
wipedDown = 0;
wipedPktCount = 0;
wipedPktLost = 0;
wipedPktRetransmit = 0;
oldConns.clear();
sessionType = SESS_UNSET;
}
@ -819,6 +828,9 @@ Controller::statSession::statSession(){
sync = 1;
wipedUp = 0;
wipedDown = 0;
wipedPktCount = 0;
wipedPktLost = 0;
wipedPktRetransmit = 0;
sessionType = SESS_UNSET;
noBWCount = 0;
}
@ -1014,6 +1026,97 @@ uint64_t Controller::statSession::getUp(){
return retVal;
}
uint64_t Controller::statSession::getPktCount(uint64_t t){
uint64_t retVal = wipedPktCount;
if (oldConns.size()){
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
if (it->hasDataFor(t)){retVal += it->getDataFor(t).pktCount;}
}
}
if (curConns.size()){
for (std::map<uint64_t, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
if (it->second.hasDataFor(t)){retVal += it->second.getDataFor(t).pktCount;}
}
}
return retVal;
}
/// Returns the cumulative uploaded bytes for this session at timestamp t.
uint64_t Controller::statSession::getPktCount(){
uint64_t retVal = wipedPktCount;
if (oldConns.size()){
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
if (it->log.size()){retVal += it->log.rbegin()->second.pktCount;}
}
}
if (curConns.size()){
for (std::map<uint64_t, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
if (it->second.log.size()){retVal += it->second.log.rbegin()->second.pktCount;}
}
}
return retVal;
}
uint64_t Controller::statSession::getPktLost(uint64_t t){
uint64_t retVal = wipedPktLost;
if (oldConns.size()){
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
if (it->hasDataFor(t)){retVal += it->getDataFor(t).pktLost;}
}
}
if (curConns.size()){
for (std::map<uint64_t, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
if (it->second.hasDataFor(t)){retVal += it->second.getDataFor(t).pktLost;}
}
}
return retVal;
}
/// Returns the cumulative uploaded bytes for this session at timestamp t.
uint64_t Controller::statSession::getPktLost(){
uint64_t retVal = wipedPktLost;
if (oldConns.size()){
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
if (it->log.size()){retVal += it->log.rbegin()->second.pktLost;}
}
}
if (curConns.size()){
for (std::map<uint64_t, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
if (it->second.log.size()){retVal += it->second.log.rbegin()->second.pktLost;}
}
}
return retVal;
}
uint64_t Controller::statSession::getPktRetransmit(uint64_t t){
uint64_t retVal = wipedPktRetransmit;
if (oldConns.size()){
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
if (it->hasDataFor(t)){retVal += it->getDataFor(t).pktRetransmit;}
}
}
if (curConns.size()){
for (std::map<uint64_t, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
if (it->second.hasDataFor(t)){retVal += it->second.getDataFor(t).pktRetransmit;}
}
}
return retVal;
}
/// Returns the cumulative uploaded bytes for this session at timestamp t.
uint64_t Controller::statSession::getPktRetransmit(){
uint64_t retVal = wipedPktRetransmit;
if (oldConns.size()){
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
if (it->log.size()){retVal += it->log.rbegin()->second.pktRetransmit;}
}
}
if (curConns.size()){
for (std::map<uint64_t, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
if (it->second.log.size()){retVal += it->second.log.rbegin()->second.pktRetransmit;}
}
}
return retVal;
}
/// Returns the cumulative downloaded bytes per second for this session at timestamp t.
uint64_t Controller::statSession::getBpsDown(uint64_t t){
uint64_t aTime = t - 5;
@ -1048,6 +1151,9 @@ Controller::statLog &Controller::statStorage::getDataFor(unsigned long long t){
empty.lastSecond = 0;
empty.down = 0;
empty.up = 0;
empty.pktCount = 0;
empty.pktLost = 0;
empty.pktRetransmit = 0;
return empty;
}
std::map<unsigned long long, statLog>::iterator it = log.upper_bound(t);
@ -1063,6 +1169,9 @@ void Controller::statStorage::update(Comms::Statistics &statComm, size_t index){
tmp.lastSecond = statComm.getLastSecond(index);
tmp.down = statComm.getDown(index);
tmp.up = statComm.getUp(index);
tmp.pktCount = statComm.getPacketCount(index);
tmp.pktLost = statComm.getPacketLostCount(index);
tmp.pktRetransmit = statComm.getPacketRetransmitCount(index);
log[statComm.getNow(index)] = tmp;
// wipe data older than approx. STAT_CUTOFF seconds
/// \todo Remove least interesting data first.
@ -1132,7 +1241,7 @@ bool Controller::hasViewers(std::string streamName){
/// //array of protocols to accumulate. Empty means all.
/// "protocols": ["HLS", "HSS"],
/// //list of requested data fields. Empty means all.
/// "fields": ["host", "stream", "protocol", "conntime", "position", "down", "up", "downbps", "upbps"],
/// "fields": ["host", "stream", "protocol", "conntime", "position", "down", "up", "downbps", "upbps","pktcount","pktlost","pktretransmit"],
/// //unix timestamp of measuring moment. Negative means X seconds ago. Empty means now.
/// "time": 1234567
///}
@ -1186,6 +1295,9 @@ void Controller::fillClients(JSON::Value &req, JSON::Value &rep){
if ((*it).asStringRef() == "downbps"){fields |= STAT_CLI_BPS_DOWN;}
if ((*it).asStringRef() == "upbps"){fields |= STAT_CLI_BPS_UP;}
if ((*it).asStringRef() == "sessid"){fields |= STAT_CLI_SESSID;}
if ((*it).asStringRef() == "pktcount"){fields |= STAT_CLI_PKTCOUNT;}
if ((*it).asStringRef() == "pktlost"){fields |= STAT_CLI_PKTLOST;}
if ((*it).asStringRef() == "pktretransmit"){fields |= STAT_CLI_PKTRETRANSMIT;}
}
}
// select all, if none selected
@ -1213,6 +1325,9 @@ void Controller::fillClients(JSON::Value &req, JSON::Value &rep){
if (fields & STAT_CLI_BPS_UP){rep["fields"].append("upbps");}
if (fields & STAT_CLI_CRC){rep["fields"].append("crc");}
if (fields & STAT_CLI_SESSID){rep["fields"].append("sessid");}
if (fields & STAT_CLI_PKTCOUNT){rep["fields"].append("pktcount");}
if (fields & STAT_CLI_PKTLOST){rep["fields"].append("pktlost");}
if (fields & STAT_CLI_PKTRETRANSMIT){rep["fields"].append("pktretransmit");}
// output the data itself
rep["data"].null();
// loop over all sessions
@ -1237,6 +1352,9 @@ void Controller::fillClients(JSON::Value &req, JSON::Value &rep){
if (fields & STAT_CLI_BPS_UP){d.append(it->second.getBpsUp(time));}
if (fields & STAT_CLI_CRC){d.append(it->first.crc);}
if (fields & STAT_CLI_SESSID){d.append(it->first.ID);}
if (fields & STAT_CLI_PKTCOUNT){d.append(it->second.getPktCount(time));}
if (fields & STAT_CLI_PKTLOST){d.append(it->second.getPktLost(time));}
if (fields & STAT_CLI_PKTRETRANSMIT){d.append(it->second.getPktRetransmit(time));}
rep["data"].append(d);
}
}

View file

@ -32,6 +32,9 @@ namespace Controller{
uint64_t lastSecond;
uint64_t down;
uint64_t up;
uint64_t pktCount;
uint64_t pktLost;
uint64_t pktRetransmit;
};
enum sessType{SESS_UNSET = 0, SESS_INPUT, SESS_OUTPUT, SESS_VIEWER};
@ -74,6 +77,9 @@ namespace Controller{
uint64_t lastSec;
uint64_t wipedUp;
uint64_t wipedDown;
uint64_t wipedPktCount;
uint64_t wipedPktLost;
uint64_t wipedPktRetransmit;
std::deque<statStorage> oldConns;
sessType sessionType;
bool tracked;
@ -104,6 +110,12 @@ namespace Controller{
uint64_t getUp();
uint64_t getDown();
uint64_t getUp(uint64_t time);
uint64_t getPktCount();
uint64_t getPktCount(uint64_t time);
uint64_t getPktLost();
uint64_t getPktLost(uint64_t time);
uint64_t getPktRetransmit();
uint64_t getPktRetransmit(uint64_t time);
uint64_t getBpsDown(uint64_t time);
uint64_t getBpsUp(uint64_t time);
uint64_t getBpsDown(uint64_t start, uint64_t end);

View file

@ -679,12 +679,16 @@ namespace Mist{
// if not shutting down, wait 1 second before looping
if (config->is_active){Util::wait(INPUT_USER_INTERVAL);}
}
if (streamStatus){streamStatus.mapped[0] = STRMSTAT_SHUTDOWN;}
config->is_active = false;
if (!isThread()){
if (streamStatus){streamStatus.mapped[0] = STRMSTAT_SHUTDOWN;}
config->is_active = false;
}
finish();
INFO_MSG("Input closing clean, reason: %s", Util::exitReason);
userSelect.clear();
if (streamStatus){streamStatus.mapped[0] = STRMSTAT_OFF;}
if (!isThread()){
if (streamStatus){streamStatus.mapped[0] = STRMSTAT_OFF;}
}
}
/// This function checks if an input in serve mode should keep running or not.
@ -729,7 +733,6 @@ namespace Mist{
/// - if there are tracks, register as a non-viewer on the user page of the buffer
/// - call getNext() in a loop, buffering packets
void Input::stream(){
std::map<std::string, std::string> overrides;
overrides["throughboot"] = "";
if (config->getBool("realtime") ||
@ -895,6 +898,7 @@ namespace Mist{
statComm.setTime(now - startTime);
statComm.setLastSecond(0);
statComm.setHost(getConnectedBinHost());
handleLossyStats(statComm);
}
statTimer = Util::bootSecs();

View file

@ -41,6 +41,7 @@ namespace Mist{
virtual bool readHeader();
virtual bool needHeader(){return !readExistingHeader();}
virtual bool preRun(){return true;}
virtual bool isThread(){return false;}
virtual bool isSingular(){return !config->getBool("realtime");}
virtual bool readExistingHeader();
virtual bool atKeyFrame();
@ -69,6 +70,10 @@ namespace Mist{
virtual void userOnDisconnect(size_t id);
virtual void userLeadOut();
virtual void handleLossyStats(Comms::Statistics & statComm){}
virtual bool preventBufferStart() {return false;}
virtual void parseHeader();
bool bufferFrame(size_t track, uint32_t keyNum);

217
src/input/input_tssrt.cpp Normal file
View file

@ -0,0 +1,217 @@
#include "input_tssrt.h"
#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <mist/defines.h>
#include <mist/downloader.h>
#include <mist/flv_tag.h>
#include <mist/http_parser.h>
#include <mist/mp4_generic.h>
#include <mist/socket_srt.h>
#include <mist/stream.h>
#include <mist/timing.h>
#include <mist/ts_packet.h>
#include <mist/util.h>
#include <string>
#include <mist/procs.h>
#include <mist/tinythread.h>
#include <sys/stat.h>
Util::Config *cfgPointer = NULL;
std::string baseStreamName;
/// Global, so that all tracks stay in sync
int64_t timeStampOffset = 0;
// We use threads here for multiple input pushes, because of the internals of the SRT Library
static void callThreadCallbackSRT(void *socknum){
SRTSOCKET sock = *((SRTSOCKET *)socknum);
// use the accepted socket as the second parameter
Mist::inputTSSRT inp(cfgPointer, sock);
inp.setSingular(false);
inp.run();
}
namespace Mist{
/// Constructor of TS Input
/// \arg cfg Util::Config that contains all current configurations.
inputTSSRT::inputTSSRT(Util::Config *cfg, SRTSOCKET s) : Input(cfg){
capa["name"] = "TSSRT";
capa["desc"] = "This input allows for processing MPEG2-TS-based SRT streams. Use mode=listener "
"for push input.";
capa["source_match"].append("srt://*");
// These can/may be set to always-on mode
capa["always_match"].append("srt://*");
capa["incoming_push_url"] = "srt://$host:$port";
capa["incoming_push_url_match"] = "srt://*";
capa["priority"] = 9;
capa["codecs"][0u][0u].append("H264");
capa["codecs"][0u][0u].append("HEVC");
capa["codecs"][0u][0u].append("MPEG2");
capa["codecs"][0u][1u].append("AAC");
capa["codecs"][0u][1u].append("AC3");
capa["codecs"][0u][1u].append("MP2");
JSON::Value option;
option["arg"] = "integer";
option["long"] = "buffer";
option["short"] = "b";
option["help"] = "DVR buffer time in ms";
option["value"].append(50000);
config->addOption("bufferTime", option);
capa["optional"]["DVR"]["name"] = "Buffer time (ms)";
capa["optional"]["DVR"]["help"] =
"The target available buffer time for this live stream, in milliseconds. This is the time "
"available to seek around in, and will automatically be extended to fit whole keyframes as "
"well as the minimum duration needed for stable playback.";
capa["optional"]["DVR"]["option"] = "--buffer";
capa["optional"]["DVR"]["type"] = "uint";
capa["optional"]["DVR"]["default"] = 50000;
// Setup if we are called form with a thread for push-based input.
if (s != -1){
srtConn = Socket::SRTConnection(s);
streamName = baseStreamName;
if (srtConn.getStreamName() != ""){streamName += "+" + srtConn.getStreamName();}
}
lastTimeStamp = 0;
singularFlag = true;
}
inputTSSRT::~inputTSSRT(){}
bool inputTSSRT::checkArguments(){return true;}
/// Live Setup of SRT Input. Runs only if we are the "main" thread
bool inputTSSRT::preRun(){
if (srtConn.getSocket() == -1){
std::string source = config->getString("input");
standAlone = false;
HTTP::URL u(source);
INFO_MSG("Parsed url: %s", u.getUrl().c_str());
if (Socket::interpretSRTMode(u) == "listener"){
sSock = Socket::SRTServer(u.getPort(), u.host, false);
config->registerSRTSockPtr(&sSock);
}else{
INFO_MSG("A");
std::map<std::string, std::string> arguments;
HTTP::parseVars(u.args, arguments);
size_t connectCnt = 0;
do{
srtConn.connect(u.host, u.getPort(), "input", arguments);
if (!srtConn){Util::sleep(1000);}
++connectCnt;
}while (!srtConn && connectCnt < 10);
if (!srtConn){WARN_MSG("Connecting to %s timed out", u.getUrl().c_str());}
}
}
return true;
}
// Retrieve the next packet to be played from the srt connection.
void inputTSSRT::getNext(size_t idx){
thisPacket.null();
bool hasPacket = tsStream.hasPacket();
bool firstloop = true;
while (!hasPacket && srtConn.connected() && config->is_active){
firstloop = false;
// Receive data from the socket. SRT Sockets handle some internal timing as well, based on the provided settings.
leftBuffer.append(srtConn.RecvNow());
if (leftBuffer.size()){
size_t offset = 0;
size_t garbage = 0;
while ((offset + 188) < leftBuffer.size()){
if (leftBuffer[offset] != 0x47){
++garbage;
if (garbage % 100 == 0){INFO_MSG("Accumulated %zu bytes of garbage", garbage);}
++offset;
continue;
}
if (garbage != 0){
WARN_MSG("Thrown away %zu bytes of garbage data", garbage);
garbage = 0;
}
if (offset + 188 <= leftBuffer.size()){
tsBuf.FromPointer(leftBuffer.data() + offset);
tsStream.parse(tsBuf, 0);
offset += 188;
}
}
leftBuffer.erase(0, offset);
hasPacket = tsStream.hasPacket();
}else if (srtConn.connected()){
// This should not happen as the SRT socket is read blocking and won't return until there is
// data. But if it does, wait before retry
Util::sleep(10);
}
}
if (hasPacket){tsStream.getEarliestPacket(thisPacket);}
if (!thisPacket){
INFO_MSG("Could not getNext TS packet!");
return;
}
tsStream.initializeMetadata(meta);
size_t thisIdx = M.trackIDToIndex(thisPacket.getTrackId(), getpid());
if (thisIdx == INVALID_TRACK_ID){getNext(idx);}
uint64_t adjustTime = thisPacket.getTime() + timeStampOffset;
if (lastTimeStamp || timeStampOffset){
if (lastTimeStamp + 5000 < adjustTime || lastTimeStamp > adjustTime + 5000){
INFO_MSG("Timestamp jump " PRETTY_PRINT_MSTIME " -> " PRETTY_PRINT_MSTIME ", compensating.",
PRETTY_ARG_MSTIME(lastTimeStamp), PRETTY_ARG_MSTIME(adjustTime));
timeStampOffset += (lastTimeStamp - adjustTime);
adjustTime = thisPacket.getTime() + timeStampOffset;
}
}
lastTimeStamp = adjustTime;
thisPacket.setTime(adjustTime);
}
bool inputTSSRT::openStreamSource(){return true;}
void inputTSSRT::parseStreamHeader(){
// Placeholder empty track to force normal code to continue despite no tracks available
tmpIdx = meta.addTrack(0, 0, 0, 0);
}
void inputTSSRT::streamMainLoop(){
meta.removeTrack(tmpIdx);
// If we do not have a srtConn here, we are the main thread and should start accepting pushes.
if (srtConn.getSocket() == -1){
cfgPointer = config;
baseStreamName = streamName;
while (config->is_active && sSock.connected()){
Socket::SRTConnection S = sSock.accept();
if (S.connected()){// check if the new connection is valid
SRTSOCKET sock = S.getSocket();
// spawn a new thread for this connection
tthread::thread T(callThreadCallbackSRT, (void *)&sock);
// detach it, no need to keep track of it anymore
T.detach();
HIGH_MSG("Spawned new thread for socket %i", S.getSocket());
}
}
return;
}
// If we are here: we have a proper connection (either accepted or pull input) and should start parsing it as such
Input::streamMainLoop();
}
bool inputTSSRT::needsLock(){return false;}
void inputTSSRT::setSingular(bool newSingular){singularFlag = newSingular;}
void inputTSSRT::handleLossyStats(Comms::Statistics &statComm){
statComm.setPacketCount(srtConn.packetCount());
statComm.setPacketLostCount(srtConn.packetLostCount());
statComm.setPacketRetransmitCount(srtConn.packetRetransmitCount());
}
}// namespace Mist

48
src/input/input_tssrt.h Normal file
View file

@ -0,0 +1,48 @@
#include "input.h"
#include <mist/dtsc.h>
#include <mist/nal.h>
#include <mist/socket_srt.h>
#include <mist/ts_packet.h>
#include <mist/ts_stream.h>
#include <set>
#include <string>
namespace Mist{
/// This class contains all functions needed to implement TS Input
class inputTSSRT : public Input{
public:
inputTSSRT(Util::Config *cfg, SRTSOCKET s = -1);
~inputTSSRT();
void setSingular(bool newSingular);
virtual bool needsLock();
protected:
// Private Functions
bool checkArguments();
bool preRun();
virtual void getNext(size_t idx = INVALID_TRACK_ID);
virtual bool needHeader(){return false;}
virtual bool preventBufferStart(){return srtConn.getSocket() == -1;}
virtual bool isSingular(){return singularFlag;}
virtual bool isThread(){return !singularFlag;}
bool openStreamSource();
void parseStreamHeader();
void streamMainLoop();
TS::Stream tsStream; ///< Used for parsing the incoming ts stream
TS::Packet tsBuf;
std::string leftBuffer;
uint64_t lastTimeStamp;
Socket::SRTServer sSock;
Socket::SRTConnection srtConn;
bool singularFlag;
size_t tmpIdx;
virtual size_t streamByteCount(){
return srtConn.dataDown();
}; // For live streams: to update the stats with correct values.
virtual void handleLossyStats(Comms::Statistics &statComm);
};
}// namespace Mist
typedef Mist::inputTSSRT mistIn;

View file

@ -0,0 +1,58 @@
#include OUTPUTTYPE
#include <mist/config.h>
#include <mist/defines.h>
#include <mist/socket.h>
#include <mist/socket_srt.h>
#include <mist/util.h>
int spawnForked(Socket::SRTConnection &S){
int fds[2];
pipe(fds);
Socket::Connection Sconn(fds[0], fds[1]);
mistOut tmp(Sconn, S.getSocket());
return tmp.run();
}
void handleUSR1(int signum, siginfo_t *sigInfo, void *ignore){
HIGH_MSG("USR1 received - triggering rolling restart");
Util::Config::is_restarting = true;
Util::logExitReason("signal USR1");
Util::Config::is_active = false;
}
int main(int argc, char *argv[]){
DTSC::trackValidMask = TRACK_VALID_EXT_HUMAN;
Util::redirectLogsIfNeeded();
Util::Config conf(argv[0]);
mistOut::init(&conf);
if (conf.parseArgs(argc, argv)){
if (conf.getBool("json")){
mistOut::capa["version"] = PACKAGE_VERSION;
std::cout << mistOut::capa.toString() << std::endl;
return -1;
}
conf.activate();
if (mistOut::listenMode()){
{
struct sigaction new_action;
new_action.sa_sigaction = handleUSR1;
sigemptyset(&new_action.sa_mask);
new_action.sa_flags = 0;
sigaction(SIGUSR1, &new_action, NULL);
}
mistOut::listener(conf, spawnForked);
if (conf.is_restarting && Socket::checkTrueSocket(0)){
INFO_MSG("Reloading input while re-using server socket");
execvp(argv[0], argv);
FAIL_MSG("Error reloading: %s", strerror(errno));
}
}else{
Socket::Connection S(fileno(stdout), fileno(stdin));
mistOut tmp(S, -1);
return tmp.run();
}
}
INFO_MSG("Exit reason: %s", Util::exitReason);
return 0;
}

112
src/output/output_tssrt.cpp Normal file
View file

@ -0,0 +1,112 @@
#include "mist/socket_srt.h"
#include "output_tssrt.h"
#include <mist/defines.h>
#include <mist/http_parser.h>
#include <mist/url.h>
namespace Mist{
OutTSSRT::OutTSSRT(Socket::Connection &conn, SRTSOCKET _srtSock) : TSOutput(conn){
// NOTE: conn is useless for SRT, as it uses a different socket type.
sendRepeatingHeaders = 500; // PAT/PMT every 500ms (DVB spec)
streamName = config->getString("streamname");
pushOut = false;
std::string tracks;
// Push output configuration
if (config->getString("target").size()){
HTTP::URL target(config->getString("target"));
if (target.protocol != "srt"){
FAIL_MSG("Target %s must begin with srt://, aborting", target.getUrl().c_str());
onFail("Invalid srt target: doesn't start with srt://", true);
return;
}
if (!target.getPort()){
FAIL_MSG("Target %s must contain a port, aborting", target.getUrl().c_str());
onFail("Invalid srt target: missing port", true);
return;
}
pushOut = true;
if (targetParams.count("tracks")){tracks = targetParams["tracks"];}
size_t connectCnt = 0;
do{
srtConn.connect(target.host, target.getPort(), "output");
if (!srtConn){Util::sleep(1000);}
++connectCnt;
}while (!srtConn && connectCnt < 10);
wantRequest = false;
parseData = true;
}else{
// Pull output configuration, In this case we have an srt connection in the second constructor parameter.
srtConn = Socket::SRTConnection(_srtSock);
parseData = true;
wantRequest = false;
// Handle override / append of streamname options
std::string sName = srtConn.getStreamName();
if (sName != ""){
streamName = sName;
HIGH_MSG("Requesting stream %s", streamName.c_str());
}
}
initialize();
}
OutTSSRT::~OutTSSRT(){}
void OutTSSRT::init(Util::Config *cfg){
Output::init(cfg);
capa["name"] = "TSSRT";
capa["friendly"] = "TS over SRT";
capa["desc"] = "Real time streaming of TS data over SRT";
capa["deps"] = "";
capa["required"]["streamname"]["name"] = "Stream";
capa["required"]["streamname"]["help"] = "What streamname to serve. For multiple streams, add "
"this protocol multiple times using different ports, "
"or use the streamid option on the srt connection";
capa["required"]["streamname"]["type"] = "str";
capa["required"]["streamname"]["option"] = "--stream";
capa["required"]["streamname"]["short"] = "s";
capa["codecs"][0u][0u].append("HEVC");
capa["codecs"][0u][0u].append("H264");
capa["codecs"][0u][0u].append("MPEG2");
capa["codecs"][0u][1u].append("AAC");
capa["codecs"][0u][1u].append("MP3");
capa["codecs"][0u][1u].append("AC3");
capa["codecs"][0u][1u].append("MP2");
cfg->addConnectorOptions(8889, capa);
config = cfg;
capa["push_urls"].append("srt://*");
JSON::Value opt;
opt["arg"] = "string";
opt["default"] = "";
opt["arg_num"] = 1;
opt["help"] = "Target srt:// URL to push out towards.";
cfg->addOption("target", opt);
}
// Buffer internally in the class, and send once we have over 1000 bytes of data.
void OutTSSRT::sendTS(const char *tsData, size_t len){
if (packetBuffer.size() >= 1000){
srtConn.SendNow(packetBuffer);
if (!srtConn){
// Allow for proper disconnect
parseData = false;
}
packetBuffer.clear();
}
packetBuffer.append(tsData, len);
}
bool OutTSSRT::setAlternateConnectionStats(Comms::Statistics &statComm){
statComm.setUp(srtConn.dataUp());
statComm.setDown(srtConn.dataDown());
statComm.setTime(Util::bootSecs() - srtConn.connTime());
return true;
}
void OutTSSRT::handleLossyStats(Comms::Statistics &statComm){
statComm.setPacketCount(srtConn.packetCount());
statComm.setPacketLostCount(srtConn.packetLostCount());
statComm.setPacketRetransmitCount(srtConn.packetRetransmitCount());
}
}// namespace Mist

33
src/output/output_tssrt.h Normal file
View file

@ -0,0 +1,33 @@
#include "output_ts_base.h"
#include <mist/ts_stream.h>
#include <mist/socket_srt.h>
namespace Mist{
class OutTSSRT : public TSOutput{
public:
OutTSSRT(Socket::Connection &conn, SRTSOCKET _srtSock);
~OutTSSRT();
static bool listenMode(){return !(config->getString("target").size());}
static void init(Util::Config *cfg);
void sendTS(const char *tsData, size_t len = 188);
bool isReadyForPlay(){return true;}
protected:
// Stats handling
virtual bool setAlternateConnectionStats(Comms::Statistics &statComm);
virtual void handleLossyStats(Comms::Statistics &statComm);
private:
bool pushOut;
std::string packetBuffer;
Socket::UDPConnection pushSock;
TS::Stream tsIn;
Socket::SRTConnection srtConn;
};
}// namespace Mist
typedef Mist::OutTSSRT mistOut;