Cleaned up, fixed and robustified semaphore and signal related code

This commit is contained in:
Thulinma 2018-11-01 16:11:47 +01:00
parent ee9b076b76
commit 24006648f9
7 changed files with 165 additions and 131 deletions

View file

@ -83,7 +83,8 @@ void statusMonitor(void *np){
WARN_MSG("Configuration semaphore was stuck. Force-unlocking it and re-writing config.");
changed = true;
}
configLock.post();
configLock.unlink();
configLock.open(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1);
if (changed || Controller::configChanged){
Controller::writeConfig();
Controller::configChanged = false;

View file

@ -109,41 +109,77 @@ namespace Mist {
}
IPC::semaphore playerLock;
if (needsLock() && streamName.size()){
char semName[NAME_BUFFER_SIZE];
snprintf(semName, NAME_BUFFER_SIZE, SEM_INPUT, streamName.c_str());
playerLock.open(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
if (!playerLock.tryWait()){
DEBUG_MSG(DLVL_DEVEL, "A player for stream %s is already running", streamName.c_str());
return 1;
IPC::semaphore pullLock;
//If we're not converting, we might need a lock.
if (streamName.size()){
if (needsLock()){
//needsLock() == true means this input is the sole responsible input for a stream
//That means it's MistInBuffer for live, or the actual input binary for VoD
//For these cases, we lock the SEM_INPUT semaphore.
char semName[NAME_BUFFER_SIZE];
snprintf(semName, NAME_BUFFER_SIZE, SEM_INPUT, streamName.c_str());
playerLock.open(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
if (!playerLock.tryWait()){
INFO_MSG("A player for stream %s is already running", streamName.c_str());
playerLock.close();
return 1;
}
char pageName[NAME_BUFFER_SIZE];
snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str());
streamStatus.init(pageName, 1, true, false);
if (streamStatus){streamStatus.mapped[0] = STRMSTAT_INIT;}
streamStatus.master = false;
streamStatus.close();
}else{
//needsLock() == false means this binary will itself start the sole responsible input
//So, we definitely do NOT lock SEM_INPUT, since the child process will do that later.
//However, most of these processes are singular, meaning they expect to be the only source of data.
//To prevent multiple singular processes starting, we use the MstPull semaphore if this input
//is indeed a singular input type.
if (isSingular()){
pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
if (!pullLock){
FAIL_MSG("Could not open pull lock for stream '%s' - aborting!", streamName.c_str());
return 1;
}
if (!pullLock.tryWait()){
WARN_MSG("A pull process for stream %s is already running", streamName.c_str());
pullLock.close();
return 1;
}
}
}
char pageName[NAME_BUFFER_SIZE];
snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str());
streamStatus.init(pageName, 1, true, false);
if (streamStatus){streamStatus.mapped[0] = STRMSTAT_INIT;}
streamStatus.master = false;
streamStatus.close();
}
config->activate();
uint64_t reTimer = 0;
while (config->is_active){
pid_t pid = fork();
if (pid == 0){
//Re-init streamStatus, previously closed
char pageName[NAME_BUFFER_SIZE];
snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str());
streamStatus.init(pageName, 1, true, false);
streamStatus.master = false;
if (streamStatus){streamStatus.mapped[0] = STRMSTAT_INIT;}
if (needsLock()){playerLock.close();}
if (playerLock){
//Re-init streamStatus, previously closed
char pageName[NAME_BUFFER_SIZE];
snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str());
streamStatus.init(pageName, 1, true, false);
streamStatus.master = false;
if (streamStatus){streamStatus.mapped[0] = STRMSTAT_INIT;}
}
//Abandon all semaphores, ye who enter here.
playerLock.abandon();
pullLock.abandon();
if (!preRun()){return 0;}
return run();
}
if (pid == -1){
FAIL_MSG("Unable to spawn input process");
if (needsLock()){playerLock.post();}
//We failed. Release the kra... semaphores!
//post() contains an is-open check already, no need to double-check.
playerLock.unlink();
pullLock.unlink();
return 2;
}
HIGH_MSG("Waiting for child for stream %s", streamName.c_str());
//wait for the process to exit
int status;
while (waitpid(pid, &status, 0) != pid && errno == EINTR){
@ -153,35 +189,38 @@ namespace Mist {
}
continue;
}
HIGH_MSG("Done waiting for child for stream %s", streamName.c_str());
//if the exit was clean, don't restart it
if (WIFEXITED(status) && (WEXITSTATUS(status) == 0)){
INFO_MSG("Input for stream %s shut down cleanly", streamName.c_str());
break;
}
char pageName[NAME_BUFFER_SIZE];
snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str());
streamStatus.init(pageName, 1, true, false);
if (streamStatus){streamStatus.mapped[0] = STRMSTAT_INVALID;}
if (playerLock){
char pageName[NAME_BUFFER_SIZE];
snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str());
streamStatus.init(pageName, 1, true, false);
if (streamStatus){streamStatus.mapped[0] = STRMSTAT_INVALID;}
}
#if DEBUG >= DLVL_DEVEL
WARN_MSG("Aborting autoclean; this is a development build.");
INFO_MSG("Input for stream %s uncleanly shut down! Aborting restart; this is a development build.", streamName.c_str());
WARN_MSG("Input for stream %s uncleanly shut down! Aborting restart; this is a development build.", streamName.c_str());
break;
#else
WARN_MSG("Input for stream %s uncleanly shut down! Restarting...", streamName.c_str());
onCrash();
INFO_MSG("Input for stream %s uncleanly shut down! Restarting...", streamName.c_str());
Util::wait(reTimer);
reTimer += 1000;
#endif
}
if (needsLock()){
playerLock.post();
if (playerLock){
playerLock.unlink();
playerLock.close();
char pageName[NAME_BUFFER_SIZE];
snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str());
streamStatus.init(pageName, 1, true, false);
streamStatus.close();
}
char pageName[NAME_BUFFER_SIZE];
snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str());
streamStatus.init(pageName, 1, true, false);
streamStatus.close();
pullLock.unlink();
HIGH_MSG("Angel process for %s exiting", streamName.c_str());
return 0;
}
@ -207,29 +246,21 @@ namespace Mist {
}
if (!streamName.size()) {
//If we don't have a stream name, that means we're in stand-alone conversion mode.
MEDIUM_MSG("Starting convert");
convert();
} else if (!needsLock()) {
//We have a name and aren't the sole process. That means we're streaming live data to a buffer.
MEDIUM_MSG("Starting stream");
stream();
}else{
//We are the sole process and have a name. That means this is a Buffer or VoD input.
MEDIUM_MSG("Starting serve");
serve();
}
return 0;
}
/// Default crash handler, cleans up Pull semaphore on crashes
void Input::onCrash(){
if (streamName.size() && !needsLock()) {
//we have a Pull semaphore to clean up, do it
IPC::semaphore pullLock;
pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
pullLock.close();
pullLock.unlink();
}
}
void Input::convert() {
//check filename for no -
if (config->getString("output") != "-"){
@ -296,7 +327,7 @@ namespace Mist {
userPage.init(userPageName, PLAY_EX_SIZE, true);
if (streamStatus){streamStatus.mapped[0] = STRMSTAT_READY;}
DEBUG_MSG(DLVL_DEVEL, "Input for stream %s started", streamName.c_str());
INFO_MSG("Input for stream %s started", streamName.c_str());
activityCounter = Util::bootSecs();
//main serve loop
while (keepRunning()) {
@ -321,7 +352,7 @@ namespace Mist {
if (streamStatus){streamStatus.mapped[0] = STRMSTAT_SHUTDOWN;}
config->is_active = false;
finish();
DEBUG_MSG(DLVL_DEVEL, "Input for stream %s closing clean", streamName.c_str());
INFO_MSG("Input for stream %s closing clean", streamName.c_str());
userPage.finishEach();
//end player functionality
}
@ -352,27 +383,10 @@ namespace Mist {
/// - if there are tracks, register as a non-viewer on the user page of the buffer
/// - call getNext() in a loop, buffering packets
void Input::stream(){
IPC::semaphore pullLock;
if(isSingular()){
pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
if (!pullLock){
FAIL_MSG("Could not open pull lock for stream '%s' - aborting!", streamName.c_str());
return;
}
if (!pullLock.tryWait()){
WARN_MSG("A pull process for stream %s is already running", streamName.c_str());
pullLock.close();
return;
}
if (Util::streamAlive(streamName)){
pullLock.post();
pullLock.close();
pullLock.unlink();
WARN_MSG("Stream already online, cancelling");
return;
}
if (Util::streamAlive(streamName)){
WARN_MSG("Stream already online, cancelling");
return;
}
std::map<std::string, std::string> overrides;
@ -382,11 +396,6 @@ namespace Mist {
}
if (!Util::startInput(streamName, "push://INTERNAL_ONLY:"+config->getString("input"), true, true, overrides)) {//manually override stream url to start the buffer
if(isSingular()){
pullLock.post();
pullLock.close();
pullLock.unlink();
}
WARN_MSG("Could not start buffer, cancelling");
return;
}
@ -396,10 +405,6 @@ namespace Mist {
if (!openStreamSource()){
FAIL_MSG("Unable to connect to source");
if(isSingular()){
pullLock.post();
pullLock.close();
}
return;
}
@ -413,11 +418,6 @@ namespace Mist {
if (myMeta.tracks.size() == 0){
nProxy.userClient.finish();
finish();
if(isSingular()){
pullLock.post();
pullLock.close();
pullLock.unlink();
}
INFO_MSG("No tracks found, cancelling");
return;
}
@ -434,11 +434,6 @@ namespace Mist {
nProxy.userClient.finish();
finish();
if(isSingular()){
pullLock.post();
pullLock.close();
pullLock.unlink();
}
INFO_MSG("Stream input %s closing clean; reason: %s", streamName.c_str(), reason.c_str());
return;
}

View file

@ -20,7 +20,7 @@ namespace Mist {
public:
Input(Util::Config * cfg);
virtual int run();
virtual void onCrash();
virtual void onCrash(){}
virtual int boot(int argc, char * argv[]);
virtual ~Input() {};

View file

@ -463,7 +463,7 @@ namespace Mist {
}
//Track is set to "New track request", assign new track id and create shared memory page
//This indicates that the 'current key' part of the element is set to contain the original track id from the pushing process
if (value & 0x80000000) {
if (config->is_active && (value & 0x80000000)) {
if (value & 0x40000000) {
unsigned long finalMap = value & ~0xC0000000;
//Register the new track as an active track.
@ -480,9 +480,15 @@ namespace Mist {
char tempMetaName[NAME_BUFFER_SIZE];
snprintf(tempMetaName, NAME_BUFFER_SIZE, SHM_TRACK_META, config->getString("streamname").c_str(), finalMap);
tMeta.init(tempMetaName, 8388608, false);
tMeta.init(tempMetaName, 8388608, false, false);
if (!tMeta){continue;}//abort for now if page doesn't exist yet
//The page exist, now we try to read in the metadata of the track
char firstPage[NAME_BUFFER_SIZE];
snprintf(firstPage, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, config->getString("streamname").c_str(), finalMap);
nProxy.metaPages[finalMap].init(firstPage, SHM_TRACK_INDEX_SIZE, false, false);
if (!nProxy.metaPages[finalMap]){continue;}//abort for now if page doesn't exist yet
//The pages exist, now we try to read in the metadata of the track
//Store the size of the dtsc packet to read.
unsigned int len = ntohl(((int *)tMeta.mapped)[1]);
@ -505,11 +511,6 @@ namespace Mist {
userConn.setTrackId(index, finalMap);
userConn.setKeynum(index, 0x0000);
char firstPage[NAME_BUFFER_SIZE];
snprintf(firstPage, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, config->getString("streamname").c_str(), finalMap);
nProxy.metaPages[finalMap].init(firstPage, SHM_TRACK_INDEX_SIZE, false);
//Update the metadata for this track
updateTrackMeta(finalMap);
hasPush = true;
@ -536,7 +537,7 @@ namespace Mist {
}
//The track id is set to the value of a track that we are currently negotiating about
if (negotiatingTracks.count(value)) {
if (config->is_active && negotiatingTracks.count(value)) {
//If the metadata page for this track is not yet registered, initialize it
if (!nProxy.metaPages.count(value) || !nProxy.metaPages[value].mapped) {
char tempMetaName[NAME_BUFFER_SIZE];
@ -740,7 +741,10 @@ namespace Mist {
strName = strName.substr(0, (strName.find_first_of("+ ")));
IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, false, false); ///< Contains server configuration and capabilities
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1);
configLock.wait();
if (!configLock.tryWaitOneSecond()){
INFO_MSG("Aborting stream config refresh: locking took longer than expected");
return false;
}
DTSC::Scan streamCfg = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("streams").getMember(strName);
long long tmpNum;
@ -783,8 +787,6 @@ namespace Mist {
resumeMode = tmpNum;
}
configLock.post();
configLock.close();
return true;
}