diff --git a/lib/config.cpp b/lib/config.cpp index cbfa9b85..9d931e3e 100644 --- a/lib/config.cpp +++ b/lib/config.cpp @@ -422,7 +422,7 @@ void Util::Config::activate() { struct sigaction cur_action; new_action.sa_sigaction = signal_handler; sigemptyset(&new_action.sa_mask); - new_action.sa_flags = 0; + new_action.sa_flags = SA_SIGINFO; sigaction(SIGINT, &new_action, NULL); sigaction(SIGHUP, &new_action, NULL); sigaction(SIGTERM, &new_action, NULL); @@ -444,6 +444,10 @@ void Util::Config::signal_handler(int signum, siginfo_t * sigInfo, void * ignore case SIGHUP: case SIGTERM: if (serv_sock_pointer){serv_sock_pointer->close();} +#if DEBUG >= DLVL_DEVEL + static int ctr = 0; + if (!is_active && ++ctr > 4){BACKTRACE;} +#endif is_active = false; default: switch (sigInfo->si_code){ diff --git a/lib/shared_memory.cpp b/lib/shared_memory.cpp index 0b32f20c..b13efe21 100644 --- a/lib/shared_memory.cpp +++ b/lib/shared_memory.cpp @@ -94,6 +94,7 @@ namespace IPC { #else mySem = SEM_FAILED; #endif + isLocked = false; } ///\brief Constructs a named semaphore @@ -107,6 +108,7 @@ namespace IPC { #else mySem = SEM_FAILED; #endif + isLocked = false; open(name, oflag, mode, value, noWait); } @@ -179,9 +181,9 @@ namespace IPC { } #endif } - if (!(*this)) { + if (*this) { + myName = (char *)name; } - myName = (char *)name; } ///\brief Returns the current value of the semaphore @@ -198,12 +200,20 @@ namespace IPC { ///\brief Posts to the semaphore, increases its value by one void semaphore::post() { + if (!*this || !isLocked){ + FAIL_MSG("Attempted to unlock a non-locked semaphore: '%s'!", myName.c_str()); +#if DEBUG >= DLVL_DEVEL + BACKTRACE; +#endif + return; + } if (*this) { #if defined(__CYGWIN__) || defined(_WIN32) ReleaseMutex(mySem); #else sem_post(mySem); #endif + isLocked = false; } } @@ -218,6 +228,7 @@ namespace IPC { tmp = sem_wait(mySem); } while (tmp == -1 && errno == EINTR); #endif + isLocked = true; } } @@ -236,7 +247,7 @@ namespace IPC { result = sem_trywait(mySem); } while (result == -1 && errno == EINTR); #endif - return (result == 0); + return isLocked = (result == 0); } ///\brief Tries to wait for the semaphore for a single second, returns true if successful, false otherwise @@ -255,6 +266,7 @@ namespace IPC { long long unsigned int timeout = now + 1e6; while (now < timeout) { if (0 == sem_trywait(mySem)) { + isLocked = true; return true; } usleep(100e3); @@ -267,12 +279,28 @@ namespace IPC { wt.tv_nsec = 0; result = sem_timedwait(mySem, &wt); #endif - return (result == 0); + return isLocked = (result == 0); } ///\brief Closes the currently opened semaphore void semaphore::close() { if (*this) { + if (isLocked){post();} +#if defined(__CYGWIN__) || defined(_WIN32) + CloseHandle(mySem); + mySem = 0; +#else + sem_close(mySem); + mySem = SEM_FAILED; +#endif + } + myName.clear(); + } + + /// Closes the semaphore, without unlocking it first. + /// Intended to be called from forked child processes, to drop the reference to the semaphore. + void semaphore::abandon() { + if (*this) { #if defined(__CYGWIN__) || defined(_WIN32) CloseHandle(mySem); mySem = 0; @@ -281,19 +309,28 @@ namespace IPC { mySem = SEM_FAILED; #endif } - } - - ///\brief Unlinks the previously opened semaphore - void semaphore::unlink() { - close(); -#if !defined(__CYGWIN__) && !defined(_WIN32) - if (myName.size()) { - sem_unlink(myName.c_str()); - } -#endif myName.clear(); } + /// Unlinks the previously opened semaphore, closing it (if open) in the process. + void semaphore::unlink() { +#if defined(__CYGWIN__) || defined(_WIN32) + if (isLocked){post();} +#endif +#if !defined(__CYGWIN__) && !defined(_WIN32) + if (myName.size()){sem_unlink(myName.c_str());} +#endif + if (*this) { +#if defined(__CYGWIN__) || defined(_WIN32) + CloseHandle(mySem); + mySem = 0; +#else + sem_close(mySem); + mySem = SEM_FAILED; +#endif + } + myName.clear(); + } #if defined(__CYGWIN__) || defined(_WIN32) SECURITY_ATTRIBUTES semaphore::getSecurityAttributes() { @@ -369,7 +406,7 @@ namespace IPC { ///\brief Unmaps a shared page if allowed void sharedPage::unmap() { - if (mapped && len) { + if (mapped) { #if defined(__CYGWIN__) || defined(_WIN32) //under Cygwin, the mapped location is shifted by 4 to contain the page size. UnmapViewOfFile(mapped - 4); @@ -801,26 +838,19 @@ namespace IPC { baseName = "/" + name; payLen = len; hasCounter = withCounter; - mySemaphore.open(baseName.c_str(), O_CREAT | O_EXCL | O_RDWR, ACCESSPERMS, 1); - if (!mySemaphore) { - mySemaphore.open(baseName.c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); - } + mySemaphore.open(baseName.c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); if (!mySemaphore) { DEBUG_MSG(DLVL_FAIL, "Creating semaphore failed: %s", strerror(errno)); return; + }else{ + semGuard tmpGuard(&mySemaphore); + amount = 0; + newPage(); } - if (!mySemaphore.tryWaitOneSecond()){ - WARN_MSG("Force unlocking sharedServer semaphore to prevent deadlock"); - } - mySemaphore.post(); - semGuard tmpGuard(&mySemaphore); - amount = 0; - newPage(); } ///\brief The deconstructor sharedServer::~sharedServer() { - mySemaphore.close(); mySemaphore.unlink(); } @@ -1255,7 +1285,7 @@ namespace IPC { if (!hasCounter) { return (myPage.mapped != 0); } - if (myPage.mapped){ + if (myPage.mapped && offsetOnPage >= 0){ return (myPage.mapped[offsetOnPage] & 0x7F) < 60; } return false; diff --git a/lib/shared_memory.h b/lib/shared_memory.h index 2e202b0e..5044b069 100644 --- a/lib/shared_memory.h +++ b/lib/shared_memory.h @@ -76,6 +76,7 @@ namespace IPC { bool tryWait(); bool tryWaitOneSecond(); void close(); + void abandon(); void unlink(); private: #if defined(__CYGWIN__) || defined(_WIN32) @@ -85,6 +86,7 @@ namespace IPC { #else sem_t * mySem; #endif + bool isLocked; std::string myName; }; diff --git a/src/controller/controller.cpp b/src/controller/controller.cpp index 295c8a77..dff64b46 100644 --- a/src/controller/controller.cpp +++ b/src/controller/controller.cpp @@ -91,7 +91,8 @@ void statusMonitor(void *np){ WARN_MSG("Configuration semaphore was stuck. Force-unlocking it and re-writing config."); changed = true; } - configLock.post(); + configLock.unlink(); + configLock.open(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); if (changed || Controller::configChanged){ Controller::writeConfig(); Controller::configChanged = false; diff --git a/src/input/input.cpp b/src/input/input.cpp index e192f599..7e3c0542 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -231,41 +231,77 @@ namespace Mist { } IPC::semaphore playerLock; - if (needsLock() && streamName.size()){ - char semName[NAME_BUFFER_SIZE]; - snprintf(semName, NAME_BUFFER_SIZE, SEM_INPUT, streamName.c_str()); - playerLock.open(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1); - if (!playerLock.tryWait()){ - DEBUG_MSG(DLVL_DEVEL, "A player for stream %s is already running", streamName.c_str()); - return 1; + IPC::semaphore pullLock; + + //If we're not converting, we might need a lock. + if (streamName.size()){ + if (needsLock()){ + //needsLock() == true means this input is the sole responsible input for a stream + //That means it's MistInBuffer for live, or the actual input binary for VoD + //For these cases, we lock the SEM_INPUT semaphore. + char semName[NAME_BUFFER_SIZE]; + snprintf(semName, NAME_BUFFER_SIZE, SEM_INPUT, streamName.c_str()); + playerLock.open(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1); + if (!playerLock.tryWait()){ + INFO_MSG("A player for stream %s is already running", streamName.c_str()); + playerLock.close(); + return 1; + } + char pageName[NAME_BUFFER_SIZE]; + snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str()); + streamStatus.init(pageName, 1, true, false); + if (streamStatus){streamStatus.mapped[0] = STRMSTAT_INIT;} + streamStatus.master = false; + streamStatus.close(); + }else{ + //needsLock() == false means this binary will itself start the sole responsible input + //So, we definitely do NOT lock SEM_INPUT, since the child process will do that later. + //However, most of these processes are singular, meaning they expect to be the only source of data. + //To prevent multiple singular processes starting, we use the MstPull semaphore if this input + //is indeed a singular input type. + if (isSingular()){ + pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); + if (!pullLock){ + FAIL_MSG("Could not open pull lock for stream '%s' - aborting!", streamName.c_str()); + return 1; + } + if (!pullLock.tryWait()){ + WARN_MSG("A pull process for stream %s is already running", streamName.c_str()); + pullLock.close(); + return 1; + } + } } - char pageName[NAME_BUFFER_SIZE]; - snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str()); - streamStatus.init(pageName, 1, true, false); - if (streamStatus){streamStatus.mapped[0] = STRMSTAT_INIT;} - streamStatus.master = false; - streamStatus.close(); } + config->activate(); uint64_t reTimer = 0; while (config->is_active){ pid_t pid = fork(); if (pid == 0){ - //Re-init streamStatus, previously closed - char pageName[NAME_BUFFER_SIZE]; - snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str()); - streamStatus.init(pageName, 1, true, false); - streamStatus.master = false; - if (streamStatus){streamStatus.mapped[0] = STRMSTAT_INIT;} - if (needsLock()){playerLock.close();} + if (playerLock){ + //Re-init streamStatus, previously closed + char pageName[NAME_BUFFER_SIZE]; + snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str()); + streamStatus.init(pageName, 1, true, false); + streamStatus.master = false; + if (streamStatus){streamStatus.mapped[0] = STRMSTAT_INIT;} + } + //Abandon all semaphores, ye who enter here. + playerLock.abandon(); + pullLock.abandon(); if (!preRun()){return 0;} return run(); } if (pid == -1){ FAIL_MSG("Unable to spawn input process"); - if (needsLock()){playerLock.post();} + //We failed. Release the kra... semaphores! + //post() contains an is-open check already, no need to double-check. + playerLock.unlink(); + pullLock.unlink(); return 2; } + HIGH_MSG("Waiting for child for stream %s", streamName.c_str()); //wait for the process to exit int status; while (waitpid(pid, &status, 0) != pid && errno == EINTR){ @@ -275,35 +311,38 @@ namespace Mist { } continue; } + HIGH_MSG("Done waiting for child for stream %s", streamName.c_str()); //if the exit was clean, don't restart it if (WIFEXITED(status) && (WEXITSTATUS(status) == 0)){ INFO_MSG("Input for stream %s shut down cleanly", streamName.c_str()); break; } - char pageName[NAME_BUFFER_SIZE]; - snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str()); - streamStatus.init(pageName, 1, true, false); - if (streamStatus){streamStatus.mapped[0] = STRMSTAT_INVALID;} + if (playerLock){ + char pageName[NAME_BUFFER_SIZE]; + snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str()); + streamStatus.init(pageName, 1, true, false); + if (streamStatus){streamStatus.mapped[0] = STRMSTAT_INVALID;} + } #if DEBUG >= DLVL_DEVEL - WARN_MSG("Aborting autoclean; this is a development build."); - INFO_MSG("Input for stream %s uncleanly shut down! Aborting restart; this is a development build.", streamName.c_str()); + WARN_MSG("Input for stream %s uncleanly shut down! Aborting restart; this is a development build.", streamName.c_str()); break; #else + WARN_MSG("Input for stream %s uncleanly shut down! Restarting...", streamName.c_str()); onCrash(); - INFO_MSG("Input for stream %s uncleanly shut down! Restarting...", streamName.c_str()); Util::wait(reTimer); reTimer += 1000; #endif } - if (needsLock()){ - playerLock.post(); + + if (playerLock){ playerLock.unlink(); - playerLock.close(); + char pageName[NAME_BUFFER_SIZE]; + snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str()); + streamStatus.init(pageName, 1, true, false); + streamStatus.close(); } - char pageName[NAME_BUFFER_SIZE]; - snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str()); - streamStatus.init(pageName, 1, true, false); - streamStatus.close(); + pullLock.unlink(); + HIGH_MSG("Angel process for %s exiting", streamName.c_str()); return 0; } @@ -329,29 +368,21 @@ namespace Mist { } if (!streamName.size()) { + //If we don't have a stream name, that means we're in stand-alone conversion mode. MEDIUM_MSG("Starting convert"); convert(); } else if (!needsLock()) { + //We have a name and aren't the sole process. That means we're streaming live data to a buffer. MEDIUM_MSG("Starting stream"); stream(); }else{ + //We are the sole process and have a name. That means this is a Buffer or VoD input. MEDIUM_MSG("Starting serve"); serve(); } return 0; } - /// Default crash handler, cleans up Pull semaphore on crashes - void Input::onCrash(){ - if (streamName.size() && !needsLock()) { - //we have a Pull semaphore to clean up, do it - IPC::semaphore pullLock; - pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); - pullLock.close(); - pullLock.unlink(); - } - } - void Input::convert() { //check filename for no - if (config->getString("output") != "-") { @@ -433,7 +464,7 @@ namespace Mist { /*LTS-END*/ if (streamStatus){streamStatus.mapped[0] = STRMSTAT_READY;} - DEBUG_MSG(DLVL_DEVEL, "Input for stream %s started", streamName.c_str()); + INFO_MSG("Input for stream %s started", streamName.c_str()); activityCounter = Util::bootSecs(); //main serve loop while (keepRunning()) { @@ -458,7 +489,7 @@ namespace Mist { if (streamStatus){streamStatus.mapped[0] = STRMSTAT_SHUTDOWN;} config->is_active = false; finish(); - DEBUG_MSG(DLVL_DEVEL, "Input for stream %s closing clean", streamName.c_str()); + INFO_MSG("Input for stream %s closing clean", streamName.c_str()); userPage.finishEach(); //end player functionality } @@ -502,27 +533,10 @@ namespace Mist { /// - if there are tracks, register as a non-viewer on the user page of the buffer /// - call getNext() in a loop, buffering packets void Input::stream(){ - IPC::semaphore pullLock; - if(isSingular()){ - pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); - if (!pullLock){ - FAIL_MSG("Could not open pull lock for stream '%s' - aborting!", streamName.c_str()); - return; - } - - if (!pullLock.tryWait()){ - WARN_MSG("A pull process for stream %s is already running", streamName.c_str()); - pullLock.close(); - return; - } - - if (Util::streamAlive(streamName)){ - pullLock.post(); - pullLock.close(); - pullLock.unlink(); - WARN_MSG("Stream already online, cancelling"); - return; - } + + if (Util::streamAlive(streamName)){ + WARN_MSG("Stream already online, cancelling"); + return; } std::map overrides; @@ -532,11 +546,6 @@ namespace Mist { } if (!Util::startInput(streamName, "push://INTERNAL_ONLY:"+config->getString("input"), true, true, overrides)) {//manually override stream url to start the buffer - if(isSingular()){ - pullLock.post(); - pullLock.close(); - pullLock.unlink(); - } WARN_MSG("Could not start buffer, cancelling"); return; } @@ -546,10 +555,6 @@ namespace Mist { if (!openStreamSource()){ FAIL_MSG("Unable to connect to source"); - if(isSingular()){ - pullLock.post(); - pullLock.close(); - } return; } @@ -563,11 +568,6 @@ namespace Mist { if (myMeta.tracks.size() == 0){ nProxy.userClient.finish(); finish(); - if(isSingular()){ - pullLock.post(); - pullLock.close(); - pullLock.unlink(); - } INFO_MSG("No tracks found, cancelling"); return; } @@ -584,11 +584,6 @@ namespace Mist { nProxy.userClient.finish(); finish(); - if(isSingular()){ - pullLock.post(); - pullLock.close(); - pullLock.unlink(); - } INFO_MSG("Stream input %s closing clean; reason: %s", streamName.c_str(), reason.c_str()); return; } diff --git a/src/input/input.h b/src/input/input.h index 3ae390b8..cc1a9ec0 100644 --- a/src/input/input.h +++ b/src/input/input.h @@ -21,7 +21,7 @@ namespace Mist { public: Input(Util::Config * cfg); virtual int run(); - virtual void onCrash(); + virtual void onCrash(){} virtual int boot(int argc, char * argv[]); virtual ~Input() {}; diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index 2fee7e57..17d0b4d9 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -643,7 +643,7 @@ namespace Mist { } //Track is set to "New track request", assign new track id and create shared memory page //This indicates that the 'current key' part of the element is set to contain the original track id from the pushing process - if (value & 0x80000000) { + if (config->is_active && (value & 0x80000000)) { if (value & 0x40000000) { unsigned long finalMap = value & ~0xC0000000; //Register the new track as an active track. @@ -660,9 +660,15 @@ namespace Mist { char tempMetaName[NAME_BUFFER_SIZE]; snprintf(tempMetaName, NAME_BUFFER_SIZE, SHM_TRACK_META, config->getString("streamname").c_str(), finalMap); - tMeta.init(tempMetaName, 8388608, false); + tMeta.init(tempMetaName, 8388608, false, false); + if (!tMeta){continue;}//abort for now if page doesn't exist yet - //The page exist, now we try to read in the metadata of the track + char firstPage[NAME_BUFFER_SIZE]; + snprintf(firstPage, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, config->getString("streamname").c_str(), finalMap); + nProxy.metaPages[finalMap].init(firstPage, SHM_TRACK_INDEX_SIZE, false, false); + if (!nProxy.metaPages[finalMap]){continue;}//abort for now if page doesn't exist yet + + //The pages exist, now we try to read in the metadata of the track //Store the size of the dtsc packet to read. unsigned int len = ntohl(((int *)tMeta.mapped)[1]); @@ -685,11 +691,6 @@ namespace Mist { userConn.setTrackId(index, finalMap); userConn.setKeynum(index, 0x0000); - - char firstPage[NAME_BUFFER_SIZE]; - snprintf(firstPage, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, config->getString("streamname").c_str(), finalMap); - nProxy.metaPages[finalMap].init(firstPage, SHM_TRACK_INDEX_SIZE, false); - //Update the metadata for this track updateTrackMeta(finalMap); hasPush = true; @@ -716,7 +717,7 @@ namespace Mist { } //The track id is set to the value of a track that we are currently negotiating about - if (negotiatingTracks.count(value)) { + if (config->is_active && negotiatingTracks.count(value)) { //If the metadata page for this track is not yet registered, initialize it if (!nProxy.metaPages.count(value) || !nProxy.metaPages[value].mapped) { char tempMetaName[NAME_BUFFER_SIZE]; @@ -947,7 +948,10 @@ namespace Mist { strName = strName.substr(0, (strName.find_first_of("+ "))); IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, false, false); ///< Contains server configuration and capabilities IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - configLock.wait(); + if (!configLock.tryWaitOneSecond()){ + INFO_MSG("Aborting stream config refresh: locking took longer than expected"); + return false; + } DTSC::Scan streamCfg = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("streams").getMember(strName); long long tmpNum; @@ -1030,8 +1034,6 @@ namespace Mist { segmentSize = tmpNum; } /*LTS-END*/ - configLock.post(); - configLock.close(); return true; }