1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 Local process implementation for running and monitoring nodes.
37 """
38
39 import errno
40 import os
41 import signal
42 import subprocess
43 import time
44 import traceback
45
46 import rospkg
47
48 from roslaunch.core import *
49
50 from roslaunch.node_args import create_local_process_args
51 from roslaunch.pmon import Process, FatalProcessLaunch
52
53 from rosmaster.master_api import NUM_WORKERS
54
55 import logging
56 _logger = logging.getLogger("roslaunch")
57
58 DEFAULT_TIMEOUT_SIGINT = 15.0
59 DEFAULT_TIMEOUT_SIGTERM = 2.0
60
61 _counter = 0
66
68 """
69 Launch a master
70 @param type_: name of master executable (currently just Master.ZENMASTER)
71 @type type_: str
72 @param ros_root: ROS_ROOT environment setting
73 @type ros_root: str
74 @param port: port to launch master on
75 @type port: int
76 @param num_workers: number of worker threads.
77 @type num_workers: int
78 @param timeout: socket timeout for connections.
79 @type timeout: float
80 @param master_logger_level: rosmaster.master logger debug level
81 @type master_logger_level=: str or False
82 @param sigint_timeout: The SIGINT timeout used when killing nodes (in seconds).
83 @type sigint_timeout: float
84 @param sigterm_timeout: The SIGTERM timeout used when killing nodes if SIGINT does not stop the node (in seconds).
85 @type sigterm_timeout: float
86 @raise RLException: if type_ or port is invalid or sigint_timeout or sigterm_timeout are nonpositive.
87 """
88 if port < 1 or port > 65535:
89 raise RLException("invalid port assignment: %s"%port)
90
91 _logger.info("create_master_process: %s, %s, %s, %s, %s, %s", type_, ros_root, port, num_workers, timeout, master_logger_level)
92
93 master = type_
94
95 if type_ in [Master.ROSMASTER, Master.ZENMASTER]:
96 package = 'rosmaster'
97 args = [master, '--core', '-p', str(port), '-w', str(num_workers)]
98 if timeout is not None:
99 args += ['-t', str(timeout)]
100 if master_logger_level:
101 args += ['--master-logger-level', str(master_logger_level)]
102 else:
103 raise RLException("unknown master typ_: %s"%type_)
104
105 _logger.info("process[master]: launching with args [%s]"%args)
106 log_output = False
107 return LocalProcess(run_id, package, 'master', args, os.environ, log_output, None, required=True,
108 sigint_timeout=sigint_timeout, sigterm_timeout=sigterm_timeout)
109
111 """
112 Factory for generating processes for launching local ROS
113 nodes. Also registers the process with the L{ProcessMonitor} so that
114 events can be generated when the process dies.
115
116 @param run_id: run_id of launch
117 @type run_id: str
118 @param node: node to launch. Node name must be assigned.
119 @type node: L{Node}
120 @param master_uri: API URI for master node
121 @type master_uri: str
122 @param sigint_timeout: The SIGINT timeout used when killing nodes (in seconds).
123 @type sigint_timeout: float
124 @param sigterm_timeout: The SIGTERM timeout used when killing nodes if SIGINT does not stop the node (in seconds).
125 @type sigterm_timeout: float
126 @return: local process instance
127 @rtype: L{LocalProcess}
128 @raise NodeParamsException: If the node's parameters are improperly specific
129 @raise RLException: If sigint_timeout or sigterm_timeout are nonpositive.
130 """
131 _logger.info("create_node_process: package[%s] type[%s] machine[%s] master_uri[%s]", node.package, node.type, node.machine, master_uri)
132
133 machine = node.machine
134 if machine is None:
135 raise RLException("Internal error: no machine selected for node of type [%s/%s]"%(node.package, node.type))
136 if not node.name:
137 raise ValueError("node name must be assigned")
138
139
140 env = setup_env(node, machine, master_uri)
141
142 if not node.name:
143 raise ValueError("node name must be assigned")
144
145
146
147
148 name = "%s-%s"%(rosgraph.names.ns_join(node.namespace, node.name), _next_counter())
149 if name[0] == '/':
150 name = name[1:]
151
152 _logger.info('process[%s]: env[%s]', name, env)
153
154 args = create_local_process_args(node, machine)
155
156 _logger.info('process[%s]: args[%s]', name, args)
157
158
159 log_output = node.output != 'screen'
160 _logger.debug('process[%s]: returning LocalProcess wrapper')
161 return LocalProcess(run_id, node.package, name, args, env, log_output, \
162 respawn=node.respawn, respawn_delay=node.respawn_delay, \
163 required=node.required, cwd=node.cwd, \
164 sigint_timeout=sigint_timeout, sigterm_timeout=sigterm_timeout)
165
166
168 """
169 Process launched on local machine
170 """
171
172 - def __init__(self, run_id, package, name, args, env, log_output,
173 respawn=False, respawn_delay=0.0, required=False, cwd=None,
174 is_node=True, sigint_timeout=DEFAULT_TIMEOUT_SIGINT, sigterm_timeout=DEFAULT_TIMEOUT_SIGTERM):
175 """
176 @param run_id: unique run ID for this roslaunch. Used to
177 generate log directory location. run_id may be None if this
178 feature is not being used.
179 @type run_id: str
180 @param package: name of package process is part of
181 @type package: str
182 @param name: name of process
183 @type name: str
184 @param args: list of arguments to process
185 @type args: [str]
186 @param env: environment dictionary for process
187 @type env: {str : str}
188 @param log_output: if True, log output streams of process
189 @type log_output: bool
190 @param respawn: respawn process if it dies (default is False)
191 @type respawn: bool
192 @param respawn_delay: respawn process after a delay
193 @type respawn_delay: float
194 @param cwd: working directory of process, or None
195 @type cwd: str
196 @param is_node: (optional) if True, process is ROS node and accepts ROS node command-line arguments. Default: True
197 @type is_node: False
198 @param sigint_timeout: The SIGINT timeout used when killing nodes (in seconds).
199 @type sigint_timeout: float
200 @param sigterm_timeout: The SIGTERM timeout used when killing nodes if SIGINT does not stop the node (in seconds).
201 @type sigterm_timeout: float
202 @raise RLException: If sigint_timeout or sigterm_timeout are nonpositive.
203 """
204 super(LocalProcess, self).__init__(package, name, args, env,
205 respawn, respawn_delay, required)
206
207 if sigint_timeout <= 0:
208 raise RLException("sigint_timeout must be a positive number, received %f" % sigint_timeout)
209 if sigterm_timeout <= 0:
210 raise RLException("sigterm_timeout must be a positive number, received %f" % sigterm_timeout)
211
212 self.run_id = run_id
213 self.popen = None
214 self.log_output = log_output
215 self.started = False
216 self.stopped = False
217 self.cwd = cwd
218 self.log_dir = None
219 self.pid = -1
220 self.is_node = is_node
221 self.sigint_timeout = sigint_timeout
222 self.sigterm_timeout = sigterm_timeout
223
224
226 """
227 Get all data about this process in dictionary form
228 """
229 info = super(LocalProcess, self).get_info()
230 info['pid'] = self.pid
231 if self.run_id:
232 info['run_id'] = self.run_id
233 info['log_output'] = self.log_output
234 if self.cwd is not None:
235 info['cwd'] = self.cwd
236 return info
237
284
286 """
287 Start the process.
288
289 @raise FatalProcessLaunch: if process cannot be started and it
290 is not likely to ever succeed
291 """
292 super(LocalProcess, self).start()
293 try:
294 self.lock.acquire()
295 if self.started:
296 _logger.info("process[%s]: restarting os process", self.name)
297 else:
298 _logger.info("process[%s]: starting os process", self.name)
299 self.started = self.stopped = False
300
301 full_env = self.env
302
303
304 try:
305 logfileout, logfileerr = self._configure_logging()
306 except Exception as e:
307 _logger.error(traceback.format_exc())
308 printerrlog("[%s] ERROR: unable to configure logging [%s]"%(self.name, str(e)))
309
310
311
312 logfileout, logfileerr = subprocess.PIPE, subprocess.PIPE
313
314 if self.cwd == 'node':
315 cwd = os.path.dirname(self.args[0])
316 elif self.cwd == 'cwd':
317 cwd = os.getcwd()
318 elif self.cwd == 'ros-root':
319 cwd = get_ros_root()
320 else:
321 cwd = rospkg.get_ros_home()
322 if not os.path.exists(cwd):
323 try:
324 os.makedirs(cwd)
325 except OSError:
326
327 pass
328
329 _logger.info("process[%s]: start w/ args [%s]", self.name, self.args)
330 _logger.info("process[%s]: cwd will be [%s]", self.name, cwd)
331
332 try:
333 preexec_function = os.setsid
334 close_file_descriptor = True
335 except AttributeError:
336 preexec_function = None
337 close_file_descriptor = False
338
339 try:
340 self.popen = subprocess.Popen(self.args, cwd=cwd, stdout=logfileout, stderr=logfileerr, env=full_env, close_fds=close_file_descriptor, preexec_fn=preexec_function)
341 except OSError as e:
342 self.started = True
343 _logger.error("OSError(%d, %s)", e.errno, e.strerror)
344 if e.errno == errno.ENOEXEC:
345 raise FatalProcessLaunch("Unable to launch [%s]. \nIf it is a script, you may be missing a '#!' declaration at the top."%self.name)
346 elif e.errno == errno.ENOENT:
347 raise FatalProcessLaunch("""Roslaunch got a '%s' error while attempting to run:
348
349 %s
350
351 Please make sure that all the executables in this command exist and have
352 executable permission. This is often caused by a bad launch-prefix."""%(e.strerror, ' '.join(self.args)))
353 else:
354 raise FatalProcessLaunch("unable to launch [%s]: %s"%(' '.join(self.args), e.strerror))
355
356 self.started = True
357
358
359
360 poll_result = self.popen.poll()
361 if poll_result is None or poll_result == 0:
362 self.pid = self.popen.pid
363 printlog_bold("process[%s]: started with pid [%s]"%(self.name, self.pid))
364 return True
365 else:
366 printerrlog("failed to start local process: %s"%(' '.join(self.args)))
367 return False
368 finally:
369 self.lock.release()
370
372 return self.name.replace('/', '-')
373
375 """
376 @return: True if process is still running
377 @rtype: bool
378 """
379 if not self.started:
380 return True
381 if self.stopped or self.popen is None:
382 if self.time_of_death is None:
383 self.time_of_death = time.time()
384 return False
385 self.exit_code = self.popen.poll()
386 if self.exit_code is not None:
387 if self.time_of_death is None:
388 self.time_of_death = time.time()
389 return False
390 return True
391
393 """
394 @return: human-readable description of exit state
395 @rtype: str
396 """
397 if self.exit_code is None:
398 output = 'process has died without exit code [pid %s, cmd %s].'%(self.pid, ' '.join(self.args))
399 elif self.exit_code != 0:
400 output = 'process has died [pid %s, exit code %s, cmd %s].'%(self.pid, self.exit_code, ' '.join(self.args))
401 else:
402 output = 'process has finished cleanly'
403
404 if self.log_dir:
405
406 output += '\nlog file: %s*.log'%(os.path.join(self.log_dir, self._log_name()))
407 return output
408
410 """
411 UNIX implementation of process killing
412
413 @param errors: error messages. stop() will record messages into this list.
414 @type errors: [str]
415 """
416 self.exit_code = self.popen.poll()
417 if self.exit_code is not None:
418 _logger.debug("process[%s].stop(): process has already returned %s", self.name, self.exit_code)
419
420 self.popen = None
421 self.stopped = True
422 return
423
424 pid = self.popen.pid
425 pgid = os.getpgid(pid)
426 _logger.info("process[%s]: killing os process with pid[%s] pgid[%s]", self.name, pid, pgid)
427
428 try:
429
430 _logger.info("[%s] sending SIGINT to pgid [%s]", self.name, pgid)
431 os.killpg(pgid, signal.SIGINT)
432 _logger.info("[%s] sent SIGINT to pgid [%s]", self.name, pgid)
433 timeout_t = time.time() + self.sigint_timeout
434 retcode = self.popen.poll()
435 while time.time() < timeout_t and retcode is None:
436 time.sleep(0.1)
437 retcode = self.popen.poll()
438
439 if retcode is None:
440 printerrlog("[%s] escalating to SIGTERM"%self.name)
441 timeout_t = time.time() + self.sigterm_timeout
442 os.killpg(pgid, signal.SIGTERM)
443 _logger.info("[%s] sent SIGTERM to pgid [%s]"%(self.name, pgid))
444 retcode = self.popen.poll()
445 while time.time() < timeout_t and retcode is None:
446 time.sleep(0.2)
447 _logger.debug('poll for retcode')
448 retcode = self.popen.poll()
449 if retcode is None:
450 printerrlog("[%s] escalating to SIGKILL"%self.name)
451 errors.append("process[%s, pid %s]: required SIGKILL. May still be running."%(self.name, pid))
452 try:
453 os.killpg(pgid, signal.SIGKILL)
454 _logger.info("[%s] sent SIGKILL to pgid [%s]"%(self.name, pgid))
455
456
457
458 _logger.info("process[%s]: sent SIGKILL", self.name)
459 except OSError as e:
460 if e.args[0] == 3:
461 printerrlog("no [%s] process with pid [%s]"%(self.name, pid))
462 else:
463 printerrlog("errors shutting down [%s], see log for details"%self.name)
464 _logger.error(traceback.format_exc())
465 else:
466 _logger.info("process[%s]: SIGTERM killed with return value %s", self.name, retcode)
467 else:
468 _logger.info("process[%s]: SIGINT killed with return value %s", self.name, retcode)
469
470 finally:
471 self.popen = None
472
474 """
475 Win32 implementation of process killing. In part, refer to
476
477 http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/347462
478
479 Note that it doesn't work as completely as _stop_unix as it can't utilise
480 group id's. This means that any program which forks children underneath it
481 won't get caught by this kill mechanism.
482
483 @param errors: error messages. stop() will record messages into this list.
484 @type errors: [str]
485 """
486 self.exit_code = self.popen.poll()
487 if self.exit_code is not None:
488 _logger.debug("process[%s].stop(): process has already returned %s", self.name, self.exit_code)
489 self.popen = None
490 self.stopped = True
491 return
492
493 pid = self.popen.pid
494 _logger.info("process[%s]: killing os process/subprocesses with pid[%s]", self.name, pid)
495
496 try:
497
498 _logger.info("[%s] running taskkill pid tree [%s]", self.name, pid)
499 subprocess.call(['taskkill', '/F', '/T', '/PID', str(pid)])
500 _logger.info("[%s] run taskkill pid tree [%s]", self.name, pid)
501 timeout_t = time.time() + self.sigint_timeout
502 retcode = self.popen.poll()
503 while time.time() < timeout_t and retcode is None:
504 time.sleep(0.1)
505 retcode = self.popen.poll()
506 if retcode is None:
507 printerrlog("errors shutting down [%s], see log for details"%self.name)
508 _logger.error("errors shutting down [%s], see log for details"%self.name)
509 else:
510 _logger.info("process[%s]: SIGINT killed with return value %s", self.name, retcode)
511 finally:
512 self.popen = None
513
514 - def stop(self, errors=None):
515 """
516 Stop the process. Record any significant error messages in the errors parameter
517
518 @param errors: error messages. stop() will record messages into this list.
519 @type errors: [str]
520 """
521 if errors is None:
522 errors = []
523 super(LocalProcess, self).stop(errors)
524 self.lock.acquire()
525 try:
526 try:
527 _logger.debug("process[%s].stop() starting", self.name)
528 if self.popen is None:
529 _logger.debug("process[%s].stop(): popen is None, nothing to kill")
530 return
531 if sys.platform in ['win32']:
532 self._stop_win32(errors)
533 else:
534 self._stop_unix(errors)
535 except:
536
537 _logger.error("[%s] EXCEPTION %s", self.name, traceback.format_exc())
538 finally:
539 self.stopped = True
540 self.lock.release()
541
542
543
545 """
546 Remove all instances of args that start with prefix. This is used
547 to remove args that were previously added (and are now being
548 regenerated due to respawning)
549 """
550 existing_args = [a for a in args if a.startswith(prefix)]
551 for a in existing_args:
552 args.remove(a)
553 return args
554