diff --git a/src/conn_http.cpp b/src/conn_http.cpp index e1f570e5..10a05d3e 100644 --- a/src/conn_http.cpp +++ b/src/conn_http.cpp @@ -12,31 +12,103 @@ #include #include #include -#include "tinythread.h" +#include #include #include #include +#include +#include "tinythread.h" /// Holds everything unique to HTTP Connector. namespace Connector_HTTP{ + /// Class for keeping track of connections to connectors. + class ConnConn{ + public: + Socket::Connection * conn; ///< The socket of this connection + unsigned int lastuse; ///< Seconds since last use of this connection. + tthread::mutex in_use; ///< Mutex for this connection. + /// Constructor that sets the socket and lastuse to 0. + ConnConn(){ + conn = 0; + lastuse = 0; + }; + /// Constructor that sets lastuse to 0, but socket to s. + ConnConn(Socket::Connection * s){ + conn = s; + lastuse = 0; + }; + /// Destructor that deletes the socket if non-null. + ~ConnConn(){ + if (conn){ + conn->close(); + delete conn; + } + conn = 0; + }; + }; + + std::map connconn; ///< Connections to connectors std::set active_threads; ///< Holds currently active threads std::set done_threads; ///< Holds threads that are done and ready to be joined. tthread::mutex thread_mutex; ///< Mutex for adding/removing threads. + tthread::mutex conn_mutex; ///< Mutex for adding/removing connector connections. + tthread::mutex timeout_mutex; ///< Mutex for timeout thread. + tthread::thread * timeouter = 0; ///< Thread that times out connections to connectors. + + void Timeout_Thread(void * n){ + n = 0;//prevent unused variable warning + tthread::lock_guard guard(timeout_mutex); + std::cout << "Started timeout thread" << std::endl; + while (true){ + { + tthread::lock_guard guard(conn_mutex); + if (connconn.empty()){ + std::cout << "No more connections" << std::endl; + return; + } + std::cout << "Currently " << connconn.size() << " active connections" << std::endl; + std::map::iterator it; + for (it = connconn.begin(); it != connconn.end(); it++){ + if (!it->second->conn->connected() || it->second->lastuse++ > 15){ + if (it->second->in_use.try_lock()){ + it->second->in_use.unlock(); + std::cout << "Murdered one" << std::endl; + delete it->second; + connconn.erase(it); + it = connconn.begin();//get a valid iterator + if (it == connconn.end()){return;} + } + } + } + conn_mutex.unlock(); + } + usleep(1000000);//sleep 1 second and re-check + } + } /// Handles requests without associated handler, displaying a nice friendly error message. void Handle_None(HTTP::Parser & H, Socket::Connection * conn){ H.Clean(); + H.SetHeader("Server", "mistserver/" PACKAGE_VERSION); H.SetBody("Unsupported Media Type

Unsupported Media Type

The server isn't quite sure what you wanted to receive from it."); conn->Send(H.BuildResponse("415", "Unsupported Media Type")); } + void Handle_Timeout(HTTP::Parser & H, Socket::Connection * conn){ + H.Clean(); + H.SetHeader("Server", "mistserver/" PACKAGE_VERSION); + H.SetBody("Gateway timeout

Gateway timeout

Though the server understood your request and attempted to handle it, somehow handling it took longer than it should. Your request has been cancelled - please try again later."); + conn->Send(H.BuildResponse("504", "Gateway Timeout")); + } + /// Handles internal requests. void Handle_Internal(HTTP::Parser & H, Socket::Connection * conn){ if (H.url == "/crossdomain.xml"){ H.Clean(); H.SetHeader("Content-Type", "text/xml"); + H.SetHeader("Server", "mistserver/" PACKAGE_VERSION); H.SetBody(""); conn->Send(H.BuildResponse("200", "OK")); return; @@ -47,6 +119,7 @@ namespace Connector_HTTP{ JSON::Value ServConf = JSON::fromFile("/tmp/mist/streamlist"); std::string response; H.Clean(); + H.SetHeader("Server", "mistserver/" PACKAGE_VERSION); H.SetHeader("Content-Type", "application/javascript"); response = "// Generating embed code for stream " + streamname + "\n\n"; if (ServConf["streams"].isMember(streamname)){ @@ -65,11 +138,105 @@ namespace Connector_HTTP{ Handle_None(H, conn);//anything else doesn't get handled } + /// Wrapper function for openssl MD5 implementation + std::string md5(std::string input){ + char tmp[3]; + std::string ret; + const unsigned char * res = MD5((const unsigned char*)input.c_str(), input.length(), 0); + for (int i = 0; i < 16; ++i){ + snprintf(tmp, 3, "%02x", res[i]); + ret += tmp; + } + return ret; + } + /// Handles requests without associated handler, displaying a nice friendly error message. void Handle_Through_Connector(HTTP::Parser & H, Socket::Connection * conn, std::string & connector){ + //create a unique ID based on a hash of the user agent and host, followed by the stream name and connector + std::string uid = md5(H.GetHeader("User-Agent")+conn->getHost())+"_"+H.GetVar("stream")+"_"+connector; + H.SetHeader("X-UID", uid);//add the UID to the headers before copying + std::string request = H.BuildRequest();//copy the request for later forwarding to the connector H.Clean(); - H.SetBody("Handled

"+connector+"

Handling as: "+connector+""); - conn->Send(H.BuildResponse("200", "OK")); + + //check if a connection exists, and if not create one + std::cout << "Creating connection" << std::endl; + conn_mutex.lock(); + if (!connconn.count(uid)){ + connconn[uid] = new ConnConn(new Socket::Connection("/tmp/mist/http_"+connector)); + connconn[uid]->conn->setBlocking(false);//do not block on spool() with no data + } + //start a new timeout thread, if neccesary + if (timeout_mutex.try_lock()){ + if (timeouter){ + timeouter->join(); + delete timeouter; + } + timeouter = new tthread::thread(Connector_HTTP::Timeout_Thread, 0); + timeout_mutex.unlock(); + } + conn_mutex.unlock(); + + std::cout << "Locking connection" << std::endl; + //lock the mutex for this connection, and handle the request + tthread::lock_guard guard(connconn[uid]->in_use); + //if the server connection is dead, handle as timeout. + if (!connconn.count(uid) || !connconn[uid]->conn->connected()){ + Handle_Timeout(H, conn); + return; + } + std::cout << "Forwarding connection" << std::endl; + //forward the original request + connconn[uid]->conn->Send(request); + connconn[uid]->lastuse = 0; + unsigned int timeout = 0; + //wait for a response + std::cout << "Waiting connection" << std::endl; + while (connconn.count(uid) && connconn[uid]->conn->connected()){ + if (connconn[uid]->conn->spool()){ + //check if the whole response was received + if (H.Read(connconn[uid]->conn->Received())){ + break;//continue down below this while loop + } + }else{ + //keep trying unless the timeout triggers + timeout++; + if (timeout > 50){ + Handle_Timeout(H, conn); + return; + } + usleep(100000); + } + } + if (!connconn.count(uid) || !connconn[uid]->conn->connected()){ + //failure, disconnect and sent error to user + std::cout << "Failure" << std::endl; + Handle_Timeout(H, conn); + return; + }else{ + //success, check type of response + if (H.GetHeader("Content-Length") != ""){ + //known length - simply re-send the request with added headers and continue + std::cout << "Known success" << std::endl; + H.SetHeader("X-UID", uid); + H.protocol = "HTTP/1.0"; + conn->Send(H.BuildResponse("200", "OK")); + }else{ + //unknown length + std::cout << "Unknown success" << std::endl; + H.SetHeader("X-UID", uid); + H.protocol = "HTTP/1.0"; + conn->Send(H.BuildResponse("200", "OK")); + //continue sending data from this socket and keep it permanently in use + while (connconn[uid]->conn->connected()){ + if (connconn[uid]->conn->spool()){ + //forward any and all incoming data directly without parsing + conn->Send(connconn[uid]->conn->Received()); + connconn[uid]->conn->Received().clear(); + } + } + } + } + std::cout << "Completing connection" << std::endl; } /// Returns the name of the HTTP connector the given request should be served by. @@ -79,29 +246,34 @@ namespace Connector_HTTP{ /// - dynamic (request fed from http_dynamic connector) /// - progressive (request fed from http_progressive connector) std::string getHTTPType(HTTP::Parser & H){ - if ((H.url.find("Seg") != std::string::npos) && (H.url.find("Frag") != std::string::npos)){return "dynamic";} - if (H.url.find("f4m") != std::string::npos){return "dynamic";} + if ((H.url.find("f4m") != std::string::npos) || ((H.url.find("Seg") != std::string::npos) && (H.url.find("Frag") != std::string::npos))){ + H.SetVar("stream", H.url.substr(1,H.url.find("/",1)-1)); + return "dynamic"; + } if (H.url.length() > 4){ std::string ext = H.url.substr(H.url.length() - 4, 4); - if (ext == ".flv"){return "progressive";} - if (ext == ".mp3"){return "progressive";} + if (ext == ".flv" || ext == ".mp3"){ + H.SetVar("stream", H.url.substr(1,H.url.length() - 5)); + return "progressive"; + } } if (H.url == "/crossdomain.xml"){return "internal";} if (H.url.length() > 10 && H.url.substr(0, 7) == "/embed_" && H.url.substr(H.url.length() - 3, 3) == ".js"){return "internal";} return "none"; } - /// Function handling a single connection + /// Thread for handling a single HTTP connection void Handle_HTTP_Connection(void * pointer){ Socket::Connection * conn = (Socket::Connection *)pointer; conn->setBlocking(false);//do not block on conn.spool() when no data is available HTTP::Parser Client; while (conn->connected()){ if (conn->spool()){ + std::cout << "Data: " << conn->Received() << std::endl; if (Client.Read(conn->Received())){ std::string handler = getHTTPType(Client); #if DEBUG >= 4 - std::cout << "Received request: " << Client.url << " => " << handler << std::endl; + std::cout << "Received request: " << Client.url << " => " << handler << " (" << Client.GetVar("stream") << ")" << std::endl; #endif if (handler == "none" || handler == "internal"){ if (handler == "internal"){ @@ -113,11 +285,14 @@ namespace Connector_HTTP{ Handle_Through_Connector(Client, conn, handler); } Client.Clean(); //clean for any possible next requests + std::cout << "Request handled" << std::endl; }else{ #if DEBUG >= 3 fprintf(stderr, "Could not parse the following:\n%s\n", conn->Received().c_str()); #endif } + }else{ + usleep(10000);//sleep 10ms } } //close and remove the connection @@ -147,12 +322,18 @@ int main(int argc, char ** argv){ if (!server_socket.connected()){return 1;} conf.activate(); + //start progressive and dynamic handlers from the same folder as this application + Util::Procs::Start("progressive", (std::string)(argv[0]) + "Progressive -n"); + Util::Procs::Start("dynamic", (std::string)(argv[0]) + "Dynamic -n"); + while (server_socket.connected() && conf.is_active){ Socket::Connection S = server_socket.accept(); if (S.connected()){//check if the new connection is valid + //lock the thread mutex and spawn a new thread for this connection Connector_HTTP::thread_mutex.lock(); tthread::thread * T = new tthread::thread(Connector_HTTP::Handle_HTTP_Connection, (void *)(new Socket::Connection(S))); Connector_HTTP::active_threads.insert(T); + //clean up any threads that may have finished while (!Connector_HTTP::done_threads.empty()){ T = *Connector_HTTP::done_threads.begin(); T->join(); @@ -163,5 +344,6 @@ int main(int argc, char ** argv){ } }//while connected and not requested to stop server_socket.close(); + Util::Procs::StopAll(); return 0; }//main diff --git a/src/conn_http_dynamic.cpp b/src/conn_http_dynamic.cpp index 2f1a4803..27055a6f 100644 --- a/src/conn_http_dynamic.cpp +++ b/src/conn_http_dynamic.cpp @@ -112,8 +112,7 @@ namespace Connector_HTTP{ std::cout << "Received request: " << HTTP_R.url << std::endl; #endif if (HTTP_R.url.find("f4m") == std::string::npos){ - Movie = HTTP_R.url.substr(1); - Movie = Movie.substr(0,Movie.find("/")); + Movie = HTTP_R.url.substr(1,HTTP_R.url.find("/",1)-1); Quality = HTTP_R.url.substr( HTTP_R.url.find("/",1)+1 ); Quality = Quality.substr(0, Quality.find("Seg")); temp = HTTP_R.url.find("Seg") + 3; @@ -125,8 +124,7 @@ namespace Connector_HTTP{ #endif Flash_RequestPending++; }else{ - Movie = HTTP_R.url.substr(1); - Movie = Movie.substr(0,Movie.find("/")); + Movie = HTTP_R.url.substr(1,HTTP_R.url.find("/",1)-1); HTTP_S.Clean(); HTTP_S.SetHeader("Content-Type","text/xml"); HTTP_S.SetHeader("Cache-Control","no-cache");