xmlexporter.py
Go to the documentation of this file.
00001 """
00002 from a list of nodes in the address space, build an XML file
00003 format is the one from opc-ua specification
00004 """
00005 import logging
00006 from collections import OrderedDict
00007 import xml.etree.ElementTree as Et
00008 from copy import copy
00009 
00010 from opcua import ua
00011 from opcua.ua import object_ids as o_ids
00012 from opcua.common.ua_utils import get_base_data_type
00013 
00014 
00015 class XmlExporter(object):
00016 
00017     def __init__(self, server):
00018         self.logger = logging.getLogger(__name__)
00019         self.server = server
00020         self.aliases = {}
00021         self._addr_idx_to_xml_idx = {}
00022 
00023         node_set_attributes = OrderedDict()
00024         node_set_attributes['xmlns:xsi'] = 'http://www.w3.org/2001/XMLSchema-instance'
00025         node_set_attributes['xmlns:uax'] = 'http://opcfoundation.org/UA/2008/02/Types.xsd'
00026         node_set_attributes['xmlns:xsd'] = 'http://www.w3.org/2001/XMLSchema'
00027         node_set_attributes['xmlns'] = 'http://opcfoundation.org/UA/2011/03/UANodeSet.xsd'
00028 
00029         self.etree = Et.ElementTree(Et.Element('UANodeSet', node_set_attributes))
00030 
00031     def build_etree(self, node_list, uris=None):
00032         """
00033         Create an XML etree object from a list of nodes; custom namespace uris are optional
00034         Namespaces used by nodes are always exported for consistency.
00035         Args:
00036             node_list: list of Node objects for export
00037             uris: list of namespace uri strings
00038 
00039         Returns:
00040         """
00041         self.logger.info('Building XML etree')
00042 
00043         self._add_namespaces(node_list, uris)
00044 
00045         # add all nodes in the list to the XML etree
00046         for node in node_list:
00047             self.node_to_etree(node)
00048 
00049         # add aliases to the XML etree
00050         self._add_alias_els()
00051 
00052     def _add_namespaces(self, nodes, uris):
00053         idxs = self._get_ns_idxs_of_nodes(nodes)
00054 
00055         ns_array = self.server.get_namespace_array()
00056 
00057         # now add index of provided uris if necessary
00058         if uris:
00059             self._add_idxs_from_uris(idxs, uris, ns_array)
00060 
00061         # now create a dict of idx_in_address_space to idx_in_exported_file
00062         self._addr_idx_to_xml_idx = self._make_idx_dict(idxs, ns_array)
00063         ns_to_export = [ns_array[i] for i in sorted(list(self._addr_idx_to_xml_idx.keys())) if i != 0]
00064 
00065         # write namespaces to xml
00066         self._add_namespace_uri_els(ns_to_export)
00067 
00068     def _make_idx_dict(self, idxs, ns_array):
00069         idxs.sort()
00070         addr_idx_to_xml_idx = {0: 0}
00071         for xml_idx, addr_idx in enumerate(idxs):
00072             if addr_idx >= len(ns_array):
00073                 break
00074             addr_idx_to_xml_idx[addr_idx] = xml_idx + 1
00075         return addr_idx_to_xml_idx
00076 
00077     def _get_ns_idxs_of_nodes(self, nodes):
00078         """
00079         get a list of all indexes used or references by nodes
00080         """
00081         idxs = []
00082         for node in nodes:
00083             node_idxs = [node.nodeid.NamespaceIndex]
00084             node_idxs.append(node.get_browse_name().NamespaceIndex)
00085             node_idxs.extend(ref.NodeId.NamespaceIndex for ref in node.get_references())
00086             node_idxs = list(set(node_idxs))  # remove duplicates
00087             for i in node_idxs:
00088                 if i != 0 and i not in idxs:
00089                     idxs.append(i)
00090         return idxs
00091 
00092     def _add_idxs_from_uris(self, idxs, uris, ns_array):
00093         for uri in uris:
00094             if uri in ns_array:
00095                 i = ns_array.index(uri)
00096                 if i not in idxs:
00097                     idxs.append(i)
00098 
00099 
00100     def write_xml(self, xmlpath, pretty=True):
00101         """
00102         Write the XML etree in the exporter object to a file
00103         Args:
00104             xmlpath: string representing the path/file name
00105 
00106         Returns:
00107         """
00108         # try to write the XML etree to a file
00109         self.logger.info('Exporting XML file to %s', xmlpath)
00110         # from IPython import embed
00111         # embed()
00112         if pretty:
00113             self.indent(self.etree.getroot())
00114             self.etree.write(xmlpath,
00115                              encoding='utf-8',
00116                              xml_declaration=True
00117                             )
00118         else:
00119             self.etree.write(xmlpath,
00120                              encoding='utf-8',
00121                              xml_declaration=True
00122                             )
00123 
00124     def dump_etree(self):
00125         """
00126         Dump etree to console for debugging
00127         Returns:
00128         """
00129         self.logger.info('Dumping XML etree to console')
00130         Et.dump(self.etree)
00131 
00132     def node_to_etree(self, node):
00133         """
00134         Add the necessary XML sub elements to the etree for exporting the node
00135         Args:
00136             node: Node object which will be added to XML etree
00137 
00138         Returns:
00139         """
00140         node_class = node.get_node_class()
00141 
00142         if node_class is ua.NodeClass.Object:
00143             self.add_etree_object(node)
00144         elif node_class is ua.NodeClass.ObjectType:
00145             self.add_etree_object_type(node)
00146         elif node_class is ua.NodeClass.Variable:
00147             self.add_etree_variable(node)
00148         elif node_class is ua.NodeClass.VariableType:
00149             self.add_etree_variable_type(node)
00150         elif node_class is ua.NodeClass.ReferenceType:
00151             self.add_etree_reference_type(node)
00152         elif node_class is ua.NodeClass.DataType:
00153             self.add_etree_datatype(node)
00154         elif node_class is ua.NodeClass.Method:
00155             self.add_etree_method(node)
00156         else:
00157             self.logger.info("Exporting node class not implemented: %s ", node_class)
00158 
00159     def _add_sub_el(self, el, name, text):
00160         child_el = Et.SubElement(el, name)
00161         child_el.text = text
00162         return child_el
00163 
00164     def _node_to_string(self, nodeid):
00165         if not isinstance(nodeid, ua.NodeId):
00166             nodeid = nodeid.nodeid
00167 
00168         if nodeid.NamespaceIndex in self._addr_idx_to_xml_idx:
00169             nodeid = copy(nodeid)
00170             nodeid.NamespaceIndex = self._addr_idx_to_xml_idx[nodeid.NamespaceIndex]
00171         return nodeid.to_string()
00172 
00173     def _bname_to_string(self, bname):
00174         if bname.NamespaceIndex in self._addr_idx_to_xml_idx:
00175             bname = copy(bname)
00176             bname.NamespaceIndex = self._addr_idx_to_xml_idx[bname.NamespaceIndex]
00177         return bname.to_string()
00178 
00179     def _add_node_common(self, nodetype, node):
00180         browsename = node.get_browse_name()
00181         nodeid = node.nodeid
00182         parent = node.get_parent()
00183         displayname = node.get_display_name().Text.decode('utf-8')
00184         desc = node.get_description().Text
00185         node_el = Et.SubElement(self.etree.getroot(), nodetype)
00186         node_el.attrib["NodeId"] = self._node_to_string(nodeid)
00187         node_el.attrib["BrowseName"] = self._bname_to_string(browsename)
00188         if parent is not None:
00189             node_class = node.get_node_class()
00190             if node_class in (ua.NodeClass.Object, ua.NodeClass.Variable, ua.NodeClass.Method):
00191                 node_el.attrib["ParentNodeId"] = self._node_to_string(parent)
00192         self._add_sub_el(node_el, 'DisplayName', displayname)
00193         if desc not in (None, ""):
00194             self._add_sub_el(node_el, 'Description', desc.decode('utf-8'))
00195         # FIXME: add WriteMask and UserWriteMask
00196         return node_el
00197 
00198     def add_etree_object(self, node):
00199         """
00200         Add a UA object element to the XML etree
00201         """
00202         obj_el = self._add_node_common("UAObject", node)
00203         var = node.get_attribute(ua.AttributeIds.EventNotifier)
00204         if var.Value.Value != 0:
00205             obj_el.attrib["EventNotifier"] = str(var.Value.Value)
00206         self._add_ref_els(obj_el, node)
00207 
00208     def add_etree_object_type(self, node):
00209         """
00210         Add a UA object type element to the XML etree
00211         """
00212         obj_el = self._add_node_common("UAObjectType", node)
00213         abstract = node.get_attribute(ua.AttributeIds.IsAbstract).Value.Value
00214         if abstract:
00215             obj_el.attrib["IsAbstract"] = 'true'
00216         self._add_ref_els(obj_el, node)
00217 
00218     def add_variable_common(self, node, el):
00219         dtype = node.get_data_type()
00220         if dtype.NamespaceIndex == 0 and dtype.Identifier in o_ids.ObjectIdNames:
00221             dtype_name = o_ids.ObjectIdNames[dtype.Identifier]
00222             self.aliases[dtype] = dtype_name
00223         else:
00224             dtype_name = dtype.to_string()
00225         rank = node.get_value_rank()
00226         if rank != -1:
00227             el.attrib["ValueRank"] = str(rank)
00228         dim = node.get_attribute(ua.AttributeIds.ArrayDimensions)
00229         if dim.Value.Value:
00230             el.attrib["ArrayDimensions"] = ",".join([str(i) for i in dim.Value.Value])
00231         el.attrib["DataType"] = dtype_name
00232         self.value_to_etree(el, dtype_name, dtype, node)
00233 
00234     def add_etree_variable(self, node):
00235         """
00236         Add a UA variable element to the XML etree
00237         """
00238         var_el = self._add_node_common("UAVariable", node)
00239         self.add_variable_common(node, var_el)
00240 
00241         accesslevel = node.get_attribute(ua.AttributeIds.AccessLevel).Value.Value
00242         useraccesslevel = node.get_attribute(ua.AttributeIds.UserAccessLevel).Value.Value
00243 
00244         # We only write these values if they are different from defaults
00245         # Not sure where default is defined....
00246         if accesslevel not in (0, ua.AccessLevel.CurrentRead.mask):
00247             var_el.attrib["AccessLevel"] = str(accesslevel)
00248         if useraccesslevel not in (0, ua.AccessLevel.CurrentRead.mask):
00249             var_el.attrib["UserAccessLevel"] = str(useraccesslevel)
00250 
00251         var = node.get_attribute(ua.AttributeIds.MinimumSamplingInterval)
00252         if var.Value.Value:
00253             var_el.attrib["MinimumSamplingInterval"] = str(var.Value.Value)
00254         var = node.get_attribute(ua.AttributeIds.Historizing)
00255         if var.Value.Value:
00256             var_el.attrib["Historizing"] = 'true'
00257         self._add_ref_els(var_el, node)
00258 
00259     def add_etree_variable_type(self, node):
00260         """
00261         Add a UA variable type element to the XML etree
00262         """
00263 
00264         var_el = self._add_node_common("UAVariableType", node)
00265         self.add_variable_common(node, var_el)
00266 
00267         abstract = node.get_attribute(ua.AttributeIds.IsAbstract)
00268         if abstract.Value.Value:
00269             var_el.attrib["IsAbstract"] = "true"
00270 
00271         self._add_ref_els(var_el, node)
00272 
00273     def add_etree_method(self, node):
00274         obj_el = self._add_node_common("UAMethod", node)
00275 
00276         var = node.get_attribute(ua.AttributeIds.Executable)
00277         if var.Value.Value is False:
00278             obj_el.attrib["Executable"] = "false"
00279         var = node.get_attribute(ua.AttributeIds.UserExecutable)
00280         if var.Value.Value is False:
00281             obj_el.attrib["UserExecutable"] = "false"
00282         self._add_ref_els(obj_el, node)
00283 
00284     def add_etree_reference_type(self, obj):
00285         obj_el = self._add_node_common("UAReferenceType", obj)
00286         var = obj.get_attribute(ua.AttributeIds.InverseName)
00287         if var is not None and var.Value.Value is not None:
00288             self._add_sub_el(obj_el, 'InverseName', var.Value.Value.Text.decode('utf-8'))
00289         self._add_ref_els(obj_el, obj)
00290 
00291     def add_etree_datatype(self, obj):
00292         """
00293         Add a UA data type element to the XML etree
00294         """
00295         obj_el = self._add_node_common("UADataType", obj)
00296         self._add_ref_els(obj_el, obj)
00297 
00298     def _add_namespace_uri_els(self, uris):
00299         nuris_el = Et.Element('NamespaceUris')
00300 
00301         for uri in uris:
00302             self._add_sub_el(nuris_el, 'Uri', uri)
00303 
00304         self.etree.getroot().insert(0, nuris_el)
00305 
00306     def _add_alias_els(self):
00307         aliases_el = Et.Element('Aliases')
00308 
00309         ordered_keys = list(self.aliases.keys())
00310         ordered_keys.sort()
00311         for nodeid in ordered_keys:
00312             name = self.aliases[nodeid]
00313             ref_el = Et.SubElement(aliases_el, 'Alias', Alias=name)
00314             ref_el.text = nodeid.to_string()
00315 
00316         self.etree.getroot().insert(0, aliases_el)
00317 
00318     def _add_ref_els(self, parent_el, obj):
00319         refs = obj.get_references()
00320         refs_el = Et.SubElement(parent_el, 'References')
00321 
00322         for ref in refs:
00323             if ref.ReferenceTypeId.Identifier in o_ids.ObjectIdNames:
00324                 ref_name = o_ids.ObjectIdNames[ref.ReferenceTypeId.Identifier]
00325             else:
00326                 ref_name = ref.ReferenceTypeId.to_string()
00327             ref_el = Et.SubElement(refs_el, 'Reference')
00328             ref_el.attrib['ReferenceType'] = ref_name
00329             if not ref.IsForward:
00330                 ref_el.attrib['IsForward'] = 'false'
00331             ref_el.text = self._node_to_string(ref.NodeId)
00332 
00333             self.aliases[ref.ReferenceTypeId] = ref_name
00334 
00335 
00336     def member_to_etree(self, el, name, dtype, val):
00337         member_el = Et.SubElement(el, "uax:" + name)
00338         if isinstance(val, (list, tuple)):
00339             for v in val:
00340                 self._value_to_etree(member_el, ua.ObjectIdNames[dtype.Identifier], dtype, v)
00341         else:
00342             self._val_to_etree(member_el, dtype, val)
00343 
00344 
00345     def _val_to_etree(self, el, dtype, val):
00346         if val is None:
00347             val = ""
00348         if dtype == ua.NodeId(ua.ObjectIds.NodeId):
00349             id_el = Et.SubElement(el, "uax:Identifier")
00350             id_el.text = val.to_string()
00351         elif dtype == ua.NodeId(ua.ObjectIds.Guid):
00352             id_el = Et.SubElement(el, "uax:String")
00353             id_el.text = str(val)
00354         elif not hasattr(val, "ua_types"):
00355             if isinstance(val, bytes):
00356                 el.text = val.decode("utf-8")
00357             else:
00358                 el.text = str(val)
00359         else:
00360             for name, vtype in val.ua_types.items():
00361                 self.member_to_etree(el, name, ua.NodeId(getattr(ua.ObjectIds, vtype)), getattr(val, name))
00362 
00363 
00364     def value_to_etree(self, el, dtype_name, dtype, node):
00365         var = node.get_data_value().Value
00366         if var.Value is not None:
00367             val_el = Et.SubElement(el, 'Value')
00368             self._value_to_etree(val_el, dtype_name, dtype, var.Value)
00369 
00370 
00371     def _value_to_etree(self, el, type_name, dtype, val):
00372         if val is None:
00373             return
00374 
00375         if isinstance(val, (list, tuple)):
00376             if dtype.NamespaceIndex == 0 and dtype.Identifier <= 21:
00377                 elname = "uax:ListOf" + type_name
00378             else:  # this is an extentionObject:
00379                 elname = "uax:ListOfExtensionObject"
00380 
00381             list_el = Et.SubElement(el, elname)
00382             for nval in val:
00383                 self._value_to_etree(list_el, type_name, dtype, nval)
00384         else:
00385             dtype_base = get_base_data_type(self.server.get_node(dtype))
00386             dtype_base = dtype_base.nodeid
00387 
00388             if dtype_base == ua.NodeId(ua.ObjectIds.Enumeration):
00389                 dtype_base = ua.NodeId(ua.ObjectIds.Int32)
00390                 type_name = ua.ObjectIdNames[dtype_base.Identifier]
00391 
00392             if dtype_base.NamespaceIndex == 0 and dtype_base.Identifier <= 21:
00393                 type_name = ua.ObjectIdNames[dtype_base.Identifier]
00394                 val_el = Et.SubElement(el, "uax:" + type_name)
00395                 self._val_to_etree(val_el, dtype, val)
00396             else:
00397                 self._extobj_to_etree(el, type_name, dtype, val)
00398 
00399 
00400     def _extobj_to_etree(self, val_el, name, dtype, val):
00401         obj_el = Et.SubElement(val_el, "uax:ExtensionObject")
00402         type_el = Et.SubElement(obj_el, "uax:TypeId")
00403         id_el = Et.SubElement(type_el, "uax:Identifier")
00404         id_el.text = dtype.to_string()
00405         body_el = Et.SubElement(obj_el, "uax:Body")
00406         struct_el = Et.SubElement(body_el, "uax:" + name)
00407         for name, vtype in val.ua_types.items():
00408             self.member_to_etree(struct_el, name, ua.NodeId(getattr(ua.ObjectIds, vtype)), getattr(val, name))
00409 
00410 
00411     def indent(self, elem, level=0):
00412         """
00413         copy and paste from http://effbot.org/zone/element-lib.htm#prettyprint
00414         it basically walks your tree and adds spaces and newlines so the tree is
00415         printed in a nice way
00416         """
00417         i = "\n" + level * "  "
00418         if len(elem):
00419             if not elem.text or not elem.text.strip():
00420                 elem.text = i + "  "
00421             if not elem.tail or not elem.tail.strip():
00422                 elem.tail = i
00423             for elem in elem:
00424                 self.indent(elem, level + 1)
00425             if not elem.tail or not elem.tail.strip():
00426                 elem.tail = i
00427         else:
00428             if level and (not elem.tail or not elem.tail.strip()):
00429                 elem.tail = i


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Sat Jun 8 2019 18:26:24