From e6ad89243932ad52cd0aad0eff22bf6da3fd22d7 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 24 Jul 2012 06:35:27 +0200 Subject: [PATCH] Fixed HTTP connectors to compile again, base HTTP proxy framework completed. --- src/Makefile.am | 2 +- src/conn_http.cpp | 299 +++++++++++++++------------------- src/conn_http_dynamic.cpp | 5 +- src/conn_http_progressive.cpp | 5 +- 4 files changed, 139 insertions(+), 172 deletions(-) diff --git a/src/Makefile.am b/src/Makefile.am index 5b3d22e7..393d13c1 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -6,6 +6,6 @@ MistBuffer_SOURCES=buffer.cpp buffer_user.h buffer_user.cpp buffer_stream.h buff MistController_SOURCES=controller.cpp ../VERSION MistConnRAW_SOURCES=conn_raw.cpp ../VERSION MistConnRTMP_SOURCES=conn_rtmp.cpp ../VERSION -MistConnHTTP_SOURCES=conn_http.cpp ../VERSION +MistConnHTTP_SOURCES=conn_http.cpp tinythread.cpp tinythread.h ../VERSION MistConnHTTPProgressive_SOURCES=conn_http_progressive.cpp ../VERSION MistConnHTTPDynamic_SOURCES=conn_http_dynamic.cpp ../VERSION diff --git a/src/conn_http.cpp b/src/conn_http.cpp index d1abfc39..e1f570e5 100644 --- a/src/conn_http.cpp +++ b/src/conn_http.cpp @@ -11,192 +11,157 @@ #include #include #include +#include +#include "tinythread.h" #include #include -#include -#include -#include -#include -#include -#include +#include /// Holds everything unique to HTTP Connector. namespace Connector_HTTP{ - /// Main function for Connector_HTTP - int Handle_Connection(Socket::Connection conn){ - conn.setBlocking(false);//do not block on conn.spool() when no data is available - while (conn.connected()){ - //only parse input if available or not yet init'ed - if (conn.spool()){ - if (HTTP_R.Read(conn.Received())){ - handler = HANDLER_PROGRESSIVE; + std::set active_threads; ///< Holds currently active threads + std::set done_threads; ///< Holds threads that are done and ready to be joined. + tthread::mutex thread_mutex; ///< Mutex for adding/removing threads. + + /// Handles requests without associated handler, displaying a nice friendly error message. + void Handle_None(HTTP::Parser & H, Socket::Connection * conn){ + H.Clean(); + H.SetBody("Unsupported Media Type

Unsupported Media Type

The server isn't quite sure what you wanted to receive from it."); + conn->Send(H.BuildResponse("415", "Unsupported Media Type")); + } + + /// Handles internal requests. + void Handle_Internal(HTTP::Parser & H, Socket::Connection * conn){ + + if (H.url == "/crossdomain.xml"){ + H.Clean(); + H.SetHeader("Content-Type", "text/xml"); + H.SetBody(""); + conn->Send(H.BuildResponse("200", "OK")); + return; + }//crossdomain.xml + + if (H.url.length() > 10 && H.url.substr(0, 7) == "/embed_" && H.url.substr(H.url.length() - 3, 3) == ".js"){ + std::string streamname = H.url.substr(7, H.url.length() - 10); + JSON::Value ServConf = JSON::fromFile("/tmp/mist/streamlist"); + std::string response; + H.Clean(); + H.SetHeader("Content-Type", "application/javascript"); + response = "// Generating embed code for stream " + streamname + "\n\n"; + if (ServConf["streams"].isMember(streamname)){ + std::string streamurl = "http://" + H.GetHeader("Host") + "/" + streamname + ".flv"; + response += "// Stream URL: " + streamurl + "\n\n"; + response += "document.write('');\n"; + }else{ + response += "// Stream not available at this server.\nalert(\"This stream is currently not available at this server.\\\\nPlease try again later!\");"; + } + response += ""; + H.SetBody(response); + conn->Send(H.BuildResponse("200", "OK")); + return; + }//embed code generator + + Handle_None(H, conn);//anything else doesn't get handled + } + + /// Handles requests without associated handler, displaying a nice friendly error message. + void Handle_Through_Connector(HTTP::Parser & H, Socket::Connection * conn, std::string & connector){ + H.Clean(); + H.SetBody("Handled

"+connector+"

Handling as: "+connector+""); + conn->Send(H.BuildResponse("200", "OK")); + } + + /// Returns the name of the HTTP connector the given request should be served by. + /// Can currently return: + /// - none (request not supported) + /// - internal (request fed from information internal to this connector) + /// - dynamic (request fed from http_dynamic connector) + /// - progressive (request fed from http_progressive connector) + std::string getHTTPType(HTTP::Parser & H){ + if ((H.url.find("Seg") != std::string::npos) && (H.url.find("Frag") != std::string::npos)){return "dynamic";} + if (H.url.find("f4m") != std::string::npos){return "dynamic";} + if (H.url.length() > 4){ + std::string ext = H.url.substr(H.url.length() - 4, 4); + if (ext == ".flv"){return "progressive";} + if (ext == ".mp3"){return "progressive";} + } + if (H.url == "/crossdomain.xml"){return "internal";} + if (H.url.length() > 10 && H.url.substr(0, 7) == "/embed_" && H.url.substr(H.url.length() - 3, 3) == ".js"){return "internal";} + return "none"; + } + + /// Function handling a single connection + void Handle_HTTP_Connection(void * pointer){ + Socket::Connection * conn = (Socket::Connection *)pointer; + conn->setBlocking(false);//do not block on conn.spool() when no data is available + HTTP::Parser Client; + while (conn->connected()){ + if (conn->spool()){ + if (Client.Read(conn->Received())){ + std::string handler = getHTTPType(Client); #if DEBUG >= 4 - std::cout << "Received request: " << HTTP_R.url << std::endl; + std::cout << "Received request: " << Client.url << " => " << handler << std::endl; #endif - if ((HTTP_R.url.find("Seg") != std::string::npos) && (HTTP_R.url.find("Frag") != std::string::npos)){handler = HANDLER_FLASH;} - if (HTTP_R.url.find("f4m") != std::string::npos){handler = HANDLER_FLASH;} - if (HTTP_R.url == "/crossdomain.xml"){ - handler = HANDLER_NONE; - HTTP_S.Clean(); - HTTP_S.SetHeader("Content-Type", "text/xml"); - HTTP_S.SetBody(""); - conn.Send(HTTP_S.BuildResponse("200", "OK")); - #if DEBUG >= 3 - printf("Sending crossdomain.xml file\n"); - #endif - } - if (HTTP_R.url.length() > 10 && HTTP_R.url.substr(0, 7) == "/embed_" && HTTP_R.url.substr(HTTP_R.url.length() - 3, 3) == ".js"){ - streamname = HTTP_R.url.substr(7, HTTP_R.url.length() - 10); - JSON::Value ServConf = JSON::fromFile("/tmp/mist/streamlist"); - std::string response; - handler = HANDLER_NONE; - HTTP_S.Clean(); - HTTP_S.SetHeader("Content-Type", "application/javascript"); - response = "// Generating embed code for stream " + streamname + "\n\n"; - if (ServConf["streams"].isMember(streamname)){ - std::string streamurl = "http://" + HTTP_S.GetHeader("Host") + "/" + streamname + ".flv"; - response += "// Stream URL: " + streamurl + "\n\n"; - response += "document.write('');\n"; + if (handler == "none" || handler == "internal"){ + if (handler == "internal"){ + Handle_Internal(Client, conn); }else{ - response += "// Stream not available at this server.\nalert(\"This stream is currently not available at this server.\\\\nPlease try again later!\");"; + Handle_None(Client, conn); } - response += ""; - HTTP_S.SetBody(response); - conn.Send(HTTP_S.BuildResponse("200", "OK")); - #if DEBUG >= 3 - printf("Sending embed code for %s\n", streamname.c_str()); - #endif + }else{ + Handle_Through_Connector(Client, conn, handler); } - if (handler == HANDLER_FLASH){ - if (HTTP_R.url.find("f4m") == std::string::npos){ - Movie = HTTP_R.url.substr(1); - Movie = Movie.substr(0,Movie.find("/")); - Quality = HTTP_R.url.substr( HTTP_R.url.find("/",1)+1 ); - Quality = Quality.substr(0, Quality.find("Seg")); - temp = HTTP_R.url.find("Seg") + 3; - Segment = atoi( HTTP_R.url.substr(temp,HTTP_R.url.find("-",temp)-temp).c_str()); - temp = HTTP_R.url.find("Frag") + 4; - ReqFragment = atoi( HTTP_R.url.substr(temp).c_str() ); - #if DEBUG >= 4 - printf( "Quality: %s, Seg %d Frag %d\n", Quality.c_str(), Segment, ReqFragment); - #endif - Flash_RequestPending++; - }else{ - Movie = HTTP_R.url.substr(1); - Movie = Movie.substr(0,Movie.find("/")); - HTTP_S.Clean(); - HTTP_S.SetHeader("Content-Type","text/xml"); - HTTP_S.SetHeader("Cache-Control","no-cache"); - std::string manifest = BuildManifest(Movie); - HTTP_S.SetBody(manifest); - conn.Send(HTTP_S.BuildResponse("200", "OK")); - #if DEBUG >= 3 - printf("Sent manifest\n"); - #endif - } - ready4data = true; - }//FLASH handler - if (handler == HANDLER_PROGRESSIVE){ - //we assume the URL is the stream name with a 3 letter extension - std::string extension = HTTP_R.url.substr(HTTP_R.url.size()-4); - Movie = HTTP_R.url.substr(0, HTTP_R.url.size()-4);//strip the extension - /// \todo VoD streams will need support for position reading from the URL parameters - ready4data = true; - }//PROGRESSIVE handler - if (Movie != "" && Movie != streamname){ - #if DEBUG >= 4 - printf("Buffer switch detected (%s -> %s)! (Re)connecting buffer...\n", streamname.c_str(), Movie.c_str()); - #endif - streamname = Movie; - inited = false; - ss.close(); - if (inited && handler == HANDLER_PROGRESSIVE){ - #if DEBUG >= 4 - printf("Progressive-mode reconnect impossible - disconnecting.\n"); - #endif - conn.close(); - ready4data = false; - } - } - HTTP_R.Clean(); //clean for any possible next requests + Client.Clean(); //clean for any possible next requests }else{ #if DEBUG >= 3 - fprintf(stderr, "Could not parse the following:\n%s\n", conn.Received().c_str()); + fprintf(stderr, "Could not parse the following:\n%s\n", conn->Received().c_str()); #endif } } - if (ready4data){ - if (!inited){ - //we are ready, connect the socket! - ss = Socket::getStream(streamname); - if (!ss.connected()){ - #if DEBUG >= 1 - fprintf(stderr, "Could not connect to server!\n"); - #endif - ss.close(); - HTTP_S.Clean(); - HTTP_S.SetBody("No such stream is available on the system. Please try again.\n"); - conn.Send(HTTP_S.BuildResponse("404", "Not found")); - ready4data = false; - continue; - } - #if DEBUG >= 3 - fprintf(stderr, "Everything connected, starting to send video data...\n"); - #endif - inited = true; - } - if ((Flash_RequestPending > 0) && !Flash_FragBuffer.empty()){ - HTTP_S.Clean(); - HTTP_S.SetHeader("Content-Type","video/mp4"); - HTTP_S.SetBody(MP4::mdatFold(Flash_FragBuffer.front())); - Flash_FragBuffer.pop(); - conn.Send(HTTP_S.BuildResponse("200", "OK")); - Flash_RequestPending--; - #if DEBUG >= 3 - fprintf(stderr, "Sending a video fragment. %i left in buffer, %i requested\n", (int)Flash_FragBuffer.size(), Flash_RequestPending); - #endif - } - unsigned int now = time(0); - if (now != lastStats){ - lastStats = now; - ss.Send("S "+conn.getStats("HTTP")); - } - if (ss.spool() || ss.Received() != ""){ - if (Strm.parsePacket(ss.Received())){ - tag.DTSCLoader(Strm); - if (handler == HANDLER_FLASH){ - FlashDynamic(tag, Strm); - } - if (handler == HANDLER_PROGRESSIVE){ - Progressive(tag, HTTP_S, conn, Strm); - } - } - } - } } - conn.close(); - ss.close(); - #if DEBUG >= 1 - if (FLV::Parse_Error){fprintf(stderr, "FLV Parser Error: %s\n", FLV::Error_Str.c_str());} - fprintf(stderr, "User %i disconnected.\n", conn.getSocket()); - if (inited){ - fprintf(stderr, "Status was: inited\n"); - }else{ - if (ready4data){ - fprintf(stderr, "Status was: ready4data\n"); - }else{ - fprintf(stderr, "Status was: connected\n"); + //close and remove the connection + conn->close(); + delete conn; + //remove this thread from active_threads and add it to done_threads. + thread_mutex.lock(); + for (std::set::iterator it = active_threads.begin(); it != active_threads.end(); it++){ + if ((*it)->get_id() == tthread::this_thread::get_id()){ + tthread::thread * T = (*it); + active_threads.erase(T); + done_threads.insert(T); + break; } } - #endif - return 0; - }//Connector_HTTP main function + thread_mutex.unlock(); + } };//Connector_HTTP namespace -// Load main server setup file, default port 8080, handler is Connector_HTTP::Connector_HTTP -#define DEFAULT_PORT 8080 -#define MAINHANDLER Connector_HTTP::Connector_HTTP -#define CONFIGSECT HTTP -#include "server_setup.h" + +int main(int argc, char ** argv){ + Util::Config conf(argv[0], PACKAGE_VERSION); + conf.addConnectorOptions(8080); + conf.parseArgs(argc, argv); + Socket::Server server_socket = Socket::Server(conf.getInteger("listen_port"), conf.getString("listen_interface")); + if (!server_socket.connected()){return 1;} + conf.activate(); + + while (server_socket.connected() && conf.is_active){ + Socket::Connection S = server_socket.accept(); + if (S.connected()){//check if the new connection is valid + Connector_HTTP::thread_mutex.lock(); + tthread::thread * T = new tthread::thread(Connector_HTTP::Handle_HTTP_Connection, (void *)(new Socket::Connection(S))); + Connector_HTTP::active_threads.insert(T); + while (!Connector_HTTP::done_threads.empty()){ + T = *Connector_HTTP::done_threads.begin(); + T->join(); + Connector_HTTP::done_threads.erase(T); + delete T; + } + Connector_HTTP::thread_mutex.unlock(); + } + }//while connected and not requested to stop + server_socket.close(); + return 0; +}//main diff --git a/src/conn_http_dynamic.cpp b/src/conn_http_dynamic.cpp index b5ce016b..2f1a4803 100644 --- a/src/conn_http_dynamic.cpp +++ b/src/conn_http_dynamic.cpp @@ -19,9 +19,10 @@ #include #include #include +#include /// Holds everything unique to HTTP Dynamic Connector. -namespace Connector_HTTP_Dynamic{ +namespace Connector_HTTP{ /// Returns AMF-format metadata std::string GetMetaData( ) { @@ -242,7 +243,7 @@ int main(int argc, char ** argv){ if (S.connected()){//check if the new connection is valid pid_t myid = fork(); if (myid == 0){//if new child, start MAINHANDLER - return Connector_RTMP::Connector_RTMP(S); + return Connector_HTTP::Connector_HTTP_Dynamic(S); }else{//otherwise, do nothing or output debugging text #if DEBUG >= 3 fprintf(stderr, "Spawned new process %i for socket %i\n", (int)myid, S.getSocket()); diff --git a/src/conn_http_progressive.cpp b/src/conn_http_progressive.cpp index 403e768f..56f1ac06 100644 --- a/src/conn_http_progressive.cpp +++ b/src/conn_http_progressive.cpp @@ -16,9 +16,10 @@ #include #include #include +#include /// Holds everything unique to HTTP Progressive Connector. -namespace Connector_HTTP_Progressive{ +namespace Connector_HTTP{ /// Main function for Connector_HTTP_Progressive int Connector_HTTP_Progressive(Socket::Connection conn){ @@ -152,7 +153,7 @@ int main(int argc, char ** argv){ if (S.connected()){//check if the new connection is valid pid_t myid = fork(); if (myid == 0){//if new child, start MAINHANDLER - return Connector_RTMP::Connector_RTMP(S); + return Connector_HTTP::Connector_HTTP_Progressive(S); }else{//otherwise, do nothing or output debugging text #if DEBUG >= 3 fprintf(stderr, "Spawned new process %i for socket %i\n", (int)myid, S.getSocket());