manage_nodes.py
Go to the documentation of this file.
00001 """
00002 High level functions to create nodes
00003 """
00004 from opcua import ua
00005 from opcua.common import node
00006 from opcua.common.instantiate import instantiate
00007 
00008 
00009 def _parse_nodeid_qname(*args):
00010     try:
00011         if isinstance(args[0], int):
00012             nodeid = ua.NodeId(0, int(args[0]))
00013             qname = ua.QualifiedName(args[1], int(args[0]))
00014             return nodeid, qname
00015         if isinstance(args[0], ua.NodeId):
00016             nodeid = args[0]
00017         elif isinstance(args[0], str):
00018             nodeid = ua.NodeId.from_string(args[0])
00019         else:
00020             raise RuntimeError()
00021         if isinstance(args[1], ua.QualifiedName):
00022             qname = args[1]
00023         elif isinstance(args[1], str):
00024             qname = ua.QualifiedName.from_string(args[1])
00025         else:
00026             raise RuntimeError()
00027         return nodeid, qname
00028     except ua.UaError:
00029         raise
00030     except Exception as ex:
00031         raise TypeError("This method takes either a namespace index and a string as argument or a nodeid and a qualifiedname. Received arguments {0} and got exception {1}".format(args, ex))
00032 
00033 
00034 def create_folder(parent, nodeid, bname):
00035     """
00036     create a child node folder
00037     arguments are nodeid, browsename
00038     or namespace index, name
00039     """
00040     nodeid, qname = _parse_nodeid_qname(nodeid, bname)
00041     return node.Node(parent.server, _create_object(parent.server, parent.nodeid, nodeid, qname, ua.ObjectIds.FolderType))
00042 
00043 
00044 def create_object(parent, nodeid, bname, objecttype=None):
00045     """
00046     create a child node object
00047     arguments are nodeid, browsename, [objecttype]
00048     or namespace index, name, [objecttype]
00049     if objectype is given (a NodeId) then the type node is instantiated inclusive its child nodes
00050     """
00051     nodeid, qname = _parse_nodeid_qname(nodeid, bname)
00052     if objecttype is not None:
00053         objecttype = node.Node(parent.server, objecttype)
00054         nodes = instantiate(parent, objecttype, nodeid, bname)[0]
00055         return nodes
00056     else:
00057         return node.Node(parent.server, _create_object(parent.server, parent.nodeid, nodeid, qname, ua.ObjectIds.BaseObjectType))
00058 
00059 
00060 def create_property(parent, nodeid, bname, val, varianttype=None, datatype=None):
00061     """
00062     create a child node property
00063     args are nodeid, browsename, value, [variant type]
00064     or idx, name, value, [variant type]
00065     """
00066     nodeid, qname = _parse_nodeid_qname(nodeid, bname)
00067     var = ua.Variant(val, varianttype)
00068     if datatype and isinstance(datatype, int):
00069         datatype = ua.NodeId(datatype, 0)
00070     if datatype and not isinstance(datatype, ua.NodeId):
00071         raise RuntimeError("datatype argument must be a nodeid or an int refering to a nodeid")
00072     return node.Node(parent.server, _create_variable(parent.server, parent.nodeid, nodeid, qname, var, datatype=datatype, isproperty=True))
00073 
00074 
00075 def create_variable(parent, nodeid, bname, val, varianttype=None, datatype=None):
00076     """
00077     create a child node variable
00078     args are nodeid, browsename, value, [variant type], [data type]
00079     or idx, name, value, [variant type], [data type]
00080     """
00081     nodeid, qname = _parse_nodeid_qname(nodeid, bname)
00082     var = ua.Variant(val, varianttype)
00083     if datatype and isinstance(datatype, int):
00084         datatype = ua.NodeId(datatype, 0)
00085     if datatype and not isinstance(datatype, ua.NodeId):
00086         raise RuntimeError("datatype argument must be a nodeid or an int refering to a nodeid")
00087 
00088     return node.Node(parent.server, _create_variable(parent.server, parent.nodeid, nodeid, qname, var, datatype=datatype, isproperty=False))
00089 
00090 
00091 def create_variable_type(parent, nodeid, bname, datatype):
00092     """
00093     Create a new variable type
00094     args are nodeid, browsename and datatype
00095     or idx, name and data type
00096     """
00097     nodeid, qname = _parse_nodeid_qname(nodeid, bname)
00098     if datatype and not isinstance(datatype, ua.NodeId):
00099         raise RuntimeError("Data type should be nodeid, got {0}".format(datatype))
00100     addnode = ua.AddNodesItem()
00101     addnode.RequestedNewNodeId = nodeid
00102     addnode.BrowseName = qname
00103     addnode.NodeClass = ua.NodeClass.Variable
00104     addnode.ParentNodeId = parent.nodeid
00105     addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
00106     attrs = ua.VariableTypeAttributes()
00107     attrs.Description = ua.LocalizedText(qname.Name)
00108     attrs.DisplayName = ua.LocalizedText(qname.Name)
00109     attrs.DataType = datatype
00110     attrs.IsAbstract = False
00111     attrs.WriteMask = 0
00112     attrs.UserWriteMask = 0
00113     addnode.NodeAttributes = attrs
00114     results = parent.server.add_nodes([addnode])
00115     results[0].StatusCode.check()
00116     return results[0].AddedNodeId
00117 
00118 
00119 def create_reference_type(parent, nodeid, bname):
00120     """
00121     Create a new reference type
00122     args are nodeid and browsename
00123     or idx and name
00124     """
00125     nodeid, qname = _parse_nodeid_qname(nodeid, bname)
00126     addnode = ua.AddNodesItem()
00127     addnode.RequestedNewNodeId = nodeid
00128     addnode.BrowseName = qname
00129     addnode.NodeClass = ua.NodeClass.Variable
00130     addnode.ParentNodeId = parent.nodeid
00131     addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
00132     attrs = ua.ReferenceTypeAttributes()
00133     attrs.IsAbstract = False
00134     attrs.Description = ua.LocalizedText(qname.Name)
00135     attrs.DisplayName = ua.LocalizedText(qname.Name)
00136     attrs.AccessLevel = ua.AccessLevel.CurrentRead.mask
00137     attrs.UserAccessLevel = ua.AccessLevel.CurrentRead.mask
00138     addnode.NodeAttributes = attrs
00139     results = parent.server.add_nodes([addnode])
00140     results[0].StatusCode.check()
00141     return results[0].AddedNodeId
00142 
00143 
00144 def create_object_type(parent, nodeid, bname):
00145     """
00146     Create a new object type to be instanciated in address space.
00147     arguments are nodeid, browsename
00148     or namespace index, name
00149     """
00150     nodeid, qname = _parse_nodeid_qname(nodeid, bname)
00151     return node.Node(parent.server, _create_object_type(parent.server, parent.nodeid, nodeid, qname))
00152 
00153 
00154 def create_method(parent, *args):
00155     """
00156     create a child method object
00157     This is only possible on server side!!
00158     args are nodeid, browsename, method_to_be_called, [input argument types], [output argument types]
00159     or idx, name, method_to_be_called, [input argument types], [output argument types]
00160     if argument types is specified, child nodes advertising what arguments the method uses and returns will be created
00161     a callback is a method accepting the nodeid of the parent as first argument and variants after. returns a list of variants
00162     """
00163     nodeid, qname = _parse_nodeid_qname(*args[:2])
00164     callback = args[2]
00165     if len(args) > 3:
00166         inputs = args[3]
00167     else:
00168         inputs = []
00169     if len(args) > 4:
00170         outputs = args[4]
00171     else:
00172         outputs = []
00173     return node.Node(parent.server, _create_method(parent, nodeid, qname, callback, inputs, outputs))
00174 
00175 
00176 def _create_object(server, parentnodeid, nodeid, qname, objecttype):
00177     addnode = ua.AddNodesItem()
00178     addnode.RequestedNewNodeId = nodeid
00179     addnode.BrowseName = qname
00180     addnode.ParentNodeId = parentnodeid
00181     if node.Node(server, parentnodeid).get_type_definition() == ua.NodeId(ua.ObjectIds.FolderType):
00182         addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.Organizes)
00183     else:
00184         addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
00185     addnode.NodeClass = ua.NodeClass.Object
00186     if isinstance(objecttype, int):
00187         addnode.TypeDefinition = ua.NodeId(objecttype)
00188     else:
00189         addnode.TypeDefinition = objecttype
00190     attrs = ua.ObjectAttributes()
00191     attrs.EventNotifier = 0
00192 
00193     attrs.Description = ua.LocalizedText(qname.Name)
00194     attrs.DisplayName = ua.LocalizedText(qname.Name)
00195     attrs.WriteMask = 0
00196     attrs.UserWriteMask = 0
00197     addnode.NodeAttributes = attrs
00198     results = server.add_nodes([addnode])
00199     results[0].StatusCode.check()
00200     return results[0].AddedNodeId
00201 
00202 
00203 def _create_object_type(server, parentnodeid, nodeid, qname):
00204     addnode = ua.AddNodesItem()
00205     addnode.RequestedNewNodeId = nodeid
00206     addnode.BrowseName = qname
00207     addnode.ParentNodeId = parentnodeid
00208     addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
00209     addnode.NodeClass = ua.NodeClass.ObjectType
00210     attrs = ua.ObjectTypeAttributes()
00211     attrs.IsAbstract = False
00212     attrs.Description = ua.LocalizedText(qname.Name)
00213     attrs.DisplayName = ua.LocalizedText(qname.Name)
00214     attrs.WriteMask = 0
00215     attrs.UserWriteMask = 0
00216     addnode.NodeAttributes = attrs
00217     results = server.add_nodes([addnode])
00218     results[0].StatusCode.check()
00219     return results[0].AddedNodeId
00220 
00221 
00222 def _create_variable(server, parentnodeid, nodeid, qname, var, datatype=None, isproperty=False):
00223     addnode = ua.AddNodesItem()
00224     addnode.RequestedNewNodeId = nodeid
00225     addnode.BrowseName = qname
00226     addnode.NodeClass = ua.NodeClass.Variable
00227     addnode.ParentNodeId = parentnodeid
00228     if isproperty:
00229         addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasProperty)
00230         addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.PropertyType)
00231     else:
00232         addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
00233         addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType)
00234     attrs = ua.VariableAttributes()
00235     attrs.Description = ua.LocalizedText(qname.Name)
00236     attrs.DisplayName = ua.LocalizedText(qname.Name)
00237     if datatype:
00238         attrs.DataType = datatype
00239     else:
00240         attrs.DataType = _guess_datatype(var)
00241 
00242     attrs.Value = var
00243     if not isinstance(var.Value, (list, tuple)):
00244         attrs.ValueRank = ua.ValueRank.Scalar
00245     else:
00246         if var.Dimensions:
00247             attrs.ValueRank = len(var.Dimensions)
00248             attrs.ArrayDimensions = var.Dimensions
00249     attrs.WriteMask = 0
00250     attrs.UserWriteMask = 0
00251     attrs.Historizing = 0
00252     attrs.AccessLevel = ua.AccessLevel.CurrentRead.mask
00253     attrs.UserAccessLevel = ua.AccessLevel.CurrentRead.mask
00254     addnode.NodeAttributes = attrs
00255     results = server.add_nodes([addnode])
00256     results[0].StatusCode.check()
00257     return results[0].AddedNodeId
00258 
00259 
00260 def _create_variable_type(server, parentnodeid, nodeid, qname, datatype, value=None):
00261     addnode = ua.AddNodesItem()
00262     addnode.RequestedNewNodeId = nodeid
00263     addnode.BrowseName = qname
00264     addnode.NodeClass = ua.NodeClass.VariableType
00265     addnode.ParentNodeId = parentnodeid
00266     addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
00267     #addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType)
00268     attrs = ua.VariableTypeAttributes()
00269     attrs.Description = ua.LocalizedText(qname.Name)
00270     attrs.DisplayName = ua.LocalizedText(qname.Name)
00271     attrs.DataType = datatype
00272     if value:
00273         attrs.Value = value
00274         if isinstance(value, (list, tuple)):
00275             attrs.ValueRank = ua.ValueRank.OneDimension
00276         else:
00277             attrs.ValueRank = ua.ValueRank.Scalar
00278     #attrs.ArrayDimensions = None
00279     attrs.WriteMask = 0
00280     attrs.UserWriteMask = 0
00281     attrs.Historizing = 0
00282     attrs.AccessLevel = ua.AccessLevel.CurrentRead.mask
00283     attrs.UserAccessLevel = ua.AccessLevel.CurrentRead.mask
00284     addnode.NodeAttributes = attrs
00285     results = server.add_nodes([addnode])
00286     results[0].StatusCode.check()
00287     return results[0].AddedNodeId
00288 
00289 
00290 def create_data_type(parent, nodeid, bname, description=None):
00291     """
00292     Create a new data type to be used in new variables, etc ..
00293     arguments are nodeid, browsename
00294     or namespace index, name
00295     """
00296     nodeid, qname = _parse_nodeid_qname(nodeid, bname)
00297 
00298     addnode = ua.AddNodesItem()
00299     addnode.RequestedNewNodeId = nodeid
00300     addnode.BrowseName = qname
00301     addnode.NodeClass = ua.NodeClass.DataType
00302     addnode.ParentNodeId = parent.nodeid
00303     addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
00304     #addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType) # No type definition for types
00305     attrs = ua.DataTypeAttributes()
00306     if description is None:
00307         attrs.Description = ua.LocalizedText(qname.Name)
00308     else:
00309         attrs.Description = ua.LocalizedText(description)
00310     attrs.DisplayName = ua.LocalizedText(qname.Name)
00311     attrs.WriteMask = 0
00312     attrs.UserWriteMask = 0
00313     attrs.IsAbstract = False  # True mean they cannot be instanciated
00314     addnode.NodeAttributes = attrs
00315     results = parent.server.add_nodes([addnode])
00316     results[0].StatusCode.check()
00317     return node.Node(parent.server, results[0].AddedNodeId)
00318 
00319 
00320 def _create_method(parent, nodeid, qname, callback, inputs, outputs):
00321     addnode = ua.AddNodesItem()
00322     addnode.RequestedNewNodeId = nodeid
00323     addnode.BrowseName = qname
00324     addnode.NodeClass = ua.NodeClass.Method
00325     addnode.ParentNodeId = parent.nodeid
00326     addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
00327     #node.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseObjectType)
00328     attrs = ua.MethodAttributes()
00329     attrs.Description = ua.LocalizedText(qname.Name)
00330     attrs.DisplayName = ua.LocalizedText(qname.Name)
00331     attrs.WriteMask = 0
00332     attrs.UserWriteMask = 0
00333     attrs.Executable = True
00334     attrs.UserExecutable = True
00335     addnode.NodeAttributes = attrs
00336     results = parent.server.add_nodes([addnode])
00337     results[0].StatusCode.check()
00338     method = node.Node(parent.server, results[0].AddedNodeId)
00339     if inputs:
00340         create_property(method,
00341                         ua.NodeId(namespaceidx=method.nodeid.NamespaceIndex),
00342                         ua.QualifiedName("InputArguments", 0),
00343                         [_vtype_to_argument(vtype) for vtype in inputs],
00344                         varianttype=ua.VariantType.ExtensionObject,
00345                         datatype=ua.ObjectIds.Argument)
00346     if outputs:
00347         create_property(method,
00348                         ua.NodeId(namespaceidx=method.nodeid.NamespaceIndex),
00349                         ua.QualifiedName("OutputArguments", 0),
00350                         [_vtype_to_argument(vtype) for vtype in outputs],
00351                         varianttype=ua.VariantType.ExtensionObject,
00352                         datatype=ua.ObjectIds.Argument)
00353     parent.server.add_method_callback(method.nodeid, callback)
00354     return results[0].AddedNodeId
00355 
00356 
00357 def _vtype_to_argument(vtype):
00358     if isinstance(vtype, ua.Argument):
00359         return vtype
00360     arg = ua.Argument()
00361     if isinstance(vtype, ua.VariantType):
00362         arg.DataType = ua.NodeId(vtype.value)
00363     else:
00364         arg.DataType = ua.NodeId(vtype)
00365     return arg
00366 
00367 
00368 def _guess_datatype(variant):
00369     if variant.VariantType == ua.VariantType.ExtensionObject:
00370         if variant.Value is None:
00371             raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
00372         if type(variant.Value) in (list, tuple):
00373             if len(variant.Value) == 0:
00374                 raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
00375             extobj = variant.Value[0]
00376         else:
00377             extobj = variant.Value
00378         classname = extobj.__class__.__name__
00379         return ua.NodeId(getattr(ua.ObjectIds, classname))
00380     else:
00381         return ua.NodeId(getattr(ua.ObjectIds, variant.VariantType.name))
00382 
00383 
00384 def delete_nodes(server, nodes, recursive=False):
00385     """
00386     Delete specified nodes. Optionally delete recursively all nodes with a
00387     downward hierachic references to the node
00388     """
00389     nodestodelete = []
00390     if recursive:
00391         nodes += _add_childs(nodes)
00392     for mynode in nodes:
00393         it = ua.DeleteNodesItem()
00394         it.NodeId = mynode.nodeid
00395         it.DeleteTargetReferences = True
00396         nodestodelete.append(it)
00397     params = ua.DeleteNodesParameters()
00398     params.NodesToDelete = nodestodelete
00399     return server.delete_nodes(params)
00400 
00401 
00402 def _add_childs(nodes):
00403     results = []
00404     for mynode in nodes[:]:
00405         results += mynode.get_children()
00406     return results
00407 
00408 


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Sat Jun 8 2019 18:26:23