Rewrite Buffer programma
This commit is contained in:
		
							parent
							
								
									273f30784b
								
							
						
					
					
						commit
						47370e9621
					
				
					 7 changed files with 175 additions and 147 deletions
				
			
		|  | @ -1,4 +1,4 @@ | |||
| SRC = main.cpp ../sockets/sw_base.cpp ../sockets/sw_inet.cpp ../sockets/sw_unix.cpp | ||||
| SRC = main.cpp | ||||
| OBJ = $(SRC:.cpp=.o) | ||||
| OUT = Buffer | ||||
| INCLUDES =  | ||||
|  |  | |||
|  | @ -1,7 +0,0 @@ | |||
| #pragma once | ||||
| 
 | ||||
| struct buffer{ | ||||
|   int number; | ||||
|   bool iskeyframe; | ||||
|   FLV_Pack * FLV; | ||||
| };//buffer
 | ||||
							
								
								
									
										187
									
								
								Buffer/main.cpp
									
										
									
									
									
								
							
							
						
						
									
										187
									
								
								Buffer/main.cpp
									
										
									
									
									
								
							|  | @ -1,62 +1,143 @@ | |||
| #include <unistd.h> | ||||
| #include <fcntl.h> | ||||
| #include <iostream> | ||||
| #include "../sockets/SocketW.h" | ||||
| #include <string> | ||||
| #include <vector> | ||||
| #include <cstdlib> | ||||
| #include <cstdio> | ||||
| #include <string.h> | ||||
| #include <unistd.h> | ||||
| #include <signal.h> | ||||
| #include "../util/flv.cpp" //FLV format parser
 | ||||
| #include "user.cpp" | ||||
| #include "../util/ddv_socket.cpp" //DDV Socket lib
 | ||||
| 
 | ||||
| #include <sys/epoll.h> | ||||
| 
 | ||||
| int get_empty( user ** list, int amount ) { | ||||
|   for (int i = 0; i < amount; i++ ){ | ||||
|     if (!list[i]->is_connected){return i;} | ||||
|   } | ||||
|   return -1; | ||||
| void termination_handler (int signum){ | ||||
|   return; | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| struct buffer{ | ||||
|   int number; | ||||
|   bool iskeyframe; | ||||
|   FLV_Pack * FLV; | ||||
|   buffer(){ | ||||
|     number = -1; | ||||
|     iskeyframe = false; | ||||
|     FLV = 0; | ||||
|   }//constructor
 | ||||
| };//buffer
 | ||||
| 
 | ||||
| class user{ | ||||
|   public: | ||||
|     int MyBuffer; | ||||
|     int MyBuffer_num; | ||||
|     int MyBuffer_len; | ||||
|     int MyNum; | ||||
|     int currsend; | ||||
|     void * lastpointer; | ||||
|     static int UserCount; | ||||
|     int s; | ||||
|     user(int fd){ | ||||
|       s = fd; | ||||
|       MyNum = UserCount++; | ||||
|       std::cout << "User " << MyNum << " connected" << std::endl; | ||||
|     }//constructor
 | ||||
|     void Disconnect(std::string reason) { | ||||
|       if (s != -1) { | ||||
|         close(s); | ||||
|         s = -1; | ||||
|         std::cout << "Disconnected user " << MyNum << ": " << reason << std::endl; | ||||
|       } | ||||
|     }//Disconnect
 | ||||
|     bool doSend(char * buffer, int todo){ | ||||
|       int r = send(s, buffer+currsend, todo-currsend, 0); | ||||
|       if (r <= 0){ | ||||
|         if ((r < 0) && (errno == EWOULDBLOCK)){return false;} | ||||
|         Disconnect("Connection closed"); | ||||
|         return false; | ||||
|       } | ||||
|       currsend += r; | ||||
|       return (currsend == todo); | ||||
|     } | ||||
|     void Send(buffer ** ringbuf, int buffers){ | ||||
|       //not connected? cancel
 | ||||
|       if (s < 0){return;} | ||||
|       //still waiting for next buffer? check it
 | ||||
|       if (MyBuffer_num < 0){ | ||||
|         MyBuffer_num = ringbuf[MyBuffer]->number; | ||||
|         //still waiting? don't crash - wait longer.
 | ||||
|         if (MyBuffer_num < 0){ | ||||
|           return; | ||||
|         }else{ | ||||
|           MyBuffer_len = ringbuf[MyBuffer]->FLV->len; | ||||
|           lastpointer = ringbuf[MyBuffer]->FLV->data; | ||||
|         } | ||||
|       } | ||||
|       if (lastpointer != ringbuf[MyBuffer]->FLV->data){ | ||||
|         Disconnect("Buffer resize at wrong time... had to disconnect"); | ||||
|         return; | ||||
|       } | ||||
|       if (doSend(ringbuf[MyBuffer]->FLV->data, MyBuffer_len)){ | ||||
|         //completed a send - switch to next buffer
 | ||||
|         if ((ringbuf[MyBuffer]->number != MyBuffer_num)){ | ||||
|           std::cout << "Warning: User " << MyNum << " was send corrupt video data and send to the next keyframe!" << std::endl; | ||||
|           int nocrashcount = 0; | ||||
|           do{ | ||||
|             MyBuffer++; | ||||
|             nocrashcount++; | ||||
|             MyBuffer %= buffers; | ||||
|           }while(!ringbuf[MyBuffer]->FLV->isKeyframe && (nocrashcount < buffers)); | ||||
|           if (nocrashcount >= buffers){ | ||||
|             std::cout << "Warning: No keyframe found in buffers! Skipping search for now..." << std::endl; | ||||
|             return; | ||||
|           } | ||||
|         }else{ | ||||
|           MyBuffer++; | ||||
|           MyBuffer %= buffers; | ||||
|         } | ||||
|         MyBuffer_num = -1; | ||||
|         lastpointer = 0; | ||||
|         currsend = 0; | ||||
|       }//completed a send
 | ||||
|     }//send
 | ||||
| }; | ||||
| int user::UserCount = 0; | ||||
| 
 | ||||
| int main( int argc, char * argv[] ) { | ||||
|   struct sigaction new_action; | ||||
|   new_action.sa_handler = termination_handler; | ||||
|   sigemptyset (&new_action.sa_mask); | ||||
|   new_action.sa_flags = 0; | ||||
|   sigaction (SIGPIPE, &new_action, NULL); | ||||
|    | ||||
|   if (argc < 2) { | ||||
|     std::cout << "usage: " << argv[0] << " buffers_count [streamname]" << std::endl; | ||||
|     return 1; | ||||
|   } | ||||
|   int metabuflen = 0; | ||||
|   char * metabuffer = 0; | ||||
|   int buffers = atoi(argv[1]); | ||||
|   buffer ** ringbuf = (buffer**) calloc (buffers,sizeof(buffer*)); | ||||
|   std::vector<user> connectionList; | ||||
|   std::vector<user>::iterator connIt; | ||||
|   for (int i = 0; i < buffers; ++i) ringbuf[i] = new buffer; | ||||
|   int current_buffer = 0; | ||||
|   int lastproper = 0;//last properly finished buffer number
 | ||||
|   unsigned int loopcount = 0; | ||||
|   SWUnixSocket listener(SWBaseSocket::nonblocking); | ||||
|   SWBaseSocket * incoming = 0; | ||||
|   SWBaseSocket::SWBaseError BError; | ||||
| 
 | ||||
|   std::string shared_socket = "/tmp/shared_socket"; | ||||
|   if (argc > 2){ | ||||
|     shared_socket = argv[2]; | ||||
|     shared_socket = "/tmp/shared_socket_" + shared_socket; | ||||
|   } | ||||
|   unlink(shared_socket.c_str()); | ||||
|   listener.bind(shared_socket.c_str()); | ||||
|   listener.listen(50); | ||||
|   listener.set_timeout(0,50000); | ||||
| 
 | ||||
|   int metabuflen = 0; | ||||
|   char * metabuffer = 0; | ||||
|   int buffers = atoi(argv[1]); | ||||
|   buffer ** ringbuf = (buffer**) calloc (buffers,sizeof(buffer*)); | ||||
|   std::vector<user> users; | ||||
|   std::vector<user>::iterator usersIt; | ||||
|   for (int i = 0; i < buffers; ++i) ringbuf[i] = new buffer; | ||||
|   int current_buffer = 0; | ||||
|   int lastproper = 0;//last properly finished buffer number
 | ||||
|   unsigned int loopcount = 0; | ||||
|   int listener = DDV_UnixListen(shared_socket, true); | ||||
|   int incoming = 0; | ||||
| 
 | ||||
|   unsigned char packtype; | ||||
|   bool gotVideoInfo = false; | ||||
|   bool gotAudioInfo = false; | ||||
| 
 | ||||
|   //set stdin to be nonblocking
 | ||||
|   //int flags = fcntl(0, F_GETFL, 0);
 | ||||
|   //flags |= O_NONBLOCK;
 | ||||
|   //fcntl(0, F_SETFL, flags);
 | ||||
| 
 | ||||
|   int infile = fileno(stdin); | ||||
|   int poller = epoll_create(1); | ||||
|   struct epoll_event ev; | ||||
|  | @ -69,7 +150,7 @@ int main( int argc, char * argv[] ) { | |||
|   while(!feof(stdin) && !All_Hell_Broke_Loose){ | ||||
|     //invalidate the current buffer
 | ||||
|     ringbuf[current_buffer]->number = -1; | ||||
|     if ((epoll_wait(poller, events, 1, 100) > 0) && FLV_GetPacket(ringbuf[current_buffer]->FLV)){ | ||||
|     if ((epoll_wait(poller, events, 1, 10) > 0) && FLV_GetPacket(ringbuf[current_buffer]->FLV)){ | ||||
|       loopcount++; | ||||
|       packtype = ringbuf[current_buffer]->FLV->data[0]; | ||||
|       //store metadata, if available
 | ||||
|  | @ -124,35 +205,43 @@ int main( int argc, char * argv[] ) { | |||
|     } | ||||
|      | ||||
|     //check for new connections, accept them if there are any
 | ||||
|     incoming = listener.accept(&BError); | ||||
|     if (incoming){ | ||||
|       connectionList.push_back(user(incoming)); | ||||
|     incoming = DDV_Accept(listener, true); | ||||
|     if (incoming >= 0){ | ||||
|       users.push_back(incoming); | ||||
|       //send the FLV header
 | ||||
|       connectionList.back().MyBuffer = lastproper; | ||||
|       connectionList.back().MyBuffer_num = -1; | ||||
|       users.back().currsend = 0; | ||||
|       users.back().MyBuffer = lastproper; | ||||
|       users.back().MyBuffer_num = -1; | ||||
|       //TODO: Do this more nicely?
 | ||||
|       if (connectionList.back().Conn->send(FLVHeader,13,&BError) != 13){ | ||||
|         connectionList.back().disconnect("failed to receive the header!"); | ||||
|       if (!DDV_write(FLVHeader, 13, incoming)){ | ||||
|         users.back().Disconnect("failed to receive the header!"); | ||||
|       }else{ | ||||
|         if (connectionList.back().Conn->send(metabuffer,metabuflen,&BError) != metabuflen){ | ||||
|           connectionList.back().disconnect("failed to receive metadata!"); | ||||
|         if (!DDV_write(metabuffer, metabuflen, incoming)){ | ||||
|           users.back().Disconnect("failed to receive metadata!"); | ||||
|         } | ||||
|       } | ||||
|       if (BError != SWBaseSocket::ok){ | ||||
|         connectionList.back().disconnect("Socket error: " + BError.get_error()); | ||||
|       } | ||||
|     } | ||||
|      | ||||
|     //send all connections what they need, if and when they need it
 | ||||
|     if (connectionList.size() > 0){ | ||||
|       for (connIt = connectionList.begin(); connIt != connectionList.end(); connIt++){ | ||||
|         if (!(*connIt).is_connected){connectionList.erase(connIt);break;} | ||||
|         (*connIt).Send(ringbuf, buffers); | ||||
|     if (users.size() > 0){ | ||||
|       for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ | ||||
|         if ((*usersIt).s == -1){ | ||||
|           users.erase(usersIt); break; | ||||
|         }else{ | ||||
|           (*usersIt).Send(ringbuf, buffers); | ||||
|         } | ||||
|       } | ||||
|     } | ||||
|   }//main loop
 | ||||
| 
 | ||||
|   // disconnect listener
 | ||||
|   std::cout << "Reached EOF of input" << std::endl; | ||||
|   listener.disconnect(&BError); | ||||
|   close(listener); | ||||
|   while (users.size() > 0){ | ||||
|     for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ | ||||
|       (*usersIt).Disconnect("Shutting down..."); | ||||
|       if ((*usersIt).s == -1){users.erase(usersIt);break;} | ||||
|     } | ||||
|   } | ||||
|   return 0; | ||||
| } | ||||
|  |  | |||
|  | @ -1,84 +0,0 @@ | |||
| #include "buffer.h" | ||||
| #include "../sockets/SocketW.h" | ||||
| #include <iostream> | ||||
| 
 | ||||
| class user{ | ||||
|   public: | ||||
|     user(SWBaseSocket * newConn); | ||||
|     void disconnect(std::string reason); | ||||
|     void Send(buffer ** ringbuf, int buffers); | ||||
|     bool is_connected; | ||||
|     SWUnixSocket * Conn; | ||||
|     int MyBuffer; | ||||
|     int MyBuffer_num; | ||||
|     int MyBuffer_len; | ||||
|     int MyNum; | ||||
|     void * lastpointer; | ||||
|     static int UserCount; | ||||
|     static SWBaseSocket::SWBaseError err; | ||||
| };//user
 | ||||
| 
 | ||||
| int user::UserCount = 0; | ||||
| SWBaseSocket::SWBaseError user::err; | ||||
| 
 | ||||
| user::user(SWBaseSocket * newConn) { | ||||
|   Conn = (SWUnixSocket*)newConn; | ||||
|   is_connected = (Conn != 0); | ||||
|   MyNum = UserCount++; | ||||
|   std::cout << "User " << MyNum << " connected" << std::endl; | ||||
| } | ||||
| 
 | ||||
| void user::disconnect(std::string reason) { | ||||
|   if (Conn) { | ||||
|     Conn->disconnect(&err); | ||||
|     Conn = NULL; | ||||
|     std::cout << "Disconnected user " << MyNum << ": " << reason << std::endl; | ||||
|   } | ||||
|   is_connected = false; | ||||
| } | ||||
| 
 | ||||
| void user::Send(buffer ** ringbuf, int buffers){ | ||||
|   //not connected? cancel
 | ||||
|   if (!is_connected){return;} | ||||
|   //still waiting for next buffer? check it
 | ||||
|   if (MyBuffer_num < 0){ | ||||
|     MyBuffer_num = ringbuf[MyBuffer]->number; | ||||
|     //still waiting? don't crash - wait longer.
 | ||||
|     if (MyBuffer_num < 0){ | ||||
|       return; | ||||
|     }else{ | ||||
|       MyBuffer_len = ringbuf[MyBuffer]->FLV->len; | ||||
|       lastpointer = ringbuf[MyBuffer]->FLV->data; | ||||
|     } | ||||
|   } | ||||
|   if (lastpointer != ringbuf[MyBuffer]->FLV->data){ | ||||
|     disconnect("Buffer resize at wrong time... had to disconnect"); | ||||
|     return; | ||||
|   } | ||||
|   int ret = Conn->fsend(ringbuf[MyBuffer]->FLV->data, MyBuffer_len, &err); | ||||
|   if ((err != SWBaseSocket::ok) && (err != SWBaseSocket::notReady)){ | ||||
|     disconnect("Socket error: " + err.get_error()); | ||||
|     return; | ||||
|   } | ||||
|   if (ret == MyBuffer_len){ | ||||
|     //completed a send - switch to next buffer
 | ||||
|     if ((ringbuf[MyBuffer]->number != MyBuffer_num)){ | ||||
|       std::cout << "Warning: User " << MyNum << " was send corrupt video data and send to the next keyframe!" << std::endl; | ||||
|       int nocrashcount = 0; | ||||
|       do{ | ||||
|         MyBuffer++; | ||||
|         nocrashcount++; | ||||
|         MyBuffer %= buffers; | ||||
|       }while(!ringbuf[MyBuffer]->FLV->isKeyframe && (nocrashcount < buffers)); | ||||
|       if (nocrashcount >= buffers){ | ||||
|         std::cout << "Warning: No keyframe found in buffers! Skipping search for now..." << std::endl; | ||||
|         return; | ||||
|       } | ||||
|     }else{ | ||||
|       MyBuffer++; | ||||
|       MyBuffer %= buffers; | ||||
|     } | ||||
|     MyBuffer_num = -1; | ||||
|     lastpointer = 0; | ||||
|   } | ||||
| } | ||||
|  | @ -1,4 +1,4 @@ | |||
| SRC = main.cpp ../sockets/sw_base.cpp ../sockets/sw_inet.cpp ../sockets/sw_unix.cpp | ||||
| SRC = main.cpp | ||||
| OBJ = $(SRC:.cpp=.o) | ||||
| OUT = Connector_RTMP | ||||
| INCLUDES =  | ||||
|  |  | |||
							
								
								
									
										3
									
								
								Makefile
									
										
									
									
									
								
							
							
						
						
									
										3
									
								
								Makefile
									
										
									
									
									
								
							|  | @ -14,6 +14,9 @@ client-clean: | |||
| 	cd Buffer; $(MAKE) clean | ||||
| clean: client-clean | ||||
| client-install: client-clean client | ||||
| 	mkdir /tmp/cores | ||||
| 	chmod 777 /tmp/cores | ||||
| 	echo "/tmp/cores/%e.%s.%p" > /proc/sys/kernel/core_pattern | ||||
| 	service xinetd stop | ||||
| 	cp -f ./Connector_HTTP/Connector_HTTP /usr/bin/ | ||||
| 	cd Connector_RTMP; $(MAKE) install | ||||
|  |  | |||
|  | @ -33,7 +33,6 @@ int DDV_OpenUnix(std::string adres, bool nonblock = false){ | |||
| 
 | ||||
| int DDV_Listen(int port){ | ||||
|   int s = socket(AF_INET, SOCK_STREAM, 0); | ||||
| 
 | ||||
|   int on = 1; | ||||
|   setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); | ||||
|   struct sockaddr_in addr; | ||||
|  | @ -57,9 +56,37 @@ int DDV_Listen(int port){ | |||
|   } | ||||
| } | ||||
| 
 | ||||
| int DDV_UnixListen(std::string adres, bool nonblock = false){ | ||||
|   unlink(adres.c_str()); | ||||
|   int s = socket(AF_UNIX, SOCK_STREAM, 0); | ||||
|   if (nonblock){ | ||||
|     int flags = fcntl(s, F_GETFL, 0); | ||||
|     flags |= O_NONBLOCK; | ||||
|     fcntl(s, F_SETFL, flags); | ||||
|   } | ||||
|   sockaddr_un addr; | ||||
|   addr.sun_family = AF_UNIX; | ||||
|   strncpy(addr.sun_path, adres.c_str(), adres.size()+1); | ||||
|   int ret = bind(s, (sockaddr*)&addr, sizeof(addr)); | ||||
|   if (ret == 0){ | ||||
|     ret = listen(s, 100);//start listening, backlog of 100 allowed
 | ||||
|     if (ret == 0){ | ||||
|       return s; | ||||
|     }else{ | ||||
|       fprintf(stderr, "Listen failed! Error: %s\n", strerror(errno)); | ||||
|       close(s); | ||||
|       return 0; | ||||
|     } | ||||
|   }else{ | ||||
|     fprintf(stderr, "Binding failed! Error: %s\n", strerror(errno)); | ||||
|     close(s); | ||||
|     return 0; | ||||
|   } | ||||
| } | ||||
| 
 | ||||
| int DDV_Accept(int sock, bool nonblock = false){ | ||||
|   int r = accept(sock, 0, 0); | ||||
|   if ((r > 0) && nonblock){ | ||||
|   if ((r >= 0) && nonblock){ | ||||
|     int flags = fcntl(r, F_GETFL, 0); | ||||
|     flags |= O_NONBLOCK; | ||||
|     fcntl(r, F_SETFL, flags); | ||||
|  | @ -126,11 +153,11 @@ int DDV_iwrite(void * buffer, int todo, int sock){ | |||
|   int r = send(sock, buffer, todo, 0); | ||||
|   if (r < 0){ | ||||
|     switch (errno){ | ||||
|       case EWOULDBLOCK: break; | ||||
|       case EWOULDBLOCK: return 0; break; | ||||
|       default: | ||||
|         socketError = true; | ||||
|         fprintf(stderr, "Could not write! %s\n", strerror(errno)); | ||||
|         return false; | ||||
|         return 0; | ||||
|         break; | ||||
|     } | ||||
|   } | ||||
|  | @ -145,7 +172,7 @@ int DDV_iread(void * buffer, int todo, int sock){ | |||
|       default: | ||||
|         socketError = true; | ||||
|         fprintf(stderr, "Could not read! %s\n", strerror(errno)); | ||||
|         return false; | ||||
|         return 0; | ||||
|         break; | ||||
|     } | ||||
|   } | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue
	
	 Thulinma
						Thulinma