feature: Pre Encrypted Binary for OTA updates

This feature enables using encrypted binaries for OTA updates

Closes https://github.com/espressif/esp-idf/issues/6172
This commit is contained in:
Harshit Malpani
2021-12-17 15:27:14 +05:30
parent 0d03c17ab2
commit 5a6dbaa487
13 changed files with 618 additions and 46 deletions

View File

@@ -138,8 +138,8 @@ def test_examples_protocol_advanced_https_ota_example(env, extra_data):
ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
thread1.terminate()
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
dut1.expect('Starting Advanced OTA example', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
@@ -172,11 +172,10 @@ def test_examples_protocol_advanced_https_ota_example_truncated_bin(env, extra_d
truncated_bin_size = 64000
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, bin_name)
f = open(binary_file, 'rb+')
fo = open(os.path.join(dut1.app.binary_path, truncated_bin_name), 'wb+')
fo.write(f.read(truncated_bin_size))
fo.close()
f.close()
with open(binary_file, 'rb+') as f:
with open(os.path.join(dut1.app.binary_path, truncated_bin_name), 'wb+') as fo:
fo.write(f.read(truncated_bin_size))
binary_file = os.path.join(dut1.app.binary_path, truncated_bin_name)
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
@@ -192,14 +191,17 @@ def test_examples_protocol_advanced_https_ota_example_truncated_bin(env, extra_d
ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
thread1.terminate()
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
dut1.expect('Starting Advanced OTA example', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name))
dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name)
dut1.expect('Image validation failed, image is corrupted', timeout=30)
os.remove(binary_file)
try:
os.remove(binary_file)
except OSError:
pass
thread1.terminate()
@@ -224,11 +226,10 @@ def test_examples_protocol_advanced_https_ota_example_truncated_header(env, extr
truncated_bin_size = 180
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, bin_name)
f = open(binary_file, 'rb+')
fo = open(os.path.join(dut1.app.binary_path, truncated_bin_name), 'wb+')
fo.write(f.read(truncated_bin_size))
fo.close()
f.close()
with open(binary_file, 'rb+') as f:
with open(os.path.join(dut1.app.binary_path, truncated_bin_name), 'wb+') as fo:
fo.write(f.read(truncated_bin_size))
binary_file = os.path.join(dut1.app.binary_path, truncated_bin_name)
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
@@ -244,14 +245,17 @@ def test_examples_protocol_advanced_https_ota_example_truncated_header(env, extr
ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
thread1.terminate()
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
dut1.expect('Starting Advanced OTA example', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name))
dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name)
dut1.expect('advanced_https_ota_example: esp_https_ota_read_img_desc failed', timeout=30)
os.remove(binary_file)
try:
os.remove(binary_file)
except OSError:
pass
thread1.terminate()
@@ -274,13 +278,13 @@ def test_examples_protocol_advanced_https_ota_example_random(env, extra_data):
random_bin_size = 32000
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, random_bin_name)
fo = open(binary_file, 'wb+')
# First byte of binary file is always set to zero. If first byte is generated randomly,
# in some cases it may generate 0xE9 which will result in failure of testcase.
fo.write(struct.pack('B', 0))
for i in range(random_bin_size - 1):
fo.write(struct.pack('B', random.randrange(0,255,1)))
fo.close()
with open(binary_file, 'wb+') as fo:
# First byte of binary file is always set to zero. If first byte is generated randomly,
# in some cases it may generate 0xE9 which will result in failure of testcase.
fo.write(struct.pack('B', 0))
for i in range(random_bin_size - 1):
fo.write(struct.pack('B', random.randrange(0,255,1)))
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
# start test
@@ -295,14 +299,71 @@ def test_examples_protocol_advanced_https_ota_example_random(env, extra_data):
ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
thread1.terminate()
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
dut1.expect('Starting Advanced OTA example', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name))
dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name)
dut1.expect(re.compile(r'esp_https_ota: Incorrect app descriptor magic'), timeout=10)
try:
os.remove(binary_file)
except OSError:
pass
thread1.terminate()
@ttfw_idf.idf_example_test(env_tag='EXAMPLE_ETH_OTA')
def test_examples_protocol_advanced_https_ota_example_invalid_chip_id(env, extra_data):
"""
Working of OTA if binary file have invalid chip id is validated in this test case.
Chip id verification should fail in this case.
steps: |
1. join AP
2. Generate binary image with invalid chip id
3. Fetch OTA image over HTTPS
4. Check working of code for random binary file
"""
dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT)
server_port = 8001
bin_name = 'advanced_https_ota.bin'
# Random binary file to be generated
random_bin_name = 'random.bin'
random_binary_file = os.path.join(dut1.app.binary_path, random_bin_name)
# Size of random binary file. 2000 is choosen, to reduce the time required to run the test-case
random_bin_size = 2000
binary_file = os.path.join(dut1.app.binary_path, bin_name)
with open(binary_file, 'rb+') as f:
data = list(f.read(random_bin_size))
# Changing Chip id
data[13] = 0xfe
with open(random_binary_file, 'wb+') as fo:
fo.write(bytearray(data))
# start test
host_ip = get_my_ip()
if (get_server_status(host_ip, server_port) is False):
thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
thread1.daemon = True
thread1.start()
dut1.start_app()
dut1.expect('Loaded app from partition at offset', timeout=30)
try:
ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
thread1.terminate()
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
dut1.expect('Starting Advanced OTA example', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name))
dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name)
dut1.expect(re.compile(r'esp_https_ota: Mismatch chip id, expected 0, found \d'), timeout=10)
os.remove(binary_file)
try:
os.remove(random_binary_file)
except OSError:
pass
thread1.terminate()
@@ -378,9 +439,9 @@ def test_examples_protocol_advanced_https_ota_example_redirect_url(env, extra_da
ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
thread1.terminate()
thread2.terminate()
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
dut1.expect('Starting Advanced OTA example', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':' + str(redirection_server_port) + '/' + bin_name))
@@ -415,14 +476,12 @@ def test_examples_protocol_advanced_https_ota_example_anti_rollback(env, extra_d
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, bin_name)
file_size = os.path.getsize(binary_file)
f = open(binary_file, 'rb+')
fo = open(os.path.join(dut1.app.binary_path, anti_rollback_bin_name), 'wb+')
fo.write(f.read(file_size))
# Change security_version to 0 for negative test case
fo.seek(36)
fo.write(b'\x00')
fo.close()
f.close()
with open(binary_file, 'rb+') as f:
with open(os.path.join(dut1.app.binary_path, anti_rollback_bin_name), 'wb+') as fo:
fo.write(f.read(file_size))
# Change security_version to 0 for negative test case
fo.seek(36)
fo.write(b'\x00')
binary_file = os.path.join(dut1.app.binary_path, anti_rollback_bin_name)
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
@@ -439,8 +498,8 @@ def test_examples_protocol_advanced_https_ota_example_anti_rollback(env, extra_d
ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
thread1.terminate()
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
dut1.expect('Starting Advanced OTA example', timeout=30)
# Use originally generated image with secure_version=1
@@ -456,7 +515,10 @@ def test_examples_protocol_advanced_https_ota_example_anti_rollback(env, extra_d
print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + anti_rollback_bin_name))
dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + anti_rollback_bin_name)
dut1.expect('New firmware security version is less than eFuse programmed, 0 < 1', timeout=30)
os.remove(binary_file)
try:
os.remove(binary_file)
except OSError:
pass
thread1.terminate()
@@ -493,8 +555,8 @@ def test_examples_protocol_advanced_https_ota_example_partial_request(env, extra
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
Utility.console_log('ENV_TEST_FAILURE: Cannot connect to AP')
raise
thread1.terminate()
raise
dut1.expect('Starting Advanced OTA example', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
@@ -537,8 +599,8 @@ def test_examples_protocol_advanced_https_ota_example_nimble_gatts(env, extra_da
ip_address = dut1.expect(re.compile(r' sta ip: ([^,]+),'), timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
thread1.terminate()
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
dut1.expect('Starting Advanced OTA example', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
@@ -581,8 +643,8 @@ def test_examples_protocol_advanced_https_ota_example_bluedroid_gatts(env, extra
ip_address = dut1.expect(re.compile(r' sta ip: ([^,]+),'), timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
thread1.terminate()
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
dut1.expect('Starting Advanced OTA example', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
@@ -616,13 +678,11 @@ def test_examples_protocol_advanced_https_ota_example_openssl_aligned_bin(env, e
bin_size = os.path.getsize(binary_file)
# Dummy data required to align binary size to 289 bytes boundary
dummy_data_size = 289 - (bin_size % 289)
f = open(binary_file, 'rb+')
fo = open(os.path.join(dut1.app.binary_path, aligned_bin_name), 'wb+')
fo.write(f.read(bin_size))
for _ in range(dummy_data_size):
fo.write(struct.pack('B', random.randrange(0,255,1)))
fo.close()
f.close()
with open(binary_file, 'rb+') as f:
with open(os.path.join(dut1.app.binary_path, aligned_bin_name), 'wb+') as fo:
fo.write(f.read(bin_size))
for _ in range(dummy_data_size):
fo.write(struct.pack('B', random.randrange(0,255,1)))
# start test
host_ip = get_my_ip()
chunked_server = start_chunked_server(dut1.app.binary_path, 8070)
@@ -640,7 +700,10 @@ def test_examples_protocol_advanced_https_ota_example_openssl_aligned_bin(env, e
dut1.expect('Loaded app from partition at offset', timeout=60)
dut1.expect('Starting Advanced OTA example', timeout=30)
chunked_server.kill()
os.remove(aligned_bin_name)
try:
os.remove(aligned_bin_name)
except OSError:
pass
if __name__ == '__main__':
@@ -650,6 +713,7 @@ if __name__ == '__main__':
test_examples_protocol_advanced_https_ota_example_truncated_bin()
test_examples_protocol_advanced_https_ota_example_truncated_header()
test_examples_protocol_advanced_https_ota_example_random()
test_examples_protocol_advanced_https_ota_example_invalid_chip_id()
test_examples_protocol_advanced_https_ota_example_anti_rollback()
test_examples_protocol_advanced_https_ota_example_partial_request()
test_examples_protocol_advanced_https_ota_example_nimble_gatts()