diff --git a/lib/defines.h b/lib/defines.h index 625e468e..c491018b 100644 --- a/lib/defines.h +++ b/lib/defines.h @@ -99,6 +99,7 @@ static const char * DBG_LVL_LIST[] = {"NONE", "FAIL", "ERROR", "WARN", "INFO", " #define SEM_LIVE "/MstLIVE%s" //%s stream name #define SEM_INPUT "/MstInpt%s" //%s stream name #define SEM_CONF "/MstConfLock" +#define SEM_SESSCACHE "/MstSessCacheLock" #define SHM_CONF "MstConf" #define NAME_BUFFER_SIZE 200 //char buffer size for snprintf'ing shm filenames #define SHM_SESSIONS "/MstSess" diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index ef499bff..c0e3072e 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -43,6 +43,11 @@ tthread::mutex Controller::statsMutex; std::map Controller::activeStreams; unsigned int Controller::maxConnsPerIP = 0; +/// Session cache shared memory page +IPC::sharedPage shmSessions(SHM_SESSIONS, SHM_SESSIONS_SIZE, true); +/// Lock for the session cache shared memory page +IPC::semaphore cacheLock(SEM_SESSCACHE, O_CREAT | O_RDWR, ACCESSPERMS, 1); + //For server-wide totals. Local to this file only. struct streamTotals { unsigned long long upBytes; @@ -127,11 +132,13 @@ void Controller::streamStopped(std::string stream){ IPC::sharedServer * statPointer = 0; ///Invalidates all current sessions for the given streamname +///Updates the session cache, afterwards. void Controller::sessions_invalidate(const std::string & streamname){ if (!statPointer){ FAIL_MSG("In shutdown procedure - cannot invalidate sessions."); return; } + cacheLock.wait(); unsigned int invalidated = 0; unsigned int sessCount = 0; tthread::lock_guard guard(statsMutex); @@ -141,11 +148,14 @@ void Controller::sessions_invalidate(const std::string & streamname){ invalidated += it->second.invalidate(); } } + Controller::writeSessionCache(); + cacheLock.post(); INFO_MSG("Invalidated %u connections in %u sessions for stream %s", invalidated, sessCount, streamname.c_str()); } ///Shuts down all current sessions for the given streamname +///Updates the session cache, afterwards. (if any action was taken) void Controller::sessions_shutdown(JSON::Iter & i){ if (i->isArray() || i->isObject()){ jsonForEach(*i, it){ @@ -161,11 +171,13 @@ void Controller::sessions_shutdown(JSON::Iter & i){ } ///Shuts down the given session +///Updates the session cache, afterwards. void Controller::sessId_shutdown(const std::string & sessId){ if (!statPointer){ FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions."); return; } + cacheLock.wait(); unsigned int murdered = 0; unsigned int sessCount = 0; tthread::lock_guard guard(statsMutex); @@ -176,6 +188,8 @@ void Controller::sessId_shutdown(const std::string & sessId){ break; } } + Controller::writeSessionCache(); + cacheLock.post(); INFO_MSG("Shut down %u connections in %u session(s) for ID %s", murdered, sessCount, sessId.c_str()); } @@ -196,11 +210,13 @@ void Controller::sessId_tag(const std::string & sessId, const std::string & tag) } ///Shuts down sessions with the given tag set +///Updates the session cache, afterwards. void Controller::tag_shutdown(const std::string & tag){ if (!statPointer){ FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions."); return; } + cacheLock.wait(); unsigned int murdered = 0; unsigned int sessCount = 0; tthread::lock_guard guard(statsMutex); @@ -210,15 +226,19 @@ void Controller::tag_shutdown(const std::string & tag){ murdered += it->second.kill(); } } + Controller::writeSessionCache(); + cacheLock.post(); INFO_MSG("Shut down %u connections in %u session(s) for tag %s", murdered, sessCount, tag.c_str()); } ///Shuts down all current sessions for the given streamname +///Updates the session cache, afterwards. void Controller::sessions_shutdown(const std::string & streamname, const std::string & protocol){ if (!statPointer){ FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions."); return; } + cacheLock.wait(); unsigned int murdered = 0; unsigned int sessCount = 0; tthread::lock_guard guard(statsMutex); @@ -228,23 +248,51 @@ void Controller::sessions_shutdown(const std::string & streamname, const std::st murdered += it->second.kill(); } } + Controller::writeSessionCache(); + cacheLock.post(); INFO_MSG("Shut down %u connections in %u sessions for stream %s/%s", murdered, sessCount, streamname.c_str(), protocol.c_str()); } +/// Writes the session cache to shared memory. +/// Assumes the config mutex, stats mutex and session cache semaphore are already locked. +/// Does nothing if the session cache could not be initialized on the first try +/// Does no error checking after first open attempt (fails silently)! +void Controller::writeSessionCache(){ + uint32_t shmOffset = 0; + if (shmSessions.mapped){ + if (sessions.size()){ + for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ + if (it->second.hasData()){ + //store an entry in the shmSessions page, if it fits + if (it->second.sync > 2 && shmOffset + SHM_SESSIONS_ITEM < SHM_SESSIONS_SIZE){ + *((uint32_t*)(shmSessions.mapped+shmOffset)) = it->first.crc; + strncpy(shmSessions.mapped+shmOffset+4, it->first.streamName.c_str(), 100); + strncpy(shmSessions.mapped+shmOffset+104, it->first.connector.c_str(), 20); + strncpy(shmSessions.mapped+shmOffset+124, it->first.host.c_str(), 40); + shmSessions.mapped[shmOffset+164] = it->second.sync; + shmOffset += SHM_SESSIONS_ITEM; + } + } + } + } + //set a final shmSessions entry to all zeroes + memset(shmSessions.mapped+shmOffset, 0, SHM_SESSIONS_ITEM); + } +} + /// This function runs as a thread and roughly once per second retrieves /// statistics from all connected clients, as well as wipes /// old statistics that have disconnected over 10 minutes ago. void Controller::SharedMemStats(void * config){ DEBUG_MSG(DLVL_HIGH, "Starting stats thread"); IPC::sharedServer statServer(SHM_STATISTICS, STAT_EX_SIZE, true); - IPC::sharedPage shmSessions(SHM_SESSIONS, SHM_SESSIONS_SIZE, true); statPointer = &statServer; std::set inactiveStreams; while(((Util::Config*)config)->is_active){ - uint32_t shmOffset = 0; { tthread::lock_guard guard(Controller::configMutex); tthread::lock_guard guard2(statsMutex); + cacheLock.wait(); /*LTS*/ //parse current users statServer.parseEach(parseStatistics); //wipe old statistics @@ -261,16 +309,6 @@ void Controller::SharedMemStats(void * config){ } if (!it->second.hasData()){ mustWipe.push_back(it->first); - }else{ - //store an entry in the shmSessions page, if it fits - if (shmSessions.mapped && it->second.sync > 2 && shmOffset + SHM_SESSIONS_ITEM < SHM_SESSIONS_SIZE){ - *((uint32_t*)(shmSessions.mapped+shmOffset)) = it->first.crc; - strncpy(shmSessions.mapped+shmOffset+4, it->first.streamName.c_str(), 100); - strncpy(shmSessions.mapped+shmOffset+104, it->first.connector.c_str(), 20); - strncpy(shmSessions.mapped+shmOffset+124, it->first.host.c_str(), 40); - shmSessions.mapped[shmOffset+164] = it->second.sync; - shmOffset += SHM_SESSIONS_ITEM; - } } } while (mustWipe.size()){ @@ -278,10 +316,6 @@ void Controller::SharedMemStats(void * config){ mustWipe.pop_front(); } } - if (shmSessions.mapped){ - //set a final shmSessions entry to all zeroes - memset(shmSessions.mapped+shmOffset, 0, SHM_SESSIONS_ITEM); - } if (activeStreams.size()){ for (std::map::iterator it = activeStreams.begin(); it != activeStreams.end(); ++it){ if (++it->second > STATS_DELAY){ @@ -295,7 +329,11 @@ void Controller::SharedMemStats(void * config){ inactiveStreams.erase(inactiveStreams.begin()); } } - Controller::checkServerLimits(); /*LTS*/ + /*LTS-START*/ + Controller::writeSessionCache(); + Controller::checkServerLimits(); + cacheLock.post(); + /*LTS-END*/ } Util::wait(1000); } @@ -314,6 +352,7 @@ void Controller::SharedMemStats(void * config){ } /// Forces a re-sync of the session +/// Assumes the session cache will be updated separately - may not work correctly if this is forgotten! uint32_t Controller::statSession::invalidate(){ uint32_t ret = 0; sync = 1; @@ -331,6 +370,7 @@ uint32_t Controller::statSession::invalidate(){ } /// Kills all active connections, sets the session state to denied (sync=100). +/// Assumes the session cache will be updated separately - may not work correctly if this is forgotten! uint32_t Controller::statSession::kill(){ uint32_t ret = 0; sync = 100; diff --git a/src/controller/controller_statistics.h b/src/controller/controller_statistics.h index 276c42a8..f134019e 100644 --- a/src/controller/controller_statistics.h +++ b/src/controller/controller_statistics.h @@ -18,11 +18,11 @@ namespace Controller { extern bool killOnExit; extern unsigned int maxConnsPerIP; - //These functions keep track of which streams are currently active. + //These keep track of which streams are currently active. extern std::map activeStreams; ///This function is ran whenever a stream becomes active. void streamStarted(std::string stream); - ///This function is ran whenever a stream becomes active. + ///This function is ran whenever a stream becomes inactive. void streamStopped(std::string stream); struct statLog { @@ -130,6 +130,7 @@ namespace Controller { void sessId_tag(const std::string & sessId, const std::string & tag); void sessions_shutdown(const std::string & streamname, const std::string & protocol = ""); bool hasViewers(std::string streamName); + void writeSessionCache(); /*LTS*/ #define PROMETHEUS_TEXT 0 #define PROMETHEUS_JSON 1 diff --git a/src/output/output.cpp b/src/output/output.cpp index 3db1f7d7..5eaf0266 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -179,6 +179,8 @@ namespace Mist{ char initialSync = 0; //attempt to load sync status from session cache in shm { + IPC::semaphore cacheLock(SEM_SESSCACHE, O_RDWR, ACCESSPERMS, 1); + if (cacheLock){cacheLock.wait();} IPC::sharedPage shmSessions(SHM_SESSIONS, SHM_SESSIONS_SIZE, false, false); if (shmSessions.mapped){ char shmEmpty[SHM_SESSIONS_ITEM]; @@ -218,6 +220,7 @@ namespace Mist{ shmOffset += SHM_SESSIONS_ITEM; } } + if (cacheLock){cacheLock.post();} } unsigned int i = 0; tmpEx.setSync(initialSync);