1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 Process monitor
37 """
38
39 from __future__ import with_statement
40
41 import os
42 import sys
43 import time
44 import traceback
45 try:
46 from queue import Empty, Queue
47 except ImportError:
48 from Queue import Empty, Queue
49 import atexit
50 from threading import Thread, RLock, Lock
51
52 from .core import printlog, printlog_bold, printerrlog
53
55
57 """
58 Exception to indicate that a process launch has failed in a fatal
59 manner (i.e. relaunch is unlikely to succeed)
60 """
61 pass
62
63
64
65 _pmons = []
66 _pmon_counter = 0
67 _shutting_down = False
82
def shutdown_process_monitor(process_monitor):
    """
    Ask a process monitor to shut down and wait up to 20 seconds for its
    thread to exit.

    @param process_monitor: process monitor to kill
    @type  process_monitor: L{ProcessMonitor}
    @return: True if process_monitor was successfully shut down. False
    if it could not be shut down cleanly, if it was already shut down,
    or if there is a problem with the process_monitor parameter.
    shutdown_process_monitor() does not throw any exceptions as this
    is shutdown-critical code.
    @rtype: bool
    """
    try:
        if process_monitor is None or process_monitor.is_shutdown:
            return False

        process_monitor.shutdown()
        process_monitor.join(20.0)
        # Thread.isAlive() was removed in Python 3.9. Calling it raised an
        # AttributeError that the handler below swallowed, so a clean
        # shutdown was reported as a failure. is_alive() works on 2.6+ and 3.x.
        return not process_monitor.is_alive()
    except Exception:
        # shutdown-critical path: report failure instead of propagating
        return False
106
107 _shutdown_lock = Lock()
116
117 atexit.register(pmon_shutdown)
118
119
120
122 """
123 Basic process representation for L{ProcessMonitor}. Must be subclassed
124 to provide actual start()/stop() implementations.
125 """
126
def __init__(self, package, name, args, env, respawn=False, required=False):
    """
    Store the launch description of a process and reset its runtime
    bookkeeping.

    @param package: package value, stored as given
    @param name: process name; the monitor uses it to identify the process
    @type  name: str
    @param args: argument value, stored as given
    @param env: environment value, stored as given
    @param respawn: if True, the monitor restarts the process once it is
    found dead
    @type  respawn: bool
    @param required: if True, the monitor shuts itself down when the
    process dies
    @type  required: bool
    """
    # launch description, kept exactly as passed in
    self.package, self.name = package, name
    self.args, self.env = args, env
    self.respawn, self.required = respawn, required

    # runtime state
    self.exit_code = None  # stays None until an exit code is recorded
    self.spawn_count = 0   # number of times the process has been started
    self.lock = Lock()
138
140 return "Process<%s>"%(self.name)
141
142
143
144
def get_info(self):
    """
    Collect the relevant properties of this process into a dictionary.

    The 'exit_code' key is only present once an exit code has been
    recorded (i.e. self.exit_code is not None).

    @return: dictionary of all relevant process properties
    @rtype: dict { str: val }
    """
    snapshot = {}
    for key in ('spawn_count', 'args', 'env', 'package', 'name'):
        snapshot[key] = getattr(self, key)
    snapshot['alive'] = self.is_alive()
    for key in ('respawn', 'required'):
        snapshot[key] = getattr(self, key)

    code = self.exit_code
    if code is not None:
        snapshot['exit_code'] = code
    return snapshot
164
166 self.spawn_count += 1
167
170
def stop(self, errors=None):
    """
    Stop the process. Record any significant error messages in the
    errors parameter.

    This base implementation does nothing; subclasses provide the actual
    stop() behaviour.

    @param errors: error messages. stop() will record messages into this
    list. Defaults to None rather than a shared mutable list, so
    messages from unrelated calls can never accumulate in the default.
    @type  errors: [str]
    """
    pass
179
def get_exit_description(self):
    """
    Describe how the process ended, based on self.exit_code.

    @return: human-readable description: a generic message when no exit
    code is known, a reassuring one for a zero exit code, and one that
    includes the code otherwise
    @rtype: str
    """
    code = self.exit_code
    if code is None:
        return 'process has died'
    if not code:
        # a clean exit is worded so it does not alarm users
        return 'process has finished cleanly'
    return 'process has died [exit code %s]' % code
189
191 """
192 Container class to maintain information about a process that has died. This
193 container allows us to delete the actual Process but still maintain the metadata
194 """
196 super(DeadProcess, self).__init__(p.package, p.name, p.args, p.env, p.respawn)
197 self.exit_code = p.exit_code
198 self.lock = None
199 self.spawn_count = p.spawn_count
200 self.info = p.get_info()
204 raise Exception("cannot call start on a dead process!")
207
class ProcessListener(object):
    """
    Callback interface for objects that want to be told about process
    deaths observed by a L{ProcessMonitor}.
    """

    def process_died(self, process_name, exit_code):
        """
        Called when a monitored process has died. Only deaths seen during
        normal process monitor execution are reported; processes that
        are forcibly killed during ProcessMonitor shutdown are not.

        The default implementation ignores the notification.

        @param process_name: name of process
        @type  process_name: str
        @param exit_code: exit code of process. If None, it means
        that ProcessMonitor was unable to determine an exit code.
        @type  exit_code: int
        """
        return None
226
228
def __init__(self, name="ProcessMonitor"):
    """
    Create an idle process monitor thread. The thread is marked as a
    daemon so it cannot keep the interpreter alive on its own.

    @param name: thread name
    @type  name: str
    """
    Thread.__init__(self, name=name)
    # Thread.setDaemon() is deprecated since Python 3.10; the attribute
    # is equivalent and available on every supported version.
    self.daemon = True

    # processes currently monitored; guarded by plock
    self.procs = []
    # re-entrant, so locked methods may call other locked methods
    self.plock = RLock()

    # set to request the run loop to exit
    self.is_shutdown = False
    # set once post-run cleanup has finished
    self.done = False

    # ProcessListener instances notified when a process dies
    self.listeners = []
    # DeadProcess records for processes removed after dying
    self.dead_list = []
    # core processes: killed after all others, in reverse order of addition
    self.core_procs = []

    # once True, the monitor exits when no processes are left to monitor
    self._registrations_complete = False
242
244 """
245 Listener for process events. MUST be called before
246 ProcessMonitor is running. See ProcessListener class.
247 @param l: listener instance
248 @type l: L{ProcessListener}
249 """
250 self.listeners.append(l)
251
def register(self, p):
    """
    Register process with L{ProcessMonitor}

    @param p: Process
    @type  p: L{Process}
    @raise PmonException: if a process with the same name is already
    registered, or if the process monitor has already been shut down
    """
    problem = None
    with self.plock:
        if self.has_process(p.name):
            problem = "cannot add process with duplicate name '%s'"%p.name
        elif self.is_shutdown:
            problem = "cannot add process [%s] after process monitor has been shut down"%p.name
        else:
            self.procs.append(p)
    # raise only after the lock has been released
    if problem is not None:
        raise PmonException(problem)
269
271 """
272 Register core process with ProcessMonitor. Core processes
273 have special shutdown semantics. They are killed after all
274 other processes, in reverse order in which they are added.
275 @param p Process
276 @type p: L{Process}
277 @raise PmonException: if process with same name is already registered
278 """
279 self.register(p)
280 self.core_procs.append(p)
281
283 """
284 Inform the process monitor that registrations are complete.
285 After the registrations_complete flag is set, process monitor
286 will exit if there are no processes left to monitor.
287 """
288 self._registrations_complete = True
289
def unregister(self, p):
    """
    Remove process p from the list of monitored processes.

    @param p: Process
    @type  p: L{Process}
    @raise ValueError: if p is not in the monitored list
    """
    with self.plock:
        self.procs.remove(p)
293
def has_process(self, name):
    """
    @param name: process name to look for
    @type  name: str
    @return: True if a process with this name is still being monitored.
    If False, the process has died or was never registered with the
    process monitor.
    @rtype: bool
    """
    return any(p.name == name for p in self.procs)
301
def get_process(self, name):
    """
    Look up a monitored process by name. When several processes share
    the name, the one registered first is returned.

    @param name: process name
    @type  name: str
    @return: process registered under name, or None
    @rtype: L{Process}
    """
    with self.plock:
        for candidate in self.procs:
            if candidate.name == name:
                return candidate
    return None
311
def kill_process(self, name):
    """
    Kill process that matches name. NOTE: a killed process will
    continue to show up as active until the process monitor thread
    has caught that it has died.

    @param name: Process name
    @type  name: str
    @return: True if a process named name was found and its stop()
    method was called (even if stop() raised), False if no such
    process is being monitored.
    @rtype: bool
    @raise PmonException: if name is not a string
    """
    # basestring only exists on Python 2. Referencing it unconditionally
    # made every call fail with NameError on Python 3.
    try:
        string_types = basestring
    except NameError:
        string_types = str
    if not isinstance(name, string_types):
        raise PmonException("kill_process takes in a process name but was given: %s"%name)

    printlog("[%s] kill requested"%name)
    with self.plock:
        p = self.get_process(name)
        if not p:
            return False
        try:
            # errors are not reported to the caller, so pass a throwaway list
            p.stop([])
        except Exception as e:
            printerrlog("Exception: %s"%(str(e)))
        return True
338
def shutdown(self):
    """
    Request shutdown of the process monitor thread. This only raises the
    is_shutdown flag; it does not wait for the thread to finish.
    """
    self.is_shutdown = True
344
346 """
347 @return [str]: list of active process names
348 """
349 with self.plock:
350 retval = [p.name for p in self.procs]
351 return retval
352
354 """
355 @return: Two lists, where first
356 list of active process names along with the number of times
357 that process has been spawned. Second list contains dead process names
358 and their spawn count.
359 @rtype: [[(str, int),], [(str,int),]]
360 """
361 with self.plock:
362 actives = [(p.name, p.spawn_count) for p in self.procs]
363 deads = [(p.name, p.spawn_count) for p in self.dead_list]
364 retval = [actives, deads]
365 return retval
366
def run(self):
    """
    Thread routine of the process monitor: runs the monitoring loop,
    then always performs the post-run cleanup.
    """
    try:
        self._run()
    except:
        # deliberately broad: report the failure and still fall through
        # to the cleanup below instead of letting it escape the thread
        traceback.print_exc()
    finally:
        self._post_run()
379
def _run(self):
    """
    Internal run loop of ProcessMonitor.

    Until is_shutdown is set, each pass polls every registered process
    and then:
     - queues dead processes with respawn set for a restart,
     - initiates shutdown when a dead process is marked required,
     - otherwise retires the dead process: unregister, stop(), and
       record a DeadProcess entry in dead_list,
     - notifies all listeners about each death.
    The loop sleeps 0.1s between passes.
    """
    plock = self.plock
    dead = []
    respawn = []
    while not self.is_shutdown:
        with plock:
            # poll a snapshot so the lock is not held during the pass
            procs = self.procs[:]
        if self.is_shutdown:
            break

        for p in procs:
            try:
                if not p.is_alive():
                    exit_code_str = p.get_exit_description()
                    if p.respawn:
                        printlog_bold("[%s] %s\nrespawning..."%(p.name, exit_code_str))
                        respawn.append(p)
                    elif p.required:
                        printerrlog('='*80+"REQUIRED process [%s] has died!\n%s\nInitiating shutdown!\n"%(p.name, exit_code_str)+'='*80)
                        self.is_shutdown = True
                    else:
                        if p.exit_code:
                            printerrlog("[%s] %s"%(p.name, exit_code_str))
                        else:
                            printlog_bold("[%s] %s"%(p.name, exit_code_str))
                        dead.append(p)

                    # listeners is read without the lock; listeners are
                    # expected to be added before the monitor is running
                    for l in self.listeners:
                        l.process_died(p.name, p.exit_code)

            except Exception:
                traceback.print_exc()
                # Internal error: retire the process. It may already be
                # queued (e.g. a listener raised after the append above);
                # adding it twice made the second unregister() fail and
                # log a spurious error.
                if p not in dead:
                    dead.append(p)
            if self.is_shutdown:
                # stop polling
                break

        for d in dead:
            try:
                self.unregister(d)
                # stop process, don't accumulate errors
                d.stop([])
                # keep the metadata of the retired process
                with plock:
                    self.dead_list.append(DeadProcess(d))
            except Exception as e:
                printerrlog("Exception: %s"%(str(e)))

        # requiring a non-empty dead list means the monitor only exits
        # this way on a pass in which it actually retired a process
        if self._registrations_complete and dead and not self.procs and not respawn:
            printlog("all processes on machine have died, roslaunch will exit")
            self.is_shutdown = True
        del dead[:]

        for r in respawn:
            try:
                if self.is_shutdown:
                    break
                printlog("[%s] restarting process"%r.name)
                # stop process, don't accumulate errors
                r.stop([])
                r.start()
            except:
                traceback.print_exc()
        del respawn[:]

        # yield thread
        time.sleep(0.1)
451
452
453
def _post_run(self):
    """
    Cleanup performed when the monitor thread leaves its run loop.

    Stops every remaining non-core process using 10 worker threads, then
    stops the core processes one at a time in reverse order of addition,
    clears the process lists (dead_list is kept), sets self.done and
    reports any errors collected while stopping.
    """
    # make sure the flag is set even if the run loop exited through an error
    self.is_shutdown = True

    kill_queue = Queue()
    with self.plock:
        # copy so core_procs can be used after the lock is released
        core_procs = self.core_procs[:]
        # queue all non-core processes, most recently registered first
        for p in reversed(self.procs):
            if p not in core_procs:
                kill_queue.put(p)

    # drain the queue with 10 workers
    killers = []
    for i in range(10):
        t = _ProcessKiller(kill_queue, i)
        killers.append(t)
        t.start()

    # wait until every queued process has been handled
    kill_queue.join()

    # accumulate all the shutdown errors
    shutdown_errors = []
    for t in killers:
        shutdown_errors.extend(t.errors)
    del killers[:]

    # core processes go last and strictly one after another
    for p in reversed(core_procs):
        _kill_process(p, shutdown_errors)

    # delete everything except dead_list
    with self.plock:
        del core_procs[:]
        del self.procs[:]
        del self.core_procs[:]

    self.done = True

    if shutdown_errors:
        printerrlog("Shutdown errors:\n"+'\n'.join([" * %s"%e for e in shutdown_errors]))
501
def _kill_process(p, errors):
    """
    Stop Process p, logging the kill request and any exception that
    stop() raises. Exceptions are logged, never propagated.

    @param p: process to kill
    @type  p: Process
    @param errors: list of error messages from killed process; handed to
    p.stop() so that they can be reported together later
    @type  errors: [str]
    """
    try:
        printlog("[%s] killing on exit"%p.name)
        p.stop(errors)
    except Exception as exc:
        printerrlog("Exception: %s"%(str(exc)))
517
519
class _ProcessKiller(Thread):
    """
    Worker thread that takes processes off a shared queue and kills
    them, collecting their error messages in self.errors.
    """

    def __init__(self, q, i):
        """
        @param q: queue of processes to kill
        @type  q: Queue
        @param i: worker index, used only in the thread name
        @type  i: int
        """
        Thread.__init__(self, name="ProcessKiller-%s"%i)
        self.q = q
        self.errors = []

    def run(self):
        """
        Kill queued processes until the queue is empty, marking each
        queue item as done after it has been handled.
        """
        pending = self.q
        while True:
            try:
                proc = pending.get(False)
            except Empty:
                # nothing left to do
                return
            _kill_process(proc, self.errors)
            pending.task_done()
534