Fix sessions race condition during shutdown

This commit is contained in:
Thulinma 2022-10-17 11:51:50 +02:00
parent 8175ad6dd5
commit 0c68bb1530

View file

@ -111,7 +111,6 @@ void userOnDisconnect(Comms::Connections & connections, size_t idx){
} }
int main(int argc, char **argv){ int main(int argc, char **argv){
Comms::Connections connections;
Comms::Sessions sessions; Comms::Sessions sessions;
uint64_t lastSeen = Util::bootSecs(); uint64_t lastSeen = Util::bootSecs();
Util::redirectLogsIfNeeded(); Util::redirectLogsIfNeeded();
@ -235,9 +234,6 @@ int main(int argc, char **argv){
if (thisStreamName.size()){streamLastActive[thisStreamName] = now;} if (thisStreamName.size()){streamLastActive[thisStreamName] = now;}
if (memcmp(thisHost.data(), nullAddress, 16)){hostLastActive[thisHost] = now;} if (memcmp(thisHost.data(), nullAddress, 16)){hostLastActive[thisHost] = now;}
// Open the shared memory page containing statistics for each individual connection in this session
connections.reload(thisSessionId, true);
// Determine session type, since triggers only get run for viewer type sessions // Determine session type, since triggers only get run for viewer type sessions
uint64_t thisType = 0; uint64_t thisType = 0;
if (thisSessionId[0] == 'I'){ if (thisSessionId[0] == 'I'){
@ -247,126 +243,135 @@ int main(int argc, char **argv){
} else if (thisSessionId[0] == 'U'){ } else if (thisSessionId[0] == 'U'){
thisType = 3; thisType = 3;
} }
bool shouldSleep = false;
// Do a USER_NEW trigger if it is defined for this stream //Scope to ensure the connections page is deleted before other cleanup happens
if (!thisType && Triggers::shouldTrigger("USER_NEW", thisStreamName)){ {
std::string payload = thisStreamName + "\n" + config.getString("ip") + "\n" + // Open the shared memory page containing statistics for each individual connection in this session
thisToken + "\n" + thisProtocol + Comms::Connections connections;
"\n" + thisReqUrl + "\n" + thisSessionId; connections.reload(thisSessionId, true);
if (!Triggers::doTrigger("USER_NEW", payload, thisStreamName)){
// Mark all connections of this session as finished, since this viewer is not allowed to view this stream
Util::logExitReason("Session rejected by USER_NEW");
connections.setExit();
connections.finishAll();
}
}
//start allowing viewers // Do a USER_NEW trigger if it is defined for this stream
sessionLock.post(); if (!thisType && Triggers::shouldTrigger("USER_NEW", thisStreamName)){
std::string payload = thisStreamName + "\n" + config.getString("ip") + "\n" +
INFO_MSG("Started new session %s in %.3f ms", thisSessionId.c_str(), (double)Util::getMicros(bootTime)/1000.0); thisToken + "\n" + thisProtocol +
"\n" + thisReqUrl + "\n" + thisSessionId;
// Stay active until Mist exits or we no longer have an active connection if (!Triggers::doTrigger("USER_NEW", payload, thisStreamName)){
while (config.is_active && (currentConnections || now - lastSeen <= STATS_DELAY) && !connections.getExit()){ // Mark all connections of this session as finished, since this viewer is not allowed to view this stream
currentConnections = 0; Util::logExitReason("Session rejected by USER_NEW");
lastSecond = 0; connections.setExit();
now = Util::bootSecs(); connections.finishAll();
}
// Loop through all connection entries to get a summary of statistics
COMM_LOOP(connections, userOnActive(connections, id), userOnDisconnect(connections, id));
if (currentConnections){
globalTime++;
lastSeen = now;
} }
//start allowing viewers
sessionLock.post();
sessions.setTime(globalTime); INFO_MSG("Started new session %s in %.3f ms", thisSessionId.c_str(), (double)Util::getMicros(bootTime)/1000.0);
sessions.setDown(globalDown);
sessions.setUp(globalUp);
sessions.setPacketCount(globalPktcount);
sessions.setPacketLostCount(globalPktloss);
sessions.setPacketRetransmitCount(globalPktretrans);
sessions.setLastSecond(lastSecond);
sessions.setNow(now);
if (currentConnections){ // Stay active until Mist exits or we no longer have an active connection
{ while (config.is_active && (currentConnections || now - lastSeen <= STATS_DELAY) && !connections.getExit()){
// Convert active protocols to string currentConnections = 0;
std::stringstream connectorSummary; lastSecond = 0;
for (std::map<std::string, uint64_t>::iterator it = connectorLastActive.begin(); now = Util::bootSecs();
it != connectorLastActive.end(); ++it){
if (now - it->second < STATS_DELAY){ // Loop through all connection entries to get a summary of statistics
connectorSummary << (connectorSummary.str().size() ? "," : "") << it->first; COMM_LOOP(connections, userOnActive(connections, id), userOnDisconnect(connections, id));
} if (currentConnections){
} globalTime++;
sessions.setConnector(connectorSummary.str()); lastSeen = now;
} }
{
// Set active host to last active or 0 if there were various hosts active recently sessions.setTime(globalTime);
std::string thisHost; sessions.setDown(globalDown);
for (std::map<std::string, uint64_t>::iterator it = hostLastActive.begin(); sessions.setUp(globalUp);
it != hostLastActive.end(); ++it){ sessions.setPacketCount(globalPktcount);
if (now - it->second < STATS_DELAY){ sessions.setPacketLostCount(globalPktloss);
if (!thisHost.size()){ sessions.setPacketRetransmitCount(globalPktretrans);
thisHost = it->first; sessions.setLastSecond(lastSecond);
}else if (thisHost != it->first){ sessions.setNow(now);
thisHost = nullAddress;
break; if (currentConnections){
{
// Convert active protocols to string
std::stringstream connectorSummary;
for (std::map<std::string, uint64_t>::iterator it = connectorLastActive.begin();
it != connectorLastActive.end(); ++it){
if (now - it->second < STATS_DELAY){
connectorSummary << (connectorSummary.str().size() ? "," : "") << it->first;
} }
} }
sessions.setConnector(connectorSummary.str());
} }
if (!thisHost.size()){
thisHost = nullAddress;
}
sessions.setHost(thisHost);
}
{ {
// Set active stream name to last active or "" if there were multiple streams active recently // Set active host to last active or 0 if there were various hosts active recently
std::string thisStream = ""; std::string thisHost;
for (std::map<std::string, uint64_t>::iterator it = streamLastActive.begin(); for (std::map<std::string, uint64_t>::iterator it = hostLastActive.begin();
it != streamLastActive.end(); ++it){ it != hostLastActive.end(); ++it){
if (now - it->second < STATS_DELAY){ if (now - it->second < STATS_DELAY){
if (!thisStream.size()){ if (!thisHost.size()){
thisStream = it->first; thisHost = it->first;
}else if (thisStream != it->first){ }else if (thisHost != it->first){
thisStream = ""; thisHost = nullAddress;
break; break;
}
} }
} }
if (!thisHost.size()){
thisHost = nullAddress;
}
sessions.setHost(thisHost);
} }
sessions.setStream(thisStream);
}
}
// Retrigger USER_NEW if a re-sync was requested {
if (!thisType && forceTrigger){ // Set active stream name to last active or "" if there were multiple streams active recently
forceTrigger = false; std::string thisStream = "";
std::string host; for (std::map<std::string, uint64_t>::iterator it = streamLastActive.begin();
Socket::hostBytesToStr(thisHost.data(), 16, host); it != streamLastActive.end(); ++it){
if (Triggers::shouldTrigger("USER_NEW", thisStreamName)){ if (now - it->second < STATS_DELAY){
INFO_MSG("Triggering USER_NEW for stream %s", thisStreamName.c_str()); if (!thisStream.size()){
std::string payload = thisStreamName + "\n" + host + "\n" + thisStream = it->first;
thisToken + "\n" + thisProtocol + }else if (thisStream != it->first){
"\n" + thisReqUrl + "\n" + thisSessionId; thisStream = "";
if (!Triggers::doTrigger("USER_NEW", payload, thisStreamName)){ break;
INFO_MSG("USER_NEW rejected stream %s", thisStreamName.c_str()); }
Util::logExitReason("Session rejected by USER_NEW"); }
connections.setExit(); }
connections.finishAll(); sessions.setStream(thisStream);
break;
}else{
INFO_MSG("USER_NEW accepted stream %s", thisStreamName.c_str());
} }
} }
}
// Remember latest activity so we know when this session ends // Retrigger USER_NEW if a re-sync was requested
if (currentConnections){ if (!thisType && forceTrigger){
forceTrigger = false;
std::string host;
Socket::hostBytesToStr(thisHost.data(), 16, host);
if (Triggers::shouldTrigger("USER_NEW", thisStreamName)){
INFO_MSG("Triggering USER_NEW for stream %s", thisStreamName.c_str());
std::string payload = thisStreamName + "\n" + host + "\n" +
thisToken + "\n" + thisProtocol +
"\n" + thisReqUrl + "\n" + thisSessionId;
if (!Triggers::doTrigger("USER_NEW", payload, thisStreamName)){
INFO_MSG("USER_NEW rejected stream %s", thisStreamName.c_str());
Util::logExitReason("Session rejected by USER_NEW");
connections.setExit();
connections.finishAll();
break;
}else{
INFO_MSG("USER_NEW accepted stream %s", thisStreamName.c_str());
}
}
}
// Remember latest activity so we know when this session ends
if (currentConnections){
}
Util::wait(1000);
} }
Util::wait(1000); shouldSleep = connections.getExit();
} }//connections scope end
if (Util::bootSecs() - lastSeen > STATS_DELAY){ if (Util::bootSecs() - lastSeen > STATS_DELAY){
Util::logExitReason("Session inactive for %d seconds", STATS_DELAY); Util::logExitReason("Session inactive for %d seconds", STATS_DELAY);
} }
@ -412,7 +417,7 @@ int main(int argc, char **argv){
Triggers::doTrigger("USER_END", summary.str(), thisStreamName); Triggers::doTrigger("USER_END", summary.str(), thisStreamName);
} }
if (!thisType && connections.getExit()){ if (!thisType && shouldSleep){
uint64_t sleepStart = Util::bootSecs(); uint64_t sleepStart = Util::bootSecs();
// Keep session invalidated for 10 minutes, or until the session stops // Keep session invalidated for 10 minutes, or until the session stops
while (config.is_active && Util::bootSecs() - sleepStart < SESS_TIMEOUT){ while (config.is_active && Util::bootSecs() - sleepStart < SESS_TIMEOUT){