From f4c02f33d8614edde360881cb0b387e3dd7b6182 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 29 Jul 2010 23:08:05 +0200 Subject: [PATCH] werkende negotiation, klaar om daadwerklijk video/audio te versturen\! --- Connector_RTMP/amf.cpp | 78 ++++++++++++++++++++++++---------- Connector_RTMP/chunkstream.cpp | 66 +++++++++++++++++++++++----- Connector_RTMP/main.cpp | 28 +++++++++++- Connector_RTMP/parsechunks.cpp | 54 +++++++++++++++-------- 4 files changed, 174 insertions(+), 52 deletions(-) diff --git a/Connector_RTMP/amf.cpp b/Connector_RTMP/amf.cpp index f79415cb..c6bbdb96 100644 --- a/Connector_RTMP/amf.cpp +++ b/Connector_RTMP/amf.cpp @@ -84,7 +84,7 @@ class AMFType { } } return *this; - }; + };//= operator AMFType(const AMFType &a){ myIndice = a.myIndice; myType = a.myType; @@ -96,7 +96,7 @@ class AMFType { contents->push_back(*it); } }else{contents = 0;} - }; + };//copy constructor void Print(std::string indent = ""){ std::cerr << indent; switch (myType){ @@ -119,7 +119,54 @@ class AMFType { if (contents){ for (std::vector::iterator it = contents->begin(); it != contents->end(); it++){it->Print(indent+" ");} } - }; + };//print + std::string Pack(){ + std::string r = ""; + if ((myType == 0x02) && (strval.size() > 0xFFFF)){myType = 0x0C;} + if (myType != 0xFF){r += myType;} + switch (myType){ + case 0x00://number + r += *(((char*)&numval)+7); r += *(((char*)&numval)+6); + r += *(((char*)&numval)+5); r += *(((char*)&numval)+4); + r += *(((char*)&numval)+3); r += *(((char*)&numval)+2); + r += *(((char*)&numval)+1); r += *(((char*)&numval)); + break; + case 0x01://bool + r += (char)numval; + break; + case 0x02://short string + r += strval.size() / 256; + r += strval.size() % 256; + r += strval; + break; + case 0x0C://long string + r += strval.size() / (256*256*256); + r += strval.size() / (256*256); + r += strval.size() / 256; + r += strval.size() % 256; + r += strval; + break; + case 0x03://object + if (contents){ + for (std::vector::iterator it = contents->begin(); it != contents->end(); it++){ + r += it->Indice().size() / 256; + r += it->Indice().size() % 256; + r += it->Indice(); + r += it->Pack(); + } + } + r += (char)0; r += (char)0; r += (char)9; + break; + case 0xFF://container - our own type - do not send, only send contents + if (contents){ + for (std::vector::iterator it = contents->begin(); it != contents->end(); it++){ + r += it->Pack(); + } + } + break; + } + return r; + };//pack protected: std::string myIndice; unsigned char myType; @@ -128,7 +175,7 @@ class AMFType { std::vector * contents; };//AMFType -AMFType parseOneAMF(unsigned char *& data, unsigned int &len, unsigned int &i, std::string name){ +AMFType parseOneAMF(const unsigned char *& data, unsigned int &len, unsigned int &i, std::string name){ std::string tmpstr; unsigned int tmpi = 0; unsigned char tmpdbl[8]; @@ -143,16 +190,13 @@ AMFType parseOneAMF(unsigned char *& data, unsigned int &len, unsigned int &i, s tmpdbl[1] = data[i+7]; tmpdbl[0] = data[i+8]; i+=9; - fprintf(stderr, "AMF: Number %f\n", *(double*)tmpdbl); return AMFType(name, *(double*)tmpdbl, 0x00); break; case 0x01://bool i+=2; if (data[i-1] == 0){ - fprintf(stderr, "AMF: Bool false\n"); return AMFType(name, (double)0, 0x01); }else{ - fprintf(stderr, "AMF: Bool true\n"); return AMFType(name, (double)1, 0x01); } break; @@ -160,20 +204,17 @@ AMFType parseOneAMF(unsigned char *& data, unsigned int &len, unsigned int &i, s tmpi = data[i+1]*256*256*256+data[i+2]*256*256+data[i+3]*256+data[i+4]; tmpstr = (char*)(data+i+5); i += tmpi + 5; - fprintf(stderr, "AMF: String %s\n", tmpstr.c_str()); return AMFType(name, tmpstr, 0x0C); break; case 0x02://string tmpi = data[i+1]*256+data[i+2]; tmpstr = (char*)(data+i+3); i += tmpi + 3; - fprintf(stderr, "AMF: String %s\n", tmpstr.c_str()); return AMFType(name, tmpstr, 0x02); break; case 0x05://null case 0x06://undefined case 0x0D://unsupported - fprintf(stderr, "AMF: Null\n"); ++i; return AMFType(name, (double)0, data[i-1]); break; @@ -184,29 +225,20 @@ AMFType parseOneAMF(unsigned char *& data, unsigned int &len, unsigned int &i, s tmpi = data[i]*256+data[i+1]; tmpstr = (char*)(data+i+2); i += tmpi + 2; - fprintf(stderr, "AMF: Indice %s\n", tmpstr.c_str()); ret.addContent(parseOneAMF(data, len, i, tmpstr)); } i += 3; return ret; } break; - case 0x07://reference - case 0x08://array - case 0x0A://strict array - case 0x0B://date - case 0x0F://XML - case 0x10://typed object - case 0x11://AMF+ - default: - fprintf(stderr, "Error: Unimplemented AMF type %hhx - returning.\n", data[i]); - return AMFType("error", (unsigned char)0xFF); - break; } + fprintf(stderr, "Error: Unimplemented AMF type %hhx - returning.\n", data[i]); + return AMFType("error", (unsigned char)0xFF); }//parseOneAMF -AMFType parseAMF(unsigned char * data, unsigned int len){ +AMFType parseAMF(const unsigned char * data, unsigned int len){ AMFType ret("returned", (unsigned char)0xFF);//container type unsigned int i = 0; while (i < len){ret.addContent(parseOneAMF(data, len, i, ""));} return ret; }//parseAMF +AMFType parseAMF(std::string data){return parseAMF((const unsigned char*)data.c_str(), data.size());} diff --git a/Connector_RTMP/chunkstream.cpp b/Connector_RTMP/chunkstream.cpp index 6386622b..882febde 100644 --- a/Connector_RTMP/chunkstream.cpp +++ b/Connector_RTMP/chunkstream.cpp @@ -13,6 +13,15 @@ unsigned int snd_window_at = 0; unsigned int rec_cnt = 0; unsigned int snd_cnt = 0; +struct chunkinfo { + unsigned int timestamp; + unsigned int len; + unsigned int real_len; + unsigned int len_left; + unsigned char msg_type_id; + unsigned int msg_stream_id; +};//chunkinfo + struct chunkpack { unsigned char chunktype; unsigned int cs_id; @@ -92,6 +101,24 @@ void SendChunk(chunkpack ch){ fflush(stdout); }//SendChunk +//sends a chunk +void SendChunk(unsigned int cs_id, unsigned char msg_type_id, unsigned int msg_stream_id, std::string data){ + chunkpack ch; + timeval t; + gettimeofday(&t, 0); + ch.cs_id = cs_id; + ch.timestamp = t.tv_sec * 10 + t.tv_usec / 1000000; + ch.len = data.size(); + ch.real_len = data.size(); + ch.len_left = 0; + ch.msg_type_id = msg_type_id; + ch.msg_stream_id = msg_stream_id; + ch.data = (unsigned char*)malloc(data.size()); + memcpy(ch.data, data.c_str(), data.size()); + SendChunk(ch); + free(ch.data); +}//SendChunk + //sends a control message void SendCTL(unsigned char type, unsigned int data){ chunkpack ch; @@ -175,8 +202,26 @@ void SendUSR(unsigned char type, unsigned int data, unsigned int data2){ free(ch.data); }//SendUSR +//ugly global, but who cares... +std::map prevmap; + +//return previous packet of this cs_id +chunkinfo GetPrev(unsigned int cs_id){ + return prevmap[cs_id]; +}//GetPrev + +//store packet information of last packet of this cs_id +void PutPrev(chunkpack prev){ + prevmap[prev.cs_id].timestamp = prev.timestamp; + prevmap[prev.cs_id].len = prev.len; + prevmap[prev.cs_id].real_len = prev.real_len; + prevmap[prev.cs_id].len_left = prev.len_left; + prevmap[prev.cs_id].msg_type_id = prev.msg_type_id; + prevmap[prev.cs_id].msg_stream_id = prev.msg_stream_id; +}//PutPrev + //get a chunk from standard input -struct chunkpack getChunk(struct chunkpack prev){ +struct chunkpack getChunk(){ struct chunkpack ret; unsigned char temp; fread(&(ret.chunktype), 1, 1, stdin); @@ -196,9 +241,10 @@ struct chunkpack getChunk(struct chunkpack prev){ ret.cs_id = ret.chunktype & 0x3F; break; } + chunkinfo prev = GetPrev(ret.cs_id); //process the rest of the header, for each chunk type switch (ret.chunktype & 0xC0){ - case 0: + case 0x00: fread(&temp, 1, 1, stdin); ret.timestamp = temp*256*256; fread(&temp, 1, 1, stdin); @@ -223,7 +269,7 @@ struct chunkpack getChunk(struct chunkpack prev){ fread(&temp, 1, 1, stdin); ret.msg_stream_id += temp; break; - case 1: + case 0x40: fread(&temp, 1, 1, stdin); ret.timestamp = temp*256*256; fread(&temp, 1, 1, stdin); @@ -242,7 +288,7 @@ struct chunkpack getChunk(struct chunkpack prev){ ret.msg_type_id = temp; ret.msg_stream_id = prev.msg_stream_id; break; - case 2: + case 0x80: fread(&temp, 1, 1, stdin); ret.timestamp = temp*256*256; fread(&temp, 1, 1, stdin); @@ -255,7 +301,7 @@ struct chunkpack getChunk(struct chunkpack prev){ ret.msg_type_id = prev.msg_type_id; ret.msg_stream_id = prev.msg_stream_id; break; - case 3: + case 0xC0: ret.timestamp = prev.timestamp; ret.len = prev.len; ret.len_left = prev.len_left; @@ -292,6 +338,7 @@ struct chunkpack getChunk(struct chunkpack prev){ }else{ ret.data = 0; } + PutPrev(ret); return ret; }//getChunk @@ -332,16 +379,15 @@ chunkpack * AddChunkPart(chunkpack newchunk){ //grabs chunks until a whole one comes in, then returns that chunkpack getWholeChunk(){ - static chunkpack gwc_next, gwc_complete, gwc_prev; + static chunkpack gwc_next, gwc_complete; static bool clean = false; - if (!clean){gwc_prev.data = 0; clean = true;}//prevent brain damage + if (!clean){gwc_complete.data = 0; clean = true;}//prevent brain damage chunkpack * ret = 0; scrubChunk(gwc_complete); while (true){ - gwc_next = getChunk(gwc_prev); - scrubChunk(gwc_prev); - gwc_prev = gwc_next; + gwc_next = getChunk(); ret = AddChunkPart(gwc_next); + scrubChunk(gwc_next); if (ret){ gwc_complete = *ret; free(ret);//cleanup returned chunk diff --git a/Connector_RTMP/main.cpp b/Connector_RTMP/main.cpp index eb3a0e4f..10423ca8 100644 --- a/Connector_RTMP/main.cpp +++ b/Connector_RTMP/main.cpp @@ -2,13 +2,39 @@ #include #include #include + +//needed for select +#include +#include +#include +#include +#include + +bool ready4data = false;//set to true when streaming starts + #include "handshake.cpp" //handshaking #include "parsechunks.cpp" //chunkstream parsing int main(){ + fd_set pollset; + struct timeval timeout; + //0 timeout - return immediately after select call + timeout.tv_sec = 1; timeout.tv_usec = 0; + FD_ZERO(&pollset);//clear the polling set + FD_SET(0, &pollset);//add stdin to polling set + doHandshake(); while (!feof(stdin)){ - parseChunk(); + select(1, &pollset, 0, 0, &timeout); + if (FD_ISSET(0, &pollset)){ + //only try to parse a new chunk when one is available :-) + std::cerr << "Parsing..." << std::endl; + parseChunk(); + } + if (ready4data){ + //check for packets, send them if needed + std::cerr << "Sending crap..." << std::endl; + } } return 0; }//main diff --git a/Connector_RTMP/parsechunks.cpp b/Connector_RTMP/parsechunks.cpp index 649ee2d2..22e565d4 100644 --- a/Connector_RTMP/parsechunks.cpp +++ b/Connector_RTMP/parsechunks.cpp @@ -9,11 +9,6 @@ void parseChunk(){ static AMFType amfelem("empty", (unsigned char)0xFF); static int tmpint; next = getWholeChunk(); - if (next.cs_id == 2 && next.msg_stream_id == 0){ - fprintf(stderr, "Received protocol message. (cs_id 2, stream id 0)\nContents:\n"); - fwrite(next.data, 1, next.real_len, stderr); - fflush(stderr); - } switch (next.msg_type_id){ case 0://does not exist break;//happens when connection breaks unexpectedly @@ -30,11 +25,12 @@ void parseChunk(){ snd_window_at = ntohl(*(int*)next.data); //maybe better? snd_window_at = snd_cnt; break; - case 4: - fprintf(stderr, "CTRL: User control message\n"); + case 4:{ + short int ucmtype = ntohs(*(short int*)next.data); + fprintf(stderr, "CTRL: User control message %hi\n", ucmtype); //2 bytes event type, rest = event data - //TODO: process this - break; + //we don't need to process this + } break; case 5://window size of other end fprintf(stderr, "CTRL: Window size\n"); rec_window_size = ntohl(*(int*)next.data); @@ -66,10 +62,10 @@ void parseChunk(){ case 19: fprintf(stderr, "Received AFM0 shared object\n"); break; - case 20: + case 20:{//AMF0 command message amfdata = parseAMF(next.data, next.real_len); + fprintf(stderr, "Received AFM0 command message:\n"); amfdata.Print(); - fprintf(stderr, "Received AFM0 command message: %s\n", amfdata.getContentP(0)->Str()); if (amfdata.getContentP(0)->StrValue() == "connect"){ tmpint = amfdata.getContentP(2)->getContentP("videoCodecs")->NumValue(); if (tmpint & 0x04){fprintf(stderr, "Sorensen video support detected\n");} @@ -77,16 +73,38 @@ void parseChunk(){ tmpint = amfdata.getContentP(2)->getContentP("audioCodecs")->NumValue(); if (tmpint & 0x04){fprintf(stderr, "MP3 audio support detected\n");} if (tmpint & 0x400){fprintf(stderr, "AAC video support detected\n");} + //send a _result reply + AMFType amfreply("container", (unsigned char)0xFF); + amfreply.addContent(AMFType("", "_result"));//result success + amfreply.addContent(amfdata.getContent(1));//same transaction ID + amfreply.addContent(AMFType("", (double)0, 0x05));//null - properties (none?) + amfreply.addContent(AMFType(""));//info + amfreply.getContentP(3)->addContent(AMFType("level", "status")); + amfreply.getContentP(3)->addContent(AMFType("code", "NetConnection.Connect.Sucess")); + amfreply.getContentP(3)->addContent(AMFType("description", "Connection succeeded.")); + SendChunk(3, 20, next.msg_stream_id, amfreply.Pack()); SendCTL(5, snd_window_size);//send window acknowledgement size (msg 5) SendCTL(5, rec_window_size, 1);//send peer bandwidth (msg 6) SendUSR(0, 10);//send UCM StreamBegin (0), stream 10 (we use this number) - //send AFM0 (20) {_result, 1, {properties}, {info}} - }else{ - //call, close, createStream - //TODO: play (&& play2?) - //fprintf(stderr, "Ignored AFM0 command.\n"); - } - break; + }//connect + if (amfdata.getContentP(0)->StrValue() == "createStream"){ + //send a _result reply + AMFType amfreply("container", (unsigned char)0xFF); + amfreply.addContent(AMFType("", "_result"));//result success + amfreply.addContent(amfdata.getContent(1));//same transaction ID + amfreply.addContent(AMFType("", (double)0, 0x05));//null - command info + amfreply.addContent(AMFType("", (double)10));//stream ID - we use 10 + SendChunk(3, 20, next.msg_stream_id, amfreply.Pack()); + }//createStream + if ((amfdata.getContentP(0)->StrValue() == "play") || (amfdata.getContentP(0)->StrValue() == "play2")){ + //send a status reply + AMFType amfreply("container", (unsigned char)0xFF); + amfreply.addContent(AMFType("", "onStatus"));//status reply + amfreply.addContent(AMFType("", "NetStream.Play.Start"));//result success + SendChunk(3, 20, next.msg_stream_id, amfreply.Pack()); + ready4data = true;//start sending video data! + }//createStream + } break; case 22: fprintf(stderr, "Received aggregate message\n"); break;