mirror of
				https://github.com/espressif/esp-idf.git
				synced 2025-10-25 19:28:14 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			807 lines
		
	
	
		
			32 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			807 lines
		
	
	
		
			32 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/env python
 | |
| #
 | |
| # Copyright 2018 Espressif Systems (Shanghai) PTE LTD
 | |
| #
 | |
| # Licensed under the Apache License, Version 2.0 (the "License");
 | |
| # you may not use this file except in compliance with the License.
 | |
| # You may obtain a copy of the License at
 | |
| #
 | |
| #     http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| # Unless required by applicable law or agreed to in writing, software
 | |
| # distributed under the License is distributed on an "AS IS" BASIS,
 | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| # See the License for the specific language governing permissions and
 | |
| # limitations under the License.
 | |
| 
 | |
| """
 | |
| Test script for unit test case.
 | |
| """
 | |
| 
 | |
| import argparse
 | |
| import re
 | |
| import threading
 | |
| import time
 | |
| 
 | |
| import ttfw_idf
 | |
| from tiny_test_fw import DUT, Env, TinyFW, Utility
 | |
| from tiny_test_fw.TinyFW import TestCaseFailed
 | |
| from tiny_test_fw.Utility import format_case_id, handle_unexpected_exception
 | |
| 
 | |
# Text the runners wait for to detect that the unit-test app has (re)booted and is ready for input.
UT_APP_BOOT_UP_DONE = 'Press ENTER to see the list of tests.'

# Splits a config name into its base name (group 1) and an optional trailing "_<digits>" suffix (group 2),
# e.g. "default_2" -> "default".
STRIP_CONFIG_PATTERN = re.compile(r'(.+?)(_\d+)?$')

# matches e.g.: "rst:0xc (SW_CPU_RESET),boot:0x13 (SPI_FAST_FLASH_BOOT)"
RESET_PATTERN = re.compile(r'(rst:0x[0-9a-fA-F]*\s\([\w].*?\),boot:0x[0-9a-fA-F]*\s\([\w].*?\))')

# matches a panic banner of the form "Guru Meditation Error: Core  <digit> panic'ed (<reason>)"
EXCEPTION_PATTERN = re.compile(r"(Guru Meditation Error: Core\s+\d panic'ed \([\w].*?\))")
# matches an abort banner of the form "abort() was called at PC 0x<8 hex digits> on core <digit>"
ABORT_PATTERN = re.compile(r'(abort\(\) was called at PC 0x[a-fA-F\d]{8} on core \d)')
# summary line printed after a single test; group 1 is the failure count digit, group 2 the ignored count digit
FINISH_PATTERN = re.compile(r'1 Tests (\d) Failures (\d) Ignored')
# regex fragment for the prompt that follows the last entry of the test list
END_LIST_STR = r'\r?\nEnter test for running'
# one test-menu entry: `(<index>) "<name>" <rest of line>`, optionally followed by the end-of-list prompt
TEST_PATTERN = re.compile(r'\((\d+)\)\s+"([^"]+)" ([^\r\n]+)\r?\n(' + END_LIST_STR + r')?')
# one indented sub-menu entry: `(<index>) "<name>"`, followed by another "(" entry or the end-of-list prompt
TEST_SUBMENU_PATTERN = re.compile(r'\s+\((\d+)\)\s+"[^"]+"\r?\n(?=(?=\()|(' + END_LIST_STR + r'))')
# passed as ``app_path`` when requesting a DUT from the test environment
UT_APP_PATH = 'tools/unit-test-app'

# numeric ids for the three kinds of unit test handled by this script
# NOTE(review): their consumers are outside the visible part of the file -- confirm usage there
SIMPLE_TEST_ID = 0
MULTI_STAGE_ID = 1
MULTI_DEVICE_ID = 2

# timeout passed to ``dut.expect`` while reading the test menu
DEFAULT_TIMEOUT = 20

# argument of ``time.sleep`` after resetting a DUT, before probing whether it has booted
DUT_DELAY_AFTER_RESET = 2
# number of times the boot probe is attempted before the reset is declared failed
DUT_STARTUP_CHECK_RETRY_COUNT = 5
# expect timeout used for each single boot probe
TEST_HISTORY_CHECK_TIMEOUT = 2
 | |
| 
 | |
| 
 | |
def reset_reason_matches(reported_str, expected_str):
    """
    Check whether the reset reason reported by the DUT satisfies the expected one.

    The expected reason matches if it occurs verbatim in the reported text, or if
    it does so after one of its known spellings has been swapped for the
    alternative spelling (e.g. ``_RESET`` -> ``_RST``).

    :param reported_str: text captured from the DUT
    :param expected_str: reset reason the test case declares
    :return: True if the expected reason (or an aliased form of it) is found
    """
    # (spelling used by test cases, alternative spelling that may be reported)
    alias_pairs = (
        ('_RESET', '_RST'),
        ('POWERON_RESET', 'POWERON'),
        ('DEEPSLEEP_RESET', 'DSLEEP'),
    )

    acceptable = [expected_str]
    acceptable.extend(expected_str.replace(spelling, alternative)
                      for spelling, alternative in alias_pairs
                      if spelling in expected_str)

    return any(candidate in reported_str for candidate in acceptable)
 | |
| 
 | |
| 
 | |
def format_test_case_config(test_case_data, target='esp32'):
    """
    Normalise test case descriptions and group them by unit-test-app config.

    Every case may be given as a string (``"name"`` or ``"name [reset=R1,R2]"``)
    or as a dict that contains at least ``"name"``. A single case that is not
    wrapped in a list is accepted as well.

    Each normalised case is a dict that always carries these keys:

    * ``name``: test case name
    * ``reset``: list of expected reset reasons (empty list if none were given)
    * ``config``: unit test app config, ``'default'`` if the case does not specify one
    * ``target``: the ``target`` argument, unless the case already specifies one

    The result maps each config name to the list of its cases, for example::

    case_config = {
        "default": [{"name": "restart from PRO CPU", "reset": ["SW_CPU_RESET"], ...}, {...}],
        "psram": [{"name": "restart from PRO CPU", "reset": ["SW_CPU_RESET"], ...}],
    }

    :param test_case_data: string, dict, or a list of strings / dicts
    :param target: target stored in cases that do not name one themselves
    :return: dict with config names as keys and lists of case dicts as values
    :raise TypeError: if a case is neither a string nor a dict
    """

    def split_reset_reasons(raw_reasons):
        # trim surrounding spaces, then discard entries that ended up empty
        trimmed = (reason.strip(' ') for reason in raw_reasons)
        return [reason for reason in trimmed if reason]

    def normalise_case(raw_case):
        """ turn one case description into a complete case dict """
        if isinstance(raw_case, str):
            parts = raw_case.split(' [reset=')
            case = {'name': parts[0]}
            try:
                # parts[1] looks like "R1,R2]"; drop the closing bracket before splitting
                case['reset'] = split_reset_reasons(parts[1][0:-1].split(','))
            except IndexError:
                # no " [reset=" marker in the string
                case['reset'] = list()
        elif isinstance(raw_case, dict):
            # work on a copy so the caller's dict is left untouched
            case = raw_case.copy()
            assert 'name' in case
            if 'reset' not in case:
                case['reset'] = list()
            elif isinstance(case['reset'], str):
                case['reset'] = split_reset_reasons(case['reset'].split(','))
        else:
            raise TypeError('Not supported type during parsing unit test case')

        case.setdefault('config', 'default')
        case.setdefault('target', target)
        return case

    all_cases = test_case_data if isinstance(test_case_data, list) else [test_case_data]

    grouped = dict()
    for raw in all_cases:
        case = normalise_case(raw)
        grouped.setdefault(case['config'], []).append(case)

    return grouped
 | |
| 
 | |
| 
 | |
def replace_app_bin(dut, name, new_app_bin):
    """
    Swap the application binary in the DUT's download config for another one.

    The first entry of ``dut.download_config`` that ends with ``/<name>.bin`` is
    overwritten with ``new_app_bin``. Nothing happens if ``new_app_bin`` is None
    or if no entry matches.

    :param dut: object exposing a mutable ``download_config`` sequence
    :param name: base name (without ``.bin``) of the binary to replace
    :param new_app_bin: replacement entry, or None to keep the config unchanged
    :return: None
    """
    if new_app_bin is None:
        return

    wanted_suffix = '/{}.bin'.format(name)
    for position, entry in enumerate(dut.download_config):
        if not entry.endswith(wanted_suffix):
            continue
        dut.download_config[position] = new_app_bin
        Utility.console_log('The replaced application binary is {}'.format(new_app_bin), 'O')
        # only the first matching entry is replaced
        break
 | |
| 
 | |
| 
 | |
def format_case_name(case):
    """
    Build the case id used for logging and for the junit report.

    Cases of one config may be split over several binaries because of limited rom
    space, so configs such as ``default`` and ``default_2`` are treated as the
    same config: a trailing ``_<digits>`` is removed before formatting.

    :param case: case dict with ``name``, ``target`` and ``config`` keys
    :return: whatever ``format_case_id`` returns for name, target and stripped config
    """
    base_config = STRIP_CONFIG_PATTERN.match(case['config']).group(1)
    return format_case_id(case['name'], target=case['target'], config=base_config)
 | |
| 
 | |
| 
 | |
def reset_dut(dut):
    """
    Reset the DUT and wait until it answers commands again.

    Why the boot-up print is not simply awaited: the esptool ``run`` command is
    slow and the serial port is closed until the reset has finished, so the DUT
    may already have booted (and printed its banner) before the port is reopened.
    Instead the ``-`` command is sent and the test history summary is expected.

    The probe is repeated a few times in case ``dut.reset`` returns while the DUT
    is still booting and cannot process commands. A DUT that receives only part
    of the first ``-`` (just the newline) prints the whole case list, which may
    take several seconds; to avoid that, a delay is inserted between the reset
    and the first probe.

    :param dut: DUT to reset
    :raise AssertionError: if the DUT did not answer any of the probes
    """
    dut.reset()
    time.sleep(DUT_DELAY_AFTER_RESET)

    for _attempt in range(DUT_STARTUP_CHECK_RETRY_COUNT):
        dut.write('-')
        try:
            dut.expect('0 Tests 0 Failures 0 Ignored', timeout=TEST_HISTORY_CHECK_TIMEOUT)
        except DUT.ExpectTimeout:
            # no answer yet, probe again
            continue
        # the DUT answered, so the reset is complete
        return

    raise AssertionError('Reset {} ({}) failed!'.format(dut.name, dut.port))
 | |
| 
 | |
| 
 | |
def log_test_case(description, test_case, ut_config):
    """
    Print a two-line announcement for the test case that is about to run.

    The first line names the case and its config; the second one lists every
    other entry of the case dict whose value is not None as ``key=value``.

    :param description: kind of test shown in the message (e.g. 'test case')
    :param test_case: case dict; must contain ``name``
    :param ut_config: name of the unit test app config
    """
    headline = "Running {} '{}' (config {})".format(description, test_case['name'], ut_config)
    Utility.console_log(headline, color='orange')

    tag_texts = ('%s=%s' % (key, value)
                 for (key, value) in test_case.items()
                 if key != 'name' and value is not None)
    Utility.console_log('Tags: %s' % ', '.join(tag_texts), color='orange')
 | |
| 
 | |
| 
 | |
def run_one_normal_case(dut, one_case, junit_test_case):
    """
    Run one single-stage test case on the DUT and record the outcome.

    The DUT is reset, the case is selected by name, and then the output is
    watched until either the result summary or a completed reboot is seen.
    A reboot is only accepted if the reset reasons observed on the way equal
    the ones listed in ``one_case['reset']``.

    :param dut: DUT running the unit test app
    :param one_case: case dict; ``name``, ``reset`` and ``timeout`` are read here
    :param junit_test_case: junit report entry that receives failure / skip info
    :raise TestCaseFailed: if the case fails, the reset check fails, or an expect times out
    """
    reset_dut(dut)

    dut.start_capture_raw_data()
    # select the case by its quoted name and wait for the app to announce it
    dut.write("\"{}\"".format(one_case['name']))
    dut.expect('Running ' + one_case['name'] + '...')

    # reset / exception / abort banners seen so far, in order of appearance
    observed_resets = []

    # the callbacks below need to flip a flag owned by this function;
    # a list is used so they can do that with a plain append
    done_marker = list()

    def conclude(passed):
        """ stop the expect loop, log the verdict and raise if the case failed """
        done_marker.append(True)
        captured = dut.stop_capture_raw_data()
        if passed:
            Utility.console_log('Success: ' + format_case_name(one_case), color='green')
            return
        Utility.console_log('Failed: ' + format_case_name(one_case), color='red')
        junit_test_case.add_failure_info(captured)
        raise TestCaseFailed(format_case_name(one_case))

    def remember_reset(data):
        """ store the matched banner; it is evaluated in ``on_reboot_done`` """
        observed_resets.append(data[0])

    def on_summary(data):
        """ the result summary was printed, i.e. the test ended without a reboot """
        # no reset may have been seen on this path
        assert not observed_resets
        if int(data[1]):
            # case ignored
            Utility.console_log('Ignored: ' + format_case_name(one_case), color='orange')
            junit_test_case.add_skipped_info('ignored')
        conclude(not int(data[0]))

    def on_reboot_done(data):
        """ the app booted again: compare the observed resets with the expected ones """
        assert observed_resets  # reboot but no exception/reset logged. should never happen
        expected_resets = one_case['reset']
        resets_ok = len(expected_resets) == len(observed_resets) and all(
            reset_reason_matches(seen, expected_resets[position])
            for position, seen in enumerate(observed_resets))
        if not resets_ok:
            err_msg = 'Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}'.format(expected_resets,
                                                                                   observed_resets)
            Utility.console_log(err_msg, color='orange')
            junit_test_case.add_failure_info(err_msg)
        conclude(resets_ok)

    timeout_value = one_case['timeout']
    while not done_marker:
        try:
            dut.expect_any((RESET_PATTERN, remember_reset),
                           (EXCEPTION_PATTERN, remember_reset),
                           (ABORT_PATTERN, remember_reset),
                           (FINISH_PATTERN, on_summary),
                           (UT_APP_BOOT_UP_DONE, on_reboot_done),
                           timeout=timeout_value)
        except DUT.ExpectTimeout:
            Utility.console_log('Timeout in expect (%s seconds)' % timeout_value, color='orange')
            junit_test_case.add_failure_info('timeout')
            # always raises TestCaseFailed, which also leaves the loop
            conclude(False)
 | |
| 
 | |
| 
 | |
@ttfw_idf.idf_unit_test(env_tag='UT_T1_1', junit_report_by_case=True)
def run_unit_test_cases(env, extra_data):
    """
    Run single-stage unit test cases, one DUT per unit-test-app config.

    extra_data can be given in three forms:

    1. a string:
               1. "case_name"
               2. "case_name [reset=RESET_REASON]"
    2. a dict:
               1. with key like {"name": "Intr_alloc test, shared ints"}
               2. with key like {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET", "config": "psram"}
    3. a list of strings or dicts:
               [case1, case2, case3, {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}, ...]

    A failing case does not stop the run: the failure is recorded and the next
    case is started.

    :param env: test env instance
    :param extra_data: the case name or case list or case dictionary
    :return: None
    """

    case_config = format_test_case_config(extra_data, env.default_dut_cls.TARGET)

    # names of the cases that failed or raised an unexpected exception
    # NOTE(review): the list is filled below but not read again inside this function
    failed_cases = []

    for ut_config, cases in case_config.items():
        Utility.console_log('Running unit test for config: ' + ut_config, 'O')
        dut = env.get_dut('unit-test-app', app_path=UT_APP_PATH, app_config_name=ut_config, allow_dut_exception=True)
        if cases:
            # the binary override is taken from the first case of the config
            replace_app_bin(dut, 'unit-test-app', cases[0].get('app_bin'))
        dut.start_app()
        Utility.console_log('Download finished, start running test cases', 'O')

        for one_case in cases:
            log_test_case('test case', one_case, ut_config)
            # stays empty unless the case runs to completion
            performance_items = []
            # every case gets its own entry in the junit report
            junit_test_case = TinyFW.JunitReport.create_test_case(format_case_name(one_case))
            try:
                run_one_normal_case(dut, one_case, junit_test_case)
                performance_items = dut.get_performance_items()
            except TestCaseFailed:
                failed_cases.append(format_case_name(one_case))
            except Exception as e:
                handle_unexpected_exception(junit_test_case, e)
                failed_cases.append(format_case_name(one_case))
            finally:
                TinyFW.JunitReport.update_performance(performance_items)
                TinyFW.JunitReport.test_case_finish(junit_test_case)

        # all cases of this config are done, release its DUT
        env.close_dut(dut.name)
 | |
| 
 | |
| 
 | |
class Handler(threading.Thread):
    """
    Thread that drives one DUT taking part in a multi-device test case.

    Each handler selects the parent case on its DUT, picks its own child case
    from the sub-menu and then reacts to the DUT output until the result summary
    is seen, an expect times out, or ``stop`` is called. Signals exchanged
    between the DUTs are passed through ``sent_signal_list``, a list shared by
    all handlers of the case and guarded by ``lock``.

    After ``run`` returns, ``result`` tells whether the child case passed,
    ``output`` holds the captured DUT output and ``fail_name`` the name of the
    child case if it failed.
    """
    # the DUT waits for a signal; group 1 is the signal name
    WAIT_SIGNAL_PATTERN = re.compile(r'Waiting for signal: \[(.+)]!')
    # the DUT sends a signal; group 1 is the signal name, group 3 the optional parameter
    SEND_SIGNAL_PATTERN = re.compile(r'Send signal: \[([^]]+)](\[([^]]+)])?!')
    # result summary; group 1 is the failure count digit, group 2 the ignored count digit
    FINISH_PATTERN = re.compile(r'1 Tests (\d) Failures (\d) Ignored')

    def __init__(self, dut, sent_signal_list, lock, parent_case_name, child_case_index, timeout):
        """
        Store the shared state and reset the DUT.

        :param dut: DUT this handler talks to
        :param sent_signal_list: list of pending signals shared by all handlers of the case
        :param lock: lock guarding ``sent_signal_list``
        :param parent_case_name: name of the multi-device case to select
        :param child_case_index: zero-based index of the child case; the menu is one-based
        :param timeout: timeout used for expecting output and for waiting on signals
        """
        self.dut = dut
        self.sent_signal_list = sent_signal_list
        self.lock = lock
        self.parent_case_name = parent_case_name
        self.child_case_name = ''
        self.child_case_index = child_case_index + 1
        self.finish = False
        self.result = False
        self.output = ''
        self.fail_name = None
        self.timeout = timeout
        self.force_stop = threading.Event()  # set by ``stop`` to make ``run`` leave its loop

        reset_dut(self.dut)  # reset the board to make it start from the beginning

        threading.Thread.__init__(self, name='{} Handler'.format(dut))

    def run(self):
        """ select the child case on the DUT and process its output until it finishes """

        self.dut.start_capture_raw_data()

        # sub-menu entry of this handler's child case, e.g. `(2) "name"`.
        # The index never changes, so the pattern is compiled once here.
        child_case_pattern = re.compile(r'\(' + str(self.child_case_index) + r'\)\s"(\w+)"')

        def get_child_case_name(data):
            """ the sub-menu entry was printed: remember the name and select the entry """
            self.child_case_name = data[0]
            time.sleep(1)
            self.dut.write(str(self.child_case_index))

        def one_device_case_finish(result):
            """ one test finished, let expect loop break and log result """
            self.finish = True
            self.result = result
            self.output = '[{}]\n\n{}\n'.format(self.child_case_name,
                                                self.dut.stop_capture_raw_data())
            if not result:
                self.fail_name = self.child_case_name

        def device_wait_action(data):
            """ the DUT waits for a signal: forward it once another handler has posted it """
            start_time = time.time()
            expected_signal = data[0].encode('utf-8')
            while True:
                if time.time() > start_time + self.timeout:
                    Utility.console_log('Timeout in device for function: %s' % self.child_case_name, color='orange')
                    break
                with self.lock:
                    pending = next((signal for signal in self.sent_signal_list
                                    if expected_signal == signal['name']), None)
                    if pending is not None:
                        self.dut.write(pending['parameter'])
                        self.sent_signal_list.remove(pending)
                        break
                # poll again shortly; sleep without the lock so that senders are not blocked
                time.sleep(0.01)

        def device_send_action(data):
            """ the DUT sent a signal: post it for the handler whose DUT waits for it """
            with self.lock:
                self.sent_signal_list.append({
                    'name': data[0].encode('utf-8'),
                    'parameter': '' if data[2] is None else data[2].encode('utf-8')
                    # no parameter means we only write EOL to DUT
                })

        def handle_device_test_finish(data):
            """ test finished without reset """
            # in this scenario reset should not happen
            if int(data[1]):
                # case ignored
                Utility.console_log('Ignored: ' + self.child_case_name, color='orange')
            one_device_case_finish(not int(data[0]))

        try:
            time.sleep(1)
            self.dut.write("\"{}\"".format(self.parent_case_name))
            self.dut.expect('Running ' + self.parent_case_name + '...')
        except DUT.ExpectTimeout:
            Utility.console_log('No case detected!', color='orange')
        while not self.finish and not self.force_stop.is_set():
            try:
                self.dut.expect_any((child_case_pattern, get_child_case_name),
                                    (self.WAIT_SIGNAL_PATTERN, device_wait_action),  # wait signal pattern
                                    (self.SEND_SIGNAL_PATTERN, device_send_action),  # send signal pattern
                                    (self.FINISH_PATTERN, handle_device_test_finish),  # test finish pattern
                                    timeout=self.timeout)
            except DUT.ExpectTimeout:
                Utility.console_log('Timeout in expect (%s seconds)' % self.timeout, color='orange')
                one_device_case_finish(False)
                break

    def stop(self):
        """ ask ``run`` to leave its expect loop before the next expect call """
        self.force_stop.set()
 | |
| 
 | |
| 
 | |
def get_case_info(one_case):
    """
    Extract what is needed to start a multi-device case.

    :param one_case: case dict with ``name`` and ``child case num`` keys
    :return: tuple ``(parent case name, number of child cases)``
    """
    return one_case['name'], one_case['child case num']
 | |
| 
 | |
| 
 | |
def get_dut(duts, env, name, ut_config, app_bin=None):
    """
    Return the DUT registered under ``name``, creating and flashing it on first use.

    :param duts: dict of already created DUTs, keyed by name; a new DUT is added to it
    :param env: test env instance used to create a DUT that is not known yet
    :param name: DUT name, e.g. 'dut0'
    :param ut_config: unit test app config the new DUT is created with
    :param app_bin: optional replacement for the application binary of a new DUT
    :return: the DUT
    """
    if name in duts:
        # created earlier, use it as it is
        return duts[name]

    new_dut = env.get_dut(name, app_path=UT_APP_PATH, app_config_name=ut_config, allow_dut_exception=True)
    duts[name] = new_dut
    replace_app_bin(new_dut, 'unit-test-app', app_bin)
    new_dut.start_app()  # download bin to board
    return new_dut
 | |
| 
 | |
| 
 | |
def run_one_multiple_devices_case(duts, ut_config, env, one_case, app_bin, junit_test_case):
    """
    Run one multi-device case: one ``Handler`` thread per child case / DUT.

    DUTs are named ``dut0``, ``dut1``, ... and are taken from ``duts`` or created
    on demand. The case passes only if every handler reports success. As soon as
    a joined handler turns out to have failed, all handlers are asked to stop.

    :param duts: dict of already created DUTs, keyed by name; may be extended
    :param ut_config: unit test app config used for newly created DUTs
    :param env: test env instance
    :param one_case: case dict; ``name``, ``child case num`` and ``timeout`` are read
    :param app_bin: optional replacement for the application binary of new DUTs
    :param junit_test_case: junit report entry that receives the combined output on failure
    :return: True if all child cases passed, otherwise a false value
    """
    lock = threading.RLock()
    threads = []
    send_signal_list = []
    result = True
    parent_case, case_num = get_case_info(one_case)

    for i in range(case_num):
        dut = get_dut(duts, env, 'dut%d' % i, ut_config, app_bin)
        threads.append(Handler(dut, send_signal_list, lock,
                               parent_case, i, one_case['timeout']))
    for thread in threads:
        # ``daemon`` attribute instead of the deprecated ``setDaemon``
        thread.daemon = True
        thread.start()
    output = 'Multiple Device Failed\n'
    for thread in threads:
        thread.join()
        result = result and thread.result
        output += thread.output
        if not thread.result:
            # one device failed: tell every handler to leave its expect loop
            for handler in threads:
                handler.stop()

    if not result:
        junit_test_case.add_failure_info(output)

    # collect performances from DUTs
    performance_items = []
    for dut_name in duts:
        performance_items.extend(duts[dut_name].get_performance_items())
    TinyFW.JunitReport.update_performance(performance_items)

    return result
 | |
| 
 | |
| 
 | |
@ttfw_idf.idf_unit_test(env_tag='UT_T2_1', junit_report_by_case=True)
def run_multiple_devices_cases(env, extra_data):
    """
    Run multi-device unit test cases.

    extra_data is either one case dict or a list of case dicts, e.g.::

        {"name":  "gpio master/slave test example",
         "child case num": 2,
         "config": "release",
         "env_tag": "UT_T2_1"}

        [{"name":  "gpio master/slave test example1",
          "child case num": 2,
          "config": "release",
          "env_tag": "UT_T2_1"},
         {"name":  "gpio master/slave test example2",
          "child case num": 2,
          "config": "release",
          "env_tag": "UT_T2_1"}]

    DUTs are shared by all cases of one config and closed before the next config
    is started.

    :param env: test env instance
    :param extra_data: case dictionary or list of case dictionaries
    :return: None
    """
    # NOTE(review): the list is filled below but not read again inside this function
    failed_cases = []
    case_config = format_test_case_config(extra_data, env.default_dut_cls.TARGET)
    duts = {}
    for ut_config, cases in case_config.items():
        Utility.console_log('Running unit test for config: ' + ut_config, 'O')
        for one_case in cases:
            log_test_case('multi-device test', one_case, ut_config)
            # remains False if the run raises
            passed = False
            junit_test_case = TinyFW.JunitReport.create_test_case(format_case_name(one_case))
            try:
                passed = run_one_multiple_devices_case(duts, ut_config, env, one_case,
                                                       one_case.get('app_bin'), junit_test_case)
            except TestCaseFailed:
                pass  # ``passed`` is still False; the finally block reports the failure
            except Exception as e:
                handle_unexpected_exception(junit_test_case, e)
            finally:
                case_name = format_case_name(one_case)
                if passed:
                    Utility.console_log('Success: ' + case_name, color='green')
                else:
                    failed_cases.append(case_name)
                    Utility.console_log('Failed: ' + case_name, color='red')
                TinyFW.JunitReport.test_case_finish(junit_test_case)

        # all cases of this config are done: close its DUTs and start over with an empty set
        for dut_name in duts:
            env.close_dut(dut_name)
        duts = {}
 | |
| 
 | |
| 
 | |
def run_one_multiple_stage_case(dut, one_case, junit_test_case):
    """
    Run one multi-stage test case on the DUT and record the outcome.

    The case consists of ``one_case['child case num']`` stages. For each stage
    the case and then the stage number are selected on the DUT. Every stage but
    the last one has to end with a reboot of the app; the last one has to end
    with the result summary. Reset reasons observed over all stages are compared
    with ``one_case['reset']`` when the case passes its last stage; an empty
    ``reset`` list disables that comparison.

    :param dut: DUT running the unit test app
    :param one_case: case dict; ``name``, ``child case num``, ``reset`` and ``timeout`` are read
    :param junit_test_case: junit report entry that receives failure / skip info
    :raise TestCaseFailed: if the case fails, ends in the wrong stage, or an expect times out
    """
    reset_dut(dut)

    dut.start_capture_raw_data()

    # reset / exception / abort banners seen so far, accumulated over all stages
    observed_resets = []

    # outcome of the running stage, written by the callbacks: 'continue' or 'break'.
    # A list is used so the callbacks can set it with a plain append.
    stage_signals = list()

    def on_last_stage():
        return stage_index == one_case['child case num'] - 1

    def resets_as_expected():
        expected_resets = one_case['reset']
        if not expected_resets:
            # we allow omit reset in multi stage cases
            return True
        assert observed_resets  # reboot but no exception/reset logged. should never happen
        matched = len(expected_resets) == len(observed_resets) and all(
            reset_reason_matches(seen, expected_resets[position])
            for position, seen in enumerate(observed_resets))
        if not matched:
            err_msg = 'Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}'.format(expected_resets,
                                                                                   observed_resets)
            Utility.console_log(err_msg, color='orange')
            junit_test_case.add_failure_info(err_msg)
        return matched

    def conclude(passed):
        """ the case is over: check the resets, log the verdict, raise if it failed """
        # the reset check only runs for a case that passed so far
        passed = passed and resets_as_expected()
        captured = dut.stop_capture_raw_data()
        if not passed:
            Utility.console_log('Failed: ' + format_case_name(one_case), color='red')
            junit_test_case.add_failure_info(captured)
            raise TestCaseFailed(format_case_name(one_case))
        Utility.console_log('Success: ' + format_case_name(one_case), color='green')
        stage_signals.append('break')

    def remember_reset(data):
        """ store the matched banner; it is evaluated when the case is concluded """
        observed_resets.append(data[0])

    def on_summary(data):
        """ the result summary was printed, i.e. the stage ended without a reboot """
        if int(data[1]):
            # case ignored
            Utility.console_log('Ignored: ' + format_case_name(one_case), color='orange')
            junit_test_case.add_skipped_info('ignored')
        # only passed in last stage will be regarded as real pass
        if on_last_stage():
            conclude(not int(data[0]))
        else:
            Utility.console_log('test finished before enter last stage', color='orange')
            conclude(False)

    def on_reboot_done(data):
        """ reboot finished. we goto next stage """
        if on_last_stage():
            # already last stage, should never goto next stage
            Utility.console_log("didn't finish at last stage", color='orange')
            conclude(False)
        else:
            stage_signals.append('continue')

    for stage_index in range(one_case['child case num']):
        # select multi stage test case name
        dut.write("\"{}\"".format(one_case['name']))
        dut.expect('Running ' + one_case['name'] + '...')
        # select test function for current stage
        dut.write(str(stage_index + 1))

        # forget the outcome of the previous stage
        del stage_signals[:]

        timeout_value = one_case['timeout']
        while not stage_signals:
            try:
                dut.expect_any((RESET_PATTERN, remember_reset),
                               (EXCEPTION_PATTERN, remember_reset),
                               (ABORT_PATTERN, remember_reset),
                               (FINISH_PATTERN, on_summary),
                               (UT_APP_BOOT_UP_DONE, on_reboot_done),
                               timeout=timeout_value)
            except DUT.ExpectTimeout:
                Utility.console_log('Timeout in expect (%s seconds)' % timeout_value, color='orange')
                # always raises TestCaseFailed, which also leaves both loops
                conclude(False)

        if stage_signals[0] == 'break':
            # test breaks on current stage
            break
 | |
| 
 | |
| 
 | |
@ttfw_idf.idf_unit_test(env_tag='UT_T1_1', junit_report_by_case=True)
def run_multiple_stage_cases(env, extra_data):
    """
    Run multi-stage unit test cases, one DUT per unit-test-app config.

    extra_data can be given in two forms:

    1. a dict: mandatory keys are "name" and "child case num", optional keys are "reset" and others
    2. a list of string or dict:
               [case1, case2, case3, {"name": "restart from PRO CPU", "child case num": 2}, ...]

    A failing case does not stop the run: the failure is recorded and the next
    case is started.

    :param env: test env instance
    :param extra_data: the case name or case list or case dictionary
    :return: None
    """

    case_config = format_test_case_config(extra_data, env.default_dut_cls.TARGET)

    # names of the cases that failed or raised an unexpected exception
    # NOTE(review): the list is filled below but not read again inside this function
    failed_cases = []

    for ut_config, cases in case_config.items():
        Utility.console_log('Running unit test for config: ' + ut_config, 'O')
        dut = env.get_dut('unit-test-app', app_path=UT_APP_PATH, app_config_name=ut_config, allow_dut_exception=True)
        if cases:
            # the binary override is taken from the first case of the config
            replace_app_bin(dut, 'unit-test-app', cases[0].get('app_bin'))
        dut.start_app()

        for one_case in cases:
            log_test_case('multi-stage test', one_case, ut_config)
            # stays empty unless the case runs to completion
            performance_items = []
            junit_test_case = TinyFW.JunitReport.create_test_case(format_case_name(one_case))
            try:
                run_one_multiple_stage_case(dut, one_case, junit_test_case)
                performance_items = dut.get_performance_items()
            except TestCaseFailed:
                failed_cases.append(format_case_name(one_case))
            except Exception as e:
                handle_unexpected_exception(junit_test_case, e)
                failed_cases.append(format_case_name(one_case))
            finally:
                TinyFW.JunitReport.update_performance(performance_items)
                TinyFW.JunitReport.test_case_finish(junit_test_case)

        # all cases of this config are done, release its DUT
        env.close_dut(dut.name)
 | |
| 
 | |
| 
 | |
def detect_update_unit_test_info(env, extra_data, app_bin):
    """
    Read the test menu printed by the flashed unit-test-app and annotate ``extra_data`` in place.

    For every dictionary in ``extra_data`` whose "name" equals a menu entry:
    "type" is set, "timeout" is set unless the dictionary already has one, and
    "child case num" is set for entries for which a submenu was read.
    Only the first config returned by ``format_test_case_config`` is inspected.

    :param env: test env instance
    :param extra_data: iterable of case dictionaries, each one having a "name" key
    :param app_bin: application binary handed over to ``replace_app_bin`` before the app is started
    :return: None
    :raise ValueError: if the whole menu was read and a case of ``extra_data`` was not found in it
    """
    case_config = format_test_case_config(extra_data, env.default_dut_cls.TARGET)

    for ut_config in case_config:
        dut = env.get_dut('unit-test-app', app_path=UT_APP_PATH, app_config_name=ut_config)
        replace_app_bin(dut, 'unit-test-app', app_bin)
        dut.start_app()

        reset_dut(dut)

        # ask the app to print the list of test cases
        dut.write('')
        dut.expect("Here's the test menu, pick your combo:", timeout=DEFAULT_TIMEOUT)

        def record_case_info(case_name, case_type, case_timeout, child_case_num=None):
            # copy the detected properties into every requested case with this name
            for requested_case in extra_data:
                if requested_case['name'] != case_name:
                    continue
                requested_case['type'] = case_type
                # a timeout supplied by the caller wins over the one from the menu
                requested_case.setdefault('timeout', case_timeout)
                if child_case_num:
                    requested_case['child case num'] = child_case_num

        try:
            while True:
                menu_entry = dut.expect(TEST_PATTERN, timeout=DEFAULT_TIMEOUT)
                listed_name = menu_entry[1]
                tags = menu_entry[2]

                timeout_tag = re.search(r'\[timeout=(\d+)\]', tags)
                case_timeout = int(timeout_tag.group(1)) if timeout_tag else 30

                if re.search(r'\[multi_stage\]', tags):
                    case_type = MULTI_STAGE_ID
                elif re.search(r'\[multi_device\]', tags):
                    case_type = MULTI_DEVICE_ID
                else:
                    # no submenu is read for a simple case, go straight to the next menu entry
                    record_case_info(listed_name, SIMPLE_TEST_ID, case_timeout)
                    if menu_entry[3] and re.search(END_LIST_STR, menu_entry[3]):
                        break
                    continue

                # find the last submenu item
                submenu_entry = dut.expect(TEST_SUBMENU_PATTERN, timeout=DEFAULT_TIMEOUT)
                record_case_info(listed_name, case_type, case_timeout, child_case_num=int(submenu_entry[0]))
                if submenu_entry[1] and re.search(END_LIST_STR, submenu_entry[1]):
                    break

            # the whole menu was read: every requested case must have been found in it
            for requested_case in extra_data:
                if 'type' not in requested_case:
                    raise ValueError("Unit test \"{}\" doesn't exist in the flashed device!".format(requested_case.get('name')))
        except DUT.ExpectTimeout:
            Utility.console_log('Timeout during getting the test list', color='red')
        finally:
            dut.close()

        # These options are the same for all configs, therefore there is no need to continue
        break
 | |
| 
 | |
| 
 | |
def parse_test_specs(test_args, app_bin=None):
    """
    Convert the positional command line arguments into a list of test case dictionaries.

    Every argument is a comma separated list of ``<option>:<argument>`` items. An item without
    ``:`` is the test case name ("name" is the default option). The values of "timeout" and
    "child case num" are converted to int, all other values are kept as strings. Empty items
    are ignored.

    :param test_args: list of strings, one string per test case
    :param app_bin: value stored under the "app_bin" key of every resulting dictionary
    :return: list of dictionaries, one per element of ``test_args``
    :raise ValueError: if the value of "timeout" or "child case num" is not an integer
    """
    int_options = ('timeout', 'child case num')
    specs = []
    for test in test_args:
        spec = {}
        for item in test.split(','):
            if not item:
                continue
            option, separator, argument = item.partition(':')
            if not separator:
                # bare item without an option: it is the test case name
                spec['name'] = option
            elif option in int_options:
                spec[option] = int(argument)
            else:
                # also handles "name:<case name>": the value is the part after the colon,
                # not the option keyword itself
                spec[option] = argument
        spec['app_bin'] = app_bin
        specs.append(spec)
    return specs


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--repeat', '-r',
        help='Number of repetitions for the test(s). Default is 1.',
        type=int,
        default=1
    )
    parser.add_argument('--env_config_file', '-e',
                        help='test env config file',
                        default=None)
    parser.add_argument('--app_bin', '-b',
                        help='application binary file for flashing the chip',
                        default=None)
    parser.add_argument('test',
                        help='Comma separated list of <option>:<argument> where option can be "name" (default), '
                             '"child case num", "config", "timeout".',
                        nargs='+')
    args = parser.parse_args()
    list_of_dicts = parse_test_specs(args.test, args.app_bin)

    TinyFW.set_default_config(env_config_file=args.env_config_file)

    env_config = TinyFW.get_default_config()
    env_config['app'] = ttfw_idf.UT
    env_config['dut'] = ttfw_idf.IDFDUT
    env_config['test_suite_name'] = 'unit_test_parsing'
    test_env = Env.Env(**env_config)
    # fills in "type", "timeout" and "child case num" of the requested cases from the device menu
    detect_update_unit_test_info(test_env, extra_data=list_of_dicts, app_bin=args.app_bin)

    for index in range(1, args.repeat + 1):
        if args.repeat > 1:
            Utility.console_log('Repetition {}'.format(index), color='green')
        for dic in list_of_dicts:
            # cases whose type was not detected are run as simple cases
            t = dic.get('type', SIMPLE_TEST_ID)
            if t == SIMPLE_TEST_ID:
                run_unit_test_cases(extra_data=dic)
            elif t == MULTI_STAGE_ID:
                run_multiple_stage_cases(extra_data=dic)
            elif t == MULTI_DEVICE_ID:
                run_multiple_devices_cases(extra_data=dic)
            else:
                raise ValueError('Unknown type {} of {}'.format(t, dic.get('name')))
 | 
