From 47370e9621a301fd727d29194b608caa451dd430 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Sat, 20 Nov 2010 19:54:44 +0100 Subject: [PATCH] Rewrite Buffer programma --- Buffer/Makefile | 2 +- Buffer/buffer.h | 7 -- Buffer/main.cpp | 187 +++++++++++++++++++++++++++++----------- Buffer/user.cpp | 84 ------------------ Connector_RTMP/Makefile | 2 +- Makefile | 3 + util/ddv_socket.cpp | 37 ++++++-- 7 files changed, 175 insertions(+), 147 deletions(-) delete mode 100644 Buffer/buffer.h delete mode 100644 Buffer/user.cpp diff --git a/Buffer/Makefile b/Buffer/Makefile index a1e7bd97..aa683d41 100644 --- a/Buffer/Makefile +++ b/Buffer/Makefile @@ -1,4 +1,4 @@ -SRC = main.cpp ../sockets/sw_base.cpp ../sockets/sw_inet.cpp ../sockets/sw_unix.cpp +SRC = main.cpp OBJ = $(SRC:.cpp=.o) OUT = Buffer INCLUDES = diff --git a/Buffer/buffer.h b/Buffer/buffer.h deleted file mode 100644 index 5a18603f..00000000 --- a/Buffer/buffer.h +++ /dev/null @@ -1,7 +0,0 @@ -#pragma once - -struct buffer{ - int number; - bool iskeyframe; - FLV_Pack * FLV; -};//buffer diff --git a/Buffer/main.cpp b/Buffer/main.cpp index 638524cb..77376630 100644 --- a/Buffer/main.cpp +++ b/Buffer/main.cpp @@ -1,62 +1,143 @@ -#include #include #include -#include "../sockets/SocketW.h" #include #include #include #include #include #include +#include #include "../util/flv.cpp" //FLV format parser -#include "user.cpp" +#include "../util/ddv_socket.cpp" //DDV Socket lib #include -int get_empty( user ** list, int amount ) { - for (int i = 0; i < amount; i++ ){ - if (!list[i]->is_connected){return i;} - } - return -1; +void termination_handler (int signum){ + return; } + +struct buffer{ + int number; + bool iskeyframe; + FLV_Pack * FLV; + buffer(){ + number = -1; + iskeyframe = false; + FLV = 0; + }//constructor +};//buffer + +class user{ + public: + int MyBuffer; + int MyBuffer_num; + int MyBuffer_len; + int MyNum; + int currsend; + void * lastpointer; + static int UserCount; + int s; + user(int fd){ + s = fd; + MyNum = UserCount++; + std::cout << "User " << MyNum << " connected" << std::endl; + }//constructor + void Disconnect(std::string reason) { + if (s != -1) { + close(s); + s = -1; + std::cout << "Disconnected user " << MyNum << ": " << reason << std::endl; + } + }//Disconnect + bool doSend(char * buffer, int todo){ + int r = send(s, buffer+currsend, todo-currsend, 0); + if (r <= 0){ + if ((r < 0) && (errno == EWOULDBLOCK)){return false;} + Disconnect("Connection closed"); + return false; + } + currsend += r; + return (currsend == todo); + } + void Send(buffer ** ringbuf, int buffers){ + //not connected? cancel + if (s < 0){return;} + //still waiting for next buffer? check it + if (MyBuffer_num < 0){ + MyBuffer_num = ringbuf[MyBuffer]->number; + //still waiting? don't crash - wait longer. + if (MyBuffer_num < 0){ + return; + }else{ + MyBuffer_len = ringbuf[MyBuffer]->FLV->len; + lastpointer = ringbuf[MyBuffer]->FLV->data; + } + } + if (lastpointer != ringbuf[MyBuffer]->FLV->data){ + Disconnect("Buffer resize at wrong time... had to disconnect"); + return; + } + if (doSend(ringbuf[MyBuffer]->FLV->data, MyBuffer_len)){ + //completed a send - switch to next buffer + if ((ringbuf[MyBuffer]->number != MyBuffer_num)){ + std::cout << "Warning: User " << MyNum << " was send corrupt video data and send to the next keyframe!" << std::endl; + int nocrashcount = 0; + do{ + MyBuffer++; + nocrashcount++; + MyBuffer %= buffers; + }while(!ringbuf[MyBuffer]->FLV->isKeyframe && (nocrashcount < buffers)); + if (nocrashcount >= buffers){ + std::cout << "Warning: No keyframe found in buffers! Skipping search for now..." << std::endl; + return; + } + }else{ + MyBuffer++; + MyBuffer %= buffers; + } + MyBuffer_num = -1; + lastpointer = 0; + currsend = 0; + }//completed a send + }//send +}; +int user::UserCount = 0; + int main( int argc, char * argv[] ) { + struct sigaction new_action; + new_action.sa_handler = termination_handler; + sigemptyset (&new_action.sa_mask); + new_action.sa_flags = 0; + sigaction (SIGPIPE, &new_action, NULL); + if (argc < 2) { std::cout << "usage: " << argv[0] << " buffers_count [streamname]" << std::endl; return 1; } - int metabuflen = 0; - char * metabuffer = 0; - int buffers = atoi(argv[1]); - buffer ** ringbuf = (buffer**) calloc (buffers,sizeof(buffer*)); - std::vector connectionList; - std::vector::iterator connIt; - for (int i = 0; i < buffers; ++i) ringbuf[i] = new buffer; - int current_buffer = 0; - int lastproper = 0;//last properly finished buffer number - unsigned int loopcount = 0; - SWUnixSocket listener(SWBaseSocket::nonblocking); - SWBaseSocket * incoming = 0; - SWBaseSocket::SWBaseError BError; - std::string shared_socket = "/tmp/shared_socket"; if (argc > 2){ shared_socket = argv[2]; shared_socket = "/tmp/shared_socket_" + shared_socket; } - unlink(shared_socket.c_str()); - listener.bind(shared_socket.c_str()); - listener.listen(50); - listener.set_timeout(0,50000); + + int metabuflen = 0; + char * metabuffer = 0; + int buffers = atoi(argv[1]); + buffer ** ringbuf = (buffer**) calloc (buffers,sizeof(buffer*)); + std::vector users; + std::vector::iterator usersIt; + for (int i = 0; i < buffers; ++i) ringbuf[i] = new buffer; + int current_buffer = 0; + int lastproper = 0;//last properly finished buffer number + unsigned int loopcount = 0; + int listener = DDV_UnixListen(shared_socket, true); + int incoming = 0; + unsigned char packtype; bool gotVideoInfo = false; bool gotAudioInfo = false; - //set stdin to be nonblocking - //int flags = fcntl(0, F_GETFL, 0); - //flags |= O_NONBLOCK; - //fcntl(0, F_SETFL, flags); - int infile = fileno(stdin); int poller = epoll_create(1); struct epoll_event ev; @@ -69,7 +150,7 @@ int main( int argc, char * argv[] ) { while(!feof(stdin) && !All_Hell_Broke_Loose){ //invalidate the current buffer ringbuf[current_buffer]->number = -1; - if ((epoll_wait(poller, events, 1, 100) > 0) && FLV_GetPacket(ringbuf[current_buffer]->FLV)){ + if ((epoll_wait(poller, events, 1, 10) > 0) && FLV_GetPacket(ringbuf[current_buffer]->FLV)){ loopcount++; packtype = ringbuf[current_buffer]->FLV->data[0]; //store metadata, if available @@ -124,35 +205,43 @@ int main( int argc, char * argv[] ) { } //check for new connections, accept them if there are any - incoming = listener.accept(&BError); - if (incoming){ - connectionList.push_back(user(incoming)); + incoming = DDV_Accept(listener, true); + if (incoming >= 0){ + users.push_back(incoming); //send the FLV header - connectionList.back().MyBuffer = lastproper; - connectionList.back().MyBuffer_num = -1; + users.back().currsend = 0; + users.back().MyBuffer = lastproper; + users.back().MyBuffer_num = -1; //TODO: Do this more nicely? - if (connectionList.back().Conn->send(FLVHeader,13,&BError) != 13){ - connectionList.back().disconnect("failed to receive the header!"); + if (!DDV_write(FLVHeader, 13, incoming)){ + users.back().Disconnect("failed to receive the header!"); }else{ - if (connectionList.back().Conn->send(metabuffer,metabuflen,&BError) != metabuflen){ - connectionList.back().disconnect("failed to receive metadata!"); + if (!DDV_write(metabuffer, metabuflen, incoming)){ + users.back().Disconnect("failed to receive metadata!"); } } - if (BError != SWBaseSocket::ok){ - connectionList.back().disconnect("Socket error: " + BError.get_error()); - } } + //send all connections what they need, if and when they need it - if (connectionList.size() > 0){ - for (connIt = connectionList.begin(); connIt != connectionList.end(); connIt++){ - if (!(*connIt).is_connected){connectionList.erase(connIt);break;} - (*connIt).Send(ringbuf, buffers); + if (users.size() > 0){ + for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ + if ((*usersIt).s == -1){ + users.erase(usersIt); break; + }else{ + (*usersIt).Send(ringbuf, buffers); + } } } }//main loop // disconnect listener std::cout << "Reached EOF of input" << std::endl; - listener.disconnect(&BError); + close(listener); + while (users.size() > 0){ + for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ + (*usersIt).Disconnect("Shutting down..."); + if ((*usersIt).s == -1){users.erase(usersIt);break;} + } + } return 0; } diff --git a/Buffer/user.cpp b/Buffer/user.cpp deleted file mode 100644 index 19c2878e..00000000 --- a/Buffer/user.cpp +++ /dev/null @@ -1,84 +0,0 @@ -#include "buffer.h" -#include "../sockets/SocketW.h" -#include - -class user{ - public: - user(SWBaseSocket * newConn); - void disconnect(std::string reason); - void Send(buffer ** ringbuf, int buffers); - bool is_connected; - SWUnixSocket * Conn; - int MyBuffer; - int MyBuffer_num; - int MyBuffer_len; - int MyNum; - void * lastpointer; - static int UserCount; - static SWBaseSocket::SWBaseError err; -};//user - -int user::UserCount = 0; -SWBaseSocket::SWBaseError user::err; - -user::user(SWBaseSocket * newConn) { - Conn = (SWUnixSocket*)newConn; - is_connected = (Conn != 0); - MyNum = UserCount++; - std::cout << "User " << MyNum << " connected" << std::endl; -} - -void user::disconnect(std::string reason) { - if (Conn) { - Conn->disconnect(&err); - Conn = NULL; - std::cout << "Disconnected user " << MyNum << ": " << reason << std::endl; - } - is_connected = false; -} - -void user::Send(buffer ** ringbuf, int buffers){ - //not connected? cancel - if (!is_connected){return;} - //still waiting for next buffer? check it - if (MyBuffer_num < 0){ - MyBuffer_num = ringbuf[MyBuffer]->number; - //still waiting? don't crash - wait longer. - if (MyBuffer_num < 0){ - return; - }else{ - MyBuffer_len = ringbuf[MyBuffer]->FLV->len; - lastpointer = ringbuf[MyBuffer]->FLV->data; - } - } - if (lastpointer != ringbuf[MyBuffer]->FLV->data){ - disconnect("Buffer resize at wrong time... had to disconnect"); - return; - } - int ret = Conn->fsend(ringbuf[MyBuffer]->FLV->data, MyBuffer_len, &err); - if ((err != SWBaseSocket::ok) && (err != SWBaseSocket::notReady)){ - disconnect("Socket error: " + err.get_error()); - return; - } - if (ret == MyBuffer_len){ - //completed a send - switch to next buffer - if ((ringbuf[MyBuffer]->number != MyBuffer_num)){ - std::cout << "Warning: User " << MyNum << " was send corrupt video data and send to the next keyframe!" << std::endl; - int nocrashcount = 0; - do{ - MyBuffer++; - nocrashcount++; - MyBuffer %= buffers; - }while(!ringbuf[MyBuffer]->FLV->isKeyframe && (nocrashcount < buffers)); - if (nocrashcount >= buffers){ - std::cout << "Warning: No keyframe found in buffers! Skipping search for now..." << std::endl; - return; - } - }else{ - MyBuffer++; - MyBuffer %= buffers; - } - MyBuffer_num = -1; - lastpointer = 0; - } -} diff --git a/Connector_RTMP/Makefile b/Connector_RTMP/Makefile index 8a32051b..65d7cf48 100644 --- a/Connector_RTMP/Makefile +++ b/Connector_RTMP/Makefile @@ -1,4 +1,4 @@ -SRC = main.cpp ../sockets/sw_base.cpp ../sockets/sw_inet.cpp ../sockets/sw_unix.cpp +SRC = main.cpp OBJ = $(SRC:.cpp=.o) OUT = Connector_RTMP INCLUDES = diff --git a/Makefile b/Makefile index 42ead03d..a963ea54 100644 --- a/Makefile +++ b/Makefile @@ -14,6 +14,9 @@ client-clean: cd Buffer; $(MAKE) clean clean: client-clean client-install: client-clean client + mkdir /tmp/cores + chmod 777 /tmp/cores + echo "/tmp/cores/%e.%s.%p" > /proc/sys/kernel/core_pattern service xinetd stop cp -f ./Connector_HTTP/Connector_HTTP /usr/bin/ cd Connector_RTMP; $(MAKE) install diff --git a/util/ddv_socket.cpp b/util/ddv_socket.cpp index dc4002c3..e7b6c099 100644 --- a/util/ddv_socket.cpp +++ b/util/ddv_socket.cpp @@ -33,7 +33,6 @@ int DDV_OpenUnix(std::string adres, bool nonblock = false){ int DDV_Listen(int port){ int s = socket(AF_INET, SOCK_STREAM, 0); - int on = 1; setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); struct sockaddr_in addr; @@ -57,9 +56,37 @@ int DDV_Listen(int port){ } } +int DDV_UnixListen(std::string adres, bool nonblock = false){ + unlink(adres.c_str()); + int s = socket(AF_UNIX, SOCK_STREAM, 0); + if (nonblock){ + int flags = fcntl(s, F_GETFL, 0); + flags |= O_NONBLOCK; + fcntl(s, F_SETFL, flags); + } + sockaddr_un addr; + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, adres.c_str(), adres.size()+1); + int ret = bind(s, (sockaddr*)&addr, sizeof(addr)); + if (ret == 0){ + ret = listen(s, 100);//start listening, backlog of 100 allowed + if (ret == 0){ + return s; + }else{ + fprintf(stderr, "Listen failed! Error: %s\n", strerror(errno)); + close(s); + return 0; + } + }else{ + fprintf(stderr, "Binding failed! Error: %s\n", strerror(errno)); + close(s); + return 0; + } +} + int DDV_Accept(int sock, bool nonblock = false){ int r = accept(sock, 0, 0); - if ((r > 0) && nonblock){ + if ((r >= 0) && nonblock){ int flags = fcntl(r, F_GETFL, 0); flags |= O_NONBLOCK; fcntl(r, F_SETFL, flags); @@ -126,11 +153,11 @@ int DDV_iwrite(void * buffer, int todo, int sock){ int r = send(sock, buffer, todo, 0); if (r < 0){ switch (errno){ - case EWOULDBLOCK: break; + case EWOULDBLOCK: return 0; break; default: socketError = true; fprintf(stderr, "Could not write! %s\n", strerror(errno)); - return false; + return 0; break; } } @@ -145,7 +172,7 @@ int DDV_iread(void * buffer, int todo, int sock){ default: socketError = true; fprintf(stderr, "Could not read! %s\n", strerror(errno)); - return false; + return 0; break; } }