From fa41e02047a6472a82f29a2a5b959f57235896ef Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 5 May 2016 14:28:39 +0200 Subject: [PATCH] Removed 5s timeout from streamAlive call. --- lib/shared_memory.cpp | 8 ++++---- lib/shared_memory.h | 4 ++-- lib/stream.cpp | 10 ++++++++-- src/controller/controller_statistics.cpp | 4 +++- src/input/input.cpp | 12 ++++++------ src/output/output.cpp | 18 +++++++++--------- src/output/output_dtsc.cpp | 3 +-- src/output/output_http_internal.cpp | 4 +++- 8 files changed, 36 insertions(+), 27 deletions(-) diff --git a/lib/shared_memory.cpp b/lib/shared_memory.cpp index f149c834..1ed06b12 100644 --- a/lib/shared_memory.cpp +++ b/lib/shared_memory.cpp @@ -95,13 +95,13 @@ namespace IPC { ///\param oflag The flags with which to open the semaphore ///\param mode The mode in which to create the semaphore, if O_CREAT is given in oflag, ignored otherwise ///\param value The initial value of the semaphore if O_CREAT is given in oflag, ignored otherwise - semaphore::semaphore(const char * name, int oflag, mode_t mode, unsigned int value) { + semaphore::semaphore(const char * name, int oflag, mode_t mode, unsigned int value, bool noWait) { #if defined(__CYGWIN__) || defined(_WIN32) mySem = 0; #else mySem = SEM_FAILED; #endif - open(name, oflag, mode, value); + open(name, oflag, mode, value, noWait); } ///\brief The deconstructor @@ -126,7 +126,7 @@ namespace IPC { ///\param oflag The flags with which to open the semaphore ///\param mode The mode in which to create the semaphore, if O_CREAT is given in oflag, ignored otherwise ///\param value The initial value of the semaphore if O_CREAT is given in oflag, ignored otherwise - void semaphore::open(const char * name, int oflag, mode_t mode, unsigned int value) { + void semaphore::open(const char * name, int oflag, mode_t mode, unsigned int value, bool noWait) { close(); int timer = 0; while (!(*this) && timer++ < 10) { @@ -165,7 +165,7 @@ namespace IPC { mySem = sem_open(name, oflag); } if (!(*this)) { - if (errno == ENOENT) { + if (errno == ENOENT && !noWait) { Util::wait(500); } else { break; diff --git a/lib/shared_memory.h b/lib/shared_memory.h index 3ee041bf..adb99c06 100644 --- a/lib/shared_memory.h +++ b/lib/shared_memory.h @@ -60,10 +60,10 @@ namespace IPC { class semaphore { public: semaphore(); - semaphore(const char * name, int oflag, mode_t mode = 0, unsigned int value = 0); + semaphore(const char * name, int oflag, mode_t mode = 0, unsigned int value = 0, bool noWait = false); ~semaphore(); operator bool() const; - void open(const char * name, int oflag, mode_t mode = 0, unsigned int value = 0); + void open(const char * name, int oflag, mode_t mode = 0, unsigned int value = 0, bool noWait = false); int getVal() const; void post(); void wait(); diff --git a/lib/stream.cpp b/lib/stream.cpp index f01cc937..ae4e450c 100644 --- a/lib/stream.cpp +++ b/lib/stream.cpp @@ -109,7 +109,7 @@ JSON::Value Util::getStreamConfig(std::string streamname){ bool Util::streamAlive(std::string & streamname){ char semName[NAME_BUFFER_SIZE]; snprintf(semName, NAME_BUFFER_SIZE, SEM_INPUT, streamname.c_str()); - IPC::semaphore playerLock(semName, O_RDWR, ACCESSPERMS, 1); + IPC::semaphore playerLock(semName, O_RDWR, ACCESSPERMS, 1, true); if (!playerLock){return false;} if (!playerLock.tryWait()) { playerLock.close(); @@ -310,7 +310,13 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir FAIL_MSG("Starting process %s for stream %s failed: %s", argv[0], streamname.c_str(), strerror(errno)); _exit(42); } - return true; + + unsigned int waiting = 0; + while (!streamAlive(streamname) && ++waiting < 40){ + Util::sleep(250); + } + + return streamAlive(streamname); } /* roxlu-begin */ diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index 4220af31..6cf912e7 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -783,7 +783,9 @@ void Controller::fillActive(JSON::Value & req, JSON::Value & rep, bool onlyNow){ snprintf(pageId, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, it->c_str()); streamIndex.init(pageId, DEFAULT_META_PAGE_SIZE, false, false); if (streamIndex.mapped){ - IPC::semaphore metaLocker(std::string("liveMeta@" + (*it)).c_str(), O_CREAT | O_RDWR, (S_IRWXU|S_IRWXG|S_IRWXO), 1); + static char liveSemName[NAME_BUFFER_SIZE]; + snprintf(liveSemName, NAME_BUFFER_SIZE, SEM_LIVE, it->c_str()); + IPC::semaphore metaLocker(liveSemName, O_CREAT | O_RDWR, (S_IRWXU|S_IRWXG|S_IRWXO), 1); metaLocker.wait(); DTSC::Scan strm = DTSC::Packet(streamIndex.mapped, streamIndex.len, true).getScan(); long long lms = 0; diff --git a/src/input/input.cpp b/src/input/input.cpp index 73882984..7c2dfa19 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -195,8 +195,14 @@ namespace Mist { /// ~~~~~~~~~~~~~~~ // void Input::serve(){ + if (!isBuffer) { + for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) { + bufferFrame(it->first, 1); + } + } char userPageName[NAME_BUFFER_SIZE]; snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); + userPage.init(userPageName, PLAY_EX_SIZE, true); /*LTS-START*/ if(Triggers::shouldTrigger("STREAM_READY", config->getString("streamname"))){ std::string payload = config->getString("streamname")+"\n" +capa["name"].asStringRef()+"\n"; @@ -205,12 +211,6 @@ namespace Mist { } } /*LTS-END*/ - userPage.init(userPageName, PLAY_EX_SIZE, true); - if (!isBuffer) { - for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) { - bufferFrame(it->first, 1); - } - } DEBUG_MSG(DLVL_DEVEL, "Input for stream %s started", streamName.c_str()); diff --git a/src/output/output.cpp b/src/output/output.cpp index 88a23693..0b9f1d8d 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -278,15 +278,6 @@ namespace Mist { onFail(); return; } - char pageId[NAME_BUFFER_SIZE]; - snprintf(pageId, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str()); - nProxy.metaPages.clear(); - nProxy.metaPages[0].init(pageId, DEFAULT_META_PAGE_SIZE); - if (!nProxy.metaPages[0].mapped){ - FAIL_MSG("Could not connect to server for %s", streamName.c_str()); - onFail(); - return; - } if (statsPage.getData()){ statsPage.finish(); } @@ -297,6 +288,15 @@ namespace Mist { char userPageName[NAME_BUFFER_SIZE]; snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true); + char pageId[NAME_BUFFER_SIZE]; + snprintf(pageId, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str()); + nProxy.metaPages.clear(); + nProxy.metaPages[0].init(pageId, DEFAULT_META_PAGE_SIZE); + if (!nProxy.metaPages[0].mapped){ + FAIL_MSG("Could not connect to server for %s", streamName.c_str()); + onFail(); + return; + } updateMeta(); } diff --git a/src/output/output_dtsc.cpp b/src/output/output_dtsc.cpp index bf19e790..069de089 100644 --- a/src/output/output_dtsc.cpp +++ b/src/output/output_dtsc.cpp @@ -88,8 +88,7 @@ namespace Mist { } } }else{ - fastAsPossibleTime = 50000;//50 seconds - realTime = 0; + realTime = 1000; } } diff --git a/src/output/output_http_internal.cpp b/src/output/output_http_internal.cpp index 5b663702..419e446a 100644 --- a/src/output/output_http_internal.cpp +++ b/src/output/output_http_internal.cpp @@ -348,7 +348,9 @@ namespace Mist { json_resp["on_error"] = config->getString("nostreamtext"); } IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - IPC::semaphore metaLocker(std::string("liveMeta@" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); + static char liveSemName[NAME_BUFFER_SIZE]; + snprintf(liveSemName, NAME_BUFFER_SIZE, SEM_LIVE, streamName.c_str()); + IPC::semaphore metaLocker(liveSemName, O_CREAT | O_RDWR, ACCESSPERMS, 1); bool metaLock = false; configLock.wait(); IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE);