Removing epoll in favor of more cross-platform poll - also adding RTMP push support and Buffer push support with IP security

This commit is contained in:
Thulinma 2011-09-22 06:56:39 +02:00
parent 880b19f465
commit f45d02124a
4 changed files with 191 additions and 34 deletions

View file

@ -13,8 +13,6 @@
#include "../util/flv_tag.h" //FLV format parser
#include "../util/socket.h" //Socket lib
#include <sys/epoll.h>
/// Holds all code unique to the Buffer.
namespace Buffer{
@ -137,9 +135,16 @@ namespace Buffer{
//then check and parse the commandline
if (argc < 3) {
std::cout << "usage: " << argv[0] << " buffers_count streamname" << std::endl;
std::cout << "usage: " << argv[0] << " buffers_count streamname [awaiting_IP]" << std::endl;
return 1;
}
std::string waiting_ip = "";
bool ip_waiting = false;
int ip_input = -1;
if (argc >= 4){
waiting_ip += argv[3];
ip_waiting = true;
}
std::string shared_socket = "/tmp/shared_socket_";
shared_socket += argv[2];
@ -156,26 +161,23 @@ namespace Buffer{
int lastproper = 0;//last properly finished buffer number
unsigned int loopcount = 0;
Socket::Connection incoming;
Socket::Connection std_input(fileno(stdin));
unsigned char packtype;
bool gotVideoInfo = false;
bool gotAudioInfo = false;
bool gotData = false;
int infile = fileno(stdin);//get file number for stdin
//add stdin to an epoll
int poller = epoll_create(1);
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = infile;
epoll_ctl(poller, EPOLL_CTL_ADD, infile, &ev);
struct epoll_event events[1];
while(!feof(stdin) && !FLV::Parse_Error){
while((!feof(stdin) || ip_waiting) && !FLV::Parse_Error){
//invalidate the current buffer
ringbuf[current_buffer]->number = -1;
if ((epoll_wait(poller, events, 1, 10) > 0) && ringbuf[current_buffer]->FLV.FileLoader(stdin)){
if (
(!ip_waiting &&
(std_input.canRead()) && ringbuf[current_buffer]->FLV.FileLoader(stdin)
) || (ip_waiting && (ip_input > -1) &&
ringbuf[current_buffer]->FLV.SockLoader(ip_input)
)
){
loopcount++;
packtype = ringbuf[current_buffer]->FLV.data[0];
//store metadata, if available
@ -230,17 +232,19 @@ namespace Buffer{
users.back().MyBuffer = lastproper;
users.back().MyBuffer_num = -1;
/// \todo Do this more nicely?
if (!users.back().S.write(FLV::Header, 13)){
users.back().Disconnect("failed to receive the header!");
}else{
if (!users.back().S.write(metadata.data, metadata.len)){
users.back().Disconnect("failed to receive metadata!");
}
if (!users.back().S.write(audio_init.data, audio_init.len)){
users.back().Disconnect("failed to receive audio init!");
}
if (!users.back().S.write(video_init.data, video_init.len)){
users.back().Disconnect("failed to receive video init!");
if (gotData){
if (!users.back().S.write(FLV::Header, 13)){
users.back().Disconnect("failed to receive the header!");
}else{
if (!users.back().S.write(metadata.data, metadata.len)){
users.back().Disconnect("failed to receive metadata!");
}
if (!users.back().S.write(audio_init.data, audio_init.len)){
users.back().Disconnect("failed to receive audio init!");
}
if (!users.back().S.write(video_init.data, video_init.len)){
users.back().Disconnect("failed to receive video init!");
}
}
}
}
@ -251,6 +255,23 @@ namespace Buffer{
if (!(*usersIt).S.connected()){
users.erase(usersIt); break;
}else{
if (!gotData && ip_waiting){
if ((*usersIt).S.canRead()){
std::string tmp = "";
char charbuf;
while (((*usersIt).S.iread(&charbuf, 1) == 1) && charbuf != '\n' ){
tmp += charbuf;
}
if (tmp != ""){
std::cout << "Push attempt from IP " << tmp << std::endl;
if (tmp == waiting_ip){
std::cout << "Push accepted!" << std::endl;
}else{
std::cout << "Push denied!" << std::endl;
}
}
}
}
(*usersIt).Send(ringbuf, buffers);
}
}

View file

@ -25,6 +25,7 @@ namespace Connector_RTMP{
bool stopparsing = false; ///< Set to true when all parsing needs to be cancelled.
Socket::Connection Socket; ///< Socket connected to user
Socket::Connection SS; ///< Socket connected to server
std::string streamname = "/tmp/shared_socket"; ///< Stream that will be opened
void parseChunk();
int Connector_RTMP(Socket::Connection conn);
@ -34,7 +35,6 @@ namespace Connector_RTMP{
/// Main Connector_RTMP function
int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
Socket = conn;
Socket::Connection SS;
FLV::Tag tag, viddata, auddata;
bool viddone = false, auddone = false;
@ -168,6 +168,7 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
void Connector_RTMP::parseChunk(){
static RTMPStream::Chunk next;
static std::string inbuffer;
FLV::Tag F;
static AMF::Object amfdata("empty", AMF::AMF0_DDV_CONTAINER);
static AMF::Object amfelem("empty", AMF::AMF0_DDV_CONTAINER);
static AMF::Object3 amf3data("empty", AMF::AMF3_DDV_CONTAINER);
@ -243,11 +244,19 @@ void Connector_RTMP::parseChunk(){
#if DEBUG >= 4
fprintf(stderr, "Received audio data\n");
#endif
F.ChunkLoader(next);
if (SS.connected()){
SS.write(std::string(F.data, F.len));
}
break;
case 9:
#if DEBUG >= 4
fprintf(stderr, "Received video data\n");
#endif
F.ChunkLoader(next);
if (SS.connected()){
SS.write(std::string(F.data, F.len));
}
break;
case 15:
#if DEBUG >= 4
@ -352,6 +361,51 @@ void Connector_RTMP::parseChunk(){
Socket.write(RTMPStream::SendChunk(3, 17, next.msg_stream_id, (char)0+amfreply.Pack()));
parsed3 = true;
}//getStreamLength
if ((amfdata.getContentP(0)->StrValue() == "publish")){
if (amfdata.getContentP(3)){
streamname = amfdata.getContentP(3)->StrValue();
bool stoptokens = false;
for (std::string::iterator i=streamname.end()-1; i>=streamname.begin(); --i){
if (*i == '?'){stoptokens = true;}
if (stoptokens || (!isalpha(*i) && !isdigit(*i))){streamname.erase(i);}else{*i=tolower(*i);}
}
streamname = "/tmp/shared_socket_" + streamname;
SS = Socket::Connection(streamname);
if (!SS.connected()){
#if DEBUG >= 1
fprintf(stderr, "Could not connect to server!\n");
#endif
Socket.close();//disconnect user
break;
}
}
//send a _result reply
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "_result"));//result success
amfreply.addContent(amfdata.getContent(1));//same transaction ID
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
amfreply.addContent(AMF::Object("", 1, AMF::AMF0_BOOL));//publish success?
#if DEBUG >= 4
amfreply.Print();
#endif
Socket.write(RTMPStream::SendChunk(3, 17, next.msg_stream_id, (char)0+amfreply.Pack()));
Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
//send a status reply
amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "onStatus"));//status reply
amfreply.addContent(AMF::Object("", 0, AMF::AMF0_NUMBER));//same transaction ID
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
amfreply.addContent(AMF::Object(""));//info
amfreply.getContentP(3)->addContent(AMF::Object("level", "status"));
amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Publish.Start"));
amfreply.getContentP(3)->addContent(AMF::Object("description", "Stream is now published!"));
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
#if DEBUG >= 4
amfreply.Print();
#endif
Socket.write(RTMPStream::SendChunk(3, 17, next.msg_stream_id, (char)0+amfreply.Pack()));
parsed3 = true;
}//getStreamLength
if (amfdata.getContentP(0)->StrValue() == "checkBandwidth"){
//send a _result reply
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
@ -421,6 +475,10 @@ void Connector_RTMP::parseChunk(){
#if DEBUG >= 4
fprintf(stderr, "Received AFM0 data message (metadata)\n");
#endif
F.ChunkLoader(next);
if (SS.connected()){
SS.write(std::string(F.data, F.len));
}
break;
case 19:
#if DEBUG >= 4
@ -441,12 +499,16 @@ void Connector_RTMP::parseChunk(){
fprintf(stderr, "Object encoding set to %e\n", objencoding);
#if DEBUG >= 4
int tmpint;
tmpint = (int)amfdata.getContentP(2)->getContentP("videoCodecs")->NumValue();
if (tmpint & 0x04){fprintf(stderr, "Sorensen video support detected\n");}
if (tmpint & 0x80){fprintf(stderr, "H264 video support detected\n");}
tmpint = (int)amfdata.getContentP(2)->getContentP("audioCodecs")->NumValue();
if (tmpint & 0x04){fprintf(stderr, "MP3 audio support detected\n");}
if (tmpint & 0x400){fprintf(stderr, "AAC video support detected\n");}
if (amfdata.getContentP(2)->getContentP("videoCodecs")){
tmpint = (int)amfdata.getContentP(2)->getContentP("videoCodecs")->NumValue();
if (tmpint & 0x04){fprintf(stderr, "Sorensen video support detected\n");}
if (tmpint & 0x80){fprintf(stderr, "H264 video support detected\n");}
}
if (amfdata.getContentP(2)->getContentP("audioCodecs")){
tmpint = (int)amfdata.getContentP(2)->getContentP("audioCodecs")->NumValue();
if (tmpint & 0x04){fprintf(stderr, "MP3 audio support detected\n");}
if (tmpint & 0x400){fprintf(stderr, "AAC video support detected\n");}
}
#endif
RTMPStream::chunk_snd_max = 4096;
Socket.write(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max));//send chunk size max (msg 1)
@ -508,6 +570,52 @@ void Connector_RTMP::parseChunk(){
Socket.write(RTMPStream::SendChunk(3, 20, next.msg_stream_id, amfreply.Pack()));
parsed = true;
}//getStreamLength
if ((amfdata.getContentP(0)->StrValue() == "publish")){
if (amfdata.getContentP(3)){
streamname = amfdata.getContentP(3)->StrValue();
bool stoptokens = false;
for (std::string::iterator i=streamname.end()-1; i>=streamname.begin(); --i){
if (*i == '?'){stoptokens = true;}
if (stoptokens || (!isalpha(*i) && !isdigit(*i))){streamname.erase(i);}else{*i=tolower(*i);}
}
streamname = "/tmp/shared_socket_" + streamname;
SS = Socket::Connection(streamname);
if (!SS.connected()){
#if DEBUG >= 1
fprintf(stderr, "Could not connect to server!\n");
#endif
Socket.close();//disconnect user
break;
}
SS.write(Socket.getHost()+'\n');
}
//send a _result reply
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "_result"));//result success
amfreply.addContent(amfdata.getContent(1));//same transaction ID
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
amfreply.addContent(AMF::Object("", 1, AMF::AMF0_BOOL));//publish success?
#if DEBUG >= 4
amfreply.Print();
#endif
Socket.write(RTMPStream::SendChunk(3, 20, next.msg_stream_id, amfreply.Pack()));
Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
//send a status reply
amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "onStatus"));//status reply
amfreply.addContent(AMF::Object("", 0, AMF::AMF0_NUMBER));//same transaction ID
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
amfreply.addContent(AMF::Object(""));//info
amfreply.getContentP(3)->addContent(AMF::Object("level", "status"));
amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Publish.Start"));
amfreply.getContentP(3)->addContent(AMF::Object("description", "Stream is now published!"));
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
#if DEBUG >= 4
amfreply.Print();
#endif
Socket.write(RTMPStream::SendChunk(4, 20, next.msg_stream_id, amfreply.Pack()));
parsed = true;
}//getStreamLength
if (amfdata.getContentP(0)->StrValue() == "checkBandwidth"){
//send a _result reply
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);

View file

@ -3,6 +3,11 @@
/// Written by Jaron Vietor in 2010 for DDVTech
#include "socket.h"
#include <poll.h>
#ifdef __FreeBSD__
#include <netinet/in.h>
#endif
/// Create a new base socket. This is a basic constructor for converting any valid socket to a Socket::Connection.
/// \param sockNo Integer representing the socket to convert.
@ -69,6 +74,27 @@ Socket::Connection::Connection(std::string address, bool nonblock){
}
}//Socket::Connection Unix Contructor
/// Calls poll() on the socket, checking if data is available.
/// This function may return true even if there is no data, but never returns false when there is.
bool Socket::Connection::canRead(){
struct pollfd PFD;
PFD.fd = sock;
PFD.events = POLLIN;
PFD.revents = 0;
poll(&PFD, 1, 5);
return (PFD.revents & POLLIN) == POLLIN;
}
/// Calls poll() on the socket, checking if data can be written.
bool Socket::Connection::canWrite(){
struct pollfd PFD;
PFD.fd = sock;
PFD.events = POLLOUT;
PFD.revents = 0;
poll(&PFD, 1, 5);
return (PFD.revents & POLLOUT) == POLLOUT;
}
/// Returns the ready-state for this socket.
/// \returns 1 if data is waiting to be read, -1 if not connected, 0 otherwise.
signed int Socket::Connection::ready(){

View file

@ -27,6 +27,8 @@ namespace Socket{
Connection(); ///< Create a new disconnected base socket.
Connection(int sockNo); ///< Create a new base socket.
Connection(std::string adres, bool nonblock = false); ///< Create a new Unix Socket.
bool canRead(); ///< Calls poll() on the socket, checking if data is available.
bool canWrite(); ///< Calls poll() on the socket, checking if data can be written.
bool Error; ///< Set to true if a socket error happened.
bool Blocking; ///< Set to true if a socket is currently or wants to be blocking.
signed int ready(); ///< Returns the ready-state for this socket.