Package rosunit :: Module pmon
[frames | no frames]

Source Code for Module rosunit.pmon

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  """ 
 36  Process monitor 
 37  """ 
 38   
 39  from __future__ import with_statement 
 40   
 41  import os 
 42  import sys 
 43  import time 
 44  import traceback 
 45  import Queue 
 46  import atexit 
 47  from threading import Thread, RLock, Lock 
 48   
 49  from .core import printlog, printlog_bold, printerrlog 
 50   
class PmonException(Exception):
    """
    Base class for errors raised by the process monitor.
    """
52
class FatalProcessLaunch(PmonException):
    """
    Raised when launching a process fails in a fatal manner, i.e. when
    trying to launch it again is unlikely to succeed.
    """
# start/shutdown ################################################

# Process monitors created by start_process_monitor(); emptied by
# pmon_shutdown() once each of them has been asked to shut down.
_pmons = []
# Number of monitors created so far, used to build unique thread names.
_pmon_counter = 0
# While True, start_process_monitor() refuses to create new monitors.
# NOTE(review): nothing in this listing ever sets it to True -- confirm
# whether another module is expected to.
_shutting_down = False
def start_process_monitor():
    """
    Create a new L{ProcessMonitor}, record it in the module-level list
    of monitors and start its thread.

    @return: the started process monitor, or None if the
    _shutting_down flag is set
    @rtype: L{ProcessMonitor}
    """
    global _pmon_counter
    if _shutting_down:
        return None
    _pmon_counter = _pmon_counter + 1
    monitor = ProcessMonitor("ProcessMonitor-%s"%_pmon_counter)
    # Hold the shutdown lock while registering and starting so that a
    # pmon_shutdown() triggered during startup (i.e. user hits ctrl-C)
    # cannot race with us.
    with _shutdown_lock:
        _pmons.append(monitor)
        monitor.start()
    return monitor
79
def shutdown_process_monitor(process_monitor):
    """
    Ask a process monitor to shut down and wait for its thread to exit.

    @param process_monitor: process monitor to kill
    @type  process_monitor: L{ProcessMonitor}
    @return: True if process_monitor was successfully
    shutdown. False if it could not be shutdown cleanly or if there is
    a problem with process_monitor
    parameter. shutdown_process_monitor() does not throw any exceptions
    as this is shutdown-critical code.
    @rtype: bool
    """
    try:
        if process_monitor is None or process_monitor.is_shutdown:
            return False

        process_monitor.shutdown()
        # wait at most 20 seconds for the monitor thread to finish
        process_monitor.join(20.0)
        # is_alive() rather than the legacy isAlive() alias, which no
        # longer exists on newer Python threads; the resulting
        # AttributeError used to be swallowed below and reported as a
        # failed shutdown.
        return not process_monitor.is_alive()
    except Exception:
        # shutdown-critical code: report failure instead of raising
        return False
# Guards _pmons: held by start_process_monitor() while it registers and
# starts a monitor, and by pmon_shutdown() while it shuts monitors down.
_shutdown_lock = Lock()
def pmon_shutdown():
    """
    Shut down every process monitor recorded by
    start_process_monitor() and forget about them. Does nothing if no
    monitors are recorded.
    """
    global _pmons
    with _shutdown_lock:
        if _pmons:
            for monitor in _pmons:
                shutdown_process_monitor(monitor)
            # empty the list in place so other references see it too
            del _pmons[:]
# shut down any process monitors that are still recorded when the
# interpreter exits
atexit.register(pmon_shutdown)

# ##############################################################
class Process(object):
    """
    Basic process representation for L{ProcessMonitor}. Must be subclassed
    to provide actual start()/stop() implementations.
    """

    def __init__(self, package, name, args, env, respawn=False, required=False):
        """
        @param package: package of the process; stored and reported by get_info()
        @param name: process name
        @type  name: str
        @param args: process arguments; stored and reported by get_info()
        @param env: process environment; stored and reported by get_info()
        @param respawn: if True, L{ProcessMonitor} restarts the process
        when it finds it dead
        @type  respawn: bool
        @param required: if True (and respawn is False), L{ProcessMonitor}
        initiates shutdown when it finds the process dead
        @type  required: bool
        """
        self.package = package
        self.name = name
        self.args = args
        self.env = env
        self.respawn = respawn
        self.required = required
        # not used by this base class itself
        self.lock = Lock()
        # None until an exit code is known
        self.exit_code = None
        # for keeping track of respawning
        self.spawn_count = 0

    def __str__(self):
        return "Process<%s>"%(self.name)

    # NOTE: get_info() is going to have to be sufficient for
    # generating respawn requests, so we must be complete about it

    def get_info(self):
        """
        Get all data about this process in dictionary form
        @return: dictionary of all relevant process properties. The
        'exit_code' key is only present once an exit code is known.
        @rtype: dict { str: val }
        """
        info = {
            'spawn_count': self.spawn_count,
            'args': self.args,
            'env': self.env,
            'package': self.package,
            'name': self.name,
            'alive': self.is_alive(),
            'respawn': self.respawn,
            'required': self.required,
        }
        if self.exit_code is not None:
            info['exit_code'] = self.exit_code
        return info

    def start(self):
        """
        Record one more spawn of the process. The base class does not
        launch anything.
        """
        self.spawn_count += 1

    def is_alive(self):
        """
        @return: True if the process is running. The base class always
        returns False.
        @rtype: bool
        """
        return False

    def stop(self, errors=None):
        """
        Stop the process. Record any significant error messages in the errors parameter

        The base class does nothing. The default is None rather than a
        list literal so that no list object is shared between calls.

        @param errors: error messages. stop() will record messages into this list.
        @type  errors: [str]
        """

    def get_exit_description(self):
        """
        @return: human-readable description of how the process ended,
        based on exit_code
        @rtype: str
        """
        if self.exit_code is None:
            return 'process has died'
        if self.exit_code:
            return 'process has died [exit code %s]'%self.exit_code
        # try not to scare users about process exit
        return 'process has finished cleanly'
186
class DeadProcess(Process):
    """
    Container class to maintain information about a process that has died. This
    container allows us to delete the actual Process but still maintain the metadata
    """

    def __init__(self, p):
        """
        @param p: process to take the metadata from
        @type  p: L{Process}
        """
        # forward required as well, so that the attribute agrees with
        # the 'required' entry of the info snapshot taken below
        super(DeadProcess, self).__init__(p.package, p.name, p.args, p.env, p.respawn, p.required)
        self.exit_code = p.exit_code
        # replaces the Lock created by Process.__init__
        self.lock = None
        self.spawn_count = p.spawn_count
        # snapshot taken now; get_info() keeps returning it
        self.info = p.get_info()

    def get_info(self):
        """
        @return: the info dictionary captured from the original process
        at construction time
        @rtype: dict { str: val }
        """
        return self.info

    def start(self):
        """
        @raise Exception: always
        """
        raise Exception("cannot call start on a dead process!")

    def is_alive(self):
        """
        @return: always False
        @rtype: bool
        """
        return False
204
class ProcessListener(object):
    """
    Callback interface for objects that want to hear about process
    events from L{ProcessMonitor}
    """

    def process_died(self, process_name, exit_code):
        """
        Called when a process has died. Only processes that die while
        the process monitor is executing normally are reported --
        processes that are forcibly killed during ProcessMonitor
        shutdown are not. This default implementation does nothing.
        @param process_name: name of process
        @type  process_name: str
        @param exit_code: exit code of process. If None, it means
        that ProcessMonitor was unable to determine an exit code.
        @type  exit_code: int
        """
        return None
223
class ProcessMonitor(Thread):
    """
    Thread that polls the registered L{Process} instances, respawns or
    retires the ones that have died, and stops every remaining process
    when it exits.
    """

    def __init__(self, name="ProcessMonitor"):
        """
        @param name: thread name
        @type  name: str
        """
        Thread.__init__(self, name=name)
        # processes currently being monitored; guarded by plock
        self.procs = []
        # re-entrant: kill_process() calls get_process() while holding it
        self.plock = RLock()
        self.is_shutdown = False
        # set to True at the end of _post_run()
        self.done = False
        self.daemon = True
        self.listeners = []
        # DeadProcess records of processes that were retired
        self.dead_list = []
        # #885: ensure core procs
        self.core_procs = []
        # #642: flag to prevent process monitor exiting prematurely
        self._registrations_complete = False

    def add_process_listener(self, l):
        """
        Listener for process events. MUST be called before
        ProcessMonitor is running. See ProcessListener class.
        @param l: listener instance
        @type  l: L{ProcessListener}
        """
        self.listeners.append(l)

    def register(self, p):
        """
        Register process with L{ProcessMonitor}
        @param p: Process
        @type  p: L{Process}
        @raise PmonException: if process with same name is already
        registered, or if the process monitor has been shut down
        """
        with self.plock:
            if self.has_process(p.name):
                raise PmonException("cannot add process with duplicate name '%s'"%p.name)
            if self.is_shutdown:
                raise PmonException("cannot add process [%s] after process monitor has been shut down"%p.name)
            self.procs.append(p)

    def register_core_proc(self, p):
        """
        Register core process with ProcessMonitor. Core processes
        have special shutdown semantics. They are killed after all
        other processes, in reverse order in which they are added.
        @param p: Process
        @type  p: L{Process}
        @raise PmonException: if process with same name is already registered
        """
        self.register(p)
        self.core_procs.append(p)

    def registrations_complete(self):
        """
        Inform the process monitor that registrations are complete.
        After the registrations_complete flag is set, process monitor
        will exit if there are no processes left to monitor.
        """
        self._registrations_complete = True

    def unregister(self, p):
        """
        Stop monitoring a process.
        @param p: Process
        @type  p: L{Process}
        @raise ValueError: if p is not currently registered
        """
        with self.plock:
            self.procs.remove(p)

    def has_process(self, name):
        """
        @param name: Process name
        @type  name: str
        @return: True if process is still being monitored. If False, process
        has died or was never registered with process
        @rtype: bool
        """
        return any(p.name == name for p in self.procs)

    def get_process(self, name):
        """
        @param name: Process name
        @type  name: str
        @return: process registered under \a name, or None
        @rtype: L{Process}
        """
        with self.plock:
            for p in self.procs:
                if p.name == name:
                    return p
        return None

    def kill_process(self, name):
        """
        Kill process that matches name. NOTE: a killed process will
        continue to show up as active until the process monitor thread
        has caught that it has died.
        @param name: Process name
        @type  name: str
        @return: True if a process named name was removed from
        process monitor. A process is considered killed if its stop()
        method was called.
        @rtype: bool
        @raise PmonException: if name is not a string
        """
        if not isinstance(name, basestring):
            raise PmonException("kill_process takes in a process name but was given: %s"%name)
        printlog("[%s] kill requested"%name)
        with self.plock:
            p = self.get_process(name)
            if not p:
                return False
            try:
                # no need to accumulate errors, so pass in []
                p.stop([])
            except Exception as e:
                printerrlog("Exception: %s"%(str(e)))
            return True

    def shutdown(self):
        """
        Shutdown the process monitor thread
        """
        self.is_shutdown = True

    def get_active_names(self):
        """
        @return [str]: list of active process names
        """
        with self.plock:
            return [p.name for p in self.procs]

    def get_process_names_with_spawn_count(self):
        """
        @return: Two lists, where first
        list of active process names along with the number of times
        that process has been spawned. Second list contains dead process names
        and their spawn count.
        @rtype: [[(str, int),], [(str,int),]]
        """
        with self.plock:
            actives = [(p.name, p.spawn_count) for p in self.procs]
            deads = [(p.name, p.spawn_count) for p in self.dead_list]
            return [actives, deads]

    def run(self):
        """
        thread routine of the process monitor.
        """
        try:
            # don't let exceptions bomb thread, interferes with exit;
            # the bare except is deliberate
            try:
                self._run()
            except:
                traceback.print_exc()
        finally:
            # kill all processes
            self._post_run()

    def _run(self):
        """
        Internal run loop of ProcessMonitor
        """
        plock = self.plock
        dead = []
        respawn = []
        while not self.is_shutdown:
            with plock:  # copy self.procs
                procs = self.procs[:]
            if self.is_shutdown:
                break

            for p in procs:
                try:
                    if not p.is_alive():
                        exit_code_str = p.get_exit_description()
                        if p.respawn:
                            printlog_bold("[%s] %s\nrespawning..."%(p.name, exit_code_str))
                            respawn.append(p)
                        elif p.required:
                            printerrlog('='*80+"REQUIRED process [%s] has died!\n%s\nInitiating shutdown!\n"%(p.name, exit_code_str)+'='*80)
                            self.is_shutdown = True
                        else:
                            if p.exit_code:
                                printerrlog("[%s] %s"%(p.name, exit_code_str))
                            else:
                                printlog_bold("[%s] %s"%(p.name, exit_code_str))
                            dead.append(p)

                        ## no need for lock as we require listeners be
                        ## added before process monitor is launched
                        for l in self.listeners:
                            l.process_died(p.name, p.exit_code)

                except Exception:
                    traceback.print_exc()
                    # don't respawn as this is an internal error: take
                    # the process off the respawn list if it already got
                    # there, and retire it exactly once
                    if p in respawn:
                        respawn.remove(p)
                    if p not in dead:
                        dead.append(p)
                if self.is_shutdown:
                    break  # stop polling

            for d in dead:
                try:
                    self.unregister(d)
                    # stop process, don't accumulate errors
                    d.stop([])

                    # save process data to dead list
                    with plock:
                        self.dead_list.append(DeadProcess(d))
                except Exception as e:
                    printerrlog("Exception: %s"%(str(e)))

            # dead check is to make sure that ProcessMonitor at least
            # waits until its had at least one process before exiting
            if self._registrations_complete and dead and not self.procs and not respawn:
                printlog("all processes on machine have died, roslaunch will exit")
                self.is_shutdown = True
            del dead[:]

            for r in respawn:
                try:
                    if self.is_shutdown:
                        break
                    printlog("[%s] restarting process"%r.name)
                    # stop process, don't accumulate errors
                    r.stop([])
                    r.start()
                except:
                    traceback.print_exc()
            del respawn[:]
            time.sleep(0.1)  # yield thread
        # killing of the remaining processes happens in _post_run(),
        # which run() calls from its finally block

    def _post_run(self):
        """
        Stop every process that is still registered: non-core processes
        in parallel, then core processes one at a time, and report the
        errors collected on the way.
        """
        # this is already true entering, but go ahead and make sure
        self.is_shutdown = True
        # killall processes on run exit

        q = Queue.Queue()

        with self.plock:
            # make copy of core_procs for threadsafe usage
            core_procs = self.core_procs[:]

            # enqueue all non-core procs in reverse order for parallel kill
            # #526/885: ignore core procs
            for p in reversed(self.procs):
                if p not in core_procs:
                    q.put(p)

        # use 10 workers
        killers = []
        for i in range(10):
            t = _ProcessKiller(q, i)
            killers.append(t)
            t.start()

        # wait for workers to finish
        q.join()
        shutdown_errors = []

        # accumulate all the shutdown errors
        for t in killers:
            shutdown_errors.extend(t.errors)
        del killers[:]

        # #526/885: kill core procs last
        # we don't want to parallelize this as the master has to be last
        for p in reversed(core_procs):
            _kill_process(p, shutdown_errors)

        # delete everything except dead_list
        with self.plock:
            del core_procs[:]
            del self.procs[:]
            del self.core_procs[:]

        self.done = True

        if shutdown_errors:
            printerrlog("Shutdown errors:\n"+'\n'.join([" * %s"%e for e in shutdown_errors]))
498
def _kill_process(p, errors):
    """
    Stop Process p, logging the kill to screen and logfile. An
    exception raised while doing so is logged and not propagated.

    @param p: process to kill
    @type  p: Process
    @param errors: list that p.stop() records its error messages into
    @type  errors: [str]
    """
    try:
        printlog("[%s] killing on exit"%p.name)
        # errors from each process are accumulated by the caller so that
        # they can all be printed at the end
        p.stop(errors)
    except Exception as err:
        printerrlog("Exception: %s"%(str(err)))
514
515 -class _ProcessKiller(Thread):
516
517 - def __init__(self, q, i):
518 Thread.__init__(self, name="ProcessKiller-%s"%i) 519 self.q = q 520 self.errors = []
521
522 - def run(self):
523 q = self.q 524 while not q.empty(): 525 try: 526 p = q.get(False) 527 _kill_process(p, self.errors) 528 q.task_done() 529 except Queue.Empty: 530 pass
531