More debug output for handling timed out tracks, including possible workaround and/or fix.
This commit is contained in:
parent
ef884845e0
commit
d05ae8fa23
4 changed files with 35 additions and 10 deletions
|
@ -416,9 +416,9 @@ namespace Mist {
|
||||||
unsigned int tid = it->first;
|
unsigned int tid = it->first;
|
||||||
//erase this track
|
//erase this track
|
||||||
if ((long long int)(time - lastUpdated[it->first]) > (long long int)(bufferTime / 1000)) {
|
if ((long long int)(time - lastUpdated[it->first]) > (long long int)(bufferTime / 1000)) {
|
||||||
INFO_MSG("Erasing track %d because not updated for %ds (> %ds)", it->first, (long long int)(time - lastUpdated[it->first]), (long long int)(bufferTime / 1000));
|
WARN_MSG("Erasing %s track %d because not updated for %ds (> %ds)", streamName.c_str(), it->first, (long long int)(time - lastUpdated[it->first]), (long long int)(bufferTime / 1000));
|
||||||
} else {
|
} else {
|
||||||
INFO_MSG("Erasing inactive track %u because it was inactive for 5+ seconds and contains data (%us - %us), while active tracks are (%us - %us), which is more than %us seconds apart.", it->first, it->second.firstms / 1000, it->second.lastms / 1000, compareFirst / 1000, compareLast / 1000, bufferTime / 1000);
|
WARN_MSG("Erasing %s inactive track %u because it was inactive for 5+ seconds and contains data (%us - %us), while active tracks are (%us - %us), which is more than %us seconds apart.", streamName.c_str(), it->first, it->second.firstms / 1000, it->second.lastms / 1000, compareFirst / 1000, compareLast / 1000, bufferTime / 1000);
|
||||||
}
|
}
|
||||||
/*LTS-START*/
|
/*LTS-START*/
|
||||||
if (Triggers::shouldTrigger("STREAM_TRACK_REMOVE")) {
|
if (Triggers::shouldTrigger("STREAM_TRACK_REMOVE")) {
|
||||||
|
|
10
src/io.cpp
10
src/io.cpp
|
@ -266,17 +266,17 @@ namespace Mist {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
//Loop over the index page
|
//Loop over the index page
|
||||||
for (int i = 0; i < 1024; i++) {
|
int len = metaPages[tid].len / 8;
|
||||||
|
for (int i = 0; i < len; ++i) {
|
||||||
int * tmpOffset = (int *)(metaPages[tid].mapped + (i * 8));
|
int * tmpOffset = (int *)(metaPages[tid].mapped + (i * 8));
|
||||||
int pageNum = ntohl(tmpOffset[0]);
|
unsigned int keyAmount = ntohl(tmpOffset[1]);
|
||||||
int keyAmount = ntohl(tmpOffset[1]);
|
if (keyAmount == 0){continue;}
|
||||||
//Check whether the key is on this page
|
//Check whether the key is on this page
|
||||||
|
unsigned int pageNum = ntohl(tmpOffset[0]);
|
||||||
if (pageNum <= keyNum && keyNum < pageNum + keyAmount) {
|
if (pageNum <= keyNum && keyNum < pageNum + keyAmount) {
|
||||||
///\return The pagenumber of the page the key is located on, if the page is registered on the track index page
|
|
||||||
return pageNum;
|
return pageNum;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
///\return 0 if the key was not found
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -436,10 +436,10 @@ namespace Mist {
|
||||||
}
|
}
|
||||||
|
|
||||||
int Output::pageNumForKey(long unsigned int trackId, long long int keyNum){
|
int Output::pageNumForKey(long unsigned int trackId, long long int keyNum){
|
||||||
if (!nProxy.metaPages.count(trackId)){
|
if (!nProxy.metaPages.count(trackId) || !nProxy.metaPages[trackId].mapped){
|
||||||
char id[NAME_BUFFER_SIZE];
|
char id[NAME_BUFFER_SIZE];
|
||||||
snprintf(id, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, streamName.c_str(), trackId);
|
snprintf(id, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, streamName.c_str(), trackId);
|
||||||
nProxy.metaPages[trackId].init(id, 8 * 1024);
|
nProxy.metaPages[trackId].init(id, SHM_TRACK_INDEX_SIZE);
|
||||||
}
|
}
|
||||||
if (!nProxy.metaPages[trackId].mapped){return -1;}
|
if (!nProxy.metaPages[trackId].mapped){return -1;}
|
||||||
int len = nProxy.metaPages[trackId].len / 8;
|
int len = nProxy.metaPages[trackId].len / 8;
|
||||||
|
@ -455,6 +455,26 @@ namespace Mist {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Gets the highest page number available for the given trackId.
|
||||||
|
int Output::pageNumMax(long unsigned int trackId){
|
||||||
|
if (!nProxy.metaPages.count(trackId) || !nProxy.metaPages[trackId].mapped){
|
||||||
|
char id[NAME_BUFFER_SIZE];
|
||||||
|
snprintf(id, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, streamName.c_str(), trackId);
|
||||||
|
nProxy.metaPages[trackId].init(id, SHM_TRACK_INDEX_SIZE);
|
||||||
|
}
|
||||||
|
if (!nProxy.metaPages[trackId].mapped){return -1;}
|
||||||
|
int len = nProxy.metaPages[trackId].len / 8;
|
||||||
|
int highest = -1;
|
||||||
|
for (int i = 0; i < len; i++){
|
||||||
|
int * tmpOffset = (int *)(nProxy.metaPages[trackId].mapped + (i * 8));
|
||||||
|
long amountKey = ntohl(tmpOffset[1]);
|
||||||
|
if (amountKey == 0){continue;}
|
||||||
|
long tmpKey = ntohl(tmpOffset[0]);
|
||||||
|
if (tmpKey > highest){highest = tmpKey;}
|
||||||
|
}
|
||||||
|
return highest;
|
||||||
|
}
|
||||||
|
|
||||||
void Output::loadPageForKey(long unsigned int trackId, long long int keyNum){
|
void Output::loadPageForKey(long unsigned int trackId, long long int keyNum){
|
||||||
if (myMeta.vod && keyNum > myMeta.tracks[trackId].keys.rbegin()->getNumber()){
|
if (myMeta.vod && keyNum > myMeta.tracks[trackId].keys.rbegin()->getNumber()){
|
||||||
INFO_MSG("Seek in track %lu to key %lld aborted, is > %lld", trackId, keyNum, myMeta.tracks[trackId].keys.rbegin()->getNumber());
|
INFO_MSG("Seek in track %lu to key %lld aborted, is > %lld", trackId, keyNum, myMeta.tracks[trackId].keys.rbegin()->getNumber());
|
||||||
|
@ -1023,10 +1043,14 @@ namespace Mist {
|
||||||
if (myMeta.live && currKeyOpen.count(nxt.tid) && (currKeyOpen[nxt.tid] == (unsigned int)nextPage || nextPage == -1)){
|
if (myMeta.live && currKeyOpen.count(nxt.tid) && (currKeyOpen[nxt.tid] == (unsigned int)nextPage || nextPage == -1)){
|
||||||
if (myMeta && ++emptyCount < 100){
|
if (myMeta && ++emptyCount < 100){
|
||||||
//we're waiting for new data. Simply retry.
|
//we're waiting for new data. Simply retry.
|
||||||
|
if (emptyCount % 4 == 0){
|
||||||
|
//every second, reload the metaPage.
|
||||||
|
nProxy.metaPages.erase(nxt.tid);
|
||||||
|
}
|
||||||
buffer.insert(nxt);
|
buffer.insert(nxt);
|
||||||
}else{
|
}else{
|
||||||
//after ~25 seconds, give up and drop the track.
|
//after ~25 seconds, give up and drop the track.
|
||||||
WARN_MSG("Empty packet on track %u (%s) @ key %lu (next=%d) - could not reload, dropping track.", nxt.tid, myMeta.tracks[nxt.tid].type.c_str(), nxtKeyNum[nxt.tid]+1, nextPage);
|
WARN_MSG("Empty packet on %s track %u (%s) @ key %lu (nPage=%d, lPage=%d) - could not reload, dropping track.", streamName.c_str(), nxt.tid, myMeta.tracks[nxt.tid].type.c_str(), nxtKeyNum[nxt.tid]+1, nextPage, pageNumMax(nxt.tid));
|
||||||
}
|
}
|
||||||
//keep updating the metadata at 250ms intervals while waiting for more data
|
//keep updating the metadata at 250ms intervals while waiting for more data
|
||||||
Util::wait(250);
|
Util::wait(250);
|
||||||
|
|
|
@ -99,6 +99,7 @@ namespace Mist {
|
||||||
std::map<unsigned long, unsigned int> currKeyOpen;
|
std::map<unsigned long, unsigned int> currKeyOpen;
|
||||||
void loadPageForKey(long unsigned int trackId, long long int keyNum);
|
void loadPageForKey(long unsigned int trackId, long long int keyNum);
|
||||||
int pageNumForKey(long unsigned int trackId, long long int keyNum);
|
int pageNumForKey(long unsigned int trackId, long long int keyNum);
|
||||||
|
int pageNumMax(long unsigned int trackId);
|
||||||
unsigned int lastStats;///<Time of last sending of stats.
|
unsigned int lastStats;///<Time of last sending of stats.
|
||||||
long long unsigned int firstTime;///< Time of first packet after last seek. Used for real-time sending.
|
long long unsigned int firstTime;///< Time of first packet after last seek. Used for real-time sending.
|
||||||
std::map<unsigned long, unsigned long> nxtKeyNum;///< Contains the number of the next key, for page seeking purposes.
|
std::map<unsigned long, unsigned long> nxtKeyNum;///< Contains the number of the next key, for page seeking purposes.
|
||||||
|
|
Loading…
Add table
Reference in a new issue