Cleanup TS SRT output code

This commit is contained in:
Thulinma 2020-09-19 21:34:15 +02:00
parent 1ec18d83b8
commit 840a1f5f4e
5 changed files with 154 additions and 156 deletions

View file

@ -505,8 +505,8 @@ macro(makeOutput outputName format)
SET(tsBaseClass HTTPOutput)
endif()
endif()
if (";${ARGN};" MATCHES ";with_srt;")
SET(outBaseFile src/output/mist_out_srt.cpp)
if (";${ARGN};" MATCHES ";debased;")
SET(outBaseFile "")
endif()
if (";${ARGN};" MATCHES ";ts;")
SET(tsOutput src/output/output_ts_base.cpp)
@ -555,7 +555,7 @@ if (WITH_JPG)
endif()
makeOutput(TS ts ts)
if(SRT_LIB)
makeOutput(TSSRT tssrt ts with_srt)
makeOutput(TSSRT tssrt ts debased with_srt)
endif()
makeOutput(HTTPTS httpts http ts)
makeOutput(HLS hls http ts)

View file

@ -16,6 +16,7 @@
#include <direct.h> // _mkdir
#endif
#include <stdlib.h>
#include <sys/resource.h>
#define RAXHDR_FIELDOFFSET p[1]
#define RAX_REQDFIELDS_LEN 36
@ -969,4 +970,23 @@ namespace Util{
if (!fields.count(fName)){return RelAccXFieldData();}
return fields.at(fName);
}
bool sysSetNrOpenFiles(int n){
struct rlimit limit;
if (getrlimit(RLIMIT_NOFILE, &limit) != 0) {
FAIL_MSG("Could not get open file limit: %s", strerror(errno));
return false;
}
int currLimit = limit.rlim_cur;
if(limit.rlim_cur < n){
limit.rlim_cur = n;
if (setrlimit(RLIMIT_NOFILE, &limit) != 0) {
FAIL_MSG("Could not set open file limit from %d to %d: %s", currLimit, n, strerror(errno));
return false;
}
HIGH_MSG("Open file limit increased from %d to %d", currLimit, n)
}
return true;
}
}// namespace Util

View file

@ -19,6 +19,8 @@ namespace Util{
uint64_t ftell(FILE *stream);
uint64_t fseek(FILE *stream, uint64_t offset, int whence);
bool sysSetNrOpenFiles(int n);
class DataCallback{
public:
virtual void dataCallback(const char *ptr, size_t size){

View file

@ -1,153 +0,0 @@
#include OUTPUTTYPE
#include <mist/config.h>
#include <mist/defines.h>
#include <mist/socket.h>
#include <mist/socket_srt.h>
#include <mist/util.h>
#include <sys/resource.h>
Socket::SRTServer server_socket;
static uint64_t sockCount = 0;
void (*oldSignal)(int, siginfo_t *,void *) = 0;
void signal_handler(int signum, siginfo_t *sigInfo, void *ignore){
server_socket.close();
if (oldSignal){
oldSignal(signum, sigInfo, ignore);
}
}
void handleUSR1(int signum, siginfo_t *sigInfo, void *ignore){
if (!sockCount){
INFO_MSG("USR1 received - triggering rolling restart (no connections active)");
Util::Config::is_restarting = true;
Util::logExitReason("signal USR1, no connections");
server_socket.close();
Util::Config::is_active = false;
}else{
INFO_MSG("USR1 received - triggering rolling restart when connection count reaches zero");
Util::Config::is_restarting = true;
Util::logExitReason("signal USR1, after disconnect wait");
}
}
// Callback for SRT-serving threads
static void callThreadCallbackSRT(void *srtPtr){
sockCount++;
Socket::SRTConnection & srtSock = *(Socket::SRTConnection*)srtPtr;
int fds[2];
pipe(fds);
Socket::Connection Sconn(fds[0], fds[1]);
HIGH_MSG("Started thread for socket %i", srtSock.getSocket());
mistOut tmp(Sconn,srtSock);
tmp.run();
HIGH_MSG("Closing thread for socket %i", srtSock.getSocket());
Sconn.close();
srtSock.close();
delete &srtSock;
sockCount--;
if (!sockCount && Util::Config::is_restarting){
server_socket.close();
Util::Config::is_active = false;
INFO_MSG("Last active connection closed; triggering rolling restart now!");
}
}
bool sysSetNrOpenFiles(int n){
struct rlimit limit;
if (getrlimit(RLIMIT_NOFILE, &limit) != 0) {
FAIL_MSG("Could not get open file limit: %s", strerror(errno));
return false;
}
int currLimit = limit.rlim_cur;
if(limit.rlim_cur < n){
limit.rlim_cur = n;
if (setrlimit(RLIMIT_NOFILE, &limit) != 0) {
FAIL_MSG("Could not set open file limit from %d to %d: %s", currLimit, n, strerror(errno));
return false;
}
HIGH_MSG("Open file limit increased from %d to %d", currLimit, n)
}
return true;
}
int main(int argc, char *argv[]){
DTSC::trackValidMask = TRACK_VALID_EXT_HUMAN;
Util::redirectLogsIfNeeded();
Util::Config conf(argv[0]);
mistOut::init(&conf);
if (conf.parseArgs(argc, argv)){
if (conf.getBool("json")){
mistOut::capa["version"] = PACKAGE_VERSION;
std::cout << mistOut::capa.toString() << std::endl;
return -1;
}
conf.activate();
int filelimit = conf.getInteger("filelimit");
sysSetNrOpenFiles(filelimit);
if (mistOut::listenMode()){
{
struct sigaction new_action;
new_action.sa_sigaction = handleUSR1;
sigemptyset(&new_action.sa_mask);
new_action.sa_flags = 0;
sigaction(SIGUSR1, &new_action, NULL);
}
if (conf.getInteger("port") && conf.getString("interface").size()){
server_socket = Socket::SRTServer(conf.getInteger("port"), conf.getString("interface"), false, "output");
}
if (!server_socket.connected()){
DEVEL_MSG("Failure to open socket");
return 1;
}
struct sigaction new_action;
struct sigaction cur_action;
new_action.sa_sigaction = signal_handler;
sigemptyset(&new_action.sa_mask);
new_action.sa_flags = SA_SIGINFO;
sigaction(SIGINT, &new_action, &cur_action);
if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){
if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");}
oldSignal = cur_action.sa_sigaction;
}
sigaction(SIGHUP, &new_action, &cur_action);
if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){
if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");}
oldSignal = cur_action.sa_sigaction;
}
sigaction(SIGTERM, &new_action, &cur_action);
if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){
if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");}
oldSignal = cur_action.sa_sigaction;
}
Util::Procs::socketList.insert(server_socket.getSocket());
while (conf.is_active && server_socket.connected()){
Socket::SRTConnection S = server_socket.accept(false, "output");
if (S.connected()){// check if the new connection is valid
// spawn a new thread for this connection
tthread::thread T(callThreadCallbackSRT, (void *)new Socket::SRTConnection(S));
// detach it, no need to keep track of it anymore
T.detach();
}else{
Util::sleep(10); // sleep 10ms
}
}
Util::Procs::socketList.erase(server_socket.getSocket());
server_socket.close();
if (conf.is_restarting){
INFO_MSG("Reloading input...");
execvp(argv[0], argv);
FAIL_MSG("Error reloading: %s", strerror(errno));
}
}else{
Socket::Connection S(fileno(stdout), fileno(stdin));
Socket::SRTConnection tmpSock;
mistOut tmp(S, tmpSock);
return tmp.run();
}
}
INFO_MSG("Exit reason: %s", Util::exitReason);
return 0;
}

View file

@ -354,3 +354,132 @@ namespace Mist{
}
}// namespace Mist
Socket::SRTServer server_socket;
static uint64_t sockCount = 0;
void (*oldSignal)(int, siginfo_t *,void *) = 0;
void signal_handler(int signum, siginfo_t *sigInfo, void *ignore){
server_socket.close();
if (oldSignal){
oldSignal(signum, sigInfo, ignore);
}
}
void handleUSR1(int signum, siginfo_t *sigInfo, void *ignore){
if (!sockCount){
INFO_MSG("USR1 received - triggering rolling restart (no connections active)");
Util::Config::is_restarting = true;
Util::logExitReason("signal USR1, no connections");
server_socket.close();
Util::Config::is_active = false;
}else{
INFO_MSG("USR1 received - triggering rolling restart when connection count reaches zero");
Util::Config::is_restarting = true;
Util::logExitReason("signal USR1, after disconnect wait");
}
}
// Callback for SRT-serving threads
static void callThreadCallbackSRT(void *srtPtr){
sockCount++;
Socket::SRTConnection & srtSock = *(Socket::SRTConnection*)srtPtr;
int fds[2];
pipe(fds);
Socket::Connection Sconn(fds[0], fds[1]);
HIGH_MSG("Started thread for socket %i", srtSock.getSocket());
mistOut tmp(Sconn,srtSock);
tmp.run();
HIGH_MSG("Closing thread for socket %i", srtSock.getSocket());
Sconn.close();
srtSock.close();
delete &srtSock;
sockCount--;
if (!sockCount && Util::Config::is_restarting){
server_socket.close();
Util::Config::is_active = false;
INFO_MSG("Last active connection closed; triggering rolling restart now!");
}
}
int main(int argc, char *argv[]){
DTSC::trackValidMask = TRACK_VALID_EXT_HUMAN;
Util::redirectLogsIfNeeded();
Util::Config conf(argv[0]);
mistOut::init(&conf);
if (conf.parseArgs(argc, argv)){
if (conf.getBool("json")){
mistOut::capa["version"] = PACKAGE_VERSION;
std::cout << mistOut::capa.toString() << std::endl;
return -1;
}
conf.activate();
int filelimit = conf.getInteger("filelimit");
Util::sysSetNrOpenFiles(filelimit);
if (!mistOut::listenMode()){
Socket::Connection S(fileno(stdout), fileno(stdin));
Socket::SRTConnection tmpSock;
mistOut tmp(S, tmpSock);
return tmp.run();
}
{
struct sigaction new_action;
new_action.sa_sigaction = handleUSR1;
sigemptyset(&new_action.sa_mask);
new_action.sa_flags = 0;
sigaction(SIGUSR1, &new_action, NULL);
}
if (conf.getInteger("port") && conf.getString("interface").size()){
server_socket = Socket::SRTServer(conf.getInteger("port"), conf.getString("interface"), false, "output");
}
if (!server_socket.connected()){
DEVEL_MSG("Failure to open socket");
return 1;
}
struct sigaction new_action;
struct sigaction cur_action;
new_action.sa_sigaction = signal_handler;
sigemptyset(&new_action.sa_mask);
new_action.sa_flags = SA_SIGINFO;
sigaction(SIGINT, &new_action, &cur_action);
if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){
if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");}
oldSignal = cur_action.sa_sigaction;
}
sigaction(SIGHUP, &new_action, &cur_action);
if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){
if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");}
oldSignal = cur_action.sa_sigaction;
}
sigaction(SIGTERM, &new_action, &cur_action);
if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){
if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");}
oldSignal = cur_action.sa_sigaction;
}
Util::Procs::socketList.insert(server_socket.getSocket());
while (conf.is_active && server_socket.connected()){
Socket::SRTConnection S = server_socket.accept(false, "output");
if (S.connected()){// check if the new connection is valid
// spawn a new thread for this connection
tthread::thread T(callThreadCallbackSRT, (void *)new Socket::SRTConnection(S));
// detach it, no need to keep track of it anymore
T.detach();
}else{
Util::sleep(10); // sleep 10ms
}
}
Util::Procs::socketList.erase(server_socket.getSocket());
server_socket.close();
if (conf.is_restarting){
INFO_MSG("Reloading input...");
execvp(argv[0], argv);
FAIL_MSG("Error reloading: %s", strerror(errno));
}
}
INFO_MSG("Exit reason: %s", Util::exitReason);
return 0;
}