initial (broken) commit

This commit is contained in:
Erik Zandvliet 2013-06-17 13:32:50 +02:00 committed by Thulinma
parent 0c65ba87df
commit ae0da6955c
2 changed files with 81 additions and 106 deletions

View file

@ -32,7 +32,7 @@ DTSC::Stream::Stream(unsigned int rbuffers, unsigned int bufferTime){
/// Returns the time in milliseconds of the last received packet.
/// This is _not_ the time this packet was received, only the stored time.
unsigned int DTSC::Stream::getTime(){
return buffers.front()["time"].asInt();
return buffers.rbegin()->second["time"].asInt();
}
/// Attempts to parse a packet from the given std::string buffer.
@ -75,44 +75,16 @@ bool DTSC::Stream::parsePacket(std::string & buffer){
if (buffer.length() < len + 8){
return false;
}
buffers.push_front(JSON::Value());
JSON::Value newPack;
livePos newPos;
unsigned int i = 0;
if (version == 1){
buffers.front() = JSON::fromDTMI((unsigned char*)buffer.c_str() + 8, len, i);
newPack = JSON::fromDTMI((unsigned char*)buffer.c_str() + 8, len, i);
}
if (version == 2){
buffers.front() = JSON::fromDTMI2(buffer.substr(8));
newPack = JSON::fromDTMI2(buffer.substr(8));
}
datapointertype = INVALID;
if (buffers.front().isMember("data")){
datapointer = &(buffers.front()["data"].strVal);
}else{
datapointer = 0;
}
std::string tmp = "";
if (buffers.front().isMember("trackid")){
tmp = getTrackById(buffers.front()["trackid"].asInt())["type"].asString();
}
if (buffers.front().isMember("datatype")){
tmp = buffers.front()["datatype"].asString();
}
if (tmp == "video"){
datapointertype = VIDEO;
}
if (tmp == "audio"){
datapointertype = AUDIO;
}
if (tmp == "meta"){
datapointertype = META;
}
if (tmp == "pause_marker"){
datapointertype = PAUSEMARK;
}
buffer.erase(0, len + 8);
while (buffers.size() > buffercount){
buffers.pop_back();
}
advanceRings();
addPacket(newPack);
syncing = false;
return true;
}
@ -179,42 +151,15 @@ bool DTSC::Stream::parsePacket(Socket::Buffer & buffer){
if ( !buffer.available(len + 8)){
return false;
}
buffers.push_front(JSON::Value());
JSON::Value newPack;
livePos newPos;
unsigned int i = 0;
std::string wholepacket = buffer.remove(len + 8);
if (version == 1){
buffers.front() = JSON::fromDTMI((unsigned char*)wholepacket.c_str() + 8, len, i);
newPack = JSON::fromDTMI((unsigned char*)wholepacket.c_str() + 8, len, i);
}
if (version == 2){
buffers.front() = JSON::fromDTMI2(wholepacket.substr(8));
}
datapointertype = INVALID;
if (buffers.front().isMember("data")){
datapointer = &(buffers.front()["data"].strVal);
}else{
datapointer = 0;
}
std::string tmp = "";
if (buffers.front().isMember("trackid")){
tmp = getTrackById(buffers.front()["trackid"].asInt())["type"].asString();
}
if (buffers.front().isMember("datatype")){
tmp = buffers.front()["datatype"].asString();
}
if (tmp == "video"){
datapointertype = VIDEO;
}
if (tmp == "audio"){
datapointertype = AUDIO;
}
if (tmp == "meta"){
datapointertype = META;
}
if (tmp == "pause_marker"){
datapointertype = PAUSEMARK;
}
while (buffers.size() > buffercount){
buffers.pop_back();
newPack = JSON::fromDTMI2(wholepacket.substr(8));
}
advanceRings();
syncing = false;
@ -231,6 +176,48 @@ bool DTSC::Stream::parsePacket(Socket::Buffer & buffer){
return false;
}
void DTSC::Stream::addPacket(JSON::Value & newPack){
livePos newPos;
newPos.trackID = newPack["trackid"].asInt();
newPos.seekTime = newPack["time"].asInt();
buffers[newPos] = newPack;
datapointertype = INVALID;
///\todo Save keyframes when they arrive.
if (newPack.isMember("data")){
datapointer = &(buffers[newPos]["data"].strVal);
}else{
datapointer = 0;
}
std::string tmp = "";
if (newPack.isMember("trackid")){
tmp = getTrackById(newPack["trackid"].asInt())["type"].asString();
}
if (newPack.isMember("datatype")){
tmp = newPack["datatype"].asString();
}
if (tmp == "video"){
datapointertype = VIDEO;
}
if (tmp == "audio"){
datapointertype = AUDIO;
}
if (tmp == "meta"){
datapointertype = META;
}
if (tmp == "pause_marker"){
datapointertype = PAUSEMARK;
}
while (buffers.size() > buffercount){
if (buffers.begin()->second.isMember("keyframe")){
std::string track = trackMapping[buffers.begin()->first.trackID];
keyframes[buffers.begin()->first.trackID].erase(buffers.begin()->first);
int keySize = metadata["tracks"][track]["keys"].size();
metadata["tracks"][track]["keys"].shrink(keySize - 1);
}
buffers.erase(buffers.begin());
}
}
/// Returns a direct pointer to the data attribute of the last received packet, if available.
/// Returns NULL if no valid pointer or packet is available.
std::string & DTSC::Stream::lastData(){
@ -239,9 +226,9 @@ std::string & DTSC::Stream::lastData(){
/// Returns the packet in this buffer number.
/// \arg num Buffer number.
JSON::Value & DTSC::Stream::getPacket(unsigned int num){
JSON::Value & DTSC::Stream::getPacket(livePos num){
static JSON::Value empty;
if (num >= buffers.size()){
if (buffers.find(num) == buffers.end()){
return empty;
}
return buffers[num];
@ -276,9 +263,9 @@ void DTSC::Stream::setBufferTime(unsigned int ms){
}
/// Returns a packed DTSC packet, ready to sent over the network.
std::string & DTSC::Stream::outPacket(unsigned int num){
std::string & DTSC::Stream::outPacket(livePos num){
static std::string emptystring;
if (num >= buffers.size() || !buffers[num].isObject()) return emptystring;
if (buffers.find(num) == buffers.end() || !buffers[num].isObject()) return emptystring;
return buffers[num].toNetPacked();
}
@ -291,37 +278,6 @@ std::string & DTSC::Stream::outHeader(){
/// Also updates the internal keyframes ring, as well as marking rings as starved if they are.
/// Unsets waiting rings, updating them with their new buffer number.
void DTSC::Stream::advanceRings(){
std::deque<DTSC::Ring>::iterator dit;
std::set<DTSC::Ring *>::iterator sit;
if (rings.size()){
for (sit = rings.begin(); sit != rings.end(); sit++){
( *sit)->b++;
if (( *sit)->waiting){
( *sit)->waiting = false;
( *sit)->b = 0;
}
if (( *sit)->starved || (( *sit)->b >= buffers.size())){
( *sit)->starved = true;
( *sit)->b = 0;
}
}
}
if (keyframes.size()){
for (dit = keyframes.begin(); dit != keyframes.end(); dit++){
dit->b++;
}
bool repeat;
do{
repeat = false;
for (dit = keyframes.begin(); dit != keyframes.end(); dit++){
if (dit->b >= buffers.size()){
keyframes.erase(dit);
repeat = true;
break;
}
}
}while (repeat);
}
static int fragNum = 1;
static unsigned int lastkeytime = 4242;
if ((lastType() == VIDEO && buffers.front().isMember("keyframe")) || (!metadata.isMember("video") && buffers.front()["time"].asInt() / 2000 != lastkeytime)){

View file

@ -133,12 +133,31 @@ namespace DTSC {
};
//FileWriter
/// A simple structure used for ordering byte seek positions.
struct livePos {
bool operator < (const livePos& rhs) const {
if (seekTime < rhs.seekTime){
return true;
}else{
if (seekTime == rhs.seekTime){
if (trackID < rhs.trackID){
return true;
}
}
}
return false;
}
long long unsigned int seekTime;
unsigned int trackID;
};
/// A part from the DTSC::Stream ringbuffer.
/// Holds information about a buffer that will stay consistent
class Ring{
public:
Ring(unsigned int v);
volatile unsigned int b; ///< Holds current number of buffer. May and is intended to change unexpectedly!
volatile livePos b;
//volatile unsigned int b; ///< Holds current number of buffer. May and is intended to change unexpectedly!
volatile bool waiting; ///< If true, this Ring is currently waiting for a buffer fill.
volatile bool starved; ///< If true, this Ring can no longer receive valid data.
volatile bool updated; ///< If true, this Ring should write a new header.
@ -154,7 +173,7 @@ namespace DTSC {
~Stream();
Stream(unsigned int buffers, unsigned int bufferTime = 0);
JSON::Value metadata;
JSON::Value & getPacket(unsigned int num = 0);
JSON::Value & getPacket(livePos num);
JSON::Value & getTrackById(int trackNo);
datatype lastType();
std::string & lastData();
@ -162,7 +181,7 @@ namespace DTSC {
bool hasAudio();
bool parsePacket(std::string & buffer);
bool parsePacket(Socket::Buffer & buffer);
std::string & outPacket(unsigned int num);
std::string & outPacket(livePos num);
std::string & outHeader();
Ring * getRing();
unsigned int getTime();
@ -174,11 +193,11 @@ namespace DTSC {
unsigned int frameSeek(unsigned int frameno);
void setBufferTime(unsigned int ms);
private:
std::deque<JSON::Value> buffers;
std::set<DTSC::Ring *> rings;
std::deque<DTSC::Ring> keyframes;
std::map<livePos,JSON::Value> buffers;
std::map<int,std::set<livePos> > keyframes;
void advanceRings();
void updateRingHeaders();
void addPacket(JSON::Value & newPack);
std::string * datapointer;
datatype datapointertype;
unsigned int buffercount;