Files
esp-idf/examples/network/vlan_support/pytest_vlan_napt.py
2025-03-05 12:08:48 +08:00

365 lines
11 KiB
Python

# SPDX-FileCopyrightText: 2021-2025 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: CC0-1.0
import ipaddress
import subprocess
import threading
import time
from typing import Dict
from typing import Union
import pytest
from pytest_embedded import Dut
from pytest_embedded_idf.utils import idf_parametrize
from scapy import layers
from scapy.all import AsyncSniffer
from scapy.all import ICMP
from scapy.all import IP
from scapy.all import TCP
from scapy.all import UDP
# Port handed to iperf3 in the UDP test; udp_lfilter() matches captured
# packets against the same value.
udp_port: int = 1234
# Port handed to iperf3 in the TCP test; tcp_lfilter() matches captured
# packets against the same value.
tcp_port: int = 4321
def run_cmd(command: str, secure: bool = False) -> str:
    """Run ``command`` as a child process and return its standard output.

    The command string is split on whitespace and executed directly, without
    a shell, so quoting, pipes and globbing are not interpreted.

    Args:
        command: Program name and arguments separated by whitespace.
        secure: When true, the sudo password is written to the child's stdin
            and this function prints neither the command nor its output.

    Returns:
        The child's stdout decoded as UTF-8. The exit status is not checked.

    Raises:
        ValueError: If ``command`` contains no program name.
    """
    if not secure:
        print(f'Running: {command}')
    # split() without an argument collapses runs of whitespace, so a doubled
    # space in ``command`` does not become an empty argv entry.
    cmd = command.split()
    if not cmd:
        raise ValueError('run_cmd() needs a non-empty command')
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # NOTE(review): the sudo password is hard-coded in the test source;
    # consider supplying it through the CI environment instead.
    password = b'Esp@32' if secure else None
    # A single communicate() call feeds stdin, drains both pipes and waits for
    # the process to exit; calling it a second time is not needed.
    stdout, stderr = proc.communicate(input=password)
    out_text = stdout.decode('utf-8')
    if not secure:
        err_text = stderr.decode('utf-8')
        if out_text:
            print('Output: ', out_text)
        if err_text:
            print('Error: ', err_text)
    return out_text
def run_cmd_sec(command: str) -> str:
    """Run ``command`` under ``sudo`` and return its standard output.

    The command is prefixed with ``sudo -S -k`` and passed to run_cmd() in
    secure mode, which supplies the password on stdin.
    """
    print(f'Running secured: {command}')
    sudo_command = f'sudo -S -k {command}'
    return run_cmd(sudo_command, secure=True)
def clear_network(config: dict) -> None:
    """Tear down the host-side test network described by ``config``.

    Runs the route-deletion commands first and the VLAN-interface destruction
    commands afterwards, each through run_cmd_sec().
    """
    # Routes have to go before the interfaces they point through.
    for cmd_list_key in ('delete_route_cmd_l', 'vlan_destroy_cmd_l'):
        for command in config[cmd_list_key]:
            run_cmd_sec(command)
def setup_network(config: dict) -> None:
    """Create the host-side test network described by ``config``.

    Any previous setup is removed first via clear_network(); then the VLAN
    interfaces are created and finally the routes are installed.
    """
    # Start from a clean slate in case an earlier run left interfaces behind.
    clear_network(config)
    creation_steps = ('vlan_create_cmd_l', 'set_route_cmd_l')
    for step in creation_steps:
        for command in config[step]:
            run_cmd_sec(command)
def create_config(dut: Dut) -> dict:
    """Build the host network configuration from the DUT's sdkconfig.

    Args:
        dut: Device under test; only ``dut.app.sdkconfig`` is read.

    Returns:
        A dict holding the PC interface name, the client/server VLAN
        descriptions (``id``, ``name``, ``ip``), the ESP addresses on both
        VLANs, and four lists of ``ip`` commands used to create and destroy
        the VLAN interfaces and to set and delete the client-side route.
    """
    sdkconfig = dut.app.sdkconfig
    netmask = '255.255.255.0'
    # Prefix that executes an ``ip`` sub-command inside the client namespace.
    in_client_ns = 'ip netns exec ns_vlanClient'

    host_iface = sdkconfig.get('EXAMPLE_VLAN_PYTEST_PC_IFACE')
    client = {
        'id': str(sdkconfig.get('EXAMPLE_ETHERNET_VLAN_ID')),
        'name': 'vlanClient',
        'ip': sdkconfig.get('EXAMPLE_VLAN_STATIC_ADDR_DEF_GW'),
    }
    server = {
        'id': str(sdkconfig.get('EXAMPLE_EXTRA_ETHERNET_VLAN_ID')),
        'name': 'vlanServer',
        'ip': sdkconfig.get('EXAMPLE_EXTRA_VLAN_STATIC_ADDR_DEF_GW'),
    }
    esp_client_ip = sdkconfig.get('EXAMPLE_VLAN_STATIC_IPV4_ADDR')
    esp_server_ip = sdkconfig.get('EXAMPLE_EXTRA_VLAN_STATIC_IPV4_ADDR')

    # Network address of the server VLAN: its gateway address masked with the
    # fixed /24 netmask.
    mask_bits = int(ipaddress.IPv4Address(netmask))
    server_gw_bits = int(ipaddress.IPv4Address(server['ip']))
    server_net = ipaddress.IPv4Address(server_gw_bits & mask_bits)
    # The client namespace reaches the server subnet through the ESP.
    route_spec = f'{server_net}/24 via {esp_client_ip}'

    create_cmds = [
        'ip netns add ns_vlanClient',
        f'ip link add link {host_iface} name {client["name"]} type vlan id {client["id"]}',
        f'ip link set {client["name"]} netns ns_vlanClient',
        f'{in_client_ns} ip addr add {client["ip"]}/{netmask} dev {client["name"]}',
        f'{in_client_ns} ip link set dev {client["name"]} up',
        f'ip link add link {host_iface} name {server["name"]} type vlan id {server["id"]}',
        f'ip addr add {server["ip"]}/{netmask} dev {server["name"]}',
        f'ip link set dev {server["name"]} up',
    ]
    destroy_cmds = [
        f'{in_client_ns} ip link set dev {client["name"]} down',
        f'{in_client_ns} ip link delete {client["name"]}',
        f'ip link set dev {server["name"]} down',
        f'ip link delete {server["name"]}',
        'ip netns delete ns_vlanClient',
    ]

    config: Dict[str, Union[str, dict, list]] = {
        # Basic Configurations
        'pc_iface': host_iface,
        'vlanClient': client,
        'vlanServer': server,
        'esp_vlanClient_ip': esp_client_ip,
        'esp_vlanServer_ip': esp_server_ip,
        # Command lists consumed by setup_network() / clear_network()
        'vlan_create_cmd_l': create_cmds,
        'vlan_destroy_cmd_l': destroy_cmds,
        'set_route_cmd_l': [f'{in_client_ns} ip route add {route_spec}'],
        'delete_route_cmd_l': [f'{in_client_ns} ip route delete {route_spec}'],
    }
    return config
# Ping Test
def ping_test(config: dict) -> None:
    """Ping the server VLAN from the client namespace and check the capture.

    Sets up the host network, sniffs ICMP on the server VLAN interface while
    ``ping`` runs inside the ``ns_vlanClient`` namespace, tears the network
    down again and then asserts that both directions were seen:

    * an echo request (ICMP type 8) from the ESP's server-VLAN address to the
      host's server-VLAN address, and
    * an echo reply (ICMP type 0) going the opposite way.

    Args:
        config: Configuration dict as produced by create_config().

    Raises:
        AssertionError: If nothing was captured or a direction is missing.
    """
    setup_network(config)
    try:
        capture = AsyncSniffer(iface=config['vlanServer']['name'], filter='icmp', count=10)
        # Start sniffing
        capture.start()
        # Presumably gives the sniffer time to attach before traffic starts.
        time.sleep(1)
        # Run network test commands here
        print(
            run_cmd_sec(
                f'ip netns exec ns_vlanClient ping -I {config["vlanClient"]["ip"]} {config["vlanServer"]["ip"]} -c 10'
            )
        )
        # Wait for the sniffer to collect its packets (bounded by the timeout)
        capture.join(timeout=20)
        vlanServer_pkt_list = capture.results
    finally:
        # Always remove the namespace/VLAN interfaces, even when the sniffer
        # or the ping command raised.
        clear_network(config)

    # Test Validation
    # NOTE(review): results is expected to still be None when the sniffer did
    # not finish within the join timeout - confirm against scapy.
    assert vlanServer_pkt_list is not None, 'Failure: No packets captured'
    print(f'Captured: {len(vlanServer_pkt_list)} packets on interface {config["vlanServer"]["name"]}')

    host_server_ip = config['vlanServer']['ip']
    esp_server_ip = config['esp_vlanServer_ip']
    vlanServer_forward_flag = False
    vlanServer_return_flag = False
    for pkt in vlanServer_pkt_list:
        print('Summary: ', pkt.summary())
        # Skip anything without the layers inspected below instead of raising
        # on the layer lookup (same guard the UDP/TCP tests use).
        if ICMP not in pkt or IP not in pkt:
            continue
        icmp_type = pkt[ICMP].type
        src, dst = pkt[IP].src, pkt[IP].dst
        if icmp_type == 8 and src == esp_server_ip and dst == host_server_ip:
            vlanServer_forward_flag = True
        if icmp_type == 0 and src == host_server_ip and dst == esp_server_ip:
            vlanServer_return_flag = True
    assert vlanServer_forward_flag, 'No echo request from the ESP seen on the server VLAN'
    assert vlanServer_return_flag, 'No echo reply to the ESP seen on the server VLAN'
# UDP Test
def udp_server(serverip: str, port: int) -> None:
    """Run an iperf3 server on ``port``, wrapped in ``timeout 10s``.

    ``serverip`` only appears in the log line; it is not passed to iperf3.
    """
    print(f'UDP server listening on IP: {serverip} port: {port}')
    server_cmd = f'timeout 10s iperf3 -s -p {port}'
    run_cmd(server_cmd)
def udp_client(serverip: str, port: int) -> None:
    """Run a bidirectional iperf3 UDP client inside ``ns_vlanClient``.

    Waits one second first (presumably so the server thread is listening),
    then prints whatever the client command wrote to stdout.
    """
    time.sleep(1)
    client_cmd = f'timeout 10s ip netns exec ns_vlanClient iperf3 -c {serverip} -u -p {port} --bidir -k 20'
    client_output = run_cmd_sec(client_cmd)
    print(client_output)
def udp_server_client_comm(config: dict, port: int) -> None:
    """Run the iperf3 UDP server and client concurrently and wait for both.

    Both threads receive the host's server-VLAN address from ``config`` and
    the given ``port``.
    """
    server_ip = config['vlanServer']['ip']
    # Server first, client second: the same order is used for start and join.
    workers = [
        threading.Thread(target=udp_server, args=(server_ip, port)),
        threading.Thread(target=udp_client, args=(server_ip, port)),
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    print('UDP Test Done')
def udp_lfilter(packet: layers.l2.Ether) -> bool:
    """Return True when ``packet`` is UDP traffic to or from ``udp_port``.

    Used as the sniffer's ``lfilter`` so that only the iperf3 test traffic is
    kept in the capture.
    """
    if UDP not in packet:
        return False
    udp_layer = packet[UDP]
    return bool(udp_layer.dport == udp_port or udp_layer.sport == udp_port)
def udp_test(config: dict) -> None:
    """Run iperf3 over UDP across the VLANs and check the capture.

    Sets up the host network, sniffs UDP traffic on ``udp_port`` on the server
    VLAN interface while the iperf3 server/client pair runs, tears the network
    down again and then asserts that both directions were seen:

    * a datagram to ``udp_port`` from the ESP's server-VLAN address to the
      host's server-VLAN address, and
    * a datagram from ``udp_port`` going the opposite way.

    Args:
        config: Configuration dict as produced by create_config().

    Raises:
        AssertionError: If nothing was captured or a direction is missing.
    """
    setup_network(config)
    try:
        capture = AsyncSniffer(iface=config['vlanServer']['name'], filter='udp', lfilter=udp_lfilter, count=10)
        # Start sniffing
        capture.start()
        # Presumably gives the sniffer time to attach before traffic starts.
        time.sleep(1)
        # Run network test commands here
        udp_server_client_comm(config, udp_port)
        # Wait for the sniffer to collect its packets (bounded by the timeout)
        capture.join(timeout=20)
        vlanServer_pkt_list = capture.results
    finally:
        # Always remove the namespace/VLAN interfaces, even when the sniffer
        # or the iperf3 run raised.
        clear_network(config)

    # Test Validation
    # NOTE(review): results is expected to still be None when the sniffer did
    # not finish within the join timeout - confirm against scapy.
    assert vlanServer_pkt_list is not None, 'Failure: No packets captured'
    print(f'Captured: {len(vlanServer_pkt_list)} packets on interface {config["vlanServer"]["name"]}')

    host_server_ip = config['vlanServer']['ip']
    esp_server_ip = config['esp_vlanServer_ip']
    vlanServer_forward_flag = False
    vlanServer_return_flag = False
    for pkt in vlanServer_pkt_list:
        print('Summary: ', pkt.summary())
        # The BPF filter 'udp' is not limited to IPv4, so make sure the IP
        # layer exists before reading its addresses.
        if UDP not in pkt or IP not in pkt:
            continue
        src, dst = pkt[IP].src, pkt[IP].dst
        if pkt[UDP].dport == udp_port and src == esp_server_ip and dst == host_server_ip:
            vlanServer_forward_flag = True
        if pkt[UDP].sport == udp_port and src == host_server_ip and dst == esp_server_ip:
            vlanServer_return_flag = True
    assert vlanServer_forward_flag, 'No UDP traffic from the ESP to the server seen on the server VLAN'
    assert vlanServer_return_flag, 'No UDP traffic from the server to the ESP seen on the server VLAN'
# TCP Test
def tcp_server(serverip: str, port: int) -> None:
    """Run an iperf3 server on ``port``, wrapped in ``timeout 10s``.

    ``serverip`` only appears in the log line; it is not passed to iperf3.
    """
    print(f'TCP server listening on IP: {serverip} port: {port}')
    server_cmd = f'timeout 10s iperf3 -s -p {port}'
    run_cmd(server_cmd)
def tcp_client(serverip: str, port: int) -> None:
    """Run a bidirectional iperf3 TCP client inside ``ns_vlanClient``.

    Waits one second first (presumably so the server thread is listening),
    then prints whatever the client command wrote to stdout.
    """
    time.sleep(1)
    client_cmd = f'timeout 10s ip netns exec ns_vlanClient iperf3 -c {serverip} -p {port} --bidir -k 20'
    client_output = run_cmd_sec(client_cmd)
    print(client_output)
def tcp_server_client_comm(config: dict, port: int) -> None:
    """Run the iperf3 TCP server and client concurrently and wait for both.

    Both threads receive the host's server-VLAN address from ``config`` and
    the given ``port``.
    """
    server_ip = config['vlanServer']['ip']
    # Server first, client second: the same order is used for start and join.
    workers = [
        threading.Thread(target=tcp_server, args=(server_ip, port)),
        threading.Thread(target=tcp_client, args=(server_ip, port)),
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    print('TCP Test Done')
def tcp_lfilter(packet: layers.l2.Ether) -> bool:
    """Return True when ``packet`` is TCP traffic to or from ``tcp_port``.

    Used as the sniffer's ``lfilter`` so that only the iperf3 test traffic is
    kept in the capture.
    """
    if TCP not in packet:
        return False
    tcp_layer = packet[TCP]
    return bool(tcp_layer.dport == tcp_port or tcp_layer.sport == tcp_port)
def tcp_test(config: dict) -> None:
    """Run iperf3 over TCP across the VLANs and check the capture.

    Sets up the host network, sniffs TCP traffic on ``tcp_port`` on the server
    VLAN interface while the iperf3 server/client pair runs, tears the network
    down again and then asserts that both directions were seen:

    * a segment to ``tcp_port`` from the ESP's server-VLAN address to the
      host's server-VLAN address, and
    * a segment from ``tcp_port`` going the opposite way.

    Args:
        config: Configuration dict as produced by create_config().

    Raises:
        AssertionError: If nothing was captured or a direction is missing.
    """
    setup_network(config)
    try:
        capture = AsyncSniffer(iface=config['vlanServer']['name'], filter='tcp', lfilter=tcp_lfilter, count=10)
        # Start sniffing
        capture.start()
        # Presumably gives the sniffer time to attach before traffic starts.
        time.sleep(1)
        # Run network test commands here
        tcp_server_client_comm(config, tcp_port)
        # Wait for the sniffer to collect its packets (bounded by the timeout)
        capture.join(timeout=20)
        vlanServer_pkt_list = capture.results
    finally:
        # Always remove the namespace/VLAN interfaces, even when the sniffer
        # or the iperf3 run raised.
        clear_network(config)

    # Test Validation
    # NOTE(review): results is expected to still be None when the sniffer did
    # not finish within the join timeout - confirm against scapy.
    assert vlanServer_pkt_list is not None, 'Failure: No packets captured'
    print(f'Captured: {len(vlanServer_pkt_list)} packets on interface {config["vlanServer"]["name"]}')

    host_server_ip = config['vlanServer']['ip']
    esp_server_ip = config['esp_vlanServer_ip']
    vlanServer_forward_flag = False
    vlanServer_return_flag = False
    for pkt in vlanServer_pkt_list:
        print('Summary: ', pkt.summary())
        # The BPF filter 'tcp' is not limited to IPv4, so make sure the IP
        # layer exists before reading its addresses.
        if TCP not in pkt or IP not in pkt:
            continue
        src, dst = pkt[IP].src, pkt[IP].dst
        if pkt[TCP].dport == tcp_port and src == esp_server_ip and dst == host_server_ip:
            vlanServer_forward_flag = True
        if pkt[TCP].sport == tcp_port and src == host_server_ip and dst == esp_server_ip:
            vlanServer_return_flag = True
    assert vlanServer_forward_flag, 'No TCP traffic from the ESP to the server seen on the server VLAN'
    assert vlanServer_return_flag, 'No TCP traffic from the server to the ESP seen on the server VLAN'
@pytest.mark.ethernet_vlan
@idf_parametrize('target', ['esp32'], indirect=['target'])
def test_vlan_napt_pingtest(dut: Dut) -> None:
    """Run the ping check once the DUT reports that app_main() returned."""
    # Block until the example application has finished its startup.
    dut.expect('main_task: Returned from app_main()')
    ping_test(create_config(dut))
@pytest.mark.ethernet_vlan
@idf_parametrize('target', ['esp32'], indirect=['target'])
def test_vlan_napt_udptest(dut: Dut) -> None:
    """Run the UDP check once the DUT reports that app_main() returned."""
    # Block until the example application has finished its startup.
    dut.expect('main_task: Returned from app_main()')
    udp_test(create_config(dut))
@pytest.mark.ethernet_vlan
@idf_parametrize('target', ['esp32'], indirect=['target'])
def test_vlan_napt_tcptest(dut: Dut) -> None:
    """Run the TCP check once the DUT reports that app_main() returned."""
    # Block until the example application has finished its startup.
    dut.expect('main_task: Returned from app_main()')
    tcp_test(create_config(dut))