2 from __future__
import print_function
11 Baudrates, XDIGroup, getMIDName, DeviceState, DeprecatedMID, \
12 MTErrorMessage, MTTimeoutException
16 """Convert the given character or byte to a byte.""" 17 if sys.version_info[0] == 2:
19 if isinstance(b, bytearray):
28 """XSens MT device communication object.""" 30 def __init__(self, port, baudrate=115200, timeout=0.002, autoconf=True,
31 config_mode=False, verbose=False, initial_wait=0.1):
36 self.
device = serial.Serial(port, baudrate, timeout=timeout,
40 self.
device = serial.Serial(port, baudrate, timeout=timeout,
41 writeTimeout=timeout, rtscts=
True,
43 time.sleep(initial_wait)
68 """Low-level message sending function.""" 71 lendat = b
'\xFF' + struct.pack(
'!H', length)
73 lendat = struct.pack(
'!B', length)
74 packet = b
'\xFA\xFF' + struct.pack(
'!B', mid) + lendat + data
75 packet += struct.pack(
'!B', 0xFF & (-(sum(map(_byte, packet[1:])))))
78 while ((time.time()-start) < self.
timeout)
and self.
device.read():
82 except serial.serialutil.SerialTimeoutException:
83 raise MTTimeoutException(
"writing message")
85 print(
"MT: Write message id 0x%02X (%s) with %d data bytes: " 87 ' '.join(
"%02X" %
_byte(v)
for v
in data)))
90 """Get a given amount of data.""" 93 buf.extend(self.
device.read(size-len(buf)))
97 print(
"waiting for %d bytes, got %d so far: [%s]" %
98 (size, len(buf),
' '.join(
'%02X' % v
for v
in buf)))
99 raise MTTimeoutException(
"waiting for message")
102 """Low-level MTData receiving function. 103 Take advantage of known message length. 107 totlength = 5 + self.
length 109 totlength = 7 + self.
length 110 while (time.time()-start) < self.
timeout:
111 buf.extend(self.
waitfor(totlength - len(buf)))
112 preamble_ind = buf.find(self.
header)
113 if preamble_ind == -1:
116 sys.stderr.write(
"MT: discarding (no preamble).\n")
122 sys.stderr.write(
"MT: discarding (before preamble).\n")
123 del buf[:preamble_ind]
125 buf.extend(self.
waitfor(totlength-len(buf)))
126 if 0xFF & sum(buf[1:]):
128 sys.stderr.write(
"MT: invalid checksum; discarding data " 129 "and waiting for next message.\n")
130 del buf[:buf.find(self.
header)-2]
132 data = str(buf[-self.
length-1:-1])
136 raise MTException(
"could not find MTData message.")
139 """Low-level message receiving function.""" 141 while (time.time()-start) < self.
timeout:
149 mid, length = struct.unpack(
'!BB', self.
waitfor(2))
151 length, = struct.unpack(
'!H', self.
waitfor(2))
155 data = struct.unpack(
'!%dB' % length, buf[:-1])
157 if 0xFF & sum(data, 0xFF+mid+length+checksum):
159 sys.stderr.write(
"invalid checksum; discarding data and " 160 "waiting for next message.\n")
163 print(
"MT: Got message id 0x%02X (%s) with %d data bytes: " 165 ' '.join(
"%02X" % v
for v
in data)))
167 raise MTErrorMessage(data[0])
168 return (mid, buf[:-1])
172 def write_ack(self, mid, data=b'', n_resend=30, n_read=25):
173 """Send a message and read confirmation.""" 174 for _
in range(n_resend):
176 for _
in range(n_read):
178 if mid_ack == (mid+1):
181 print(
"ack (0x%02X) expected, got 0x%02X instead" %
187 n_retries = n_resend*n_read
188 raise MTException(
"Ack (0x%02X) expected, MID 0x%02X received " 189 "instead (after %d retries)." % (mid+1, mid_ack,
194 """Switch device to config state if necessary.""" 195 if self.
state != DeviceState.Config:
199 """Switch device to measurement state if necessary.""" 200 if self.
state != DeviceState.Measurement:
206 def Reset(self, go_to_config=False):
209 If go_to_config then send WakeUpAck in order to leave the device in 216 if mid == MID.WakeUp:
218 self.
state = DeviceState.Config
220 self.
state = DeviceState.Measurement
223 """Place MT device in configuration mode.""" 225 self.
state = DeviceState.Config
228 """Place MT device in measurement mode.""" 231 self.
state = DeviceState.Measurement
234 """Get the device identifier.""" 237 deviceID, = struct.unpack(
'!I', data)
241 """Get the product code.""" 243 data = self.
write_ack(MID.ReqProductCode)
244 return str(data).strip()
247 """Get the hardware version.""" 249 data = self.
write_ack(MID.ReqHardwareVersion)
250 major, minor = struct.unpack(
'!BB', data)
251 return (major, minor)
254 """Get the firmware version.""" 258 major, minor, revision = struct.unpack(
'!BBB', data)
259 return (major, minor, revision)
262 major, minor, rev, buildnr, svnrev = struct.unpack(
'!BBBII', data)
263 return (major, minor, rev, buildnr, svnrev)
266 """Run the built-in self test.""" 269 bit_names = [
'accX',
'accY',
'accZ',
'gyrX',
'gyrY',
'gyrZ',
270 'magX',
'magY',
'magZ']
271 self_test_results = []
272 for i, name
in enumerate(bit_names):
273 self_test_results.append((name, (data >> i) & 1))
274 return self_test_results
277 """Get the current baudrate id of the device.""" 283 """Set the baudrate of the device using the baudrate id.""" 288 """Get the current error mode of the device.""" 291 error_mode, = struct.unpack(
'!H', data)
295 """Set the error mode of the device. 297 The error mode defines the way the device deals with errors (expect 299 0x0000: ignore any errors except message handling errors, 300 0x0001: in case of missing sampling instance: increase sample 301 counter and do not send error message, 302 0x0002: in case of missing sampling instance: increase sample 303 counter and send error message, 304 0x0003: in case of non-message handling error, an error message is 305 sent and the device will go into Config State. 308 data = struct.pack(
'!H', error_mode)
312 """Get the option flags (MTi-1 series).""" 314 data = self.
write_ack(MID.SetOptionFlags)
315 flags, = struct.unpack(
'!I', data)
319 """Set the option flags (MTi-1 series).""" 321 data = struct.pack(
'!II', set_flags, clear_flags)
325 """Get the location ID of the device.""" 328 location_id, = struct.unpack(
'!H', data)
332 """Set the location ID of the device (arbitrary).""" 334 data = struct.pack(
'!H', location_id)
338 """Restore MT device configuration to factory defaults (soft version). 344 """Get the transmission delay (only RS485).""" 346 data = self.
write_ack(MID.SetTransmitDelay)
347 transmit_delay, = struct.unpack(
'!H', data)
348 return transmit_delay
351 """Set the transmission delay (only RS485).""" 353 data = struct.pack(
'!H', transmit_delay)
354 self.
write_ack(MID.SetTransmitDelay, data)
357 """Get the synchronisation settings.""" 359 data = self.
write_ack(MID.SetSyncSettings)
360 sync_settings = [struct.unpack(
'!BBBBHHHH', data[o:o+12])
361 for o
in range(0, len(data), 12)]
365 """Set the synchronisation settings (mark IV)""" 367 data = b
''.join(struct.pack(
'!BBBBHHHH', *sync_setting)
368 for sync_setting
in sync_settings)
369 self.
write_ack(MID.SetSyncSettings, data)
372 """Ask for the current configuration of the MT device.""" 374 config = self.
write_ack(MID.ReqConfiguration)
376 masterID, period, skipfactor, _, _, _, date, time, num, deviceID,\
377 length, mode, settings =\
378 struct.unpack(
'!IHHHHI8s8s32x32xHIHHI8x', config)
380 raise MTException(
"could not parse configuration.")
385 self.
header = b
'\xFA\xFF\x32' + struct.pack(
'!B', self.
length)
387 self.
header = b
'\xFA\xFF\x32\xFF' + struct.pack(
'!H', self.
length)
388 conf = {
'output-mode': mode,
389 'output-settings': settings,
392 'skipfactor': skipfactor,
393 'Master device ID': masterID,
396 'number of devices': num,
397 'device ID': deviceID}
401 """Get the output configuration of the device (mark IV).""" 403 data = self.
write_ack(MID.SetOutputConfiguration)
404 output_configuration = [struct.unpack(
'!HH', data[o:o+4])
405 for o
in range(0, len(data), 4)]
406 return output_configuration
409 """Set the output configuration of the device (mark IV).""" 411 data = b
''.join(struct.pack(
'!HH', *output)
412 for output
in output_configuration)
413 self.
write_ack(MID.SetOutputConfiguration, data)
416 """Get the NMEA data output configuration.""" 418 data = self.
write_ack(MID.SetStringOutputType)
419 string_output_type, = struct.unpack(
'!H', data)
420 return string_output_type
423 """Set the configuration of the NMEA data output.""" 425 data = struct.pack(
'!H', string_output_type)
426 self.
write_ack(MID.SetStringOutputType, data)
429 """Get the sampling period.""" 432 period, = struct.unpack(
'!H', data)
436 """Set the sampling period.""" 438 data = struct.pack(
'!H', period)
442 """Get the object alignment. 444 parameter indicates the desired alignment quaternion: 445 0 for sensor alignment (RotSensor), 446 1 for local alignment (RotLocal). 449 data = struct.pack(
'!B', parameter)
450 data = self.
write_ack(MID.SetAlignmentRotation, data)
452 q0, q1, q2, q3 = struct.unpack(
'!ffff', data)
453 return parameter, (q0, q1, q2, q3)
454 elif len(data) == 17:
455 param, q0, q1, q2, q3 = struct.unpack(
'!Bffff', data)
456 return param, (q0, q1, q2, q3)
458 raise MTException(
'Could not parse ReqAlignmentRotationAck message:' 459 ' wrong size of message (%d instead of either 16 ' 460 'or 17).' % len(data))
463 """Set the object alignment. 465 parameter indicates the desired alignment quaternion: 466 0 for sensor alignment (RotSensor), 467 1 for local alignment (RotLocal). 470 data = struct.pack(
'!Bffff', parameter, *quaternion)
471 self.
write_ack(MID.SetAlignmentRotation, data)
474 """Get current output mode.""" 477 self.
mode, = struct.unpack(
'!H', data)
481 """Select which information to output.""" 483 data = struct.pack(
'!H', mode)
488 """Get current extended output mode (for alternative UART).""" 490 data = self.
write_ack(MID.SetExtOutputMode)
491 ext_mode, = struct.unpack(
'!H', data)
495 """Set extended output mode (for alternative UART).""" 497 data = struct.pack(
'!H', ext_mode)
498 self.
write_ack(MID.SetExtOutputMode, data)
501 """Get current output mode.""" 503 data = self.
write_ack(MID.SetOutputSettings)
504 self.
settings, = struct.unpack(
'!I', data)
508 """Select how to output the information.""" 510 data = struct.pack(
'!I', settings)
511 self.
write_ack(MID.SetOutputSettings, data)
515 """Set the output skip factor.""" 517 data = struct.pack(
'!H', skipfactor)
518 self.
write_ack(DeprecatedMID.SetOutputSkipFactor, data)
521 """Get data length for mark III devices.""" 524 data = self.
write_ack(DeprecatedMID.ReqDataLength)
525 except MTErrorMessage
as e:
527 sys.stderr.write(
"ReqDataLength message is deprecated and not " 528 "recognised by your device.")
531 self.
length, = struct.unpack(
'!H', data)
533 self.
header = b
'\xFA\xFF\x32' + struct.pack(
'!B', self.
length)
535 self.
header = b
'\xFA\xFF\x32\xFF' + struct.pack(
'!H', self.
length)
539 """Get the stored position of the device. 540 It is used internally for local magnetic declination and local gravity. 545 lat, lon, alt = struct.unpack(
'!ddd', data)
546 elif len(data) == 12:
547 lat, lon, alt = struct.unpack(
'!fff', data)
549 raise MTException(
'Could not parse ReqLatLonAltAck message: wrong' 551 return (lat, lon, alt)
554 """Set the position of the device. 555 It is used internally for local magnetic declination and local gravity. 558 data = struct.pack(
'!ddd', lat, lon, alt)
562 """Get the available XKF scenarios on the device.""" 564 data = self.
write_ack(MID.ReqAvailableScenarios)
567 for i
in range(len(data)//22):
568 scenario_type, version, label =\
569 struct.unpack(
'!BB20s', data[22*i:22*(i+1)])
570 scenarios.append((scenario_type, version, label.strip()))
574 raise MTException(
"could not parse the available XKF scenarios.")
578 """Get the ID of the currently used XKF scenario.""" 580 data = self.
write_ack(MID.SetCurrentScenario)
585 """Set the XKF scenario to use.""" 587 data = struct.pack(
'!BB', 0, scenario_id)
588 self.
write_ack(MID.SetCurrentScenario, data)
591 GetAvailableFilterProfiles = GetAvailableScenarios
592 GetFilterProfile = GetCurrentScenario
593 SetFilterProfile = SetCurrentScenario
596 """Get the current GNSS navigation filter settings.""" 598 data = self.
write_ack(MID.SetGnssPlatform)
599 platform, = struct.unpack(
'!H', data)
603 """Set the GNSS navigation filter settings.""" 605 data = struct.pack(
'!H', platform)
606 self.
write_ack(MID.SetGnssPlatform, data)
609 """Reset the orientation. 611 Code can take several values: 612 0x0000: store current settings (only in config mode), 613 0x0001: heading reset (NOT supported by MTi-G), 614 0x0003: object reset. 616 data = struct.pack(
'!H', code)
617 self.
write_ack(MID.ResetOrientation, data)
620 """Initiate the "no rotation" procedure to estimate gyro biases.""" 622 data = struct.pack(
'!H', duration)
626 """Get UTC time from device.""" 629 ns, year, month, day, hour, minute, second, flag = \
630 struct.unpack(
'!IHBBBBBB', data)
631 return (ns, year, month, day, hour, minute, second, flag)
633 def SetUTCTime(self, ns, year, month, day, hour, minute, second, flag):
634 """Set UTC time on the device.""" 636 data = struct.pack(
'!IHBBBBBB', ns, year, month, day, hour, minute,
641 """Adjust UTC Time of device using correction ticks (0.1 ms).""" 643 data = struct.pack(
'!i', ticks)
644 self.write(MID.AdjustUTCTime, data)
647 """Command of In-run Compass Calibration (ICC).""" 648 if command
not in (0, 1, 2, 3):
649 raise MTException(
"unknown ICC command 0x%02X" % command)
650 cmd_data = struct.pack(
'!B', command)
651 res_data = self.
write_ack(MID.IccCommand, cmd_data)
652 cmd_ack = struct.unpack(
'!B', res_data[:1])
653 payload = res_data[1:]
654 if cmd_ack != command:
655 raise MTException(
"expected ack of command 0x%02X; got 0x%02X " 656 "instead" % (command, cmd_ack))
660 ddt_value, dimension, status = struct.unpack(
'!fBB', payload)
661 return ddt_value, dimension, status
665 state = struct.unpack(
'!B', payload)
672 """Configure the mode and settings of the MT device in legacy mode.""" 676 except MTErrorMessage
as e:
682 except MTException
as e:
684 print(
"no ack received while switching from MTData2 to MTData.")
688 if period
is not None:
690 if skipfactor
is not None:
695 """Read configuration from device in legacy mode.""" 704 if mid == MID.MTData:
706 elif mid == MID.MTData2:
709 raise MTException(
"unknown data message: mid=0x%02X (%s)." %
714 def parse_temperature(data_id, content, ffmt):
716 if (data_id & 0x00F0) == 0x10:
717 o[
'Temp'], = struct.unpack(
'!'+ffmt, content)
719 raise MTException(
"unknown packet: 0x%04X." % data_id)
722 def parse_timestamp(data_id, content, ffmt):
724 if (data_id & 0x00F0) == 0x10:
725 o[
'ns'], o[
'Year'], o[
'Month'], o[
'Day'], o[
'Hour'],\
726 o[
'Minute'], o[
'Second'], o[
'Flags'] =\
727 struct.unpack(
'!LHBBBBBB', content)
728 elif (data_id & 0x00F0) == 0x20:
729 o[
'PacketCounter'], = struct.unpack(
'!H', content)
730 elif (data_id & 0x00F0) == 0x30:
731 o[
'TimeOfWeek'], = struct.unpack(
'!L', content)
732 elif (data_id & 0x00F0) == 0x40:
733 o[
'gpsAge'], = struct.unpack(
'!B', content)
734 elif (data_id & 0x00F0) == 0x50:
735 o[
'pressureAge'], = struct.unpack(
'!B', content)
736 elif (data_id & 0x00F0) == 0x60:
737 o[
'SampleTimeFine'], = struct.unpack(
'!L', content)
738 elif (data_id & 0x00F0) == 0x70:
739 o[
'SampleTimeCoarse'], = struct.unpack(
'!L', content)
740 elif (data_id & 0x00F0) == 0x80:
741 o[
'startFrame'], o[
'endFrame'] = struct.unpack(
'!HH', content)
743 raise MTException(
"unknown packet: 0x%04X." % data_id)
746 def parse_orientation_data(data_id, content, ffmt):
748 if (data_id & 0x000C) == 0x00:
750 elif (data_id & 0x000C) == 0x04:
752 elif (data_id & 0x000C) == 0x08:
754 if (data_id & 0x00F0) == 0x10:
755 o[
'Q0'], o[
'Q1'], o[
'Q2'], o[
'Q3'] = struct.unpack(
'!'+4*ffmt,
757 elif (data_id & 0x00F0) == 0x20:
758 o[
'a'], o[
'b'], o[
'c'], o[
'd'], o[
'e'], o[
'f'], o[
'g'],\
759 o[
'h'], o[
'i'] = struct.unpack(
'!'+9*ffmt, content)
760 elif (data_id & 0x00F0) == 0x30:
761 o[
'Roll'], o[
'Pitch'], o[
'Yaw'] = struct.unpack(
'!'+3*ffmt,
764 raise MTException(
"unknown packet: 0x%04X." % data_id)
767 def parse_pressure(data_id, content, ffmt):
769 if (data_id & 0x00F0) == 0x10:
770 o[
'Pressure'], = struct.unpack(
'!L', content)
772 raise MTException(
"unknown packet: 0x%04X." % data_id)
775 def parse_acceleration(data_id, content, ffmt):
777 if (data_id & 0x000C) == 0x00:
779 elif (data_id & 0x000C) == 0x04:
781 elif (data_id & 0x000C) == 0x08:
783 if (data_id & 0x00F0) == 0x10:
784 o[
'Delta v.x'], o[
'Delta v.y'], o[
'Delta v.z'] = \
785 struct.unpack(
'!'+3*ffmt, content)
786 elif (data_id & 0x00F0) == 0x20:
787 o[
'accX'], o[
'accY'], o[
'accZ'] = \
788 struct.unpack(
'!'+3*ffmt, content)
789 elif (data_id & 0x00F0) == 0x30:
790 o[
'freeAccX'], o[
'freeAccY'], o[
'freeAccZ'] = \
791 struct.unpack(
'!'+3*ffmt, content)
792 elif (data_id & 0x00F0) == 0x40:
793 o[
'accX'], o[
'accY'], o[
'accZ'] = \
794 struct.unpack(
'!'+3*ffmt, content)
796 raise MTException(
"unknown packet: 0x%04X." % data_id)
799 def parse_position(data_id, content, ffmt):
801 if (data_id & 0x000C) == 0x00:
803 elif (data_id & 0x000C) == 0x04:
805 elif (data_id & 0x000C) == 0x08:
807 if (data_id & 0x00F0) == 0x10:
808 o[
'altMsl'], = struct.unpack(
'!'+ffmt, content)
809 elif (data_id & 0x00F0) == 0x20:
810 o[
'altEllipsoid'], = struct.unpack(
'!'+ffmt, content)
811 elif (data_id & 0x00F0) == 0x30:
812 o[
'ecefX'], o[
'ecefY'], o[
'ecefZ'] = \
813 struct.unpack(
'!'+3*ffmt, content)
814 elif (data_id & 0x00F0) == 0x40:
815 o[
'lat'], o[
'lon'] = struct.unpack(
'!'+2*ffmt, content)
817 raise MTException(
"unknown packet: 0x%04X." % data_id)
820 def parse_GNSS(data_id, content, ffmt):
822 if (data_id & 0x00F0) == 0x10:
823 o[
'itow'], o[
'year'], o[
'month'], o[
'day'], o[
'hour'],\
824 o[
'min'], o[
'sec'], o[
'valid'], o[
'tAcc'], o[
'nano'],\
825 o[
'fixtype'], o[
'flags'], o[
'numSV'], o[
'lon'], o[
'lat'],\
826 o[
'height'], o[
'hMSL'], o[
'hAcc'], o[
'vAcc'], o[
'velN'],\
827 o[
'velE'], o[
'velD'], o[
'gSpeed'], o[
'headMot'],\
828 o[
'sAcc'], o[
'headAcc'], o[
'headVeh'], o[
'gdop'],\
829 o[
'pdop'], o[
'tdop'], o[
'vdop'], o[
'hdop'], o[
'ndop'],\
831 struct.unpack(
'!IHBBBBBBIiBBBxiiiiIIiiiiiIIiHHHHHHH',
845 elif (data_id & 0x00F0) == 0x20:
846 o[
'iTOW'], o[
'numSvs'] = struct.unpack(
'!LBxxx', content[:8])
849 for i
in range(o[
'numSvs']):
850 ch[
'gnssId'], ch[
'svId'], ch[
'cno'], ch[
'flags'] = \
851 struct.unpack(
'!BBBB', content[8+4*i:12+4*i])
855 raise MTException(
"unknown packet: 0x%04X." % data_id)
858 def parse_angular_velocity(data_id, content, ffmt):
860 if (data_id & 0x000C) == 0x00:
862 elif (data_id & 0x000C) == 0x04:
864 elif (data_id & 0x000C) == 0x08:
866 if (data_id & 0x00F0) == 0x20:
867 o[
'gyrX'], o[
'gyrY'], o[
'gyrZ'] = \
868 struct.unpack(
'!'+3*ffmt, content)
869 elif (data_id & 0x00F0) == 0x30:
870 o[
'Delta q0'], o[
'Delta q1'], o[
'Delta q2'], o[
'Delta q3'] = \
871 struct.unpack(
'!'+4*ffmt, content)
872 elif (data_id & 0x00F0) == 0x40:
873 o[
'gyrX'], o[
'gyrY'], o[
'gyrZ'] = \
874 struct.unpack(
'!'+3*ffmt, content)
876 raise MTException(
"unknown packet: 0x%04X." % data_id)
879 def parse_GPS(data_id, content, ffmt):
881 if (data_id & 0x00F0) == 0x30:
882 o[
'iTOW'], g, p, t, v, h, n, e = \
883 struct.unpack(
'!LHHHHHHH', content)
884 o[
'gDOP'], o[
'pDOP'], o[
'tDOP'], o[
'vDOP'], o[
'hDOP'], \
885 o[
'nDOP'], o[
'eDOP'] = 0.01*g, 0.01*p, 0.01*t, \
886 0.01*v, 0.01*h, 0.01*n, 0.01*e
887 elif (data_id & 0x00F0) == 0x40:
888 o[
'iTOW'], o[
'fTOW'], o[
'Week'], o[
'gpsFix'], o[
'Flags'], \
889 o[
'ecefX'], o[
'ecefY'], o[
'ecefZ'], o[
'pAcc'], \
890 o[
'ecefVX'], o[
'ecefVY'], o[
'ecefVZ'], o[
'sAcc'], \
891 o[
'pDOP'], o[
'numSV'] = \
892 struct.unpack(
'!LlhBBlllLlllLHxBx', content)
895 elif (data_id & 0x00F0) == 0x80:
896 o[
'iTOW'], o[
'tAcc'], o[
'nano'], o[
'year'], o[
'month'], \
897 o[
'day'], o[
'hour'], o[
'min'], o[
'sec'], o[
'valid'] = \
898 struct.unpack(
'!LLlHBBBBBB', content)
899 elif (data_id & 0x00F0) == 0xA0:
900 o[
'iTOW'], o[
'numCh'] = struct.unpack(
'!LBxxx', content[:8])
903 for i
in range(o[
'numCh']):
904 ch[
'chn'], ch[
'svid'], ch[
'flags'], ch[
'quality'], \
905 ch[
'cno'], ch[
'elev'], ch[
'azim'], ch[
'prRes'] = \
906 struct.unpack(
'!BBBBBbhl', content[8+12*i:20+12*i])
908 o[
'channels'] = channels
910 raise MTException(
"unknown packet: 0x%04X." % data_id)
913 def parse_SCR(data_id, content, ffmt):
915 if (data_id & 0x00F0) == 0x10:
916 o[
'accX'], o[
'accY'], o[
'accZ'], o[
'gyrX'], o[
'gyrY'], \
917 o[
'gyrZ'], o[
'magX'], o[
'magY'], o[
'magZ'], o[
'Temp'] = \
918 struct.unpack(
"!9Hh", content)
919 elif (data_id & 0x00F0) == 0x20:
920 o[
'tempGyrX'], o[
'tempGyrY'], o[
'tempGyrZ'] = \
921 struct.unpack(
"!hhh", content)
923 raise MTException(
"unknown packet: 0x%04X." % data_id)
926 def parse_analog_in(data_id, content, ffmt):
928 if (data_id & 0x00F0) == 0x10:
929 o[
'analogIn1'], = struct.unpack(
"!H", content)
930 elif (data_id & 0x00F0) == 0x20:
931 o[
'analogIn2'], = struct.unpack(
"!H", content)
933 raise MTException(
"unknown packet: 0x%04X." % data_id)
936 def parse_magnetic(data_id, content, ffmt):
938 if (data_id & 0x000C) == 0x00:
940 elif (data_id & 0x000C) == 0x04:
942 elif (data_id & 0x000C) == 0x08:
944 if (data_id & 0x00F0) == 0x20:
945 o[
'magX'], o[
'magY'], o[
'magZ'] = \
946 struct.unpack(
"!3"+ffmt, content)
948 raise MTException(
"unknown packet: 0x%04X." % data_id)
951 def parse_velocity(data_id, content, ffmt):
953 if (data_id & 0x000C) == 0x00:
955 elif (data_id & 0x000C) == 0x04:
957 elif (data_id & 0x000C) == 0x08:
959 if (data_id & 0x00F0) == 0x10:
960 o[
'velX'], o[
'velY'], o[
'velZ'] = \
961 struct.unpack(
"!3"+ffmt, content)
963 raise MTException(
"unknown packet: 0x%04X." % data_id)
966 def parse_status(data_id, content, ffmt):
968 if (data_id & 0x00F0) == 0x10:
969 o[
'StatusByte'], = struct.unpack(
"!B", content)
970 elif (data_id & 0x00F0) == 0x20:
971 o[
'StatusWord'], = struct.unpack(
"!L", content)
972 elif (data_id & 0x00F0) == 0x40:
973 o[
'RSSI'], = struct.unpack(
"!b", content)
975 raise MTException(
"unknown packet: 0x%04X." % data_id)
982 data_id, size = struct.unpack(
'!HB', data[:3])
983 if (data_id & 0x0003) == 0x3:
985 elif (data_id & 0x0003) == 0x0:
988 raise MTException(
"fixed point precision not supported.")
989 content = data[3:3+size]
991 group = data_id & 0xF800
993 if group == XDIGroup.Temperature:
994 output.setdefault(
'Temperature', {}).update(
995 parse_temperature(data_id, content, ffmt))
996 elif group == XDIGroup.Timestamp:
997 output.setdefault(
'Timestamp', {}).update(
998 parse_timestamp(data_id, content, ffmt))
999 elif group == XDIGroup.OrientationData:
1000 output.setdefault(
'Orientation Data', {}).update(
1001 parse_orientation_data(data_id, content, ffmt))
1002 elif group == XDIGroup.Pressure:
1003 output.setdefault(
'Pressure', {}).update(
1004 parse_pressure(data_id, content, ffmt))
1005 elif group == XDIGroup.Acceleration:
1006 output.setdefault(
'Acceleration', {}).update(
1007 parse_acceleration(data_id, content, ffmt))
1008 elif group == XDIGroup.Position:
1009 output.setdefault(
'Position', {}).update(
1010 parse_position(data_id, content, ffmt))
1011 elif group == XDIGroup.GNSS:
1012 output.setdefault(
'GNSS', {}).update(
1013 parse_GNSS(data_id, content, ffmt))
1014 elif group == XDIGroup.AngularVelocity:
1015 output.setdefault(
'Angular Velocity', {}).update(
1016 parse_angular_velocity(data_id, content, ffmt))
1017 elif group == XDIGroup.GPS:
1018 output.setdefault(
'GPS', {}).update(
1019 parse_GPS(data_id, content, ffmt))
1020 elif group == XDIGroup.SensorComponentReadout:
1021 output.setdefault(
'SCR', {}).update(
1022 parse_SCR(data_id, content, ffmt))
1023 elif group == XDIGroup.AnalogIn:
1024 output.setdefault(
'Analog In', {}).update(
1025 parse_analog_in(data_id, content, ffmt))
1026 elif group == XDIGroup.Magnetic:
1027 output.setdefault(
'Magnetic', {}).update(
1028 parse_magnetic(data_id, content, ffmt))
1029 elif group == XDIGroup.Velocity:
1030 output.setdefault(
'Velocity', {}).update(
1031 parse_velocity(data_id, content, ffmt))
1032 elif group == XDIGroup.Status:
1033 output.setdefault(
'Status', {}).update(
1034 parse_status(data_id, content, ffmt))
1036 raise MTException(
"unknown XDI group: 0x%04X." % group)
1037 except struct.error:
1038 raise MTException(
"couldn't parse MTData2 message (data_id: " 1039 "0x%04X, size: %d)." % (data_id, size))
1043 """Read and parse a legacy measurement packet.""" 1047 if settings
is None:
1053 if mode & OutputMode.RAW:
1055 o[
'accX'], o[
'accY'], o[
'accZ'], o[
'gyrX'], o[
'gyrY'], \
1056 o[
'gyrZ'], o[
'magX'], o[
'magY'], o[
'magZ'], o[
'temp'] =\
1057 struct.unpack(
'!10H', data[:20])
1061 if mode & OutputMode.RAWGPS:
1063 o[
'Press'], o[
'bPrs'], o[
'ITOW'], o[
'LAT'], o[
'LON'],\
1064 o[
'ALT'], o[
'VEL_N'], o[
'VEL_E'], o[
'VEL_D'], o[
'Hacc'],\
1065 o[
'Vacc'], o[
'Sacc'], o[
'bGPS'] =\
1066 struct.unpack(
'!HBI6i3IB', data[:44])
1068 output[
'RAWGPS'] = o
1070 if mode & OutputMode.Temp:
1071 temp, = struct.unpack(
'!f', data[:4])
1073 output[
'Temp'] = temp
1075 if mode & OutputMode.Calib:
1077 if (settings & OutputSettings.Coordinates_NED):
1081 if not (settings & OutputSettings.CalibMode_GyrMag):
1082 o[
'accX'], o[
'accY'], o[
'accZ'] = struct.unpack(
'!3f',
1085 if not (settings & OutputSettings.CalibMode_AccMag):
1086 o[
'gyrX'], o[
'gyrY'], o[
'gyrZ'] = struct.unpack(
'!3f',
1089 if not (settings & OutputSettings.CalibMode_AccGyr):
1090 o[
'magX'], o[
'magY'], o[
'magZ'] = struct.unpack(
'!3f',
1095 if mode & OutputMode.Orient:
1097 if (settings & OutputSettings.Coordinates_NED):
1101 if settings & OutputSettings.OrientMode_Euler:
1102 o[
'roll'], o[
'pitch'], o[
'yaw'] = struct.unpack(
'!3f',
1105 elif settings & OutputSettings.OrientMode_Matrix:
1106 a, b, c, d, e, f, g, h, i = struct.unpack(
'!9f',
1109 o[
'matrix'] = ((a, b, c), (d, e, f), (g, h, i))
1111 q0, q1, q2, q3 = struct.unpack(
'!4f', data[:16])
1113 o[
'quaternion'] = (q0, q1, q2, q3)
1114 output[
'Orient'] = o
1116 if mode & OutputMode.Auxiliary:
1118 if not (settings & OutputSettings.AuxiliaryMode_NoAIN1):
1119 o[
'Ain_1'], = struct.unpack(
'!H', data[:2])
1121 if not (settings & OutputSettings.AuxiliaryMode_NoAIN2):
1122 o[
'Ain_2'], = struct.unpack(
'!H', data[:2])
1124 output[
'Auxiliary'] = o
1126 if mode & OutputMode.Position:
1128 o[
'Lat'], o[
'Lon'], o[
'Alt'] = struct.unpack(
'!3f', data[:12])
1132 if mode & OutputMode.Velocity:
1134 if (settings & OutputSettings.Coordinates_NED):
1138 o[
'Vel_X'], o[
'Vel_Y'], o[
'Vel_Z'] = struct.unpack(
'!3f',
1143 if mode & OutputMode.Status:
1144 status, = struct.unpack(
'!B', data[:1])
1146 output[
'Stat'] = status
1148 if settings & OutputSettings.Timestamp_SampleCnt:
1149 TS, = struct.unpack(
'!H', data[:2])
1151 output[
'Sample'] = TS
1153 if settings & OutputSettings.Timestamp_UTCTime:
1155 o[
'ns'], o[
'Year'], o[
'Month'], o[
'Day'], o[
'Hour'],\
1156 o[
'Minute'], o[
'Second'], o[
'Flags'] = struct.unpack(
1157 '!ihbbbbb', data[:12])
1159 output[
'Timestamp'] = o
1161 except struct.error
as e:
1162 raise MTException(
"could not parse MTData message.")
1164 raise MTException(
"could not parse MTData message (too long).")
1168 """Change the baudrate, reset the device and reopen communication.""" 1169 brid = Baudrates.get_BRID(baudrate)
1173 self.
device.baudrate = baudrate
1185 for port
in glob.glob(
"/dev/tty*S*"):
1187 print(
"Trying '%s'" % port)
1191 mtdev_list.append((port, br))
1201 baudrates = (115200, 460800, 921600, 230400, 57600, 38400, 19200, 9600)
1202 for br
in baudrates:
1204 print(
"Trying %d bd:" % br, end=
' ')
1207 mt =
MTDevice(port, br, timeout=timeout, verbose=verbose,
1208 initial_wait=initial_wait)
1209 except serial.SerialException:
1211 print(
"fail: unable to open device.")
1215 mt.GoToMeasurement()
def SetGnssPlatform(self, platform)
def GetCurrentScenario(self)
def SetErrorMode(self, error_mode)
def waitfor(self, size=1)
def SetLocationID(self, location_id)
def GoToMeasurement(self)
def GetStringOutputType(self)
def SetStringOutputType(self, string_output_type)
def GetAvailableScenarios(self)
def parse_MTData2(self, data)
def SetCurrentScenario(self, scenario_id)
def SetBaudrate(self, brid)
def IccCommand(self, command)
def GetOutputSettings(self)
def read_measurement(self, mode=None, settings=None)
def __init__(self, port, baudrate=115200, timeout=0.002, autoconf=True, config_mode=False, verbose=False, initial_wait=0.1)
def parse_MTData(self, data, mode=None, settings=None)
def ChangeBaudrate(self, baudrate)
def SetTransmitDelay(self, transmit_delay)
def GetTransmitDelay(self)
def SetUTCTime(self, ns, year, month, day, hour, minute, second, flag)
def configure_legacy(self, mode, settings, period=None, skipfactor=None)
High-level utility functions.
def GetHardwareVersion(self)
def GetOutputConfiguration(self)
def SetLatLonAlt(self, lat, lon, alt)
def SetOutputSettings(self, settings)
def GetSyncSettings(self)
def SetOutputConfiguration(self, output_configuration)
def RestoreFactoryDefaults(self)
def SetOutputMode(self, mode)
def _ensure_config_state(self)
def GetExtOutputMode(self)
def write_ack(self, mid, data=b'', n_resend=30, n_read=25)
def find_devices(timeout=0.002, verbose=False, initial_wait=0.1)
def SetNoRotation(self, duration)
def SetOutputSkipFactor(self, skipfactor)
def read_data_msg(self, buf=bytearray())
def SetAlignmentRotation(self, parameter, quaternion)
def find_baudrate(port, timeout=0.002, verbose=False, initial_wait=0.1)
Auto detect baudrate.
def ResetOrientation(self, code)
def AdjustUTCTime(self, ticks)
def SetOptionFlags(self, set_flags, clear_flags)
def GetConfiguration(self)
def GetAlignmentRotation(self, parameter)
def _ensure_measurement_state(self)
def SetExtOutputMode(self, ext_mode)
def GetGnssPlatform(self)
def SetSyncSettings(self, sync_settings)
def write_msg(self, mid, data=b'')
Low-level communication.
def Reset(self, go_to_config=False)
High-level functions.
def SetPeriod(self, period)
def auto_config_legacy(self)