Package node_manager_fkie :: Module launch_config
[frames] | no frames]

Source Code for Module node_manager_fkie.launch_config

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2012, Fraunhofer FKIE/US, Alexander Tiderko 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Fraunhofer nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32   
 33  from python_qt_binding.QtCore import QObject, QRegExp 
 34  from xml.dom.minidom import parse  # , parseString 
 35  import os 
 36  import re 
 37  import sys 
 38  import time 
 39   
 40  import roslaunch 
 41  import roslib 
 42   
 43  from master_discovery_fkie.common import get_hostname, resolve_url 
 44  import node_manager_fkie as nm 
 45   
 46  from .common import package_name, resolve_paths 
47 48 49 # from xml.dom import Node as DomNode #avoid aliasing 50 -class LaunchConfigException(Exception):
51 pass
52
53 54 -class LaunchConfig(QObject):
55 ''' 56 A class to handle the ROS configuration stored in launch file. 57 ''' 58
59 - def __init__(self, launch_file, package=None, masteruri=None, argv=[]):
60 ''' 61 Creates the LaunchConfig object. The launch file will be not loaded on 62 creation, first on request of Roscfg value. 63 @param launch_file: The absolute or relative path with the launch file. 64 By using relative path a package must be valid for 65 remote launches. 66 @type launch_file: C{str} 67 @param package: the package containing the launch file. If None the 68 launch_file will be used to determine the launch file. 69 No remote launches a possible without a valid package. 70 @type package: C{str} or C{None} 71 @param masteruri: The URL of the ROS master. 72 @type masteruri: C{str} or C{None} 73 @param argv: the list the arguments needed for loading the given launch file 74 @type argv: C{[str]} 75 @raise roslaunch.XmlParseException: if the launch file can't be found. 76 ''' 77 QObject.__init__(self) 78 self.__launchFile = launch_file 79 self.__package = package_name(os.path.dirname(self.__launchFile))[0] if package is None else package 80 self.__masteruri = masteruri if masteruri is not None else 'localhost' 81 self.__roscfg = None 82 self.argv = argv 83 self.__reqTested = False 84 self.__argv_values = dict() 85 self.global_param_done = [] # masteruri's where the global parameters are registered 86 self.hostname = get_hostname(self.__masteruri) 87 self.__launch_id = '%.9f' % time.time() 88 nm.filewatcher().add_launch(self.__masteruri, self.__launchFile, self.__launch_id, [self.__launchFile])
89 # nm.filewatcher().add_launch(self.__masteruri, self.__launchFile, self.__launch_id, self.getIncludedFiles(self.Filename)) 90
91 - def __del__(self):
92 # Delete to avoid segfault if the LaunchConfig class is destroyed recently 93 # after creation and xmlrpclib.ServerProxy process a method call. 94 nm.filewatcher().rem_launch(self.__masteruri, self.__launchFile, self.__launch_id)
95 96 @property
97 - def masteruri(self):
98 ''' 99 Returns the master URI (host) where the node of this config will be started. 100 @rtype: C{str} 101 ''' 102 return self.__masteruri
103 104 @property
105 - def Roscfg(self):
106 ''' 107 Holds a loaded launch configuration. It raises a LaunchConfigException on load error. 108 @rtype: U{roslaunch.ROSLaunchConfig<http://docs.ros.org/kinetic/api/roslaunch/html/>} or C{None} 109 @see L{load()} 110 ''' 111 if self.__roscfg is not None: 112 return self.__roscfg 113 else: 114 result, _ = self.load(self.argv) # _:=argv 115 if not result: 116 raise LaunchConfigException("not all argv are setted properly!") 117 return self.__roscfg
118 119 @property
120 - def Filename(self):
121 ''' 122 Returns an existing path with file name or an empty string. 123 @rtype: C{str} 124 ''' 125 if os.path.isfile(self.__launchFile): 126 return self.__launchFile 127 elif self.__package is not None: 128 try: 129 return roslib.packages.find_resource(self.PackageName, self.LaunchName).pop() 130 except Exception: 131 raise LaunchConfigException(''.join(['launch file ', self.LaunchName, ' not found!'])) 132 raise LaunchConfigException(''.join(['launch file ', self.__launchFile, ' not found!']))
133 134 @property
135 - def LaunchName(self):
136 ''' 137 Returns the name of the launch file with extension, e.g. 'test.launch' 138 @rtype: C{str} 139 ''' 140 return os.path.basename(self.__launchFile)
141 142 @property
143 - def PackageName(self):
144 ''' 145 Returns the name of the package containing the launch file or None. 146 @rtype: C{str} or C{None} 147 ''' 148 return self.__package
149 150 @classmethod
151 - def _index(cls, text, regexp_list):
152 ''' 153 Searches in the given text for key indicates the including of a file and 154 return their index. 155 @param text: 156 @type text: C{str} 157 @param regexp_list: 158 @type regexp_list: C{[U{QRegExp<https://srinikom.github.io/pyside-docs/PySide/QtCore/QRegExp.html>},..]} 159 @return: the index of the including key or -1 160 @rtype: C{int} 161 ''' 162 for pattern in regexp_list: 163 index = pattern.indexIn(text) 164 if index > -1: 165 return index 166 return -1
167 168 @classmethod
169 - def interpretPath(cls, path, pwd='.'):
170 ''' 171 Tries to determine the path of the included file. The statement of 172 $(find 'package') will be resolved. 173 The supported URL begins with `file:///`, `package://` or `pkg://`. 174 The package URL will be resolved to a valid file path. If the file is in a 175 subdirectory, you can replace the subdirectory by `///`. 176 @param path: the sting which contains the included path 177 @type path: C{str} 178 @param pwd: current working path 179 @type pwd: C{str} 180 @return: C{$(find 'package')} will be resolved. The prefixes `file:///`, 181 `package://` or `pkg://` are also resolved. Otherwise the parameter 182 itself will be returned. 183 @rtype: C{str} 184 ''' 185 path = path.strip() 186 startIndex = path.find('$(') 187 if startIndex > -1: 188 endIndex = path.find(')', startIndex + 2) 189 script = path[startIndex + 2:endIndex].split() 190 if len(script) == 2 and (script[0] == 'find'): 191 pkg = roslib.packages.get_pkg_dir(script[1]) 192 return os.path.join(pkg, path[endIndex + 2:].strip(os.path.sep)) 193 elif len(path) > 0 and path[0] != os.path.sep: 194 try: 195 return resolve_url(path) 196 except ValueError, _: 197 if len(path) > 0 and path[0] != os.path.sep: 198 return os.path.normpath(''.join([pwd, os.path.sep, path])) 199 return path
200 201 @classmethod
202 - def getIncludedFiles(cls, inc_file, regexp_list=[QRegExp("\\binclude\\b"), 203 QRegExp("\\btextfile\\b"), 204 QRegExp("\\bfile\\b"), 205 QRegExp("\\bdefault\\b"), 206 QRegExp("\\bvalue=.*pkg:\/\/\\b"), 207 QRegExp("\\bvalue=.*package:\/\/\\b"), 208 QRegExp("\\bvalue=.*\$\(find\\b")]):
209 ''' 210 Reads the configuration file and searches for included files. This files 211 will be returned in a list. 212 @param inc_file: path of the ROS launch file 213 @param regexp_list: pattern of 214 @return: the list with all files needed for the configuration 215 @rtype: C{[str,...]} 216 ''' 217 result = set() 218 with open(inc_file, 'r') as f: 219 content = f.read() 220 # remove the comments 221 comment_pattern = QRegExp("<!--.*?-->") 222 pos = comment_pattern.indexIn(content) 223 while pos != -1: 224 content = content[:pos] + content[pos + comment_pattern.matchedLength():] 225 pos = comment_pattern.indexIn(content) 226 lines = content.splitlines() 227 for line in lines: 228 index = cls._index(line, regexp_list) 229 if index > -1: 230 startIndex = line.find('"', index) 231 if startIndex > -1: 232 endIndex = line.find('"', startIndex + 1) 233 fileName = line[startIndex + 1:endIndex] 234 if len(fileName) > 0: 235 try: 236 path = cls.interpretPath(fileName, os.path.dirname(inc_file)) 237 if os.path.isfile(path): 238 result.add(path) 239 if path.endswith('.launch'): 240 result.update(cls.getIncludedFiles(path, regexp_list)) 241 except: 242 pass 243 return list(result)
244
245 - def load(self, argv):
246 ''' 247 @param argv: the list with argv parameter needed to load the launch file. 248 The name and value are separated by C{:=} 249 @type argv: C{[str]} 250 @return: True, if the launch file was loaded 251 @rtype: boolean 252 @raise LaunchConfigException: on load errors 253 ''' 254 try: 255 roscfg = roslaunch.ROSLaunchConfig() 256 loader = roslaunch.XmlLoader() 257 self.argv = self.resolveArgs(argv) 258 loader.load(self.Filename, roscfg, verbose=False, argv=self.argv) 259 self.__roscfg = roscfg 260 nm.filewatcher().add_launch(self.__masteruri, self.__launchFile, self.__launch_id, self.getIncludedFiles(self.Filename)) 261 if not nm.is_local(get_hostname(self.__masteruri)): 262 files = self.getIncludedFiles(self.Filename, 263 regexp_list=[QRegExp("\\bdefault\\b"), 264 QRegExp("\\bvalue=.*pkg:\/\/\\b"), 265 QRegExp("\\bvalue=.*package:\/\/\\b"), 266 QRegExp("\\bvalue=.*\$\(find\\b")]) 267 nm.file_watcher_param().add_launch(self.__masteruri, 268 self.__launchFile, 269 self.__launch_id, 270 files) 271 except roslaunch.XmlParseException, e: 272 test = list(re.finditer(r"environment variable '\w+' is not set", str(e))) 273 message = str(e) 274 if test: 275 message = ''.join([message, '\n', 'environment substitution is not supported, use "arg" instead!']) 276 raise LaunchConfigException(message) 277 return True, self.argv
278
279 - def resolveArgs(self, argv):
280 argv_dict = self.argvToDict(argv) 281 # replace $(arg ...) in arg values 282 for k, _ in argv_dict.items(): 283 self._replaceArg(k, argv_dict, self.__argv_values) 284 return ["%s:=%s" % (k, v) for k, v in argv_dict.items()]
285
286 - def _replaceArg(self, arg, argv_defaults, argv_values):
287 ''' 288 Replace the arg-tags in the value in given argument recursively. 289 ''' 290 rec_inc = 0 291 value = argv_defaults[arg] 292 arg_match = re.search(r"\$\(\s*arg\s*", value) 293 while arg_match is not None: 294 rec_inc += 1 295 endIndex = value.find(')', arg_match.end()) 296 if endIndex > -1: 297 arg_name = value[arg_match.end():endIndex].strip() 298 if arg == arg_name: 299 raise LaunchConfigException("Can't resolve the argument `%s` argument: the argument referenced to itself!" % arg_name) 300 if rec_inc > 100: 301 raise LaunchConfigException("Can't resolve the argument `%s` in `%s` argument: recursion depth of 100 reached!" % (arg_name, arg)) 302 if arg_name in argv_defaults: 303 argv_defaults[arg] = value.replace(value[arg_match.start():endIndex + 1], argv_defaults[arg_name]) 304 elif arg_name in argv_values: 305 argv_defaults[arg] = value.replace(value[arg_match.start():endIndex + 1], argv_values[arg_name]) 306 else: 307 raise LaunchConfigException("Can't resolve the argument `%s` in `%s` argument" % (arg_name, arg)) 308 else: 309 raise LaunchConfigException("Can't resolve the argument in `%s` argument: `)` not found" % arg) 310 value = argv_defaults[arg] 311 arg_match = re.search(r"\$\(\s*arg\s*", value)
312
313 - def getArgs(self):
314 ''' 315 @return: a list with args being used in the roslaunch file. Only arg tags that are a direct child of <launch> will 316 be returned 317 @rtype: C{[str]} 318 @raise roslaunch.XmlParseException: on parse errors 319 ''' 320 self._argv_values = dict() 321 arg_subs = [] 322 args = [] 323 # for filename in self.getIncludedFiles(self.Filename): 324 # get only the args in the top launch file 325 for filename in [self.Filename]: 326 try: 327 if filename.endswith('.launch'): 328 args[len(args):-1] = parse(filename).getElementsByTagName('arg') 329 except Exception as e: 330 raise roslaunch.XmlParseException("Invalid roslaunch XML syntax: %s" % e) 331 332 for arg in args: 333 arg_name = arg.getAttribute("name") 334 if not arg_name: 335 raise roslaunch.XmlParseException("arg tag needs a name, xml is %s" % arg.toxml()) 336 337 # we only want argsargs at top level: 338 if not arg.parentNode.tagName == "launch": 339 continue 340 341 arg_default = arg.getAttribute("default") 342 arg_value = arg.getAttribute("value") 343 arg_sub = ''.join([arg_name, ':=', arg_default]) 344 if (not arg_value) and arg_sub not in arg_subs: 345 arg_subs.append(arg_sub) 346 elif arg_value: 347 self.__argv_values[arg_name] = arg_value 348 349 return arg_subs
350
351 - def _decode(self, val):
352 result = val.replace("\\n ", "\n") 353 try: 354 result = result.decode(sys.getfilesystemencoding()) 355 except: 356 pass 357 return result
358
359 - def getRobotDescr(self):
360 ''' 361 Parses the launch file for C{robots} parameter to get the description of the 362 robot. 363 @return: the robot description stored in the configuration 364 @rtype: C{dict(robot:dict('type' :str, 'name': str, 'images' : [str], 'description': str))} 365 ''' 366 result = dict() 367 if self.Roscfg is not None: 368 for param, p in self.Roscfg.params.items(): 369 if param.endswith('robots'): 370 if isinstance(p.value, list): 371 if len(p.value) > 0 and len(p.value[0]) != 5: 372 print "WRONG format, expected: ['host', 'type', 'name', 'images', 'description'] -> ignore", param 373 else: 374 for entry in p.value: 375 result[entry[0]] = {'type': entry[1], 'name': entry[2], 'images': resolve_paths(entry[3]).split(','), 'description': resolve_paths(self._decode(entry[4]))} 376 return result
377
378 - def getCapabilitiesDesrc(self):
379 ''' 380 Parses the launch file for C{capabilities} and C{capability_group} parameter 381 and creates dictionary for grouping the nodes. 382 @return: the capabilities description stored in this configuration 383 @rtype: C{dict(machine : dict(namespace: dict(group:dict('type' : str, 'images' : [str], 'description' : str, 'nodes' : [str]))))} 384 ''' 385 result = dict() 386 capabilies_descr = dict() 387 if self.Roscfg is not None: 388 # get the capabilities description 389 # use two separate loops, to create the description list first 390 # TODO read the group description depending on namespace 391 for param, p in self.Roscfg.params.items(): 392 if param.endswith('capabilities'): 393 if isinstance(p.value, list): 394 if len(p.value) > 0 and len(p.value[0]) != 4: 395 print "WRONG format, expected: ['name', 'type', 'images', 'description'] -> ignore", param 396 else: 397 for entry in p.value: 398 capabilies_descr[entry[0]] = {'type': ''.join([entry[1]]), 'images': resolve_paths(entry[2]).split(','), 'description': resolve_paths(self._decode(entry[3]))} 399 # get the capability nodes 400 for item in self.Roscfg.nodes: 401 node_fullname = roslib.names.ns_join(item.namespace, item.name) 402 machine_name = item.machine_name if item.machine_name is not None and not item.machine_name == 'localhost' else '' 403 added = False 404 cap_param = roslib.names.ns_join(node_fullname, 'capability_group') 405 cap_ns = node_fullname 406 # find the capability group parameter in namespace 407 while cap_param not in self.Roscfg.params and cap_param.count(roslib.names.SEP) > 1: 408 cap_ns = roslib.names.namespace(cap_ns).rstrip(roslib.names.SEP) 409 if not cap_ns: 410 cap_ns = roslib.names.SEP 411 cap_param = roslib.names.ns_join(cap_ns, 'capability_group') 412 if cap_ns == node_fullname: 413 cap_ns = item.namespace.rstrip(roslib.names.SEP) 414 if not cap_ns: 415 cap_ns = roslib.names.SEP 416 # if the 'capability_group' parameter found, assign node to the group 417 if cap_param in self.Roscfg.params and self.Roscfg.params[cap_param].value: 418 p = self.Roscfg.params[cap_param] 419 if machine_name not in result: 420 result[machine_name] = dict() 421 for (ns, groups) in result[machine_name].items(): 422 if ns == cap_ns and p.value in groups: 423 groups[p.value]['nodes'].append(node_fullname) 424 added = True 425 break 426 if not added: 427 ns = cap_ns 428 # add new group in the namespace of the node 429 if ns not in result[machine_name]: 430 result[machine_name][ns] = dict() 431 if p.value not in result[machine_name][ns]: 432 try: 433 result[machine_name][ns][p.value] = {'type': capabilies_descr[p.value]['type'], 434 'images': capabilies_descr[p.value]['images'], 435 'description': capabilies_descr[p.value]['description'], 436 'nodes': []} 437 except: 438 result[machine_name][ns][p.value] = {'type': '', 439 'images': [], 440 'description': '', 441 'nodes': []} 442 result[machine_name][ns][p.value]['nodes'].append(node_fullname) 443 return result
444
445 - def argvToDict(self, argv):
446 result = dict() 447 for a in argv: 448 key, sep, value = a.partition(':=') 449 if sep: 450 result[key] = value 451 return result
452
453 - def getNode(self, name):
454 ''' 455 Returns a configuration node for a given node name. 456 @param name: the name of the node. 457 @type name: C{str} 458 @return: the configuration node stored in this configuration 459 @rtype: U{roslaunch.Node<http://docs.ros.org/kinetic/api/roslaunch/html/>} or C{None} 460 ''' 461 nodename = os.path.basename(name) 462 namespace = os.path.dirname(name).strip(roslib.names.SEP) 463 for item in self.Roscfg.nodes: 464 if (item.name == nodename) and (item.namespace.strip(roslib.names.SEP) == namespace): 465 return item 466 return None
467
468 - def get_robot_icon(self):
469 ''' 470 Returns the value of the `/robot_icon` parameter or None 471 ''' 472 try: 473 return self.Roscfg.params['/robot_icon'].value 474 except: 475 pass 476 return None
477