mirror of
https://github.com/espressif/esp-idf.git
synced 2026-01-20 18:45:17 +00:00
feat(openthread): output logs of host for debugging CI issues
This commit is contained in:
@@ -2,6 +2,7 @@
|
||||
# SPDX-License-Identifier: Unlicense OR CC0-1.0
|
||||
# !/usr/bin/env python3
|
||||
# this file defines some functions for testing cli and br under pytest framework
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import socket
|
||||
@@ -28,7 +29,7 @@ def extract_address(
|
||||
try:
|
||||
result = dut.expect(pattern, timeout=5)[1].decode()
|
||||
except Exception as e:
|
||||
print(f'Error: {e}')
|
||||
logging.error(f'Error: {e}')
|
||||
return default_return
|
||||
return func(result)
|
||||
|
||||
@@ -151,7 +152,7 @@ def getDeviceRole(dut: IdfDut) -> str:
|
||||
wait(dut, 1)
|
||||
execute_command(dut, 'state')
|
||||
role = dut.expect(r'\W+(\w+)\W+Done', timeout=5)[1].decode()
|
||||
print(role)
|
||||
logging.info(role)
|
||||
return str(role)
|
||||
|
||||
|
||||
@@ -310,7 +311,7 @@ def get_host_interface_name() -> str:
|
||||
interface_name = config.get('interface_name')
|
||||
if interface_name:
|
||||
if interface_name == 'eth0':
|
||||
print(
|
||||
logging.warning(
|
||||
f"Warning: 'eth0' is not recommended as a valid network interface. "
|
||||
f"Please check and update the 'interface_name' in the configuration file: "
|
||||
f'{config_path}'
|
||||
@@ -318,9 +319,9 @@ def get_host_interface_name() -> str:
|
||||
else:
|
||||
return str(interface_name)
|
||||
else:
|
||||
print("Warning: Configuration file found but 'interface_name' is not defined.")
|
||||
logging.warning("Warning: Configuration file found but 'interface_name' is not defined.")
|
||||
except Exception as e:
|
||||
print(f'Error: Failed to read or parse {config_path}. Details: {e}')
|
||||
logging.error(f'Error: Failed to read or parse {config_path}. Details: {e}')
|
||||
if 'eth1' in netifaces.interfaces():
|
||||
return 'eth1'
|
||||
|
||||
@@ -338,8 +339,8 @@ def check_if_host_receive_ra(br: IdfDut) -> bool:
|
||||
omrprefix = get_omrprefix(br)
|
||||
command = 'ip -6 route | grep ' + str(interface_name)
|
||||
out_str = subprocess.getoutput(command)
|
||||
print('br omrprefix: ', str(omrprefix))
|
||||
print('host route table:\n', str(out_str))
|
||||
logging.info(f'br omrprefix: {omrprefix}')
|
||||
logging.info(f'host route table:\n {out_str}')
|
||||
return str(omrprefix) in str(out_str)
|
||||
|
||||
|
||||
@@ -404,7 +405,7 @@ def create_host_udp_server(myudp: udp_parameter) -> None:
|
||||
AF_INET = socket.AF_INET6
|
||||
else:
|
||||
AF_INET = socket.AF_INET
|
||||
print('The host start to create udp server!')
|
||||
logging.info('The host start to create udp server!')
|
||||
if_index = socket.if_nametoindex(interface_name)
|
||||
sock = socket.socket(AF_INET, socket.SOCK_DGRAM)
|
||||
sock.bind((myudp.addr, myudp.port))
|
||||
@@ -417,13 +418,14 @@ def create_host_udp_server(myudp: udp_parameter) -> None:
|
||||
)
|
||||
sock.settimeout(myudp.timeout)
|
||||
myudp.init_flag = True
|
||||
print('The host start to receive message!')
|
||||
logging.info('The host start to receive message!')
|
||||
myudp.udp_bytes = (sock.recvfrom(1024))[0]
|
||||
print('The host has received message: ', myudp.udp_bytes)
|
||||
udp_str = str(myudp.udp_bytes)
|
||||
logging.info(f'The host has received message: {udp_str}')
|
||||
except OSError:
|
||||
print('The host did not receive message!')
|
||||
logging.error('The host did not receive message!')
|
||||
finally:
|
||||
print('Close the socket.')
|
||||
logging.info('Close the socket.')
|
||||
sock.close()
|
||||
|
||||
|
||||
@@ -438,10 +440,10 @@ def host_udp_send_message(udp_target: udp_parameter) -> None:
|
||||
sock.bind(('::', 12350))
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BINDTODEVICE, interface_name.encode())
|
||||
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, 32)
|
||||
print('Host is sending message')
|
||||
logging.info('Host is sending message')
|
||||
sock.sendto(udp_target.udp_bytes, (udp_target.addr, udp_target.port))
|
||||
except OSError:
|
||||
print('Host cannot send message')
|
||||
logging.error('Host cannot send message')
|
||||
finally:
|
||||
sock.close()
|
||||
|
||||
@@ -481,13 +483,13 @@ def host_close_service() -> None:
|
||||
command = 'ps auxww | grep avahi-publish-s'
|
||||
out_bytes = subprocess.check_output(command, shell=True, timeout=5)
|
||||
out_str = out_bytes.decode('utf-8')
|
||||
print('host close service avahi status:\n', out_str)
|
||||
logging.info(f'host close service avahi status:\n {out_str}')
|
||||
service_info = [line for line in out_str.splitlines() if 'testxxx _testxxx._udp' in line]
|
||||
for line in service_info:
|
||||
print('Process:', line)
|
||||
logging.info(f'Process:{line}')
|
||||
pid = line.split()[1]
|
||||
command = 'kill -9 ' + pid
|
||||
print('kill ', pid)
|
||||
logging.info(f'kill {pid}')
|
||||
subprocess.call(command, shell=True, timeout=5)
|
||||
time.sleep(1)
|
||||
|
||||
@@ -520,24 +522,24 @@ def open_host_interface() -> None:
|
||||
|
||||
def get_domain() -> str:
|
||||
hostname = socket.gethostname()
|
||||
print('hostname is: ', hostname)
|
||||
logging.info(f'hostname is: {hostname}')
|
||||
command = 'ps -auxww | grep avahi-daemon | grep running'
|
||||
out_str = subprocess.getoutput(command)
|
||||
print('avahi status:\n', out_str)
|
||||
logging.info(f'avahi status:\n {out_str}')
|
||||
role = re.findall(r'\[([\w\W]+)\.local\]', str(out_str))[0]
|
||||
print('active host is: ', role)
|
||||
logging.info(f'active host is: {role}')
|
||||
return str(role)
|
||||
|
||||
|
||||
def flush_ipv6_addr_by_interface() -> None:
|
||||
interface_name = get_host_interface_name()
|
||||
print(f'flush ipv6 addr : {interface_name}')
|
||||
logging.info(f'flush ipv6 addr : {interface_name}')
|
||||
command_show_addr = f'ip -6 addr show dev {interface_name}'
|
||||
command_show_route = f'ip -6 route show dev {interface_name}'
|
||||
addr_before = subprocess.getoutput(command_show_addr)
|
||||
route_before = subprocess.getoutput(command_show_route)
|
||||
print(f'Before flush, IPv6 addresses: \n{addr_before}')
|
||||
print(f'Before flush, IPv6 routes: \n{route_before}')
|
||||
logging.info(f'Before flush, IPv6 addresses: \n{addr_before}')
|
||||
logging.info(f'Before flush, IPv6 routes: \n{route_before}')
|
||||
subprocess.run(['ip', 'link', 'set', interface_name, 'down'])
|
||||
subprocess.run(['ip', '-6', 'addr', 'flush', 'dev', interface_name])
|
||||
subprocess.run(['ip', '-6', 'route', 'flush', 'dev', interface_name])
|
||||
@@ -545,8 +547,8 @@ def flush_ipv6_addr_by_interface() -> None:
|
||||
time.sleep(5)
|
||||
addr_after = subprocess.getoutput(command_show_addr)
|
||||
route_after = subprocess.getoutput(command_show_route)
|
||||
print(f'After flush, IPv6 addresses: \n{addr_after}')
|
||||
print(f'After flush, IPv6 routes: \n{route_after}')
|
||||
logging.info(f'After flush, IPv6 addresses: \n{addr_after}')
|
||||
logging.info(f'After flush, IPv6 routes: \n{route_after}')
|
||||
|
||||
|
||||
class tcp_parameter:
|
||||
@@ -575,28 +577,29 @@ def create_host_tcp_server(mytcp: tcp_parameter) -> None:
|
||||
AF_INET = socket.AF_INET6
|
||||
else:
|
||||
AF_INET = socket.AF_INET
|
||||
print('The host start to create a tcp server!')
|
||||
logging.info('The host start to create a tcp server!')
|
||||
sock = socket.socket(AF_INET, socket.SOCK_STREAM)
|
||||
sock.bind((mytcp.addr, mytcp.port))
|
||||
sock.listen(5)
|
||||
mytcp.listen_flag = True
|
||||
|
||||
print('The tcp server is waiting for connection!')
|
||||
logging.info('The tcp server is waiting for connection!')
|
||||
sock.settimeout(mytcp.timeout)
|
||||
connfd, addr = sock.accept()
|
||||
print('The tcp server connected with ', addr)
|
||||
logging.info(f'The tcp server connected with {addr}')
|
||||
mytcp.recv_flag = True
|
||||
|
||||
mytcp.tcp_bytes = connfd.recv(1024)
|
||||
print('The tcp server has received message: ', mytcp.tcp_bytes)
|
||||
tcp_str = str(mytcp.tcp_bytes)
|
||||
logging.info(f'The tcp server has received message: {tcp_str}')
|
||||
|
||||
except OSError:
|
||||
if mytcp.recv_flag:
|
||||
print('The tcp server did not receive message!')
|
||||
logging.error('The tcp server did not receive message!')
|
||||
else:
|
||||
print('The tcp server fail to connect!')
|
||||
logging.error('The tcp server fail to connect!')
|
||||
finally:
|
||||
print('Close the socket.')
|
||||
logging.info('Close the socket.')
|
||||
sock.close()
|
||||
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
# SPDX-License-Identifier: Unlicense OR CC0-1.0
|
||||
# !/usr/bin/env python3
|
||||
import copy
|
||||
import logging
|
||||
import os.path
|
||||
import random
|
||||
import re
|
||||
@@ -9,7 +10,6 @@ import secrets
|
||||
import subprocess
|
||||
import threading
|
||||
import time
|
||||
from typing import Tuple
|
||||
|
||||
import ot_ci_function as ocf
|
||||
import pexpect
|
||||
@@ -76,7 +76,7 @@ from pytest_embedded_idf.dut import IdfDut
|
||||
|
||||
@pytest.fixture(scope='module', name='Init_avahi')
|
||||
def fixture_Init_avahi() -> bool:
|
||||
print('Init Avahi')
|
||||
logging.info('Init Avahi')
|
||||
ocf.start_avahi()
|
||||
time.sleep(10)
|
||||
return True
|
||||
@@ -84,7 +84,7 @@ def fixture_Init_avahi() -> bool:
|
||||
|
||||
@pytest.fixture(name='Init_interface')
|
||||
def fixture_Init_interface() -> bool:
|
||||
print('Init interface')
|
||||
logging.info('Init interface')
|
||||
ocf.flush_ipv6_addr_by_interface()
|
||||
# The sleep time is set based on experience; reducing it might cause the host to be unready.
|
||||
time.sleep(30)
|
||||
@@ -131,7 +131,7 @@ PORT_MAPPING = {'ESPPORT1': 'esp32h2', 'ESPPORT2': 'esp32s3', 'ESPPORT3': 'esp32
|
||||
],
|
||||
indirect=True,
|
||||
)
|
||||
def test_thread_connect(dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None:
|
||||
def test_thread_connect(dut: tuple[IdfDut, IdfDut, IdfDut]) -> None:
|
||||
br = dut[2]
|
||||
cli_h2 = dut[1]
|
||||
dut[0].serial.stop_redirect_thread()
|
||||
@@ -204,7 +204,7 @@ def formBasicWiFiThreadNetwork(br: IdfDut, cli: IdfDut) -> None:
|
||||
],
|
||||
indirect=True,
|
||||
)
|
||||
def test_Bidirectional_IPv6_connectivity(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None:
|
||||
def test_Bidirectional_IPv6_connectivity(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None:
|
||||
br = dut[2]
|
||||
cli = dut[1]
|
||||
assert Init_interface
|
||||
@@ -214,10 +214,10 @@ def test_Bidirectional_IPv6_connectivity(Init_interface: bool, dut: Tuple[IdfDut
|
||||
try:
|
||||
assert ocf.is_joined_wifi_network(br)
|
||||
cli_global_unicast_addr = ocf.get_global_unicast_addr(cli, br)
|
||||
print('cli_global_unicast_addr', cli_global_unicast_addr)
|
||||
logging.info(f'cli_global_unicast_addr {cli_global_unicast_addr}')
|
||||
command = 'ping ' + str(cli_global_unicast_addr) + ' -c 10'
|
||||
out_str = subprocess.getoutput(command)
|
||||
print('ping result:\n', str(out_str))
|
||||
logging.info(f'ping result:\n{out_str}')
|
||||
role = re.findall(r' (\d+)%', str(out_str))[0]
|
||||
assert role != '100'
|
||||
interface_name = ocf.get_host_interface_name()
|
||||
@@ -225,7 +225,8 @@ def test_Bidirectional_IPv6_connectivity(Init_interface: bool, dut: Tuple[IdfDut
|
||||
out_bytes = subprocess.check_output(command, shell=True, timeout=5)
|
||||
out_str = out_bytes.decode('utf-8')
|
||||
onlinkprefix = ocf.get_onlinkprefix(br)
|
||||
host_global_unicast_addr = re.findall(r'\W+(%s(?:\w+:){3}\w+)\W+' % onlinkprefix, str(out_str))
|
||||
pattern = rf'\W+({onlinkprefix}(?:\w+:){{3}}\w+)\W+'
|
||||
host_global_unicast_addr = re.findall(pattern, out_str)
|
||||
rx_nums = 0
|
||||
for ip_addr in host_global_unicast_addr:
|
||||
txrx_nums = ocf.ot_ping(cli, str(ip_addr), count=5)
|
||||
@@ -256,7 +257,7 @@ def test_Bidirectional_IPv6_connectivity(Init_interface: bool, dut: Tuple[IdfDut
|
||||
],
|
||||
indirect=True,
|
||||
)
|
||||
def test_multicast_forwarding_A(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None:
|
||||
def test_multicast_forwarding_A(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None:
|
||||
br = dut[2]
|
||||
cli = dut[1]
|
||||
assert Init_interface
|
||||
@@ -271,7 +272,7 @@ def test_multicast_forwarding_A(Init_interface: bool, dut: Tuple[IdfDut, IdfDut,
|
||||
interface_name = ocf.get_host_interface_name()
|
||||
command = 'ping -I ' + str(interface_name) + ' -t 64 ff04::125 -c 10'
|
||||
out_str = subprocess.getoutput(command)
|
||||
print('ping result:\n', str(out_str))
|
||||
logging.info(f'ping result:\n{out_str}')
|
||||
role = re.findall(r' (\d+)%', str(out_str))[0]
|
||||
assert role != '100'
|
||||
ocf.execute_command(cli, 'udp open')
|
||||
@@ -309,7 +310,7 @@ def test_multicast_forwarding_A(Init_interface: bool, dut: Tuple[IdfDut, IdfDut,
|
||||
],
|
||||
indirect=True,
|
||||
)
|
||||
def test_multicast_forwarding_B(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None:
|
||||
def test_multicast_forwarding_B(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None:
|
||||
br = dut[2]
|
||||
cli = dut[1]
|
||||
assert Init_interface
|
||||
@@ -364,7 +365,7 @@ def test_multicast_forwarding_B(Init_interface: bool, dut: Tuple[IdfDut, IdfDut,
|
||||
indirect=True,
|
||||
)
|
||||
def test_service_discovery_of_Thread_device(
|
||||
Init_interface: bool, Init_avahi: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]
|
||||
Init_interface: bool, Init_avahi: bool, dut: tuple[IdfDut, IdfDut, IdfDut]
|
||||
) -> None:
|
||||
br = dut[2]
|
||||
cli = dut[1]
|
||||
@@ -377,14 +378,14 @@ def test_service_discovery_of_Thread_device(
|
||||
assert ocf.is_joined_wifi_network(br)
|
||||
command = 'avahi-browse -rt _testyyy._udp'
|
||||
out_str = subprocess.getoutput(command)
|
||||
print('avahi-browse:\n', str(out_str))
|
||||
logging.info(f'avahi-browse:\n{out_str}')
|
||||
assert 'myTest' not in str(out_str)
|
||||
hostname = 'myTest'
|
||||
command = 'srp client host name ' + hostname
|
||||
ocf.execute_command(cli, command)
|
||||
cli.expect('Done', timeout=5)
|
||||
cli_global_unicast_addr = ocf.get_global_unicast_addr(cli, br)
|
||||
print('cli_global_unicast_addr', cli_global_unicast_addr)
|
||||
logging.info(f'cli_global_unicast_addr {cli_global_unicast_addr}')
|
||||
command = 'srp client host address ' + str(cli_global_unicast_addr)
|
||||
ocf.execute_command(cli, command)
|
||||
cli.expect('Done', timeout=5)
|
||||
@@ -397,7 +398,7 @@ def test_service_discovery_of_Thread_device(
|
||||
ocf.wait(cli, 3)
|
||||
command = 'avahi-browse -rt _testyyy._udp'
|
||||
out_str = subprocess.getoutput(command)
|
||||
print('avahi-browse:\n', str(out_str))
|
||||
logging.info(f'avahi-browse:\n {out_str}')
|
||||
assert 'myTest' in str(out_str)
|
||||
finally:
|
||||
ocf.execute_command(br, 'factoryreset')
|
||||
@@ -425,7 +426,7 @@ def test_service_discovery_of_Thread_device(
|
||||
indirect=True,
|
||||
)
|
||||
def test_service_discovery_of_WiFi_device(
|
||||
Init_interface: bool, Init_avahi: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]
|
||||
Init_interface: bool, Init_avahi: bool, dut: tuple[IdfDut, IdfDut, IdfDut]
|
||||
) -> None:
|
||||
br = dut[2]
|
||||
cli = dut[1]
|
||||
@@ -442,7 +443,7 @@ def test_service_discovery_of_WiFi_device(
|
||||
cli.expect('Done', timeout=5)
|
||||
ocf.wait(cli, 1)
|
||||
domain_name = ocf.get_domain()
|
||||
print('domain name is: ', domain_name)
|
||||
logging.info(f'domain name is: {domain_name}')
|
||||
command = 'dns resolve ' + domain_name + '.default.service.arpa.'
|
||||
|
||||
ocf.execute_command(cli, command)
|
||||
@@ -494,7 +495,7 @@ def test_service_discovery_of_WiFi_device(
|
||||
],
|
||||
indirect=True,
|
||||
)
|
||||
def test_ICMP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None:
|
||||
def test_ICMP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None:
|
||||
br = dut[2]
|
||||
cli = dut[1]
|
||||
assert Init_interface
|
||||
@@ -504,7 +505,7 @@ def test_ICMP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) ->
|
||||
try:
|
||||
assert ocf.is_joined_wifi_network(br)
|
||||
host_ipv4_address = ocf.get_host_ipv4_address()
|
||||
print('host_ipv4_address: ', host_ipv4_address)
|
||||
logging.info(f'host_ipv4_address: {host_ipv4_address}')
|
||||
rx_nums = ocf.ot_ping(cli, str(host_ipv4_address), count=5)[1]
|
||||
assert rx_nums != 0
|
||||
finally:
|
||||
@@ -532,7 +533,7 @@ def test_ICMP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) ->
|
||||
],
|
||||
indirect=True,
|
||||
)
|
||||
def test_UDP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None:
|
||||
def test_UDP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None:
|
||||
br = dut[2]
|
||||
cli = dut[1]
|
||||
assert Init_interface
|
||||
@@ -547,7 +548,7 @@ def test_UDP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) ->
|
||||
cli.expect('Done', timeout=5)
|
||||
ocf.wait(cli, 3)
|
||||
host_ipv4_address = ocf.get_host_ipv4_address()
|
||||
print('host_ipv4_address: ', host_ipv4_address)
|
||||
logging.info(f'host_ipv4_address: {host_ipv4_address}')
|
||||
myudp = ocf.udp_parameter('INET4', host_ipv4_address, 5090, '', False, 15.0, b'')
|
||||
udp_mission = threading.Thread(target=ocf.create_host_udp_server, args=(myudp,))
|
||||
udp_mission.start()
|
||||
@@ -588,7 +589,7 @@ def test_UDP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) ->
|
||||
],
|
||||
indirect=True,
|
||||
)
|
||||
def test_TCP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None:
|
||||
def test_TCP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None:
|
||||
br = dut[2]
|
||||
cli = dut[1]
|
||||
assert Init_interface
|
||||
@@ -604,7 +605,7 @@ def test_TCP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) ->
|
||||
ocf.wait(cli, 3)
|
||||
host_ipv4_address = ocf.get_host_ipv4_address()
|
||||
connect_address = ocf.get_ipv6_from_ipv4(host_ipv4_address, br)
|
||||
print('connect_address is: ', connect_address)
|
||||
logging.info(f'connect_address is: {connect_address}')
|
||||
mytcp = ocf.tcp_parameter('INET4', host_ipv4_address, 12345, False, False, 15.0, b'')
|
||||
tcp_mission = threading.Thread(target=ocf.create_host_tcp_server, args=(mytcp,))
|
||||
tcp_mission.start()
|
||||
@@ -657,7 +658,7 @@ def test_TCP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) ->
|
||||
],
|
||||
indirect=True,
|
||||
)
|
||||
def test_ot_sleepy_device(dut: Tuple[IdfDut, IdfDut]) -> None:
|
||||
def test_ot_sleepy_device(dut: tuple[IdfDut, IdfDut]) -> None:
|
||||
leader = dut[0]
|
||||
sleepy_device = dut[1]
|
||||
fail_info = re.compile(r'Core\W*?\d\W*?register dump')
|
||||
@@ -706,7 +707,7 @@ def test_ot_sleepy_device(dut: Tuple[IdfDut, IdfDut]) -> None:
|
||||
],
|
||||
indirect=True,
|
||||
)
|
||||
def test_basic_startup(dut: Tuple[IdfDut, IdfDut]) -> None:
|
||||
def test_basic_startup(dut: tuple[IdfDut, IdfDut]) -> None:
|
||||
br = dut[1]
|
||||
dut[0].serial.stop_redirect_thread()
|
||||
try:
|
||||
@@ -748,7 +749,7 @@ def test_basic_startup(dut: Tuple[IdfDut, IdfDut]) -> None:
|
||||
],
|
||||
indirect=True,
|
||||
)
|
||||
def test_NAT64_DNS(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None:
|
||||
def test_NAT64_DNS(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None:
|
||||
br = dut[2]
|
||||
cli = dut[1]
|
||||
assert Init_interface
|
||||
@@ -787,7 +788,7 @@ def test_NAT64_DNS(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) ->
|
||||
],
|
||||
indirect=True,
|
||||
)
|
||||
def test_br_meshcop(Init_interface: bool, Init_avahi: bool, dut: Tuple[IdfDut, IdfDut]) -> None:
|
||||
def test_br_meshcop(Init_interface: bool, Init_avahi: bool, dut: tuple[IdfDut, IdfDut]) -> None:
|
||||
br = dut[1]
|
||||
assert Init_interface
|
||||
assert Init_avahi
|
||||
@@ -813,9 +814,9 @@ def test_br_meshcop(Init_interface: bool, Init_avahi: bool, dut: Tuple[IdfDut, I
|
||||
except subprocess.CalledProcessError as e:
|
||||
output_bytes = e.stdout
|
||||
finally:
|
||||
print('out_bytes: ', output_bytes)
|
||||
logging.info(f'out_bytes: {output_bytes!r}')
|
||||
output_str = str(output_bytes)
|
||||
print('out_str: ', output_str)
|
||||
logging.info(f'out_str: {output_str}')
|
||||
|
||||
assert 'hostname = [esp-ot-br.local]' in str(output_str)
|
||||
assert ('address = [' + ipv4_address + ']') in str(output_str)
|
||||
@@ -849,7 +850,7 @@ def test_br_meshcop(Init_interface: bool, Init_avahi: bool, dut: Tuple[IdfDut, I
|
||||
],
|
||||
indirect=True,
|
||||
)
|
||||
def test_https_NAT64_DNS(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None:
|
||||
def test_https_NAT64_DNS(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None:
|
||||
br = dut[2]
|
||||
cli = dut[1]
|
||||
assert Init_interface
|
||||
@@ -887,7 +888,7 @@ def test_https_NAT64_DNS(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut
|
||||
],
|
||||
indirect=True,
|
||||
)
|
||||
def test_trel_connect(dut: Tuple[IdfDut, IdfDut]) -> None:
|
||||
def test_trel_connect(dut: tuple[IdfDut, IdfDut]) -> None:
|
||||
trel_s3 = dut[1]
|
||||
trel_c6 = dut[0]
|
||||
trel_list = [trel_c6]
|
||||
@@ -940,7 +941,7 @@ def test_trel_connect(dut: Tuple[IdfDut, IdfDut]) -> None:
|
||||
],
|
||||
indirect=True,
|
||||
)
|
||||
def test_br_lib_check(dut: Tuple[IdfDut, IdfDut]) -> None:
|
||||
def test_br_lib_check(dut: tuple[IdfDut, IdfDut]) -> None:
|
||||
br = dut[1]
|
||||
dut[0].serial.stop_redirect_thread()
|
||||
try:
|
||||
@@ -978,7 +979,7 @@ def test_br_lib_check(dut: Tuple[IdfDut, IdfDut]) -> None:
|
||||
],
|
||||
indirect=True,
|
||||
)
|
||||
def test_ot_ssed_device(dut: Tuple[IdfDut, IdfDut]) -> None:
|
||||
def test_ot_ssed_device(dut: tuple[IdfDut, IdfDut]) -> None:
|
||||
leader = dut[0]
|
||||
ssed_device = dut[1]
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user