mirror of
https://github.com/espressif/esp-idf.git
synced 2025-11-13 00:38:54 +00:00
416 lines
14 KiB
Python
Executable File
416 lines
14 KiB
Python
Executable File
#!/usr/bin/env python
|
|
#
|
|
# otatool is used to perform ota-level operations - flashing ota partition
|
|
# erasing ota partition and switching ota partition
|
|
#
|
|
# SPDX-FileCopyrightText: 2018-2025 Espressif Systems (Shanghai) CO LTD
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
import argparse
|
|
import binascii
|
|
import collections
|
|
import os
|
|
import struct
|
|
import sys
|
|
import tempfile
|
|
|
|
try:
|
|
from parttool import PARTITION_TABLE_OFFSET
|
|
from parttool import PartitionName
|
|
from parttool import PartitionType
|
|
from parttool import ParttoolTarget
|
|
except ImportError:
|
|
COMPONENTS_PATH = os.path.expandvars(os.path.join('$IDF_PATH', 'components'))
|
|
PARTTOOL_DIR = os.path.join(COMPONENTS_PATH, 'partition_table')
|
|
sys.path.append(PARTTOOL_DIR)
|
|
from parttool import PARTITION_TABLE_OFFSET
|
|
from parttool import PartitionName
|
|
from parttool import PartitionType
|
|
from parttool import ParttoolTarget
|
|
|
|
__version__ = '2.0'
|
|
|
|
SPI_FLASH_SEC_SIZE = 0x2000
|
|
|
|
quiet = False
|
|
|
|
|
|
def status(msg):
|
|
if not quiet:
|
|
print(msg)
|
|
|
|
|
|
class OtatoolTarget:
|
|
OTADATA_PARTITION = PartitionType('data', 'ota')
|
|
|
|
def __init__(
|
|
self,
|
|
port=None,
|
|
baud=None,
|
|
partition_table_offset=PARTITION_TABLE_OFFSET,
|
|
partition_table_file=None,
|
|
spi_flash_sec_size=SPI_FLASH_SEC_SIZE,
|
|
esptool_args=[],
|
|
esptool_write_args=[],
|
|
esptool_read_args=[],
|
|
esptool_erase_args=[],
|
|
):
|
|
self.target = ParttoolTarget(
|
|
port,
|
|
baud,
|
|
partition_table_offset,
|
|
partition_table_file,
|
|
esptool_args,
|
|
esptool_write_args,
|
|
esptool_read_args,
|
|
esptool_erase_args,
|
|
)
|
|
self.spi_flash_sec_size = spi_flash_sec_size
|
|
|
|
temp_file = tempfile.NamedTemporaryFile(delete=False)
|
|
temp_file.close()
|
|
try:
|
|
self.target.read_partition(OtatoolTarget.OTADATA_PARTITION, temp_file.name)
|
|
with open(temp_file.name, 'rb') as f:
|
|
self.otadata = f.read()
|
|
finally:
|
|
os.unlink(temp_file.name)
|
|
|
|
def _check_otadata_partition(self):
|
|
if not self.otadata:
|
|
raise Exception('No otadata partition found')
|
|
|
|
def erase_otadata(self):
|
|
self._check_otadata_partition()
|
|
self.target.erase_partition(OtatoolTarget.OTADATA_PARTITION)
|
|
|
|
def _get_otadata_info(self):
|
|
info = []
|
|
|
|
otadata_info = collections.namedtuple('otadata_info', 'seq crc')
|
|
|
|
for i in range(2):
|
|
start = i * (self.spi_flash_sec_size >> 1)
|
|
|
|
seq = bytearray(self.otadata[start : start + 4])
|
|
crc = bytearray(self.otadata[start + 28 : start + 32])
|
|
|
|
seq = struct.unpack('I', seq)
|
|
crc = struct.unpack('I', crc)
|
|
info.append(otadata_info(seq[0], crc[0]))
|
|
|
|
return info
|
|
|
|
def _get_partition_id_from_ota_id(self, ota_id):
|
|
if isinstance(ota_id, int):
|
|
return PartitionType('app', 'ota_' + str(ota_id))
|
|
else:
|
|
return PartitionName(ota_id)
|
|
|
|
def switch_ota_partition(self, ota_id):
|
|
self._check_otadata_partition()
|
|
|
|
import gen_esp32part as gen
|
|
|
|
def is_otadata_info_valid(status):
|
|
seq = status.seq % (1 << 32)
|
|
crc = binascii.crc32(struct.pack('I', seq), 0xFFFFFFFF) % (1 << 32)
|
|
return seq < (int('0xFFFFFFFF', 16) % (1 << 32)) and status.crc == crc
|
|
|
|
partition_table = self.target.partition_table
|
|
|
|
ota_partitions = list()
|
|
|
|
for i in range(gen.NUM_PARTITION_SUBTYPE_APP_OTA):
|
|
ota_partition = filter(lambda p: p.subtype == (gen.MIN_PARTITION_SUBTYPE_APP_OTA + i), partition_table)
|
|
|
|
try:
|
|
ota_partitions.append(list(ota_partition)[0])
|
|
except IndexError:
|
|
break
|
|
|
|
ota_partitions = sorted(ota_partitions, key=lambda p: p.subtype)
|
|
|
|
if not ota_partitions:
|
|
raise Exception('No ota app partitions found')
|
|
|
|
# Look for the app partition to switch to
|
|
ota_partition_next = None
|
|
|
|
try:
|
|
if isinstance(ota_id, int):
|
|
ota_partition_next = filter(
|
|
lambda p: p.subtype - gen.MIN_PARTITION_SUBTYPE_APP_OTA == ota_id, ota_partitions
|
|
)
|
|
else:
|
|
ota_partition_next = filter(lambda p: p.name == ota_id, ota_partitions)
|
|
|
|
ota_partition_next = list(ota_partition_next)[0]
|
|
except IndexError:
|
|
raise Exception('Partition to switch to not found')
|
|
|
|
otadata_info = self._get_otadata_info()
|
|
|
|
# Find the copy to base the computation for ota sequence number on
|
|
otadata_compute_base = -1
|
|
|
|
# Both are valid, take the max as computation base
|
|
if is_otadata_info_valid(otadata_info[0]) and is_otadata_info_valid(otadata_info[1]):
|
|
if otadata_info[0].seq >= otadata_info[1].seq:
|
|
otadata_compute_base = 0
|
|
else:
|
|
otadata_compute_base = 1
|
|
# Only one copy is valid, use that
|
|
elif is_otadata_info_valid(otadata_info[0]):
|
|
otadata_compute_base = 0
|
|
elif is_otadata_info_valid(otadata_info[1]):
|
|
otadata_compute_base = 1
|
|
# Both are invalid (could be initial state - all 0xFF's)
|
|
else:
|
|
pass
|
|
|
|
ota_seq_next = 0
|
|
ota_partitions_num = len(ota_partitions)
|
|
|
|
target_seq = (ota_partition_next.subtype & 0x0F) + 1
|
|
|
|
# Find the next ota sequence number
|
|
if otadata_compute_base == 0 or otadata_compute_base == 1:
|
|
base_seq = otadata_info[otadata_compute_base].seq % (1 << 32)
|
|
|
|
i = 0
|
|
while base_seq > target_seq % ota_partitions_num + i * ota_partitions_num:
|
|
i += 1
|
|
|
|
ota_seq_next = target_seq % ota_partitions_num + i * ota_partitions_num
|
|
else:
|
|
ota_seq_next = target_seq
|
|
|
|
# Create binary data from computed values
|
|
ota_seq_next = struct.pack('I', ota_seq_next)
|
|
ota_seq_crc_next = binascii.crc32(ota_seq_next, 0xFFFFFFFF) % (1 << 32)
|
|
ota_seq_crc_next = struct.pack('I', ota_seq_crc_next)
|
|
|
|
temp_file = tempfile.NamedTemporaryFile(delete=False)
|
|
temp_file.close()
|
|
|
|
try:
|
|
with open(temp_file.name, 'wb') as otadata_next_file:
|
|
start = (1 if otadata_compute_base == 0 else 0) * (self.spi_flash_sec_size >> 1)
|
|
|
|
otadata_next_file.write(self.otadata)
|
|
|
|
otadata_next_file.seek(start)
|
|
otadata_next_file.write(ota_seq_next)
|
|
|
|
otadata_next_file.seek(start + 28)
|
|
otadata_next_file.write(ota_seq_crc_next)
|
|
|
|
otadata_next_file.flush()
|
|
|
|
self.target.write_partition(OtatoolTarget.OTADATA_PARTITION, temp_file.name)
|
|
finally:
|
|
os.unlink(temp_file.name)
|
|
|
|
def read_ota_partition(self, ota_id, output):
|
|
self.target.read_partition(self._get_partition_id_from_ota_id(ota_id), output)
|
|
|
|
def write_ota_partition(self, ota_id, input_file):
|
|
self.target.write_partition(self._get_partition_id_from_ota_id(ota_id), input_file)
|
|
|
|
def erase_ota_partition(self, ota_id):
|
|
self.target.erase_partition(self._get_partition_id_from_ota_id(ota_id))
|
|
|
|
|
|
def _read_otadata(target):
|
|
target._check_otadata_partition()
|
|
|
|
otadata_info = target._get_otadata_info()
|
|
|
|
print(' {:8s} \t {:8s} | \t {:8s} \t {:8s}'.format('OTA_SEQ', 'CRC', 'OTA_SEQ', 'CRC'))
|
|
print(
|
|
f'Firmware: {otadata_info[0].seq:#08x} \t{otadata_info[0].crc:#08x} | '
|
|
f'\t{otadata_info[1].seq:#08x} \t {otadata_info[1].crc:#08x}'
|
|
)
|
|
|
|
|
|
def _erase_otadata(target):
|
|
target.erase_otadata()
|
|
status('Erased ota_data partition contents')
|
|
|
|
|
|
def _switch_ota_partition(target, ota_id):
|
|
target.switch_ota_partition(ota_id)
|
|
|
|
|
|
def _read_ota_partition(target, ota_id, output):
|
|
target.read_ota_partition(ota_id, output)
|
|
status(f'Read ota partition contents to file {output}')
|
|
|
|
|
|
def _write_ota_partition(target, ota_id, input_file):
|
|
target.write_ota_partition(ota_id, input_file)
|
|
status(f'Written contents of file {input_file} to ota partition')
|
|
|
|
|
|
def _erase_ota_partition(target, ota_id):
|
|
target.erase_ota_partition(ota_id)
|
|
status('Erased contents of ota partition')
|
|
|
|
|
|
def main():
|
|
global quiet
|
|
|
|
parser = argparse.ArgumentParser('ESP-IDF OTA Partitions Tool')
|
|
|
|
parser.add_argument('--quiet', '-q', help='suppress stderr messages', action='store_true')
|
|
parser.add_argument('--esptool-args', help='additional main arguments for esptool', nargs='+')
|
|
parser.add_argument(
|
|
'--esptool-write-args', help='additional subcommand arguments for esptool write-flash', nargs='+'
|
|
)
|
|
parser.add_argument('--esptool-read-args', help='additional subcommand arguments for esptool read-flash', nargs='+')
|
|
parser.add_argument(
|
|
'--esptool-erase-args', help='additional subcommand arguments for esptool erase-region', nargs='+'
|
|
)
|
|
|
|
# There are two possible sources for the partition table: a device attached to the host
|
|
# or a partition table CSV/binary file. These sources are mutually exclusive.
|
|
parser.add_argument('--port', '-p', help='port where the device to read the partition table from is attached')
|
|
|
|
parser.add_argument('--baud', '-b', help='baudrate to use', type=int)
|
|
|
|
parser.add_argument('--partition-table-offset', '-o', help='offset to read the partition table from', type=str)
|
|
|
|
parser.add_argument(
|
|
'--partition-table-file',
|
|
'-f',
|
|
help='file (CSV/binary) to read the partition table from; '
|
|
'overrides device attached to specified port as the partition table source when defined',
|
|
)
|
|
|
|
subparsers = parser.add_subparsers(dest='operation', help='run otatool -h for additional help')
|
|
|
|
spi_flash_sec_size = argparse.ArgumentParser(add_help=False)
|
|
spi_flash_sec_size.add_argument('--spi-flash-sec-size', help='value of SPI_FLASH_SEC_SIZE macro', type=str)
|
|
|
|
# Specify the supported operations
|
|
subparsers.add_parser('read_otadata', help='read otadata partition', parents=[spi_flash_sec_size])
|
|
subparsers.add_parser('erase_otadata', help='erase otadata partition')
|
|
|
|
slot_or_name_parser = argparse.ArgumentParser(add_help=False)
|
|
slot_or_name_parser_args = slot_or_name_parser.add_mutually_exclusive_group()
|
|
slot_or_name_parser_args.add_argument('--slot', help='slot number of the ota partition', type=int)
|
|
slot_or_name_parser_args.add_argument('--name', help='name of the ota partition')
|
|
|
|
subparsers.add_parser(
|
|
'switch_ota_partition', help='switch otadata partition', parents=[slot_or_name_parser, spi_flash_sec_size]
|
|
)
|
|
|
|
read_ota_partition_subparser = subparsers.add_parser(
|
|
'read_ota_partition', help='read contents of an ota partition', parents=[slot_or_name_parser]
|
|
)
|
|
read_ota_partition_subparser.add_argument(
|
|
'--output', help='file to write the contents of the ota partition to', required=True
|
|
)
|
|
|
|
write_ota_partition_subparser = subparsers.add_parser(
|
|
'write_ota_partition', help='write contents to an ota partition', parents=[slot_or_name_parser]
|
|
)
|
|
write_ota_partition_subparser.add_argument('--input', help='file whose contents to write to the ota partition')
|
|
|
|
subparsers.add_parser(
|
|
'erase_ota_partition', help='erase contents of an ota partition', parents=[slot_or_name_parser]
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
quiet = args.quiet
|
|
|
|
# No operation specified, display help and exit
|
|
if args.operation is None:
|
|
if not quiet:
|
|
parser.print_help()
|
|
sys.exit(1)
|
|
|
|
target_args = {}
|
|
|
|
if args.port:
|
|
target_args['port'] = args.port
|
|
|
|
if args.partition_table_file:
|
|
target_args['partition_table_file'] = args.partition_table_file
|
|
|
|
if args.partition_table_offset:
|
|
target_args['partition_table_offset'] = int(args.partition_table_offset, 0)
|
|
|
|
try:
|
|
if args.spi_flash_sec_size:
|
|
target_args['spi_flash_sec_size'] = int(args.spi_flash_sec_size, 0)
|
|
except AttributeError:
|
|
pass
|
|
|
|
if args.esptool_args:
|
|
target_args['esptool_args'] = args.esptool_args
|
|
|
|
if args.esptool_write_args:
|
|
target_args['esptool_write_args'] = args.esptool_write_args
|
|
|
|
if args.esptool_read_args:
|
|
target_args['esptool_read_args'] = args.esptool_read_args
|
|
|
|
if args.esptool_erase_args:
|
|
target_args['esptool_erase_args'] = args.esptool_erase_args
|
|
|
|
if args.baud:
|
|
target_args['baud'] = args.baud
|
|
|
|
target = OtatoolTarget(**target_args)
|
|
|
|
# Create the operation table and execute the operation
|
|
common_args = {'target': target}
|
|
|
|
ota_id = []
|
|
|
|
try:
|
|
if args.name is not None:
|
|
ota_id = ['name']
|
|
else:
|
|
if args.slot is not None:
|
|
ota_id = ['slot']
|
|
except AttributeError:
|
|
pass
|
|
|
|
otatool_ops = {
|
|
'read_otadata': (_read_otadata, []),
|
|
'erase_otadata': (_erase_otadata, []),
|
|
'switch_ota_partition': (_switch_ota_partition, ota_id),
|
|
'read_ota_partition': (_read_ota_partition, ['output'] + ota_id),
|
|
'write_ota_partition': (_write_ota_partition, ['input'] + ota_id),
|
|
'erase_ota_partition': (_erase_ota_partition, ota_id),
|
|
}
|
|
|
|
(op, op_args) = otatool_ops[args.operation]
|
|
|
|
for op_arg in op_args:
|
|
common_args.update({op_arg: vars(args)[op_arg]})
|
|
|
|
try:
|
|
common_args['ota_id'] = common_args.pop('name')
|
|
except KeyError:
|
|
try:
|
|
common_args['ota_id'] = common_args.pop('slot')
|
|
except KeyError:
|
|
pass
|
|
|
|
if quiet:
|
|
# If exceptions occur, suppress and exit quietly
|
|
try:
|
|
op(**common_args)
|
|
except Exception:
|
|
sys.exit(2)
|
|
else:
|
|
op(**common_args)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|