diff --git a/src/input/input.cpp b/src/input/input.cpp index 7899533a..10a40231 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -121,7 +121,7 @@ namespace Mist { if (!isBuffer){ for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ - bufferFrame(it->first, 0); + bufferFrame(it->first, 1); } } @@ -162,7 +162,6 @@ namespace Mist { change = false; for (std::map::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++){ if (!it2->second){ - pagesByTrack[it->first].erase(it2->first); pageCounter[it->first].erase(it2->first); dataPages[it->first].erase(it2->first); change = true; @@ -201,7 +200,7 @@ namespace Mist { while(lastPack){//loop through all int tid = lastPack.getTrackId(); if (!bookKeeping.count(tid)){ - bookKeeping[tid].first = 0; + bookKeeping[tid].first = 1; bookKeeping[tid].curPart = 0; bookKeeping[tid].curKey = 0; @@ -240,8 +239,9 @@ namespace Mist { if (!pagesByTrack.count(it->first)){ DEBUG_MSG(DLVL_WARN, "No pages for track %d found", it->first); }else{ - DEBUG_MSG(DLVL_HIGH, "Track %d (%s) split into %lu pages", it->first, myMeta.tracks[it->first].codec.c_str(), pagesByTrack[it->first].size()); + DEBUG_MSG(DLVL_DEVEL, "Track %d (%s) split into %lu pages", it->first, myMeta.tracks[it->first].codec.c_str(), pagesByTrack[it->first].size()); for (std::map::iterator it2 = pagesByTrack[it->first].begin(); it2 != pagesByTrack[it->first].end(); it2++){ + DEBUG_MSG(DLVL_DEVEL, "Page %u-%u", it2->first, it2->first + it2->second.keyNum - 1); } } } @@ -249,42 +249,42 @@ namespace Mist { bool Input::bufferFrame(int track, int keyNum){ - DEBUG_MSG(DLVL_DONTEVEN, "Attempting to buffer %d:%d", track, keyNum); + if (keyNum < 1){keyNum = 1;} if (!pagesByTrack.count(track)){ return false; } - std::map ::iterator it = pagesByTrack[track].upper_bound(keyNum); - if (it == pagesByTrack[track].begin()){ - return false; + std::map::iterator it = pagesByTrack[track].upper_bound(keyNum); + if (it != pagesByTrack[track].begin()){ + it--; } - it --; int pageNum = it->first; pageCounter[track][pageNum] = 15;///Keep page 15seconds in memory after last use - if (!dataPages[track].count(pageNum)){ - char pageId[100]; - int pageIdLen = sprintf(pageId, "%s%d_%d", config->getString("streamname").c_str(), track, pageNum); - std::string tmpString(pageId, pageIdLen); - dataPages[track][pageNum].init(tmpString, it->second.dataSize, true); - DEBUG_MSG(DLVL_HIGH, "Buffering page %d through %d / %lu", pageNum, pageNum + it->second.keyNum, myMeta.tracks[track].keys.size()); - - std::stringstream trackSpec; - trackSpec << track; - trackSelect(trackSpec.str()); - }else{ + DEBUG_MSG(DLVL_DONTEVEN, "Attempting to buffer page %d key %d->%d", track, keyNum, pageNum); + if (dataPages[track].count(pageNum)){ return true; } - seek(myMeta.tracks[track].keys[pageNum].getTime()); + char pageId[100]; + int pageIdLen = sprintf(pageId, "%s%d_%d", config->getString("streamname").c_str(), track, pageNum); + std::string tmpString(pageId, pageIdLen); + dataPages[track][pageNum].init(tmpString, it->second.dataSize, true); + DEBUG_MSG(DLVL_DEVEL, "Buffering page %d through %d / %lu", pageNum, pageNum-1 + it->second.keyNum, myMeta.tracks[track].keys.size()); + + std::stringstream trackSpec; + trackSpec << track; + trackSelect(trackSpec.str()); + seek(myMeta.tracks[track].keys[pageNum-1].getTime()); long long unsigned int stopTime = myMeta.tracks[track].lastms + 1; - if ((int)myMeta.tracks[track].keys.size() > pageNum + it->second.keyNum){ - stopTime = myMeta.tracks[track].keys[pageNum + it->second.keyNum].getTime(); + if ((int)myMeta.tracks[track].keys.size() > pageNum-1 + it->second.keyNum){ + stopTime = myMeta.tracks[track].keys[pageNum-1 + it->second.keyNum].getTime(); } - DEBUG_MSG(DLVL_HIGH, "Playing from %ld to %llu", myMeta.tracks[track].keys[pageNum].getTime(), stopTime); + DEBUG_MSG(DLVL_HIGH, "Playing from %ld to %llu", myMeta.tracks[track].keys[pageNum-1].getTime(), stopTime); + it->second.curOffset = 0; getNext(); while (lastPack && lastPack.getTime() < stopTime){ if (it->second.curOffset + lastPack.getDataLen() > pagesByTrack[track][pageNum].dataSize){ - DEBUG_MSG(DLVL_WARN, "Trying to write %u bytes past the end of page %u/%u", lastPack.getDataLen(), track, pageNum); - return true; + DEBUG_MSG(DLVL_WARN, "Trying to write %u bytes on pos %llu where size is %llu (time: %llu / %llu, track %u page %u)", lastPack.getDataLen(), it->second.curOffset, pagesByTrack[track][pageNum].dataSize, lastPack.getTime(), stopTime, track, pageNum); + break; }else{ memcpy(dataPages[track][pageNum].mapped + it->second.curOffset, lastPack.getData(), lastPack.getDataLen()); it->second.curOffset += lastPack.getDataLen(); diff --git a/src/output/output.cpp b/src/output/output.cpp index 486edb82..73304260 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -78,32 +78,11 @@ namespace Mist { } if (!Util::Stream::getStream(streamName)){ DEBUG_MSG(DLVL_FAIL, "Opening stream disallowed - aborting initalization"); + onFail(); return; } isInitialized = true; - streamIndex.init(streamName,0,false,false); - if (!streamIndex.mapped){ - sem_t * waiting = sem_open(std::string("/wait_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 0); - if (waiting == SEM_FAILED){ - DEBUG_MSG(DLVL_FAIL, "Failed to open semaphore - cancelling"); - onFail(); - return; - } - #ifdef __APPLE__ - unsigned int timeout = 0; - while (++timeout < 300 && sem_trywait(waiting) == -1 && (errno == EINTR || errno == EAGAIN) ){ - Util::sleep(100); - } - #else - struct timespec ts; - ts.tv_sec = Util::epoch() + 30; - ts.tv_nsec = 0; - while (sem_timedwait(waiting, &ts) == -1 && errno == EINTR) continue; - #endif - sem_post(waiting); - sem_close(waiting); - streamIndex.init(streamName,0); - } + streamIndex.init(streamName,0); if (!streamIndex.mapped){ DEBUG_MSG(DLVL_FAIL, "Could not connect to server for %s\n", streamName.c_str()); onFail(); @@ -233,7 +212,10 @@ namespace Mist { } unsigned int Output::getKeyForTime(long unsigned int trackId, long long timeStamp){ - unsigned int keyNo = 0; + if (!myMeta.tracks[trackId].keys.size()){ + return 0; + } + unsigned int keyNo = myMeta.tracks[trackId].keys.begin()->getNumber(); for (std::deque::iterator it = myMeta.tracks[trackId].keys.begin(); it != myMeta.tracks[trackId].keys.end(); it++){ if (it->getTime() <= timeStamp){ keyNo = it->getNumber(); @@ -245,11 +227,12 @@ namespace Mist { } void Output::loadPageForKey(long unsigned int trackId, long long int keyNum){ - if (keyNum >= myMeta.tracks[trackId].keys.rbegin()->getNumber()){ - //curPages.erase(trackId); + if (myMeta.vod && keyNum > myMeta.tracks[trackId].keys.rbegin()->getNumber()){ + curPages.erase(trackId); + currKeyOpen.erase(trackId); return; } - DEBUG_MSG(DLVL_MEDIUM, "Loading track %lu, containing key %lld", trackId, keyNum); + DEBUG_MSG(DLVL_HIGH, "Loading track %lu, containing key %lld", trackId, keyNum); int pageNum = -1; int keyAmount = -1; unsigned int timeout = 0; @@ -260,8 +243,9 @@ namespace Mist { } while (pageNum == -1 || keyAmount == -1){ for (int i = 0; i < indexPages[trackId].len / 8; i++){ - long tmpKey = ntohl(((((long long int*)indexPages[trackId].mapped)[i]) >> 32) & 0xFFFFFFFF); long amountKey = ntohl((((long long int*)indexPages[trackId].mapped)[i]) & 0xFFFFFFFF); + if (amountKey == 0){continue;} + long tmpKey = ntohl(((((long long int*)indexPages[trackId].mapped)[i]) >> 32) & 0xFFFFFFFF); if (tmpKey <= keyNum && (tmpKey + amountKey) > keyNum){ pageNum = tmpKey; keyAmount = amountKey; @@ -275,14 +259,25 @@ namespace Mist { if (timeout++ > 100){ DEBUG_MSG(DLVL_FAIL, "Timeout while waiting for requested page. Aborting."); curPages.erase(trackId); + currKeyOpen.erase(trackId); return; } - nxtKeyNum[trackId] = keyNum-1; + if (keyNum){ + nxtKeyNum[trackId] = keyNum-1; + }else{ + nxtKeyNum[trackId] = 0; + } stats(); Util::sleep(100); } } + if (keyNum){ + nxtKeyNum[trackId] = keyNum-1; + }else{ + nxtKeyNum[trackId] = 0; + } + stats(); nxtKeyNum[trackId] = pageNum; if (currKeyOpen.count(trackId) && currKeyOpen[trackId] == pageNum){ @@ -292,7 +287,7 @@ namespace Mist { sprintf(id, "%s%lu_%d", streamName.c_str(), trackId, pageNum); curPages[trackId].init(std::string(id),0); if (!(curPages[trackId].mapped)){ - DEBUG_MSG(DLVL_FAIL, "(%d) Initializing page %s failed", getpid(), curPages[trackId].name.c_str()); + DEBUG_MSG(DLVL_FAIL, "Initializing page %s failed", curPages[trackId].name.c_str()); return; } currKeyOpen[trackId] = pageNum; @@ -308,6 +303,7 @@ namespace Mist { buffer.clear(); currentPacket.null(); updateMeta(); + DEBUG_MSG(DLVL_MEDIUM, "Seeking to %llims", pos); for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){ seek(*it, pos); } @@ -339,6 +335,15 @@ namespace Mist { DEBUG_MSG(DLVL_FAIL, "Noes! Couldn't find packet on track %d because of some kind of corruption error or somesuch.", tid); }else{ DEBUG_MSG(DLVL_FAIL, "Track %d no data (key %u) - waiting...", tid, getKeyForTime(tid, pos) + (getNextKey?1:0)); + unsigned int i = 0; + while (curPages[tid].mapped[tmp.offset] == 0 && ++i < 10){ + Util::sleep(100); + } + if (curPages[tid].mapped[tmp.offset] == 0){ + DEBUG_MSG(DLVL_FAIL, "Track %d no data (key %u) - timeout", tid, getKeyForTime(tid, pos) + (getNextKey?1:0)); + }else{ + return seek(tid, pos, getNextKey); + } } return false; } @@ -352,7 +357,7 @@ namespace Mist { if (wantRequest){ if ((firstData && myConn.Received().size()) || myConn.spool()){ firstData = false; - DEBUG_MSG(DLVL_VERYHIGH, "(%d) OnRequest", getpid()); + DEBUG_MSG(DLVL_VERYHIGH, "OnRequest"); onRequest(); }else{ if (!isBlocking && !parseData){ @@ -365,7 +370,7 @@ namespace Mist { initialize(); } if ( !sentHeader){ - DEBUG_MSG(DLVL_VERYHIGH, "(%d) SendHeader", getpid()); + DEBUG_MSG(DLVL_VERYHIGH, "SendHeader"); sendHeader(); } prepareNext(); diff --git a/src/output/output_progressive_mp4.cpp b/src/output/output_progressive_mp4.cpp index 4b49420f..eeb03b1f 100644 --- a/src/output/output_progressive_mp4.cpp +++ b/src/output/output_progressive_mp4.cpp @@ -299,26 +299,17 @@ namespace Mist { if (byteStart <= headerSize){return;} //okay, we're past the header. Substract the headersize from the starting postion. byteStart -= headerSize; - //initialize a list of sorted parts that this file contains - std::set sortSet; - for (std::set::iterator subIt = selectedTracks.begin(); subIt != selectedTracks.end(); subIt++) { - keyPart temp; - temp.trackID = *subIt; - temp.time = myMeta.tracks[*subIt].firstms;//timeplace of frame - temp.endTime = myMeta.tracks[*subIt].firstms + myMeta.tracks[*subIt].parts[0].getDuration(); - temp.size = myMeta.tracks[*subIt].parts[0].getSize();//bytesize of frame (alle parts all together) - temp.index = 0; - sortSet.insert(temp); - } //forward through the file by headers, until we reach the point where we need to be while (!sortSet.empty()){ + //record where we are + seekPoint = sortSet.begin()->time; //substract the size of this fragment from byteStart byteStart -= sortSet.begin()->size; //if that put us past the point where we wanted to be, return right now if (byteStart < 0){return;} - //otherwise, set seekPoint to where we are now - seekPoint = sortSet.begin()->time; - //then find the next part + //otherwise, set currPos to where we are now and continue + currPos += sortSet.begin()->size; + //find the next part keyPart temp; temp.index = sortSet.begin()->index + 1; temp.trackID = sortSet.begin()->trackID; @@ -330,6 +321,7 @@ namespace Mist { } //remove highest keyPart sortSet.erase(sortSet.begin()); + //wash, rinse, repeat } //If we're here, we're in the last fragment. //That's technically legal, of course. @@ -359,13 +351,11 @@ namespace Mist { if (byteStart > byteEnd){ //entire file if starting before byte zero byteStart = 0; - DEBUG_MSG(DLVL_DEVEL, "Full negative range: %lli-%lli", byteStart, byteEnd); findSeekPoint(byteStart, seekPoint, headerSize); return; }else{ //start byteStart bytes before byteEnd byteStart = byteEnd - byteStart; - DEBUG_MSG(DLVL_DEVEL, "Partial negative range: %lli-%lli", byteStart, byteEnd); findSeekPoint(byteStart, seekPoint, headerSize); return; } @@ -401,41 +391,39 @@ namespace Mist { }else{ byteEnd = size; } - DEBUG_MSG(DLVL_DEVEL, "Range request: %lli-%lli (%s)", byteStart, byteEnd, header.c_str()); + DEBUG_MSG(DLVL_MEDIUM, "Range request: %lli-%lli (%s)", byteStart, byteEnd, header.c_str()); findSeekPoint(byteStart, seekPoint, headerSize); return; } } void OutProgressiveMP4::onRequest(){ - while (HTTP_R.Read(myConn)){ - DEBUG_MSG(DLVL_DEVEL, "Received request: %s", HTTP_R.getUrl().c_str()); + if (HTTP_R.Read(myConn)){ + DEBUG_MSG(DLVL_MEDIUM, "Received request: %s", HTTP_R.getUrl().c_str()); myConn.setHost(HTTP_R.GetHeader("X-Origin")); streamName = HTTP_R.GetHeader("X-Stream"); if (HTTP_R.GetVar("audio") != ""){ - DEBUG_MSG(DLVL_DEVEL, "GetVar Aud = %s", HTTP_R.GetVar("audio").c_str()); selectedTracks.insert(JSON::Value(HTTP_R.GetVar("audio")).asInt()); - }else{ - DEBUG_MSG(DLVL_DEVEL, "No audio param given"); } if (HTTP_R.GetVar("video") != ""){ - DEBUG_MSG(DLVL_DEVEL, "GetVar Vid = %s", HTTP_R.GetVar("video").c_str()); selectedTracks.insert(JSON::Value(HTTP_R.GetVar("video")).asInt()); - }else{ - DEBUG_MSG(DLVL_DEVEL, "No video param given"); } parseData = true; wantRequest = false; + sentHeader = false; } } + /* bool OutProgressiveMP4::onFinish(){ + //HTTP_S.Chunkify("", myConn); HTTP_R.Clean(); parseData = false; wantRequest = true; return true; } + */ void OutProgressiveMP4::onFail(){ HTTP_S.Clean(); //make sure no parts of old requests are left in any buffers @@ -448,9 +436,14 @@ namespace Mist { char * dataPointer = 0; int len = 0; currentPacket.getString("data", dataPointer, len); - - //keep track of where we are - fast-forward until where we are now - while (!sortSet.empty() && ((long long)sortSet.begin()->trackID != currentPacket.getTrackId() || (long long)sortSet.begin()->time != currentPacket.getTime())){ + if (currentPacket.getTrackId() != sortSet.begin()->trackID || currentPacket.getTime() != sortSet.begin()->time){ + DEBUG_MSG(DLVL_WARN, "Warning: current packet %ld_%llu does not match expected packet %ld_%llu. We're most likely sending out corrupt data at this point. Have a nice day.", currentPacket.getTrackId(), currentPacket.getTime(), sortSet.begin()->trackID, sortSet.begin()->time); + stop(); + myConn.close(); + return; + } + //keep track of where we are + if (!sortSet.empty()){ keyPart temp; temp.index = sortSet.begin()->index + 1; temp.trackID = sortSet.begin()->trackID; @@ -464,8 +457,9 @@ namespace Mist { //remove highest keyPart sortSet.erase(sortSet.begin()); } + + if (currPos >= byteStart){ - sortSet.clear();//we don't need you anymore! myConn.SendNow(dataPointer, std::min(leftOver, (long long)len)); //HTTP_S.Chunkify(Strm.lastData().data(), Strm.lastData().size(), conn); leftOver -= len; @@ -474,9 +468,9 @@ namespace Mist { myConn.SendNow(dataPointer+(byteStart-currPos), len-(byteStart-currPos)); leftOver -= len-(byteStart-currPos); currPos = byteStart; - sortSet.clear();//we don't need you anymore! } } + //sortSet.clear();//we don't need you anymore! if (leftOver < 1){ //stop playback, wait for new request stop(); @@ -491,10 +485,7 @@ namespace Mist { byteEnd = fileSize - 1; long long seekPoint = 0; char rangeType = ' '; - if (HTTP_R.GetHeader("Range") != ""){ - parseRange(HTTP_R.GetHeader("Range"), byteStart, byteEnd, seekPoint, headerData.size()); - rangeType = HTTP_R.GetHeader("Range")[0]; - } + currPos = 0; sortSet.clear(); for (std::set::iterator subIt = selectedTracks.begin(); subIt != selectedTracks.end(); subIt++) { keyPart temp; @@ -505,11 +496,14 @@ namespace Mist { temp.index = 0; sortSet.insert(temp); } + if (HTTP_R.GetHeader("Range") != ""){ + parseRange(HTTP_R.GetHeader("Range"), byteStart, byteEnd, seekPoint, headerData.size()); + rangeType = HTTP_R.GetHeader("Range")[0]; + } HTTP_S.Clean(); //make sure no parts of old requests are left in any buffers HTTP_S.SetHeader("Content-Type", "video/MP4"); //Send the correct content-type for MP4 files HTTP_S.SetHeader("Accept-Ranges", "bytes, parsec"); if (rangeType != ' '){ - DEBUG_MSG(DLVL_DEVEL, "Ranged request"); if (!byteEnd){ if (rangeType == 'p'){ HTTP_S.SetBody("Starsystem not in communications range"); @@ -534,7 +528,6 @@ namespace Mist { //HTTP_S.StartResponse("206", "Partial content", HTTP_R, conn); } }else{ - DEBUG_MSG(DLVL_DEVEL, "Non-Ranged request"); HTTP_S.SetHeader("Content-Length", byteEnd - byteStart + 1); //do not multiplex requests that aren't ranged HTTP_S.SetHeader("MistMultiplex", "No"); @@ -543,14 +536,13 @@ namespace Mist { //HTTP_S.StartResponse(HTTP_R, conn); } leftOver = byteEnd - byteStart + 1;//add one byte, because range "0-0" = 1 byte of data - currPos = 0; if (byteStart < (long long)headerData.size()){ /// \todo Switch to chunked? //HTTP_S.Chunkify(headerData.data()+byteStart, std::min((long long)headerData.size(), byteEnd) - byteStart, conn);//send MP4 header myConn.SendNow(headerData.data()+byteStart, std::min((long long)headerData.size(), byteEnd) - byteStart);//send MP4 header leftOver -= std::min((long long)headerData.size(), byteEnd) - byteStart; } - currPos = headerData.size();//we're now guaranteed to be past the header point, no matter what + currPos += headerData.size();//we're now guaranteed to be past the header point, no matter what seek(seekPoint); sentHeader = true; } diff --git a/src/output/output_progressive_mp4.h b/src/output/output_progressive_mp4.h index f82f24a5..41efcd92 100644 --- a/src/output/output_progressive_mp4.h +++ b/src/output/output_progressive_mp4.h @@ -33,7 +33,7 @@ namespace Mist { void onRequest(); void sendNext(); - bool onFinish(); + //bool onFinish(); void sendHeader(); void onFail(); protected: