00001 """
00002 from a list of nodes in the address space, build an XML file
00003 format is the one from opc-ua specification
00004 """
00005 import logging
00006 from collections import OrderedDict
00007 import xml.etree.ElementTree as Et
00008 from copy import copy
00009
00010 from opcua import ua
00011 from opcua.ua import object_ids as o_ids
00012 from opcua.common.ua_utils import get_base_data_type
00013
00014
00015 class XmlExporter(object):
00016
00017 def __init__(self, server):
00018 self.logger = logging.getLogger(__name__)
00019 self.server = server
00020 self.aliases = {}
00021 self._addr_idx_to_xml_idx = {}
00022
00023 node_set_attributes = OrderedDict()
00024 node_set_attributes['xmlns:xsi'] = 'http://www.w3.org/2001/XMLSchema-instance'
00025 node_set_attributes['xmlns:uax'] = 'http://opcfoundation.org/UA/2008/02/Types.xsd'
00026 node_set_attributes['xmlns:xsd'] = 'http://www.w3.org/2001/XMLSchema'
00027 node_set_attributes['xmlns'] = 'http://opcfoundation.org/UA/2011/03/UANodeSet.xsd'
00028
00029 self.etree = Et.ElementTree(Et.Element('UANodeSet', node_set_attributes))
00030
00031 def build_etree(self, node_list, uris=None):
00032 """
00033 Create an XML etree object from a list of nodes; custom namespace uris are optional
00034 Namespaces used by nodes are always exported for consistency.
00035 Args:
00036 node_list: list of Node objects for export
00037 uris: list of namespace uri strings
00038
00039 Returns:
00040 """
00041 self.logger.info('Building XML etree')
00042
00043 self._add_namespaces(node_list, uris)
00044
00045
00046 for node in node_list:
00047 self.node_to_etree(node)
00048
00049
00050 self._add_alias_els()
00051
00052 def _add_namespaces(self, nodes, uris):
00053 idxs = self._get_ns_idxs_of_nodes(nodes)
00054
00055 ns_array = self.server.get_namespace_array()
00056
00057
00058 if uris:
00059 self._add_idxs_from_uris(idxs, uris, ns_array)
00060
00061
00062 self._addr_idx_to_xml_idx = self._make_idx_dict(idxs, ns_array)
00063 ns_to_export = [ns_array[i] for i in sorted(list(self._addr_idx_to_xml_idx.keys())) if i != 0]
00064
00065
00066 self._add_namespace_uri_els(ns_to_export)
00067
00068 def _make_idx_dict(self, idxs, ns_array):
00069 idxs.sort()
00070 addr_idx_to_xml_idx = {0: 0}
00071 for xml_idx, addr_idx in enumerate(idxs):
00072 if addr_idx >= len(ns_array):
00073 break
00074 addr_idx_to_xml_idx[addr_idx] = xml_idx + 1
00075 return addr_idx_to_xml_idx
00076
00077 def _get_ns_idxs_of_nodes(self, nodes):
00078 """
00079 get a list of all indexes used or references by nodes
00080 """
00081 idxs = []
00082 for node in nodes:
00083 node_idxs = [node.nodeid.NamespaceIndex]
00084 node_idxs.append(node.get_browse_name().NamespaceIndex)
00085 node_idxs.extend(ref.NodeId.NamespaceIndex for ref in node.get_references())
00086 node_idxs = list(set(node_idxs))
00087 for i in node_idxs:
00088 if i != 0 and i not in idxs:
00089 idxs.append(i)
00090 return idxs
00091
00092 def _add_idxs_from_uris(self, idxs, uris, ns_array):
00093 for uri in uris:
00094 if uri in ns_array:
00095 i = ns_array.index(uri)
00096 if i not in idxs:
00097 idxs.append(i)
00098
00099
00100 def write_xml(self, xmlpath, pretty=True):
00101 """
00102 Write the XML etree in the exporter object to a file
00103 Args:
00104 xmlpath: string representing the path/file name
00105
00106 Returns:
00107 """
00108
00109 self.logger.info('Exporting XML file to %s', xmlpath)
00110
00111
00112 if pretty:
00113 self.indent(self.etree.getroot())
00114 self.etree.write(xmlpath,
00115 encoding='utf-8',
00116 xml_declaration=True
00117 )
00118 else:
00119 self.etree.write(xmlpath,
00120 encoding='utf-8',
00121 xml_declaration=True
00122 )
00123
00124 def dump_etree(self):
00125 """
00126 Dump etree to console for debugging
00127 Returns:
00128 """
00129 self.logger.info('Dumping XML etree to console')
00130 Et.dump(self.etree)
00131
00132 def node_to_etree(self, node):
00133 """
00134 Add the necessary XML sub elements to the etree for exporting the node
00135 Args:
00136 node: Node object which will be added to XML etree
00137
00138 Returns:
00139 """
00140 node_class = node.get_node_class()
00141
00142 if node_class is ua.NodeClass.Object:
00143 self.add_etree_object(node)
00144 elif node_class is ua.NodeClass.ObjectType:
00145 self.add_etree_object_type(node)
00146 elif node_class is ua.NodeClass.Variable:
00147 self.add_etree_variable(node)
00148 elif node_class is ua.NodeClass.VariableType:
00149 self.add_etree_variable_type(node)
00150 elif node_class is ua.NodeClass.ReferenceType:
00151 self.add_etree_reference_type(node)
00152 elif node_class is ua.NodeClass.DataType:
00153 self.add_etree_datatype(node)
00154 elif node_class is ua.NodeClass.Method:
00155 self.add_etree_method(node)
00156 else:
00157 self.logger.info("Exporting node class not implemented: %s ", node_class)
00158
00159 def _add_sub_el(self, el, name, text):
00160 child_el = Et.SubElement(el, name)
00161 child_el.text = text
00162 return child_el
00163
00164 def _node_to_string(self, nodeid):
00165 if not isinstance(nodeid, ua.NodeId):
00166 nodeid = nodeid.nodeid
00167
00168 if nodeid.NamespaceIndex in self._addr_idx_to_xml_idx:
00169 nodeid = copy(nodeid)
00170 nodeid.NamespaceIndex = self._addr_idx_to_xml_idx[nodeid.NamespaceIndex]
00171 return nodeid.to_string()
00172
00173 def _bname_to_string(self, bname):
00174 if bname.NamespaceIndex in self._addr_idx_to_xml_idx:
00175 bname = copy(bname)
00176 bname.NamespaceIndex = self._addr_idx_to_xml_idx[bname.NamespaceIndex]
00177 return bname.to_string()
00178
00179 def _add_node_common(self, nodetype, node):
00180 browsename = node.get_browse_name()
00181 nodeid = node.nodeid
00182 parent = node.get_parent()
00183 displayname = node.get_display_name().Text.decode('utf-8')
00184 desc = node.get_description().Text
00185 node_el = Et.SubElement(self.etree.getroot(), nodetype)
00186 node_el.attrib["NodeId"] = self._node_to_string(nodeid)
00187 node_el.attrib["BrowseName"] = self._bname_to_string(browsename)
00188 if parent is not None:
00189 node_class = node.get_node_class()
00190 if node_class in (ua.NodeClass.Object, ua.NodeClass.Variable, ua.NodeClass.Method):
00191 node_el.attrib["ParentNodeId"] = self._node_to_string(parent)
00192 self._add_sub_el(node_el, 'DisplayName', displayname)
00193 if desc not in (None, ""):
00194 self._add_sub_el(node_el, 'Description', desc.decode('utf-8'))
00195
00196 return node_el
00197
00198 def add_etree_object(self, node):
00199 """
00200 Add a UA object element to the XML etree
00201 """
00202 obj_el = self._add_node_common("UAObject", node)
00203 var = node.get_attribute(ua.AttributeIds.EventNotifier)
00204 if var.Value.Value != 0:
00205 obj_el.attrib["EventNotifier"] = str(var.Value.Value)
00206 self._add_ref_els(obj_el, node)
00207
00208 def add_etree_object_type(self, node):
00209 """
00210 Add a UA object type element to the XML etree
00211 """
00212 obj_el = self._add_node_common("UAObjectType", node)
00213 abstract = node.get_attribute(ua.AttributeIds.IsAbstract).Value.Value
00214 if abstract:
00215 obj_el.attrib["IsAbstract"] = 'true'
00216 self._add_ref_els(obj_el, node)
00217
00218 def add_variable_common(self, node, el):
00219 dtype = node.get_data_type()
00220 if dtype.NamespaceIndex == 0 and dtype.Identifier in o_ids.ObjectIdNames:
00221 dtype_name = o_ids.ObjectIdNames[dtype.Identifier]
00222 self.aliases[dtype] = dtype_name
00223 else:
00224 dtype_name = dtype.to_string()
00225 rank = node.get_value_rank()
00226 if rank != -1:
00227 el.attrib["ValueRank"] = str(rank)
00228 dim = node.get_attribute(ua.AttributeIds.ArrayDimensions)
00229 if dim.Value.Value:
00230 el.attrib["ArrayDimensions"] = ",".join([str(i) for i in dim.Value.Value])
00231 el.attrib["DataType"] = dtype_name
00232 self.value_to_etree(el, dtype_name, dtype, node)
00233
00234 def add_etree_variable(self, node):
00235 """
00236 Add a UA variable element to the XML etree
00237 """
00238 var_el = self._add_node_common("UAVariable", node)
00239 self.add_variable_common(node, var_el)
00240
00241 accesslevel = node.get_attribute(ua.AttributeIds.AccessLevel).Value.Value
00242 useraccesslevel = node.get_attribute(ua.AttributeIds.UserAccessLevel).Value.Value
00243
00244
00245
00246 if accesslevel not in (0, ua.AccessLevel.CurrentRead.mask):
00247 var_el.attrib["AccessLevel"] = str(accesslevel)
00248 if useraccesslevel not in (0, ua.AccessLevel.CurrentRead.mask):
00249 var_el.attrib["UserAccessLevel"] = str(useraccesslevel)
00250
00251 var = node.get_attribute(ua.AttributeIds.MinimumSamplingInterval)
00252 if var.Value.Value:
00253 var_el.attrib["MinimumSamplingInterval"] = str(var.Value.Value)
00254 var = node.get_attribute(ua.AttributeIds.Historizing)
00255 if var.Value.Value:
00256 var_el.attrib["Historizing"] = 'true'
00257 self._add_ref_els(var_el, node)
00258
00259 def add_etree_variable_type(self, node):
00260 """
00261 Add a UA variable type element to the XML etree
00262 """
00263
00264 var_el = self._add_node_common("UAVariableType", node)
00265 self.add_variable_common(node, var_el)
00266
00267 abstract = node.get_attribute(ua.AttributeIds.IsAbstract)
00268 if abstract.Value.Value:
00269 var_el.attrib["IsAbstract"] = "true"
00270
00271 self._add_ref_els(var_el, node)
00272
00273 def add_etree_method(self, node):
00274 obj_el = self._add_node_common("UAMethod", node)
00275
00276 var = node.get_attribute(ua.AttributeIds.Executable)
00277 if var.Value.Value is False:
00278 obj_el.attrib["Executable"] = "false"
00279 var = node.get_attribute(ua.AttributeIds.UserExecutable)
00280 if var.Value.Value is False:
00281 obj_el.attrib["UserExecutable"] = "false"
00282 self._add_ref_els(obj_el, node)
00283
00284 def add_etree_reference_type(self, obj):
00285 obj_el = self._add_node_common("UAReferenceType", obj)
00286 var = obj.get_attribute(ua.AttributeIds.InverseName)
00287 if var is not None and var.Value.Value is not None:
00288 self._add_sub_el(obj_el, 'InverseName', var.Value.Value.Text.decode('utf-8'))
00289 self._add_ref_els(obj_el, obj)
00290
00291 def add_etree_datatype(self, obj):
00292 """
00293 Add a UA data type element to the XML etree
00294 """
00295 obj_el = self._add_node_common("UADataType", obj)
00296 self._add_ref_els(obj_el, obj)
00297
00298 def _add_namespace_uri_els(self, uris):
00299 nuris_el = Et.Element('NamespaceUris')
00300
00301 for uri in uris:
00302 self._add_sub_el(nuris_el, 'Uri', uri)
00303
00304 self.etree.getroot().insert(0, nuris_el)
00305
00306 def _add_alias_els(self):
00307 aliases_el = Et.Element('Aliases')
00308
00309 ordered_keys = list(self.aliases.keys())
00310 ordered_keys.sort()
00311 for nodeid in ordered_keys:
00312 name = self.aliases[nodeid]
00313 ref_el = Et.SubElement(aliases_el, 'Alias', Alias=name)
00314 ref_el.text = nodeid.to_string()
00315
00316 self.etree.getroot().insert(0, aliases_el)
00317
00318 def _add_ref_els(self, parent_el, obj):
00319 refs = obj.get_references()
00320 refs_el = Et.SubElement(parent_el, 'References')
00321
00322 for ref in refs:
00323 if ref.ReferenceTypeId.Identifier in o_ids.ObjectIdNames:
00324 ref_name = o_ids.ObjectIdNames[ref.ReferenceTypeId.Identifier]
00325 else:
00326 ref_name = ref.ReferenceTypeId.to_string()
00327 ref_el = Et.SubElement(refs_el, 'Reference')
00328 ref_el.attrib['ReferenceType'] = ref_name
00329 if not ref.IsForward:
00330 ref_el.attrib['IsForward'] = 'false'
00331 ref_el.text = self._node_to_string(ref.NodeId)
00332
00333 self.aliases[ref.ReferenceTypeId] = ref_name
00334
00335
00336 def member_to_etree(self, el, name, dtype, val):
00337 member_el = Et.SubElement(el, "uax:" + name)
00338 if isinstance(val, (list, tuple)):
00339 for v in val:
00340 self._value_to_etree(member_el, ua.ObjectIdNames[dtype.Identifier], dtype, v)
00341 else:
00342 self._val_to_etree(member_el, dtype, val)
00343
00344
00345 def _val_to_etree(self, el, dtype, val):
00346 if val is None:
00347 val = ""
00348 if dtype == ua.NodeId(ua.ObjectIds.NodeId):
00349 id_el = Et.SubElement(el, "uax:Identifier")
00350 id_el.text = val.to_string()
00351 elif dtype == ua.NodeId(ua.ObjectIds.Guid):
00352 id_el = Et.SubElement(el, "uax:String")
00353 id_el.text = str(val)
00354 elif not hasattr(val, "ua_types"):
00355 if isinstance(val, bytes):
00356 el.text = val.decode("utf-8")
00357 else:
00358 el.text = str(val)
00359 else:
00360 for name, vtype in val.ua_types.items():
00361 self.member_to_etree(el, name, ua.NodeId(getattr(ua.ObjectIds, vtype)), getattr(val, name))
00362
00363
00364 def value_to_etree(self, el, dtype_name, dtype, node):
00365 var = node.get_data_value().Value
00366 if var.Value is not None:
00367 val_el = Et.SubElement(el, 'Value')
00368 self._value_to_etree(val_el, dtype_name, dtype, var.Value)
00369
00370
00371 def _value_to_etree(self, el, type_name, dtype, val):
00372 if val is None:
00373 return
00374
00375 if isinstance(val, (list, tuple)):
00376 if dtype.NamespaceIndex == 0 and dtype.Identifier <= 21:
00377 elname = "uax:ListOf" + type_name
00378 else:
00379 elname = "uax:ListOfExtensionObject"
00380
00381 list_el = Et.SubElement(el, elname)
00382 for nval in val:
00383 self._value_to_etree(list_el, type_name, dtype, nval)
00384 else:
00385 dtype_base = get_base_data_type(self.server.get_node(dtype))
00386 dtype_base = dtype_base.nodeid
00387
00388 if dtype_base == ua.NodeId(ua.ObjectIds.Enumeration):
00389 dtype_base = ua.NodeId(ua.ObjectIds.Int32)
00390 type_name = ua.ObjectIdNames[dtype_base.Identifier]
00391
00392 if dtype_base.NamespaceIndex == 0 and dtype_base.Identifier <= 21:
00393 type_name = ua.ObjectIdNames[dtype_base.Identifier]
00394 val_el = Et.SubElement(el, "uax:" + type_name)
00395 self._val_to_etree(val_el, dtype, val)
00396 else:
00397 self._extobj_to_etree(el, type_name, dtype, val)
00398
00399
00400 def _extobj_to_etree(self, val_el, name, dtype, val):
00401 obj_el = Et.SubElement(val_el, "uax:ExtensionObject")
00402 type_el = Et.SubElement(obj_el, "uax:TypeId")
00403 id_el = Et.SubElement(type_el, "uax:Identifier")
00404 id_el.text = dtype.to_string()
00405 body_el = Et.SubElement(obj_el, "uax:Body")
00406 struct_el = Et.SubElement(body_el, "uax:" + name)
00407 for name, vtype in val.ua_types.items():
00408 self.member_to_etree(struct_el, name, ua.NodeId(getattr(ua.ObjectIds, vtype)), getattr(val, name))
00409
00410
00411 def indent(self, elem, level=0):
00412 """
00413 copy and paste from http://effbot.org/zone/element-lib.htm#prettyprint
00414 it basically walks your tree and adds spaces and newlines so the tree is
00415 printed in a nice way
00416 """
00417 i = "\n" + level * " "
00418 if len(elem):
00419 if not elem.text or not elem.text.strip():
00420 elem.text = i + " "
00421 if not elem.tail or not elem.tail.strip():
00422 elem.tail = i
00423 for elem in elem:
00424 self.indent(elem, level + 1)
00425 if not elem.tail or not elem.tail.strip():
00426 elem.tail = i
00427 else:
00428 if level and (not elem.tail or not elem.tail.strip()):
00429 elem.tail = i