1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 from __future__ import print_function
36
37 """
38 Top-level implementation of launching processes. Coordinates
39 lower-level libraries.
40 """
41
42 import os
43 import logging
44 import subprocess
45 import sys
46 import time
47 import traceback
48
49 import rosgraph
50 import rosgraph.names
51 import rosgraph.network
52
53 from roslaunch.core import *
54
55 from roslaunch.nodeprocess import create_master_process, create_node_process
56 from roslaunch.pmon import start_process_monitor, ProcessListener
57
58 from roslaunch.rlutil import update_terminal_name
59
60 from rosmaster.master_api import NUM_WORKERS
61
62 _TIMEOUT_MASTER_START = 10.0
63 _TIMEOUT_MASTER_STOP = 10.0
64
65 _ID = '/roslaunch'
66
68
70 """
71 Validate the configuration of a master we are about to launch. Ths
72 validation already assumes that no existing master is running at
73 this configuration and merely checks configuration for a new
74 launch.
75 """
76
77
78
79
80
81
82
83 if not rosgraph.network.is_local_address(m.get_host()):
84
85
86 if is_core:
87
88
89
90 try:
91 reverse_ips = [host[4][0] for host in socket.getaddrinfo(m.get_host(), 0, 0, 0, socket.SOL_TCP)]
92 local_addrs = rosgraph.network.get_local_addresses()
93 printerrlog("""WARNING: IP addresses %s for local hostname '%s' do not appear to match
94 any local IP address (%s). Your ROS nodes may fail to communicate.
95
96 Please use ROS_IP to set the correct IP address to use."""%(','.join(reverse_ips), m.get_host(), ','.join(local_addrs)))
97 except:
98 printerrlog("""WARNING: local hostname '%s' does not map to an IP address.
99 Your ROS nodes may fail to communicate.
100
101 Please use ROS_IP to set the correct IP address to use."""%(m.get_host()))
102
103 else:
104
105 raise RLException("ERROR: unable to contact ROS master at [%s]"%(m.uri))
106
107 if is_core and not is_rostest:
108
109
110 env_uri = rosgraph.get_master_uri()
111 env_host, env_port = rosgraph.network.parse_http_host_and_port(env_uri)
112
113 if not rosgraph.network.is_local_address(env_host):
114
115 printerrlog("WARNING: ROS_MASTER_URI [%s] host is not set to this machine"%(env_uri))
116 elif env_port != m.get_port():
117
118 printerrlog("WARNING: ROS_MASTER_URI port [%s] does not match this roscore [%s]"%(env_port, m.get_port()))
119
121 splits = [s for s in p.split(rosgraph.names.SEP) if s]
122 curr =rosgraph.names.SEP
123 parents = [curr]
124 for s in splits[:-1]:
125 next_ = curr + s + rosgraph.names.SEP
126 parents.append(next_)
127 curr = next_
128 return parents
129
131 """
132 Reduce clear_params such that only the highest-level namespaces
133 are represented for overlapping namespaces. e.g. if both /foo/ and
134 /foo/bar are present, return just /foo/.
135
136 @param params: paremter namees
137 @type params: [str]
138 @return: unified parameters
139 @rtype: [str]
140 """
141
142
143
144 canon_params = []
145 for p in params:
146 if not p[-1] == rosgraph.names.SEP:
147 p += rosgraph.names.SEP
148 if not p in canon_params:
149 canon_params.append(p)
150
151 clear_params = canon_params[:]
152 for p in canon_params:
153 for parent in _all_namespace_parents(p):
154 if parent in canon_params and p in clear_params and p != '/':
155 clear_params.remove(p)
156 return clear_params
157
159 """
160 Utility function to strip illegal characters from hostname. This
161 is implemented to be correct, not efficient."""
162
163
164 fixed = 'host_'
165
166 hostname = hostname.lower()
167 for c in hostname:
168
169 if (c >= 'a' and c <= 'z') or \
170 (c >= '0' and c <= '9'):
171 fixed+=c
172 else:
173 fixed+='_'
174 return fixed
175
177 """
178 Helper class to manage distributing process events. It functions as
179 a higher level aggregator of events. In particular, events about
180 remote and local processes require different mechanisms for being
181 caught and reported.
182 """
184 self.process_listeners = []
185
187 """
188 Add listener to list of listeners. Not threadsafe.
189 @param l: listener
190 @type l: L{ProcessListener}
191 """
192 self.process_listeners.append(l)
193
195 """
196 ProcessListener callback
197 """
198 for l in self.process_listeners:
199 try:
200 l.process_died(process_name, exit_code)
201 except Exception as e:
202 logging.getLogger('roslaunch').error(traceback.format_exc())
203
205 """
206 Listener interface for events related to ROSLaunch.
207 ROSLaunchListener is currently identical to the lower-level
208 L{roslaunch.pmon.ProcessListener} interface, but is separate for
209 architectural reasons. The lower-level
210 L{roslaunch.pmon.ProcessMonitor} does not provide events about
211 remotely running processes.
212 """
213
215 """
216 Notifies listener that process has died. This callback only
217 occurs for processes that die during normal process monitor
218 execution -- processes that are forcibly killed during
219 L{roslaunch.pmon.ProcessMonitor} shutdown are not reported.
220 @param process_name: name of process
221 @type process_name: str
222 @param exit_code int: exit code of process. If None, it means
223 that L{roslaunch.pmon.ProcessMonitor} was unable to determine an exit code.
224 @type exit_code: int
225 """
226 pass
227
229 """
230 Runs a roslaunch. The normal sequence of API calls is L{launch()}
231 followed by L{spin()}. An external thread can call L{stop()}; otherwise
232 the runner will block until an exit signal. Another usage is to
233 call L{launch()} followed by repeated calls to L{spin_once()}. This usage
234 allows the main thread to continue to do work while processes are
235 monitored.
236 """
237
238 - def __init__(self, run_id, config, server_uri=None, pmon=None, is_core=False, remote_runner=None, is_child=False, is_rostest=False, num_workers=NUM_WORKERS, timeout=None):
239 """
240 @param run_id: /run_id for this launch. If the core is not
241 running, this value will be used to initialize /run_id. If
242 the core is already running, this value will be checked
243 against the value stored on the core. L{ROSLaunchRunner} will
244 fail during L{launch()} if they do not match.
245 @type run_id: str
246 @param config: roslauch instance to run
247 @type config: L{ROSLaunchConfig}
248 @param server_uri: XML-RPC URI of roslaunch server.
249 @type server_uri: str
250 @param pmon: optionally override the process
251 monitor the runner uses for starting and tracking processes
252 @type pmon: L{ProcessMonitor}
253
254 @param is_core: if True, this runner is a roscore
255 instance. This affects the error behavior if a master is
256 already running -- aborts if is_core is True and a core is
257 detected.
258 @type is_core: bool
259 @param remote_runner: remote roslaunch process runner
260 @param is_rostest: if True, this runner is a rostest
261 instance. This affects certain validation checks.
262 @type is_rostest: bool
263 @param num_workers: If this is the core, the number of worker-threads to use.
264 @type num_workers: int
265 @param timeout: If this is the core, the socket-timeout to use.
266 @type timeout: Float or None
267 """
268 if run_id is None:
269 raise RLException("run_id is None")
270 self.run_id = run_id
271
272
273
274
275
276
277 self.config = config
278 self.server_uri = server_uri
279 self.is_child = is_child
280 self.is_core = is_core
281 self.is_rostest = is_rostest
282 self.num_workers = num_workers
283 self.timeout = timeout
284 self.logger = logging.getLogger('roslaunch')
285 self.pm = pmon or start_process_monitor()
286
287
288
289
290 self.listeners = _ROSLaunchListeners()
291 if self.pm is None:
292 raise RLException("unable to initialize roslaunch process monitor")
293 if self.pm.is_shutdown:
294 raise RLException("bad roslaunch process monitor initialization: process monitor is already dead")
295
296 self.pm.add_process_listener(self.listeners)
297
298 self.remote_runner = remote_runner
299
301 """
302 Add listener to list of listeners. Not threadsafe. Must be
303 called before processes started.
304 @param l: listener
305 @type l: L{ProcessListener}
306 """
307 self.listeners.add_process_listener(l)
308
310 """
311 Load parameters onto the parameter server
312 """
313 self.logger.info("load_parameters starting ...")
314 config = self.config
315 param_server = config.master.get()
316 p = None
317 try:
318
319 param_server_multi = config.master.get_multi()
320
321
322
323 for p in _unify_clear_params(config.clear_params):
324 if param_server.hasParam(_ID, p)[2]:
325
326 param_server_multi.deleteParam(_ID, p)
327 r = param_server_multi()
328 for code, msg, _ in r:
329 if code != 1:
330 raise RLException("Failed to clear parameter: %s"%(msg))
331
332
333 param_server_multi = config.master.get_multi()
334 for p in config.params.values():
335
336
337 param_server_multi.setParam(_ID, p.key, p.value)
338 r = param_server_multi()
339 for code, msg, _ in r:
340 if code != 1:
341 raise RLException("Failed to set parameter: %s"%(msg))
342 except RLException:
343 raise
344 except Exception as e:
345 printerrlog("load_parameters: unable to set parameters (last param was [%s]): %s"%(p,e))
346 raise
347 self.logger.info("... load_parameters complete")
348
350 """
351 Launch all the declared nodes/master
352 @return: two lists of node names where the first
353 is the nodes that successfully launched and the second is the
354 nodes that failed to launch.
355 @rtype: [[str], [str]]
356 """
357 config = self.config
358 succeeded = []
359 failed = []
360 self.logger.info("launch_nodes: launching local nodes ...")
361 local_nodes = config.nodes
362
363
364 local_nodes = [n for n in config.nodes if is_machine_local(n.machine)]
365
366 for node in local_nodes:
367 proc, success = self.launch_node(node)
368 if success:
369 succeeded.append(str(proc))
370 else:
371 failed.append(str(proc))
372
373 if self.remote_runner:
374 self.logger.info("launch_nodes: launching remote nodes ...")
375 r_succ, r_fail = self.remote_runner.launch_remote_nodes()
376 succeeded.extend(r_succ)
377 failed.extend(r_fail)
378
379 self.logger.info("... launch_nodes complete")
380 return succeeded, failed
381
383 """
384 Launches master if requested.
385 @raise RLException: if master launch fails
386 """
387 m = self.config.master
388 is_running = m.is_running()
389
390 if self.is_core and is_running:
391 raise RLException("roscore cannot run as another roscore/master is already running. \nPlease kill other roscore/master processes before relaunching.\nThe ROS_MASTER_URI is %s"%(m.uri))
392
393 if not is_running:
394 validate_master_launch(m, self.is_core, self.is_rostest)
395
396 printlog("auto-starting new master")
397 p = create_master_process(self.run_id, m.type, get_ros_root(), m.get_port(), self.num_workers, self.timeout)
398 self.pm.register_core_proc(p)
399 success = p.start()
400 if not success:
401 raise RLException("ERROR: unable to auto-start master process")
402 timeout_t = time.time() + _TIMEOUT_MASTER_START
403 while not m.is_running() and time.time() < timeout_t:
404 time.sleep(0.1)
405
406 if not m.is_running():
407 raise RLException("ERROR: could not contact master [%s]"%m.uri)
408
409 printlog_bold("ROS_MASTER_URI=%s"%m.uri)
410
411
412
413 update_terminal_name(m.uri)
414
415
416 param_server = m.get()
417
418
419 self._check_and_set_run_id(param_server, self.run_id)
420
421 if self.server_uri:
422
423
424 hostname, port = rosgraph.network.parse_http_host_and_port(self.server_uri)
425 hostname = _hostname_to_rosname(hostname)
426 self.logger.info("setting /roslaunch/uris/%s__%s' to %s"%(hostname, port, self.server_uri))
427 param_server.setParam(_ID, '/roslaunch/uris/%s__%s'%(hostname, port),self.server_uri)
428
430 """
431 Initialize self.run_id to existing value or setup parameter
432 server with /run_id set to default_run_id
433 @param default_run_id: run_id to use if value is not set
434 @type default_run_id: str
435 @param param_server: parameter server proxy
436 @type param_server: xmlrpclib.ServerProxy
437 """
438 code, _, val = param_server.hasParam(_ID, '/run_id')
439 if code == 1 and not val:
440 printlog_bold("setting /run_id to %s"%run_id)
441 param_server.setParam('/roslaunch', '/run_id', run_id)
442 else:
443
444 code, _, val = param_server.getParam('/roslaunch', '/run_id')
445 if code != 1:
446
447
448 raise RLException("ERROR: unable to retrieve /run_id from parameter server")
449 if run_id != val:
450 raise RLException("run_id on parameter server does not match declared run_id: %s vs %s"%(val, run_id))
451
452
453
455 """
456 Launch a single L{Executable} object. Blocks until executable finishes.
457 @param e: Executable
458 @type e: L{Executable}
459 @raise RLException: if exectuable fails. Failure includes non-zero exit code.
460 """
461 try:
462
463 cmd = e.command
464 cmd = "%s %s"%(cmd, ' '.join(e.args))
465 print("running %s" % cmd)
466 local_machine = self.config.machines['']
467 env = setup_env(None, local_machine, self.config.master.uri)
468 retcode = subprocess.call(cmd, shell=True, env=env)
469 if retcode < 0:
470 raise RLException("command [%s] failed with exit code %s"%(cmd, retcode))
471 except OSError as e:
472 raise RLException("command [%s] failed: %s"%(cmd, e))
473
474
475
477 """
478 @raise RLException: if exectuable fails. Failure includes non-zero exit code.
479 """
480 exes = [e for e in self.config.executables if e.phase == PHASE_SETUP]
481 for e in exes:
482 self._launch_executable(e)
483
485 """
486 launch any core services that are not already running. master must
487 be already running
488 @raise RLException: if core launches fail
489 """
490 config = self.config
491 master = config.master.get()
492 tolaunch = []
493 for node in config.nodes_core:
494 node_name = rosgraph.names.ns_join(node.namespace, node.name)
495 code, msg, _ = master.lookupNode(_ID, node_name)
496 if code == -1:
497 tolaunch.append(node)
498 elif code == 1:
499 print("core service [%s] found" % node_name)
500 else:
501 print("WARN: master is not behaving well (unexpected return value when looking up node)", file=sys.stderr)
502 self.logger.error("ERROR: master return [%s][%s] on lookupNode call"%(code,msg))
503
504 for node in tolaunch:
505 node_name = rosgraph.names.ns_join(node.namespace, node.name)
506 name, success = self.launch_node(node, core=True)
507 if success:
508 print("started core service [%s]" % node_name)
509 else:
510 raise RLException("failed to start core service [%s]"%node_name)
511
513 """
514 Launch a single node locally. Remote launching is handled separately by the remote module.
515 If node name is not assigned, one will be created for it.
516
517 @param node Node: node to launch
518 @param core bool: if True, core node
519 @return obj, bool: Process handle, successful launch. If success, return actual Process instance. Otherwise return name.
520 """
521 self.logger.info("... preparing to launch node of type [%s/%s]", node.package, node.type)
522
523
524
525
526 if node.machine is None:
527 node.machine = self.config.machines['']
528 if node.name is None:
529 node.name = rosgraph.names.anonymous_name(node.type)
530
531 master = self.config.master
532 import roslaunch.node_args
533 try:
534 process = create_node_process(self.run_id, node, master.uri)
535 except roslaunch.node_args.NodeParamsException as e:
536 self.logger.error(e)
537 if node.package == 'rosout' and node.type == 'rosout':
538 printerrlog("\n\n\nERROR: rosout is not built. Please run 'rosmake rosout'\n\n\n")
539 else:
540 printerrlog("ERROR: cannot launch node of type [%s/%s]: %s"%(node.package, node.type, str(e)))
541 if node.name:
542 return node.name, False
543 else:
544 return "%s/%s"%(node.package,node.type), False
545
546 self.logger.info("... created process [%s]", process.name)
547 if core:
548 self.pm.register_core_proc(process)
549 else:
550 self.pm.register(process)
551 node.process_name = process.name
552 self.logger.info("... registered process [%s]", process.name)
553
554
555 success = process.start()
556 if not success:
557 if node.machine.name:
558 printerrlog("launch of %s/%s on %s failed"%(node.package, node.type, node.machine.name))
559 else:
560 printerrlog("local launch of %s/%s failed"%(node.package, node.type))
561 else:
562 self.logger.info("... successfully launched [%s]", process.name)
563 return process, success
564
566 """
567 Check for running node process.
568 @param node Node: node object to check
569 @return bool: True if process associated with node is running (launched && !dead)
570 """
571
572 return node.process_name and self.pm.has_process(node.process_name)
573
575 """
576 Same as spin() but only does one cycle. must be run from the main thread.
577 """
578 if not self.pm:
579 return False
580 return self.pm.mainthread_spin_once()
581
583 """
584 spin() must be run from the main thread. spin() is very
585 important for roslaunch as it picks up jobs that the process
586 monitor need to be run in the main thread.
587 """
588 self.logger.info("spin")
589
590
591
592 if not self.pm or not self.pm.get_active_names():
593 printlog_bold("No processes to monitor")
594 self.stop()
595 return
596 self.pm.mainthread_spin()
597
598 self.logger.info("process monitor is done spinning, initiating full shutdown")
599 self.stop()
600 printlog_bold("done")
601
603 """
604 Stop the launch and all associated processes. not thread-safe.
605 """
606 self.logger.info("runner.stop()")
607 if self.pm is not None:
608 printlog("shutting down processing monitor...")
609 self.logger.info("shutting down processing monitor %s"%self.pm)
610 self.pm.shutdown()
611 self.pm.join()
612 self.pm = None
613 printlog("... shutting down processing monitor complete")
614 else:
615 self.logger.info("... roslaunch runner has already been stopped")
616
618 """
619 Setup the state of the ROS network, including the parameter
620 server state and core services
621 """
622
623 self.config.assign_machines()
624
625 if self.remote_runner:
626
627 self.remote_runner.add_process_listener(self.listeners)
628
629
630 self._launch_master()
631 self._launch_core_nodes()
632
633
634
635
636
637 self._launch_setup_executables()
638
639
640 if not self.is_child:
641 self._load_parameters()
642
644 """
645 Run the launch. Depending on usage, caller should call
646 spin_once or spin as appropriate after launch().
647 @return ([str], [str]): tuple containing list of nodes that
648 successfully launches and list of nodes that failed to launch
649 @rtype: ([str], [str])
650 @raise RLException: if launch fails (e.g. run_id parameter does
651 not match ID on parameter server)
652 """
653 try:
654 self._setup()
655 succeeded, failed = self._launch_nodes()
656 return succeeded, failed
657 except KeyboardInterrupt:
658 self.stop()
659 raise
660
662 """
663 Run the test node. Blocks until completion or timeout.
664 @param test: test node to run
665 @type test: Test
666 @raise RLTestTimeoutException: if test fails to launch or test times out
667 """
668 self.logger.info("... preparing to run test [%s] of type [%s/%s]", test.test_name, test.package, test.type)
669 proc_h, success = self.launch_node(test)
670 if not success:
671 raise RLException("test [%s] failed to launch"%test.test_name)
672
673
674 timeout_t = time.time() + test.time_limit
675 pm = self.pm
676 while pm.mainthread_spin_once() and self.is_node_running(test):
677
678 if time.time() > timeout_t:
679 raise RLTestTimeoutException(
680 "max time [%ss] allotted for test [%s] of type [%s/%s]" %
681 (test.time_limit, test.test_name, test.package, test.type))
682 time.sleep(0.1)
683
684
685
686
687
689 """
690 API for remote running
691 """
697 """
698 Listen to events about remote processes dying. Not
699 threadsafe. Must be called before processes started.
700 @param l: listener
701 @type l: L{ProcessListener}
702 """
703 pass
704
706 """
707 Contact each child to launch remote nodes
708 @return: succeeded, failed
709 @rtype: [str], [str]
710 """
711 pass
712