Fixes for DTSC metadata handling
This commit is contained in:
parent
f862263354
commit
63acbb1ddb
12 changed files with 235 additions and 71 deletions
|
@ -163,13 +163,10 @@ static inline void show_stackframe(){}
|
||||||
|
|
||||||
// Default values, these will scale up and down when needed, and are mainly used as starting values.
|
// Default values, these will scale up and down when needed, and are mainly used as starting values.
|
||||||
#define DEFAULT_TRACK_COUNT 100
|
#define DEFAULT_TRACK_COUNT 100
|
||||||
#define DEFAULT_FRAGMENT_COUNT 2000
|
#define DEFAULT_FRAGMENT_COUNT 60
|
||||||
#define DEFAULT_KEY_COUNT \
|
#define DEFAULT_KEY_COUNT 60
|
||||||
3 * DEFAULT_FRAGMENT_COUNT // A highest average of 5 keys / fragment is assumed
|
#define DEFAULT_PART_COUNT 30 * DEFAULT_KEY_COUNT
|
||||||
#define DEFAULT_PART_COUNT \
|
#define DEFAULT_PAGE_COUNT 10
|
||||||
400 * DEFAULT_KEY_COUNT // A highest average of 500 parts / key is
|
|
||||||
// assumed
|
|
||||||
#define DEFAULT_PAGE_COUNT DEFAULT_KEY_COUNT // Assume every page is a key to ensure enough space
|
|
||||||
|
|
||||||
#define DEFAULT_FRAGMENT_DURATION 1900
|
#define DEFAULT_FRAGMENT_DURATION 1900
|
||||||
|
|
||||||
|
|
230
lib/dtsc.cpp
230
lib/dtsc.cpp
|
@ -5,6 +5,7 @@
|
||||||
#include "defines.h"
|
#include "defines.h"
|
||||||
#include "dtsc.h"
|
#include "dtsc.h"
|
||||||
#include "encode.h"
|
#include "encode.h"
|
||||||
|
#include "lib/shared_memory.h"
|
||||||
#include <arpa/inet.h> //for htonl/ntohl
|
#include <arpa/inet.h> //for htonl/ntohl
|
||||||
#include <cstdlib>
|
#include <cstdlib>
|
||||||
#include <cstring>
|
#include <cstring>
|
||||||
|
@ -1033,7 +1034,6 @@ namespace DTSC{
|
||||||
|
|
||||||
//Ok, we have data, let's parse it, too.
|
//Ok, we have data, let's parse it, too.
|
||||||
Track &s = tracks[tIdx];
|
Track &s = tracks[tIdx];
|
||||||
s.fragments.addRecords(fragCount);
|
|
||||||
uint64_t *vals = (uint64_t *)malloc(4 * fragCount * sizeof(uint64_t));
|
uint64_t *vals = (uint64_t *)malloc(4 * fragCount * sizeof(uint64_t));
|
||||||
for (int i = 0; i < fragCount; i++){
|
for (int i = 0; i < fragCount; i++){
|
||||||
char *ptr = fragStor + (i * DTSH_FRAGMENT_SIZE);
|
char *ptr = fragStor + (i * DTSH_FRAGMENT_SIZE);
|
||||||
|
@ -1046,9 +1046,9 @@ namespace DTSC{
|
||||||
s.fragments.setInts("keys", vals + fragCount, fragCount);
|
s.fragments.setInts("keys", vals + fragCount, fragCount);
|
||||||
s.fragments.setInts("firstkey", vals + (2 * fragCount), fragCount);
|
s.fragments.setInts("firstkey", vals + (2 * fragCount), fragCount);
|
||||||
s.fragments.setInts("size", vals + (3 * fragCount), fragCount);
|
s.fragments.setInts("size", vals + (3 * fragCount), fragCount);
|
||||||
|
s.fragments.addRecords(fragCount);
|
||||||
|
|
||||||
vals = (uint64_t *)realloc(vals, 7 * keyCount * sizeof(uint64_t));
|
vals = (uint64_t *)realloc(vals, 7 * keyCount * sizeof(uint64_t));
|
||||||
s.keys.addRecords(keyCount);
|
|
||||||
uint64_t totalPartCount = 0;
|
uint64_t totalPartCount = 0;
|
||||||
for (int i = 0; i < keyCount; i++){
|
for (int i = 0; i < keyCount; i++){
|
||||||
char *ptr = keyStor + (i * DTSH_KEY_SIZE);
|
char *ptr = keyStor + (i * DTSH_KEY_SIZE);
|
||||||
|
@ -1068,9 +1068,9 @@ namespace DTSC{
|
||||||
s.keys.setInts("time", vals + (4 * keyCount), keyCount);
|
s.keys.setInts("time", vals + (4 * keyCount), keyCount);
|
||||||
s.keys.setInts("size", vals + (5 * keyCount), keyCount);
|
s.keys.setInts("size", vals + (5 * keyCount), keyCount);
|
||||||
s.keys.setInts("firstpart", vals + (6 * keyCount), keyCount);
|
s.keys.setInts("firstpart", vals + (6 * keyCount), keyCount);
|
||||||
|
s.keys.addRecords(keyCount);
|
||||||
|
|
||||||
vals = (uint64_t *)realloc(vals, 3 * partCount * sizeof(uint64_t));
|
vals = (uint64_t *)realloc(vals, 3 * partCount * sizeof(uint64_t));
|
||||||
s.parts.addRecords(partCount);
|
|
||||||
for (int i = 0; i < partCount; i++){
|
for (int i = 0; i < partCount; i++){
|
||||||
char *ptr = partStor + (i * DTSH_PART_SIZE);
|
char *ptr = partStor + (i * DTSH_PART_SIZE);
|
||||||
vals[i] = Bit::btoh24(ptr);
|
vals[i] = Bit::btoh24(ptr);
|
||||||
|
@ -1080,6 +1080,7 @@ namespace DTSC{
|
||||||
s.parts.setInts("size", vals, partCount);
|
s.parts.setInts("size", vals, partCount);
|
||||||
s.parts.setInts("duration", vals + partCount, partCount);
|
s.parts.setInts("duration", vals + partCount, partCount);
|
||||||
s.parts.setInts("offset", vals + (2 * partCount), partCount);
|
s.parts.setInts("offset", vals + (2 * partCount), partCount);
|
||||||
|
s.parts.addRecords(partCount);
|
||||||
free(vals);
|
free(vals);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1253,6 +1254,98 @@ namespace DTSC{
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Reloads shared memory pages that are marked as needing an update, if any
|
||||||
|
/// Returns true if a reload happened
|
||||||
|
bool Meta::reloadReplacedPagesIfNeeded(){
|
||||||
|
if (isMemBuf){return false;}//Only for shm-backed metadata
|
||||||
|
if (!stream.isReady() || !stream.getPointer("tracks")){
|
||||||
|
INFO_MSG("No track pointer, not refreshing.");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
char pageName[NAME_BUFFER_SIZE];
|
||||||
|
|
||||||
|
if (stream.isReload()){
|
||||||
|
INFO_MSG("Reloading entire metadata");
|
||||||
|
streamPage.close();
|
||||||
|
snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_META, streamName.c_str());
|
||||||
|
streamPage.init(pageName, 0, false, true);
|
||||||
|
if (!streamPage.mapped){
|
||||||
|
INFO_MSG("Page %s not found", pageName);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
stream = Util::RelAccX(streamPage.mapped, true);
|
||||||
|
tM.clear();
|
||||||
|
tracks.clear();
|
||||||
|
refresh();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ret = false;
|
||||||
|
for (size_t i = 0; i < trackList.getPresent(); i++){
|
||||||
|
if (trackList.getInt("valid", i) == 0){continue;}
|
||||||
|
bool always_load = !tracks.count(i);
|
||||||
|
if (always_load || tracks[i].track.isReload()){
|
||||||
|
ret = true;
|
||||||
|
Track &t = tracks[i];
|
||||||
|
if (always_load){
|
||||||
|
VERYHIGH_MSG("Loading track: %s", trackList.getPointer("page", i));
|
||||||
|
}else{
|
||||||
|
VERYHIGH_MSG("Reloading track: %s", trackList.getPointer("page", i));
|
||||||
|
}
|
||||||
|
IPC::sharedPage &p = tM[i];
|
||||||
|
p.init(trackList.getPointer("page", i), SHM_STREAM_TRACK_LEN, false, false);
|
||||||
|
if (!p.mapped){
|
||||||
|
WARN_MSG("Failed to load page %s, retrying later", trackList.getPointer("page", i));
|
||||||
|
tM.erase(i);
|
||||||
|
tracks.erase(i);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
t.track = Util::RelAccX(p.mapped, true);
|
||||||
|
t.parts = Util::RelAccX(t.track.getPointer("parts"), true);
|
||||||
|
t.keys = Util::RelAccX(t.track.getPointer("keys"), true);
|
||||||
|
t.fragments = Util::RelAccX(t.track.getPointer("fragments"), true);
|
||||||
|
t.pages = Util::RelAccX(t.track.getPointer("pages"), true);
|
||||||
|
|
||||||
|
t.trackIdField = t.track.getFieldData("id");
|
||||||
|
t.trackTypeField = t.track.getFieldData("type");
|
||||||
|
t.trackCodecField = t.track.getFieldData("codec");
|
||||||
|
t.trackFirstmsField = t.track.getFieldData("firstms");
|
||||||
|
t.trackLastmsField = t.track.getFieldData("lastms");
|
||||||
|
t.trackBpsField = t.track.getFieldData("bps");
|
||||||
|
t.trackMaxbpsField = t.track.getFieldData("maxbps");
|
||||||
|
t.trackLangField = t.track.getFieldData("lang");
|
||||||
|
t.trackInitField = t.track.getFieldData("init");
|
||||||
|
t.trackRateField = t.track.getFieldData("rate");
|
||||||
|
t.trackSizeField = t.track.getFieldData("size");
|
||||||
|
t.trackChannelsField = t.track.getFieldData("channels");
|
||||||
|
t.trackWidthField = t.track.getFieldData("width");
|
||||||
|
t.trackHeightField = t.track.getFieldData("height");
|
||||||
|
t.trackFpksField = t.track.getFieldData("fpks");
|
||||||
|
t.trackMissedFragsField = t.track.getFieldData("missedFrags");
|
||||||
|
|
||||||
|
t.partSizeField = t.parts.getFieldData("size");
|
||||||
|
t.partDurationField = t.parts.getFieldData("duration");
|
||||||
|
t.partOffsetField = t.parts.getFieldData("offset");
|
||||||
|
|
||||||
|
t.keyFirstPartField = t.keys.getFieldData("firstpart");
|
||||||
|
t.keyBposField = t.keys.getFieldData("bpos");
|
||||||
|
t.keyDurationField = t.keys.getFieldData("duration");
|
||||||
|
t.keyNumberField = t.keys.getFieldData("number");
|
||||||
|
t.keyPartsField = t.keys.getFieldData("parts");
|
||||||
|
t.keyTimeField = t.keys.getFieldData("time");
|
||||||
|
t.keySizeField = t.keys.getFieldData("size");
|
||||||
|
|
||||||
|
t.fragmentDurationField = t.fragments.getFieldData("duration");
|
||||||
|
t.fragmentKeysField = t.fragments.getFieldData("keys");
|
||||||
|
t.fragmentFirstKeyField = t.fragments.getFieldData("firstkey");
|
||||||
|
t.fragmentSizeField = t.fragments.getFieldData("size");
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
/// Merges in track information from a given DTSC::Meta object, optionally deleting missing tracks
|
/// Merges in track information from a given DTSC::Meta object, optionally deleting missing tracks
|
||||||
/// and optionally making hard copies of the original data.
|
/// and optionally making hard copies of the original data.
|
||||||
void Meta::merge(const DTSC::Meta &M, bool deleteTracks, bool copyData){
|
void Meta::merge(const DTSC::Meta &M, bool deleteTracks, bool copyData){
|
||||||
|
@ -1342,22 +1435,33 @@ namespace DTSC{
|
||||||
t.fragments = Util::RelAccX(t.track.getPointer("fragments"), true);
|
t.fragments = Util::RelAccX(t.track.getPointer("fragments"), true);
|
||||||
t.pages = Util::RelAccX(t.track.getPointer("pages"), true);
|
t.pages = Util::RelAccX(t.track.getPointer("pages"), true);
|
||||||
|
|
||||||
trackList.addRecords(1);
|
|
||||||
trackList.setString(trackPageField, pageName, tNumber);
|
trackList.setString(trackPageField, pageName, tNumber);
|
||||||
trackList.setInt(trackPidField, getpid(), tNumber);
|
trackList.setInt(trackPidField, getpid(), tNumber);
|
||||||
trackList.setInt(trackSourceTidField, sourceTrack, tNumber);
|
trackList.setInt(trackSourceTidField, sourceTrack, tNumber);
|
||||||
|
trackList.addRecords(1);
|
||||||
validateTrack(tNumber);
|
validateTrack(tNumber);
|
||||||
return tNumber;
|
return tNumber;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Resizes a given track to be able to hold the given amount of fragments, keys, parts and pages.
|
/// Resizes a given track to be able to hold the given amount of fragments, keys, parts and pages.
|
||||||
/// Currently called exclusively from Meta::update(), to resize the internal structures.
|
/// Currently called exclusively from Meta::update(), to resize the internal structures.
|
||||||
void Meta::resizeTrack(size_t source, size_t fragCount, size_t keyCount, size_t partCount, size_t pageCount){
|
void Meta::resizeTrack(size_t source, size_t fragCount, size_t keyCount, size_t partCount, size_t pageCount, const char * reason){
|
||||||
INFO_MSG("Track %zu now with %zu frags, %zu keys, %zu parts, %zu pages", source, fragCount,
|
char pageName[NAME_BUFFER_SIZE];
|
||||||
keyCount, partCount, pageCount);
|
IPC::semaphore resizeLock;
|
||||||
|
|
||||||
|
if (!isMemBuf){
|
||||||
|
snprintf(pageName, NAME_BUFFER_SIZE, "/" SHM_STREAM_TM, streamName.c_str(), getpid(), source);
|
||||||
|
resizeLock.open(pageName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
|
||||||
|
resizeLock.wait();
|
||||||
|
}
|
||||||
|
|
||||||
size_t pageSize = (isMemBuf ? sizeMemBuf[source] : tM[source].len);
|
size_t pageSize = (isMemBuf ? sizeMemBuf[source] : tM[source].len);
|
||||||
|
|
||||||
char *orig = (char *)malloc(pageSize);
|
char *orig = (char *)malloc(pageSize);
|
||||||
|
if (!orig){
|
||||||
|
FAIL_MSG("Failed to re-allocate memory for track %zu: %s", source, strerror(errno));
|
||||||
|
return;
|
||||||
|
}
|
||||||
memcpy(orig, (isMemBuf ? tMemBuf[source] : tM[source].mapped), pageSize);
|
memcpy(orig, (isMemBuf ? tMemBuf[source] : tM[source].mapped), pageSize);
|
||||||
|
|
||||||
Track &t = tracks[source];
|
Track &t = tracks[source];
|
||||||
|
@ -1373,17 +1477,23 @@ namespace DTSC{
|
||||||
free(tMemBuf[source]);
|
free(tMemBuf[source]);
|
||||||
tMemBuf.erase(source);
|
tMemBuf.erase(source);
|
||||||
tMemBuf[source] = (char *)malloc(newPageSize);
|
tMemBuf[source] = (char *)malloc(newPageSize);
|
||||||
|
if (!tMemBuf[source]){
|
||||||
|
FAIL_MSG("Failed to re-allocate memory for track %zu: %s", source, strerror(errno));
|
||||||
|
resizeLock.unlink();
|
||||||
|
return;
|
||||||
|
}
|
||||||
sizeMemBuf[source] = newPageSize;
|
sizeMemBuf[source] = newPageSize;
|
||||||
memset(tMemBuf[source], 0, newPageSize);
|
memset(tMemBuf[source], 0, newPageSize);
|
||||||
INFO_MSG("Done re-allocating buffer %zu", source);
|
|
||||||
|
|
||||||
t.track = Util::RelAccX(tMemBuf[source], false);
|
t.track = Util::RelAccX(tMemBuf[source], false);
|
||||||
}else{
|
}else{
|
||||||
char pageName[NAME_BUFFER_SIZE];
|
|
||||||
snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_TM, streamName.c_str(), getpid(), source);
|
snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_TM, streamName.c_str(), getpid(), source);
|
||||||
INFO_MSG("Re-allocating page %s", pageName);
|
|
||||||
tM[source].master = true;
|
tM[source].master = true;
|
||||||
tM[source].init(pageName, newPageSize, true);
|
tM[source].init(pageName, newPageSize, true);
|
||||||
|
if (!tM[source].mapped){
|
||||||
|
FAIL_MSG("Failed to re-allocate shared memory for track %zu: %s", source, strerror(errno));
|
||||||
|
resizeLock.unlink();
|
||||||
|
return;
|
||||||
|
}
|
||||||
tM[source].master = false;
|
tM[source].master = false;
|
||||||
|
|
||||||
t.track = Util::RelAccX(tM[source].mapped, false);
|
t.track = Util::RelAccX(tM[source].mapped, false);
|
||||||
|
@ -1391,6 +1501,17 @@ namespace DTSC{
|
||||||
initializeTrack(t, fragCount, keyCount, partCount, pageCount);
|
initializeTrack(t, fragCount, keyCount, partCount, pageCount);
|
||||||
|
|
||||||
Util::RelAccX origAccess(orig);
|
Util::RelAccX origAccess(orig);
|
||||||
|
Util::RelAccX origFragments(origAccess.getPointer("fragments"));
|
||||||
|
Util::RelAccX origKeys(origAccess.getPointer("keys"));
|
||||||
|
Util::RelAccX origParts(origAccess.getPointer("parts"));
|
||||||
|
Util::RelAccX origPages(origAccess.getPointer("pages"));
|
||||||
|
|
||||||
|
MEDIUM_MSG("Track %zu resizing (reason: %s): frags %" PRIu32 "->%zu, keys %" PRIu32 "->%zu, parts %" PRIu32 "->%zu, pages %" PRIu32 "->%zu",
|
||||||
|
source, reason,
|
||||||
|
origFragments.getRCount(), fragCount,
|
||||||
|
origKeys.getRCount(), keyCount,
|
||||||
|
origParts.getRCount(), partCount,
|
||||||
|
origPages.getRCount(), pageCount);
|
||||||
|
|
||||||
t.track.setInt(t.trackIdField, origAccess.getInt("id"));
|
t.track.setInt(t.trackIdField, origAccess.getInt("id"));
|
||||||
t.track.setString(t.trackTypeField, origAccess.getPointer("type"));
|
t.track.setString(t.trackTypeField, origAccess.getPointer("type"));
|
||||||
|
@ -1409,8 +1530,10 @@ namespace DTSC{
|
||||||
t.track.setInt(t.trackFpksField, origAccess.getInt("fpks"));
|
t.track.setInt(t.trackFpksField, origAccess.getInt("fpks"));
|
||||||
t.track.setInt(t.trackMissedFragsField, origAccess.getInt("missedFrags"));
|
t.track.setInt(t.trackMissedFragsField, origAccess.getInt("missedFrags"));
|
||||||
|
|
||||||
Util::RelAccX origParts(origAccess.getPointer("parts"));
|
t.parts.setEndPos(origParts.getEndPos());
|
||||||
t.parts.deleteRecords(origParts.getDeleted());
|
t.parts.setStartPos(origParts.getStartPos());
|
||||||
|
t.parts.setDeleted(origParts.getDeleted());
|
||||||
|
t.parts.setPresent(origParts.getPresent());
|
||||||
|
|
||||||
Util::FieldAccX origPartSizeAccX = origParts.getFieldAccX("size");
|
Util::FieldAccX origPartSizeAccX = origParts.getFieldAccX("size");
|
||||||
Util::FieldAccX origPartDurationAccX = origParts.getFieldAccX("duration");
|
Util::FieldAccX origPartDurationAccX = origParts.getFieldAccX("duration");
|
||||||
|
@ -1427,10 +1550,11 @@ namespace DTSC{
|
||||||
partDurationAccX.set(origPartDurationAccX.uint(i), i);
|
partDurationAccX.set(origPartDurationAccX.uint(i), i);
|
||||||
partOffsetAccX.set(origPartOffsetAccX.uint(i), i);
|
partOffsetAccX.set(origPartOffsetAccX.uint(i), i);
|
||||||
}
|
}
|
||||||
t.parts.addRecords(origParts.getPresent());
|
|
||||||
|
|
||||||
Util::RelAccX origKeys(origAccess.getPointer("keys"));
|
t.keys.setEndPos(origKeys.getEndPos());
|
||||||
t.keys.deleteRecords(origKeys.getDeleted());
|
t.keys.setStartPos(origKeys.getStartPos());
|
||||||
|
t.keys.setDeleted(origKeys.getDeleted());
|
||||||
|
t.keys.setPresent(origKeys.getPresent());
|
||||||
|
|
||||||
Util::FieldAccX origKeyFirstpartAccX = origKeys.getFieldAccX("firstpart");
|
Util::FieldAccX origKeyFirstpartAccX = origKeys.getFieldAccX("firstpart");
|
||||||
Util::FieldAccX origKeyBposAccX = origKeys.getFieldAccX("bpos");
|
Util::FieldAccX origKeyBposAccX = origKeys.getFieldAccX("bpos");
|
||||||
|
@ -1459,10 +1583,11 @@ namespace DTSC{
|
||||||
keyTimeAccX.set(origKeyTimeAccX.uint(i), i);
|
keyTimeAccX.set(origKeyTimeAccX.uint(i), i);
|
||||||
keySizeAccX.set(origKeySizeAccX.uint(i), i);
|
keySizeAccX.set(origKeySizeAccX.uint(i), i);
|
||||||
}
|
}
|
||||||
t.keys.addRecords(origKeys.getPresent());
|
|
||||||
|
|
||||||
Util::RelAccX origFragments(origAccess.getPointer("fragments"));
|
t.fragments.setEndPos(origFragments.getEndPos());
|
||||||
t.fragments.deleteRecords(origFragments.getDeleted());
|
t.fragments.setStartPos(origFragments.getStartPos());
|
||||||
|
t.fragments.setDeleted(origFragments.getDeleted());
|
||||||
|
t.fragments.setPresent(origFragments.getPresent());
|
||||||
|
|
||||||
Util::FieldAccX origFragmentDurationAccX = origFragments.getFieldAccX("duration");
|
Util::FieldAccX origFragmentDurationAccX = origFragments.getFieldAccX("duration");
|
||||||
Util::FieldAccX origFragmentKeysAccX = origFragments.getFieldAccX("keys");
|
Util::FieldAccX origFragmentKeysAccX = origFragments.getFieldAccX("keys");
|
||||||
|
@ -1482,10 +1607,11 @@ namespace DTSC{
|
||||||
fragmentFirstkeyAccX.set(origFragmentFirstkeyAccX.uint(i), i);
|
fragmentFirstkeyAccX.set(origFragmentFirstkeyAccX.uint(i), i);
|
||||||
fragmentSizeAccX.set(origFragmentSizeAccX.uint(i), i);
|
fragmentSizeAccX.set(origFragmentSizeAccX.uint(i), i);
|
||||||
}
|
}
|
||||||
t.fragments.addRecords(origFragments.getPresent());
|
|
||||||
|
|
||||||
Util::RelAccX origPages(origAccess.getPointer("pages"));
|
t.pages.setEndPos(origPages.getEndPos());
|
||||||
t.pages.deleteRecords(origPages.getDeleted());
|
t.pages.setStartPos(origPages.getStartPos());
|
||||||
|
t.pages.setDeleted(origPages.getDeleted());
|
||||||
|
t.pages.setPresent(origPages.getPresent());
|
||||||
|
|
||||||
Util::FieldAccX origPageFirstkeyAccX = origPages.getFieldAccX("firstkey");
|
Util::FieldAccX origPageFirstkeyAccX = origPages.getFieldAccX("firstkey");
|
||||||
Util::FieldAccX origPageKeycountAccX = origPages.getFieldAccX("keycount");
|
Util::FieldAccX origPageKeycountAccX = origPages.getFieldAccX("keycount");
|
||||||
|
@ -1514,9 +1640,10 @@ namespace DTSC{
|
||||||
pageFirsttimeAccX.set(origPageFirsttimeAccX.uint(i), i);
|
pageFirsttimeAccX.set(origPageFirsttimeAccX.uint(i), i);
|
||||||
pageLastkeytimeAccX.set(origPageLastkeytimeAccX.uint(i), i);
|
pageLastkeytimeAccX.set(origPageLastkeytimeAccX.uint(i), i);
|
||||||
}
|
}
|
||||||
t.pages.addRecords(origPages.getPresent());
|
t.track.setReady();
|
||||||
|
|
||||||
free(orig);
|
free(orig);
|
||||||
|
resizeLock.unlink();
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t Meta::addDelayedTrack(size_t fragCount, size_t keyCount, size_t partCount, size_t pageCount){
|
size_t Meta::addDelayedTrack(size_t fragCount, size_t keyCount, size_t partCount, size_t pageCount){
|
||||||
|
@ -1568,10 +1695,11 @@ namespace DTSC{
|
||||||
t.track = Util::RelAccX(tM[tNumber].mapped, false);
|
t.track = Util::RelAccX(tM[tNumber].mapped, false);
|
||||||
}
|
}
|
||||||
initializeTrack(t, fragCount, keyCount, partCount, pageCount);
|
initializeTrack(t, fragCount, keyCount, partCount, pageCount);
|
||||||
trackList.addRecords(1);
|
t.track.setReady();
|
||||||
trackList.setString(trackPageField, pageName, tNumber);
|
trackList.setString(trackPageField, pageName, tNumber);
|
||||||
trackList.setInt(trackPidField, getpid(), tNumber);
|
trackList.setInt(trackPidField, getpid(), tNumber);
|
||||||
trackList.setInt(trackSourceTidField, INVALID_TRACK_ID, tNumber);
|
trackList.setInt(trackSourceTidField, INVALID_TRACK_ID, tNumber);
|
||||||
|
trackList.addRecords(1);
|
||||||
if (setValid){validateTrack(tNumber);}
|
if (setValid){validateTrack(tNumber);}
|
||||||
if (!isMemBuf){trackLock.post();}
|
if (!isMemBuf){trackLock.post();}
|
||||||
return tNumber;
|
return tNumber;
|
||||||
|
@ -1602,7 +1730,6 @@ namespace DTSC{
|
||||||
t.track.addField("missedFrags", RAX_32UINT);
|
t.track.addField("missedFrags", RAX_32UINT);
|
||||||
|
|
||||||
t.track.setRCount(1);
|
t.track.setRCount(1);
|
||||||
t.track.setReady();
|
|
||||||
t.track.addRecords(1);
|
t.track.addRecords(1);
|
||||||
|
|
||||||
t.parts = Util::RelAccX(t.track.getPointer("parts"), false);
|
t.parts = Util::RelAccX(t.track.getPointer("parts"), false);
|
||||||
|
@ -1971,9 +2098,12 @@ namespace DTSC{
|
||||||
|
|
||||||
std::set<size_t> Meta::getValidTracks(bool skipEmpty) const{
|
std::set<size_t> Meta::getValidTracks(bool skipEmpty) const{
|
||||||
std::set<size_t> res;
|
std::set<size_t> res;
|
||||||
if (!(*this) && !isMemBuf){return res;}
|
if (!(*this) && !isMemBuf){
|
||||||
|
INFO_MSG("Shared metadata not ready yet - no tracks valid");
|
||||||
|
return res;
|
||||||
|
}
|
||||||
uint64_t firstValid = trackList.getDeleted();
|
uint64_t firstValid = trackList.getDeleted();
|
||||||
uint64_t beyondLast = firstValid + trackList.getPresent();
|
uint64_t beyondLast = trackList.getEndPos();
|
||||||
for (size_t i = firstValid; i < beyondLast; i++){
|
for (size_t i = firstValid; i < beyondLast; i++){
|
||||||
if (trackList.getInt(trackValidField, i) & trackValidMask){res.insert(i);}
|
if (trackList.getInt(trackValidField, i) & trackValidMask){res.insert(i);}
|
||||||
if (trackList.getInt(trackSourceTidField, i) != INVALID_TRACK_ID &&
|
if (trackList.getInt(trackSourceTidField, i) != INVALID_TRACK_ID &&
|
||||||
|
@ -2008,7 +2138,7 @@ namespace DTSC{
|
||||||
}
|
}
|
||||||
|
|
||||||
void Meta::removeEmptyTracks(){
|
void Meta::removeEmptyTracks(){
|
||||||
refresh();
|
reloadReplacedPagesIfNeeded();
|
||||||
std::set<size_t> validTracks = getValidTracks();
|
std::set<size_t> validTracks = getValidTracks();
|
||||||
for (std::set<size_t>::iterator it = validTracks.begin(); it != validTracks.end(); it++){
|
for (std::set<size_t>::iterator it = validTracks.begin(); it != validTracks.end(); it++){
|
||||||
if (!tracks.at(*it).parts.getPresent()){removeTrack(*it);}
|
if (!tracks.at(*it).parts.getPresent()){removeTrack(*it);}
|
||||||
|
@ -2035,9 +2165,29 @@ namespace DTSC{
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Removes the first key from the memory structure and caches.
|
/// Removes the first key from the memory structure and caches.
|
||||||
void Meta::removeFirstKey(size_t trackIdx){
|
bool Meta::removeFirstKey(size_t trackIdx){
|
||||||
|
|
||||||
|
char pageName[NAME_BUFFER_SIZE];
|
||||||
|
IPC::semaphore resizeLock;
|
||||||
|
|
||||||
|
if (!isMemBuf){
|
||||||
|
__pid_t trPid = trackList.getInt(trackPidField, trackIdx);
|
||||||
|
snprintf(pageName, NAME_BUFFER_SIZE, "/" SHM_STREAM_TM, streamName.c_str(), trPid, trackIdx);
|
||||||
|
resizeLock.open(pageName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
|
||||||
|
if (!resizeLock.tryWait()){
|
||||||
|
MEDIUM_MSG("Metadata is busy, delaying deletion of key a bit");
|
||||||
|
resizeLock.close();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (reloadReplacedPagesIfNeeded()){
|
||||||
|
MEDIUM_MSG("Metadata just got replaced, delaying deletion of key a bit");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
Track &t = tracks[trackIdx];
|
Track &t = tracks[trackIdx];
|
||||||
|
DONTEVEN_MSG("Deleting parts: %" PRIu64 "->%" PRIu64 " del'd, %zu pres", t.parts.getDeleted(), t.parts.getDeleted()+t.keys.getInt(t.keyPartsField, t.keys.getDeleted()), t.parts.getPresent());
|
||||||
t.parts.deleteRecords(t.keys.getInt(t.keyPartsField, t.keys.getDeleted()));
|
t.parts.deleteRecords(t.keys.getInt(t.keyPartsField, t.keys.getDeleted()));
|
||||||
|
DONTEVEN_MSG("Deleting key: %" PRIu64 "->%" PRIu64 " del'd, %zu pres", t.keys.getDeleted(), t.keys.getDeleted()+1, t.keys.getPresent());
|
||||||
t.keys.deleteRecords(1);
|
t.keys.deleteRecords(1);
|
||||||
if (t.fragments.getInt(t.fragmentFirstKeyField, t.fragments.getDeleted()) < t.keys.getDeleted()){
|
if (t.fragments.getInt(t.fragmentFirstKeyField, t.fragments.getDeleted()) < t.keys.getDeleted()){
|
||||||
t.fragments.deleteRecords(1);
|
t.fragments.deleteRecords(1);
|
||||||
|
@ -2055,6 +2205,8 @@ namespace DTSC{
|
||||||
t.pages.deleteRecords(1);
|
t.pages.deleteRecords(1);
|
||||||
}
|
}
|
||||||
setFirstms(trackIdx, t.keys.getInt(t.keyTimeField, t.keys.getDeleted()));
|
setFirstms(trackIdx, t.keys.getInt(t.keyTimeField, t.keys.getDeleted()));
|
||||||
|
if (resizeLock){resizeLock.unlink();}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
///\brief Updates a meta object given a DTSC::Packet with byte position override.
|
///\brief Updates a meta object given a DTSC::Packet with byte position override.
|
||||||
|
@ -2202,10 +2354,8 @@ namespace DTSC{
|
||||||
|
|
||||||
uint32_t newPartNum = t.parts.getEndPos();
|
uint32_t newPartNum = t.parts.getEndPos();
|
||||||
if ((newPartNum - t.parts.getDeleted()) >= t.parts.getRCount()){
|
if ((newPartNum - t.parts.getDeleted()) >= t.parts.getRCount()){
|
||||||
resizeTrack(tNumber, t.fragments.getRCount(), t.keys.getRCount(), t.parts.getRCount() * 2,
|
resizeTrack(tNumber, t.fragments.getRCount(), t.keys.getRCount(), t.parts.getRCount() * 2, t.pages.getRCount(), "not enough parts");
|
||||||
t.keys.getRCount());
|
|
||||||
}
|
}
|
||||||
t.parts.addRecords(1);
|
|
||||||
t.parts.setInt(t.partSizeField, packDataSize, newPartNum);
|
t.parts.setInt(t.partSizeField, packDataSize, newPartNum);
|
||||||
t.parts.setInt(t.partOffsetField, packOffset, newPartNum);
|
t.parts.setInt(t.partOffsetField, packOffset, newPartNum);
|
||||||
if (newPartNum){
|
if (newPartNum){
|
||||||
|
@ -2214,17 +2364,15 @@ namespace DTSC{
|
||||||
}else{
|
}else{
|
||||||
t.parts.setInt(t.partDurationField, 0, newPartNum);
|
t.parts.setInt(t.partDurationField, 0, newPartNum);
|
||||||
}
|
}
|
||||||
t.track.setInt(t.trackLastmsField, packTime);
|
t.parts.addRecords(1);
|
||||||
|
|
||||||
uint32_t newKeyNum = t.keys.getEndPos();
|
uint32_t newKeyNum = t.keys.getEndPos();
|
||||||
if (isKeyframe || newKeyNum == 0 ||
|
if (isKeyframe || newKeyNum == 0 ||
|
||||||
(getType(tNumber) != "video" && packTime >= AUDIO_KEY_INTERVAL &&
|
(getType(tNumber) != "video" && packTime >= AUDIO_KEY_INTERVAL &&
|
||||||
packTime - t.keys.getInt(t.keyTimeField, newKeyNum - 1) >= AUDIO_KEY_INTERVAL)){
|
packTime - t.keys.getInt(t.keyTimeField, newKeyNum - 1) >= AUDIO_KEY_INTERVAL)){
|
||||||
if ((newKeyNum - t.keys.getDeleted()) >= t.keys.getRCount()){
|
if ((newKeyNum - t.keys.getDeleted()) >= t.keys.getRCount()){
|
||||||
resizeTrack(tNumber, t.fragments.getRCount(), t.keys.getRCount() * 2, t.parts.getRCount(),
|
resizeTrack(tNumber, t.fragments.getRCount(), t.keys.getRCount() * 2, t.parts.getRCount(), t.pages.getRCount(), "not enough keys");
|
||||||
t.keys.getRCount() * 2);
|
|
||||||
}
|
}
|
||||||
t.keys.addRecords(1);
|
|
||||||
t.keys.setInt(t.keyBposField, packBytePos, newKeyNum);
|
t.keys.setInt(t.keyBposField, packBytePos, newKeyNum);
|
||||||
t.keys.setInt(t.keyTimeField, packTime, newKeyNum);
|
t.keys.setInt(t.keyTimeField, packTime, newKeyNum);
|
||||||
t.keys.setInt(t.keyPartsField, 0, newKeyNum);
|
t.keys.setInt(t.keyPartsField, 0, newKeyNum);
|
||||||
|
@ -2242,6 +2390,7 @@ namespace DTSC{
|
||||||
}else{
|
}else{
|
||||||
t.keys.setInt(t.keyFirstPartField, 0, newKeyNum);
|
t.keys.setInt(t.keyFirstPartField, 0, newKeyNum);
|
||||||
}
|
}
|
||||||
|
t.keys.addRecords(1);
|
||||||
if (packBytePos){t.track.setInt(t.trackFirstmsField, t.keys.getInt(t.keyTimeField, 0));}
|
if (packBytePos){t.track.setInt(t.trackFirstmsField, t.keys.getInt(t.keyTimeField, 0));}
|
||||||
|
|
||||||
uint32_t newFragNum = t.fragments.getEndPos();
|
uint32_t newFragNum = t.fragments.getEndPos();
|
||||||
|
@ -2250,10 +2399,8 @@ namespace DTSC{
|
||||||
(packTime - getMinimumFragmentDuration()) >=
|
(packTime - getMinimumFragmentDuration()) >=
|
||||||
t.keys.getInt(t.keyTimeField, t.fragments.getInt(t.fragmentFirstKeyField, newFragNum - 1)))){
|
t.keys.getInt(t.keyTimeField, t.fragments.getInt(t.fragmentFirstKeyField, newFragNum - 1)))){
|
||||||
if ((newFragNum - t.fragments.getDeleted()) >= t.fragments.getRCount()){
|
if ((newFragNum - t.fragments.getDeleted()) >= t.fragments.getRCount()){
|
||||||
resizeTrack(tNumber, t.fragments.getRCount() * 2, t.keys.getRCount(), t.parts.getRCount(),
|
resizeTrack(tNumber, t.fragments.getRCount() * 2, t.keys.getRCount(), t.parts.getRCount(), t.pages.getRCount(), "not enough frags");
|
||||||
t.keys.getRCount());
|
|
||||||
}
|
}
|
||||||
t.fragments.addRecords(1);
|
|
||||||
if (newFragNum){
|
if (newFragNum){
|
||||||
t.fragments.setInt(t.fragmentDurationField,
|
t.fragments.setInt(t.fragmentDurationField,
|
||||||
packTime - t.keys.getInt(t.keyTimeField, t.fragments.getInt(t.fragmentFirstKeyField,
|
packTime - t.keys.getInt(t.keyTimeField, t.fragments.getInt(t.fragmentFirstKeyField,
|
||||||
|
@ -2278,6 +2425,7 @@ namespace DTSC{
|
||||||
t.fragments.setInt(t.fragmentSizeField, 0, newFragNum);
|
t.fragments.setInt(t.fragmentSizeField, 0, newFragNum);
|
||||||
t.fragments.setInt(t.fragmentKeysField, 1, newFragNum);
|
t.fragments.setInt(t.fragmentKeysField, 1, newFragNum);
|
||||||
t.fragments.setInt(t.fragmentFirstKeyField, t.keys.getInt(t.keyNumberField, newKeyNum), newFragNum);
|
t.fragments.setInt(t.fragmentFirstKeyField, t.keys.getInt(t.keyNumberField, newKeyNum), newFragNum);
|
||||||
|
t.fragments.addRecords(1);
|
||||||
}else{
|
}else{
|
||||||
t.fragments.setInt(t.fragmentKeysField,
|
t.fragments.setInt(t.fragmentKeysField,
|
||||||
t.fragments.getInt(t.fragmentKeysField, newFragNum - 1) + 1, newFragNum - 1);
|
t.fragments.getInt(t.fragmentKeysField, newFragNum - 1) + 1, newFragNum - 1);
|
||||||
|
@ -2296,6 +2444,7 @@ namespace DTSC{
|
||||||
uint32_t lastFragNum = t.fragments.getEndPos() - 1;
|
uint32_t lastFragNum = t.fragments.getEndPos() - 1;
|
||||||
t.fragments.setInt(t.fragmentSizeField,
|
t.fragments.setInt(t.fragmentSizeField,
|
||||||
t.fragments.getInt(t.fragmentSizeField, lastFragNum) + packDataSize, lastFragNum);
|
t.fragments.getInt(t.fragmentSizeField, lastFragNum) + packDataSize, lastFragNum);
|
||||||
|
t.track.setInt(t.trackLastmsField, packTime);
|
||||||
markUpdated(tNumber);
|
markUpdated(tNumber);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2408,8 +2557,8 @@ namespace DTSC{
|
||||||
memset(tMemBuf[i], 0, SHM_STREAM_TRACK_LEN);
|
memset(tMemBuf[i], 0, SHM_STREAM_TRACK_LEN);
|
||||||
t.track = Util::RelAccX(tMemBuf[i], false);
|
t.track = Util::RelAccX(tMemBuf[i], false);
|
||||||
initializeTrack(t);
|
initializeTrack(t);
|
||||||
|
|
||||||
t.track.flowFrom(src.tracks.at(i).track);
|
t.track.flowFrom(src.tracks.at(i).track);
|
||||||
|
t.track.setReady();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2434,6 +2583,7 @@ namespace DTSC{
|
||||||
t.track = Util::RelAccX(tM[i].mapped, false);
|
t.track = Util::RelAccX(tM[i].mapped, false);
|
||||||
initializeTrack(t);
|
initializeTrack(t);
|
||||||
t.track.flowFrom(M.tracks[i].track);
|
t.track.flowFrom(M.tracks[i].track);
|
||||||
|
t.track.setReady();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -281,6 +281,7 @@ namespace DTSC{
|
||||||
void addTrackFrom(const DTSC::Scan &src);
|
void addTrackFrom(const DTSC::Scan &src);
|
||||||
|
|
||||||
void refresh();
|
void refresh();
|
||||||
|
bool reloadReplacedPagesIfNeeded();
|
||||||
|
|
||||||
operator bool() const;
|
operator bool() const;
|
||||||
|
|
||||||
|
@ -302,7 +303,7 @@ namespace DTSC{
|
||||||
size_t partCount = DEFAULT_PART_COUNT, size_t pageCount = DEFAULT_PAGE_COUNT,
|
size_t partCount = DEFAULT_PART_COUNT, size_t pageCount = DEFAULT_PAGE_COUNT,
|
||||||
bool setValid = true);
|
bool setValid = true);
|
||||||
void resizeTrack(size_t source, size_t fragCount = DEFAULT_FRAGMENT_COUNT, size_t keyCount = DEFAULT_KEY_COUNT,
|
void resizeTrack(size_t source, size_t fragCount = DEFAULT_FRAGMENT_COUNT, size_t keyCount = DEFAULT_KEY_COUNT,
|
||||||
size_t partCount = DEFAULT_PART_COUNT, size_t pageCount = DEFAULT_PAGE_COUNT);
|
size_t partCount = DEFAULT_PART_COUNT, size_t pageCount = DEFAULT_PAGE_COUNT, const char * reason = "");
|
||||||
void initializeTrack(Track &t, size_t fragCount = DEFAULT_FRAGMENT_COUNT, size_t keyCount = DEFAULT_KEY_COUNT,
|
void initializeTrack(Track &t, size_t fragCount = DEFAULT_FRAGMENT_COUNT, size_t keyCount = DEFAULT_KEY_COUNT,
|
||||||
size_t parCount = DEFAULT_PART_COUNT, size_t pageCount = DEFAULT_PAGE_COUNT);
|
size_t parCount = DEFAULT_PART_COUNT, size_t pageCount = DEFAULT_PAGE_COUNT);
|
||||||
|
|
||||||
|
@ -424,7 +425,7 @@ namespace DTSC{
|
||||||
void validateTrack(size_t trackIdx, uint8_t validType = TRACK_VALID_ALL);
|
void validateTrack(size_t trackIdx, uint8_t validType = TRACK_VALID_ALL);
|
||||||
void removeEmptyTracks();
|
void removeEmptyTracks();
|
||||||
void removeTrack(size_t trackIdx);
|
void removeTrack(size_t trackIdx);
|
||||||
void removeFirstKey(size_t trackIdx);
|
bool removeFirstKey(size_t trackIdx);
|
||||||
|
|
||||||
size_t mainTrack() const;
|
size_t mainTrack() const;
|
||||||
uint32_t biggestFragment(uint32_t idx = INVALID_TRACK_ID) const;
|
uint32_t biggestFragment(uint32_t idx = INVALID_TRACK_ID) const;
|
||||||
|
|
|
@ -507,6 +507,14 @@ namespace IPC{
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (handle > 0 && handle < 3){
|
||||||
|
int tmpHandle = fcntl(handle, F_DUPFD, 3);
|
||||||
|
if (tmpHandle >= 3){
|
||||||
|
DONTEVEN_MSG("Remapped handle for page %s from %d to %d!", name.c_str(), handle, tmpHandle);
|
||||||
|
::close(handle);
|
||||||
|
handle = tmpHandle;
|
||||||
|
}
|
||||||
|
}
|
||||||
if (master){
|
if (master){
|
||||||
if (ftruncate(handle, len) < 0){
|
if (ftruncate(handle, len) < 0){
|
||||||
FAIL_MSG("truncate to %" PRIu64 " for page %s failed: %s", len, name.c_str(), strerror(errno));
|
FAIL_MSG("truncate to %" PRIu64 " for page %s failed: %s", len, name.c_str(), strerror(errno));
|
||||||
|
|
|
@ -832,7 +832,9 @@ namespace Util{
|
||||||
if (*hdrPresent >= amount){
|
if (*hdrPresent >= amount){
|
||||||
*hdrPresent -= amount; // decrease records present
|
*hdrPresent -= amount; // decrease records present
|
||||||
}else{
|
}else{
|
||||||
|
BACKTRACE;
|
||||||
WARN_MSG("Depleting recordCount!");
|
WARN_MSG("Depleting recordCount!");
|
||||||
|
exit(1);
|
||||||
*hdrPresent = 0;
|
*hdrPresent = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1063,6 +1063,9 @@ namespace Mist{
|
||||||
for (uint32_t j = 0; j < endKey; j++){
|
for (uint32_t j = 0; j < endKey; j++){
|
||||||
uint64_t keyTime = keys.getTime(j);
|
uint64_t keyTime = keys.getTime(j);
|
||||||
if (newData){
|
if (newData){
|
||||||
|
if ((tPages.getEndPos() - tPages.getDeleted()) >= tPages.getRCount()){
|
||||||
|
meta.resizeTrack(*it, M.fragments(*it).getRCount(), M.keys(*it).getRCount(), M.parts(*it).getRCount(), tPages.getRCount() * 2, "not enough pages");
|
||||||
|
}
|
||||||
tPages.addRecords(1);
|
tPages.addRecords(1);
|
||||||
++pageNum;
|
++pageNum;
|
||||||
tPages.setInt("firsttime", keyTime, pageNum);
|
tPages.setInt("firsttime", keyTime, pageNum);
|
||||||
|
|
|
@ -366,8 +366,7 @@ namespace Mist{
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Alright, everything looks good, let's delete the key and possibly also fragment
|
// Alright, everything looks good, let's delete the key and possibly also fragment
|
||||||
meta.removeFirstKey(tid);
|
return meta.removeFirstKey(tid);
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void inputBuffer::finish(){
|
void inputBuffer::finish(){
|
||||||
|
@ -399,7 +398,7 @@ namespace Mist{
|
||||||
|
|
||||||
curPageNum.erase(tid);
|
curPageNum.erase(tid);
|
||||||
INFO_MSG("Should remove track %zu", tid);
|
INFO_MSG("Should remove track %zu", tid);
|
||||||
meta.refresh();
|
meta.reloadReplacedPagesIfNeeded();
|
||||||
meta.removeTrack(tid);
|
meta.removeTrack(tid);
|
||||||
/*LTS-START*/
|
/*LTS-START*/
|
||||||
if (!M.getValidTracks().size()){
|
if (!M.getValidTracks().size()){
|
||||||
|
@ -412,7 +411,7 @@ namespace Mist{
|
||||||
}
|
}
|
||||||
|
|
||||||
void inputBuffer::removeUnused(){
|
void inputBuffer::removeUnused(){
|
||||||
meta.refresh();
|
meta.reloadReplacedPagesIfNeeded();
|
||||||
// first remove all tracks that have not been updated for too long
|
// first remove all tracks that have not been updated for too long
|
||||||
bool changed = true;
|
bool changed = true;
|
||||||
while (changed){
|
while (changed){
|
||||||
|
@ -456,7 +455,7 @@ namespace Mist{
|
||||||
streamName.c_str(), i, type.c_str(), codec.c_str(), firstms / 1000,
|
streamName.c_str(), i, type.c_str(), codec.c_str(), firstms / 1000,
|
||||||
lastms / 1000, compareFirst / 1000, compareLast / 1000, bufferTime / 1000);
|
lastms / 1000, compareFirst / 1000, compareLast / 1000, bufferTime / 1000);
|
||||||
}
|
}
|
||||||
meta.refresh();
|
meta.reloadReplacedPagesIfNeeded();
|
||||||
removeTrack(i);
|
removeTrack(i);
|
||||||
changed = true;
|
changed = true;
|
||||||
break;
|
break;
|
||||||
|
@ -500,7 +499,7 @@ namespace Mist{
|
||||||
}
|
}
|
||||||
|
|
||||||
void inputBuffer::userLeadIn(){
|
void inputBuffer::userLeadIn(){
|
||||||
meta.refresh();
|
meta.reloadReplacedPagesIfNeeded();
|
||||||
/*LTS-START*/
|
/*LTS-START*/
|
||||||
// Reload the configuration to make sure we stay up to date with changes through the api
|
// Reload the configuration to make sure we stay up to date with changes through the api
|
||||||
if (Util::epoch() - lastReTime > 4){preRun();}
|
if (Util::epoch() - lastReTime > 4){preRun();}
|
||||||
|
@ -529,7 +528,7 @@ namespace Mist{
|
||||||
void inputBuffer::userOnDisconnect(size_t id){
|
void inputBuffer::userOnDisconnect(size_t id){
|
||||||
if (sourcePids.count(id)){
|
if (sourcePids.count(id)){
|
||||||
INFO_MSG("Disconnected track %zu", sourcePids[id]);
|
INFO_MSG("Disconnected track %zu", sourcePids[id]);
|
||||||
meta.refresh();
|
meta.reloadReplacedPagesIfNeeded();
|
||||||
removeTrack(sourcePids[id]);
|
removeTrack(sourcePids[id]);
|
||||||
sourcePids.erase(id);
|
sourcePids.erase(id);
|
||||||
}
|
}
|
||||||
|
|
14
src/io.cpp
14
src/io.cpp
|
@ -20,7 +20,7 @@ namespace Mist{
|
||||||
size_t InOutBase::getMainSelectedTrack(){
|
size_t InOutBase::getMainSelectedTrack(){
|
||||||
if (!userSelect.size()){return INVALID_TRACK_ID;}
|
if (!userSelect.size()){return INVALID_TRACK_ID;}
|
||||||
size_t bestSoFar = INVALID_TRACK_ID;
|
size_t bestSoFar = INVALID_TRACK_ID;
|
||||||
meta.refresh();
|
meta.reloadReplacedPagesIfNeeded();
|
||||||
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
|
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
|
||||||
if (meta.trackValid(it->first)){
|
if (meta.trackValid(it->first)){
|
||||||
if (meta.getType(it->first) == "video"){return it->first;}
|
if (meta.getType(it->first) == "video"){return it->first;}
|
||||||
|
@ -324,7 +324,7 @@ namespace Mist{
|
||||||
|
|
||||||
void InOutBase::bufferLivePacket(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData,
|
void InOutBase::bufferLivePacket(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData,
|
||||||
size_t packDataSize, uint64_t packBytePos, bool isKeyframe){
|
size_t packDataSize, uint64_t packBytePos, bool isKeyframe){
|
||||||
meta.refresh();
|
meta.reloadReplacedPagesIfNeeded();
|
||||||
meta.setLive();
|
meta.setLive();
|
||||||
|
|
||||||
// Store the trackid for easier access
|
// Store the trackid for easier access
|
||||||
|
@ -368,28 +368,32 @@ namespace Mist{
|
||||||
// If there is no page, create it
|
// If there is no page, create it
|
||||||
if (!endPage){
|
if (!endPage){
|
||||||
nextPageNum = 0;
|
nextPageNum = 0;
|
||||||
tPages.addRecords(1);
|
|
||||||
tPages.setInt("firstkey", 0, 0);
|
tPages.setInt("firstkey", 0, 0);
|
||||||
tPages.setInt("firsttime", packTime, 0);
|
tPages.setInt("firsttime", packTime, 0);
|
||||||
tPages.setInt("size", DEFAULT_DATA_PAGE_SIZE, 0);
|
tPages.setInt("size", DEFAULT_DATA_PAGE_SIZE, 0);
|
||||||
tPages.setInt("keycount", 0, 0);
|
tPages.setInt("keycount", 0, 0);
|
||||||
tPages.setInt("avail", 0, 0);
|
tPages.setInt("avail", 0, 0);
|
||||||
|
tPages.addRecords(1);
|
||||||
++endPage;
|
++endPage;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t prevPageTime = tPages.getInt("firsttime", endPage - 1);
|
uint64_t prevPageTime = tPages.getInt("firsttime", endPage - 1);
|
||||||
// Compare on 8 mb boundary and target duration
|
// Compare on 8 mb boundary and target duration
|
||||||
if (tPages.getInt("avail", endPage - 1) > FLIP_DATA_PAGE_SIZE || packTime - prevPageTime > FLIP_TARGET_DURATION){
|
if (tPages.getInt("avail", endPage - 1) > FLIP_DATA_PAGE_SIZE || packTime - prevPageTime > FLIP_TARGET_DURATION){
|
||||||
|
|
||||||
|
if ((endPage - tPages.getDeleted()) >= tPages.getRCount()){
|
||||||
|
meta.resizeTrack(packTrack, M.fragments(packTrack).getRCount(), M.keys(packTrack).getRCount(), M.parts(packTrack).getRCount(), tPages.getRCount() * 2, "not enough pages");
|
||||||
|
}
|
||||||
// Create the book keeping data for the new page
|
// Create the book keeping data for the new page
|
||||||
nextPageNum = tPages.getInt("firstkey", endPage - 1) + tPages.getInt("keycount", endPage - 1);
|
nextPageNum = tPages.getInt("firstkey", endPage - 1) + tPages.getInt("keycount", endPage - 1);
|
||||||
HIGH_MSG("Live page transition from %" PRIu32 ":%zu to %" PRIu32 ":%" PRIu32, packTrack,
|
HIGH_MSG("Live page transition from %" PRIu32 ":%zu to %" PRIu32 ":%" PRIu32, packTrack,
|
||||||
tPages.getInt("firstkey", endPage - 1), packTrack, nextPageNum);
|
tPages.getInt("firstkey", endPage - 1), packTrack, nextPageNum);
|
||||||
tPages.addRecords(1);
|
|
||||||
tPages.setInt("firstkey", nextPageNum, endPage);
|
tPages.setInt("firstkey", nextPageNum, endPage);
|
||||||
tPages.setInt("firsttime", packTime, endPage);
|
tPages.setInt("firsttime", packTime, endPage);
|
||||||
tPages.setInt("size", DEFAULT_DATA_PAGE_SIZE, endPage);
|
tPages.setInt("size", DEFAULT_DATA_PAGE_SIZE, endPage);
|
||||||
tPages.setInt("keycount", 0, endPage);
|
tPages.setInt("keycount", 0, endPage);
|
||||||
tPages.setInt("avail", 0, endPage);
|
tPages.setInt("avail", 0, endPage);
|
||||||
|
tPages.addRecords(1);
|
||||||
++endPage;
|
++endPage;
|
||||||
}
|
}
|
||||||
tPages.setInt("lastkeytime", packTime, endPage - 1);
|
tPages.setInt("lastkeytime", packTime, endPage - 1);
|
||||||
|
@ -426,7 +430,7 @@ namespace Mist{
|
||||||
// Open the new page
|
// Open the new page
|
||||||
if (!bufferStart(packTrack, nextPageNum)){
|
if (!bufferStart(packTrack, nextPageNum)){
|
||||||
// if this fails, return instantly without actually buffering the packet
|
// if this fails, return instantly without actually buffering the packet
|
||||||
WARN_MSG("Dropping packet %s:%" PRIu32 "@%" PRIu64, streamName.c_str(), packTrack, packTime);
|
WARN_MSG("Dropping packet for %s: (no page) %" PRIu32 "@%" PRIu64, streamName.c_str(), packTrack, packTime);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -276,7 +276,7 @@ namespace Mist{
|
||||||
// If a protocol does not support any codecs, we assume you know what you're doing
|
// If a protocol does not support any codecs, we assume you know what you're doing
|
||||||
if (!capa.isMember("codecs")){return true;}
|
if (!capa.isMember("codecs")){return true;}
|
||||||
if (!isInitialized){initialize();}
|
if (!isInitialized){initialize();}
|
||||||
meta.refresh();
|
meta.reloadReplacedPagesIfNeeded();
|
||||||
if (getSupportedTracks().size()){
|
if (getSupportedTracks().size()){
|
||||||
if (!userSelect.size()){selectDefaultTracks();}
|
if (!userSelect.size()){selectDefaultTracks();}
|
||||||
size_t mainTrack = getMainSelectedTrack();
|
size_t mainTrack = getMainSelectedTrack();
|
||||||
|
@ -374,7 +374,6 @@ namespace Mist{
|
||||||
meta.reInit(streamName, false);
|
meta.reInit(streamName, false);
|
||||||
}
|
}
|
||||||
if (!meta){return;}
|
if (!meta){return;}
|
||||||
meta.refresh();
|
|
||||||
isInitialized = true;
|
isInitialized = true;
|
||||||
statComm.reload();
|
statComm.reload();
|
||||||
stats(true);
|
stats(true);
|
||||||
|
@ -387,7 +386,7 @@ namespace Mist{
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
Util::wait(500);
|
Util::wait(500);
|
||||||
meta.refresh();
|
meta.reloadReplacedPagesIfNeeded();
|
||||||
stats();
|
stats();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -406,7 +405,7 @@ namespace Mist{
|
||||||
if (!isInitialized){return false;}
|
if (!isInitialized){return false;}
|
||||||
}
|
}
|
||||||
|
|
||||||
meta.refresh();
|
meta.reloadReplacedPagesIfNeeded();
|
||||||
|
|
||||||
bool autoSeek = buffer.size();
|
bool autoSeek = buffer.size();
|
||||||
uint64_t seekTarget = currentTime();
|
uint64_t seekTarget = currentTime();
|
||||||
|
@ -533,7 +532,7 @@ namespace Mist{
|
||||||
WARN_MSG("Load for track %zu key %zu aborted - track does not exist", trackId, keyNum);
|
WARN_MSG("Load for track %zu key %zu aborted - track does not exist", trackId, keyNum);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (!M.trackLoaded(trackId)){meta.refresh();}
|
if (!M.trackLoaded(trackId)){meta.reloadReplacedPagesIfNeeded();}
|
||||||
DTSC::Keys keys(M.keys(trackId));
|
DTSC::Keys keys(M.keys(trackId));
|
||||||
if (!keys.getValidCount()){
|
if (!keys.getValidCount()){
|
||||||
WARN_MSG("Load for track %zu key %zu aborted - track is empty", trackId, keyNum);
|
WARN_MSG("Load for track %zu key %zu aborted - track is empty", trackId, keyNum);
|
||||||
|
@ -710,7 +709,7 @@ namespace Mist{
|
||||||
userSelect.erase(tid);
|
userSelect.erase(tid);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (!M.trackLoaded(tid)){meta.refresh();}
|
if (!M.trackLoaded(tid)){meta.reloadReplacedPagesIfNeeded();}
|
||||||
if (!userSelect.count(tid) || !userSelect[tid]){
|
if (!userSelect.count(tid) || !userSelect[tid]){
|
||||||
WARN_MSG("Aborting seek to %" PRIu64 "ms in track %zu: user select failure (%s)", pos, tid, userSelect.count(tid)?"not connected":"not selected");
|
WARN_MSG("Aborting seek to %" PRIu64 "ms in track %zu: user select failure (%s)", pos, tid, userSelect.count(tid)?"not connected":"not selected");
|
||||||
userSelect.erase(tid);
|
userSelect.erase(tid);
|
||||||
|
@ -1496,6 +1495,7 @@ namespace Mist{
|
||||||
|
|
||||||
sortedPageInfo nxt = *(buffer.begin());
|
sortedPageInfo nxt = *(buffer.begin());
|
||||||
|
|
||||||
|
if (meta.reloadReplacedPagesIfNeeded()){return false;}
|
||||||
if (!M.getValidTracks().count(nxt.tid)){
|
if (!M.getValidTracks().count(nxt.tid)){
|
||||||
dropTrack(nxt.tid, "disappeared from metadata");
|
dropTrack(nxt.tid, "disappeared from metadata");
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -242,7 +242,7 @@ namespace Mist{
|
||||||
std::string dataPacket = myConn.Received().remove(8 + rSize);
|
std::string dataPacket = myConn.Received().remove(8 + rSize);
|
||||||
DTSC::Packet metaPack(dataPacket.data(), dataPacket.size());
|
DTSC::Packet metaPack(dataPacket.data(), dataPacket.size());
|
||||||
DTSC::Scan metaScan = metaPack.getScan();
|
DTSC::Scan metaScan = metaPack.getScan();
|
||||||
meta.refresh();
|
meta.reloadReplacedPagesIfNeeded();
|
||||||
size_t prevTracks = meta.getValidTracks().size();
|
size_t prevTracks = meta.getValidTracks().size();
|
||||||
|
|
||||||
size_t tNum = metaScan.getMember("tracks").getSize();
|
size_t tNum = metaScan.getMember("tracks").getSize();
|
||||||
|
@ -256,7 +256,7 @@ namespace Mist{
|
||||||
HIGH_MSG("Already had track: %s", trk.asJSON().toString().c_str());
|
HIGH_MSG("Already had track: %s", trk.asJSON().toString().c_str());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
meta.refresh();
|
meta.reloadReplacedPagesIfNeeded();
|
||||||
std::stringstream rep;
|
std::stringstream rep;
|
||||||
rep << "DTSC_HEAD parsed, we went from " << prevTracks << " to " << meta.getValidTracks().size() << " tracks. Bring on those data packets!";
|
rep << "DTSC_HEAD parsed, we went from " << prevTracks << " to " << meta.getValidTracks().size() << " tracks. Bring on those data packets!";
|
||||||
sendOk(rep.str());
|
sendOk(rep.str());
|
||||||
|
|
|
@ -7,7 +7,7 @@
|
||||||
namespace Mist{
|
namespace Mist{
|
||||||
bool OutHLS::isReadyForPlay(){
|
bool OutHLS::isReadyForPlay(){
|
||||||
if (!isInitialized){initialize();}
|
if (!isInitialized){initialize();}
|
||||||
meta.refresh();
|
meta.reloadReplacedPagesIfNeeded();
|
||||||
if (!M.getValidTracks().size()){return false;}
|
if (!M.getValidTracks().size()){return false;}
|
||||||
uint32_t mainTrack = M.mainTrack();
|
uint32_t mainTrack = M.mainTrack();
|
||||||
if (mainTrack == INVALID_TRACK_ID){return false;}
|
if (mainTrack == INVALID_TRACK_ID){return false;}
|
||||||
|
|
|
@ -1073,7 +1073,7 @@ namespace Mist{
|
||||||
newState = streamStatus.mapped[0];
|
newState = streamStatus.mapped[0];
|
||||||
}
|
}
|
||||||
|
|
||||||
if (meta){meta.refresh();}
|
if (meta){meta.reloadReplacedPagesIfNeeded();}
|
||||||
if (newState != prevState || (newState == STRMSTAT_READY && M.getValidTracks() != prevTracks)){
|
if (newState != prevState || (newState == STRMSTAT_READY && M.getValidTracks() != prevTracks)){
|
||||||
if (newState == STRMSTAT_READY){
|
if (newState == STRMSTAT_READY){
|
||||||
reconnect();
|
reconnect();
|
||||||
|
|
Loading…
Add table
Reference in a new issue