diff --git a/lib/defines.h b/lib/defines.h index 48e9c6af..10c4b9b1 100644 --- a/lib/defines.h +++ b/lib/defines.h @@ -170,6 +170,9 @@ static inline void show_stackframe(){} #define DEFAULT_FRAGMENT_DURATION 1900 +// Pages get marked for deletion after X seconds of no one watching +#define DEFAULT_PAGE_TIMEOUT 15 + /// \TODO These values are hardcoded and that is dangerous and probably a very bad idea. I don't even know if they are currently correct...?! I doubt they are. #define META_META_OFFSET 104 #define META_META_RECORDSIZE 576 diff --git a/lib/dtsc.cpp b/lib/dtsc.cpp index 0b3228c9..5941c573 100644 --- a/lib/dtsc.cpp +++ b/lib/dtsc.cpp @@ -1815,7 +1815,7 @@ namespace DTSC{ setInit(trackIdx, init.data(), init.size()); } - /// Sets the given track's init data. + /// Sets the given track's init data.setvod void Meta::setInit(size_t trackIdx, const char *init, size_t initLen){ DTSC::Track &t = tracks.at(trackIdx); char *_init = t.track.getPointer(t.trackInitField); @@ -2049,13 +2049,11 @@ namespace DTSC{ void Meta::setVod(bool vod){ stream.setInt(streamVodField, vod ? 1 : 0); - stream.setInt(streamLiveField, vod ? 0 : 1); } bool Meta::getVod() const{return stream.getInt(streamVodField);} void Meta::setLive(bool live){ stream.setInt(streamLiveField, live ? 1 : 0); - stream.setInt(streamVodField, live ? 0 : 1); } bool Meta::getLive() const{return stream.getInt(streamLiveField);} @@ -2085,10 +2083,11 @@ namespace DTSC{ } uint64_t Meta::getBufferWindow() const{return stream.getInt(streamBufferWindowField);} - void Meta::setBootMsOffset(uint64_t bootMsOffset){ + void Meta::setBootMsOffset(int64_t bootMsOffset){ + DONTEVEN_MSG("Setting streamBootMsOffsetField to '%ld'", bootMsOffset); stream.setInt(streamBootMsOffsetField, bootMsOffset); } - uint64_t Meta::getBootMsOffset() const{return stream.getInt(streamBootMsOffsetField);} + int64_t Meta::getBootMsOffset() const{return stream.getInt(streamBootMsOffsetField);} /*LTS-START*/ void Meta::setMinimumFragmentDuration(uint64_t fragmentDuration){ stream.setInt(streamMinimumFragmentDurationField, fragmentDuration); @@ -2326,12 +2325,7 @@ namespace DTSC{ (isKeyframe ? 19 : 0) + packDataSize + 11; } - if ((packBytePos > 0) != (stream.getInt(streamVodField) == 1)){ - INFO_MSG("Changing stream from %s to %s (bPos=%" PRIu64 ")", - stream.getInt(streamVodField) ? "VoD" : "live", (packBytePos >= 0) ? "Vod" : "live", packBytePos); - stream.setInt(streamVodField, packBytePos > 0 ? 1 : 0); - stream.setInt(streamLiveField, packBytePos > 0 ? 0 : 1); - } + if ((packBytePos > 0) && !getVod()){setVod(true);} size_t tNumber = packTrack; std::map::iterator it = tracks.find(tNumber); @@ -3148,6 +3142,7 @@ namespace DTSC{ if (pages.getInt(firsttime, i) > time){break;} res = i; } + DONTEVEN_MSG("Page number for time %" PRIu64 " on track %" PRIu32 " can be found on page %zu", time, idx, pages.getInt("firstkey", res)); return pages.getInt("firstkey", res); } @@ -3166,13 +3161,13 @@ namespace DTSC{ /// Returns the key number containing a given time. /// Or, closest key if given time is not available. - /// Or, zero if no keys are available at all. + /// Or, INVALID_KEY_NUM if no keys are available at all. /// If the time is in the gap before a key, returns that next key instead. size_t Meta::getKeyNumForTime(uint32_t idx, uint64_t time) const{ const Track &trk = tracks.at(idx); const Util::RelAccX &keys = trk.keys; const Util::RelAccX &parts = trk.parts; - if (!keys.getEndPos()){return 0;} + if (!keys.getEndPos()){return INVALID_KEY_NUM;} size_t res = keys.getStartPos(); for (size_t i = res; i < keys.getEndPos(); i++){ if (keys.getInt(trk.keyTimeField, i) > time){ @@ -3185,10 +3180,11 @@ namespace DTSC{ uint64_t dur = parts.getInt(trk.partDurationField, keys.getInt(trk.keyFirstPartField, i)-1); if (keys.getInt(trk.keyTimeField, i) - dur < time){res = i;} } - break; + continue; } res = i; } + DONTEVEN_MSG("Key number for time %" PRIu64 " on track %" PRIu32 " is %zu", time, idx, res); return res; } diff --git a/lib/dtsc.h b/lib/dtsc.h index 2f05fe3c..1ce21747 100644 --- a/lib/dtsc.h +++ b/lib/dtsc.h @@ -405,10 +405,10 @@ namespace DTSC{ uint64_t getFragmentDuration() const{return DEFAULT_FRAGMENT_DURATION;} LTS-END*/ - void setVod(bool vod = true); + void setVod(bool vod); bool getVod() const; - void setLive(bool live = true); + void setLive(bool live); bool getLive() const; bool hasBFrames(size_t idx = INVALID_TRACK_ID) const; @@ -416,8 +416,8 @@ namespace DTSC{ void setBufferWindow(uint64_t bufferWindow); uint64_t getBufferWindow() const; - void setBootMsOffset(uint64_t bootMsOffset); - uint64_t getBootMsOffset() const; + void setBootMsOffset(int64_t bootMsOffset); + int64_t getBootMsOffset() const; std::set getValidTracks(bool skipEmpty = false) const; std::set getMySourceTracks(size_t pid) const; diff --git a/src/input/input.cpp b/src/input/input.cpp index 886af766..5b52c38c 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -36,6 +36,7 @@ namespace Mist{ //But! What if our current key is 20+ seconds long? HAVE YOU THOUGHT OF THAT?! //Exactly! I thought not! So, if the end key number == the first, we increase by one. if (endKey == key){++endKey;} + DONTEVEN_MSG("User with ID:%zu is on key %zu->%zu (timestamp %" PRIu64 ")", id, key, endKey, time); for (size_t i = key; i <= endKey; i++){bufferFrame(track, i);} //Now, we can rest assured that the next ~120 seconds or so is pre-buffered in RAM. } @@ -987,12 +988,20 @@ namespace Mist{ for (size_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ uint64_t pageNum = tPages.getInt("firstkey", i); if (pageCounter[*it].count(pageNum)){ + // If the page is still being written to, reset the counter rather than potentially unloading it + if (isCurrentLivePage(*it, pageNum)){ + pageCounter[*it][pageNum] = DEFAULT_PAGE_TIMEOUT; + continue; + } --pageCounter[*it][pageNum]; if (!pageCounter[*it][pageNum]){ pageCounter[*it].erase(pageNum); bufferRemove(*it, pageNum); } } + else{ + pageCounter[*it][pageNum] = DEFAULT_PAGE_TIMEOUT; + } } } } @@ -1212,8 +1221,8 @@ namespace Mist{ } bool Input::bufferFrame(size_t idx, uint32_t keyNum){ - if (M.getLive()){return true;} - HIGH_MSG("Buffering track %zu, key %" PRIu32, idx, keyNum); + if (!M.getVod()){return true;} + DONTEVEN_MSG("Buffering track %zu, key %" PRIu32, idx, keyNum); bool isVideo = M.getType(idx) == "video"; size_t sourceIdx = M.getSourceTrack(idx); if (sourceIdx == INVALID_TRACK_ID){sourceIdx = idx;} @@ -1241,9 +1250,9 @@ namespace Mist{ } uint32_t pageNumber = tPages.getInt("firstkey", pageIdx); if (isBuffered(idx, pageNumber)){ - // get corresponding page number - pageCounter[idx][pageNumber] = 15; - VERYHIGH_MSG("Track %zu, key %" PRIu32 "is already buffered in page %" PRIu32 + // Mark the page for removal after 15 seconds of no one watching it + pageCounter[idx][pageNumber] = DEFAULT_PAGE_TIMEOUT; + DONTEVEN_MSG("Track %zu, key %" PRIu32 " is already buffered in page %" PRIu32 ". Cancelling bufferFrame", idx, keyNum, pageNumber); return true; @@ -1251,7 +1260,8 @@ namespace Mist{ // Update keynum to point to the corresponding page uint64_t bufferTimer = Util::bootMS(); keyNum = pageNumber; - if (!bufferStart(idx, pageNumber)){ + IPC::sharedPage page; + if (!bufferStart(idx, pageNumber, page)){ WARN_MSG("bufferStart failed! Cancelling bufferFrame"); return false; } @@ -1291,7 +1301,7 @@ namespace Mist{ size_t dataLen; srtPack.getString("data", data, dataLen); bufferNext(srtPack.getTime(), 0, idx, data, dataLen, srtPack.getInt("bpos"), - srtPack.getFlag("keyframe")); + srtPack.getFlag("keyframe"), page); ++packCounter; byteCounter += srtPack.getDataLen(); lastBuffered = srtPack.getTime(); @@ -1348,8 +1358,9 @@ namespace Mist{ INFO_MSG("Part size mismatch: %zu != %zu", dataLen, parts.getSize(partNo)); } ++partNo; + HIGH_MSG("Buffering VoD packet (%zuB) @%" PRIu64 " ms on track %zu with offset %" PRIu64, dataLen, thisPacket.getTime(), idx, thisPacket.getInt("offset")); bufferNext(thisPacket.getTime(), thisPacket.getInt("offset"), idx, data, dataLen, - thisPacket.getInt("bpos"), thisPacket.getFlag("keyframe")); + thisPacket.getInt("bpos"), thisPacket.getFlag("keyframe"), page); ++packCounter; byteCounter += thisPacket.getDataLen(); lastBuffered = thisPacket.getTime(); @@ -1369,13 +1380,13 @@ namespace Mist{ } } } - bufferFinalize(idx); + bufferFinalize(idx, page); bufferTimer = Util::bootMS() - bufferTimer; INFO_MSG("Track %zu, page %" PRIu32 " (%" PRIu64 " - %" PRIu64 " ms) buffered in %" PRIu64 "ms", idx, pageNumber, tPages.getInt("firsttime", pageIdx), thisPacket.getTime(), bufferTimer); INFO_MSG(" (%" PRIu32 "/%" PRIu64 " parts, %" PRIu64 " bytes)", packCounter, tPages.getInt("parts", pageIdx), byteCounter); - pageCounter[idx][keyNum] = 15; + pageCounter[idx][pageNumber] = DEFAULT_PAGE_TIMEOUT; return true; } diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index c3cf51e2..1a22f619 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -317,7 +317,6 @@ namespace Mist{ break; } - curPageNum.erase(tid); INFO_MSG("Should remove track %zu", tid); meta.reloadReplacedPagesIfNeeded(); meta.removeTrack(tid); diff --git a/src/io.cpp b/src/io.cpp index 7bd5c4d7..d3d94827 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -37,7 +37,7 @@ namespace Mist{ /// Buffering itself is done by bufferNext(). ///\param tid The trackid of the page to start buffering ///\param pageNumber The number of the page to start buffering - bool InOutBase::bufferStart(size_t idx, uint32_t pageNumber){ + bool InOutBase::bufferStart(size_t idx, uint32_t pageNumber, IPC::sharedPage & page){ VERYHIGH_MSG("bufferStart for stream %s, track %zu, page %" PRIu32, streamName.c_str(), idx, pageNumber); // Initialize the stream metadata if it does not yet exist #ifndef TSLIVE_INPUT @@ -51,10 +51,9 @@ namespace Mist{ // If we are currently buffering a page, abandon it completely and print a message about this // This page will NEVER be deleted, unless we open it again later. - if (curPage.count(idx)){ - WARN_MSG("Abandoning current page (%" PRIu32 ") for track %zu", curPageNum[idx], idx); - curPage.erase(idx); - curPageNum.erase(idx); + if (page){ + WARN_MSG("Abandoning current page (%s) for track %zu", page.name.c_str(), idx); + page.close(); } Util::RelAccX &tPages = meta.pages(idx); @@ -91,11 +90,15 @@ namespace Mist{ snprintf(pageId, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), idx, pageNumber); uint64_t pageSize = tPages.getInt("size", pageIdx); std::string pageName(pageId); - curPage[idx].init(pageName, pageSize, true); + page.init(pageName, pageSize, true); + + if (!page){ + ERROR_MSG("Could not open page %s", pageId); + return false; + } + // Make sure the data page is not destroyed when we are done buffering it later on. - curPage[idx].master = false; - // Store the pagenumber of the currently buffer page - curPageNum[idx] = pageNumber; + page.master = false; // Set the current offset to 0, to allow for using it in bufferNext() tPages.setInt("avail", 0, pageIdx); @@ -104,6 +107,21 @@ namespace Mist{ return true; } + /// Checks whether a given page is currently being written to + /// \return True if the page is the current live page, and thus not safe to remove + bool InOutBase::isCurrentLivePage(size_t idx, uint32_t pageNumber){ + // Base case: for nonlive situations no new data will be added + if (!M.getLive()){ + return false; + } + // All pages at or after the current live page should not get removed + if (curPageNum[idx] && curPageNum[idx] <= pageNumber){ + return true; + } + // If there is no set curPageNum we are definitely not writing to it + return false; + } + /// Removes a fully buffered page /// /// Does not do anything if the process is not standalone, in this case the master process will have an overloaded version of this function. @@ -166,9 +184,10 @@ namespace Mist{ for (uint64_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ uint64_t pageNum = tPages.getInt("firstkey", i); - if (pageNum > keyNum) break; + if (pageNum > keyNum) continue; uint64_t keyCount = tPages.getInt("keycount", i); if (pageNum + keyCount - 1 < keyNum) continue; + if (keyCount && pageNum + keyCount - 1 < keyNum) continue; uint64_t avail = tPages.getInt("avail", i); return avail ? pageNum : INVALID_KEY_NUM; } @@ -178,7 +197,7 @@ namespace Mist{ /// Buffers the next packet on the currently opened page ///\param pack The packet to buffer void InOutBase::bufferNext(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData, - size_t packDataSize, uint64_t packBytePos, bool isKeyframe){ + size_t packDataSize, uint64_t packBytePos, bool isKeyframe, IPC::sharedPage & page){ size_t packDataLen = 24 + (packOffset ? 17 : 0) + (packBytePos ? 15 : 0) + (isKeyframe ? 19 : 0) + packDataSize + 11; @@ -190,7 +209,7 @@ namespace Mist{ } // these checks were already done in bufferSinglePacket, but we check again just to be sure - if (meta.getLive() && packTime < meta.getLastms(packTrack)){ + if (!meta.getVod() && packTime < meta.getLastms(packTrack)){ DEBUG_MSG(((multiWrong == 0) ? DLVL_WARN : DLVL_HIGH), "Wrong order on track %" PRIu32 " ignored: %" PRIu64 " < %" PRIu64, packTrack, packTime, meta.getLastms(packTrack)); @@ -198,16 +217,15 @@ namespace Mist{ return; } // Do nothing if no page is opened for this track - if (!curPage.count(packTrack)){ + if (!page){ INFO_MSG("Trying to buffer a packet on track %" PRIu32 ", but no page is initialized", packTrack); return; } multiWrong = false; - IPC::sharedPage &myPage = curPage[packTrack]; Util::RelAccX &tPages = meta.pages(packTrack); uint32_t pageIdx = 0; - uint32_t currPagNum = curPageNum[packTrack]; + uint32_t currPagNum = atoi(page.name.data() + page.name.rfind('_') + 1); Util::RelAccXFieldData firstkey = tPages.getFieldData("firstkey"); for (uint64_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ if (tPages.getInt(firstkey, i) == currPagNum){ @@ -218,6 +236,7 @@ namespace Mist{ // Save the current write position uint64_t pageOffset = tPages.getInt("avail", pageIdx); uint64_t pageSize = tPages.getInt("size", pageIdx); + INSANE_MSG("Current packet %" PRIu64 " on track %" PRIu32 " has an offset on page %s of %" PRIu64, packTime, packTrack, page.name.c_str(), pageOffset); // Do nothing when there is not enough free space on the page to add the packet. if (pageSize - pageOffset < packDataLen){ FAIL_MSG("Track %" PRIu32 "p%" PRIu32 " : Pack %" PRIu64 "ms of %zub exceeds size %" PRIu64 " @ bpos %" PRIu64, @@ -228,7 +247,7 @@ namespace Mist{ // First generate only the payload on the correct destination // Leaves the 20 bytes inbetween empty to ensure the data is not accidentally read before it is // complete - char *data = myPage.mapped + pageOffset; + char *data = page.mapped + pageOffset; data[20] = 0xE0; // start container object unsigned int offset = 21; @@ -254,18 +273,15 @@ namespace Mist{ // Copy the remaining values in reverse order: // 8 byte timestamp - Bit::htobll(myPage.mapped + pageOffset + 12, packTime); + Bit::htobll(page.mapped + pageOffset + 12, packTime); // The mapped track id - Bit::htobl(myPage.mapped + pageOffset + 8, packTrack); + Bit::htobl(page.mapped + pageOffset + 8, packTrack); // Write the size - Bit::htobl(myPage.mapped + pageOffset + 4, packDataLen - 8); + Bit::htobl(page.mapped + pageOffset + 4, packDataLen - 8); // write the 'DTP2' bytes to conclude the packet and allow for reading it - memcpy(myPage.mapped + pageOffset, "DTP2", 4); - - if (M.getLive()){ - meta.update(packTime, packOffset, packTrack, packDataSize, packBytePos, isKeyframe); - } + memcpy(page.mapped + pageOffset, "DTP2", 4); + DONTEVEN_MSG("Setting page %" PRIu32 " available to %" PRIu64, pageIdx, pageOffset + packDataLen); tPages.setInt("avail", pageOffset + packDataLen, pageIdx); } @@ -273,10 +289,10 @@ namespace Mist{ /// /// Registers the data page on the track index page as well ///\param tid The trackid of the page to finalize - void InOutBase::bufferFinalize(size_t idx){ + void InOutBase::bufferFinalize(size_t idx, IPC::sharedPage & page){ // If no page is open, do nothing - if (!curPage.count(idx)){ - INFO_MSG("Trying to finalize the current page on track %zu, but no page is initialized", idx); + if (!page){ + WARN_MSG("Trying to finalize the current page on track %zu, but no page is initialized", idx); return; } @@ -298,8 +314,7 @@ namespace Mist{ // Close our link to the page. This will NOT destroy the shared page, as we've set master to // false upon construction Note: if there was a registering failure above, this WILL destroy the // shared page, to prevent a memory leak - curPage.erase(idx); - curPageNum.erase(idx); + page.close(); } /// Buffers a live packet to a page. @@ -325,7 +340,7 @@ namespace Mist{ void InOutBase::bufferLivePacket(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData, size_t packDataSize, uint64_t packBytePos, bool isKeyframe){ meta.reloadReplacedPagesIfNeeded(); - meta.setLive(); + meta.setLive(true); // Store the trackid for easier access // Do nothing if the trackid is invalid @@ -336,7 +351,7 @@ namespace Mist{ if (M.getType(packTrack) != "video"){ isKeyframe = false; - if (!tPages.getEndPos()){ + if (!tPages.getEndPos() || !livePage[packTrack]){ // Assume this is the first packet on the track isKeyframe = true; }else{ @@ -358,84 +373,97 @@ namespace Mist{ WARN_MSG("Sudden jump in timestamp from %" PRIu64 " to %" PRIu64, M.getLastms(packTrack), packTime); } } - + // Determine if we need to open the next page - uint32_t nextPageNum = INVALID_KEY_NUM; if (isKeyframe){ updateTrackFromKeyframe(packTrack, packData, packDataSize); uint64_t endPage = tPages.getEndPos(); - - // If there is no page, create it - if (!endPage){ - nextPageNum = 0; - tPages.setInt("firstkey", 0, 0); - tPages.setInt("firsttime", packTime, 0); - tPages.setInt("size", DEFAULT_DATA_PAGE_SIZE, 0); - tPages.setInt("keycount", 0, 0); - tPages.setInt("avail", 0, 0); - tPages.addRecords(1); - ++endPage; + size_t curPage = 0; + size_t currPagNum = atoi(livePage[packTrack].name.data() + livePage[packTrack].name.rfind('_') + 1); + Util::RelAccXFieldData firstkey = tPages.getFieldData("firstkey"); + for (uint64_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ + if (tPages.getInt(firstkey, i) == currPagNum){ + curPage = i; + break; + } } - uint64_t prevPageTime = tPages.getInt("firsttime", endPage - 1); - // Compare on 8 mb boundary and target duration - if (tPages.getInt("avail", endPage - 1) > FLIP_DATA_PAGE_SIZE || packTime - prevPageTime > FLIP_TARGET_DURATION){ + // If there is no page, create it + if (!livePage[packTrack]){ + size_t keyNum = M.getKeyNumForTime(packTrack, packTime); + if (keyNum == INVALID_KEY_NUM){ + curPageNum[packTrack] = 0; + }else{ + curPageNum[packTrack] = M.getKeyNumForTime(packTrack, packTime) + 1; + } - if ((endPage - tPages.getDeleted()) >= tPages.getRCount()){ + if ((tPages.getEndPos() - tPages.getDeleted()) >= tPages.getRCount()){ meta.resizeTrack(packTrack, M.fragments(packTrack).getRCount(), M.keys(packTrack).getRCount(), M.parts(packTrack).getRCount(), tPages.getRCount() * 2, "not enough pages"); } - // Create the book keeping data for the new page - nextPageNum = tPages.getInt("firstkey", endPage - 1) + tPages.getInt("keycount", endPage - 1); - HIGH_MSG("Live page transition from %" PRIu32 ":%" PRIu64 " to %" PRIu32 ":%" PRIu32, packTrack, - tPages.getInt("firstkey", endPage - 1), packTrack, nextPageNum); - tPages.setInt("firstkey", nextPageNum, endPage); + + tPages.addRecords(1); + tPages.setInt("firstkey", curPageNum[packTrack], endPage); tPages.setInt("firsttime", packTime, endPage); tPages.setInt("size", DEFAULT_DATA_PAGE_SIZE, endPage); tPages.setInt("keycount", 0, endPage); tPages.setInt("avail", 0, endPage); - tPages.addRecords(1); - ++endPage; - } - tPages.setInt("lastkeytime", packTime, endPage - 1); - tPages.setInt("keycount", tPages.getInt("keycount", endPage - 1) + 1, endPage - 1); - } - // Set the pageNumber if it has not been set yet - if (nextPageNum == INVALID_KEY_NUM){ - if (curPageNum.count(packTrack)){ - nextPageNum = curPageNum[packTrack]; + curPage = endPage; + DONTEVEN_MSG("Opening new page #%zu to track %" PRIu32, curPageNum[packTrack], packTrack); + if (!bufferStart(packTrack, curPageNum[packTrack], livePage[packTrack])){ + // if this fails, return instantly without actually buffering the packet + WARN_MSG("Dropping packet %s:%" PRIu32 "@%" PRIu64, streamName.c_str(), packTrack, packTime); + return; + } }else{ - nextPageNum = 0; + uint64_t prevPageTime = tPages.getInt("firsttime", curPage); + // Compare on 8 mb boundary and target duration + if (tPages.getInt("avail", curPage) > FLIP_DATA_PAGE_SIZE || packTime - prevPageTime > FLIP_TARGET_DURATION){ + // Create the book keeping data for the new page + curPageNum[packTrack] = tPages.getInt("firstkey", curPage) + tPages.getInt("keycount", curPage); + DONTEVEN_MSG("Live page transition from %" PRIu32 ":%zu to %" PRIu32 ":%zu", packTrack, + tPages.getInt("firstkey", curPage), packTrack, curPageNum[packTrack]); + + if ((tPages.getEndPos() - tPages.getDeleted()) >= tPages.getRCount()){ + meta.resizeTrack(packTrack, M.fragments(packTrack).getRCount(), M.keys(packTrack).getRCount(), M.parts(packTrack).getRCount(), tPages.getRCount() * 2, "not enough pages"); + } + + tPages.addRecords(1); + tPages.setInt("firstkey", curPageNum[packTrack], endPage); + tPages.setInt("firsttime", packTime, endPage); + tPages.setInt("size", DEFAULT_DATA_PAGE_SIZE, endPage); + tPages.setInt("keycount", 0, endPage); + tPages.setInt("avail", 0, endPage); + curPage = endPage; + if (livePage[packTrack]){bufferFinalize(packTrack, livePage[packTrack]);} + DONTEVEN_MSG("Opening new page #%zu to track %" PRIu32, curPageNum[packTrack], packTrack); + if (!bufferStart(packTrack, curPageNum[packTrack], livePage[packTrack])){ + // if this fails, return instantly without actually buffering the packet + WARN_MSG("Dropping packet %s:%" PRIu32 "@%" PRIu64, streamName.c_str(), packTrack, packTime); + return; + } + } } + DONTEVEN_MSG("Setting page %lu lastkeyTime to '%lu' and keycount to '%lu'", tPages.getInt("firstkey", curPage), packTime, tPages.getInt("keycount", curPage) + 1); + tPages.setInt("lastkeytime", packTime, curPage); + tPages.setInt("keycount", tPages.getInt("keycount", curPage) + 1, curPage); } - // If we have no pages by track, we have not received a starting keyframe yet. Drop this packet. - if (!tPages.getEndPos()){ - INFO_MSG("Track %" PRIu32 " not starting with a keyframe!", packTrack); + if (!livePage[packTrack]) { + INFO_MSG("Track %" PRIu32 " page %zu not starting with a keyframe!", packTrack, curPageNum[packTrack]); return; } - if (curPage.count(packTrack) && !curPage[packTrack].exists()){ - WARN_MSG("Data page was deleted - forcing source shutdown to prevent unstable state"); + if (!livePage[packTrack].exists()){ + WARN_MSG("Data page '%s' was deleted - forcing source shutdown to prevent unstable state", livePage[packTrack].name.c_str()); Util::logExitReason("data page was deleted, forcing shutdown to prevent unstable state"); - bufferFinalize(packTrack); + bufferFinalize(packTrack, livePage[packTrack]); kill(getpid(), SIGINT); return; } - - if (!curPageNum.count(packTrack) || nextPageNum != curPageNum[packTrack]){ - if (curPageNum.count(packTrack)){ - // Close the currently opened page when it exists - bufferFinalize(packTrack); - } - // Open the new page - if (!bufferStart(packTrack, nextPageNum)){ - // if this fails, return instantly without actually buffering the packet - WARN_MSG("Dropping packet for %s: (no page) %" PRIu32 "@%" PRIu64, streamName.c_str(), packTrack, packTime); - return; - } - } // Buffer the packet - bufferNext(packTime, packOffset, packTrack, packData, packDataSize, packBytePos, isKeyframe); + DONTEVEN_MSG("Buffering live packet (%zuB) @%" PRIu64 " ms on track %" PRIu32 " with offset %" PRIu64, packDataSize, packTime, packTrack, packOffset); + bufferNext(packTime, packOffset, packTrack, packData, packDataSize, packBytePos, isKeyframe, livePage[packTrack]); + meta.update(packTime, packOffset, packTrack, packDataSize, packBytePos, isKeyframe); } ///Handles updating track metadata from a new keyframe, if applicable diff --git a/src/io.h b/src/io.h index 0af9176f..e0652efd 100644 --- a/src/io.h +++ b/src/io.h @@ -19,13 +19,14 @@ namespace Mist{ size_t getMainSelectedTrack(); - bool bufferStart(size_t idx, uint32_t pageNumber); - void bufferFinalize(size_t idx); + bool bufferStart(size_t idx, uint32_t pageNumber, IPC::sharedPage & page); + void bufferFinalize(size_t idx, IPC::sharedPage & page); + bool isCurrentLivePage(size_t idx, uint32_t pageNumber); void bufferRemove(size_t idx, uint32_t pageNumber); void bufferLivePacket(const DTSC::Packet &packet); void bufferNext(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData, - size_t packDataSize, uint64_t packBytePos, bool isKeyframe); + size_t packDataSize, uint64_t packBytePos, bool isKeyframe, IPC::sharedPage & page); void bufferLivePacket(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData, size_t packDataSize, uint64_t packBytePos, bool isKeyframe); @@ -42,7 +43,8 @@ namespace Mist{ std::map userSelect; - std::map curPageNum; ///< For each track, holds the number page that is currently being written. - std::map curPage; ///< For each track, holds the page that is currently being written. + private: + std::map livePage; + std::map curPageNum; }; }// namespace Mist diff --git a/src/output/output.cpp b/src/output/output.cpp index dd8b0ef3..28894376 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -108,6 +108,8 @@ namespace Mist{ firstData = true; newUA = true; lastPushUpdate = 0; + previousFile = ""; + currentFile = ""; lastRecv = Util::bootSecs(); if (myConn){ @@ -418,9 +420,9 @@ namespace Mist{ statComm.reload(); stats(true); if (isPushing()){return;} - if (!isRecording() && !M.getVod() && !isReadyForPlay()){ + if (!isRecording() && M.getLive() && !isReadyForPlay()){ uint64_t waitUntil = Util::bootSecs() + 45; - while (!M.getVod() && !isReadyForPlay()){ + while (M.getLive() && !isReadyForPlay()){ if (Util::bootSecs() > waitUntil || (!userSelect.size() && Util::bootSecs() > waitUntil)){ INFO_MSG("Giving up waiting for playable tracks. IP: %s", getConnectedHost().c_str()); break; @@ -537,6 +539,7 @@ namespace Mist{ if (!keys.getValidCount()){return 0;} //Get the key for the current time size_t keyNum = M.getKeyNumForTime(trk, lastPacketTime); + if (keyNum == INVALID_KEY_NUM){return 0;} if (keys.getEndValid() <= keyNum+1){return 0;} //Return the next key return keys.getTime(keyNum+1); @@ -546,7 +549,7 @@ namespace Mist{ const Util::RelAccX &tPages = M.pages(trackId); for (uint64_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ uint64_t pageNum = tPages.getInt("firstkey", i); - if (pageNum > keyNum) break; + if (pageNum > keyNum) continue; uint64_t pageKeys = tPages.getInt("keycount", i); if (keyNum > pageNum + pageKeys - 1) continue; uint64_t pageAvail = tPages.getInt("avail", i); @@ -581,7 +584,7 @@ namespace Mist{ return; } size_t lastAvailKey = keys.getEndValid() - 1; - if (meta.getVod() && keyNum > lastAvailKey){ + if (!meta.getLive() && keyNum > lastAvailKey){ INFO_MSG("Load for track %zu key %zu aborted, is > %zu", trackId, keyNum, lastAvailKey); curPage.erase(trackId); currentPage.erase(trackId); @@ -729,6 +732,10 @@ namespace Mist{ if (M.getType(mainTrack) == "video"){ DTSC::Keys keys(M.keys(mainTrack)); uint32_t keyNum = M.getKeyNumForTime(mainTrack, pos); + if (keyNum == INVALID_KEY_NUM){ + FAIL_MSG("Attempted seek on empty track %zu", mainTrack); + return false; + } pos = keys.getTime(keyNum); } } @@ -774,13 +781,15 @@ namespace Mist{ } DTSC::Keys keys(M.keys(tid)); uint32_t keyNum = M.getKeyNumForTime(tid, pos); + if (keyNum == INVALID_KEY_NUM){ + FAIL_MSG("Attempted seek on empty track %zu", tid); + return false; + } uint64_t actualKeyTime = keys.getTime(keyNum); HIGH_MSG("Seeking to track %zu key %" PRIu32 " => time %" PRIu64, tid, keyNum, pos); if (actualKeyTime > pos){ - if (M.getLive()){ - pos = actualKeyTime; - userSelect[tid].setKeyNum(keyNum); - } + pos = actualKeyTime; + userSelect[tid].setKeyNum(keyNum); } loadPageForKey(tid, keyNum + (getNextKey ? 1 : 0)); if (!curPage.count(tid) || !curPage[tid].mapped){ @@ -818,7 +827,7 @@ namespace Mist{ VERYHIGH_MSG("Track %zu no data (key %" PRIu32 " @ %" PRIu64 ") - waiting...", tid, keyNum + (getNextKey ? 1 : 0), tmp.offset); uint32_t i = 0; - while (!meta.getLive() && curPage[tid].mapped[tmp.offset] == 0 && ++i <= 10){ + while (meta.getVod() && curPage[tid].mapped[tmp.offset] == 0 && ++i <= 10){ Util::wait(100 * i); stats(); } @@ -917,7 +926,7 @@ namespace Mist{ if (targetParams.count("recstart") && atoll(targetParams["recstart"].c_str()) != 0){ uint64_t startRec = atoll(targetParams["recstart"].c_str()); if (startRec > endTime()){ - if (M.getVod()){ + if (!M.getLive()){ onFail("Recording start past end of non-live source", true); return; } @@ -1028,7 +1037,7 @@ namespace Mist{ size_t mainTrack = getMainSelectedTrack(); int64_t startRec = atoll(targetParams["start"].c_str()); if (startRec > M.getLastms(mainTrack)){ - if (M.getVod()){ + if (!M.getLive()){ onFail("Playback start past end of non-live source", true); return; } @@ -1360,6 +1369,9 @@ namespace Mist{ if (newTarget.rfind('?') != std::string::npos){ newTarget.erase(newTarget.rfind('?')); } + // Keep track of filenames written, so that they can be added to the playlist file + previousFile = currentFile; + currentFile = newTarget; INFO_MSG("Switching to next push target filename: %s", newTarget.c_str()); if (!connectToFile(newTarget)){ FAIL_MSG("Failed to open file, aborting: %s", newTarget.c_str()); @@ -1486,11 +1498,16 @@ namespace Mist{ // now, seek to the exact timestamp of the keyframe DTSC::Keys keys(M.keys(mainTrack)); uint32_t targetKey = M.getKeyNumForTime(mainTrack, currTime); - seek(keys.getTime(targetKey)); - // attempt to load the key into thisPacket - bool ret = prepareNext(); - if (!ret){ - WARN_MSG("Failed to load keyframe for %" PRIu64 "ms - continuing without it", currTime); + bool ret = false; + if (targetKey == INVALID_KEY_NUM){ + FAIL_MSG("No keyframes available on track %zu", mainTrack); + }else{ + seek(keys.getTime(targetKey)); + // attempt to load the key into thisPacket + ret = prepareNext(); + if (!ret){ + WARN_MSG("Failed to load keyframe for %" PRIu64 "ms - continuing without it", currTime); + } } // restore state to before the seek/load @@ -1547,7 +1564,39 @@ namespace Mist{ return false; } - Util::sortedPageInfo nxt; + Util::sortedPageInfo nxt = *(buffer.begin()); + + if (meta.reloadReplacedPagesIfNeeded()){return false;} + if (!M.getValidTracks().count(nxt.tid)){ + dropTrack(nxt.tid, "disappeared from metadata"); + return false; + } + + // if we're going to read past the end of the data page, load the next page + // this only happens for VoD + if (nxt.offset >= curPage[nxt.tid].len || + (!memcmp(curPage[nxt.tid].mapped + nxt.offset, "\000\000\000\000", 4))){ + if (!M.getLive() && nxt.time >= M.getLastms(nxt.tid)){ + dropTrack(nxt.tid, "end of non-live track reached", false); + return false; + } + if (M.getPageNumberForTime(nxt.tid, nxt.time) != currentPage[nxt.tid]){ + loadPageForKey(nxt.tid, M.getPageNumberForTime(nxt.tid, nxt.time)); + nxt.offset = 0; + //Only read the next time if the page load succeeded and there is a packet to read from + if (curPage[nxt.tid].mapped && curPage[nxt.tid].mapped[0] == 'D'){ + nxt.time = getDTSCTime(curPage[nxt.tid].mapped, 0); + } + buffer.replaceFirst(nxt); + return false; + } + dropTrack(nxt.tid, "VoD page load failure"); + return false; + } + + // We know this packet will be valid, pre-load it so we know its length + DTSC::Packet preLoad(curPage[nxt.tid].mapped + nxt.offset, 0, true); + uint64_t nextTime = 0; //In case we're not in sync mode, we might have to retry a few times for (size_t trackTries = 0; trackTries < buffer.size(); ++trackTries){ @@ -1582,7 +1631,6 @@ namespace Mist{ dropTrack(nxt.tid, "VoD page load failure"); return false; } - // We know this packet will be valid, pre-load it so we know its length DTSC::Packet preLoad(curPage[nxt.tid].mapped + nxt.offset, 0, true); @@ -1605,10 +1653,10 @@ namespace Mist{ }else{ //no next packet yet! //Check if this is the last packet of a VoD stream. Return success and drop the track. - if (M.getVod() && nxt.time >= M.getLastms(nxt.tid)){ + if (!M.getLive() && nxt.time >= M.getLastms(nxt.tid)){ thisPacket.reInit(curPage[nxt.tid].mapped + nxt.offset, 0, true); thisIdx = nxt.tid; - dropTrack(nxt.tid, "end of VoD track reached", false); + dropTrack(nxt.tid, "end of non-live track reached", false); return true; } uint32_t thisKey = M.getKeyNumForTime(nxt.tid, nxt.time); diff --git a/src/output/output.h b/src/output/output.h index 93e3a38a..335c0b14 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -31,6 +31,8 @@ namespace Mist{ /*LTS-START*/ std::string reqUrl; /*LTS-END*/ + std::string previousFile; + std::string currentFile; // non-virtual generic functions virtual int run(); virtual void stats(bool force = false); @@ -155,6 +157,8 @@ namespace Mist{ size_t thisIdx; uint64_t thisTime; + + std::map curPage; ///< For each track, holds the page that is currently being written. }; }// namespace Mist diff --git a/src/output/output_ebml.cpp b/src/output/output_ebml.cpp index 59aec749..2c71eb2c 100644 --- a/src/output/output_ebml.cpp +++ b/src/output/output_ebml.cpp @@ -17,7 +17,7 @@ namespace Mist{ if (config->getString("target").size()){ if (config->getString("target").find(".webm") != std::string::npos){doctype = "webm";} initialize(); - if (M.getVod()){calcVodSizes();} + if (!M.getLive()){calcVodSizes();} if (!streamName.size()){ WARN_MSG("Recording unconnected EBML output to file! Cancelled."); conn.close(); @@ -139,7 +139,7 @@ namespace Mist{ if (thisPacket.getTime() >= newClusterTime){ if (liveSeek()){return;} currentClusterTime = thisPacket.getTime(); - if (M.getVod()){ + if (!M.getLive()){ // In case of VoD, clusters are aligned with the main track fragments // EXCEPT when they are more than 30 seconds long, because clusters are limited to -32 to 32 // seconds. @@ -318,7 +318,7 @@ namespace Mist{ void OutEBML::sendHeader(){ double duration = 0; size_t idx = getMainSelectedTrack(); - if (M.getVod()){ + if (!M.getLive()){ duration = M.getLastms(idx) - M.getFirstms(idx); }else{ needsLookAhead = 420; @@ -326,7 +326,7 @@ namespace Mist{ // EBML header and Segment EBML::sendElemEBML(myConn, doctype); EBML::sendElemHead(myConn, EBML::EID_SEGMENT, segmentSize); // Default = Unknown size - if (M.getVod()){ + if (!M.getLive()){ // SeekHead EBML::sendElemHead(myConn, EBML::EID_SEEKHEAD, seekSize); EBML::sendElemSeek(myConn, EBML::EID_INFO, seekheadSize); @@ -344,7 +344,7 @@ namespace Mist{ for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ sendElemTrackEntry(it->first); } - if (M.getVod()){ + if (!M.getLive()){ EBML::sendElemHead(myConn, EBML::EID_CUES, cuesSize); uint64_t tmpsegSize = infoSize + tracksSize + seekheadSize + cuesSize + EBML::sizeElemHead(EBML::EID_CUES, cuesSize); @@ -407,7 +407,7 @@ namespace Mist{ // Calculate the sizes of various parts, if we're VoD. size_t totalSize = 0; - if (M.getVod()){ + if (!M.getLive()){ calcVodSizes(); // We now know the full size of the segment, thus can calculate the total size totalSize = EBML::sizeElemEBML(doctype) + EBML::sizeElemHead(EBML::EID_SEGMENT, segmentSize) + segmentSize; diff --git a/src/output/output_hls.cpp b/src/output/output_hls.cpp index f019094d..8aea74c8 100644 --- a/src/output/output_hls.cpp +++ b/src/output/output_hls.cpp @@ -125,7 +125,7 @@ namespace Mist{ } size_t skippedLines = 0; if (M.getLive() && lines.size()){ - // only print the last segment when VoD + // only print the last segment when non-live lines.pop_back(); totalDuration -= durations.back(); durations.pop_back(); diff --git a/src/output/output_http_internal.cpp b/src/output/output_http_internal.cpp index dde45373..43c8b214 100644 --- a/src/output/output_http_internal.cpp +++ b/src/output/output_http_internal.cpp @@ -503,7 +503,7 @@ namespace Mist{ json_resp["width"] = 640; json_resp["height"] = (hasVideo ? 480 : 20); } - json_resp["type"] = (M.getVod() ? "vod" : "live"); + json_resp["type"] = (M.getLive() ? "live" : "vod"); if (M.getLive()){ json_resp["unixoffset"] = M.getBootMsOffset() + (Util::unixMS() - Util::bootMS()); } diff --git a/src/output/output_mp4.cpp b/src/output/output_mp4.cpp index d5695c1e..22806a70 100644 --- a/src/output/output_mp4.cpp +++ b/src/output/output_mp4.cpp @@ -243,7 +243,7 @@ namespace Mist{ + 8 // MINF Box + 36 // DINF Box + 8; // STBL Box - if (M.getVod() && M.getFirstms(it->first) != firstms){ + if (!M.getLive() && M.getFirstms(it->first) != firstms){ tmpRes += 12; // EDTS entry extra } @@ -379,7 +379,7 @@ namespace Mist{ // Construct with duration of -1, as this is the default for fragmented MP4::MVHD mvhdBox(-1); // Then override it when we are not sending a VoD asset - if (M.getVod()){ + if (!M.getLive()){ // calculating longest duration uint64_t lastms = 0; for (std::map::const_iterator it = userSelect.begin(); @@ -413,7 +413,7 @@ namespace Mist{ MP4::ELST elstBox; elstBox.setVersion(0); elstBox.setFlags(0); - if (M.getVod() && M.getFirstms(it->first) != firstms){ + if (!M.getLive() && M.getFirstms(it->first) != firstms){ elstBox.setCount(2); elstBox.setSegmentDuration(0, M.getFirstms(it->first) - firstms); @@ -1161,7 +1161,7 @@ namespace Mist{ H.StartResponse("206", "Partial content", req, myConn); } }else{ - if (M.getVod()){H.SetHeader("Content-Length", byteEnd - byteStart + 1);} + if (!M.getLive()){H.SetHeader("Content-Length", byteEnd - byteStart + 1);} H.StartResponse("200", "OK", req, myConn); }