mirror of
https://github.com/espressif/esp-idf.git
synced 2025-08-09 20:41:14 +00:00
test: format all test scripts
This commit is contained in:
@@ -1,22 +1,27 @@
|
||||
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-FileCopyrightText: 2021-2025 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: CC0-1.0
|
||||
|
||||
import ipaddress
|
||||
import subprocess
|
||||
import threading
|
||||
import time
|
||||
from typing import Dict, Union
|
||||
from typing import Dict
|
||||
from typing import Union
|
||||
|
||||
import pytest
|
||||
from pytest_embedded import Dut
|
||||
from pytest_embedded_idf.utils import idf_parametrize
|
||||
from scapy import layers
|
||||
from scapy.all import ICMP, IP, TCP, UDP, AsyncSniffer
|
||||
from scapy.all import AsyncSniffer
|
||||
from scapy.all import ICMP
|
||||
from scapy.all import IP
|
||||
from scapy.all import TCP
|
||||
from scapy.all import UDP
|
||||
|
||||
udp_port = 1234
|
||||
tcp_port = 4321
|
||||
|
||||
|
||||
def run_cmd(command: str, secure: bool=False) -> str:
|
||||
def run_cmd(command: str, secure: bool = False) -> str:
|
||||
if secure is False:
|
||||
print(f'Running: {command}')
|
||||
|
||||
@@ -73,12 +78,16 @@ def setup_network(config: dict) -> None:
|
||||
|
||||
def create_config(dut: Dut) -> dict:
|
||||
pc_iface = dut.app.sdkconfig.get('EXAMPLE_VLAN_PYTEST_PC_IFACE')
|
||||
vlanClient_conf = {'id': str(dut.app.sdkconfig.get('EXAMPLE_ETHERNET_VLAN_ID')),
|
||||
'name': 'vlanClient',
|
||||
'ip': dut.app.sdkconfig.get('EXAMPLE_VLAN_STATIC_ADDR_DEF_GW')}
|
||||
vlanServer_conf = {'id': str(dut.app.sdkconfig.get('EXAMPLE_EXTRA_ETHERNET_VLAN_ID')),
|
||||
'name': 'vlanServer',
|
||||
'ip': dut.app.sdkconfig.get('EXAMPLE_EXTRA_VLAN_STATIC_ADDR_DEF_GW')}
|
||||
vlanClient_conf = {
|
||||
'id': str(dut.app.sdkconfig.get('EXAMPLE_ETHERNET_VLAN_ID')),
|
||||
'name': 'vlanClient',
|
||||
'ip': dut.app.sdkconfig.get('EXAMPLE_VLAN_STATIC_ADDR_DEF_GW'),
|
||||
}
|
||||
vlanServer_conf = {
|
||||
'id': str(dut.app.sdkconfig.get('EXAMPLE_EXTRA_ETHERNET_VLAN_ID')),
|
||||
'name': 'vlanServer',
|
||||
'ip': dut.app.sdkconfig.get('EXAMPLE_EXTRA_VLAN_STATIC_ADDR_DEF_GW'),
|
||||
}
|
||||
esp_vlanClient_ip = dut.app.sdkconfig.get('EXAMPLE_VLAN_STATIC_IPV4_ADDR')
|
||||
esp_vlanServer_ip = dut.app.sdkconfig.get('EXAMPLE_EXTRA_VLAN_STATIC_IPV4_ADDR')
|
||||
|
||||
@@ -89,31 +98,33 @@ def create_config(dut: Dut) -> dict:
|
||||
config: Dict[str, Union[str, dict, dict, str, str, list, list, list, list]] = {
|
||||
# Basic Configurations
|
||||
'pc_iface': pc_iface,
|
||||
|
||||
'vlanClient': vlanClient_conf,
|
||||
'vlanServer': vlanServer_conf,
|
||||
|
||||
'esp_vlanClient_ip': esp_vlanClient_ip,
|
||||
'esp_vlanServer_ip': esp_vlanServer_ip,
|
||||
|
||||
'vlan_create_cmd_l': [f'ip netns add ns_vlanClient',
|
||||
f"ip link add link {pc_iface} name {vlanClient_conf['name']} type vlan id {vlanClient_conf['id']}",
|
||||
f"ip link set {vlanClient_conf['name']} netns ns_vlanClient",
|
||||
f"ip netns exec ns_vlanClient ip addr add {vlanClient_conf['ip']}/255.255.255.0 dev {vlanClient_conf['name']}",
|
||||
f"ip netns exec ns_vlanClient ip link set dev {vlanClient_conf['name']} up",
|
||||
f"ip link add link {pc_iface} name {vlanServer_conf['name']} type vlan id {vlanServer_conf['id']}",
|
||||
f"ip addr add {vlanServer_conf['ip']}/255.255.255.0 dev {vlanServer_conf['name']}",
|
||||
f"ip link set dev {vlanServer_conf['name']} up"],
|
||||
|
||||
'vlan_destroy_cmd_l': [f"ip netns exec ns_vlanClient ip link set dev {vlanClient_conf['name']} down",
|
||||
f"ip netns exec ns_vlanClient ip link delete {vlanClient_conf['name']}",
|
||||
f"ip link set dev {vlanServer_conf['name']} down",
|
||||
f"ip link delete {vlanServer_conf['name']}",
|
||||
f'ip netns delete ns_vlanClient'],
|
||||
|
||||
'set_route_cmd_l': [f'ip netns exec ns_vlanClient ip route add {vlanServer_net_addr}/24 via {esp_vlanClient_ip}'],
|
||||
|
||||
'delete_route_cmd_l': [f'ip netns exec ns_vlanClient ip route delete {vlanServer_net_addr}/24 via {esp_vlanClient_ip}'],
|
||||
'vlan_create_cmd_l': [
|
||||
f'ip netns add ns_vlanClient',
|
||||
f'ip link add link {pc_iface} name {vlanClient_conf["name"]} type vlan id {vlanClient_conf["id"]}',
|
||||
f'ip link set {vlanClient_conf["name"]} netns ns_vlanClient',
|
||||
f'ip netns exec ns_vlanClient ip addr add {vlanClient_conf["ip"]}/255.255.255.0 dev {vlanClient_conf["name"]}',
|
||||
f'ip netns exec ns_vlanClient ip link set dev {vlanClient_conf["name"]} up',
|
||||
f'ip link add link {pc_iface} name {vlanServer_conf["name"]} type vlan id {vlanServer_conf["id"]}',
|
||||
f'ip addr add {vlanServer_conf["ip"]}/255.255.255.0 dev {vlanServer_conf["name"]}',
|
||||
f'ip link set dev {vlanServer_conf["name"]} up',
|
||||
],
|
||||
'vlan_destroy_cmd_l': [
|
||||
f'ip netns exec ns_vlanClient ip link set dev {vlanClient_conf["name"]} down',
|
||||
f'ip netns exec ns_vlanClient ip link delete {vlanClient_conf["name"]}',
|
||||
f'ip link set dev {vlanServer_conf["name"]} down',
|
||||
f'ip link delete {vlanServer_conf["name"]}',
|
||||
f'ip netns delete ns_vlanClient',
|
||||
],
|
||||
'set_route_cmd_l': [
|
||||
f'ip netns exec ns_vlanClient ip route add {vlanServer_net_addr}/24 via {esp_vlanClient_ip}'
|
||||
],
|
||||
'delete_route_cmd_l': [
|
||||
f'ip netns exec ns_vlanClient ip route delete {vlanServer_net_addr}/24 via {esp_vlanClient_ip}'
|
||||
],
|
||||
}
|
||||
|
||||
return config
|
||||
@@ -121,7 +132,6 @@ def create_config(dut: Dut) -> dict:
|
||||
|
||||
# Ping Test
|
||||
def ping_test(config: dict) -> None:
|
||||
|
||||
setup_network(config)
|
||||
|
||||
capture = AsyncSniffer(iface=config['vlanServer']['name'], filter='icmp', count=10)
|
||||
@@ -131,7 +141,11 @@ def ping_test(config: dict) -> None:
|
||||
|
||||
time.sleep(1)
|
||||
# Run network test commands here
|
||||
print(run_cmd_sec(f"ip netns exec ns_vlanClient ping -I {config['vlanClient']['ip']} {config['vlanServer']['ip']} -c 10"))
|
||||
print(
|
||||
run_cmd_sec(
|
||||
f'ip netns exec ns_vlanClient ping -I {config["vlanClient"]["ip"]} {config["vlanServer"]["ip"]} -c 10'
|
||||
)
|
||||
)
|
||||
|
||||
# Stop sniffing
|
||||
capture.join(timeout=20)
|
||||
@@ -147,12 +161,20 @@ def ping_test(config: dict) -> None:
|
||||
print('Failure: No packets captured')
|
||||
assert False
|
||||
|
||||
print(f"Captured: {len(vlanServer_pkt_list)} packets on interface {config['vlanServer']['name']}")
|
||||
print(f'Captured: {len(vlanServer_pkt_list)} packets on interface {config["vlanServer"]["name"]}')
|
||||
for pkt in vlanServer_pkt_list:
|
||||
print('Summary: ', pkt.summary())
|
||||
if pkt[ICMP].type == 8 and pkt[IP].src == config['esp_vlanServer_ip'] and pkt[IP].dst == config['vlanServer']['ip']:
|
||||
if (
|
||||
pkt[ICMP].type == 8
|
||||
and pkt[IP].src == config['esp_vlanServer_ip']
|
||||
and pkt[IP].dst == config['vlanServer']['ip']
|
||||
):
|
||||
vlanServer_forward_flag = True
|
||||
if pkt[ICMP].type == 0 and pkt[IP].src == config['vlanServer']['ip'] and pkt[IP].dst == config['esp_vlanServer_ip']:
|
||||
if (
|
||||
pkt[ICMP].type == 0
|
||||
and pkt[IP].src == config['vlanServer']['ip']
|
||||
and pkt[IP].dst == config['esp_vlanServer_ip']
|
||||
):
|
||||
vlanServer_return_flag = True
|
||||
|
||||
assert vlanServer_forward_flag and vlanServer_return_flag
|
||||
@@ -170,7 +192,13 @@ def udp_client(serverip: str, port: int) -> None:
|
||||
|
||||
|
||||
def udp_server_client_comm(config: dict, port: int) -> None:
|
||||
server_thread = threading.Thread(target=udp_server, args=(config['vlanServer']['ip'], port,))
|
||||
server_thread = threading.Thread(
|
||||
target=udp_server,
|
||||
args=(
|
||||
config['vlanServer']['ip'],
|
||||
port,
|
||||
),
|
||||
)
|
||||
client_thread = threading.Thread(target=udp_client, args=(config['vlanServer']['ip'], port))
|
||||
|
||||
server_thread.start()
|
||||
@@ -186,13 +214,9 @@ def udp_lfilter(packet: layers.l2.Ether) -> layers.l2.Ether:
|
||||
|
||||
|
||||
def udp_test(config: dict) -> None:
|
||||
|
||||
setup_network(config)
|
||||
|
||||
capture = AsyncSniffer(iface=config['vlanServer']['name'],
|
||||
filter='udp',
|
||||
lfilter=udp_lfilter,
|
||||
count=10)
|
||||
capture = AsyncSniffer(iface=config['vlanServer']['name'], filter='udp', lfilter=udp_lfilter, count=10)
|
||||
|
||||
# Start sniffing
|
||||
capture.start()
|
||||
@@ -216,13 +240,21 @@ def udp_test(config: dict) -> None:
|
||||
print('Failure: No packets captured')
|
||||
assert False
|
||||
|
||||
print(f"Captured: {len(vlanServer_pkt_list)} packets on interface {config['vlanServer']['name']}")
|
||||
print(f'Captured: {len(vlanServer_pkt_list)} packets on interface {config["vlanServer"]["name"]}')
|
||||
for pkt in vlanServer_pkt_list:
|
||||
print('Summary: ', pkt.summary())
|
||||
if UDP in pkt:
|
||||
if pkt[UDP].dport == udp_port and pkt[IP].src == config['esp_vlanServer_ip'] and pkt[IP].dst == config['vlanServer']['ip']:
|
||||
if (
|
||||
pkt[UDP].dport == udp_port
|
||||
and pkt[IP].src == config['esp_vlanServer_ip']
|
||||
and pkt[IP].dst == config['vlanServer']['ip']
|
||||
):
|
||||
vlanServer_forward_flag = True
|
||||
if pkt[UDP].sport == udp_port and pkt[IP].src == config['vlanServer']['ip'] and pkt[IP].dst == config['esp_vlanServer_ip']:
|
||||
if (
|
||||
pkt[UDP].sport == udp_port
|
||||
and pkt[IP].src == config['vlanServer']['ip']
|
||||
and pkt[IP].dst == config['esp_vlanServer_ip']
|
||||
):
|
||||
vlanServer_return_flag = True
|
||||
|
||||
assert vlanServer_forward_flag and vlanServer_return_flag
|
||||
@@ -240,7 +272,13 @@ def tcp_client(serverip: str, port: int) -> None:
|
||||
|
||||
|
||||
def tcp_server_client_comm(config: dict, port: int) -> None:
|
||||
server_thread = threading.Thread(target=tcp_server, args=(config['vlanServer']['ip'], port,))
|
||||
server_thread = threading.Thread(
|
||||
target=tcp_server,
|
||||
args=(
|
||||
config['vlanServer']['ip'],
|
||||
port,
|
||||
),
|
||||
)
|
||||
client_thread = threading.Thread(target=tcp_client, args=(config['vlanServer']['ip'], port))
|
||||
|
||||
server_thread.start()
|
||||
@@ -256,13 +294,9 @@ def tcp_lfilter(packet: layers.l2.Ether) -> layers.l2.Ether:
|
||||
|
||||
|
||||
def tcp_test(config: dict) -> None:
|
||||
|
||||
setup_network(config)
|
||||
|
||||
capture = AsyncSniffer(iface=config['vlanServer']['name'],
|
||||
filter='tcp',
|
||||
lfilter=tcp_lfilter,
|
||||
count=10)
|
||||
capture = AsyncSniffer(iface=config['vlanServer']['name'], filter='tcp', lfilter=tcp_lfilter, count=10)
|
||||
|
||||
# Start sniffing
|
||||
capture.start()
|
||||
@@ -286,36 +320,44 @@ def tcp_test(config: dict) -> None:
|
||||
print('Failure: No packets captured')
|
||||
assert False
|
||||
|
||||
print(f"Captured: {len(vlanServer_pkt_list)} packets on interface {config['vlanServer']['name']}")
|
||||
print(f'Captured: {len(vlanServer_pkt_list)} packets on interface {config["vlanServer"]["name"]}')
|
||||
for pkt in vlanServer_pkt_list:
|
||||
print('Summary: ', pkt.summary())
|
||||
if TCP in pkt:
|
||||
if pkt[TCP].dport == tcp_port and pkt[IP].src == config['esp_vlanServer_ip'] and pkt[IP].dst == config['vlanServer']['ip']:
|
||||
if (
|
||||
pkt[TCP].dport == tcp_port
|
||||
and pkt[IP].src == config['esp_vlanServer_ip']
|
||||
and pkt[IP].dst == config['vlanServer']['ip']
|
||||
):
|
||||
vlanServer_forward_flag = True
|
||||
if pkt[TCP].sport == tcp_port and pkt[IP].src == config['vlanServer']['ip'] and pkt[IP].dst == config['esp_vlanServer_ip']:
|
||||
if (
|
||||
pkt[TCP].sport == tcp_port
|
||||
and pkt[IP].src == config['vlanServer']['ip']
|
||||
and pkt[IP].dst == config['esp_vlanServer_ip']
|
||||
):
|
||||
vlanServer_return_flag = True
|
||||
|
||||
assert vlanServer_forward_flag and vlanServer_return_flag
|
||||
|
||||
|
||||
@pytest.mark.esp32
|
||||
@pytest.mark.ethernet_vlan
|
||||
@idf_parametrize('target', ['esp32'], indirect=['target'])
|
||||
def test_vlan_napt_pingtest(dut: Dut) -> None:
|
||||
dut.expect('main_task: Returned from app_main()')
|
||||
test_conf = create_config(dut)
|
||||
ping_test(test_conf)
|
||||
|
||||
|
||||
@pytest.mark.esp32
|
||||
@pytest.mark.ethernet_vlan
|
||||
@idf_parametrize('target', ['esp32'], indirect=['target'])
|
||||
def test_vlan_napt_udptest(dut: Dut) -> None:
|
||||
dut.expect('main_task: Returned from app_main()')
|
||||
test_conf = create_config(dut)
|
||||
udp_test(test_conf)
|
||||
|
||||
|
||||
@pytest.mark.esp32
|
||||
@pytest.mark.ethernet_vlan
|
||||
@idf_parametrize('target', ['esp32'], indirect=['target'])
|
||||
def test_vlan_napt_tcptest(dut: Dut) -> None:
|
||||
dut.expect('main_task: Returned from app_main()')
|
||||
test_conf = create_config(dut)
|
||||
|
Reference in New Issue
Block a user