diff --git a/src/output/output.cpp b/src/output/output.cpp index 73f1a7a0..31d18554 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -47,16 +47,21 @@ namespace Mist{ } void Output::bufferLivePacket(DTSC::Packet & packet){ + if (!pushIsOngoing){ + waitForStreamPushReady(); + } if (nProxy.negTimer > 600){ WARN_MSG("No negotiation response from buffer - reconnecting."); nProxy.clear(); reconnect(); } InOutBase::bufferLivePacket(packet); + pushIsOngoing = true; } Output::Output(Socket::Connection & conn) : myConn(conn){ pushing = false; + pushIsOngoing = false; firstTime = 0; firstPacketTime = 0xFFFFFFFFFFFFFFFFull; lastPacketTime = 0; @@ -350,7 +355,7 @@ namespace Mist{ selectDefaultTracks(); if (!myMeta.vod && !isReadyForPlay()){ unsigned long long waitUntil = Util::epoch() + 30; - while (!myMeta.vod && !isReadyForPlay() && nProxy.userClient.isAlive()){ + while (!myMeta.vod && !isReadyForPlay() && nProxy.userClient.isAlive() && keepGoing()){ if (Util::epoch() > waitUntil + 45 || (!selectedTracks.size() && Util::epoch() > waitUntil)){ INFO_MSG("Giving up waiting for playable tracks. Stream: %s, IP: %s", streamName.c_str(), getConnectedHost().c_str()); break; @@ -556,7 +561,7 @@ namespace Mist{ VERYHIGH_MSG("Loading track %lu, containing key %lld", trackId, keyNum); unsigned int timeout = 0; unsigned long pageNum = pageNumForKey(trackId, keyNum); - while (config->is_active && myConn && pageNum == -1){ + while (keepGoing() && pageNum == -1){ if (!timeout){ HIGH_MSG("Requesting page with key %lu:%lld", trackId, keyNum); } @@ -679,7 +684,7 @@ namespace Mist{ bool Output::seek(unsigned int tid, unsigned long long pos, bool getNextKey){ if (myMeta.live && myMeta.tracks[tid].lastms < pos){ unsigned int maxTime = 0; - while (myMeta.tracks[tid].lastms < pos && myConn && ++maxTime <= 20){ + while (myMeta.tracks[tid].lastms < pos && myConn && ++maxTime <= 20 && keepGoing()){ Util::wait(500); stats(); updateMeta(); @@ -726,7 +731,7 @@ namespace Mist{ }else{ VERYHIGH_MSG("Track %d no data (key %u @ %u) - waiting...", tid, getKeyForTime(tid, pos) + (getNextKey?1:0), tmp.offset); unsigned int i = 0; - while (!myMeta.live && nProxy.curPage[tid].mapped[tmp.offset] == 0 && ++i <= 10){ + while (!myMeta.live && nProxy.curPage[tid].mapped[tmp.offset] == 0 && ++i <= 10 && keepGoing()){ Util::wait(100*i); stats(); } @@ -824,7 +829,7 @@ namespace Mist{ } /*LTS-END*/ DONTEVEN_MSG("MistOut client handler started"); - while (config->is_active && myConn && (wantRequest || parseData)){ + while (keepGoing() && (wantRequest || parseData)){ if (wantRequest){ requestHandler(); } @@ -846,11 +851,10 @@ namespace Mist{ firstPacketTime = lastPacketTime; } - //slow down processing, if real time speed is wanted if (realTime){ uint8_t i = 6; - while (--i && thisPacket.getTime() > (((Util::getMS() - firstTime)*1000)+maxSkipAhead)/realTime && config->is_active && myConn){ + while (--i && thisPacket.getTime() > (((Util::getMS() - firstTime)*1000)+maxSkipAhead)/realTime && keepGoing()){ Util::sleep(std::min(thisPacket.getTime() - (((Util::getMS() - firstTime)*1000)+minSkipAhead)/realTime, 1000llu)); stats(); } @@ -863,7 +867,7 @@ namespace Mist{ //wait at most double the look ahead time, plus ten seconds uint32_t timeoutTries = (needsLookAhead / sleepTime) * 2 + (10000/sleepTime); uint64_t needsTime = thisPacket.getTime() + needsLookAhead; - while(--timeoutTries){ + while(--timeoutTries && keepGoing()){ bool lookReady = true; for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){ if (myMeta.tracks[*it].lastms <= needsTime){ @@ -893,6 +897,7 @@ namespace Mist{ Triggers::doTrigger("CONN_STOP", payload, streamName); } /*LTS-END*/ + INFO_MSG("Shutting down because of stream end"); if (!onFinish()){ break; } @@ -1259,9 +1264,20 @@ namespace Mist{ } } if (!nProxy.userClient.isAlive()){ - onFinish(); - myConn.close(); - return; + if (isPushing() && !pushIsOngoing){ + waitForStreamPushReady(); + if (!nProxy.userClient.isAlive()){ + WARN_MSG("Failed to wait for buffer, aborting incoming push"); + onFinish(); + myConn.close(); + return; + } + }else{ + INFO_MSG("Received disconnect request from input"); + onFinish(); + myConn.close(); + return; + } } if (!isPushing()){ IPC::userConnection userConn(nProxy.userClient.getData()); @@ -1320,11 +1336,13 @@ namespace Mist{ // Initialize the stream source if needed, connect to it initialize(); + + waitForStreamPushReady(); //pull the source setting from metadata strmSource = myMeta.sourceURI; if (!strmSource.size()){ - FAIL_MSG("Push rejected - stream %s not configured", streamName.c_str()); + FAIL_MSG("Push rejected - stream %s not configured or unavailable", streamName.c_str()); pushing = false; return false; } @@ -1375,5 +1393,24 @@ namespace Mist{ return true; } + /// Attempts to wait for a stream to finish shutting down if it is, then restarts and reconnects. + void Output::waitForStreamPushReady(){ + uint8_t streamStatus = Util::getStreamStatus(streamName); + MEDIUM_MSG("Current status for %s buffer is %u", streamName.c_str(), streamStatus); + while (streamStatus != STRMSTAT_WAIT && streamStatus != STRMSTAT_READY && keepGoing()){ + INFO_MSG("Waiting for %s buffer to be ready... (%u)", streamName.c_str(), streamStatus); + if (nProxy.userClient.getData()){ + nProxy.userClient.finish(); + nProxy.userClient = IPC::sharedClient(); + } + Util::wait(1000); + streamStatus = Util::getStreamStatus(streamName); + if (streamStatus == STRMSTAT_OFF || streamStatus == STRMSTAT_WAIT || streamStatus == STRMSTAT_READY){ + reconnect(); + streamStatus = Util::getStreamStatus(streamName); + } + } + } + } diff --git a/src/output/output.h b/src/output/output.h index 43b7b2bc..235cc1dd 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -87,8 +87,9 @@ namespace Mist { std::string getCountry(std::string ip); /*LTS-END*/ void doSync(bool force = false); - - + inline bool keepGoing(){ + return config->is_active && myConn; + } std::map currKeyOpen; void loadPageForKey(long unsigned int trackId, long long int keyNum); int pageNumForKey(long unsigned int trackId, long long int keyNum); @@ -132,6 +133,8 @@ namespace Mist { virtual bool isRecording(); virtual bool isPushing(){return pushing;}; bool allowPush(const std::string & passwd); + void waitForStreamPushReady(); + bool pushIsOngoing; void bufferLivePacket(DTSC::Packet & packet); uint64_t firstPacketTime; uint64_t lastPacketTime;