Package rosunit :: Module pmon
[frames] | [no frames]

Source Code for Module rosunit.pmon

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  """ 
 36  Process monitor 
 37  """ 
 38   
 39  from __future__ import with_statement 
 40   
 41  import atexit 
 42  import time 
 43  import traceback 
 44  from threading import Lock 
 45  from threading import RLock 
 46  from threading import Thread 
 47  try: 
 48      from queue import Empty, Queue 
 49  except ImportError: 
 50      from Queue import Empty, Queue 
 51   
 52  from .core import printerrlog 
 53  from .core import printlog 
 54  from .core import printlog_bold 
 55   
 56   
class PmonException(Exception):
    """
    Base exception for process monitor errors, e.g. registering a
    duplicate process name, registering after shutdown, or passing a
    non-string name to kill_process().
    """


class FatalProcessLaunch(PmonException):
    """
    Raised when launching a process fails in a way that makes a retry
    pointless (i.e. relaunching is unlikely to succeed).
    """
# start/shutdown ################################################

# every ProcessMonitor created by start_process_monitor(); emptied by pmon_shutdown()
_pmons = []
# incremented per start_process_monitor() call to build unique thread names
_pmon_counter = 0
# when True, start_process_monitor() returns None instead of creating a monitor
# NOTE(review): nothing in the visible module sets this to True -- confirm intent
_shutting_down = False
def start_process_monitor():
    """
    Create, register and start a new L{ProcessMonitor}.

    Each monitor gets a unique thread name of the form
    'ProcessMonitor-<n>'.

    @return: the started process monitor, or None if C{_shutting_down}
      is set
    @rtype: L{ProcessMonitor}
    """
    global _pmon_counter
    if _shutting_down:
        return None

    _pmon_counter += 1
    monitor = ProcessMonitor('ProcessMonitor-%s' % _pmon_counter)

    # Register and start under the shutdown lock so pmon_shutdown()
    # (e.g. triggered by ctrl-C during startup) cannot interleave
    # between appending the monitor and starting it.
    with _shutdown_lock:
        _pmons.append(monitor)
        monitor.start()
    return monitor


def shutdown_process_monitor(process_monitor):
    """
    Ask a process monitor to shut down and wait for its thread to exit.

    @param process_monitor: process monitor to kill
    @type process_monitor: L{ProcessMonitor}
    @return: True if process_monitor was successfully
      shutdown. False if it could not be shutdown cleanly or if there is
      a problem with process_monitor parameter (None, or already shut
      down). shutdown_process_monitor() does not throw any exceptions
      as this is shutdown-critical code.
    @rtype: bool
    """
    try:
        if process_monitor is None or process_monitor.is_shutdown:
            return False

        process_monitor.shutdown()
        # give the monitor thread up to 20 seconds to finish its cleanup
        process_monitor.join(20.0)
        # Thread.isAlive() was removed in Python 3.9. Calling it raised an
        # AttributeError that the handler below turned into False, so a
        # clean shutdown was always reported as a failure. is_alive() is
        # available on every supported Python version.
        return not process_monitor.is_alive()
    except Exception:
        return False
# serializes ProcessMonitor registration in start_process_monitor() against
# the teardown performed by pmon_shutdown()
_shutdown_lock = Lock()


def pmon_shutdown():
    """
    Shut down every process monitor started through
    start_process_monitor() and forget about them.

    Runs under C{_shutdown_lock} so it cannot race with a monitor that
    is being registered. Registered with atexit in this module.
    """
    with _shutdown_lock:
        # iterating an empty list and clearing it are both no-ops, so no
        # separate empty check is needed
        for monitor in _pmons:
            shutdown_process_monitor(monitor)
        del _pmons[:]


# make sure every ProcessMonitor is shut down when the interpreter exits
atexit.register(pmon_shutdown)


# ##############################################################

class Process(object):
    """
    Basic process representation for L{ProcessMonitor}. Must be subclassed
    to provide actual start()/stop() implementations.
    """

    def __init__(self, package, name, args, env, respawn=False, required=False):
        """
        @param package: package the process belongs to
        @param name: unique process name (used as the key in ProcessMonitor)
        @type name: str
        @param args: process arguments
        @param env: process environment
        @param respawn: if True, ProcessMonitor restarts the process when it dies
        @type respawn: bool
        @param required: if True, ProcessMonitor shuts down when the process dies
        @type required: bool
        """
        self.package = package
        self.name = name
        self.args = args
        self.env = env
        self.respawn = respawn
        self.required = required
        self.lock = Lock()
        self.exit_code = None
        # for keeping track of respawning
        self.spawn_count = 0

    def __str__(self):
        return 'Process<%s>' % (self.name)

    # NOTE: get_info() is going to have to be sufficient for
    # generating respawn requests, so we must be complete about it

    def get_info(self):
        """
        Get all data about this process in dictionary form
        @return: dictionary of all relevant process properties; the
          'exit_code' key is only present once an exit code is known
        @rtype: dict { str: val }
        """
        info = {
            'spawn_count': self.spawn_count,
            'args': self.args,
            'env': self.env,
            'package': self.package,
            'name': self.name,
            'alive': self.is_alive(),
            'respawn': self.respawn,
            'required': self.required,
        }
        if self.exit_code is not None:
            info['exit_code'] = self.exit_code
        return info

    def start(self):
        """
        Base implementation only counts the spawn in C{spawn_count}.
        """
        self.spawn_count += 1

    def is_alive(self):
        """
        @return: whether the process is running; always False in the base class
        @rtype: bool
        """
        return False

    def stop(self, errors=None):
        """
        Stop the process. Record any significant error messages in the errors parameter

        @param errors: error messages. stop() will record messages into this list.
          Defaults to None rather than C{[]}: a list literal default would be
          one list object shared by every call that omits the argument.
        @type errors: [str]
        """
        pass

    def get_exit_description(self):
        """
        @return: human-readable description of how the process exited
        @rtype: str
        """
        if self.exit_code is not None:
            if self.exit_code:
                return 'process has died [exit code %s]' % self.exit_code
            else:
                # try not to scare users about process exit
                return 'process has finished cleanly'
        else:
            return 'process has died'


class DeadProcess(Process):
    """
    Container class to maintain information about a process that has died. This
    container allows us to delete the actual Process but still maintain the metadata
    """
    def __init__(self, p):
        """
        @param p: the process that died; its metadata is copied
        @type p: L{Process}
        """
        # Forward required as well as respawn. Previously only respawn was
        # passed, so DeadProcess.required was always False.
        super(DeadProcess, self).__init__(p.package, p.name, p.args, p.env,
                                          respawn=p.respawn, required=p.required)
        self.exit_code = p.exit_code
        # the dead-process record drops the lock created by Process.__init__
        self.lock = None
        self.spawn_count = p.spawn_count
        # snapshot taken while the original process object is still available
        self.info = p.get_info()

    def get_info(self):
        """
        @return: the info snapshot captured from the original process
        @rtype: dict
        """
        return self.info

    def start(self):
        raise Exception('cannot call start on a dead process!')

    def is_alive(self):
        return False


class ProcessListener(object):
    """
    Callback interface for L{ProcessMonitor}. Subclass it and override
    process_died(); the default implementation ignores the event.
    """

    def process_died(self, process_name, exit_code):
        """
        Called when a monitored process dies during normal
        ProcessMonitor execution. Processes that are forcibly killed
        while the ProcessMonitor is shutting down are not reported.

        @param process_name: name of the process that died
        @type process_name: str
        @param exit_code: exit code of the process, or None when
          ProcessMonitor could not determine one
        @type exit_code: int
        """
        return None


class ProcessMonitor(Thread):
    """
    Thread that polls its registered L{Process} instances. It respawns
    or reaps them as they die, and kills every remaining process when
    the thread exits.
    """

    def __init__(self, name='ProcessMonitor'):
        Thread.__init__(self, name=name)
        self.procs = []
        self.plock = RLock()
        self.is_shutdown = False
        self.done = False
        # the daemon attribute replaces Thread.setDaemon(), deprecated in 3.10
        self.daemon = True
        self.listeners = []
        self.dead_list = []
        # #885: ensure core procs
        self.core_procs = []
        # #642: flag to prevent process monitor exiting prematurely
        self._registrations_complete = False

    def add_process_listener(self, l):
        """
        Listener for process events. MUST be called before
        ProcessMonitor is running. See ProcessListener class.
        @param l: listener instance
        @type l: L{ProcessListener}
        """
        self.listeners.append(l)

    def register(self, p):
        """
        Register process with L{ProcessMonitor}
        @param p: Process
        @type p: L{Process}
        @raise PmonException: if process with same name is already registered
          or the process monitor has been shut down
        """
        e = None
        with self.plock:
            if self.has_process(p.name):
                e = PmonException("cannot add process with duplicate name '%s'" % p.name)
            elif self.is_shutdown:
                e = PmonException('cannot add process [%s] after process monitor has been shut down' % p.name)
            else:
                self.procs.append(p)
        # raise outside the lock
        if e:
            raise e

    def register_core_proc(self, p):
        """
        Register core process with ProcessMonitor. Core processes
        have special shutdown semantics. They are killed after all
        other processes, in reverse order in which they are added.
        @param p: Process
        @type p: L{Process}
        @raise PmonException: if process with same name is already registered
        """
        self.register(p)
        self.core_procs.append(p)

    def registrations_complete(self):
        """
        Inform the process monitor that registrations are complete.
        After the registrations_complete flag is set, process monitor
        will exit if there are no processes left to monitor.
        """
        self._registrations_complete = True

    def unregister(self, p):
        """
        Remove p from the monitored processes.
        @raise ValueError: if p is not registered
        """
        with self.plock:
            self.procs.remove(p)

    def has_process(self, name):
        """
        @return: True if process is still being monitored. If False, process
          has died or was never registered with process
        @rtype: bool
        """
        # plock is reentrant, so this is safe to call from register()
        with self.plock:
            return any(p.name == name for p in self.procs)

    def get_process(self, name):
        """
        @return: process registered under \a name, or None
        @rtype: L{Process}
        """
        with self.plock:
            for p in self.procs:
                if p.name == name:
                    return p
        return None

    def kill_process(self, name):
        """
        Kill process that matches name. NOTE: a killed process will
        continue to show up as active until the process monitor thread
        has caught that it has died.
        @param name: Process name
        @type name: str
        @return: True if a process named name was removed from
          process monitor. A process is considered killed if its stop()
          method was called.
        @rtype: bool
        @raise PmonException: if name is not a string
        """
        def is_string_type(obj):
            try:
                return isinstance(obj, basestring)
            except NameError:
                return isinstance(obj, str)

        if not is_string_type(name):
            raise PmonException('kill_process takes in a process name but was given: %s' % name)
        printlog('[%s] kill requested' % name)
        with self.plock:
            p = self.get_process(name)
            if p:
                try:
                    # no need to accumulate errors, so pass in []
                    p.stop([])
                except Exception as e:
                    printerrlog('Exception: %s' % (str(e)))
                return True
            else:
                return False

    def shutdown(self):
        """
        Shutdown the process monitor thread
        """
        self.is_shutdown = True

    def get_active_names(self):
        """
        @return [str]: list of active process names
        """
        with self.plock:
            retval = [p.name for p in self.procs]
        return retval

    def get_process_names_with_spawn_count(self):
        """
        @return: Two lists. The first lists active process names along
          with the number of times that process has been spawned. The
          second lists dead process names and their spawn count.
        @rtype: [[(str, int),], [(str,int),]]
        """
        with self.plock:
            actives = [(p.name, p.spawn_count) for p in self.procs]
            deads = [(p.name, p.spawn_count) for p in self.dead_list]
            retval = [actives, deads]
        return retval

    def run(self):
        """
        thread routine of the process monitor.
        """
        try:
            # don't let exceptions bomb thread, interferes with exit
            try:
                self._run()
            except Exception:
                traceback.print_exc()
        finally:
            self._post_run()

    def _run(self):
        """
        Internal run loop of ProcessMonitor
        """
        plock = self.plock
        dead = []
        respawn = []
        while not self.is_shutdown:
            with plock:  # copy self.procs
                procs = self.procs[:]
            if self.is_shutdown:
                break

            for p in procs:
                try:
                    if not p.is_alive():
                        exit_code_str = p.get_exit_description()
                        if p.respawn:
                            printlog_bold('[%s] %s\nrespawning...' % (p.name, exit_code_str))
                            respawn.append(p)
                        elif p.required:
                            printerrlog('=' * 80 + 'REQUIRED process [%s] has died!\n%s\nInitiating shutdown!\n' % (p.name, exit_code_str) + '=' * 80)
                            self.is_shutdown = True
                        else:
                            if p.exit_code:
                                printerrlog('[%s] %s' % (p.name, exit_code_str))
                            else:
                                printlog_bold('[%s] %s' % (p.name, exit_code_str))
                            dead.append(p)

                        # no need for lock as we require listeners be
                        # added before process monitor is launched
                        for l in self.listeners:
                            l.process_died(p.name, p.exit_code)

                except Exception:
                    traceback.print_exc()
                    # don't respawn as this is an internal error. p may already
                    # be queued (e.g. a listener raised after it was recorded),
                    # so avoid restarting an unregistered process or reaping it
                    # twice.
                    if p in respawn:
                        respawn.remove(p)
                    if p not in dead:
                        dead.append(p)
                if self.is_shutdown:
                    break  # stop polling

            for d in dead:
                try:
                    self.unregister(d)
                    # stop process, don't accumulate errors
                    d.stop([])

                    # save process data to dead list
                    with plock:
                        self.dead_list.append(DeadProcess(d))
                except Exception as e:
                    printerrlog('Exception: %s' % (str(e)))

            # dead check is to make sure that ProcessMonitor at least
            # waits until its had at least one process before exiting
            if self._registrations_complete and dead and not self.procs and not respawn:
                printlog('all processes on machine have died, roslaunch will exit')
                self.is_shutdown = True
            del dead[:]

            for r in respawn:
                try:
                    if self.is_shutdown:
                        break
                    printlog('[%s] restarting process' % r.name)
                    # stop process, don't accumulate errors
                    r.stop([])
                    r.start()
                except Exception:
                    traceback.print_exc()
            del respawn[:]
            time.sleep(0.1)  # yield thread
        # moved this to finally block of _post_run
        # self._post_run() #kill all processes

    def _post_run(self):
        """
        Kill all remaining processes once the run loop has exited.
        Non-core processes are killed in parallel; core processes are
        killed afterwards, one at a time, in reverse registration order.
        """
        # this is already true entering, but go ahead and make sure
        self.is_shutdown = True

        q = Queue()

        with self.plock:
            # make copy of core_procs for threadsafe usage
            core_procs = self.core_procs[:]

            # enqueue all non-core procs in reverse order for parallel kill
            # #526/885: ignore core procs
            for p in reversed(self.procs):
                if p not in core_procs:
                    q.put(p)

        # use 10 workers
        killers = []
        for i in range(10):
            t = _ProcessKiller(q, i)
            killers.append(t)
            t.start()

        # wait for workers to finish
        q.join()
        shutdown_errors = []

        # accumulate all the shutdown errors
        for t in killers:
            shutdown_errors.extend(t.errors)
        del killers[:]

        # #526/885: kill core procs last
        # we don't want to parallelize this as the master has to be last
        for p in reversed(core_procs):
            _kill_process(p, shutdown_errors)

        # delete everything except dead_list
        with self.plock:
            del core_procs[:]
            del self.procs[:]
            del self.core_procs[:]

        self.done = True

        if shutdown_errors:
            printerrlog('Shutdown errors:\n' + '\n'.join([' * %s' % e for e in shutdown_errors]))


def _kill_process(p, errors):
    """
    Stop process p as part of monitor shutdown, logging the kill to screen and logfile.

    Exceptions raised by p.stop() are logged rather than propagated.

    @param p: process to kill
    @type p: Process
    @param errors: list that p.stop() appends its error messages to
    @type errors: [str]
    """
    try:
        printlog('[%s] killing on exit' % p.name)
        # errors are accumulated across all processes and reported at the end
        p.stop(errors)
    except Exception as err:
        printerrlog('Exception: %s' % (str(err)))


class _ProcessKiller(Thread):
    """
    Worker thread that drains a queue of processes, killing each one and
    collecting stop() error messages in C{self.errors}.
    """

    def __init__(self, q, i):
        """
        @param q: queue of processes to kill
        @type q: Queue
        @param i: worker index, used only in the thread name
        """
        Thread.__init__(self, name='ProcessKiller-%s' % i)
        self.q = q
        self.errors = []

    def run(self):
        q = self.q
        while not q.empty():
            try:
                p = q.get(False)
            except Empty:
                # another worker took the last item between empty() and get()
                continue
            try:
                _kill_process(p, self.errors)
            finally:
                # Always mark the item done. Otherwise an exception escaping
                # the kill would leave q.join() in the caller blocked forever.
                q.task_done()