Added DTSC::Stream::waitForPause() function to sync streams, fixed various MistPlayer misbehaviours.

This commit is contained in:
Thulinma 2014-02-12 15:31:06 +01:00
parent 1e8c394b95
commit e8f973b2e7
2 changed files with 67 additions and 26 deletions

View file

@ -212,6 +212,38 @@ void DTSC::Stream::waitForMeta(Socket::Connection & sourceSocket){
} }
} }
/// Blocks until either the stream encounters a pause mark or the sourceSocket errors.
/// This function is intended to be run after the 'q' command is sent, throwing away superfluous packets.
/// It will time out after 5 seconds, disconnecting the sourceSocket.
void DTSC::Stream::waitForPause(Socket::Connection & sourceSocket){
//cancel the attempt after 5000 milliseconds
long long int start = Util::getMS();
while (lastType() != DTSC::PAUSEMARK && sourceSocket.connected() && Util::getMS() - start < 5000){
//we have data? parse it
if (sourceSocket.Received().size()){
//return value is ignored because we're not interested.
parsePacket(sourceSocket.Received());
}
//still no pause mark? check for more data
if (lastType() != DTSC::PAUSEMARK){
if (sourceSocket.spool()){
//more received? attempt to read
//return value is ignored because we're not interested in data packets, just metadata.
parsePacket(sourceSocket.Received());
}else{
//nothing extra to receive? wait a bit and retry
Util::sleep(5);
}
}
}
//if the timeout has passed, close the socket
if (Util::getMS() - start >= 5000){
sourceSocket.close();
//and optionally print a debug message that this happened
DEBUG_MSG(DLVL_DEVEL, "Timing out while waiting for pause break");
}
}
/// Resets the stream by clearing the buffers and keyframes, making sure to call the deletionCallback first. /// Resets the stream by clearing the buffers and keyframes, making sure to call the deletionCallback first.
void DTSC::Stream::resetStream(){ void DTSC::Stream::resetStream(){
for (std::map<livePos, JSON::Value >::iterator it = buffers.begin(); it != buffers.end(); it++){ for (std::map<livePos, JSON::Value >::iterator it = buffers.begin(); it != buffers.end(); it++){
@ -674,10 +706,12 @@ bool DTSC::File::reachedEOF(){
/// Reading the packet means the file position is increased to the next packet. /// Reading the packet means the file position is increased to the next packet.
void DTSC::File::seekNext(){ void DTSC::File::seekNext(){
if ( !currentPositions.size()){ if ( !currentPositions.size()){
DEBUG_MSG(DLVL_HIGH, "No seek positions set - returning empty packet.");
strbuffer = ""; strbuffer = "";
jsonbuffer.null(); jsonbuffer.null();
return; return;
} }
DEBUG_MSG(DLVL_HIGH, "Seeking to %uT%lli @ %llu", currentPositions.begin()->trackID, currentPositions.begin()->seekTime, currentPositions.begin()->bytePos);
fseek(F,currentPositions.begin()->bytePos, SEEK_SET); fseek(F,currentPositions.begin()->bytePos, SEEK_SET);
if ( reachedEOF()){ if ( reachedEOF()){
strbuffer = ""; strbuffer = "";
@ -687,8 +721,8 @@ void DTSC::File::seekNext(){
clearerr(F); clearerr(F);
if ( !metadata.merged){ if ( !metadata.merged){
seek_time(currentPositions.begin()->seekTime + 1, currentPositions.begin()->trackID); seek_time(currentPositions.begin()->seekTime + 1, currentPositions.begin()->trackID);
fseek(F,currentPositions.begin()->bytePos, SEEK_SET);
} }
fseek(F,currentPositions.begin()->bytePos, SEEK_SET);
currentPositions.erase(currentPositions.begin()); currentPositions.erase(currentPositions.begin());
lastreadpos = ftell(F); lastreadpos = ftell(F);
if (fread(buffer, 4, 1, F) != 1){ if (fread(buffer, 4, 1, F) != 1){
@ -702,9 +736,8 @@ void DTSC::File::seekNext(){
return; return;
} }
if (memcmp(buffer, DTSC::Magic_Header, 4) == 0){ if (memcmp(buffer, DTSC::Magic_Header, 4) == 0){
readHeader(lastreadpos); seek_time(jsonbuffer["time"].asInt() + 1, jsonbuffer["trackid"].asInt(), true);
jsonbuffer = metadata.toJSON(); return seekNext();
return;
} }
long long unsigned int version = 0; long long unsigned int version = 0;
if (memcmp(buffer, DTSC::Magic_Packet, 4) == 0){ if (memcmp(buffer, DTSC::Magic_Packet, 4) == 0){
@ -737,20 +770,24 @@ void DTSC::File::seekNext(){
if (version == 2){ if (version == 2){
JSON::fromDTMI2(strbuffer, jsonbuffer); JSON::fromDTMI2(strbuffer, jsonbuffer);
}else{ }else{
JSON::fromDTMI(strbuffer, jsonbuffer); if (version == 1){
JSON::fromDTMI(strbuffer, jsonbuffer);
}
} }
if ( metadata.merged){ if ( metadata.merged){
int tempLoc = getBytePos(); int tempLoc = getBytePos();
char newHeader[20]; char newHeader[20];
bool insert = false;
seekPos tmpPos;
if (fread((void*)newHeader, 20, 1, F) == 1){ if (fread((void*)newHeader, 20, 1, F) == 1){
if (memcmp(newHeader, DTSC::Magic_Packet2, 4) == 0){ if (memcmp(newHeader, DTSC::Magic_Packet2, 4) == 0){
seekPos tmpPos;
tmpPos.bytePos = tempLoc; tmpPos.bytePos = tempLoc;
tmpPos.trackID = ntohl(((int*)newHeader)[2]); tmpPos.trackID = ntohl(((int*)newHeader)[2]);
tmpPos.seekTime = 0; tmpPos.seekTime = 0;
if (selectedTracks.find(tmpPos.trackID) != selectedTracks.end()){ if (selectedTracks.find(tmpPos.trackID) != selectedTracks.end()){
tmpPos.seekTime = ((long long unsigned int)ntohl(((int*)newHeader)[3])) << 32; tmpPos.seekTime = ((long long unsigned int)ntohl(((int*)newHeader)[3])) << 32;
tmpPos.seekTime += ntohl(((int*)newHeader)[4]); tmpPos.seekTime += ntohl(((int*)newHeader)[4]);
insert = true;
}else{ }else{
long tid = jsonbuffer["trackid"].asInt(); long tid = jsonbuffer["trackid"].asInt();
for (unsigned int i = 0; i != metadata.tracks[tid].keyLen; i++){ for (unsigned int i = 0; i != metadata.tracks[tid].keyLen; i++){
@ -758,24 +795,26 @@ void DTSC::File::seekNext(){
tmpPos.seekTime = metadata.tracks[tid].keys[i].getTime(); tmpPos.seekTime = metadata.tracks[tid].keys[i].getTime();
tmpPos.bytePos = metadata.tracks[tid].keys[i].getBpos(); tmpPos.bytePos = metadata.tracks[tid].keys[i].getBpos();
tmpPos.trackID = tid; tmpPos.trackID = tid;
insert = true;
break; break;
} }
} }
} }
bool insert = true; if (currentPositions.size()){
for (std::set<seekPos>::iterator curPosIter = currentPositions.begin(); curPosIter != currentPositions.end(); curPosIter++){ for (std::set<seekPos>::iterator curPosIter = currentPositions.begin(); curPosIter != currentPositions.end(); curPosIter++){
if ((*curPosIter).trackID == tmpPos.trackID && (*curPosIter).seekTime >= tmpPos.seekTime){ if ((*curPosIter).trackID == tmpPos.trackID && (*curPosIter).seekTime >= tmpPos.seekTime){
insert = false; insert = false;
break; break;
}
} }
} }
if (insert){
currentPositions.insert(tmpPos);
}else{
seek_time(jsonbuffer["time"].asInt() + 1, jsonbuffer["trackid"].asInt(), true);
}
} }
} }
if (insert){
currentPositions.insert(tmpPos);
}else{
seek_time(jsonbuffer["time"].asInt() + 1, jsonbuffer["trackid"].asInt(), true);
}
} }
} }
@ -885,6 +924,9 @@ bool DTSC::File::seek_time(unsigned int ms, int trackNo, bool forceSeek){
tmpPos.bytePos = metadata.tracks[trackNo].keys[i].getBpos(); tmpPos.bytePos = metadata.tracks[trackNo].keys[i].getBpos();
} }
} }
if (reachedEOF()){
clearerr(F);
}
bool foundPacket = false; bool foundPacket = false;
while ( !foundPacket){ while ( !foundPacket){
lastreadpos = ftell(F); lastreadpos = ftell(F);
@ -914,6 +956,7 @@ bool DTSC::File::seek_time(unsigned int ms, int trackNo, bool forceSeek){
continue; continue;
} }
} }
DEBUG_MSG(DLVL_HIGH, "Seek to %d:%d resulted in %lli", trackNo, ms, tmpPos.seekTime);
currentPositions.insert(tmpPos); currentPositions.insert(tmpPos);
return true; return true;
} }
@ -922,10 +965,10 @@ bool DTSC::File::seek_time(unsigned int ms, int trackNo, bool forceSeek){
/// Returns true if successful, false otherwise. /// Returns true if successful, false otherwise.
bool DTSC::File::seek_time(unsigned int ms){ bool DTSC::File::seek_time(unsigned int ms){
currentPositions.clear(); currentPositions.clear();
/// \todo Check this. Doesn't seem right? if (selectedTracks.size()){
for (std::set<int>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){ for (std::set<int>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){
seek_bpos(0); seek_time(ms,(*it));
seek_time(ms,(*it)); }
} }
return true; return true;
} }
@ -973,11 +1016,8 @@ bool DTSC::File::atKeyframe(){
void DTSC::File::selectTracks(std::set<int> & tracks){ void DTSC::File::selectTracks(std::set<int> & tracks){
selectedTracks = tracks; selectedTracks = tracks;
if ( !currentPositions.size()){ currentPositions.clear();
seek_time(0); seek_time(0);
}else{
currentPositions.clear();
}
} }
/// Close the file if open /// Close the file if open

View file

@ -301,7 +301,8 @@ namespace DTSC {
DTSC::livePos getNext(DTSC::livePos & pos, std::set<int> & allowedTracks); DTSC::livePos getNext(DTSC::livePos & pos, std::set<int> & allowedTracks);
void endStream(); void endStream();
void waitForMeta(Socket::Connection & sourceSocket); void waitForMeta(Socket::Connection & sourceSocket);
protected: void waitForPause(Socket::Connection & sourceSocket);
protected:
void cutOneBuffer(); void cutOneBuffer();
void resetStream(); void resetStream();
std::map<livePos,JSON::Value> buffers; std::map<livePos,JSON::Value> buffers;