Fix various subtle concurrency issues in the controller

Author: Thulinma
Date:   2024-06-19 01:24:06 +02:00
parent  6eec3661ca
commit  aca4623a8a

4 changed files with 13 additions and 3 deletions

View file

@@ -586,7 +586,6 @@ int main_loop(int argc, char **argv){
   monitorThread.join();
   HIGH_MSG("Joining UDP API thread...");
   UDPAPIThread.join();
-  /*LTS-START*/
   HIGH_MSG("Joining uplink thread...");
   uplinkThread.join();
   HIGH_MSG("Joining push thread...");
@@ -597,9 +596,9 @@ int main_loop(int argc, char **argv){
   HIGH_MSG("Joining updater thread...");
   updaterThread.join();
 #endif
-  /*LTS-END*/
   // write config
-  tthread::lock_guard<tthread::mutex> guard(Controller::logMutex);
+  tthread::lock_guard<tthread::mutex> guardLog(Controller::logMutex);
+  tthread::lock_guard<tthread::mutex> guardCnf(Controller::configMutex);
   Controller::writeConfigToDisk(true);
   // stop all child processes
   Util::Procs::StopAll();
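
The pattern behind this file and the ones below: the shutdown path in main_loop now holds both logMutex and configMutex before the final writeConfigToDisk, while every thread that takes one of those mutexes re-checks Controller::conf.is_active right after locking and bails out if shutdown has begun. A minimal sketch of that handshake, using std::mutex and std::atomic in place of the tthread wrappers and with illustrative names (isActive, apiWorker, shutdownPath); the real controller clears its flag elsewhere during shutdown, the sketch folds it into one function for brevity:

    // Minimal sketch, not MistServer code.
    #include <atomic>
    #include <iostream>
    #include <mutex>
    #include <thread>

    std::atomic<bool> isActive{true};
    std::mutex configMutex;

    void apiWorker(){
      std::lock_guard<std::mutex> guard(configMutex);
      // Re-check the shutdown flag *after* taking the lock: shutdown may have
      // started while this thread was blocked on configMutex.
      if (!isActive){return;}
      std::cout << "handling API request under configMutex\n";
    }

    void shutdownPath(){
      isActive = false;
      // Holding the mutex guarantees no worker is mid-update while the final
      // state is written out, mirroring guardCnf in main_loop above.
      std::lock_guard<std::mutex> guard(configMutex);
      std::cout << "writing config to disk, then stopping children\n";
    }

    int main(){
      std::thread t(apiWorker);
      shutdownPath();
      t.join();
    }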

View file

@@ -348,6 +348,7 @@ int Controller::handleAPIConnection(Socket::Connection &conn){
       req["authorize"] = JSON::fromString(auth.substr(5));
       if (Storage["account"]){
         tthread::lock_guard<tthread::mutex> guard(configMutex);
+        if (!Controller::conf.is_active){return 0;}
         authorized = authorize(req, req, conn);
         if (!authorized){
           H.Clean();
@@ -413,6 +414,7 @@ int Controller::handleAPIConnection(Socket::Connection &conn){
     if (H.url == "/api2"){Request["minimal"] = true;}
     {// lock the config mutex here - do not unlock until done processing
       tthread::lock_guard<tthread::mutex> guard(configMutex);
+      if (!Controller::conf.is_active){return 0;}
       // if already authorized, do not re-check for authorization
       if (authorized && Storage["account"]){
         Response["authorize"]["status"] = "OK";
@@ -1053,6 +1055,7 @@ void Controller::handleAPICommands(JSON::Value &Request, JSON::Value &Response){
   ///
   if (Request.isMember("clearstatlogs") || Request.isMember("log") || !Request.isMember("minimal")){
     tthread::lock_guard<tthread::mutex> guard(logMutex);
+    if (!Controller::conf.is_active){return;}
     if (!Request.isMember("minimal") || Request.isMember("log")){
       Response["log"] = Controller::Storage["log"];
     }
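
Note that in each handler the new is_active check sits after the lock_guard, not before it. Checking before locking would be a time-of-check/time-of-use race: shutdown can begin while the handler is still waiting on configMutex or logMutex. A self-contained contrast with hypothetical names (active, stateMutex, handleRacy, handleSafe) and std primitives instead of tthread:

    // Illustrative contrast, not MistServer code.
    #include <mutex>

    std::mutex stateMutex;
    bool active = true; // stands in for Controller::conf.is_active

    // Racy ordering: the flag is read before blocking on the mutex, so it can
    // be stale by the time the lock is actually held.
    void handleRacy(){
      if (!active){return;}
      std::lock_guard<std::mutex> guard(stateMutex);
      // ... shared state may already be mid-teardown here ...
    }

    // Ordering used in this commit: take the lock, then bail out if shutdown
    // has started in the meantime.
    void handleSafe(){
      std::lock_guard<std::mutex> guard(stateMutex);
      if (!active){return;}
      // ... shared state is guaranteed consistent here ...
    }

    int main(){
      handleSafe();
      handleRacy();
    }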

View file

@@ -1859,6 +1859,7 @@ void Controller::handlePrometheus(HTTP::Parser &H, Socket::Connection &conn, int
   {// Scope for shortest possible blocking of statsMutex
     tthread::lock_guard<tthread::recursive_mutex> guard(statsMutex);
+    if (!Controller::conf.is_active){return;}
     response << "# HELP mist_sessions_total Number of sessions active right now, server-wide, by type.\n";
     response << "# TYPE mist_sessions_total gauge\n";
@@ -1937,6 +1938,7 @@ void Controller::handlePrometheus(HTTP::Parser &H, Socket::Connection &conn, int
   resp["bwlimit"] = bwLimit;
   {// Scope for shortest possible blocking of statsMutex
     tthread::lock_guard<tthread::recursive_mutex> guard(statsMutex);
+    if (!Controller::conf.is_active){return;}
     resp["curr"].append((uint64_t)sessions.size());
     if (Controller::triggerStats.size()){
@@ -1982,6 +1984,7 @@ void Controller::handlePrometheus(HTTP::Parser &H, Socket::Connection &conn, int
   {
     tthread::lock_guard<tthread::mutex> guard(Controller::configMutex);
+    if (!Controller::conf.is_active){return;}
     // add tags, if any
     if (Storage.isMember("tags") && Storage["tags"].isArray() && Storage["tags"].size()){resp["tags"] = Storage["tags"];}
     // Loop over connectors
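
These hunks lean on the existing "Scope for shortest possible blocking of statsMutex" blocks: because the guard is an RAII object inside a nested scope, the new early return releases the mutex immediately, and in the normal path the mutex is dropped before the response is sent. A small sketch of that scoping, with invented names (buildMetrics, active) and std primitives standing in for tthread:

    // Sketch of the lock-scoping idea, not MistServer code.
    #include <iostream>
    #include <mutex>
    #include <sstream>

    std::recursive_mutex statsMutex;
    bool active = true;

    void buildMetrics(){
      std::ostringstream response;
      {// scope for shortest possible blocking of statsMutex
        std::lock_guard<std::recursive_mutex> guard(statsMutex);
        if (!active){return;} // RAII: the guard unlocks on this early return too
        response << "# TYPE mist_sessions_total gauge\n";
      }// statsMutex released here, before any (slow) output happens
      std::cout << response.str();
    }

    int main(){buildMetrics();}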

View file

@@ -30,19 +30,23 @@ namespace Controller{
     JSON::Value logs;
   };
   std::map<pid_t, procInfo> activeProcs;
+  tthread::recursive_mutex procMutex;
   void procLogMessage(uint64_t id, const JSON::Value & msg){
+    tthread::lock_guard<tthread::recursive_mutex> procGuard(procMutex);
     JSON::Value &log = activeProcs[id].logs;
     log.append(msg);
     log.shrink(25);
   }
   bool isProcActive(uint64_t id){
+    tthread::lock_guard<tthread::recursive_mutex> procGuard(procMutex);
     return activeProcs.count(id);
   }
   void getProcsForStream(const std::string & stream, JSON::Value & returnedProcList){
+    tthread::lock_guard<tthread::recursive_mutex> procGuard(procMutex);
     std::set<pid_t> wipeList;
     for (std::map<pid_t, procInfo>::iterator it = activeProcs.begin(); it != activeProcs.end(); ++it){
       if (!stream.size() || stream == it->second.sink || stream == it->second.source){
@@ -65,6 +69,7 @@ namespace Controller{
   }
   void setProcStatus(uint64_t id, const std::string & proc, const std::string & source, const std::string & sink, const JSON::Value & status){
+    tthread::lock_guard<tthread::recursive_mutex> procGuard(procMutex);
     procInfo & prc = activeProcs[id];
     prc.lastupdate = Util::bootSecs();
     prc.stats.extend(status);
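
The new procMutex guarding activeProcs is a recursive mutex, so every helper can lock unconditionally, even if it ends up running while the same thread already holds the lock (directly or via another helper). A rough sketch of that idea with std::recursive_mutex and invented names (ProcInfo, procs, isActive, logFor) rather than the actual MistServer functions:

    // Sketch only; assumes a helper may call another helper under the lock.
    #include <cstdint>
    #include <map>
    #include <mutex>
    #include <string>

    struct ProcInfo{std::string sink, source;};
    std::map<uint64_t, ProcInfo> procs;
    std::recursive_mutex procMutex;

    bool isActive(uint64_t id){
      std::lock_guard<std::recursive_mutex> guard(procMutex);
      return procs.count(id);
    }

    void logFor(uint64_t id, const std::string &msg){
      std::lock_guard<std::recursive_mutex> guard(procMutex); // outer lock
      // Re-locks the same mutex inside isActive(); safe because it is recursive.
      if (!isActive(id)){return;}
      // ... append msg to the process's log here ...
      (void)msg;
    }

    int main(){logFor(1, "hello");}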