Merge branch 'master' of github.com:DDVTECH/DMS
This commit is contained in:
		
						commit
						26c957f12a
					
				
					 3 changed files with 210 additions and 77 deletions
				
			
		
							
								
								
									
										203
									
								
								Buffer/main.cpp
									
										
									
									
									
								
							
							
						
						
									
										203
									
								
								Buffer/main.cpp
									
										
									
									
									
								
							| 
						 | 
					@ -11,7 +11,7 @@
 | 
				
			||||||
#include <unistd.h>
 | 
					#include <unistd.h>
 | 
				
			||||||
#include <signal.h>
 | 
					#include <signal.h>
 | 
				
			||||||
#include <sstream>
 | 
					#include <sstream>
 | 
				
			||||||
#include "../util/dtsc.h" //DTSC support
 | 
					#include "../util/flv_tag.h" //FLV format parser
 | 
				
			||||||
#include "../util/socket.h" //Socket lib
 | 
					#include "../util/socket.h" //Socket lib
 | 
				
			||||||
#include "../util/json/json.h"
 | 
					#include "../util/json/json.h"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -28,7 +28,11 @@ namespace Buffer{
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  DTSC::Stream * Strm = 0;
 | 
					  ///holds FLV::Tag objects and their numbers
 | 
				
			||||||
 | 
					  struct buffer{
 | 
				
			||||||
 | 
					    int number;
 | 
				
			||||||
 | 
					    FLV::Tag FLV;
 | 
				
			||||||
 | 
					  };//buffer
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  /// Converts a stats line to up, down, host, connector and conntime values.
 | 
					  /// Converts a stats line to up, down, host, connector and conntime values.
 | 
				
			||||||
  class Stats{
 | 
					  class Stats{
 | 
				
			||||||
| 
						 | 
					@ -72,7 +76,9 @@ namespace Buffer{
 | 
				
			||||||
  /// Keeps track of what buffer users are using and the connection status.
 | 
					  /// Keeps track of what buffer users are using and the connection status.
 | 
				
			||||||
  class user{
 | 
					  class user{
 | 
				
			||||||
    public:
 | 
					    public:
 | 
				
			||||||
      DTSC::Ring * myRing; ///< Ring of the buffer for this user.
 | 
					      int MyBuffer; ///< Index of currently used buffer.
 | 
				
			||||||
 | 
					      int MyBuffer_num; ///< Number of currently used buffer.
 | 
				
			||||||
 | 
					      int MyBuffer_len; ///< Length in bytes of currently used buffer.
 | 
				
			||||||
      int MyNum; ///< User ID of this user.
 | 
					      int MyNum; ///< User ID of this user.
 | 
				
			||||||
      std::string MyStr; ///< User ID of this user as a string.
 | 
					      std::string MyStr; ///< User ID of this user as a string.
 | 
				
			||||||
      int currsend; ///< Current amount of bytes sent.
 | 
					      int currsend; ///< Current amount of bytes sent.
 | 
				
			||||||
| 
						 | 
					@ -91,16 +97,11 @@ namespace Buffer{
 | 
				
			||||||
        std::stringstream st;
 | 
					        std::stringstream st;
 | 
				
			||||||
        st << MyNum;
 | 
					        st << MyNum;
 | 
				
			||||||
        MyStr = st.str();
 | 
					        MyStr = st.str();
 | 
				
			||||||
 | 
					        gotproperaudio = false;
 | 
				
			||||||
        curr_up = 0;
 | 
					        curr_up = 0;
 | 
				
			||||||
        curr_down = 0;
 | 
					        curr_down = 0;
 | 
				
			||||||
        currsend = 0;
 | 
					 | 
				
			||||||
        myRing = 0;
 | 
					 | 
				
			||||||
        std::cout << "User " << MyNum << " connected" << std::endl;
 | 
					        std::cout << "User " << MyNum << " connected" << std::endl;
 | 
				
			||||||
      }//constructor
 | 
					      }//constructor
 | 
				
			||||||
      /// Drops held DTSC::Ring class, if one is held.
 | 
					 | 
				
			||||||
      ~user(){
 | 
					 | 
				
			||||||
        Strm->dropRing(myRing);
 | 
					 | 
				
			||||||
      }//destructor
 | 
					 | 
				
			||||||
      /// Disconnects the current user. Doesn't do anything if already disconnected.
 | 
					      /// Disconnects the current user. Doesn't do anything if already disconnected.
 | 
				
			||||||
      /// Prints "Disconnected user" to stdout if disconnect took place.
 | 
					      /// Prints "Disconnected user" to stdout if disconnect took place.
 | 
				
			||||||
      void Disconnect(std::string reason) {
 | 
					      void Disconnect(std::string reason) {
 | 
				
			||||||
| 
						 | 
					@ -118,42 +119,70 @@ namespace Buffer{
 | 
				
			||||||
      }//Disconnect
 | 
					      }//Disconnect
 | 
				
			||||||
      /// Tries to send the current buffer, returns true if success, false otherwise.
 | 
					      /// Tries to send the current buffer, returns true if success, false otherwise.
 | 
				
			||||||
      /// Has a side effect of dropping the connection if send will never complete.
 | 
					      /// Has a side effect of dropping the connection if send will never complete.
 | 
				
			||||||
      bool doSend(const char * ptr, int len){
 | 
					      bool doSend(){
 | 
				
			||||||
        int r = S.iwrite(ptr+currsend, len-currsend);
 | 
					        int r = S.iwrite((char*)lastpointer+currsend, MyBuffer_len-currsend);
 | 
				
			||||||
        if (r <= 0){
 | 
					        if (r <= 0){
 | 
				
			||||||
          if (errno == EWOULDBLOCK){return false;}
 | 
					          if (errno == EWOULDBLOCK){return false;}
 | 
				
			||||||
          Disconnect(S.getError());
 | 
					          Disconnect(S.getError());
 | 
				
			||||||
          return false;
 | 
					          return false;
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        currsend += r;
 | 
					        currsend += r;
 | 
				
			||||||
        return (currsend == len);
 | 
					        return (currsend == MyBuffer_len);
 | 
				
			||||||
      }//doSend
 | 
					      }//doSend
 | 
				
			||||||
      /// Try to send data to this user. Disconnects if any problems occur.
 | 
					      /// Try to send data to this user. Disconnects if any problems occur.
 | 
				
			||||||
      void Send(){
 | 
					      /// \param ringbuf Array of buffers (FLV:Tag with ID attached)
 | 
				
			||||||
        if (!myRing){return;}//no ring!
 | 
					      /// \param buffers Count of elements in ringbuf
 | 
				
			||||||
 | 
					      void Send(buffer ** ringbuf, int buffers){
 | 
				
			||||||
 | 
					        /// \todo For MP3: gotproperaudio - if false, only send if first byte is 0xFF and set to true
 | 
				
			||||||
        if (!S.connected()){return;}//cancel if not connected
 | 
					        if (!S.connected()){return;}//cancel if not connected
 | 
				
			||||||
        if (myRing->waiting){return;}//still waiting for next buffer?
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if (myRing->starved){
 | 
					        //still waiting for next buffer? check it
 | 
				
			||||||
          //if corrupt data, warn and get new DTSC::Ring
 | 
					        if (MyBuffer_num < 0){
 | 
				
			||||||
          std::cout << "Warning: User was send corrupt video data and send to the next keyframe!" << std::endl;
 | 
					          MyBuffer_num = ringbuf[MyBuffer]->number;
 | 
				
			||||||
          Strm->dropRing(myRing);
 | 
					          if (MyBuffer_num < 0){
 | 
				
			||||||
          myRing = Strm->getRing();
 | 
					            return; //still waiting? don't crash - wait longer.
 | 
				
			||||||
 | 
					          }else{
 | 
				
			||||||
 | 
					            MyBuffer_len = ringbuf[MyBuffer]->FLV.len;
 | 
				
			||||||
 | 
					            lastpointer = ringbuf[MyBuffer]->FLV.data;
 | 
				
			||||||
 | 
					          }
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        //do check for buffer resizes
 | 
				
			||||||
 | 
					        if (lastpointer != ringbuf[MyBuffer]->FLV.data){
 | 
				
			||||||
 | 
					          Disconnect("Buffer resize at wrong time... had to disconnect");
 | 
				
			||||||
 | 
					          return;
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        currsend = 0;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        //try to complete a send
 | 
					        //try to complete a send
 | 
				
			||||||
        if (doSend(Strm->outPacket(myRing->b).c_str(), Strm->outPacket(myRing->b).length())){
 | 
					        if (doSend()){
 | 
				
			||||||
          //switch to next buffer
 | 
					          //switch to next buffer
 | 
				
			||||||
          if (myRing->b <= 0){myRing->waiting = true; return;}//no next buffer? go in waiting mode.
 | 
					          if ((ringbuf[MyBuffer]->number != MyBuffer_num)){
 | 
				
			||||||
          myRing->b--;
 | 
					            //if corrupt data, warn and find keyframe
 | 
				
			||||||
 | 
					            std::cout << "Warning: User " << MyNum << " was send corrupt video data and send to the next keyframe!" << std::endl;
 | 
				
			||||||
 | 
					            int nocrashcount = 0;
 | 
				
			||||||
 | 
					            do{
 | 
				
			||||||
 | 
					              MyBuffer++;
 | 
				
			||||||
 | 
					              nocrashcount++;
 | 
				
			||||||
 | 
					              MyBuffer %= buffers;
 | 
				
			||||||
 | 
					            }while(!ringbuf[MyBuffer]->FLV.isKeyframe && (nocrashcount < buffers));
 | 
				
			||||||
 | 
					            //if keyframe not available, try again later
 | 
				
			||||||
 | 
					            if (nocrashcount >= buffers){
 | 
				
			||||||
 | 
					              std::cout << "Warning: No keyframe found in buffers! Skipping search for now..." << std::endl;
 | 
				
			||||||
 | 
					              return;
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					          }else{
 | 
				
			||||||
 | 
					            MyBuffer++;
 | 
				
			||||||
 | 
					            MyBuffer %= buffers;
 | 
				
			||||||
 | 
					          }
 | 
				
			||||||
 | 
					          MyBuffer_num = -1;
 | 
				
			||||||
 | 
					          lastpointer = 0;
 | 
				
			||||||
          currsend = 0;
 | 
					          currsend = 0;
 | 
				
			||||||
        }//completed a send
 | 
					        }//completed a send
 | 
				
			||||||
      }//send
 | 
					      }//send
 | 
				
			||||||
  };
 | 
					  };
 | 
				
			||||||
  int user::UserCount = 0;
 | 
					  int user::UserCount = 0;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  /// Starts a loop, waiting for connections to send data to.
 | 
					  /// Starts a loop, waiting for connections to send video data to.
 | 
				
			||||||
  int Start(int argc, char ** argv) {
 | 
					  int Start(int argc, char ** argv) {
 | 
				
			||||||
    //first make sure no segpipe signals will kill us
 | 
					    //first make sure no segpipe signals will kill us
 | 
				
			||||||
    struct sigaction new_action;
 | 
					    struct sigaction new_action;
 | 
				
			||||||
| 
						 | 
					@ -163,27 +192,32 @@ namespace Buffer{
 | 
				
			||||||
    sigaction (SIGPIPE, &new_action, NULL);
 | 
					    sigaction (SIGPIPE, &new_action, NULL);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    //then check and parse the commandline
 | 
					    //then check and parse the commandline
 | 
				
			||||||
    if (argc < 2) {
 | 
					    if (argc < 3) {
 | 
				
			||||||
      std::cout << "usage: " << argv[0] << " streamName [awaiting_IP]" << std::endl;
 | 
					      std::cout << "usage: " << argv[0] << " buffers_count streamname [awaiting_IP]" << std::endl;
 | 
				
			||||||
      return 1;
 | 
					      return 1;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    std::string waiting_ip = "";
 | 
					    std::string waiting_ip = "";
 | 
				
			||||||
    bool ip_waiting = false;
 | 
					    bool ip_waiting = false;
 | 
				
			||||||
    Socket::Connection ip_input;
 | 
					    Socket::Connection ip_input;
 | 
				
			||||||
    if (argc >= 4){
 | 
					    if (argc >= 4){
 | 
				
			||||||
      waiting_ip += argv[2];
 | 
					      waiting_ip += argv[3];
 | 
				
			||||||
      ip_waiting = true;
 | 
					      ip_waiting = true;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    std::string shared_socket = "/tmp/shared_socket_";
 | 
					    std::string shared_socket = "/tmp/shared_socket_";
 | 
				
			||||||
    shared_socket += argv[1];
 | 
					    shared_socket += argv[2];
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    Socket::Server SS(shared_socket, true);
 | 
					    Socket::Server SS(shared_socket, true);
 | 
				
			||||||
    Strm = new DTSC::Stream(5);
 | 
					    FLV::Tag metadata;
 | 
				
			||||||
 | 
					    FLV::Tag video_init;
 | 
				
			||||||
 | 
					    FLV::Tag audio_init;
 | 
				
			||||||
 | 
					    int buffers = atoi(argv[1]);
 | 
				
			||||||
 | 
					    buffer ** ringbuf = (buffer**) calloc (buffers,sizeof(buffer*));
 | 
				
			||||||
    std::vector<user> users;
 | 
					    std::vector<user> users;
 | 
				
			||||||
    std::vector<user>::iterator usersIt;
 | 
					    std::vector<user>::iterator usersIt;
 | 
				
			||||||
    std::string inBuffer;
 | 
					    for (int i = 0; i < buffers; ++i) ringbuf[i] = new buffer;
 | 
				
			||||||
    char charBuffer[1024*10];
 | 
					    int current_buffer = 0;
 | 
				
			||||||
    unsigned int charCount;
 | 
					    int lastproper = 0;//last properly finished buffer number
 | 
				
			||||||
 | 
					    unsigned int loopcount = 0;
 | 
				
			||||||
    unsigned int stattimer = 0;
 | 
					    unsigned int stattimer = 0;
 | 
				
			||||||
    Socket::Connection incoming;
 | 
					    Socket::Connection incoming;
 | 
				
			||||||
    Socket::Connection std_input(fileno(stdin));
 | 
					    Socket::Connection std_input(fileno(stdin));
 | 
				
			||||||
| 
						 | 
					@ -193,8 +227,12 @@ namespace Buffer{
 | 
				
			||||||
    Storage["curr"] = Json::Value(Json::objectValue);
 | 
					    Storage["curr"] = Json::Value(Json::objectValue);
 | 
				
			||||||
    Storage["totals"] = Json::Value(Json::objectValue);
 | 
					    Storage["totals"] = Json::Value(Json::objectValue);
 | 
				
			||||||
    
 | 
					    
 | 
				
			||||||
 | 
					    unsigned char packtype;
 | 
				
			||||||
 | 
					    bool gotVideoInfo = false;
 | 
				
			||||||
 | 
					    bool gotAudioInfo = false;
 | 
				
			||||||
 | 
					    bool gotData = false;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    while (!feof(stdin) || ip_waiting){
 | 
					    while((!feof(stdin) || ip_waiting) && !FLV::Parse_Error){
 | 
				
			||||||
      usleep(1000); //sleep for 1 ms, to prevent 100% CPU time
 | 
					      usleep(1000); //sleep for 1 ms, to prevent 100% CPU time
 | 
				
			||||||
      unsigned int now = time(0);
 | 
					      unsigned int now = time(0);
 | 
				
			||||||
      if (now != stattimer){
 | 
					      if (now != stattimer){
 | 
				
			||||||
| 
						 | 
					@ -211,9 +249,7 @@ namespace Buffer{
 | 
				
			||||||
        Storage["totals"]["up"] = tot_up;
 | 
					        Storage["totals"]["up"] = tot_up;
 | 
				
			||||||
        Storage["totals"]["count"] = tot_count;
 | 
					        Storage["totals"]["count"] = tot_count;
 | 
				
			||||||
        Storage["totals"]["now"] = now;
 | 
					        Storage["totals"]["now"] = now;
 | 
				
			||||||
        if( argc >= 4 ) {
 | 
					 | 
				
			||||||
        Storage["totals"]["buffer"] = argv[2];
 | 
					        Storage["totals"]["buffer"] = argv[2];
 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
        if (!StatsSocket.connected()){
 | 
					        if (!StatsSocket.connected()){
 | 
				
			||||||
          StatsSocket = Socket::Connection("/tmp/ddv_statistics", true);
 | 
					          StatsSocket = Socket::Connection("/tmp/ddv_statistics", true);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
| 
						 | 
					@ -223,22 +259,89 @@ namespace Buffer{
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
      //invalidate the current buffer
 | 
					      //invalidate the current buffer
 | 
				
			||||||
      if ( (!ip_waiting && std_input.canRead()) || (ip_waiting && ip_input.connected()) ){
 | 
					      ringbuf[current_buffer]->number = -1;
 | 
				
			||||||
        std::cin.read(charBuffer, 1024*10);
 | 
					      if (
 | 
				
			||||||
        charCount = std::cin.gcount();
 | 
					          (!ip_waiting &&
 | 
				
			||||||
        inBuffer.append(charBuffer, charCount);
 | 
					              (std_input.canRead()) && ringbuf[current_buffer]->FLV.FileLoader(stdin)
 | 
				
			||||||
        Strm->parsePacket(inBuffer);
 | 
					          ) || (ip_waiting && (ip_input.connected()) &&
 | 
				
			||||||
 | 
					              ringbuf[current_buffer]->FLV.SockLoader(ip_input)
 | 
				
			||||||
 | 
					          )
 | 
				
			||||||
 | 
					      ){
 | 
				
			||||||
 | 
					        loopcount++;
 | 
				
			||||||
 | 
					        packtype = ringbuf[current_buffer]->FLV.data[0];
 | 
				
			||||||
 | 
					        //store metadata, if available
 | 
				
			||||||
 | 
					        if (packtype == 0x12){
 | 
				
			||||||
 | 
					          metadata = ringbuf[current_buffer]->FLV;
 | 
				
			||||||
 | 
					          std::cout << "Received metadata!" << std::endl;
 | 
				
			||||||
 | 
					          if (gotVideoInfo && gotAudioInfo){
 | 
				
			||||||
 | 
					            FLV::Parse_Error = true;
 | 
				
			||||||
 | 
					            std::cout << "... after proper video and audio? Cancelling broadcast!" << std::endl;
 | 
				
			||||||
 | 
					          }
 | 
				
			||||||
 | 
					          gotVideoInfo = false;
 | 
				
			||||||
 | 
					          gotAudioInfo = false;
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        //store video init data, if available
 | 
				
			||||||
 | 
					        if (!gotVideoInfo && ringbuf[current_buffer]->FLV.isKeyframe){
 | 
				
			||||||
 | 
					          if ((ringbuf[current_buffer]->FLV.data[11] & 0x0f) == 7){//avc packet
 | 
				
			||||||
 | 
					            if (ringbuf[current_buffer]->FLV.data[12] == 0){
 | 
				
			||||||
 | 
					              ringbuf[current_buffer]->FLV.tagTime(0);//timestamp to zero
 | 
				
			||||||
 | 
					              video_init = ringbuf[current_buffer]->FLV;
 | 
				
			||||||
 | 
					              gotVideoInfo = true;
 | 
				
			||||||
 | 
					              std::cout << "Received video configuration!" << std::endl;
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					          }else{gotVideoInfo = true;}//non-avc = no config...
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        //store audio init data, if available
 | 
				
			||||||
 | 
					        if (!gotAudioInfo && (packtype == 0x08)){
 | 
				
			||||||
 | 
					          if (((ringbuf[current_buffer]->FLV.data[11] & 0xf0) >> 4) == 10){//aac packet
 | 
				
			||||||
 | 
					            ringbuf[current_buffer]->FLV.tagTime(0);//timestamp to zero
 | 
				
			||||||
 | 
					            audio_init = ringbuf[current_buffer]->FLV;
 | 
				
			||||||
 | 
					            gotAudioInfo = true;
 | 
				
			||||||
 | 
					            std::cout << "Received audio configuration!" << std::endl;
 | 
				
			||||||
 | 
					          }else{gotAudioInfo = true;}//no aac = no config...
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        //on keyframe set possible start point
 | 
				
			||||||
 | 
					        if (packtype == 0x09){
 | 
				
			||||||
 | 
					          if (((ringbuf[current_buffer]->FLV.data[11] & 0xf0) >> 4) == 1){
 | 
				
			||||||
 | 
					            lastproper = current_buffer;
 | 
				
			||||||
 | 
					          }
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        if (loopcount > 5){gotData = true;}
 | 
				
			||||||
 | 
					        //keep track of buffers
 | 
				
			||||||
 | 
					        ringbuf[current_buffer]->number = loopcount;
 | 
				
			||||||
 | 
					        current_buffer++;
 | 
				
			||||||
 | 
					        current_buffer %= buffers;
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
      //check for new connections, accept them if there are any
 | 
					      //check for new connections, accept them if there are any
 | 
				
			||||||
      incoming = SS.accept(true);
 | 
					      incoming = SS.accept(true);
 | 
				
			||||||
      if (incoming.connected()){
 | 
					      if (incoming.connected()){
 | 
				
			||||||
        users.push_back(incoming);
 | 
					        users.push_back(incoming);
 | 
				
			||||||
        //send the header
 | 
					        //send the FLV header
 | 
				
			||||||
        users.back().myRing = Strm->getRing();
 | 
					        users.back().currsend = 0;
 | 
				
			||||||
        if (!users.back().S.write(Strm->outHeader())){
 | 
					        users.back().MyBuffer = lastproper;
 | 
				
			||||||
 | 
					        users.back().MyBuffer_num = -1;
 | 
				
			||||||
        /// \todo Do this more nicely?
 | 
					        /// \todo Do this more nicely?
 | 
				
			||||||
 | 
					        if (gotData){
 | 
				
			||||||
 | 
					          if (!users.back().S.write(FLV::Header, 13)){
 | 
				
			||||||
            users.back().Disconnect("failed to receive the header!");
 | 
					            users.back().Disconnect("failed to receive the header!");
 | 
				
			||||||
 | 
					          }else{
 | 
				
			||||||
 | 
					            if (metadata.len > 0){
 | 
				
			||||||
 | 
					              if (!users.back().S.write(metadata.data, metadata.len)){
 | 
				
			||||||
 | 
					                users.back().Disconnect("failed to receive metadata!");
 | 
				
			||||||
 | 
					              }
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					            if (audio_init.len > 0){
 | 
				
			||||||
 | 
					              if (!users.back().S.write(audio_init.data, audio_init.len)){
 | 
				
			||||||
 | 
					                users.back().Disconnect("failed to receive audio init!");
 | 
				
			||||||
 | 
					              }
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					            if (video_init.len > 0){
 | 
				
			||||||
 | 
					              if (!users.back().S.write(video_init.data, video_init.len)){
 | 
				
			||||||
 | 
					                users.back().Disconnect("failed to receive video init!");
 | 
				
			||||||
 | 
					              }
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					          }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -259,7 +362,7 @@ namespace Buffer{
 | 
				
			||||||
              if (tmp != ""){
 | 
					              if (tmp != ""){
 | 
				
			||||||
                if (tmp[0] == 'P'){
 | 
					                if (tmp[0] == 'P'){
 | 
				
			||||||
                  std::cout << "Push attempt from IP " << tmp.substr(2) << std::endl;
 | 
					                  std::cout << "Push attempt from IP " << tmp.substr(2) << std::endl;
 | 
				
			||||||
                  if (tmp.substr(2) == waiting_ip){
 | 
					                  if (tmp.substr(2) == waiting_ip || tmp.substr(2) == "::ffff:"+waiting_ip){
 | 
				
			||||||
                    if (!ip_input.connected()){
 | 
					                    if (!ip_input.connected()){
 | 
				
			||||||
                      std::cout << "Push accepted!" << std::endl;
 | 
					                      std::cout << "Push accepted!" << std::endl;
 | 
				
			||||||
                      ip_input = (*usersIt).S;
 | 
					                      ip_input = (*usersIt).S;
 | 
				
			||||||
| 
						 | 
					@ -269,7 +372,7 @@ namespace Buffer{
 | 
				
			||||||
                      (*usersIt).Disconnect("Push denied - push already in progress!");
 | 
					                      (*usersIt).Disconnect("Push denied - push already in progress!");
 | 
				
			||||||
                    }
 | 
					                    }
 | 
				
			||||||
                  }else{
 | 
					                  }else{
 | 
				
			||||||
                    (*usersIt).Disconnect("Push denied - invalid IP address!");
 | 
					                    (*usersIt).Disconnect("Push denied - invalid IP address ("+waiting_ip+"!="+tmp.substr(2)+")!");
 | 
				
			||||||
                  }
 | 
					                  }
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
                if (tmp[0] == 'S'){
 | 
					                if (tmp[0] == 'S'){
 | 
				
			||||||
| 
						 | 
					@ -288,15 +391,18 @@ namespace Buffer{
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
              }
 | 
					              }
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
            (*usersIt).Send();
 | 
					            (*usersIt).Send(ringbuf, buffers);
 | 
				
			||||||
          }
 | 
					          }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    }//main loop
 | 
					    }//main loop
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    // disconnect listener
 | 
					    // disconnect listener
 | 
				
			||||||
    /// \todo Deal with EOF more nicely - doesn't send the end of the stream to all users!
 | 
					    if (FLV::Parse_Error){
 | 
				
			||||||
 | 
					      std::cout << "FLV parse error:" << FLV::Error_Str << std::endl;
 | 
				
			||||||
 | 
					    }else{
 | 
				
			||||||
      std::cout << "Reached EOF of input" << std::endl;
 | 
					      std::cout << "Reached EOF of input" << std::endl;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
    SS.close();
 | 
					    SS.close();
 | 
				
			||||||
    while (users.size() > 0){
 | 
					    while (users.size() > 0){
 | 
				
			||||||
      for (usersIt = users.begin(); usersIt != users.end(); usersIt++){
 | 
					      for (usersIt = users.begin(); usersIt != users.end(); usersIt++){
 | 
				
			||||||
| 
						 | 
					@ -304,7 +410,6 @@ namespace Buffer{
 | 
				
			||||||
        if (!(*usersIt).S.connected()){users.erase(usersIt);break;}
 | 
					        if (!(*usersIt).S.connected()){users.erase(usersIt);break;}
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    delete Strm;
 | 
					 | 
				
			||||||
    return 0;
 | 
					    return 0;
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -241,21 +241,31 @@ void Connector_RTMP::parseChunk(){
 | 
				
			||||||
        Socket.write(RTMPStream::SendCTL(5, RTMPStream::snd_window_size));//send window acknowledgement size (msg 5)
 | 
					        Socket.write(RTMPStream::SendCTL(5, RTMPStream::snd_window_size));//send window acknowledgement size (msg 5)
 | 
				
			||||||
        break;
 | 
					        break;
 | 
				
			||||||
      case 8:
 | 
					      case 8:
 | 
				
			||||||
        #if DEBUG >= 4
 | 
					 | 
				
			||||||
        fprintf(stderr, "Received audio data\n");
 | 
					 | 
				
			||||||
        #endif
 | 
					 | 
				
			||||||
        F.ChunkLoader(next);
 | 
					        F.ChunkLoader(next);
 | 
				
			||||||
        if (SS.connected()){
 | 
					        if (SS.connected()){
 | 
				
			||||||
 | 
					          #if DEBUG >= 4
 | 
				
			||||||
 | 
					          fprintf(stderr, "A");
 | 
				
			||||||
 | 
					          #endif
 | 
				
			||||||
          SS.write(std::string(F.data, F.len));
 | 
					          SS.write(std::string(F.data, F.len));
 | 
				
			||||||
 | 
					        }else{
 | 
				
			||||||
 | 
					          #if DEBUG >= 4
 | 
				
			||||||
 | 
					          fprintf(stderr, "Received useless audio data\n");
 | 
				
			||||||
 | 
					          #endif
 | 
				
			||||||
 | 
					          Socket.close();
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        break;
 | 
					        break;
 | 
				
			||||||
      case 9:
 | 
					      case 9:
 | 
				
			||||||
        #if DEBUG >= 4
 | 
					 | 
				
			||||||
        fprintf(stderr, "Received video data\n");
 | 
					 | 
				
			||||||
        #endif
 | 
					 | 
				
			||||||
        F.ChunkLoader(next);
 | 
					        F.ChunkLoader(next);
 | 
				
			||||||
        if (SS.connected()){
 | 
					        if (SS.connected()){
 | 
				
			||||||
 | 
					          #if DEBUG >= 4
 | 
				
			||||||
 | 
					          fprintf(stderr, "V");
 | 
				
			||||||
 | 
					          #endif
 | 
				
			||||||
          SS.write(std::string(F.data, F.len));
 | 
					          SS.write(std::string(F.data, F.len));
 | 
				
			||||||
 | 
					        }else{
 | 
				
			||||||
 | 
					          #if DEBUG >= 4
 | 
				
			||||||
 | 
					          fprintf(stderr, "Received useless video data\n");
 | 
				
			||||||
 | 
					          #endif
 | 
				
			||||||
 | 
					          Socket.close();
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        break;
 | 
					        break;
 | 
				
			||||||
      case 15:
 | 
					      case 15:
 | 
				
			||||||
| 
						 | 
					@ -348,6 +358,9 @@ void Connector_RTMP::parseChunk(){
 | 
				
			||||||
            Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
 | 
					            Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
 | 
				
			||||||
            parsed3 = true;
 | 
					            parsed3 = true;
 | 
				
			||||||
          }//createStream
 | 
					          }//createStream
 | 
				
			||||||
 | 
					          if ((amfdata.getContentP(0)->StrValue() == "closeStream") || (amfdata.getContentP(0)->StrValue() == "deleteStream")){
 | 
				
			||||||
 | 
					            if (SS.connected()){SS.close();}
 | 
				
			||||||
 | 
					          }
 | 
				
			||||||
          if ((amfdata.getContentP(0)->StrValue() == "getStreamLength") || (amfdata.getContentP(0)->StrValue() == "getMovLen")){
 | 
					          if ((amfdata.getContentP(0)->StrValue() == "getStreamLength") || (amfdata.getContentP(0)->StrValue() == "getMovLen")){
 | 
				
			||||||
            //send a _result reply
 | 
					            //send a _result reply
 | 
				
			||||||
            AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
 | 
					            AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
 | 
				
			||||||
| 
						 | 
					@ -569,6 +582,9 @@ void Connector_RTMP::parseChunk(){
 | 
				
			||||||
          Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
 | 
					          Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
 | 
				
			||||||
          parsed = true;
 | 
					          parsed = true;
 | 
				
			||||||
        }//createStream
 | 
					        }//createStream
 | 
				
			||||||
 | 
					        if ((amfdata.getContentP(0)->StrValue() == "closeStream") || (amfdata.getContentP(0)->StrValue() == "deleteStream")){
 | 
				
			||||||
 | 
					          if (SS.connected()){SS.close();}
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
        if ((amfdata.getContentP(0)->StrValue() == "getStreamLength") || (amfdata.getContentP(0)->StrValue() == "getMovLen")){
 | 
					        if ((amfdata.getContentP(0)->StrValue() == "getStreamLength") || (amfdata.getContentP(0)->StrValue() == "getMovLen")){
 | 
				
			||||||
          //send a _result reply
 | 
					          //send a _result reply
 | 
				
			||||||
          AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
 | 
					          AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -580,7 +580,14 @@ bool FLV::Tag::MemLoader(char * D, unsigned int S, unsigned int & P){
 | 
				
			||||||
        len += (data[2] << 8);
 | 
					        len += (data[2] << 8);
 | 
				
			||||||
        len += (data[1] << 16);
 | 
					        len += (data[1] << 16);
 | 
				
			||||||
        if (buf < len){data = (char*)realloc(data, len); buf = len;}
 | 
					        if (buf < len){data = (char*)realloc(data, len); buf = len;}
 | 
				
			||||||
        if (data[0] > 0x12){FLV::Parse_Error = true; Error_Str = "Invalid Tag received."; return false;}
 | 
					        if (data[0] > 0x12){
 | 
				
			||||||
 | 
					          data[0] += 32;
 | 
				
			||||||
 | 
					          FLV::Parse_Error = true;
 | 
				
			||||||
 | 
					          Error_Str = "Invalid Tag received (";
 | 
				
			||||||
 | 
					          Error_Str += data[0];
 | 
				
			||||||
 | 
					          Error_Str += ").";
 | 
				
			||||||
 | 
					          return false;
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
        done = false;
 | 
					        done = false;
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
| 
						 | 
					@ -607,20 +614,11 @@ bool FLV::Tag::MemLoader(char * D, unsigned int S, unsigned int & P){
 | 
				
			||||||
/// \param sock Socket to read from.
 | 
					/// \param sock Socket to read from.
 | 
				
			||||||
/// \return True if count bytes are read succesfully, false otherwise.
 | 
					/// \return True if count bytes are read succesfully, false otherwise.
 | 
				
			||||||
bool FLV::Tag::SockReadUntil(char * buffer, unsigned int count, unsigned int & sofar, Socket::Connection & sock){
 | 
					bool FLV::Tag::SockReadUntil(char * buffer, unsigned int count, unsigned int & sofar, Socket::Connection & sock){
 | 
				
			||||||
  if (sofar == count){return true;}
 | 
					  if (sofar >= count){return true;}
 | 
				
			||||||
  if (!sock.read(buffer + sofar,count-sofar)){
 | 
					  int r = 0;
 | 
				
			||||||
    if (errno != EWOULDBLOCK){
 | 
					  r = sock.iread(buffer + sofar,count-sofar);
 | 
				
			||||||
      FLV::Parse_Error = true;
 | 
					  sofar += r;
 | 
				
			||||||
      Error_Str = "Error reading from socket.";
 | 
					  if (sofar >= count){return true;}
 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
    return false;
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
  sofar += count-sofar;
 | 
					 | 
				
			||||||
  if (sofar == count){return true;}
 | 
					 | 
				
			||||||
  if (sofar > count){
 | 
					 | 
				
			||||||
    FLV::Parse_Error = true;
 | 
					 | 
				
			||||||
    Error_Str = "Socket buffer overflow.";
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
  return false;
 | 
					  return false;
 | 
				
			||||||
}//Tag::SockReadUntil
 | 
					}//Tag::SockReadUntil
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -647,7 +645,14 @@ bool FLV::Tag::SockLoader(Socket::Connection sock){
 | 
				
			||||||
        len += (data[2] << 8);
 | 
					        len += (data[2] << 8);
 | 
				
			||||||
        len += (data[1] << 16);
 | 
					        len += (data[1] << 16);
 | 
				
			||||||
        if (buf < len){data = (char*)realloc(data, len); buf = len;}
 | 
					        if (buf < len){data = (char*)realloc(data, len); buf = len;}
 | 
				
			||||||
        if (data[0] > 0x12){FLV::Parse_Error = true; Error_Str = "Invalid Tag received."; return false;}
 | 
					        if (data[0] > 0x12){
 | 
				
			||||||
 | 
					          data[0] += 32;
 | 
				
			||||||
 | 
					          FLV::Parse_Error = true;
 | 
				
			||||||
 | 
					          Error_Str = "Invalid Tag received (";
 | 
				
			||||||
 | 
					          Error_Str += data[0];
 | 
				
			||||||
 | 
					          Error_Str += ").";
 | 
				
			||||||
 | 
					          return false;
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
        done = false;
 | 
					        done = false;
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
| 
						 | 
					@ -719,7 +724,14 @@ bool FLV::Tag::FileLoader(FILE * f){
 | 
				
			||||||
        len += (data[2] << 8);
 | 
					        len += (data[2] << 8);
 | 
				
			||||||
        len += (data[1] << 16);
 | 
					        len += (data[1] << 16);
 | 
				
			||||||
        if (buf < len){data = (char*)realloc(data, len); buf = len;}
 | 
					        if (buf < len){data = (char*)realloc(data, len); buf = len;}
 | 
				
			||||||
        if (data[0] > 0x12){FLV::Parse_Error = true; Error_Str = "Invalid Tag received."; return false;}
 | 
					        if (data[0] > 0x12){
 | 
				
			||||||
 | 
					          data[0] += 32;
 | 
				
			||||||
 | 
					          FLV::Parse_Error = true;
 | 
				
			||||||
 | 
					          Error_Str = "Invalid Tag received (";
 | 
				
			||||||
 | 
					          Error_Str += data[0];
 | 
				
			||||||
 | 
					          Error_Str += ").";
 | 
				
			||||||
 | 
					          return false;
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
        done = false;
 | 
					        done = false;
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue