freemodbus: allow address gaps in master data dictionary (support of UID field in MBAP)

This commit is contained in:
Alex Lisitsyn
2022-01-21 05:18:00 +00:00
parent 09f5c1d32e
commit e9cdb3e0d3
20 changed files with 651 additions and 462 deletions

View File

@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: 2016-2021 Espressif Systems (Shanghai) CO LTD
# SPDX-FileCopyrightText: 2016-2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import logging
@@ -72,7 +72,7 @@ class DutTestThread(Thread):
super(DutTestThread, self).__init__()
def __enter__(self):
logger.debug('Restart %s.' % self.tname)
logger.debug('Restart %s.', self.tname)
# Reset DUT first
self.dut.reset()
# Capture output from the DUT
@@ -83,7 +83,7 @@ class DutTestThread(Thread):
""" The exit method of context manager
"""
if exc_type is not None or exc_value is not None:
logger.info('Thread %s rised an exception type: %s, value: %s' % (self.tname, str(exc_type), str(exc_value)))
logger.info('Thread %s rised an exception type: %s, value: %s', self.tname, str(exc_type), str(exc_value))
def run(self):
""" The function implements thread functionality
@@ -98,7 +98,7 @@ class DutTestThread(Thread):
# Check DUT exceptions
dut_exceptions = self.dut.get_exceptions()
if 'Guru Meditation Error:' in dut_exceptions:
raise Exception('%s generated an exception: %s\n' % (str(self.dut), dut_exceptions))
raise RuntimeError('%s generated an exception(s): %s\n' % (str(self.dut), dut_exceptions))
# Mark thread has run to completion without any exceptions
self.data = self.dut.stop_capture_raw_data(capture_id=self.dut.name)
@@ -109,15 +109,17 @@ class DutTestThread(Thread):
message = r'.*Waiting IP([0-9]{1,2}) from stdin.*'
# Read all data from previous restart to get prompt correctly
self.dut.read()
result = self.dut.expect(re.compile(message), TEST_EXPECT_STR_TIMEOUT)
result = self.dut.expect(re.compile(message), timeout=TEST_EXPECT_STR_TIMEOUT)
if int(result[0]) != index:
raise Exception('Incorrect index of IP=%d for %s\n' % (int(result[0]), str(self.dut)))
message = 'IP%s=%s' % (result[0], self.ip_addr)
self.dut.write(message, '\r\n', False)
logger.debug('Sent message for %s: %s' % (self.tname, message))
raise RuntimeError('Incorrect index of IP=%s for %s\n' % (result[0], str(self.dut)))
# Use the same slave IP address for all characteristics during the test
self.dut.write('IP0=' + self.ip_addr, '\n', False)
self.dut.write('IP1=' + self.ip_addr, '\n', False)
self.dut.write('IP2=' + self.ip_addr, '\n', False)
logger.debug('Set IP address=%s for %s', self.ip_addr, self.tname)
message = r'.*IP\([0-9]+\) = \[([0-9a-zA-Z\.\:]+)\] set from stdin.*'
result = self.dut.expect(re.compile(message), TEST_EXPECT_STR_TIMEOUT)
logger.debug('Thread %s initialized with slave IP (%s).' % (self.tname, result[0]))
result = self.dut.expect(re.compile(message), timeout=TEST_EXPECT_STR_TIMEOUT)
logger.debug('Thread %s initialized with slave IP=%s.', self.tname, self.ip_addr)
def test_start(self, timeout_value):
""" The method to initialize and handle test stages
@@ -125,37 +127,37 @@ class DutTestThread(Thread):
def handle_get_ip4(data):
""" Handle get_ip v4
"""
logger.debug('%s[STACK_IPV4]: %s' % (self.tname, str(data)))
logger.debug('%s[STACK_IPV4]: %s', self.tname, str(data))
self.test_stage = STACK_IPV4
def handle_get_ip6(data):
""" Handle get_ip v6
"""
logger.debug('%s[STACK_IPV6]: %s' % (self.tname, str(data)))
logger.debug('%s[STACK_IPV6]: %s', self.tname, str(data))
self.test_stage = STACK_IPV6
def handle_init(data):
""" Handle init
"""
logger.debug('%s[STACK_INIT]: %s' % (self.tname, str(data)))
logger.debug('%s[STACK_INIT]: %s', self.tname, str(data))
self.test_stage = STACK_INIT
def handle_connect(data):
""" Handle connect
"""
logger.debug('%s[STACK_CONNECT]: %s' % (self.tname, str(data)))
logger.debug('%s[STACK_CONNECT]: %s', self.tname, str(data))
self.test_stage = STACK_CONNECT
def handle_test_start(data):
""" Handle connect
"""
logger.debug('%s[STACK_START]: %s' % (self.tname, str(data)))
logger.debug('%s[STACK_START]: %s', self.tname, str(data))
self.test_stage = STACK_START
def handle_par_ok(data):
""" Handle parameter ok
"""
logger.debug('%s[READ_PAR_OK]: %s' % (self.tname, str(data)))
logger.debug('%s[READ_PAR_OK]: %s', self.tname, str(data))
if self.test_stage >= STACK_START:
self.param_ok_count += 1
self.test_stage = STACK_PAR_OK
@@ -163,14 +165,14 @@ class DutTestThread(Thread):
def handle_par_fail(data):
""" Handle parameter fail
"""
logger.debug('%s[READ_PAR_FAIL]: %s' % (self.tname, str(data)))
logger.debug('%s[READ_PAR_FAIL]: %s', self.tname, str(data))
self.param_fail_count += 1
self.test_stage = STACK_PAR_FAIL
def handle_destroy(data):
""" Handle destroy
"""
logger.debug('%s[DESTROY]: %s' % (self.tname, str(data)))
logger.debug('%s[DESTROY]: %s', self.tname, str(data))
self.test_stage = STACK_DESTROY
self.test_finish = True
@@ -186,7 +188,7 @@ class DutTestThread(Thread):
(re.compile(self.expected[STACK_DESTROY]), handle_destroy),
timeout=timeout_value)
except DUT.ExpectTimeout:
logger.debug('%s, expect timeout on stage #%d (%s seconds)' % (self.tname, self.test_stage, timeout_value))
logger.debug('%s, expect timeout on stage #%d (%s seconds)', self.tname, self.test_stage, timeout_value)
self.test_finish = True
@@ -196,10 +198,10 @@ def test_check_mode(dut=None, mode_str=None, value=None):
global logger
try:
opt = dut.app.get_sdkconfig()[mode_str]
logger.debug('%s {%s} = %s.\n' % (str(dut), mode_str, opt))
logger.debug('%s {%s} = %s.\n', str(dut), mode_str, opt)
return value == opt
except Exception:
logger.error('ENV_TEST_FAILURE: %s: Cannot find option %s in sdkconfig.' % (str(dut), mode_str))
logger.error('ENV_TEST_FAILURE: %s: Cannot find option %s in sdkconfig.', str(dut), mode_str)
return False
@@ -238,7 +240,7 @@ def test_modbus_communication(env, comm_mode):
master_name = TEST_MASTER_TCP
else:
logger.error('ENV_TEST_FAILURE: IP resolver mode do not match in the master and slave implementation.\n')
raise Exception('ENV_TEST_FAILURE: IP resolver mode do not match in the master and slave implementation.\n')
raise RuntimeError('ENV_TEST_FAILURE: IP resolver mode do not match in the master and slave implementation.\n')
address = None
if test_check_mode(dut_master, 'CONFIG_MB_SLAVE_IP_FROM_STDIN', 'y'):
logger.info('ENV_TEST_INFO: Set slave IP address through STDIN.\n')
@@ -252,9 +254,9 @@ def test_modbus_communication(env, comm_mode):
if address is not None:
print('Found IP slave address: %s' % address[0])
else:
raise Exception('ENV_TEST_FAILURE: Slave IP address is not found in the output. Check network settings.\n')
raise RuntimeError('ENV_TEST_FAILURE: Slave IP address is not found in the output. Check network settings.\n')
else:
raise Exception('ENV_TEST_FAILURE: Slave IP resolver is not configured correctly.\n')
raise RuntimeError('ENV_TEST_FAILURE: Slave IP resolver is not configured correctly.\n')
# Create thread for each dut
with DutTestThread(dut=dut_master, name=master_name, ip_addr=address[0], expect=pattern_dict_master) as dut_master_thread:
@@ -268,32 +270,32 @@ def test_modbus_communication(env, comm_mode):
dut_slave_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT)
dut_master_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT)
if dut_slave_thread.isAlive():
logger.error('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
(dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
raise Exception('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
(dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
if dut_slave_thread.is_alive():
logger.error('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n',
dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT)
raise RuntimeError('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
(dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
if dut_master_thread.isAlive():
logger.error('TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
(dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
raise Exception('TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
(dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
if dut_master_thread.is_alive():
logger.error('TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n',
dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT)
raise RuntimeError('TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
(dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
logger.info('TEST_INFO: %s error count = %d, %s error count = %d.\n' %
(dut_master_thread.tname, dut_master_thread.param_fail_count,
dut_slave_thread.tname, dut_slave_thread.param_fail_count))
logger.info('TEST_INFO: %s ok count = %d, %s ok count = %d.\n' %
(dut_master_thread.tname, dut_master_thread.param_ok_count,
dut_slave_thread.tname, dut_slave_thread.param_ok_count))
logger.info('TEST_INFO: %s error count = %d, %s error count = %d.\n',
dut_master_thread.tname, dut_master_thread.param_fail_count,
dut_slave_thread.tname, dut_slave_thread.param_fail_count)
logger.info('TEST_INFO: %s ok count = %d, %s ok count = %d.\n',
dut_master_thread.tname, dut_master_thread.param_ok_count,
dut_slave_thread.tname, dut_slave_thread.param_ok_count)
if ((dut_master_thread.param_fail_count > TEST_READ_MAX_ERR_COUNT) or
(dut_slave_thread.param_fail_count > TEST_READ_MAX_ERR_COUNT) or
(dut_slave_thread.param_ok_count == 0) or
(dut_master_thread.param_ok_count == 0)):
raise Exception('TEST_FAILURE: %s parameter read error(ok) count = %d(%d), %s parameter read error(ok) count = %d(%d).\n' %
(dut_master_thread.tname, dut_master_thread.param_fail_count, dut_master_thread.param_ok_count,
dut_slave_thread.tname, dut_slave_thread.param_fail_count, dut_slave_thread.param_ok_count))
raise RuntimeError('TEST_FAILURE: %s parameter read error(ok) count = %d(%d), %s parameter read error(ok) count = %d(%d).\n' %
(dut_master_thread.tname, dut_master_thread.param_fail_count, dut_master_thread.param_ok_count,
dut_slave_thread.tname, dut_slave_thread.param_fail_count, dut_slave_thread.param_ok_count))
logger.info('TEST_SUCCESS: The Modbus parameter test is completed successfully.\n')
finally: