Package rosunit :: Module pmon
[frames] | [no frames]

Source Code for Module rosunit.pmon

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  """ 
 36  Process monitor 
 37  """ 
 38   
 39  from __future__ import with_statement 
 40   
 41  import os 
 42  import sys 
 43  import time 
 44  import traceback 
 45  try: 
 46      from queue import Empty, Queue 
 47  except ImportError: 
 48      from Queue import Empty, Queue 
 49  import atexit 
 50  from threading import Thread, RLock, Lock 
 51   
 52  from .core import printlog, printlog_bold, printerrlog 
 53   
class PmonException(Exception):
    """
    Base class for errors raised by the process monitor module.
    """


class FatalProcessLaunch(PmonException):
    """
    Exception to indicate that a process launch has failed in a fatal
    manner (i.e. relaunch is unlikely to succeed)
    """
# start/shutdown ################################################

# every ProcessMonitor created by start_process_monitor(); pmon_shutdown()
# shuts these down and empties the list
_pmons = []
# incremented per monitor; only used to build a unique thread name
_pmon_counter = 0
# when True, start_process_monitor() refuses to create new monitors
# (NOTE(review): nothing visible in this module ever sets it to True)
_shutting_down = False


def start_process_monitor():
    """
    Create, register and start a new L{ProcessMonitor} thread.

    @return: the started process monitor, or None if C{_shutting_down}
        is set
    @rtype: L{ProcessMonitor}
    """
    global _pmon_counter
    if not _shutting_down:
        _pmon_counter += 1
        monitor = ProcessMonitor("ProcessMonitor-%s" % _pmon_counter)
        # Hold the shutdown lock while registering and starting, so that a
        # concurrent pmon_shutdown() (i.e. user hits ctrl-C during startup)
        # cannot race with a half-registered monitor.
        with _shutdown_lock:
            _pmons.append(monitor)
            monitor.start()
        return monitor
    return None
def shutdown_process_monitor(process_monitor):
    """
    Ask a process monitor to shut down and wait (up to 20 seconds) for its
    thread to exit.

    @param process_monitor: process monitor to kill
    @type process_monitor: L{ProcessMonitor}
    @return: True if process_monitor was successfully
    shutdown. False if it could not be shutdown cleanly, if it was
    already shut down, or if there is a problem with the process_monitor
    parameter. shutdown_process_monitor() does not throw any exceptions
    as this is shutdown-critical code.
    @rtype: bool
    """
    try:
        if process_monitor is None or process_monitor.is_shutdown:
            return False

        process_monitor.shutdown()
        process_monitor.join(20.0)
        # Thread.isAlive() was removed in Python 3.9 (is_alive() exists since
        # Python 2.6). The old call raised AttributeError, which the handler
        # below swallowed, so this function always reported failure there.
        return not process_monitor.is_alive()
    except Exception:
        # deliberately best-effort: never propagate during shutdown
        return False
# serializes pmon_shutdown() against start_process_monitor() registering
# and starting a new monitor
_shutdown_lock = Lock()


def pmon_shutdown():
    """
    Shut down every process monitor recorded in C{_pmons}, then empty the
    list. Registered as an atexit handler right after this definition.
    """
    global _pmons
    with _shutdown_lock:
        if _pmons:
            for monitor in _pmons:
                shutdown_process_monitor(monitor)
            del _pmons[:]


atexit.register(pmon_shutdown)

# ##############################################################
class Process(object):
    """
    Basic process representation for L{ProcessMonitor}. Must be subclassed
    to provide actual start()/stop() implementations.
    """

    def __init__(self, package, name, args, env, respawn=False, required=False):
        """
        @param package: package the process belongs to (stored as given)
        @param name: name the process is registered under; L{ProcessMonitor}
            rejects duplicate names
        @type name: str
        @param args: process arguments (stored as given)
        @param env: process environment (stored as given)
        @param respawn: if True, L{ProcessMonitor} restarts the process
            when it dies
        @type respawn: bool
        @param required: if True (and respawn is False), L{ProcessMonitor}
            initiates shutdown when the process dies
        @type required: bool
        """
        self.package = package
        self.name = name
        self.args = args
        self.env = env
        self.respawn = respawn
        self.required = required
        self.lock = Lock()
        # None means no exit code is known (yet)
        self.exit_code = None
        # for keeping track of respawning: incremented by every start()
        self.spawn_count = 0

    def __str__(self):
        return "Process<%s>"%(self.name)

    # NOTE: get_info() is going to have to be sufficient for
    # generating respawn requests, so we must be complete about it

    def get_info(self):
        """
        Get all data about this process in dictionary form
        @return: dictionary of all relevant process properties. The
            'exit_code' key is only present when an exit code is known.
        @rtype: dict { str: val }
        """
        info = {
            'spawn_count': self.spawn_count,
            'args': self.args,
            'env': self.env,
            'package': self.package,
            'name': self.name,
            'alive': self.is_alive(),
            'respawn': self.respawn,
            'required': self.required,
            }
        if self.exit_code is not None:
            info['exit_code'] = self.exit_code
        return info

    def start(self):
        """
        Record a (re)spawn. The base class only increments spawn_count;
        subclasses provide the actual launch.
        """
        self.spawn_count += 1

    def is_alive(self):
        """
        @return: whether the process is running. The base class always
            returns False.
        @rtype: bool
        """
        return False

    def stop(self, errors=None):
        """
        Stop the process. Record any significant error messages in the errors parameter

        @param errors: error messages. stop() will record messages into this list.
        @type errors: [str]
        """
        # The default is None rather than a shared mutable [] literal; the
        # base implementation does nothing.
        pass

    def get_exit_description(self):
        """
        @return: human-readable description of how the process ended:
            includes the exit code if it is non-zero, reports a clean
            finish if it is zero, or a generic message if no code is known
        @rtype: str
        """
        if self.exit_code is not None:
            if self.exit_code:
                return 'process has died [exit code %s]'%self.exit_code
            else:
                # try not to scare users about process exit
                return 'process has finished cleanly'
        else:
            return 'process has died'
189
class DeadProcess(Process):
    """
    Container class to maintain information about a process that has died. This
    container allows us to delete the actual Process but still maintain the metadata
    """

    def __init__(self, p):
        """
        @param p: the process that has died
        @type p: L{Process}
        """
        # Pass 'required' through as well. It used to be dropped, so
        # DeadProcess.required was always False even for required processes.
        super(DeadProcess, self).__init__(p.package, p.name, p.args, p.env,
                                          p.respawn, p.required)
        self.exit_code = p.exit_code
        # metadata-only container: no lock is kept
        self.lock = None
        self.spawn_count = p.spawn_count
        # snapshot of p's info taken at construction time
        self.info = p.get_info()

    def get_info(self):
        """
        @return: the info snapshot of the original process
        @rtype: dict
        """
        return self.info

    def start(self):
        """
        @raise Exception: always; a dead process cannot be started
        """
        raise Exception("cannot call start on a dead process!")

    def is_alive(self):
        """
        @return: always False
        @rtype: bool
        """
        return False
207
class ProcessListener(object):
    """
    Listener class for L{ProcessMonitor}. The default callback does
    nothing; override process_died() to react to process deaths.
    """

    def process_died(self, process_name, exit_code):
        """
        Called by the process monitor when a monitored process has died.
        Only deaths observed during normal process monitor execution are
        reported; processes forcibly killed while the ProcessMonitor shuts
        down are not.

        @param process_name: name of process
        @type process_name: str
        @param exit_code: exit code of process, or None when the
            ProcessMonitor was unable to determine one
        @type exit_code: int
        """
        return None
226
class ProcessMonitor(Thread):
    """
    Thread that polls registered L{Process} objects about every 0.1s.

    When a process dies, the monitor respawns it if its respawn flag is
    set. It initiates shutdown if the process is required. Otherwise it
    moves the process to dead_list. On exit, it kills all remaining
    processes; core processes are killed last.
    """

    def __init__(self, name="ProcessMonitor"):
        """
        @param name: thread name
        @type name: str
        """
        Thread.__init__(self, name=name)
        self.procs = []
        self.plock = RLock()
        self.is_shutdown = False
        self.done = False
        # Thread.setDaemon() is deprecated since Python 3.10; assigning the
        # daemon attribute is equivalent (and works on Python 2.6+).
        self.daemon = True
        self.listeners = []
        self.dead_list = []
        # #885: ensure core procs
        self.core_procs = []
        # #642: flag to prevent process monitor exiting prematurely
        self._registrations_complete = False

    def add_process_listener(self, l):
        """
        Listener for process events. MUST be called before
        ProcessMonitor is running. See ProcessListener class.
        @param l: listener instance
        @type l: L{ProcessListener}
        """
        self.listeners.append(l)

    def register(self, p):
        """
        Register process with L{ProcessMonitor}
        @param p: Process
        @type p: L{Process}
        @raise PmonException: if process with same name is already
            registered, or if the monitor has been shut down
        """
        e = None
        with self.plock:
            if self.has_process(p.name):
                e = PmonException("cannot add process with duplicate name '%s'"%p.name)
            elif self.is_shutdown:
                e = PmonException("cannot add process [%s] after process monitor has been shut down"%p.name)
            else:
                self.procs.append(p)
        # raise outside the lock
        if e:
            raise e

    def register_core_proc(self, p):
        """
        Register core process with ProcessMonitor. Core processes
        have special shutdown semantics. They are killed after all
        other processes, in reverse order in which they are added.
        @param p: Process
        @type p: L{Process}
        @raise PmonException: if process with same name is already registered
        """
        self.register(p)
        self.core_procs.append(p)

    def registrations_complete(self):
        """
        Inform the process monitor that registrations are complete.
        After the registrations_complete flag is set, process monitor
        will exit if there are no processes left to monitor.
        """
        self._registrations_complete = True

    def unregister(self, p):
        """
        Remove p from the monitored processes.
        @raise ValueError: if p is not registered
        """
        with self.plock:
            self.procs.remove(p)

    def has_process(self, name):
        """
        @return: True if process is still being monitored. If False, process
        has died or was never registered with process
        @rtype: bool
        """
        # plock is re-entrant, so this is safe when called from register()
        with self.plock:
            return any(p.name == name for p in self.procs)

    def get_process(self, name):
        """
        @return: process registered under \a name, or None
        @rtype: L{Process}
        """
        with self.plock:
            for p in self.procs:
                if p.name == name:
                    return p
        return None

    def kill_process(self, name):
        """
        Kill process that matches name. NOTE: a killed process will
        continue to show up as active until the process monitor thread
        has caught that it has died.
        @param name: Process name
        @type name: str
        @return: True if a process named name was removed from
        process monitor. A process is considered killed if its stop()
        method was called.
        @rtype: bool
        @raise PmonException: if name is not a string
        """
        try:
            string_types = basestring  # Python 2
        except NameError:
            # Python 3 has no basestring; the old code raised NameError here
            string_types = str
        if not isinstance(name, string_types):
            raise PmonException("kill_process takes in a process name but was given: %s"%name)
        printlog("[%s] kill requested"%name)
        with self.plock:
            p = self.get_process(name)
            if p:
                try:
                    # no need to accumulate errors, so pass in []
                    p.stop([])
                except Exception as e:
                    printerrlog("Exception: %s"%(str(e)))
                return True
            else:
                return False

    def shutdown(self):
        """
        Shutdown the process monitor thread
        """
        self.is_shutdown = True

    def get_active_names(self):
        """
        @return [str]: list of active process names
        """
        with self.plock:
            retval = [p.name for p in self.procs]
        return retval

    def get_process_names_with_spawn_count(self):
        """
        @return: Two lists, where first
        list of active process names along with the number of times
        that process has been spawned. Second list contains dead process names
        and their spawn count.
        @rtype: [[(str, int),], [(str,int),]]
        """
        with self.plock:
            actives = [(p.name, p.spawn_count) for p in self.procs]
            deads = [(p.name, p.spawn_count) for p in self.dead_list]
            retval = [actives, deads]
        return retval

    def run(self):
        """
        thread routine of the process monitor.
        """
        try:
            #don't let exceptions bomb thread, interferes with exit
            try:
                self._run()
            except:
                traceback.print_exc()
        finally:
            # always kill remaining processes, even if _run() failed
            self._post_run()

    def _run(self):
        """
        Internal run loop of ProcessMonitor
        """
        plock = self.plock
        dead = []
        respawn = []
        while not self.is_shutdown:
            with plock: #copy self.procs
                procs = self.procs[:]
            if self.is_shutdown:
                break

            for p in procs:
                try:
                    if not p.is_alive():
                        exit_code_str = p.get_exit_description()
                        if p.respawn:
                            printlog_bold("[%s] %s\nrespawning..."%(p.name, exit_code_str))
                            respawn.append(p)
                        elif p.required:
                            printerrlog('='*80+"REQUIRED process [%s] has died!\n%s\nInitiating shutdown!\n"%(p.name, exit_code_str)+'='*80)
                            self.is_shutdown = True
                        else:
                            if p.exit_code:
                                printerrlog("[%s] %s"%(p.name, exit_code_str))
                            else:
                                printlog_bold("[%s] %s"%(p.name, exit_code_str))
                            dead.append(p)

                        ## no need for lock as we require listeners be
                        ## added before process monitor is launched
                        for l in self.listeners:
                            l.process_died(p.name, p.exit_code)

                except Exception:
                    traceback.print_exc()
                    #don't respawn as this is an internal error
                    dead.append(p)
                if self.is_shutdown:
                    break #stop polling
            for d in dead:
                try:
                    self.unregister(d)
                    # stop process, don't accumulate errors
                    d.stop([])

                    # save process data to dead list
                    with plock:
                        self.dead_list.append(DeadProcess(d))
                except Exception as e:
                    printerrlog("Exception: %s"%(str(e)))

            # dead check is to make sure that ProcessMonitor at least
            # waits until its had at least one process before exiting
            if self._registrations_complete and dead and not self.procs and not respawn:
                printlog("all processes on machine have died, roslaunch will exit")
                self.is_shutdown = True
            del dead[:]
            for r in respawn:
                try:
                    if self.is_shutdown:
                        break
                    printlog("[%s] restarting process"%r.name)
                    # stop process, don't accumulate errors
                    r.stop([])
                    r.start()
                except:
                    traceback.print_exc()
            del respawn[:]
            time.sleep(0.1) #yield thread

    def _post_run(self):
        """
        Kill every remaining process when the monitor exits.

        Non-core processes are killed in parallel by 10 _ProcessKiller
        threads. Core processes are then killed sequentially, in reverse
        registration order. Accumulated stop() errors are printed at the end.
        """
        # this is already true entering, but go ahead and make sure
        self.is_shutdown = True

        q = Queue()

        with self.plock:
            # make copy of core_procs for threadsafe usage
            core_procs = self.core_procs[:]

            # enqueue all non-core procs in reverse order for parallel kill
            # #526/885: ignore core procs
            for p in reversed(self.procs):
                if p not in core_procs:
                    q.put(p)

        # use 10 workers
        killers = []
        for i in range(10):
            t = _ProcessKiller(q, i)
            killers.append(t)
            t.start()

        # wait for workers to finish
        q.join()
        shutdown_errors = []

        # accumulate all the shutdown errors
        for t in killers:
            shutdown_errors.extend(t.errors)
        del killers[:]

        # #526/885: kill core procs last
        # we don't want to parallelize this as the master has to be last
        for p in reversed(core_procs):
            _kill_process(p, shutdown_errors)

        # delete everything except dead_list
        with self.plock:
            del core_procs[:]
            del self.procs[:]
            del self.core_procs[:]

        self.done = True

        if shutdown_errors:
            printerrlog("Shutdown errors:\n"+'\n'.join([" * %s"%e for e in shutdown_errors]))
501
def _kill_process(p, errors):
    """
    Stop process C{p} during monitor shutdown, logging via printlog and
    printerrlog.

    Whatever C{p.stop()} records is appended to C{errors}. Any exception
    raised while logging or stopping is printed with printerrlog instead
    of being propagated.

    @param p: process to kill
    @type p: Process
    @param errors: list of error messages from killed process
    @type errors: [str]
    """
    try:
        announcement = "[%s] killing on exit" % p.name
        printlog(announcement)
        # errors are accumulated so the caller can print them all at the end
        p.stop(errors)
    except Exception as exc:
        printerrlog("Exception: %s" % str(exc))
517
class _ProcessKiller(Thread):
    """
    Worker thread that drains a queue of processes, killing each one with
    _kill_process() and collecting reported errors in self.errors.
    """

    def __init__(self, q, i):
        """
        @param q: queue of processes to kill
        @type q: Queue
        @param i: worker index, used only in the thread name
        @type i: int
        """
        Thread.__init__(self, name="ProcessKiller-%s"%i)
        self.q = q
        self.errors = []

    def run(self):
        q = self.q
        while not q.empty():
            try:
                p = q.get(False)
            except Empty:
                # another killer took the last item between empty() and get()
                continue
            try:
                _kill_process(p, self.errors)
            finally:
                # Always mark the item done. Otherwise an exception escaping
                # _kill_process would leave the caller's q.join() blocked
                # forever.
                q.task_done()
534