diff --git a/lib/dtsc.cpp b/lib/dtsc.cpp index aa82c9b5..d7981462 100644 --- a/lib/dtsc.cpp +++ b/lib/dtsc.cpp @@ -86,6 +86,59 @@ bool DTSC::Stream::parsePacket(std::string & buffer){ return false; } +/// Attempts to parse a packet from the given Socket::Buffer. +/// Returns true if successful, removing the parsed part from the buffer. +/// Returns false if invalid or not enough data is in the buffer. +/// \arg buffer The Socket::Buffer to attempt to parse. +bool DTSC::Stream::parsePacket(Socket::Buffer & buffer){ + uint32_t len; + static bool syncing = false; + if (buffer.available(8)){ + std::string header_bytes = buffer.copy(8); + if (memcmp(header_bytes.c_str(), DTSC::Magic_Header, 4) == 0){ + len = ntohl(((uint32_t *)header_bytes.c_str())[1]); + if (!buffer.available(len+8)){return false;} + unsigned int i = 0; + std::string wholepacket = buffer.remove(len+8); + metadata = JSON::fromDTMI((unsigned char*)wholepacket.c_str() + 8, len, i); + return false; + } + if (memcmp(header_bytes.c_str(), DTSC::Magic_Packet, 4) == 0){ + len = ntohl(((uint32_t *)header_bytes.c_str())[1]); + if (!buffer.available(len+8)){return false;} + buffers.push_front(JSON::Value()); + unsigned int i = 0; + std::string wholepacket = buffer.remove(len+8); + buffers.front() = JSON::fromDTMI((unsigned char*)wholepacket.c_str() + 8, len, i); + datapointertype = INVALID; + if (buffers.front().isMember("data")){ + datapointer = &(buffers.front()["data"].strVal); + }else{ + datapointer = 0; + } + if (buffers.front().isMember("datatype")){ + std::string tmp = buffers.front()["datatype"].asString(); + if (tmp == "video"){datapointertype = VIDEO;} + if (tmp == "audio"){datapointertype = AUDIO;} + if (tmp == "meta"){datapointertype = META;} + if (tmp == "pause_marker"){datapointertype = PAUSEMARK;} + } + while (buffers.size() > buffercount){buffers.pop_back();} + advanceRings(); + syncing = false; + return true; + } + #if DEBUG >= 2 + if (!syncing){ + std::cerr << "Error: Invalid DTMI data detected - syncing" << std::endl; + syncing = true; + } + #endif + buffer.get().clear(); + } + return false; +} + /// Returns a direct pointer to the data attribute of the last received packet, if available. /// Returns NULL if no valid pointer or packet is available. std::string & DTSC::Stream::lastData(){ @@ -295,7 +348,7 @@ void DTSC::File::seekNext(){ if (frames[currframe] != pos){ currframe++; currtime = jsonbuffer["time"].asInt(); - #if DEBUG >= 4 + #if DEBUG >= 6 if (frames[currframe] != pos){ std::cerr << "Found a new frame " << currframe << " @ " << pos << "b/" << currtime << "ms" << std::endl; }else{ diff --git a/lib/dtsc.h b/lib/dtsc.h index c4c578ac..668b0e26 100644 --- a/lib/dtsc.h +++ b/lib/dtsc.h @@ -10,6 +10,7 @@ #include #include //for FILE #include "json.h" +#include "socket.h" @@ -113,6 +114,7 @@ namespace DTSC{ bool hasVideo(); bool hasAudio(); bool parsePacket(std::string & buffer); + bool parsePacket(Socket::Buffer & buffer); std::string & outPacket(unsigned int num); std::string & outHeader(); Ring * getRing(); diff --git a/lib/ftp.cpp b/lib/ftp.cpp index 25695dee..7510e948 100644 --- a/lib/ftp.cpp +++ b/lib/ftp.cpp @@ -169,7 +169,8 @@ int FTP::User::ParseCommand( std::string Command ) { fprintf( stderr, "Reading STOR information\n" ); std::string Buffer; while( Connected.spool() ) { } - Buffer = Connected.Received(); + /// \todo Comment me back in. ^_^ + //Buffer = Connected.Received(); MyDir.STOR( Command, Buffer ); return 250; break; diff --git a/lib/http_parser.cpp b/lib/http_parser.cpp index f8489fdb..b554e5c0 100644 --- a/lib/http_parser.cpp +++ b/lib/http_parser.cpp @@ -133,6 +133,7 @@ bool HTTP::Parser::Read(std::string & strbuf){ return parse(strbuf); }//HTTPReader::Read +#include /// Attempt to read a whole HTTP response or request from a data buffer. /// If succesful, fills its own fields with the proper data and removes the response/request /// from the data buffer. @@ -141,13 +142,17 @@ bool HTTP::Parser::Read(std::string & strbuf){ bool HTTP::Parser::parse(std::string & HTTPbuffer){ size_t f; std::string tmpA, tmpB, tmpC; - /// \todo Make this not resize HTTPbuffer in parts, but read all at once and then remove the entire request, like doxygen claims it does. + /// \todo Make this not resize HTTPbuffer in parts, but read all at once and then remove the entire request, like doxygen claims it does? while (!HTTPbuffer.empty()){ if (!seenHeaders){ f = HTTPbuffer.find('\n'); if (f == std::string::npos) return false; tmpA = HTTPbuffer.substr(0, f); - HTTPbuffer.erase(0, f+1); + if (f+1 == HTTPbuffer.size()){ + HTTPbuffer.clear(); + }else{ + HTTPbuffer.erase(0, f+1); + } while (tmpA.find('\r') != std::string::npos){tmpA.erase(tmpA.find('\r'));} if (!seenReq){ seenReq = true; @@ -166,7 +171,11 @@ bool HTTP::Parser::parse(std::string & HTTPbuffer){ }else{ if (tmpA.size() == 0){ seenHeaders = true; - if (GetHeader("Content-Length") != ""){length = atoi(GetHeader("Content-Length").c_str());} + body.clear(); + if (GetHeader("Content-Length") != ""){ + length = atoi(GetHeader("Content-Length").c_str()); + if (body.capacity() < length){body.reserve(length);} + } }else{ f = tmpA.find(':'); if (f == std::string::npos) continue; @@ -178,10 +187,13 @@ bool HTTP::Parser::parse(std::string & HTTPbuffer){ } if (seenHeaders){ if (length > 0){ - if (HTTPbuffer.length() >= length){ - body = HTTPbuffer.substr(0, length); + unsigned int toappend = length - body.length(); + if (toappend > 0){ + body.append(HTTPbuffer, 0, toappend); + HTTPbuffer.erase(0, toappend); + } + if (length == body.length()){ parseVars(body); //parse POST variables - HTTPbuffer.erase(0, length); return true; }else{ return false; @@ -194,7 +206,6 @@ bool HTTP::Parser::parse(std::string & HTTPbuffer){ return false; //empty input }//HTTPReader::parse -#include /// Parses GET or POST-style variable data. /// Saves to internal variable structure using HTTP::Parser::SetVar. void HTTP::Parser::parseVars(std::string data){ diff --git a/lib/rtmpchunks.cpp b/lib/rtmpchunks.cpp index 74a41a2a..2e46da32 100644 --- a/lib/rtmpchunks.cpp +++ b/lib/rtmpchunks.cpp @@ -258,15 +258,17 @@ std::string & RTMPStream::SendUSR(unsigned char type, unsigned int data, unsigne /// Parses the argument string into the current chunk. -/// Tries to read a whole chunk, if successful it will remove -/// the corresponding data from the input string. +/// Tries to read a whole chunk, removing data from the input string as it reads. /// If only part of a chunk is read, it will remove the part and call itself again. /// This has the effect of only causing a "true" reponse in the case a *whole* chunk /// is read, not just part of a chunk. /// \param indata The input string to parse and update. /// \warning This function will destroy the current data in this chunk! /// \returns True if a whole chunk could be read, false otherwise. -bool RTMPStream::Chunk::Parse(std::string & indata){ +bool RTMPStream::Chunk::Parse(std::string & source){ + static std::string indata; + indata.append(source); + source.clear(); gettimeofday(&RTMPStream::lastrec, 0); unsigned int i = 0; if (indata.size() < 1) return false;//need at least a byte @@ -378,7 +380,7 @@ bool RTMPStream::Chunk::Parse(std::string & indata){ if (len_left == 0){ return true; }else{ - return Parse(indata); + return Parse(source); } }else{ data = ""; diff --git a/lib/socket.cpp b/lib/socket.cpp index d94545b6..f6250aa7 100644 --- a/lib/socket.cpp +++ b/lib/socket.cpp @@ -12,12 +12,110 @@ #include #endif +#define BUFFER_BLOCKSIZE 4096 //set buffer blocksize to 4KiB +#include //temporary for debugging + std::string uint2string(unsigned int i){ std::stringstream st; st << i; return st.str(); } +/// Returns the amount of elements in the internal std::deque of std::string objects. +/// The back is popped as long as it is empty, first - this way this function is +/// guaranteed to return 0 if the buffer is empty. +unsigned int Socket::Buffer::size(){ + while (data.size() > 0 && data.back().empty()){data.pop_back();} + return data.size(); +} + +/// Appends this string to the internal std::deque of std::string objects. +/// It is automatically split every BUFFER_BLOCKSIZE bytes. +void Socket::Buffer::append(const std::string & newdata){ + append(newdata.c_str(), newdata.size()); +} + +/// Appends this data block to the internal std::deque of std::string objects. +/// It is automatically split every BUFFER_BLOCKSIZE bytes. +void Socket::Buffer::append(const char * newdata, const unsigned int newdatasize){ + unsigned int i = 0, j = 0; + while (i < newdatasize){ + j = i; + while (j < newdatasize && j - i <= BUFFER_BLOCKSIZE){ + j++; + if (newdata[j-1] == '\n'){break;} + } + if (i != j){ + data.push_front(std::string(newdata+i, (size_t)(j - i))); + i = j; + }else{ + break; + } + } + if (data.size() > 1000){ + std::cerr << "Warning: After " << newdatasize << " new bytes, buffer has " << data.size() << " parts!" << std::endl; + } +} + +/// Returns true if at least count bytes are available in this buffer. +bool Socket::Buffer::available(unsigned int count){ + unsigned int i = 0; + for (std::deque::iterator it = data.begin(); it != data.end(); ++it){ + i += (*it).size(); + if (i >= count){return true;} + } + return false; +} + +/// Removes count bytes from the buffer, returning them by value. +/// Returns an empty string if not all count bytes are available. +std::string Socket::Buffer::remove(unsigned int count){ + if (!available(count)){return "";} + unsigned int i = 0; + std::string ret; + for (std::deque::reverse_iterator it = data.rbegin(); it != data.rend(); ++it){ + if (i + (*it).size() < count){ + ret.append(*it); + i += (*it).size(); + (*it).clear(); + }else{ + ret.append(*it, 0, count - i); + (*it).erase(0, count - i); + break; + } + } + return ret; +} + +/// Copies count bytes from the buffer, returning them by value. +/// Returns an empty string if not all count bytes are available. +std::string Socket::Buffer::copy(unsigned int count){ + if (!available(count)){return "";} + unsigned int i = 0; + std::string ret; + for (std::deque::reverse_iterator it = data.rbegin(); it != data.rend(); ++it){ + if (i + (*it).size() < count){ + ret.append(*it); + i += (*it).size(); + }else{ + ret.append(*it, 0, count - i); + break; + } + } + return ret; +} + +/// Gets a reference to the back of the internal std::deque of std::string objects. +std::string & Socket::Buffer::get(){ + static std::string empty; + if (data.size() > 0){ + return data.back(); + }else{ + return empty; + } +} + + /// Create a new base socket. This is a basic constructor for converting any valid socket to a Socket::Connection. /// \param sockNo Integer representing the socket to convert. Socket::Connection::Connection(int sockNo){ @@ -225,23 +323,36 @@ std::string Socket::Connection::getStats(std::string C){ /// Updates the downbuffer and upbuffer internal variables. /// Returns true if new data was received, false otherwise. bool Socket::Connection::spool(){ - iwrite(upbuffer); - return iread(downbuffer); + if (upbuffer.size() > 0){ + iwrite(upbuffer.get()); + } + /// \todo Provide better mechanism to prevent overbuffering. + if (downbuffer.size() > 1000){ + return true; + }else{ + return iread(downbuffer); + } } /// Updates the downbuffer and upbuffer internal variables until upbuffer is empty. /// Returns true if new data was received, false otherwise. bool Socket::Connection::flush(){ while (upbuffer.size() > 0 && connected()){ - iwrite(upbuffer); - usleep(5000);//sleep 5 ms + if (!iwrite(upbuffer.get())){ + usleep(10000);//sleep 10ms + } + } + /// \todo Provide better mechanism to prevent overbuffering. + if (downbuffer.size() > 1000){ + return true; + }else{ + return iread(downbuffer); } - return iread(downbuffer); } /// Returns a reference to the download buffer. -std::string & Socket::Connection::Received(){ +Socket::Buffer & Socket::Connection::Received(){ return downbuffer; } @@ -251,16 +362,15 @@ std::string & Socket::Connection::Received(){ /// the data right away. Any data that could not be send will be put into the upbuffer. /// This means this function is blocking if the socket is, but nonblocking otherwise. void Socket::Connection::Send(std::string & data){ - if (upbuffer.size() > 0){ - iwrite(upbuffer); - if (upbuffer.size() > 0){ - upbuffer.append(data); - } + while (upbuffer.size() > 0){ + if (!iwrite(upbuffer.get())){break;} } - if (upbuffer.size() == 0){ + if (upbuffer.size() > 0){ + upbuffer.append(data); + }else{ int i = iwrite(data.c_str(), data.size()); if (i < data.size()){ - upbuffer.append(data, i, data.size() - i); + upbuffer.append(data.c_str()+i, data.size() - i); } } } @@ -272,16 +382,15 @@ void Socket::Connection::Send(std::string & data){ /// This means this function is blocking if the socket is, but nonblocking otherwise. void Socket::Connection::Send(const char * data){ int len = strlen(data); - if (upbuffer.size() > 0){ - iwrite(upbuffer); - if (upbuffer.size() > 0){ - upbuffer.append(data, (size_t)len); - } + while (upbuffer.size() > 0){ + if (!iwrite(upbuffer.get())){break;} } - if (upbuffer.size() == 0){ + if (upbuffer.size() > 0){ + upbuffer.append(data, len); + }else{ int i = iwrite(data, len); if (i < len){ - upbuffer.append(data + i, (size_t)(len - i)); + upbuffer.append(data + i, len - i); } } } @@ -292,13 +401,12 @@ void Socket::Connection::Send(const char * data){ /// the data right away. Any data that could not be send will be put into the upbuffer. /// This means this function is blocking if the socket is, but nonblocking otherwise. void Socket::Connection::Send(const char * data, size_t len){ - if (upbuffer.size() > 0){ - iwrite(upbuffer); - if (upbuffer.size() > 0){ - upbuffer.append(data, len); - } + while (upbuffer.size() > 0){ + if (!iwrite(upbuffer.get())){break;} } - if (upbuffer.size() == 0){ + if (upbuffer.size() > 0){ + upbuffer.append(data, len); + }else{ int i = iwrite(data, len); if (i < len){ upbuffer.append(data + i, len - i); @@ -380,14 +488,14 @@ int Socket::Connection::iread(void * buffer, int len){ return r; }//Socket::Connection::iread -/// Read call that is compatible with std::string. +/// Read call that is compatible with Socket::Buffer. /// Data is read using iread (which is nonblocking if the Socket::Connection itself is), /// then appended to end of buffer. -/// \param buffer std::string to append data to. +/// \param buffer Socket::Buffer to append data to. /// \return True if new data arrived, false otherwise. -bool Socket::Connection::iread(std::string & buffer){ - char cbuffer[5000]; - int num = iread(cbuffer, 5000); +bool Socket::Connection::iread(Buffer & buffer){ + char cbuffer[BUFFER_BLOCKSIZE]; + int num = iread(cbuffer, BUFFER_BLOCKSIZE); if (num < 1){return false;} buffer.append(cbuffer, num); return true; diff --git a/lib/socket.h b/lib/socket.h index 273a6433..3283c99b 100644 --- a/lib/socket.h +++ b/lib/socket.h @@ -4,6 +4,7 @@ #pragma once #include +#include #include #include #include @@ -13,6 +14,7 @@ #include #include #include +#include //for being friendly with Socket::Connection down below namespace Buffer{class user;}; @@ -20,6 +22,20 @@ namespace Buffer{class user;}; ///Holds Socket tools. namespace Socket{ + /// A buffer made out of std::string objects that can be efficiently read from and written to. + class Buffer{ + private: + std::deque data; + public: + unsigned int size(); + void append(const std::string & newdata); + void append(const char * newdata, const unsigned int newdatasize); + std::string & get(); + bool available(unsigned int count); + std::string remove(unsigned int count); + std::string copy(unsigned int count); + };//Buffer + /// This class is for easy communicating through sockets, either TCP or Unix. class Connection{ private: @@ -29,15 +45,15 @@ namespace Socket{ unsigned int up; unsigned int down; unsigned int conntime; - std::string downbuffer; ///< Stores temporary data coming in. - std::string upbuffer; ///< Stores temporary data going out. + Buffer downbuffer; ///< Stores temporary data coming in. + Buffer upbuffer; ///< Stores temporary data going out. int iread(void * buffer, int len); ///< Incremental read call. int iwrite(const void * buffer, int len); ///< Incremental write call. - bool iread(std::string & buffer); ///< Incremental write call that is compatible with std::string. + bool iread(Buffer & buffer); ///< Incremental write call that is compatible with Socket::Buffer. bool iwrite(std::string & buffer); ///< Write call that is compatible with std::string. - public: + public: //friends - friend class Buffer::user; + friend class ::Buffer::user; //constructors Connection(); ///< Create a new disconnected base socket. Connection(int sockNo); ///< Create a new base socket. @@ -55,7 +71,7 @@ namespace Socket{ //buffered i/o methods bool spool(); ///< Updates the downbuffer and upbuffer internal variables. bool flush(); ///< Updates the downbuffer and upbuffer internal variables until upbuffer is empty. - std::string & Received(); ///< Returns a reference to the download buffer. + Buffer & Received(); ///< Returns a reference to the download buffer. void Send(std::string & data); ///< Appends data to the upbuffer. void Send(const char * data); ///< Appends data to the upbuffer. void Send(const char * data, size_t len); ///< Appends data to the upbuffer.