00001
00002 import generate_model as gm
00003
# Enum names that CodeGenerator.run() skips when emitting enum classes.
IgnoredEnums = [
    "NodeIdType",
]

# Struct names that CodeGenerator.run() skips, both when emitting struct
# classes and when filling the generated ExtensionClasses table.
IgnoredStructs = [
    "QualifiedName",
    "NodeId",
    "ExpandedNodeId",
    "FilterOperand",
    "Variant",
    "DataValue",
    "LocalizedText",
    "ExtensionObject",
    "XmlElement",
]
00006
class Primitives1(object):
    """Name table of fixed-width primitive type names.

    Every attribute is the placeholder value ``0``.  Within this file the
    class is only used as the base of ``Primitives``, whose attribute names
    are looked up with ``hasattr``.
    """

    Int8 = SByte = Int16 = Int32 = Int64 = 0
    UInt8 = Char = Byte = UInt16 = UInt32 = UInt64 = 0
    Boolean = 0
    Double = Float = 0
00022
00023
class Primitives(Primitives1):
    """Name table of all type names the generator treats as primitives.

    Extends ``Primitives1`` with further names.  ``CodeGenerator`` checks
    ``hasattr(Primitives, uatype)`` to decide whether to emit a
    ``uabin.Primitives.<uatype>`` pack/unpack call; the attribute values
    (all ``0``) are placeholders.
    """

    Null = String = Bytes = ByteString = CharArray = DateTime = 0
00031
00032
00033
class CodeGenerator(object):
    """Emit the Python protocol module for a parsed type model.

    The generator walks ``model.enums`` and ``model.structs`` and writes, line
    by line, one ``IntEnum`` subclass per enum and one ``FrozenClass`` subclass
    per struct (including ``to_binary`` / ``from_binary`` code) to the output
    file.  The indentation depth of the generated code is kept in
    ``self.iidx`` and applied by :meth:`write`.
    """

    def __init__(self, model, output):
        """Store the model and output path; nothing is written yet.

        :param model: parsed model exposing ``enums``, ``structs``,
            ``enum_list`` and ``get_enum``.
        :param output: path of the Python file to generate.
        """
        self.model = model
        self.output_path = output
        self.indent = " "
        self.iidx = 0  # current indentation depth, in units of self.indent

    def run(self):
        """Generate the whole output file.

        Writes the header, every non-ignored enum and struct, the
        ``ExtensionClasses`` lookup table, and finally appends the content of
        ``uaprotocol_auto_add.py`` (read from the current working directory).
        The output file is closed when generation ends, including on error.
        """
        print("Writting python protocol code to ", self.output_path)
        with open(self.output_path, "w") as output_file:
            # write() reaches the file through this attribute.
            self.output_file = output_file
            self.make_header()
            for enum in self.model.enums:
                if enum.name not in IgnoredEnums:
                    self.generate_enum_code(enum)
            structs = [s for s in self.model.structs if not self._is_skipped_struct(s)]
            for struct in structs:
                self.generate_struct_code(struct)

            self.iidx = 0
            self.write("")
            self.write("")
            self.write("ExtensionClasses = {")
            for struct in structs:
                if "ExtensionObject" in struct.parents:
                    self.write(" ObjectIds.{0}_Encoding_DefaultBinary: {0},".format(struct.name))
            self.write("}")
            self.write("")
            with open('uaprotocol_auto_add.py') as f:
                for line in f:
                    self.write(line.rstrip())

    @staticmethod
    def _is_skipped_struct(struct):
        """Return True for structs that get no generated class."""
        return struct.name in IgnoredStructs or struct.name.endswith(("Node", "NodeId"))

    def write(self, line):
        """Write one line to the output file at the current indentation.

        Empty lines are written without indentation so that no trailing
        whitespace is produced.
        """
        if line:
            line = self.indent * self.iidx + line
        self.output_file.write(line + "\n")

    def make_header(self):
        """Write the module docstring and import lines of the generated file."""
        self.write("'''")
        self.write("Autogenerate code from xml spec")
        self.write("'''")
        self.write("")
        self.write("from datetime import datetime")
        self.write("from enum import Enum, IntEnum")
        self.write("")
        self.write("from opcua.common.utils import Buffer")
        self.write("from opcua.ua.uaerrors import UaError")
        self.write("from opcua.ua.uatypes import *")
        self.write("from opcua.ua import ua_binary as uabin")
        self.write("from opcua.ua.object_ids import ObjectIds")

    def generate_enum_code(self, enum):
        """Write an ``IntEnum`` subclass for ``enum``.

        :param enum: object with ``name``, ``doc`` and ``values``; each value
            has ``name`` and ``value``.
        """
        self.write("")
        self.write("")
        self.write("class {}(IntEnum):".format(enum.name))
        self.iidx = 1
        self.write("'''")
        if enum.doc:
            self.write(enum.doc)
            self.write("")
        for val in enum.values:
            self.write(":ivar {}:".format(val.name))
            self.write(":vartype {}: {}".format(val.name, val.value))
        self.write("'''")
        for val in enum.values:
            self.write("{} = {}".format(val.name, val.value))
        self.iidx = 0

    def generate_struct_code(self, obj):
        """Write a ``FrozenClass`` subclass for the struct ``obj``.

        The generated class contains a docstring, a ``ua_types`` mapping,
        ``__init__``, ``to_binary``, ``from_binary``, ``_binary_init`` and
        ``__str__`` / ``__repr__``.

        :param obj: struct with ``name``, ``doc``, ``fields`` and ``bits``.
        """
        self.write("")
        self.write("")
        self.iidx = 0
        self.write("class {}(FrozenClass):".format(obj.name))
        self.iidx = 1
        self.write("'''")
        if obj.doc:
            self.write(obj.doc)
            self.write("")
        for field in obj.fields:
            self.write(":ivar {}:".format(field.name))
            self.write(":vartype {}: {}".format(field.name, field.uatype))
        self.write("'''")

        self.write("")
        self.write("ua_types = {")
        for field in obj.fields:
            self.write(" '{}': '{}',".format(field.name, field.uatype))
        self.write(" }")
        self.write("")

        # Structs carrying a BodyLength field get special treatment: their
        # Encoding defaults to 1 and the fields after BodyLength are packed
        # into a separate, length-prefixed body.
        extobj_hack = any(f.name == "BodyLength" for f in obj.fields)

        self._write_struct_init(obj, extobj_hack)
        self._write_struct_to_binary(obj, extobj_hack)
        self._write_struct_from_binary(obj)
        self._write_struct_binary_init(obj, extobj_hack)
        self._write_struct_str(obj)

        # Leave the generator at module level for whatever is written next.
        self.iidx = 0

    def _write_default_assignment(self, obj, field, extobj_hack):
        """Write ``self.<field> = <default>`` at the current indentation."""
        if extobj_hack and field.name == "Encoding":
            self.write("self.Encoding = 1")
        elif field.uatype == obj.name:
            # Self-referencing struct: the generic default ("Type()") would
            # call the constructor being generated, so start with None.
            self.write("self.{} = None".format(field.name))
        elif obj.name != "ExtensionObject" and field.name == "TypeId":
            self.write("self.TypeId = FourByteNodeId(ObjectIds.{}_Encoding_DefaultBinary)".format(obj.name))
        else:
            default = "[]" if field.length else self.get_default_value(field)
            self.write("self.{} = {}".format(field.name, default))

    def _write_struct_init(self, obj, extobj_hack):
        """Write the generated ``__init__`` method."""
        self.iidx = 1
        self.write("def __init__(self, binary=None):")
        self.iidx = 2
        self.write("if binary is not None:")
        self.iidx = 3
        self.write("self._binary_init(binary)")
        self.write("self._freeze = True")
        self.write("return")
        self.iidx = 2
        for field in obj.fields:
            self._write_default_assignment(obj, field, extobj_hack)
        self.write("self._freeze = True")

    def _write_struct_to_binary(self, obj, extobj_hack):
        """Write the generated ``to_binary`` method."""
        self.iidx = 1
        self.write("")
        self.write("def to_binary(self):")
        self.iidx = 2
        self.write("packet = []")
        if extobj_hack:
            self.write("body = []")

        # First emit the code that updates the bit containers announcing
        # which optional (switched) fields are present.
        for field in obj.fields:
            if not field.switchfield:
                continue
            bit = obj.bits[field.switchfield]
            if field.switchvalue:
                # 8-bit binary literal with the low bit.length bits set.
                mask = '0b' + '0' * (8 - bit.length) + '1' * bit.length
                self.write("others = self.{} & {}".format(bit.container, mask))
                self.write("if self.{}: self.{} = ( {} | others )".format(field.name, bit.container, field.switchvalue))
            else:
                self.write("if self.{}: self.{} |= (1 << {})".format(field.name, bit.container, bit.idx))

        listname = "packet"
        for field in obj.fields:
            if field.name == "BodyLength":
                # The length itself is computed in the generated code; every
                # later field is collected into the body list instead.
                listname = "body"
                continue
            self.iidx = 2
            fname = "self." + field.name
            if field.switchfield:
                self.write("if self.{}: ".format(field.name))
                self.iidx += 1
            if field.length:
                self.write("{}.append(uabin.Primitives.Int32.pack(len(self.{})))".format(listname, field.name))
                self.write("for fieldname in self.{}:".format(field.name))
                fname = "fieldname"
                self.iidx += 1
            if field.is_native_type():
                self.write_pack_uatype(listname, fname, field.uatype)
            elif field.uatype in self.model.enum_list:
                enum = self.model.get_enum(field.uatype)
                self.write_pack_enum(listname, fname, enum)
            elif field.uatype == "ExtensionObject":
                self.write("{}.append(extensionobject_to_binary({}))".format(listname, fname))
            else:
                self.write("{}.append({}.to_binary())".format(listname, fname))

        self.iidx = 2
        if extobj_hack:
            self.write("body = b''.join(body)")
            self.write("packet.append(struct.pack('<i', len(body)))")
            self.write("packet.append(body)")
        self.write("return b''.join(packet)")
        self.write("")

    def _write_struct_from_binary(self, obj):
        """Write the generated ``from_binary`` static method."""
        self.iidx = 1
        self.write("@staticmethod")
        self.write("def from_binary(data):")
        self.iidx = 2
        self.write("return {}(data)".format(obj.name))
        self.iidx = 1
        self.write("")

    def _write_struct_binary_init(self, obj, extobj_hack):
        """Write the generated ``_binary_init`` method (deserialisation)."""
        self.iidx = 1
        self.write("def _binary_init(self, data):")
        for field in obj.fields:
            self.iidx = 2
            if field.switchfield:
                bit = obj.bits[field.switchfield]
                if field.switchvalue:
                    mask = '0b' + '0' * (8 - bit.length) + '1' * bit.length
                    self.write("val = self.{} & {}".format(bit.container, mask))
                    # NOTE(review): to_binary stores field.switchvalue in the
                    # container while this compares against bit.idx - confirm.
                    self.write("if val == {}:".format(bit.idx))
                else:
                    self.write("if self.{} & (1 << {}):".format(bit.container, bit.idx))
                self.iidx += 1
            if field.is_native_type():
                if field.length:
                    self.write("self.{} = uabin.Primitives.{}.unpack_array(data)".format(field.name, field.uatype))
                else:
                    self.write_unpack_uatype(field.name, field.uatype)
            elif field.uatype in self.model.enum_list:
                enum = self.model.get_enum(field.uatype)
                self.write_unpack_enum(field.name, enum)
            else:
                if field.uatype == "ExtensionObject":
                    frombinary = "extensionobject_from_binary(data)"
                else:
                    frombinary = "{}.from_binary(data)".format(field.uatype)
                if field.length:
                    self.write("length = uabin.Primitives.Int32.unpack(data)")
                    self.write("array = []")
                    self.write("if length != -1:")
                    self.iidx += 1
                    self.write("for _ in range(0, length):")
                    self.iidx += 1
                    self.write("array.append({})".format(frombinary))
                    self.iidx -= 2
                    self.write("self.{} = array".format(field.name))
                else:
                    self.write("self.{} = {}".format(field.name, frombinary))
            if field.switchfield:
                # Optional field absent from the stream: fall back to the
                # same default the generated __init__ uses.
                self.iidx -= 1
                self.write("else:")
                self.iidx += 1
                self._write_default_assignment(obj, field, extobj_hack)
        if not obj.fields:
            # A method body may not be empty.
            self.iidx = 2
            self.write("pass")

    def _write_struct_str(self, obj):
        """Write the generated ``__str__`` method and the ``__repr__`` alias."""
        self.iidx = 1
        self.write("")
        self.write("def __str__(self):")
        self.iidx = 2
        if obj.fields:
            parts = ["'{name}:' + str(self.{name})".format(name=f.name) for f in obj.fields]
            joined = " + ', ' + \\\n ".join(parts)
            self.write("return '{}(' + {} + ')'".format(obj.name, joined))
        else:
            # With no fields the concatenation template would produce
            # "'X(' +  + ')'", which raises TypeError when called.
            self.write("return '{}()'".format(obj.name))
        self.iidx = 1
        self.write("")
        self.write("__repr__ = __str__")

    def write_unpack_enum(self, name, enum):
        """Write code reading ``self.<name>`` as an instance of ``enum``."""
        self.write("self.{} = {}(uabin.Primitives.{}.unpack(data))".format(name, enum.name, enum.uatype))

    def get_size_from_uatype(self, uatype):
        """Return the size in bytes of a fixed-width primitive type name.

        :raises Exception: if ``uatype`` is not a known fixed-width type.
        """
        # "Sbyte" is kept next to the "SByte" spelling for backward
        # compatibility with existing callers.
        if uatype in ("Int8", "UInt8", "SByte", "Sbyte", "Byte", "Char", "Boolean"):
            return 1
        elif uatype in ("Int16", "UInt16"):
            return 2
        elif uatype in ("Int32", "UInt32", "Float"):
            return 4
        elif uatype in ("Int64", "UInt64", "Double"):
            return 8
        else:
            raise Exception("Cannot get size from type {}".format(uatype))

    def write_unpack_uatype(self, name, uatype):
        """Write code reading ``self.<name>`` of type ``uatype`` from ``data``."""
        if hasattr(Primitives, uatype):
            self.write("self.{} = uabin.Primitives.{}.unpack(data)".format(name, uatype))
        else:
            self.write("self.{} = {}.from_binary(data)".format(name, uatype))

    def write_pack_enum(self, listname, name, enum):
        """Write code appending the packed value of enum member ``name``."""
        self.write("{}.append(uabin.Primitives.{}.pack({}.value))".format(listname, enum.uatype, name))

    def write_pack_uatype(self, listname, name, uatype):
        """Write code appending the packed form of ``name`` to ``listname``."""
        if hasattr(Primitives, uatype):
            self.write("{}.append(uabin.Primitives.{}.pack({}))".format(listname, uatype, name))
        else:
            self.write("{}.append({}.to_binary())".format(listname, name))

    def get_default_value(self, field):
        """Return the default for ``field`` as used in generated ``__init__``.

        The result is interpolated with ``str.format`` into the generated
        source, so ``None`` and ``0`` end up as the text ``None`` and ``0``.
        Type names are compared exactly (not by substring).
        """
        uatype = field.uatype
        if uatype in self.model.enum_list:
            enum = self.model.get_enum(uatype)
            return enum.name + "(0)"
        if uatype in ("String", "ByteString", "CharArray", "Char"):
            return None
        elif uatype == "Boolean":
            return "True"
        elif uatype == "DateTime":
            return "datetime.now()"
        elif uatype in ("Int8", "Int16", "Int32", "Int64", "UInt8", "UInt16", "UInt32", "UInt64", "Double", "Float", "Byte", "SByte"):
            return 0
        elif uatype == "ExtensionObject":
            return "None"
        else:
            return uatype + "()"
00353
00354
def fix_names(model):
    """Rename every enum value called ``None`` to ``None_``, in place.

    The generator writes each enum value as ``<name> = <value>`` inside a
    class body, and ``None`` cannot be assigned to in Python.

    :param model: object whose ``enums`` each expose a ``values`` iterable of
        objects with a writable ``name`` attribute.
    """
    all_values = (value for enum in model.enums for value in enum.values)
    for value in all_values:
        if value.name == "None":
            value.name = "None_"
00360
00361
if __name__ == "__main__":
    # Both paths are resolved against the current working directory.
    xmlpath, protocolpath = "Opc.Ua.Types.bsd", "../opcua/ua/uaprotocol_auto.py"

    p = gm.Parser(xmlpath)
    model = p.parse()

    # Model clean-up passes; each one mutates the model and they are applied
    # in exactly this order before any code is generated.
    for transform in (
        gm.add_basetype_members,
        gm.add_encoding_field,
        gm.remove_duplicates,
        gm.remove_vector_length,
        gm.split_requests,
        fix_names,
    ):
        transform(model)

    c = CodeGenerator(model, protocolpath)
    c.run()