from opcua import ua
from opcua.common.node import Node


# Fields of the ua.*Attributes structures that are never read from the source
# node when copying (see _read_and_copy_attrs).
_SKIPPED_ATTRIBUTE_FIELDS = (
    "BodyLength",
    "TypeId",
    "SpecifiedAttributes",
    "Encoding",
    "IsAbstract",
    "EventNotifier",
)


def copy_node(parent, node, nodeid=None, recursive=True):
    """
    Copy a node or node tree as child of parent node

    :param parent: Node under which the copy is created; its ``server`` and
        ``nodeid`` are used for the add-nodes request.
    :param node: Node to copy.
    :param nodeid: NodeId requested for the copy. When None, a NodeId built
        only from the namespace index of ``node`` is requested.
    :param recursive: when true, the children of ``node`` are copied too.
    :return: list of Node objects, one per added node id: the copy of
        ``node`` first, followed by the copies of its descendants.
    """
    print("Copying", node, "into ", parent)
    rdesc = _rdesc_from_node(parent, node)

    if nodeid is None:
        nodeid = ua.NodeId(namespaceidx=node.nodeid.NamespaceIndex)
    added_nodeids = _copy_node(parent.server, parent.nodeid, rdesc, nodeid, recursive)
    return [Node(parent.server, nid) for nid in added_nodeids]


def _copy_node(server, parent_nodeid, rdesc, nodeid, recursive):
    """
    Add a copy of the node described by ``rdesc`` below ``parent_nodeid``.

    :param server: object providing ``add_nodes``; also used to build Node
        wrappers for reading the source node.
    :param parent_nodeid: NodeId of the parent of the new node.
    :param rdesc: reference description of the node to copy.
    :param nodeid: NodeId requested for the new node.
    :param recursive: when true, every child description of the source node
        is copied below the newly added node as well.
    :return: list of added node ids, the new node first and then the ids
        returned by the recursive copies of its children.
    """
    addnode = ua.AddNodesItem()
    addnode.RequestedNewNodeId = nodeid
    addnode.BrowseName = rdesc.BrowseName
    addnode.ParentNodeId = parent_nodeid
    addnode.ReferenceTypeId = rdesc.ReferenceTypeId
    addnode.TypeDefinition = rdesc.TypeDefinition
    addnode.NodeClass = rdesc.NodeClass

    node_to_copy = Node(server, rdesc.NodeId)

    # The attribute structure is looked up by name from the node class,
    # i.e. ua.<NodeClass name>Attributes.
    attr_cls = getattr(ua, rdesc.NodeClass.name + "Attributes")
    _read_and_copy_attrs(node_to_copy, attr_cls(), addnode)

    res = server.add_nodes([addnode])[0]

    added_nodes = [res.AddedNodeId]

    if recursive:
        for desc in node_to_copy.get_children_descriptions():
            # Each child gets a requested id carrying only the namespace
            # index of the original child.
            child_nodeid = ua.NodeId(namespaceidx=desc.NodeId.NamespaceIndex)
            added_nodes.extend(
                _copy_node(server, res.AddedNodeId, desc, nodeid=child_nodeid, recursive=True)
            )

    return added_nodes


def _rdesc_from_node(parent, node):
    """
    Build a ua.ReferenceDescription describing ``node`` as a child of ``parent``.

    NodeClass, BrowseName and DisplayName are read from ``node``. The
    reference type is Organizes when the type definition of ``parent`` is
    FolderType and HasComponent otherwise. TypeDefinition is only set when
    ``node.get_type_definition()`` returns a truthy value.
    """
    results = node.get_attributes([
        ua.AttributeIds.NodeClass,
        ua.AttributeIds.BrowseName,
        ua.AttributeIds.DisplayName,
    ])
    nclass, qname, dname = [res.Value.Value for res in results]

    rdesc = ua.ReferenceDescription()
    rdesc.NodeId = node.nodeid
    rdesc.BrowseName = qname
    rdesc.DisplayName = dname
    rdesc.NodeClass = nclass
    if parent.get_type_definition() == ua.NodeId(ua.ObjectIds.FolderType):
        rdesc.ReferenceTypeId = ua.NodeId(ua.ObjectIds.Organizes)
    else:
        rdesc.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
    typedef = node.get_type_definition()
    if typedef:
        rdesc.TypeDefinition = typedef
    return rdesc


def _read_and_copy_attrs(node_type, struct, addnode):
    """
    Fill ``struct`` with attributes read from ``node_type`` and attach it to ``addnode``.

    The attributes to read are the public fields of ``struct`` except the
    ones listed in ``_SKIPPED_ATTRIBUTE_FIELDS``. Attributes whose read
    status is not good are reported with ``print`` and left untouched.

    :param node_type: node the attribute values are read from.
    :param struct: attribute structure instance to fill; modified in place.
    :param addnode: add-nodes item whose ``NodeAttributes`` is set to ``struct``.
    """
    names = [
        name for name in struct.__dict__
        if not name.startswith("_") and name not in _SKIPPED_ATTRIBUTE_FIELDS
    ]
    attrs = [getattr(ua.AttributeIds, name) for name in names]

    # One read is enough: the request does not depend on the individual name,
    # so repeating it per attribute only multiplied the server round-trips.
    # Nothing is read when there are no attributes to copy.
    results = node_type.get_attributes(attrs) if attrs else []

    for idx, name in enumerate(names):
        result = results[idx]
        if result.StatusCode.is_good():
            if name == "Value":
                # The Value field keeps the whole result value object ...
                setattr(struct, name, result.Value)
            else:
                # ... while every other field stores the unwrapped inner value.
                setattr(struct, name, result.Value.Value)
        else:
            print("Instantiate: while copying attributes from node type {0!s}, attribute {1!s}, statuscode is {2!s}".format(node_type, name, result.StatusCode))
    addnode.NodeAttributes = struct