generate_protocol_python.py
Go to the documentation of this file.
00001 # temporary hack
00002 import generate_model as gm
00003 
# Enum names that CodeGenerator.run() skips when emitting enum classes.
IgnoredEnums = [
    "NodeIdType",
]

# Struct names that CodeGenerator.run() skips when emitting struct classes
# and when building the ExtensionClasses table.
IgnoredStructs = [
    "QualifiedName",
    "NodeId",
    "ExpandedNodeId",
    "FilterOperand",
    "Variant",
    "DataValue",
    "LocalizedText",
    "ExtensionObject",
    "XmlElement",
]
00006 
class Primitives1(object):
    """Name table of the fixed-width primitive type names.

    Only the attribute *names* matter: the generator uses
    ``hasattr(Primitives, uatype)`` to decide whether a type is packed
    through ``uabin.Primitives``. Every attribute is simply ``0``.
    """

    # signed integers
    Int8 = SByte = Int16 = Int32 = Int64 = 0
    # unsigned integers and single-byte types
    UInt8 = Char = Byte = UInt16 = UInt32 = UInt64 = 0
    # boolean and floating point
    Boolean = Double = Float = 0
00022 
00023 
class Primitives(Primitives1):
    """Name table of all primitive type names.

    Extends the fixed-width names inherited from ``Primitives1`` with the
    remaining names listed below. As in the base class, only the attribute
    names are used (via ``hasattr``); every value is ``0``.
    """

    Null = String = Bytes = ByteString = CharArray = DateTime = 0
00031 
00032 
00033 
class CodeGenerator(object):
    """Write Python (de)serialisation classes for a parsed protocol model.

    The generator reads ``model.enums``, ``model.structs``,
    ``model.enum_list`` and ``model.get_enum()`` and writes the generated
    source line by line to ``output``.

    Indentation of the generated code is tracked in ``self.iidx`` (number of
    indent levels) and applied by :meth:`write`.
    """

    def __init__(self, model, output):
        """Store the model and the path of the file to generate.

        :param model: parsed model providing enums and structs
        :param output: path of the Python file written by :meth:`run`
        """
        self.model = model
        self.output_path = output
        self.output_file = None  # file object, opened by run()
        self.indent = "    "
        self.iidx = 0  # indent index

    def run(self):
        """Generate the whole output file.

        Writes the header, one class per enum and struct that is not
        ignored, the ``ExtensionClasses`` table and finally appends the
        content of ``uaprotocol_auto_add.py`` (looked up in the current
        working directory).

        The output file is closed when generation finishes or fails.
        """
        print("Writting python protocol code to ", self.output_path)
        with open(self.output_path, "w") as output_file:
            self.output_file = output_file
            self.make_header()
            for enum in self.model.enums:
                if enum.name not in IgnoredEnums:
                    self.generate_enum_code(enum)
            for obj in self._wanted_structs():
                self.generate_struct_code(obj)

            self.iidx = 0
            self.write("")
            self.write("")
            self.write("ExtensionClasses = {")
            for obj in self._wanted_structs():
                if "ExtensionObject" in obj.parents:
                    self.write("    ObjectIds.{0}_Encoding_DefaultBinary: {0},".format(obj.name))
            self.write("}")
            self.write("")
            with open('uaprotocol_auto_add.py') as f:
                for line in f:
                    self.write(line.rstrip())

    def _wanted_structs(self):
        """Return the model structs for which code is generated.

        Skips names listed in ``IgnoredStructs`` and names ending in
        ``Node`` or ``NodeId``.
        """
        return [
            obj for obj in self.model.structs
            if obj.name not in IgnoredStructs
            and not obj.name.endswith(("Node", "NodeId"))
        ]

    def write(self, line):
        """Write one line to the output file at the current indent level.

        Empty lines are written without indentation.
        """
        if line:
            line = self.indent * self.iidx + line
        self.output_file.write(line + "\n")

    def make_header(self):
        """Write the module docstring and import lines of the generated file."""
        header = (
            "'''",
            "Autogenerate code from xml spec",
            "'''",
            "",
            "from datetime import datetime",
            "from enum import Enum, IntEnum",
            "",
            "from opcua.common.utils import Buffer",
            "from opcua.ua.uaerrors import UaError",
            "from opcua.ua.uatypes import *",
            "from opcua.ua import ua_binary as uabin",
            "from opcua.ua.object_ids import ObjectIds",
        )
        for line in header:
            self.write(line)

    def generate_enum_code(self, enum):
        """Write an ``IntEnum`` subclass for ``enum``.

        Uses ``enum.name``, ``enum.doc`` and, for each entry of
        ``enum.values``, its ``name`` and ``value``.
        """
        self.write("")
        self.write("")
        self.write("class {}(IntEnum):".format(enum.name))
        self.iidx = 1
        self.write("'''")
        if enum.doc:
            self.write(enum.doc)
            self.write("")
        for val in enum.values:
            self.write(":ivar {}:".format(val.name))
            self.write(":vartype {}: {}".format(val.name, val.value))
        self.write("'''")
        for val in enum.values:
            self.write("{} = {}".format(val.name, val.value))
        self.iidx = 0

    def generate_struct_code(self, obj):
        """Write a ``FrozenClass`` subclass for the struct ``obj``.

        The generated class has ``ua_types``, ``__init__``, ``to_binary``,
        ``from_binary``, ``_binary_init``, ``__str__`` and ``__repr__``.

        Uses ``obj.name``, ``obj.doc``, ``obj.fields`` and ``obj.bits``; each
        field provides ``name``, ``uatype``, ``length``, ``switchfield``,
        ``switchvalue`` and ``is_native_type()``.
        """
        # hack extension object stuff: structs with a BodyLength field get
        # their body serialised separately and length-prefixed.
        extobj_hack = any(f.name == "BodyLength" for f in obj.fields)

        self.write("")
        self.write("")
        self.iidx = 0
        self.write("class {}(FrozenClass):".format(obj.name))
        self._write_struct_doc(obj)
        self._write_struct_init(obj, extobj_hack)
        self._write_struct_to_binary(obj, extobj_hack)
        self._write_struct_from_binary(obj, extobj_hack)
        self._write_struct_str(obj)
        self.iidx = 0

    def _write_struct_doc(self, obj):
        """Write the class docstring and the ``ua_types`` mapping."""
        self.iidx = 1
        self.write("'''")
        if obj.doc:
            self.write(obj.doc)
            self.write("")
        for field in obj.fields:
            self.write(":ivar {}:".format(field.name))
            self.write(":vartype {}: {}".format(field.name, field.uatype))
        self.write("'''")

        self.write("")
        self.write("ua_types = {")
        for field in obj.fields:
            self.write("    '{}': '{}',".format(field.name, field.uatype))
        self.write("           }")
        self.write("")

    def _write_struct_init(self, obj, extobj_hack):
        """Write ``__init__``: decode from ``binary`` or assign defaults."""
        self.iidx = 1
        self.write("def __init__(self, binary=None):")
        self.iidx = 2
        self.write("if binary is not None:")
        self.iidx = 3
        self.write("self._binary_init(binary)")
        self.write("self._freeze = True")
        self.write("return")
        self.iidx = 2
        for field in obj.fields:
            self._write_field_default(obj, field, extobj_hack)
        self.write("self._freeze = True")

    def _write_field_default(self, obj, field, extobj_hack):
        """Write the default-value assignment of ``field`` at the current indent."""
        if extobj_hack and field.name == "Encoding":
            self.write("self.Encoding = 1")
        elif field.uatype == obj.name:
            # self-referencing class: cannot instantiate itself as a default
            self.write("self.{} = None".format(field.name))
        elif obj.name != "ExtensionObject" and field.name == "TypeId":
            self.write("self.TypeId = FourByteNodeId(ObjectIds.{}_Encoding_DefaultBinary)".format(obj.name))
        else:
            default = "[]" if field.length else self.get_default_value(field)
            self.write("self.{} = {}".format(field.name, default))

    @staticmethod
    def _bit_mask(bit):
        """Return an 8-bit binary literal with the low ``bit.length`` bits set."""
        return '0b' + '0' * (8 - bit.length) + '1' * bit.length

    def _write_struct_to_binary(self, obj, extobj_hack):
        """Write the ``to_binary`` method of the generated class."""
        self.iidx = 1
        self.write("")
        self.write("def to_binary(self):")
        self.iidx = 2

        self.write("packet = []")
        if extobj_hack:
            self.write("body = []")

        # First update the container (flag) fields from the optional fields.
        for field in obj.fields:
            if not field.switchfield:
                continue
            bit = obj.bits[field.switchfield]
            if field.switchvalue:
                self.write("others = self.{} & {}".format(bit.container, self._bit_mask(bit)))
                self.write("if self.{}: self.{} = ( {} | others )".format(field.name, bit.container, field.switchvalue))
            else:
                self.write("if self.{}: self.{} |= (1 << {})".format(field.name, bit.container, bit.idx))

        # Then pack every field; fields following BodyLength go into "body".
        listname = "packet"
        for field in obj.fields:
            if field.name == "BodyLength":
                listname = "body"
                continue
            self.iidx = 2
            fname = "self." + field.name
            if field.switchfield:
                self.write("if self.{}: ".format(field.name))
                self.iidx += 1
            if field.length:
                self.write("{}.append(uabin.Primitives.Int32.pack(len(self.{})))".format(listname, field.name))
                self.write("for fieldname in self.{}:".format(field.name))
                fname = "fieldname"
                self.iidx += 1
            if field.is_native_type():
                self.write_pack_uatype(listname, fname, field.uatype)
            elif field.uatype in self.model.enum_list:
                self.write_pack_enum(listname, fname, self.model.get_enum(field.uatype))
            elif field.uatype == "ExtensionObject":
                self.write("{}.append(extensionobject_to_binary({}))".format(listname, fname))
            else:
                self.write("{}.append({}.to_binary())".format(listname, fname))

        self.iidx = 2
        if extobj_hack:
            self.write("body = b''.join(body)")
            self.write("packet.append(struct.pack('<i', len(body)))")
            self.write("packet.append(body)")
        self.write("return b''.join(packet)")
        self.write("")

    def _write_struct_from_binary(self, obj, extobj_hack):
        """Write ``from_binary`` and ``_binary_init`` of the generated class."""
        self.iidx = 1
        self.write("@staticmethod")
        self.write("def from_binary(data):")
        self.iidx = 2
        self.write("return {}(data)".format(obj.name))
        self.iidx = 1
        self.write("")

        self.write("def _binary_init(self, data):")
        for field in obj.fields:
            self.iidx = 2
            if field.switchfield:
                bit = obj.bits[field.switchfield]
                if field.switchvalue:
                    self.write("val = self.{} & {}".format(bit.container, self._bit_mask(bit)))
                    # NOTE(review): to_binary stores field.switchvalue in the
                    # masked bits while this compares them with bit.idx --
                    # confirm the two always agree.
                    self.write("if val == {}:".format(bit.idx))
                else:
                    self.write("if self.{} & (1 << {}):".format(bit.container, bit.idx))
                self.iidx += 1
            if field.is_native_type():
                if field.length:
                    self.write("self.{} = uabin.Primitives.{}.unpack_array(data)".format(field.name, field.uatype))
                else:
                    self.write_unpack_uatype(field.name, field.uatype)
            elif field.uatype in self.model.enum_list:
                self.write_unpack_enum(field.name, self.model.get_enum(field.uatype))
            else:
                if field.uatype == "ExtensionObject":
                    frombinary = "extensionobject_from_binary(data)"
                else:
                    frombinary = "{}.from_binary(data)".format(field.uatype)
                if field.length:
                    self.write("length = uabin.Primitives.Int32.unpack(data)")
                    self.write("array = []")
                    self.write("if length != -1:")
                    self.iidx += 1
                    self.write("for _ in range(0, length):")
                    self.iidx += 1
                    self.write("array.append({})".format(frombinary))
                    self.iidx -= 2
                    self.write("self.{} = array".format(field.name))
                else:
                    self.write("self.{} = {}".format(field.name, frombinary))
            if field.switchfield:
                # optional field absent from the stream: fall back to the default
                self.iidx -= 1
                self.write("else:")
                self.iidx += 1
                self._write_field_default(obj, field, extobj_hack)
        if not obj.fields:
            self.iidx = 2
            self.write("pass")

    def _write_struct_str(self, obj):
        """Write ``__str__`` and the ``__repr__`` alias of the generated class."""
        self.iidx = 1
        self.write("")
        self.write("def __str__(self):")
        self.iidx = 2
        if obj.fields:
            parts = ["'{name}:' + str(self.{name})".format(name=f.name) for f in obj.fields]
            joined = " + ', ' + \\\n               ".join(parts)
            self.write("return '{}(' + {} + ')'".format(obj.name, joined))
        else:
            # Without fields the concatenation would have an empty operand
            # and the generated __str__ would raise when called.
            self.write("return '{}()'".format(obj.name))
        self.iidx = 1
        self.write("")
        self.write("__repr__ = __str__")

    def write_unpack_enum(self, name, enum):
        """Write the statement decoding enum attribute ``name`` from ``data``."""
        self.write("self.{} = {}(uabin.Primitives.{}.unpack(data))".format(name, enum.name, enum.uatype))

    def get_size_from_uatype(self, uatype):
        """Return the size in bytes of the fixed-width primitive ``uatype``.

        :raises Exception: if ``uatype`` is not a known fixed-width type
        """
        # "Sbyte" is kept for backward compatibility with the old spelling.
        if uatype in ("Int8", "UInt8", "SByte", "Sbyte", "Byte", "Char", "Boolean"):
            return 1
        elif uatype in ("Int16", "UInt16"):
            return 2
        elif uatype in ("Int32", "UInt32", "Float"):
            return 4
        elif uatype in ("Int64", "UInt64", "Double"):
            return 8
        else:
            raise Exception("Cannot get size from type {}".format(uatype))

    def write_unpack_uatype(self, name, uatype):
        """Write the statement decoding attribute ``name`` of type ``uatype``.

        Types named in ``Primitives`` are unpacked through
        ``uabin.Primitives``; any other type through its ``from_binary``.
        """
        if hasattr(Primitives, uatype):
            self.write("self.{} = uabin.Primitives.{}.unpack(data)".format(name, uatype))
        else:
            self.write("self.{} = {}.from_binary(data)".format(name, uatype))

    def write_pack_enum(self, listname, name, enum):
        """Write the statement appending the packed enum ``name`` to ``listname``."""
        self.write("{}.append(uabin.Primitives.{}.pack({}.value))".format(listname, enum.uatype, name))

    def write_pack_uatype(self, listname, name, uatype):
        """Write the statement appending the packed value ``name`` to ``listname``.

        Types named in ``Primitives`` are packed through ``uabin.Primitives``;
        any other type through its own ``to_binary``.
        """
        if hasattr(Primitives, uatype):
            self.write("{}.append(uabin.Primitives.{}.pack({}))".format(listname, uatype, name))
        else:
            self.write("{}.append({}.to_binary())".format(listname, name))

    def get_default_value(self, field):
        """Return the default value written into the generated ``__init__``.

        The result is interpolated into generated source with
        ``str.format``, so ``None`` and ``0`` are emitted as ``None`` and
        ``0``; the other results are source-code strings.
        """
        uatype = field.uatype
        if uatype in self.model.enum_list:
            enum = self.model.get_enum(uatype)
            return enum.name + "(0)"
        if uatype in ("String", "ByteString", "CharArray", "Char"):
            return None
        elif uatype == "Boolean":
            return "True"
        elif uatype == "DateTime":
            return "datetime.now()"
        elif uatype in ("Int8", "Int16", "Int32", "Int64", "UInt8", "UInt16", "UInt32", "UInt64", "Double", "Float", "Byte"):
            return 0
        elif uatype == "ExtensionObject":
            return "None"
        else:
            return uatype + "()"
00353 
00354 
def fix_names(model):
    """Rename enum values called ``None`` to ``None_``.

    Walks every value of every enum in ``model.enums`` and renames, in
    place, those whose ``name`` is exactly ``"None"``.
    """
    clashing = (
        value
        for enum in model.enums
        for value in enum.values
        if value.name == "None"
    )
    for value in clashing:
        value.name = "None_"
00360 
00361 
if __name__ == "__main__":
    # Input schema and generated module, both relative to the working directory.
    xmlpath = "Opc.Ua.Types.bsd"
    protocolpath = "../opcua/ua/uaprotocol_auto.py"

    model = gm.Parser(xmlpath).parse()

    # Model clean-up passes, applied in this exact order before generation.
    transforms = (
        gm.add_basetype_members,
        gm.add_encoding_field,
        gm.remove_duplicates,
        gm.remove_vector_length,
        gm.split_requests,
        fix_names,
    )
    for transform in transforms:
        transform(model)

    CodeGenerator(model, protocolpath).run()


ros_opcua_impl_python_opcua
Author(s): Denis Štogl, Daniel Draper
autogenerated on Sat Jun 8 2019 18:26:23