generate_address_space.py
Go to the documentation of this file.
00001 """
00002 Generate address space Python code from an XML file specification.
00003 xmlparser.py is a requirement. It lives in the opcua folder, but to avoid importing all of that code, the developer can link xmlparser.py into the current directory.
00004 """
00005 import sys
00006 import logging
00007 # sys.path.insert(0, "..")  # load local freeopcua implementation
00008 #from opcua import xmlparser
00009 import xmlparser
00010 
00011 
def _to_val(objs, attr, val):
    """Return Python source text for the value of one extension-object field.

    ``objs`` is a path of names: ``objs[0]`` is looked up as an attribute of
    the ``ua`` module, and each following entry is resolved by indexing the
    ``ua_types`` member of the class found so far and looking the result up
    on ``ua`` again.  ``attr`` names the field of the final class that
    ``val`` belongs to.

    If the resolved class is ``ua.NodeId`` the result is a
    ``ua.NodeId.from_string(...)`` expression wrapping ``val``; otherwise
    ``val`` is rendered by ``ua_type_to_python`` using the type that
    ``ua_types`` declares for ``attr``.
    """
    from opcua import ua
    print("got ", objs, attr, val)
    cls = getattr(ua, objs[0])
    for name in objs[1:]:
        cls = getattr(ua, cls.ua_types[name])
    if cls == ua.NodeId:
        # Embed the real value; the previous code emitted the literal text
        # 'val', so every generated NodeId was the same bogus string.
        # The !r conversion quotes and escapes the value for us.
        return "ua.NodeId.from_string({0!r})".format(val)
    return ua_type_to_python(val, cls.ua_types[attr])
00021 
00022 
def ua_type_to_python(val, uatype):
    """Render ``val`` as Python source text according to the type name ``uatype``.

    * ``"String"`` yields ``val`` wrapped in single quotes.
    * ``"Bytes"``, ``"ByteString"`` and ``"ByteArray"`` yield a ``b'...'`` literal.
    * Any other type name returns ``val`` unchanged.

    ``val`` is inserted verbatim; no escaping of quotes is performed.
    """
    # Compare against the exact name: ``uatype in ("String")`` was a substring
    # test on a plain string, so e.g. "Str" or "" were also treated as String.
    if uatype == "String":
        return "'{0}'".format(val)
    if uatype in ("Bytes", "ByteString", "ByteArray"):
        return "b'{0}'".format(val)
    return val
00030 
00031 
class CodeGenerator(object):
    """Write a Python module that recreates the nodes found in an XML file.

    ``run()`` iterates over ``xmlparser.XMLParser(input_path, None)`` and, for
    every node, writes statements into ``output_path`` that build a
    ``ua.AddNodesItem`` and hand it to ``server.add_nodes``.  All statements
    end up inside one generated function named
    ``create_standard_address_space_<part>(server)``.
    """

    # Passed as first argument to writecode(); together with the joining
    # space it yields the four-space indentation of the generated function body.
    _INDENT = "   "

    def __init__(self, input_path, output_path):
        """Remember the paths; nothing is opened or parsed before ``run()``.

        ``part`` is the second-to-last dot-separated component of
        ``input_path`` and is used in the name of the generated function.
        """
        self.input_path = input_path
        self.output_path = output_path
        self.output_file = None
        self.part = self.input_path.split(".")[-2]
        self.parser = None

    def run(self):
        """Generate the output file from the input file.

        Node types without a handler are reported on stderr and skipped.
        The output file is closed even if parsing or code generation raises.
        """
        sys.stderr.write("Generating Python code {0} for XML file {1}".format(self.output_path, self.input_path) + "\n")
        handlers = {
            'UAObject': self.make_object_code,
            'UAObjectType': self.make_object_type_code,
            'UAVariable': self.make_variable_code,
            'UAVariableType': self.make_variable_type_code,
            'UAReferenceType': self.make_reference_code,
            'UADataType': self.make_datatype_code,
            'UAMethod': self.make_method_code,
        }
        # The context manager guarantees the file is closed on every exit path.
        with open(self.output_path, "w") as output_file:
            self.output_file = output_file
            self.make_header()
            self.parser = xmlparser.XMLParser(self.input_path, None)
            for node in self.parser:
                handler = handlers.get(node.nodetype)
                if handler is None:
                    sys.stderr.write("Not implemented node type: " + node.nodetype + "\n")
                else:
                    handler(node)

    def writecode(self, *args):
        """Write the string arguments, joined by single spaces, as one line."""
        self.output_file.write(" ".join(args) + "\n")

    def make_header(self):
        """Write the module docstring, the ``ua`` import and the function header."""
        self.writecode('''
"""
DO NOT EDIT THIS FILE!
It is automatically generated from opcfoundation.org schemas.
"""

from opcua import ua


def create_standard_address_space_{0!s}(server):
  '''.format(self.part))

    def make_node_code(self, obj, indent):
        """Write the statements that create ``node`` and set its common fields.

        Parent id and reference type are written only when ``obj.parent`` is
        set; the type definition only when ``obj.typedef`` is set.
        """
        self.writecode(indent, 'node = ua.AddNodesItem()')
        self.writecode(indent, 'node.RequestedNewNodeId = ua.NodeId.from_string("{0}")'.format(obj.nodeid))
        self.writecode(indent, 'node.BrowseName = ua.QualifiedName.from_string("{0}")'.format(obj.browsename))
        # nodetype looks like "UAObject"; dropping the first two characters
        # leaves the ua.NodeClass member name.
        self.writecode(indent, 'node.NodeClass = ua.NodeClass.{0}'.format(obj.nodetype[2:]))
        if obj.parent:
            self.writecode(indent, 'node.ParentNodeId = ua.NodeId.from_string("{0}")'.format(obj.parent))
            self.writecode(indent, 'node.ReferenceTypeId = {0}'.format(self.to_ref_type(obj.parentlink)))
        if obj.typedef:
            self.writecode(indent, 'node.TypeDefinition = ua.NodeId.from_string("{0}")'.format(obj.typedef))

    def to_data_type(self, nodeid):
        """Return source text for a data-type NodeId.

        An empty ``nodeid`` falls back to ``ua.ObjectIds.String``; a value
        containing ``=`` is treated as a node-id string; anything else is
        taken as a member name of ``ua.ObjectIds``.
        """
        if not nodeid:
            return "ua.NodeId(ua.ObjectIds.String)"
        if "=" in nodeid:
            return 'ua.NodeId.from_string("{0}")'.format(nodeid)
        return 'ua.NodeId(ua.ObjectIds.{0})'.format(nodeid)

    def to_ref_type(self, nodeid):
        """Return source text for a reference-type NodeId.

        A value without ``=`` is first translated through ``self.parser.aliases``.
        """
        if "=" not in nodeid:
            nodeid = self.parser.aliases[nodeid]
        return 'ua.NodeId.from_string("{0}")'.format(nodeid)

    def _begin_node(self, obj, attributes_class):
        """Write the separator line, the node setup and the ``attrs`` creation; return the indent."""
        indent = self._INDENT
        self.writecode(indent)
        self.make_node_code(obj, indent)
        self.writecode(indent, 'attrs = ua.{0}()'.format(attributes_class))
        return indent

    def _end_node(self, obj, indent):
        """Write the statements that attach ``attrs``, add the node and add its references."""
        self.writecode(indent, 'node.NodeAttributes = attrs')
        self.writecode(indent, 'server.add_nodes([node])')
        self.make_refs_code(obj, indent)

    def _write_description_and_name(self, obj, indent):
        """Write ``attrs.Description`` (only if ``obj.desc`` is set) and ``attrs.DisplayName``."""
        if obj.desc:
            self.writecode(indent, 'attrs.Description = ua.LocalizedText("{0}")'.format(obj.desc))
        self.writecode(indent, 'attrs.DisplayName = ua.LocalizedText("{0}")'.format(obj.displayname))

    def make_object_code(self, obj):
        """Write the code that adds an object node."""
        indent = self._begin_node(obj, 'ObjectAttributes')
        self._write_description_and_name(obj, indent)
        self.writecode(indent, 'attrs.EventNotifier = {0}'.format(obj.eventnotifier))
        self._end_node(obj, indent)

    def make_object_type_code(self, obj):
        """Write the code that adds an object-type node."""
        indent = self._begin_node(obj, 'ObjectTypeAttributes')
        self._write_description_and_name(obj, indent)
        self.writecode(indent, 'attrs.IsAbstract = {0}'.format(obj.abstract))
        self._end_node(obj, indent)

    def make_common_variable_code(self, indent, obj):
        """Write the attributes shared by variable and variable-type nodes.

        Covers description, display name, data type, the value (if
        ``obj.value`` is not None) and the optional rank, access levels and
        array dimensions.  ``obj`` is not modified.
        """
        self._write_description_and_name(obj, indent)
        self.writecode(indent, 'attrs.DataType = {0}'.format(self.to_data_type(obj.datatype)))
        if obj.value is not None:
            if obj.valuetype == "ListOfExtensionObject":
                self.writecode(indent, 'value = []')
                for ext in obj.value:
                    self.make_ext_obj_code(indent, ext)
                    self.writecode(indent, 'value.append(extobj)')
                self.writecode(indent, 'attrs.Value = ua.Variant(value, ua.VariantType.ExtensionObject)')
            elif obj.valuetype == "ExtensionObject":
                self.make_ext_obj_code(indent, obj.value)
                self.writecode(indent, 'value = extobj')
                self.writecode(indent, 'attrs.Value = ua.Variant(value, ua.VariantType.ExtensionObject)')
            else:
                # Strip the "ListOf" prefix into a local name so the
                # caller's object keeps its original valuetype.
                variant_type = obj.valuetype
                if variant_type.startswith("ListOf"):
                    variant_type = variant_type[6:]
                self.writecode(indent, 'attrs.Value = ua.Variant({0}, ua.VariantType.{1})'.format(self.to_value(obj.value), variant_type))
        if obj.rank:
            self.writecode(indent, 'attrs.ValueRank = {0}'.format(obj.rank))
        if obj.accesslevel:
            self.writecode(indent, 'attrs.AccessLevel = {0}'.format(obj.accesslevel))
        if obj.useraccesslevel:
            self.writecode(indent, 'attrs.UserAccessLevel = {0}'.format(obj.useraccesslevel))
        if obj.dimensions:
            self.writecode(indent, 'attrs.ArrayDimensions = {0}'.format(obj.dimensions))

    def make_ext_obj_code(self, indent, extobj):
        """Write the code that builds one extension object in the variable ``extobj``.

        ``extobj.body`` is read as a mapping whose values are mappings of
        field name to either a string or a nested mapping of sub-fields.
        """
        print("makeing code for ", extobj.objname)
        self.writecode(indent, 'extobj = ua.{0}()'.format(extobj.objname))
        for fields in extobj.body.values():
            for field, raw in fields.items():
                if isinstance(raw, str):
                    code = _to_val([extobj.objname], field, raw)
                    self.writecode(indent, 'extobj.{0} = {1}'.format(field, code))
                elif field == "DataType":  # hack for strange nodeid xml format
                    self.writecode(indent, 'extobj.{0} = ua.NodeId.from_string("{1}")'.format(field, raw["Identifier"]))
                else:
                    for subfield, subraw in raw.items():
                        code = _to_val([extobj.objname, field], subfield, subraw)
                        self.writecode(indent, 'extobj.{0}.{1} = {2}'.format(field, subfield, code))

    def make_variable_code(self, obj):
        """Write the code that adds a variable node."""
        indent = self._begin_node(obj, 'VariableAttributes')
        if obj.minsample:
            self.writecode(indent, 'attrs.MinimumSamplingInterval = {0}'.format(obj.minsample))
        self.make_common_variable_code(indent, obj)
        self._end_node(obj, indent)

    def make_variable_type_code(self, obj):
        """Write the code that adds a variable-type node."""
        indent = self._begin_node(obj, 'VariableTypeAttributes')
        # make_common_variable_code() writes description and display name
        # again; both occurrences are kept so the generated text is unchanged.
        self._write_description_and_name(obj, indent)
        if obj.abstract:
            self.writecode(indent, 'attrs.IsAbstract = {0}'.format(obj.abstract))
        self.make_common_variable_code(indent, obj)
        self._end_node(obj, indent)

    def to_value(self, val):
        """Return ``val`` wrapped in double quotes if it is a ``str``, else ``val`` itself."""
        if isinstance(val, str):
            return '"' + val + '"'
        return val

    def make_method_code(self, obj):
        """Write the code that adds a method node."""
        indent = self._begin_node(obj, 'MethodAttributes')
        self._write_description_and_name(obj, indent)
        self._end_node(obj, indent)

    def make_reference_code(self, obj):
        """Write the code that adds a reference-type node."""
        indent = self._begin_node(obj, 'ReferenceTypeAttributes')
        self._write_description_and_name(obj, indent)
        if obj.inversename:
            self.writecode(indent, 'attrs.InverseName = ua.LocalizedText("{0}")'.format(obj.inversename))
        if obj.abstract:
            self.writecode(indent, 'attrs.IsAbstract = {0}'.format(obj.abstract))
        if obj.symmetric:
            self.writecode(indent, 'attrs.Symmetric = {0}'.format(obj.symmetric))
        self._end_node(obj, indent)

    def make_datatype_code(self, obj):
        """Write the code that adds a data-type node.

        Non-ASCII characters in the description are replaced with ``?``.
        """
        indent = self._begin_node(obj, 'DataTypeAttributes')
        if obj.desc:
            # Decode back to text after the lossy ASCII encoding; formatting
            # the bytes object directly would emit "b'...'" on Python 3.
            ascii_desc = obj.desc.encode('ascii', 'replace').decode('ascii')
            self.writecode(indent, u'attrs.Description = ua.LocalizedText("{0}")'.format(ascii_desc))
        self.writecode(indent, 'attrs.DisplayName = ua.LocalizedText("{0}")'.format(obj.displayname))
        if obj.abstract:
            self.writecode(indent, 'attrs.IsAbstract = {0}'.format(obj.abstract))
        self._end_node(obj, indent)

    def make_refs_code(self, obj, indent):
        """Write the code that adds every reference in ``obj.refs``.

        Nothing is written when ``obj.refs`` is empty.  Each reference is
        written as a forward reference from ``obj.nodeid`` to ``ref.target``.
        """
        if not obj.refs:
            return
        self.writecode(indent, "refs = []")
        for ref in obj.refs:
            self.writecode(indent, 'ref = ua.AddReferencesItem()')
            self.writecode(indent, 'ref.IsForward = True')
            self.writecode(indent, 'ref.ReferenceTypeId = {0}'.format(self.to_ref_type(ref.reftype)))
            self.writecode(indent, 'ref.SourceNodeId = ua.NodeId.from_string("{0}")'.format(obj.nodeid))
            self.writecode(indent, 'ref.TargetNodeClass = ua.NodeClass.DataType')
            self.writecode(indent, 'ref.TargetNodeId = ua.NodeId.from_string("{0}")'.format(ref.target))
            self.writecode(indent, "refs.append(ref)")
        self.writecode(indent, 'server.add_references(refs)')
00268 
00269 
def save_aspace_to_disk():
    """Fill a new address space with the standard nodes and dump it to disk.

    The dump goes to ``../opcua/binary_address_space.pickle``, relative to the
    current working directory.  ``..`` is appended to ``sys.path`` before the
    ``opcua`` server modules are imported.
    """
    from os.path import join
    pickle_path = join("..", "opcua", "binary_address_space.pickle")
    print("Savind standard address space to:", pickle_path)
    sys.path.append("..")
    from opcua.server.standard_address_space import standard_address_space
    from opcua.server.address_space import NodeManagementService, AddressSpace
    space = AddressSpace()
    node_service = NodeManagementService(space)
    standard_address_space.fill_address_space(node_service)
    space.dump(pickle_path)
00280 
if __name__ == "__main__":
    logging.basicConfig(level=logging.WARN)
    # One module is generated per listed NodeSet2 part file.
    for part_number in (3, 4, 5, 8, 9, 10, 11, 13):
        source_xml = "Opc.Ua.NodeSet2.Part{0}.xml".format(part_number)
        target_py = "../opcua/server/standard_address_space/standard_address_space_part{0}.py".format(part_number)
        generator = CodeGenerator(source_xml, target_py)
        generator.run()

    # Runs once, after all part modules have been generated.
    save_aspace_to_disk()


ros_opcua_impl_python_opcua
Author(s): Denis Štogl, Daniel Draper
autogenerated on Sat Jun 8 2019 18:26:23