2 import generate_model
as gm
4 IgnoredEnums = [
"NodeIdType"]
5 IgnoredStructs = [
"QualifiedName",
"NodeId",
"ExpandedNodeId",
"FilterOperand",
"Variant",
"DataValue",
"LocalizedText",
"ExtensionObject",
"XmlElement"]
43 print(
"Writting python protocol code to ", self.
output_path)
46 for enum
in self.model.enums:
47 if enum.name
not in IgnoredEnums:
49 for struct
in self.model.structs:
50 if struct.name
in IgnoredStructs:
52 if struct.name.endswith(
"Node")
or struct.name.endswith(
"NodeId"):
59 self.
write(
"ExtensionClasses = {")
60 for struct
in self.model.structs:
61 if struct.name
in IgnoredStructs:
63 if struct.name.endswith(
"Node")
or struct.name.endswith(
"NodeId"):
65 if "ExtensionObject" in struct.parents:
66 self.
write(
" ObjectIds.{0}_Encoding_DefaultBinary: {0},".format(struct.name))
69 with open(
'uaprotocol_auto_add.py')
as f:
71 self.
write(line.rstrip())
76 self.output_file.write(line +
"\n")
80 self.
write(
"Autogenerate code from xml spec")
83 self.
write(
"from datetime import datetime")
84 self.
write(
"from enum import Enum, IntEnum")
86 self.
write(
"from opcua.common.utils import Buffer")
87 self.
write(
"from opcua.ua.uaerrors import UaError")
88 self.
write(
"from opcua.ua.uatypes import *")
89 self.
write(
"from opcua.ua import ua_binary as uabin")
90 self.
write(
"from opcua.ua.object_ids import ObjectIds")
95 self.
write(
"class {}(IntEnum):".format(enum.name))
101 for val
in enum.values:
102 self.
write(
":ivar {}:".format(val.name))
103 self.
write(
":vartype {}: {}".format(val.name, val.value))
105 for val
in enum.values:
106 self.
write(
"{} = {}".format(val.name, val.value))
113 self.
write(
"class {}(FrozenClass):".format(obj.name))
119 for field
in obj.fields:
120 self.
write(
":ivar {}:".format(field.name))
121 self.
write(
":vartype {}: {}".format(field.name, field.uatype))
125 self.
write(
"ua_types = {")
126 for field
in obj.fields:
127 self.
write(
" '{}': '{}',".format(field.name, field.uatype))
131 self.
write(
"def __init__(self, binary=None):")
133 self.
write(
"if binary is not None:")
135 self.
write(
"self._binary_init(binary)")
136 self.
write(
"self._freeze = True")
142 if "BodyLength" in [f.name
for f
in obj.fields]:
145 for field
in obj.fields:
146 if extobj_hack
and field.name ==
"Encoding":
147 self.
write(
"self.Encoding = 1")
148 elif field.uatype == obj.name:
149 self.
write(
"self.{} = None".format(field.name))
150 elif not obj.name
in (
"ExtensionObject")
and field.name ==
"TypeId":
151 self.
write(
"self.TypeId = FourByteNodeId(ObjectIds.{}_Encoding_DefaultBinary)".format(obj.name))
154 self.
write(
"self._freeze = True")
159 self.
write(
"def to_binary(self):")
167 self.
write(
"packet = []")
169 self.
write(
"body = []")
171 for field
in obj.fields:
172 if field.switchfield:
173 if field.switchvalue:
174 bit = obj.bits[field.switchfield]
176 mask =
'0b' +
'0' * (8 - bit.length) +
'1' * bit.length
177 self.
write(
"others = self.{} & {}".format(bit.container, mask))
178 self.
write(
"if self.{}: self.{} = ( {} | others )".format(field.name, bit.container, field.switchvalue))
180 bit = obj.bits[field.switchfield]
181 self.
write(
"if self.{}: self.{} |= (1 << {})".format(field.name, bit.container, bit.idx))
184 for idx, field
in enumerate(obj.fields):
186 if field.name ==
"BodyLength":
192 fname =
"self." + field.name
193 if field.switchfield:
194 self.
write(
"if self.{}: ".format(field.name))
197 self.
write(
"{}.append(uabin.Primitives.Int32.pack(len(self.{})))".format(listname, field.name))
198 self.
write(
"for fieldname in self.{}:".format(field.name))
201 if field.is_native_type():
203 elif field.uatype
in self.model.enum_list:
204 enum = self.model.get_enum(field.uatype)
206 elif field.uatype
in (
"ExtensionObject"):
207 self.
write(
"{}.append(extensionobject_to_binary({}))".format(listname, fname))
209 self.
write(
"{}.append({}.to_binary())".format(listname, fname))
214 self.
write(
"body = b''.join(body)")
215 self.
write(
"packet.append(struct.pack('<i', len(body)))")
216 self.
write(
"packet.append(body)")
217 self.
write(
"return b''.join(packet)")
222 self.
write(
"@staticmethod")
223 self.
write(
"def from_binary(data):")
225 self.
write(
"return {}(data)".format(obj.name))
229 self.
write(
"def _binary_init(self, data):")
232 for idx, field
in enumerate(obj.fields):
237 if field.switchfield:
238 bit = obj.bits[field.switchfield]
239 if field.switchvalue:
240 mask =
'0b' +
'0' * (8 - bit.length) +
'1' * bit.length
241 self.
write(
"val = self.{} & {}".format(bit.container, mask))
242 self.
write(
"if val == {}:".format(bit.idx))
244 self.
write(
"if self.{} & (1 << {}):".format(bit.container, bit.idx))
246 if field.is_native_type():
248 self.
write(
"self.{} = uabin.Primitives.{}.unpack_array(data)".format(field.name, field.uatype))
251 elif field.uatype
in self.model.enum_list:
254 enum = self.model.get_enum(field.uatype)
257 if field.uatype
in (
"ExtensionObject"):
258 frombinary =
"extensionobject_from_binary(data)" 260 frombinary =
"{}.from_binary(data)".format(field.uatype)
262 self.
write(
"length = uabin.Primitives.Int32.unpack(data)")
263 self.
write(
"array = []")
264 self.
write(
"if length != -1:")
266 self.
write(
"for _ in range(0, length):")
268 self.
write(
"array.append({})".format(frombinary))
270 self.
write(
"self.{} = array".format(field.name))
272 self.
write(
"self.{} = {}".format(field.name, frombinary))
273 if field.switchfield:
277 if extobj_hack
and field.name ==
"Encoding":
278 self.
write(
"self.Encoding = 1")
279 elif field.uatype == obj.name:
280 self.
write(
"self.{} = None".format(field.name))
281 elif not obj.name
in (
"ExtensionObject")
and field.name ==
"TypeId":
282 self.
write(
"self.TypeId = FourByteNodeId(ObjectIds.{}_Encoding_DefaultBinary)".format(obj.name))
285 if len(obj.fields) == 0:
293 self.
write(
"def __str__(self):")
295 tmp = [
"'{name}:' + str(self.{name})".format(name=f.name)
for f
in obj.fields]
296 tmp =
" + ', ' + \\\n ".join(tmp)
297 self.
write(
"return '{}(' + {} + ')'".format(obj.name, tmp))
300 self.
write(
"__repr__ = __str__")
305 self.
write(
"self.{} = {}(uabin.Primitives.{}.unpack(data))".format(name, enum.name, enum.uatype))
308 if uatype
in (
"Int8",
"UInt8",
"Sbyte",
"Byte",
"Char",
"Boolean"):
310 elif uatype
in (
"Int16",
"UInt16"):
312 elif uatype
in (
"Int32",
"UInt32",
"Float"):
314 elif uatype
in (
"Int64",
"UInt64",
"Double"):
317 raise Exception(
"Cannot get size from type {}".format(uatype))
320 if hasattr(Primitives, uatype):
321 self.
write(
"self.{} = uabin.Primitives.{}.unpack(data)".format(name, uatype))
323 self.
write(
"self.{} = {}.from_binary(data))".format(name, uatype))
326 self.
write(
"{}.append(uabin.Primitives.{}.pack({}.value))".format(listname, enum.uatype, name))
329 if hasattr(Primitives, uatype):
330 self.
write(
"{}.append(uabin.Primitives.{}.pack({}))".format(listname, uatype, name))
332 self.
write(
"{}.append({}.to_binary(}))".format(listname, name))
336 if field.uatype
in self.model.enum_list:
337 enum = self.model.get_enum(field.uatype)
338 return enum.name +
"(0)" 339 if field.uatype
in (
"String"):
341 elif field.uatype
in (
"ByteString",
"CharArray",
"Char"):
343 elif field.uatype
in (
"Boolean"):
345 elif field.uatype
in (
"DateTime"):
346 return "datetime.now()" 347 elif field.uatype
in (
"Int8",
"Int16",
"Int32",
"Int64",
"UInt8",
"UInt16",
"UInt32",
"UInt64",
"Double",
"Float",
"Byte"):
349 elif field.uatype
in (
"ExtensionObject"):
352 return field.uatype +
"()" 356 for s
in model.enums:
362 if __name__ ==
"__main__":
363 xmlpath =
"Opc.Ua.Types.bsd" 364 protocolpath =
"../opcua/ua/uaprotocol_auto.py" 365 p = gm.Parser(xmlpath)
367 gm.add_basetype_members(model)
368 gm.add_encoding_field(model)
369 gm.remove_duplicates(model)
370 gm.remove_vector_length(model)
371 gm.split_requests(model)
def generate_enum_code(self, enum)
def write_pack_enum(self, listname, name, enum)
def __init__(self, model, output)
def get_default_value(self, field)
def write_unpack_enum(self, name, enum)
def write_unpack_uatype(self, name, uatype)
def get_size_from_uatype(self, uatype)
def write_pack_uatype(self, listname, name, uatype)
def generate_struct_code(self, obj)