xmlimporter.py
Go to the documentation of this file.
1 """
2 add nodes defined in XML to address space
3 format is the one from opc-ua specification
4 """
5 import logging
6 import uuid
7 from copy import copy
8 
9 import dateutil.parser
10 
11 from opcua import ua
12 from opcua.common import xmlparser
13 
14 
class XmlImporter(object):
    """
    Import nodes described in an OPC-UA nodeset XML file into a server.

    The XML file is read through ``xmlparser.XMLParser``; every parsed node is
    turned into a ``ua.AddNodesItem`` and submitted through the server's
    internal session (``server.iserver.isession``).
    """

    def __init__(self, server):
        """
        :param server: server object; this class uses its
            ``register_namespace`` method and ``iserver.isession`` attribute
        """
        self.logger = logging.getLogger(__name__)
        # parser of the file currently being imported (set by import_xml)
        self.parser = None
        self.server = server
        # namespace index used in the xml file -> namespace index in the server
        self.namespaces = {}
        # alias name -> node id (already migrated to server namespace indexes)
        self.aliases = {}

    def _map_namespaces(self, namespaces_uris):
        """
        creates a mapping between the namespaces in the xml file and in the server.
        if not present the namespace is registered.

        :param namespaces_uris: namespace uris in the order used by the file;
            the first uri is given file index 1, the second index 2, ...
        :returns: dict mapping file namespace index to server namespace index
        """
        namespaces = {}
        for ns_index, ns_uri in enumerate(namespaces_uris, start=1):
            namespaces[ns_index] = self.server.register_namespace(ns_uri)
        return namespaces

    def _map_aliases(self, aliases):
        """
        maps the import aliases to the correct namespaces

        :param aliases: dict mapping alias name to node id (string or NodeId)
        :returns: dict mapping alias name to namespace-migrated node id
        """
        return {
            alias: self._migrate_ns(self.to_nodeid(node_id))
            for alias, node_id in aliases.items()
        }

    def import_xml(self, xmlpath):
        """
        import xml and return added nodes

        :param xmlpath: path handed to ``xmlparser.XMLParser``
        :returns: list with the ``AddedNodeId`` of every node that was added,
            in the order the nodes were added
        """
        self.logger.info("Importing XML file %s", xmlpath)
        self.parser = xmlparser.XMLParser(xmlpath)

        dnodes = self.parser.get_node_datas()
        dnodes = self.make_objects(dnodes)

        self.namespaces = self._map_namespaces(self.parser.get_used_namespaces())
        self.aliases = self._map_aliases(self.parser.get_aliases())

        # one handler per supported xml node type
        adders = {
            'UAObject': self.add_object,
            'UAObjectType': self.add_object_type,
            'UAVariable': self.add_variable,
            'UAVariableType': self.add_variable_type,
            'UAReferenceType': self.add_reference_type,
            'UADataType': self.add_datatype,
            'UAMethod': self.add_method,
        }

        nodes = []
        # parents must be added before their children
        for nodedata in self._sort_nodes_by_parentid(dnodes):
            adder = adders.get(nodedata.nodetype)
            if adder is None:
                self.logger.warning("Not implemented node type: %s ", nodedata.nodetype)
                continue
            nodes.append(adder(nodedata))
        return nodes

    def make_objects(self, node_datas):
        """
        Convert the string fields of parsed node datas to ua objects.

        ``nodeid``, ``browsename``, ``parent``, ``parentlink`` and ``typedef``
        are converted in place on each element.

        :param node_datas: iterable of parsed node data objects
        :returns: new list holding the same (now converted) objects
        """
        new_nodes = []
        for ndata in node_datas:
            ndata.nodeid = ua.NodeId.from_string(ndata.nodeid)
            ndata.browsename = ua.QualifiedName.from_string(ndata.browsename)
            if ndata.parent:
                ndata.parent = ua.NodeId.from_string(ndata.parent)
            if ndata.parentlink:
                ndata.parentlink = self.to_nodeid(ndata.parentlink)
            if ndata.typedef:
                ndata.typedef = self.to_nodeid(ndata.typedef)
            new_nodes.append(ndata)
        return new_nodes

    def _migrate_ns(self, nodeid):
        """
        Check if the index of nodeid or browsename given in the xml model file
        must be converted to a already existing namespace id based on the files
        namespace uri

        :param nodeid: any object with a ``NamespaceIndex`` attribute
        :returns: a shallow copy with the server namespace index when the
            index is part of the file mapping, otherwise the object itself
        """
        if nodeid.NamespaceIndex in self.namespaces:
            # copy so that the parsed data keeps the index used in the file
            nodeid = copy(nodeid)
            nodeid.NamespaceIndex = self.namespaces[nodeid.NamespaceIndex]
        return nodeid

    def _get_node(self, obj):
        """
        Build the ``ua.AddNodesItem`` shared by all node classes.

        :param obj: parsed node data (after ``make_objects``)
        :returns: ``ua.AddNodesItem`` without ``NodeAttributes``
        """
        node = ua.AddNodesItem()
        node.RequestedNewNodeId = self._migrate_ns(obj.nodeid)
        node.BrowseName = self._migrate_ns(obj.browsename)
        self.logger.info("Importing xml node (%s, %s) as (%s %s)", obj.browsename, obj.nodeid, node.BrowseName, node.RequestedNewNodeId)
        # nodetype is e.g. "UAObject": drop the two leading characters to get
        # the ua.NodeClass member name
        node.NodeClass = getattr(ua.NodeClass, obj.nodetype[2:])
        if obj.parent:
            node.ParentNodeId = self._migrate_ns(obj.parent)
        if obj.parentlink:
            node.ReferenceTypeId = self._migrate_ns(obj.parentlink)
        if obj.typedef:
            node.TypeDefinition = self._migrate_ns(obj.typedef)
        return node

    def _submit_node(self, node, attrs, obj):
        """
        Attach attrs to node, add it to the server and add the node references.

        References are added before the status code is checked, so they are
        submitted even when adding the node itself failed.

        :returns: the ``AddedNodeId`` reported by the server
        :raises: whatever ``StatusCode.check()`` raises on a bad status
        """
        node.NodeAttributes = attrs
        res = self.server.iserver.isession.add_nodes([node])
        self._add_refs(obj)
        res[0].StatusCode.check()
        return res[0].AddedNodeId

    def to_nodeid(self, nodeid):
        """
        Convert the different node id notations found in the xml to a NodeId.

        :param nodeid: a ``ua.NodeId`` (returned unchanged), an empty value
            (gives the ``ObjectIds.String`` node id), a string containing "="
            (parsed with ``NodeId.from_string``), a name from ``ua.ObjectIds``
            or an alias name
        :raises AttributeError: if the name is neither in ``ua.ObjectIds`` nor
            in the aliases
        """
        if isinstance(nodeid, ua.NodeId):
            return nodeid
        if not nodeid:
            return ua.NodeId(ua.ObjectIds.String)
        if "=" in nodeid:
            return ua.NodeId.from_string(nodeid)
        if hasattr(ua.ObjectIds, nodeid):
            return ua.NodeId(getattr(ua.ObjectIds, nodeid))
        if nodeid in self.aliases:
            return self.aliases[nodeid]
        # unknown name: this getattr raises AttributeError
        return ua.NodeId(getattr(ua.ObjectIds, nodeid))

    def add_object(self, obj):
        """
        Add a UAObject node and its references, return the added node id.
        """
        node = self._get_node(obj)
        attrs = ua.ObjectAttributes()
        if obj.desc:
            attrs.Description = ua.LocalizedText(obj.desc)
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
        attrs.EventNotifier = obj.eventnotifier
        return self._submit_node(node, attrs, obj)

    def add_object_type(self, obj):
        """
        Add a UAObjectType node and its references, return the added node id.
        """
        node = self._get_node(obj)
        attrs = ua.ObjectTypeAttributes()
        if obj.desc:
            attrs.Description = ua.LocalizedText(obj.desc)
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
        attrs.IsAbstract = obj.abstract
        return self._submit_node(node, attrs, obj)

    def add_variable(self, obj):
        """
        Add a UAVariable node and its references, return the added node id.

        Optional attributes are only set when the parsed value is truthy
        (``Value``: when it is not None).
        """
        node = self._get_node(obj)
        attrs = ua.VariableAttributes()
        if obj.desc:
            attrs.Description = ua.LocalizedText(obj.desc)
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
        attrs.DataType = self.to_nodeid(obj.datatype)
        if obj.value is not None:
            attrs.Value = self._add_variable_value(obj)
        if obj.rank:
            attrs.ValueRank = obj.rank
        if obj.accesslevel:
            attrs.AccessLevel = obj.accesslevel
        if obj.useraccesslevel:
            attrs.UserAccessLevel = obj.useraccesslevel
        if obj.minsample:
            attrs.MinimumSamplingInterval = obj.minsample
        if obj.dimensions:
            attrs.ArrayDimensions = obj.dimensions
        return self._submit_node(node, attrs, obj)

    def _make_ext_obj(self, obj):
        """
        Instantiate the ua class named ``obj.objname`` and fill it from
        ``obj.body``.

        :param obj: parsed extension object; ``body`` is an iterable of
            ``(name, value)`` pairs where every value is itself an iterable of
            ``(attribute name, value)`` pairs
        :raises Exception: if a body value is a plain string
        """
        ext = getattr(ua, obj.objname)()
        for name, val in obj.body:
            if isinstance(val, str):
                raise Exception("Error val should a dict", name, val)
            for attname, v in val:
                self._set_attr(ext, attname, v)
        return ext

    def _set_attr(self, obj, attname, val):
        """
        Set attribute ``attname`` of ua object ``obj`` from parsed xml data.

        :param val: either the value as string, or an iterable of pairs
            describing a nested object or a list
        """
        # two possible values:
        # either we get value directly
        # or a dict if it s an object or a list
        if isinstance(val, str):
            pval = xmlparser.ua_type_to_python(val, obj.ua_types[attname])
            setattr(obj, attname, pval)
            return

        # so we have either an object or a list...
        current = getattr(obj, attname)
        if isinstance(current, ua.NodeId):
            # NodeId representation does not follow common rules!!
            # only the first "Identifier" element is used
            for attname2, v2 in val:
                if attname2 == "Identifier":
                    setattr(obj, attname, ua.NodeId.from_string(v2))
                    break
        elif not hasattr(current, "ua_types"):
            # we probably have a list
            my_list = [xmlparser.ua_type_to_python(v2, vtype) for vtype, v2 in val]
            setattr(obj, attname, my_list)
        else:
            # nested ua object: fill it attribute by attribute
            for attname2, v2 in val:
                self._set_attr(current, attname2, v2)
            setattr(obj, attname, current)

    def _add_variable_value(self, obj):
        """
        Returns the value for a Variable based on the objects value type.

        A plain list is returned for 'ListOfExtensionObject' and for
        'ListOf<X>' when <X> is not a primitive type; every other value type
        gives a ``ua.Variant``.
        """
        self.logger.debug("Setting value with type %s and value %s", obj.valuetype, obj.value)
        valuetype = obj.valuetype
        if valuetype == 'ListOfExtensionObject':
            return [self._make_ext_obj(ext) for ext in obj.value]
        if valuetype.startswith("ListOf"):
            vtype = valuetype[len("ListOf"):]
            if hasattr(ua.ua_binary.Primitives, vtype):
                return ua.Variant(obj.value, getattr(ua.VariantType, vtype))
            return [getattr(ua, vtype)(v) for v in obj.value]
        if valuetype == 'ExtensionObject':
            extobj = self._make_ext_obj(obj.value)
            return ua.Variant(extobj, getattr(ua.VariantType, valuetype))
        if valuetype == 'DateTime':
            return ua.Variant(dateutil.parser.parse(obj.value), getattr(ua.VariantType, valuetype))
        if valuetype == 'Guid':
            return ua.Variant(uuid.UUID(obj.value), getattr(ua.VariantType, valuetype))
        if valuetype == 'LocalizedText':
            ltext = ua.LocalizedText()
            for name, val in obj.value:
                if name == "Text":
                    ltext.Text = val.encode("utf-8")
                else:
                    self.logger.warning("While parsing localizedText value, unkown element: %s with val: %s", name, val)
            return ua.Variant(ltext, ua.VariantType.LocalizedText)
        if valuetype == 'NodeId':
            return ua.Variant(ua.NodeId.from_string(obj.value))
        return ua.Variant(obj.value, getattr(ua.VariantType, valuetype))

    def add_variable_type(self, obj):
        """
        Add a UAVariableType node and its references, return the added node id.

        The value is only used when ``obj.value`` holds exactly one element.
        """
        node = self._get_node(obj)
        attrs = ua.VariableTypeAttributes()
        if obj.desc:
            attrs.Description = ua.LocalizedText(obj.desc)
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
        attrs.DataType = self.to_nodeid(obj.datatype)
        if obj.value and len(obj.value) == 1:
            attrs.Value = obj.value[0]
        if obj.rank:
            attrs.ValueRank = obj.rank
        if obj.abstract:
            attrs.IsAbstract = obj.abstract
        if obj.dimensions:
            attrs.ArrayDimensions = obj.dimensions
        return self._submit_node(node, attrs, obj)

    def add_method(self, obj):
        """
        Add a UAMethod node and its references, return the added node id.
        """
        node = self._get_node(obj)
        attrs = ua.MethodAttributes()
        if obj.desc:
            attrs.Description = ua.LocalizedText(obj.desc)
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
        # NOTE(review): the four attributes below are variable-style
        # attributes - confirm that ua.MethodAttributes accepts them
        if obj.accesslevel:
            attrs.AccessLevel = obj.accesslevel
        if obj.useraccesslevel:
            attrs.UserAccessLevel = obj.useraccesslevel
        if obj.minsample:
            attrs.MinimumSamplingInterval = obj.minsample
        if obj.dimensions:
            attrs.ArrayDimensions = obj.dimensions
        return self._submit_node(node, attrs, obj)

    def add_reference_type(self, obj):
        """
        Add a UAReferenceType node and its references, return the added node id.
        """
        node = self._get_node(obj)
        attrs = ua.ReferenceTypeAttributes()
        if obj.desc:
            attrs.Description = ua.LocalizedText(obj.desc)
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
        if obj.inversename:
            attrs.InverseName = ua.LocalizedText(obj.inversename)
        if obj.abstract:
            attrs.IsAbstract = obj.abstract
        if obj.symmetric:
            attrs.Symmetric = obj.symmetric
        return self._submit_node(node, attrs, obj)

    def add_datatype(self, obj):
        """
        Add a UADataType node and its references, return the added node id.
        """
        node = self._get_node(obj)
        attrs = ua.DataTypeAttributes()
        if obj.desc:
            attrs.Description = ua.LocalizedText(obj.desc)
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
        if obj.abstract:
            attrs.IsAbstract = obj.abstract
        return self._submit_node(node, attrs, obj)

    def _add_refs(self, obj):
        """
        Add the references listed in ``obj.refs`` as forward references
        starting at the node of obj. Does nothing when there are no refs.
        """
        if not obj.refs:
            return
        refs = []
        for data in obj.refs:
            ref = ua.AddReferencesItem()
            ref.IsForward = True
            ref.ReferenceTypeId = self.to_nodeid(data.reftype)
            ref.SourceNodeId = self._migrate_ns(obj.nodeid)
            # the target class is always announced as DataType, whatever the
            # class of the target node is
            ref.TargetNodeClass = ua.NodeClass.DataType
            ref.TargetNodeId = self._migrate_ns(self.to_nodeid(data.target))
            refs.append(ref)
        self.server.iserver.isession.add_references(refs)

    def _sort_nodes_by_parentid(self, ndatas):
        """
        Sort the list of nodes according their parent node in order to respect
        the dependency between nodes.

        Nodes whose parents cannot be ordered (for example circular parent
        references) are appended at the end in their current order and a
        warning is logged, so the method always terminates.

        :param ndatas: list of NodeDataObjects
        :returns: list of sorted nodes
        """
        pending = list(ndatas)
        # ids of all nodes defined in the file
        all_node_ids = set(data.nodeid for data in ndatas)
        # ids of the nodes that are already sorted / inserted
        sorted_node_ids = set()
        sorted_ndatas = []
        # we can only respect ordering nodes for namespaces indexes that
        # are defined in the xml file itself. Thus we assume that all other
        # references namespaces are already known to the server and should
        # not create any dependency problems (like "NodeNotFound")
        while pending:
            deferred = []
            for ndata in pending:
                # a node does not have to wait when
                # (1) its namespace is not one of the file namespaces
                # (2) it has no parent
                # (3) its parent is not defined in this file
                independent = (
                    ndata.nodeid.NamespaceIndex not in self.namespaces
                    or ndata.parent is None
                    or ndata.parent not in all_node_ids
                )
                if independent or ndata.parent in sorted_node_ids:
                    sorted_ndatas.append(ndata)
                    sorted_node_ids.add(ndata.nodeid)
                else:
                    deferred.append(ndata)
            if len(deferred) == len(pending):
                # no node could be placed in this pass: looping again would
                # never end
                self.logger.warning(
                    "Could not order %d nodes by parent (circular parent references?), adding them unsorted",
                    len(deferred))
                sorted_ndatas.extend(deferred)
                break
            pending = deferred
        return sorted_ndatas
def _sort_nodes_by_parentid(self, ndatas)
Definition: xmlimporter.py:349
def make_objects(self, node_datas)
Definition: xmlimporter.py:81
def _set_attr(self, obj, attname, val)
Definition: xmlimporter.py:198
def _map_namespaces(self, namespaces_uris)
Definition: xmlimporter.py:24


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Tue Jan 19 2021 03:12:44