Package node_manager_fkie :: Module launch_config
[frames] | no frames]

Source Code for Module node_manager_fkie.launch_config

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2012, Fraunhofer FKIE/US, Alexander Tiderko 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Fraunhofer nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32   
 33  import os 
 34  import sys 
 35  import time 
 36  from ros import roslaunch 
 37  import rospy 
 38  import roslib 
 39   
 40  from xml.dom.minidom import parse, parseString  
 41  from xml.dom import Node as DomNode #avoid aliasing 
 42   
 43  from PySide import QtCore 
 44   
 45  import node_manager_fkie as nm 
46 47 -class LaunchConfigException(Exception):
48 pass
49
50 51 -class LaunchConfig(QtCore.QObject):
52 ''' 53 A class to handle the ROS configuration stored in launch file. 54 ''' 55 file_changed = QtCore.Signal(str, str) 56 '''@ivar: a signal to inform the receiver about the changes on 57 launch file or included file. ParameterB{:} (changed file, launch file)''' 58
59 - def __init__(self, launch_file, package=None, masteruri=None, argv=[]):
60 ''' 61 Creates the LaunchConfig object. The launch file will be not loaded on 62 creation, first on request of Roscfg value. 63 @param launch_file: The absolute or relative path with the launch file. 64 By using relative path a package must be valid for 65 remote launches. 66 @type launch_file: C{str} 67 @param package: the package containing the launch file. If None the 68 launch_file will be used to determine the launch file. 69 No remote launches a possible without a valid package. 70 @type package: C{str} or C{None} 71 @param masteruri: The URL of the ROS master. 72 @type masteruri: C{str} or C{None} 73 @param argv: the list the arguments needed for loading the given launch file 74 @type argv: C{[str]} 75 @raise roslaunch.XmlParseException: if the launch file can't be found. 76 ''' 77 QtCore.QObject.__init__(self) 78 self.__launchFile = launch_file 79 self.__package = self.packageName(os.path.dirname(self.__launchFile))[0] if package is None else package 80 self.__masteruri = masteruri if not masteruri is None else 'localhost' 81 self.__roscfg = None 82 self.argv = argv 83 self.__reqTested = False 84 self.global_param_done = [] # masteruri's where the global parameters are registered 85 self.hostname = nm.nameres().getHostname(self.__masteruri) 86 self.file_watcher = QtCore.QFileSystemWatcher() 87 self.file_watcher.fileChanged.connect(self.on_file_changed) 88 self.changed = {}
89
90 - def __del__(self):
91 # Delete to avoid segfault if the LaunchConfig class is destroyed recently 92 # after creation and xmlrpclib.ServerProxy process a method call. 93 del self.file_watcher
94
95 - def on_file_changed(self, file):
96 ''' 97 callback method, which is called by L{QtCore.QFileSystemWatcher} if the 98 launch file or included files are changed. In this case 99 L{LaunchConfig.file_changed} signal will be emitted. 100 ''' 101 # to avoid to handle from QFileSystemWatcher fired the signal two times 102 if (not self.changed.has_key(file) or (self.changed.has_key(file) and self.changed[file] + 0.05 < time.time())): 103 self.changed[file] = time.time() 104 self.file_changed.emit(file, self.__launchFile)
105 106 @property
107 - def masteruri(self):
108 ''' 109 Returns the master URI (host) where the node of this config will be started. 110 @rtype: C{str} 111 ''' 112 return self.__masteruri
113 114 @property
115 - def Roscfg(self):
116 ''' 117 Returns a loaded launch configuration 118 @rtype: L{roslaunch.ROSLaunchConfig} or C{None} 119 @raise LaunchConfigException: on load error 120 @see L{load()} 121 ''' 122 if not (self.__roscfg is None): 123 return self.__roscfg 124 else: 125 result, argv = self.load(self.argv) 126 if not result: 127 raise LaunchConfigException("not all argv are setted properly!") 128 return self.__roscfg
129 130 @property
131 - def Filename(self):
132 ''' 133 Returns an existing path with file name or an empty string. 134 @rtype: C{str} 135 ''' 136 if os.path.isfile(self.__launchFile): 137 return self.__launchFile 138 elif not (self.__package is None): 139 try: 140 import roslib 141 return roslib.packages.find_resource(self.PackageName, self.LaunchName).pop() 142 except Exception: 143 raise LaunchConfigException(''.join(['launch file ', self.LaunchName, ' not found!']))
144 145 @property
146 - def LaunchName(self):
147 ''' 148 Returns the name of the launch file with extension, e.g. 'test.launch' 149 @rtype: C{str} 150 ''' 151 return os.path.basename(self.__launchFile)
152 153 @property
154 - def PackageName(self):
155 ''' 156 Returns the name of the package containing the launch file or None. 157 @rtype: C{str} or C{None} 158 ''' 159 return self.__package
160 161 @classmethod
162 - def packageName(cls, dir):
163 ''' 164 Returns for given directory a tuple of package name and package path or None values. 165 @rtype: C{(name, path)} 166 ''' 167 if not (dir is None) and dir and dir != os.path.sep and os.path.isdir(dir): 168 package = os.path.basename(dir) 169 fileList = os.listdir(dir) 170 for file in fileList: 171 if file == 'manifest.xml': 172 return (package, dir) 173 return cls.packageName(os.path.dirname(dir)) 174 return (None, None)
175
176 - def _index(self, text, regexp_list):
177 ''' 178 Searches in the given text for key indicates the including of a file and 179 return their index. 180 @param text: 181 @type text: C{str} 182 @param regexp_list: 183 @type regexp_list: C{[L{QtCore.QRegExp},..]} 184 @return: the index of the including key or -1 185 @rtype: C{int} 186 ''' 187 for pattern in regexp_list: 188 index = pattern.indexIn(text) 189 if index > -1: 190 return index 191 return -1
192
193 - def interpretPath(self, path, pwd='.'):
194 ''' 195 Tries to determine the path of the included file. The statement of 196 $(find 'package') will be resolved. 197 @param path: the sting which contains the included path 198 @type path: C{str} 199 @param pwd: current working path 200 @type pwd: C{str} 201 @return: if no leading L{os.sep} is detected, the path set by L{setCurrentPath()} 202 will be prepend. C{$(find 'package')} will be resolved. Otherwise the parameter 203 itself will be returned 204 @rtype: C{str} 205 ''' 206 path = path.strip() 207 index = path.find('$') 208 if index > -1: 209 startIndex = path.find('(', index) 210 if startIndex > -1: 211 endIndex = path.find(')', startIndex+1) 212 script = path[startIndex+1:endIndex].split() 213 if len(script) == 2 and (script[0] == 'find'): 214 pkg = roslib.packages.get_pkg_dir(script[1]) 215 return ''.join([pkg, os.path.sep, path[endIndex+1:]]) 216 elif len(path) > 0 and path[0] != os.path.sep: 217 return ''.join([pwd, os.path.sep, path]) 218 return path
219
220 - def getIncludedFiles(self, file):
221 ''' 222 Reads the configuration file and searches for included files. This files 223 will be returned in a list. 224 @return: the list with all files needed for the configuration 225 @rtype: C{[str,...]} 226 ''' 227 result = set([file]) 228 regexp_list = [QtCore.QRegExp("\\binclude\\b"), QtCore.QRegExp("\\btextfile\\b"), 229 QtCore.QRegExp("\\bfile\\b")] 230 lines = [] 231 with open(file, 'r') as f: 232 lines = f.readlines() 233 for line in lines: 234 index = self._index(line, regexp_list) 235 if index > -1: 236 startIndex = line.find('"', index) 237 if startIndex > -1: 238 endIndex = line.find('"', startIndex+1) 239 fileName = line[startIndex+1:endIndex] 240 if len(fileName) > 0: 241 path = self.interpretPath(fileName, os.path.dirname(self.__launchFile)) 242 if os.path.isfile(path): 243 result.add(path) 244 if path.endswith('.launch'): 245 result.update(self.getIncludedFiles(path)) 246 return list(result)
247
248 - def load(self, argv):
249 ''' 250 @param argv: the list with argv parameter needed to load the launch file. 251 The name and value are separated by C{:=} 252 @type argv: C{[str]} 253 @return True, if the launch file was loaded 254 @rtype boolean 255 @raise LaunchConfigException: on load errors 256 ''' 257 import re 258 try: 259 roscfg = roslaunch.ROSLaunchConfig() 260 loader = roslaunch.XmlLoader() 261 loader.load(self.Filename, roscfg, verbose=False, argv=argv) 262 self.__roscfg = roscfg 263 # for m, k in self.__roscfg.machines.items(): 264 # print m, k 265 files = self.file_watcher.files() 266 if files: 267 self.file_watcher.removePaths(files) 268 self.file_watcher.addPaths(self.getIncludedFiles(self.Filename)) 269 except roslaunch.XmlParseException, e: 270 test = list(re.finditer(r"environment variable '\w+' is not set", str(e))) 271 message = str(e) 272 if test: 273 message = ''.join([message, '\n', 'environment substitution is not supported, use "arg" instead!']) 274 raise LaunchConfigException(message) 275 return True
276
277 - def getArgs(self):
278 ''' 279 @return: a list with args being used in the roslaunch file. Only arg tags that are a direct child of <launch> will 280 be returned 281 @rtype: C{[str]} 282 @raise roslaunch.XmlParseException: on parse errors 283 ''' 284 arg_subs = [] 285 args = [] 286 for filename in self.getIncludedFiles(self.Filename): 287 try: 288 if filename.endswith('.launch'): 289 args[len(args):-1] = parse(filename).getElementsByTagName('arg') 290 except Exception as e: 291 raise roslaunch.XmlParseException("Invalid roslaunch XML syntax: %s"%e) 292 293 for arg in args: 294 arg_name = arg.getAttribute("name") 295 if not arg_name: 296 raise roslaunch.XmlParseException("arg tag needs a name, xml is %s"%arg.toxml()) 297 298 # we only want args at top level: 299 if not arg.parentNode.tagName=="launch": 300 continue 301 302 arg_default = arg.getAttribute("default") 303 arg_value = arg.getAttribute("value") 304 arg_sub = ''.join([arg_name, ':=', arg_default]) 305 if (not arg_value) and not arg_sub in arg_subs: 306 arg_subs.append(arg_sub) 307 308 return arg_subs
309
310 - def _decode(self, val):
311 result = val.replace("\\n ", "\n") 312 try: 313 result = result.decode(sys.getfilesystemencoding()) 314 except: 315 pass 316 return result
317
318 - def getRobotDescr(self):
319 ''' 320 Parses the launch file for C{robots} parameter to get the description of the 321 robot. 322 @return: the robot description stored in the configuration 323 @rtype: C{dict(robot:dict('type' :str, 'name': str, 'images' : [str], 'description': str))} 324 ''' 325 result = dict() 326 if not self.Roscfg is None: 327 for param, p in self.Roscfg.params.items(): 328 if param.endswith('robots'): 329 if isinstance(p.value, list): 330 if len(p.value) > 0 and len(p.value[0]) != 5: 331 print "WRONG format, expected: ['host', 'type', 'name', 'images', 'description'] -> ignore", param 332 else: 333 for entry in p.value: 334 result[entry[0]] = { 'type' : entry[1], 'name' : entry[2], 'images' : entry[4].split(), 'description' : self._decode(entry[4]) } 335 return result
336
337 - def getCapabilitiesDesrc(self):
338 ''' 339 Parses the launch file for C{capabilities} and C{capability_group} parameter 340 and creates dictionary for grouping the nodes. 341 @return: the capabilities description stored in this configuration 342 @rtype: C{dict(machine : dict(namespace: dict(group:dict('type' : str, 'images' : [str], 'description' : str, 'nodes' : [str]))))} 343 ''' 344 result = dict() 345 if not self.Roscfg is None: 346 # get the capabilities description 347 # use two separate loops, to create the description list first 348 for param, p in self.Roscfg.params.items(): 349 if param.endswith('capabilities'): 350 if isinstance(p.value, list): 351 if len(p.value) > 0 and len(p.value[0]) != 4: 352 print "WRONG format, expected: ['name', 'type', 'images', 'description'] -> ignore", param 353 else: 354 ns = roslib.names.namespace(param) 355 for m in self.Roscfg.machines.keys(): 356 if not result.has_key(m): 357 result[m] = dict() 358 for entry in p.value: 359 if not result[m].has_key(ns): 360 result[m][ns] = dict() 361 result[m][ns][entry[0]] = { 'type' : ''.join([entry[1]]), 'images' : entry[2].split(), 'description' : self._decode(entry[3]), 'nodes' : [] } 362 # get the capability nodes 363 for param, p in self.Roscfg.params.items(): 364 if param.endswith('capability_group'): 365 param_node = roslib.names.namespace(param).rstrip(roslib.names.SEP) 366 if not param_node: 367 param_node = roslib.names.SEP 368 # get the nodes with groups 369 for item in self.Roscfg.nodes: 370 node_fullname = roslib.names.ns_join(item.namespace, item.name) 371 machine_name = item.machine_name if not item.machine_name is None and not item.machine_name == 'localhost' else '' 372 added = False 373 if node_fullname == param_node: 374 if not result.has_key(machine_name): 375 result[machine_name] = dict() 376 for (ns, groups) in result[machine_name].items(): 377 if groups.has_key(p.value): 378 groups[p.value]['nodes'].append(node_fullname) 379 added = True 380 break 381 if not added: 382 ns = ''.join([item.namespace]) 383 # add new group in the namespace of the node 384 if not result[machine_name].has_key(ns): 385 result[machine_name][ns] = dict() 386 if not result[machine_name][ns].has_key(p.value): 387 result[machine_name][ns][p.value] = { 'type' : '', 'images': [], 'description' : '', 'nodes' : [] } 388 result[machine_name][ns][p.value]['nodes'].append(node_fullname) 389 return result
390
391 - def argvToDict(self, argv):
392 result = dict() 393 for a in argv: 394 key, sep, value = a.partition(':=') 395 if sep: 396 result[key] = value 397 return result
398
399 - def getNode(self, name):
400 ''' 401 Returns a configuration node for a given node name. 402 @param name: the name of the node. 403 @type name: C{str} 404 @return: the configuration node stored in this configuration 405 @rtype: L{roslaunch.Node} or C{None} 406 ''' 407 nodename = os.path.basename(name) 408 namespace = os.path.dirname(name).strip(roslib.names.SEP) 409 for item in self.Roscfg.nodes: 410 if (item.name == nodename) and (item.namespace.strip(roslib.names.SEP) == namespace): 411 return item 412 return None
413