mirror of
https://github.com/espressif/esp-idf.git
synced 2026-01-01 00:34:18 +00:00
Merge branch 'ci/improve-tests' into 'master'
tests: change wifi tests expect timeout to 60 See merge request espressif/esp-idf!42888
This commit is contained in:
@@ -221,7 +221,7 @@ def test_tee_cli_secure_ota_wifi(dut: Dut) -> None:
|
||||
|
||||
# Fetch the DUT IP address
|
||||
try:
|
||||
ip_address = dut.expect(r'got ip:(\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ip_address = dut.expect(r'got ip:(\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP/Ethernet with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
|
||||
|
||||
@@ -19,12 +19,12 @@ def get_sdk_path() -> str:
|
||||
return idf_path
|
||||
|
||||
|
||||
class CustomProcess(object):
|
||||
class CustomProcess:
|
||||
def __init__(self, cmd: str, logfile: str, verbose: bool = True) -> None:
|
||||
self.verbose = verbose
|
||||
self.f = open(logfile, 'w', encoding='utf-8')
|
||||
if self.verbose:
|
||||
logging.info('Starting {} > {}'.format(cmd, self.f.name))
|
||||
logging.info(f'Starting {cmd} > {self.f.name}')
|
||||
self.pexpect_proc = pexpect.spawn(cmd, timeout=60, logfile=self.f, encoding='utf-8', codec_errors='ignore')
|
||||
|
||||
def __enter__(self): # type: ignore
|
||||
@@ -33,7 +33,7 @@ class CustomProcess(object):
|
||||
def close(self) -> None:
|
||||
self.pexpect_proc.terminate(force=True)
|
||||
|
||||
def __exit__(self, type, value, traceback): # type: ignore
|
||||
def __exit__(self, type, value, traceback): # type: ignore # noqa
|
||||
self.close()
|
||||
self.f.close()
|
||||
|
||||
@@ -58,7 +58,7 @@ def test_examples_esp_local_ctrl(config: str, dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
dut_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]')[1].decode()
|
||||
dut_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
if config == 'default':
|
||||
dut.expect('esp_https_server: Starting server')
|
||||
dut.expect('esp_https_server: Server listening on port 443')
|
||||
@@ -73,28 +73,32 @@ def test_examples_esp_local_ctrl(config: str, dut: Dut) -> None:
|
||||
# Running mDNS services in docker is not a trivial task. Therefore, the script won't connect to the host name but
|
||||
# to IP address. However, the certificates were generated for the host name and will be rejected.
|
||||
if config == 'default':
|
||||
cmd = ' '.join([
|
||||
sys.executable,
|
||||
os.path.join(idf_path, rel_project_path, 'scripts/esp_local_ctrl.py'),
|
||||
'--sec_ver 2',
|
||||
'--sec2_username wifiprov',
|
||||
'--sec2_pwd abcd1234',
|
||||
'--name',
|
||||
dut_ip,
|
||||
'--dont-check-hostname',
|
||||
]) # don't reject the certificate because of the hostname
|
||||
cmd = ' '.join(
|
||||
[
|
||||
sys.executable,
|
||||
os.path.join(idf_path, rel_project_path, 'scripts/esp_local_ctrl.py'),
|
||||
'--sec_ver 2',
|
||||
'--sec2_username wifiprov',
|
||||
'--sec2_pwd abcd1234',
|
||||
'--name',
|
||||
dut_ip,
|
||||
'--dont-check-hostname',
|
||||
]
|
||||
) # don't reject the certificate because of the hostname
|
||||
elif config == 'http':
|
||||
cmd = ' '.join([
|
||||
sys.executable,
|
||||
os.path.join(idf_path, rel_project_path, 'scripts/esp_local_ctrl.py'),
|
||||
'--sec_ver 2',
|
||||
'--transport http',
|
||||
'--sec2_username wifiprov',
|
||||
'--sec2_pwd abcd1234',
|
||||
'--name',
|
||||
dut_ip,
|
||||
'--dont-check-hostname',
|
||||
])
|
||||
cmd = ' '.join(
|
||||
[
|
||||
sys.executable,
|
||||
os.path.join(idf_path, rel_project_path, 'scripts/esp_local_ctrl.py'),
|
||||
'--sec_ver 2',
|
||||
'--transport http',
|
||||
'--sec2_username wifiprov',
|
||||
'--sec2_pwd abcd1234',
|
||||
'--name',
|
||||
dut_ip,
|
||||
'--dont-check-hostname',
|
||||
]
|
||||
)
|
||||
esp_local_ctrl_log = os.path.join(idf_path, rel_project_path, 'esp_local_ctrl.log')
|
||||
with CustomProcess(cmd, esp_local_ctrl_log) as ctrl_py:
|
||||
|
||||
@@ -103,15 +107,15 @@ def test_examples_esp_local_ctrl(config: str, dut: Dut) -> None:
|
||||
ctrl_py.pexpect_proc.expect_exact('==== Available Properties ====')
|
||||
ctrl_py.pexpect_proc.expect(re.compile(r'S.N. Name\s+Type\s+Flags\s+Value'))
|
||||
ctrl_py.pexpect_proc.expect(re.compile(r'\[ 1\] timestamp \(us\)\s+TIME\(us\)\s+Read-Only\s+\d+'))
|
||||
ctrl_py.pexpect_proc.expect(re.compile(r'\[ 2\] property1\s+INT32\s+{}'.format(prop1)))
|
||||
ctrl_py.pexpect_proc.expect(re.compile(rf'\[ 2\] property1\s+INT32\s+{prop1}'))
|
||||
ctrl_py.pexpect_proc.expect(re.compile(r'\[ 3\] property2\s+BOOLEAN\s+Read-Only\s+(True)|(False)'))
|
||||
ctrl_py.pexpect_proc.expect(re.compile(r'\[ 4\] property3\s+STRING\s+{}'.format(prop3)))
|
||||
ctrl_py.pexpect_proc.expect(re.compile(rf'\[ 4\] property3\s+STRING\s+{prop3}'))
|
||||
ctrl_py.pexpect_proc.expect_exact("Select properties to set (0 to re-read, 'q' to quit) :")
|
||||
|
||||
property1 = 123456789
|
||||
property3 = ''
|
||||
|
||||
ctrl_py.pexpect_proc.expect_exact('Connecting to {}'.format(dut_ip))
|
||||
ctrl_py.pexpect_proc.expect_exact(f'Connecting to {dut_ip}')
|
||||
if config == 'default':
|
||||
dut.expect('esp_https_server: performing session handshake', timeout=60)
|
||||
expect_properties(property1, property3)
|
||||
@@ -127,14 +131,14 @@ def test_examples_esp_local_ctrl(config: str, dut: Dut) -> None:
|
||||
ctrl_py.pexpect_proc.sendline('2')
|
||||
ctrl_py.pexpect_proc.expect_exact('Enter value to set for property (property1) :')
|
||||
ctrl_py.pexpect_proc.sendline(str(property1))
|
||||
dut.expect_exact('control: Setting property1 value to {}'.format(property1))
|
||||
dut.expect_exact(f'control: Setting property1 value to {property1}')
|
||||
expect_properties(property1, property3)
|
||||
|
||||
property3 = 'test'
|
||||
ctrl_py.pexpect_proc.sendline('4')
|
||||
ctrl_py.pexpect_proc.expect_exact('Enter value to set for property (property3) :')
|
||||
ctrl_py.pexpect_proc.sendline(property3)
|
||||
dut.expect_exact('control: Setting property3 value to {}'.format(property3))
|
||||
dut.expect_exact(f'control: Setting property3 value to {property3}')
|
||||
expect_properties(property1, property3)
|
||||
|
||||
ctrl_py.pexpect_proc.sendline('q')
|
||||
|
||||
@@ -36,7 +36,7 @@ def test_examples_protocol_http_server_advanced(dut: Dut) -> None:
|
||||
# Get binary file
|
||||
binary_file = os.path.join(dut.app.binary_path, 'tests.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
logging.info('http_server_bin_size : {}KB'.format(bin_size // 1024))
|
||||
logging.info(f'http_server_bin_size : {bin_size // 1024}KB')
|
||||
|
||||
logging.info('Starting http_server advanced test app')
|
||||
|
||||
@@ -48,7 +48,7 @@ def test_examples_protocol_http_server_advanced(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
|
||||
got_port = dut.expect(r"(?:[\s\S]*)Started HTTP server on port: '(\d+)'", timeout=30)[1].decode()
|
||||
|
||||
@@ -64,8 +64,8 @@ def test_examples_protocol_http_server_advanced(dut: Dut) -> None:
|
||||
max_uri_len = int(result[4])
|
||||
max_stack_size = int(result[5])
|
||||
|
||||
logging.info('Got Port : {}'.format(got_port))
|
||||
logging.info('Got IP : {}'.format(got_ip))
|
||||
logging.info(f'Got Port : {got_port}')
|
||||
logging.info(f'Got IP : {got_ip}')
|
||||
|
||||
# Run test script
|
||||
# If failed raise appropriate exception
|
||||
|
||||
@@ -34,7 +34,7 @@ def test_examples_protocol_http_server_file_serving(dut: Dut) -> None:
|
||||
# Get binary file
|
||||
binary_file = os.path.join(dut.app.binary_path, 'file_server.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
logging.info('file_server_bin_size : {}KB'.format(bin_size // 1024))
|
||||
logging.info(f'file_server_bin_size : {bin_size // 1024}KB')
|
||||
logging.info('Erasing the storage partition on the chip')
|
||||
dut.serial.erase_partition('storage')
|
||||
# Upload binary and start testing
|
||||
@@ -51,11 +51,11 @@ def test_examples_protocol_http_server_file_serving(dut: Dut) -> None:
|
||||
|
||||
# Parse IP address of STA
|
||||
logging.info('Waiting to connect with AP')
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
# Expected logs
|
||||
got_port = dut.expect(r"Starting HTTP Server on port: '(\d+)'", timeout=30)[1].decode()
|
||||
logging.info('Got IP : {}'.format(got_ip))
|
||||
logging.info('Got Port : {}'.format(got_port))
|
||||
logging.info(f'Got IP : {got_ip}')
|
||||
logging.info(f'Got Port : {got_port}')
|
||||
|
||||
# Run test script
|
||||
conn = client.start_session(got_ip, got_port)
|
||||
|
||||
@@ -30,7 +30,7 @@ def test_examples_protocol_http_server_persistence(dut: Dut) -> None:
|
||||
# Get binary file
|
||||
binary_file = os.path.join(dut.app.binary_path, 'persistent_sockets.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
logging.info('http_server_bin_size : {}KB'.format(bin_size // 1024))
|
||||
logging.info(f'http_server_bin_size : {bin_size // 1024}KB')
|
||||
|
||||
# Upload binary and start testing
|
||||
logging.info('Starting http_server persistence test app')
|
||||
@@ -43,11 +43,11 @@ def test_examples_protocol_http_server_persistence(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
got_port = dut.expect(r"(?:[\s\S]*)Starting server on port: '(\d+)'", timeout=30)[1].decode()
|
||||
|
||||
logging.info('Got IP : {}'.format(got_ip))
|
||||
logging.info('Got Port : {}'.format(got_port))
|
||||
logging.info(f'Got IP : {got_ip}')
|
||||
logging.info(f'Got Port : {got_port}')
|
||||
|
||||
# Expected Logs
|
||||
dut.expect('Registering URI handlers', timeout=30)
|
||||
@@ -77,7 +77,7 @@ def test_examples_protocol_http_server_persistence(dut: Dut) -> None:
|
||||
|
||||
# Retest PUT request and change session context value
|
||||
num = random.randint(0, 100)
|
||||
logging.info('Adding: {}'.format(num))
|
||||
logging.info(f'Adding: {num}')
|
||||
client.putreq(conn, '/adder', str(num))
|
||||
visitor += 1
|
||||
adder += num
|
||||
@@ -95,7 +95,7 @@ def test_examples_protocol_http_server_persistence(dut: Dut) -> None:
|
||||
# Test POST request and session persistence
|
||||
random_nums = [random.randint(0, 100) for _ in range(100)]
|
||||
for num in random_nums:
|
||||
logging.info('Adding: {}'.format(num))
|
||||
logging.info(f'Adding: {num}')
|
||||
client.postreq(conn, '/adder', str(num))
|
||||
visitor += 1
|
||||
adder += num
|
||||
@@ -103,7 +103,7 @@ def test_examples_protocol_http_server_persistence(dut: Dut) -> None:
|
||||
dut.expect('/adder handler read ' + str(num), timeout=30)
|
||||
|
||||
# Test GET request and session persistence
|
||||
logging.info('Matching final sum: {}'.format(adder))
|
||||
logging.info(f'Matching final sum: {adder}')
|
||||
if client.getreq(conn, '/adder').decode() != str(adder):
|
||||
raise RuntimeError
|
||||
visitor += 1
|
||||
|
||||
@@ -16,6 +16,7 @@ from pytest_embedded_idf.utils import idf_parametrize
|
||||
|
||||
try:
|
||||
import http.client
|
||||
|
||||
from idf_http_server_test import client
|
||||
except ModuleNotFoundError:
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', '..', '..', 'tools', 'ci', 'python_packages'))
|
||||
@@ -43,13 +44,13 @@ class http_client_thread(threading.Thread):
|
||||
def run(self) -> None:
|
||||
try:
|
||||
self.open_connection(self.ip, self.port, self.delay)
|
||||
except socket.timeout:
|
||||
except TimeoutError:
|
||||
self.exc = 1
|
||||
|
||||
def join(self, timeout=None): # type: ignore
|
||||
threading.Thread.join(self)
|
||||
if self.exc:
|
||||
raise socket.timeout
|
||||
raise TimeoutError
|
||||
|
||||
|
||||
@pytest.mark.wifi_router
|
||||
@@ -58,7 +59,7 @@ def test_examples_protocol_http_server_simple(dut: Dut) -> None:
|
||||
# Get binary file
|
||||
binary_file = os.path.join(dut.app.binary_path, 'simple.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
logging.info('http_server_bin_size : {}KB'.format(bin_size // 1024))
|
||||
logging.info(f'http_server_bin_size : {bin_size // 1024}KB')
|
||||
|
||||
# Upload binary and start testing
|
||||
logging.info('Starting http_server simple test app')
|
||||
@@ -71,11 +72,11 @@ def test_examples_protocol_http_server_simple(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(' '.join([ap_ssid, ap_password]))
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
got_port = dut.expect(r"(?:[\s\S]*)Starting server on port: '(\d+)'", timeout=30)[1].decode()
|
||||
|
||||
logging.info('Got IP : {}'.format(got_ip))
|
||||
logging.info('Got Port : {}'.format(got_port))
|
||||
logging.info(f'Got IP : {got_ip}')
|
||||
logging.info(f'Got Port : {got_port}')
|
||||
|
||||
# Expected Logs
|
||||
dut.expect('Registering URI handlers', timeout=30)
|
||||
@@ -131,7 +132,7 @@ def test_examples_protocol_http_server_lru_purge_enable(dut: Dut) -> None:
|
||||
# Get binary file
|
||||
binary_file = os.path.join(dut.app.binary_path, 'simple.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
logging.info('http_server_bin_size : {}KB'.format(bin_size // 1024))
|
||||
logging.info(f'http_server_bin_size : {bin_size // 1024}KB')
|
||||
|
||||
# Upload binary and start testing
|
||||
logging.info('Starting http_server simple test app')
|
||||
@@ -144,11 +145,11 @@ def test_examples_protocol_http_server_lru_purge_enable(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
got_port = dut.expect(r"(?:[\s\S]*)Starting server on port: '(\d+)'", timeout=30)[1].decode()
|
||||
|
||||
logging.info('Got IP : {}'.format(got_ip))
|
||||
logging.info('Got Port : {}'.format(got_port))
|
||||
logging.info(f'Got IP : {got_ip}')
|
||||
logging.info(f'Got Port : {got_port}')
|
||||
|
||||
# Expected Logs
|
||||
dut.expect('Registering URI handlers', timeout=30)
|
||||
@@ -160,7 +161,7 @@ def test_examples_protocol_http_server_lru_purge_enable(dut: Dut) -> None:
|
||||
thread.start()
|
||||
threads.append(thread)
|
||||
except OSError as err:
|
||||
logging.info('Error: unable to start thread, {}'.format(err))
|
||||
logging.info(f'Error: unable to start thread, {err}')
|
||||
|
||||
for t in threads:
|
||||
t.join()
|
||||
@@ -179,7 +180,7 @@ def test_examples_protocol_http_server_sse(dut: Dut) -> None:
|
||||
# Get binary file
|
||||
binary_file = os.path.join(dut.app.binary_path, 'simple.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
logging.info('http_server_bin_size : {}KB'.format(bin_size // 1024))
|
||||
logging.info(f'http_server_bin_size : {bin_size // 1024}KB')
|
||||
|
||||
# Upload binary and start testing
|
||||
logging.info('Starting http_server simple test app')
|
||||
@@ -192,11 +193,11 @@ def test_examples_protocol_http_server_sse(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
got_port = int(dut.expect(r"(?:[\s\S]*)Starting server on port: '(\d+)'", timeout=30)[1].decode())
|
||||
|
||||
logging.info('Got IP : {}'.format(got_ip))
|
||||
logging.info('Got Port : {}'.format(got_port))
|
||||
logging.info(f'Got IP : {got_ip}')
|
||||
logging.info(f'Got Port : {got_port}')
|
||||
|
||||
# Expected Logs
|
||||
dut.expect('Registering URI handlers', timeout=30)
|
||||
|
||||
@@ -30,7 +30,7 @@ class WsClient:
|
||||
self.uri = uri
|
||||
|
||||
def __enter__(self): # type: ignore
|
||||
self.ws.connect('ws://{}:{}/{}'.format(self.ip, self.port, self.uri))
|
||||
self.ws.connect(f'ws://{self.ip}:{self.port}/{self.uri}')
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback): # type: ignore
|
||||
@@ -53,7 +53,7 @@ def test_examples_protocol_http_ws_echo_server(dut: Dut) -> None:
|
||||
# Get binary file
|
||||
binary_file = os.path.join(dut.app.binary_path, 'ws_echo_server.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
logging.info('http_ws_server_bin_size : {}KB'.format(bin_size // 1024))
|
||||
logging.info(f'http_ws_server_bin_size : {bin_size // 1024}KB')
|
||||
|
||||
logging.info('Starting ws-echo-server test app based on http_server')
|
||||
|
||||
@@ -65,11 +65,11 @@ def test_examples_protocol_http_ws_echo_server(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
got_port = dut.expect(r"Starting server on port: '(\d+)'", timeout=30)[1].decode()
|
||||
|
||||
logging.info('Got IP : {}'.format(got_ip))
|
||||
logging.info('Got Port : {}'.format(got_port))
|
||||
logging.info(f'Got IP : {got_ip}')
|
||||
logging.info(f'Got Port : {got_port}')
|
||||
|
||||
# Start ws server test
|
||||
with WsClient(got_ip, int(got_port), uri='ws') as ws:
|
||||
@@ -77,24 +77,24 @@ def test_examples_protocol_http_ws_echo_server(dut: Dut) -> None:
|
||||
for expected_opcode in [OPCODE_TEXT, OPCODE_BIN, OPCODE_PING]:
|
||||
ws.write(data=DATA, opcode=expected_opcode)
|
||||
opcode, data = ws.read()
|
||||
logging.info('Testing opcode {}: Received opcode:{}, data:{}'.format(expected_opcode, opcode, data))
|
||||
logging.info(f'Testing opcode {expected_opcode}: Received opcode:{opcode}, data:{data}')
|
||||
data = data.decode()
|
||||
if expected_opcode == OPCODE_PING:
|
||||
dut.expect('Got a WS PING frame, Replying PONG')
|
||||
if opcode != OPCODE_PONG or data != DATA:
|
||||
raise RuntimeError('Failed to receive correct opcode:{} or data:{}'.format(opcode, data))
|
||||
raise RuntimeError(f'Failed to receive correct opcode:{opcode} or data:{data}')
|
||||
continue
|
||||
dut_data = dut.expect(r'Got packet with message: ([A-Za-z0-9_]*)')[1]
|
||||
dut_opcode = dut.expect(r'Packet type: ([0-9]*)')[1].decode()
|
||||
|
||||
if opcode != expected_opcode or data != DATA or opcode != int(dut_opcode) or (data not in str(dut_data)):
|
||||
raise RuntimeError('Failed to receive correct opcode:{} or data:{}'.format(opcode, data))
|
||||
raise RuntimeError(f'Failed to receive correct opcode:{opcode} or data:{data}')
|
||||
ws.write(data='Trigger async', opcode=OPCODE_TEXT)
|
||||
opcode, data = ws.read()
|
||||
logging.info('Testing async send: Received opcode:{}, data:{}'.format(opcode, data))
|
||||
logging.info(f'Testing async send: Received opcode:{opcode}, data:{data}')
|
||||
data = data.decode()
|
||||
if opcode != OPCODE_TEXT or data != 'Async data':
|
||||
raise RuntimeError('Failed to receive correct opcode:{} or data:{}'.format(opcode, data))
|
||||
raise RuntimeError(f'Failed to receive correct opcode:{opcode} or data:{data}')
|
||||
|
||||
|
||||
@pytest.mark.wifi_router
|
||||
@@ -111,7 +111,7 @@ def test_ws_auth_handshake(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
got_port = dut.expect(r"Starting server on port: '(\d+)'", timeout=30)[1].decode()
|
||||
# Prepare a minimal WebSocket handshake request
|
||||
# Use WSClient to attempt the handshake, expecting it to fail (handshake rejected)
|
||||
|
||||
@@ -121,7 +121,7 @@ def test_examples_protocol_https_server_simple(dut: Dut) -> None:
|
||||
# Parse IP address and port of the server
|
||||
dut.expect(r'Starting server')
|
||||
got_port = int(dut.expect(r'Server listening on port (\d+)', timeout=30)[1].decode())
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
|
||||
# Expected logs
|
||||
|
||||
@@ -199,7 +199,7 @@ def test_examples_protocol_https_server_simple_dynamic_buffers(dut: Dut) -> None
|
||||
# Parse IP address and port of the server
|
||||
dut.expect(r'Starting server')
|
||||
got_port = int(dut.expect(r'Server listening on port (\d+)', timeout=30)[1].decode())
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
|
||||
# Expected logs
|
||||
|
||||
@@ -273,7 +273,7 @@ def test_examples_protocol_https_server_tls1_3(dut: Dut) -> None:
|
||||
# Parse IP address and port of the server
|
||||
dut.expect(r'Starting server')
|
||||
got_port = int(dut.expect(r'Server listening on port (\d+)', timeout=30)[1].decode())
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
|
||||
# Expected logs
|
||||
|
||||
@@ -364,7 +364,7 @@ def test_examples_protocol_https_server_tls1_2_only(dut: Dut) -> None:
|
||||
# Parse IP address and port of the server
|
||||
dut.expect(r'Starting server')
|
||||
got_port = int(dut.expect(r'Server listening on port (\d+)', timeout=30)[1].decode())
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
|
||||
# Expected logs
|
||||
logging.info(f'Got IP : {got_ip}')
|
||||
|
||||
@@ -6,9 +6,7 @@ import logging
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from types import TracebackType
|
||||
from typing import Any
|
||||
from typing import Optional
|
||||
|
||||
import pytest
|
||||
import websocket
|
||||
@@ -33,16 +31,16 @@ class WsClient:
|
||||
self.ws.settimeout(10)
|
||||
|
||||
def __enter__(self): # type: ignore
|
||||
self.ws.connect('wss://{}:{}/ws'.format(self.ip, self.port))
|
||||
self.ws.connect(f'wss://{self.ip}:{self.port}/ws')
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback): # type: (type, RuntimeError, TracebackType) -> None
|
||||
def __exit__(self, exc_type, exc_value, traceback): # type: ignore
|
||||
self.ws.close()
|
||||
|
||||
def read(self): # type: () -> Any
|
||||
def read(self) -> Any:
|
||||
return self.ws.recv_data(control_frame=True)
|
||||
|
||||
def write(self, data, opcode=OPCODE_TEXT): # type: (str, int) -> Any
|
||||
def write(self, data: str, opcode: int = OPCODE_TEXT) -> Any:
|
||||
if opcode == OPCODE_PING:
|
||||
return self.ws.ping(data)
|
||||
if opcode == OPCODE_PONG:
|
||||
@@ -61,7 +59,7 @@ class wss_client_thread(threading.Thread):
|
||||
self.data = 'Espressif'
|
||||
self.async_response = False
|
||||
|
||||
def run(self): # type: () -> None
|
||||
def run(self) -> None:
|
||||
with WsClient(self.ip, self.port, self.ca_file) as ws:
|
||||
while True:
|
||||
try:
|
||||
@@ -73,7 +71,7 @@ class wss_client_thread(threading.Thread):
|
||||
if opcode == OPCODE_TEXT:
|
||||
if data == CORRECT_ASYNC_DATA:
|
||||
self.async_response = True
|
||||
logging.info('Thread {} obtained correct async message'.format(self.name))
|
||||
logging.info(f'Thread {self.name} obtained correct async message')
|
||||
# Keep sending pong to update the keepalive in the server
|
||||
if (time.time() - self.start_time) > 20:
|
||||
break
|
||||
@@ -83,7 +81,7 @@ class wss_client_thread(threading.Thread):
|
||||
if self.async_response is not True:
|
||||
self.exc = RuntimeError('Failed to obtain correct async data') # type: ignore
|
||||
|
||||
def join(self, timeout=0): # type:(Optional[float]) -> None
|
||||
def join(self, timeout: float | None = 0) -> None:
|
||||
threading.Thread.join(self)
|
||||
if self.exc:
|
||||
raise self.exc
|
||||
@@ -111,7 +109,7 @@ def test_examples_protocol_https_wss_server(dut: Dut) -> None:
|
||||
# Get binary file
|
||||
binary_file = os.path.join(dut.app.binary_path, 'wss_server.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
logging.info('https_wss_server_bin_size : {}KB'.format(bin_size // 1024))
|
||||
logging.info(f'https_wss_server_bin_size : {bin_size // 1024}KB')
|
||||
|
||||
logging.info('Starting wss_server test app')
|
||||
|
||||
@@ -124,10 +122,10 @@ def test_examples_protocol_https_wss_server(dut: Dut) -> None:
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
# Parse IP address of STA
|
||||
got_port = int(dut.expect(r'Server listening on port (\d+)', timeout=30)[1].decode())
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
|
||||
logging.info('Got IP : {}'.format(got_ip))
|
||||
logging.info('Got Port : {}'.format(got_port))
|
||||
logging.info(f'Got IP : {got_ip}')
|
||||
logging.info(f'Got Port : {got_port}')
|
||||
|
||||
ca_file = os.path.join(os.path.dirname(__file__), 'main', 'certs', 'servercert.pem')
|
||||
# Start ws server test
|
||||
@@ -137,11 +135,11 @@ def test_examples_protocol_https_wss_server(dut: Dut) -> None:
|
||||
dut.expect('performing session handshake')
|
||||
client_fd = int(dut.expect(r'New client connected (\d+)', timeout=30)[1].decode())
|
||||
ws.write(data=DATA, opcode=OPCODE_TEXT)
|
||||
dut.expect(r'Received packet with message: {}'.format(DATA))
|
||||
dut.expect(rf'Received packet with message: {DATA}')
|
||||
opcode, data = ws.read()
|
||||
data = data.decode('UTF-8')
|
||||
if data != DATA:
|
||||
raise RuntimeError(f'Failed to receive the correct echo response.')
|
||||
raise RuntimeError('Failed to receive the correct echo response.')
|
||||
logging.info('Correct echo response obtained from the wss server')
|
||||
|
||||
# Test for PING
|
||||
@@ -172,11 +170,12 @@ def test_examples_protocol_https_wss_server(dut: Dut) -> None:
|
||||
|
||||
# keepalive timeout is 10 seconds so do not respond for (10 + 1) seconds
|
||||
logging.info(
|
||||
'Testing if client is disconnected if it does not respond for 10s i.e. keep_alive timeout (approx time = 11s)'
|
||||
'Testing if client is disconnected if it does not respond for 10s '
|
||||
'i.e. keep_alive timeout (approx time = 11s)'
|
||||
)
|
||||
try:
|
||||
dut.expect('Client not alive, closing fd {}'.format(client_fd), timeout=20)
|
||||
dut.expect('Client disconnected {}'.format(client_fd))
|
||||
dut.expect(f'Client not alive, closing fd {client_fd}', timeout=20)
|
||||
dut.expect(f'Client disconnected {client_fd}')
|
||||
except Exception:
|
||||
logging.info('ENV_ERROR:Failed the test for keep alive,\nthe connection was not closed after timeout')
|
||||
|
||||
|
||||
@@ -24,7 +24,7 @@ def _run_test(dut: Dut) -> None:
|
||||
dut.expect('Got IPv4 event:', timeout=30)
|
||||
|
||||
ping_dest = os.getenv('EXAMPLE_ICMP_SERVER', 'ci.espressif.cn')
|
||||
dut.write('ping {} -c 5'.format(ping_dest))
|
||||
dut.write(f'ping {ping_dest} -c 5')
|
||||
|
||||
# expect at least two packets (there could be lost packets)
|
||||
ip = dut.expect(r'64 bytes from (\d+\.\d+\.\d+\.\d+) icmp_seq=\d ttl=\d+ time=\d+ ms')[1].decode()
|
||||
@@ -79,12 +79,12 @@ def test_protocols_icmp_echo_ipv6_only(dut: Dut) -> None:
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
# expect all 8 octets from IPv6 (assumes it's printed in the long form)
|
||||
ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)
|
||||
ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode()
|
||||
ipv6 = dut.expect(ipv6_r, timeout=60)[0].decode()
|
||||
logging.info(f'Connected AP with IPv6={ipv6}')
|
||||
interface_nr = dut.expect(r'Connected on interface: [a-z]{2}\d \((\d+)\)', timeout=30)[1].decode()
|
||||
|
||||
# ping our own address to simplify things
|
||||
dut.write('ping -I {} {} -c 5'.format(interface_nr, ipv6))
|
||||
dut.write(f'ping -I {interface_nr} {ipv6} -c 5')
|
||||
|
||||
# expect at least two packets (there could be lost packets)
|
||||
ip = dut.expect(r'64 bytes from ([0-9a-fA-F:]+) icmp_seq=\d ttl=\d+ time=\d+ ms')[1].decode()
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
import datetime
|
||||
import logging
|
||||
from typing import Any
|
||||
from typing import Tuple
|
||||
|
||||
import pytest
|
||||
from common_test_methods import get_env_config_variable
|
||||
@@ -21,7 +20,7 @@ def test_get_time_from_sntp_server(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
dut.expect('IPv4 address:')
|
||||
dut.expect('IPv4 address:', timeout=60)
|
||||
|
||||
dut.expect('Initializing and starting SNTP')
|
||||
dut.expect('Notification of a time synchronization event')
|
||||
@@ -32,11 +31,11 @@ def test_get_time_from_sntp_server(dut: Dut) -> None:
|
||||
NY_time = None
|
||||
SH_time = None
|
||||
|
||||
def check_time(prev_NY_time: Any, prev_SH_time: Any) -> Tuple[Any, Any]:
|
||||
NY_str = dut.expect(r'The current date/time in New York is: ({})'.format(TIME_FORMAT_REGEX))[1].decode()
|
||||
SH_str = dut.expect(r'The current date/time in Shanghai is: ({})'.format(TIME_FORMAT_REGEX))[1].decode()
|
||||
logging.info('New York: "{}"'.format(NY_str))
|
||||
logging.info('Shanghai: "{}"'.format(SH_str))
|
||||
def check_time(prev_NY_time: Any, prev_SH_time: Any) -> tuple[Any, Any]:
|
||||
NY_str = dut.expect(rf'The current date/time in New York is: ({TIME_FORMAT_REGEX})')[1].decode()
|
||||
SH_str = dut.expect(rf'The current date/time in Shanghai is: ({TIME_FORMAT_REGEX})')[1].decode()
|
||||
logging.info(f'New York: "{NY_str}"')
|
||||
logging.info(f'Shanghai: "{SH_str}"')
|
||||
dut.expect('Entering deep sleep for 10 seconds')
|
||||
logging.info('Sleeping...')
|
||||
new_NY_time = datetime.datetime.strptime(NY_str, TIME_FORMAT)
|
||||
@@ -50,5 +49,5 @@ def test_get_time_from_sntp_server(dut: Dut) -> None:
|
||||
|
||||
NY_time, SH_time = check_time(NY_time, SH_time)
|
||||
for i in range(2, 4):
|
||||
dut.expect('example: Boot count: {}'.format(i), timeout=30)
|
||||
dut.expect(f'example: Boot count: {i}', timeout=30)
|
||||
NY_time, SH_time = check_time(NY_time, SH_time)
|
||||
|
||||
@@ -39,13 +39,13 @@ def test_examples_tcp_client_ipv4(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected with IPv4={ipv4}')
|
||||
|
||||
# test IPv4
|
||||
with TcpServer(PORT, socket.AF_INET):
|
||||
server_ip = get_host_ip4_by_dest_ip(ipv4)
|
||||
print('Connect tcp client to server IP={}'.format(server_ip))
|
||||
print(f'Connect tcp client to server IP={server_ip}')
|
||||
dut.write(server_ip)
|
||||
dut.expect('OK: Message from ESP32')
|
||||
|
||||
@@ -63,16 +63,16 @@ def test_examples_tcp_client_ipv6(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
# expect all 8 octets from IPv6 (assumes it's printed in the long form)
|
||||
ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)
|
||||
ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode()
|
||||
print('Connected with IPv4={} and IPv6={}'.format(ipv4, ipv6))
|
||||
ipv6 = dut.expect(ipv6_r, timeout=60)[0].decode()
|
||||
print(f'Connected with IPv4={ipv4} and IPv6={ipv6}')
|
||||
|
||||
# test IPv6
|
||||
my_interface = get_my_interface_by_dest_ip(ipv4)
|
||||
with TcpServer(PORT, socket.AF_INET6):
|
||||
server_ip = get_host_ip6_by_dest_ip(ipv6, my_interface)
|
||||
print('Connect tcp client to server IP={}'.format(server_ip))
|
||||
print(f'Connect tcp client to server IP={server_ip}')
|
||||
dut.write(server_ip)
|
||||
dut.expect('OK: Message from ESP32')
|
||||
|
||||
@@ -39,7 +39,7 @@ def test_examples_tcp_server_ipv4(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
logging.info(f'Connected with IPv4={ipv4}')
|
||||
time.sleep(1)
|
||||
|
||||
@@ -69,7 +69,7 @@ def test_examples_tcp_server_ipv4_esp32c2_26mhz(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
logging.info(f'Connected with IPv4={ipv4}')
|
||||
time.sleep(1)
|
||||
|
||||
@@ -93,16 +93,16 @@ def test_examples_tcp_server_ipv6(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
# expect all 8 octets from IPv6 (assumes it's printed in the long form)
|
||||
ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)
|
||||
ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode()
|
||||
ipv6 = dut.expect(ipv6_r, timeout=60)[0].decode()
|
||||
logging.info(f'Connected with IPv4={ipv4} and IPv6={ipv6}')
|
||||
time.sleep(1)
|
||||
|
||||
interface = get_my_interface_by_dest_ip(ipv4)
|
||||
# test IPv6
|
||||
received = tcp_client('{}%{}'.format(ipv6, interface), PORT, MESSAGE)
|
||||
received = tcp_client(f'{ipv6}%{interface}', PORT, MESSAGE)
|
||||
if not received == MESSAGE:
|
||||
raise
|
||||
dut.expect(MESSAGE)
|
||||
@@ -123,7 +123,7 @@ def test_examples_tcp_server_ipv6_only(dut: Dut) -> None:
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
# expect all 8 octets from IPv6 (assumes it's printed in the long form)
|
||||
ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)
|
||||
ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode()
|
||||
ipv6 = dut.expect(ipv6_r, timeout=60)[0].decode()
|
||||
logging.info(f'Connected AP with IPv6={ipv6}')
|
||||
time.sleep(5)
|
||||
|
||||
|
||||
@@ -41,13 +41,13 @@ def test_examples_udp_client_ipv4(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected with IPv4={ipv4}')
|
||||
|
||||
# test IPv4
|
||||
with UdpServer(PORT, socket.AF_INET):
|
||||
server_ip = get_host_ip4_by_dest_ip(ipv4)
|
||||
print('Connect udp client to server IP={}'.format(server_ip))
|
||||
print(f'Connect udp client to server IP={server_ip}')
|
||||
for _ in range(MAX_RETRIES):
|
||||
try:
|
||||
dut.write(server_ip)
|
||||
@@ -72,17 +72,17 @@ def test_examples_udp_client_ipv6(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
# expect all 8 octets from IPv6 (assumes it's printed in the long form)
|
||||
ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)
|
||||
ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode()
|
||||
ipv6 = dut.expect(ipv6_r, timeout=60)[0].decode()
|
||||
print(f'Connected with IPv4={ipv4} and IPv6={ipv6}')
|
||||
|
||||
interface = get_my_interface_by_dest_ip(ipv4)
|
||||
# test IPv6
|
||||
with UdpServer(PORT, socket.AF_INET6):
|
||||
server_ip = get_host_ip6_by_dest_ip(ipv6, interface)
|
||||
print('Connect udp client to server IP={}'.format(server_ip))
|
||||
print(f'Connect udp client to server IP={server_ip}')
|
||||
for _ in range(MAX_RETRIES):
|
||||
try:
|
||||
dut.write(server_ip)
|
||||
|
||||
@@ -38,7 +38,7 @@ def test_examples_udp_server_ipv4(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected with IPv4={ipv4}')
|
||||
|
||||
# test IPv4
|
||||
@@ -49,7 +49,7 @@ def test_examples_udp_server_ipv4(dut: Dut) -> None:
|
||||
print('OK')
|
||||
break
|
||||
else:
|
||||
raise ValueError('IPv4: Did not receive UDP message after {} retries'.format(MAX_RETRIES))
|
||||
raise ValueError(f'IPv4: Did not receive UDP message after {MAX_RETRIES} retries')
|
||||
dut.expect(MESSAGE)
|
||||
|
||||
|
||||
@@ -66,20 +66,20 @@ def test_examples_udp_server_ipv6(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
# expect all 8 octets from IPv6 (assumes it's printed in the long form)
|
||||
ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)
|
||||
ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode()
|
||||
ipv6 = dut.expect(ipv6_r, timeout=60)[0].decode()
|
||||
print(f'Connected with IPv4={ipv4} and IPv6={ipv6}')
|
||||
|
||||
interface = get_my_interface_by_dest_ip(ipv4)
|
||||
# test IPv6
|
||||
for _ in range(MAX_RETRIES):
|
||||
print('Testing UDP on IPv6...')
|
||||
received = udp_client('{}%{}'.format(ipv6, interface), PORT, MESSAGE)
|
||||
received = udp_client(f'{ipv6}%{interface}', PORT, MESSAGE)
|
||||
if received == MESSAGE:
|
||||
print('OK')
|
||||
break
|
||||
else:
|
||||
raise ValueError('IPv6: Did not receive UDP message after {} retries'.format(MAX_RETRIES))
|
||||
raise ValueError(f'IPv6: Did not receive UDP message after {MAX_RETRIES} retries')
|
||||
dut.expect(MESSAGE)
|
||||
|
||||
@@ -86,7 +86,7 @@ def test_examples_tee_secure_ota_example(dut: Dut) -> None:
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP/Ethernet with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
|
||||
|
||||
@@ -196,7 +196,7 @@ def test_examples_protocol_advanced_https_ota_example(dut: Dut) -> None:
|
||||
for _ in range(iterations):
|
||||
dut.expect('Loaded app from partition at offset', timeout=30)
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP/Ethernet with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
|
||||
@@ -237,7 +237,7 @@ def test_examples_protocol_advanced_https_ota_example_ota_resumption(dut: Dut) -
|
||||
dut.expect('Loaded app from partition at offset', timeout=30)
|
||||
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP/Ethernet with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
|
||||
@@ -255,7 +255,7 @@ def test_examples_protocol_advanced_https_ota_example_ota_resumption(dut: Dut) -
|
||||
dut.expect('Loaded app from partition at offset', timeout=180)
|
||||
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP/Ethernet with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
|
||||
@@ -307,7 +307,7 @@ def test_examples_protocol_advanced_https_ota_example_truncated_bin(dut: Dut) ->
|
||||
# start test
|
||||
dut.expect('Loaded app from partition at offset', timeout=30)
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP/Ethernet with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
|
||||
@@ -358,7 +358,7 @@ def test_examples_protocol_advanced_https_ota_example_truncated_header(dut: Dut)
|
||||
# start test
|
||||
dut.expect('Loaded app from partition at offset', timeout=30)
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP/Ethernet with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
|
||||
@@ -409,7 +409,7 @@ def test_examples_protocol_advanced_https_ota_example_random(dut: Dut) -> None:
|
||||
# start test
|
||||
dut.expect('Loaded app from partition at offset', timeout=30)
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP/Ethernet with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
|
||||
@@ -462,7 +462,7 @@ def test_examples_protocol_advanced_https_ota_example_invalid_chip_id(dut: Dut)
|
||||
# start test
|
||||
dut.expect('Loaded app from partition at offset', timeout=30)
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP/Ethernet with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
|
||||
@@ -499,7 +499,7 @@ def test_examples_protocol_advanced_https_ota_example_chunked(dut: Dut) -> None:
|
||||
# start test
|
||||
dut.expect('Loaded app from partition at offset', timeout=30)
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP/Ethernet with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
|
||||
@@ -537,7 +537,7 @@ def test_examples_protocol_advanced_https_ota_example_redirect_url(dut: Dut) ->
|
||||
# start test
|
||||
dut.expect('Loaded app from partition at offset', timeout=30)
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP/Ethernet with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
|
||||
@@ -622,7 +622,7 @@ def test_examples_protocol_advanced_https_ota_example_anti_rollback(dut: Dut) ->
|
||||
# Positive Case
|
||||
dut.expect('Loaded app from partition at offset', timeout=30)
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP/Ethernet with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
|
||||
@@ -633,7 +633,7 @@ def test_examples_protocol_advanced_https_ota_example_anti_rollback(dut: Dut) ->
|
||||
print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
|
||||
dut.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
|
||||
dut.expect('Loaded app from partition at offset', timeout=60)
|
||||
dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
dut.expect(r'App is valid, rollback cancelled successfully', timeout=30)
|
||||
|
||||
# Negative Case
|
||||
@@ -686,7 +686,7 @@ def test_examples_protocol_advanced_https_ota_example_partial_request(dut: Dut)
|
||||
# start test
|
||||
dut.expect('Loaded app from partition at offset', timeout=30)
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP/Ethernet with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
|
||||
@@ -744,7 +744,7 @@ def test_examples_protocol_advanced_https_ota_example_ota_resumption_partial_dow
|
||||
dut.expect('Loaded app from partition at offset', timeout=30)
|
||||
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP/Ethernet with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
|
||||
@@ -760,7 +760,7 @@ def test_examples_protocol_advanced_https_ota_example_ota_resumption_partial_dow
|
||||
dut.expect('Loaded app from partition at offset', timeout=180)
|
||||
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP/Ethernet with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
|
||||
@@ -815,7 +815,7 @@ def test_examples_protocol_advanced_https_ota_example_nimble_gatts(dut: Dut) ->
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
|
||||
@@ -871,7 +871,7 @@ def test_examples_protocol_advanced_https_ota_example_bluedroid_gatts(dut: Dut)
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
|
||||
@@ -922,7 +922,7 @@ def test_examples_protocol_advanced_https_ota_example_openssl_aligned_bin(dut: D
|
||||
# start test
|
||||
dut.expect('Loaded app from partition at offset', timeout=30)
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP/Ethernet with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
|
||||
@@ -982,7 +982,7 @@ def test_examples_protocol_advanced_https_ota_example_verify_min_chip_revision(d
|
||||
dut.expect('Loaded app from partition at offset', timeout=30)
|
||||
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP/Ethernet with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
|
||||
@@ -1038,7 +1038,7 @@ def test_examples_protocol_advanced_https_ota_example_verify_max_chip_revision(d
|
||||
dut.expect('Loaded app from partition at offset', timeout=30)
|
||||
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP/Ethernet with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
|
||||
|
||||
@@ -6,7 +6,6 @@ import os
|
||||
import ssl
|
||||
import sys
|
||||
from typing import Any
|
||||
from typing import Optional
|
||||
|
||||
import pexpect
|
||||
import pytest
|
||||
@@ -109,7 +108,7 @@ def test_examples_partitions_ota_with_flash_encryption_wifi(dut: Dut) -> None:
|
||||
update_partitions(dut, 'flash_encryption_wifi_high_traffic')
|
||||
|
||||
|
||||
def update_partitions(dut: Dut, env_name: Optional[str]) -> None:
|
||||
def update_partitions(dut: Dut, env_name: str | None) -> None:
|
||||
port = 8000
|
||||
thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', port))
|
||||
thread1.daemon = True
|
||||
@@ -123,7 +122,7 @@ def update_partitions(dut: Dut, env_name: Optional[str]) -> None:
|
||||
thread1.terminate()
|
||||
|
||||
|
||||
def update(dut: Dut, port: int, path_to_image: str, env_name: Optional[str]) -> None:
|
||||
def update(dut: Dut, port: int, path_to_image: str, env_name: str | None) -> None:
|
||||
dut.expect('OTA example app_main start', timeout=90)
|
||||
host_ip = setting_connection(dut, env_name)
|
||||
dut.expect('Starting OTA example task', timeout=30)
|
||||
@@ -133,14 +132,14 @@ def update(dut: Dut, port: int, path_to_image: str, env_name: Optional[str]) ->
|
||||
dut.expect('OTA Succeed, Rebooting...', timeout=90)
|
||||
|
||||
|
||||
def setting_connection(dut: Dut, env_name: Optional[str]) -> Any:
|
||||
def setting_connection(dut: Dut, env_name: str | None) -> Any:
|
||||
if env_name is not None and dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
|
||||
dut.expect('Please input ssid password:')
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP/Ethernet with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
|
||||
@@ -151,8 +150,8 @@ def start_https_server(
|
||||
ota_image_dir: str,
|
||||
server_ip: str,
|
||||
server_port: int,
|
||||
server_file: Optional[str] = None,
|
||||
key_file: Optional[str] = None,
|
||||
server_file: str | None = None,
|
||||
key_file: str | None = None,
|
||||
) -> None:
|
||||
os.chdir(ota_image_dir)
|
||||
|
||||
|
||||
@@ -1,14 +1,13 @@
|
||||
# SPDX-FileCopyrightText: 2022-2025 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Unlicense OR CC0-1.0
|
||||
import http.server
|
||||
import logging
|
||||
import multiprocessing
|
||||
import os
|
||||
import ssl
|
||||
import subprocess
|
||||
import sys
|
||||
from typing import Any
|
||||
from typing import Optional
|
||||
from typing import Tuple
|
||||
|
||||
import pexpect
|
||||
import pytest
|
||||
@@ -87,8 +86,8 @@ def start_https_server(
|
||||
ota_image_dir: str,
|
||||
server_ip: str,
|
||||
server_port: int,
|
||||
server_file: Optional[str] = None,
|
||||
key_file: Optional[str] = None,
|
||||
server_file: str | None = None,
|
||||
key_file: str | None = None,
|
||||
) -> None:
|
||||
os.chdir(ota_image_dir)
|
||||
|
||||
@@ -143,15 +142,15 @@ def start_tls1_3_server(ota_image_dir: str, server_port: int) -> subprocess.Pope
|
||||
|
||||
|
||||
def check_sha256(sha256_expected: str, sha256_reported: str) -> None:
|
||||
print('sha256_expected: %s' % (sha256_expected))
|
||||
print('sha256_reported: %s' % (sha256_reported))
|
||||
logging.info('sha256_expected: %s', sha256_expected)
|
||||
logging.info('sha256_reported: %s', sha256_reported)
|
||||
if sha256_expected not in sha256_reported:
|
||||
raise ValueError('SHA256 mismatch')
|
||||
else:
|
||||
print('SHA256 expected and reported are the same')
|
||||
logging.info('SHA256 expected and reported are the same')
|
||||
|
||||
|
||||
def calc_all_sha256(dut: Dut) -> Tuple[str, str]:
|
||||
def calc_all_sha256(dut: Dut) -> tuple[str, str]:
|
||||
bootloader_path = os.path.join(dut.app.binary_path, 'bootloader', 'bootloader.bin')
|
||||
sha256_bootloader = dut.app.get_sha256(bootloader_path)
|
||||
|
||||
@@ -161,14 +160,14 @@ def calc_all_sha256(dut: Dut) -> Tuple[str, str]:
|
||||
return str(sha256_bootloader), str(sha256_app)
|
||||
|
||||
|
||||
def setting_connection(dut: Dut, env_name: Optional[str] = None) -> Any:
|
||||
def setting_connection(dut: Dut, env_name: str | None = None) -> Any:
|
||||
if env_name is not None and dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
|
||||
dut.expect('Please input ssid password:')
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP/Ethernet with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
|
||||
|
||||
@@ -31,7 +31,7 @@ def _run_test(dut: Dut) -> None:
|
||||
pass
|
||||
|
||||
try:
|
||||
dut.expect(r'got ip: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)
|
||||
dut.expect(r'got ip: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)
|
||||
log_after_got_ip = dut.expect(pexpect.TIMEOUT, timeout=10).decode()
|
||||
if any(s in log_after_got_ip for s in bad_event_str):
|
||||
logging.info('Abnormal connection log:')
|
||||
|
||||
Reference in New Issue
Block a user