Unified all input types into a single flexible type

This commit is contained in:
Thulinma 2017-02-28 13:57:59 +01:00
parent 56a85cdab5
commit 8866b299fb
4 changed files with 66 additions and 86 deletions

View file

@ -226,13 +226,8 @@ makeAnalyser(RAX rax)
# MistServer - Inputs #
########################################
macro(makeInput inputName format)
if (";${ARGN};" MATCHES ";folder;")
set(mainScript src/input/mist_in_folder.cpp)
else()
set(mainScript src/input/mist_in.cpp)
endif()
add_executable(MistIn${inputName}
${mainScript}
src/input/mist_in.cpp
src/input/input.cpp
src/input/input_${format}.cpp
src/io.cpp

View file

@ -4,6 +4,8 @@
#include <mist/stream.h>
#include <mist/defines.h>
#include <mist/procs.h>
#include <sys/wait.h>
#include "input.h"
#include <sstream>
#include <fstream>
@ -88,6 +90,67 @@ namespace Mist {
}
}
/// Starts checks the SEM_INPUT lock, starts an angel process and then
int Input::boot(int argc, char * argv[]){
if (!(config->parseArgs(argc, argv))){return 1;}
streamName = config->getString("streamname");
IPC::semaphore playerLock;
if (needsLock() && streamName.size()){
char semName[NAME_BUFFER_SIZE];
snprintf(semName, NAME_BUFFER_SIZE, SEM_INPUT, streamName.c_str());
playerLock.open(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
if (!playerLock.tryWait()){
DEBUG_MSG(DLVL_DEVEL, "A player for stream %s is already running", streamName.c_str());
return 1;
}
}
config->activate();
uint64_t reTimer = 0;
while (config->is_active){
pid_t pid = fork();
if (pid == 0){
if (needsLock()){playerLock.close();}
return run();
}
if (pid == -1){
FAIL_MSG("Unable to spawn input process");
if (needsLock()){playerLock.post();}
return 2;
}
//wait for the process to exit
int status;
while (waitpid(pid, &status, 0) != pid && errno == EINTR){
if (!config->is_active){
INFO_MSG("Shutting down input for stream %s because of signal interrupt...", streamName.c_str());
Util::Procs::Stop(pid);
}
continue;
}
//if the exit was clean, don't restart it
if (WIFEXITED(status) && (WEXITSTATUS(status) == 0)){
MEDIUM_MSG("Input for stream %s shut down cleanly", streamName.c_str());
break;
}
#if DEBUG >= DLVL_DEVEL
WARN_MSG("Aborting autoclean; this is a development build.");
INFO_MSG("Input for stream %s uncleanly shut down! Aborting restart; this is a development build.", streamName.c_str());
break;
#else
onCrash();
INFO_MSG("Input for stream %s uncleanly shut down! Restarting...", streamName.c_str());
Util::wait(reTimer);
reTimer += 1000;
#endif
}
if (needsLock()){
playerLock.post();
playerLock.unlink();
playerLock.close();
}
return 0;
}
int Input::run() {
if (config->getBool("json")) {
std::cout << capa.toString() << std::endl;

View file

@ -21,7 +21,7 @@ namespace Mist {
Input(Util::Config * cfg);
virtual int run();
virtual void onCrash();
virtual void argumentsParsed(){}
virtual int boot(int argc, char * argv[]);
virtual ~Input() {};
virtual bool needsLock(){return true;}

View file

@ -1,86 +1,8 @@
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <unistd.h>
#include <semaphore.h>
#include INPUTTYPE
#include <mist/config.h>
#include <mist/defines.h>
#include <mist/procs.h>
int main(int argc, char * argv[]) {
Util::Config conf(argv[0]);
mistIn conv(&conf);
if (conf.parseArgs(argc, argv)) {
std::string streamName = conf.getString("streamname");
conv.argumentsParsed();
IPC::semaphore playerLock;
if (conv.needsLock()){
if (streamName.size()){
char semName[NAME_BUFFER_SIZE];
snprintf(semName, NAME_BUFFER_SIZE, SEM_INPUT, streamName.c_str());
playerLock.open(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
if (!playerLock.tryWait()){
DEBUG_MSG(DLVL_DEVEL, "A player for stream %s is already running", streamName.c_str());
return 1;
}
}
}
conf.activate();
uint64_t reTimer = 0;
while (conf.is_active){
pid_t pid = fork();
if (pid == 0){
if (conv.needsLock()){
playerLock.close();
}
return conv.run();
}
if (pid == -1){
DEBUG_MSG(DLVL_FAIL, "Unable to spawn player process");
if (conv.needsLock()){
playerLock.post();
}
return 2;
}
//wait for the process to exit
int status;
while (waitpid(pid, &status, 0) != pid && errno == EINTR){
if (!conf.is_active){
DEBUG_MSG(DLVL_DEVEL, "Shutting down input for stream %s because of signal interrupt...", streamName.c_str());
Util::Procs::Stop(pid);
}
continue;
}
//if the exit was clean, don't restart it
if (WIFEXITED(status) && (WEXITSTATUS(status) == 0)){
DEBUG_MSG(DLVL_MEDIUM, "Input for stream %s shut down cleanly", streamName.c_str());
break;
}
#if DEBUG >= DLVL_DEVEL
WARN_MSG("Aborting autoclean; this is a development build.");
#else
conv.onCrash();
#endif
if (DEBUG >= DLVL_DEVEL){
DEBUG_MSG(DLVL_DEVEL, "Input for stream %s uncleanly shut down! Aborting restart; this is a development build.", streamName.c_str());
break;
}else{
DEBUG_MSG(DLVL_DEVEL, "Input for stream %s uncleanly shut down! Restarting...", streamName.c_str());
Util::wait(reTimer);
reTimer += 1000;
}
}
if (conv.needsLock()){
playerLock.post();
playerLock.unlink();
playerLock.close();
}
}
return 0;
return conv.boot(argc, argv);
}