From 15801aa7b7695e1f624937e4a78ee2229e25209c Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 28 Nov 2017 11:08:30 +0100 Subject: [PATCH] Track mapping and deletion fixes --- src/input/input_buffer.cpp | 26 +++++++++++++------------- src/io.cpp | 36 ++++++++++++++++++------------------ src/output/output.cpp | 22 +++++++++++++--------- 3 files changed, 44 insertions(+), 40 deletions(-) diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index eb173e55..eeff523d 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -76,18 +76,18 @@ namespace Mist { } //First detect all entries on metaPage for (int i = 0; i < 8192; i += 8) { - int * tmpOffset = (int *)(nProxy.metaPages[it->first].mapped + i); - if (tmpOffset[0] == 0 && tmpOffset[1] == 0) { + char * tmpOffset = nProxy.metaPages[it->first].mapped + i; + if (Bit::btohl(tmpOffset) == 0 && Bit::btohl(tmpOffset+4) == 0) { continue; - } - unsigned long keyNum = ntohl(tmpOffset[0]); + } + unsigned long keyNum = Bit::btohl(tmpOffset); //Add an entry into bufferLocations[tNum] for the pages we haven't handled yet. if (!locations.count(keyNum)) { locations[keyNum].curOffset = 0; } locations[keyNum].pageNum = keyNum; - locations[keyNum].keyNum = ntohl(tmpOffset[1]); + locations[keyNum].keyNum = Bit::btohl(tmpOffset+4); } for (std::map::iterator it2 = locations.begin(); it2 != locations.end(); it2++) { char thisPageName[NAME_BUFFER_SIZE]; @@ -145,11 +145,11 @@ namespace Mist { if (indexPage.mapped){ char * mappedPointer = indexPage.mapped; for (int j = 0; j < 8192; j += 8) { - int * tmpOffset = (int *)(mappedPointer + j); - if (tmpOffset[0] == 0 && tmpOffset[1] == 0){ + char * tmpOffset = mappedPointer + j; + if (Bit::btohl(tmpOffset) == 0 && Bit::btohl(tmpOffset+4) == 0){ continue; } - unsigned long keyNum = ntohl(tmpOffset[0]); + unsigned long keyNum = Bit::btohl(tmpOffset); snprintf(pageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), i, keyNum); IPC::sharedPage erasePage(pageName, 1024, false, false); erasePage.master = true; @@ -643,19 +643,19 @@ namespace Mist { //First detect all entries on metaPage for (int i = 0; i < 8192; i += 8) { - int * tmpOffset = (int *)(mappedPointer + i); - if (tmpOffset[0] == 0 && tmpOffset[1] == 0){ + char * tmpOffset = mappedPointer + i; + if (Bit::btohl(tmpOffset) == 0 && Bit::btohl(tmpOffset+4) == 0) { continue; } - unsigned long keyNum = ntohl(tmpOffset[0]); + unsigned long keyNum = Bit::btohl(tmpOffset); //Add an entry into bufferLocations[tNum] for the pages we haven't handled yet. if (!locations.count(keyNum)) { locations[keyNum].curOffset = 0; - VERYHIGH_MSG("Page %d detected, with %d keys", keyNum, ntohl(tmpOffset[1])); + VERYHIGH_MSG("Page %d detected, with %d keys", keyNum, Bit::btohl(tmpOffset+4)); } locations[keyNum].pageNum = keyNum; - locations[keyNum].keyNum = ntohl(tmpOffset[1]); + locations[keyNum].keyNum = Bit::btohl(tmpOffset+4); } //Since the map is ordered by keynumber, this loop updates the metadata for each page from oldest to newest for (std::map::iterator pageIt = locations.begin(); pageIt != locations.end(); pageIt++) { diff --git a/src/io.cpp b/src/io.cpp index 54d826e3..841d9dc3 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -145,10 +145,10 @@ namespace Mist { //NOTE: It is important that this only happens if the stream is live.... bool inserted = false; for (int i = 0; i < 1024; i++) { - int * tmpOffset = (int *)(metaPages[tid].mapped + (i * 8)); - if ((tmpOffset[0] == 0 && tmpOffset[1] == 0)) { - tmpOffset[0] = htonl(curPageNum[tid]); - tmpOffset[1] = htonl(1000); + char * tmpOffset = metaPages[tid].mapped + (i * 8); + if ((Bit::btohl(tmpOffset) == 0 && Bit::btohl(tmpOffset+4) == 0)) { + Bit::htobl(tmpOffset, curPageNum[tid]); + Bit::htobl(tmpOffset+4, 1000); inserted = true; break; } @@ -179,10 +179,10 @@ namespace Mist { DEBUG_MSG(DLVL_HIGH, "Removing page %lu on track %lu~>%lu from the corresponding metaPage", pageNumber, tid, mapTid); int i = 0; for (; i < 1024; i++) { - int * tmpOffset = (int *)(nProxy.metaPages[tid].mapped + (i * 8)); - if (ntohl(tmpOffset[0]) == pageNumber) { - tmpOffset[0] = 0; - tmpOffset[1] = 0; + char * tmpOffset = nProxy.metaPages[tid].mapped + (i * 8); + if (Bit::btohl(tmpOffset) == pageNumber) { + Bit::htobl(tmpOffset, 0); + Bit::htobl(tmpOffset+4, 0); break; } } @@ -239,11 +239,11 @@ namespace Mist { //Loop over the index page int len = metaPages[tid].len / 8; for (int i = 0; i < len; ++i) { - int * tmpOffset = (int *)(metaPages[tid].mapped + (i * 8)); - unsigned int keyAmount = ntohl(tmpOffset[1]); + char * tmpOffset = metaPages[tid].mapped + (i * 8); + unsigned int keyAmount = Bit::btohl(tmpOffset+4); if (keyAmount == 0){continue;} //Check whether the key is on this page - unsigned int pageNum = ntohl(tmpOffset[0]); + unsigned int pageNum = Bit::btohl(tmpOffset); if (pageNum <= keyNum && keyNum < pageNum + keyAmount) { return pageNum; } @@ -329,25 +329,25 @@ namespace Mist { bool inserted = false; int lowest = 0; for (int i = 0; i < 1024; i++) { - int * tmpOffset = (int *)(metaPages[tid].mapped + (i * 8)); - int keyNum = ntohl(tmpOffset[0]); - int keyAmount = ntohl(tmpOffset[1]); + char * tmpOffset = metaPages[tid].mapped + (i * 8); + int keyNum = Bit::btohl(tmpOffset); + int keyAmount = Bit::btohl(tmpOffset+4); if (!inserted){ if (myMeta.live){ if(keyNum == curPageNum[tid] && keyAmount == 1000){ - tmpOffset[1] = htonl(pagesByTrack[tid][curPageNum[tid]].keyNum); + Bit::htobl(tmpOffset+4, pagesByTrack[tid][curPageNum[tid]].keyNum); inserted = true; } }else{ //in case of vod, insert at the first "empty" spot if(keyNum == 0){ - tmpOffset[0] = htonl(curPageNum[tid]); - tmpOffset[1] = htonl(pagesByTrack[tid][curPageNum[tid]].keyNum); + Bit::htobl(tmpOffset, curPageNum[tid]); + Bit::htobl(tmpOffset+4, pagesByTrack[tid][curPageNum[tid]].keyNum); inserted = true; } } } - keyNum = ntohl(tmpOffset[0]); + keyNum = Bit::btohl(tmpOffset); if (!keyNum) continue; if (!lowest || keyNum < lowest){ lowest = keyNum; diff --git a/src/output/output.cpp b/src/output/output.cpp index 5f4290da..4802da44 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -398,10 +398,10 @@ namespace Mist{ if (!nProxy.metaPages[trackId].mapped){return -1;} int len = nProxy.metaPages[trackId].len / 8; for (int i = 0; i < len; i++){ - int * tmpOffset = (int *)(nProxy.metaPages[trackId].mapped + (i * 8)); - long amountKey = ntohl(tmpOffset[1]); + char * tmpOffset = nProxy.metaPages[trackId].mapped + (i * 8); + long amountKey = Bit::btohl(tmpOffset+4); if (amountKey == 0){continue;} - long tmpKey = ntohl(tmpOffset[0]); + long tmpKey = Bit::btohl(tmpOffset); if (tmpKey <= keyNum && ((tmpKey?tmpKey:1) + amountKey) > keyNum){ return tmpKey; } @@ -420,10 +420,10 @@ namespace Mist{ int len = nProxy.metaPages[trackId].len / 8; int highest = -1; for (int i = 0; i < len; i++){ - int * tmpOffset = (int *)(nProxy.metaPages[trackId].mapped + (i * 8)); - long amountKey = ntohl(tmpOffset[1]); + char * tmpOffset = nProxy.metaPages[trackId].mapped + (i * 8); + long amountKey = Bit::btohl(tmpOffset+4); if (amountKey == 0){continue;} - long tmpKey = ntohl(tmpOffset[0]); + long tmpKey = Bit::btohl(tmpOffset); if (tmpKey > highest){highest = tmpKey;} } return highest; @@ -856,11 +856,15 @@ namespace Mist{ return false; } - DONTEVEN_MSG("Loading track %u (next=%lu), %llu ms", nxt.tid, nxtKeyNum[nxt.tid], nxt.time); + DONTEVEN_MSG("Loading track %u (next=%lu), %llu ms, %llub", nxt.tid, nxtKeyNum[nxt.tid], nxt.time, nxt.offset); //if we're going to read past the end of the data page, load the next page //this only happens for VoD if (nxt.offset >= nProxy.curPage[nxt.tid].len){ + if (myMeta.vod && nxt.time >= myMeta.tracks[nxt.tid].lastms){ + dropTrack(nxt.tid, "end of VoD track reached", false); + return false; + } if (thisPacket){ nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, thisPacket.getTime()); } @@ -876,7 +880,7 @@ namespace Mist{ buffer.insert(nxt); } }else{ - dropTrack(nxt.tid, "page load failure", true); + dropTrack(nxt.tid, "VoD page load failure"); } return false; } @@ -931,7 +935,7 @@ namespace Mist{ MEDIUM_MSG("Next page for track %u starts at %llu.", nxt.tid, nxt.time); } }else{ - dropTrack(nxt.tid, "page load failure"); + dropTrack(nxt.tid, "next page load failure"); } return false; }