From f45d02124a2c65a28b84b2ca2c8e1459d9616369 Mon Sep 17 00:00:00 2001
From: Thulinma <jaron@vietors.com>
Date: Thu, 22 Sep 2011 06:56:39 +0200
Subject: [PATCH] Removing epoll in favor of more cross-platform poll - also
 adding RTMP push support and Buffer push support with IP security

---
 Buffer/main.cpp         |  75 +++++++++++++++---------
 Connector_RTMP/main.cpp | 122 +++++++++++++++++++++++++++++++++++++---
 util/socket.cpp         |  26 +++++++++
 util/socket.h           |   2 +
 4 files changed, 191 insertions(+), 34 deletions(-)

diff --git a/Buffer/main.cpp b/Buffer/main.cpp
index dbfa9b94..4dcdfd27 100644
--- a/Buffer/main.cpp
+++ b/Buffer/main.cpp
@@ -13,8 +13,6 @@
 #include "../util/flv_tag.h" //FLV format parser
 #include "../util/socket.h" //Socket lib
 
-#include <sys/epoll.h>
-
 /// Holds all code unique to the Buffer.
 namespace Buffer{
 
@@ -137,9 +135,16 @@ namespace Buffer{
 
     //then check and parse the commandline
     if (argc < 3) {
-      std::cout << "usage: " << argv[0] << " buffers_count streamname" << std::endl;
+      std::cout << "usage: " << argv[0] << " buffers_count streamname [awaiting_IP]" << std::endl;
       return 1;
     }
+    std::string waiting_ip = "";
+    bool ip_waiting = false;
+    int ip_input = -1;
+    if (argc >= 4){
+      waiting_ip += argv[3];
+      ip_waiting = true;
+    }
     std::string shared_socket = "/tmp/shared_socket_";
     shared_socket += argv[2];
 
@@ -156,26 +161,23 @@ namespace Buffer{
     int lastproper = 0;//last properly finished buffer number
     unsigned int loopcount = 0;
     Socket::Connection incoming;
+    Socket::Connection std_input(fileno(stdin));
 
     unsigned char packtype;
     bool gotVideoInfo = false;
     bool gotAudioInfo = false;
+    bool gotData = false;
 
-    int infile = fileno(stdin);//get file number for stdin
-
-    //add stdin to an epoll
-    int poller = epoll_create(1);
-    struct epoll_event ev;
-    ev.events = EPOLLIN;
-    ev.data.fd = infile;
-    epoll_ctl(poller, EPOLL_CTL_ADD, infile, &ev);
-    struct epoll_event events[1];
-
-
-    while(!feof(stdin) && !FLV::Parse_Error){
+    while((!feof(stdin) || ip_waiting) && !FLV::Parse_Error){
       //invalidate the current buffer
       ringbuf[current_buffer]->number = -1;
-      if ((epoll_wait(poller, events, 1, 10) > 0) && ringbuf[current_buffer]->FLV.FileLoader(stdin)){
+      if (
+        (!ip_waiting &&
+          (std_input.canRead()) && ringbuf[current_buffer]->FLV.FileLoader(stdin)
+        ) || (ip_waiting && (ip_input > -1) &&
+          ringbuf[current_buffer]->FLV.SockLoader(ip_input)
+        )
+      ){
         loopcount++;
         packtype = ringbuf[current_buffer]->FLV.data[0];
         //store metadata, if available
@@ -230,17 +232,19 @@ namespace Buffer{
         users.back().MyBuffer = lastproper;
         users.back().MyBuffer_num = -1;
         /// \todo Do this more nicely?
-        if (!users.back().S.write(FLV::Header, 13)){
-          users.back().Disconnect("failed to receive the header!");
-        }else{
-          if (!users.back().S.write(metadata.data, metadata.len)){
-            users.back().Disconnect("failed to receive metadata!");
-          }
-          if (!users.back().S.write(audio_init.data, audio_init.len)){
-            users.back().Disconnect("failed to receive audio init!");
-          }
-          if (!users.back().S.write(video_init.data, video_init.len)){
-            users.back().Disconnect("failed to receive video init!");
+        if (gotData){
+          if (!users.back().S.write(FLV::Header, 13)){
+            users.back().Disconnect("failed to receive the header!");
+          }else{
+            if (!users.back().S.write(metadata.data, metadata.len)){
+              users.back().Disconnect("failed to receive metadata!");
+            }
+            if (!users.back().S.write(audio_init.data, audio_init.len)){
+              users.back().Disconnect("failed to receive audio init!");
+            }
+            if (!users.back().S.write(video_init.data, video_init.len)){
+              users.back().Disconnect("failed to receive video init!");
+            }
           }
         }
       }
@@ -251,6 +255,23 @@ namespace Buffer{
           if (!(*usersIt).S.connected()){
             users.erase(usersIt); break;
           }else{
+            if (!gotData && ip_waiting){
+              if ((*usersIt).S.canRead()){
+                std::string tmp = "";
+                char charbuf;
+                while (((*usersIt).S.iread(&charbuf, 1) == 1) && charbuf != '\n' ){
+                  tmp += charbuf;
+                }
+                if (tmp != ""){
+                  std::cout << "Push attempt from IP " << tmp << std::endl;
+                  if (tmp == waiting_ip){
+                    std::cout << "Push accepted!" << std::endl;
+                  }else{
+                    std::cout << "Push denied!" << std::endl;
+                  }
+                }
+              }
+            }
             (*usersIt).Send(ringbuf, buffers);
           }
         }
diff --git a/Connector_RTMP/main.cpp b/Connector_RTMP/main.cpp
index 3b4ceb43..07432279 100644
--- a/Connector_RTMP/main.cpp
+++ b/Connector_RTMP/main.cpp
@@ -25,6 +25,7 @@ namespace Connector_RTMP{
   bool stopparsing = false; ///< Set to true when all parsing needs to be cancelled.
 
   Socket::Connection Socket; ///< Socket connected to user
+  Socket::Connection SS; ///< Socket connected to server
   std::string streamname = "/tmp/shared_socket"; ///< Stream that will be opened
   void parseChunk();
   int Connector_RTMP(Socket::Connection conn);
@@ -34,7 +35,6 @@ namespace Connector_RTMP{
 /// Main Connector_RTMP function
 int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
   Socket = conn;
-  Socket::Connection SS;
   FLV::Tag tag, viddata, auddata;
   bool viddone = false, auddone = false;
 
@@ -168,6 +168,7 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
 void Connector_RTMP::parseChunk(){
   static RTMPStream::Chunk next;
   static std::string inbuffer;
+  FLV::Tag F;
   static AMF::Object amfdata("empty", AMF::AMF0_DDV_CONTAINER);
   static AMF::Object amfelem("empty", AMF::AMF0_DDV_CONTAINER);
   static AMF::Object3 amf3data("empty", AMF::AMF3_DDV_CONTAINER);
@@ -243,11 +244,19 @@ void Connector_RTMP::parseChunk(){
         #if DEBUG >= 4
         fprintf(stderr, "Received audio data\n");
         #endif
+        F.ChunkLoader(next);
+        if (SS.connected()){
+          SS.write(std::string(F.data, F.len));
+        }
         break;
       case 9:
         #if DEBUG >= 4
         fprintf(stderr, "Received video data\n");
         #endif
+        F.ChunkLoader(next);
+        if (SS.connected()){
+          SS.write(std::string(F.data, F.len));
+        }
         break;
       case 15:
         #if DEBUG >= 4
@@ -352,6 +361,51 @@ void Connector_RTMP::parseChunk(){
             Socket.write(RTMPStream::SendChunk(3, 17, next.msg_stream_id, (char)0+amfreply.Pack()));
             parsed3 = true;
           }//getStreamLength
+          if ((amfdata.getContentP(0)->StrValue() == "publish")){
+            if (amfdata.getContentP(3)){
+              streamname = amfdata.getContentP(3)->StrValue();
+              bool stoptokens = false;
+              for (std::string::iterator i=streamname.end()-1; i>=streamname.begin(); --i){
+                if (*i == '?'){stoptokens = true;}
+                if (stoptokens || (!isalpha(*i) && !isdigit(*i))){streamname.erase(i);}else{*i=tolower(*i);}
+              }
+              streamname = "/tmp/shared_socket_" + streamname;
+              SS = Socket::Connection(streamname);
+              if (!SS.connected()){
+                #if DEBUG >= 1
+                fprintf(stderr, "Could not connect to server!\n");
+                #endif
+                Socket.close();//disconnect user
+                break;
+              }
+            }
+            //send a _result reply
+            AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
+            amfreply.addContent(AMF::Object("", "_result"));//result success
+            amfreply.addContent(amfdata.getContent(1));//same transaction ID
+            amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
+            amfreply.addContent(AMF::Object("", 1, AMF::AMF0_BOOL));//publish success?
+            #if DEBUG >= 4
+            amfreply.Print();
+            #endif
+            Socket.write(RTMPStream::SendChunk(3, 17, next.msg_stream_id, (char)0+amfreply.Pack()));
+            Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
+            //send a status reply
+            amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER);
+            amfreply.addContent(AMF::Object("", "onStatus"));//status reply
+            amfreply.addContent(AMF::Object("", 0, AMF::AMF0_NUMBER));//same transaction ID
+            amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
+            amfreply.addContent(AMF::Object(""));//info
+            amfreply.getContentP(3)->addContent(AMF::Object("level", "status"));
+            amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Publish.Start"));
+            amfreply.getContentP(3)->addContent(AMF::Object("description", "Stream is now published!"));
+            amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
+            #if DEBUG >= 4
+            amfreply.Print();
+            #endif
+            Socket.write(RTMPStream::SendChunk(3, 17, next.msg_stream_id, (char)0+amfreply.Pack()));
+            parsed3 = true;
+          }//getStreamLength
           if (amfdata.getContentP(0)->StrValue() == "checkBandwidth"){
             //send a _result reply
             AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
@@ -421,6 +475,10 @@ void Connector_RTMP::parseChunk(){
         #if DEBUG >= 4
         fprintf(stderr, "Received AFM0 data message (metadata)\n");
         #endif
+        F.ChunkLoader(next);
+        if (SS.connected()){
+          SS.write(std::string(F.data, F.len));
+        }
         break;
       case 19:
         #if DEBUG >= 4
@@ -441,12 +499,16 @@ void Connector_RTMP::parseChunk(){
           fprintf(stderr, "Object encoding set to %e\n", objencoding);
           #if DEBUG >= 4
           int tmpint;
-          tmpint = (int)amfdata.getContentP(2)->getContentP("videoCodecs")->NumValue();
-          if (tmpint & 0x04){fprintf(stderr, "Sorensen video support detected\n");}
-          if (tmpint & 0x80){fprintf(stderr, "H264 video support detected\n");}
-          tmpint = (int)amfdata.getContentP(2)->getContentP("audioCodecs")->NumValue();
-          if (tmpint & 0x04){fprintf(stderr, "MP3 audio support detected\n");}
-          if (tmpint & 0x400){fprintf(stderr, "AAC video support detected\n");}
+          if (amfdata.getContentP(2)->getContentP("videoCodecs")){
+            tmpint = (int)amfdata.getContentP(2)->getContentP("videoCodecs")->NumValue();
+            if (tmpint & 0x04){fprintf(stderr, "Sorensen video support detected\n");}
+            if (tmpint & 0x80){fprintf(stderr, "H264 video support detected\n");}
+          }
+          if (amfdata.getContentP(2)->getContentP("audioCodecs")){
+            tmpint = (int)amfdata.getContentP(2)->getContentP("audioCodecs")->NumValue();
+            if (tmpint & 0x04){fprintf(stderr, "MP3 audio support detected\n");}
+            if (tmpint & 0x400){fprintf(stderr, "AAC video support detected\n");}
+          }
           #endif
           RTMPStream::chunk_snd_max = 4096;
           Socket.write(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max));//send chunk size max (msg 1)
@@ -508,6 +570,52 @@ void Connector_RTMP::parseChunk(){
           Socket.write(RTMPStream::SendChunk(3, 20, next.msg_stream_id, amfreply.Pack()));
           parsed = true;
         }//getStreamLength
+        if ((amfdata.getContentP(0)->StrValue() == "publish")){
+          if (amfdata.getContentP(3)){
+            streamname = amfdata.getContentP(3)->StrValue();
+            bool stoptokens = false;
+            for (std::string::iterator i=streamname.end()-1; i>=streamname.begin(); --i){
+              if (*i == '?'){stoptokens = true;}
+              if (stoptokens || (!isalpha(*i) && !isdigit(*i))){streamname.erase(i);}else{*i=tolower(*i);}
+            }
+            streamname = "/tmp/shared_socket_" + streamname;
+            SS = Socket::Connection(streamname);
+            if (!SS.connected()){
+              #if DEBUG >= 1
+              fprintf(stderr, "Could not connect to server!\n");
+              #endif
+              Socket.close();//disconnect user
+              break;
+            }
+            SS.write(Socket.getHost()+'\n');
+          }
+          //send a _result reply
+          AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
+          amfreply.addContent(AMF::Object("", "_result"));//result success
+          amfreply.addContent(amfdata.getContent(1));//same transaction ID
+          amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
+          amfreply.addContent(AMF::Object("", 1, AMF::AMF0_BOOL));//publish success?
+          #if DEBUG >= 4
+          amfreply.Print();
+          #endif
+          Socket.write(RTMPStream::SendChunk(3, 20, next.msg_stream_id, amfreply.Pack()));
+          Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
+          //send a status reply
+          amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER);
+          amfreply.addContent(AMF::Object("", "onStatus"));//status reply
+          amfreply.addContent(AMF::Object("", 0, AMF::AMF0_NUMBER));//same transaction ID
+          amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
+          amfreply.addContent(AMF::Object(""));//info
+          amfreply.getContentP(3)->addContent(AMF::Object("level", "status"));
+          amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Publish.Start"));
+          amfreply.getContentP(3)->addContent(AMF::Object("description", "Stream is now published!"));
+          amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
+          #if DEBUG >= 4
+          amfreply.Print();
+          #endif
+          Socket.write(RTMPStream::SendChunk(4, 20, next.msg_stream_id, amfreply.Pack()));
+          parsed = true;
+        }//getStreamLength
         if (amfdata.getContentP(0)->StrValue() == "checkBandwidth"){
           //send a _result reply
           AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
diff --git a/util/socket.cpp b/util/socket.cpp
index ecb1dab5..b0046d02 100644
--- a/util/socket.cpp
+++ b/util/socket.cpp
@@ -3,6 +3,11 @@
 /// Written by Jaron Vietor in 2010 for DDVTech
 
 #include "socket.h"
+#include <poll.h>
+
+#ifdef __FreeBSD__
+#include <netinet/in.h>
+#endif
 
 /// Create a new base socket. This is a basic constructor for converting any valid socket to a Socket::Connection.
 /// \param sockNo Integer representing the socket to convert.
@@ -69,6 +74,27 @@ Socket::Connection::Connection(std::string address, bool nonblock){
   }
 }//Socket::Connection Unix Contructor
 
+/// Calls poll() on the socket, checking if data is available.
+/// This function may return true even if there is no data, but never returns false when there is.
+bool Socket::Connection::canRead(){
+  struct pollfd PFD;
+  PFD.fd = sock;
+  PFD.events = POLLIN;
+  PFD.revents = 0;
+  poll(&PFD, 1, 5);
+  return (PFD.revents & POLLIN) == POLLIN;
+}
+/// Calls poll() on the socket, checking if data can be written.
+bool Socket::Connection::canWrite(){
+  struct pollfd PFD;
+  PFD.fd = sock;
+  PFD.events = POLLOUT;
+  PFD.revents = 0;
+  poll(&PFD, 1, 5);
+  return (PFD.revents & POLLOUT) == POLLOUT;
+}
+
+
 /// Returns the ready-state for this socket.
 /// \returns 1 if data is waiting to be read, -1 if not connected, 0 otherwise.
 signed int Socket::Connection::ready(){
diff --git a/util/socket.h b/util/socket.h
index 9aa754be..57aca183 100644
--- a/util/socket.h
+++ b/util/socket.h
@@ -27,6 +27,8 @@ namespace Socket{
       Connection(); ///< Create a new disconnected base socket.
       Connection(int sockNo); ///< Create a new base socket.
       Connection(std::string adres, bool nonblock = false); ///< Create a new Unix Socket.
+      bool canRead(); ///< Calls poll() on the socket, checking if data is available.
+      bool canWrite(); ///< Calls poll() on the socket, checking if data can be written.
       bool Error; ///< Set to true if a socket error happened.
       bool Blocking; ///< Set to true if a socket is currently or wants to be blocking.
       signed int ready(); ///< Returns the ready-state for this socket.