Hopefully fixed Windows

This commit is contained in:
Thulinma 2017-05-17 12:09:04 +02:00
parent 6e21e70fae
commit 3bad5c2161
2 changed files with 26 additions and 21 deletions

View file

@ -44,9 +44,9 @@ std::map<std::string, unsigned int> Controller::activeStreams;
unsigned int Controller::maxConnsPerIP = 0; unsigned int Controller::maxConnsPerIP = 0;
/// Session cache shared memory page /// Session cache shared memory page
IPC::sharedPage shmSessions(SHM_SESSIONS, SHM_SESSIONS_SIZE, true); IPC::sharedPage * shmSessions = 0;
/// Lock for the session cache shared memory page /// Lock for the session cache shared memory page
IPC::semaphore Controller::cacheLock(SEM_SESSCACHE, O_CREAT | O_RDWR, ACCESSPERMS, 1); IPC::semaphore * cacheLock = 0;
//For server-wide totals. Local to this file only. //For server-wide totals. Local to this file only.
struct streamTotals { struct streamTotals {
@ -138,7 +138,7 @@ void Controller::sessions_invalidate(const std::string & streamname){
FAIL_MSG("In shutdown procedure - cannot invalidate sessions."); FAIL_MSG("In shutdown procedure - cannot invalidate sessions.");
return; return;
} }
cacheLock.wait(); if (cacheLock){cacheLock->wait();}
unsigned int invalidated = 0; unsigned int invalidated = 0;
unsigned int sessCount = 0; unsigned int sessCount = 0;
tthread::lock_guard<tthread::mutex> guard(statsMutex); tthread::lock_guard<tthread::mutex> guard(statsMutex);
@ -149,7 +149,7 @@ void Controller::sessions_invalidate(const std::string & streamname){
} }
} }
Controller::writeSessionCache(); Controller::writeSessionCache();
cacheLock.post(); if (cacheLock){cacheLock->post();}
INFO_MSG("Invalidated %u connections in %u sessions for stream %s", invalidated, sessCount, streamname.c_str()); INFO_MSG("Invalidated %u connections in %u sessions for stream %s", invalidated, sessCount, streamname.c_str());
} }
@ -177,7 +177,7 @@ void Controller::sessId_shutdown(const std::string & sessId){
FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions."); FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions.");
return; return;
} }
cacheLock.wait(); if (cacheLock){cacheLock->wait();}
unsigned int murdered = 0; unsigned int murdered = 0;
unsigned int sessCount = 0; unsigned int sessCount = 0;
tthread::lock_guard<tthread::mutex> guard(statsMutex); tthread::lock_guard<tthread::mutex> guard(statsMutex);
@ -189,7 +189,7 @@ void Controller::sessId_shutdown(const std::string & sessId){
} }
} }
Controller::writeSessionCache(); Controller::writeSessionCache();
cacheLock.post(); if (cacheLock){cacheLock->post();}
INFO_MSG("Shut down %u connections in %u session(s) for ID %s", murdered, sessCount, sessId.c_str()); INFO_MSG("Shut down %u connections in %u session(s) for ID %s", murdered, sessCount, sessId.c_str());
} }
@ -216,7 +216,7 @@ void Controller::tag_shutdown(const std::string & tag){
FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions."); FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions.");
return; return;
} }
cacheLock.wait(); if (cacheLock){cacheLock->wait();}
unsigned int murdered = 0; unsigned int murdered = 0;
unsigned int sessCount = 0; unsigned int sessCount = 0;
tthread::lock_guard<tthread::mutex> guard(statsMutex); tthread::lock_guard<tthread::mutex> guard(statsMutex);
@ -227,7 +227,7 @@ void Controller::tag_shutdown(const std::string & tag){
} }
} }
Controller::writeSessionCache(); Controller::writeSessionCache();
cacheLock.post(); if (cacheLock){cacheLock->post();}
INFO_MSG("Shut down %u connections in %u session(s) for tag %s", murdered, sessCount, tag.c_str()); INFO_MSG("Shut down %u connections in %u session(s) for tag %s", murdered, sessCount, tag.c_str());
} }
@ -238,7 +238,7 @@ void Controller::sessions_shutdown(const std::string & streamname, const std::st
FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions."); FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions.");
return; return;
} }
cacheLock.wait(); if (cacheLock){cacheLock->wait();}
unsigned int murdered = 0; unsigned int murdered = 0;
unsigned int sessCount = 0; unsigned int sessCount = 0;
tthread::lock_guard<tthread::mutex> guard(statsMutex); tthread::lock_guard<tthread::mutex> guard(statsMutex);
@ -249,7 +249,7 @@ void Controller::sessions_shutdown(const std::string & streamname, const std::st
} }
} }
Controller::writeSessionCache(); Controller::writeSessionCache();
cacheLock.post(); if (cacheLock){cacheLock->post();}
INFO_MSG("Shut down %u connections in %u sessions for stream %s/%s", murdered, sessCount, streamname.c_str(), protocol.c_str()); INFO_MSG("Shut down %u connections in %u sessions for stream %s/%s", murdered, sessCount, streamname.c_str(), protocol.c_str());
} }
@ -259,24 +259,24 @@ void Controller::sessions_shutdown(const std::string & streamname, const std::st
/// Does no error checking after first open attempt (fails silently)! /// Does no error checking after first open attempt (fails silently)!
void Controller::writeSessionCache(){ void Controller::writeSessionCache(){
uint32_t shmOffset = 0; uint32_t shmOffset = 0;
if (shmSessions.mapped){ if (shmSessions && shmSessions->mapped){
if (sessions.size()){ if (sessions.size()){
for (std::map<sessIndex, statSession>::iterator it = sessions.begin(); it != sessions.end(); it++){ for (std::map<sessIndex, statSession>::iterator it = sessions.begin(); it != sessions.end(); it++){
if (it->second.hasData()){ if (it->second.hasData()){
//store an entry in the shmSessions page, if it fits //store an entry in the shmSessions page, if it fits
if (it->second.sync > 2 && shmOffset + SHM_SESSIONS_ITEM < SHM_SESSIONS_SIZE){ if (it->second.sync > 2 && shmOffset + SHM_SESSIONS_ITEM < SHM_SESSIONS_SIZE){
*((uint32_t*)(shmSessions.mapped+shmOffset)) = it->first.crc; *((uint32_t*)(shmSessions->mapped+shmOffset)) = it->first.crc;
strncpy(shmSessions.mapped+shmOffset+4, it->first.streamName.c_str(), 100); strncpy(shmSessions->mapped+shmOffset+4, it->first.streamName.c_str(), 100);
strncpy(shmSessions.mapped+shmOffset+104, it->first.connector.c_str(), 20); strncpy(shmSessions->mapped+shmOffset+104, it->first.connector.c_str(), 20);
strncpy(shmSessions.mapped+shmOffset+124, it->first.host.c_str(), 40); strncpy(shmSessions->mapped+shmOffset+124, it->first.host.c_str(), 40);
shmSessions.mapped[shmOffset+164] = it->second.sync; shmSessions->mapped[shmOffset+164] = it->second.sync;
shmOffset += SHM_SESSIONS_ITEM; shmOffset += SHM_SESSIONS_ITEM;
} }
} }
} }
} }
//set a final shmSessions entry to all zeroes //set a final shmSessions entry to all zeroes
memset(shmSessions.mapped+shmOffset, 0, SHM_SESSIONS_ITEM); memset(shmSessions->mapped+shmOffset, 0, SHM_SESSIONS_ITEM);
} }
} }
@ -287,12 +287,14 @@ void Controller::SharedMemStats(void * config){
DEBUG_MSG(DLVL_HIGH, "Starting stats thread"); DEBUG_MSG(DLVL_HIGH, "Starting stats thread");
IPC::sharedServer statServer(SHM_STATISTICS, STAT_EX_SIZE, true); IPC::sharedServer statServer(SHM_STATISTICS, STAT_EX_SIZE, true);
statPointer = &statServer; statPointer = &statServer;
shmSessions = new IPC::sharedPage(SHM_SESSIONS, SHM_SESSIONS_SIZE, true);
cacheLock = new IPC::semaphore(SEM_SESSCACHE, O_CREAT | O_RDWR, ACCESSPERMS, 1);
std::set<std::string> inactiveStreams; std::set<std::string> inactiveStreams;
while(((Util::Config*)config)->is_active){ while(((Util::Config*)config)->is_active){
{ {
tthread::lock_guard<tthread::mutex> guard(Controller::configMutex); tthread::lock_guard<tthread::mutex> guard(Controller::configMutex);
tthread::lock_guard<tthread::mutex> guard2(statsMutex); tthread::lock_guard<tthread::mutex> guard2(statsMutex);
cacheLock.wait(); /*LTS*/ cacheLock->wait(); /*LTS*/
//parse current users //parse current users
statServer.parseEach(parseStatistics); statServer.parseEach(parseStatistics);
//wipe old statistics //wipe old statistics
@ -332,7 +334,7 @@ void Controller::SharedMemStats(void * config){
/*LTS-START*/ /*LTS-START*/
Controller::writeSessionCache(); Controller::writeSessionCache();
Controller::checkServerLimits(); Controller::checkServerLimits();
cacheLock.post(); cacheLock->post();
/*LTS-END*/ /*LTS-END*/
} }
Util::wait(1000); Util::wait(1000);
@ -341,7 +343,7 @@ void Controller::SharedMemStats(void * config){
HIGH_MSG("Stopping stats thread"); HIGH_MSG("Stopping stats thread");
if (Controller::restarting){ if (Controller::restarting){
statServer.abandon(); statServer.abandon();
shmSessions.master = false; shmSessions->master = false;
}else{/*LTS-START*/ }else{/*LTS-START*/
if (Controller::killOnExit){ if (Controller::killOnExit){
DEBUG_MSG(DLVL_WARN, "Killing all connected clients to force full shutdown"); DEBUG_MSG(DLVL_WARN, "Killing all connected clients to force full shutdown");
@ -349,6 +351,10 @@ void Controller::SharedMemStats(void * config){
} }
/*LTS-END*/ /*LTS-END*/
} }
delete shmSessions;
shmSessions = 0;
delete cacheLock;
cacheLock = 0;
} }
/// Forces a re-sync of the session /// Forces a re-sync of the session

View file

@ -17,7 +17,6 @@ namespace Controller {
extern bool killOnExit; extern bool killOnExit;
extern unsigned int maxConnsPerIP; extern unsigned int maxConnsPerIP;
extern Controller::semaphore cacheLock;
//These keep track of which streams are currently active. //These keep track of which streams are currently active.
extern std::map<std::string, unsigned int> activeStreams; extern std::map<std::string, unsigned int> activeStreams;