diff --git a/CMakeLists.txt b/CMakeLists.txt
index a2569466..4178b807 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -505,8 +505,8 @@ macro(makeOutput outputName format)
SET(tsBaseClass HTTPOutput)
endif()
endif()
- if (";${ARGN};" MATCHES ";with_srt;")
- SET(outBaseFile src/output/mist_out_srt.cpp)
+ if (";${ARGN};" MATCHES ";debased;")
+ SET(outBaseFile "")
endif()
if (";${ARGN};" MATCHES ";ts;")
SET(tsOutput src/output/output_ts_base.cpp)
@@ -555,7 +555,7 @@ if (WITH_JPG)
endif()
makeOutput(TS ts ts)
if(SRT_LIB)
- makeOutput(TSSRT tssrt ts with_srt)
+ makeOutput(TSSRT tssrt ts debased with_srt)
endif()
makeOutput(HTTPTS httpts http ts)
makeOutput(HLS hls http ts)
diff --git a/lib/util.cpp b/lib/util.cpp
index 852a6f4d..d522a151 100644
--- a/lib/util.cpp
+++ b/lib/util.cpp
@@ -16,6 +16,7 @@
#include <direct.h> // _mkdir
#endif
#include <stdlib.h>
+#include <sys/resource.h>
#define RAXHDR_FIELDOFFSET p[1]
#define RAX_REQDFIELDS_LEN 36
@@ -969,4 +970,23 @@ namespace Util{
if (!fields.count(fName)){return RelAccXFieldData();}
return fields.at(fName);
}
+
+ bool sysSetNrOpenFiles(int n){
+ struct rlimit limit;
+ if (getrlimit(RLIMIT_NOFILE, &limit) != 0) {
+ FAIL_MSG("Could not get open file limit: %s", strerror(errno));
+ return false;
+ }
+ int currLimit = limit.rlim_cur;
+ if(limit.rlim_cur < n){
+ limit.rlim_cur = n;
+ if (setrlimit(RLIMIT_NOFILE, &limit) != 0) {
+ FAIL_MSG("Could not set open file limit from %d to %d: %s", currLimit, n, strerror(errno));
+ return false;
+ }
+ HIGH_MSG("Open file limit increased from %d to %d", currLimit, n)
+ }
+ return true;
+ }
+
}// namespace Util
diff --git a/lib/util.h b/lib/util.h
index c19f32e1..84b0d56c 100644
--- a/lib/util.h
+++ b/lib/util.h
@@ -19,6 +19,8 @@ namespace Util{
uint64_t ftell(FILE *stream);
uint64_t fseek(FILE *stream, uint64_t offset, int whence);
+ bool sysSetNrOpenFiles(int n);
+
class DataCallback{
public:
virtual void dataCallback(const char *ptr, size_t size){
diff --git a/src/output/mist_out_srt.cpp b/src/output/mist_out_srt.cpp
deleted file mode 100644
index 120d2103..00000000
--- a/src/output/mist_out_srt.cpp
+++ /dev/null
@@ -1,153 +0,0 @@
-#include OUTPUTTYPE
-#include <mist/config.h>
-#include <mist/defines.h>
-#include <mist/socket.h>
-#include <mist/socket_srt.h>
-#include <mist/util.h>
-#include <sys/resource.h>
-
-Socket::SRTServer server_socket;
-static uint64_t sockCount = 0;
-
-void (*oldSignal)(int, siginfo_t *,void *) = 0;
-void signal_handler(int signum, siginfo_t *sigInfo, void *ignore){
- server_socket.close();
- if (oldSignal){
- oldSignal(signum, sigInfo, ignore);
- }
-}
-
-void handleUSR1(int signum, siginfo_t *sigInfo, void *ignore){
- if (!sockCount){
- INFO_MSG("USR1 received - triggering rolling restart (no connections active)");
- Util::Config::is_restarting = true;
- Util::logExitReason("signal USR1, no connections");
- server_socket.close();
- Util::Config::is_active = false;
- }else{
- INFO_MSG("USR1 received - triggering rolling restart when connection count reaches zero");
- Util::Config::is_restarting = true;
- Util::logExitReason("signal USR1, after disconnect wait");
- }
-}
-
-// Callback for SRT-serving threads
-static void callThreadCallbackSRT(void *srtPtr){
- sockCount++;
- Socket::SRTConnection & srtSock = *(Socket::SRTConnection*)srtPtr;
- int fds[2];
- pipe(fds);
- Socket::Connection Sconn(fds[0], fds[1]);
- HIGH_MSG("Started thread for socket %i", srtSock.getSocket());
- mistOut tmp(Sconn,srtSock);
- tmp.run();
- HIGH_MSG("Closing thread for socket %i", srtSock.getSocket());
- Sconn.close();
- srtSock.close();
- delete &srtSock;
- sockCount--;
- if (!sockCount && Util::Config::is_restarting){
- server_socket.close();
- Util::Config::is_active = false;
- INFO_MSG("Last active connection closed; triggering rolling restart now!");
- }
-}
-
-bool sysSetNrOpenFiles(int n){
- struct rlimit limit;
- if (getrlimit(RLIMIT_NOFILE, &limit) != 0) {
- FAIL_MSG("Could not get open file limit: %s", strerror(errno));
- return false;
- }
- int currLimit = limit.rlim_cur;
- if(limit.rlim_cur < n){
- limit.rlim_cur = n;
- if (setrlimit(RLIMIT_NOFILE, &limit) != 0) {
- FAIL_MSG("Could not set open file limit from %d to %d: %s", currLimit, n, strerror(errno));
- return false;
- }
- HIGH_MSG("Open file limit increased from %d to %d", currLimit, n)
- }
- return true;
- }
-
-int main(int argc, char *argv[]){
- DTSC::trackValidMask = TRACK_VALID_EXT_HUMAN;
- Util::redirectLogsIfNeeded();
- Util::Config conf(argv[0]);
- mistOut::init(&conf);
- if (conf.parseArgs(argc, argv)){
- if (conf.getBool("json")){
- mistOut::capa["version"] = PACKAGE_VERSION;
- std::cout << mistOut::capa.toString() << std::endl;
- return -1;
- }
- conf.activate();
-
- int filelimit = conf.getInteger("filelimit");
- sysSetNrOpenFiles(filelimit);
-
- if (mistOut::listenMode()){
- {
- struct sigaction new_action;
- new_action.sa_sigaction = handleUSR1;
- sigemptyset(&new_action.sa_mask);
- new_action.sa_flags = 0;
- sigaction(SIGUSR1, &new_action, NULL);
- }
- if (conf.getInteger("port") && conf.getString("interface").size()){
- server_socket = Socket::SRTServer(conf.getInteger("port"), conf.getString("interface"), false, "output");
- }
- if (!server_socket.connected()){
- DEVEL_MSG("Failure to open socket");
- return 1;
- }
- struct sigaction new_action;
- struct sigaction cur_action;
- new_action.sa_sigaction = signal_handler;
- sigemptyset(&new_action.sa_mask);
- new_action.sa_flags = SA_SIGINFO;
- sigaction(SIGINT, &new_action, &cur_action);
- if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){
- if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");}
- oldSignal = cur_action.sa_sigaction;
- }
- sigaction(SIGHUP, &new_action, &cur_action);
- if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){
- if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");}
- oldSignal = cur_action.sa_sigaction;
- }
- sigaction(SIGTERM, &new_action, &cur_action);
- if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){
- if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");}
- oldSignal = cur_action.sa_sigaction;
- }
- Util::Procs::socketList.insert(server_socket.getSocket());
- while (conf.is_active && server_socket.connected()){
- Socket::SRTConnection S = server_socket.accept(false, "output");
- if (S.connected()){// check if the new connection is valid
- // spawn a new thread for this connection
- tthread::thread T(callThreadCallbackSRT, (void *)new Socket::SRTConnection(S));
- // detach it, no need to keep track of it anymore
- T.detach();
- }else{
- Util::sleep(10); // sleep 10ms
- }
- }
- Util::Procs::socketList.erase(server_socket.getSocket());
- server_socket.close();
- if (conf.is_restarting){
- INFO_MSG("Reloading input...");
- execvp(argv[0], argv);
- FAIL_MSG("Error reloading: %s", strerror(errno));
- }
- }else{
- Socket::Connection S(fileno(stdout), fileno(stdin));
- Socket::SRTConnection tmpSock;
- mistOut tmp(S, tmpSock);
- return tmp.run();
- }
- }
- INFO_MSG("Exit reason: %s", Util::exitReason);
- return 0;
-}
diff --git a/src/output/output_tssrt.cpp b/src/output/output_tssrt.cpp
index ea46eccf..aa06189e 100644
--- a/src/output/output_tssrt.cpp
+++ b/src/output/output_tssrt.cpp
@@ -354,3 +354,132 @@ namespace Mist{
}
}// namespace Mist
+
+
+
+Socket::SRTServer server_socket;
+static uint64_t sockCount = 0;
+
+void (*oldSignal)(int, siginfo_t *,void *) = 0;
+void signal_handler(int signum, siginfo_t *sigInfo, void *ignore){
+ server_socket.close();
+ if (oldSignal){
+ oldSignal(signum, sigInfo, ignore);
+ }
+}
+
+void handleUSR1(int signum, siginfo_t *sigInfo, void *ignore){
+ if (!sockCount){
+ INFO_MSG("USR1 received - triggering rolling restart (no connections active)");
+ Util::Config::is_restarting = true;
+ Util::logExitReason("signal USR1, no connections");
+ server_socket.close();
+ Util::Config::is_active = false;
+ }else{
+ INFO_MSG("USR1 received - triggering rolling restart when connection count reaches zero");
+ Util::Config::is_restarting = true;
+ Util::logExitReason("signal USR1, after disconnect wait");
+ }
+}
+
+// Callback for SRT-serving threads
+static void callThreadCallbackSRT(void *srtPtr){
+ sockCount++;
+ Socket::SRTConnection & srtSock = *(Socket::SRTConnection*)srtPtr;
+ int fds[2];
+ pipe(fds);
+ Socket::Connection Sconn(fds[0], fds[1]);
+ HIGH_MSG("Started thread for socket %i", srtSock.getSocket());
+ mistOut tmp(Sconn,srtSock);
+ tmp.run();
+ HIGH_MSG("Closing thread for socket %i", srtSock.getSocket());
+ Sconn.close();
+ srtSock.close();
+ delete &srtSock;
+ sockCount--;
+ if (!sockCount && Util::Config::is_restarting){
+ server_socket.close();
+ Util::Config::is_active = false;
+ INFO_MSG("Last active connection closed; triggering rolling restart now!");
+ }
+}
+
+int main(int argc, char *argv[]){
+ DTSC::trackValidMask = TRACK_VALID_EXT_HUMAN;
+ Util::redirectLogsIfNeeded();
+ Util::Config conf(argv[0]);
+ mistOut::init(&conf);
+ if (conf.parseArgs(argc, argv)){
+ if (conf.getBool("json")){
+ mistOut::capa["version"] = PACKAGE_VERSION;
+ std::cout << mistOut::capa.toString() << std::endl;
+ return -1;
+ }
+ conf.activate();
+
+ int filelimit = conf.getInteger("filelimit");
+ Util::sysSetNrOpenFiles(filelimit);
+
+ if (!mistOut::listenMode()){
+ Socket::Connection S(fileno(stdout), fileno(stdin));
+ Socket::SRTConnection tmpSock;
+ mistOut tmp(S, tmpSock);
+ return tmp.run();
+ }
+ {
+ struct sigaction new_action;
+ new_action.sa_sigaction = handleUSR1;
+ sigemptyset(&new_action.sa_mask);
+ new_action.sa_flags = 0;
+ sigaction(SIGUSR1, &new_action, NULL);
+ }
+ if (conf.getInteger("port") && conf.getString("interface").size()){
+ server_socket = Socket::SRTServer(conf.getInteger("port"), conf.getString("interface"), false, "output");
+ }
+ if (!server_socket.connected()){
+ DEVEL_MSG("Failure to open socket");
+ return 1;
+ }
+ struct sigaction new_action;
+ struct sigaction cur_action;
+ new_action.sa_sigaction = signal_handler;
+ sigemptyset(&new_action.sa_mask);
+ new_action.sa_flags = SA_SIGINFO;
+ sigaction(SIGINT, &new_action, &cur_action);
+ if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){
+ if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");}
+ oldSignal = cur_action.sa_sigaction;
+ }
+ sigaction(SIGHUP, &new_action, &cur_action);
+ if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){
+ if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");}
+ oldSignal = cur_action.sa_sigaction;
+ }
+ sigaction(SIGTERM, &new_action, &cur_action);
+ if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){
+ if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");}
+ oldSignal = cur_action.sa_sigaction;
+ }
+ Util::Procs::socketList.insert(server_socket.getSocket());
+ while (conf.is_active && server_socket.connected()){
+ Socket::SRTConnection S = server_socket.accept(false, "output");
+ if (S.connected()){// check if the new connection is valid
+ // spawn a new thread for this connection
+ tthread::thread T(callThreadCallbackSRT, (void *)new Socket::SRTConnection(S));
+ // detach it, no need to keep track of it anymore
+ T.detach();
+ }else{
+ Util::sleep(10); // sleep 10ms
+ }
+ }
+ Util::Procs::socketList.erase(server_socket.getSocket());
+ server_socket.close();
+ if (conf.is_restarting){
+ INFO_MSG("Reloading input...");
+ execvp(argv[0], argv);
+ FAIL_MSG("Error reloading: %s", strerror(errno));
+ }
+ }
+ INFO_MSG("Exit reason: %s", Util::exitReason);
+ return 0;
+}