diff --git a/lib/defines.h b/lib/defines.h index ceb8517e..9f26a974 100644 --- a/lib/defines.h +++ b/lib/defines.h @@ -104,6 +104,9 @@ static const char * DBG_LVL_LIST[] = {"NONE", "FAIL", "ERROR", "WARN", "INFO", " #define SEM_CONF "/MstConfLock" #define SHM_CONF "MstConf" #define NAME_BUFFER_SIZE 200 //char buffer size for snprintf'ing shm filenames +#define SHM_SESSIONS "/MstSess" +#define SHM_SESSIONS_ITEM 165 //4 byte crc, 100b streamname, 20b connector, 40b host, 1b sync +#define SHM_SESSIONS_SIZE 5248000 //5MiB = almost 32k sessions #define SHM_STREAM_ENCRYPT "MstCRYP%s" //%s stream name diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index 6b89e996..5fb21e51 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -212,9 +212,11 @@ void Controller::sessions_shutdown(const std::string & streamname, const std::st void Controller::SharedMemStats(void * config){ DEBUG_MSG(DLVL_HIGH, "Starting stats thread"); IPC::sharedServer statServer(SHM_STATISTICS, STAT_EX_SIZE, true); + IPC::sharedPage shmSessions(SHM_SESSIONS, SHM_SESSIONS_SIZE, true); statPointer = &statServer; std::set inactiveStreams; while(((Util::Config*)config)->is_active){ + uint32_t shmOffset = 0; { tthread::lock_guard guard(Controller::configMutex); tthread::lock_guard guard2(statsMutex); @@ -228,6 +230,16 @@ void Controller::SharedMemStats(void * config){ it->second.wipeOld(cutOffPoint); if (!it->second.hasData()){ mustWipe.push_back(it->first); + }else{ + //store an entry in the shmSessions page, if it fits + if (shmSessions.mapped && it->second.sync > 2 && shmOffset + SHM_SESSIONS_ITEM < SHM_SESSIONS_SIZE){ + *((uint32_t*)(shmSessions.mapped+shmOffset)) = it->first.crc; + strncpy(shmSessions.mapped+shmOffset+4, it->first.streamName.c_str(), 100); + strncpy(shmSessions.mapped+shmOffset+104, it->first.connector.c_str(), 20); + strncpy(shmSessions.mapped+shmOffset+124, it->first.host.c_str(), 40); + shmSessions.mapped[shmOffset+164] = it->second.sync; + shmOffset += SHM_SESSIONS_ITEM; + } } } while (mustWipe.size()){ @@ -235,6 +247,10 @@ void Controller::SharedMemStats(void * config){ mustWipe.pop_front(); } } + if (shmSessions.mapped){ + //set a final shmSessions entry to all zeroes + memset(shmSessions.mapped+shmOffset, 0, SHM_SESSIONS_ITEM); + } if (activeStreams.size()){ for (std::map::iterator it = activeStreams.begin(); it != activeStreams.end(); ++it){ if (++it->second > STATS_DELAY){ @@ -256,6 +272,7 @@ void Controller::SharedMemStats(void * config){ HIGH_MSG("Stopping stats thread"); if (Controller::restarting){ statServer.abandon(); + shmSessions.master = false; }else{/*LTS-START*/ if (Controller::killOnExit){ DEBUG_MSG(DLVL_WARN, "Killing all connected clients to force full shutdown"); diff --git a/src/output/output.cpp b/src/output/output.cpp index 63251760..8ca7645e 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -172,8 +172,51 @@ namespace Mist { if (tmpEx.getSync() == 2 || force){ if (getStatsName() == capa["name"].asStringRef() && Triggers::shouldTrigger("USER_NEW", streamName)){ //sync byte 0 = no sync yet, wait for sync from controller... + char initialSync = 0; + //attempt to load sync status from session cache in shm + { + IPC::sharedPage shmSessions(SHM_SESSIONS, SHM_SESSIONS_SIZE, false, false); + if (shmSessions.mapped){ + char shmEmpty[SHM_SESSIONS_ITEM]; + memset(shmEmpty, 0, SHM_SESSIONS_ITEM); + std::string host = tmpEx.host(); + if (host.substr(0, 12) == std::string("\000\000\000\000\000\000\000\000\000\000\377\377", 12)){ + char tmpstr[16]; + snprintf(tmpstr, 16, "%hhu.%hhu.%hhu.%hhu", host[12], host[13], host[14], host[15]); + host = tmpstr; + }else{ + char tmpstr[40]; + snprintf(tmpstr, 40, "%0.2x%0.2x:%0.2x%0.2x:%0.2x%0.2x:%0.2x%0.2x:%0.2x%0.2x:%0.2x%0.2x:%0.2x%0.2x:%0.2x%0.2x", host[0], host[1], host[2], host[3], host[4], host[5], host[6], host[7], host[8], host[9], host[10], host[11], host[12], host[13], host[14], host[15]); + host = tmpstr; + } + uint32_t shmOffset = 0; + const std::string & cName = capa["name"].asStringRef(); + while (shmOffset + SHM_SESSIONS_ITEM < SHM_SESSIONS_SIZE){ + //compare crc + if (*((uint32_t*)(shmSessions.mapped+shmOffset)) == tmpEx.crc()){ + //compare stream name + if (strncmp(shmSessions.mapped+shmOffset+4, streamName.c_str(), 100) == 0){ + //compare connector + if (strncmp(shmSessions.mapped+shmOffset+104, cName.c_str(), 20) == 0){ + //compare host + if (strncmp(shmSessions.mapped+shmOffset+124, host.c_str(), 40) == 0){ + initialSync = shmSessions.mapped[shmOffset+164]; + INFO_MSG("Instant-sync from session cache to %u", (unsigned int)initialSync); + break; + } + } + } + } + //stop if we reached the end + if (memcmp(shmSessions.mapped+shmOffset, shmEmpty, SHM_SESSIONS_ITEM) == 0){ + break; + } + shmOffset += SHM_SESSIONS_ITEM; + } + } + } unsigned int i = 0; - tmpEx.setSync(0); + tmpEx.setSync(initialSync); //wait max 10 seconds for sync while ((!tmpEx.getSync() || tmpEx.getSync() == 2) && i++ < 100){ Util::wait(100);