15 """ Simple wrapper to add XML reflection to an xml_reflection.Object class """ 23 """ What to do on an error. This can be changed to raise an exception. """ 24 print(message, file=sys.stderr)
31 value_type_prefix =
'' 35 Basic mechanism to prevent conflicts for string types for URDF and SDF 36 @note Does not handle nesting! 38 global value_type_prefix
39 value_type_prefix = namespace +
'.' 42 global value_type_prefix
43 value_type_prefix =
'' 46 if isinstance(key, str):
47 key = value_type_prefix + key
48 assert key
not in value_types
49 value_types[key] = value
52 """ Can wrap value types if needed """ 53 if value_type_prefix
and isinstance(cur_type, str):
55 curKey = value_type_prefix + cur_type
56 value_type = value_types.get(curKey)
59 if value_type
is None:
61 value_type = value_types.get(cur_type)
62 if value_type
is None:
68 if isinstance(cur_type, ValueType):
70 elif isinstance(cur_type, str):
71 if cur_type.startswith(
'vector'):
79 raise Exception(
"Invalid value type: {}".format(cur_type))
80 elif cur_type == list:
82 elif issubclass(cur_type, Object):
84 elif cur_type
in [str, float]:
87 raise Exception(
"Invalid type: {}".format(cur_type))
90 """ Primitive value type """ 91 def from_xml(self, node):
92 return self.from_string(node.text)
def write_xml(self, node, value):
    """
    Store 'value' as the text of an XML node that the caller has already created.
    i.e., In Axis.to_sdf(self, node), 'node' would be the 'axis' element.
    @param node: Existing element; its 'text' is overwritten
    @param value: Value to serialize through self.to_string()
    @todo Add function that makes an XML node completely independently?
    """
    serialized = self.to_string(value)
    node.text = serialized
100 def equals(self, a, b):
104 def __init__(self, cur_type):
106 def to_string(self, value):
def from_string(self, value):
    """ Convert 'value' by calling the wrapped type (self.type) on it """
    convert = self.type
    return convert(value)
def to_string(self, values):
    """ Join the (already string) items of 'values' with single spaces """
    separator = ' '
    return separator.join(values)
114 def from_string(self, text):
def equals(self, aValues, bValues):
    """ True when both sequences have the same length and match item by item """
    if len(aValues) != len(bValues):
        return False
    for left, right in zip(aValues, bValues):
        if not (left == right):
            return False
    return True
120 def __init__(self, count = None):
123 def check(self, values):
124 if self.count
is not None:
125 assert len(values) == self.count,
"Invalid vector length" 127 def to_string(self, values):
129 raw = list(map(str, values))
130 return ListType.to_string(self, raw)
132 def from_string(self, text):
133 raw = ListType.from_string(self, text)
135 return list(map(float, raw))
138 """ Simple, raw XML value. Need to bugfix putting this back into a document """ 139 def from_xml(self, node):
142 def write_xml(self, node, value):
144 children = xml_children(value)
145 list(map(node.append, children))
def __init__(self, attribute, value_type):
    """
    @param attribute: Name of the XML attribute that carries the value
    @param value_type: Type specification, resolved through get_type()
    """
    self.attribute = attribute
    resolved_type = get_type(value_type)
    self.value_type = resolved_type
def from_xml(self, node):
    """ Read this type's attribute from 'node' and parse it with the wrapped value type """
    attribute_text = node.get(self.attribute)
    parse = self.value_type.from_string
    return parse(attribute_text)
def write_xml(self, node, value):
    """ Serialize 'value' with the wrapped value type and store it in this type's attribute on 'node' """
    serialize = self.value_type.to_string
    attribute_text = serialize(value)
    node.set(self.attribute, attribute_text)
159 def __init__(self, cur_type):
162 def from_xml(self, node):
167 def write_xml(self, node, obj):
171 def __init__(self, name, typeMap):
173 self.typeMap = typeMap
175 for (key, value)
in typeMap.items():
177 self.nameMap[value] = key
179 def from_xml(self, node):
180 cur_type = self.typeMap.get(node.tag)
182 raise Exception(
"Invalid {} tag: {}".format(self.name, node.tag))
184 return value_type.from_xml(node)
186 def get_name(self, obj):
188 name = self.nameMap.get(cur_type)
190 raise Exception(
"Invalid {} type: {}".format(self.name, cur_type))
193 def write_xml(self, node, obj):
198 def __init__(self, name, typeOrder):
200 assert len(typeOrder) > 0
201 self.type_order = typeOrder
203 def from_xml(self, node):
205 for value_type
in self.type_order:
207 return value_type.from_xml(node)
208 except Exception
as e:
209 error_set.append((value_type, e))
211 out =
"Could not perform duck-typed parsing." 212 for (value_type, e)
in error_set:
213 out +=
"\nValue Type: {}\nException: {}\n".format(value_type, e)
216 def write_xml(self, node, obj):
220 """ Mirroring Gazebo's SDF api 222 @param xml_var: Xml name 223 @todo If the value_type is an object with a tag defined in it's reflection, allow it to act as the default tag name? 224 @param var: Python class variable name. By default it's the same as the XML name 226 def __init__(self, xml_var, value_type, required = True, default = None, var = None):
227 self.xml_var = xml_var
233 self.value_type =
get_type(value_type)
234 self.default = default
236 assert default
is None,
"Default does not make sense for a required field" 237 self.required = required
238 self.is_aggregate =
False 240 def set_default(self):
242 raise Exception(
"Required {} not set in XML: {}".format(self.type, self.xml_var))
243 elif not skip_default:
244 setattr(obj, self.var, self.default)
247 def __init__(self, xml_var, value_type, required = True, default = None, var = None):
248 Param.__init__(self, xml_var, value_type, required, default, var)
249 self.type =
'attribute' 251 def set_from_string(self, obj, value):
252 """ Node is the parent node in this case """ 254 setattr(obj, self.var, self.value_type.from_string(value))
256 def add_to_xml(self, obj, node):
257 value = getattr(obj, self.var)
261 raise Exception(
"Required attribute not set in object: {}".format(self.var))
262 elif not skip_default:
265 if value
is not None:
266 node.set(self.xml_var, self.value_type.to_string(value))
271 def __init__(self, xml_var, value_type, required = True, default = None, var = None, is_raw = False):
272 Param.__init__(self, xml_var, value_type, required, default, var)
273 self.type =
'element' 276 def set_from_xml(self, obj, node):
277 value = self.value_type.from_xml(node)
278 setattr(obj, self.var, value)
280 def add_to_xml(self, obj, parent):
281 value = getattr(obj, self.xml_var)
284 raise Exception(
"Required element not defined in object: {}".format(self.var))
285 elif not skip_default:
287 if value
is not None:
288 self.add_scalar_to_xml(parent, value)
290 def add_scalar_to_xml(self, parent, value):
294 node = node_add(parent, self.xml_var)
295 self.value_type.write_xml(node, value)
299 def __init__(self, xml_var, value_type, var = None, is_raw = False):
302 Element.__init__(self, xml_var, value_type, required =
False, var = var, is_raw = is_raw)
303 self.is_aggregate =
True 305 def add_from_xml(self, obj, node):
306 value = self.value_type.from_xml(node)
307 obj.add_aggregate(self.xml_var, value)
309 def set_default(self):
314 """ Small container for keeping track of what's been consumed """ 315 def __init__(self, node):
316 self.attributes = list(node.attrib.keys())
317 self.children = xml_children(node)
319 class Reflection(object):
320 def __init__(self, params = [], parent_cls = None, tag = None):
321 """ Construct a XML reflection thing 322 @param parent_cls: Parent class, to use it's reflection as well. 323 @param tag: Only necessary if you intend to use Object.write_xml_doc() 324 This does not override the name supplied in the reflection definition thing. 326 if parent_cls
is not None:
327 self.parent = parent_cls.XML_REFL
336 if isinstance(param, Element):
337 elements.append(param)
339 attributes.append(param)
344 self.attributes = attributes
345 self.attribute_map = {}
346 self.required_attribute_names = []
347 for attribute
in attributes:
348 self.attribute_map[attribute.xml_var] = attribute
349 self.paramMap[attribute.xml_var] = attribute
350 self.vars.append(attribute.var)
351 if attribute.required:
352 self.required_attribute_names.append(attribute.xml_var)
355 self.element_map = {}
356 self.required_element_names = []
359 self.scalarNames = []
360 for element
in elements:
361 self.element_map[element.xml_var] = element
362 self.paramMap[element.xml_var] = element
363 self.vars.append(element.var)
365 self.required_element_names.append(element.xml_var)
366 if element.is_aggregate:
367 self.aggregates.append(element)
369 self.scalars.append(element)
370 self.scalarNames.append(element.xml_var)
372 def set_from_xml(self, obj, node, info = None):
379 self.parent.set_from_xml(obj, node, info)
382 unset_attributes = list(self.attribute_map.keys())
383 unset_scalars = copy.copy(self.scalarNames)
386 for xml_var
in copy.copy(info.attributes):
387 attribute = self.attribute_map.get(xml_var)
388 if attribute
is not None:
389 value = node.attrib[xml_var]
390 attribute.set_from_string(obj, value)
391 unset_attributes.remove(xml_var)
392 info.attributes.remove(xml_var)
395 for child
in copy.copy(info.children):
397 element = self.element_map.get(tag)
398 if element
is not None:
399 if element.is_aggregate:
400 element.add_from_xml(obj, child)
402 if tag
in unset_scalars:
403 element.set_from_xml(obj, child)
404 unset_scalars.remove(tag)
406 on_error(
"Scalar element defined multiple times: {}".format(tag))
407 info.children.remove(child)
409 for attribute
in map(self.attribute_map.get, unset_attributes):
410 attribute.set_default()
412 for element
in map(self.element_map.get, unset_scalars):
413 element.set_default()
416 for xml_var
in info.attributes:
417 on_error(
'Unknown attribute: {}'.format(xml_var))
418 for node
in info.children:
419 on_error(
'Unknown tag: {}'.format(node.tag))
421 def add_to_xml(self, obj, node):
423 self.parent.add_to_xml(obj, node)
424 for attribute
in self.attributes:
425 attribute.add_to_xml(obj, node)
426 for element
in self.scalars:
427 element.add_to_xml(obj, node)
430 obj.add_aggregates_to_xml(node)
432 class Object(YamlReflection):
433 """ Raw python object for yaml / xml representation """ 436 def get_refl_vars(self):
437 return self.XML_REFL.vars
439 def check_valid(self):
442 def pre_write_xml(self):
443 """ If anything needs to be converted prior to dumping to xml 444 i.e., getting the names of objects and such """ 447 def write_xml(self, node):
448 """ Adds contents directly to XML node """ 451 self.XML_REFL.add_to_xml(self, node)
454 """ Creates an overarching tag and adds its contents to the node """ 455 tag = self.XML_REFL.tag
456 assert tag
is not None,
"Must define 'tag' in reflection to use this function" 457 doc = etree.Element(tag)
def to_xml_string(self):
    """ Return the result of self.to_xml() rendered by xml_string() """
    document = self.to_xml()
    return xml_string(document)
464 def post_read_xml(self):
467 def read_xml(self, node):
468 self.XML_REFL.set_from_xml(self, node)
473 def from_xml(cls, node):
475 return cur_type.from_xml(node)
def from_xml_string(cls, xml_string):
    """
    Parse XML text and build an instance from its root node.
    @param xml_string: XML document text, parsed with etree.fromstring()
    @return Whatever cls.from_xml() returns for the parsed root
    """
    root = etree.fromstring(xml_string)
    return cls.from_xml(root)
def from_xml_file(cls, file_path):
    """
    Read an XML file and build an instance from its contents.
    @param file_path: Path of the XML file; opened in text mode for reading
    @return Whatever cls.from_xml_string() returns for the file's text
    """
    # Context manager guarantees the handle is closed, even if read() raises;
    # a bare open(...).read() leaves closing to the garbage collector.
    with open(file_path, 'r') as xml_file:
        xml_string = xml_file.read()
    return cls.from_xml_string(xml_string)
489 def get_aggregate_list(self, xml_var):
490 var = self.XML_REFL.paramMap[xml_var].var
491 values = getattr(self, var)
492 assert isinstance(values, list)
495 def aggregate_init(self):
496 """ Must be called in constructor! """ 497 self.aggregate_order = []
499 self.aggregate_type = {}
def add_aggregate(self, xml_var, obj):
    """
    Append 'obj' to the aggregate list for 'xml_var' and record its write order and type.
    NOTE: One must keep careful track of aggregate types for this system.
    Can use 'lump_aggregates()' before writing if you don't care.
    """
    target_list = self.get_aggregate_list(xml_var)
    target_list.append(obj)
    self.aggregate_order.append(obj)
    self.aggregate_type[obj] = xml_var
def add_aggregates_to_xml(self, node):
    """ Write every recorded aggregate value under 'node', following aggregate_order """
    for value in self.aggregate_order:
        xml_var = self.aggregate_type[value]
        # Each value is written by the element registered for its XML name.
        self.XML_REFL.element_map[xml_var].add_scalar_to_xml(node, value)
def remove_aggregate(self, obj):
    """ Drop 'obj' from the write order, the type map and its aggregate list """
    self.aggregate_order.remove(obj)
    # pop() both looks up and deletes the recorded XML name for this object.
    xml_var = self.aggregate_type.pop(obj)
    self.get_aggregate_list(xml_var).remove(obj)
def lump_aggregates(self):
    """
    Put all aggregate types together, just because.
    Resets the aggregate bookkeeping, then re-registers the stored values
    grouped by aggregate parameter, in the order of XML_REFL.aggregates.
    The aggregate lists themselves are left untouched.
    """
    self.aggregate_init()
    for param in self.XML_REFL.aggregates:
        xml_var = param.xml_var
        for obj in self.get_aggregate_list(xml_var):
            # Record order/type directly: going through add_aggregate() would
            # append to the very list being iterated (never terminating), and
            # the bookkeeping is keyed by the XML name, not by param.var.
            self.aggregate_order.append(obj)
            self.aggregate_type[obj] = xml_var
527 """ Compatibility """ 528 def parse(self, xml_string):
529 node = etree.fromstring(xml_string)
# Register value types whose value is stored in a single attribute of the element.
add_type('element_name', SimpleElementType(attribute='name', value_type=str))
add_type('element_value', SimpleElementType(attribute='value', value_type=float))
def start_namespace(namespace)
def reflect(cls, args, kwargs)