Some RTMP timestamp issues found and fixed, as well as some RTMP Parser fixes (but now segfaults? Needs more testing...)

This commit is contained in:
Thulinma 2011-08-21 18:45:59 +02:00
parent 5e7b316d21
commit dee3f65228
4 changed files with 22 additions and 19 deletions

View file

@ -372,7 +372,7 @@ void Connector_RTMP::parseChunk(){
if (!isalpha(*i) && !isdigit(*i)){streamname.erase(i);}else{*i=tolower(*i);} if (!isalpha(*i) && !isdigit(*i)){streamname.erase(i);}else{*i=tolower(*i);}
} }
streamname = "/tmp/shared_socket_" + streamname; streamname = "/tmp/shared_socket_" + streamname;
//Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
//send a status reply //send a status reply
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "onStatus"));//status reply amfreply.addContent(AMF::Object("", "onStatus"));//status reply
@ -383,7 +383,7 @@ void Connector_RTMP::parseChunk(){
amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Reset")); amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Reset"));
amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing and resetting...")); amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing and resetting..."));
amfreply.getContentP(3)->addContent(AMF::Object("details", "PLS")); amfreply.getContentP(3)->addContent(AMF::Object("details", "PLS"));
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1)); amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
#if DEBUG >= 4 #if DEBUG >= 4
amfreply.Print(); amfreply.Print();
#endif #endif
@ -397,7 +397,7 @@ void Connector_RTMP::parseChunk(){
amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Start")); amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Start"));
amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing!")); amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing!"));
amfreply.getContentP(3)->addContent(AMF::Object("details", "PLS")); amfreply.getContentP(3)->addContent(AMF::Object("details", "PLS"));
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1)); amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
#if DEBUG >= 4 #if DEBUG >= 4
amfreply.Print(); amfreply.Print();
#endif #endif
@ -528,7 +528,7 @@ void Connector_RTMP::parseChunk(){
if (!isalpha(*i) && !isdigit(*i)){streamname.erase(i);}else{*i=tolower(*i);} if (!isalpha(*i) && !isdigit(*i)){streamname.erase(i);}else{*i=tolower(*i);}
} }
streamname = "/tmp/shared_socket_" + streamname; streamname = "/tmp/shared_socket_" + streamname;
//Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
//send a status reply //send a status reply
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "onStatus"));//status reply amfreply.addContent(AMF::Object("", "onStatus"));//status reply

View file

@ -55,7 +55,7 @@ int main(int argc, char ** argv){
while (next.Parse(inbuffer)){ while (next.Parse(inbuffer)){
if ((Detail & DETAIL_VERBOSE) == DETAIL_VERBOSE){ if ((Detail & DETAIL_VERBOSE) == DETAIL_VERBOSE){
fprintf(stderr, "Chunk info: CS ID %u, timestamp %u, len %u, type ID %u, Stream ID %u\n", next.cs_id, next.timestamp, next.len, next.msg_type_id, next.msg_stream_id); fprintf(stderr, "Chunk info: [%#2X] CS ID %u, timestamp %u, len %u, type ID %u, Stream ID %u\n", next.headertype, next.cs_id, next.timestamp, next.len, next.msg_type_id, next.msg_stream_id);
} }
switch (next.msg_type_id){ switch (next.msg_type_id){
case 0://does not exist case 0://does not exist
@ -78,28 +78,28 @@ int main(int argc, char ** argv){
short int ucmtype = ntohs(*(short int*)next.data.c_str()); short int ucmtype = ntohs(*(short int*)next.data.c_str());
switch (ucmtype){ switch (ucmtype){
case 0: case 0:
fprintf(stderr, "CTRL: User control message: stream begin %i\n", ntohl(*(int*)next.data.c_str()+2)); fprintf(stderr, "CTRL: User control message: stream begin %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2)));
break; break;
case 1: case 1:
fprintf(stderr, "CTRL: User control message: stream EOF %i\n", ntohl(*(int*)next.data.c_str()+2)); fprintf(stderr, "CTRL: User control message: stream EOF %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2)));
break; break;
case 2: case 2:
fprintf(stderr, "CTRL: User control message: stream dry %i\n", ntohl(*(int*)next.data.c_str()+2)); fprintf(stderr, "CTRL: User control message: stream dry %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2)));
break; break;
case 3: case 3:
fprintf(stderr, "CTRL: User control message: setbufferlen %i\n", ntohl(*(int*)next.data.c_str()+2)); fprintf(stderr, "CTRL: User control message: setbufferlen %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2)));
break; break;
case 4: case 4:
fprintf(stderr, "CTRL: User control message: streamisrecorded %i\n", ntohl(*(int*)next.data.c_str()+2)); fprintf(stderr, "CTRL: User control message: streamisrecorded %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2)));
break; break;
case 6: case 6:
fprintf(stderr, "CTRL: User control message: pingrequest %i\n", ntohl(*(int*)next.data.c_str()+2)); fprintf(stderr, "CTRL: User control message: pingrequest %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2)));
break; break;
case 7: case 7:
fprintf(stderr, "CTRL: User control message: pingresponse %i\n", ntohl(*(int*)next.data.c_str()+2)); fprintf(stderr, "CTRL: User control message: pingresponse %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2)));
break; break;
default: default:
fprintf(stderr, "CTRL: User control message: UNKNOWN %hi - %i\n", ucmtype, ntohl(*(int*)next.data.c_str()+2)); fprintf(stderr, "CTRL: User control message: UNKNOWN %hu - %u\n", ucmtype, ntohl(*(unsigned int*)(next.data.c_str()+2)));
break; break;
} }
} break; } break;

View file

@ -42,6 +42,7 @@ std::string RTMPStream::Chunk::Pack(){
unsigned int tmpi; unsigned int tmpi;
unsigned char chtype = 0x00; unsigned char chtype = 0x00;
timestamp -= firsttime; timestamp -= firsttime;
if (timestamp < prev.timestamp){timestamp = prev.timestamp;}
if ((prev.msg_type_id > 0) && (prev.cs_id == cs_id)){ if ((prev.msg_type_id > 0) && (prev.cs_id == cs_id)){
if (msg_stream_id == prev.msg_stream_id){ if (msg_stream_id == prev.msg_stream_id){
chtype = 0x40;//do not send msg_stream_id chtype = 0x40;//do not send msg_stream_id
@ -215,7 +216,7 @@ std::string RTMPStream::SendUSR(unsigned char type, unsigned int data){
ch.msg_type_id = 4; ch.msg_type_id = 4;
ch.msg_stream_id = 0; ch.msg_stream_id = 0;
ch.data.resize(6); ch.data.resize(6);
*(unsigned int*)((char*)ch.data.c_str()+2) = htonl(data); *(unsigned int*)(((char*)ch.data.c_str())+2) = htonl(data);
ch.data[0] = 0; ch.data[0] = 0;
ch.data[1] = type; ch.data[1] = type;
return ch.Pack(); return ch.Pack();
@ -232,8 +233,8 @@ std::string RTMPStream::SendUSR(unsigned char type, unsigned int data, unsigned
ch.msg_type_id = 4; ch.msg_type_id = 4;
ch.msg_stream_id = 0; ch.msg_stream_id = 0;
ch.data.resize(10); ch.data.resize(10);
*(unsigned int*)((char*)ch.data.c_str()+2) = htonl(data); *(unsigned int*)(((char*)ch.data.c_str())+2) = htonl(data);
*(unsigned int*)((char*)ch.data.c_str()+6) = htonl(data2); *(unsigned int*)(((char*)ch.data.c_str())+6) = htonl(data2);
ch.data[0] = 0; ch.data[0] = 0;
ch.data[1] = type; ch.data[1] = type;
return ch.Pack(); return ch.Pack();
@ -274,7 +275,8 @@ bool RTMPStream::Chunk::Parse(std::string & indata){
RTMPStream::Chunk prev = lastrecv[cs_id]; RTMPStream::Chunk prev = lastrecv[cs_id];
//process the rest of the header, for each chunk type //process the rest of the header, for each chunk type
switch (chunktype & 0xC0){ headertype = chunktype & 0xC0;
switch (headertype){
case 0x00: case 0x00:
if (indata.size() < i+11) return false; //can't read whole header if (indata.size() < i+11) return false; //can't read whole header
timestamp = indata[i++]*256*256; timestamp = indata[i++]*256*256;
@ -296,7 +298,7 @@ bool RTMPStream::Chunk::Parse(std::string & indata){
timestamp = indata[i++]*256*256; timestamp = indata[i++]*256*256;
timestamp += indata[i++]*256; timestamp += indata[i++]*256;
timestamp += indata[i++]; timestamp += indata[i++];
timestamp += prev.timestamp; if (timestamp != 0x00ffffff){timestamp += prev.timestamp;}
len = indata[i++]*256*256; len = indata[i++]*256*256;
len += indata[i++]*256; len += indata[i++]*256;
len += indata[i++]; len += indata[i++];
@ -310,7 +312,7 @@ bool RTMPStream::Chunk::Parse(std::string & indata){
timestamp = indata[i++]*256*256; timestamp = indata[i++]*256*256;
timestamp += indata[i++]*256; timestamp += indata[i++]*256;
timestamp += indata[i++]; timestamp += indata[i++];
timestamp += prev.timestamp; if (timestamp != 0x00ffffff){timestamp += prev.timestamp;}
len = prev.len; len = prev.len;
len_left = prev.len_left; len_left = prev.len_left;
msg_type_id = prev.msg_type_id; msg_type_id = prev.msg_type_id;

View file

@ -30,6 +30,7 @@ namespace RTMPStream{
/// Holds a single RTMP chunk, either send or receive direction. /// Holds a single RTMP chunk, either send or receive direction.
class Chunk{ class Chunk{
public: public:
unsigned char headertype; ///< For input chunks, the type of header. This is calculated automatically for output chunks.
unsigned int cs_id; ///< ContentStream ID unsigned int cs_id; ///< ContentStream ID
unsigned int timestamp; ///< Timestamp of this chunk. unsigned int timestamp; ///< Timestamp of this chunk.
unsigned int len; ///< Length of the complete chunk. unsigned int len; ///< Length of the complete chunk.