From 2529d5c0f016fb88a5c438e4b05ecb49de36beb9 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Mon, 8 Nov 2010 15:28:56 +0100 Subject: [PATCH] Nieuwe flv parser, nieuwe unix sockets support... --- Buffer/main.cpp | 4 ++ Connector_RTMP/main.cpp | 49 ++++++++---------- util/ddv_socket.cpp | 52 +++++++++++++++++-- util/flv_sock.cpp | 108 ++++++++++++++++++++++++++++++---------- 4 files changed, 156 insertions(+), 57 deletions(-) diff --git a/Buffer/main.cpp b/Buffer/main.cpp index e81f80fa..8330108f 100644 --- a/Buffer/main.cpp +++ b/Buffer/main.cpp @@ -78,6 +78,10 @@ int main( int argc, char * argv[] ) { metabuffer = (char*)realloc(metabuffer, metabuflen); memcpy(metabuffer, ringbuf[current_buffer]->FLV->data, metabuflen); std::cout << "Received metadata!" << std::endl; + if (gotVideoInfo && gotAudioInfo){ + All_Hell_Broke_Loose = true; + std::cout << "... after proper video and audio? Cancelling broadcast!" << std::endl; + } gotVideoInfo = false; gotAudioInfo = false; } diff --git a/Connector_RTMP/main.cpp b/Connector_RTMP/main.cpp index 22c7fc4f..f04c6de2 100644 --- a/Connector_RTMP/main.cpp +++ b/Connector_RTMP/main.cpp @@ -10,15 +10,14 @@ #include //for connection to server -#include "../sockets/SocketW.h" bool ready4data = false;//set to true when streaming starts bool inited = false; bool stopparsing = false; timeval lastrec; int CONN_fd = 0; -#include "../util/flv_sock.cpp" //FLV parsing with SocketW #include "../util/ddv_socket.cpp" //DDVTech Socket wrapper +#include "../util/flv_sock.cpp" //FLV parsing with SocketW #include "parsechunks.cpp" //chunkstream parsing #include "handshake.cpp" //handshaking @@ -64,7 +63,8 @@ int main(){ unsigned int ts; unsigned int fts = 0; unsigned int ftst; - SWUnixSocket ss; + int ss; + FLV_Pack * tag; //first timestamp set firsttime = getNowMS(); @@ -98,7 +98,7 @@ int main(){ - while (!socketError){ + while (!socketError && !All_Hell_Broke_Loose){ //only parse input if available or not yet init'ed //rightnow = getNowMS(); retval = epoll_wait(poller, events, 1, 0); @@ -108,51 +108,44 @@ int main(){ if (ready4data){ if (!inited){ //we are ready, connect the socket! - if (!ss.connect(streamname.c_str())){ + ss = DDV_OpenUnix(streamname.c_str()); + if (ss <= 0){ #ifdef DEBUG fprintf(stderr, "Could not connect to server!\n"); #endif return 0; } - FLV_Readheader(ss);//read the header, we don't want it #ifdef DEBUG - fprintf(stderr, "Header read, starting to send video data...\n"); + fprintf(stderr, "Everything connected, starting to send video data...\n"); #endif inited = true; } //only send data if previous data has been ACK'ed... if (snd_cnt - snd_window_at < snd_window_size){ - if (FLV_GetPacket(ss)){//able to read a full packet? - ts = FLVbuffer[7] * 256*256*256; - ts += FLVbuffer[4] * 256*256; - ts += FLVbuffer[5] * 256; - ts += FLVbuffer[6]; + if (FLV_GetPacket(tag, ss)){//able to read a full packet? + ts = tag->data[7] * 256*256*256; + ts += tag->data[4] * 256*256; + ts += tag->data[5] * 256; + ts += tag->data[6]; if (ts != 0){ if (fts == 0){fts = ts;ftst = getNowMS();} ts -= fts; - FLVbuffer[7] = ts / (256*256*256); - FLVbuffer[4] = ts / (256*256); - FLVbuffer[5] = ts / 256; - FLVbuffer[6] = ts % 256; + tag->data[7] = ts / (256*256*256); + tag->data[4] = ts / (256*256); + tag->data[5] = ts / 256; + tag->data[6] = ts % 256; ts += ftst; }else{ ftst = getNowMS(); - FLVbuffer[7] = ftst / (256*256*256); - FLVbuffer[4] = ftst / (256*256); - FLVbuffer[5] = ftst / 256; - FLVbuffer[6] = ftst % 256; + tag->data[7] = ftst / (256*256*256); + tag->data[4] = ftst / (256*256); + tag->data[5] = ftst / 256; + tag->data[6] = ftst % 256; } - SendMedia((unsigned char)FLVbuffer[0], (unsigned char *)FLVbuffer+11, FLV_len-15, ts); + SendMedia((unsigned char)tag->data[0], (unsigned char *)tag->data+11, tag->len-15, ts); #ifdef DEBUG fprintf(stderr, "Sent a tag to %i\n", CONN_fd); #endif - FLV_Dump();//dump packet and get ready for next - } - if ((SWBerr != SWBaseSocket::ok) && (SWBerr != SWBaseSocket::notReady)){ - #ifdef DEBUG - fprintf(stderr, "No more data! :-( (%s)\n", SWBerr.get_error().c_str()); - #endif - return 0;//no more input possible! Fail immediately. } } } diff --git a/util/ddv_socket.cpp b/util/ddv_socket.cpp index 4e1341d2..caece508 100644 --- a/util/ddv_socket.cpp +++ b/util/ddv_socket.cpp @@ -1,13 +1,34 @@ #include #include +#include #include #include #include #include #include +#include bool socketError = false; +int DDV_OpenUnix(const char adres[], bool nonblock = false){ + int s = socket(AF_UNIX, SOCK_STREAM, 0); + sockaddr_un addr; + addr.sun_family = AF_UNIX; + strcpy(addr.sun_path, adres); + int r = connect(s, (sockaddr*)&adres, sizeof(addr)); + if (r == 0){ + if (nonblock){ + int flags = fcntl(s, F_GETFL, 0); + flags |= O_NONBLOCK; + fcntl(s, F_SETFL, flags); + } + return s; + }else{ + close(s); + return 0; + } +} + int DDV_Listen(int port){ int s = socket(AF_INET, SOCK_STREAM, 0); @@ -36,9 +57,8 @@ int DDV_Accept(int sock){ return accept(sock, 0, 0); } -bool DDV_write(void * buffer, int width, int count, int sock){ +bool DDV_write(void * buffer, int todo, int sock){ int sofar = 0; - int todo = width*count; while (sofar != todo){ int r = send(sock, (char*)buffer + sofar, todo-sofar, 0); if (r <= 0){ @@ -51,9 +71,8 @@ bool DDV_write(void * buffer, int width, int count, int sock){ return true; } -bool DDV_read(void * buffer, int width, int count, int sock){ +bool DDV_read(void * buffer, int todo, int sock){ int sofar = 0; - int todo = width*count; while (sofar != todo){ int r = recv(sock, (char*)buffer + sofar, todo-sofar, 0); if (r <= 0){ @@ -65,3 +84,28 @@ bool DDV_read(void * buffer, int width, int count, int sock){ } return true; } + + +bool DDV_read(void * buffer, int width, int count, int sock){return DDV_read(buffer, width*count, sock);} +bool DDV_write(void * buffer, int width, int count, int sock){return DDV_write(buffer, width*count, sock);} + + +int DDV_iwrite(void * buffer, int todo, int sock){ + int r = send(sock, buffer, todo, 0); + if (r < 0){ + socketError = true; + printf("Could not write! %s\n", strerror(errno)); + } + return r; +} + +int DDV_iread(void * buffer, int todo, int sock){ + int r = recv(sock, buffer, todo, 0); + if (r < 0){ + socketError = true; + printf("Could not write! %s\n", strerror(errno)); + } + return r; +} + + diff --git a/util/flv_sock.cpp b/util/flv_sock.cpp index 60bc3abe..ac520b43 100644 --- a/util/flv_sock.cpp +++ b/util/flv_sock.cpp @@ -1,34 +1,92 @@ -SWBaseSocket::SWBaseError SWBerr; -char * FLVbuffer; -int FLV_len; -int FLVbs = 0; -bool HeaderDone = false; -static char FLVheader[13]; -void FLV_Readheader(SWUnixSocket & ss){ -}//FLV_Readheader +struct FLV_Pack { + int len; + int buf; + bool isKeyframe; + char * data; +};//FLV_Pack -void FLV_Dump(){FLV_len = 0;} +char FLVHeader[13]; +bool All_Hell_Broke_Loose = false; -bool FLV_GetPacket(SWUnixSocket & ss){ - if (!HeaderDone){ - if (ss.frecv(FLVheader, 13, &SWBerr) == 13){HeaderDone = true;} - return false; - } +//checks FLV Header for correctness +//returns true if everything is alright, false otherwise +bool FLV_Checkheader(char * header){ + if (header[0] != 'F') return false; + if (header[1] != 'L') return false; + if (header[2] != 'V') return false; + if (header[8] != 0x09) return false; + if (header[9] != 0) return false; + if (header[10] != 0) return false; + if (header[11] != 0) return false; + if (header[12] != 0) return false; + return true; +}//FLV_Checkheader - - if (FLVbs < 15){FLVbuffer = (char*)realloc(FLVbuffer, 15); FLVbs = 15;} - //if received a whole header, receive a whole packet - //if not, retry header next pass - if (FLV_len == 0){ - if (ss.frecv(FLVbuffer, 11, &SWBerr) == 11){ - FLV_len = FLVbuffer[3] + 15; - FLV_len += (FLVbuffer[2] << 8); - FLV_len += (FLVbuffer[1] << 16); - if (FLVbs < FLV_len){FLVbuffer = (char*)realloc(FLVbuffer, FLV_len);FLVbs = FLV_len;} +//returns true if header is an FLV header +bool FLV_Isheader(char * header){ + if (header[0] != 'F') return false; + if (header[1] != 'L') return false; + if (header[2] != 'V') return false; + return true; +}//FLV_Isheader + +bool ReadUntil(char * buffer, unsigned int count, unsigned int & sofar, int sock){ + if (sofar >= count){return true;} + int r = 0; + r = DDV_iread(buffer + sofar,count-sofar,sock); + if (r < 0){All_Hell_Broke_Loose = true; return false;} + sofar += r; + if (sofar >= count){return true;} + return false; +} + +//gets a packet, storing in given FLV_Pack pointer. +//will assign pointer if null +//resizes FLV_Pack data field bigger if data doesn't fit +// (does not auto-shrink for speed!) +bool FLV_GetPacket(FLV_Pack *& p, int sock){ + int preflags = fcntl(sock, F_GETFL, 0); + int postflags = preflags | O_NONBLOCK; + fcntl(sock, F_SETFL, postflags); + static bool done = true; + static unsigned int sofar = 0; + if (!p){p = (FLV_Pack*)calloc(1, sizeof(FLV_Pack));} + if (p->buf < 15){p->data = (char*)realloc(p->data, 15); p->buf = 15;} + + if (done){ + //read a header + if (ReadUntil(p->data, 11, sofar, sock)){ + //if its a correct FLV header, throw away and read tag header + if (FLV_Isheader(p->data)){ + if (ReadUntil(p->data, 13, sofar, sock)){ + if (FLV_Checkheader(p->data)){ + sofar = 0; + memcpy(FLVHeader, p->data, 13); + }else{All_Hell_Broke_Loose = true;} + } + }else{ + //if a tag header, calculate length and read tag body + p->len = p->data[3] + 15; + p->len += (p->data[2] << 8); + p->len += (p->data[1] << 16); + if (p->buf < p->len){p->data = (char*)realloc(p->data, p->len);p->buf = p->len;} + done = false; + } } }else{ - if (ss.frecv(FLVbuffer+11, FLV_len-11, &SWBerr) == FLV_len-11){return true;} + //read tag body + if (ReadUntil(p->data, p->len, sofar, sock)){ + //calculate keyframeness, next time read header again, return true + p->isKeyframe = false; + if ((p->data[0] == 0x09) && (((p->data[11] & 0xf0) >> 4) == 1)){p->isKeyframe = true;} + done = true; + sofar = 0; + fcntl(sock, F_SETFL, preflags); + return true; + } } + fcntl(sock, F_SETFL, preflags); return false; }//FLV_GetPacket +