diff --git a/lib/dtsc.cpp b/lib/dtsc.cpp index a9ff6b2b..b547f7a4 100644 --- a/lib/dtsc.cpp +++ b/lib/dtsc.cpp @@ -2282,47 +2282,8 @@ namespace DTSC{ t.fragments.deleteRecords(1); setMissedFragments(trackIdx, getMissedFragments(trackIdx) + 1); } - // Check if any page contains the just-deleted key - for (uint64_t i = t.pages.getDeleted(); i < t.pages.getEndPos(); i++){ - - uint64_t thisKey = t.pages.getInt("firstkey", i); - uint64_t avtmp = t.pages.getInt("avail", i); - uint64_t keycount = t.pages.getInt("keycount", i); - DONTEVEN_MSG("Found page idx=%lu number=%lu avail=%lu, keycount=%lu", i, thisKey, avtmp, keycount); - - uint64_t pageNum = t.pages.getInt("firstkey", i); - if (pageNum > deletedKeyNum) continue; - uint64_t keyCount = t.pages.getInt("keycount", i); - if (keyCount){ - if (pageNum + keyCount - 1 < deletedKeyNum) continue; - }else if (pageNum < deletedKeyNum) continue; - - uint64_t avail = t.pages.getInt("avail", i); - if (avail){ - break; - } - // 'Resize' the page to whatever keys are still available - if (t.pages.getInt("keycount", i) > 1){ - DONTEVEN_MSG("Key count %lu -> %lu", t.pages.getInt("keycount", i), t.pages.getInt("keycount", i) - 1); - t.pages.setInt("keycount", t.pages.getInt("keycount", i) - 1, i); - DONTEVEN_MSG("Part count %lu -> %lu", t.pages.getInt("parts", i), t.pages.getInt("parts", i) - deletedPartCount); - t.pages.setInt("parts", t.pages.getInt("parts", i) - deletedPartCount, i); - DONTEVEN_MSG("First key %lu -> %lu", t.pages.getInt("firstkey", i), t.pages.getInt("firstkey", i) + 1); - t.pages.setInt("firstkey", t.pages.getInt("firstkey", i) + 1, i); - break; - } - // Unload the page if there are no more keys left on it - // Initialize the correct page, make it master so it gets cleaned up when leaving scope. - char thisPageName[NAME_BUFFER_SIZE]; - snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), trackIdx, - (uint32_t)t.pages.getInt("firstkey", t.pages.getDeleted())); - IPC::sharedPage p(thisPageName, 20971520, false, false); - p.master = true; - - // Then delete the page entry - t.pages.deleteRecords(1); - break; - } + // Note: pages are not deleted here, but instead deleted by Input::removeUnused (or a child-override) + // This is fine, as pages can (and will, at least temporarily) exist for data we no longer fully have in stream metadata setFirstms(trackIdx, t.keys.getInt(t.keyTimeField, t.keys.getDeleted())); if (resizeLock){resizeLock.unlink();} return true; diff --git a/src/input/input.cpp b/src/input/input.cpp index 89e4d3e4..834e785f 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -43,27 +43,32 @@ namespace Mist{ if (endKey > key + 1000){endKey = key + 1000;} DONTEVEN_MSG("User with ID:%zu is on %zu:%zu -> %zu (timestamp %" PRIu64 ")", id, track, key, endKey, time); for (size_t i = key; i <= endKey; ){ - - const Util::RelAccX &tPages = M.pages(track); + Util::RelAccXFieldData firstkeyEntry = tPages.getFieldData("firstkey"); + Util::RelAccXFieldData keyCountEntry = tPages.getFieldData("keycount"); if (!tPages.getEndPos()){return;} DTSC::Keys keys(M.keys(track)); if (i > keys.getEndValid()){return;} - uint64_t pageIdx = 0; + bool found = false; + uint64_t cnt = 1, pageNumber = 0; for (uint64_t j = tPages.getDeleted(); j < tPages.getEndPos(); j++){ - uint64_t thisKey = tPages.getInt("firstkey", j); - if (thisKey > i) break; - pageIdx = j; + pageNumber = tPages.getInt(firstkeyEntry, j); + cnt = tPages.getInt(keyCountEntry, j); + if (pageNumber <= i && pageNumber + cnt > i){ + found = true; + break; + } } - uint32_t pageNumber = tPages.getInt("firstkey", pageIdx); + // Could not find key? Then we're done here. + if (!found){return;} uint64_t pageTime = M.getTimeForKeyIndex(track, pageNumber); - if (pageTime < time){ + if (pageTime <= time){ keyLoadPriority[trackKey(track, pageNumber)] += 10000; }else{ keyLoadPriority[trackKey(track, pageNumber)] += 600 - (pageTime - time) / 1000; } - uint64_t cnt = tPages.getInt("keycount", pageIdx); - if (pageNumber + cnt <= i){return;} + // Make sure we always progress, even in edge cases where there is no full key buffered yet + if (!cnt){cnt = 1;} i = pageNumber + cnt; } //Now, we can rest assured that the next ~120 seconds or so is pre-buffered in RAM. @@ -1663,7 +1668,7 @@ namespace Mist{ } } } - bufferFinalize(idx, page); + page.close(); bufferTimer = Util::bootMS() - bufferTimer; if (packCounter < tPages.getInt("parts", pageIdx)){ FAIL_MSG("Track %zu, page %" PRIu32 " (" PRETTY_PRINT_MSTIME " - " PRETTY_PRINT_MSTIME ") NOT FULLY buffered in %" PRIu64 "ms - erasing for later retry", diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index 088494de..a48faf57 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -423,6 +423,13 @@ namespace Mist{ while (keys.getValidCount() > 1 && (M.getLastms(i) - keys.getTime(keys.getFirstValid() + 1)) > bufferTime){ if (!removeKey(i)){break;} } + Util::RelAccX &tPages = meta.pages(i); + Util::RelAccXFieldData firstKeyEnt = tPages.getFieldData("firstkey"); + Util::RelAccXFieldData keyCount = tPages.getFieldData("keycount"); + for (uint32_t j = tPages.getDeleted(); j < tPages.getEndPos(); j++){ + if (tPages.getInt(firstKeyEnt, j) + tPages.getInt(keyCount, j) > firstKey){break;} + bufferRemove(i, tPages.getInt(firstKeyEnt, j), j); + } } updateMeta(); } diff --git a/src/io.cpp b/src/io.cpp index 14efa588..0c32590b 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -134,9 +134,8 @@ namespace Mist{ ///\param tid The trackid to remove the page from ///\param pageNumber The number of the page to remove void InOutBase::bufferRemove(size_t idx, uint32_t pageNumber, uint32_t pageIdx){ - if (!standAlone){// A different process will handle this for us - return; - } + if (!standAlone){return;}// A different process will handle this for us + Util::RelAccX &tPages = meta.pages(idx); Util::RelAccXFieldData firstKey = tPages.getFieldData("firstkey"); @@ -150,7 +149,7 @@ namespace Mist{ } // If the given pagenumber is not a valid page on this track, do nothing if (pageIdx == INVALID_KEY_NUM){ - INFO_MSG("Can't remove page %" PRIu32 " on track %zu as it is not a valid page number.", pageNumber, idx); + WARN_MSG("Can't remove page %" PRIu32 " on track %zu as it is not a valid page number.", pageNumber, idx); return; } @@ -173,12 +172,11 @@ namespace Mist{ #endif toErase.master = true; // Update the page on the tracks index page if needed - uint64_t firstKeyNum = tPages.getInt("firstkey", pageIdx); + uint64_t firstKeyNum = tPages.getInt(firstKey, pageIdx); uint64_t keyCount = tPages.getInt("keycount", pageIdx); - uint64_t partCount = tPages.getInt("parts", pageIdx); uint64_t newFirstKey = M.getKeys(idx).getFirstValid(); if (firstKeyNum + keyCount <= newFirstKey){ - INFO_MSG("Page %" PRIu64 " track %zu has expired during the time it was kept cached in memory (contains up to key %lu, but the earliest key is %lu). Removing it now", firstKeyNum, idx, firstKeyNum + keyCount, newFirstKey); + HIGH_MSG("Page %" PRIu64 " track %zu has expired during the time it was kept cached in memory (contains up to key %lu, but the earliest key is %lu). Removing it now", firstKeyNum, idx, firstKeyNum + keyCount, newFirstKey); tPages.setInt("keycount", 0, pageIdx); //< Force removal by having avail and keycount both 0 }else if (firstKeyNum < newFirstKey){ uint64_t newPartCount = 0; @@ -186,7 +184,8 @@ namespace Mist{ for (uint32_t i = newFirstKey; i < firstKeyNum + keyCount; i++){ newPartCount += keys.getParts(i); } - MEDIUM_MSG("Adjusting meta info for page %lu track %lu before unloading it. First key %lu -> %lu. Key count %lu -> %lu. Part count %lu -> %lu", firstKeyNum, idx, firstKeyNum, newFirstKey, keyCount, keyCount - (newFirstKey - firstKeyNum), partCount, newPartCount); + uint64_t partCount = tPages.getInt("parts", pageIdx); + HIGH_MSG("Adjusting meta info for page %lu track %lu before unloading it. First key %lu -> %lu. Key count %lu -> %lu. Part count %lu -> %lu", firstKeyNum, idx, firstKeyNum, newFirstKey, keyCount, keyCount - (newFirstKey - firstKeyNum), partCount, newPartCount); tPages.setInt("keycount", keyCount - (newFirstKey - firstKeyNum), pageIdx); tPages.setInt("parts", newPartCount, pageIdx); tPages.setInt("firstkey", newFirstKey, pageIdx); @@ -328,37 +327,7 @@ namespace Mist{ /// \param idx The track index of the page to finalize void InOutBase::liveFinalize(size_t idx){ if (!livePage.count(idx)){return;} - bufferFinalize(idx, livePage[idx]); - } - - /// Wraps up the buffering of a shared memory data page - /// \param idx The track index of the page to finalize - void InOutBase::bufferFinalize(size_t idx, IPC::sharedPage & page){ - // If no page is open, do nothing - if (!page){ - WARN_MSG("Trying to finalize the current page on track %zu, but no page is initialized", idx); - return; - } - -/// \TODO META Re-Implement for Cygwin/Win32! -#if defined(__CYGWIN__) || defined(_WIN32) - /* - static int wipedAlready = 0; - if (lowest && lowest > wipedAlready + 1){ - for (int curr = wipedAlready + 1; curr < lowest; ++curr){ - char pageId[NAME_BUFFER_SIZE]; - snprintf(pageId, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), idx, curr); - IPC::releasePage(std::string(pageId)); - } - } - // Print a message about registering the page or not. - if (inserted){IPC::preservePage(curPage[idx].name);} - */ -#endif - // Close our link to the page. This will NOT destroy the shared page, as we've set master to - // false upon construction Note: if there was a registering failure above, this WILL destroy the - // shared page, to prevent a memory leak - page.close(); + livePage[idx].close(); } /// Buffers a live packet to a page. @@ -492,7 +461,7 @@ namespace Mist{ tPages.setInt("parts", 0, endPage); tPages.setInt("lastkeytime", 0, endPage); tPages.addRecords(1); - if (livePage[packTrack]){bufferFinalize(packTrack, livePage[packTrack]);} + if (livePage[packTrack]){livePage[packTrack].close();} DONTEVEN_MSG("Opening new page #%zu to track %" PRIu32, curPageNum[packTrack], packTrack); if (!bufferStart(packTrack, curPageNum[packTrack], livePage[packTrack], aMeta)){ // if this fails, return instantly without actually buffering the packet @@ -513,7 +482,7 @@ namespace Mist{ if (!livePage[packTrack].exists()){ WARN_MSG("Data page '%s' was deleted - forcing source shutdown to prevent unstable state", livePage[packTrack].name.c_str()); Util::logExitReason(ER_SHM_LOST, "data page was deleted, forcing shutdown to prevent unstable state"); - bufferFinalize(packTrack, livePage[packTrack]); + livePage[packTrack].close(); kill(getpid(), SIGINT); return; } diff --git a/src/io.h b/src/io.h index 8c099cb3..cef6b7db 100644 --- a/src/io.h +++ b/src/io.h @@ -20,7 +20,6 @@ namespace Mist{ size_t getMainSelectedTrack(); bool bufferStart(size_t idx, uint32_t pageNumber, IPC::sharedPage & page, DTSC::Meta & aMeta); - void bufferFinalize(size_t idx, IPC::sharedPage & page); void liveFinalize(size_t idx); bool isRecentLivePage(size_t idx, uint32_t pageNumber, uint64_t maxAge); void bufferRemove(size_t idx, uint32_t pageNumber, uint32_t pageIdx = INVALID_KEY_NUM);