generate_protocol_python.py
Go to the documentation of this file.
1 # temporary hack
2 import generate_model as gm
3 
4 IgnoredEnums = ["NodeIdType"]
5 IgnoredStructs = ["QualifiedName", "NodeId", "ExpandedNodeId", "FilterOperand", "Variant", "DataValue", "LocalizedText", "ExtensionObject", "XmlElement"]
6 
7 class Primitives1(object):
8  Int8 = 0
9  SByte = 0
10  Int16 = 0
11  Int32 = 0
12  Int64 = 0
13  UInt8 = 0
14  Char = 0
15  Byte = 0
16  UInt16 = 0
17  UInt32 = 0
18  UInt64 = 0
19  Boolean = 0
20  Double = 0
21  Float = 0
22 
23 
25  Null = 0
26  String = 0
27  Bytes = 0
28  ByteString = 0
29  CharArray = 0
30  DateTime = 0
31 
32 
33 
34 class CodeGenerator(object):
35 
36  def __init__(self, model, output):
37  self.model = model
38  self.output_path = output
39  self.indent = " "
40  self.iidx = 0 # indent index
41 
42  def run(self):
43  print("Writting python protocol code to ", self.output_path)
44  self.output_file = open(self.output_path, "w")
45  self.make_header()
46  for enum in self.model.enums:
47  if enum.name not in IgnoredEnums:
48  self.generate_enum_code(enum)
49  for struct in self.model.structs:
50  if struct.name in IgnoredStructs:
51  continue
52  if struct.name.endswith("Node") or struct.name.endswith("NodeId"):
53  continue
54  self.generate_struct_code(struct)
55 
56  self.iidx = 0
57  self.write("")
58  self.write("")
59  self.write("ExtensionClasses = {")
60  for struct in self.model.structs:
61  if struct.name in IgnoredStructs:
62  continue
63  if struct.name.endswith("Node") or struct.name.endswith("NodeId"):
64  continue
65  if "ExtensionObject" in struct.parents:
66  self.write(" ObjectIds.{0}_Encoding_DefaultBinary: {0},".format(struct.name))
67  self.write("}")
68  self.write("")
69  with open('uaprotocol_auto_add.py') as f:
70  for line in f:
71  self.write(line.rstrip())
72 
73  def write(self, line):
74  if line:
75  line = self.indent * self.iidx + line
76  self.output_file.write(line + "\n")
77 
78  def make_header(self):
79  self.write("'''")
80  self.write("Autogenerate code from xml spec")
81  self.write("'''")
82  self.write("")
83  self.write("from datetime import datetime")
84  self.write("from enum import Enum, IntEnum")
85  self.write("")
86  self.write("from opcua.common.utils import Buffer")
87  self.write("from opcua.ua.uaerrors import UaError")
88  self.write("from opcua.ua.uatypes import *")
89  self.write("from opcua.ua import ua_binary as uabin")
90  self.write("from opcua.ua.object_ids import ObjectIds")
91 
92  def generate_enum_code(self, enum):
93  self.write("")
94  self.write("")
95  self.write("class {}(IntEnum):".format(enum.name))
96  self.iidx = 1
97  self.write("'''")
98  if enum.doc:
99  self.write(enum.doc)
100  self.write("")
101  for val in enum.values:
102  self.write(":ivar {}:".format(val.name))
103  self.write(":vartype {}: {}".format(val.name, val.value))
104  self.write("'''")
105  for val in enum.values:
106  self.write("{} = {}".format(val.name, val.value))
107  self.iidx = 0
108 
109  def generate_struct_code(self, obj):
110  self.write("")
111  self.write("")
112  self.iidx = 0
113  self.write("class {}(FrozenClass):".format(obj.name))
114  self.iidx += 1
115  self.write("'''")
116  if obj.doc:
117  self.write(obj.doc)
118  self.write("")
119  for field in obj.fields:
120  self.write(":ivar {}:".format(field.name))
121  self.write(":vartype {}: {}".format(field.name, field.uatype))
122  self.write("'''")
123 
124  self.write("")
125  self.write("ua_types = {")
126  for field in obj.fields:
127  self.write(" '{}': '{}',".format(field.name, field.uatype))
128  self.write(" }")
129  self.write("")
130 
131  self.write("def __init__(self, binary=None):")
132  self.iidx += 1
133  self.write("if binary is not None:")
134  self.iidx += 1
135  self.write("self._binary_init(binary)")
136  self.write("self._freeze = True")
137  self.write("return")
138  self.iidx -= 1
139 
140  # hack extension object stuff
141  extobj_hack = False
142  if "BodyLength" in [f.name for f in obj.fields]:
143  extobj_hack = True
144 
145  for field in obj.fields:
146  if extobj_hack and field.name == "Encoding":
147  self.write("self.Encoding = 1")
148  elif field.uatype == obj.name: # help!!! selv referencing class
149  self.write("self.{} = None".format(field.name))
150  elif not obj.name in ("ExtensionObject") and field.name == "TypeId": # and ( obj.name.endswith("Request") or obj.name.endswith("Response")):
151  self.write("self.TypeId = FourByteNodeId(ObjectIds.{}_Encoding_DefaultBinary)".format(obj.name))
152  else:
153  self.write("self.{} = {}".format(field.name, "[]" if field.length else self.get_default_value(field)))
154  self.write("self._freeze = True")
155  self.iidx = 1
156 
157  # serialize code
158  self.write("")
159  self.write("def to_binary(self):")
160  self.iidx += 1
161 
162  # hack for self referencing classes
163  # for field in obj.fields:
164  # if field.uatype == obj.name: #help!!! selv referencing class
165  #self.write("if self.{name} is None: self.{name} = {uatype}()".format(name=field.name, uatype=field.uatype))
166 
167  self.write("packet = []")
168  if extobj_hack:
169  self.write("body = []")
170  #self.write("tmp = packet")
171  for field in obj.fields:
172  if field.switchfield:
173  if field.switchvalue:
174  bit = obj.bits[field.switchfield]
175  #self.write("if self.{}: self.{} |= (value << {})".format(field.name, field.switchfield, field.switchvalue))
176  mask = '0b' + '0' * (8 - bit.length) + '1' * bit.length
177  self.write("others = self.{} & {}".format(bit.container, mask))
178  self.write("if self.{}: self.{} = ( {} | others )".format(field.name, bit.container, field.switchvalue))
179  else:
180  bit = obj.bits[field.switchfield]
181  self.write("if self.{}: self.{} |= (1 << {})".format(field.name, bit.container, bit.idx))
182  iidx = self.iidx
183  listname = "packet"
184  for idx, field in enumerate(obj.fields):
185  # if field.name == "Body" and idx <= (len(obj.fields)-1):
186  if field.name == "BodyLength":
187  listname = "body"
188  #self.write("tmp = packet")
189  continue
190  self.iidx = iidx
191  switch = ""
192  fname = "self." + field.name
193  if field.switchfield:
194  self.write("if self.{}: ".format(field.name))
195  self.iidx += 1
196  if field.length:
197  self.write("{}.append(uabin.Primitives.Int32.pack(len(self.{})))".format(listname, field.name))
198  self.write("for fieldname in self.{}:".format(field.name))
199  fname = "fieldname"
200  self.iidx += 1
201  if field.is_native_type():
202  self.write_pack_uatype(listname, fname, field.uatype)
203  elif field.uatype in self.model.enum_list:
204  enum = self.model.get_enum(field.uatype)
205  self.write_pack_enum(listname, fname, enum)
206  elif field.uatype in ("ExtensionObject"):
207  self.write("{}.append(extensionobject_to_binary({}))".format(listname, fname))
208  else:
209  self.write("{}.append({}.to_binary())".format(listname, fname))
210  if field.length:
211  self.iidx -= 1
212  self.iidx = 2
213  if extobj_hack:
214  self.write("body = b''.join(body)")
215  self.write("packet.append(struct.pack('<i', len(body)))")
216  self.write("packet.append(body)")
217  self.write("return b''.join(packet)")
218  self.write("")
219 
220  self.iidx = 1
221  # deserialize
222  self.write("@staticmethod")
223  self.write("def from_binary(data):")
224  self.iidx += 1
225  self.write("return {}(data)".format(obj.name))
226  self.iidx -= 1
227  self.write("")
228 
229  self.write("def _binary_init(self, data):")
230  self.iidx += 1
231  iidx = self.iidx
232  for idx, field in enumerate(obj.fields):
233  self.iidx = iidx
234  # if field.name == "Body" and idx <= (len(obj.fields)-1):
235  #self.write("bodylength = struct.unpack('<i', data)[0]")
236  # continue
237  if field.switchfield:
238  bit = obj.bits[field.switchfield]
239  if field.switchvalue:
240  mask = '0b' + '0' * (8 - bit.length) + '1' * bit.length
241  self.write("val = self.{} & {}".format(bit.container, mask))
242  self.write("if val == {}:".format(bit.idx))
243  else:
244  self.write("if self.{} & (1 << {}):".format(bit.container, bit.idx))
245  self.iidx += 1
246  if field.is_native_type():
247  if field.length:
248  self.write("self.{} = uabin.Primitives.{}.unpack_array(data)".format(field.name, field.uatype))
249  else:
250  self.write_unpack_uatype(field.name, field.uatype)
251  elif field.uatype in self.model.enum_list:
252  #uatype = self.model.get_enum(field.uatype).uatype
253  #self.write_unpack_uatype(field.name, uatype)
254  enum = self.model.get_enum(field.uatype)
255  self.write_unpack_enum(field.name, enum)
256  else:
257  if field.uatype in ("ExtensionObject"):
258  frombinary = "extensionobject_from_binary(data)"
259  else:
260  frombinary = "{}.from_binary(data)".format(field.uatype)
261  if field.length:
262  self.write("length = uabin.Primitives.Int32.unpack(data)")
263  self.write("array = []")
264  self.write("if length != -1:")
265  self.iidx += 1
266  self.write("for _ in range(0, length):")
267  self.iidx += 1
268  self.write("array.append({})".format(frombinary))
269  self.iidx -= 2
270  self.write("self.{} = array".format(field.name))
271  else:
272  self.write("self.{} = {}".format(field.name, frombinary))
273  if field.switchfield:
274  self.iidx -= 1
275  self.write("else:")
276  self.iidx += 1
277  if extobj_hack and field.name == "Encoding":
278  self.write("self.Encoding = 1")
279  elif field.uatype == obj.name: # help!!! selv referencing class
280  self.write("self.{} = None".format(field.name))
281  elif not obj.name in ("ExtensionObject") and field.name == "TypeId": # and ( obj.name.endswith("Request") or obj.name.endswith("Response")):
282  self.write("self.TypeId = FourByteNodeId(ObjectIds.{}_Encoding_DefaultBinary)".format(obj.name))
283  else:
284  self.write("self.{} = {}".format(field.name, "[]" if field.length else self.get_default_value(field)))
285  if len(obj.fields) == 0:
286  self.write("pass")
287 
288  self.iidx = 2
289 
290  #__str__
291  self.iidx = 1
292  self.write("")
293  self.write("def __str__(self):")
294  self.iidx += 1
295  tmp = ["'{name}:' + str(self.{name})".format(name=f.name) for f in obj.fields]
296  tmp = " + ', ' + \\\n ".join(tmp)
297  self.write("return '{}(' + {} + ')'".format(obj.name, tmp))
298  self.iidx -= 1
299  self.write("")
300  self.write("__repr__ = __str__")
301 
302  self.iix = 0
303 
304  def write_unpack_enum(self, name, enum):
305  self.write("self.{} = {}(uabin.Primitives.{}.unpack(data))".format(name, enum.name, enum.uatype))
306 
307  def get_size_from_uatype(self, uatype):
308  if uatype in ("Int8", "UInt8", "Sbyte", "Byte", "Char", "Boolean"):
309  return 1
310  elif uatype in ("Int16", "UInt16"):
311  return 2
312  elif uatype in ("Int32", "UInt32", "Float"):
313  return 4
314  elif uatype in ("Int64", "UInt64", "Double"):
315  return 8
316  else:
317  raise Exception("Cannot get size from type {}".format(uatype))
318 
319  def write_unpack_uatype(self, name, uatype):
320  if hasattr(Primitives, uatype):
321  self.write("self.{} = uabin.Primitives.{}.unpack(data)".format(name, uatype))
322  else:
323  self.write("self.{} = {}.from_binary(data))".format(name, uatype))
324 
325  def write_pack_enum(self, listname, name, enum):
326  self.write("{}.append(uabin.Primitives.{}.pack({}.value))".format(listname, enum.uatype, name))
327 
328  def write_pack_uatype(self, listname, name, uatype):
329  if hasattr(Primitives, uatype):
330  self.write("{}.append(uabin.Primitives.{}.pack({}))".format(listname, uatype, name))
331  else:
332  self.write("{}.append({}.to_binary(}))".format(listname, name))
333  return
334 
335  def get_default_value(self, field):
336  if field.uatype in self.model.enum_list:
337  enum = self.model.get_enum(field.uatype)
338  return enum.name + "(0)"
339  if field.uatype in ("String"):
340  return None
341  elif field.uatype in ("ByteString", "CharArray", "Char"):
342  return None
343  elif field.uatype in ("Boolean"):
344  return "True"
345  elif field.uatype in ("DateTime"):
346  return "datetime.now()"
347  elif field.uatype in ("Int8", "Int16", "Int32", "Int64", "UInt8", "UInt16", "UInt32", "UInt64", "Double", "Float", "Byte"):
348  return 0
349  elif field.uatype in ("ExtensionObject"):
350  return "None"
351  else:
352  return field.uatype + "()"
353 
354 
355 def fix_names(model):
356  for s in model.enums:
357  for f in s.values:
358  if f.name == "None":
359  f.name = "None_"
360 
361 
362 if __name__ == "__main__":
363  xmlpath = "Opc.Ua.Types.bsd"
364  protocolpath = "../opcua/ua/uaprotocol_auto.py"
365  p = gm.Parser(xmlpath)
366  model = p.parse()
367  gm.add_basetype_members(model)
368  gm.add_encoding_field(model)
369  gm.remove_duplicates(model)
370  gm.remove_vector_length(model)
371  gm.split_requests(model)
372  fix_names(model)
373  c = CodeGenerator(model, protocolpath)
374  c.run()
def write_pack_enum(self, listname, name, enum)
def write_pack_uatype(self, listname, name, uatype)


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Tue Jan 19 2021 03:12:44