From c019dc6e9f98c69e78156f86e4560d13dd35e1aa Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 18 Sep 2012 15:48:44 +0200 Subject: [PATCH] Added new timing library, added Socket::Buffer support to RTMP library. --- lib/Makefile.am | 4 +- lib/auth.h | 1 + lib/rtmpchunks.cpp | 159 ++++++++++++++++++++++++++++++++++++++++----- lib/rtmpchunks.h | 5 +- lib/socket.cpp | 27 ++++---- lib/socket.h | 2 +- lib/timing.cpp | 27 ++++++++ lib/timing.h | 10 +++ 8 files changed, 195 insertions(+), 40 deletions(-) create mode 100644 lib/timing.cpp create mode 100644 lib/timing.h diff --git a/lib/Makefile.am b/lib/Makefile.am index fdc39b7b..6ad1607d 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -1,7 +1,7 @@ AM_CPPFLAGS = $(global_CFLAGS) lib_LTLIBRARIES=libmist-1.0.la -libmist_1_0_la_SOURCES=amf.h amf.cpp auth.h auth.cpp base64.h base64.cpp config.h config.cpp crypto.h crypto.cpp dtsc.h dtsc.cpp flv_tag.h flv_tag.cpp http_parser.h http_parser.cpp json.h json.cpp procs.h procs.cpp rtmpchunks.h rtmpchunks.cpp socket.h socket.cpp mp4.h mp4.cpp ftp.h ftp.cpp filesystem.h filesystem.cpp stream.h stream.cpp +libmist_1_0_la_SOURCES=amf.h amf.cpp auth.h auth.cpp base64.h base64.cpp config.h config.cpp crypto.h crypto.cpp dtsc.h dtsc.cpp flv_tag.h flv_tag.cpp http_parser.h http_parser.cpp json.h json.cpp procs.h procs.cpp rtmpchunks.h rtmpchunks.cpp socket.h socket.cpp mp4.h mp4.cpp ftp.h ftp.cpp filesystem.h filesystem.cpp stream.h stream.cpp timing.h timing.cpp libmist_1_0_la_LIBADD=-lssl -lcrypto libmist_1_0_la_LDFLAGS = -version-info 2:0:0 @@ -9,4 +9,4 @@ pkgconfigdir = $(libdir)/pkgconfig pkgconfig_DATA = mist-1.0.pc library_includedir=$(includedir)/mist-1.0/mist -library_include_HEADERS = amf.h auth.h base64.h config.h crypto.h dtsc.h flv_tag.h http_parser.h json.h procs.h rtmpchunks.h socket.h mp4.h ftp.h filesystem.h stream.h +library_include_HEADERS = amf.h auth.h base64.h config.h crypto.h dtsc.h flv_tag.h http_parser.h json.h procs.h rtmpchunks.h socket.h mp4.h ftp.h filesystem.h stream.h timing.h diff --git a/lib/auth.h b/lib/auth.h index 42d4580d..40fce7ec 100644 --- a/lib/auth.h +++ b/lib/auth.h @@ -1,5 +1,6 @@ #pragma once #include +#include #include #include diff --git a/lib/rtmpchunks.cpp b/lib/rtmpchunks.cpp index 2e46da32..8a9e356f 100644 --- a/lib/rtmpchunks.cpp +++ b/lib/rtmpchunks.cpp @@ -4,19 +4,12 @@ #include "rtmpchunks.h" #include "flv_tag.h" #include "crypto.h" +#include "timing.h" char versionstring[] = "WWW.DDVTECH.COM "; ///< String that is repeated in the RTMP handshake std::string RTMPStream::handshake_in; ///< Input for the handshake. std::string RTMPStream::handshake_out;///< Output for the handshake. -/// Gets the current system time in milliseconds. -unsigned int RTMPStream::getNowMS(){ - timeval t; - gettimeofday(&t, 0); - return t.tv_sec * 1000 + t.tv_usec/1000; -}//RTMPStream::getNowMS - - unsigned int RTMPStream::chunk_rec_max = 128; unsigned int RTMPStream::chunk_snd_max = 128; unsigned int RTMPStream::rec_window_size = 2500000; @@ -147,7 +140,7 @@ RTMPStream::Chunk::Chunk(){ std::string & RTMPStream::SendChunk(unsigned int cs_id, unsigned char msg_type_id, unsigned int msg_stream_id, std::string data){ static RTMPStream::Chunk ch; ch.cs_id = cs_id; - ch.timestamp = RTMPStream::getNowMS(); + ch.timestamp = Util::epoch(); ch.len = data.size(); ch.real_len = data.size(); ch.len_left = 0; @@ -194,7 +187,7 @@ std::string & RTMPStream::SendMedia(FLV::Tag & tag){ std::string & RTMPStream::SendCTL(unsigned char type, unsigned int data){ static RTMPStream::Chunk ch; ch.cs_id = 2; - ch.timestamp = RTMPStream::getNowMS(); + ch.timestamp = Util::epoch(); ch.len = 4; ch.real_len = 4; ch.len_left = 0; @@ -209,7 +202,7 @@ std::string & RTMPStream::SendCTL(unsigned char type, unsigned int data){ std::string & RTMPStream::SendCTL(unsigned char type, unsigned int data, unsigned char data2){ static RTMPStream::Chunk ch; ch.cs_id = 2; - ch.timestamp = RTMPStream::getNowMS(); + ch.timestamp = Util::epoch(); ch.len = 5; ch.real_len = 5; ch.len_left = 0; @@ -225,7 +218,7 @@ std::string & RTMPStream::SendCTL(unsigned char type, unsigned int data, unsigne std::string & RTMPStream::SendUSR(unsigned char type, unsigned int data){ static RTMPStream::Chunk ch; ch.cs_id = 2; - ch.timestamp = RTMPStream::getNowMS(); + ch.timestamp = Util::epoch(); ch.len = 6; ch.real_len = 6; ch.len_left = 0; @@ -242,7 +235,7 @@ std::string & RTMPStream::SendUSR(unsigned char type, unsigned int data){ std::string & RTMPStream::SendUSR(unsigned char type, unsigned int data, unsigned int data2){ static RTMPStream::Chunk ch; ch.cs_id = 2; - ch.timestamp = RTMPStream::getNowMS(); + ch.timestamp = Util::epoch(); ch.len = 10; ch.real_len = 10; ch.len_left = 0; @@ -265,10 +258,7 @@ std::string & RTMPStream::SendUSR(unsigned char type, unsigned int data, unsigne /// \param indata The input string to parse and update. /// \warning This function will destroy the current data in this chunk! /// \returns True if a whole chunk could be read, false otherwise. -bool RTMPStream::Chunk::Parse(std::string & source){ - static std::string indata; - indata.append(source); - source.clear(); +bool RTMPStream::Chunk::Parse(std::string & indata){ gettimeofday(&RTMPStream::lastrec, 0); unsigned int i = 0; if (indata.size() < 1) return false;//need at least a byte @@ -380,7 +370,7 @@ bool RTMPStream::Chunk::Parse(std::string & source){ if (len_left == 0){ return true; }else{ - return Parse(source); + return Parse(indata); } }else{ data = ""; @@ -391,6 +381,139 @@ bool RTMPStream::Chunk::Parse(std::string & source){ } }//Parse +/// Parses the argument string into the current chunk. +/// Tries to read a whole chunk, removing data from the input as it reads. +/// If only part of a chunk is read, it will remove the part and call itself again. +/// This has the effect of only causing a "true" reponse in the case a *whole* chunk +/// is read, not just part of a chunk. +/// \param indata The input string to parse and update. +/// \warning This function will destroy the current data in this chunk! +/// \returns True if a whole chunk could be read, false otherwise. +bool RTMPStream::Chunk::Parse(Socket::Buffer & buffer){ + gettimeofday(&RTMPStream::lastrec, 0); + unsigned int i = 0; + if (!buffer.available(3)){return false;}//we want at least 3 bytes + std::string indata = buffer.copy(3); + + unsigned char chunktype = indata[i++]; + //read the chunkstream ID properly + switch (chunktype & 0x3F){ + case 0: + cs_id = indata[i++] + 64; + break; + case 1: + cs_id = indata[i++] + 64; + cs_id += indata[i++] * 256; + break; + default: + cs_id = chunktype & 0x3F; + break; + } + + RTMPStream::Chunk prev = lastrecv[cs_id]; + + //process the rest of the header, for each chunk type + headertype = chunktype & 0xC0; + switch (headertype){ + case 0x00: + if (!buffer.available(i+11)){return false;} //can't read whole header + indata = buffer.copy(i+11); + timestamp = indata[i++]*256*256; + timestamp += indata[i++]*256; + timestamp += indata[i++]; + len = indata[i++]*256*256; + len += indata[i++]*256; + len += indata[i++]; + len_left = 0; + msg_type_id = indata[i++]; + msg_stream_id = indata[i++]; + msg_stream_id += indata[i++]*256; + msg_stream_id += indata[i++]*256*256; + msg_stream_id += indata[i++]*256*256*256; + break; + case 0x40: + if (!buffer.available(i+7)){return false;} //can't read whole header + indata = buffer.copy(i+7); + if (prev.msg_type_id == 0){fprintf(stderr, "Warning: Header type 0x40 with no valid previous chunk!\n");} + timestamp = indata[i++]*256*256; + timestamp += indata[i++]*256; + timestamp += indata[i++]; + if (timestamp != 0x00ffffff){timestamp += prev.timestamp;} + len = indata[i++]*256*256; + len += indata[i++]*256; + len += indata[i++]; + len_left = 0; + msg_type_id = indata[i++]; + msg_stream_id = prev.msg_stream_id; + break; + case 0x80: + if (!buffer.available(i+3)){return false;} //can't read whole header + indata = buffer.copy(i+3); + if (prev.msg_type_id == 0){fprintf(stderr, "Warning: Header type 0x80 with no valid previous chunk!\n");} + timestamp = indata[i++]*256*256; + timestamp += indata[i++]*256; + timestamp += indata[i++]; + if (timestamp != 0x00ffffff){timestamp += prev.timestamp;} + len = prev.len; + len_left = prev.len_left; + msg_type_id = prev.msg_type_id; + msg_stream_id = prev.msg_stream_id; + break; + case 0xC0: + if (prev.msg_type_id == 0){fprintf(stderr, "Warning: Header type 0xC0 with no valid previous chunk!\n");} + timestamp = prev.timestamp; + len = prev.len; + len_left = prev.len_left; + msg_type_id = prev.msg_type_id; + msg_stream_id = prev.msg_stream_id; + break; + } + //calculate chunk length, real length, and length left till complete + if (len_left > 0){ + real_len = len_left; + len_left -= real_len; + }else{ + real_len = len; + } + if (real_len > RTMPStream::chunk_rec_max){ + len_left += real_len - RTMPStream::chunk_rec_max; + real_len = RTMPStream::chunk_rec_max; + } + //read extended timestamp, if neccesary + if (timestamp == 0x00ffffff){ + if (!buffer.available(i+4)){return false;} //can't read timestamp + indata = buffer.copy(i+4); + timestamp = indata[i++]*256*256*256; + timestamp += indata[i++]*256*256; + timestamp += indata[i++]*256; + timestamp += indata[i++]; + } + + //read data if length > 0, and allocate it + if (real_len > 0){ + if (!buffer.available(i+real_len)){return false;}//can't read all data (yet) + buffer.remove(i);//remove the header + if (prev.len_left > 0){ + data = prev.data + buffer.remove(real_len);//append the data and remove from buffer + }else{ + data = buffer.remove(real_len);//append the data and remove from buffer + } + lastrecv[cs_id] = *this; + RTMPStream::rec_cnt += i+real_len; + if (len_left == 0){ + return true; + }else{ + return Parse(buffer); + } + }else{ + buffer.remove(i);//remove the header + data = ""; + indata = indata.substr(i+real_len); + lastrecv[cs_id] = *this; + RTMPStream::rec_cnt += i+real_len; + return true; + } +}//Parse /// Does the handshake. Expects handshake_in to be filled, and fills handshake_out. /// After calling this function, don't forget to read and ignore 1536 extra bytes, diff --git a/lib/rtmpchunks.h b/lib/rtmpchunks.h index e153f4ca..21dd160e 100644 --- a/lib/rtmpchunks.h +++ b/lib/rtmpchunks.h @@ -8,6 +8,7 @@ #include #include #include +#include "socket.h" //forward declaration of FLV::Tag to avoid circular dependencies. namespace FLV{ @@ -17,9 +18,6 @@ namespace FLV{ /// Contains all functions and classes needed for RTMP connections. namespace RTMPStream{ - /// Gets the current system time in milliseconds. - unsigned int getNowMS(); - extern unsigned int chunk_rec_max; ///< Maximum size for a received chunk. extern unsigned int chunk_snd_max; ///< Maximum size for a sent chunk. extern unsigned int rec_window_size; ///< Window size for receiving. @@ -46,6 +44,7 @@ namespace RTMPStream{ Chunk(); bool Parse(std::string & data); + bool Parse(Socket::Buffer & data); std::string & Pack(); private: diff --git a/lib/socket.cpp b/lib/socket.cpp index 10f39465..461904ba 100644 --- a/lib/socket.cpp +++ b/lib/socket.cpp @@ -3,6 +3,7 @@ /// Written by Jaron Vietor in 2010 for DDVTech #include "socket.h" +#include "timing.h" #include #include #include @@ -15,19 +16,13 @@ #define BUFFER_BLOCKSIZE 4096 //set buffer blocksize to 4KiB #include //temporary for debugging + std::string uint2string(unsigned int i){ std::stringstream st; st << i; return st.str(); } -void ms_sleep(int ms){ - struct timespec T; - T.tv_sec = ms/1000; - T.tv_nsec = 1000*(ms%1000); - nanosleep(&T, 0); -} - /// Returns the amount of elements in the internal std::deque of std::string objects. /// The back is popped as long as it is empty, first - this way this function is /// guaranteed to return 0 if the buffer is empty. @@ -132,7 +127,7 @@ Socket::Connection::Connection(int sockNo){ pipes[1] = -1; up = 0; down = 0; - conntime = time(0); + conntime = Util::epoch(); Error = false; Blocking = false; }//Socket::Connection basic constructor @@ -146,7 +141,7 @@ Socket::Connection::Connection(int write, int read){ pipes[1] = read; up = 0; down = 0; - conntime = time(0); + conntime = Util::epoch(); Error = false; Blocking = false; }//Socket::Connection basic constructor @@ -159,7 +154,7 @@ Socket::Connection::Connection(){ pipes[1] = -1; up = 0; down = 0; - conntime = time(0); + conntime = Util::epoch(); Error = false; Blocking = false; }//Socket::Connection basic constructor @@ -231,7 +226,7 @@ Socket::Connection::Connection(std::string address, bool nonblock){ Blocking = false; up = 0; down = 0; - conntime = time(0); + conntime = Util::epoch(); sockaddr_un addr; addr.sun_family = AF_UNIX; strncpy(addr.sun_path, address.c_str(), address.size()+1); @@ -260,7 +255,7 @@ Socket::Connection::Connection(std::string host, int port, bool nonblock){ Blocking = false; up = 0; down = 0; - conntime = time(0); + conntime = Util::epoch(); std::stringstream ss; ss << port; @@ -325,7 +320,7 @@ unsigned int Socket::Connection::dataDown(){ /// Returns a std::string of stats, ended by a newline. /// Requires the current connector name as an argument. std::string Socket::Connection::getStats(std::string C){ - return getHost() + " " + C + " " + uint2string(time(0) - conntime) + " " + uint2string(up) + " " + uint2string(down) + "\n"; + return "S " + getHost() + " " + C + " " + uint2string(Util::epoch() - conntime) + " " + uint2string(up) + " " + uint2string(down) + "\n"; } /// Updates the downbuffer and upbuffer internal variables. @@ -347,7 +342,7 @@ bool Socket::Connection::spool(){ bool Socket::Connection::flush(){ while (upbuffer.size() > 0 && connected()){ if (!iwrite(upbuffer.get())){ - ms_sleep(10);//sleep 10ms + Util::sleep(10);//sleep 10ms } } /// \todo Provide better mechanism to prevent overbuffering. @@ -370,7 +365,7 @@ Socket::Buffer & Socket::Connection::Received(){ void Socket::Connection::SendNow(const char * data, size_t len){ while (upbuffer.size() > 0 && connected()){ if (!iwrite(upbuffer.get())){ - ms_sleep(1);//sleep 1ms if buffer full + Util::sleep(1);//sleep 1ms if buffer full } } int i = iwrite(data, len); @@ -379,7 +374,7 @@ void Socket::Connection::SendNow(const char * data, size_t len){ if (j > 0){ i += j; }else{ - ms_sleep(1);//sleep 1ms and retry + Util::sleep(1);//sleep 1ms and retry } } } diff --git a/lib/socket.h b/lib/socket.h index 4b4394b8..c5f907f7 100644 --- a/lib/socket.h +++ b/lib/socket.h @@ -44,7 +44,7 @@ namespace Socket{ std::string remotehost; ///< Stores remote host address. unsigned int up; unsigned int down; - unsigned int conntime; + long long int conntime; Buffer downbuffer; ///< Stores temporary data coming in. Buffer upbuffer; ///< Stores temporary data going out. int iread(void * buffer, int len); ///< Incremental read call. diff --git a/lib/timing.cpp b/lib/timing.cpp new file mode 100644 index 00000000..dd26db1b --- /dev/null +++ b/lib/timing.cpp @@ -0,0 +1,27 @@ +/// \file time.cpp +/// Utilities for handling time and timestamps. + +#include "timing.h" +#include //for gettimeofday +#include //for time and nanosleep + +/// Sleeps for the indicated amount of milliseconds or longer. +void Util::sleep(int ms){ + struct timespec T; + T.tv_sec = ms/1000; + T.tv_nsec = 1000*(ms%1000); + nanosleep(&T, 0); +} + +/// Gets the current time in milliseconds. +long long int Util::getMS(){ + /// \todo Possibly change to use clock_gettime - needs -lrt though... + timeval t; + gettimeofday(&t, 0); + return t.tv_sec * 1000 + t.tv_usec/1000; +} + +/// Gets the amount of seconds since 01/01/1970. +long long int Util::epoch(){ + return time(0); +} diff --git a/lib/timing.h b/lib/timing.h new file mode 100644 index 00000000..b9c43a79 --- /dev/null +++ b/lib/timing.h @@ -0,0 +1,10 @@ +/// \file time.h +/// Utilities for handling time and timestamps. + +#pragma once + +namespace Util{ + void sleep(int ms); ///< Sleeps for the indicated amount of milliseconds or longer. + long long int getMS(); ///< Gets the current time in milliseconds. + long long int epoch(); ///< Gets the amount of seconds since 01/01/1970. +};