diff --git a/lib/config.cpp b/lib/config.cpp index dce06ca4..9a389871 100644 --- a/lib/config.cpp +++ b/lib/config.cpp @@ -401,7 +401,7 @@ void Util::Config::activate() { struct sigaction cur_action; new_action.sa_sigaction = signal_handler; sigemptyset(&new_action.sa_mask); - new_action.sa_flags = 0; + new_action.sa_flags = SA_SIGINFO; sigaction(SIGINT, &new_action, NULL); sigaction(SIGHUP, &new_action, NULL); sigaction(SIGTERM, &new_action, NULL); @@ -423,6 +423,10 @@ void Util::Config::signal_handler(int signum, siginfo_t * sigInfo, void * ignore case SIGHUP: case SIGTERM: if (serv_sock_pointer){serv_sock_pointer->close();} +#if DEBUG >= DLVL_DEVEL + static int ctr = 0; + if (!is_active && ++ctr > 4){BACKTRACE;} +#endif is_active = false; default: switch (sigInfo->si_code){ diff --git a/lib/shared_memory.cpp b/lib/shared_memory.cpp index fc2f9548..dd7f169f 100644 --- a/lib/shared_memory.cpp +++ b/lib/shared_memory.cpp @@ -93,6 +93,7 @@ namespace IPC { #else mySem = SEM_FAILED; #endif + isLocked = false; } ///\brief Constructs a named semaphore @@ -106,6 +107,7 @@ namespace IPC { #else mySem = SEM_FAILED; #endif + isLocked = false; open(name, oflag, mode, value, noWait); } @@ -178,9 +180,9 @@ namespace IPC { } #endif } - if (!(*this)) { + if (*this) { + myName = (char *)name; } - myName = (char *)name; } ///\brief Returns the current value of the semaphore @@ -197,12 +199,20 @@ namespace IPC { ///\brief Posts to the semaphore, increases its value by one void semaphore::post() { + if (!*this || !isLocked){ + FAIL_MSG("Attempted to unlock a non-locked semaphore: '%s'!", myName.c_str()); +#if DEBUG >= DLVL_DEVEL + BACKTRACE; +#endif + return; + } if (*this) { #if defined(__CYGWIN__) || defined(_WIN32) ReleaseMutex(mySem); #else sem_post(mySem); #endif + isLocked = false; } } @@ -217,6 +227,7 @@ namespace IPC { tmp = sem_wait(mySem); } while (tmp == -1 && errno == EINTR); #endif + isLocked = true; } } @@ -235,7 +246,7 @@ namespace IPC { result = sem_trywait(mySem); } while (result == -1 && errno == EINTR); #endif - return (result == 0); + return isLocked = (result == 0); } ///\brief Tries to wait for the semaphore for a single second, returns true if successful, false otherwise @@ -254,6 +265,7 @@ namespace IPC { long long unsigned int timeout = now + 1e6; while (now < timeout) { if (0 == sem_trywait(mySem)) { + isLocked = true; return true; } usleep(100e3); @@ -266,12 +278,28 @@ namespace IPC { wt.tv_nsec = 0; result = sem_timedwait(mySem, &wt); #endif - return (result == 0); + return isLocked = (result == 0); } ///\brief Closes the currently opened semaphore void semaphore::close() { if (*this) { + if (isLocked){post();} +#if defined(__CYGWIN__) || defined(_WIN32) + CloseHandle(mySem); + mySem = 0; +#else + sem_close(mySem); + mySem = SEM_FAILED; +#endif + } + myName.clear(); + } + + /// Closes the semaphore, without unlocking it first. + /// Intended to be called from forked child processes, to drop the reference to the semaphore. + void semaphore::abandon() { + if (*this) { #if defined(__CYGWIN__) || defined(_WIN32) CloseHandle(mySem); mySem = 0; @@ -280,19 +308,28 @@ namespace IPC { mySem = SEM_FAILED; #endif } - } - - ///\brief Unlinks the previously opened semaphore - void semaphore::unlink() { - close(); -#if !defined(__CYGWIN__) && !defined(_WIN32) - if (myName.size()) { - sem_unlink(myName.c_str()); - } -#endif myName.clear(); } + /// Unlinks the previously opened semaphore, closing it (if open) in the process. + void semaphore::unlink() { +#if defined(__CYGWIN__) || defined(_WIN32) + if (isLocked){post();} +#endif +#if !defined(__CYGWIN__) && !defined(_WIN32) + if (myName.size()){sem_unlink(myName.c_str());} +#endif + if (*this) { +#if defined(__CYGWIN__) || defined(_WIN32) + CloseHandle(mySem); + mySem = 0; +#else + sem_close(mySem); + mySem = SEM_FAILED; +#endif + } + myName.clear(); + } #if defined(__CYGWIN__) || defined(_WIN32) SECURITY_ATTRIBUTES semaphore::getSecurityAttributes() { @@ -368,7 +405,7 @@ namespace IPC { ///\brief Unmaps a shared page if allowed void sharedPage::unmap() { - if (mapped && len) { + if (mapped) { #if defined(__CYGWIN__) || defined(_WIN32) //under Cygwin, the mapped location is shifted by 4 to contain the page size. UnmapViewOfFile(mapped - 4); @@ -795,26 +832,19 @@ namespace IPC { baseName = "/" + name; payLen = len; hasCounter = withCounter; - mySemaphore.open(baseName.c_str(), O_CREAT | O_EXCL | O_RDWR, ACCESSPERMS, 1); - if (!mySemaphore) { - mySemaphore.open(baseName.c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); - } + mySemaphore.open(baseName.c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); if (!mySemaphore) { DEBUG_MSG(DLVL_FAIL, "Creating semaphore failed: %s", strerror(errno)); return; + }else{ + semGuard tmpGuard(&mySemaphore); + amount = 0; + newPage(); } - if (!mySemaphore.tryWaitOneSecond()){ - WARN_MSG("Force unlocking sharedServer semaphore to prevent deadlock"); - } - mySemaphore.post(); - semGuard tmpGuard(&mySemaphore); - amount = 0; - newPage(); } ///\brief The deconstructor sharedServer::~sharedServer() { - mySemaphore.close(); mySemaphore.unlink(); } @@ -1251,7 +1281,7 @@ namespace IPC { if (!hasCounter) { return (myPage.mapped != 0); } - if (myPage.mapped){ + if (myPage.mapped && offsetOnPage >= 0){ return (myPage.mapped[offsetOnPage] & 0x7F) < 60; } return false; diff --git a/lib/shared_memory.h b/lib/shared_memory.h index 0b7f23f2..fe146910 100644 --- a/lib/shared_memory.h +++ b/lib/shared_memory.h @@ -75,6 +75,7 @@ namespace IPC { bool tryWait(); bool tryWaitOneSecond(); void close(); + void abandon(); void unlink(); private: #if defined(__CYGWIN__) || defined(_WIN32) @@ -84,6 +85,7 @@ namespace IPC { #else sem_t * mySem; #endif + bool isLocked; std::string myName; }; diff --git a/src/controller/controller.cpp b/src/controller/controller.cpp index b63c2af9..7ce1e016 100644 --- a/src/controller/controller.cpp +++ b/src/controller/controller.cpp @@ -83,7 +83,8 @@ void statusMonitor(void *np){ WARN_MSG("Configuration semaphore was stuck. Force-unlocking it and re-writing config."); changed = true; } - configLock.post(); + configLock.unlink(); + configLock.open(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); if (changed || Controller::configChanged){ Controller::writeConfig(); Controller::configChanged = false; diff --git a/src/input/input.cpp b/src/input/input.cpp index 5d50cb7f..d322ac9c 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -109,41 +109,77 @@ namespace Mist { } IPC::semaphore playerLock; - if (needsLock() && streamName.size()){ - char semName[NAME_BUFFER_SIZE]; - snprintf(semName, NAME_BUFFER_SIZE, SEM_INPUT, streamName.c_str()); - playerLock.open(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1); - if (!playerLock.tryWait()){ - DEBUG_MSG(DLVL_DEVEL, "A player for stream %s is already running", streamName.c_str()); - return 1; + IPC::semaphore pullLock; + + //If we're not converting, we might need a lock. + if (streamName.size()){ + if (needsLock()){ + //needsLock() == true means this input is the sole responsible input for a stream + //That means it's MistInBuffer for live, or the actual input binary for VoD + //For these cases, we lock the SEM_INPUT semaphore. + char semName[NAME_BUFFER_SIZE]; + snprintf(semName, NAME_BUFFER_SIZE, SEM_INPUT, streamName.c_str()); + playerLock.open(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1); + if (!playerLock.tryWait()){ + INFO_MSG("A player for stream %s is already running", streamName.c_str()); + playerLock.close(); + return 1; + } + char pageName[NAME_BUFFER_SIZE]; + snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str()); + streamStatus.init(pageName, 1, true, false); + if (streamStatus){streamStatus.mapped[0] = STRMSTAT_INIT;} + streamStatus.master = false; + streamStatus.close(); + }else{ + //needsLock() == false means this binary will itself start the sole responsible input + //So, we definitely do NOT lock SEM_INPUT, since the child process will do that later. + //However, most of these processes are singular, meaning they expect to be the only source of data. + //To prevent multiple singular processes starting, we use the MstPull semaphore if this input + //is indeed a singular input type. + if (isSingular()){ + pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); + if (!pullLock){ + FAIL_MSG("Could not open pull lock for stream '%s' - aborting!", streamName.c_str()); + return 1; + } + if (!pullLock.tryWait()){ + WARN_MSG("A pull process for stream %s is already running", streamName.c_str()); + pullLock.close(); + return 1; + } + } } - char pageName[NAME_BUFFER_SIZE]; - snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str()); - streamStatus.init(pageName, 1, true, false); - if (streamStatus){streamStatus.mapped[0] = STRMSTAT_INIT;} - streamStatus.master = false; - streamStatus.close(); } + config->activate(); uint64_t reTimer = 0; while (config->is_active){ pid_t pid = fork(); if (pid == 0){ - //Re-init streamStatus, previously closed - char pageName[NAME_BUFFER_SIZE]; - snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str()); - streamStatus.init(pageName, 1, true, false); - streamStatus.master = false; - if (streamStatus){streamStatus.mapped[0] = STRMSTAT_INIT;} - if (needsLock()){playerLock.close();} + if (playerLock){ + //Re-init streamStatus, previously closed + char pageName[NAME_BUFFER_SIZE]; + snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str()); + streamStatus.init(pageName, 1, true, false); + streamStatus.master = false; + if (streamStatus){streamStatus.mapped[0] = STRMSTAT_INIT;} + } + //Abandon all semaphores, ye who enter here. + playerLock.abandon(); + pullLock.abandon(); if (!preRun()){return 0;} return run(); } if (pid == -1){ FAIL_MSG("Unable to spawn input process"); - if (needsLock()){playerLock.post();} + //We failed. Release the kra... semaphores! + //post() contains an is-open check already, no need to double-check. + playerLock.unlink(); + pullLock.unlink(); return 2; } + HIGH_MSG("Waiting for child for stream %s", streamName.c_str()); //wait for the process to exit int status; while (waitpid(pid, &status, 0) != pid && errno == EINTR){ @@ -153,35 +189,38 @@ namespace Mist { } continue; } + HIGH_MSG("Done waiting for child for stream %s", streamName.c_str()); //if the exit was clean, don't restart it if (WIFEXITED(status) && (WEXITSTATUS(status) == 0)){ INFO_MSG("Input for stream %s shut down cleanly", streamName.c_str()); break; } - char pageName[NAME_BUFFER_SIZE]; - snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str()); - streamStatus.init(pageName, 1, true, false); - if (streamStatus){streamStatus.mapped[0] = STRMSTAT_INVALID;} + if (playerLock){ + char pageName[NAME_BUFFER_SIZE]; + snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str()); + streamStatus.init(pageName, 1, true, false); + if (streamStatus){streamStatus.mapped[0] = STRMSTAT_INVALID;} + } #if DEBUG >= DLVL_DEVEL - WARN_MSG("Aborting autoclean; this is a development build."); - INFO_MSG("Input for stream %s uncleanly shut down! Aborting restart; this is a development build.", streamName.c_str()); + WARN_MSG("Input for stream %s uncleanly shut down! Aborting restart; this is a development build.", streamName.c_str()); break; #else + WARN_MSG("Input for stream %s uncleanly shut down! Restarting...", streamName.c_str()); onCrash(); - INFO_MSG("Input for stream %s uncleanly shut down! Restarting...", streamName.c_str()); Util::wait(reTimer); reTimer += 1000; #endif } - if (needsLock()){ - playerLock.post(); + + if (playerLock){ playerLock.unlink(); - playerLock.close(); + char pageName[NAME_BUFFER_SIZE]; + snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str()); + streamStatus.init(pageName, 1, true, false); + streamStatus.close(); } - char pageName[NAME_BUFFER_SIZE]; - snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str()); - streamStatus.init(pageName, 1, true, false); - streamStatus.close(); + pullLock.unlink(); + HIGH_MSG("Angel process for %s exiting", streamName.c_str()); return 0; } @@ -207,29 +246,21 @@ namespace Mist { } if (!streamName.size()) { + //If we don't have a stream name, that means we're in stand-alone conversion mode. MEDIUM_MSG("Starting convert"); convert(); } else if (!needsLock()) { + //We have a name and aren't the sole process. That means we're streaming live data to a buffer. MEDIUM_MSG("Starting stream"); stream(); }else{ + //We are the sole process and have a name. That means this is a Buffer or VoD input. MEDIUM_MSG("Starting serve"); serve(); } return 0; } - /// Default crash handler, cleans up Pull semaphore on crashes - void Input::onCrash(){ - if (streamName.size() && !needsLock()) { - //we have a Pull semaphore to clean up, do it - IPC::semaphore pullLock; - pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); - pullLock.close(); - pullLock.unlink(); - } - } - void Input::convert() { //check filename for no - if (config->getString("output") != "-"){ @@ -296,7 +327,7 @@ namespace Mist { userPage.init(userPageName, PLAY_EX_SIZE, true); if (streamStatus){streamStatus.mapped[0] = STRMSTAT_READY;} - DEBUG_MSG(DLVL_DEVEL, "Input for stream %s started", streamName.c_str()); + INFO_MSG("Input for stream %s started", streamName.c_str()); activityCounter = Util::bootSecs(); //main serve loop while (keepRunning()) { @@ -321,7 +352,7 @@ namespace Mist { if (streamStatus){streamStatus.mapped[0] = STRMSTAT_SHUTDOWN;} config->is_active = false; finish(); - DEBUG_MSG(DLVL_DEVEL, "Input for stream %s closing clean", streamName.c_str()); + INFO_MSG("Input for stream %s closing clean", streamName.c_str()); userPage.finishEach(); //end player functionality } @@ -352,27 +383,10 @@ namespace Mist { /// - if there are tracks, register as a non-viewer on the user page of the buffer /// - call getNext() in a loop, buffering packets void Input::stream(){ - IPC::semaphore pullLock; - if(isSingular()){ - pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); - if (!pullLock){ - FAIL_MSG("Could not open pull lock for stream '%s' - aborting!", streamName.c_str()); - return; - } - - if (!pullLock.tryWait()){ - WARN_MSG("A pull process for stream %s is already running", streamName.c_str()); - pullLock.close(); - return; - } - - if (Util::streamAlive(streamName)){ - pullLock.post(); - pullLock.close(); - pullLock.unlink(); - WARN_MSG("Stream already online, cancelling"); - return; - } + + if (Util::streamAlive(streamName)){ + WARN_MSG("Stream already online, cancelling"); + return; } std::map overrides; @@ -382,11 +396,6 @@ namespace Mist { } if (!Util::startInput(streamName, "push://INTERNAL_ONLY:"+config->getString("input"), true, true, overrides)) {//manually override stream url to start the buffer - if(isSingular()){ - pullLock.post(); - pullLock.close(); - pullLock.unlink(); - } WARN_MSG("Could not start buffer, cancelling"); return; } @@ -396,10 +405,6 @@ namespace Mist { if (!openStreamSource()){ FAIL_MSG("Unable to connect to source"); - if(isSingular()){ - pullLock.post(); - pullLock.close(); - } return; } @@ -413,11 +418,6 @@ namespace Mist { if (myMeta.tracks.size() == 0){ nProxy.userClient.finish(); finish(); - if(isSingular()){ - pullLock.post(); - pullLock.close(); - pullLock.unlink(); - } INFO_MSG("No tracks found, cancelling"); return; } @@ -434,11 +434,6 @@ namespace Mist { nProxy.userClient.finish(); finish(); - if(isSingular()){ - pullLock.post(); - pullLock.close(); - pullLock.unlink(); - } INFO_MSG("Stream input %s closing clean; reason: %s", streamName.c_str(), reason.c_str()); return; } diff --git a/src/input/input.h b/src/input/input.h index 30727947..3bfe22af 100644 --- a/src/input/input.h +++ b/src/input/input.h @@ -20,7 +20,7 @@ namespace Mist { public: Input(Util::Config * cfg); virtual int run(); - virtual void onCrash(); + virtual void onCrash(){} virtual int boot(int argc, char * argv[]); virtual ~Input() {}; diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index 027da501..e1c6147e 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -463,7 +463,7 @@ namespace Mist { } //Track is set to "New track request", assign new track id and create shared memory page //This indicates that the 'current key' part of the element is set to contain the original track id from the pushing process - if (value & 0x80000000) { + if (config->is_active && (value & 0x80000000)) { if (value & 0x40000000) { unsigned long finalMap = value & ~0xC0000000; //Register the new track as an active track. @@ -480,9 +480,15 @@ namespace Mist { char tempMetaName[NAME_BUFFER_SIZE]; snprintf(tempMetaName, NAME_BUFFER_SIZE, SHM_TRACK_META, config->getString("streamname").c_str(), finalMap); - tMeta.init(tempMetaName, 8388608, false); + tMeta.init(tempMetaName, 8388608, false, false); + if (!tMeta){continue;}//abort for now if page doesn't exist yet - //The page exist, now we try to read in the metadata of the track + char firstPage[NAME_BUFFER_SIZE]; + snprintf(firstPage, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, config->getString("streamname").c_str(), finalMap); + nProxy.metaPages[finalMap].init(firstPage, SHM_TRACK_INDEX_SIZE, false, false); + if (!nProxy.metaPages[finalMap]){continue;}//abort for now if page doesn't exist yet + + //The pages exist, now we try to read in the metadata of the track //Store the size of the dtsc packet to read. unsigned int len = ntohl(((int *)tMeta.mapped)[1]); @@ -505,11 +511,6 @@ namespace Mist { userConn.setTrackId(index, finalMap); userConn.setKeynum(index, 0x0000); - - char firstPage[NAME_BUFFER_SIZE]; - snprintf(firstPage, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, config->getString("streamname").c_str(), finalMap); - nProxy.metaPages[finalMap].init(firstPage, SHM_TRACK_INDEX_SIZE, false); - //Update the metadata for this track updateTrackMeta(finalMap); hasPush = true; @@ -536,7 +537,7 @@ namespace Mist { } //The track id is set to the value of a track that we are currently negotiating about - if (negotiatingTracks.count(value)) { + if (config->is_active && negotiatingTracks.count(value)) { //If the metadata page for this track is not yet registered, initialize it if (!nProxy.metaPages.count(value) || !nProxy.metaPages[value].mapped) { char tempMetaName[NAME_BUFFER_SIZE]; @@ -740,7 +741,10 @@ namespace Mist { strName = strName.substr(0, (strName.find_first_of("+ "))); IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, false, false); ///< Contains server configuration and capabilities IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - configLock.wait(); + if (!configLock.tryWaitOneSecond()){ + INFO_MSG("Aborting stream config refresh: locking took longer than expected"); + return false; + } DTSC::Scan streamCfg = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("streams").getMember(strName); long long tmpNum; @@ -783,8 +787,6 @@ namespace Mist { resumeMode = tmpNum; } - configLock.post(); - configLock.close(); return true; }