From d05ae8fa2349558df0aea1433bfef7a0aee29a0b Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 24 May 2016 18:10:53 +0200 Subject: [PATCH] More debug output for handling timed out tracks, including possible workaround and/or fix. --- src/input/input_buffer.cpp | 4 ++-- src/io.cpp | 10 +++++----- src/output/output.cpp | 30 +++++++++++++++++++++++++++--- src/output/output.h | 1 + 4 files changed, 35 insertions(+), 10 deletions(-) diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index e69b889e..2c4bb782 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -416,9 +416,9 @@ namespace Mist { unsigned int tid = it->first; //erase this track if ((long long int)(time - lastUpdated[it->first]) > (long long int)(bufferTime / 1000)) { - INFO_MSG("Erasing track %d because not updated for %ds (> %ds)", it->first, (long long int)(time - lastUpdated[it->first]), (long long int)(bufferTime / 1000)); + WARN_MSG("Erasing %s track %d because not updated for %ds (> %ds)", streamName.c_str(), it->first, (long long int)(time - lastUpdated[it->first]), (long long int)(bufferTime / 1000)); } else { - INFO_MSG("Erasing inactive track %u because it was inactive for 5+ seconds and contains data (%us - %us), while active tracks are (%us - %us), which is more than %us seconds apart.", it->first, it->second.firstms / 1000, it->second.lastms / 1000, compareFirst / 1000, compareLast / 1000, bufferTime / 1000); + WARN_MSG("Erasing %s inactive track %u because it was inactive for 5+ seconds and contains data (%us - %us), while active tracks are (%us - %us), which is more than %us seconds apart.", streamName.c_str(), it->first, it->second.firstms / 1000, it->second.lastms / 1000, compareFirst / 1000, compareLast / 1000, bufferTime / 1000); } /*LTS-START*/ if (Triggers::shouldTrigger("STREAM_TRACK_REMOVE")) { diff --git a/src/io.cpp b/src/io.cpp index cd3ca831..f55fce2a 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -266,17 +266,17 @@ namespace Mist { return 0; } //Loop over the index page - for (int i = 0; i < 1024; i++) { + int len = metaPages[tid].len / 8; + for (int i = 0; i < len; ++i) { int * tmpOffset = (int *)(metaPages[tid].mapped + (i * 8)); - int pageNum = ntohl(tmpOffset[0]); - int keyAmount = ntohl(tmpOffset[1]); + unsigned int keyAmount = ntohl(tmpOffset[1]); + if (keyAmount == 0){continue;} //Check whether the key is on this page + unsigned int pageNum = ntohl(tmpOffset[0]); if (pageNum <= keyNum && keyNum < pageNum + keyAmount) { - ///\return The pagenumber of the page the key is located on, if the page is registered on the track index page return pageNum; } } - ///\return 0 if the key was not found return 0; } diff --git a/src/output/output.cpp b/src/output/output.cpp index 13b2b96e..8c27e596 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -436,10 +436,10 @@ namespace Mist { } int Output::pageNumForKey(long unsigned int trackId, long long int keyNum){ - if (!nProxy.metaPages.count(trackId)){ + if (!nProxy.metaPages.count(trackId) || !nProxy.metaPages[trackId].mapped){ char id[NAME_BUFFER_SIZE]; snprintf(id, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, streamName.c_str(), trackId); - nProxy.metaPages[trackId].init(id, 8 * 1024); + nProxy.metaPages[trackId].init(id, SHM_TRACK_INDEX_SIZE); } if (!nProxy.metaPages[trackId].mapped){return -1;} int len = nProxy.metaPages[trackId].len / 8; @@ -454,6 +454,26 @@ namespace Mist { } return -1; } + + /// Gets the highest page number available for the given trackId. + int Output::pageNumMax(long unsigned int trackId){ + if (!nProxy.metaPages.count(trackId) || !nProxy.metaPages[trackId].mapped){ + char id[NAME_BUFFER_SIZE]; + snprintf(id, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, streamName.c_str(), trackId); + nProxy.metaPages[trackId].init(id, SHM_TRACK_INDEX_SIZE); + } + if (!nProxy.metaPages[trackId].mapped){return -1;} + int len = nProxy.metaPages[trackId].len / 8; + int highest = -1; + for (int i = 0; i < len; i++){ + int * tmpOffset = (int *)(nProxy.metaPages[trackId].mapped + (i * 8)); + long amountKey = ntohl(tmpOffset[1]); + if (amountKey == 0){continue;} + long tmpKey = ntohl(tmpOffset[0]); + if (tmpKey > highest){highest = tmpKey;} + } + return highest; + } void Output::loadPageForKey(long unsigned int trackId, long long int keyNum){ if (myMeta.vod && keyNum > myMeta.tracks[trackId].keys.rbegin()->getNumber()){ @@ -1023,10 +1043,14 @@ namespace Mist { if (myMeta.live && currKeyOpen.count(nxt.tid) && (currKeyOpen[nxt.tid] == (unsigned int)nextPage || nextPage == -1)){ if (myMeta && ++emptyCount < 100){ //we're waiting for new data. Simply retry. + if (emptyCount % 4 == 0){ + //every second, reload the metaPage. + nProxy.metaPages.erase(nxt.tid); + } buffer.insert(nxt); }else{ //after ~25 seconds, give up and drop the track. - WARN_MSG("Empty packet on track %u (%s) @ key %lu (next=%d) - could not reload, dropping track.", nxt.tid, myMeta.tracks[nxt.tid].type.c_str(), nxtKeyNum[nxt.tid]+1, nextPage); + WARN_MSG("Empty packet on %s track %u (%s) @ key %lu (nPage=%d, lPage=%d) - could not reload, dropping track.", streamName.c_str(), nxt.tid, myMeta.tracks[nxt.tid].type.c_str(), nxtKeyNum[nxt.tid]+1, nextPage, pageNumMax(nxt.tid)); } //keep updating the metadata at 250ms intervals while waiting for more data Util::wait(250); diff --git a/src/output/output.h b/src/output/output.h index 69c33357..a3ebd152 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -99,6 +99,7 @@ namespace Mist { std::map currKeyOpen; void loadPageForKey(long unsigned int trackId, long long int keyNum); int pageNumForKey(long unsigned int trackId, long long int keyNum); + int pageNumMax(long unsigned int trackId); unsigned int lastStats;///