diff --git a/lib/dtsc.cpp b/lib/dtsc.cpp index 7c62d4b0..a9ff6b2b 100644 --- a/lib/dtsc.cpp +++ b/lib/dtsc.cpp @@ -2272,15 +2272,46 @@ namespace DTSC{ } } Track &t = tracks[trackIdx]; - DONTEVEN_MSG("Deleting parts: %" PRIu64 "->%" PRIu64 " del'd, %zu pres", t.parts.getDeleted(), t.parts.getDeleted()+t.keys.getInt(t.keyPartsField, t.keys.getDeleted()), t.parts.getPresent()); - t.parts.deleteRecords(t.keys.getInt(t.keyPartsField, t.keys.getDeleted())); - DONTEVEN_MSG("Deleting key: %" PRIu64 "->%" PRIu64 " del'd, %zu pres", t.keys.getDeleted(), t.keys.getDeleted()+1, t.keys.getPresent()); + uint64_t deletedPartCount = t.keys.getInt(t.keyPartsField, t.keys.getDeleted()); + DONTEVEN_MSG("Deleting parts: %" PRIu64 "->%" PRIu64 " del'd, %zu pres", t.parts.getDeleted(), t.parts.getDeleted()+deletedPartCount, t.parts.getPresent()); + t.parts.deleteRecords(deletedPartCount); + uint64_t deletedKeyNum = t.keys.getDeleted(); + DONTEVEN_MSG("Deleting key: %" PRIu64 "->%" PRIu64 " del'd, %zu pres", deletedKeyNum, deletedKeyNum+1, t.keys.getPresent()); t.keys.deleteRecords(1); if (t.fragments.getInt(t.fragmentFirstKeyField, t.fragments.getDeleted()) < t.keys.getDeleted()){ t.fragments.deleteRecords(1); setMissedFragments(trackIdx, getMissedFragments(trackIdx) + 1); } - if (t.pages.getPresent() > 1 && t.pages.getInt("firstkey", t.pages.getDeleted() + 1) < t.keys.getDeleted()){ + // Check if any page contains the just-deleted key + for (uint64_t i = t.pages.getDeleted(); i < t.pages.getEndPos(); i++){ + + uint64_t thisKey = t.pages.getInt("firstkey", i); + uint64_t avtmp = t.pages.getInt("avail", i); + uint64_t keycount = t.pages.getInt("keycount", i); + DONTEVEN_MSG("Found page idx=%lu number=%lu avail=%lu, keycount=%lu", i, thisKey, avtmp, keycount); + + uint64_t pageNum = t.pages.getInt("firstkey", i); + if (pageNum > deletedKeyNum) continue; + uint64_t keyCount = t.pages.getInt("keycount", i); + if (keyCount){ + if (pageNum + keyCount - 1 < deletedKeyNum) continue; + }else if (pageNum < deletedKeyNum) continue; + + uint64_t avail = t.pages.getInt("avail", i); + if (avail){ + break; + } + // 'Resize' the page to whatever keys are still available + if (t.pages.getInt("keycount", i) > 1){ + DONTEVEN_MSG("Key count %lu -> %lu", t.pages.getInt("keycount", i), t.pages.getInt("keycount", i) - 1); + t.pages.setInt("keycount", t.pages.getInt("keycount", i) - 1, i); + DONTEVEN_MSG("Part count %lu -> %lu", t.pages.getInt("parts", i), t.pages.getInt("parts", i) - deletedPartCount); + t.pages.setInt("parts", t.pages.getInt("parts", i) - deletedPartCount, i); + DONTEVEN_MSG("First key %lu -> %lu", t.pages.getInt("firstkey", i), t.pages.getInt("firstkey", i) + 1); + t.pages.setInt("firstkey", t.pages.getInt("firstkey", i) + 1, i); + break; + } + // Unload the page if there are no more keys left on it // Initialize the correct page, make it master so it gets cleaned up when leaving scope. char thisPageName[NAME_BUFFER_SIZE]; snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), trackIdx, @@ -2290,6 +2321,7 @@ namespace DTSC{ // Then delete the page entry t.pages.deleteRecords(1); + break; } setFirstms(trackIdx, t.keys.getInt(t.keyTimeField, t.keys.getDeleted())); if (resizeLock){resizeLock.unlink();} diff --git a/src/input/input.cpp b/src/input/input.cpp index 2fac23e6..89e4d3e4 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -48,10 +48,11 @@ namespace Mist{ const Util::RelAccX &tPages = M.pages(track); if (!tPages.getEndPos()){return;} DTSC::Keys keys(M.keys(track)); - if (i > keys.getValidCount()){return;} + if (i > keys.getEndValid()){return;} uint64_t pageIdx = 0; for (uint64_t j = tPages.getDeleted(); j < tPages.getEndPos(); j++){ - if (tPages.getInt("firstkey", j) > i) break; + uint64_t thisKey = tPages.getInt("firstkey", j); + if (thisKey > i) break; pageIdx = j; } uint32_t pageNumber = tPages.getInt("firstkey", pageIdx); @@ -210,7 +211,19 @@ namespace Mist{ capa["optional"]["realtime"]["name"] = "Simulated Live"; capa["optional"]["realtime"]["help"] = "Make this input run as a simulated live stream"; capa["optional"]["realtime"]["option"] = "--realtime"; + option.null(); + option["short"] = "P"; + option["long"] = "pagetimeout"; + option["arg"] = "integer"; + option["value"].append(DEFAULT_PAGE_TIMEOUT); + option["help"] = "For bufferless or live inputs like HLS, set the timeout in seconds for old, inactive pages to be deleted. A longer value results in more memory usage, but ensures that recently buffered data stays in memory for longer"; + config->addOption("pagetimeout", option); + capa["optional"]["pagetimeout"]["name"] = "Memory page timeout"; + capa["optional"]["pagetimeout"]["help"] = "For bufferless or live inputs like HLS, set the timeout in seconds for old, inactive pages to be deleted. A longer value results in more memory usage, but ensures that recently buffered data stays in memory for longer"; + capa["optional"]["pagetimeout"]["option"] = "--pagetimeout"; + capa["optional"]["pagetimeout"]["type"] = "uint"; + capa["optional"]["pagetimeout"]["default"] = DEFAULT_PAGE_TIMEOUT; /*LTS-END*/ capa["optional"]["debug"]["name"] = "debug"; @@ -1226,16 +1239,21 @@ namespace Mist{ } void Input::removeUnused(){ - uint64_t timeout = config->getInteger("pagetimeout") * 1000; + uint64_t timeout = config->getInteger("pagetimeout"); + uint64_t bufferTime = timeout * 1000; + if (config->hasOption("bufferTime")){ + bufferTime = config->getInteger("bufferTime"); + } uint64_t cTime = Util::bootSecs(); for (std::map >::iterator it = pageCounter.begin(); it != pageCounter.end(); it++){ std::set deletedEntries; for (std::map::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++){ - if (isRecentLivePage(it->first, it2->first, timeout)){continue;} - if (cTime > it2->second + DEFAULT_PAGE_TIMEOUT){ + if (isRecentLivePage(it->first, it2->first, bufferTime)){continue;} + if (cTime > it2->second + timeout){ deletedEntries.insert(it2->first); bufferRemove(it->first, it2->first); + HIGH_MSG("Unloading page %u track %lu", it2->first, it->first); } } while (deletedEntries.size()){ @@ -1470,20 +1488,27 @@ namespace Mist{ const Util::RelAccX &tPages = M.pages(idx); DTSC::Keys keys(M.keys(idx)); - uint32_t keyCount = keys.getValidCount(); + uint64_t firstKey = keys.getFirstValid(); + if (keyNum < firstKey){ + HIGH_MSG("Key %" PRIu32 " on track %zu no longer seekable (earliest requestable key is %" PRIu64 + "). Cancelling buffering.", + keyNum, idx, firstKey); + return true; + } + uint64_t lastKey = keys.getEndValid(); + if (keyNum > lastKey){ + // End of movie here, returning true to avoid various error messages + if (keyNum > lastKey + 1){ + WARN_MSG("Key %" PRIu32 " on track %zu is higher than total (latest key is %" PRIu64 + "). Cancelling buffering.", + keyNum, idx, lastKey); + } + return true; + } if (!tPages.getEndPos()){ WARN_MSG("No pages for track %zu found! Cancelling bufferFrame", idx); return false; } - if (keyNum > keyCount){ - // End of movie here, returning true to avoid various error messages - if (keyNum > keyCount + 1){ - WARN_MSG("Key %" PRIu32 " on track %zu is higher than total (%" PRIu32 - "). Cancelling buffering.", - keyNum, idx, keyCount); - } - return true; - } uint64_t pageIdx = 0; for (uint64_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ if (tPages.getInt("firstkey", i) > keyNum) break; @@ -1553,8 +1578,7 @@ namespace Mist{ } }else{ size_t prevPos = 0; - size_t partNo = 0; - for (size_t i = 0; i < keyNum; ++i){partNo += keys.getParts(i);} + size_t partNo = keys.getFirstPart(keyNum); DTSC::Parts parts(M.parts(idx)); while (thisPacket && thisTime < stopTime){ if (connectedUsers || isAlwaysOn()){activityCounter = Util::bootSecs();} diff --git a/src/input/input_hls.cpp b/src/input/input_hls.cpp index 014092e1..a840e53e 100644 --- a/src/input/input_hls.cpp +++ b/src/input/input_hls.cpp @@ -132,9 +132,10 @@ namespace Mist{ static unsigned int plsTotalCount = 0; /// Total playlists active static unsigned int plsInitCount = 0; /// Count of playlists fully inited - bool streamIsLive; - uint32_t globalWaitTime; - std::map > listEntries; + bool streamIsLive; //< Playlist can be sliding window or get new segments appended + bool streamIsVOD; //< Playlist segments do not disappear + uint32_t globalWaitTime; //< Time between playlist reloads, based on TARGETDURATION + std::map > listEntries; //< Segments currently in the playlist // These are used in the HTTP::Downloader callback, to prevent timeouts when downloading // segments/playlists. @@ -201,9 +202,8 @@ namespace Mist{ uriSrc = uriSource; } if (uriSrc.size()){INFO_MSG("Adding variant playlist: %s -> %s", relurl.c_str(), uriSrc.c_str());} - lastFileIndex = 0; + lastSegment = 0; waitTime = 2; - playlistEnd = false; noChangeCount = 0; lastTimestamp = 0; root = HTTP::URL(uriSrc); @@ -215,6 +215,7 @@ namespace Mist{ memset(keyAES, 0, 16); startTime = Util::bootSecs(); reloadNext = 0; + firstIndex = 0; } /// Returns true if there is no protocol defined in the playlist root URL. @@ -479,7 +480,7 @@ namespace Mist{ /// Handles both initial load and future reloads. /// Returns how many segments were added to the internal segment list. bool Playlist::reload(){ - uint64_t fileNo = 0; + uint64_t bposCounter = 1; nextUTC = 0; // Make sure we don't use old timestamps std::string line; std::string key; @@ -490,9 +491,6 @@ namespace Mist{ std::string keyIV; int count = 0; - uint64_t totalBytes = 0; - - playlistType = LIVE; // Temporary value std::istringstream urlSource; std::ifstream fileSource; @@ -568,25 +566,32 @@ namespace Mist{ if (waitTime < 2){waitTime = 2;} } + // Assuming this always comes before any segment if (key == "MEDIA-SEQUENCE"){ - fileNo = atoll(val.c_str()); + // Reinit the segment counter + firstIndex = atoll(val.c_str()); + bposCounter = firstIndex + 1; } + if (key == "PROGRAM-DATE-TIME"){nextUTC = ISO8601toUnixmillis(val);} if (key == "PLAYLIST-TYPE"){ if (val == "VOD"){ - playlistType = VOD; + streamIsVOD = true; + streamIsLive = false; }else if (val == "LIVE"){ - playlistType = LIVE; + streamIsVOD = false; + streamIsLive = true; }else if (val == "EVENT"){ - playlistType = EVENT; + streamIsVOD = true; + streamIsLive = true; } } + // Once we see this tag, the entire playlist becomes VOD if (key == "ENDLIST"){ - // end of playlist reached! - playlistEnd = true; - playlistType = VOD; + streamIsVOD = true; + streamIsLive = false; } continue; } @@ -600,30 +605,28 @@ namespace Mist{ std::getline(input, filename); // check for already added segments - DONTEVEN_MSG("Current file has index #%" PRIu64 ", last index was #%" PRIu64 "", fileNo, lastFileIndex); - if (fileNo >= lastFileIndex){ + DONTEVEN_MSG("Current segment #%" PRIu64 ", last segment was #%" PRIu64 "", bposCounter, lastSegment); + if (bposCounter > lastSegment){ cleanLine(filename); char ivec[16]; if (keyIV.size()){ parseKey(keyIV, ivec, 16); }else{ memset(ivec, 0, 16); - Bit::htobll(ivec + 8, fileNo); + Bit::htobll(ivec + 8, bposCounter); } - addEntry(root.link(filename).getUrl(), filename, f, totalBytes, keys[keyUri], std::string(ivec, 16)); - lastFileIndex = fileNo + 1; + addEntry(root.link(filename).getUrl(), filename, f, bposCounter, keys[keyUri], std::string(ivec, 16)); + lastSegment = bposCounter; ++count; } nextUTC = 0; - ++fileNo; + ++bposCounter; } // VOD over HTTP needs to be processed as LIVE. if (!isUrl()){ fileSource.close(); } - // Set the global live/vod bool to live if this playlist looks like a live playlist - if (playlistType == LIVE){streamIsLive = true;} if (globalWaitTime < waitTime){globalWaitTime = waitTime;} @@ -648,7 +651,7 @@ namespace Mist{ } /// Adds playlist segments to be processed - void Playlist::addEntry(const std::string &absolute_filename, const std::string &filename, float duration, uint64_t &totalBytes, + void Playlist::addEntry(const std::string &absolute_filename, const std::string &filename, float duration, uint64_t &bpos, const std::string &key, const std::string &iv){ // if (!isSupportedFile(filename)){ // WARN_MSG("Ignoring unsupported file: %s", filename.c_str()); @@ -659,7 +662,7 @@ namespace Mist{ entry.filename = absolute_filename; entry.relative_filename = filename; cleanLine(entry.filename); - entry.bytePos = totalBytes; + entry.bytePos = bpos; entry.duration = duration; if (entry.duration * 1000 > DTSC::veryUglyJitterOverride){ DTSC::veryUglyJitterOverride = entry.duration * 1000; @@ -679,7 +682,6 @@ namespace Mist{ std::string test = root.link(entry.filename).getFilePath(); fileSource.open(test.c_str(), std::ios::ate | std::ios::binary); if (!fileSource.good()){WARN_MSG("file: %s, error: %s", test.c_str(), strerror(errno));} - totalBytes += fileSource.tellg(); } entry.timestamp = lastTimestamp + startTime; @@ -700,14 +702,15 @@ namespace Mist{ inputHLS::inputHLS(Util::Config *cfg) : Input(cfg){ zUTC = nUTC = 0; self = this; - streamIsLive = false; + streamIsLive = true; //< default to sliding window playlist + streamIsVOD = false; //< default to sliding window playlist globalWaitTime = 0; currentPlaylist = 0; streamOffset = 0; + isInitialRun = false; pidCounter = 1; - isLiveDVR = false; previousSegmentIndex = -1; currentIndex = 0; @@ -738,6 +741,21 @@ namespace Mist{ capa["codecs"]["audio"].append("AC3"); capa["codecs"]["audio"].append("MP3"); + JSON::Value option; + option["arg"] = "integer"; + option["long"] = "buffer"; + option["short"] = "b"; + option["help"] = "Live buffer window in ms. Segments within this range from the live point will be kept in memory"; + option["value"].append(50000); + config->addOption("bufferTime", option); + capa["optional"]["bufferTime"]["name"] = "Buffer time (ms)"; + capa["optional"]["bufferTime"]["help"] = + "Live buffer window in ms. Segments within this range from the live point will be kept in memory"; + capa["optional"]["bufferTime"]["option"] = "--buffer"; + capa["optional"]["bufferTime"]["type"] = "uint"; + capa["optional"]["bufferTime"]["default"] = 50000; + option.null(); + inFile = NULL; } @@ -756,99 +774,18 @@ namespace Mist{ return false; } - // If the playlist is of event type, init the amount of segments in the playlist - if (isLiveDVR){ - - // Set the previousSegmentIndex by quickly going through the existing PLS files - for (std::map >::iterator pListIt = listEntries.begin(); - pListIt != listEntries.end(); - pListIt++){ - parsedSegments[pListIt->first] = 0; - INFO_MSG("Playlist %" PRIu32 " contains %zu segments", pListIt->first, pListIt->second.size()); - } - + // Segments can be added (and removed if VOD is false) + if (streamIsLive){ meta.setLive(true); + } + // Segments can not be removed + if (streamIsVOD){ meta.setVod(true); - streamIsLive = true; } return true; } - void inputHLS::parseStreamHeader(){ - if (!readExistingHeader()){ - if (!initPlaylist(config->getString("input"))){ - Util::logExitReason(ER_UNKNOWN, "Failed to load HLS playlist, aborting"); - return; - } - uint64_t oldBootMsOffset = M.getBootMsOffset(); - meta.reInit(isSingular() ? streamName : "", false); - meta.setUTCOffset(zUTC); - meta.setBootMsOffset(oldBootMsOffset); - INFO_MSG("Parsing live stream to create header..."); - TS::Packet packet; // to analyse and extract data - int pidCounter = 1; - - tthread::lock_guard guard(entryMutex); - for (std::map >::iterator pListIt = listEntries.begin(); - pListIt != listEntries.end(); pListIt++){ - // Skip empty playlists - if (!pListIt->second.size()){continue;} - int prepidCounter = pidCounter; - tsStream.clear(); - - for (std::deque::iterator entryIt = pListIt->second.begin(); - entryIt != pListIt->second.end(); ++entryIt){ - keepAlive(); - if (!segDowner.loadSegment(*entryIt)){ - WARN_MSG("Skipping segment that could not be loaded in an attempt to recover"); - tsStream.clear(); - continue; - } - - do{ - if (!segDowner.readNext() || !packet.FromPointer(segDowner.packetPtr)){ - WARN_MSG("Could not load TS packet from %s, aborting segment parse", entryIt->filename.c_str()); - tsStream.clear(); - break; // Abort load - } - tsStream.parse(packet, entryIt->bytePos); - - if (tsStream.hasPacketOnEachTrack()){ - while (tsStream.hasPacket()){ - DTSC::Packet headerPack; - tsStream.getEarliestPacket(headerPack); - int tmpTrackId = headerPack.getTrackId(); - uint64_t packetId = getPacketID(pListIt->first, tmpTrackId); - - size_t idx = M.trackIDToIndex(packetId, getpid()); - if ((idx == INVALID_TRACK_ID || !M.getCodec(idx).size())){ - tsStream.initializeMetadata(meta, tmpTrackId, packetId); - idx = M.trackIDToIndex(packetId, getpid()); - if (idx != INVALID_TRACK_ID){ - meta.setMinKeepAway(idx, globalWaitTime * 2000); - VERYHIGH_MSG("setting minKeepAway = %" PRIu32 " for track: %zu", globalWaitTime * 2000, idx); - } - } - } - break; // we have all tracks discovered, next playlist! - } - }while (!segDowner.atEnd()); - if (!segDowner.atEnd()){ - segDowner.close(); - tsStream.clear(); - } - - if (prepidCounter < pidCounter){break;}// We're done reading this playlist! - } - } - } - tsStream.clear(); - currentPlaylist = 0; - segDowner.close(); // make sure we have nothing left over - INFO_MSG("header complete, beginning live ingest of %d tracks", pidCounter - 1); - } - bool inputHLS::readExistingHeader(){ if (!Input::readExistingHeader()){ INFO_MSG("Could not read existing header, regenerating"); @@ -873,12 +810,24 @@ namespace Mist{ } // Recover playlist entries tthread::lock_guard guard(entryMutex); - HTTP::URL root(config->getString("input")); + HTTP::URL root = HTTP::localURIResolver().link(config->getString("input")); jsonForEachConst(M.inputLocalVars["playlistEntries"], i){ uint64_t plNum = JSON::Value(i.key()).asInt(); + if (M.inputLocalVars["playlistEntries"][i.key()].size() < listEntries[plNum].size()){ + INFO_MSG("Header needs update as the amount of segments in the playlist has decreased, regenerating header"); + return false; + } std::deque newList; jsonForEachConst(*i, j){ const JSON::Value & thisEntry = *j; + if (thisEntry[1u].asInt() < playlistMapping[plNum].firstIndex + 1){ + INFO_MSG("Skipping segment %lu which is present in the header, but no longer available in the playlist", thisEntry[1u].asInt()); + continue; + } + if (thisEntry[1u].asInt() > playlistMapping[plNum].firstIndex + listEntries[plNum].size()){ + INFO_MSG("Header needs update as the segment index has decreased. The stream has likely restarted, regenerating"); + return false; + } playListEntries newEntry; newEntry.relative_filename = thisEntry[0u].asString(); newEntry.filename = root.link(M.inputLocalVars["playlist_urls"][i.key()]).link(thisEntry[0u].asString()).getUrl(); @@ -913,8 +862,13 @@ namespace Mist{ jsonForEachConst(M.inputLocalVars["parsedSegments"], i){ uint64_t key = JSON::Value(i.key()).asInt(); uint64_t val = i->asInt(); + // If there was a jump in MEDIA-SEQUENCE, start from there + if (val < playlistMapping[key].firstIndex){ + INFO_MSG("Detected a jump in MEDIA-SEQUENCE, adjusting segment counter from %lu to %lu", val, playlistMapping[key].firstIndex); + val = playlistMapping[key].firstIndex; + } parsedSegments[key] = val; - playlistMapping[key].lastFileIndex = val; + playlistMapping[key].lastSegment = val; INFO_MSG("Playlist %" PRIu64 " already parsed %" PRIu64 " segments", key, val); } @@ -926,8 +880,12 @@ namespace Mist{ return true; } + void inputHLS::parseStreamHeader(){ + streamIsVOD = false; + readHeader(); + } + bool inputHLS::readHeader(){ - if (streamIsLive && !isLiveDVR){return true;} // to analyse and extract data TS::Packet packet; char *data; @@ -947,6 +905,8 @@ namespace Mist{ pListIt != listEntries.end() && config->is_active; pListIt++){ tsStream.clear(); uint32_t entId = 0; + bool foundAtLeastOnePacket = false; + INFO_MSG("Playlist %" PRIu32 " starts at media index %lu", pListIt->first, playlistMapping[pListIt->first].firstIndex); for (std::deque::iterator entryIt = pListIt->second.begin(); entryIt != pListIt->second.end() && config->is_active; entryIt++){ @@ -980,14 +940,15 @@ namespace Mist{ uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0; size_t packSendSize = 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11; DONTEVEN_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx); - meta.update(packetTime, packOffset, idx, dataLen, entId, headerPack.hasMember("keyframe"), packSendSize); + meta.update(packetTime, packOffset, idx, dataLen, entryIt->bytePos, headerPack.hasMember("keyframe"), packSendSize); tsStream.getEarliestPacket(headerPack); + foundAtLeastOnePacket = true; } } // No packets available, so read the next TS packet if available if (segDowner.readNext()){ packet.FromPointer(segDowner.packetPtr); - tsStream.parse(packet, entId); + tsStream.parse(packet, entryIt->bytePos); } } // get last packets @@ -1009,7 +970,7 @@ namespace Mist{ uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0; size_t packSendSize = 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11; DONTEVEN_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx); - meta.update(packetTime, packOffset, idx, dataLen, entId, headerPack.hasMember("keyframe"), packSendSize); + meta.update(packetTime, packOffset, idx, dataLen, entryIt->bytePos, headerPack.hasMember("keyframe"), packSendSize); tsStream.getEarliestPacket(headerPack); } // Finally save the offset as part of the TS segment. This is required for bufferframe @@ -1026,6 +987,12 @@ namespace Mist{ if (streamStatus && streamStatus.len > 1){ streamStatus.mapped[1] = (255 * currentSegment) / totalSegments; } + + // Init segment counters to what was set to MEDIA-SEQUENCE + parsedSegments[pListIt->first] = playlistMapping[pListIt->first].firstIndex + currentSegment; + + // For non-vod, only parse the first segment for each playlist + if (!streamIsVOD && foundAtLeastOnePacket){break;} } } if (!config->is_active){return false;} @@ -1033,9 +1000,9 @@ namespace Mist{ // set bootMsOffset in order to display the program time correctly in the player meta.setUTCOffset(zUTC); if (M.getLive()){meta.setBootMsOffset(streamOffset);} - if (streamIsLive && !isLiveDVR){return true;} injectLocalVars(); + isInitialRun = true; return true; } @@ -1064,7 +1031,7 @@ namespace Mist{ thisPlaylist.append(thisEntries); } allEntries[JSON::Value(pListIt->first).asString()] = thisPlaylist; - meta.inputLocalVars["parsedSegments"][JSON::Value(pListIt->first).asString()] = pListIt->second.size(); + meta.inputLocalVars["parsedSegments"][JSON::Value(pListIt->first).asString()] = parsedSegments[pListIt->first]; } meta.inputLocalVars["playlist_urls"] = playlist_urls; meta.inputLocalVars["playlistEntries"] = allEntries; @@ -1079,20 +1046,18 @@ namespace Mist{ meta.inputLocalVars["pidMappingR"] = thisMappingsR; } - bool inputHLS::needsLock(){ - if (config->getBool("realtime")){return false;} - if (isLiveDVR){ - return true; - } - return !streamIsLive; - } - /// \brief Parses new segments added to playlist files as live data /// \param segmentIndex: the index of the segment in the current playlist /// \return True if the segment has been buffered successfully bool inputHLS::parseSegmentAsLive(uint64_t segmentIndex){ bool hasOffset = false; bool hasPacket = false; + uint64_t bufferTime = config->getInteger("pagetimeout"); + if (config->hasOption("bufferTime")){ + bufferTime = config->getInteger("bufferTime") / 1000; + } + // Used to immediately mark pages for removal when we're bursting through segments on initial boot + uint64_t curTimeout = Util::bootSecs() - bufferTime; // Keep our own variables to make sure buffering live data does not interfere with VoD pages loading TS::Packet packet; TS::Stream tsStream; @@ -1136,19 +1101,25 @@ namespace Mist{ tsStream.initializeMetadata(meta, tmpTrackId, packetId); idx = M.trackIDToIndex(packetId, getpid()); } + playlistMapping[currentPlaylist].tracks[idx] = true; headerPack.getString("data", data, dataLen); // keyframe data exists, so always add 19 bytes keyframedata. uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0; VERYHIGH_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx); - bufferLivePacket(packetTime, packOffset, idx, data, dataLen, segmentIndex + 1, headerPack.hasMember("keyframe")); + bufferLivePacket(packetTime, packOffset, idx, data, dataLen, curList.at(segmentIndex).bytePos, headerPack.hasMember("keyframe")); + if (isInitialRun){ + pageCounter[idx][getCurrentLivePage(idx)] = curTimeout; + }else{ + pageCounter[idx][getCurrentLivePage(idx)] = Util::bootSecs(); + } tsStream.getEarliestPacket(headerPack); } } // No packets available, so read the next TS packet if available if (segDowner.readNext()){ packet.FromPointer(segDowner.packetPtr); - tsStream.parse(packet, segmentIndex + 1); + tsStream.parse(packet, curList.at(segmentIndex).bytePos); } } // get last packets @@ -1168,24 +1139,61 @@ namespace Mist{ tsStream.initializeMetadata(meta, tmpTrackId, packetId); idx = M.trackIDToIndex(packetId, getpid()); } + playlistMapping[currentPlaylist].tracks[idx] = true; headerPack.getString("data", data, dataLen); // keyframe data exists, so always add 19 bytes keyframedata. uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0; VERYHIGH_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx); - bufferLivePacket(packetTime, packOffset, idx, data, dataLen, segmentIndex + 1, headerPack.hasMember("keyframe")); + bufferLivePacket(packetTime, packOffset, idx, data, dataLen, curList.at(segmentIndex).bytePos, headerPack.hasMember("keyframe")); + if (isInitialRun){ + pageCounter[idx][getCurrentLivePage(idx)] = curTimeout; + }else{ + pageCounter[idx][getCurrentLivePage(idx)] = Util::bootSecs(); + } tsStream.getEarliestPacket(headerPack); } return true; } - /// \brief Override userLeadOut to buffer new data as live packets - void inputHLS::userLeadOut(){ - Input::userLeadOut(); - if (!isLiveDVR){ + void inputHLS::streamMainLoop(){ + parseLivePoint(); + } + + // Removes any metadata which is no longer and the playlist or buffered in memory + void inputHLS::updateMeta(){ + // EVENT and VOD type playlists should never segments disappear from the start + // Only LIVE (sliding-window) type playlists should execute updateMeta() + if (streamIsVOD || !streamIsLive){ return; } + for (std::map::iterator trackIdx = playlistMapping[currentPlaylist].tracks.begin(); + trackIdx != playlistMapping[currentPlaylist].tracks.end(); trackIdx++){ + // Calc after how many MS segments are no longer part of the buffer window + uint64_t bufferTime = config->getInteger("pagetimeout"); + if (config->hasOption("bufferTime")){ + bufferTime = config->getInteger("bufferTime") / 1000; + } + // Remove keys which are not requestable anymore + while (true) { + DTSC::Keys keys = M.getKeys(trackIdx->first); + // Stop if the earliest key is still in the playlist + if (listEntries[currentPlaylist].front().bytePos < keys.getBpos(keys.getFirstValid())){ + break; + } + // Stop if earliest key is still in the buffer window + if (listEntries[currentPlaylist].back().timestamp - listEntries[currentPlaylist].front().timestamp < bufferTime){ + break; + } + // First key could still be in memory, but is no longer seekable: drop it + HIGH_MSG("Removing key %lu @%lu ms on track %lu from metadata", M.getKeys(trackIdx->first).getFirstValid(), M.getFirstms(trackIdx->first), trackIdx->first); + meta.removeFirstKey(trackIdx->first); + } + } + } + + void inputHLS::parseLivePoint(){ uint64_t maxTime = Util::bootMS() + 500; // Update all playlists to make sure listEntries contains all live segments for (std::map::iterator pListIt = playlistMapping.begin(); @@ -1194,19 +1202,59 @@ namespace Mist{ pListIt->second.reload(); } currentPlaylist = pListIt->first; - if (listEntries[currentPlaylist].size() != parsedSegments[currentPlaylist]){ - INFO_MSG("Current playlist has parsed %" PRIu64 "/%zu entries", parsedSegments[currentPlaylist], listEntries[currentPlaylist].size()); + const uint64_t firstIdx = playlistMapping[currentPlaylist].firstIndex; + + // If the segment counter decreases, reset counters and remove old segments from metadata + if (firstIdx < playlistMapping[currentPlaylist].lastSegment - listEntries[currentPlaylist].size()){ + WARN_MSG("Segment counter for playlist %lu has decreased to %lu. Exiting to reset stream", currentPlaylist, firstIdx); + config->is_active = false; + Util::logExitReason(ER_FORMAT_SPECIFIC, "Segment counter decreased. Exiting to reset stream"); + return; } - for(uint64_t entryIt = parsedSegments[currentPlaylist]; entryIt < listEntries[currentPlaylist].size(); entryIt++){ - INFO_MSG("Adding entry #%" PRIu64 " as live data", entryIt+1); + + // Remove segments from listEntries as soon as it is no longer requestable + { + tthread::lock_guard guard(entryMutex); + while (listEntries[currentPlaylist].front().bytePos < firstIdx + 1){ + MEDIUM_MSG("Segment #%lu no longer in the input playlist", firstIdx + 1); + listEntries[currentPlaylist].pop_front(); + } + } + + // Unload memory pages which are outside of the buffer window and not recently loaded + removeUnused(); + + // Remove meta info for expired keys + updateMeta(); + + // Check for new segments + if (listEntries[currentPlaylist].size() != parsedSegments[currentPlaylist] - firstIdx){ + INFO_MSG("Playlist #%lu has parsed %" PRIu64 "/%zu entries. Parsing new segments...", currentPlaylist, parsedSegments[currentPlaylist] - firstIdx, listEntries[currentPlaylist].size()); + }else if (isInitialRun){ + isInitialRun = false; + } + if (parsedSegments[currentPlaylist] < firstIdx){ + WARN_MSG("Skipping from segment #%lu to segment #%lu since we've fallen behind", parsedSegments[currentPlaylist], firstIdx); + parsedSegments[currentPlaylist] = firstIdx; + } + for(uint64_t entryIt = parsedSegments[currentPlaylist] - firstIdx; entryIt < listEntries[currentPlaylist].size(); entryIt++){ + MEDIUM_MSG("Adding segment #%" PRIu64 " as live data", firstIdx + entryIt + 1); if (parseSegmentAsLive(entryIt)){ - parsedSegments[currentPlaylist]++; + parsedSegments[currentPlaylist] = firstIdx + entryIt + 1; } if (Util::bootMS() > maxTime){return;} } } } + /// \brief Override userLeadOut to buffer new data as live packets + void inputHLS::userLeadOut(){ + Input::userLeadOut(); + if (streamIsLive){ + parseLivePoint(); + } + } + bool inputHLS::openStreamSource(){return true;} void inputHLS::getNext(size_t idx){ @@ -1242,7 +1290,12 @@ namespace Mist{ return; } - uint64_t packetTime = getPacketTime(thisPacket.getTime(), tid, currentPlaylist, nUTC); + uint64_t packetTime = thisPacket.getTime(); + if (listEntries[currentPlaylist].at(currentIndex).timeOffset){ + packetTime += listEntries[currentPlaylist].at(currentIndex).timeOffset; + }else{ + packetTime = getPacketTime(thisPacket.getTime(), tid, currentPlaylist, nUTC); + } INSANE_MSG("Packet %" PRIu32 "@%" PRIu64 "ms -> %" PRIu64 "ms", tid, thisPacket.getTime(), packetTime); // overwrite trackId on success Bit::htobl(thisPacket.getData() + 8, tid); @@ -1255,7 +1308,7 @@ namespace Mist{ // No? Let's read some more data and check again. if (!segDowner.atEnd() && segDowner.readNext()){ tsBuf.FromPointer(segDowner.packetPtr); - tsStream.parse(tsBuf, streamIsLive && !isLiveDVR ? 0 : currentIndex + 1); + tsStream.parse(tsBuf, listEntries[currentPlaylist].at(currentIndex).bytePos); continue; // check again } @@ -1319,19 +1372,19 @@ namespace Mist{ DTSC::Keys keys(M.keys(idx)); for (size_t i = keys.getFirstValid(); i < keys.getEndValid(); i++){ if (keys.getTime(i) > seekTime){ - VERYHIGH_MSG("Found elapsed key with a time of %" PRIu64 " ms at playlist index %zu while seeking", keys.getTime(i), keys.getBpos(i)-1); + VERYHIGH_MSG("Found elapsed key with a time of %" PRIu64 " ms. Using playlist index %zu to match requested time %lu", keys.getTime(i), plistEntry, seekTime); break; } - VERYHIGH_MSG("Found valid key with a time of %" PRIu64 " ms at playlist index %zu while seeking", keys.getTime(i), keys.getBpos(i)-1); - plistEntry = keys.getBpos(i); + // Keys can still be accessible in memory. Skip any segments we cannot seek to in the playlist + if (keys.getBpos(i) <= playlistMapping[currentPlaylist].firstIndex){ + INSANE_MSG("Skipping segment #%lu (key %lu @ %lu ms) for seeking, as it is no longer available in the playlist", keys.getBpos(i) - 1, i, keys.getTime(i)); + continue; + } + plistEntry = keys.getBpos(i) - 1 - playlistMapping[currentPlaylist].firstIndex; + INSANE_MSG("Found valid key with a time of %" PRIu64 " ms at playlist index %zu while seeking", keys.getTime(i), plistEntry); } - if (plistEntry < 1){ - WARN_MSG("attempted to seek outside the file"); - return; - } - - currentIndex = plistEntry - 1; + currentIndex = plistEntry; currentPlaylist = getMappedTrackPlaylist(trackId); VERYHIGH_MSG("Seeking to index %zu on playlist %" PRIu64, currentIndex, currentPlaylist); @@ -1362,8 +1415,8 @@ namespace Mist{ while (!tsStream.hasPacketOnEachTrack() && !segDowner.atEnd()){ if (!segDowner.readNext()){break;} tsBuffer.FromPointer(segDowner.packetPtr); - tsStream.parse(tsBuffer, streamIsLive && !isLiveDVR ? 0 : plistEntry); - } + tsStream.parse(tsBuffer, listEntries[currentPlaylist].at(currentIndex).bytePos); + } } /// \brief Applies any offset to the packets original timestamp @@ -1446,15 +1499,6 @@ namespace Mist{ return packetId; } - size_t inputHLS::getEntryId(uint32_t playlistId, uint64_t bytePos){ - if (bytePos == 0){return 0;} - tthread::lock_guard guard(entryMutex); - for (int i = 0; i < listEntries[playlistId].size(); i++){ - if (listEntries[playlistId].at(i).bytePos > bytePos){return i - 1;} - } - return listEntries[playlistId].size() - 1; - } - uint64_t inputHLS::getOriginalTrackId(uint32_t playlistId, uint32_t id){ return pidMapping[(((uint64_t)playlistId) << 32) + id]; } @@ -1483,11 +1527,6 @@ namespace Mist{ return lastOut; } - /// \brief Sets parsedSegments for all playlists, specifying how many segments - /// have already been parsed. Additional segments can then be parsed as live data - void inputHLS::setParsedSegments(){ - } - /// Parses the main playlist, possibly containing variants. bool inputHLS::initPlaylist(const std::string &uri, bool fullInit){ // Used to set zUTC, in case the first EXT-X-PROGRAM-DATE-TIME does not appear before the first segment @@ -1554,8 +1593,12 @@ namespace Mist{ // skip empty lines in the playlist continue; } - if (line.compare(0, 26, "#EXT-X-PLAYLIST-TYPE:EVENT") == 0){isLiveDVR = true;} - if (line.compare(0, 14, "#EXT-X-ENDLIST") == 0){isLiveDVR = false;} + if (line.compare(0, 14, "#EXT-X-ENDLIST") == 0){ + streamIsLive = false; + streamIsVOD = true; + meta.setLive(false); + meta.setVod(true); + } if (line.compare(0, 17, "#EXT-X-STREAM-INF") == 0){ // this is a variant playlist file.. next line is an uri to a playlist // file @@ -1693,37 +1736,27 @@ namespace Mist{ playListEntries ntry; // This scope limiter prevents the recursion down below from deadlocking us { + // Switch to next file + currentIndex++; tthread::lock_guard guard(entryMutex); std::deque &curList = listEntries[currentPlaylist]; - INSANE_MSG("Current playlist contains %zu entries. Current index is %zu in playlist %" PRIu64, curList.size(), currentIndex, currentPlaylist); - if (!curList.size()){ - INFO_MSG("Reached last entry in playlist %" PRIu64 "; waiting for more segments", currentPlaylist); - if (streamIsLive || isLiveDVR){Util::wait(500);} + INFO_MSG("Current playlist contains %zu entries. Current index is %zu in playlist %" PRIu64, curList.size(), currentIndex, currentPlaylist); + if (curList.size() <= currentIndex){ + if (streamIsLive){ + INFO_MSG("Reached last entry in playlist %" PRIu64 "; waiting for more segments", currentPlaylist); + if (Util::bootSecs() < ntry.timestamp){ + VERYHIGH_MSG("Slowing down to realtime..."); + while (Util::bootSecs() < ntry.timestamp){ + keepAlive(); + Util::wait(250); + } + } + }else{ + INFO_MSG("Reached last entry in playlist %" PRIu64, currentPlaylist); + } return false; } - if (!streamIsLive || isLiveDVR){ - // VoD advances the index by one and attempts to read - // The playlist is not altered in this case, since we may need to seek back later - currentIndex++; - if (curList.size() - 1 < currentIndex){ - INFO_MSG("Reached last entry"); - return false; - } - ntry = curList[currentIndex]; - }else{ - // Live does not use the currentIndex, but simply takes the first segment - // That segment is then removed from the playlist so we don't read it again - live streams can't seek anyway - ntry = *curList.begin(); - curList.pop_front(); - - if (Util::bootSecs() < ntry.timestamp){ - VERYHIGH_MSG("Slowing down to realtime..."); - while (Util::bootSecs() < ntry.timestamp){ - keepAlive(); - Util::wait(250); - } - } - } + ntry = curList[currentIndex]; } if (!segDowner.loadSegment(ntry)){ @@ -1769,7 +1802,7 @@ namespace Mist{ } void inputHLS::finish(){ - if (isLiveDVR){ + if (!streamIsVOD){ //< Already generated from readHeader INFO_MSG("Writing updated header to disk"); injectLocalVars(); M.toFile(HTTP::localURIResolver().link(config->getString("input") + ".dtsh").getUrl()); @@ -1778,7 +1811,7 @@ namespace Mist{ } void inputHLS::checkHeaderTimes(const HTTP::URL & streamFile){ - if (isLiveDVR){return;} + if (streamIsLive){return;} //< Since the playlist will likely be newer than the DTSH for live-dvr Input::checkHeaderTimes(streamFile); } diff --git a/src/input/input_hls.h b/src/input/input_hls.h index 7d0cc334..c805e108 100644 --- a/src/input/input_hls.h +++ b/src/input/input_hls.h @@ -16,9 +16,8 @@ namespace Mist{ - enum PlaylistType{VOD, LIVE, EVENT}; - - extern bool streamIsLive; + extern bool streamIsLive; //< Playlist can be sliding window or get new segments appended + extern bool streamIsVOD; //< Playlist segments do not disappear extern uint32_t globalWaitTime; // largest waitTime for any playlist we're loading - used to update minKeepAway void parseKey(std::string key, char *newKey, unsigned int len); @@ -80,7 +79,7 @@ namespace Mist{ Playlist(const std::string &uriSrc = ""); bool isUrl() const; bool reload(); - void addEntry(const std::string & absolute_filename, const std::string &filename, float duration, uint64_t &totalBytes, + void addEntry(const std::string & absolute_filename, const std::string &filename, float duration, uint64_t &bpos, const std::string &key, const std::string &keyIV); bool isSupportedFile(const std::string filename); @@ -93,15 +92,16 @@ namespace Mist{ uint32_t id; bool playlistEnd; int noChangeCount; - uint64_t lastFileIndex; uint64_t waitTime; - PlaylistType playlistType; uint64_t lastTimestamp; uint64_t startTime; uint64_t nextUTC; ///< If non-zero, the UTC timestamp of the next segment on this playlist char keyAES[16]; std::map keys; + uint64_t firstIndex; //< the index of the first segment in the playlist + uint64_t lastSegment; + std::map tracks; }; void playlistRunner(void *ptr); @@ -110,7 +110,7 @@ namespace Mist{ public: inputHLS(Util::Config *cfg); ~inputHLS(); - bool needsLock(); + bool needsLock(){return !config->getBool("realtime");} bool openStreamSource(); bool callback(); @@ -119,7 +119,6 @@ namespace Mist{ uint64_t nUTC; ///< Next packet timestamp in UTC unix time millis int64_t streamOffset; ///< bootMsOffset we need to set once we have parsed the header unsigned int startTime; - PlaylistType playlistType; SegmentDownloader segDowner; int version; int targetDuration; @@ -145,14 +144,12 @@ namespace Mist{ // Used to map packetId of packets in pidMapping int pidCounter; - /// HLS live VoD stream, set if: #EXT-X-PLAYLIST-TYPE:EVENT - bool isLiveDVR; // Override userLeadOut to buffer new data as live packets void userLeadOut(); + // Removes any metadata which is no longer and the playlist or buffered in memory + void updateMeta(); /// Tries to add as much live packets from a TS file at the given location bool parseSegmentAsLive(uint64_t segmentIndex); - // Updates parsedSegmentIndex for all playlists - void setParsedSegments(); // index of last playlist entry finished parsing long previousSegmentIndex; @@ -174,16 +171,19 @@ namespace Mist{ bool readNextFile(); void parseStreamHeader(); + void parseLivePoint(); + void streamMainLoop(); uint32_t getMappedTrackId(uint64_t id); uint32_t getMappedTrackPlaylist(uint64_t id); uint64_t getOriginalTrackId(uint32_t playlistId, uint32_t id); uint64_t getPacketTime(uint64_t packetTime, uint64_t tid, uint64_t currentPlaylist, uint64_t nUTC = 0); uint64_t getPacketID(uint64_t currentPlaylist, uint64_t trackId); - size_t getEntryId(uint32_t playlistId, uint64_t bytePos); virtual void finish(); void injectLocalVars(); virtual void checkHeaderTimes(const HTTP::URL & streamFile); + // Used to immediately mark pages for removal when we're bursting through segments on initial boot + bool isInitialRun; }; }// namespace Mist diff --git a/src/io.cpp b/src/io.cpp index 31573924..14efa588 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -107,9 +107,9 @@ namespace Mist{ return true; } - /// Checks whether a given page is currently being written to - /// \return True if the page is the current live page, and thus not safe to remove - bool InOutBase::isCurrentLivePage(size_t idx, uint32_t pageNumber){ + /// Checks whether a given page was recently being written to + /// \return True if the page is in the current live window, and thus not safe to remove + bool InOutBase::isRecentLivePage(size_t idx, uint32_t pageNumber, uint64_t maxAge){ // Base case: for nonlive situations no new data will be added if (!M.getLive()){ return false; @@ -118,6 +118,12 @@ namespace Mist{ if (curPageNum[idx] && curPageNum[idx] <= pageNumber){ return true; } + // Compare last timestamp on the track with the time of the first key of the page + uint64_t lastMs = meta.getNowms(idx); + uint64_t thisTime = meta.getTimeForKeyIndex(idx, pageNumber); + if (lastMs - thisTime < maxAge) { + return true; + } // If there is no set curPageNum we are definitely not writing to it return false; } @@ -166,7 +172,32 @@ namespace Mist{ IPC::releasePage(pageName); #endif toErase.master = true; - // Remove the page from the tracks index page + // Update the page on the tracks index page if needed + uint64_t firstKeyNum = tPages.getInt("firstkey", pageIdx); + uint64_t keyCount = tPages.getInt("keycount", pageIdx); + uint64_t partCount = tPages.getInt("parts", pageIdx); + uint64_t newFirstKey = M.getKeys(idx).getFirstValid(); + if (firstKeyNum + keyCount <= newFirstKey){ + INFO_MSG("Page %" PRIu64 " track %zu has expired during the time it was kept cached in memory (contains up to key %lu, but the earliest key is %lu). Removing it now", firstKeyNum, idx, firstKeyNum + keyCount, newFirstKey); + tPages.setInt("keycount", 0, pageIdx); //< Force removal by having avail and keycount both 0 + }else if (firstKeyNum < newFirstKey){ + uint64_t newPartCount = 0; + DTSC::Keys keys = M.getKeys(idx); + for (uint32_t i = newFirstKey; i < firstKeyNum + keyCount; i++){ + newPartCount += keys.getParts(i); + } + MEDIUM_MSG("Adjusting meta info for page %lu track %lu before unloading it. First key %lu -> %lu. Key count %lu -> %lu. Part count %lu -> %lu", firstKeyNum, idx, firstKeyNum, newFirstKey, keyCount, keyCount - (newFirstKey - firstKeyNum), partCount, newPartCount); + tPages.setInt("keycount", keyCount - (newFirstKey - firstKeyNum), pageIdx); + tPages.setInt("parts", newPartCount, pageIdx); + tPages.setInt("firstkey", newFirstKey, pageIdx); + } + // Delete pages from the tracks index page that will never contain any more + for (uint32_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ + if (tPages.getInt("keycount", i) || tPages.getInt("avail", i)){ + break; + } + tPages.deleteRecords(1); + } // Leaving scope here, the page will now be destroyed } @@ -188,8 +219,7 @@ namespace Mist{ uint64_t pageNum = tPages.getInt("firstkey", i); if (pageNum > keyNum) continue; uint64_t keyCount = tPages.getInt("keycount", i); - if (pageNum + keyCount - 1 < keyNum) continue; - if (keyCount && pageNum + keyCount - 1 < keyNum) continue; + if (!keyCount || pageNum + keyCount - 1 < keyNum) continue; uint64_t avail = tPages.getInt("avail", i); return avail ? pageNum : INVALID_KEY_NUM; } @@ -445,13 +475,22 @@ namespace Mist{ if ((tPages.getEndPos() - tPages.getDeleted()) >= tPages.getRCount()){ aMeta.resizeTrack(packTrack, aMeta.fragments(packTrack).getRCount(), aMeta.keys(packTrack).getRCount(), aMeta.parts(packTrack).getRCount(), tPages.getRCount() * 2, "not enough pages"); } - + // Finalize part count of the previous live page + uint64_t newPartCount = 0; + DTSC::Keys keys = M.getKeys(packTrack); + uint64_t lastKey = tPages.getInt("firstkey", curPage) + tPages.getInt("keycount", curPage); + for (uint32_t i = tPages.getInt("firstkey", curPage); i < lastKey; i++){ + newPartCount += keys.getParts(i); + } + tPages.setInt("parts", newPartCount, curPage); curPage = endPage; tPages.setInt("firstkey", curPageNum[packTrack], endPage); tPages.setInt("firsttime", packTime, endPage); tPages.setInt("size", DEFAULT_DATA_PAGE_SIZE, endPage); tPages.setInt("keycount", 0, endPage); tPages.setInt("avail", 0, endPage); + tPages.setInt("parts", 0, endPage); + tPages.setInt("lastkeytime", 0, endPage); tPages.addRecords(1); if (livePage[packTrack]){bufferFinalize(packTrack, livePage[packTrack]);} DONTEVEN_MSG("Opening new page #%zu to track %" PRIu32, curPageNum[packTrack], packTrack); diff --git a/src/io.h b/src/io.h index 3db01406..8c099cb3 100644 --- a/src/io.h +++ b/src/io.h @@ -22,7 +22,7 @@ namespace Mist{ bool bufferStart(size_t idx, uint32_t pageNumber, IPC::sharedPage & page, DTSC::Meta & aMeta); void bufferFinalize(size_t idx, IPC::sharedPage & page); void liveFinalize(size_t idx); - bool isCurrentLivePage(size_t idx, uint32_t pageNumber); + bool isRecentLivePage(size_t idx, uint32_t pageNumber, uint64_t maxAge); void bufferRemove(size_t idx, uint32_t pageNumber, uint32_t pageIdx = INVALID_KEY_NUM); void bufferLivePacket(const DTSC::Packet &packet); @@ -51,6 +51,13 @@ namespace Mist{ std::map userSelect; + size_t getCurrentLivePage(uint32_t trackIdx){ + if (!curPageNum.count(trackIdx)){ + return INVALID_KEY_NUM; + } + return curPageNum[trackIdx]; + }; + private: std::map livePage; std::map curPageNum;