xmlparser.py
Go to the documentation of this file.
00001 """
00002 parse xml file from opcua-spec
00003 """
00004 import logging
00005 import re
00006 import sys
00007 
00008 import xml.etree.ElementTree as ET
00009 
00010 
00011 def _to_bool(val):
00012     return val in ("True", "true", "on", "On", "1")
00013 
00014 
def ua_type_to_python(val, uatype):
    """
    Convert the text of an XML value element to a python value.

    val is the raw text and uatype the name of its UA type (the XML tag
    without namespace).

    Returns an int for "Int*", "UInt*", "Byte" and "SByte", a bool for
    names starting with "bool" (any case), a float for "Double" and
    "Float", the text itself for "String", and bytes for "Bytes",
    "ByteString" and "ByteArray" (utf8 encoded on python 3, unchanged
    text on python 2).

    Raises ValueError when uatype is none of the above.
    """
    # Byte and SByte are the 8 bit integer types, they do not share the "Int" prefix
    if uatype.startswith(("Int", "UInt")) or uatype in ("Byte", "SByte"):
        return int(val)
    if uatype.lower().startswith("bool"):
        return _to_bool(val)
    if uatype in ("Double", "Float"):
        return float(val)
    if uatype == "String":
        return val
    if uatype in ("Bytes", "ByteString", "ByteArray"):
        if sys.version_info.major > 2:
            return bytes(val, 'utf8')
        return val
    # ValueError is still an Exception, so existing "except Exception" callers keep working
    raise ValueError("uatype not handled: {0!r} for val {1!r}".format(uatype, val))
00031 
00032 
class NodeData(object):
    """
    Plain container for what the parser reads about one node.

    Every attribute starts with a neutral default; XMLParser fills in the
    ones present in the XML. Note that XMLParser._set_attr also sets an
    "executable" attribute when the XML carries one; it is not created
    here.
    """

    def __init__(self):
        # common to every node
        self.nodetype = None  # XML tag name of the node element
        self.nodeid = None
        self.browsename = None
        self.displayname = None
        self.symname = None  # FIXME: this param is never used, why?
        self.parent = None
        self.parentlink = None  # reference type of the link to the parent
        self.desc = ""
        self.typedef = None
        self.refs = []  # RefStruct objects
        self.nodeclass = None
        self.eventnotifier = 0

        # only meaningful for variables
        self.datatype = None
        self.rank = -1  # check default value
        self.value = None
        self.valuetype = None  # tag name of the element holding the value
        self.dimensions = None
        self.accesslevel = None
        self.useraccesslevel = None
        self.minsample = None

        # only meaningful for reference types
        self.inversename = ""
        self.abstract = False
        self.symmetric = False

        # only meaningful for data types
        self.definition = []

    def __repr__(self):
        return "NodeData(nodeid:{0})".format(self.nodeid)
    __str__ = __repr__
00070 
00071 
class RefStruct(object):
    """
    One reference of a node: its type, its direction and its target.
    """

    def __init__(self):
        self.target = None
        self.reftype = None
        self.forward = True  # a reference is forward unless stated otherwise
00078 
00079 
class ExtObj(object):
    """
    Parsed content of an extension object element.
    """

    def __init__(self):
        self.typeid = None
        self.objname = None  # tag name of the first element inside Body
        self.bodytype = None
        self.body = {}

    def __repr__(self):
        return "ExtObj({0}, {1})".format(self.objname, self.body)
    __str__ = __repr__
00091 
00092 
class XMLParser(object):
    """
    Read a nodeset XML file and expose its content as plain python
    objects (NodeData, RefStruct, ExtObj).

    The document is parsed once in the constructor; the get_* methods
    walk the parsed tree each time they are called.
    """

    def __init__(self, xmlpath):
        """
        Parse the XML document at xmlpath.

        xmlpath is handed unchanged to ElementTree.parse.
        """
        self.logger = logging.getLogger(__name__)
        # splits an ElementTree tag "{namespace}name" into its two parts
        self._retag = re.compile(r"(\{.*\})(.*)")
        self.path = xmlpath

        self.tree = ET.parse(xmlpath)
        self.root = self.tree.getroot()
        # FIXME: hard to get these xml namespaces with ElementTree, we may have to shift to lxml
        self.ns = {
            'base': "http://opcfoundation.org/UA/2011/03/UANodeSet.xsd",
            'uax': "http://opcfoundation.org/UA/2008/02/Types.xsd",
            'xsd': "http://www.w3.org/2001/XMLSchema",
            'xsi': "http://www.w3.org/2001/XMLSchema-instance"
        }

    def _local_name(self, tag):
        """
        Return tag without its "{namespace}" prefix.

        A tag that carries no namespace is returned unchanged instead of
        failing on the missing regex match.
        """
        match = self._retag.match(tag)
        if match is None:
            return tag
        return match.group(2)

    def get_used_namespaces(self):
        """
        Return the used namespace uris in this import file
        """
        for child in self.root:
            if self._local_name(child.tag) == 'NamespaceUris':
                return [ns_element.text for ns_element in child]
        return []

    def get_aliases(self):
        """
        Return the used node aliases in this import file
        """
        for child in self.root:
            if self._local_name(child.tag) == 'Aliases':
                return {el.attrib["Alias"]: el.text for el in child}
        return {}

    def get_node_datas(self):
        """
        Return a list with one NodeData per node element of the file
        """
        nodes = []
        for child in self.root:
            tag = self._local_name(child.tag)
            if tag not in ["Aliases", "NamespaceUris", "Extensions", "Models"]:  # these XML tags don't contain nodes
                nodes.append(self._parse_node(tag, child))
        return nodes

    def _parse_node(self, nodetype, child):
        """
        Parse a XML node and create a NodeData object.
        """
        obj = NodeData()
        obj.nodetype = nodetype
        for key, val in child.attrib.items():
            self._set_attr(key, val, obj)
        self.logger.info("Parsing node: %s %s", obj.nodeid, obj.browsename)
        obj.displayname = obj.browsename  # give a default value to display name
        for el in child:
            self._parse_attr(el, obj)
        return obj

    def _set_attr(self, key, val, obj):
        """
        Store the XML attribute key=val on obj, converted to a python type.

        Attributes that are not known are logged and ignored.
        """
        if key == "NodeId":
            obj.nodeid = val
        elif key == "BrowseName":
            obj.browsename = val
        elif key == "SymbolicName":
            obj.symname = val
        elif key == "ParentNodeId":
            obj.parent = val
        elif key == "DataType":
            obj.datatype = val
        elif key == "IsAbstract":
            obj.abstract = _to_bool(val)
        elif key == "Executable":
            obj.executable = _to_bool(val)
        elif key == "EventNotifier":
            obj.eventnotifier = int(val)
        elif key == "ValueRank":
            obj.rank = int(val)
        elif key == "ArrayDimensions":
            # skip empty items so that an empty attribute gives an empty list
            obj.dimensions = [int(i) for i in val.split(",") if i.strip()]
        elif key == "MinimumSamplingInterval":
            try:
                obj.minsample = int(val)
            except ValueError:
                # not an integer literal (for example "0.5"): keep it as a float
                obj.minsample = float(val)
        elif key == "AccessLevel":
            obj.accesslevel = int(val)
        elif key == "UserAccessLevel":
            obj.useraccesslevel = int(val)
        elif key == "Symmetric":
            obj.symmetric = _to_bool(val)
        else:
            self.logger.info("Attribute not implemented: %s:%s", key, val)

    def _parse_attr(self, el, obj):
        """
        Store the content of the child element el of a node on obj.

        Elements that are not known are logged and ignored.
        """
        tag = self._local_name(el.tag)

        if tag == "DisplayName":
            obj.displayname = el.text
        elif tag == "Description":
            obj.desc = el.text
        elif tag == "References":
            self._parse_refs(el, obj)
        elif tag == "Value":
            self._parse_value(el, obj)
        elif tag == "InverseName":
            obj.inversename = el.text
        elif tag == "Definition":
            # the field elements are kept as is, they are not parsed here
            obj.definition.extend(el)
        else:
            self.logger.info("Not implemented tag: %s", el)

    def _parse_value(self, val_el, obj):
        """
        Parse the content of a Value element into obj.value and obj.valuetype.

        obj.valuetype receives the tag name of the element holding the
        value, or "Null" when val_el has no child. Value types that are
        not handled are logged and leave obj.value untouched.
        """
        child_el = val_el.find("*")  # should be only one child
        if child_el is not None:
            ntag = self._local_name(child_el.tag)
        else:
            ntag = "Null"
        obj.valuetype = ntag

        if ntag in ("SByte", "Byte", "Int8", "UInt8", "Int16", "UInt16", "Int32", "UInt32", "Int64", "UInt64"):
            obj.value = int(child_el.text)
        elif ntag in ("Float", "Double"):
            obj.value = float(child_el.text)
        elif ntag == "Boolean":
            obj.value = _to_bool(child_el.text)
        elif ntag in ("ByteString", "String"):
            mytext = child_el.text
            if mytext is None:  # support importing null strings
                mytext = ""
            mytext = mytext.replace('\n', '').replace('\r', '')
            obj.value = mytext
        elif ntag == "DateTime":
            obj.value = child_el.text
        elif ntag == "Guid":
            # the guid text sits in a nested element, parse that one
            self._parse_value(child_el, obj)
            obj.valuetype = obj.datatype  # override parsed string type to guid
        elif ntag == "LocalizedText":
            obj.value = self._parse_body(child_el)
        elif ntag == "NodeId":
            id_el = child_el.find("uax:Identifier", self.ns)
            if id_el is not None:
                obj.value = id_el.text
        elif ntag == "ListOfExtensionObject":
            obj.value = self._parse_list_of_extension_object(child_el)
        elif ntag == "ListOfLocalizedText":
            obj.value = self._parse_list_of_localized_text(child_el)
        elif ntag.startswith("ListOf"):
            obj.value = self._parse_list(child_el)
        elif ntag == "ExtensionObject":
            obj.value = self._parse_ext_obj(child_el)
        elif ntag == "Null":
            obj.value = None
        else:
            self.logger.warning("Parsing value of type '%s' not implemented", ntag)

    def _get_text(self, el):
        """
        Return all text found inside el, each piece stripped, joined together
        """
        return "".join(txt.strip() for txt in el.itertext())

    def _parse_list(self, el):
        """
        Parse a ListOf* element into a python list, nested lists included
        """
        value = []
        for val_el in el:
            ntag = self._local_name(val_el.tag)
            if ntag.startswith("ListOf"):
                val = self._parse_list(val_el)
            else:
                val = ua_type_to_python(val_el.text, ntag)
            value.append(val)
        return value

    def _parse_list_of_localized_text(self, el):
        """
        Return the content of every Text element found in the items of el
        """
        value = []
        for localized_text in el:
            for child in localized_text:
                if self._local_name(child.tag) == 'Text':
                    value.append(self._get_text(child))
        return value

    def _parse_list_of_extension_object(self, el):
        """
        Parse a uax:ListOfExtensionObject Value
        Return an list of ExtObj
        """
        return [self._parse_ext_obj(extension_object) for extension_object in el]

    def _parse_ext_obj(self, el):
        """
        Parse an extension object element and return it as an ExtObj.

        An empty TypeId or Body is accepted; objname stays None when
        Body has no child element.
        """
        ext = ExtObj()
        for extension_object_part in el:
            ntag = self._local_name(extension_object_part.tag)
            if ntag == 'TypeId':
                ext.typeid = self._get_text(extension_object_part)
            elif ntag == 'Body':
                first_el = extension_object_part.find('*')
                if first_el is not None:
                    ext.objname = self._local_name(first_el.tag)
                ext.body = self._parse_body(extension_object_part)
            else:
                self.logger.warning("Unknown extension object tag: %s", ntag)
        return ext

    def _parse_body(self, el):
        """
        Return the children of el as a list of (tag name, value) tuples.

        value is the stripped text of a leaf element or, for an element
        with children, the list built recursively from them. Children
        whose value is empty are left out.
        """
        body = []
        for body_item in el:
            otag = self._local_name(body_item.tag)
            if len(body_item):
                val = self._parse_body(body_item)
            else:
                val = self._get_text(body_item)
            if val:
                body.append((otag, val))
        return body

    def _parse_refs(self, el, obj):
        """
        Parse a References element into obj.

        A HasTypeDefinition reference sets obj.typedef, an inverse
        reference sets obj.parent and obj.parentlink (the last one
        wins), every other reference is appended to obj.refs.
        """
        for ref in el:
            reftype = ref.attrib["ReferenceType"]
            # "0" is accepted as false next to the spellings of "false"
            is_forward = ref.attrib.get("IsForward") not in ("false", "False", "0")
            if reftype == "HasTypeDefinition":
                obj.typedef = ref.text
            elif not is_forward:
                obj.parent = ref.text
                obj.parentlink = reftype
            else:
                struct = RefStruct()
                # keep a real bool here, like the default set by RefStruct
                struct.forward = is_forward
                struct.target = ref.text
                struct.reftype = reftype
                obj.refs.append(struct)


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Sat Jun 8 2019 18:26:24