From 8ddc1ab884d626ebe79db1ae8e4ef13a3d72e4cc Mon Sep 17 00:00:00 2001 From: Erik Zandvliet Date: Wed, 19 Jun 2013 11:06:56 +0200 Subject: [PATCH] Edits for live multibitrate --- lib/dtsc.cpp | 164 ++++++++++++++++----------------------------------- lib/dtsc.h | 28 ++++++--- 2 files changed, 71 insertions(+), 121 deletions(-) diff --git a/lib/dtsc.cpp b/lib/dtsc.cpp index e319cd66..cb212c65 100644 --- a/lib/dtsc.cpp +++ b/lib/dtsc.cpp @@ -76,7 +76,6 @@ bool DTSC::Stream::parsePacket(std::string & buffer){ return false; } JSON::Value newPack; - livePos newPos; unsigned int i = 0; if (version == 1){ newPack = JSON::fromDTMI((unsigned char*)buffer.c_str() + 8, len, i); @@ -152,7 +151,6 @@ bool DTSC::Stream::parsePacket(Socket::Buffer & buffer){ return false; } JSON::Value newPack; - livePos newPos; unsigned int i = 0; std::string wholepacket = buffer.remove(len + 8); if (version == 1){ @@ -161,7 +159,7 @@ bool DTSC::Stream::parsePacket(Socket::Buffer & buffer){ if (version == 2){ newPack = JSON::fromDTMI2(wholepacket.substr(8)); } - advanceRings(); + addPacket(newPack); syncing = false; return true; } @@ -207,6 +205,25 @@ void DTSC::Stream::addPacket(JSON::Value & newPack){ if (tmp == "pause_marker"){ datapointertype = PAUSEMARK; } + int keySize = metadata["tracks"][trackMapping[newPos.trackID]]["keys"].size(); + if (newPack.isMember("keyframe") || (keySize && ((newPack["time"].asInt() - 2000) > metadata["tracks"][trackMapping[newPos.trackID]]["keys"][keySize - 1]["time"].asInt())) || (!keySize)){ + keyframes[newPos.trackID].insert(newPos); + JSON::Value key; + key["time"] = newPack["time"]; + if (keySize){ + key["num"] = (*(metadata["tracks"][trackMapping[newPos.trackID]]["keys"].ArrEnd()--))["num"].asInt() + 1; + }else{ + key["num"] = 1; + } + } + unsigned int timeBuffered = 0; + if (keySize > 1){ + //increase buffer size if no keyframes available or too little time available + timeBuffered = (buffers.end()--)->second["time"].asInt() - buffers.begin()->second["time"].asInt(); + } + if (buffercount > 1 && (keyframes.size() < 2 || timeBuffered < buffertime)){ + buffercount++; + } while (buffers.size() > buffercount){ if (buffers.begin()->second.isMember("keyframe")){ std::string track = trackMapping[buffers.begin()->first.trackID]; @@ -234,6 +251,10 @@ JSON::Value & DTSC::Stream::getPacket(livePos num){ return buffers[num]; } +JSON::Value & DTSC::Stream::getPacket(){ + return buffers.begin()->second; +} + /// Returns a track element by giving the id. JSON::Value & DTSC::Stream::getTrackById(int trackNo){ static JSON::Value empty; @@ -262,6 +283,14 @@ void DTSC::Stream::setBufferTime(unsigned int ms){ buffertime = ms; } +std::string & DTSC::Stream::outPacket(){ + static std::string emptystring; + if (!buffers.size() || !buffers.begin()->second.isObject()){ + return emptystring; + } + return buffers.begin()->second.toNetPacked(); +} + /// Returns a packed DTSC packet, ready to sent over the network. std::string & DTSC::Stream::outPacket(livePos num){ static std::string emptystring; @@ -274,32 +303,9 @@ std::string & DTSC::Stream::outHeader(){ return metadata.toNetPacked(); } -/// advances all given out and internal Ring classes to point to the new buffer, after one has been added. -/// Also updates the internal keyframes ring, as well as marking rings as starved if they are. -/// Unsets waiting rings, updating them with their new buffer number. -void DTSC::Stream::advanceRings(){ - static int fragNum = 1; - static unsigned int lastkeytime = 4242; - if ((lastType() == VIDEO && buffers.front().isMember("keyframe")) || (!metadata.isMember("video") && buffers.front()["time"].asInt() / 2000 != lastkeytime)){ - keyframes.push_front(DTSC::Ring(0)); - if ( !buffers.front().isMember("fragnum")){ - buffers.front()["fragnum"] = fragNum++; - } - lastkeytime = buffers.front()["time"].asInt() / 2000; - } - unsigned int timeBuffered = 0; - if (keyframes.size() > 1){ - //increase buffer size if no keyframes available or too little time available - timeBuffered = buffers[keyframes[0].b]["time"].asInt() - buffers[keyframes[keyframes.size() - 1].b]["time"].asInt(); - } - if (buffercount > 1 && (keyframes.size() < 2 || timeBuffered < buffertime)){ - buffercount++; - } -} - /// Constructs a new Ring, at the given buffer position. /// \arg v Position for buffer. -DTSC::Ring::Ring(unsigned int v){ +DTSC::Ring::Ring(livePos v){ b = v; waiting = false; starved = false; @@ -311,42 +317,23 @@ DTSC::Ring::Ring(unsigned int v){ /// This Ring will be kept updated so it always points to valid data or has the starved boolean set. /// Don't forget to call dropRing() for all requested Ring classes that are no longer neccessary! DTSC::Ring * DTSC::Stream::getRing(){ - DTSC::Ring * tmp; - if (keyframes.size() == 0){ - tmp = new DTSC::Ring(0); - }else{ - tmp = new DTSC::Ring(keyframes[0].b); - } - rings.insert(tmp); - return tmp; + return new DTSC::Ring(buffers.begin()->first); } /// Deletes a given out Ring class from memory and internal Ring list. /// Checks for NULL pointers and invalid pointers, silently discarding them. void DTSC::Stream::dropRing(DTSC::Ring * ptr){ - if (rings.find(ptr) != rings.end()){ - rings.erase(ptr); - delete ptr; - } } /// Updates the headers for a live stream, keeping track of all available /// keyframes and their media times. The function MAY NOT be run at any other /// time than right after receiving a new keyframe, or there'll be raptors. void DTSC::Stream::updateHeaders(){ - if (keyframes.size() > 2){ - if (buffers[keyframes[0].b]["time"].asInt() < buffers[keyframes[keyframes.size() - 1].b]["time"].asInt()){ + if (buffers.size() > 2){ + if (buffers.begin()->second["time"].asInt() < (buffers.end()--)->second["time"].asInt()){ std::cerr << "Detected new video - resetting all buffers and metadata - hold on, this ride might get bumpy!" << std::endl; keyframes.clear(); buffers.clear(); - std::set::iterator sit; - if (rings.size()){ - for (sit = rings.begin(); sit != rings.end(); sit++){ - ( *sit)->updated = true; - ( *sit)->b = 0; - ( *sit)->starved = true; - } - } metadata.removeMember("keytime"); metadata.removeMember("keynum"); metadata.removeMember("keylen"); @@ -356,12 +343,9 @@ void DTSC::Stream::updateHeaders(){ metadata.netPrepare(); return; } - metadata["keytime"].shrink(keyframes.size() - 2); - metadata["keynum"].shrink(keyframes.size() - 2); - metadata["keylen"].shrink(keyframes.size() - 2); - metadata["keytime"].append(buffers[keyframes[1].b]["time"].asInt()); - metadata["keynum"].append(buffers[keyframes[1].b]["fragnum"].asInt()); - metadata["keylen"].append(buffers[keyframes[0].b]["time"].asInt() - buffers[keyframes[1].b]["time"].asInt()); + for (JSON::ObjIter trIt = metadata["tracks"].ObjBegin(); trIt != metadata["tracks"].ObjEnd(); trIt++){ + trIt->second["keys"].shrink(keyframes[trIt->second["trackid"].asInt()].size() - 2); + } unsigned int fragStart = 0; if ( !metadata["frags"]){ // this means that if we have < ~10 seconds in the buffer, fragmenting goes horribly wrong. @@ -410,87 +394,43 @@ void DTSC::Stream::updateHeaders(){ } } } - metadata["lastms"] = buffers[keyframes[0].b]["time"].asInt(); + //metadata["lastms"] = buffers[keyframes[0].b]["time"].asInt(); metadata["buffer_window"] = (long long int)buffertime; metadata["live"] = true; metadata.netPrepare(); - updateRingHeaders(); - } -} - -void DTSC::Stream::updateRingHeaders(){ - std::set::iterator sit; - if ( !rings.size()){ - return; - } - for (sit = rings.begin(); sit != rings.end(); sit++){ - ( *sit)->updated = true; } } /// Returns 0 if seeking is possible, -1 if the wanted frame is too old, 1 if the wanted frame is too new. int DTSC::Stream::canSeekms(unsigned int ms){ - if ( !metadata["keytime"].size()){ + if ( !buffers.size()){ return 1; } - if (ms > metadata["keytime"][metadata["keytime"].size() - 1].asInt()){ + if (ms > (buffers.end()--)->second["time"].asInt()){ return 1; } - if (ms < metadata["keytime"][0u].asInt()){ + if (ms < buffers.begin()->second["time"].asInt()){ return -1; } return 0; } -/// Returns 0 if seeking is possible, -1 if the wanted frame is too old, 1 if the wanted frame is too new. -int DTSC::Stream::canSeekFrame(unsigned int frameno){ - if ( !metadata["keynum"].size()){ - return 1; - } - if (frameno > metadata["keynum"][metadata["keynum"].size() - 1].asInt()){ - return 1; - } - if (frameno < metadata["keynum"][0u].asInt()){ - return -1; - } - return 0; -} - -unsigned int DTSC::Stream::msSeek(unsigned int ms){ - if (ms > buffers[keyframes[0u].b]["time"].asInt()){ - std::cerr << "Warning: seeking past ingest! (" << ms << "ms > " << buffers[keyframes[0u].b]["time"].asInt() << "ms)" << std::endl; - return keyframes[0u].b; - } - for (std::deque::iterator it = keyframes.begin(); it != keyframes.end(); it++){ - if (buffers[it->b]["time"].asInt() <= ms){ - return it->b; +DTSC::livePos DTSC::Stream::msSeek(unsigned int ms, std::set allowedTracks){ + livePos result = buffers.begin()->first; + for (std::map::iterator bIt = buffers.begin(); bIt != buffers.end(); bIt++){ + if (allowedTracks.find(bIt->first.trackID) != allowedTracks.end()){ + if (bIt->first.seekTime > ms){ + break; + } + result = bIt->first; } } - std::cerr << "Warning: seeking past buffer size! (" << ms << "ms < " << buffers[keyframes[keyframes.size() - 1].b]["time"].asInt() << "ms)" << std::endl; - return keyframes[keyframes.size() - 1].b; -} - -unsigned int DTSC::Stream::frameSeek(unsigned int frameno){ - if (frameno > buffers[keyframes[0u].b]["fragnum"].asInt()){ - std::cerr << "Warning: seeking past ingest! (F" << frameno << " > F" << buffers[keyframes[0u].b]["fragnum"].asInt() << ")" << std::endl; - return keyframes[0u].b; - } - for (std::deque::iterator it = keyframes.begin(); it != keyframes.end(); it++){ - if (buffers[it->b]["fragnum"].asInt() == frameno){ - return it->b; - } - } - std::cerr << "Warning: seeking past buffer size! (F" << frameno << " < F" << buffers[keyframes[keyframes.size() - 1].b]["fragnum"].asInt() << ")" << std::endl; - return keyframes[keyframes.size() - 1].b; + return result; } /// Properly cleans up the object for erasing. /// Drops all Ring classes that have been given out. DTSC::Stream::~Stream(){ - std::set::iterator sit; - for (sit = rings.begin(); sit != rings.end(); sit++){ - delete ( *sit); - } } DTSC::File::File(){ diff --git a/lib/dtsc.h b/lib/dtsc.h index 65763097..3388a288 100644 --- a/lib/dtsc.h +++ b/lib/dtsc.h @@ -135,6 +135,18 @@ namespace DTSC { /// A simple structure used for ordering byte seek positions. struct livePos { + livePos(){ + seekTime = 0; + trackID = 0; + } + livePos(const livePos & rhs){ + seekTime = rhs.seekTime; + trackID = rhs.trackID; + } + void operator = (const livePos& rhs) { + seekTime = rhs.seekTime; + trackID = rhs.trackID; + } bool operator < (const livePos& rhs) const { if (seekTime < rhs.seekTime){ return true; @@ -147,16 +159,16 @@ namespace DTSC { } return false; } - long long unsigned int seekTime; - unsigned int trackID; + volatile long long unsigned int seekTime; + volatile unsigned int trackID; }; /// A part from the DTSC::Stream ringbuffer. /// Holds information about a buffer that will stay consistent class Ring{ public: - Ring(unsigned int v); - volatile livePos b; + Ring(livePos v); + livePos b; //volatile unsigned int b; ///< Holds current number of buffer. May and is intended to change unexpectedly! volatile bool waiting; ///< If true, this Ring is currently waiting for a buffer fill. volatile bool starved; ///< If true, this Ring can no longer receive valid data. @@ -173,6 +185,7 @@ namespace DTSC { ~Stream(); Stream(unsigned int buffers, unsigned int bufferTime = 0); JSON::Value metadata; + JSON::Value & getPacket(); JSON::Value & getPacket(livePos num); JSON::Value & getTrackById(int trackNo); datatype lastType(); @@ -181,6 +194,7 @@ namespace DTSC { bool hasAudio(); bool parsePacket(std::string & buffer); bool parsePacket(Socket::Buffer & buffer); + std::string & outPacket(); std::string & outPacket(livePos num); std::string & outHeader(); Ring * getRing(); @@ -188,15 +202,11 @@ namespace DTSC { void dropRing(Ring * ptr); void updateHeaders(); int canSeekms(unsigned int ms); - int canSeekFrame(unsigned int frameno); - unsigned int msSeek(unsigned int ms); - unsigned int frameSeek(unsigned int frameno); + livePos msSeek(unsigned int ms, std::set allowedTracks); void setBufferTime(unsigned int ms); private: std::map buffers; std::map > keyframes; - void advanceRings(); - void updateRingHeaders(); void addPacket(JSON::Value & newPack); std::string * datapointer; datatype datapointertype;