Added new timing library, added Socket::Buffer support to RTMP library.

This commit is contained in:
Thulinma 2012-09-18 15:48:44 +02:00
parent c95bf32fae
commit c019dc6e9f
8 changed files with 195 additions and 40 deletions

View file

@ -1,7 +1,7 @@
AM_CPPFLAGS = $(global_CFLAGS) AM_CPPFLAGS = $(global_CFLAGS)
lib_LTLIBRARIES=libmist-1.0.la lib_LTLIBRARIES=libmist-1.0.la
libmist_1_0_la_SOURCES=amf.h amf.cpp auth.h auth.cpp base64.h base64.cpp config.h config.cpp crypto.h crypto.cpp dtsc.h dtsc.cpp flv_tag.h flv_tag.cpp http_parser.h http_parser.cpp json.h json.cpp procs.h procs.cpp rtmpchunks.h rtmpchunks.cpp socket.h socket.cpp mp4.h mp4.cpp ftp.h ftp.cpp filesystem.h filesystem.cpp stream.h stream.cpp libmist_1_0_la_SOURCES=amf.h amf.cpp auth.h auth.cpp base64.h base64.cpp config.h config.cpp crypto.h crypto.cpp dtsc.h dtsc.cpp flv_tag.h flv_tag.cpp http_parser.h http_parser.cpp json.h json.cpp procs.h procs.cpp rtmpchunks.h rtmpchunks.cpp socket.h socket.cpp mp4.h mp4.cpp ftp.h ftp.cpp filesystem.h filesystem.cpp stream.h stream.cpp timing.h timing.cpp
libmist_1_0_la_LIBADD=-lssl -lcrypto libmist_1_0_la_LIBADD=-lssl -lcrypto
libmist_1_0_la_LDFLAGS = -version-info 2:0:0 libmist_1_0_la_LDFLAGS = -version-info 2:0:0
@ -9,4 +9,4 @@ pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = mist-1.0.pc pkgconfig_DATA = mist-1.0.pc
library_includedir=$(includedir)/mist-1.0/mist library_includedir=$(includedir)/mist-1.0/mist
library_include_HEADERS = amf.h auth.h base64.h config.h crypto.h dtsc.h flv_tag.h http_parser.h json.h procs.h rtmpchunks.h socket.h mp4.h ftp.h filesystem.h stream.h library_include_HEADERS = amf.h auth.h base64.h config.h crypto.h dtsc.h flv_tag.h http_parser.h json.h procs.h rtmpchunks.h socket.h mp4.h ftp.h filesystem.h stream.h timing.h

View file

@ -1,5 +1,6 @@
#pragma once #pragma once
#include <string> #include <string>
#include <time.h>
#include <openssl/rsa.h> #include <openssl/rsa.h>
#include <openssl/x509.h> #include <openssl/x509.h>

View file

@ -4,19 +4,12 @@
#include "rtmpchunks.h" #include "rtmpchunks.h"
#include "flv_tag.h" #include "flv_tag.h"
#include "crypto.h" #include "crypto.h"
#include "timing.h"
char versionstring[] = "WWW.DDVTECH.COM "; ///< String that is repeated in the RTMP handshake char versionstring[] = "WWW.DDVTECH.COM "; ///< String that is repeated in the RTMP handshake
std::string RTMPStream::handshake_in; ///< Input for the handshake. std::string RTMPStream::handshake_in; ///< Input for the handshake.
std::string RTMPStream::handshake_out;///< Output for the handshake. std::string RTMPStream::handshake_out;///< Output for the handshake.
/// Gets the current system time in milliseconds.
unsigned int RTMPStream::getNowMS(){
timeval t;
gettimeofday(&t, 0);
return t.tv_sec * 1000 + t.tv_usec/1000;
}//RTMPStream::getNowMS
unsigned int RTMPStream::chunk_rec_max = 128; unsigned int RTMPStream::chunk_rec_max = 128;
unsigned int RTMPStream::chunk_snd_max = 128; unsigned int RTMPStream::chunk_snd_max = 128;
unsigned int RTMPStream::rec_window_size = 2500000; unsigned int RTMPStream::rec_window_size = 2500000;
@ -147,7 +140,7 @@ RTMPStream::Chunk::Chunk(){
std::string & RTMPStream::SendChunk(unsigned int cs_id, unsigned char msg_type_id, unsigned int msg_stream_id, std::string data){ std::string & RTMPStream::SendChunk(unsigned int cs_id, unsigned char msg_type_id, unsigned int msg_stream_id, std::string data){
static RTMPStream::Chunk ch; static RTMPStream::Chunk ch;
ch.cs_id = cs_id; ch.cs_id = cs_id;
ch.timestamp = RTMPStream::getNowMS(); ch.timestamp = Util::epoch();
ch.len = data.size(); ch.len = data.size();
ch.real_len = data.size(); ch.real_len = data.size();
ch.len_left = 0; ch.len_left = 0;
@ -194,7 +187,7 @@ std::string & RTMPStream::SendMedia(FLV::Tag & tag){
std::string & RTMPStream::SendCTL(unsigned char type, unsigned int data){ std::string & RTMPStream::SendCTL(unsigned char type, unsigned int data){
static RTMPStream::Chunk ch; static RTMPStream::Chunk ch;
ch.cs_id = 2; ch.cs_id = 2;
ch.timestamp = RTMPStream::getNowMS(); ch.timestamp = Util::epoch();
ch.len = 4; ch.len = 4;
ch.real_len = 4; ch.real_len = 4;
ch.len_left = 0; ch.len_left = 0;
@ -209,7 +202,7 @@ std::string & RTMPStream::SendCTL(unsigned char type, unsigned int data){
std::string & RTMPStream::SendCTL(unsigned char type, unsigned int data, unsigned char data2){ std::string & RTMPStream::SendCTL(unsigned char type, unsigned int data, unsigned char data2){
static RTMPStream::Chunk ch; static RTMPStream::Chunk ch;
ch.cs_id = 2; ch.cs_id = 2;
ch.timestamp = RTMPStream::getNowMS(); ch.timestamp = Util::epoch();
ch.len = 5; ch.len = 5;
ch.real_len = 5; ch.real_len = 5;
ch.len_left = 0; ch.len_left = 0;
@ -225,7 +218,7 @@ std::string & RTMPStream::SendCTL(unsigned char type, unsigned int data, unsigne
std::string & RTMPStream::SendUSR(unsigned char type, unsigned int data){ std::string & RTMPStream::SendUSR(unsigned char type, unsigned int data){
static RTMPStream::Chunk ch; static RTMPStream::Chunk ch;
ch.cs_id = 2; ch.cs_id = 2;
ch.timestamp = RTMPStream::getNowMS(); ch.timestamp = Util::epoch();
ch.len = 6; ch.len = 6;
ch.real_len = 6; ch.real_len = 6;
ch.len_left = 0; ch.len_left = 0;
@ -242,7 +235,7 @@ std::string & RTMPStream::SendUSR(unsigned char type, unsigned int data){
std::string & RTMPStream::SendUSR(unsigned char type, unsigned int data, unsigned int data2){ std::string & RTMPStream::SendUSR(unsigned char type, unsigned int data, unsigned int data2){
static RTMPStream::Chunk ch; static RTMPStream::Chunk ch;
ch.cs_id = 2; ch.cs_id = 2;
ch.timestamp = RTMPStream::getNowMS(); ch.timestamp = Util::epoch();
ch.len = 10; ch.len = 10;
ch.real_len = 10; ch.real_len = 10;
ch.len_left = 0; ch.len_left = 0;
@ -265,10 +258,7 @@ std::string & RTMPStream::SendUSR(unsigned char type, unsigned int data, unsigne
/// \param indata The input string to parse and update. /// \param indata The input string to parse and update.
/// \warning This function will destroy the current data in this chunk! /// \warning This function will destroy the current data in this chunk!
/// \returns True if a whole chunk could be read, false otherwise. /// \returns True if a whole chunk could be read, false otherwise.
bool RTMPStream::Chunk::Parse(std::string & source){ bool RTMPStream::Chunk::Parse(std::string & indata){
static std::string indata;
indata.append(source);
source.clear();
gettimeofday(&RTMPStream::lastrec, 0); gettimeofday(&RTMPStream::lastrec, 0);
unsigned int i = 0; unsigned int i = 0;
if (indata.size() < 1) return false;//need at least a byte if (indata.size() < 1) return false;//need at least a byte
@ -380,7 +370,7 @@ bool RTMPStream::Chunk::Parse(std::string & source){
if (len_left == 0){ if (len_left == 0){
return true; return true;
}else{ }else{
return Parse(source); return Parse(indata);
} }
}else{ }else{
data = ""; data = "";
@ -391,6 +381,139 @@ bool RTMPStream::Chunk::Parse(std::string & source){
} }
}//Parse }//Parse
/// Parses the argument string into the current chunk.
/// Tries to read a whole chunk, removing data from the input as it reads.
/// If only part of a chunk is read, it will remove the part and call itself again.
/// This has the effect of only causing a "true" reponse in the case a *whole* chunk
/// is read, not just part of a chunk.
/// \param indata The input string to parse and update.
/// \warning This function will destroy the current data in this chunk!
/// \returns True if a whole chunk could be read, false otherwise.
bool RTMPStream::Chunk::Parse(Socket::Buffer & buffer){
gettimeofday(&RTMPStream::lastrec, 0);
unsigned int i = 0;
if (!buffer.available(3)){return false;}//we want at least 3 bytes
std::string indata = buffer.copy(3);
unsigned char chunktype = indata[i++];
//read the chunkstream ID properly
switch (chunktype & 0x3F){
case 0:
cs_id = indata[i++] + 64;
break;
case 1:
cs_id = indata[i++] + 64;
cs_id += indata[i++] * 256;
break;
default:
cs_id = chunktype & 0x3F;
break;
}
RTMPStream::Chunk prev = lastrecv[cs_id];
//process the rest of the header, for each chunk type
headertype = chunktype & 0xC0;
switch (headertype){
case 0x00:
if (!buffer.available(i+11)){return false;} //can't read whole header
indata = buffer.copy(i+11);
timestamp = indata[i++]*256*256;
timestamp += indata[i++]*256;
timestamp += indata[i++];
len = indata[i++]*256*256;
len += indata[i++]*256;
len += indata[i++];
len_left = 0;
msg_type_id = indata[i++];
msg_stream_id = indata[i++];
msg_stream_id += indata[i++]*256;
msg_stream_id += indata[i++]*256*256;
msg_stream_id += indata[i++]*256*256*256;
break;
case 0x40:
if (!buffer.available(i+7)){return false;} //can't read whole header
indata = buffer.copy(i+7);
if (prev.msg_type_id == 0){fprintf(stderr, "Warning: Header type 0x40 with no valid previous chunk!\n");}
timestamp = indata[i++]*256*256;
timestamp += indata[i++]*256;
timestamp += indata[i++];
if (timestamp != 0x00ffffff){timestamp += prev.timestamp;}
len = indata[i++]*256*256;
len += indata[i++]*256;
len += indata[i++];
len_left = 0;
msg_type_id = indata[i++];
msg_stream_id = prev.msg_stream_id;
break;
case 0x80:
if (!buffer.available(i+3)){return false;} //can't read whole header
indata = buffer.copy(i+3);
if (prev.msg_type_id == 0){fprintf(stderr, "Warning: Header type 0x80 with no valid previous chunk!\n");}
timestamp = indata[i++]*256*256;
timestamp += indata[i++]*256;
timestamp += indata[i++];
if (timestamp != 0x00ffffff){timestamp += prev.timestamp;}
len = prev.len;
len_left = prev.len_left;
msg_type_id = prev.msg_type_id;
msg_stream_id = prev.msg_stream_id;
break;
case 0xC0:
if (prev.msg_type_id == 0){fprintf(stderr, "Warning: Header type 0xC0 with no valid previous chunk!\n");}
timestamp = prev.timestamp;
len = prev.len;
len_left = prev.len_left;
msg_type_id = prev.msg_type_id;
msg_stream_id = prev.msg_stream_id;
break;
}
//calculate chunk length, real length, and length left till complete
if (len_left > 0){
real_len = len_left;
len_left -= real_len;
}else{
real_len = len;
}
if (real_len > RTMPStream::chunk_rec_max){
len_left += real_len - RTMPStream::chunk_rec_max;
real_len = RTMPStream::chunk_rec_max;
}
//read extended timestamp, if neccesary
if (timestamp == 0x00ffffff){
if (!buffer.available(i+4)){return false;} //can't read timestamp
indata = buffer.copy(i+4);
timestamp = indata[i++]*256*256*256;
timestamp += indata[i++]*256*256;
timestamp += indata[i++]*256;
timestamp += indata[i++];
}
//read data if length > 0, and allocate it
if (real_len > 0){
if (!buffer.available(i+real_len)){return false;}//can't read all data (yet)
buffer.remove(i);//remove the header
if (prev.len_left > 0){
data = prev.data + buffer.remove(real_len);//append the data and remove from buffer
}else{
data = buffer.remove(real_len);//append the data and remove from buffer
}
lastrecv[cs_id] = *this;
RTMPStream::rec_cnt += i+real_len;
if (len_left == 0){
return true;
}else{
return Parse(buffer);
}
}else{
buffer.remove(i);//remove the header
data = "";
indata = indata.substr(i+real_len);
lastrecv[cs_id] = *this;
RTMPStream::rec_cnt += i+real_len;
return true;
}
}//Parse
/// Does the handshake. Expects handshake_in to be filled, and fills handshake_out. /// Does the handshake. Expects handshake_in to be filled, and fills handshake_out.
/// After calling this function, don't forget to read and ignore 1536 extra bytes, /// After calling this function, don't forget to read and ignore 1536 extra bytes,

View file

@ -8,6 +8,7 @@
#include <sys/time.h> #include <sys/time.h>
#include <string> #include <string>
#include <arpa/inet.h> #include <arpa/inet.h>
#include "socket.h"
//forward declaration of FLV::Tag to avoid circular dependencies. //forward declaration of FLV::Tag to avoid circular dependencies.
namespace FLV{ namespace FLV{
@ -17,9 +18,6 @@ namespace FLV{
/// Contains all functions and classes needed for RTMP connections. /// Contains all functions and classes needed for RTMP connections.
namespace RTMPStream{ namespace RTMPStream{
/// Gets the current system time in milliseconds.
unsigned int getNowMS();
extern unsigned int chunk_rec_max; ///< Maximum size for a received chunk. extern unsigned int chunk_rec_max; ///< Maximum size for a received chunk.
extern unsigned int chunk_snd_max; ///< Maximum size for a sent chunk. extern unsigned int chunk_snd_max; ///< Maximum size for a sent chunk.
extern unsigned int rec_window_size; ///< Window size for receiving. extern unsigned int rec_window_size; ///< Window size for receiving.
@ -46,6 +44,7 @@ namespace RTMPStream{
Chunk(); Chunk();
bool Parse(std::string & data); bool Parse(std::string & data);
bool Parse(Socket::Buffer & data);
std::string & Pack(); std::string & Pack();
private: private:

View file

@ -3,6 +3,7 @@
/// Written by Jaron Vietor in 2010 for DDVTech /// Written by Jaron Vietor in 2010 for DDVTech
#include "socket.h" #include "socket.h"
#include "timing.h"
#include <sys/stat.h> #include <sys/stat.h>
#include <poll.h> #include <poll.h>
#include <netdb.h> #include <netdb.h>
@ -15,19 +16,13 @@
#define BUFFER_BLOCKSIZE 4096 //set buffer blocksize to 4KiB #define BUFFER_BLOCKSIZE 4096 //set buffer blocksize to 4KiB
#include <iostream>//temporary for debugging #include <iostream>//temporary for debugging
std::string uint2string(unsigned int i){ std::string uint2string(unsigned int i){
std::stringstream st; std::stringstream st;
st << i; st << i;
return st.str(); return st.str();
} }
void ms_sleep(int ms){
struct timespec T;
T.tv_sec = ms/1000;
T.tv_nsec = 1000*(ms%1000);
nanosleep(&T, 0);
}
/// Returns the amount of elements in the internal std::deque of std::string objects. /// Returns the amount of elements in the internal std::deque of std::string objects.
/// The back is popped as long as it is empty, first - this way this function is /// The back is popped as long as it is empty, first - this way this function is
/// guaranteed to return 0 if the buffer is empty. /// guaranteed to return 0 if the buffer is empty.
@ -132,7 +127,7 @@ Socket::Connection::Connection(int sockNo){
pipes[1] = -1; pipes[1] = -1;
up = 0; up = 0;
down = 0; down = 0;
conntime = time(0); conntime = Util::epoch();
Error = false; Error = false;
Blocking = false; Blocking = false;
}//Socket::Connection basic constructor }//Socket::Connection basic constructor
@ -146,7 +141,7 @@ Socket::Connection::Connection(int write, int read){
pipes[1] = read; pipes[1] = read;
up = 0; up = 0;
down = 0; down = 0;
conntime = time(0); conntime = Util::epoch();
Error = false; Error = false;
Blocking = false; Blocking = false;
}//Socket::Connection basic constructor }//Socket::Connection basic constructor
@ -159,7 +154,7 @@ Socket::Connection::Connection(){
pipes[1] = -1; pipes[1] = -1;
up = 0; up = 0;
down = 0; down = 0;
conntime = time(0); conntime = Util::epoch();
Error = false; Error = false;
Blocking = false; Blocking = false;
}//Socket::Connection basic constructor }//Socket::Connection basic constructor
@ -231,7 +226,7 @@ Socket::Connection::Connection(std::string address, bool nonblock){
Blocking = false; Blocking = false;
up = 0; up = 0;
down = 0; down = 0;
conntime = time(0); conntime = Util::epoch();
sockaddr_un addr; sockaddr_un addr;
addr.sun_family = AF_UNIX; addr.sun_family = AF_UNIX;
strncpy(addr.sun_path, address.c_str(), address.size()+1); strncpy(addr.sun_path, address.c_str(), address.size()+1);
@ -260,7 +255,7 @@ Socket::Connection::Connection(std::string host, int port, bool nonblock){
Blocking = false; Blocking = false;
up = 0; up = 0;
down = 0; down = 0;
conntime = time(0); conntime = Util::epoch();
std::stringstream ss; std::stringstream ss;
ss << port; ss << port;
@ -325,7 +320,7 @@ unsigned int Socket::Connection::dataDown(){
/// Returns a std::string of stats, ended by a newline. /// Returns a std::string of stats, ended by a newline.
/// Requires the current connector name as an argument. /// Requires the current connector name as an argument.
std::string Socket::Connection::getStats(std::string C){ std::string Socket::Connection::getStats(std::string C){
return getHost() + " " + C + " " + uint2string(time(0) - conntime) + " " + uint2string(up) + " " + uint2string(down) + "\n"; return "S " + getHost() + " " + C + " " + uint2string(Util::epoch() - conntime) + " " + uint2string(up) + " " + uint2string(down) + "\n";
} }
/// Updates the downbuffer and upbuffer internal variables. /// Updates the downbuffer and upbuffer internal variables.
@ -347,7 +342,7 @@ bool Socket::Connection::spool(){
bool Socket::Connection::flush(){ bool Socket::Connection::flush(){
while (upbuffer.size() > 0 && connected()){ while (upbuffer.size() > 0 && connected()){
if (!iwrite(upbuffer.get())){ if (!iwrite(upbuffer.get())){
ms_sleep(10);//sleep 10ms Util::sleep(10);//sleep 10ms
} }
} }
/// \todo Provide better mechanism to prevent overbuffering. /// \todo Provide better mechanism to prevent overbuffering.
@ -370,7 +365,7 @@ Socket::Buffer & Socket::Connection::Received(){
void Socket::Connection::SendNow(const char * data, size_t len){ void Socket::Connection::SendNow(const char * data, size_t len){
while (upbuffer.size() > 0 && connected()){ while (upbuffer.size() > 0 && connected()){
if (!iwrite(upbuffer.get())){ if (!iwrite(upbuffer.get())){
ms_sleep(1);//sleep 1ms if buffer full Util::sleep(1);//sleep 1ms if buffer full
} }
} }
int i = iwrite(data, len); int i = iwrite(data, len);
@ -379,7 +374,7 @@ void Socket::Connection::SendNow(const char * data, size_t len){
if (j > 0){ if (j > 0){
i += j; i += j;
}else{ }else{
ms_sleep(1);//sleep 1ms and retry Util::sleep(1);//sleep 1ms and retry
} }
} }
} }

View file

@ -44,7 +44,7 @@ namespace Socket{
std::string remotehost; ///< Stores remote host address. std::string remotehost; ///< Stores remote host address.
unsigned int up; unsigned int up;
unsigned int down; unsigned int down;
unsigned int conntime; long long int conntime;
Buffer downbuffer; ///< Stores temporary data coming in. Buffer downbuffer; ///< Stores temporary data coming in.
Buffer upbuffer; ///< Stores temporary data going out. Buffer upbuffer; ///< Stores temporary data going out.
int iread(void * buffer, int len); ///< Incremental read call. int iread(void * buffer, int len); ///< Incremental read call.

27
lib/timing.cpp Normal file
View file

@ -0,0 +1,27 @@
/// \file time.cpp
/// Utilities for handling time and timestamps.
#include "timing.h"
#include <sys/time.h>//for gettimeofday
#include <time.h>//for time and nanosleep
/// Sleeps for the indicated amount of milliseconds or longer.
void Util::sleep(int ms){
struct timespec T;
T.tv_sec = ms/1000;
T.tv_nsec = 1000*(ms%1000);
nanosleep(&T, 0);
}
/// Gets the current time in milliseconds.
long long int Util::getMS(){
/// \todo Possibly change to use clock_gettime - needs -lrt though...
timeval t;
gettimeofday(&t, 0);
return t.tv_sec * 1000 + t.tv_usec/1000;
}
/// Gets the amount of seconds since 01/01/1970.
long long int Util::epoch(){
return time(0);
}

10
lib/timing.h Normal file
View file

@ -0,0 +1,10 @@
/// \file time.h
/// Utilities for handling time and timestamps.
#pragma once
namespace Util{
void sleep(int ms); ///< Sleeps for the indicated amount of milliseconds or longer.
long long int getMS(); ///< Gets the current time in milliseconds.
long long int epoch(); ///< Gets the amount of seconds since 01/01/1970.
};