From 747438746c068f9217f804fab4d58bdf57156a43 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 18 Aug 2022 15:45:46 +0200 Subject: [PATCH] Change VoD data page logic to use wallclock seconds rather than loop iterations for timeouts --- src/input/input.cpp | 32 +++++++++++++++++++++++--------- src/input/input.h | 2 +- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/src/input/input.cpp b/src/input/input.cpp index 0a062392..a86fe3cd 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -1022,28 +1022,42 @@ namespace Mist{ } void Input::removeUnused(){ + uint64_t cTime = Util::bootSecs(); std::set validTracks = M.getValidTracks(); + std::map > checkedPages; for (std::set::iterator it = validTracks.begin(); it != validTracks.end(); ++it){ Util::RelAccX &tPages = meta.pages(*it); for (size_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ uint64_t pageNum = tPages.getInt("firstkey", i); + checkedPages[*it].insert(pageNum); if (pageCounter[*it].count(pageNum)){ // If the page is still being written to, reset the counter rather than potentially unloading it if (isCurrentLivePage(*it, pageNum)){ - pageCounter[*it][pageNum] = DEFAULT_PAGE_TIMEOUT; + pageCounter[*it][pageNum] = cTime; continue; } - --pageCounter[*it][pageNum]; - if (!pageCounter[*it][pageNum]){ + if (cTime > pageCounter[*it][pageNum] + DEFAULT_PAGE_TIMEOUT){ pageCounter[*it].erase(pageNum); bufferRemove(*it, pageNum); } - } - else{ - pageCounter[*it][pageNum] = DEFAULT_PAGE_TIMEOUT; + }else{ + pageCounter[*it][pageNum] = cTime; } } } + //Check pages we buffered but forgot about + for (std::map >::iterator it = pageCounter.begin(); + it != pageCounter.end(); it++){ + for (std::map::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++){ + if (!checkedPages.count(it->first) || !checkedPages[it->first].count(it2->first)){ + INFO_MSG("Deleting forgotten page %zu:%" PRIu32, it->first, it2->first); + bufferRemove(it->first, it2->first); + it->second.erase(it2); + it2 = it->second.begin(); + } + } + } + } std::string formatGUID(const std::string &val){ @@ -1290,8 +1304,8 @@ namespace Mist{ } uint32_t pageNumber = tPages.getInt("firstkey", pageIdx); if (isBuffered(idx, pageNumber, meta)){ - // Mark the page for removal after 15 seconds of no one watching it - pageCounter[idx][pageNumber] = DEFAULT_PAGE_TIMEOUT; + // Mark the page as still actively requested + pageCounter[idx][pageNumber] = Util::bootSecs(); DONTEVEN_MSG("Track %zu, key %" PRIu32 " is already buffered in page %" PRIu32 ". Cancelling bufferFrame", idx, keyNum, pageNumber); @@ -1432,7 +1446,7 @@ namespace Mist{ idx, pageNumber, tPages.getInt("firsttime", pageIdx), thisTime, bufferTimer); INFO_MSG(" (%" PRIu32 "/%" PRIu64 " parts, %" PRIu64 " bytes)", packCounter, tPages.getInt("parts", pageIdx), byteCounter); - pageCounter[idx][pageNumber] = DEFAULT_PAGE_TIMEOUT; + pageCounter[idx][pageNumber] = Util::bootSecs(); return true; } diff --git a/src/input/input.h b/src/input/input.h index 28880337..8d7e8891 100644 --- a/src/input/input.h +++ b/src/input/input.h @@ -87,7 +87,7 @@ namespace Mist{ IPC::sharedPage streamStatus; - std::map > pageCounter; + std::map > pageCounter; static Input *singleton;