1
0
Fork 0
InfiniTime/bootloader/ota-dfu-python/ble_secure_dfu_controller.py

324 lines
13 KiB
Python
Raw Permalink Normal View History

import math
import pexpect
import time
from array import array
from util import *
from nrf_ble_dfu_controller import NrfBleDfuController
verbose = False
class Procedures:
CREATE = 0x01
SET_PRN = 0x02
CALC_CHECKSUM = 0x03
EXECUTE = 0x04
SELECT = 0x06
RESPONSE = 0x60
PARAM_COMMAND = 0x01
PARAM_DATA = 0x02
string_map = {
CREATE : "CREATE",
SET_PRN : "SET_PRN",
CALC_CHECKSUM : "CALC_CHECKSUM",
EXECUTE : "EXECUTE",
SELECT : "SELECT",
RESPONSE : "RESPONSE",
}
@staticmethod
def to_string(proc):
return Procedures.string_map[proc]
@staticmethod
def from_string(proc_str):
return int(proc_str, 16)
class Results:
INVALID_CODE = 0x00
SUCCESS = 0x01
OPCODE_NOT_SUPPORTED = 0x02
INVALID_PARAMETER = 0x03
INSUFF_RESOURCES = 0x04
INVALID_OBJECT = 0x05
UNSUPPORTED_TYPE = 0x07
OPERATION_NOT_PERMITTED = 0x08
OPERATION_FAILED = 0x0A
string_map = {
INVALID_CODE : "INVALID_CODE",
SUCCESS : "SUCCESS",
OPCODE_NOT_SUPPORTED : "OPCODE_NOT_SUPPORTED",
INVALID_PARAMETER : "INVALID_PARAMETER",
INSUFF_RESOURCES : "INSUFFICIENT_RESOURCES",
INVALID_OBJECT : "INVALID_OBJECT",
UNSUPPORTED_TYPE : "UNSUPPORTED_TYPE",
OPERATION_NOT_PERMITTED : "OPERATION_NOT_PERMITTED",
OPERATION_FAILED : "OPERATION_FAILED",
}
@staticmethod
def to_string(res):
return Results.string_map[res]
@staticmethod
def from_string(res_str):
return int(res_str, 16)
class BleDfuControllerSecure(NrfBleDfuController):
# Class constants
UUID_BUTTONLESS = '8e400001-f315-4f60-9fb8-838830daea50'
UUID_CONTROL_POINT = '8ec90001-f315-4f60-9fb8-838830daea50'
UUID_PACKET = '8ec90002-f315-4f60-9fb8-838830daea50'
# Constructor inherited from abstract base class
# --------------------------------------------------------------------------
# Start the firmware update process
# --------------------------------------------------------------------------
def start(self):
(_, self.ctrlpt_handle, self.ctrlpt_cccd_handle) = self._get_handles(self.UUID_CONTROL_POINT)
(_, self.data_handle, _) = self._get_handles(self.UUID_PACKET)
if verbose:
print('Control Point Handle: 0x%04x, CCCD: 0x%04x' % (self.ctrlpt_handle, self.ctrlpt_cccd_handle))
print('Packet handle: 0x%04x' % (self.data_handle))
# Subscribe to notifications from Control Point characteristic
self._enable_notifications(self.ctrlpt_cccd_handle)
# Set the Packet Receipt Notification interval
prn = uint16_to_bytes_le(self.pkt_receipt_interval)
self._dfu_send_command(Procedures.SET_PRN, prn)
self._dfu_send_init()
self._dfu_send_image()
# --------------------------------------------------------------------------
# Check if the peripheral is running in bootloader (DFU) or application mode
# Returns True if the peripheral is in DFU mode
# --------------------------------------------------------------------------
def check_DFU_mode(self):
print("Checking DFU State...")
self.ble_conn.sendline('characteristics')
dfu_mode = False
try:
self.ble_conn.expect([self.UUID_BUTTONLESS], timeout=2)
except pexpect.TIMEOUT as e:
dfu_mode = True
return dfu_mode
def switch_to_dfu_mode(self):
(_, bl_value_handle, bl_cccd_handle) = self._get_handles(self.UUID_BUTTONLESS)
self._enable_notifications(bl_cccd_handle)
# Reset the board in DFU mode. After reset the board will be disconnected
cmd = 'char-write-req 0x%04x 01' % (bl_value_handle)
self.ble_conn.sendline(cmd)
# Wait some time for board to reboot
time.sleep(0.5)
# Increase the mac address by one and reconnect
self.target_mac_increase(1)
return self.scan_and_connect()
# --------------------------------------------------------------------------
# Parse notification status results
# --------------------------------------------------------------------------
def _dfu_parse_notify(self, notify):
if len(notify) < 3:
print("notify data length error")
return None
if verbose: print(notify)
dfu_notify_opcode = Procedures.from_string(notify[0])
if dfu_notify_opcode == Procedures.RESPONSE:
dfu_procedure = Procedures.from_string(notify[1])
dfu_result = Results.from_string(notify[2])
procedure_str = Procedures.to_string(dfu_procedure)
result_str = Results.to_string(dfu_result)
# if verbose: print "opcode: {0}, proc: {1}, res: {2}".format(dfu_notify_opcode, procedure_str, result_str)
if verbose: print("opcode: 0x%02x, proc: %s, res: %s" % (dfu_notify_opcode, procedure_str, result_str))
# Packet Receipt notifications are sent in the exact same format
# as responses to the CALC_CHECKSUM procedure.
if(dfu_procedure == Procedures.CALC_CHECKSUM and dfu_result == Results.SUCCESS):
offset = bytes_to_uint32_le(notify[3:7])
crc32 = bytes_to_uint32_le(notify[7:11])
return (dfu_procedure, dfu_result, offset, crc32)
elif(dfu_procedure == Procedures.SELECT and dfu_result == Results.SUCCESS):
max_size = bytes_to_uint32_le(notify[3:7])
offset = bytes_to_uint32_le(notify[7:11])
crc32 = bytes_to_uint32_le(notify[11:15])
return (dfu_procedure, dfu_result, max_size, offset, crc32)
else:
return (dfu_procedure, dfu_result)
# --------------------------------------------------------------------------
# Wait for a notification and parse the response
# --------------------------------------------------------------------------
def _wait_and_parse_notify(self):
if verbose: print("Waiting for notification")
notify = self._dfu_wait_for_notify()
if notify is None:
raise Exception("No notification received")
if verbose: print("Parsing notification")
result = self._dfu_parse_notify(notify)
if result[1] != Results.SUCCESS:
raise Exception("Error in {} procedure, reason: {}".format(
Procedures.to_string(result[0]),
Results.to_string(result[1])))
return result
# --------------------------------------------------------------------------
# Send the Init info (*.dat file contents) to peripheral device.
# --------------------------------------------------------------------------
def _dfu_send_init(self):
if verbose: print("dfu_send_init")
# Open the DAT file and create array of its contents
init_bin_array = array('B', open(self.datfile_path, 'rb').read())
init_size = len(init_bin_array)
init_crc = 0;
# Select command
self._dfu_send_command(Procedures.SELECT, [Procedures.PARAM_COMMAND]);
(proc, res, max_size, offset, crc32) = self._wait_and_parse_notify()
if offset != init_size or crc32 != init_crc:
if offset == 0 or offset > init_size:
# Create command
self._dfu_send_command(Procedures.CREATE, [Procedures.PARAM_COMMAND] + uint32_to_bytes_le(init_size))
res = self._wait_and_parse_notify()
segment_count = 0
segment_total = int(math.ceil(init_size/float(self.pkt_payload_size)))
for i in range(0, init_size, self.pkt_payload_size):
segment = init_bin_array[i:i + self.pkt_payload_size]
self._dfu_send_data(segment)
segment_count += 1
if (segment_count % self.pkt_receipt_interval) == 0:
(proc, res, offset, crc32) = self._wait_and_parse_notify()
if res != Results.SUCCESS:
raise Exception("bad notification status: {}".format(Results.to_string(res)))
# Calculate CRC
self._dfu_send_command(Procedures.CALC_CHECKSUM)
self._wait_and_parse_notify()
# Execute command
self._dfu_send_command(Procedures.EXECUTE)
self._wait_and_parse_notify()
print("Init packet successfully transferred")
# --------------------------------------------------------------------------
# Send the Firmware image to peripheral device.
# --------------------------------------------------------------------------
def _dfu_send_image(self):
if verbose: print("dfu_send_image")
# Select Data Object
self._dfu_send_command(Procedures.SELECT, [Procedures.PARAM_DATA])
(proc, res, max_size, offset, crc32) = self._wait_and_parse_notify()
# Split the firmware into multiple objects
num_objects = int(math.ceil(self.image_size / float(max_size)))
print("Max object size: %d, num objects: %d, offset: %d, total size: %d" % (max_size, num_objects, offset, self.image_size))
time_start = time.time()
last_send_time = time.time()
obj_offset = (offset/max_size)*max_size
while(obj_offset < self.image_size):
# print "\nSending object {} of {}".format(obj_offset/max_size+1, num_objects)
obj_offset += self._dfu_send_object(obj_offset, max_size)
# Image uploaded successfully, update the progress bar
print_progress(self.image_size, self.image_size, prefix = 'Progress:', suffix = 'Complete', barLength = 50)
duration = time.time() - time_start
print("\nUpload complete in {} minutes and {} seconds".format(int(duration / 60), int(duration % 60)))
# --------------------------------------------------------------------------
# Send a single data object of given size and offset.
# --------------------------------------------------------------------------
def _dfu_send_object(self, offset, obj_max_size):
if offset != self.image_size:
if offset == 0 or offset >= obj_max_size or crc32 != crc32_unsigned(self.bin_array[0:offset]):
# Create Data Object
size = min(obj_max_size, self.image_size - offset)
self._dfu_send_command(Procedures.CREATE, [Procedures.PARAM_DATA] + uint32_to_bytes_le(size))
self._wait_and_parse_notify()
segment_count = 0
segment_total = int(math.ceil(min(obj_max_size, self.image_size-offset)/float(self.pkt_payload_size)))
segment_begin = offset
segment_end = min(offset+obj_max_size, self.image_size)
for i in range(segment_begin, segment_end, self.pkt_payload_size):
num_bytes = min(self.pkt_payload_size, segment_end - i)
segment = self.bin_array[i:i + num_bytes]
self._dfu_send_data(segment)
segment_count += 1
# print "j: {} i: {}, end: {}, bytes: {}, size: {} segment #{} of {}".format(
# offset, i, segment_end, num_bytes, self.image_size, segment_count, segment_total)
if (segment_count % self.pkt_receipt_interval) == 0:
try:
(proc, res, offset, crc32) = self._wait_and_parse_notify()
except e:
# Likely no notification received, need to re-transmit object
return 0
if res != Results.SUCCESS:
raise Exception("bad notification status: {}".format(Results.to_string(res)))
if crc32 != crc32_unsigned(self.bin_array[0:offset]):
# Something went wrong, need to re-transmit this object
return 0
print_progress(offset, self.image_size, prefix = 'Progress:', suffix = 'Complete', barLength = 50)
# Calculate CRC
self._dfu_send_command(Procedures.CALC_CHECKSUM)
(proc, res, offset, crc32) = self._wait_and_parse_notify()
if(crc32 != crc32_unsigned(self.bin_array[0:offset])):
# Need to re-transmit object
return 0
# Execute command
self._dfu_send_command(Procedures.EXECUTE)
self._wait_and_parse_notify()
# If everything executed correctly, return amount of bytes transferred
return obj_max_size