From 5ed7c7ab2264c0e38b628843c3a193c13e29a5af Mon Sep 17 00:00:00 2001 From: Thulinma Date: Mon, 15 May 2017 12:26:27 +0200 Subject: [PATCH] Fixed TS input --- src/input/input_ts.cpp | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/src/input/input_ts.cpp b/src/input/input_ts.cpp index 7b404dd4..74337a74 100755 --- a/src/input/input_ts.cpp +++ b/src/input/input_ts.cpp @@ -47,7 +47,8 @@ void parseThread(void * ignored) { } if (liveStream.isDataTrack(tid)){ - if (!Util::startInput(globalStreamName)) { + if (!Util::startInput(globalStreamName, "push://INTERNAL_ONLY:"+cfgPointer->getString("input"))) {//manually override stream url to start the buffer + FAIL_MSG("Could not start buffer for %s", globalStreamName.c_str()); return; } } @@ -60,11 +61,12 @@ void parseThread(void * ignored) { char userPageName[NAME_BUFFER_SIZE]; snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, globalStreamName.c_str()); myProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true); + myProxy.userClient.countAsViewer = false; } threadTimer[tid] = Util::bootSecs(); - while (Util::bootSecs() - threadTimer[tid] < THREAD_TIMEOUT && cfgPointer->is_active) { + while (Util::bootSecs() - threadTimer[tid] < THREAD_TIMEOUT && cfgPointer->is_active && (!liveStream.isDataTrack(tid) || myProxy.userClient.isAlive())) { liveStream.parse(tid); if (liveStream.hasPacket(tid)){ liveStream.initializeMetadata(myMeta, tid); @@ -87,6 +89,14 @@ void parseThread(void * ignored) { } } lock.wait(); + std::string reason = "unknown reason"; + if (!(Util::bootSecs() - threadTimer[tid] < THREAD_TIMEOUT)){reason = "thread timeout";} + if (!cfgPointer->is_active){reason = "input shutting down";} + if (!(!liveStream.isDataTrack(tid) || myProxy.userClient.isAlive())){ + reason = "buffer disconnect"; + cfgPointer->is_active = false; + } + INFO_MSG("Shutting down thread because %s", reason.c_str()); threadTimer.erase(tid); lock.post(); liveStream.eraseTrack(tid); @@ -299,18 +309,15 @@ namespace Mist { } void inputTS::stream() { - if (!Util::startInput(streamName, "push://INTERNAL_ONLY:"+config->getString("input"))) {//manually override stream url to start the buffer - FAIL_MSG("Could not start buffer for %s", streamName.c_str()); - return; - } IPC::sharedClient statsPage = IPC::sharedClient(SHM_STATISTICS, STAT_EX_SIZE, true); uint64_t downCounter = 0; uint64_t startTime = Util::epoch(); uint64_t noDataSince = Util::bootSecs(); + bool hasStarted = false; cfgPointer = config; globalStreamName = streamName; unsigned long long threadCheckTimer = Util::bootSecs(); - while (config->is_active && nProxy.userClient.isAlive()) { + while (config->is_active) { if (inFile) { if (feof(inFile)){ config->is_active = false; @@ -382,11 +389,18 @@ namespace Mist { snprintf(semName, NAME_BUFFER_SIZE, SEM_TS_CLAIM, globalStreamName.c_str()); IPC::semaphore lock(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1); lock.wait(); + if (hasStarted && !threadTimer.size()){ + INFO_MSG("Shutting down because no active threads and we had input in the past"); + config->is_active = false; + } for (std::set::iterator it = activeTracks.begin(); it != activeTracks.end(); it++) { if (threadTimer.count(*it) && ((Util::bootSecs() - threadTimer[*it]) > (2 * THREAD_TIMEOUT))) { WARN_MSG("Thread for track %d timed out %d seconds ago without a clean shutdown.", *it, Util::bootSecs() - threadTimer[*it]); threadTimer.erase(*it); } + if (!hasStarted){ + hasStarted = true; + } if (!threadTimer.count(*it)) { //Add to list of unclaimed threads