10 from __future__
import division, absolute_import, print_function, unicode_literals
17 import multiprocessing
20 from logging
import getLogger
21 from .common
import DriverError, TxQueueFullError, CANFrame, AbstractDriver
22 from .timestamp_estimator
import TimestampEstimator
30 logger = getLogger(__name__)
37 logger.info(
"Cannot import PySerial; SLCAN will not be available.")
41 sys.getwindowsversion()
42 RUNNING_ON_WINDOWS =
True
43 except AttributeError:
44 RUNNING_ON_WINDOWS =
False
50 if 'darwin' in sys.platform:
53 RX_QUEUE_SIZE = 1000000
57 TIMESTAMP_OVERFLOW_PERIOD = 60
59 DEFAULT_BITRATE = 1000000
60 DEFAULT_BAUDRATE = 3000000
65 CLI_END_OF_LINE = b
'\r\n'
66 CLI_END_OF_TEXT = b
'\x03'
68 DEFAULT_MAX_ADAPTER_CLOCK_RATE_ERROR_PPM = 200
69 DEFAULT_FIXED_RX_DELAY = 0.0002
70 DEFAULT_MAX_ESTIMATED_RX_DELAY_TO_RESYNC = 0.1
72 IO_PROCESS_INIT_TIMEOUT = 10
73 IO_PROCESS_NICENESS_INCREMENT = -18
75 MAX_SUCCESSIVE_ERRORS_TO_GIVE_UP = 1000
81 IPC_SIGNAL_INIT_OK =
'init_ok'
82 IPC_COMMAND_STOP =
'stop'
89 if isinstance(command, bytes):
90 command = command.decode(
'utf8')
100 def __init__(self, command, lines=None, expired=False):
101 def try_decode(what):
102 if isinstance(what, bytes):
103 return what.decode(
'utf8')
107 self.
lines = [try_decode(ln)
for ln
in (lines
or [])]
114 return '%r EXPIRED' % self.
command
119 _pending_command_line_execution_requests = queue.Queue()
126 PY2_COMPAT = sys.version_info[0] < 3
128 READ_BUFFER_SIZE = 1024 * 8
130 def __init__(self, conn, rx_queue, ts_estimator_mono, ts_estimator_real, termination_condition):
137 if RUNNING_ON_WINDOWS:
141 self.
_conn.timeout = 0
144 if RUNNING_ON_WINDOWS:
147 ts_mono = time.monotonic()
148 ts_real = time.time()
152 ts_mono = time.monotonic()
153 ts_real = time.time()
156 return data, ts_mono, ts_real
159 line = line.strip().strip(NACK).strip(CLI_END_OF_TEXT)
166 if line[0] == b
'T'[0]:
168 elif line[0] == b
't'[0]:
174 packet_id =
int(line[1:1 + id_len], 16)
176 packet_len =
int(line[1 + id_len])
178 packet_len = line[1 + id_len] - 48
180 if packet_len > 8
or packet_len < 0:
186 with_timestamp = line_len > (2 + id_len + packet_len * 2)
188 packet_data = binascii.a2b_hex(line[2 + id_len:2 + id_len + packet_len * 2])
192 ts_hardware =
int(line[-4:], 16) * 1e-3
196 ts_mono = local_ts_mono
197 ts_real = local_ts_real
199 frame =
CANFrame(packet_id, packet_data, (id_len == 8), ts_monotonic=ts_mono, ts_real=ts_real)
208 logger.error(
'Could not process SLCAN line %r', slc, exc_info=
True)
212 logger.info(
'RX worker started')
214 successive_errors = 0
217 outstanding_command =
None
218 outstanding_command_response_lines = []
222 new_data, ts_mono, ts_real = self.
_read_port()
227 if outstanding_command
is None:
229 outstanding_command = _pending_command_line_execution_requests.get_nowait()
230 outstanding_command_response_lines = []
234 if outstanding_command.expired:
237 outstanding_command =
None
242 if outstanding_command
is None:
243 slcan_lines = data.split(ACK)
244 slcan_lines, data = slcan_lines[:-1], slcan_lines[-1]
251 split_lines = data.split(CLI_END_OF_LINE)
252 split_lines, data = split_lines[:-1], split_lines[-1]
255 for ln
in split_lines:
257 slcan_lines, cli_line = tmp[:-1], tmp[-1]
262 logger.debug(
'Processing CLI response line %r as...', cli_line)
263 if len(outstanding_command_response_lines) == 0:
264 if outstanding_command
is not None and \
265 cli_line == outstanding_command.command.encode(
'utf8'):
266 logger.debug(
'...echo')
267 outstanding_command_response_lines.append(cli_line)
271 logger.debug(
'...garbage')
273 if cli_line == CLI_END_OF_TEXT:
274 logger.debug(
'...end-of-text')
277 lines=outstanding_command_response_lines[1:])
281 outstanding_command = _pending_command_line_execution_requests.get_nowait()
283 outstanding_command =
None
284 outstanding_command_response_lines = []
286 logger.debug(
'...mid response')
287 outstanding_command_response_lines.append(cli_line)
294 data, last_byte = data[:-1], data[-1:]
295 slcan_lines = data.split(ACK)
296 slcan_lines, data = slcan_lines[:-1], slcan_lines[-1] + last_byte
300 successive_errors = 0
301 except Exception
as ex:
303 logger.error(
'RX thread error %d of %d',
304 successive_errors, MAX_SUCCESSIVE_ERRORS_TO_GIVE_UP, exc_info=
True)
311 successive_errors += 1
312 if successive_errors >= MAX_SUCCESSIVE_ERRORS_TO_GIVE_UP:
315 logger.info(
'RX worker is stopping')
319 QUEUE_BLOCK_TIMEOUT = 0.1
321 def __init__(self, conn, rx_queue, tx_queue, termination_condition):
328 line =
'%s%d%s\r' % ((
'T%08X' if frame.extended
else 't%03X') % frame.id,
330 binascii.b2a_hex(frame.data).decode(
'ascii'))
332 self.
_conn.write(line.encode(
'ascii'))
336 logger.info(
'Executing command line %r', command.command)
338 _pending_command_line_execution_requests.put(command)
339 self.
_conn.write(command.command.encode(
'ascii') + CLI_END_OF_LINE)
347 if isinstance(command, CANFrame):
349 elif isinstance(command, IPCCommandLineExecutionRequest):
351 elif command == IPC_COMMAND_STOP:
354 raise DriverError(
'IO process received unknown IPC command: %r' % command)
359 except Exception
as ex:
360 logger.error(
'TX thread exception', exc_info=
True)
371 if RUNNING_ON_WINDOWS:
375 handle = win32api.OpenProcess(win32con.PROCESS_ALL_ACCESS,
True, win32api.GetCurrentProcessId())
376 win32process.SetPriorityClass(handle, win32process.REALTIME_PRIORITY_CLASS)
379 os.nice(IO_PROCESS_NICENESS_INCREMENT)
384 logger.info(
'Init: Waiting for ACK...')
385 conn.timeout = ACK_TIMEOUT
394 logger.info(
'Init: Ignoring byte %r while waiting for ACK', b)
396 def send_command(cmd):
397 logger.info(
'Init: Sending command %r', cmd)
398 conn.write(cmd + b
'\r')
410 }[bitrate
if bitrate
is not None else DEFAULT_BITRATE]
432 send_command((
'S%d' % speed_code).encode())
446 except DriverError
as ex:
447 logger.warning(
'Init: Could not clear error flags (command not supported by the CAN adapter?): %s', ex)
448 except Exception
as ex:
450 logger.error(
'Could not init SLCAN adapter, will retry; error was: %s', ex, exc_info=
True)
463 conn.write(b
'C\r' * 10)
475 max_adapter_clock_rate_error_ppm=None,
477 max_estimated_rx_delay_to_resync=None):
480 from logging.handlers
import QueueHandler
484 getLogger().addHandler(QueueHandler(log_queue))
485 getLogger().setLevel(
'INFO')
487 logger.info(
'IO process started with PID %r', os.getpid())
491 stdin_fileno = sys.stdin.fileno()
493 os.close(stdin_fileno)
497 def is_parent_process_alive():
498 if RUNNING_ON_WINDOWS:
501 return os.getppid() == parent_pid
505 except Exception
as ex:
506 logger.info(
'Could not adjust priority of the IO process: %r', ex)
511 if max_adapter_clock_rate_error_ppm
is None:
512 max_adapter_clock_rate_error = DEFAULT_MAX_ADAPTER_CLOCK_RATE_ERROR_PPM / 1e6
514 max_adapter_clock_rate_error = max_adapter_clock_rate_error_ppm / 1e6
516 fixed_rx_delay = fixed_rx_delay
if fixed_rx_delay
is not None else DEFAULT_FIXED_RX_DELAY
517 max_estimated_rx_delay_to_resync = max_estimated_rx_delay_to_resync
or DEFAULT_MAX_ESTIMATED_RX_DELAY_TO_RESYNC
520 source_clock_overflow_period=TIMESTAMP_OVERFLOW_PERIOD,
521 fixed_delay=fixed_rx_delay,
522 max_phase_error_to_resync=max_estimated_rx_delay_to_resync)
523 ts_estimator_real = copy.deepcopy(ts_estimator_mono)
530 def rx_thread_wrapper():
533 ts_estimator_mono=ts_estimator_mono,
534 ts_estimator_real=ts_estimator_real,
535 termination_condition=
lambda: should_exit)
538 except Exception
as ex:
539 logger.error(
'RX thread failed, exiting', exc_info=
True)
543 rxthd = threading.Thread(target=rx_thread_wrapper, name=
'slcan_rx')
547 conn = serial.Serial(device, baudrate
or DEFAULT_BAUDRATE)
548 except Exception
as ex:
549 logger.error(
'Could not open port', exc_info=
True)
561 logger.info(
'IO process initialization complete')
562 rx_queue.put(IPC_SIGNAL_INIT_OK)
567 termination_condition=
lambda: (should_exit
or
568 not rxthd.is_alive()
or
569 not is_parent_process_alive()))
571 except Exception
as ex:
572 logger.error(
'IO process failed', exc_info=
True)
575 logger.info(
'IO process is terminating...')
582 logger.info(
'IO process is now ready to die, goodbye')
590 Driver for SLCAN-compatible CAN bus adapters, with extension to support CLI commands.
592 Some info on SLCAN can be found here:
593 - Linux tree: drivers/net/can/slcan.c (http://lxr.free-electrons.com/source/drivers/net/can/slcan.c)
594 - https://files.zubax.com/docs/Generic_SLCAN_API.pdf
595 - http://www.can232.com/docs/canusb_manual.pdf
596 - http://www.fischl.de/usbtin/
598 The CLI extension allows to execute arbitrary CLI commands on the adapter. The commands differ from regular SLCAN
599 exchange in the following ways:
600 - CLI commands are echoed back.
601 - Every output line of a CLI command, including echo, is terminated with CR LF (\r\n).
602 - After the last line follows the ASCII End Of Text character (ETX, ^C, ASCII code 0x03) on a separate
603 line (terminated with CR LF).
604 - CLI commands must not begin with whitespace characters.
606 Input command "stat\r\n" may produce the following output lines:
608 - Data: "First line\r\n", "Second line\r\n", ...
609 - End Of Text marker: "\x03\r\n"
610 Refer to https://kb.zubax.com for more info.
615 raise RuntimeError(
"PySerial not imported; SLCAN is not available. Please install PySerial.")
621 self.
_rx_queue = multiprocessing.Queue(maxsize=RX_QUEUE_SIZE)
622 self.
_tx_queue = multiprocessing.Queue(maxsize=TX_QUEUE_SIZE)
632 kwargs = copy.copy(kwargs)
633 keep_keys = inspect.getargspec(_io_process).args
634 for key
in list(kwargs.keys()):
635 if key
not in keep_keys:
641 kwargs[
'parent_pid'] = os.getpid()
643 self.
_proc = multiprocessing.Process(target=_io_process, name=
'slcan_io_process',
644 args=(device_name,), kwargs=kwargs)
645 self.
_proc.daemon =
True
651 deadline = time.monotonic() + IO_PROCESS_INIT_TIMEOUT
654 sig = self.
_rx_queue.get(timeout=IO_PROCESS_INIT_TIMEOUT)
655 if sig == IPC_SIGNAL_INIT_OK:
657 if isinstance(sig, Exception):
658 self.
_tx_queue.put(IPC_COMMAND_STOP, timeout=IO_PROCESS_INIT_TIMEOUT)
662 if time.monotonic() > deadline:
663 self.
_tx_queue.put(IPC_COMMAND_STOP, timeout=IO_PROCESS_INIT_TIMEOUT)
664 raise DriverError(
'IO process did not confirm initialization')
676 getLogger(record.name).handle(record)
677 except Exception
as ex:
679 print(
'SLCAN logging proxy failed:', ex, file=sys.stderr)
683 logger.info(
'Logging proxy thread is stopping')
686 if self.
_proc.is_alive():
690 if self.
_proc.is_alive()
or self.
_proc.exitcode
is None:
691 logger.warning(
'IO process refused to exit and will be terminated')
693 self.
_proc.terminate()
694 except Exception
as ex:
695 logger.error(
'Failed to terminate the IO process [%r]', ex, exc_info=
True)
697 if self.
_proc.is_alive():
698 logger.error(
'IO process refused to terminate, escalating to SIGKILL')
700 os.kill(self.
_proc.pid, signal.SIGKILL)
701 except Exception
as ex:
702 logger.critical(
'Failed to kill the IO process [%r]', ex, exc_info=
True)
712 if not self.
_proc.is_alive():
723 deadline = time.monotonic() + timeout
735 get_timeout =
max(1e-3, deadline - time.monotonic())
737 obj = self.
_rx_queue.get(timeout=get_timeout)
742 if isinstance(obj, CANFrame):
746 elif isinstance(obj, Exception):
749 elif isinstance(obj, IPCCommandLineExecutionResponse):
753 if stored_command == obj.command:
757 logger.error(
'Mismatched CLI response: expected %r, got %r', stored_command, obj.command)
760 raise DriverError(
'Unexpected entity in IPC channel: %r' % obj)
765 elif deadline
is not None:
766 if time.monotonic() >= deadline:
769 def send(self, message_id, message, extended=False):
771 frame =
CANFrame(message_id, message, extended)
780 Executes an arbitrary CLI command on the SLCAN adapter, assuming that the adapter supports CLI commands.
781 The callback will be invoked from the method receive() using same thread.
782 If the command times out, the callback will be invoked anyway, with 'expired' flag set.
784 command: Command as unicode string or bytes
785 callback: A callable that accepts one argument.
786 The argument is an instance of IPCCommandLineExecutionResponse
787 timeout: Timeout in seconds. None to use default timeout.
792 self.
_tx_queue.put(request, timeout=timeout)