From f45d02124a2c65a28b84b2ca2c8e1459d9616369 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 22 Sep 2011 06:56:39 +0200 Subject: [PATCH 1/6] Removing epoll in favor of more cross-platform poll - also adding RTMP push support and Buffer push support with IP security --- Buffer/main.cpp | 75 +++++++++++++++--------- Connector_RTMP/main.cpp | 122 +++++++++++++++++++++++++++++++++++++--- util/socket.cpp | 26 +++++++++ util/socket.h | 2 + 4 files changed, 191 insertions(+), 34 deletions(-) diff --git a/Buffer/main.cpp b/Buffer/main.cpp index dbfa9b94..4dcdfd27 100644 --- a/Buffer/main.cpp +++ b/Buffer/main.cpp @@ -13,8 +13,6 @@ #include "../util/flv_tag.h" //FLV format parser #include "../util/socket.h" //Socket lib -#include - /// Holds all code unique to the Buffer. namespace Buffer{ @@ -137,9 +135,16 @@ namespace Buffer{ //then check and parse the commandline if (argc < 3) { - std::cout << "usage: " << argv[0] << " buffers_count streamname" << std::endl; + std::cout << "usage: " << argv[0] << " buffers_count streamname [awaiting_IP]" << std::endl; return 1; } + std::string waiting_ip = ""; + bool ip_waiting = false; + int ip_input = -1; + if (argc >= 4){ + waiting_ip += argv[3]; + ip_waiting = true; + } std::string shared_socket = "/tmp/shared_socket_"; shared_socket += argv[2]; @@ -156,26 +161,23 @@ namespace Buffer{ int lastproper = 0;//last properly finished buffer number unsigned int loopcount = 0; Socket::Connection incoming; + Socket::Connection std_input(fileno(stdin)); unsigned char packtype; bool gotVideoInfo = false; bool gotAudioInfo = false; + bool gotData = false; - int infile = fileno(stdin);//get file number for stdin - - //add stdin to an epoll - int poller = epoll_create(1); - struct epoll_event ev; - ev.events = EPOLLIN; - ev.data.fd = infile; - epoll_ctl(poller, EPOLL_CTL_ADD, infile, &ev); - struct epoll_event events[1]; - - - while(!feof(stdin) && !FLV::Parse_Error){ + while((!feof(stdin) || ip_waiting) && !FLV::Parse_Error){ //invalidate the current buffer ringbuf[current_buffer]->number = -1; - if ((epoll_wait(poller, events, 1, 10) > 0) && ringbuf[current_buffer]->FLV.FileLoader(stdin)){ + if ( + (!ip_waiting && + (std_input.canRead()) && ringbuf[current_buffer]->FLV.FileLoader(stdin) + ) || (ip_waiting && (ip_input > -1) && + ringbuf[current_buffer]->FLV.SockLoader(ip_input) + ) + ){ loopcount++; packtype = ringbuf[current_buffer]->FLV.data[0]; //store metadata, if available @@ -230,17 +232,19 @@ namespace Buffer{ users.back().MyBuffer = lastproper; users.back().MyBuffer_num = -1; /// \todo Do this more nicely? - if (!users.back().S.write(FLV::Header, 13)){ - users.back().Disconnect("failed to receive the header!"); - }else{ - if (!users.back().S.write(metadata.data, metadata.len)){ - users.back().Disconnect("failed to receive metadata!"); - } - if (!users.back().S.write(audio_init.data, audio_init.len)){ - users.back().Disconnect("failed to receive audio init!"); - } - if (!users.back().S.write(video_init.data, video_init.len)){ - users.back().Disconnect("failed to receive video init!"); + if (gotData){ + if (!users.back().S.write(FLV::Header, 13)){ + users.back().Disconnect("failed to receive the header!"); + }else{ + if (!users.back().S.write(metadata.data, metadata.len)){ + users.back().Disconnect("failed to receive metadata!"); + } + if (!users.back().S.write(audio_init.data, audio_init.len)){ + users.back().Disconnect("failed to receive audio init!"); + } + if (!users.back().S.write(video_init.data, video_init.len)){ + users.back().Disconnect("failed to receive video init!"); + } } } } @@ -251,6 +255,23 @@ namespace Buffer{ if (!(*usersIt).S.connected()){ users.erase(usersIt); break; }else{ + if (!gotData && ip_waiting){ + if ((*usersIt).S.canRead()){ + std::string tmp = ""; + char charbuf; + while (((*usersIt).S.iread(&charbuf, 1) == 1) && charbuf != '\n' ){ + tmp += charbuf; + } + if (tmp != ""){ + std::cout << "Push attempt from IP " << tmp << std::endl; + if (tmp == waiting_ip){ + std::cout << "Push accepted!" << std::endl; + }else{ + std::cout << "Push denied!" << std::endl; + } + } + } + } (*usersIt).Send(ringbuf, buffers); } } diff --git a/Connector_RTMP/main.cpp b/Connector_RTMP/main.cpp index 3b4ceb43..07432279 100644 --- a/Connector_RTMP/main.cpp +++ b/Connector_RTMP/main.cpp @@ -25,6 +25,7 @@ namespace Connector_RTMP{ bool stopparsing = false; ///< Set to true when all parsing needs to be cancelled. Socket::Connection Socket; ///< Socket connected to user + Socket::Connection SS; ///< Socket connected to server std::string streamname = "/tmp/shared_socket"; ///< Stream that will be opened void parseChunk(); int Connector_RTMP(Socket::Connection conn); @@ -34,7 +35,6 @@ namespace Connector_RTMP{ /// Main Connector_RTMP function int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ Socket = conn; - Socket::Connection SS; FLV::Tag tag, viddata, auddata; bool viddone = false, auddone = false; @@ -168,6 +168,7 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ void Connector_RTMP::parseChunk(){ static RTMPStream::Chunk next; static std::string inbuffer; + FLV::Tag F; static AMF::Object amfdata("empty", AMF::AMF0_DDV_CONTAINER); static AMF::Object amfelem("empty", AMF::AMF0_DDV_CONTAINER); static AMF::Object3 amf3data("empty", AMF::AMF3_DDV_CONTAINER); @@ -243,11 +244,19 @@ void Connector_RTMP::parseChunk(){ #if DEBUG >= 4 fprintf(stderr, "Received audio data\n"); #endif + F.ChunkLoader(next); + if (SS.connected()){ + SS.write(std::string(F.data, F.len)); + } break; case 9: #if DEBUG >= 4 fprintf(stderr, "Received video data\n"); #endif + F.ChunkLoader(next); + if (SS.connected()){ + SS.write(std::string(F.data, F.len)); + } break; case 15: #if DEBUG >= 4 @@ -352,6 +361,51 @@ void Connector_RTMP::parseChunk(){ Socket.write(RTMPStream::SendChunk(3, 17, next.msg_stream_id, (char)0+amfreply.Pack())); parsed3 = true; }//getStreamLength + if ((amfdata.getContentP(0)->StrValue() == "publish")){ + if (amfdata.getContentP(3)){ + streamname = amfdata.getContentP(3)->StrValue(); + bool stoptokens = false; + for (std::string::iterator i=streamname.end()-1; i>=streamname.begin(); --i){ + if (*i == '?'){stoptokens = true;} + if (stoptokens || (!isalpha(*i) && !isdigit(*i))){streamname.erase(i);}else{*i=tolower(*i);} + } + streamname = "/tmp/shared_socket_" + streamname; + SS = Socket::Connection(streamname); + if (!SS.connected()){ + #if DEBUG >= 1 + fprintf(stderr, "Could not connect to server!\n"); + #endif + Socket.close();//disconnect user + break; + } + } + //send a _result reply + AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); + amfreply.addContent(AMF::Object("", "_result"));//result success + amfreply.addContent(amfdata.getContent(1));//same transaction ID + amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info + amfreply.addContent(AMF::Object("", 1, AMF::AMF0_BOOL));//publish success? + #if DEBUG >= 4 + amfreply.Print(); + #endif + Socket.write(RTMPStream::SendChunk(3, 17, next.msg_stream_id, (char)0+amfreply.Pack())); + Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 + //send a status reply + amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER); + amfreply.addContent(AMF::Object("", "onStatus"));//status reply + amfreply.addContent(AMF::Object("", 0, AMF::AMF0_NUMBER));//same transaction ID + amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info + amfreply.addContent(AMF::Object(""));//info + amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); + amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Publish.Start")); + amfreply.getContentP(3)->addContent(AMF::Object("description", "Stream is now published!")); + amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); + #if DEBUG >= 4 + amfreply.Print(); + #endif + Socket.write(RTMPStream::SendChunk(3, 17, next.msg_stream_id, (char)0+amfreply.Pack())); + parsed3 = true; + }//getStreamLength if (amfdata.getContentP(0)->StrValue() == "checkBandwidth"){ //send a _result reply AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); @@ -421,6 +475,10 @@ void Connector_RTMP::parseChunk(){ #if DEBUG >= 4 fprintf(stderr, "Received AFM0 data message (metadata)\n"); #endif + F.ChunkLoader(next); + if (SS.connected()){ + SS.write(std::string(F.data, F.len)); + } break; case 19: #if DEBUG >= 4 @@ -441,12 +499,16 @@ void Connector_RTMP::parseChunk(){ fprintf(stderr, "Object encoding set to %e\n", objencoding); #if DEBUG >= 4 int tmpint; - tmpint = (int)amfdata.getContentP(2)->getContentP("videoCodecs")->NumValue(); - if (tmpint & 0x04){fprintf(stderr, "Sorensen video support detected\n");} - if (tmpint & 0x80){fprintf(stderr, "H264 video support detected\n");} - tmpint = (int)amfdata.getContentP(2)->getContentP("audioCodecs")->NumValue(); - if (tmpint & 0x04){fprintf(stderr, "MP3 audio support detected\n");} - if (tmpint & 0x400){fprintf(stderr, "AAC video support detected\n");} + if (amfdata.getContentP(2)->getContentP("videoCodecs")){ + tmpint = (int)amfdata.getContentP(2)->getContentP("videoCodecs")->NumValue(); + if (tmpint & 0x04){fprintf(stderr, "Sorensen video support detected\n");} + if (tmpint & 0x80){fprintf(stderr, "H264 video support detected\n");} + } + if (amfdata.getContentP(2)->getContentP("audioCodecs")){ + tmpint = (int)amfdata.getContentP(2)->getContentP("audioCodecs")->NumValue(); + if (tmpint & 0x04){fprintf(stderr, "MP3 audio support detected\n");} + if (tmpint & 0x400){fprintf(stderr, "AAC video support detected\n");} + } #endif RTMPStream::chunk_snd_max = 4096; Socket.write(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max));//send chunk size max (msg 1) @@ -508,6 +570,52 @@ void Connector_RTMP::parseChunk(){ Socket.write(RTMPStream::SendChunk(3, 20, next.msg_stream_id, amfreply.Pack())); parsed = true; }//getStreamLength + if ((amfdata.getContentP(0)->StrValue() == "publish")){ + if (amfdata.getContentP(3)){ + streamname = amfdata.getContentP(3)->StrValue(); + bool stoptokens = false; + for (std::string::iterator i=streamname.end()-1; i>=streamname.begin(); --i){ + if (*i == '?'){stoptokens = true;} + if (stoptokens || (!isalpha(*i) && !isdigit(*i))){streamname.erase(i);}else{*i=tolower(*i);} + } + streamname = "/tmp/shared_socket_" + streamname; + SS = Socket::Connection(streamname); + if (!SS.connected()){ + #if DEBUG >= 1 + fprintf(stderr, "Could not connect to server!\n"); + #endif + Socket.close();//disconnect user + break; + } + SS.write(Socket.getHost()+'\n'); + } + //send a _result reply + AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); + amfreply.addContent(AMF::Object("", "_result"));//result success + amfreply.addContent(amfdata.getContent(1));//same transaction ID + amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info + amfreply.addContent(AMF::Object("", 1, AMF::AMF0_BOOL));//publish success? + #if DEBUG >= 4 + amfreply.Print(); + #endif + Socket.write(RTMPStream::SendChunk(3, 20, next.msg_stream_id, amfreply.Pack())); + Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 + //send a status reply + amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER); + amfreply.addContent(AMF::Object("", "onStatus"));//status reply + amfreply.addContent(AMF::Object("", 0, AMF::AMF0_NUMBER));//same transaction ID + amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info + amfreply.addContent(AMF::Object(""));//info + amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); + amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Publish.Start")); + amfreply.getContentP(3)->addContent(AMF::Object("description", "Stream is now published!")); + amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); + #if DEBUG >= 4 + amfreply.Print(); + #endif + Socket.write(RTMPStream::SendChunk(4, 20, next.msg_stream_id, amfreply.Pack())); + parsed = true; + }//getStreamLength if (amfdata.getContentP(0)->StrValue() == "checkBandwidth"){ //send a _result reply AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); diff --git a/util/socket.cpp b/util/socket.cpp index ecb1dab5..b0046d02 100644 --- a/util/socket.cpp +++ b/util/socket.cpp @@ -3,6 +3,11 @@ /// Written by Jaron Vietor in 2010 for DDVTech #include "socket.h" +#include + +#ifdef __FreeBSD__ +#include +#endif /// Create a new base socket. This is a basic constructor for converting any valid socket to a Socket::Connection. /// \param sockNo Integer representing the socket to convert. @@ -69,6 +74,27 @@ Socket::Connection::Connection(std::string address, bool nonblock){ } }//Socket::Connection Unix Contructor +/// Calls poll() on the socket, checking if data is available. +/// This function may return true even if there is no data, but never returns false when there is. +bool Socket::Connection::canRead(){ + struct pollfd PFD; + PFD.fd = sock; + PFD.events = POLLIN; + PFD.revents = 0; + poll(&PFD, 1, 5); + return (PFD.revents & POLLIN) == POLLIN; +} +/// Calls poll() on the socket, checking if data can be written. +bool Socket::Connection::canWrite(){ + struct pollfd PFD; + PFD.fd = sock; + PFD.events = POLLOUT; + PFD.revents = 0; + poll(&PFD, 1, 5); + return (PFD.revents & POLLOUT) == POLLOUT; +} + + /// Returns the ready-state for this socket. /// \returns 1 if data is waiting to be read, -1 if not connected, 0 otherwise. signed int Socket::Connection::ready(){ diff --git a/util/socket.h b/util/socket.h index 9aa754be..57aca183 100644 --- a/util/socket.h +++ b/util/socket.h @@ -27,6 +27,8 @@ namespace Socket{ Connection(); ///< Create a new disconnected base socket. Connection(int sockNo); ///< Create a new base socket. Connection(std::string adres, bool nonblock = false); ///< Create a new Unix Socket. + bool canRead(); ///< Calls poll() on the socket, checking if data is available. + bool canWrite(); ///< Calls poll() on the socket, checking if data can be written. bool Error; ///< Set to true if a socket error happened. bool Blocking; ///< Set to true if a socket is currently or wants to be blocking. signed int ready(); ///< Returns the ready-state for this socket. From 1640dbb93300efffa099f8390612c1af0d76e09f Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 22 Sep 2011 07:09:43 +0200 Subject: [PATCH 2/6] epoll fully removed - needs testing still --- Buffer/main.cpp | 31 ++++++++++++++++++++++--------- Connector_HTTP/main.cpp | 15 +-------------- Connector_RTMP/main.cpp | 18 ++---------------- 3 files changed, 25 insertions(+), 39 deletions(-) diff --git a/Buffer/main.cpp b/Buffer/main.cpp index 4dcdfd27..bcc0f68f 100644 --- a/Buffer/main.cpp +++ b/Buffer/main.cpp @@ -140,7 +140,7 @@ namespace Buffer{ } std::string waiting_ip = ""; bool ip_waiting = false; - int ip_input = -1; + Socket::Connection ip_input; if (argc >= 4){ waiting_ip += argv[3]; ip_waiting = true; @@ -174,7 +174,7 @@ namespace Buffer{ if ( (!ip_waiting && (std_input.canRead()) && ringbuf[current_buffer]->FLV.FileLoader(stdin) - ) || (ip_waiting && (ip_input > -1) && + ) || (ip_waiting && (ip_input.connected()) && ringbuf[current_buffer]->FLV.SockLoader(ip_input) ) ){ @@ -217,6 +217,7 @@ namespace Buffer{ lastproper = current_buffer; } } + if (loopcount > 5){gotData = true;} //keep track of buffers ringbuf[current_buffer]->number = loopcount; current_buffer++; @@ -236,14 +237,20 @@ namespace Buffer{ if (!users.back().S.write(FLV::Header, 13)){ users.back().Disconnect("failed to receive the header!"); }else{ - if (!users.back().S.write(metadata.data, metadata.len)){ - users.back().Disconnect("failed to receive metadata!"); + if (metadata.len > 0){ + if (!users.back().S.write(metadata.data, metadata.len)){ + users.back().Disconnect("failed to receive metadata!"); + } } - if (!users.back().S.write(audio_init.data, audio_init.len)){ - users.back().Disconnect("failed to receive audio init!"); + if (audio_init.len > 0){ + if (!users.back().S.write(audio_init.data, audio_init.len)){ + users.back().Disconnect("failed to receive audio init!"); + } } - if (!users.back().S.write(video_init.data, video_init.len)){ - users.back().Disconnect("failed to receive video init!"); + if (video_init.len > 0){ + if (!users.back().S.write(video_init.data, video_init.len)){ + users.back().Disconnect("failed to receive video init!"); + } } } } @@ -265,7 +272,13 @@ namespace Buffer{ if (tmp != ""){ std::cout << "Push attempt from IP " << tmp << std::endl; if (tmp == waiting_ip){ - std::cout << "Push accepted!" << std::endl; + if (!ip_input.connected()){ + std::cout << "Push accepted!" << std::endl; + ip_input = (*usersIt).S; + users.erase(usersIt); break; + }else{ + std::cout << "Push denied - push already in progress!" << std::endl; + } }else{ std::cout << "Push denied!" << std::endl; } diff --git a/Connector_HTTP/main.cpp b/Connector_HTTP/main.cpp index f45ec5e2..b11c7db9 100644 --- a/Connector_HTTP/main.cpp +++ b/Connector_HTTP/main.cpp @@ -9,7 +9,6 @@ #include #include #include -#include #include #include #include "../util/socket.h" @@ -134,15 +133,6 @@ namespace Connector_HTTP{ bool FlashFirstAudio = false; HTTP::Parser HTTP_R, HTTP_S;//HTTP Receiver en HTTP Sender. - int retval; - int poller = epoll_create(1); - int sspoller = epoll_create(1); - struct epoll_event ev; - ev.events = EPOLLIN; - ev.data.fd = conn.getSocket(); - epoll_ctl(poller, EPOLL_CTL_ADD, conn.getSocket(), &ev); - struct epoll_event events[1]; - std::string Movie = ""; std::string Quality = ""; int Segment = -1; @@ -221,9 +211,6 @@ namespace Connector_HTTP{ conn.close(); break; } - ev.events = EPOLLIN; - ev.data.fd = ss.getSocket(); - epoll_ctl(sspoller, EPOLL_CTL_ADD, ss.getSocket(), &ev); #if DEBUG >= 3 fprintf(stderr, "Everything connected, starting to send video data...\n"); #endif @@ -240,7 +227,7 @@ namespace Connector_HTTP{ fprintf(stderr, "Sending a video fragment. %i left in buffer, %i requested\n", (int)Flash_FragBuffer.size(), Flash_RequestPending); #endif } - retval = epoll_wait(sspoller, events, 1, 1); + ss.canRead(); switch (ss.ready()){ case -1: conn.close(); diff --git a/Connector_RTMP/main.cpp b/Connector_RTMP/main.cpp index 07432279..b7fba9a0 100644 --- a/Connector_RTMP/main.cpp +++ b/Connector_RTMP/main.cpp @@ -9,7 +9,6 @@ #include #include #include -#include #include #include "../util/socket.h" #include "../util/flv_tag.h" @@ -59,20 +58,10 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ return 0; } - int retval; - int poller = epoll_create(1); - int sspoller = epoll_create(1); - struct epoll_event ev; - ev.events = EPOLLIN; - ev.data.fd = Socket.getSocket(); - epoll_ctl(poller, EPOLL_CTL_ADD, Socket.getSocket(), &ev); - struct epoll_event events[1]; - while (Socket.connected() && !FLV::Parse_Error){ //only parse input if available or not yet init'ed //rightnow = getNowMS(); - retval = epoll_wait(poller, events, 1, 1); - if ((retval > 0) || !ready4data){// || (snd_cnt - snd_window_at >= snd_window_size) + if (Socket.canRead() || !ready4data){// || (snd_cnt - snd_window_at >= snd_window_size) switch (Socket.ready()){ case -1: break; //disconnected case 0: break; //not ready yet @@ -90,15 +79,12 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ Socket.close();//disconnect user break; } - ev.events = EPOLLIN; - ev.data.fd = SS.getSocket(); - epoll_ctl(sspoller, EPOLL_CTL_ADD, SS.getSocket(), &ev); #if DEBUG >= 3 fprintf(stderr, "Everything connected, starting to send video data...\n"); #endif inited = true; } - retval = epoll_wait(sspoller, events, 1, 1); + SS.canRead(); switch (SS.ready()){ case -1: #if DEBUG >= 1 From 40bcb2ba72db8ded0eb7925f6c4a95b78928fbcb Mon Sep 17 00:00:00 2001 From: Thulinma Date: Sun, 25 Sep 2011 22:44:55 +0200 Subject: [PATCH 3/6] Epoll removal tested and found working! :D --- Connector_HTTP/main.cpp | 18 +++++++++++------- Connector_RTMP/main.cpp | 5 ++++- util/socket.cpp | 24 ++++++++++++++++++++++++ util/socket.h | 2 ++ 4 files changed, 41 insertions(+), 8 deletions(-) diff --git a/Connector_HTTP/main.cpp b/Connector_HTTP/main.cpp index b11c7db9..770219b7 100644 --- a/Connector_HTTP/main.cpp +++ b/Connector_HTTP/main.cpp @@ -237,7 +237,7 @@ namespace Connector_HTTP{ break; case 0: break;//not ready yet default: - if (tag.SockLoader(ss)){//able to read a full packet?f + if (tag.SockLoader(ss)){//able to read a full packet? if (handler == HANDLER_FLASH){ if (tag.tagTime() > 0){ if (Flash_StartTime == 0){ @@ -269,14 +269,18 @@ namespace Connector_HTTP{ FlashFirstVideo = true; FlashFirstAudio = true; } - if (FlashFirstVideo && (tag.data[0] == 0x09) && (Video_Init.len > 0)){ - Video_Init.tagTime(tag.tagTime()); - FlashBuf.append(Video_Init.data, Video_Init.len); + if (FlashFirstVideo && (tag.data[0] == 0x09) && (!tag.needsInitData() || (Video_Init.len > 0))){ + if (tag.needsInitData()){ + Video_Init.tagTime(tag.tagTime()); + FlashBuf.append(Video_Init.data, Video_Init.len); + } FlashFirstVideo = false; } - if (FlashFirstAudio && (tag.data[0] == 0x08) && (Audio_Init.len > 0)){ - Audio_Init.tagTime(tag.tagTime()); - FlashBuf.append(Audio_Init.data, Audio_Init.len); + if (FlashFirstAudio && (tag.data[0] == 0x08) && (!tag.needsInitData() || (Audio_Init.len > 0))){ + if (tag.needsInitData()){ + Audio_Init.tagTime(tag.tagTime()); + FlashBuf.append(Audio_Init.data, Audio_Init.len); + } FlashFirstAudio = false; } #if DEBUG >= 5 diff --git a/Connector_RTMP/main.cpp b/Connector_RTMP/main.cpp index b7fba9a0..ae256923 100644 --- a/Connector_RTMP/main.cpp +++ b/Connector_RTMP/main.cpp @@ -121,7 +121,10 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ break; } //not gotten init yet? cancel this tag - if (viddata.len == 0 || auddata.len == 0){break;} + if (tag.needsInitData()){ + if ((tag.data[0] == 0x09) && (viddata.len == 0)){break;} + if ((tag.data[0] == 0x08) && (auddata.len == 0)){break;} + } //send tag normally Socket.write(RTMPStream::SendMedia(tag)); #if DEBUG >= 8 diff --git a/util/socket.cpp b/util/socket.cpp index b0046d02..425d2a70 100644 --- a/util/socket.cpp +++ b/util/socket.cpp @@ -495,3 +495,27 @@ bool Socket::Server::connected(){ /// Returns internal socket number. int Socket::Server::getSocket(){return sock;} + +/// Unescapes URLencoded C-strings to a std::string. +/// This function *will* destroy the incoming data! +std::string Socket::Connection::urlunescape(char *s){ + char *p; + for (p = s; *s != '\0'; ++s){ + if (*s == '%'){ + if (*++s != '\0'){ + *p = unhex(*s) << 4; + } + if (*++s != '\0'){ + *p++ += unhex(*s); + } + } else { + if (*s == '+'){*p++ = ' ';}else{*p++ = *s;} + } + } + *p = '\0'; + return std::string(s); +} + +int Socket::Connection::unhex(char c){ + return( c >= '0' && c <= '9' ? c - '0' : c >= 'A' && c <= 'F' ? c - 'A' + 10 : c - 'a' + 10 ); +} diff --git a/util/socket.h b/util/socket.h index 57aca183..456a162f 100644 --- a/util/socket.h +++ b/util/socket.h @@ -23,6 +23,7 @@ namespace Socket{ private: int sock; ///< Internally saved socket number. std::string remotehost; ///< Stores remote host address. + int unhex(char c); ///< Helper function for urlunescape. public: Connection(); ///< Create a new disconnected base socket. Connection(int sockNo); ///< Create a new base socket. @@ -44,6 +45,7 @@ namespace Socket{ bool swrite(std::string & buffer); ///< Read call that is compatible with std::string. bool iread(std::string & buffer); ///< Incremental write call that is compatible with std::string. bool iwrite(std::string & buffer); ///< Write call that is compatible with std::string. + std::string urlunescape(char *s); ///< Unescapes URLencoded C-strings to a std::string. void close(); ///< Close connection. std::string getHost(); ///< Gets hostname for connection, if available. int getSocket(); ///< Returns internal socket number. From a5867061891df0dbc9c2c1c02f3cc3e73eebdff3 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Sun, 25 Sep 2011 23:18:08 +0200 Subject: [PATCH 4/6] Fixed stream receiving, hopefully... --- Connector_RTMP/main.cpp | 37 +++++++++++++++++++++++++++++-------- 1 file changed, 29 insertions(+), 8 deletions(-) diff --git a/Connector_RTMP/main.cpp b/Connector_RTMP/main.cpp index ae256923..38f39e76 100644 --- a/Connector_RTMP/main.cpp +++ b/Connector_RTMP/main.cpp @@ -353,12 +353,19 @@ void Connector_RTMP::parseChunk(){ if ((amfdata.getContentP(0)->StrValue() == "publish")){ if (amfdata.getContentP(3)){ streamname = amfdata.getContentP(3)->StrValue(); - bool stoptokens = false; - for (std::string::iterator i=streamname.end()-1; i>=streamname.begin(); --i){ - if (*i == '?'){stoptokens = true;} - if (stoptokens || (!isalpha(*i) && !isdigit(*i))){streamname.erase(i);}else{*i=tolower(*i);} + for (std::string::iterator i=streamname.begin(); i != streamname.end(); ++i){ + if (*i == '?'){streamname.erase(i, streamname.end()); break;} + if (!isalpha(*i) && !isdigit(*i)){ + streamname.erase(i); + --i; + }else{ + *i=tolower(*i); + } } streamname = "/tmp/shared_socket_" + streamname; + #if DEBUG >= 4 + fprintf(stderr, "Connecting to buffer %s...\n", streamname.c_str()); + #endif SS = Socket::Connection(streamname); if (!SS.connected()){ #if DEBUG >= 1 @@ -367,6 +374,10 @@ void Connector_RTMP::parseChunk(){ Socket.close();//disconnect user break; } + SS.write(Socket.getHost()+'\n'); + #if DEBUG >= 4 + fprintf(stderr, "Connected to buffer, starting to sent data...\n"); + #endif } //send a _result reply AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); @@ -562,12 +573,19 @@ void Connector_RTMP::parseChunk(){ if ((amfdata.getContentP(0)->StrValue() == "publish")){ if (amfdata.getContentP(3)){ streamname = amfdata.getContentP(3)->StrValue(); - bool stoptokens = false; - for (std::string::iterator i=streamname.end()-1; i>=streamname.begin(); --i){ - if (*i == '?'){stoptokens = true;} - if (stoptokens || (!isalpha(*i) && !isdigit(*i))){streamname.erase(i);}else{*i=tolower(*i);} + for (std::string::iterator i=streamname.begin(); i != streamname.end(); ++i){ + if (*i == '?'){streamname.erase(i, streamname.end()); break;} + if (!isalpha(*i) && !isdigit(*i)){ + streamname.erase(i); + --i; + }else{ + *i=tolower(*i); + } } streamname = "/tmp/shared_socket_" + streamname; + #if DEBUG >= 4 + fprintf(stderr, "Connecting to buffer %s...\n", streamname.c_str()); + #endif SS = Socket::Connection(streamname); if (!SS.connected()){ #if DEBUG >= 1 @@ -577,6 +595,9 @@ void Connector_RTMP::parseChunk(){ break; } SS.write(Socket.getHost()+'\n'); + #if DEBUG >= 4 + fprintf(stderr, "Connected to buffer, starting to send data...\n"); + #endif } //send a _result reply AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); From 99dce538dea60e34671898ef7230e9d8a81c9320 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Mon, 26 Sep 2011 02:16:12 +0200 Subject: [PATCH 5/6] Added POST var parsing to HTTP lib, added WriteFile to JSON gearbox version, added real parsing of input to JSON gearbox version --- util/http_parser.cpp | 43 ++++++++++++++++++++++++++++++++++++++++++- util/http_parser.h | 2 ++ util/socket.cpp | 24 ------------------------ util/socket.h | 2 -- 4 files changed, 44 insertions(+), 27 deletions(-) diff --git a/util/http_parser.cpp b/util/http_parser.cpp index ec648a39..9a81c404 100644 --- a/util/http_parser.cpp +++ b/util/http_parser.cpp @@ -188,10 +188,25 @@ bool HTTP::Parser::parse(){ } if (seenHeaders){ if (length > 0){ - /// \todo Include POST variable parsing? if (HTTPbuffer.length() >= length){ body = HTTPbuffer.substr(0, length); HTTPbuffer.erase(0, length); + std::string tmppost = body; + std::string varname; + std::string varval; + while (tmppost.find('=') != std::string::npos){ + size_t found = tmppost.find('='); + varname = urlunescape(tmppost.substr(0, found).c_str()); + tmppost.erase(0, found+1); + size_t found = tmppost.find('&'); + varval = urlunescape(tmppost.substr(0, found).c_str()); + SetVar(varname, varval); + if (found == std::string::npos){ + tmppost.clear(); + }else{ + tmppost.erase(0, found+1); + } + } return true; }else{ return false; @@ -241,3 +256,29 @@ void HTTP::Parser::SendBodyPart(Socket::Connection & conn, std::string bodypart) conn.write(bodypart); } } + +/// Unescapes URLencoded C-strings to a std::string. +/// This function *will* destroy the input data! +std::string HTTP::Parser::urlunescape(char *s){ + char *p; + for (p = s; *s != '\0'; ++s){ + if (*s == '%'){ + if (*++s != '\0'){ + *p = unhex(*s) << 4; + } + if (*++s != '\0'){ + *p++ += unhex(*s); + } + } else { + if (*s == '+'){*p++ = ' ';}else{*p++ = *s;} + } + } + *p = '\0'; + return std::string(s); +} + +/// Helper function for urlunescape. +/// Takes a single char input and outputs its integer hex value. +int HTTP::Parser::unhex(char c){ + return( c >= '0' && c <= '9' ? c - '0' : c >= 'A' && c <= 'F' ? c - 'A' + 10 : c - 'a' + 10 ); +} diff --git a/util/http_parser.h b/util/http_parser.h index d7048eae..542c06e6 100644 --- a/util/http_parser.h +++ b/util/http_parser.h @@ -30,6 +30,7 @@ namespace HTTP{ void SendBodyPart(Socket::Connection & conn, std::string bodypart); void Clean(); bool CleanForNext(); + std::string urlunescape(char *s); ///< Unescapes URLencoded C-strings to a std::string. std::string body; std::string method; std::string url; @@ -43,5 +44,6 @@ namespace HTTP{ std::map headers; std::map vars; void Trim(std::string & s); + int unhex(char c); ///< Helper function for urlunescape. };//HTTP::Parser class };//HTTP namespace diff --git a/util/socket.cpp b/util/socket.cpp index 425d2a70..b0046d02 100644 --- a/util/socket.cpp +++ b/util/socket.cpp @@ -495,27 +495,3 @@ bool Socket::Server::connected(){ /// Returns internal socket number. int Socket::Server::getSocket(){return sock;} - -/// Unescapes URLencoded C-strings to a std::string. -/// This function *will* destroy the incoming data! -std::string Socket::Connection::urlunescape(char *s){ - char *p; - for (p = s; *s != '\0'; ++s){ - if (*s == '%'){ - if (*++s != '\0'){ - *p = unhex(*s) << 4; - } - if (*++s != '\0'){ - *p++ += unhex(*s); - } - } else { - if (*s == '+'){*p++ = ' ';}else{*p++ = *s;} - } - } - *p = '\0'; - return std::string(s); -} - -int Socket::Connection::unhex(char c){ - return( c >= '0' && c <= '9' ? c - '0' : c >= 'A' && c <= 'F' ? c - 'A' + 10 : c - 'a' + 10 ); -} diff --git a/util/socket.h b/util/socket.h index 456a162f..57aca183 100644 --- a/util/socket.h +++ b/util/socket.h @@ -23,7 +23,6 @@ namespace Socket{ private: int sock; ///< Internally saved socket number. std::string remotehost; ///< Stores remote host address. - int unhex(char c); ///< Helper function for urlunescape. public: Connection(); ///< Create a new disconnected base socket. Connection(int sockNo); ///< Create a new base socket. @@ -45,7 +44,6 @@ namespace Socket{ bool swrite(std::string & buffer); ///< Read call that is compatible with std::string. bool iread(std::string & buffer); ///< Incremental write call that is compatible with std::string. bool iwrite(std::string & buffer); ///< Write call that is compatible with std::string. - std::string urlunescape(char *s); ///< Unescapes URLencoded C-strings to a std::string. void close(); ///< Close connection. std::string getHost(); ///< Gets hostname for connection, if available. int getSocket(); ///< Returns internal socket number. From 6ac7bb61144f96f461de0d988c4169cd7346ca80 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Mon, 26 Sep 2011 12:44:15 +0200 Subject: [PATCH 6/6] Added full IPv4 support --- util/http_parser.cpp | 6 +++--- util/socket.cpp | 42 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 44 insertions(+), 4 deletions(-) diff --git a/util/http_parser.cpp b/util/http_parser.cpp index 9a81c404..274cb3c3 100644 --- a/util/http_parser.cpp +++ b/util/http_parser.cpp @@ -196,10 +196,10 @@ bool HTTP::Parser::parse(){ std::string varval; while (tmppost.find('=') != std::string::npos){ size_t found = tmppost.find('='); - varname = urlunescape(tmppost.substr(0, found).c_str()); + varname = urlunescape((char*)tmppost.substr(0, found).c_str()); tmppost.erase(0, found+1); - size_t found = tmppost.find('&'); - varval = urlunescape(tmppost.substr(0, found).c_str()); + found = tmppost.find('&'); + varval = urlunescape((char*)tmppost.substr(0, found).c_str()); SetVar(varname, varval); if (found == std::string::npos){ tmppost.clear(); diff --git a/util/socket.cpp b/util/socket.cpp index b0046d02..c585f7be 100644 --- a/util/socket.cpp +++ b/util/socket.cpp @@ -374,7 +374,47 @@ Socket::Server::Server(int port, std::string hostname, bool nonblock){ } }else{ #if DEBUG >= 1 - fprintf(stderr, "Binding failed! Error: %s\n", strerror(errno)); + fprintf(stderr, "Binding failed, retrying as IPv4... (%s)\n", strerror(errno)); + #endif + close(); + } + sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0){ + #if DEBUG >= 1 + fprintf(stderr, "Could not create socket! Error: %s\n", strerror(errno)); + #endif + return; + } + on = 1; + setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + if (nonblock){ + int flags = fcntl(sock, F_GETFL, 0); + flags |= O_NONBLOCK; + fcntl(sock, F_SETFL, flags); + } + struct sockaddr_in addr4; + addr4.sin_family = AF_INET; + addr4.sin_port = htons(port);//set port + if (hostname == "0.0.0.0"){ + addr4.sin_addr.s_addr = INADDR_ANY; + }else{ + inet_pton(AF_INET, hostname.c_str(), &addr4.sin_addr);//set interface, 0.0.0.0 (default) is all + } + ret = bind(sock, (sockaddr*)&addr4, sizeof(addr4));//do the actual bind + if (ret == 0){ + ret = listen(sock, 100);//start listening, backlog of 100 allowed + if (ret == 0){ + return; + }else{ + #if DEBUG >= 1 + fprintf(stderr, "Listen failed! Error: %s\n", strerror(errno)); + #endif + close(); + return; + } + }else{ + #if DEBUG >= 1 + fprintf(stderr, "IPv4 binding also failed, giving up. (%s)\n", strerror(errno)); #endif close(); return;