1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 Local process implementation for running and monitoring nodes.
37 """
38
import errno
import logging
import os
import signal
import subprocess
import sys
import time
import traceback

import rospkg

from roslaunch.core import *

from roslaunch.node_args import create_local_process_args
from roslaunch.pmon import Process, FatalProcessLaunch

from rosmaster.master_api import NUM_WORKERS
# Shared "roslaunch" logger used throughout this module.
_logger = logging.getLogger("roslaunch")

# Seconds to wait for a process to exit after SIGINT before escalating to SIGTERM.
_TIMEOUT_SIGINT = 15.0
# Seconds to wait for a process to exit after SIGTERM before escalating to SIGKILL.
_TIMEOUT_SIGTERM = 2.0

# Module-level counter. Presumably advanced by _next_counter() (not shown here),
# whose value create_node_process appends to process names to keep them unique.
_counter = 0
65
67 """
68 Launch a master
69 @param type_: name of master executable (currently just Master.ZENMASTER)
70 @type type_: str
71 @param ros_root: ROS_ROOT environment setting
72 @type ros_root: str
73 @param port: port to launch master on
74 @type port: int
75 @param num_workers: number of worker threads.
76 @type num_workers: int
77 @param timeout: socket timeout for connections.
78 @type timeout: float
79 @raise RLException: if type_ or port is invalid
80 """
81 if port < 1 or port > 65535:
82 raise RLException("invalid port assignment: %s"%port)
83
84 _logger.info("create_master_process: %s, %s, %s, %s, %s", type_, ros_root, port, num_workers, timeout)
85
86 master = type_
87
88 if type_ in [Master.ROSMASTER, Master.ZENMASTER]:
89 package = 'rosmaster'
90 args = [master, '--core', '-p', str(port), '-w', str(num_workers)]
91 if timeout is not None:
92 args += ['-t', str(timeout)]
93 else:
94 raise RLException("unknown master typ_: %s"%type_)
95
96 _logger.info("process[master]: launching with args [%s]"%args)
97 log_output = False
98 return LocalProcess(run_id, package, 'master', args, os.environ, log_output, None, required=True)
99
101 """
102 Factory for generating processes for launching local ROS
103 nodes. Also registers the process with the L{ProcessMonitor} so that
104 events can be generated when the process dies.
105
106 @param run_id: run_id of launch
107 @type run_id: str
108 @param node: node to launch. Node name must be assigned.
109 @type node: L{Node}
110 @param master_uri: API URI for master node
111 @type master_uri: str
112 @return: local process instance
113 @rtype: L{LocalProcess}
114 @raise NodeParamsException: If the node's parameters are improperly specific
115 """
116 _logger.info("create_node_process: package[%s] type[%s] machine[%s] master_uri[%s]", node.package, node.type, node.machine, master_uri)
117
118 machine = node.machine
119 if machine is None:
120 raise RLException("Internal error: no machine selected for node of type [%s/%s]"%(node.package, node.type))
121 if not node.name:
122 raise ValueError("node name must be assigned")
123
124
125 env = setup_env(node, machine, master_uri)
126
127 if not node.name:
128 raise ValueError("node name must be assigned")
129
130
131
132
133 name = "%s-%s"%(rosgraph.names.ns_join(node.namespace, node.name), _next_counter())
134 if name[0] == '/':
135 name = name[1:]
136
137 _logger.info('process[%s]: env[%s]', name, env)
138
139 args = create_local_process_args(node, machine)
140
141 _logger.info('process[%s]: args[%s]', name, args)
142
143
144 log_output = node.output != 'screen'
145 _logger.debug('process[%s]: returning LocalProcess wrapper')
146 return LocalProcess(run_id, node.package, name, args, env, log_output, \
147 respawn=node.respawn, respawn_delay=node.respawn_delay, \
148 required=node.required, cwd=node.cwd)
149
150
152 """
153 Process launched on local machine
154 """
155
def __init__(self, run_id, package, name, args, env, log_output,
             respawn=False, respawn_delay=0.0, required=False, cwd=None,
             is_node=True):
    """
    @param run_id: unique run ID for this roslaunch. Used to
    generate log directory location. run_id may be None if this
    feature is not being used.
    @type run_id: str
    @param package: name of package process is part of
    @type package: str
    @param name: name of process
    @type name: str
    @param args: list of arguments to process
    @type args: [str]
    @param env: environment dictionary for process
    @type env: {str : str}
    @param log_output: if True, log output streams of process
    @type log_output: bool
    @param respawn: respawn process if it dies (default is False)
    @type respawn: bool
    @param respawn_delay: respawn process after a delay
    @type respawn_delay: float
    @param required: required flag, forwarded to the L{Process} base class (default is False)
    @type required: bool
    @param cwd: working directory policy: 'node', 'cwd', 'ros-root', or
    None/anything else for the ROS home directory (resolved in start())
    @type cwd: str
    @param is_node: (optional) if True, process is ROS node and accepts ROS node command-line arguments. Default: True
    @type is_node: bool
    """
    super(LocalProcess, self).__init__(package, name, args, env,
                                       respawn, respawn_delay, required)
    self.run_id = run_id
    self.popen = None  # subprocess.Popen handle, assigned by start()
    self.log_output = log_output
    self.started = False
    self.stopped = False
    self.cwd = cwd
    self.log_dir = None  # log directory, if any; read when describing the exit state
    self.pid = -1  # -1 until start() records the OS pid
    self.is_node = is_node
194
195
197 """
198 Get all data about this process in dictionary form
199 """
200 info = super(LocalProcess, self).get_info()
201 info['pid'] = self.pid
202 if self.run_id:
203 info['run_id'] = self.run_id
204 info['log_output'] = self.log_output
205 if self.cwd is not None:
206 info['cwd'] = self.cwd
207 return info
208
255
257 """
258 Start the process.
259
260 @raise FatalProcessLaunch: if process cannot be started and it
261 is not likely to ever succeed
262 """
263 super(LocalProcess, self).start()
264 try:
265 self.lock.acquire()
266 if self.started:
267 _logger.info("process[%s]: restarting os process", self.name)
268 else:
269 _logger.info("process[%s]: starting os process", self.name)
270 self.started = self.stopped = False
271
272 full_env = self.env
273
274
275 try:
276 logfileout, logfileerr = self._configure_logging()
277 except Exception as e:
278 _logger.error(traceback.format_exc())
279 printerrlog("[%s] ERROR: unable to configure logging [%s]"%(self.name, str(e)))
280
281
282
283 logfileout, logfileerr = subprocess.PIPE, subprocess.PIPE
284
285 if self.cwd == 'node':
286 cwd = os.path.dirname(self.args[0])
287 elif self.cwd == 'cwd':
288 cwd = os.getcwd()
289 elif self.cwd == 'ros-root':
290 cwd = get_ros_root()
291 else:
292 cwd = rospkg.get_ros_home()
293 if not os.path.exists(cwd):
294 try:
295 os.makedirs(cwd)
296 except OSError:
297
298 pass
299
300 _logger.info("process[%s]: start w/ args [%s]", self.name, self.args)
301 _logger.info("process[%s]: cwd will be [%s]", self.name, cwd)
302
303 try:
304 self.popen = subprocess.Popen(self.args, cwd=cwd, stdout=logfileout, stderr=logfileerr, env=full_env, close_fds=True, preexec_fn=os.setsid)
305 except OSError as e:
306 self.started = True
307 _logger.error("OSError(%d, %s)", e.errno, e.strerror)
308 if e.errno == 8:
309 raise FatalProcessLaunch("Unable to launch [%s]. \nIf it is a script, you may be missing a '#!' declaration at the top."%self.name)
310 elif e.errno == 2:
311 raise FatalProcessLaunch("""Roslaunch got a '%s' error while attempting to run:
312
313 %s
314
315 Please make sure that all the executables in this command exist and have
316 executable permission. This is often caused by a bad launch-prefix."""%(e.strerror, ' '.join(self.args)))
317 else:
318 raise FatalProcessLaunch("unable to launch [%s]: %s"%(' '.join(self.args), e.strerror))
319
320 self.started = True
321
322
323
324 poll_result = self.popen.poll()
325 if poll_result is None or poll_result == 0:
326 self.pid = self.popen.pid
327 printlog_bold("process[%s]: started with pid [%s]"%(self.name, self.pid))
328 return True
329 else:
330 printerrlog("failed to start local process: %s"%(' '.join(self.args)))
331 return False
332 finally:
333 self.lock.release()
334
def _log_name(self):
    """
    @return: process name with every '/' turned into '-', used as the
      base name of this process's log files
    @rtype: str
    """
    return '-'.join(self.name.split('/'))
337
339 """
340 @return: True if process is still running
341 @rtype: bool
342 """
343 if not self.started:
344 return True
345 if self.stopped or self.popen is None:
346 if self.time_of_death is None:
347 self.time_of_death = time.time()
348 return False
349 self.exit_code = self.popen.poll()
350 if self.exit_code is not None:
351 if self.time_of_death is None:
352 self.time_of_death = time.time()
353 return False
354 return True
355
357 """
358 @return: human-readable description of exit state
359 @rtype: str
360 """
361 if self.exit_code is None:
362 output = 'process has died without exit code [pid %s, cmd %s].'%(self.pid, ' '.join(self.args))
363 elif self.exit_code != 0:
364 output = 'process has died [pid %s, exit code %s, cmd %s].'%(self.pid, self.exit_code, ' '.join(self.args))
365 else:
366 output = 'process has finished cleanly'
367
368 if self.log_dir:
369
370 output += '\nlog file: %s*.log'%(os.path.join(self.log_dir, self._log_name()))
371 return output
372
374 """
375 UNIX implementation of process killing
376
377 @param errors: error messages. stop() will record messages into this list.
378 @type errors: [str]
379 """
380 self.exit_code = self.popen.poll()
381 if self.exit_code is not None:
382 _logger.debug("process[%s].stop(): process has already returned %s", self.name, self.exit_code)
383
384 self.popen = None
385 self.stopped = True
386 return
387
388 pid = self.popen.pid
389 pgid = os.getpgid(pid)
390 _logger.info("process[%s]: killing os process with pid[%s] pgid[%s]", self.name, pid, pgid)
391
392 try:
393
394 _logger.info("[%s] sending SIGINT to pgid [%s]", self.name, pgid)
395 os.killpg(pgid, signal.SIGINT)
396 _logger.info("[%s] sent SIGINT to pgid [%s]", self.name, pgid)
397 timeout_t = time.time() + _TIMEOUT_SIGINT
398 retcode = self.popen.poll()
399 while time.time() < timeout_t and retcode is None:
400 time.sleep(0.1)
401 retcode = self.popen.poll()
402
403 if retcode is None:
404 printerrlog("[%s] escalating to SIGTERM"%self.name)
405 timeout_t = time.time() + _TIMEOUT_SIGTERM
406 os.killpg(pgid, signal.SIGTERM)
407 _logger.info("[%s] sent SIGTERM to pgid [%s]"%(self.name, pgid))
408 retcode = self.popen.poll()
409 while time.time() < timeout_t and retcode is None:
410 time.sleep(0.2)
411 _logger.debug('poll for retcode')
412 retcode = self.popen.poll()
413 if retcode is None:
414 printerrlog("[%s] escalating to SIGKILL"%self.name)
415 errors.append("process[%s, pid %s]: required SIGKILL. May still be running."%(self.name, pid))
416 try:
417 os.killpg(pgid, signal.SIGKILL)
418 _logger.info("[%s] sent SIGKILL to pgid [%s]"%(self.name, pgid))
419
420
421
422 _logger.info("process[%s]: sent SIGKILL", self.name)
423 except OSError as e:
424 if e.args[0] == 3:
425 printerrlog("no [%s] process with pid [%s]"%(self.name, pid))
426 else:
427 printerrlog("errors shutting down [%s], see log for details"%self.name)
428 _logger.error(traceback.format_exc())
429 else:
430 _logger.info("process[%s]: SIGTERM killed with return value %s", self.name, retcode)
431 else:
432 _logger.info("process[%s]: SIGINT killed with return value %s", self.name, retcode)
433
434 finally:
435 self.popen = None
436
438 """
439 Win32 implementation of process killing. In part, refer to
440
441 http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/347462
442
443 Note that it doesn't work as completely as _stop_unix as it can't utilise
444 group id's. This means that any program which forks children underneath it
445 won't get caught by this kill mechanism.
446
447 @param errors: error messages. stop() will record messages into this list.
448 @type errors: [str]
449 """
450 self.exit_code = self.popen.poll()
451 if self.exit_code is not None:
452 _logger.debug("process[%s].stop(): process has already returned %s", self.name, self.exit_code)
453 self.popen = None
454 self.stopped = True
455 return
456
457 pid = self.popen.pid
458 _logger.info("process[%s]: killing os process/subprocesses with pid[%s]", self.name, pid)
459
460 try:
461
462 _logger.info("[%s] sending SIGINT to pgid [%s]", self.name, pid)
463 os.kill(pid, signal.SIGINT)
464 _logger.info("[%s] sent SIGINT to pgid [%s]", self.name, pid)
465 timeout_t = time.time() + _TIMEOUT_SIGINT
466 retcode = self.popen.poll()
467 while time.time() < timeout_t and retcode is None:
468 time.sleep(0.1)
469 retcode = self.popen.poll()
470
471 if retcode is None:
472 printerrlog("[%s] escalating to SIGTERM"%self.name)
473 timeout_t = time.time() + _TIMEOUT_SIGTERM
474 os.killpg(pid, signal.SIGTERM)
475 _logger.info("[%s] sent SIGTERM to pid [%s]"%(self.name, pid))
476 retcode = self.popen.poll()
477 while time.time() < timeout_t and retcode is None:
478 time.sleep(0.2)
479 _logger.debug('poll for retcode')
480 retcode = self.popen.poll()
481 if retcode is None:
482 printerrlog("[%s] escalating to SIGKILL"%self.name)
483 errors.append("process[%s, pid %s]: required SIGKILL. May still be running."%(self.name, pid))
484 try:
485 os.killpg(pid, signal.SIGKILL)
486 _logger.info("[%s] sent SIGKILL to pid [%s]"%(self.name, pid))
487
488
489
490 _logger.info("process[%s]: sent SIGKILL", self.name)
491 except OSError as e:
492 if e.args[0] == 3:
493 printerrlog("no [%s] process with pid [%s]"%(self.name, pid))
494 else:
495 printerrlog("errors shutting down [%s], see log for details"%self.name)
496 _logger.error(traceback.format_exc())
497 else:
498 _logger.info("process[%s]: SIGTERM killed with return value %s", self.name, retcode)
499 else:
500 _logger.info("process[%s]: SIGINT killed with return value %s", self.name, retcode)
501 finally:
502 self.popen = None
503
def stop(self, errors=None):
    """
    Stop the process. Record any significant error messages in the errors parameter

    Dispatches to _stop_win32() on win32 and _stop_unix() elsewhere while
    holding self.lock. Exceptions raised while stopping are logged, not
    propagated, and the process is marked stopped regardless.

    @param errors: error messages. stop() will record messages into this list.
    @type errors: [str]
    """
    if errors is None:
        errors = []
    super(LocalProcess, self).stop(errors)
    self.lock.acquire()
    try:
        try:
            _logger.debug("process[%s].stop() starting", self.name)
            if self.popen is None:
                _logger.debug("process[%s].stop(): popen is None, nothing to kill", self.name)
                return
            if sys.platform in ['win32']:
                self._stop_win32(errors)
            else:
                self._stop_unix(errors)
        except:
            # Deliberately broad: stopping is best effort, so any failure
            # (including KeyboardInterrupt) is logged rather than propagated.
            _logger.error("[%s] EXCEPTION %s", self.name, traceback.format_exc())
    finally:
        self.stopped = True
        self.lock.release()
531
532
533
535 """
536 Remove all instances of args that start with prefix. This is used
537 to remove args that were previously added (and are now being
538 regenerated due to respawning)
539 """
540 existing_args = [a for a in args if a.startswith(prefix)]
541 for a in existing_args:
542 args.remove(a)
543 return args
544