From f4242f23bf797086b633e5f15291541af7a83753 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 4 Jul 2017 12:59:51 +0200 Subject: [PATCH] Incoming pushes now wait for buffer shutdowns and restart it, if needed (no more failing quick successive pushes!), simplified output logic with keepGoing() function, added missing termination checks in some wait loops --- src/output/output.cpp | 76 +++++++++++++++++++++++++++++++------------ src/output/output.h | 5 +++ 2 files changed, 61 insertions(+), 20 deletions(-) diff --git a/src/output/output.cpp b/src/output/output.cpp index db9257b0..b6e06167 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -40,16 +40,21 @@ namespace Mist{ } void Output::bufferLivePacket(DTSC::Packet & packet){ + if (!pushIsOngoing){ + waitForStreamPushReady(); + } if (nProxy.negTimer > 600){ WARN_MSG("No negotiation response from buffer - reconnecting."); nProxy.clear(); reconnect(); } InOutBase::bufferLivePacket(packet); + pushIsOngoing = true; } Output::Output(Socket::Connection & conn) : myConn(conn){ pushing = false; + pushIsOngoing = false; firstTime = 0; crc = getpid(); parseData = false; @@ -226,7 +231,7 @@ namespace Mist{ selectDefaultTracks(); if (!myMeta.vod && !isReadyForPlay()){ unsigned long long waitUntil = Util::epoch() + 30; - while (!myMeta.vod && !isReadyForPlay() && nProxy.userClient.isAlive()){ + while (!myMeta.vod && !isReadyForPlay() && nProxy.userClient.isAlive() && keepGoing()){ if (Util::epoch() > waitUntil + 45 || (!selectedTracks.size() && Util::epoch() > waitUntil)){ INFO_MSG("Giving up waiting for playable tracks. Stream: %s, IP: %s", streamName.c_str(), getConnectedHost().c_str()); break; @@ -432,7 +437,7 @@ namespace Mist{ VERYHIGH_MSG("Loading track %lu, containing key %lld", trackId, keyNum); unsigned int timeout = 0; unsigned long pageNum = pageNumForKey(trackId, keyNum); - while (config->is_active && myConn && pageNum == -1){ + while (keepGoing() && pageNum == -1){ if (!timeout){ HIGH_MSG("Requesting page with key %lu:%lld", trackId, keyNum); } @@ -555,7 +560,7 @@ namespace Mist{ bool Output::seek(unsigned int tid, unsigned long long pos, bool getNextKey){ if (myMeta.live && myMeta.tracks[tid].lastms < pos){ unsigned int maxTime = 0; - while (myMeta.tracks[tid].lastms < pos && myConn && ++maxTime <= 20){ + while (myMeta.tracks[tid].lastms < pos && myConn && ++maxTime <= 20 && keepGoing()){ Util::wait(500); stats(); updateMeta(); @@ -602,7 +607,7 @@ namespace Mist{ }else{ VERYHIGH_MSG("Track %d no data (key %u @ %u) - waiting...", tid, getKeyForTime(tid, pos) + (getNextKey?1:0), tmp.offset); unsigned int i = 0; - while (!myMeta.live && nProxy.curPage[tid].mapped[tmp.offset] == 0 && ++i <= 10){ + while (!myMeta.live && nProxy.curPage[tid].mapped[tmp.offset] == 0 && ++i <= 10 && keepGoing()){ Util::wait(100*i); stats(); } @@ -677,7 +682,7 @@ namespace Mist{ int Output::run(){ DONTEVEN_MSG("MistOut client handler started"); - while (config->is_active && myConn && (wantRequest || parseData)){ + while (keepGoing() && (wantRequest || parseData)){ if (wantRequest){ requestHandler(); } @@ -693,13 +698,11 @@ namespace Mist{ initialSeek(); } if (prepareNext()){ - if (thisPacket){ - - + if (thisPacket){ //slow down processing, if real time speed is wanted if (realTime){ uint8_t i = 6; - while (--i && thisPacket.getTime() > (((Util::getMS() - firstTime)*1000)+maxSkipAhead)/realTime && config->is_active && myConn){ + while (--i && thisPacket.getTime() > (((Util::getMS() - firstTime)*1000)+maxSkipAhead)/realTime && keepGoing()){ Util::sleep(std::min(thisPacket.getTime() - (((Util::getMS() - firstTime)*1000)+minSkipAhead)/realTime, 1000llu)); stats(); } @@ -712,7 +715,7 @@ namespace Mist{ //wait at most double the look ahead time, plus ten seconds uint32_t timeoutTries = (needsLookAhead / sleepTime) * 2 + (10000/sleepTime); uint64_t needsTime = thisPacket.getTime() + needsLookAhead; - while(--timeoutTries){ + while(--timeoutTries && keepGoing()){ bool lookReady = true; for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){ if (myMeta.tracks[*it].lastms <= needsTime){ @@ -734,15 +737,16 @@ namespace Mist{ } } - sendNext(); - }else{ - if (!onFinish()){ - break; + sendNext(); + }else{ + INFO_MSG("Shutting down because of stream end"); + if (!onFinish()){ + break; + } } } } - } - stats(); + stats(); } MEDIUM_MSG("MistOut client handler shutting down: %s, %s, %s", myConn.connected() ? "conn_active" : "conn_closed", wantRequest ? "want_request" : "no_want_request", parseData ? "parsing_data" : "not_parsing_data"); onFinish(); @@ -1057,9 +1061,20 @@ namespace Mist{ } } if (!nProxy.userClient.isAlive()){ - onFinish(); - myConn.close(); - return; + if (isPushing() && !pushIsOngoing){ + waitForStreamPushReady(); + if (!nProxy.userClient.isAlive()){ + WARN_MSG("Failed to wait for buffer, aborting incoming push"); + onFinish(); + myConn.close(); + return; + } + }else{ + INFO_MSG("Received disconnect request from input"); + onFinish(); + myConn.close(); + return; + } } if (!isPushing()){ IPC::userConnection userConn(nProxy.userClient.getData()); @@ -1117,11 +1132,13 @@ namespace Mist{ // Initialize the stream source if needed, connect to it initialize(); + + waitForStreamPushReady(); //pull the source setting from metadata strmSource = myMeta.sourceURI; if (!strmSource.size()){ - FAIL_MSG("Push rejected - stream %s not configured", streamName.c_str()); + FAIL_MSG("Push rejected - stream %s not configured or unavailable", streamName.c_str()); pushing = false; return false; } @@ -1144,5 +1161,24 @@ namespace Mist{ return true; } + /// Attempts to wait for a stream to finish shutting down if it is, then restarts and reconnects. + void Output::waitForStreamPushReady(){ + uint8_t streamStatus = Util::getStreamStatus(streamName); + MEDIUM_MSG("Current status for %s buffer is %u", streamName.c_str(), streamStatus); + while (streamStatus != STRMSTAT_WAIT && streamStatus != STRMSTAT_READY && keepGoing()){ + INFO_MSG("Waiting for %s buffer to be ready... (%u)", streamName.c_str(), streamStatus); + if (nProxy.userClient.getData()){ + nProxy.userClient.finish(); + nProxy.userClient = IPC::sharedClient(); + } + Util::wait(1000); + streamStatus = Util::getStreamStatus(streamName); + if (streamStatus == STRMSTAT_OFF || streamStatus == STRMSTAT_WAIT || streamStatus == STRMSTAT_READY){ + reconnect(); + streamStatus = Util::getStreamStatus(streamName); + } + } + } + } diff --git a/src/output/output.h b/src/output/output.h index 539a17c0..14ce5258 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -75,6 +75,9 @@ namespace Mist { virtual void onFail(); virtual void requestHandler(); private://these *should* not be messed with in child classes. + inline bool keepGoing(){ + return config->is_active && myConn; + } std::map currKeyOpen; void loadPageForKey(long unsigned int trackId, long long int keyNum); int pageNumForKey(long unsigned int trackId, long long int keyNum); @@ -114,6 +117,8 @@ namespace Mist { std::map bookKeeping; virtual bool isPushing(){return pushing;}; bool allowPush(const std::string & passwd); + void waitForStreamPushReady(); + bool pushIsOngoing; void bufferLivePacket(DTSC::Packet & packet); };