Package roslaunch :: Module pmon
[frames | no frames]

Source Code for Module roslaunch.pmon

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  """ 
 36  Process monitoring implementation for roslaunch. 
 37  """ 
 38   
 39  from __future__ import with_statement 
 40   
 41  import os 
 42  import sys 
 43  import time 
 44  import traceback 
 45  import logging 
 46  import Queue 
 47  import signal 
 48  import atexit 
 49  from threading import Thread, RLock, Lock 
 50   
 51  import roslib 
 52  from roslaunch.core import printlog, printlog_bold, printerrlog, RLException 
 53   
 54  logger = logging.getLogger("roslaunch.pmon")           
 55   
class FatalProcessLaunch(RLException):
    """
    Raised when launching a process fails in a fatal manner, i.e. a
    relaunch of the same process is unlikely to succeed.
    """

# start/shutdown ################################################

# number of ProcessMonitor instances requested so far; used by
# start_process_monitor() to build thread names ("ProcessMonitor-<n>")
_pmon_counter = 0
# every ProcessMonitor handed out by start_process_monitor() that has
# not yet been cleared by pmon_shutdown()
_pmons = []
def start_process_monitor():
    """
    Create and start a new L{ProcessMonitor} thread and record it so
    that pmon_shutdown() can stop it later.

    @return: the started process monitor, or None if shutdown has
    already been initiated
    @rtype: L{ProcessMonitor}
    """
    global _pmon_counter
    if _shutting_down:
        return None
    _pmon_counter += 1
    name = "ProcessMonitor-%s"%_pmon_counter
    logger.info("start_process_monitor: creating ProcessMonitor")
    process_monitor = ProcessMonitor(name)
    # prevent race condition with pmon_shutdown() being triggered
    # as we are starting a ProcessMonitor (i.e. user hits ctrl-C
    # during startup)
    with _shutdown_lock:
        # shutdown may have been initiated between the check at the top
        # and the moment the lock was obtained; a monitor started now
        # would never be shut down by the pass that already ran
        if _shutting_down:
            return None
        _pmons.append(process_monitor)
        process_monitor.start()
        logger.info("start_process_monitor: ProcessMonitor started")

    return process_monitor
88
def shutdown_process_monitor(process_monitor):
    """
    Stop a process monitor and wait up to 20 seconds for its thread
    to exit. This is shutdown-critical code, so it does not throw any
    exceptions: failures are printed to stderr and reported as False.

    @param process_monitor: process monitor to kill
    @type  process_monitor: L{ProcessMonitor}
    @return: True if process_monitor was successfully
    shutdown. False if it could not be shutdown cleanly or if there is
    a problem with the process_monitor parameter.
    @rtype: bool
    """
    try:
        if process_monitor is None:
            return False
        if process_monitor.is_shutdown:
            return False

        # logger is deliberately left alone until the critical
        # shutdown()/join() section is done, just in case logger is
        # already being torn down
        process_monitor.shutdown()
        process_monitor.join(20.0)

        stopped = not process_monitor.isAlive()
        if stopped:
            logger.debug("shutdown_process_monitor: ProcessMonitor shutdown succeeded")
        else:
            logger.error("shutdown_process_monitor: ProcessMonitor shutdown failed!")
        return stopped
    except Exception:
        err = sys.exc_info()[1]
        sys.stderr.write("exception in shutdown_process_monitor: %s\n"%err)
        traceback.print_exc()
        return False

# Serializes ProcessMonitor startup against pmon_shutdown(). The lock
# is reentrant because pmon_shutdown() is also called from the
# rl_signal() signal handler, and Python runs signal handlers in the
# main thread: if that thread is inside start_process_monitor()'s
# critical section when the signal arrives, a non-reentrant Lock would
# block on itself forever.
_shutdown_lock = RLock()
def pmon_shutdown():
    """
    Shut down every process monitor recorded by
    start_process_monitor() and clear the record. Errors are printed
    instead of raised, as this runs from signal and atexit handlers.
    """
    global _pmons
    # 'with' only releases a lock that was actually obtained; the
    # acquire is no longer inside the try block that releases it
    with _shutdown_lock:
        try:
            if not _pmons:
                return
            for p in _pmons:
                shutdown_process_monitor(p)
            del _pmons[:]
        except:
            # deliberately catch-all: nothing may escape shutdown code
            sys.stdout.write("exception in pmon_shutdown\n")
            traceback.print_exc()

# set by rl_signal() on the first signal it handles; later signals are
# ignored and start_process_monitor() refuses to create new monitors
_shutting_down = False
# signal number -> handler that was installed before rl_signal took
# over, so that rl_signal() can still chain to it
_signal_chain = {}
def rl_signal(sig, stackframe):
    """
    Signal handler installed by roslaunch: shuts down all process
    monitors once, then forwards the signal to the handler that was
    registered before this one (if it is a real callable handler).

    @param sig: signal number that was delivered
    @param stackframe: frame object passed on to the chained handler
    """
    global _shutting_down
    if _shutting_down:
        return #prevent infinite callbacks
    _shutting_down = True
    pmon_shutdown()

    chained = _signal_chain.get(sig, None)
    if not chained:
        return
    if chained in [signal.SIG_IGN, signal.SIG_DFL, rl_signal]:
        return
    try:
        chained(sig, stackframe)
    except KeyboardInterrupt:
        pass #filter out generic keyboard interrupt handler

# signals intercepted by rl_signal; SIGHUP is not used on win32
# (cygwin seems to be ok)
_signal_list = [signal.SIGTERM, signal.SIGINT]
if sys.platform not in ['win32']:
    _signal_list.append(signal.SIGHUP)

# flipped to True by _init_signal_handlers() after its first run
_sig_initialized = False
def _init_signal_handlers():
    """
    One-time setup: unless running interactively, install rl_signal
    for every signal in _signal_list (remembering the handlers it
    replaces in _signal_chain), and register pmon_shutdown to run at
    interpreter exit. Subsequent calls do nothing.
    """
    global _sig_initialized
    if not _sig_initialized:
        if not roslib.is_interactive():
            for signum in _signal_list:
                _signal_chain[signum] = signal.signal(signum, rl_signal)
        atexit.register(pmon_shutdown)
        _sig_initialized = True
170 171 # ############################################################## 172
class Process(object):
    """
    Basic process representation for L{ProcessMonitor}. Must be subclassed
    to provide actual start()/stop() implementations.

    Constructor *must* be called from the Python Main thread in order
    for signal handlers to register properly.
    """

    def __init__(self, package, name, args, env, respawn=False, required=False):
        """
        @param package: stored on the instance and reported by get_info()
        @param name: process name; L{ProcessMonitor} uses it to look
        processes up and to reject duplicates
        @param args: stored on the instance and reported by get_info()
        @param env: stored on the instance and reported by get_info()
        @param respawn: if True, L{ProcessMonitor} restarts the process
        after it dies
        @type  respawn: bool
        @param required: if True, L{ProcessMonitor} initiates shutdown
        when the process dies
        @type  required: bool
        """
        self.package, self.name = package, name
        self.args, self.env = args, env
        self.respawn, self.required = respawn, required
        self.lock = Lock()
        self.exit_code = None
        # for keeping track of respawning
        self.spawn_count = 0

        _init_signal_handlers()

    def __str__(self):
        return "Process<%s>"%self.name

    # NOTE: get_info() is going to have to be sufficient for
    # generating respawn requests, so we must be complete about it

    def get_info(self):
        """
        Get all data about this process in dictionary form
        @return: dictionary of all relevant process properties;
        'exit_code' is only present once an exit code is known
        @rtype: dict { str: val }
        """
        info = {}
        info['spawn_count'] = self.spawn_count
        info['args'] = self.args
        info['env'] = self.env
        info['package'] = self.package
        info['name'] = self.name
        info['alive'] = self.is_alive()
        info['respawn'] = self.respawn
        info['required'] = self.required
        if self.exit_code is not None:
            info['exit_code'] = self.exit_code
        return info

    def start(self):
        """
        Record one more spawn of this process. The base class does not
        launch anything.
        """
        self.spawn_count = self.spawn_count + 1

    def is_alive(self):
        """
        @return: always False in the base class
        @rtype: bool
        """
        return False

    def stop(self, errors=None):
        """
        Stop the process. Record any significant error messages in the errors parameter

        @param errors: error messages. stop() will record messages into this list.
        @type  errors: [str]
        """
        return None

    def get_exit_description(self):
        """
        @return: human-readable summary of how the process ended
        @rtype: str
        """
        code = self.exit_code
        if code is None:
            return 'process has died'
        if code:
            return 'process has died [exit code %s]'%code
        # try not to scare users about process exit
        return 'process has finished cleanly'
246
class DeadProcess(Process):
    """
    Container class to maintain information about a process that has died. This
    container allows us to delete the actual Process but still maintain the metadata
    """

    def __init__(self, p):
        """
        @param p: process that has died; its metadata is copied
        @type  p: L{Process}
        """
        # forward 'required' as well, so the attribute agrees with the
        # value captured in the info dictionary below
        super(DeadProcess, self).__init__(p.package, p.name, p.args, p.env, p.respawn, p.required)
        self.exit_code = p.exit_code
        self.lock = None
        self.spawn_count = p.spawn_count
        # snapshot taken now: get_info() keeps returning this dictionary
        self.info = p.get_info()

    def get_info(self):
        """
        @return: the info dictionary captured from the original process
        @rtype: dict { str: val }
        """
        return self.info

    def start(self):
        """
        @raise Exception: always; a dead process cannot be started
        """
        raise Exception("cannot call start on a dead process!")

    def is_alive(self):
        """
        @return: always False
        @rtype: bool
        """
        return False
264
class ProcessListener(object):
    """
    Listener class for L{ProcessMonitor}
    """

    def process_died(self, process_name, exit_code):
        """
        Callback invoked when a monitored process has died. It is only
        issued for processes that die during normal process monitor
        execution -- processes that are forcibly killed during
        ProcessMonitor shutdown are not reported. The default
        implementation does nothing.

        @param process_name: name of process
        @type  process_name: str
        @param exit_code: exit code of process. If None, it means
        that ProcessMonitor was unable to determine an exit code.
        @type  exit_code: int
        """
        return None
283
class ProcessMonitor(Thread):
    """
    Daemon thread that polls registered L{Process} instances, reports
    and respawns the ones that die, and kills whatever is still
    registered when it shuts down.
    """

    def __init__(self, name="ProcessMonitor"):
        """
        @param name: thread name
        @type  name: str
        """
        Thread.__init__(self, name=name)
        # processes currently being monitored; guarded by plock
        self.procs = []
        self.plock = RLock()
        # request flag: the run loop exits once this is True
        self.is_shutdown = False
        # completion flag: set at the end of _post_run()
        self.done = False
        self.setDaemon(True)
        # signals whose handler is no longer rl_signal; they are
        # re-registered by do_main_thread_jobs()
        self.reacquire_signals = set()
        self.listeners = []
        # DeadProcess records of processes that died while monitored
        self.dead_list = []
        # #885: ensure core procs
        self.core_procs = []
        # #642: flag to prevent process monitor exiting prematurely
        self._registrations_complete = False

        logger.info("created process monitor %s"%self)

    def add_process_listener(self, l):
        """
        Listener for process events. MUST be called before
        ProcessMonitor is running. See ProcessListener class.
        @param l: listener instance
        @type  l: L{ProcessListener}
        """
        self.listeners.append(l)

    def register(self, p):
        """
        Register process with L{ProcessMonitor}
        @param p: Process
        @type  p: L{Process}
        @raise RLException: if process with same name is already
        registered, or if the process monitor has been shut down
        """
        logger.info("ProcessMonitor.register[%s]"%p.name)
        e = None
        with self.plock:
            if self.has_process(p.name):
                e = RLException("cannot add process with duplicate name '%s'"%p.name)
            elif self.is_shutdown:
                e = RLException("cannot add process [%s] after process monitor has been shut down"%p.name)
            else:
                self.procs.append(p)
        # log and raise outside of the lock
        if e:
            logger.error("ProcessMonitor.register[%s] failed %s"%(p.name, e))
            raise e
        else:
            logger.info("ProcessMonitor.register[%s] complete"%p.name)

    def register_core_proc(self, p):
        """
        Register core process with ProcessMonitor. Core processes
        have special shutdown semantics. They are killed after all
        other processes, in reverse order in which they are added.
        @param p: Process
        @type  p: L{Process}
        @raise RLException: if process with same name is already registered
        """
        self.register(p)
        self.core_procs.append(p)

    def registrations_complete(self):
        """
        Inform the process monitor that registrations are complete.
        After the registrations_complete flag is set, process monitor
        will exit if there are no processes left to monitor.
        """
        self._registrations_complete = True
        logger.info("registrations completed %s"%self)

    def unregister(self, p):
        """
        Remove process from the list of monitored processes.
        @param p: Process
        @type  p: L{Process}
        @raise ValueError: if p is not currently registered
        """
        logger.info("ProcessMonitor.unregister[%s] starting"%p.name)
        with self.plock:
            self.procs.remove(p)
        logger.info("ProcessMonitor.unregister[%s] complete"%p.name)

    def has_process(self, name):
        """
        @param name: process name
        @type  name: str
        @return: True if process is still being monitored. If False, process
        has died or was never registered with process monitor
        @rtype: bool
        """
        return any(p.name == name for p in self.procs)

    def get_process(self, name):
        """
        @param name: process name
        @type  name: str
        @return: process registered under name, or None
        @rtype: L{Process}
        """
        with self.plock:
            for p in self.procs:
                if p.name == name:
                    return p
        return None

    def has_main_thread_jobs(self):
        """
        @return: True if ProcessMonitor has tasks that need to be run in the main thread
        @rtype: bool
        """
        return bool(self.reacquire_signals)

    def do_main_thread_jobs(self):
        """
        Execute tasks that need to be run in the main thread. Must be
        called from main thread.
        """
        # not entirely threadsafe: iterate over a snapshot, because the
        # run loop adds to this set
        for s in list(self.reacquire_signals):
            _signal_chain[s] = signal.signal(s, rl_signal)
            # discard() rather than remove(): _post_run() may have
            # cleared the set in the meantime
            self.reacquire_signals.discard(s)

    def kill_process(self, name):
        """
        Kill process that matches name. NOTE: a killed process will
        continue to show up as active until the process monitor thread
        has caught that it has died.
        @param name: Process name
        @type  name: str
        @return: True if a process named name was removed from
        process monitor. A process is considered killed if its stop()
        method was called.
        @rtype: bool
        @raise RLException: if name is not a string
        """
        if not isinstance(name, basestring):
            raise RLException("kill_process takes in a process name but was given: %s"%name)
        logger.debug("ProcessMonitor.kill_process[%s]"%name)
        printlog("[%s] kill requested"%name)
        with self.plock:
            p = self.get_process(name)
            if p:
                try:
                    # no need to accumulate errors, so pass in []
                    p.stop([])
                except:
                    logger.error(traceback.format_exc())
                return True
            else:
                return False

    def shutdown(self):
        """
        Shutdown the process monitor thread
        """
        logger.info("ProcessMonitor.shutdown %s"%self)
        self.is_shutdown = True

    def get_active_names(self):
        """
        @return: list of active process names
        @rtype: [str]
        """
        with self.plock:
            retval = [p.name for p in self.procs]
        return retval

    def get_process_names_with_spawn_count(self):
        """
        @return: Two lists, where first
        list of active process names along with the number of times
        that process has been spawned. Second list contains dead process names
        and their spawn count.
        @rtype: [[(str, int),], [(str,int),]]
        """
        with self.plock:
            actives = [(p.name, p.spawn_count) for p in self.procs]
            deads = [(p.name, p.spawn_count) for p in self.dead_list]
            retval = [actives, deads]
        return retval

    def mainthread_spin_once(self):
        """
        run() occurs in a separate thread and cannot do certain signal-related
        work. The main thread of the application must call mainthread_spin()
        or mainthread_spin_once() in order to perform these jobs.
        @return: False once the process monitor is done, True otherwise
        @rtype: bool
        """
        if not self.done:
            if self.has_main_thread_jobs():
                self.do_main_thread_jobs()
            return True
        else:
            return False

    def mainthread_spin(self):
        """
        run() occurs in a separate thread and cannot do certain signal-related
        work. The main thread of the application must call mainthread_spin()
        or mainthread_spin_once() in order to perform these jobs. mainthread_spin()
        blocks until the process monitor is complete.
        """
        while not self.done:
            if sys.platform in ['win32']: # cygwin seems to be ok
                # windows sleep throws an exception when a signal has arrived, even when
                # it has a handler. We can either use win32api.Sleep OR....catch
                # the exception
                try:
                    time.sleep(0.1)
                except IOError:
                    pass
            else:
                time.sleep(0.1)

            if self.has_main_thread_jobs():
                self.do_main_thread_jobs()

    def run(self):
        """
        thread routine of the process monitor. NOTE: you must still
        call mainthread_spin or mainthread_spin_once() from the main
        thread in order to pick up main thread work from the process
        monitor.
        """
        try:
            #don't let exceptions bomb thread, interferes with exit
            try:
                self._run()
            except:
                logger.error(traceback.format_exc())
                traceback.print_exc()
        finally:
            # always kill remaining processes and mark the monitor done
            self._post_run()

    def _run(self):
        """
        Internal run loop of ProcessMonitor
        """
        plock = self.plock
        dead = []
        respawn = []
        while not self.is_shutdown:
            with plock: #copy self.procs
                procs = self.procs[:]
            if self.is_shutdown:
                break

            # check current signal handlers to see if children have stolen them away
            # TODO: this code may not be necessary anymore (have to test)
            for s in _signal_list:
                if signal.getsignal(s) != rl_signal:
                    self.reacquire_signals.add(s)

            for p in procs:
                try:
                    if not p.is_alive():
                        logger.debug("Process[%s] has died, respawn=%s, required=%s, exit_code=%s",p.name, p.respawn, p.required, p.exit_code)
                        exit_code_str = p.get_exit_description()
                        if p.respawn:
                            printlog_bold("[%s] %s\nrespawning..."%(p.name, exit_code_str))
                            respawn.append(p)
                        elif p.required:
                            printerrlog('='*80+"REQUIRED process [%s] has died!\n%s\nInitiating shutdown!\n"%(p.name, exit_code_str)+'='*80)
                            self.is_shutdown = True
                        else:
                            if p.exit_code:
                                printerrlog("[%s] %s"%(p.name, exit_code_str))
                            else:
                                printlog_bold("[%s] %s"%(p.name, exit_code_str))
                            dead.append(p)

                        ## no need for lock as we require listeners be
                        ## added before process monitor is launched
                        for l in self.listeners:
                            l.process_died(p.name, p.exit_code)

                except Exception:
                    traceback.print_exc()
                    #don't respawn as this is an internal error: the
                    #process may already have been queued for respawn
                    #(or for removal) before the failure occurred
                    if p in respawn:
                        respawn.remove(p)
                    if p not in dead:
                        dead.append(p)
                if self.is_shutdown:
                    break #stop polling
            for d in dead:
                try:
                    self.unregister(d)
                    # stop process, don't accumulate errors
                    d.stop([])

                    # save process data to dead list
                    with plock:
                        self.dead_list.append(DeadProcess(d))
                except:
                    logger.error(traceback.format_exc())

            # dead check is to make sure that ProcessMonitor at least
            # waits until its had at least one process before exiting
            if self._registrations_complete and dead and not self.procs and not respawn:
                printlog("all processes on machine have died, roslaunch will exit")
                self.is_shutdown = True
            del dead[:]
            for r in respawn:
                try:
                    if self.is_shutdown:
                        break
                    printlog("[%s] restarting process"%r.name)
                    # stop process, don't accumulate errors
                    r.stop([])
                    r.start()
                except:
                    traceback.print_exc()
                    logger.error("Restart failed %s",traceback.format_exc())
            del respawn[:]
            time.sleep(0.1) #yield thread
        # killing of remaining processes happens in _post_run(), which
        # run() calls from its finally block

    def _post_run(self):
        """
        Kill all remaining processes and release data structures.
        Non-core processes are killed in parallel by worker threads;
        core processes are killed afterwards, one at a time, in reverse
        registration order. Sets the done flag when finished.
        """
        logger.info("ProcessMonitor._post_run %s"%self)
        # this is already true entering, but go ahead and make sure
        self.is_shutdown = True
        # killall processes on run exit

        q = Queue.Queue()

        with self.plock:
            # make copy of core_procs for threadsafe usage
            core_procs = self.core_procs[:]
            logger.info("ProcessMonitor._post_run %s: remaining procs are %s"%(self, self.procs))

            # enqueue all non-core procs in reverse order for parallel kill
            # #526/885: ignore core procs
            for p in reversed(self.procs):
                if p not in core_procs:
                    q.put(p)

        # use 10 workers
        killers = []
        for i in range(10):
            t = _ProcessKiller(q, i)
            killers.append(t)
            t.start()

        # wait for workers to finish
        q.join()
        shutdown_errors = []

        # accumulate all the shutdown errors
        for t in killers:
            shutdown_errors.extend(t.errors)
        del killers[:]

        # #526/885: kill core procs last
        # we don't want to parallelize this as the master has to be last
        for p in reversed(core_procs):
            _kill_process(p, shutdown_errors)

        # delete everything except dead_list
        logger.info("ProcessMonitor exit: cleaning up data structures and signals")
        with self.plock:
            del core_procs[:]
            del self.procs[:]
            del self.core_procs[:]

        reacquire_signals = self.reacquire_signals
        if reacquire_signals:
            reacquire_signals.clear()
        logger.info("ProcessMonitor exit: pmon has shutdown")
        self.done = True

        if shutdown_errors:
            printerrlog("Shutdown errors:\n"+'\n'.join([" * %s"%e for e in shutdown_errors]))
642
def _kill_process(p, errors):
    """
    Stop Process p, announcing the kill on screen and in the logfile.
    Any exception raised while doing so is printed and logged instead
    of being propagated.

    @param p: process to kill
    @type  p: Process
    @param errors: list that receives error messages from the killed process
    @type  errors: [str]
    """
    try:
        victim = p.name
        logger.info("ProcessMonitor exit: killing %s", victim)
        printlog("[%s] killing on exit"%victim)
        # errors are accumulated per process so that the caller can
        # print them all at the end
        p.stop(errors)
    except:
        traceback.print_exc()
        logger.error(traceback.format_exc())
660
class _ProcessKiller(Thread):
    """
    Worker thread that takes processes off a shared queue and kills
    them with _kill_process(), collecting their error messages.
    """

    def __init__(self, q, i):
        """
        @param q: queue of processes to kill
        @param i: worker index, used in the thread name
        """
        Thread.__init__(self, name="ProcessKiller-%s"%i)
        self.q = q
        # error messages recorded by the processes this worker stopped
        self.errors = []

    def run(self):
        """
        Kill queued processes until the queue reports empty.
        """
        pending = self.q
        while not pending.empty():
            try:
                victim = pending.get(False)
            except Queue.Empty:
                # the queue emptied between the empty() check and get()
                continue
            _kill_process(victim, self.errors)
            pending.task_done()
677