diff --git a/lib/defines.h b/lib/defines.h index 0c561630..18c2ab25 100644 --- a/lib/defines.h +++ b/lib/defines.h @@ -81,7 +81,10 @@ static const char * DBG_LVL_LIST[] = {"NONE", "FAIL", "ERROR", "WARN", "INFO", " #define SHM_STATISTICS "MstSTAT" #define SHM_USERS "MstUSER%s" //%s stream name #define SHM_TRIGGER "MstTRIG%s" //%s trigger name -#define SEM_LIVE "MstLIVE%s" //%s stream name +#define SEM_LIVE "/MstLIVE%s" //%s stream name +#define SEM_INPUT "/MstInpt%s" //%s stream name +#define SEM_CONF "/MstConfLock" +#define SHM_CONF "MstConf" #define NAME_BUFFER_SIZE 200 //char buffer size for snprintf'ing shm filenames #define SHM_STREAM_ENCRYPT "MstCRYP%s" //%s stream name diff --git a/lib/shared_memory.cpp b/lib/shared_memory.cpp index 9af28ed8..f1618206 100644 --- a/lib/shared_memory.cpp +++ b/lib/shared_memory.cpp @@ -132,7 +132,7 @@ namespace IPC { while (!(*this) && timer++ < 10) { #if defined(__CYGWIN__) || defined(_WIN32) std::string semaName = "Global\\"; - semaName += name; + semaName += (name+1); if (oflag & O_CREAT) { if (oflag & O_EXCL) { //attempt opening, if succes, close handle and return false; diff --git a/lib/stream.cpp b/lib/stream.cpp index c5704daf..39c91100 100644 --- a/lib/stream.cpp +++ b/lib/stream.cpp @@ -86,8 +86,8 @@ JSON::Value Util::getStreamConfig(std::string streamname){ FAIL_MSG("Stream opening denied: %s is longer than 100 characters (%lu).", streamname.c_str(), streamname.size()); return result; } - IPC::sharedPage mistConfOut("!mistConfig", DEFAULT_CONF_PAGE_SIZE); - IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1); + IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); + IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); configLock.wait(); DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len); @@ -107,7 +107,10 @@ JSON::Value Util::getStreamConfig(std::string streamname){ /// Checks if the given streamname has an active input serving it. Returns true if this is the case. /// Assumes the streamname has already been through sanitizeName()! bool Util::streamAlive(std::string & streamname){ - IPC::semaphore playerLock(std::string("/lock_" + streamname).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); + char semName[NAME_BUFFER_SIZE]; + snprintf(semName, NAME_BUFFER_SIZE, SEM_INPUT, streamname.c_str()); + IPC::semaphore playerLock(semName, O_RDWR, ACCESSPERMS, 1); + if (!playerLock){return false;} if (!playerLock.tryWait()) { playerLock.close(); return true; @@ -151,8 +154,8 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir } //Attempt to load up configuration and find this stream - IPC::sharedPage mistConfOut("!mistConfig", DEFAULT_CONF_PAGE_SIZE); - IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1); + IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); + IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); //Lock the config to prevent race conditions and corruption issues while reading configLock.wait(); DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len); @@ -304,8 +307,8 @@ int Util::startRecording(std::string streamname) { } // Attempt to load up configuration and find this stream - IPC::sharedPage mistConfOut("!mistConfig", DEFAULT_CONF_PAGE_SIZE); - IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1); + IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); + IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); //Lock the config to prevent race conditions and corruption issues while reading configLock.wait(); diff --git a/lib/ts_stream.cpp b/lib/ts_stream.cpp index 05ce7157..e92d92fa 100644 --- a/lib/ts_stream.cpp +++ b/lib/ts_stream.cpp @@ -10,10 +10,7 @@ namespace TS { Stream::Stream(bool _threaded){ threaded = _threaded; if (threaded){ - globalSem.open("MstTSInputLock", O_CREAT | O_EXCL | O_RDWR, ACCESSPERMS, 1); - if (!globalSem) { - globalSem.open("MstTSInputLock", O_CREAT | O_RDWR, ACCESSPERMS, 1); - } + globalSem.open("MstTSInputLock", O_CREAT | O_RDWR, ACCESSPERMS, 1); if (!globalSem) { FAIL_MSG("Creating semaphore failed: %s", strerror(errno)); threaded = false; @@ -23,6 +20,12 @@ namespace TS { } } + Stream::~Stream(){ + if (threaded){ + globalSem.unlink(); + } + } + void Stream::parse(char * newPack, unsigned long long bytePos) { Packet newPacket; newPacket.FromPointer(newPack); diff --git a/lib/ts_stream.h b/lib/ts_stream.h index 7b9ebeec..a93aba93 100644 --- a/lib/ts_stream.h +++ b/lib/ts_stream.h @@ -21,6 +21,7 @@ namespace TS { class Stream{ public: Stream(bool _threaded = false); + ~Stream(); void add(char * newPack, unsigned long long bytePos = 0); void add(Packet & newPack, unsigned long long bytePos = 0); void parse(Packet & newPack, unsigned long long bytePos); diff --git a/src/controller/controller.cpp b/src/controller/controller.cpp index 85ada5dc..9c9db8ae 100644 --- a/src/controller/controller.cpp +++ b/src/controller/controller.cpp @@ -100,6 +100,7 @@ void statusMonitor(void * np){ #ifdef UPDATER unsigned long updatechecker = Util::epoch(); /*LTS*/ #endif + IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); while (Controller::conf.is_active){ /*LTS-START*/ #ifdef UPDATER @@ -120,7 +121,6 @@ void statusMonitor(void * np){ changed |= Controller::CheckAllStreams(Controller::Storage["streams"]); //check if the config semaphore is stuck, by trying to lock it for 5 attempts of 1 second... - IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1); if (!configLock.tryWaitOneSecond() && !configLock.tryWaitOneSecond() && !configLock.tryWaitOneSecond() && !configLock.tryWaitOneSecond()){ //that failed. We now unlock it, no matter what - and print a warning that it was stuck. WARN_MSG("Configuration semaphore was stuck. Force-unlocking it and re-writing config."); @@ -133,6 +133,7 @@ void statusMonitor(void * np){ } Util::wait(5000);//wait at least 5 seconds } + configLock.unlink(); } ///\brief The main entry point for the controller. diff --git a/src/controller/controller_storage.cpp b/src/controller/controller_storage.cpp index 1a41afd4..a2331066 100644 --- a/src/controller/controller_storage.cpp +++ b/src/controller/controller_storage.cpp @@ -100,8 +100,8 @@ namespace Controller { } if (!changed){return;}//cancel further processing if no changes - static IPC::sharedPage mistConfOut("!mistConfig", DEFAULT_CONF_PAGE_SIZE, true); - IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1); + static IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, true); + IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); //lock semaphore configLock.wait(); //write config diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index 960a442f..1343902c 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -179,12 +179,16 @@ namespace Mist { } } } + char pageName[NAME_BUFFER_SIZE]; + snprintf(pageName, NAME_BUFFER_SIZE, SEM_LIVE, streamName.c_str()); + IPC::semaphore liveMeta(pageName, O_CREAT | O_RDWR, ACCESSPERMS, 1); + liveMeta.unlink(); } ///Cleans up any left-over data for the current stream void inputBuffer::onCrash(){ - WARN_MSG("BUffer crashed. Cleaning."); + WARN_MSG("Buffer crashed. Cleaning."); streamName = config->getString("streamname"); char pageName[NAME_BUFFER_SIZE]; @@ -194,23 +198,12 @@ namespace Mist { for (long unsigned i = 0; i < 15; ++i){ unsigned int size = std::min(((8192 * 2) << i), (32 * 1024 * 1024)); IPC::sharedPage tmp(std::string(baseName + (char)(i + (int)'A')), size, false, false); - tmp.master = false; if (tmp.mapped){ + tmp.master = true; WARN_MSG("Wiping %s", std::string(baseName + (char)(i + (int)'A')).c_str()); memset(tmp.mapped, 0xFF, size); } } - //Wait five seconds to allow everyone to disconnect gracefully. - Util::wait(5000); - //Now delete those pages - for (long unsigned i = 0; i < 15; ++i){ - unsigned int size = std::min(((8192 * 2) << i), (32 * 1024 * 1024)); - IPC::sharedPage tmp(std::string(baseName + (char)(i + (int)'A')), size, false, false); - tmp.master = true; - if (tmp.mapped){ - WARN_MSG("Wiping %s some more", std::string(baseName + (char)(i + (int)'A')).c_str()); - } - } { //Delete the live stream semaphore, if any. @@ -859,8 +852,8 @@ namespace Mist { std::string strName = config->getString("streamname"); Util::sanitizeName(strName); strName = strName.substr(0, (strName.find_first_of("+ "))); - IPC::sharedPage serverCfg("!mistConfig", DEFAULT_CONF_PAGE_SIZE, false, false); ///< Contains server configuration and capabilities - IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1); + IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, false, false); ///< Contains server configuration and capabilities + IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); configLock.wait(); DTSC::Scan streamCfg = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("streams").getMember(strName); long long tmpNum; diff --git a/src/input/input_ts.cpp b/src/input/input_ts.cpp index fc0a8735..1cbbe72d 100755 --- a/src/input/input_ts.cpp +++ b/src/input/input_ts.cpp @@ -17,7 +17,7 @@ #include #include - +#define SEM_TS_CLAIM "/MstTSIN%s" /// \todo Implement this trigger equivalent... @@ -46,8 +46,9 @@ std::set claimableThreads; void parseThread(void * ignored) { - std::string semName = "MstInTSStreamClaim" + globalStreamName; - IPC::semaphore lock(semName.c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); + char semName[NAME_BUFFER_SIZE]; + snprintf(semName, NAME_BUFFER_SIZE, SEM_TS_CLAIM, globalStreamName.c_str()); + IPC::semaphore lock(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1); int tid = -1; lock.wait(); @@ -151,12 +152,14 @@ namespace Mist { fclose(inFile); } #ifdef TSLIVE_INPUT - std::string semName = "MstInTSStreamClaim" + globalStreamName; - IPC::semaphore lock(semName.c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); + char semName[NAME_BUFFER_SIZE]; + snprintf(semName, NAME_BUFFER_SIZE, SEM_TS_CLAIM, globalStreamName.c_str()); + IPC::semaphore lock(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1); lock.wait(); threadTimer.clear(); claimableThreads.clear(); lock.post(); + lock.unlink(); #endif } @@ -394,8 +397,9 @@ namespace Mist { //Check for and spawn threads here. if (Util::bootSecs() - threadCheckTimer > 2) { std::set activeTracks = liveStream.getActiveTracks(); - std::string semName = "MstInTSStreamClaim" + globalStreamName; - IPC::semaphore lock(semName.c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); + char semName[NAME_BUFFER_SIZE]; + snprintf(semName, NAME_BUFFER_SIZE, SEM_TS_CLAIM, globalStreamName.c_str()); + IPC::semaphore lock(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1); lock.wait(); for (std::set::iterator it = activeTracks.begin(); it != activeTracks.end(); it++) { if (threadTimer.count(*it) && ((Util::bootSecs() - threadTimer[*it]) > (2 * THREAD_TIMEOUT))) { @@ -422,8 +426,9 @@ namespace Mist { } void inputTS::finish() { - std::string semName = "MstInTSStreamClaim" + globalStreamName; - IPC::semaphore lock(semName.c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); + char semName[NAME_BUFFER_SIZE]; + snprintf(semName, NAME_BUFFER_SIZE, SEM_TS_CLAIM, globalStreamName.c_str()); + IPC::semaphore lock(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1); int threadCount = 0; diff --git a/src/input/mist_in.cpp b/src/input/mist_in.cpp index baeb2ade..d3319c59 100644 --- a/src/input/mist_in.cpp +++ b/src/input/mist_in.cpp @@ -20,7 +20,9 @@ int main(int argc, char * argv[]) { #ifndef INPUT_NOLOCK IPC::semaphore playerLock; if (streamName.size()){ - playerLock.open(std::string("/lock_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); + char semName[NAME_BUFFER_SIZE]; + snprintf(semName, NAME_BUFFER_SIZE, SEM_INPUT, streamName.c_str()); + playerLock.open(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1); if (!playerLock.tryWait()){ DEBUG_MSG(DLVL_DEVEL, "A player for stream %s is already running", streamName.c_str()); return 1; @@ -71,6 +73,7 @@ int main(int argc, char * argv[]) { } #ifndef INPUT_NOLOCK playerLock.post(); + playerLock.unlink(); playerLock.close(); #endif } diff --git a/src/output/output.cpp b/src/output/output.cpp index 53baaa36..0a481ed9 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -156,21 +156,23 @@ namespace Mist { void Output::updateMeta(){ //read metadata from page to myMeta variable - static char liveSemName[NAME_BUFFER_SIZE]; - snprintf(liveSemName, NAME_BUFFER_SIZE, SEM_LIVE, streamName.c_str()); - IPC::semaphore liveMeta(liveSemName, O_CREAT | O_RDWR, ACCESSPERMS, 1); - bool lock = myMeta.live; - if (lock){ - liveMeta.wait(); - } if (nProxy.metaPages[0].mapped){ + IPC::semaphore * liveSem = 0; + if (myMeta.live){ + static char liveSemName[NAME_BUFFER_SIZE]; + snprintf(liveSemName, NAME_BUFFER_SIZE, SEM_LIVE, streamName.c_str()); + liveSem = new IPC::semaphore(liveSemName, O_CREAT | O_RDWR, ACCESSPERMS, 1); + liveSem->wait(); + } DTSC::Packet tmpMeta(nProxy.metaPages[0].mapped, nProxy.metaPages[0].len, true); if (tmpMeta.getVersion()){ myMeta.reinit(tmpMeta); } - } - if (lock){ - liveMeta.post(); + if (liveSem){ + liveSem->post(); + delete liveSem; + liveSem = 0; + } } } diff --git a/src/output/output_http.cpp b/src/output/output_http.cpp index ac3c4e7c..a4979fba 100644 --- a/src/output/output_http.cpp +++ b/src/output/output_http.cpp @@ -105,9 +105,9 @@ namespace Mist { } //loop over the connectors - IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1); + IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); configLock.wait(); - IPC::sharedPage serverCfg("!mistConfig", DEFAULT_CONF_PAGE_SIZE); + IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); DTSC::Scan capa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors"); unsigned int capa_ctr = capa.getSize(); for (unsigned int i = 0; i < capa_ctr; ++i){ @@ -278,9 +278,9 @@ namespace Mist { for (int i=0; i<20; i++){argarr[i] = 0;} int id = -1; - IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1); + IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); configLock.wait(); - IPC::sharedPage serverCfg("!mistConfig", DEFAULT_CONF_PAGE_SIZE); + IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); DTSC::Scan prots = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("config").getMember("protocols"); unsigned int prots_ctr = prots.getSize(); @@ -376,8 +376,8 @@ namespace Mist { trustedProxies.insert("::1"); trustedProxies.insert("127.0.0.1"); - IPC::sharedPage serverCfg("!mistConfig", DEFAULT_CONF_PAGE_SIZE, false, false); ///< Open server config - IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1); + IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, false, false); ///< Open server config + IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); configLock.wait(); std::string trustedList = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("config").getMember("trustedproxy").asString(); configLock.post(); diff --git a/src/output/output_http_internal.cpp b/src/output/output_http_internal.cpp index cdf52e01..5b663702 100644 --- a/src/output/output_http_internal.cpp +++ b/src/output/output_http_internal.cpp @@ -272,9 +272,9 @@ namespace Mist { std::string port, url_rel; - IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1); + IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); configLock.wait(); - IPC::sharedPage serverCfg("!mistConfig", DEFAULT_CONF_PAGE_SIZE); + IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); DTSC::Scan prtcls = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("config").getMember("protocols"); DTSC::Scan capa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors").getMember("RTMP"); unsigned int pro_cnt = prtcls.getSize(); @@ -347,11 +347,11 @@ namespace Mist { if (config->getString("nostreamtext") != ""){ json_resp["on_error"] = config->getString("nostreamtext"); } - IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1); + IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); IPC::semaphore metaLocker(std::string("liveMeta@" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); bool metaLock = false; configLock.wait(); - IPC::sharedPage serverCfg("!mistConfig", DEFAULT_CONF_PAGE_SIZE); + IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); DTSC::Scan strm = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("streams").getMember(streamName).getMember("meta"); IPC::sharedPage streamIndex; if (!strm){ diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp index db94cb80..227f1574 100644 --- a/src/output/output_rtmp.cpp +++ b/src/output/output_rtmp.cpp @@ -522,8 +522,8 @@ namespace Mist { Util::sanitizeName(streamName); //pull the server configuration std::string smp = streamName.substr(0,(streamName.find_first_of("+ "))); - IPC::sharedPage serverCfg("!mistConfig", DEFAULT_CONF_PAGE_SIZE); ///< Contains server configuration and capabilities - IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1); + IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); ///< Contains server configuration and capabilities + IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); configLock.wait(); DTSC::Scan streamCfg = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("streams").getMember(smp);