Fixed some more threading issues in HTTP proxy - clarified code.

This commit is contained in:
Thulinma 2013-04-10 15:33:40 +02:00
parent 3ab6d3f47f
commit 40a99ece9a

View file

@ -329,46 +329,48 @@ namespace Connector_HTTP {
} }
//lock the mutex for this connection, and handle the request //lock the mutex for this connection, and handle the request
tthread::lock_guard<tthread::mutex> guard(connectorConnections[uid]->inUse); ConnConn * myCConn = connectorConnections[uid];
myCConn->inUse.lock();
connMutex.unlock(); connMutex.unlock();
//if the server connection is dead, handle as timeout. //if the server connection is dead, handle as timeout.
if ( !connectorConnections[uid]->conn->connected()){ if ( !myCConn->conn->connected()){
connectorConnections[uid]->conn->close(); myCConn->conn->close();
return proxyHandleTimeout(H, conn); return proxyHandleTimeout(H, conn);
} }
//forward the original request //forward the original request
connectorConnections[uid]->conn->SendNow(request); myCConn->conn->SendNow(request);
connectorConnections[uid]->lastUse = 0; myCConn->lastUse = 0;
unsigned int timeout = 0; unsigned int timeout = 0;
unsigned int retries = 0; unsigned int retries = 0;
//wait for a response //wait for a response
while (connectorConnections.count(uid) && connectorConnections[uid]->conn->connected() && conn->connected()){ while (myCConn->conn->connected() && conn->connected()){
conn->spool(); conn->spool();
if (connectorConnections[uid]->conn->Received().size() || connectorConnections[uid]->conn->spool()){ if (myCConn->conn->Received().size() || myCConn->conn->spool()){
//make sure we end in a \n //make sure we end in a \n
if ( *(connectorConnections[uid]->conn->Received().get().rbegin()) != '\n'){ if ( *(myCConn->conn->Received().get().rbegin()) != '\n'){
std::string tmp = connectorConnections[uid]->conn->Received().get(); std::string tmp = myCConn->conn->Received().get();
connectorConnections[uid]->conn->Received().get().clear(); myCConn->conn->Received().get().clear();
if (connectorConnections[uid]->conn->Received().size()){ if (myCConn->conn->Received().size()){
connectorConnections[uid]->conn->Received().get().insert(0, tmp); myCConn->conn->Received().get().insert(0, tmp);
}else{ }else{
connectorConnections[uid]->conn->Received().append(tmp); myCConn->conn->Received().append(tmp);
} }
} }
//check if the whole response was received //check if the whole response was received
if (H.Read(connectorConnections[uid]->conn->Received().get())){ if (H.Read(myCConn->conn->Received().get())){
//208 means the fragment is too new, retry in 3s //208 means the fragment is too new, retry in 3s
if (H.url == "208"){ if (H.url == "208"){
retries++; retries++;
if (retries >= 5){ if (retries >= 5){
std::cout << "[5 retry-laters, cancelled]" << std::endl; std::cout << "[5 retry-laters, cancelled]" << std::endl;
connectorConnections[uid]->conn->close(); myCConn->conn->close();
myCConn->inUse.unlock();
return proxyHandleTimeout(H, conn); return proxyHandleTimeout(H, conn);
} }
connectorConnections[uid]->lastUse = 0; myCConn->lastUse = 0;
timeout = 0; timeout = 0;
Util::sleep(3000); Util::sleep(3000);
connectorConnections[uid]->conn->SendNow(request); myCConn->conn->SendNow(request);
H.Clean(); H.Clean();
continue; continue;
} }
@ -378,16 +380,18 @@ namespace Connector_HTTP {
//keep trying unless the timeout triggers //keep trying unless the timeout triggers
if (timeout++ > 4000){ if (timeout++ > 4000){
std::cout << "[20s timeout triggered]" << std::endl; std::cout << "[20s timeout triggered]" << std::endl;
connectorConnections[uid]->conn->close(); myCConn->conn->close();
myCConn->inUse.unlock();
return proxyHandleTimeout(H, conn); return proxyHandleTimeout(H, conn);
}else{ }else{
Util::sleep(5); Util::sleep(5);
} }
} }
} }
if ( !connectorConnections.count(uid) || !connectorConnections[uid]->conn->connected() || !conn->connected()){ if ( !myCConn->conn->connected() || !conn->connected()){
//failure, disconnect and sent error to user //failure, disconnect and sent error to user
connectorConnections[uid]->conn->close(); myCConn->conn->close();
myCConn->inUse.unlock();
return proxyHandleTimeout(H, conn); return proxyHandleTimeout(H, conn);
}else{ }else{
long long int ret = Util::getMS(); long long int ret = Util::getMS();
@ -397,15 +401,16 @@ namespace Connector_HTTP {
H.SetHeader("X-UID", uid); H.SetHeader("X-UID", uid);
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver); H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver);
conn->SendNow(H.BuildResponse("200", "OK")); conn->SendNow(H.BuildResponse("200", "OK"));
myCConn->inUse.unlock();
}else{ }else{
//unknown length //unknown length
H.SetHeader("X-UID", uid); H.SetHeader("X-UID", uid);
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver); H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver);
conn->SendNow(H.BuildResponse("200", "OK")); conn->SendNow(H.BuildResponse("200", "OK"));
//switch out the connection for an empty one - it makes no sense to keep these globally //switch out the connection for an empty one - it makes no sense to keep these globally
Socket::Connection * myConn = connectorConnections[uid]->conn; Socket::Connection * myConn = myCConn->conn;
connectorConnections[uid]->conn = new Socket::Connection(); myCConn->conn = new Socket::Connection();
connectorConnections[uid]->inUse.unlock(); myCConn->inUse.unlock();
//continue sending data from this socket and keep it permanently in use //continue sending data from this socket and keep it permanently in use
while (myConn->connected() && conn->connected()){ while (myConn->connected() && conn->connected()){
if (myConn->Received().size() || myConn->spool()){ if (myConn->Received().size() || myConn->spool()){
@ -555,10 +560,9 @@ int main(int argc, char ** argv){
Socket::Connection S = server_socket.accept(); Socket::Connection S = server_socket.accept();
if (S.connected()){ //check if the new connection is valid if (S.connected()){ //check if the new connection is valid
//spawn a new thread for this connection //spawn a new thread for this connection
tthread::thread * T = new tthread::thread(Connector_HTTP::proxyHandleHTTPConnection, (void *)(new Socket::Connection(S))); tthread::thread T(Connector_HTTP::proxyHandleHTTPConnection, (void *)(new Socket::Connection(S)));
//detach it, no need to keep track of it anymore //detach it, no need to keep track of it anymore
T->detach(); T.detach();
delete T;
}else{ }else{
Util::sleep(10); //sleep 10ms Util::sleep(10); //sleep 10ms
} }