Edits to DTSC lib for live buffering

This commit is contained in:
Erik Zandvliet 2013-02-19 10:57:23 +01:00
parent 1baff448f2
commit a69583fd49
2 changed files with 76 additions and 18 deletions

View file

@ -13,17 +13,19 @@ DTSC::Stream::Stream(){
datapointertype = DTSC::INVALID; datapointertype = DTSC::INVALID;
datapointer = 0; datapointer = 0;
buffercount = 1; buffercount = 1;
buffertime = 0;
} }
/// Initializes a DTSC::Stream with a minimum of rbuffers packet buffers. /// Initializes a DTSC::Stream with a minimum of rbuffers packet buffers.
/// The actual buffer count may not at all times be the requested amount. /// The actual buffer count may not at all times be the requested amount.
DTSC::Stream::Stream(unsigned int rbuffers){ DTSC::Stream::Stream(unsigned int rbuffers, unsigned int bufferTime){
datapointertype = DTSC::INVALID; datapointertype = DTSC::INVALID;
datapointer = 0; datapointer = 0;
if (rbuffers < 1){ if (rbuffers < 1){
rbuffers = 1; rbuffers = 1;
} }
buffercount = rbuffers; buffercount = rbuffers;
buffertime = bufferTime;
} }
/// Returns the time in milliseconds of the last received packet. /// Returns the time in milliseconds of the last received packet.
@ -47,6 +49,7 @@ bool DTSC::Stream::parsePacket(std::string & buffer){
} }
unsigned int i = 0; unsigned int i = 0;
metadata = JSON::fromDTMI((unsigned char*)buffer.c_str() + 8, len, i); metadata = JSON::fromDTMI((unsigned char*)buffer.c_str() + 8, len, i);
metadata.removeMember( "moreheader" );
buffer.erase(0, len + 8); buffer.erase(0, len + 8);
if (buffer.length() <= 8){ if (buffer.length() <= 8){
return false; return false;
@ -122,6 +125,7 @@ bool DTSC::Stream::parsePacket(Socket::Buffer & buffer){
unsigned int i = 0; unsigned int i = 0;
std::string wholepacket = buffer.remove(len + 8); std::string wholepacket = buffer.remove(len + 8);
metadata = JSON::fromDTMI((unsigned char*)wholepacket.c_str() + 8, len, i); metadata = JSON::fromDTMI((unsigned char*)wholepacket.c_str() + 8, len, i);
metadata.removeMember( "moreheader" );
if (!buffer.available(8)){ if (!buffer.available(8)){
return false; return false;
} }
@ -220,29 +224,44 @@ std::string & DTSC::Stream::outHeader(){
void DTSC::Stream::advanceRings(){ void DTSC::Stream::advanceRings(){
std::deque<DTSC::Ring>::iterator dit; std::deque<DTSC::Ring>::iterator dit;
std::set<DTSC::Ring *>::iterator sit; std::set<DTSC::Ring *>::iterator sit;
for (sit = rings.begin(); sit != rings.end(); sit++){ if (rings.size()) {
( *sit)->b++; for (sit = rings.begin(); sit != rings.end(); sit++){
if (( *sit)->waiting){ ( *sit)->b++;
( *sit)->waiting = false; if (( *sit)->waiting){
( *sit)->b = 0; ( *sit)->waiting = false;
} ( *sit)->b = 0;
if (( *sit)->starved || (( *sit)->b >= buffers.size())){ }
( *sit)->starved = true; if (( *sit)->starved || (( *sit)->b >= buffers.size())){
( *sit)->b = 0; ( *sit)->starved = true;
( *sit)->b = 0;
}
} }
} }
for (dit = keyframes.begin(); dit != keyframes.end(); dit++){ if (keyframes.size()){
dit->b++; for (dit = keyframes.begin(); dit != keyframes.end(); dit++){
if (dit->b >= buffers.size()){ dit->b++;
keyframes.erase(dit);
break;
} }
bool repeat;
do {
repeat = false;
for (dit = keyframes.begin(); dit != keyframes.end(); dit++){
if (dit->b >= buffers.size()){
keyframes.erase(dit);
repeat = true;
break;
}
}
} while( repeat );
} }
if ((lastType() == VIDEO) && (buffers.front().isMember("keyframe"))){ if ((lastType() == VIDEO) && (buffers.front().isMember("keyframe"))){
keyframes.push_front(DTSC::Ring(0)); keyframes.push_front(DTSC::Ring(0));
} }
//increase buffer size if no keyframes available int timeBuffered = 0;
if ((buffercount > 1) && (keyframes.size() < 1)){ if (keyframes.size() > 1){
//increase buffer size if no keyframes available or too little time available
timeBuffered = buffers[keyframes[keyframes.size() - 1].b]["time"].asInt() - buffers[keyframes[0].b]["time"].asInt();
}
if (((buffercount > 1) && (keyframes.size() < 2)) || (timeBuffered < buffertime)){
buffercount++; buffercount++;
} }
} }
@ -278,6 +297,34 @@ void DTSC::Stream::dropRing(DTSC::Ring * ptr){
} }
} }
void DTSC::Stream::updateHeaders(){
if( keyframes.size() > 1 ) {
metadata["keytime"].shrink(keyframes.size() - 2);
metadata["keytime"].append(buffers[keyframes[1].b]["time"].asInt());
metadata.toPacked();
updateRingHeaders();
}
}
void DTSC::Stream::updateRingHeaders(){
std::set<DTSC::Ring *>::iterator sit;
if (!rings.size()){
return;
}
for (sit = rings.begin(); sit != rings.end(); sit++){
( *sit)->updated = true;
}
}
unsigned int DSTSC::Stream::msSeek(unsigned int ms) {
for( std::deque<DTSC::Ring>::iterator it = keyframes.begin(); it != keyframes.end(); it++ ) {
if( buffers[it->b]["time"].asInt( ) < ms ) {
return it->b;
}
}
///\todo Send PauseMark
}
/// Properly cleans up the object for erasing. /// Properly cleans up the object for erasing.
/// Drops all Ring classes that have been given out. /// Drops all Ring classes that have been given out.
DTSC::Stream::~Stream(){ DTSC::Stream::~Stream(){
@ -438,6 +485,11 @@ void DTSC::File::seekNext(){
jsonbuffer.null(); jsonbuffer.null();
return; return;
} }
if (memcmp(buffer, DTSC::Magic_Header, 4) == 0){
readHeader(lastreadpos);
jsonbuffer = metadata;
return;
}
if (memcmp(buffer, DTSC::Magic_Packet, 4) != 0){ if (memcmp(buffer, DTSC::Magic_Packet, 4) != 0){
fprintf(stderr, "Invalid header - %.4s != %.4s\n", buffer, DTSC::Magic_Packet); fprintf(stderr, "Invalid header - %.4s != %.4s\n", buffer, DTSC::Magic_Packet);
strbuffer = ""; strbuffer = "";

View file

@ -56,6 +56,7 @@ namespace DTSC {
VIDEO, ///< Stream Video data VIDEO, ///< Stream Video data
META, ///< Stream Metadata META, ///< Stream Metadata
PAUSEMARK, ///< Pause marker PAUSEMARK, ///< Pause marker
MODIFIEDHEADER, ///< Modified header data.
INVALID ///< Anything else or no data available. INVALID ///< Anything else or no data available.
}; };
@ -100,6 +101,7 @@ namespace DTSC {
volatile unsigned int b; ///< Holds current number of buffer. May and is intended to change unexpectedly! volatile unsigned int b; ///< Holds current number of buffer. May and is intended to change unexpectedly!
volatile bool waiting; ///< If true, this Ring is currently waiting for a buffer fill. volatile bool waiting; ///< If true, this Ring is currently waiting for a buffer fill.
volatile bool starved; ///< If true, this Ring can no longer receive valid data. volatile bool starved; ///< If true, this Ring can no longer receive valid data.
volatile bool updated; ///< If true, this Ring should write a new header.
}; };
/// Holds temporary data for a DTSC stream and provides functions to utilize it. /// Holds temporary data for a DTSC stream and provides functions to utilize it.
@ -109,7 +111,7 @@ namespace DTSC {
public: public:
Stream(); Stream();
~Stream(); ~Stream();
Stream(unsigned int buffers); Stream(unsigned int buffers, unsigned int bufferTime = 0);
JSON::Value metadata; JSON::Value metadata;
JSON::Value & getPacket(unsigned int num = 0); JSON::Value & getPacket(unsigned int num = 0);
datatype lastType(); datatype lastType();
@ -123,13 +125,17 @@ namespace DTSC {
Ring * getRing(); Ring * getRing();
unsigned int getTime(); unsigned int getTime();
void dropRing(Ring * ptr); void dropRing(Ring * ptr);
void updateHeaders();
unsigned int msSeek(unsigned int ms);
private: private:
std::deque<JSON::Value> buffers; std::deque<JSON::Value> buffers;
std::set<DTSC::Ring *> rings; std::set<DTSC::Ring *> rings;
std::deque<DTSC::Ring> keyframes; std::deque<DTSC::Ring> keyframes;
void advanceRings(); void advanceRings();
void updateRingHeaders();
std::string * datapointer; std::string * datapointer;
datatype datapointertype; datatype datapointertype;
unsigned int buffercount; unsigned int buffercount;
unsigned int buffertime;
}; };
} }