1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 from python_qt_binding.QtCore import QFile, QObject, QRegExp
34 from xml.dom.minidom import parse
35 import os
36 import re
37 import sys
38 import time
39
40 import roslaunch
41 import roslib
42
43 from master_discovery_fkie.common import get_hostname, resolve_url
44 import node_manager_fkie as nm
45
46 from .common import package_name, resolve_paths, utf8
52
55 '''
56 A class to handle the ROS configuration stored in launch file.
57 '''
58
59 - def __init__(self, launch_file, package=None, masteruri=None, argv=[]):
60 '''
61 Creates the LaunchConfig object. The launch file will be not loaded on
62 creation, first on request of Roscfg value.
63 @param launch_file: The absolute or relative path with the launch file.
64 By using relative path a package must be valid for
65 remote launches.
66 @type launch_file: C{str}
67 @param package: the package containing the launch file. If None the
68 launch_file will be used to determine the launch file.
69 No remote launches a possible without a valid package.
70 @type package: C{str} or C{None}
71 @param masteruri: The URL of the ROS master.
72 @type masteruri: C{str} or C{None}
73 @param argv: the list the arguments needed for loading the given launch file
74 @type argv: C{[str]}
75 @raise roslaunch.XmlParseException: if the launch file can't be found.
76 '''
77 QObject.__init__(self)
78 self.__launchFile = launch_file
79 self.__package = package_name(os.path.dirname(self.__launchFile))[0] if package is None else package
80 self.__masteruri = masteruri if masteruri is not None else 'localhost'
81 self.__roscfg = None
82 self.argv = argv
83 self.__reqTested = False
84 self.__argv_values = dict()
85 self.global_param_done = []
86 self.hostname = get_hostname(self.__masteruri)
87 self.__launch_id = '%.9f' % time.time()
88 nm.filewatcher().add_launch(self.__masteruri, self.__launchFile, self.__launch_id, [self.__launchFile])
89
94
95 @property
97 '''
98 Returns the master URI (host) where the node of this config will be started.
99 @rtype: C{str}
100 '''
101 return self.__masteruri
102
103 @property
105 '''
106 Holds a loaded launch configuration. It raises a LaunchConfigException on load error.
107 @rtype: U{roslaunch.ROSLaunchConfig<http://docs.ros.org/kinetic/api/roslaunch/html/>} or C{None}
108 @see L{load()}
109 '''
110 if self.__roscfg is not None:
111 return self.__roscfg
112 else:
113 result, _ = self.load(self.argv)
114 if not result:
115 raise LaunchConfigException("not all argv are setted properly!")
116 return self.__roscfg
117
118 @property
120 '''
121 Returns an existing path with file name or an empty string.
122 @rtype: C{str}
123 '''
124 if os.path.isfile(self.__launchFile):
125 return self.__launchFile
126 elif self.__package is not None:
127 try:
128 return roslib.packages.find_resource(self.PackageName, self.LaunchName).pop()
129 except Exception:
130 raise LaunchConfigException(''.join(['launch file ', self.LaunchName, ' not found!']))
131 raise LaunchConfigException(''.join(['launch file ', self.__launchFile, ' not found!']))
132
133 @property
135 '''
136 Returns the name of the launch file with extension, e.g. 'test.launch'
137 @rtype: C{str}
138 '''
139 return os.path.basename(self.__launchFile)
140
141 @property
143 '''
144 Returns the name of the package containing the launch file or None.
145 @rtype: C{str} or C{None}
146 '''
147 return self.__package
148
149 @classmethod
150 - def _index(cls, text, regexp_list):
151 '''
152 Searches in the given text for key indicates the including of a file and
153 return their index.
154 @param text:
155 @type text: C{str}
156 @param regexp_list:
157 @type regexp_list: C{[U{QRegExp<https://srinikom.github.io/pyside-docs/PySide/QtCore/QRegExp.html>},..]}
158 @return: the index of the including key or -1
159 @rtype: C{int}
160 '''
161 for pattern in regexp_list:
162 index = pattern.indexIn(text)
163 if index > -1:
164 return index
165 return -1
166
167 @classmethod
169 '''
170 Tries to determine the path of the included file. The statement of
171 $(find 'package') will be resolved.
172 The supported URL begins with `file:///`, `package://` or `pkg://`.
173 The package URL will be resolved to a valid file path. If the file is in a
174 subdirectory, you can replace the subdirectory by `///`.
175 @param path: the sting which contains the included path
176 @type path: C{str}
177 @param pwd: current working path
178 @type pwd: C{str}
179 @return: C{$(find 'package')} will be resolved. The prefixes `file:///`,
180 `package://` or `pkg://` are also resolved. Otherwise the parameter
181 itself will be returned.
182 @rtype: C{str}
183 '''
184 path = path.strip()
185 startIndex = path.find('$(')
186 if startIndex > -1:
187 endIndex = path.find(')', startIndex + 2)
188 script = path[startIndex + 2:endIndex].split()
189 if len(script) == 2 and (script[0] == 'find'):
190 pkg = roslib.packages.get_pkg_dir(script[1])
191 return os.path.join(pkg, path[endIndex + 2:].strip(os.path.sep))
192 elif len(path) > 0 and path[0] != os.path.sep:
193 try:
194 return resolve_url(path)
195 except ValueError, _:
196 if len(path) > 0 and path[0] != os.path.sep:
197 return os.path.normpath(''.join([pwd, os.path.sep, path]))
198 return path
199
200 @classmethod
201 - def included_files(cls, text_or_path,
202 regexp_retruns=[],
203 regexp_filelist=[QRegExp("\\btextfile\\b"),
204 QRegExp("\\bfile\\b"),
205 QRegExp("\\bdefault\\b"),
206 QRegExp("\\bvalue=.*pkg:\/\/\\b"),
207 QRegExp("\\bvalue=.*package:\/\/\\b"),
208 QRegExp("\\bvalue=.*\$\(find\\b"),
209 QRegExp("\\bargs=.*\$\(find\\b")],
210 recursive=True, unique=True):
211 '''
212 :param regexp_retruns: the list with patterns which are returned as result. If empy it's the same as 'regexp_filelist'
213 :param regexp_filelist: the list with all patterns to find include files
214 '''
215 result = []
216 lines = []
217 pwd = '.'
218 f = QFile(text_or_path)
219 if f.exists():
220 pwd = os.path.dirname(text_or_path)
221 with open(text_or_path, 'r') as f:
222 content = f.read()
223
224 comment_pattern = QRegExp("<!--.*?-->")
225 pos = comment_pattern.indexIn(content)
226 while pos != -1:
227 content = content[:pos] + content[pos + comment_pattern.matchedLength():]
228 pos = comment_pattern.indexIn(content)
229 lines = content.splitlines()
230 else:
231 lines = [text_or_path]
232 line_index = 0
233 for line in lines:
234 index = cls._index(line, regexp_filelist)
235 if index > -1:
236 startIndex = line.find('"', index)
237 if startIndex > -1:
238 endIndex = line.find('"', startIndex + 1)
239 fileName = line[startIndex + 1:endIndex]
240 if len(fileName) > 0:
241 try:
242 path = cls.interpretPath(fileName, pwd)
243 if os.path.isfile(path):
244 if not regexp_retruns or cls._index(line, regexp_retruns) > -1:
245 if not unique:
246 result.append((line_index, path))
247 else:
248 result.append(path)
249 ext = os.path.splitext(path)
250 if recursive and ext[1] in nm.settings().SEARCH_IN_EXT:
251 result += cls.included_files(path, regexp_retruns, regexp_filelist)
252 except Exception:
253 import traceback
254 print traceback.format_exc()
255 line_index += 1
256 if unique:
257 return list(set(result))
258 return result
259
260 - def load(self, argv):
261 '''
262 @param argv: the list with argv parameter needed to load the launch file.
263 The name and value are separated by C{:=}
264 @type argv: C{[str]}
265 @return: True, if the launch file was loaded
266 @rtype: boolean
267 @raise LaunchConfigException: on load errors
268 '''
269 try:
270 roscfg = roslaunch.ROSLaunchConfig()
271 loader = roslaunch.XmlLoader()
272 self.argv = self.resolveArgs(argv)
273 loader.load(self.Filename, roscfg, verbose=False, argv=self.argv)
274 self.__roscfg = roscfg
275 nm.filewatcher().add_launch(self.__masteruri, self.__launchFile, self.__launch_id, self.included_files(self.Filename))
276 if not nm.is_local(get_hostname(self.__masteruri)):
277 files = self.included_files(self.Filename,
278 regexp_retruns=[QRegExp("\\bdefault\\b"),
279 QRegExp("\\bvalue=.*pkg:\/\/\\b"),
280 QRegExp("\\bvalue=.*package:\/\/\\b"),
281 QRegExp("\\bvalue=.*\$\(find\\b"),
282 QRegExp("\\bargs=.*\$\(find\\b")])
283 nm.file_watcher_param().add_launch(self.__masteruri,
284 self.__launchFile,
285 self.__launch_id,
286 files)
287 except roslaunch.XmlParseException, e:
288 test = list(re.finditer(r"environment variable '\w+' is not set", utf8(e)))
289 message = utf8(e)
290 if test:
291 message = ''.join([message, '\n', 'environment substitution is not supported, use "arg" instead!'])
292 raise LaunchConfigException(message)
293 return True, self.argv
294
296 argv_dict = self.argvToDict(argv)
297
298 for k, _ in argv_dict.items():
299 self._replaceArg(k, argv_dict, self.__argv_values)
300 return ["%s:=%s" % (k, v) for k, v in argv_dict.items()]
301
302 - def _replaceArg(self, arg, argv_defaults, argv_values):
303 '''
304 Replace the arg-tags in the value in given argument recursively.
305 '''
306 rec_inc = 0
307 value = argv_defaults[arg]
308 arg_match = re.search(r"\$\(\s*arg\s*", value)
309 while arg_match is not None:
310 rec_inc += 1
311 endIndex = value.find(')', arg_match.end())
312 if endIndex > -1:
313 arg_name = value[arg_match.end():endIndex].strip()
314 if arg == arg_name:
315 raise LaunchConfigException("Can't resolve the argument `%s` argument: the argument referenced to itself!" % arg_name)
316 if rec_inc > 100:
317 raise LaunchConfigException("Can't resolve the argument `%s` in `%s` argument: recursion depth of 100 reached!" % (arg_name, arg))
318 if arg_name in argv_defaults:
319 argv_defaults[arg] = value.replace(value[arg_match.start():endIndex + 1], argv_defaults[arg_name])
320 elif arg_name in argv_values:
321 argv_defaults[arg] = value.replace(value[arg_match.start():endIndex + 1], argv_values[arg_name])
322 else:
323 raise LaunchConfigException("Can't resolve the argument `%s` in `%s` argument" % (arg_name, arg))
324 else:
325 raise LaunchConfigException("Can't resolve the argument in `%s` argument: `)` not found" % arg)
326 value = argv_defaults[arg]
327 arg_match = re.search(r"\$\(\s*arg\s*", value)
328
330 '''
331 @return: a list with args being used in the roslaunch file. Only arg tags that are a direct child of <launch> will
332 be returned
333 @rtype: C{[str]}
334 @raise roslaunch.XmlParseException: on parse errors
335 '''
336 self._argv_values = dict()
337 arg_subs = []
338 args = []
339
340 for filename in [self.Filename]:
341 try:
342 if filename.endswith('.launch'):
343 args[len(args):-1] = parse(filename).getElementsByTagName('arg')
344 except Exception as e:
345 raise roslaunch.XmlParseException("Invalid roslaunch XML syntax: %s" % e)
346
347 for arg in args:
348 arg_name = arg.getAttribute("name")
349 if not arg_name:
350 raise roslaunch.XmlParseException("arg tag needs a name, xml is %s" % arg.toxml())
351
352
353 if not arg.parentNode.tagName == "launch":
354 continue
355
356 arg_default = arg.getAttribute("default")
357 arg_value = arg.getAttribute("value")
358 arg_sub = ''.join([arg_name, ':=', arg_default])
359 if (not arg_value) and arg_sub not in arg_subs:
360 arg_subs.append(arg_sub)
361 elif arg_value:
362 self.__argv_values[arg_name] = arg_value
363
364 return arg_subs
365
367 result = val.replace("\\n ", "\n")
368 try:
369 result = result.decode(sys.getfilesystemencoding())
370 except:
371 pass
372 return result
373
375 '''
376 Parses the launch file for C{robots} parameter to get the description of the
377 robot.
378 @return: the robot description stored in the configuration
379 @rtype: C{dict(robot:dict('type' :str, 'name': str, 'images' : [str], 'description': str))}
380 '''
381 result = dict()
382 if self.Roscfg is not None:
383 for param, p in self.Roscfg.params.items():
384 if param.endswith('robots'):
385 if isinstance(p.value, list):
386 if len(p.value) > 0 and len(p.value[0]) != 5:
387 print "WRONG format, expected: ['host', 'type', 'name', 'images', 'description'] -> ignore", param
388 else:
389 for entry in p.value:
390 result[entry[0]] = {'type': entry[1], 'name': entry[2], 'images': resolve_paths(entry[3]).split(','), 'description': resolve_paths(self._decode(entry[4]))}
391 return result
392
394 '''
395 Parses the launch file for C{capabilities} and C{capability_group} parameter
396 and creates dictionary for grouping the nodes.
397 @return: the capabilities description stored in this configuration
398 @rtype: C{dict(machine : dict(namespace: dict(group:dict('type' : str, 'images' : [str], 'description' : str, 'nodes' : [str]))))}
399 '''
400 result = dict()
401 capabilies_descr = dict()
402 if self.Roscfg is not None:
403
404
405
406 for param, p in self.Roscfg.params.items():
407 if param.endswith('capabilities'):
408 if isinstance(p.value, list):
409 if len(p.value) > 0 and len(p.value[0]) != 4:
410 print "WRONG format, expected: ['name', 'type', 'images', 'description'] -> ignore", param
411 else:
412 for entry in p.value:
413 capabilies_descr[entry[0]] = {'type': ''.join([entry[1]]), 'images': resolve_paths(entry[2]).split(','), 'description': resolve_paths(self._decode(entry[3]))}
414
415 for item in self.Roscfg.nodes:
416 node_fullname = roslib.names.ns_join(item.namespace, item.name)
417 machine_name = item.machine_name if item.machine_name is not None and not item.machine_name == 'localhost' else ''
418 added = False
419 cap_param = roslib.names.ns_join(node_fullname, 'capability_group')
420 cap_ns = node_fullname
421
422 while cap_param not in self.Roscfg.params and cap_param.count(roslib.names.SEP) > 1:
423 cap_ns = roslib.names.namespace(cap_ns).rstrip(roslib.names.SEP)
424 if not cap_ns:
425 cap_ns = roslib.names.SEP
426 cap_param = roslib.names.ns_join(cap_ns, 'capability_group')
427 if cap_ns == node_fullname:
428 cap_ns = item.namespace.rstrip(roslib.names.SEP)
429 if not cap_ns:
430 cap_ns = roslib.names.SEP
431
432 if cap_param in self.Roscfg.params and self.Roscfg.params[cap_param].value:
433 p = self.Roscfg.params[cap_param]
434 if machine_name not in result:
435 result[machine_name] = dict()
436 for (ns, groups) in result[machine_name].items():
437 if ns == cap_ns and p.value in groups:
438 groups[p.value]['nodes'].append(node_fullname)
439 added = True
440 break
441 if not added:
442 ns = cap_ns
443
444 if ns not in result[machine_name]:
445 result[machine_name][ns] = dict()
446 if p.value not in result[machine_name][ns]:
447 try:
448 result[machine_name][ns][p.value] = {'type': capabilies_descr[p.value]['type'],
449 'images': capabilies_descr[p.value]['images'],
450 'description': capabilies_descr[p.value]['description'],
451 'nodes': []}
452 except:
453 result[machine_name][ns][p.value] = {'type': '',
454 'images': [],
455 'description': '',
456 'nodes': []}
457 result[machine_name][ns][p.value]['nodes'].append(node_fullname)
458 return result
459
461 result = dict()
462 for a in argv:
463 key, sep, value = a.partition(':=')
464 if sep:
465 result[key] = value
466 return result
467
469 '''
470 Returns a configuration node for a given node name.
471 @param name: the name of the node.
472 @type name: C{str}
473 @return: the configuration node stored in this configuration
474 @rtype: U{roslaunch.Node<http://docs.ros.org/kinetic/api/roslaunch/html/>} or C{None}
475 '''
476 nodename = os.path.basename(name)
477 namespace = os.path.dirname(name).strip(roslib.names.SEP)
478 for item in self.Roscfg.nodes:
479 if (item.name == nodename) and (item.namespace.strip(roslib.names.SEP) == namespace):
480 return item
481 return None
482
484 '''
485 Returns the value of the `/robot_icon` parameter or None
486 '''
487 try:
488 return self.Roscfg.params['/robot_icon'].value
489 except:
490 pass
491 return None
492