Various small fixes to SRT sockets and SRT socket statistics
This commit is contained in:
parent
0af85de22d
commit
cac86fff57
8 changed files with 89 additions and 25 deletions
|
@ -76,6 +76,38 @@ namespace Socket{
|
||||||
lastGood = Util::bootMS();
|
lastGood = Util::bootMS();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Copy constructor
|
||||||
|
SRTConnection::SRTConnection(const SRTConnection &rhs){
|
||||||
|
initializeEmpty();
|
||||||
|
*this = rhs;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Assignment constructor
|
||||||
|
SRTConnection &SRTConnection::operator=(const SRTConnection &rhs){
|
||||||
|
close();
|
||||||
|
initializeEmpty();
|
||||||
|
if (!rhs){return *this;}
|
||||||
|
memcpy(&remoteaddr, &(rhs.remoteaddr), sizeof(sockaddr_in6));
|
||||||
|
direction = rhs.direction;
|
||||||
|
remotehost = rhs.remotehost;
|
||||||
|
sock = rhs.sock;
|
||||||
|
performanceMonitor = rhs.performanceMonitor;
|
||||||
|
host = rhs.host;
|
||||||
|
outgoing_port = rhs.outgoing_port;
|
||||||
|
prev_pktseq = rhs.prev_pktseq;
|
||||||
|
lastGood = rhs.lastGood;
|
||||||
|
chunkTransmitSize = rhs.chunkTransmitSize;
|
||||||
|
adapter = rhs.adapter;
|
||||||
|
modeName = rhs.modeName;
|
||||||
|
timeout = rhs.timeout;
|
||||||
|
tsbpdMode = rhs.tsbpdMode;
|
||||||
|
params = rhs.params;
|
||||||
|
blocking = rhs.blocking;
|
||||||
|
getBinHost();
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
SRTConnection::SRTConnection(const std::string &_host, int _port, const std::string &_direction,
|
SRTConnection::SRTConnection(const std::string &_host, int _port, const std::string &_direction,
|
||||||
const std::map<std::string, std::string> &_params){
|
const std::map<std::string, std::string> &_params){
|
||||||
connect(_host, _port, _direction, _params);
|
connect(_host, _port, _direction, _params);
|
||||||
|
@ -101,7 +133,7 @@ namespace Socket{
|
||||||
memcpy(tmpBuffer + 12, &(reinterpret_cast<const sockaddr_in *>(&remoteaddr)->sin_addr.s_addr), 4);
|
memcpy(tmpBuffer + 12, &(reinterpret_cast<const sockaddr_in *>(&remoteaddr)->sin_addr.s_addr), 4);
|
||||||
break;
|
break;
|
||||||
case AF_INET6: memcpy(tmpBuffer, &(remoteaddr.sin6_addr.s6_addr), 16); break;
|
case AF_INET6: memcpy(tmpBuffer, &(remoteaddr.sin6_addr.s6_addr), 16); break;
|
||||||
default: return ""; break;
|
default: memset(tmpBuffer, 0, 16); break;
|
||||||
}
|
}
|
||||||
return std::string(tmpBuffer, 16);
|
return std::string(tmpBuffer, 16);
|
||||||
}
|
}
|
||||||
|
@ -343,6 +375,7 @@ namespace Socket{
|
||||||
}
|
}
|
||||||
|
|
||||||
void SRTConnection::initializeEmpty(){
|
void SRTConnection::initializeEmpty(){
|
||||||
|
memset(&performanceMonitor, 0, sizeof(performanceMonitor));
|
||||||
prev_pktseq = 0;
|
prev_pktseq = 0;
|
||||||
sock = SRT_INVALID_SOCK;
|
sock = SRT_INVALID_SOCK;
|
||||||
outgoing_port = 0;
|
outgoing_port = 0;
|
||||||
|
@ -494,11 +527,12 @@ namespace Socket{
|
||||||
}
|
}
|
||||||
|
|
||||||
r.direction = direction;
|
r.direction = direction;
|
||||||
|
r.params = conn.params;
|
||||||
r.postConfigureSocket();
|
r.postConfigureSocket();
|
||||||
r.setBlocking(!nonblock);
|
r.setBlocking(!nonblock);
|
||||||
static char addrconv[INET6_ADDRSTRLEN];
|
static char addrconv[INET6_ADDRSTRLEN];
|
||||||
|
|
||||||
r.remoteaddr = tmpaddr;
|
memcpy(&(r.remoteaddr), &tmpaddr, sizeof(tmpaddr));
|
||||||
if (tmpaddr.sin6_family == AF_INET6){
|
if (tmpaddr.sin6_family == AF_INET6){
|
||||||
r.remotehost = inet_ntop(AF_INET6, &(tmpaddr.sin6_addr), addrconv, INET6_ADDRSTRLEN);
|
r.remotehost = inet_ntop(AF_INET6, &(tmpaddr.sin6_addr), addrconv, INET6_ADDRSTRLEN);
|
||||||
HIGH_MSG("IPv6 addr [%s]", r.remotehost.c_str());
|
HIGH_MSG("IPv6 addr [%s]", r.remotehost.c_str());
|
||||||
|
@ -508,6 +542,7 @@ namespace Socket{
|
||||||
HIGH_MSG("IPv4 addr [%s]", r.remotehost.c_str());
|
HIGH_MSG("IPv4 addr [%s]", r.remotehost.c_str());
|
||||||
}
|
}
|
||||||
INFO_MSG("Accepted a socket coming from %s", r.remotehost.c_str());
|
INFO_MSG("Accepted a socket coming from %s", r.remotehost.c_str());
|
||||||
|
r.getBinHost();
|
||||||
return r;
|
return r;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,9 +24,16 @@ namespace Socket{
|
||||||
}// namespace SockOpt
|
}// namespace SockOpt
|
||||||
}// namespace SRT
|
}// namespace SRT
|
||||||
|
|
||||||
|
//Advance declaration so we can make it a friend of SRTConnection
|
||||||
|
class SRTServer;
|
||||||
|
|
||||||
class SRTConnection{
|
class SRTConnection{
|
||||||
|
friend class SRTServer;
|
||||||
public:
|
public:
|
||||||
SRTConnection();
|
SRTConnection();
|
||||||
|
// copy/assignment constructors
|
||||||
|
SRTConnection(const SRTConnection &rhs);
|
||||||
|
SRTConnection &operator=(const SRTConnection &rhs);
|
||||||
SRTConnection(SRTSOCKET alreadyConnected);
|
SRTConnection(SRTSOCKET alreadyConnected);
|
||||||
SRTConnection(const std::string &_host, int _port, const std::string &_direction = "input",
|
SRTConnection(const std::string &_host, int _port, const std::string &_direction = "input",
|
||||||
const paramList &_params = paramList());
|
const paramList &_params = paramList());
|
||||||
|
|
|
@ -15,6 +15,14 @@ tthread::recursive_mutex tMutex;
|
||||||
|
|
||||||
namespace TS{
|
namespace TS{
|
||||||
|
|
||||||
|
Assembler::Assembler(){
|
||||||
|
isLive = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Assembler::setLive(bool live){
|
||||||
|
isLive = live;
|
||||||
|
}
|
||||||
|
|
||||||
bool Assembler::assemble(Stream & TSStrm, const char * ptr, size_t len, bool parse, uint64_t bytePos){
|
bool Assembler::assemble(Stream & TSStrm, const char * ptr, size_t len, bool parse, uint64_t bytePos){
|
||||||
bool ret = false;
|
bool ret = false;
|
||||||
size_t offset = 0;
|
size_t offset = 0;
|
||||||
|
@ -30,7 +38,7 @@ namespace TS{
|
||||||
tsBuf.FromPointer(leftData);
|
tsBuf.FromPointer(leftData);
|
||||||
if (!ret && tsBuf.getUnitStart()){ret = true;}
|
if (!ret && tsBuf.getUnitStart()){ret = true;}
|
||||||
if (parse){
|
if (parse){
|
||||||
TSStrm.parse(tsBuf, bytePos);
|
TSStrm.parse(tsBuf, isLive?0:bytePos);
|
||||||
}else{
|
}else{
|
||||||
TSStrm.add(tsBuf);
|
TSStrm.add(tsBuf);
|
||||||
if (!TSStrm.isDataTrack(tsBuf.getPID())){TSStrm.parse(tsBuf.getPID());}
|
if (!TSStrm.isDataTrack(tsBuf.getPID())){TSStrm.parse(tsBuf.getPID());}
|
||||||
|
@ -59,7 +67,7 @@ namespace TS{
|
||||||
tsBuf.FromPointer(ptr + offset);
|
tsBuf.FromPointer(ptr + offset);
|
||||||
if (!ret && tsBuf.getUnitStart()){ret = true;}
|
if (!ret && tsBuf.getUnitStart()){ret = true;}
|
||||||
if (parse){
|
if (parse){
|
||||||
TSStrm.parse(tsBuf, bytePos);
|
TSStrm.parse(tsBuf, isLive?0:bytePos);
|
||||||
}else{
|
}else{
|
||||||
TSStrm.add(tsBuf);
|
TSStrm.add(tsBuf);
|
||||||
if (!TSStrm.isDataTrack(tsBuf.getPID())){TSStrm.parse(tsBuf.getPID());}
|
if (!TSStrm.isDataTrack(tsBuf.getPID())){TSStrm.parse(tsBuf.getPID());}
|
||||||
|
|
|
@ -112,9 +112,12 @@ namespace TS{
|
||||||
|
|
||||||
class Assembler{
|
class Assembler{
|
||||||
public:
|
public:
|
||||||
|
Assembler();
|
||||||
bool assemble(Stream & TSStrm, const char * ptr, size_t len, bool parse = false, uint64_t bytePos = 0);
|
bool assemble(Stream & TSStrm, const char * ptr, size_t len, bool parse = false, uint64_t bytePos = 0);
|
||||||
void clear();
|
void clear();
|
||||||
|
void setLive(bool live = true);
|
||||||
private:
|
private:
|
||||||
|
bool isLive;
|
||||||
Util::ResizeablePointer leftData;
|
Util::ResizeablePointer leftData;
|
||||||
TS::Packet tsBuf;
|
TS::Packet tsBuf;
|
||||||
};
|
};
|
||||||
|
|
|
@ -957,7 +957,6 @@ namespace Mist{
|
||||||
void Input::connStats(Comms::Connections &statComm){
|
void Input::connStats(Comms::Connections &statComm){
|
||||||
statComm.setUp(0);
|
statComm.setUp(0);
|
||||||
statComm.setDown(streamByteCount());
|
statComm.setDown(streamByteCount());
|
||||||
statComm.setHost(getConnectedBinHost());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void Input::realtimeMainLoop(){
|
void Input::realtimeMainLoop(){
|
||||||
|
|
|
@ -39,9 +39,8 @@ void signal_handler(int signum, siginfo_t *sigInfo, void *ignore){
|
||||||
|
|
||||||
// We use threads here for multiple input pushes, because of the internals of the SRT Library
|
// We use threads here for multiple input pushes, because of the internals of the SRT Library
|
||||||
static void callThreadCallbackSRT(void *socknum){
|
static void callThreadCallbackSRT(void *socknum){
|
||||||
SRTSOCKET sock = *((SRTSOCKET *)socknum);
|
|
||||||
// use the accepted socket as the second parameter
|
// use the accepted socket as the second parameter
|
||||||
Mist::inputTSSRT inp(cfgPointer, sock);
|
Mist::inputTSSRT inp(cfgPointer, *(Socket::SRTConnection *)socknum);
|
||||||
inp.setSingular(false);
|
inp.setSingular(false);
|
||||||
inp.run();
|
inp.run();
|
||||||
}
|
}
|
||||||
|
@ -49,7 +48,7 @@ static void callThreadCallbackSRT(void *socknum){
|
||||||
namespace Mist{
|
namespace Mist{
|
||||||
/// Constructor of TS Input
|
/// Constructor of TS Input
|
||||||
/// \arg cfg Util::Config that contains all current configurations.
|
/// \arg cfg Util::Config that contains all current configurations.
|
||||||
inputTSSRT::inputTSSRT(Util::Config *cfg, SRTSOCKET s) : Input(cfg){
|
inputTSSRT::inputTSSRT(Util::Config *cfg, Socket::SRTConnection s) : Input(cfg){
|
||||||
rawIdx = INVALID_TRACK_ID;
|
rawIdx = INVALID_TRACK_ID;
|
||||||
lastRawPacket = 0;
|
lastRawPacket = 0;
|
||||||
capa["name"] = "TSSRT";
|
capa["name"] = "TSSRT";
|
||||||
|
@ -118,8 +117,8 @@ namespace Mist{
|
||||||
config->addOption("raw", option);
|
config->addOption("raw", option);
|
||||||
|
|
||||||
// Setup if we are called form with a thread for push-based input.
|
// Setup if we are called form with a thread for push-based input.
|
||||||
if (s != -1){
|
if (s.connected()){
|
||||||
srtConn = Socket::SRTConnection(s);
|
srtConn = s;
|
||||||
streamName = baseStreamName;
|
streamName = baseStreamName;
|
||||||
std::string streamid = srtConn.getStreamName();
|
std::string streamid = srtConn.getStreamName();
|
||||||
int64_t acc = config->getInteger("acceptable");
|
int64_t acc = config->getInteger("acceptable");
|
||||||
|
@ -263,9 +262,8 @@ namespace Mist{
|
||||||
while (config->is_active && sSock.connected()){
|
while (config->is_active && sSock.connected()){
|
||||||
Socket::SRTConnection S = sSock.accept();
|
Socket::SRTConnection S = sSock.accept();
|
||||||
if (S.connected()){// check if the new connection is valid
|
if (S.connected()){// check if the new connection is valid
|
||||||
SRTSOCKET sock = S.getSocket();
|
|
||||||
// spawn a new thread for this connection
|
// spawn a new thread for this connection
|
||||||
tthread::thread T(callThreadCallbackSRT, (void *)&sock);
|
tthread::thread T(callThreadCallbackSRT, (void *)&S);
|
||||||
// detach it, no need to keep track of it anymore
|
// detach it, no need to keep track of it anymore
|
||||||
T.detach();
|
T.detach();
|
||||||
HIGH_MSG("Spawned new thread for socket %i", S.getSocket());
|
HIGH_MSG("Spawned new thread for socket %i", S.getSocket());
|
||||||
|
@ -285,7 +283,6 @@ namespace Mist{
|
||||||
void inputTSSRT::connStats(Comms::Connections &statComm){
|
void inputTSSRT::connStats(Comms::Connections &statComm){
|
||||||
statComm.setUp(srtConn.dataUp());
|
statComm.setUp(srtConn.dataUp());
|
||||||
statComm.setDown(srtConn.dataDown());
|
statComm.setDown(srtConn.dataDown());
|
||||||
statComm.setHost(getConnectedBinHost());
|
|
||||||
statComm.setPacketCount(srtConn.packetCount());
|
statComm.setPacketCount(srtConn.packetCount());
|
||||||
statComm.setPacketLostCount(srtConn.packetLostCount());
|
statComm.setPacketLostCount(srtConn.packetLostCount());
|
||||||
statComm.setPacketRetransmitCount(srtConn.packetRetransmitCount());
|
statComm.setPacketRetransmitCount(srtConn.packetRetransmitCount());
|
||||||
|
|
|
@ -11,7 +11,7 @@ namespace Mist{
|
||||||
|
|
||||||
class inputTSSRT : public Input{
|
class inputTSSRT : public Input{
|
||||||
public:
|
public:
|
||||||
inputTSSRT(Util::Config *cfg, SRTSOCKET s = -1);
|
inputTSSRT(Util::Config *cfg, Socket::SRTConnection s = Socket::SRTConnection());
|
||||||
~inputTSSRT();
|
~inputTSSRT();
|
||||||
void setSingular(bool newSingular);
|
void setSingular(bool newSingular);
|
||||||
virtual bool needsLock();
|
virtual bool needsLock();
|
||||||
|
|
|
@ -7,6 +7,8 @@
|
||||||
#include <mist/stream.h>
|
#include <mist/stream.h>
|
||||||
#include <mist/triggers.h>
|
#include <mist/triggers.h>
|
||||||
|
|
||||||
|
bool allowStreamNameOverride = true;
|
||||||
|
|
||||||
namespace Mist{
|
namespace Mist{
|
||||||
OutTSSRT::OutTSSRT(Socket::Connection &conn, Socket::SRTConnection & _srtSock) : TSOutput(conn), srtConn(_srtSock){
|
OutTSSRT::OutTSSRT(Socket::Connection &conn, Socket::SRTConnection & _srtSock) : TSOutput(conn), srtConn(_srtSock){
|
||||||
// NOTE: conn is useless for SRT, as it uses a different socket type.
|
// NOTE: conn is useless for SRT, as it uses a different socket type.
|
||||||
|
@ -14,6 +16,7 @@ namespace Mist{
|
||||||
streamName = config->getString("streamname");
|
streamName = config->getString("streamname");
|
||||||
Util::setStreamName(streamName);
|
Util::setStreamName(streamName);
|
||||||
pushOut = false;
|
pushOut = false;
|
||||||
|
assembler.setLive();
|
||||||
// Push output configuration
|
// Push output configuration
|
||||||
if (config->getString("target").size()){
|
if (config->getString("target").size()){
|
||||||
target = HTTP::URL(config->getString("target"));
|
target = HTTP::URL(config->getString("target"));
|
||||||
|
@ -28,11 +31,7 @@ namespace Mist{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
pushOut = true;
|
pushOut = true;
|
||||||
std::map<std::string, std::string> arguments;
|
HTTP::parseVars(target.args, targetParams);
|
||||||
HTTP::parseVars(target.args, arguments);
|
|
||||||
for (std::map<std::string, std::string>::iterator it = arguments.begin(); it != arguments.end(); ++it){
|
|
||||||
targetParams[it->first] = it->second;
|
|
||||||
}
|
|
||||||
size_t connectCnt = 0;
|
size_t connectCnt = 0;
|
||||||
do{
|
do{
|
||||||
srtConn.connect(target.host, target.getPort(), "output", targetParams);
|
srtConn.connect(target.host, target.getPort(), "output", targetParams);
|
||||||
|
@ -44,6 +43,9 @@ namespace Mist{
|
||||||
}
|
}
|
||||||
++connectCnt;
|
++connectCnt;
|
||||||
}while (!srtConn && connectCnt < 5);
|
}while (!srtConn && connectCnt < 5);
|
||||||
|
if (!srtConn){
|
||||||
|
FAIL_MSG("Failed to connect to '%s'!", config->getString("target").c_str());
|
||||||
|
}
|
||||||
wantRequest = false;
|
wantRequest = false;
|
||||||
parseData = true;
|
parseData = true;
|
||||||
initialize();
|
initialize();
|
||||||
|
@ -51,11 +53,13 @@ namespace Mist{
|
||||||
// Pull output configuration, In this case we have an srt connection in the second constructor parameter.
|
// Pull output configuration, In this case we have an srt connection in the second constructor parameter.
|
||||||
// Handle override / append of streamname options
|
// Handle override / append of streamname options
|
||||||
std::string sName = srtConn.getStreamName();
|
std::string sName = srtConn.getStreamName();
|
||||||
|
if (allowStreamNameOverride){
|
||||||
if (sName != ""){
|
if (sName != ""){
|
||||||
streamName = sName;
|
streamName = sName;
|
||||||
Util::sanitizeName(streamName);
|
Util::sanitizeName(streamName);
|
||||||
Util::setStreamName(streamName);
|
Util::setStreamName(streamName);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int64_t accTypes = config->getInteger("acceptable");
|
int64_t accTypes = config->getInteger("acceptable");
|
||||||
if (accTypes == 0){//Allow both directions
|
if (accTypes == 0){//Allow both directions
|
||||||
|
@ -437,7 +441,8 @@ int main(int argc, char *argv[]){
|
||||||
int filelimit = conf.getInteger("filelimit");
|
int filelimit = conf.getInteger("filelimit");
|
||||||
Util::sysSetNrOpenFiles(filelimit);
|
Util::sysSetNrOpenFiles(filelimit);
|
||||||
|
|
||||||
if (!mistOut::listenMode()){
|
std::string target = conf.getString("target");
|
||||||
|
if (!mistOut::listenMode() && (!target.size() || Socket::interpretSRTMode(HTTP::URL(target)) != "listener")){
|
||||||
Socket::Connection S(fileno(stdout), fileno(stdin));
|
Socket::Connection S(fileno(stdout), fileno(stdin));
|
||||||
Socket::SRTConnection tmpSock;
|
Socket::SRTConnection tmpSock;
|
||||||
mistOut tmp(S, tmpSock);
|
mistOut tmp(S, tmpSock);
|
||||||
|
@ -450,7 +455,17 @@ int main(int argc, char *argv[]){
|
||||||
new_action.sa_flags = 0;
|
new_action.sa_flags = 0;
|
||||||
sigaction(SIGUSR1, &new_action, NULL);
|
sigaction(SIGUSR1, &new_action, NULL);
|
||||||
}
|
}
|
||||||
if (conf.getInteger("port") && conf.getString("interface").size()){
|
if (target.size()){
|
||||||
|
//Force acceptable option to 1 (outgoing only), since this is a push output and we can't accept incoming connections
|
||||||
|
conf.getOption("acceptable", true).append((uint64_t)1);
|
||||||
|
//Disable overriding streamname with streamid parameter on other side
|
||||||
|
allowStreamNameOverride = false;
|
||||||
|
HTTP::URL tgt(target);
|
||||||
|
std::map<std::string, std::string> arguments;
|
||||||
|
HTTP::parseVars(tgt.args, arguments);
|
||||||
|
server_socket = Socket::SRTServer(tgt.getPort(), tgt.host, arguments, false, "output");
|
||||||
|
conf.getOption("target", true).append("");
|
||||||
|
}else{
|
||||||
std::map<std::string, std::string> arguments;
|
std::map<std::string, std::string> arguments;
|
||||||
server_socket = Socket::SRTServer(conf.getInteger("port"), conf.getString("interface"), arguments, false, "output");
|
server_socket = Socket::SRTServer(conf.getInteger("port"), conf.getString("interface"), arguments, false, "output");
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue