10h, closes #45 - New-style HTTP handling finished. Needs testing. :-)
This commit is contained in:
parent
e6ad892439
commit
a04c8b1821
2 changed files with 193 additions and 13 deletions
|
@ -12,31 +12,103 @@
|
|||
#include <getopt.h>
|
||||
#include <ctime>
|
||||
#include <set>
|
||||
#include "tinythread.h"
|
||||
#include <openssl/md5.h>
|
||||
#include <mist/socket.h>
|
||||
#include <mist/http_parser.h>
|
||||
#include <mist/config.h>
|
||||
#include <mist/procs.h>
|
||||
#include "tinythread.h"
|
||||
|
||||
/// Holds everything unique to HTTP Connector.
|
||||
namespace Connector_HTTP{
|
||||
|
||||
/// Class for keeping track of connections to connectors.
|
||||
class ConnConn{
|
||||
public:
|
||||
Socket::Connection * conn; ///< The socket of this connection
|
||||
unsigned int lastuse; ///< Seconds since last use of this connection.
|
||||
tthread::mutex in_use; ///< Mutex for this connection.
|
||||
/// Constructor that sets the socket and lastuse to 0.
|
||||
ConnConn(){
|
||||
conn = 0;
|
||||
lastuse = 0;
|
||||
};
|
||||
/// Constructor that sets lastuse to 0, but socket to s.
|
||||
ConnConn(Socket::Connection * s){
|
||||
conn = s;
|
||||
lastuse = 0;
|
||||
};
|
||||
/// Destructor that deletes the socket if non-null.
|
||||
~ConnConn(){
|
||||
if (conn){
|
||||
conn->close();
|
||||
delete conn;
|
||||
}
|
||||
conn = 0;
|
||||
};
|
||||
};
|
||||
|
||||
std::map<std::string, ConnConn *> connconn; ///< Connections to connectors
|
||||
std::set<tthread::thread *> active_threads; ///< Holds currently active threads
|
||||
std::set<tthread::thread *> done_threads; ///< Holds threads that are done and ready to be joined.
|
||||
tthread::mutex thread_mutex; ///< Mutex for adding/removing threads.
|
||||
tthread::mutex conn_mutex; ///< Mutex for adding/removing connector connections.
|
||||
tthread::mutex timeout_mutex; ///< Mutex for timeout thread.
|
||||
tthread::thread * timeouter = 0; ///< Thread that times out connections to connectors.
|
||||
|
||||
void Timeout_Thread(void * n){
|
||||
n = 0;//prevent unused variable warning
|
||||
tthread::lock_guard<tthread::mutex> guard(timeout_mutex);
|
||||
std::cout << "Started timeout thread" << std::endl;
|
||||
while (true){
|
||||
{
|
||||
tthread::lock_guard<tthread::mutex> guard(conn_mutex);
|
||||
if (connconn.empty()){
|
||||
std::cout << "No more connections" << std::endl;
|
||||
return;
|
||||
}
|
||||
std::cout << "Currently " << connconn.size() << " active connections" << std::endl;
|
||||
std::map<std::string, ConnConn*>::iterator it;
|
||||
for (it = connconn.begin(); it != connconn.end(); it++){
|
||||
if (!it->second->conn->connected() || it->second->lastuse++ > 15){
|
||||
if (it->second->in_use.try_lock()){
|
||||
it->second->in_use.unlock();
|
||||
std::cout << "Murdered one" << std::endl;
|
||||
delete it->second;
|
||||
connconn.erase(it);
|
||||
it = connconn.begin();//get a valid iterator
|
||||
if (it == connconn.end()){return;}
|
||||
}
|
||||
}
|
||||
}
|
||||
conn_mutex.unlock();
|
||||
}
|
||||
usleep(1000000);//sleep 1 second and re-check
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles requests without associated handler, displaying a nice friendly error message.
|
||||
void Handle_None(HTTP::Parser & H, Socket::Connection * conn){
|
||||
H.Clean();
|
||||
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION);
|
||||
H.SetBody("<!DOCTYPE html><html><head><title>Unsupported Media Type</title></head><body><h1>Unsupported Media Type</h1>The server isn't quite sure what you wanted to receive from it.</body></html>");
|
||||
conn->Send(H.BuildResponse("415", "Unsupported Media Type"));
|
||||
}
|
||||
|
||||
void Handle_Timeout(HTTP::Parser & H, Socket::Connection * conn){
|
||||
H.Clean();
|
||||
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION);
|
||||
H.SetBody("<!DOCTYPE html><html><head><title>Gateway timeout</title></head><body><h1>Gateway timeout</h1>Though the server understood your request and attempted to handle it, somehow handling it took longer than it should. Your request has been cancelled - please try again later.</body></html>");
|
||||
conn->Send(H.BuildResponse("504", "Gateway Timeout"));
|
||||
}
|
||||
|
||||
/// Handles internal requests.
|
||||
void Handle_Internal(HTTP::Parser & H, Socket::Connection * conn){
|
||||
|
||||
if (H.url == "/crossdomain.xml"){
|
||||
H.Clean();
|
||||
H.SetHeader("Content-Type", "text/xml");
|
||||
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION);
|
||||
H.SetBody("<?xml version=\"1.0\"?><!DOCTYPE cross-domain-policy SYSTEM \"http://www.adobe.com/xml/dtds/cross-domain-policy.dtd\"><cross-domain-policy><allow-access-from domain=\"*\" /><site-control permitted-cross-domain-policies=\"all\"/></cross-domain-policy>");
|
||||
conn->Send(H.BuildResponse("200", "OK"));
|
||||
return;
|
||||
|
@ -47,6 +119,7 @@ namespace Connector_HTTP{
|
|||
JSON::Value ServConf = JSON::fromFile("/tmp/mist/streamlist");
|
||||
std::string response;
|
||||
H.Clean();
|
||||
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION);
|
||||
H.SetHeader("Content-Type", "application/javascript");
|
||||
response = "// Generating embed code for stream " + streamname + "\n\n";
|
||||
if (ServConf["streams"].isMember(streamname)){
|
||||
|
@ -65,11 +138,105 @@ namespace Connector_HTTP{
|
|||
Handle_None(H, conn);//anything else doesn't get handled
|
||||
}
|
||||
|
||||
/// Wrapper function for openssl MD5 implementation
|
||||
std::string md5(std::string input){
|
||||
char tmp[3];
|
||||
std::string ret;
|
||||
const unsigned char * res = MD5((const unsigned char*)input.c_str(), input.length(), 0);
|
||||
for (int i = 0; i < 16; ++i){
|
||||
snprintf(tmp, 3, "%02x", res[i]);
|
||||
ret += tmp;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/// Handles requests without associated handler, displaying a nice friendly error message.
|
||||
void Handle_Through_Connector(HTTP::Parser & H, Socket::Connection * conn, std::string & connector){
|
||||
//create a unique ID based on a hash of the user agent and host, followed by the stream name and connector
|
||||
std::string uid = md5(H.GetHeader("User-Agent")+conn->getHost())+"_"+H.GetVar("stream")+"_"+connector;
|
||||
H.SetHeader("X-UID", uid);//add the UID to the headers before copying
|
||||
std::string request = H.BuildRequest();//copy the request for later forwarding to the connector
|
||||
H.Clean();
|
||||
H.SetBody("<!DOCTYPE html><html><head><title>Handled</title></head><body><h1>"+connector+"</h1>Handling as: "+connector+"</body></html>");
|
||||
conn->Send(H.BuildResponse("200", "OK"));
|
||||
|
||||
//check if a connection exists, and if not create one
|
||||
std::cout << "Creating connection" << std::endl;
|
||||
conn_mutex.lock();
|
||||
if (!connconn.count(uid)){
|
||||
connconn[uid] = new ConnConn(new Socket::Connection("/tmp/mist/http_"+connector));
|
||||
connconn[uid]->conn->setBlocking(false);//do not block on spool() with no data
|
||||
}
|
||||
//start a new timeout thread, if neccesary
|
||||
if (timeout_mutex.try_lock()){
|
||||
if (timeouter){
|
||||
timeouter->join();
|
||||
delete timeouter;
|
||||
}
|
||||
timeouter = new tthread::thread(Connector_HTTP::Timeout_Thread, 0);
|
||||
timeout_mutex.unlock();
|
||||
}
|
||||
conn_mutex.unlock();
|
||||
|
||||
std::cout << "Locking connection" << std::endl;
|
||||
//lock the mutex for this connection, and handle the request
|
||||
tthread::lock_guard<tthread::mutex> guard(connconn[uid]->in_use);
|
||||
//if the server connection is dead, handle as timeout.
|
||||
if (!connconn.count(uid) || !connconn[uid]->conn->connected()){
|
||||
Handle_Timeout(H, conn);
|
||||
return;
|
||||
}
|
||||
std::cout << "Forwarding connection" << std::endl;
|
||||
//forward the original request
|
||||
connconn[uid]->conn->Send(request);
|
||||
connconn[uid]->lastuse = 0;
|
||||
unsigned int timeout = 0;
|
||||
//wait for a response
|
||||
std::cout << "Waiting connection" << std::endl;
|
||||
while (connconn.count(uid) && connconn[uid]->conn->connected()){
|
||||
if (connconn[uid]->conn->spool()){
|
||||
//check if the whole response was received
|
||||
if (H.Read(connconn[uid]->conn->Received())){
|
||||
break;//continue down below this while loop
|
||||
}
|
||||
}else{
|
||||
//keep trying unless the timeout triggers
|
||||
timeout++;
|
||||
if (timeout > 50){
|
||||
Handle_Timeout(H, conn);
|
||||
return;
|
||||
}
|
||||
usleep(100000);
|
||||
}
|
||||
}
|
||||
if (!connconn.count(uid) || !connconn[uid]->conn->connected()){
|
||||
//failure, disconnect and sent error to user
|
||||
std::cout << "Failure" << std::endl;
|
||||
Handle_Timeout(H, conn);
|
||||
return;
|
||||
}else{
|
||||
//success, check type of response
|
||||
if (H.GetHeader("Content-Length") != ""){
|
||||
//known length - simply re-send the request with added headers and continue
|
||||
std::cout << "Known success" << std::endl;
|
||||
H.SetHeader("X-UID", uid);
|
||||
H.protocol = "HTTP/1.0";
|
||||
conn->Send(H.BuildResponse("200", "OK"));
|
||||
}else{
|
||||
//unknown length
|
||||
std::cout << "Unknown success" << std::endl;
|
||||
H.SetHeader("X-UID", uid);
|
||||
H.protocol = "HTTP/1.0";
|
||||
conn->Send(H.BuildResponse("200", "OK"));
|
||||
//continue sending data from this socket and keep it permanently in use
|
||||
while (connconn[uid]->conn->connected()){
|
||||
if (connconn[uid]->conn->spool()){
|
||||
//forward any and all incoming data directly without parsing
|
||||
conn->Send(connconn[uid]->conn->Received());
|
||||
connconn[uid]->conn->Received().clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
std::cout << "Completing connection" << std::endl;
|
||||
}
|
||||
|
||||
/// Returns the name of the HTTP connector the given request should be served by.
|
||||
|
@ -79,29 +246,34 @@ namespace Connector_HTTP{
|
|||
/// - dynamic (request fed from http_dynamic connector)
|
||||
/// - progressive (request fed from http_progressive connector)
|
||||
std::string getHTTPType(HTTP::Parser & H){
|
||||
if ((H.url.find("Seg") != std::string::npos) && (H.url.find("Frag") != std::string::npos)){return "dynamic";}
|
||||
if (H.url.find("f4m") != std::string::npos){return "dynamic";}
|
||||
if ((H.url.find("f4m") != std::string::npos) || ((H.url.find("Seg") != std::string::npos) && (H.url.find("Frag") != std::string::npos))){
|
||||
H.SetVar("stream", H.url.substr(1,H.url.find("/",1)-1));
|
||||
return "dynamic";
|
||||
}
|
||||
if (H.url.length() > 4){
|
||||
std::string ext = H.url.substr(H.url.length() - 4, 4);
|
||||
if (ext == ".flv"){return "progressive";}
|
||||
if (ext == ".mp3"){return "progressive";}
|
||||
if (ext == ".flv" || ext == ".mp3"){
|
||||
H.SetVar("stream", H.url.substr(1,H.url.length() - 5));
|
||||
return "progressive";
|
||||
}
|
||||
}
|
||||
if (H.url == "/crossdomain.xml"){return "internal";}
|
||||
if (H.url.length() > 10 && H.url.substr(0, 7) == "/embed_" && H.url.substr(H.url.length() - 3, 3) == ".js"){return "internal";}
|
||||
return "none";
|
||||
}
|
||||
|
||||
/// Function handling a single connection
|
||||
/// Thread for handling a single HTTP connection
|
||||
void Handle_HTTP_Connection(void * pointer){
|
||||
Socket::Connection * conn = (Socket::Connection *)pointer;
|
||||
conn->setBlocking(false);//do not block on conn.spool() when no data is available
|
||||
HTTP::Parser Client;
|
||||
while (conn->connected()){
|
||||
if (conn->spool()){
|
||||
std::cout << "Data: " << conn->Received() << std::endl;
|
||||
if (Client.Read(conn->Received())){
|
||||
std::string handler = getHTTPType(Client);
|
||||
#if DEBUG >= 4
|
||||
std::cout << "Received request: " << Client.url << " => " << handler << std::endl;
|
||||
std::cout << "Received request: " << Client.url << " => " << handler << " (" << Client.GetVar("stream") << ")" << std::endl;
|
||||
#endif
|
||||
if (handler == "none" || handler == "internal"){
|
||||
if (handler == "internal"){
|
||||
|
@ -113,11 +285,14 @@ namespace Connector_HTTP{
|
|||
Handle_Through_Connector(Client, conn, handler);
|
||||
}
|
||||
Client.Clean(); //clean for any possible next requests
|
||||
std::cout << "Request handled" << std::endl;
|
||||
}else{
|
||||
#if DEBUG >= 3
|
||||
fprintf(stderr, "Could not parse the following:\n%s\n", conn->Received().c_str());
|
||||
#endif
|
||||
}
|
||||
}else{
|
||||
usleep(10000);//sleep 10ms
|
||||
}
|
||||
}
|
||||
//close and remove the connection
|
||||
|
@ -147,12 +322,18 @@ int main(int argc, char ** argv){
|
|||
if (!server_socket.connected()){return 1;}
|
||||
conf.activate();
|
||||
|
||||
//start progressive and dynamic handlers from the same folder as this application
|
||||
Util::Procs::Start("progressive", (std::string)(argv[0]) + "Progressive -n");
|
||||
Util::Procs::Start("dynamic", (std::string)(argv[0]) + "Dynamic -n");
|
||||
|
||||
while (server_socket.connected() && conf.is_active){
|
||||
Socket::Connection S = server_socket.accept();
|
||||
if (S.connected()){//check if the new connection is valid
|
||||
//lock the thread mutex and spawn a new thread for this connection
|
||||
Connector_HTTP::thread_mutex.lock();
|
||||
tthread::thread * T = new tthread::thread(Connector_HTTP::Handle_HTTP_Connection, (void *)(new Socket::Connection(S)));
|
||||
Connector_HTTP::active_threads.insert(T);
|
||||
//clean up any threads that may have finished
|
||||
while (!Connector_HTTP::done_threads.empty()){
|
||||
T = *Connector_HTTP::done_threads.begin();
|
||||
T->join();
|
||||
|
@ -163,5 +344,6 @@ int main(int argc, char ** argv){
|
|||
}
|
||||
}//while connected and not requested to stop
|
||||
server_socket.close();
|
||||
Util::Procs::StopAll();
|
||||
return 0;
|
||||
}//main
|
||||
|
|
|
@ -112,8 +112,7 @@ namespace Connector_HTTP{
|
|||
std::cout << "Received request: " << HTTP_R.url << std::endl;
|
||||
#endif
|
||||
if (HTTP_R.url.find("f4m") == std::string::npos){
|
||||
Movie = HTTP_R.url.substr(1);
|
||||
Movie = Movie.substr(0,Movie.find("/"));
|
||||
Movie = HTTP_R.url.substr(1,HTTP_R.url.find("/",1)-1);
|
||||
Quality = HTTP_R.url.substr( HTTP_R.url.find("/",1)+1 );
|
||||
Quality = Quality.substr(0, Quality.find("Seg"));
|
||||
temp = HTTP_R.url.find("Seg") + 3;
|
||||
|
@ -125,8 +124,7 @@ namespace Connector_HTTP{
|
|||
#endif
|
||||
Flash_RequestPending++;
|
||||
}else{
|
||||
Movie = HTTP_R.url.substr(1);
|
||||
Movie = Movie.substr(0,Movie.find("/"));
|
||||
Movie = HTTP_R.url.substr(1,HTTP_R.url.find("/",1)-1);
|
||||
HTTP_S.Clean();
|
||||
HTTP_S.SetHeader("Content-Type","text/xml");
|
||||
HTTP_S.SetHeader("Cache-Control","no-cache");
|
||||
|
|
Loading…
Add table
Reference in a new issue