1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 Integrates roslaunch remote process launching capabilities.
37 """
38
39 import logging
40 import socket
41 import time
42
43 import rosgraph.network as network
44
45 import roslaunch.config
46 import roslaunch.remoteprocess
47 from roslaunch.remoteprocess import SSHChildROSLaunchProcess
48 import roslaunch.launch
49 import roslaunch.server
50 from roslaunch.core import RLException, is_machine_local, printerrlog, printlog
51 from roslaunch.nodeprocess import DEFAULT_TIMEOUT_SIGINT, DEFAULT_TIMEOUT_SIGTERM
52
53 _CHILD_REGISTER_TIMEOUT = 10.0
54
56 """
57 Manages the running of remote roslaunch children
58 """
59
61 """
62 :param run_id: roslaunch run_id of this runner, ``str``
63 :param config: launch configuration, ``ROSConfig``
64 :param pm process monitor, ``ProcessMonitor``
65 :param server: roslaunch parent server, ``ROSLaunchParentNode``
66 :param sigint_timeout: The SIGINT timeout used when killing nodes (in seconds).
67 :type sigint_timeout: float
68 :param sigterm_timeout: The SIGTERM timeout used when killing nodes if SIGINT does not stop the node (in seconds).
69 :type sigterm_timeout: float
70 """
71 self.run_id = run_id
72 self.rosconfig = rosconfig
73 self.server = server
74 self.pm = pm
75 self.sigint_timeout = sigint_timeout
76 self.sigterm_timeout = sigterm_timeout
77 self.logger = logging.getLogger('roslaunch.remote')
78 self.listeners = []
79
80 self.machine_list = []
81 self.remote_processes = []
82
84 """
85 Listen to events about remote processes dying. Not
86 threadsafe. Must be called before processes started.
87
88 :param l: ProcessListener
89 """
90 self.listeners.append(l)
91
108
110 """
111 Start the child roslaunch processes
112 """
113 server_node_uri = self.server.uri
114 if not server_node_uri:
115 raise RLException("server URI is not initialized")
116
117
118
119
120
121
122 machines = {}
123 for n in self.rosconfig.nodes:
124 if not is_machine_local(n.machine):
125 machines[n.machine.config_key()] = n.machine
126
127
128 counter = 0
129
130 procs = []
131 for m in machines:
132 p = self._start_child(server_node_uri, machines[m], counter)
133 procs.append(p)
134 counter += 1
135
136
137
138
139 start_t = time.time()
140 while True:
141 pending = []
142 for p in procs:
143 if not p.is_alive():
144 raise RLException("remote roslaunch failed to launch: %s"%p.machine.name)
145 elif not p.uri:
146 pending.append(p.machine)
147 if not pending:
148 break
149
150 timeout_t = start_t + min([m.timeout for m in pending])
151 if time.time() > timeout_t:
152 break
153 time.sleep(0.1)
154 if pending:
155 raise RLException(
156 """The following roslaunch remote processes failed to register:
157 %s
158
159 If this is a network latency issue, you may wish to consider setting
160 <machine timeout="NUMBER OF SECONDS" ... />
161 in your launch"""%'\n'.join([" * %s (timeout %ss)"%(m.name, m.timeout) for m in pending]))
162
163
164 self.machine_list = machines.values()
165
166 self.remote_processes = procs
167
168
170 """
171 Utility routine for logging/recording nodes that failed
172
173 :param nodes: list of nodes that are assumed to have failed, ``Node``
174 :param failed: list of names of nodes that have failed to extend, ``[str]``
175 """
176 str_nodes = ["%s/%s"%(n.package, n.type) for n in nodes]
177 failed.extend(str_nodes)
178 printerrlog("Launch of the following nodes most likely failed: %s"%', '.join(str_nodes))
179
181 """
182 Contact each child to launch remote nodes
183 """
184 succeeded = []
185 failed = []
186
187
188
189 self.remote_nodes = {}
190 for m in self.machine_list:
191 self.remote_nodes[m.config_key()] = []
192
193
194 nodes = [x for x in self.rosconfig.nodes if not is_machine_local(x.machine)]
195 for n in nodes:
196 self.remote_nodes[n.machine.config_key()].append(n)
197
198 for child in self.remote_processes:
199 nodes = self.remote_nodes[child.machine.config_key()]
200 body = '\n'.join([n.to_remote_xml() for n in nodes])
201
202 xml = '<?xml version="1.0" encoding="utf-8"?>\n<launch>\n%s</launch>'%body
203
204 api = child.getapi()
205
206 try:
207 self.logger.debug("sending [%s] XML [\n%s\n]"%(child.uri, xml))
208 code, msg, val = api.launch(xml)
209 if code == 1:
210 c_succ, c_fail = val
211 succeeded.extend(c_succ)
212 failed.extend(c_fail)
213 else:
214 printerrlog('error launching on [%s, uri %s]: %s'%(child.name, child.uri, msg))
215 self._assume_failed(nodes, failed)
216 except socket.error as e:
217 errno, msg = e
218 printerrlog('error launching on [%s, uri %s]: %s'%(child.name, child.uri, str(msg)))
219 self._assume_failed(nodes, failed)
220
221 except socket.gaierror as e:
222 errno, msg = e
223
224 child_host, _ = network.parse_http_host_and_port(child.uri)
225 printerrlog("Unable to contact remote roslaunch at [%s]. This is most likely due to a network misconfiguration with host lookups. Please make sure that you can contact '%s' from this machine"%(child.uri, child_host))
226 self._assume_failed(nodes, failed)
227
228 except Exception as e:
229 printerrlog('error launching on [%s, uri %s]: %s'%(child.name, child.uri, str(e)))
230 self._assume_failed(nodes, failed)
231
232 return succeeded, failed
233