From e16723c46667f1e25e43e4e5616932f25b6d30e6 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 4 Apr 2013 00:52:36 +0200 Subject: [PATCH] RTMP Analyser performance improvements. --- src/analysers/rtmp_analyser.cpp | 257 +++++++++++++++++--------------- 1 file changed, 133 insertions(+), 124 deletions(-) diff --git a/src/analysers/rtmp_analyser.cpp b/src/analysers/rtmp_analyser.cpp index 3fdf8d50..02a0c29a 100644 --- a/src/analysers/rtmp_analyser.cpp +++ b/src/analysers/rtmp_analyser.cpp @@ -43,7 +43,8 @@ namespace Analysers { } std::string inbuffer; - while (std::cin.good()){ + inbuffer.reserve(3073); + while (std::cin.good() && inbuffer.size() < 3073){ inbuffer += std::cin.get(); } //read all of std::cin to temp inbuffer.erase(0, 3073); //strip the handshake part @@ -52,139 +53,147 @@ namespace Analysers { AMF::Object amfdata("empty", AMF::AMF0_DDV_CONTAINER); AMF::Object3 amf3data("empty", AMF::AMF3_DDV_CONTAINER); - while (next.Parse(inbuffer)){ - if (Detail & DETAIL_VERBOSE){ - fprintf(stderr, "Chunk info: [%#2X] CS ID %u, timestamp %u, len %u, type ID %u, Stream ID %u\n", next.headertype, next.cs_id, next.timestamp, - next.len, next.msg_type_id, next.msg_stream_id); - } - switch (next.msg_type_id){ - case 0: //does not exist - fprintf(stderr, "Error chunk - %i, %i, %i, %i, %i\n", next.cs_id, next.timestamp, next.real_len, next.len_left, next.msg_stream_id); - //return 0; - break; //happens when connection breaks unexpectedly - case 1: //set chunk size - RTMPStream::chunk_rec_max = ntohl(*(int*)next.data.c_str()); - fprintf(stderr, "CTRL: Set chunk size: %i\n", RTMPStream::chunk_rec_max); - break; - case 2: //abort message - we ignore this one - fprintf(stderr, "CTRL: Abort message: %i\n", ntohl(*(int*)next.data.c_str())); - //4 bytes of stream id to drop - break; - case 3: //ack - RTMPStream::snd_window_at = ntohl(*(int*)next.data.c_str()); - fprintf(stderr, "CTRL: Acknowledgement: %i\n", RTMPStream::snd_window_at); - break; - case 4: { - short int ucmtype = ntohs(*(short int*)next.data.c_str()); - switch (ucmtype){ - case 0: - fprintf(stderr, "CTRL: User control message: stream begin %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2))); - break; - case 1: - fprintf(stderr, "CTRL: User control message: stream EOF %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2))); - break; - case 2: - fprintf(stderr, "CTRL: User control message: stream dry %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2))); - break; - case 3: - fprintf(stderr, "CTRL: User control message: setbufferlen %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2))); - break; - case 4: - fprintf(stderr, "CTRL: User control message: streamisrecorded %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2))); - break; - case 6: - fprintf(stderr, "CTRL: User control message: pingrequest %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2))); - break; - case 7: - fprintf(stderr, "CTRL: User control message: pingresponse %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2))); - break; - default: - fprintf(stderr, "CTRL: User control message: UNKNOWN %hu - %u\n", ucmtype, ntohl(*(unsigned int*)(next.data.c_str()+2))); - break; - } + while (std::cin.good() || inbuffer.size()){ + if (next.Parse(inbuffer)){ + if (Detail & DETAIL_VERBOSE){ + fprintf(stderr, "Chunk info: [%#2X] CS ID %u, timestamp %u, len %u, type ID %u, Stream ID %u\n", next.headertype, next.cs_id, next.timestamp, + next.len, next.msg_type_id, next.msg_stream_id); } - break; - case 5: //window size of other end - RTMPStream::rec_window_size = ntohl(*(int*)next.data.c_str()); - RTMPStream::rec_window_at = RTMPStream::rec_cnt; - fprintf(stderr, "CTRL: Window size: %i\n", RTMPStream::rec_window_size); - break; - case 6: - RTMPStream::snd_window_size = ntohl(*(int*)next.data.c_str()); - //4 bytes window size, 1 byte limit type (ignored) - fprintf(stderr, "CTRL: Set peer bandwidth: %i\n", RTMPStream::snd_window_size); - break; - case 8: - if (Detail & (DETAIL_EXPLICIT | DETAIL_RECONSTRUCT)){ - F.ChunkLoader(next); - if (Detail & DETAIL_EXPLICIT){ - fprintf(stderr, "Received %i bytes audio data\n", next.len); - std::cerr << "Got a " << F.len << " bytes " << F.tagType() << " FLV tag of time " << F.tagTime() << "." << std::endl; + switch (next.msg_type_id){ + case 0: //does not exist + fprintf(stderr, "Error chunk - %i, %i, %i, %i, %i\n", next.cs_id, next.timestamp, next.real_len, next.len_left, next.msg_stream_id); + //return 0; + break; //happens when connection breaks unexpectedly + case 1: //set chunk size + RTMPStream::chunk_rec_max = ntohl(*(int*)next.data.c_str()); + fprintf(stderr, "CTRL: Set chunk size: %i\n", RTMPStream::chunk_rec_max); + break; + case 2: //abort message - we ignore this one + fprintf(stderr, "CTRL: Abort message: %i\n", ntohl(*(int*)next.data.c_str())); + //4 bytes of stream id to drop + break; + case 3: //ack + RTMPStream::snd_window_at = ntohl(*(int*)next.data.c_str()); + fprintf(stderr, "CTRL: Acknowledgement: %i\n", RTMPStream::snd_window_at); + break; + case 4: { + short int ucmtype = ntohs(*(short int*)next.data.c_str()); + switch (ucmtype){ + case 0: + fprintf(stderr, "CTRL: User control message: stream begin %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2))); + break; + case 1: + fprintf(stderr, "CTRL: User control message: stream EOF %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2))); + break; + case 2: + fprintf(stderr, "CTRL: User control message: stream dry %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2))); + break; + case 3: + fprintf(stderr, "CTRL: User control message: setbufferlen %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2))); + break; + case 4: + fprintf(stderr, "CTRL: User control message: streamisrecorded %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2))); + break; + case 6: + fprintf(stderr, "CTRL: User control message: pingrequest %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2))); + break; + case 7: + fprintf(stderr, "CTRL: User control message: pingresponse %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2))); + break; + default: + fprintf(stderr, "CTRL: User control message: UNKNOWN %hu - %u\n", ucmtype, ntohl(*(unsigned int*)(next.data.c_str()+2))); + break; } + } + break; + case 5: //window size of other end + RTMPStream::rec_window_size = ntohl(*(int*)next.data.c_str()); + RTMPStream::rec_window_at = RTMPStream::rec_cnt; + fprintf(stderr, "CTRL: Window size: %i\n", RTMPStream::rec_window_size); + break; + case 6: + RTMPStream::snd_window_size = ntohl(*(int*)next.data.c_str()); + //4 bytes window size, 1 byte limit type (ignored) + fprintf(stderr, "CTRL: Set peer bandwidth: %i\n", RTMPStream::snd_window_size); + break; + case 8: + if (Detail & (DETAIL_EXPLICIT | DETAIL_RECONSTRUCT)){ + F.ChunkLoader(next); + if (Detail & DETAIL_EXPLICIT){ + fprintf(stderr, "Received %i bytes audio data\n", next.len); + std::cerr << "Got a " << F.len << " bytes " << F.tagType() << " FLV tag of time " << F.tagTime() << "." << std::endl; + } + if (Detail & DETAIL_RECONSTRUCT){ + std::cout.write(F.data, F.len); + } + } + break; + case 9: + if (Detail & (DETAIL_EXPLICIT | DETAIL_RECONSTRUCT)){ + F.ChunkLoader(next); + if (Detail & DETAIL_EXPLICIT){ + fprintf(stderr, "Received %i bytes video data\n", next.len); + std::cerr << "Got a " << F.len << " bytes " << F.tagType() << " FLV tag of time " << F.tagTime() << "." << std::endl; + } + if (Detail & DETAIL_RECONSTRUCT){ + std::cout.write(F.data, F.len); + } + } + break; + case 15: + fprintf(stderr, "Received AFM3 data message\n"); + break; + case 16: + fprintf(stderr, "Received AFM3 shared object\n"); + break; + case 17: { + fprintf(stderr, "Received AFM3 command message:\n"); + char soort = next.data[0]; + next.data = next.data.substr(1); + if (soort == 0){ + amfdata = AMF::parse(next.data); + std::cerr << amfdata.Print() << std::endl; + }else{ + amf3data = AMF::parse3(next.data); + amf3data.Print(); + } + } + break; + case 18: { + fprintf(stderr, "Received AFM0 data message (metadata):\n"); + amfdata = AMF::parse(next.data); + amfdata.Print(); if (Detail & DETAIL_RECONSTRUCT){ + F.ChunkLoader(next); std::cout.write(F.data, F.len); } } - break; - case 9: - if (Detail & (DETAIL_EXPLICIT | DETAIL_RECONSTRUCT)){ - F.ChunkLoader(next); - if (Detail & DETAIL_EXPLICIT){ - fprintf(stderr, "Received %i bytes video data\n", next.len); - std::cerr << "Got a " << F.len << " bytes " << F.tagType() << " FLV tag of time " << F.tagTime() << "." << std::endl; - } - if (Detail & DETAIL_RECONSTRUCT){ - std::cout.write(F.data, F.len); - } - } - break; - case 15: - fprintf(stderr, "Received AFM3 data message\n"); - break; - case 16: - fprintf(stderr, "Received AFM3 shared object\n"); - break; - case 17: { - fprintf(stderr, "Received AFM3 command message:\n"); - char soort = next.data[0]; - next.data = next.data.substr(1); - if (soort == 0){ + break; + case 19: + fprintf(stderr, "Received AFM0 shared object\n"); + break; + case 20: { //AMF0 command message + fprintf(stderr, "Received AFM0 command message:\n"); amfdata = AMF::parse(next.data); std::cerr << amfdata.Print() << std::endl; - }else{ - amf3data = AMF::parse3(next.data); - amf3data.Print(); } + break; + case 22: + fprintf(stderr, "Received aggregate message\n"); + break; + default: + fprintf(stderr, "Unknown chunk received! Probably protocol corruption, stopping parsing of incoming data.\n"); + return 1; + break; + } //switch for type of chunk + }else{ //if chunk parsed + if (std::cin.good()){ + inbuffer += std::cin.get(); + }else{ + inbuffer.clear(); } - break; - case 18: { - fprintf(stderr, "Received AFM0 data message (metadata):\n"); - amfdata = AMF::parse(next.data); - amfdata.Print(); - if (Detail & DETAIL_RECONSTRUCT){ - F.ChunkLoader(next); - std::cout.write(F.data, F.len); - } - } - break; - case 19: - fprintf(stderr, "Received AFM0 shared object\n"); - break; - case 20: { //AMF0 command message - fprintf(stderr, "Received AFM0 command message:\n"); - amfdata = AMF::parse(next.data); - std::cerr << amfdata.Print() << std::endl; - } - break; - case 22: - fprintf(stderr, "Received aggregate message\n"); - break; - default: - fprintf(stderr, "Unknown chunk received! Probably protocol corruption, stopping parsing of incoming data.\n"); - return 1; - break; - } //switch for type of chunk - } //while chunk parsed + } + }//while std::cin.good() fprintf(stderr, "No more readable data\n"); return 0; }