00001 """
Parse XML nodeset files from the OPC UA specification.
00003 """
00004 import logging
00005 import re
00006 import sys
00007
00008 import xml.etree.ElementTree as ET
00009
00010
def _to_bool(val):
    """
    Tell whether val is one of the spellings accepted as "true".

    Only the exact values "True", "true", "on", "On" and "1" are accepted;
    every other value gives False.
    """
    accepted = ("True", "true", "on", "On", "1")
    return val in accepted
00013
00014
def ua_type_to_python(val, uatype):
    """
    Convert the text of an XML value element into a python value.

    val is the element text (None when the element is empty) and uatype is
    the type name taken from the element tag, e.g. "Int32", "Byte",
    "Boolean", "Double", "String" or "ByteString".

    Returns an int, bool, float, str or bytes depending on uatype.
    Raises Exception when uatype is none of the handled type names.
    """
    # "Byte" and "SByte" do not start with "Int"/"UInt", so they are
    # matched explicitly.
    if uatype.startswith(("Int", "UInt")) or uatype in ("Byte", "SByte"):
        return int(val)
    elif uatype.lower().startswith("bool"):
        return _to_bool(val)
    elif uatype in ("Double", "Float"):
        return float(val)
    elif uatype == "String":
        return val
    elif uatype in ("Bytes", "ByteString", "ByteArray"):
        if val is None:
            # an empty element has no text; bytes(None, ...) would raise
            return b""
        if sys.version_info.major > 2:
            return bytes(val, 'utf8')
        else:
            return val
    else:
        raise Exception("uatype not handled", uatype, " for val ", val)
00031
00032
class NodeData(object):
    """
    Plain container for what XMLParser extracts about one node.

    Every attribute gets a default in __init__, so consumers can read any
    of them without first checking that the XML file supplied a value.
    """

    def __init__(self):
        # tag name of the XML element the node was parsed from
        self.nodetype = None
        self.nodeid = None
        self.browsename = None
        self.displayname = None
        self.symname = None
        self.parent = None
        self.parentlink = None
        self.desc = ""
        self.typedef = None
        # list of RefStruct, filled by XMLParser._parse_refs
        self.refs = []
        self.nodeclass = None
        self.eventnotifier = 0
        # Assigned by XMLParser._set_attr from the "Executable" attribute.
        # Initialised here so that reading it never raises AttributeError.
        # NOTE(review): True is believed to be the UANodeSet schema default
        # for Executable -- confirm against the schema.
        self.executable = True

        # filled from the DataType, ValueRank, ArrayDimensions, AccessLevel,
        # UserAccessLevel and MinimumSamplingInterval XML attributes and
        # from the Value child element
        self.datatype = None
        self.rank = -1
        self.value = None
        self.valuetype = None
        self.dimensions = None
        self.accesslevel = None
        self.useraccesslevel = None
        self.minsample = None

        # filled from the InverseName child element and from the
        # IsAbstract / Symmetric XML attributes
        self.inversename = ""
        self.abstract = False
        self.symmetric = False

        # child elements of the Definition element, kept unparsed
        self.definition = []

    def __str__(self):
        return "NodeData(nodeid:{0})".format(self.nodeid)
    __repr__ = __str__
00070
00071
class RefStruct(object):
    """
    Record for one reference of a node: the reference type, whether it is
    a forward reference, and the target it points at.
    """

    def __init__(self):
        # a reference counts as forward until the parser says otherwise
        self.reftype, self.forward, self.target = None, True, None
00078
00079
class ExtObj(object):
    """
    Parsed form of one ExtensionObject value.

    body starts out as an empty dict. XMLParser._parse_ext_obj replaces it
    with the list of (tag, value) tuples produced by _parse_body.
    """

    def __init__(self):
        self.typeid, self.objname, self.bodytype = None, None, None
        self.body = {}

    def __str__(self):
        template = "ExtObj({0}, {1})"
        return template.format(self.objname, self.body)

    __repr__ = __str__
00091
00092
class XMLParser(object):
    """
    Read a nodeset XML file and expose its content as python objects.

    The file is parsed once in the constructor. get_used_namespaces(),
    get_aliases() and get_node_datas() then walk the children of the root
    element and return namespace uris, alias mappings and NodeData objects.
    """

    def __init__(self, xmlpath):
        """
        xmlpath is handed unchanged to xml.etree.ElementTree.parse().
        """
        self.logger = logging.getLogger(__name__)
        # splits "{namespace}tag" into its namespace and local tag parts
        self._retag = re.compile(r"(\{.*\})(.*)")
        self.path = xmlpath

        self.tree = ET.parse(xmlpath)
        self.root = self.tree.getroot()
        # prefix -> namespace uri map for prefixed find() calls
        self.ns = {
            'base': "http://opcfoundation.org/UA/2011/03/UANodeSet.xsd",
            'uax': "http://opcfoundation.org/UA/2008/02/Types.xsd",
            'xsd': "http://www.w3.org/2001/XMLSchema",
            'xsi': "http://www.w3.org/2001/XMLSchema-instance"
        }

    def _tag_name(self, el):
        """
        Return the tag of el without its leading "{namespace}" part.

        A tag that carries no namespace is returned unchanged instead of
        failing on the regular expression not matching.
        """
        match = self._retag.match(el.tag)
        if match is None:
            return el.tag
        return match.group(2)

    def get_used_namespaces(self):
        """
        Return the used namespace uris in this import file
        """
        for child in self.root:
            if self._tag_name(child) == 'NamespaceUris':
                # only the first NamespaceUris element is considered
                return [ns_element.text for ns_element in child]
        return []

    def get_aliases(self):
        """
        Return the used node aliases in this import file
        """
        aliases = {}
        for child in self.root:
            if self._tag_name(child) == 'Aliases':
                for el in child:
                    aliases[el.attrib["Alias"]] = el.text
                # only the first Aliases element is considered
                break
        return aliases

    def get_node_datas(self):
        """
        Return a list with one NodeData per child of the root element,
        leaving out the Aliases, NamespaceUris, Extensions and Models
        elements.
        """
        skipped = ("Aliases", "NamespaceUris", "Extensions", "Models")
        nodes = []
        for child in self.root:
            tag = self._tag_name(child)
            if tag not in skipped:
                nodes.append(self._parse_node(tag, child))
        return nodes

    def _parse_node(self, nodetype, child):
        """
        Parse a XML node and create a NodeData object.
        """
        obj = NodeData()
        obj.nodetype = nodetype
        for key, val in child.attrib.items():
            self._set_attr(key, val, obj)
        self.logger.info("Parsing node: %s %s", obj.nodeid, obj.browsename)
        # the browse name is the display name unless a DisplayName child
        # element overrides it below
        obj.displayname = obj.browsename
        for el in child:
            self._parse_attr(el, obj)
        return obj

    def _set_attr(self, key, val, obj):
        """
        Store the XML attribute key (with text value val) on obj,
        converting it to int / bool / list where appropriate.
        Unknown attributes are only logged.
        """
        if key == "NodeId":
            obj.nodeid = val
        elif key == "BrowseName":
            obj.browsename = val
        elif key == "SymbolicName":
            obj.symname = val
        elif key == "ParentNodeId":
            obj.parent = val
        elif key == "DataType":
            obj.datatype = val
        elif key == "IsAbstract":
            obj.abstract = _to_bool(val)
        elif key == "Executable":
            obj.executable = _to_bool(val)
        elif key == "EventNotifier":
            obj.eventnotifier = int(val)
        elif key == "ValueRank":
            obj.rank = int(val)
        elif key == "ArrayDimensions":
            # skip empty items so that an empty attribute gives []
            obj.dimensions = [int(i) for i in val.split(",") if i.strip()]
        elif key == "MinimumSamplingInterval":
            # int() rejects decimal notation such as "0.5"; fall back to
            # float so such a value does not abort the whole import
            try:
                obj.minsample = int(val)
            except ValueError:
                obj.minsample = float(val)
        elif key == "AccessLevel":
            obj.accesslevel = int(val)
        elif key == "UserAccessLevel":
            obj.useraccesslevel = int(val)
        elif key == "Symmetric":
            obj.symmetric = _to_bool(val)
        else:
            self.logger.info("Attribute not implemented: %s:%s", key, val)

    def _parse_attr(self, el, obj):
        """
        Store the content of the child element el of a node on obj.
        Unknown elements are only logged.
        """
        tag = self._tag_name(el)

        if tag == "DisplayName":
            obj.displayname = el.text
        elif tag == "Description":
            obj.desc = el.text
        elif tag == "References":
            self._parse_refs(el, obj)
        elif tag == "Value":
            self._parse_value(el, obj)
        elif tag == "InverseName":
            obj.inversename = el.text
        elif tag == "Definition":
            # the fields are kept as raw elements
            obj.definition.extend(el)
        else:
            self.logger.info("Not implemented tag: %s", el)

    def _parse_value(self, val_el, obj):
        """
        Parse the first child of val_el and store the result in obj.value
        and the child's tag name in obj.valuetype ("Null" when val_el has
        no child).
        """
        child_el = val_el.find("*")
        if child_el is not None:
            ntag = self._tag_name(child_el)
        else:
            ntag = "Null"
        obj.valuetype = ntag

        if ntag in ("SByte", "Byte", "Int8", "UInt8", "Int16", "UInt16",
                    "Int32", "UInt32", "Int64", "UInt64"):
            obj.value = int(child_el.text)
        elif ntag in ("Float", "Double"):
            obj.value = float(child_el.text)
        elif ntag == "Boolean":
            obj.value = _to_bool(child_el.text)
        elif ntag in ("ByteString", "String"):
            mytext = child_el.text
            if mytext is None:
                mytext = ""
            mytext = mytext.replace('\n', '').replace('\r', '')
            obj.value = mytext
        elif ntag == "DateTime":
            obj.value = child_el.text
        elif ntag == "Guid":
            # the value sits in a nested element: parse that one, then
            # report the node's declared datatype as the value type
            self._parse_value(child_el, obj)
            obj.valuetype = obj.datatype
        elif ntag == "LocalizedText":
            obj.value = self._parse_body(child_el)
        elif ntag == "NodeId":
            id_el = child_el.find("uax:Identifier", self.ns)
            if id_el is not None:
                obj.value = id_el.text
        elif ntag == "ListOfExtensionObject":
            obj.value = self._parse_list_of_extension_object(child_el)
        elif ntag == "ListOfLocalizedText":
            obj.value = self._parse_list_of_localized_text(child_el)
        elif ntag.startswith("ListOf"):
            obj.value = self._parse_list(child_el)
        elif ntag == "ExtensionObject":
            obj.value = self._parse_ext_obj(child_el)
        elif ntag == "Null":
            obj.value = None
        else:
            self.logger.warning("Parsing value of type '%s' not implemented", ntag)

    def _get_text(self, el):
        """
        Return all text found in el and its descendants, each piece
        stripped of surrounding whitespace, joined without separator.
        """
        return "".join(txt.strip() for txt in el.itertext())

    def _parse_list(self, el):
        """
        Parse a ListOf... element into a python list; nested ListOf...
        elements become nested lists.
        """
        value = []
        for val_el in el:
            ntag = self._tag_name(val_el)
            if ntag.startswith("ListOf"):
                val = self._parse_list(val_el)
            else:
                val = ua_type_to_python(val_el.text, ntag)
            value.append(val)
        return value

    def _parse_list_of_localized_text(self, el):
        """
        Return the content of every Text element found in the children
        of el, in document order.
        """
        value = []
        for localized_text in el:
            for child in localized_text:
                if self._tag_name(child) == 'Text':
                    value.append(self._get_text(child))
        return value

    def _parse_list_of_extension_object(self, el):
        """
        Parse a uax:ListOfExtensionObject Value
        Return an list of ExtObj
        """
        return [self._parse_ext_obj(extension_object) for extension_object in el]

    def _parse_ext_obj(self, el):
        """
        Build an ExtObj from the TypeId and Body children of el.
        Other children are reported through the logger and ignored.
        """
        ext = ExtObj()
        for extension_object_part in el:
            ntag = self._tag_name(extension_object_part)
            if ntag == 'TypeId':
                ext.typeid = self._get_text(extension_object_part)
            elif ntag == 'Body':
                first = extension_object_part.find('*')
                # an empty Body has no first child to name the object after
                if first is not None:
                    ext.objname = self._tag_name(first)
                ext.body = self._parse_body(extension_object_part)
            else:
                self.logger.warning("Unknown ExtensionObject tag: %s", ntag)
        return ext

    def _parse_body(self, el):
        """
        Turn the children of el into a list of (tag, value) tuples.

        value is the stripped text for a leaf element and a nested list of
        tuples otherwise; children whose value is empty are left out.
        """
        body = []
        for body_item in el:
            otag = self._tag_name(body_item)
            if len(body_item) == 0:
                val = self._get_text(body_item)
            else:
                val = self._parse_body(body_item)
            if val:
                body.append((otag, val))
        return body

    def _parse_refs(self, el, obj):
        """
        Distribute the Reference children of el over obj:

        - HasTypeDefinition sets obj.typedef
        - any other reference with a false IsForward sets obj.parent and
          obj.parentlink
        - every remaining reference is appended to obj.refs as a RefStruct
        """
        for ref in el:
            reftype = ref.attrib["ReferenceType"]
            # "0" is accepted as false next to "false"/"False"
            is_forward = ref.attrib.get("IsForward", "true") not in ("false", "False", "0")
            if reftype == "HasTypeDefinition":
                obj.typedef = ref.text
            elif not is_forward:
                # an inverse reference is taken to point at the parent
                obj.parent = ref.text
                obj.parentlink = reftype
            else:
                struct = RefStruct()
                # store a real bool, like the RefStruct default
                struct.forward = is_forward
                struct.target = ref.text
                struct.reftype = reftype
                obj.refs.append(struct)