1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 from __future__ import print_function
36
37 """
38 Top-level implementation of launching processes. Coordinates
39 lower-level libraries.
40 """
41
42 import os
43 import logging
44 import subprocess
45 import sys
46 import time
47 import traceback
48
49 import rosgraph
50 import rosgraph.names
51 import rosgraph.network
52
53 from roslaunch.core import *
54
55 from roslaunch.nodeprocess import create_master_process, create_node_process, DEFAULT_TIMEOUT_SIGINT, DEFAULT_TIMEOUT_SIGTERM
56 from roslaunch.pmon import start_process_monitor, ProcessListener
57
58 from roslaunch.rlutil import update_terminal_name
59
60 from rosmaster.master_api import NUM_WORKERS
61
62 _TIMEOUT_MASTER_START = 10.0
63 _TIMEOUT_MASTER_STOP = 10.0
64
65 _ID = '/roslaunch'
66
68
70 """
71 Validate the configuration of a master we are about to launch. Ths
72 validation already assumes that no existing master is running at
73 this configuration and merely checks configuration for a new
74 launch.
75 """
76
77
78
79
80
81
82
83 if not rosgraph.network.is_local_address(m.get_host()):
84
85
86 if is_core:
87
88
89
90 try:
91 reverse_ips = [host[4][0] for host in socket.getaddrinfo(m.get_host(), 0, 0, 0, socket.SOL_TCP)]
92 local_addrs = rosgraph.network.get_local_addresses()
93 printerrlog("""WARNING: IP addresses %s for local hostname '%s' do not appear to match
94 any local IP address (%s). Your ROS nodes may fail to communicate.
95
96 Please use ROS_IP to set the correct IP address to use."""%(','.join(reverse_ips), m.get_host(), ','.join(local_addrs)))
97 except:
98 printerrlog("""WARNING: local hostname '%s' does not map to an IP address.
99 Your ROS nodes may fail to communicate.
100
101 Please use ROS_IP to set the correct IP address to use."""%(m.get_host()))
102
103 else:
104
105 raise RLException("ERROR: unable to contact ROS master at [%s]"%(m.uri))
106
107 if is_core and not is_rostest:
108
109
110 env_uri = rosgraph.get_master_uri()
111 env_host, env_port = rosgraph.network.parse_http_host_and_port(env_uri)
112
113 if not rosgraph.network.is_local_address(env_host):
114
115 printerrlog("WARNING: ROS_MASTER_URI [%s] host is not set to this machine"%(env_uri))
116 elif env_port != m.get_port():
117
118 printerrlog("WARNING: ROS_MASTER_URI port [%s] does not match this roscore [%s]"%(env_port, m.get_port()))
119
121 splits = [s for s in p.split(rosgraph.names.SEP) if s]
122 curr =rosgraph.names.SEP
123 parents = [curr]
124 for s in splits[:-1]:
125 next_ = curr + s + rosgraph.names.SEP
126 parents.append(next_)
127 curr = next_
128 return parents
129
131 """
132 Reduce clear_params such that only the highest-level namespaces
133 are represented for overlapping namespaces. e.g. if both /foo/ and
134 /foo/bar are present, return just /foo/.
135
136 @param params: paremter namees
137 @type params: [str]
138 @return: unified parameters
139 @rtype: [str]
140 """
141
142
143
144 canon_params = []
145 for p in params:
146 if not p[-1] == rosgraph.names.SEP:
147 p += rosgraph.names.SEP
148 if not p in canon_params:
149 canon_params.append(p)
150
151 clear_params = canon_params[:]
152 for p in canon_params:
153 for parent in _all_namespace_parents(p):
154 if parent in canon_params and p in clear_params and p != '/':
155 clear_params.remove(p)
156 return clear_params
157
159 """
160 Utility function to strip illegal characters from hostname. This
161 is implemented to be correct, not efficient."""
162
163
164 fixed = 'host_'
165
166 hostname = hostname.lower()
167 for c in hostname:
168
169 if (c >= 'a' and c <= 'z') or \
170 (c >= '0' and c <= '9'):
171 fixed+=c
172 else:
173 fixed+='_'
174 return fixed
175
177 """
178 Helper class to manage distributing process events. It functions as
179 a higher level aggregator of events. In particular, events about
180 remote and local processes require different mechanisms for being
181 caught and reported.
182 """
184 self.process_listeners = []
185
187 """
188 Add listener to list of listeners. Not threadsafe.
189 @param l: listener
190 @type l: L{ProcessListener}
191 """
192 self.process_listeners.append(l)
193
195 """
196 ProcessListener callback
197 """
198 for l in self.process_listeners:
199 try:
200 l.process_died(process_name, exit_code)
201 except Exception as e:
202 logging.getLogger('roslaunch').error(traceback.format_exc())
203
205 """
206 Listener interface for events related to ROSLaunch.
207 ROSLaunchListener is currently identical to the lower-level
208 L{roslaunch.pmon.ProcessListener} interface, but is separate for
209 architectural reasons. The lower-level
210 L{roslaunch.pmon.ProcessMonitor} does not provide events about
211 remotely running processes.
212 """
213
215 """
216 Notifies listener that process has died. This callback only
217 occurs for processes that die during normal process monitor
218 execution -- processes that are forcibly killed during
219 L{roslaunch.pmon.ProcessMonitor} shutdown are not reported.
220 @param process_name: name of process
221 @type process_name: str
222 @param exit_code int: exit code of process. If None, it means
223 that L{roslaunch.pmon.ProcessMonitor} was unable to determine an exit code.
224 @type exit_code: int
225 """
226 pass
227
229 """
230 Runs a roslaunch. The normal sequence of API calls is L{launch()}
231 followed by L{spin()}. An external thread can call L{stop()}; otherwise
232 the runner will block until an exit signal. Another usage is to
233 call L{launch()} followed by repeated calls to L{spin_once()}. This usage
234 allows the main thread to continue to do work while processes are
235 monitored.
236 """
237
238 - def __init__(self, run_id, config, server_uri=None, pmon=None, is_core=False, remote_runner=None, is_child=False, is_rostest=False, num_workers=NUM_WORKERS, timeout=None,
239 master_logger_level=False, sigint_timeout=DEFAULT_TIMEOUT_SIGINT, sigterm_timeout=DEFAULT_TIMEOUT_SIGTERM):
240 """
241 @param run_id: /run_id for this launch. If the core is not
242 running, this value will be used to initialize /run_id. If
243 the core is already running, this value will be checked
244 against the value stored on the core. L{ROSLaunchRunner} will
245 fail during L{launch()} if they do not match.
246 @type run_id: str
247 @param config: roslauch instance to run
248 @type config: L{ROSLaunchConfig}
249 @param server_uri: XML-RPC URI of roslaunch server.
250 @type server_uri: str
251 @param pmon: optionally override the process
252 monitor the runner uses for starting and tracking processes
253 @type pmon: L{ProcessMonitor}
254
255 @param is_core: if True, this runner is a roscore
256 instance. This affects the error behavior if a master is
257 already running -- aborts if is_core is True and a core is
258 detected.
259 @type is_core: bool
260 @param remote_runner: remote roslaunch process runner
261 @param is_rostest: if True, this runner is a rostest
262 instance. This affects certain validation checks.
263 @type is_rostest: bool
264 @param num_workers: If this is the core, the number of worker-threads to use.
265 @type num_workers: int
266 @param timeout: If this is the core, the socket-timeout to use.
267 @type timeout: Float or None
268 @param master_logger_level: Specify roscore's rosmaster.master logger level, use default if it is False.
269 @type master_logger_level: str or False
270 @param sigint_timeout: The SIGINT timeout used when killing nodes (in seconds).
271 @type sigint_timeout: float
272 @param sigterm_timeout: The SIGTERM timeout used when killing nodes if SIGINT does not stop the node (in seconds).
273 @type sigterm_timeout: float
274 @raise RLException: If sigint_timeout or sigterm_timeout are nonpositive.
275 """
276 if run_id is None:
277 raise RLException("run_id is None")
278 if sigint_timeout <= 0:
279 raise RLException("sigint_timeout must be a positive number, received %f" % sigint_timeout)
280 if sigterm_timeout <= 0:
281 raise RLException("sigterm_timeout must be a positive number, received %f" % sigterm_timeout)
282
283 self.run_id = run_id
284
285
286
287
288
289
290 self.config = config
291 self.server_uri = server_uri
292 self.is_child = is_child
293 self.is_core = is_core
294 self.is_rostest = is_rostest
295 self.num_workers = num_workers
296 self.timeout = timeout
297 self.master_logger_level = master_logger_level
298 self.logger = logging.getLogger('roslaunch')
299 self.pm = pmon or start_process_monitor()
300 self.sigint_timeout = sigint_timeout
301 self.sigterm_timeout = sigterm_timeout
302
303
304
305
306 self.listeners = _ROSLaunchListeners()
307 if self.pm is None:
308 raise RLException("unable to initialize roslaunch process monitor")
309 if self.pm.is_shutdown:
310 raise RLException("bad roslaunch process monitor initialization: process monitor is already dead")
311
312 self.pm.add_process_listener(self.listeners)
313
314 self.remote_runner = remote_runner
315
317 """
318 Add listener to list of listeners. Not threadsafe. Must be
319 called before processes started.
320 @param l: listener
321 @type l: L{ProcessListener}
322 """
323 self.listeners.add_process_listener(l)
324
326 """
327 Load parameters onto the parameter server
328 """
329 self.logger.info("load_parameters starting ...")
330 config = self.config
331 param_server = config.master.get()
332 p = None
333 try:
334
335 param_server_multi = config.master.get_multi()
336
337
338
339 for p in _unify_clear_params(config.clear_params):
340 if param_server.hasParam(_ID, p)[2]:
341
342 param_server_multi.deleteParam(_ID, p)
343 r = param_server_multi()
344 for code, msg, _ in r:
345 if code != 1:
346 raise RLException("Failed to clear parameter: %s"%(msg))
347
348
349 param_server_multi = config.master.get_multi()
350 for p in config.params.values():
351
352
353 param_server_multi.setParam(_ID, p.key, p.value)
354 r = param_server_multi()
355 for code, msg, _ in r:
356 if code != 1:
357 raise RLException("Failed to set parameter: %s"%(msg))
358 except RLException:
359 raise
360 except Exception as e:
361 printerrlog("load_parameters: unable to set parameters (last param was [%s]): %s"%(p,e))
362 raise
363 self.logger.info("... load_parameters complete")
364
366 """
367 Launch all the declared nodes/master
368 @return: two lists of node names where the first
369 is the nodes that successfully launched and the second is the
370 nodes that failed to launch.
371 @rtype: [[str], [str]]
372 """
373 config = self.config
374 succeeded = []
375 failed = []
376 self.logger.info("launch_nodes: launching local nodes ...")
377 local_nodes = config.nodes
378
379
380 local_nodes = [n for n in config.nodes if is_machine_local(n.machine)]
381
382 for node in local_nodes:
383 proc, success = self.launch_node(node)
384 if success:
385 succeeded.append(str(proc))
386 else:
387 failed.append(str(proc))
388
389 if self.remote_runner:
390 self.logger.info("launch_nodes: launching remote nodes ...")
391 r_succ, r_fail = self.remote_runner.launch_remote_nodes()
392 succeeded.extend(r_succ)
393 failed.extend(r_fail)
394
395 self.logger.info("... launch_nodes complete")
396 return succeeded, failed
397
399 """
400 Launches master if requested.
401 @return: True if a master was launched, False if a master was
402 already running.
403 @rtype: bool
404 @raise RLException: if master launch fails
405 """
406 m = self.config.master
407 is_running = m.is_running()
408
409 if self.is_core and is_running:
410 raise RLException("roscore cannot run as another roscore/master is already running. \nPlease kill other roscore/master processes before relaunching.\nThe ROS_MASTER_URI is %s"%(m.uri))
411
412 if not is_running:
413 validate_master_launch(m, self.is_core, self.is_rostest)
414
415 printlog("auto-starting new master")
416 p = create_master_process(
417 self.run_id, m.type, get_ros_root(), m.get_port(), self.num_workers,
418 self.timeout, master_logger_level=self.master_logger_level,
419 sigint_timeout=self.sigint_timeout, sigterm_timeout=self.sigterm_timeout)
420 self.pm.register_core_proc(p)
421 success = p.start()
422 if not success:
423 raise RLException("ERROR: unable to auto-start master process")
424 timeout_t = time.time() + _TIMEOUT_MASTER_START
425 while not m.is_running() and time.time() < timeout_t:
426 time.sleep(0.1)
427
428 if not m.is_running():
429 raise RLException("ERROR: could not contact master [%s]"%m.uri)
430
431 printlog_bold("ROS_MASTER_URI=%s"%m.uri)
432
433
434
435 update_terminal_name(m.uri)
436
437
438 param_server = m.get()
439
440
441 self._check_and_set_run_id(param_server, self.run_id)
442
443 if self.server_uri:
444
445
446 hostname, port = rosgraph.network.parse_http_host_and_port(self.server_uri)
447 hostname = _hostname_to_rosname(hostname)
448 self.logger.info("setting /roslaunch/uris/%s__%s' to %s"%(hostname, port, self.server_uri))
449 param_server.setParam(_ID, '/roslaunch/uris/%s__%s'%(hostname, port),self.server_uri)
450
451 return not is_running
452
454 """
455 Initialize self.run_id to existing value or setup parameter
456 server with /run_id set to default_run_id
457 @param default_run_id: run_id to use if value is not set
458 @type default_run_id: str
459 @param param_server: parameter server proxy
460 @type param_server: xmlrpclib.ServerProxy
461 """
462 code, _, val = param_server.hasParam(_ID, '/run_id')
463 if code == 1 and not val:
464 printlog_bold("setting /run_id to %s"%run_id)
465 param_server.setParam('/roslaunch', '/run_id', run_id)
466 else:
467
468 code, _, val = param_server.getParam('/roslaunch', '/run_id')
469 if code != 1:
470
471
472 raise RLException("ERROR: unable to retrieve /run_id from parameter server")
473 if run_id != val:
474 raise RLException("run_id on parameter server does not match declared run_id: %s vs %s"%(val, run_id))
475
476
477
479 """
480 Launch a single L{Executable} object. Blocks until executable finishes.
481 @param e: Executable
482 @type e: L{Executable}
483 @raise RLException: if executable fails. Failure includes non-zero exit code.
484 """
485 try:
486
487 cmd = e.command
488 cmd = "%s %s"%(cmd, ' '.join(e.args))
489 print("running %s" % cmd)
490 local_machine = self.config.machines['']
491 env = setup_env(None, local_machine, self.config.master.uri)
492 retcode = subprocess.call(cmd, shell=True, env=env)
493 if retcode < 0:
494 raise RLException("command [%s] failed with exit code %s"%(cmd, retcode))
495 except OSError as e:
496 raise RLException("command [%s] failed: %s"%(cmd, e))
497
498
499
501 """
502 @raise RLException: if executable fails. Failure includes non-zero exit code.
503 """
504 exes = [e for e in self.config.executables if e.phase == PHASE_SETUP]
505 for e in exes:
506 self._launch_executable(e)
507
509 """
510 launch any core services that are not already running. master must
511 be already running
512 @raise RLException: if core launches fail
513 """
514 config = self.config
515 master = config.master.get()
516 tolaunch = []
517 for node in config.nodes_core:
518 node_name = rosgraph.names.ns_join(node.namespace, node.name)
519 code, msg, _ = master.lookupNode(_ID, node_name)
520 if code == -1:
521 tolaunch.append(node)
522 elif code == 1:
523 print("core service [%s] found" % node_name)
524 else:
525 print("WARN: master is not behaving well (unexpected return value when looking up node)", file=sys.stderr)
526 self.logger.error("ERROR: master return [%s][%s] on lookupNode call"%(code,msg))
527
528 for node in tolaunch:
529 node_name = rosgraph.names.ns_join(node.namespace, node.name)
530 name, success = self.launch_node(node, core=True)
531 if success:
532 print("started core service [%s]" % node_name)
533 else:
534 raise RLException("failed to start core service [%s]"%node_name)
535
537 """
538 Launch a single node locally. Remote launching is handled separately by the remote module.
539 If node name is not assigned, one will be created for it.
540
541 @param node Node: node to launch
542 @param core bool: if True, core node
543 @return obj, bool: Process handle, successful launch. If success, return actual Process instance. Otherwise return name.
544 """
545 self.logger.info("... preparing to launch node of type [%s/%s]", node.package, node.type)
546
547
548
549
550 if node.machine is None:
551 node.machine = self.config.machines['']
552 if node.name is None:
553 node.name = rosgraph.names.anonymous_name(node.type)
554
555 master = self.config.master
556 import roslaunch.node_args
557 try:
558 process = create_node_process(self.run_id, node, master.uri, sigint_timeout=self.sigint_timeout, sigterm_timeout=self.sigterm_timeout)
559 except roslaunch.node_args.NodeParamsException as e:
560 self.logger.error(e)
561 printerrlog("ERROR: cannot launch node of type [%s/%s]: %s"%(node.package, node.type, str(e)))
562 if node.name:
563 return node.name, False
564 else:
565 return "%s/%s"%(node.package,node.type), False
566
567 self.logger.info("... created process [%s]", process.name)
568 if core:
569 self.pm.register_core_proc(process)
570 else:
571 self.pm.register(process)
572 node.process_name = process.name
573 self.logger.info("... registered process [%s]", process.name)
574
575
576 success = process.start()
577 if not success:
578 if node.machine.name:
579 printerrlog("launch of %s/%s on %s failed"%(node.package, node.type, node.machine.name))
580 else:
581 printerrlog("local launch of %s/%s failed"%(node.package, node.type))
582 else:
583 self.logger.info("... successfully launched [%s]", process.name)
584 return process, success
585
587 """
588 Check for running node process.
589 @param node Node: node object to check
590 @return bool: True if process associated with node is running (launched && !dead)
591 """
592
593 return node.process_name and self.pm.has_process(node.process_name)
594
596 """
597 Same as spin() but only does one cycle. must be run from the main thread.
598 """
599 if not self.pm:
600 return False
601 return self.pm.mainthread_spin_once()
602
604 """
605 spin() must be run from the main thread. spin() is very
606 important for roslaunch as it picks up jobs that the process
607 monitor need to be run in the main thread.
608 """
609 self.logger.info("spin")
610
611
612
613 if not self.pm or not self.pm.get_active_names():
614 printlog_bold("No processes to monitor")
615 self.stop()
616 return
617 self.pm.mainthread_spin()
618
619 self.logger.info("process monitor is done spinning, initiating full shutdown")
620 self.stop()
621 printlog_bold("done")
622
624 """
625 Stop the launch and all associated processes. not thread-safe.
626 """
627 self.logger.info("runner.stop()")
628 if self.pm is not None:
629 printlog("shutting down processing monitor...")
630 self.logger.info("shutting down processing monitor %s"%self.pm)
631 self.pm.shutdown()
632 self.pm.join()
633 self.pm = None
634 printlog("... shutting down processing monitor complete")
635 else:
636 self.logger.info("... roslaunch runner has already been stopped")
637
639 """
640 Setup the state of the ROS network, including the parameter
641 server state and core services
642 """
643
644 self.config.assign_machines()
645
646 if self.remote_runner:
647
648 self.remote_runner.add_process_listener(self.listeners)
649
650
651 launched = self._launch_master()
652 if launched:
653 self._launch_core_nodes()
654
655
656
657
658
659 self._launch_setup_executables()
660
661
662 if not self.is_child:
663 self._load_parameters()
664
666 """
667 Run the launch. Depending on usage, caller should call
668 spin_once or spin as appropriate after launch().
669 @return ([str], [str]): tuple containing list of nodes that
670 successfully launches and list of nodes that failed to launch
671 @rtype: ([str], [str])
672 @raise RLException: if launch fails (e.g. run_id parameter does
673 not match ID on parameter server)
674 """
675 try:
676 self._setup()
677 succeeded, failed = self._launch_nodes()
678 return succeeded, failed
679 except KeyboardInterrupt:
680 self.stop()
681 raise
682
684 """
685 Run the test node. Blocks until completion or timeout.
686 @param test: test node to run
687 @type test: Test
688 @raise RLTestTimeoutException: if test fails to launch or test times out
689 """
690 self.logger.info("... preparing to run test [%s] of type [%s/%s]", test.test_name, test.package, test.type)
691 proc_h, success = self.launch_node(test)
692 if not success:
693 raise RLException("test [%s] failed to launch"%test.test_name)
694
695
696 timeout_t = time.time() + test.time_limit
697 pm = self.pm
698 while pm.mainthread_spin_once() and self.is_node_running(test):
699
700 if time.time() > timeout_t:
701 raise RLTestTimeoutException(
702 "max time [%ss] allotted for test [%s] of type [%s/%s]" %
703 (test.time_limit, test.test_name, test.package, test.type))
704 time.sleep(0.1)
705
706
707
708
709
711 """
712 API for remote running
713 """
719 """
720 Listen to events about remote processes dying. Not
721 threadsafe. Must be called before processes started.
722 @param l: listener
723 @type l: L{ProcessListener}
724 """
725 pass
726
728 """
729 Contact each child to launch remote nodes
730 @return: succeeded, failed
731 @rtype: [str], [str]
732 """
733 pass
734