Package node_manager_fkie :: Module launch_config
[frames] | no frames]

Source Code for Module node_manager_fkie.launch_config

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2012, Fraunhofer FKIE/US, Alexander Tiderko 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Fraunhofer nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32   
 33  import os 
 34  import re 
 35  import sys 
 36  import time 
 37  from ros import roslaunch 
 38  import rospy 
 39  import roslib 
 40   
 41  from xml.dom.minidom import parse, parseString  
 42  #from xml.dom import Node as DomNode #avoid aliasing 
 43   
 44  from python_qt_binding import QtCore 
 45   
 46  from master_discovery_fkie.common import resolve_url 
 47  import node_manager_fkie as nm 
 48  from common import package_name 
49 50 -class LaunchConfigException(Exception):
51 pass
52
53 54 -class LaunchConfig(QtCore.QObject):
55 ''' 56 A class to handle the ROS configuration stored in launch file. 57 ''' 58
59 - def __init__(self, launch_file, package=None, masteruri=None, argv=[]):
60 ''' 61 Creates the LaunchConfig object. The launch file will be not loaded on 62 creation, first on request of Roscfg value. 63 @param launch_file: The absolute or relative path with the launch file. 64 By using relative path a package must be valid for 65 remote launches. 66 @type launch_file: C{str} 67 @param package: the package containing the launch file. If None the 68 launch_file will be used to determine the launch file. 69 No remote launches a possible without a valid package. 70 @type package: C{str} or C{None} 71 @param masteruri: The URL of the ROS master. 72 @type masteruri: C{str} or C{None} 73 @param argv: the list the arguments needed for loading the given launch file 74 @type argv: C{[str]} 75 @raise roslaunch.XmlParseException: if the launch file can't be found. 76 ''' 77 QtCore.QObject.__init__(self) 78 self.__launchFile = launch_file 79 self.__package = package_name(os.path.dirname(self.__launchFile))[0] if package is None else package 80 self.__masteruri = masteruri if not masteruri is None else 'localhost' 81 self.__roscfg = None 82 self.argv = argv 83 self.__reqTested = False 84 self.__argv_values = dict() 85 self.global_param_done = [] # masteruri's where the global parameters are registered 86 self.hostname = nm.nameres().getHostname(self.__masteruri) 87 self.__launch_id = '%.9f'%time.time() 88 nm.file_watcher().add(self.__masteruri, self.__launchFile, self.__launch_id, [self.__launchFile])
89 # nm.file_watcher().add(self.__masteruri, self.__launchFile, self.__launch_id, self.getIncludedFiles(self.Filename)) 90 91
92 - def __del__(self):
93 # Delete to avoid segfault if the LaunchConfig class is destroyed recently 94 # after creation and xmlrpclib.ServerProxy process a method call. 95 # del self.file_watcher 96 nm.file_watcher().rem(self.__masteruri, self.__launchFile, self.__launch_id)
97 98 @property
99 - def masteruri(self):
100 ''' 101 Returns the master URI (host) where the node of this config will be started. 102 @rtype: C{str} 103 ''' 104 return self.__masteruri
105 106 @property
107 - def Roscfg(self):
108 ''' 109 Returns a loaded launch configuration 110 @rtype: L{roslaunch.ROSLaunchConfig} or C{None} 111 @raise LaunchConfigException: on load error 112 @see L{load()} 113 ''' 114 if not (self.__roscfg is None): 115 return self.__roscfg 116 else: 117 result, argv = self.load(self.argv) 118 if not result: 119 raise LaunchConfigException("not all argv are setted properly!") 120 return self.__roscfg
121 122 @property
123 - def Filename(self):
124 ''' 125 Returns an existing path with file name or an empty string. 126 @rtype: C{str} 127 ''' 128 if os.path.isfile(self.__launchFile): 129 return self.__launchFile 130 elif not (self.__package is None): 131 try: 132 import roslib 133 return roslib.packages.find_resource(self.PackageName, self.LaunchName).pop() 134 except Exception: 135 raise LaunchConfigException(''.join(['launch file ', self.LaunchName, ' not found!'])) 136 raise LaunchConfigException(''.join(['launch file ', self.__launchFile, ' not found!']))
137 138 @property
139 - def LaunchName(self):
140 ''' 141 Returns the name of the launch file with extension, e.g. 'test.launch' 142 @rtype: C{str} 143 ''' 144 return os.path.basename(self.__launchFile)
145 146 @property
147 - def PackageName(self):
148 ''' 149 Returns the name of the package containing the launch file or None. 150 @rtype: C{str} or C{None} 151 ''' 152 return self.__package
153 154 @classmethod
155 - def _index(cls, text, regexp_list):
156 ''' 157 Searches in the given text for key indicates the including of a file and 158 return their index. 159 @param text: 160 @type text: C{str} 161 @param regexp_list: 162 @type regexp_list: C{[L{QtCore.QRegExp},..]} 163 @return: the index of the including key or -1 164 @rtype: C{int} 165 ''' 166 for pattern in regexp_list: 167 index = pattern.indexIn(text) 168 if index > -1: 169 return index 170 return -1
171 172 @classmethod
173 - def interpretPath(cls, path, pwd='.'):
174 ''' 175 Tries to determine the path of the included file. The statement of 176 $(find 'package') will be resolved. 177 The supported URL begins with `file:///`, `package://` or `pkg://`. 178 The package URL will be resolved to a valid file path. If the file is in a 179 subdirectory, you can replace the subdirectory by `///`. 180 @param path: the sting which contains the included path 181 @type path: C{str} 182 @param pwd: current working path 183 @type pwd: C{str} 184 @return: if no leading L{os.sep} is detected, the path set by L{setCurrentPath()} 185 will be prepend. C{$(find 'package')} will be resolved. Otherwise the parameter 186 itself will be returned 187 @rtype: C{str} 188 ''' 189 path = path.strip() 190 startIndex = path.find('$(') 191 if startIndex > -1: 192 endIndex = path.find(')', startIndex+2) 193 script = path[startIndex+2:endIndex].split() 194 if len(script) == 2 and (script[0] == 'find'): 195 pkg = roslib.packages.get_pkg_dir(script[1]) 196 return os.path.join(pkg, path[endIndex+2:].strip(os.path.sep)) 197 elif len(path) > 0 and path[0] != os.path.sep: 198 try: 199 return resolve_url(path) 200 except ValueError, e: 201 if len(path) > 0 and path[0] != os.path.sep: 202 return os.path.normpath(''.join([pwd, os.path.sep, path])) 203 return path
204 205 @classmethod
206 - def getIncludedFiles(cls, file, regexp_list=[QtCore.QRegExp("\\binclude\\b"), 207 QtCore.QRegExp("\\btextfile\\b"), 208 QtCore.QRegExp("\\bfile\\b")]):
209 ''' 210 Reads the configuration file and searches for included files. This files 211 will be returned in a list. 212 @param file: path of the ROS launch file 213 @param regexp_list: pattern of 214 @return: the list with all files needed for the configuration 215 @rtype: C{[str,...]} 216 ''' 217 result = set() 218 with open(file, 'r') as f: 219 content = f.read() 220 # remove the comments 221 comment_pattern = QtCore.QRegExp("<!--.*?-->") 222 pos = comment_pattern.indexIn(content) 223 while pos != -1: 224 content = content[:pos] + content[pos+comment_pattern.matchedLength():] 225 pos = comment_pattern.indexIn(content) 226 lines = content.splitlines() 227 for line in lines: 228 index = cls._index(line, regexp_list) 229 if index > -1: 230 startIndex = line.find('"', index) 231 if startIndex > -1: 232 endIndex = line.find('"', startIndex+1) 233 fileName = line[startIndex+1:endIndex] 234 if len(fileName) > 0: 235 try: 236 path = cls.interpretPath(fileName, os.path.dirname(file)) 237 if os.path.isfile(path): 238 result.add(path) 239 if path.endswith('.launch'): 240 result.update(cls.getIncludedFiles(path, regexp_list)) 241 except: 242 pass 243 return list(result)
244
245 - def load(self, argv):
246 ''' 247 @param argv: the list with argv parameter needed to load the launch file. 248 The name and value are separated by C{:=} 249 @type argv: C{[str]} 250 @return True, if the launch file was loaded 251 @rtype boolean 252 @raise LaunchConfigException: on load errors 253 ''' 254 try: 255 roscfg = roslaunch.ROSLaunchConfig() 256 loader = roslaunch.XmlLoader() 257 self.argv = self.resolveArgs(argv) 258 loader.load(self.Filename, roscfg, verbose=False, argv=self.argv) 259 self.__roscfg = roscfg 260 nm.file_watcher().add(self.__masteruri, self.__launchFile, self.__launch_id, self.getIncludedFiles(self.Filename)) 261 if not nm.is_local(nm.nameres().getHostname(self.__masteruri)): 262 nm.file_watcher_param().add(self.__masteruri, self.__launchFile, self.__launch_id, 263 self.getIncludedFiles(self.Filename, 264 regexp_list = [QtCore.QRegExp("\\bvalue=.*pkg:\/\/\\b"), 265 QtCore.QRegExp("\\bvalue=.*package:\/\/\\b"), 266 QtCore.QRegExp("\\bvalue=.*\$\(find\\b")]) 267 ) 268 except roslaunch.XmlParseException, e: 269 test = list(re.finditer(r"environment variable '\w+' is not set", str(e))) 270 message = str(e) 271 if test: 272 message = ''.join([message, '\n', 'environment substitution is not supported, use "arg" instead!']) 273 raise LaunchConfigException(message) 274 return True, self.argv
275
276 - def resolveArgs(self, argv):
277 argv_dict = self.argvToDict(argv) 278 # replace $(arg ...) in arg values 279 for k, v in argv_dict.items(): 280 self._replaceArg(k,argv_dict, self.__argv_values) 281 return ["%s:=%s"%(k,v) for k, v in argv_dict.items()]
282
283 - def _replaceArg(self, arg, argv_defaults, argv_values):
284 ''' 285 Replace the arg-tags in the value in given argument recursively. 286 ''' 287 rec_inc = 0 288 value = argv_defaults[arg] 289 arg_match = re.search(r"\$\(\s*arg\s*", value) 290 while not arg_match is None: 291 rec_inc += 1 292 endIndex = value.find(')', arg_match.end()) 293 if endIndex > -1: 294 arg_name = value[arg_match.end():endIndex].strip() 295 if arg == arg_name: 296 raise LaunchConfigException("Can't resolve the argument `%s` argument: the argument referenced to itself!"%arg_name) 297 if rec_inc > 100: 298 raise LaunchConfigException("Can't resolve the argument `%s` in `%s` argument: recursion depth of 100 reached!"%(arg_name, arg)) 299 if argv_defaults.has_key(arg_name): 300 argv_defaults[arg] = value.replace(value[arg_match.start():endIndex+1], argv_defaults[arg_name]) 301 elif argv_values.has_key(arg_name): 302 argv_defaults[arg] = value.replace(value[arg_match.start():endIndex+1], argv_values[arg_name]) 303 else: 304 raise LaunchConfigException("Can't resolve the argument `%s` in `%s` argument"%(arg_name, arg)) 305 else: 306 raise LaunchConfigException("Can't resolve the argument in `%s` argument: `)` not found"%arg) 307 value = argv_defaults[arg] 308 arg_match = re.search(r"\$\(\s*arg\s*", value)
309
310 - def getArgs(self):
311 ''' 312 @return: a list with args being used in the roslaunch file. Only arg tags that are a direct child of <launch> will 313 be returned 314 @rtype: C{[str]} 315 @raise roslaunch.XmlParseException: on parse errors 316 ''' 317 self._argv_values = dict() 318 arg_subs = [] 319 args = [] 320 # for filename in self.getIncludedFiles(self.Filename): 321 # get only the args in the top launch file 322 for filename in [self.Filename]: 323 try: 324 if filename.endswith('.launch'): 325 args[len(args):-1] = parse(filename).getElementsByTagName('arg') 326 except Exception as e: 327 raise roslaunch.XmlParseException("Invalid roslaunch XML syntax: %s"%e) 328 329 for arg in args: 330 arg_name = arg.getAttribute("name") 331 if not arg_name: 332 raise roslaunch.XmlParseException("arg tag needs a name, xml is %s"%arg.toxml()) 333 334 # we only want argsargs at top level: 335 if not arg.parentNode.tagName=="launch": 336 continue 337 338 arg_default = arg.getAttribute("default") 339 arg_value = arg.getAttribute("value") 340 arg_sub = ''.join([arg_name, ':=', arg_default]) 341 if (not arg_value) and not arg_sub in arg_subs: 342 arg_subs.append(arg_sub) 343 elif arg_value: 344 self.__argv_values[arg_name] = arg_value 345 346 return arg_subs
347
348 - def _decode(self, val):
349 result = val.replace("\\n ", "\n") 350 try: 351 result = result.decode(sys.getfilesystemencoding()) 352 except: 353 pass 354 return result
355
356 - def getRobotDescr(self):
357 ''' 358 Parses the launch file for C{robots} parameter to get the description of the 359 robot. 360 @return: the robot description stored in the configuration 361 @rtype: C{dict(robot:dict('type' :str, 'name': str, 'images' : [str], 'description': str))} 362 ''' 363 result = dict() 364 if not self.Roscfg is None: 365 for param, p in self.Roscfg.params.items(): 366 if param.endswith('robots'): 367 if isinstance(p.value, list): 368 if len(p.value) > 0 and len(p.value[0]) != 5: 369 print "WRONG format, expected: ['host', 'type', 'name', 'images', 'description'] -> ignore", param 370 else: 371 for entry in p.value: 372 result[entry[0]] = { 'type' : entry[1], 'name' : entry[2], 'images' : entry[4].split(), 'description' : self._decode(entry[4]) } 373 return result
374
375 - def getCapabilitiesDesrc(self):
376 ''' 377 Parses the launch file for C{capabilities} and C{capability_group} parameter 378 and creates dictionary for grouping the nodes. 379 @return: the capabilities description stored in this configuration 380 @rtype: C{dict(machine : dict(namespace: dict(group:dict('type' : str, 'images' : [str], 'description' : str, 'nodes' : [str]))))} 381 ''' 382 result = dict() 383 capabilies_descr = dict() 384 if not self.Roscfg is None: 385 # get the capabilities description 386 # use two separate loops, to create the description list first 387 # TODO read the group description depending on namespace 388 for param, p in self.Roscfg.params.items(): 389 if param.endswith('capabilities'): 390 if isinstance(p.value, list): 391 if len(p.value) > 0 and len(p.value[0]) != 4: 392 print "WRONG format, expected: ['name', 'type', 'images', 'description'] -> ignore", param 393 else: 394 for entry in p.value: 395 capabilies_descr[entry[0]] = { 'type' : ''.join([entry[1]]), 'images' : entry[2].split(), 'description' : self._decode(entry[3])} 396 # get the capability nodes 397 for item in self.Roscfg.nodes: 398 node_fullname = roslib.names.ns_join(item.namespace, item.name) 399 machine_name = item.machine_name if not item.machine_name is None and not item.machine_name == 'localhost' else '' 400 added = False 401 cap_param = roslib.names.ns_join(node_fullname, 'capability_group') 402 cap_ns = node_fullname 403 #find the capability group parameter in namespace 404 while not self.Roscfg.params.has_key(cap_param) and cap_param.count(roslib.names.SEP) > 1: 405 cap_ns = roslib.names.namespace(cap_ns).rstrip(roslib.names.SEP) 406 if not cap_ns: 407 cap_ns = roslib.names.SEP 408 cap_param = roslib.names.ns_join(cap_ns, 'capability_group') 409 if cap_ns == node_fullname: 410 cap_ns = item.namespace.rstrip(roslib.names.SEP) 411 # if the 'capability_group' parameter found, assign node to the group 412 if self.Roscfg.params.has_key(cap_param) and self.Roscfg.params[cap_param].value: 413 p = self.Roscfg.params[cap_param] 414 if not result.has_key(machine_name): 415 result[machine_name] = dict() 416 for (ns, groups) in result[machine_name].items(): 417 if ns == cap_ns and groups.has_key(p.value): 418 groups[p.value]['nodes'].append(node_fullname) 419 added = True 420 break 421 if not added: 422 ns = cap_ns 423 # add new group in the namespace of the node 424 if not result[machine_name].has_key(ns): 425 result[machine_name][ns] = dict() 426 if not result[machine_name][ns].has_key(p.value): 427 try: 428 result[machine_name][ns][p.value] = { 'type' : capabilies_descr[p.value]['type'], 'images': capabilies_descr[p.value]['images'], 'description' : capabilies_descr[p.value]['description'], 'nodes' : [] } 429 except: 430 result[machine_name][ns][p.value] = { 'type' : '', 'images': [], 'description' : '', 'nodes' : [] } 431 result[machine_name][ns][p.value]['nodes'].append(node_fullname) 432 return result
433
434 - def argvToDict(self, argv):
435 result = dict() 436 for a in argv: 437 key, sep, value = a.partition(':=') 438 if sep: 439 result[key] = value 440 return result
441
442 - def getNode(self, name):
443 ''' 444 Returns a configuration node for a given node name. 445 @param name: the name of the node. 446 @type name: C{str} 447 @return: the configuration node stored in this configuration 448 @rtype: L{roslaunch.Node} or C{None} 449 ''' 450 nodename = os.path.basename(name) 451 namespace = os.path.dirname(name).strip(roslib.names.SEP) 452 for item in self.Roscfg.nodes: 453 if (item.name == nodename) and (item.namespace.strip(roslib.names.SEP) == namespace): 454 return item 455 return None
456
457 - def get_robot_icon(self):
458 ''' 459 Returns the value of the `/robot_icon` parameter or None 460 ''' 461 try: 462 return self.Roscfg.params['/robot_icon'].value 463 except: 464 pass 465 return None
466