Ensure key and page numbers are all 32 bits, robustify/fix packet handling internals
This commit is contained in:
parent
2a5a808107
commit
0a3c399a98
6 changed files with 69 additions and 60 deletions
|
@ -201,7 +201,7 @@ static inline void show_stackframe(){}
|
||||||
#define SEM_STATISTICS "/MstStat"
|
#define SEM_STATISTICS "/MstStat"
|
||||||
#define SEM_USERS "/MstUser%s" //%s stream name
|
#define SEM_USERS "/MstUser%s" //%s stream name
|
||||||
|
|
||||||
#define SHM_TRACK_DATA "MstData%s@%zu_%zu" //%s stream name, %zu track ID, %PRIu32 page #
|
#define SHM_TRACK_DATA "MstData%s@%zu_%" PRIu32 //%s stream name, %zu track ID, %PRIu32 page #
|
||||||
// End new meta
|
// End new meta
|
||||||
|
|
||||||
#define INPUT_USER_INTERVAL 1000
|
#define INPUT_USER_INTERVAL 1000
|
||||||
|
@ -258,12 +258,9 @@ static inline void show_stackframe(){}
|
||||||
#define STAT_EX_SIZE 177
|
#define STAT_EX_SIZE 177
|
||||||
#define PLAY_EX_SIZE 2 + 6 * SIMUL_TRACKS
|
#define PLAY_EX_SIZE 2 + 6 * SIMUL_TRACKS
|
||||||
|
|
||||||
#define INVALID_TRACK_ID 0xFFFFFFFF
|
#define INVALID_TRACK_ID 0xFFFFFFFFu
|
||||||
#define INVALID_KEY_NUM 0xFFFFFFFF
|
#define INVALID_KEY_NUM 0xFFFFFFFFu
|
||||||
#define INVALID_PAGE_NUM 0xFFFF
|
#define INVALID_RECORD_INDEX 0xFFFFFFFFFFFFFFFFull
|
||||||
#define INVALID_RECORD_INDEX 0xFFFFFFFFFFFFFFFF
|
|
||||||
|
|
||||||
#define MAX_SIZE_T 0xFFFFFFFF
|
|
||||||
|
|
||||||
#define NEW_TRACK_ID 0x80000000
|
#define NEW_TRACK_ID 0x80000000
|
||||||
#define QUICK_NEGOTIATE 0xC0000000
|
#define QUICK_NEGOTIATE 0xC0000000
|
||||||
|
|
|
@ -2157,7 +2157,7 @@ namespace DTSC{
|
||||||
if (t.pages.getInt("avail", i) == 0){continue;}
|
if (t.pages.getInt("avail", i) == 0){continue;}
|
||||||
char thisPageName[NAME_BUFFER_SIZE];
|
char thisPageName[NAME_BUFFER_SIZE];
|
||||||
snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), trackIdx,
|
snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), trackIdx,
|
||||||
t.pages.getInt("firstkey", i));
|
(uint32_t)t.pages.getInt("firstkey", i));
|
||||||
IPC::sharedPage p(thisPageName, 20971520);
|
IPC::sharedPage p(thisPageName, 20971520);
|
||||||
p.master = true;
|
p.master = true;
|
||||||
}
|
}
|
||||||
|
@ -2201,7 +2201,7 @@ namespace DTSC{
|
||||||
// Initialize the correct page, make it master so it gets cleaned up when leaving scope.
|
// Initialize the correct page, make it master so it gets cleaned up when leaving scope.
|
||||||
char thisPageName[NAME_BUFFER_SIZE];
|
char thisPageName[NAME_BUFFER_SIZE];
|
||||||
snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), trackIdx,
|
snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), trackIdx,
|
||||||
t.pages.getInt("firstkey", t.pages.getDeleted()));
|
(uint32_t)t.pages.getInt("firstkey", t.pages.getDeleted()));
|
||||||
IPC::sharedPage p(thisPageName, 20971520);
|
IPC::sharedPage p(thisPageName, 20971520);
|
||||||
p.master = true;
|
p.master = true;
|
||||||
|
|
||||||
|
@ -3151,8 +3151,8 @@ namespace DTSC{
|
||||||
return pages.getInt("firstkey", res);
|
return pages.getInt("firstkey", res);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Given a key, returns the page number that timestamp can be found on.
|
/// Given a key, returns the page number it can be found on.
|
||||||
/// If the key is not available, returns the closest page number that is.
|
/// If the key is not available, returns the closest page that is.
|
||||||
size_t Meta::getPageNumberForKey(uint32_t idx, uint64_t keyNum) const{
|
size_t Meta::getPageNumberForKey(uint32_t idx, uint64_t keyNum) const{
|
||||||
const Util::RelAccX &pages = tracks.at(idx).pages;
|
const Util::RelAccX &pages = tracks.at(idx).pages;
|
||||||
size_t res = pages.getStartPos();
|
size_t res = pages.getStartPos();
|
||||||
|
|
40
src/io.cpp
40
src/io.cpp
|
@ -37,8 +37,8 @@ namespace Mist{
|
||||||
/// Buffering itself is done by bufferNext().
|
/// Buffering itself is done by bufferNext().
|
||||||
///\param tid The trackid of the page to start buffering
|
///\param tid The trackid of the page to start buffering
|
||||||
///\param pageNumber The number of the page to start buffering
|
///\param pageNumber The number of the page to start buffering
|
||||||
bool InOutBase::bufferStart(size_t idx, size_t pageNumber){
|
bool InOutBase::bufferStart(size_t idx, uint32_t pageNumber){
|
||||||
VERYHIGH_MSG("bufferStart for stream %s, track %zu, page %zu", streamName.c_str(), idx, pageNumber);
|
VERYHIGH_MSG("bufferStart for stream %s, track %zu, page %" PRIu32, streamName.c_str(), idx, pageNumber);
|
||||||
// Initialize the stream metadata if it does not yet exist
|
// Initialize the stream metadata if it does not yet exist
|
||||||
#ifndef TSLIVE_INPUT
|
#ifndef TSLIVE_INPUT
|
||||||
if (!meta){meta.reInit(streamName);}
|
if (!meta){meta.reInit(streamName);}
|
||||||
|
@ -52,15 +52,15 @@ namespace Mist{
|
||||||
// If we are currently buffering a page, abandon it completely and print a message about this
|
// If we are currently buffering a page, abandon it completely and print a message about this
|
||||||
// This page will NEVER be deleted, unless we open it again later.
|
// This page will NEVER be deleted, unless we open it again later.
|
||||||
if (curPage.count(idx)){
|
if (curPage.count(idx)){
|
||||||
WARN_MSG("Abandoning current page (%zu) for track %zu", curPageNum[idx], idx);
|
WARN_MSG("Abandoning current page (%" PRIu32 ") for track %zu", curPageNum[idx], idx);
|
||||||
curPage.erase(idx);
|
curPage.erase(idx);
|
||||||
curPageNum.erase(idx);
|
curPageNum.erase(idx);
|
||||||
}
|
}
|
||||||
|
|
||||||
Util::RelAccX &tPages = meta.pages(idx);
|
Util::RelAccX &tPages = meta.pages(idx);
|
||||||
|
|
||||||
size_t pageIdx = INVALID_PAGE_NUM;
|
uint32_t pageIdx = INVALID_KEY_NUM;
|
||||||
for (size_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){
|
for (uint32_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){
|
||||||
if (tPages.getInt("firstkey", i) == pageNumber){
|
if (tPages.getInt("firstkey", i) == pageNumber){
|
||||||
pageIdx = i;
|
pageIdx = i;
|
||||||
break;
|
break;
|
||||||
|
@ -68,10 +68,10 @@ namespace Mist{
|
||||||
}
|
}
|
||||||
|
|
||||||
// If this is not a valid page number on this track, stop buffering this page.
|
// If this is not a valid page number on this track, stop buffering this page.
|
||||||
if (pageIdx == INVALID_PAGE_NUM){
|
if (pageIdx == INVALID_KEY_NUM){
|
||||||
WARN_MSG("Aborting page buffer start: %zu is not a valid page number on track %zu.", pageNumber, idx);
|
WARN_MSG("Aborting page buffer start: %" PRIu32 " is not a valid page number on track %zu.", pageNumber, idx);
|
||||||
std::stringstream test;
|
std::stringstream test;
|
||||||
for (size_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){
|
for (uint32_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){
|
||||||
test << tPages.getInt("firstkey", i) << " ";
|
test << tPages.getInt("firstkey", i) << " ";
|
||||||
}
|
}
|
||||||
INFO_MSG("Valid page numbers: %s", test.str().c_str());
|
INFO_MSG("Valid page numbers: %s", test.str().c_str());
|
||||||
|
@ -81,7 +81,7 @@ namespace Mist{
|
||||||
|
|
||||||
// If the page is already buffered, ignore this request
|
// If the page is already buffered, ignore this request
|
||||||
if (isBuffered(idx, pageNumber)){
|
if (isBuffered(idx, pageNumber)){
|
||||||
INFO_MSG("Page %zu on track %zu already buffered", pageNumber, idx);
|
INFO_MSG("Page %" PRIu32 " on track %zu already buffered", pageNumber, idx);
|
||||||
///\return false if the page was already buffered.
|
///\return false if the page was already buffered.
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -100,7 +100,7 @@ namespace Mist{
|
||||||
// Set the current offset to 0, to allow for using it in bufferNext()
|
// Set the current offset to 0, to allow for using it in bufferNext()
|
||||||
tPages.setInt("avail", 0, pageIdx);
|
tPages.setInt("avail", 0, pageIdx);
|
||||||
|
|
||||||
HIGH_MSG("Start buffering page %zu on track %zu successful", pageNumber, idx);
|
HIGH_MSG("Start buffering page %" PRIu32 " on track %zu successful", pageNumber, idx);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -109,26 +109,26 @@ namespace Mist{
|
||||||
/// Does not do anything if the process is not standalone, in this case the master process will have an overloaded version of this function.
|
/// Does not do anything if the process is not standalone, in this case the master process will have an overloaded version of this function.
|
||||||
///\param tid The trackid to remove the page from
|
///\param tid The trackid to remove the page from
|
||||||
///\param pageNumber The number of the page to remove
|
///\param pageNumber The number of the page to remove
|
||||||
void InOutBase::bufferRemove(size_t idx, size_t pageNumber){
|
void InOutBase::bufferRemove(size_t idx, uint32_t pageNumber){
|
||||||
if (!standAlone){// A different process will handle this for us
|
if (!standAlone){// A different process will handle this for us
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
Util::RelAccX &tPages = meta.pages(idx);
|
Util::RelAccX &tPages = meta.pages(idx);
|
||||||
|
|
||||||
size_t pageIdx = INVALID_PAGE_NUM;
|
uint32_t pageIdx = INVALID_KEY_NUM;
|
||||||
for (size_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){
|
for (uint32_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){
|
||||||
if (tPages.getInt("firstkey", i) == pageNumber){
|
if (tPages.getInt("firstkey", i) == pageNumber){
|
||||||
pageIdx = i;
|
pageIdx = i;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// If the given pagenumber is not a valid page on this track, do nothing
|
// If the given pagenumber is not a valid page on this track, do nothing
|
||||||
if (pageIdx == INVALID_PAGE_NUM){
|
if (pageIdx == INVALID_KEY_NUM){
|
||||||
INFO_MSG("Can't remove page %zu on track %zu as it is not a valid page number.", pageNumber, idx);
|
INFO_MSG("Can't remove page %" PRIu32 " on track %zu as it is not a valid page number.", pageNumber, idx);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
HIGH_MSG("Removing page %zu on track %zu from the corresponding metaPage", pageNumber, idx);
|
HIGH_MSG("Removing page %" PRIu32 " on track %zu from the corresponding metaPage", pageNumber, idx);
|
||||||
tPages.setInt("avail", 0, pageIdx);
|
tPages.setInt("avail", 0, pageIdx);
|
||||||
|
|
||||||
// Open the correct page
|
// Open the correct page
|
||||||
|
@ -161,7 +161,7 @@ namespace Mist{
|
||||||
/// Returns the pagenumber where this key is buffered on
|
/// Returns the pagenumber where this key is buffered on
|
||||||
///\param tid The trackid on which to locate the key
|
///\param tid The trackid on which to locate the key
|
||||||
///\param keyNum The number of the keyframe to find
|
///\param keyNum The number of the keyframe to find
|
||||||
size_t InOutBase::bufferedOnPage(size_t idx, size_t keyNum){
|
uint32_t InOutBase::bufferedOnPage(size_t idx, uint32_t keyNum){
|
||||||
Util::RelAccX &tPages = meta.pages(idx);
|
Util::RelAccX &tPages = meta.pages(idx);
|
||||||
|
|
||||||
for (uint64_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){
|
for (uint64_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){
|
||||||
|
@ -206,8 +206,8 @@ namespace Mist{
|
||||||
IPC::sharedPage &myPage = curPage[packTrack];
|
IPC::sharedPage &myPage = curPage[packTrack];
|
||||||
|
|
||||||
Util::RelAccX &tPages = meta.pages(packTrack);
|
Util::RelAccX &tPages = meta.pages(packTrack);
|
||||||
size_t pageIdx = 0;
|
uint32_t pageIdx = 0;
|
||||||
size_t currPagNum = curPageNum[packTrack];
|
uint32_t currPagNum = curPageNum[packTrack];
|
||||||
Util::RelAccXFieldData firstkey = tPages.getFieldData("firstkey");
|
Util::RelAccXFieldData firstkey = tPages.getFieldData("firstkey");
|
||||||
for (uint64_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){
|
for (uint64_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){
|
||||||
if (tPages.getInt(firstkey, i) == currPagNum){
|
if (tPages.getInt(firstkey, i) == currPagNum){
|
||||||
|
@ -220,7 +220,7 @@ namespace Mist{
|
||||||
uint64_t pageSize = tPages.getInt("size", pageIdx);
|
uint64_t pageSize = tPages.getInt("size", pageIdx);
|
||||||
// Do nothing when there is not enough free space on the page to add the packet.
|
// Do nothing when there is not enough free space on the page to add the packet.
|
||||||
if (pageSize - pageOffset < packDataLen){
|
if (pageSize - pageOffset < packDataLen){
|
||||||
FAIL_MSG("Track %" PRIu32 "p%zu : Pack %" PRIu64 "ms of %" PRIu64 "b exceeds size %" PRIu64 " @ bpos %" PRIu64,
|
FAIL_MSG("Track %" PRIu32 "p%" PRIu32 " : Pack %" PRIu64 "ms of %" PRIu64 "b exceeds size %" PRIu64 " @ bpos %" PRIu64,
|
||||||
packTrack, currPagNum, packTime, packDataLen, pageSize, pageOffset);
|
packTrack, currPagNum, packTime, packDataLen, pageSize, pageOffset);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
8
src/io.h
8
src/io.h
|
@ -15,13 +15,13 @@ namespace Mist{
|
||||||
InOutBase();
|
InOutBase();
|
||||||
|
|
||||||
bool isBuffered(size_t idx, uint32_t keyNum);
|
bool isBuffered(size_t idx, uint32_t keyNum);
|
||||||
size_t bufferedOnPage(size_t idx, size_t keyNum);
|
uint32_t bufferedOnPage(size_t idx, uint32_t keyNum);
|
||||||
|
|
||||||
size_t getMainSelectedTrack();
|
size_t getMainSelectedTrack();
|
||||||
|
|
||||||
bool bufferStart(size_t idx, size_t pageNumber);
|
bool bufferStart(size_t idx, uint32_t pageNumber);
|
||||||
void bufferFinalize(size_t idx);
|
void bufferFinalize(size_t idx);
|
||||||
void bufferRemove(size_t idx, size_t pageNumber);
|
void bufferRemove(size_t idx, uint32_t pageNumber);
|
||||||
void bufferLivePacket(const DTSC::Packet &packet);
|
void bufferLivePacket(const DTSC::Packet &packet);
|
||||||
|
|
||||||
void bufferNext(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData,
|
void bufferNext(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData,
|
||||||
|
@ -42,7 +42,7 @@ namespace Mist{
|
||||||
|
|
||||||
std::map<size_t, Comms::Users> userSelect;
|
std::map<size_t, Comms::Users> userSelect;
|
||||||
|
|
||||||
std::map<size_t, size_t> curPageNum; ///< For each track, holds the number page that is currently being written.
|
std::map<size_t, uint32_t> curPageNum; ///< For each track, holds the number page that is currently being written.
|
||||||
std::map<size_t, IPC::sharedPage> curPage; ///< For each track, holds the page that is currently being written.
|
std::map<size_t, IPC::sharedPage> curPage; ///< For each track, holds the page that is currently being written.
|
||||||
};
|
};
|
||||||
}// namespace Mist
|
}// namespace Mist
|
||||||
|
|
|
@ -511,9 +511,9 @@ namespace Mist{
|
||||||
uint64_t pageKeys = tPages.getInt("keycount", i);
|
uint64_t pageKeys = tPages.getInt("keycount", i);
|
||||||
if (keyNum > pageNum + pageKeys - 1) continue;
|
if (keyNum > pageNum + pageKeys - 1) continue;
|
||||||
uint64_t pageAvail = tPages.getInt("avail", i);
|
uint64_t pageAvail = tPages.getInt("avail", i);
|
||||||
return pageAvail == 0 ? INVALID_PAGE_NUM : pageNum;
|
return pageAvail == 0 ? INVALID_KEY_NUM : pageNum;
|
||||||
}
|
}
|
||||||
return INVALID_PAGE_NUM;
|
return INVALID_KEY_NUM;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Gets the highest page number available for the given trackId.
|
/// Gets the highest page number available for the given trackId.
|
||||||
|
@ -551,8 +551,8 @@ namespace Mist{
|
||||||
uint64_t micros = Util::getMicros();
|
uint64_t micros = Util::getMicros();
|
||||||
VERYHIGH_MSG("Loading track %zu, containing key %zu", trackId, keyNum);
|
VERYHIGH_MSG("Loading track %zu, containing key %zu", trackId, keyNum);
|
||||||
uint32_t timeout = 0;
|
uint32_t timeout = 0;
|
||||||
uint64_t pageNum = pageNumForKey(trackId, keyNum);
|
uint32_t pageNum = pageNumForKey(trackId, keyNum);
|
||||||
while (keepGoing() && pageNum == INVALID_PAGE_NUM){
|
while (keepGoing() && pageNum == INVALID_KEY_NUM){
|
||||||
if (!timeout){HIGH_MSG("Requesting page with key %zu:%zu", trackId, keyNum);}
|
if (!timeout){HIGH_MSG("Requesting page with key %zu:%zu", trackId, keyNum);}
|
||||||
++timeout;
|
++timeout;
|
||||||
//Time out after 15 seconds
|
//Time out after 15 seconds
|
||||||
|
@ -689,7 +689,7 @@ namespace Mist{
|
||||||
}
|
}
|
||||||
if (M.getType(mainTrack) == "video"){
|
if (M.getType(mainTrack) == "video"){
|
||||||
DTSC::Keys keys(M.keys(mainTrack));
|
DTSC::Keys keys(M.keys(mainTrack));
|
||||||
size_t keyNum = M.getKeyNumForTime(mainTrack, pos);
|
uint32_t keyNum = M.getKeyNumForTime(mainTrack, pos);
|
||||||
pos = keys.getTime(keyNum);
|
pos = keys.getTime(keyNum);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -734,9 +734,9 @@ namespace Mist{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
DTSC::Keys keys(M.keys(tid));
|
DTSC::Keys keys(M.keys(tid));
|
||||||
size_t keyNum = M.getKeyNumForTime(tid, pos);
|
uint32_t keyNum = M.getKeyNumForTime(tid, pos);
|
||||||
uint64_t actualKeyTime = keys.getTime(keyNum);
|
uint64_t actualKeyTime = keys.getTime(keyNum);
|
||||||
HIGH_MSG("Seeking to track %zu key %zu => time %" PRIu64, tid, keyNum, pos);
|
HIGH_MSG("Seeking to track %zu key %" PRIu32 " => time %" PRIu64, tid, keyNum, pos);
|
||||||
if (actualKeyTime > pos){
|
if (actualKeyTime > pos){
|
||||||
if (M.getLive()){
|
if (M.getLive()){
|
||||||
pos = actualKeyTime;
|
pos = actualKeyTime;
|
||||||
|
@ -776,7 +776,7 @@ namespace Mist{
|
||||||
tid);
|
tid);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
VERYHIGH_MSG("Track %zu no data (key %zu @ %" PRIu64 ") - waiting...", tid,
|
VERYHIGH_MSG("Track %zu no data (key %" PRIu32 " @ %" PRIu64 ") - waiting...", tid,
|
||||||
keyNum + (getNextKey ? 1 : 0), tmp.offset);
|
keyNum + (getNextKey ? 1 : 0), tmp.offset);
|
||||||
uint32_t i = 0;
|
uint32_t i = 0;
|
||||||
while (!meta.getLive() && curPage[tid].mapped[tmp.offset] == 0 && ++i <= 10){
|
while (!meta.getLive() && curPage[tid].mapped[tmp.offset] == 0 && ++i <= 10){
|
||||||
|
@ -784,7 +784,7 @@ namespace Mist{
|
||||||
stats();
|
stats();
|
||||||
}
|
}
|
||||||
if (curPage[tid].mapped[tmp.offset]){return seek(tid, pos, getNextKey);}
|
if (curPage[tid].mapped[tmp.offset]){return seek(tid, pos, getNextKey);}
|
||||||
FAIL_MSG("Track %zu no data (key %zu@%" PRIu64 ", page %s, time %" PRIu64 " -> %" PRIu64 ", next=%" PRIu64 ") - timeout", tid, keyNum + (getNextKey ? 1 : 0), tmp.offset, curPage[tid].name.c_str(), pos, actualKeyTime, keys.getTime(keyNum+1));
|
FAIL_MSG("Track %zu no data (key %" PRIu32 "@%" PRIu64 ", page %s, time %" PRIu64 " -> %" PRIu64 ", next=%" PRIu64 ") - timeout", tid, keyNum + (getNextKey ? 1 : 0), tmp.offset, curPage[tid].name.c_str(), pos, actualKeyTime, keys.getTime(keyNum+1));
|
||||||
userSelect.erase(tid);
|
userSelect.erase(tid);
|
||||||
firstTime = Util::bootMS() - (buffer.begin()->time * realTime / 1000);
|
firstTime = Util::bootMS() - (buffer.begin()->time * realTime / 1000);
|
||||||
return false;
|
return false;
|
||||||
|
@ -1061,7 +1061,7 @@ namespace Mist{
|
||||||
DTSC::Keys mainKeys(meta.keys(mainTrack));
|
DTSC::Keys mainKeys(meta.keys(mainTrack));
|
||||||
if (!mainKeys.getValidCount()){return false;}
|
if (!mainKeys.getValidCount()){return false;}
|
||||||
|
|
||||||
for (size_t keyNum = mainKeys.getEndValid() - 1; keyNum >= mainKeys.getFirstValid(); keyNum--){
|
for (uint32_t keyNum = mainKeys.getEndValid() - 1; keyNum >= mainKeys.getFirstValid(); keyNum--){
|
||||||
seekPos = mainKeys.getTime(keyNum);
|
seekPos = mainKeys.getTime(keyNum);
|
||||||
// Only skip forward if we can win a decent amount (100ms)
|
// Only skip forward if we can win a decent amount (100ms)
|
||||||
if (seekPos <= cTime + 100 * seekCount){break;}
|
if (seekPos <= cTime + 100 * seekCount){break;}
|
||||||
|
@ -1407,7 +1407,7 @@ namespace Mist{
|
||||||
// store copy of current state
|
// store copy of current state
|
||||||
std::set<sortedPageInfo> tmp_buffer = buffer;
|
std::set<sortedPageInfo> tmp_buffer = buffer;
|
||||||
std::map<size_t, Comms::Users> tmp_userSelect = userSelect;
|
std::map<size_t, Comms::Users> tmp_userSelect = userSelect;
|
||||||
std::map<size_t, size_t> tmp_currentPage = currentPage;
|
std::map<size_t, uint32_t> tmp_currentPage = currentPage;
|
||||||
|
|
||||||
// reset the current packet to null, assuming failure
|
// reset the current packet to null, assuming failure
|
||||||
thisPacket.null();
|
thisPacket.null();
|
||||||
|
@ -1424,7 +1424,7 @@ namespace Mist{
|
||||||
userSelect[mainTrack].reload(streamName, mainTrack);
|
userSelect[mainTrack].reload(streamName, mainTrack);
|
||||||
// now, seek to the exact timestamp of the keyframe
|
// now, seek to the exact timestamp of the keyframe
|
||||||
DTSC::Keys keys(M.keys(mainTrack));
|
DTSC::Keys keys(M.keys(mainTrack));
|
||||||
size_t targetKey = M.getKeyNumForTime(mainTrack, currTime);
|
uint32_t targetKey = M.getKeyNumForTime(mainTrack, currTime);
|
||||||
seek(keys.getTime(targetKey));
|
seek(keys.getTime(targetKey));
|
||||||
// attempt to load the key into thisPacket
|
// attempt to load the key into thisPacket
|
||||||
bool ret = prepareNext();
|
bool ret = prepareNext();
|
||||||
|
@ -1437,7 +1437,7 @@ namespace Mist{
|
||||||
buffer = tmp_buffer;
|
buffer = tmp_buffer;
|
||||||
userSelect = tmp_userSelect;
|
userSelect = tmp_userSelect;
|
||||||
// but the currentPage map must also load keys as needed
|
// but the currentPage map must also load keys as needed
|
||||||
for (std::map<size_t, size_t>::iterator it = tmp_currentPage.begin(); it != tmp_currentPage.end(); ++it){
|
for (std::map<size_t, uint32_t>::iterator it = tmp_currentPage.begin(); it != tmp_currentPage.end(); ++it){
|
||||||
loadPageForKey(it->first, it->second);
|
loadPageForKey(it->first, it->second);
|
||||||
}
|
}
|
||||||
// now we are back to normal and can return safely
|
// now we are back to normal and can return safely
|
||||||
|
@ -1543,6 +1543,12 @@ namespace Mist{
|
||||||
dropTrack(nxt.tid, "EOP: invalid next packet");
|
dropTrack(nxt.tid, "EOP: invalid next packet");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
if (nextTime < nxt.time){
|
||||||
|
std::stringstream errMsg;
|
||||||
|
errMsg << "next packet has timestamp " << nextTime << " but current timestamp is " << nxt.time;
|
||||||
|
dropTrack(nxt.tid, errMsg.str().c_str());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}else{
|
}else{
|
||||||
//no next packet yet!
|
//no next packet yet!
|
||||||
//Check if this is the last packet of a VoD stream. Return success and drop the track.
|
//Check if this is the last packet of a VoD stream. Return success and drop the track.
|
||||||
|
@ -1552,13 +1558,25 @@ namespace Mist{
|
||||||
dropTrack(nxt.tid, "end of VoD track reached", false);
|
dropTrack(nxt.tid, "end of VoD track reached", false);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
size_t thisKey = M.getKeyNumForTime(nxt.tid, nxt.time);
|
uint32_t thisKey = M.getKeyNumForTime(nxt.tid, nxt.time);
|
||||||
//Check if there exists a different page for the next key
|
//Check if there exists a different page for the next key
|
||||||
size_t nextKeyPage = M.getPageNumberForKey(nxt.tid, thisKey + 1);
|
uint32_t nextKeyPage = INVALID_KEY_NUM;
|
||||||
if (nextKeyPage != INVALID_KEY_NUM && nextKeyPage != currentPage[nxt.tid]){
|
//Make sure we only try to read the page for the next key if it actually should be available
|
||||||
DTSC::Keys keys(M.keys(nxt.tid));
|
DTSC::Keys keys(M.keys(nxt.tid));
|
||||||
|
if (keys.getEndValid() >= thisKey+1){nextKeyPage = M.getPageNumberForKey(nxt.tid, thisKey + 1);}
|
||||||
|
if (nextKeyPage != INVALID_KEY_NUM && nextKeyPage != currentPage[nxt.tid]){
|
||||||
// If so, the next key is our next packet
|
// If so, the next key is our next packet
|
||||||
nextTime = keys.getTime(thisKey + 1);
|
nextTime = keys.getTime(thisKey + 1);
|
||||||
|
|
||||||
|
//If the next packet should've been before the current packet, something is wrong. Abort, abort!
|
||||||
|
if (nextTime < nxt.time){
|
||||||
|
std::stringstream errMsg;
|
||||||
|
errMsg << "next key (" << (thisKey+1) << ") time " << nextTime << " but current time " << nxt.time;
|
||||||
|
errMsg << "; currPage=" << currentPage[nxt.tid] << ", nxtPage=" << nextKeyPage;
|
||||||
|
errMsg << ", firstKey=" << keys.getFirstValid() << ", endKey=" << keys.getEndValid();
|
||||||
|
dropTrack(nxt.tid, errMsg.str().c_str());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}else{
|
}else{
|
||||||
//Okay, there's no next page yet, and no next packet on this page either.
|
//Okay, there's no next page yet, and no next packet on this page either.
|
||||||
//That means we're waiting for data to show up, somewhere.
|
//That means we're waiting for data to show up, somewhere.
|
||||||
|
@ -1575,7 +1593,7 @@ namespace Mist{
|
||||||
}
|
}
|
||||||
//every ~16 seconds, reconnect to metadata
|
//every ~16 seconds, reconnect to metadata
|
||||||
if (emptyCount % 1600 == 0){
|
if (emptyCount % 1600 == 0){
|
||||||
INFO_MSG("Reconnecting to input; track %" PRIu64 " key %zu is on page %zu and we're currently serving %zu from %zu", nxt.tid, thisKey+1, nextKeyPage, thisKey, currentPage[nxt.tid]);
|
INFO_MSG("Reconnecting to input; track %" PRIu64 " key %" PRIu32 " is on page %" PRIu32 " and we're currently serving %" PRIu32 " from %" PRIu32, nxt.tid, thisKey+1, nextKeyPage, thisKey, currentPage[nxt.tid]);
|
||||||
reconnect();
|
reconnect();
|
||||||
if (!meta){
|
if (!meta){
|
||||||
onFail("Could not connect to stream data", true);
|
onFail("Could not connect to stream data", true);
|
||||||
|
@ -1596,12 +1614,6 @@ namespace Mist{
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//If the next packet should've been before the current packet, something is wrong. Abort, abort!
|
|
||||||
if (nextTime < nxt.time){
|
|
||||||
dropTrack(nxt.tid, "time going backwards");
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// we've handled all special cases - at this point the packet should exist
|
// we've handled all special cases - at this point the packet should exist
|
||||||
// let's load it
|
// let's load it
|
||||||
thisPacket.reInit(curPage[nxt.tid].mapped + nxt.offset, 0, true);
|
thisPacket.reInit(curPage[nxt.tid].mapped + nxt.offset, 0, true);
|
||||||
|
@ -1622,7 +1634,7 @@ namespace Mist{
|
||||||
//Update keynum only when the second flips over in the timestamp
|
//Update keynum only when the second flips over in the timestamp
|
||||||
//We do this because DTSC::Keys is pretty CPU-heavy
|
//We do this because DTSC::Keys is pretty CPU-heavy
|
||||||
if (nxt.time / 1000 < nextTime/1000){
|
if (nxt.time / 1000 < nextTime/1000){
|
||||||
size_t thisKey = M.getKeyNumForTime(nxt.tid, nxt.time);
|
uint32_t thisKey = M.getKeyNumForTime(nxt.tid, nxt.time);
|
||||||
userSelect[nxt.tid].setKeyNum(thisKey);
|
userSelect[nxt.tid].setKeyNum(thisKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -95,7 +95,7 @@ namespace Mist{
|
||||||
std::string getCountry(std::string ip);
|
std::string getCountry(std::string ip);
|
||||||
void doSync(bool force = false);
|
void doSync(bool force = false);
|
||||||
/*LTS-END*/
|
/*LTS-END*/
|
||||||
std::map<size_t, size_t> currentPage;
|
std::map<size_t, uint32_t> currentPage;
|
||||||
void loadPageForKey(size_t trackId, size_t keyNum);
|
void loadPageForKey(size_t trackId, size_t keyNum);
|
||||||
uint64_t pageNumForKey(size_t trackId, size_t keyNum);
|
uint64_t pageNumForKey(size_t trackId, size_t keyNum);
|
||||||
uint64_t pageNumMax(size_t trackId);
|
uint64_t pageNumMax(size_t trackId);
|
||||||
|
|
Loading…
Add table
Reference in a new issue