Package node_manager_fkie :: Module launch_config
[frames] | no frames]

Source Code for Module node_manager_fkie.launch_config

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2012, Fraunhofer FKIE/US, Alexander Tiderko 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Fraunhofer nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32   
 33  from python_qt_binding.QtCore import QFile, QObject, QRegExp 
 34  from xml.dom.minidom import parse  # , parseString 
 35  import os 
 36  import re 
 37  import sys 
 38  import time 
 39   
 40  import roslaunch 
 41  import roslib 
 42   
 43  from master_discovery_fkie.common import get_hostname, resolve_url 
 44  import node_manager_fkie as nm 
 45   
 46  from .common import package_name, resolve_paths, utf8 
47 48 49 # from xml.dom import Node as DomNode #avoid aliasing 50 -class LaunchConfigException(Exception):
51 pass
52
53 54 -class LaunchConfig(QObject):
55 ''' 56 A class to handle the ROS configuration stored in launch file. 57 ''' 58
59 - def __init__(self, launch_file, package=None, masteruri=None, argv=[]):
60 ''' 61 Creates the LaunchConfig object. The launch file will be not loaded on 62 creation, first on request of Roscfg value. 63 @param launch_file: The absolute or relative path with the launch file. 64 By using relative path a package must be valid for 65 remote launches. 66 @type launch_file: C{str} 67 @param package: the package containing the launch file. If None the 68 launch_file will be used to determine the launch file. 69 No remote launches a possible without a valid package. 70 @type package: C{str} or C{None} 71 @param masteruri: The URL of the ROS master. 72 @type masteruri: C{str} or C{None} 73 @param argv: the list the arguments needed for loading the given launch file 74 @type argv: C{[str]} 75 @raise roslaunch.XmlParseException: if the launch file can't be found. 76 ''' 77 QObject.__init__(self) 78 self.__launchFile = launch_file 79 self.__package = package_name(os.path.dirname(self.__launchFile))[0] if package is None else package 80 self.__masteruri = masteruri if masteruri is not None else 'localhost' 81 self.__roscfg = None 82 self.argv = argv 83 self.__reqTested = False 84 self.__argv_values = dict() 85 self.global_param_done = [] # masteruri's where the global parameters are registered 86 self.hostname = get_hostname(self.__masteruri) 87 self.__launch_id = '%.9f' % time.time() 88 nm.filewatcher().add_launch(self.__masteruri, self.__launchFile, self.__launch_id, [self.__launchFile])
89
90 - def __del__(self):
91 # Delete to avoid segfault if the LaunchConfig class is destroyed recently 92 # after creation and xmlrpclib.ServerProxy process a method call. 93 nm.filewatcher().rem_launch(self.__masteruri, self.__launchFile, self.__launch_id)
94 95 @property
96 - def masteruri(self):
97 ''' 98 Returns the master URI (host) where the node of this config will be started. 99 @rtype: C{str} 100 ''' 101 return self.__masteruri
102 103 @property
104 - def Roscfg(self):
105 ''' 106 Holds a loaded launch configuration. It raises a LaunchConfigException on load error. 107 @rtype: U{roslaunch.ROSLaunchConfig<http://docs.ros.org/kinetic/api/roslaunch/html/>} or C{None} 108 @see L{load()} 109 ''' 110 if self.__roscfg is not None: 111 return self.__roscfg 112 else: 113 result, _ = self.load(self.argv) # _:=argv 114 if not result: 115 raise LaunchConfigException("not all argv are setted properly!") 116 return self.__roscfg
117 118 @property
119 - def Filename(self):
120 ''' 121 Returns an existing path with file name or an empty string. 122 @rtype: C{str} 123 ''' 124 if os.path.isfile(self.__launchFile): 125 return self.__launchFile 126 elif self.__package is not None: 127 try: 128 return roslib.packages.find_resource(self.PackageName, self.LaunchName).pop() 129 except Exception: 130 raise LaunchConfigException(''.join(['launch file ', self.LaunchName, ' not found!'])) 131 raise LaunchConfigException(''.join(['launch file ', self.__launchFile, ' not found!']))
132 133 @property
134 - def LaunchName(self):
135 ''' 136 Returns the name of the launch file with extension, e.g. 'test.launch' 137 @rtype: C{str} 138 ''' 139 return os.path.basename(self.__launchFile)
140 141 @property
142 - def PackageName(self):
143 ''' 144 Returns the name of the package containing the launch file or None. 145 @rtype: C{str} or C{None} 146 ''' 147 return self.__package
148 149 @classmethod
150 - def _index(cls, text, regexp_list):
151 ''' 152 Searches in the given text for key indicates the including of a file and 153 return their index. 154 @param text: 155 @type text: C{str} 156 @param regexp_list: 157 @type regexp_list: C{[U{QRegExp<https://srinikom.github.io/pyside-docs/PySide/QtCore/QRegExp.html>},..]} 158 @return: the index of the including key or -1 159 @rtype: C{int} 160 ''' 161 for pattern in regexp_list: 162 index = pattern.indexIn(text) 163 if index > -1: 164 return index 165 return -1
166 167 @classmethod
168 - def interpretPath(cls, path, pwd='.'):
169 ''' 170 Tries to determine the path of the included file. The statement of 171 $(find 'package') will be resolved. 172 The supported URL begins with `file:///`, `package://` or `pkg://`. 173 The package URL will be resolved to a valid file path. If the file is in a 174 subdirectory, you can replace the subdirectory by `///`. 175 @param path: the sting which contains the included path 176 @type path: C{str} 177 @param pwd: current working path 178 @type pwd: C{str} 179 @return: C{$(find 'package')} will be resolved. The prefixes `file:///`, 180 `package://` or `pkg://` are also resolved. Otherwise the parameter 181 itself will be returned. 182 @rtype: C{str} 183 ''' 184 path = path.strip() 185 startIndex = path.find('$(') 186 if startIndex > -1: 187 endIndex = path.find(')', startIndex + 2) 188 script = path[startIndex + 2:endIndex].split() 189 if len(script) == 2 and (script[0] == 'find'): 190 pkg = roslib.packages.get_pkg_dir(script[1]) 191 return os.path.join(pkg, path[endIndex + 2:].strip(os.path.sep)) 192 elif len(path) > 0 and path[0] != os.path.sep: 193 try: 194 return resolve_url(path) 195 except ValueError, _: 196 if len(path) > 0 and path[0] != os.path.sep: 197 return os.path.normpath(''.join([pwd, os.path.sep, path])) 198 return path
199 200 @classmethod
201 - def included_files(cls, text_or_path, 202 regexp_retruns=[], 203 regexp_filelist=[QRegExp("\\btextfile\\b"), 204 QRegExp("\\bfile\\b"), 205 QRegExp("\\bdefault\\b"), 206 QRegExp("\\bvalue=.*pkg:\/\/\\b"), 207 QRegExp("\\bvalue=.*package:\/\/\\b"), 208 QRegExp("\\bvalue=.*\$\(find\\b"), 209 QRegExp("\\bargs=.*\$\(find\\b")], 210 recursive=True, unique=True):
211 ''' 212 :param regexp_retruns: the list with patterns which are returned as result. If empy it's the same as 'regexp_filelist' 213 :param regexp_filelist: the list with all patterns to find include files 214 ''' 215 result = [] 216 lines = [] 217 pwd = '.' 218 f = QFile(text_or_path) 219 if f.exists(): 220 pwd = os.path.dirname(text_or_path) 221 with open(text_or_path, 'r') as f: 222 content = f.read() 223 # remove the comments 224 comment_pattern = QRegExp("<!--.*?-->") 225 pos = comment_pattern.indexIn(content) 226 while pos != -1: 227 content = content[:pos] + content[pos + comment_pattern.matchedLength():] 228 pos = comment_pattern.indexIn(content) 229 lines = content.splitlines() 230 else: 231 lines = [text_or_path] 232 line_index = 0 233 for line in lines: 234 index = cls._index(line, regexp_filelist) 235 if index > -1: 236 startIndex = line.find('"', index) 237 if startIndex > -1: 238 endIndex = line.find('"', startIndex + 1) 239 fileName = line[startIndex + 1:endIndex] 240 if len(fileName) > 0: 241 try: 242 path = cls.interpretPath(fileName, pwd) 243 if os.path.isfile(path): 244 if not regexp_retruns or cls._index(line, regexp_retruns) > -1: 245 if not unique: 246 result.append((line_index, path)) 247 else: 248 result.append(path) 249 ext = os.path.splitext(path) 250 if recursive and ext[1] in nm.settings().SEARCH_IN_EXT: 251 result += cls.included_files(path, regexp_retruns, regexp_filelist) 252 except Exception: 253 import traceback 254 print traceback.format_exc() 255 line_index += 1 256 if unique: 257 return list(set(result)) 258 return result
259
260 - def load(self, argv):
261 ''' 262 @param argv: the list with argv parameter needed to load the launch file. 263 The name and value are separated by C{:=} 264 @type argv: C{[str]} 265 @return: True, if the launch file was loaded 266 @rtype: boolean 267 @raise LaunchConfigException: on load errors 268 ''' 269 try: 270 roscfg = roslaunch.ROSLaunchConfig() 271 loader = roslaunch.XmlLoader() 272 self.argv = self.resolveArgs(argv) 273 loader.load(self.Filename, roscfg, verbose=False, argv=self.argv) 274 self.__roscfg = roscfg 275 nm.filewatcher().add_launch(self.__masteruri, self.__launchFile, self.__launch_id, self.included_files(self.Filename)) 276 if not nm.is_local(get_hostname(self.__masteruri)): 277 files = self.included_files(self.Filename, 278 regexp_retruns=[QRegExp("\\bdefault\\b"), 279 QRegExp("\\bvalue=.*pkg:\/\/\\b"), 280 QRegExp("\\bvalue=.*package:\/\/\\b"), 281 QRegExp("\\bvalue=.*\$\(find\\b"), 282 QRegExp("\\bargs=.*\$\(find\\b")]) 283 nm.file_watcher_param().add_launch(self.__masteruri, 284 self.__launchFile, 285 self.__launch_id, 286 files) 287 except roslaunch.XmlParseException, e: 288 test = list(re.finditer(r"environment variable '\w+' is not set", utf8(e))) 289 message = utf8(e) 290 if test: 291 message = ''.join([message, '\n', 'environment substitution is not supported, use "arg" instead!']) 292 raise LaunchConfigException(message) 293 return True, self.argv
294
295 - def resolveArgs(self, argv):
296 argv_dict = self.argvToDict(argv) 297 # replace $(arg ...) in arg values 298 for k, _ in argv_dict.items(): 299 self._replaceArg(k, argv_dict, self.__argv_values) 300 return ["%s:=%s" % (k, v) for k, v in argv_dict.items()]
301
302 - def _replaceArg(self, arg, argv_defaults, argv_values):
303 ''' 304 Replace the arg-tags in the value in given argument recursively. 305 ''' 306 rec_inc = 0 307 value = argv_defaults[arg] 308 arg_match = re.search(r"\$\(\s*arg\s*", value) 309 while arg_match is not None: 310 rec_inc += 1 311 endIndex = value.find(')', arg_match.end()) 312 if endIndex > -1: 313 arg_name = value[arg_match.end():endIndex].strip() 314 if arg == arg_name: 315 raise LaunchConfigException("Can't resolve the argument `%s` argument: the argument referenced to itself!" % arg_name) 316 if rec_inc > 100: 317 raise LaunchConfigException("Can't resolve the argument `%s` in `%s` argument: recursion depth of 100 reached!" % (arg_name, arg)) 318 if arg_name in argv_defaults: 319 argv_defaults[arg] = value.replace(value[arg_match.start():endIndex + 1], argv_defaults[arg_name]) 320 elif arg_name in argv_values: 321 argv_defaults[arg] = value.replace(value[arg_match.start():endIndex + 1], argv_values[arg_name]) 322 else: 323 raise LaunchConfigException("Can't resolve the argument `%s` in `%s` argument" % (arg_name, arg)) 324 else: 325 raise LaunchConfigException("Can't resolve the argument in `%s` argument: `)` not found" % arg) 326 value = argv_defaults[arg] 327 arg_match = re.search(r"\$\(\s*arg\s*", value)
328
329 - def getArgs(self):
330 ''' 331 @return: a list with args being used in the roslaunch file. Only arg tags that are a direct child of <launch> will 332 be returned 333 @rtype: C{[str]} 334 @raise roslaunch.XmlParseException: on parse errors 335 ''' 336 self._argv_values = dict() 337 arg_subs = [] 338 args = [] 339 # get only the args in the top launch file 340 for filename in [self.Filename]: 341 try: 342 if filename.endswith('.launch'): 343 args[len(args):-1] = parse(filename).getElementsByTagName('arg') 344 except Exception as e: 345 raise roslaunch.XmlParseException("Invalid roslaunch XML syntax: %s" % e) 346 347 for arg in args: 348 arg_name = arg.getAttribute("name") 349 if not arg_name: 350 raise roslaunch.XmlParseException("arg tag needs a name, xml is %s" % arg.toxml()) 351 352 # we only want argsargs at top level: 353 if not arg.parentNode.tagName == "launch": 354 continue 355 356 arg_default = arg.getAttribute("default") 357 arg_value = arg.getAttribute("value") 358 arg_sub = ''.join([arg_name, ':=', arg_default]) 359 if (not arg_value) and arg_sub not in arg_subs: 360 arg_subs.append(arg_sub) 361 elif arg_value: 362 self.__argv_values[arg_name] = arg_value 363 364 return arg_subs
365
366 - def _decode(self, val):
367 result = val.replace("\\n ", "\n") 368 try: 369 result = result.decode(sys.getfilesystemencoding()) 370 except: 371 pass 372 return result
373
374 - def getRobotDescr(self):
375 ''' 376 Parses the launch file for C{robots} parameter to get the description of the 377 robot. 378 @return: the robot description stored in the configuration 379 @rtype: C{dict(robot:dict('type' :str, 'name': str, 'images' : [str], 'description': str))} 380 ''' 381 result = dict() 382 if self.Roscfg is not None: 383 for param, p in self.Roscfg.params.items(): 384 if param.endswith('robots'): 385 if isinstance(p.value, list): 386 if len(p.value) > 0 and len(p.value[0]) != 5: 387 print "WRONG format, expected: ['host', 'type', 'name', 'images', 'description'] -> ignore", param 388 else: 389 for entry in p.value: 390 result[entry[0]] = {'type': entry[1], 'name': entry[2], 'images': resolve_paths(entry[3]).split(','), 'description': resolve_paths(self._decode(entry[4]))} 391 return result
392
393 - def getCapabilitiesDesrc(self):
394 ''' 395 Parses the launch file for C{capabilities} and C{capability_group} parameter 396 and creates dictionary for grouping the nodes. 397 @return: the capabilities description stored in this configuration 398 @rtype: C{dict(machine : dict(namespace: dict(group:dict('type' : str, 'images' : [str], 'description' : str, 'nodes' : [str]))))} 399 ''' 400 result = dict() 401 capabilies_descr = dict() 402 if self.Roscfg is not None: 403 # get the capabilities description 404 # use two separate loops, to create the description list first 405 # TODO read the group description depending on namespace 406 for param, p in self.Roscfg.params.items(): 407 if param.endswith('capabilities'): 408 if isinstance(p.value, list): 409 if len(p.value) > 0 and len(p.value[0]) != 4: 410 print "WRONG format, expected: ['name', 'type', 'images', 'description'] -> ignore", param 411 else: 412 for entry in p.value: 413 capabilies_descr[entry[0]] = {'type': ''.join([entry[1]]), 'images': resolve_paths(entry[2]).split(','), 'description': resolve_paths(self._decode(entry[3]))} 414 # get the capability nodes 415 for item in self.Roscfg.nodes: 416 node_fullname = roslib.names.ns_join(item.namespace, item.name) 417 machine_name = item.machine_name if item.machine_name is not None and not item.machine_name == 'localhost' else '' 418 added = False 419 cap_param = roslib.names.ns_join(node_fullname, 'capability_group') 420 cap_ns = node_fullname 421 # find the capability group parameter in namespace 422 while cap_param not in self.Roscfg.params and cap_param.count(roslib.names.SEP) > 1: 423 cap_ns = roslib.names.namespace(cap_ns).rstrip(roslib.names.SEP) 424 if not cap_ns: 425 cap_ns = roslib.names.SEP 426 cap_param = roslib.names.ns_join(cap_ns, 'capability_group') 427 if cap_ns == node_fullname: 428 cap_ns = item.namespace.rstrip(roslib.names.SEP) 429 if not cap_ns: 430 cap_ns = roslib.names.SEP 431 # if the 'capability_group' parameter found, assign node to the group 432 if cap_param in self.Roscfg.params and self.Roscfg.params[cap_param].value: 433 p = self.Roscfg.params[cap_param] 434 if machine_name not in result: 435 result[machine_name] = dict() 436 for (ns, groups) in result[machine_name].items(): 437 if ns == cap_ns and p.value in groups: 438 groups[p.value]['nodes'].append(node_fullname) 439 added = True 440 break 441 if not added: 442 ns = cap_ns 443 # add new group in the namespace of the node 444 if ns not in result[machine_name]: 445 result[machine_name][ns] = dict() 446 if p.value not in result[machine_name][ns]: 447 try: 448 result[machine_name][ns][p.value] = {'type': capabilies_descr[p.value]['type'], 449 'images': capabilies_descr[p.value]['images'], 450 'description': capabilies_descr[p.value]['description'], 451 'nodes': []} 452 except: 453 result[machine_name][ns][p.value] = {'type': '', 454 'images': [], 455 'description': '', 456 'nodes': []} 457 result[machine_name][ns][p.value]['nodes'].append(node_fullname) 458 return result
459
460 - def argvToDict(self, argv):
461 result = dict() 462 for a in argv: 463 key, sep, value = a.partition(':=') 464 if sep: 465 result[key] = value 466 return result
467
468 - def getNode(self, name):
469 ''' 470 Returns a configuration node for a given node name. 471 @param name: the name of the node. 472 @type name: C{str} 473 @return: the configuration node stored in this configuration 474 @rtype: U{roslaunch.Node<http://docs.ros.org/kinetic/api/roslaunch/html/>} or C{None} 475 ''' 476 nodename = os.path.basename(name) 477 namespace = os.path.dirname(name).strip(roslib.names.SEP) 478 for item in self.Roscfg.nodes: 479 if (item.name == nodename) and (item.namespace.strip(roslib.names.SEP) == namespace): 480 return item 481 return None
482
483 - def get_robot_icon(self):
484 ''' 485 Returns the value of the `/robot_icon` parameter or None 486 ''' 487 try: 488 return self.Roscfg.params['/robot_icon'].value 489 except: 490 pass 491 return None
492