diff --git a/src/controller/controller_storage.cpp b/src/controller/controller_storage.cpp index 6bd0674f..fa496c92 100644 --- a/src/controller/controller_storage.cpp +++ b/src/controller/controller_storage.cpp @@ -86,7 +86,7 @@ namespace Controller { configLock.wait(); //write config std::string temp = writeConf.toPacked(); - memcpy(mistConfOut.mapped, temp.data(), std::min(temp.size(), (unsigned long)mistConfOut.len)); + memcpy(mistConfOut.mapped, temp.data(), std::min(temp.size(), (size_t)mistConfOut.len)); //unlock semaphore configLock.post(); } diff --git a/src/output/output.cpp b/src/output/output.cpp index 76bc9e33..15f7f267 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -380,6 +380,23 @@ namespace Mist { return keyNo; } + int Output::pageNumForKey(long unsigned int trackId, long long int keyNum){ + if (!indexPages.count(trackId)){ + char id[100]; + sprintf(id, "%s%lu", streamName.c_str(), trackId); + indexPages[trackId].init(id, 8 * 1024); + } + for (int i = 0; i < indexPages[trackId].len / 8; i++){ + long amountKey = ntohl((((long long int*)indexPages[trackId].mapped)[i]) & 0xFFFFFFFF); + if (amountKey == 0){continue;} + long tmpKey = ntohl(((((long long int*)indexPages[trackId].mapped)[i]) >> 32) & 0xFFFFFFFF); + if (tmpKey <= keyNum && (tmpKey + amountKey) > keyNum){ + return tmpKey; + } + } + return -1; + } + void Output::loadPageForKey(long unsigned int trackId, long long int keyNum){ if (myMeta.vod && keyNum > myMeta.tracks[trackId].keys.rbegin()->getNumber()){ curPages.erase(trackId); @@ -387,43 +404,26 @@ namespace Mist { return; } DEBUG_MSG(DLVL_HIGH, "Loading track %lu, containing key %lld", trackId, keyNum); - int pageNum = -1; - int keyAmount = -1; unsigned int timeout = 0; - if (!indexPages.count(trackId)){ - char id[100]; - sprintf(id, "%s%lu", streamName.c_str(), trackId); - indexPages[trackId].init(id, 8 * 1024); - } - while (pageNum == -1 || keyAmount == -1){ - for (int i = 0; i < indexPages[trackId].len / 8; i++){ - long amountKey = ntohl((((long long int*)indexPages[trackId].mapped)[i]) & 0xFFFFFFFF); - if (amountKey == 0){continue;} - long tmpKey = ntohl(((((long long int*)indexPages[trackId].mapped)[i]) >> 32) & 0xFFFFFFFF); - if (tmpKey <= keyNum && (tmpKey + amountKey) > keyNum){ - pageNum = tmpKey; - keyAmount = amountKey; - break; - } + int pageNum = pageNumForKey(trackId, keyNum); + while (pageNum == -1){ + if (!timeout){ + DEBUG_MSG(DLVL_DEVEL, "Requesting/waiting for page that has key %lu:%lld...", trackId, keyNum); } - if (pageNum == -1 || keyAmount == -1){ - if (!timeout){ - DEBUG_MSG(DLVL_DEVEL, "Requesting/waiting for page that has key %lu:%lld...", trackId, keyNum); - } - if (timeout++ > 100){ - DEBUG_MSG(DLVL_FAIL, "Timeout while waiting for requested page. Aborting."); - curPages.erase(trackId); - currKeyOpen.erase(trackId); - return; - } - if (keyNum){ - nxtKeyNum[trackId] = keyNum-1; - }else{ - nxtKeyNum[trackId] = 0; - } - stats(); - Util::sleep(100); + if (timeout++ > 100){ + DEBUG_MSG(DLVL_FAIL, "Timeout while waiting for requested page. Aborting."); + curPages.erase(trackId); + currKeyOpen.erase(trackId); + return; } + if (keyNum){ + nxtKeyNum[trackId] = keyNum-1; + }else{ + nxtKeyNum[trackId] = 0; + } + stats(); + Util::sleep(100); + pageNum = pageNumForKey(trackId, keyNum); } if (keyNum){ @@ -580,6 +580,7 @@ namespace Mist { if (!buffer.size()){ currentPacket.null(); DEBUG_MSG(DLVL_DEVEL, "Buffer completely played out"); + onFinish(); return; } sortedPageInfo nxt = *(buffer.begin()); @@ -610,44 +611,55 @@ namespace Mist { return; } + //have we arrived at the end of the memory page? (4 zeroes mark the end) if (!memcmp(curPages[nxt.tid].mapped + nxt.offset, "\000\000\000\000", 4)){ - if (!currentPacket.getTime()){ + //if we don't currently know where we are, we're lost. We should drop the track. + if (!nxt.time){ DEBUG_MSG(DLVL_DEVEL, "Timeless empty packet on track %u - dropping track.", nxt.tid); prepareNext(); return; } - if (myMeta.live){ - Util::sleep(500); - updateMeta(); - if (myMeta && ++emptyCount < 20){ - if (!seek(nxt.tid, currentPacket.getTime(), true)){ - buffer.insert(nxt); - } + //if this is a live stream, we might have just reached the live point. + //check where the next key is + int nextPage = pageNumForKey(nxt.tid, nxtKeyNum[nxt.tid]+1); + //are we live, and the next key hasn't shown up on another page? then we're waiting. + if (myMeta.live && currKeyOpen.count(nxt.tid) && (currKeyOpen[nxt.tid] == (unsigned int)nextPage || nextPage == -1)){ + if (myMeta && ++emptyCount < 42){ + //we're waiting for new data. Simply retry. + buffer.insert(nxt); }else{ + //after ~10 seconds, give up and drop the track. DEBUG_MSG(DLVL_DEVEL, "Empty packet on track %u - could not reload, dropping track.", nxt.tid); } + //keep updating the metadata at 250ms intervals while waiting for more data + Util::sleep(250); + updateMeta(); }else{ + //if we're not live, we've simply reached the end of the page. Load the next key. loadPageForKey(nxt.tid, ++nxtKeyNum[nxt.tid]); nxt.offset = 0; if (curPages.count(nxt.tid) && curPages[nxt.tid].mapped){ - if (getDTSCTime(curPages[nxt.tid].mapped, nxt.offset) < nxt.time){ + unsigned long long nextTime = getDTSCTime(curPages[nxt.tid].mapped, nxt.offset); + if (nextTime && nextTime < nxt.time){ DEBUG_MSG(DLVL_DEVEL, "Time going backwards in track %u - dropping track.", nxt.tid); }else{ - nxt.time = getDTSCTime(curPages[nxt.tid].mapped, nxt.offset); + if (nextTime){ + nxt.time = nextTime; + } buffer.insert(nxt); + DEBUG_MSG(DLVL_MEDIUM, "Next page for track %u starts at %llu.", nxt.tid, nxt.time); } - prepareNext(); - return; + }else{ + DEBUG_MSG(DLVL_DEVEL, "Could not load next memory page for track %u - dropping track.", nxt.tid); } - DEBUG_MSG(DLVL_DEVEL, "Empty packet on track %u - dropping track.", nxt.tid); } prepareNext(); return; } currentPacket.reInit(curPages[nxt.tid].mapped + nxt.offset, 0, true); if (currentPacket){ - if (currentPacket.getTime() != nxt.time){ - DEBUG_MSG(DLVL_DEVEL, "ACTUALLY Loaded track %ld (next=%lu), %llu ms", currentPacket.getTrackId(), nxtKeyNum[nxt.tid], currentPacket.getTime()); + if (currentPacket.getTime() != nxt.time && nxt.time){ + DEBUG_MSG(DLVL_MEDIUM, "ACTUALLY Loaded track %ld (next=%lu), %llu ms", currentPacket.getTrackId(), nxtKeyNum[nxt.tid], currentPacket.getTime()); } nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, currentPacket.getTime()); emptyCount = 0; @@ -660,7 +672,12 @@ namespace Mist { } if (curPages[nxt.tid]){ if (nxt.offset < curPages[nxt.tid].len){ - nxt.time = getDTSCTime(curPages[nxt.tid].mapped, nxt.offset); + unsigned long long nextTime = getDTSCTime(curPages[nxt.tid].mapped, nxt.offset); + if (nextTime){ + nxt.time = nextTime; + }else{ + ++nxt.time; + } } buffer.insert(nxt); } diff --git a/src/output/output.h b/src/output/output.h index 54eee158..fafc6ca7 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -73,6 +73,7 @@ namespace Mist { private://these *should* not be messed with in child classes. std::map currKeyOpen; void loadPageForKey(long unsigned int trackId, long long int keyNum); + int pageNumForKey(long unsigned int trackId, long long int keyNum); unsigned int lastStats;///