generate_address_space.py
Go to the documentation of this file.
1 """
2 Generate address space Python code from an XML file specification.
3 xmlparser.py is required. It lives in the opcua folder, but to avoid importing the whole package, developers can link xmlparser.py into the current directory.
4 """
5 import sys
6 import logging
7 # sys.path.insert(0, "..") # load local freeopcua implementation
8 #from opcua import xmlparser
9 import xmlparser
10 
11 
def _to_val(objs, attr, val):
    """Return the Python source text for attribute *attr* of a ua structure.

    objs -- list of names: the first is looked up as a class on the ``ua``
            module, each following one is resolved through the ``ua_types``
            mapping of the class found so far.
    attr -- name of the attribute whose declared ua type selects the
            formatting applied to *val*.
    val  -- raw value to render.

    When the resolved class is ``ua.NodeId`` the result is a
    ``ua.NodeId.from_string(...)`` expression built from *val*; otherwise the
    value is rendered by ``ua_type_to_python`` according to
    ``cls.ua_types[attr]``.
    """
    # Imported lazily so that loading this module does not pull in opcua.
    from opcua import ua
    print("got ", objs, attr, val)
    cls = getattr(ua, objs[0])
    for o in objs[1:]:
        cls = getattr(ua, cls.ua_types[o])
    if cls == ua.NodeId:
        # The value itself must be interpolated; the previous code emitted
        # the literal text 'val' into the generated source.
        return "ua.NodeId.from_string('{0}')".format(val)
    return ua_type_to_python(val, cls.ua_types[attr])
21 
22 
def ua_type_to_python(val, uatype):
    """Render *val* as Python source text according to the ua type name.

    val    -- value to render.
    uatype -- ua type name.

    Returns a single-quoted string literal for ``"String"``, a ``b'...'``
    literal for the byte types, and *val* unchanged for every other type.
    """
    # Exact comparison: ``uatype in ("String")`` was a substring test against
    # the plain string "String" (parentheses alone do not make a tuple), so
    # names such as "S" or "" matched and a non-string uatype raised TypeError.
    if uatype == "String":
        return "'{0}'".format(val)
    if uatype in ("Bytes", "ByteString", "ByteArray"):
        return "b'{0}'".format(val)
    return val
30 
31 
class CodeGenerator(object):
    """Translate a NodeSet2 XML file into a Python module.

    The generated module defines ``create_standard_address_space_<part>(server)``
    whose body re-creates each parsed node with ``server.add_nodes`` and each
    of its references with ``server.add_references``.

    NOTE(review): node texts (descriptions, display names, values) are
    interpolated into the generated source without escaping, so a text
    containing a double quote or a backslash would yield broken or altered
    generated code. Whether the parser already sanitises them is not visible
    from this file.
    """

    # Prefix passed to writecode() for every statement of the generated
    # function body; writecode() inserts one more space when joining.
    _INDENT = " "

    # Lines of the generated file header, joined with newlines. The last,
    # whitespace-only entry follows the generated ``def`` line.
    _HEADER_LINES = (
        '',
        '"""',
        'DO NOT EDIT THIS FILE!',
        'It is automatically generated from opcfoundation.org schemas.',
        '"""',
        '',
        'from opcua import ua',
        '',
        '',
        'def create_standard_address_space_{0!s}(server):',
        ' ',
    )

    # Node type reported by the parser -> name of the method emitting its code.
    _HANDLERS = {
        'UAObject': 'make_object_code',
        'UAObjectType': 'make_object_type_code',
        'UAVariable': 'make_variable_code',
        'UAVariableType': 'make_variable_type_code',
        'UAReferenceType': 'make_reference_code',
        'UADataType': 'make_datatype_code',
        'UAMethod': 'make_method_code',
    }

    def __init__(self, input_path, output_path):
        """Remember the XML input path and the Python output path.

        ``part`` is the second-to-last dot-separated component of
        *input_path* and becomes the suffix of the generated function name.
        """
        self.input_path = input_path
        self.output_path = output_path
        self.output_file = None
        self.part = self.input_path.split(".")[-2]
        self.parser = None

    def run(self):
        """Parse the XML input and write the generated module to output_path.

        Node types without a handler are reported on stderr and skipped.
        """
        sys.stderr.write("Generating Python code {0} for XML file {1}".format(self.output_path, self.input_path) + "\n")
        # The context manager closes the file even when parsing or code
        # generation raises; previously the handle leaked in that case.
        with open(self.output_path, "w") as output_file:
            self.output_file = output_file
            self.make_header()
            self.parser = xmlparser.XMLParser(self.input_path, None)
            for node in self.parser:
                handler_name = self._HANDLERS.get(node.nodetype)
                if handler_name is None:
                    sys.stderr.write("Not implemented node type: " + node.nodetype + "\n")
                else:
                    getattr(self, handler_name)(node)

    def writecode(self, *args):
        """Write the string arguments, joined by single spaces, as one line."""
        self.output_file.write(" ".join(args) + "\n")

    def make_header(self):
        """Write the module docstring, the ua import and the function header."""
        self.writecode("\n".join(self._HEADER_LINES).format(self.part))

    def make_node_code(self, obj, indent):
        """Emit the ``ua.AddNodesItem`` setup shared by every node type."""
        self.writecode(indent, 'node = ua.AddNodesItem()')
        self.writecode(indent, 'node.RequestedNewNodeId = ua.NodeId.from_string("{0}")'.format(obj.nodeid))
        self.writecode(indent, 'node.BrowseName = ua.QualifiedName.from_string("{0}")'.format(obj.browsename))
        # nodetype is e.g. "UAObject"; dropping the first two characters
        # leaves the ua.NodeClass member name.
        self.writecode(indent, 'node.NodeClass = ua.NodeClass.{0}'.format(obj.nodetype[2:]))
        if obj.parent:
            self.writecode(indent, 'node.ParentNodeId = ua.NodeId.from_string("{0}")'.format(obj.parent))
            self.writecode(indent, 'node.ReferenceTypeId = {0}'.format(self.to_ref_type(obj.parentlink)))
        if obj.typedef:
            self.writecode(indent, 'node.TypeDefinition = ua.NodeId.from_string("{0}")'.format(obj.typedef))

    def to_data_type(self, nodeid):
        """Return the source expression for a data type node id.

        An empty id falls back to ``ua.ObjectIds.String``; an id containing
        "=" is parsed with ``NodeId.from_string``; anything else is used as
        an ``ua.ObjectIds`` member name.
        """
        if not nodeid:
            return "ua.NodeId(ua.ObjectIds.String)"
        if "=" in nodeid:
            return 'ua.NodeId.from_string("{0}")'.format(nodeid)
        return 'ua.NodeId(ua.ObjectIds.{0})'.format(nodeid)

    def to_ref_type(self, nodeid):
        """Return the source expression for a reference type node id.

        An id without "=" is first looked up in the parser's alias table.
        """
        if "=" not in nodeid:
            nodeid = self.parser.aliases[nodeid]
        return 'ua.NodeId.from_string("{0}")'.format(nodeid)

    def _start_node(self, obj, attrs_class):
        """Emit the separator line, the node setup and the attrs constructor.

        Returns the indent to use for the remaining statements of the node.
        """
        indent = self._INDENT
        self.writecode(indent)
        self.make_node_code(obj, indent)
        self.writecode(indent, 'attrs = ua.{0}()'.format(attrs_class))
        return indent

    def _write_texts(self, obj, indent):
        """Emit the optional Description and the DisplayName assignments."""
        if obj.desc:
            self.writecode(indent, 'attrs.Description = ua.LocalizedText("{0}")'.format(obj.desc))
        self.writecode(indent, 'attrs.DisplayName = ua.LocalizedText("{0}")'.format(obj.displayname))

    def _finish_node(self, obj, indent):
        """Emit the add_nodes call followed by the node's references."""
        self.writecode(indent, 'node.NodeAttributes = attrs')
        self.writecode(indent, 'server.add_nodes([node])')
        self.make_refs_code(obj, indent)

    def make_object_code(self, obj):
        """Emit the code creating a UAObject node."""
        indent = self._start_node(obj, 'ObjectAttributes')
        self._write_texts(obj, indent)
        self.writecode(indent, 'attrs.EventNotifier = {0}'.format(obj.eventnotifier))
        self._finish_node(obj, indent)

    def make_object_type_code(self, obj):
        """Emit the code creating a UAObjectType node."""
        indent = self._start_node(obj, 'ObjectTypeAttributes')
        self._write_texts(obj, indent)
        self.writecode(indent, 'attrs.IsAbstract = {0}'.format(obj.abstract))
        self._finish_node(obj, indent)

    def make_common_variable_code(self, indent, obj):
        """Emit the attributes shared by variables and variable types.

        Covers texts, data type, value (plain, extension object or list of
        extension objects), value rank, access levels and array dimensions.
        """
        self._write_texts(obj, indent)
        self.writecode(indent, 'attrs.DataType = {0}'.format(self.to_data_type(obj.datatype)))
        if obj.value is not None:
            if obj.valuetype == "ListOfExtensionObject":
                self.writecode(indent, 'value = []')
                for ext in obj.value:
                    self.make_ext_obj_code(indent, ext)
                    self.writecode(indent, 'value.append(extobj)')
                self.writecode(indent, 'attrs.Value = ua.Variant(value, ua.VariantType.ExtensionObject)')
            elif obj.valuetype == "ExtensionObject":
                self.make_ext_obj_code(indent, obj.value)
                self.writecode(indent, 'value = extobj')
                self.writecode(indent, 'attrs.Value = ua.Variant(value, ua.VariantType.ExtensionObject)')
            else:
                # Work on a local copy: the node handed in by the caller is
                # no longer modified as a side effect of code generation.
                valuetype = obj.valuetype
                if valuetype.startswith("ListOf"):
                    valuetype = valuetype[6:]
                self.writecode(indent, 'attrs.Value = ua.Variant({0}, ua.VariantType.{1})'.format(self.to_value(obj.value), valuetype))
        if obj.rank:
            self.writecode(indent, 'attrs.ValueRank = {0}'.format(obj.rank))
        if obj.accesslevel:
            self.writecode(indent, 'attrs.AccessLevel = {0}'.format(obj.accesslevel))
        if obj.useraccesslevel:
            self.writecode(indent, 'attrs.UserAccessLevel = {0}'.format(obj.useraccesslevel))
        if obj.dimensions:
            self.writecode(indent, 'attrs.ArrayDimensions = {0}'.format(obj.dimensions))

    def make_ext_obj_code(self, indent, extobj):
        """Emit the code building one extension object into ``extobj``.

        String fields are rendered through ``_to_val``; mapping fields are
        expanded one level into ``extobj.<field>.<subfield>`` assignments.
        """
        print("makeing code for ", extobj.objname)
        self.writecode(indent, 'extobj = ua.{0}()'.format(extobj.objname))
        for fields in extobj.body.values():
            for key, raw in fields.items():
                if isinstance(raw, str):
                    rendered = _to_val([extobj.objname], key, raw)
                    self.writecode(indent, 'extobj.{0} = {1}'.format(key, rendered))
                elif key == "DataType":  # hack for strange nodeid xml format
                    self.writecode(indent, 'extobj.{0} = ua.NodeId.from_string("{1}")'.format(key, raw["Identifier"]))
                else:
                    for subkey, subraw in raw.items():
                        rendered = _to_val([extobj.objname, key], subkey, subraw)
                        self.writecode(indent, 'extobj.{0}.{1} = {2}'.format(key, subkey, rendered))

    def make_variable_code(self, obj):
        """Emit the code creating a UAVariable node."""
        indent = self._start_node(obj, 'VariableAttributes')
        if obj.minsample:
            self.writecode(indent, 'attrs.MinimumSamplingInterval = {0}'.format(obj.minsample))
        self.make_common_variable_code(indent, obj)
        self._finish_node(obj, indent)

    def make_variable_type_code(self, obj):
        """Emit the code creating a UAVariableType node.

        The texts are emitted here and once more by
        make_common_variable_code; the repeated assignments are harmless and
        kept so that the generated output stays unchanged.
        """
        indent = self._start_node(obj, 'VariableTypeAttributes')
        self._write_texts(obj, indent)
        if obj.abstract:
            self.writecode(indent, 'attrs.IsAbstract = {0}'.format(obj.abstract))
        self.make_common_variable_code(indent, obj)
        self._finish_node(obj, indent)

    def to_value(self, val):
        """Return *val* wrapped in double quotes when it is a str, else as is."""
        if isinstance(val, str):
            return '"' + val + '"'
        return val

    def make_method_code(self, obj):
        """Emit the code creating a UAMethod node."""
        indent = self._start_node(obj, 'MethodAttributes')
        self._write_texts(obj, indent)
        self._finish_node(obj, indent)

    def make_reference_code(self, obj):
        """Emit the code creating a UAReferenceType node."""
        indent = self._start_node(obj, 'ReferenceTypeAttributes')
        self._write_texts(obj, indent)
        if obj.inversename:
            self.writecode(indent, 'attrs.InverseName = ua.LocalizedText("{0}")'.format(obj.inversename))
        if obj.abstract:
            self.writecode(indent, 'attrs.IsAbstract = {0}'.format(obj.abstract))
        if obj.symmetric:
            self.writecode(indent, 'attrs.Symmetric = {0}'.format(obj.symmetric))
        self._finish_node(obj, indent)

    def make_datatype_code(self, obj):
        """Emit the code creating a UADataType node.

        The description is reduced to ASCII, with "?" replacing every other
        character.
        """
        indent = self._start_node(obj, 'DataTypeAttributes')
        if obj.desc:
            # Decode back to text: formatting the bytes returned by encode()
            # would put "b'...'" into the generated string on Python 3.
            ascii_desc = obj.desc.encode('ascii', 'replace').decode('ascii')
            self.writecode(indent, u'attrs.Description = ua.LocalizedText("{0}")'.format(ascii_desc))
        self.writecode(indent, 'attrs.DisplayName = ua.LocalizedText("{0}")'.format(obj.displayname))
        if obj.abstract:
            self.writecode(indent, 'attrs.IsAbstract = {0}'.format(obj.abstract))
        self._finish_node(obj, indent)

    def make_refs_code(self, obj, indent):
        """Emit one AddReferencesItem per reference of *obj*, then add them.

        Nothing is written when the node has no references.
        """
        if not obj.refs:
            return
        self.writecode(indent, "refs = []")
        for ref in obj.refs:
            self.writecode(indent, 'ref = ua.AddReferencesItem()')
            self.writecode(indent, 'ref.IsForward = True')
            self.writecode(indent, 'ref.ReferenceTypeId = {0}'.format(self.to_ref_type(ref.reftype)))
            self.writecode(indent, 'ref.SourceNodeId = ua.NodeId.from_string("{0}")'.format(obj.nodeid))
            self.writecode(indent, 'ref.TargetNodeClass = ua.NodeClass.DataType')
            self.writecode(indent, 'ref.TargetNodeId = ua.NodeId.from_string("{0}")'.format(ref.target))
            self.writecode(indent, "refs.append(ref)")
        self.writecode(indent, 'server.add_references(refs)')
268 
269 
271  import os.path
272  path = os.path.join("..", "opcua", "binary_address_space.pickle")
273  print("Savind standard address space to:", path)
274  sys.path.append("..")
275  from opcua.server.standard_address_space import standard_address_space
276  from opcua.server.address_space import NodeManagementService, AddressSpace
277  aspace = AddressSpace()
278  standard_address_space.fill_address_space(NodeManagementService(aspace))
279  aspace.dump(path)
280 
if __name__ == "__main__":
    # Regenerate the address-space module of every listed NodeSet2 part.
    logging.basicConfig(level=logging.WARN)
    xml_template = "Opc.Ua.NodeSet2.Part{0}.xml"
    module_template = "../opcua/server/standard_address_space/standard_address_space_part{0}.py"
    for part_number in (3, 4, 5, 8, 9, 10, 11, 13):
        generator = CodeGenerator(
            xml_template.format(part_number),
            module_template.format(part_number),
        )
        generator.run()
287  c.run()
288 
def ua_type_to_python(val, uatype)
def __init__(self, input_path, output_path)
def _to_val(objs, attr, val)


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Tue Jan 19 2021 03:12:43