Package roshlaunch :: Module pmon
[frames] | no frames]

Source Code for Module roshlaunch.pmon

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id: pmon.py 10575 2010-07-31 22:13:41Z kwc $ 
 34   
 35  """ 
 36  Process monitoring implementation for roslaunch. 
 37  """ 
 38   
 39  from __future__ import with_statement 
 40   
 41  import os 
 42  import sys 
 43  import time 
 44  import traceback 
 45  import logging 
 46  import Queue 
 47  import signal 
 48  import atexit 
 49  from threading import Thread, RLock, Lock 
 50   
 51  import roslib 
 52  from roslaunch.core import printlog, printlog_bold, printerrlog, RLException 
 53   
 54  logger = logging.getLogger("roslaunch.pmon")           
 55   
class FatalProcessLaunch(RLException):
    """
    Raised when launching a process fails so badly that trying to
    relaunch it is unlikely to succeed.
    """
# start/shutdown ################################################

# every ProcessMonitor created by start_process_monitor(); pmon_shutdown()
# shuts these down and then empties the list
_pmons = []
# incremented per ProcessMonitor to give each thread a unique name
_pmon_counter = 0

def start_process_monitor():
    """
    Create, register and start a new L{ProcessMonitor} thread.

    @return: the started ProcessMonitor, or None if shutdown has
      already been initiated (see rl_signal()).
    @rtype: L{ProcessMonitor}
    """
    global _pmon_counter
    if _shutting_down:
        return None
    _pmon_counter += 1
    name = "ProcessMonitor-%s"%_pmon_counter
    logger.info("start_process_monitor: creating ProcessMonitor")
    process_monitor = ProcessMonitor(name)
    # prevent race condition with pmon_shutdown() being triggered
    # as we are starting a ProcessMonitor (i.e. user hits ctrl-C
    # during startup)
    with _shutdown_lock:
        # Re-check while holding the lock. rl_signal() sets _shutting_down
        # before pmon_shutdown() takes this lock, so a shutdown that ran
        # after the unlocked check above would otherwise never see (and
        # never shut down) this monitor.
        if _shutting_down:
            return None
        _pmons.append(process_monitor)
        process_monitor.start()
        logger.info("start_process_monitor: ProcessMonitor started")
    return process_monitor
88
def shutdown_process_monitor(process_monitor):
    """
    Ask a L{ProcessMonitor} thread to stop, then wait up to 20 seconds
    for it to exit.

    This function never raises, because it runs on shutdown-critical
    paths. Any exception is printed to stderr with its traceback and
    reported as a False return.

    @param process_monitor: process monitor to kill
    @type process_monitor: L{ProcessMonitor}
    @return: True if the monitor thread exited. False if
      process_monitor is None or already shut down, if the thread is
      still alive after the join timeout, or if an exception occurred.
    @rtype: bool
    """
    try:
        nothing_to_do = process_monitor is None or process_monitor.is_shutdown
        if nothing_to_do:
            return False

        # Logging is deliberately avoided until this critical section is
        # done, in case the logger is already being torn down.
        process_monitor.shutdown()
        process_monitor.join(20.0)

        if not process_monitor.isAlive():
            logger.debug("shutdown_process_monitor: ProcessMonitor shutdown succeeded")
            return True
        logger.error("shutdown_process_monitor: ProcessMonitor shutdown failed!")
        return False
    except Exception:
        message = "exception in shutdown_process_monitor: %s"%sys.exc_info()[1]
        sys.stderr.write(message + "\n")
        traceback.print_exc()
        return False
# serializes pmon_shutdown() against start_process_monitor()
_shutdown_lock = Lock()
def pmon_shutdown():
    """
    Shut down every ProcessMonitor in _pmons and then empty the list.
    Any exception is printed rather than propagated.

    NOTE(review): _shutdown_lock is a plain (non-reentrant) Lock.
    rl_signal() calls this function from a signal handler, which Python
    runs in the main thread. If that thread already holds the lock
    (e.g. inside start_process_monitor()), this would block forever.
    Confirm whether an RLock is needed.
    """
    with _shutdown_lock:
        # Deliberately broad: this runs from signal handlers and atexit,
        # where nothing should escape.
        try:
            if _pmons:
                for monitor in _pmons:
                    shutdown_process_monitor(monitor)
                del _pmons[:]
        except:
            print("exception in pmon_shutdown")
            traceback.print_exc()
# signal number -> handler that was installed before rl_signal replaced it
_signal_chain = {}
# set by rl_signal(); start_process_monitor() refuses to start new monitors
_shutting_down = False
def rl_signal(sig, stackframe):
    """
    Signal handler that shuts down all process monitors, then chains to
    the handler that was previously installed for sig (if any).

    Only the first invocation does anything; later calls return
    immediately.
    """
    global _shutting_down
    if _shutting_down:
        return #prevent infinite callbacks
    _shutting_down = True
    pmon_shutdown()

    prev_handler = _signal_chain.get(sig)
    if not prev_handler or prev_handler in (signal.SIG_IGN, signal.SIG_DFL, rl_signal):
        return
    try:
        prev_handler(sig, stackframe)
    except KeyboardInterrupt:
        pass #filter out generic keyboard interrupt handler
# True once _init_signal_handlers() has completed
_sig_initialized = False
def _init_signal_handlers():
    """
    Install rl_signal() for SIGTERM, SIGINT and SIGHUP and register
    pmon_shutdown() with atexit. Both steps are skipped when roslib
    reports an interactive session.

    Calls after the first successful one are no-ops. Because it uses
    signal.signal(), this must be called from the main thread.
    """
    global _sig_initialized
    if _sig_initialized:
        return
    if not roslib.is_interactive():
        for signum in (signal.SIGTERM, signal.SIGINT, signal.SIGHUP):
            # remember the previous handler so rl_signal() can chain to it
            _signal_chain[signum] = signal.signal(signum, rl_signal)
        atexit.register(pmon_shutdown)
    _sig_initialized = True
166 167 # ############################################################## 168
class Process(object):
    """
    Basic process representation for L{ProcessMonitor}. Must be subclassed
    to provide actual start()/stop() implementations.

    Constructor *must* be called from the Python Main thread in order
    for signal handlers to register properly.
    """

    def __init__(self, package, name, args, env, respawn=False, required=False):
        self.package = package
        self.name = name
        self.args = args
        self.env = env
        self.respawn = respawn
        self.required = required
        self.lock = Lock()
        # None until the exit code of the process is known
        self.exit_code = None
        # for keeping track of respawning
        self.spawn_count = 0

        # installs signal handlers via signal.signal(), which Python only
        # allows from the main thread (hence the constructor restriction)
        _init_signal_handlers()

    def __str__(self):
        return "Process<%s>"%(self.name)

    # NOTE: get_info() is going to have to be sufficient for
    # generating respawn requests, so we must be complete about it

    def get_info(self):
        """
        Get all data about this process in dictionary form
        @return: dictionary of all relevant process properties. The
          'exit_code' key is only present once the exit code is known.
        @rtype: dict { str: val }
        """
        info = {
            'spawn_count': self.spawn_count,
            'args': self.args,
            'env': self.env,
            'package': self.package,
            'name': self.name,
            'alive': self.is_alive(),
            'respawn': self.respawn,
            'required': self.required,
            }
        if self.exit_code is not None:
            info['exit_code'] = self.exit_code
        return info

    def start(self):
        """
        Base implementation only counts the spawn; subclasses launch
        the actual process.
        """
        self.spawn_count += 1

    def is_alive(self):
        return False

    def stop(self, errors=None):
        """
        Stop the process. Record any significant error messages in the errors parameter

        @param errors: error messages. stop() will record messages into this list.
          None (the default) means the caller is not collecting errors.
          The default is None rather than [] so that implementations
          never share one list between calls.
        @type errors: [str]
        """
        pass

    def get_exit_description(self):
        """
        @return: human-readable description of how the process exited
        @rtype: str
        """
        if self.exit_code is not None:
            if self.exit_code:
                return 'process has died [exit code %s]'%self.exit_code
            else:
                # try not to scare users about process exit
                return 'process has finished cleanly'
        else:
            return 'process has died'
242
class DeadProcess(Process):
    """
    Container class to maintain information about a process that has died. This
    container allows us to delete the actual Process but still maintain the metadata
    """
    def __init__(self, p):
        # forward required too; previously it was dropped, so
        # DeadProcess.required was always False
        super(DeadProcess, self).__init__(p.package, p.name, p.args, p.env, p.respawn, p.required)
        self.exit_code = p.exit_code
        self.lock = None
        self.spawn_count = p.spawn_count
        # snapshot of p.get_info() taken when this record is created
        self.info = p.get_info()
    def get_info(self):
        return self.info
    def start(self):
        raise Exception("cannot call start on a dead process!")
    def is_alive(self):
        return False
260
class ProcessListener(object):
    """
    Callback interface for L{ProcessMonitor}. Subclass it and override
    process_died() to be told when a monitored process dies.
    """

    def process_died(self, process_name, exit_code):
        """
        Called when a process dies while the process monitor is running
        normally. Processes that are forcibly killed as part of
        ProcessMonitor shutdown are not reported.

        @param process_name: name of process
        @type process_name: str
        @param exit_code: exit code of process, or None if ProcessMonitor
          could not determine one.
        @type exit_code: int
        """
        # default implementation ignores the notification
        return None
279
class ProcessMonitor(Thread):
    """
    Daemon thread that polls registered L{Process} instances.

    When a process is found dead, the monitor notifies the registered
    L{ProcessListener}s and then does one of three things:
      - respawns it, if respawn is set;
      - initiates shutdown, if required is set;
      - otherwise moves it to the dead list.

    On exit it kills every process that is still registered, with core
    processes killed last.
    """

    def __init__(self, name="ProcessMonitor"):
        Thread.__init__(self, name=name)
        self.procs = []
        self.plock = RLock()
        self.is_shutdown = False
        # set by _post_run() once all remaining processes have been killed
        self.done = False
        self.setDaemon(True)
        # signals whose handler must be re-installed from the main thread
        self.reacquire_signals = set()
        self.listeners = []
        # DeadProcess records of processes that died while monitored
        self.dead_list = []
        # #885: ensure core procs
        self.core_procs = []
        # #642: flag to prevent process monitor exiting prematurely
        self._registrations_complete = False

        logger.info("created process monitor %s"%self)

    def add_process_listener(self, l):
        """
        Listener for process events. MUST be called before
        ProcessMonitor is running. See ProcessListener class.
        @param l: listener instance
        @type l: L{ProcessListener}
        """
        self.listeners.append(l)

    def register(self, p):
        """
        Register process with L{ProcessMonitor}
        @param p: Process
        @type p: L{Process}
        @raise RLException: if process with same name is already registered,
          or if the process monitor has been shut down
        """
        logger.info("ProcessMonitor.register[%s]"%p.name)
        e = None
        with self.plock:
            if self.has_process(p.name):
                e = RLException("cannot add process with duplicate name '%s'"%p.name)
            elif self.is_shutdown:
                e = RLException("cannot add process [%s] after process monitor has been shut down"%p.name)
            else:
                self.procs.append(p)
        if e:
            logger.error("ProcessMonitor.register[%s] failed %s"%(p.name, e))
            raise e
        else:
            logger.info("ProcessMonitor.register[%s] complete"%p.name)

    def register_core_proc(self, p):
        """
        Register core process with ProcessMonitor. Core processes
        have special shutdown semantics. They are killed after all
        other processes, in reverse order in which they are added.
        @param p: Process
        @type p: L{Process}
        @raise RLException: if process with same name is already registered
        """
        self.register(p)
        self.core_procs.append(p)

    def registrations_complete(self):
        """
        Inform the process monitor that registrations are complete.
        After the registrations_complete flag is set, process monitor
        will exit if there are no processes left to monitor.
        """
        self._registrations_complete = True
        logger.info("registrations completed %s"%self)

    def unregister(self, p):
        """
        Remove p from the monitored processes.
        @raise ValueError: if p is not registered (raised by list.remove)
        """
        logger.info("ProcessMonitor.unregister[%s] starting"%p.name)
        with self.plock:
            self.procs.remove(p)
        logger.info("ProcessMonitor.unregister[%s] complete"%p.name)

    def has_process(self, name):
        """
        @return: True if process is still being monitored. If False, process
        has died or was never registered with process
        @rtype: bool
        """
        # plock is an RLock, so this is safe to call from register()
        with self.plock:
            return any(p.name == name for p in self.procs)

    def get_process(self, name):
        """
        @return: process registered under name, or None
        @rtype: L{Process}
        """
        with self.plock:
            for p in self.procs:
                if p.name == name:
                    return p
        return None

    def has_main_thread_jobs(self):
        """
        @return: True if ProcessMonitor has tasks that need to be run in the main thread
        @rtype: bool
        """
        return bool(self.reacquire_signals)

    def do_main_thread_jobs(self):
        """
        Execute tasks that need to be run in the main thread. Must be
        called from main thread.
        """
        #not entirely threadsafe
        sigs = [s for s in self.reacquire_signals]
        for s in sigs:
            _signal_chain[s] = signal.signal(s, rl_signal)
            self.reacquire_signals.remove(s)

    def kill_process(self, name):
        """
        Kill process that matches name. NOTE: a killed process will
        continue to show up as active until the process monitor thread
        has caught that it has died.
        @param name: Process name
        @type name: str
        @return: True if a process named name was removed from
        process monitor. A process is considered killed if its stop()
        method was called.
        @rtype: bool
        @raise RLException: if name is not a string
        """
        if not isinstance(name, basestring):
            raise RLException("kill_process takes in a process name but was given: %s"%name)
        logger.debug("ProcessMonitor.kill_process[%s]"%name)
        printlog("[%s] kill requested"%name)
        with self.plock:
            p = self.get_process(name)
            if p:
                try:
                    # no need to accumulate errors, so pass in []
                    p.stop([])
                except:
                    logger.error(traceback.format_exc())
                return True
            else:
                return False

    def shutdown(self):
        """
        Shutdown the process monitor thread
        """
        logger.info("ProcessMonitor.shutdown %s"%self)
        self.is_shutdown = True

    def get_active_names(self):
        """
        @return [str]: list of active process names
        """
        with self.plock:
            retval = [p.name for p in self.procs]
        return retval

    def get_process_names_with_spawn_count(self):
        """
        @return: Two lists. The first lists active process names along
        with the number of times each process has been spawned. The
        second lists dead process names and their spawn count.
        @rtype: [[(str, int),], [(str,int),]]
        """
        with self.plock:
            actives = [(p.name, p.spawn_count) for p in self.procs]
            deads = [(p.name, p.spawn_count) for p in self.dead_list]
            retval = [actives, deads]
        return retval

    def mainthread_spin_once(self):
        """
        run() occurs in a separate thread and cannot do certain signal-related
        work. The main thread of the application must call mainthread_spin()
        or mainthread_spin_once() in order to perform these jobs.
        @return: True while the process monitor is not done, False afterwards
        @rtype: bool
        """
        if not self.done:
            if self.has_main_thread_jobs():
                self.do_main_thread_jobs()
            return True
        else:
            return False

    def mainthread_spin(self):
        """
        run() occurs in a separate thread and cannot do certain signal-related
        work. The main thread of the application must call mainthread_spin()
        or mainthread_spin_once() in order to perform these jobs. mainthread_spin()
        blocks until the process monitor is complete.
        """
        while not self.done:
            time.sleep(0.1)
            if self.has_main_thread_jobs():
                self.do_main_thread_jobs()

    def run(self):
        """
        thread routine of the process monitor. NOTE: you must still
        call mainthread_spin or mainthread_spin_once() from the main
        thread in order to pick up main thread work from the process
        monitor.
        """
        try:
            #don't let exceptions bomb thread, interferes with exit
            try:
                self._run()
            except:
                logger.error(traceback.format_exc())
                traceback.print_exc()
        finally:
            # always kill remaining processes, even if _run() failed
            self._post_run()

    def _run(self):
        """
        Internal run loop of ProcessMonitor
        """
        plock = self.plock
        dead = []
        respawn = []
        while not self.is_shutdown:
            with plock: #copy self.procs
                procs = self.procs[:]
            if self.is_shutdown:
                break

            # check current signal handlers to see if children have stolen them away
            # TODO: this code may not be necessary anymore (have to test)
            for s in [signal.SIGTERM, signal.SIGINT, signal.SIGHUP]:
                if signal.getsignal(s) != rl_signal:
                    self.reacquire_signals.add(s)

            for p in procs:
                try:
                    if not p.is_alive():
                        logger.debug("Process[%s] has died, respawn=%s, required=%s, exit_code=%s",p.name, p.respawn, p.required, p.exit_code)
                        exit_code_str = p.get_exit_description()
                        if p.respawn:
                            printlog_bold("[%s] %s\nrespawning..."%(p.name, exit_code_str))
                            respawn.append(p)
                        elif p.required:
                            # newline after the '=' rule so the banner does not
                            # run into the message
                            printerrlog('='*80+"\nREQUIRED process [%s] has died!\n%s\nInitiating shutdown!\n"%(p.name, exit_code_str)+'='*80)
                            self.is_shutdown = True
                        else:
                            if p.exit_code:
                                printerrlog("[%s] %s"%(p.name, exit_code_str))
                            else:
                                printlog_bold("[%s] %s"%(p.name, exit_code_str))
                            dead.append(p)

                        ## no need for lock as we require listeners be
                        ## added before process monitor is launched
                        for l in self.listeners:
                            l.process_died(p.name, p.exit_code)

                except Exception:
                    traceback.print_exc()
                    #don't respawn as this is an internal error
                    dead.append(p)
                if self.is_shutdown:
                    break #stop polling
            for d in dead:
                try:
                    self.unregister(d)
                    # stop process, don't accumulate errors
                    d.stop([])

                    # save process data to dead list
                    with plock:
                        self.dead_list.append(DeadProcess(d))
                except:
                    logger.error(traceback.format_exc())

            # dead check is to make sure that ProcessMonitor at least
            # waits until its had at least one process before exiting
            if self._registrations_complete and dead and not self.procs and not respawn:
                printlog("all processes on machine have died, roslaunch will exit")
                self.is_shutdown = True
            del dead[:]
            for r in respawn:
                try:
                    if self.is_shutdown:
                        break
                    printlog("[%s] restarting process"%r.name)
                    # stop process, don't accumulate errors
                    r.stop([])
                    r.start()
                except:
                    traceback.print_exc()
                    logger.error("Restart failed %s",traceback.format_exc())
            del respawn[:]
            time.sleep(0.1) #yield thread

    def _post_run(self):
        """
        Kill every process that is still registered.

        Non-core processes are killed in parallel by _ProcessKiller
        workers. Core processes are then killed serially, in reverse
        registration order. Finally the monitor's data structures are
        cleared (dead_list is kept) and done is set.
        """
        logger.info("ProcessMonitor._post_run %s"%self)
        # this is already true entering, but go ahead and make sure
        self.is_shutdown = True

        # killall processes on run exit
        q = Queue.Queue()
        with self.plock:
            # make copy of core_procs for threadsafe usage
            core_procs = self.core_procs[:]
            logger.info("ProcessMonitor._post_run %s: remaining procs are %s"%(self, self.procs))

            # enqueue all non-core procs in reverse order for parallel kill
            # #526/885: ignore core procs
            for p in reversed(self.procs):
                if p not in core_procs:
                    q.put(p)

        # use 10 workers
        killers = []
        for i in range(10):
            t = _ProcessKiller(q, i)
            killers.append(t)
            t.start()

        # wait for workers to finish
        q.join()
        shutdown_errors = []

        # accumulate all the shutdown errors
        for t in killers:
            shutdown_errors.extend(t.errors)
        del killers[:]

        # #526/885: kill core procs last
        # we don't want to parallelize this as the master has to be last
        for p in reversed(core_procs):
            _kill_process(p, shutdown_errors)

        # delete everything except dead_list
        logger.info("ProcessMonitor exit: cleaning up data structures and signals")
        with self.plock:
            del core_procs[:]
            del self.procs[:]
            del self.core_procs[:]

        reacquire_signals = self.reacquire_signals
        if reacquire_signals:
            reacquire_signals.clear()
        logger.info("ProcessMonitor exit: pmon has shutdown")
        self.done = True

        if shutdown_errors:
            printerrlog("Shutdown errors:\n"+'\n'.join([" * %s"%e for e in shutdown_errors]))
628
def _kill_process(p, errors):
    """
    Stop Process p during ProcessMonitor exit, announcing it on screen
    and in the log.

    Any exception from p.stop() is printed and logged, never raised, so
    shutdown can continue with the remaining processes.

    @param p: process to kill
    @type p: Process
    @param errors: list that p.stop() appends its error messages to
    @type errors: [str]
    """
    try:
        proc_name = p.name
        logger.info("ProcessMonitor exit: killing %s", proc_name)
        printlog("[%s] killing on exit"%proc_name)
        # errors are collected per process so they can all be reported at the end
        p.stop(errors)
    except:
        trace = traceback.format_exc()
        traceback.print_exc()
        logger.error(trace)
646
class _ProcessKiller(Thread):
    """
    Worker thread used by ProcessMonitor._post_run(). It drains a
    shared queue of processes, killing each with _kill_process() and
    collecting the reported errors.
    """

    def __init__(self, q, i):
        Thread.__init__(self, name="ProcessKiller-%s"%i)
        self.q = q
        # error messages from every process this worker stopped
        self.errors = []

    def run(self):
        q = self.q
        while not q.empty():
            try:
                p = q.get(False)
            except Queue.Empty:
                # another worker took the last item between empty() and get()
                continue
            try:
                _kill_process(p, self.errors)
            except Exception:
                # _kill_process() already swallows failures of p.stop(). We
                # only get here if its own reporting failed (e.g. stderr
                # closed at interpreter exit). Record the error and keep
                # draining so the remaining processes still get killed.
                self.errors.append("error while killing process: %s"%sys.exc_info()[1])
            finally:
                # always mark the item done, otherwise q.join() in
                # ProcessMonitor._post_run() would block forever
                q.task_done()
663