diff --git a/lib/defines.h b/lib/defines.h index 9292a7b0..2d19340d 100644 --- a/lib/defines.h +++ b/lib/defines.h @@ -163,13 +163,10 @@ static inline void show_stackframe(){} // Default values, these will scale up and down when needed, and are mainly used as starting values. #define DEFAULT_TRACK_COUNT 100 -#define DEFAULT_FRAGMENT_COUNT 2000 -#define DEFAULT_KEY_COUNT \ - 3 * DEFAULT_FRAGMENT_COUNT // A highest average of 5 keys / fragment is assumed -#define DEFAULT_PART_COUNT \ - 400 * DEFAULT_KEY_COUNT // A highest average of 500 parts / key is - // assumed -#define DEFAULT_PAGE_COUNT DEFAULT_KEY_COUNT // Assume every page is a key to ensure enough space +#define DEFAULT_FRAGMENT_COUNT 60 +#define DEFAULT_KEY_COUNT 60 +#define DEFAULT_PART_COUNT 30 * DEFAULT_KEY_COUNT +#define DEFAULT_PAGE_COUNT 10 #define DEFAULT_FRAGMENT_DURATION 1900 diff --git a/lib/dtsc.cpp b/lib/dtsc.cpp index c7de596e..4baaf409 100644 --- a/lib/dtsc.cpp +++ b/lib/dtsc.cpp @@ -5,6 +5,7 @@ #include "defines.h" #include "dtsc.h" #include "encode.h" +#include "lib/shared_memory.h" #include //for htonl/ntohl #include #include @@ -1033,7 +1034,6 @@ namespace DTSC{ //Ok, we have data, let's parse it, too. Track &s = tracks[tIdx]; - s.fragments.addRecords(fragCount); uint64_t *vals = (uint64_t *)malloc(4 * fragCount * sizeof(uint64_t)); for (int i = 0; i < fragCount; i++){ char *ptr = fragStor + (i * DTSH_FRAGMENT_SIZE); @@ -1046,9 +1046,9 @@ namespace DTSC{ s.fragments.setInts("keys", vals + fragCount, fragCount); s.fragments.setInts("firstkey", vals + (2 * fragCount), fragCount); s.fragments.setInts("size", vals + (3 * fragCount), fragCount); + s.fragments.addRecords(fragCount); vals = (uint64_t *)realloc(vals, 7 * keyCount * sizeof(uint64_t)); - s.keys.addRecords(keyCount); uint64_t totalPartCount = 0; for (int i = 0; i < keyCount; i++){ char *ptr = keyStor + (i * DTSH_KEY_SIZE); @@ -1068,9 +1068,9 @@ namespace DTSC{ s.keys.setInts("time", vals + (4 * keyCount), keyCount); s.keys.setInts("size", vals + (5 * keyCount), keyCount); s.keys.setInts("firstpart", vals + (6 * keyCount), keyCount); + s.keys.addRecords(keyCount); vals = (uint64_t *)realloc(vals, 3 * partCount * sizeof(uint64_t)); - s.parts.addRecords(partCount); for (int i = 0; i < partCount; i++){ char *ptr = partStor + (i * DTSH_PART_SIZE); vals[i] = Bit::btoh24(ptr); @@ -1080,6 +1080,7 @@ namespace DTSC{ s.parts.setInts("size", vals, partCount); s.parts.setInts("duration", vals + partCount, partCount); s.parts.setInts("offset", vals + (2 * partCount), partCount); + s.parts.addRecords(partCount); free(vals); } @@ -1253,6 +1254,98 @@ namespace DTSC{ } } + /// Reloads shared memory pages that are marked as needing an update, if any + /// Returns true if a reload happened + bool Meta::reloadReplacedPagesIfNeeded(){ + if (isMemBuf){return false;}//Only for shm-backed metadata + if (!stream.isReady() || !stream.getPointer("tracks")){ + INFO_MSG("No track pointer, not refreshing."); + return false; + } + char pageName[NAME_BUFFER_SIZE]; + + if (stream.isReload()){ + INFO_MSG("Reloading entire metadata"); + streamPage.close(); + snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_META, streamName.c_str()); + streamPage.init(pageName, 0, false, true); + if (!streamPage.mapped){ + INFO_MSG("Page %s not found", pageName); + return true; + } + stream = Util::RelAccX(streamPage.mapped, true); + tM.clear(); + tracks.clear(); + refresh(); + return true; + } + + bool ret = false; + for (size_t i = 0; i < trackList.getPresent(); i++){ + if (trackList.getInt("valid", i) == 0){continue;} + bool always_load = !tracks.count(i); + if (always_load || tracks[i].track.isReload()){ + ret = true; + Track &t = tracks[i]; + if (always_load){ + VERYHIGH_MSG("Loading track: %s", trackList.getPointer("page", i)); + }else{ + VERYHIGH_MSG("Reloading track: %s", trackList.getPointer("page", i)); + } + IPC::sharedPage &p = tM[i]; + p.init(trackList.getPointer("page", i), SHM_STREAM_TRACK_LEN, false, false); + if (!p.mapped){ + WARN_MSG("Failed to load page %s, retrying later", trackList.getPointer("page", i)); + tM.erase(i); + tracks.erase(i); + continue; + } + + t.track = Util::RelAccX(p.mapped, true); + t.parts = Util::RelAccX(t.track.getPointer("parts"), true); + t.keys = Util::RelAccX(t.track.getPointer("keys"), true); + t.fragments = Util::RelAccX(t.track.getPointer("fragments"), true); + t.pages = Util::RelAccX(t.track.getPointer("pages"), true); + + t.trackIdField = t.track.getFieldData("id"); + t.trackTypeField = t.track.getFieldData("type"); + t.trackCodecField = t.track.getFieldData("codec"); + t.trackFirstmsField = t.track.getFieldData("firstms"); + t.trackLastmsField = t.track.getFieldData("lastms"); + t.trackBpsField = t.track.getFieldData("bps"); + t.trackMaxbpsField = t.track.getFieldData("maxbps"); + t.trackLangField = t.track.getFieldData("lang"); + t.trackInitField = t.track.getFieldData("init"); + t.trackRateField = t.track.getFieldData("rate"); + t.trackSizeField = t.track.getFieldData("size"); + t.trackChannelsField = t.track.getFieldData("channels"); + t.trackWidthField = t.track.getFieldData("width"); + t.trackHeightField = t.track.getFieldData("height"); + t.trackFpksField = t.track.getFieldData("fpks"); + t.trackMissedFragsField = t.track.getFieldData("missedFrags"); + + t.partSizeField = t.parts.getFieldData("size"); + t.partDurationField = t.parts.getFieldData("duration"); + t.partOffsetField = t.parts.getFieldData("offset"); + + t.keyFirstPartField = t.keys.getFieldData("firstpart"); + t.keyBposField = t.keys.getFieldData("bpos"); + t.keyDurationField = t.keys.getFieldData("duration"); + t.keyNumberField = t.keys.getFieldData("number"); + t.keyPartsField = t.keys.getFieldData("parts"); + t.keyTimeField = t.keys.getFieldData("time"); + t.keySizeField = t.keys.getFieldData("size"); + + t.fragmentDurationField = t.fragments.getFieldData("duration"); + t.fragmentKeysField = t.fragments.getFieldData("keys"); + t.fragmentFirstKeyField = t.fragments.getFieldData("firstkey"); + t.fragmentSizeField = t.fragments.getFieldData("size"); + + } + } + return ret; + } + /// Merges in track information from a given DTSC::Meta object, optionally deleting missing tracks /// and optionally making hard copies of the original data. void Meta::merge(const DTSC::Meta &M, bool deleteTracks, bool copyData){ @@ -1342,22 +1435,33 @@ namespace DTSC{ t.fragments = Util::RelAccX(t.track.getPointer("fragments"), true); t.pages = Util::RelAccX(t.track.getPointer("pages"), true); - trackList.addRecords(1); trackList.setString(trackPageField, pageName, tNumber); trackList.setInt(trackPidField, getpid(), tNumber); trackList.setInt(trackSourceTidField, sourceTrack, tNumber); + trackList.addRecords(1); validateTrack(tNumber); return tNumber; } /// Resizes a given track to be able to hold the given amount of fragments, keys, parts and pages. /// Currently called exclusively from Meta::update(), to resize the internal structures. - void Meta::resizeTrack(size_t source, size_t fragCount, size_t keyCount, size_t partCount, size_t pageCount){ - INFO_MSG("Track %zu now with %zu frags, %zu keys, %zu parts, %zu pages", source, fragCount, - keyCount, partCount, pageCount); + void Meta::resizeTrack(size_t source, size_t fragCount, size_t keyCount, size_t partCount, size_t pageCount, const char * reason){ + char pageName[NAME_BUFFER_SIZE]; + IPC::semaphore resizeLock; + + if (!isMemBuf){ + snprintf(pageName, NAME_BUFFER_SIZE, "/" SHM_STREAM_TM, streamName.c_str(), getpid(), source); + resizeLock.open(pageName, O_CREAT | O_RDWR, ACCESSPERMS, 1); + resizeLock.wait(); + } + size_t pageSize = (isMemBuf ? sizeMemBuf[source] : tM[source].len); char *orig = (char *)malloc(pageSize); + if (!orig){ + FAIL_MSG("Failed to re-allocate memory for track %zu: %s", source, strerror(errno)); + return; + } memcpy(orig, (isMemBuf ? tMemBuf[source] : tM[source].mapped), pageSize); Track &t = tracks[source]; @@ -1373,17 +1477,23 @@ namespace DTSC{ free(tMemBuf[source]); tMemBuf.erase(source); tMemBuf[source] = (char *)malloc(newPageSize); + if (!tMemBuf[source]){ + FAIL_MSG("Failed to re-allocate memory for track %zu: %s", source, strerror(errno)); + resizeLock.unlink(); + return; + } sizeMemBuf[source] = newPageSize; memset(tMemBuf[source], 0, newPageSize); - INFO_MSG("Done re-allocating buffer %zu", source); - t.track = Util::RelAccX(tMemBuf[source], false); }else{ - char pageName[NAME_BUFFER_SIZE]; snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_TM, streamName.c_str(), getpid(), source); - INFO_MSG("Re-allocating page %s", pageName); tM[source].master = true; tM[source].init(pageName, newPageSize, true); + if (!tM[source].mapped){ + FAIL_MSG("Failed to re-allocate shared memory for track %zu: %s", source, strerror(errno)); + resizeLock.unlink(); + return; + } tM[source].master = false; t.track = Util::RelAccX(tM[source].mapped, false); @@ -1391,6 +1501,17 @@ namespace DTSC{ initializeTrack(t, fragCount, keyCount, partCount, pageCount); Util::RelAccX origAccess(orig); + Util::RelAccX origFragments(origAccess.getPointer("fragments")); + Util::RelAccX origKeys(origAccess.getPointer("keys")); + Util::RelAccX origParts(origAccess.getPointer("parts")); + Util::RelAccX origPages(origAccess.getPointer("pages")); + + MEDIUM_MSG("Track %zu resizing (reason: %s): frags %" PRIu32 "->%zu, keys %" PRIu32 "->%zu, parts %" PRIu32 "->%zu, pages %" PRIu32 "->%zu", + source, reason, + origFragments.getRCount(), fragCount, + origKeys.getRCount(), keyCount, + origParts.getRCount(), partCount, + origPages.getRCount(), pageCount); t.track.setInt(t.trackIdField, origAccess.getInt("id")); t.track.setString(t.trackTypeField, origAccess.getPointer("type")); @@ -1409,8 +1530,10 @@ namespace DTSC{ t.track.setInt(t.trackFpksField, origAccess.getInt("fpks")); t.track.setInt(t.trackMissedFragsField, origAccess.getInt("missedFrags")); - Util::RelAccX origParts(origAccess.getPointer("parts")); - t.parts.deleteRecords(origParts.getDeleted()); + t.parts.setEndPos(origParts.getEndPos()); + t.parts.setStartPos(origParts.getStartPos()); + t.parts.setDeleted(origParts.getDeleted()); + t.parts.setPresent(origParts.getPresent()); Util::FieldAccX origPartSizeAccX = origParts.getFieldAccX("size"); Util::FieldAccX origPartDurationAccX = origParts.getFieldAccX("duration"); @@ -1427,10 +1550,11 @@ namespace DTSC{ partDurationAccX.set(origPartDurationAccX.uint(i), i); partOffsetAccX.set(origPartOffsetAccX.uint(i), i); } - t.parts.addRecords(origParts.getPresent()); - Util::RelAccX origKeys(origAccess.getPointer("keys")); - t.keys.deleteRecords(origKeys.getDeleted()); + t.keys.setEndPos(origKeys.getEndPos()); + t.keys.setStartPos(origKeys.getStartPos()); + t.keys.setDeleted(origKeys.getDeleted()); + t.keys.setPresent(origKeys.getPresent()); Util::FieldAccX origKeyFirstpartAccX = origKeys.getFieldAccX("firstpart"); Util::FieldAccX origKeyBposAccX = origKeys.getFieldAccX("bpos"); @@ -1459,10 +1583,11 @@ namespace DTSC{ keyTimeAccX.set(origKeyTimeAccX.uint(i), i); keySizeAccX.set(origKeySizeAccX.uint(i), i); } - t.keys.addRecords(origKeys.getPresent()); - Util::RelAccX origFragments(origAccess.getPointer("fragments")); - t.fragments.deleteRecords(origFragments.getDeleted()); + t.fragments.setEndPos(origFragments.getEndPos()); + t.fragments.setStartPos(origFragments.getStartPos()); + t.fragments.setDeleted(origFragments.getDeleted()); + t.fragments.setPresent(origFragments.getPresent()); Util::FieldAccX origFragmentDurationAccX = origFragments.getFieldAccX("duration"); Util::FieldAccX origFragmentKeysAccX = origFragments.getFieldAccX("keys"); @@ -1482,10 +1607,11 @@ namespace DTSC{ fragmentFirstkeyAccX.set(origFragmentFirstkeyAccX.uint(i), i); fragmentSizeAccX.set(origFragmentSizeAccX.uint(i), i); } - t.fragments.addRecords(origFragments.getPresent()); - Util::RelAccX origPages(origAccess.getPointer("pages")); - t.pages.deleteRecords(origPages.getDeleted()); + t.pages.setEndPos(origPages.getEndPos()); + t.pages.setStartPos(origPages.getStartPos()); + t.pages.setDeleted(origPages.getDeleted()); + t.pages.setPresent(origPages.getPresent()); Util::FieldAccX origPageFirstkeyAccX = origPages.getFieldAccX("firstkey"); Util::FieldAccX origPageKeycountAccX = origPages.getFieldAccX("keycount"); @@ -1514,9 +1640,10 @@ namespace DTSC{ pageFirsttimeAccX.set(origPageFirsttimeAccX.uint(i), i); pageLastkeytimeAccX.set(origPageLastkeytimeAccX.uint(i), i); } - t.pages.addRecords(origPages.getPresent()); + t.track.setReady(); free(orig); + resizeLock.unlink(); } size_t Meta::addDelayedTrack(size_t fragCount, size_t keyCount, size_t partCount, size_t pageCount){ @@ -1568,10 +1695,11 @@ namespace DTSC{ t.track = Util::RelAccX(tM[tNumber].mapped, false); } initializeTrack(t, fragCount, keyCount, partCount, pageCount); - trackList.addRecords(1); + t.track.setReady(); trackList.setString(trackPageField, pageName, tNumber); trackList.setInt(trackPidField, getpid(), tNumber); trackList.setInt(trackSourceTidField, INVALID_TRACK_ID, tNumber); + trackList.addRecords(1); if (setValid){validateTrack(tNumber);} if (!isMemBuf){trackLock.post();} return tNumber; @@ -1602,7 +1730,6 @@ namespace DTSC{ t.track.addField("missedFrags", RAX_32UINT); t.track.setRCount(1); - t.track.setReady(); t.track.addRecords(1); t.parts = Util::RelAccX(t.track.getPointer("parts"), false); @@ -1971,9 +2098,12 @@ namespace DTSC{ std::set Meta::getValidTracks(bool skipEmpty) const{ std::set res; - if (!(*this) && !isMemBuf){return res;} + if (!(*this) && !isMemBuf){ + INFO_MSG("Shared metadata not ready yet - no tracks valid"); + return res; + } uint64_t firstValid = trackList.getDeleted(); - uint64_t beyondLast = firstValid + trackList.getPresent(); + uint64_t beyondLast = trackList.getEndPos(); for (size_t i = firstValid; i < beyondLast; i++){ if (trackList.getInt(trackValidField, i) & trackValidMask){res.insert(i);} if (trackList.getInt(trackSourceTidField, i) != INVALID_TRACK_ID && @@ -2008,7 +2138,7 @@ namespace DTSC{ } void Meta::removeEmptyTracks(){ - refresh(); + reloadReplacedPagesIfNeeded(); std::set validTracks = getValidTracks(); for (std::set::iterator it = validTracks.begin(); it != validTracks.end(); it++){ if (!tracks.at(*it).parts.getPresent()){removeTrack(*it);} @@ -2035,9 +2165,29 @@ namespace DTSC{ } /// Removes the first key from the memory structure and caches. - void Meta::removeFirstKey(size_t trackIdx){ + bool Meta::removeFirstKey(size_t trackIdx){ + + char pageName[NAME_BUFFER_SIZE]; + IPC::semaphore resizeLock; + + if (!isMemBuf){ + __pid_t trPid = trackList.getInt(trackPidField, trackIdx); + snprintf(pageName, NAME_BUFFER_SIZE, "/" SHM_STREAM_TM, streamName.c_str(), trPid, trackIdx); + resizeLock.open(pageName, O_CREAT | O_RDWR, ACCESSPERMS, 1); + if (!resizeLock.tryWait()){ + MEDIUM_MSG("Metadata is busy, delaying deletion of key a bit"); + resizeLock.close(); + return false; + } + if (reloadReplacedPagesIfNeeded()){ + MEDIUM_MSG("Metadata just got replaced, delaying deletion of key a bit"); + return false; + } + } Track &t = tracks[trackIdx]; + DONTEVEN_MSG("Deleting parts: %" PRIu64 "->%" PRIu64 " del'd, %zu pres", t.parts.getDeleted(), t.parts.getDeleted()+t.keys.getInt(t.keyPartsField, t.keys.getDeleted()), t.parts.getPresent()); t.parts.deleteRecords(t.keys.getInt(t.keyPartsField, t.keys.getDeleted())); + DONTEVEN_MSG("Deleting key: %" PRIu64 "->%" PRIu64 " del'd, %zu pres", t.keys.getDeleted(), t.keys.getDeleted()+1, t.keys.getPresent()); t.keys.deleteRecords(1); if (t.fragments.getInt(t.fragmentFirstKeyField, t.fragments.getDeleted()) < t.keys.getDeleted()){ t.fragments.deleteRecords(1); @@ -2055,6 +2205,8 @@ namespace DTSC{ t.pages.deleteRecords(1); } setFirstms(trackIdx, t.keys.getInt(t.keyTimeField, t.keys.getDeleted())); + if (resizeLock){resizeLock.unlink();} + return true; } ///\brief Updates a meta object given a DTSC::Packet with byte position override. @@ -2202,10 +2354,8 @@ namespace DTSC{ uint32_t newPartNum = t.parts.getEndPos(); if ((newPartNum - t.parts.getDeleted()) >= t.parts.getRCount()){ - resizeTrack(tNumber, t.fragments.getRCount(), t.keys.getRCount(), t.parts.getRCount() * 2, - t.keys.getRCount()); + resizeTrack(tNumber, t.fragments.getRCount(), t.keys.getRCount(), t.parts.getRCount() * 2, t.pages.getRCount(), "not enough parts"); } - t.parts.addRecords(1); t.parts.setInt(t.partSizeField, packDataSize, newPartNum); t.parts.setInt(t.partOffsetField, packOffset, newPartNum); if (newPartNum){ @@ -2214,17 +2364,15 @@ namespace DTSC{ }else{ t.parts.setInt(t.partDurationField, 0, newPartNum); } - t.track.setInt(t.trackLastmsField, packTime); + t.parts.addRecords(1); uint32_t newKeyNum = t.keys.getEndPos(); if (isKeyframe || newKeyNum == 0 || (getType(tNumber) != "video" && packTime >= AUDIO_KEY_INTERVAL && packTime - t.keys.getInt(t.keyTimeField, newKeyNum - 1) >= AUDIO_KEY_INTERVAL)){ if ((newKeyNum - t.keys.getDeleted()) >= t.keys.getRCount()){ - resizeTrack(tNumber, t.fragments.getRCount(), t.keys.getRCount() * 2, t.parts.getRCount(), - t.keys.getRCount() * 2); + resizeTrack(tNumber, t.fragments.getRCount(), t.keys.getRCount() * 2, t.parts.getRCount(), t.pages.getRCount(), "not enough keys"); } - t.keys.addRecords(1); t.keys.setInt(t.keyBposField, packBytePos, newKeyNum); t.keys.setInt(t.keyTimeField, packTime, newKeyNum); t.keys.setInt(t.keyPartsField, 0, newKeyNum); @@ -2242,6 +2390,7 @@ namespace DTSC{ }else{ t.keys.setInt(t.keyFirstPartField, 0, newKeyNum); } + t.keys.addRecords(1); if (packBytePos){t.track.setInt(t.trackFirstmsField, t.keys.getInt(t.keyTimeField, 0));} uint32_t newFragNum = t.fragments.getEndPos(); @@ -2250,10 +2399,8 @@ namespace DTSC{ (packTime - getMinimumFragmentDuration()) >= t.keys.getInt(t.keyTimeField, t.fragments.getInt(t.fragmentFirstKeyField, newFragNum - 1)))){ if ((newFragNum - t.fragments.getDeleted()) >= t.fragments.getRCount()){ - resizeTrack(tNumber, t.fragments.getRCount() * 2, t.keys.getRCount(), t.parts.getRCount(), - t.keys.getRCount()); + resizeTrack(tNumber, t.fragments.getRCount() * 2, t.keys.getRCount(), t.parts.getRCount(), t.pages.getRCount(), "not enough frags"); } - t.fragments.addRecords(1); if (newFragNum){ t.fragments.setInt(t.fragmentDurationField, packTime - t.keys.getInt(t.keyTimeField, t.fragments.getInt(t.fragmentFirstKeyField, @@ -2278,6 +2425,7 @@ namespace DTSC{ t.fragments.setInt(t.fragmentSizeField, 0, newFragNum); t.fragments.setInt(t.fragmentKeysField, 1, newFragNum); t.fragments.setInt(t.fragmentFirstKeyField, t.keys.getInt(t.keyNumberField, newKeyNum), newFragNum); + t.fragments.addRecords(1); }else{ t.fragments.setInt(t.fragmentKeysField, t.fragments.getInt(t.fragmentKeysField, newFragNum - 1) + 1, newFragNum - 1); @@ -2296,6 +2444,7 @@ namespace DTSC{ uint32_t lastFragNum = t.fragments.getEndPos() - 1; t.fragments.setInt(t.fragmentSizeField, t.fragments.getInt(t.fragmentSizeField, lastFragNum) + packDataSize, lastFragNum); + t.track.setInt(t.trackLastmsField, packTime); markUpdated(tNumber); } @@ -2408,8 +2557,8 @@ namespace DTSC{ memset(tMemBuf[i], 0, SHM_STREAM_TRACK_LEN); t.track = Util::RelAccX(tMemBuf[i], false); initializeTrack(t); - t.track.flowFrom(src.tracks.at(i).track); + t.track.setReady(); } } @@ -2434,6 +2583,7 @@ namespace DTSC{ t.track = Util::RelAccX(tM[i].mapped, false); initializeTrack(t); t.track.flowFrom(M.tracks[i].track); + t.track.setReady(); } } diff --git a/lib/dtsc.h b/lib/dtsc.h index d8b2b544..059d8acf 100644 --- a/lib/dtsc.h +++ b/lib/dtsc.h @@ -281,6 +281,7 @@ namespace DTSC{ void addTrackFrom(const DTSC::Scan &src); void refresh(); + bool reloadReplacedPagesIfNeeded(); operator bool() const; @@ -302,7 +303,7 @@ namespace DTSC{ size_t partCount = DEFAULT_PART_COUNT, size_t pageCount = DEFAULT_PAGE_COUNT, bool setValid = true); void resizeTrack(size_t source, size_t fragCount = DEFAULT_FRAGMENT_COUNT, size_t keyCount = DEFAULT_KEY_COUNT, - size_t partCount = DEFAULT_PART_COUNT, size_t pageCount = DEFAULT_PAGE_COUNT); + size_t partCount = DEFAULT_PART_COUNT, size_t pageCount = DEFAULT_PAGE_COUNT, const char * reason = ""); void initializeTrack(Track &t, size_t fragCount = DEFAULT_FRAGMENT_COUNT, size_t keyCount = DEFAULT_KEY_COUNT, size_t parCount = DEFAULT_PART_COUNT, size_t pageCount = DEFAULT_PAGE_COUNT); @@ -424,7 +425,7 @@ namespace DTSC{ void validateTrack(size_t trackIdx, uint8_t validType = TRACK_VALID_ALL); void removeEmptyTracks(); void removeTrack(size_t trackIdx); - void removeFirstKey(size_t trackIdx); + bool removeFirstKey(size_t trackIdx); size_t mainTrack() const; uint32_t biggestFragment(uint32_t idx = INVALID_TRACK_ID) const; diff --git a/lib/shared_memory.cpp b/lib/shared_memory.cpp index 1a6b76d2..bdffc9da 100644 --- a/lib/shared_memory.cpp +++ b/lib/shared_memory.cpp @@ -507,6 +507,14 @@ namespace IPC{ } return; } + if (handle > 0 && handle < 3){ + int tmpHandle = fcntl(handle, F_DUPFD, 3); + if (tmpHandle >= 3){ + DONTEVEN_MSG("Remapped handle for page %s from %d to %d!", name.c_str(), handle, tmpHandle); + ::close(handle); + handle = tmpHandle; + } + } if (master){ if (ftruncate(handle, len) < 0){ FAIL_MSG("truncate to %" PRIu64 " for page %s failed: %s", len, name.c_str(), strerror(errno)); diff --git a/lib/util.cpp b/lib/util.cpp index 4d1114d6..0dc5e5dd 100644 --- a/lib/util.cpp +++ b/lib/util.cpp @@ -832,7 +832,9 @@ namespace Util{ if (*hdrPresent >= amount){ *hdrPresent -= amount; // decrease records present }else{ + BACKTRACE; WARN_MSG("Depleting recordCount!"); + exit(1); *hdrPresent = 0; } } diff --git a/src/input/input.cpp b/src/input/input.cpp index 51a68a37..7db34a84 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -1063,6 +1063,9 @@ namespace Mist{ for (uint32_t j = 0; j < endKey; j++){ uint64_t keyTime = keys.getTime(j); if (newData){ + if ((tPages.getEndPos() - tPages.getDeleted()) >= tPages.getRCount()){ + meta.resizeTrack(*it, M.fragments(*it).getRCount(), M.keys(*it).getRCount(), M.parts(*it).getRCount(), tPages.getRCount() * 2, "not enough pages"); + } tPages.addRecords(1); ++pageNum; tPages.setInt("firsttime", keyTime, pageNum); diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index 850d46a5..eec4e5d9 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -366,8 +366,7 @@ namespace Mist{ } } // Alright, everything looks good, let's delete the key and possibly also fragment - meta.removeFirstKey(tid); - return true; + return meta.removeFirstKey(tid); } void inputBuffer::finish(){ @@ -399,7 +398,7 @@ namespace Mist{ curPageNum.erase(tid); INFO_MSG("Should remove track %zu", tid); - meta.refresh(); + meta.reloadReplacedPagesIfNeeded(); meta.removeTrack(tid); /*LTS-START*/ if (!M.getValidTracks().size()){ @@ -412,7 +411,7 @@ namespace Mist{ } void inputBuffer::removeUnused(){ - meta.refresh(); + meta.reloadReplacedPagesIfNeeded(); // first remove all tracks that have not been updated for too long bool changed = true; while (changed){ @@ -456,7 +455,7 @@ namespace Mist{ streamName.c_str(), i, type.c_str(), codec.c_str(), firstms / 1000, lastms / 1000, compareFirst / 1000, compareLast / 1000, bufferTime / 1000); } - meta.refresh(); + meta.reloadReplacedPagesIfNeeded(); removeTrack(i); changed = true; break; @@ -500,7 +499,7 @@ namespace Mist{ } void inputBuffer::userLeadIn(){ - meta.refresh(); + meta.reloadReplacedPagesIfNeeded(); /*LTS-START*/ // Reload the configuration to make sure we stay up to date with changes through the api if (Util::epoch() - lastReTime > 4){preRun();} @@ -529,7 +528,7 @@ namespace Mist{ void inputBuffer::userOnDisconnect(size_t id){ if (sourcePids.count(id)){ INFO_MSG("Disconnected track %zu", sourcePids[id]); - meta.refresh(); + meta.reloadReplacedPagesIfNeeded(); removeTrack(sourcePids[id]); sourcePids.erase(id); } diff --git a/src/io.cpp b/src/io.cpp index 8d6f56b1..8804f4b8 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -20,7 +20,7 @@ namespace Mist{ size_t InOutBase::getMainSelectedTrack(){ if (!userSelect.size()){return INVALID_TRACK_ID;} size_t bestSoFar = INVALID_TRACK_ID; - meta.refresh(); + meta.reloadReplacedPagesIfNeeded(); for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ if (meta.trackValid(it->first)){ if (meta.getType(it->first) == "video"){return it->first;} @@ -324,7 +324,7 @@ namespace Mist{ void InOutBase::bufferLivePacket(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData, size_t packDataSize, uint64_t packBytePos, bool isKeyframe){ - meta.refresh(); + meta.reloadReplacedPagesIfNeeded(); meta.setLive(); // Store the trackid for easier access @@ -368,28 +368,32 @@ namespace Mist{ // If there is no page, create it if (!endPage){ nextPageNum = 0; - tPages.addRecords(1); tPages.setInt("firstkey", 0, 0); tPages.setInt("firsttime", packTime, 0); tPages.setInt("size", DEFAULT_DATA_PAGE_SIZE, 0); tPages.setInt("keycount", 0, 0); tPages.setInt("avail", 0, 0); + tPages.addRecords(1); ++endPage; } uint64_t prevPageTime = tPages.getInt("firsttime", endPage - 1); // Compare on 8 mb boundary and target duration if (tPages.getInt("avail", endPage - 1) > FLIP_DATA_PAGE_SIZE || packTime - prevPageTime > FLIP_TARGET_DURATION){ + + if ((endPage - tPages.getDeleted()) >= tPages.getRCount()){ + meta.resizeTrack(packTrack, M.fragments(packTrack).getRCount(), M.keys(packTrack).getRCount(), M.parts(packTrack).getRCount(), tPages.getRCount() * 2, "not enough pages"); + } // Create the book keeping data for the new page nextPageNum = tPages.getInt("firstkey", endPage - 1) + tPages.getInt("keycount", endPage - 1); HIGH_MSG("Live page transition from %" PRIu32 ":%zu to %" PRIu32 ":%" PRIu32, packTrack, tPages.getInt("firstkey", endPage - 1), packTrack, nextPageNum); - tPages.addRecords(1); tPages.setInt("firstkey", nextPageNum, endPage); tPages.setInt("firsttime", packTime, endPage); tPages.setInt("size", DEFAULT_DATA_PAGE_SIZE, endPage); tPages.setInt("keycount", 0, endPage); tPages.setInt("avail", 0, endPage); + tPages.addRecords(1); ++endPage; } tPages.setInt("lastkeytime", packTime, endPage - 1); @@ -426,7 +430,7 @@ namespace Mist{ // Open the new page if (!bufferStart(packTrack, nextPageNum)){ // if this fails, return instantly without actually buffering the packet - WARN_MSG("Dropping packet %s:%" PRIu32 "@%" PRIu64, streamName.c_str(), packTrack, packTime); + WARN_MSG("Dropping packet for %s: (no page) %" PRIu32 "@%" PRIu64, streamName.c_str(), packTrack, packTime); return; } } diff --git a/src/output/output.cpp b/src/output/output.cpp index 84ffc10e..74818b94 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -276,7 +276,7 @@ namespace Mist{ // If a protocol does not support any codecs, we assume you know what you're doing if (!capa.isMember("codecs")){return true;} if (!isInitialized){initialize();} - meta.refresh(); + meta.reloadReplacedPagesIfNeeded(); if (getSupportedTracks().size()){ if (!userSelect.size()){selectDefaultTracks();} size_t mainTrack = getMainSelectedTrack(); @@ -374,7 +374,6 @@ namespace Mist{ meta.reInit(streamName, false); } if (!meta){return;} - meta.refresh(); isInitialized = true; statComm.reload(); stats(true); @@ -387,7 +386,7 @@ namespace Mist{ break; } Util::wait(500); - meta.refresh(); + meta.reloadReplacedPagesIfNeeded(); stats(); } } @@ -406,7 +405,7 @@ namespace Mist{ if (!isInitialized){return false;} } - meta.refresh(); + meta.reloadReplacedPagesIfNeeded(); bool autoSeek = buffer.size(); uint64_t seekTarget = currentTime(); @@ -533,7 +532,7 @@ namespace Mist{ WARN_MSG("Load for track %zu key %zu aborted - track does not exist", trackId, keyNum); return; } - if (!M.trackLoaded(trackId)){meta.refresh();} + if (!M.trackLoaded(trackId)){meta.reloadReplacedPagesIfNeeded();} DTSC::Keys keys(M.keys(trackId)); if (!keys.getValidCount()){ WARN_MSG("Load for track %zu key %zu aborted - track is empty", trackId, keyNum); @@ -710,7 +709,7 @@ namespace Mist{ userSelect.erase(tid); return false; } - if (!M.trackLoaded(tid)){meta.refresh();} + if (!M.trackLoaded(tid)){meta.reloadReplacedPagesIfNeeded();} if (!userSelect.count(tid) || !userSelect[tid]){ WARN_MSG("Aborting seek to %" PRIu64 "ms in track %zu: user select failure (%s)", pos, tid, userSelect.count(tid)?"not connected":"not selected"); userSelect.erase(tid); @@ -1496,6 +1495,7 @@ namespace Mist{ sortedPageInfo nxt = *(buffer.begin()); + if (meta.reloadReplacedPagesIfNeeded()){return false;} if (!M.getValidTracks().count(nxt.tid)){ dropTrack(nxt.tid, "disappeared from metadata"); return false; diff --git a/src/output/output_dtsc.cpp b/src/output/output_dtsc.cpp index 45ec0f36..3ee6e61f 100644 --- a/src/output/output_dtsc.cpp +++ b/src/output/output_dtsc.cpp @@ -242,7 +242,7 @@ namespace Mist{ std::string dataPacket = myConn.Received().remove(8 + rSize); DTSC::Packet metaPack(dataPacket.data(), dataPacket.size()); DTSC::Scan metaScan = metaPack.getScan(); - meta.refresh(); + meta.reloadReplacedPagesIfNeeded(); size_t prevTracks = meta.getValidTracks().size(); size_t tNum = metaScan.getMember("tracks").getSize(); @@ -256,7 +256,7 @@ namespace Mist{ HIGH_MSG("Already had track: %s", trk.asJSON().toString().c_str()); } } - meta.refresh(); + meta.reloadReplacedPagesIfNeeded(); std::stringstream rep; rep << "DTSC_HEAD parsed, we went from " << prevTracks << " to " << meta.getValidTracks().size() << " tracks. Bring on those data packets!"; sendOk(rep.str()); diff --git a/src/output/output_hls.cpp b/src/output/output_hls.cpp index ffc8e7f6..4f564f8f 100644 --- a/src/output/output_hls.cpp +++ b/src/output/output_hls.cpp @@ -7,7 +7,7 @@ namespace Mist{ bool OutHLS::isReadyForPlay(){ if (!isInitialized){initialize();} - meta.refresh(); + meta.reloadReplacedPagesIfNeeded(); if (!M.getValidTracks().size()){return false;} uint32_t mainTrack = M.mainTrack(); if (mainTrack == INVALID_TRACK_ID){return false;} diff --git a/src/output/output_http_internal.cpp b/src/output/output_http_internal.cpp index 8dba3285..5c699098 100644 --- a/src/output/output_http_internal.cpp +++ b/src/output/output_http_internal.cpp @@ -1073,7 +1073,7 @@ namespace Mist{ newState = streamStatus.mapped[0]; } - if (meta){meta.refresh();} + if (meta){meta.reloadReplacedPagesIfNeeded();} if (newState != prevState || (newState == STRMSTAT_READY && M.getValidTracks() != prevTracks)){ if (newState == STRMSTAT_READY){ reconnect();