diff --git a/Buffer/main.cpp b/Buffer/main.cpp index dbfa9b94..4dcdfd27 100644 --- a/Buffer/main.cpp +++ b/Buffer/main.cpp @@ -13,8 +13,6 @@ #include "../util/flv_tag.h" //FLV format parser #include "../util/socket.h" //Socket lib -#include - /// Holds all code unique to the Buffer. namespace Buffer{ @@ -137,9 +135,16 @@ namespace Buffer{ //then check and parse the commandline if (argc < 3) { - std::cout << "usage: " << argv[0] << " buffers_count streamname" << std::endl; + std::cout << "usage: " << argv[0] << " buffers_count streamname [awaiting_IP]" << std::endl; return 1; } + std::string waiting_ip = ""; + bool ip_waiting = false; + int ip_input = -1; + if (argc >= 4){ + waiting_ip += argv[3]; + ip_waiting = true; + } std::string shared_socket = "/tmp/shared_socket_"; shared_socket += argv[2]; @@ -156,26 +161,23 @@ namespace Buffer{ int lastproper = 0;//last properly finished buffer number unsigned int loopcount = 0; Socket::Connection incoming; + Socket::Connection std_input(fileno(stdin)); unsigned char packtype; bool gotVideoInfo = false; bool gotAudioInfo = false; + bool gotData = false; - int infile = fileno(stdin);//get file number for stdin - - //add stdin to an epoll - int poller = epoll_create(1); - struct epoll_event ev; - ev.events = EPOLLIN; - ev.data.fd = infile; - epoll_ctl(poller, EPOLL_CTL_ADD, infile, &ev); - struct epoll_event events[1]; - - - while(!feof(stdin) && !FLV::Parse_Error){ + while((!feof(stdin) || ip_waiting) && !FLV::Parse_Error){ //invalidate the current buffer ringbuf[current_buffer]->number = -1; - if ((epoll_wait(poller, events, 1, 10) > 0) && ringbuf[current_buffer]->FLV.FileLoader(stdin)){ + if ( + (!ip_waiting && + (std_input.canRead()) && ringbuf[current_buffer]->FLV.FileLoader(stdin) + ) || (ip_waiting && (ip_input > -1) && + ringbuf[current_buffer]->FLV.SockLoader(ip_input) + ) + ){ loopcount++; packtype = ringbuf[current_buffer]->FLV.data[0]; //store metadata, if available @@ -230,17 +232,19 @@ namespace Buffer{ users.back().MyBuffer = lastproper; users.back().MyBuffer_num = -1; /// \todo Do this more nicely? - if (!users.back().S.write(FLV::Header, 13)){ - users.back().Disconnect("failed to receive the header!"); - }else{ - if (!users.back().S.write(metadata.data, metadata.len)){ - users.back().Disconnect("failed to receive metadata!"); - } - if (!users.back().S.write(audio_init.data, audio_init.len)){ - users.back().Disconnect("failed to receive audio init!"); - } - if (!users.back().S.write(video_init.data, video_init.len)){ - users.back().Disconnect("failed to receive video init!"); + if (gotData){ + if (!users.back().S.write(FLV::Header, 13)){ + users.back().Disconnect("failed to receive the header!"); + }else{ + if (!users.back().S.write(metadata.data, metadata.len)){ + users.back().Disconnect("failed to receive metadata!"); + } + if (!users.back().S.write(audio_init.data, audio_init.len)){ + users.back().Disconnect("failed to receive audio init!"); + } + if (!users.back().S.write(video_init.data, video_init.len)){ + users.back().Disconnect("failed to receive video init!"); + } } } } @@ -251,6 +255,23 @@ namespace Buffer{ if (!(*usersIt).S.connected()){ users.erase(usersIt); break; }else{ + if (!gotData && ip_waiting){ + if ((*usersIt).S.canRead()){ + std::string tmp = ""; + char charbuf; + while (((*usersIt).S.iread(&charbuf, 1) == 1) && charbuf != '\n' ){ + tmp += charbuf; + } + if (tmp != ""){ + std::cout << "Push attempt from IP " << tmp << std::endl; + if (tmp == waiting_ip){ + std::cout << "Push accepted!" << std::endl; + }else{ + std::cout << "Push denied!" << std::endl; + } + } + } + } (*usersIt).Send(ringbuf, buffers); } } diff --git a/Connector_RTMP/main.cpp b/Connector_RTMP/main.cpp index 3b4ceb43..07432279 100644 --- a/Connector_RTMP/main.cpp +++ b/Connector_RTMP/main.cpp @@ -25,6 +25,7 @@ namespace Connector_RTMP{ bool stopparsing = false; ///< Set to true when all parsing needs to be cancelled. Socket::Connection Socket; ///< Socket connected to user + Socket::Connection SS; ///< Socket connected to server std::string streamname = "/tmp/shared_socket"; ///< Stream that will be opened void parseChunk(); int Connector_RTMP(Socket::Connection conn); @@ -34,7 +35,6 @@ namespace Connector_RTMP{ /// Main Connector_RTMP function int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ Socket = conn; - Socket::Connection SS; FLV::Tag tag, viddata, auddata; bool viddone = false, auddone = false; @@ -168,6 +168,7 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ void Connector_RTMP::parseChunk(){ static RTMPStream::Chunk next; static std::string inbuffer; + FLV::Tag F; static AMF::Object amfdata("empty", AMF::AMF0_DDV_CONTAINER); static AMF::Object amfelem("empty", AMF::AMF0_DDV_CONTAINER); static AMF::Object3 amf3data("empty", AMF::AMF3_DDV_CONTAINER); @@ -243,11 +244,19 @@ void Connector_RTMP::parseChunk(){ #if DEBUG >= 4 fprintf(stderr, "Received audio data\n"); #endif + F.ChunkLoader(next); + if (SS.connected()){ + SS.write(std::string(F.data, F.len)); + } break; case 9: #if DEBUG >= 4 fprintf(stderr, "Received video data\n"); #endif + F.ChunkLoader(next); + if (SS.connected()){ + SS.write(std::string(F.data, F.len)); + } break; case 15: #if DEBUG >= 4 @@ -352,6 +361,51 @@ void Connector_RTMP::parseChunk(){ Socket.write(RTMPStream::SendChunk(3, 17, next.msg_stream_id, (char)0+amfreply.Pack())); parsed3 = true; }//getStreamLength + if ((amfdata.getContentP(0)->StrValue() == "publish")){ + if (amfdata.getContentP(3)){ + streamname = amfdata.getContentP(3)->StrValue(); + bool stoptokens = false; + for (std::string::iterator i=streamname.end()-1; i>=streamname.begin(); --i){ + if (*i == '?'){stoptokens = true;} + if (stoptokens || (!isalpha(*i) && !isdigit(*i))){streamname.erase(i);}else{*i=tolower(*i);} + } + streamname = "/tmp/shared_socket_" + streamname; + SS = Socket::Connection(streamname); + if (!SS.connected()){ + #if DEBUG >= 1 + fprintf(stderr, "Could not connect to server!\n"); + #endif + Socket.close();//disconnect user + break; + } + } + //send a _result reply + AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); + amfreply.addContent(AMF::Object("", "_result"));//result success + amfreply.addContent(amfdata.getContent(1));//same transaction ID + amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info + amfreply.addContent(AMF::Object("", 1, AMF::AMF0_BOOL));//publish success? + #if DEBUG >= 4 + amfreply.Print(); + #endif + Socket.write(RTMPStream::SendChunk(3, 17, next.msg_stream_id, (char)0+amfreply.Pack())); + Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 + //send a status reply + amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER); + amfreply.addContent(AMF::Object("", "onStatus"));//status reply + amfreply.addContent(AMF::Object("", 0, AMF::AMF0_NUMBER));//same transaction ID + amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info + amfreply.addContent(AMF::Object(""));//info + amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); + amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Publish.Start")); + amfreply.getContentP(3)->addContent(AMF::Object("description", "Stream is now published!")); + amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); + #if DEBUG >= 4 + amfreply.Print(); + #endif + Socket.write(RTMPStream::SendChunk(3, 17, next.msg_stream_id, (char)0+amfreply.Pack())); + parsed3 = true; + }//getStreamLength if (amfdata.getContentP(0)->StrValue() == "checkBandwidth"){ //send a _result reply AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); @@ -421,6 +475,10 @@ void Connector_RTMP::parseChunk(){ #if DEBUG >= 4 fprintf(stderr, "Received AFM0 data message (metadata)\n"); #endif + F.ChunkLoader(next); + if (SS.connected()){ + SS.write(std::string(F.data, F.len)); + } break; case 19: #if DEBUG >= 4 @@ -441,12 +499,16 @@ void Connector_RTMP::parseChunk(){ fprintf(stderr, "Object encoding set to %e\n", objencoding); #if DEBUG >= 4 int tmpint; - tmpint = (int)amfdata.getContentP(2)->getContentP("videoCodecs")->NumValue(); - if (tmpint & 0x04){fprintf(stderr, "Sorensen video support detected\n");} - if (tmpint & 0x80){fprintf(stderr, "H264 video support detected\n");} - tmpint = (int)amfdata.getContentP(2)->getContentP("audioCodecs")->NumValue(); - if (tmpint & 0x04){fprintf(stderr, "MP3 audio support detected\n");} - if (tmpint & 0x400){fprintf(stderr, "AAC video support detected\n");} + if (amfdata.getContentP(2)->getContentP("videoCodecs")){ + tmpint = (int)amfdata.getContentP(2)->getContentP("videoCodecs")->NumValue(); + if (tmpint & 0x04){fprintf(stderr, "Sorensen video support detected\n");} + if (tmpint & 0x80){fprintf(stderr, "H264 video support detected\n");} + } + if (amfdata.getContentP(2)->getContentP("audioCodecs")){ + tmpint = (int)amfdata.getContentP(2)->getContentP("audioCodecs")->NumValue(); + if (tmpint & 0x04){fprintf(stderr, "MP3 audio support detected\n");} + if (tmpint & 0x400){fprintf(stderr, "AAC video support detected\n");} + } #endif RTMPStream::chunk_snd_max = 4096; Socket.write(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max));//send chunk size max (msg 1) @@ -508,6 +570,52 @@ void Connector_RTMP::parseChunk(){ Socket.write(RTMPStream::SendChunk(3, 20, next.msg_stream_id, amfreply.Pack())); parsed = true; }//getStreamLength + if ((amfdata.getContentP(0)->StrValue() == "publish")){ + if (amfdata.getContentP(3)){ + streamname = amfdata.getContentP(3)->StrValue(); + bool stoptokens = false; + for (std::string::iterator i=streamname.end()-1; i>=streamname.begin(); --i){ + if (*i == '?'){stoptokens = true;} + if (stoptokens || (!isalpha(*i) && !isdigit(*i))){streamname.erase(i);}else{*i=tolower(*i);} + } + streamname = "/tmp/shared_socket_" + streamname; + SS = Socket::Connection(streamname); + if (!SS.connected()){ + #if DEBUG >= 1 + fprintf(stderr, "Could not connect to server!\n"); + #endif + Socket.close();//disconnect user + break; + } + SS.write(Socket.getHost()+'\n'); + } + //send a _result reply + AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); + amfreply.addContent(AMF::Object("", "_result"));//result success + amfreply.addContent(amfdata.getContent(1));//same transaction ID + amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info + amfreply.addContent(AMF::Object("", 1, AMF::AMF0_BOOL));//publish success? + #if DEBUG >= 4 + amfreply.Print(); + #endif + Socket.write(RTMPStream::SendChunk(3, 20, next.msg_stream_id, amfreply.Pack())); + Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 + //send a status reply + amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER); + amfreply.addContent(AMF::Object("", "onStatus"));//status reply + amfreply.addContent(AMF::Object("", 0, AMF::AMF0_NUMBER));//same transaction ID + amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info + amfreply.addContent(AMF::Object(""));//info + amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); + amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Publish.Start")); + amfreply.getContentP(3)->addContent(AMF::Object("description", "Stream is now published!")); + amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); + #if DEBUG >= 4 + amfreply.Print(); + #endif + Socket.write(RTMPStream::SendChunk(4, 20, next.msg_stream_id, amfreply.Pack())); + parsed = true; + }//getStreamLength if (amfdata.getContentP(0)->StrValue() == "checkBandwidth"){ //send a _result reply AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); diff --git a/util/socket.cpp b/util/socket.cpp index ecb1dab5..b0046d02 100644 --- a/util/socket.cpp +++ b/util/socket.cpp @@ -3,6 +3,11 @@ /// Written by Jaron Vietor in 2010 for DDVTech #include "socket.h" +#include + +#ifdef __FreeBSD__ +#include +#endif /// Create a new base socket. This is a basic constructor for converting any valid socket to a Socket::Connection. /// \param sockNo Integer representing the socket to convert. @@ -69,6 +74,27 @@ Socket::Connection::Connection(std::string address, bool nonblock){ } }//Socket::Connection Unix Contructor +/// Calls poll() on the socket, checking if data is available. +/// This function may return true even if there is no data, but never returns false when there is. +bool Socket::Connection::canRead(){ + struct pollfd PFD; + PFD.fd = sock; + PFD.events = POLLIN; + PFD.revents = 0; + poll(&PFD, 1, 5); + return (PFD.revents & POLLIN) == POLLIN; +} +/// Calls poll() on the socket, checking if data can be written. +bool Socket::Connection::canWrite(){ + struct pollfd PFD; + PFD.fd = sock; + PFD.events = POLLOUT; + PFD.revents = 0; + poll(&PFD, 1, 5); + return (PFD.revents & POLLOUT) == POLLOUT; +} + + /// Returns the ready-state for this socket. /// \returns 1 if data is waiting to be read, -1 if not connected, 0 otherwise. signed int Socket::Connection::ready(){ diff --git a/util/socket.h b/util/socket.h index 9aa754be..57aca183 100644 --- a/util/socket.h +++ b/util/socket.h @@ -27,6 +27,8 @@ namespace Socket{ Connection(); ///< Create a new disconnected base socket. Connection(int sockNo); ///< Create a new base socket. Connection(std::string adres, bool nonblock = false); ///< Create a new Unix Socket. + bool canRead(); ///< Calls poll() on the socket, checking if data is available. + bool canWrite(); ///< Calls poll() on the socket, checking if data can be written. bool Error; ///< Set to true if a socket error happened. bool Blocking; ///< Set to true if a socket is currently or wants to be blocking. signed int ready(); ///< Returns the ready-state for this socket.