Fixed handling of type 3 RTMP packets with extended timestamps. Removed redundant code.

This commit is contained in:
Thulinma 2015-03-24 21:59:52 +01:00
parent 49f9b44add
commit dffc0f9539
2 changed files with 28 additions and 162 deletions

View file

@ -121,6 +121,7 @@ std::string & RTMPStream::Chunk::Pack() {
ntime = tmpi; ntime = tmpi;
tmpi = 0x00ffffff; tmpi = 0x00ffffff;
} }
ts_header = tmpi;
output += (unsigned char)((tmpi >> 16) & 0xff); output += (unsigned char)((tmpi >> 16) & 0xff);
output += (unsigned char)((tmpi >> 8) & 0xff); output += (unsigned char)((tmpi >> 8) & 0xff);
output += (unsigned char)(tmpi & 0xff); output += (unsigned char)(tmpi & 0xff);
@ -140,6 +141,11 @@ std::string & RTMPStream::Chunk::Pack() {
output += (unsigned char)(msg_stream_id / (256 * 256 * 256)); output += (unsigned char)(msg_stream_id / (256 * 256 * 256));
} }
} }
}else{
ts_header = prev.ts_header;
if (ts_header == 0xffffff){
ntime = timestamp;
}
} }
//support for 0x00ffffff timestamps //support for 0x00ffffff timestamps
if (ntime) { if (ntime) {
@ -169,6 +175,12 @@ std::string & RTMPStream::Chunk::Pack() {
output += (unsigned char)((cs_id - 64) / 256); output += (unsigned char)((cs_id - 64) / 256);
} }
} }
if (ntime) {
output += (unsigned char)(ntime & 0xff);
output += (unsigned char)((ntime >> 8) & 0xff);
output += (unsigned char)((ntime >> 16) & 0xff);
output += (unsigned char)((ntime >> 24) & 0xff);
}
} }
} }
lastsend[cs_id] = *this; lastsend[cs_id] = *this;
@ -312,162 +324,6 @@ std::string & RTMPStream::SendUSR(unsigned char type, unsigned int data, unsigne
return ch.Pack(); return ch.Pack();
} //SendUSR } //SendUSR
/// Parses the argument string into the current chunk.
/// Tries to read a whole chunk, removing data from the input string as it reads.
/// If only part of a chunk is read, it will remove the part and call itself again.
/// This has the effect of only causing a "true" reponse in the case a *whole* chunk
/// is read, not just part of a chunk.
/// \param indata The input string to parse and update.
/// \warning This function will destroy the current data in this chunk!
/// \returns True if a whole chunk could be read, false otherwise.
bool RTMPStream::Chunk::Parse(std::string & indata) {
gettimeofday(&RTMPStream::lastrec, 0);
unsigned int i = 0;
if (indata.size() < 1) return false; //need at least a byte
unsigned char chunktype = indata[i++ ];
//read the chunkstream ID properly
switch (chunktype & 0x3F) {
case 0:
if (indata.size() < 2) return false; //need at least 2 bytes to continue
cs_id = indata[i++ ] + 64;
break;
case 1:
if (indata.size() < 3) return false; //need at least 3 bytes to continue
cs_id = indata[i++ ] + 64;
cs_id += indata[i++ ] * 256;
break;
default:
cs_id = chunktype & 0x3F;
break;
}
bool allow_short = lastrecv.count(cs_id);
RTMPStream::Chunk prev = lastrecv[cs_id];
//process the rest of the header, for each chunk type
headertype = chunktype & 0xC0;
DEBUG_MSG(DLVL_DONTEVEN, "Parsing RTMP chunk header (%#.2hhX) at offset %#X", chunktype, RTMPStream::rec_cnt);
switch (headertype) {
case 0x00:
if (indata.size() < i + 11) return false; //can't read whole header
timestamp = indata[i++ ] * 256 * 256;
timestamp += indata[i++ ] * 256;
timestamp += indata[i++ ];
ts_delta = timestamp;
len = indata[i++ ] * 256 * 256;
len += indata[i++ ] * 256;
len += indata[i++ ];
len_left = 0;
msg_type_id = indata[i++ ];
msg_stream_id = indata[i++ ];
msg_stream_id += indata[i++ ] * 256;
msg_stream_id += indata[i++ ] * 256 * 256;
msg_stream_id += indata[i++ ] * 256 * 256 * 256;
break;
case 0x40:
if (indata.size() < i + 7) return false; //can't read whole header
if (!allow_short) {
DEBUG_MSG(DLVL_WARN, "Warning: Header type 0x40 with no valid previous chunk!");
}
timestamp = indata[i++ ] * 256 * 256;
timestamp += indata[i++ ] * 256;
timestamp += indata[i++ ];
if (timestamp != 0x00ffffff) {
ts_delta = timestamp;
timestamp = prev.timestamp + ts_delta;
}
len = indata[i++ ] * 256 * 256;
len += indata[i++ ] * 256;
len += indata[i++ ];
len_left = 0;
msg_type_id = indata[i++ ];
msg_stream_id = prev.msg_stream_id;
break;
case 0x80:
if (indata.size() < i + 3) return false; //can't read whole header
if (!allow_short) {
DEBUG_MSG(DLVL_WARN, "Warning: Header type 0x80 with no valid previous chunk!");
}
timestamp = indata[i++ ] * 256 * 256;
timestamp += indata[i++ ] * 256;
timestamp += indata[i++ ];
if (timestamp != 0x00ffffff) {
ts_delta = timestamp;
timestamp = prev.timestamp + ts_delta;
}
len = prev.len;
len_left = prev.len_left;
msg_type_id = prev.msg_type_id;
msg_stream_id = prev.msg_stream_id;
break;
case 0xC0:
if (!allow_short) {
DEBUG_MSG(DLVL_WARN, "Warning: Header type 0xC0 with no valid previous chunk!");
}
timestamp = prev.timestamp + prev.ts_delta;
ts_delta = prev.ts_delta;
len = prev.len;
len_left = prev.len_left;
if (len_left > 0){
timestamp = prev.timestamp;
}
msg_type_id = prev.msg_type_id;
msg_stream_id = prev.msg_stream_id;
break;
}
//calculate chunk length, real length, and length left till complete
if (len_left > 0) {
real_len = len_left;
len_left -= real_len;
} else {
real_len = len;
}
if (real_len > RTMPStream::chunk_rec_max) {
len_left += real_len - RTMPStream::chunk_rec_max;
real_len = RTMPStream::chunk_rec_max;
}
DEBUG_MSG(DLVL_DONTEVEN, "Parsing RTMP chunk result: len_left=%d, real_len=%d", len_left, real_len);
//read extended timestamp, if neccesary
if (timestamp == 0x00ffffff) {
if (indata.size() < i + 4) return false; //can't read whole header
timestamp = indata[i++ ] * 256 * 256 * 256;
timestamp += indata[i++ ] * 256 * 256;
timestamp += indata[i++ ] * 256;
timestamp += indata[i++ ];
ts_delta = timestamp;
}
//read data if length > 0, and allocate it
if (real_len > 0) {
if (prev.len_left > 0) {
data = prev.data;
} else {
data = "";
}
if (indata.size() < i + real_len) return false; //can't read all data (yet)
data.append(indata, i, real_len);
indata = indata.substr(i + real_len);
lastrecv[cs_id] = *this;
RTMPStream::rec_cnt += i + real_len;
if (len_left == 0) {
return true;
} else {
return Parse(indata);
}
} else {
data = "";
indata = indata.substr(i + real_len);
lastrecv[cs_id] = *this;
RTMPStream::rec_cnt += i + real_len;
return true;
}
} //Parse
/// Parses the argument Socket::Buffer into the current chunk. /// Parses the argument Socket::Buffer into the current chunk.
/// Tries to read a whole chunk, removing data from the Buffer as it reads. /// Tries to read a whole chunk, removing data from the Buffer as it reads.
/// If a single packet contains a partial chunk, it will remove the packet and /// If a single packet contains a partial chunk, it will remove the packet and
@ -499,10 +355,14 @@ bool RTMPStream::Chunk::Parse(Socket::Buffer & buffer) {
break; break;
} }
bool allow_short = lastrecv.count(cs_id);
RTMPStream::Chunk prev = lastrecv[cs_id]; RTMPStream::Chunk prev = lastrecv[cs_id];
//process the rest of the header, for each chunk type //process the rest of the header, for each chunk type
headertype = chunktype & 0xC0; headertype = chunktype & 0xC0;
DEBUG_MSG(DLVL_DONTEVEN, "Parsing RTMP chunk header (%#.2hhX) at offset %#X", chunktype, RTMPStream::rec_cnt);
switch (headertype) { switch (headertype) {
case 0x00: case 0x00:
if (!buffer.available(i + 11)) { if (!buffer.available(i + 11)) {
@ -513,6 +373,7 @@ bool RTMPStream::Chunk::Parse(Socket::Buffer & buffer) {
timestamp += indata[i++ ] * 256; timestamp += indata[i++ ] * 256;
timestamp += indata[i++ ]; timestamp += indata[i++ ];
ts_delta = timestamp; ts_delta = timestamp;
ts_header = timestamp;
len = indata[i++ ] * 256 * 256; len = indata[i++ ] * 256 * 256;
len += indata[i++ ] * 256; len += indata[i++ ] * 256;
len += indata[i++ ]; len += indata[i++ ];
@ -528,12 +389,13 @@ bool RTMPStream::Chunk::Parse(Socket::Buffer & buffer) {
return false; return false;
} //can't read whole header } //can't read whole header
indata = buffer.copy(i + 7); indata = buffer.copy(i + 7);
if (prev.msg_type_id == 0) { if (!allow_short) {
DEBUG_MSG(DLVL_WARN, "Warning: Header type 0x40 with no valid previous chunk!"); DEBUG_MSG(DLVL_WARN, "Warning: Header type 0x40 with no valid previous chunk!");
} }
timestamp = indata[i++ ] * 256 * 256; timestamp = indata[i++ ] * 256 * 256;
timestamp += indata[i++ ] * 256; timestamp += indata[i++ ] * 256;
timestamp += indata[i++ ]; timestamp += indata[i++ ];
ts_header = timestamp;
if (timestamp != 0x00ffffff) { if (timestamp != 0x00ffffff) {
ts_delta = timestamp; ts_delta = timestamp;
timestamp = prev.timestamp + ts_delta; timestamp = prev.timestamp + ts_delta;
@ -550,12 +412,13 @@ bool RTMPStream::Chunk::Parse(Socket::Buffer & buffer) {
return false; return false;
} //can't read whole header } //can't read whole header
indata = buffer.copy(i + 3); indata = buffer.copy(i + 3);
if (prev.msg_type_id == 0) { if (!allow_short) {
DEBUG_MSG(DLVL_WARN, "Warning: Header type 0x80 with no valid previous chunk!"); DEBUG_MSG(DLVL_WARN, "Warning: Header type 0x80 with no valid previous chunk!");
} }
timestamp = indata[i++ ] * 256 * 256; timestamp = indata[i++ ] * 256 * 256;
timestamp += indata[i++ ] * 256; timestamp += indata[i++ ] * 256;
timestamp += indata[i++ ]; timestamp += indata[i++ ];
ts_header = timestamp;
if (timestamp != 0x00ffffff) { if (timestamp != 0x00ffffff) {
ts_delta = timestamp; ts_delta = timestamp;
timestamp = prev.timestamp + ts_delta; timestamp = prev.timestamp + ts_delta;
@ -566,10 +429,11 @@ bool RTMPStream::Chunk::Parse(Socket::Buffer & buffer) {
msg_stream_id = prev.msg_stream_id; msg_stream_id = prev.msg_stream_id;
break; break;
case 0xC0: case 0xC0:
if (prev.msg_type_id == 0) { if (!allow_short) {
DEBUG_MSG(DLVL_WARN, "Warning: Header type 0xC0 with no valid previous chunk!"); DEBUG_MSG(DLVL_WARN, "Warning: Header type 0xC0 with no valid previous chunk!");
} }
timestamp = prev.timestamp + prev.ts_delta; timestamp = prev.timestamp + prev.ts_delta;
ts_header = prev.ts_header;
ts_delta = prev.ts_delta; ts_delta = prev.ts_delta;
len = prev.len; len = prev.len;
len_left = prev.len_left; len_left = prev.len_left;
@ -591,8 +455,11 @@ bool RTMPStream::Chunk::Parse(Socket::Buffer & buffer) {
len_left += real_len - RTMPStream::chunk_rec_max; len_left += real_len - RTMPStream::chunk_rec_max;
real_len = RTMPStream::chunk_rec_max; real_len = RTMPStream::chunk_rec_max;
} }
DEBUG_MSG(DLVL_DONTEVEN, "Parsing RTMP chunk result: len_left=%d, real_len=%d", len_left, real_len);
//read extended timestamp, if neccesary //read extended timestamp, if neccesary
if (timestamp == 0x00ffffff) { if (ts_header == 0x00ffffff) {
if (!buffer.available(i + 4)) { if (!buffer.available(i + 4)) {
return false; return false;
} //can't read timestamp } //can't read timestamp
@ -632,7 +499,6 @@ bool RTMPStream::Chunk::Parse(Socket::Buffer & buffer) {
} }
} //Parse } //Parse
#include <iomanip>
/// Does the handshake. Expects handshake_in to be filled, and fills handshake_out. /// Does the handshake. Expects handshake_in to be filled, and fills handshake_out.
/// After calling this function, don't forget to read and ignore 1536 extra bytes, /// After calling this function, don't forget to read and ignore 1536 extra bytes,
/// these are the handshake response and not interesting for us because we don't do client /// these are the handshake response and not interesting for us because we don't do client

View file

@ -36,6 +36,7 @@ namespace RTMPStream {
unsigned int cs_id; ///< ContentStream ID unsigned int cs_id; ///< ContentStream ID
unsigned int timestamp; ///< Timestamp of this chunk. unsigned int timestamp; ///< Timestamp of this chunk.
unsigned int ts_delta; ///< Last timestamp delta. unsigned int ts_delta; ///< Last timestamp delta.
unsigned int ts_header; ///< Last header timestamp without extensions or deltas.
unsigned int len; ///< Length of the complete chunk. unsigned int len; ///< Length of the complete chunk.
unsigned int real_len; ///< Length of this particular part of it. unsigned int real_len; ///< Length of this particular part of it.
unsigned int len_left; ///< Length not yet received, out of complete chunk. unsigned int len_left; ///< Length not yet received, out of complete chunk.
@ -44,7 +45,6 @@ namespace RTMPStream {
std::string data; ///< Payload of chunk. std::string data; ///< Payload of chunk.
Chunk(); Chunk();
bool Parse(std::string & data);
bool Parse(Socket::Buffer & data); bool Parse(Socket::Buffer & data);
std::string & Pack(); std::string & Pack();
}; };