From 77aa90d48c3396b12d5de37193cc21e39f4864d6 Mon Sep 17 00:00:00 2001 From: Ramkoemar Date: Thu, 19 Nov 2020 12:40:21 +0100 Subject: [PATCH] SRT edits: - Increased SRT socket queue from 1 to 100 - Fixed SRT initialization (now clean) - Made output_ts_base.cpp thread-safe - Made Output class thread-safe - SRT TS output can now optionally set open file limit --- lib/socket_srt.cpp | 6 ++-- lib/ts_packet.cpp | 61 ++++++++++++++++++++++++++++++++++- lib/ts_packet.h | 3 ++ src/output/mist_out_srt.cpp | 23 +++++++++++++ src/output/output.cpp | 22 +++++++------ src/output/output.h | 6 ++++ src/output/output_ts_base.cpp | 9 ++++-- src/output/output_tssrt.cpp | 8 +++++ 8 files changed, 123 insertions(+), 15 deletions(-) diff --git a/lib/socket_srt.cpp b/lib/socket_srt.cpp index 8562089f..a01117db 100644 --- a/lib/socket_srt.cpp +++ b/lib/socket_srt.cpp @@ -82,6 +82,7 @@ namespace Socket{ } SRTConnection::SRTConnection(SRTSOCKET alreadyConnected){ + initializeEmpty(); sock = alreadyConnected; } @@ -238,7 +239,7 @@ namespace Socket{ ERROR_MSG("Can't connect SRT Socket: %s", srt_getlasterror_str()); return; } - if (srt_listen(sock, 1) == SRT_ERROR){ + if (srt_listen(sock, 100) == SRT_ERROR){ srt_close(sock); sock = -1; ERROR_MSG("Can not listen on Socket"); @@ -310,7 +311,7 @@ namespace Socket{ } return; } - ERROR_MSG("Unable to send data over socket %" PRId32 ": %s", sock, srt_getlasterror_str()); +// ERROR_MSG("Unable to send data over socket %" PRId32 ": %s", sock, srt_getlasterror_str()); if (srt_getsockstate(sock) != SRTS_CONNECTED){close();} }else{ lastGood = Util::bootMS(); @@ -346,6 +347,7 @@ namespace Socket{ outgoing_port = 0; chunkTransmitSize = 1316; blocking = false; + timeout = 0; } void SRTConnection::setBlocking(bool _blocking){ diff --git a/lib/ts_packet.cpp b/lib/ts_packet.cpp index 863c4140..ebb3a0b8 100644 --- a/lib/ts_packet.cpp +++ b/lib/ts_packet.cpp @@ -129,7 +129,7 @@ namespace TS{ bool Packet::FromPointer(const char *data){ memcpy((void *)strBuf, (void *)data, 188); pos = 188; - return true; + return strBuf[0] == 0x47; } /// The deconstructor deletes all space that may be occupied by a Packet. @@ -492,6 +492,39 @@ namespace TS{ tmpBuf += (char)(((time & 0x00000007FLL) << 1) | 0x01); } + /// Generates a PES Lead-in for a video frame. + /// Prepends the lead-in to variable toSend, assumes toSend's length is all other data. + /// \param len The length of this frame. + /// \param PTS The timestamp of the frame. + void Packet::getPESVideoLeadIn(std::string &outData, unsigned int len, unsigned long long PTS, + unsigned long long offset, bool isAligned, uint64_t bps){ + if (len){len += (offset ? 13 : 8);} + if (bps >= 50){ + if (len){len += 3;} + }else{ + bps = 0; + } + + outData.append("\000\000\001\340", 4); + outData += (char)((len >> 8) & 0xFF); + outData += (char)(len & 0xFF); + if (isAligned){ + outData.append("\204", 1); + }else{ + outData.append("\200", 1); + } + outData += (char)((offset ? 0xC0 : 0x80) | (bps ? 0x10 : 0)); // PTS/DTS + Flags + outData += (char)((offset ? 10 : 5) + (bps ? 3 : 0)); // PESHeaderDataLength + encodePESTimestamp(outData, (offset ? 0x30 : 0x20), PTS + offset); + if (offset){encodePESTimestamp(outData, 0x10, PTS);} + if (bps){ + char rate_buf[3]; + Bit::htob24(rate_buf, (bps / 50) | 0x800001); + outData.append(rate_buf, 3); + } + } + + /// Generates a PES Lead-in for a video frame. /// Prepends the lead-in to variable toSend, assumes toSend's length is all other data. /// \param len The length of this frame. @@ -527,6 +560,32 @@ namespace TS{ return tmpStr; } + /// Generates a PES Lead-in for an audio frame. + /// Prepends the lead-in to variable toSend, assumes toSend's length is all other data. + /// \param len The length of this frame. + /// \param PTS The timestamp of the frame. + void Packet::getPESAudioLeadIn(std::string & outData, unsigned int len, unsigned long long PTS, uint64_t bps){ + if (bps >= 50){ + len += 3; + }else{ + bps = 0; + } + + len += 8; + outData.append("\000\000\001\300", 4); + outData += (char)((len & 0xFF00) >> 8); // PES PacketLength + outData += (char)(len & 0x00FF); // PES PacketLength (Cont) + outData += (char)0x84; // isAligned + outData += (char)(0x80 | (bps ? 0x10 : 0)); // PTS/DTS + Flags + outData += (char)(5 + (bps ? 3 : 0)); // PESHeaderDataLength + encodePESTimestamp(outData, 0x20, PTS); + if (bps){ + char rate_buf[3]; + Bit::htob24(rate_buf, (bps / 50) | 0x800001); + outData.append(rate_buf, 3); + } + } + /// Generates a PES Lead-in for an audio frame. /// Prepends the lead-in to variable toSend, assumes toSend's length is all other data. /// \param len The length of this frame. diff --git a/lib/ts_packet.h b/lib/ts_packet.h index eeb09309..3cc3a771 100644 --- a/lib/ts_packet.h +++ b/lib/ts_packet.h @@ -72,8 +72,11 @@ namespace TS{ void updPos(unsigned int newPos); // PES helpers + static void getPESVideoLeadIn(std::string & outData, unsigned int len, unsigned long long PTS, + unsigned long long offset, bool isAligned, uint64_t bps = 0); static std::string &getPESVideoLeadIn(unsigned int len, unsigned long long PTS, unsigned long long offset, bool isAligned, uint64_t bps = 0); + static void getPESAudioLeadIn(std::string & outData, unsigned int len, unsigned long long PTS, uint64_t bps); static std::string &getPESAudioLeadIn(unsigned int len, unsigned long long PTS, uint64_t bps = 0); static std::string &getPESMetaLeadIn(unsigned int len, unsigned long long PTS, uint64_t bps = 0); static std::string &getPESPS1LeadIn(unsigned int len, unsigned long long PTS, uint64_t bps = 0); diff --git a/src/output/mist_out_srt.cpp b/src/output/mist_out_srt.cpp index 7fed8eec..120d2103 100644 --- a/src/output/mist_out_srt.cpp +++ b/src/output/mist_out_srt.cpp @@ -4,6 +4,7 @@ #include #include #include +#include Socket::SRTServer server_socket; static uint64_t sockCount = 0; @@ -52,6 +53,24 @@ static void callThreadCallbackSRT(void *srtPtr){ } } +bool sysSetNrOpenFiles(int n){ + struct rlimit limit; + if (getrlimit(RLIMIT_NOFILE, &limit) != 0) { + FAIL_MSG("Could not get open file limit: %s", strerror(errno)); + return false; + } + int currLimit = limit.rlim_cur; + if(limit.rlim_cur < n){ + limit.rlim_cur = n; + if (setrlimit(RLIMIT_NOFILE, &limit) != 0) { + FAIL_MSG("Could not set open file limit from %d to %d: %s", currLimit, n, strerror(errno)); + return false; + } + HIGH_MSG("Open file limit increased from %d to %d", currLimit, n) + } + return true; + } + int main(int argc, char *argv[]){ DTSC::trackValidMask = TRACK_VALID_EXT_HUMAN; Util::redirectLogsIfNeeded(); @@ -64,6 +83,10 @@ int main(int argc, char *argv[]){ return -1; } conf.activate(); + + int filelimit = conf.getInteger("filelimit"); + sysSetNrOpenFiles(filelimit); + if (mistOut::listenMode()){ { struct sigaction new_action; diff --git a/src/output/output.cpp b/src/output/output.cpp index bcdd97c0..4c7b2f57 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -65,6 +65,12 @@ namespace Mist{ maxSkipAhead = 7500; uaDelay = 10; realTime = 1000; + emptyCount = 0; + seekCount = 2; + firstData = true; + newUA = true; + lastPushUpdate = 0; + lastRecv = Util::bootSecs(); if (myConn){ setBlocking(true); @@ -174,10 +180,9 @@ namespace Mist{ /// May be called recursively because it calls stats() which calls this function. /// If this happens, the extra calls to the function return instantly. void Output::doSync(bool force){ - static bool recursing = false; if (!statComm){return;} - if (recursing){return;} - recursing = true; + if (recursingSync){return;} + recursingSync = true; if (statComm.getSync() == 2 || force){ if (getStatsName() == capa["name"].asStringRef() && Triggers::shouldTrigger("USER_NEW", streamName)){ // sync byte 0 = no sync yet, wait for sync from controller... @@ -252,7 +257,7 @@ namespace Mist{ statComm.setSync(10); // auto-accept if no trigger } } - recursing = false; + recursingSync = false; } std::string Output::getConnectedHost(){return myConn.getHost();} @@ -1019,7 +1024,6 @@ namespace Mist{ /// Aborts if not live, there is no main track or it has no keyframes. bool Output::liveSeek(){ if (!realTime){return false;}//Makes no sense when playing in turbo mode - static uint32_t seekCount = 2; uint64_t seekPos = 0; if (!meta.getLive()){return false;} size_t mainTrack = getMainSelectedTrack(); @@ -1093,7 +1097,6 @@ namespace Mist{ } void Output::requestHandler(){ - static bool firstData = true; // only the first time, we call onRequest if there's data buffered already. if ((firstData && myConn.Received().size()) || myConn.spool()){ firstData = false; DONTEVEN_MSG("onRequest"); @@ -1442,7 +1445,6 @@ namespace Mist{ /// \returns true if thisPacket was filled with the next packet. /// \returns false if we could not reliably determine the next packet yet. bool Output::prepareNext(){ - static size_t emptyCount = 0; if (!buffer.size()){ thisPacket.null(); INFO_MSG("Buffer completely played out"); @@ -1650,7 +1652,10 @@ namespace Mist{ if (now == lastStats && !force){return;} if (isRecording()){ - static uint64_t lastPushUpdate = now; + if(lastPushUpdate == 0){ + lastPushUpdate = now; + } + if (lastPushUpdate + 5 <= now){ JSON::Value pStat; pStat["push_status_update"]["id"] = getpid(); @@ -1692,7 +1697,6 @@ namespace Mist{ /*LTS-START*/ // Tag the session with the user agent - static bool newUA = true; // we only do this once per connection if (newUA && ((now - myConn.connTime()) >= uaDelay || !myConn) && UA.size()){ std::string APIcall = "{\"tag_sessid\":{\"" + statComm.getSessId() + "\":" + JSON::string_escape("UA:" + UA) + "}}"; diff --git a/src/output/output.h b/src/output/output.h index 0873278a..7209766d 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -106,6 +106,12 @@ namespace Mist{ bool sought; ///< If a seek has been done, this is set to true. Used for seeking on ///< prepareNext(). std::string prevHost; ///< Old value for getConnectedBinHost, for caching + size_t emptyCount; + bool recursingSync; + uint32_t seekCount; + bool firstData; + uint64_t lastPushUpdate; + bool newUA; protected: // these are to be messed with by child classes virtual bool inlineRestartCapable() const{ return false; diff --git a/src/output/output_ts_base.cpp b/src/output/output_ts_base.cpp index 410b4699..90a5be58 100644 --- a/src/output/output_ts_base.cpp +++ b/src/output/output_ts_base.cpp @@ -106,7 +106,8 @@ namespace Mist{ uint32_t i = 0; uint64_t offset = thisPacket.getInt("offset") * 90; - bs = TS::Packet::getPESVideoLeadIn( + bs.clear(); + TS::Packet::getPESVideoLeadIn(bs, (((dataLen + extraSize) > MAX_PES_SIZE) ? 0 : dataLen + extraSize), packTime, offset, true, M.getBps(thisIdx)); fillPacket(bs.data(), bs.size(), firstPack, video, keyframe, pkgPid, contPkg); @@ -143,7 +144,8 @@ namespace Mist{ } }else{ uint64_t offset = thisPacket.getInt("offset") * 90; - bs = TS::Packet::getPESVideoLeadIn(0, packTime, offset, true, M.getBps(thisIdx)); + bs.clear(); + TS::Packet::getPESVideoLeadIn(bs, 0, packTime, offset, true, M.getBps(thisIdx)); fillPacket(bs.data(), bs.size(), firstPack, video, keyframe, pkgPid, contPkg); fillPacket(dataPointer, dataLen, firstPack, video, keyframe, pkgPid, contPkg); @@ -171,7 +173,8 @@ namespace Mist{ bs.append(1, (char)(dataLen-255*(dataLen/255))); fillPacket(bs.data(), bs.size(), firstPack, video, keyframe, pkgPid, contPkg); }else{ - bs = TS::Packet::getPESAudioLeadIn(tempLen, packTime, M.getBps(thisIdx)); + bs.clear(); + TS::Packet::getPESAudioLeadIn(bs, tempLen, packTime, M.getBps(thisIdx)); fillPacket(bs.data(), bs.size(), firstPack, video, keyframe, pkgPid, contPkg); if (codec == "AAC"){ bs = TS::getAudioHeader(dataLen, M.getInit(thisIdx)); diff --git a/src/output/output_tssrt.cpp b/src/output/output_tssrt.cpp index 18a122b8..0abdc09d 100644 --- a/src/output/output_tssrt.cpp +++ b/src/output/output_tssrt.cpp @@ -148,6 +148,14 @@ namespace Mist{ capa["optional"]["streamname"]["short"] = "s"; capa["optional"]["streamname"]["default"] = ""; + capa["optional"]["filelimit"]["name"] = "Open file descriptor limit"; + capa["optional"]["filelimit"]["help"] = "Increase open file descriptor to this value if current system value is lower. A higher value may be needed for handling many concurrent SRT connections."; + + capa["optional"]["filelimit"]["type"] = "int"; + capa["optional"]["filelimit"]["option"] = "--filelimit"; + capa["optional"]["filelimit"]["short"] = "l"; + capa["optional"]["filelimit"]["default"] = "1024"; + capa["optional"]["acceptable"]["name"] = "Acceptable connection types"; capa["optional"]["acceptable"]["help"] = "Whether to allow only incoming pushes (2), only outgoing pulls (1), or both (0, default)";