1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 import os
34 import sys
35 import time
36 from ros import roslaunch
37 import rospy
38 import roslib
39
40 from xml.dom.minidom import parse, parseString
41 from xml.dom import Node as DomNode
42
43 from PySide import QtCore
44
45 import node_manager_fkie as nm
49
52 '''
53 A class to handle the ROS configuration stored in launch file.
54 '''
55 file_changed = QtCore.Signal(str, str)
56 '''@ivar: a signal to inform the receiver about the changes on
57 launch file or included file. ParameterB{:} (changed file, launch file)'''
58
59 - def __init__(self, launch_file, package=None, masteruri=None, argv=[]):
60 '''
61 Creates the LaunchConfig object. The launch file will be not loaded on
62 creation, first on request of Roscfg value.
63 @param launch_file: The absolute or relative path with the launch file.
64 By using relative path a package must be valid for
65 remote launches.
66 @type launch_file: C{str}
67 @param package: the package containing the launch file. If None the
68 launch_file will be used to determine the launch file.
69 No remote launches a possible without a valid package.
70 @type package: C{str} or C{None}
71 @param masteruri: The URL of the ROS master.
72 @type masteruri: C{str} or C{None}
73 @param argv: the list the arguments needed for loading the given launch file
74 @type argv: C{[str]}
75 @raise roslaunch.XmlParseException: if the launch file can't be found.
76 '''
77 QtCore.QObject.__init__(self)
78 self.__launchFile = launch_file
79 self.__package = self.packageName(os.path.dirname(self.__launchFile))[0] if package is None else package
80 self.__masteruri = masteruri if not masteruri is None else 'localhost'
81 self.__roscfg = None
82 self.argv = argv
83 self.__reqTested = False
84 self.global_param_done = []
85 self.hostname = nm.nameres().getHostname(self.__masteruri)
86 self.file_watcher = QtCore.QFileSystemWatcher()
87 self.file_watcher.fileChanged.connect(self.on_file_changed)
88 self.changed = {}
89
91
92
93 del self.file_watcher
94
96 '''
97 callback method, which is called by L{QtCore.QFileSystemWatcher} if the
98 launch file or included files are changed. In this case
99 L{LaunchConfig.file_changed} signal will be emitted.
100 '''
101
102 if (not self.changed.has_key(file) or (self.changed.has_key(file) and self.changed[file] + 0.05 < time.time())):
103 self.changed[file] = time.time()
104 self.file_changed.emit(file, self.__launchFile)
105
106 @property
108 '''
109 Returns the master URI (host) where the node of this config will be started.
110 @rtype: C{str}
111 '''
112 return self.__masteruri
113
114 @property
116 '''
117 Returns a loaded launch configuration
118 @rtype: L{roslaunch.ROSLaunchConfig} or C{None}
119 @raise LaunchConfigException: on load error
120 @see L{load()}
121 '''
122 if not (self.__roscfg is None):
123 return self.__roscfg
124 else:
125 result, argv = self.load(self.argv)
126 if not result:
127 raise LaunchConfigException("not all argv are setted properly!")
128 return self.__roscfg
129
130 @property
132 '''
133 Returns an existing path with file name or an empty string.
134 @rtype: C{str}
135 '''
136 if os.path.isfile(self.__launchFile):
137 return self.__launchFile
138 elif not (self.__package is None):
139 try:
140 import roslib
141 return roslib.packages.find_resource(self.PackageName, self.LaunchName).pop()
142 except Exception:
143 raise LaunchConfigException(''.join(['launch file ', self.LaunchName, ' not found!']))
144
145 @property
147 '''
148 Returns the name of the launch file with extension, e.g. 'test.launch'
149 @rtype: C{str}
150 '''
151 return os.path.basename(self.__launchFile)
152
153 @property
155 '''
156 Returns the name of the package containing the launch file or None.
157 @rtype: C{str} or C{None}
158 '''
159 return self.__package
160
161 @classmethod
163 '''
164 Returns for given directory a tuple of package name and package path or None values.
165 @rtype: C{(name, path)}
166 '''
167 if not (dir is None) and dir and dir != os.path.sep and os.path.isdir(dir):
168 package = os.path.basename(dir)
169 fileList = os.listdir(dir)
170 for file in fileList:
171 if file == 'manifest.xml':
172 return (package, dir)
173 return cls.packageName(os.path.dirname(dir))
174 return (None, None)
175
176 - def _index(self, text, regexp_list):
177 '''
178 Searches in the given text for key indicates the including of a file and
179 return their index.
180 @param text:
181 @type text: C{str}
182 @param regexp_list:
183 @type regexp_list: C{[L{QtCore.QRegExp},..]}
184 @return: the index of the including key or -1
185 @rtype: C{int}
186 '''
187 for pattern in regexp_list:
188 index = pattern.indexIn(text)
189 if index > -1:
190 return index
191 return -1
192
194 '''
195 Tries to determine the path of the included file. The statement of
196 $(find 'package') will be resolved.
197 @param path: the sting which contains the included path
198 @type path: C{str}
199 @param pwd: current working path
200 @type pwd: C{str}
201 @return: if no leading L{os.sep} is detected, the path set by L{setCurrentPath()}
202 will be prepend. C{$(find 'package')} will be resolved. Otherwise the parameter
203 itself will be returned
204 @rtype: C{str}
205 '''
206 path = path.strip()
207 index = path.find('$')
208 if index > -1:
209 startIndex = path.find('(', index)
210 if startIndex > -1:
211 endIndex = path.find(')', startIndex+1)
212 script = path[startIndex+1:endIndex].split()
213 if len(script) == 2 and (script[0] == 'find'):
214 pkg = roslib.packages.get_pkg_dir(script[1])
215 return ''.join([pkg, os.path.sep, path[endIndex+1:]])
216 elif len(path) > 0 and path[0] != os.path.sep:
217 return ''.join([pwd, os.path.sep, path])
218 return path
219
221 '''
222 Reads the configuration file and searches for included files. This files
223 will be returned in a list.
224 @return: the list with all files needed for the configuration
225 @rtype: C{[str,...]}
226 '''
227 result = set([file])
228 regexp_list = [QtCore.QRegExp("\\binclude\\b"), QtCore.QRegExp("\\btextfile\\b"),
229 QtCore.QRegExp("\\bfile\\b")]
230 lines = []
231 with open(file, 'r') as f:
232 lines = f.readlines()
233 for line in lines:
234 index = self._index(line, regexp_list)
235 if index > -1:
236 startIndex = line.find('"', index)
237 if startIndex > -1:
238 endIndex = line.find('"', startIndex+1)
239 fileName = line[startIndex+1:endIndex]
240 if len(fileName) > 0:
241 path = self.interpretPath(fileName, os.path.dirname(self.__launchFile))
242 if os.path.isfile(path):
243 result.add(path)
244 if path.endswith('.launch'):
245 result.update(self.getIncludedFiles(path))
246 return list(result)
247
248 - def load(self, argv):
249 '''
250 @param argv: the list with argv parameter needed to load the launch file.
251 The name and value are separated by C{:=}
252 @type argv: C{[str]}
253 @return True, if the launch file was loaded
254 @rtype boolean
255 @raise LaunchConfigException: on load errors
256 '''
257 import re
258 try:
259 roscfg = roslaunch.ROSLaunchConfig()
260 loader = roslaunch.XmlLoader()
261 loader.load(self.Filename, roscfg, verbose=False, argv=argv)
262 self.__roscfg = roscfg
263
264
265 files = self.file_watcher.files()
266 if files:
267 self.file_watcher.removePaths(files)
268 self.file_watcher.addPaths(self.getIncludedFiles(self.Filename))
269 except roslaunch.XmlParseException, e:
270 test = list(re.finditer(r"environment variable '\w+' is not set", str(e)))
271 message = str(e)
272 if test:
273 message = ''.join([message, '\n', 'environment substitution is not supported, use "arg" instead!'])
274 raise LaunchConfigException(message)
275 return True
276
278 '''
279 @return: a list with args being used in the roslaunch file. Only arg tags that are a direct child of <launch> will
280 be returned
281 @rtype: C{[str]}
282 @raise roslaunch.XmlParseException: on parse errors
283 '''
284 arg_subs = []
285 args = []
286 for filename in self.getIncludedFiles(self.Filename):
287 try:
288 if filename.endswith('.launch'):
289 args[len(args):-1] = parse(filename).getElementsByTagName('arg')
290 except Exception as e:
291 raise roslaunch.XmlParseException("Invalid roslaunch XML syntax: %s"%e)
292
293 for arg in args:
294 arg_name = arg.getAttribute("name")
295 if not arg_name:
296 raise roslaunch.XmlParseException("arg tag needs a name, xml is %s"%arg.toxml())
297
298
299 if not arg.parentNode.tagName=="launch":
300 continue
301
302 arg_default = arg.getAttribute("default")
303 arg_value = arg.getAttribute("value")
304 arg_sub = ''.join([arg_name, ':=', arg_default])
305 if (not arg_value) and not arg_sub in arg_subs:
306 arg_subs.append(arg_sub)
307
308 return arg_subs
309
311 result = val.replace("\\n ", "\n")
312 try:
313 result = result.decode(sys.getfilesystemencoding())
314 except:
315 pass
316 return result
317
319 '''
320 Parses the launch file for C{robots} parameter to get the description of the
321 robot.
322 @return: the robot description stored in the configuration
323 @rtype: C{dict(robot:dict('type' :str, 'name': str, 'images' : [str], 'description': str))}
324 '''
325 result = dict()
326 if not self.Roscfg is None:
327 for param, p in self.Roscfg.params.items():
328 if param.endswith('robots'):
329 if isinstance(p.value, list):
330 if len(p.value) > 0 and len(p.value[0]) != 5:
331 print "WRONG format, expected: ['host', 'type', 'name', 'images', 'description'] -> ignore", param
332 else:
333 for entry in p.value:
334 result[entry[0]] = { 'type' : entry[1], 'name' : entry[2], 'images' : entry[4].split(), 'description' : self._decode(entry[4]) }
335 return result
336
338 '''
339 Parses the launch file for C{capabilities} and C{capability_group} parameter
340 and creates dictionary for grouping the nodes.
341 @return: the capabilities description stored in this configuration
342 @rtype: C{dict(machine : dict(namespace: dict(group:dict('type' : str, 'images' : [str], 'description' : str, 'nodes' : [str]))))}
343 '''
344 result = dict()
345 if not self.Roscfg is None:
346
347
348 for param, p in self.Roscfg.params.items():
349 if param.endswith('capabilities'):
350 if isinstance(p.value, list):
351 if len(p.value) > 0 and len(p.value[0]) != 4:
352 print "WRONG format, expected: ['name', 'type', 'images', 'description'] -> ignore", param
353 else:
354 ns = roslib.names.namespace(param)
355 for m in self.Roscfg.machines.keys():
356 if not result.has_key(m):
357 result[m] = dict()
358 for entry in p.value:
359 if not result[m].has_key(ns):
360 result[m][ns] = dict()
361 result[m][ns][entry[0]] = { 'type' : ''.join([entry[1]]), 'images' : entry[2].split(), 'description' : self._decode(entry[3]), 'nodes' : [] }
362
363 for param, p in self.Roscfg.params.items():
364 if param.endswith('capability_group'):
365 param_node = roslib.names.namespace(param).rstrip(roslib.names.SEP)
366 if not param_node:
367 param_node = roslib.names.SEP
368
369 for item in self.Roscfg.nodes:
370 node_fullname = roslib.names.ns_join(item.namespace, item.name)
371 machine_name = item.machine_name if not item.machine_name is None and not item.machine_name == 'localhost' else ''
372 added = False
373 if node_fullname == param_node:
374 if not result.has_key(machine_name):
375 result[machine_name] = dict()
376 for (ns, groups) in result[machine_name].items():
377 if groups.has_key(p.value):
378 groups[p.value]['nodes'].append(node_fullname)
379 added = True
380 break
381 if not added:
382 ns = ''.join([item.namespace])
383
384 if not result[machine_name].has_key(ns):
385 result[machine_name][ns] = dict()
386 if not result[machine_name][ns].has_key(p.value):
387 result[machine_name][ns][p.value] = { 'type' : '', 'images': [], 'description' : '', 'nodes' : [] }
388 result[machine_name][ns][p.value]['nodes'].append(node_fullname)
389 return result
390
392 result = dict()
393 for a in argv:
394 key, sep, value = a.partition(':=')
395 if sep:
396 result[key] = value
397 return result
398
400 '''
401 Returns a configuration node for a given node name.
402 @param name: the name of the node.
403 @type name: C{str}
404 @return: the configuration node stored in this configuration
405 @rtype: L{roslaunch.Node} or C{None}
406 '''
407 nodename = os.path.basename(name)
408 namespace = os.path.dirname(name).strip(roslib.names.SEP)
409 for item in self.Roscfg.nodes:
410 if (item.name == nodename) and (item.namespace.strip(roslib.names.SEP) == namespace):
411 return item
412 return None
413