Made process library thread-safe.
This commit is contained in:
		
							parent
							
								
									0ea2a5e4b1
								
							
						
					
					
						commit
						5da3c24eae
					
				
					 2 changed files with 98 additions and 46 deletions
				
			
		
							
								
								
									
										136
									
								
								lib/procs.cpp
									
										
									
									
									
								
							
							
						
						
									
										136
									
								
								lib/procs.cpp
									
										
									
									
									
								
							|  | @ -24,12 +24,29 @@ | |||
| std::set<pid_t> Util::Procs::plist; | ||||
| std::set<int> Util::Procs::socketList; | ||||
| bool Util::Procs::handler_set = false; | ||||
| bool Util::Procs::thread_handler = false; | ||||
| tthread::mutex Util::Procs::plistMutex; | ||||
| tthread::thread * Util::Procs::reaper_thread = 0; | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| static bool childRunning(pid_t p) { | ||||
|   pid_t ret = waitpid(p, 0, WNOHANG); | ||||
| /// Local-only function. Attempts to reap child and returns current running status.
 | ||||
| bool Util::Procs::childRunning(pid_t p) { | ||||
|   int status; | ||||
|   pid_t ret = waitpid(p, &status, WNOHANG); | ||||
|   if (ret == p) { | ||||
|     tthread::lock_guard<tthread::mutex> guard(plistMutex); | ||||
|     int exitcode = -1; | ||||
|     if (WIFEXITED(status)) { | ||||
|       exitcode = WEXITSTATUS(status); | ||||
|     } else if (WIFSIGNALED(status)) { | ||||
|       exitcode = -WTERMSIG(status); | ||||
|     } | ||||
|     if (plist.count(ret)) { | ||||
|       HIGH_MSG("Process %d fully terminated with code %d", ret, exitcode); | ||||
|       plist.erase(ret); | ||||
|     } else { | ||||
|       HIGH_MSG("Child process %d exited with code %d", ret, exitcode); | ||||
|     } | ||||
|     return false; | ||||
|   } | ||||
|   if (ret < 0 && errno == EINTR) { | ||||
|  | @ -49,7 +66,17 @@ bool Util::Procs::isRunning(pid_t pid){ | |||
| /// all remaining children. Waits one more second for cleanup to finish, then exits.
 | ||||
| void Util::Procs::exit_handler() { | ||||
|   int waiting = 0; | ||||
|   std::set<pid_t> listcopy = plist; | ||||
|   std::set<pid_t> listcopy; | ||||
|   { | ||||
|     tthread::lock_guard<tthread::mutex> guard(plistMutex); | ||||
|     listcopy = plist; | ||||
|     thread_handler = false; | ||||
|   } | ||||
|   if (reaper_thread){ | ||||
|     reaper_thread->join(); | ||||
|     delete reaper_thread; | ||||
|     reaper_thread = 0; | ||||
|   } | ||||
|   std::set<pid_t>::iterator it; | ||||
|   if (listcopy.empty()) { | ||||
|     return; | ||||
|  | @ -72,7 +99,7 @@ void Util::Procs::exit_handler() { | |||
|     return; | ||||
|   } | ||||
| 
 | ||||
|   DEBUG_MSG(DLVL_DEVEL, "Sending SIGINT to remaining %d children", (int)listcopy.size()); | ||||
|   WARN_MSG("Sending SIGINT to remaining %d children", (int)listcopy.size()); | ||||
|   //send sigint to all remaining
 | ||||
|   if (!listcopy.empty()) { | ||||
|     for (it = listcopy.begin(); it != listcopy.end(); it++) { | ||||
|  | @ -81,7 +108,7 @@ void Util::Procs::exit_handler() { | |||
|     } | ||||
|   } | ||||
| 
 | ||||
|   DEBUG_MSG(DLVL_DEVEL, "Waiting up to 5 seconds for %d children to terminate.", (int)listcopy.size()); | ||||
|   INFO_MSG("Waiting up to 5 seconds for %d children to terminate.", (int)listcopy.size()); | ||||
|   waiting = 0; | ||||
|   //wait up to 5 seconds for applications to shut down
 | ||||
|   while (!listcopy.empty() && waiting <= 250) { | ||||
|  | @ -100,7 +127,7 @@ void Util::Procs::exit_handler() { | |||
|     return; | ||||
|   } | ||||
| 
 | ||||
|   DEBUG_MSG(DLVL_DEVEL, "Sending SIGKILL to remaining %d children", (int)listcopy.size()); | ||||
|   ERROR_MSG("Sending SIGKILL to remaining %d children", (int)listcopy.size()); | ||||
|   //send sigkill to all remaining
 | ||||
|   if (!listcopy.empty()) { | ||||
|     for (it = listcopy.begin(); it != listcopy.end(); it++) { | ||||
|  | @ -109,7 +136,7 @@ void Util::Procs::exit_handler() { | |||
|     } | ||||
|   } | ||||
| 
 | ||||
|   DEBUG_MSG(DLVL_DEVEL, "Waiting up to a second for %d children to terminate.", (int)listcopy.size()); | ||||
|   INFO_MSG("Waiting up to a second for %d children to terminate.", (int)listcopy.size()); | ||||
|   waiting = 0; | ||||
|   //wait up to 1 second for applications to shut down
 | ||||
|   while (!listcopy.empty() && waiting <= 50) { | ||||
|  | @ -127,14 +154,17 @@ void Util::Procs::exit_handler() { | |||
|   if (listcopy.empty()) { | ||||
|     return; | ||||
|   } | ||||
|   DEBUG_MSG(DLVL_DEVEL, "Giving up with %d children left.", (int)listcopy.size()); | ||||
| 
 | ||||
|   FAIL_MSG("Giving up with %d children left.", (int)listcopy.size()); | ||||
| } | ||||
| 
 | ||||
| /// Sets up exit and childsig handlers.
 | ||||
| /// Spawns grim_reaper. exit handler despawns grim_reaper
 | ||||
| /// Called by every Start* function.
 | ||||
| void Util::Procs::setHandler() { | ||||
|   tthread::lock_guard<tthread::mutex> guard(plistMutex); | ||||
|   if (!handler_set) { | ||||
|     thread_handler = true; | ||||
|     reaper_thread = new tthread::thread(grim_reaper, 0); | ||||
|     struct sigaction new_action; | ||||
|     new_action.sa_handler = childsig_handler; | ||||
|     sigemptyset(&new_action.sa_mask); | ||||
|  | @ -145,41 +175,48 @@ void Util::Procs::setHandler() { | |||
|   } | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| /// Used internally to capture child signals and update plist.
 | ||||
| void Util::Procs::childsig_handler(int signum) { | ||||
|   if (signum != SIGCHLD) { | ||||
|     return; | ||||
|   } | ||||
|   int status; | ||||
|   pid_t ret = -1; | ||||
|   while (ret != 0) { | ||||
|     ret = waitpid(-1, &status, WNOHANG); | ||||
|     if (ret <= 0) { //ignore, would block otherwise
 | ||||
|       if (ret == 0 || errno != EINTR) { | ||||
|         return; | ||||
| ///Thread that loops until thread_handler is false.
 | ||||
| ///Reaps available children and then sleeps for a second.
 | ||||
| ///Not done in signal handler so we can use a mutex to prevent race conditions.
 | ||||
| void Util::Procs::grim_reaper(void * n){ | ||||
|   VERYHIGH_MSG("Grim reaper start"); | ||||
|   while (thread_handler){ | ||||
|     { | ||||
|       tthread::lock_guard<tthread::mutex> guard(plistMutex); | ||||
|       int status; | ||||
|       pid_t ret = -1; | ||||
|       while (ret != 0) { | ||||
|         ret = waitpid(-1, &status, WNOHANG); | ||||
|         if (ret <= 0) { //ignore, would block otherwise
 | ||||
|           if (ret == 0 || errno != EINTR) { | ||||
|             break; | ||||
|           } | ||||
|           continue; | ||||
|         } | ||||
|         int exitcode; | ||||
|         if (WIFEXITED(status)) { | ||||
|           exitcode = WEXITSTATUS(status); | ||||
|         } else if (WIFSIGNALED(status)) { | ||||
|           exitcode = -WTERMSIG(status); | ||||
|         } else { // not possible
 | ||||
|           break; | ||||
|         } | ||||
|         if (plist.count(ret)) { | ||||
|           HIGH_MSG("Process %d fully terminated with code %d", ret, exitcode); | ||||
|           plist.erase(ret); | ||||
|         } else { | ||||
|           HIGH_MSG("Child process %d exited with code %d", ret, exitcode); | ||||
|         } | ||||
|       } | ||||
|       continue; | ||||
|     } | ||||
|     int exitcode; | ||||
|     if (WIFEXITED(status)) { | ||||
|       exitcode = WEXITSTATUS(status); | ||||
|     } else if (WIFSIGNALED(status)) { | ||||
|       exitcode = -WTERMSIG(status); | ||||
|     } else { // not possible
 | ||||
|       return; | ||||
|     } | ||||
| 
 | ||||
|     plist.erase(ret); | ||||
| #if DEBUG >= DLVL_HIGH | ||||
|     if (!isActive(pname)) { | ||||
|       DEBUG_MSG(DLVL_HIGH, "Process %d fully terminated", ret); | ||||
|     } else { | ||||
|       DEBUG_MSG(DLVL_HIGH, "Child process %d exited", ret); | ||||
|     } | ||||
| #endif | ||||
| 
 | ||||
|     Util::sleep(500); | ||||
|   } | ||||
|   VERYHIGH_MSG("Grim reaper stop"); | ||||
| } | ||||
| 
 | ||||
| /// Ignores everything. Separate thread handles waiting for children.
 | ||||
| void Util::Procs::childsig_handler(int signum) { | ||||
|   return; | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
|  | @ -188,7 +225,7 @@ std::string Util::Procs::getOutputOf(char * const * argv) { | |||
|   std::string ret; | ||||
|   int fin = 0, fout = -1, ferr = 0; | ||||
|   pid_t myProc = StartPiped(argv, &fin, &fout, &ferr); | ||||
|   while (isActive(myProc)) { | ||||
|   while (childRunning(myProc)) { | ||||
|     Util::sleep(100); | ||||
|   } | ||||
|   FILE * outFile = fdopen(fout, "r"); | ||||
|  | @ -349,7 +386,10 @@ pid_t Util::Procs::StartPiped(char * const * argv, int * fdin, int * fdout, int | |||
|     } | ||||
|     return 0; | ||||
|   } else { //parent
 | ||||
|     plist.insert(pid); | ||||
|     { | ||||
|       tthread::lock_guard<tthread::mutex> guard(plistMutex); | ||||
|       plist.insert(pid); | ||||
|     } | ||||
|     DEBUG_MSG(DLVL_HIGH, "Piped process %s started, PID %d", argv[0], pid); | ||||
|     if (devnull != -1) { | ||||
|       close(devnull); | ||||
|  | @ -384,7 +424,11 @@ void Util::Procs::Murder(pid_t name) { | |||
| 
 | ||||
| /// (Attempts to) stop all running child processes.
 | ||||
| void Util::Procs::StopAll() { | ||||
|   std::set<pid_t> listcopy = plist; | ||||
|   std::set<pid_t> listcopy; | ||||
|   { | ||||
|     tthread::lock_guard<tthread::mutex> guard(plistMutex); | ||||
|     listcopy = plist; | ||||
|   } | ||||
|   std::set<pid_t>::iterator it; | ||||
|   for (it = listcopy.begin(); it != listcopy.end(); it++) { | ||||
|     Stop(*it); | ||||
|  | @ -393,11 +437,13 @@ void Util::Procs::StopAll() { | |||
| 
 | ||||
| /// Returns the number of active child processes.
 | ||||
| int Util::Procs::Count() { | ||||
|   tthread::lock_guard<tthread::mutex> guard(plistMutex); | ||||
|   return plist.size(); | ||||
| } | ||||
| 
 | ||||
| /// Returns true if a process with this PID is currently active.
 | ||||
| bool Util::Procs::isActive(pid_t name) { | ||||
|   tthread::lock_guard<tthread::mutex> guard(plistMutex); | ||||
|   return (plist.count(name) == 1) && (kill(name, 0) == 0); | ||||
| } | ||||
| 
 | ||||
|  |  | |||
|  | @ -7,6 +7,7 @@ | |||
| #include <set> | ||||
| #include <vector> | ||||
| #include <deque> | ||||
| #include "tinythread.h" | ||||
| 
 | ||||
| /// Contains utility code, not directly related to streaming media
 | ||||
| namespace Util { | ||||
|  | @ -14,13 +15,18 @@ namespace Util { | |||
|   /// Deals with spawning, monitoring and stopping child processes
 | ||||
|   class Procs { | ||||
|     private: | ||||
|       static std::set<pid_t> plist; ///< Holds active processes
 | ||||
|       static bool childRunning(pid_t p); | ||||
|       static tthread::mutex plistMutex; | ||||
|       static tthread::thread * reaper_thread; | ||||
|       static std::set<pid_t> plist; ///< Holds active process list.
 | ||||
|       static bool handler_set; ///< If true, the sigchld handler has been setup.
 | ||||
|       static bool thread_handler;///< True while thread handler should be running.
 | ||||
|       static void childsig_handler(int signum); | ||||
|       static void exit_handler(); | ||||
|       static void runCmd(std::string & cmd); | ||||
|       static void setHandler(); | ||||
|       static char* const* dequeToArgv(std::deque<std::string> & argDeq); | ||||
|       static void grim_reaper(void * n); | ||||
|     public: | ||||
|       static std::string getOutputOf(char * const * argv); | ||||
|       static std::string getOutputOf(std::deque<std::string> & argDeq); | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue
	
	 Thulinma
						Thulinma