From 840a1f5f4edfb922323d894971065777807ba7e5 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Sat, 19 Sep 2020 21:34:15 +0200 Subject: [PATCH] Cleanup TS SRT output code --- CMakeLists.txt | 6 +- lib/util.cpp | 20 +++++ lib/util.h | 2 + src/output/mist_out_srt.cpp | 153 ------------------------------------ src/output/output_tssrt.cpp | 129 ++++++++++++++++++++++++++++++ 5 files changed, 154 insertions(+), 156 deletions(-) delete mode 100644 src/output/mist_out_srt.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index a2569466..4178b807 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -505,8 +505,8 @@ macro(makeOutput outputName format) SET(tsBaseClass HTTPOutput) endif() endif() - if (";${ARGN};" MATCHES ";with_srt;") - SET(outBaseFile src/output/mist_out_srt.cpp) + if (";${ARGN};" MATCHES ";debased;") + SET(outBaseFile "") endif() if (";${ARGN};" MATCHES ";ts;") SET(tsOutput src/output/output_ts_base.cpp) @@ -555,7 +555,7 @@ if (WITH_JPG) endif() makeOutput(TS ts ts) if(SRT_LIB) - makeOutput(TSSRT tssrt ts with_srt) + makeOutput(TSSRT tssrt ts debased with_srt) endif() makeOutput(HTTPTS httpts http ts) makeOutput(HLS hls http ts) diff --git a/lib/util.cpp b/lib/util.cpp index 852a6f4d..d522a151 100644 --- a/lib/util.cpp +++ b/lib/util.cpp @@ -16,6 +16,7 @@ #include // _mkdir #endif #include +#include #define RAXHDR_FIELDOFFSET p[1] #define RAX_REQDFIELDS_LEN 36 @@ -969,4 +970,23 @@ namespace Util{ if (!fields.count(fName)){return RelAccXFieldData();} return fields.at(fName); } + + bool sysSetNrOpenFiles(int n){ + struct rlimit limit; + if (getrlimit(RLIMIT_NOFILE, &limit) != 0) { + FAIL_MSG("Could not get open file limit: %s", strerror(errno)); + return false; + } + int currLimit = limit.rlim_cur; + if(limit.rlim_cur < n){ + limit.rlim_cur = n; + if (setrlimit(RLIMIT_NOFILE, &limit) != 0) { + FAIL_MSG("Could not set open file limit from %d to %d: %s", currLimit, n, strerror(errno)); + return false; + } + HIGH_MSG("Open file limit increased from %d to %d", currLimit, n) + } + return true; + } + }// namespace Util diff --git a/lib/util.h b/lib/util.h index c19f32e1..84b0d56c 100644 --- a/lib/util.h +++ b/lib/util.h @@ -19,6 +19,8 @@ namespace Util{ uint64_t ftell(FILE *stream); uint64_t fseek(FILE *stream, uint64_t offset, int whence); + bool sysSetNrOpenFiles(int n); + class DataCallback{ public: virtual void dataCallback(const char *ptr, size_t size){ diff --git a/src/output/mist_out_srt.cpp b/src/output/mist_out_srt.cpp deleted file mode 100644 index 120d2103..00000000 --- a/src/output/mist_out_srt.cpp +++ /dev/null @@ -1,153 +0,0 @@ -#include OUTPUTTYPE -#include -#include -#include -#include -#include -#include - -Socket::SRTServer server_socket; -static uint64_t sockCount = 0; - -void (*oldSignal)(int, siginfo_t *,void *) = 0; -void signal_handler(int signum, siginfo_t *sigInfo, void *ignore){ - server_socket.close(); - if (oldSignal){ - oldSignal(signum, sigInfo, ignore); - } -} - -void handleUSR1(int signum, siginfo_t *sigInfo, void *ignore){ - if (!sockCount){ - INFO_MSG("USR1 received - triggering rolling restart (no connections active)"); - Util::Config::is_restarting = true; - Util::logExitReason("signal USR1, no connections"); - server_socket.close(); - Util::Config::is_active = false; - }else{ - INFO_MSG("USR1 received - triggering rolling restart when connection count reaches zero"); - Util::Config::is_restarting = true; - Util::logExitReason("signal USR1, after disconnect wait"); - } -} - -// Callback for SRT-serving threads -static void callThreadCallbackSRT(void *srtPtr){ - sockCount++; - Socket::SRTConnection & srtSock = *(Socket::SRTConnection*)srtPtr; - int fds[2]; - pipe(fds); - Socket::Connection Sconn(fds[0], fds[1]); - HIGH_MSG("Started thread for socket %i", srtSock.getSocket()); - mistOut tmp(Sconn,srtSock); - tmp.run(); - HIGH_MSG("Closing thread for socket %i", srtSock.getSocket()); - Sconn.close(); - srtSock.close(); - delete &srtSock; - sockCount--; - if (!sockCount && Util::Config::is_restarting){ - server_socket.close(); - Util::Config::is_active = false; - INFO_MSG("Last active connection closed; triggering rolling restart now!"); - } -} - -bool sysSetNrOpenFiles(int n){ - struct rlimit limit; - if (getrlimit(RLIMIT_NOFILE, &limit) != 0) { - FAIL_MSG("Could not get open file limit: %s", strerror(errno)); - return false; - } - int currLimit = limit.rlim_cur; - if(limit.rlim_cur < n){ - limit.rlim_cur = n; - if (setrlimit(RLIMIT_NOFILE, &limit) != 0) { - FAIL_MSG("Could not set open file limit from %d to %d: %s", currLimit, n, strerror(errno)); - return false; - } - HIGH_MSG("Open file limit increased from %d to %d", currLimit, n) - } - return true; - } - -int main(int argc, char *argv[]){ - DTSC::trackValidMask = TRACK_VALID_EXT_HUMAN; - Util::redirectLogsIfNeeded(); - Util::Config conf(argv[0]); - mistOut::init(&conf); - if (conf.parseArgs(argc, argv)){ - if (conf.getBool("json")){ - mistOut::capa["version"] = PACKAGE_VERSION; - std::cout << mistOut::capa.toString() << std::endl; - return -1; - } - conf.activate(); - - int filelimit = conf.getInteger("filelimit"); - sysSetNrOpenFiles(filelimit); - - if (mistOut::listenMode()){ - { - struct sigaction new_action; - new_action.sa_sigaction = handleUSR1; - sigemptyset(&new_action.sa_mask); - new_action.sa_flags = 0; - sigaction(SIGUSR1, &new_action, NULL); - } - if (conf.getInteger("port") && conf.getString("interface").size()){ - server_socket = Socket::SRTServer(conf.getInteger("port"), conf.getString("interface"), false, "output"); - } - if (!server_socket.connected()){ - DEVEL_MSG("Failure to open socket"); - return 1; - } - struct sigaction new_action; - struct sigaction cur_action; - new_action.sa_sigaction = signal_handler; - sigemptyset(&new_action.sa_mask); - new_action.sa_flags = SA_SIGINFO; - sigaction(SIGINT, &new_action, &cur_action); - if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){ - if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");} - oldSignal = cur_action.sa_sigaction; - } - sigaction(SIGHUP, &new_action, &cur_action); - if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){ - if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");} - oldSignal = cur_action.sa_sigaction; - } - sigaction(SIGTERM, &new_action, &cur_action); - if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){ - if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");} - oldSignal = cur_action.sa_sigaction; - } - Util::Procs::socketList.insert(server_socket.getSocket()); - while (conf.is_active && server_socket.connected()){ - Socket::SRTConnection S = server_socket.accept(false, "output"); - if (S.connected()){// check if the new connection is valid - // spawn a new thread for this connection - tthread::thread T(callThreadCallbackSRT, (void *)new Socket::SRTConnection(S)); - // detach it, no need to keep track of it anymore - T.detach(); - }else{ - Util::sleep(10); // sleep 10ms - } - } - Util::Procs::socketList.erase(server_socket.getSocket()); - server_socket.close(); - if (conf.is_restarting){ - INFO_MSG("Reloading input..."); - execvp(argv[0], argv); - FAIL_MSG("Error reloading: %s", strerror(errno)); - } - }else{ - Socket::Connection S(fileno(stdout), fileno(stdin)); - Socket::SRTConnection tmpSock; - mistOut tmp(S, tmpSock); - return tmp.run(); - } - } - INFO_MSG("Exit reason: %s", Util::exitReason); - return 0; -} diff --git a/src/output/output_tssrt.cpp b/src/output/output_tssrt.cpp index ea46eccf..aa06189e 100644 --- a/src/output/output_tssrt.cpp +++ b/src/output/output_tssrt.cpp @@ -354,3 +354,132 @@ namespace Mist{ } }// namespace Mist + + + +Socket::SRTServer server_socket; +static uint64_t sockCount = 0; + +void (*oldSignal)(int, siginfo_t *,void *) = 0; +void signal_handler(int signum, siginfo_t *sigInfo, void *ignore){ + server_socket.close(); + if (oldSignal){ + oldSignal(signum, sigInfo, ignore); + } +} + +void handleUSR1(int signum, siginfo_t *sigInfo, void *ignore){ + if (!sockCount){ + INFO_MSG("USR1 received - triggering rolling restart (no connections active)"); + Util::Config::is_restarting = true; + Util::logExitReason("signal USR1, no connections"); + server_socket.close(); + Util::Config::is_active = false; + }else{ + INFO_MSG("USR1 received - triggering rolling restart when connection count reaches zero"); + Util::Config::is_restarting = true; + Util::logExitReason("signal USR1, after disconnect wait"); + } +} + +// Callback for SRT-serving threads +static void callThreadCallbackSRT(void *srtPtr){ + sockCount++; + Socket::SRTConnection & srtSock = *(Socket::SRTConnection*)srtPtr; + int fds[2]; + pipe(fds); + Socket::Connection Sconn(fds[0], fds[1]); + HIGH_MSG("Started thread for socket %i", srtSock.getSocket()); + mistOut tmp(Sconn,srtSock); + tmp.run(); + HIGH_MSG("Closing thread for socket %i", srtSock.getSocket()); + Sconn.close(); + srtSock.close(); + delete &srtSock; + sockCount--; + if (!sockCount && Util::Config::is_restarting){ + server_socket.close(); + Util::Config::is_active = false; + INFO_MSG("Last active connection closed; triggering rolling restart now!"); + } +} + +int main(int argc, char *argv[]){ + DTSC::trackValidMask = TRACK_VALID_EXT_HUMAN; + Util::redirectLogsIfNeeded(); + Util::Config conf(argv[0]); + mistOut::init(&conf); + if (conf.parseArgs(argc, argv)){ + if (conf.getBool("json")){ + mistOut::capa["version"] = PACKAGE_VERSION; + std::cout << mistOut::capa.toString() << std::endl; + return -1; + } + conf.activate(); + + int filelimit = conf.getInteger("filelimit"); + Util::sysSetNrOpenFiles(filelimit); + + if (!mistOut::listenMode()){ + Socket::Connection S(fileno(stdout), fileno(stdin)); + Socket::SRTConnection tmpSock; + mistOut tmp(S, tmpSock); + return tmp.run(); + } + { + struct sigaction new_action; + new_action.sa_sigaction = handleUSR1; + sigemptyset(&new_action.sa_mask); + new_action.sa_flags = 0; + sigaction(SIGUSR1, &new_action, NULL); + } + if (conf.getInteger("port") && conf.getString("interface").size()){ + server_socket = Socket::SRTServer(conf.getInteger("port"), conf.getString("interface"), false, "output"); + } + if (!server_socket.connected()){ + DEVEL_MSG("Failure to open socket"); + return 1; + } + struct sigaction new_action; + struct sigaction cur_action; + new_action.sa_sigaction = signal_handler; + sigemptyset(&new_action.sa_mask); + new_action.sa_flags = SA_SIGINFO; + sigaction(SIGINT, &new_action, &cur_action); + if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){ + if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");} + oldSignal = cur_action.sa_sigaction; + } + sigaction(SIGHUP, &new_action, &cur_action); + if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){ + if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");} + oldSignal = cur_action.sa_sigaction; + } + sigaction(SIGTERM, &new_action, &cur_action); + if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){ + if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");} + oldSignal = cur_action.sa_sigaction; + } + Util::Procs::socketList.insert(server_socket.getSocket()); + while (conf.is_active && server_socket.connected()){ + Socket::SRTConnection S = server_socket.accept(false, "output"); + if (S.connected()){// check if the new connection is valid + // spawn a new thread for this connection + tthread::thread T(callThreadCallbackSRT, (void *)new Socket::SRTConnection(S)); + // detach it, no need to keep track of it anymore + T.detach(); + }else{ + Util::sleep(10); // sleep 10ms + } + } + Util::Procs::socketList.erase(server_socket.getSocket()); + server_socket.close(); + if (conf.is_restarting){ + INFO_MSG("Reloading input..."); + execvp(argv[0], argv); + FAIL_MSG("Error reloading: %s", strerror(errno)); + } + } + INFO_MSG("Exit reason: %s", Util::exitReason); + return 0; +}