From aca4623a8ab7cd88da4dfd2faed1b4ab6f83495f Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 19 Jun 2024 01:24:06 +0200 Subject: [PATCH] Fix various subtle concurrency issues in the controller --- src/controller/controller.cpp | 5 ++--- src/controller/controller_api.cpp | 3 +++ src/controller/controller_statistics.cpp | 3 +++ src/controller/controller_streams.cpp | 5 +++++ 4 files changed, 13 insertions(+), 3 deletions(-) diff --git a/src/controller/controller.cpp b/src/controller/controller.cpp index e05b99d5..d883c1fe 100644 --- a/src/controller/controller.cpp +++ b/src/controller/controller.cpp @@ -586,7 +586,6 @@ int main_loop(int argc, char **argv){ monitorThread.join(); HIGH_MSG("Joining UDP API thread..."); UDPAPIThread.join(); - /*LTS-START*/ HIGH_MSG("Joining uplink thread..."); uplinkThread.join(); HIGH_MSG("Joining push thread..."); @@ -597,9 +596,9 @@ int main_loop(int argc, char **argv){ HIGH_MSG("Joining updater thread..."); updaterThread.join(); #endif - /*LTS-END*/ // write config - tthread::lock_guard guard(Controller::logMutex); + tthread::lock_guard guardLog(Controller::logMutex); + tthread::lock_guard guardCnf(Controller::configMutex); Controller::writeConfigToDisk(true); // stop all child processes Util::Procs::StopAll(); diff --git a/src/controller/controller_api.cpp b/src/controller/controller_api.cpp index f46e1083..6ab017e4 100644 --- a/src/controller/controller_api.cpp +++ b/src/controller/controller_api.cpp @@ -348,6 +348,7 @@ int Controller::handleAPIConnection(Socket::Connection &conn){ req["authorize"] = JSON::fromString(auth.substr(5)); if (Storage["account"]){ tthread::lock_guard guard(configMutex); + if (!Controller::conf.is_active){return 0;} authorized = authorize(req, req, conn); if (!authorized){ H.Clean(); @@ -413,6 +414,7 @@ int Controller::handleAPIConnection(Socket::Connection &conn){ if (H.url == "/api2"){Request["minimal"] = true;} {// lock the config mutex here - do not unlock until done processing tthread::lock_guard guard(configMutex); + if (!Controller::conf.is_active){return 0;} // if already authorized, do not re-check for authorization if (authorized && Storage["account"]){ Response["authorize"]["status"] = "OK"; @@ -1053,6 +1055,7 @@ void Controller::handleAPICommands(JSON::Value &Request, JSON::Value &Response){ /// if (Request.isMember("clearstatlogs") || Request.isMember("log") || !Request.isMember("minimal")){ tthread::lock_guard guard(logMutex); + if (!Controller::conf.is_active){return;} if (!Request.isMember("minimal") || Request.isMember("log")){ Response["log"] = Controller::Storage["log"]; } diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index a42ee476..a57e6e20 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -1859,6 +1859,7 @@ void Controller::handlePrometheus(HTTP::Parser &H, Socket::Connection &conn, int {// Scope for shortest possible blocking of statsMutex tthread::lock_guard guard(statsMutex); + if (!Controller::conf.is_active){return;} response << "# HELP mist_sessions_total Number of sessions active right now, server-wide, by type.\n"; response << "# TYPE mist_sessions_total gauge\n"; @@ -1937,6 +1938,7 @@ void Controller::handlePrometheus(HTTP::Parser &H, Socket::Connection &conn, int resp["bwlimit"] = bwLimit; {// Scope for shortest possible blocking of statsMutex tthread::lock_guard guard(statsMutex); + if (!Controller::conf.is_active){return;} resp["curr"].append((uint64_t)sessions.size()); if (Controller::triggerStats.size()){ @@ -1982,6 +1984,7 @@ void Controller::handlePrometheus(HTTP::Parser &H, Socket::Connection &conn, int { tthread::lock_guard guard(Controller::configMutex); + if (!Controller::conf.is_active){return;} // add tags, if any if (Storage.isMember("tags") && Storage["tags"].isArray() && Storage["tags"].size()){resp["tags"] = Storage["tags"];} // Loop over connectors diff --git a/src/controller/controller_streams.cpp b/src/controller/controller_streams.cpp index 2c8a7860..d416b861 100644 --- a/src/controller/controller_streams.cpp +++ b/src/controller/controller_streams.cpp @@ -30,19 +30,23 @@ namespace Controller{ JSON::Value logs; }; std::map activeProcs; + tthread::recursive_mutex procMutex; void procLogMessage(uint64_t id, const JSON::Value & msg){ + tthread::lock_guard procGuard(procMutex); JSON::Value &log = activeProcs[id].logs; log.append(msg); log.shrink(25); } bool isProcActive(uint64_t id){ + tthread::lock_guard procGuard(procMutex); return activeProcs.count(id); } void getProcsForStream(const std::string & stream, JSON::Value & returnedProcList){ + tthread::lock_guard procGuard(procMutex); std::set wipeList; for (std::map::iterator it = activeProcs.begin(); it != activeProcs.end(); ++it){ if (!stream.size() || stream == it->second.sink || stream == it->second.source){ @@ -65,6 +69,7 @@ namespace Controller{ } void setProcStatus(uint64_t id, const std::string & proc, const std::string & source, const std::string & sink, const JSON::Value & status){ + tthread::lock_guard procGuard(procMutex); procInfo & prc = activeProcs[id]; prc.lastupdate = Util::bootSecs(); prc.stats.extend(status);