1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 Integrates roslaunch remote process launching capabilities.
37 """
38
39 import logging
40 import socket
41 import sys
42 import time
43
44 import roslib.network as network
45
46 import roslaunch.config
47 import roslaunch.remoteprocess
48 from roslaunch.remoteprocess import SSHChildROSLaunchProcess
49 import roslaunch.runner
50 import roslaunch.server
51 from roslaunch.core import RLException, setup_env, is_machine_local, printerrlog
52
53
54 from rosgraph_msgs.msg import Log
55
56 _CHILD_REGISTER_TIMEOUT = 10.0
57
58
60 """
61 Manages the running of remote roslaunch children
62 """
63
64
def __init__(self, run_id, rosconfig, pm, server):
    """
    Store the collaborators of the runner and reset its bookkeeping.

    @param run_id: roslaunch run_id of this runner
    @type  run_id: str
    @param rosconfig: launch configuration
    @type  rosconfig: L{ROSConfig}
    @param pm: process monitor
    @type  pm: L{ProcessMonitor}
    @param server: roslaunch parent server
    @type  server: L{ROSLaunchParentNode}
    """
    self.run_id = run_id
    self.rosconfig = rosconfig
    self.server = server
    self.pm = pm
    self.logger = logging.getLogger('roslaunch.remote')
    self.listeners = []

    # machines that received a child roslaunch; filled in once the
    # children have been started
    self.machine_list = []
    # handles of the child roslaunch processes
    self.remote_processes = []
    # machine config key -> list of nodes assigned to that machine.
    # Declared here so the attribute always exists; it is rebuilt each
    # time remote nodes are launched.
    self.remote_nodes = {}
85
# NOTE(review): the def line is missing from the source under review; the
# method name follows the upstream roslaunch API -- confirm before merging.
def add_process_listener(self, l):
    """
    Register a listener that is told about remote processes dying.

    This is not threadsafe, and it must happen before any process
    is started.

    @param l: listener to register
    @type  l: L{ProcessListener}
    """
    self.listeners.append(l)
94
96
97
def _start_child(self, server_node_uri, machine, counter):
    """
    Start one child roslaunch for a remote machine and hook it into
    the process monitor and the parent server.

    @param server_node_uri: URI of the parent roslaunch server, handed
        to the child process
    @param machine: machine the child is started for
    @param counter: number that is combined with the machine address
        to form the name of the child
    @return: the child process that was started
    @rtype: L{SSHChildROSLaunchProcess}
    @raise RLException: if the child process reports that it could
        not be started
    """
    child_name = "%s-%s" % (machine.address, counter)

    # announce the launch in the log as well as on the console
    self.logger.info("remote[%s] starting roslaunch", child_name)
    print("remote[%s] starting roslaunch" % child_name)

    child_env = setup_env(None, machine, self.rosconfig.master.uri)
    child = SSHChildROSLaunchProcess(
        self.run_id, child_name, server_node_uri, child_env, machine)
    started = child.start()
    # the process is registered with the monitor whether or not the
    # start succeeded; only afterwards is a failure reported
    self.pm.register(child)
    if not started:
        raise RLException(
            "unable to start remote roslaunch child: %s" % child_name)
    self.server.add_child(child_name, child)
    return child
111
# NOTE(review): the def line is missing from the source under review; the
# method name follows the upstream roslaunch API -- confirm before merging.
def start_children(self):
    """
    Start the child roslaunch processes

    One child is started for every distinct config key among the
    non-local machines of the configured nodes. The call then polls
    until every child has a URI. It gives up as soon as the smallest
    timeout among the still-pending machines has elapsed, measured
    from the moment polling began.

    @raise RLException: if the parent server has no URI, if a child
        is no longer alive while being waited on, or if children are
        still pending when the deadline passes
    """
    parent_uri = self.server.uri
    if not parent_uri:
        raise RLException("server URI is not initialized")

    # collapse the non-local machines by config key, so that nodes
    # sharing a machine configuration also share one child
    remote_machines = {}
    for node in self.rosconfig.nodes:
        if is_machine_local(node.machine):
            continue
        remote_machines[node.machine.config_key()] = node.machine

    # the enumeration index keeps the child names unique
    children = [
        self._start_child(parent_uri, remote_machines[key], index)
        for index, key in enumerate(remote_machines)
    ]

    # poll until every child has reported a URI
    polling_began = time.time()
    while True:
        waiting_on = []
        for child in children:
            if not child.is_alive():
                raise RLException(
                    "remote roslaunch failed to launch: %s"
                    % child.machine.name)
            if not child.uri:
                waiting_on.append(child.machine)
        if not waiting_on:
            break

        # the shortest timeout among the pending machines decides
        deadline = polling_began + min(m.timeout for m in waiting_on)
        if time.time() > deadline:
            stragglers = '\n'.join(
                [" * %s (timeout %ss)" % (m.name, m.timeout)
                 for m in waiting_on])
            raise RLException(
                "The following roslaunch remote processes failed to register:\n"
                "%s\n"
                "\n"
                "If this is a network latency issue, you may wish to consider setting\n"
                '<machine timeout="NUMBER OF SECONDS" ... />\n'
                "in your launch" % stragglers)
        time.sleep(0.1)

    # only a fully successful start is recorded on the instance
    self.machine_list = list(remote_machines.values())
    self.remote_processes = children
170
171
def _assume_failed(self, nodes, failed):
    """
    Record a batch of nodes as failed and report them on the error log.

    @param nodes: nodes that are assumed to have failed
    @type  nodes: [L{Node}]
    @param failed: running list of failed node names; it is extended
        in place with one "package/type" entry per node
    @type  failed: [str]
    """
    labels = []
    for node in nodes:
        labels.append("%s/%s" % (node.package, node.type))
    failed.extend(labels)
    summary = ', '.join(labels)
    printerrlog(
        "Launch of the following nodes most likely failed: %s" % summary)
183
# NOTE(review): the def line is missing from the source under review; the
# method name follows the upstream roslaunch API -- confirm before merging.
def launch_remote_nodes(self):
    """
    Contact each child to launch remote nodes

    The non-local nodes of the configuration are grouped by the config
    key of their machine. For every child process, the nodes of its
    machine are serialized into a <launch> document and handed to the
    child through its API. A child that cannot be reached, or that
    answers with an error code, has all of its nodes recorded as
    failed; the remaining children are still contacted.

    @return: (succeeded, failed) lists accumulated from the replies of
        the children, plus "package/type" entries in failed for nodes
        of children that could not be launched on
    @rtype: ([], [])
    """
    succeeded = []
    failed = []

    # one bucket of nodes per machine that has a child roslaunch
    self.remote_nodes = {}
    for m in self.machine_list:
        self.remote_nodes[m.config_key()] = []

    # sort every non-local node into the bucket of its machine
    for n in self.rosconfig.nodes:
        if not is_machine_local(n.machine):
            self.remote_nodes[n.machine.config_key()].append(n)

    for child in self.remote_processes:
        nodes = self.remote_nodes[child.machine.config_key()]
        body = '\n'.join([n.to_remote_xml() for n in nodes])
        xml = '<launch>\n%s</launch>' % body

        api = child.getapi()
        try:
            self.logger.debug("sending [%s] XML [\n%s\n]", child.uri, xml)
            code, msg, val = api.launch(xml)
            if code == 1:
                c_succ, c_fail = val
                succeeded.extend(c_succ)
                failed.extend(c_fail)
            else:
                printerrlog('error launching on [%s, uri %s]: %s'
                            % (child.name, child.uri, msg))
                self._assume_failed(nodes, failed)
        except socket.gaierror:
            # gaierror is a subclass of socket.error, so it has to be
            # handled first or this more helpful message is never shown
            child_host, _ = network.parse_http_host_and_port(child.uri)
            printerrlog(
                "Unable to contact remote roslaunch at [%s]. This is most "
                "likely due to a network misconfiguration with host "
                "lookups. Please make sure that you can contact '%s' from "
                "this machine" % (child.uri, child_host))
            self._assume_failed(nodes, failed)
        except socket.error as e:
            # report only the message of an (errno, message) error, but
            # do not assume that shape: some socket errors (timeouts,
            # for one) carry a single argument
            reason = e.args[1] if len(e.args) == 2 else e
            printerrlog('error launching on [%s, uri %s]: %s'
                        % (child.name, child.uri, str(reason)))
            self._assume_failed(nodes, failed)
        except Exception as e:
            # deliberate catch-all: one broken child must not keep the
            # remaining children from being contacted
            printerrlog('error launching on [%s, uri %s]: %s'
                        % (child.name, child.uri, str(e)))
            self._assume_failed(nodes, failed)

    return succeeded, failed
236