diff --git a/src/session.cpp b/src/session.cpp index d2331c53..59fb768c 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -111,7 +111,6 @@ void userOnDisconnect(Comms::Connections & connections, size_t idx){ } int main(int argc, char **argv){ - Comms::Connections connections; Comms::Sessions sessions; uint64_t lastSeen = Util::bootSecs(); Util::redirectLogsIfNeeded(); @@ -235,9 +234,6 @@ int main(int argc, char **argv){ if (thisStreamName.size()){streamLastActive[thisStreamName] = now;} if (memcmp(thisHost.data(), nullAddress, 16)){hostLastActive[thisHost] = now;} - // Open the shared memory page containing statistics for each individual connection in this session - connections.reload(thisSessionId, true); - // Determine session type, since triggers only get run for viewer type sessions uint64_t thisType = 0; if (thisSessionId[0] == 'I'){ @@ -247,126 +243,135 @@ int main(int argc, char **argv){ } else if (thisSessionId[0] == 'U'){ thisType = 3; } + bool shouldSleep = false; - // Do a USER_NEW trigger if it is defined for this stream - if (!thisType && Triggers::shouldTrigger("USER_NEW", thisStreamName)){ - std::string payload = thisStreamName + "\n" + config.getString("ip") + "\n" + - thisToken + "\n" + thisProtocol + - "\n" + thisReqUrl + "\n" + thisSessionId; - if (!Triggers::doTrigger("USER_NEW", payload, thisStreamName)){ - // Mark all connections of this session as finished, since this viewer is not allowed to view this stream - Util::logExitReason("Session rejected by USER_NEW"); - connections.setExit(); - connections.finishAll(); - } - } + //Scope to ensure the connections page is deleted before other cleanup happens + { + // Open the shared memory page containing statistics for each individual connection in this session + Comms::Connections connections; + connections.reload(thisSessionId, true); - //start allowing viewers - sessionLock.post(); - - INFO_MSG("Started new session %s in %.3f ms", thisSessionId.c_str(), (double)Util::getMicros(bootTime)/1000.0); - - // Stay active until Mist exits or we no longer have an active connection - while (config.is_active && (currentConnections || now - lastSeen <= STATS_DELAY) && !connections.getExit()){ - currentConnections = 0; - lastSecond = 0; - now = Util::bootSecs(); - - // Loop through all connection entries to get a summary of statistics - COMM_LOOP(connections, userOnActive(connections, id), userOnDisconnect(connections, id)); - if (currentConnections){ - globalTime++; - lastSeen = now; + // Do a USER_NEW trigger if it is defined for this stream + if (!thisType && Triggers::shouldTrigger("USER_NEW", thisStreamName)){ + std::string payload = thisStreamName + "\n" + config.getString("ip") + "\n" + + thisToken + "\n" + thisProtocol + + "\n" + thisReqUrl + "\n" + thisSessionId; + if (!Triggers::doTrigger("USER_NEW", payload, thisStreamName)){ + // Mark all connections of this session as finished, since this viewer is not allowed to view this stream + Util::logExitReason("Session rejected by USER_NEW"); + connections.setExit(); + connections.finishAll(); + } } + //start allowing viewers + sessionLock.post(); - sessions.setTime(globalTime); - sessions.setDown(globalDown); - sessions.setUp(globalUp); - sessions.setPacketCount(globalPktcount); - sessions.setPacketLostCount(globalPktloss); - sessions.setPacketRetransmitCount(globalPktretrans); - sessions.setLastSecond(lastSecond); - sessions.setNow(now); + INFO_MSG("Started new session %s in %.3f ms", thisSessionId.c_str(), (double)Util::getMicros(bootTime)/1000.0); - if (currentConnections){ - { - // Convert active protocols to string - std::stringstream connectorSummary; - for (std::map::iterator it = connectorLastActive.begin(); - it != connectorLastActive.end(); ++it){ - if (now - it->second < STATS_DELAY){ - connectorSummary << (connectorSummary.str().size() ? "," : "") << it->first; - } - } - sessions.setConnector(connectorSummary.str()); + // Stay active until Mist exits or we no longer have an active connection + while (config.is_active && (currentConnections || now - lastSeen <= STATS_DELAY) && !connections.getExit()){ + currentConnections = 0; + lastSecond = 0; + now = Util::bootSecs(); + + // Loop through all connection entries to get a summary of statistics + COMM_LOOP(connections, userOnActive(connections, id), userOnDisconnect(connections, id)); + if (currentConnections){ + globalTime++; + lastSeen = now; } - { - // Set active host to last active or 0 if there were various hosts active recently - std::string thisHost; - for (std::map::iterator it = hostLastActive.begin(); - it != hostLastActive.end(); ++it){ - if (now - it->second < STATS_DELAY){ - if (!thisHost.size()){ - thisHost = it->first; - }else if (thisHost != it->first){ - thisHost = nullAddress; - break; + + sessions.setTime(globalTime); + sessions.setDown(globalDown); + sessions.setUp(globalUp); + sessions.setPacketCount(globalPktcount); + sessions.setPacketLostCount(globalPktloss); + sessions.setPacketRetransmitCount(globalPktretrans); + sessions.setLastSecond(lastSecond); + sessions.setNow(now); + + if (currentConnections){ + { + // Convert active protocols to string + std::stringstream connectorSummary; + for (std::map::iterator it = connectorLastActive.begin(); + it != connectorLastActive.end(); ++it){ + if (now - it->second < STATS_DELAY){ + connectorSummary << (connectorSummary.str().size() ? "," : "") << it->first; } } + sessions.setConnector(connectorSummary.str()); } - if (!thisHost.size()){ - thisHost = nullAddress; - } - sessions.setHost(thisHost); - } - { - // Set active stream name to last active or "" if there were multiple streams active recently - std::string thisStream = ""; - for (std::map::iterator it = streamLastActive.begin(); - it != streamLastActive.end(); ++it){ - if (now - it->second < STATS_DELAY){ - if (!thisStream.size()){ - thisStream = it->first; - }else if (thisStream != it->first){ - thisStream = ""; - break; + { + // Set active host to last active or 0 if there were various hosts active recently + std::string thisHost; + for (std::map::iterator it = hostLastActive.begin(); + it != hostLastActive.end(); ++it){ + if (now - it->second < STATS_DELAY){ + if (!thisHost.size()){ + thisHost = it->first; + }else if (thisHost != it->first){ + thisHost = nullAddress; + break; + } } } + if (!thisHost.size()){ + thisHost = nullAddress; + } + sessions.setHost(thisHost); } - sessions.setStream(thisStream); - } - } - // Retrigger USER_NEW if a re-sync was requested - if (!thisType && forceTrigger){ - forceTrigger = false; - std::string host; - Socket::hostBytesToStr(thisHost.data(), 16, host); - if (Triggers::shouldTrigger("USER_NEW", thisStreamName)){ - INFO_MSG("Triggering USER_NEW for stream %s", thisStreamName.c_str()); - std::string payload = thisStreamName + "\n" + host + "\n" + - thisToken + "\n" + thisProtocol + - "\n" + thisReqUrl + "\n" + thisSessionId; - if (!Triggers::doTrigger("USER_NEW", payload, thisStreamName)){ - INFO_MSG("USER_NEW rejected stream %s", thisStreamName.c_str()); - Util::logExitReason("Session rejected by USER_NEW"); - connections.setExit(); - connections.finishAll(); - break; - }else{ - INFO_MSG("USER_NEW accepted stream %s", thisStreamName.c_str()); + { + // Set active stream name to last active or "" if there were multiple streams active recently + std::string thisStream = ""; + for (std::map::iterator it = streamLastActive.begin(); + it != streamLastActive.end(); ++it){ + if (now - it->second < STATS_DELAY){ + if (!thisStream.size()){ + thisStream = it->first; + }else if (thisStream != it->first){ + thisStream = ""; + break; + } + } + } + sessions.setStream(thisStream); } } - } - // Remember latest activity so we know when this session ends - if (currentConnections){ + // Retrigger USER_NEW if a re-sync was requested + if (!thisType && forceTrigger){ + forceTrigger = false; + std::string host; + Socket::hostBytesToStr(thisHost.data(), 16, host); + if (Triggers::shouldTrigger("USER_NEW", thisStreamName)){ + INFO_MSG("Triggering USER_NEW for stream %s", thisStreamName.c_str()); + std::string payload = thisStreamName + "\n" + host + "\n" + + thisToken + "\n" + thisProtocol + + "\n" + thisReqUrl + "\n" + thisSessionId; + if (!Triggers::doTrigger("USER_NEW", payload, thisStreamName)){ + INFO_MSG("USER_NEW rejected stream %s", thisStreamName.c_str()); + Util::logExitReason("Session rejected by USER_NEW"); + connections.setExit(); + connections.finishAll(); + break; + }else{ + INFO_MSG("USER_NEW accepted stream %s", thisStreamName.c_str()); + } + } + } + + // Remember latest activity so we know when this session ends + if (currentConnections){ + } + Util::wait(1000); } - Util::wait(1000); - } + shouldSleep = connections.getExit(); + }//connections scope end if (Util::bootSecs() - lastSeen > STATS_DELAY){ Util::logExitReason("Session inactive for %d seconds", STATS_DELAY); } @@ -412,7 +417,7 @@ int main(int argc, char **argv){ Triggers::doTrigger("USER_END", summary.str(), thisStreamName); } - if (!thisType && connections.getExit()){ + if (!thisType && shouldSleep){ uint64_t sleepStart = Util::bootSecs(); // Keep session invalidated for 10 minutes, or until the session stops while (config.is_active && Util::bootSecs() - sleepStart < SESS_TIMEOUT){