35 SUPPORTED_PROTOCOL_VERSION = 3
41 CMD_GET_PROTOCOL_VERSION = 0x01
43 CMD_RESET_SENSOR = 0x03
44 CMD_RESET_PDALGO = 0x04
45 CMD_ENABLE_PDALGO = 0x05
46 CMD_ENABLE_IQSTREAM = 0x06
49 CMD_ENABLE_ASIC_DATA = 0x09
52 CMD_GET_SENSOR_PARAM = 0x20
53 CMD_GET_VERSION = 0x21
54 CMD_GET_SENSORS = 0x22
55 CMD_GET_ALGO_CONFIG = 0x23
58 CMD_SET_ALGO_CONFIG = 0x30
60 CMD_SET_RANGE_MM = 0x32
61 CMD_SET_SAMPLE_RANGE = 0x33
62 CMD_SET_PULSE_LENGTH = 0x34
63 CMD_SET_LOW_GAIN_RXLEN = 0x35
66 RESP_GET_PROTOCOL_VERSION = 0x81
67 RESP_SOFT_RESET = 0x82
68 RESP_RESET_SENSOR = 0x83
69 RESP_RESET_PDALGO = 0x84
70 RESP_ENABLE_PDALGO = 0x85
71 RESP_ENABLE_IQSTREAM = 0x86
74 RESP_ENABLE_ASIC_DATA = 0x89
75 RESP_ENABLE_IMU = 0x8a
77 RESP_GET_SENSOR_PARAM = 0xa0
78 RESP_GET_VERSION = 0xa1
79 RESP_GET_SENSORS = 0xa2
80 RESP_GET_ALGO_CONFIG = 0xa3
81 RESP_GET_STATUS = 0xa4
83 RESP_SET_ALGO_CONFIG = 0xb0
85 RESP_SET_RANGE_MM = 0xb2
86 RESP_SET_SAMPLE_RANGE = 0xb3
87 RESP_SET_PULSE_LENGTH = 0xb4
88 RESP_SET_LOW_GAIN_RXLEN = 0xb5
95 ASYNC_DEBUG_MESSAGE = 0xfd
100 FOOTER = (0xbe, 0xef)
107 (
'protocol_version',
'B', 1, 1,
True),
111 (
'timestamp',
'Q', 8, 1,
True),
112 (
'acc_x',
'h', 2, 1,
True),
113 (
'acc_y',
'h', 2, 1,
True),
114 (
'acc_z',
'h', 2, 1,
True),
115 (
'gyr_x',
'h', 2, 1,
True),
116 (
'gyr_y',
'h', 2, 1,
True),
117 (
'gyr_z',
'h', 2, 1,
True),
120 (
'target_detected',
'I', 4, 1,
True),
121 (
'range_cm',
'I', 4, 1,
True),
122 (
'amplitude',
'I', 4, 1,
True),
124 PacketType.ASYNC_DEBUG_MESSAGE: (self.
dmsg_decoder,
None),
126 (
'sensor_id',
'B', 1, 1,
True),
127 (
'timestamp',
'Q', 8, 1,
True),
128 (
'presence',
'i', 4, 1,
True),
129 (
'rmin',
'i', 4, 1,
True),
130 (
'rmax',
'i', 4, 1,
True),
131 (
'score',
'f', 4, 1,
True),
134 (
'timestamp',
'Q', 8, 1,
True),
135 (
'status',
'i', 4, 1,
True),
136 (
'error_count',
'i', 4, 1,
True),
140 (
'sensor_id',
'L', 4, 1,
True),
141 (
'op_freq_hz',
'L', 4, 1,
True),
142 (
'nb_samples',
'L', 4, 1,
True),
143 (
'odr_ms',
'L', 4, 1,
True),
144 (
'range_mm',
'L', 4, 1,
True),
145 (
'pulse_length',
'L', 4, 1,
True),
146 (
'low_gain_rxlen',
'L', 4, 1,
True),
149 (
'sensor_id',
'L', 4, 1,
True),
150 (
'odr_us',
'L', 4, 1,
True),
151 (
'nb_samples',
'L', 4, 1,
True),
152 (
'max_range',
'L', 4, 1,
True),
153 (
'min_range',
'L', 4, 1,
True),
154 (
'op_freq_hz',
'L', 4, 1,
True),
155 (
'sensitivity',
'L', 4, 1,
True),
156 (
'range_offset',
'L', 4, 1,
True),
157 (
'range_nb_zones',
'L', 4, 1,
True),
158 (
'range_hysteresis',
'L', 4, 1,
True),
159 (
'range_interval',
'L', 4, 1,
True),
162 (
'status',
'B', 1, 1,
True),
165 (
'timestamp',
'Q', 8, 1,
True),
166 (
'grv_w',
'i', 4, 1,
True),
167 (
'grv_x',
'i', 4, 1,
True),
168 (
'grv_y',
'i', 4, 1,
True),
169 (
'grv_z',
'i', 4, 1,
True),
192 def update(self, buffer_in, pkt_handler):
212 FRAME_MIN_SIZE = 2+2+2
215 if self.
rsize >= FRAME_MIN_SIZE:
217 print(
"Invalid input frame. HEADER missmatch.")
227 psize = self.
head+4+frameLen
229 if self.
rsize >= (FRAME_MIN_SIZE+frameLen):
231 print(
"Invalid input frame. FOOTER missmatch.")
236 while (psize-self.
head) > 0:
241 return self.
head - head0
252 ret_dict = decoder(args)
254 pkt_handler(command_code, ret_dict)
256 return self.
head - head0
261 return {
'status': status }
265 for key, type, word_size, size, scalar
in decoding_descriptor:
266 value = [struct.unpack_from(
267 '<' + type, self.
buffer, offset=self.
head + i * word_size)[0]
for i
in range(size)]
268 self.
head += size * word_size
270 ret_dict[key] = value[0]
if size == 1
and scalar
else value
276 self.
head += 1 + data_size
282 key, type, word_size, _, _ = decoding_descriptor[0]
285 ret_dict[key] = [struct.unpack_from(
'<' + type, self.
buffer, offset=self.
head+1+i*word_size)[0]
for i
in range(size)]
287 self.
head += size + 1
291 rx_sensor_id = struct.unpack(
"<B", self.
buffer[self.
head+0: self.
head+1])[0]
292 tx_sensor_id = struct.unpack(
"<B", self.
buffer[self.
head+1: self.
head+2])[0]
293 timestamp = struct.unpack(
"<Q", self.
buffer[self.
head+2: self.
head+10])[0]
294 flags = struct.unpack(
"<B", self.
buffer[self.
head+10: self.
head+11])[0]
298 data_size = struct.unpack(
"<h", self.
buffer[self.
head+0: self.
head+2])[0]
300 qdata = [struct.unpack(
"<h", x[0:2])[0]
for x
in [iq_bytes[pos:pos + 4]
for pos
in range(0, len(iq_bytes), 4)]]
301 idata = [struct.unpack(
"<h", x[2:4])[0]
for x
in [iq_bytes[pos:pos + 4]
for pos
in range(0, len(iq_bytes), 4)]]
302 self.
head += 2 + data_size
310 ret_dict = {key: -1
for key, _, _, _, _
in asic_decoding_descriptor
if key !=
'_'}
312 if ret_dict[
'target_detected'] == 0:
313 ret_dict[
'range_cm'] = 0
314 ret_dict[
'amplitude'] = 0
316 ret_dict[
'range_cm'] = round(ret_dict[
'range_cm'] / 32 / 10, 1)
319 'rx_sensor_id': rx_sensor_id,
320 'tx_sensor_id': tx_sensor_id,
321 'timestamp': timestamp,
329 nb_version, = struct.unpack_from(
'B', self.
buffer, self.
head)
334 for i
in range(nb_version):
335 slabel, sversion = struct.unpack_from(
'<BB', self.
buffer, self.
head)
337 label, version = struct.unpack_from(
'<{}sx{}sx'.format(slabel-1, sversion-1), self.
buffer, self.
head)
338 self.
head += slabel + sversion
339 ret_dict[label.decode(
'ascii')] = version.decode(
'ascii')
344 nb_sensors, = struct.unpack_from(
'<B', self.
buffer, self.
head)
345 ret_dict = {
'nb_sensors': nb_sensors }
346 ret_dict[
'sensor_ids'] = struct.unpack_from(
347 '{}B'.format(nb_sensors), self.
buffer, self.
head+1)
348 self.
head += (nb_sensors + 1)
352 print(
"Unknown packet", hex(command_code))
359 agrs_size = struct.calcsize(
"<B" + cmd_args_code)
360 lcmd = [0x55, 0xaa, agrs_size, cmd ] + list(cmd_args) + [0xbe, 0xef]
361 bcmd = struct.pack(
"<BBHB" + cmd_args_code +
"BB", *lcmd)
def update(self, buffer_in, pkt_handler)
def chdata_decoder(self, asic_decoding_descriptor)
def array_decoder(self, decoding_descriptor)
def parseInputBuffer(self, pkt_handler)
def parseInputPacket(self, pkt_handler)
def get_version_decoder(self, decoding_descriptor)
def simple_acknowledge(self, args)
def get_connected_sensors(self, decoding_descriptor)
def generic_decoder(self, decoding_descriptor)
def unknown_packet_decoder(self, command_code)
def dmsg_decoder(self, decoding_descriptor)