MistInHLS improvements and speedups, part 2/2

This commit is contained in:
Marco van Dijk 2023-06-29 17:28:56 +02:00 committed by Thulinma
parent ed1c291955
commit 9e30444476
6 changed files with 378 additions and 243 deletions

View file

@ -2272,15 +2272,46 @@ namespace DTSC{
}
}
Track &t = tracks[trackIdx];
DONTEVEN_MSG("Deleting parts: %" PRIu64 "->%" PRIu64 " del'd, %zu pres", t.parts.getDeleted(), t.parts.getDeleted()+t.keys.getInt(t.keyPartsField, t.keys.getDeleted()), t.parts.getPresent());
t.parts.deleteRecords(t.keys.getInt(t.keyPartsField, t.keys.getDeleted()));
DONTEVEN_MSG("Deleting key: %" PRIu64 "->%" PRIu64 " del'd, %zu pres", t.keys.getDeleted(), t.keys.getDeleted()+1, t.keys.getPresent());
uint64_t deletedPartCount = t.keys.getInt(t.keyPartsField, t.keys.getDeleted());
DONTEVEN_MSG("Deleting parts: %" PRIu64 "->%" PRIu64 " del'd, %zu pres", t.parts.getDeleted(), t.parts.getDeleted()+deletedPartCount, t.parts.getPresent());
t.parts.deleteRecords(deletedPartCount);
uint64_t deletedKeyNum = t.keys.getDeleted();
DONTEVEN_MSG("Deleting key: %" PRIu64 "->%" PRIu64 " del'd, %zu pres", deletedKeyNum, deletedKeyNum+1, t.keys.getPresent());
t.keys.deleteRecords(1);
if (t.fragments.getInt(t.fragmentFirstKeyField, t.fragments.getDeleted()) < t.keys.getDeleted()){
t.fragments.deleteRecords(1);
setMissedFragments(trackIdx, getMissedFragments(trackIdx) + 1);
}
if (t.pages.getPresent() > 1 && t.pages.getInt("firstkey", t.pages.getDeleted() + 1) < t.keys.getDeleted()){
// Check if any page contains the just-deleted key
for (uint64_t i = t.pages.getDeleted(); i < t.pages.getEndPos(); i++){
uint64_t thisKey = t.pages.getInt("firstkey", i);
uint64_t avtmp = t.pages.getInt("avail", i);
uint64_t keycount = t.pages.getInt("keycount", i);
DONTEVEN_MSG("Found page idx=%lu number=%lu avail=%lu, keycount=%lu", i, thisKey, avtmp, keycount);
uint64_t pageNum = t.pages.getInt("firstkey", i);
if (pageNum > deletedKeyNum) continue;
uint64_t keyCount = t.pages.getInt("keycount", i);
if (keyCount){
if (pageNum + keyCount - 1 < deletedKeyNum) continue;
}else if (pageNum < deletedKeyNum) continue;
uint64_t avail = t.pages.getInt("avail", i);
if (avail){
break;
}
// 'Resize' the page to whatever keys are still available
if (t.pages.getInt("keycount", i) > 1){
DONTEVEN_MSG("Key count %lu -> %lu", t.pages.getInt("keycount", i), t.pages.getInt("keycount", i) - 1);
t.pages.setInt("keycount", t.pages.getInt("keycount", i) - 1, i);
DONTEVEN_MSG("Part count %lu -> %lu", t.pages.getInt("parts", i), t.pages.getInt("parts", i) - deletedPartCount);
t.pages.setInt("parts", t.pages.getInt("parts", i) - deletedPartCount, i);
DONTEVEN_MSG("First key %lu -> %lu", t.pages.getInt("firstkey", i), t.pages.getInt("firstkey", i) + 1);
t.pages.setInt("firstkey", t.pages.getInt("firstkey", i) + 1, i);
break;
}
// Unload the page if there are no more keys left on it
// Initialize the correct page, make it master so it gets cleaned up when leaving scope.
char thisPageName[NAME_BUFFER_SIZE];
snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), trackIdx,
@ -2290,6 +2321,7 @@ namespace DTSC{
// Then delete the page entry
t.pages.deleteRecords(1);
break;
}
setFirstms(trackIdx, t.keys.getInt(t.keyTimeField, t.keys.getDeleted()));
if (resizeLock){resizeLock.unlink();}

View file

@ -48,10 +48,11 @@ namespace Mist{
const Util::RelAccX &tPages = M.pages(track);
if (!tPages.getEndPos()){return;}
DTSC::Keys keys(M.keys(track));
if (i > keys.getValidCount()){return;}
if (i > keys.getEndValid()){return;}
uint64_t pageIdx = 0;
for (uint64_t j = tPages.getDeleted(); j < tPages.getEndPos(); j++){
if (tPages.getInt("firstkey", j) > i) break;
uint64_t thisKey = tPages.getInt("firstkey", j);
if (thisKey > i) break;
pageIdx = j;
}
uint32_t pageNumber = tPages.getInt("firstkey", pageIdx);
@ -210,7 +211,19 @@ namespace Mist{
capa["optional"]["realtime"]["name"] = "Simulated Live";
capa["optional"]["realtime"]["help"] = "Make this input run as a simulated live stream";
capa["optional"]["realtime"]["option"] = "--realtime";
option.null();
option["short"] = "P";
option["long"] = "pagetimeout";
option["arg"] = "integer";
option["value"].append(DEFAULT_PAGE_TIMEOUT);
option["help"] = "For bufferless or live inputs like HLS, set the timeout in seconds for old, inactive pages to be deleted. A longer value results in more memory usage, but ensures that recently buffered data stays in memory for longer";
config->addOption("pagetimeout", option);
capa["optional"]["pagetimeout"]["name"] = "Memory page timeout";
capa["optional"]["pagetimeout"]["help"] = "For bufferless or live inputs like HLS, set the timeout in seconds for old, inactive pages to be deleted. A longer value results in more memory usage, but ensures that recently buffered data stays in memory for longer";
capa["optional"]["pagetimeout"]["option"] = "--pagetimeout";
capa["optional"]["pagetimeout"]["type"] = "uint";
capa["optional"]["pagetimeout"]["default"] = DEFAULT_PAGE_TIMEOUT;
/*LTS-END*/
capa["optional"]["debug"]["name"] = "debug";
@ -1226,16 +1239,21 @@ namespace Mist{
}
void Input::removeUnused(){
uint64_t timeout = config->getInteger("pagetimeout") * 1000;
uint64_t timeout = config->getInteger("pagetimeout");
uint64_t bufferTime = timeout * 1000;
if (config->hasOption("bufferTime")){
bufferTime = config->getInteger("bufferTime");
}
uint64_t cTime = Util::bootSecs();
for (std::map<size_t, std::map<uint32_t, uint64_t> >::iterator it = pageCounter.begin();
it != pageCounter.end(); it++){
std::set<uint32_t> deletedEntries;
for (std::map<uint32_t, uint64_t>::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++){
if (isRecentLivePage(it->first, it2->first, timeout)){continue;}
if (cTime > it2->second + DEFAULT_PAGE_TIMEOUT){
if (isRecentLivePage(it->first, it2->first, bufferTime)){continue;}
if (cTime > it2->second + timeout){
deletedEntries.insert(it2->first);
bufferRemove(it->first, it2->first);
HIGH_MSG("Unloading page %u track %lu", it2->first, it->first);
}
}
while (deletedEntries.size()){
@ -1470,20 +1488,27 @@ namespace Mist{
const Util::RelAccX &tPages = M.pages(idx);
DTSC::Keys keys(M.keys(idx));
uint32_t keyCount = keys.getValidCount();
uint64_t firstKey = keys.getFirstValid();
if (keyNum < firstKey){
HIGH_MSG("Key %" PRIu32 " on track %zu no longer seekable (earliest requestable key is %" PRIu64
"). Cancelling buffering.",
keyNum, idx, firstKey);
return true;
}
uint64_t lastKey = keys.getEndValid();
if (keyNum > lastKey){
// End of movie here, returning true to avoid various error messages
if (keyNum > lastKey + 1){
WARN_MSG("Key %" PRIu32 " on track %zu is higher than total (latest key is %" PRIu64
"). Cancelling buffering.",
keyNum, idx, lastKey);
}
return true;
}
if (!tPages.getEndPos()){
WARN_MSG("No pages for track %zu found! Cancelling bufferFrame", idx);
return false;
}
if (keyNum > keyCount){
// End of movie here, returning true to avoid various error messages
if (keyNum > keyCount + 1){
WARN_MSG("Key %" PRIu32 " on track %zu is higher than total (%" PRIu32
"). Cancelling buffering.",
keyNum, idx, keyCount);
}
return true;
}
uint64_t pageIdx = 0;
for (uint64_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){
if (tPages.getInt("firstkey", i) > keyNum) break;
@ -1553,8 +1578,7 @@ namespace Mist{
}
}else{
size_t prevPos = 0;
size_t partNo = 0;
for (size_t i = 0; i < keyNum; ++i){partNo += keys.getParts(i);}
size_t partNo = keys.getFirstPart(keyNum);
DTSC::Parts parts(M.parts(idx));
while (thisPacket && thisTime < stopTime){
if (connectedUsers || isAlwaysOn()){activityCounter = Util::bootSecs();}

View file

@ -132,9 +132,10 @@ namespace Mist{
static unsigned int plsTotalCount = 0; /// Total playlists active
static unsigned int plsInitCount = 0; /// Count of playlists fully inited
bool streamIsLive;
uint32_t globalWaitTime;
std::map<uint32_t, std::deque<playListEntries> > listEntries;
bool streamIsLive; //< Playlist can be sliding window or get new segments appended
bool streamIsVOD; //< Playlist segments do not disappear
uint32_t globalWaitTime; //< Time between playlist reloads, based on TARGETDURATION
std::map<uint32_t, std::deque<playListEntries> > listEntries; //< Segments currently in the playlist
// These are used in the HTTP::Downloader callback, to prevent timeouts when downloading
// segments/playlists.
@ -201,9 +202,8 @@ namespace Mist{
uriSrc = uriSource;
}
if (uriSrc.size()){INFO_MSG("Adding variant playlist: %s -> %s", relurl.c_str(), uriSrc.c_str());}
lastFileIndex = 0;
lastSegment = 0;
waitTime = 2;
playlistEnd = false;
noChangeCount = 0;
lastTimestamp = 0;
root = HTTP::URL(uriSrc);
@ -215,6 +215,7 @@ namespace Mist{
memset(keyAES, 0, 16);
startTime = Util::bootSecs();
reloadNext = 0;
firstIndex = 0;
}
/// Returns true if there is no protocol defined in the playlist root URL.
@ -479,7 +480,7 @@ namespace Mist{
/// Handles both initial load and future reloads.
/// Returns how many segments were added to the internal segment list.
bool Playlist::reload(){
uint64_t fileNo = 0;
uint64_t bposCounter = 1;
nextUTC = 0; // Make sure we don't use old timestamps
std::string line;
std::string key;
@ -490,9 +491,6 @@ namespace Mist{
std::string keyIV;
int count = 0;
uint64_t totalBytes = 0;
playlistType = LIVE; // Temporary value
std::istringstream urlSource;
std::ifstream fileSource;
@ -568,25 +566,32 @@ namespace Mist{
if (waitTime < 2){waitTime = 2;}
}
// Assuming this always comes before any segment
if (key == "MEDIA-SEQUENCE"){
fileNo = atoll(val.c_str());
// Reinit the segment counter
firstIndex = atoll(val.c_str());
bposCounter = firstIndex + 1;
}
if (key == "PROGRAM-DATE-TIME"){nextUTC = ISO8601toUnixmillis(val);}
if (key == "PLAYLIST-TYPE"){
if (val == "VOD"){
playlistType = VOD;
streamIsVOD = true;
streamIsLive = false;
}else if (val == "LIVE"){
playlistType = LIVE;
streamIsVOD = false;
streamIsLive = true;
}else if (val == "EVENT"){
playlistType = EVENT;
streamIsVOD = true;
streamIsLive = true;
}
}
// Once we see this tag, the entire playlist becomes VOD
if (key == "ENDLIST"){
// end of playlist reached!
playlistEnd = true;
playlistType = VOD;
streamIsVOD = true;
streamIsLive = false;
}
continue;
}
@ -600,30 +605,28 @@ namespace Mist{
std::getline(input, filename);
// check for already added segments
DONTEVEN_MSG("Current file has index #%" PRIu64 ", last index was #%" PRIu64 "", fileNo, lastFileIndex);
if (fileNo >= lastFileIndex){
DONTEVEN_MSG("Current segment #%" PRIu64 ", last segment was #%" PRIu64 "", bposCounter, lastSegment);
if (bposCounter > lastSegment){
cleanLine(filename);
char ivec[16];
if (keyIV.size()){
parseKey(keyIV, ivec, 16);
}else{
memset(ivec, 0, 16);
Bit::htobll(ivec + 8, fileNo);
Bit::htobll(ivec + 8, bposCounter);
}
addEntry(root.link(filename).getUrl(), filename, f, totalBytes, keys[keyUri], std::string(ivec, 16));
lastFileIndex = fileNo + 1;
addEntry(root.link(filename).getUrl(), filename, f, bposCounter, keys[keyUri], std::string(ivec, 16));
lastSegment = bposCounter;
++count;
}
nextUTC = 0;
++fileNo;
++bposCounter;
}
// VOD over HTTP needs to be processed as LIVE.
if (!isUrl()){
fileSource.close();
}
// Set the global live/vod bool to live if this playlist looks like a live playlist
if (playlistType == LIVE){streamIsLive = true;}
if (globalWaitTime < waitTime){globalWaitTime = waitTime;}
@ -648,7 +651,7 @@ namespace Mist{
}
/// Adds playlist segments to be processed
void Playlist::addEntry(const std::string &absolute_filename, const std::string &filename, float duration, uint64_t &totalBytes,
void Playlist::addEntry(const std::string &absolute_filename, const std::string &filename, float duration, uint64_t &bpos,
const std::string &key, const std::string &iv){
// if (!isSupportedFile(filename)){
// WARN_MSG("Ignoring unsupported file: %s", filename.c_str());
@ -659,7 +662,7 @@ namespace Mist{
entry.filename = absolute_filename;
entry.relative_filename = filename;
cleanLine(entry.filename);
entry.bytePos = totalBytes;
entry.bytePos = bpos;
entry.duration = duration;
if (entry.duration * 1000 > DTSC::veryUglyJitterOverride){
DTSC::veryUglyJitterOverride = entry.duration * 1000;
@ -679,7 +682,6 @@ namespace Mist{
std::string test = root.link(entry.filename).getFilePath();
fileSource.open(test.c_str(), std::ios::ate | std::ios::binary);
if (!fileSource.good()){WARN_MSG("file: %s, error: %s", test.c_str(), strerror(errno));}
totalBytes += fileSource.tellg();
}
entry.timestamp = lastTimestamp + startTime;
@ -700,14 +702,15 @@ namespace Mist{
inputHLS::inputHLS(Util::Config *cfg) : Input(cfg){
zUTC = nUTC = 0;
self = this;
streamIsLive = false;
streamIsLive = true; //< default to sliding window playlist
streamIsVOD = false; //< default to sliding window playlist
globalWaitTime = 0;
currentPlaylist = 0;
streamOffset = 0;
isInitialRun = false;
pidCounter = 1;
isLiveDVR = false;
previousSegmentIndex = -1;
currentIndex = 0;
@ -738,6 +741,21 @@ namespace Mist{
capa["codecs"]["audio"].append("AC3");
capa["codecs"]["audio"].append("MP3");
JSON::Value option;
option["arg"] = "integer";
option["long"] = "buffer";
option["short"] = "b";
option["help"] = "Live buffer window in ms. Segments within this range from the live point will be kept in memory";
option["value"].append(50000);
config->addOption("bufferTime", option);
capa["optional"]["bufferTime"]["name"] = "Buffer time (ms)";
capa["optional"]["bufferTime"]["help"] =
"Live buffer window in ms. Segments within this range from the live point will be kept in memory";
capa["optional"]["bufferTime"]["option"] = "--buffer";
capa["optional"]["bufferTime"]["type"] = "uint";
capa["optional"]["bufferTime"]["default"] = 50000;
option.null();
inFile = NULL;
}
@ -756,99 +774,18 @@ namespace Mist{
return false;
}
// If the playlist is of event type, init the amount of segments in the playlist
if (isLiveDVR){
// Set the previousSegmentIndex by quickly going through the existing PLS files
for (std::map<uint32_t, std::deque<playListEntries> >::iterator pListIt = listEntries.begin();
pListIt != listEntries.end();
pListIt++){
parsedSegments[pListIt->first] = 0;
INFO_MSG("Playlist %" PRIu32 " contains %zu segments", pListIt->first, pListIt->second.size());
}
// Segments can be added (and removed if VOD is false)
if (streamIsLive){
meta.setLive(true);
}
// Segments can not be removed
if (streamIsVOD){
meta.setVod(true);
streamIsLive = true;
}
return true;
}
void inputHLS::parseStreamHeader(){
if (!readExistingHeader()){
if (!initPlaylist(config->getString("input"))){
Util::logExitReason(ER_UNKNOWN, "Failed to load HLS playlist, aborting");
return;
}
uint64_t oldBootMsOffset = M.getBootMsOffset();
meta.reInit(isSingular() ? streamName : "", false);
meta.setUTCOffset(zUTC);
meta.setBootMsOffset(oldBootMsOffset);
INFO_MSG("Parsing live stream to create header...");
TS::Packet packet; // to analyse and extract data
int pidCounter = 1;
tthread::lock_guard<tthread::mutex> guard(entryMutex);
for (std::map<uint32_t, std::deque<playListEntries> >::iterator pListIt = listEntries.begin();
pListIt != listEntries.end(); pListIt++){
// Skip empty playlists
if (!pListIt->second.size()){continue;}
int prepidCounter = pidCounter;
tsStream.clear();
for (std::deque<playListEntries>::iterator entryIt = pListIt->second.begin();
entryIt != pListIt->second.end(); ++entryIt){
keepAlive();
if (!segDowner.loadSegment(*entryIt)){
WARN_MSG("Skipping segment that could not be loaded in an attempt to recover");
tsStream.clear();
continue;
}
do{
if (!segDowner.readNext() || !packet.FromPointer(segDowner.packetPtr)){
WARN_MSG("Could not load TS packet from %s, aborting segment parse", entryIt->filename.c_str());
tsStream.clear();
break; // Abort load
}
tsStream.parse(packet, entryIt->bytePos);
if (tsStream.hasPacketOnEachTrack()){
while (tsStream.hasPacket()){
DTSC::Packet headerPack;
tsStream.getEarliestPacket(headerPack);
int tmpTrackId = headerPack.getTrackId();
uint64_t packetId = getPacketID(pListIt->first, tmpTrackId);
size_t idx = M.trackIDToIndex(packetId, getpid());
if ((idx == INVALID_TRACK_ID || !M.getCodec(idx).size())){
tsStream.initializeMetadata(meta, tmpTrackId, packetId);
idx = M.trackIDToIndex(packetId, getpid());
if (idx != INVALID_TRACK_ID){
meta.setMinKeepAway(idx, globalWaitTime * 2000);
VERYHIGH_MSG("setting minKeepAway = %" PRIu32 " for track: %zu", globalWaitTime * 2000, idx);
}
}
}
break; // we have all tracks discovered, next playlist!
}
}while (!segDowner.atEnd());
if (!segDowner.atEnd()){
segDowner.close();
tsStream.clear();
}
if (prepidCounter < pidCounter){break;}// We're done reading this playlist!
}
}
}
tsStream.clear();
currentPlaylist = 0;
segDowner.close(); // make sure we have nothing left over
INFO_MSG("header complete, beginning live ingest of %d tracks", pidCounter - 1);
}
bool inputHLS::readExistingHeader(){
if (!Input::readExistingHeader()){
INFO_MSG("Could not read existing header, regenerating");
@ -873,12 +810,24 @@ namespace Mist{
}
// Recover playlist entries
tthread::lock_guard<tthread::mutex> guard(entryMutex);
HTTP::URL root(config->getString("input"));
HTTP::URL root = HTTP::localURIResolver().link(config->getString("input"));
jsonForEachConst(M.inputLocalVars["playlistEntries"], i){
uint64_t plNum = JSON::Value(i.key()).asInt();
if (M.inputLocalVars["playlistEntries"][i.key()].size() < listEntries[plNum].size()){
INFO_MSG("Header needs update as the amount of segments in the playlist has decreased, regenerating header");
return false;
}
std::deque<playListEntries> newList;
jsonForEachConst(*i, j){
const JSON::Value & thisEntry = *j;
if (thisEntry[1u].asInt() < playlistMapping[plNum].firstIndex + 1){
INFO_MSG("Skipping segment %lu which is present in the header, but no longer available in the playlist", thisEntry[1u].asInt());
continue;
}
if (thisEntry[1u].asInt() > playlistMapping[plNum].firstIndex + listEntries[plNum].size()){
INFO_MSG("Header needs update as the segment index has decreased. The stream has likely restarted, regenerating");
return false;
}
playListEntries newEntry;
newEntry.relative_filename = thisEntry[0u].asString();
newEntry.filename = root.link(M.inputLocalVars["playlist_urls"][i.key()]).link(thisEntry[0u].asString()).getUrl();
@ -913,8 +862,13 @@ namespace Mist{
jsonForEachConst(M.inputLocalVars["parsedSegments"], i){
uint64_t key = JSON::Value(i.key()).asInt();
uint64_t val = i->asInt();
// If there was a jump in MEDIA-SEQUENCE, start from there
if (val < playlistMapping[key].firstIndex){
INFO_MSG("Detected a jump in MEDIA-SEQUENCE, adjusting segment counter from %lu to %lu", val, playlistMapping[key].firstIndex);
val = playlistMapping[key].firstIndex;
}
parsedSegments[key] = val;
playlistMapping[key].lastFileIndex = val;
playlistMapping[key].lastSegment = val;
INFO_MSG("Playlist %" PRIu64 " already parsed %" PRIu64 " segments", key, val);
}
@ -926,8 +880,12 @@ namespace Mist{
return true;
}
void inputHLS::parseStreamHeader(){
streamIsVOD = false;
readHeader();
}
bool inputHLS::readHeader(){
if (streamIsLive && !isLiveDVR){return true;}
// to analyse and extract data
TS::Packet packet;
char *data;
@ -947,6 +905,8 @@ namespace Mist{
pListIt != listEntries.end() && config->is_active; pListIt++){
tsStream.clear();
uint32_t entId = 0;
bool foundAtLeastOnePacket = false;
INFO_MSG("Playlist %" PRIu32 " starts at media index %lu", pListIt->first, playlistMapping[pListIt->first].firstIndex);
for (std::deque<playListEntries>::iterator entryIt = pListIt->second.begin();
entryIt != pListIt->second.end() && config->is_active; entryIt++){
@ -980,14 +940,15 @@ namespace Mist{
uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0;
size_t packSendSize = 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11;
DONTEVEN_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx);
meta.update(packetTime, packOffset, idx, dataLen, entId, headerPack.hasMember("keyframe"), packSendSize);
meta.update(packetTime, packOffset, idx, dataLen, entryIt->bytePos, headerPack.hasMember("keyframe"), packSendSize);
tsStream.getEarliestPacket(headerPack);
foundAtLeastOnePacket = true;
}
}
// No packets available, so read the next TS packet if available
if (segDowner.readNext()){
packet.FromPointer(segDowner.packetPtr);
tsStream.parse(packet, entId);
tsStream.parse(packet, entryIt->bytePos);
}
}
// get last packets
@ -1009,7 +970,7 @@ namespace Mist{
uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0;
size_t packSendSize = 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11;
DONTEVEN_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx);
meta.update(packetTime, packOffset, idx, dataLen, entId, headerPack.hasMember("keyframe"), packSendSize);
meta.update(packetTime, packOffset, idx, dataLen, entryIt->bytePos, headerPack.hasMember("keyframe"), packSendSize);
tsStream.getEarliestPacket(headerPack);
}
// Finally save the offset as part of the TS segment. This is required for bufferframe
@ -1026,6 +987,12 @@ namespace Mist{
if (streamStatus && streamStatus.len > 1){
streamStatus.mapped[1] = (255 * currentSegment) / totalSegments;
}
// Init segment counters to what was set to MEDIA-SEQUENCE
parsedSegments[pListIt->first] = playlistMapping[pListIt->first].firstIndex + currentSegment;
// For non-vod, only parse the first segment for each playlist
if (!streamIsVOD && foundAtLeastOnePacket){break;}
}
}
if (!config->is_active){return false;}
@ -1033,9 +1000,9 @@ namespace Mist{
// set bootMsOffset in order to display the program time correctly in the player
meta.setUTCOffset(zUTC);
if (M.getLive()){meta.setBootMsOffset(streamOffset);}
if (streamIsLive && !isLiveDVR){return true;}
injectLocalVars();
isInitialRun = true;
return true;
}
@ -1064,7 +1031,7 @@ namespace Mist{
thisPlaylist.append(thisEntries);
}
allEntries[JSON::Value(pListIt->first).asString()] = thisPlaylist;
meta.inputLocalVars["parsedSegments"][JSON::Value(pListIt->first).asString()] = pListIt->second.size();
meta.inputLocalVars["parsedSegments"][JSON::Value(pListIt->first).asString()] = parsedSegments[pListIt->first];
}
meta.inputLocalVars["playlist_urls"] = playlist_urls;
meta.inputLocalVars["playlistEntries"] = allEntries;
@ -1079,20 +1046,18 @@ namespace Mist{
meta.inputLocalVars["pidMappingR"] = thisMappingsR;
}
bool inputHLS::needsLock(){
if (config->getBool("realtime")){return false;}
if (isLiveDVR){
return true;
}
return !streamIsLive;
}
/// \brief Parses new segments added to playlist files as live data
/// \param segmentIndex: the index of the segment in the current playlist
/// \return True if the segment has been buffered successfully
bool inputHLS::parseSegmentAsLive(uint64_t segmentIndex){
bool hasOffset = false;
bool hasPacket = false;
uint64_t bufferTime = config->getInteger("pagetimeout");
if (config->hasOption("bufferTime")){
bufferTime = config->getInteger("bufferTime") / 1000;
}
// Used to immediately mark pages for removal when we're bursting through segments on initial boot
uint64_t curTimeout = Util::bootSecs() - bufferTime;
// Keep our own variables to make sure buffering live data does not interfere with VoD pages loading
TS::Packet packet;
TS::Stream tsStream;
@ -1136,19 +1101,25 @@ namespace Mist{
tsStream.initializeMetadata(meta, tmpTrackId, packetId);
idx = M.trackIDToIndex(packetId, getpid());
}
playlistMapping[currentPlaylist].tracks[idx] = true;
headerPack.getString("data", data, dataLen);
// keyframe data exists, so always add 19 bytes keyframedata.
uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0;
VERYHIGH_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx);
bufferLivePacket(packetTime, packOffset, idx, data, dataLen, segmentIndex + 1, headerPack.hasMember("keyframe"));
bufferLivePacket(packetTime, packOffset, idx, data, dataLen, curList.at(segmentIndex).bytePos, headerPack.hasMember("keyframe"));
if (isInitialRun){
pageCounter[idx][getCurrentLivePage(idx)] = curTimeout;
}else{
pageCounter[idx][getCurrentLivePage(idx)] = Util::bootSecs();
}
tsStream.getEarliestPacket(headerPack);
}
}
// No packets available, so read the next TS packet if available
if (segDowner.readNext()){
packet.FromPointer(segDowner.packetPtr);
tsStream.parse(packet, segmentIndex + 1);
tsStream.parse(packet, curList.at(segmentIndex).bytePos);
}
}
// get last packets
@ -1168,24 +1139,61 @@ namespace Mist{
tsStream.initializeMetadata(meta, tmpTrackId, packetId);
idx = M.trackIDToIndex(packetId, getpid());
}
playlistMapping[currentPlaylist].tracks[idx] = true;
headerPack.getString("data", data, dataLen);
// keyframe data exists, so always add 19 bytes keyframedata.
uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0;
VERYHIGH_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx);
bufferLivePacket(packetTime, packOffset, idx, data, dataLen, segmentIndex + 1, headerPack.hasMember("keyframe"));
bufferLivePacket(packetTime, packOffset, idx, data, dataLen, curList.at(segmentIndex).bytePos, headerPack.hasMember("keyframe"));
if (isInitialRun){
pageCounter[idx][getCurrentLivePage(idx)] = curTimeout;
}else{
pageCounter[idx][getCurrentLivePage(idx)] = Util::bootSecs();
}
tsStream.getEarliestPacket(headerPack);
}
return true;
}
/// \brief Override userLeadOut to buffer new data as live packets
void inputHLS::userLeadOut(){
Input::userLeadOut();
if (!isLiveDVR){
void inputHLS::streamMainLoop(){
parseLivePoint();
}
// Removes any metadata which is no longer and the playlist or buffered in memory
void inputHLS::updateMeta(){
// EVENT and VOD type playlists should never segments disappear from the start
// Only LIVE (sliding-window) type playlists should execute updateMeta()
if (streamIsVOD || !streamIsLive){
return;
}
for (std::map<size_t, bool>::iterator trackIdx = playlistMapping[currentPlaylist].tracks.begin();
trackIdx != playlistMapping[currentPlaylist].tracks.end(); trackIdx++){
// Calc after how many MS segments are no longer part of the buffer window
uint64_t bufferTime = config->getInteger("pagetimeout");
if (config->hasOption("bufferTime")){
bufferTime = config->getInteger("bufferTime") / 1000;
}
// Remove keys which are not requestable anymore
while (true) {
DTSC::Keys keys = M.getKeys(trackIdx->first);
// Stop if the earliest key is still in the playlist
if (listEntries[currentPlaylist].front().bytePos < keys.getBpos(keys.getFirstValid())){
break;
}
// Stop if earliest key is still in the buffer window
if (listEntries[currentPlaylist].back().timestamp - listEntries[currentPlaylist].front().timestamp < bufferTime){
break;
}
// First key could still be in memory, but is no longer seekable: drop it
HIGH_MSG("Removing key %lu @%lu ms on track %lu from metadata", M.getKeys(trackIdx->first).getFirstValid(), M.getFirstms(trackIdx->first), trackIdx->first);
meta.removeFirstKey(trackIdx->first);
}
}
}
void inputHLS::parseLivePoint(){
uint64_t maxTime = Util::bootMS() + 500;
// Update all playlists to make sure listEntries contains all live segments
for (std::map<uint64_t, Playlist>::iterator pListIt = playlistMapping.begin();
@ -1194,19 +1202,59 @@ namespace Mist{
pListIt->second.reload();
}
currentPlaylist = pListIt->first;
if (listEntries[currentPlaylist].size() != parsedSegments[currentPlaylist]){
INFO_MSG("Current playlist has parsed %" PRIu64 "/%zu entries", parsedSegments[currentPlaylist], listEntries[currentPlaylist].size());
const uint64_t firstIdx = playlistMapping[currentPlaylist].firstIndex;
// If the segment counter decreases, reset counters and remove old segments from metadata
if (firstIdx < playlistMapping[currentPlaylist].lastSegment - listEntries[currentPlaylist].size()){
WARN_MSG("Segment counter for playlist %lu has decreased to %lu. Exiting to reset stream", currentPlaylist, firstIdx);
config->is_active = false;
Util::logExitReason(ER_FORMAT_SPECIFIC, "Segment counter decreased. Exiting to reset stream");
return;
}
for(uint64_t entryIt = parsedSegments[currentPlaylist]; entryIt < listEntries[currentPlaylist].size(); entryIt++){
INFO_MSG("Adding entry #%" PRIu64 " as live data", entryIt+1);
// Remove segments from listEntries as soon as it is no longer requestable
{
tthread::lock_guard<tthread::mutex> guard(entryMutex);
while (listEntries[currentPlaylist].front().bytePos < firstIdx + 1){
MEDIUM_MSG("Segment #%lu no longer in the input playlist", firstIdx + 1);
listEntries[currentPlaylist].pop_front();
}
}
// Unload memory pages which are outside of the buffer window and not recently loaded
removeUnused();
// Remove meta info for expired keys
updateMeta();
// Check for new segments
if (listEntries[currentPlaylist].size() != parsedSegments[currentPlaylist] - firstIdx){
INFO_MSG("Playlist #%lu has parsed %" PRIu64 "/%zu entries. Parsing new segments...", currentPlaylist, parsedSegments[currentPlaylist] - firstIdx, listEntries[currentPlaylist].size());
}else if (isInitialRun){
isInitialRun = false;
}
if (parsedSegments[currentPlaylist] < firstIdx){
WARN_MSG("Skipping from segment #%lu to segment #%lu since we've fallen behind", parsedSegments[currentPlaylist], firstIdx);
parsedSegments[currentPlaylist] = firstIdx;
}
for(uint64_t entryIt = parsedSegments[currentPlaylist] - firstIdx; entryIt < listEntries[currentPlaylist].size(); entryIt++){
MEDIUM_MSG("Adding segment #%" PRIu64 " as live data", firstIdx + entryIt + 1);
if (parseSegmentAsLive(entryIt)){
parsedSegments[currentPlaylist]++;
parsedSegments[currentPlaylist] = firstIdx + entryIt + 1;
}
if (Util::bootMS() > maxTime){return;}
}
}
}
/// \brief Override userLeadOut to buffer new data as live packets
void inputHLS::userLeadOut(){
Input::userLeadOut();
if (streamIsLive){
parseLivePoint();
}
}
bool inputHLS::openStreamSource(){return true;}
void inputHLS::getNext(size_t idx){
@ -1242,7 +1290,12 @@ namespace Mist{
return;
}
uint64_t packetTime = getPacketTime(thisPacket.getTime(), tid, currentPlaylist, nUTC);
uint64_t packetTime = thisPacket.getTime();
if (listEntries[currentPlaylist].at(currentIndex).timeOffset){
packetTime += listEntries[currentPlaylist].at(currentIndex).timeOffset;
}else{
packetTime = getPacketTime(thisPacket.getTime(), tid, currentPlaylist, nUTC);
}
INSANE_MSG("Packet %" PRIu32 "@%" PRIu64 "ms -> %" PRIu64 "ms", tid, thisPacket.getTime(), packetTime);
// overwrite trackId on success
Bit::htobl(thisPacket.getData() + 8, tid);
@ -1255,7 +1308,7 @@ namespace Mist{
// No? Let's read some more data and check again.
if (!segDowner.atEnd() && segDowner.readNext()){
tsBuf.FromPointer(segDowner.packetPtr);
tsStream.parse(tsBuf, streamIsLive && !isLiveDVR ? 0 : currentIndex + 1);
tsStream.parse(tsBuf, listEntries[currentPlaylist].at(currentIndex).bytePos);
continue; // check again
}
@ -1319,19 +1372,19 @@ namespace Mist{
DTSC::Keys keys(M.keys(idx));
for (size_t i = keys.getFirstValid(); i < keys.getEndValid(); i++){
if (keys.getTime(i) > seekTime){
VERYHIGH_MSG("Found elapsed key with a time of %" PRIu64 " ms at playlist index %zu while seeking", keys.getTime(i), keys.getBpos(i)-1);
VERYHIGH_MSG("Found elapsed key with a time of %" PRIu64 " ms. Using playlist index %zu to match requested time %lu", keys.getTime(i), plistEntry, seekTime);
break;
}
VERYHIGH_MSG("Found valid key with a time of %" PRIu64 " ms at playlist index %zu while seeking", keys.getTime(i), keys.getBpos(i)-1);
plistEntry = keys.getBpos(i);
// Keys can still be accessible in memory. Skip any segments we cannot seek to in the playlist
if (keys.getBpos(i) <= playlistMapping[currentPlaylist].firstIndex){
INSANE_MSG("Skipping segment #%lu (key %lu @ %lu ms) for seeking, as it is no longer available in the playlist", keys.getBpos(i) - 1, i, keys.getTime(i));
continue;
}
plistEntry = keys.getBpos(i) - 1 - playlistMapping[currentPlaylist].firstIndex;
INSANE_MSG("Found valid key with a time of %" PRIu64 " ms at playlist index %zu while seeking", keys.getTime(i), plistEntry);
}
if (plistEntry < 1){
WARN_MSG("attempted to seek outside the file");
return;
}
currentIndex = plistEntry - 1;
currentIndex = plistEntry;
currentPlaylist = getMappedTrackPlaylist(trackId);
VERYHIGH_MSG("Seeking to index %zu on playlist %" PRIu64, currentIndex, currentPlaylist);
@ -1362,7 +1415,7 @@ namespace Mist{
while (!tsStream.hasPacketOnEachTrack() && !segDowner.atEnd()){
if (!segDowner.readNext()){break;}
tsBuffer.FromPointer(segDowner.packetPtr);
tsStream.parse(tsBuffer, streamIsLive && !isLiveDVR ? 0 : plistEntry);
tsStream.parse(tsBuffer, listEntries[currentPlaylist].at(currentIndex).bytePos);
}
}
@ -1446,15 +1499,6 @@ namespace Mist{
return packetId;
}
size_t inputHLS::getEntryId(uint32_t playlistId, uint64_t bytePos){
if (bytePos == 0){return 0;}
tthread::lock_guard<tthread::mutex> guard(entryMutex);
for (int i = 0; i < listEntries[playlistId].size(); i++){
if (listEntries[playlistId].at(i).bytePos > bytePos){return i - 1;}
}
return listEntries[playlistId].size() - 1;
}
uint64_t inputHLS::getOriginalTrackId(uint32_t playlistId, uint32_t id){
return pidMapping[(((uint64_t)playlistId) << 32) + id];
}
@ -1483,11 +1527,6 @@ namespace Mist{
return lastOut;
}
/// \brief Sets parsedSegments for all playlists, specifying how many segments
/// have already been parsed. Additional segments can then be parsed as live data
void inputHLS::setParsedSegments(){
}
/// Parses the main playlist, possibly containing variants.
bool inputHLS::initPlaylist(const std::string &uri, bool fullInit){
// Used to set zUTC, in case the first EXT-X-PROGRAM-DATE-TIME does not appear before the first segment
@ -1554,8 +1593,12 @@ namespace Mist{
// skip empty lines in the playlist
continue;
}
if (line.compare(0, 26, "#EXT-X-PLAYLIST-TYPE:EVENT") == 0){isLiveDVR = true;}
if (line.compare(0, 14, "#EXT-X-ENDLIST") == 0){isLiveDVR = false;}
if (line.compare(0, 14, "#EXT-X-ENDLIST") == 0){
streamIsLive = false;
streamIsVOD = true;
meta.setLive(false);
meta.setVod(true);
}
if (line.compare(0, 17, "#EXT-X-STREAM-INF") == 0){
// this is a variant playlist file.. next line is an uri to a playlist
// file
@ -1693,29 +1736,14 @@ namespace Mist{
playListEntries ntry;
// This scope limiter prevents the recursion down below from deadlocking us
{
// Switch to next file
currentIndex++;
tthread::lock_guard<tthread::mutex> guard(entryMutex);
std::deque<playListEntries> &curList = listEntries[currentPlaylist];
INSANE_MSG("Current playlist contains %zu entries. Current index is %zu in playlist %" PRIu64, curList.size(), currentIndex, currentPlaylist);
if (!curList.size()){
INFO_MSG("Current playlist contains %zu entries. Current index is %zu in playlist %" PRIu64, curList.size(), currentIndex, currentPlaylist);
if (curList.size() <= currentIndex){
if (streamIsLive){
INFO_MSG("Reached last entry in playlist %" PRIu64 "; waiting for more segments", currentPlaylist);
if (streamIsLive || isLiveDVR){Util::wait(500);}
return false;
}
if (!streamIsLive || isLiveDVR){
// VoD advances the index by one and attempts to read
// The playlist is not altered in this case, since we may need to seek back later
currentIndex++;
if (curList.size() - 1 < currentIndex){
INFO_MSG("Reached last entry");
return false;
}
ntry = curList[currentIndex];
}else{
// Live does not use the currentIndex, but simply takes the first segment
// That segment is then removed from the playlist so we don't read it again - live streams can't seek anyway
ntry = *curList.begin();
curList.pop_front();
if (Util::bootSecs() < ntry.timestamp){
VERYHIGH_MSG("Slowing down to realtime...");
while (Util::bootSecs() < ntry.timestamp){
@ -1723,7 +1751,12 @@ namespace Mist{
Util::wait(250);
}
}
}else{
INFO_MSG("Reached last entry in playlist %" PRIu64, currentPlaylist);
}
return false;
}
ntry = curList[currentIndex];
}
if (!segDowner.loadSegment(ntry)){
@ -1769,7 +1802,7 @@ namespace Mist{
}
void inputHLS::finish(){
if (isLiveDVR){
if (!streamIsVOD){ //< Already generated from readHeader
INFO_MSG("Writing updated header to disk");
injectLocalVars();
M.toFile(HTTP::localURIResolver().link(config->getString("input") + ".dtsh").getUrl());
@ -1778,7 +1811,7 @@ namespace Mist{
}
void inputHLS::checkHeaderTimes(const HTTP::URL & streamFile){
if (isLiveDVR){return;}
if (streamIsLive){return;} //< Since the playlist will likely be newer than the DTSH for live-dvr
Input::checkHeaderTimes(streamFile);
}

View file

@ -16,9 +16,8 @@
namespace Mist{
enum PlaylistType{VOD, LIVE, EVENT};
extern bool streamIsLive;
extern bool streamIsLive; //< Playlist can be sliding window or get new segments appended
extern bool streamIsVOD; //< Playlist segments do not disappear
extern uint32_t globalWaitTime; // largest waitTime for any playlist we're loading - used to update minKeepAway
void parseKey(std::string key, char *newKey, unsigned int len);
@ -80,7 +79,7 @@ namespace Mist{
Playlist(const std::string &uriSrc = "");
bool isUrl() const;
bool reload();
void addEntry(const std::string & absolute_filename, const std::string &filename, float duration, uint64_t &totalBytes,
void addEntry(const std::string & absolute_filename, const std::string &filename, float duration, uint64_t &bpos,
const std::string &key, const std::string &keyIV);
bool isSupportedFile(const std::string filename);
@ -93,15 +92,16 @@ namespace Mist{
uint32_t id;
bool playlistEnd;
int noChangeCount;
uint64_t lastFileIndex;
uint64_t waitTime;
PlaylistType playlistType;
uint64_t lastTimestamp;
uint64_t startTime;
uint64_t nextUTC; ///< If non-zero, the UTC timestamp of the next segment on this playlist
char keyAES[16];
std::map<std::string, std::string> keys;
uint64_t firstIndex; //< the index of the first segment in the playlist
uint64_t lastSegment;
std::map<size_t, bool> tracks;
};
void playlistRunner(void *ptr);
@ -110,7 +110,7 @@ namespace Mist{
public:
inputHLS(Util::Config *cfg);
~inputHLS();
bool needsLock();
bool needsLock(){return !config->getBool("realtime");}
bool openStreamSource();
bool callback();
@ -119,7 +119,6 @@ namespace Mist{
uint64_t nUTC; ///< Next packet timestamp in UTC unix time millis
int64_t streamOffset; ///< bootMsOffset we need to set once we have parsed the header
unsigned int startTime;
PlaylistType playlistType;
SegmentDownloader segDowner;
int version;
int targetDuration;
@ -145,14 +144,12 @@ namespace Mist{
// Used to map packetId of packets in pidMapping
int pidCounter;
/// HLS live VoD stream, set if: #EXT-X-PLAYLIST-TYPE:EVENT
bool isLiveDVR;
// Override userLeadOut to buffer new data as live packets
void userLeadOut();
// Removes any metadata which is no longer and the playlist or buffered in memory
void updateMeta();
/// Tries to add as much live packets from a TS file at the given location
bool parseSegmentAsLive(uint64_t segmentIndex);
// Updates parsedSegmentIndex for all playlists
void setParsedSegments();
// index of last playlist entry finished parsing
long previousSegmentIndex;
@ -174,16 +171,19 @@ namespace Mist{
bool readNextFile();
void parseStreamHeader();
void parseLivePoint();
void streamMainLoop();
uint32_t getMappedTrackId(uint64_t id);
uint32_t getMappedTrackPlaylist(uint64_t id);
uint64_t getOriginalTrackId(uint32_t playlistId, uint32_t id);
uint64_t getPacketTime(uint64_t packetTime, uint64_t tid, uint64_t currentPlaylist, uint64_t nUTC = 0);
uint64_t getPacketID(uint64_t currentPlaylist, uint64_t trackId);
size_t getEntryId(uint32_t playlistId, uint64_t bytePos);
virtual void finish();
void injectLocalVars();
virtual void checkHeaderTimes(const HTTP::URL & streamFile);
// Used to immediately mark pages for removal when we're bursting through segments on initial boot
bool isInitialRun;
};
}// namespace Mist

View file

@ -107,9 +107,9 @@ namespace Mist{
return true;
}
/// Checks whether a given page is currently being written to
/// \return True if the page is the current live page, and thus not safe to remove
bool InOutBase::isCurrentLivePage(size_t idx, uint32_t pageNumber){
/// Checks whether a given page was recently being written to
/// \return True if the page is in the current live window, and thus not safe to remove
bool InOutBase::isRecentLivePage(size_t idx, uint32_t pageNumber, uint64_t maxAge){
// Base case: for nonlive situations no new data will be added
if (!M.getLive()){
return false;
@ -118,6 +118,12 @@ namespace Mist{
if (curPageNum[idx] && curPageNum[idx] <= pageNumber){
return true;
}
// Compare last timestamp on the track with the time of the first key of the page
uint64_t lastMs = meta.getNowms(idx);
uint64_t thisTime = meta.getTimeForKeyIndex(idx, pageNumber);
if (lastMs - thisTime < maxAge) {
return true;
}
// If there is no set curPageNum we are definitely not writing to it
return false;
}
@ -166,7 +172,32 @@ namespace Mist{
IPC::releasePage(pageName);
#endif
toErase.master = true;
// Remove the page from the tracks index page
// Update the page on the tracks index page if needed
uint64_t firstKeyNum = tPages.getInt("firstkey", pageIdx);
uint64_t keyCount = tPages.getInt("keycount", pageIdx);
uint64_t partCount = tPages.getInt("parts", pageIdx);
uint64_t newFirstKey = M.getKeys(idx).getFirstValid();
if (firstKeyNum + keyCount <= newFirstKey){
INFO_MSG("Page %" PRIu64 " track %zu has expired during the time it was kept cached in memory (contains up to key %lu, but the earliest key is %lu). Removing it now", firstKeyNum, idx, firstKeyNum + keyCount, newFirstKey);
tPages.setInt("keycount", 0, pageIdx); //< Force removal by having avail and keycount both 0
}else if (firstKeyNum < newFirstKey){
uint64_t newPartCount = 0;
DTSC::Keys keys = M.getKeys(idx);
for (uint32_t i = newFirstKey; i < firstKeyNum + keyCount; i++){
newPartCount += keys.getParts(i);
}
MEDIUM_MSG("Adjusting meta info for page %lu track %lu before unloading it. First key %lu -> %lu. Key count %lu -> %lu. Part count %lu -> %lu", firstKeyNum, idx, firstKeyNum, newFirstKey, keyCount, keyCount - (newFirstKey - firstKeyNum), partCount, newPartCount);
tPages.setInt("keycount", keyCount - (newFirstKey - firstKeyNum), pageIdx);
tPages.setInt("parts", newPartCount, pageIdx);
tPages.setInt("firstkey", newFirstKey, pageIdx);
}
// Delete pages from the tracks index page that will never contain any more
for (uint32_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){
if (tPages.getInt("keycount", i) || tPages.getInt("avail", i)){
break;
}
tPages.deleteRecords(1);
}
// Leaving scope here, the page will now be destroyed
}
@ -188,8 +219,7 @@ namespace Mist{
uint64_t pageNum = tPages.getInt("firstkey", i);
if (pageNum > keyNum) continue;
uint64_t keyCount = tPages.getInt("keycount", i);
if (pageNum + keyCount - 1 < keyNum) continue;
if (keyCount && pageNum + keyCount - 1 < keyNum) continue;
if (!keyCount || pageNum + keyCount - 1 < keyNum) continue;
uint64_t avail = tPages.getInt("avail", i);
return avail ? pageNum : INVALID_KEY_NUM;
}
@ -445,13 +475,22 @@ namespace Mist{
if ((tPages.getEndPos() - tPages.getDeleted()) >= tPages.getRCount()){
aMeta.resizeTrack(packTrack, aMeta.fragments(packTrack).getRCount(), aMeta.keys(packTrack).getRCount(), aMeta.parts(packTrack).getRCount(), tPages.getRCount() * 2, "not enough pages");
}
// Finalize part count of the previous live page
uint64_t newPartCount = 0;
DTSC::Keys keys = M.getKeys(packTrack);
uint64_t lastKey = tPages.getInt("firstkey", curPage) + tPages.getInt("keycount", curPage);
for (uint32_t i = tPages.getInt("firstkey", curPage); i < lastKey; i++){
newPartCount += keys.getParts(i);
}
tPages.setInt("parts", newPartCount, curPage);
curPage = endPage;
tPages.setInt("firstkey", curPageNum[packTrack], endPage);
tPages.setInt("firsttime", packTime, endPage);
tPages.setInt("size", DEFAULT_DATA_PAGE_SIZE, endPage);
tPages.setInt("keycount", 0, endPage);
tPages.setInt("avail", 0, endPage);
tPages.setInt("parts", 0, endPage);
tPages.setInt("lastkeytime", 0, endPage);
tPages.addRecords(1);
if (livePage[packTrack]){bufferFinalize(packTrack, livePage[packTrack]);}
DONTEVEN_MSG("Opening new page #%zu to track %" PRIu32, curPageNum[packTrack], packTrack);

View file

@ -22,7 +22,7 @@ namespace Mist{
bool bufferStart(size_t idx, uint32_t pageNumber, IPC::sharedPage & page, DTSC::Meta & aMeta);
void bufferFinalize(size_t idx, IPC::sharedPage & page);
void liveFinalize(size_t idx);
bool isCurrentLivePage(size_t idx, uint32_t pageNumber);
bool isRecentLivePage(size_t idx, uint32_t pageNumber, uint64_t maxAge);
void bufferRemove(size_t idx, uint32_t pageNumber, uint32_t pageIdx = INVALID_KEY_NUM);
void bufferLivePacket(const DTSC::Packet &packet);
@ -51,6 +51,13 @@ namespace Mist{
std::map<size_t, Comms::Users> userSelect;
size_t getCurrentLivePage(uint32_t trackIdx){
if (!curPageNum.count(trackIdx)){
return INVALID_KEY_NUM;
}
return curPageNum[trackIdx];
};
private:
std::map<uint32_t, IPC::sharedPage> livePage;
std::map<uint32_t, size_t> curPageNum;