From ea443945fe70f4b33222605cf024b7bc76972d1f Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 23 May 2019 13:28:30 +0200 Subject: [PATCH] Generalized Util::Config::is_restarting for rolling restarts, added rolling restart support to listening socket outputs --- lib/config.cpp | 33 +++++++++++++++++++----- lib/config.h | 1 + lib/socket.cpp | 20 +++++++++----- lib/socket.h | 4 ++- src/controller/controller.cpp | 12 ++++----- src/controller/controller_connectors.cpp | 2 ++ src/controller/controller_statistics.cpp | 4 +-- src/controller/controller_storage.cpp | 1 - src/controller/controller_storage.h | 1 - src/output/mist_out.cpp | 18 +++++++++++++ 10 files changed, 72 insertions(+), 24 deletions(-) diff --git a/lib/config.cpp b/lib/config.cpp index ff18b6fc..69847752 100644 --- a/lib/config.cpp +++ b/lib/config.cpp @@ -35,6 +35,7 @@ #include bool Util::Config::is_active = false; +bool Util::Config::is_restarting = false; static Socket::Server *serv_sock_pointer = 0; uint32_t Util::Config::printDebugLevel = DEBUG; // std::string Util::Config::streamName; @@ -321,16 +322,19 @@ int Util::Config::forkServer(Socket::Server &server_socket, int (*callback)(Sock } } Util::Procs::socketList.erase(server_socket.getSocket()); - server_socket.close(); + if (!is_restarting){ + server_socket.close(); + } return 0; } int Util::Config::serveThreadedSocket(int (*callback)(Socket::Connection &)){ Socket::Server server_socket; - if (vals.isMember("socket")){ + if (Socket::checkTrueSocket(0)){ + server_socket = Socket::Server(0); + }else if (vals.isMember("socket")){ server_socket = Socket::Server(Util::getTmpFolder() + getString("socket")); - } - if (vals.isMember("port") && vals.isMember("interface")){ + } else if (vals.isMember("port") && vals.isMember("interface")){ server_socket = Socket::Server(getInteger("port"), getString("interface"), false); } if (!server_socket.connected()){ @@ -340,6 +344,13 @@ int Util::Config::serveThreadedSocket(int (*callback)(Socket::Connection &)){ serv_sock_pointer = &server_socket; DEVEL_MSG("Activating threaded server: %s", getString("cmd").c_str()); activate(); + if (server_socket.getSocket()){ + int oldSock = server_socket.getSocket(); + if (!dup2(oldSock, 0)){ + server_socket = Socket::Server(0); + close(oldSock); + } + } int r = threadServer(server_socket, callback); serv_sock_pointer = 0; return r; @@ -347,10 +358,11 @@ int Util::Config::serveThreadedSocket(int (*callback)(Socket::Connection &)){ int Util::Config::serveForkedSocket(int (*callback)(Socket::Connection &S)){ Socket::Server server_socket; - if (vals.isMember("socket")){ + if (Socket::checkTrueSocket(0)){ + server_socket = Socket::Server(0); + }else if (vals.isMember("socket")){ server_socket = Socket::Server(Util::getTmpFolder() + getString("socket")); - } - if (vals.isMember("port") && vals.isMember("interface")){ + } else if (vals.isMember("port") && vals.isMember("interface")){ server_socket = Socket::Server(getInteger("port"), getString("interface"), false); } if (!server_socket.connected()){ @@ -360,6 +372,13 @@ int Util::Config::serveForkedSocket(int (*callback)(Socket::Connection &S)){ serv_sock_pointer = &server_socket; DEVEL_MSG("Activating forked server: %s", getString("cmd").c_str()); activate(); + if (server_socket.getSocket()){ + int oldSock = server_socket.getSocket(); + if (!dup2(oldSock, 0)){ + server_socket = Socket::Server(0); + close(oldSock); + } + } int r = forkServer(server_socket, callback); serv_sock_pointer = 0; return r; diff --git a/lib/config.h b/lib/config.h index 21f3b5e0..1ea0c865 100644 --- a/lib/config.h +++ b/lib/config.h @@ -24,6 +24,7 @@ namespace Util{ public: // variables static bool is_active; ///< Set to true by activate(), set to false by the signal handler. + static bool is_restarting; ///< Set to true when restarting, set to false on boot. static uint32_t printDebugLevel; static std::string streamName; ///< Used by debug messages to identify the stream name // functions diff --git a/lib/socket.cpp b/lib/socket.cpp index 4ab377fe..63608271 100644 --- a/lib/socket.cpp +++ b/lib/socket.cpp @@ -58,6 +58,13 @@ bool Socket::isLocalhost(const std::string &remotehost){ return false; } +///Checks if the given file descriptor is actually socket or not. +bool Socket::checkTrueSocket(int sock){ + struct stat sBuf; + if (sock != -1 && !fstat(sock, &sBuf)){return S_ISSOCK(sBuf.st_mode);} + return false; +} + bool Socket::isLocal(const std::string &remotehost){ struct ifaddrs *ifAddrStruct = NULL; struct ifaddrs *ifa = NULL; @@ -400,9 +407,7 @@ void Socket::Connection::setBoundAddr(){ Socket::Connection::Connection(int sockNo){ sSend = sockNo; sRecv = -1; - isTrueSocket = false; - struct stat sBuf; - if (sSend != -1 && !fstat(sSend, &sBuf)){isTrueSocket = S_ISSOCK(sBuf.st_mode);} + isTrueSocket = Socket::checkTrueSocket(sSend); setBoundAddr(); up = 0; down = 0; @@ -422,9 +427,7 @@ Socket::Connection::Connection(int write, int read){ }else{ sRecv = -1; } - isTrueSocket = false; - struct stat sBuf; - if (sSend != -1 && !fstat(sSend, &sBuf)){isTrueSocket = S_ISSOCK(sBuf.st_mode);} + isTrueSocket = Socket::checkTrueSocket(sSend); setBoundAddr(); up = 0; down = 0; @@ -1089,6 +1092,11 @@ Socket::Server::Server(){ sock = -1; }// Socket::Server base Constructor +/// Create a new Server from existing socket. +Socket::Server::Server(int fromSock){ + sock = fromSock; +} + /// Create a new TCP Server. The socket is immediately bound and set to listen. /// A maximum of 100 connections will be accepted between accept() calls. /// Any further connections coming in will be dropped. diff --git a/lib/socket.h b/lib/socket.h index e378530c..7907fe86 100644 --- a/lib/socket.h +++ b/lib/socket.h @@ -41,6 +41,7 @@ namespace Socket{ bool isLocal(const std::string & host); /// Returns true if given human-readable hostname is a local address. bool isLocalhost(const std::string & host); + bool checkTrueSocket(int sock); /// A buffer made out of std::string objects that can be efficiently read from and written to. class Buffer{ @@ -165,7 +166,8 @@ namespace Socket{ bool IPv4bind(int port, std::string hostname, bool nonblock); ///< Attempt to bind an IPv4 socket public: Server(); ///< Create a new base Server. - Server(int port, std::string hostname = "0.0.0.0", bool nonblock = false); ///< Create a new TCP Server. + Server(int existingSock); ///< Create a new Server from existing socket. + Server(int port, std::string hostname, bool nonblock = false); ///< Create a new TCP Server. Server(std::string adres, bool nonblock = false); ///< Create a new Unix Server. Connection accept(bool nonblock = false); ///< Accept any waiting connections. void setBlocking(bool blocking); ///< Set this socket to be blocking (true) or nonblocking (false). diff --git a/src/controller/controller.cpp b/src/controller/controller.cpp index c79c5fea..6db2e2c5 100644 --- a/src/controller/controller.cpp +++ b/src/controller/controller.cpp @@ -78,7 +78,7 @@ void statusMonitor(void *np){ } Util::sleep(3000); // wait at least 3 seconds } - if (Controller::restarting){ + if (Util::Config::is_restarting){ Controller::prepareActiveConnectorsForReload(); }else{ Controller::prepareActiveConnectorsForShutdown(); @@ -371,7 +371,7 @@ int main_loop(int argc, char **argv){ }else{ shutdown_reason = "socket problem (API port closed)"; } - if (Controller::restarting){shutdown_reason = "restart (on request)";} + if (Util::Config::is_restarting){shutdown_reason = "restart (on request)";} Controller::conf.is_active = false; Controller::Log("CONF", "Controller shutting down because of " + shutdown_reason); } @@ -388,7 +388,7 @@ int main_loop(int argc, char **argv){ // give everything some time to print messages Util::wait(100); std::cout << "Killed all processes, wrote config to disk. Exiting." << std::endl; - if (Controller::restarting){return 42;} + if (Util::Config::is_restarting){return 42;} // close stderr to make the stderr reading thread exit close(STDERR_FILENO); return 0; @@ -396,7 +396,7 @@ int main_loop(int argc, char **argv){ void handleUSR1(int signum, siginfo_t *sigInfo, void *ignore){ Controller::Log("CONF", "USR1 received - restarting controller"); - Controller::restarting = true; + Util::Config::is_restarting = true; raise(SIGINT); // trigger restart } @@ -436,9 +436,9 @@ int main(int argc, char **argv){ // wait for the process to exit int status; while (waitpid(pid, &status, 0) != pid && errno == EINTR){ - if (Controller::restarting){ + if (Util::Config::is_restarting){ Controller::conf.is_active = true; - Controller::restarting = false; + Util::Config::is_restarting = false; kill(pid, SIGUSR1); } if (!Controller::conf.is_active){ diff --git a/src/controller/controller_connectors.cpp b/src/controller/controller_connectors.cpp index b3762c15..dcfd6324 100644 --- a/src/controller/controller_connectors.cpp +++ b/src/controller/controller_connectors.cpp @@ -56,11 +56,13 @@ namespace Controller { IPC::sharedPage f("MstCnns", 4096, false, false); const Util::RelAccX A(f.mapped, false); if (A.isReady()){ + INFO_MSG("Reloading existing connectors to complete rolling restart"); for (uint32_t i = 0; i < A.getRCount(); ++i){ char * p = A.getPointer("cmd", i); if (p != 0 && p[0] != 0){ currentConnectors[p] = A.getInt("pid", i); Util::Procs::remember(A.getInt("pid", i)); + kill(A.getInt("pid", i), SIGUSR1); } } } diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index c703335a..1dc6273e 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -196,10 +196,10 @@ void Controller::SharedMemStats(void * config){ } statPointer = 0; HIGH_MSG("Stopping stats thread"); - if (Controller::restarting){ + if (Util::Config::is_restarting){ statServer.abandon(); } - Controller::deinitState(Controller::restarting); + Controller::deinitState(Util::Config::is_restarting); } /// Gets a complete list of all streams currently in active state, with optional prefix matching diff --git a/src/controller/controller_storage.cpp b/src/controller/controller_storage.cpp index 477e7b02..eaf11e0f 100644 --- a/src/controller/controller_storage.cpp +++ b/src/controller/controller_storage.cpp @@ -20,7 +20,6 @@ namespace Controller { tthread::mutex logMutex; uint64_t logCounter = 0; bool configChanged = false; - bool restarting = false; bool isTerminal = false; bool isColorized = false; uint32_t maxLogsRecs = 0; diff --git a/src/controller/controller_storage.h b/src/controller/controller_storage.h index beb3de73..395a0317 100644 --- a/src/controller/controller_storage.h +++ b/src/controller/controller_storage.h @@ -11,7 +11,6 @@ namespace Controller { extern tthread::mutex logMutex;///< Mutex for log thread. extern tthread::mutex configMutex;///< Mutex for server config access. extern bool configChanged; ///< Bool that indicates config must be written to SHM. - extern bool restarting;///< Signals if the controller is shutting down (false) or restarting (true). extern bool isTerminal;///< True if connected to a terminal and not a log file. extern bool isColorized;///< True if we colorize the output extern uint64_t logCounter; ///