werkende negotiation, klaar om daadwerklijk video/audio te versturen\!

This commit is contained in:
Thulinma 2010-07-29 23:08:05 +02:00
parent 4f0f71716f
commit f4c02f33d8
4 changed files with 174 additions and 52 deletions

View file

@ -84,7 +84,7 @@ class AMFType {
}
}
return *this;
};
};//= operator
AMFType(const AMFType &a){
myIndice = a.myIndice;
myType = a.myType;
@ -96,7 +96,7 @@ class AMFType {
contents->push_back(*it);
}
}else{contents = 0;}
};
};//copy constructor
void Print(std::string indent = ""){
std::cerr << indent;
switch (myType){
@ -119,7 +119,54 @@ class AMFType {
if (contents){
for (std::vector<AMFType>::iterator it = contents->begin(); it != contents->end(); it++){it->Print(indent+" ");}
}
};
};//print
std::string Pack(){
std::string r = "";
if ((myType == 0x02) && (strval.size() > 0xFFFF)){myType = 0x0C;}
if (myType != 0xFF){r += myType;}
switch (myType){
case 0x00://number
r += *(((char*)&numval)+7); r += *(((char*)&numval)+6);
r += *(((char*)&numval)+5); r += *(((char*)&numval)+4);
r += *(((char*)&numval)+3); r += *(((char*)&numval)+2);
r += *(((char*)&numval)+1); r += *(((char*)&numval));
break;
case 0x01://bool
r += (char)numval;
break;
case 0x02://short string
r += strval.size() / 256;
r += strval.size() % 256;
r += strval;
break;
case 0x0C://long string
r += strval.size() / (256*256*256);
r += strval.size() / (256*256);
r += strval.size() / 256;
r += strval.size() % 256;
r += strval;
break;
case 0x03://object
if (contents){
for (std::vector<AMFType>::iterator it = contents->begin(); it != contents->end(); it++){
r += it->Indice().size() / 256;
r += it->Indice().size() % 256;
r += it->Indice();
r += it->Pack();
}
}
r += (char)0; r += (char)0; r += (char)9;
break;
case 0xFF://container - our own type - do not send, only send contents
if (contents){
for (std::vector<AMFType>::iterator it = contents->begin(); it != contents->end(); it++){
r += it->Pack();
}
}
break;
}
return r;
};//pack
protected:
std::string myIndice;
unsigned char myType;
@ -128,7 +175,7 @@ class AMFType {
std::vector<AMFType> * contents;
};//AMFType
AMFType parseOneAMF(unsigned char *& data, unsigned int &len, unsigned int &i, std::string name){
AMFType parseOneAMF(const unsigned char *& data, unsigned int &len, unsigned int &i, std::string name){
std::string tmpstr;
unsigned int tmpi = 0;
unsigned char tmpdbl[8];
@ -143,16 +190,13 @@ AMFType parseOneAMF(unsigned char *& data, unsigned int &len, unsigned int &i, s
tmpdbl[1] = data[i+7];
tmpdbl[0] = data[i+8];
i+=9;
fprintf(stderr, "AMF: Number %f\n", *(double*)tmpdbl);
return AMFType(name, *(double*)tmpdbl, 0x00);
break;
case 0x01://bool
i+=2;
if (data[i-1] == 0){
fprintf(stderr, "AMF: Bool false\n");
return AMFType(name, (double)0, 0x01);
}else{
fprintf(stderr, "AMF: Bool true\n");
return AMFType(name, (double)1, 0x01);
}
break;
@ -160,20 +204,17 @@ AMFType parseOneAMF(unsigned char *& data, unsigned int &len, unsigned int &i, s
tmpi = data[i+1]*256*256*256+data[i+2]*256*256+data[i+3]*256+data[i+4];
tmpstr = (char*)(data+i+5);
i += tmpi + 5;
fprintf(stderr, "AMF: String %s\n", tmpstr.c_str());
return AMFType(name, tmpstr, 0x0C);
break;
case 0x02://string
tmpi = data[i+1]*256+data[i+2];
tmpstr = (char*)(data+i+3);
i += tmpi + 3;
fprintf(stderr, "AMF: String %s\n", tmpstr.c_str());
return AMFType(name, tmpstr, 0x02);
break;
case 0x05://null
case 0x06://undefined
case 0x0D://unsupported
fprintf(stderr, "AMF: Null\n");
++i;
return AMFType(name, (double)0, data[i-1]);
break;
@ -184,29 +225,20 @@ AMFType parseOneAMF(unsigned char *& data, unsigned int &len, unsigned int &i, s
tmpi = data[i]*256+data[i+1];
tmpstr = (char*)(data+i+2);
i += tmpi + 2;
fprintf(stderr, "AMF: Indice %s\n", tmpstr.c_str());
ret.addContent(parseOneAMF(data, len, i, tmpstr));
}
i += 3;
return ret;
} break;
case 0x07://reference
case 0x08://array
case 0x0A://strict array
case 0x0B://date
case 0x0F://XML
case 0x10://typed object
case 0x11://AMF+
default:
fprintf(stderr, "Error: Unimplemented AMF type %hhx - returning.\n", data[i]);
return AMFType("error", (unsigned char)0xFF);
break;
}
fprintf(stderr, "Error: Unimplemented AMF type %hhx - returning.\n", data[i]);
return AMFType("error", (unsigned char)0xFF);
}//parseOneAMF
AMFType parseAMF(unsigned char * data, unsigned int len){
AMFType parseAMF(const unsigned char * data, unsigned int len){
AMFType ret("returned", (unsigned char)0xFF);//container type
unsigned int i = 0;
while (i < len){ret.addContent(parseOneAMF(data, len, i, ""));}
return ret;
}//parseAMF
AMFType parseAMF(std::string data){return parseAMF((const unsigned char*)data.c_str(), data.size());}

View file

@ -13,6 +13,15 @@ unsigned int snd_window_at = 0;
unsigned int rec_cnt = 0;
unsigned int snd_cnt = 0;
struct chunkinfo {
unsigned int timestamp;
unsigned int len;
unsigned int real_len;
unsigned int len_left;
unsigned char msg_type_id;
unsigned int msg_stream_id;
};//chunkinfo
struct chunkpack {
unsigned char chunktype;
unsigned int cs_id;
@ -92,6 +101,24 @@ void SendChunk(chunkpack ch){
fflush(stdout);
}//SendChunk
//sends a chunk
void SendChunk(unsigned int cs_id, unsigned char msg_type_id, unsigned int msg_stream_id, std::string data){
chunkpack ch;
timeval t;
gettimeofday(&t, 0);
ch.cs_id = cs_id;
ch.timestamp = t.tv_sec * 10 + t.tv_usec / 1000000;
ch.len = data.size();
ch.real_len = data.size();
ch.len_left = 0;
ch.msg_type_id = msg_type_id;
ch.msg_stream_id = msg_stream_id;
ch.data = (unsigned char*)malloc(data.size());
memcpy(ch.data, data.c_str(), data.size());
SendChunk(ch);
free(ch.data);
}//SendChunk
//sends a control message
void SendCTL(unsigned char type, unsigned int data){
chunkpack ch;
@ -175,8 +202,26 @@ void SendUSR(unsigned char type, unsigned int data, unsigned int data2){
free(ch.data);
}//SendUSR
//ugly global, but who cares...
std::map<unsigned int, chunkinfo> prevmap;
//return previous packet of this cs_id
chunkinfo GetPrev(unsigned int cs_id){
return prevmap[cs_id];
}//GetPrev
//store packet information of last packet of this cs_id
void PutPrev(chunkpack prev){
prevmap[prev.cs_id].timestamp = prev.timestamp;
prevmap[prev.cs_id].len = prev.len;
prevmap[prev.cs_id].real_len = prev.real_len;
prevmap[prev.cs_id].len_left = prev.len_left;
prevmap[prev.cs_id].msg_type_id = prev.msg_type_id;
prevmap[prev.cs_id].msg_stream_id = prev.msg_stream_id;
}//PutPrev
//get a chunk from standard input
struct chunkpack getChunk(struct chunkpack prev){
struct chunkpack getChunk(){
struct chunkpack ret;
unsigned char temp;
fread(&(ret.chunktype), 1, 1, stdin);
@ -196,9 +241,10 @@ struct chunkpack getChunk(struct chunkpack prev){
ret.cs_id = ret.chunktype & 0x3F;
break;
}
chunkinfo prev = GetPrev(ret.cs_id);
//process the rest of the header, for each chunk type
switch (ret.chunktype & 0xC0){
case 0:
case 0x00:
fread(&temp, 1, 1, stdin);
ret.timestamp = temp*256*256;
fread(&temp, 1, 1, stdin);
@ -223,7 +269,7 @@ struct chunkpack getChunk(struct chunkpack prev){
fread(&temp, 1, 1, stdin);
ret.msg_stream_id += temp;
break;
case 1:
case 0x40:
fread(&temp, 1, 1, stdin);
ret.timestamp = temp*256*256;
fread(&temp, 1, 1, stdin);
@ -242,7 +288,7 @@ struct chunkpack getChunk(struct chunkpack prev){
ret.msg_type_id = temp;
ret.msg_stream_id = prev.msg_stream_id;
break;
case 2:
case 0x80:
fread(&temp, 1, 1, stdin);
ret.timestamp = temp*256*256;
fread(&temp, 1, 1, stdin);
@ -255,7 +301,7 @@ struct chunkpack getChunk(struct chunkpack prev){
ret.msg_type_id = prev.msg_type_id;
ret.msg_stream_id = prev.msg_stream_id;
break;
case 3:
case 0xC0:
ret.timestamp = prev.timestamp;
ret.len = prev.len;
ret.len_left = prev.len_left;
@ -292,6 +338,7 @@ struct chunkpack getChunk(struct chunkpack prev){
}else{
ret.data = 0;
}
PutPrev(ret);
return ret;
}//getChunk
@ -332,16 +379,15 @@ chunkpack * AddChunkPart(chunkpack newchunk){
//grabs chunks until a whole one comes in, then returns that
chunkpack getWholeChunk(){
static chunkpack gwc_next, gwc_complete, gwc_prev;
static chunkpack gwc_next, gwc_complete;
static bool clean = false;
if (!clean){gwc_prev.data = 0; clean = true;}//prevent brain damage
if (!clean){gwc_complete.data = 0; clean = true;}//prevent brain damage
chunkpack * ret = 0;
scrubChunk(gwc_complete);
while (true){
gwc_next = getChunk(gwc_prev);
scrubChunk(gwc_prev);
gwc_prev = gwc_next;
gwc_next = getChunk();
ret = AddChunkPart(gwc_next);
scrubChunk(gwc_next);
if (ret){
gwc_complete = *ret;
free(ret);//cleanup returned chunk

View file

@ -2,13 +2,39 @@
#include <cstdlib>
#include <cstdio>
#include <cmath>
//needed for select
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
bool ready4data = false;//set to true when streaming starts
#include "handshake.cpp" //handshaking
#include "parsechunks.cpp" //chunkstream parsing
int main(){
fd_set pollset;
struct timeval timeout;
//0 timeout - return immediately after select call
timeout.tv_sec = 1; timeout.tv_usec = 0;
FD_ZERO(&pollset);//clear the polling set
FD_SET(0, &pollset);//add stdin to polling set
doHandshake();
while (!feof(stdin)){
parseChunk();
select(1, &pollset, 0, 0, &timeout);
if (FD_ISSET(0, &pollset)){
//only try to parse a new chunk when one is available :-)
std::cerr << "Parsing..." << std::endl;
parseChunk();
}
if (ready4data){
//check for packets, send them if needed
std::cerr << "Sending crap..." << std::endl;
}
}
return 0;
}//main

View file

@ -9,11 +9,6 @@ void parseChunk(){
static AMFType amfelem("empty", (unsigned char)0xFF);
static int tmpint;
next = getWholeChunk();
if (next.cs_id == 2 && next.msg_stream_id == 0){
fprintf(stderr, "Received protocol message. (cs_id 2, stream id 0)\nContents:\n");
fwrite(next.data, 1, next.real_len, stderr);
fflush(stderr);
}
switch (next.msg_type_id){
case 0://does not exist
break;//happens when connection breaks unexpectedly
@ -30,11 +25,12 @@ void parseChunk(){
snd_window_at = ntohl(*(int*)next.data);
//maybe better? snd_window_at = snd_cnt;
break;
case 4:
fprintf(stderr, "CTRL: User control message\n");
case 4:{
short int ucmtype = ntohs(*(short int*)next.data);
fprintf(stderr, "CTRL: User control message %hi\n", ucmtype);
//2 bytes event type, rest = event data
//TODO: process this
break;
//we don't need to process this
} break;
case 5://window size of other end
fprintf(stderr, "CTRL: Window size\n");
rec_window_size = ntohl(*(int*)next.data);
@ -66,10 +62,10 @@ void parseChunk(){
case 19:
fprintf(stderr, "Received AFM0 shared object\n");
break;
case 20:
case 20:{//AMF0 command message
amfdata = parseAMF(next.data, next.real_len);
fprintf(stderr, "Received AFM0 command message:\n");
amfdata.Print();
fprintf(stderr, "Received AFM0 command message: %s\n", amfdata.getContentP(0)->Str());
if (amfdata.getContentP(0)->StrValue() == "connect"){
tmpint = amfdata.getContentP(2)->getContentP("videoCodecs")->NumValue();
if (tmpint & 0x04){fprintf(stderr, "Sorensen video support detected\n");}
@ -77,16 +73,38 @@ void parseChunk(){
tmpint = amfdata.getContentP(2)->getContentP("audioCodecs")->NumValue();
if (tmpint & 0x04){fprintf(stderr, "MP3 audio support detected\n");}
if (tmpint & 0x400){fprintf(stderr, "AAC video support detected\n");}
//send a _result reply
AMFType amfreply("container", (unsigned char)0xFF);
amfreply.addContent(AMFType("", "_result"));//result success
amfreply.addContent(amfdata.getContent(1));//same transaction ID
amfreply.addContent(AMFType("", (double)0, 0x05));//null - properties (none?)
amfreply.addContent(AMFType(""));//info
amfreply.getContentP(3)->addContent(AMFType("level", "status"));
amfreply.getContentP(3)->addContent(AMFType("code", "NetConnection.Connect.Sucess"));
amfreply.getContentP(3)->addContent(AMFType("description", "Connection succeeded."));
SendChunk(3, 20, next.msg_stream_id, amfreply.Pack());
SendCTL(5, snd_window_size);//send window acknowledgement size (msg 5)
SendCTL(5, rec_window_size, 1);//send peer bandwidth (msg 6)
SendUSR(0, 10);//send UCM StreamBegin (0), stream 10 (we use this number)
//send AFM0 (20) {_result, 1, {properties}, {info}}
}else{
//call, close, createStream
//TODO: play (&& play2?)
//fprintf(stderr, "Ignored AFM0 command.\n");
}
break;
}//connect
if (amfdata.getContentP(0)->StrValue() == "createStream"){
//send a _result reply
AMFType amfreply("container", (unsigned char)0xFF);
amfreply.addContent(AMFType("", "_result"));//result success
amfreply.addContent(amfdata.getContent(1));//same transaction ID
amfreply.addContent(AMFType("", (double)0, 0x05));//null - command info
amfreply.addContent(AMFType("", (double)10));//stream ID - we use 10
SendChunk(3, 20, next.msg_stream_id, amfreply.Pack());
}//createStream
if ((amfdata.getContentP(0)->StrValue() == "play") || (amfdata.getContentP(0)->StrValue() == "play2")){
//send a status reply
AMFType amfreply("container", (unsigned char)0xFF);
amfreply.addContent(AMFType("", "onStatus"));//status reply
amfreply.addContent(AMFType("", "NetStream.Play.Start"));//result success
SendChunk(3, 20, next.msg_stream_id, amfreply.Pack());
ready4data = true;//start sending video data!
}//createStream
} break;
case 22:
fprintf(stderr, "Received aggregate message\n");
break;