1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 Integrates roslaunch remote process launching capabilities.
37 """
38
39 import logging
40 import socket
41 import sys
42 import time
43
44 import roslib.network as network
45
46 import roslaunch.config
47 import roslaunch.remoteprocess
48 from roslaunch.remoteprocess import SSHChildROSLaunchProcess
49 import roslaunch.launch
50 import roslaunch.server
51 from roslaunch.core import RLException, setup_env, is_machine_local, printerrlog
52
53 _CHILD_REGISTER_TIMEOUT = 10.0
54
56 """
57 Manages the running of remote roslaunch children
58 """
59
60 - def __init__(self, run_id, rosconfig, pm, server):
61 """
62 @param run_id: roslaunch run_id of this runner
63 @type run_id: str
64 @param config: launch configuration
65 @type config: L{ROSConfig}
66 @param pm process monitor
67 @type pm: L{ProcessMonitor}
68 @param server: roslaunch parent server
69 @type server: L{ROSLaunchParentNode}
70 """
71 self.run_id = run_id
72 self.rosconfig = rosconfig
73 self.server = server
74 self.pm = pm
75 self.logger = logging.getLogger('roslaunch.remote')
76 self.listeners = []
77
78 self.machine_list = []
79 self.remote_processes = []
80
82 """
83 Listen to events about remote processes dying. Not
84 threadsafe. Must be called before processes started.
85 @param l: ProcessListener
86 @type l: L{ProcessListener}
87 """
88 self.listeners.append(l)
89
106
108 """
109 Start the child roslaunch processes
110 """
111 server_node_uri = self.server.uri
112 if not server_node_uri:
113 raise RLException("server URI is not initialized")
114
115
116
117
118
119
120 machines = {}
121 for n in self.rosconfig.nodes:
122 if not is_machine_local(n.machine):
123 machines[n.machine.config_key()] = n.machine
124
125
126 counter = 0
127
128 procs = []
129 for m in machines:
130 p = self._start_child(server_node_uri, machines[m], counter)
131 procs.append(p)
132 counter += 1
133
134
135
136
137 start_t = time.time()
138 while True:
139 pending = []
140 for p in procs:
141 if not p.is_alive():
142 raise RLException("remote roslaunch failed to launch: %s"%p.machine.name)
143 elif not p.uri:
144 pending.append(p.machine)
145 if not pending:
146 break
147
148 timeout_t = start_t + min([m.timeout for m in pending])
149 if time.time() > timeout_t:
150 break
151 time.sleep(0.1)
152 if pending:
153 raise RLException(
154 """The following roslaunch remote processes failed to register:
155 %s
156
157 If this is a network latency issue, you may wish to consider setting
158 <machine timeout="NUMBER OF SECONDS" ... />
159 in your launch"""%'\n'.join([" * %s (timeout %ss)"%(m.name, m.timeout) for m in pending]))
160
161
162 self.machine_list = machines.values()
163
164 self.remote_processes = procs
165
166
168 """
169 Utility routine for logging/recording nodes that failed
170 @param nodes: list of nodes that are assumed to have failed
171 @type nodes: [L{Node}]
172 @param failed: list of names of nodes that have failed to extend
173 @type failed: [str]
174 """
175 str_nodes = ["%s/%s"%(n.package, n.type) for n in nodes]
176 failed.extend(str_nodes)
177 printerrlog("Launch of the following nodes most likely failed: %s"%', '.join(str_nodes))
178
180 """
181 Contact each child to launch remote nodes
182 """
183 succeeded = []
184 failed = []
185
186
187
188 self.remote_nodes = {}
189 for m in self.machine_list:
190 self.remote_nodes[m.config_key()] = []
191
192
193 nodes = [x for x in self.rosconfig.nodes if not is_machine_local(x.machine)]
194 for n in nodes:
195 self.remote_nodes[n.machine.config_key()].append(n)
196
197 for child in self.remote_processes:
198 nodes = self.remote_nodes[child.machine.config_key()]
199 body = '\n'.join([n.to_remote_xml() for n in nodes])
200 xml = '<launch>\n%s</launch>'%body
201 if 0:
202 print xml
203
204 api = child.getapi()
205
206 try:
207 self.logger.debug("sending [%s] XML [\n%s\n]"%(child.uri, xml))
208 code, msg, val = api.launch(xml)
209 if code == 1:
210 c_succ, c_fail = val
211 succeeded.extend(c_succ)
212 failed.extend(c_fail)
213 else:
214 printerrlog('error launching on [%s, uri %s]: %s'%(child.name, child.uri, msg))
215 self._assume_failed(nodes, failed)
216 except socket.error, (errno, msg):
217 printerrlog('error launching on [%s, uri %s]: %s'%(child.name, child.uri, str(msg)))
218 self._assume_failed(nodes, failed)
219
220 except socket.gaierror, (errno, msg):
221
222 child_host, _ = network.parse_http_host_and_port(child.uri)
223 printerrlog("Unable to contact remote roslaunch at [%s]. This is most likely due to a network misconfiguration with host lookups. Please make sure that you can contact '%s' from this machine"%(child.uri, child_host))
224 self._assume_failed(nodes, failed)
225
226 except Exception, e:
227 printerrlog('error launching on [%s, uri %s]: %s'%(child.name, child.uri, str(e)))
228 self._assume_failed(nodes, failed)
229
230 return succeeded, failed
231