Added extra doxygen, refactored all analysers into a separate namespace.

This commit is contained in:
Erik Zandvliet 2013-03-30 15:05:43 +01:00
parent ea646f6354
commit 431f453586
9 changed files with 411 additions and 329 deletions

View file

@ -9,18 +9,32 @@
#include <mist/amf.h> #include <mist/amf.h>
#include <mist/config.h> #include <mist/config.h>
/// Debugging tool for AMF data. ///\brief Holds everything unique to the analysers.
/// Expects AMF data through stdin, outputs human-readable information to stderr. namespace Analysers {
///\brief Debugging tool for AMF data.
///
/// Expects AMF data through stdin, outputs human-readable information to stderr.
///\return The return code of the analyser.
int analyseAMF(){
std::string amfBuffer;
//Read all of std::cin to amfBuffer
while (std::cin.good()){
amfBuffer += std::cin.get();
}
//Strip the invalid last character
amfBuffer.erase((amfBuffer.end() - 1));
//Parse into an AMF::Object
AMF::Object amfData = AMF::parse(amfBuffer);
//Print the output.
std::cerr << amfData.Print() << std::endl;
return 0;
}
}
int main(int argc, char ** argv){ int main(int argc, char ** argv){
Util::Config conf = Util::Config(argv[0], PACKAGE_VERSION); Util::Config conf = Util::Config(argv[0], PACKAGE_VERSION);
conf.parseArgs(argc, argv); conf.parseArgs(argc, argv);
std::string temp;
while (std::cin.good()){ return Analysers::analyseAMF();
temp += std::cin.get();
} //read all of std::cin to temp
temp.erase(temp.size() - 1, 1); //strip the invalid last character
AMF::Object amfdata = AMF::parse(temp); //parse temp into an AMF::Object
std::cerr << amfdata.Print() << std::endl; //pretty-print the object
return 0;
} }

View file

@ -3,115 +3,128 @@
#include <string> #include <string>
#include <iostream> #include <iostream>
#include <mist/dtsc.h> #include <mist/dtsc.h>
#include <mist/json.h> #include <mist/json.h>
#include <mist/config.h> #include <mist/config.h>
///\brief Holds everything unique to the analysers.
namespace Analysers {
///\brief Debugging tool for DTSC data.
///
/// Expects DTSC data in a file given on the command line, outputs human-readable information to stderr.
///\param conf The configuration parsed from the commandline.
///\return The return code of the analyser.
int analyseDTSC(Util::Config conf){
DTSC::File F(conf.getString("filename"));
JSON::Value meta = F.getMeta();
std::cout << meta.toPrettyString() << std::endl;
JSON::Value pack;
long long unsigned int firstpack = 0;
long long unsigned int nowpack = 0;
long long unsigned int lastaudio = 0;
long long unsigned int lastvideo = 0;
long long unsigned int lastkey = 0;
long long unsigned int totalvideo = 0;
long long unsigned int totalaudio = 0;
long long unsigned int keyframes = 0;
long long unsigned int key_min = 0xffffffff;
long long unsigned int key_max = 0;
long long unsigned int vid_min = 0xffffffff;
long long unsigned int vid_max = 0;
long long unsigned int aud_min = 0xffffffff;
long long unsigned int aud_max = 0;
long long unsigned int bfrm_min = 0xffffffff;
long long unsigned int bfrm_max = 0;
long long unsigned int bps = 0;
F.seekNext();
while ( !F.getJSON().isNull()){
std::cout << F.getJSON().toPrettyString() << std::endl;
nowpack = F.getJSON()["time"].asInt();
if (firstpack == 0){
firstpack = nowpack;
}
if (F.getJSON()["datatype"].asString() == "audio"){
if (lastaudio != 0 && (nowpack - lastaudio) != 0){
bps = F.getJSON()["data"].asString().size() / (nowpack - lastaudio);
if (bps < aud_min){
aud_min = bps;
}
if (bps > aud_max){
aud_max = bps;
}
}
totalaudio += F.getJSON()["data"].asString().size();
lastaudio = nowpack;
}
if (F.getJSON()["datatype"].asString() == "video"){
if (lastvideo != 0 && (nowpack - lastvideo) != 0){
bps = F.getJSON()["data"].asString().size() / (nowpack - lastvideo);
if (bps < vid_min){
vid_min = bps;
}
if (bps > vid_max){
vid_max = bps;
}
}
if (F.getJSON()["keyframe"].asInt() != 0){
if (lastkey != 0){
bps = nowpack - lastkey;
if (bps < key_min){
key_min = bps;
}
if (bps > key_max){
key_max = bps;
}
}
keyframes++;
lastkey = nowpack;
}
if (F.getJSON()["offset"].asInt() != 0){
bps = F.getJSON()["offset"].asInt();
if (bps < bfrm_min){
bfrm_min = bps;
}
if (bps > bfrm_max){
bfrm_max = bps;
}
}
totalvideo += F.getJSON()["data"].asString().size();
lastvideo = nowpack;
}
F.seekNext();
}
std::cout << std::endl << "Summary:" << std::endl;
meta["length"] = (long long int)((nowpack - firstpack) / 1000);
if (meta.isMember("audio")){
meta["audio"]["bps"] = (long long int)(totalaudio / ((lastaudio - firstpack) / 1000));
std::cout << " Audio: " << meta["audio"]["codec"].asString() << std::endl;
std::cout << " Bitrate: " << meta["audio"]["bps"].asInt() << std::endl;
}
if (meta.isMember("video")){
meta["video"]["bps"] = (long long int)(totalvideo / ((lastvideo - firstpack) / 1000));
meta["video"]["keyms"] = (long long int)((lastvideo - firstpack) / keyframes);
if (meta["video"]["keyms"].asInt() - key_min > key_max - meta["video"]["keyms"].asInt()){
meta["video"]["keyvar"] = (long long int)(meta["video"]["keyms"].asInt() - key_min);
}else{
meta["video"]["keyvar"] = (long long int)(key_max - meta["video"]["keyms"].asInt());
}
std::cout << " Video: " << meta["video"]["codec"].asString() << std::endl;
std::cout << " Bitrate: " << meta["video"]["bps"].asInt() << std::endl;
std::cout << " Keyframes: " << meta["video"]["keyms"].asInt() << "~" << meta["video"]["keyvar"].asInt() << std::endl;
std::cout << " B-frames: " << bfrm_min << " - " << bfrm_max << std::endl;
}
return 0;
}
}
/// Reads an DTSC file and prints all readable data about it /// Reads an DTSC file and prints all readable data about it
int main(int argc, char ** argv){ int main(int argc, char ** argv){
Util::Config conf = Util::Config(argv[0], PACKAGE_VERSION); Util::Config conf = Util::Config(argv[0], PACKAGE_VERSION);
conf.addOption("filename", JSON::fromString("{\"arg_num\":1, \"arg\":\"string\", \"help\":\"Filename of the DTSC file to analyse.\"}")); conf.addOption("filename", JSON::fromString("{\"arg_num\":1, \"arg\":\"string\", \"help\":\"Filename of the DTSC file to analyse.\"}"));
conf.parseArgs(argc, argv); conf.parseArgs(argc, argv);
DTSC::File F(conf.getString("filename")); return Analysers::analyseDTSC(conf);
JSON::Value meta = F.getMeta();
std::cout << meta.toPrettyString() << std::endl;
JSON::Value pack;
long long unsigned int firstpack = 0;
long long unsigned int nowpack = 0;
long long unsigned int lastaudio = 0;
long long unsigned int lastvideo = 0;
long long unsigned int lastkey = 0;
long long unsigned int totalvideo = 0;
long long unsigned int totalaudio = 0;
long long unsigned int keyframes = 0;
long long unsigned int key_min = 0xffffffff;
long long unsigned int key_max = 0;
long long unsigned int vid_min = 0xffffffff;
long long unsigned int vid_max = 0;
long long unsigned int aud_min = 0xffffffff;
long long unsigned int aud_max = 0;
long long unsigned int bfrm_min = 0xffffffff;
long long unsigned int bfrm_max = 0;
long long unsigned int bps = 0;
F.seekNext();
while ( !F.getJSON().isNull()){
std::cout << F.getJSON().toPrettyString() << std::endl;
nowpack = F.getJSON()["time"].asInt();
if (firstpack == 0){
firstpack = nowpack;
}
if (F.getJSON()["datatype"].asString() == "audio"){
if (lastaudio != 0 && (nowpack - lastaudio) != 0){
bps = F.getJSON()["data"].asString().size() / (nowpack - lastaudio);
if (bps < aud_min){
aud_min = bps;
}
if (bps > aud_max){
aud_max = bps;
}
}
totalaudio += F.getJSON()["data"].asString().size();
lastaudio = nowpack;
}
if (F.getJSON()["datatype"].asString() == "video"){
if (lastvideo != 0 && (nowpack - lastvideo) != 0){
bps = F.getJSON()["data"].asString().size() / (nowpack - lastvideo);
if (bps < vid_min){
vid_min = bps;
}
if (bps > vid_max){
vid_max = bps;
}
}
if (F.getJSON()["keyframe"].asInt() != 0){
if (lastkey != 0){
bps = nowpack - lastkey;
if (bps < key_min){
key_min = bps;
}
if (bps > key_max){
key_max = bps;
}
}
keyframes++;
lastkey = nowpack;
}
if (F.getJSON()["offset"].asInt() != 0){
bps = F.getJSON()["offset"].asInt();
if (bps < bfrm_min){
bfrm_min = bps;
}
if (bps > bfrm_max){
bfrm_max = bps;
}
}
totalvideo += F.getJSON()["data"].asString().size();
lastvideo = nowpack;
}
F.seekNext();
}
std::cout << std::endl << "Summary:" << std::endl;
meta["length"] = (long long int)((nowpack - firstpack) / 1000);
if (meta.isMember("audio")){
meta["audio"]["bps"] = (long long int)(totalaudio / ((lastaudio - firstpack) / 1000));
std::cout << " Audio: " << meta["audio"]["codec"].asString() << std::endl;
std::cout << " Bitrate: " << meta["audio"]["bps"].asInt() << std::endl;
}
if (meta.isMember("video")){
meta["video"]["bps"] = (long long int)(totalvideo / ((lastvideo - firstpack) / 1000));
meta["video"]["keyms"] = (long long int)((lastvideo - firstpack) / keyframes);
if (meta["video"]["keyms"].asInt() - key_min > key_max - meta["video"]["keyms"].asInt()){
meta["video"]["keyvar"] = (long long int)(meta["video"]["keyms"].asInt() - key_min);
}else{
meta["video"]["keyvar"] = (long long int)(key_max - meta["video"]["keyms"].asInt());
}
std::cout << " Video: " << meta["video"]["codec"].asString() << std::endl;
std::cout << " Bitrate: " << meta["video"]["bps"].asInt() << std::endl;
std::cout << " Keyframes: " << meta["video"]["keyms"].asInt() << "~" << meta["video"]["keyvar"].asInt() << std::endl;
std::cout << " B-frames: " << bfrm_min << " - " << bfrm_max << std::endl;
}
return 0;
} //main } //main

View file

@ -14,15 +14,25 @@
#include <mist/flv_tag.h> //FLV support #include <mist/flv_tag.h> //FLV support
#include <mist/config.h> #include <mist/config.h>
/// Reads FLV from stdin and outputs human-readable information to stderr. ///\brief Holds everything unique to the analysers.
namespace Analysers {
///\brief Debugging tool for FLV data.
///
/// Expects FLV data through stdin, outputs human-readable information to stderr.
///\return The return code of the analyser.
int analyseFLV(){
FLV::Tag flvData; // Temporary storage for incoming FLV data.
while ( !feof(stdin)){
if (flvData.FileLoader(stdin)){
std::cout << "Tag: " << flvData.tagType() << "\n\tTime: " << flvData.tagTime() << std::endl;
}
}
return 0;
}
}
int main(int argc, char ** argv){ int main(int argc, char ** argv){
Util::Config conf = Util::Config(argv[0], PACKAGE_VERSION); Util::Config conf = Util::Config(argv[0], PACKAGE_VERSION);
conf.parseArgs(argc, argv); conf.parseArgs(argc, argv);
FLV::Tag FLV_in; // Temporary storage for incoming FLV data. return Analysers::analyseFLV();
while ( !feof(stdin)){
if (FLV_in.FileLoader(stdin)){
std::cout << "Tag: " << FLV_in.tagType() << "\n\tTime: " << FLV_in.tagTime() << std::endl;
}
}
return 0;
} }

View file

@ -10,22 +10,33 @@
#include <mist/mp4.h> #include <mist/mp4.h>
#include <mist/config.h> #include <mist/config.h>
///\brief Holds everything unique to the analysers.
namespace Analysers {
///\brief Debugging tool for MP4 data.
///
/// Expects MP4 data through stdin, outputs human-readable information to stderr.
///\return The return code of the analyser.
int analyseMP4(){
std::string mp4Buffer;
//Read all of std::cin to mp4Buffer
while (std::cin.good()){
mp4Buffer += std::cin.get();
}
mp4Buffer.erase(mp4Buffer.size() - 1, 1);
MP4::Box mp4Data;
while (mp4Data.read(mp4Buffer)){
std::cerr << mp4Data.toPrettyString(0) << std::endl;
}
return 0;
}
}
/// Debugging tool for MP4 data. /// Debugging tool for MP4 data.
/// Expects MP4 data through stdin, outputs human-readable information to stderr. /// Expects MP4 data through stdin, outputs human-readable information to stderr.
int main(int argc, char ** argv){ int main(int argc, char ** argv){
Util::Config conf = Util::Config(argv[0], PACKAGE_VERSION); Util::Config conf = Util::Config(argv[0], PACKAGE_VERSION);
conf.parseArgs(argc, argv); conf.parseArgs(argc, argv);
return Analysers::analyseMP4();
std::string temp;
while (std::cin.good()){
temp += std::cin.get();
} //read all of std::cin to temp
temp.erase(temp.size() - 1, 1); //strip the invalid last character
MP4::Box mp4data;
while (mp4data.read(temp)){
std::cerr << mp4data.toPrettyString(0) << std::endl;
}
return 0;
} }

View file

@ -1,33 +1,195 @@
/// \file rtmp_analyser.cpp /// \file rtmp_analyser.cpp
/// Debugging tool for RTMP data. /// Debugging tool for RTMP data.
/// Expects RTMP data of one side of the conversion through stdin, outputs human-readable information to stderr.
/// Automatically skips 3073 bytes of handshake data.
/// Optionally reconstructs an FLV file
/// Singular argument is a bitmask indicating the following (defaulting to 0):
/// - 0 = Info: Output chunk meanings and fulltext commands to stderr.
/// - 1 = Reconstruct: Output valid .flv file to stdout.
/// - 2 = Explicit: Audio/video data details.
/// - 4 = Verbose: details about every whole chunk.
#include <cstdlib>
#include <iostream> #include <iostream>
#include <fstream> #include <fstream>
#include <string> #include <string>
#include <iostream>
#include <cstdlib>
#include <mist/flv_tag.h> #include <mist/flv_tag.h>
#include <mist/amf.h> #include <mist/amf.h>
#include <mist/rtmpchunks.h> #include <mist/rtmpchunks.h>
#include <mist/config.h> #include <mist/config.h>
int Detail = 0;
#define DETAIL_RECONSTRUCT 1 #define DETAIL_RECONSTRUCT 1
#define DETAIL_EXPLICIT 2 #define DETAIL_EXPLICIT 2
#define DETAIL_VERBOSE 4 #define DETAIL_VERBOSE 4
/// Debugging tool for RTMP data. ///\brief Holds everything unique to the analysers.
/// Expects RTMP data of one side of the conversion through stdin, outputs human-readable information to stderr. namespace Analysers {
/// Will output FLV file to stdout, if available ///\brief Debugging tool for RTMP data.
/// Automatically skips 3073 bytes of handshake data. ///
///Expects RTMP data of one side of the conversation through stdin, outputs human-readable information to stderr.
///
///Will output FLV file to stdout, if available.
///
///Automatically skips the handshake data.
///\param conf The configuration parsed from the commandline.
///\return The return code of the analyser.
int analyseRTMP(Util::Config conf){
int Detail = conf.getInteger("detail");
if (Detail > 0){
fprintf(stderr, "Detail level set:\n");
if (Detail & DETAIL_RECONSTRUCT){
fprintf(stderr, " - Will reconstuct FLV file to stdout\n");
std::cout.write(FLV::Header, 13);
}
if (Detail & DETAIL_EXPLICIT){
fprintf(stderr, " - Will list explicit video/audio data information\n");
}
if (Detail & DETAIL_VERBOSE){
fprintf(stderr, " - Will list verbose chunk information\n");
}
}
std::string inbuffer;
while (std::cin.good()){
inbuffer += std::cin.get();
} //read all of std::cin to temp
inbuffer.erase(0, 3073); //strip the handshake part
RTMPStream::Chunk next;
FLV::Tag F; //FLV holder
AMF::Object amfdata("empty", AMF::AMF0_DDV_CONTAINER);
AMF::Object3 amf3data("empty", AMF::AMF3_DDV_CONTAINER);
while (next.Parse(inbuffer)){
if (Detail & DETAIL_VERBOSE){
fprintf(stderr, "Chunk info: [%#2X] CS ID %u, timestamp %u, len %u, type ID %u, Stream ID %u\n", next.headertype, next.cs_id, next.timestamp,
next.len, next.msg_type_id, next.msg_stream_id);
}
switch (next.msg_type_id){
case 0: //does not exist
fprintf(stderr, "Error chunk - %i, %i, %i, %i, %i\n", next.cs_id, next.timestamp, next.real_len, next.len_left, next.msg_stream_id);
//return 0;
break; //happens when connection breaks unexpectedly
case 1: //set chunk size
RTMPStream::chunk_rec_max = ntohl(*(int*)next.data.c_str());
fprintf(stderr, "CTRL: Set chunk size: %i\n", RTMPStream::chunk_rec_max);
break;
case 2: //abort message - we ignore this one
fprintf(stderr, "CTRL: Abort message: %i\n", ntohl(*(int*)next.data.c_str()));
//4 bytes of stream id to drop
break;
case 3: //ack
RTMPStream::snd_window_at = ntohl(*(int*)next.data.c_str());
fprintf(stderr, "CTRL: Acknowledgement: %i\n", RTMPStream::snd_window_at);
break;
case 4: {
short int ucmtype = ntohs(*(short int*)next.data.c_str());
switch (ucmtype){
case 0:
fprintf(stderr, "CTRL: User control message: stream begin %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2)));
break;
case 1:
fprintf(stderr, "CTRL: User control message: stream EOF %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2)));
break;
case 2:
fprintf(stderr, "CTRL: User control message: stream dry %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2)));
break;
case 3:
fprintf(stderr, "CTRL: User control message: setbufferlen %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2)));
break;
case 4:
fprintf(stderr, "CTRL: User control message: streamisrecorded %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2)));
break;
case 6:
fprintf(stderr, "CTRL: User control message: pingrequest %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2)));
break;
case 7:
fprintf(stderr, "CTRL: User control message: pingresponse %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2)));
break;
default:
fprintf(stderr, "CTRL: User control message: UNKNOWN %hu - %u\n", ucmtype, ntohl(*(unsigned int*)(next.data.c_str()+2)));
break;
}
}
break;
case 5: //window size of other end
RTMPStream::rec_window_size = ntohl(*(int*)next.data.c_str());
RTMPStream::rec_window_at = RTMPStream::rec_cnt;
fprintf(stderr, "CTRL: Window size: %i\n", RTMPStream::rec_window_size);
break;
case 6:
RTMPStream::snd_window_size = ntohl(*(int*)next.data.c_str());
//4 bytes window size, 1 byte limit type (ignored)
fprintf(stderr, "CTRL: Set peer bandwidth: %i\n", RTMPStream::snd_window_size);
break;
case 8:
if (Detail & (DETAIL_EXPLICIT | DETAIL_RECONSTRUCT)){
F.ChunkLoader(next);
if (Detail & DETAIL_EXPLICIT){
fprintf(stderr, "Received %i bytes audio data\n", next.len);
std::cerr << "Got a " << F.len << " bytes " << F.tagType() << " FLV tag of time " << F.tagTime() << "." << std::endl;
}
if (Detail & DETAIL_RECONSTRUCT){
std::cout.write(F.data, F.len);
}
}
break;
case 9:
if (Detail & (DETAIL_EXPLICIT | DETAIL_RECONSTRUCT)){
F.ChunkLoader(next);
if (Detail & DETAIL_EXPLICIT){
fprintf(stderr, "Received %i bytes video data\n", next.len);
std::cerr << "Got a " << F.len << " bytes " << F.tagType() << " FLV tag of time " << F.tagTime() << "." << std::endl;
}
if (Detail & DETAIL_RECONSTRUCT){
std::cout.write(F.data, F.len);
}
}
break;
case 15:
fprintf(stderr, "Received AFM3 data message\n");
break;
case 16:
fprintf(stderr, "Received AFM3 shared object\n");
break;
case 17: {
fprintf(stderr, "Received AFM3 command message:\n");
char soort = next.data[0];
next.data = next.data.substr(1);
if (soort == 0){
amfdata = AMF::parse(next.data);
std::cerr << amfdata.Print() << std::endl;
}else{
amf3data = AMF::parse3(next.data);
amf3data.Print();
}
}
break;
case 18: {
fprintf(stderr, "Received AFM0 data message (metadata):\n");
amfdata = AMF::parse(next.data);
amfdata.Print();
if (Detail & DETAIL_RECONSTRUCT){
F.ChunkLoader(next);
std::cout.write(F.data, F.len);
}
}
break;
case 19:
fprintf(stderr, "Received AFM0 shared object\n");
break;
case 20: { //AMF0 command message
fprintf(stderr, "Received AFM0 command message:\n");
amfdata = AMF::parse(next.data);
std::cerr << amfdata.Print() << std::endl;
}
break;
case 22:
fprintf(stderr, "Received aggregate message\n");
break;
default:
fprintf(stderr, "Unknown chunk received! Probably protocol corruption, stopping parsing of incoming data.\n");
return 1;
break;
} //switch for type of chunk
} //while chunk parsed
fprintf(stderr, "No more readable data\n");
return 0;
}
}
int main(int argc, char ** argv){ int main(int argc, char ** argv){
Util::Config conf = Util::Config(argv[0], PACKAGE_VERSION); Util::Config conf = Util::Config(argv[0], PACKAGE_VERSION);
conf.addOption("detail", conf.addOption("detail",
@ -35,164 +197,5 @@ int main(int argc, char ** argv){
"{\"arg_num\":1, \"arg\":\"integer\", \"default\":0, \"help\":\"Bitmask, 1 = Reconstruct, 2 = Explicit media info, 4 = Verbose chunks\"}")); "{\"arg_num\":1, \"arg\":\"integer\", \"default\":0, \"help\":\"Bitmask, 1 = Reconstruct, 2 = Explicit media info, 4 = Verbose chunks\"}"));
conf.parseArgs(argc, argv); conf.parseArgs(argc, argv);
Detail = conf.getInteger("detail"); return Analysers::analyseRTMP(conf);
if (Detail > 0){
fprintf(stderr, "Detail level set:\n");
if ((Detail & DETAIL_RECONSTRUCT) == DETAIL_RECONSTRUCT){
fprintf(stderr, " - Will reconstuct FLV file to stdout\n");
std::cout.write(FLV::Header, 13);
}
if ((Detail & DETAIL_EXPLICIT) == DETAIL_EXPLICIT){
fprintf(stderr, " - Will list explicit video/audio data information\n");
}
if ((Detail & DETAIL_VERBOSE) == DETAIL_VERBOSE){
fprintf(stderr, " - Will list verbose chunk information\n");
}
}
std::string inbuffer;
while (std::cin.good()){
inbuffer += std::cin.get();
} //read all of std::cin to temp
inbuffer.erase(0, 3073); //strip the handshake part
RTMPStream::Chunk next;
FLV::Tag F; //FLV holder
AMF::Object amfdata("empty", AMF::AMF0_DDV_CONTAINER);
AMF::Object3 amf3data("empty", AMF::AMF3_DDV_CONTAINER);
while (next.Parse(inbuffer)){
if ((Detail & DETAIL_VERBOSE) == DETAIL_VERBOSE){
fprintf(stderr, "Chunk info: [%#2X] CS ID %u, timestamp %u, len %u, type ID %u, Stream ID %u\n", next.headertype, next.cs_id, next.timestamp,
next.len, next.msg_type_id, next.msg_stream_id);
}
switch (next.msg_type_id){
case 0: //does not exist
fprintf(stderr, "Error chunk - %i, %i, %i, %i, %i\n", next.cs_id, next.timestamp, next.real_len, next.len_left, next.msg_stream_id);
//return 0;
break; //happens when connection breaks unexpectedly
case 1: //set chunk size
RTMPStream::chunk_rec_max = ntohl(*(int*)next.data.c_str());
fprintf(stderr, "CTRL: Set chunk size: %i\n", RTMPStream::chunk_rec_max);
break;
case 2: //abort message - we ignore this one
fprintf(stderr, "CTRL: Abort message: %i\n", ntohl(*(int*)next.data.c_str()));
//4 bytes of stream id to drop
break;
case 3: //ack
RTMPStream::snd_window_at = ntohl(*(int*)next.data.c_str());
fprintf(stderr, "CTRL: Acknowledgement: %i\n", RTMPStream::snd_window_at);
break;
case 4: {
short int ucmtype = ntohs(*(short int*)next.data.c_str());
switch (ucmtype){
case 0:
fprintf(stderr, "CTRL: User control message: stream begin %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2)));
break;
case 1:
fprintf(stderr, "CTRL: User control message: stream EOF %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2)));
break;
case 2:
fprintf(stderr, "CTRL: User control message: stream dry %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2)));
break;
case 3:
fprintf(stderr, "CTRL: User control message: setbufferlen %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2)));
break;
case 4:
fprintf(stderr, "CTRL: User control message: streamisrecorded %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2)));
break;
case 6:
fprintf(stderr, "CTRL: User control message: pingrequest %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2)));
break;
case 7:
fprintf(stderr, "CTRL: User control message: pingresponse %u\n", ntohl(*(unsigned int*)(next.data.c_str()+2)));
break;
default:
fprintf(stderr, "CTRL: User control message: UNKNOWN %hu - %u\n", ucmtype, ntohl(*(unsigned int*)(next.data.c_str()+2)));
break;
}
}
break;
case 5: //window size of other end
RTMPStream::rec_window_size = ntohl(*(int*)next.data.c_str());
RTMPStream::rec_window_at = RTMPStream::rec_cnt;
fprintf(stderr, "CTRL: Window size: %i\n", RTMPStream::rec_window_size);
break;
case 6:
RTMPStream::snd_window_size = ntohl(*(int*)next.data.c_str());
//4 bytes window size, 1 byte limit type (ignored)
fprintf(stderr, "CTRL: Set peer bandwidth: %i\n", RTMPStream::snd_window_size);
break;
case 8:
if (Detail & (DETAIL_EXPLICIT | DETAIL_RECONSTRUCT)){
F.ChunkLoader(next);
if ((Detail & DETAIL_EXPLICIT) == DETAIL_EXPLICIT){
fprintf(stderr, "Received %i bytes audio data\n", next.len);
std::cerr << "Got a " << F.len << " bytes " << F.tagType() << " FLV tag of time " << F.tagTime() << "." << std::endl;
}
if ((Detail & DETAIL_RECONSTRUCT) == DETAIL_RECONSTRUCT){
std::cout.write(F.data, F.len);
}
}
break;
case 9:
if (Detail & (DETAIL_EXPLICIT | DETAIL_RECONSTRUCT)){
F.ChunkLoader(next);
if ((Detail & DETAIL_EXPLICIT) == DETAIL_EXPLICIT){
fprintf(stderr, "Received %i bytes video data\n", next.len);
std::cerr << "Got a " << F.len << " bytes " << F.tagType() << " FLV tag of time " << F.tagTime() << "." << std::endl;
}
if ((Detail & DETAIL_RECONSTRUCT) == DETAIL_RECONSTRUCT){
std::cout.write(F.data, F.len);
}
}
break;
case 15:
fprintf(stderr, "Received AFM3 data message\n");
break;
case 16:
fprintf(stderr, "Received AFM3 shared object\n");
break;
case 17: {
fprintf(stderr, "Received AFM3 command message:\n");
char soort = next.data[0];
next.data = next.data.substr(1);
if (soort == 0){
amfdata = AMF::parse(next.data);
std::cerr << amfdata.Print() << std::endl;
}else{
amf3data = AMF::parse3(next.data);
amf3data.Print();
}
}
break;
case 18: {
fprintf(stderr, "Received AFM0 data message (metadata):\n");
amfdata = AMF::parse(next.data);
amfdata.Print();
if ((Detail & DETAIL_RECONSTRUCT) == DETAIL_RECONSTRUCT){
F.ChunkLoader(next);
std::cout.write(F.data, F.len);
}
}
break;
case 19:
fprintf(stderr, "Received AFM0 shared object\n");
break;
case 20: { //AMF0 command message
fprintf(stderr, "Received AFM0 command message:\n");
amfdata = AMF::parse(next.data);
std::cerr << amfdata.Print() << std::endl;
}
break;
case 22:
fprintf(stderr, "Received aggregate message\n");
break;
default:
fprintf(stderr, "Unknown chunk received! Probably protocol corruption, stopping parsing of incoming data.\n");
return 1;
break;
} //switch for type of chunk
} //while chunk parsed
fprintf(stderr, "No more readable data\n");
return 0;
} //main } //main

View file

@ -23,13 +23,16 @@ namespace Buffer {
Stream * thisStream = 0; Stream * thisStream = 0;
Socket::Server SS; ///< The server socket. Socket::Server SS; ///< The server socket.
/// Gets the current system time in milliseconds. ///\brief Gets the current system time in milliseconds.
///\return The current timestamp in milliseconds.
long long int getNowMS(){ long long int getNowMS(){
timeval t; timeval t;
gettimeofday( &t, 0); gettimeofday( &t, 0);
return t.tv_sec * 1000 + t.tv_usec / 1000; return t.tv_sec * 1000 + t.tv_usec / 1000;
} //getNowMS } //getNowMS
///\brief A function running in a thread to send all statistics.
///\param empty A null pointer.
void handleStats(void * empty){ void handleStats(void * empty){
if (empty != 0){ if (empty != 0){
return; return;
@ -51,6 +54,8 @@ namespace Buffer {
StatsSocket.close(); StatsSocket.close();
} }
///\brief A function running in a thread to handle a new user connection.
///\param v_usr The user that is connected.
void handleUser(void * v_usr){ void handleUser(void * v_usr){
user * usr = (user*)v_usr; user * usr = (user*)v_usr;
#if DEBUG >= 5 #if DEBUG >= 5
@ -153,7 +158,10 @@ namespace Buffer {
usr->Disconnect("Socket closed."); usr->Disconnect("Socket closed.");
} }
/// Loop reading DTSC data from stdin and processing it at the correct speed. ///\brief A function running a thread to handle input data through stdin.
///
///Automatically slows down to realtime playback.
///\param empty A null pointer.
void handleStdin(void * empty){ void handleStdin(void * empty){
if (empty != 0){ if (empty != 0){
return; return;
@ -190,8 +198,8 @@ namespace Buffer {
buffer_running = false; buffer_running = false;
} }
/// Loop reading DTSC data from an IP push address. ///\brief A function running a thread to handle input data through rtmp push.
/// No changes to the speed are made. ///\param empty A null pointer.
void handlePushin(void * empty){ void handlePushin(void * empty){
if (empty != 0){ if (empty != 0){
return; return;
@ -221,7 +229,10 @@ namespace Buffer {
} }
} }
/// Starts a loop, waiting for connections to send data to. ///\brief Starts a loop, waiting for connections to send data to.
///\param argc The number of arguments to the program.
///\param argv The arguments to the program.
///\return The return code of the buffer.
int Start(int argc, char ** argv){ int Start(int argc, char ** argv){
Util::Config conf = Util::Config(argv[0], PACKAGE_VERSION); Util::Config conf = Util::Config(argv[0], PACKAGE_VERSION);
conf.addOption("stream_name", conf.addOption("stream_name",
@ -295,7 +306,7 @@ namespace Buffer {
; ;
//Buffer namespace //Buffer namespace
/// Entry point for Buffer, simply calls Buffer::Start(). ///\brief Entry point for Buffer, simply calls Buffer::Start().
int main(int argc, char ** argv){ int main(int argc, char ** argv){
return Buffer::Start(argc, argv); return Buffer::Start(argc, argv);
} //main } //main

View file

@ -11,17 +11,19 @@ namespace Buffer {
/// Converts a stats line to up, down, host, connector and conntime values. /// Converts a stats line to up, down, host, connector and conntime values.
class Stats{ class Stats{
public: public:
unsigned int up; unsigned int up;///<The amount of bytes sent upstream.
unsigned int down; unsigned int down;///<The amount of bytes received downstream.
std::string host; std::string host;///<The connected host.
std::string connector; std::string connector;///<The connector the user is connected with.
unsigned int conntime; unsigned int conntime;///<The amount of time the user is connected.
Stats(); Stats();
Stats(std::string s); Stats(std::string s);
}; };
/// Holds connected users. ///\brief Keeps track of connected users.
/// Keeps track of what buffer users are using and the connection status. ///
///Keeps track of which buffer the user currently uses,
///and its connection status.
class user{ class user{
public: public:
tthread::thread * Thread; ///< Holds the thread dealing with this user. tthread::thread * Thread; ///< Holds the thread dealing with this user.

View file

@ -24,21 +24,27 @@
#define CYG_LOOP #define CYG_LOOP
#endif #endif
/// Copy of stats from buffer_user.cpp ///Converts a stats line to up, down, host, connector and conntime values.
class Stats{ class Stats{
public: public:
unsigned int up; unsigned int up;///<The amount of bytes sent upstream.
unsigned int down; unsigned int down;///<The amount of bytes received downstream.
std::string host; std::string host;///<The connected host.
std::string connector; std::string connector;///<The connector the user is connected with.
unsigned int conntime; unsigned int conntime;///<The amount of time the user is connected.
///\brief Default stats constructor.
///
///Should not be used.
Stats(){ Stats(){
up = 0; up = 0;
down = 0; down = 0;
conntime = 0; conntime = 0;
} }
; ;
/// Reads a stats string and parses it to the internal representation. ///\brief Stats constructor reading a string.
///
///Reads a stats string and parses it to the internal representation.
///\param s The string of stats.
Stats(std::string s){ Stats(std::string s){
size_t f = s.find(' '); size_t f = s.find(' ');
if (f != std::string::npos){ if (f != std::string::npos){

View file

@ -10,6 +10,10 @@ namespace Controller {
std::map<std::string, int> lastBuffer; ///< Last moment of contact with all buffers. std::map<std::string, int> lastBuffer; ///< Last moment of contact with all buffers.
///\brief Checks whether two streams are equal.
///\param one The first stream for the comparison.
///\param two The second stream for the comparison.
///\return True if the streams are equal, false otherwise.
bool streamsEqual(JSON::Value & one, JSON::Value & two){ bool streamsEqual(JSON::Value & one, JSON::Value & two){
if ( !one.isMember("source") || !two.isMember("source") || one["source"] != two["source"]){ if ( !one.isMember("source") || !two.isMember("source") || one["source"] != two["source"]){
return false; return false;
@ -17,6 +21,9 @@ namespace Controller {
return true; return true;
} }
///\brief Starts a single stream
///\param name The name of the stream
///\param data The corresponding configuration values.
void startStream(std::string name, JSON::Value & data){ void startStream(std::string name, JSON::Value & data){
data["online"] = (std::string)"Checking..."; data["online"] = (std::string)"Checking...";
data.removeMember("error"); data.removeMember("error");
@ -76,6 +83,8 @@ namespace Controller {
} }
} }
///\brief Checks all streams, restoring if needed.
///\param data The stream configuration for the server.
void CheckAllStreams(JSON::Value & data){ void CheckAllStreams(JSON::Value & data){
long long int currTime = Util::epoch(); long long int currTime = Util::epoch();
for (JSON::ObjIter jit = data.ObjBegin(); jit != data.ObjEnd(); jit++){ for (JSON::ObjIter jit = data.ObjBegin(); jit != data.ObjEnd(); jit++){
@ -131,6 +140,9 @@ namespace Controller {
} }
} }
///\brief Parse a given stream configuration.
///\param in The requested configuration.
///\param out The new configuration after parsing.
void CheckStreams(JSON::Value & in, JSON::Value & out){ void CheckStreams(JSON::Value & in, JSON::Value & out){
bool changed = false; bool changed = false;