From ae0da6955c798b223d48f837ea2c8185a8d28970 Mon Sep 17 00:00:00 2001 From: Erik Zandvliet Date: Mon, 17 Jun 2013 13:32:50 +0200 Subject: [PATCH] initial (broken) commit --- lib/dtsc.cpp | 156 ++++++++++++++++++--------------------------------- lib/dtsc.h | 31 ++++++++-- 2 files changed, 81 insertions(+), 106 deletions(-) diff --git a/lib/dtsc.cpp b/lib/dtsc.cpp index 786140a2..e319cd66 100644 --- a/lib/dtsc.cpp +++ b/lib/dtsc.cpp @@ -32,7 +32,7 @@ DTSC::Stream::Stream(unsigned int rbuffers, unsigned int bufferTime){ /// Returns the time in milliseconds of the last received packet. /// This is _not_ the time this packet was received, only the stored time. unsigned int DTSC::Stream::getTime(){ - return buffers.front()["time"].asInt(); + return buffers.rbegin()->second["time"].asInt(); } /// Attempts to parse a packet from the given std::string buffer. @@ -75,44 +75,16 @@ bool DTSC::Stream::parsePacket(std::string & buffer){ if (buffer.length() < len + 8){ return false; } - buffers.push_front(JSON::Value()); + JSON::Value newPack; + livePos newPos; unsigned int i = 0; if (version == 1){ - buffers.front() = JSON::fromDTMI((unsigned char*)buffer.c_str() + 8, len, i); + newPack = JSON::fromDTMI((unsigned char*)buffer.c_str() + 8, len, i); } if (version == 2){ - buffers.front() = JSON::fromDTMI2(buffer.substr(8)); + newPack = JSON::fromDTMI2(buffer.substr(8)); } - datapointertype = INVALID; - if (buffers.front().isMember("data")){ - datapointer = &(buffers.front()["data"].strVal); - }else{ - datapointer = 0; - } - std::string tmp = ""; - if (buffers.front().isMember("trackid")){ - tmp = getTrackById(buffers.front()["trackid"].asInt())["type"].asString(); - } - if (buffers.front().isMember("datatype")){ - tmp = buffers.front()["datatype"].asString(); - } - if (tmp == "video"){ - datapointertype = VIDEO; - } - if (tmp == "audio"){ - datapointertype = AUDIO; - } - if (tmp == "meta"){ - datapointertype = META; - } - if (tmp == "pause_marker"){ - datapointertype = PAUSEMARK; - } - buffer.erase(0, len + 8); - while (buffers.size() > buffercount){ - buffers.pop_back(); - } - advanceRings(); + addPacket(newPack); syncing = false; return true; } @@ -179,42 +151,15 @@ bool DTSC::Stream::parsePacket(Socket::Buffer & buffer){ if ( !buffer.available(len + 8)){ return false; } - buffers.push_front(JSON::Value()); + JSON::Value newPack; + livePos newPos; unsigned int i = 0; std::string wholepacket = buffer.remove(len + 8); if (version == 1){ - buffers.front() = JSON::fromDTMI((unsigned char*)wholepacket.c_str() + 8, len, i); + newPack = JSON::fromDTMI((unsigned char*)wholepacket.c_str() + 8, len, i); } if (version == 2){ - buffers.front() = JSON::fromDTMI2(wholepacket.substr(8)); - } - datapointertype = INVALID; - if (buffers.front().isMember("data")){ - datapointer = &(buffers.front()["data"].strVal); - }else{ - datapointer = 0; - } - std::string tmp = ""; - if (buffers.front().isMember("trackid")){ - tmp = getTrackById(buffers.front()["trackid"].asInt())["type"].asString(); - } - if (buffers.front().isMember("datatype")){ - tmp = buffers.front()["datatype"].asString(); - } - if (tmp == "video"){ - datapointertype = VIDEO; - } - if (tmp == "audio"){ - datapointertype = AUDIO; - } - if (tmp == "meta"){ - datapointertype = META; - } - if (tmp == "pause_marker"){ - datapointertype = PAUSEMARK; - } - while (buffers.size() > buffercount){ - buffers.pop_back(); + newPack = JSON::fromDTMI2(wholepacket.substr(8)); } advanceRings(); syncing = false; @@ -231,6 +176,48 @@ bool DTSC::Stream::parsePacket(Socket::Buffer & buffer){ return false; } +void DTSC::Stream::addPacket(JSON::Value & newPack){ + livePos newPos; + newPos.trackID = newPack["trackid"].asInt(); + newPos.seekTime = newPack["time"].asInt(); + buffers[newPos] = newPack; + datapointertype = INVALID; + ///\todo Save keyframes when they arrive. + if (newPack.isMember("data")){ + datapointer = &(buffers[newPos]["data"].strVal); + }else{ + datapointer = 0; + } + std::string tmp = ""; + if (newPack.isMember("trackid")){ + tmp = getTrackById(newPack["trackid"].asInt())["type"].asString(); + } + if (newPack.isMember("datatype")){ + tmp = newPack["datatype"].asString(); + } + if (tmp == "video"){ + datapointertype = VIDEO; + } + if (tmp == "audio"){ + datapointertype = AUDIO; + } + if (tmp == "meta"){ + datapointertype = META; + } + if (tmp == "pause_marker"){ + datapointertype = PAUSEMARK; + } + while (buffers.size() > buffercount){ + if (buffers.begin()->second.isMember("keyframe")){ + std::string track = trackMapping[buffers.begin()->first.trackID]; + keyframes[buffers.begin()->first.trackID].erase(buffers.begin()->first); + int keySize = metadata["tracks"][track]["keys"].size(); + metadata["tracks"][track]["keys"].shrink(keySize - 1); + } + buffers.erase(buffers.begin()); + } +} + /// Returns a direct pointer to the data attribute of the last received packet, if available. /// Returns NULL if no valid pointer or packet is available. std::string & DTSC::Stream::lastData(){ @@ -239,9 +226,9 @@ std::string & DTSC::Stream::lastData(){ /// Returns the packet in this buffer number. /// \arg num Buffer number. -JSON::Value & DTSC::Stream::getPacket(unsigned int num){ +JSON::Value & DTSC::Stream::getPacket(livePos num){ static JSON::Value empty; - if (num >= buffers.size()){ + if (buffers.find(num) == buffers.end()){ return empty; } return buffers[num]; @@ -276,9 +263,9 @@ void DTSC::Stream::setBufferTime(unsigned int ms){ } /// Returns a packed DTSC packet, ready to sent over the network. -std::string & DTSC::Stream::outPacket(unsigned int num){ +std::string & DTSC::Stream::outPacket(livePos num){ static std::string emptystring; - if (num >= buffers.size() || !buffers[num].isObject()) return emptystring; + if (buffers.find(num) == buffers.end() || !buffers[num].isObject()) return emptystring; return buffers[num].toNetPacked(); } @@ -291,37 +278,6 @@ std::string & DTSC::Stream::outHeader(){ /// Also updates the internal keyframes ring, as well as marking rings as starved if they are. /// Unsets waiting rings, updating them with their new buffer number. void DTSC::Stream::advanceRings(){ - std::deque::iterator dit; - std::set::iterator sit; - if (rings.size()){ - for (sit = rings.begin(); sit != rings.end(); sit++){ - ( *sit)->b++; - if (( *sit)->waiting){ - ( *sit)->waiting = false; - ( *sit)->b = 0; - } - if (( *sit)->starved || (( *sit)->b >= buffers.size())){ - ( *sit)->starved = true; - ( *sit)->b = 0; - } - } - } - if (keyframes.size()){ - for (dit = keyframes.begin(); dit != keyframes.end(); dit++){ - dit->b++; - } - bool repeat; - do{ - repeat = false; - for (dit = keyframes.begin(); dit != keyframes.end(); dit++){ - if (dit->b >= buffers.size()){ - keyframes.erase(dit); - repeat = true; - break; - } - } - }while (repeat); - } static int fragNum = 1; static unsigned int lastkeytime = 4242; if ((lastType() == VIDEO && buffers.front().isMember("keyframe")) || (!metadata.isMember("video") && buffers.front()["time"].asInt() / 2000 != lastkeytime)){ diff --git a/lib/dtsc.h b/lib/dtsc.h index 7c903516..65763097 100644 --- a/lib/dtsc.h +++ b/lib/dtsc.h @@ -133,12 +133,31 @@ namespace DTSC { }; //FileWriter + /// A simple structure used for ordering byte seek positions. + struct livePos { + bool operator < (const livePos& rhs) const { + if (seekTime < rhs.seekTime){ + return true; + }else{ + if (seekTime == rhs.seekTime){ + if (trackID < rhs.trackID){ + return true; + } + } + } + return false; + } + long long unsigned int seekTime; + unsigned int trackID; + }; + /// A part from the DTSC::Stream ringbuffer. /// Holds information about a buffer that will stay consistent class Ring{ public: Ring(unsigned int v); - volatile unsigned int b; ///< Holds current number of buffer. May and is intended to change unexpectedly! + volatile livePos b; + //volatile unsigned int b; ///< Holds current number of buffer. May and is intended to change unexpectedly! volatile bool waiting; ///< If true, this Ring is currently waiting for a buffer fill. volatile bool starved; ///< If true, this Ring can no longer receive valid data. volatile bool updated; ///< If true, this Ring should write a new header. @@ -154,7 +173,7 @@ namespace DTSC { ~Stream(); Stream(unsigned int buffers, unsigned int bufferTime = 0); JSON::Value metadata; - JSON::Value & getPacket(unsigned int num = 0); + JSON::Value & getPacket(livePos num); JSON::Value & getTrackById(int trackNo); datatype lastType(); std::string & lastData(); @@ -162,7 +181,7 @@ namespace DTSC { bool hasAudio(); bool parsePacket(std::string & buffer); bool parsePacket(Socket::Buffer & buffer); - std::string & outPacket(unsigned int num); + std::string & outPacket(livePos num); std::string & outHeader(); Ring * getRing(); unsigned int getTime(); @@ -174,11 +193,11 @@ namespace DTSC { unsigned int frameSeek(unsigned int frameno); void setBufferTime(unsigned int ms); private: - std::deque buffers; - std::set rings; - std::deque keyframes; + std::map buffers; + std::map > keyframes; void advanceRings(); void updateRingHeaders(); + void addPacket(JSON::Value & newPack); std::string * datapointer; datatype datapointertype; unsigned int buffercount;