diff --git a/lib/defines.h b/lib/defines.h index 6b4ebc04..ceb8517e 100644 --- a/lib/defines.h +++ b/lib/defines.h @@ -59,7 +59,7 @@ static const char * DBG_LVL_LIST[] = {"NONE", "FAIL", "ERROR", "WARN", "INFO", " #ifndef SHM_DATASIZE -#define SHM_DATASIZE 25 +#define SHM_DATASIZE 20 #endif @@ -83,8 +83,13 @@ static const char * DBG_LVL_LIST[] = {"NONE", "FAIL", "ERROR", "WARN", "INFO", " /// The size used for server configuration pages. #define DEFAULT_CONF_PAGE_SIZE 4 * 1024 * 1024 -/// The position from where on stream data pages are switched over to the next page. +/// The data size or duration from where on stream data pages are switched over to the next page. +/// The flip happens whenever either of these is matched. #define FLIP_DATA_PAGE_SIZE 8 * 1024 * 1024 +#define FLIP_TARGET_DURATION 60000 +/// The minimum duration for switching to next page. The flip will never happen before this. +/// Does not affect live streams. +#define FLIP_MIN_DURATION 20000 #define SHM_STREAM_INDEX "MstSTRM%s" //%s stream name #define SHM_TRACK_META "MstTRAK%s@%lu" //%s stream name, %lu track ID diff --git a/lib/stream.cpp b/lib/stream.cpp index 2dda6339..8113a659 100644 --- a/lib/stream.cpp +++ b/lib/stream.cpp @@ -86,7 +86,7 @@ JSON::Value Util::getStreamConfig(std::string streamname){ FAIL_MSG("Stream opening denied: %s is longer than 100 characters (%lu).", streamname.c_str(), streamname.size()); return result; } - IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); + IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, false, false); IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); configLock.wait(); DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len); diff --git a/src/input/input.cpp b/src/input/input.cpp index 4571e646..47f72801 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -395,10 +395,11 @@ namespace Mist { nProxy.pagesByTrack[it->first][i + 1].firstTime = it->second.keys[i].getTime(); newData = false; } - nProxy.pagesByTrack[it->first].rbegin()->second.keyNum++; - nProxy.pagesByTrack[it->first].rbegin()->second.partNum += it->second.keys[i].getParts(); - nProxy.pagesByTrack[it->first].rbegin()->second.dataSize += it->second.keySizes[i]; - if (nProxy.pagesByTrack[it->first].rbegin()->second.dataSize > FLIP_DATA_PAGE_SIZE) { + DTSCPageData & dPage = nProxy.pagesByTrack[it->first].rbegin()->second; + dPage.keyNum++; + dPage.partNum += it->second.keys[i].getParts(); + dPage.dataSize += it->second.keySizes[i]; + if ((dPage.dataSize > FLIP_DATA_PAGE_SIZE || it->second.keys[i].getTime() - dPage.firstTime > FLIP_TARGET_DURATION) && it->second.keys[i].getTime() - dPage.firstTime > FLIP_MIN_DURATION) { newData = true; } } @@ -430,7 +431,7 @@ namespace Mist { } if (myMeta.tracks[tid].keys[bookKeeping[tid].curKey].getParts() + 1 == curData[tid].partNum) { - if (curData[tid].dataSize > FLIP_DATA_PAGE_SIZE) { + if ((curData[tid].dataSize > FLIP_DATA_PAGE_SIZE || myMeta.tracks[tid].keys[bookKeeping[tid].curKey].getTime() - curData[tid].firstTime > FLIP_TARGET_DURATION) && myMeta.tracks[tid].keys[bookKeeping[tid].curKey].getTime() - curData[tid].firstTime > FLIP_MIN_DURATION) { nProxy.pagesByTrack[tid][bookKeeping[tid].first] = curData[tid]; bookKeeping[tid].first += curData[tid].keyNum; curData[tid].keyNum = 0; @@ -495,6 +496,7 @@ namespace Mist { return false; } //Update keynum to point to the corresponding page + uint64_t bufferTimer = Util::getMS(); INFO_MSG("Loading key %u from page %lu", keyNum, (--(nProxy.pagesByTrack[track].upper_bound(keyNum)))->first); keyNum = (--(nProxy.pagesByTrack[track].upper_bound(keyNum)))->first; if (!bufferStart(track, keyNum)) { @@ -525,7 +527,8 @@ namespace Mist { getNext(); } bufferFinalize(track); - DEBUG_MSG(DLVL_DEVEL, "Done buffering page %d for track %d", keyNum, track); + bufferTimer = Util::getMS() - bufferTimer; + DEBUG_MSG(DLVL_DEVEL, "Done buffering page %d for track %d in %llums", keyNum, track, bufferTimer); pageCounter[track][keyNum] = 15; return true; } diff --git a/src/io.cpp b/src/io.cpp index 838ca437..617966f8 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -515,16 +515,18 @@ namespace Mist { nextPageNum = 1; pagesByTrack[tid][1].dataSize = DEFAULT_DATA_PAGE_SIZE;//Initialize op 25mb pagesByTrack[tid][1].pageNum = 1; + pagesByTrack[tid][1].firstTime = packet.getTime(); } //Take the last allocated page std::map::reverse_iterator tmpIt = pagesByTrack[tid].rbegin(); //Compare on 8 mb boundary - if (tmpIt->second.curOffset > FLIP_DATA_PAGE_SIZE) { + if (tmpIt->second.curOffset > FLIP_DATA_PAGE_SIZE || packet.getTime() - tmpIt->second.firstTime > FLIP_TARGET_DURATION) { //Create the book keeping data for the new page nextPageNum = tmpIt->second.pageNum + tmpIt->second.keyNum; INFO_MSG("We should go to next page now, transition from %lu to %d", tmpIt->second.pageNum, nextPageNum); pagesByTrack[tid][nextPageNum].dataSize = DEFAULT_DATA_PAGE_SIZE; pagesByTrack[tid][nextPageNum].pageNum = nextPageNum; + pagesByTrack[tid][nextPageNum].firstTime = packet.getTime(); } pagesByTrack[tid].rbegin()->second.lastKeyTime = packet.getTime(); pagesByTrack[tid].rbegin()->second.keyNum++;