xmlexporter.py
Go to the documentation of this file.
1 """
2 Build an XML file from a list of nodes in the address space.
3 The format is the UANodeSet format defined by the OPC UA specification.
4 """
5 import logging
6 from collections import OrderedDict
7 import xml.etree.ElementTree as Et
8 from copy import copy
9 
10 from opcua import ua
11 from opcua.ua import object_ids as o_ids
12 from opcua.common.ua_utils import get_base_data_type
13 
14 
class XmlExporter(object):
    """
    Export nodes of an OPC UA address space to a UANodeSet XML document.

    Typical use: create the exporter with a server (or client) object, call
    build_etree() with the nodes to export, then write_xml() to save the file.
    """

    def __init__(self, server):
        """
        Args:
            server: object giving access to the address space; this class calls
                its get_namespace_array() and get_node() methods
        """
        self.logger = logging.getLogger(__name__)
        self.server = server
        # NodeId -> alias name, filled while nodes are exported
        self.aliases = {}
        # namespace index in the address space -> namespace index in the file.
        # Initialised here so node_to_etree() can be used before build_etree().
        self._addr_idx_to_xml_idx = {0: 0}

        node_set_attributes = OrderedDict()
        node_set_attributes['xmlns:xsi'] = 'http://www.w3.org/2001/XMLSchema-instance'
        node_set_attributes['xmlns:uax'] = 'http://opcfoundation.org/UA/2008/02/Types.xsd'
        node_set_attributes['xmlns:xsd'] = 'http://www.w3.org/2001/XMLSchema'
        node_set_attributes['xmlns'] = 'http://opcfoundation.org/UA/2011/03/UANodeSet.xsd'

        self.etree = Et.ElementTree(Et.Element('UANodeSet', node_set_attributes))

    def build_etree(self, node_list, uris=None):
        """
        Create an XML etree object from a list of nodes; custom namespace uris are optional
        Namespaces used by nodes are always exported for consistency.
        Args:
            node_list: list of Node objects for export
            uris: list of namespace uri strings

        Returns:
        """
        self.logger.info('Building XML etree')

        self._add_namespaces(node_list, uris)

        # add all nodes in the list to the XML etree
        for node in node_list:
            self.node_to_etree(node)

        # add aliases to the XML etree
        self._add_alias_els()

    def _add_namespaces(self, nodes, uris):
        """
        Compute the namespace index mapping and write the NamespaceUris element
        """
        idxs = self._get_ns_idxs_of_nodes(nodes)

        ns_array = self.server.get_namespace_array()

        # now add index of provided uris if necessary
        if uris:
            self._add_idxs_from_uris(idxs, uris, ns_array)

        # now create a dict of idx_in_address_space to idx_in_exported_file
        self._addr_idx_to_xml_idx = self._make_idx_dict(idxs, ns_array)
        ns_to_export = [ns_array[i] for i in sorted(self._addr_idx_to_xml_idx) if i != 0]

        # write namespaces to xml
        self._add_namespace_uri_els(ns_to_export)

    def _make_idx_dict(self, idxs, ns_array):
        """
        Map address space namespace indexes to indexes in the exported file

        Namespace 0 always maps to 0; the other indexes are numbered from 1 in
        ascending order. Indexes outside of ns_array are ignored.
        Args:
            idxs: iterable of namespace indexes (not modified)
            ns_array: namespace array of the address space

        Returns: dict of idx_in_address_space to idx_in_exported_file
        """
        addr_idx_to_xml_idx = {0: 0}
        # namespace 0 is implicit in the file: it must never consume a slot,
        # otherwise every following index would be shifted by one
        candidates = sorted(set(idxs) - {0})
        for xml_idx, addr_idx in enumerate(candidates, start=1):
            if addr_idx >= len(ns_array):
                break  # sorted, so all remaining indexes are out of range too
            addr_idx_to_xml_idx[addr_idx] = xml_idx
        return addr_idx_to_xml_idx

    def _get_ns_idxs_of_nodes(self, nodes):
        """
        get a sorted list of all non-zero namespace indexes used or referenced by nodes
        """
        idxs = set()
        for node in nodes:
            idxs.add(node.nodeid.NamespaceIndex)
            idxs.add(node.get_browse_name().NamespaceIndex)
            idxs.update(ref.NodeId.NamespaceIndex for ref in node.get_references())
        idxs.discard(0)
        return sorted(idxs)

    def _add_idxs_from_uris(self, idxs, uris, ns_array):
        """
        Append to idxs the index of every uri found in ns_array

        Unknown uris are ignored, as is the uri at index 0, which never needs
        to be exported.
        """
        for uri in uris:
            if uri in ns_array:
                i = ns_array.index(uri)
                if i != 0 and i not in idxs:
                    idxs.append(i)

    def write_xml(self, xmlpath, pretty=True):
        """
        Write the XML etree in the exporter object to a file
        Args:
            xmlpath: string representing the path/file name
            pretty: if True, indent the tree before writing it

        Returns:
        """
        self.logger.info('Exporting XML file to %s', xmlpath)
        if pretty:
            self.indent(self.etree.getroot())
        self.etree.write(xmlpath, encoding='utf-8', xml_declaration=True)

    def dump_etree(self):
        """
        Dump etree to console for debugging
        Returns:
        """
        self.logger.info('Dumping XML etree to console')
        Et.dump(self.etree)

    def node_to_etree(self, node):
        """
        Add the necessary XML sub elements to the etree for exporting the node
        Args:
            node: Node object which will be added to XML etree

        Returns:
        """
        node_class = node.get_node_class()

        if node_class is ua.NodeClass.Object:
            self.add_etree_object(node)
        elif node_class is ua.NodeClass.ObjectType:
            self.add_etree_object_type(node)
        elif node_class is ua.NodeClass.Variable:
            self.add_etree_variable(node)
        elif node_class is ua.NodeClass.VariableType:
            self.add_etree_variable_type(node)
        elif node_class is ua.NodeClass.ReferenceType:
            self.add_etree_reference_type(node)
        elif node_class is ua.NodeClass.DataType:
            self.add_etree_datatype(node)
        elif node_class is ua.NodeClass.Method:
            self.add_etree_method(node)
        else:
            self.logger.info("Exporting node class not implemented: %s ", node_class)

    def _add_sub_el(self, el, name, text):
        """
        Append a child element called name with the given text to el and return it
        """
        child_el = Et.SubElement(el, name)
        child_el.text = text
        return child_el

    @staticmethod
    def _to_text(value):
        """
        Return value decoded from utf-8 if it is bytes, otherwise unchanged

        Text attributes may be bytes, text or None depending on where they come
        from; only bytes can (and must) be decoded.
        """
        if isinstance(value, bytes):
            return value.decode('utf-8')
        return value

    def _node_to_string(self, nodeid):
        """
        Return the string form of a NodeId (or of the nodeid of a Node), with
        its namespace index translated to the index used in the exported file
        """
        if not isinstance(nodeid, ua.NodeId):
            nodeid = nodeid.nodeid

        if nodeid.NamespaceIndex in self._addr_idx_to_xml_idx:
            # work on a copy: the NodeId belongs to the address space
            nodeid = copy(nodeid)
            nodeid.NamespaceIndex = self._addr_idx_to_xml_idx[nodeid.NamespaceIndex]
        return nodeid.to_string()

    def _bname_to_string(self, bname):
        """
        Return the string form of a browse name, with its namespace index
        translated to the index used in the exported file
        """
        if bname.NamespaceIndex in self._addr_idx_to_xml_idx:
            bname = copy(bname)
            bname.NamespaceIndex = self._addr_idx_to_xml_idx[bname.NamespaceIndex]
        return bname.to_string()

    def _add_node_common(self, nodetype, node):
        """
        Create the element shared by all node classes and return it

        Sets NodeId and BrowseName, ParentNodeId for objects, variables and
        methods having a parent, and the DisplayName/Description sub elements.
        Args:
            nodetype: tag of the element to create, e.g. "UAObject"
            node: Node object to export
        """
        browsename = node.get_browse_name()
        nodeid = node.nodeid
        parent = node.get_parent()
        displayname = self._to_text(node.get_display_name().Text)
        desc = self._to_text(node.get_description().Text)
        node_el = Et.SubElement(self.etree.getroot(), nodetype)
        node_el.attrib["NodeId"] = self._node_to_string(nodeid)
        node_el.attrib["BrowseName"] = self._bname_to_string(browsename)
        if parent is not None:
            node_class = node.get_node_class()
            if node_class in (ua.NodeClass.Object, ua.NodeClass.Variable, ua.NodeClass.Method):
                node_el.attrib["ParentNodeId"] = self._node_to_string(parent)
        self._add_sub_el(node_el, 'DisplayName', displayname)
        if desc:
            self._add_sub_el(node_el, 'Description', desc)
        # FIXME: add WriteMask and UserWriteMask
        return node_el

    def add_etree_object(self, node):
        """
        Add a UA object element to the XML etree
        """
        obj_el = self._add_node_common("UAObject", node)
        var = node.get_attribute(ua.AttributeIds.EventNotifier)
        if var.Value.Value != 0:
            obj_el.attrib["EventNotifier"] = str(var.Value.Value)
        self._add_ref_els(obj_el, node)

    def add_etree_object_type(self, node):
        """
        Add a UA object type element to the XML etree
        """
        obj_el = self._add_node_common("UAObjectType", node)
        abstract = node.get_attribute(ua.AttributeIds.IsAbstract).Value.Value
        if abstract:
            obj_el.attrib["IsAbstract"] = 'true'
        self._add_ref_els(obj_el, node)

    def add_variable_common(self, node, el):
        """
        Add the attributes shared by variables and variable types to el:
        DataType, ValueRank, ArrayDimensions and the Value sub element
        """
        dtype = node.get_data_type()
        if dtype.NamespaceIndex == 0 and dtype.Identifier in o_ids.ObjectIdNames:
            dtype_name = o_ids.ObjectIdNames[dtype.Identifier]
            self.aliases[dtype] = dtype_name
        else:
            # node ids written to the file must use the file's namespace indexes
            dtype_name = self._node_to_string(dtype)
        rank = node.get_value_rank()
        if rank != -1:
            el.attrib["ValueRank"] = str(rank)
        dim = node.get_attribute(ua.AttributeIds.ArrayDimensions)
        if dim.Value.Value:
            el.attrib["ArrayDimensions"] = ",".join([str(i) for i in dim.Value.Value])
        el.attrib["DataType"] = dtype_name
        self.value_to_etree(el, dtype_name, dtype, node)

    def add_etree_variable(self, node):
        """
        Add a UA variable element to the XML etree
        """
        var_el = self._add_node_common("UAVariable", node)
        self.add_variable_common(node, var_el)

        accesslevel = node.get_attribute(ua.AttributeIds.AccessLevel).Value.Value
        useraccesslevel = node.get_attribute(ua.AttributeIds.UserAccessLevel).Value.Value

        # We only write these values if they are different from defaults
        # Not sure where default is defined....
        if accesslevel not in (0, ua.AccessLevel.CurrentRead.mask):
            var_el.attrib["AccessLevel"] = str(accesslevel)
        if useraccesslevel not in (0, ua.AccessLevel.CurrentRead.mask):
            var_el.attrib["UserAccessLevel"] = str(useraccesslevel)

        var = node.get_attribute(ua.AttributeIds.MinimumSamplingInterval)
        if var.Value.Value:
            var_el.attrib["MinimumSamplingInterval"] = str(var.Value.Value)
        var = node.get_attribute(ua.AttributeIds.Historizing)
        if var.Value.Value:
            var_el.attrib["Historizing"] = 'true'
        self._add_ref_els(var_el, node)

    def add_etree_variable_type(self, node):
        """
        Add a UA variable type element to the XML etree
        """
        var_el = self._add_node_common("UAVariableType", node)
        self.add_variable_common(node, var_el)

        abstract = node.get_attribute(ua.AttributeIds.IsAbstract)
        if abstract.Value.Value:
            var_el.attrib["IsAbstract"] = "true"

        self._add_ref_els(var_el, node)

    def add_etree_method(self, node):
        """
        Add a UA method element to the XML etree
        """
        obj_el = self._add_node_common("UAMethod", node)

        # only explicit False is written; None (attribute not set) is skipped
        var = node.get_attribute(ua.AttributeIds.Executable)
        if var.Value.Value is False:
            obj_el.attrib["Executable"] = "false"
        var = node.get_attribute(ua.AttributeIds.UserExecutable)
        if var.Value.Value is False:
            obj_el.attrib["UserExecutable"] = "false"
        self._add_ref_els(obj_el, node)

    def add_etree_reference_type(self, obj):
        """
        Add a UA reference type element to the XML etree
        """
        obj_el = self._add_node_common("UAReferenceType", obj)
        var = obj.get_attribute(ua.AttributeIds.InverseName)
        if var is not None and var.Value.Value is not None:
            self._add_sub_el(obj_el, 'InverseName', self._to_text(var.Value.Value.Text))
        self._add_ref_els(obj_el, obj)

    def add_etree_datatype(self, obj):
        """
        Add a UA data type element to the XML etree
        """
        obj_el = self._add_node_common("UADataType", obj)
        self._add_ref_els(obj_el, obj)

    def _add_namespace_uri_els(self, uris):
        """
        Insert a NamespaceUris element listing uris as first child of the root
        """
        nuris_el = Et.Element('NamespaceUris')

        for uri in uris:
            self._add_sub_el(nuris_el, 'Uri', uri)

        self.etree.getroot().insert(0, nuris_el)

    def _add_alias_els(self):
        """
        Insert an Aliases element built from self.aliases as first child of the root
        """
        aliases_el = Et.Element('Aliases')

        # sorted by NodeId to get a reproducible output
        for nodeid in sorted(self.aliases):
            ref_el = Et.SubElement(aliases_el, 'Alias', Alias=self.aliases[nodeid])
            ref_el.text = nodeid.to_string()

        self.etree.getroot().insert(0, aliases_el)

    def _add_ref_els(self, parent_el, obj):
        """
        Add a References element listing all references of obj to parent_el

        Reference types of namespace 0 with a known name are written by name
        and registered as alias; any other reference type is written as a node
        id string using the namespace indexes of the exported file.
        """
        refs = obj.get_references()
        refs_el = Et.SubElement(parent_el, 'References')

        for ref in refs:
            ref_type = ref.ReferenceTypeId
            # the name table only describes namespace 0: a custom reference type
            # may reuse the numeric identifier of a standard one
            if ref_type.NamespaceIndex == 0 and ref_type.Identifier in o_ids.ObjectIdNames:
                ref_name = o_ids.ObjectIdNames[ref_type.Identifier]
                self.aliases[ref_type] = ref_name
            else:
                ref_name = self._node_to_string(ref_type)
            ref_el = Et.SubElement(refs_el, 'Reference')
            ref_el.attrib['ReferenceType'] = ref_name
            if not ref.IsForward:
                ref_el.attrib['IsForward'] = 'false'
            ref_el.text = self._node_to_string(ref.NodeId)

    def member_to_etree(self, el, name, dtype, val):
        """
        Add the member called name of a structure as sub element of el
        Args:
            el: element of the structure
            name: name of the member
            dtype: NodeId of the data type of the member
            val: value of the member, either a single value or a list/tuple
        """
        member_el = Et.SubElement(el, "uax:" + name)
        if isinstance(val, (list, tuple)):
            for v in val:
                self._value_to_etree(member_el, ua.ObjectIdNames[dtype.Identifier], dtype, v)
        else:
            self._val_to_etree(member_el, dtype, val)

    def _val_to_etree(self, el, dtype, val):
        """
        Write a single value into el; None is written as empty text

        NodeId and Guid values get their dedicated sub element, values having
        a ua_types attribute are written member by member, anything else is
        written as text.
        """
        if dtype == ua.NodeId(ua.ObjectIds.NodeId):
            id_el = Et.SubElement(el, "uax:Identifier")
            # None has no to_string(): write an empty identifier instead of failing
            id_el.text = "" if val is None else val.to_string()
            return
        if val is None:
            val = ""
        if dtype == ua.NodeId(ua.ObjectIds.Guid):
            id_el = Et.SubElement(el, "uax:String")
            id_el.text = str(val)
        elif not hasattr(val, "ua_types"):
            if isinstance(val, bytes):
                el.text = val.decode("utf-8")
            else:
                el.text = str(val)
        else:
            for name, vtype in val.ua_types.items():
                self.member_to_etree(el, name, ua.NodeId(getattr(ua.ObjectIds, vtype)), getattr(val, name))

    def value_to_etree(self, el, dtype_name, dtype, node):
        """
        Add a Value sub element to el if the value of node is not None
        """
        var = node.get_data_value().Value
        if var.Value is not None:
            val_el = Et.SubElement(el, 'Value')
            self._value_to_etree(val_el, dtype_name, dtype, var.Value)

    def _value_to_etree(self, el, type_name, dtype, val):
        """
        Write val (a single value or a possibly nested list) as child of el
        Args:
            el: parent element
            type_name: name of the data type, used to build the element tags
            dtype: NodeId of the data type
            val: value to write; nothing is written for None
        """
        if val is None:
            return

        if isinstance(val, (list, tuple)):
            # identifiers up to 21 in namespace 0 are treated as builtin types
            if dtype.NamespaceIndex == 0 and dtype.Identifier <= 21:
                elname = "uax:ListOf" + type_name
            else:  # this is an extension object
                elname = "uax:ListOfExtensionObject"

            list_el = Et.SubElement(el, elname)
            for nval in val:
                self._value_to_etree(list_el, type_name, dtype, nval)
        else:
            dtype_base = get_base_data_type(self.server.get_node(dtype))
            dtype_base = dtype_base.nodeid

            # enumerations are written as Int32
            if dtype_base == ua.NodeId(ua.ObjectIds.Enumeration):
                dtype_base = ua.NodeId(ua.ObjectIds.Int32)
                type_name = ua.ObjectIdNames[dtype_base.Identifier]

            if dtype_base.NamespaceIndex == 0 and dtype_base.Identifier <= 21:
                type_name = ua.ObjectIdNames[dtype_base.Identifier]
                val_el = Et.SubElement(el, "uax:" + type_name)
                self._val_to_etree(val_el, dtype, val)
            else:
                self._extobj_to_etree(el, type_name, dtype, val)

    def _extobj_to_etree(self, val_el, name, dtype, val):
        """
        Write val as an ExtensionObject element (TypeId and Body) under val_el
        Args:
            val_el: parent element
            name: name of the structure, used as tag inside Body
            dtype: NodeId written as TypeId
            val: structure value; its members are listed by val.ua_types
        """
        obj_el = Et.SubElement(val_el, "uax:ExtensionObject")
        type_el = Et.SubElement(obj_el, "uax:TypeId")
        id_el = Et.SubElement(type_el, "uax:Identifier")
        id_el.text = dtype.to_string()
        body_el = Et.SubElement(obj_el, "uax:Body")
        struct_el = Et.SubElement(body_el, "uax:" + name)
        for member_name, vtype in val.ua_types.items():
            member_dtype = ua.NodeId(getattr(ua.ObjectIds, vtype))
            self.member_to_etree(struct_el, member_name, member_dtype, getattr(val, member_name))

    def indent(self, elem, level=0):
        """
        copy and paste from http://effbot.org/zone/element-lib.htm#prettyprint
        it basically walks your tree and adds spaces and newlines so the tree is
        printed in a nice way
        Args:
            elem: element to indent in place, together with all its descendants
            level: nesting depth of elem
        """
        i = "\n" + level * " "
        if len(elem):
            if not elem.text or not elem.text.strip():
                elem.text = i + " "
            if not elem.tail or not elem.tail.strip():
                elem.tail = i
            child = None
            for child in elem:
                self.indent(child, level + 1)
            # the last child is followed by the closing tag of elem, so its
            # tail goes back to the indentation of elem
            if not child.tail or not child.tail.strip():
                child.tail = i
        else:
            if level and (not elem.tail or not elem.tail.strip()):
                elem.tail = i
def value_to_etree(self, el, dtype_name, dtype, node)
Definition: xmlexporter.py:364
def indent(self, elem, level=0)
Definition: xmlexporter.py:411
def write_xml(self, xmlpath, pretty=True)
Definition: xmlexporter.py:100
def add_variable_common(self, node, el)
Definition: xmlexporter.py:218
def _add_sub_el(self, el, name, text)
Definition: xmlexporter.py:159
def _add_namespaces(self, nodes, uris)
Definition: xmlexporter.py:52
def _add_idxs_from_uris(self, idxs, uris, ns_array)
Definition: xmlexporter.py:92
def _add_ref_els(self, parent_el, obj)
Definition: xmlexporter.py:318
def build_etree(self, node_list, uris=None)
Definition: xmlexporter.py:31
def _add_node_common(self, nodetype, node)
Definition: xmlexporter.py:179
def _make_idx_dict(self, idxs, ns_array)
Definition: xmlexporter.py:68
def _extobj_to_etree(self, val_el, name, dtype, val)
Definition: xmlexporter.py:400
def get_base_data_type(datatype)
Definition: ua_utils.py:217
def _val_to_etree(self, el, dtype, val)
Definition: xmlexporter.py:345
def _value_to_etree(self, el, type_name, dtype, val)
Definition: xmlexporter.py:371
def member_to_etree(self, el, name, dtype, val)
Definition: xmlexporter.py:336
def _get_ns_idxs_of_nodes(self, nodes)
Definition: xmlexporter.py:77


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Tue Jan 19 2021 03:12:44