Various fixes for live streams.

This commit is contained in:
Thulinma 2014-12-21 13:50:08 +01:00
parent cc4539c4da
commit 142ef73f6c
3 changed files with 69 additions and 51 deletions

View file

@ -86,7 +86,7 @@ namespace Controller {
configLock.wait(); configLock.wait();
//write config //write config
std::string temp = writeConf.toPacked(); std::string temp = writeConf.toPacked();
memcpy(mistConfOut.mapped, temp.data(), std::min(temp.size(), (unsigned long)mistConfOut.len)); memcpy(mistConfOut.mapped, temp.data(), std::min(temp.size(), (size_t)mistConfOut.len));
//unlock semaphore //unlock semaphore
configLock.post(); configLock.post();
} }

View file

@ -380,6 +380,23 @@ namespace Mist {
return keyNo; return keyNo;
} }
int Output::pageNumForKey(long unsigned int trackId, long long int keyNum){
if (!indexPages.count(trackId)){
char id[100];
sprintf(id, "%s%lu", streamName.c_str(), trackId);
indexPages[trackId].init(id, 8 * 1024);
}
for (int i = 0; i < indexPages[trackId].len / 8; i++){
long amountKey = ntohl((((long long int*)indexPages[trackId].mapped)[i]) & 0xFFFFFFFF);
if (amountKey == 0){continue;}
long tmpKey = ntohl(((((long long int*)indexPages[trackId].mapped)[i]) >> 32) & 0xFFFFFFFF);
if (tmpKey <= keyNum && (tmpKey + amountKey) > keyNum){
return tmpKey;
}
}
return -1;
}
void Output::loadPageForKey(long unsigned int trackId, long long int keyNum){ void Output::loadPageForKey(long unsigned int trackId, long long int keyNum){
if (myMeta.vod && keyNum > myMeta.tracks[trackId].keys.rbegin()->getNumber()){ if (myMeta.vod && keyNum > myMeta.tracks[trackId].keys.rbegin()->getNumber()){
curPages.erase(trackId); curPages.erase(trackId);
@ -387,43 +404,26 @@ namespace Mist {
return; return;
} }
DEBUG_MSG(DLVL_HIGH, "Loading track %lu, containing key %lld", trackId, keyNum); DEBUG_MSG(DLVL_HIGH, "Loading track %lu, containing key %lld", trackId, keyNum);
int pageNum = -1;
int keyAmount = -1;
unsigned int timeout = 0; unsigned int timeout = 0;
if (!indexPages.count(trackId)){ int pageNum = pageNumForKey(trackId, keyNum);
char id[100]; while (pageNum == -1){
sprintf(id, "%s%lu", streamName.c_str(), trackId); if (!timeout){
indexPages[trackId].init(id, 8 * 1024); DEBUG_MSG(DLVL_DEVEL, "Requesting/waiting for page that has key %lu:%lld...", trackId, keyNum);
}
while (pageNum == -1 || keyAmount == -1){
for (int i = 0; i < indexPages[trackId].len / 8; i++){
long amountKey = ntohl((((long long int*)indexPages[trackId].mapped)[i]) & 0xFFFFFFFF);
if (amountKey == 0){continue;}
long tmpKey = ntohl(((((long long int*)indexPages[trackId].mapped)[i]) >> 32) & 0xFFFFFFFF);
if (tmpKey <= keyNum && (tmpKey + amountKey) > keyNum){
pageNum = tmpKey;
keyAmount = amountKey;
break;
}
} }
if (pageNum == -1 || keyAmount == -1){ if (timeout++ > 100){
if (!timeout){ DEBUG_MSG(DLVL_FAIL, "Timeout while waiting for requested page. Aborting.");
DEBUG_MSG(DLVL_DEVEL, "Requesting/waiting for page that has key %lu:%lld...", trackId, keyNum); curPages.erase(trackId);
} currKeyOpen.erase(trackId);
if (timeout++ > 100){ return;
DEBUG_MSG(DLVL_FAIL, "Timeout while waiting for requested page. Aborting.");
curPages.erase(trackId);
currKeyOpen.erase(trackId);
return;
}
if (keyNum){
nxtKeyNum[trackId] = keyNum-1;
}else{
nxtKeyNum[trackId] = 0;
}
stats();
Util::sleep(100);
} }
if (keyNum){
nxtKeyNum[trackId] = keyNum-1;
}else{
nxtKeyNum[trackId] = 0;
}
stats();
Util::sleep(100);
pageNum = pageNumForKey(trackId, keyNum);
} }
if (keyNum){ if (keyNum){
@ -580,6 +580,7 @@ namespace Mist {
if (!buffer.size()){ if (!buffer.size()){
currentPacket.null(); currentPacket.null();
DEBUG_MSG(DLVL_DEVEL, "Buffer completely played out"); DEBUG_MSG(DLVL_DEVEL, "Buffer completely played out");
onFinish();
return; return;
} }
sortedPageInfo nxt = *(buffer.begin()); sortedPageInfo nxt = *(buffer.begin());
@ -610,44 +611,55 @@ namespace Mist {
return; return;
} }
//have we arrived at the end of the memory page? (4 zeroes mark the end)
if (!memcmp(curPages[nxt.tid].mapped + nxt.offset, "\000\000\000\000", 4)){ if (!memcmp(curPages[nxt.tid].mapped + nxt.offset, "\000\000\000\000", 4)){
if (!currentPacket.getTime()){ //if we don't currently know where we are, we're lost. We should drop the track.
if (!nxt.time){
DEBUG_MSG(DLVL_DEVEL, "Timeless empty packet on track %u - dropping track.", nxt.tid); DEBUG_MSG(DLVL_DEVEL, "Timeless empty packet on track %u - dropping track.", nxt.tid);
prepareNext(); prepareNext();
return; return;
} }
if (myMeta.live){ //if this is a live stream, we might have just reached the live point.
Util::sleep(500); //check where the next key is
updateMeta(); int nextPage = pageNumForKey(nxt.tid, nxtKeyNum[nxt.tid]+1);
if (myMeta && ++emptyCount < 20){ //are we live, and the next key hasn't shown up on another page? then we're waiting.
if (!seek(nxt.tid, currentPacket.getTime(), true)){ if (myMeta.live && currKeyOpen.count(nxt.tid) && (currKeyOpen[nxt.tid] == (unsigned int)nextPage || nextPage == -1)){
buffer.insert(nxt); if (myMeta && ++emptyCount < 42){
} //we're waiting for new data. Simply retry.
buffer.insert(nxt);
}else{ }else{
//after ~10 seconds, give up and drop the track.
DEBUG_MSG(DLVL_DEVEL, "Empty packet on track %u - could not reload, dropping track.", nxt.tid); DEBUG_MSG(DLVL_DEVEL, "Empty packet on track %u - could not reload, dropping track.", nxt.tid);
} }
//keep updating the metadata at 250ms intervals while waiting for more data
Util::sleep(250);
updateMeta();
}else{ }else{
//if we're not live, we've simply reached the end of the page. Load the next key.
loadPageForKey(nxt.tid, ++nxtKeyNum[nxt.tid]); loadPageForKey(nxt.tid, ++nxtKeyNum[nxt.tid]);
nxt.offset = 0; nxt.offset = 0;
if (curPages.count(nxt.tid) && curPages[nxt.tid].mapped){ if (curPages.count(nxt.tid) && curPages[nxt.tid].mapped){
if (getDTSCTime(curPages[nxt.tid].mapped, nxt.offset) < nxt.time){ unsigned long long nextTime = getDTSCTime(curPages[nxt.tid].mapped, nxt.offset);
if (nextTime && nextTime < nxt.time){
DEBUG_MSG(DLVL_DEVEL, "Time going backwards in track %u - dropping track.", nxt.tid); DEBUG_MSG(DLVL_DEVEL, "Time going backwards in track %u - dropping track.", nxt.tid);
}else{ }else{
nxt.time = getDTSCTime(curPages[nxt.tid].mapped, nxt.offset); if (nextTime){
nxt.time = nextTime;
}
buffer.insert(nxt); buffer.insert(nxt);
DEBUG_MSG(DLVL_MEDIUM, "Next page for track %u starts at %llu.", nxt.tid, nxt.time);
} }
prepareNext(); }else{
return; DEBUG_MSG(DLVL_DEVEL, "Could not load next memory page for track %u - dropping track.", nxt.tid);
} }
DEBUG_MSG(DLVL_DEVEL, "Empty packet on track %u - dropping track.", nxt.tid);
} }
prepareNext(); prepareNext();
return; return;
} }
currentPacket.reInit(curPages[nxt.tid].mapped + nxt.offset, 0, true); currentPacket.reInit(curPages[nxt.tid].mapped + nxt.offset, 0, true);
if (currentPacket){ if (currentPacket){
if (currentPacket.getTime() != nxt.time){ if (currentPacket.getTime() != nxt.time && nxt.time){
DEBUG_MSG(DLVL_DEVEL, "ACTUALLY Loaded track %ld (next=%lu), %llu ms", currentPacket.getTrackId(), nxtKeyNum[nxt.tid], currentPacket.getTime()); DEBUG_MSG(DLVL_MEDIUM, "ACTUALLY Loaded track %ld (next=%lu), %llu ms", currentPacket.getTrackId(), nxtKeyNum[nxt.tid], currentPacket.getTime());
} }
nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, currentPacket.getTime()); nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, currentPacket.getTime());
emptyCount = 0; emptyCount = 0;
@ -660,7 +672,12 @@ namespace Mist {
} }
if (curPages[nxt.tid]){ if (curPages[nxt.tid]){
if (nxt.offset < curPages[nxt.tid].len){ if (nxt.offset < curPages[nxt.tid].len){
nxt.time = getDTSCTime(curPages[nxt.tid].mapped, nxt.offset); unsigned long long nextTime = getDTSCTime(curPages[nxt.tid].mapped, nxt.offset);
if (nextTime){
nxt.time = nextTime;
}else{
++nxt.time;
}
} }
buffer.insert(nxt); buffer.insert(nxt);
} }

View file

@ -73,6 +73,7 @@ namespace Mist {
private://these *should* not be messed with in child classes. private://these *should* not be messed with in child classes.
std::map<unsigned long, unsigned int> currKeyOpen; std::map<unsigned long, unsigned int> currKeyOpen;
void loadPageForKey(long unsigned int trackId, long long int keyNum); void loadPageForKey(long unsigned int trackId, long long int keyNum);
int pageNumForKey(long unsigned int trackId, long long int keyNum);
unsigned int lastStats;///<Time of last sending of stats. unsigned int lastStats;///<Time of last sending of stats.
long long unsigned int firstTime;///< Time of first packet after last seek. Used for real-time sending. long long unsigned int firstTime;///< Time of first packet after last seek. Used for real-time sending.
std::map<unsigned long, unsigned long> nxtKeyNum;///< Contains the number of the next key, for page seeking purposes. std::map<unsigned long, unsigned long> nxtKeyNum;///< Contains the number of the next key, for page seeking purposes.