Fix buffer behaviour after changes for MistInHLS

This commit is contained in:
Thulinma 2023-10-23 14:20:30 +02:00
parent 9e30444476
commit 26746c139d
5 changed files with 35 additions and 94 deletions

View file

@ -2282,47 +2282,8 @@ namespace DTSC{
t.fragments.deleteRecords(1);
setMissedFragments(trackIdx, getMissedFragments(trackIdx) + 1);
}
// Check if any page contains the just-deleted key
for (uint64_t i = t.pages.getDeleted(); i < t.pages.getEndPos(); i++){
uint64_t thisKey = t.pages.getInt("firstkey", i);
uint64_t avtmp = t.pages.getInt("avail", i);
uint64_t keycount = t.pages.getInt("keycount", i);
DONTEVEN_MSG("Found page idx=%lu number=%lu avail=%lu, keycount=%lu", i, thisKey, avtmp, keycount);
uint64_t pageNum = t.pages.getInt("firstkey", i);
if (pageNum > deletedKeyNum) continue;
uint64_t keyCount = t.pages.getInt("keycount", i);
if (keyCount){
if (pageNum + keyCount - 1 < deletedKeyNum) continue;
}else if (pageNum < deletedKeyNum) continue;
uint64_t avail = t.pages.getInt("avail", i);
if (avail){
break;
}
// 'Resize' the page to whatever keys are still available
if (t.pages.getInt("keycount", i) > 1){
DONTEVEN_MSG("Key count %lu -> %lu", t.pages.getInt("keycount", i), t.pages.getInt("keycount", i) - 1);
t.pages.setInt("keycount", t.pages.getInt("keycount", i) - 1, i);
DONTEVEN_MSG("Part count %lu -> %lu", t.pages.getInt("parts", i), t.pages.getInt("parts", i) - deletedPartCount);
t.pages.setInt("parts", t.pages.getInt("parts", i) - deletedPartCount, i);
DONTEVEN_MSG("First key %lu -> %lu", t.pages.getInt("firstkey", i), t.pages.getInt("firstkey", i) + 1);
t.pages.setInt("firstkey", t.pages.getInt("firstkey", i) + 1, i);
break;
}
// Unload the page if there are no more keys left on it
// Initialize the correct page, make it master so it gets cleaned up when leaving scope.
char thisPageName[NAME_BUFFER_SIZE];
snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), trackIdx,
(uint32_t)t.pages.getInt("firstkey", t.pages.getDeleted()));
IPC::sharedPage p(thisPageName, 20971520, false, false);
p.master = true;
// Then delete the page entry
t.pages.deleteRecords(1);
break;
}
// Note: pages are not deleted here, but instead deleted by Input::removeUnused (or a child-override)
// This is fine, as pages can (and will, at least temporarily) exist for data we no longer fully have in stream metadata
setFirstms(trackIdx, t.keys.getInt(t.keyTimeField, t.keys.getDeleted()));
if (resizeLock){resizeLock.unlink();}
return true;

View file

@ -43,27 +43,32 @@ namespace Mist{
if (endKey > key + 1000){endKey = key + 1000;}
DONTEVEN_MSG("User with ID:%zu is on %zu:%zu -> %zu (timestamp %" PRIu64 ")", id, track, key, endKey, time);
for (size_t i = key; i <= endKey; ){
const Util::RelAccX &tPages = M.pages(track);
Util::RelAccXFieldData firstkeyEntry = tPages.getFieldData("firstkey");
Util::RelAccXFieldData keyCountEntry = tPages.getFieldData("keycount");
if (!tPages.getEndPos()){return;}
DTSC::Keys keys(M.keys(track));
if (i > keys.getEndValid()){return;}
uint64_t pageIdx = 0;
bool found = false;
uint64_t cnt = 1, pageNumber = 0;
for (uint64_t j = tPages.getDeleted(); j < tPages.getEndPos(); j++){
uint64_t thisKey = tPages.getInt("firstkey", j);
if (thisKey > i) break;
pageIdx = j;
pageNumber = tPages.getInt(firstkeyEntry, j);
cnt = tPages.getInt(keyCountEntry, j);
if (pageNumber <= i && pageNumber + cnt > i){
found = true;
break;
}
uint32_t pageNumber = tPages.getInt("firstkey", pageIdx);
}
// Could not find key? Then we're done here.
if (!found){return;}
uint64_t pageTime = M.getTimeForKeyIndex(track, pageNumber);
if (pageTime < time){
if (pageTime <= time){
keyLoadPriority[trackKey(track, pageNumber)] += 10000;
}else{
keyLoadPriority[trackKey(track, pageNumber)] += 600 - (pageTime - time) / 1000;
}
uint64_t cnt = tPages.getInt("keycount", pageIdx);
if (pageNumber + cnt <= i){return;}
// Make sure we always progress, even in edge cases where there is no full key buffered yet
if (!cnt){cnt = 1;}
i = pageNumber + cnt;
}
//Now, we can rest assured that the next ~120 seconds or so is pre-buffered in RAM.
@ -1663,7 +1668,7 @@ namespace Mist{
}
}
}
bufferFinalize(idx, page);
page.close();
bufferTimer = Util::bootMS() - bufferTimer;
if (packCounter < tPages.getInt("parts", pageIdx)){
FAIL_MSG("Track %zu, page %" PRIu32 " (" PRETTY_PRINT_MSTIME " - " PRETTY_PRINT_MSTIME ") NOT FULLY buffered in %" PRIu64 "ms - erasing for later retry",

View file

@ -423,6 +423,13 @@ namespace Mist{
while (keys.getValidCount() > 1 && (M.getLastms(i) - keys.getTime(keys.getFirstValid() + 1)) > bufferTime){
if (!removeKey(i)){break;}
}
Util::RelAccX &tPages = meta.pages(i);
Util::RelAccXFieldData firstKeyEnt = tPages.getFieldData("firstkey");
Util::RelAccXFieldData keyCount = tPages.getFieldData("keycount");
for (uint32_t j = tPages.getDeleted(); j < tPages.getEndPos(); j++){
if (tPages.getInt(firstKeyEnt, j) + tPages.getInt(keyCount, j) > firstKey){break;}
bufferRemove(i, tPages.getInt(firstKeyEnt, j), j);
}
}
updateMeta();
}

View file

@ -134,9 +134,8 @@ namespace Mist{
///\param tid The trackid to remove the page from
///\param pageNumber The number of the page to remove
void InOutBase::bufferRemove(size_t idx, uint32_t pageNumber, uint32_t pageIdx){
if (!standAlone){// A different process will handle this for us
return;
}
if (!standAlone){return;}// A different process will handle this for us
Util::RelAccX &tPages = meta.pages(idx);
Util::RelAccXFieldData firstKey = tPages.getFieldData("firstkey");
@ -150,7 +149,7 @@ namespace Mist{
}
// If the given pagenumber is not a valid page on this track, do nothing
if (pageIdx == INVALID_KEY_NUM){
INFO_MSG("Can't remove page %" PRIu32 " on track %zu as it is not a valid page number.", pageNumber, idx);
WARN_MSG("Can't remove page %" PRIu32 " on track %zu as it is not a valid page number.", pageNumber, idx);
return;
}
@ -173,12 +172,11 @@ namespace Mist{
#endif
toErase.master = true;
// Update the page on the tracks index page if needed
uint64_t firstKeyNum = tPages.getInt("firstkey", pageIdx);
uint64_t firstKeyNum = tPages.getInt(firstKey, pageIdx);
uint64_t keyCount = tPages.getInt("keycount", pageIdx);
uint64_t partCount = tPages.getInt("parts", pageIdx);
uint64_t newFirstKey = M.getKeys(idx).getFirstValid();
if (firstKeyNum + keyCount <= newFirstKey){
INFO_MSG("Page %" PRIu64 " track %zu has expired during the time it was kept cached in memory (contains up to key %lu, but the earliest key is %lu). Removing it now", firstKeyNum, idx, firstKeyNum + keyCount, newFirstKey);
HIGH_MSG("Page %" PRIu64 " track %zu has expired during the time it was kept cached in memory (contains up to key %lu, but the earliest key is %lu). Removing it now", firstKeyNum, idx, firstKeyNum + keyCount, newFirstKey);
tPages.setInt("keycount", 0, pageIdx); //< Force removal by having avail and keycount both 0
}else if (firstKeyNum < newFirstKey){
uint64_t newPartCount = 0;
@ -186,7 +184,8 @@ namespace Mist{
for (uint32_t i = newFirstKey; i < firstKeyNum + keyCount; i++){
newPartCount += keys.getParts(i);
}
MEDIUM_MSG("Adjusting meta info for page %lu track %lu before unloading it. First key %lu -> %lu. Key count %lu -> %lu. Part count %lu -> %lu", firstKeyNum, idx, firstKeyNum, newFirstKey, keyCount, keyCount - (newFirstKey - firstKeyNum), partCount, newPartCount);
uint64_t partCount = tPages.getInt("parts", pageIdx);
HIGH_MSG("Adjusting meta info for page %lu track %lu before unloading it. First key %lu -> %lu. Key count %lu -> %lu. Part count %lu -> %lu", firstKeyNum, idx, firstKeyNum, newFirstKey, keyCount, keyCount - (newFirstKey - firstKeyNum), partCount, newPartCount);
tPages.setInt("keycount", keyCount - (newFirstKey - firstKeyNum), pageIdx);
tPages.setInt("parts", newPartCount, pageIdx);
tPages.setInt("firstkey", newFirstKey, pageIdx);
@ -328,37 +327,7 @@ namespace Mist{
/// \param idx The track index of the page to finalize
void InOutBase::liveFinalize(size_t idx){
if (!livePage.count(idx)){return;}
bufferFinalize(idx, livePage[idx]);
}
/// Wraps up the buffering of a shared memory data page
/// \param idx The track index of the page to finalize
void InOutBase::bufferFinalize(size_t idx, IPC::sharedPage & page){
// If no page is open, do nothing
if (!page){
WARN_MSG("Trying to finalize the current page on track %zu, but no page is initialized", idx);
return;
}
/// \TODO META Re-Implement for Cygwin/Win32!
#if defined(__CYGWIN__) || defined(_WIN32)
/*
static int wipedAlready = 0;
if (lowest && lowest > wipedAlready + 1){
for (int curr = wipedAlready + 1; curr < lowest; ++curr){
char pageId[NAME_BUFFER_SIZE];
snprintf(pageId, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), idx, curr);
IPC::releasePage(std::string(pageId));
}
}
// Print a message about registering the page or not.
if (inserted){IPC::preservePage(curPage[idx].name);}
*/
#endif
// Close our link to the page. This will NOT destroy the shared page, as we've set master to
// false upon construction Note: if there was a registering failure above, this WILL destroy the
// shared page, to prevent a memory leak
page.close();
livePage[idx].close();
}
/// Buffers a live packet to a page.
@ -492,7 +461,7 @@ namespace Mist{
tPages.setInt("parts", 0, endPage);
tPages.setInt("lastkeytime", 0, endPage);
tPages.addRecords(1);
if (livePage[packTrack]){bufferFinalize(packTrack, livePage[packTrack]);}
if (livePage[packTrack]){livePage[packTrack].close();}
DONTEVEN_MSG("Opening new page #%zu to track %" PRIu32, curPageNum[packTrack], packTrack);
if (!bufferStart(packTrack, curPageNum[packTrack], livePage[packTrack], aMeta)){
// if this fails, return instantly without actually buffering the packet
@ -513,7 +482,7 @@ namespace Mist{
if (!livePage[packTrack].exists()){
WARN_MSG("Data page '%s' was deleted - forcing source shutdown to prevent unstable state", livePage[packTrack].name.c_str());
Util::logExitReason(ER_SHM_LOST, "data page was deleted, forcing shutdown to prevent unstable state");
bufferFinalize(packTrack, livePage[packTrack]);
livePage[packTrack].close();
kill(getpid(), SIGINT);
return;
}

View file

@ -20,7 +20,6 @@ namespace Mist{
size_t getMainSelectedTrack();
bool bufferStart(size_t idx, uint32_t pageNumber, IPC::sharedPage & page, DTSC::Meta & aMeta);
void bufferFinalize(size_t idx, IPC::sharedPage & page);
void liveFinalize(size_t idx);
bool isRecentLivePage(size_t idx, uint32_t pageNumber, uint64_t maxAge);
void bufferRemove(size_t idx, uint32_t pageNumber, uint32_t pageIdx = INVALID_KEY_NUM);