Page limits implemented, performance timer for buffering pages added on debug level 5
This commit is contained in:
parent
9a1fb0873f
commit
5c5ab6c058
4 changed files with 20 additions and 10 deletions
|
@ -59,7 +59,7 @@ static const char * DBG_LVL_LIST[] = {"NONE", "FAIL", "ERROR", "WARN", "INFO", "
|
||||||
|
|
||||||
|
|
||||||
#ifndef SHM_DATASIZE
|
#ifndef SHM_DATASIZE
|
||||||
#define SHM_DATASIZE 25
|
#define SHM_DATASIZE 20
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
|
@ -83,8 +83,13 @@ static const char * DBG_LVL_LIST[] = {"NONE", "FAIL", "ERROR", "WARN", "INFO", "
|
||||||
/// The size used for server configuration pages.
|
/// The size used for server configuration pages.
|
||||||
#define DEFAULT_CONF_PAGE_SIZE 4 * 1024 * 1024
|
#define DEFAULT_CONF_PAGE_SIZE 4 * 1024 * 1024
|
||||||
|
|
||||||
/// The position from where on stream data pages are switched over to the next page.
|
/// The data size or duration from where on stream data pages are switched over to the next page.
|
||||||
|
/// The flip happens whenever either of these is matched.
|
||||||
#define FLIP_DATA_PAGE_SIZE 8 * 1024 * 1024
|
#define FLIP_DATA_PAGE_SIZE 8 * 1024 * 1024
|
||||||
|
#define FLIP_TARGET_DURATION 60000
|
||||||
|
/// The minimum duration for switching to next page. The flip will never happen before this.
|
||||||
|
/// Does not affect live streams.
|
||||||
|
#define FLIP_MIN_DURATION 20000
|
||||||
|
|
||||||
#define SHM_STREAM_INDEX "MstSTRM%s" //%s stream name
|
#define SHM_STREAM_INDEX "MstSTRM%s" //%s stream name
|
||||||
#define SHM_TRACK_META "MstTRAK%s@%lu" //%s stream name, %lu track ID
|
#define SHM_TRACK_META "MstTRAK%s@%lu" //%s stream name, %lu track ID
|
||||||
|
|
|
@ -79,7 +79,7 @@ JSON::Value Util::getStreamConfig(std::string streamname){
|
||||||
FAIL_MSG("Stream opening denied: %s is longer than 100 characters (%lu).", streamname.c_str(), streamname.size());
|
FAIL_MSG("Stream opening denied: %s is longer than 100 characters (%lu).", streamname.c_str(), streamname.size());
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE);
|
IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, false, false);
|
||||||
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1);
|
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1);
|
||||||
configLock.wait();
|
configLock.wait();
|
||||||
DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len);
|
DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len);
|
||||||
|
|
|
@ -346,10 +346,11 @@ namespace Mist {
|
||||||
nProxy.pagesByTrack[it->first][i + 1].firstTime = it->second.keys[i].getTime();
|
nProxy.pagesByTrack[it->first][i + 1].firstTime = it->second.keys[i].getTime();
|
||||||
newData = false;
|
newData = false;
|
||||||
}
|
}
|
||||||
nProxy.pagesByTrack[it->first].rbegin()->second.keyNum++;
|
DTSCPageData & dPage = nProxy.pagesByTrack[it->first].rbegin()->second;
|
||||||
nProxy.pagesByTrack[it->first].rbegin()->second.partNum += it->second.keys[i].getParts();
|
dPage.keyNum++;
|
||||||
nProxy.pagesByTrack[it->first].rbegin()->second.dataSize += it->second.keySizes[i];
|
dPage.partNum += it->second.keys[i].getParts();
|
||||||
if (nProxy.pagesByTrack[it->first].rbegin()->second.dataSize > FLIP_DATA_PAGE_SIZE) {
|
dPage.dataSize += it->second.keySizes[i];
|
||||||
|
if ((dPage.dataSize > FLIP_DATA_PAGE_SIZE || it->second.keys[i].getTime() - dPage.firstTime > FLIP_TARGET_DURATION) && it->second.keys[i].getTime() - dPage.firstTime > FLIP_MIN_DURATION) {
|
||||||
newData = true;
|
newData = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -381,7 +382,7 @@ namespace Mist {
|
||||||
|
|
||||||
}
|
}
|
||||||
if (myMeta.tracks[tid].keys[bookKeeping[tid].curKey].getParts() + 1 == curData[tid].partNum){
|
if (myMeta.tracks[tid].keys[bookKeeping[tid].curKey].getParts() + 1 == curData[tid].partNum){
|
||||||
if (curData[tid].dataSize > FLIP_DATA_PAGE_SIZE) {
|
if ((curData[tid].dataSize > FLIP_DATA_PAGE_SIZE || myMeta.tracks[tid].keys[bookKeeping[tid].curKey].getTime() - curData[tid].firstTime > FLIP_TARGET_DURATION) && myMeta.tracks[tid].keys[bookKeeping[tid].curKey].getTime() - curData[tid].firstTime > FLIP_MIN_DURATION) {
|
||||||
nProxy.pagesByTrack[tid][bookKeeping[tid].first] = curData[tid];
|
nProxy.pagesByTrack[tid][bookKeeping[tid].first] = curData[tid];
|
||||||
bookKeeping[tid].first += curData[tid].keyNum;
|
bookKeeping[tid].first += curData[tid].keyNum;
|
||||||
curData[tid].keyNum = 0;
|
curData[tid].keyNum = 0;
|
||||||
|
@ -446,6 +447,7 @@ namespace Mist {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
//Update keynum to point to the corresponding page
|
//Update keynum to point to the corresponding page
|
||||||
|
uint64_t bufferTimer = Util::getMS();
|
||||||
INFO_MSG("Loading key %u from page %lu", keyNum, (--(nProxy.pagesByTrack[track].upper_bound(keyNum)))->first);
|
INFO_MSG("Loading key %u from page %lu", keyNum, (--(nProxy.pagesByTrack[track].upper_bound(keyNum)))->first);
|
||||||
keyNum = (--(nProxy.pagesByTrack[track].upper_bound(keyNum)))->first;
|
keyNum = (--(nProxy.pagesByTrack[track].upper_bound(keyNum)))->first;
|
||||||
if (!bufferStart(track, keyNum)){
|
if (!bufferStart(track, keyNum)){
|
||||||
|
@ -476,7 +478,8 @@ namespace Mist {
|
||||||
getNext();
|
getNext();
|
||||||
}
|
}
|
||||||
bufferFinalize(track);
|
bufferFinalize(track);
|
||||||
DEBUG_MSG(DLVL_DEVEL, "Done buffering page %d for track %d", keyNum, track);
|
bufferTimer = Util::getMS() - bufferTimer;
|
||||||
|
DEBUG_MSG(DLVL_DEVEL, "Done buffering page %d for track %d in %llums", keyNum, track, bufferTimer);
|
||||||
pageCounter[track][keyNum] = 15;
|
pageCounter[track][keyNum] = 15;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -451,16 +451,18 @@ namespace Mist {
|
||||||
nextPageNum = 1;
|
nextPageNum = 1;
|
||||||
pagesByTrack[tid][1].dataSize = DEFAULT_DATA_PAGE_SIZE;//Initialize op 25mb
|
pagesByTrack[tid][1].dataSize = DEFAULT_DATA_PAGE_SIZE;//Initialize op 25mb
|
||||||
pagesByTrack[tid][1].pageNum = 1;
|
pagesByTrack[tid][1].pageNum = 1;
|
||||||
|
pagesByTrack[tid][1].firstTime = packet.getTime();
|
||||||
}
|
}
|
||||||
//Take the last allocated page
|
//Take the last allocated page
|
||||||
std::map<unsigned long, DTSCPageData>::reverse_iterator tmpIt = pagesByTrack[tid].rbegin();
|
std::map<unsigned long, DTSCPageData>::reverse_iterator tmpIt = pagesByTrack[tid].rbegin();
|
||||||
//Compare on 8 mb boundary
|
//Compare on 8 mb boundary
|
||||||
if (tmpIt->second.curOffset > FLIP_DATA_PAGE_SIZE) {
|
if (tmpIt->second.curOffset > FLIP_DATA_PAGE_SIZE || packet.getTime() - tmpIt->second.firstTime > FLIP_TARGET_DURATION) {
|
||||||
//Create the book keeping data for the new page
|
//Create the book keeping data for the new page
|
||||||
nextPageNum = tmpIt->second.pageNum + tmpIt->second.keyNum;
|
nextPageNum = tmpIt->second.pageNum + tmpIt->second.keyNum;
|
||||||
INFO_MSG("We should go to next page now, transition from %lu to %d", tmpIt->second.pageNum, nextPageNum);
|
INFO_MSG("We should go to next page now, transition from %lu to %d", tmpIt->second.pageNum, nextPageNum);
|
||||||
pagesByTrack[tid][nextPageNum].dataSize = DEFAULT_DATA_PAGE_SIZE;
|
pagesByTrack[tid][nextPageNum].dataSize = DEFAULT_DATA_PAGE_SIZE;
|
||||||
pagesByTrack[tid][nextPageNum].pageNum = nextPageNum;
|
pagesByTrack[tid][nextPageNum].pageNum = nextPageNum;
|
||||||
|
pagesByTrack[tid][nextPageNum].firstTime = packet.getTime();
|
||||||
}
|
}
|
||||||
pagesByTrack[tid].rbegin()->second.lastKeyTime = packet.getTime();
|
pagesByTrack[tid].rbegin()->second.lastKeyTime = packet.getTime();
|
||||||
pagesByTrack[tid].rbegin()->second.keyNum++;
|
pagesByTrack[tid].rbegin()->second.keyNum++;
|
||||||
|
|
Loading…
Add table
Reference in a new issue