Refactoring of the HTTP_Connector namespace, starting to add documentation as well.

This commit is contained in:
Erik Zandvliet 2013-03-27 16:05:59 +01:00
parent 2e1296cf5e
commit 570aa95315
7 changed files with 233 additions and 159 deletions

View file

@ -26,7 +26,7 @@ DOXYFILE_ENCODING = UTF-8
# identify the project. Note that if you do not use Doxywizard you need
# to put quotes around the project name if it contains spaces.
PROJECT_NAME = DDVTECHStreamingServer
PROJECT_NAME = MistServer
# The PROJECT_NUMBER tag can be used to enter a project or revision number.
# This could be handy for archiving the generated documentation or
@ -106,7 +106,7 @@ ABBREVIATE_BRIEF =
# Doxygen will generate a detailed section even if there is only a brief
# description.
ALWAYS_DETAILED_SEC = NO
ALWAYS_DETAILED_SEC = YES
# If the INLINE_INHERITED_MEMB tag is set to YES, doxygen will show all
# inherited members of a class in the documentation of that class as if those
@ -151,7 +151,7 @@ SHORT_NAMES = NO
# comments will behave just like regular Qt-style comments
# (thus requiring an explicit @brief command for a brief description.)
JAVADOC_AUTOBRIEF = YES
JAVADOC_AUTOBRIEF = NO
# If the QT_AUTOBRIEF tag is set to YES then Doxygen will
# interpret the first line (until the first dot) of a Qt-style
@ -471,7 +471,7 @@ SORT_MEMBER_DOCS = YES
# by member name. If set to NO (the default) the members will appear in
# declaration order.
SORT_BRIEF_DOCS = NO
SORT_BRIEF_DOCS = YES
# If the SORT_MEMBERS_CTORS_1ST tag is set to YES then doxygen
# will sort the (brief and detailed) documentation of class members so that
@ -481,7 +481,7 @@ SORT_BRIEF_DOCS = NO
# This tag will be ignored for brief docs if SORT_BRIEF_DOCS is set to NO
# and ignored for detailed docs if SORT_MEMBER_DOCS is set to NO.
SORT_MEMBERS_CTORS_1ST = NO
SORT_MEMBERS_CTORS_1ST = YES
# If the SORT_GROUP_NAMES tag is set to YES then doxygen will sort the
# hierarchy of group names into alphabetical order. If set to NO (the default)
@ -629,7 +629,7 @@ WARN_IF_DOC_ERROR = YES
# wrong or incomplete parameter documentation, but not about the absence of
# documentation.
WARN_NO_PARAMDOC = NO
WARN_NO_PARAMDOC = YES
# The WARN_FORMAT tag determines the format of the warning messages that
# doxygen can produce. The string should contain the $file, $line, and $text

View file

@ -3,6 +3,8 @@
#include <iostream>
#include <queue>
#include <set>
#include <cstdlib>
#include <cstdio>
#include <cmath>
@ -10,17 +12,18 @@
#include <sys/types.h>
#include <sys/wait.h>
#include <getopt.h>
#include <set>
#include <mist/socket.h>
#include <mist/http_parser.h>
#include <mist/config.h>
#include <mist/stream.h>
#include <mist/timing.h>
#include <mist/auth.h>
#include "tinythread.h"
#include "embed.js.h"
/// Holds everything unique to HTTP Connector.
/// Holds everything unique to HTTP Connectors.
namespace Connector_HTTP {
/// Class for keeping track of connections to connectors.
@ -60,7 +63,7 @@ namespace Connector_HTTP {
tthread::mutex timeout_mutex; ///< Mutex for timeout thread.
tthread::thread * timeouter = 0; ///< Thread that times out connections to connectors.
void Timeout_Thread(void * n){
void proxyTimeoutThread(void * n){
n = 0; //prevent unused variable warning
tthread::lock_guard<tthread::mutex> guard(timeout_mutex);
while (true){
@ -90,7 +93,7 @@ namespace Connector_HTTP {
}
/// Handles requests without associated handler, displaying a nice friendly error message.
long long int Handle_None(HTTP::Parser & H, Socket::Connection * conn){
long long int proxyHandleUnsupported(HTTP::Parser & H, Socket::Connection * conn){
H.Clean();
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver);
H.SetBody(
@ -100,7 +103,7 @@ namespace Connector_HTTP {
return ret;
}
long long int Handle_Timeout(HTTP::Parser & H, Socket::Connection * conn){
long long int proxyHandleTimeout(HTTP::Parser & H, Socket::Connection * conn){
H.Clean();
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver);
H.SetBody(
@ -111,7 +114,7 @@ namespace Connector_HTTP {
}
/// Handles internal requests.
long long int Handle_Internal(HTTP::Parser & H, Socket::Connection * conn){
long long int proxyHandleInternal(HTTP::Parser & H, Socket::Connection * conn){
std::string url = H.getUrl();
@ -257,11 +260,11 @@ namespace Connector_HTTP {
return ret;
} //embed code generator
return Handle_None(H, conn); //anything else doesn't get handled
return proxyHandleUnsupported(H, conn); //anything else doesn't get handled
}
/// Handles requests without associated handler, displaying a nice friendly error message.
long long int Handle_Through_Connector(HTTP::Parser & H, Socket::Connection * conn, std::string & connector){
long long int proxyHandleThroughConnector(HTTP::Parser & H, Socket::Connection * conn, std::string & connector){
//create a unique ID based on a hash of the user agent and host, followed by the stream name and connector
std::string uid = Secure::md5(H.GetHeader("User-Agent") + conn->getHost()) + "_" + H.GetVar("stream") + "_" + connector;
H.SetHeader("X-Stream", H.GetVar("stream"));
@ -293,7 +296,7 @@ namespace Connector_HTTP {
timeouter->join();
delete timeouter;
}
timeouter = new tthread::thread(Connector_HTTP::Timeout_Thread, 0);
timeouter = new tthread::thread(Connector_HTTP::proxyTimeoutThread, 0);
timeout_mutex.unlock();
}
conn_mutex.unlock();
@ -303,7 +306,7 @@ namespace Connector_HTTP {
//if the server connection is dead, handle as timeout.
if ( !connconn.count(uid) || !connconn[uid]->conn->connected()){
connconn[uid]->conn->close();
return Handle_Timeout(H, conn);
return proxyHandleTimeout(H, conn);
}
//forward the original request
connconn[uid]->conn->SendNow(request);
@ -332,7 +335,7 @@ namespace Connector_HTTP {
if (retries >= 5){
std::cout << "[5 retry-laters, cancelled]" << std::endl;
connconn[uid]->conn->close();
return Handle_Timeout(H, conn);
return proxyHandleTimeout(H, conn);
}
connconn[uid]->lastuse = 0;
timeout = 0;
@ -348,7 +351,7 @@ namespace Connector_HTTP {
if (timeout++ > 4000){
std::cout << "[20s timeout triggered]" << std::endl;
connconn[uid]->conn->close();
return Handle_Timeout(H, conn);
return proxyHandleTimeout(H, conn);
}else{
Util::sleep(5);
}
@ -357,7 +360,7 @@ namespace Connector_HTTP {
if ( !connconn.count(uid) || !connconn[uid]->conn->connected() || !conn->connected()){
//failure, disconnect and sent error to user
connconn[uid]->conn->close();
return Handle_Timeout(H, conn);
return proxyHandleTimeout(H, conn);
}else{
long long int ret = Util::getMS();
//success, check type of response
@ -399,7 +402,7 @@ namespace Connector_HTTP {
/// - internal (request fed from information internal to this connector)
/// - dynamic (request fed from http_dynamic connector)
/// - progressive (request fed from http_progressive connector)
std::string getHTTPType(HTTP::Parser & H){
std::string proxyGetHandleType(HTTP::Parser & H){
std::string url = H.getUrl();
if (url.find("/dynamic/") != std::string::npos){
std::string streamname = url.substr(9, url.find("/", 9) - 9);
@ -447,7 +450,7 @@ namespace Connector_HTTP {
}
/// Thread for handling a single HTTP connection
void Handle_HTTP_Connection(void * pointer){
void proxyHandleHTTPConnection(void * pointer){
Socket::Connection * conn = (Socket::Connection *)pointer;
conn->setBlocking(false); //do not block on conn.spool() when no data is available
HTTP::Parser Client;
@ -464,7 +467,7 @@ namespace Connector_HTTP {
}
}
if (Client.Read(conn->Received().get())){
std::string handler = getHTTPType(Client);
std::string handler = proxyGetHandleType(Client);
#if DEBUG >= 4
std::cout << "Received request: " << Client.getUrl() << " (" << conn->getSocket() << ") => " << handler << " (" << Client.GetVar("stream")
<< ")" << std::endl;
@ -478,12 +481,12 @@ namespace Connector_HTTP {
if (handler == "none" || handler == "internal"){
if (handler == "internal"){
midms = Handle_Internal(Client, conn);
midms = proxyHandleInternal(Client, conn);
}else{
midms = Handle_None(Client, conn);
midms = proxyHandleUnsupported(Client, conn);
}
}else{
midms = Handle_Through_Connector(Client, conn, handler);
midms = proxyHandleThroughConnector(Client, conn, handler);
}
#if DEBUG >= 4
long long int nowms = Util::getMS();
@ -531,7 +534,7 @@ int main(int argc, char ** argv){
if (S.connected()){ //check if the new connection is valid
//lock the thread mutex and spawn a new thread for this connection
Connector_HTTP::thread_mutex.lock();
tthread::thread * T = new tthread::thread(Connector_HTTP::Handle_HTTP_Connection, (void *)(new Socket::Connection(S)));
tthread::thread * T = new tthread::thread(Connector_HTTP::proxyHandleHTTPConnection, (void *)(new Socket::Connection(S)));
Connector_HTTP::active_threads.insert(T);
//clean up any threads that may have finished
while ( !Connector_HTTP::done_threads.empty()){

View file

@ -23,10 +23,14 @@
#include <mist/stream.h>
#include <mist/timing.h>
/// Holds everything unique to HTTP Dynamic Connector.
/// Holds everything unique to HTTP Connectors.
namespace Connector_HTTP {
std::string GenerateBootstrap(std::string & MovieId, JSON::Value & metadata, int fragnum = 0){
///\brief Builds a bootstrap for use in HTTP Dynamic streaming.
///\param MovieId The name of the movie.
///\param metadata The current metadata, used to generate the index.
///\param fragnum The index of the current fragment
///\return The generated bootstrap.
std::string dynamicBootstrap(std::string & MovieId, JSON::Value & metadata, int fragnum = 0){
std::string empty;
MP4::ASRT asrt;
@ -86,8 +90,11 @@ namespace Connector_HTTP {
return std::string((char*)abst.asBox(), (int)abst.boxedSize());
}
/// Returns a F4M-format manifest file
std::string BuildManifest(std::string & MovieId, JSON::Value & metadata){
///\brief Builds an index file for HTTP Dynamic streaming.
///\param MovieId The name of the movie.
///\param metadata The current metadata, used to generate the index.
///\return The index file for HTTP Dynamic Streaming.
std::string dynamicIndex(std::string & MovieId, JSON::Value & metadata){
std::string Result;
if (metadata.isMember("vod")){
Result =
@ -100,7 +107,7 @@ namespace Connector_HTTP {
"<mimeType>video/mp4</mimeType>\n"
"<streamType>recorded</streamType>\n"
"<deliveryType>streaming</deliveryType>\n"
"<bootstrapInfo profile=\"named\" id=\"bootstrap1\">" + Base64::encode(GenerateBootstrap(MovieId, metadata)) + "</bootstrapInfo>\n"
"<bootstrapInfo profile=\"named\" id=\"bootstrap1\">" + Base64::encode(dynamicBootstrap(MovieId, metadata)) + "</bootstrapInfo>\n"
"<media streamId=\"1\" bootstrapInfoId=\"bootstrap1\" url=\"" + MovieId + "/\">\n"
"<metadata>AgAKb25NZXRhRGF0YQMAAAk=</metadata>\n"
"</media>\n"
@ -126,8 +133,10 @@ namespace Connector_HTTP {
return Result;
} //BuildManifest
/// Main function for Connector_HTTP_Dynamic
int Connector_HTTP_Dynamic(Socket::Connection conn){
///\brief Main function for the HTTP Dynamic Connector
///\param conn A socket describing the connection the client.
///\return The exit code of the connector.
int dynamicConnector(Socket::Connection conn){
std::deque<std::string> FlashBuf;
int FlashBufSize = 0;
long long int FlashBufTime = 0;
@ -184,7 +193,7 @@ namespace Connector_HTTP {
}
if (HTTP_R.url.find(".abst") != std::string::npos){
HTTP_S.Clean();
HTTP_S.SetBody(GenerateBootstrap(streamname, Strm.metadata));
HTTP_S.SetBody(dynamicBootstrap(streamname, Strm.metadata));
HTTP_S.SetHeader("Content-Type", "binary/octet");
HTTP_S.SetHeader("Cache-Control", "no-cache");
conn.SendNow(HTTP_S.BuildResponse("200", "OK"));
@ -232,7 +241,7 @@ namespace Connector_HTTP {
HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type", "text/xml");
HTTP_S.SetHeader("Cache-Control", "no-cache");
std::string manifest = BuildManifest(streamname, Strm.metadata);
std::string manifest = dynamicIndex(streamname, Strm.metadata);
HTTP_S.SetBody(manifest);
conn.SendNow(HTTP_S.BuildResponse("200", "OK"));
}
@ -254,7 +263,7 @@ namespace Connector_HTTP {
HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type", "video/mp4");
HTTP_S.SetBody("");
std::string new_strap = GenerateBootstrap(streamname, Strm.metadata, ReqFragment);
std::string new_strap = dynamicBootstrap(streamname, Strm.metadata, ReqFragment);
HTTP_S.SetHeader("Content-Length", FlashBufSize + 8 + new_strap.size()); //32+33+btstrp.size());
conn.SendNow(HTTP_S.BuildResponse("200", "OK"));
conn.SendNow(new_strap);
@ -305,6 +314,7 @@ namespace Connector_HTTP {
} //Connector_HTTP_Dynamic namespace
///\brief The standard process-spawning main function.
int main(int argc, char ** argv){
Util::Config conf(argv[0], PACKAGE_VERSION);
conf.addConnectorOptions(1935);
@ -320,7 +330,7 @@ int main(int argc, char ** argv){
if (S.connected()){ //check if the new connection is valid
pid_t myid = fork();
if (myid == 0){ //if new child, start MAINHANDLER
return Connector_HTTP::Connector_HTTP_Dynamic(S);
return Connector_HTTP::dynamicConnector(S);
}else{ //otherwise, do nothing or output debugging text
#if DEBUG >= 3
fprintf(stderr, "Spawned new process %i for socket %i\n", (int)myid, S.getSocket());

View file

@ -25,9 +25,11 @@
/// Holds everything unique to HTTP Connectors.
namespace Connector_HTTP {
/// Returns a m3u or m3u8 index file
std::string BuildIndex(std::string & MovieId, JSON::Value & metadata){
///\brief Builds an index file for HTTP Live streaming.
///\param MovieId The name of the movie.
///\param metadata The current metadata, used to generate the index.
///\return The index file for HTTP Live Streaming.
std::string liveIndex(std::string & MovieId, JSON::Value & metadata){
std::stringstream Result;
if ( !metadata.isMember("live")){
int longestFragment = 0;
@ -58,10 +60,12 @@ namespace Connector_HTTP {
std::cerr << "Sending this index:" << std::endl << Result.str() << std::endl;
#endif
return Result.str();
} //BuildIndex
} //liveIndex
/// Main function for Connector_HTTP_Live
int Connector_HTTP_Live(Socket::Connection conn){
///\brief Main function for the HTTP HLS Connector
///\param conn A socket describing the connection the client.
///\return The exit code of the connector.
int liveConnector(Socket::Connection conn){
std::stringstream TSBuf;
long long int TSBufTime = 0;
@ -171,7 +175,7 @@ namespace Connector_HTTP {
HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type", manifestType);
HTTP_S.SetHeader("Cache-Control", "no-cache");
std::string manifest = BuildIndex(streamname, Strm.metadata);
std::string manifest = liveIndex(streamname, Strm.metadata);
HTTP_S.SetBody(manifest);
conn.SendNow(HTTP_S.BuildResponse("200", "OK"));
}
@ -292,10 +296,11 @@ namespace Connector_HTTP {
fprintf(stderr, "HLS: User %i disconnected.\n", conn.getSocket());
#endif
return 0;
} //Connector_HTTP_Dynamic main function
} //HLS_Connector main function
} //Connector_HTTP_Dynamic namespace
} //Connector_HTTP namespace
///\brief The standard process-spawning main function.
int main(int argc, char ** argv){
Util::Config conf(argv[0], PACKAGE_VERSION);
conf.addConnectorOptions(1935);
@ -311,7 +316,7 @@ int main(int argc, char ** argv){
if (S.connected()){ //check if the new connection is valid
pid_t myid = fork();
if (myid == 0){ //if new child, start MAINHANDLER
return Connector_HTTP::Connector_HTTP_Live(S);
return Connector_HTTP::liveConnector(S);
}else{ //otherwise, do nothing or output debugging text
#if DEBUG >= 5
fprintf(stderr, "Spawned new process %i for socket %i\n", (int)myid, S.getSocket());

View file

@ -1,8 +1,10 @@
/// \file conn_http_progressive.cpp
/// Contains the main code for the HTTP Progressive Connector
///\file conn_http_progressive.cpp
///\brief Contains the main code for the HTTP Progressive Connector
#include <iostream>
#include <queue>
#include <sstream>
#include <cstdlib>
#include <cstdio>
#include <cmath>
@ -10,7 +12,7 @@
#include <sys/types.h>
#include <sys/wait.h>
#include <getopt.h>
#include <sstream>
#include <mist/socket.h>
#include <mist/http_parser.h>
#include <mist/dtsc.h>
@ -20,28 +22,29 @@
#include <mist/stream.h>
#include <mist/timing.h>
/// Holds everything unique to HTTP Progressive Connector.
///\brief Holds everything unique to HTTP Connectors.
namespace Connector_HTTP {
///\brief Main function for the HTTP Progressive Connector
///\param conn A socket describing the connection the client.
///\return The exit code of the connector.
int progressiveConnector(Socket::Connection conn){
bool progressive_has_sent_header = false;//Indicates whether we have sent a header.
bool ready4data = false; //Set to true when streaming is to begin.
DTSC::Stream Strm; //Incoming stream buffer.
HTTP::Parser HTTP_R, HTTP_S;//HTTP Receiver en HTTP Sender.
bool inited = false;//Whether the stream is initialized
Socket::Connection ss( -1);//The Stream Socket, used to connect to the desired stream.
std::string streamname;//Will contain the name of the stream.
FLV::Tag tag;//Temporary tag buffer.
/// Main function for Connector_HTTP_Progressive
int Connector_HTTP_Progressive(Socket::Connection conn){
bool progressive_has_sent_header = false;
bool ready4data = false; ///< Set to true when streaming is to begin.
DTSC::Stream Strm; ///< Incoming stream buffer.
HTTP::Parser HTTP_R, HTTP_S; ///<HTTP Receiver en HTTP Sender.
bool inited = false;
Socket::Connection ss( -1);
std::string streamname;
FLV::Tag tag; ///< Temporary tag buffer.
unsigned int lastStats = 0;//Indicates the last time that we have sent stats to the server socket.
unsigned int seek_sec = 0;//Seek position in ms
unsigned int seek_byte = 0;//Seek position in bytes
unsigned int lastStats = 0;
unsigned int seek_sec = 0; //seek position in ms
unsigned int seek_byte = 0; //seek position in bytes
bool isMP3 = false;
bool isMP3 = false;//Indicates whether the request is audio-only mp3.
while (conn.connected()){
//only parse input if available or not yet init'ed
//Only attempt to parse input when not yet init'ed.
if ( !inited){
if (conn.Received().size() || conn.spool()){
//make sure it ends in a \n
@ -192,10 +195,11 @@ namespace Connector_HTTP {
ss.SendNow(conn.getStats("HTTP_Dynamic").c_str());
ss.close();
return 0;
} //Connector_HTTP main function
} //Progressive_Connector main function
} //Connector_HTTP namespace
///\brief The standard process-spawning main function.
int main(int argc, char ** argv){
Util::Config conf(argv[0], PACKAGE_VERSION);
conf.addConnectorOptions(1935);
@ -211,7 +215,7 @@ int main(int argc, char ** argv){
if (S.connected()){ //check if the new connection is valid
pid_t myid = fork();
if (myid == 0){ //if new child, start MAINHANDLER
return Connector_HTTP::Connector_HTTP_Progressive(S);
return Connector_HTTP::progressiveConnector(S);
}else{ //otherwise, do nothing or output debugging text
#if DEBUG >= 5
fprintf(stderr, "Spawned new process %i for socket %i\n", (int)myid, S.getSocket());

View file

@ -1,9 +1,11 @@
/// \file conn_http_dynamic.cpp
/// Contains the main code for the HTTP Dynamic Connector
///\file conn_http_smooth.cpp
///\brief Contains the main code for the HTTP Smooth Connector
#include <iostream>
#include <iomanip>
#include <queue>
#include <sstream>
#include <cstdlib>
#include <cstdio>
#include <cmath>
@ -11,6 +13,7 @@
#include <sys/types.h>
#include <sys/wait.h>
#include <getopt.h>
#include <mist/socket.h>
#include <mist/http_parser.h>
#include <mist/json.h>
@ -19,36 +22,58 @@
#include <mist/amf.h>
#include <mist/mp4.h>
#include <mist/config.h>
#include <sstream>
#include <mist/stream.h>
#include <mist/timing.h>
/// Holds everything unique to HTTP Connectors.
///\brief Holds everything unique to HTTP Connectors.
namespace Connector_HTTP {
/// Returns a Smooth-format manifest file
std::string BuildManifest(std::string & MovieId, JSON::Value & metadata){
///\brief Builds an index file for HTTP Smooth streaming.
///\param MovieId The name of the movie.
///\param metadata The current metadata, used to generate the index.
///\return The index file for HTTP Smooth Streaming.
std::string smoothIndex(std::string & MovieId, JSON::Value & metadata){
std::stringstream Result;
Result << "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n";
Result << "<SmoothStreamingMedia MajorVersion=\"2\" MinorVersion=\"0\" TimeScale=\"10000000\" ";
Result << "<SmoothStreamingMedia "
"MajorVersion=\"2\" "
"MinorVersion=\"0\" "
"TimeScale=\"10000000\" ";
if (metadata.isMember("vod")){
Result << "Duration=\"" << metadata["lastms"].asInt() << "0000\"";
}else{
Result << "Duration=\"0\" IsLive=\"TRUE\" LookAheadFragmentCount=\"2\" DVRWindowLength=\"" + metadata["buffer_window"].asString() + "0000\" CanSeek=\"TRUE\" CanPause=\"TRUE\" ";
Result << "Duration=\"0\" "
"IsLive=\"TRUE\" "
"LookAheadFragmentCount=\"2\" "
"DVRWindowLength=\"" + metadata["buffer_window"].asString() + "0000\" "
"CanSeek=\"TRUE\" "
"CanPause=\"TRUE\" ";
}
Result << ">\n";
//Add audio entries
if (metadata.isMember("audio") && metadata["audio"]["codec"].asString() == "AAC"){
Result << " <StreamIndex Type=\"audio\" QualityLevels=\"1\" Name=\"audio\" Chunks=\"" << metadata["keytime"].size()
<< "\" Url=\"Q({bitrate})/A({start time})\">\n";
Result << " <QualityLevel Index=\"0\" Bitrate=\"" << metadata["audio"]["bps"].asInt() * 8 << "\" CodecPrivateData=\"";
Result << std::hex;
Result << "<StreamIndex "
"Type=\"audio\" "
"QualityLevels=\"1\" "
"Name=\"audio\" "
"Chunks=\"" << metadata["keytime"].size() << "\" "
"Url=\"Q({bitrate})/A({start time})\">\n";
//Add audio qualities
Result << "<QualityLevel "
"Index=\"0\" "
"Bitrate=\"" << metadata["audio"]["bps"].asInt() * 8 << "\" "
"CodecPrivateData=\"" << std::hex;
for (int i = 0; i < metadata["audio"]["init"].asString().size(); i++){
Result << std::setfill('0') << std::setw(2) << std::right << (int)metadata["audio"]["init"].asString()[i];
}
Result << std::dec;
Result << "\" SamplingRate=\"" << metadata["audio"]["rate"].asInt()
<< "\" Channels=\"2\" BitsPerSample=\"16\" PacketSize=\"4\" AudioTag=\"255\" FourCC=\"AACL\" />\n";
Result << std::dec << "\" "
"SamplingRate=\"" << metadata["audio"]["rate"].asInt() << "\" "
"Channels=\"2\" "
"BitsPerSample=\"16\" "
"PacketSize=\"4\" "
"AudioTag=\"255\" "
"FourCC=\"AACL\" />\n";
for (unsigned int i = 0; i < metadata["keylen"].size(); i++){
Result << " <c ";
Result << "<c ";
if (i == 0){
Result << "t=\"" << metadata["keytime"][i].asInt() * 10000 << "\" ";
}
@ -56,30 +81,41 @@ namespace Connector_HTTP {
}
Result << " </StreamIndex>\n";
}
//Add video entries
if (metadata.isMember("video") && metadata["video"]["codec"].asString() == "H264"){
Result << " <StreamIndex Type=\"video\" QualityLevels=\"1\" Name=\"video\" Chunks=\"" << metadata["keytime"].size()
<< "\" Url=\"Q({bitrate})/V({start time})\" MaxWidth=\"" << metadata["video"]["width"].asInt() << "\" MaxHeight=\""
<< metadata["video"]["height"].asInt() << "\" DisplayWidth=\"" << metadata["video"]["width"].asInt() << "\" DisplayHeight=\""
<< metadata["video"]["height"].asInt() << "\">\n";
Result << " <QualityLevel Index=\"0\" Bitrate=\"" << metadata["video"]["bps"].asInt() * 8 << "\" CodecPrivateData=\"";
Result << "<StreamIndex "
"Type=\"video\" "
"QualityLevels=\"1\" "
"Name=\"video\" "
"Chunks=\"" << metadata["keytime"].size() << "\" "
"Url=\"Q({bitrate})/V({start time})\" "
"MaxWidth=\"" << metadata["video"]["width"].asInt() << "\" "
"MaxHeight=\"" << metadata["video"]["height"].asInt() << "\" "
"DisplayWidth=\"" << metadata["video"]["width"].asInt() << "\" "
"DisplayHeight=\"" << metadata["video"]["height"].asInt() << "\">\n";
//Add video qualities
Result << "<QualityLevel "
"Index=\"0\" "
"Bitrate=\"" << metadata["video"]["bps"].asInt() * 8 << "\" "
"CodecPrivateData=\"" << std::hex;
MP4::AVCC avccbox;
avccbox.setPayload(metadata["video"]["init"].asString());
std::string tmpString = avccbox.asAnnexB();
Result << std::hex;
for (int i = 0; i < tmpString.size(); i++){
Result << std::setfill('0') << std::setw(2) << std::right << (int)tmpString[i];
}
Result << std::dec;
Result << "\" MaxWidth=\"" << metadata["video"]["width"].asInt() << "\" MaxHeight=\"" << metadata["video"]["height"].asInt()
<< "\" FourCC=\"AVC1\" />\n";
Result << std::dec << "\" "
"MaxWidth=\"" << metadata["video"]["width"].asInt() << "\" "
"MaxHeight=\"" << metadata["video"]["height"].asInt() << "\" "
"FourCC=\"AVC1\" />\n";
for (unsigned int i = 0; i < metadata["keylen"].size(); i++){
Result << " <c ";
Result << "<c ";
if (i == 0){
Result << "t=\"" << metadata["keytime"][i].asInt() * 10000 << "\" ";
}
Result << "d=\"" << metadata["keylen"][i].asInt() * 10000 << "\" />\n";
}
Result << " </StreamIndex>\n";
Result << "</StreamIndex>\n";
}
Result << "</SmoothStreamingMedia>\n";
@ -87,32 +123,35 @@ namespace Connector_HTTP {
std::cerr << "Sending this manifest:" << std::endl << Result << std::endl;
#endif
return Result.str();
} //BuildManifest
} //smoothIndex
/// Main function for Connector_HTTP_Dynamic
int Connector_HTTP_Dynamic(Socket::Connection conn){
std::deque<std::string> FlashBuf;
int FlashBufSize = 0;
///\brief Main function for the HTTP Smooth Connector
///\param conn A socket describing the connection the client.
///\return The exit code of the connector.
int smoothConnector(Socket::Connection conn){
std::deque<std::string> dataBuffer;//A buffer for the data that needs to be sent to the client.
int dataSize = 0;//The amount of bytes in the dataBuffer
DTSC::Stream Strm; //Incoming stream buffer.
HTTP::Parser HTTP_R, HTTP_S; //HTTP Receiver and HTTP Sender.
DTSC::Stream Strm;//Incoming stream buffer.
HTTP::Parser HTTP_R;//HTTP Receiver
HTTP::Parser HTTP_S;//HTTP Sender.
bool ready4data = false; //Set to true when streaming is to begin.
Socket::Connection ss( -1);
std::string streamname;
bool ready4data = false;//Set to true when streaming is to begin.
Socket::Connection ss( -1);//The Stream Socket, used to connect to the desired stream.
std::string streamname;//Will contain the name of the stream.
bool wantsVideo = false;
bool wantsAudio = false;
bool wantsVideo = false;//Indicates whether this request is a video request.
bool wantsAudio = false;//Indicates whether this request is an audio request.
std::string Quality;
long long int ReqFragment = -1;
std::string tempStr;
unsigned int lastStats = 0;
conn.setBlocking(false); //do not block on conn.spool() when no data is available
std::string Quality;//Indicates the request quality of the movie.
long long int requestedTime = -1;//Indicates the fragment requested.
std::string parseString;//A string used for parsing different aspects of the request.
unsigned int lastStats = 0;//Indicates the last time that we have sent stats to the server socket.
conn.setBlocking(false);//Set the client socket to non-blocking
while (conn.connected()){
if (conn.spool() || conn.Received().size()){
//make sure it ends in a \n
//Make sure the received data ends in a newline (\n).
if ( *(conn.Received().get().rbegin()) != '\n'){
std::string tmp = conn.Received().get();
conn.Received().get().clear();
@ -126,9 +165,11 @@ namespace Connector_HTTP {
#if DEBUG >= 5
std::cout << "Received request: " << HTTP_R.getUrl() << std::endl;
#endif
//Get data set by the proxy.
conn.setHost(HTTP_R.GetHeader("X-Origin"));
streamname = HTTP_R.GetHeader("X-Stream");
if ( !ss){
//initiate Stream Socket
ss = Util::Stream::getStream(streamname);
if ( !ss.connected()){
#if DEBUG >= 1
@ -141,7 +182,7 @@ namespace Connector_HTTP {
continue;
}
ss.setBlocking(false);
//make sure metadata is received
//Do nothing until metadata has been received.
while ( !Strm.metadata){
if (ss.spool()){
while (Strm.parsePacket(ss.Received())){
@ -151,25 +192,26 @@ namespace Connector_HTTP {
}
}
if (HTTP_R.url.find("Manifest") == std::string::npos){
//We have a non-manifest request, parse it.
Quality = HTTP_R.url.substr(HTTP_R.url.find("/Q(", 8) + 3);
Quality = Quality.substr(0, Quality.find(")"));
tempStr = HTTP_R.url.substr(HTTP_R.url.find(")/") + 2);
parseString = HTTP_R.url.substr(HTTP_R.url.find(")/") + 2);
wantsAudio = false;
wantsVideo = false;
if (tempStr[0] == 'A'){
if (parseString[0] == 'A'){
wantsAudio = true;
}
if (tempStr[0] == 'V'){
if (parseString[0] == 'V'){
wantsVideo = true;
}
tempStr = tempStr.substr(tempStr.find("(") + 1);
ReqFragment = atoll(tempStr.substr(0, tempStr.find(")")).c_str());
parseString = parseString.substr(parseString.find("(") + 1);
requestedTime = atoll(parseString.substr(0, parseString.find(")")).c_str());
if (Strm.metadata.isMember("live")){
int seekable = Strm.canSeekms(ReqFragment / 10000);
int seekable = Strm.canSeekms(requestedTime / 10000);
if (seekable == 0){
// iff the fragment in question is available, check if the next is available too
for (int i = 0; i < Strm.metadata["keytime"].size(); i++){
if (Strm.metadata["keytime"][i].asInt() >= (ReqFragment / 10000)){
if (Strm.metadata["keytime"][i].asInt() >= (requestedTime / 10000)){
if (i + 1 == Strm.metadata["keytime"].size()){
seekable = 1;
}
@ -182,7 +224,7 @@ namespace Connector_HTTP {
HTTP_S.SetBody("The requested fragment is no longer kept in memory on the server and cannot be served.\n");
conn.SendNow(HTTP_S.BuildResponse("412", "Fragment out of range"));
HTTP_R.Clean(); //clean for any possible next requests
std::cout << "Fragment @ " << ReqFragment / 10000 << "ms too old (" << Strm.metadata["keytime"][0u].asInt() << " - " << Strm.metadata["keytime"][Strm.metadata["keytime"].size() - 1].asInt() << " ms)" << std::endl;
std::cout << "Fragment @ " << requestedTime / 10000 << "ms too old (" << Strm.metadata["keytime"][0u].asInt() << " - " << Strm.metadata["keytime"][Strm.metadata["keytime"].size() - 1].asInt() << " ms)" << std::endl;
continue;
}
if (seekable > 0){
@ -190,47 +232,53 @@ namespace Connector_HTTP {
HTTP_S.SetBody("Proxy, re-request this in a second or two.\n");
conn.SendNow(HTTP_S.BuildResponse("208", "Ask again later"));
HTTP_R.Clean(); //clean for any possible next requests
std::cout << "Fragment @ " << ReqFragment / 10000 << "ms not available yet (" << Strm.metadata["keytime"][0u].asInt() << " - " << Strm.metadata["keytime"][Strm.metadata["keytime"].size() - 1].asInt() << " ms)" << std::endl;
std::cout << "Fragment @ " << requestedTime / 10000 << "ms not available yet (" << Strm.metadata["keytime"][0u].asInt() << " - " << Strm.metadata["keytime"][Strm.metadata["keytime"].size() - 1].asInt() << " ms)" << std::endl;
continue;
}
}
//Seek to the right place and send a play-once for a single fragment.
std::stringstream sstream;
sstream << "s " << (ReqFragment / 10000) << "\no \n";
sstream << "s " << (requestedTime / 10000) << "\no \n";
ss.SendNow(sstream.str().c_str());
}else{
//We have a request for a Manifest, generate and send it.
HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type", "text/xml");
HTTP_S.SetHeader("Cache-Control", "no-cache");
std::string manifest = BuildManifest(streamname, Strm.metadata);
std::string manifest = smoothIndex(streamname, Strm.metadata);
HTTP_S.SetBody(manifest);
conn.SendNow(HTTP_S.BuildResponse("200", "OK"));
}
ready4data = true;
HTTP_R.Clean(); //clean for any possible next requests
//Clean for any possible next requests
HTTP_R.Clean();
}
}else{
//Wait 1 second before checking for new data.
Util::sleep(1);
}
if (ready4data){
unsigned int now = Util::epoch();
if (now != lastStats){
//Send new stats.
lastStats = now;
ss.SendNow(conn.getStats("HTTP_Smooth").c_str());
}
if (ss.spool()){
while (Strm.parsePacket(ss.Received())){
if (Strm.lastType() == DTSC::PAUSEMARK){
if (FlashBufSize){
//Send the current buffer
if (dataSize){
HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type", "video/mp4");
HTTP_S.SetBody("");
unsigned int myDuration;
//Wrap everything in mp4 boxes
MP4::MFHD mfhd_box;
for (int i = 0; i < Strm.metadata["keytime"].size(); i++){
if (Strm.metadata["keytime"][i].asInt() >= (ReqFragment / 10000)){
if (Strm.metadata["keytime"][i].asInt() >= (requestedTime / 10000)){
mfhd_box.setSequenceNumber(Strm.metadata["keynum"][i].asInt());
myDuration = Strm.metadata["keylen"][i].asInt() * 10000;
break;
@ -243,20 +291,20 @@ namespace Connector_HTTP {
tfhd_box.setDefaultSampleFlags(0x000000C0 | MP4::noIPicture | MP4::noDisposable | MP4::noKeySample);
MP4::TRUN trun_box;
//maybe reinsert dataOffset
trun_box.setFlags(MP4::trundataOffset | MP4::trunfirstSampleFlags | MP4::trunsampleDuration | MP4::trunsampleSize);
trun_box.setDataOffset(42);
trun_box.setFirstSampleFlags(0x00000040 | MP4::isIPicture | MP4::noDisposable | MP4::isKeySample);
for (int i = 0; i < FlashBuf.size(); i++){
for (int i = 0; i < dataBuffer.size(); i++){
MP4::trunSampleInformation trunSample;
trunSample.sampleSize = FlashBuf[i].size();
trunSample.sampleDuration = (((double)myDuration / FlashBuf.size()) * i) - (((double)myDuration / FlashBuf.size()) * (i - 1));
trunSample.sampleSize = dataBuffer[i].size();
trunSample.sampleDuration = (((double)myDuration / dataBuffer.size()) * i) - (((double)myDuration / dataBuffer.size()) * (i - 1));
trun_box.setSampleInformation(trunSample, i);
}
MP4::SDTP sdtp_box;
sdtp_box.setVersion(0);
sdtp_box.setValue(0x24, 4);
for (int i = 1; i < FlashBuf.size(); i++){
for (int i = 1; i < dataBuffer.size(); i++){
sdtp_box.setValue(0x14, 4 + i);
}
@ -265,14 +313,14 @@ namespace Connector_HTTP {
traf_box.setContent(trun_box, 1);
traf_box.setContent(sdtp_box, 2);
// if live, send fragref box if we can
//If the stream is live, we want to have a fragref box if possible
if (Strm.metadata.isMember("live")){
MP4::UUID_TrackFragmentReference fragref_box;
fragref_box.setVersion(1);
fragref_box.setFragmentCount(0);
int fragCount = 0;
for (int i = 0; i < Strm.metadata["keytime"].size(); i++){
if (Strm.metadata["keytime"][i].asInt() > (ReqFragment / 10000)){
if (Strm.metadata["keytime"][i].asInt() > (requestedTime / 10000)){
fragref_box.setTime(fragCount, Strm.metadata["keytime"][i].asInt() * 10000);
fragref_box.setDuration(fragCount, Strm.metadata["keylen"][i].asInt() * 10000);
fragref_box.setFragmentCount(++fragCount);
@ -285,32 +333,31 @@ namespace Connector_HTTP {
moof_box.setContent(mfhd_box, 0);
moof_box.setContent(traf_box, 1);
//setting tha offsets!
//Setting the correct offsets.
trun_box.setDataOffset(moof_box.boxedSize() + 8);
traf_box.setContent(trun_box, 1);
moof_box.setContent(traf_box, 1);
//std::cerr << "\t[encoded] = " << ((MP4::TRUN&)(((MP4::TRAF&)(moof_box.getContent(1))).getContent(1))).getDataOffset() << std::endl;
HTTP_S.SetHeader("Content-Length", FlashBufSize + 8 + moof_box.boxedSize()); //32+33+btstrp.size());
//Send the complete message
HTTP_S.SetHeader("Content-Length", dataSize + 8 + moof_box.boxedSize());
conn.SendNow(HTTP_S.BuildResponse("200", "OK"));
conn.SendNow(moof_box.asBox(), moof_box.boxedSize());
unsigned long size = htonl(FlashBufSize+8);
unsigned long size = htonl(dataSize + 8);
conn.SendNow((char*) &size, 4);
conn.SendNow("mdat", 4);
while (FlashBuf.size() > 0){
conn.SendNow(FlashBuf.front());
FlashBuf.pop_front();
while (dataBuffer.size() > 0){
conn.SendNow(dataBuffer.front());
dataBuffer.pop_front();
}
}
FlashBuf.clear();
FlashBufSize = 0;
dataBuffer.clear();
dataSize = 0;
}
if ((wantsAudio && Strm.lastType() == DTSC::AUDIO) || (wantsVideo && Strm.lastType() == DTSC::VIDEO)){
FlashBuf.push_back(Strm.lastData());
FlashBufSize += Strm.lastData().size();
//Select only the data that the client has requested.
dataBuffer.push_back(Strm.lastData());
dataSize += Strm.lastData().size();
}
}
}
@ -323,10 +370,11 @@ namespace Connector_HTTP {
ss.SendNow(conn.getStats("HTTP_Smooth").c_str());
ss.close();
return 0;
} //Connector_HTTP_Smooth main function
}//Smooth_Connector main function
} //Connector_HTTP_Smooth namespace
}//Connector_HTTP namespace
///\brief The standard process-spawning main function.
int main(int argc, char ** argv){
Util::Config conf(argv[0], PACKAGE_VERSION);
conf.addConnectorOptions(1935);
@ -342,7 +390,7 @@ int main(int argc, char ** argv){
if (S.connected()){ //check if the new connection is valid
pid_t myid = fork();
if (myid == 0){ //if new child, start MAINHANDLER
return Connector_HTTP::Connector_HTTP_Dynamic(S);
return Connector_HTTP::smoothConnector(S);
}else{ //otherwise, do nothing or output debugging text
#if DEBUG >= 5
fprintf(stderr, "Spawned new process %i for socket %i\n", (int)myid, S.getSocket());

View file

@ -45,8 +45,10 @@ namespace Connector_RTMP {
int Connector_RTMP(Socket::Connection conn);
} //Connector_RTMP namespace;
/// Main Connector_RTMP function
int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
///\brief Main Connector_RTMP function
///\param conn A socket describing the connection the client.
///\return The exit code of the connector.
int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
Socket = conn;
Socket.setBlocking(false);
FLV::Tag tag, init_tag;
@ -179,7 +181,8 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
return 0;
} //Connector_RTMP
/// Tries to get and parse one RTMP chunk at a time.
///\brief Tries to get and parse one RTMP chunk at a time.
///\param inbuffer A buffer filled with chunk data.
void Connector_RTMP::parseChunk(Socket::Buffer & inbuffer){
//for DTSC conversion
static JSON::Value meta_out;
@ -597,6 +600,7 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int
#endif
} //parseAMFCommand
///\brief The standard process-spawning main function.
int main(int argc, char ** argv){
Util::Config conf(argv[0], PACKAGE_VERSION);
conf.addConnectorOptions(1935);