10 from __future__
import division, absolute_import, print_function, unicode_literals
15 from io
import StringIO
18 from StringIO
import StringIO
23 if isinstance(obj, PrimitiveValue):
27 if isinstance(obj, CompoundValue):
29 for field_name, field
in pyuavcan_v0.get_fields(obj).items():
30 if pyuavcan_v0.is_union(obj)
and pyuavcan_v0.get_active_union_field(obj) != field_name:
32 if isinstance(field, VoidValue):
39 elif isinstance(obj, ArrayValue):
40 t = pyuavcan_v0.get_uavcan_data_type(obj)
41 if t.value_type.category == t.value_type.CATEGORY_PRIMITIVE:
42 def is_nice_character(ch):
43 if ch.is_printable()
or ch.isspace():
50 if t.is_string_like
and all(map(is_nice_character, obj)):
61 elif isinstance(obj, float):
63 elif isinstance(obj, bool):
65 elif isinstance(obj, int):
69 elif isinstance(obj, VoidValue):
74 raise ValueError(
'Cannot generate JSON-compatible object representation for %r' % type(obj))
79 This function returns a representation of a UAVCAN structure (message, request, or response), or
80 a DSDL entity (array or primitive), or a UAVCAN transfer, as a structure easily able to be
81 transformed into json or json-like serialization
83 obj: Object to convert.
85 Returns: structure which can easily be transformed into a json-like serialization
87 if not isinstance(obj, CompoundValue)
and hasattr(obj,
'transfer'):
89 if hasattr(obj,
'message'):
91 output[
'transfer_type'] =
'Message'
92 elif hasattr(obj,
'request'):
94 output[
'transfer_type'] =
'Request'
95 elif hasattr(obj,
'response'):
96 payload = obj.response
97 output[
'transfer_type'] =
'Response'
99 raise ValueError(
'Cannot generate JSON-compatible object representation for %r' % type(obj))
101 output[
'source_node_id'] = obj.transfer.source_node_id
102 output[
'dest_node_id'] = obj.transfer.dest_node_id
103 output[
'ts_monotonic'] = obj.transfer.ts_monotonic
104 output[
'ts_real'] = obj.transfer.ts_real
105 output[
'transfer_priority'] = obj.transfer.transfer_priority
106 output[
'datatype'] =
'{}'.
format(payload._type)
114 def _to_yaml_impl(obj, indent_level=0, parent=None, name=None, uavcan_type=None):
117 def write(fmt, *args):
118 buf.write((fmt % args)
if len(args)
else fmt)
120 def indent_newline():
121 buf.write(os.linesep +
' ' * 2 * indent_level)
124 if isinstance(obj, PrimitiveValue):
125 uavcan_type = pyuavcan_v0.get_uavcan_data_type(obj)
129 if isinstance(obj, CompoundValue):
133 for field_name, field
in pyuavcan_v0.get_fields(obj).items():
134 if pyuavcan_v0.is_union(obj)
and pyuavcan_v0.get_active_union_field(obj) != field_name:
136 if isinstance(field, VoidValue):
138 if (first_field
and indent_level > 0)
or not first_field:
141 rendered_field =
_to_yaml_impl(field, indent_level=indent_level + 1, parent=obj, name=field_name)
142 write(
'%s: %s', field_name, rendered_field)
145 if first_field
and not pyuavcan_v0.is_union(obj):
151 elif isinstance(obj, ArrayValue):
152 t = pyuavcan_v0.get_uavcan_data_type(obj)
153 if t.value_type.category == t.value_type.CATEGORY_PRIMITIVE:
154 def is_nice_character(ch):
161 as_bytes =
'[%s]' %
', '.join([
_to_yaml_impl(x, indent_level=indent_level + 1, uavcan_type=t.value_type)
163 if t.is_string_like
and all(map(is_nice_character, obj)):
164 write(
'%r # ', obj.decode())
172 write(
'- %s',
_to_yaml_impl(x, indent_level=indent_level + 1, uavcan_type=t.value_type))
175 elif isinstance(obj, float):
176 assert uavcan_type
is not None
181 }[uavcan_type.bitlen]
182 write(float_fmt, obj)
183 elif isinstance(obj, bool):
184 write(
'%s',
'true' if obj
else 'false')
185 elif isinstance(obj, int):
187 if parent
is not None and name
is not None:
189 if isinstance(resolved_name, str):
190 write(
' # %s', resolved_name)
193 elif isinstance(obj, VoidValue):
198 raise ValueError(
'Cannot generate YAML representation for %r' % type(obj))
200 return buf.getvalue()
205 This function returns correct YAML representation of a UAVCAN structure (message, request, or response), or
206 a DSDL entity (array or primitive), or a UAVCAN transfer, with comments for human benefit.
208 obj: Object to convert.
210 Returns: Unicode string containing YAML representation of the object.
212 if not isinstance(obj, CompoundValue)
and hasattr(obj,
'transfer'):
213 if hasattr(obj,
'message'):
214 payload = obj.message
216 elif hasattr(obj,
'request'):
217 payload = obj.request
219 elif hasattr(obj,
'response'):
220 payload = obj.response
223 raise ValueError(
'Cannot generate YAML representation for %r' % type(obj))
225 prefix =
'### %s from %s to %s ts_mono=%.6f ts_real=%.6f\n' % \
227 obj.transfer.source_node_id
or 'Anon',
228 obj.transfer.dest_node_id
or 'All',
229 obj.transfer.ts_monotonic, obj.transfer.ts_real)
238 This function accepts a UAVCAN struct (message, request, or response), and a field name; and returns
239 the name of constant or bit mask that match the value. If no match could be established, the literal
240 value will be returned as is.
242 struct: UAVCAN struct to work with
243 field_name: Name of the field to work with
244 keep_literal: Whether to include the input integer value in the output string
246 Returns: Name of the constant or flags if match could be detected, otherwise integer as is.
249 uavcan_type = pyuavcan_v0.get_uavcan_data_type(struct)
250 if pyuavcan_v0.is_request(struct):
251 consts = uavcan_type.request_constants
252 fields = uavcan_type.request_fields
253 elif pyuavcan_v0.is_response(struct):
254 consts = uavcan_type.response_constants
255 fields = uavcan_type.response_fields
257 consts = uavcan_type.constants
258 fields = uavcan_type.fields
260 assert len(fields) > 0
263 def format_output(name, value, remove_common_prefix):
264 if remove_common_prefix:
265 num_seps = len(field_name.split(
'_'))
266 parts = name.split(
'_')[num_seps:]
267 name =
'_'.join(parts)
268 return (
'%s (%r)' % (name, value))
if keep_literal
else name
271 def match_one_prefix(prefix, value):
273 for cname, cval
in [(x.name, x.value)
for x
in consts
if x.name.lower().startswith(prefix.lower())]:
275 matches.append(cname)
277 if len(matches) == 1:
281 def match_value(value):
283 match = match_one_prefix(field_name +
'_', value)
285 return format_output(match, value,
True)
289 if field_name[-1] ==
's':
290 match = match_one_prefix(field_name[:-1] +
'_', value)
292 return format_output(match, value,
True)
296 match = match_one_prefix(
'', value)
298 return format_output(match, value,
False)
301 value = getattr(struct, field_name)
302 match = match_value(value)
307 def extract_powers_of_2(x):
315 for pow2
in extract_powers_of_2(value):
316 match = match_value(pow2)
318 matches.append(match)
323 return ' | '.join(matches)
329 if __name__ ==
'__main__':
331 print(
to_yaml(pyuavcan_v0.protocol.NodeStatus()))
333 info = pyuavcan_v0.protocol.GetNodeInfo.Response(name=
'legion')
334 info.hardware_version.certificate_of_authenticity = b
'\x01\x02\x03\xff'
337 lights = pyuavcan_v0.equipment.indication.LightsCommand()
338 lcmd = pyuavcan_v0.equipment.indication.SingleLightCommand(light_id=123)
342 lights.commands.append(lcmd)
344 lights.commands.append(lcmd)
347 print(
to_yaml(pyuavcan_v0.equipment.power.BatteryInfo()))
348 print(
to_yaml(pyuavcan_v0.protocol.param.Empty()))
350 getset = pyuavcan_v0.protocol.param.GetSet.Response()
352 pyuavcan_v0.switch_union_field(getset.value,
'empty')
357 pyuavcan_v0.protocol.NodeStatus(mode=pyuavcan_v0.protocol.NodeStatus().MODE_OPERATIONAL),
361 pyuavcan_v0.protocol.NodeStatus(mode=pyuavcan_v0.protocol.NodeStatus().HEALTH_OK),
365 pyuavcan_v0.equipment.range_sensor.Measurement(reading_type=pyuavcan_v0.equipment.range_sensor.Measurement()
366 .READING_TYPE_TOO_FAR),
370 pyuavcan_v0.protocol.param.ExecuteOpcode.Request(opcode=pyuavcan_v0.protocol.param.ExecuteOpcode.Request().OPCODE_ERASE),
374 pyuavcan_v0.protocol.file.Error(value=pyuavcan_v0.protocol.file.Error().ACCESS_DENIED),
378 pyuavcan_v0.equipment.power.BatteryInfo(status_flags=
379 pyuavcan_v0.equipment.power.BatteryInfo().STATUS_FLAG_NEED_SERVICE),
383 pyuavcan_v0.equipment.power.BatteryInfo(status_flags=
384 pyuavcan_v0.equipment.power.BatteryInfo().STATUS_FLAG_NEED_SERVICE |
385 pyuavcan_v0.equipment.power.BatteryInfo().STATUS_FLAG_TEMP_HOT |
386 pyuavcan_v0.equipment.power.BatteryInfo().STATUS_FLAG_CHARGED),
390 pyuavcan_v0.protocol.AccessCommandShell.Response(flags=
391 pyuavcan_v0.protocol.AccessCommandShell.Response().FLAG_SHELL_ERROR |
392 pyuavcan_v0.protocol.AccessCommandShell.Response().
393 FLAG_HAS_PENDING_STDOUT),
398 node = pyuavcan_v0.make_node(
'vcan0', node_id=42)
399 node.request(pyuavcan_v0.protocol.GetNodeInfo.Request(), 100,
lambda e: print(
to_yaml(e)))
400 node.add_handler(pyuavcan_v0.protocol.NodeStatus,
lambda e: print(
to_yaml(e)))