00001 """
00002 Generate address space c++ code from xml file specification
00003 xmlparser.py is a requirement. it is in opcua folder but to avoid importing all code, developer can link xmlparser.py in current directory
00004 """
00005 import sys
00006 import logging
00007
00008
00009 import xmlparser
00010
00011
00012 def _to_val(objs, attr, val):
00013 from opcua import ua
00014 print("got ", objs, attr, val)
00015 cls = getattr(ua, objs[0])
00016 for o in objs[1:]:
00017 cls = getattr(ua, cls.ua_types[o])
00018 if cls == ua.NodeId:
00019 return "ua.NodeId.from_string('val')"
00020 return ua_type_to_python(val, cls.ua_types[attr])
00021
00022
00023 def ua_type_to_python(val, uatype):
00024 if uatype in ("String"):
00025 return "'{0}'".format(val)
00026 elif uatype in ("Bytes", "Bytes", "ByteString", "ByteArray"):
00027 return "b'{0}'".format(val)
00028 else:
00029 return val
00030
00031
00032 class CodeGenerator(object):
00033
00034 def __init__(self, input_path, output_path):
00035 self.input_path = input_path
00036 self.output_path = output_path
00037 self.output_file = None
00038 self.part = self.input_path.split(".")[-2]
00039 self.parser = None
00040
00041 def run(self):
00042 sys.stderr.write("Generating Python code {0} for XML file {1}".format(self.output_path, self.input_path) + "\n")
00043 self.output_file = open(self.output_path, "w")
00044 self.make_header()
00045 self.parser = xmlparser.XMLParser(self.input_path, None)
00046 for node in self.parser:
00047 if node.nodetype == 'UAObject':
00048 self.make_object_code(node)
00049 elif node.nodetype == 'UAObjectType':
00050 self.make_object_type_code(node)
00051 elif node.nodetype == 'UAVariable':
00052 self.make_variable_code(node)
00053 elif node.nodetype == 'UAVariableType':
00054 self.make_variable_type_code(node)
00055 elif node.nodetype == 'UAReferenceType':
00056 self.make_reference_code(node)
00057 elif node.nodetype == 'UADataType':
00058 self.make_datatype_code(node)
00059 elif node.nodetype == 'UAMethod':
00060 self.make_method_code(node)
00061 else:
00062 sys.stderr.write("Not implemented node type: " + node.nodetype + "\n")
00063 self.output_file.close()
00064
00065 def writecode(self, *args):
00066 self.output_file.write(" ".join(args) + "\n")
00067
00068 def make_header(self, ):
00069 self.writecode('''
00070 """
00071 DO NOT EDIT THIS FILE!
00072 It is automatically generated from opcfoundation.org schemas.
00073 """
00074
00075 from opcua import ua
00076
00077
00078 def create_standard_address_space_{0!s}(server):
00079 '''.format((self.part)))
00080
00081 def make_node_code(self, obj, indent):
00082 self.writecode(indent, 'node = ua.AddNodesItem()')
00083 self.writecode(indent, 'node.RequestedNewNodeId = ua.NodeId.from_string("{0}")'.format(obj.nodeid))
00084 self.writecode(indent, 'node.BrowseName = ua.QualifiedName.from_string("{0}")'.format(obj.browsename))
00085 self.writecode(indent, 'node.NodeClass = ua.NodeClass.{0}'.format(obj.nodetype[2:]))
00086 if obj.parent:
00087 self.writecode(indent, 'node.ParentNodeId = ua.NodeId.from_string("{0}")'.format(obj.parent))
00088 if obj.parent:
00089 self.writecode(indent, 'node.ReferenceTypeId = {0}'.format(self.to_ref_type(obj.parentlink)))
00090 if obj.typedef:
00091 self.writecode(indent, 'node.TypeDefinition = ua.NodeId.from_string("{0}")'.format(obj.typedef))
00092
00093 def to_data_type(self, nodeid):
00094 if not nodeid:
00095 return "ua.NodeId(ua.ObjectIds.String)"
00096 if "=" in nodeid:
00097 return 'ua.NodeId.from_string("{0}")'.format(nodeid)
00098 else:
00099 return 'ua.NodeId(ua.ObjectIds.{0})'.format(nodeid)
00100
00101 def to_ref_type(self, nodeid):
00102 if not "=" in nodeid:
00103 nodeid = self.parser.aliases[nodeid]
00104 return 'ua.NodeId.from_string("{0}")'.format(nodeid)
00105
00106 def make_object_code(self, obj):
00107 indent = " "
00108 self.writecode(indent)
00109 self.make_node_code(obj, indent)
00110 self.writecode(indent, 'attrs = ua.ObjectAttributes()')
00111 if obj.desc:
00112 self.writecode(indent, 'attrs.Description = ua.LocalizedText("{0}")'.format(obj.desc))
00113 self.writecode(indent, 'attrs.DisplayName = ua.LocalizedText("{0}")'.format(obj.displayname))
00114 self.writecode(indent, 'attrs.EventNotifier = {0}'.format(obj.eventnotifier))
00115 self.writecode(indent, 'node.NodeAttributes = attrs')
00116 self.writecode(indent, 'server.add_nodes([node])')
00117 self.make_refs_code(obj, indent)
00118
00119 def make_object_type_code(self, obj):
00120 indent = " "
00121 self.writecode(indent)
00122 self.make_node_code(obj, indent)
00123 self.writecode(indent, 'attrs = ua.ObjectTypeAttributes()')
00124 if obj.desc:
00125 self.writecode(indent, 'attrs.Description = ua.LocalizedText("{0}")'.format(obj.desc))
00126 self.writecode(indent, 'attrs.DisplayName = ua.LocalizedText("{0}")'.format(obj.displayname))
00127 self.writecode(indent, 'attrs.IsAbstract = {0}'.format(obj.abstract))
00128 self.writecode(indent, 'node.NodeAttributes = attrs')
00129 self.writecode(indent, 'server.add_nodes([node])')
00130 self.make_refs_code(obj, indent)
00131
00132 def make_common_variable_code(self, indent, obj):
00133 if obj.desc:
00134 self.writecode(indent, 'attrs.Description = ua.LocalizedText("{0}")'.format(obj.desc))
00135 self.writecode(indent, 'attrs.DisplayName = ua.LocalizedText("{0}")'.format(obj.displayname))
00136 self.writecode(indent, 'attrs.DataType = {0}'.format(self.to_data_type(obj.datatype)))
00137 if obj.value is not None:
00138 if obj.valuetype == "ListOfExtensionObject":
00139 self.writecode(indent, 'value = []')
00140 for ext in obj.value:
00141 self.make_ext_obj_code(indent, ext)
00142 self.writecode(indent, 'value.append(extobj)')
00143 self.writecode(indent, 'attrs.Value = ua.Variant(value, ua.VariantType.ExtensionObject)')
00144 elif obj.valuetype == "ExtensionObject":
00145 self.make_ext_obj_code(indent, obj.value)
00146 self.writecode(indent, 'value = extobj')
00147 self.writecode(indent, 'attrs.Value = ua.Variant(value, ua.VariantType.ExtensionObject)')
00148 else:
00149 if obj.valuetype.startswith("ListOf"):
00150 obj.valuetype = obj.valuetype[6:]
00151 self.writecode(indent, 'attrs.Value = ua.Variant({0}, ua.VariantType.{1})'.format(self.to_value(obj.value), obj.valuetype))
00152 if obj.rank:
00153 self.writecode(indent, 'attrs.ValueRank = {0}'.format(obj.rank))
00154 if obj.accesslevel:
00155 self.writecode(indent, 'attrs.AccessLevel = {0}'.format(obj.accesslevel))
00156 if obj.useraccesslevel:
00157 self.writecode(indent, 'attrs.UserAccessLevel = {0}'.format(obj.useraccesslevel))
00158 if obj.dimensions:
00159 self.writecode(indent, 'attrs.ArrayDimensions = {0}'.format(obj.dimensions))
00160
00161 def make_ext_obj_code(self, indent, extobj):
00162 print("makeing code for ", extobj.objname)
00163 self.writecode(indent, 'extobj = ua.{0}()'.format(extobj.objname))
00164 for name, val in extobj.body.items():
00165 for k, v in val.items():
00166 if type(v) is str:
00167 val = _to_val([extobj.objname], k, v)
00168 self.writecode(indent, 'extobj.{0} = {1}'.format(k, val))
00169 else:
00170 if k == "DataType":
00171 self.writecode(indent, 'extobj.{0} = ua.NodeId.from_string("{1}")'.format(k, v["Identifier"]))
00172 continue
00173 for k2, v2 in v.items():
00174 val2 = _to_val([extobj.objname, k], k2, v2)
00175 self.writecode(indent, 'extobj.{0}.{1} = {2}'.format(k, k2, val2))
00176
00177 def make_variable_code(self, obj):
00178 indent = " "
00179 self.writecode(indent)
00180 self.make_node_code(obj, indent)
00181 self.writecode(indent, 'attrs = ua.VariableAttributes()')
00182 if obj.minsample:
00183 self.writecode(indent, 'attrs.MinimumSamplingInterval = {0}'.format(obj.minsample))
00184 self.make_common_variable_code(indent, obj)
00185 self.writecode(indent, 'node.NodeAttributes = attrs')
00186 self.writecode(indent, 'server.add_nodes([node])')
00187 self.make_refs_code(obj, indent)
00188
00189 def make_variable_type_code(self, obj):
00190 indent = " "
00191 self.writecode(indent)
00192 self.make_node_code(obj, indent)
00193 self.writecode(indent, 'attrs = ua.VariableTypeAttributes()')
00194 if obj.desc:
00195 self.writecode(indent, 'attrs.Description = ua.LocalizedText("{0}")'.format(obj.desc))
00196 self.writecode(indent, 'attrs.DisplayName = ua.LocalizedText("{0}")'.format(obj.displayname))
00197 if obj.abstract:
00198 self.writecode(indent, 'attrs.IsAbstract = {0}'.format(obj.abstract))
00199 self.make_common_variable_code(indent, obj)
00200 self.writecode(indent, 'node.NodeAttributes = attrs')
00201 self.writecode(indent, 'server.add_nodes([node])')
00202 self.make_refs_code(obj, indent)
00203
00204 def to_value(self, val):
00205
00206 if isinstance(val, str):
00207 return '"' + val + '"'
00208 else:
00209 return val
00210
00211 def make_method_code(self, obj):
00212 indent = " "
00213 self.writecode(indent)
00214 self.make_node_code(obj, indent)
00215 self.writecode(indent, 'attrs = ua.MethodAttributes()')
00216 if obj.desc:
00217 self.writecode(indent, 'attrs.Description = ua.LocalizedText("{0}")'.format(obj.desc))
00218 self.writecode(indent, 'attrs.DisplayName = ua.LocalizedText("{0}")'.format(obj.displayname))
00219 self.writecode(indent, 'node.NodeAttributes = attrs')
00220 self.writecode(indent, 'server.add_nodes([node])')
00221 self.make_refs_code(obj, indent)
00222
00223 def make_reference_code(self, obj):
00224 indent = " "
00225 self.writecode(indent)
00226 self.make_node_code(obj, indent)
00227 self.writecode(indent, 'attrs = ua.ReferenceTypeAttributes()')
00228 if obj.desc:
00229 self.writecode(indent, 'attrs.Description = ua.LocalizedText("{0}")'.format(obj.desc))
00230 self.writecode(indent, 'attrs.DisplayName = ua.LocalizedText("{0}")'.format(obj.displayname))
00231 if obj. inversename:
00232 self.writecode(indent, 'attrs.InverseName = ua.LocalizedText("{0}")'.format(obj.inversename))
00233 if obj.abstract:
00234 self.writecode(indent, 'attrs.IsAbstract = {0}'.format(obj.abstract))
00235 if obj.symmetric:
00236 self.writecode(indent, 'attrs.Symmetric = {0}'.format(obj.symmetric))
00237 self.writecode(indent, 'node.NodeAttributes = attrs')
00238 self.writecode(indent, 'server.add_nodes([node])')
00239 self.make_refs_code(obj, indent)
00240
00241 def make_datatype_code(self, obj):
00242 indent = " "
00243 self.writecode(indent)
00244 self.make_node_code(obj, indent)
00245 self.writecode(indent, 'attrs = ua.DataTypeAttributes()')
00246 if obj.desc:
00247 self.writecode(indent, u'attrs.Description = ua.LocalizedText("{0}")'.format(obj.desc.encode('ascii', 'replace')))
00248 self.writecode(indent, 'attrs.DisplayName = ua.LocalizedText("{0}")'.format(obj.displayname))
00249 if obj.abstract:
00250 self.writecode(indent, 'attrs.IsAbstract = {0}'.format(obj.abstract))
00251 self.writecode(indent, 'node.NodeAttributes = attrs')
00252 self.writecode(indent, 'server.add_nodes([node])')
00253 self.make_refs_code(obj, indent)
00254
00255 def make_refs_code(self, obj, indent):
00256 if not obj.refs:
00257 return
00258 self.writecode(indent, "refs = []")
00259 for ref in obj.refs:
00260 self.writecode(indent, 'ref = ua.AddReferencesItem()')
00261 self.writecode(indent, 'ref.IsForward = True')
00262 self.writecode(indent, 'ref.ReferenceTypeId = {0}'.format(self.to_ref_type(ref.reftype)))
00263 self.writecode(indent, 'ref.SourceNodeId = ua.NodeId.from_string("{0}")'.format(obj.nodeid))
00264 self.writecode(indent, 'ref.TargetNodeClass = ua.NodeClass.DataType')
00265 self.writecode(indent, 'ref.TargetNodeId = ua.NodeId.from_string("{0}")'.format(ref.target))
00266 self.writecode(indent, "refs.append(ref)")
00267 self.writecode(indent, 'server.add_references(refs)')
00268
00269
00270 def save_aspace_to_disk():
00271 import os.path
00272 path = os.path.join("..", "opcua", "binary_address_space.pickle")
00273 print("Savind standard address space to:", path)
00274 sys.path.append("..")
00275 from opcua.server.standard_address_space import standard_address_space
00276 from opcua.server.address_space import NodeManagementService, AddressSpace
00277 aspace = AddressSpace()
00278 standard_address_space.fill_address_space(NodeManagementService(aspace))
00279 aspace.dump(path)
00280
00281 if __name__ == "__main__":
00282 logging.basicConfig(level=logging.WARN)
00283 for i in (3, 4, 5, 8, 9, 10, 11, 13):
00284 xmlpath = "Opc.Ua.NodeSet2.Part{0}.xml".format(str(i))
00285 cpppath = "../opcua/server/standard_address_space/standard_address_space_part{0}.py".format(str(i))
00286 c = CodeGenerator(xmlpath, cpppath)
00287 c.run()
00288
00289 save_aspace_to_disk()