diff --git a/lib/defines.h b/lib/defines.h index 92c8d48b..48e9c6af 100644 --- a/lib/defines.h +++ b/lib/defines.h @@ -201,7 +201,7 @@ static inline void show_stackframe(){} #define SEM_STATISTICS "/MstStat" #define SEM_USERS "/MstUser%s" //%s stream name -#define SHM_TRACK_DATA "MstData%s@%zu_%zu" //%s stream name, %zu track ID, %PRIu32 page # +#define SHM_TRACK_DATA "MstData%s@%zu_%" PRIu32 //%s stream name, %zu track ID, %PRIu32 page # // End new meta #define INPUT_USER_INTERVAL 1000 @@ -258,12 +258,9 @@ static inline void show_stackframe(){} #define STAT_EX_SIZE 177 #define PLAY_EX_SIZE 2 + 6 * SIMUL_TRACKS -#define INVALID_TRACK_ID 0xFFFFFFFF -#define INVALID_KEY_NUM 0xFFFFFFFF -#define INVALID_PAGE_NUM 0xFFFF -#define INVALID_RECORD_INDEX 0xFFFFFFFFFFFFFFFF - -#define MAX_SIZE_T 0xFFFFFFFF +#define INVALID_TRACK_ID 0xFFFFFFFFu +#define INVALID_KEY_NUM 0xFFFFFFFFu +#define INVALID_RECORD_INDEX 0xFFFFFFFFFFFFFFFFull #define NEW_TRACK_ID 0x80000000 #define QUICK_NEGOTIATE 0xC0000000 diff --git a/lib/dtsc.cpp b/lib/dtsc.cpp index ea7d4889..4f9c8096 100644 --- a/lib/dtsc.cpp +++ b/lib/dtsc.cpp @@ -2157,7 +2157,7 @@ namespace DTSC{ if (t.pages.getInt("avail", i) == 0){continue;} char thisPageName[NAME_BUFFER_SIZE]; snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), trackIdx, - t.pages.getInt("firstkey", i)); + (uint32_t)t.pages.getInt("firstkey", i)); IPC::sharedPage p(thisPageName, 20971520); p.master = true; } @@ -2201,7 +2201,7 @@ namespace DTSC{ // Initialize the correct page, make it master so it gets cleaned up when leaving scope. char thisPageName[NAME_BUFFER_SIZE]; snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), trackIdx, - t.pages.getInt("firstkey", t.pages.getDeleted())); + (uint32_t)t.pages.getInt("firstkey", t.pages.getDeleted())); IPC::sharedPage p(thisPageName, 20971520); p.master = true; @@ -3151,8 +3151,8 @@ namespace DTSC{ return pages.getInt("firstkey", res); } - /// Given a key, returns the page number that timestamp can be found on. - /// If the key is not available, returns the closest page number that is. + /// Given a key, returns the page number it can be found on. + /// If the key is not available, returns the closest page that is. size_t Meta::getPageNumberForKey(uint32_t idx, uint64_t keyNum) const{ const Util::RelAccX &pages = tracks.at(idx).pages; size_t res = pages.getStartPos(); diff --git a/src/io.cpp b/src/io.cpp index 8804f4b8..bb9f9a11 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -37,8 +37,8 @@ namespace Mist{ /// Buffering itself is done by bufferNext(). ///\param tid The trackid of the page to start buffering ///\param pageNumber The number of the page to start buffering - bool InOutBase::bufferStart(size_t idx, size_t pageNumber){ - VERYHIGH_MSG("bufferStart for stream %s, track %zu, page %zu", streamName.c_str(), idx, pageNumber); + bool InOutBase::bufferStart(size_t idx, uint32_t pageNumber){ + VERYHIGH_MSG("bufferStart for stream %s, track %zu, page %" PRIu32, streamName.c_str(), idx, pageNumber); // Initialize the stream metadata if it does not yet exist #ifndef TSLIVE_INPUT if (!meta){meta.reInit(streamName);} @@ -52,15 +52,15 @@ namespace Mist{ // If we are currently buffering a page, abandon it completely and print a message about this // This page will NEVER be deleted, unless we open it again later. if (curPage.count(idx)){ - WARN_MSG("Abandoning current page (%zu) for track %zu", curPageNum[idx], idx); + WARN_MSG("Abandoning current page (%" PRIu32 ") for track %zu", curPageNum[idx], idx); curPage.erase(idx); curPageNum.erase(idx); } Util::RelAccX &tPages = meta.pages(idx); - size_t pageIdx = INVALID_PAGE_NUM; - for (size_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ + uint32_t pageIdx = INVALID_KEY_NUM; + for (uint32_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ if (tPages.getInt("firstkey", i) == pageNumber){ pageIdx = i; break; @@ -68,10 +68,10 @@ namespace Mist{ } // If this is not a valid page number on this track, stop buffering this page. - if (pageIdx == INVALID_PAGE_NUM){ - WARN_MSG("Aborting page buffer start: %zu is not a valid page number on track %zu.", pageNumber, idx); + if (pageIdx == INVALID_KEY_NUM){ + WARN_MSG("Aborting page buffer start: %" PRIu32 " is not a valid page number on track %zu.", pageNumber, idx); std::stringstream test; - for (size_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ + for (uint32_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ test << tPages.getInt("firstkey", i) << " "; } INFO_MSG("Valid page numbers: %s", test.str().c_str()); @@ -81,7 +81,7 @@ namespace Mist{ // If the page is already buffered, ignore this request if (isBuffered(idx, pageNumber)){ - INFO_MSG("Page %zu on track %zu already buffered", pageNumber, idx); + INFO_MSG("Page %" PRIu32 " on track %zu already buffered", pageNumber, idx); ///\return false if the page was already buffered. return false; } @@ -100,7 +100,7 @@ namespace Mist{ // Set the current offset to 0, to allow for using it in bufferNext() tPages.setInt("avail", 0, pageIdx); - HIGH_MSG("Start buffering page %zu on track %zu successful", pageNumber, idx); + HIGH_MSG("Start buffering page %" PRIu32 " on track %zu successful", pageNumber, idx); return true; } @@ -109,26 +109,26 @@ namespace Mist{ /// Does not do anything if the process is not standalone, in this case the master process will have an overloaded version of this function. ///\param tid The trackid to remove the page from ///\param pageNumber The number of the page to remove - void InOutBase::bufferRemove(size_t idx, size_t pageNumber){ + void InOutBase::bufferRemove(size_t idx, uint32_t pageNumber){ if (!standAlone){// A different process will handle this for us return; } Util::RelAccX &tPages = meta.pages(idx); - size_t pageIdx = INVALID_PAGE_NUM; - for (size_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ + uint32_t pageIdx = INVALID_KEY_NUM; + for (uint32_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ if (tPages.getInt("firstkey", i) == pageNumber){ pageIdx = i; break; } } // If the given pagenumber is not a valid page on this track, do nothing - if (pageIdx == INVALID_PAGE_NUM){ - INFO_MSG("Can't remove page %zu on track %zu as it is not a valid page number.", pageNumber, idx); + if (pageIdx == INVALID_KEY_NUM){ + INFO_MSG("Can't remove page %" PRIu32 " on track %zu as it is not a valid page number.", pageNumber, idx); return; } - HIGH_MSG("Removing page %zu on track %zu from the corresponding metaPage", pageNumber, idx); + HIGH_MSG("Removing page %" PRIu32 " on track %zu from the corresponding metaPage", pageNumber, idx); tPages.setInt("avail", 0, pageIdx); // Open the correct page @@ -161,7 +161,7 @@ namespace Mist{ /// Returns the pagenumber where this key is buffered on ///\param tid The trackid on which to locate the key ///\param keyNum The number of the keyframe to find - size_t InOutBase::bufferedOnPage(size_t idx, size_t keyNum){ + uint32_t InOutBase::bufferedOnPage(size_t idx, uint32_t keyNum){ Util::RelAccX &tPages = meta.pages(idx); for (uint64_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ @@ -206,8 +206,8 @@ namespace Mist{ IPC::sharedPage &myPage = curPage[packTrack]; Util::RelAccX &tPages = meta.pages(packTrack); - size_t pageIdx = 0; - size_t currPagNum = curPageNum[packTrack]; + uint32_t pageIdx = 0; + uint32_t currPagNum = curPageNum[packTrack]; Util::RelAccXFieldData firstkey = tPages.getFieldData("firstkey"); for (uint64_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ if (tPages.getInt(firstkey, i) == currPagNum){ @@ -220,7 +220,7 @@ namespace Mist{ uint64_t pageSize = tPages.getInt("size", pageIdx); // Do nothing when there is not enough free space on the page to add the packet. if (pageSize - pageOffset < packDataLen){ - FAIL_MSG("Track %" PRIu32 "p%zu : Pack %" PRIu64 "ms of %" PRIu64 "b exceeds size %" PRIu64 " @ bpos %" PRIu64, + FAIL_MSG("Track %" PRIu32 "p%" PRIu32 " : Pack %" PRIu64 "ms of %" PRIu64 "b exceeds size %" PRIu64 " @ bpos %" PRIu64, packTrack, currPagNum, packTime, packDataLen, pageSize, pageOffset); return; } diff --git a/src/io.h b/src/io.h index 82a7ec24..0af9176f 100644 --- a/src/io.h +++ b/src/io.h @@ -15,13 +15,13 @@ namespace Mist{ InOutBase(); bool isBuffered(size_t idx, uint32_t keyNum); - size_t bufferedOnPage(size_t idx, size_t keyNum); + uint32_t bufferedOnPage(size_t idx, uint32_t keyNum); size_t getMainSelectedTrack(); - bool bufferStart(size_t idx, size_t pageNumber); + bool bufferStart(size_t idx, uint32_t pageNumber); void bufferFinalize(size_t idx); - void bufferRemove(size_t idx, size_t pageNumber); + void bufferRemove(size_t idx, uint32_t pageNumber); void bufferLivePacket(const DTSC::Packet &packet); void bufferNext(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData, @@ -42,7 +42,7 @@ namespace Mist{ std::map userSelect; - std::map curPageNum; ///< For each track, holds the number page that is currently being written. + std::map curPageNum; ///< For each track, holds the number page that is currently being written. std::map curPage; ///< For each track, holds the page that is currently being written. }; }// namespace Mist diff --git a/src/output/output.cpp b/src/output/output.cpp index 2d28c6ef..aaa68930 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -511,9 +511,9 @@ namespace Mist{ uint64_t pageKeys = tPages.getInt("keycount", i); if (keyNum > pageNum + pageKeys - 1) continue; uint64_t pageAvail = tPages.getInt("avail", i); - return pageAvail == 0 ? INVALID_PAGE_NUM : pageNum; + return pageAvail == 0 ? INVALID_KEY_NUM : pageNum; } - return INVALID_PAGE_NUM; + return INVALID_KEY_NUM; } /// Gets the highest page number available for the given trackId. @@ -551,8 +551,8 @@ namespace Mist{ uint64_t micros = Util::getMicros(); VERYHIGH_MSG("Loading track %zu, containing key %zu", trackId, keyNum); uint32_t timeout = 0; - uint64_t pageNum = pageNumForKey(trackId, keyNum); - while (keepGoing() && pageNum == INVALID_PAGE_NUM){ + uint32_t pageNum = pageNumForKey(trackId, keyNum); + while (keepGoing() && pageNum == INVALID_KEY_NUM){ if (!timeout){HIGH_MSG("Requesting page with key %zu:%zu", trackId, keyNum);} ++timeout; //Time out after 15 seconds @@ -689,7 +689,7 @@ namespace Mist{ } if (M.getType(mainTrack) == "video"){ DTSC::Keys keys(M.keys(mainTrack)); - size_t keyNum = M.getKeyNumForTime(mainTrack, pos); + uint32_t keyNum = M.getKeyNumForTime(mainTrack, pos); pos = keys.getTime(keyNum); } } @@ -734,9 +734,9 @@ namespace Mist{ return false; } DTSC::Keys keys(M.keys(tid)); - size_t keyNum = M.getKeyNumForTime(tid, pos); + uint32_t keyNum = M.getKeyNumForTime(tid, pos); uint64_t actualKeyTime = keys.getTime(keyNum); - HIGH_MSG("Seeking to track %zu key %zu => time %" PRIu64, tid, keyNum, pos); + HIGH_MSG("Seeking to track %zu key %" PRIu32 " => time %" PRIu64, tid, keyNum, pos); if (actualKeyTime > pos){ if (M.getLive()){ pos = actualKeyTime; @@ -776,7 +776,7 @@ namespace Mist{ tid); return false; } - VERYHIGH_MSG("Track %zu no data (key %zu @ %" PRIu64 ") - waiting...", tid, + VERYHIGH_MSG("Track %zu no data (key %" PRIu32 " @ %" PRIu64 ") - waiting...", tid, keyNum + (getNextKey ? 1 : 0), tmp.offset); uint32_t i = 0; while (!meta.getLive() && curPage[tid].mapped[tmp.offset] == 0 && ++i <= 10){ @@ -784,7 +784,7 @@ namespace Mist{ stats(); } if (curPage[tid].mapped[tmp.offset]){return seek(tid, pos, getNextKey);} - FAIL_MSG("Track %zu no data (key %zu@%" PRIu64 ", page %s, time %" PRIu64 " -> %" PRIu64 ", next=%" PRIu64 ") - timeout", tid, keyNum + (getNextKey ? 1 : 0), tmp.offset, curPage[tid].name.c_str(), pos, actualKeyTime, keys.getTime(keyNum+1)); + FAIL_MSG("Track %zu no data (key %" PRIu32 "@%" PRIu64 ", page %s, time %" PRIu64 " -> %" PRIu64 ", next=%" PRIu64 ") - timeout", tid, keyNum + (getNextKey ? 1 : 0), tmp.offset, curPage[tid].name.c_str(), pos, actualKeyTime, keys.getTime(keyNum+1)); userSelect.erase(tid); firstTime = Util::bootMS() - (buffer.begin()->time * realTime / 1000); return false; @@ -1061,7 +1061,7 @@ namespace Mist{ DTSC::Keys mainKeys(meta.keys(mainTrack)); if (!mainKeys.getValidCount()){return false;} - for (size_t keyNum = mainKeys.getEndValid() - 1; keyNum >= mainKeys.getFirstValid(); keyNum--){ + for (uint32_t keyNum = mainKeys.getEndValid() - 1; keyNum >= mainKeys.getFirstValid(); keyNum--){ seekPos = mainKeys.getTime(keyNum); // Only skip forward if we can win a decent amount (100ms) if (seekPos <= cTime + 100 * seekCount){break;} @@ -1407,7 +1407,7 @@ namespace Mist{ // store copy of current state std::set tmp_buffer = buffer; std::map tmp_userSelect = userSelect; - std::map tmp_currentPage = currentPage; + std::map tmp_currentPage = currentPage; // reset the current packet to null, assuming failure thisPacket.null(); @@ -1424,7 +1424,7 @@ namespace Mist{ userSelect[mainTrack].reload(streamName, mainTrack); // now, seek to the exact timestamp of the keyframe DTSC::Keys keys(M.keys(mainTrack)); - size_t targetKey = M.getKeyNumForTime(mainTrack, currTime); + uint32_t targetKey = M.getKeyNumForTime(mainTrack, currTime); seek(keys.getTime(targetKey)); // attempt to load the key into thisPacket bool ret = prepareNext(); @@ -1437,7 +1437,7 @@ namespace Mist{ buffer = tmp_buffer; userSelect = tmp_userSelect; // but the currentPage map must also load keys as needed - for (std::map::iterator it = tmp_currentPage.begin(); it != tmp_currentPage.end(); ++it){ + for (std::map::iterator it = tmp_currentPage.begin(); it != tmp_currentPage.end(); ++it){ loadPageForKey(it->first, it->second); } // now we are back to normal and can return safely @@ -1543,6 +1543,12 @@ namespace Mist{ dropTrack(nxt.tid, "EOP: invalid next packet"); return false; } + if (nextTime < nxt.time){ + std::stringstream errMsg; + errMsg << "next packet has timestamp " << nextTime << " but current timestamp is " << nxt.time; + dropTrack(nxt.tid, errMsg.str().c_str()); + return false; + } }else{ //no next packet yet! //Check if this is the last packet of a VoD stream. Return success and drop the track. @@ -1552,13 +1558,25 @@ namespace Mist{ dropTrack(nxt.tid, "end of VoD track reached", false); return true; } - size_t thisKey = M.getKeyNumForTime(nxt.tid, nxt.time); + uint32_t thisKey = M.getKeyNumForTime(nxt.tid, nxt.time); //Check if there exists a different page for the next key - size_t nextKeyPage = M.getPageNumberForKey(nxt.tid, thisKey + 1); + uint32_t nextKeyPage = INVALID_KEY_NUM; + //Make sure we only try to read the page for the next key if it actually should be available + DTSC::Keys keys(M.keys(nxt.tid)); + if (keys.getEndValid() >= thisKey+1){nextKeyPage = M.getPageNumberForKey(nxt.tid, thisKey + 1);} if (nextKeyPage != INVALID_KEY_NUM && nextKeyPage != currentPage[nxt.tid]){ - DTSC::Keys keys(M.keys(nxt.tid)); // If so, the next key is our next packet nextTime = keys.getTime(thisKey + 1); + + //If the next packet should've been before the current packet, something is wrong. Abort, abort! + if (nextTime < nxt.time){ + std::stringstream errMsg; + errMsg << "next key (" << (thisKey+1) << ") time " << nextTime << " but current time " << nxt.time; + errMsg << "; currPage=" << currentPage[nxt.tid] << ", nxtPage=" << nextKeyPage; + errMsg << ", firstKey=" << keys.getFirstValid() << ", endKey=" << keys.getEndValid(); + dropTrack(nxt.tid, errMsg.str().c_str()); + return false; + } }else{ //Okay, there's no next page yet, and no next packet on this page either. //That means we're waiting for data to show up, somewhere. @@ -1575,7 +1593,7 @@ namespace Mist{ } //every ~16 seconds, reconnect to metadata if (emptyCount % 1600 == 0){ - INFO_MSG("Reconnecting to input; track %" PRIu64 " key %zu is on page %zu and we're currently serving %zu from %zu", nxt.tid, thisKey+1, nextKeyPage, thisKey, currentPage[nxt.tid]); + INFO_MSG("Reconnecting to input; track %" PRIu64 " key %" PRIu32 " is on page %" PRIu32 " and we're currently serving %" PRIu32 " from %" PRIu32, nxt.tid, thisKey+1, nextKeyPage, thisKey, currentPage[nxt.tid]); reconnect(); if (!meta){ onFail("Could not connect to stream data", true); @@ -1596,12 +1614,6 @@ namespace Mist{ } } - //If the next packet should've been before the current packet, something is wrong. Abort, abort! - if (nextTime < nxt.time){ - dropTrack(nxt.tid, "time going backwards"); - return false; - } - // we've handled all special cases - at this point the packet should exist // let's load it thisPacket.reInit(curPage[nxt.tid].mapped + nxt.offset, 0, true); @@ -1622,7 +1634,7 @@ namespace Mist{ //Update keynum only when the second flips over in the timestamp //We do this because DTSC::Keys is pretty CPU-heavy if (nxt.time / 1000 < nextTime/1000){ - size_t thisKey = M.getKeyNumForTime(nxt.tid, nxt.time); + uint32_t thisKey = M.getKeyNumForTime(nxt.tid, nxt.time); userSelect[nxt.tid].setKeyNum(thisKey); } diff --git a/src/output/output.h b/src/output/output.h index 866a1e8c..20425fde 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -95,7 +95,7 @@ namespace Mist{ std::string getCountry(std::string ip); void doSync(bool force = false); /*LTS-END*/ - std::map currentPage; + std::map currentPage; void loadPageForKey(size_t trackId, size_t keyNum); uint64_t pageNumForKey(size_t trackId, size_t keyNum); uint64_t pageNumMax(size_t trackId);