examples: create example for both Python and CLI otatool interfaces

This commit is contained in:
Renz Christian Bagaporo
2019-05-27 11:10:00 +08:00
parent 5d41322412
commit 7f7a9272f0
5 changed files with 275 additions and 160 deletions

View File

@@ -18,20 +18,12 @@
# limitations under the License.
import os
import sys
import subprocess
import argparse
import serial
import re
IDF_PATH = os.path.expandvars("$IDF_PATH")
OTATOOL_PY = os.path.join(IDF_PATH, "components", "app_update", "otatool.py")
ESPTOOL_PY = os.path.join(IDF_PATH, "components", "esptool_py", "esptool", "esptool.py")
INVOKE_ARGS = [sys.executable, OTATOOL_PY, "-q"]
from get_running_partition import get_running_partition
def sized_file_compare(file1, file2):
def assert_file_same(file1, file2, err):
with open(file1, "rb") as f1:
with open(file2, "rb") as f2:
f1 = f1.read()
@@ -42,122 +34,22 @@ def sized_file_compare(file1, file2):
else:
f1 = f1[:len(f2)]
return f1 == f2
if not f1 == f2:
raise Exception(err)
def check(condition, message):
if not condition:
print("Error: " + message)
sys.exit(1)
def flash_example_firmware_to_ota_partitions(args):
# Invokes the command
#
# otatool.py -q write_ota_partition --slot <part_slot> or
# otatool.py -q write_ota_partition --name <part_name>
#
# to write the contents of a file to the specified ota partition (either using name or the slot number)
print("Writing factory firmware to ota_0")
invoke_args = INVOKE_ARGS + ["write_ota_partition", "--slot", "0", "--input", args.binary]
subprocess.check_call(invoke_args)
print("Writing factory firmware to ota_1")
invoke_args = INVOKE_ARGS + ["write_ota_partition", "--name", "ota_1", "--input", args.binary]
subprocess.check_call(invoke_args)
# Verify that the contents of the two ota slots are the same as that of the factory partition
print("Checking written firmware to ota_0 and ota_1 match factory firmware")
# Invokes the command
#
# otatool.py -q read_ota_partition --slot <part_slot> or
# otatool.py -q read_ota_partition --name <part_name>
#
# to read the contents of a specified ota partition (either using name or the slot number) and write to a file
invoke_args = INVOKE_ARGS + ["read_ota_partition", "--slot", "0", "--output", "app_0.bin"]
subprocess.check_call(invoke_args)
invoke_args = INVOKE_ARGS + ["read_ota_partition", "--name", "ota_1", "--output", "app_1.bin"]
subprocess.check_call(invoke_args)
ota_same = sized_file_compare("app_0.bin", args.binary)
check(ota_same, "Slot 0 app does not match factory app")
ota_same = sized_file_compare("app_1.bin", args.binary)
check(ota_same, "Slot 1 app does not match factory app")
def check_running_ota_partition(expected, port=None):
# Monitor the serial output of target device. The firmware outputs the currently
# running partition. It should match the partition the otatool switched to.
if expected == 0 or expected == "ota_0":
expected = b"ota_0"
elif expected == 1 or expected == "ota_1":
expected = b"ota_1"
else:
expected = b"factory"
sys.path.append(os.path.join(IDF_PATH, 'components', 'esptool_py', 'esptool'))
import esptool
baud = os.environ.get("ESPTOOL_BAUD", esptool.ESPLoader.ESP_ROM_BAUD)
if not port:
# Check what esptool.py finds on what port the device is connected to
output = subprocess.check_output([sys.executable, ESPTOOL_PY, "chip_id"])
pattern = r"Serial port ([\S]+)"
pattern = re.compile(pattern.encode())
port = re.search(pattern, output).group(1)
serial_instance = serial.serial_for_url(port.decode("utf-8"), baud, do_not_open=True)
serial_instance.dtr = False
serial_instance.rts = False
serial_instance.rts = True
serial_instance.open()
serial_instance.rts = False
# Read until example end and find the currently running partition string
content = serial_instance.read_until(b"Example end")
pattern = re.compile(b"Running partition: ([a-z0-9_]+)")
running = re.search(pattern, content).group(1)
check(expected == running, "Running partition %s does not match expected %s" % (running, expected))
def switch_partition(part, port):
if isinstance(part, int):
spec = "slot"
else:
spec = "name"
print("Switching to ota partition %s %s" % (spec, str(part)))
if str(part) == "factory":
# Invokes the command
#
# otatool.py -q erase_otadata
#
# to erase the otadata partition, effectively setting boot firmware to
# factory
subprocess.check_call(INVOKE_ARGS + ["erase_otadata"])
else:
# Invokes the command
#
# otatool.py -q switch_otadata --slot <part_slot> or
# otatool.py -q switch_otadata --name <part_name>
#
# to switch to the indicated ota partition (either using name or the slot number)
subprocess.check_call(INVOKE_ARGS + ["switch_otadata", "--" + spec, str(part)])
check_running_ota_partition(part, port)
def assert_running_partition(expected, port=None):
running = get_running_partition(port)
if running != expected:
raise Exception("Running partition %s does not match expected %s" % (running, expected))
def main():
global INVOKE_ARGS
COMPONENTS_PATH = os.path.expandvars(os.path.join("$IDF_PATH", "components"))
OTATOOL_DIR = os.path.join(COMPONENTS_PATH, "app_update")
sys.path.append(OTATOOL_DIR)
from otatool import OtatoolTarget
parser = argparse.ArgumentParser("ESP-IDF OTA Tool Example")
@@ -165,29 +57,61 @@ def main():
parser.add_argument("--binary", "-b", help="path to built example binary", default=os.path.join("build", "otatool.bin"))
args = parser.parse_args()
if args.port:
INVOKE_ARGS += ["--port", args.port]
target = OtatoolTarget(args.port)
# Flash the factory firmware to all ota partitions
flash_example_firmware_to_ota_partitions(args)
print("Writing factory firmware to ota_0")
target.write_ota_partition(0, args.binary)
# Perform switching ota partitions
switch_partition("factory", args.port)
switch_partition("factory", args.port) # check switching to factory partition twice in a row
print("Writing factory firmware to ota_1")
target.write_ota_partition("ota_1", args.binary)
switch_partition(0, args.port)
# Verify that the contents of the two ota slots are the same as that of the factory partition
print("Checking written firmware to ota_0 and ota_1 match factory firmware")
target.read_ota_partition("ota_0", "app0.bin")
target.read_ota_partition(1, "app1.bin")
switch_partition("ota_1", args.port)
switch_partition(1, args.port) # check switching to ota_1 partition twice in a row
assert_file_same("app0.bin", args.binary, "Slot 0 app does not match factory app")
assert_file_same("app1.bin", args.binary, "Slot 1 app does not match factory app")
switch_partition("ota_0", args.port)
switch_partition(0, args.port) # check switching to ota_0 partition twice in a row
# Switch to factory app
print("Switching to factory app")
target.erase_otadata()
assert_running_partition("factory")
switch_partition("factory", args.port)
# Switch to slot 0
print("Switching to OTA slot 0")
target.switch_ota_partition(0)
assert_running_partition("ota_0")
switch_partition(1, args.port) # check switching to ota_1 partition from factory
# Switch to slot 1 twice in a row
print("Switching to OTA slot 1 (twice in a row)")
target.switch_ota_partition(1)
assert_running_partition("ota_1")
target.switch_ota_partition("ota_1")
assert_running_partition("ota_1")
# Switch to slot 0 twice in a row
print("Switching to OTA slot 0 (twice in a row)")
target.switch_ota_partition(0)
assert_running_partition("ota_0")
target.switch_ota_partition("ota_0")
assert_running_partition("ota_0")
# Switch to factory app
print("Switching to factory app")
target.erase_otadata()
assert_running_partition("factory")
# Switch to slot 1
print("Switching to OTA slot 1")
target.switch_ota_partition(1)
assert_running_partition("ota_1")
# Example end and cleanup
print("\nOTA tool operations executed successfully!")
clean_files = ["app0.bin", "app1.bin"]
for clean_file in clean_files:
os.unlink(clean_file)
if __name__ == '__main__':