AMF0 parsing, chunk merging

This commit is contained in:
Thulinma 2010-07-28 18:47:31 +02:00
parent 6bff69af30
commit d642d2f111
5 changed files with 238 additions and 16 deletions

24
Connector_RTMP/Makefile Normal file
View file

@ -0,0 +1,24 @@
SRC = main.cpp
OBJ = $(SRC:.cpp=.o)
OUT = Connector_RTMP
INCLUDES =
CCFLAGS = -Wall -Wextra -funsigned-char -g
CC = $(CROSS)g++
LD = $(CROSS)ld
AR = $(CROSS)ar
LIBS =
.SUFFIXES: .cpp
.PHONY: clean default
default: $(OUT)
.cpp.o:
$(CC) $(INCLUDES) $(CCFLAGS) $(LIBS) -c $< -o $@
$(OUT): $(OBJ) handshake.cpp chunkstream.cpp amf.cpp
$(CC) $(LIBS) -o $(OUT) $(OBJ)
clean:
rm -rf $(OBJ) $(OUT) Makefile.bak *~
run-test: $(OUT)
rm -rf ./meh
mkfifo ./meh
nc -o test -l -p 1935 -e './Connector_RTMP 2>./meh'
run-cat:
cat < ./meh

151
Connector_RTMP/amf.cpp Normal file
View file

@ -0,0 +1,151 @@
#include <vector>
#include <string.h>
#include <string>
class AMFType {
public:
double NumValue(){return numval;};
std::string StrValue(){return strval;};
AMFType(double val){strval = ""; numval = val;};
AMFType(std::string val){strval = val; numval = 0;};
private:
std::string strval;
double numval;
};//AMFType
//scans the vector for the indice, returns the next AMFType from it or null
AMFType * getAMF(std::vector<AMFType> * vect, std::string indice){
std::vector<AMFType>::iterator it;
for (it=vect.begin(); it < vect.end(); it++){
if ((*it)->StrValue() == indice){it++; return *it;}
}
return 0;
}//getAMF
std::vector<AMFType> * parseAMF(unsigned char * data, unsigned int len){
std::vector<AMFType> * ret = new std::vector<AMFType>;
unsigned int i = 0;
std::string tmpstr;
unsigned int tmpi = 0;
unsigned char tmpdbl[8];
while (i < len){
switch (data[i]){
case 0x00://number
tmpdbl[7] = data[i+1];
tmpdbl[6] = data[i+2];
tmpdbl[5] = data[i+3];
tmpdbl[4] = data[i+4];
tmpdbl[3] = data[i+5];
tmpdbl[2] = data[i+6];
tmpdbl[1] = data[i+7];
tmpdbl[0] = data[i+8];
ret->push_back(*(double*)tmpdbl);
fprintf(stderr, "AMF: Number %f\n", *(double*)tmpdbl);
i += 8;
break;
case 0x01://bool
if (data[i+1] == 0){
ret->push_back((double)0);
fprintf(stderr, "AMF: Bool false\n");
}else{
ret->push_back((double)1);
fprintf(stderr, "AMF: Bool true\n");
}
++i;
break;
case 0x0C://long string
tmpi = data[i+1]*256*256*256+data[i+2]*256*256+data[i+3]*256+data[i+4];
tmpstr = (char*)(data+i+5);
ret->push_back(tmpstr);
i += tmpi + 4;
fprintf(stderr, "AMF: String %s\n", tmpstr.c_str());
break;
case 0x02://string
tmpi = data[i+1]*256+data[i+2];
tmpstr = (char*)(data+i+3);
ret->push_back(tmpstr);
i += tmpi + 2;
fprintf(stderr, "AMF: String %s\n", tmpstr.c_str());
break;
case 0x05://null
case 0x06://undefined
case 0x0D://unsupported
fprintf(stderr, "AMF: Null\n");
ret->push_back((double)0);
break;
case 0x03://object
++i;
while (data[i] + data[i+1] != 0){
tmpi = data[i]*256+data[i+1];
tmpstr = (char*)(data+i+2);
ret->push_back(tmpstr);
i += tmpi + 2;
fprintf(stderr, "AMF: Indice %s\n", tmpstr.c_str());
switch (data[i]){
case 0x00://number
tmpdbl[7] = data[i+1];
tmpdbl[6] = data[i+2];
tmpdbl[5] = data[i+3];
tmpdbl[4] = data[i+4];
tmpdbl[3] = data[i+5];
tmpdbl[2] = data[i+6];
tmpdbl[1] = data[i+7];
tmpdbl[0] = data[i+8];
ret->push_back(*(double*)tmpdbl);
fprintf(stderr, "AMF: Value Number %f\n", *(double*)tmpdbl);
i += 8;
break;
case 0x01://bool
if (data[i+1] == 0){
ret->push_back((double)0);
fprintf(stderr, "AMF: Value Bool false\n");
}else{
ret->push_back((double)1);
fprintf(stderr, "AMF: Value Bool true\n");
}
++i;
break;
case 0x0C://long string
tmpi = data[i+1]*256*256*256+data[i+2]*256*256+data[i+3]*256+data[i+4];
tmpstr = (char*)(data+i+5);
ret->push_back(tmpstr);
i += tmpi + 4;
fprintf(stderr, "AMF: Value String %s\n", tmpstr.c_str());
break;
case 0x02://string
tmpi = data[i+1]*256+data[i+2];
tmpstr = (char*)(data+i+3);
ret->push_back(tmpstr);
i += tmpi + 2;
fprintf(stderr, "AMF: Value String %s\n", tmpstr.c_str());
break;
case 0x05://null
case 0x06://undefined
case 0x0D://unsupported
fprintf(stderr, "AMF: Value Null\n");
ret->push_back((double)0);
break;
default:
fprintf(stderr, "Error: Unknown AMF object contents type %hhx - returning.\n", data[i]);
break;
}
++i;
}
i += 2;
break;
case 0x07://reference
case 0x08://array
case 0x0A://strict array
case 0x0B://date
case 0x0F://XML
case 0x10://typed object
case 0x11://AMF+
default:
fprintf(stderr, "Error: Unknown AMF type %hhx - returning.\n", data[i]);
return ret;
break;
}
++i;
}
return ret;
}//parseAMF

View file

@ -0,0 +1,209 @@
#include <map>
#include <string.h>
#include <stdlib.h>
struct chunkpack {
unsigned char chunktype;
unsigned int cs_id;
unsigned int timestamp;
unsigned int len;
unsigned int real_len;
unsigned int len_left;
unsigned char msg_type_id;
unsigned int msg_stream_id;
unsigned char * data;
};//chunkpack
unsigned int chunk_rec_max = 128;
//clean a chunk so that it may be re-used without memory leaks
void scrubChunk(struct chunkpack c){
if (c.data){free(c.data);}
c.data = 0;
c.real_len = 0;
}//scrubChunk
//get a chunk from standard input
struct chunkpack getChunk(struct chunkpack prev){
struct chunkpack ret;
unsigned char temp;
fread(&(ret.chunktype), 1, 1, stdin);
//read the chunkstream ID properly
switch (ret.chunktype & 0x3F){
case 0:
fread(&temp, 1, 1, stdin);
ret.cs_id = temp + 64;
break;
case 1:
fread(&temp, 1, 1, stdin);
ret.cs_id = temp + 64;
fread(&temp, 1, 1, stdin);
ret.cs_id += temp * 256;
break;
default:
ret.cs_id = ret.chunktype & 0x3F;
break;
}
//process the rest of the header, for each chunk type
switch (ret.chunktype & 0xC0){
case 0:
fread(&temp, 1, 1, stdin);
ret.timestamp = temp*256*256;
fread(&temp, 1, 1, stdin);
ret.timestamp += temp*256;
fread(&temp, 1, 1, stdin);
ret.timestamp += temp;
fread(&temp, 1, 1, stdin);
ret.len = temp*256*256;
fread(&temp, 1, 1, stdin);
ret.len += temp*256;
fread(&temp, 1, 1, stdin);
ret.len += temp;
ret.len_left = 0;
fread(&temp, 1, 1, stdin);
ret.msg_type_id = temp;
fread(&temp, 1, 1, stdin);
ret.msg_stream_id = temp*256*256*256;
fread(&temp, 1, 1, stdin);
ret.msg_stream_id += temp*256*256;
fread(&temp, 1, 1, stdin);
ret.msg_stream_id += temp*256;
fread(&temp, 1, 1, stdin);
ret.msg_stream_id += temp;
break;
case 1:
fread(&temp, 1, 1, stdin);
ret.timestamp = temp*256*256;
fread(&temp, 1, 1, stdin);
ret.timestamp += temp*256;
fread(&temp, 1, 1, stdin);
ret.timestamp += temp;
ret.timestamp += prev.timestamp;
fread(&temp, 1, 1, stdin);
ret.len = temp*256*256;
fread(&temp, 1, 1, stdin);
ret.len += temp*256;
fread(&temp, 1, 1, stdin);
ret.len += temp;
ret.len_left = 0;
fread(&temp, 1, 1, stdin);
ret.msg_type_id = temp;
ret.msg_stream_id = prev.msg_stream_id;
break;
case 2:
fread(&temp, 1, 1, stdin);
ret.timestamp = temp*256*256;
fread(&temp, 1, 1, stdin);
ret.timestamp += temp*256;
fread(&temp, 1, 1, stdin);
ret.timestamp += temp;
ret.timestamp += prev.timestamp;
ret.len = prev.len;
ret.len_left = prev.len_left;
ret.msg_type_id = prev.msg_type_id;
ret.msg_stream_id = prev.msg_stream_id;
break;
case 3:
ret.timestamp = prev.timestamp;
ret.len = prev.len;
ret.len_left = prev.len_left;
ret.msg_type_id = prev.msg_type_id;
ret.msg_stream_id = prev.msg_stream_id;
break;
}
//calculate chunk length, real length, and length left till complete
if (ret.len_left > 0){
ret.real_len = ret.len_left;
ret.len_left -= ret.real_len;
}else{
ret.real_len = ret.len;
}
if (ret.real_len > chunk_rec_max){
ret.len_left += ret.real_len - chunk_rec_max;
ret.real_len = chunk_rec_max;
}
//read extended timestamp, if neccesary
if (ret.timestamp == 0x00ffffff){
fread(&temp, 1, 1, stdin);
ret.timestamp = temp*256*256*256;
fread(&temp, 1, 1, stdin);
ret.timestamp += temp*256*256;
fread(&temp, 1, 1, stdin);
ret.timestamp += temp*256;
fread(&temp, 1, 1, stdin);
ret.timestamp += temp;
}
//read data if length > 0, and allocate it
if (ret.real_len > 0){
ret.data = (unsigned char*)malloc(ret.real_len);
fread(ret.data, 1, ret.real_len, stdin);
}else{
ret.data = 0;
}
return ret;
}//getChunk
//adds newchunk to global list of unfinished chunks, re-assembling them complete
//returns pointer to chunk when a chunk is finished, 0 otherwise
//removes pointed to chunk from internal list if returned, without cleanup
// (cleanup performed in getWholeChunk function)
chunkpack * AddChunkPart(chunkpack newchunk){
chunkpack * p;
unsigned char * tmpdata = 0;
static std::map<unsigned int, chunkpack *> ch_lst;
std::map<unsigned int, chunkpack *>::iterator it;
it = ch_lst.find(newchunk.cs_id);
if (it == ch_lst.end()){
p = (chunkpack*)malloc(sizeof(chunkpack));
*p = newchunk;
p->data = (unsigned char*)malloc(p->real_len);
memcpy(p->data, newchunk.data, p->real_len);
if (p->len_left == 0){
fprintf(stderr, "New chunk of size %i / %i is whole - returning it\n", newchunk.real_len, newchunk.len);
return p;
}
fprintf(stderr, "New chunk of size %i / %i\n", newchunk.real_len, newchunk.len);
ch_lst[newchunk.cs_id] = p;
}else{
p = it->second;
fprintf(stderr, "Appending chunk of size %i to chunk of size %i / %i...\n", newchunk.real_len, p->real_len, p->len);
fprintf(stderr, "Reallocating %i bytes\n", p->real_len + newchunk.real_len);
tmpdata = (unsigned char*)realloc(p->data, p->real_len + newchunk.real_len);
if (tmpdata == 0){fprintf(stderr, "Error allocating memory!\n");return 0;}
p->data = tmpdata;
fprintf(stderr, "Reallocated %i bytes\n", p->real_len + newchunk.real_len);
memcpy(p->data+p->real_len, newchunk.data, newchunk.real_len);
fprintf(stderr, "Copied contents over\n");
p->real_len += newchunk.real_len;
p->len_left -= newchunk.real_len;
fprintf(stderr, "New size: %i / %i\n", p->real_len, p->len);
if (p->len_left <= 0){
ch_lst.erase(it);
return p;
}else{
ch_lst[newchunk.cs_id] = p;//pointer may have changed
}
}
return 0;
}//AddChunkPart
//grabs chunks until a whole one comes in, then returns that
chunkpack getWholeChunk(){
static chunkpack gwc_next, gwc_complete, gwc_prev;
static bool clean = false;
if (!clean){gwc_prev.data = 0; clean = true;}//prevent brain damage
chunkpack * ret = 0;
scrubChunk(gwc_complete);
while (true){
gwc_next = getChunk(gwc_prev);
scrubChunk(gwc_prev);
gwc_prev = gwc_next;
fprintf(stderr, "Processing chunk...\n");
ret = AddChunkPart(gwc_next);
if (ret){
gwc_complete = *ret;
free(ret);//cleanup returned chunk
return gwc_complete;
}
}
}//getWholeChunk

View file

@ -0,0 +1,40 @@
struct Handshake {
char Time[4];
char Zero[4];
char Random[1528];
};//Handshake
void doHandshake(){
srand(time(NULL));
char Version;
Handshake Client;
Handshake Server;
/** Read C0 **/
fread(&(Version), 1, 1, stdin);
/** Read C1 **/
fread(Client.Time, 1, 4, stdin);
fread(Client.Zero, 1, 4, stdin);
fread(Client.Random, 1, 1528, stdin);
/** Build S1 Packet **/
Server.Time[0] = 0; Server.Time[1] = 0; Server.Time[2] = 0; Server.Time[3] = 0;
Server.Zero[0] = 0; Server.Zero[1] = 0; Server.Zero[2] = 0; Server.Zero[3] = 0;
for (int i = 0; i < 1528; i++){Server.Random[i] = (rand() % 256);}
/** Send S0 **/
fwrite(&(Version), 1, 1, stdout);
/** Send S1 **/
fwrite(Server.Time, 1, 4, stdout);
fwrite(Server.Zero, 1, 4, stdout);
fwrite(Server.Random, 1, 1528, stdout);
/** Flush output, just for certainty **/
fflush(stdout);
/** Send S2 **/
fwrite(Client.Time, 1, 4, stdout);
fwrite(Client.Time, 1, 4, stdout);
fwrite(Client.Random, 1, 1528, stdout);
/** Flush, necessary in order to work **/
fflush(stdout);
/** Read and discard C2 **/
fread(Client.Time, 1, 4, stdin);
fread(Client.Zero, 1, 4, stdin);
fread(Client.Random, 1, 1528, stdin);
}//doHandshake

78
Connector_RTMP/main.cpp Normal file
View file

@ -0,0 +1,78 @@
#include <iostream>
#include <cstdlib>
#include <cstdio>
#include <cmath>
#include "handshake.cpp" //handshaking
#include "chunkstream.cpp" //chunkstream decoding
#include "amf.cpp" //simple AMF0 parsing
int main(){
doHandshake();
chunkpack next;
std::vector<AMFType> * amfdata = 0;
while (!feof(stdin)){
next = getWholeChunk();
if (next.cs_id == 2 && next.msg_stream_id == 0){
fprintf(stderr, "Received protocol message. (cs_id 2, stream id 0)\nContents:\n");
fwrite(next.data, 1, next.real_len, stderr);
fflush(stderr);
}
switch (next.msg_type_id){
case 1:
fprintf(stderr, "CTRL: Set chunk size\n");
break;
case 2:
fprintf(stderr, "CTRL: Abort message\n");
break;
case 3:
fprintf(stderr, "CTRL: Acknowledgement\n");
break;
case 4:
fprintf(stderr, "CTRL: User control message\n");
break;
case 5:
fprintf(stderr, "CTRL: Window size\n");
break;
case 6:
fprintf(stderr, "CTRL: Set peer bandwidth\n");
break;
case 8:
fprintf(stderr, "Received audio data\n");
break;
case 9:
fprintf(stderr, "Received video data\n");
break;
case 15:
fprintf(stderr, "Received AFM3 data message\n");
break;
case 16:
fprintf(stderr, "Received AFM3 shared object\n");
break;
case 17:
fprintf(stderr, "Received AFM3 command message\n");
break;
case 18:
fprintf(stderr, "Received AFM0 data message\n");
break;
case 19:
fprintf(stderr, "Received AFM0 shared object\n");
break;
case 20:
fprintf(stderr, "Received AFM0 command message\n");
if (amfdata != 0){delete amfdata;}
amfdata = parseAMF(next.data, next.real_len);
break;
case 22:
fprintf(stderr, "Received aggregate message\n");
break;
default:
fprintf(stderr, "Unknown chunk received!\n");
break;
}
}
return 0;
}//main