mirror of
https://github.com/espressif/esp-idf.git
synced 2025-08-10 04:43:33 +00:00
https examples pytest migration
This commit is contained in:
@@ -1,3 +1,5 @@
|
||||
| Supported Targets | ESP32 | ESP32-S2 | ESP32-S3 | ESP32-C3 |
|
||||
| ----------------- | ----- | -------- | -------- | -------- |
|
||||
# HTTP server with SSL support using OpenSSL
|
||||
|
||||
This example creates a SSL server that returns a simple HTML page when you visit its root URL.
|
||||
|
@@ -4,13 +4,12 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
import http.client
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import ssl
|
||||
|
||||
import tiny_test_fw
|
||||
import ttfw_idf
|
||||
from tiny_test_fw import Utility
|
||||
import pytest
|
||||
from pytest_embedded import Dut
|
||||
|
||||
server_cert_pem = '-----BEGIN CERTIFICATE-----\n'\
|
||||
'MIIDKzCCAhOgAwIBAgIUBxM3WJf2bP12kAfqhmhhjZWv0ukwDQYJKoZIhvcNAQEL\n'\
|
||||
@@ -90,33 +89,36 @@ client_key_pem = '-----BEGIN PRIVATE KEY-----\n' \
|
||||
success_response = '<h1>Hello Secure World!</h1>'
|
||||
|
||||
|
||||
@ttfw_idf.idf_example_test(env_tag='Example_WIFI_Protocols')
|
||||
def test_examples_protocol_https_server_simple(env, extra_data): # type: (tiny_test_fw.Env.Env, None) -> None # pylint: disable=unused-argument
|
||||
@pytest.mark.esp32
|
||||
@pytest.mark.esp32c3
|
||||
@pytest.mark.esp32s2
|
||||
@pytest.mark.esp32s3
|
||||
@pytest.mark.wifi
|
||||
def test_examples_protocol_https_server_simple(dut: Dut) -> None:
|
||||
"""
|
||||
steps: |
|
||||
1. join AP
|
||||
2. connect to www.howsmyssl.com:443
|
||||
3. send http request
|
||||
"""
|
||||
dut1 = env.get_dut('https_server_simple', 'examples/protocols/https_server/simple', dut_class=ttfw_idf.ESP32DUT)
|
||||
# check and log bin size
|
||||
binary_file = os.path.join(dut1.app.binary_path, 'https_server.bin')
|
||||
binary_file = os.path.join(dut.app.binary_path, 'https_server.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
ttfw_idf.log_performance('https_server_simple_bin_size', '{}KB'.format(bin_size // 1024))
|
||||
logging.info('https_server_simple_bin_size : {}KB'.format(bin_size // 1024))
|
||||
# start test
|
||||
dut1.start_app()
|
||||
# Parse IP address and port of the server
|
||||
dut1.expect(re.compile(r'Starting server'))
|
||||
got_port = dut1.expect(re.compile(r'Server listening on port (\d+)'), timeout=30)[0]
|
||||
Utility.console_log('Waiting to connect with AP')
|
||||
dut.expect(r'Starting server')
|
||||
got_port = int(dut.expect(r'Server listening on port (\d+)', timeout=30)[1].decode())
|
||||
logging.info('Waiting to connect with AP')
|
||||
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)', timeout=30)[1].decode()
|
||||
|
||||
got_ip = dut1.expect(re.compile(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)'), timeout=30)[0]
|
||||
# Expected logs
|
||||
|
||||
Utility.console_log('Got IP : ' + got_ip)
|
||||
Utility.console_log('Got Port : ' + got_port)
|
||||
logging.info('Got IP : {}'.format(got_ip))
|
||||
logging.info('Got Port : {}'.format(got_port))
|
||||
|
||||
Utility.console_log('Performing GET request over an SSL connection with the server')
|
||||
logging.info('Performing GET request over an SSL connection with the server')
|
||||
|
||||
CLIENT_CERT_FILE = 'client_cert.pem'
|
||||
CLIENT_KEY_FILE = 'client_key.pem'
|
||||
@@ -133,51 +135,61 @@ def test_examples_protocol_https_server_simple(env, extra_data): # type: (tiny_
|
||||
ssl_context.load_cert_chain(certfile=CLIENT_CERT_FILE, keyfile=CLIENT_KEY_FILE)
|
||||
|
||||
conn = http.client.HTTPSConnection(got_ip, got_port, context=ssl_context)
|
||||
Utility.console_log('Performing SSL handshake with the server')
|
||||
logging.info('Performing SSL handshake with the server')
|
||||
conn.request('GET','/')
|
||||
resp = conn.getresponse()
|
||||
dut1.expect('performing session handshake')
|
||||
dut.expect('performing session handshake')
|
||||
got_resp = resp.read().decode('utf-8')
|
||||
if got_resp != success_response:
|
||||
Utility.console_log('Response obtained does not match with correct response')
|
||||
logging.info('Response obtained does not match with correct response')
|
||||
raise RuntimeError('Failed to test SSL connection')
|
||||
|
||||
current_cipher = dut1.expect(re.compile(r'Current Ciphersuite(.*)'), timeout=5)[0]
|
||||
Utility.console_log('Current Ciphersuite' + current_cipher)
|
||||
current_cipher = dut.expect(r'Current Ciphersuite(.*)', timeout=5)[0]
|
||||
logging.info('Current Ciphersuite {}'.format(current_cipher))
|
||||
|
||||
# Close the connection
|
||||
conn.close()
|
||||
|
||||
Utility.console_log('Checking user callback: Obtaining client certificate...')
|
||||
logging.info('Checking user callback: Obtaining client certificate...')
|
||||
|
||||
serial_number = dut1.expect(re.compile(r'serial number(.*)'), timeout=5)[0]
|
||||
issuer_name = dut1.expect(re.compile(r'issuer name(.*)'), timeout=5)[0]
|
||||
expiry = dut1.expect(re.compile(r'expires on(.*)'), timeout=5)[0]
|
||||
serial_number = dut.expect(r'serial number(.*)', timeout=5)[0]
|
||||
issuer_name = dut.expect(r'issuer name(.*)', timeout=5)[0]
|
||||
expiry = dut.expect(r'expires on ((.*)\d{4}\-(0?[1-9]|1[012])\-(0?[1-9]|[12][0-9]|3[01])*)', timeout=5)[1].decode()
|
||||
|
||||
Utility.console_log('Serial No.' + serial_number)
|
||||
Utility.console_log('Issuer Name' + issuer_name)
|
||||
Utility.console_log('Expires on' + expiry)
|
||||
logging.info('Serial No. {}'.format(serial_number))
|
||||
logging.info('Issuer Name {}'.format(issuer_name))
|
||||
logging.info('Expires on {}'.format(expiry))
|
||||
|
||||
Utility.console_log('Correct response obtained')
|
||||
Utility.console_log('SSL connection test successful\nClosing the connection')
|
||||
logging.info('Correct response obtained')
|
||||
logging.info('SSL connection test successful\nClosing the connection')
|
||||
|
||||
|
||||
@pytest.mark.esp32
|
||||
@pytest.mark.esp32c3
|
||||
@pytest.mark.esp32s2
|
||||
@pytest.mark.esp32s3
|
||||
@pytest.mark.wifi
|
||||
@pytest.mark.parametrize('config', ['dynamic_buffer',], indirect=True)
|
||||
def test_examples_protocol_https_server_simple_dynamic_buffers(dut: Dut) -> None:
|
||||
# Test with mbedTLS dynamic buffer feature
|
||||
dut1 = env.get_dut('https_server_simple', 'examples/protocols/https_server/simple', dut_class=ttfw_idf.ESP32DUT, app_config_name='dynamic_buffer')
|
||||
|
||||
# start test
|
||||
dut1.start_app()
|
||||
# Parse IP address and port of the server
|
||||
dut1.expect(re.compile(r'Starting server'))
|
||||
got_port = dut1.expect(re.compile(r'Server listening on port (\d+)'), timeout=30)[0]
|
||||
Utility.console_log('Waiting to connect with AP')
|
||||
dut.expect(r'Starting server')
|
||||
got_port = int(dut.expect(r'Server listening on port (\d+)', timeout=30)[1].decode())
|
||||
logging.info('Waiting to connect with AP')
|
||||
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)', timeout=30)[1].decode()
|
||||
|
||||
got_ip = dut1.expect(re.compile(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)'), timeout=30)[0]
|
||||
# Expected logs
|
||||
|
||||
Utility.console_log('Got IP : ' + got_ip)
|
||||
Utility.console_log('Got Port : ' + got_port)
|
||||
logging.info('Got IP : {}'.format(got_ip))
|
||||
logging.info('Got Port : {}'.format(got_port))
|
||||
|
||||
Utility.console_log('Performing GET request over an SSL connection with the server')
|
||||
logging.info('Performing GET request over an SSL connection with the server')
|
||||
|
||||
CLIENT_CERT_FILE = 'client_cert.pem'
|
||||
CLIENT_KEY_FILE = 'client_key.pem'
|
||||
|
||||
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
|
||||
ssl_context.verify_mode = ssl.CERT_REQUIRED
|
||||
@@ -190,34 +202,30 @@ def test_examples_protocol_https_server_simple(env, extra_data): # type: (tiny_
|
||||
os.remove(CLIENT_KEY_FILE)
|
||||
|
||||
conn = http.client.HTTPSConnection(got_ip, got_port, context=ssl_context)
|
||||
Utility.console_log('Performing SSL handshake with the server')
|
||||
logging.info('Performing SSL handshake with the server')
|
||||
conn.request('GET','/')
|
||||
resp = conn.getresponse()
|
||||
dut1.expect('performing session handshake')
|
||||
dut.expect('performing session handshake')
|
||||
got_resp = resp.read().decode('utf-8')
|
||||
if got_resp != success_response:
|
||||
Utility.console_log('Response obtained does not match with correct response')
|
||||
logging.info('Response obtained does not match with correct response')
|
||||
raise RuntimeError('Failed to test SSL connection')
|
||||
|
||||
current_cipher = dut1.expect(re.compile(r'Current Ciphersuite(.*)'), timeout=5)[0]
|
||||
Utility.console_log('Current Ciphersuite' + current_cipher)
|
||||
current_cipher = dut.expect(r'Current Ciphersuite(.*)', timeout=5)[0]
|
||||
logging.info('Current Ciphersuite {}'.format(current_cipher))
|
||||
|
||||
# Close the connection
|
||||
conn.close()
|
||||
|
||||
Utility.console_log('Checking user callback: Obtaining client certificate...')
|
||||
logging.info('Checking user callback: Obtaining client certificate...')
|
||||
|
||||
serial_number = dut1.expect(re.compile(r'serial number(.*)'), timeout=5)[0]
|
||||
issuer_name = dut1.expect(re.compile(r'issuer name(.*)'), timeout=5)[0]
|
||||
expiry = dut1.expect(re.compile(r'expires on(.*)'), timeout=5)[0]
|
||||
serial_number = dut.expect(r'serial number(.*)', timeout=5)[0]
|
||||
issuer_name = dut.expect(r'issuer name(.*)', timeout=5)[0]
|
||||
expiry = dut.expect(r'expires on ((.*)\d{4}\-(0?[1-9]|1[012])\-(0?[1-9]|[12][0-9]|3[01])*)', timeout=5)[1].decode()
|
||||
|
||||
Utility.console_log('Serial No.' + serial_number)
|
||||
Utility.console_log('Issuer Name' + issuer_name)
|
||||
Utility.console_log('Expires on' + expiry)
|
||||
logging.info('Serial No. : {}'.format(serial_number))
|
||||
logging.info('Issuer Name : {}'.format(issuer_name))
|
||||
logging.info('Expires on : {}'.format(expiry))
|
||||
|
||||
Utility.console_log('Correct response obtained')
|
||||
Utility.console_log('SSL connection test successful\nClosing the connection')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
test_examples_protocol_https_server_simple() # pylint: disable=no-value-for-parameter
|
||||
logging.info('Correct response obtained')
|
||||
logging.info('SSL connection test successful\nClosing the connection')
|
Reference in New Issue
Block a user