Package roslaunch :: Module remoteprocess
[frames] | no frames]

Source Code for Module roslaunch.remoteprocess

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  """ 
 36  Process handler for launching ssh-based roslaunch child processes. 
 37  """ 
 38   
 39  import os 
 40  import socket 
 41  import traceback 
 42  try: 
 43      from xmlrpc.client import ServerProxy 
 44  except ImportError: 
 45      from xmlrpclib import ServerProxy 
 46   
 47  import rosgraph 
 48  from roslaunch.core import printlog, printerrlog 
 49  import roslaunch.pmon 
 50  import roslaunch.server 
 51  from roslaunch.nodeprocess import DEFAULT_TIMEOUT_SIGINT, DEFAULT_TIMEOUT_SIGTERM 
 52   
 53  import logging 
 54  _logger = logging.getLogger("roslaunch.remoteprocess") 
 55   
 56  # #1975 timeout for creating ssh connections 
 57  TIMEOUT_SSH_CONNECT = 30. 
 58   
59 -def ssh_check_known_hosts(ssh, address, port, username=None, logger=None):
60 """ 61 Validation routine for loading the host keys and making sure that 62 they are configured properly for the desired SSH. The behavior of 63 this routine can be modified by the ROSLAUNCH_SSH_UNKNOWN 64 environment variable, which enables the paramiko.AutoAddPolicy. 65 66 :param ssh: paramiko SSH client, :class:`paramiko.SSHClient` 67 :param address: SSH IP address, ``str`` 68 :param port: SSH port, ``int`` 69 :param username: optional username to include in error message if check fails, ``str`` 70 :param logger: (optional) logger to record tracebacks to, :class:`logging.Logger` 71 :returns: error message if improperly configured, or ``None``. ``str`` 72 """ 73 import paramiko 74 try: 75 try: 76 if os.path.isfile('/etc/ssh/ssh_known_hosts'): #default ubuntu location 77 ssh.load_system_host_keys('/etc/ssh/ssh_known_hosts') 78 except IOError: 79 pass 80 ssh.load_system_host_keys() #default user location 81 except: 82 if logger: 83 logger.error(traceback.format_exc()) 84 # as seen in #767, base64 raises generic Error. 85 # 86 # A corrupt pycrypto build can also cause this, though 87 # the cause of the corrupt builds has been fixed. 88 return "cannot load SSH host keys -- your known_hosts file may be corrupt" 89 90 # #3158: resolve the actual host using the user's ~/.ssh/config 91 ssh_config = paramiko.SSHConfig() 92 try: 93 with open(os.path.join(os.path.expanduser('~'), '.ssh', 'config')) as f: 94 ssh_config.parse(f) 95 config_lookup = ssh_config.lookup(address) 96 resolved_address = config_lookup['hostname'] if 'hostname' in config_lookup else address 97 except: 98 resolved_address = address 99 100 # #1849: paramiko will raise an SSHException with an 'Unknown 101 # server' message if the address is not in the known_hosts 102 # file. This causes a lot of confusion to users, so we try 103 # and diagnose this in advance and offer better guidance 104 105 # - ssh.get_host_keys() does not return the system host keys 106 hk = ssh._system_host_keys 107 override = os.environ.get('ROSLAUNCH_SSH_UNKNOWN', 0) 108 if override == '1': 109 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 110 elif hk.lookup(resolved_address) is None: 111 port_str = user_str = '' 112 if port != 22: 113 port_str = "-p %s "%port 114 if username: 115 user_str = username+'@' 116 return """%s is not in your SSH known_hosts file. 117 118 Please manually: 119 ssh %s%s%s 120 121 then try roslaunching again. 122 123 If you wish to configure roslaunch to automatically recognize unknown 124 hosts, please set the environment variable ROSLAUNCH_SSH_UNKNOWN=1"""%(resolved_address, port_str, user_str, resolved_address)
125
126 -class SSHChildROSLaunchProcess(roslaunch.server.ChildROSLaunchProcess):
127 """ 128 Process wrapper for launching and monitoring a child roslaunch process over SSH 129 """
130 - def __init__(self, run_id, name, server_uri, machine, master_uri=None, sigint_timeout=DEFAULT_TIMEOUT_SIGINT, sigterm_timeout=DEFAULT_TIMEOUT_SIGTERM):
131 """ 132 :param machine: Machine instance. Must be fully configured. 133 machine.env_loader is required to be set. 134 :param sigint_timeout: The SIGINT timeout used when killing nodes (in seconds). 135 :type sigint_timeout: float 136 :param sigterm_timeout: The SIGTERM timeout used when killing nodes if SIGINT does not stop the node (in seconds). 137 :type sigterm_timeout: float 138 """ 139 if not machine.env_loader: 140 raise ValueError("machine.env_loader must have been assigned before creating ssh child instance") 141 args = [machine.env_loader, 'roslaunch', '-c', name, '-u', server_uri, '--run_id', run_id, 142 '--sigint-timeout', str(sigint_timeout), '--sigterm-timeout', str(sigterm_timeout)] 143 # env is always empty dict because we only use env_loader 144 super(SSHChildROSLaunchProcess, self).__init__(name, args, {}) 145 self.machine = machine 146 self.master_uri = master_uri 147 self.sigint_timeout = sigint_timeout 148 self.sigterm_timeout = sigterm_timeout 149 self.ssh = self.sshin = self.sshout = self.ssherr = None 150 self.started = False 151 self.uri = None 152 # self.is_dead is a flag set by is_alive that affects whether or not we 153 # log errors during a stop(). 154 self.is_dead = False
155
156 - def _ssh_exec(self, command, address, port, username=None, password=None):
157 """ 158 :returns: (ssh pipes, message). If error occurs, returns (None, error message). 159 """ 160 if self.master_uri: 161 env_command = 'env %s=%s' % (rosgraph.ROS_MASTER_URI, self.master_uri) 162 command = '%s %s' % (env_command, command) 163 try: 164 import paramiko 165 except ImportError as e: 166 _logger.error("cannot use SSH: paramiko is not installed") 167 return None, "paramiko is not installed" 168 #load user's ssh configuration 169 config_block = {'hostname': None, 'user': None, 'identityfile': None} 170 ssh_config = paramiko.SSHConfig() 171 try: 172 with open(os.path.join(os.path.expanduser('~'), '.ssh','config')) as f: 173 ssh_config.parse(f) 174 config_block.update(ssh_config.lookup(address)) 175 except: 176 pass 177 address = config_block['hostname'] or address 178 username = username or config_block['user'] 179 identity_file = None 180 if config_block.get('identityfile', None): 181 if isinstance(config_block['identityfile'], list): 182 identity_file = [os.path.expanduser(f) for f in config_block['identityfile']] 183 else: 184 identity_file = os.path.expanduser(config_block['identityfile']) 185 #load ssh client and connect 186 ssh = paramiko.SSHClient() 187 err_msg = ssh_check_known_hosts(ssh, address, port, username=username, logger=_logger) 188 189 if not err_msg: 190 username_str = '%s@'%username if username else '' 191 try: 192 if password is None: #use SSH agent 193 ssh.connect(address, port, username, timeout=TIMEOUT_SSH_CONNECT, key_filename=identity_file) 194 else: #use SSH with login/pass 195 ssh.connect(address, port, username, password, timeout=TIMEOUT_SSH_CONNECT) 196 except paramiko.BadHostKeyException: 197 _logger.error(traceback.format_exc()) 198 err_msg = "Unable to verify host key for remote computer[%s:%s]"%(address, port) 199 except paramiko.AuthenticationException: 200 _logger.error(traceback.format_exc()) 201 err_msg = "Authentication to remote computer[%s%s:%s] failed.\nA common cause of this error is a missing key in your authorized_keys file."%(username_str, address, port) 202 except paramiko.SSHException as e: 203 _logger.error(traceback.format_exc()) 204 if str(e).startswith("Unknown server"): 205 pass 206 err_msg = "Unable to establish ssh connection to [%s%s:%s]: %s"%(username_str, address, port, e) 207 except socket.error as e: 208 # #1824 209 if e.args[0] == 111: 210 err_msg = "network connection refused by [%s:%s]"%(address, port) 211 else: 212 err_msg = "network error connecting to [%s:%s]: %s"%(address, port, str(e)) 213 if err_msg: 214 return None, err_msg 215 else: 216 printlog("launching remote roslaunch child with command: [%s]"%(str(command))) 217 sshin, sshout, ssherr = ssh.exec_command(command) 218 return (ssh, sshin, sshout, ssherr), "executed remotely"
219
220 - def start(self):
221 """ 222 Start the remote process. This will create an SSH connection 223 to the remote host. 224 """ 225 self.started = False #won't set to True until we are finished 226 self.ssh = self.sshin = self.sshout = self.ssherr = None 227 with self.lock: 228 name = self.name 229 m = self.machine 230 if m.user is not None: 231 printlog("remote[%s]: creating ssh connection to %s:%s, user[%s]"%(name, m.address, m.ssh_port, m.user)) 232 else: 233 printlog("remote[%s]: creating ssh connection to %s:%s"%(name, m.address, m.ssh_port)) 234 _logger.info("remote[%s]: invoking with ssh exec args [%s]"%(name, ' '.join(self.args))) 235 sshvals, msg = self._ssh_exec(' '.join(self.args), m.address, m.ssh_port, m.user, m.password) 236 if sshvals is None: 237 printerrlog("remote[%s]: failed to launch on %s:\n\n%s\n\n"%(name, m.name, msg)) 238 return False 239 self.ssh, self.sshin, self.sshout, self.ssherr = sshvals 240 printlog("remote[%s]: ssh connection created"%name) 241 self.started = True 242 return True
243
244 - def getapi(self):
245 """ 246 :returns: ServerProxy to remote client XMLRPC server, `ServerProxy` 247 """ 248 if self.uri: 249 return ServerProxy(self.uri) 250 else: 251 return None
252
253 - def is_alive(self):
254 """ 255 :returns: ``True`` if the process is alive. is_alive needs to be 256 called periodically as it drains the SSH buffer, ``bool`` 257 """ 258 if self.started and not self.ssh: 259 return False 260 elif not self.started: 261 return True #not started is equivalent to alive in our logic 262 s = self.ssherr 263 s.channel.settimeout(0) 264 try: 265 #drain the pipes 266 data = s.read(2048) 267 if not len(data): 268 self.is_dead = True 269 return False 270 # #2012 il8n: ssh *should* be UTF-8, but often isn't 271 # (e.g. Japan) 272 data = data.decode('utf-8') 273 printerrlog("remote[%s]: %s"%(self.name, data)) 274 except socket.timeout: 275 pass 276 except IOError: 277 return False 278 except UnicodeDecodeError: 279 # #2012: soft fail, printing is not essential. This occurs 280 # with older machines that don't send utf-8 over ssh 281 pass 282 283 s = self.sshout 284 s.channel.settimeout(0) 285 try: 286 #drain the pipes 287 #TODO: write to log file 288 data = s.read(2048) 289 if not len(data): 290 self.is_dead = True 291 return False 292 except socket.timeout: 293 pass 294 except IOError: 295 return False 296 return True
297
298 - def stop(self, errors=None):
299 """ 300 Terminate this process, including the SSH connection. 301 """ 302 if errors is None: 303 errors = [] 304 with self.lock: 305 if not self.ssh: 306 return 307 308 # call the shutdown API first as closing the SSH connection 309 # won't actually kill the process unless it triggers SIGPIPE 310 try: 311 api = self.getapi() 312 if api is not None: 313 #TODO: probably need a timeout on this 314 api.shutdown() 315 except socket.error: 316 # normal if process is already dead 317 address, port = self.machine.address, self.machine.ssh_port 318 if not self.is_dead: 319 printerrlog("remote[%s]: unable to contact [%s] to shutdown remote processes!"%(self.name, address)) 320 else: 321 printlog("remote[%s]: unable to contact [%s] to shutdown cleanly. The remote roslaunch may have exited already."%(self.name, address)) 322 except: 323 # temporary: don't really want to log here as this 324 # may occur during shutdown 325 traceback.print_exc() 326 327 _logger.info("remote[%s]: closing ssh connection", self.name) 328 self.sshin.close() 329 self.sshout.close() 330 self.ssherr.close() 331 self.ssh.close() 332 333 self.sshin = None 334 self.sshout = None 335 self.ssherr = None 336 self.ssh = None 337 _logger.info("remote[%s]: ssh connection closed", self.name)
338