Package node_manager_fkie :: Module launch_config
[frames] | no frames]

Source Code for Module node_manager_fkie.launch_config

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2012, Fraunhofer FKIE/US, Alexander Tiderko 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Fraunhofer nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32   
 33  import os 
 34  import re 
 35  import sys 
 36  import time 
 37  import roslaunch 
 38  import roslib 
 39   
 40  from xml.dom.minidom import parse#, parseString 
 41  #from xml.dom import Node as DomNode #avoid aliasing 
 42   
 43  from python_qt_binding import QtCore 
 44   
 45  from master_discovery_fkie.common import resolve_url 
 46  import node_manager_fkie as nm 
 47  from common import package_name, resolve_paths 
48 49 -class LaunchConfigException(Exception):
50 pass
51
52 53 -class LaunchConfig(QtCore.QObject):
54 ''' 55 A class to handle the ROS configuration stored in launch file. 56 ''' 57
58 - def __init__(self, launch_file, package=None, masteruri=None, argv=[]):
59 ''' 60 Creates the LaunchConfig object. The launch file will be not loaded on 61 creation, first on request of Roscfg value. 62 @param launch_file: The absolute or relative path with the launch file. 63 By using relative path a package must be valid for 64 remote launches. 65 @type launch_file: C{str} 66 @param package: the package containing the launch file. If None the 67 launch_file will be used to determine the launch file. 68 No remote launches a possible without a valid package. 69 @type package: C{str} or C{None} 70 @param masteruri: The URL of the ROS master. 71 @type masteruri: C{str} or C{None} 72 @param argv: the list the arguments needed for loading the given launch file 73 @type argv: C{[str]} 74 @raise roslaunch.XmlParseException: if the launch file can't be found. 75 ''' 76 QtCore.QObject.__init__(self) 77 self.__launchFile = launch_file 78 self.__package = package_name(os.path.dirname(self.__launchFile))[0] if package is None else package 79 self.__masteruri = masteruri if not masteruri is None else 'localhost' 80 self.__roscfg = None 81 self.argv = argv 82 self.__reqTested = False 83 self.__argv_values = dict() 84 self.global_param_done = [] # masteruri's where the global parameters are registered 85 self.hostname = nm.nameres().getHostname(self.__masteruri) 86 self.__launch_id = '%.9f'%time.time() 87 nm.file_watcher().add(self.__masteruri, self.__launchFile, self.__launch_id, [self.__launchFile])
88 # nm.file_watcher().add(self.__masteruri, self.__launchFile, self.__launch_id, self.getIncludedFiles(self.Filename)) 89 90
91 - def __del__(self):
92 # Delete to avoid segfault if the LaunchConfig class is destroyed recently 93 # after creation and xmlrpclib.ServerProxy process a method call. 94 # del self.file_watcher 95 nm.file_watcher().rem(self.__masteruri, self.__launchFile, self.__launch_id)
96 97 @property
98 - def masteruri(self):
99 ''' 100 Returns the master URI (host) where the node of this config will be started. 101 @rtype: C{str} 102 ''' 103 return self.__masteruri
104 105 @property
106 - def Roscfg(self):
107 ''' 108 Returns a loaded launch configuration 109 @rtype: L{roslaunch.ROSLaunchConfig} or C{None} 110 @raise LaunchConfigException: on load error 111 @see L{load()} 112 ''' 113 if not (self.__roscfg is None): 114 return self.__roscfg 115 else: 116 result, _ = self.load(self.argv)#_:=argv 117 if not result: 118 raise LaunchConfigException("not all argv are setted properly!") 119 return self.__roscfg
120 121 @property
122 - def Filename(self):
123 ''' 124 Returns an existing path with file name or an empty string. 125 @rtype: C{str} 126 ''' 127 if os.path.isfile(self.__launchFile): 128 return self.__launchFile 129 elif not (self.__package is None): 130 try: 131 return roslib.packages.find_resource(self.PackageName, self.LaunchName).pop() 132 except Exception: 133 raise LaunchConfigException(''.join(['launch file ', self.LaunchName, ' not found!'])) 134 raise LaunchConfigException(''.join(['launch file ', self.__launchFile, ' not found!']))
135 136 @property
137 - def LaunchName(self):
138 ''' 139 Returns the name of the launch file with extension, e.g. 'test.launch' 140 @rtype: C{str} 141 ''' 142 return os.path.basename(self.__launchFile)
143 144 @property
145 - def PackageName(self):
146 ''' 147 Returns the name of the package containing the launch file or None. 148 @rtype: C{str} or C{None} 149 ''' 150 return self.__package
151 152 @classmethod
153 - def _index(cls, text, regexp_list):
154 ''' 155 Searches in the given text for key indicates the including of a file and 156 return their index. 157 @param text: 158 @type text: C{str} 159 @param regexp_list: 160 @type regexp_list: C{[L{QtCore.QRegExp},..]} 161 @return: the index of the including key or -1 162 @rtype: C{int} 163 ''' 164 for pattern in regexp_list: 165 index = pattern.indexIn(text) 166 if index > -1: 167 return index 168 return -1
169 170 @classmethod
171 - def interpretPath(cls, path, pwd='.'):
172 ''' 173 Tries to determine the path of the included file. The statement of 174 $(find 'package') will be resolved. 175 The supported URL begins with `file:///`, `package://` or `pkg://`. 176 The package URL will be resolved to a valid file path. If the file is in a 177 subdirectory, you can replace the subdirectory by `///`. 178 @param path: the sting which contains the included path 179 @type path: C{str} 180 @param pwd: current working path 181 @type pwd: C{str} 182 @return: if no leading L{os.sep} is detected, the path set by L{setCurrentPath()} 183 will be prepend. C{$(find 'package')} will be resolved. Otherwise the parameter 184 itself will be returned 185 @rtype: C{str} 186 ''' 187 path = path.strip() 188 startIndex = path.find('$(') 189 if startIndex > -1: 190 endIndex = path.find(')', startIndex+2) 191 script = path[startIndex+2:endIndex].split() 192 if len(script) == 2 and (script[0] == 'find'): 193 pkg = roslib.packages.get_pkg_dir(script[1]) 194 return os.path.join(pkg, path[endIndex+2:].strip(os.path.sep)) 195 elif len(path) > 0 and path[0] != os.path.sep: 196 try: 197 return resolve_url(path) 198 except ValueError, _: 199 if len(path) > 0 and path[0] != os.path.sep: 200 return os.path.normpath(''.join([pwd, os.path.sep, path])) 201 return path
202 203 @classmethod
204 - def getIncludedFiles(cls, inc_file, regexp_list=[QtCore.QRegExp("\\binclude\\b"), 205 QtCore.QRegExp("\\btextfile\\b"), 206 QtCore.QRegExp("\\bfile\\b")]):
207 ''' 208 Reads the configuration file and searches for included files. This files 209 will be returned in a list. 210 @param inc_file: path of the ROS launch file 211 @param regexp_list: pattern of 212 @return: the list with all files needed for the configuration 213 @rtype: C{[str,...]} 214 ''' 215 result = set() 216 with open(inc_file, 'r') as f: 217 content = f.read() 218 # remove the comments 219 comment_pattern = QtCore.QRegExp("<!--.*?-->") 220 pos = comment_pattern.indexIn(content) 221 while pos != -1: 222 content = content[:pos] + content[pos+comment_pattern.matchedLength():] 223 pos = comment_pattern.indexIn(content) 224 lines = content.splitlines() 225 for line in lines: 226 index = cls._index(line, regexp_list) 227 if index > -1: 228 startIndex = line.find('"', index) 229 if startIndex > -1: 230 endIndex = line.find('"', startIndex+1) 231 fileName = line[startIndex+1:endIndex] 232 if len(fileName) > 0: 233 try: 234 path = cls.interpretPath(fileName, os.path.dirname(inc_file)) 235 if os.path.isfile(path): 236 result.add(path) 237 if path.endswith('.launch'): 238 result.update(cls.getIncludedFiles(path, regexp_list)) 239 except: 240 pass 241 return list(result)
242
243 - def load(self, argv):
244 ''' 245 @param argv: the list with argv parameter needed to load the launch file. 246 The name and value are separated by C{:=} 247 @type argv: C{[str]} 248 @return True, if the launch file was loaded 249 @rtype boolean 250 @raise LaunchConfigException: on load errors 251 ''' 252 try: 253 roscfg = roslaunch.ROSLaunchConfig() 254 loader = roslaunch.XmlLoader() 255 self.argv = self.resolveArgs(argv) 256 loader.load(self.Filename, roscfg, verbose=False, argv=self.argv) 257 self.__roscfg = roscfg 258 nm.file_watcher().add(self.__masteruri, self.__launchFile, self.__launch_id, self.getIncludedFiles(self.Filename)) 259 if not nm.is_local(nm.nameres().getHostname(self.__masteruri)): 260 nm.file_watcher_param().add(self.__masteruri, self.__launchFile, self.__launch_id, 261 self.getIncludedFiles(self.Filename, 262 regexp_list = [QtCore.QRegExp("\\bvalue=.*pkg:\/\/\\b"), 263 QtCore.QRegExp("\\bvalue=.*package:\/\/\\b"), 264 QtCore.QRegExp("\\bvalue=.*\$\(find\\b")]) 265 ) 266 except roslaunch.XmlParseException, e: 267 test = list(re.finditer(r"environment variable '\w+' is not set", str(e))) 268 message = str(e) 269 if test: 270 message = ''.join([message, '\n', 'environment substitution is not supported, use "arg" instead!']) 271 raise LaunchConfigException(message) 272 return True, self.argv
273
274 - def resolveArgs(self, argv):
275 argv_dict = self.argvToDict(argv) 276 # replace $(arg ...) in arg values 277 for k, _ in argv_dict.items(): 278 self._replaceArg(k,argv_dict, self.__argv_values) 279 return ["%s:=%s"%(k,v) for k, v in argv_dict.items()]
280
281 - def _replaceArg(self, arg, argv_defaults, argv_values):
282 ''' 283 Replace the arg-tags in the value in given argument recursively. 284 ''' 285 rec_inc = 0 286 value = argv_defaults[arg] 287 arg_match = re.search(r"\$\(\s*arg\s*", value) 288 while not arg_match is None: 289 rec_inc += 1 290 endIndex = value.find(')', arg_match.end()) 291 if endIndex > -1: 292 arg_name = value[arg_match.end():endIndex].strip() 293 if arg == arg_name: 294 raise LaunchConfigException("Can't resolve the argument `%s` argument: the argument referenced to itself!"%arg_name) 295 if rec_inc > 100: 296 raise LaunchConfigException("Can't resolve the argument `%s` in `%s` argument: recursion depth of 100 reached!"%(arg_name, arg)) 297 if argv_defaults.has_key(arg_name): 298 argv_defaults[arg] = value.replace(value[arg_match.start():endIndex+1], argv_defaults[arg_name]) 299 elif argv_values.has_key(arg_name): 300 argv_defaults[arg] = value.replace(value[arg_match.start():endIndex+1], argv_values[arg_name]) 301 else: 302 raise LaunchConfigException("Can't resolve the argument `%s` in `%s` argument"%(arg_name, arg)) 303 else: 304 raise LaunchConfigException("Can't resolve the argument in `%s` argument: `)` not found"%arg) 305 value = argv_defaults[arg] 306 arg_match = re.search(r"\$\(\s*arg\s*", value)
307
308 - def getArgs(self):
309 ''' 310 @return: a list with args being used in the roslaunch file. Only arg tags that are a direct child of <launch> will 311 be returned 312 @rtype: C{[str]} 313 @raise roslaunch.XmlParseException: on parse errors 314 ''' 315 self._argv_values = dict() 316 arg_subs = [] 317 args = [] 318 # for filename in self.getIncludedFiles(self.Filename): 319 # get only the args in the top launch file 320 for filename in [self.Filename]: 321 try: 322 if filename.endswith('.launch'): 323 args[len(args):-1] = parse(filename).getElementsByTagName('arg') 324 except Exception as e: 325 raise roslaunch.XmlParseException("Invalid roslaunch XML syntax: %s"%e) 326 327 for arg in args: 328 arg_name = arg.getAttribute("name") 329 if not arg_name: 330 raise roslaunch.XmlParseException("arg tag needs a name, xml is %s"%arg.toxml()) 331 332 # we only want argsargs at top level: 333 if not arg.parentNode.tagName=="launch": 334 continue 335 336 arg_default = arg.getAttribute("default") 337 arg_value = arg.getAttribute("value") 338 arg_sub = ''.join([arg_name, ':=', arg_default]) 339 if (not arg_value) and not arg_sub in arg_subs: 340 arg_subs.append(arg_sub) 341 elif arg_value: 342 self.__argv_values[arg_name] = arg_value 343 344 return arg_subs
345
346 - def _decode(self, val):
347 result = val.replace("\\n ", "\n") 348 try: 349 result = result.decode(sys.getfilesystemencoding()) 350 except: 351 pass 352 return result
353
354 - def getRobotDescr(self):
355 ''' 356 Parses the launch file for C{robots} parameter to get the description of the 357 robot. 358 @return: the robot description stored in the configuration 359 @rtype: C{dict(robot:dict('type' :str, 'name': str, 'images' : [str], 'description': str))} 360 ''' 361 result = dict() 362 if not self.Roscfg is None: 363 for param, p in self.Roscfg.params.items(): 364 if param.endswith('robots'): 365 if isinstance(p.value, list): 366 if len(p.value) > 0 and len(p.value[0]) != 5: 367 print "WRONG format, expected: ['host', 'type', 'name', 'images', 'description'] -> ignore", param 368 else: 369 for entry in p.value: 370 result[entry[0]] = { 'type' : entry[1], 'name' : entry[2], 'images' : resolve_paths(entry[3]).split(','), 'description' : resolve_paths(self._decode(entry[4])) } 371 return result
372
373 - def getCapabilitiesDesrc(self):
374 ''' 375 Parses the launch file for C{capabilities} and C{capability_group} parameter 376 and creates dictionary for grouping the nodes. 377 @return: the capabilities description stored in this configuration 378 @rtype: C{dict(machine : dict(namespace: dict(group:dict('type' : str, 'images' : [str], 'description' : str, 'nodes' : [str]))))} 379 ''' 380 result = dict() 381 capabilies_descr = dict() 382 if not self.Roscfg is None: 383 # get the capabilities description 384 # use two separate loops, to create the description list first 385 # TODO read the group description depending on namespace 386 for param, p in self.Roscfg.params.items(): 387 if param.endswith('capabilities'): 388 if isinstance(p.value, list): 389 if len(p.value) > 0 and len(p.value[0]) != 4: 390 print "WRONG format, expected: ['name', 'type', 'images', 'description'] -> ignore", param 391 else: 392 for entry in p.value: 393 capabilies_descr[entry[0]] = { 'type' : ''.join([entry[1]]), 'images' : resolve_paths(entry[2]).split(','), 'description' : resolve_paths(self._decode(entry[3]))} 394 # get the capability nodes 395 for item in self.Roscfg.nodes: 396 node_fullname = roslib.names.ns_join(item.namespace, item.name) 397 machine_name = item.machine_name if not item.machine_name is None and not item.machine_name == 'localhost' else '' 398 added = False 399 cap_param = roslib.names.ns_join(node_fullname, 'capability_group') 400 cap_ns = node_fullname 401 #find the capability group parameter in namespace 402 while not self.Roscfg.params.has_key(cap_param) and cap_param.count(roslib.names.SEP) > 1: 403 cap_ns = roslib.names.namespace(cap_ns).rstrip(roslib.names.SEP) 404 if not cap_ns: 405 cap_ns = roslib.names.SEP 406 cap_param = roslib.names.ns_join(cap_ns, 'capability_group') 407 if cap_ns == node_fullname: 408 cap_ns = item.namespace.rstrip(roslib.names.SEP) 409 # if the 'capability_group' parameter found, assign node to the group 410 if self.Roscfg.params.has_key(cap_param) and self.Roscfg.params[cap_param].value: 411 p = self.Roscfg.params[cap_param] 412 if not result.has_key(machine_name): 413 result[machine_name] = dict() 414 for (ns, groups) in result[machine_name].items(): 415 if ns == cap_ns and groups.has_key(p.value): 416 groups[p.value]['nodes'].append(node_fullname) 417 added = True 418 break 419 if not added: 420 ns = cap_ns 421 # add new group in the namespace of the node 422 if not result[machine_name].has_key(ns): 423 result[machine_name][ns] = dict() 424 if not result[machine_name][ns].has_key(p.value): 425 try: 426 result[machine_name][ns][p.value] = { 'type' : capabilies_descr[p.value]['type'], 'images': capabilies_descr[p.value]['images'], 'description' : capabilies_descr[p.value]['description'], 'nodes' : [] } 427 except: 428 result[machine_name][ns][p.value] = { 'type' : '', 'images': [], 'description' : '', 'nodes' : [] } 429 result[machine_name][ns][p.value]['nodes'].append(node_fullname) 430 return result
431
432 - def argvToDict(self, argv):
433 result = dict() 434 for a in argv: 435 key, sep, value = a.partition(':=') 436 if sep: 437 result[key] = value 438 return result
439
440 - def getNode(self, name):
441 ''' 442 Returns a configuration node for a given node name. 443 @param name: the name of the node. 444 @type name: C{str} 445 @return: the configuration node stored in this configuration 446 @rtype: L{roslaunch.Node} or C{None} 447 ''' 448 nodename = os.path.basename(name) 449 namespace = os.path.dirname(name).strip(roslib.names.SEP) 450 for item in self.Roscfg.nodes: 451 if (item.name == nodename) and (item.namespace.strip(roslib.names.SEP) == namespace): 452 return item 453 return None
454
455 - def get_robot_icon(self):
456 ''' 457 Returns the value of the `/robot_icon` parameter or None 458 ''' 459 try: 460 return self.Roscfg.params['/robot_icon'].value 461 except: 462 pass 463 return None
464