generate_address_space.py
Go to the documentation of this file.
1 """
2 Generate address space c++ code from xml file specification
3 """
4 import sys
5 
6 import xml.etree.ElementTree as ET
7 
8 class ObjectStruct(object):
9  def __init__(self):
10  self.nodetype = None
11  self.nodeid = None
12  self.browsename = None
13  self.displayname = None
14  self.symname = None
15  self.parent = None
16  self.parentlink = None
17  self.desc = ""
18  self.typedef = None
19  self.refs = []
20  self.nodeclass = None
21  self.eventnotifier = 0
22 
23  #variable
24  self.datatype = None
25  self.rank = -1 # check default value
26  self.value = []
27  self.dimensions = None
28  self.accesslevel = None
29  self.useraccesslevel = None
30  self.minsample = None
31 
32  #referencetype
33  self.inversename = ""
34  self.abstract = "false"
35  self.symmetric = "false"
36 
37  #datatype
38  self.definition = []
39 
40  #types
41 
42 
43 class RefStruct():
44  def __init__(self):
45  self.reftype = None
46  self.forward = "true"
47  self.target = None
48 
49 
50 class CodeGenerator(object):
51  def __init__(self, input_path, output_path):
52  self.input_path = input_path
53  self.output_path = output_path
54  self.output_file = None
55  self.part = self.input_path.split(".")[-2]
56 
57  def run(self):
58  sys.stderr.write("Generating C++ {} for XML file {}".format(self.output_path, self.input_path) + "\n")
59  if sys.version_info < (3,):
60  import codecs
61  self.output_file = codecs.open(self.output_path, 'w', 'utf-8')
62  else:
63  self.output_file = open(self.output_path, "w")
64  self.make_header()
65  tree = ET.parse(xmlpath)
66  root = tree.getroot()
67  for child in root:
68  if child.tag[51:] == 'UAObject':
69  node = self.parse_node(child)
70  self.make_object_code(node)
71  elif child.tag[51:] == 'UAObjectType':
72  node = self.parse_node(child)
73  self.make_object_type_code(node)
74  elif child.tag[51:] == 'UAVariable':
75  node = self.parse_node(child)
76  self.make_variable_code(node)
77  elif child.tag[51:] == 'UAVariableType':
78  node = self.parse_node(child)
79  self.make_variable_type_code(node)
80  elif child.tag[51:] == 'UAReferenceType':
81  node = self.parse_node(child)
82  self.make_reference_code(node)
83  elif child.tag[51:] == 'UADataType':
84  node = self.parse_node(child)
85  self.make_datatype_code(node)
86  else:
87  sys.stderr.write("Not implemented node type: " + child.tag[51:] + "\n")
88  self.writecode('''
89 void CreateAddressSpace%s(OpcUa::NodeManagementServices & registry)
90 {''' % (self.part))
91  for child in root:
92  if child.tag[51:] == 'UAObject':
93  pass
94  elif child.tag[51:] == 'UAObjectType':
95  pass
96  elif child.tag[51:] == 'UAVariable':
97  pass
98  elif child.tag[51:] == 'UAVariableType':
99  pass
100  elif child.tag[51:] == 'UAReferenceType':
101  pass
102  elif child.tag[51:] == 'UADataType':
103  pass
104  else:
105  continue
106  node = self.parse_node(child)
107  self.writecode(" ", 'create_{}(registry);'.format(node.nodeid[2:]))
108  self.make_footer()
109 
110  def writecode(self, *args):
111  self.output_file.write(" ".join(args) + "\n")
112 
113  def make_header(self, ):
114  self.writecode('''
115 // DO NOT EDIT THIS FILE!
116 // It is automatically generated from opcfoundation.org schemas.
117 //
118 
119 #include "standard_address_space_parts.h"
120 #include <opc/ua/protocol/string_utils.h>
121 #include <opc/common/addons_core/addon.h>
122 #include <opc/ua/protocol/strings.h>
123 #include <opc/ua/protocol/variable_access_level.h>
124 #include <opc/ua/services/node_management.h>
125 
126 #include <algorithm>
127 #include <iostream>
128 #include <map>
129 
130 namespace OpcUa
131 {''')
132 
133  def make_footer(self, ):
134  self.writecode('''
135 }
136 
137 } // namespace
138 ''')
139 
140 
141  def parse_node(self, child):
142  obj = ObjectStruct()
143  obj.nodetype = child.tag[53:]
144  for key, val in child.attrib.items():
145  if key == "NodeId":
146  obj.nodeid = val
147  elif key == "BrowseName":
148  obj.browsename = val
149  elif key == "SymbolicName":
150  obj.symname = val
151  elif key == "ParentNodeId":
152  obj.parent = val
153  elif key == "DataType":
154  obj.datatype = val
155  elif key == "IsAbstract":
156  obj.abstract = val
157  elif key == "EventNotifier":
158  obj.eventnotifier = val
159  elif key == "ValueRank":
160  obj.rank = val
161  elif key == "ArrayDimensions":
162  obj.dimensions = val
163  elif key == "MinimumSamplingInterval":
164  obj.minsample = val
165  elif key == "AccessLevel":
166  obj.accesslevel = val
167  elif key == "UserAccessLevel":
168  obj.useraccesslevel = val
169  elif key == "Symmetric":
170  obj.symmetric = val
171  else:
172  sys.stderr.write("Attribute not implemented: " + key + " " + val + "\n")
173 
174  obj.displayname = obj.browsename#FIXME
175  for el in child:
176  tag = el.tag[51:]
177 
178  if tag == "DisplayName":
179  obj.displayname = el.text
180  elif tag == "Description":
181  obj.desc = el.text
182  elif tag == "References":
183  for ref in el:
184  #self.writecode("ref", ref, "IsForward" in ref, ref.text )
185  if ref.attrib["ReferenceType"] == "HasTypeDefinition":
186  obj.typedef = ref.text
187  elif "IsForward" in ref.attrib and ref.attrib["IsForward"] == "false":
188  #if obj.parent:
189  #sys.stderr.write("Parent is already set with: "+ obj.parent + " " + ref.text + "\n")
190  obj.parent = ref.text
191  obj.parentlink = ref.attrib["ReferenceType"]
192  else:
193  struct = RefStruct()
194  if "IsForward" in ref.attrib: struct.forward = ref.attrib["IsForward"]
195  struct.target = ref.text
196  struct.reftype = ref.attrib["ReferenceType"]
197  obj.refs.append(struct)
198  elif tag == "Value":
199  for val in el:
200  ntag = val.tag[47:]
201  if ntag == "Int32":
202  obj.value.append("(int32_t) " + val.text)
203  elif ntag == "UInt32":
204  obj.value.append("(uint32_t) " + val.text)
205  elif ntag in ('ByteString', 'String'):
206  mytext = val.text.replace('\r', '')
207  if len(mytext) < 65535:
208  mytext = ['"{}"'.format(x) for x in val.text.replace('\r', '').splitlines()]
209  mytext = '\n'.join(mytext)
210  obj.value.append('+{}'.format(mytext))
211  else:
212  def batch_gen(data, batch_size):
213  for i in range(0, len(data), batch_size):
214  yield data[i:i+batch_size]
215  mytext = '({}).c_str()'.format(
216  ' +\n'.join(
217  ['std::string({})'.format(
218  '\n'.join(
219  ['"{}"'.format(x) for x in segment.splitlines()]
220  )
221  ) for segment in batch_gen(mytext, 65000)
222  ]
223  )
224  )
225  elif ntag == "ListOfExtensionObject":
226  pass
227  elif ntag == "ListOfLocalizedText":
228  pass
229  else:
230  self.writecode("Missing type: ", ntag)
231  elif tag == "InverseName":
232  obj.inversename = el.text
233  elif tag == "Definition":
234  for field in el:
235  obj.definition.append(field)
236  else:
237  sys.stderr.write("Not implemented tag: "+ str(el) + "\n")
238  return obj
239 
240  def make_node_code(self, obj, indent):
241  self.writecode(indent, 'AddNodesItem node;')
242  self.writecode(indent, 'node.RequestedNewNodeId = ToNodeId("{}");'.format(obj.nodeid))
243  self.writecode(indent, 'node.BrowseName = ToQualifiedName("{}");'.format(obj.browsename))
244  self.writecode(indent, 'node.Class = NodeClass::{};'.format(obj.nodetype))
245  if obj.parent: self.writecode(indent, 'node.ParentNodeId = ToNodeId("{}");'.format(obj.parent))
246  if obj.parent: self.writecode(indent, 'node.ReferenceTypeId = {};'.format(self.to_ref_type(obj.parentlink)))
247  if obj.typedef: self.writecode(indent, 'node.TypeDefinition = ToNodeId("{}");'.format(obj.typedef))
248 
249  def to_vector(self, dims):
250  s = "std::vector<uint32_t> {"
251  s += dims.replace(',', ', ')
252  s += "}"
253  return s
254 
255  def to_data_type(self, nodeid):
256  if not nodeid:
257  return "ObjectId::String"
258  if "=" in nodeid:
259  return 'ToNodeId("{}")'.format(nodeid)
260  else:
261  return 'ObjectId::{}'.format(nodeid)
262 
263  def to_ref_type(self, nodeid):
264  if "=" in nodeid:
265  return 'ToNodeId("{}")'.format(nodeid)
266  else:
267  return 'ReferenceId::{}'.format(nodeid)
268 
269  def make_object_code(self, obj):
270  indent = " "
271  self.writecode("")
272  self.writecode('static void create_{}(OpcUa::NodeManagementServices & registry)'.format(obj.nodeid[2:]))
273  self.writecode("{")
274  self.make_node_code(obj, indent)
275  self.writecode(indent, 'ObjectAttributes attrs;')
276  if obj.desc: self.writecode(indent, 'attrs.Description = LocalizedText("{}");'.format(obj.desc))
277  self.writecode(indent, 'attrs.DisplayName = LocalizedText("{}");'.format(obj.displayname))
278  self.writecode(indent, 'attrs.EventNotifier = {};'.format(obj.eventnotifier))
279  self.writecode(indent, 'node.Attributes = attrs;')
280  self.writecode(indent, 'registry.AddNodes(std::vector<AddNodesItem> {node});')
281  self.make_refs_code(obj, indent)
282  self.writecode("}")
283 
284  def make_object_type_code(self, obj):
285  indent = " "
286  self.writecode("")
287  self.writecode('static void create_{}(OpcUa::NodeManagementServices & registry)'.format(obj.nodeid[2:]))
288  self.writecode("{")
289  self.make_node_code(obj, indent)
290  self.writecode(indent, 'ObjectTypeAttributes attrs;')
291  if obj.desc: self.writecode(indent, 'attrs.Description = LocalizedText("{}");'.format(obj.desc))
292  self.writecode(indent, 'attrs.DisplayName = LocalizedText("{}");'.format(obj.displayname))
293  self.writecode(indent, 'attrs.IsAbstract = {};'.format(obj.abstract))
294  self.writecode(indent, 'node.Attributes = attrs;')
295  self.writecode(indent, 'registry.AddNodes(std::vector<AddNodesItem> {node});')
296  self.make_refs_code(obj, indent)
297  self.writecode("}")
298 
299 
300  def make_variable_code(self, obj):
301  indent = " "
302  self.writecode("")
303  self.writecode('static void create_{}(OpcUa::NodeManagementServices & registry)'.format(obj.nodeid[2:]))
304  self.writecode("{")
305  self.make_node_code(obj, indent)
306  self.writecode(indent, 'VariableAttributes attrs;')
307  if obj.desc: self.writecode(indent, 'attrs.Description = LocalizedText("{}");'.format(obj.desc))
308  self.writecode(indent, 'attrs.DisplayName = LocalizedText("{}");'.format(obj.displayname))
309  self.writecode(indent, 'attrs.Type = {};'.format(self.to_data_type(obj.datatype)))
310  if obj.value and len(obj.value) == 1: self.writecode(indent, 'attrs.Value = {};'.format(obj.value[0]))
311  if obj.rank: self.writecode(indent, 'attrs.Rank = {};'.format(obj.rank))
312  if obj.accesslevel: self.writecode(indent, 'attrs.AccessLevel = (VariableAccessLevel) {};'.format(obj.accesslevel))
313  if obj.useraccesslevel: self.writecode(indent, 'attrs.UserAccessLevel = (VariableAccessLevel) {};'.format(obj.useraccesslevel))
314  if obj.minsample: self.writecode(indent, 'attrs.MinimumSamplingInterval = {};'.format(obj.minsample))
315  if obj.dimensions: self.writecode(indent, 'attrs.Dimensions = {};'.format(self.to_vector(obj.dimensions)))
316  self.writecode(indent, 'node.Attributes = attrs;')
317  self.writecode(indent, 'registry.AddNodes(std::vector<AddNodesItem> {node});')
318  self.make_refs_code(obj, indent)
319  self.writecode("}")
320 
321  def make_variable_type_code(self, obj):
322  indent = " "
323  self.writecode("")
324  self.writecode('static void create_{}(OpcUa::NodeManagementServices & registry)'.format(obj.nodeid[2:]))
325  self.writecode("{")
326  self.make_node_code(obj, indent)
327  self.writecode(indent, 'VariableTypeAttributes attrs;')
328  if obj.desc: self.writecode(indent, 'attrs.Description = LocalizedText("{}");'.format(obj.desc))
329  self.writecode(indent, 'attrs.DisplayName = LocalizedText("{}");'.format(obj.displayname))
330  self.writecode(indent, 'attrs.Type = {};'.format(self.to_data_type(obj.datatype)))
331  if obj.value and len(obj.value) == 1: self.writecode(indent, 'attrs.Value = {};'.format(obj.value[0]))
332  if obj.rank: self.writecode(indent, 'attrs.Rank = {};'.format(obj.rank))
333  if obj.abstract: self.writecode(indent, 'attrs.IsAbstract = {};'.format(obj.abstract))
334  if obj.dimensions: self.writecode(indent, 'attrs.Dimensions = {};'.format(self.to_vector(obj.dimensions)))
335  self.writecode(indent, 'node.Attributes = attrs;')
336  self.writecode(indent, 'registry.AddNodes(std::vector<AddNodesItem> {node});')
337  self.make_refs_code(obj, indent)
338  self.writecode("}")
339 
340 
341 
342  def make_reference_code(self, obj):
343  indent = " "
344  self.writecode("")
345  self.writecode('static void create_{}(OpcUa::NodeManagementServices & registry)'.format(obj.nodeid[2:]))
346  self.writecode("{")
347  self.make_node_code(obj, indent)
348  self.writecode(indent, 'ReferenceTypeAttributes attrs;')
349  if obj.desc: self.writecode(indent, 'attrs.Description = LocalizedText("{}");'.format(obj.desc))
350  self.writecode(indent, 'attrs.DisplayName = LocalizedText("{}");'.format(obj.displayname))
351  if obj. inversename: self.writecode(indent, 'attrs.InverseName = LocalizedText("{}");'.format(obj.inversename))
352  if obj.abstract: self.writecode(indent, 'attrs.IsAbstract = {};'.format(obj.abstract))
353  if obj.symmetric: self.writecode(indent, 'attrs.Symmetric = {};'.format(obj.symmetric))
354  self.writecode(indent, 'node.Attributes = attrs;')
355  self.writecode(indent, 'registry.AddNodes(std::vector<AddNodesItem> {node});')
356  self.make_refs_code(obj, indent)
357  self.writecode("}")
358 
359  def make_datatype_code(self, obj):
360  indent = " "
361  self.writecode("")
362  self.writecode('static void create_{}(OpcUa::NodeManagementServices & registry)'.format(obj.nodeid[2:]))
363  self.writecode("{")
364  self.make_node_code(obj, indent)
365  self.writecode(indent, 'DataTypeAttributes attrs;')
366  if obj.desc: self.writecode(indent, u'attrs.Description = LocalizedText("{}");'.format(obj.desc))
367  self.writecode(indent, 'attrs.DisplayName = LocalizedText("{}");'.format(obj.displayname))
368  if obj.abstract: self.writecode(indent, 'attrs.IsAbstract = {};'.format(obj.abstract))
369  self.writecode(indent, 'node.Attributes = attrs;')
370  self.writecode(indent, 'registry.AddNodes(std::vector<AddNodesItem> {node});')
371  self.make_refs_code(obj, indent)
372  self.writecode("}")
373 
374  def make_refs_code(self, obj, indent):
375  if not obj.refs:
376  return
377  self.writecode(indent, "std::vector<AddReferencesItem> refs;")
378  for ref in obj.refs:
379  self.writecode(indent, "{")
380  localIndent = indent + " "
381  self.writecode(localIndent, 'AddReferencesItem ref;')
382  self.writecode(localIndent, 'ref.IsForward = true;')
383  self.writecode(localIndent, 'ref.ReferenceTypeId = {};'.format(self.to_ref_type(ref.reftype)))
384  self.writecode(localIndent, 'ref.SourceNodeId = ToNodeId("{}");'.format(obj.nodeid))
385  self.writecode(localIndent, 'ref.TargetNodeClass = NodeClass::DataType;')
386  self.writecode(localIndent, 'ref.TargetNodeId = ToNodeId("{}");'.format(ref.target))
387  self.writecode(localIndent, "refs.push_back(ref);")
388  self.writecode(indent, "}")
389  self.writecode(indent, 'registry.AddReferences(refs);')
390 
391 
392 if __name__ == "__main__":
393  if len(sys.argv) == 2 and sys.argv[1] == "all":
394  for i in (3, 4, 5, 8, 9, 10, 11, 13):
395  xmlpath = "Opc.Ua.NodeSet2.Part{}.xml".format(str(i))
396  cpppath = "../src/server/standard_address_space_part{}.cpp".format(str(i))
397  c = CodeGenerator(xmlpath, cpppath)
398  c.run()
399 
400 
401  elif len(sys.argv) != 3:
402  print(sys.argv)
403  print("usage: generate_address_space.py xml_input_file cpp_output_file")
404  print(" or generate_address_space.py all")
405  sys.exit(1)
406  else:
407  xmlpath = sys.argv[1]
408  cpppath = sys.argv[2]
409  c = CodeGenerator(xmlpath, cpppath)
410  c.run()
411 
FMT_API void print(std::FILE *f, CStringRef format_str, ArgList args)
Definition: format.cc:873
std::string format(CStringRef format_str, ArgList args)
Definition: format.h:3686
def __init__(self, input_path, output_path)


ros_opcua_impl_freeopcua
Author(s): Denis Štogl
autogenerated on Tue Jan 19 2021 03:06:05