slcan.py
Go to the documentation of this file.
1 #
2 # Copyright (C) 2014-2016 UAVCAN Development Team <uavcan.org>
3 #
4 # This software is distributed under the terms of the MIT License.
5 #
6 # Author: Ben Dyer <ben_dyer@mac.com>
7 # Pavel Kirienko <pavel.kirienko@zubax.com>
8 #
9 
10 from __future__ import division, absolute_import, print_function, unicode_literals
11 import os
12 import sys
13 import time
14 import inspect
15 import binascii
16 import select
17 import multiprocessing
18 import threading
19 import copy
20 from logging import getLogger
21 from .common import DriverError, TxQueueFullError, CANFrame, AbstractDriver
22 from .timestamp_estimator import TimestampEstimator
23 
24 try:
25  import queue
26 except ImportError:
27  # noinspection PyPep8Naming,PyUnresolvedReferences
28  import Queue as queue
29 
30 logger = getLogger(__name__)
31 
32 # If PySerial isn't available, we can't support SLCAN
33 try:
34  import serial
35 except ImportError:
36  serial = None
37  logger.info("Cannot import PySerial; SLCAN will not be available.")
38 
39 try:
40  # noinspection PyUnresolvedReferences
41  sys.getwindowsversion()
42  RUNNING_ON_WINDOWS = True
43 except AttributeError:
44  RUNNING_ON_WINDOWS = False
45 
46 
47 #
48 # Constants and defaults
49 #
50 if 'darwin' in sys.platform:
51  RX_QUEUE_SIZE = 32767 # http://stackoverflow.com/questions/5900985/multiprocessing-queue-maxsize-limit-is-32767
52 else:
53  RX_QUEUE_SIZE = 1000000
54 
55 TX_QUEUE_SIZE = 1000
56 
57 TIMESTAMP_OVERFLOW_PERIOD = 60 # Defined by SLCAN protocol
58 
59 DEFAULT_BITRATE = 1000000
60 DEFAULT_BAUDRATE = 3000000
61 
62 ACK_TIMEOUT = 0.5
63 ACK = b'\r'
64 NACK = b'\x07'
65 CLI_END_OF_LINE = b'\r\n'
66 CLI_END_OF_TEXT = b'\x03'
67 
68 DEFAULT_MAX_ADAPTER_CLOCK_RATE_ERROR_PPM = 200 # Suits virtually all adapters
69 DEFAULT_FIXED_RX_DELAY = 0.0002 # Good for USB, could be higher for UART
70 DEFAULT_MAX_ESTIMATED_RX_DELAY_TO_RESYNC = 0.1 # When clock divergence exceeds this value, resync
71 
72 IO_PROCESS_INIT_TIMEOUT = 10
73 IO_PROCESS_NICENESS_INCREMENT = -18
74 
75 MAX_SUCCESSIVE_ERRORS_TO_GIVE_UP = 1000
76 
77 
78 #
79 # IPC entities
80 #
81 IPC_SIGNAL_INIT_OK = 'init_ok' # Sent from IO process to the parent process when init is done
82 IPC_COMMAND_STOP = 'stop' # Sent from parent process to the IO process when it's time to exit
83 
84 
86  DEFAULT_TIMEOUT = 1
87 
88  def __init__(self, command, timeout=None):
89  if isinstance(command, bytes):
90  command = command.decode('utf8')
91  self.command = command.lstrip()
92  self.monotonic_deadline = time.monotonic() + (timeout or self.DEFAULT_TIMEOUT)
93 
94  @property
95  def expired(self):
96  return time.monotonic() >= self.monotonic_deadline
97 
98 
100  def __init__(self, command, lines=None, expired=False):
101  def try_decode(what):
102  if isinstance(what, bytes):
103  return what.decode('utf8')
104  return what
105 
106  self.command = try_decode(command)
107  self.lines = [try_decode(ln) for ln in (lines or [])]
108  self.expired = expired
109 
110  def __str__(self):
111  if not self.expired:
112  return '%r %r' % (self.command, self.lines)
113  else:
114  return '%r EXPIRED' % self.command
115 
116  __repr__ = __str__
117 
118 
119 _pending_command_line_execution_requests = queue.Queue()
120 
121 
122 #
123 # Logic of the IO process
124 #
125 class RxWorker:
126  PY2_COMPAT = sys.version_info[0] < 3
127  SELECT_TIMEOUT = 0.1
128  READ_BUFFER_SIZE = 1024 * 8 # Arbitrary large number
129 
130  def __init__(self, conn, rx_queue, ts_estimator_mono, ts_estimator_real, termination_condition):
131  self._conn = conn
132  self._output_queue = rx_queue
133  self._ts_estimator_mono = ts_estimator_mono
134  self._ts_estimator_real = ts_estimator_real
135  self._termination_condition = termination_condition
136 
137  if RUNNING_ON_WINDOWS:
138  # select() doesn't work on serial ports under Windows, so we have to resort to workarounds. :(
139  self._conn.timeout = self.SELECT_TIMEOUT
140  else:
141  self._conn.timeout = 0
142 
143  def _read_port(self):
144  if RUNNING_ON_WINDOWS:
145  data = self._conn.read(max(1, self._conn.inWaiting()))
146  # Timestamping as soon as possible after unblocking
147  ts_mono = time.monotonic()
148  ts_real = time.time()
149  else:
150  select.select([self._conn.fileno()], [], [], self.SELECT_TIMEOUT)
151  # Timestamping as soon as possible after unblocking
152  ts_mono = time.monotonic()
153  ts_real = time.time()
154  # Read as much data as possible in order to avoid RX overrun
155  data = self._conn.read(self.READ_BUFFER_SIZE)
156  return data, ts_mono, ts_real
157 
158  def _process_slcan_line(self, line, local_ts_mono, local_ts_real):
159  line = line.strip().strip(NACK).strip(CLI_END_OF_TEXT)
160  line_len = len(line)
161 
162  if line_len < 1:
163  return
164 
165  # Checking the header, ignore all irrelevant lines
166  if line[0] == b'T'[0]:
167  id_len = 8
168  elif line[0] == b't'[0]:
169  id_len = 3
170  else:
171  return
172 
173  # Parsing ID and DLC
174  packet_id = int(line[1:1 + id_len], 16)
175  if self.PY2_COMPAT:
176  packet_len = int(line[1 + id_len]) # This version is horribly slow
177  else:
178  packet_len = line[1 + id_len] - 48 # Py3 version is faster
179 
180  if packet_len > 8 or packet_len < 0:
181  raise DriverError('Invalid packet length')
182 
183  # Parsing the payload, detecting timestamp
184  # <type> <id> <dlc> <data> [timestamp]
185  # 1 3|8 1 packet_len * 2 [4]
186  with_timestamp = line_len > (2 + id_len + packet_len * 2)
187 
188  packet_data = binascii.a2b_hex(line[2 + id_len:2 + id_len + packet_len * 2])
189 
190  # Handling the timestamp, if present
191  if with_timestamp:
192  ts_hardware = int(line[-4:], 16) * 1e-3
193  ts_mono = self._ts_estimator_mono.update(ts_hardware, local_ts_mono)
194  ts_real = self._ts_estimator_real.update(ts_hardware, local_ts_real)
195  else:
196  ts_mono = local_ts_mono
197  ts_real = local_ts_real
198 
199  frame = CANFrame(packet_id, packet_data, (id_len == 8), ts_monotonic=ts_mono, ts_real=ts_real)
200  self._output_queue.put_nowait(frame)
201 
202  def _process_many_slcan_lines(self, lines, ts_mono, ts_real):
203  for slc in lines:
204  # noinspection PyBroadException
205  try:
206  self._process_slcan_line(slc, local_ts_mono=ts_mono, local_ts_real=ts_real)
207  except Exception:
208  logger.error('Could not process SLCAN line %r', slc, exc_info=True)
209 
210  # noinspection PyBroadException
211  def run(self):
212  logger.info('RX worker started')
213 
214  successive_errors = 0
215  data = bytes()
216 
217  outstanding_command = None
218  outstanding_command_response_lines = []
219 
220  while not self._termination_condition():
221  try:
222  new_data, ts_mono, ts_real = self._read_port()
223  data += new_data
224 
225  # Checking the command queue and handling command timeouts
226  while True:
227  if outstanding_command is None:
228  try:
229  outstanding_command = _pending_command_line_execution_requests.get_nowait()
230  outstanding_command_response_lines = []
231  except queue.Empty:
232  break
233 
234  if outstanding_command.expired:
235  self._output_queue.put(IPCCommandLineExecutionResponse(outstanding_command.command,
236  expired=True))
237  outstanding_command = None
238  else:
239  break
240 
241  # Processing in normal mode if there's no outstanding command; using much slower CLI mode otherwise
242  if outstanding_command is None:
243  slcan_lines = data.split(ACK)
244  slcan_lines, data = slcan_lines[:-1], slcan_lines[-1]
245 
246  self._process_many_slcan_lines(slcan_lines, ts_mono=ts_mono, ts_real=ts_real)
247 
248  del slcan_lines
249  else:
250  # TODO This branch contains dirty and poorly tested code. Refactor once the protocol matures.
251  split_lines = data.split(CLI_END_OF_LINE)
252  split_lines, data = split_lines[:-1], split_lines[-1]
253 
254  # Processing the mix of SLCAN and CLI lines
255  for ln in split_lines:
256  tmp = ln.split(ACK)
257  slcan_lines, cli_line = tmp[:-1], tmp[-1]
258 
259  self._process_many_slcan_lines(slcan_lines, ts_mono=ts_mono, ts_real=ts_real)
260 
261  # Processing the CLI line
262  logger.debug('Processing CLI response line %r as...', cli_line)
263  if len(outstanding_command_response_lines) == 0:
264  if outstanding_command is not None and \
265  cli_line == outstanding_command.command.encode('utf8'):
266  logger.debug('...echo')
267  outstanding_command_response_lines.append(cli_line)
268  else:
269  # Otherwise we're receiving some CLI garbage before or after the command output, e.g.
270  # end of the previous command output if it was missed
271  logger.debug('...garbage')
272  else:
273  if cli_line == CLI_END_OF_TEXT:
274  logger.debug('...end-of-text')
275  # Shipping
276  response = IPCCommandLineExecutionResponse(outstanding_command.command,
277  lines=outstanding_command_response_lines[1:])
278  self._output_queue.put(response)
279  # Immediately fetching the next command, expiration is not checked
280  try:
281  outstanding_command = _pending_command_line_execution_requests.get_nowait()
282  except queue.Empty:
283  outstanding_command = None
284  outstanding_command_response_lines = []
285  else:
286  logger.debug('...mid response')
287  outstanding_command_response_lines.append(cli_line)
288 
289  del split_lines
290 
291  # The remainder may contain SLCAN and CLI lines as well;
292  # there is no reason not to process SLCAN ones immediately.
293  # The last byte could be beginning of an \r\n sequence, so it's excluded from parsing.
294  data, last_byte = data[:-1], data[-1:]
295  slcan_lines = data.split(ACK)
296  slcan_lines, data = slcan_lines[:-1], slcan_lines[-1] + last_byte
297 
298  self._process_many_slcan_lines(slcan_lines, ts_mono=ts_mono, ts_real=ts_real)
299 
300  successive_errors = 0
301  except Exception as ex:
302  # TODO: handle the case when the port is closed
303  logger.error('RX thread error %d of %d',
304  successive_errors, MAX_SUCCESSIVE_ERRORS_TO_GIVE_UP, exc_info=True)
305 
306  try:
307  self._output_queue.put_nowait(ex)
308  except Exception:
309  pass
310 
311  successive_errors += 1
312  if successive_errors >= MAX_SUCCESSIVE_ERRORS_TO_GIVE_UP:
313  break
314 
315  logger.info('RX worker is stopping')
316 
317 
318 class TxWorker:
319  QUEUE_BLOCK_TIMEOUT = 0.1
320 
321  def __init__(self, conn, rx_queue, tx_queue, termination_condition):
322  self._conn = conn
323  self._rx_queue = rx_queue
324  self._tx_queue = tx_queue
325  self._termination_condition = termination_condition
326 
327  def _send_frame(self, frame):
328  line = '%s%d%s\r' % (('T%08X' if frame.extended else 't%03X') % frame.id,
329  len(frame.data),
330  binascii.b2a_hex(frame.data).decode('ascii'))
331 
332  self._conn.write(line.encode('ascii'))
333  self._conn.flush()
334 
335  def _execute_command(self, command):
336  logger.info('Executing command line %r', command.command)
337  # It is extremely important to write into the queue first, in order to make the RX worker expect the response!
338  _pending_command_line_execution_requests.put(command)
339  self._conn.write(command.command.encode('ascii') + CLI_END_OF_LINE)
340  self._conn.flush()
341 
342  def run(self):
343  while True:
344  try:
345  command = self._tx_queue.get(True, self.QUEUE_BLOCK_TIMEOUT)
346 
347  if isinstance(command, CANFrame):
348  self._send_frame(command)
349  elif isinstance(command, IPCCommandLineExecutionRequest):
350  self._execute_command(command)
351  elif command == IPC_COMMAND_STOP:
352  break
353  else:
354  raise DriverError('IO process received unknown IPC command: %r' % command)
355  except queue.Empty:
356  # Checking in this handler in order to avoid interference with traffic
357  if self._termination_condition():
358  break
359  except Exception as ex:
360  logger.error('TX thread exception', exc_info=True)
361  # Propagating the exception to the parent process
362  # noinspection PyBroadException
363  try:
364  self._rx_queue.put_nowait(ex)
365  except Exception:
366  pass
367 
368 
369 # noinspection PyUnresolvedReferences
371  if RUNNING_ON_WINDOWS:
372  import win32api
373  import win32process
374  import win32con
375  handle = win32api.OpenProcess(win32con.PROCESS_ALL_ACCESS, True, win32api.GetCurrentProcessId())
376  win32process.SetPriorityClass(handle, win32process.REALTIME_PRIORITY_CLASS)
377  else:
378  import os
379  os.nice(IO_PROCESS_NICENESS_INCREMENT)
380 
381 
382 def _init_adapter(conn, bitrate):
383  def wait_for_ack():
384  logger.info('Init: Waiting for ACK...')
385  conn.timeout = ACK_TIMEOUT
386  while True:
387  b = conn.read(1)
388  if not b:
389  raise DriverError('SLCAN ACK timeout')
390  if b == NACK:
391  raise DriverError('SLCAN NACK in response')
392  if b == ACK:
393  break
394  logger.info('Init: Ignoring byte %r while waiting for ACK', b)
395 
396  def send_command(cmd):
397  logger.info('Init: Sending command %r', cmd)
398  conn.write(cmd + b'\r')
399 
400  speed_code = {
401  1000000: 8,
402  800000: 7,
403  500000: 6,
404  250000: 5,
405  125000: 4,
406  100000: 3,
407  50000: 2,
408  20000: 1,
409  10000: 0
410  }[bitrate if bitrate is not None else DEFAULT_BITRATE]
411 
412  num_retries = 3
413  while True:
414  try:
415  # Sending an empty command in order to reset the adapter's command parser, then discarding all output
416  send_command(b'')
417  try:
418  wait_for_ack()
419  except DriverError:
420  pass
421  time.sleep(0.1)
422  conn.flushInput()
423 
424  # Making sure the channel is closed - some adapters may refuse to re-open if the channel is already open
425  send_command(b'C')
426  try:
427  wait_for_ack()
428  except DriverError:
429  pass
430 
431  # Setting speed code
432  send_command(('S%d' % speed_code).encode())
433  conn.flush()
434  wait_for_ack()
435 
436  # Opening the channel
437  send_command(b'O')
438  conn.flush()
439  wait_for_ack()
440 
441  # Clearing error flags
442  send_command(b'F')
443  conn.flush()
444  try:
445  wait_for_ack()
446  except DriverError as ex:
447  logger.warning('Init: Could not clear error flags (command not supported by the CAN adapter?): %s', ex)
448  except Exception as ex:
449  if num_retries > 0:
450  logger.error('Could not init SLCAN adapter, will retry; error was: %s', ex, exc_info=True)
451  else:
452  raise ex
453  num_retries -= 1
454  else:
455  break
456 
457  # Discarding all input again
458  time.sleep(0.1)
459  conn.flushInput()
460 
461 
462 def _stop_adapter(conn):
463  conn.write(b'C\r' * 10)
464  conn.flush()
465 
466 
467 # noinspection PyBroadException
468 def _io_process(device,
469  tx_queue,
470  rx_queue,
471  log_queue,
472  parent_pid,
473  bitrate=None,
474  baudrate=None,
475  max_adapter_clock_rate_error_ppm=None,
476  fixed_rx_delay=None,
477  max_estimated_rx_delay_to_resync=None):
478  try:
479  # noinspection PyUnresolvedReferences
480  from logging.handlers import QueueHandler
481  except ImportError:
482  pass # Python 2.7, no logging for you
483  else:
484  getLogger().addHandler(QueueHandler(log_queue))
485  getLogger().setLevel('INFO')
486 
487  logger.info('IO process started with PID %r', os.getpid())
488 
489  # We don't need stdin
490  try:
491  stdin_fileno = sys.stdin.fileno()
492  sys.stdin.close()
493  os.close(stdin_fileno)
494  except Exception:
495  pass
496 
497  def is_parent_process_alive():
498  if RUNNING_ON_WINDOWS:
499  return True # TODO: Find a working solution for Windows (os.kill(ppid, 0) doesn't work)
500  else:
501  return os.getppid() == parent_pid
502 
503  try:
505  except Exception as ex:
506  logger.info('Could not adjust priority of the IO process: %r', ex)
507 
508  #
509  # This is needed to convert timestamps from hardware clock to local clocks
510  #
511  if max_adapter_clock_rate_error_ppm is None:
512  max_adapter_clock_rate_error = DEFAULT_MAX_ADAPTER_CLOCK_RATE_ERROR_PPM / 1e6
513  else:
514  max_adapter_clock_rate_error = max_adapter_clock_rate_error_ppm / 1e6
515 
516  fixed_rx_delay = fixed_rx_delay if fixed_rx_delay is not None else DEFAULT_FIXED_RX_DELAY
517  max_estimated_rx_delay_to_resync = max_estimated_rx_delay_to_resync or DEFAULT_MAX_ESTIMATED_RX_DELAY_TO_RESYNC
518 
519  ts_estimator_mono = TimestampEstimator(max_rate_error=max_adapter_clock_rate_error,
520  source_clock_overflow_period=TIMESTAMP_OVERFLOW_PERIOD,
521  fixed_delay=fixed_rx_delay,
522  max_phase_error_to_resync=max_estimated_rx_delay_to_resync)
523  ts_estimator_real = copy.deepcopy(ts_estimator_mono)
524 
525  #
526  # Preparing the RX thread
527  #
528  should_exit = False
529 
530  def rx_thread_wrapper():
531  rx_worker = RxWorker(conn=conn,
532  rx_queue=rx_queue,
533  ts_estimator_mono=ts_estimator_mono,
534  ts_estimator_real=ts_estimator_real,
535  termination_condition=lambda: should_exit)
536  try:
537  rx_worker.run()
538  except Exception as ex:
539  logger.error('RX thread failed, exiting', exc_info=True)
540  # Propagating the exception to the parent process
541  rx_queue.put(ex)
542 
543  rxthd = threading.Thread(target=rx_thread_wrapper, name='slcan_rx')
544  rxthd.daemon = True
545 
546  try:
547  conn = serial.Serial(device, baudrate or DEFAULT_BAUDRATE)
548  except Exception as ex:
549  logger.error('Could not open port', exc_info=True)
550  rx_queue.put(ex)
551  return
552 
553  #
554  # Actual work is here
555  #
556  try:
557  _init_adapter(conn, bitrate)
558 
559  rxthd.start()
560 
561  logger.info('IO process initialization complete')
562  rx_queue.put(IPC_SIGNAL_INIT_OK)
563 
564  tx_worker = TxWorker(conn=conn,
565  rx_queue=rx_queue,
566  tx_queue=tx_queue,
567  termination_condition=lambda: (should_exit or
568  not rxthd.is_alive() or
569  not is_parent_process_alive()))
570  tx_worker.run()
571  except Exception as ex:
572  logger.error('IO process failed', exc_info=True)
573  rx_queue.put(ex)
574  finally:
575  logger.info('IO process is terminating...')
576  should_exit = True
577  if rxthd.is_alive():
578  rxthd.join()
579 
580  _stop_adapter(conn)
581  conn.close()
582  logger.info('IO process is now ready to die, goodbye')
583 
584 
585 #
586 # Logic of the main process
587 #
589  """
590  Driver for SLCAN-compatible CAN bus adapters, with extension to support CLI commands.
591 
592  Some info on SLCAN can be found here:
593  - Linux tree: drivers/net/can/slcan.c (http://lxr.free-electrons.com/source/drivers/net/can/slcan.c)
594  - https://files.zubax.com/docs/Generic_SLCAN_API.pdf
595  - http://www.can232.com/docs/canusb_manual.pdf
596  - http://www.fischl.de/usbtin/
597 
598  The CLI extension allows to execute arbitrary CLI commands on the adapter. The commands differ from regular SLCAN
599  exchange in the following ways:
600  - CLI commands are echoed back.
601  - Every output line of a CLI command, including echo, is terminated with CR LF (\r\n).
602  - After the last line follows the ASCII End Of Text character (ETX, ^C, ASCII code 0x03) on a separate
603  line (terminated with CR LF).
604  - CLI commands must not begin with whitespace characters.
605  Example:
606  Input command "stat\r\n" may produce the following output lines:
607  - Echo: "stat\r\n"
608  - Data: "First line\r\n", "Second line\r\n", ...
609  - End Of Text marker: "\x03\r\n"
610  Refer to https://kb.zubax.com for more info.
611  """
612 
613  def __init__(self, device_name, **kwargs):
614  if not serial:
615  raise RuntimeError("PySerial not imported; SLCAN is not available. Please install PySerial.")
616 
617  super(SLCAN, self).__init__()
618 
619  self._stopping = False
620 
621  self._rx_queue = multiprocessing.Queue(maxsize=RX_QUEUE_SIZE)
622  self._tx_queue = multiprocessing.Queue(maxsize=TX_QUEUE_SIZE)
623  self._log_queue = multiprocessing.Queue()
624 
625  self._cli_command_requests = [] # List of tuples: (command, callback)
626 
627  # https://docs.python.org/3/howto/logging-cookbook.html
628  self._logging_thread = threading.Thread(target=self._logging_proxy_loop, name='slcan_log_proxy')
629  self._logging_thread.daemon = True
630 
631  # Removing all unused stuff, because it breaks inter process communications.
632  kwargs = copy.copy(kwargs)
633  keep_keys = inspect.getargspec(_io_process).args
634  for key in list(kwargs.keys()):
635  if key not in keep_keys:
636  del kwargs[key]
637 
638  kwargs['rx_queue'] = self._rx_queue
639  kwargs['tx_queue'] = self._tx_queue
640  kwargs['log_queue'] = self._log_queue
641  kwargs['parent_pid'] = os.getpid()
642 
643  self._proc = multiprocessing.Process(target=_io_process, name='slcan_io_process',
644  args=(device_name,), kwargs=kwargs)
645  self._proc.daemon = True
646  self._proc.start()
647 
648  # The logging thread should be started immediately AFTER the IO process is started
649  self._logging_thread.start()
650 
651  deadline = time.monotonic() + IO_PROCESS_INIT_TIMEOUT
652  while True:
653  try:
654  sig = self._rx_queue.get(timeout=IO_PROCESS_INIT_TIMEOUT)
655  if sig == IPC_SIGNAL_INIT_OK:
656  break
657  if isinstance(sig, Exception):
658  self._tx_queue.put(IPC_COMMAND_STOP, timeout=IO_PROCESS_INIT_TIMEOUT)
659  raise sig
660  except queue.Empty:
661  pass
662  if time.monotonic() > deadline:
663  self._tx_queue.put(IPC_COMMAND_STOP, timeout=IO_PROCESS_INIT_TIMEOUT)
664  raise DriverError('IO process did not confirm initialization')
665 
666  self._check_alive()
667 
668  # noinspection PyBroadException
670  while self._proc.is_alive() and not self._stopping:
671  try:
672  try:
673  record = self._log_queue.get(timeout=0.5)
674  except queue.Empty:
675  continue
676  getLogger(record.name).handle(record)
677  except Exception as ex:
678  try:
679  print('SLCAN logging proxy failed:', ex, file=sys.stderr)
680  except Exception:
681  pass
682 
683  logger.info('Logging proxy thread is stopping')
684 
685  def close(self):
686  if self._proc.is_alive():
687  self._tx_queue.put(IPC_COMMAND_STOP)
688  self._proc.join(10)
689  # Sometimes the child process stucks at exit, this is a workaround
690  if self._proc.is_alive() or self._proc.exitcode is None:
691  logger.warning('IO process refused to exit and will be terminated')
692  try:
693  self._proc.terminate()
694  except Exception as ex:
695  logger.error('Failed to terminate the IO process [%r]', ex, exc_info=True)
696  try:
697  if self._proc.is_alive():
698  logger.error('IO process refused to terminate, escalating to SIGKILL')
699  import signal
700  os.kill(self._proc.pid, signal.SIGKILL)
701  except Exception as ex:
702  logger.critical('Failed to kill the IO process [%r]', ex, exc_info=True)
703 
704  self._stopping = True
705  self._logging_thread.join()
706 
707  def __del__(self):
708  if not self._stopping:
709  self.close()
710 
711  def _check_alive(self):
712  if not self._proc.is_alive():
713  raise DriverError('IO process is dead :(')
714 
715  def receive(self, timeout=None):
716  self._check_alive()
717 
718  if timeout is None:
719  deadline = None
720  elif timeout == 0:
721  deadline = 0
722  else:
723  deadline = time.monotonic() + timeout
724 
725  while True:
726  # Blockingly reading the queue
727  try:
728  if deadline is None:
729  get_timeout = None
730  elif deadline == 0:
731  # TODO this is a workaround. Zero timeout causes the IPC queue to ALWAYS throw queue.Empty!
732  get_timeout = 1e-3
733  else:
734  # TODO this is a workaround. Zero timeout causes the IPC queue to ALWAYS throw queue.Empty!
735  get_timeout = max(1e-3, deadline - time.monotonic())
736 
737  obj = self._rx_queue.get(timeout=get_timeout)
738  except queue.Empty:
739  return
740 
741  # Handling the received thing
742  if isinstance(obj, CANFrame):
743  self._rx_hook(obj)
744  return obj
745 
746  elif isinstance(obj, Exception): # Propagating exceptions from the IO process to the main process
747  raise obj
748 
749  elif isinstance(obj, IPCCommandLineExecutionResponse):
750  while len(self._cli_command_requests):
751  (stored_command, stored_callback), self._cli_command_requests = \
753  if stored_command == obj.command:
754  stored_callback(obj)
755  break
756  else:
757  logger.error('Mismatched CLI response: expected %r, got %r', stored_command, obj.command)
758 
759  else:
760  raise DriverError('Unexpected entity in IPC channel: %r' % obj)
761 
762  # Termination condition
763  if deadline == 0:
764  break
765  elif deadline is not None:
766  if time.monotonic() >= deadline:
767  return
768 
769  def send(self, message_id, message, extended=False):
770  self._check_alive()
771  frame = CANFrame(message_id, message, extended)
772  try:
773  self._tx_queue.put_nowait(frame)
774  except queue.Full:
775  raise TxQueueFullError()
776  self._tx_hook(frame)
777 
778  def execute_cli_command(self, command, callback, timeout=None):
779  """
780  Executes an arbitrary CLI command on the SLCAN adapter, assuming that the adapter supports CLI commands.
781  The callback will be invoked from the method receive() using same thread.
782  If the command times out, the callback will be invoked anyway, with 'expired' flag set.
783  Args:
784  command: Command as unicode string or bytes
785  callback: A callable that accepts one argument.
786  The argument is an instance of IPCCommandLineExecutionResponse
787  timeout: Timeout in seconds. None to use default timeout.
788  """
789  self._check_alive()
790  request = IPCCommandLineExecutionRequest(command, timeout)
791  try:
792  self._tx_queue.put(request, timeout=timeout)
793  except queue.Full:
794  raise TxQueueFullError()
795  # The command could be modified by the IPCCommandLineExecutionRequest
796  self._cli_command_requests.append((request.command, callback))
pyuavcan_v0.driver.slcan.SLCAN.receive
def receive(self, timeout=None)
Definition: slcan.py:715
pyuavcan_v0.driver.slcan.TxWorker._termination_condition
_termination_condition
Definition: slcan.py:325
pyuavcan_v0.driver.slcan.IPCCommandLineExecutionResponse.lines
lines
Definition: slcan.py:107
pyuavcan_v0.driver.slcan._init_adapter
def _init_adapter(conn, bitrate)
Definition: slcan.py:382
pyuavcan_v0.driver.slcan.RxWorker._process_many_slcan_lines
def _process_many_slcan_lines(self, lines, ts_mono, ts_real)
Definition: slcan.py:202
pyuavcan_v0.driver.slcan.IPCCommandLineExecutionRequest
Definition: slcan.py:85
pyuavcan_v0.driver.slcan.IPCCommandLineExecutionRequest.monotonic_deadline
monotonic_deadline
Definition: slcan.py:92
pyuavcan_v0.driver.slcan.SLCAN._logging_proxy_loop
def _logging_proxy_loop(self)
Definition: slcan.py:669
pyuavcan_v0.driver.slcan.IPCCommandLineExecutionRequest.command
command
Definition: slcan.py:91
pyuavcan_v0.driver.slcan.TxWorker._execute_command
def _execute_command(self, command)
Definition: slcan.py:335
pyuavcan_v0.driver.common.AbstractDriver._rx_hook
def _rx_hook(self, frame)
Definition: driver/common.py:88
pyuavcan_v0.driver.slcan.SLCAN._cli_command_requests
_cli_command_requests
Definition: slcan.py:625
pyuavcan_v0.driver.slcan.RxWorker.READ_BUFFER_SIZE
int READ_BUFFER_SIZE
Definition: slcan.py:128
pyuavcan_v0.driver.slcan.RxWorker.run
def run(self)
Definition: slcan.py:211
pyuavcan_v0.driver.slcan._raise_self_process_priority
def _raise_self_process_priority()
Definition: slcan.py:370
pyuavcan_v0.driver.slcan.TxWorker._tx_queue
_tx_queue
Definition: slcan.py:324
pyuavcan_v0.driver.slcan.SLCAN.__init__
def __init__(self, device_name, **kwargs)
Definition: slcan.py:613
pyuavcan_v0.driver.slcan.RxWorker.SELECT_TIMEOUT
float SELECT_TIMEOUT
Definition: slcan.py:127
pyuavcan_v0.driver.slcan.TxWorker.QUEUE_BLOCK_TIMEOUT
float QUEUE_BLOCK_TIMEOUT
Definition: slcan.py:319
pyuavcan_v0.driver.slcan.SLCAN._rx_queue
_rx_queue
Definition: slcan.py:621
pyuavcan_v0.driver.slcan.IPCCommandLineExecutionResponse.__str__
def __str__(self)
Definition: slcan.py:110
pyuavcan_v0.driver.slcan.IPCCommandLineExecutionResponse
Definition: slcan.py:99
pyuavcan_v0.driver.slcan.RxWorker._process_slcan_line
def _process_slcan_line(self, line, local_ts_mono, local_ts_real)
Definition: slcan.py:158
pyuavcan_v0.driver.slcan.RxWorker._ts_estimator_real
_ts_estimator_real
Definition: slcan.py:134
pyuavcan_v0.driver.common.AbstractDriver
Definition: driver/common.py:53
pyuavcan_v0.driver.slcan.SLCAN.execute_cli_command
def execute_cli_command(self, command, callback, timeout=None)
Definition: slcan.py:778
pyuavcan_v0.driver.slcan.IPCCommandLineExecutionRequest.__init__
def __init__(self, command, timeout=None)
Definition: slcan.py:88
pyuavcan_v0.driver.slcan.IPCCommandLineExecutionRequest.expired
def expired(self)
Definition: slcan.py:95
pyuavcan_v0.driver.slcan.IPCCommandLineExecutionRequest.DEFAULT_TIMEOUT
int DEFAULT_TIMEOUT
Definition: slcan.py:86
pyuavcan_v0.driver.slcan.RxWorker._termination_condition
_termination_condition
Definition: slcan.py:135
pyuavcan_v0.driver.slcan.SLCAN._tx_queue
_tx_queue
Definition: slcan.py:622
pyuavcan_v0.driver.slcan.RxWorker.__init__
def __init__(self, conn, rx_queue, ts_estimator_mono, ts_estimator_real, termination_condition)
Definition: slcan.py:130
uavcan::max
const UAVCAN_EXPORT T & max(const T &a, const T &b)
Definition: templates.hpp:291
pyuavcan_v0.driver.slcan.RxWorker.PY2_COMPAT
int PY2_COMPAT
Definition: slcan.py:126
pyuavcan_v0.driver.slcan.SLCAN._log_queue
_log_queue
Definition: slcan.py:623
pyuavcan_v0.driver.slcan.TxWorker.__init__
def __init__(self, conn, rx_queue, tx_queue, termination_condition)
Definition: slcan.py:321
pyuavcan_v0.driver.slcan.RxWorker._conn
_conn
Definition: slcan.py:131
pyuavcan_v0.driver.slcan._io_process
def _io_process(device, tx_queue, rx_queue, log_queue, parent_pid, bitrate=None, baudrate=None, max_adapter_clock_rate_error_ppm=None, fixed_rx_delay=None, max_estimated_rx_delay_to_resync=None)
Definition: slcan.py:468
pyuavcan_v0.driver.slcan._stop_adapter
def _stop_adapter(conn)
Definition: slcan.py:462
pyuavcan_v0.driver.slcan.TxWorker._conn
_conn
Definition: slcan.py:322
int
int
Definition: libstubs.cpp:120
pyuavcan_v0.driver.slcan.SLCAN.__del__
def __del__(self)
Definition: slcan.py:707
pyuavcan_v0.driver.slcan.RxWorker._ts_estimator_mono
_ts_estimator_mono
Definition: slcan.py:133
pyuavcan_v0.driver.common.CANFrame
Definition: driver/common.py:27
pyuavcan_v0.driver.slcan.SLCAN._check_alive
def _check_alive(self)
Definition: slcan.py:711
pyuavcan_v0.driver.slcan.SLCAN
Definition: slcan.py:588
pyuavcan_v0.driver.common.AbstractDriver._tx_hook
def _tx_hook(self, frame)
Definition: driver/common.py:85
pyuavcan_v0.driver.slcan.TxWorker._send_frame
def _send_frame(self, frame)
Definition: slcan.py:327
pyuavcan_v0.driver.slcan.TxWorker
Definition: slcan.py:318
pyuavcan_v0.driver.timestamp_estimator.TimestampEstimator
Definition: timestamp_estimator.py:82
pyuavcan_v0.driver.slcan.SLCAN._logging_thread
_logging_thread
Definition: slcan.py:628
pyuavcan_v0.driver.slcan.TxWorker.run
def run(self)
Definition: slcan.py:342
pyuavcan_v0.driver.slcan.SLCAN._stopping
_stopping
Definition: slcan.py:619
pyuavcan_v0.driver.slcan.IPCCommandLineExecutionResponse.command
command
Definition: slcan.py:106
pyuavcan_v0.driver.slcan.IPCCommandLineExecutionResponse.__init__
def __init__(self, command, lines=None, expired=False)
Definition: slcan.py:100
pyuavcan_v0.driver.slcan.IPCCommandLineExecutionResponse.expired
expired
Definition: slcan.py:108
pyuavcan_v0.driver.slcan.RxWorker
Definition: slcan.py:125
pyuavcan_v0.driver.slcan.SLCAN._proc
_proc
Definition: slcan.py:643
pyuavcan_v0.driver.common.DriverError
Definition: driver/common.py:19
pyuavcan_v0.driver.slcan.SLCAN.close
def close(self)
Definition: slcan.py:685
pyuavcan_v0.driver.slcan.RxWorker._read_port
def _read_port(self)
Definition: slcan.py:143
pyuavcan_v0.driver.slcan.TxWorker._rx_queue
_rx_queue
Definition: slcan.py:323
pyuavcan_v0.driver.common.TxQueueFullError
Definition: driver/common.py:23
pyuavcan_v0.driver.slcan.SLCAN.send
def send(self, message_id, message, extended=False)
Definition: slcan.py:769
pyuavcan_v0.driver.slcan.RxWorker._output_queue
_output_queue
Definition: slcan.py:132


uavcan_communicator
Author(s):
autogenerated on Fri Dec 13 2024 03:10:03