driver/common.py
Go to the documentation of this file.
1 #
2 # Copyright (C) 2014-2016 UAVCAN Development Team <uavcan.org>
3 #
4 # This software is distributed under the terms of the MIT License.
5 #
6 # Author: Ben Dyer <ben_dyer@mac.com>
7 # Pavel Kirienko <pavel.kirienko@zubax.com>
8 #
9 
10 import time
11 import sys
12 from logging import getLogger
13 from .. import UAVCANException
14 
15 
16 logger = getLogger(__name__)
17 
18 
class DriverError(UAVCANException):
    """Error type defined by the driver layer; a subclass of UAVCANException."""
21 
22 
24  pass
25 
26 
class CANFrame(object):
    """
    A single CAN frame together with the timestamps associated with it.

    Attributes:
        id:             CAN identifier, exactly as passed in ``can_id``.
        data:           Frame payload; it is iterated element by element when the frame is formatted.
        extended:       Truthy for an extended identifier. When formatting, the ID is printed with
                        8 hex digits if this is set and with 3 hex digits otherwise.
        ts_monotonic:   Timestamp taken from the monotonic clock (time.monotonic()) unless supplied.
        ts_real:        Timestamp taken from the wall clock (time.time()) unless supplied.
    """

    # Payload width, in bytes, that the hex column produced by __str__ is padded to.
    MAX_DATA_LENGTH = 8

    def __init__(self, can_id, data, extended, ts_monotonic=None, ts_real=None):
        """
        Args:
            can_id:         CAN identifier; stored as ``self.id``.
            data:           Frame payload.
            extended:       Whether the identifier is an extended one.
            ts_monotonic:   Monotonic timestamp; the current time.monotonic() is used when None.
            ts_real:        Wall-clock timestamp; the current time.time() is used when None.
        """
        self.id = can_id
        self.data = data
        self.extended = extended
        # Test for None explicitly rather than for truthiness, so that a caller-supplied
        # timestamp of 0 is preserved instead of being replaced with the current time.
        self.ts_monotonic = time.monotonic() if ts_monotonic is None else ts_monotonic
        self.ts_real = time.time() if ts_real is None else ts_real

    @staticmethod
    def _byte_value(item):
        """Return the integer value of one payload element (an int, or a one-character string)."""
        return item if isinstance(item, int) else ord(item)

    def __str__(self):
        """Format the frame as: monotonic timestamp, real timestamp, hex ID, hex payload, ASCII payload."""
        # Convert the payload to integers once, so that the hex and the ASCII columns are
        # always derived from the same values regardless of the payload's element type.
        byte_values = [self._byte_value(x) for x in self.data]

        id_width = 8 if self.extended else 3
        id_str = ('%0*x' % (id_width, self.id)).rjust(8)
        hex_data = ' '.join(['%02x' % b for b in byte_values]).ljust(3 * self.MAX_DATA_LENGTH)
        # Anything outside of the printable ASCII range is shown as a dot.
        ascii_data = ''.join([(chr(b) if 32 <= b <= 126 else '.') for b in byte_values])

        return "%12.6f %12.6f %s %s '%s'" % \
            (self.ts_monotonic, self.ts_real, id_str, hex_data, ascii_data)

    __repr__ = __str__
51 
52 
53 class AbstractDriver(object):
54  FRAME_DIRECTION_INCOMING = 'rx'
55  FRAME_DIRECTION_OUTGOING = 'tx'
56 
class HookRemover:
    """Handle returned by add_io_hook(); its ``remove`` attribute is the callable given at construction."""

    def __init__(self, remover):
        """
        Args:
            remover: Zero-argument callable; it becomes available to the holder as ``remove()``.
        """
        # Stored as a plain attribute, so handle.remove() calls the supplied callable directly.
        self.remove = remover
60 
61  def __init__(self):
62  self._io_hooks = []
63 
def add_io_hook(self, hook):
    """
    Register a callable that is given the CAN frames passing through the driver.

    Args:
        hook:   This hook will be invoked for every incoming and outgoing CAN frame.
                Hook arguments: (direction, frame)
                See FRAME_DIRECTION_*, CANFrame.

    Returns:
        A HookRemover instance; calling its remove() takes this registration out of the hook list.
    """
    # Every registration gets a wrapper object of its own, so the removal below targets exactly
    # this entry even if the same hook callable has been registered more than once.
    def forward(*hook_args):
        hook(*hook_args)

    def unregister():
        return self._io_hooks.remove(forward)

    self._io_hooks.append(forward)
    return self.HookRemover(unregister)
77 
78  def _call_io_hooks(self, direction, frame):
79  for h in self._io_hooks:
80  try:
81  h(direction, frame)
82  except Exception as ex:
83  logger.error('Uncaught exception from CAN IO hook: %r', ex, exc_info=True)
84 
85  def _tx_hook(self, frame):
87 
88  def _rx_hook(self, frame):
def _call_io_hooks(self, direction, frame)
def __init__(self, can_id, data, extended, ts_monotonic=None, ts_real=None)


uavcan_communicator
Author(s):
autogenerated on Wed Jan 11 2023 03:59:39