1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
"""
Process handler for launching SSH-based roslaunch child processes.
"""
38
from __future__ import with_statement

import errno
import logging
import os
import socket
import sys
import traceback
import xmlrpclib

from roslaunch.core import printlog, printerrlog
import roslaunch.pmon
import roslaunch.server
# Module-level logger; the SSH helpers in this file record tracebacks and
# connection lifecycle messages through it.
_logger = logging.getLogger("roslaunch.remoteprocess")


# Timeout, in seconds, handed to the SSH client's connect() call when the
# connection to the remote machine is opened.
TIMEOUT_SSH_CONNECT = 30.
56
def ssh_check_known_hosts(ssh, address, port, username=None, logger=None):
    """
    Validation routine for loading the host keys and making sure that
    they are configured properly for the desired SSH. The behavior of
    this routine can be modified by the ROSLAUNCH_SSH_UNKNOWN
    environment variable, which enables the paramiko.AutoAddPolicy.

    @param ssh: paramiko SSH client
    @type  ssh: L{paramiko.SSHClient}
    @param address: SSH IP address
    @type  address: str
    @param port: SSH port
    @type  port: int
    @param username: optional username to include in error message if check fails
    @type  username: str
    @param logger: (optional) logger to record tracebacks to
    @type  logger: logging.Logger
    @return: error message if improperly configured, or None
    @rtype: str
    """
    try:
        try:
            # System-wide known_hosts file; optional, so a read failure
            # here is not an error.
            if os.path.isfile('/etc/ssh/ssh_known_hosts'):
                ssh.load_system_host_keys('/etc/ssh/ssh_known_hosts')
        except IOError:
            pass
        # Per-user known_hosts file (the client's default location).
        ssh.load_system_host_keys()
    except Exception:
        # Any failure while parsing the key files is reported as a
        # (probably) corrupt known_hosts file rather than propagated.
        if logger:
            logger.error(traceback.format_exc())
        return "cannot load SSH host keys -- your known_hosts file may be corrupt"

    if os.environ.get('ROSLAUNCH_SSH_UNKNOWN', 0) == '1':
        # paramiko is only needed to build the policy object, so it is
        # imported here instead of at the top of the routine.
        import paramiko
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        return None

    # There is no public accessor for the system host keys, hence the
    # private attribute.
    hk = ssh._system_host_keys
    known = hk.lookup(address) is not None
    if not known and port != 22:
        # Connections to a non-default port are recorded in known_hosts
        # as "[address]:port", so check that spelling as well.
        known = hk.lookup("[%s]:%s" % (address, port)) is not None
    if known:
        return None

    port_str = user_str = ''
    if port != 22:
        port_str = "-p %s " % port
    if username:
        user_str = username + '@'
    # Options must precede the destination: ssh -p PORT user@address
    return """%s is not in your SSH known_hosts file.

Please manually:
  ssh %s%s%s

then try roslaunching again.

If you wish to configure roslaunch to automatically recognize unknown
hosts, please set the environment variable ROSLAUNCH_SSH_UNKNOWN=1""" % (address, port_str, user_str, address)
119
120
# NOTE(review): the class statement was missing from the extracted listing.
# The class name is taken from the super() call in __init__; the base class
# is presumed to be roslaunch.server.ChildROSLaunchProcess -- confirm.
class SSHChildROSLaunchProcess(roslaunch.server.ChildROSLaunchProcess):
    """
    Process wrapper for launching and monitoring a child roslaunch process over SSH
    """

    def __init__(self, run_id, name, server_uri, env, machine):
        """
        @param run_id: run ID forwarded to the child via C{--run_id}
        @param name: child name, forwarded via C{-c}
        @param server_uri: URI forwarded to the child via C{-u}
        @param env: environment passed on to the base process class
        @param machine: description of the remote machine; this class reads
            its ros_root, address, ssh_port, user, password and name
        """
        if machine.ros_root:
            cmd = os.path.join(machine.ros_root, 'bin', 'roslaunch')
        else:
            # No ROS root configured: rely on the remote PATH.
            cmd = 'roslaunch'
        args = [cmd, '-c', name, '-u', server_uri, '--run_id', run_id]
        super(SSHChildROSLaunchProcess, self).__init__(name, args, env)
        self.machine = machine
        self.ssh = self.sshin = self.sshout = self.ssherr = None
        self.started = False
        self.uri = None
        # Set by is_alive() once a remote stream reports end-of-file;
        # stop() uses it to choose how loudly to report a failed shutdown.
        self.is_dead = False

    def _ssh_exec(self, command, env, address, port, username=None, password=None):
        """
        Open an SSH connection and execute a command on the remote host.

        @param command: command line to run remotely
        @type  command: str
        @param env: environment variables to set for the command through
            an C{env KEY=VALUE ...} prefix; a false value adds nothing
        @type  env: dict
        @param address: remote address
        @type  address: str
        @param port: remote SSH port
        @type  port: int
        @param username: (optional) login name
        @type  username: str
        @param password: (optional) password; when false, connect without one
        @type  password: str
        @return: C{((ssh, stdin, stdout, stderr), message)} on success,
            C{(None, error message)} on failure
        @rtype: (tuple, str)
        """
        if env:
            # NOTE(review): values are interpolated verbatim, without shell
            # quoting -- values containing whitespace or shell
            # metacharacters will not survive the remote shell.
            assignments = ' '.join(["%s=%s" % (k, v) for (k, v) in env.items()])
            command = "env %s %s" % (assignments, command)
        try:
            # Silence the sha/md5 deprecation warnings before importing
            # Crypto. The import itself is only an availability check.
            import warnings
            warnings.filterwarnings("ignore", message="the sha module is deprecated; use the hashlib module instead")
            warnings.filterwarnings("ignore", message="the md5 module is deprecated; use hashlib instead")
            import Crypto
        except ImportError:
            _logger.error("cannot use SSH: pycrypto is not installed")
            return None, "pycrypto is not installed"
        try:
            import paramiko
        except ImportError:
            _logger.error("cannot use SSH: paramiko is not installed")
            return None, "paramiko is not installed"

        ssh = paramiko.SSHClient()
        err_msg = ssh_check_known_hosts(ssh, address, port, username=username, logger=_logger)

        if not err_msg:
            username_str = '%s@' % username if username else ''
            try:
                if not password:
                    ssh.connect(address, port, username, timeout=TIMEOUT_SSH_CONNECT)
                else:
                    ssh.connect(address, port, username, password, timeout=TIMEOUT_SSH_CONNECT)
            # The two specific exceptions must precede SSHException, their
            # common base class.
            except paramiko.BadHostKeyException:
                _logger.error(traceback.format_exc())
                err_msg = "Unable to verify host key for remote computer[%s:%s]" % (address, port)
            except paramiko.AuthenticationException:
                _logger.error(traceback.format_exc())
                err_msg = "Authentication to remote computer[%s%s:%s] failed.\nA common cause of this error is a missing key in your authorized_keys file." % (username_str, address, port)
            except paramiko.SSHException as e:
                _logger.error(traceback.format_exc())
                err_msg = "Unable to establish ssh connection to [%s%s:%s]: %s" % (username_str, address, port, e)
            except socket.error as e:
                # Compare against the platform's own ECONNREFUSED value
                # rather than the Linux-specific literal 111.
                if e.args and e.args[0] == errno.ECONNREFUSED:
                    err_msg = "network connection refused by [%s:%s]" % (address, port)
                else:
                    err_msg = "network error connecting to [%s:%s]: %s" % (address, port, str(e))
        if err_msg:
            # Do not leave a half-open client behind on failure.
            ssh.close()
            return None, err_msg
        sshin, sshout, ssherr = ssh.exec_command(command)
        return (ssh, sshin, sshout, ssherr), "executed remotely"

    # NOTE(review): this method's def line was missing from the extracted
    # listing; the name "start" is presumed -- confirm against the base class.
    def start(self):
        """
        Start the remote process. This will create an SSH connection
        to the remote host.

        @return: True if the remote command was launched, False otherwise
        @rtype: bool
        """
        self.started = False
        self.ssh = self.sshin = self.sshout = self.ssherr = None
        # Acquire outside the try block so that the finally clause never
        # releases a lock that was not obtained.
        self.lock.acquire()
        try:
            name = self.name
            m = self.machine
            if m.user is not None:
                printlog("remote[%s]: creating ssh connection to %s:%s, user[%s]" % (name, m.address, m.ssh_port, m.user))
            else:
                printlog("remote[%s]: creating ssh connection to %s:%s" % (name, m.address, m.ssh_port))
            _logger.info("remote[%s]: invoking with ssh exec args [%s], env: %s" % (name, ' '.join(self.args), self.env))
            sshvals, msg = self._ssh_exec(' '.join(self.args), self.env, m.address, m.ssh_port, m.user, m.password)
            if sshvals is None:
                printerrlog("remote[%s]: failed to launch on %s:\n\n%s\n\n" % (name, m.name, msg))
                return False
            self.ssh, self.sshin, self.sshout, self.ssherr = sshvals
            printlog("remote[%s]: ssh connection created" % name)
            self.started = True
            return True
        finally:
            self.lock.release()

    def getapi(self):
        """
        @return: ServerProxy to remote client XMLRPC server
        @rtype: L{ServerProxy}
        """
        if self.uri:
            return xmlrpclib.ServerProxy(self.uri)
        else:
            return None

    def is_alive(self):
        """
        @return: True if the process is alive. is_alive needs to be
        called periodically as it drains the SSH buffer
        @rtype: bool
        """
        if self.started and not self.ssh:
            # Started, but the connection has since been torn down.
            return False
        elif not self.started:
            # Not started yet: report alive.
            return True

        # Drain stderr without blocking and echo it locally.
        s = self.ssherr
        s.channel.settimeout(0)
        try:
            data = s.read(2048)
            if not data:
                # Empty read (no timeout): the stream has ended.
                self.is_dead = True
                return False
            # Remote output is expected to be UTF-8; substitute undecodable
            # bytes instead of discarding the whole chunk.
            data = data.decode('utf-8', 'replace')
            printerrlog("remote[%s]: %s" % (self.name, data))
        except socket.timeout:
            # Nothing to read right now.
            pass
        except IOError:
            return False

        # Drain stdout as well; its content is discarded.
        s = self.sshout
        s.channel.settimeout(0)
        try:
            data = s.read(2048)
            if not data:
                self.is_dead = True
                return False
        except socket.timeout:
            pass
        except IOError:
            return False
        return True

    def stop(self, errors=None):
        """
        Terminate this process, including the SSH connection.

        @param errors: accepted for interface compatibility; not used by
            this implementation
        @type  errors: list
        """
        # Acquire outside the try block so that the finally clause never
        # releases a lock that was not obtained.
        self.lock.acquire()
        try:
            if not self.ssh:
                return

            # Ask the remote side to shut down over XML-RPC before the
            # connection is closed.
            try:
                api = self.getapi()
                if api is not None:
                    api.shutdown()
            except socket.error:
                address = self.machine.address
                if not self.is_dead:
                    printerrlog("remote[%s]: unable to contact [%s] to shutdown remote processes!" % (self.name, address))
                else:
                    # The streams already reported end-of-file, so an
                    # unreachable remote is expected; report it quietly.
                    printlog("remote[%s]: unable to contact [%s] to shutdown cleanly. The remote roslaunch may have exited already." % (self.name, address))
            except Exception:
                # A failed shutdown request must not prevent the
                # connection from being closed below.
                traceback.print_exc()

            _logger.info("remote[%s]: closing ssh connection", self.name)
            self.sshin.close()
            self.sshout.close()
            self.ssherr.close()
            self.ssh.close()

            self.sshin = None
            self.sshout = None
            self.ssherr = None
            self.ssh = None
            _logger.info("remote[%s]: ssh connection closed", self.name)
        finally:
            self.lock.release()
319