upgrade_firmware.py
Go to the documentation of this file.
1 #! /usr/bin/env python2
2 
3 # firmware upgrade script still has not been converted to py3:
4 # https://github.com/UbiquityRobotics/ubiquity_motor/issues/149
5 # https://github.com/UbiquityRobotics/ubiquity_motor/pull/152
6 # the nasty workaround for Noetic has been to still have py2 installed on the system
7 # and trigger the update with that. That is why this script is still on py2.
8 
9 import requests
10 
11 import time
12 import serial
13 import os, sys, subprocess
14 
15 import argparse
16 
17 print("""-------------------------------------------------------------
18 Welcome to the Ubiquity Robotics Firmware Updater!
19 
20 Please make sure that you are not running any ROS nodes.
21 - sudo systemctl stop magni-base
22 
23 Note: Updating the firmware requires access to the internet.
24 -------------------------------------------------------------
25 """)
26 
27 TMP_FILE_PATH = '/tmp/firmware'
28 
29 
30 parser = argparse.ArgumentParser(description='Ubiquity Robotics Firmware Updater')
31 parser.add_argument('--device', help='Serial port to use (eg /dev/ttyAMA0)', default='/dev/ttyAMA0')
32 parser.add_argument('--file', help='Path to a firmware file', default='')
33 args = parser.parse_args()
34 
35 serial_port = args.device
36 
37 if (subprocess.call(['fuser','-v', serial_port], stdout=None) == 0):
38  print("")
39  print("Another process is using the serial port, cannot upgrade firmware")
40  print("Make sure you stopped any running ROS nodes. To stop default nodes:")
41  print("sudo systemctl stop magni-base")
42  sys.exit(1)
43 
44 if args.file:
45  path_to_file = args.file
46 
47 else:
48  path_to_file = TMP_FILE_PATH
49 
50  email = raw_input("Please enter your email address: ").strip()
51 
52  print(email)
53 
54  if email != "":
55  r = requests.post('https://api.ubiquityrobotics.com/token/', json = {'email': email})
56 
57  if r.status_code == 500:
58  print("Error: 500 Internal Server Error. Something went wrong, try again in a few minutes. If the error persists, contact support.")
59  elif r.status_code != 200:
60  print("Error with requesting a token %d" % r.status_code)
61  sys.exit(1)
62 
63  print("An access token was sent to your email address")
64 
65  token = raw_input("Please enter your access token: ").strip()
66 
67  version = raw_input("What version would you like (press enter for latest): ").strip()
68 
69  if version == "":
70  version = "latest"
71 
72  auth_headers = {"Authorization": "Bearer %s" % token}
73  r = requests.get('https://api.ubiquityrobotics.com/firmware/%s' % version, headers=auth_headers)
74 
75  if r.status_code == 401:
76  print("Error: 401 Unauthorized. Please make sure that your token is valid and entered correctly.")
77  sys.exit(1)
78  elif r.status_code == 404:
79  print("Error: 404 Not Found. The firmware version that you requested was not found.")
80  elif r.status_code == 500:
81  print("Error: 500 Internal Server Error. Something went wrong, try again in a few minutes. If the error persists, contact support.")
82  elif r.status_code != 200:
83  print("Error downloading firmware %d" % r.status_code)
84  sys.exit(1)
85 
86  with open(path_to_file, 'w+b') as fd:
87  for chunk in r.iter_content(chunk_size=128):
88  fd.write(chunk)
89 
90 print("\nUpdating firmware now. Do not power off the robot. This is expected to take less than a minute.")
91 
92 
93 # Begin the code firmware uploading code
94 DEBUG = False
95 class InvalidFileException(Exception):
96  def __init__(self):
97  pass
98 
99 class ReadFile:
100  def __init__(self, content, encrypted):
101  self.content = content
102  self.encrypted = encrypted
103  self.size = len(content)
104  def is_encrypted(self):
105  return self.encrypted
106  def read(self, bc):
107  if type(bc) != int or bc < 1:
108  raise Exception("Invalid read: " + str(bc))
109  r = self.content[0:bc]
110  self.content = self.content[bc:]
111  return r
112  def is_open(self):
113  return len(self.content) > 0
114  def get_size(self):
115  return self.size
116  def get_position(self):
117  return self.size - len(self.content)
118 
119 
120 def load_hex(filename):
121  f = open(filename, "r")
122 
123  fcontent = ""
124  fout = ""
125  encrypted = False
126  not_encrypted = False
127  for l in f:
128  for c in l:
129  if c == ":":
130  not_encrypted = True
131  if c == "+":
132  encrypted = True
133  if c == ":" and encrypted:
134  raise Exception("Partial encryption")
135  if c == "+" and not_encrypted:
136  raise Exception("Partial encryption")
137  if c not in ['\n', '\r', '\t', ' ', ':', "+"]:
138  fcontent = fcontent + c
139  while len(fcontent) >= 2:
140  fout += chr(int(fcontent[0:2], 16))
141  fcontent = fcontent[2:]
142  assert not_encrypted or encrypted
143  return ReadFile(fout, encrypted)
144 
145 def convert_num(arr):
146  out = 0
147  scale = 1
148  for b in arr:
149  out = out + ord(b) * scale
150  scale = scale * 256
151  return out
153  arrc = [x for x in arr]
154  arrc.reverse()
155  return convert_num(arrc)
156 
158  asilicon_id = [x for x in f.read(4)]
159  asilicon_id.reverse()
160  silicon_id = ""
161  for a in asilicon_id:
162  silicon_id += a
163 
164  silicon_rev = f.read(1)
165  checksum_type = f.read(1)
166  return silicon_id, silicon_rev, checksum_type
168  flash_id = file_convert_num(f.read(1))
169  row_number = file_convert_num(f.read(2))
170  data_length = file_convert_num(f.read(2))
171  data = f.read(data_length)
172  checksum = file_convert_num(f.read(1))
173  return flash_id, row_number, data_length, data, checksum
174 
176  flash_id = file_convert_num(f.read(1))
177  row_number = file_convert_num(f.read(2))
178  data_length = file_convert_num(f.read(2))
179  data = f.read(data_length + 12)
180  return flash_id, row_number, data_length, data
181 
182 def cstr(x):
183  if type(x) == str:
184  return str(x) + " <- " + str([hex(ord(l)) for l in x])
185  else:
186  return str(type(x)) + " -> " + str(x)
187 
188 class Packet:
189  def __init__(self, ser, cmd):
190  self.ser = ser
191  self.cmd = chr(cmd)
192  self.out = []
193  self.is_sent = False
194  self.id_ptr = 0
195  self.inp = []
196  self.ignore_response = False
197 
198  def do_ignore_response(self): #For exit packet
199  self.ignore_response = True
200 
201  def require_len(self, mylen):
202  if type(mylen) != int or mylen < 0:
203  raise Exception("Invalid length goal for packet: " + cstr(mylen))
204  if len(self.inp) != mylen:
205  raise Exception("Invalid length for packet: " + len(self.inp) + " should be " + str(inp))
206 
207  def write(self, x, mylen = None):
208  if mylen != None:
209  num = long(x)
210  numa = []
211  for step in xrange(0, mylen):
212  numa.append(chr(num % 256))
213  num = num / 256
214  self.out.extend(numa)
215  return
216  if self.is_sent:
217  raise Exception("Write to packet after send")
218  if type(x) == list:
219  self.out.extend(x)
220  elif type(x) == str:
221  self.out.extend([i for i in x])
222  else:
223  self.out.append(chr(x))
224 
225  def read(self, bytes):
226  if not self.is_sent:
227  raise Exception("Read from packet before send")
228  if type(bytes) != int or bytes < 1:
229  raise Exception("Invalid byte read: " + cstr(bytes))
230  r = self.inp[self.id_ptr:self.id_ptr+bytes]
231  self.id_ptr += bytes
232  return r
233 
234  def read_num(self, bytes):
235  return convert_num(self.read(bytes))
236 
237  def send(self):
238  self.is_sent = True
239  init_bytes = [chr(0x1), self.cmd,
240  chr((len(self.out) / 1) % 256),
241  chr((len(self.out) / 256) % 256)]
242  init_bytes.extend(self.out)
243 
244  if DEBUG: print("-"*120)
245  if DEBUG: print("Send packet: " + cstr(self.out))
246 
247  ib_buf = ""
248  for c in init_bytes: ib_buf += c
249  if DEBUG: print("Packet without end and checksum is", cstr(ib_buf))
250 
251  checksum = compute_checksum(ib_buf)
252  if DEBUG: print("Checksum is", checksum, "LSB", (checksum / 1) % 256, "MSB", (checksum / 256) % 256)
253 
254  ib_buf += chr((checksum / 1) % 256)
255  ib_buf += chr((checksum / 256) % 256)
256  ib_buf += chr(0x17)
257 
258  self.ser.write(ib_buf)
259  self.ser.flush()
260 
261  if self.ignore_response:
262  return
263 
264  #Now we should read back.
265  header = self.ser.read(4)
266  start_byte = header[0:1]
267  if start_byte != str(chr(0x01)):
268  raise Exception("Packet did not start with valid start byte, instead: " + cstr(start_byte) + " -- header: " + cstr(header))
269  status_code = header[1:2]
270  if status_code != str(chr(0x00)):
271  raise Exception("Packet did not start with valid status, instead: " + cstr(status_code) + " -- header: " + cstr(header))
272  data_length_raw = header[2:4]
273  data_length = (ord(data_length_raw[0])) + (ord(data_length_raw[1])) * 256
274  if DEBUG: print("Read in", data_length, "from", cstr(data_length_raw))
275  self.inp = self.ser.read(data_length)
276  if len(self.inp) != data_length:
277  raise Exception("We tried to read: " + str(data_length) + " but instead read: " + str(len(self.inp)) + " -- header: " + cstr(header))
278 
279  #Append many arrays:
280  checksum_bytes = start_byte + status_code + data_length_raw + self.inp
281  checksum_comp = compute_checksum(checksum_bytes)
282 
283  checksum_raw = self.ser.read(2)
284  checksum = (ord(checksum_raw[0])) + (ord(checksum_raw[1]) * 256)
285  if checksum != checksum_comp:
286  raise Exception("Invalid checksum for packet: " + cstr(checksum) + " != " + cstr(checksum_comp) + " -- header: " + cstr(header))
287 
288  end_byte = self.ser.read(1)
289  if end_byte != str(chr(0x17)):
290  raise Exception("Invalid terminator for packet: " + cstr(end_byte) + " -- header: " + cstr(header))
291 
292 def compute_checksum(bytes):
293  return twos_complement(bytes)
294 
295 def twos_complement(bytes):
296  x = 0
297  for b in bytes:
298  x += int(ord(b))
299  x = x & 0xFFFF
300  x = x ^ 0xFFFF
301  x = x + 1
302  return x
303 
304 #CRC16-CCITT
305 def crc16_ccitt(bytes):
306  crc = 0xFFFF
307  x = 0
308  def ushort(x):
309  return x & 0xFFFF
310  for byte in bytes:
311  x = (crc >> 8) ^ (int(ord(byte)) & 0xFF)
312  x ^= (x >> 4)
313  crc = ushort(ushort(crc << 8) ^ ushort(x << 12) ^ ushort(x << 5) ^ ushort(x))
314  return crc
315 
316 def send__erase_row(ser, flash_id, row_number):
317  p = Packet(ser, 0x34)
318  p.write(flash_id, 1)
319  p.write(row_number, 2)
320  p.send()
321  p.require_len(0)
322  return None
323 
325  p = Packet(ser, 0x38)
326  p.send()
327  p.require_len(8)
328  silicon_id = p.read(4)
329  silicon_rev = p.read(1)
330  bootloader_version = p.read(3)
331  return silicon_id, silicon_rev, bootloader_version
332 
333 def send__get_flash_size(ser, flash_id):
334  p = Packet(ser, 0x32)
335  p.write(flash_id, 1)
336  p.send()
337  p.require_len(4)
338  flash_first_row = p.read_num(2)
339  flash_last_row = p.read_num(2)
340  return flash_first_row, flash_last_row
341 
342 def send__program_row(ser, flash_id, row_number, data_next):
343  p = Packet(ser, 0x39)
344  p.write(flash_id, 1)
345  p.write(row_number, 2)
346  p.write(data_next)
347  p.send()
348  p.require_len(0)
349  return None
350 
351 def send__encrypted_program_row(ser, flash_id, row_number, data_next):
352  p = Packet(ser, 0x3D)
353  p.write(flash_id, 1)
354  p.write(row_number, 2)
355  p.write(data_next)
356  p.send()
357  p.require_len(0)
358  return None
359 
360 def send__verify_row(ser, flash_id, row_number):
361  p = Packet(ser, 0x3A)
362  p.write(flash_id, 1)
363  p.write(row_number, 2)
364  p.send()
365  p.require_len(1)
366  checksum = p.read_num(1)
367  return checksum
368 
369 def send__data(ser, data_next):
370  p = Packet(ser, 0x37)
371  p.write(data_next)
372  p.send()
373  p.require_len(0)
374  return None
375 
377  p = Packet(ser, 0x31)
378  p.send()
379  p.require_len(1)
380  r = p.read_num(1)
381  return r
382 
383 def send__get_application_status(ser, app_number):
384  p = Packet(ser, 0x33)
385  p.write(app_number, 1)
386  p.send()
387  p.require_len(2)
388  valid_app_number = p.read_num(1)
389  active_app_number = p.read_num(1)
390  return valid_app_number, active_app_number
391 
392 def send__set_active_application(ser, app_number):
393  p = Packet(ser, 0x36)
394  p.write(app_number, 1)
395  p.send()
396  p.require_len(0)
397  return
398 
400  p = Packet(ser, 0x35)
401  p.send()
402  p.require_len(0)
403  return
404 
406  p = Packet(ser, 0x3B)
407  p.do_ignore_response()
408  p.send()
409  p.require_len(0)
410  return
411 
412 def add8(a, b):
413  return ((a & 0xFF) + (b & 0xFF)) & 0xFF
414 
415 def convert_checksum(checksum, flash_id, row_number, row_size):
416  r = add8(checksum, flash_id)
417  r = add8(r, row_number)
418  r = add8(r, row_number >> 8)
419  r = add8(r, row_size)
420  r = add8(r, row_size >> 8)
421  if DEBUG: print("Checksum convert:", checksum, flash_id, row_number, row_size, "out:", r)
422  return r
423 
424 hex_stream = None
425 try:
426  hex_stream = load_hex(path_to_file)
427 except IOError:
428  print("Unable to open file: ", path_to_file)
429 except InvalidFileException:
430  print("File is not of the correct format")
431 print("Encryption:", hex_stream.is_encrypted())
432 
433 ser = serial.Serial(serial_port, 38400, timeout=1, bytesize=8,
434  parity=serial.PARITY_NONE, stopbits=1, xonxoff=0, rtscts=0)
435 
436 # Write to request the bootloader to the correct state.
437 request_bootloader = [0x7E, 0x3E, 0x01, 0x01, 0x01, 0x01, 0x01]
438 request_bootloader.append(0xFF - sum(request_bootloader[1:]))
439 ser.write("".join([chr(x) for x in request_bootloader]))
440 ser.flush()
441 ser.read(65536) # Clear out the response
442 
443 silicon_id, silicon_rev, bootloader_version = send__enter_bootloader(ser)
444 file_silicon_id, file_silicon_rev, file_checksum_type = read__header(hex_stream)
445 if file_silicon_id != silicon_id:
446  raise Exception("The silicon ids did not match " + cstr(file_silicon_id) + " != " + cstr(silicon_id))
447 if file_silicon_rev != silicon_rev:
448  raise Exception("The silicon revs did not match")
449 
450 if file_checksum_type != str(chr(0)):
451  raise Exception("Invalid checksum type: " + cstr(file_checksum_type))
452 
453 last_percent = 0.0
454 
455 #send__sync_bootloader(ser)
456 
457 while hex_stream.is_open():
458  if not hex_stream.is_encrypted():
459  flash_id, row_number, data_length, data, checksum = read__flash_line(hex_stream)
460  print("Writing row", row_number, "for", flash_id, "process at", ("%.2f" % last_percent), "percent completion")
461  last_percent = (100.0 * hex_stream.get_position()) / hex_stream.get_size()
462 
463  real_checksum = convert_checksum(checksum, flash_id, row_number, len(data))
464  if DEBUG: print("Write", flash_id, row_number, data_length, len(data), checksum, "->", real_checksum)
465 
466  flash_first_row, flash_last_row = send__get_flash_size(ser, flash_id)
467  if DEBUG: print("Flash: ", flash_first_row, flash_last_row)
468  if row_number < flash_first_row or row_number > flash_last_row:
469  raise Exception("Invalid row: must be between " + str(flash_first_row)
470  + " and " + str(flash_last_row) + " but is " + str(row_number))
471 
472  send__erase_row(ser, flash_id, row_number)
473 
474  data_strip = data
475  data_sent = 0
476  while data_strip != [] and data_strip != "":
477  bytes_at_a_time = 256
478  data_next = data_strip[0:bytes_at_a_time]
479  data_strip = data_strip[bytes_at_a_time:]
480  data_sent += bytes_at_a_time
481  if DEBUG: print("Sending", len(data_next), "Sent: ", data_sent, "Left: ", len(data_strip))
482  if data_strip == [] or data_strip == "":
483  #Program write
484  if DEBUG: print("Last packet")
485  send__program_row(ser, flash_id, row_number, data_next)
486  comp_checksum = send__verify_row(ser, flash_id, row_number)
487  if real_checksum != comp_checksum:
488  raise Exception("Checksum error at " + flash_id + " " + row_number
489  + ". We wanted " + cstr(real_checksum) + " but we got " + cstr(comp_checksum))
490  else:
491  if DEBUG: print("Checksum valid", flash_id, row_number, "->", real_checksum, "==", comp_checksum)
492  else:
493  send__data(ser, data_next)
494  else:
495  flash_id, row_number, data_length, data = read__encrypted_flash_line(hex_stream)
496  print("Writing row", row_number, "for", flash_id, "process at", ("%.2f" % last_percent), "percent completion")
497  last_percent = (100.0 * hex_stream.get_position()) / hex_stream.get_size()
498  assert len(data) < 256
499  send__encrypted_program_row(ser, flash_id, row_number, data)
500 
501 if send__verify_checksum(ser) == 0:
502  raise Exception("Application checksum invalid")
503 
504 try:
505  hex_stream.close()
506 except:
507  pass
508 
510 
511 ser.close()
512 
513 print("Finished upgrading firmware!")
upgrade_firmware.Packet.out
out
Definition: upgrade_firmware.py:192
upgrade_firmware.ReadFile.get_size
def get_size(self)
Definition: upgrade_firmware.py:114
upgrade_firmware.add8
def add8(a, b)
Definition: upgrade_firmware.py:412
upgrade_firmware.Packet.inp
inp
Definition: upgrade_firmware.py:195
upgrade_firmware.read__header
def read__header(f)
Definition: upgrade_firmware.py:157
upgrade_firmware.send__encrypted_program_row
def send__encrypted_program_row(ser, flash_id, row_number, data_next)
Definition: upgrade_firmware.py:351
upgrade_firmware.send__erase_row
def send__erase_row(ser, flash_id, row_number)
Definition: upgrade_firmware.py:316
upgrade_firmware.Packet.cmd
cmd
Definition: upgrade_firmware.py:191
upgrade_firmware.send__program_row
def send__program_row(ser, flash_id, row_number, data_next)
Definition: upgrade_firmware.py:342
upgrade_firmware.InvalidFileException
Definition: upgrade_firmware.py:95
upgrade_firmware.Packet.id_ptr
id_ptr
Definition: upgrade_firmware.py:194
upgrade_firmware.Packet.read_num
def read_num(self, bytes)
Definition: upgrade_firmware.py:234
upgrade_firmware.send__verify_checksum
def send__verify_checksum(ser)
Definition: upgrade_firmware.py:376
upgrade_firmware.Packet.is_sent
is_sent
Definition: upgrade_firmware.py:193
upgrade_firmware.ReadFile.read
def read(self, bc)
Definition: upgrade_firmware.py:106
upgrade_firmware.ReadFile.__init__
def __init__(self, content, encrypted)
Definition: upgrade_firmware.py:100
upgrade_firmware.ReadFile.is_encrypted
def is_encrypted(self)
Definition: upgrade_firmware.py:104
upgrade_firmware.Packet.write
def write(self, x, mylen=None)
Definition: upgrade_firmware.py:207
upgrade_firmware.convert_checksum
def convert_checksum(checksum, flash_id, row_number, row_size)
Definition: upgrade_firmware.py:415
upgrade_firmware.Packet.require_len
def require_len(self, mylen)
Definition: upgrade_firmware.py:201
upgrade_firmware.Packet.ser
ser
Definition: upgrade_firmware.py:190
upgrade_firmware.send__sync_bootloader
def send__sync_bootloader(ser)
Definition: upgrade_firmware.py:399
upgrade_firmware.send__get_flash_size
def send__get_flash_size(ser, flash_id)
Definition: upgrade_firmware.py:333
upgrade_firmware.send__data
def send__data(ser, data_next)
Definition: upgrade_firmware.py:369
upgrade_firmware.ReadFile.content
content
Definition: upgrade_firmware.py:101
upgrade_firmware.cstr
def cstr(x)
Definition: upgrade_firmware.py:182
upgrade_firmware.send__get_application_status
def send__get_application_status(ser, app_number)
Definition: upgrade_firmware.py:383
upgrade_firmware.crc16_ccitt
def crc16_ccitt(bytes)
Definition: upgrade_firmware.py:305
upgrade_firmware.read__encrypted_flash_line
def read__encrypted_flash_line(f)
Definition: upgrade_firmware.py:175
upgrade_firmware.send__enter_bootloader
def send__enter_bootloader(ser)
Definition: upgrade_firmware.py:324
upgrade_firmware.twos_complement
def twos_complement(bytes)
Definition: upgrade_firmware.py:295
serial::Serial
Definition: serial.h:147
upgrade_firmware.ReadFile.get_position
def get_position(self)
Definition: upgrade_firmware.py:116
upgrade_firmware.ReadFile.size
size
Definition: upgrade_firmware.py:103
upgrade_firmware.InvalidFileException.__init__
def __init__(self)
Definition: upgrade_firmware.py:96
upgrade_firmware.load_hex
def load_hex(filename)
Definition: upgrade_firmware.py:120
upgrade_firmware.convert_num
def convert_num(arr)
Definition: upgrade_firmware.py:145
upgrade_firmware.Packet.do_ignore_response
def do_ignore_response(self)
Definition: upgrade_firmware.py:198
upgrade_firmware.Packet.send
def send(self)
Definition: upgrade_firmware.py:237
upgrade_firmware.send__exit_bootloader
def send__exit_bootloader(ser)
Definition: upgrade_firmware.py:405
upgrade_firmware.ReadFile
Definition: upgrade_firmware.py:99
upgrade_firmware.file_convert_num
def file_convert_num(arr)
Definition: upgrade_firmware.py:152
upgrade_firmware.Packet.ignore_response
ignore_response
Definition: upgrade_firmware.py:196
upgrade_firmware.send__verify_row
def send__verify_row(ser, flash_id, row_number)
Definition: upgrade_firmware.py:360
upgrade_firmware.read__flash_line
def read__flash_line(f)
Definition: upgrade_firmware.py:167
upgrade_firmware.send__set_active_application
def send__set_active_application(ser, app_number)
Definition: upgrade_firmware.py:392
upgrade_firmware.Packet.read
def read(self, bytes)
Definition: upgrade_firmware.py:225
upgrade_firmware.Packet.__init__
def __init__(self, ser, cmd)
Definition: upgrade_firmware.py:189
upgrade_firmware.compute_checksum
def compute_checksum(bytes)
Definition: upgrade_firmware.py:292
upgrade_firmware.ReadFile.is_open
def is_open(self)
Definition: upgrade_firmware.py:112
upgrade_firmware.ReadFile.encrypted
encrypted
Definition: upgrade_firmware.py:102
upgrade_firmware.Packet
Definition: upgrade_firmware.py:188


ubiquity_motor
Author(s):
autogenerated on Thu Nov 16 2023 03:30:56