mirror of
https://github.com/espressif/esp-idf.git
synced 2025-08-09 20:41:14 +00:00
CI: download only required bin for unit-tests. Refactor AssignTest related code
This commit is contained in:
317
tools/ci/python_packages/ttfw_idf/IDFAssignTest.py
Normal file
317
tools/ci/python_packages/ttfw_idf/IDFAssignTest.py
Normal file
@@ -0,0 +1,317 @@
|
||||
"""
|
||||
Command line tool to assign tests to CI test jobs.
|
||||
"""
|
||||
import argparse
|
||||
import errno
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
|
||||
import yaml
|
||||
|
||||
try:
|
||||
from yaml import CLoader as Loader
|
||||
except ImportError:
|
||||
from yaml import Loader as Loader
|
||||
|
||||
import gitlab_api
|
||||
from tiny_test_fw.Utility import CIAssignTest
|
||||
|
||||
# ESP-IDF checkout root taken from the environment; None when IDF_PATH is not exported.
IDF_PATH_FROM_ENV = os.getenv("IDF_PATH")
|
||||
|
||||
|
||||
class IDFCaseGroup(CIAssignTest.Group):
    """Base CI case group for IDF tests.

    Subclasses must define ``LOCAL_BUILD_DIR`` (where build artifacts live,
    relative to the IDF root) and ``BUILD_JOB_NAMES`` (the CI build jobs
    producing those artifacts).
    """

    LOCAL_BUILD_DIR = None
    BUILD_JOB_NAMES = None

    @classmethod
    def get_artifact_index_file(cls):
        """Return the path of this group's ``artifact_index.json``.

        Resolved under ``$IDF_PATH/<LOCAL_BUILD_DIR>`` when IDF_PATH is set,
        otherwise relative to the current working directory.
        """
        assert cls.LOCAL_BUILD_DIR
        if not IDF_PATH_FROM_ENV:
            return "artifact_index.json"
        return os.path.join(IDF_PATH_FROM_ENV, cls.LOCAL_BUILD_DIR, "artifact_index.json")
|
||||
|
||||
|
||||
class IDFAssignTest(CIAssignTest.AssignTest):
    """Assigns IDF test cases to CI jobs and records where each case's
    binaries were built (the "artifact index")."""

    def format_build_log_path(self, parallel_num):
        """Return the artifact-relative path of the build list produced by
        parallel build job number ``parallel_num``."""
        return "{}/list_job_{}.json".format(self.case_group.LOCAL_BUILD_DIR, parallel_num)

    def create_artifact_index_file(self, project_id=None, pipeline_id=None):
        """Collect build info from all build jobs of this pipeline and write
        it to the group's ``artifact_index.json``.

        :param project_id: GitLab project id; defaults to ``$CI_PROJECT_ID``
        :param pipeline_id: pipeline to query; defaults to ``$CI_PIPELINE_ID``
        """
        if project_id is None:
            project_id = os.getenv("CI_PROJECT_ID")
        if pipeline_id is None:
            pipeline_id = os.getenv("CI_PIPELINE_ID")
        gitlab_inst = gitlab_api.Gitlab(project_id)

        artifact_index_list = []
        for build_job_name in self.case_group.BUILD_JOB_NAMES:
            job_info_list = gitlab_inst.find_job_id(build_job_name, pipeline_id=pipeline_id)
            for job_info in job_info_list:
                parallel_num = job_info["parallel_num"] or 1  # Could be None if "parallel_num" not defined for the job
                raw_data = gitlab_inst.download_artifact(job_info["id"],
                                                         [self.format_build_log_path(parallel_num)])[0]
                # Each line of the downloaded build list is one JSON object.
                build_info_list = [json.loads(line) for line in raw_data.decode().splitlines()]
                for build_info in build_info_list:
                    build_info["ci_job_id"] = job_info["id"]
                    artifact_index_list.append(build_info)
        artifact_index_file = self.case_group.get_artifact_index_file()
        # BUGFIX: when IDF_PATH is unset the index file has no directory part,
        # os.path.dirname() returns "" and os.makedirs("") raises
        # OSError(ENOENT), which the EEXIST guard re-raised. Only create the
        # directory when there actually is one.
        index_dir = os.path.dirname(artifact_index_file)
        if index_dir:
            try:
                os.makedirs(index_dir)
            except OSError as e:
                if e.errno != errno.EEXIST:
                    raise e

        with open(artifact_index_file, "w") as f:
            json.dump(artifact_index_list, f)
|
||||
|
||||
|
||||
# Chip targets for which CI build jobs exist; build job name lists below are
# generated from this.
SUPPORTED_TARGETS = [
    'esp32',
    'esp32s2',
]
|
||||
|
||||
|
||||
class ExampleGroup(IDFCaseGroup):
    """Case group for example tests.

    Cases are sorted and matched to CI jobs by (env_tag, target).
    """
    SORT_KEYS = CI_JOB_MATCH_KEYS = ["env_tag", "target"]

    LOCAL_BUILD_DIR = "build_examples"
    BUILD_JOB_NAMES = ["build_examples_cmake_{}".format(target) for target in SUPPORTED_TARGETS]
|
||||
|
||||
|
||||
class TestAppsGroup(ExampleGroup):
    """Case group for custom test apps; same sort/match keys as examples,
    but with its own build directory and build jobs."""
    LOCAL_BUILD_DIR = "build_test_apps"
    BUILD_JOB_NAMES = ["build_test_apps_{}".format(target) for target in SUPPORTED_TARGETS]
|
||||
|
||||
|
||||
class UnitTestGroup(IDFCaseGroup):
    """Case group for unit tests.

    Cases are sorted by test environment, tags and chip target; CI jobs are
    matched by test environment plus all tags of the seed case (see
    ``__init__``).
    """
    SORT_KEYS = ["test environment", "tags", "chip_target"]
    CI_JOB_MATCH_KEYS = ["test environment"]

    LOCAL_BUILD_DIR = "tools/unit-test-app/builds"
    BUILD_JOB_NAMES = ["build_esp_idf_tests_cmake_{}".format(target) for target in SUPPORTED_TARGETS]

    # Maximum number of cases assigned to one job of this group.
    MAX_CASE = 50
    # Maps attribute names used in code to the key names used in case dicts.
    ATTR_CONVERT_TABLE = {
        "execution_time": "execution time"
    }
    # Maps a chip target to the ttfw_idf DUT class name used to drive it.
    DUT_CLS_NAME = {
        "esp32": "ESP32DUT",
        "esp32s2": "ESP32S2DUT",
        "esp8266": "ESP8266DUT",
    }

    def __init__(self, case):
        """Create a group seeded with ``case``; a matching CI job must
        provide every tag of the seed case, not only the match keys."""
        super(UnitTestGroup, self).__init__(case)
        for tag in self._get_case_attr(case, "tags"):
            # ci_job_match_keys is built by the parent class from
            # CI_JOB_MATCH_KEYS -- presumably a set; verify in CIAssignTest.
            self.ci_job_match_keys.add(tag)

    @staticmethod
    def _get_case_attr(case, attr):
        """Fetch an attribute from a case dict, translating renamed keys
        via ATTR_CONVERT_TABLE. Raises KeyError if the case lacks the key."""
        if attr in UnitTestGroup.ATTR_CONVERT_TABLE:
            attr = UnitTestGroup.ATTR_CONVERT_TABLE[attr]
        return case[attr]

    def add_extra_case(self, case):
        """ If current group contains all tags required by case, then add succeed """
        added = False
        if self.accept_new_case():
            for key in self.filters:
                if self._get_case_attr(case, key) != self.filters[key]:
                    if key == "tags":
                        # A case whose tags are a subset of the group's tags
                        # can still run in this group's environment.
                        if set(self._get_case_attr(case, key)).issubset(set(self.filters[key])):
                            continue
                    # Mismatch on any other filter key: case doesn't fit here.
                    break
            else:
                # No mismatch found -- accept the case into this group.
                self.case_list.append(case)
                added = True
        return added

    def _create_extra_data(self, test_cases, test_function):
        """
        For unit test case, we need to copy some attributes of test cases into config file.
        So unit test function knows how to run the case.
        """
        case_data = []
        for case in test_cases:
            one_case_data = {
                "config": self._get_case_attr(case, "config"),
                "name": self._get_case_attr(case, "summary"),
                "reset": self._get_case_attr(case, "reset"),
                "timeout": self._get_case_attr(case, "timeout"),
            }

            # Multi-device/multi-stage runners additionally need the number of
            # child cases; its absence means the case definition is malformed.
            if test_function in ["run_multiple_devices_cases", "run_multiple_stage_cases"]:
                try:
                    one_case_data["child case num"] = self._get_case_attr(case, "child case num")
                except KeyError as e:
                    print("multiple devices/stages cases must contains at least two test functions")
                    print("case name: {}".format(one_case_data["name"]))
                    raise e

            case_data.append(one_case_data)
        return case_data

    def _divide_case_by_test_function(self):
        """
        divide cases of current test group by test function they need to use

        :return: dict of list of cases for each test functions
        """
        case_by_test_function = {
            "run_multiple_devices_cases": [],
            "run_multiple_stage_cases": [],
            "run_unit_test_cases": [],
        }

        for case in self.case_list:
            # "multi_device" wins over "multi_stage" if a case sets both.
            if case["multi_device"] == "Yes":
                case_by_test_function["run_multiple_devices_cases"].append(case)
            elif case["multi_stage"] == "Yes":
                case_by_test_function["run_multiple_stage_cases"].append(case)
            else:
                case_by_test_function["run_unit_test_cases"].append(case)
        return case_by_test_function

    def output(self):
        """
        output data for job configs

        :return: {"Filter": case filter, "CaseConfig": list of case configs for cases in this group}
        """

        # All cases in a group share one chip target (it is a SORT_KEY),
        # so the first case determines the DUT class for the whole group.
        target = self._get_case_attr(self.case_list[0], "chip_target")
        if target:
            overwrite = {
                "dut": {
                    "package": "ttfw_idf",
                    "class": self.DUT_CLS_NAME[target],
                }
            }
        else:
            overwrite = dict()

        case_by_test_function = self._divide_case_by_test_function()

        output_data = {
            # we don't need filter for test function, as UT uses a few test functions for all cases
            "CaseConfig": [
                {
                    "name": test_function,
                    "extra_data": self._create_extra_data(test_cases, test_function),
                    "overwrite": overwrite,
                } for test_function, test_cases in case_by_test_function.items() if test_cases
            ],
        }
        return output_data
|
||||
|
||||
|
||||
class ExampleAssignTest(IDFAssignTest):
    """Assigns example tests to CI jobs named ``example_test_*``."""
    CI_TEST_JOB_PATTERN = re.compile(r'^example_test_.+')

    # NOTE(review): "est_case_path" looks like a typo of "test_case_path";
    # callers here pass it positionally, so a rename would be safe -- confirm.
    def __init__(self, est_case_path, ci_config_file):
        super(ExampleAssignTest, self).__init__(est_case_path, ci_config_file, case_group=ExampleGroup)
|
||||
|
||||
|
||||
class TestAppsAssignTest(IDFAssignTest):
    """Assigns custom test apps to CI jobs named ``test_app_test_*``."""
    CI_TEST_JOB_PATTERN = re.compile(r'^test_app_test_.+')

    # NOTE(review): "est_case_path" looks like a typo of "test_case_path" -- confirm.
    def __init__(self, est_case_path, ci_config_file):
        super(TestAppsAssignTest, self).__init__(est_case_path, ci_config_file, case_group=TestAppsGroup)
|
||||
|
||||
|
||||
class UnitTestAssignTest(IDFAssignTest):
    """Assigns unit tests to CI jobs named ``UT_*``. Unlike the other
    assigners, cases come from pre-generated YAML files, not from scanning
    Python test scripts."""
    CI_TEST_JOB_PATTERN = re.compile(r'^UT_.+')

    def __init__(self, est_case_path, ci_config_file):
        super(UnitTestAssignTest, self).__init__(est_case_path, ci_config_file, case_group=UnitTestGroup)

    def search_cases(self, case_filter=None):
        """
        For unit test case, we don't search for test functions.
        The unit test cases is stored in a yaml file which is created in job build-idf-test.

        :param case_filter: optional dict of lower-case key -> accepted values;
                            a case passes if its (lower-cased, for strings)
                            value is contained in the accepted values
        :return: list of case dicts, sorted for job-assignment locality
        """

        def find_by_suffix(suffix, path):
            # Recursively collect all files under `path` ending with `suffix`.
            res = []
            for root, _, files in os.walk(path):
                for file in files:
                    if file.endswith(suffix):
                        res.append(os.path.join(root, file))
            return res

        def get_test_cases_from_yml(yml_file):
            # Load the "test cases" list from one YAML file; unreadable or
            # malformed files contribute no cases (deliberate best-effort).
            try:
                with open(yml_file) as fr:
                    raw_data = yaml.load(fr, Loader=Loader)
                test_cases = raw_data['test cases']
            except (IOError, KeyError):
                return []
            else:
                return test_cases

        # self.test_case_path is set by the parent AssignTest class
        # (from the constructor's first argument).
        test_cases = []
        if os.path.isdir(self.test_case_path):
            for yml_file in find_by_suffix('.yml', self.test_case_path):
                test_cases.extend(get_test_cases_from_yml(yml_file))
        elif os.path.isfile(self.test_case_path):
            test_cases.extend(get_test_cases_from_yml(self.test_case_path))
        else:
            print("Test case path is invalid. Should only happen when use @bot to skip unit test.")

        # filter keys are lower case. Do map lower case keys with original keys.
        try:
            key_mapping = {x.lower(): x for x in test_cases[0].keys()}
        except IndexError:
            # No cases were found; nothing to map.
            key_mapping = dict()
        if case_filter:
            for key in case_filter:
                filtered_cases = []
                for case in test_cases:
                    try:
                        mapped_key = key_mapping[key]
                        # bot converts string to lower case
                        if isinstance(case[mapped_key], str):
                            _value = case[mapped_key].lower()
                        else:
                            _value = case[mapped_key]
                        if _value in case_filter[key]:
                            filtered_cases.append(case)
                    except KeyError:
                        # case don't have this key, regard as filter success
                        filtered_cases.append(case)
                test_cases = filtered_cases
        # sort cases with configs and test functions
        # in later stage cases with similar attributes are more likely to be assigned to the same job
        # it will reduce the count of flash DUT operations
        test_cases.sort(key=lambda x: x["config"] + x["multi_stage"] + x["multi_device"])
        return test_cases
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # CLI entry point: pick the assigner for the requested case group, assign
    # cases to CI jobs, write per-job configs, and build the artifact index.
    parser = argparse.ArgumentParser()
    parser.add_argument("case_group", choices=["example_test", "custom_test", "unit_test"])
    parser.add_argument("test_case", help="test case folder or file")
    parser.add_argument("ci_config_file", help="gitlab ci config file")
    parser.add_argument("output_path", help="output path of config files")
    parser.add_argument("--pipeline_id", "-p", type=int, default=None, help="pipeline_id")
    parser.add_argument("--test-case-file-pattern", help="file name pattern used to find Python test case files")
    args = parser.parse_args()

    args_list = [args.test_case, args.ci_config_file]
    if args.case_group == 'example_test':
        assigner = ExampleAssignTest(*args_list)
    elif args.case_group == 'custom_test':
        assigner = TestAppsAssignTest(*args_list)
    elif args.case_group == 'unit_test':
        assigner = UnitTestAssignTest(*args_list)
    else:
        raise SystemExit(1)  # which is impossible

    # Allow overriding the CI job pattern from the command line.
    if args.test_case_file_pattern:
        assigner.CI_TEST_JOB_PATTERN = re.compile(r'{}'.format(args.test_case_file_pattern))

    assigner.assign_cases()
    assigner.output_configs(args.output_path)
    # BUGFIX: --pipeline_id was parsed but never used, so the index was always
    # built from $CI_PIPELINE_ID. Forward it explicitly; the default None
    # preserves the old environment-variable fallback.
    assigner.create_artifact_index_file(pipeline_id=args.pipeline_id)
|
Reference in New Issue
Block a user