diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index 6abc42c4..3582c369 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -141,15 +141,32 @@ namespace Mist { return true; } + void inputBuffer::eraseTrackDataPages(unsigned long tid){ + if (!bufferLocations.count(tid)){ + return; + } + for (std::map::iterator it = bufferLocations[tid].begin(); it != bufferLocations[tid].end(); it++){ + char thisPageName[NAME_BUFFER_SIZE]; + snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, config->getString("streamname").c_str(), tid, it->first); + curPage[tid].init(thisPageName, 20971520, false, false); + curPage[tid].master = true; + curPage.erase(tid); + } + bufferLocations.erase(tid); + metaPages[tid].master = true; + metaPages.erase(tid); + } + void inputBuffer::finish() { Input::finish(); - for (std::map >::iterator it = bufferLocations.begin(); it != bufferLocations.end(); it++) { - for (std::map::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++) { - char thisPageName[NAME_BUFFER_SIZE]; - snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, config->getString("streamname").c_str(), it->first, it2->first); - curPage[it->first].init(thisPageName, 20971520, false, false); - curPage[it->first].master = true; - curPage.erase(it->first); + updateMeta(); + if (bufferLocations.size()){ + std::set toErase; + for (std::map >::iterator it = bufferLocations.begin(); it != bufferLocations.end(); it++){ + toErase.insert(it->first); + } + for (std::set::iterator it = toErase.begin(); it != toErase.end(); ++it){ + eraseTrackDataPages(*it); } } } @@ -262,9 +279,13 @@ namespace Mist { metaPages.erase(value); } if (activeTracks.count(value)) { + updateMeta(); + eraseTrackDataPages(value); activeTracks.erase(value); bufferLocations.erase(value); } + metaPages[value].master = true; + metaPages.erase(value); continue; } } @@ -298,7 +319,11 @@ namespace Mist { } //If this tracks metdata page is not initialize, skip the entire element for now. It will be instantiated later if (!metaPages[value].mapped) { - ///\todo Maybe add a timeout counter here, for when we dont expect the track to appear anymore + //remove the negotiation if it has timed out + if (++negotiationTimeout[value] >= 1000){ + negotiatingTracks.erase(value); + negotiationTimeout.erase(value); + } continue; } @@ -316,6 +341,14 @@ namespace Mist { DTSC::Meta trackMeta(tempJSONForMeta); //If the track metadata does not contain the negotiated track, assume the metadata is currently being written, and skip the element for now. It will be instantiated in the next call. if (!trackMeta.tracks.count(value)) { + //remove the negotiation if it has timed out + if (++negotiationTimeout[value] >= 1000){ + negotiatingTracks.erase(value); + //Set master to true before erasing the page, because we are responsible for cleaning up unused pages + metaPages[value].master = true; + metaPages.erase(value); + negotiationTimeout.erase(value); + } continue; } @@ -335,15 +368,17 @@ namespace Mist { //or if the firstms of the replacement track is later than the lastms on the existing track if (!myMeta.tracks.count(finalMap) || trackMeta.tracks.find(value)->second.keys.size() > 1 || trackMeta.tracks.find(value)->second.firstms >= myMeta.tracks[finalMap].lastms) { if (myMeta.tracks.count(finalMap) && myMeta.tracks[finalMap].lastms > 0) { - INFO_MSG("Resume of track %d detected, coming from temporary track %lu of user %u", finalMap, value, id); + INFO_MSG("Resume of track %lu detected, coming from temporary track %lu of user %u", finalMap, value, id); } else { - INFO_MSG("New track detected, assigned track id %d, coming from temporary track %lu of user %u", finalMap, value, id); + INFO_MSG("New track detected, assigned track id %lu, coming from temporary track %lu of user %u", finalMap, value, id); } } else { //Otherwise replace existing track INFO_MSG("Replacement of track %lu detected, coming from temporary track %lu of user %u", finalMap, value, id); myMeta.tracks.erase(finalMap); //Set master to true before erasing the page, because we are responsible for cleaning up unused pages + updateMeta(); + eraseTrackDataPages(value); metaPages[finalMap].master = true; metaPages.erase(finalMap); bufferLocations.erase(finalMap); @@ -432,7 +467,7 @@ namespace Mist { //Otherwise open and parse the page //Open the page if it is not yet open - if (!curPageNum.count(tNum) || curPageNum[tNum] != pageNum) { + if (!curPageNum.count(tNum) || curPageNum[tNum] != pageNum || !curPage[tNum].mapped){ //DO NOT ERASE THE PAGE HERE, master is not set to true curPageNum.erase(tNum); char nextPageName[NAME_BUFFER_SIZE]; @@ -448,6 +483,10 @@ namespace Mist { DTSC::Packet tmpPack; + if (!curPage[tNum].mapped[pageData.curOffset]){ + VERYHIGH_MSG("No packet on page %lu for track %lu, waiting...", pageNum, tNum); + return; + } tmpPack.reInit(curPage[tNum].mapped + pageData.curOffset, 0); //No new data has been written on the page since last update if (!tmpPack) { diff --git a/src/input/input_buffer.h b/src/input/input_buffer.h index e6e96af8..f3fce2ad 100644 --- a/src/input/input_buffer.h +++ b/src/input/input_buffer.h @@ -22,11 +22,13 @@ namespace Mist { void trackSelect(std::string trackSpec); bool removeKey(unsigned int tid); void removeUnused(); + void eraseTrackDataPages(unsigned long tid); void finish(); void userCallback(char * data, size_t len, unsigned int id); std::set negotiatingTracks; std::set activeTracks; std::map lastUpdated; + std::map negotiationTimeout; ///Maps trackid to a pagenum->pageData map std::map > bufferLocations; std::map pushLocation; diff --git a/src/io.cpp b/src/io.cpp index b1934d87..0e868c44 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -531,6 +531,9 @@ namespace Mist { INFO_MSG("Buffer has indicated that incoming track %lu should start writing on track %lu, page %lu", tid, finalTid, firstPage); trackMap[tid] = finalTid; + if (myMeta.tracks.count(finalTid) && myMeta.tracks[finalTid].lastms){ + myMeta.tracks[finalTid].lastms = 0; + } trackState[tid] = FILL_ACC; char pageName[NAME_BUFFER_SIZE]; snprintf(pageName, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, streamName.c_str(), finalTid);