manage_nodes.py
Go to the documentation of this file.
1 """
2 High level functions to create nodes
3 """
4 from opcua import ua
5 from opcua.common import node
6 from opcua.common.instantiate import instantiate
7 
8 
9 def _parse_nodeid_qname(*args):
10  try:
11  if isinstance(args[0], int):
12  nodeid = ua.NodeId(0, int(args[0]))
13  qname = ua.QualifiedName(args[1], int(args[0]))
14  return nodeid, qname
15  if isinstance(args[0], ua.NodeId):
16  nodeid = args[0]
17  elif isinstance(args[0], str):
18  nodeid = ua.NodeId.from_string(args[0])
19  else:
20  raise RuntimeError()
21  if isinstance(args[1], ua.QualifiedName):
22  qname = args[1]
23  elif isinstance(args[1], str):
24  qname = ua.QualifiedName.from_string(args[1])
25  else:
26  raise RuntimeError()
27  return nodeid, qname
28  except ua.UaError:
29  raise
30  except Exception as ex:
31  raise TypeError("This method takes either a namespace index and a string as argument or a nodeid and a qualifiedname. Received arguments {0} and got exception {1}".format(args, ex))
32 
33 
34 def create_folder(parent, nodeid, bname):
35  """
36  create a child node folder
37  arguments are nodeid, browsename
38  or namespace index, name
39  """
40  nodeid, qname = _parse_nodeid_qname(nodeid, bname)
41  return node.Node(parent.server, _create_object(parent.server, parent.nodeid, nodeid, qname, ua.ObjectIds.FolderType))
42 
43 
44 def create_object(parent, nodeid, bname, objecttype=None):
45  """
46  create a child node object
47  arguments are nodeid, browsename, [objecttype]
48  or namespace index, name, [objecttype]
49  if objectype is given (a NodeId) then the type node is instantiated inclusive its child nodes
50  """
51  nodeid, qname = _parse_nodeid_qname(nodeid, bname)
52  if objecttype is not None:
53  objecttype = node.Node(parent.server, objecttype)
54  nodes = instantiate(parent, objecttype, nodeid, bname)[0]
55  return nodes
56  else:
57  return node.Node(parent.server, _create_object(parent.server, parent.nodeid, nodeid, qname, ua.ObjectIds.BaseObjectType))
58 
59 
60 def create_property(parent, nodeid, bname, val, varianttype=None, datatype=None):
61  """
62  create a child node property
63  args are nodeid, browsename, value, [variant type]
64  or idx, name, value, [variant type]
65  """
66  nodeid, qname = _parse_nodeid_qname(nodeid, bname)
67  var = ua.Variant(val, varianttype)
68  if datatype and isinstance(datatype, int):
69  datatype = ua.NodeId(datatype, 0)
70  if datatype and not isinstance(datatype, ua.NodeId):
71  raise RuntimeError("datatype argument must be a nodeid or an int refering to a nodeid")
72  return node.Node(parent.server, _create_variable(parent.server, parent.nodeid, nodeid, qname, var, datatype=datatype, isproperty=True))
73 
74 
75 def create_variable(parent, nodeid, bname, val, varianttype=None, datatype=None):
76  """
77  create a child node variable
78  args are nodeid, browsename, value, [variant type], [data type]
79  or idx, name, value, [variant type], [data type]
80  """
81  nodeid, qname = _parse_nodeid_qname(nodeid, bname)
82  var = ua.Variant(val, varianttype)
83  if datatype and isinstance(datatype, int):
84  datatype = ua.NodeId(datatype, 0)
85  if datatype and not isinstance(datatype, ua.NodeId):
86  raise RuntimeError("datatype argument must be a nodeid or an int refering to a nodeid")
87 
88  return node.Node(parent.server, _create_variable(parent.server, parent.nodeid, nodeid, qname, var, datatype=datatype, isproperty=False))
89 
90 
91 def create_variable_type(parent, nodeid, bname, datatype):
92  """
93  Create a new variable type
94  args are nodeid, browsename and datatype
95  or idx, name and data type
96  """
97  nodeid, qname = _parse_nodeid_qname(nodeid, bname)
98  if datatype and not isinstance(datatype, ua.NodeId):
99  raise RuntimeError("Data type should be nodeid, got {0}".format(datatype))
100  addnode = ua.AddNodesItem()
101  addnode.RequestedNewNodeId = nodeid
102  addnode.BrowseName = qname
103  addnode.NodeClass = ua.NodeClass.Variable
104  addnode.ParentNodeId = parent.nodeid
105  addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
106  attrs = ua.VariableTypeAttributes()
107  attrs.Description = ua.LocalizedText(qname.Name)
108  attrs.DisplayName = ua.LocalizedText(qname.Name)
109  attrs.DataType = datatype
110  attrs.IsAbstract = False
111  attrs.WriteMask = 0
112  attrs.UserWriteMask = 0
113  addnode.NodeAttributes = attrs
114  results = parent.server.add_nodes([addnode])
115  results[0].StatusCode.check()
116  return results[0].AddedNodeId
117 
118 
119 def create_reference_type(parent, nodeid, bname):
120  """
121  Create a new reference type
122  args are nodeid and browsename
123  or idx and name
124  """
125  nodeid, qname = _parse_nodeid_qname(nodeid, bname)
126  addnode = ua.AddNodesItem()
127  addnode.RequestedNewNodeId = nodeid
128  addnode.BrowseName = qname
129  addnode.NodeClass = ua.NodeClass.Variable
130  addnode.ParentNodeId = parent.nodeid
131  addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
133  attrs.IsAbstract = False
134  attrs.Description = ua.LocalizedText(qname.Name)
135  attrs.DisplayName = ua.LocalizedText(qname.Name)
136  attrs.AccessLevel = ua.AccessLevel.CurrentRead.mask
137  attrs.UserAccessLevel = ua.AccessLevel.CurrentRead.mask
138  addnode.NodeAttributes = attrs
139  results = parent.server.add_nodes([addnode])
140  results[0].StatusCode.check()
141  return results[0].AddedNodeId
142 
143 
144 def create_object_type(parent, nodeid, bname):
145  """
146  Create a new object type to be instanciated in address space.
147  arguments are nodeid, browsename
148  or namespace index, name
149  """
150  nodeid, qname = _parse_nodeid_qname(nodeid, bname)
151  return node.Node(parent.server, _create_object_type(parent.server, parent.nodeid, nodeid, qname))
152 
153 
154 def create_method(parent, *args):
155  """
156  create a child method object
157  This is only possible on server side!!
158  args are nodeid, browsename, method_to_be_called, [input argument types], [output argument types]
159  or idx, name, method_to_be_called, [input argument types], [output argument types]
160  if argument types is specified, child nodes advertising what arguments the method uses and returns will be created
161  a callback is a method accepting the nodeid of the parent as first argument and variants after. returns a list of variants
162  """
163  nodeid, qname = _parse_nodeid_qname(*args[:2])
164  callback = args[2]
165  if len(args) > 3:
166  inputs = args[3]
167  else:
168  inputs = []
169  if len(args) > 4:
170  outputs = args[4]
171  else:
172  outputs = []
173  return node.Node(parent.server, _create_method(parent, nodeid, qname, callback, inputs, outputs))
174 
175 
176 def _create_object(server, parentnodeid, nodeid, qname, objecttype):
177  addnode = ua.AddNodesItem()
178  addnode.RequestedNewNodeId = nodeid
179  addnode.BrowseName = qname
180  addnode.ParentNodeId = parentnodeid
181  if node.Node(server, parentnodeid).get_type_definition() == ua.NodeId(ua.ObjectIds.FolderType):
182  addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.Organizes)
183  else:
184  addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
185  addnode.NodeClass = ua.NodeClass.Object
186  if isinstance(objecttype, int):
187  addnode.TypeDefinition = ua.NodeId(objecttype)
188  else:
189  addnode.TypeDefinition = objecttype
190  attrs = ua.ObjectAttributes()
191  attrs.EventNotifier = 0
192 
193  attrs.Description = ua.LocalizedText(qname.Name)
194  attrs.DisplayName = ua.LocalizedText(qname.Name)
195  attrs.WriteMask = 0
196  attrs.UserWriteMask = 0
197  addnode.NodeAttributes = attrs
198  results = server.add_nodes([addnode])
199  results[0].StatusCode.check()
200  return results[0].AddedNodeId
201 
202 
203 def _create_object_type(server, parentnodeid, nodeid, qname):
204  addnode = ua.AddNodesItem()
205  addnode.RequestedNewNodeId = nodeid
206  addnode.BrowseName = qname
207  addnode.ParentNodeId = parentnodeid
208  addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
209  addnode.NodeClass = ua.NodeClass.ObjectType
210  attrs = ua.ObjectTypeAttributes()
211  attrs.IsAbstract = False
212  attrs.Description = ua.LocalizedText(qname.Name)
213  attrs.DisplayName = ua.LocalizedText(qname.Name)
214  attrs.WriteMask = 0
215  attrs.UserWriteMask = 0
216  addnode.NodeAttributes = attrs
217  results = server.add_nodes([addnode])
218  results[0].StatusCode.check()
219  return results[0].AddedNodeId
220 
221 
222 def _create_variable(server, parentnodeid, nodeid, qname, var, datatype=None, isproperty=False):
223  addnode = ua.AddNodesItem()
224  addnode.RequestedNewNodeId = nodeid
225  addnode.BrowseName = qname
226  addnode.NodeClass = ua.NodeClass.Variable
227  addnode.ParentNodeId = parentnodeid
228  if isproperty:
229  addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasProperty)
230  addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.PropertyType)
231  else:
232  addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
233  addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType)
234  attrs = ua.VariableAttributes()
235  attrs.Description = ua.LocalizedText(qname.Name)
236  attrs.DisplayName = ua.LocalizedText(qname.Name)
237  if datatype:
238  attrs.DataType = datatype
239  else:
240  attrs.DataType = _guess_datatype(var)
241 
242  attrs.Value = var
243  if not isinstance(var.Value, (list, tuple)):
244  attrs.ValueRank = ua.ValueRank.Scalar
245  else:
246  if var.Dimensions:
247  attrs.ValueRank = len(var.Dimensions)
248  attrs.ArrayDimensions = var.Dimensions
249  attrs.WriteMask = 0
250  attrs.UserWriteMask = 0
251  attrs.Historizing = 0
252  attrs.AccessLevel = ua.AccessLevel.CurrentRead.mask
253  attrs.UserAccessLevel = ua.AccessLevel.CurrentRead.mask
254  addnode.NodeAttributes = attrs
255  results = server.add_nodes([addnode])
256  results[0].StatusCode.check()
257  return results[0].AddedNodeId
258 
259 
260 def _create_variable_type(server, parentnodeid, nodeid, qname, datatype, value=None):
261  addnode = ua.AddNodesItem()
262  addnode.RequestedNewNodeId = nodeid
263  addnode.BrowseName = qname
264  addnode.NodeClass = ua.NodeClass.VariableType
265  addnode.ParentNodeId = parentnodeid
266  addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
267  #addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType)
268  attrs = ua.VariableTypeAttributes()
269  attrs.Description = ua.LocalizedText(qname.Name)
270  attrs.DisplayName = ua.LocalizedText(qname.Name)
271  attrs.DataType = datatype
272  if value:
273  attrs.Value = value
274  if isinstance(value, (list, tuple)):
275  attrs.ValueRank = ua.ValueRank.OneDimension
276  else:
277  attrs.ValueRank = ua.ValueRank.Scalar
278  #attrs.ArrayDimensions = None
279  attrs.WriteMask = 0
280  attrs.UserWriteMask = 0
281  attrs.Historizing = 0
282  attrs.AccessLevel = ua.AccessLevel.CurrentRead.mask
283  attrs.UserAccessLevel = ua.AccessLevel.CurrentRead.mask
284  addnode.NodeAttributes = attrs
285  results = server.add_nodes([addnode])
286  results[0].StatusCode.check()
287  return results[0].AddedNodeId
288 
289 
290 def create_data_type(parent, nodeid, bname, description=None):
291  """
292  Create a new data type to be used in new variables, etc ..
293  arguments are nodeid, browsename
294  or namespace index, name
295  """
296  nodeid, qname = _parse_nodeid_qname(nodeid, bname)
297 
298  addnode = ua.AddNodesItem()
299  addnode.RequestedNewNodeId = nodeid
300  addnode.BrowseName = qname
301  addnode.NodeClass = ua.NodeClass.DataType
302  addnode.ParentNodeId = parent.nodeid
303  addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
304  #addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType) # No type definition for types
305  attrs = ua.DataTypeAttributes()
306  if description is None:
307  attrs.Description = ua.LocalizedText(qname.Name)
308  else:
309  attrs.Description = ua.LocalizedText(description)
310  attrs.DisplayName = ua.LocalizedText(qname.Name)
311  attrs.WriteMask = 0
312  attrs.UserWriteMask = 0
313  attrs.IsAbstract = False # True mean they cannot be instanciated
314  addnode.NodeAttributes = attrs
315  results = parent.server.add_nodes([addnode])
316  results[0].StatusCode.check()
317  return node.Node(parent.server, results[0].AddedNodeId)
318 
319 
320 def _create_method(parent, nodeid, qname, callback, inputs, outputs):
321  addnode = ua.AddNodesItem()
322  addnode.RequestedNewNodeId = nodeid
323  addnode.BrowseName = qname
324  addnode.NodeClass = ua.NodeClass.Method
325  addnode.ParentNodeId = parent.nodeid
326  addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
327  #node.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseObjectType)
328  attrs = ua.MethodAttributes()
329  attrs.Description = ua.LocalizedText(qname.Name)
330  attrs.DisplayName = ua.LocalizedText(qname.Name)
331  attrs.WriteMask = 0
332  attrs.UserWriteMask = 0
333  attrs.Executable = True
334  attrs.UserExecutable = True
335  addnode.NodeAttributes = attrs
336  results = parent.server.add_nodes([addnode])
337  results[0].StatusCode.check()
338  method = node.Node(parent.server, results[0].AddedNodeId)
339  if inputs:
340  create_property(method,
341  ua.NodeId(namespaceidx=method.nodeid.NamespaceIndex),
342  ua.QualifiedName("InputArguments", 0),
343  [_vtype_to_argument(vtype) for vtype in inputs],
344  varianttype=ua.VariantType.ExtensionObject,
345  datatype=ua.ObjectIds.Argument)
346  if outputs:
347  create_property(method,
348  ua.NodeId(namespaceidx=method.nodeid.NamespaceIndex),
349  ua.QualifiedName("OutputArguments", 0),
350  [_vtype_to_argument(vtype) for vtype in outputs],
351  varianttype=ua.VariantType.ExtensionObject,
352  datatype=ua.ObjectIds.Argument)
353  parent.server.add_method_callback(method.nodeid, callback)
354  return results[0].AddedNodeId
355 
356 
358  if isinstance(vtype, ua.Argument):
359  return vtype
360  arg = ua.Argument()
361  if isinstance(vtype, ua.VariantType):
362  arg.DataType = ua.NodeId(vtype.value)
363  else:
364  arg.DataType = ua.NodeId(vtype)
365  return arg
366 
367 
368 def _guess_datatype(variant):
369  if variant.VariantType == ua.VariantType.ExtensionObject:
370  if variant.Value is None:
371  raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
372  if type(variant.Value) in (list, tuple):
373  if len(variant.Value) == 0:
374  raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
375  extobj = variant.Value[0]
376  else:
377  extobj = variant.Value
378  classname = extobj.__class__.__name__
379  return ua.NodeId(getattr(ua.ObjectIds, classname))
380  else:
381  return ua.NodeId(getattr(ua.ObjectIds, variant.VariantType.name))
382 
383 
384 def delete_nodes(server, nodes, recursive=False):
385  """
386  Delete specified nodes. Optionally delete recursively all nodes with a
387  downward hierachic references to the node
388  """
389  nodestodelete = []
390  if recursive:
391  nodes += _add_childs(nodes)
392  for mynode in nodes:
393  it = ua.DeleteNodesItem()
394  it.NodeId = mynode.nodeid
395  it.DeleteTargetReferences = True
396  nodestodelete.append(it)
397  params = ua.DeleteNodesParameters()
398  params.NodesToDelete = nodestodelete
399  return server.delete_nodes(params)
400 
401 
402 def _add_childs(nodes):
403  results = []
404  for mynode in nodes[:]:
405  results += mynode.get_children()
406  return results
407 
408 
def _create_variable(server, parentnodeid, nodeid, qname, var, datatype=None, isproperty=False)
def instantiate(parent, node_type, nodeid=None, bname=None, idx=0)
Definition: instantiate.py:12
def create_object(parent, nodeid, bname, objecttype=None)
Definition: manage_nodes.py:44
def _create_variable_type(server, parentnodeid, nodeid, qname, datatype, value=None)
def create_folder(parent, nodeid, bname)
Definition: manage_nodes.py:34
def create_data_type(parent, nodeid, bname, description=None)
def _guess_datatype(variant)
def _create_method(parent, nodeid, qname, callback, inputs, outputs)
def create_property(parent, nodeid, bname, val, varianttype=None, datatype=None)
Definition: manage_nodes.py:60
def delete_nodes(server, nodes, recursive=False)
def create_object_type(parent, nodeid, bname)
def _create_object_type(server, parentnodeid, nodeid, qname)
def create_reference_type(parent, nodeid, bname)
def create_variable_type(parent, nodeid, bname, datatype)
Definition: manage_nodes.py:91
def create_variable(parent, nodeid, bname, val, varianttype=None, datatype=None)
Definition: manage_nodes.py:75
def create_method(parent, args)
def _create_object(server, parentnodeid, nodeid, qname, objecttype)


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Tue Jan 19 2021 03:12:44