Package rosunit :: Module pmon
[frames | no frames]

Source Code for Module rosunit.pmon

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id: pmon.py 11982 2010-10-29 23:43:00Z kwc $ 
 34   
 35  """ 
 36  Process monitor 
 37  """ 
 38   
 39  from __future__ import with_statement 
 40   
 41  import os 
 42  import sys 
 43  import time 
 44  import traceback 
 45  import logging 
 46  import Queue 
 47  import atexit 
 48  from threading import Thread, RLock, Lock 
 49   
 50  from .core import printlog, printlog_bold, printerrlog 
 51   
 52  _logger = logging.getLogger("rosunit")           
 53   
class PmonException(Exception):
    """
    Base class for errors raised by the process monitor.
    """
55
class FatalProcessLaunch(PmonException):
    """
    Raised when launching a process has failed in a fatal manner,
    i.e. a relaunch is unlikely to succeed.
    """
# start/shutdown ################################################

# Process monitors created by start_process_monitor() that have not yet
# been torn down by pmon_shutdown().
_pmons = list()
# Number of process monitors created so far; used to build thread names.
_pmon_counter = 0
# While True, start_process_monitor() refuses to create new monitors.
_shutting_down = False
def start_process_monitor():
    """
    Create a new L{ProcessMonitor}, add it to the module-wide list of
    monitors and start its thread.

    @return: the started process monitor, or None if shutdown has
    already been initiated
    @rtype: L{ProcessMonitor}
    """
    global _pmon_counter
    if _shutting_down:
        return None

    _pmon_counter += 1
    _logger.info("start_process_monitor: creating ProcessMonitor")
    monitor = ProcessMonitor("ProcessMonitor-%s"%_pmon_counter)

    # Register and start while holding the shutdown lock so that
    # pmon_shutdown() cannot run in the middle of startup (e.g. the
    # user hits ctrl-C while we are still starting).
    _shutdown_lock.acquire()
    try:
        _pmons.append(monitor)
        monitor.start()
    finally:
        _shutdown_lock.release()

    _logger.info("start_process_monitor: ProcessMonitor started")
    return monitor
85
def shutdown_process_monitor(process_monitor):
    """
    Ask a process monitor to shut down and wait up to 20 seconds for
    its thread to exit.

    @param process_monitor: process monitor to kill
    @type  process_monitor: L{ProcessMonitor}
    @return: True if process_monitor was successfully
    shutdown. False if it could not be shutdown cleanly or if there is
    a problem with process_monitor
    parameter. shutdown_process_monitor() does not throw any exceptions
    as this is shutdown-critical code.
    @rtype: bool
    """
    try:
        if process_monitor is None or process_monitor.is_shutdown:
            return False

        # _logger is deliberately not used until after the critical
        # section is done, just in case _logger is already being torn
        # down.
        process_monitor.shutdown()
        process_monitor.join(20.0)

        # Thread.isAlive() no longer exists on recent Python versions;
        # prefer is_alive() and only fall back to the old spelling.
        is_alive = getattr(process_monitor, 'is_alive', None)
        if is_alive is None:
            is_alive = process_monitor.isAlive

        if is_alive():
            _logger.error("shutdown_process_monitor: ProcessMonitor shutdown failed!")
            return False
        _logger.debug("shutdown_process_monitor: ProcessMonitor shutdown succeeded")
        return True
    except Exception:
        # shutdown-critical code: report failure instead of propagating
        return False
# Serializes process monitor startup against pmon_shutdown().
_shutdown_lock = Lock()

def pmon_shutdown():
    """
    Shut down every process monitor created by start_process_monitor()
    and forget about them. Registered to run at interpreter exit.
    """
    with _shutdown_lock:
        for monitor in _pmons:
            shutdown_process_monitor(monitor)
        # clear in place so that other references to the list see it empty
        del _pmons[:]

atexit.register(pmon_shutdown)

# ##############################################################
class Process(object):
    """
    Basic process representation for L{ProcessMonitor}. Must be subclassed
    to provide actual start()/stop() implementations.
    """

    def __init__(self, package, name, args, env, respawn=False, required=False):
        """
        @param package: package the process belongs to (stored as-is)
        @param name: name the process is registered under
        @type  name: str
        @param args: process arguments (stored as-is)
        @param env: process environment (stored as-is)
        @param respawn: if True, L{ProcessMonitor} restarts the process
        when it is found dead
        @type  respawn: bool
        @param required: if True, L{ProcessMonitor} shuts down when the
        process is found dead
        @type  required: bool
        """
        self.package = package
        self.name = name
        self.args = args
        self.env = env
        self.respawn = respawn
        self.required = required
        self.lock = Lock()
        self.exit_code = None
        # for keeping track of respawning
        self.spawn_count = 0

    def __str__(self):
        return "Process<%s>"%(self.name)

    # NOTE: get_info() is going to have to be sufficient for
    # generating respawn requests, so we must be complete about it

    def get_info(self):
        """
        Get all data about this process in dictionary form
        @return: dictionary of all relevant process properties. The
        'exit_code' key is only present once an exit code is known.
        @rtype: dict { str: val }
        """
        info = {
            'spawn_count': self.spawn_count,
            'args': self.args,
            'env': self.env,
            'package': self.package,
            'name': self.name,
            'alive': self.is_alive(),
            'respawn': self.respawn,
            'required': self.required,
            }
        if self.exit_code is not None:
            info['exit_code'] = self.exit_code
        return info

    def start(self):
        """
        Record a (re)spawn. Subclasses perform the actual launch.
        """
        self.spawn_count += 1

    def is_alive(self):
        """
        @return: always False in the base class
        @rtype: bool
        """
        return False

    def stop(self, errors=None):
        """
        Stop the process. Record any significant error messages in the errors parameter

        @param errors: error messages. stop() will record messages into this list.
        Defaults to None rather than a shared list literal, so that
        implementations never accumulate messages across calls.
        @type  errors: [str]
        """
        pass

    def get_exit_description(self):
        """
        @return: human-readable description of how the process ended
        @rtype: str
        """
        if self.exit_code is None:
            return 'process has died'
        if self.exit_code:
            return 'process has died [exit code %s]'%self.exit_code
        # try not to scare users about process exit
        return 'process has finished cleanly'
class DeadProcess(Process):
    """
    Container class to maintain information about a process that has died. This
    container allows us to delete the actual Process but still maintain the metadata
    """

    def __init__(self, p):
        """
        @param p: process that has died
        @type  p: L{Process}
        """
        # forward 'required' as well so that the attribute agrees with
        # the value recorded in self.info
        super(DeadProcess, self).__init__(p.package, p.name, p.args, p.env,
                                          p.respawn, p.required)
        self.exit_code = p.exit_code
        self.lock = None
        self.spawn_count = p.spawn_count
        # snapshot of the process data taken at construction time
        self.info = p.get_info()

    def get_info(self):
        """
        @return: the process data captured when this container was created
        @rtype: dict { str: val }
        """
        return self.info

    def start(self):
        """
        @raise Exception: always; a dead process cannot be started
        """
        raise Exception("cannot call start on a dead process!")

    def is_alive(self):
        """
        @return: always False
        @rtype: bool
        """
        return False
218
class ProcessListener(object):
    """
    Callback interface for L{ProcessMonitor} events. The default
    implementation ignores every notification.
    """

    def process_died(self, process_name, exit_code):
        """
        Called when a process has died during normal process monitor
        execution. Processes that are forcibly killed during
        ProcessMonitor shutdown are not reported.

        @param process_name: name of process
        @type  process_name: str
        @param exit_code: exit code of process. If None, it means
        that ProcessMonitor was unable to determine an exit code.
        @type  exit_code: int
        """
        return None
237
class ProcessMonitor(Thread):
    """
    Daemon thread that polls the registered L{Process} instances,
    reacts to the ones that die (respawn, shutdown or bookkeeping) and
    stops every process that is still registered when the thread exits.
    """

    def __init__(self, name="ProcessMonitor"):
        """
        @param name: thread name
        @type  name: str
        """
        Thread.__init__(self, name=name)
        self.procs = []
        self.plock = RLock()
        self.is_shutdown = False
        self.done = False
        self.setDaemon(True)
        self.listeners = []
        self.dead_list = []
        # #885: ensure core procs
        self.core_procs = []
        # #642: flag to prevent process monitor exiting prematurely
        self._registrations_complete = False

        _logger.info("created process monitor %s"%self)

    def add_process_listener(self, l):
        """
        Listener for process events. MUST be called before
        ProcessMonitor is running. See ProcessListener class.
        @param l: listener instance
        @type  l: L{ProcessListener}
        """
        self.listeners.append(l)

    def register(self, p):
        """
        Register process with L{ProcessMonitor}
        @param p: Process
        @type  p: L{Process}
        @raise PmonException: if process with same name is already
        registered, or if the process monitor has been shut down
        """
        _logger.info("ProcessMonitor.register[%s]"%p.name)
        e = None
        with self.plock:
            if self.has_process(p.name):
                e = PmonException("cannot add process with duplicate name '%s'"%p.name)
            elif self.is_shutdown:
                e = PmonException("cannot add process [%s] after process monitor has been shut down"%p.name)
            else:
                self.procs.append(p)
        # log and raise outside of the lock
        if e:
            _logger.error("ProcessMonitor.register[%s] failed %s"%(p.name, e))
            raise e
        else:
            _logger.info("ProcessMonitor.register[%s] complete"%p.name)

    def register_core_proc(self, p):
        """
        Register core process with ProcessMonitor. Core processes
        have special shutdown semantics. They are killed after all
        other processes, in reverse order in which they are added.
        @param p: Process
        @type  p: L{Process}
        @raise PmonException: if process with same name is already registered
        """
        self.register(p)
        self.core_procs.append(p)

    def registrations_complete(self):
        """
        Inform the process monitor that registrations are complete.
        After the registrations_complete flag is set, process monitor
        will exit if there are no processes left to monitor.
        """
        self._registrations_complete = True
        _logger.info("registrations completed %s"%self)

    def unregister(self, p):
        """
        Remove process from the list of monitored processes.
        @param p: Process
        @type  p: L{Process}
        @raise ValueError: if p is not currently registered
        """
        _logger.info("ProcessMonitor.unregister[%s] starting"%p.name)
        with self.plock:
            self.procs.remove(p)
        _logger.info("ProcessMonitor.unregister[%s] complete"%p.name)

    def has_process(self, name):
        """
        @return: True if process is still be monitored. If False, process
        has died or was never registered with process
        @rtype: bool
        """
        return any(p.name == name for p in self.procs)

    def get_process(self, name):
        """
        @return: process registered under \a name, or None
        @rtype: L{Process}
        """
        with self.plock:
            for p in self.procs:
                if p.name == name:
                    return p
        return None

    def kill_process(self, name):
        """
        Kill process that matches name. NOTE: a killed process will
        continue to show up as active until the process monitor thread
        has caught that it has died.
        @param name: Process name
        @type  name: str
        @return: True if a process named name was removed from
        process monitor. A process is considered killed if its stop()
        method was called.
        @rtype: bool
        @raise PmonException: if name is not a string
        """
        if not isinstance(name, basestring):
            raise PmonException("kill_process takes in a process name but was given: %s"%name)
        _logger.debug("ProcessMonitor.kill_process[%s]"%name)
        printlog("[%s] kill requested"%name)
        with self.plock:
            p = self.get_process(name)
            if p:
                try:
                    # no need to accumulate errors, so pass in []
                    p.stop([])
                except:
                    _logger.error(traceback.format_exc())
                return True
            else:
                return False

    def shutdown(self):
        """
        Shutdown the process monitor thread
        """
        _logger.info("ProcessMonitor.shutdown %s"%self)
        self.is_shutdown = True

    def get_active_names(self):
        """
        @return [str]: list of active process names
        """
        with self.plock:
            retval = [p.name for p in self.procs]
        return retval

    def get_process_names_with_spawn_count(self):
        """
        @return: Two lists, where first
        list of active process names along with the number of times
        that process has been spawned. Second list contains dead process names
        and their spawn count.
        @rtype: [[(str, int),], [(str,int),]]
        """
        with self.plock:
            actives = [(p.name, p.spawn_count) for p in self.procs]
            deads = [(p.name, p.spawn_count) for p in self.dead_list]
            retval = [actives, deads]
        return retval

    def run(self):
        """
        thread routine of the process monitor.
        """
        try:
            #don't let exceptions bomb thread, interferes with exit
            try:
                self._run()
            except:
                _logger.error(traceback.format_exc())
                traceback.print_exc()
        finally:
            # always stop whatever is still registered
            self._post_run()

    def _run(self):
        """
        Internal run loop of ProcessMonitor
        """
        plock = self.plock
        dead = []
        respawn = []
        while not self.is_shutdown:
            with plock: #copy self.procs
                procs = self.procs[:]
            if self.is_shutdown:
                break

            for p in procs:
                try:
                    if not p.is_alive():
                        _logger.debug("Process[%s] has died, respawn=%s, required=%s, exit_code=%s",p.name, p.respawn, p.required, p.exit_code)
                        exit_code_str = p.get_exit_description()
                        if p.respawn:
                            printlog_bold("[%s] %s\nrespawning..."%(p.name, exit_code_str))
                            respawn.append(p)
                        elif p.required:
                            printerrlog('='*80+"REQUIRED process [%s] has died!\n%s\nInitiating shutdown!\n"%(p.name, exit_code_str)+'='*80)
                            self.is_shutdown = True
                        else:
                            if p.exit_code:
                                printerrlog("[%s] %s"%(p.name, exit_code_str))
                            else:
                                printlog_bold("[%s] %s"%(p.name, exit_code_str))
                            dead.append(p)

                        ## no need for lock as we require listeners be
                        ## added before process monitor is launched
                        for l in self.listeners:
                            l.process_died(p.name, p.exit_code)

                except Exception:
                    traceback.print_exc()
                    # don't respawn as this is an internal error: the
                    # process is unregistered below, so restarting it
                    # would leave it running unmonitored
                    if p in respawn:
                        respawn.remove(p)
                    # p may already be queued if a listener raised
                    if p not in dead:
                        dead.append(p)
                if self.is_shutdown:
                    break #stop polling
            for d in dead:
                try:
                    self.unregister(d)
                    # stop process, don't accumulate errors
                    d.stop([])

                    # save process data to dead list
                    with plock:
                        self.dead_list.append(DeadProcess(d))
                except:
                    _logger.error(traceback.format_exc())

            # dead check is to make sure that ProcessMonitor at least
            # waits until its had at least one process before exiting
            if self._registrations_complete and dead and not self.procs and not respawn:
                printlog("all processes on machine have died, roslaunch will exit")
                self.is_shutdown = True
            del dead[:]
            for r in respawn:
                try:
                    if self.is_shutdown:
                        break
                    printlog("[%s] restarting process"%r.name)
                    # stop process, don't accumulate errors
                    r.stop([])
                    r.start()
                except:
                    traceback.print_exc()
                    _logger.error("Restart failed %s",traceback.format_exc())
            del respawn[:]
            time.sleep(0.1) #yield thread
        # remaining processes are killed by _post_run(), which run()
        # invokes from its finally block

    def _post_run(self):
        """
        Stop every process that is still registered: non-core processes
        in parallel first, then core processes one at a time in reverse
        registration order. Accumulated shutdown errors are printed at
        the end.
        """
        _logger.info("ProcessMonitor._post_run %s"%self)
        # this is already true entering, but go ahead and make sure
        self.is_shutdown = True
        # killall processes on run exit

        q = Queue.Queue()

        with self.plock:
            # make copy of core_procs for threadsafe usage
            core_procs = self.core_procs[:]
            _logger.info("ProcessMonitor._post_run %s: remaining procs are %s"%(self, self.procs))

            # enqueue all non-core procs in reverse order for parallel kill
            # #526/885: ignore core procs
            for p in reversed(self.procs):
                if p not in core_procs:
                    q.put(p)

        # use 10 workers
        killers = []
        for i in range(10):
            t = _ProcessKiller(q, i)
            killers.append(t)
            t.start()

        # wait for workers to finish
        q.join()
        shutdown_errors = []

        # accumulate all the shutdown errors
        for t in killers:
            shutdown_errors.extend(t.errors)
        del killers[:]

        # #526/885: kill core procs last
        # we don't want to parallelize this as the master has to be last
        for p in reversed(core_procs):
            _kill_process(p, shutdown_errors)

        # delete everything except dead_list
        _logger.info("ProcessMonitor exit: cleaning up data structures")
        with self.plock:
            del core_procs[:]
            del self.procs[:]
            del self.core_procs[:]

        _logger.info("ProcessMonitor exit: pmon has shutdown")
        self.done = True

        if shutdown_errors:
            printerrlog("Shutdown errors:\n"+'\n'.join([" * %s"%e for e in shutdown_errors]))
530
def _kill_process(p, errors):
    """
    Stop process p, announcing the kill on screen and in the logfile.
    Any exception raised along the way is written to the logfile
    instead of being propagated.

    @param p: process to kill
    @type  p: Process
    @param errors: list of error messages from killed process
    @type  errors: [str]
    """
    try:
        proc_name = p.name
        _logger.info("ProcessMonitor exit: killing %s", proc_name)
        printlog("[%s] killing on exit"%proc_name)
        # errors is shared between kills so that all messages can be
        # printed together at the end
        p.stop(errors)
    except:
        failure = traceback.format_exc()
        _logger.error(failure)
547
class _ProcessKiller(Thread):
    """
    Worker thread that drains a queue of processes and kills each one,
    collecting the resulting error messages in self.errors.
    """

    def __init__(self, q, i):
        """
        @param q: queue of processes to kill
        @type  q: Queue.Queue
        @param i: worker index, used to name the thread
        @type  i: int
        """
        Thread.__init__(self, name="ProcessKiller-%s"%i)
        self.q = q
        self.errors = []

    def run(self):
        """
        Kill queued processes until the queue is empty.
        """
        q = self.q
        while True:
            try:
                p = q.get(False)
            except Queue.Empty:
                # queue drained (possibly by another worker)
                return
            try:
                _kill_process(p, self.errors)
            except Exception:
                # keep draining; report the failure with the other
                # shutdown errors
                self.errors.append(traceback.format_exc())
            finally:
                # always mark the item done, otherwise q.join() in the
                # caller would block forever
                q.task_done()
564