diff --git a/Connector_RTMP/amf.cpp b/Connector_RTMP/amf.cpp index c6bbdb96..70aab059 100644 --- a/Connector_RTMP/amf.cpp +++ b/Connector_RTMP/amf.cpp @@ -105,6 +105,7 @@ class AMFType { case 0x02://short string case 0x0C: std::cerr << "String"; break; case 0x03: std::cerr << "Object"; break; + case 0x08: std::cerr << "ECMA Array"; break; case 0x05: std::cerr << "Null"; break; case 0x06: std::cerr << "Undefined"; break; case 0x0D: std::cerr << "Unsupported"; break; @@ -157,6 +158,22 @@ class AMFType { } r += (char)0; r += (char)0; r += (char)9; break; + case 0x08:{//array + int arrlen = 0; + if (contents){ + arrlen = getContentP("length")->NumValue(); + r += arrlen / (256*256*256); r += arrlen / (256*256); r += arrlen / 256; r += arrlen % 256; + for (std::vector::iterator it = contents->begin(); it != contents->end(); it++){ + r += it->Indice().size() / 256; + r += it->Indice().size() % 256; + r += it->Indice(); + r += it->Pack(); + } + }else{ + r += (char)0; r += (char)0; r += (char)0; r += (char)0; + } + r += (char)0; r += (char)0; r += (char)9; + } break; case 0xFF://container - our own type - do not send, only send contents if (contents){ for (std::vector::iterator it = contents->begin(); it != contents->end(); it++){ @@ -176,6 +193,7 @@ class AMFType { };//AMFType AMFType parseOneAMF(const unsigned char *& data, unsigned int &len, unsigned int &i, std::string name){ + char * helperchar = 0; std::string tmpstr; unsigned int tmpi = 0; unsigned char tmpdbl[8]; @@ -202,13 +220,21 @@ AMFType parseOneAMF(const unsigned char *& data, unsigned int &len, unsigned int break; case 0x0C://long string tmpi = data[i+1]*256*256*256+data[i+2]*256*256+data[i+3]*256+data[i+4]; - tmpstr = (char*)(data+i+5); + helperchar = (char*)malloc(tmpi+1); + memcpy(helperchar, data+i+5, tmpi); + helperchar[tmpi] = 0; + tmpstr = helperchar; + free(helperchar); i += tmpi + 5; return AMFType(name, tmpstr, 0x0C); break; case 0x02://string tmpi = data[i+1]*256+data[i+2]; - tmpstr = (char*)(data+i+3); + helperchar = (char*)malloc(tmpi+1); + memcpy(helperchar, data+i+3, tmpi); + helperchar[tmpi] = 0; + tmpstr = helperchar; + free(helperchar); i += tmpi + 3; return AMFType(name, tmpstr, 0x02); break; @@ -230,8 +256,23 @@ AMFType parseOneAMF(const unsigned char *& data, unsigned int &len, unsigned int i += 3; return ret; } break; + case 0x08:{//ECMA array + ++i; + AMFType ret = AMFType(name, data[i-1]); + i += 4; + while (data[i] + data[i+1] != 0){ + tmpi = data[i]*256+data[i+1]; + tmpstr = (char*)(data+i+2); + i += tmpi + 2; + ret.addContent(parseOneAMF(data, len, i, tmpstr)); + } + i += 3; + return ret; + } break; } + #ifdef DEBUG fprintf(stderr, "Error: Unimplemented AMF type %hhx - returning.\n", data[i]); + #endif return AMFType("error", (unsigned char)0xFF); }//parseOneAMF diff --git a/Connector_RTMP/chunkstream.cpp b/Connector_RTMP/chunkstream.cpp index c99287d1..6df8752c 100644 --- a/Connector_RTMP/chunkstream.cpp +++ b/Connector_RTMP/chunkstream.cpp @@ -4,6 +4,13 @@ #include #include +unsigned int getNowMS(){ + timeval t; + gettimeofday(&t, 0); + return t.tv_sec + t.tv_usec/1000; +} + + unsigned int chunk_rec_max = 128; unsigned int chunk_snd_max = 128; unsigned int rec_window_size = 0xFA00; @@ -13,6 +20,8 @@ unsigned int snd_window_at = 0; unsigned int rec_cnt = 0; unsigned int snd_cnt = 0; +unsigned int firsttime; + struct chunkinfo { unsigned int cs_id; unsigned int timestamp; @@ -84,6 +93,7 @@ void SendChunk(chunkpack ch){ unsigned int tmpi; unsigned char chtype = 0x00; chunkinfo prev = GetSndPrev(ch.cs_id); + ch.timestamp -= firsttime; if (prev.cs_id == ch.cs_id){ if (ch.msg_stream_id == prev.msg_stream_id){ chtype = 0x40;//do not send msg_stream_id @@ -143,10 +153,10 @@ void SendChunk(chunkpack ch){ snd_cnt+=1; if (chtype != 0x40){ //msg stream id - tmp = ch.msg_stream_id / (256*256*256); fwrite(&tmp, 1, 1, stdout); - tmp = ch.msg_stream_id / (256*256); fwrite(&tmp, 1, 1, stdout); - tmp = ch.msg_stream_id / 256; fwrite(&tmp, 1, 1, stdout); tmp = ch.msg_stream_id % 256; fwrite(&tmp, 1, 1, stdout); + tmp = ch.msg_stream_id / 256; fwrite(&tmp, 1, 1, stdout); + tmp = ch.msg_stream_id / (256*256); fwrite(&tmp, 1, 1, stdout); + tmp = ch.msg_stream_id / (256*256*256); fwrite(&tmp, 1, 1, stdout); snd_cnt+=4; } } @@ -191,10 +201,8 @@ void SendChunk(chunkpack ch){ //sends a chunk void SendChunk(unsigned int cs_id, unsigned char msg_type_id, unsigned int msg_stream_id, std::string data){ chunkpack ch; - timeval t; - gettimeofday(&t, 0); ch.cs_id = cs_id; - ch.timestamp = t.tv_sec; + ch.timestamp = getNowMS(); ch.len = data.size(); ch.real_len = data.size(); ch.len_left = 0; @@ -207,18 +215,15 @@ void SendChunk(unsigned int cs_id, unsigned char msg_type_id, unsigned int msg_s }//SendChunk //sends a media chunk -void SendMedia(unsigned char msg_type_id, unsigned char * data, int len){ - if ((msg_type_id != 8) && (msg_type_id != 9)) return;//only parse audio and video +void SendMedia(unsigned char msg_type_id, unsigned char * data, int len, unsigned int ts){ chunkpack ch; - timeval t; - gettimeofday(&t, 0); ch.cs_id = msg_type_id; - ch.timestamp = t.tv_sec; + ch.timestamp = ts; ch.len = len; ch.real_len = len; ch.len_left = 0; ch.msg_type_id = msg_type_id; - ch.msg_stream_id = 10; + ch.msg_stream_id = 1; ch.data = (unsigned char*)malloc(len); memcpy(ch.data, data, len); SendChunk(ch); @@ -228,10 +233,8 @@ void SendMedia(unsigned char msg_type_id, unsigned char * data, int len){ //sends a control message void SendCTL(unsigned char type, unsigned int data){ chunkpack ch; - timeval t; - gettimeofday(&t, 0); ch.cs_id = 2; - ch.timestamp = t.tv_sec; + ch.timestamp = getNowMS(); ch.len = 4; ch.real_len = 4; ch.len_left = 0; @@ -247,10 +250,8 @@ void SendCTL(unsigned char type, unsigned int data){ //sends a control message void SendCTL(unsigned char type, unsigned int data, unsigned char data2){ chunkpack ch; - timeval t; - gettimeofday(&t, 0); ch.cs_id = 2; - ch.timestamp = t.tv_sec; + ch.timestamp = getNowMS(); ch.len = 5; ch.real_len = 5; ch.len_left = 0; @@ -267,10 +268,8 @@ void SendCTL(unsigned char type, unsigned int data, unsigned char data2){ //sends a usr control message void SendUSR(unsigned char type, unsigned int data){ chunkpack ch; - timeval t; - gettimeofday(&t, 0); ch.cs_id = 2; - ch.timestamp = t.tv_sec; + ch.timestamp = getNowMS(); ch.len = 6; ch.real_len = 6; ch.len_left = 0; @@ -288,10 +287,8 @@ void SendUSR(unsigned char type, unsigned int data){ //sends a usr control message void SendUSR(unsigned char type, unsigned int data, unsigned int data2){ chunkpack ch; - timeval t; - gettimeofday(&t, 0); ch.cs_id = 2; - ch.timestamp = t.tv_sec; + ch.timestamp = getNowMS(); ch.len = 10; ch.real_len = 10; ch.len_left = 0; @@ -353,13 +350,13 @@ struct chunkpack getChunk(){ fread(&temp, 1, 1, stdin); ret.msg_type_id = temp; fread(&temp, 1, 1, stdin); - ret.msg_stream_id = temp*256*256*256; - fread(&temp, 1, 1, stdin); - ret.msg_stream_id += temp*256*256; + ret.msg_stream_id = temp; fread(&temp, 1, 1, stdin); ret.msg_stream_id += temp*256; fread(&temp, 1, 1, stdin); - ret.msg_stream_id += temp; + ret.msg_stream_id += temp*256*256; + fread(&temp, 1, 1, stdin); + ret.msg_stream_id += temp*256*256*256; rec_cnt+=11; break; case 0x40: @@ -459,7 +456,12 @@ chunkpack * AddChunkPart(chunkpack newchunk){ }else{ p = it->second; tmpdata = (unsigned char*)realloc(p->data, p->real_len + newchunk.real_len); - if (tmpdata == 0){fprintf(stderr, "Error allocating memory!\n");return 0;} + if (tmpdata == 0){ + #ifdef DEBUG + fprintf(stderr, "Error allocating memory!\n"); + #endif + return 0; + } p->data = tmpdata; memcpy(p->data+p->real_len, newchunk.data, newchunk.real_len); p->real_len += newchunk.real_len; diff --git a/Connector_RTMP/handshake.cpp b/Connector_RTMP/handshake.cpp index 6aee562c..adc81739 100644 --- a/Connector_RTMP/handshake.cpp +++ b/Connector_RTMP/handshake.cpp @@ -4,7 +4,7 @@ struct Handshake { char Random[1528]; };//Handshake -char * versionstring = "PLSRTMPServer"; +char versionstring[] = "PLSRTMPServer"; void doHandshake(){ srand(time(NULL)); diff --git a/Connector_RTMP/main.cpp b/Connector_RTMP/main.cpp index 2d978861..f361f9de 100644 --- a/Connector_RTMP/main.cpp +++ b/Connector_RTMP/main.cpp @@ -1,3 +1,4 @@ +#define DEBUG #include #include #include @@ -21,37 +22,61 @@ timeval lastrec; #include "flv_sock.cpp" //FLV parsing with SocketW int main(){ + unsigned int ts; + unsigned int fts = 0; + unsigned int ftst; SWUnixSocket ss; fd_set pollset; struct timeval timeout; - struct timeval now; //0 timeout - return immediately after select call timeout.tv_sec = 1; timeout.tv_usec = 0; FD_ZERO(&pollset);//clear the polling set FD_SET(0, &pollset);//add stdin to polling set + //first timestamp set + firsttime = getNowMS(); + + #ifdef DEBUG fprintf(stderr, "Doing handshake...\n"); + #endif doHandshake(); + #ifdef DEBUG fprintf(stderr, "Starting processing...\n"); + #endif while (!feof(stdin)){ select(1, &pollset, 0, 0, &timeout); //only parse input from stdin if available or not yet init'ed - if (FD_ISSET(0, &pollset) || !ready4data){parseChunk();fflush(stdout);}// || !ready4data? + if (FD_ISSET(0, &pollset) || !ready4data || (snd_cnt - snd_window_at >= snd_window_size)){parseChunk();fflush(stdout);}// || !ready4data? if (ready4data){ if (!inited){ //we are ready, connect the socket! if (!ss.connect("../shared_socket")){ + #ifdef DEBUG fprintf(stderr, "Could not connect to server!\n"); + #endif return 1; } FLV_Readheader(ss);//read the header, we don't want it + #ifdef DEBUG fprintf(stderr, "Header read, starting to send video data...\n"); + #endif inited = true; } //only send data if previous data has been ACK'ed... if (snd_cnt - snd_window_at < snd_window_size){ if (FLV_GetPacket(ss)){//able to read a full packet? - SendMedia((unsigned char)FLVbuffer[0], (unsigned char *)FLVbuffer+11, FLV_len-15); + ts = FLVbuffer[7] * 256*256*256; + ts += FLVbuffer[4] * 256*256; + ts += FLVbuffer[5] * 256; + ts += FLVbuffer[6]; + if (fts == 0){fts = ts;ftst = getNowMS();} + ts -= fts; + FLVbuffer[7] = ts / (256*256*256); + FLVbuffer[4] = ts / (256*256); + FLVbuffer[5] = ts / 256; + FLVbuffer[6] = ts % 256; + ts += ftst; + SendMedia((unsigned char)FLVbuffer[0], (unsigned char *)FLVbuffer+11, FLV_len-15, ts); } } } diff --git a/Connector_RTMP/parsechunks.cpp b/Connector_RTMP/parsechunks.cpp index a515114c..f4fbe465 100644 --- a/Connector_RTMP/parsechunks.cpp +++ b/Connector_RTMP/parsechunks.cpp @@ -7,76 +7,115 @@ void parseChunk(){ static chunkpack next; static AMFType amfdata("empty", (unsigned char)0xFF); static AMFType amfelem("empty", (unsigned char)0xFF); - static int tmpint; next = getWholeChunk(); switch (next.msg_type_id){ case 0://does not exist break;//happens when connection breaks unexpectedly case 1://set chunk size chunk_rec_max = ntohl(*(int*)next.data); + #ifdef DEBUG fprintf(stderr, "CTRL: Set chunk size: %i\n", chunk_rec_max); + #endif break; case 2://abort message - we ignore this one + #ifdef DEBUG fprintf(stderr, "CTRL: Abort message\n"); + #endif //4 bytes of stream id to drop break; case 3://ack + #ifdef DEBUG fprintf(stderr, "CTRL: Acknowledgement\n"); + #endif snd_window_at = ntohl(*(int*)next.data); //maybe better? snd_window_at = snd_cnt; break; case 4:{ + #ifdef DEBUG short int ucmtype = ntohs(*(short int*)next.data); fprintf(stderr, "CTRL: User control message %hi\n", ucmtype); + #endif //2 bytes event type, rest = event data + //types: + //0 = stream begin, 4 bytes ID + //1 = stream EOF, 4 bytes ID + //2 = stream dry, 4 bytes ID + //3 = setbufferlen, 4 bytes ID, 4 bytes length + //4 = streamisrecorded, 4 bytes ID + //6 = pingrequest, 4 bytes data + //7 = pingresponse, 4 bytes data //we don't need to process this } break; case 5://window size of other end + #ifdef DEBUG fprintf(stderr, "CTRL: Window size\n"); + #endif rec_window_size = ntohl(*(int*)next.data); rec_window_at = rec_cnt; SendCTL(3, rec_cnt);//send ack (msg 3) break; case 6: + #ifdef DEBUG fprintf(stderr, "CTRL: Set peer bandwidth\n"); + #endif //4 bytes window size, 1 byte limit type (ignored) snd_window_size = ntohl(*(int*)next.data); SendCTL(5, snd_window_size);//send window acknowledgement size (msg 5) break; case 8: + #ifdef DEBUG fprintf(stderr, "Received audio data\n"); + #endif break; case 9: + #ifdef DEBUG fprintf(stderr, "Received video data\n"); + #endif break; case 15: + #ifdef DEBUG fprintf(stderr, "Received AFM3 data message\n"); + #endif break; case 16: + #ifdef DEBUG fprintf(stderr, "Received AFM3 shared object\n"); + #endif break; case 17: + #ifdef DEBUG fprintf(stderr, "Received AFM3 command message\n"); + #endif break; case 18: + #ifdef DEBUG fprintf(stderr, "Received AFM0 data message\n"); + #endif break; case 19: + #ifdef DEBUG fprintf(stderr, "Received AFM0 shared object\n"); + #endif break; case 20:{//AMF0 command message + bool parsed = false; amfdata = parseAMF(next.data, next.real_len); + #ifdef DEBUG fprintf(stderr, "Received AFM0 command message:\n"); amfdata.Print(); + #endif if (amfdata.getContentP(0)->StrValue() == "connect"){ + #ifdef DEBUG + int tmpint; tmpint = amfdata.getContentP(2)->getContentP("videoCodecs")->NumValue(); if (tmpint & 0x04){fprintf(stderr, "Sorensen video support detected\n");} if (tmpint & 0x80){fprintf(stderr, "H264 video support detected\n");} tmpint = amfdata.getContentP(2)->getContentP("audioCodecs")->NumValue(); if (tmpint & 0x04){fprintf(stderr, "MP3 audio support detected\n");} if (tmpint & 0x400){fprintf(stderr, "AAC video support detected\n");} + #endif SendCTL(6, rec_window_size, 0);//send peer bandwidth (msg 6) - //SendCTL(5, snd_window_size);//send window acknowledgement size (msg 5) + SendCTL(5, snd_window_size);//send window acknowledgement size (msg 5) SendUSR(0, 0);//send UCM StreamBegin (0), stream 0 //send a _result reply AMFType amfreply("container", (unsigned char)0xFF); @@ -88,10 +127,10 @@ void parseChunk(){ amfreply.getContentP(2)->addContent(AMFType("capabilities", (double)31));//stolen from examples amfreply.addContent(AMFType(""));//info amfreply.getContentP(3)->addContent(AMFType("level", "status")); - amfreply.getContentP(3)->addContent(AMFType("code", "NetConnection.Connect.Sucess")); + amfreply.getContentP(3)->addContent(AMFType("code", "NetConnection.Connect.Success")); amfreply.getContentP(3)->addContent(AMFType("description", "Connection succeeded.")); amfreply.getContentP(3)->addContent(AMFType("capabilities", (double)33));//from red5 server - amfreply.getContentP(3)->addContent(AMFType("fmsVer", "RED5/1,0,0,0"));//from red5 server + amfreply.getContentP(3)->addContent(AMFType("fmsVer", "PLS/1,0,0,0"));//from red5 server SendChunk(3, 20, next.msg_stream_id, amfreply.Pack()); //send onBWDone packet //amfreply = AMFType("container", (unsigned char)0xFF); @@ -99,6 +138,7 @@ void parseChunk(){ //amfreply.addContent(AMFType("", (double)0));//zero //amfreply.addContent(AMFType("", (double)0, 0x05));//null //SendChunk(3, 20, next.msg_stream_id, amfreply.Pack()); + parsed = true; }//connect if (amfdata.getContentP(0)->StrValue() == "createStream"){ //send a _result reply @@ -106,18 +146,42 @@ void parseChunk(){ amfreply.addContent(AMFType("", "_result"));//result success amfreply.addContent(amfdata.getContent(1));//same transaction ID amfreply.addContent(AMFType("", (double)0, 0x05));//null - command info - amfreply.addContent(AMFType("", (double)10));//stream ID - we use 10 + amfreply.addContent(AMFType("", (double)1));//stream ID - we use 1 SendChunk(3, 20, next.msg_stream_id, amfreply.Pack()); + #ifdef DEBUG + fprintf(stderr, "AMF0 command: createStream result\n"); + #endif + parsed = true; }//createStream - if (amfdata.getContentP(0)->StrValue() == "getMovLen"){ + if ((amfdata.getContentP(0)->StrValue() == "getStreamLength") || (amfdata.getContentP(0)->StrValue() == "getMovLen")){ //send a _result reply AMFType amfreply("container", (unsigned char)0xFF); amfreply.addContent(AMFType("", "_result"));//result success amfreply.addContent(amfdata.getContent(1));//same transaction ID - amfreply.addContent(AMFType("", (double)6000));//null - command info + amfreply.addContent(AMFType("", (double)0, 0x05));//null - command info + amfreply.addContent(AMFType("", (double)0));//zero length SendChunk(3, 20, next.msg_stream_id, amfreply.Pack()); - }//createStream + #ifdef DEBUG + fprintf(stderr, "AMF0 command: getStreamLength result\n"); + #endif + parsed = true; + }//getStreamLength + if (amfdata.getContentP(0)->StrValue() == "checkBandwidth"){ + //send a _result reply + AMFType amfreply("container", (unsigned char)0xFF); + amfreply.addContent(AMFType("", "_result"));//result success + amfreply.addContent(amfdata.getContent(1));//same transaction ID + amfreply.addContent(AMFType("", (double)0, 0x05));//null - command info + amfreply.addContent(AMFType("", (double)0, 0x05));//null - command info + SendChunk(3, 20, 1, amfreply.Pack()); + #ifdef DEBUG + fprintf(stderr, "AMF0 command: checkBandwidth result\n"); + #endif + parsed = true; + }//checkBandwidth if ((amfdata.getContentP(0)->StrValue() == "play") || (amfdata.getContentP(0)->StrValue() == "play2")){ + //send streambegin + SendUSR(0, 1);//send UCM StreamBegin (0), stream 1 //send a status reply AMFType amfreply("container", (unsigned char)0xFF); amfreply.addContent(AMFType("", "onStatus"));//status reply @@ -127,7 +191,9 @@ void parseChunk(){ amfreply.getContentP(3)->addContent(AMFType("level", "status")); amfreply.getContentP(3)->addContent(AMFType("code", "NetStream.Play.Reset")); amfreply.getContentP(3)->addContent(AMFType("description", "Playing and resetting...")); - SendChunk(3, 20, next.msg_stream_id, amfreply.Pack()); + amfreply.getContentP(3)->addContent(AMFType("details", "PLS")); + amfreply.getContentP(3)->addContent(AMFType("clientid", (double)1)); + SendChunk(4, 20, next.msg_stream_id, amfreply.Pack()); amfreply = AMFType("container", (unsigned char)0xFF); amfreply.addContent(AMFType("", "onStatus"));//status reply amfreply.addContent(amfdata.getContent(1));//same transaction ID @@ -136,17 +202,32 @@ void parseChunk(){ amfreply.getContentP(3)->addContent(AMFType("level", "status")); amfreply.getContentP(3)->addContent(AMFType("code", "NetStream.Play.Start")); amfreply.getContentP(3)->addContent(AMFType("description", "Playing!")); - SendChunk(3, 20, next.msg_stream_id, amfreply.Pack()); + amfreply.getContentP(3)->addContent(AMFType("details", "PLS")); + amfreply.getContentP(3)->addContent(AMFType("clientid", (double)1)); + SendChunk(4, 20, 1, amfreply.Pack()); chunk_snd_max = 1024*1024; SendCTL(1, chunk_snd_max);//send chunk size max (msg 1) ready4data = true;//start sending video data! + #ifdef DEBUG + fprintf(stderr, "AMF0 command: play result\n"); + #endif + parsed = true; }//createStream + if (!parsed){ + #ifdef DEBUG + fprintf(stderr, "AMF0 command not processed! :(\n"); + #endif + } } break; case 22: + #ifdef DEBUG fprintf(stderr, "Received aggregate message\n"); + #endif break; default: + #ifdef DEBUG fprintf(stderr, "Unknown chunk received!\n"); + #endif break; } }//parseChunk diff --git a/Server/main.cpp b/Server/main.cpp index 0747c128..146221ec 100644 --- a/Server/main.cpp +++ b/Server/main.cpp @@ -21,6 +21,8 @@ int main( int argc, char * argv[] ) { std::cout << "usage: " << argv[0] << " buffers_count max_clients" << std::endl; return 1; } + int metabuflen = 0; + char * metabuffer = 0; int buffers = atoi(argv[1]); int connections = atoi(argv[2]); buffer ** ringbuf = (buffer**) calloc (buffers,sizeof(buffer*)); @@ -50,25 +52,33 @@ int main( int argc, char * argv[] ) { } else { FLV_GetPacket(ringbuf[current_buffer]->FLV); //if video frame? (id 9) check for incoming connections - if (ringbuf[current_buffer]->FLV->data[0] == 9) { - incoming = listener.accept(&BError); - if (incoming){ - open_connection = get_empty(connectionList,connections); - if (open_connection != -1) { - connectionList[open_connection]->connect(incoming); - //send the FLV header - std::cout << "Client " << open_connection << " connected." << std::endl; - connectionList[open_connection]->MyBuffer = lastproper; - connectionList[open_connection]->MyBuffer_num = ringbuf[lastproper]->number; - //TODO: Do this more nicely? - if (connectionList[open_connection]->Conn->send(FLVHeader,13,0) != 13){ - connectionList[open_connection]->disconnect(); - std::cout << "Client " << open_connection << " failed to receive the header!" << std::endl; - } - std::cout << "Client " << open_connection << " received header!" << std::endl; - }else{ - std::cout << "New client not connected: no more connections!" << std::endl; + if (ringbuf[current_buffer]->FLV->data[0] == 0x12){ + metabuflen = ringbuf[current_buffer]->FLV->len; + metabuffer = (char*)realloc(metabuffer, metabuflen); + memcpy(metabuffer, ringbuf[current_buffer]->FLV->data, metabuflen); + } + incoming = listener.accept(&BError); + if (incoming){ + open_connection = get_empty(connectionList,connections); + if (open_connection != -1) { + connectionList[open_connection]->connect(incoming); + //send the FLV header + std::cout << "Client " << open_connection << " connected." << std::endl; + connectionList[open_connection]->MyBuffer = lastproper; + connectionList[open_connection]->MyBuffer_num = ringbuf[lastproper]->number; + //TODO: Do this more nicely? + if (connectionList[open_connection]->Conn->send(FLVHeader,13,0) != 13){ + connectionList[open_connection]->disconnect(); + std::cout << "Client " << open_connection << " failed to receive the header!" << std::endl; } + if (connectionList[open_connection]->Conn->send(metabuffer,metabuflen,0) != metabuflen){ + connectionList[open_connection]->disconnect(); + std::cout << "Client " << open_connection << " failed to receive metadata!" << std::endl; + } + std::cout << "Client " << open_connection << " received metadata and header!" << std::endl; + }else{ + std::cout << "New client not connected: no more connections!" << std::endl; + incoming->disconnect(); } } ringbuf[current_buffer]->number = loopcount; @@ -83,6 +93,6 @@ int main( int argc, char * argv[] ) { // disconnect listener std::cout << "Reached EOF of input" << std::endl; - listener.disconnect(); + listener.disconnect(&BError); return 0; }