1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 from __future__ import print_function
36
37 """
38 Top-level implementation of launching processes. Coordinates
39 lower-level libraries.
40 """
41
42 import os
43 import logging
44 import subprocess
45 import sys
46 import time
47 import traceback
48
49 import rosgraph
50 import rosgraph.names
51 import rosgraph.network
52
53 from roslaunch.core import *
54
55 from roslaunch.nodeprocess import create_master_process, create_node_process
56 from roslaunch.pmon import start_process_monitor, ProcessListener
57
58 from roslaunch.rlutil import update_terminal_name
59
60 from rosmaster.master_api import NUM_WORKERS
61
62 _TIMEOUT_MASTER_START = 10.0
63 _TIMEOUT_MASTER_STOP = 10.0
64
65 _ID = '/roslaunch'
66
68
70 """
71 Validate the configuration of a master we are about to launch. Ths
72 validation already assumes that no existing master is running at
73 this configuration and merely checks configuration for a new
74 launch.
75 """
76
77
78
79
80
81
82
83 if not rosgraph.network.is_local_address(m.get_host()):
84
85
86 if is_core:
87
88
89
90 try:
91 reverse_ips = [host[4][0] for host in socket.getaddrinfo(m.get_host(), 0, 0, 0, socket.SOL_TCP)]
92 local_addrs = rosgraph.network.get_local_addresses()
93 printerrlog("""WARNING: IP addresses %s for local hostname '%s' do not appear to match
94 any local IP address (%s). Your ROS nodes may fail to communicate.
95
96 Please use ROS_IP to set the correct IP address to use."""%(','.join(reverse_ips), m.get_host(), ','.join(local_addrs)))
97 except:
98 printerrlog("""WARNING: local hostname '%s' does not map to an IP address.
99 Your ROS nodes may fail to communicate.
100
101 Please use ROS_IP to set the correct IP address to use."""%(m.get_host()))
102
103 else:
104
105 raise RLException("ERROR: unable to contact ROS master at [%s]"%(m.uri))
106
107 if is_core and not is_rostest:
108
109
110 env_uri = rosgraph.get_master_uri()
111 env_host, env_port = rosgraph.network.parse_http_host_and_port(env_uri)
112
113 if not rosgraph.network.is_local_address(env_host):
114
115 printerrlog("WARNING: ROS_MASTER_URI [%s] host is not set to this machine"%(env_uri))
116 elif env_port != m.get_port():
117
118 printerrlog("WARNING: ROS_MASTER_URI port [%s] does not match this roscore [%s]"%(env_port, m.get_port()))
119
121 splits = [s for s in p.split(rosgraph.names.SEP) if s]
122 curr =rosgraph.names.SEP
123 parents = [curr]
124 for s in splits[:-1]:
125 next_ = curr + s + rosgraph.names.SEP
126 parents.append(next_)
127 curr = next_
128 return parents
129
131 """
132 Reduce clear_params such that only the highest-level namespaces
133 are represented for overlapping namespaces. e.g. if both /foo/ and
134 /foo/bar are present, return just /foo/.
135
136 @param params: paremter namees
137 @type params: [str]
138 @return: unified parameters
139 @rtype: [str]
140 """
141
142
143
144 canon_params = []
145 for p in params:
146 if not p[-1] == rosgraph.names.SEP:
147 p += rosgraph.names.SEP
148 if not p in canon_params:
149 canon_params.append(p)
150
151 clear_params = canon_params[:]
152 for p in canon_params:
153 for parent in _all_namespace_parents(p):
154 if parent in canon_params and p in clear_params and p != '/':
155 clear_params.remove(p)
156 return clear_params
157
159 """
160 Utility function to strip illegal characters from hostname. This
161 is implemented to be correct, not efficient."""
162
163
164 fixed = 'host_'
165
166 hostname = hostname.lower()
167 for c in hostname:
168
169 if (c >= 'a' and c <= 'z') or \
170 (c >= '0' and c <= '9'):
171 fixed+=c
172 else:
173 fixed+='_'
174 return fixed
175
177 """
178 Helper class to manage distributing process events. It functions as
179 a higher level aggregator of events. In particular, events about
180 remote and local processes require different mechanisms for being
181 caught and reported.
182 """
184 self.process_listeners = []
185
187 """
188 Add listener to list of listeners. Not threadsafe.
189 @param l: listener
190 @type l: L{ProcessListener}
191 """
192 self.process_listeners.append(l)
193
195 """
196 ProcessListener callback
197 """
198 for l in self.process_listeners:
199 try:
200 l.process_died(process_name, exit_code)
201 except Exception as e:
202 logging.getLogger('roslaunch').error(traceback.format_exc())
203
205 """
206 Listener interface for events related to ROSLaunch.
207 ROSLaunchListener is currently identical to the lower-level
208 L{roslaunch.pmon.ProcessListener} interface, but is separate for
209 architectural reasons. The lower-level
210 L{roslaunch.pmon.ProcessMonitor} does not provide events about
211 remotely running processes.
212 """
213
215 """
216 Notifies listener that process has died. This callback only
217 occurs for processes that die during normal process monitor
218 execution -- processes that are forcibly killed during
219 L{roslaunch.pmon.ProcessMonitor} shutdown are not reported.
220 @param process_name: name of process
221 @type process_name: str
222 @param exit_code int: exit code of process. If None, it means
223 that L{roslaunch.pmon.ProcessMonitor} was unable to determine an exit code.
224 @type exit_code: int
225 """
226 pass
227
229 """
230 Runs a roslaunch. The normal sequence of API calls is L{launch()}
231 followed by L{spin()}. An external thread can call L{stop()}; otherwise
232 the runner will block until an exit signal. Another usage is to
233 call L{launch()} followed by repeated calls to L{spin_once()}. This usage
234 allows the main thread to continue to do work while processes are
235 monitored.
236 """
237
238 - def __init__(self, run_id, config, server_uri=None, pmon=None, is_core=False, remote_runner=None, is_child=False, is_rostest=False, num_workers=NUM_WORKERS, timeout=None):
239 """
240 @param run_id: /run_id for this launch. If the core is not
241 running, this value will be used to initialize /run_id. If
242 the core is already running, this value will be checked
243 against the value stored on the core. L{ROSLaunchRunner} will
244 fail during L{launch()} if they do not match.
245 @type run_id: str
246 @param config: roslauch instance to run
247 @type config: L{ROSLaunchConfig}
248 @param server_uri: XML-RPC URI of roslaunch server.
249 @type server_uri: str
250 @param pmon: optionally override the process
251 monitor the runner uses for starting and tracking processes
252 @type pmon: L{ProcessMonitor}
253
254 @param is_core: if True, this runner is a roscore
255 instance. This affects the error behavior if a master is
256 already running -- aborts if is_core is True and a core is
257 detected.
258 @type is_core: bool
259 @param remote_runner: remote roslaunch process runner
260 @param is_rostest: if True, this runner is a rostest
261 instance. This affects certain validation checks.
262 @type is_rostest: bool
263 @param num_workers: If this is the core, the number of worker-threads to use.
264 @type num_workers: int
265 @param timeout: If this is the core, the socket-timeout to use.
266 @type timeout: Float or None
267 """
268 if run_id is None:
269 raise RLException("run_id is None")
270 self.run_id = run_id
271
272
273
274
275
276
277 self.config = config
278 self.server_uri = server_uri
279 self.is_child = is_child
280 self.is_core = is_core
281 self.is_rostest = is_rostest
282 self.num_workers = num_workers
283 self.timeout = timeout
284 self.logger = logging.getLogger('roslaunch')
285 self.pm = pmon or start_process_monitor()
286
287
288
289
290 self.listeners = _ROSLaunchListeners()
291 if self.pm is None:
292 raise RLException("unable to initialize roslaunch process monitor")
293 if self.pm.is_shutdown:
294 raise RLException("bad roslaunch process monitor initialization: process monitor is already dead")
295
296 self.pm.add_process_listener(self.listeners)
297
298 self.remote_runner = remote_runner
299
301 """
302 Add listener to list of listeners. Not threadsafe. Must be
303 called before processes started.
304 @param l: listener
305 @type l: L{ProcessListener}
306 """
307 self.listeners.add_process_listener(l)
308
310 """
311 Load parameters onto the parameter server
312 """
313 self.logger.info("load_parameters starting ...")
314 config = self.config
315 param_server = config.master.get()
316 p = None
317 try:
318
319 param_server_multi = config.master.get_multi()
320
321
322
323 for p in _unify_clear_params(config.clear_params):
324 if param_server.hasParam(_ID, p)[2]:
325
326 param_server_multi.deleteParam(_ID, p)
327 r = param_server_multi()
328 for code, msg, _ in r:
329 if code != 1:
330 raise RLException("Failed to clear parameter: %s"%(msg))
331
332
333 param_server_multi = config.master.get_multi()
334 for p in config.params.values():
335
336
337 param_server_multi.setParam(_ID, p.key, p.value)
338 r = param_server_multi()
339 for code, msg, _ in r:
340 if code != 1:
341 raise RLException("Failed to set parameter: %s"%(msg))
342 except RLException:
343 raise
344 except Exception as e:
345 printerrlog("load_parameters: unable to set parameters (last param was [%s]): %s"%(p,e))
346 raise
347 self.logger.info("... load_parameters complete")
348
350 """
351 Launch all the declared nodes/master
352 @return: two lists of node names where the first
353 is the nodes that successfully launched and the second is the
354 nodes that failed to launch.
355 @rtype: [[str], [str]]
356 """
357 config = self.config
358 succeeded = []
359 failed = []
360 self.logger.info("launch_nodes: launching local nodes ...")
361 local_nodes = config.nodes
362
363
364 local_nodes = [n for n in config.nodes if is_machine_local(n.machine)]
365
366 for node in local_nodes:
367 proc, success = self.launch_node(node)
368 if success:
369 succeeded.append(str(proc))
370 else:
371 failed.append(str(proc))
372
373 if self.remote_runner:
374 self.logger.info("launch_nodes: launching remote nodes ...")
375 r_succ, r_fail = self.remote_runner.launch_remote_nodes()
376 succeeded.extend(r_succ)
377 failed.extend(r_fail)
378
379 self.logger.info("... launch_nodes complete")
380 return succeeded, failed
381
383 """
384 Launches master if requested.
385 @return: True if a master was launched, False if a master was
386 already running.
387 @rtype: bool
388 @raise RLException: if master launch fails
389 """
390 m = self.config.master
391 is_running = m.is_running()
392
393 if self.is_core and is_running:
394 raise RLException("roscore cannot run as another roscore/master is already running. \nPlease kill other roscore/master processes before relaunching.\nThe ROS_MASTER_URI is %s"%(m.uri))
395
396 if not is_running:
397 validate_master_launch(m, self.is_core, self.is_rostest)
398
399 printlog("auto-starting new master")
400 p = create_master_process(self.run_id, m.type, get_ros_root(), m.get_port(), self.num_workers, self.timeout)
401 self.pm.register_core_proc(p)
402 success = p.start()
403 if not success:
404 raise RLException("ERROR: unable to auto-start master process")
405 timeout_t = time.time() + _TIMEOUT_MASTER_START
406 while not m.is_running() and time.time() < timeout_t:
407 time.sleep(0.1)
408
409 if not m.is_running():
410 raise RLException("ERROR: could not contact master [%s]"%m.uri)
411
412 printlog_bold("ROS_MASTER_URI=%s"%m.uri)
413
414
415
416 update_terminal_name(m.uri)
417
418
419 param_server = m.get()
420
421
422 self._check_and_set_run_id(param_server, self.run_id)
423
424 if self.server_uri:
425
426
427 hostname, port = rosgraph.network.parse_http_host_and_port(self.server_uri)
428 hostname = _hostname_to_rosname(hostname)
429 self.logger.info("setting /roslaunch/uris/%s__%s' to %s"%(hostname, port, self.server_uri))
430 param_server.setParam(_ID, '/roslaunch/uris/%s__%s'%(hostname, port),self.server_uri)
431
432 return not is_running
433
435 """
436 Initialize self.run_id to existing value or setup parameter
437 server with /run_id set to default_run_id
438 @param default_run_id: run_id to use if value is not set
439 @type default_run_id: str
440 @param param_server: parameter server proxy
441 @type param_server: xmlrpclib.ServerProxy
442 """
443 code, _, val = param_server.hasParam(_ID, '/run_id')
444 if code == 1 and not val:
445 printlog_bold("setting /run_id to %s"%run_id)
446 param_server.setParam('/roslaunch', '/run_id', run_id)
447 else:
448
449 code, _, val = param_server.getParam('/roslaunch', '/run_id')
450 if code != 1:
451
452
453 raise RLException("ERROR: unable to retrieve /run_id from parameter server")
454 if run_id != val:
455 raise RLException("run_id on parameter server does not match declared run_id: %s vs %s"%(val, run_id))
456
457
458
460 """
461 Launch a single L{Executable} object. Blocks until executable finishes.
462 @param e: Executable
463 @type e: L{Executable}
464 @raise RLException: if exectuable fails. Failure includes non-zero exit code.
465 """
466 try:
467
468 cmd = e.command
469 cmd = "%s %s"%(cmd, ' '.join(e.args))
470 print("running %s" % cmd)
471 local_machine = self.config.machines['']
472 env = setup_env(None, local_machine, self.config.master.uri)
473 retcode = subprocess.call(cmd, shell=True, env=env)
474 if retcode < 0:
475 raise RLException("command [%s] failed with exit code %s"%(cmd, retcode))
476 except OSError as e:
477 raise RLException("command [%s] failed: %s"%(cmd, e))
478
479
480
482 """
483 @raise RLException: if exectuable fails. Failure includes non-zero exit code.
484 """
485 exes = [e for e in self.config.executables if e.phase == PHASE_SETUP]
486 for e in exes:
487 self._launch_executable(e)
488
490 """
491 launch any core services that are not already running. master must
492 be already running
493 @raise RLException: if core launches fail
494 """
495 config = self.config
496 master = config.master.get()
497 tolaunch = []
498 for node in config.nodes_core:
499 node_name = rosgraph.names.ns_join(node.namespace, node.name)
500 code, msg, _ = master.lookupNode(_ID, node_name)
501 if code == -1:
502 tolaunch.append(node)
503 elif code == 1:
504 print("core service [%s] found" % node_name)
505 else:
506 print("WARN: master is not behaving well (unexpected return value when looking up node)", file=sys.stderr)
507 self.logger.error("ERROR: master return [%s][%s] on lookupNode call"%(code,msg))
508
509 for node in tolaunch:
510 node_name = rosgraph.names.ns_join(node.namespace, node.name)
511 name, success = self.launch_node(node, core=True)
512 if success:
513 print("started core service [%s]" % node_name)
514 else:
515 raise RLException("failed to start core service [%s]"%node_name)
516
518 """
519 Launch a single node locally. Remote launching is handled separately by the remote module.
520 If node name is not assigned, one will be created for it.
521
522 @param node Node: node to launch
523 @param core bool: if True, core node
524 @return obj, bool: Process handle, successful launch. If success, return actual Process instance. Otherwise return name.
525 """
526 self.logger.info("... preparing to launch node of type [%s/%s]", node.package, node.type)
527
528
529
530
531 if node.machine is None:
532 node.machine = self.config.machines['']
533 if node.name is None:
534 node.name = rosgraph.names.anonymous_name(node.type)
535
536 master = self.config.master
537 import roslaunch.node_args
538 try:
539 process = create_node_process(self.run_id, node, master.uri)
540 except roslaunch.node_args.NodeParamsException as e:
541 self.logger.error(e)
542 printerrlog("ERROR: cannot launch node of type [%s/%s]: %s"%(node.package, node.type, str(e)))
543 if node.name:
544 return node.name, False
545 else:
546 return "%s/%s"%(node.package,node.type), False
547
548 self.logger.info("... created process [%s]", process.name)
549 if core:
550 self.pm.register_core_proc(process)
551 else:
552 self.pm.register(process)
553 node.process_name = process.name
554 self.logger.info("... registered process [%s]", process.name)
555
556
557 success = process.start()
558 if not success:
559 if node.machine.name:
560 printerrlog("launch of %s/%s on %s failed"%(node.package, node.type, node.machine.name))
561 else:
562 printerrlog("local launch of %s/%s failed"%(node.package, node.type))
563 else:
564 self.logger.info("... successfully launched [%s]", process.name)
565 return process, success
566
568 """
569 Check for running node process.
570 @param node Node: node object to check
571 @return bool: True if process associated with node is running (launched && !dead)
572 """
573
574 return node.process_name and self.pm.has_process(node.process_name)
575
577 """
578 Same as spin() but only does one cycle. must be run from the main thread.
579 """
580 if not self.pm:
581 return False
582 return self.pm.mainthread_spin_once()
583
585 """
586 spin() must be run from the main thread. spin() is very
587 important for roslaunch as it picks up jobs that the process
588 monitor need to be run in the main thread.
589 """
590 self.logger.info("spin")
591
592
593
594 if not self.pm or not self.pm.get_active_names():
595 printlog_bold("No processes to monitor")
596 self.stop()
597 return
598 self.pm.mainthread_spin()
599
600 self.logger.info("process monitor is done spinning, initiating full shutdown")
601 self.stop()
602 printlog_bold("done")
603
605 """
606 Stop the launch and all associated processes. not thread-safe.
607 """
608 self.logger.info("runner.stop()")
609 if self.pm is not None:
610 printlog("shutting down processing monitor...")
611 self.logger.info("shutting down processing monitor %s"%self.pm)
612 self.pm.shutdown()
613 self.pm.join()
614 self.pm = None
615 printlog("... shutting down processing monitor complete")
616 else:
617 self.logger.info("... roslaunch runner has already been stopped")
618
620 """
621 Setup the state of the ROS network, including the parameter
622 server state and core services
623 """
624
625 self.config.assign_machines()
626
627 if self.remote_runner:
628
629 self.remote_runner.add_process_listener(self.listeners)
630
631
632 launched = self._launch_master()
633 if launched:
634 self._launch_core_nodes()
635
636
637
638
639
640 self._launch_setup_executables()
641
642
643 if not self.is_child:
644 self._load_parameters()
645
647 """
648 Run the launch. Depending on usage, caller should call
649 spin_once or spin as appropriate after launch().
650 @return ([str], [str]): tuple containing list of nodes that
651 successfully launches and list of nodes that failed to launch
652 @rtype: ([str], [str])
653 @raise RLException: if launch fails (e.g. run_id parameter does
654 not match ID on parameter server)
655 """
656 try:
657 self._setup()
658 succeeded, failed = self._launch_nodes()
659 return succeeded, failed
660 except KeyboardInterrupt:
661 self.stop()
662 raise
663
665 """
666 Run the test node. Blocks until completion or timeout.
667 @param test: test node to run
668 @type test: Test
669 @raise RLTestTimeoutException: if test fails to launch or test times out
670 """
671 self.logger.info("... preparing to run test [%s] of type [%s/%s]", test.test_name, test.package, test.type)
672 proc_h, success = self.launch_node(test)
673 if not success:
674 raise RLException("test [%s] failed to launch"%test.test_name)
675
676
677 timeout_t = time.time() + test.time_limit
678 pm = self.pm
679 while pm.mainthread_spin_once() and self.is_node_running(test):
680
681 if time.time() > timeout_t:
682 raise RLTestTimeoutException(
683 "max time [%ss] allotted for test [%s] of type [%s/%s]" %
684 (test.time_limit, test.test_name, test.package, test.type))
685 time.sleep(0.1)
686
687
688
689
690
692 """
693 API for remote running
694 """
700 """
701 Listen to events about remote processes dying. Not
702 threadsafe. Must be called before processes started.
703 @param l: listener
704 @type l: L{ProcessListener}
705 """
706 pass
707
709 """
710 Contact each child to launch remote nodes
711 @return: succeeded, failed
712 @rtype: [str], [str]
713 """
714 pass
715