From 42da518d5fcda9fd3c0f00f35a656d459f0b7508 Mon Sep 17 00:00:00 2001
From: Thulinma <jaron@vietors.com>
Date: Wed, 22 May 2019 15:16:38 +0200
Subject: [PATCH 1/2] Fixed ERROR message when force-saving connector list to
 shared memory.

---
 src/controller/controller_connectors.cpp | 14 ++++++++++----
 src/controller/controller_connectors.h   |  2 +-
 2 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/src/controller/controller_connectors.cpp b/src/controller/controller_connectors.cpp
index cdd7edab..b3762c15 100644
--- a/src/controller/controller_connectors.cpp
+++ b/src/controller/controller_connectors.cpp
@@ -23,11 +23,17 @@ namespace Controller {
   static std::map<std::string, pid_t> currentConnectors; ///<The currently running connectors.
 
   /// Updates the shared memory page with active connectors
-  void saveActiveConnectors(){
-    IPC::sharedPage f("MstCnns", 4096, true, false);
+  void saveActiveConnectors(bool forceOverride){
+    IPC::sharedPage f("MstCnns", 4096, forceOverride, false);
     if (!f.mapped){
-      FAIL_MSG("Could not store connector data!");
-      return;
+      if (!forceOverride){
+        saveActiveConnectors(true);
+        return;
+      }
+      if (!f.mapped){
+        FAIL_MSG("Could not store connector data!");
+        return;
+      }
     }
     memset(f.mapped, 0, 32);
     Util::RelAccX A(f.mapped, false);
diff --git a/src/controller/controller_connectors.h b/src/controller/controller_connectors.h
index e48ff308..75e7da5c 100644
--- a/src/controller/controller_connectors.h
+++ b/src/controller/controller_connectors.h
@@ -9,7 +9,7 @@ namespace Controller {
   bool CheckProtocols(JSON::Value & p, const JSON::Value & capabilities);
 
   /// Updates the shared memory page with active connectors
-  void saveActiveConnectors();
+  void saveActiveConnectors(bool forceOverride = false);
 
   /// Reads active connectors from the shared memory pages
   void loadActiveConnectors();

From ea443945fe70f4b33222605cf024b7bc76972d1f Mon Sep 17 00:00:00 2001
From: Thulinma <jaron@vietors.com>
Date: Thu, 23 May 2019 13:28:30 +0200
Subject: [PATCH 2/2] Generalized Util::Config::is_restarting for rolling
 restarts, added rolling restart support to listening socket outputs

---
 lib/config.cpp                           | 33 +++++++++++++++++++-----
 lib/config.h                             |  1 +
 lib/socket.cpp                           | 20 +++++++++-----
 lib/socket.h                             |  4 ++-
 src/controller/controller.cpp            | 12 ++++-----
 src/controller/controller_connectors.cpp |  2 ++
 src/controller/controller_statistics.cpp |  4 +--
 src/controller/controller_storage.cpp    |  1 -
 src/controller/controller_storage.h      |  1 -
 src/output/mist_out.cpp                  | 18 +++++++++++++
 10 files changed, 72 insertions(+), 24 deletions(-)

diff --git a/lib/config.cpp b/lib/config.cpp
index ff18b6fc..69847752 100644
--- a/lib/config.cpp
+++ b/lib/config.cpp
@@ -35,6 +35,7 @@
 #include <unistd.h>
 
 bool Util::Config::is_active = false;
+bool Util::Config::is_restarting = false;
 static Socket::Server *serv_sock_pointer = 0;
 uint32_t Util::Config::printDebugLevel = DEBUG; //
 std::string Util::Config::streamName;
@@ -321,16 +322,19 @@ int Util::Config::forkServer(Socket::Server &server_socket, int (*callback)(Sock
     }
   }
   Util::Procs::socketList.erase(server_socket.getSocket());
-  server_socket.close();
+  if (!is_restarting){
+    server_socket.close();
+  }
   return 0;
 }
 
 int Util::Config::serveThreadedSocket(int (*callback)(Socket::Connection &)){
   Socket::Server server_socket;
-  if (vals.isMember("socket")){
+  if (Socket::checkTrueSocket(0)){
+    server_socket = Socket::Server(0);
+  }else if (vals.isMember("socket")){
     server_socket = Socket::Server(Util::getTmpFolder() + getString("socket"));
-  }
-  if (vals.isMember("port") && vals.isMember("interface")){
+  } else if (vals.isMember("port") && vals.isMember("interface")){
     server_socket = Socket::Server(getInteger("port"), getString("interface"), false);
   }
   if (!server_socket.connected()){
@@ -340,6 +344,13 @@ int Util::Config::serveThreadedSocket(int (*callback)(Socket::Connection &)){
   serv_sock_pointer = &server_socket;
   DEVEL_MSG("Activating threaded server: %s", getString("cmd").c_str());
   activate();
+  if (server_socket.getSocket()){
+    int oldSock = server_socket.getSocket();
+    if (!dup2(oldSock, 0)){
+      server_socket = Socket::Server(0);
+      close(oldSock);
+    }
+  }
   int r = threadServer(server_socket, callback);
   serv_sock_pointer = 0;
   return r;
@@ -347,10 +358,11 @@ int Util::Config::serveThreadedSocket(int (*callback)(Socket::Connection &)){
 
 int Util::Config::serveForkedSocket(int (*callback)(Socket::Connection &S)){
   Socket::Server server_socket;
-  if (vals.isMember("socket")){
+  if (Socket::checkTrueSocket(0)){
+    server_socket = Socket::Server(0);
+  }else if (vals.isMember("socket")){
     server_socket = Socket::Server(Util::getTmpFolder() + getString("socket"));
-  }
-  if (vals.isMember("port") && vals.isMember("interface")){
+  } else if (vals.isMember("port") && vals.isMember("interface")){
     server_socket = Socket::Server(getInteger("port"), getString("interface"), false);
   }
   if (!server_socket.connected()){
@@ -360,6 +372,13 @@ int Util::Config::serveForkedSocket(int (*callback)(Socket::Connection &S)){
   serv_sock_pointer = &server_socket;
   DEVEL_MSG("Activating forked server: %s", getString("cmd").c_str());
   activate();
+  if (server_socket.getSocket()){
+    int oldSock = server_socket.getSocket();
+    if (!dup2(oldSock, 0)){
+      server_socket = Socket::Server(0);
+      close(oldSock);
+    }
+  }
   int r = forkServer(server_socket, callback);
   serv_sock_pointer = 0;
   return r;
diff --git a/lib/config.h b/lib/config.h
index 21f3b5e0..1ea0c865 100644
--- a/lib/config.h
+++ b/lib/config.h
@@ -24,6 +24,7 @@ namespace Util{
   public:
     // variables
     static bool is_active; ///< Set to true by activate(), set to false by the signal handler.
+    static bool is_restarting; ///< Set to true when restarting, set to false on boot.
     static uint32_t printDebugLevel;
     static std::string streamName; ///< Used by debug messages to identify the stream name
     // functions
diff --git a/lib/socket.cpp b/lib/socket.cpp
index 4ab377fe..63608271 100644
--- a/lib/socket.cpp
+++ b/lib/socket.cpp
@@ -58,6 +58,13 @@ bool Socket::isLocalhost(const std::string &remotehost){
   return false;
 }
 
+///Checks if the given file descriptor is actually socket or not.
+bool Socket::checkTrueSocket(int sock){
+  struct stat sBuf;
+  if (sock != -1 && !fstat(sock, &sBuf)){return S_ISSOCK(sBuf.st_mode);}
+  return false;
+}
+
 bool Socket::isLocal(const std::string &remotehost){
   struct ifaddrs *ifAddrStruct = NULL;
   struct ifaddrs *ifa = NULL;
@@ -400,9 +407,7 @@ void Socket::Connection::setBoundAddr(){
 Socket::Connection::Connection(int sockNo){
   sSend = sockNo;
   sRecv = -1;
-  isTrueSocket = false;
-  struct stat sBuf;
-  if (sSend != -1 && !fstat(sSend, &sBuf)){isTrueSocket = S_ISSOCK(sBuf.st_mode);}
+  isTrueSocket = Socket::checkTrueSocket(sSend);
   setBoundAddr();
   up = 0;
   down = 0;
@@ -422,9 +427,7 @@ Socket::Connection::Connection(int write, int read){
   }else{
     sRecv = -1;
   }
-  isTrueSocket = false;
-  struct stat sBuf;
-  if (sSend != -1 && !fstat(sSend, &sBuf)){isTrueSocket = S_ISSOCK(sBuf.st_mode);}
+  isTrueSocket = Socket::checkTrueSocket(sSend);
   setBoundAddr();
   up = 0;
   down = 0;
@@ -1089,6 +1092,11 @@ Socket::Server::Server(){
   sock = -1;
 }// Socket::Server base Constructor
 
+/// Create a new Server from existing socket.
+Socket::Server::Server(int fromSock){
+  sock = fromSock;
+}
+
 /// Create a new TCP Server. The socket is immediately bound and set to listen.
 /// A maximum of 100 connections will be accepted between accept() calls.
 /// Any further connections coming in will be dropped.
diff --git a/lib/socket.h b/lib/socket.h
index e378530c..7907fe86 100644
--- a/lib/socket.h
+++ b/lib/socket.h
@@ -41,6 +41,7 @@ namespace Socket{
   bool isLocal(const std::string & host);
   /// Returns true if given human-readable hostname is a local address.
   bool isLocalhost(const std::string & host);
+  bool checkTrueSocket(int sock);
 
   /// A buffer made out of std::string objects that can be efficiently read from and written to.
   class Buffer{
@@ -165,7 +166,8 @@ namespace Socket{
     bool IPv4bind(int port, std::string hostname, bool nonblock); ///< Attempt to bind an IPv4 socket
   public:
     Server();                                                                  ///< Create a new base Server.
-    Server(int port, std::string hostname = "0.0.0.0", bool nonblock = false); ///< Create a new TCP Server.
+    Server(int existingSock);                                                  ///< Create a new Server from existing socket.
+    Server(int port, std::string hostname, bool nonblock = false);             ///< Create a new TCP Server.
     Server(std::string adres, bool nonblock = false);                          ///< Create a new Unix Server.
     Connection accept(bool nonblock = false);                                  ///< Accept any waiting connections.
     void setBlocking(bool blocking);                                           ///< Set this socket to be blocking (true) or nonblocking (false).
diff --git a/src/controller/controller.cpp b/src/controller/controller.cpp
index c79c5fea..6db2e2c5 100644
--- a/src/controller/controller.cpp
+++ b/src/controller/controller.cpp
@@ -78,7 +78,7 @@ void statusMonitor(void *np){
     }
     Util::sleep(3000); // wait at least 3 seconds
   }
-  if (Controller::restarting){
+  if (Util::Config::is_restarting){
     Controller::prepareActiveConnectorsForReload();
   }else{
     Controller::prepareActiveConnectorsForShutdown();
@@ -371,7 +371,7 @@ int main_loop(int argc, char **argv){
     }else{
       shutdown_reason = "socket problem (API port closed)";
     }
-    if (Controller::restarting){shutdown_reason = "restart (on request)";}
+    if (Util::Config::is_restarting){shutdown_reason = "restart (on request)";}
     Controller::conf.is_active = false;
     Controller::Log("CONF", "Controller shutting down because of " + shutdown_reason);
   }
@@ -388,7 +388,7 @@ int main_loop(int argc, char **argv){
   // give everything some time to print messages
   Util::wait(100);
   std::cout << "Killed all processes, wrote config to disk. Exiting." << std::endl;
-  if (Controller::restarting){return 42;}
+  if (Util::Config::is_restarting){return 42;}
   // close stderr to make the stderr reading thread exit
   close(STDERR_FILENO);
   return 0;
@@ -396,7 +396,7 @@ int main_loop(int argc, char **argv){
 
 void handleUSR1(int signum, siginfo_t *sigInfo, void *ignore){
   Controller::Log("CONF", "USR1 received - restarting controller");
-  Controller::restarting = true;
+  Util::Config::is_restarting = true;
   raise(SIGINT); // trigger restart
 }
 
@@ -436,9 +436,9 @@ int main(int argc, char **argv){
     // wait for the process to exit
     int status;
     while (waitpid(pid, &status, 0) != pid && errno == EINTR){
-      if (Controller::restarting){
+      if (Util::Config::is_restarting){
         Controller::conf.is_active = true;
-        Controller::restarting = false;
+        Util::Config::is_restarting = false;
         kill(pid, SIGUSR1);
       }
       if (!Controller::conf.is_active){
diff --git a/src/controller/controller_connectors.cpp b/src/controller/controller_connectors.cpp
index b3762c15..dcfd6324 100644
--- a/src/controller/controller_connectors.cpp
+++ b/src/controller/controller_connectors.cpp
@@ -56,11 +56,13 @@ namespace Controller {
     IPC::sharedPage f("MstCnns", 4096, false, false);
     const Util::RelAccX A(f.mapped, false);
     if (A.isReady()){
+      INFO_MSG("Reloading existing connectors to complete rolling restart");
       for (uint32_t i = 0; i < A.getRCount(); ++i){
         char * p = A.getPointer("cmd", i);
         if (p != 0 && p[0] != 0){
           currentConnectors[p] = A.getInt("pid", i);
           Util::Procs::remember(A.getInt("pid", i));
+          kill(A.getInt("pid", i), SIGUSR1);
         }
       }
     }
diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp
index c703335a..1dc6273e 100644
--- a/src/controller/controller_statistics.cpp
+++ b/src/controller/controller_statistics.cpp
@@ -196,10 +196,10 @@ void Controller::SharedMemStats(void * config){
   }
   statPointer = 0;
   HIGH_MSG("Stopping stats thread");
-  if (Controller::restarting){
+  if (Util::Config::is_restarting){
     statServer.abandon();
   }
-  Controller::deinitState(Controller::restarting);
+  Controller::deinitState(Util::Config::is_restarting);
 }
 
 /// Gets a complete list of all streams currently in active state, with optional prefix matching
diff --git a/src/controller/controller_storage.cpp b/src/controller/controller_storage.cpp
index 477e7b02..eaf11e0f 100644
--- a/src/controller/controller_storage.cpp
+++ b/src/controller/controller_storage.cpp
@@ -20,7 +20,6 @@ namespace Controller {
   tthread::mutex logMutex;
   uint64_t logCounter = 0;
   bool configChanged = false;
-  bool restarting = false;
   bool isTerminal = false;
   bool isColorized = false;
   uint32_t maxLogsRecs = 0;
diff --git a/src/controller/controller_storage.h b/src/controller/controller_storage.h
index beb3de73..395a0317 100644
--- a/src/controller/controller_storage.h
+++ b/src/controller/controller_storage.h
@@ -11,7 +11,6 @@ namespace Controller {
   extern tthread::mutex logMutex;///< Mutex for log thread.
   extern tthread::mutex configMutex;///< Mutex for server config access.
   extern bool configChanged; ///< Bool that indicates config must be written to SHM.
-  extern bool restarting;///< Signals if the controller is shutting down (false) or restarting (true).
   extern bool isTerminal;///< True if connected to a terminal and not a log file.
   extern bool isColorized;///< True if we colorize the output
   extern uint64_t logCounter; ///<Count of logged messages since boot
diff --git a/src/output/mist_out.cpp b/src/output/mist_out.cpp
index a0432cce..14607a79 100644
--- a/src/output/mist_out.cpp
+++ b/src/output/mist_out.cpp
@@ -9,6 +9,12 @@ int spawnForked(Socket::Connection & S){
   return tmp.run();
 }
 
+void handleUSR1(int signum, siginfo_t *sigInfo, void *ignore){
+  HIGH_MSG("USR1 received - triggering rolling restart");
+  Util::Config::is_restarting = true;
+  Util::Config::is_active = false;
+}
+
 int main(int argc, char * argv[]) {
   Util::redirectLogsIfNeeded();
   Util::Config conf(argv[0]);
@@ -21,7 +27,19 @@ int main(int argc, char * argv[]) {
     }
     conf.activate();
     if (mistOut::listenMode()){
+      {
+        struct sigaction new_action;
+        new_action.sa_sigaction = handleUSR1;
+        sigemptyset(&new_action.sa_mask);
+        new_action.sa_flags = 0;
+        sigaction(SIGUSR1, &new_action, NULL);
+      }
       mistOut::listener(conf, spawnForked);
+      if (conf.is_restarting && Socket::checkTrueSocket(0)){
+        INFO_MSG("Reloading input while re-using server socket");
+        execvp(argv[0], argv);
+        FAIL_MSG("Error reloading: %s", strerror(errno));
+      }
     }else{
       Socket::Connection S(fileno(stdout),fileno(stdin) );
       mistOut tmp(S);