1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 from python_qt_binding.QtCore import QObject, QRegExp
34 from xml.dom.minidom import parse
35 import os
36 import re
37 import sys
38 import time
39
40 import roslaunch
41 import roslib
42
43 from master_discovery_fkie.common import get_hostname, resolve_url
44 import node_manager_fkie as nm
45
46 from .common import package_name, resolve_paths
52
55 '''
56 A class to handle the ROS configuration stored in launch file.
57 '''
58
59 - def __init__(self, launch_file, package=None, masteruri=None, argv=[]):
60 '''
61 Creates the LaunchConfig object. The launch file will be not loaded on
62 creation, first on request of Roscfg value.
63 @param launch_file: The absolute or relative path with the launch file.
64 By using relative path a package must be valid for
65 remote launches.
66 @type launch_file: C{str}
67 @param package: the package containing the launch file. If None the
68 launch_file will be used to determine the launch file.
69 No remote launches a possible without a valid package.
70 @type package: C{str} or C{None}
71 @param masteruri: The URL of the ROS master.
72 @type masteruri: C{str} or C{None}
73 @param argv: the list the arguments needed for loading the given launch file
74 @type argv: C{[str]}
75 @raise roslaunch.XmlParseException: if the launch file can't be found.
76 '''
77 QObject.__init__(self)
78 self.__launchFile = launch_file
79 self.__package = package_name(os.path.dirname(self.__launchFile))[0] if package is None else package
80 self.__masteruri = masteruri if masteruri is not None else 'localhost'
81 self.__roscfg = None
82 self.argv = argv
83 self.__reqTested = False
84 self.__argv_values = dict()
85 self.global_param_done = []
86 self.hostname = get_hostname(self.__masteruri)
87 self.__launch_id = '%.9f' % time.time()
88 nm.filewatcher().add_launch(self.__masteruri, self.__launchFile, self.__launch_id, [self.__launchFile])
89
90
95
96 @property
98 '''
99 Returns the master URI (host) where the node of this config will be started.
100 @rtype: C{str}
101 '''
102 return self.__masteruri
103
104 @property
106 '''
107 Holds a loaded launch configuration. It raises a LaunchConfigException on load error.
108 @rtype: U{roslaunch.ROSLaunchConfig<http://docs.ros.org/kinetic/api/roslaunch/html/>} or C{None}
109 @see L{load()}
110 '''
111 if self.__roscfg is not None:
112 return self.__roscfg
113 else:
114 result, _ = self.load(self.argv)
115 if not result:
116 raise LaunchConfigException("not all argv are setted properly!")
117 return self.__roscfg
118
119 @property
121 '''
122 Returns an existing path with file name or an empty string.
123 @rtype: C{str}
124 '''
125 if os.path.isfile(self.__launchFile):
126 return self.__launchFile
127 elif self.__package is not None:
128 try:
129 return roslib.packages.find_resource(self.PackageName, self.LaunchName).pop()
130 except Exception:
131 raise LaunchConfigException(''.join(['launch file ', self.LaunchName, ' not found!']))
132 raise LaunchConfigException(''.join(['launch file ', self.__launchFile, ' not found!']))
133
134 @property
136 '''
137 Returns the name of the launch file with extension, e.g. 'test.launch'
138 @rtype: C{str}
139 '''
140 return os.path.basename(self.__launchFile)
141
142 @property
144 '''
145 Returns the name of the package containing the launch file or None.
146 @rtype: C{str} or C{None}
147 '''
148 return self.__package
149
150 @classmethod
151 - def _index(cls, text, regexp_list):
152 '''
153 Searches in the given text for key indicates the including of a file and
154 return their index.
155 @param text:
156 @type text: C{str}
157 @param regexp_list:
158 @type regexp_list: C{[U{QRegExp<https://srinikom.github.io/pyside-docs/PySide/QtCore/QRegExp.html>},..]}
159 @return: the index of the including key or -1
160 @rtype: C{int}
161 '''
162 for pattern in regexp_list:
163 index = pattern.indexIn(text)
164 if index > -1:
165 return index
166 return -1
167
168 @classmethod
170 '''
171 Tries to determine the path of the included file. The statement of
172 $(find 'package') will be resolved.
173 The supported URL begins with `file:///`, `package://` or `pkg://`.
174 The package URL will be resolved to a valid file path. If the file is in a
175 subdirectory, you can replace the subdirectory by `///`.
176 @param path: the sting which contains the included path
177 @type path: C{str}
178 @param pwd: current working path
179 @type pwd: C{str}
180 @return: C{$(find 'package')} will be resolved. The prefixes `file:///`,
181 `package://` or `pkg://` are also resolved. Otherwise the parameter
182 itself will be returned.
183 @rtype: C{str}
184 '''
185 path = path.strip()
186 startIndex = path.find('$(')
187 if startIndex > -1:
188 endIndex = path.find(')', startIndex + 2)
189 script = path[startIndex + 2:endIndex].split()
190 if len(script) == 2 and (script[0] == 'find'):
191 pkg = roslib.packages.get_pkg_dir(script[1])
192 return os.path.join(pkg, path[endIndex + 2:].strip(os.path.sep))
193 elif len(path) > 0 and path[0] != os.path.sep:
194 try:
195 return resolve_url(path)
196 except ValueError, _:
197 if len(path) > 0 and path[0] != os.path.sep:
198 return os.path.normpath(''.join([pwd, os.path.sep, path]))
199 return path
200
201 @classmethod
202 - def getIncludedFiles(cls, inc_file, regexp_list=[QRegExp("\\binclude\\b"),
203 QRegExp("\\btextfile\\b"),
204 QRegExp("\\bfile\\b"),
205 QRegExp("\\bdefault\\b"),
206 QRegExp("\\bvalue=.*pkg:\/\/\\b"),
207 QRegExp("\\bvalue=.*package:\/\/\\b"),
208 QRegExp("\\bvalue=.*\$\(find\\b")]):
209 '''
210 Reads the configuration file and searches for included files. This files
211 will be returned in a list.
212 @param inc_file: path of the ROS launch file
213 @param regexp_list: pattern of
214 @return: the list with all files needed for the configuration
215 @rtype: C{[str,...]}
216 '''
217 result = set()
218 with open(inc_file, 'r') as f:
219 content = f.read()
220
221 comment_pattern = QRegExp("<!--.*?-->")
222 pos = comment_pattern.indexIn(content)
223 while pos != -1:
224 content = content[:pos] + content[pos + comment_pattern.matchedLength():]
225 pos = comment_pattern.indexIn(content)
226 lines = content.splitlines()
227 for line in lines:
228 index = cls._index(line, regexp_list)
229 if index > -1:
230 startIndex = line.find('"', index)
231 if startIndex > -1:
232 endIndex = line.find('"', startIndex + 1)
233 fileName = line[startIndex + 1:endIndex]
234 if len(fileName) > 0:
235 try:
236 path = cls.interpretPath(fileName, os.path.dirname(inc_file))
237 if os.path.isfile(path):
238 result.add(path)
239 if path.endswith('.launch'):
240 result.update(cls.getIncludedFiles(path, regexp_list))
241 except:
242 pass
243 return list(result)
244
245 - def load(self, argv):
246 '''
247 @param argv: the list with argv parameter needed to load the launch file.
248 The name and value are separated by C{:=}
249 @type argv: C{[str]}
250 @return: True, if the launch file was loaded
251 @rtype: boolean
252 @raise LaunchConfigException: on load errors
253 '''
254 try:
255 roscfg = roslaunch.ROSLaunchConfig()
256 loader = roslaunch.XmlLoader()
257 self.argv = self.resolveArgs(argv)
258 loader.load(self.Filename, roscfg, verbose=False, argv=self.argv)
259 self.__roscfg = roscfg
260 nm.filewatcher().add_launch(self.__masteruri, self.__launchFile, self.__launch_id, self.getIncludedFiles(self.Filename))
261 if not nm.is_local(get_hostname(self.__masteruri)):
262 files = self.getIncludedFiles(self.Filename,
263 regexp_list=[QRegExp("\\bdefault\\b"),
264 QRegExp("\\bvalue=.*pkg:\/\/\\b"),
265 QRegExp("\\bvalue=.*package:\/\/\\b"),
266 QRegExp("\\bvalue=.*\$\(find\\b")])
267 nm.file_watcher_param().add_launch(self.__masteruri,
268 self.__launchFile,
269 self.__launch_id,
270 files)
271 except roslaunch.XmlParseException, e:
272 test = list(re.finditer(r"environment variable '\w+' is not set", str(e)))
273 message = str(e)
274 if test:
275 message = ''.join([message, '\n', 'environment substitution is not supported, use "arg" instead!'])
276 raise LaunchConfigException(message)
277 return True, self.argv
278
280 argv_dict = self.argvToDict(argv)
281
282 for k, _ in argv_dict.items():
283 self._replaceArg(k, argv_dict, self.__argv_values)
284 return ["%s:=%s" % (k, v) for k, v in argv_dict.items()]
285
286 - def _replaceArg(self, arg, argv_defaults, argv_values):
287 '''
288 Replace the arg-tags in the value in given argument recursively.
289 '''
290 rec_inc = 0
291 value = argv_defaults[arg]
292 arg_match = re.search(r"\$\(\s*arg\s*", value)
293 while arg_match is not None:
294 rec_inc += 1
295 endIndex = value.find(')', arg_match.end())
296 if endIndex > -1:
297 arg_name = value[arg_match.end():endIndex].strip()
298 if arg == arg_name:
299 raise LaunchConfigException("Can't resolve the argument `%s` argument: the argument referenced to itself!" % arg_name)
300 if rec_inc > 100:
301 raise LaunchConfigException("Can't resolve the argument `%s` in `%s` argument: recursion depth of 100 reached!" % (arg_name, arg))
302 if arg_name in argv_defaults:
303 argv_defaults[arg] = value.replace(value[arg_match.start():endIndex + 1], argv_defaults[arg_name])
304 elif arg_name in argv_values:
305 argv_defaults[arg] = value.replace(value[arg_match.start():endIndex + 1], argv_values[arg_name])
306 else:
307 raise LaunchConfigException("Can't resolve the argument `%s` in `%s` argument" % (arg_name, arg))
308 else:
309 raise LaunchConfigException("Can't resolve the argument in `%s` argument: `)` not found" % arg)
310 value = argv_defaults[arg]
311 arg_match = re.search(r"\$\(\s*arg\s*", value)
312
314 '''
315 @return: a list with args being used in the roslaunch file. Only arg tags that are a direct child of <launch> will
316 be returned
317 @rtype: C{[str]}
318 @raise roslaunch.XmlParseException: on parse errors
319 '''
320 self._argv_values = dict()
321 arg_subs = []
322 args = []
323
324
325 for filename in [self.Filename]:
326 try:
327 if filename.endswith('.launch'):
328 args[len(args):-1] = parse(filename).getElementsByTagName('arg')
329 except Exception as e:
330 raise roslaunch.XmlParseException("Invalid roslaunch XML syntax: %s" % e)
331
332 for arg in args:
333 arg_name = arg.getAttribute("name")
334 if not arg_name:
335 raise roslaunch.XmlParseException("arg tag needs a name, xml is %s" % arg.toxml())
336
337
338 if not arg.parentNode.tagName == "launch":
339 continue
340
341 arg_default = arg.getAttribute("default")
342 arg_value = arg.getAttribute("value")
343 arg_sub = ''.join([arg_name, ':=', arg_default])
344 if (not arg_value) and arg_sub not in arg_subs:
345 arg_subs.append(arg_sub)
346 elif arg_value:
347 self.__argv_values[arg_name] = arg_value
348
349 return arg_subs
350
352 result = val.replace("\\n ", "\n")
353 try:
354 result = result.decode(sys.getfilesystemencoding())
355 except:
356 pass
357 return result
358
360 '''
361 Parses the launch file for C{robots} parameter to get the description of the
362 robot.
363 @return: the robot description stored in the configuration
364 @rtype: C{dict(robot:dict('type' :str, 'name': str, 'images' : [str], 'description': str))}
365 '''
366 result = dict()
367 if self.Roscfg is not None:
368 for param, p in self.Roscfg.params.items():
369 if param.endswith('robots'):
370 if isinstance(p.value, list):
371 if len(p.value) > 0 and len(p.value[0]) != 5:
372 print "WRONG format, expected: ['host', 'type', 'name', 'images', 'description'] -> ignore", param
373 else:
374 for entry in p.value:
375 result[entry[0]] = {'type': entry[1], 'name': entry[2], 'images': resolve_paths(entry[3]).split(','), 'description': resolve_paths(self._decode(entry[4]))}
376 return result
377
379 '''
380 Parses the launch file for C{capabilities} and C{capability_group} parameter
381 and creates dictionary for grouping the nodes.
382 @return: the capabilities description stored in this configuration
383 @rtype: C{dict(machine : dict(namespace: dict(group:dict('type' : str, 'images' : [str], 'description' : str, 'nodes' : [str]))))}
384 '''
385 result = dict()
386 capabilies_descr = dict()
387 if self.Roscfg is not None:
388
389
390
391 for param, p in self.Roscfg.params.items():
392 if param.endswith('capabilities'):
393 if isinstance(p.value, list):
394 if len(p.value) > 0 and len(p.value[0]) != 4:
395 print "WRONG format, expected: ['name', 'type', 'images', 'description'] -> ignore", param
396 else:
397 for entry in p.value:
398 capabilies_descr[entry[0]] = {'type': ''.join([entry[1]]), 'images': resolve_paths(entry[2]).split(','), 'description': resolve_paths(self._decode(entry[3]))}
399
400 for item in self.Roscfg.nodes:
401 node_fullname = roslib.names.ns_join(item.namespace, item.name)
402 machine_name = item.machine_name if item.machine_name is not None and not item.machine_name == 'localhost' else ''
403 added = False
404 cap_param = roslib.names.ns_join(node_fullname, 'capability_group')
405 cap_ns = node_fullname
406
407 while cap_param not in self.Roscfg.params and cap_param.count(roslib.names.SEP) > 1:
408 cap_ns = roslib.names.namespace(cap_ns).rstrip(roslib.names.SEP)
409 if not cap_ns:
410 cap_ns = roslib.names.SEP
411 cap_param = roslib.names.ns_join(cap_ns, 'capability_group')
412 if cap_ns == node_fullname:
413 cap_ns = item.namespace.rstrip(roslib.names.SEP)
414 if not cap_ns:
415 cap_ns = roslib.names.SEP
416
417 if cap_param in self.Roscfg.params and self.Roscfg.params[cap_param].value:
418 p = self.Roscfg.params[cap_param]
419 if machine_name not in result:
420 result[machine_name] = dict()
421 for (ns, groups) in result[machine_name].items():
422 if ns == cap_ns and p.value in groups:
423 groups[p.value]['nodes'].append(node_fullname)
424 added = True
425 break
426 if not added:
427 ns = cap_ns
428
429 if ns not in result[machine_name]:
430 result[machine_name][ns] = dict()
431 if p.value not in result[machine_name][ns]:
432 try:
433 result[machine_name][ns][p.value] = {'type': capabilies_descr[p.value]['type'],
434 'images': capabilies_descr[p.value]['images'],
435 'description': capabilies_descr[p.value]['description'],
436 'nodes': []}
437 except:
438 result[machine_name][ns][p.value] = {'type': '',
439 'images': [],
440 'description': '',
441 'nodes': []}
442 result[machine_name][ns][p.value]['nodes'].append(node_fullname)
443 return result
444
446 result = dict()
447 for a in argv:
448 key, sep, value = a.partition(':=')
449 if sep:
450 result[key] = value
451 return result
452
454 '''
455 Returns a configuration node for a given node name.
456 @param name: the name of the node.
457 @type name: C{str}
458 @return: the configuration node stored in this configuration
459 @rtype: U{roslaunch.Node<http://docs.ros.org/kinetic/api/roslaunch/html/>} or C{None}
460 '''
461 nodename = os.path.basename(name)
462 namespace = os.path.dirname(name).strip(roslib.names.SEP)
463 for item in self.Roscfg.nodes:
464 if (item.name == nodename) and (item.namespace.strip(roslib.names.SEP) == namespace):
465 return item
466 return None
467
469 '''
470 Returns the value of the `/robot_icon` parameter or None
471 '''
472 try:
473 return self.Roscfg.params['/robot_icon'].value
474 except:
475 pass
476 return None
477