introspect.py
Go to the documentation of this file.
1 #
2 # Copyright (C) 2014-2016 UAVCAN Development Team <uavcan.org>
3 #
4 # This software is distributed under the terms of the MIT License.
5 #
6 # Author: Pavel Kirienko <pavel.kirienko@zubax.com>
7 # Ben Dyer <ben_dyer@mac.com>
8 #
9 
10 from __future__ import division, absolute_import, print_function, unicode_literals
11 import os
12 import pyuavcan_v0
13 from pyuavcan_v0.transport import CompoundValue, PrimitiveValue, ArrayValue, VoidValue
14 try:
15  from io import StringIO
16 except ImportError:
17  # noinspection PyUnresolvedReferences
18  from StringIO import StringIO
19 
20 
def _to_json_compatible_object_impl(obj):
    """
    Converts a DSDL entity into a structure built only from dicts, lists, strings and
    plain numbers/booleans, so that it can be handed to a JSON-like serializer.
    Args:
        obj:    PrimitiveValue, CompoundValue, ArrayValue, VoidValue, or a plain float/bool/int.

    Returns: dict for compound values (inactive union fields and void fields are omitted),
             str for string-like arrays made only of nice characters, list for other arrays,
             the value itself for float/bool/int, None for VoidValue.

    Raises: ValueError if the object is of none of the supported kinds.
    """
    # Decomposing PrimitiveValue to value and type. This is ugly but it's by design...
    if isinstance(obj, PrimitiveValue):
        obj = obj.value

    # CompoundValue
    if isinstance(obj, CompoundValue):
        output = dict()
        for field_name, field in pyuavcan_v0.get_fields(obj).items():
            # Only the active field of a union is rendered
            if pyuavcan_v0.is_union(obj) and pyuavcan_v0.get_active_union_field(obj) != field_name:
                continue
            if isinstance(field, VoidValue):
                continue

            output[field_name] = to_json_compatible_object(field)
        return output

    # ArrayValue
    elif isinstance(obj, ArrayValue):
        t = pyuavcan_v0.get_uavcan_data_type(obj)
        if t.value_type.category == t.value_type.CATEGORY_PRIMITIVE:
            def is_nice_character(ch):
                # Same rule as the YAML renderer in this module: elements are compared as integer
                # character codes, so str-only methods cannot be used here.
                # Printable ASCII, or tab (9) / line feed (10) / carriage return (13).
                return 32 <= ch <= 126 or ch in (9, 10, 13)

            # Catch a string masquerading as an array
            if t.is_string_like and all(map(is_nice_character, obj)):
                return obj.decode()

        # Return the array!
        return [to_json_compatible_object(x) for x in obj]

    # Primitive types
    elif isinstance(obj, (float, bool, int)):
        return obj

    # Non-printable types
    elif isinstance(obj, VoidValue):
        return None

    # Unknown types
    else:
        raise ValueError('Cannot generate JSON-compatible object representation for %r' % type(obj))
75 
76 
def to_json_compatible_object(obj):
    """
    This function returns a representation of a UAVCAN structure (message, request, or response), or
    a DSDL entity (array or primitive), or a UAVCAN transfer, as a structure easily able to be
    transformed into json or json-like serialization
    Args:
        obj:    Object to convert.

    Returns: structure which can easily be transformed into a json-like serialization

    Raises: ValueError if a transfer-like object carries none of message/request/response,
            or if the object cannot be represented.
    """
    # Anything that is not a CompoundValue but exposes 'transfer' is treated as a transfer event
    if not isinstance(obj, CompoundValue) and hasattr(obj, 'transfer'):
        output = dict()
        if hasattr(obj, 'message'):
            payload = obj.message
            output['transfer_type'] = 'Message'
        elif hasattr(obj, 'request'):
            payload = obj.request
            output['transfer_type'] = 'Request'
        elif hasattr(obj, 'response'):
            payload = obj.response
            output['transfer_type'] = 'Response'
        else:
            raise ValueError('Cannot generate JSON-compatible object representation for %r' % type(obj))

        # Transfer metadata is copied verbatim next to the converted payload
        output['source_node_id'] = obj.transfer.source_node_id
        output['dest_node_id'] = obj.transfer.dest_node_id
        output['ts_monotonic'] = obj.transfer.ts_monotonic
        output['ts_real'] = obj.transfer.ts_real
        output['transfer_priority'] = obj.transfer.transfer_priority
        output['datatype'] = '{}'.format(payload._type)
        output['fields'] = _to_json_compatible_object_impl(payload)

        return output
    else:
        # Plain structures and DSDL entities are converted directly (same split as in to_yaml())
        return _to_json_compatible_object_impl(obj)
112 
113 
def _to_yaml_impl(obj, indent_level=0, parent=None, name=None, uavcan_type=None):
    """
    Renders one DSDL entity as a YAML fragment; nested entities are rendered recursively.
    Args:
        obj:            PrimitiveValue, CompoundValue, ArrayValue, VoidValue, or a plain float/bool/int.
        indent_level:   Nesting depth; every new line is indented by two spaces per level.
        parent:         Structure that contains obj, used to annotate integers with constant names.
        name:           Field name of obj inside parent.
        uavcan_type:    DSDL type of a plain value; its bitlen selects the float precision.

    Returns: String containing the rendered fragment (empty for VoidValue).

    Raises: ValueError if the object is of none of the supported kinds.
    """
    out = StringIO()
    child_level = indent_level + 1

    def emit(fmt, *args):
        # Without arguments the text goes out verbatim, so '%' in it is never interpreted
        out.write(fmt % args if args else fmt)

    def start_new_line():
        out.write(os.linesep + ' ' * 2 * indent_level)

    # A PrimitiveValue is unwrapped; its own DSDL type overrides the one passed in
    if isinstance(obj, PrimitiveValue):
        uavcan_type = pyuavcan_v0.get_uavcan_data_type(obj)
        obj = obj.value

    if isinstance(obj, CompoundValue):
        nothing_rendered = True

        # Rendering all fields that can be rendered
        for field_name, field in pyuavcan_v0.get_fields(obj).items():
            inactive = pyuavcan_v0.is_union(obj) and pyuavcan_v0.get_active_union_field(obj) != field_name
            if inactive or isinstance(field, VoidValue):
                continue
            # The very first field of the top-level structure stays on the current line
            if indent_level > 0 or not nothing_rendered:
                start_new_line()
            nothing_rendered = False
            rendered = _to_yaml_impl(field, indent_level=child_level, parent=obj, name=field_name)
            emit('%s: %s', field_name, rendered)

        # An empty non-union structure is shown as an empty map
        if nothing_rendered and not pyuavcan_v0.is_union(obj):
            if indent_level > 0:
                start_new_line()
            emit('{}')

    elif isinstance(obj, ArrayValue):
        array_type = pyuavcan_v0.get_uavcan_data_type(obj)
        element_type = array_type.value_type

        if element_type.category == element_type.CATEGORY_PRIMITIVE:
            def is_nice_character(ch):
                return 32 <= ch <= 126 or ch in b'\n\r\t'

            rendered_items = [_to_yaml_impl(item, indent_level=child_level, uavcan_type=element_type)
                              for item in obj]
            # Readable strings are shown decoded first, followed by the raw list as a comment
            if array_type.is_string_like and all(map(is_nice_character, obj)):
                emit('%r # ', obj.decode())
            emit('[%s]' % ', '.join(rendered_items))
        elif len(obj) == 0:
            emit('[]')
        else:
            # Arrays of non-primitive items become a block sequence, one item per line
            for item in obj:
                start_new_line()
                emit('- %s', _to_yaml_impl(item, indent_level=child_level, uavcan_type=element_type))

    elif isinstance(obj, float):
        assert uavcan_type is not None
        format_by_bitlen = {16: '%.4f', 32: '%.6f', 64: '%.9f'}
        emit(format_by_bitlen[uavcan_type.bitlen], obj)

    elif isinstance(obj, bool):
        emit('true' if obj else 'false')

    elif isinstance(obj, int):
        emit('%s', obj)
        # When the enclosing structure is known, append the matching constant name as a comment
        if parent is not None and name is not None:
            resolved_name = value_to_constant_name(parent, name)
            if isinstance(resolved_name, str):
                emit(' # %s', resolved_name)

    elif isinstance(obj, VoidValue):
        # Void values produce no output
        pass

    else:
        raise ValueError('Cannot generate YAML representation for %r' % type(obj))

    return out.getvalue()
201 
202 
def to_yaml(obj):
    """
    Produces the YAML text for a UAVCAN structure (message, request, or response), a DSDL entity
    (array or primitive), or a UAVCAN transfer. For a transfer, a '###' header line with the
    transfer kind, node IDs and timestamps is put in front of the rendered payload.
    Args:
        obj:    Object to convert.

    Returns: Unicode string containing YAML representation of the object.

    Raises: ValueError if a transfer-like object carries none of message/request/response.
    """
    # Structures and DSDL entities are rendered directly, without a header
    if isinstance(obj, CompoundValue) or not hasattr(obj, 'transfer'):
        return _to_yaml_impl(obj)

    # The first payload attribute present decides the header label
    for attribute, header in (('message', 'Message'), ('request', 'Request'), ('response', 'Response')):
        if hasattr(obj, attribute):
            payload = getattr(obj, attribute)
            break
    else:
        raise ValueError('Cannot generate YAML representation for %r' % type(obj))

    # Falsy node IDs are shown as 'Anon' (source) and 'All' (destination)
    header_fields = (
        header,
        obj.transfer.source_node_id or 'Anon',
        obj.transfer.dest_node_id or 'All',
        obj.transfer.ts_monotonic,
        obj.transfer.ts_real,
    )
    prefix = '### %s from %s to %s ts_mono=%.6f ts_real=%.6f\n' % header_fields

    return prefix + _to_yaml_impl(payload)
234 
235 
def value_to_constant_name(struct, field_name, keep_literal=False):
    """
    Looks up the constant (or the combination of bit mask constants) of a UAVCAN struct whose
    value equals the current value of the given field.
    Args:
        struct:         UAVCAN struct (message, request, or response) to work with
        field_name:     Name of the field to work with
        keep_literal:   Whether to append the literal value, as '<name> (<value>)', to each name

    Returns: Constant name, or several names joined with ' | ' when every set bit of the value
             resolves to a constant; otherwise the field value as is.
    """
    # Picking the constant and field sets that belong to this kind of struct
    data_type = pyuavcan_v0.get_uavcan_data_type(struct)
    if pyuavcan_v0.is_request(struct):
        consts, fields = data_type.request_constants, data_type.request_fields
    elif pyuavcan_v0.is_response(struct):
        consts, fields = data_type.response_constants, data_type.response_fields
    else:
        consts, fields = data_type.constants, data_type.fields

    assert len(fields) > 0

    def describe(const_name, const_value, strip_field_prefix):
        if strip_field_prefix:
            # Drop as many leading '_'-separated parts as the field name itself consists of
            skipped = len(field_name.split('_'))
            const_name = '_'.join(const_name.split('_')[skipped:])
        if keep_literal:
            return '%s (%r)' % (const_name, const_value)
        return const_name

    def find_unique_constant(prefix, wanted):
        hits = [c.name for c in consts
                if c.name.lower().startswith(prefix.lower()) and c.value == wanted]
        # Anything other than exactly one hit is ambiguous and counts as no match
        return hits[0] if len(hits) == 1 else None

    def candidate_prefixes():
        # Yielded lazily so that later attempts are evaluated only after earlier ones failed
        yield field_name + '_', True
        # Field name without the plural 's'. This works for 'flags'.
        if field_name[-1] == 's':
            yield field_name[:-1] + '_', True
        # No prefix at all, only if the struct has a single field
        if len(fields) == 1:
            yield '', False

    def resolve(wanted):
        for prefix, strip_field_prefix in candidate_prefixes():
            found = find_unique_constant(prefix, wanted)
            if found:
                return describe(found, wanted, strip_field_prefix)
        return None

    # First attempt: the whole value is a single constant
    value = getattr(struct, field_name)
    whole = resolve(value)
    if whole:
        return whole

    # Second attempt: the value is a bit mask; every set bit must resolve to a constant
    flag_names = []
    bit = 1
    while bit <= value:
        if bit & value:
            flag = resolve(bit)
            if not flag:
                # One unresolved bit invalidates the whole bit mask interpretation
                flag_names = []
                break
            flag_names.append(flag)
        bit <<= 1
    if flag_names:
        return ' | '.join(flag_names)

    # No match could be found, returning the value as is
    return value
327 
328 
if __name__ == '__main__':
    # Manual demo: prints the renderings of a few standard types so they can be inspected by eye.

    # to_yaml()
    print(to_yaml(pyuavcan_v0.protocol.NodeStatus()))

    info = pyuavcan_v0.protocol.GetNodeInfo.Response(name='legion')
    info.hardware_version.certificate_of_authenticity = b'\x01\x02\x03\xff'
    print(to_yaml(info))

    lights = pyuavcan_v0.equipment.indication.LightsCommand()
    lcmd = pyuavcan_v0.equipment.indication.SingleLightCommand(light_id=123)
    lcmd.color.red = 1
    lcmd.color.green = 2
    lcmd.color.blue = 3
    lights.commands.append(lcmd)
    lcmd.light_id += 1
    lights.commands.append(lcmd)
    print(to_yaml(lights))

    print(to_yaml(pyuavcan_v0.equipment.power.BatteryInfo()))
    print(to_yaml(pyuavcan_v0.protocol.param.Empty()))

    getset = pyuavcan_v0.protocol.param.GetSet.Response()
    print(to_yaml(getset))
    pyuavcan_v0.switch_union_field(getset.value, 'empty')
    print(to_yaml(getset))

    # value_to_constant_name()
    print(value_to_constant_name(
        pyuavcan_v0.protocol.NodeStatus(mode=pyuavcan_v0.protocol.NodeStatus().MODE_OPERATIONAL),
        'mode'
    ))
    # The field under test is 'health', so that is the field that gets initialized
    print(value_to_constant_name(
        pyuavcan_v0.protocol.NodeStatus(health=pyuavcan_v0.protocol.NodeStatus().HEALTH_OK),
        'health'
    ))
    print(value_to_constant_name(
        pyuavcan_v0.equipment.range_sensor.Measurement(
            reading_type=pyuavcan_v0.equipment.range_sensor.Measurement().READING_TYPE_TOO_FAR),
        'reading_type'
    ))
    print(value_to_constant_name(
        pyuavcan_v0.protocol.param.ExecuteOpcode.Request(
            opcode=pyuavcan_v0.protocol.param.ExecuteOpcode.Request().OPCODE_ERASE),
        'opcode'
    ))
    print(value_to_constant_name(
        pyuavcan_v0.protocol.file.Error(value=pyuavcan_v0.protocol.file.Error().ACCESS_DENIED),
        'value'
    ))
    # Single bit mask flag
    print(value_to_constant_name(
        pyuavcan_v0.equipment.power.BatteryInfo(
            status_flags=pyuavcan_v0.equipment.power.BatteryInfo().STATUS_FLAG_NEED_SERVICE),
        'status_flags'
    ))
    # Several bit mask flags combined
    print(value_to_constant_name(
        pyuavcan_v0.equipment.power.BatteryInfo(
            status_flags=pyuavcan_v0.equipment.power.BatteryInfo().STATUS_FLAG_NEED_SERVICE |
            pyuavcan_v0.equipment.power.BatteryInfo().STATUS_FLAG_TEMP_HOT |
            pyuavcan_v0.equipment.power.BatteryInfo().STATUS_FLAG_CHARGED),
        'status_flags'
    ))
    print(value_to_constant_name(
        pyuavcan_v0.protocol.AccessCommandShell.Response(
            flags=pyuavcan_v0.protocol.AccessCommandShell.Response().FLAG_SHELL_ERROR |
            pyuavcan_v0.protocol.AccessCommandShell.Response().FLAG_HAS_PENDING_STDOUT),
        'flags'
    ))

    # Printing transfers
    # NOTE(review): presumably needs a CAN interface named 'vcan0' to be available - confirm
    node = pyuavcan_v0.make_node('vcan0', node_id=42)
    node.request(pyuavcan_v0.protocol.GetNodeInfo.Request(), 100, lambda e: print(to_yaml(e)))
    node.add_handler(pyuavcan_v0.protocol.NodeStatus, lambda e: print(to_yaml(e)))
    node.spin()
pyuavcan_v0.introspect.value_to_constant_name
def value_to_constant_name(struct, field_name, keep_literal=False)
Definition: introspect.py:236
pyuavcan_v0.introspect.to_json_compatible_object
def to_json_compatible_object(obj)
Definition: introspect.py:77
pyuavcan_v0.introspect._to_json_compatible_object_impl
def _to_json_compatible_object_impl(obj)
Definition: introspect.py:21
pyuavcan_v0.introspect._to_yaml_impl
def _to_yaml_impl(obj, indent_level=0, parent=None, name=None, uavcan_type=None)
Definition: introspect.py:114
pyuavcan_v0.introspect.to_yaml
def to_yaml(obj)
Definition: introspect.py:203
pyuavcan_v0.transport
Definition: transport.py:1
test.format
format
Definition: dsdl/test.py:6


uavcan_communicator
Author(s):
autogenerated on Fri Dec 13 2024 03:10:02