1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 Local process implementation for running and monitoring nodes.
37 """
38
39 import os
40 import signal
41 import subprocess
42 import time
43 import traceback
44
45 import rospkg
46
47 from roslaunch.core import *
48
49 from roslaunch.node_args import create_local_process_args
50 from roslaunch.pmon import Process, FatalProcessLaunch
51
52 from rosmaster.master_api import NUM_WORKERS
53
54 import logging
55 _logger = logging.getLogger("roslaunch")
56
57 _TIMEOUT_SIGINT = 15.0
58 _TIMEOUT_SIGTERM = 2.0
59
60 _counter = 0
65
67 """
68 Launch a master
69 @param type_: name of master executable (currently just Master.ZENMASTER)
70 @type type_: str
71 @param ros_root: ROS_ROOT environment setting
72 @type ros_root: str
73 @param port: port to launch master on
74 @type port: int
75 @param num_workers: number of worker threads.
76 @type num_workers: int
77 @param timeout: socket timeout for connections.
78 @type timeout: float
79 @raise RLException: if type_ or port is invalid
80 """
81 if port < 1 or port > 65535:
82 raise RLException("invalid port assignment: %s"%port)
83
84 _logger.info("create_master_process: %s, %s, %s, %s, %s", type_, ros_root, port, num_workers, timeout)
85
86 master = type_
87
88 if type_ in [Master.ROSMASTER, Master.ZENMASTER]:
89 package = 'rosmaster'
90 args = [master, '--core', '-p', str(port), '-w', str(num_workers)]
91 if timeout is not None:
92 args += ['-t', str(timeout)]
93 else:
94 raise RLException("unknown master typ_: %s"%type_)
95
96 _logger.info("process[master]: launching with args [%s]"%args)
97 log_output = False
98 return LocalProcess(run_id, package, 'master', args, os.environ, log_output, None)
99
def create_node_process(run_id, node, master_uri):
    """
    Factory for generating processes for launching local ROS nodes.

    Builds the environment, a process name and the command-line
    arguments for C{node} and wraps them in a L{LocalProcess}. This
    function only constructs and returns the wrapper; it does not
    start it.

    @param run_id: run_id of launch
    @type  run_id: str
    @param node: node to launch. Node name must be assigned.
    @type  node: L{Node}
    @param master_uri: API URI for master node
    @type  master_uri: str
    @return: local process instance
    @rtype: L{LocalProcess}
    @raise RLException: if no machine has been selected for the node
    @raise ValueError: if the node has no name
    @raise NodeParamsException: If the node's parameters are improperly
        specified (not raised directly here; presumably comes from the
        helper calls -- confirm against setup_env /
        create_local_process_args)
    """
    _logger.info("create_node_process: package[%s] type[%s] machine[%s] master_uri[%s]", node.package, node.type, node.machine, master_uri)

    # validate the node before doing any work on its behalf
    machine = node.machine
    if machine is None:
        raise RLException("Internal error: no machine selected for node of type [%s/%s]"%(node.package, node.type))
    if not node.name:
        raise ValueError("node name must be assigned")

    env = setup_env(node, machine, master_uri)

    # The process name is the namespaced node name plus a suffix from
    # _next_counter() (presumably there to keep names unique -- confirm),
    # with any leading '/' dropped.
    name = "%s-%s"%(rosgraph.names.ns_join(node.namespace, node.name), _next_counter())
    if name.startswith('/'):
        name = name[1:]

    _logger.info('process[%s]: env[%s]', name, env)

    args = create_local_process_args(node, machine)

    _logger.info('process[%s]: args[%s]', name, args)

    # log_output is True for every output mode except 'screen'
    log_output = node.output != 'screen'
    _logger.debug('process[%s]: returning LocalProcess wrapper', name)
    return LocalProcess(run_id, node.package, name, args, env, log_output,
                        respawn=node.respawn, respawn_delay=node.respawn_delay,
                        required=node.required, cwd=node.cwd)
149
150
152 """
153 Process launched on local machine
154 """
155
def __init__(self, run_id, package, name, args, env, log_output,
             respawn=False, respawn_delay=0.0, required=False, cwd=None,
             is_node=True):
    """
    Set up the wrapper; no OS process is created here.

    @param run_id: unique run ID for this roslaunch. Used to
      generate log directory location. run_id may be None if this
      feature is not being used.
    @type  run_id: str
    @param package: name of package process is part of
    @type  package: str
    @param name: name of process
    @type  name: str
    @param args: list of arguments to process
    @type  args: [str]
    @param env: environment dictionary for process
    @type  env: {str : str}
    @param log_output: if True, log output streams of process
    @type  log_output: bool
    @param respawn: respawn process if it dies (default is False)
    @type  respawn: bool
    @param respawn_delay: respawn process after a delay
    @type  respawn_delay: float
    @param required: forwarded unchanged to the base-class constructor
      (default is False)
    @type  required: bool
    @param cwd: working directory of process, or None
    @type  cwd: str
    @param is_node: (optional) if True, process is ROS node and
      accepts ROS node command-line arguments. Default: True
    @type  is_node: bool
    """
    base = super(LocalProcess, self)
    base.__init__(package, name, args, env, respawn, respawn_delay, required)

    # configuration handed in by the caller
    self.run_id, self.log_output = run_id, log_output
    self.cwd, self.is_node = cwd, is_node

    # runtime state, filled in once the OS process is launched
    self.popen = None
    self.log_dir = None
    self.pid = -1
    self.started = self.stopped = False
194
195
def get_info(self):
    """
    Get all data about this process in dictionary form.

    Starts from the base-class dictionary and adds the fields kept by
    this class: 'pid' and 'log_output' always, 'run_id' only when it is
    truthy and 'cwd' only when it is not None.

    @return: dictionary describing this process
    @rtype: dict
    """
    info = super(LocalProcess, self).get_info()
    info['pid'] = self.pid
    if self.run_id:
        info['run_id'] = self.run_id
    info['log_output'] = self.log_output
    if self.cwd is not None:
        info['cwd'] = self.cwd
    return info
208
255
def start(self):
    """
    Start the process.

    @return: True if the child is running (or has already exited with
        status 0) right after launch, False if it exited immediately
        with a non-zero status
    @rtype: bool
    @raise FatalProcessLaunch: if process cannot be started and it
      is not likely to ever succeed
    """
    # local import: errno is not among this file's module-level imports
    import errno

    super(LocalProcess, self).start()
    # Acquire before entering the try so that the finally clause never
    # releases a lock that was not obtained.
    self.lock.acquire()
    try:
        if self.started:
            _logger.info("process[%s]: restarting os process", self.name)
        else:
            _logger.info("process[%s]: starting os process", self.name)
        self.started = self.stopped = False

        try:
            logfileout, logfileerr = self._configure_logging()
        except Exception as e:
            _logger.error(traceback.format_exc())
            printerrlog("[%s] ERROR: unable to configure logging [%s]"%(self.name, str(e)))
            # logging could not be set up: fall back to pipes
            logfileout, logfileerr = subprocess.PIPE, subprocess.PIPE

        # translate the symbolic cwd setting into a directory
        if self.cwd == 'node':
            cwd = os.path.dirname(self.args[0])
        elif self.cwd == 'cwd':
            cwd = os.getcwd()
        elif self.cwd == 'ros-root':
            cwd = get_ros_root()
        else:
            cwd = rospkg.get_ros_home()

        _logger.info("process[%s]: start w/ args [%s]", self.name, self.args)
        _logger.info("process[%s]: cwd will be [%s]", self.name, cwd)

        # os.setsid exists only on POSIX, and subprocess rejects
        # preexec_fn on Windows. Where it is available the child gets its
        # own session, exactly as before; elsewhere fall back to the
        # portable defaults.
        preexec_function = getattr(os, 'setsid', None)
        close_file_descriptor = preexec_function is not None

        try:
            self.popen = subprocess.Popen(self.args, cwd=cwd, stdout=logfileout, stderr=logfileerr, env=self.env, close_fds=close_file_descriptor, preexec_fn=preexec_function)
        except OSError as e:
            # the liveness check in this file reports a never-started
            # process as alive, so mark it started even though launch failed
            self.started = True
            _logger.error("OSError(%d, %s)", e.errno, e.strerror)
            if e.errno == errno.ENOEXEC:
                raise FatalProcessLaunch("Unable to launch [%s]. \nIf it is a script, you may be missing a '#!' declaration at the top."%self.name)
            elif e.errno == errno.ENOENT:
                raise FatalProcessLaunch("""Roslaunch got a '%s' error while attempting to run:

%s

Please make sure that all the executables in this command exist and have
executable permission. This is often caused by a bad launch-prefix."""%(e.strerror, ' '.join(self.args)))
            else:
                raise FatalProcessLaunch("unable to launch [%s]: %s"%(' '.join(self.args), e.strerror))

        self.started = True

        # None means still running; 0 means it already finished cleanly
        poll_result = self.popen.poll()
        if poll_result is None or poll_result == 0:
            self.pid = self.popen.pid
            printlog_bold("process[%s]: started with pid [%s]"%(self.name, self.pid))
            return True
        else:
            printerrlog("failed to start local process: %s"%(' '.join(self.args)))
            return False
    finally:
        self.lock.release()
328
330 return self.name.replace('/', '-')
331
333 """
334 @return: True if process is still running
335 @rtype: bool
336 """
337 if not self.started:
338 return True
339 if self.stopped or self.popen is None:
340 if self.time_of_death is None:
341 self.time_of_death = time.time()
342 return False
343 self.exit_code = self.popen.poll()
344 if self.exit_code is not None:
345 if self.time_of_death is None:
346 self.time_of_death = time.time()
347 return False
348 return True
349
351 """
352 @return: human-readable description of exit state
353 @rtype: str
354 """
355 if self.exit_code is None:
356 output = 'process has died without exit code [pid %s, cmd %s].'%(self.pid, ' '.join(self.args))
357 elif self.exit_code != 0:
358 output = 'process has died [pid %s, exit code %s, cmd %s].'%(self.pid, self.exit_code, ' '.join(self.args))
359 else:
360 output = 'process has finished cleanly'
361
362 if self.log_dir:
363
364 output += '\nlog file: %s*.log'%(os.path.join(self.log_dir, self._log_name()))
365 return output
366
def _stop_unix(self, errors):
    """
    UNIX implementation of process killing.

    Signals the child's process group with SIGINT, then SIGTERM, then
    SIGKILL, waiting up to _TIMEOUT_SIGINT / _TIMEOUT_SIGTERM seconds
    after the first two before escalating. self.popen is always reset
    to None once signalling has been attempted.

    @param errors: error messages. stop() will record messages into this list.
    @type  errors: [str]
    """
    # local import: errno is not among this file's module-level imports
    import errno

    self.exit_code = self.popen.poll()
    if self.exit_code is not None:
        _logger.debug("process[%s].stop(): process has already returned %s", self.name, self.exit_code)
        self.popen = None
        self.stopped = True
        return

    child = self.popen
    pid = child.pid
    pgid = os.getpgid(pid)
    _logger.info("process[%s]: killing os process with pid[%s] pgid[%s]", self.name, pid, pgid)

    def _poll_until(deadline, interval, trace=False):
        # Poll the child until it exits or the deadline passes; return
        # its exit code, or None if it is still running.
        retcode = child.poll()
        while time.time() < deadline and retcode is None:
            time.sleep(interval)
            if trace:
                _logger.debug('poll for retcode')
            retcode = child.poll()
        return retcode

    try:
        # Stage 1: SIGINT
        _logger.info("[%s] sending SIGINT to pgid [%s]", self.name, pgid)
        os.killpg(pgid, signal.SIGINT)
        _logger.info("[%s] sent SIGINT to pgid [%s]", self.name, pgid)
        retcode = _poll_until(time.time() + _TIMEOUT_SIGINT, 0.1)
        if retcode is not None:
            _logger.info("process[%s]: SIGINT killed with return value %s", self.name, retcode)
            return

        # Stage 2: SIGTERM
        printerrlog("[%s] escalating to SIGTERM"%self.name)
        deadline = time.time() + _TIMEOUT_SIGTERM
        os.killpg(pgid, signal.SIGTERM)
        _logger.info("[%s] sent SIGTERM to pgid [%s]", self.name, pgid)
        retcode = _poll_until(deadline, 0.2, trace=True)
        if retcode is not None:
            _logger.info("process[%s]: SIGTERM killed with return value %s", self.name, retcode)
            return

        # Stage 3: SIGKILL. The outcome is not awaited, so the caller is
        # told that the process may still be running.
        printerrlog("[%s] escalating to SIGKILL"%self.name)
        errors.append("process[%s, pid %s]: required SIGKILL. May still be running."%(self.name, pid))
        try:
            os.killpg(pgid, signal.SIGKILL)
            _logger.info("[%s] sent SIGKILL to pgid [%s]", self.name, pgid)
            _logger.info("process[%s]: sent SIGKILL", self.name)
        except OSError as e:
            if e.errno == errno.ESRCH:
                printerrlog("no [%s] process with pid [%s]"%(self.name, pid))
            else:
                printerrlog("errors shutting down [%s], see log for details"%self.name)
                _logger.error(traceback.format_exc())
    finally:
        self.popen = None
430
def _stop_win32(self, errors):
    """
    Win32 implementation of process killing. In part, refer to

      http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/347462

    Note that it doesn't work as completely as _stop_unix as it can't utilise
    group id's. This means that any program which forks children underneath it
    won't get caught by this kill mechanism.

    Escalation goes through Popen.terminate() and Popen.kill() rather
    than os.killpg / signal.SIGKILL, which are documented as Unix-only.

    @param errors: error messages. stop() will record messages into this list.
    @type  errors: [str]
    """
    # local import: errno is not among this file's module-level imports
    import errno

    self.exit_code = self.popen.poll()
    if self.exit_code is not None:
        _logger.debug("process[%s].stop(): process has already returned %s", self.name, self.exit_code)
        self.popen = None
        self.stopped = True
        return

    child = self.popen
    pid = child.pid
    _logger.info("process[%s]: killing os process/subprocesses with pid[%s]", self.name, pid)

    def _poll_until(deadline, interval, trace=False):
        # Poll the child until it exits or the deadline passes; return
        # its exit code, or None if it is still running.
        retcode = child.poll()
        while time.time() < deadline and retcode is None:
            time.sleep(interval)
            if trace:
                _logger.debug('poll for retcode')
            retcode = child.poll()
        return retcode

    try:
        # Stage 1: SIGINT, sent to the single pid
        _logger.info("[%s] sending SIGINT to pgid [%s]", self.name, pid)
        os.kill(pid, signal.SIGINT)
        _logger.info("[%s] sent SIGINT to pgid [%s]", self.name, pid)
        retcode = _poll_until(time.time() + _TIMEOUT_SIGINT, 0.1)
        if retcode is not None:
            _logger.info("process[%s]: SIGINT killed with return value %s", self.name, retcode)
            return

        # Stage 2: terminate. os.killpg is not available on Windows.
        printerrlog("[%s] escalating to SIGTERM"%self.name)
        deadline = time.time() + _TIMEOUT_SIGTERM
        child.terminate()
        _logger.info("[%s] sent SIGTERM to pid [%s]", self.name, pid)
        retcode = _poll_until(deadline, 0.2, trace=True)
        if retcode is not None:
            _logger.info("process[%s]: SIGTERM killed with return value %s", self.name, retcode)
            return

        # Stage 3: kill. signal.SIGKILL is not defined on Windows.
        printerrlog("[%s] escalating to SIGKILL"%self.name)
        errors.append("process[%s, pid %s]: required SIGKILL. May still be running."%(self.name, pid))
        try:
            child.kill()
            _logger.info("[%s] sent SIGKILL to pid [%s]", self.name, pid)
            _logger.info("process[%s]: sent SIGKILL", self.name)
        except OSError as e:
            if e.errno == errno.ESRCH:
                printerrlog("no [%s] process with pid [%s]"%(self.name, pid))
            else:
                printerrlog("errors shutting down [%s], see log for details"%self.name)
                _logger.error(traceback.format_exc())
    finally:
        self.popen = None
497
def stop(self, errors=None):
    """
    Stop the process. Record any significant error messages in the errors parameter

    Exceptions raised while killing the child are logged and not
    re-raised. The process is marked stopped and the lock released on
    every path.

    @param errors: error messages. stop() will record messages into this list.
        If None, a fresh list is used, so the caller does not see the messages.
    @type  errors: [str]
    """
    # local import: sys is not among this file's explicit imports
    import sys

    if errors is None:
        errors = []
    super(LocalProcess, self).stop(errors)
    self.lock.acquire()
    try:
        try:
            _logger.debug("process[%s].stop() starting", self.name)
            if self.popen is None:
                _logger.debug("process[%s].stop(): popen is None, nothing to kill", self.name)
                return
            if sys.platform in ['win32']:
                self._stop_win32(errors)
            else:
                self._stop_unix(errors)
        except:  # noqa: E722
            # kept as broad as the original: any failure while killing is
            # logged and stop() still completes its bookkeeping below
            _logger.error("[%s] EXCEPTION %s", self.name, traceback.format_exc())
    finally:
        self.stopped = True
        self.lock.release()
525
526
527
529 """
530 Remove all instances of args that start with prefix. This is used
531 to remove args that were previously added (and are now being
532 regenerated due to respawning)
533 """
534 existing_args = [a for a in args if a.startswith(prefix)]
535 for a in existing_args:
536 args.remove(a)
537 return args
538