Various optimalisations to improve performance. More coming soon.

This commit is contained in:
Thulinma 2012-09-16 01:17:08 +02:00
parent 24a3bcd8db
commit 4140b04608
7 changed files with 243 additions and 50 deletions

View file

@ -86,6 +86,59 @@ bool DTSC::Stream::parsePacket(std::string & buffer){
return false; return false;
} }
/// Attempts to parse a packet from the given Socket::Buffer.
/// Returns true if successful, removing the parsed part from the buffer.
/// Returns false if invalid or not enough data is in the buffer.
/// \arg buffer The Socket::Buffer to attempt to parse.
bool DTSC::Stream::parsePacket(Socket::Buffer & buffer){
uint32_t len;
static bool syncing = false;
if (buffer.available(8)){
std::string header_bytes = buffer.copy(8);
if (memcmp(header_bytes.c_str(), DTSC::Magic_Header, 4) == 0){
len = ntohl(((uint32_t *)header_bytes.c_str())[1]);
if (!buffer.available(len+8)){return false;}
unsigned int i = 0;
std::string wholepacket = buffer.remove(len+8);
metadata = JSON::fromDTMI((unsigned char*)wholepacket.c_str() + 8, len, i);
return false;
}
if (memcmp(header_bytes.c_str(), DTSC::Magic_Packet, 4) == 0){
len = ntohl(((uint32_t *)header_bytes.c_str())[1]);
if (!buffer.available(len+8)){return false;}
buffers.push_front(JSON::Value());
unsigned int i = 0;
std::string wholepacket = buffer.remove(len+8);
buffers.front() = JSON::fromDTMI((unsigned char*)wholepacket.c_str() + 8, len, i);
datapointertype = INVALID;
if (buffers.front().isMember("data")){
datapointer = &(buffers.front()["data"].strVal);
}else{
datapointer = 0;
}
if (buffers.front().isMember("datatype")){
std::string tmp = buffers.front()["datatype"].asString();
if (tmp == "video"){datapointertype = VIDEO;}
if (tmp == "audio"){datapointertype = AUDIO;}
if (tmp == "meta"){datapointertype = META;}
if (tmp == "pause_marker"){datapointertype = PAUSEMARK;}
}
while (buffers.size() > buffercount){buffers.pop_back();}
advanceRings();
syncing = false;
return true;
}
#if DEBUG >= 2
if (!syncing){
std::cerr << "Error: Invalid DTMI data detected - syncing" << std::endl;
syncing = true;
}
#endif
buffer.get().clear();
}
return false;
}
/// Returns a direct pointer to the data attribute of the last received packet, if available. /// Returns a direct pointer to the data attribute of the last received packet, if available.
/// Returns NULL if no valid pointer or packet is available. /// Returns NULL if no valid pointer or packet is available.
std::string & DTSC::Stream::lastData(){ std::string & DTSC::Stream::lastData(){
@ -295,7 +348,7 @@ void DTSC::File::seekNext(){
if (frames[currframe] != pos){ if (frames[currframe] != pos){
currframe++; currframe++;
currtime = jsonbuffer["time"].asInt(); currtime = jsonbuffer["time"].asInt();
#if DEBUG >= 4 #if DEBUG >= 6
if (frames[currframe] != pos){ if (frames[currframe] != pos){
std::cerr << "Found a new frame " << currframe << " @ " << pos << "b/" << currtime << "ms" << std::endl; std::cerr << "Found a new frame " << currframe << " @ " << pos << "b/" << currtime << "ms" << std::endl;
}else{ }else{

View file

@ -10,6 +10,7 @@
#include <set> #include <set>
#include <stdio.h> //for FILE #include <stdio.h> //for FILE
#include "json.h" #include "json.h"
#include "socket.h"
@ -113,6 +114,7 @@ namespace DTSC{
bool hasVideo(); bool hasVideo();
bool hasAudio(); bool hasAudio();
bool parsePacket(std::string & buffer); bool parsePacket(std::string & buffer);
bool parsePacket(Socket::Buffer & buffer);
std::string & outPacket(unsigned int num); std::string & outPacket(unsigned int num);
std::string & outHeader(); std::string & outHeader();
Ring * getRing(); Ring * getRing();

View file

@ -169,7 +169,8 @@ int FTP::User::ParseCommand( std::string Command ) {
fprintf( stderr, "Reading STOR information\n" ); fprintf( stderr, "Reading STOR information\n" );
std::string Buffer; std::string Buffer;
while( Connected.spool() ) { } while( Connected.spool() ) { }
Buffer = Connected.Received(); /// \todo Comment me back in. ^_^
//Buffer = Connected.Received();
MyDir.STOR( Command, Buffer ); MyDir.STOR( Command, Buffer );
return 250; return 250;
break; break;

View file

@ -133,6 +133,7 @@ bool HTTP::Parser::Read(std::string & strbuf){
return parse(strbuf); return parse(strbuf);
}//HTTPReader::Read }//HTTPReader::Read
#include <iostream>
/// Attempt to read a whole HTTP response or request from a data buffer. /// Attempt to read a whole HTTP response or request from a data buffer.
/// If succesful, fills its own fields with the proper data and removes the response/request /// If succesful, fills its own fields with the proper data and removes the response/request
/// from the data buffer. /// from the data buffer.
@ -141,13 +142,17 @@ bool HTTP::Parser::Read(std::string & strbuf){
bool HTTP::Parser::parse(std::string & HTTPbuffer){ bool HTTP::Parser::parse(std::string & HTTPbuffer){
size_t f; size_t f;
std::string tmpA, tmpB, tmpC; std::string tmpA, tmpB, tmpC;
/// \todo Make this not resize HTTPbuffer in parts, but read all at once and then remove the entire request, like doxygen claims it does. /// \todo Make this not resize HTTPbuffer in parts, but read all at once and then remove the entire request, like doxygen claims it does?
while (!HTTPbuffer.empty()){ while (!HTTPbuffer.empty()){
if (!seenHeaders){ if (!seenHeaders){
f = HTTPbuffer.find('\n'); f = HTTPbuffer.find('\n');
if (f == std::string::npos) return false; if (f == std::string::npos) return false;
tmpA = HTTPbuffer.substr(0, f); tmpA = HTTPbuffer.substr(0, f);
if (f+1 == HTTPbuffer.size()){
HTTPbuffer.clear();
}else{
HTTPbuffer.erase(0, f+1); HTTPbuffer.erase(0, f+1);
}
while (tmpA.find('\r') != std::string::npos){tmpA.erase(tmpA.find('\r'));} while (tmpA.find('\r') != std::string::npos){tmpA.erase(tmpA.find('\r'));}
if (!seenReq){ if (!seenReq){
seenReq = true; seenReq = true;
@ -166,7 +171,11 @@ bool HTTP::Parser::parse(std::string & HTTPbuffer){
}else{ }else{
if (tmpA.size() == 0){ if (tmpA.size() == 0){
seenHeaders = true; seenHeaders = true;
if (GetHeader("Content-Length") != ""){length = atoi(GetHeader("Content-Length").c_str());} body.clear();
if (GetHeader("Content-Length") != ""){
length = atoi(GetHeader("Content-Length").c_str());
if (body.capacity() < length){body.reserve(length);}
}
}else{ }else{
f = tmpA.find(':'); f = tmpA.find(':');
if (f == std::string::npos) continue; if (f == std::string::npos) continue;
@ -178,10 +187,13 @@ bool HTTP::Parser::parse(std::string & HTTPbuffer){
} }
if (seenHeaders){ if (seenHeaders){
if (length > 0){ if (length > 0){
if (HTTPbuffer.length() >= length){ unsigned int toappend = length - body.length();
body = HTTPbuffer.substr(0, length); if (toappend > 0){
body.append(HTTPbuffer, 0, toappend);
HTTPbuffer.erase(0, toappend);
}
if (length == body.length()){
parseVars(body); //parse POST variables parseVars(body); //parse POST variables
HTTPbuffer.erase(0, length);
return true; return true;
}else{ }else{
return false; return false;
@ -194,7 +206,6 @@ bool HTTP::Parser::parse(std::string & HTTPbuffer){
return false; //empty input return false; //empty input
}//HTTPReader::parse }//HTTPReader::parse
#include <iostream>
/// Parses GET or POST-style variable data. /// Parses GET or POST-style variable data.
/// Saves to internal variable structure using HTTP::Parser::SetVar. /// Saves to internal variable structure using HTTP::Parser::SetVar.
void HTTP::Parser::parseVars(std::string data){ void HTTP::Parser::parseVars(std::string data){

View file

@ -258,15 +258,17 @@ std::string & RTMPStream::SendUSR(unsigned char type, unsigned int data, unsigne
/// Parses the argument string into the current chunk. /// Parses the argument string into the current chunk.
/// Tries to read a whole chunk, if successful it will remove /// Tries to read a whole chunk, removing data from the input string as it reads.
/// the corresponding data from the input string.
/// If only part of a chunk is read, it will remove the part and call itself again. /// If only part of a chunk is read, it will remove the part and call itself again.
/// This has the effect of only causing a "true" reponse in the case a *whole* chunk /// This has the effect of only causing a "true" reponse in the case a *whole* chunk
/// is read, not just part of a chunk. /// is read, not just part of a chunk.
/// \param indata The input string to parse and update. /// \param indata The input string to parse and update.
/// \warning This function will destroy the current data in this chunk! /// \warning This function will destroy the current data in this chunk!
/// \returns True if a whole chunk could be read, false otherwise. /// \returns True if a whole chunk could be read, false otherwise.
bool RTMPStream::Chunk::Parse(std::string & indata){ bool RTMPStream::Chunk::Parse(std::string & source){
static std::string indata;
indata.append(source);
source.clear();
gettimeofday(&RTMPStream::lastrec, 0); gettimeofday(&RTMPStream::lastrec, 0);
unsigned int i = 0; unsigned int i = 0;
if (indata.size() < 1) return false;//need at least a byte if (indata.size() < 1) return false;//need at least a byte
@ -378,7 +380,7 @@ bool RTMPStream::Chunk::Parse(std::string & indata){
if (len_left == 0){ if (len_left == 0){
return true; return true;
}else{ }else{
return Parse(indata); return Parse(source);
} }
}else{ }else{
data = ""; data = "";

View file

@ -12,12 +12,110 @@
#include <netinet/in.h> #include <netinet/in.h>
#endif #endif
#define BUFFER_BLOCKSIZE 4096 //set buffer blocksize to 4KiB
#include <iostream>//temporary for debugging
std::string uint2string(unsigned int i){ std::string uint2string(unsigned int i){
std::stringstream st; std::stringstream st;
st << i; st << i;
return st.str(); return st.str();
} }
/// Returns the amount of elements in the internal std::deque of std::string objects.
/// The back is popped as long as it is empty, first - this way this function is
/// guaranteed to return 0 if the buffer is empty.
unsigned int Socket::Buffer::size(){
while (data.size() > 0 && data.back().empty()){data.pop_back();}
return data.size();
}
/// Appends this string to the internal std::deque of std::string objects.
/// It is automatically split every BUFFER_BLOCKSIZE bytes.
void Socket::Buffer::append(const std::string & newdata){
append(newdata.c_str(), newdata.size());
}
/// Appends this data block to the internal std::deque of std::string objects.
/// It is automatically split every BUFFER_BLOCKSIZE bytes.
void Socket::Buffer::append(const char * newdata, const unsigned int newdatasize){
unsigned int i = 0, j = 0;
while (i < newdatasize){
j = i;
while (j < newdatasize && j - i <= BUFFER_BLOCKSIZE){
j++;
if (newdata[j-1] == '\n'){break;}
}
if (i != j){
data.push_front(std::string(newdata+i, (size_t)(j - i)));
i = j;
}else{
break;
}
}
if (data.size() > 1000){
std::cerr << "Warning: After " << newdatasize << " new bytes, buffer has " << data.size() << " parts!" << std::endl;
}
}
/// Returns true if at least count bytes are available in this buffer.
bool Socket::Buffer::available(unsigned int count){
unsigned int i = 0;
for (std::deque<std::string>::iterator it = data.begin(); it != data.end(); ++it){
i += (*it).size();
if (i >= count){return true;}
}
return false;
}
/// Removes count bytes from the buffer, returning them by value.
/// Returns an empty string if not all count bytes are available.
std::string Socket::Buffer::remove(unsigned int count){
if (!available(count)){return "";}
unsigned int i = 0;
std::string ret;
for (std::deque<std::string>::reverse_iterator it = data.rbegin(); it != data.rend(); ++it){
if (i + (*it).size() < count){
ret.append(*it);
i += (*it).size();
(*it).clear();
}else{
ret.append(*it, 0, count - i);
(*it).erase(0, count - i);
break;
}
}
return ret;
}
/// Copies count bytes from the buffer, returning them by value.
/// Returns an empty string if not all count bytes are available.
std::string Socket::Buffer::copy(unsigned int count){
if (!available(count)){return "";}
unsigned int i = 0;
std::string ret;
for (std::deque<std::string>::reverse_iterator it = data.rbegin(); it != data.rend(); ++it){
if (i + (*it).size() < count){
ret.append(*it);
i += (*it).size();
}else{
ret.append(*it, 0, count - i);
break;
}
}
return ret;
}
/// Gets a reference to the back of the internal std::deque of std::string objects.
std::string & Socket::Buffer::get(){
static std::string empty;
if (data.size() > 0){
return data.back();
}else{
return empty;
}
}
/// Create a new base socket. This is a basic constructor for converting any valid socket to a Socket::Connection. /// Create a new base socket. This is a basic constructor for converting any valid socket to a Socket::Connection.
/// \param sockNo Integer representing the socket to convert. /// \param sockNo Integer representing the socket to convert.
Socket::Connection::Connection(int sockNo){ Socket::Connection::Connection(int sockNo){
@ -225,23 +323,36 @@ std::string Socket::Connection::getStats(std::string C){
/// Updates the downbuffer and upbuffer internal variables. /// Updates the downbuffer and upbuffer internal variables.
/// Returns true if new data was received, false otherwise. /// Returns true if new data was received, false otherwise.
bool Socket::Connection::spool(){ bool Socket::Connection::spool(){
iwrite(upbuffer); if (upbuffer.size() > 0){
iwrite(upbuffer.get());
}
/// \todo Provide better mechanism to prevent overbuffering.
if (downbuffer.size() > 1000){
return true;
}else{
return iread(downbuffer); return iread(downbuffer);
}
} }
/// Updates the downbuffer and upbuffer internal variables until upbuffer is empty. /// Updates the downbuffer and upbuffer internal variables until upbuffer is empty.
/// Returns true if new data was received, false otherwise. /// Returns true if new data was received, false otherwise.
bool Socket::Connection::flush(){ bool Socket::Connection::flush(){
while (upbuffer.size() > 0 && connected()){ while (upbuffer.size() > 0 && connected()){
iwrite(upbuffer); if (!iwrite(upbuffer.get())){
usleep(5000);//sleep 5 ms usleep(10000);//sleep 10ms
} }
}
/// \todo Provide better mechanism to prevent overbuffering.
if (downbuffer.size() > 1000){
return true;
}else{
return iread(downbuffer); return iread(downbuffer);
}
} }
/// Returns a reference to the download buffer. /// Returns a reference to the download buffer.
std::string & Socket::Connection::Received(){ Socket::Buffer & Socket::Connection::Received(){
return downbuffer; return downbuffer;
} }
@ -251,16 +362,15 @@ std::string & Socket::Connection::Received(){
/// the data right away. Any data that could not be send will be put into the upbuffer. /// the data right away. Any data that could not be send will be put into the upbuffer.
/// This means this function is blocking if the socket is, but nonblocking otherwise. /// This means this function is blocking if the socket is, but nonblocking otherwise.
void Socket::Connection::Send(std::string & data){ void Socket::Connection::Send(std::string & data){
if (upbuffer.size() > 0){ while (upbuffer.size() > 0){
iwrite(upbuffer); if (!iwrite(upbuffer.get())){break;}
}
if (upbuffer.size() > 0){ if (upbuffer.size() > 0){
upbuffer.append(data); upbuffer.append(data);
} }else{
}
if (upbuffer.size() == 0){
int i = iwrite(data.c_str(), data.size()); int i = iwrite(data.c_str(), data.size());
if (i < data.size()){ if (i < data.size()){
upbuffer.append(data, i, data.size() - i); upbuffer.append(data.c_str()+i, data.size() - i);
} }
} }
} }
@ -272,16 +382,15 @@ void Socket::Connection::Send(std::string & data){
/// This means this function is blocking if the socket is, but nonblocking otherwise. /// This means this function is blocking if the socket is, but nonblocking otherwise.
void Socket::Connection::Send(const char * data){ void Socket::Connection::Send(const char * data){
int len = strlen(data); int len = strlen(data);
if (upbuffer.size() > 0){ while (upbuffer.size() > 0){
iwrite(upbuffer); if (!iwrite(upbuffer.get())){break;}
if (upbuffer.size() > 0){
upbuffer.append(data, (size_t)len);
} }
} if (upbuffer.size() > 0){
if (upbuffer.size() == 0){ upbuffer.append(data, len);
}else{
int i = iwrite(data, len); int i = iwrite(data, len);
if (i < len){ if (i < len){
upbuffer.append(data + i, (size_t)(len - i)); upbuffer.append(data + i, len - i);
} }
} }
} }
@ -292,13 +401,12 @@ void Socket::Connection::Send(const char * data){
/// the data right away. Any data that could not be send will be put into the upbuffer. /// the data right away. Any data that could not be send will be put into the upbuffer.
/// This means this function is blocking if the socket is, but nonblocking otherwise. /// This means this function is blocking if the socket is, but nonblocking otherwise.
void Socket::Connection::Send(const char * data, size_t len){ void Socket::Connection::Send(const char * data, size_t len){
if (upbuffer.size() > 0){ while (upbuffer.size() > 0){
iwrite(upbuffer); if (!iwrite(upbuffer.get())){break;}
}
if (upbuffer.size() > 0){ if (upbuffer.size() > 0){
upbuffer.append(data, len); upbuffer.append(data, len);
} }else{
}
if (upbuffer.size() == 0){
int i = iwrite(data, len); int i = iwrite(data, len);
if (i < len){ if (i < len){
upbuffer.append(data + i, len - i); upbuffer.append(data + i, len - i);
@ -380,14 +488,14 @@ int Socket::Connection::iread(void * buffer, int len){
return r; return r;
}//Socket::Connection::iread }//Socket::Connection::iread
/// Read call that is compatible with std::string. /// Read call that is compatible with Socket::Buffer.
/// Data is read using iread (which is nonblocking if the Socket::Connection itself is), /// Data is read using iread (which is nonblocking if the Socket::Connection itself is),
/// then appended to end of buffer. /// then appended to end of buffer.
/// \param buffer std::string to append data to. /// \param buffer Socket::Buffer to append data to.
/// \return True if new data arrived, false otherwise. /// \return True if new data arrived, false otherwise.
bool Socket::Connection::iread(std::string & buffer){ bool Socket::Connection::iread(Buffer & buffer){
char cbuffer[5000]; char cbuffer[BUFFER_BLOCKSIZE];
int num = iread(cbuffer, 5000); int num = iread(cbuffer, BUFFER_BLOCKSIZE);
if (num < 1){return false;} if (num < 1){return false;}
buffer.append(cbuffer, num); buffer.append(cbuffer, num);
return true; return true;

View file

@ -4,6 +4,7 @@
#pragma once #pragma once
#include <string> #include <string>
#include <sstream>
#include <sys/types.h> #include <sys/types.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/un.h> #include <sys/un.h>
@ -13,6 +14,7 @@
#include <errno.h> #include <errno.h>
#include <string.h> #include <string.h>
#include <fcntl.h> #include <fcntl.h>
#include <deque>
//for being friendly with Socket::Connection down below //for being friendly with Socket::Connection down below
namespace Buffer{class user;}; namespace Buffer{class user;};
@ -20,6 +22,20 @@ namespace Buffer{class user;};
///Holds Socket tools. ///Holds Socket tools.
namespace Socket{ namespace Socket{
/// A buffer made out of std::string objects that can be efficiently read from and written to.
class Buffer{
private:
std::deque<std::string> data;
public:
unsigned int size();
void append(const std::string & newdata);
void append(const char * newdata, const unsigned int newdatasize);
std::string & get();
bool available(unsigned int count);
std::string remove(unsigned int count);
std::string copy(unsigned int count);
};//Buffer
/// This class is for easy communicating through sockets, either TCP or Unix. /// This class is for easy communicating through sockets, either TCP or Unix.
class Connection{ class Connection{
private: private:
@ -29,15 +45,15 @@ namespace Socket{
unsigned int up; unsigned int up;
unsigned int down; unsigned int down;
unsigned int conntime; unsigned int conntime;
std::string downbuffer; ///< Stores temporary data coming in. Buffer downbuffer; ///< Stores temporary data coming in.
std::string upbuffer; ///< Stores temporary data going out. Buffer upbuffer; ///< Stores temporary data going out.
int iread(void * buffer, int len); ///< Incremental read call. int iread(void * buffer, int len); ///< Incremental read call.
int iwrite(const void * buffer, int len); ///< Incremental write call. int iwrite(const void * buffer, int len); ///< Incremental write call.
bool iread(std::string & buffer); ///< Incremental write call that is compatible with std::string. bool iread(Buffer & buffer); ///< Incremental write call that is compatible with Socket::Buffer.
bool iwrite(std::string & buffer); ///< Write call that is compatible with std::string. bool iwrite(std::string & buffer); ///< Write call that is compatible with std::string.
public: public:
//friends //friends
friend class Buffer::user; friend class ::Buffer::user;
//constructors //constructors
Connection(); ///< Create a new disconnected base socket. Connection(); ///< Create a new disconnected base socket.
Connection(int sockNo); ///< Create a new base socket. Connection(int sockNo); ///< Create a new base socket.
@ -55,7 +71,7 @@ namespace Socket{
//buffered i/o methods //buffered i/o methods
bool spool(); ///< Updates the downbuffer and upbuffer internal variables. bool spool(); ///< Updates the downbuffer and upbuffer internal variables.
bool flush(); ///< Updates the downbuffer and upbuffer internal variables until upbuffer is empty. bool flush(); ///< Updates the downbuffer and upbuffer internal variables until upbuffer is empty.
std::string & Received(); ///< Returns a reference to the download buffer. Buffer & Received(); ///< Returns a reference to the download buffer.
void Send(std::string & data); ///< Appends data to the upbuffer. void Send(std::string & data); ///< Appends data to the upbuffer.
void Send(const char * data); ///< Appends data to the upbuffer. void Send(const char * data); ///< Appends data to the upbuffer.
void Send(const char * data, size_t len); ///< Appends data to the upbuffer. void Send(const char * data, size_t len); ///< Appends data to the upbuffer.