From 185fd6ebb87d188ba79773f4c08a2e4a6fc914b8 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 28 Nov 2017 15:17:08 +0100 Subject: [PATCH] TS input now follows generic code path, AlwaysOn made slightly more persistent --- src/input/input.cpp | 4 ++++ src/input/input_ts.cpp | 48 +++++++++++++++++------------------------- src/input/input_ts.h | 4 +++- 3 files changed, 26 insertions(+), 30 deletions(-) diff --git a/src/input/input.cpp b/src/input/input.cpp index e544f3dd..26815803 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -465,6 +465,10 @@ namespace Mist { // - INPUT_TIMEOUT seconds haven't passed yet, // - this is a live stream and at least two of the biggest fragment haven't passed yet, bool ret = (config->is_active && ((Util::bootSecs() - activityCounter) < INPUT_TIMEOUT || (myMeta.live && (Util::bootSecs() - activityCounter) < myMeta.biggestFragment()/500))); + if (!ret && config->is_active && isAlwaysOn()){ + ret = true; + activityCounter = Util::bootSecs(); + } /*LTS-START*/ if (!ret){ if(Triggers::shouldTrigger("STREAM_UNLOAD", config->getString("streamname"))){ diff --git a/src/input/input_ts.cpp b/src/input/input_ts.cpp index 4fedaabc..23c4859b 100755 --- a/src/input/input_ts.cpp +++ b/src/input/input_ts.cpp @@ -375,18 +375,7 @@ namespace Mist { Util::fseek(inFile, seekPos, SEEK_SET);//seek to the correct position } - void inputTS::stream() { - IPC::semaphore pullLock; - pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); - if (!pullLock){ - FAIL_MSG("Could not open pull lock for stream '%s' - aborting!", streamName.c_str()); - return; - } - if (!pullLock.tryWait()){ - WARN_MSG("A pull process for stream %s is already running", streamName.c_str()); - pullLock.close(); - return; - } + bool inputTS::openStreamSource(){ const std::string & inpt = config->getString("input"); if (inpt.substr(0, 8) == "tsudp://"){ HTTP::URL input_url(inpt); @@ -394,12 +383,19 @@ namespace Mist { udpCon.bind(input_url.getPort(), input_url.host, input_url.path); if (udpCon.getSock() == -1){ FAIL_MSG("Could not open UDP socket. Aborting."); - pullLock.post(); - pullLock.close(); - pullLock.unlink(); - return; + return false; } } + return true; + } + + void inputTS::parseStreamHeader(){ + //Placeholder to force normal code to continue despite no tracks available + myMeta.tracks[0].type = "audio"; + } + + std::string inputTS::streamMainLoop() { + myMeta.tracks.clear();//wipe the placeholder track from above IPC::sharedClient statsPage = IPC::sharedClient(SHM_STATISTICS, STAT_EX_SIZE, true); uint64_t downCounter = 0; uint64_t startTime = Util::epoch(); @@ -409,7 +405,7 @@ namespace Mist { cfgPointer = config; globalStreamName = streamName; unsigned long long threadCheckTimer = Util::bootSecs(); - while (config->is_active) { + while (config->is_active && nProxy.userClient.isAlive()) { if (tcpCon) { if (tcpCon.spool()){ while (tcpCon.Received().available(188)){ @@ -431,7 +427,7 @@ namespace Mist { } if (!tcpCon){ config->is_active = false; - INFO_MSG("End of streamed input"); + return "end of streamed input"; } } else { std::string leftData; @@ -499,10 +495,7 @@ namespace Mist { if (statsPage.getData()){ if (!statsPage.isAlive()){ config->is_active = false; - pullLock.post(); - pullLock.close(); - pullLock.unlink(); - return; + return "received shutdown request from controller"; } IPC::statExchange tmpEx(statsPage.getData()); tmpEx.now(now); @@ -515,14 +508,15 @@ namespace Mist { tmpEx.lastSecond(0); statsPage.keepAlive(); } + nProxy.userClient.keepAlive(); std::set activeTracks = liveStream.getActiveTracks(); { tthread::lock_guard guard(threadClaimMutex); if (hasStarted && !threadTimer.size()){ if (!isAlwaysOn()){ - INFO_MSG("Shutting down because no active threads and we had input in the past"); config->is_active = false; + return "no active threads and we had input in the past"; }else{ hasStarted = false; } @@ -551,18 +545,14 @@ namespace Mist { } if (Util::bootSecs() - noDataSince > 20){ if (!isAlwaysOn()){ - WARN_MSG("No packets received for 20 seconds - terminating"); config->is_active = false; + return "No packets received for 20 seconds - terminating"; }else{ noDataSince = Util::bootSecs(); } } } - finish(); - pullLock.post(); - pullLock.close(); - pullLock.unlink(); - INFO_MSG("Input for stream %s closing clean", streamName.c_str()); + return "received shutdown request"; } void inputTS::finish() { diff --git a/src/input/input_ts.h b/src/input/input_ts.h index 8d543ee7..6e4677d7 100755 --- a/src/input/input_ts.h +++ b/src/input/input_ts.h @@ -24,7 +24,9 @@ namespace Mist { void seek(int seekTime); void trackSelect(std::string trackSpec); void readPMT(); - void stream(); + bool openStreamSource(); + void parseStreamHeader(); + std::string streamMainLoop(); void finish(); FILE * inFile;///