1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 Local process implementation for running and monitoring nodes.
37 """
38
39 import errno
40 import os
41 import signal
42 import subprocess
43 import time
44 import traceback
45
46 import rospkg
47
48 from roslaunch.core import *
49
50 from roslaunch.node_args import create_local_process_args
51 from roslaunch.pmon import Process, FatalProcessLaunch
52
53 from rosmaster.master_api import NUM_WORKERS
54
55 import logging
56 _logger = logging.getLogger("roslaunch")
57
58 _TIMEOUT_SIGINT = 15.0
59 _TIMEOUT_SIGTERM = 2.0
60
61 _counter = 0
66
68 """
69 Launch a master
70 @param type_: name of master executable (currently just Master.ZENMASTER)
71 @type type_: str
72 @param ros_root: ROS_ROOT environment setting
73 @type ros_root: str
74 @param port: port to launch master on
75 @type port: int
76 @param num_workers: number of worker threads.
77 @type num_workers: int
78 @param timeout: socket timeout for connections.
79 @type timeout: float
80 @raise RLException: if type_ or port is invalid
81 @param master_logger_level: rosmaster.master logger debug level
82 @type master_logger_level: str or False
83 """
84 if port < 1 or port > 65535:
85 raise RLException("invalid port assignment: %s"%port)
86
87 _logger.info("create_master_process: %s, %s, %s, %s, %s, %s", type_, ros_root, port, num_workers, timeout, master_logger_level)
88
89 master = type_
90
91 if type_ in [Master.ROSMASTER, Master.ZENMASTER]:
92 package = 'rosmaster'
93 args = [master, '--core', '-p', str(port), '-w', str(num_workers)]
94 if timeout is not None:
95 args += ['-t', str(timeout)]
96 if master_logger_level:
97 args += ['--master-logger-level', str(master_logger_level)]
98 else:
99 raise RLException("unknown master typ_: %s"%type_)
100
101 _logger.info("process[master]: launching with args [%s]"%args)
102 log_output = False
103 return LocalProcess(run_id, package, 'master', args, os.environ, log_output, None, required=True)
104
106 """
107 Factory for generating processes for launching local ROS
108 nodes. Also registers the process with the L{ProcessMonitor} so that
109 events can be generated when the process dies.
110
111 @param run_id: run_id of launch
112 @type run_id: str
113 @param node: node to launch. Node name must be assigned.
114 @type node: L{Node}
115 @param master_uri: API URI for master node
116 @type master_uri: str
117 @return: local process instance
118 @rtype: L{LocalProcess}
119 @raise NodeParamsException: If the node's parameters are improperly specified
120 """
121 _logger.info("create_node_process: package[%s] type[%s] machine[%s] master_uri[%s]", node.package, node.type, node.machine, master_uri)
122
123 machine = node.machine
124 if machine is None:
125 raise RLException("Internal error: no machine selected for node of type [%s/%s]"%(node.package, node.type))
126 if not node.name:
127 raise ValueError("node name must be assigned")
128
129
130 env = setup_env(node, machine, master_uri)
131
132 if not node.name:
133 raise ValueError("node name must be assigned")
134
135
136
137
138 name = "%s-%s"%(rosgraph.names.ns_join(node.namespace, node.name), _next_counter())
139 if name[0] == '/':
140 name = name[1:]
141
142 _logger.info('process[%s]: env[%s]', name, env)
143
144 args = create_local_process_args(node, machine)
145
146 _logger.info('process[%s]: args[%s]', name, args)
147
148
149 log_output = node.output != 'screen'
150 _logger.debug('process[%s]: returning LocalProcess wrapper')
151 return LocalProcess(run_id, node.package, name, args, env, log_output, \
152 respawn=node.respawn, respawn_delay=node.respawn_delay, \
153 required=node.required, cwd=node.cwd)
154
155
157 """
158 Process launched on local machine
159 """
160
def __init__(self, run_id, package, name, args, env, log_output,
             respawn=False, respawn_delay=0.0, required=False, cwd=None,
             is_node=True):
    """
    Set up the bookkeeping for a process that runs on the local
    machine. No OS process is created here; the popen handle stays
    None and the pid stays -1 until they are assigned elsewhere.

    @param run_id: unique run ID for this roslaunch. Used to
      generate log directory location. run_id may be None if this
      feature is not being used.
    @type  run_id: str
    @param package: name of package process is part of
    @type  package: str
    @param name: name of process
    @type  name: str
    @param args: list of arguments to process
    @type  args: [str]
    @param env: environment dictionary for process
    @type  env: {str : str}
    @param log_output: if True, log output streams of process
    @type  log_output: bool
    @param respawn: respawn process if it dies (default is False)
    @type  respawn: bool
    @param respawn_delay: respawn process after a delay
    @type  respawn_delay: float
    @param required: forwarded unchanged to the base-class
      constructor (default is False)
    @type  required: bool
    @param cwd: working directory of process, or None
    @type  cwd: str
    @param is_node: (optional) if True, process is ROS node and
      accepts ROS node command-line arguments. Default: True
    @type  is_node: bool
    """
    super(LocalProcess, self).__init__(
        package, name, args, env, respawn, respawn_delay, required)

    # configuration handed in by the caller
    self.run_id = run_id
    self.log_output = log_output
    self.cwd = cwd
    self.is_node = is_node

    # runtime state: nothing has been launched yet
    self.popen = None
    self.pid = -1
    self.log_dir = None
    self.started = False
    self.stopped = False

200
202 """
203 Get all data about this process in dictionary form
204 """
205 info = super(LocalProcess, self).get_info()
206 info['pid'] = self.pid
207 if self.run_id:
208 info['run_id'] = self.run_id
209 info['log_output'] = self.log_output
210 if self.cwd is not None:
211 info['cwd'] = self.cwd
212 return info
213
260
262 """
263 Start the process.
264
265 @raise FatalProcessLaunch: if process cannot be started and it
266 is not likely to ever succeed
267 """
268 super(LocalProcess, self).start()
269 try:
270 self.lock.acquire()
271 if self.started:
272 _logger.info("process[%s]: restarting os process", self.name)
273 else:
274 _logger.info("process[%s]: starting os process", self.name)
275 self.started = self.stopped = False
276
277 full_env = self.env
278
279
280 try:
281 logfileout, logfileerr = self._configure_logging()
282 except Exception as e:
283 _logger.error(traceback.format_exc())
284 printerrlog("[%s] ERROR: unable to configure logging [%s]"%(self.name, str(e)))
285
286
287
288 logfileout, logfileerr = subprocess.PIPE, subprocess.PIPE
289
290 if self.cwd == 'node':
291 cwd = os.path.dirname(self.args[0])
292 elif self.cwd == 'cwd':
293 cwd = os.getcwd()
294 elif self.cwd == 'ros-root':
295 cwd = get_ros_root()
296 else:
297 cwd = rospkg.get_ros_home()
298 if not os.path.exists(cwd):
299 try:
300 os.makedirs(cwd)
301 except OSError:
302
303 pass
304
305 _logger.info("process[%s]: start w/ args [%s]", self.name, self.args)
306 _logger.info("process[%s]: cwd will be [%s]", self.name, cwd)
307
308 try:
309 self.popen = subprocess.Popen(self.args, cwd=cwd, stdout=logfileout, stderr=logfileerr, env=full_env, close_fds=True, preexec_fn=os.setsid)
310 except OSError as e:
311 self.started = True
312 _logger.error("OSError(%d, %s)", e.errno, e.strerror)
313 if e.errno == errno.ENOEXEC:
314 raise FatalProcessLaunch("Unable to launch [%s]. \nIf it is a script, you may be missing a '#!' declaration at the top."%self.name)
315 elif e.errno == errno.ENOENT:
316 raise FatalProcessLaunch("""Roslaunch got a '%s' error while attempting to run:
317
318 %s
319
320 Please make sure that all the executables in this command exist and have
321 executable permission. This is often caused by a bad launch-prefix."""%(e.strerror, ' '.join(self.args)))
322 else:
323 raise FatalProcessLaunch("unable to launch [%s]: %s"%(' '.join(self.args), e.strerror))
324
325 self.started = True
326
327
328
329 poll_result = self.popen.poll()
330 if poll_result is None or poll_result == 0:
331 self.pid = self.popen.pid
332 printlog_bold("process[%s]: started with pid [%s]"%(self.name, self.pid))
333 return True
334 else:
335 printerrlog("failed to start local process: %s"%(' '.join(self.args)))
336 return False
337 finally:
338 self.lock.release()
339
341 return self.name.replace('/', '-')
342
344 """
345 @return: True if process is still running
346 @rtype: bool
347 """
348 if not self.started:
349 return True
350 if self.stopped or self.popen is None:
351 if self.time_of_death is None:
352 self.time_of_death = time.time()
353 return False
354 self.exit_code = self.popen.poll()
355 if self.exit_code is not None:
356 if self.time_of_death is None:
357 self.time_of_death = time.time()
358 return False
359 return True
360
362 """
363 @return: human-readable description of exit state
364 @rtype: str
365 """
366 if self.exit_code is None:
367 output = 'process has died without exit code [pid %s, cmd %s].'%(self.pid, ' '.join(self.args))
368 elif self.exit_code != 0:
369 output = 'process has died [pid %s, exit code %s, cmd %s].'%(self.pid, self.exit_code, ' '.join(self.args))
370 else:
371 output = 'process has finished cleanly'
372
373 if self.log_dir:
374
375 output += '\nlog file: %s*.log'%(os.path.join(self.log_dir, self._log_name()))
376 return output
377
# NOTE(review): the 'def' line of this method is absent from the extract
# under review; the signature below is taken from the call
# self._stop_unix(errors) made by stop().
def _stop_unix(self, errors):
    """
    UNIX implementation of process killing.

    Signals the whole process group of the child: SIGINT first, then
    SIGTERM after _TIMEOUT_SIGINT seconds, then SIGKILL after a further
    _TIMEOUT_SIGTERM seconds. self.popen is reset to None on every path.

    @param errors: error messages. stop() will record messages into this list.
    @type  errors: [str]
    """
    self.exit_code = self.popen.poll()
    if self.exit_code is not None:
        # the child already exited on its own: nothing to signal
        _logger.debug("process[%s].stop(): process has already returned %s", self.name, self.exit_code)
        self.popen = None
        self.stopped = True
        return

    pid = self.popen.pid
    pgid = os.getpgid(pid)
    _logger.info("process[%s]: killing os process with pid[%s] pgid[%s]", self.name, pid, pgid)

    try:
        # Start with SIGINT and escalate from there.
        _logger.info("[%s] sending SIGINT to pgid [%s]", self.name, pgid)
        os.killpg(pgid, signal.SIGINT)
        _logger.info("[%s] sent SIGINT to pgid [%s]", self.name, pgid)
        timeout_t = time.time() + _TIMEOUT_SIGINT
        retcode = self.popen.poll()
        while time.time() < timeout_t and retcode is None:
            time.sleep(0.1)
            retcode = self.popen.poll()

        if retcode is None:
            # SIGINT was not enough within the timeout
            printerrlog("[%s] escalating to SIGTERM"%self.name)
            timeout_t = time.time() + _TIMEOUT_SIGTERM
            os.killpg(pgid, signal.SIGTERM)
            _logger.info("[%s] sent SIGTERM to pgid [%s]", self.name, pgid)
            retcode = self.popen.poll()
            while time.time() < timeout_t and retcode is None:
                time.sleep(0.2)
                _logger.debug('poll for retcode')
                retcode = self.popen.poll()
            if retcode is None:
                printerrlog("[%s] escalating to SIGKILL"%self.name)
                errors.append("process[%s, pid %s]: required SIGKILL. May still be running."%(self.name, pid))
                try:
                    # the result of SIGKILL is not waited for here
                    os.killpg(pgid, signal.SIGKILL)
                    _logger.info("[%s] sent SIGKILL to pgid [%s]", self.name, pgid)
                    _logger.info("process[%s]: sent SIGKILL", self.name)
                except OSError as e:
                    # ESRCH ("no such process") replaces the literal 3
                    # that was compared against e.args[0]
                    if e.errno == errno.ESRCH:
                        printerrlog("no [%s] process with pid [%s]"%(self.name, pid))
                    else:
                        printerrlog("errors shutting down [%s], see log for details"%self.name)
                        _logger.error(traceback.format_exc())
            else:
                _logger.info("process[%s]: SIGTERM killed with return value %s", self.name, retcode)
        else:
            _logger.info("process[%s]: SIGINT killed with return value %s", self.name, retcode)

    finally:
        self.popen = None

441
# NOTE(review): the 'def' line of this method is absent from the extract
# under review; the signature below is taken from the call
# self._stop_win32(errors) made by stop().
def _stop_win32(self, errors):
    """
    Win32 implementation of process killing. In part, refer to

      http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/347462

    Note that it doesn't work as completely as _stop_unix as it can't utilise
    group id's. This means that any program which forks children underneath it
    won't get caught by this kill mechanism.

    Every signal is therefore sent with os.kill() to the child's pid.
    os.killpg() and signal.SIGKILL are documented as Unix-only, so the
    escalation steps must not depend on them.

    @param errors: error messages. stop() will record messages into this list.
    @type  errors: [str]
    """
    self.exit_code = self.popen.poll()
    if self.exit_code is not None:
        # the child already exited on its own: nothing to signal
        _logger.debug("process[%s].stop(): process has already returned %s", self.name, self.exit_code)
        self.popen = None
        self.stopped = True
        return

    pid = self.popen.pid
    _logger.info("process[%s]: killing os process/subprocesses with pid[%s]", self.name, pid)

    # signal.SIGKILL is not defined on Windows; fall back to SIGTERM so
    # the last escalation step still issues a kill request instead of
    # raising AttributeError.
    kill_signal = getattr(signal, 'SIGKILL', signal.SIGTERM)

    try:
        # Start with SIGINT and escalate from there.
        _logger.info("[%s] sending SIGINT to pid [%s]", self.name, pid)
        os.kill(pid, signal.SIGINT)
        _logger.info("[%s] sent SIGINT to pid [%s]", self.name, pid)
        timeout_t = time.time() + _TIMEOUT_SIGINT
        retcode = self.popen.poll()
        while time.time() < timeout_t and retcode is None:
            time.sleep(0.1)
            retcode = self.popen.poll()

        if retcode is None:
            # SIGINT was not enough within the timeout
            printerrlog("[%s] escalating to SIGTERM"%self.name)
            timeout_t = time.time() + _TIMEOUT_SIGTERM
            os.kill(pid, signal.SIGTERM)
            _logger.info("[%s] sent SIGTERM to pid [%s]", self.name, pid)
            retcode = self.popen.poll()
            while time.time() < timeout_t and retcode is None:
                time.sleep(0.2)
                _logger.debug('poll for retcode')
                retcode = self.popen.poll()
            if retcode is None:
                printerrlog("[%s] escalating to SIGKILL"%self.name)
                errors.append("process[%s, pid %s]: required SIGKILL. May still be running."%(self.name, pid))
                try:
                    # the result of the final kill is not waited for here
                    os.kill(pid, kill_signal)
                    _logger.info("[%s] sent SIGKILL to pid [%s]", self.name, pid)
                    _logger.info("process[%s]: sent SIGKILL", self.name)
                except OSError as e:
                    # ESRCH ("no such process") replaces the literal 3
                    # that was compared against e.args[0]
                    if e.errno == errno.ESRCH:
                        printerrlog("no [%s] process with pid [%s]"%(self.name, pid))
                    else:
                        printerrlog("errors shutting down [%s], see log for details"%self.name)
                        _logger.error(traceback.format_exc())
            else:
                _logger.info("process[%s]: SIGTERM killed with return value %s", self.name, retcode)
        else:
            _logger.info("process[%s]: SIGINT killed with return value %s", self.name, retcode)
    finally:
        self.popen = None

508
def stop(self, errors=None):
    """
    Stop the process. Record any significant error messages in the errors parameter

    The platform-specific kill routine runs while self.lock is held.
    Anything it raises is logged rather than propagated, and
    self.stopped is set to True on every path.

    @param errors: error messages. stop() will record messages into this list.
    @type  errors: [str]
    """
    # imported here because 'sys' is not among the imports visible at
    # the top of this module
    import sys

    if errors is None:
        errors = []
    super(LocalProcess, self).stop(errors)
    self.lock.acquire()
    try:
        _logger.debug("process[%s].stop() starting", self.name)
        if self.popen is None:
            # the process name was missing from this message's arguments
            _logger.debug("process[%s].stop(): popen is None, nothing to kill", self.name)
            return
        if sys.platform in ['win32']:
            self._stop_win32(errors)
        else:
            self._stop_unix(errors)
    except BaseException:
        # as broad as the bare 'except:' it replaces: nothing raised
        # while killing the process may escape stop(); it is logged instead
        _logger.error("[%s] EXCEPTION %s", self.name, traceback.format_exc())
    finally:
        self.stopped = True
        self.lock.release()

537
538
540 """
541 Remove all instances of args that start with prefix. This is used
542 to remove args that were previously added (and are now being
543 regenerated due to respawning)
544 """
545 existing_args = [a for a in args if a.startswith(prefix)]
546 for a in existing_args:
547 args.remove(a)
548 return args
549