Incoming pushes now wait for buffer shutdowns and restart it, if needed (no more failing quick successive pushes!), simplified output logic with keepGoing() function, added missing termination checks in some wait loops

This commit is contained in:
Thulinma 2017-07-04 12:59:51 +02:00
parent 0907d6424f
commit f4242f23bf
2 changed files with 61 additions and 20 deletions

View file

@ -40,16 +40,21 @@ namespace Mist{
}
void Output::bufferLivePacket(DTSC::Packet & packet){
if (!pushIsOngoing){
waitForStreamPushReady();
}
if (nProxy.negTimer > 600){
WARN_MSG("No negotiation response from buffer - reconnecting.");
nProxy.clear();
reconnect();
}
InOutBase::bufferLivePacket(packet);
pushIsOngoing = true;
}
Output::Output(Socket::Connection & conn) : myConn(conn){
pushing = false;
pushIsOngoing = false;
firstTime = 0;
crc = getpid();
parseData = false;
@ -226,7 +231,7 @@ namespace Mist{
selectDefaultTracks();
if (!myMeta.vod && !isReadyForPlay()){
unsigned long long waitUntil = Util::epoch() + 30;
while (!myMeta.vod && !isReadyForPlay() && nProxy.userClient.isAlive()){
while (!myMeta.vod && !isReadyForPlay() && nProxy.userClient.isAlive() && keepGoing()){
if (Util::epoch() > waitUntil + 45 || (!selectedTracks.size() && Util::epoch() > waitUntil)){
INFO_MSG("Giving up waiting for playable tracks. Stream: %s, IP: %s", streamName.c_str(), getConnectedHost().c_str());
break;
@ -432,7 +437,7 @@ namespace Mist{
VERYHIGH_MSG("Loading track %lu, containing key %lld", trackId, keyNum);
unsigned int timeout = 0;
unsigned long pageNum = pageNumForKey(trackId, keyNum);
while (config->is_active && myConn && pageNum == -1){
while (keepGoing() && pageNum == -1){
if (!timeout){
HIGH_MSG("Requesting page with key %lu:%lld", trackId, keyNum);
}
@ -555,7 +560,7 @@ namespace Mist{
bool Output::seek(unsigned int tid, unsigned long long pos, bool getNextKey){
if (myMeta.live && myMeta.tracks[tid].lastms < pos){
unsigned int maxTime = 0;
while (myMeta.tracks[tid].lastms < pos && myConn && ++maxTime <= 20){
while (myMeta.tracks[tid].lastms < pos && myConn && ++maxTime <= 20 && keepGoing()){
Util::wait(500);
stats();
updateMeta();
@ -602,7 +607,7 @@ namespace Mist{
}else{
VERYHIGH_MSG("Track %d no data (key %u @ %u) - waiting...", tid, getKeyForTime(tid, pos) + (getNextKey?1:0), tmp.offset);
unsigned int i = 0;
while (!myMeta.live && nProxy.curPage[tid].mapped[tmp.offset] == 0 && ++i <= 10){
while (!myMeta.live && nProxy.curPage[tid].mapped[tmp.offset] == 0 && ++i <= 10 && keepGoing()){
Util::wait(100*i);
stats();
}
@ -677,7 +682,7 @@ namespace Mist{
int Output::run(){
DONTEVEN_MSG("MistOut client handler started");
while (config->is_active && myConn && (wantRequest || parseData)){
while (keepGoing() && (wantRequest || parseData)){
if (wantRequest){
requestHandler();
}
@ -693,13 +698,11 @@ namespace Mist{
initialSeek();
}
if (prepareNext()){
if (thisPacket){
if (thisPacket){
//slow down processing, if real time speed is wanted
if (realTime){
uint8_t i = 6;
while (--i && thisPacket.getTime() > (((Util::getMS() - firstTime)*1000)+maxSkipAhead)/realTime && config->is_active && myConn){
while (--i && thisPacket.getTime() > (((Util::getMS() - firstTime)*1000)+maxSkipAhead)/realTime && keepGoing()){
Util::sleep(std::min(thisPacket.getTime() - (((Util::getMS() - firstTime)*1000)+minSkipAhead)/realTime, 1000llu));
stats();
}
@ -712,7 +715,7 @@ namespace Mist{
//wait at most double the look ahead time, plus ten seconds
uint32_t timeoutTries = (needsLookAhead / sleepTime) * 2 + (10000/sleepTime);
uint64_t needsTime = thisPacket.getTime() + needsLookAhead;
while(--timeoutTries){
while(--timeoutTries && keepGoing()){
bool lookReady = true;
for (std::set<long unsigned int>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){
if (myMeta.tracks[*it].lastms <= needsTime){
@ -734,15 +737,16 @@ namespace Mist{
}
}
sendNext();
}else{
if (!onFinish()){
break;
sendNext();
}else{
INFO_MSG("Shutting down because of stream end");
if (!onFinish()){
break;
}
}
}
}
}
stats();
stats();
}
MEDIUM_MSG("MistOut client handler shutting down: %s, %s, %s", myConn.connected() ? "conn_active" : "conn_closed", wantRequest ? "want_request" : "no_want_request", parseData ? "parsing_data" : "not_parsing_data");
onFinish();
@ -1057,9 +1061,20 @@ namespace Mist{
}
}
if (!nProxy.userClient.isAlive()){
onFinish();
myConn.close();
return;
if (isPushing() && !pushIsOngoing){
waitForStreamPushReady();
if (!nProxy.userClient.isAlive()){
WARN_MSG("Failed to wait for buffer, aborting incoming push");
onFinish();
myConn.close();
return;
}
}else{
INFO_MSG("Received disconnect request from input");
onFinish();
myConn.close();
return;
}
}
if (!isPushing()){
IPC::userConnection userConn(nProxy.userClient.getData());
@ -1117,11 +1132,13 @@ namespace Mist{
// Initialize the stream source if needed, connect to it
initialize();
waitForStreamPushReady();
//pull the source setting from metadata
strmSource = myMeta.sourceURI;
if (!strmSource.size()){
FAIL_MSG("Push rejected - stream %s not configured", streamName.c_str());
FAIL_MSG("Push rejected - stream %s not configured or unavailable", streamName.c_str());
pushing = false;
return false;
}
@ -1144,5 +1161,24 @@ namespace Mist{
return true;
}
/// Attempts to wait for a stream to finish shutting down if it is, then restarts and reconnects.
void Output::waitForStreamPushReady(){
uint8_t streamStatus = Util::getStreamStatus(streamName);
MEDIUM_MSG("Current status for %s buffer is %u", streamName.c_str(), streamStatus);
while (streamStatus != STRMSTAT_WAIT && streamStatus != STRMSTAT_READY && keepGoing()){
INFO_MSG("Waiting for %s buffer to be ready... (%u)", streamName.c_str(), streamStatus);
if (nProxy.userClient.getData()){
nProxy.userClient.finish();
nProxy.userClient = IPC::sharedClient();
}
Util::wait(1000);
streamStatus = Util::getStreamStatus(streamName);
if (streamStatus == STRMSTAT_OFF || streamStatus == STRMSTAT_WAIT || streamStatus == STRMSTAT_READY){
reconnect();
streamStatus = Util::getStreamStatus(streamName);
}
}
}
}

View file

@ -75,6 +75,9 @@ namespace Mist {
virtual void onFail();
virtual void requestHandler();
private://these *should* not be messed with in child classes.
inline bool keepGoing(){
return config->is_active && myConn;
}
std::map<unsigned long, unsigned int> currKeyOpen;
void loadPageForKey(long unsigned int trackId, long long int keyNum);
int pageNumForKey(long unsigned int trackId, long long int keyNum);
@ -114,6 +117,8 @@ namespace Mist {
std::map<int,DTSCPageData> bookKeeping;
virtual bool isPushing(){return pushing;};
bool allowPush(const std::string & passwd);
void waitForStreamPushReady();
bool pushIsOngoing;
void bufferLivePacket(DTSC::Packet & packet);
};