From a69583fd49954b274e2bca2f4942f6e366f41349 Mon Sep 17 00:00:00 2001 From: Erik Zandvliet Date: Tue, 19 Feb 2013 10:57:23 +0100 Subject: [PATCH] Edits to DTSC lib for live buffering --- lib/dtsc.cpp | 86 +++++++++++++++++++++++++++++++++++++++++----------- lib/dtsc.h | 8 ++++- 2 files changed, 76 insertions(+), 18 deletions(-) diff --git a/lib/dtsc.cpp b/lib/dtsc.cpp index 5814ede3..11762845 100644 --- a/lib/dtsc.cpp +++ b/lib/dtsc.cpp @@ -13,17 +13,19 @@ DTSC::Stream::Stream(){ datapointertype = DTSC::INVALID; datapointer = 0; buffercount = 1; + buffertime = 0; } /// Initializes a DTSC::Stream with a minimum of rbuffers packet buffers. /// The actual buffer count may not at all times be the requested amount. -DTSC::Stream::Stream(unsigned int rbuffers){ +DTSC::Stream::Stream(unsigned int rbuffers, unsigned int bufferTime){ datapointertype = DTSC::INVALID; datapointer = 0; if (rbuffers < 1){ rbuffers = 1; } buffercount = rbuffers; + buffertime = bufferTime; } /// Returns the time in milliseconds of the last received packet. @@ -47,6 +49,7 @@ bool DTSC::Stream::parsePacket(std::string & buffer){ } unsigned int i = 0; metadata = JSON::fromDTMI((unsigned char*)buffer.c_str() + 8, len, i); + metadata.removeMember( "moreheader" ); buffer.erase(0, len + 8); if (buffer.length() <= 8){ return false; @@ -122,6 +125,7 @@ bool DTSC::Stream::parsePacket(Socket::Buffer & buffer){ unsigned int i = 0; std::string wholepacket = buffer.remove(len + 8); metadata = JSON::fromDTMI((unsigned char*)wholepacket.c_str() + 8, len, i); + metadata.removeMember( "moreheader" ); if (!buffer.available(8)){ return false; } @@ -220,29 +224,44 @@ std::string & DTSC::Stream::outHeader(){ void DTSC::Stream::advanceRings(){ std::deque::iterator dit; std::set::iterator sit; - for (sit = rings.begin(); sit != rings.end(); sit++){ - ( *sit)->b++; - if (( *sit)->waiting){ - ( *sit)->waiting = false; - ( *sit)->b = 0; - } - if (( *sit)->starved || (( *sit)->b >= buffers.size())){ - ( *sit)->starved = true; - ( *sit)->b = 0; + if (rings.size()) { + for (sit = rings.begin(); sit != rings.end(); sit++){ + ( *sit)->b++; + if (( *sit)->waiting){ + ( *sit)->waiting = false; + ( *sit)->b = 0; + } + if (( *sit)->starved || (( *sit)->b >= buffers.size())){ + ( *sit)->starved = true; + ( *sit)->b = 0; + } } } - for (dit = keyframes.begin(); dit != keyframes.end(); dit++){ - dit->b++; - if (dit->b >= buffers.size()){ - keyframes.erase(dit); - break; + if (keyframes.size()){ + for (dit = keyframes.begin(); dit != keyframes.end(); dit++){ + dit->b++; } + bool repeat; + do { + repeat = false; + for (dit = keyframes.begin(); dit != keyframes.end(); dit++){ + if (dit->b >= buffers.size()){ + keyframes.erase(dit); + repeat = true; + break; + } + } + } while( repeat ); } if ((lastType() == VIDEO) && (buffers.front().isMember("keyframe"))){ keyframes.push_front(DTSC::Ring(0)); } - //increase buffer size if no keyframes available - if ((buffercount > 1) && (keyframes.size() < 1)){ + int timeBuffered = 0; + if (keyframes.size() > 1){ + //increase buffer size if no keyframes available or too little time available + timeBuffered = buffers[keyframes[keyframes.size() - 1].b]["time"].asInt() - buffers[keyframes[0].b]["time"].asInt(); + } + if (((buffercount > 1) && (keyframes.size() < 2)) || (timeBuffered < buffertime)){ buffercount++; } } @@ -278,6 +297,34 @@ void DTSC::Stream::dropRing(DTSC::Ring * ptr){ } } +void DTSC::Stream::updateHeaders(){ + if( keyframes.size() > 1 ) { + metadata["keytime"].shrink(keyframes.size() - 2); + metadata["keytime"].append(buffers[keyframes[1].b]["time"].asInt()); + metadata.toPacked(); + updateRingHeaders(); + } +} + +void DTSC::Stream::updateRingHeaders(){ + std::set::iterator sit; + if (!rings.size()){ + return; + } + for (sit = rings.begin(); sit != rings.end(); sit++){ + ( *sit)->updated = true; + } +} + +unsigned int DSTSC::Stream::msSeek(unsigned int ms) { + for( std::deque::iterator it = keyframes.begin(); it != keyframes.end(); it++ ) { + if( buffers[it->b]["time"].asInt( ) < ms ) { + return it->b; + } + } + ///\todo Send PauseMark +} + /// Properly cleans up the object for erasing. /// Drops all Ring classes that have been given out. DTSC::Stream::~Stream(){ @@ -438,6 +485,11 @@ void DTSC::File::seekNext(){ jsonbuffer.null(); return; } + if (memcmp(buffer, DTSC::Magic_Header, 4) == 0){ + readHeader(lastreadpos); + jsonbuffer = metadata; + return; + } if (memcmp(buffer, DTSC::Magic_Packet, 4) != 0){ fprintf(stderr, "Invalid header - %.4s != %.4s\n", buffer, DTSC::Magic_Packet); strbuffer = ""; diff --git a/lib/dtsc.h b/lib/dtsc.h index f3dd687d..2855809b 100644 --- a/lib/dtsc.h +++ b/lib/dtsc.h @@ -56,6 +56,7 @@ namespace DTSC { VIDEO, ///< Stream Video data META, ///< Stream Metadata PAUSEMARK, ///< Pause marker + MODIFIEDHEADER, ///< Modified header data. INVALID ///< Anything else or no data available. }; @@ -100,6 +101,7 @@ namespace DTSC { volatile unsigned int b; ///< Holds current number of buffer. May and is intended to change unexpectedly! volatile bool waiting; ///< If true, this Ring is currently waiting for a buffer fill. volatile bool starved; ///< If true, this Ring can no longer receive valid data. + volatile bool updated; ///< If true, this Ring should write a new header. }; /// Holds temporary data for a DTSC stream and provides functions to utilize it. @@ -109,7 +111,7 @@ namespace DTSC { public: Stream(); ~Stream(); - Stream(unsigned int buffers); + Stream(unsigned int buffers, unsigned int bufferTime = 0); JSON::Value metadata; JSON::Value & getPacket(unsigned int num = 0); datatype lastType(); @@ -123,13 +125,17 @@ namespace DTSC { Ring * getRing(); unsigned int getTime(); void dropRing(Ring * ptr); + void updateHeaders(); + unsigned int msSeek(unsigned int ms); private: std::deque buffers; std::set rings; std::deque keyframes; void advanceRings(); + void updateRingHeaders(); std::string * datapointer; datatype datapointertype; unsigned int buffercount; + unsigned int buffertime; }; }