SRT edits:

- Increased SRT socket queue from 1 to 100
- Fixed SRT initialization (now clean)
- Made output_ts_base.cpp thread-safe
- Made Output class thread-safe
- SRT TS output can now optionally set open file limit
This commit is contained in:
Ramkoemar 2020-11-19 12:40:21 +01:00 committed by Thulinma
parent 0bd5d742f6
commit 77aa90d48c
8 changed files with 123 additions and 15 deletions

View file

@ -4,6 +4,7 @@
#include <mist/socket.h>
#include <mist/socket_srt.h>
#include <mist/util.h>
#include <sys/resource.h>
Socket::SRTServer server_socket;
static uint64_t sockCount = 0;
@ -52,6 +53,24 @@ static void callThreadCallbackSRT(void *srtPtr){
}
}
bool sysSetNrOpenFiles(int n){
struct rlimit limit;
if (getrlimit(RLIMIT_NOFILE, &limit) != 0) {
FAIL_MSG("Could not get open file limit: %s", strerror(errno));
return false;
}
int currLimit = limit.rlim_cur;
if(limit.rlim_cur < n){
limit.rlim_cur = n;
if (setrlimit(RLIMIT_NOFILE, &limit) != 0) {
FAIL_MSG("Could not set open file limit from %d to %d: %s", currLimit, n, strerror(errno));
return false;
}
HIGH_MSG("Open file limit increased from %d to %d", currLimit, n)
}
return true;
}
int main(int argc, char *argv[]){
DTSC::trackValidMask = TRACK_VALID_EXT_HUMAN;
Util::redirectLogsIfNeeded();
@ -64,6 +83,10 @@ int main(int argc, char *argv[]){
return -1;
}
conf.activate();
int filelimit = conf.getInteger("filelimit");
sysSetNrOpenFiles(filelimit);
if (mistOut::listenMode()){
{
struct sigaction new_action;

View file

@ -65,6 +65,12 @@ namespace Mist{
maxSkipAhead = 7500;
uaDelay = 10;
realTime = 1000;
emptyCount = 0;
seekCount = 2;
firstData = true;
newUA = true;
lastPushUpdate = 0;
lastRecv = Util::bootSecs();
if (myConn){
setBlocking(true);
@ -174,10 +180,9 @@ namespace Mist{
/// May be called recursively because it calls stats() which calls this function.
/// If this happens, the extra calls to the function return instantly.
void Output::doSync(bool force){
static bool recursing = false;
if (!statComm){return;}
if (recursing){return;}
recursing = true;
if (recursingSync){return;}
recursingSync = true;
if (statComm.getSync() == 2 || force){
if (getStatsName() == capa["name"].asStringRef() && Triggers::shouldTrigger("USER_NEW", streamName)){
// sync byte 0 = no sync yet, wait for sync from controller...
@ -252,7 +257,7 @@ namespace Mist{
statComm.setSync(10); // auto-accept if no trigger
}
}
recursing = false;
recursingSync = false;
}
std::string Output::getConnectedHost(){return myConn.getHost();}
@ -1019,7 +1024,6 @@ namespace Mist{
/// Aborts if not live, there is no main track or it has no keyframes.
bool Output::liveSeek(){
if (!realTime){return false;}//Makes no sense when playing in turbo mode
static uint32_t seekCount = 2;
uint64_t seekPos = 0;
if (!meta.getLive()){return false;}
size_t mainTrack = getMainSelectedTrack();
@ -1093,7 +1097,6 @@ namespace Mist{
}
void Output::requestHandler(){
static bool firstData = true; // only the first time, we call onRequest if there's data buffered already.
if ((firstData && myConn.Received().size()) || myConn.spool()){
firstData = false;
DONTEVEN_MSG("onRequest");
@ -1442,7 +1445,6 @@ namespace Mist{
/// \returns true if thisPacket was filled with the next packet.
/// \returns false if we could not reliably determine the next packet yet.
bool Output::prepareNext(){
static size_t emptyCount = 0;
if (!buffer.size()){
thisPacket.null();
INFO_MSG("Buffer completely played out");
@ -1650,7 +1652,10 @@ namespace Mist{
if (now == lastStats && !force){return;}
if (isRecording()){
static uint64_t lastPushUpdate = now;
if(lastPushUpdate == 0){
lastPushUpdate = now;
}
if (lastPushUpdate + 5 <= now){
JSON::Value pStat;
pStat["push_status_update"]["id"] = getpid();
@ -1692,7 +1697,6 @@ namespace Mist{
/*LTS-START*/
// Tag the session with the user agent
static bool newUA = true; // we only do this once per connection
if (newUA && ((now - myConn.connTime()) >= uaDelay || !myConn) && UA.size()){
std::string APIcall =
"{\"tag_sessid\":{\"" + statComm.getSessId() + "\":" + JSON::string_escape("UA:" + UA) + "}}";

View file

@ -106,6 +106,12 @@ namespace Mist{
bool sought; ///< If a seek has been done, this is set to true. Used for seeking on
///< prepareNext().
std::string prevHost; ///< Old value for getConnectedBinHost, for caching
size_t emptyCount;
bool recursingSync;
uint32_t seekCount;
bool firstData;
uint64_t lastPushUpdate;
bool newUA;
protected: // these are to be messed with by child classes
virtual bool inlineRestartCapable() const{
return false;

View file

@ -106,7 +106,8 @@ namespace Mist{
uint32_t i = 0;
uint64_t offset = thisPacket.getInt("offset") * 90;
bs = TS::Packet::getPESVideoLeadIn(
bs.clear();
TS::Packet::getPESVideoLeadIn(bs,
(((dataLen + extraSize) > MAX_PES_SIZE) ? 0 : dataLen + extraSize),
packTime, offset, true, M.getBps(thisIdx));
fillPacket(bs.data(), bs.size(), firstPack, video, keyframe, pkgPid, contPkg);
@ -143,7 +144,8 @@ namespace Mist{
}
}else{
uint64_t offset = thisPacket.getInt("offset") * 90;
bs = TS::Packet::getPESVideoLeadIn(0, packTime, offset, true, M.getBps(thisIdx));
bs.clear();
TS::Packet::getPESVideoLeadIn(bs, 0, packTime, offset, true, M.getBps(thisIdx));
fillPacket(bs.data(), bs.size(), firstPack, video, keyframe, pkgPid, contPkg);
fillPacket(dataPointer, dataLen, firstPack, video, keyframe, pkgPid, contPkg);
@ -171,7 +173,8 @@ namespace Mist{
bs.append(1, (char)(dataLen-255*(dataLen/255)));
fillPacket(bs.data(), bs.size(), firstPack, video, keyframe, pkgPid, contPkg);
}else{
bs = TS::Packet::getPESAudioLeadIn(tempLen, packTime, M.getBps(thisIdx));
bs.clear();
TS::Packet::getPESAudioLeadIn(bs, tempLen, packTime, M.getBps(thisIdx));
fillPacket(bs.data(), bs.size(), firstPack, video, keyframe, pkgPid, contPkg);
if (codec == "AAC"){
bs = TS::getAudioHeader(dataLen, M.getInit(thisIdx));

View file

@ -148,6 +148,14 @@ namespace Mist{
capa["optional"]["streamname"]["short"] = "s";
capa["optional"]["streamname"]["default"] = "";
capa["optional"]["filelimit"]["name"] = "Open file descriptor limit";
capa["optional"]["filelimit"]["help"] = "Increase open file descriptor to this value if current system value is lower. A higher value may be needed for handling many concurrent SRT connections.";
capa["optional"]["filelimit"]["type"] = "int";
capa["optional"]["filelimit"]["option"] = "--filelimit";
capa["optional"]["filelimit"]["short"] = "l";
capa["optional"]["filelimit"]["default"] = "1024";
capa["optional"]["acceptable"]["name"] = "Acceptable connection types";
capa["optional"]["acceptable"]["help"] =
"Whether to allow only incoming pushes (2), only outgoing pulls (1), or both (0, default)";