1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 Process handler for launching ssh-based roslaunch child processes.
37 """
38
39 import os
40 import socket
41 import traceback
42 try:
43 from xmlrpc.client import ServerProxy
44 except ImportError:
45 from xmlrpclib import ServerProxy
46
47 import rosgraph
48 from roslaunch.core import printlog, printerrlog
49 import roslaunch.pmon
50 import roslaunch.server
51 from roslaunch.nodeprocess import DEFAULT_TIMEOUT_SIGINT, DEFAULT_TIMEOUT_SIGTERM
52
53 import logging
54 _logger = logging.getLogger("roslaunch.remoteprocess")
55
56
57 TIMEOUT_SSH_CONNECT = 30.
58
60 """
61 Validation routine for loading the host keys and making sure that
62 they are configured properly for the desired SSH. The behavior of
63 this routine can be modified by the ROSLAUNCH_SSH_UNKNOWN
64 environment variable, which enables the paramiko.AutoAddPolicy.
65
66 :param ssh: paramiko SSH client, :class:`paramiko.SSHClient`
67 :param address: SSH IP address, ``str``
68 :param port: SSH port, ``int``
69 :param username: optional username to include in error message if check fails, ``str``
70 :param logger: (optional) logger to record tracebacks to, :class:`logging.Logger`
71 :returns: error message if improperly configured, or ``None``. ``str``
72 """
73 import paramiko
74 try:
75 try:
76 if os.path.isfile('/etc/ssh/ssh_known_hosts'):
77 ssh.load_system_host_keys('/etc/ssh/ssh_known_hosts')
78 except IOError:
79 pass
80 ssh.load_system_host_keys()
81 except:
82 if logger:
83 logger.error(traceback.format_exc())
84
85
86
87
88 return "cannot load SSH host keys -- your known_hosts file may be corrupt"
89
90
91 ssh_config = paramiko.SSHConfig()
92 try:
93 with open(os.path.join(os.path.expanduser('~'), '.ssh', 'config')) as f:
94 ssh_config.parse(f)
95 config_lookup = ssh_config.lookup(address)
96 resolved_address = config_lookup['hostname'] if 'hostname' in config_lookup else address
97 except:
98 resolved_address = address
99
100
101
102
103
104
105
106 hk = ssh._system_host_keys
107 override = os.environ.get('ROSLAUNCH_SSH_UNKNOWN', 0)
108 if override == '1':
109 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
110 elif hk.lookup(resolved_address) is None:
111 port_str = user_str = ''
112 if port != 22:
113 port_str = "-p %s "%port
114 if username:
115 user_str = username+'@'
116 return """%s is not in your SSH known_hosts file.
117
118 Please manually:
119 ssh %s%s%s
120
121 then try roslaunching again.
122
123 If you wish to configure roslaunch to automatically recognize unknown
124 hosts, please set the environment variable ROSLAUNCH_SSH_UNKNOWN=1"""%(resolved_address, port_str, user_str, resolved_address)
125
127 """
128 Process wrapper for launching and monitoring a child roslaunch process over SSH
129 """
131 """
132 :param machine: Machine instance. Must be fully configured.
133 machine.env_loader is required to be set.
134 :param sigint_timeout: The SIGINT timeout used when killing nodes (in seconds).
135 :type sigint_timeout: float
136 :param sigterm_timeout: The SIGTERM timeout used when killing nodes if SIGINT does not stop the node (in seconds).
137 :type sigterm_timeout: float
138 """
139 if not machine.env_loader:
140 raise ValueError("machine.env_loader must have been assigned before creating ssh child instance")
141 args = [machine.env_loader, 'roslaunch', '-c', name, '-u', server_uri, '--run_id', run_id,
142 '--sigint-timeout', str(sigint_timeout), '--sigterm-timeout', str(sigterm_timeout)]
143
144 super(SSHChildROSLaunchProcess, self).__init__(name, args, {})
145 self.machine = machine
146 self.master_uri = master_uri
147 self.sigint_timeout = sigint_timeout
148 self.sigterm_timeout = sigterm_timeout
149 self.ssh = self.sshin = self.sshout = self.ssherr = None
150 self.started = False
151 self.uri = None
152
153
154 self.is_dead = False
155
156 - def _ssh_exec(self, command, address, port, username=None, password=None):
157 """
158 :returns: (ssh pipes, message). If error occurs, returns (None, error message).
159 """
160 if self.master_uri:
161 env_command = 'env %s=%s' % (rosgraph.ROS_MASTER_URI, self.master_uri)
162 command = '%s %s' % (env_command, command)
163 try:
164 import paramiko
165 except ImportError as e:
166 _logger.error("cannot use SSH: paramiko is not installed")
167 return None, "paramiko is not installed"
168
169 config_block = {'hostname': None, 'user': None, 'identityfile': None}
170 ssh_config = paramiko.SSHConfig()
171 try:
172 with open(os.path.join(os.path.expanduser('~'), '.ssh','config')) as f:
173 ssh_config.parse(f)
174 config_block.update(ssh_config.lookup(address))
175 except:
176 pass
177 address = config_block['hostname'] or address
178 username = username or config_block['user']
179 identity_file = None
180 if config_block.get('identityfile', None):
181 if isinstance(config_block['identityfile'], list):
182 identity_file = [os.path.expanduser(f) for f in config_block['identityfile']]
183 else:
184 identity_file = os.path.expanduser(config_block['identityfile'])
185
186 ssh = paramiko.SSHClient()
187 err_msg = ssh_check_known_hosts(ssh, address, port, username=username, logger=_logger)
188
189 if not err_msg:
190 username_str = '%s@'%username if username else ''
191 try:
192 if password is None:
193 ssh.connect(address, port, username, timeout=TIMEOUT_SSH_CONNECT, key_filename=identity_file)
194 else:
195 ssh.connect(address, port, username, password, timeout=TIMEOUT_SSH_CONNECT)
196 except paramiko.BadHostKeyException:
197 _logger.error(traceback.format_exc())
198 err_msg = "Unable to verify host key for remote computer[%s:%s]"%(address, port)
199 except paramiko.AuthenticationException:
200 _logger.error(traceback.format_exc())
201 err_msg = "Authentication to remote computer[%s%s:%s] failed.\nA common cause of this error is a missing key in your authorized_keys file."%(username_str, address, port)
202 except paramiko.SSHException as e:
203 _logger.error(traceback.format_exc())
204 if str(e).startswith("Unknown server"):
205 pass
206 err_msg = "Unable to establish ssh connection to [%s%s:%s]: %s"%(username_str, address, port, e)
207 except socket.error as e:
208
209 if e.args[0] == 111:
210 err_msg = "network connection refused by [%s:%s]"%(address, port)
211 else:
212 err_msg = "network error connecting to [%s:%s]: %s"%(address, port, str(e))
213 if err_msg:
214 return None, err_msg
215 else:
216 printlog("launching remote roslaunch child with command: [%s]"%(str(command)))
217 sshin, sshout, ssherr = ssh.exec_command(command)
218 return (ssh, sshin, sshout, ssherr), "executed remotely"
219
221 """
222 Start the remote process. This will create an SSH connection
223 to the remote host.
224 """
225 self.started = False
226 self.ssh = self.sshin = self.sshout = self.ssherr = None
227 with self.lock:
228 name = self.name
229 m = self.machine
230 if m.user is not None:
231 printlog("remote[%s]: creating ssh connection to %s:%s, user[%s]"%(name, m.address, m.ssh_port, m.user))
232 else:
233 printlog("remote[%s]: creating ssh connection to %s:%s"%(name, m.address, m.ssh_port))
234 _logger.info("remote[%s]: invoking with ssh exec args [%s]"%(name, ' '.join(self.args)))
235 sshvals, msg = self._ssh_exec(' '.join(self.args), m.address, m.ssh_port, m.user, m.password)
236 if sshvals is None:
237 printerrlog("remote[%s]: failed to launch on %s:\n\n%s\n\n"%(name, m.name, msg))
238 return False
239 self.ssh, self.sshin, self.sshout, self.ssherr = sshvals
240 printlog("remote[%s]: ssh connection created"%name)
241 self.started = True
242 return True
243
245 """
246 :returns: ServerProxy to remote client XMLRPC server, `ServerProxy`
247 """
248 if self.uri:
249 return ServerProxy(self.uri)
250 else:
251 return None
252
254 """
255 :returns: ``True`` if the process is alive. is_alive needs to be
256 called periodically as it drains the SSH buffer, ``bool``
257 """
258 if self.started and not self.ssh:
259 return False
260 elif not self.started:
261 return True
262 s = self.ssherr
263 s.channel.settimeout(0)
264 try:
265
266 data = s.read(2048)
267 if not len(data):
268 self.is_dead = True
269 return False
270
271
272 data = data.decode('utf-8')
273 printerrlog("remote[%s]: %s"%(self.name, data))
274 except socket.timeout:
275 pass
276 except IOError:
277 return False
278 except UnicodeDecodeError:
279
280
281 pass
282
283 s = self.sshout
284 s.channel.settimeout(0)
285 try:
286
287
288 data = s.read(2048)
289 if not len(data):
290 self.is_dead = True
291 return False
292 except socket.timeout:
293 pass
294 except IOError:
295 return False
296 return True
297
298 - def stop(self, errors=None):
299 """
300 Terminate this process, including the SSH connection.
301 """
302 if errors is None:
303 errors = []
304 with self.lock:
305 if not self.ssh:
306 return
307
308
309
310 try:
311 api = self.getapi()
312 if api is not None:
313
314 api.shutdown()
315 except socket.error:
316
317 address, port = self.machine.address, self.machine.ssh_port
318 if not self.is_dead:
319 printerrlog("remote[%s]: unable to contact [%s] to shutdown remote processes!"%(self.name, address))
320 else:
321 printlog("remote[%s]: unable to contact [%s] to shutdown cleanly. The remote roslaunch may have exited already."%(self.name, address))
322 except:
323
324
325 traceback.print_exc()
326
327 _logger.info("remote[%s]: closing ssh connection", self.name)
328 self.sshin.close()
329 self.sshout.close()
330 self.ssherr.close()
331 self.ssh.close()
332
333 self.sshin = None
334 self.sshout = None
335 self.ssherr = None
336 self.ssh = None
337 _logger.info("remote[%s]: ssh connection closed", self.name)
338