SRT improvements:

- Made SRT support optional
- Make build options visible in cmake-gui
- Improved generic connection stats for outputs
- Added streamid handling configuration for MistInTSSRT
- Push input support over SRT
- Fixed support for SRT settings in push outputs
- Fix parsing of SRT-passed stream names
- Fixed hostnames in MistOutTSSRT, fixed PUSH_REWRITE trigger payload
- Opus support in TS-SRT
- Fixed SRT socket stats, fixed SRT socket address logic, improved SRT socket rolling restart support
- Fixed SRT push deny
This commit is contained in:
Thulinma 2020-08-28 00:42:38 +02:00
parent 19199cbff8
commit 0bd5d742f6
19 changed files with 686 additions and 347 deletions

View file

@ -5,20 +5,51 @@
#include <mist/socket_srt.h>
#include <mist/util.h>
int spawnForked(Socket::SRTConnection &S){
int fds[2];
pipe(fds);
Socket::Connection Sconn(fds[0], fds[1]);
Socket::SRTServer server_socket;
static uint64_t sockCount = 0;
mistOut tmp(Sconn, S.getSocket());
return tmp.run();
void (*oldSignal)(int, siginfo_t *,void *) = 0;
void signal_handler(int signum, siginfo_t *sigInfo, void *ignore){
server_socket.close();
if (oldSignal){
oldSignal(signum, sigInfo, ignore);
}
}
void handleUSR1(int signum, siginfo_t *sigInfo, void *ignore){
HIGH_MSG("USR1 received - triggering rolling restart");
Util::Config::is_restarting = true;
Util::logExitReason("signal USR1");
Util::Config::is_active = false;
if (!sockCount){
INFO_MSG("USR1 received - triggering rolling restart (no connections active)");
Util::Config::is_restarting = true;
Util::logExitReason("signal USR1, no connections");
server_socket.close();
Util::Config::is_active = false;
}else{
INFO_MSG("USR1 received - triggering rolling restart when connection count reaches zero");
Util::Config::is_restarting = true;
Util::logExitReason("signal USR1, after disconnect wait");
}
}
// Callback for SRT-serving threads
static void callThreadCallbackSRT(void *srtPtr){
sockCount++;
Socket::SRTConnection & srtSock = *(Socket::SRTConnection*)srtPtr;
int fds[2];
pipe(fds);
Socket::Connection Sconn(fds[0], fds[1]);
HIGH_MSG("Started thread for socket %i", srtSock.getSocket());
mistOut tmp(Sconn,srtSock);
tmp.run();
HIGH_MSG("Closing thread for socket %i", srtSock.getSocket());
Sconn.close();
srtSock.close();
delete &srtSock;
sockCount--;
if (!sockCount && Util::Config::is_restarting){
server_socket.close();
Util::Config::is_active = false;
INFO_MSG("Last active connection closed; triggering rolling restart now!");
}
}
int main(int argc, char *argv[]){
@ -41,15 +72,56 @@ int main(int argc, char *argv[]){
new_action.sa_flags = 0;
sigaction(SIGUSR1, &new_action, NULL);
}
mistOut::listener(conf, spawnForked);
if (conf.is_restarting && Socket::checkTrueSocket(0)){
INFO_MSG("Reloading input while re-using server socket");
if (conf.getInteger("port") && conf.getString("interface").size()){
server_socket = Socket::SRTServer(conf.getInteger("port"), conf.getString("interface"), false, "output");
}
if (!server_socket.connected()){
DEVEL_MSG("Failure to open socket");
return 1;
}
struct sigaction new_action;
struct sigaction cur_action;
new_action.sa_sigaction = signal_handler;
sigemptyset(&new_action.sa_mask);
new_action.sa_flags = SA_SIGINFO;
sigaction(SIGINT, &new_action, &cur_action);
if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){
if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");}
oldSignal = cur_action.sa_sigaction;
}
sigaction(SIGHUP, &new_action, &cur_action);
if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){
if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");}
oldSignal = cur_action.sa_sigaction;
}
sigaction(SIGTERM, &new_action, &cur_action);
if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){
if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");}
oldSignal = cur_action.sa_sigaction;
}
Util::Procs::socketList.insert(server_socket.getSocket());
while (conf.is_active && server_socket.connected()){
Socket::SRTConnection S = server_socket.accept(false, "output");
if (S.connected()){// check if the new connection is valid
// spawn a new thread for this connection
tthread::thread T(callThreadCallbackSRT, (void *)new Socket::SRTConnection(S));
// detach it, no need to keep track of it anymore
T.detach();
}else{
Util::sleep(10); // sleep 10ms
}
}
Util::Procs::socketList.erase(server_socket.getSocket());
server_socket.close();
if (conf.is_restarting){
INFO_MSG("Reloading input...");
execvp(argv[0], argv);
FAIL_MSG("Error reloading: %s", strerror(errno));
}
}else{
Socket::Connection S(fileno(stdout), fileno(stdin));
mistOut tmp(S, -1);
Socket::SRTConnection tmpSock;
mistOut tmp(S, tmpSock);
return tmp.run();
}
}

View file

@ -1686,9 +1686,7 @@ namespace Mist{
statComm.setCRC(crc);
statComm.setStream(streamName);
statComm.setConnector(getStatsName());
statComm.setUp(myConn.dataUp());
statComm.setDown(myConn.dataDown());
statComm.setTime(now - myConn.connTime());
connStats(now, statComm);
statComm.setLastSecond(thisPacket ? thisPacket.getTime() : 0);
statComm.setPid(getpid());
@ -1722,6 +1720,12 @@ namespace Mist{
}
}
void Output::connStats(uint64_t now, Comms::Statistics &statComm){
statComm.setUp(myConn.dataUp());
statComm.setDown(myConn.dataDown());
statComm.setTime(now - myConn.connTime());
}
bool Output::dropPushTrack(uint32_t trackId, const std::string & dropReason){
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
if (it->second.getTrack() == trackId){

View file

@ -64,7 +64,7 @@ namespace Mist{
/// This function is called whenever a packet is ready for sending.
/// Inside it, thisPacket is guaranteed to contain a valid packet.
virtual void sendNext(){}// REQUIRED! Others are optional.
virtual bool dropPushTrack(uint32_t trackId, const std::string & dropReason);
virtual bool dropPushTrack(uint32_t trackId, const std::string &dropReason);
bool getKeyFrame();
bool prepareNext();
virtual void dropTrack(size_t trackId, const std::string &reason, bool probablyBad = true);
@ -103,10 +103,10 @@ namespace Mist{
uint64_t lastStats; ///< Time of last sending of stats.
std::set<sortedPageInfo> buffer; ///< A sorted list of next-to-be-loaded packets.
bool sought; ///< If a seek has been done, this is set to true. Used for seeking on
///< prepareNext().
bool sought; ///< If a seek has been done, this is set to true. Used for seeking on
///< prepareNext().
std::string prevHost; ///< Old value for getConnectedBinHost, for caching
protected: // these are to be messed with by child classes
protected: // these are to be messed with by child classes
virtual bool inlineRestartCapable() const{
return false;
}///< True if the output is capable of restarting mid-stream. This is used for swapping recording files
@ -122,6 +122,8 @@ namespace Mist{
virtual std::string getStatsName();
virtual bool hasSessionIDs(){return false;}
virtual void connStats(uint64_t now, Comms::Statistics &statComm);
std::set<size_t> getSupportedTracks(const std::string &type = "") const;
inline virtual bool keepGoing(){return config->is_active && myConn;}

View file

@ -77,6 +77,7 @@ namespace Mist{
capa["push_urls"].append("/*.ts");
capa["push_urls"].append("ts-exec:*");
#ifndef WITH_SRT
{
pid_t srt_tx = -1;
const char *args[] ={"srt-live-transmit", 0};
@ -84,13 +85,14 @@ namespace Mist{
if (srt_tx > 1){
capa["push_urls"].append("srt://*");
capa["desc"] = capa["desc"].asStringRef() +
". SRT push output support (srt://*) is installed and available.";
". Non-native SRT push output support (srt://*) is installed and available.";
}else{
capa["desc"] =
capa["desc"].asStringRef() +
". To enable SRT push output support, please install the srt-live-transmit binary.";
". To enable non-native SRT push output support, please install the srt-live-transmit binary.";
}
}
#endif
JSON::Value opt;
opt["arg"] = "string";

View file

@ -1,19 +1,22 @@
#include "mist/socket_srt.h"
#include <mist/socket_srt.h>
#include "output_tssrt.h"
#include <mist/defines.h>
#include <mist/http_parser.h>
#include <mist/url.h>
#include <mist/encode.h>
#include <mist/stream.h>
#include <mist/triggers.h>
namespace Mist{
OutTSSRT::OutTSSRT(Socket::Connection &conn, SRTSOCKET _srtSock) : TSOutput(conn){
OutTSSRT::OutTSSRT(Socket::Connection &conn, Socket::SRTConnection & _srtSock) : TSOutput(conn), srtConn(_srtSock){
// NOTE: conn is useless for SRT, as it uses a different socket type.
sendRepeatingHeaders = 500; // PAT/PMT every 500ms (DVB spec)
streamName = config->getString("streamname");
Util::setStreamName(streamName);
pushOut = false;
std::string tracks;
// Push output configuration
if (config->getString("target").size()){
HTTP::URL target(config->getString("target"));
target = HTTP::URL(config->getString("target"));
if (target.protocol != "srt"){
FAIL_MSG("Target %s must begin with srt://, aborting", target.getUrl().c_str());
onFail("Invalid srt target: doesn't start with srt://", true);
@ -25,29 +28,108 @@ namespace Mist{
return;
}
pushOut = true;
if (targetParams.count("tracks")){tracks = targetParams["tracks"];}
std::map<std::string, std::string> arguments;
HTTP::parseVars(target.args, arguments);
for (std::map<std::string, std::string>::iterator it = arguments.begin(); it != arguments.end(); ++it){
targetParams[it->first] = it->second;
}
size_t connectCnt = 0;
do{
srtConn.connect(target.host, target.getPort(), "output");
if (!srtConn){Util::sleep(1000);}
srtConn.connect(target.host, target.getPort(), "output", targetParams);
if (!srtConn){
Util::sleep(1000);
}else{
INFO_MSG("Connect success on attempt %zu", connectCnt+1);
break;
}
++connectCnt;
}while (!srtConn && connectCnt < 10);
}while (!srtConn && connectCnt < 5);
wantRequest = false;
parseData = true;
initialize();
}else{
// Pull output configuration, In this case we have an srt connection in the second constructor parameter.
srtConn = Socket::SRTConnection(_srtSock);
parseData = true;
wantRequest = false;
// Handle override / append of streamname options
std::string sName = srtConn.getStreamName();
if (sName != ""){
streamName = sName;
HIGH_MSG("Requesting stream %s", streamName.c_str());
Util::sanitizeName(streamName);
Util::setStreamName(streamName);
}
int64_t accTypes = config->getInteger("acceptable");
if (accTypes == 0){//Allow both directions
srtConn.setBlocking(false);
//Try to read the socket 10 times. If any reads succeed, assume they are pushing in
size_t retries = 60;
while (!accTypes && srtConn && retries){
size_t recvSize = srtConn.Recv();
if (recvSize){
accTypes = 2;
INFO_MSG("Connection put into ingest mode");
assembler.assemble(tsIn, srtConn.recvbuf, recvSize, true);
}else{
Util::sleep(50);
}
--retries;
}
//If not, assume they are receiving.
if (!accTypes){
accTypes = 1;
INFO_MSG("Connection put into egress mode");
}
}
if (accTypes == 1){// Only allow outgoing
srtConn.setBlocking(true);
srtConn.direction = "output";
parseData = true;
wantRequest = false;
initialize();
}else if (accTypes == 2){//Only allow incoming
srtConn.setBlocking(false);
srtConn.direction = "input";
if (Triggers::shouldTrigger("PUSH_REWRITE")){
HTTP::URL reqUrl;
reqUrl.protocol = "srt";
reqUrl.port = config->getString("port");
reqUrl.host = config->getString("interface");
reqUrl.args = "streamid="+Encodings::URL::encode(sName);
std::string payload = reqUrl.getUrl() + "\n" + getConnectedHost();
std::string newUrl = "";
Triggers::doTrigger("PUSH_REWRITE", payload, "", false, newUrl);
if (!newUrl.size()){
FAIL_MSG("Push from %s to URL %s rejected - PUSH_REWRITE trigger blanked the URL",
getConnectedHost().c_str(), reqUrl.getUrl().c_str());
Util::logExitReason(
"Push from %s to URL %s rejected - PUSH_REWRITE trigger blanked the URL",
getConnectedHost().c_str(), reqUrl.getUrl().c_str());
onFinish();
return;
}
reqUrl = HTTP::URL(newUrl);
if (reqUrl.args.size()){
std::map<std::string, std::string> args;
HTTP::parseVars(reqUrl.args, args);
if (args.count("streamid")){
streamName = args["streamid"];
Util::sanitizeName(streamName);
Util::setStreamName(streamName);
}
}
}
myConn.setHost(srtConn.remotehost);
if (!allowPush("")){
onFinish();
srtConn.close();
return;
}
parseData = false;
wantRequest = true;
}
}
initialize();
lastTimeStamp = 0;
timeStampOffset = 0;
}
OutTSSRT::~OutTSSRT(){}
@ -58,13 +140,29 @@ namespace Mist{
capa["friendly"] = "TS over SRT";
capa["desc"] = "Real time streaming of TS data over SRT";
capa["deps"] = "";
capa["required"]["streamname"]["name"] = "Stream";
capa["required"]["streamname"]["help"] = "What streamname to serve. For multiple streams, add "
"this protocol multiple times using different ports, "
"or use the streamid option on the srt connection";
capa["required"]["streamname"]["type"] = "str";
capa["required"]["streamname"]["option"] = "--stream";
capa["required"]["streamname"]["short"] = "s";
capa["optional"]["streamname"]["name"] = "Stream";
capa["optional"]["streamname"]["help"] = "What streamname to serve if no streamid is given by the other end of the connection";
capa["optional"]["streamname"]["type"] = "str";
capa["optional"]["streamname"]["option"] = "--stream";
capa["optional"]["streamname"]["short"] = "s";
capa["optional"]["streamname"]["default"] = "";
capa["optional"]["acceptable"]["name"] = "Acceptable connection types";
capa["optional"]["acceptable"]["help"] =
"Whether to allow only incoming pushes (2), only outgoing pulls (1), or both (0, default)";
capa["optional"]["acceptable"]["option"] = "--acceptable";
capa["optional"]["acceptable"]["short"] = "T";
capa["optional"]["acceptable"]["default"] = 0;
capa["optional"]["acceptable"]["type"] = "select";
capa["optional"]["acceptable"]["select"][0u][0u] = 0;
capa["optional"]["acceptable"]["select"][0u][1u] =
"Allow both incoming and outgoing connections";
capa["optional"]["acceptable"]["select"][1u][0u] = 1;
capa["optional"]["acceptable"]["select"][1u][1u] = "Allow only outgoing connections";
capa["optional"]["acceptable"]["select"][2u][0u] = 2;
capa["optional"]["acceptable"]["select"][2u][1u] = "Allow only incoming connections";
capa["codecs"][0u][0u].append("HEVC");
capa["codecs"][0u][0u].append("H264");
capa["codecs"][0u][0u].append("MPEG2");
@ -72,6 +170,7 @@ namespace Mist{
capa["codecs"][0u][1u].append("MP3");
capa["codecs"][0u][1u].append("AC3");
capa["codecs"][0u][1u].append("MP2");
capa["codecs"][0u][1u].append("opus");
cfg->addConnectorOptions(8889, capa);
config = cfg;
capa["push_urls"].append("srt://*");
@ -84,29 +183,88 @@ namespace Mist{
cfg->addOption("target", opt);
}
// Buffer internally in the class, and send once we have over 1000 bytes of data.
// Buffers TS packets and sends after 7 are buffered.
void OutTSSRT::sendTS(const char *tsData, size_t len){
if (packetBuffer.size() >= 1000){
srtConn.SendNow(packetBuffer);
if (!srtConn){
// Allow for proper disconnect
parseData = false;
}
packetBuffer.clear();
}
packetBuffer.append(tsData, len);
if (packetBuffer.size() >= 1316){//7 whole TS packets
if (!srtConn){
if (config->getString("target").size()){
INFO_MSG("Reconnecting...");
srtConn.connect(target.host, target.getPort(), "output", targetParams);
if (!srtConn){Util::sleep(500);}
}else{
Util::logExitReason("SRT connection closed");
myConn.close();
parseData = false;
return;
}
}
if (srtConn){
srtConn.SendNow(packetBuffer, packetBuffer.size());
if (!srtConn){
if (!config->getString("target").size()){
Util::logExitReason("SRT connection closed");
myConn.close();
parseData = false;
}
}
}
packetBuffer.assign(0,0);
}
}
bool OutTSSRT::setAlternateConnectionStats(Comms::Statistics &statComm){
void OutTSSRT::requestHandler(){
size_t recvSize = srtConn.Recv();
if (!recvSize){
if (!srtConn){
myConn.close();
wantRequest = false;
}else{
Util::sleep(50);
}
return;
}
lastRecv = Util::bootSecs();
if (!assembler.assemble(tsIn, srtConn.recvbuf, recvSize, true)){return;}
while (tsIn.hasPacket()){
tsIn.getEarliestPacket(thisPacket);
if (!thisPacket){
INFO_MSG("Could not get TS packet");
myConn.close();
wantRequest = false;
return;
}
tsIn.initializeMetadata(meta);
size_t thisIdx = M.trackIDToIndex(thisPacket.getTrackId(), getpid());
if (thisIdx == INVALID_TRACK_ID){return;}
if (!userSelect.count(thisIdx)){
userSelect[thisIdx].reload(streamName, thisIdx, COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK);
}
uint64_t adjustTime = thisPacket.getTime() + timeStampOffset;
if (lastTimeStamp || timeStampOffset){
if (lastTimeStamp + 5000 < adjustTime || lastTimeStamp > adjustTime + 5000){
INFO_MSG("Timestamp jump " PRETTY_PRINT_MSTIME " -> " PRETTY_PRINT_MSTIME ", compensating.",
PRETTY_ARG_MSTIME(lastTimeStamp), PRETTY_ARG_MSTIME(adjustTime));
timeStampOffset += (lastTimeStamp - adjustTime);
adjustTime = thisPacket.getTime() + timeStampOffset;
}
}
lastTimeStamp = adjustTime;
thisPacket.setTime(adjustTime);
bufferLivePacket(thisPacket);
}
}
void OutTSSRT::connStats(uint64_t now, Comms::Statistics &statComm){
if (!srtConn){return;}
statComm.setUp(srtConn.dataUp());
statComm.setDown(srtConn.dataDown());
statComm.setTime(Util::bootSecs() - srtConn.connTime());
return true;
}
void OutTSSRT::handleLossyStats(Comms::Statistics &statComm){
statComm.setTime(now - srtConn.connTime());
statComm.setPacketCount(srtConn.packetCount());
statComm.setPacketLostCount(srtConn.packetLostCount());
statComm.setPacketRetransmitCount(srtConn.packetRetransmitCount());
}
}// namespace Mist

View file

@ -1,12 +1,11 @@
#include "output_ts_base.h"
#include <mist/ts_stream.h>
#include <mist/socket_srt.h>
namespace Mist{
class OutTSSRT : public TSOutput{
public:
OutTSSRT(Socket::Connection &conn, SRTSOCKET _srtSock);
OutTSSRT(Socket::Connection &conn, Socket::SRTConnection & _srtSock);
~OutTSSRT();
static bool listenMode(){return !(config->getString("target").size());}
@ -14,19 +13,23 @@ namespace Mist{
static void init(Util::Config *cfg);
void sendTS(const char *tsData, size_t len = 188);
bool isReadyForPlay(){return true;}
virtual void requestHandler();
protected:
// Stats handling
virtual bool setAlternateConnectionStats(Comms::Statistics &statComm);
virtual void handleLossyStats(Comms::Statistics &statComm);
virtual void connStats(uint64_t now, Comms::Statistics &statComm);
virtual std::string getConnectedHost(){return srtConn.remotehost;}
virtual std::string getConnectedBinHost(){return srtConn.getBinHost();}
private:
HTTP::URL target;
int64_t timeStampOffset;
uint64_t lastTimeStamp;
bool pushOut;
std::string packetBuffer;
Util::ResizeablePointer packetBuffer;
Socket::UDPConnection pushSock;
TS::Stream tsIn;
TS::Assembler assembler;
Socket::SRTConnection srtConn;
Socket::SRTConnection & srtConn;
};
}// namespace Mist