From d642d2f1112e5dccf362b1d77e12f72c6635877f Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 28 Jul 2010 18:47:31 +0200 Subject: [PATCH] AMF0 parsing, chunk merging --- {RTMP/Connector => Connector_RTMP}/Makefile | 2 +- Connector_RTMP/amf.cpp | 151 ++++++++++++++++++ .../chunkstream.cpp | 87 +++++++++- .../handshake.cpp | 0 {RTMP/Connector => Connector_RTMP}/main.cpp | 14 +- 5 files changed, 238 insertions(+), 16 deletions(-) rename {RTMP/Connector => Connector_RTMP}/Makefile (90%) create mode 100644 Connector_RTMP/amf.cpp rename {RTMP/Connector => Connector_RTMP}/chunkstream.cpp (54%) rename {RTMP/Connector => Connector_RTMP}/handshake.cpp (100%) rename {RTMP/Connector => Connector_RTMP}/main.cpp (88%) diff --git a/RTMP/Connector/Makefile b/Connector_RTMP/Makefile similarity index 90% rename from RTMP/Connector/Makefile rename to Connector_RTMP/Makefile index c7a44e45..7d4bea53 100644 --- a/RTMP/Connector/Makefile +++ b/Connector_RTMP/Makefile @@ -12,7 +12,7 @@ LIBS = default: $(OUT) .cpp.o: $(CC) $(INCLUDES) $(CCFLAGS) $(LIBS) -c $< -o $@ -$(OUT): $(OBJ) handshake.cpp chunkstream.cpp +$(OUT): $(OBJ) handshake.cpp chunkstream.cpp amf.cpp $(CC) $(LIBS) -o $(OUT) $(OBJ) clean: rm -rf $(OBJ) $(OUT) Makefile.bak *~ diff --git a/Connector_RTMP/amf.cpp b/Connector_RTMP/amf.cpp new file mode 100644 index 00000000..0f22bdfe --- /dev/null +++ b/Connector_RTMP/amf.cpp @@ -0,0 +1,151 @@ +#include +#include +#include + +class AMFType { + public: + double NumValue(){return numval;}; + std::string StrValue(){return strval;}; + AMFType(double val){strval = ""; numval = val;}; + AMFType(std::string val){strval = val; numval = 0;}; + private: + std::string strval; + double numval; +};//AMFType + +//scans the vector for the indice, returns the next AMFType from it or null +AMFType * getAMF(std::vector * vect, std::string indice){ + std::vector::iterator it; + for (it=vect.begin(); it < vect.end(); it++){ + if ((*it)->StrValue() == indice){it++; return *it;} + } + return 0; +}//getAMF + +std::vector * parseAMF(unsigned char * data, unsigned int len){ + std::vector * ret = new std::vector; + unsigned int i = 0; + std::string tmpstr; + unsigned int tmpi = 0; + unsigned char tmpdbl[8]; + while (i < len){ + switch (data[i]){ + case 0x00://number + tmpdbl[7] = data[i+1]; + tmpdbl[6] = data[i+2]; + tmpdbl[5] = data[i+3]; + tmpdbl[4] = data[i+4]; + tmpdbl[3] = data[i+5]; + tmpdbl[2] = data[i+6]; + tmpdbl[1] = data[i+7]; + tmpdbl[0] = data[i+8]; + ret->push_back(*(double*)tmpdbl); + fprintf(stderr, "AMF: Number %f\n", *(double*)tmpdbl); + i += 8; + break; + case 0x01://bool + if (data[i+1] == 0){ + ret->push_back((double)0); + fprintf(stderr, "AMF: Bool false\n"); + }else{ + ret->push_back((double)1); + fprintf(stderr, "AMF: Bool true\n"); + } + ++i; + break; + case 0x0C://long string + tmpi = data[i+1]*256*256*256+data[i+2]*256*256+data[i+3]*256+data[i+4]; + tmpstr = (char*)(data+i+5); + ret->push_back(tmpstr); + i += tmpi + 4; + fprintf(stderr, "AMF: String %s\n", tmpstr.c_str()); + break; + case 0x02://string + tmpi = data[i+1]*256+data[i+2]; + tmpstr = (char*)(data+i+3); + ret->push_back(tmpstr); + i += tmpi + 2; + fprintf(stderr, "AMF: String %s\n", tmpstr.c_str()); + break; + case 0x05://null + case 0x06://undefined + case 0x0D://unsupported + fprintf(stderr, "AMF: Null\n"); + ret->push_back((double)0); + break; + case 0x03://object + ++i; + while (data[i] + data[i+1] != 0){ + tmpi = data[i]*256+data[i+1]; + tmpstr = (char*)(data+i+2); + ret->push_back(tmpstr); + i += tmpi + 2; + fprintf(stderr, "AMF: Indice %s\n", tmpstr.c_str()); + switch (data[i]){ + case 0x00://number + tmpdbl[7] = data[i+1]; + tmpdbl[6] = data[i+2]; + tmpdbl[5] = data[i+3]; + tmpdbl[4] = data[i+4]; + tmpdbl[3] = data[i+5]; + tmpdbl[2] = data[i+6]; + tmpdbl[1] = data[i+7]; + tmpdbl[0] = data[i+8]; + ret->push_back(*(double*)tmpdbl); + fprintf(stderr, "AMF: Value Number %f\n", *(double*)tmpdbl); + i += 8; + break; + case 0x01://bool + if (data[i+1] == 0){ + ret->push_back((double)0); + fprintf(stderr, "AMF: Value Bool false\n"); + }else{ + ret->push_back((double)1); + fprintf(stderr, "AMF: Value Bool true\n"); + } + ++i; + break; + case 0x0C://long string + tmpi = data[i+1]*256*256*256+data[i+2]*256*256+data[i+3]*256+data[i+4]; + tmpstr = (char*)(data+i+5); + ret->push_back(tmpstr); + i += tmpi + 4; + fprintf(stderr, "AMF: Value String %s\n", tmpstr.c_str()); + break; + case 0x02://string + tmpi = data[i+1]*256+data[i+2]; + tmpstr = (char*)(data+i+3); + ret->push_back(tmpstr); + i += tmpi + 2; + fprintf(stderr, "AMF: Value String %s\n", tmpstr.c_str()); + break; + case 0x05://null + case 0x06://undefined + case 0x0D://unsupported + fprintf(stderr, "AMF: Value Null\n"); + ret->push_back((double)0); + break; + default: + fprintf(stderr, "Error: Unknown AMF object contents type %hhx - returning.\n", data[i]); + break; + } + ++i; + } + i += 2; + break; + case 0x07://reference + case 0x08://array + case 0x0A://strict array + case 0x0B://date + case 0x0F://XML + case 0x10://typed object + case 0x11://AMF+ + default: + fprintf(stderr, "Error: Unknown AMF type %hhx - returning.\n", data[i]); + return ret; + break; + } + ++i; + } + return ret; +}//parseAMF diff --git a/RTMP/Connector/chunkstream.cpp b/Connector_RTMP/chunkstream.cpp similarity index 54% rename from RTMP/Connector/chunkstream.cpp rename to Connector_RTMP/chunkstream.cpp index fc340768..8f3c6e6e 100644 --- a/RTMP/Connector/chunkstream.cpp +++ b/Connector_RTMP/chunkstream.cpp @@ -1,3 +1,7 @@ +#include +#include +#include + struct chunkpack { unsigned char chunktype; unsigned int cs_id; @@ -24,7 +28,7 @@ struct chunkpack getChunk(struct chunkpack prev){ struct chunkpack ret; unsigned char temp; fread(&(ret.chunktype), 1, 1, stdin); - fprintf(stderr, "Got chunkstream ID %hhi\n", ret.chunktype & 0x3F); + //read the chunkstream ID properly switch (ret.chunktype & 0x3F){ case 0: fread(&temp, 1, 1, stdin); @@ -40,7 +44,7 @@ struct chunkpack getChunk(struct chunkpack prev){ ret.cs_id = ret.chunktype & 0x3F; break; } - fprintf(stderr, "Got a type %hhi chunk\n", ret.chunktype & 0xC0); + //process the rest of the header, for each chunk type switch (ret.chunktype & 0xC0){ case 0: fread(&temp, 1, 1, stdin); @@ -59,7 +63,9 @@ struct chunkpack getChunk(struct chunkpack prev){ fread(&temp, 1, 1, stdin); ret.msg_type_id = temp; fread(&temp, 1, 1, stdin); - ret.msg_stream_id = temp*256*256; + ret.msg_stream_id = temp*256*256*256; + fread(&temp, 1, 1, stdin); + ret.msg_stream_id += temp*256*256; fread(&temp, 1, 1, stdin); ret.msg_stream_id += temp*256; fread(&temp, 1, 1, stdin); @@ -105,10 +111,7 @@ struct chunkpack getChunk(struct chunkpack prev){ ret.msg_stream_id = prev.msg_stream_id; break; } - fprintf(stderr, "Timestamp: %i\n", ret.timestamp); - fprintf(stderr, "Length: %i\n", ret.len); - fprintf(stderr, "Message type ID: %hhi\n", ret.msg_type_id); - fprintf(stderr, "Message stream ID: %i\n", ret.msg_stream_id); + //calculate chunk length, real length, and length left till complete if (ret.len_left > 0){ ret.real_len = ret.len_left; ret.len_left -= ret.real_len; @@ -117,7 +120,9 @@ struct chunkpack getChunk(struct chunkpack prev){ } if (ret.real_len > chunk_rec_max){ ret.len_left += ret.real_len - chunk_rec_max; + ret.real_len = chunk_rec_max; } + //read extended timestamp, if neccesary if (ret.timestamp == 0x00ffffff){ fread(&temp, 1, 1, stdin); ret.timestamp = temp*256*256*256; @@ -128,6 +133,7 @@ struct chunkpack getChunk(struct chunkpack prev){ fread(&temp, 1, 1, stdin); ret.timestamp += temp; } + //read data if length > 0, and allocate it if (ret.real_len > 0){ ret.data = (unsigned char*)malloc(ret.real_len); fread(ret.data, 1, ret.real_len, stdin); @@ -135,4 +141,69 @@ struct chunkpack getChunk(struct chunkpack prev){ ret.data = 0; } return ret; -} +}//getChunk + +//adds newchunk to global list of unfinished chunks, re-assembling them complete +//returns pointer to chunk when a chunk is finished, 0 otherwise +//removes pointed to chunk from internal list if returned, without cleanup +// (cleanup performed in getWholeChunk function) +chunkpack * AddChunkPart(chunkpack newchunk){ + chunkpack * p; + unsigned char * tmpdata = 0; + static std::map ch_lst; + std::map::iterator it; + it = ch_lst.find(newchunk.cs_id); + if (it == ch_lst.end()){ + p = (chunkpack*)malloc(sizeof(chunkpack)); + *p = newchunk; + p->data = (unsigned char*)malloc(p->real_len); + memcpy(p->data, newchunk.data, p->real_len); + if (p->len_left == 0){ + fprintf(stderr, "New chunk of size %i / %i is whole - returning it\n", newchunk.real_len, newchunk.len); + return p; + } + fprintf(stderr, "New chunk of size %i / %i\n", newchunk.real_len, newchunk.len); + ch_lst[newchunk.cs_id] = p; + }else{ + p = it->second; + fprintf(stderr, "Appending chunk of size %i to chunk of size %i / %i...\n", newchunk.real_len, p->real_len, p->len); + fprintf(stderr, "Reallocating %i bytes\n", p->real_len + newchunk.real_len); + tmpdata = (unsigned char*)realloc(p->data, p->real_len + newchunk.real_len); + if (tmpdata == 0){fprintf(stderr, "Error allocating memory!\n");return 0;} + p->data = tmpdata; + fprintf(stderr, "Reallocated %i bytes\n", p->real_len + newchunk.real_len); + memcpy(p->data+p->real_len, newchunk.data, newchunk.real_len); + fprintf(stderr, "Copied contents over\n"); + p->real_len += newchunk.real_len; + p->len_left -= newchunk.real_len; + fprintf(stderr, "New size: %i / %i\n", p->real_len, p->len); + if (p->len_left <= 0){ + ch_lst.erase(it); + return p; + }else{ + ch_lst[newchunk.cs_id] = p;//pointer may have changed + } + } + return 0; +}//AddChunkPart + +//grabs chunks until a whole one comes in, then returns that +chunkpack getWholeChunk(){ + static chunkpack gwc_next, gwc_complete, gwc_prev; + static bool clean = false; + if (!clean){gwc_prev.data = 0; clean = true;}//prevent brain damage + chunkpack * ret = 0; + scrubChunk(gwc_complete); + while (true){ + gwc_next = getChunk(gwc_prev); + scrubChunk(gwc_prev); + gwc_prev = gwc_next; + fprintf(stderr, "Processing chunk...\n"); + ret = AddChunkPart(gwc_next); + if (ret){ + gwc_complete = *ret; + free(ret);//cleanup returned chunk + return gwc_complete; + } + } +}//getWholeChunk diff --git a/RTMP/Connector/handshake.cpp b/Connector_RTMP/handshake.cpp similarity index 100% rename from RTMP/Connector/handshake.cpp rename to Connector_RTMP/handshake.cpp diff --git a/RTMP/Connector/main.cpp b/Connector_RTMP/main.cpp similarity index 88% rename from RTMP/Connector/main.cpp rename to Connector_RTMP/main.cpp index 275cd69e..74d9a6b3 100644 --- a/RTMP/Connector/main.cpp +++ b/Connector_RTMP/main.cpp @@ -4,16 +4,15 @@ #include #include "handshake.cpp" //handshaking #include "chunkstream.cpp" //chunkstream decoding +#include "amf.cpp" //simple AMF0 parsing int main(){ - chunkpack prev, next; doHandshake(); - std::cerr << "Handshake completed" << std::endl; - prev.len = 0; - prev.data = 0; + chunkpack next; + std::vector * amfdata = 0; while (!feof(stdin)){ - next = getChunk(prev); + next = getWholeChunk(); if (next.cs_id == 2 && next.msg_stream_id == 0){ fprintf(stderr, "Received protocol message. (cs_id 2, stream id 0)\nContents:\n"); fwrite(next.data, 1, next.real_len, stderr); @@ -61,6 +60,9 @@ int main(){ break; case 20: fprintf(stderr, "Received AFM0 command message\n"); + if (amfdata != 0){delete amfdata;} + amfdata = parseAMF(next.data, next.real_len); + break; case 22: fprintf(stderr, "Received aggregate message\n"); @@ -69,8 +71,6 @@ int main(){ fprintf(stderr, "Unknown chunk received!\n"); break; } - scrubChunk(prev); - prev = next; }