Track mapping and deletion fixes

This commit is contained in:
Thulinma 2017-11-28 11:08:30 +01:00
parent c1400fec42
commit 15801aa7b7
3 changed files with 44 additions and 40 deletions

View file

@ -76,18 +76,18 @@ namespace Mist {
}
//First detect all entries on metaPage
for (int i = 0; i < 8192; i += 8) {
int * tmpOffset = (int *)(nProxy.metaPages[it->first].mapped + i);
if (tmpOffset[0] == 0 && tmpOffset[1] == 0) {
char * tmpOffset = nProxy.metaPages[it->first].mapped + i;
if (Bit::btohl(tmpOffset) == 0 && Bit::btohl(tmpOffset+4) == 0) {
continue;
}
unsigned long keyNum = ntohl(tmpOffset[0]);
unsigned long keyNum = Bit::btohl(tmpOffset);
//Add an entry into bufferLocations[tNum] for the pages we haven't handled yet.
if (!locations.count(keyNum)) {
locations[keyNum].curOffset = 0;
}
locations[keyNum].pageNum = keyNum;
locations[keyNum].keyNum = ntohl(tmpOffset[1]);
locations[keyNum].keyNum = Bit::btohl(tmpOffset+4);
}
for (std::map<unsigned long, DTSCPageData>::iterator it2 = locations.begin(); it2 != locations.end(); it2++) {
char thisPageName[NAME_BUFFER_SIZE];
@ -145,11 +145,11 @@ namespace Mist {
if (indexPage.mapped){
char * mappedPointer = indexPage.mapped;
for (int j = 0; j < 8192; j += 8) {
int * tmpOffset = (int *)(mappedPointer + j);
if (tmpOffset[0] == 0 && tmpOffset[1] == 0){
char * tmpOffset = mappedPointer + j;
if (Bit::btohl(tmpOffset) == 0 && Bit::btohl(tmpOffset+4) == 0){
continue;
}
unsigned long keyNum = ntohl(tmpOffset[0]);
unsigned long keyNum = Bit::btohl(tmpOffset);
snprintf(pageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), i, keyNum);
IPC::sharedPage erasePage(pageName, 1024, false, false);
erasePage.master = true;
@ -643,19 +643,19 @@ namespace Mist {
//First detect all entries on metaPage
for (int i = 0; i < 8192; i += 8) {
int * tmpOffset = (int *)(mappedPointer + i);
if (tmpOffset[0] == 0 && tmpOffset[1] == 0){
char * tmpOffset = mappedPointer + i;
if (Bit::btohl(tmpOffset) == 0 && Bit::btohl(tmpOffset+4) == 0) {
continue;
}
unsigned long keyNum = ntohl(tmpOffset[0]);
unsigned long keyNum = Bit::btohl(tmpOffset);
//Add an entry into bufferLocations[tNum] for the pages we haven't handled yet.
if (!locations.count(keyNum)) {
locations[keyNum].curOffset = 0;
VERYHIGH_MSG("Page %d detected, with %d keys", keyNum, ntohl(tmpOffset[1]));
VERYHIGH_MSG("Page %d detected, with %d keys", keyNum, Bit::btohl(tmpOffset+4));
}
locations[keyNum].pageNum = keyNum;
locations[keyNum].keyNum = ntohl(tmpOffset[1]);
locations[keyNum].keyNum = Bit::btohl(tmpOffset+4);
}
//Since the map is ordered by keynumber, this loop updates the metadata for each page from oldest to newest
for (std::map<unsigned long, DTSCPageData>::iterator pageIt = locations.begin(); pageIt != locations.end(); pageIt++) {

View file

@ -145,10 +145,10 @@ namespace Mist {
//NOTE: It is important that this only happens if the stream is live....
bool inserted = false;
for (int i = 0; i < 1024; i++) {
int * tmpOffset = (int *)(metaPages[tid].mapped + (i * 8));
if ((tmpOffset[0] == 0 && tmpOffset[1] == 0)) {
tmpOffset[0] = htonl(curPageNum[tid]);
tmpOffset[1] = htonl(1000);
char * tmpOffset = metaPages[tid].mapped + (i * 8);
if ((Bit::btohl(tmpOffset) == 0 && Bit::btohl(tmpOffset+4) == 0)) {
Bit::htobl(tmpOffset, curPageNum[tid]);
Bit::htobl(tmpOffset+4, 1000);
inserted = true;
break;
}
@ -179,10 +179,10 @@ namespace Mist {
DEBUG_MSG(DLVL_HIGH, "Removing page %lu on track %lu~>%lu from the corresponding metaPage", pageNumber, tid, mapTid);
int i = 0;
for (; i < 1024; i++) {
int * tmpOffset = (int *)(nProxy.metaPages[tid].mapped + (i * 8));
if (ntohl(tmpOffset[0]) == pageNumber) {
tmpOffset[0] = 0;
tmpOffset[1] = 0;
char * tmpOffset = nProxy.metaPages[tid].mapped + (i * 8);
if (Bit::btohl(tmpOffset) == pageNumber) {
Bit::htobl(tmpOffset, 0);
Bit::htobl(tmpOffset+4, 0);
break;
}
}
@ -239,11 +239,11 @@ namespace Mist {
//Loop over the index page
int len = metaPages[tid].len / 8;
for (int i = 0; i < len; ++i) {
int * tmpOffset = (int *)(metaPages[tid].mapped + (i * 8));
unsigned int keyAmount = ntohl(tmpOffset[1]);
char * tmpOffset = metaPages[tid].mapped + (i * 8);
unsigned int keyAmount = Bit::btohl(tmpOffset+4);
if (keyAmount == 0){continue;}
//Check whether the key is on this page
unsigned int pageNum = ntohl(tmpOffset[0]);
unsigned int pageNum = Bit::btohl(tmpOffset);
if (pageNum <= keyNum && keyNum < pageNum + keyAmount) {
return pageNum;
}
@ -329,25 +329,25 @@ namespace Mist {
bool inserted = false;
int lowest = 0;
for (int i = 0; i < 1024; i++) {
int * tmpOffset = (int *)(metaPages[tid].mapped + (i * 8));
int keyNum = ntohl(tmpOffset[0]);
int keyAmount = ntohl(tmpOffset[1]);
char * tmpOffset = metaPages[tid].mapped + (i * 8);
int keyNum = Bit::btohl(tmpOffset);
int keyAmount = Bit::btohl(tmpOffset+4);
if (!inserted){
if (myMeta.live){
if(keyNum == curPageNum[tid] && keyAmount == 1000){
tmpOffset[1] = htonl(pagesByTrack[tid][curPageNum[tid]].keyNum);
Bit::htobl(tmpOffset+4, pagesByTrack[tid][curPageNum[tid]].keyNum);
inserted = true;
}
}else{
//in case of vod, insert at the first "empty" spot
if(keyNum == 0){
tmpOffset[0] = htonl(curPageNum[tid]);
tmpOffset[1] = htonl(pagesByTrack[tid][curPageNum[tid]].keyNum);
Bit::htobl(tmpOffset, curPageNum[tid]);
Bit::htobl(tmpOffset+4, pagesByTrack[tid][curPageNum[tid]].keyNum);
inserted = true;
}
}
}
keyNum = ntohl(tmpOffset[0]);
keyNum = Bit::btohl(tmpOffset);
if (!keyNum) continue;
if (!lowest || keyNum < lowest){
lowest = keyNum;

View file

@ -398,10 +398,10 @@ namespace Mist{
if (!nProxy.metaPages[trackId].mapped){return -1;}
int len = nProxy.metaPages[trackId].len / 8;
for (int i = 0; i < len; i++){
int * tmpOffset = (int *)(nProxy.metaPages[trackId].mapped + (i * 8));
long amountKey = ntohl(tmpOffset[1]);
char * tmpOffset = nProxy.metaPages[trackId].mapped + (i * 8);
long amountKey = Bit::btohl(tmpOffset+4);
if (amountKey == 0){continue;}
long tmpKey = ntohl(tmpOffset[0]);
long tmpKey = Bit::btohl(tmpOffset);
if (tmpKey <= keyNum && ((tmpKey?tmpKey:1) + amountKey) > keyNum){
return tmpKey;
}
@ -420,10 +420,10 @@ namespace Mist{
int len = nProxy.metaPages[trackId].len / 8;
int highest = -1;
for (int i = 0; i < len; i++){
int * tmpOffset = (int *)(nProxy.metaPages[trackId].mapped + (i * 8));
long amountKey = ntohl(tmpOffset[1]);
char * tmpOffset = nProxy.metaPages[trackId].mapped + (i * 8);
long amountKey = Bit::btohl(tmpOffset+4);
if (amountKey == 0){continue;}
long tmpKey = ntohl(tmpOffset[0]);
long tmpKey = Bit::btohl(tmpOffset);
if (tmpKey > highest){highest = tmpKey;}
}
return highest;
@ -856,11 +856,15 @@ namespace Mist{
return false;
}
DONTEVEN_MSG("Loading track %u (next=%lu), %llu ms", nxt.tid, nxtKeyNum[nxt.tid], nxt.time);
DONTEVEN_MSG("Loading track %u (next=%lu), %llu ms, %llub", nxt.tid, nxtKeyNum[nxt.tid], nxt.time, nxt.offset);
//if we're going to read past the end of the data page, load the next page
//this only happens for VoD
if (nxt.offset >= nProxy.curPage[nxt.tid].len){
if (myMeta.vod && nxt.time >= myMeta.tracks[nxt.tid].lastms){
dropTrack(nxt.tid, "end of VoD track reached", false);
return false;
}
if (thisPacket){
nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, thisPacket.getTime());
}
@ -876,7 +880,7 @@ namespace Mist{
buffer.insert(nxt);
}
}else{
dropTrack(nxt.tid, "page load failure", true);
dropTrack(nxt.tid, "VoD page load failure");
}
return false;
}
@ -931,7 +935,7 @@ namespace Mist{
MEDIUM_MSG("Next page for track %u starts at %llu.", nxt.tid, nxt.time);
}
}else{
dropTrack(nxt.tid, "page load failure");
dropTrack(nxt.tid, "next page load failure");
}
return false;
}