HTTP proxy rewrite, by Wouter Spruit.

This commit is contained in:
Thulinma 2014-06-18 15:00:26 +02:00
parent 00d1dfb1e5
commit 81d56bc04b
13 changed files with 191 additions and 244 deletions

View file

@ -4,13 +4,16 @@
#include <iostream>
#include <queue>
#include <set>
#include <sstream>
#include <ctime>
#include <cstdlib>
#include <cstdio>
#include <cmath>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/stat.h> //
#include <getopt.h>
#include <mist/socket.h>
@ -29,6 +32,25 @@
/// Holds everything unique to HTTP Connectors.
namespace Connector_HTTP {
static inline void builPipedPart(JSON::Value & p, char * argarr[], int & argnum, JSON::Value & argset){
for (JSON::ObjIter it = argset.ObjBegin(); it != argset.ObjEnd(); ++it){
if (it->second.isMember("option") && p.isMember(it->first)){
if (it->second.isMember("type")){
if (it->second["type"].asStringRef() == "str" && !p[it->first].isString()){
p[it->first] = p[it->first].asString();
}
if ((it->second["type"].asStringRef() == "uint" || it->second["type"].asStringRef() == "int") && !p[it->first].isInt()){
p[it->first] = JSON::Value(p[it->first].asInt()).asString();
}
}
if (p[it->first].asStringRef().size() > 0){
argarr[argnum++] = (char*)(it->second["option"].c_str());
argarr[argnum++] = (char*)(p[it->first].c_str());
}
}
}
}
/// Class for keeping track of connections to connectors.
class ConnConn{
public:
@ -62,6 +84,7 @@ namespace Connector_HTTP {
tthread::mutex timeoutMutex; ///< Mutex for timeout thread.
tthread::thread * timeouter = 0; ///< Thread that times out connections to connectors.
JSON::Value capabilities; ///< Holds a list of all HTTP connectors and their properties
JSON::Value ServConf; /// < holds configuration, loads from file in main
///\brief Function run as a thread to timeout requests on the proxy.
///\param n A NULL-pointer
@ -295,7 +318,7 @@ namespace Connector_HTTP {
streamname = url.substr(7, url.length() - 10);
}
Util::Stream::sanitizeName(streamname);
JSON::Value ServConf = JSON::fromFile(Util::getTmpFolder() + "streamlist");
//JSON::Value ServConf = JSON::fromFile(Util::getTmpFolder() + "streamlist");
std::string response;
std::string host = H.GetHeader("Host");
if (host.find(':')){
@ -390,185 +413,90 @@ namespace Connector_HTTP {
return proxyHandleUnsupported(H, conn); //anything else doesn't get handled
}
///\brief Handles requests by dispatching them to the corresponding connector.
///\param H The request to be handled.
///\brief Handles requests by starting a corresponding output process.
///\param H The request to be handled
///\param conn The connection to the client that issued the request.
///\param connector The type of connector to be invoked.
///\return A timestamp indicating when the request was parsed.
///\return -1 on failure, else 0.
long long int proxyHandleThroughConnector(HTTP::Parser & H, Socket::Connection & conn, std::string & connector){
static unsigned long long int confUpdateTime=0;
static tthread::mutex updateLock;
if( Util::bootSecs() -confUpdateTime > 10 ){
tthread::lock_guard<tthread::mutex> guard(updateLock);
if( Util::bootSecs() -confUpdateTime > 10 ){
Connector_HTTP::ServConf = JSON::fromFile(Util::getTmpFolder() + "streamlist");
confUpdateTime=Util::bootSecs();
}
}
//create a unique ID based on a hash of the user agent and host, followed by the stream name and connector
std::string uid = Secure::md5(H.GetHeader("User-Agent") + conn.getHost()) + "_" + H.GetVar("stream") + "_" + connector;
H.SetHeader("X-Stream", H.GetVar("stream"));
H.SetHeader("X-UID", uid); //add the UID to the headers before copying
H.SetHeader("X-Origin", conn.getHost()); //add the UID to the headers before copying
std::string request = H.BuildRequest(); //copy the request for later forwarding to the connector
std::string orig_url = H.getUrl();
H.Clean();
std::stringstream uidtemp;
/// \todo verify the correct formation of the uid
uidtemp << Secure::md5(H.GetHeader("User-Agent") + conn.getHost()) << "_" << H.GetVar("stream") << "_" << connector;
std::string uid = uidtemp.str();
ConnConn * myCConn = 0;
unsigned int counter = 0;
//loop until a connection is available/created
while (!myCConn){
//lock the connection mutex before trying anything
connMutex.lock();
//check if a connection exists, and if not create one
if ( !connectorConnections.count(uid)){
connectorConnections[uid] = new ConnConn(new Socket::Connection(Util::getTmpFolder() + connector));
connectorConnections[uid]->conn->setBlocking(false); //do not block on spool() with no data
if (!connectorConnections[uid]->conn->spool() && !connectorConnections[uid]->conn){
//unlock the connection mutex before exiting
connMutex.unlock();
DEBUG_MSG(DLVL_FAIL, "Created new connection (%s) failed - aborting request!", uid.c_str());
return Util::getMS();
}
DEBUG_MSG(DLVL_HIGH, "Created new connection %s", uid.c_str());
}
//attempt to lock the mutex for this connection
if (connectorConnections[uid]->inUse.try_lock()){
myCConn = connectorConnections[uid];
//if the connection is dead, delete it and re-loop
if (!myCConn->conn->spool() && !myCConn->conn->connected()){
counter++;
DEBUG_MSG(DLVL_HIGH, "Resetting existing connection %s", uid.c_str());
connectorConnections.erase(uid);
myCConn->inUse.unlock();
delete myCConn;
myCConn = 0;
if (counter++ > 2){
connMutex.unlock();
DEBUG_MSG(DLVL_FAIL, "Created new connection (%s) failed - aborting request!", uid.c_str());
return Util::getMS();
}
}else{
DEBUG_MSG(DLVL_HIGH, "Using active connection %s", uid.c_str());
}
}
//unlock the connection mutex before sleeping
connMutex.unlock();
//no connection yet? wait for 0.1 second and try again
if ( !myCConn){
Util::sleep(100);
//fdIn and fdOut are connected to conn.sock
int fdIn = conn.getSocket();
int fdOut = conn.getSocket();
//taken from CheckProtocols (controller_connectors.cpp)
char * argarr[20];
for (int i=0; i<20; i++){argarr[i] = 0;}
int id = -1;
for (unsigned int i=0; i < ServConf["config"]["protocols"].size(); ++i){
std::cout << "checking: " << ServConf["config"]["protocols"][i]["connector"].asStringRef() <<std::endl;
if ( ServConf["config"]["protocols"][i]["connector"].asStringRef() == connector ) {
id = i;
break; //pick the first protocol in the list that matches the connector
}
}
if (id == -1) {
DEBUG_MSG(DLVL_ERROR, "No connector found for: %s", connector.c_str());
return -1;
}
DEBUG_MSG(DLVL_HIGH, "Connector found: %s", connector.c_str());
//build arguments for starting output process
std::string temphost=conn.getHost();
std::string tempstream=H.GetVar("stream");
// buildPipedArguments( , (char **)&argarr, Connector_HTTP::capabilities, temphost, tempstream);
int argnum = 0;
std::string tmparg;
tmparg = Util::getMyPath() + std::string("MistOut") + ServConf["config"]["protocols"][id]["connector"].asStringRef();
struct stat buf;
if (::stat(tmparg.c_str(), &buf) != 0){
tmparg = Util::getMyPath() + std::string("MistConn") + ServConf["config"]["protocols"][id]["connector"].asStringRef();
}
//we now have a locked, working connection
{//start a new timeout thread, if neccesary
tthread::lock_guard<tthread::mutex> guard(timeoutStartMutex);
if (timeoutMutex.try_lock()){
if (timeouter){
timeouter->join();
delete timeouter;
}
timeoutThreadStarted = false;
timeouter = new tthread::thread(Connector_HTTP::proxyTimeoutThread, 0);
timeoutMutex.unlock();
while (!timeoutThreadStarted){Util::sleep(10);}
}
}
//forward the original request
myCConn->conn->SendNow(request);
myCConn->lastUse = 0;
unsigned int timeout = 0;
unsigned int retries = 0;
//set to only read headers
H.headerOnly = true;
//wait for a response
while (myCConn->conn->connected() && conn.connected()){
conn.spool();
//check if the whole header was received
if (myCConn->conn->spool() && H.Read(*(myCConn->conn))){
//208 means the fragment is too new, retry in 3s
if (H.url == "208"){
while (myCConn->conn->Received().size() > 0){
myCConn->conn->Received().get().clear();
}
retries++;
if (retries >= 10){
DEBUG_MSG(DLVL_HIGH, "Cancelled connection %s, because of 208 status repeated 10 times", uid.c_str());
myCConn->conn->close();
myCConn->inUse.unlock();
//unset to only read headers
H.headerOnly = false;
return proxyHandleTimeout(H, conn, "Timeout: fragment too new");
}
myCConn->lastUse = 0;
timeout = 0;
Util::sleep(3000);
myCConn->conn->SendNow(request);
H.Clean();
continue;
}
break; //continue down below this while loop
}
//keep trying unless the timeout triggers
if (timeout++ > 4000){
DEBUG_MSG(DLVL_HIGH, "Canceled connection %s, 4s timeout", uid.c_str());
myCConn->conn->close();
myCConn->inUse.unlock();
//unset to only read headers
H.headerOnly = false;
return proxyHandleTimeout(H, conn, "Gateway timeout while waiting for response");
}else{
Util::sleep(100);
}
}
//unset to only read headers
H.headerOnly = false;
if ( !myCConn->conn->connected() || !conn.connected()){
//failure, disconnect and sent error to user
myCConn->conn->close();
myCConn->inUse.unlock();
return proxyHandleTimeout(H, conn, "Gateway connection dropped");
}else{
long long int ret = Util::getMS();
//success, check type of response
if (H.GetHeader("MistMultiplex") != "No" && (H.GetHeader("Content-Length") != "" || H.GetHeader("Transfer-Encoding") == "chunked")){
//known length - simply re-send the request with added headers and continue
DEBUG_MSG(DLVL_HIGH, "Proxying %s - known length or chunked transfer encoding", uid.c_str());
H.SetHeader("X-UID", uid);
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver);
H.body = "";
H.Proxy(*(myCConn->conn), conn);
if (!conn.connected()){
DEBUG_MSG(DLVL_HIGH, "Incoming connection to %s dropped, killing off connector", uid.c_str());
myCConn->conn->close();
}
myCConn->inUse.unlock();
}else{
DEBUG_MSG(DLVL_HIGH, "Handing off %s - one-time connection", uid.c_str());
//unknown length
H.SetHeader("X-UID", uid);
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver);
conn.SendNow(H.BuildResponse(H.url, H.method));
//switch out the connection for an empty one - it makes no sense to keep these globally
Socket::Connection * myConn = myCConn->conn;
myCConn->conn = new Socket::Connection();
myCConn->inUse.unlock();
long long int last_data_time = Util::getMS();
//continue sending data from this socket and keep it permanently in use
while (myConn->connected() && conn.connected()){
if (myConn->Received().size() || myConn->spool()){
//forward any and all incoming data directly without parsing
conn.SendNow(myConn->Received().get());
myConn->Received().get().clear();
last_data_time = Util::getMS();
}else{
Util::sleep(30);
//if no data for 5000ms, cancel the connection
if (Util::getMS() - last_data_time > 5000){
break;
}
}
}
myConn->close();
delete myConn;
conn.close();
}
return ret;
}
argarr[argnum++] = (char*)tmparg.c_str();
JSON::Value & p = ServConf["config"]["protocols"][id];
JSON::Value & pipedCapa = capabilities[p["connector"].asStringRef()];
/// \todo why is the if(pipedCapa) line not working (nothing is added to argarr)??
if (pipedCapa.isMember("required")){builPipedPart(p, argarr, argnum, pipedCapa["required"]);}
if (pipedCapa.isMember("optional")){builPipedPart(p, argarr, argnum, pipedCapa["optional"]);}
argarr[argnum++] = (char*)"-i";
argarr[argnum++] = (char*)(temphost.c_str());
argarr[argnum++] = (char*)"-s";
argarr[argnum++] = (char*)(tempstream.c_str());
INFO_MSG("argnum: %i", argnum);
//for (unsigned int i=0; i<20; i++){
//std::cerr << "argv["<<i<< "] " << argarr[i] <<std::endl;
//}
//std::cerr << "p: " << p.toPrettyString() <<std::endl;
//std::cerr << "pipedCapa: " << pipedCapa.toPrettyString() <<std::endl;
//std::cerr << "capa: " << capabilities.toPrettyString() <<std::endl;
int tempint = fileno(stderr);
///start output process, fdIn and fdOut are connected to conn.sock
Util::Procs::StartPiped(argarr, & fdIn, & fdOut, & tempint);
conn.drop();
return 0;
}
///\brief Determines the type of connector to be used for handling a request.
@ -593,8 +521,8 @@ namespace Connector_HTTP {
//it matched - handle it now
std::string streamname = url.substr(found, url.size() - oit->second["url_match"].asStringRef().size() + 1);
Util::Stream::sanitizeName(streamname);
H.SetVar("stream", streamname);
return oit->second["socket"];
H.SetVar("stream", streamname);
return oit->first;
}
}
}
@ -608,7 +536,7 @@ namespace Connector_HTTP {
std::string streamname = url.substr(found, found_suf - found);
Util::Stream::sanitizeName(streamname);
H.SetVar("stream", streamname);
return oit->second["socket"];
return oit->first;
}
}
}
@ -645,16 +573,11 @@ namespace Connector_HTTP {
conn.setBlocking(false); //do not block on conn.spool() when no data is available
HTTP::Parser Client;
while (conn.connected()){
if (conn.spool() && Client.Read(conn)){
//conn.peek reads data without removing it from pipe
if (conn.peek() && Client.Read(conn)){
std::string handler = proxyGetHandleType(Client);
DEBUG_MSG(DLVL_HIGH, "Received request: %s (%d) => %s (%s)", Client.getUrl().c_str(), conn.getSocket(), handler.c_str(), Client.GetVar("stream").c_str());
#if DEBUG >= DLVL_HIGH
long long int startms = Util::getMS();
long long int midms = 0;
#define MID_BENCH midms =
#else
#define MID_BENCH
#endif
bool closeConnection = false;
if (Client.GetHeader("Connection") == "close"){
closeConnection = true;
@ -662,17 +585,14 @@ namespace Connector_HTTP {
if (handler == "none" || handler == "internal"){
if (handler == "internal"){
MID_BENCH proxyHandleInternal(Client, conn);
proxyHandleInternal(Client, conn);
}else{
MID_BENCH proxyHandleUnsupported(Client, conn);
proxyHandleUnsupported(Client, conn);
}
}else{
MID_BENCH proxyHandleThroughConnector(Client, conn, handler);
proxyHandleThroughConnector(Client, conn, handler);
}
#if DEBUG >= DLVL_HIGH
long long int nowms = Util::getMS();
DEBUG_MSG(DLVL_HIGH, "Completed request %d (%s) in %d ms (processing) / %d ms (transfer)", conn.getSocket(), handler.c_str(), (midms - startms), (nowms - midms));
#endif
DEBUG_MSG(DLVL_HIGH, "Completed request %d (%s) ", conn.getSocket(), handler.c_str());
if (closeConnection){
break;
}
@ -695,6 +615,7 @@ int main(int argc, char ** argv){
capa["optional"]["debug"]["help"] = "The debug level at which messages need to be printed.";
capa["optional"]["debug"]["option"] = "--debug";
capa["optional"]["debug"]["type"] = "uint";
Connector_HTTP::ServConf = JSON::fromFile(Util::getTmpFolder() + "streamlist");
capa["desc"] = "Enables the generic HTTP listener, required by all other HTTP protocols. Needs other HTTP protocols enabled to do much of anything.";
capa["deps"] = "";
conf.addConnectorOptions(8080, capa);
@ -729,10 +650,4 @@ int main(int argc, char ** argv){
}
return conf.serveThreadedSocket(Connector_HTTP::proxyHandleHTTPConnection);
if (Connector_HTTP::timeouter){
Connector_HTTP::timeouter->detach();
delete Connector_HTTP::timeouter;
}
return 0;
} //main