1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 Process monitoring implementation for roslaunch.
37 """
38
39 from __future__ import with_statement
40
41 import os
42 import sys
43 import time
44 import traceback
45 import logging
46 import Queue
47 import signal
48 import atexit
49 from threading import Thread, RLock, Lock
50
51 import roslib
52 from roslaunch.core import printlog, printlog_bold, printerrlog, RLException
53
54 logger = logging.getLogger("roslaunch.pmon")
55
57 """
58 Exception to indicate that a process launch has failed in a fatal
59 manner (i.e. relaunch is unlikely to succeed)
60 """
61 pass
62
63
64
65 _pmons = []
66 _pmon_counter = 0
88
90 """
91 @param process_monitor: process monitor to kill
92 @type process_monitor: L{ProcessMonitor}
93 @return: True if process_monitor was successfully
94 shutdown. False if it could not be shutdown cleanly or if there is
95 a problem with process_monitor
96 parameter. shutdown_process_monitor() does not throw any exceptions
97 as this is shutdown-critical code.
98 @rtype: bool
99 """
100 try:
101 if process_monitor is None or process_monitor.is_shutdown:
102 return False
103
104
105
106
107
108
109 process_monitor.shutdown()
110
111 process_monitor.join(20.0)
112 if process_monitor.isAlive():
113 logger.error("shutdown_process_monitor: ProcessMonitor shutdown failed!")
114 return False
115 else:
116 logger.debug("shutdown_process_monitor: ProcessMonitor shutdown succeeded")
117 return True
118 except Exception, e:
119 print >> sys.stderr, "exception in shutdown_process_monitor: %s"%e
120 traceback.print_exc()
121 return False
122
123 _shutdown_lock = Lock()
139
140 _signal_chain = {}
141 _shutting_down = False
154
155 _sig_initialized = False
166
167
168
170 """
171 Basic process representation for L{ProcessMonitor}. Must be subclassed
172 to provide actual start()/stop() implementations.
173
174 Constructor *must* be called from the Python Main thread in order
175 for signal handlers to register properly.
176 """
177
def __init__(self, package, name, args, env, respawn=False, required=False):
    """
    Store the launch configuration of the process, reset its runtime
    bookkeeping and make sure the module's signal handlers are set up.

    Must be invoked from the Python Main thread so that the signal
    handlers can be registered.

    @param package: package the process belongs to
    @param name: name of the process
    @param args: arguments of the process
    @param env: environment of the process
    @param respawn: stored as the respawn flag of the process
    @type  respawn: bool
    @param required: stored as the required flag of the process
    @type  required: bool
    """
    # launch configuration, kept exactly as handed in by the caller
    self.package, self.name = package, name
    self.args, self.env = args, env
    self.respawn, self.required = respawn, required

    # runtime bookkeeping: no exit code recorded and nothing spawned yet
    self.exit_code = None
    self.spawn_count = 0
    self.lock = Lock()

    _init_signal_handlers()
191
193 return "Process<%s>"%(self.name)
194
195
196
197
199 """
200 Get all data about this process in dictionary form
201 @return: dictionary of all relevant process properties
202 @rtype: dict { str: val }
203 """
204 info = {
205 'spawn_count': self.spawn_count,
206 'args': self.args,
207 'env': self.env,
208 'package': self.package,
209 'name': self.name,
210 'alive': self.is_alive(),
211 'respawn': self.respawn,
212 'required': self.required,
213 }
214 if self.exit_code is not None:
215 info['exit_code'] = self.exit_code
216 return info
217
219 self.spawn_count += 1
220
223
def stop(self, errors=None):
    """
    Stop the process. Record any significant error messages in the errors parameter.

    The base implementation does nothing; it is the hook that concrete
    process types override.

    @param errors: error messages. stop() will record messages into this
    list. Defaults to None rather than a list literal so that no single
    list object is shared between unrelated calls.
    @type  errors: [str]
    """
    pass
232
234 if self.exit_code is not None:
235 if self.exit_code:
236 return 'process has died [exit code %s]'%self.exit_code
237 else:
238
239 return 'process has finished cleanly'
240 else:
241 return 'process has died'
242
244 """
245 Container class to maintain information about a process that has died. This
246 container allows us to delete the actual Process but still maintain the metadata
247 """
249 super(DeadProcess, self).__init__(p.package, p.name, p.args, p.env, p.respawn)
250 self.exit_code = p.exit_code
251 self.lock = None
252 self.spawn_count = p.spawn_count
253 self.info = p.get_info()
257 raise Exception("cannot call start on a dead process!")
260
262 """
263 Listener class for L{ProcessMonitor}
264 """
265
267 """
268 Notifies listener that process has died. This callback only
269 occurs for processes that die during normal process monitor
270 execution -- processes that are forcibly killed during
271 ProcessMonitor shutdown are not reported.
272 @param process_name: name of process
273 @type process_name: str
274 @param exit_code: exit code of process. If None, it means
275 that ProcessMonitor was unable to determine an exit code.
276 @type exit_code: int
277 """
278 pass
279
281
def __init__(self, name="ProcessMonitor"):
    """
    Set up an idle process monitor thread. The thread is configured as
    a daemon thread and is not started here.

    @param name: name given to the monitor thread
    @type  name: str
    """
    Thread.__init__(self, name=name)
    self.setDaemon(True)

    # plock is taken around accesses to the process lists
    self.plock = RLock()
    # processes currently being monitored
    self.procs = []
    # subset of processes registered as core processes
    self.core_procs = []
    # records of processes that have died
    self.dead_list = []
    # objects notified when a process dies
    self.listeners = []
    # signals whose handler has to be re-installed from the main thread
    self.reacquire_signals = set()

    # lifecycle flags: shutdown requested / teardown finished /
    # registrations finished
    self.is_shutdown = False
    self.done = False
    self._registrations_complete = False

    logger.info("created process monitor %s" % (self,))
298
300 """
301 Listener for process events. MUST be called before
302 ProcessMonitor is running. See ProcessListener class.
303 @param l: listener instance
304 @type l: L{ProcessListener}
305 """
306 self.listeners.append(l)
307
309 """
310 Register process with L{ProcessMonitor}
311 @param p: Process
312 @type p: L{Process}
313 @raise RLException: if process with same name is already registered
314 """
315 logger.info("ProcessMonitor.register[%s]"%p.name)
316 e = None
317 with self.plock:
318 if self.has_process(p.name):
319 e = RLException("cannot add process with duplicate name '%s'"%p.name)
320 elif self.is_shutdown:
321 e = RLException("cannot add process [%s] after process monitor has been shut down"%p.name)
322 else:
323 self.procs.append(p)
324 if e:
325 logger.error("ProcessMonitor.register[%s] failed %s"%(p.name, e))
326 raise e
327 else:
328 logger.info("ProcessMonitor.register[%s] complete"%p.name)
329
331 """
332 Register core process with ProcessMonitor. Core processes
333 have special shutdown semantics. They are killed after all
334 other processes, in the reverse of the order in which they were added.
335 @param p Process
336 @type p: L{Process}
337 @raise RLException: if process with same name is already registered
338 """
339 self.register(p)
340 self.core_procs.append(p)
341
343 """
344 Inform the process monitor that registrations are complete.
345 After the registrations_complete flag is set, process monitor
346 will exit if there are no processes left to monitor.
347 """
348 self._registrations_complete = True
349 logger.info("registrations completed %s"%self)
350
352 logger.info("ProcessMonitor.unregister[%s] starting"%p.name)
353 with self.plock:
354 self.procs.remove(p)
355 logger.info("ProcessMonitor.unregister[%s] complete"%p.name)
356
358 """
359 @return: True if process is still being monitored. If False, process
360 has died or was never registered with the process monitor
361 @rtype: bool
362 """
363 return len([p for p in self.procs if p.name == name]) > 0
364
366 """
367 @return: process registered under \a name, or None
368 @rtype: L{Process}
369 """
370 with self.plock:
371 v = [p for p in self.procs if p.name == name]
372 if v:
373 return v[0]
374
376 """
377 @return: True if ProcessMonitor has tasks that need to be run in the main thread
378 @rtype: bool
379 """
380 return len(self.reacquire_signals)
381
383 """
384 Execute tasks that need to be run in the main thread. Must be
385 called from main thread.
386 """
387
388 sigs = [s for s in self.reacquire_signals]
389 for s in sigs:
390 _signal_chain[s] = signal.signal(s, rl_signal)
391 self.reacquire_signals.remove(s)
392
394 """
395 Kill process that matches name. NOTE: a killed process will
396 continue to show up as active until the process monitor thread
397 has caught that it has died.
398 @param name: Process name
399 @type name: str
400 @return: True if a process named name was removed from
401 process monitor. A process is considered killed if its stop()
402 method was called.
403 @rtype: bool
404 """
405 if not isinstance(name, basestring):
406 raise RLException("kill_process takes in a process name but was given: %s"%name)
407 logger.debug("ProcessMonitor.kill_process[%s]"%name)
408 printlog("[%s] kill requested"%name)
409 with self.plock:
410 p = self.get_process(name)
411 if p:
412 try:
413
414 p.stop([])
415 except:
416 logger.error(traceback.format_exc())
417 return True
418 else:
419 return False
420
422 """
423 Shutdown the process monitor thread
424 """
425 logger.info("ProcessMonitor.shutdown %s"%self)
426 self.is_shutdown = True
427
429 """
430 @return [str]: list of active process names
431 """
432 with self.plock:
433 retval = [p.name for p in self.procs]
434 return retval
435
437 """
438 @return: Two lists. The first
439 contains the active process names along with the number of times
440 each process has been spawned. The second contains dead process names
441 and their spawn counts.
442 @rtype: [[(str, int),], [(str,int),]]
443 """
444 with self.plock:
445 actives = [(p.name, p.spawn_count) for p in self.procs]
446 deads = [(p.name, p.spawn_count) for p in self.dead_list]
447 retval = [actives, deads]
448 return retval
449
451 """
452 run() occurs in a separate thread and cannot do certain signal-related
453 work. The main thread of the application must call mainthread_spin()
454 or mainthread_spin_once() in order to perform these jobs.
455 """
456 if not self.done:
457 if self.has_main_thread_jobs():
458 self.do_main_thread_jobs()
459 return True
460 else:
461 return False
462
def mainthread_spin(self):
    """
    Service main-thread work on behalf of the monitor until it is done.

    run() executes in a separate thread and cannot do certain
    signal-related work itself, so the application's main thread has to
    call either mainthread_spin() or mainthread_spin_once(). This
    variant blocks: it wakes up every 0.1 seconds, runs any pending
    main-thread jobs, and returns once the process monitor is complete.
    """
    while True:
        if self.done:
            return
        time.sleep(0.1)
        if self.has_main_thread_jobs():
            self.do_main_thread_jobs()
474
476 """
477 thread routine of the process monitor. NOTE: you must still
478 call mainthread_spin or mainthread_spin_once() from the main
479 thread in order to pick up main thread work from the process
480 monitor.
481 """
482 try:
483
484 try:
485 self._run()
486 except:
487 logger.error(traceback.format_exc())
488 traceback.print_exc()
489 finally:
490 self._post_run()
491
493 """
494 Internal run loop of ProcessMonitor
495 """
496 plock = self.plock
497 dead = []
498 respawn = []
499 while not self.is_shutdown:
500 with plock:
501 procs = self.procs[:]
502 if self.is_shutdown:
503 break
504
505
506
507 for s in [signal.SIGTERM, signal.SIGINT, signal.SIGHUP]:
508 if signal.getsignal(s) != rl_signal:
509 self.reacquire_signals.add(s)
510
511 for p in procs:
512 try:
513 if not p.is_alive():
514 logger.debug("Process[%s] has died, respawn=%s, required=%s, exit_code=%s",p.name, p.respawn, p.required, p.exit_code)
515 exit_code_str = p.get_exit_description()
516 if p.respawn:
517 printlog_bold("[%s] %s\nrespawning..."%(p.name, exit_code_str))
518 respawn.append(p)
519 elif p.required:
520 printerrlog('='*80+"REQUIRED process [%s] has died!\n%s\nInitiating shutdown!\n"%(p.name, exit_code_str)+'='*80)
521 self.is_shutdown = True
522 else:
523 if p.exit_code:
524 printerrlog("[%s] %s"%(p.name, exit_code_str))
525 else:
526 printlog_bold("[%s] %s"%(p.name, exit_code_str))
527 dead.append(p)
528
529
530
531 for l in self.listeners:
532 l.process_died(p.name, p.exit_code)
533
534 except Exception, e:
535 traceback.print_exc()
536
537 dead.append(p)
538 if self.is_shutdown:
539 break
540 for d in dead:
541 try:
542 self.unregister(d)
543
544 d.stop([])
545
546
547 with plock:
548 self.dead_list.append(DeadProcess(d))
549 except:
550 logger.error(traceback.format_exc())
551
552
553
554 if self._registrations_complete and dead and not self.procs and not respawn:
555 printlog("all processes on machine have died, roslaunch will exit")
556 self.is_shutdown = True
557 del dead[:]
558 for r in respawn:
559 try:
560 if self.is_shutdown:
561 break
562 printlog("[%s] restarting process"%r.name)
563
564 r.stop([])
565 r.start()
566 except:
567 traceback.print_exc()
568 logger.error("Restart failed %s",traceback.format_exc())
569 del respawn[:]
570 time.sleep(0.1)
571
572
573
def _post_run(self):
    """
    Kill every process that is still registered and release the
    monitor's data structures. Invoked once the run loop has finished.

    Non-core processes are killed in parallel by 10 worker threads;
    core processes are killed afterwards, one at a time, in the reverse
    of the order in which they were registered. Error messages
    collected while stopping processes are printed at the end.

    self.done is set even if the teardown itself raises, so that a main
    thread blocked in mainthread_spin() is always released.
    """
    logger.info("ProcessMonitor._post_run %s"%self)

    # normally already set by the time we get here; make sure of it
    self.is_shutdown = True

    shutdown_errors = []
    try:
        q = Queue.Queue()

        with self.plock:
            # work on a copy of core_procs for threadsafe usage
            core_procs = self.core_procs[:]
            logger.info("ProcessMonitor._post_run %s: remaining procs are %s"%(self, self.procs))

            # enqueue all non-core procs in reverse order for parallel kill
            for p in reversed(self.procs):
                if p not in core_procs:
                    q.put(p)

        # use 10 workers
        killers = []
        for i in range(10):
            t = _ProcessKiller(q, i)
            killers.append(t)
            t.start()

        # wait until every enqueued process has been handled
        q.join()

        # accumulate the errors reported by the workers
        for t in killers:
            shutdown_errors.extend(t.errors)
        del killers[:]

        # core procs go last and are killed serially
        for p in reversed(core_procs):
            _kill_process(p, shutdown_errors)

        # delete everything except dead_list
        logger.info("ProcessMonitor exit: cleaning up data structures and signals")
        with self.plock:
            del core_procs[:]
            del self.procs[:]
            del self.core_procs[:]

        reacquire_signals = self.reacquire_signals
        if reacquire_signals:
            reacquire_signals.clear()
        logger.info("ProcessMonitor exit: pmon has shutdown")
    finally:
        # mainthread_spin() loops until this flag is set
        self.done = True

    if shutdown_errors:
        printerrlog("Shutdown errors:\n"+'\n'.join([" * %s"%e for e in shutdown_errors]))
628
630 """
631 Routine to kill Process p with appropriate logging to screen and logfile
632
633 @param p: process to kill
634 @type p: Process
635 @param errors: list of error messages from killed process
636 @type errors: [str]
637 """
638 try:
639 logger.info("ProcessMonitor exit: killing %s", p.name)
640 printlog("[%s] killing on exit"%p.name)
641
642 p.stop(errors)
643 except:
644 traceback.print_exc()
645 logger.error(traceback.format_exc())
646
648
650 Thread.__init__(self, name="ProcessKiller-%s"%i)
651 self.q = q
652 self.errors = []
653
655 q = self.q
656 while not q.empty():
657 try:
658 p = q.get(False)
659 _kill_process(p, self.errors)
660 q.task_done()
661 except Queue.Empty:
662 pass
663