Refactored DTSC::Stream to prepare for proper inheritance in the Buffer process.

This commit is contained in:
Thulinma 2013-11-18 13:13:53 +01:00
parent 7d5942adf1
commit 12dd880bc1
2 changed files with 44 additions and 66 deletions

View file

@ -27,6 +27,10 @@ DTSC::Stream::Stream(unsigned int rbuffers, unsigned int bufferTime){
buffertime = bufferTime;
}
/// This function does nothing, it's supposed to be overridden.
/// It will be called right before a buffer position is deleted.
void DTSC::Stream::deletionCallback(livePos deleting){}
/// Returns the time in milliseconds of the last received packet.
/// This is _not_ the time this packet was received, only the stored time.
unsigned int DTSC::Stream::getTime(){
@ -213,8 +217,16 @@ void DTSC::Stream::waitForMeta(Socket::Connection & sourceSocket){
}
}
/// Resets the stream by clearing the buffers and keyframes, making sure to call the deletionCallback first.
void DTSC::Stream::resetStream(){
for (std::map<livePos, JSON::Value >::iterator it = buffers.begin(); it != buffers.end(); it++){
deletionCallback(it->first);
}
buffers.clear();
keyframes.clear();
}
void DTSC::Stream::addPacket(JSON::Value & newPack){
bool updateMeta = false;
long long unsigned int now = Util::getMS();
livePos newPos;
newPos.trackID = newPack["trackid"].asInt();
@ -223,15 +235,13 @@ void DTSC::Stream::addPacket(JSON::Value & newPack){
livePos lastPos = buffers.rbegin()->first;
if (newPos < lastPos){
if ((lastPos.seekTime > 1000) && newPos.seekTime < lastPos.seekTime - 1000){
metadata["reset"] = 1LL;
buffers.clear();
keyframes.clear();
resetStream();
}else{
newPos.seekTime = lastPos.seekTime+1;
}
}
}else{
buffers.clear();
resetStream();
}
std::string newTrack = trackMapping[newPos.trackID];
while (buffers.count(newPos) > 0){
@ -265,7 +275,6 @@ void DTSC::Stream::addPacket(JSON::Value & newPack){
if (buffercount > 1){
#define prevKey metadata["tracks"][newTrack]["keys"][keySize - 1]
if (newPack.isMember("keyframe") || !keySize || (datapointertype != VIDEO && newPack["time"].asInt() - 5000 > prevKey["time"].asInt())){
updateMeta = true;
metadata["tracks"][newTrack]["lastms"] = newPack["time"];
keyframes[newPos.trackID].insert(newPos);
JSON::Value key;
@ -337,37 +346,39 @@ void DTSC::Stream::addPacket(JSON::Value & newPack){
buffercount = buffers.size();
if (buffercount < 2){buffercount = 2;}
}
if (updateMeta && metadata["buffer_window"].asInt() < timeBuffered){
if (metadata["buffer_window"].asInt() < timeBuffered){
metadata["buffer_window"] = (long long int)timeBuffered;
}
}
while (buffers.size() > buffercount){
if (buffercount > 1 && keyframes[buffers.begin()->first.trackID].count(buffers.begin()->first)){
updateMeta = true;
//if there are < 3 keyframes, throwing one away would mean less than 2 left.
if (keyframes[buffers.begin()->first.trackID].size() < 3){
std::cerr << "Warning - track " << buffers.begin()->first.trackID << " doesn't have enough keyframes to be reliably served." << std::endl;
}
std::string track = trackMapping[buffers.begin()->first.trackID];
keyframes[buffers.begin()->first.trackID].erase(buffers.begin()->first);
int keySize = metadata["tracks"][track]["keys"].size();
metadata["tracks"][track]["keys"].shrink(keySize - 1);
if (metadata["tracks"][track]["frags"].size() > 0){
// delete fragments of which the beginning can no longer be reached
while (metadata["tracks"][track]["frags"].size() > 0 && metadata["tracks"][track]["frags"][0u]["num"].asInt() < metadata["tracks"][track]["keys"][0u]["num"].asInt()){
metadata["tracks"][track]["firstms"] = metadata["tracks"][track]["firstms"].asInt() + metadata["tracks"][track]["frags"][0u]["dur"].asInt();
metadata["tracks"][track]["frags"].shrink(metadata["tracks"][track]["frags"].size() - 1);
// increase the missed fragments counter
metadata["tracks"][track]["missed_frags"] = metadata["tracks"][track]["missed_frags"].asInt() + 1;
}
cutOneBuffer();
}
}
/// Deletes a the first part of the buffer, updating the keyframes list and metadata as required.
/// Will print a warning to std::cerr if a track has less than 2 keyframes left because of this.
void DTSC::Stream::cutOneBuffer(){
if (buffercount > 1 && keyframes[buffers.begin()->first.trackID].count(buffers.begin()->first)){
//if there are < 3 keyframes, throwing one away would mean less than 2 left.
if (keyframes[buffers.begin()->first.trackID].size() < 3){
std::cerr << "Warning - track " << buffers.begin()->first.trackID << " doesn't have enough keyframes to be reliably served." << std::endl;
}
std::string track = trackMapping[buffers.begin()->first.trackID];
keyframes[buffers.begin()->first.trackID].erase(buffers.begin()->first);
int keySize = metadata["tracks"][track]["keys"].size();
metadata["tracks"][track]["keys"].shrink(keySize - 1);
if (metadata["tracks"][track]["frags"].size() > 0){
// delete fragments of which the beginning can no longer be reached
while (metadata["tracks"][track]["frags"].size() > 0 && metadata["tracks"][track]["frags"][0u]["num"].asInt() < metadata["tracks"][track]["keys"][0u]["num"].asInt()){
metadata["tracks"][track]["firstms"] = metadata["tracks"][track]["firstms"].asInt() + metadata["tracks"][track]["frags"][0u]["dur"].asInt();
metadata["tracks"][track]["frags"].shrink(metadata["tracks"][track]["frags"].size() - 1);
// increase the missed fragments counter
metadata["tracks"][track]["missed_frags"] = metadata["tracks"][track]["missed_frags"].asInt() + 1;
}
}
buffers.erase(buffers.begin());
}
if (updateMeta){
//metadata.netPrepare();
}
buffers.erase(buffers.begin());
}
/// Returns a direct pointer to the data attribute of the last received packet, if available.

View file

@ -13,42 +13,6 @@
#include "socket.h"
#include "timing.h"
/// Holds all DDVTECH Stream Container classes and parsers.
///length (int, length in seconds, if available)
///video:
/// - codec (string: H264, H263, VP6)
/// - width (int, pixels)
/// - height (int, pixels)
/// - fpks (int, frames per kilosecond (FPS * 1000))
/// - bps (int, bytes per second)
/// - init (string, init data)
/// - keycount (int, count of keyframes)
/// - keyms (int, average ms per keyframe)
/// - keyvar (int, max ms per keyframe variance)
/// - keys (array of byte position ints - first is first keyframe, last is last keyframe, in between have ~equal spacing)
///
///audio:
/// - codec (string: AAC, MP3)
/// - rate (int, Hz)
/// - size (int, bitsize)
/// - bps (int, bytes per second)
/// - channels (int, channelcount)
/// - init (string, init data)
///
///All packets:
/// - datatype (string: audio, video, meta (unused))
/// - data (string: data)
/// - time (int: ms into video)
///
///Video packets:
/// - keyframe (int, if set, is a seekable keyframe)
/// - interframe (int, if set, is a non-seekable interframe)
/// - disposableframe (int, if set, is a disposable interframe)
///
///H264 video packets:
/// - nalu (int, if set, is a nalu)
/// - nalu_end (int, if set, is a end-of-sequence)
/// - offset (int, unsigned version of signed int! Holds the ms offset between timestamp and proper display time for B-frames)
namespace DTSC {
bool isFixed(JSON::Value & metadata);
@ -215,7 +179,9 @@ namespace DTSC {
DTSC::livePos getNext(DTSC::livePos & pos, std::set<int> & allowedTracks);
void endStream();
void waitForMeta(Socket::Connection & sourceSocket);
private:
protected:
void cutOneBuffer();
void resetStream();
std::map<livePos,JSON::Value> buffers;
std::map<int,std::set<livePos> > keyframes;
void addPacket(JSON::Value & newPack);
@ -223,5 +189,6 @@ namespace DTSC {
unsigned int buffercount;
unsigned int buffertime;
std::map<int,std::string> trackMapping;
void deletionCallback(livePos deleting);
};
}