Fixed HTTP connectors to compile again, base HTTP proxy framework completed.

This commit is contained in:
Thulinma 2012-07-24 06:35:27 +02:00
parent ffff1f9577
commit e6ad892439
4 changed files with 139 additions and 172 deletions

View file

@ -6,6 +6,6 @@ MistBuffer_SOURCES=buffer.cpp buffer_user.h buffer_user.cpp buffer_stream.h buff
MistController_SOURCES=controller.cpp ../VERSION
MistConnRAW_SOURCES=conn_raw.cpp ../VERSION
MistConnRTMP_SOURCES=conn_rtmp.cpp ../VERSION
MistConnHTTP_SOURCES=conn_http.cpp ../VERSION
MistConnHTTP_SOURCES=conn_http.cpp tinythread.cpp tinythread.h ../VERSION
MistConnHTTPProgressive_SOURCES=conn_http_progressive.cpp ../VERSION
MistConnHTTPDynamic_SOURCES=conn_http_dynamic.cpp ../VERSION

View file

@ -11,192 +11,157 @@
#include <sys/wait.h>
#include <getopt.h>
#include <ctime>
#include <set>
#include "tinythread.h"
#include <mist/socket.h>
#include <mist/http_parser.h>
#include <mist/json.h>
#include <mist/dtsc.h>
#include <mist/flv_tag.h>
#include <mist/base64.h>
#include <mist/amf.h>
#include <mist/mp4.h>
#include <mist/config.h>
/// Holds everything unique to HTTP Connector.
namespace Connector_HTTP{
/// Main function for Connector_HTTP
int Handle_Connection(Socket::Connection conn){
conn.setBlocking(false);//do not block on conn.spool() when no data is available
while (conn.connected()){
//only parse input if available or not yet init'ed
if (conn.spool()){
if (HTTP_R.Read(conn.Received())){
handler = HANDLER_PROGRESSIVE;
std::set<tthread::thread *> active_threads; ///< Holds currently active threads
std::set<tthread::thread *> done_threads; ///< Holds threads that are done and ready to be joined.
tthread::mutex thread_mutex; ///< Mutex for adding/removing threads.
/// Handles requests without associated handler, displaying a nice friendly error message.
void Handle_None(HTTP::Parser & H, Socket::Connection * conn){
H.Clean();
H.SetBody("<!DOCTYPE html><html><head><title>Unsupported Media Type</title></head><body><h1>Unsupported Media Type</h1>The server isn't quite sure what you wanted to receive from it.</body></html>");
conn->Send(H.BuildResponse("415", "Unsupported Media Type"));
}
/// Handles internal requests.
void Handle_Internal(HTTP::Parser & H, Socket::Connection * conn){
if (H.url == "/crossdomain.xml"){
H.Clean();
H.SetHeader("Content-Type", "text/xml");
H.SetBody("<?xml version=\"1.0\"?><!DOCTYPE cross-domain-policy SYSTEM \"http://www.adobe.com/xml/dtds/cross-domain-policy.dtd\"><cross-domain-policy><allow-access-from domain=\"*\" /><site-control permitted-cross-domain-policies=\"all\"/></cross-domain-policy>");
conn->Send(H.BuildResponse("200", "OK"));
return;
}//crossdomain.xml
if (H.url.length() > 10 && H.url.substr(0, 7) == "/embed_" && H.url.substr(H.url.length() - 3, 3) == ".js"){
std::string streamname = H.url.substr(7, H.url.length() - 10);
JSON::Value ServConf = JSON::fromFile("/tmp/mist/streamlist");
std::string response;
H.Clean();
H.SetHeader("Content-Type", "application/javascript");
response = "// Generating embed code for stream " + streamname + "\n\n";
if (ServConf["streams"].isMember(streamname)){
std::string streamurl = "http://" + H.GetHeader("Host") + "/" + streamname + ".flv";
response += "// Stream URL: " + streamurl + "\n\n";
response += "document.write('<object width=\"600\" height=\"409\"><param name=\"movie\" value=\"http://fpdownload.adobe.com/strobe/FlashMediaPlayback.swf\"></param><param name=\"flashvars\" value=\"src="+HTTP::Parser::urlencode(streamurl)+"&controlBarMode=floating\"></param><param name=\"allowFullScreen\" value=\"true\"></param><param name=\"allowscriptaccess\" value=\"always\"></param><embed src=\"http://fpdownload.adobe.com/strobe/FlashMediaPlayback.swf\" type=\"application/x-shockwave-flash\" allowscriptaccess=\"always\" allowfullscreen=\"true\" width=\"600\" height=\"409\" flashvars=\"src="+HTTP::Parser::urlencode(streamurl)+"&controlBarMode=floating\"></embed></object>');\n";
}else{
response += "// Stream not available at this server.\nalert(\"This stream is currently not available at this server.\\\\nPlease try again later!\");";
}
response += "";
H.SetBody(response);
conn->Send(H.BuildResponse("200", "OK"));
return;
}//embed code generator
Handle_None(H, conn);//anything else doesn't get handled
}
/// Handles requests without associated handler, displaying a nice friendly error message.
void Handle_Through_Connector(HTTP::Parser & H, Socket::Connection * conn, std::string & connector){
H.Clean();
H.SetBody("<!DOCTYPE html><html><head><title>Handled</title></head><body><h1>"+connector+"</h1>Handling as: "+connector+"</body></html>");
conn->Send(H.BuildResponse("200", "OK"));
}
/// Returns the name of the HTTP connector the given request should be served by.
/// Can currently return:
/// - none (request not supported)
/// - internal (request fed from information internal to this connector)
/// - dynamic (request fed from http_dynamic connector)
/// - progressive (request fed from http_progressive connector)
std::string getHTTPType(HTTP::Parser & H){
if ((H.url.find("Seg") != std::string::npos) && (H.url.find("Frag") != std::string::npos)){return "dynamic";}
if (H.url.find("f4m") != std::string::npos){return "dynamic";}
if (H.url.length() > 4){
std::string ext = H.url.substr(H.url.length() - 4, 4);
if (ext == ".flv"){return "progressive";}
if (ext == ".mp3"){return "progressive";}
}
if (H.url == "/crossdomain.xml"){return "internal";}
if (H.url.length() > 10 && H.url.substr(0, 7) == "/embed_" && H.url.substr(H.url.length() - 3, 3) == ".js"){return "internal";}
return "none";
}
/// Function handling a single connection
void Handle_HTTP_Connection(void * pointer){
Socket::Connection * conn = (Socket::Connection *)pointer;
conn->setBlocking(false);//do not block on conn.spool() when no data is available
HTTP::Parser Client;
while (conn->connected()){
if (conn->spool()){
if (Client.Read(conn->Received())){
std::string handler = getHTTPType(Client);
#if DEBUG >= 4
std::cout << "Received request: " << HTTP_R.url << std::endl;
std::cout << "Received request: " << Client.url << " => " << handler << std::endl;
#endif
if ((HTTP_R.url.find("Seg") != std::string::npos) && (HTTP_R.url.find("Frag") != std::string::npos)){handler = HANDLER_FLASH;}
if (HTTP_R.url.find("f4m") != std::string::npos){handler = HANDLER_FLASH;}
if (HTTP_R.url == "/crossdomain.xml"){
handler = HANDLER_NONE;
HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type", "text/xml");
HTTP_S.SetBody("<?xml version=\"1.0\"?><!DOCTYPE cross-domain-policy SYSTEM \"http://www.adobe.com/xml/dtds/cross-domain-policy.dtd\"><cross-domain-policy><allow-access-from domain=\"*\" /><site-control permitted-cross-domain-policies=\"all\"/></cross-domain-policy>");
conn.Send(HTTP_S.BuildResponse("200", "OK"));
#if DEBUG >= 3
printf("Sending crossdomain.xml file\n");
#endif
}
if (HTTP_R.url.length() > 10 && HTTP_R.url.substr(0, 7) == "/embed_" && HTTP_R.url.substr(HTTP_R.url.length() - 3, 3) == ".js"){
streamname = HTTP_R.url.substr(7, HTTP_R.url.length() - 10);
JSON::Value ServConf = JSON::fromFile("/tmp/mist/streamlist");
std::string response;
handler = HANDLER_NONE;
HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type", "application/javascript");
response = "// Generating embed code for stream " + streamname + "\n\n";
if (ServConf["streams"].isMember(streamname)){
std::string streamurl = "http://" + HTTP_S.GetHeader("Host") + "/" + streamname + ".flv";
response += "// Stream URL: " + streamurl + "\n\n";
response += "document.write('<object width=\"600\" height=\"409\"><param name=\"movie\" value=\"http://fpdownload.adobe.com/strobe/FlashMediaPlayback.swf\"></param><param name=\"flashvars\" value=\"src="+HTTP::Parser::urlencode(streamurl)+"&controlBarMode=floating\"></param><param name=\"allowFullScreen\" value=\"true\"></param><param name=\"allowscriptaccess\" value=\"always\"></param><embed src=\"http://fpdownload.adobe.com/strobe/FlashMediaPlayback.swf\" type=\"application/x-shockwave-flash\" allowscriptaccess=\"always\" allowfullscreen=\"true\" width=\"600\" height=\"409\" flashvars=\"src="+HTTP::Parser::urlencode(streamurl)+"&controlBarMode=floating\"></embed></object>');\n";
if (handler == "none" || handler == "internal"){
if (handler == "internal"){
Handle_Internal(Client, conn);
}else{
response += "// Stream not available at this server.\nalert(\"This stream is currently not available at this server.\\\\nPlease try again later!\");";
Handle_None(Client, conn);
}
response += "";
HTTP_S.SetBody(response);
conn.Send(HTTP_S.BuildResponse("200", "OK"));
#if DEBUG >= 3
printf("Sending embed code for %s\n", streamname.c_str());
#endif
}else{
Handle_Through_Connector(Client, conn, handler);
}
if (handler == HANDLER_FLASH){
if (HTTP_R.url.find("f4m") == std::string::npos){
Movie = HTTP_R.url.substr(1);
Movie = Movie.substr(0,Movie.find("/"));
Quality = HTTP_R.url.substr( HTTP_R.url.find("/",1)+1 );
Quality = Quality.substr(0, Quality.find("Seg"));
temp = HTTP_R.url.find("Seg") + 3;
Segment = atoi( HTTP_R.url.substr(temp,HTTP_R.url.find("-",temp)-temp).c_str());
temp = HTTP_R.url.find("Frag") + 4;
ReqFragment = atoi( HTTP_R.url.substr(temp).c_str() );
#if DEBUG >= 4
printf( "Quality: %s, Seg %d Frag %d\n", Quality.c_str(), Segment, ReqFragment);
#endif
Flash_RequestPending++;
}else{
Movie = HTTP_R.url.substr(1);
Movie = Movie.substr(0,Movie.find("/"));
HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type","text/xml");
HTTP_S.SetHeader("Cache-Control","no-cache");
std::string manifest = BuildManifest(Movie);
HTTP_S.SetBody(manifest);
conn.Send(HTTP_S.BuildResponse("200", "OK"));
#if DEBUG >= 3
printf("Sent manifest\n");
#endif
}
ready4data = true;
}//FLASH handler
if (handler == HANDLER_PROGRESSIVE){
//we assume the URL is the stream name with a 3 letter extension
std::string extension = HTTP_R.url.substr(HTTP_R.url.size()-4);
Movie = HTTP_R.url.substr(0, HTTP_R.url.size()-4);//strip the extension
/// \todo VoD streams will need support for position reading from the URL parameters
ready4data = true;
}//PROGRESSIVE handler
if (Movie != "" && Movie != streamname){
#if DEBUG >= 4
printf("Buffer switch detected (%s -> %s)! (Re)connecting buffer...\n", streamname.c_str(), Movie.c_str());
#endif
streamname = Movie;
inited = false;
ss.close();
if (inited && handler == HANDLER_PROGRESSIVE){
#if DEBUG >= 4
printf("Progressive-mode reconnect impossible - disconnecting.\n");
#endif
conn.close();
ready4data = false;
}
}
HTTP_R.Clean(); //clean for any possible next requests
Client.Clean(); //clean for any possible next requests
}else{
#if DEBUG >= 3
fprintf(stderr, "Could not parse the following:\n%s\n", conn.Received().c_str());
fprintf(stderr, "Could not parse the following:\n%s\n", conn->Received().c_str());
#endif
}
}
if (ready4data){
if (!inited){
//we are ready, connect the socket!
ss = Socket::getStream(streamname);
if (!ss.connected()){
#if DEBUG >= 1
fprintf(stderr, "Could not connect to server!\n");
#endif
ss.close();
HTTP_S.Clean();
HTTP_S.SetBody("No such stream is available on the system. Please try again.\n");
conn.Send(HTTP_S.BuildResponse("404", "Not found"));
ready4data = false;
continue;
}
#if DEBUG >= 3
fprintf(stderr, "Everything connected, starting to send video data...\n");
#endif
inited = true;
}
if ((Flash_RequestPending > 0) && !Flash_FragBuffer.empty()){
HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type","video/mp4");
HTTP_S.SetBody(MP4::mdatFold(Flash_FragBuffer.front()));
Flash_FragBuffer.pop();
conn.Send(HTTP_S.BuildResponse("200", "OK"));
Flash_RequestPending--;
#if DEBUG >= 3
fprintf(stderr, "Sending a video fragment. %i left in buffer, %i requested\n", (int)Flash_FragBuffer.size(), Flash_RequestPending);
#endif
}
unsigned int now = time(0);
if (now != lastStats){
lastStats = now;
ss.Send("S "+conn.getStats("HTTP"));
}
if (ss.spool() || ss.Received() != ""){
if (Strm.parsePacket(ss.Received())){
tag.DTSCLoader(Strm);
if (handler == HANDLER_FLASH){
FlashDynamic(tag, Strm);
}
if (handler == HANDLER_PROGRESSIVE){
Progressive(tag, HTTP_S, conn, Strm);
}
}
}
}
}
conn.close();
ss.close();
#if DEBUG >= 1
if (FLV::Parse_Error){fprintf(stderr, "FLV Parser Error: %s\n", FLV::Error_Str.c_str());}
fprintf(stderr, "User %i disconnected.\n", conn.getSocket());
if (inited){
fprintf(stderr, "Status was: inited\n");
}else{
if (ready4data){
fprintf(stderr, "Status was: ready4data\n");
}else{
fprintf(stderr, "Status was: connected\n");
//close and remove the connection
conn->close();
delete conn;
//remove this thread from active_threads and add it to done_threads.
thread_mutex.lock();
for (std::set<tthread::thread *>::iterator it = active_threads.begin(); it != active_threads.end(); it++){
if ((*it)->get_id() == tthread::this_thread::get_id()){
tthread::thread * T = (*it);
active_threads.erase(T);
done_threads.insert(T);
break;
}
}
#endif
return 0;
}//Connector_HTTP main function
thread_mutex.unlock();
}
};//Connector_HTTP namespace
// Load main server setup file, default port 8080, handler is Connector_HTTP::Connector_HTTP
#define DEFAULT_PORT 8080
#define MAINHANDLER Connector_HTTP::Connector_HTTP
#define CONFIGSECT HTTP
#include "server_setup.h"
int main(int argc, char ** argv){
Util::Config conf(argv[0], PACKAGE_VERSION);
conf.addConnectorOptions(8080);
conf.parseArgs(argc, argv);
Socket::Server server_socket = Socket::Server(conf.getInteger("listen_port"), conf.getString("listen_interface"));
if (!server_socket.connected()){return 1;}
conf.activate();
while (server_socket.connected() && conf.is_active){
Socket::Connection S = server_socket.accept();
if (S.connected()){//check if the new connection is valid
Connector_HTTP::thread_mutex.lock();
tthread::thread * T = new tthread::thread(Connector_HTTP::Handle_HTTP_Connection, (void *)(new Socket::Connection(S)));
Connector_HTTP::active_threads.insert(T);
while (!Connector_HTTP::done_threads.empty()){
T = *Connector_HTTP::done_threads.begin();
T->join();
Connector_HTTP::done_threads.erase(T);
delete T;
}
Connector_HTTP::thread_mutex.unlock();
}
}//while connected and not requested to stop
server_socket.close();
return 0;
}//main

View file

@ -19,9 +19,10 @@
#include <mist/base64.h>
#include <mist/amf.h>
#include <mist/mp4.h>
#include <mist/config.h>
/// Holds everything unique to HTTP Dynamic Connector.
namespace Connector_HTTP_Dynamic{
namespace Connector_HTTP{
/// Returns AMF-format metadata
std::string GetMetaData( ) {
@ -242,7 +243,7 @@ int main(int argc, char ** argv){
if (S.connected()){//check if the new connection is valid
pid_t myid = fork();
if (myid == 0){//if new child, start MAINHANDLER
return Connector_RTMP::Connector_RTMP(S);
return Connector_HTTP::Connector_HTTP_Dynamic(S);
}else{//otherwise, do nothing or output debugging text
#if DEBUG >= 3
fprintf(stderr, "Spawned new process %i for socket %i\n", (int)myid, S.getSocket());

View file

@ -16,9 +16,10 @@
#include <mist/dtsc.h>
#include <mist/flv_tag.h>
#include <mist/amf.h>
#include <mist/config.h>
/// Holds everything unique to HTTP Progressive Connector.
namespace Connector_HTTP_Progressive{
namespace Connector_HTTP{
/// Main function for Connector_HTTP_Progressive
int Connector_HTTP_Progressive(Socket::Connection conn){
@ -152,7 +153,7 @@ int main(int argc, char ** argv){
if (S.connected()){//check if the new connection is valid
pid_t myid = fork();
if (myid == 0){//if new child, start MAINHANDLER
return Connector_RTMP::Connector_RTMP(S);
return Connector_HTTP::Connector_HTTP_Progressive(S);
}else{//otherwise, do nothing or output debugging text
#if DEBUG >= 3
fprintf(stderr, "Spawned new process %i for socket %i\n", (int)myid, S.getSocket());