From bafe30e234c3bd07a3ba88a8e874e31529411d4b Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 30 Sep 2020 17:24:57 +0200 Subject: [PATCH] Improvements/tests for SEM_LIVE locking --- lib/shared_memory.cpp | 41 ++++++++++++++---------- lib/shared_memory.h | 3 +- src/controller/controller_statistics.cpp | 2 +- src/input/input_buffer.cpp | 31 ++++++++++++++---- src/output/output.cpp | 4 +-- 5 files changed, 53 insertions(+), 28 deletions(-) diff --git a/lib/shared_memory.cpp b/lib/shared_memory.cpp index cfbc34d2..03fb37d8 100644 --- a/lib/shared_memory.cpp +++ b/lib/shared_memory.cpp @@ -46,7 +46,7 @@ namespace IPC { #else mySem = SEM_FAILED; #endif - isLocked = false; + isLocked = 0; } ///\brief Constructs a named semaphore @@ -60,7 +60,7 @@ namespace IPC { #else mySem = SEM_FAILED; #endif - isLocked = false; + isLocked = 0; open(name, oflag, mode, value, noWait); } @@ -167,13 +167,18 @@ namespace IPC { #endif return; } - if (*this) { #if defined(__CYGWIN__) || defined(_WIN32) - ReleaseMutex(mySem); + ReleaseMutex(mySem); #else - sem_post(mySem); + sem_post(mySem); #endif - isLocked = false; + --isLocked; + if (!isLocked){ + uint64_t micros = Util::getMicros(lockTime); + if (micros > 500){ + INFO_MSG("Semaphore %s was locked for %.3f ms", myName.c_str(), (double)micros/1000.0); + BACKTRACE; + } } } @@ -188,7 +193,8 @@ namespace IPC { tmp = sem_wait(mySem); } while (tmp == -1 && errno == EINTR); #endif - isLocked = true; + lockTime = Util::getMicros(); + ++isLocked; } } @@ -207,7 +213,9 @@ namespace IPC { result = sem_trywait(mySem); } while (result == -1 && errno == EINTR); #endif - return isLocked = (result == 0); + isLocked += (result == 0?1:0); + if (isLocked == 1){lockTime = Util::getMicros();} + return isLocked; } ///\brief Tries to wait for the semaphore for a single second, returns true if successful, false otherwise @@ -224,28 +232,27 @@ namespace IPC { /// \todo (roxlu) test tryWaitOneSecond, shared_memory.cpp uint64_t now = Util::getMicros(); uint64_t timeout = now + 1e6; - while (now < timeout) { - if (0 == sem_trywait(mySem)) { - isLocked = true; - return true; - } + result = 1; + while (result && now < timeout) { + result = sem_trywait(mySem); usleep(100e3); now = Util::getMicros(); } - return false; #else struct timespec wt; wt.tv_sec = 1; wt.tv_nsec = 0; result = sem_timedwait(mySem, &wt); #endif - return isLocked = (result == 0); + isLocked += (result == 0?1:0); + if (isLocked == 1){lockTime = Util::getMicros();} + return isLocked; } ///\brief Closes the currently opened semaphore void semaphore::close() { if (*this) { - if (isLocked){post();} + while (isLocked){post();} #if defined(__CYGWIN__) || defined(_WIN32) CloseHandle(mySem); mySem = 0; @@ -275,7 +282,7 @@ namespace IPC { /// Unlinks the previously opened semaphore, closing it (if open) in the process. void semaphore::unlink() { #if defined(__CYGWIN__) || defined(_WIN32) - if (isLocked){post();} + while (isLocked){post();} #endif #if !defined(__CYGWIN__) && !defined(_WIN32) if (myName.size()){sem_unlink(myName.c_str());} diff --git a/lib/shared_memory.h b/lib/shared_memory.h index 64b60796..e0b970f9 100644 --- a/lib/shared_memory.h +++ b/lib/shared_memory.h @@ -85,7 +85,8 @@ namespace IPC { #else sem_t * mySem; #endif - bool isLocked; + unsigned int isLocked; + uint64_t lockTime; std::string myName; }; diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index 1dc6273e..62d27c31 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -931,7 +931,7 @@ void Controller::fillActive(JSON::Value & req, JSON::Value & rep, bool onlyNow){ if (streamIndex.mapped){ static char liveSemName[NAME_BUFFER_SIZE]; snprintf(liveSemName, NAME_BUFFER_SIZE, SEM_LIVE, it->c_str()); - IPC::semaphore metaLocker(liveSemName, O_CREAT | O_RDWR, (S_IRWXU|S_IRWXG|S_IRWXO), 1); + IPC::semaphore metaLocker(liveSemName, O_CREAT | O_RDWR, (S_IRWXU|S_IRWXG|S_IRWXO), 8); metaLocker.wait(); DTSC::Scan strm = DTSC::Packet(streamIndex.mapped, streamIndex.len, true).getScan(); uint64_t lms = 0; diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index c17b255d..44d9808b 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -195,9 +195,16 @@ namespace Mist { if (!liveMeta){ static char liveSemName[NAME_BUFFER_SIZE]; snprintf(liveSemName, NAME_BUFFER_SIZE, SEM_LIVE, streamName.c_str()); - liveMeta = new IPC::semaphore(liveSemName, O_CREAT | O_RDWR, ACCESSPERMS, 1); + liveMeta = new IPC::semaphore(liveSemName, O_CREAT | O_RDWR, ACCESSPERMS, 8); } liveMeta->wait(); + liveMeta->wait(); + liveMeta->wait(); + liveMeta->wait(); + liveMeta->wait(); + liveMeta->wait(); + liveMeta->wait(); + liveMeta->wait(); if (!nProxy.metaPages.count(0) || !nProxy.metaPages[0].mapped) { char pageName[NAME_BUFFER_SIZE]; @@ -208,6 +215,13 @@ namespace Mist { myMeta.writeTo(nProxy.metaPages[0].mapped); memset(nProxy.metaPages[0].mapped + myMeta.getSendLen(), 0, (nProxy.metaPages[0].len > myMeta.getSendLen() ? std::min((size_t)(nProxy.metaPages[0].len - myMeta.getSendLen()), (size_t)4) : 0)); liveMeta->post(); + liveMeta->post(); + liveMeta->post(); + liveMeta->post(); + liveMeta->post(); + liveMeta->post(); + liveMeta->post(); + liveMeta->post(); } ///Checks if removing a key from this track is allowed/safe, and if so, removes it. @@ -515,12 +529,12 @@ namespace Mist { updateTrackMeta(finalMap); hasPush = true; } + //Update the metadata to reflect all changes + updateMeta(); //Write the final mapped track number and keyframe number to the user page element //This is used to resume pushing as well as pushing new tracks userConn.setTrackId(index, finalMap); userConn.setKeynum(index, myMeta.tracks[finalMap].keys.size()); - //Update the metadata to reflect all changes - updateMeta(); continue; } //Set the temporary track id for this item, and increase the temporary value for use with the next track @@ -643,12 +657,16 @@ namespace Mist { myMeta.tracks[finalMap].lastms = 0; myMeta.tracks[finalMap].trackID = finalMap; } + //Update the metadata to reflect all changes + updateMeta(); //Write the final mapped track number and keyframe number to the user page element //This is used to resume pushing as well as pushing new tracks userConn.setTrackId(index, finalMap); - userConn.setKeynum(index, myMeta.tracks[finalMap].keys.size()); - //Update the metadata to reflect all changes - updateMeta(); + if (myMeta.tracks[finalMap].keys.size()){ + userConn.setKeynum(index, myMeta.tracks[finalMap].keys.rbegin()->getNumber()); + }else{ + userConn.setKeynum(index, 0); + } } //If the track is active, and this is the element responsible for pushing it if (activeTracks.count(value) && pushLocation[value] == data) { @@ -694,7 +712,6 @@ namespace Mist { for (std::map::iterator pageIt = locations.begin(); pageIt != locations.end(); pageIt++) { updateMetaFromPage(tNum, pageIt->first); } - updateMeta(); } void inputBuffer::updateMetaFromPage(unsigned long tNum, unsigned long pageNum) { diff --git a/src/output/output.cpp b/src/output/output.cpp index fb003bda..e6ebb125 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -109,7 +109,7 @@ namespace Mist{ if (!myMeta.vod){ static char liveSemName[NAME_BUFFER_SIZE]; snprintf(liveSemName, NAME_BUFFER_SIZE, SEM_LIVE, streamName.c_str()); - liveSem = new IPC::semaphore(liveSemName, O_RDWR, ACCESSPERMS, 1, !myMeta.live); + liveSem = new IPC::semaphore(liveSemName, O_RDWR, ACCESSPERMS, 8, !myMeta.live); if (*liveSem){ liveSem->wait(); }else{ @@ -1411,7 +1411,7 @@ namespace Mist{ IPC::semaphore * liveSem = 0; static char liveSemName[NAME_BUFFER_SIZE]; snprintf(liveSemName, NAME_BUFFER_SIZE, SEM_LIVE, streamName.c_str()); - liveSem = new IPC::semaphore(liveSemName, O_RDWR, ACCESSPERMS, 1, !myMeta.live); + liveSem = new IPC::semaphore(liveSemName, O_RDWR, ACCESSPERMS, 8, !myMeta.live); if (*liveSem){ liveSem->wait(); }else{