Fixed session cache race conditions

This commit is contained in:
Thulinma 2017-04-30 15:04:54 +02:00
parent 40b7cb7e63
commit 2cec1f7836
4 changed files with 64 additions and 19 deletions

View file

@ -99,6 +99,7 @@ static const char * DBG_LVL_LIST[] = {"NONE", "FAIL", "ERROR", "WARN", "INFO", "
#define SEM_LIVE "/MstLIVE%s" //%s stream name #define SEM_LIVE "/MstLIVE%s" //%s stream name
#define SEM_INPUT "/MstInpt%s" //%s stream name #define SEM_INPUT "/MstInpt%s" //%s stream name
#define SEM_CONF "/MstConfLock" #define SEM_CONF "/MstConfLock"
#define SEM_SESSCACHE "/MstSessCacheLock"
#define SHM_CONF "MstConf" #define SHM_CONF "MstConf"
#define NAME_BUFFER_SIZE 200 //char buffer size for snprintf'ing shm filenames #define NAME_BUFFER_SIZE 200 //char buffer size for snprintf'ing shm filenames
#define SHM_SESSIONS "/MstSess" #define SHM_SESSIONS "/MstSess"

View file

@ -43,6 +43,11 @@ tthread::mutex Controller::statsMutex;
std::map<std::string, unsigned int> Controller::activeStreams; std::map<std::string, unsigned int> Controller::activeStreams;
unsigned int Controller::maxConnsPerIP = 0; unsigned int Controller::maxConnsPerIP = 0;
/// Session cache shared memory page
IPC::sharedPage shmSessions(SHM_SESSIONS, SHM_SESSIONS_SIZE, true);
/// Lock for the session cache shared memory page
IPC::semaphore cacheLock(SEM_SESSCACHE, O_CREAT | O_RDWR, ACCESSPERMS, 1);
//For server-wide totals. Local to this file only. //For server-wide totals. Local to this file only.
struct streamTotals { struct streamTotals {
unsigned long long upBytes; unsigned long long upBytes;
@ -127,11 +132,13 @@ void Controller::streamStopped(std::string stream){
IPC::sharedServer * statPointer = 0; IPC::sharedServer * statPointer = 0;
///Invalidates all current sessions for the given streamname ///Invalidates all current sessions for the given streamname
///Updates the session cache, afterwards.
void Controller::sessions_invalidate(const std::string & streamname){ void Controller::sessions_invalidate(const std::string & streamname){
if (!statPointer){ if (!statPointer){
FAIL_MSG("In shutdown procedure - cannot invalidate sessions."); FAIL_MSG("In shutdown procedure - cannot invalidate sessions.");
return; return;
} }
cacheLock.wait();
unsigned int invalidated = 0; unsigned int invalidated = 0;
unsigned int sessCount = 0; unsigned int sessCount = 0;
tthread::lock_guard<tthread::mutex> guard(statsMutex); tthread::lock_guard<tthread::mutex> guard(statsMutex);
@ -141,11 +148,14 @@ void Controller::sessions_invalidate(const std::string & streamname){
invalidated += it->second.invalidate(); invalidated += it->second.invalidate();
} }
} }
Controller::writeSessionCache();
cacheLock.post();
INFO_MSG("Invalidated %u connections in %u sessions for stream %s", invalidated, sessCount, streamname.c_str()); INFO_MSG("Invalidated %u connections in %u sessions for stream %s", invalidated, sessCount, streamname.c_str());
} }
///Shuts down all current sessions for the given streamname ///Shuts down all current sessions for the given streamname
///Updates the session cache, afterwards. (if any action was taken)
void Controller::sessions_shutdown(JSON::Iter & i){ void Controller::sessions_shutdown(JSON::Iter & i){
if (i->isArray() || i->isObject()){ if (i->isArray() || i->isObject()){
jsonForEach(*i, it){ jsonForEach(*i, it){
@ -161,11 +171,13 @@ void Controller::sessions_shutdown(JSON::Iter & i){
} }
///Shuts down the given session ///Shuts down the given session
///Updates the session cache, afterwards.
void Controller::sessId_shutdown(const std::string & sessId){ void Controller::sessId_shutdown(const std::string & sessId){
if (!statPointer){ if (!statPointer){
FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions."); FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions.");
return; return;
} }
cacheLock.wait();
unsigned int murdered = 0; unsigned int murdered = 0;
unsigned int sessCount = 0; unsigned int sessCount = 0;
tthread::lock_guard<tthread::mutex> guard(statsMutex); tthread::lock_guard<tthread::mutex> guard(statsMutex);
@ -176,6 +188,8 @@ void Controller::sessId_shutdown(const std::string & sessId){
break; break;
} }
} }
Controller::writeSessionCache();
cacheLock.post();
INFO_MSG("Shut down %u connections in %u session(s) for ID %s", murdered, sessCount, sessId.c_str()); INFO_MSG("Shut down %u connections in %u session(s) for ID %s", murdered, sessCount, sessId.c_str());
} }
@ -196,11 +210,13 @@ void Controller::sessId_tag(const std::string & sessId, const std::string & tag)
} }
///Shuts down sessions with the given tag set ///Shuts down sessions with the given tag set
///Updates the session cache, afterwards.
void Controller::tag_shutdown(const std::string & tag){ void Controller::tag_shutdown(const std::string & tag){
if (!statPointer){ if (!statPointer){
FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions."); FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions.");
return; return;
} }
cacheLock.wait();
unsigned int murdered = 0; unsigned int murdered = 0;
unsigned int sessCount = 0; unsigned int sessCount = 0;
tthread::lock_guard<tthread::mutex> guard(statsMutex); tthread::lock_guard<tthread::mutex> guard(statsMutex);
@ -210,15 +226,19 @@ void Controller::tag_shutdown(const std::string & tag){
murdered += it->second.kill(); murdered += it->second.kill();
} }
} }
Controller::writeSessionCache();
cacheLock.post();
INFO_MSG("Shut down %u connections in %u session(s) for tag %s", murdered, sessCount, tag.c_str()); INFO_MSG("Shut down %u connections in %u session(s) for tag %s", murdered, sessCount, tag.c_str());
} }
///Shuts down all current sessions for the given streamname ///Shuts down all current sessions for the given streamname
///Updates the session cache, afterwards.
void Controller::sessions_shutdown(const std::string & streamname, const std::string & protocol){ void Controller::sessions_shutdown(const std::string & streamname, const std::string & protocol){
if (!statPointer){ if (!statPointer){
FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions."); FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions.");
return; return;
} }
cacheLock.wait();
unsigned int murdered = 0; unsigned int murdered = 0;
unsigned int sessCount = 0; unsigned int sessCount = 0;
tthread::lock_guard<tthread::mutex> guard(statsMutex); tthread::lock_guard<tthread::mutex> guard(statsMutex);
@ -228,23 +248,51 @@ void Controller::sessions_shutdown(const std::string & streamname, const std::st
murdered += it->second.kill(); murdered += it->second.kill();
} }
} }
Controller::writeSessionCache();
cacheLock.post();
INFO_MSG("Shut down %u connections in %u sessions for stream %s/%s", murdered, sessCount, streamname.c_str(), protocol.c_str()); INFO_MSG("Shut down %u connections in %u sessions for stream %s/%s", murdered, sessCount, streamname.c_str(), protocol.c_str());
} }
/// Writes the session cache to shared memory.
/// Assumes the config mutex, stats mutex and session cache semaphore are already locked.
/// Does nothing if the session cache could not be initialized on the first try
/// Does no error checking after first open attempt (fails silently)!
void Controller::writeSessionCache(){
uint32_t shmOffset = 0;
if (shmSessions.mapped){
if (sessions.size()){
for (std::map<sessIndex, statSession>::iterator it = sessions.begin(); it != sessions.end(); it++){
if (it->second.hasData()){
//store an entry in the shmSessions page, if it fits
if (it->second.sync > 2 && shmOffset + SHM_SESSIONS_ITEM < SHM_SESSIONS_SIZE){
*((uint32_t*)(shmSessions.mapped+shmOffset)) = it->first.crc;
strncpy(shmSessions.mapped+shmOffset+4, it->first.streamName.c_str(), 100);
strncpy(shmSessions.mapped+shmOffset+104, it->first.connector.c_str(), 20);
strncpy(shmSessions.mapped+shmOffset+124, it->first.host.c_str(), 40);
shmSessions.mapped[shmOffset+164] = it->second.sync;
shmOffset += SHM_SESSIONS_ITEM;
}
}
}
}
//set a final shmSessions entry to all zeroes
memset(shmSessions.mapped+shmOffset, 0, SHM_SESSIONS_ITEM);
}
}
/// This function runs as a thread and roughly once per second retrieves /// This function runs as a thread and roughly once per second retrieves
/// statistics from all connected clients, as well as wipes /// statistics from all connected clients, as well as wipes
/// old statistics that have disconnected over 10 minutes ago. /// old statistics that have disconnected over 10 minutes ago.
void Controller::SharedMemStats(void * config){ void Controller::SharedMemStats(void * config){
DEBUG_MSG(DLVL_HIGH, "Starting stats thread"); DEBUG_MSG(DLVL_HIGH, "Starting stats thread");
IPC::sharedServer statServer(SHM_STATISTICS, STAT_EX_SIZE, true); IPC::sharedServer statServer(SHM_STATISTICS, STAT_EX_SIZE, true);
IPC::sharedPage shmSessions(SHM_SESSIONS, SHM_SESSIONS_SIZE, true);
statPointer = &statServer; statPointer = &statServer;
std::set<std::string> inactiveStreams; std::set<std::string> inactiveStreams;
while(((Util::Config*)config)->is_active){ while(((Util::Config*)config)->is_active){
uint32_t shmOffset = 0;
{ {
tthread::lock_guard<tthread::mutex> guard(Controller::configMutex); tthread::lock_guard<tthread::mutex> guard(Controller::configMutex);
tthread::lock_guard<tthread::mutex> guard2(statsMutex); tthread::lock_guard<tthread::mutex> guard2(statsMutex);
cacheLock.wait(); /*LTS*/
//parse current users //parse current users
statServer.parseEach(parseStatistics); statServer.parseEach(parseStatistics);
//wipe old statistics //wipe old statistics
@ -261,16 +309,6 @@ void Controller::SharedMemStats(void * config){
} }
if (!it->second.hasData()){ if (!it->second.hasData()){
mustWipe.push_back(it->first); mustWipe.push_back(it->first);
}else{
//store an entry in the shmSessions page, if it fits
if (shmSessions.mapped && it->second.sync > 2 && shmOffset + SHM_SESSIONS_ITEM < SHM_SESSIONS_SIZE){
*((uint32_t*)(shmSessions.mapped+shmOffset)) = it->first.crc;
strncpy(shmSessions.mapped+shmOffset+4, it->first.streamName.c_str(), 100);
strncpy(shmSessions.mapped+shmOffset+104, it->first.connector.c_str(), 20);
strncpy(shmSessions.mapped+shmOffset+124, it->first.host.c_str(), 40);
shmSessions.mapped[shmOffset+164] = it->second.sync;
shmOffset += SHM_SESSIONS_ITEM;
}
} }
} }
while (mustWipe.size()){ while (mustWipe.size()){
@ -278,10 +316,6 @@ void Controller::SharedMemStats(void * config){
mustWipe.pop_front(); mustWipe.pop_front();
} }
} }
if (shmSessions.mapped){
//set a final shmSessions entry to all zeroes
memset(shmSessions.mapped+shmOffset, 0, SHM_SESSIONS_ITEM);
}
if (activeStreams.size()){ if (activeStreams.size()){
for (std::map<std::string, unsigned int>::iterator it = activeStreams.begin(); it != activeStreams.end(); ++it){ for (std::map<std::string, unsigned int>::iterator it = activeStreams.begin(); it != activeStreams.end(); ++it){
if (++it->second > STATS_DELAY){ if (++it->second > STATS_DELAY){
@ -295,7 +329,11 @@ void Controller::SharedMemStats(void * config){
inactiveStreams.erase(inactiveStreams.begin()); inactiveStreams.erase(inactiveStreams.begin());
} }
} }
Controller::checkServerLimits(); /*LTS*/ /*LTS-START*/
Controller::writeSessionCache();
Controller::checkServerLimits();
cacheLock.post();
/*LTS-END*/
} }
Util::wait(1000); Util::wait(1000);
} }
@ -314,6 +352,7 @@ void Controller::SharedMemStats(void * config){
} }
/// Forces a re-sync of the session /// Forces a re-sync of the session
/// Assumes the session cache will be updated separately - may not work correctly if this is forgotten!
uint32_t Controller::statSession::invalidate(){ uint32_t Controller::statSession::invalidate(){
uint32_t ret = 0; uint32_t ret = 0;
sync = 1; sync = 1;
@ -331,6 +370,7 @@ uint32_t Controller::statSession::invalidate(){
} }
/// Kills all active connections, sets the session state to denied (sync=100). /// Kills all active connections, sets the session state to denied (sync=100).
/// Assumes the session cache will be updated separately - may not work correctly if this is forgotten!
uint32_t Controller::statSession::kill(){ uint32_t Controller::statSession::kill(){
uint32_t ret = 0; uint32_t ret = 0;
sync = 100; sync = 100;

View file

@ -18,11 +18,11 @@ namespace Controller {
extern bool killOnExit; extern bool killOnExit;
extern unsigned int maxConnsPerIP; extern unsigned int maxConnsPerIP;
//These functions keep track of which streams are currently active. //These keep track of which streams are currently active.
extern std::map<std::string, unsigned int> activeStreams; extern std::map<std::string, unsigned int> activeStreams;
///This function is ran whenever a stream becomes active. ///This function is ran whenever a stream becomes active.
void streamStarted(std::string stream); void streamStarted(std::string stream);
///This function is ran whenever a stream becomes active. ///This function is ran whenever a stream becomes inactive.
void streamStopped(std::string stream); void streamStopped(std::string stream);
struct statLog { struct statLog {
@ -130,6 +130,7 @@ namespace Controller {
void sessId_tag(const std::string & sessId, const std::string & tag); void sessId_tag(const std::string & sessId, const std::string & tag);
void sessions_shutdown(const std::string & streamname, const std::string & protocol = ""); void sessions_shutdown(const std::string & streamname, const std::string & protocol = "");
bool hasViewers(std::string streamName); bool hasViewers(std::string streamName);
void writeSessionCache(); /*LTS*/
#define PROMETHEUS_TEXT 0 #define PROMETHEUS_TEXT 0
#define PROMETHEUS_JSON 1 #define PROMETHEUS_JSON 1

View file

@ -179,6 +179,8 @@ namespace Mist{
char initialSync = 0; char initialSync = 0;
//attempt to load sync status from session cache in shm //attempt to load sync status from session cache in shm
{ {
IPC::semaphore cacheLock(SEM_SESSCACHE, O_RDWR, ACCESSPERMS, 1);
if (cacheLock){cacheLock.wait();}
IPC::sharedPage shmSessions(SHM_SESSIONS, SHM_SESSIONS_SIZE, false, false); IPC::sharedPage shmSessions(SHM_SESSIONS, SHM_SESSIONS_SIZE, false, false);
if (shmSessions.mapped){ if (shmSessions.mapped){
char shmEmpty[SHM_SESSIONS_ITEM]; char shmEmpty[SHM_SESSIONS_ITEM];
@ -218,6 +220,7 @@ namespace Mist{
shmOffset += SHM_SESSIONS_ITEM; shmOffset += SHM_SESSIONS_ITEM;
} }
} }
if (cacheLock){cacheLock.post();}
} }
unsigned int i = 0; unsigned int i = 0;
tmpEx.setSync(initialSync); tmpEx.setSync(initialSync);