diff --git a/src/output/output.cpp b/src/output/output.cpp index db9257b0..b6e06167 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -40,16 +40,21 @@ namespace Mist{ } void Output::bufferLivePacket(DTSC::Packet & packet){ + if (!pushIsOngoing){ + waitForStreamPushReady(); + } if (nProxy.negTimer > 600){ WARN_MSG("No negotiation response from buffer - reconnecting."); nProxy.clear(); reconnect(); } InOutBase::bufferLivePacket(packet); + pushIsOngoing = true; } Output::Output(Socket::Connection & conn) : myConn(conn){ pushing = false; + pushIsOngoing = false; firstTime = 0; crc = getpid(); parseData = false; @@ -226,7 +231,7 @@ namespace Mist{ selectDefaultTracks(); if (!myMeta.vod && !isReadyForPlay()){ unsigned long long waitUntil = Util::epoch() + 30; - while (!myMeta.vod && !isReadyForPlay() && nProxy.userClient.isAlive()){ + while (!myMeta.vod && !isReadyForPlay() && nProxy.userClient.isAlive() && keepGoing()){ if (Util::epoch() > waitUntil + 45 || (!selectedTracks.size() && Util::epoch() > waitUntil)){ INFO_MSG("Giving up waiting for playable tracks. Stream: %s, IP: %s", streamName.c_str(), getConnectedHost().c_str()); break; @@ -432,7 +437,7 @@ namespace Mist{ VERYHIGH_MSG("Loading track %lu, containing key %lld", trackId, keyNum); unsigned int timeout = 0; unsigned long pageNum = pageNumForKey(trackId, keyNum); - while (config->is_active && myConn && pageNum == -1){ + while (keepGoing() && pageNum == -1){ if (!timeout){ HIGH_MSG("Requesting page with key %lu:%lld", trackId, keyNum); } @@ -555,7 +560,7 @@ namespace Mist{ bool Output::seek(unsigned int tid, unsigned long long pos, bool getNextKey){ if (myMeta.live && myMeta.tracks[tid].lastms < pos){ unsigned int maxTime = 0; - while (myMeta.tracks[tid].lastms < pos && myConn && ++maxTime <= 20){ + while (myMeta.tracks[tid].lastms < pos && myConn && ++maxTime <= 20 && keepGoing()){ Util::wait(500); stats(); updateMeta(); @@ -602,7 +607,7 @@ namespace Mist{ }else{ VERYHIGH_MSG("Track %d no data (key %u @ %u) - waiting...", tid, getKeyForTime(tid, pos) + (getNextKey?1:0), tmp.offset); unsigned int i = 0; - while (!myMeta.live && nProxy.curPage[tid].mapped[tmp.offset] == 0 && ++i <= 10){ + while (!myMeta.live && nProxy.curPage[tid].mapped[tmp.offset] == 0 && ++i <= 10 && keepGoing()){ Util::wait(100*i); stats(); } @@ -677,7 +682,7 @@ namespace Mist{ int Output::run(){ DONTEVEN_MSG("MistOut client handler started"); - while (config->is_active && myConn && (wantRequest || parseData)){ + while (keepGoing() && (wantRequest || parseData)){ if (wantRequest){ requestHandler(); } @@ -693,13 +698,11 @@ namespace Mist{ initialSeek(); } if (prepareNext()){ - if (thisPacket){ - - + if (thisPacket){ //slow down processing, if real time speed is wanted if (realTime){ uint8_t i = 6; - while (--i && thisPacket.getTime() > (((Util::getMS() - firstTime)*1000)+maxSkipAhead)/realTime && config->is_active && myConn){ + while (--i && thisPacket.getTime() > (((Util::getMS() - firstTime)*1000)+maxSkipAhead)/realTime && keepGoing()){ Util::sleep(std::min(thisPacket.getTime() - (((Util::getMS() - firstTime)*1000)+minSkipAhead)/realTime, 1000llu)); stats(); } @@ -712,7 +715,7 @@ namespace Mist{ //wait at most double the look ahead time, plus ten seconds uint32_t timeoutTries = (needsLookAhead / sleepTime) * 2 + (10000/sleepTime); uint64_t needsTime = thisPacket.getTime() + needsLookAhead; - while(--timeoutTries){ + while(--timeoutTries && keepGoing()){ bool lookReady = true; for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){ if (myMeta.tracks[*it].lastms <= needsTime){ @@ -734,15 +737,16 @@ namespace Mist{ } } - sendNext(); - }else{ - if (!onFinish()){ - break; + sendNext(); + }else{ + INFO_MSG("Shutting down because of stream end"); + if (!onFinish()){ + break; + } } } } - } - stats(); + stats(); } MEDIUM_MSG("MistOut client handler shutting down: %s, %s, %s", myConn.connected() ? "conn_active" : "conn_closed", wantRequest ? "want_request" : "no_want_request", parseData ? "parsing_data" : "not_parsing_data"); onFinish(); @@ -1057,9 +1061,20 @@ namespace Mist{ } } if (!nProxy.userClient.isAlive()){ - onFinish(); - myConn.close(); - return; + if (isPushing() && !pushIsOngoing){ + waitForStreamPushReady(); + if (!nProxy.userClient.isAlive()){ + WARN_MSG("Failed to wait for buffer, aborting incoming push"); + onFinish(); + myConn.close(); + return; + } + }else{ + INFO_MSG("Received disconnect request from input"); + onFinish(); + myConn.close(); + return; + } } if (!isPushing()){ IPC::userConnection userConn(nProxy.userClient.getData()); @@ -1117,11 +1132,13 @@ namespace Mist{ // Initialize the stream source if needed, connect to it initialize(); + + waitForStreamPushReady(); //pull the source setting from metadata strmSource = myMeta.sourceURI; if (!strmSource.size()){ - FAIL_MSG("Push rejected - stream %s not configured", streamName.c_str()); + FAIL_MSG("Push rejected - stream %s not configured or unavailable", streamName.c_str()); pushing = false; return false; } @@ -1144,5 +1161,24 @@ namespace Mist{ return true; } + /// Attempts to wait for a stream to finish shutting down if it is, then restarts and reconnects. + void Output::waitForStreamPushReady(){ + uint8_t streamStatus = Util::getStreamStatus(streamName); + MEDIUM_MSG("Current status for %s buffer is %u", streamName.c_str(), streamStatus); + while (streamStatus != STRMSTAT_WAIT && streamStatus != STRMSTAT_READY && keepGoing()){ + INFO_MSG("Waiting for %s buffer to be ready... (%u)", streamName.c_str(), streamStatus); + if (nProxy.userClient.getData()){ + nProxy.userClient.finish(); + nProxy.userClient = IPC::sharedClient(); + } + Util::wait(1000); + streamStatus = Util::getStreamStatus(streamName); + if (streamStatus == STRMSTAT_OFF || streamStatus == STRMSTAT_WAIT || streamStatus == STRMSTAT_READY){ + reconnect(); + streamStatus = Util::getStreamStatus(streamName); + } + } + } + } diff --git a/src/output/output.h b/src/output/output.h index 539a17c0..14ce5258 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -75,6 +75,9 @@ namespace Mist { virtual void onFail(); virtual void requestHandler(); private://these *should* not be messed with in child classes. + inline bool keepGoing(){ + return config->is_active && myConn; + } std::map currKeyOpen; void loadPageForKey(long unsigned int trackId, long long int keyNum); int pageNumForKey(long unsigned int trackId, long long int keyNum); @@ -114,6 +117,8 @@ namespace Mist { std::map bookKeeping; virtual bool isPushing(){return pushing;}; bool allowPush(const std::string & passwd); + void waitForStreamPushReady(); + bool pushIsOngoing; void bufferLivePacket(DTSC::Packet & packet); };