upgrade_firmware.py
Go to the documentation of this file.
1 #! /usr/bin/python
2 import requests
3 
4 import time
5 import serial
6 import os, sys, subprocess
7 
8 import argparse
9 
10 print """-------------------------------------------------------------
11 Welcome to the Ubiquity Robotics Firmware Updater!
12 
13 Please make sure that you are not running any ROS nodes.
14 - sudo systemctl stop magni-base
15 
16 Note: Updating the firmware requires access to the internet.
17 -------------------------------------------------------------
18 """
19 
20 TMP_FILE_PATH = '/tmp/firmware'
21 
22 
23 parser = argparse.ArgumentParser(description='Ubiquity Robotics Firmware Updater')
24 parser.add_argument('--device', help='Serial port to use (eg /dev/ttyAMA0)', default='/dev/ttyAMA0')
25 parser.add_argument('--file', help='Path to a firmware file', default='')
26 args = parser.parse_args()
27 
28 serial_port = args.device
29 
30 if (subprocess.call(['fuser','-v', serial_port], stdout=None) == 0):
31  print ""
32  print "Another process is using the serial port, cannot upgrade firmware"
33  print "Make sure you stopped any running ROS nodes. To stop default nodes:"
34  print "sudo systemctl stop magni-base"
35  sys.exit(1)
36 
37 if args.file:
38  path_to_file = args.file
39 
40 else:
41  path_to_file = TMP_FILE_PATH
42 
43  email = raw_input("Please enter your email address: ").strip()
44 
45  if email != "":
46  r = requests.post('https://api.ubiquityrobotics.com/token/', json = {'email': email})
47 
48  if r.status_code != 200:
49  print "Error with requesting a token %d" % r.status_code
50  sys.exit(1)
51 
52  print "An access token was sent to your email address"
53 
54  token = raw_input("Please enter your access token: ").strip()
55 
56  version = raw_input("What version would you like (press enter for latest): ").strip()
57 
58  if version == "":
59  version = "latest"
60 
61  auth_headers = {"Authorization": "Bearer %s" % token}
62  r = requests.get('https://api.ubiquityrobotics.com/firmware/%s' % version, headers=auth_headers)
63 
64  if r.status_code != 200:
65  print "Error downloading firmware %d" % r.status_code
66  sys.exit(1)
67 
68  with open(path_to_file, 'w+b') as fd:
69  for chunk in r.iter_content(chunk_size=128):
70  fd.write(chunk)
71 
72 print "\nUpdating firmware now. Do not power off the robot. This is expected to take less than a minute."
73 
74 
75 # Begin the code firmware uploading code
76 DEBUG = False
77 class InvalidFileException(Exception):
78  def __init__(self):
79  pass
80 
81 class ReadFile:
82  def __init__(self, content, encrypted):
83  self.content = content
84  self.encrypted = encrypted
85  self.size = len(content)
86  def is_encrypted(self):
87  return self.encrypted
88  def read(self, bc):
89  if type(bc) != int or bc < 1:
90  raise Exception("Invalid read: " + str(bc))
91  r = self.content[0:bc]
92  self.content = self.content[bc:]
93  return r
94  def is_open(self):
95  return len(self.content) > 0
96  def get_size(self):
97  return self.size
98  def get_position(self):
99  return self.size - len(self.content)
100 
101 
102 def load_hex(filename):
103  f = open(filename, "r")
104 
105  fcontent = ""
106  fout = ""
107  encrypted = False
108  not_encrypted = False
109  for l in f:
110  for c in l:
111  if c == ":":
112  not_encrypted = True
113  if c == "+":
114  encrypted = True
115  if c == ":" and encrypted:
116  raise Exception("Partial encryption")
117  if c == "+" and not_encrypted:
118  raise Exception("Partial encryption")
119  if c not in ['\n', '\r', '\t', ' ', ':', "+"]:
120  fcontent = fcontent + c
121  while len(fcontent) >= 2:
122  fout += chr(int(fcontent[0:2], 16))
123  fcontent = fcontent[2:]
124  assert not_encrypted or encrypted
125  return ReadFile(fout, encrypted)
126 
127 def convert_num(arr):
128  out = 0
129  scale = 1
130  for b in arr:
131  out = out + ord(b) * scale
132  scale = scale * 256
133  return out
135  arrc = [x for x in arr]
136  arrc.reverse()
137  return convert_num(arrc)
138 
140  asilicon_id = [x for x in f.read(4)]
141  asilicon_id.reverse()
142  silicon_id = ""
143  for a in asilicon_id:
144  silicon_id += a
145 
146  silicon_rev = f.read(1)
147  checksum_type = f.read(1)
148  return silicon_id, silicon_rev, checksum_type
150  flash_id = file_convert_num(f.read(1))
151  row_number = file_convert_num(f.read(2))
152  data_length = file_convert_num(f.read(2))
153  data = f.read(data_length)
154  checksum = file_convert_num(f.read(1))
155  return flash_id, row_number, data_length, data, checksum
156 
158  flash_id = file_convert_num(f.read(1))
159  row_number = file_convert_num(f.read(2))
160  data_length = file_convert_num(f.read(2))
161  data = f.read(data_length + 12)
162  return flash_id, row_number, data_length, data
163 
164 def cstr(x):
165  if type(x) == str:
166  return str(x) + " <- " + str([hex(ord(l)) for l in x])
167  else:
168  return str(type(x)) + " -> " + str(x)
169 
170 class Packet:
171  def __init__(self, ser, cmd):
172  self.ser = ser
173  self.cmd = chr(cmd)
174  self.out = []
175  self.is_sent = False
176  self.id_ptr = 0
177  self.inp = []
178  self.ignore_response = False
179 
180  def do_ignore_response(self): #For exit packet
181  self.ignore_response = True
182 
183  def require_len(self, mylen):
184  if type(mylen) != int or mylen < 0:
185  raise Exception("Invalid length goal for packet: " + cstr(mylen))
186  if len(self.inp) != mylen:
187  raise Exception("Invalid length for packet: " + len(self.inp) + " should be " + str(inp))
188 
189  def write(self, x, mylen = None):
190  if mylen != None:
191  num = long(x)
192  numa = []
193  for step in xrange(0, mylen):
194  numa.append(chr(num % 256))
195  num = num / 256
196  self.out.extend(numa)
197  return
198  if self.is_sent:
199  raise Exception("Write to packet after send")
200  if type(x) == list:
201  self.out.extend(x)
202  elif type(x) == str:
203  self.out.extend([i for i in x])
204  else:
205  self.out.append(chr(x))
206 
207  def read(self, bytes):
208  if not self.is_sent:
209  raise Exception("Read from packet before send")
210  if type(bytes) != int or bytes < 1:
211  raise Exception("Invalid byte read: " + cstr(bytes))
212  r = self.inp[self.id_ptr:self.id_ptr+bytes]
213  self.id_ptr += bytes
214  return r
215 
216  def read_num(self, bytes):
217  return convert_num(self.read(bytes))
218 
219  def send(self):
220  self.is_sent = True
221  init_bytes = [chr(0x1), self.cmd,
222  chr((len(self.out) / 001) % 256),
223  chr((len(self.out) / 256) % 256)]
224  init_bytes.extend(self.out)
225 
226  if DEBUG: print "-"*120
227  if DEBUG: print "Send packet: " + cstr(self.out)
228 
229  ib_buf = ""
230  for c in init_bytes: ib_buf += c
231  if DEBUG: print "Packet without end and checksum is", cstr(ib_buf)
232 
233  checksum = compute_checksum(ib_buf)
234  if DEBUG: print "Checksum is", checksum, "LSB", (checksum / 001) % 256, "MSB", (checksum / 256) % 256
235 
236  ib_buf += chr((checksum / 001) % 256)
237  ib_buf += chr((checksum / 256) % 256)
238  ib_buf += chr(0x17)
239 
240  self.ser.write(ib_buf)
241  self.ser.flush()
242 
243  if self.ignore_response:
244  return
245 
246  #Now we should read back.
247  header = self.ser.read(4)
248  start_byte = header[0:1]
249  if start_byte != str(chr(0x01)):
250  raise Exception("Packet did not start with valid start byte, instead: " + cstr(start_byte) + " -- header: " + cstr(header))
251  status_code = header[1:2]
252  if status_code != str(chr(0x00)):
253  raise Exception("Packet did not start with valid status, instead: " + cstr(status_code) + " -- header: " + cstr(header))
254  data_length_raw = header[2:4]
255  data_length = (ord(data_length_raw[0])) + (ord(data_length_raw[1])) * 256
256  if DEBUG: print "Read in", data_length, "from", cstr(data_length_raw)
257  self.inp = self.ser.read(data_length)
258  if len(self.inp) != data_length:
259  raise Exception("We tried to read: " + str(data_length) + " but instead read: " + str(len(self.inp)) + " -- header: " + cstr(header))
260 
261  #Append many arrays:
262  checksum_bytes = start_byte + status_code + data_length_raw + self.inp
263  checksum_comp = compute_checksum(checksum_bytes)
264 
265  checksum_raw = self.ser.read(2)
266  checksum = (ord(checksum_raw[0])) + (ord(checksum_raw[1]) * 256)
267  if checksum != checksum_comp:
268  raise Exception("Invalid checksum for packet: " + cstr(checksum) + " != " + cstr(checksum_comp) + " -- header: " + cstr(header))
269 
270  end_byte = self.ser.read(1)
271  if end_byte != str(chr(0x17)):
272  raise Exception("Invalid terminator for packet: " + cstr(end_byte) + " -- header: " + cstr(header))
273 
274 def compute_checksum(bytes):
275  return twos_complement(bytes)
276 
277 def twos_complement(bytes):
278  x = 0
279  for b in bytes:
280  x += int(ord(b))
281  x = x & 0xFFFF
282  x = x ^ 0xFFFF
283  x = x + 1
284  return x
285 
286 #CRC16-CCITT
287 def crc16_ccitt(bytes):
288  crc = 0xFFFF
289  x = 0
290  def ushort(x):
291  return x & 0xFFFF
292  for byte in bytes:
293  x = (crc >> 8) ^ (int(ord(byte)) & 0xFF)
294  x ^= (x >> 4)
295  crc = ushort(ushort(crc << 8) ^ ushort(x << 12) ^ ushort(x << 5) ^ ushort(x))
296  return crc
297 
298 def send__erase_row(ser, flash_id, row_number):
299  p = Packet(ser, 0x34)
300  p.write(flash_id, 1)
301  p.write(row_number, 2)
302  p.send()
303  p.require_len(0)
304  return None
305 
307  p = Packet(ser, 0x38)
308  p.send()
309  p.require_len(8)
310  silicon_id = p.read(4)
311  silicon_rev = p.read(1)
312  bootloader_version = p.read(3)
313  return silicon_id, silicon_rev, bootloader_version
314 
315 def send__get_flash_size(ser, flash_id):
316  p = Packet(ser, 0x32)
317  p.write(flash_id, 1)
318  p.send()
319  p.require_len(4)
320  flash_first_row = p.read_num(2)
321  flash_last_row = p.read_num(2)
322  return flash_first_row, flash_last_row
323 
324 def send__program_row(ser, flash_id, row_number, data_next):
325  p = Packet(ser, 0x39)
326  p.write(flash_id, 1)
327  p.write(row_number, 2)
328  p.write(data_next)
329  p.send()
330  p.require_len(0)
331  return None
332 
333 def send__encrypted_program_row(ser, flash_id, row_number, data_next):
334  p = Packet(ser, 0x3D)
335  p.write(flash_id, 1)
336  p.write(row_number, 2)
337  p.write(data_next)
338  p.send()
339  p.require_len(0)
340  return None
341 
342 def send__verify_row(ser, flash_id, row_number):
343  p = Packet(ser, 0x3A)
344  p.write(flash_id, 1)
345  p.write(row_number, 2)
346  p.send()
347  p.require_len(1)
348  checksum = p.read_num(1)
349  return checksum
350 
351 def send__data(ser, data_next):
352  p = Packet(ser, 0x37)
353  p.write(data_next)
354  p.send()
355  p.require_len(0)
356  return None
357 
359  p = Packet(ser, 0x31)
360  p.send()
361  p.require_len(1)
362  r = p.read_num(1)
363  return r
364 
365 def send__get_application_status(ser, app_number):
366  p = Packet(ser, 0x33)
367  p.write(app_number, 1)
368  p.send()
369  p.require_len(2)
370  valid_app_number = p.read_num(1)
371  active_app_number = p.read_num(1)
372  return valid_app_number, active_app_number
373 
374 def send__set_active_application(ser, app_number):
375  p = Packet(ser, 0x36)
376  p.write(app_number, 1)
377  p.send()
378  p.require_len(0)
379  return
380 
382  p = Packet(ser, 0x35)
383  p.send()
384  p.require_len(0)
385  return
386 
388  p = Packet(ser, 0x3B)
389  p.do_ignore_response()
390  p.send()
391  p.require_len(0)
392  return
393 
394 def add8(a, b):
395  return ((a & 0xFF) + (b & 0xFF)) & 0xFF
396 
397 def convert_checksum(checksum, flash_id, row_number, row_size):
398  r = add8(checksum, flash_id)
399  r = add8(r, row_number)
400  r = add8(r, row_number >> 8)
401  r = add8(r, row_size)
402  r = add8(r, row_size >> 8)
403  if DEBUG: print "Checksum convert:", checksum, flash_id, row_number, row_size, "out:", r
404  return r
405 
406 hex_stream = None
407 try:
408  hex_stream = load_hex(path_to_file)
409 except IOError:
410  print "Unable to open file: ", path_to_file
411 except InvalidFileException:
412  print "File is not of the correct format"
413 print("Encryption:", hex_stream.is_encrypted())
414 
415 ser = serial.Serial(serial_port, 38400, timeout=1, bytesize=8,
416  parity=serial.PARITY_NONE, stopbits=1, xonxoff=0, rtscts=0)
417 
418 # Write to request the bootloader to the correct state.
419 request_bootloader = [0x7E, 0x3E, 0x01, 0x01, 0x01, 0x01, 0x01]
420 request_bootloader.append(0xFF - sum(request_bootloader[1:]))
421 ser.write("".join([chr(x) for x in request_bootloader]))
422 ser.flush()
423 ser.read(65536) # Clear out the response
424 
425 silicon_id, silicon_rev, bootloader_version = send__enter_bootloader(ser)
426 file_silicon_id, file_silicon_rev, file_checksum_type = read__header(hex_stream)
427 if file_silicon_id != silicon_id:
428  raise Exception("The silicon ids did not match " + cstr(file_silicon_id) + " != " + cstr(silicon_id))
429 if file_silicon_rev != silicon_rev:
430  raise Exception("The silicon revs did not match")
431 
432 if file_checksum_type != str(chr(0)):
433  raise Exception("Invalid checksum type: " + cstr(file_checksum_type))
434 
435 last_percent = 0.0
436 
437 #send__sync_bootloader(ser)
438 
439 while hex_stream.is_open():
440  if not hex_stream.is_encrypted():
441  flash_id, row_number, data_length, data, checksum = read__flash_line(hex_stream)
442  print "Writing row", row_number, "for", flash_id, "process at", ("%.2f" % last_percent), "percent completion"
443  last_percent = (100.0 * hex_stream.get_position()) / hex_stream.get_size()
444 
445  real_checksum = convert_checksum(checksum, flash_id, row_number, len(data))
446  if DEBUG: print "Write", flash_id, row_number, data_length, len(data), checksum, "->", real_checksum
447 
448  flash_first_row, flash_last_row = send__get_flash_size(ser, flash_id)
449  if DEBUG: print "Flash: ", flash_first_row, flash_last_row
450  if row_number < flash_first_row or row_number > flash_last_row:
451  raise Exception("Invalid row: must be between " + str(flash_first_row)
452  + " and " + str(flash_last_row) + " but is " + str(row_number))
453 
454  send__erase_row(ser, flash_id, row_number)
455 
456  data_strip = data
457  data_sent = 0
458  while data_strip != [] and data_strip != "":
459  bytes_at_a_time = 256
460  data_next = data_strip[0:bytes_at_a_time]
461  data_strip = data_strip[bytes_at_a_time:]
462  data_sent += bytes_at_a_time
463  if DEBUG: print "Sending", len(data_next), "Sent: ", data_sent, "Left: ", len(data_strip)
464  if data_strip == [] or data_strip == "":
465  #Program write
466  if DEBUG: print "Last packet"
467  send__program_row(ser, flash_id, row_number, data_next)
468  comp_checksum = send__verify_row(ser, flash_id, row_number)
469  if real_checksum != comp_checksum:
470  raise Exception("Checksum error at " + flash_id + " " + row_number
471  + ". We wanted " + cstr(real_checksum) + " but we got " + cstr(comp_checksum))
472  else:
473  if DEBUG: print "Checksum valid", flash_id, row_number, "->", real_checksum, "==", comp_checksum
474  else:
475  send__data(ser, data_next)
476  else:
477  flash_id, row_number, data_length, data = read__encrypted_flash_line(hex_stream)
478  print "Writing row", row_number, "for", flash_id, "process at", ("%.2f" % last_percent), "percent completion"
479  last_percent = (100.0 * hex_stream.get_position()) / hex_stream.get_size()
480  assert len(data) < 256
481  send__encrypted_program_row(ser, flash_id, row_number, data)
482 
483 if send__verify_checksum(ser) == 0:
484  raise Exception("Application checksum invalid")
485 
486 try:
487  hex_stream.close()
488 except:
489  pass
490 
492 
493 ser.close()
494 
495 print "Finished upgrading firmware!"
def send__get_flash_size(ser, flash_id)
def write(self, x, mylen=None)
def twos_complement(bytes)
def send__encrypted_program_row(ser, flash_id, row_number, data_next)
def load_hex(filename)
def compute_checksum(bytes)
def send__verify_row(ser, flash_id, row_number)
def send__enter_bootloader(ser)
def send__program_row(ser, flash_id, row_number, data_next)
def __init__(self, ser, cmd)
def send__data(ser, data_next)
def send__sync_bootloader(ser)
def read_num(self, bytes)
def require_len(self, mylen)
def send__get_application_status(ser, app_number)
def __init__(self, content, encrypted)
def send__set_active_application(ser, app_number)
def convert_checksum(checksum, flash_id, row_number, row_size)
def send__exit_bootloader(ser)
def send__erase_row(ser, flash_id, row_number)
def send__verify_checksum(ser)
def crc16_ccitt(bytes)
def read__encrypted_flash_line(f)


ubiquity_motor
Author(s):
autogenerated on Mon Feb 28 2022 23:57:06