Added shared memory session cache + instant-sync for sessions in cache.

This commit is contained in:
Thulinma 2016-10-09 20:14:58 +02:00
parent ad514b6744
commit 0b78a57e40
3 changed files with 64 additions and 1 deletions

View file

@ -104,6 +104,9 @@ static const char * DBG_LVL_LIST[] = {"NONE", "FAIL", "ERROR", "WARN", "INFO", "
#define SEM_CONF "/MstConfLock"
#define SHM_CONF "MstConf"
#define NAME_BUFFER_SIZE 200 //char buffer size for snprintf'ing shm filenames
#define SHM_SESSIONS "/MstSess"
#define SHM_SESSIONS_ITEM 165 //4 byte crc, 100b streamname, 20b connector, 40b host, 1b sync
#define SHM_SESSIONS_SIZE 5248000 //5MiB = almost 32k sessions
#define SHM_STREAM_ENCRYPT "MstCRYP%s" //%s stream name

View file

@ -212,9 +212,11 @@ void Controller::sessions_shutdown(const std::string & streamname, const std::st
void Controller::SharedMemStats(void * config){
DEBUG_MSG(DLVL_HIGH, "Starting stats thread");
IPC::sharedServer statServer(SHM_STATISTICS, STAT_EX_SIZE, true);
IPC::sharedPage shmSessions(SHM_SESSIONS, SHM_SESSIONS_SIZE, true);
statPointer = &statServer;
std::set<std::string> inactiveStreams;
while(((Util::Config*)config)->is_active){
uint32_t shmOffset = 0;
{
tthread::lock_guard<tthread::mutex> guard(Controller::configMutex);
tthread::lock_guard<tthread::mutex> guard2(statsMutex);
@ -228,6 +230,16 @@ void Controller::SharedMemStats(void * config){
it->second.wipeOld(cutOffPoint);
if (!it->second.hasData()){
mustWipe.push_back(it->first);
}else{
//store an entry in the shmSessions page, if it fits
if (shmSessions.mapped && it->second.sync > 2 && shmOffset + SHM_SESSIONS_ITEM < SHM_SESSIONS_SIZE){
*((uint32_t*)(shmSessions.mapped+shmOffset)) = it->first.crc;
strncpy(shmSessions.mapped+shmOffset+4, it->first.streamName.c_str(), 100);
strncpy(shmSessions.mapped+shmOffset+104, it->first.connector.c_str(), 20);
strncpy(shmSessions.mapped+shmOffset+124, it->first.host.c_str(), 40);
shmSessions.mapped[shmOffset+164] = it->second.sync;
shmOffset += SHM_SESSIONS_ITEM;
}
}
}
while (mustWipe.size()){
@ -235,6 +247,10 @@ void Controller::SharedMemStats(void * config){
mustWipe.pop_front();
}
}
if (shmSessions.mapped){
//set a final shmSessions entry to all zeroes
memset(shmSessions.mapped+shmOffset, 0, SHM_SESSIONS_ITEM);
}
if (activeStreams.size()){
for (std::map<std::string, unsigned int>::iterator it = activeStreams.begin(); it != activeStreams.end(); ++it){
if (++it->second > STATS_DELAY){
@ -256,6 +272,7 @@ void Controller::SharedMemStats(void * config){
HIGH_MSG("Stopping stats thread");
if (Controller::restarting){
statServer.abandon();
shmSessions.master = false;
}else{/*LTS-START*/
if (Controller::killOnExit){
DEBUG_MSG(DLVL_WARN, "Killing all connected clients to force full shutdown");

View file

@ -172,8 +172,51 @@ namespace Mist {
if (tmpEx.getSync() == 2 || force){
if (getStatsName() == capa["name"].asStringRef() && Triggers::shouldTrigger("USER_NEW", streamName)){
//sync byte 0 = no sync yet, wait for sync from controller...
char initialSync = 0;
//attempt to load sync status from session cache in shm
{
IPC::sharedPage shmSessions(SHM_SESSIONS, SHM_SESSIONS_SIZE, false, false);
if (shmSessions.mapped){
char shmEmpty[SHM_SESSIONS_ITEM];
memset(shmEmpty, 0, SHM_SESSIONS_ITEM);
std::string host = tmpEx.host();
if (host.substr(0, 12) == std::string("\000\000\000\000\000\000\000\000\000\000\377\377", 12)){
char tmpstr[16];
snprintf(tmpstr, 16, "%hhu.%hhu.%hhu.%hhu", host[12], host[13], host[14], host[15]);
host = tmpstr;
}else{
char tmpstr[40];
snprintf(tmpstr, 40, "%0.2x%0.2x:%0.2x%0.2x:%0.2x%0.2x:%0.2x%0.2x:%0.2x%0.2x:%0.2x%0.2x:%0.2x%0.2x:%0.2x%0.2x", host[0], host[1], host[2], host[3], host[4], host[5], host[6], host[7], host[8], host[9], host[10], host[11], host[12], host[13], host[14], host[15]);
host = tmpstr;
}
uint32_t shmOffset = 0;
const std::string & cName = capa["name"].asStringRef();
while (shmOffset + SHM_SESSIONS_ITEM < SHM_SESSIONS_SIZE){
//compare crc
if (*((uint32_t*)(shmSessions.mapped+shmOffset)) == tmpEx.crc()){
//compare stream name
if (strncmp(shmSessions.mapped+shmOffset+4, streamName.c_str(), 100) == 0){
//compare connector
if (strncmp(shmSessions.mapped+shmOffset+104, cName.c_str(), 20) == 0){
//compare host
if (strncmp(shmSessions.mapped+shmOffset+124, host.c_str(), 40) == 0){
initialSync = shmSessions.mapped[shmOffset+164];
INFO_MSG("Instant-sync from session cache to %u", (unsigned int)initialSync);
break;
}
}
}
}
//stop if we reached the end
if (memcmp(shmSessions.mapped+shmOffset, shmEmpty, SHM_SESSIONS_ITEM) == 0){
break;
}
shmOffset += SHM_SESSIONS_ITEM;
}
}
}
unsigned int i = 0;
tmpEx.setSync(0);
tmpEx.setSync(initialSync);
//wait max 10 seconds for sync
while ((!tmpEx.getSync() || tmpEx.getSync() == 2) && i++ < 100){
Util::wait(100);