From e8f973b2e702dfe8fd16615cf8aa64d4e303848a Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 12 Feb 2014 15:31:06 +0100 Subject: [PATCH] Added DTSC::Stream::waitForPause() function to sync streams, fixed various MistPlayer misbehaviours. --- lib/dtsc.cpp | 90 +++++++++++++++++++++++++++++++++++++--------------- lib/dtsc.h | 3 +- 2 files changed, 67 insertions(+), 26 deletions(-) diff --git a/lib/dtsc.cpp b/lib/dtsc.cpp index 6b373e45..ac147c8c 100644 --- a/lib/dtsc.cpp +++ b/lib/dtsc.cpp @@ -212,6 +212,38 @@ void DTSC::Stream::waitForMeta(Socket::Connection & sourceSocket){ } } +/// Blocks until either the stream encounters a pause mark or the sourceSocket errors. +/// This function is intended to be run after the 'q' command is sent, throwing away superfluous packets. +/// It will time out after 5 seconds, disconnecting the sourceSocket. +void DTSC::Stream::waitForPause(Socket::Connection & sourceSocket){ + //cancel the attempt after 5000 milliseconds + long long int start = Util::getMS(); + while (lastType() != DTSC::PAUSEMARK && sourceSocket.connected() && Util::getMS() - start < 5000){ + //we have data? parse it + if (sourceSocket.Received().size()){ + //return value is ignored because we're not interested. + parsePacket(sourceSocket.Received()); + } + //still no pause mark? check for more data + if (lastType() != DTSC::PAUSEMARK){ + if (sourceSocket.spool()){ + //more received? attempt to read + //return value is ignored because we're not interested in data packets, just metadata. + parsePacket(sourceSocket.Received()); + }else{ + //nothing extra to receive? wait a bit and retry + Util::sleep(5); + } + } + } + //if the timeout has passed, close the socket + if (Util::getMS() - start >= 5000){ + sourceSocket.close(); + //and optionally print a debug message that this happened + DEBUG_MSG(DLVL_DEVEL, "Timing out while waiting for pause break"); + } +} + /// Resets the stream by clearing the buffers and keyframes, making sure to call the deletionCallback first. void DTSC::Stream::resetStream(){ for (std::map::iterator it = buffers.begin(); it != buffers.end(); it++){ @@ -674,10 +706,12 @@ bool DTSC::File::reachedEOF(){ /// Reading the packet means the file position is increased to the next packet. void DTSC::File::seekNext(){ if ( !currentPositions.size()){ + DEBUG_MSG(DLVL_HIGH, "No seek positions set - returning empty packet."); strbuffer = ""; jsonbuffer.null(); return; } + DEBUG_MSG(DLVL_HIGH, "Seeking to %uT%lli @ %llu", currentPositions.begin()->trackID, currentPositions.begin()->seekTime, currentPositions.begin()->bytePos); fseek(F,currentPositions.begin()->bytePos, SEEK_SET); if ( reachedEOF()){ strbuffer = ""; @@ -687,8 +721,8 @@ void DTSC::File::seekNext(){ clearerr(F); if ( !metadata.merged){ seek_time(currentPositions.begin()->seekTime + 1, currentPositions.begin()->trackID); + fseek(F,currentPositions.begin()->bytePos, SEEK_SET); } - fseek(F,currentPositions.begin()->bytePos, SEEK_SET); currentPositions.erase(currentPositions.begin()); lastreadpos = ftell(F); if (fread(buffer, 4, 1, F) != 1){ @@ -702,9 +736,8 @@ void DTSC::File::seekNext(){ return; } if (memcmp(buffer, DTSC::Magic_Header, 4) == 0){ - readHeader(lastreadpos); - jsonbuffer = metadata.toJSON(); - return; + seek_time(jsonbuffer["time"].asInt() + 1, jsonbuffer["trackid"].asInt(), true); + return seekNext(); } long long unsigned int version = 0; if (memcmp(buffer, DTSC::Magic_Packet, 4) == 0){ @@ -737,20 +770,24 @@ void DTSC::File::seekNext(){ if (version == 2){ JSON::fromDTMI2(strbuffer, jsonbuffer); }else{ - JSON::fromDTMI(strbuffer, jsonbuffer); + if (version == 1){ + JSON::fromDTMI(strbuffer, jsonbuffer); + } } if ( metadata.merged){ int tempLoc = getBytePos(); char newHeader[20]; + bool insert = false; + seekPos tmpPos; if (fread((void*)newHeader, 20, 1, F) == 1){ if (memcmp(newHeader, DTSC::Magic_Packet2, 4) == 0){ - seekPos tmpPos; tmpPos.bytePos = tempLoc; tmpPos.trackID = ntohl(((int*)newHeader)[2]); tmpPos.seekTime = 0; if (selectedTracks.find(tmpPos.trackID) != selectedTracks.end()){ tmpPos.seekTime = ((long long unsigned int)ntohl(((int*)newHeader)[3])) << 32; tmpPos.seekTime += ntohl(((int*)newHeader)[4]); + insert = true; }else{ long tid = jsonbuffer["trackid"].asInt(); for (unsigned int i = 0; i != metadata.tracks[tid].keyLen; i++){ @@ -758,24 +795,26 @@ void DTSC::File::seekNext(){ tmpPos.seekTime = metadata.tracks[tid].keys[i].getTime(); tmpPos.bytePos = metadata.tracks[tid].keys[i].getBpos(); tmpPos.trackID = tid; + insert = true; break; } } } - bool insert = true; - for (std::set::iterator curPosIter = currentPositions.begin(); curPosIter != currentPositions.end(); curPosIter++){ - if ((*curPosIter).trackID == tmpPos.trackID && (*curPosIter).seekTime >= tmpPos.seekTime){ - insert = false; - break; + if (currentPositions.size()){ + for (std::set::iterator curPosIter = currentPositions.begin(); curPosIter != currentPositions.end(); curPosIter++){ + if ((*curPosIter).trackID == tmpPos.trackID && (*curPosIter).seekTime >= tmpPos.seekTime){ + insert = false; + break; + } } } - if (insert){ - currentPositions.insert(tmpPos); - }else{ - seek_time(jsonbuffer["time"].asInt() + 1, jsonbuffer["trackid"].asInt(), true); - } } } + if (insert){ + currentPositions.insert(tmpPos); + }else{ + seek_time(jsonbuffer["time"].asInt() + 1, jsonbuffer["trackid"].asInt(), true); + } } } @@ -885,6 +924,9 @@ bool DTSC::File::seek_time(unsigned int ms, int trackNo, bool forceSeek){ tmpPos.bytePos = metadata.tracks[trackNo].keys[i].getBpos(); } } + if (reachedEOF()){ + clearerr(F); + } bool foundPacket = false; while ( !foundPacket){ lastreadpos = ftell(F); @@ -914,6 +956,7 @@ bool DTSC::File::seek_time(unsigned int ms, int trackNo, bool forceSeek){ continue; } } + DEBUG_MSG(DLVL_HIGH, "Seek to %d:%d resulted in %lli", trackNo, ms, tmpPos.seekTime); currentPositions.insert(tmpPos); return true; } @@ -922,10 +965,10 @@ bool DTSC::File::seek_time(unsigned int ms, int trackNo, bool forceSeek){ /// Returns true if successful, false otherwise. bool DTSC::File::seek_time(unsigned int ms){ currentPositions.clear(); - /// \todo Check this. Doesn't seem right? - for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){ - seek_bpos(0); - seek_time(ms,(*it)); + if (selectedTracks.size()){ + for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){ + seek_time(ms,(*it)); + } } return true; } @@ -973,11 +1016,8 @@ bool DTSC::File::atKeyframe(){ void DTSC::File::selectTracks(std::set & tracks){ selectedTracks = tracks; - if ( !currentPositions.size()){ - seek_time(0); - }else{ - currentPositions.clear(); - } + currentPositions.clear(); + seek_time(0); } /// Close the file if open diff --git a/lib/dtsc.h b/lib/dtsc.h index 7e66519e..9650ee68 100644 --- a/lib/dtsc.h +++ b/lib/dtsc.h @@ -301,7 +301,8 @@ namespace DTSC { DTSC::livePos getNext(DTSC::livePos & pos, std::set & allowedTracks); void endStream(); void waitForMeta(Socket::Connection & sourceSocket); - protected: + void waitForPause(Socket::Connection & sourceSocket); + protected: void cutOneBuffer(); void resetStream(); std::map buffers;