diff --git a/Buffer/main.cpp b/Buffer/main.cpp index fb078177..8330108f 100644 --- a/Buffer/main.cpp +++ b/Buffer/main.cpp @@ -1,3 +1,5 @@ +#include +#include #include #include "../sockets/SocketW.h" #include @@ -9,6 +11,8 @@ #include "../util/flv.cpp" //FLV format parser #include "user.cpp" +#include + int get_empty( user ** list, int amount ) { for (int i = 0; i < amount; i++ ){ if (!list[i]->is_connected){return i;} @@ -47,15 +51,26 @@ int main( int argc, char * argv[] ) { unsigned char packtype; bool gotVideoInfo = false; bool gotAudioInfo = false; - while(std::cin.good() && std::cout.good()) { - loopcount ++; + + //set stdin to be nonblocking + //int flags = fcntl(0, F_GETFL, 0); + //flags |= O_NONBLOCK; + //fcntl(0, F_SETFL, flags); + + int infile = fileno(stdin); + int poller = epoll_create(1); + struct epoll_event ev; + ev.events = EPOLLIN; + ev.data.fd = infile; + epoll_ctl(poller, EPOLL_CTL_ADD, infile, &ev); + struct epoll_event events[1]; + + + while(!feof(stdin) && !All_Hell_Broke_Loose){ //invalidate the current buffer ringbuf[current_buffer]->number = -1; - if (std::cin.peek() == 'F') { - //new FLV file, read the file header again. - FLV_Readheader(); - } else { - FLV_GetPacket(ringbuf[current_buffer]->FLV); + if ((epoll_wait(poller, events, 1, 100) > 0) && FLV_GetPacket(ringbuf[current_buffer]->FLV)){ + loopcount ++; packtype = ringbuf[current_buffer]->FLV->data[0]; //store metadata, if available if (packtype == 0x12){ @@ -63,6 +78,10 @@ int main( int argc, char * argv[] ) { metabuffer = (char*)realloc(metabuffer, metabuflen); memcpy(metabuffer, ringbuf[current_buffer]->FLV->data, metabuflen); std::cout << "Received metadata!" << std::endl; + if (gotVideoInfo && gotAudioInfo){ + All_Hell_Broke_Loose = true; + std::cout << "... after proper video and audio? Cancelling broadcast!" << std::endl; + } gotVideoInfo = false; gotAudioInfo = false; } @@ -96,35 +115,39 @@ int main( int argc, char * argv[] ) { if (packtype == 0x09){ if (((ringbuf[current_buffer]->FLV->data[11] & 0xf0) >> 4) == 1){lastproper = current_buffer;} } - incoming = listener.accept(&BError); - if (incoming){ - connectionList.push_back(user(incoming)); - //send the FLV header - connectionList.back().MyBuffer = lastproper; - connectionList.back().MyBuffer_num = -1; - //TODO: Do this more nicely? - if (connectionList.back().Conn->send(FLVHeader,13,&BError) != 13){ - connectionList.back().disconnect("failed to receive the header!"); - }else{ - if (connectionList.back().Conn->send(metabuffer,metabuflen,&BError) != metabuflen){ - connectionList.back().disconnect("failed to receive metadata!"); - } - } - if (BError != SWBaseSocket::ok){ - connectionList.back().disconnect("Socket error: " + BError.get_error()); + //keep track of buffers + current_buffer++; + current_buffer %= buffers; + ringbuf[current_buffer]->number = loopcount; + } + + //check for new connections, accept them if there are any + incoming = listener.accept(&BError); + if (incoming){ + connectionList.push_back(user(incoming)); + //send the FLV header + connectionList.back().MyBuffer = lastproper; + connectionList.back().MyBuffer_num = -1; + //TODO: Do this more nicely? + if (connectionList.back().Conn->send(FLVHeader,13,&BError) != 13){ + connectionList.back().disconnect("failed to receive the header!"); + }else{ + if (connectionList.back().Conn->send(metabuffer,metabuflen,&BError) != metabuflen){ + connectionList.back().disconnect("failed to receive metadata!"); } } - ringbuf[current_buffer]->number = loopcount; - //send all connections what they need, if and when they need it + if (BError != SWBaseSocket::ok){ + connectionList.back().disconnect("Socket error: " + BError.get_error()); + } + } + //send all connections what they need, if and when they need it + if (connectionList.size() > 0){ for (connIt = connectionList.begin(); connIt != connectionList.end(); connIt++){ if (!(*connIt).is_connected){connectionList.erase(connIt);break;} (*connIt).Send(ringbuf, buffers); } - //keep track of buffers - current_buffer++; - current_buffer %= buffers; } - } + }//main loop // disconnect listener std::cout << "Reached EOF of input" << std::endl; diff --git a/Connector_RTMP/Conn_RTMP b/Connector_RTMP/Conn_RTMP new file mode 100755 index 00000000..c1686a2f --- /dev/null +++ b/Connector_RTMP/Conn_RTMP @@ -0,0 +1,58 @@ +#!/bin/sh +# +# chkconfig: 345 92 8 +# description: DDVTech RTMP Connector +# +# processname: Connector_RTMP + +. /etc/rc.d/init.d/functions + +prog="Connector_RTMP" +fullprog="/usr/bin/Connector_RTMP" +RETVAL=0 + +start() { + gprintf "Starting %s: " $prog + daemon --user=root $fullprog + RETVAL=$? + echo + [ $RETVAL -eq 0 ] && touch /var/lock/subsys/$prog + return $RETVAL +} + +stop() { + gprintf "Stopping %s: " $prog + killproc $fullprog + RETVAL=$? + echo + [ $RETVAL -eq 0 ] && rm -f /var/lock/subsys/$prog + return $RETVAL +} + +case "$1" in + start) + start + ;; + stop) + stop + ;; + restart) + stop + start + ;; + condrestart) + if [ -f /var/lock/subsys/$prog ]; then + stop + start + fi + ;; + status) + status $fullprog + RETVAL=$? + ;; + *) + gprintf "Usage: %s {start|stop|restart|status}" $0 + RETVAL=1 +esac + +exit $RETVAL diff --git a/Connector_RTMP/Makefile b/Connector_RTMP/Makefile index 60d3f086..1edeaee9 100644 --- a/Connector_RTMP/Makefile +++ b/Connector_RTMP/Makefile @@ -16,8 +16,8 @@ $(OUT): $(OBJ) chunkstream.cpp parsechunks.cpp handshake.cpp crypto.cpp amf.cpp $(CC) $(LIBS) -o $(OUT) $(OBJ) clean: rm -rf $(OBJ) $(OUT) Makefile.bak *~ -run-test: $(OUT) - rm -rf ./meh - mkfifo ./meh - cat ./meh & - nc -l -p 1935 -e './Connector_RTMP 2>./meh' +install: $(OUT) + -service Conn_RTMP stop + cp -f ./$(OUT) /usr/bin/ + cp -f ./Conn_RTMP /etc/init.d/ + service Conn_RTMP start diff --git a/Connector_RTMP/chunkstream.cpp b/Connector_RTMP/chunkstream.cpp index 9d9247aa..8edf2016 100644 --- a/Connector_RTMP/chunkstream.cpp +++ b/Connector_RTMP/chunkstream.cpp @@ -108,18 +108,18 @@ void SendChunk(chunkpack ch){ } } if (ch.cs_id <= 63){ - tmp = chtype | ch.cs_id; fwrite(&tmp, 1, 1, stdout); + tmp = chtype | ch.cs_id; DDV_write(&tmp, 1, 1, CONN_fd); snd_cnt+=1; }else{ if (ch.cs_id <= 255+64){ - tmp = chtype | 0; fwrite(&tmp, 1, 1, stdout); - tmp = ch.cs_id - 64; fwrite(&tmp, 1, 1, stdout); + tmp = chtype | 0; DDV_write(&tmp, 1, 1, CONN_fd); + tmp = ch.cs_id - 64; DDV_write(&tmp, 1, 1, CONN_fd); snd_cnt+=2; }else{ - tmp = chtype | 1; fwrite(&tmp, 1, 1, stdout); + tmp = chtype | 1; DDV_write(&tmp, 1, 1, CONN_fd); tmpi = ch.cs_id - 64; - tmp = tmpi % 256; fwrite(&tmp, 1, 1, stdout); - tmp = tmpi / 256; fwrite(&tmp, 1, 1, stdout); + tmp = tmpi % 256; DDV_write(&tmp, 1, 1, CONN_fd); + tmp = tmpi / 256; DDV_write(&tmp, 1, 1, CONN_fd); snd_cnt+=3; } } @@ -129,67 +129,67 @@ void SendChunk(chunkpack ch){ if (chtype == 0x00){ tmpi = ch.timestamp; if (tmpi >= 0x00ffffff){ntime = tmpi; tmpi = 0x00ffffff;} - tmp = tmpi / (256*256); fwrite(&tmp, 1, 1, stdout); - tmp = tmpi / 256; fwrite(&tmp, 1, 1, stdout); - tmp = tmpi % 256; fwrite(&tmp, 1, 1, stdout); + tmp = tmpi / (256*256); DDV_write(&tmp, 1, 1, CONN_fd); + tmp = tmpi / 256; DDV_write(&tmp, 1, 1, CONN_fd); + tmp = tmpi % 256; DDV_write(&tmp, 1, 1, CONN_fd); snd_cnt+=3; }else{ tmpi = ch.timestamp - prev.timestamp; if (tmpi >= 0x00ffffff){ntime = tmpi; tmpi = 0x00ffffff;} - tmp = tmpi / (256*256); fwrite(&tmp, 1, 1, stdout); - tmp = tmpi / 256; fwrite(&tmp, 1, 1, stdout); - tmp = tmpi % 256; fwrite(&tmp, 1, 1, stdout); + tmp = tmpi / (256*256); DDV_write(&tmp, 1, 1, CONN_fd); + tmp = tmpi / 256; DDV_write(&tmp, 1, 1, CONN_fd); + tmp = tmpi % 256; DDV_write(&tmp, 1, 1, CONN_fd); snd_cnt+=3; } if (chtype != 0x80){ //len tmpi = ch.len; - tmp = tmpi / (256*256); fwrite(&tmp, 1, 1, stdout); - tmp = tmpi / 256; fwrite(&tmp, 1, 1, stdout); - tmp = tmpi % 256; fwrite(&tmp, 1, 1, stdout); + tmp = tmpi / (256*256); DDV_write(&tmp, 1, 1, CONN_fd); + tmp = tmpi / 256; DDV_write(&tmp, 1, 1, CONN_fd); + tmp = tmpi % 256; DDV_write(&tmp, 1, 1, CONN_fd); snd_cnt+=3; //msg type id - tmp = ch.msg_type_id; fwrite(&tmp, 1, 1, stdout); + tmp = ch.msg_type_id; DDV_write(&tmp, 1, 1, CONN_fd); snd_cnt+=1; if (chtype != 0x40){ //msg stream id - tmp = ch.msg_stream_id % 256; fwrite(&tmp, 1, 1, stdout); - tmp = ch.msg_stream_id / 256; fwrite(&tmp, 1, 1, stdout); - tmp = ch.msg_stream_id / (256*256); fwrite(&tmp, 1, 1, stdout); - tmp = ch.msg_stream_id / (256*256*256); fwrite(&tmp, 1, 1, stdout); + tmp = ch.msg_stream_id % 256; DDV_write(&tmp, 1, 1, CONN_fd); + tmp = ch.msg_stream_id / 256; DDV_write(&tmp, 1, 1, CONN_fd); + tmp = ch.msg_stream_id / (256*256); DDV_write(&tmp, 1, 1, CONN_fd); + tmp = ch.msg_stream_id / (256*256*256); DDV_write(&tmp, 1, 1, CONN_fd); snd_cnt+=4; } } } //support for 0x00ffffff timestamps if (ntime){ - tmp = ntime / (256*256*256); fwrite(&tmp, 1, 1, stdout); - tmp = ntime / (256*256); fwrite(&tmp, 1, 1, stdout); - tmp = ntime / 256; fwrite(&tmp, 1, 1, stdout); - tmp = ntime % 256; fwrite(&tmp, 1, 1, stdout); + tmp = ntime / (256*256*256); DDV_write(&tmp, 1, 1, CONN_fd); + tmp = ntime / (256*256); DDV_write(&tmp, 1, 1, CONN_fd); + tmp = ntime / 256; DDV_write(&tmp, 1, 1, CONN_fd); + tmp = ntime % 256; DDV_write(&tmp, 1, 1, CONN_fd); snd_cnt+=4; } ch.len_left = 0; while (ch.len_left < ch.len){ tmpi = ch.len - ch.len_left; if (tmpi > chunk_snd_max){tmpi = chunk_snd_max;} - fwrite((ch.data + ch.len_left), 1, tmpi, stdout); + DDV_write((ch.data + ch.len_left), 1, tmpi, CONN_fd); snd_cnt+=tmpi; ch.len_left += tmpi; if (ch.len_left < ch.len){ if (ch.cs_id <= 63){ - tmp = 0xC0 + ch.cs_id; fwrite(&tmp, 1, 1, stdout); + tmp = 0xC0 + ch.cs_id; DDV_write(&tmp, 1, 1, CONN_fd); snd_cnt+=1; }else{ if (ch.cs_id <= 255+64){ - tmp = 0xC0; fwrite(&tmp, 1, 1, stdout); - tmp = ch.cs_id - 64; fwrite(&tmp, 1, 1, stdout); + tmp = 0xC0; DDV_write(&tmp, 1, 1, CONN_fd); + tmp = ch.cs_id - 64; DDV_write(&tmp, 1, 1, CONN_fd); snd_cnt+=2; }else{ - tmp = 0xC1; fwrite(&tmp, 1, 1, stdout); + tmp = 0xC1; DDV_write(&tmp, 1, 1, CONN_fd); tmpi = ch.cs_id - 64; - tmp = tmpi % 256; fwrite(&tmp, 1, 1, stdout); - tmp = tmpi / 256; fwrite(&tmp, 1, 1, stdout); + tmp = tmpi % 256; DDV_write(&tmp, 1, 1, CONN_fd); + tmp = tmpi / 256; DDV_write(&tmp, 1, 1, CONN_fd); snd_cnt+=4; } } @@ -310,19 +310,19 @@ struct chunkpack getChunk(){ gettimeofday(&lastrec, 0); struct chunkpack ret; unsigned char temp; - fread(&(ret.chunktype), 1, 1, stdin); + DDV_read(&(ret.chunktype), 1, 1, CONN_fd); rec_cnt++; //read the chunkstream ID properly switch (ret.chunktype & 0x3F){ case 0: - fread(&temp, 1, 1, stdin); + DDV_read(&temp, 1, 1, CONN_fd); rec_cnt++; ret.cs_id = temp + 64; break; case 1: - fread(&temp, 1, 1, stdin); + DDV_read(&temp, 1, 1, CONN_fd); ret.cs_id = temp + 64; - fread(&temp, 1, 1, stdin); + DDV_read(&temp, 1, 1, CONN_fd); ret.cs_id += temp * 256; rec_cnt+=2; break; @@ -334,57 +334,57 @@ struct chunkpack getChunk(){ //process the rest of the header, for each chunk type switch (ret.chunktype & 0xC0){ case 0x00: - fread(&temp, 1, 1, stdin); + DDV_read(&temp, 1, 1, CONN_fd); ret.timestamp = temp*256*256; - fread(&temp, 1, 1, stdin); + DDV_read(&temp, 1, 1, CONN_fd); ret.timestamp += temp*256; - fread(&temp, 1, 1, stdin); + DDV_read(&temp, 1, 1, CONN_fd); ret.timestamp += temp; - fread(&temp, 1, 1, stdin); + DDV_read(&temp, 1, 1, CONN_fd); ret.len = temp*256*256; - fread(&temp, 1, 1, stdin); + DDV_read(&temp, 1, 1, CONN_fd); ret.len += temp*256; - fread(&temp, 1, 1, stdin); + DDV_read(&temp, 1, 1, CONN_fd); ret.len += temp; ret.len_left = 0; - fread(&temp, 1, 1, stdin); + DDV_read(&temp, 1, 1, CONN_fd); ret.msg_type_id = temp; - fread(&temp, 1, 1, stdin); + DDV_read(&temp, 1, 1, CONN_fd); ret.msg_stream_id = temp; - fread(&temp, 1, 1, stdin); + DDV_read(&temp, 1, 1, CONN_fd); ret.msg_stream_id += temp*256; - fread(&temp, 1, 1, stdin); + DDV_read(&temp, 1, 1, CONN_fd); ret.msg_stream_id += temp*256*256; - fread(&temp, 1, 1, stdin); + DDV_read(&temp, 1, 1, CONN_fd); ret.msg_stream_id += temp*256*256*256; rec_cnt+=11; break; case 0x40: - fread(&temp, 1, 1, stdin); + DDV_read(&temp, 1, 1, CONN_fd); ret.timestamp = temp*256*256; - fread(&temp, 1, 1, stdin); + DDV_read(&temp, 1, 1, CONN_fd); ret.timestamp += temp*256; - fread(&temp, 1, 1, stdin); + DDV_read(&temp, 1, 1, CONN_fd); ret.timestamp += temp; ret.timestamp += prev.timestamp; - fread(&temp, 1, 1, stdin); + DDV_read(&temp, 1, 1, CONN_fd); ret.len = temp*256*256; - fread(&temp, 1, 1, stdin); + DDV_read(&temp, 1, 1, CONN_fd); ret.len += temp*256; - fread(&temp, 1, 1, stdin); + DDV_read(&temp, 1, 1, CONN_fd); ret.len += temp; ret.len_left = 0; - fread(&temp, 1, 1, stdin); + DDV_read(&temp, 1, 1, CONN_fd); ret.msg_type_id = temp; ret.msg_stream_id = prev.msg_stream_id; rec_cnt+=7; break; case 0x80: - fread(&temp, 1, 1, stdin); + DDV_read(&temp, 1, 1, CONN_fd); ret.timestamp = temp*256*256; - fread(&temp, 1, 1, stdin); + DDV_read(&temp, 1, 1, CONN_fd); ret.timestamp += temp*256; - fread(&temp, 1, 1, stdin); + DDV_read(&temp, 1, 1, CONN_fd); ret.timestamp += temp; ret.timestamp += prev.timestamp; ret.len = prev.len; @@ -414,20 +414,20 @@ struct chunkpack getChunk(){ } //read extended timestamp, if neccesary if (ret.timestamp == 0x00ffffff){ - fread(&temp, 1, 1, stdin); + DDV_read(&temp, 1, 1, CONN_fd); ret.timestamp = temp*256*256*256; - fread(&temp, 1, 1, stdin); + DDV_read(&temp, 1, 1, CONN_fd); ret.timestamp += temp*256*256; - fread(&temp, 1, 1, stdin); + DDV_read(&temp, 1, 1, CONN_fd); ret.timestamp += temp*256; - fread(&temp, 1, 1, stdin); + DDV_read(&temp, 1, 1, CONN_fd); ret.timestamp += temp; rec_cnt+=4; } //read data if length > 0, and allocate it if (ret.real_len > 0){ ret.data = (unsigned char*)malloc(ret.real_len); - fread(ret.data, 1, ret.real_len, stdin); + DDV_read(ret.data, 1, ret.real_len, CONN_fd); rec_cnt+=ret.real_len; }else{ ret.data = 0; @@ -484,7 +484,7 @@ chunkpack getWholeChunk(){ if (!clean){gwc_complete.data = 0; clean = true;}//prevent brain damage chunkpack * ret = 0; scrubChunk(gwc_complete); - while (counter < 10000){ + while (counter < 1000){ gwc_next = getChunk(); ret = AddChunkPart(gwc_next); scrubChunk(gwc_next); @@ -493,7 +493,7 @@ chunkpack getWholeChunk(){ free(ret);//cleanup returned chunk return gwc_complete; } - if (feof(stdin) != 0){break;} + if (socketError || socketBlocking){break;} counter++; } gwc_complete.msg_type_id = 0; diff --git a/Connector_RTMP/handshake.cpp b/Connector_RTMP/handshake.cpp index 43143dc8..a8461a83 100644 --- a/Connector_RTMP/handshake.cpp +++ b/Connector_RTMP/handshake.cpp @@ -14,11 +14,11 @@ bool doHandshake(){ Handshake Client; Handshake Server; /** Read C0 **/ - fread(&(Version), 1, 1, stdin); + DDV_read(&(Version), 1, 1, CONN_fd); /** Read C1 **/ - fread(Client.Time, 1, 4, stdin); - fread(Client.Zero, 1, 4, stdin); - fread(Client.Random, 1, 1528, stdin); + DDV_read(Client.Time, 1, 4, CONN_fd); + DDV_read(Client.Zero, 1, 4, CONN_fd); + DDV_read(Client.Random, 1, 1528, CONN_fd); rec_cnt+=1537; /** Build S1 Packet **/ Server.Time[0] = 0; Server.Time[1] = 0; Server.Time[2] = 0; Server.Time[3] = 0; @@ -27,25 +27,25 @@ bool doHandshake(){ Server.Random[i] = versionstring[i%13]; } /** Send S0 **/ - fwrite(&(Version), 1, 1, stdout); + DDV_write(&(Version), 1, 1, CONN_fd); /** Send S1 **/ - fwrite(Server.Time, 1, 4, stdout); - fwrite(Server.Zero, 1, 4, stdout); - fwrite(Server.Random, 1, 1528, stdout); + DDV_write(Server.Time, 1, 4, CONN_fd); + DDV_write(Server.Zero, 1, 4, CONN_fd); + DDV_write(Server.Random, 1, 1528, CONN_fd); /** Flush output, just for certainty **/ - fflush(stdout); + //fflush(CONN_fd); snd_cnt+=1537; /** Send S2 **/ - fwrite(Client.Time, 1, 4, stdout); - fwrite(Client.Time, 1, 4, stdout); - fwrite(Client.Random, 1, 1528, stdout); + DDV_write(Client.Time, 1, 4, CONN_fd); + DDV_write(Client.Time, 1, 4, CONN_fd); + DDV_write(Client.Random, 1, 1528, CONN_fd); snd_cnt+=1536; /** Flush, necessary in order to work **/ - fflush(stdout); + //fflush(CONN_fd); /** Read and discard C2 **/ - fread(Client.Time, 1, 4, stdin); - fread(Client.Zero, 1, 4, stdin); - fread(Client.Random, 1, 1528, stdin); + DDV_read(Client.Time, 1, 4, CONN_fd); + DDV_read(Client.Zero, 1, 4, CONN_fd); + DDV_read(Client.Random, 1, 1528, CONN_fd); rec_cnt+=1536; return true; }//doHandshake @@ -57,10 +57,10 @@ bool doHandshake(){ bool doHandshake(){ char Version; /** Read C0 **/ - fread(&Version, 1, 1, stdin); + DDV_read(&Version, 1, 1, CONN_fd); uint8_t Client[1536]; uint8_t Server[3072]; - fread(&Client, 1, 1536, stdin); + DDV_read(&Client, 1, 1536, CONN_fd); rec_cnt+=1537; /** Build S1 Packet **/ @@ -123,13 +123,13 @@ bool doHandshake(){ delete[] pLastHash; //***** DONE BUILDING THE RESPONSE ***// /** Send response **/ - fwrite(&Version, 1, 1, stdout); - fwrite(&Server, 1, 3072, stdout); + DDV_write(&Version, 1, 1, CONN_fd); + DDV_write(&Server, 1, 3072, CONN_fd); snd_cnt+=3073; /** Flush, necessary in order to work **/ - fflush(stdout); + //fflush(CONN_fd); /** Read and discard C2 **/ - fread(Client, 1, 1536, stdin); + DDV_read(Client, 1, 1536, CONN_fd); rec_cnt+=1536; return true; } diff --git a/Connector_RTMP/main.cpp b/Connector_RTMP/main.cpp index 14cfea45..87b67b22 100644 --- a/Connector_RTMP/main.cpp +++ b/Connector_RTMP/main.cpp @@ -1,38 +1,72 @@ -#undef DEBUG +#define DEBUG #include #include #include #include - -//needed for select -#include -#include -#include -#include #include +#include +#include +#include +#include //for connection to server -#include "../sockets/SocketW.h" bool ready4data = false;//set to true when streaming starts bool inited = false; bool stopparsing = false; timeval lastrec; +int CONN_fd = 0; +#include "../util/ddv_socket.cpp" //DDVTech Socket wrapper +#include "../util/flv_sock.cpp" //FLV parsing with SocketW #include "parsechunks.cpp" //chunkstream parsing #include "handshake.cpp" //handshaking -#include "../util/flv_sock.cpp" //FLV parsing with SocketW -int main(){ + + +int server_socket = 0; + +void termination_handler (int signum){ + if (server_socket == 0) return; + close(server_socket); + server_socket = 0; +} + +int main(int argc, char ** argv){ + //setup signal handler + struct sigaction new_action; + new_action.sa_handler = termination_handler; + sigemptyset (&new_action.sa_mask); + new_action.sa_flags = 0; + sigaction (SIGINT, &new_action, NULL); + sigaction (SIGHUP, &new_action, NULL); + sigaction (SIGTERM, &new_action, NULL); + + server_socket = DDV_Listen(1935); + if ((argc < 2) || (argv[1] == "nd")){ + if (server_socket > 0){daemon(1, 0);}else{return 1;} + } + int status; + while (server_socket > 0){ + waitpid((pid_t)-1, &status, WNOHANG); + CONN_fd = DDV_Accept(server_socket); + if (CONN_fd > 0){ + pid_t myid = fork(); + if (myid == 0){ + break; + }else{ + printf("Spawned new process %i for handling socket %i\n", (int)myid, CONN_fd); + } + } + } + if (server_socket <= 0){ + return 0; + } + unsigned int ts; unsigned int fts = 0; unsigned int ftst; - SWUnixSocket ss; - fd_set pollset; - struct timeval timeout; - //0 timeout - return immediately after select call - timeout.tv_sec = 1; timeout.tv_usec = 0; - FD_ZERO(&pollset);//clear the polling set - FD_SET(0, &pollset);//add stdin to polling set + int ss; + FLV_Pack * tag; //first timestamp set firsttime = getNowMS(); @@ -53,58 +87,71 @@ int main(){ #ifdef DEBUG fprintf(stderr, "Starting processing...\n"); #endif - while (std::cin.good() && std::cout.good()){ - //select(1, &pollset, 0, 0, &timeout); - //only parse input from stdin if available or not yet init'ed - //FD_ISSET(0, &pollset) || //NOTE: Polling does not work? WHY?!? WHY DAMN IT?!? - if ((!ready4data || (snd_cnt - snd_window_at >= snd_window_size)) && !stopparsing){parseChunk();fflush(stdout);} + + + int retval; + int poller = epoll_create(1); + struct epoll_event ev; + ev.events = EPOLLIN; + ev.data.fd = CONN_fd; + epoll_ctl(poller, EPOLL_CTL_ADD, CONN_fd, &ev); + struct epoll_event events[1]; + + + + + while (!socketError && !All_Hell_Broke_Loose){ + //only parse input if available or not yet init'ed + //rightnow = getNowMS(); + retval = epoll_wait(poller, events, 1, 0); + if (!ready4data || (snd_cnt - snd_window_at >= snd_window_size)){ + if (DDV_ready(CONN_fd)){ + parseChunk(); + } + } if (ready4data){ if (!inited){ //we are ready, connect the socket! - if (!ss.connect(streamname.c_str())){ + ss = DDV_OpenUnix(streamname.c_str()); + if (ss <= 0){ #ifdef DEBUG fprintf(stderr, "Could not connect to server!\n"); #endif return 0; } - FLV_Readheader(ss);//read the header, we don't want it #ifdef DEBUG - fprintf(stderr, "Header read, starting to send video data...\n"); + fprintf(stderr, "Everything connected, starting to send video data...\n"); #endif inited = true; } //only send data if previous data has been ACK'ed... - if (snd_cnt - snd_window_at < snd_window_size){ - if (FLV_GetPacket(ss)){//able to read a full packet? - ts = FLVbuffer[7] * 256*256*256; - ts += FLVbuffer[4] * 256*256; - ts += FLVbuffer[5] * 256; - ts += FLVbuffer[6]; + //if (snd_cnt - snd_window_at < snd_window_size){ + if (FLV_GetPacket(tag, ss)){//able to read a full packet? + ts = tag->data[7] * 256*256*256; + ts += tag->data[4] * 256*256; + ts += tag->data[5] * 256; + ts += tag->data[6]; if (ts != 0){ if (fts == 0){fts = ts;ftst = getNowMS();} ts -= fts; - FLVbuffer[7] = ts / (256*256*256); - FLVbuffer[4] = ts / (256*256); - FLVbuffer[5] = ts / 256; - FLVbuffer[6] = ts % 256; + tag->data[7] = ts / (256*256*256); + tag->data[4] = ts / (256*256); + tag->data[5] = ts / 256; + tag->data[6] = ts % 256; ts += ftst; }else{ ftst = getNowMS(); - FLVbuffer[7] = ftst / (256*256*256); - FLVbuffer[4] = ftst / (256*256); - FLVbuffer[5] = ftst / 256; - FLVbuffer[6] = ftst % 256; + tag->data[7] = ftst / (256*256*256); + tag->data[4] = ftst / (256*256); + tag->data[5] = ftst / 256; + tag->data[6] = ftst % 256; } - SendMedia((unsigned char)FLVbuffer[0], (unsigned char *)FLVbuffer+11, FLV_len-15, ts); - FLV_Dump();//dump packet and get ready for next - } - if ((SWBerr != SWBaseSocket::ok) && (SWBerr != SWBaseSocket::notReady)){ + SendMedia((unsigned char)tag->data[0], (unsigned char *)tag->data+11, tag->len-15, ts); #ifdef DEBUG - fprintf(stderr, "No more data! :-( (%s)\n", SWBerr.get_error().c_str()); + fprintf(stderr, "Sent a tag to %i\n", CONN_fd); #endif - return 0;//no more input possible! Fail immediately. } - } + //} } //send ACK if we received a whole window if (rec_cnt - rec_window_at > rec_window_size){ @@ -112,8 +159,8 @@ int main(){ SendCTL(3, rec_cnt);//send ack (msg 3) } } - #ifdef DEBUG - fprintf(stderr, "User disconnected.\n"); - #endif + //#ifdef DEBUG + fprintf(stderr, "User %i disconnected.\n", CONN_fd); + //#endif return 0; }//main diff --git a/Makefile b/Makefile index cceed8f6..42ead03d 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -default: client-local-install +default: client-install client: cd Connector_HTTP; $(MAKE) @@ -13,19 +13,12 @@ client-clean: #cd Connector_RTSP; $(MAKE) clean cd Buffer; $(MAKE) clean clean: client-clean -client-install: client +client-install: client-clean client + service xinetd stop cp -f ./Connector_HTTP/Connector_HTTP /usr/bin/ - cp -f ./Connector_RTMP/Connector_RTMP /usr/bin/ + cd Connector_RTMP; $(MAKE) install cp -f ./Connector_RAW/Connector_RAW /usr/bin/ #cp -f ./Connector_RTSP/Connector_RTSP /usr/bin/ cp -f ./Buffer/Buffer /usr/bin/ cp -f ./PLS /etc/xinetd.d/ - service xinetd restart -client-local-install: client - mkdir -p ./bin - cp -f ./Connector_HTTP/Connector_HTTP ./bin/ - cp -f ./Connector_RTMP/Connector_RTMP ./bin/ - cp -f ./Connector_RTMP/Connector_RAW ./bin/ - #cp -f ./Connector_RTSP/Connector_RTSP ./bin/ - cp -f ./Buffer/Buffer ./bin/ - + service xinetd start diff --git a/PLS b/PLS index c4546aa2..e6eda2ea 100644 --- a/PLS +++ b/PLS @@ -8,29 +8,21 @@ service ddvtechhttp server = /usr/bin/Connector_HTTP port = 7337 wait = no + per_source = 10 + cps = 100 5 } -service ddvtechrtmp +service ddvtechraw { disable = no type = UNLISTED protocol = tcp socket_type = stream user = root - server = /usr/bin/Connector_RTMP - port = 1935 + server = /usr/bin/Connector_RAW + port = 3773 wait = no -} - -service ddvtechraw -{ - disable = no - type = UNLISTED - protocol = tcp - socket_type = stream - user = root - server = /usr/bin/Connector_RAW - port = 3773 - wait = no + per_source = 10 + cps = 100 5 } diff --git a/util/ddv_socket.cpp b/util/ddv_socket.cpp new file mode 100644 index 00000000..cedcc76c --- /dev/null +++ b/util/ddv_socket.cpp @@ -0,0 +1,154 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +bool socketError = false; +bool socketBlocking = false; + +int DDV_OpenUnix(const char adres[], bool nonblock = false){ + int s = socket(AF_UNIX, SOCK_STREAM, 0); + sockaddr_un addr; + addr.sun_family = AF_UNIX; + strcpy(addr.sun_path, adres); + int r = connect(s, (sockaddr*)&addr, sizeof(addr)); + if (r == 0){ + if (nonblock){ + int flags = fcntl(s, F_GETFL, 0); + flags |= O_NONBLOCK; + fcntl(s, F_SETFL, flags); + } + return s; + }else{ + close(s); + return 0; + } +} + +int DDV_Listen(int port){ + int s = socket(AF_INET, SOCK_STREAM, 0); + + int on = 1; + setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + struct sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(port);//port 8888 + inet_pton(AF_INET, "0.0.0.0", &addr.sin_addr);//listen on all interfaces + int ret = bind(s, (sockaddr*)&addr, sizeof(addr));//bind to all interfaces, chosen port + if (ret == 0){ + ret = listen(s, 100);//start listening, backlog of 100 allowed + if (ret == 0){ + return s; + }else{ + printf("Listen failed! Error: %s\n", strerror(errno)); + close(s); + return 0; + } + }else{ + printf("Binding failed! Error: %s\n", strerror(errno)); + close(s); + return 0; + } +} + +int DDV_Accept(int sock, bool nonblock = false){ + int r = accept(sock, 0, 0); + if ((r > 0) && nonblock){ + int flags = fcntl(r, F_GETFL, 0); + flags |= O_NONBLOCK; + fcntl(r, F_SETFL, flags); + } + return r; +} + +bool DDV_write(void * buffer, int todo, int sock){ + int sofar = 0; + socketBlocking = false; + while (sofar != todo){ + int r = send(sock, (char*)buffer + sofar, todo-sofar, 0); + if (r <= 0){ + switch (errno){ + case EWOULDBLOCK: socketBlocking = true; break; + default: + socketError = true; + printf("Could not write! %s\n", strerror(errno)); + return false; + break; + } + } + sofar += r; + } + return true; +} + +bool DDV_ready(int sock){ + char tmp; + int preflags = fcntl(sock, F_GETFL, 0); + int postflags = preflags | O_NONBLOCK; + fcntl(sock, F_SETFL, postflags); + int r = recv(sock, &tmp, 1, MSG_PEEK); + fcntl(sock, F_SETFL, preflags); + return (r == 1); +} + +bool DDV_read(void * buffer, int todo, int sock){ + int sofar = 0; + socketBlocking = false; + while (sofar != todo){ + int r = recv(sock, (char*)buffer + sofar, todo-sofar, 0); + if (r <= 0){ + switch (errno){ + case EWOULDBLOCK: socketBlocking = true; break; + default: + socketError = true; + printf("Could not read! %s\n", strerror(errno)); + return false; + break; + } + } + sofar += r; + } + return true; +} + + +bool DDV_read(void * buffer, int width, int count, int sock){return DDV_read(buffer, width*count, sock);} +bool DDV_write(void * buffer, int width, int count, int sock){return DDV_write(buffer, width*count, sock);} + + +int DDV_iwrite(void * buffer, int todo, int sock){ + int r = send(sock, buffer, todo, 0); + if (r < 0){ + switch (errno){ + case EWOULDBLOCK: break; + default: + socketError = true; + printf("Could not write! %s\n", strerror(errno)); + return false; + break; + } + } + return r; +} + +int DDV_iread(void * buffer, int todo, int sock){ + int r = recv(sock, buffer, todo, 0); + if (r < 0){ + switch (errno){ + case EWOULDBLOCK: break; + default: + socketError = true; + printf("Could not read! %s\n", strerror(errno)); + return false; + break; + } + } + return r; +} + + diff --git a/util/flv.cpp b/util/flv.cpp index 0e119a52..c04d977a 100644 --- a/util/flv.cpp +++ b/util/flv.cpp @@ -1,4 +1,5 @@ #include //for read() +#include struct FLV_Pack { int len; @@ -8,42 +9,86 @@ struct FLV_Pack { };//FLV_Pack char FLVHeader[13]; +bool All_Hell_Broke_Loose = false; -//reads full length from a file descriptor -void Magic_Read(char * buf, int len, int file){ - int i = 0; - while (i < len) i += read(file, buf, len-i); -} - - -//reads a FLV header and checks for correctness +//checks FLV Header for correctness //returns true if everything is alright, false otherwise -bool FLV_Readheader(){ - fread(FLVHeader,1,13,stdin); - if (FLVHeader[0] != 'F') return false; - if (FLVHeader[1] != 'L') return false; - if (FLVHeader[2] != 'V') return false; - if (FLVHeader[8] != 0x09) return false; - if (FLVHeader[9] != 0) return false; - if (FLVHeader[10] != 0) return false; - if (FLVHeader[11] != 0) return false; - if (FLVHeader[12] != 0) return false; +bool FLV_Checkheader(char * header){ + if (header[0] != 'F') return false; + if (header[1] != 'L') return false; + if (header[2] != 'V') return false; + if (header[8] != 0x09) return false; + if (header[9] != 0) return false; + if (header[10] != 0) return false; + if (header[11] != 0) return false; + if (header[12] != 0) return false; return true; -}//FLV_Readheader +}//FLV_Checkheader + +//returns true if header is an FLV header +bool FLV_Isheader(char * header){ + if (header[0] != 'F') return false; + if (header[1] != 'L') return false; + if (header[2] != 'V') return false; + return true; +}//FLV_Isheader + +bool ReadUntil(char * buffer, unsigned int count, unsigned int & sofar){ + if (sofar >= count){return true;} + int r = 0; + r = fread(buffer + sofar,1,count-sofar,stdin); + if (r < 0){All_Hell_Broke_Loose = true; return false;} + sofar += r; + if (sofar >= count){return true;} + return false; +} //gets a packet, storing in given FLV_Pack pointer. //will assign pointer if null //resizes FLV_Pack data field bigger if data doesn't fit // (does not auto-shrink for speed!) -void FLV_GetPacket(FLV_Pack *& p){ +bool FLV_GetPacket(FLV_Pack *& p){ + int preflags = fcntl(fileno(stdin), F_GETFL, 0); + int postflags = preflags | O_NONBLOCK; + fcntl(fileno(stdin), F_SETFL, postflags); + static bool done = true; + static unsigned int sofar = 0; if (!p){p = (FLV_Pack*)calloc(1, sizeof(FLV_Pack));} if (p->buf < 15){p->data = (char*)realloc(p->data, 15); p->buf = 15;} - fread(p->data,1,11,stdin); - p->len = p->data[3] + 15; - p->len += (p->data[2] << 8); - p->len += (p->data[1] << 16); - if (p->buf < p->len){p->data = (char*)realloc(p->data, p->len);p->buf = p->len;} - fread(p->data+11,1,p->len-11,stdin); - p->isKeyframe = false; - if ((p->data[0] == 0x09) && (((p->data[11] & 0xf0) >> 4) == 1)){p->isKeyframe = true;} + + if (done){ + //read a header + if (ReadUntil(p->data, 11, sofar)){ + //if its a correct FLV header, throw away and read tag header + if (FLV_Isheader(p->data)){ + if (ReadUntil(p->data, 13, sofar)){ + if (FLV_Checkheader(p->data)){ + sofar = 0; + memcpy(FLVHeader, p->data, 13); + }else{All_Hell_Broke_Loose = true;} + } + }else{ + //if a tag header, calculate length and read tag body + p->len = p->data[3] + 15; + p->len += (p->data[2] << 8); + p->len += (p->data[1] << 16); + if (p->buf < p->len){p->data = (char*)realloc(p->data, p->len);p->buf = p->len;} + done = false; + } + } + }else{ + //read tag body + if (ReadUntil(p->data, p->len, sofar)){ + //calculate keyframeness, next time read header again, return true + p->isKeyframe = false; + if ((p->data[0] == 0x09) && (((p->data[11] & 0xf0) >> 4) == 1)){p->isKeyframe = true;} + done = true; + sofar = 0; + fcntl(fileno(stdin), F_SETFL, preflags); + return true; + } + } + fcntl(fileno(stdin), F_SETFL, preflags); + return false; }//FLV_GetPacket + diff --git a/util/flv_sock.cpp b/util/flv_sock.cpp index 8d6c55a4..ac520b43 100644 --- a/util/flv_sock.cpp +++ b/util/flv_sock.cpp @@ -1,30 +1,92 @@ -SWBaseSocket::SWBaseError SWBerr; -char * FLVbuffer; -int FLV_len; -int FLVbs = 0; -void FLV_Readheader(SWUnixSocket & ss){ - static char header[13]; - while (ss.frecv(header, 13, &SWBerr) != 13){ - //wait - } -}//FLV_Readheader +struct FLV_Pack { + int len; + int buf; + bool isKeyframe; + char * data; +};//FLV_Pack -void FLV_Dump(){FLV_len = 0;} +char FLVHeader[13]; +bool All_Hell_Broke_Loose = false; -bool FLV_GetPacket(SWUnixSocket & ss){ - if (FLVbs < 15){FLVbuffer = (char*)realloc(FLVbuffer, 15); FLVbs = 15;} - //if received a whole header, receive a whole packet - //if not, retry header next pass - if (FLV_len == 0){ - if (ss.frecv(FLVbuffer, 11, &SWBerr) == 11){ - FLV_len = FLVbuffer[3] + 15; - FLV_len += (FLVbuffer[2] << 8); - FLV_len += (FLVbuffer[1] << 16); - if (FLVbs < FLV_len){FLVbuffer = (char*)realloc(FLVbuffer, FLV_len);FLVbs = FLV_len;} +//checks FLV Header for correctness +//returns true if everything is alright, false otherwise +bool FLV_Checkheader(char * header){ + if (header[0] != 'F') return false; + if (header[1] != 'L') return false; + if (header[2] != 'V') return false; + if (header[8] != 0x09) return false; + if (header[9] != 0) return false; + if (header[10] != 0) return false; + if (header[11] != 0) return false; + if (header[12] != 0) return false; + return true; +}//FLV_Checkheader + +//returns true if header is an FLV header +bool FLV_Isheader(char * header){ + if (header[0] != 'F') return false; + if (header[1] != 'L') return false; + if (header[2] != 'V') return false; + return true; +}//FLV_Isheader + +bool ReadUntil(char * buffer, unsigned int count, unsigned int & sofar, int sock){ + if (sofar >= count){return true;} + int r = 0; + r = DDV_iread(buffer + sofar,count-sofar,sock); + if (r < 0){All_Hell_Broke_Loose = true; return false;} + sofar += r; + if (sofar >= count){return true;} + return false; +} + +//gets a packet, storing in given FLV_Pack pointer. +//will assign pointer if null +//resizes FLV_Pack data field bigger if data doesn't fit +// (does not auto-shrink for speed!) +bool FLV_GetPacket(FLV_Pack *& p, int sock){ + int preflags = fcntl(sock, F_GETFL, 0); + int postflags = preflags | O_NONBLOCK; + fcntl(sock, F_SETFL, postflags); + static bool done = true; + static unsigned int sofar = 0; + if (!p){p = (FLV_Pack*)calloc(1, sizeof(FLV_Pack));} + if (p->buf < 15){p->data = (char*)realloc(p->data, 15); p->buf = 15;} + + if (done){ + //read a header + if (ReadUntil(p->data, 11, sofar, sock)){ + //if its a correct FLV header, throw away and read tag header + if (FLV_Isheader(p->data)){ + if (ReadUntil(p->data, 13, sofar, sock)){ + if (FLV_Checkheader(p->data)){ + sofar = 0; + memcpy(FLVHeader, p->data, 13); + }else{All_Hell_Broke_Loose = true;} + } + }else{ + //if a tag header, calculate length and read tag body + p->len = p->data[3] + 15; + p->len += (p->data[2] << 8); + p->len += (p->data[1] << 16); + if (p->buf < p->len){p->data = (char*)realloc(p->data, p->len);p->buf = p->len;} + done = false; + } } }else{ - if (ss.frecv(FLVbuffer+11, FLV_len-11, &SWBerr) == FLV_len-11){return true;} + //read tag body + if (ReadUntil(p->data, p->len, sofar, sock)){ + //calculate keyframeness, next time read header again, return true + p->isKeyframe = false; + if ((p->data[0] == 0x09) && (((p->data[11] & 0xf0) >> 4) == 1)){p->isKeyframe = true;} + done = true; + sofar = 0; + fcntl(sock, F_SETFL, preflags); + return true; + } } + fcntl(sock, F_SETFL, preflags); return false; }//FLV_GetPacket +