Session cache fix, semaphore verbosity only in dev builds

This commit is contained in:
Thulinma 2020-10-29 23:38:56 +01:00
parent 854e71c06f
commit fb56a8f9e8
4 changed files with 31 additions and 16 deletions

View file

@ -159,28 +159,49 @@ namespace IPC{
sem_post(mySem); sem_post(mySem);
#endif #endif
--isLocked; --isLocked;
#if DEBUG >= DLVL_DEVEL
if (!isLocked){ if (!isLocked){
uint64_t micros = Util::getMicros(lockTime); uint64_t micros = Util::getMicros(lockTime);
if (micros > 10000){ if (micros > 10000){
INFO_MSG("Semaphore %s was locked for %.3f ms", myName.c_str(), (double)micros / 1000.0); INFO_MSG("Semaphore %s was locked for %.3f ms", myName.c_str(), (double)micros / 1000.0);
} }
} }
#endif
}
///\brief Posts to the semaphore, increases its value by count
void semaphore::post(size_t count){
for (size_t i = 0; i < count; ++i){post();}
} }
///\brief Waits for the semaphore, decreases its value by one ///\brief Waits for the semaphore, decreases its value by one
void semaphore::wait(){ void semaphore::wait(){
if (*this){ if (*this){
#if DEBUG >= DLVL_DEVEL
uint64_t preLockTime = Util::getMicros();
#endif
#if defined(__CYGWIN__) || defined(_WIN32) #if defined(__CYGWIN__) || defined(_WIN32)
WaitForSingleObject(mySem, INFINITE); WaitForSingleObject(mySem, INFINITE);
#else #else
int tmp; int tmp;
do{tmp = sem_wait(mySem);}while (tmp == -1 && errno == EINTR); do{tmp = sem_wait(mySem);}while (tmp == -1 && errno == EINTR);
#endif #endif
#if DEBUG >= DLVL_DEVEL
lockTime = Util::getMicros(); lockTime = Util::getMicros();
if (lockTime - preLockTime > 50000){
INFO_MSG("Semaphore %s took %.3f ms to lock", myName.c_str(), (double)(lockTime-preLockTime) / 1000.0);
}
#endif
++isLocked; ++isLocked;
} }
} }
///\brief Waits for the semaphore, decreases its value by count
void semaphore::wait(size_t count){
for (size_t i = 0; i < count; ++i){wait();}
}
///\brief Tries to wait for the semaphore, returns true if successful, false otherwise ///\brief Tries to wait for the semaphore, returns true if successful, false otherwise
bool semaphore::tryWait(){ bool semaphore::tryWait(){
if (!(*this)){return false;} if (!(*this)){return false;}
@ -229,7 +250,7 @@ namespace IPC{
wt.tv_nsec = ms % 1000; wt.tv_nsec = ms % 1000;
result = sem_timedwait(mySem, &wt); result = sem_timedwait(mySem, &wt);
#endif #endif
return isLocked = (result == 0); return (isLocked = (result == 0));
} }
///\brief Tries to wait for the semaphore for a single second, returns true if successful, false ///\brief Tries to wait for the semaphore for a single second, returns true if successful, false

View file

@ -32,6 +32,8 @@ namespace IPC{
int getVal() const; int getVal() const;
void post(); void post();
void wait(); void wait();
void post(size_t count);
void wait(size_t count);
bool tryWait(); bool tryWait();
bool tryWait(uint64_t ms); bool tryWait(uint64_t ms);
bool tryWaitOneSecond(); bool tryWaitOneSecond();

View file

@ -190,7 +190,6 @@ void Controller::sessions_invalidate(const std::string &streamname){
FAIL_MSG("In shutdown procedure - cannot invalidate sessions."); FAIL_MSG("In shutdown procedure - cannot invalidate sessions.");
return; return;
} }
if (cacheLock){cacheLock->wait();}
unsigned int invalidated = 0; unsigned int invalidated = 0;
unsigned int sessCount = 0; unsigned int sessCount = 0;
tthread::lock_guard<tthread::mutex> guard(statsMutex); tthread::lock_guard<tthread::mutex> guard(statsMutex);
@ -201,7 +200,6 @@ void Controller::sessions_invalidate(const std::string &streamname){
} }
} }
Controller::writeSessionCache(); Controller::writeSessionCache();
if (cacheLock){cacheLock->post();}
INFO_MSG("Invalidated %u connections in %u sessions for stream %s", invalidated, sessCount, INFO_MSG("Invalidated %u connections in %u sessions for stream %s", invalidated, sessCount,
streamname.c_str()); streamname.c_str());
} }
@ -227,7 +225,6 @@ void Controller::sessId_shutdown(const std::string &sessId){
FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions."); FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions.");
return; return;
} }
if (cacheLock){cacheLock->wait();}
unsigned int murdered = 0; unsigned int murdered = 0;
unsigned int sessCount = 0; unsigned int sessCount = 0;
tthread::lock_guard<tthread::mutex> guard(statsMutex); tthread::lock_guard<tthread::mutex> guard(statsMutex);
@ -239,7 +236,6 @@ void Controller::sessId_shutdown(const std::string &sessId){
} }
} }
Controller::writeSessionCache(); Controller::writeSessionCache();
if (cacheLock){cacheLock->post();}
INFO_MSG("Shut down %u connections in %u session(s) for ID %s", murdered, sessCount, sessId.c_str()); INFO_MSG("Shut down %u connections in %u session(s) for ID %s", murdered, sessCount, sessId.c_str());
} }
@ -268,7 +264,6 @@ void Controller::tag_shutdown(const std::string &tag){
FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions."); FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions.");
return; return;
} }
if (cacheLock){cacheLock->wait();}
unsigned int murdered = 0; unsigned int murdered = 0;
unsigned int sessCount = 0; unsigned int sessCount = 0;
tthread::lock_guard<tthread::mutex> guard(statsMutex); tthread::lock_guard<tthread::mutex> guard(statsMutex);
@ -279,7 +274,6 @@ void Controller::tag_shutdown(const std::string &tag){
} }
} }
Controller::writeSessionCache(); Controller::writeSessionCache();
if (cacheLock){cacheLock->post();}
INFO_MSG("Shut down %u connections in %u session(s) for tag %s", murdered, sessCount, tag.c_str()); INFO_MSG("Shut down %u connections in %u session(s) for tag %s", murdered, sessCount, tag.c_str());
} }
@ -290,7 +284,6 @@ void Controller::sessions_shutdown(const std::string &streamname, const std::str
FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions."); FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions.");
return; return;
} }
if (cacheLock){cacheLock->wait();}
unsigned int murdered = 0; unsigned int murdered = 0;
unsigned int sessCount = 0; unsigned int sessCount = 0;
tthread::lock_guard<tthread::mutex> guard(statsMutex); tthread::lock_guard<tthread::mutex> guard(statsMutex);
@ -302,7 +295,6 @@ void Controller::sessions_shutdown(const std::string &streamname, const std::str
} }
} }
Controller::writeSessionCache(); Controller::writeSessionCache();
if (cacheLock){cacheLock->post();}
INFO_MSG("Shut down %u connections in %u sessions for stream %s/%s", murdered, sessCount, INFO_MSG("Shut down %u connections in %u sessions for stream %s/%s", murdered, sessCount,
streamname.c_str(), protocol.c_str()); streamname.c_str(), protocol.c_str());
} }
@ -314,6 +306,7 @@ void Controller::sessions_shutdown(const std::string &streamname, const std::str
void Controller::writeSessionCache(){ void Controller::writeSessionCache(){
uint32_t shmOffset = 0; uint32_t shmOffset = 0;
if (shmSessions && shmSessions->mapped){ if (shmSessions && shmSessions->mapped){
if (cacheLock){cacheLock->wait(16);}
if (sessions.size()){ if (sessions.size()){
for (std::map<sessIndex, statSession>::iterator it = sessions.begin(); it != sessions.end(); it++){ for (std::map<sessIndex, statSession>::iterator it = sessions.begin(); it != sessions.end(); it++){
if (it->second.hasData()){ if (it->second.hasData()){
@ -331,6 +324,7 @@ void Controller::writeSessionCache(){
} }
// set a final shmSessions entry to all zeroes // set a final shmSessions entry to all zeroes
memset(shmSessions->mapped + shmOffset, 0, SHM_SESSIONS_ITEM); memset(shmSessions->mapped + shmOffset, 0, SHM_SESSIONS_ITEM);
if (cacheLock){cacheLock->post(16);}
} }
} }
@ -346,9 +340,9 @@ void Controller::SharedMemStats(void *config){
if (shmSessions){delete shmSessions;} if (shmSessions){delete shmSessions;}
shmSessions = new IPC::sharedPage(SHM_SESSIONS, SHM_SESSIONS_SIZE, true); shmSessions = new IPC::sharedPage(SHM_SESSIONS, SHM_SESSIONS_SIZE, true);
} }
cacheLock = new IPC::semaphore(SEM_SESSCACHE, O_CREAT | O_RDWR, ACCESSPERMS, 1); cacheLock = new IPC::semaphore(SEM_SESSCACHE, O_CREAT | O_RDWR, ACCESSPERMS, 16);
cacheLock->unlink(); cacheLock->unlink();
cacheLock->open(SEM_SESSCACHE, O_CREAT | O_RDWR, ACCESSPERMS, 1); cacheLock->open(SEM_SESSCACHE, O_CREAT | O_RDWR, ACCESSPERMS, 16);
std::set<std::string> inactiveStreams; std::set<std::string> inactiveStreams;
Controller::initState(); Controller::initState();
bool shiftWrites = true; bool shiftWrites = true;
@ -379,7 +373,6 @@ void Controller::SharedMemStats(void *config){
tthread::lock_guard<tthread::mutex> guard(Controller::configMutex); tthread::lock_guard<tthread::mutex> guard(Controller::configMutex);
tthread::lock_guard<tthread::mutex> guard2(statsMutex); tthread::lock_guard<tthread::mutex> guard2(statsMutex);
cacheLock->wait(); /*LTS*/
// parse current users // parse current users
statLeadIn(); statLeadIn();
COMM_LOOP(statComm, statOnActive(id), statOnDisconnect(id)); COMM_LOOP(statComm, statOnActive(id), statOnDisconnect(id));
@ -485,7 +478,6 @@ void Controller::SharedMemStats(void *config){
/*LTS-START*/ /*LTS-START*/
Controller::writeSessionCache(); Controller::writeSessionCache();
Controller::checkServerLimits(); Controller::checkServerLimits();
cacheLock->post();
/*LTS-END*/ /*LTS-END*/
} }
Util::wait(1000); Util::wait(1000);

View file

@ -184,7 +184,7 @@ namespace Mist{
char initialSync = 0; char initialSync = 0;
// attempt to load sync status from session cache in shm // attempt to load sync status from session cache in shm
{ {
IPC::semaphore cacheLock(SEM_SESSCACHE, O_RDWR, ACCESSPERMS, 1); IPC::semaphore cacheLock(SEM_SESSCACHE, O_RDWR, ACCESSPERMS, 16);
if (cacheLock){cacheLock.wait();} if (cacheLock){cacheLock.wait();}
IPC::sharedPage shmSessions(SHM_SESSIONS, SHM_SESSIONS_SIZE, false, false); IPC::sharedPage shmSessions(SHM_SESSIONS, SHM_SESSIONS_SIZE, false, false);
if (shmSessions.mapped){ if (shmSessions.mapped){
@ -196,7 +196,7 @@ namespace Mist{
const std::string &cName = capa["name"].asStringRef(); const std::string &cName = capa["name"].asStringRef();
while (shmOffset + SHM_SESSIONS_ITEM < SHM_SESSIONS_SIZE){ while (shmOffset + SHM_SESSIONS_ITEM < SHM_SESSIONS_SIZE){
// compare crc // compare crc
if (Bit::btohl(shmSessions.mapped + shmOffset) == statComm.getCRC()){ if (*(uint32_t*)(shmSessions.mapped + shmOffset) == crc){
// compare stream name // compare stream name
if (strncmp(shmSessions.mapped + shmOffset + 4, streamName.c_str(), 100) == 0){ if (strncmp(shmSessions.mapped + shmOffset + 4, streamName.c_str(), 100) == 0){
// compare connector // compare connector
@ -204,7 +204,7 @@ namespace Mist{
// compare host // compare host
if (strncmp(shmSessions.mapped + shmOffset + 124, host.c_str(), 40) == 0){ if (strncmp(shmSessions.mapped + shmOffset + 124, host.c_str(), 40) == 0){
initialSync = shmSessions.mapped[shmOffset + 164]; initialSync = shmSessions.mapped[shmOffset + 164];
INFO_MSG("Instant-sync from session cache to %u", (unsigned int)initialSync); HIGH_MSG("Instant-sync from session cache to %u", (unsigned int)initialSync);
break; break;
} }
} }