Package roslaunch :: Module pmon
[frames] | no frames]

Source Code for Module roslaunch.pmon

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  from __future__ import print_function 
 36  from __future__ import with_statement 
 37   
 38  """ 
 39  Process monitoring implementation for roslaunch. 
 40  """ 
 41   
 42  import os 
 43  import sys 
 44  import time 
 45  import traceback 
 46  import logging 
 47  try: 
 48      from queue import Empty, Queue 
 49  except ImportError: 
 50      from Queue import Empty, Queue 
 51  import signal 
 52  import atexit 
 53  from threading import Thread, RLock, Lock 
 54   
 55  import roslib 
 56  from roslaunch.core import printlog, printlog_bold, printerrlog, RLException 
 57   
  # Module-level logger used throughout this module (ProcessMonitor, start/shutdown helpers).
logger = logging.getLogger("roslaunch.pmon")
 59   
class FatalProcessLaunch(RLException):
    """
    Raised when launching a process has failed in a fatal manner,
    i.e. relaunching it is unlikely to succeed.
    """
# start/shutdown ################################################

# _pmons: ProcessMonitor instances created by start_process_monitor();
#   pmon_shutdown() shuts them down and empties the list.
# _pmon_counter: incremented per monitor to build unique thread names.
_pmons, _pmon_counter = [], 0
def start_process_monitor():
    """
    Create, register (in _pmons) and start a new L{ProcessMonitor}.

    @return: the started ProcessMonitor, or None if shutdown has already
    been initiated (see rl_signal())
    @rtype: L{ProcessMonitor}
    """
    global _pmon_counter
    if _shutting_down:
        return None
    _pmon_counter += 1
    name = "ProcessMonitor-%s"%_pmon_counter
    logger.info("start_process_monitor: creating ProcessMonitor")
    process_monitor = ProcessMonitor(name)
    # prevent race condition with pmon_shutdown() being triggered
    # as we are starting a ProcessMonitor (i.e. user hits ctrl-C
    # during startup)
    with _shutdown_lock:
        # Re-check under the lock: rl_signal() sets _shutting_down before
        # pmon_shutdown() takes this lock. If shutdown began after the check
        # above, pmon_shutdown() may already have drained _pmons, and a monitor
        # appended now would miss that shutdown pass.
        if _shutting_down:
            return None
        _pmons.append(process_monitor)
        process_monitor.start()
        logger.info("start_process_monitor: ProcessMonitor started")

    return process_monitor
def shutdown_process_monitor(process_monitor):
    """
    Ask a ProcessMonitor to shut down and wait (up to 20 seconds) for
    its thread to exit.

    @param process_monitor: process monitor to kill
    @type process_monitor: L{ProcessMonitor}
    @return: True if process_monitor was successfully
    shutdown. False if it could not be shutdown cleanly or if there is
    a problem with process_monitor
    parameter. shutdown_process_monitor() does not throw any exceptions
    as this is shutdown-critical code.
    @rtype: bool
    """
    try:
        if process_monitor is None or process_monitor.is_shutdown:
            return False

        # The logger is deliberately not used until after this critical
        # section is done, just in case logger is already being torn down.
        process_monitor.shutdown()
        process_monitor.join(20.0)
        # Thread.isAlive() was removed in Python 3.9 (the AttributeError was
        # silently caught below, making every shutdown look failed).
        # is_alive() exists on Python 2.6+ and all Python 3 versions.
        if process_monitor.is_alive():
            logger.error("shutdown_process_monitor: ProcessMonitor shutdown failed!")
            return False
        else:
            logger.debug("shutdown_process_monitor: ProcessMonitor shutdown succeeded")
            return True
    except Exception as e:
        # best effort: never propagate from shutdown code
        print("exception in shutdown_process_monitor: %s" % e, file=sys.stderr)
        traceback.print_exc()
        return False
# Guards _pmons: held by start_process_monitor() while registering a new
# monitor and by pmon_shutdown() while shutting monitors down.
_shutdown_lock = Lock()
def pmon_shutdown():
    """
    Shut down every ProcessMonitor in _pmons and then empty the list.
    Called from rl_signal() and registered with atexit; errors are
    printed rather than raised.
    """
    with _shutdown_lock:
        try:
            if _pmons:
                for monitor in _pmons:
                    shutdown_process_monitor(monitor)
                del _pmons[:]
        except:
            # shutdown-critical: report anything (including interrupts) and carry on
            print("exception in pmon_shutdown")
            traceback.print_exc()
# _signal_chain: signal number -> handler that was installed before rl_signal
# _shutting_down: set by rl_signal() once shutdown has been initiated
_signal_chain, _shutting_down = {}, False
def rl_signal(sig, stackframe):
    """
    Signal handler: on the first signal, mark shutdown as started, shut
    down all process monitors, then forward the signal to whichever
    handler was installed before rl_signal (if it is a real callable).
    """
    global _shutting_down
    if _shutting_down:
        # already shutting down; prevent infinite callbacks
        return
    _shutting_down = True
    pmon_shutdown()

    chained = _signal_chain.get(sig)
    if not chained or chained in (signal.SIG_IGN, signal.SIG_DFL, rl_signal):
        return
    try:
        chained(sig, stackframe)
    except KeyboardInterrupt:
        # filter out generic keyboard interrupt handler
        pass
# Signals intercepted by rl_signal(). win32 only gets SIGTERM and SIGINT
# (cygwin seems to be ok with the full list).
_signal_list = [signal.SIGTERM, signal.SIGINT]
if sys.platform not in ['win32']:
    _signal_list.append(signal.SIGHUP)

# set once _init_signal_handlers() has run
_sig_initialized = False
def _init_signal_handlers():
    """
    One-time setup, skipped when roslib reports an interactive session:

    - install rl_signal() for every signal in _signal_list, remembering
      the previous handlers in _signal_chain;
    - register pmon_shutdown() with atexit.

    Later calls are no-ops.
    """
    global _sig_initialized
    if not _sig_initialized:
        if not roslib.is_interactive():
            for signum in _signal_list:
                _signal_chain[signum] = signal.signal(signum, rl_signal)
            atexit.register(pmon_shutdown)
        _sig_initialized = True
174 175 # ############################################################## 176
class Process(object):
    """
    Base process representation used by L{ProcessMonitor}. Subclasses are
    expected to provide the real start()/stop() implementations.

    The constructor *must* run on the Python main thread so that signal
    handlers can be registered properly.
    """

    def __init__(self, package, name, args, env,
                 respawn=False, respawn_delay=0.0, required=False):
        """
        @param package: package name of the process
        @param name: process name (ProcessMonitor looks processes up by name)
        @param args: process arguments
        @param env: process environment
        @param respawn: if True, the process should be restarted after it dies
        @type respawn: bool
        @param respawn_delay: seconds to wait after death before respawning
        @type respawn_delay: float
        @param required: if True, the death of this process shuts down the monitor
        @type required: bool
        """
        self.package = package
        self.name = name
        self.args, self.env = args, env
        self.respawn, self.respawn_delay = respawn, respawn_delay
        self.required = required
        self.lock = Lock()
        self.exit_code = None
        # respawn bookkeeping
        self.spawn_count = 0
        self.time_of_death = None

        _init_signal_handlers()

    def __str__(self):
        return "Process<%s>" % self.name

    # NOTE: get_info() is going to have to be sufficient for
    # generating respawn requests, so we must be complete about it

    def get_info(self):
        """
        Collect every relevant property of this process.
        @return: dictionary of process properties; 'exit_code' is only
        present once an exit code is known
        @rtype: dict { str: val }
        """
        info = dict(
            spawn_count=self.spawn_count,
            args=self.args,
            env=self.env,
            package=self.package,
            name=self.name,
            alive=self.is_alive(),
            respawn=self.respawn,
            respawn_delay=self.respawn_delay,
            required=self.required,
        )
        if self.exit_code is not None:
            info['exit_code'] = self.exit_code
        return info

    def start(self):
        """Record a new spawn; subclasses launch the actual process."""
        self.spawn_count += 1
        self.time_of_death = None

    def is_alive(self):
        """
        Base implementation: there is no real process, so it is always
        reported dead, with the time of death stamped on first call.
        """
        if self.time_of_death is None:
            self.time_of_death = time.time()
        return False

    def should_respawn(self):
        """
        @return: False if the process should not respawn, otherwise the
        floating point number of seconds until it may respawn (<= 0.0
        means now)
        """
        if not self.respawn:
            return False
        if self.time_of_death is None and self.is_alive():
            return False
        return self.time_of_death + self.respawn_delay - time.time()

    def stop(self, errors=None):
        """
        Stop the process. Implementations record any significant error
        messages into errors.

        @param errors: error messages. stop() will record messages into this list.
        @type errors: [str]
        """

    def get_exit_description(self):
        """
        @return: human-readable description of how the process exited
        @rtype: str
        """
        code = self.exit_code
        if code is None:
            return 'process has died'
        if code:
            return 'process has died [exit code %s]'%code
        # try not to scare users about process exit
        return 'process has finished cleanly'
class DeadProcess(Process):
    """
    Container class to maintain information about a process that has died. This
    container allows us to delete the actual Process but still maintain the metadata
    """
    def __init__(self, p):
        """
        @param p: process whose metadata should be preserved
        @type p: L{Process}
        """
        # Pass required through as well; previously it fell back to the
        # constructor default (False), so the preserved metadata disagreed
        # with the original process.
        super(DeadProcess, self).__init__(p.package, p.name, p.args, p.env,
                                          respawn=p.respawn,
                                          respawn_delay=p.respawn_delay,
                                          required=p.required)
        self.exit_code = p.exit_code
        # start() raises on a dead process, so no lock is kept
        self.lock = None
        self.spawn_count = p.spawn_count
        # snapshot of the original process's info, taken at construction time
        self.info = p.get_info()

    def get_info(self):
        """@return: info dict captured from the original process"""
        return self.info

    def start(self):
        raise Exception("cannot call start on a dead process!")

    def is_alive(self):
        return False
class ProcessListener(object):
    """
    Callback interface for L{ProcessMonitor} events. The default
    implementation ignores every event; subclasses override what they need.
    """

    def process_died(self, process_name, exit_code):
        """
        Called when a process dies during normal process monitor
        execution. Processes that are forcibly killed during
        ProcessMonitor shutdown are not reported.

        @param process_name: name of process
        @type process_name: str
        @param exit_code: exit code of process. If None, it means
        that ProcessMonitor was unable to determine an exit code.
        @type exit_code: int
        """
class ProcessMonitor(Thread):
    """
    Thread that watches registered L{Process} instances.

    While running it:
      - reports process deaths to listeners;
      - respawns processes that ask for it;
      - initiates shutdown when a required process dies, or when all
        processes have died after registrations are complete.

    On exit it kills every remaining process, core processes last.
    """

    def __init__(self, name="ProcessMonitor"):
        """
        @param name: name of the monitor thread
        @type name: str
        """
        Thread.__init__(self, name=name)
        self.procs = []
        self.plock = RLock()
        self.is_shutdown = False
        # set by _post_run() once cleanup has finished; mainthread_spin() polls it
        self.done = False
        # Thread.setDaemon() is deprecated since Python 3.10; assigning the
        # daemon attribute is equivalent and also works on Python 2.6+.
        self.daemon = True
        # signals whose handlers must be re-installed from the main thread
        self.reacquire_signals = set()
        self.listeners = []
        self.dead_list = []
        # #885: ensure core procs
        self.core_procs = []
        # #642: flag to prevent process monitor exiting prematurely
        self._registrations_complete = False

        logger.info("created process monitor %s"%self)

    def add_process_listener(self, l):
        """
        Listener for process events. MUST be called before
        ProcessMonitor is running. See ProcessListener class.
        @param l: listener instance
        @type l: L{ProcessListener}
        """
        self.listeners.append(l)

    def register(self, p):
        """
        Register process with L{ProcessMonitor}
        @param p: Process
        @type p: L{Process}
        @raise RLException: if process with same name is already registered,
        or if the process monitor has already been shut down
        """
        logger.info("ProcessMonitor.register[%s]"%p.name)
        e = None
        with self.plock:
            if self.has_process(p.name):
                e = RLException("cannot add process with duplicate name '%s'"%p.name)
            elif self.is_shutdown:
                e = RLException("cannot add process [%s] after process monitor has been shut down"%p.name)
            else:
                self.procs.append(p)
        # log/raise outside the lock
        if e:
            logger.error("ProcessMonitor.register[%s] failed %s"%(p.name, e))
            raise e
        else:
            logger.info("ProcessMonitor.register[%s] complete"%p.name)

    def register_core_proc(self, p):
        """
        Register core process with ProcessMonitor. Core processes
        have special shutdown semantics. They are killed after all
        other processes, in reverse order in which they are added.
        @param p: Process
        @type p: L{Process}
        @raise RLException: if process with same name is already registered
        """
        self.register(p)
        self.core_procs.append(p)

    def registrations_complete(self):
        """
        Inform the process monitor that registrations are complete.
        After the registrations_complete flag is set, process monitor
        will exit if there are no processes left to monitor.
        """
        self._registrations_complete = True
        logger.info("registrations completed %s"%self)

    def unregister(self, p):
        """
        Stop monitoring process p.
        @param p: Process
        @type p: L{Process}
        @raise ValueError: if p is not currently registered
        """
        logger.info("ProcessMonitor.unregister[%s] starting"%p.name)
        with self.plock:
            self.procs.remove(p)
        logger.info("ProcessMonitor.unregister[%s] complete"%p.name)

    def has_process(self, name):
        """
        @return: True if process is still being monitored. If False, process
        has died or was never registered with process monitor
        @rtype: bool
        """
        return any(p.name == name for p in self.procs)

    def get_process(self, name):
        """
        @return: first process registered under name, or None
        @rtype: L{Process}
        """
        with self.plock:
            for p in self.procs:
                if p.name == name:
                    return p
        return None

    def has_main_thread_jobs(self):
        """
        @return: True if ProcessMonitor has tasks that need to be run in the main thread
        @rtype: bool
        """
        # bool() so the documented return type holds (previously returned len())
        return bool(self.reacquire_signals)

    def do_main_thread_jobs(self):
        """
        Execute tasks that need to be run in the main thread (re-installing
        rl_signal for signals whose handlers were replaced). Must be
        called from main thread.
        """
        # Snapshot with list(), which copies the set in one C-level call under
        # CPython, instead of a Python-level loop that can fail with "set
        # changed size during iteration" if the monitor thread adds a signal.
        for s in list(self.reacquire_signals):
            _signal_chain[s] = signal.signal(s, rl_signal)
            # discard(): _post_run() may have cleared the set in the meantime
            self.reacquire_signals.discard(s)

    def kill_process(self, name):
        """
        Kill process that matches name. NOTE: a killed process will
        continue to show up as active until the process monitor thread
        has caught that it has died.
        @param name: Process name
        @type name: str
        @return: True if a process named name was removed from
        process monitor. A process is considered killed if its stop()
        method was called.
        @rtype: bool
        @raise RLException: if name is not a string
        """
        def isstring(s):
            """Small helper version to check an object is a string in
            a way that works for both Python 2 and 3
            """
            try:
                return isinstance(s, basestring)
            except NameError:
                return isinstance(s, str)

        if not isstring(name):
            raise RLException("kill_process takes in a process name but was given: %s"%name)
        logger.debug("ProcessMonitor.kill_process[%s]"%name)
        printlog("[%s] kill requested"%name)
        with self.plock:
            p = self.get_process(name)
            if p:
                try:
                    # no need to accumulate errors, so pass in []
                    p.stop([])
                except:
                    # best effort: log and still report the process as killed
                    logger.error(traceback.format_exc())
                return True
            else:
                return False

    def shutdown(self):
        """
        Shutdown the process monitor thread
        """
        logger.info("ProcessMonitor.shutdown %s"%self)
        self.is_shutdown = True

    def get_active_names(self):
        """
        @return [str]: list of active process names
        """
        with self.plock:
            retval = [p.name for p in self.procs]
        return retval

    def get_process_names_with_spawn_count(self):
        """
        @return: Two lists, where first
        list of active process names along with the number of times
        that process has been spawned. Second list contains dead process names
        and their spawn count.
        @rtype: [[(str, int),], [(str,int),]]
        """
        with self.plock:
            actives = [(p.name, p.spawn_count) for p in self.procs]
            deads = [(p.name, p.spawn_count) for p in self.dead_list]
            retval = [actives, deads]
        return retval

    def mainthread_spin_once(self):
        """
        run() occurs in a separate thread and cannot do certain signal-related
        work. The main thread of the application must call mainthread_spin()
        or mainthread_spin_once() in order to perform these jobs.
        @return: False once the monitor is done, True otherwise
        @rtype: bool
        """
        if not self.done:
            if self.has_main_thread_jobs():
                self.do_main_thread_jobs()
            return True
        else:
            return False

    def mainthread_spin(self):
        """
        run() occurs in a separate thread and cannot do certain signal-related
        work. The main thread of the application must call mainthread_spin()
        or mainthread_spin_once() in order to perform these jobs. mainthread_spin()
        blocks until the process monitor is complete.
        """
        while not self.done:
            if sys.platform in ['win32']: # cygwin seems to be ok
                # windows sleep throws an exception when a signal has arrived, even when
                # it has a handler. We can either use win32api.Sleep OR....catch
                # the exception
                try:
                    time.sleep(0.1)
                except IOError:
                    pass
            else:
                time.sleep(0.1)

            if self.has_main_thread_jobs():
                self.do_main_thread_jobs()

    def run(self):
        """
        thread routine of the process monitor. NOTE: you must still
        call mainthread_spin or mainthread_spin_once() from the main
        thread in order to pick up main thread work from the process
        monitor.
        """
        try:
            #don't let exceptions bomb thread, interferes with exit
            try:
                self._run()
            except:
                logger.error(traceback.format_exc())
                traceback.print_exc()
        finally:
            # always kill remaining processes, even if _run() failed
            self._post_run()

    def _run(self):
        """
        Internal run loop of ProcessMonitor: polls the registered processes
        every 0.1s until is_shutdown is set.
        """
        plock = self.plock
        dead = []
        respawn = []
        while not self.is_shutdown:
            with plock: #copy self.procs
                procs = self.procs[:]
            if self.is_shutdown:
                break

            # check current signal handlers to see if children have stolen them away
            # TODO: this code may not be necessary anymore (have to test)
            for s in _signal_list:
                if signal.getsignal(s) != rl_signal:
                    self.reacquire_signals.add(s)

            for p in procs:
                try:
                    if not p.is_alive():
                        logger.debug("Process[%s] has died, respawn=%s, required=%s, exit_code=%s",
                                p.name,
                                "True(%f)" % p.respawn_delay if p.respawn else p.respawn,
                                p.required, p.exit_code)
                        exit_code_str = p.get_exit_description()
                        if p.required:
                            printerrlog('='*80+"REQUIRED process [%s] has died!\n%s\nInitiating shutdown!\n"%(p.name, exit_code_str)+'='*80)
                            self.is_shutdown = True
                        elif p not in respawn:
                            if p.exit_code:
                                printerrlog("[%s] %s"%(p.name, exit_code_str))
                            else:
                                printlog_bold("[%s] %s"%(p.name, exit_code_str))
                            dead.append(p)

                        ## no need for lock as we require listeners be
                        ## added before process monitor is launched
                        for l in self.listeners:
                            l.process_died(p.name, p.exit_code)

                except Exception:
                    traceback.print_exc()
                    #don't respawn as this is an internal error
                    dead.append(p)
                if self.is_shutdown:
                    break #stop polling
            for d in dead:
                try:
                    # when should_respawn() returns 0.0, bool(0.0) evaluates to False
                    # work around this by checking if the return value is False
                    if d.should_respawn() is not False:
                        respawn.append(d)
                    else:
                        self.unregister(d)
                        # stop process, don't accumulate errors
                        d.stop([])
                        # save process data to dead list
                        with plock:
                            self.dead_list.append(DeadProcess(d))
                except:
                    logger.error(traceback.format_exc())

            # dead check is to make sure that ProcessMonitor at least
            # waits until its had at least one process before exiting
            if self._registrations_complete and dead and not self.procs and not respawn:
                printlog("all processes on machine have died, roslaunch will exit")
                self.is_shutdown = True
            del dead[:]
            _respawn = []
            for r in respawn:
                try:
                    if self.is_shutdown:
                        break
                    if r.should_respawn() <= 0.0:
                        printlog("[%s] restarting process" % r.name)
                        # stop process, don't accumulate errors
                        r.stop([])
                        r.start()
                    else:
                        # not ready yet, keep it around
                        _respawn.append(r)
                except:
                    traceback.print_exc()
                    logger.error("Restart failed %s",traceback.format_exc())
            respawn = _respawn
            time.sleep(0.1) #yield thread
        # remaining processes are killed by _post_run(), called from run()'s finally block

    def _post_run(self):
        """
        Cleanup, called from run()'s finally block:
          1. kill all non-core processes in parallel with 10 _ProcessKiller workers;
          2. kill core processes serially, in reverse registration order;
          3. clear internal state (dead_list is kept) and set done;
          4. print any accumulated shutdown errors.
        """
        logger.info("ProcessMonitor._post_run %s"%self)
        # this is already true entering, but go ahead and make sure
        self.is_shutdown = True

        # killall processes on run exit
        q = Queue()

        with self.plock:
            # make copy of core_procs for threadsafe usage
            core_procs = self.core_procs[:]
            logger.info("ProcessMonitor._post_run %s: remaining procs are %s"%(self, self.procs))

            # enqueue all non-core procs in reverse order for parallel kill
            # #526/885: ignore core procs
            for p in reversed(self.procs):
                if p not in core_procs:
                    q.put(p)

        # use 10 workers
        killers = []
        for i in range(10):
            t = _ProcessKiller(q, i)
            killers.append(t)
            t.start()

        # wait for workers to finish
        q.join()
        shutdown_errors = []

        # accumulate all the shutdown errors
        for t in killers:
            shutdown_errors.extend(t.errors)
        del killers[:]

        # #526/885: kill core procs last
        # we don't want to parallelize this as the master has to be last
        for p in reversed(core_procs):
            _kill_process(p, shutdown_errors)

        # delete everything except dead_list
        logger.info("ProcessMonitor exit: cleaning up data structures and signals")
        with self.plock:
            del core_procs[:]
            del self.procs[:]
            del self.core_procs[:]

        reacquire_signals = self.reacquire_signals
        if reacquire_signals:
            reacquire_signals.clear()
        logger.info("ProcessMonitor exit: pmon has shutdown")
        self.done = True

        if shutdown_errors:
            printerrlog("Shutdown errors:\n"+'\n'.join([" * %s"%e for e in shutdown_errors]))
def _kill_process(p, errors):
    """
    Stop Process p, logging the kill to the screen and the logfile.
    Never raises: any exception from stop() is printed and logged.

    @param p: process to kill
    @type p: Process
    @param errors: list that the killed process's stop() appends error messages to
    @type errors: [str]
    """
    try:
        proc_name = p.name
        logger.info("ProcessMonitor exit: killing %s", proc_name)
        printlog("[%s] killing on exit" % proc_name)
        # errors from every process are collected so they can be reported together at the end
        p.stop(errors)
    except:
        traceback.print_exc()
        logger.error(traceback.format_exc())
class _ProcessKiller(Thread):
    """
    Worker thread used by ProcessMonitor._post_run(). It drains a shared
    queue of processes, killing each one and collecting the error
    messages they report.
    """

    def __init__(self, q, i):
        """
        @param q: queue of L{Process} instances to kill (shared between workers)
        @type q: Queue
        @param i: worker index, used in the thread name
        @type i: int
        """
        Thread.__init__(self, name="ProcessKiller-%s" % i)
        self.q = q
        # error messages reported by the processes this worker stopped
        self.errors = []

    def run(self):
        work = self.q
        while not work.empty():
            try:
                proc = work.get(False)
            except Empty:
                # another worker took the last item between empty() and get()
                continue
            _kill_process(proc, self.errors)
            work.task_done()