1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 Integrates roslaunch remote process launching capabilities.
37 """
38
39 import logging
40 import socket
41 import sys
42 import time
43
44 import roslib.network as network
45
46 import roslaunch.config
47 import roslaunch.remoteprocess
48 from roslaunch.remoteprocess import SSHChildROSLaunchProcess
49 import roslaunch.launch
50 import roslaunch.server
51 from roslaunch.core import RLException, setup_env, is_machine_local, printerrlog
52
53 _CHILD_REGISTER_TIMEOUT = 10.0
54
56 """
57 Manages the running of remote roslaunch children
58 """
59
60 - def __init__(self, run_id, rosconfig, pm, server):
61 """
62 @param run_id: roslaunch run_id of this runner
63 @type run_id: str
64 @param config: launch configuration
65 @type config: L{ROSConfig}
66 @param pm process monitor
67 @type pm: L{ProcessMonitor}
68 @param server: roslaunch parent server
69 @type server: L{ROSLaunchParentNode}
70 """
71 self.run_id = run_id
72 self.rosconfig = rosconfig
73 self.server = server
74 self.pm = pm
75 self.logger = logging.getLogger('roslaunch.remote')
76 self.listeners = []
77
78 self.machine_list = []
79 self.remote_processes = []
80
82 """
83 Listen to events about remote processes dying. Not
84 threadsafe. Must be called before processes started.
85 @param l: ProcessListener
86 @type l: L{ProcessListener}
87 """
88 self.listeners.append(l)
89
91
92
93 name = "%s-%s"%(machine.address, counter)
94
95 self.logger.info("remote[%s] starting roslaunch", name)
96 print "remote[%s] starting roslaunch"%name
97
98 env_dict = setup_env(None, machine, self.rosconfig.master.uri)
99 p = SSHChildROSLaunchProcess(self.run_id, name, server_node_uri, env_dict, machine)
100 success = p.start()
101 self.pm.register(p)
102 if not success:
103 raise RLException("unable to start remote roslaunch child: %s"%name)
104 self.server.add_child(name, p)
105 return p
106
108 """
109 Start the child roslaunch processes
110 """
111 server_node_uri = self.server.uri
112 if not server_node_uri:
113 raise RLException("server URI is not initialized")
114
115
116
117
118
119
120 machines = {}
121 for n in self.rosconfig.nodes:
122 if not is_machine_local(n.machine):
123 machines[n.machine.config_key()] = n.machine
124
125
126 counter = 0
127
128 procs = []
129 for m in machines:
130 p = self._start_child(server_node_uri, machines[m], counter)
131 procs.append(p)
132 counter += 1
133
134
135
136
137 start_t = time.time()
138 while True:
139 pending = []
140 for p in procs:
141 if not p.is_alive():
142 raise RLException("remote roslaunch failed to launch: %s"%p.machine.name)
143 elif not p.uri:
144 pending.append(p.machine)
145 if not pending:
146 break
147
148 timeout_t = start_t + min([m.timeout for m in pending])
149 if time.time() > timeout_t:
150 break
151 time.sleep(0.1)
152 if pending:
153 raise RLException(
154 """The following roslaunch remote processes failed to register:
155 %s
156
157 If this is a network latency issue, you may wish to consider setting
158 <machine timeout="NUMBER OF SECONDS" ... />
159 in your launch"""%'\n'.join([" * %s (timeout %ss)"%(m.name, m.timeout) for m in pending]))
160
161
162 self.machine_list = machines.values()
163
164 self.remote_processes = procs
165
166
168 """
169 Utility routine for logging/recording nodes that failed
170 @param nodes: list of nodes that are assumed to have failed
171 @type nodes: [L{Node}]
172 @param failed: list of names of nodes that have failed to extend
173 @type failed: [str]
174 """
175 str_nodes = ["%s/%s"%(n.package, n.type) for n in nodes]
176 failed.extend(str_nodes)
177 printerrlog("Launch of the following nodes most likely failed: %s"%', '.join(str_nodes))
178
180 """
181 Contact each child to launch remote nodes
182 """
183 succeeded = []
184 failed = []
185
186
187
188 self.remote_nodes = {}
189 for m in self.machine_list:
190 self.remote_nodes[m.config_key()] = []
191
192
193 nodes = [x for x in self.rosconfig.nodes if not is_machine_local(x.machine)]
194 for n in nodes:
195 self.remote_nodes[n.machine.config_key()].append(n)
196
197 for child in self.remote_processes:
198 nodes = self.remote_nodes[child.machine.config_key()]
199 body = '\n'.join([n.to_remote_xml() for n in nodes])
200
201 xml = '<?xml version="1.0" encoding="utf-8"?>\n<launch>\n%s</launch>'%body
202 if 0:
203 print xml
204
205 api = child.getapi()
206
207 try:
208 self.logger.debug("sending [%s] XML [\n%s\n]"%(child.uri, xml))
209 code, msg, val = api.launch(xml)
210 if code == 1:
211 c_succ, c_fail = val
212 succeeded.extend(c_succ)
213 failed.extend(c_fail)
214 else:
215 printerrlog('error launching on [%s, uri %s]: %s'%(child.name, child.uri, msg))
216 self._assume_failed(nodes, failed)
217 except socket.error, (errno, msg):
218 printerrlog('error launching on [%s, uri %s]: %s'%(child.name, child.uri, str(msg)))
219 self._assume_failed(nodes, failed)
220
221 except socket.gaierror, (errno, msg):
222
223 child_host, _ = network.parse_http_host_and_port(child.uri)
224 printerrlog("Unable to contact remote roslaunch at [%s]. This is most likely due to a network misconfiguration with host lookups. Please make sure that you can contact '%s' from this machine"%(child.uri, child_host))
225 self._assume_failed(nodes, failed)
226
227 except Exception, e:
228 printerrlog('error launching on [%s, uri %s]: %s'%(child.name, child.uri, str(e)))
229 self._assume_failed(nodes, failed)
230
231 return succeeded, failed
232