Package node_manager_fkie :: Module start_handler
[frames] | no frames]

Source Code for Module node_manager_fkie.start_handler

   1  # Software License Agreement (BSD License) 
   2  # 
   3  # Copyright (c) 2012, Fraunhofer FKIE/US, Alexander Tiderko 
   4  # All rights reserved. 
   5  # 
   6  # Redistribution and use in source and binary forms, with or without 
   7  # modification, are permitted provided that the following conditions 
   8  # are met: 
   9  # 
  10  #  * Redistributions of source code must retain the above copyright 
  11  #    notice, this list of conditions and the following disclaimer. 
  12  #  * Redistributions in binary form must reproduce the above 
  13  #    copyright notice, this list of conditions and the following 
  14  #    disclaimer in the documentation and/or other materials provided 
  15  #    with the distribution. 
  16  #  * Neither the name of Fraunhofer nor the names of its 
  17  #    contributors may be used to endorse or promote products derived 
  18  #    from this software without specific prior written permission. 
  19  # 
  20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
  21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
  22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
  23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
  24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
  25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
  26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
  27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
  28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
  29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
  30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
  31  # POSSIBILITY OF SUCH DAMAGE. 
  32   
  33  import os 
  34  import roslib 
  35  import rospy 
  36  import shlex 
  37  import signal 
  38  import socket 
  39  import time 
  40  import types 
  41  import xmlrpclib 
  42   
  43  from master_discovery_fkie.common import get_hostname, get_port 
  44  from node_manager_fkie.common import get_ros_home, masteruri_from_ros, package_name, utf8 
  45  from node_manager_fkie.name_resolution import NameResolution 
  46  from node_manager_fkie.supervised_popen import SupervisedPopen 
  47   
  48  import node_manager_fkie as nm 
49 50 51 -class StartException(Exception):
52 pass
53
54 55 -class BinarySelectionRequest(Exception):
56 ''' ''' 57
58 - def __init__(self, choices, error):
59 Exception.__init__(self) 60 self.choices = choices 61 self.error = error
62
63 - def __str__(self):
64 return "BinarySelectionRequest from " + self.choices + "::" + repr(self.error)
65 66 67 CACHED_PKG_PATH = dict() # {host : {pkg: path}}
68 69 70 -class AdvRunCfg(object):
71
72 - def __init__(self, node, roslaunch_config, force2host=None, masteruri=None, executable=None, logging=None, auto_pw_request=False, user=None, pw=None):
73 ''' 74 @param force2host: start the node on given host. 75 @type force2host: str 76 @param masteruri: force the masteruri. 77 @type masteruri: str 78 ''' 79 self.node = node 80 self.roslaunch_config = roslaunch_config 81 self.force2host = force2host 82 self.masteruri = masteruri 83 self.logging = logging 84 self.executable = executable 85 self.auto_pw_request = auto_pw_request 86 self.user = user 87 self.pw = pw
88
89 90 -class StartHandler(object):
91 ''' 92 This class contains the methods to run the nodes on local and remote machines 93 in a screen terminal. 94 ''' 95
96 - def __init__(self):
97 pass
98 99 @classmethod
100 - def runNode(cls, runcfg):
101 ''' 102 Start the node with given name from the given configuration. 103 @param runcfg: the configuration containing the start parameter 104 @type runcfg: AdvRunCfg 105 @raise StartException: if the screen is not available on host. 106 @raise Exception: on errors while resolving host 107 @see: L{node_manager_fkie.is_local()} 108 ''' 109 # 'print "RUN node", node, time.time() 110 n = runcfg.roslaunch_config.getNode(runcfg.node) 111 if n is None: 112 raise StartException(''.join(["Node '", runcfg.node, "' not found!"])) 113 114 env = list(n.env_args) 115 # set logging options 116 if runcfg.logging is not None: 117 if not runcfg.logging.is_default('console_format'): 118 env.append(('ROSCONSOLE_FORMAT', '%s' % runcfg.logging.console_format)) 119 if n.respawn: 120 # set the respawn environment variables 121 respawn_params = cls._get_respawn_params(rospy.names.ns_join(n.namespace, n.name), runcfg.roslaunch_config.Roscfg.params) 122 if respawn_params['max'] > 0: 123 env.append(('RESPAWN_MAX', '%d' % respawn_params['max'])) 124 if respawn_params['min_runtime'] > 0: 125 env.append(('RESPAWN_MIN_RUNTIME', '%d' % respawn_params['min_runtime'])) 126 if respawn_params['delay'] > 0: 127 env.append(('RESPAWN_DELAY', '%d' % respawn_params['delay'])) 128 prefix = n.launch_prefix if n.launch_prefix is not None else '' 129 if prefix.lower() == 'screen' or prefix.lower().find('screen ') != -1: 130 rospy.loginfo("SCREEN prefix removed before start!") 131 prefix = '' 132 args = ['__ns:=%s' % n.namespace.rstrip(rospy.names.SEP), '__name:=%s' % n.name] 133 if n.cwd is not None: 134 args.append('__cwd:=%s' % n.cwd) 135 136 # add remaps 137 for remap in n.remap_args: 138 args.append(''.join([remap[0], ':=', remap[1]])) 139 140 # get host of the node 141 host = runcfg.roslaunch_config.hostname 142 env_loader = '' 143 if n.machine_name: 144 machine = runcfg.roslaunch_config.Roscfg.machines[n.machine_name] 145 if machine.address not in ['localhost', '127.0.0.1']: 146 host = machine.address 147 if runcfg.masteruri is None: 148 runcfg.masteruri = nm.nameres().masteruri(n.machine_name) 149 # TODO: env-loader support? 150 # if hasattr(machine, "env_loader") and machine.env_loader: 151 # env_loader = machine.env_loader 152 # set the host to the given host 153 if runcfg.force2host is not None: 154 host = runcfg.force2host 155 156 # set the ROS_MASTER_URI 157 if runcfg.masteruri is None: 158 runcfg.masteruri = masteruri_from_ros() 159 env.append(('ROS_MASTER_URI', runcfg.masteruri)) 160 ros_hostname = NameResolution.get_ros_hostname(runcfg.masteruri) 161 if ros_hostname: 162 env.append(('ROS_HOSTNAME', ros_hostname)) 163 164 abs_paths = list() # tuples of (parameter name, old value, new value) 165 not_found_packages = list() # package names 166 # set the global parameter 167 if runcfg.masteruri is not None and runcfg.masteruri not in runcfg.roslaunch_config.global_param_done: 168 global_node_names = cls.getGlobalParams(runcfg.roslaunch_config.Roscfg) 169 rospy.loginfo("Register global parameter:\n %s", '\n '.join("%s%s" % (utf8(v)[:80], '...' if len(utf8(v)) > 80 else'') for v in global_node_names.values())) 170 abs_paths[len(abs_paths):], not_found_packages[len(not_found_packages):] = cls._load_parameters(runcfg.masteruri, global_node_names, [], runcfg.user, runcfg.pw, runcfg.auto_pw_request) 171 runcfg.roslaunch_config.global_param_done.append(runcfg.masteruri) 172 173 # add params 174 if runcfg.masteruri is not None: 175 nodens = ''.join([n.namespace, n.name, rospy.names.SEP]) 176 params = dict() 177 for param, value in runcfg.roslaunch_config.Roscfg.params.items(): 178 if param.startswith(nodens): 179 params[param] = value 180 clear_params = [] 181 for cparam in runcfg.roslaunch_config.Roscfg.clear_params: 182 if cparam.startswith(nodens): 183 clear_params.append(cparam) 184 rospy.loginfo("Delete parameter:\n %s", '\n '.join(clear_params)) 185 rospy.loginfo("Register parameter:\n %s", '\n '.join("%s%s" % (utf8(v)[:80], '...' if len(utf8(v)) > 80 else'') for v in params.values())) 186 abs_paths[len(abs_paths):], not_found_packages[len(not_found_packages):] = cls._load_parameters(runcfg.masteruri, params, clear_params, runcfg.user, runcfg.pw, runcfg.auto_pw_request) 187 # 'print "RUN prepared", node, time.time() 188 189 if nm.is_local(host, wait=True): 190 nm.screen().testScreen() 191 if runcfg.executable: 192 cmd_type = runcfg.executable 193 else: 194 try: 195 cmd = roslib.packages.find_node(n.package, n.type) 196 except (Exception, roslib.packages.ROSPkgException) as starterr: 197 # multiple nodes, invalid package 198 raise StartException("Can't find resource: %s" % starterr) 199 # handle diferent result types str or array of string 200 if isinstance(cmd, types.StringTypes): 201 cmd = [cmd] 202 cmd_type = '' 203 if cmd is None or len(cmd) == 0: 204 raise StartException("%s in package [%s] not found!\n\nThe package " 205 "was created?\nIs the binary executable?\n" % (n.type, n.package)) 206 if len(cmd) > 1: 207 if runcfg.auto_pw_request: 208 # Open selection for executables, only if the method is called from the main GUI thread 209 try: 210 try: 211 from python_qt_binding.QtGui import QInputDialog 212 except: 213 from python_qt_binding.QtWidgets import QInputDialog 214 item, result = QInputDialog.getItem(None, 'Multiple executables %s in %s' % (n.type, n.package), 215 'Select an executable', 216 cmd, 0, False) 217 if result: 218 # open the selected screen 219 cmd_type = item 220 else: 221 return 222 except: 223 raise StartException('Multiple executables with same name in package found!') 224 else: 225 err = BinarySelectionRequest(cmd, 'Multiple executables') 226 raise nm.InteractionNeededError(err, cls.runNode, 227 (runcfg,)) 228 else: 229 cmd_type = cmd[0] 230 # determine the current working path, Default: the package of the node 231 cwd = get_ros_home() 232 if not (n.cwd is None): 233 if n.cwd == 'ROS_HOME': 234 cwd = get_ros_home() 235 elif n.cwd == 'node': 236 cwd = os.path.dirname(cmd_type) 237 cls._prepareROSMaster(runcfg.masteruri) 238 node_cmd = [nm.settings().respawn_script if n.respawn else '', prefix, cmd_type] 239 cmd_args = [nm.screen().getSceenCmd(runcfg.node)] 240 cmd_args[len(cmd_args):] = node_cmd 241 cmd_args.append(utf8(n.args)) 242 cmd_args[len(cmd_args):] = args 243 rospy.loginfo("RUN: %s", ' '.join(cmd_args)) 244 new_env = dict(os.environ) 245 new_env['ROS_MASTER_URI'] = runcfg.masteruri 246 ros_hostname = NameResolution.get_ros_hostname(runcfg.masteruri) 247 if ros_hostname: 248 new_env['ROS_HOSTNAME'] = ros_hostname 249 # add the namespace environment parameter to handle some cases, 250 # e.g. rqt_cpp plugins 251 if n.namespace: 252 new_env['ROS_NAMESPACE'] = n.namespace 253 # set logging options 254 if runcfg.logging is not None: 255 if not runcfg.logging.is_default('loglevel'): 256 env.append(('ROSCONSOLE_CONFIG_FILE', nm.settings().rosconsole_cfg_file(n.package))) 257 for key, value in env: 258 new_env[key] = value 259 SupervisedPopen(shlex.split(utf8(' '.join(cmd_args))), cwd=cwd, 260 env=new_env, object_id="Run node", description="Run node " 261 "[%s]%s" % (utf8(n.package), utf8(n.type))) 262 nm.filewatcher().add_binary(cmd_type, runcfg.node, runcfg.masteruri, runcfg.roslaunch_config.Filename) 263 else: 264 # 'print "RUN REMOTE", node, time.time() 265 # start remote 266 if runcfg.roslaunch_config.PackageName is None: 267 raise StartException("Can't run remote without a valid package name!") 268 # thus the prefix parameters while the transfer are not separated 269 if prefix: 270 prefix = ''.join(['"', prefix, '"']) 271 # setup environment 272 env_command = '' 273 if env_loader: 274 rospy.logwarn("env_loader in machine tag currently not supported") 275 raise StartException("env_loader in machine tag currently not supported") 276 if env: 277 new_env = dict() 278 try: 279 for (k, v) in env: 280 v_value, is_abs_path, found, package = cls._resolve_abs_paths(v, host, runcfg.user, runcfg.pw, runcfg.auto_pw_request) 281 new_env[k] = v_value 282 if is_abs_path: 283 abs_paths.append(('ENV', "%s=%s" % (k, v), "%s=%s" % (k, v_value))) 284 if not found and package: 285 not_found_packages.append(package) 286 env_command = "env " + ' '.join(["%s=\'%s\'" % (k, v) for (k, v) in new_env.items()]) 287 except nm.AuthenticationRequest as e: 288 raise nm.InteractionNeededError(e, cls.runNode, (runcfg,)) 289 290 startcmd = [env_command, nm.settings().start_remote_script, 291 '--package', utf8(n.package), 292 '--node_type', utf8(n.type), 293 '--node_name', utf8(runcfg.node), 294 '--node_respawn true' if n.respawn else ''] 295 if runcfg.masteruri is not None: 296 startcmd.append('--masteruri') 297 startcmd.append(runcfg.masteruri) 298 if prefix: 299 startcmd[len(startcmd):] = ['--prefix', prefix] 300 if runcfg.logging is not None: 301 if not runcfg.logging.is_default('loglevel'): 302 startcmd.append('--loglevel') 303 startcmd.append(nm.settings().logging.loglevel) 304 305 # rename the absolute paths in the args of the node 306 node_args = [] 307 try: 308 for a in n.args.split(): 309 a_value, is_abs_path, found, package = cls._resolve_abs_paths(a, host, runcfg.user, runcfg.pw, runcfg.auto_pw_request) 310 node_args.append(a_value) 311 if is_abs_path: 312 abs_paths.append(('ARGS', a, a_value)) 313 if not found and package: 314 not_found_packages.append(package) 315 316 startcmd[len(startcmd):] = node_args 317 startcmd[len(startcmd):] = args 318 rospy.loginfo("Run remote on %s: %s", host, utf8(' '.join(startcmd))) 319 # 'print "RUN CALL", node, time.time() 320 _, stdout, stderr, ok = nm.ssh().ssh_exec(host, startcmd, runcfg.user, runcfg.pw, runcfg.auto_pw_request, close_stdin=True) 321 output = stdout.read() 322 error = stderr.read() 323 stdout.close() 324 stderr.close() 325 # 'print "RUN CALLOK", node, time.time() 326 except nm.AuthenticationRequest as e: 327 raise nm.InteractionNeededError(e, cls.runNode, (runcfg,)) 328 329 if ok: 330 if error: 331 rospy.logwarn("ERROR while start '%s': %s", runcfg.node, error) 332 raise StartException(utf8(''.join(['The host "', host, '" reports:\n', error]))) 333 if output: 334 rospy.loginfo("STDOUT while start '%s': %s", runcfg.node, output) 335 # inform about absolute paths in parameter value 336 if len(abs_paths) > 0: 337 rospy.loginfo("Absolute paths found while start:\n%s", utf8('\n'.join([''.join([p, '\n OLD: ', ov, '\n NEW: ', nv]) for p, ov, nv in abs_paths]))) 338 339 if len(not_found_packages) > 0: 340 packages = '\n'.join(not_found_packages) 341 raise StartException(utf8('\n'.join(['Some absolute paths are not renamed because following packages are not found on remote host:', packages])))
342 # if len(abs_paths) > 0: 343 # parameters = '\n'.join(abs_paths) 344 # raise nm.StartException(utf8('\n'.join(['Following parameter seems to use an absolute local path for remote host:', parameters, 'Use "cwd" attribute of the "node" tag to specify relative paths for remote usage!']))) 345 # 'print "RUN OK", node, time.time() 346 347 @classmethod
348 - def _get_respawn_params(cls, node, params):
349 result = {'max': 0, 'min_runtime': 0, 'delay': 0} 350 respawn_max = rospy.names.ns_join(node, 'respawn/max') 351 respawn_min_runtime = rospy.names.ns_join(node, 'respawn/min_runtime') 352 respawn_delay = rospy.names.ns_join(node, 'respawn/delay') 353 try: 354 result['max'] = int(params[respawn_max].value) 355 except: 356 pass 357 try: 358 result['min_runtime'] = int(params[respawn_min_runtime].value) 359 except: 360 pass 361 try: 362 result['delay'] = int(params[respawn_delay].value) 363 except: 364 pass 365 return result
366 367 @classmethod
368 - def _load_parameters(cls, masteruri, params, clear_params, user, pw, auto_pw_request):
369 """ 370 Load parameters onto the parameter server 371 """ 372 import roslaunch 373 import roslaunch.launch 374 param_server = xmlrpclib.ServerProxy(masteruri) 375 p = None 376 abs_paths = list() # tuples of (parameter name, old value, new value) 377 not_found_packages = list() # packages names 378 param_errors = [] 379 try: 380 socket.setdefaulttimeout(6 + len(clear_params)) 381 # multi-call style xmlrpc 382 param_server_multi = xmlrpclib.MultiCall(param_server) 383 384 # clear specified parameter namespaces 385 # #2468 unify clear params to prevent error 386 for p in clear_params: 387 param_server_multi.deleteParam(rospy.get_name(), p) 388 r = param_server_multi() 389 for code, msg, _ in r: 390 if code != 1 and not msg.find("is not set"): 391 rospy.logwarn("Failed to clear parameter: %s", msg) 392 # raise StartException("Failed to clear parameter: %s"%(msg)) 393 394 # multi-call objects are not reusable 395 socket.setdefaulttimeout(6 + len(params)) 396 param_server_multi = xmlrpclib.MultiCall(param_server) 397 address = nm.nameres().address(masteruri) 398 for p in params.itervalues(): 399 # suppressing this as it causes too much spam 400 value, is_abs_path, found, package = cls._resolve_abs_paths(p.value, address, user, pw, auto_pw_request) 401 if is_abs_path: 402 abs_paths.append((p.key, p.value, value)) 403 if not found and package: 404 not_found_packages.append(package) 405 # if p.value is None: 406 # rospy.logwarn("The parameter '%s' is invalid: '%s'"%(p.key, p.value)) 407 # raise StartException("The parameter '%s' is invalid: '%s'"%(p.key, p.value)) 408 param_server_multi.setParam(rospy.get_name(), p.key, value if is_abs_path else p.value) 409 test_ret = cls._test_value(p.key, value if is_abs_path else p.value) 410 if test_ret: 411 param_errors.extend(test_ret) 412 r = param_server_multi() 413 for code, msg, _ in r: 414 if code != 1: 415 raise StartException("Failed to set parameter: %s" % (msg)) 416 except roslaunch.core.RLException, e: 417 raise StartException(e) 418 except Exception as e: 419 raise StartException("Failed to set parameter. ROS Parameter Server " 420 "reports: %s\n\n%s" % (e, '\n'.join(param_errors))) 421 finally: 422 socket.setdefaulttimeout(None) 423 return abs_paths, not_found_packages
424 425 @classmethod
426 - def _test_value(cls, key, value):
427 result = [] 428 if value is None: 429 msg = "Invalid parameter value of '%s': '%s'" % (key, value) 430 result.append(msg) 431 rospy.logwarn(msg) 432 elif isinstance(value, list): 433 for val in value: 434 ret = cls._test_value(key, val) 435 if ret: 436 result.extend(ret) 437 elif isinstance(value, dict): 438 for subkey, val in value.items(): 439 ret = cls._test_value("%s/%s" % (key, subkey), val) 440 if ret: 441 result.extend(ret) 442 return result
443 444 @classmethod
445 - def _resolve_abs_paths(cls, value, host, user, pw, auto_pw_request):
446 ''' 447 Replaces the local absolute path by remote absolute path. Only valid ROS 448 package paths are resolved. 449 @return: value, is absolute path, remote package found (ignore it on local host or if is not absolute path!), package name (if absolute path and remote package NOT found) 450 ''' 451 if isinstance(value, types.StringTypes) and value.startswith('/') and (os.path.isfile(value) or os.path.isdir(value)): 452 if nm.is_local(host): 453 return value, True, True, '' 454 else: 455 path = os.path.dirname(value) if os.path.isfile(value) else value 456 package, package_path = package_name(path) 457 if package: 458 _, stdout, _, ok = nm.ssh().ssh_exec(host, ['rospack', 'find', package], user, pw, auto_pw_request, close_stdin=True, close_stderr=True) 459 output = stdout.read() 460 stdout.close() 461 if ok: 462 if output: 463 value.replace(package_path, output) 464 return value.replace(package_path, output.strip()), True, True, package 465 else: 466 # package on remote host not found! 467 # TODO add error message 468 # error = stderr.read() 469 pass 470 return value, True, False, '' 471 else: 472 return value, False, False, ''
473 474 @classmethod
475 - def runNodeWithoutConfig(cls, host, package, binary, name, args=[], masteruri=None, auto_pw_request=False, user=None, pw=None):
476 ''' 477 Start a node with using a launch configuration. 478 @param host: the host or ip to run the node 479 @type host: C{str} 480 @param package: the ROS package containing the binary 481 @type package: C{str} 482 @param binary: the binary of the node to execute 483 @type binary: C{str} 484 @param name: the ROS name of the node (with name space) 485 @type name: C{str} 486 @param args: the list with arguments passed to the binary 487 @type args: C{[str, ...]} 488 @param auto_pw_request: opens question dialog directly, use True only if the method is called from the main GUI thread 489 @type auto_pw_request: bool 490 @raise Exception: on errors while resolving host 491 @see: L{node_manager_fkie.is_local()} 492 ''' 493 # create the name with namespace 494 args2 = list(args) 495 fullname = roslib.names.ns_join(roslib.names.SEP, name) 496 namespace = '' 497 for a in args: 498 if a.startswith('__ns:='): 499 namespace = a.replace('__ns:=', '') 500 fullname = roslib.names.ns_join(namespace, name) 501 args2.append(''.join(['__name:=', name])) 502 # run on local host 503 if nm.is_local(host, wait=True): 504 try: 505 cmd = roslib.packages.find_node(package, binary) 506 except roslib.packages.ROSPkgException as e: 507 # multiple nodes, invalid package 508 raise StartException(utf8(e)) 509 # handle different result types str or array of string 510 # import types 511 if isinstance(cmd, types.StringTypes): 512 cmd = [cmd] 513 cmd_type = '' 514 if cmd is None or len(cmd) == 0: 515 raise StartException('%s in package [%s] not found!' % (binary, package)) 516 if len(cmd) > 1: 517 # Open selection for executables 518 err = [''.join(['Multiple executables with same name in package [', package, '] found:'])] 519 err.extend(cmd) 520 raise StartException('\n'.join(err)) 521 else: 522 cmd_type = cmd[0] 523 cmd_str = utf8(' '.join([nm.screen().getSceenCmd(fullname), cmd_type, ' '.join(args2)])) 524 rospy.loginfo("Run without config: %s", cmd_str) 525 new_env = dict(os.environ) 526 if namespace: 527 new_env['ROS_NAMESPACE'] = namespace 528 if masteruri is not None: 529 cls._prepareROSMaster(masteruri) 530 new_env['ROS_MASTER_URI'] = masteruri 531 ros_hostname = NameResolution.get_ros_hostname(masteruri) 532 if ros_hostname: 533 new_env['ROS_HOSTNAME'] = ros_hostname 534 SupervisedPopen(shlex.split(cmd_str), env=new_env, object_id="Run without config", description="Run without config [%s]%s" % (utf8(package), utf8(binary))) 535 else: 536 # run on a remote machine 537 startcmd = [nm.settings().start_remote_script, 538 '--package', utf8(package), 539 '--node_type', utf8(binary), 540 '--node_name', utf8(fullname)] 541 startcmd[len(startcmd):] = args2 542 if masteruri is not None: 543 startcmd.append('--masteruri') 544 startcmd.append(masteruri) 545 rospy.loginfo("Run remote on %s: %s", host, ' '.join(startcmd)) 546 try: 547 _, stdout, stderr, ok = nm.ssh().ssh_exec(host, startcmd, user, pw, auto_pw_request, close_stdin=True) 548 if ok: 549 output = stdout.read() 550 error = stderr.read() 551 stdout.close() 552 stderr.close() 553 if error: 554 rospy.logwarn("ERROR while start '%s': %s", name, error) 555 raise StartException(''.join(['The host "', host, '" reports:\n', error])) 556 if output: 557 if output.find("dn't") != -1: 558 rospy.logwarn("Warning while start '%s': %s", name, output) 559 else: 560 rospy.loginfo("STDOUT while start '%s': %s", name, output) 561 else: 562 if error: 563 rospy.logwarn("ERROR while start '%s': %s", name, error) 564 raise StartException(''.join(['The host "', host, '" reports:\n', error])) 565 except nm.AuthenticationRequest as e: 566 raise nm.InteractionNeededError(e, cls.runNodeWithoutConfig, (host, package, binary, name, args, masteruri, auto_pw_request))
567 568 @classmethod
569 - def _prepareROSMaster(cls, masteruri):
570 if masteruri is None: 571 masteruri = masteruri_from_ros() 572 # start roscore, if needed 573 try: 574 if not os.path.isdir(nm.ScreenHandler.LOG_PATH): 575 os.makedirs(nm.ScreenHandler.LOG_PATH) 576 socket.setdefaulttimeout(3) 577 master = xmlrpclib.ServerProxy(masteruri) 578 master.getUri(rospy.get_name()) 579 # restart ROSCORE on different masteruri?, not now... 580 # master_uri = master.getUri(rospy.get_name()) 581 # if masteruri != master_uri[2]: 582 # # kill the local roscore... 583 # raise 584 except: 585 # run a roscore 586 master_host = get_hostname(masteruri) 587 if nm.is_local(master_host, True): 588 master_port = get_port(masteruri) 589 new_env = dict(os.environ) 590 new_env['ROS_MASTER_URI'] = masteruri 591 ros_hostname = NameResolution.get_ros_hostname(masteruri) 592 if ros_hostname: 593 new_env['ROS_HOSTNAME'] = ros_hostname 594 cmd_args = '%s roscore --port %d' % (nm.ScreenHandler.getSceenCmd('/roscore--%d' % master_port), master_port) 595 for n in [1, 2, 3, 4]: 596 try: 597 if n == 1: 598 print("Launch ROS Master in screen ...") 599 SupervisedPopen(shlex.split(cmd_args), env=new_env, object_id="ROSCORE", description="Start roscore") 600 elif n == 2: 601 print("ROS Master takes too long for start, wait for next 10 sec ...") 602 elif n == 3: 603 print("A really slow start, wait for last 10 sec ...") 604 # wait for roscore to avoid connection problems while init_node 605 result = -1 606 count = 1 607 while result == -1 and count < 11: 608 try: 609 master = xmlrpclib.ServerProxy(masteruri) 610 result, _, _ = master.getUri(rospy.get_name()) # _:=uri, msg 611 return 612 except Exception: 613 time.sleep(1) 614 count += 1 615 if n == 4 and count >= 11: 616 raise StartException('Cannot connect to ROS-Master: %s\n--> please run "roscore" manually!' % utf8(masteruri)) 617 except Exception as e: 618 raise Exception("Error while call '%s': %s" % (cmd_args, utf8(e))) 619 else: 620 raise Exception("ROS master '%s' is not reachable" % masteruri) 621 finally: 622 socket.setdefaulttimeout(None)
623
624 - def callService(self, service_uri, service, service_type, service_args=[]):
625 ''' 626 Calls the service and return the response. 627 To call the service the ServiceProxy can't be used, because it uses 628 environment variables to determine the URI of the running service. In our 629 case this service can be running using another ROS master. The changes on the 630 environment variables is not thread safe. 631 So the source code of the rospy.SerivceProxy (tcpros_service.py) was modified. 632 633 @param service_uri: the URI of the service 634 @type service_uri: C{str} 635 @param service: full service name (with name space) 636 @type service: C{str} 637 @param service_type: service class 638 @type service_type: ServiceDefinition: service class 639 @param service_args: arguments 640 @return: the tuple of request and response. 641 @rtype: C{(request object, response object)} 642 @raise StartException: on error 643 644 @see: U{rospy.SerivceProxy<http://docs.ros.org/kinetic/api/rospy/html/rospy.impl.tcpros_service.ServiceProxy-class.html>} 645 646 ''' 647 service = str(service) 648 rospy.loginfo("Call service %s[%s]: %s, %s", utf8(service), utf8(service_uri), utf8(service_type), utf8(service_args)) 649 from rospy.core import parse_rosrpc_uri, is_shutdown 650 # from rospy.msg import args_kwds_to_message 651 from rospy.exceptions import TransportInitError, TransportException 652 from rospy.impl.tcpros_base import TCPROSTransport, DEFAULT_BUFF_SIZE # ,TCPROSTransportProtocol 653 from rospy.impl.tcpros_service import TCPROSServiceClient 654 from rospy.service import ServiceException 655 request = service_type._request_class() 656 import genpy 657 try: 658 now = rospy.get_rostime() 659 import std_msgs.msg 660 keys = {'now': now, 'auto': std_msgs.msg.Header(stamp=now)} 661 genpy.message.fill_message_args(request, service_args, keys) 662 except genpy.MessageException as e: 663 def argsummary(args): 664 if type(args) in [tuple, list]: 665 return '\n'.join([' * %s (type %s)' % (a, type(a).__name__) for a in args]) 666 else: 667 return ' * %s (type %s)' % (args, type(args).__name__)
668 raise StartException("Incompatible arguments to call service:\n%s\nProvided arguments are:\n%s\n\nService arguments are: [%s]" % (e, argsummary(service_args), genpy.message.get_printable_message_args(request))) 669 670 # request = args_kwds_to_message(type._request_class, args, kwds) 671 protocol = TCPROSServiceClient(service, service_type, headers={}) 672 transport = TCPROSTransport(protocol, service) 673 # initialize transport 674 dest_addr, dest_port = parse_rosrpc_uri(service_uri) 675 # connect to service 676 transport.buff_size = DEFAULT_BUFF_SIZE 677 try: 678 transport.connect(dest_addr, dest_port, service_uri, timeout=5) 679 except TransportInitError as e: 680 # can be a connection or md5sum mismatch 681 raise StartException(''.join(["unable to connect to service: ", utf8(e)])) 682 transport.send_message(request, 0) 683 try: 684 responses = transport.receive_once() 685 if len(responses) == 0: 686 raise StartException("service [%s] returned no response" % service) 687 elif len(responses) > 1: 688 raise StartException("service [%s] returned multiple responses: %s" % (service, len(responses))) 689 except TransportException as e: 690 # convert lower-level exception to exposed type 691 if is_shutdown(): 692 raise StartException("node shutdown interrupted service call") 693 else: 694 raise StartException("transport error completing service call: %s" % (utf8(e))) 695 except ServiceException, e: 696 raise StartException("Service error: %s" % (utf8(e))) 697 finally: 698 transport.close() 699 transport = None 700 return request, responses[0] if len(responses) > 0 else None
701 702 @classmethod
703 - def getGlobalParams(cls, roscfg):
704 ''' 705 Return the parameter of the configuration file, which are not associated with 706 any nodes in the configuration. 707 @param roscfg: the launch configuration 708 @type roscfg: U{roslaunch.ROSLaunchConfig<http://docs.ros.org/kinetic/api/roslaunch/html/>} 709 @return: the list with names of the global parameter 710 @rtype: C{dict(param:value, ...)} 711 ''' 712 result = dict() 713 nodes = [] 714 for item in roscfg.resolved_node_names: 715 nodes.append(item) 716 for param, value in roscfg.params.items(): 717 nodesparam = False 718 for n in nodes: 719 if param.startswith(n): 720 nodesparam = True 721 break 722 if not nodesparam: 723 result[param] = value 724 return result
725 726 @classmethod
727 - def get_log_path(cls, host, nodes=[], auto_pw_request=False, user=None, pw=None):
728 if nm.is_local(host): 729 if len(nodes) == 1: 730 return nm.screen().getScreenLogFile(node=nodes[0]) 731 else: 732 return nm.screen().LOG_PATH 733 else: 734 request = '[]' if len(nodes) != 1 else nodes[0] 735 try: 736 socket.setdefaulttimeout(3) 737 _, stdout, _, ok = nm.ssh().ssh_exec(host, [nm.settings().start_remote_script, '--ros_log_path', request], user, pw, auto_pw_request, close_stdin=True, close_stderr=True) 738 if ok: 739 output = stdout.read() 740 stdout.close() 741 return output.strip() 742 else: 743 raise StartException(utf8(''.join(['Get log path from "', host, '" failed']))) 744 except nm.AuthenticationRequest as e: 745 raise nm.InteractionNeededError(e, cls.get_log_path, (host, nodes, auto_pw_request)) 746 finally: 747 socket.setdefaulttimeout(None)
748 749 @classmethod
750 - def openLog(cls, nodename, host, user=None, only_screen=False):
751 ''' 752 Opens the log file associated with the given node in a new terminal. 753 @param nodename: the name of the node (with name space) 754 @type nodename: C{str} 755 @param host: the host name or ip where the log file are 756 @type host: C{str} 757 @return: C{True}, if a log file was found 758 @rtype: C{bool} 759 @raise Exception: on errors while resolving host 760 @see: L{node_manager_fkie.is_local()} 761 ''' 762 rospy.loginfo("show log for '%s' on '%s'", utf8(nodename), utf8(host)) 763 title_opt = 'LOG %s on %s' % (nodename, host) 764 if nm.is_local(host): 765 found = False 766 screenLog = nm.screen().getScreenLogFile(node=nodename) 767 if os.path.isfile(screenLog): 768 cmd = nm.settings().terminal_cmd([nm.settings().log_viewer, screenLog], title_opt) 769 rospy.loginfo("open log: %s", cmd) 770 SupervisedPopen(shlex.split(cmd), object_id="Open log", description="Open log for '%s' on '%s'" % (utf8(nodename), utf8(host))) 771 found = True 772 # open roslog file 773 roslog = nm.screen().getROSLogFile(nodename) 774 if os.path.isfile(roslog) and not only_screen: 775 title_opt = title_opt.replace('LOG', 'ROSLOG') 776 cmd = nm.settings().terminal_cmd([nm.settings().log_viewer, roslog], title_opt) 777 rospy.loginfo("open ROS log: %s", cmd) 778 SupervisedPopen(shlex.split(cmd), object_id="Open log", description="Open log for '%s' on '%s'" % (utf8(nodename), utf8(host))) 779 found = True 780 return found 781 else: 782 ps = nm.ssh().ssh_x11_exec(host, [nm.settings().start_remote_script, '--show_screen_log', nodename], title_opt, user) 783 if not only_screen: 784 ps = nm.ssh().ssh_x11_exec(host, [nm.settings().start_remote_script, '--show_ros_log', nodename], title_opt.replace('LOG', 'ROSLOG'), user) 785 return False
786 787 @classmethod
788 - def deleteLog(cls, nodename, host, auto_pw_request=False, user=None, pw=None):
789 ''' 790 Deletes the log file associated with the given node. 791 @param nodename: the name of the node (with name space) 792 @type nodename: C{str} 793 @param host: the host name or ip where the log file are to delete 794 @type host: C{str} 795 @raise Exception: on errors while resolving host 796 @see: L{node_manager_fkie.is_local()} 797 ''' 798 rospy.loginfo("delete log for '%s' on '%s'", utf8(nodename), utf8(host)) 799 if nm.is_local(host): 800 screenLog = nm.screen().getScreenLogFile(node=nodename) 801 pidFile = nm.screen().getScreenPidFile(node=nodename) 802 roslog = nm.screen().getROSLogFile(nodename) 803 if os.path.isfile(screenLog): 804 os.remove(screenLog) 805 if os.path.isfile(pidFile): 806 os.remove(pidFile) 807 if os.path.isfile(roslog): 808 os.remove(roslog) 809 else: 810 try: 811 # output ignored: output, error, ok 812 _, stdout, _, ok = nm.ssh().ssh_exec(host, [nm.settings().start_remote_script, '--delete_logs', nodename], user, pw, auto_pw_request, close_stdin=True, close_stdout=False, close_stderr=True) 813 if ok: 814 stdout.readlines() 815 stdout.close() 816 except nm.AuthenticationRequest as e: 817 raise nm.InteractionNeededError(e, cls.deleteLog, (nodename, host, auto_pw_request))
818
819 - def kill(self, host, pid, auto_pw_request=False, user=None, pw=None):
820 ''' 821 Kills the process with given process id on given host. 822 @param host: the name or address of the host, where the process must be killed. 823 @type host: C{str} 824 @param pid: the process id 825 @type pid: C{int} 826 @raise StartException: on error 827 @raise Exception: on errors while resolving host 828 @see: L{node_manager_fkie.is_local()} 829 ''' 830 try: 831 self._kill_wo(host, pid, auto_pw_request, user, pw) 832 except nm.AuthenticationRequest as e: 833 raise nm.InteractionNeededError(e, self.kill, (host, pid, auto_pw_request))
834
835 - def _kill_wo(self, host, pid, auto_pw_request=False, user=None, pw=None):
836 rospy.loginfo("kill %s on %s", utf8(pid), host) 837 if nm.is_local(host): 838 os.kill(pid, signal.SIGKILL) 839 rospy.loginfo("kill: %s", utf8(pid)) 840 else: 841 # kill on a remote machine 842 cmd = ['kill -9', str(pid)] 843 _, stdout, stderr, ok = nm.ssh().ssh_exec(host, cmd, user, pw, False, close_stdin=True) 844 if ok: 845 output = stdout.read() 846 error = stderr.read() 847 stdout.close() 848 stderr.close() 849 if error: 850 rospy.logwarn("ERROR while kill %s: %s", utf8(pid), error) 851 raise StartException(utf8(''.join(['The host "', host, '" reports:\n', error]))) 852 if output: 853 rospy.logdebug("STDOUT while kill %s on %s: %s", utf8(pid), host, output)
854
855 - def killall_roscore(self, host, auto_pw_request=False, user=None, pw=None):
856 ''' 857 Kills all roscore processes on given host. 858 @param host: the name or address of the host, where the process must be killed. 859 @type host: C{str} 860 @raise StartException: on error 861 @raise Exception: on errors while resolving host 862 @see: L{node_manager_fkie.is_local()} 863 ''' 864 try: 865 self._killall_roscore_wo(host, auto_pw_request, user, pw) 866 except nm.AuthenticationRequest as e: 867 raise nm.InteractionNeededError(e, self.killall_roscore, (host, auto_pw_request))
868
869 - def _killall_roscore_wo(self, host, auto_pw_request=False, user=None, pw=None):
870 rospy.loginfo("killall roscore on %s", host) 871 cmd = ['killall', 'roscore'] 872 if nm.is_local(host): 873 SupervisedPopen(cmd, object_id="killall roscore", description="killall roscore") 874 else: 875 # kill on a remote machine 876 _, stdout, stderr, ok = nm.ssh().ssh_exec(host, cmd, user, pw, False, close_stdin=True) 877 if ok: 878 output = stdout.read() 879 error = stderr.read() 880 stdout.close() 881 stderr.close() 882 if error: 883 rospy.logwarn("ERROR while killall roscore on %s: %s" % (host, error)) 884 raise StartException('The host "%s" reports:\n%s' % (host, error)) 885 if output: 886 rospy.logdebug("STDOUT while killall roscore on %s: %s" % (host, output))
887
888 - def poweroff(self, host, auto_pw_request=False, user=None, pw=None):
889 ''' 890 poweroff given host. 891 @param host: the name or address of the host, where the process must be killed. 892 @type host: C{str} 893 @raise StartException: on error 894 @raise Exception: on errors while resolving host 895 @see: L{node_manager_fkie.is_local()} 896 ''' 897 try: 898 self._poweroff_wo(host, auto_pw_request, user, pw) 899 except nm.AuthenticationRequest as e: 900 raise nm.InteractionNeededError(e, self.poweroff, (host, auto_pw_request))
901
902 - def _poweroff_wo(self, host, auto_pw_request=False, user=None, pw=None):
903 if nm.is_local(host): 904 rospy.logwarn("shutdown localhost localhost!") 905 cmd = nm.settings().terminal_cmd(['sudo poweroff'], "poweroff") 906 SupervisedPopen(shlex.split(cmd), object_id="poweroff", description="poweroff") 907 else: 908 rospy.loginfo("poweroff %s", host) 909 # kill on a remote machine 910 cmd = ['sudo poweroff'] 911 _ = nm.ssh().ssh_x11_exec(host, cmd, 'Shutdown %s' % host, user)
912
913 - def rosclean(self, host, auto_pw_request=False, user=None, pw=None):
914 ''' 915 rosclean purge on given host. 916 @param host: the name or address of the host, where rosclean is called. 917 @type host: C{str} 918 @raise StartException: on error 919 @raise Exception: on errors while resolving host 920 @see: L{node_manager_fkie.is_local()} 921 ''' 922 try: 923 self._rosclean_wo(host, auto_pw_request, user, pw) 924 except nm.AuthenticationRequest as e: 925 raise nm.InteractionNeededError(e, self.poweroff, (host, auto_pw_request))
926
927 - def _rosclean_wo(self, host, auto_pw_request=False, user=None, pw=None):
928 if nm.is_local(host): 929 rospy.loginfo("rosclean purge on localhost!") 930 cmd = nm.settings().terminal_cmd(['rosclean purge -y'], "rosclean") 931 SupervisedPopen(shlex.split(cmd), object_id="rosclean", description="rosclean") 932 else: 933 rospy.loginfo("rosclean %s", host) 934 # kill on a remote machine 935 cmd = ['rosclean purge -y'] 936 _ = nm.ssh().ssh_x11_exec(host, cmd, 'rosclean purge on %s' % host, user)
937 938 @classmethod
939 - def transfer_files(cls, host, path, auto_pw_request=False, user=None, pw=None):
940 ''' 941 Copies the given file to the remote host. Uses caching of remote paths. 942 ''' 943 # get package of the file 944 if nm.is_local(host): 945 # it's local -> no copy needed 946 return 947 (pkg_name, pkg_path) = package_name(os.path.dirname(path)) 948 if pkg_name is not None: 949 # get the subpath of the file 950 subfile_path = path.replace(pkg_path, '') 951 # get the path of the package on the remote machine 952 try: 953 output = '' 954 error = '' 955 ok = True 956 if host in CACHED_PKG_PATH and pkg_name in CACHED_PKG_PATH[host]: 957 output = CACHED_PKG_PATH[host][pkg_name] 958 else: 959 if host not in CACHED_PKG_PATH: 960 CACHED_PKG_PATH[host] = dict() 961 _, stdout, stderr, ok = nm.ssh().ssh_exec(host, [nm.settings().start_remote_script, '--package', pkg_name], user, pw, auto_pw_request, close_stdin=True) 962 output = stdout.read() 963 error = stderr.read() 964 stdout.close() 965 stderr.close() 966 if ok: 967 if error: 968 rospy.logwarn("ERROR while transfer %s to %s: %s", path, host, error) 969 raise StartException(utf8(''.join(['The host "', host, '" reports:\n', error]))) 970 if output: 971 CACHED_PKG_PATH[host][pkg_name] = output 972 nm.ssh().transfer(host, path, os.path.join(output.strip(), subfile_path.strip(os.sep)), user) 973 else: 974 raise StartException("Remote host no returned any answer. Is there the new version of node_manager installed?") 975 else: 976 raise StartException("Can't get path from remote host. Is there the new version of node_manager installed?") 977 except nm.AuthenticationRequest as e: 978 raise nm.InteractionNeededError(e, cls.transfer_files, (host, path, auto_pw_request))
979 980 @classmethod
981 - def ntpdate(cls, host, cmd, user=None, pw=None):
982 ''' 983 Opens the log file associated with the given node in a new terminal. 984 @param host: the host name or ip where the log file are 985 @type host: C{str} 986 @param cmd: command to set the time 987 @type cmd: C{str} 988 @return: C{True}, if a log file was found 989 @rtype: C{bool} 990 @raise Exception: on errors while resolving host 991 @see: L{node_manager_fkie.is_local()} 992 ''' 993 mesg = "synchronize time on '%s' using '%s'" % (utf8(host), cmd) 994 rospy.loginfo(mesg) 995 title_opt = "ntpdate on %s" % str(host) # '%s on %s' % (cmd, host) 996 if nm.is_local(host): 997 cmd = nm.settings().terminal_cmd([cmd], title_opt, noclose=True) 998 rospy.loginfo("EXEC: %s" % cmd) 999 ps = SupervisedPopen(shlex.split(cmd), object_id=cmd, description=mesg) 1000 else: 1001 ps = nm.ssh().ssh_x11_exec(host, [cmd, ';echo "";echo "this terminal will be closed in 10 sec...";sleep 10'], title_opt, user) 1002 return False
1003