1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 Integrates roslaunch remote process launching capabilities.
37 """
38
39 import logging
40 import socket
41 import time
42
43 import rosgraph.network as network
44
45 import roslaunch.config
46 import roslaunch.remoteprocess
47 from roslaunch.remoteprocess import SSHChildROSLaunchProcess
48 import roslaunch.launch
49 import roslaunch.server
50 from roslaunch.core import RLException, is_machine_local, printerrlog, printlog
51
52 _CHILD_REGISTER_TIMEOUT = 10.0
53
55 """
56 Manages the running of remote roslaunch children
57 """
58
59 - def __init__(self, run_id, rosconfig, pm, server):
60 """
61 :param run_id: roslaunch run_id of this runner, ``str``
62 :param config: launch configuration, ``ROSConfig``
63 :param pm process monitor, ``ProcessMonitor``
64 :param server: roslaunch parent server, ``ROSLaunchParentNode``
65 """
66 self.run_id = run_id
67 self.rosconfig = rosconfig
68 self.server = server
69 self.pm = pm
70 self.logger = logging.getLogger('roslaunch.remote')
71 self.listeners = []
72
73 self.machine_list = []
74 self.remote_processes = []
75
77 """
78 Listen to events about remote processes dying. Not
79 threadsafe. Must be called before processes started.
80
81 :param l: ProcessListener
82 """
83 self.listeners.append(l)
84
100
102 """
103 Start the child roslaunch processes
104 """
105 server_node_uri = self.server.uri
106 if not server_node_uri:
107 raise RLException("server URI is not initialized")
108
109
110
111
112
113
114 machines = {}
115 for n in self.rosconfig.nodes:
116 if not is_machine_local(n.machine):
117 machines[n.machine.config_key()] = n.machine
118
119
120 counter = 0
121
122 procs = []
123 for m in machines:
124 p = self._start_child(server_node_uri, machines[m], counter)
125 procs.append(p)
126 counter += 1
127
128
129
130
131 start_t = time.time()
132 while True:
133 pending = []
134 for p in procs:
135 if not p.is_alive():
136 raise RLException("remote roslaunch failed to launch: %s"%p.machine.name)
137 elif not p.uri:
138 pending.append(p.machine)
139 if not pending:
140 break
141
142 timeout_t = start_t + min([m.timeout for m in pending])
143 if time.time() > timeout_t:
144 break
145 time.sleep(0.1)
146 if pending:
147 raise RLException(
148 """The following roslaunch remote processes failed to register:
149 %s
150
151 If this is a network latency issue, you may wish to consider setting
152 <machine timeout="NUMBER OF SECONDS" ... />
153 in your launch"""%'\n'.join([" * %s (timeout %ss)"%(m.name, m.timeout) for m in pending]))
154
155
156 self.machine_list = machines.values()
157
158 self.remote_processes = procs
159
160
162 """
163 Utility routine for logging/recording nodes that failed
164
165 :param nodes: list of nodes that are assumed to have failed, ``Node``
166 :param failed: list of names of nodes that have failed to extend, ``[str]``
167 """
168 str_nodes = ["%s/%s"%(n.package, n.type) for n in nodes]
169 failed.extend(str_nodes)
170 printerrlog("Launch of the following nodes most likely failed: %s"%', '.join(str_nodes))
171
173 """
174 Contact each child to launch remote nodes
175 """
176 succeeded = []
177 failed = []
178
179
180
181 self.remote_nodes = {}
182 for m in self.machine_list:
183 self.remote_nodes[m.config_key()] = []
184
185
186 nodes = [x for x in self.rosconfig.nodes if not is_machine_local(x.machine)]
187 for n in nodes:
188 self.remote_nodes[n.machine.config_key()].append(n)
189
190 for child in self.remote_processes:
191 nodes = self.remote_nodes[child.machine.config_key()]
192 body = '\n'.join([n.to_remote_xml() for n in nodes])
193
194 xml = '<?xml version="1.0" encoding="utf-8"?>\n<launch>\n%s</launch>'%body
195
196 api = child.getapi()
197
198 try:
199 self.logger.debug("sending [%s] XML [\n%s\n]"%(child.uri, xml))
200 code, msg, val = api.launch(xml)
201 if code == 1:
202 c_succ, c_fail = val
203 succeeded.extend(c_succ)
204 failed.extend(c_fail)
205 else:
206 printerrlog('error launching on [%s, uri %s]: %s'%(child.name, child.uri, msg))
207 self._assume_failed(nodes, failed)
208 except socket.error as e:
209 errno, msg = e
210 printerrlog('error launching on [%s, uri %s]: %s'%(child.name, child.uri, str(msg)))
211 self._assume_failed(nodes, failed)
212
213 except socket.gaierror as e:
214 errno, msg = e
215
216 child_host, _ = network.parse_http_host_and_port(child.uri)
217 printerrlog("Unable to contact remote roslaunch at [%s]. This is most likely due to a network misconfiguration with host lookups. Please make sure that you can contact '%s' from this machine"%(child.uri, child_host))
218 self._assume_failed(nodes, failed)
219
220 except Exception as e:
221 printerrlog('error launching on [%s, uri %s]: %s'%(child.name, child.uri, str(e)))
222 self._assume_failed(nodes, failed)
223
224 return succeeded, failed
225