example: add socket scripts again

This commit is contained in:
Chen Yudong
2022-12-16 13:51:20 +08:00
parent ec9abc7b02
commit 51d2d20f75
9 changed files with 324 additions and 171 deletions

View File

@@ -2,46 +2,25 @@
# SPDX-License-Identifier: Apache-2.0
import logging
import os
import socket
import pytest
from common_test_methods import get_env_config_variable, get_my_interface_by_dest_ip
from pytest_embedded import Dut
try:
from run_udp_client import udp_client
except ImportError:
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'scripts')))
from run_udp_client import udp_client
PORT = 3333
MESSAGE = 'Data to ESP'
MAX_RETRIES = 3
def udp_client(address: str, payload: str) -> str:
for res in socket.getaddrinfo(address, PORT, socket.AF_UNSPEC,
socket.SOCK_DGRAM, 0, socket.AI_PASSIVE):
family_addr, socktype, proto, canonname, addr = res
try:
sock = socket.socket(family_addr, socket.SOCK_DGRAM)
sock.settimeout(20.0)
except socket.error as msg:
print('Could not create socket')
print(os.strerror(msg.errno))
raise
try:
sock.sendto(payload.encode(), addr)
reply, addr = sock.recvfrom(128)
if not reply:
return ''
print('Reply[' + addr[0] + ':' + str(addr[1]) + '] - ' + str(reply))
except socket.timeout:
print('Socket operation timeout')
return ''
except socket.error as msg:
print('Error while sending or receiving data from the socket')
print(os.strerror(msg.errno))
sock.close()
raise
return reply.decode()
@pytest.mark.esp32
@pytest.mark.wifi_router
def test_examples_udp_server_ipv4(dut: Dut) -> None:
@@ -59,7 +38,7 @@ def test_examples_udp_server_ipv4(dut: Dut) -> None:
# test IPv4
for _ in range(MAX_RETRIES):
print('Testing UDP on IPv4...')
received = udp_client(ipv4, MESSAGE)
received = udp_client(ipv4, PORT, MESSAGE)
if received == MESSAGE:
print('OK')
break
@@ -89,7 +68,7 @@ def test_examples_udp_server_ipv6(dut: Dut) -> None:
# test IPv6
for _ in range(MAX_RETRIES):
print('Testing UDP on IPv6...')
received = udp_client('{}%{}'.format(ipv6, interface), MESSAGE)
received = udp_client('{}%{}'.format(ipv6, interface), PORT, MESSAGE)
if received == MESSAGE:
print('OK')
break