Fixed various input buffering bugs, fixed various output bugs, fixed MP4.

This commit is contained in:
Thulinma 2014-04-21 20:59:16 +02:00
parent 9bdb6c17d6
commit 71e4a0cc26
4 changed files with 94 additions and 97 deletions

View file

@ -78,32 +78,11 @@ namespace Mist {
}
if (!Util::Stream::getStream(streamName)){
DEBUG_MSG(DLVL_FAIL, "Opening stream disallowed - aborting initalization");
onFail();
return;
}
isInitialized = true;
streamIndex.init(streamName,0,false,false);
if (!streamIndex.mapped){
sem_t * waiting = sem_open(std::string("/wait_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 0);
if (waiting == SEM_FAILED){
DEBUG_MSG(DLVL_FAIL, "Failed to open semaphore - cancelling");
onFail();
return;
}
#ifdef __APPLE__
unsigned int timeout = 0;
while (++timeout < 300 && sem_trywait(waiting) == -1 && (errno == EINTR || errno == EAGAIN) ){
Util::sleep(100);
}
#else
struct timespec ts;
ts.tv_sec = Util::epoch() + 30;
ts.tv_nsec = 0;
while (sem_timedwait(waiting, &ts) == -1 && errno == EINTR) continue;
#endif
sem_post(waiting);
sem_close(waiting);
streamIndex.init(streamName,0);
}
streamIndex.init(streamName,0);
if (!streamIndex.mapped){
DEBUG_MSG(DLVL_FAIL, "Could not connect to server for %s\n", streamName.c_str());
onFail();
@ -233,7 +212,10 @@ namespace Mist {
}
unsigned int Output::getKeyForTime(long unsigned int trackId, long long timeStamp){
unsigned int keyNo = 0;
if (!myMeta.tracks[trackId].keys.size()){
return 0;
}
unsigned int keyNo = myMeta.tracks[trackId].keys.begin()->getNumber();
for (std::deque<DTSC::Key>::iterator it = myMeta.tracks[trackId].keys.begin(); it != myMeta.tracks[trackId].keys.end(); it++){
if (it->getTime() <= timeStamp){
keyNo = it->getNumber();
@ -245,11 +227,12 @@ namespace Mist {
}
void Output::loadPageForKey(long unsigned int trackId, long long int keyNum){
if (keyNum >= myMeta.tracks[trackId].keys.rbegin()->getNumber()){
//curPages.erase(trackId);
if (myMeta.vod && keyNum > myMeta.tracks[trackId].keys.rbegin()->getNumber()){
curPages.erase(trackId);
currKeyOpen.erase(trackId);
return;
}
DEBUG_MSG(DLVL_MEDIUM, "Loading track %lu, containing key %lld", trackId, keyNum);
DEBUG_MSG(DLVL_HIGH, "Loading track %lu, containing key %lld", trackId, keyNum);
int pageNum = -1;
int keyAmount = -1;
unsigned int timeout = 0;
@ -260,8 +243,9 @@ namespace Mist {
}
while (pageNum == -1 || keyAmount == -1){
for (int i = 0; i < indexPages[trackId].len / 8; i++){
long tmpKey = ntohl(((((long long int*)indexPages[trackId].mapped)[i]) >> 32) & 0xFFFFFFFF);
long amountKey = ntohl((((long long int*)indexPages[trackId].mapped)[i]) & 0xFFFFFFFF);
if (amountKey == 0){continue;}
long tmpKey = ntohl(((((long long int*)indexPages[trackId].mapped)[i]) >> 32) & 0xFFFFFFFF);
if (tmpKey <= keyNum && (tmpKey + amountKey) > keyNum){
pageNum = tmpKey;
keyAmount = amountKey;
@ -275,14 +259,25 @@ namespace Mist {
if (timeout++ > 100){
DEBUG_MSG(DLVL_FAIL, "Timeout while waiting for requested page. Aborting.");
curPages.erase(trackId);
currKeyOpen.erase(trackId);
return;
}
nxtKeyNum[trackId] = keyNum-1;
if (keyNum){
nxtKeyNum[trackId] = keyNum-1;
}else{
nxtKeyNum[trackId] = 0;
}
stats();
Util::sleep(100);
}
}
if (keyNum){
nxtKeyNum[trackId] = keyNum-1;
}else{
nxtKeyNum[trackId] = 0;
}
stats();
nxtKeyNum[trackId] = pageNum;
if (currKeyOpen.count(trackId) && currKeyOpen[trackId] == pageNum){
@ -292,7 +287,7 @@ namespace Mist {
sprintf(id, "%s%lu_%d", streamName.c_str(), trackId, pageNum);
curPages[trackId].init(std::string(id),0);
if (!(curPages[trackId].mapped)){
DEBUG_MSG(DLVL_FAIL, "(%d) Initializing page %s failed", getpid(), curPages[trackId].name.c_str());
DEBUG_MSG(DLVL_FAIL, "Initializing page %s failed", curPages[trackId].name.c_str());
return;
}
currKeyOpen[trackId] = pageNum;
@ -308,6 +303,7 @@ namespace Mist {
buffer.clear();
currentPacket.null();
updateMeta();
DEBUG_MSG(DLVL_MEDIUM, "Seeking to %llims", pos);
for (std::set<long unsigned int>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){
seek(*it, pos);
}
@ -339,6 +335,15 @@ namespace Mist {
DEBUG_MSG(DLVL_FAIL, "Noes! Couldn't find packet on track %d because of some kind of corruption error or somesuch.", tid);
}else{
DEBUG_MSG(DLVL_FAIL, "Track %d no data (key %u) - waiting...", tid, getKeyForTime(tid, pos) + (getNextKey?1:0));
unsigned int i = 0;
while (curPages[tid].mapped[tmp.offset] == 0 && ++i < 10){
Util::sleep(100);
}
if (curPages[tid].mapped[tmp.offset] == 0){
DEBUG_MSG(DLVL_FAIL, "Track %d no data (key %u) - timeout", tid, getKeyForTime(tid, pos) + (getNextKey?1:0));
}else{
return seek(tid, pos, getNextKey);
}
}
return false;
}
@ -352,7 +357,7 @@ namespace Mist {
if (wantRequest){
if ((firstData && myConn.Received().size()) || myConn.spool()){
firstData = false;
DEBUG_MSG(DLVL_VERYHIGH, "(%d) OnRequest", getpid());
DEBUG_MSG(DLVL_VERYHIGH, "OnRequest");
onRequest();
}else{
if (!isBlocking && !parseData){
@ -365,7 +370,7 @@ namespace Mist {
initialize();
}
if ( !sentHeader){
DEBUG_MSG(DLVL_VERYHIGH, "(%d) SendHeader", getpid());
DEBUG_MSG(DLVL_VERYHIGH, "SendHeader");
sendHeader();
}
prepareNext();