diff --git a/lib/shared_memory.cpp b/lib/shared_memory.cpp index b6b2cf9c..0d921627 100644 --- a/lib/shared_memory.cpp +++ b/lib/shared_memory.cpp @@ -47,7 +47,7 @@ namespace IPC { #else mySem = SEM_FAILED; #endif - isLocked = false; + isLocked = 0; } ///\brief Constructs a named semaphore @@ -61,7 +61,7 @@ namespace IPC { #else mySem = SEM_FAILED; #endif - isLocked = false; + isLocked = 0; open(name, oflag, mode, value, noWait); } @@ -168,13 +168,18 @@ namespace IPC { #endif return; } - if (*this) { #if defined(__CYGWIN__) || defined(_WIN32) - ReleaseMutex(mySem); + ReleaseMutex(mySem); #else - sem_post(mySem); + sem_post(mySem); #endif - isLocked = false; + --isLocked; + if (!isLocked){ + uint64_t micros = Util::getMicros(lockTime); + if (micros > 500){ + INFO_MSG("Semaphore %s was locked for %.3f ms", myName.c_str(), (double)micros/1000.0); + BACKTRACE; + } } } @@ -189,7 +194,8 @@ namespace IPC { tmp = sem_wait(mySem); } while (tmp == -1 && errno == EINTR); #endif - isLocked = true; + lockTime = Util::getMicros(); + ++isLocked; } } @@ -208,7 +214,9 @@ namespace IPC { result = sem_trywait(mySem); } while (result == -1 && errno == EINTR); #endif - return isLocked = (result == 0); + isLocked += (result == 0?1:0); + if (isLocked == 1){lockTime = Util::getMicros();} + return isLocked; } ///\brief Tries to wait for the semaphore for a single second, returns true if successful, false otherwise @@ -225,28 +233,27 @@ namespace IPC { /// \todo (roxlu) test tryWaitOneSecond, shared_memory.cpp uint64_t now = Util::getMicros(); uint64_t timeout = now + 1e6; - while (now < timeout) { - if (0 == sem_trywait(mySem)) { - isLocked = true; - return true; - } + result = 1; + while (result && now < timeout) { + result = sem_trywait(mySem); usleep(100e3); now = Util::getMicros(); } - return false; #else struct timespec wt; wt.tv_sec = 1; wt.tv_nsec = 0; result = sem_timedwait(mySem, &wt); #endif - return isLocked = (result == 0); + isLocked += (result == 0?1:0); + if (isLocked == 1){lockTime = Util::getMicros();} + return isLocked; } ///\brief Closes the currently opened semaphore void semaphore::close() { if (*this) { - if (isLocked){post();} + while (isLocked){post();} #if defined(__CYGWIN__) || defined(_WIN32) CloseHandle(mySem); mySem = 0; @@ -276,7 +283,7 @@ namespace IPC { /// Unlinks the previously opened semaphore, closing it (if open) in the process. void semaphore::unlink() { #if defined(__CYGWIN__) || defined(_WIN32) - if (isLocked){post();} + while (isLocked){post();} #endif #if !defined(__CYGWIN__) && !defined(_WIN32) if (myName.size()){sem_unlink(myName.c_str());} diff --git a/lib/shared_memory.h b/lib/shared_memory.h index d6a8d665..3a96d7bb 100644 --- a/lib/shared_memory.h +++ b/lib/shared_memory.h @@ -86,7 +86,8 @@ namespace IPC { #else sem_t * mySem; #endif - bool isLocked; + unsigned int isLocked; + uint64_t lockTime; std::string myName; }; diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index 1a339e8e..06cc4129 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -1319,7 +1319,7 @@ void Controller::fillActive(JSON::Value & req, JSON::Value & rep, bool onlyNow){ if (streamIndex.mapped){ static char liveSemName[NAME_BUFFER_SIZE]; snprintf(liveSemName, NAME_BUFFER_SIZE, SEM_LIVE, it->c_str()); - IPC::semaphore metaLocker(liveSemName, O_CREAT | O_RDWR, (S_IRWXU|S_IRWXG|S_IRWXO), 1); + IPC::semaphore metaLocker(liveSemName, O_CREAT | O_RDWR, (S_IRWXU|S_IRWXG|S_IRWXO), 8); metaLocker.wait(); DTSC::Scan strm = DTSC::Packet(streamIndex.mapped, streamIndex.len, true).getScan(); uint64_t lms = 0; diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index 5f25d2a7..e0868a37 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -341,9 +341,16 @@ namespace Mist { if (!liveMeta){ static char liveSemName[NAME_BUFFER_SIZE]; snprintf(liveSemName, NAME_BUFFER_SIZE, SEM_LIVE, streamName.c_str()); - liveMeta = new IPC::semaphore(liveSemName, O_CREAT | O_RDWR, ACCESSPERMS, 1); + liveMeta = new IPC::semaphore(liveSemName, O_CREAT | O_RDWR, ACCESSPERMS, 8); } liveMeta->wait(); + liveMeta->wait(); + liveMeta->wait(); + liveMeta->wait(); + liveMeta->wait(); + liveMeta->wait(); + liveMeta->wait(); + liveMeta->wait(); if (!nProxy.metaPages.count(0) || !nProxy.metaPages[0].mapped) { char pageName[NAME_BUFFER_SIZE]; @@ -354,6 +361,13 @@ namespace Mist { myMeta.writeTo(nProxy.metaPages[0].mapped); memset(nProxy.metaPages[0].mapped + myMeta.getSendLen(), 0, (nProxy.metaPages[0].len > myMeta.getSendLen() ? std::min((size_t)(nProxy.metaPages[0].len - myMeta.getSendLen()), (size_t)4) : 0)); liveMeta->post(); + liveMeta->post(); + liveMeta->post(); + liveMeta->post(); + liveMeta->post(); + liveMeta->post(); + liveMeta->post(); + liveMeta->post(); } ///Checks if removing a key from this track is allowed/safe, and if so, removes it. @@ -722,12 +736,12 @@ namespace Mist { updateTrackMeta(finalMap); hasPush = true; } + //Update the metadata to reflect all changes + updateMeta(); //Write the final mapped track number and keyframe number to the user page element //This is used to resume pushing as well as pushing new tracks userConn.setTrackId(index, finalMap); userConn.setKeynum(index, myMeta.tracks[finalMap].keys.size()); - //Update the metadata to reflect all changes - updateMeta(); continue; } //Set the temporary track id for this item, and increase the temporary value for use with the next track @@ -859,6 +873,8 @@ namespace Mist { myMeta.tracks[finalMap].lastms = 0; myMeta.tracks[finalMap].trackID = finalMap; } + //Update the metadata to reflect all changes + updateMeta(); //Write the final mapped track number and keyframe number to the user page element //This is used to resume pushing as well as pushing new tracks userConn.setTrackId(index, finalMap); @@ -867,8 +883,6 @@ namespace Mist { }else{ userConn.setKeynum(index, 0); } - //Update the metadata to reflect all changes - updateMeta(); } //If the track is active, and this is the element responsible for pushing it if (activeTracks.count(value) && pushLocation[value] == data) { @@ -914,7 +928,6 @@ namespace Mist { for (std::map::iterator pageIt = locations.begin(); pageIt != locations.end(); pageIt++) { updateMetaFromPage(tNum, pageIt->first); } - updateMeta(); } void inputBuffer::updateMetaFromPage(unsigned long tNum, unsigned long pageNum) { diff --git a/src/output/output.cpp b/src/output/output.cpp index 5285b636..dd74d081 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -151,7 +151,7 @@ namespace Mist{ if (!myMeta.vod){ static char liveSemName[NAME_BUFFER_SIZE]; snprintf(liveSemName, NAME_BUFFER_SIZE, SEM_LIVE, streamName.c_str()); - liveSem = new IPC::semaphore(liveSemName, O_RDWR, ACCESSPERMS, 1, !myMeta.live); + liveSem = new IPC::semaphore(liveSemName, O_RDWR, ACCESSPERMS, 8, !myMeta.live); if (*liveSem){ liveSem->wait(); }else{ @@ -1828,7 +1828,7 @@ namespace Mist{ IPC::semaphore * liveSem = 0; static char liveSemName[NAME_BUFFER_SIZE]; snprintf(liveSemName, NAME_BUFFER_SIZE, SEM_LIVE, streamName.c_str()); - liveSem = new IPC::semaphore(liveSemName, O_RDWR, ACCESSPERMS, 1, !myMeta.live); + liveSem = new IPC::semaphore(liveSemName, O_RDWR, ACCESSPERMS, 8, !myMeta.live); if (*liveSem){ liveSem->wait(); }else{