diff --git a/Connector_RTMP/amf.cpp b/Connector_RTMP/amf.cpp index 0f22bdfe..bc163126 100644 --- a/Connector_RTMP/amf.cpp +++ b/Connector_RTMP/amf.cpp @@ -16,8 +16,8 @@ class AMFType { //scans the vector for the indice, returns the next AMFType from it or null AMFType * getAMF(std::vector * vect, std::string indice){ std::vector::iterator it; - for (it=vect.begin(); it < vect.end(); it++){ - if ((*it)->StrValue() == indice){it++; return *it;} + for (it=vect->begin(); it < vect->end(); it++){ + if ((*it).StrValue() == indice){it++; return &(*it);} } return 0; }//getAMF diff --git a/Connector_RTMP/chunkstream.cpp b/Connector_RTMP/chunkstream.cpp index 8f3c6e6e..6386622b 100644 --- a/Connector_RTMP/chunkstream.cpp +++ b/Connector_RTMP/chunkstream.cpp @@ -1,6 +1,17 @@ #include #include #include +#include +#include + +unsigned int chunk_rec_max = 128; +unsigned int chunk_snd_max = 128; +unsigned int rec_window_size = 1024*500; +unsigned int snd_window_size = 1024*500; +unsigned int rec_window_at = 0; +unsigned int snd_window_at = 0; +unsigned int rec_cnt = 0; +unsigned int snd_cnt = 0; struct chunkpack { unsigned char chunktype; @@ -14,8 +25,6 @@ struct chunkpack { unsigned char * data; };//chunkpack -unsigned int chunk_rec_max = 128; - //clean a chunk so that it may be re-used without memory leaks void scrubChunk(struct chunkpack c){ if (c.data){free(c.data);} @@ -23,6 +32,149 @@ void scrubChunk(struct chunkpack c){ c.real_len = 0; }//scrubChunk +//sends the chunk over the network +void SendChunk(chunkpack ch){ + unsigned char tmp; + unsigned int tmpi; + if (ch.cs_id <= 63){ + tmp = ch.cs_id; fwrite(&tmp, 1, 1, stdout); + }else{ + if (ch.cs_id <= 255+64){ + tmp = 0; fwrite(&tmp, 1, 1, stdout); + tmp = ch.cs_id - 64; fwrite(&tmp, 1, 1, stdout); + }else{ + tmp = 1; fwrite(&tmp, 1, 1, stdout); + tmpi = ch.cs_id - 64; + tmp = tmpi % 256; fwrite(&tmp, 1, 1, stdout); + tmp = tmpi / 256; fwrite(&tmp, 1, 1, stdout); + } + } + //timestamp + //TODO: support for > 0x00ffffff timestamps! + tmpi = ch.timestamp; + tmp = tmpi / (256*256); fwrite(&tmp, 1, 1, stdout); + tmp = tmpi / 256; fwrite(&tmp, 1, 1, stdout); + tmp = tmpi % 256; fwrite(&tmp, 1, 1, stdout); + //len + tmpi = ch.len; + tmp = tmpi / (256*256); fwrite(&tmp, 1, 1, stdout); + tmp = tmpi / 256; fwrite(&tmp, 1, 1, stdout); + tmp = tmpi % 256; fwrite(&tmp, 1, 1, stdout); + //msg type id + tmp = ch.msg_type_id; fwrite(&tmp, 1, 1, stdout); + //msg stream id + tmp = ch.msg_stream_id / (256*256*256); fwrite(&tmp, 1, 1, stdout); + tmp = ch.msg_stream_id / (256*256); fwrite(&tmp, 1, 1, stdout); + tmp = ch.msg_stream_id / 256; fwrite(&tmp, 1, 1, stdout); + tmp = ch.msg_stream_id % 256; fwrite(&tmp, 1, 1, stdout); + ch.len_left = 0; + while (ch.len_left < ch.len){ + tmpi = ch.len - ch.len_left; + if (tmpi > chunk_snd_max){tmpi = chunk_snd_max;} + fwrite((ch.data + ch.len_left), 1, tmpi, stdout); + ch.len_left += tmpi; + if (ch.len_left < ch.len){ + if (ch.cs_id <= 63){ + tmp = 0xC + ch.cs_id; fwrite(&tmp, 1, 1, stdout); + }else{ + if (ch.cs_id <= 255+64){ + tmp = 0xC0; fwrite(&tmp, 1, 1, stdout); + tmp = ch.cs_id - 64; fwrite(&tmp, 1, 1, stdout); + }else{ + tmp = 0xC1; fwrite(&tmp, 1, 1, stdout); + tmpi = ch.cs_id - 64; + tmp = tmpi % 256; fwrite(&tmp, 1, 1, stdout); + tmp = tmpi / 256; fwrite(&tmp, 1, 1, stdout); + } + } + } + } + fflush(stdout); +}//SendChunk + +//sends a control message +void SendCTL(unsigned char type, unsigned int data){ + chunkpack ch; + timeval t; + gettimeofday(&t, 0); + ch.cs_id = 2; + ch.timestamp = t.tv_sec * 10 + t.tv_usec / 1000000; + ch.len = 4; + ch.real_len = 4; + ch.len_left = 0; + ch.msg_type_id = type; + ch.msg_stream_id = 0; + ch.data = (unsigned char*)malloc(4); + data = htonl(data); + memcpy(ch.data, &data, 4); + SendChunk(ch); + free(ch.data); +}//SendCTL + +//sends a control message +void SendCTL(unsigned char type, unsigned int data, unsigned char data2){ + chunkpack ch; + timeval t; + gettimeofday(&t, 0); + ch.cs_id = 2; + ch.timestamp = t.tv_sec * 10 + t.tv_usec / 1000000; + ch.len = 5; + ch.real_len = 5; + ch.len_left = 0; + ch.msg_type_id = type; + ch.msg_stream_id = 0; + ch.data = (unsigned char*)malloc(5); + data = htonl(data); + memcpy(ch.data, &data, 4); + ch.data[4] = data2; + SendChunk(ch); + free(ch.data); +}//SendCTL + +//sends a usr control message +void SendUSR(unsigned char type, unsigned int data){ + chunkpack ch; + timeval t; + gettimeofday(&t, 0); + ch.cs_id = 2; + ch.timestamp = t.tv_sec * 10 + t.tv_usec / 1000000; + ch.len = 6; + ch.real_len = 6; + ch.len_left = 0; + ch.msg_type_id = 4; + ch.msg_stream_id = 0; + ch.data = (unsigned char*)malloc(6); + data = htonl(data); + memcpy(ch.data+2, &data, 4); + ch.data[0] = 0; + ch.data[1] = type; + SendChunk(ch); + free(ch.data); +}//SendUSR + +//sends a usr control message +void SendUSR(unsigned char type, unsigned int data, unsigned int data2){ + chunkpack ch; + timeval t; + gettimeofday(&t, 0); + ch.cs_id = 2; + ch.timestamp = t.tv_sec * 10 + t.tv_usec / 1000000; + ch.len = 10; + ch.real_len = 10; + ch.len_left = 0; + ch.msg_type_id = 4; + ch.msg_stream_id = 0; + ch.data = (unsigned char*)malloc(10); + data = htonl(data); + data2 = htonl(data2); + memcpy(ch.data+2, &data, 4); + memcpy(ch.data+6, &data2, 4); + ch.data[0] = 0; + ch.data[1] = type; + SendChunk(ch); + free(ch.data); +}//SendUSR + //get a chunk from standard input struct chunkpack getChunk(struct chunkpack prev){ struct chunkpack ret; @@ -158,25 +310,16 @@ chunkpack * AddChunkPart(chunkpack newchunk){ *p = newchunk; p->data = (unsigned char*)malloc(p->real_len); memcpy(p->data, newchunk.data, p->real_len); - if (p->len_left == 0){ - fprintf(stderr, "New chunk of size %i / %i is whole - returning it\n", newchunk.real_len, newchunk.len); - return p; - } - fprintf(stderr, "New chunk of size %i / %i\n", newchunk.real_len, newchunk.len); + if (p->len_left == 0){return p;} ch_lst[newchunk.cs_id] = p; }else{ p = it->second; - fprintf(stderr, "Appending chunk of size %i to chunk of size %i / %i...\n", newchunk.real_len, p->real_len, p->len); - fprintf(stderr, "Reallocating %i bytes\n", p->real_len + newchunk.real_len); tmpdata = (unsigned char*)realloc(p->data, p->real_len + newchunk.real_len); if (tmpdata == 0){fprintf(stderr, "Error allocating memory!\n");return 0;} p->data = tmpdata; - fprintf(stderr, "Reallocated %i bytes\n", p->real_len + newchunk.real_len); memcpy(p->data+p->real_len, newchunk.data, newchunk.real_len); - fprintf(stderr, "Copied contents over\n"); p->real_len += newchunk.real_len; p->len_left -= newchunk.real_len; - fprintf(stderr, "New size: %i / %i\n", p->real_len, p->len); if (p->len_left <= 0){ ch_lst.erase(it); return p; @@ -198,7 +341,6 @@ chunkpack getWholeChunk(){ gwc_next = getChunk(gwc_prev); scrubChunk(gwc_prev); gwc_prev = gwc_next; - fprintf(stderr, "Processing chunk...\n"); ret = AddChunkPart(gwc_next); if (ret){ gwc_complete = *ret; diff --git a/Connector_RTMP/main.cpp b/Connector_RTMP/main.cpp index 74d9a6b3..eb3a0e4f 100644 --- a/Connector_RTMP/main.cpp +++ b/Connector_RTMP/main.cpp @@ -3,76 +3,12 @@ #include #include #include "handshake.cpp" //handshaking -#include "chunkstream.cpp" //chunkstream decoding -#include "amf.cpp" //simple AMF0 parsing +#include "parsechunks.cpp" //chunkstream parsing int main(){ doHandshake(); - - chunkpack next; - std::vector * amfdata = 0; while (!feof(stdin)){ - next = getWholeChunk(); - if (next.cs_id == 2 && next.msg_stream_id == 0){ - fprintf(stderr, "Received protocol message. (cs_id 2, stream id 0)\nContents:\n"); - fwrite(next.data, 1, next.real_len, stderr); - fflush(stderr); - } - switch (next.msg_type_id){ - case 1: - fprintf(stderr, "CTRL: Set chunk size\n"); - break; - case 2: - fprintf(stderr, "CTRL: Abort message\n"); - break; - case 3: - fprintf(stderr, "CTRL: Acknowledgement\n"); - break; - case 4: - fprintf(stderr, "CTRL: User control message\n"); - break; - case 5: - fprintf(stderr, "CTRL: Window size\n"); - break; - case 6: - fprintf(stderr, "CTRL: Set peer bandwidth\n"); - break; - case 8: - fprintf(stderr, "Received audio data\n"); - break; - case 9: - fprintf(stderr, "Received video data\n"); - break; - case 15: - fprintf(stderr, "Received AFM3 data message\n"); - break; - case 16: - fprintf(stderr, "Received AFM3 shared object\n"); - break; - case 17: - fprintf(stderr, "Received AFM3 command message\n"); - break; - case 18: - fprintf(stderr, "Received AFM0 data message\n"); - break; - case 19: - fprintf(stderr, "Received AFM0 shared object\n"); - break; - case 20: - fprintf(stderr, "Received AFM0 command message\n"); - if (amfdata != 0){delete amfdata;} - amfdata = parseAMF(next.data, next.real_len); - - break; - case 22: - fprintf(stderr, "Received aggregate message\n"); - break; - default: - fprintf(stderr, "Unknown chunk received!\n"); - break; - } + parseChunk(); } - - return 0; }//main diff --git a/Connector_RTMP/parsechunks.cpp b/Connector_RTMP/parsechunks.cpp new file mode 100644 index 00000000..8785caf8 --- /dev/null +++ b/Connector_RTMP/parsechunks.cpp @@ -0,0 +1,97 @@ +#include "chunkstream.cpp" //chunkstream decoding +#include "amf.cpp" //simple AMF0 parsing + + +//gets and parses one chunk +void parseChunk(){ + static chunkpack next; + static std::vector * amfdata = 0; + static AMFType * amfelem = 0; + static int tmpint; + next = getWholeChunk(); + if (next.cs_id == 2 && next.msg_stream_id == 0){ + fprintf(stderr, "Received protocol message. (cs_id 2, stream id 0)\nContents:\n"); + fwrite(next.data, 1, next.real_len, stderr); + fflush(stderr); + } + switch (next.msg_type_id){ + case 0://does not exist + break;//happens when connection breaks unexpectedly + case 1://set chunk size + chunk_rec_max = ntohl(*(int*)next.data); + fprintf(stderr, "CTRL: Set chunk size: %i\n", chunk_rec_max); + break; + case 2://abort message - we ignore this one + fprintf(stderr, "CTRL: Abort message\n"); + //4 bytes of stream id to drop + break; + case 3://ack + fprintf(stderr, "CTRL: Acknowledgement\n"); + snd_window_at = ntohl(*(int*)next.data); + //maybe better? snd_window_at = snd_cnt; + break; + case 4: + fprintf(stderr, "CTRL: User control message\n"); + //2 bytes event type, rest = event data + //TODO: process this + break; + case 5://window size of other end + fprintf(stderr, "CTRL: Window size\n"); + rec_window_size = ntohl(*(int*)next.data); + break; + case 6: + fprintf(stderr, "CTRL: Set peer bandwidth\n"); + //4 bytes window size, 1 byte limit type (ignored) + snd_window_size = ntohl(*(int*)next.data); + SendCTL(5, snd_window_size);//send window acknowledgement size (msg 5) + break; + case 8: + fprintf(stderr, "Received audio data\n"); + break; + case 9: + fprintf(stderr, "Received video data\n"); + break; + case 15: + fprintf(stderr, "Received AFM3 data message\n"); + break; + case 16: + fprintf(stderr, "Received AFM3 shared object\n"); + break; + case 17: + fprintf(stderr, "Received AFM3 command message\n"); + break; + case 18: + fprintf(stderr, "Received AFM0 data message\n"); + break; + case 19: + fprintf(stderr, "Received AFM0 shared object\n"); + break; + case 20: + if (amfdata != 0){delete amfdata;} + amfdata = parseAMF(next.data, next.real_len); + fprintf(stderr, "Received AFM0 command message: %s\n", (*amfdata)[0].StrValue().c_str()); + if ((*amfdata)[0].StrValue() == "connect"){ + tmpint = getAMF(amfdata, "videoCodecs")->NumValue(); + if (tmpint & 0x04){fprintf(stderr, "Sorensen video support detected\n");} + if (tmpint & 0x80){fprintf(stderr, "H264 video support detected\n");} + tmpint = getAMF(amfdata, "audioCodecs")->NumValue(); + if (tmpint & 0x04){fprintf(stderr, "MP3 audio support detected\n");} + if (tmpint & 0x400){fprintf(stderr, "AAC video support detected\n");} + SendCTL(5, snd_window_size);//send window acknowledgement size (msg 5) + SendCTL(5, rec_window_size, 1);//send peer bandwidth (msg 6) + SendUSR(0, 10);//send UCM StreamBegin (0), stream 10 (we use this number) + //send AFM0 (20) {_result, 1, {properties}, {info}} + }else{ + //call, close, createStream + //TODO: play (&& play2?) + fprintf(stderr, "Ignored AFM0 command.\n"); + } + break; + case 22: + fprintf(stderr, "Received aggregate message\n"); + break; + default: + fprintf(stderr, "Unknown chunk received!\n"); + break; + } +}//parseChunk