From 3d9ed39396be1bada9c0877985938e0d8b053ce6 Mon Sep 17 00:00:00 2001 From: Marco van Dijk Date: Mon, 22 Nov 2021 10:47:34 +0100 Subject: [PATCH] setstreamVodField and streamLiveField no longer mutually exclusive Removed curPage map from IO. bufferFrame now creates this variable locally and passes it to bufferStart, bufferFinalize and bufferNext Fix keyNum selection with mixed live & VoD data Fix bufferframe to handle mixed VoD and live Added check to bufferFrame to not start the countdown timer for removing live pages Fixed countdown timer being set using keyNum rather than pageNumber, which resulted in the wrong pages being deleted livePage variable moved from static to private variable to correctly handle multithreaded inputs # Conflicts: # src/io.cpp # src/output/output.cpp --- lib/defines.h | 3 + lib/dtsc.cpp | 24 ++-- lib/dtsc.h | 8 +- src/input/input.cpp | 31 +++-- src/input/input_buffer.cpp | 1 - src/io.cpp | 196 ++++++++++++++++------------ src/io.h | 12 +- src/output/output.cpp | 88 ++++++++++--- src/output/output.h | 4 + src/output/output_ebml.cpp | 12 +- src/output/output_hls.cpp | 2 +- src/output/output_http_internal.cpp | 2 +- src/output/output_mp4.cpp | 8 +- 13 files changed, 241 insertions(+), 150 deletions(-) diff --git a/lib/defines.h b/lib/defines.h index 48e9c6af..10c4b9b1 100644 --- a/lib/defines.h +++ b/lib/defines.h @@ -170,6 +170,9 @@ static inline void show_stackframe(){} #define DEFAULT_FRAGMENT_DURATION 1900 +// Pages get marked for deletion after X seconds of no one watching +#define DEFAULT_PAGE_TIMEOUT 15 + /// \TODO These values are hardcoded and that is dangerous and probably a very bad idea. I don't even know if they are currently correct...?! I doubt they are. #define META_META_OFFSET 104 #define META_META_RECORDSIZE 576 diff --git a/lib/dtsc.cpp b/lib/dtsc.cpp index 0b3228c9..5941c573 100644 --- a/lib/dtsc.cpp +++ b/lib/dtsc.cpp @@ -1815,7 +1815,7 @@ namespace DTSC{ setInit(trackIdx, init.data(), init.size()); } - /// Sets the given track's init data. + /// Sets the given track's init data.setvod void Meta::setInit(size_t trackIdx, const char *init, size_t initLen){ DTSC::Track &t = tracks.at(trackIdx); char *_init = t.track.getPointer(t.trackInitField); @@ -2049,13 +2049,11 @@ namespace DTSC{ void Meta::setVod(bool vod){ stream.setInt(streamVodField, vod ? 1 : 0); - stream.setInt(streamLiveField, vod ? 0 : 1); } bool Meta::getVod() const{return stream.getInt(streamVodField);} void Meta::setLive(bool live){ stream.setInt(streamLiveField, live ? 1 : 0); - stream.setInt(streamVodField, live ? 0 : 1); } bool Meta::getLive() const{return stream.getInt(streamLiveField);} @@ -2085,10 +2083,11 @@ namespace DTSC{ } uint64_t Meta::getBufferWindow() const{return stream.getInt(streamBufferWindowField);} - void Meta::setBootMsOffset(uint64_t bootMsOffset){ + void Meta::setBootMsOffset(int64_t bootMsOffset){ + DONTEVEN_MSG("Setting streamBootMsOffsetField to '%ld'", bootMsOffset); stream.setInt(streamBootMsOffsetField, bootMsOffset); } - uint64_t Meta::getBootMsOffset() const{return stream.getInt(streamBootMsOffsetField);} + int64_t Meta::getBootMsOffset() const{return stream.getInt(streamBootMsOffsetField);} /*LTS-START*/ void Meta::setMinimumFragmentDuration(uint64_t fragmentDuration){ stream.setInt(streamMinimumFragmentDurationField, fragmentDuration); @@ -2326,12 +2325,7 @@ namespace DTSC{ (isKeyframe ? 19 : 0) + packDataSize + 11; } - if ((packBytePos > 0) != (stream.getInt(streamVodField) == 1)){ - INFO_MSG("Changing stream from %s to %s (bPos=%" PRIu64 ")", - stream.getInt(streamVodField) ? "VoD" : "live", (packBytePos >= 0) ? "Vod" : "live", packBytePos); - stream.setInt(streamVodField, packBytePos > 0 ? 1 : 0); - stream.setInt(streamLiveField, packBytePos > 0 ? 0 : 1); - } + if ((packBytePos > 0) && !getVod()){setVod(true);} size_t tNumber = packTrack; std::map::iterator it = tracks.find(tNumber); @@ -3148,6 +3142,7 @@ namespace DTSC{ if (pages.getInt(firsttime, i) > time){break;} res = i; } + DONTEVEN_MSG("Page number for time %" PRIu64 " on track %" PRIu32 " can be found on page %zu", time, idx, pages.getInt("firstkey", res)); return pages.getInt("firstkey", res); } @@ -3166,13 +3161,13 @@ namespace DTSC{ /// Returns the key number containing a given time. /// Or, closest key if given time is not available. - /// Or, zero if no keys are available at all. + /// Or, INVALID_KEY_NUM if no keys are available at all. /// If the time is in the gap before a key, returns that next key instead. size_t Meta::getKeyNumForTime(uint32_t idx, uint64_t time) const{ const Track &trk = tracks.at(idx); const Util::RelAccX &keys = trk.keys; const Util::RelAccX &parts = trk.parts; - if (!keys.getEndPos()){return 0;} + if (!keys.getEndPos()){return INVALID_KEY_NUM;} size_t res = keys.getStartPos(); for (size_t i = res; i < keys.getEndPos(); i++){ if (keys.getInt(trk.keyTimeField, i) > time){ @@ -3185,10 +3180,11 @@ namespace DTSC{ uint64_t dur = parts.getInt(trk.partDurationField, keys.getInt(trk.keyFirstPartField, i)-1); if (keys.getInt(trk.keyTimeField, i) - dur < time){res = i;} } - break; + continue; } res = i; } + DONTEVEN_MSG("Key number for time %" PRIu64 " on track %" PRIu32 " is %zu", time, idx, res); return res; } diff --git a/lib/dtsc.h b/lib/dtsc.h index 2f05fe3c..1ce21747 100644 --- a/lib/dtsc.h +++ b/lib/dtsc.h @@ -405,10 +405,10 @@ namespace DTSC{ uint64_t getFragmentDuration() const{return DEFAULT_FRAGMENT_DURATION;} LTS-END*/ - void setVod(bool vod = true); + void setVod(bool vod); bool getVod() const; - void setLive(bool live = true); + void setLive(bool live); bool getLive() const; bool hasBFrames(size_t idx = INVALID_TRACK_ID) const; @@ -416,8 +416,8 @@ namespace DTSC{ void setBufferWindow(uint64_t bufferWindow); uint64_t getBufferWindow() const; - void setBootMsOffset(uint64_t bootMsOffset); - uint64_t getBootMsOffset() const; + void setBootMsOffset(int64_t bootMsOffset); + int64_t getBootMsOffset() const; std::set getValidTracks(bool skipEmpty = false) const; std::set getMySourceTracks(size_t pid) const; diff --git a/src/input/input.cpp b/src/input/input.cpp index 886af766..5b52c38c 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -36,6 +36,7 @@ namespace Mist{ //But! What if our current key is 20+ seconds long? HAVE YOU THOUGHT OF THAT?! //Exactly! I thought not! So, if the end key number == the first, we increase by one. if (endKey == key){++endKey;} + DONTEVEN_MSG("User with ID:%zu is on key %zu->%zu (timestamp %" PRIu64 ")", id, key, endKey, time); for (size_t i = key; i <= endKey; i++){bufferFrame(track, i);} //Now, we can rest assured that the next ~120 seconds or so is pre-buffered in RAM. } @@ -987,12 +988,20 @@ namespace Mist{ for (size_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ uint64_t pageNum = tPages.getInt("firstkey", i); if (pageCounter[*it].count(pageNum)){ + // If the page is still being written to, reset the counter rather than potentially unloading it + if (isCurrentLivePage(*it, pageNum)){ + pageCounter[*it][pageNum] = DEFAULT_PAGE_TIMEOUT; + continue; + } --pageCounter[*it][pageNum]; if (!pageCounter[*it][pageNum]){ pageCounter[*it].erase(pageNum); bufferRemove(*it, pageNum); } } + else{ + pageCounter[*it][pageNum] = DEFAULT_PAGE_TIMEOUT; + } } } } @@ -1212,8 +1221,8 @@ namespace Mist{ } bool Input::bufferFrame(size_t idx, uint32_t keyNum){ - if (M.getLive()){return true;} - HIGH_MSG("Buffering track %zu, key %" PRIu32, idx, keyNum); + if (!M.getVod()){return true;} + DONTEVEN_MSG("Buffering track %zu, key %" PRIu32, idx, keyNum); bool isVideo = M.getType(idx) == "video"; size_t sourceIdx = M.getSourceTrack(idx); if (sourceIdx == INVALID_TRACK_ID){sourceIdx = idx;} @@ -1241,9 +1250,9 @@ namespace Mist{ } uint32_t pageNumber = tPages.getInt("firstkey", pageIdx); if (isBuffered(idx, pageNumber)){ - // get corresponding page number - pageCounter[idx][pageNumber] = 15; - VERYHIGH_MSG("Track %zu, key %" PRIu32 "is already buffered in page %" PRIu32 + // Mark the page for removal after 15 seconds of no one watching it + pageCounter[idx][pageNumber] = DEFAULT_PAGE_TIMEOUT; + DONTEVEN_MSG("Track %zu, key %" PRIu32 " is already buffered in page %" PRIu32 ". Cancelling bufferFrame", idx, keyNum, pageNumber); return true; @@ -1251,7 +1260,8 @@ namespace Mist{ // Update keynum to point to the corresponding page uint64_t bufferTimer = Util::bootMS(); keyNum = pageNumber; - if (!bufferStart(idx, pageNumber)){ + IPC::sharedPage page; + if (!bufferStart(idx, pageNumber, page)){ WARN_MSG("bufferStart failed! Cancelling bufferFrame"); return false; } @@ -1291,7 +1301,7 @@ namespace Mist{ size_t dataLen; srtPack.getString("data", data, dataLen); bufferNext(srtPack.getTime(), 0, idx, data, dataLen, srtPack.getInt("bpos"), - srtPack.getFlag("keyframe")); + srtPack.getFlag("keyframe"), page); ++packCounter; byteCounter += srtPack.getDataLen(); lastBuffered = srtPack.getTime(); @@ -1348,8 +1358,9 @@ namespace Mist{ INFO_MSG("Part size mismatch: %zu != %zu", dataLen, parts.getSize(partNo)); } ++partNo; + HIGH_MSG("Buffering VoD packet (%zuB) @%" PRIu64 " ms on track %zu with offset %" PRIu64, dataLen, thisPacket.getTime(), idx, thisPacket.getInt("offset")); bufferNext(thisPacket.getTime(), thisPacket.getInt("offset"), idx, data, dataLen, - thisPacket.getInt("bpos"), thisPacket.getFlag("keyframe")); + thisPacket.getInt("bpos"), thisPacket.getFlag("keyframe"), page); ++packCounter; byteCounter += thisPacket.getDataLen(); lastBuffered = thisPacket.getTime(); @@ -1369,13 +1380,13 @@ namespace Mist{ } } } - bufferFinalize(idx); + bufferFinalize(idx, page); bufferTimer = Util::bootMS() - bufferTimer; INFO_MSG("Track %zu, page %" PRIu32 " (%" PRIu64 " - %" PRIu64 " ms) buffered in %" PRIu64 "ms", idx, pageNumber, tPages.getInt("firsttime", pageIdx), thisPacket.getTime(), bufferTimer); INFO_MSG(" (%" PRIu32 "/%" PRIu64 " parts, %" PRIu64 " bytes)", packCounter, tPages.getInt("parts", pageIdx), byteCounter); - pageCounter[idx][keyNum] = 15; + pageCounter[idx][pageNumber] = DEFAULT_PAGE_TIMEOUT; return true; } diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index c3cf51e2..1a22f619 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -317,7 +317,6 @@ namespace Mist{ break; } - curPageNum.erase(tid); INFO_MSG("Should remove track %zu", tid); meta.reloadReplacedPagesIfNeeded(); meta.removeTrack(tid); diff --git a/src/io.cpp b/src/io.cpp index 7bd5c4d7..d3d94827 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -37,7 +37,7 @@ namespace Mist{ /// Buffering itself is done by bufferNext(). ///\param tid The trackid of the page to start buffering ///\param pageNumber The number of the page to start buffering - bool InOutBase::bufferStart(size_t idx, uint32_t pageNumber){ + bool InOutBase::bufferStart(size_t idx, uint32_t pageNumber, IPC::sharedPage & page){ VERYHIGH_MSG("bufferStart for stream %s, track %zu, page %" PRIu32, streamName.c_str(), idx, pageNumber); // Initialize the stream metadata if it does not yet exist #ifndef TSLIVE_INPUT @@ -51,10 +51,9 @@ namespace Mist{ // If we are currently buffering a page, abandon it completely and print a message about this // This page will NEVER be deleted, unless we open it again later. - if (curPage.count(idx)){ - WARN_MSG("Abandoning current page (%" PRIu32 ") for track %zu", curPageNum[idx], idx); - curPage.erase(idx); - curPageNum.erase(idx); + if (page){ + WARN_MSG("Abandoning current page (%s) for track %zu", page.name.c_str(), idx); + page.close(); } Util::RelAccX &tPages = meta.pages(idx); @@ -91,11 +90,15 @@ namespace Mist{ snprintf(pageId, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), idx, pageNumber); uint64_t pageSize = tPages.getInt("size", pageIdx); std::string pageName(pageId); - curPage[idx].init(pageName, pageSize, true); + page.init(pageName, pageSize, true); + + if (!page){ + ERROR_MSG("Could not open page %s", pageId); + return false; + } + // Make sure the data page is not destroyed when we are done buffering it later on. - curPage[idx].master = false; - // Store the pagenumber of the currently buffer page - curPageNum[idx] = pageNumber; + page.master = false; // Set the current offset to 0, to allow for using it in bufferNext() tPages.setInt("avail", 0, pageIdx); @@ -104,6 +107,21 @@ namespace Mist{ return true; } + /// Checks whether a given page is currently being written to + /// \return True if the page is the current live page, and thus not safe to remove + bool InOutBase::isCurrentLivePage(size_t idx, uint32_t pageNumber){ + // Base case: for nonlive situations no new data will be added + if (!M.getLive()){ + return false; + } + // All pages at or after the current live page should not get removed + if (curPageNum[idx] && curPageNum[idx] <= pageNumber){ + return true; + } + // If there is no set curPageNum we are definitely not writing to it + return false; + } + /// Removes a fully buffered page /// /// Does not do anything if the process is not standalone, in this case the master process will have an overloaded version of this function. @@ -166,9 +184,10 @@ namespace Mist{ for (uint64_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ uint64_t pageNum = tPages.getInt("firstkey", i); - if (pageNum > keyNum) break; + if (pageNum > keyNum) continue; uint64_t keyCount = tPages.getInt("keycount", i); if (pageNum + keyCount - 1 < keyNum) continue; + if (keyCount && pageNum + keyCount - 1 < keyNum) continue; uint64_t avail = tPages.getInt("avail", i); return avail ? pageNum : INVALID_KEY_NUM; } @@ -178,7 +197,7 @@ namespace Mist{ /// Buffers the next packet on the currently opened page ///\param pack The packet to buffer void InOutBase::bufferNext(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData, - size_t packDataSize, uint64_t packBytePos, bool isKeyframe){ + size_t packDataSize, uint64_t packBytePos, bool isKeyframe, IPC::sharedPage & page){ size_t packDataLen = 24 + (packOffset ? 17 : 0) + (packBytePos ? 15 : 0) + (isKeyframe ? 19 : 0) + packDataSize + 11; @@ -190,7 +209,7 @@ namespace Mist{ } // these checks were already done in bufferSinglePacket, but we check again just to be sure - if (meta.getLive() && packTime < meta.getLastms(packTrack)){ + if (!meta.getVod() && packTime < meta.getLastms(packTrack)){ DEBUG_MSG(((multiWrong == 0) ? DLVL_WARN : DLVL_HIGH), "Wrong order on track %" PRIu32 " ignored: %" PRIu64 " < %" PRIu64, packTrack, packTime, meta.getLastms(packTrack)); @@ -198,16 +217,15 @@ namespace Mist{ return; } // Do nothing if no page is opened for this track - if (!curPage.count(packTrack)){ + if (!page){ INFO_MSG("Trying to buffer a packet on track %" PRIu32 ", but no page is initialized", packTrack); return; } multiWrong = false; - IPC::sharedPage &myPage = curPage[packTrack]; Util::RelAccX &tPages = meta.pages(packTrack); uint32_t pageIdx = 0; - uint32_t currPagNum = curPageNum[packTrack]; + uint32_t currPagNum = atoi(page.name.data() + page.name.rfind('_') + 1); Util::RelAccXFieldData firstkey = tPages.getFieldData("firstkey"); for (uint64_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ if (tPages.getInt(firstkey, i) == currPagNum){ @@ -218,6 +236,7 @@ namespace Mist{ // Save the current write position uint64_t pageOffset = tPages.getInt("avail", pageIdx); uint64_t pageSize = tPages.getInt("size", pageIdx); + INSANE_MSG("Current packet %" PRIu64 " on track %" PRIu32 " has an offset on page %s of %" PRIu64, packTime, packTrack, page.name.c_str(), pageOffset); // Do nothing when there is not enough free space on the page to add the packet. if (pageSize - pageOffset < packDataLen){ FAIL_MSG("Track %" PRIu32 "p%" PRIu32 " : Pack %" PRIu64 "ms of %zub exceeds size %" PRIu64 " @ bpos %" PRIu64, @@ -228,7 +247,7 @@ namespace Mist{ // First generate only the payload on the correct destination // Leaves the 20 bytes inbetween empty to ensure the data is not accidentally read before it is // complete - char *data = myPage.mapped + pageOffset; + char *data = page.mapped + pageOffset; data[20] = 0xE0; // start container object unsigned int offset = 21; @@ -254,18 +273,15 @@ namespace Mist{ // Copy the remaining values in reverse order: // 8 byte timestamp - Bit::htobll(myPage.mapped + pageOffset + 12, packTime); + Bit::htobll(page.mapped + pageOffset + 12, packTime); // The mapped track id - Bit::htobl(myPage.mapped + pageOffset + 8, packTrack); + Bit::htobl(page.mapped + pageOffset + 8, packTrack); // Write the size - Bit::htobl(myPage.mapped + pageOffset + 4, packDataLen - 8); + Bit::htobl(page.mapped + pageOffset + 4, packDataLen - 8); // write the 'DTP2' bytes to conclude the packet and allow for reading it - memcpy(myPage.mapped + pageOffset, "DTP2", 4); - - if (M.getLive()){ - meta.update(packTime, packOffset, packTrack, packDataSize, packBytePos, isKeyframe); - } + memcpy(page.mapped + pageOffset, "DTP2", 4); + DONTEVEN_MSG("Setting page %" PRIu32 " available to %" PRIu64, pageIdx, pageOffset + packDataLen); tPages.setInt("avail", pageOffset + packDataLen, pageIdx); } @@ -273,10 +289,10 @@ namespace Mist{ /// /// Registers the data page on the track index page as well ///\param tid The trackid of the page to finalize - void InOutBase::bufferFinalize(size_t idx){ + void InOutBase::bufferFinalize(size_t idx, IPC::sharedPage & page){ // If no page is open, do nothing - if (!curPage.count(idx)){ - INFO_MSG("Trying to finalize the current page on track %zu, but no page is initialized", idx); + if (!page){ + WARN_MSG("Trying to finalize the current page on track %zu, but no page is initialized", idx); return; } @@ -298,8 +314,7 @@ namespace Mist{ // Close our link to the page. This will NOT destroy the shared page, as we've set master to // false upon construction Note: if there was a registering failure above, this WILL destroy the // shared page, to prevent a memory leak - curPage.erase(idx); - curPageNum.erase(idx); + page.close(); } /// Buffers a live packet to a page. @@ -325,7 +340,7 @@ namespace Mist{ void InOutBase::bufferLivePacket(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData, size_t packDataSize, uint64_t packBytePos, bool isKeyframe){ meta.reloadReplacedPagesIfNeeded(); - meta.setLive(); + meta.setLive(true); // Store the trackid for easier access // Do nothing if the trackid is invalid @@ -336,7 +351,7 @@ namespace Mist{ if (M.getType(packTrack) != "video"){ isKeyframe = false; - if (!tPages.getEndPos()){ + if (!tPages.getEndPos() || !livePage[packTrack]){ // Assume this is the first packet on the track isKeyframe = true; }else{ @@ -358,84 +373,97 @@ namespace Mist{ WARN_MSG("Sudden jump in timestamp from %" PRIu64 " to %" PRIu64, M.getLastms(packTrack), packTime); } } - + // Determine if we need to open the next page - uint32_t nextPageNum = INVALID_KEY_NUM; if (isKeyframe){ updateTrackFromKeyframe(packTrack, packData, packDataSize); uint64_t endPage = tPages.getEndPos(); - - // If there is no page, create it - if (!endPage){ - nextPageNum = 0; - tPages.setInt("firstkey", 0, 0); - tPages.setInt("firsttime", packTime, 0); - tPages.setInt("size", DEFAULT_DATA_PAGE_SIZE, 0); - tPages.setInt("keycount", 0, 0); - tPages.setInt("avail", 0, 0); - tPages.addRecords(1); - ++endPage; + size_t curPage = 0; + size_t currPagNum = atoi(livePage[packTrack].name.data() + livePage[packTrack].name.rfind('_') + 1); + Util::RelAccXFieldData firstkey = tPages.getFieldData("firstkey"); + for (uint64_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ + if (tPages.getInt(firstkey, i) == currPagNum){ + curPage = i; + break; + } } - uint64_t prevPageTime = tPages.getInt("firsttime", endPage - 1); - // Compare on 8 mb boundary and target duration - if (tPages.getInt("avail", endPage - 1) > FLIP_DATA_PAGE_SIZE || packTime - prevPageTime > FLIP_TARGET_DURATION){ + // If there is no page, create it + if (!livePage[packTrack]){ + size_t keyNum = M.getKeyNumForTime(packTrack, packTime); + if (keyNum == INVALID_KEY_NUM){ + curPageNum[packTrack] = 0; + }else{ + curPageNum[packTrack] = M.getKeyNumForTime(packTrack, packTime) + 1; + } - if ((endPage - tPages.getDeleted()) >= tPages.getRCount()){ + if ((tPages.getEndPos() - tPages.getDeleted()) >= tPages.getRCount()){ meta.resizeTrack(packTrack, M.fragments(packTrack).getRCount(), M.keys(packTrack).getRCount(), M.parts(packTrack).getRCount(), tPages.getRCount() * 2, "not enough pages"); } - // Create the book keeping data for the new page - nextPageNum = tPages.getInt("firstkey", endPage - 1) + tPages.getInt("keycount", endPage - 1); - HIGH_MSG("Live page transition from %" PRIu32 ":%" PRIu64 " to %" PRIu32 ":%" PRIu32, packTrack, - tPages.getInt("firstkey", endPage - 1), packTrack, nextPageNum); - tPages.setInt("firstkey", nextPageNum, endPage); + + tPages.addRecords(1); + tPages.setInt("firstkey", curPageNum[packTrack], endPage); tPages.setInt("firsttime", packTime, endPage); tPages.setInt("size", DEFAULT_DATA_PAGE_SIZE, endPage); tPages.setInt("keycount", 0, endPage); tPages.setInt("avail", 0, endPage); - tPages.addRecords(1); - ++endPage; - } - tPages.setInt("lastkeytime", packTime, endPage - 1); - tPages.setInt("keycount", tPages.getInt("keycount", endPage - 1) + 1, endPage - 1); - } - // Set the pageNumber if it has not been set yet - if (nextPageNum == INVALID_KEY_NUM){ - if (curPageNum.count(packTrack)){ - nextPageNum = curPageNum[packTrack]; + curPage = endPage; + DONTEVEN_MSG("Opening new page #%zu to track %" PRIu32, curPageNum[packTrack], packTrack); + if (!bufferStart(packTrack, curPageNum[packTrack], livePage[packTrack])){ + // if this fails, return instantly without actually buffering the packet + WARN_MSG("Dropping packet %s:%" PRIu32 "@%" PRIu64, streamName.c_str(), packTrack, packTime); + return; + } }else{ - nextPageNum = 0; + uint64_t prevPageTime = tPages.getInt("firsttime", curPage); + // Compare on 8 mb boundary and target duration + if (tPages.getInt("avail", curPage) > FLIP_DATA_PAGE_SIZE || packTime - prevPageTime > FLIP_TARGET_DURATION){ + // Create the book keeping data for the new page + curPageNum[packTrack] = tPages.getInt("firstkey", curPage) + tPages.getInt("keycount", curPage); + DONTEVEN_MSG("Live page transition from %" PRIu32 ":%zu to %" PRIu32 ":%zu", packTrack, + tPages.getInt("firstkey", curPage), packTrack, curPageNum[packTrack]); + + if ((tPages.getEndPos() - tPages.getDeleted()) >= tPages.getRCount()){ + meta.resizeTrack(packTrack, M.fragments(packTrack).getRCount(), M.keys(packTrack).getRCount(), M.parts(packTrack).getRCount(), tPages.getRCount() * 2, "not enough pages"); + } + + tPages.addRecords(1); + tPages.setInt("firstkey", curPageNum[packTrack], endPage); + tPages.setInt("firsttime", packTime, endPage); + tPages.setInt("size", DEFAULT_DATA_PAGE_SIZE, endPage); + tPages.setInt("keycount", 0, endPage); + tPages.setInt("avail", 0, endPage); + curPage = endPage; + if (livePage[packTrack]){bufferFinalize(packTrack, livePage[packTrack]);} + DONTEVEN_MSG("Opening new page #%zu to track %" PRIu32, curPageNum[packTrack], packTrack); + if (!bufferStart(packTrack, curPageNum[packTrack], livePage[packTrack])){ + // if this fails, return instantly without actually buffering the packet + WARN_MSG("Dropping packet %s:%" PRIu32 "@%" PRIu64, streamName.c_str(), packTrack, packTime); + return; + } + } } + DONTEVEN_MSG("Setting page %lu lastkeyTime to '%lu' and keycount to '%lu'", tPages.getInt("firstkey", curPage), packTime, tPages.getInt("keycount", curPage) + 1); + tPages.setInt("lastkeytime", packTime, curPage); + tPages.setInt("keycount", tPages.getInt("keycount", curPage) + 1, curPage); } - // If we have no pages by track, we have not received a starting keyframe yet. Drop this packet. - if (!tPages.getEndPos()){ - INFO_MSG("Track %" PRIu32 " not starting with a keyframe!", packTrack); + if (!livePage[packTrack]) { + INFO_MSG("Track %" PRIu32 " page %zu not starting with a keyframe!", packTrack, curPageNum[packTrack]); return; } - if (curPage.count(packTrack) && !curPage[packTrack].exists()){ - WARN_MSG("Data page was deleted - forcing source shutdown to prevent unstable state"); + if (!livePage[packTrack].exists()){ + WARN_MSG("Data page '%s' was deleted - forcing source shutdown to prevent unstable state", livePage[packTrack].name.c_str()); Util::logExitReason("data page was deleted, forcing shutdown to prevent unstable state"); - bufferFinalize(packTrack); + bufferFinalize(packTrack, livePage[packTrack]); kill(getpid(), SIGINT); return; } - - if (!curPageNum.count(packTrack) || nextPageNum != curPageNum[packTrack]){ - if (curPageNum.count(packTrack)){ - // Close the currently opened page when it exists - bufferFinalize(packTrack); - } - // Open the new page - if (!bufferStart(packTrack, nextPageNum)){ - // if this fails, return instantly without actually buffering the packet - WARN_MSG("Dropping packet for %s: (no page) %" PRIu32 "@%" PRIu64, streamName.c_str(), packTrack, packTime); - return; - } - } // Buffer the packet - bufferNext(packTime, packOffset, packTrack, packData, packDataSize, packBytePos, isKeyframe); + DONTEVEN_MSG("Buffering live packet (%zuB) @%" PRIu64 " ms on track %" PRIu32 " with offset %" PRIu64, packDataSize, packTime, packTrack, packOffset); + bufferNext(packTime, packOffset, packTrack, packData, packDataSize, packBytePos, isKeyframe, livePage[packTrack]); + meta.update(packTime, packOffset, packTrack, packDataSize, packBytePos, isKeyframe); } ///Handles updating track metadata from a new keyframe, if applicable diff --git a/src/io.h b/src/io.h index 0af9176f..e0652efd 100644 --- a/src/io.h +++ b/src/io.h @@ -19,13 +19,14 @@ namespace Mist{ size_t getMainSelectedTrack(); - bool bufferStart(size_t idx, uint32_t pageNumber); - void bufferFinalize(size_t idx); + bool bufferStart(size_t idx, uint32_t pageNumber, IPC::sharedPage & page); + void bufferFinalize(size_t idx, IPC::sharedPage & page); + bool isCurrentLivePage(size_t idx, uint32_t pageNumber); void bufferRemove(size_t idx, uint32_t pageNumber); void bufferLivePacket(const DTSC::Packet &packet); void bufferNext(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData, - size_t packDataSize, uint64_t packBytePos, bool isKeyframe); + size_t packDataSize, uint64_t packBytePos, bool isKeyframe, IPC::sharedPage & page); void bufferLivePacket(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData, size_t packDataSize, uint64_t packBytePos, bool isKeyframe); @@ -42,7 +43,8 @@ namespace Mist{ std::map userSelect; - std::map curPageNum; ///< For each track, holds the number page that is currently being written. - std::map curPage; ///< For each track, holds the page that is currently being written. + private: + std::map livePage; + std::map curPageNum; }; }// namespace Mist diff --git a/src/output/output.cpp b/src/output/output.cpp index dd8b0ef3..28894376 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -108,6 +108,8 @@ namespace Mist{ firstData = true; newUA = true; lastPushUpdate = 0; + previousFile = ""; + currentFile = ""; lastRecv = Util::bootSecs(); if (myConn){ @@ -418,9 +420,9 @@ namespace Mist{ statComm.reload(); stats(true); if (isPushing()){return;} - if (!isRecording() && !M.getVod() && !isReadyForPlay()){ + if (!isRecording() && M.getLive() && !isReadyForPlay()){ uint64_t waitUntil = Util::bootSecs() + 45; - while (!M.getVod() && !isReadyForPlay()){ + while (M.getLive() && !isReadyForPlay()){ if (Util::bootSecs() > waitUntil || (!userSelect.size() && Util::bootSecs() > waitUntil)){ INFO_MSG("Giving up waiting for playable tracks. IP: %s", getConnectedHost().c_str()); break; @@ -537,6 +539,7 @@ namespace Mist{ if (!keys.getValidCount()){return 0;} //Get the key for the current time size_t keyNum = M.getKeyNumForTime(trk, lastPacketTime); + if (keyNum == INVALID_KEY_NUM){return 0;} if (keys.getEndValid() <= keyNum+1){return 0;} //Return the next key return keys.getTime(keyNum+1); @@ -546,7 +549,7 @@ namespace Mist{ const Util::RelAccX &tPages = M.pages(trackId); for (uint64_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ uint64_t pageNum = tPages.getInt("firstkey", i); - if (pageNum > keyNum) break; + if (pageNum > keyNum) continue; uint64_t pageKeys = tPages.getInt("keycount", i); if (keyNum > pageNum + pageKeys - 1) continue; uint64_t pageAvail = tPages.getInt("avail", i); @@ -581,7 +584,7 @@ namespace Mist{ return; } size_t lastAvailKey = keys.getEndValid() - 1; - if (meta.getVod() && keyNum > lastAvailKey){ + if (!meta.getLive() && keyNum > lastAvailKey){ INFO_MSG("Load for track %zu key %zu aborted, is > %zu", trackId, keyNum, lastAvailKey); curPage.erase(trackId); currentPage.erase(trackId); @@ -729,6 +732,10 @@ namespace Mist{ if (M.getType(mainTrack) == "video"){ DTSC::Keys keys(M.keys(mainTrack)); uint32_t keyNum = M.getKeyNumForTime(mainTrack, pos); + if (keyNum == INVALID_KEY_NUM){ + FAIL_MSG("Attempted seek on empty track %zu", mainTrack); + return false; + } pos = keys.getTime(keyNum); } } @@ -774,13 +781,15 @@ namespace Mist{ } DTSC::Keys keys(M.keys(tid)); uint32_t keyNum = M.getKeyNumForTime(tid, pos); + if (keyNum == INVALID_KEY_NUM){ + FAIL_MSG("Attempted seek on empty track %zu", tid); + return false; + } uint64_t actualKeyTime = keys.getTime(keyNum); HIGH_MSG("Seeking to track %zu key %" PRIu32 " => time %" PRIu64, tid, keyNum, pos); if (actualKeyTime > pos){ - if (M.getLive()){ - pos = actualKeyTime; - userSelect[tid].setKeyNum(keyNum); - } + pos = actualKeyTime; + userSelect[tid].setKeyNum(keyNum); } loadPageForKey(tid, keyNum + (getNextKey ? 1 : 0)); if (!curPage.count(tid) || !curPage[tid].mapped){ @@ -818,7 +827,7 @@ namespace Mist{ VERYHIGH_MSG("Track %zu no data (key %" PRIu32 " @ %" PRIu64 ") - waiting...", tid, keyNum + (getNextKey ? 1 : 0), tmp.offset); uint32_t i = 0; - while (!meta.getLive() && curPage[tid].mapped[tmp.offset] == 0 && ++i <= 10){ + while (meta.getVod() && curPage[tid].mapped[tmp.offset] == 0 && ++i <= 10){ Util::wait(100 * i); stats(); } @@ -917,7 +926,7 @@ namespace Mist{ if (targetParams.count("recstart") && atoll(targetParams["recstart"].c_str()) != 0){ uint64_t startRec = atoll(targetParams["recstart"].c_str()); if (startRec > endTime()){ - if (M.getVod()){ + if (!M.getLive()){ onFail("Recording start past end of non-live source", true); return; } @@ -1028,7 +1037,7 @@ namespace Mist{ size_t mainTrack = getMainSelectedTrack(); int64_t startRec = atoll(targetParams["start"].c_str()); if (startRec > M.getLastms(mainTrack)){ - if (M.getVod()){ + if (!M.getLive()){ onFail("Playback start past end of non-live source", true); return; } @@ -1360,6 +1369,9 @@ namespace Mist{ if (newTarget.rfind('?') != std::string::npos){ newTarget.erase(newTarget.rfind('?')); } + // Keep track of filenames written, so that they can be added to the playlist file + previousFile = currentFile; + currentFile = newTarget; INFO_MSG("Switching to next push target filename: %s", newTarget.c_str()); if (!connectToFile(newTarget)){ FAIL_MSG("Failed to open file, aborting: %s", newTarget.c_str()); @@ -1486,11 +1498,16 @@ namespace Mist{ // now, seek to the exact timestamp of the keyframe DTSC::Keys keys(M.keys(mainTrack)); uint32_t targetKey = M.getKeyNumForTime(mainTrack, currTime); - seek(keys.getTime(targetKey)); - // attempt to load the key into thisPacket - bool ret = prepareNext(); - if (!ret){ - WARN_MSG("Failed to load keyframe for %" PRIu64 "ms - continuing without it", currTime); + bool ret = false; + if (targetKey == INVALID_KEY_NUM){ + FAIL_MSG("No keyframes available on track %zu", mainTrack); + }else{ + seek(keys.getTime(targetKey)); + // attempt to load the key into thisPacket + ret = prepareNext(); + if (!ret){ + WARN_MSG("Failed to load keyframe for %" PRIu64 "ms - continuing without it", currTime); + } } // restore state to before the seek/load @@ -1547,7 +1564,39 @@ namespace Mist{ return false; } - Util::sortedPageInfo nxt; + Util::sortedPageInfo nxt = *(buffer.begin()); + + if (meta.reloadReplacedPagesIfNeeded()){return false;} + if (!M.getValidTracks().count(nxt.tid)){ + dropTrack(nxt.tid, "disappeared from metadata"); + return false; + } + + // if we're going to read past the end of the data page, load the next page + // this only happens for VoD + if (nxt.offset >= curPage[nxt.tid].len || + (!memcmp(curPage[nxt.tid].mapped + nxt.offset, "\000\000\000\000", 4))){ + if (!M.getLive() && nxt.time >= M.getLastms(nxt.tid)){ + dropTrack(nxt.tid, "end of non-live track reached", false); + return false; + } + if (M.getPageNumberForTime(nxt.tid, nxt.time) != currentPage[nxt.tid]){ + loadPageForKey(nxt.tid, M.getPageNumberForTime(nxt.tid, nxt.time)); + nxt.offset = 0; + //Only read the next time if the page load succeeded and there is a packet to read from + if (curPage[nxt.tid].mapped && curPage[nxt.tid].mapped[0] == 'D'){ + nxt.time = getDTSCTime(curPage[nxt.tid].mapped, 0); + } + buffer.replaceFirst(nxt); + return false; + } + dropTrack(nxt.tid, "VoD page load failure"); + return false; + } + + // We know this packet will be valid, pre-load it so we know its length + DTSC::Packet preLoad(curPage[nxt.tid].mapped + nxt.offset, 0, true); + uint64_t nextTime = 0; //In case we're not in sync mode, we might have to retry a few times for (size_t trackTries = 0; trackTries < buffer.size(); ++trackTries){ @@ -1582,7 +1631,6 @@ namespace Mist{ dropTrack(nxt.tid, "VoD page load failure"); return false; } - // We know this packet will be valid, pre-load it so we know its length DTSC::Packet preLoad(curPage[nxt.tid].mapped + nxt.offset, 0, true); @@ -1605,10 +1653,10 @@ namespace Mist{ }else{ //no next packet yet! //Check if this is the last packet of a VoD stream. Return success and drop the track. - if (M.getVod() && nxt.time >= M.getLastms(nxt.tid)){ + if (!M.getLive() && nxt.time >= M.getLastms(nxt.tid)){ thisPacket.reInit(curPage[nxt.tid].mapped + nxt.offset, 0, true); thisIdx = nxt.tid; - dropTrack(nxt.tid, "end of VoD track reached", false); + dropTrack(nxt.tid, "end of non-live track reached", false); return true; } uint32_t thisKey = M.getKeyNumForTime(nxt.tid, nxt.time); diff --git a/src/output/output.h b/src/output/output.h index 93e3a38a..335c0b14 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -31,6 +31,8 @@ namespace Mist{ /*LTS-START*/ std::string reqUrl; /*LTS-END*/ + std::string previousFile; + std::string currentFile; // non-virtual generic functions virtual int run(); virtual void stats(bool force = false); @@ -155,6 +157,8 @@ namespace Mist{ size_t thisIdx; uint64_t thisTime; + + std::map curPage; ///< For each track, holds the page that is currently being written. }; }// namespace Mist diff --git a/src/output/output_ebml.cpp b/src/output/output_ebml.cpp index 59aec749..2c71eb2c 100644 --- a/src/output/output_ebml.cpp +++ b/src/output/output_ebml.cpp @@ -17,7 +17,7 @@ namespace Mist{ if (config->getString("target").size()){ if (config->getString("target").find(".webm") != std::string::npos){doctype = "webm";} initialize(); - if (M.getVod()){calcVodSizes();} + if (!M.getLive()){calcVodSizes();} if (!streamName.size()){ WARN_MSG("Recording unconnected EBML output to file! Cancelled."); conn.close(); @@ -139,7 +139,7 @@ namespace Mist{ if (thisPacket.getTime() >= newClusterTime){ if (liveSeek()){return;} currentClusterTime = thisPacket.getTime(); - if (M.getVod()){ + if (!M.getLive()){ // In case of VoD, clusters are aligned with the main track fragments // EXCEPT when they are more than 30 seconds long, because clusters are limited to -32 to 32 // seconds. @@ -318,7 +318,7 @@ namespace Mist{ void OutEBML::sendHeader(){ double duration = 0; size_t idx = getMainSelectedTrack(); - if (M.getVod()){ + if (!M.getLive()){ duration = M.getLastms(idx) - M.getFirstms(idx); }else{ needsLookAhead = 420; @@ -326,7 +326,7 @@ namespace Mist{ // EBML header and Segment EBML::sendElemEBML(myConn, doctype); EBML::sendElemHead(myConn, EBML::EID_SEGMENT, segmentSize); // Default = Unknown size - if (M.getVod()){ + if (!M.getLive()){ // SeekHead EBML::sendElemHead(myConn, EBML::EID_SEEKHEAD, seekSize); EBML::sendElemSeek(myConn, EBML::EID_INFO, seekheadSize); @@ -344,7 +344,7 @@ namespace Mist{ for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ sendElemTrackEntry(it->first); } - if (M.getVod()){ + if (!M.getLive()){ EBML::sendElemHead(myConn, EBML::EID_CUES, cuesSize); uint64_t tmpsegSize = infoSize + tracksSize + seekheadSize + cuesSize + EBML::sizeElemHead(EBML::EID_CUES, cuesSize); @@ -407,7 +407,7 @@ namespace Mist{ // Calculate the sizes of various parts, if we're VoD. size_t totalSize = 0; - if (M.getVod()){ + if (!M.getLive()){ calcVodSizes(); // We now know the full size of the segment, thus can calculate the total size totalSize = EBML::sizeElemEBML(doctype) + EBML::sizeElemHead(EBML::EID_SEGMENT, segmentSize) + segmentSize; diff --git a/src/output/output_hls.cpp b/src/output/output_hls.cpp index f019094d..8aea74c8 100644 --- a/src/output/output_hls.cpp +++ b/src/output/output_hls.cpp @@ -125,7 +125,7 @@ namespace Mist{ } size_t skippedLines = 0; if (M.getLive() && lines.size()){ - // only print the last segment when VoD + // only print the last segment when non-live lines.pop_back(); totalDuration -= durations.back(); durations.pop_back(); diff --git a/src/output/output_http_internal.cpp b/src/output/output_http_internal.cpp index dde45373..43c8b214 100644 --- a/src/output/output_http_internal.cpp +++ b/src/output/output_http_internal.cpp @@ -503,7 +503,7 @@ namespace Mist{ json_resp["width"] = 640; json_resp["height"] = (hasVideo ? 480 : 20); } - json_resp["type"] = (M.getVod() ? "vod" : "live"); + json_resp["type"] = (M.getLive() ? "live" : "vod"); if (M.getLive()){ json_resp["unixoffset"] = M.getBootMsOffset() + (Util::unixMS() - Util::bootMS()); } diff --git a/src/output/output_mp4.cpp b/src/output/output_mp4.cpp index d5695c1e..22806a70 100644 --- a/src/output/output_mp4.cpp +++ b/src/output/output_mp4.cpp @@ -243,7 +243,7 @@ namespace Mist{ + 8 // MINF Box + 36 // DINF Box + 8; // STBL Box - if (M.getVod() && M.getFirstms(it->first) != firstms){ + if (!M.getLive() && M.getFirstms(it->first) != firstms){ tmpRes += 12; // EDTS entry extra } @@ -379,7 +379,7 @@ namespace Mist{ // Construct with duration of -1, as this is the default for fragmented MP4::MVHD mvhdBox(-1); // Then override it when we are not sending a VoD asset - if (M.getVod()){ + if (!M.getLive()){ // calculating longest duration uint64_t lastms = 0; for (std::map::const_iterator it = userSelect.begin(); @@ -413,7 +413,7 @@ namespace Mist{ MP4::ELST elstBox; elstBox.setVersion(0); elstBox.setFlags(0); - if (M.getVod() && M.getFirstms(it->first) != firstms){ + if (!M.getLive() && M.getFirstms(it->first) != firstms){ elstBox.setCount(2); elstBox.setSegmentDuration(0, M.getFirstms(it->first) - firstms); @@ -1161,7 +1161,7 @@ namespace Mist{ H.StartResponse("206", "Partial content", req, myConn); } }else{ - if (M.getVod()){H.SetHeader("Content-Length", byteEnd - byteStart + 1);} + if (!M.getLive()){H.SetHeader("Content-Length", byteEnd - byteStart + 1);} H.StartResponse("200", "OK", req, myConn); }