diff --git a/lib/shared_memory.cpp b/lib/shared_memory.cpp index 54200e02..1a6b76d2 100644 --- a/lib/shared_memory.cpp +++ b/lib/shared_memory.cpp @@ -159,28 +159,49 @@ namespace IPC{ sem_post(mySem); #endif --isLocked; +#if DEBUG >= DLVL_DEVEL if (!isLocked){ uint64_t micros = Util::getMicros(lockTime); if (micros > 10000){ INFO_MSG("Semaphore %s was locked for %.3f ms", myName.c_str(), (double)micros / 1000.0); } } +#endif + } + + ///\brief Posts to the semaphore, increases its value by count + void semaphore::post(size_t count){ + for (size_t i = 0; i < count; ++i){post();} } ///\brief Waits for the semaphore, decreases its value by one void semaphore::wait(){ if (*this){ +#if DEBUG >= DLVL_DEVEL + uint64_t preLockTime = Util::getMicros(); +#endif #if defined(__CYGWIN__) || defined(_WIN32) WaitForSingleObject(mySem, INFINITE); #else int tmp; do{tmp = sem_wait(mySem);}while (tmp == -1 && errno == EINTR); #endif +#if DEBUG >= DLVL_DEVEL lockTime = Util::getMicros(); + if (lockTime - preLockTime > 50000){ + INFO_MSG("Semaphore %s took %.3f ms to lock", myName.c_str(), (double)(lockTime-preLockTime) / 1000.0); + } +#endif ++isLocked; } } + ///\brief Waits for the semaphore, decreases its value by count + void semaphore::wait(size_t count){ + for (size_t i = 0; i < count; ++i){wait();} + } + + ///\brief Tries to wait for the semaphore, returns true if successful, false otherwise bool semaphore::tryWait(){ if (!(*this)){return false;} @@ -229,7 +250,7 @@ namespace IPC{ wt.tv_nsec = ms % 1000; result = sem_timedwait(mySem, &wt); #endif - return isLocked = (result == 0); + return (isLocked = (result == 0)); } ///\brief Tries to wait for the semaphore for a single second, returns true if successful, false diff --git a/lib/shared_memory.h b/lib/shared_memory.h index 502db889..35dabaf1 100644 --- a/lib/shared_memory.h +++ b/lib/shared_memory.h @@ -32,6 +32,8 @@ namespace IPC{ int getVal() const; void post(); void wait(); + void post(size_t count); + void wait(size_t count); bool tryWait(); bool tryWait(uint64_t ms); bool tryWaitOneSecond(); diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index 6b7488a8..8abd6254 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -190,7 +190,6 @@ void Controller::sessions_invalidate(const std::string &streamname){ FAIL_MSG("In shutdown procedure - cannot invalidate sessions."); return; } - if (cacheLock){cacheLock->wait();} unsigned int invalidated = 0; unsigned int sessCount = 0; tthread::lock_guard guard(statsMutex); @@ -201,7 +200,6 @@ void Controller::sessions_invalidate(const std::string &streamname){ } } Controller::writeSessionCache(); - if (cacheLock){cacheLock->post();} INFO_MSG("Invalidated %u connections in %u sessions for stream %s", invalidated, sessCount, streamname.c_str()); } @@ -227,7 +225,6 @@ void Controller::sessId_shutdown(const std::string &sessId){ FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions."); return; } - if (cacheLock){cacheLock->wait();} unsigned int murdered = 0; unsigned int sessCount = 0; tthread::lock_guard guard(statsMutex); @@ -239,7 +236,6 @@ void Controller::sessId_shutdown(const std::string &sessId){ } } Controller::writeSessionCache(); - if (cacheLock){cacheLock->post();} INFO_MSG("Shut down %u connections in %u session(s) for ID %s", murdered, sessCount, sessId.c_str()); } @@ -268,7 +264,6 @@ void Controller::tag_shutdown(const std::string &tag){ FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions."); return; } - if (cacheLock){cacheLock->wait();} unsigned int murdered = 0; unsigned int sessCount = 0; tthread::lock_guard guard(statsMutex); @@ -279,7 +274,6 @@ void Controller::tag_shutdown(const std::string &tag){ } } Controller::writeSessionCache(); - if (cacheLock){cacheLock->post();} INFO_MSG("Shut down %u connections in %u session(s) for tag %s", murdered, sessCount, tag.c_str()); } @@ -290,7 +284,6 @@ void Controller::sessions_shutdown(const std::string &streamname, const std::str FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions."); return; } - if (cacheLock){cacheLock->wait();} unsigned int murdered = 0; unsigned int sessCount = 0; tthread::lock_guard guard(statsMutex); @@ -302,7 +295,6 @@ void Controller::sessions_shutdown(const std::string &streamname, const std::str } } Controller::writeSessionCache(); - if (cacheLock){cacheLock->post();} INFO_MSG("Shut down %u connections in %u sessions for stream %s/%s", murdered, sessCount, streamname.c_str(), protocol.c_str()); } @@ -314,6 +306,7 @@ void Controller::sessions_shutdown(const std::string &streamname, const std::str void Controller::writeSessionCache(){ uint32_t shmOffset = 0; if (shmSessions && shmSessions->mapped){ + if (cacheLock){cacheLock->wait(16);} if (sessions.size()){ for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ if (it->second.hasData()){ @@ -331,6 +324,7 @@ void Controller::writeSessionCache(){ } // set a final shmSessions entry to all zeroes memset(shmSessions->mapped + shmOffset, 0, SHM_SESSIONS_ITEM); + if (cacheLock){cacheLock->post(16);} } } @@ -346,9 +340,9 @@ void Controller::SharedMemStats(void *config){ if (shmSessions){delete shmSessions;} shmSessions = new IPC::sharedPage(SHM_SESSIONS, SHM_SESSIONS_SIZE, true); } - cacheLock = new IPC::semaphore(SEM_SESSCACHE, O_CREAT | O_RDWR, ACCESSPERMS, 1); + cacheLock = new IPC::semaphore(SEM_SESSCACHE, O_CREAT | O_RDWR, ACCESSPERMS, 16); cacheLock->unlink(); - cacheLock->open(SEM_SESSCACHE, O_CREAT | O_RDWR, ACCESSPERMS, 1); + cacheLock->open(SEM_SESSCACHE, O_CREAT | O_RDWR, ACCESSPERMS, 16); std::set inactiveStreams; Controller::initState(); bool shiftWrites = true; @@ -379,7 +373,6 @@ void Controller::SharedMemStats(void *config){ tthread::lock_guard guard(Controller::configMutex); tthread::lock_guard guard2(statsMutex); - cacheLock->wait(); /*LTS*/ // parse current users statLeadIn(); COMM_LOOP(statComm, statOnActive(id), statOnDisconnect(id)); @@ -485,7 +478,6 @@ void Controller::SharedMemStats(void *config){ /*LTS-START*/ Controller::writeSessionCache(); Controller::checkServerLimits(); - cacheLock->post(); /*LTS-END*/ } Util::wait(1000); diff --git a/src/output/output.cpp b/src/output/output.cpp index 3ab5214c..8d98557e 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -184,7 +184,7 @@ namespace Mist{ char initialSync = 0; // attempt to load sync status from session cache in shm { - IPC::semaphore cacheLock(SEM_SESSCACHE, O_RDWR, ACCESSPERMS, 1); + IPC::semaphore cacheLock(SEM_SESSCACHE, O_RDWR, ACCESSPERMS, 16); if (cacheLock){cacheLock.wait();} IPC::sharedPage shmSessions(SHM_SESSIONS, SHM_SESSIONS_SIZE, false, false); if (shmSessions.mapped){ @@ -196,7 +196,7 @@ namespace Mist{ const std::string &cName = capa["name"].asStringRef(); while (shmOffset + SHM_SESSIONS_ITEM < SHM_SESSIONS_SIZE){ // compare crc - if (Bit::btohl(shmSessions.mapped + shmOffset) == statComm.getCRC()){ + if (*(uint32_t*)(shmSessions.mapped + shmOffset) == crc){ // compare stream name if (strncmp(shmSessions.mapped + shmOffset + 4, streamName.c_str(), 100) == 0){ // compare connector @@ -204,7 +204,7 @@ namespace Mist{ // compare host if (strncmp(shmSessions.mapped + shmOffset + 124, host.c_str(), 40) == 0){ initialSync = shmSessions.mapped[shmOffset + 164]; - INFO_MSG("Instant-sync from session cache to %u", (unsigned int)initialSync); + HIGH_MSG("Instant-sync from session cache to %u", (unsigned int)initialSync); break; } }