tiny-test-fw: move to tools/esp_python_packages:

make `tiny_test_fw` as a package and move to root path of idf python
packages
This commit is contained in:
He Yin Ling
2019-11-27 11:21:33 +08:00
parent f5e60524ac
commit d621d0e88e
29 changed files with 37 additions and 43 deletions

View File

@@ -0,0 +1,98 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http:#www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
class for handling Test Apps. Currently it provides the following features:
1. get SDK path
2. get SDK tools
3. parse application info from its path. for example:
* provide download info
* provide partition table info
Test Apps should inherent from BaseApp class and overwrite the methods.
"""
import os
import sys
import time
# timestamp used for calculate log folder name
LOG_FOLDER_TIMESTAMP = time.time()
class BaseApp(object):
"""
Base Class for App.
Defines the mandatory methods that App need to implement.
Also implements some common methods.
:param app_path: the path for app.
:param config_name: app configuration to be tested
:param target: build target
"""
def __init__(self, app_path, config_name=None, target=None):
pass
@classmethod
def get_sdk_path(cls):
"""
get sdk path.
subclass must overwrite this method.
:return: abs sdk path
"""
pass
@classmethod
def get_tools(cls):
"""
get SDK related tools for applications
subclass must overwrite this method.
:return: tuple, abs path of each tool
"""
pass
@classmethod
def get_log_folder(cls, test_suite_name):
"""
By default log folder is ``${SDK_PATH}/TEST_LOGS/${test_suite_name}_${timestamp}``.
The log folder name is consist once start running, ensure all logs of will be put into the same folder.
:param test_suite_name: the test suite name, by default it's the base file name for main module
:return: the log folder path
"""
if not test_suite_name:
test_suite_name = os.path.splitext(os.path.basename(sys.modules['__main__'].__file__))[0]
sdk_path = cls.get_sdk_path()
log_folder = os.path.join(sdk_path, "TEST_LOGS",
test_suite_name +
time.strftime("_%m%d_%H_%M_%S", time.localtime(LOG_FOLDER_TIMESTAMP)))
if not os.path.exists(log_folder):
os.makedirs(log_folder)
return log_folder
def process_app_info(self):
"""
parse built app info for DUTTool
subclass must overwrite this method.
:return: required info for specific DUTTool
"""
pass

View File

@@ -0,0 +1,787 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http:#www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
DUT provides 3 major groups of features:
* DUT port feature, provide basic open/close/read/write features
* DUT tools, provide extra methods to control the device, like download and start app
* DUT expect method, provide features for users to check DUT outputs
The current design of DUT have 3 classes for one DUT: BaseDUT, DUTPort, DUTTool.
* BaseDUT class:
* defines methods DUT port and DUT tool need to overwrite
* provide the expect methods and some other methods based on DUTPort
* DUTPort class:
* inherent from BaseDUT class
* implements the port features by overwriting port methods defined in BaseDUT
* DUTTool class:
* inherent from one of the DUTPort class
* implements the tools features by overwriting tool methods defined in BaseDUT
* could add some new methods provided by the tool
This module implements the BaseDUT class and one of the port class SerialDUT.
User should implement their DUTTool classes.
If they using different port then need to implement their DUTPort class as well.
"""
from __future__ import print_function
import time
import re
import threading
import copy
import functools
# python2 and python3 queue package name is different
try:
import Queue as _queue
except ImportError:
import queue as _queue
import serial
from serial.tools import list_ports
import Utility
class ExpectTimeout(ValueError):
""" timeout for expect method """
pass
class UnsupportedExpectItem(ValueError):
""" expect item not supported by the expect method """
pass
def _expect_lock(func):
@functools.wraps(func)
def handler(self, *args, **kwargs):
with self.expect_lock:
ret = func(self, *args, **kwargs)
return ret
return handler
def _decode_data(data):
""" for python3, if the data is bytes, then decode it to string """
if isinstance(data, bytes):
# convert bytes to string
try:
data = data.decode("utf-8", "ignore")
except UnicodeDecodeError:
data = data.decode("iso8859-1", )
return data
def _pattern_to_string(pattern):
try:
ret = "RegEx: " + pattern.pattern
except AttributeError:
ret = pattern
return ret
class _DataCache(_queue.Queue):
"""
Data cache based on Queue. Allow users to process data cache based on bytes instead of Queue."
"""
def __init__(self, maxsize=0):
_queue.Queue.__init__(self, maxsize=maxsize)
self.data_cache = str()
def _move_from_queue_to_cache(self):
"""
move all of the available data in the queue to cache
:return: True if moved any item from queue to data cache, else False
"""
ret = False
while True:
try:
self.data_cache += _decode_data(self.get(0))
ret = True
except _queue.Empty:
break
return ret
def get_data(self, timeout=0.0):
"""
get a copy of data from cache.
:param timeout: timeout for waiting new queue item
:return: copy of data cache
"""
# make sure timeout is non-negative
if timeout < 0:
timeout = 0
ret = self._move_from_queue_to_cache()
if not ret:
# we only wait for new data if we can't provide a new data_cache
try:
data = self.get(timeout=timeout)
self.data_cache += _decode_data(data)
except _queue.Empty:
# don't do anything when on update for cache
pass
return copy.deepcopy(self.data_cache)
def flush(self, index=0xFFFFFFFF):
"""
flush data from cache.
:param index: if < 0 then don't do flush, otherwise flush data before index
:return: None
"""
# first add data in queue to cache
self.get_data()
if index > 0:
self.data_cache = self.data_cache[index:]
class _LogThread(threading.Thread, _queue.Queue):
"""
We found some SD card on Raspberry Pi could have very bad performance.
It could take seconds to save small amount of data.
If the DUT receives data and save it as log, then it stops receiving data until log is saved.
This could lead to expect timeout.
As an workaround to this issue, ``BaseDUT`` class will create a thread to save logs.
Then data will be passed to ``expect`` as soon as received.
"""
def __init__(self):
threading.Thread.__init__(self, name="LogThread")
_queue.Queue.__init__(self, maxsize=0)
self.setDaemon(True)
self.flush_lock = threading.Lock()
def save_log(self, filename, data):
"""
:param filename: log file name
:param data: log data. Must be ``bytes``.
"""
self.put({"filename": filename, "data": data})
def flush_data(self):
with self.flush_lock:
data_cache = dict()
while True:
# move all data from queue to data cache
try:
log = self.get_nowait()
try:
data_cache[log["filename"]] += log["data"]
except KeyError:
data_cache[log["filename"]] = log["data"]
except _queue.Empty:
break
# flush data
for filename in data_cache:
with open(filename, "ab+") as f:
f.write(data_cache[filename])
def run(self):
while True:
time.sleep(1)
self.flush_data()
class RecvThread(threading.Thread):
CHECK_FUNCTIONS = []
""" DUT subclass can define a few check functions to process received data. """
def __init__(self, read, dut):
super(RecvThread, self).__init__()
self.exit_event = threading.Event()
self.setDaemon(True)
self.read = read
self.dut = dut
self.data_cache = dut.data_cache
self.recorded_data = dut.recorded_data
self.record_data_lock = dut.record_data_lock
self._line_cache = str()
def _line_completion(self, data):
"""
Usually check functions requires to check for one complete line.
This method will do line completion for the first line, and strip incomplete last line.
"""
ret = self._line_cache
decoded_data = _decode_data(data)
# cache incomplete line to later process
lines = decoded_data.splitlines(True)
last_line = lines[-1]
if last_line[-1] != "\n":
if len(lines) == 1:
# only one line and the line is not finished, then append this to cache
self._line_cache += lines[-1]
ret = str()
else:
# more than one line and not finished, replace line cache
self._line_cache = lines[-1]
ret += "".join(lines[:-1])
else:
# line finishes, flush cache
self._line_cache = str()
ret += decoded_data
return ret
def run(self):
while not self.exit_event.isSet():
raw_data = self.read(1000)
if raw_data:
with self.record_data_lock:
self.data_cache.put(raw_data)
for capture_id in self.recorded_data:
self.recorded_data[capture_id].put(raw_data)
# we need to do line completion before call check functions
comp_data = self._line_completion(raw_data)
for check_function in self.CHECK_FUNCTIONS:
check_function(self, comp_data)
def exit(self):
self.exit_event.set()
self.join()
class BaseDUT(object):
"""
:param name: application defined name for port
:param port: comport name, used to create DUT port
:param log_file: log file name
:param app: test app instance
:param kwargs: extra args for DUT to create ports
"""
DEFAULT_EXPECT_TIMEOUT = 10
MAX_EXPECT_FAILURES_TO_SAVED = 10
RECV_THREAD_CLS = RecvThread
TARGET = None
""" DUT subclass can specify RECV_THREAD_CLS to do add some extra stuff when receive data.
For example, DUT can implement exception detect & analysis logic in receive thread subclass. """
LOG_THREAD = _LogThread()
LOG_THREAD.start()
def __init__(self, name, port, log_file, app, **kwargs):
self.expect_lock = threading.Lock()
self.name = name
self.port = port
self.log_file = log_file
self.app = app
self.data_cache = _DataCache()
# the main process of recorded data are done in receive thread
# but receive thread could be closed in DUT lifetime (tool methods)
# so we keep it in BaseDUT, as their life cycle are same
self.recorded_data = dict()
self.record_data_lock = threading.RLock()
self.receive_thread = None
self.expect_failures = []
self._port_open()
self.start_receive()
def __str__(self):
return "DUT({}: {})".format(self.name, str(self.port))
def _save_expect_failure(self, pattern, data, start_time):
"""
Save expect failure. If the test fails, then it will print the expect failures.
In some cases, user will handle expect exceptions.
The expect failures could be false alarm, and test case might generate a lot of such failures.
Therefore, we don't print the failure immediately and limit the max size of failure list.
"""
self.expect_failures.insert(0, {"pattern": pattern, "data": data,
"start": start_time, "end": time.time()})
self.expect_failures = self.expect_failures[:self.MAX_EXPECT_FAILURES_TO_SAVED]
def _save_dut_log(self, data):
"""
Save DUT log into file using another thread.
This is a workaround for some devices takes long time for file system operations.
See descriptions in ``_LogThread`` for details.
"""
self.LOG_THREAD.save_log(self.log_file, data)
# define for methods need to be overwritten by Port
@classmethod
def list_available_ports(cls):
"""
list all available ports.
subclass (port) must overwrite this method.
:return: list of available comports
"""
pass
def _port_open(self):
"""
open the port.
subclass (port) must overwrite this method.
:return: None
"""
pass
def _port_read(self, size=1):
"""
read form port. This method should not blocking for long time, otherwise receive thread can not exit.
subclass (port) must overwrite this method.
:param size: max size to read.
:return: read data.
"""
pass
def _port_write(self, data):
"""
write to port.
subclass (port) must overwrite this method.
:param data: data to write
:return: None
"""
pass
def _port_close(self):
"""
close port.
subclass (port) must overwrite this method.
:return: None
"""
pass
# methods that need to be overwritten by Tool
@classmethod
def confirm_dut(cls, port, **kwargs):
"""
confirm if it's a DUT, usually used by auto detecting DUT in by Env config.
subclass (tool) must overwrite this method.
:param port: comport
:return: tuple of result (bool), and target (str)
"""
pass
def start_app(self):
"""
usually after we got DUT, we need to do some extra works to let App start.
For example, we need to reset->download->reset to let IDF application start on DUT.
subclass (tool) must overwrite this method.
:return: None
"""
pass
# methods that features raw port methods
def start_receive(self):
"""
Start thread to receive data.
:return: None
"""
self.receive_thread = self.RECV_THREAD_CLS(self._port_read, self)
self.receive_thread.start()
def stop_receive(self):
"""
stop the receiving thread for the port
:return: None
"""
if self.receive_thread:
self.receive_thread.exit()
self.LOG_THREAD.flush_data()
self.receive_thread = None
def close(self):
"""
permanently close the port
"""
self.stop_receive()
self._port_close()
@staticmethod
def u_to_bytearray(data):
"""
if data is not bytearray then it tries to convert it
:param data: data which needs to be checked and maybe transformed
"""
if isinstance(data, type(u'')):
try:
data = data.encode('utf-8')
except Exception as e:
print(u'Cannot encode {} of type {}'.format(data, type(data)))
raise e
return data
def write(self, data, eol="\r\n", flush=True):
"""
:param data: data
:param eol: end of line pattern.
:param flush: if need to flush received data cache before write data.
usually we need to flush data before write,
make sure processing outputs generated by wrote.
:return: None
"""
# do flush before write
if flush:
self.data_cache.flush()
# do write if cache
if data is not None:
self._port_write(self.u_to_bytearray(data) + self.u_to_bytearray(eol) if eol else self.u_to_bytearray(data))
@_expect_lock
def read(self, size=0xFFFFFFFF):
"""
read(size=0xFFFFFFFF)
read raw data. NOT suggested to use this method.
Only use it if expect method doesn't meet your requirement.
:param size: read size. default read all data
:return: read data
"""
data = self.data_cache.get_data(0)[:size]
self.data_cache.flush(size)
return data
def start_capture_raw_data(self, capture_id="default"):
"""
Sometime application want to get DUT raw data and use ``expect`` method at the same time.
Capture methods provides a way to get raw data without affecting ``expect`` or ``read`` method.
If you call ``start_capture_raw_data`` with same capture id again, it will restart capture on this ID.
:param capture_id: ID of capture. You can use different IDs to do different captures at the same time.
"""
with self.record_data_lock:
try:
# if start capture on existed ID, we do flush data and restart capture
self.recorded_data[capture_id].flush()
except KeyError:
# otherwise, create new data cache
self.recorded_data[capture_id] = _DataCache()
def stop_capture_raw_data(self, capture_id="default"):
"""
Stop capture and get raw data.
This method should be used after ``start_capture_raw_data`` on the same capture ID.
:param capture_id: ID of capture.
:return: captured raw data between start capture and stop capture.
"""
with self.record_data_lock:
try:
ret = self.recorded_data[capture_id].get_data()
self.recorded_data.pop(capture_id)
except KeyError as e:
e.message = "capture_id does not exist. " \
"You should call start_capture_raw_data with same ID " \
"before calling stop_capture_raw_data"
raise e
return ret
# expect related methods
@staticmethod
def _expect_str(data, pattern):
"""
protected method. check if string is matched in data cache.
:param data: data to process
:param pattern: string
:return: pattern if match succeed otherwise None
"""
index = data.find(pattern)
if index != -1:
ret = pattern
index += len(pattern)
else:
ret = None
return ret, index
@staticmethod
def _expect_re(data, pattern):
"""
protected method. check if re pattern is matched in data cache
:param data: data to process
:param pattern: compiled RegEx pattern
:return: match groups if match succeed otherwise None
"""
ret = None
if isinstance(pattern.pattern, type(u'')):
pattern = re.compile(BaseDUT.u_to_bytearray(pattern.pattern))
if isinstance(data, type(u'')):
data = BaseDUT.u_to_bytearray(data)
match = pattern.search(data)
if match:
ret = tuple(None if x is None else x.decode() for x in match.groups())
index = match.end()
else:
index = -1
return ret, index
EXPECT_METHOD = [
[type(re.compile("")), "_expect_re"],
[type(b''), "_expect_str"], # Python 2 & 3 hook to work without 'from builtins import str' from future
[type(u''), "_expect_str"],
]
def _get_expect_method(self, pattern):
"""
protected method. get expect method according to pattern type.
:param pattern: expect pattern, string or compiled RegEx
:return: ``_expect_str`` or ``_expect_re``
"""
for expect_method in self.EXPECT_METHOD:
if isinstance(pattern, expect_method[0]):
method = expect_method[1]
break
else:
raise UnsupportedExpectItem()
return self.__getattribute__(method)
@_expect_lock
def expect(self, pattern, timeout=DEFAULT_EXPECT_TIMEOUT):
"""
expect(pattern, timeout=DEFAULT_EXPECT_TIMEOUT)
expect received data on DUT match the pattern. will raise exception when expect timeout.
:raise ExpectTimeout: failed to find the pattern before timeout
:raise UnsupportedExpectItem: pattern is not string or compiled RegEx
:param pattern: string or compiled RegEx(string pattern)
:param timeout: timeout for expect
:return: string if pattern is string; matched groups if pattern is RegEx
"""
method = self._get_expect_method(pattern)
# non-blocking get data for first time
data = self.data_cache.get_data(0)
start_time = time.time()
while True:
ret, index = method(data, pattern)
if ret is not None:
self.data_cache.flush(index)
break
time_remaining = start_time + timeout - time.time()
if time_remaining < 0:
break
# wait for new data from cache
data = self.data_cache.get_data(time_remaining)
if ret is None:
pattern = _pattern_to_string(pattern)
self._save_expect_failure(pattern, data, start_time)
raise ExpectTimeout(self.name + ": " + pattern)
return ret
def _expect_multi(self, expect_all, expect_item_list, timeout):
"""
protected method. internal logical for expect multi.
:param expect_all: True or False, expect all items in the list or any in the list
:param expect_item_list: expect item list
:param timeout: timeout
:return: None
"""
def process_expected_item(item_raw):
# convert item raw data to standard dict
item = {
"pattern": item_raw[0] if isinstance(item_raw, tuple) else item_raw,
"method": self._get_expect_method(item_raw[0] if isinstance(item_raw, tuple)
else item_raw),
"callback": item_raw[1] if isinstance(item_raw, tuple) else None,
"index": -1,
"ret": None,
}
return item
expect_items = [process_expected_item(x) for x in expect_item_list]
# non-blocking get data for first time
data = self.data_cache.get_data(0)
start_time = time.time()
matched_expect_items = list()
while True:
for expect_item in expect_items:
if expect_item not in matched_expect_items:
# exclude those already matched
expect_item["ret"], expect_item["index"] = \
expect_item["method"](data, expect_item["pattern"])
if expect_item["ret"] is not None:
# match succeed for one item
matched_expect_items.append(expect_item)
# if expect all, then all items need to be matched,
# else only one item need to matched
if expect_all:
match_succeed = len(matched_expect_items) == len(expect_items)
else:
match_succeed = True if matched_expect_items else False
time_remaining = start_time + timeout - time.time()
if time_remaining < 0 or match_succeed:
break
else:
data = self.data_cache.get_data(time_remaining)
if match_succeed:
# sort matched items according to order of appearance in the input data,
# so that the callbacks are invoked in correct order
matched_expect_items = sorted(matched_expect_items, key=lambda it: it["index"])
# invoke callbacks and flush matched data cache
slice_index = -1
for expect_item in matched_expect_items:
# trigger callback
if expect_item["callback"]:
expect_item["callback"](expect_item["ret"])
slice_index = max(slice_index, expect_item["index"])
# flush already matched data
self.data_cache.flush(slice_index)
else:
pattern = str([_pattern_to_string(x["pattern"]) for x in expect_items])
self._save_expect_failure(pattern, data, start_time)
raise ExpectTimeout(self.name + ": " + pattern)
@_expect_lock
def expect_any(self, *expect_items, **timeout):
"""
expect_any(*expect_items, timeout=DEFAULT_TIMEOUT)
expect any of the patterns.
will call callback (if provided) if pattern match succeed and then return.
will pass match result to the callback.
:raise ExpectTimeout: failed to match any one of the expect items before timeout
:raise UnsupportedExpectItem: pattern in expect_item is not string or compiled RegEx
:arg expect_items: one or more expect items.
string, compiled RegEx pattern or (string or RegEx(string pattern), callback)
:keyword timeout: timeout for expect
:return: None
"""
# to be compatible with python2
# in python3 we can write f(self, *expect_items, timeout=DEFAULT_TIMEOUT)
if "timeout" not in timeout:
timeout["timeout"] = self.DEFAULT_EXPECT_TIMEOUT
return self._expect_multi(False, expect_items, **timeout)
@_expect_lock
def expect_all(self, *expect_items, **timeout):
"""
expect_all(*expect_items, timeout=DEFAULT_TIMEOUT)
expect all of the patterns.
will call callback (if provided) if all pattern match succeed and then return.
will pass match result to the callback.
:raise ExpectTimeout: failed to match all of the expect items before timeout
:raise UnsupportedExpectItem: pattern in expect_item is not string or compiled RegEx
:arg expect_items: one or more expect items.
string, compiled RegEx pattern or (string or RegEx(string pattern), callback)
:keyword timeout: timeout for expect
:return: None
"""
# to be compatible with python2
# in python3 we can write f(self, *expect_items, timeout=DEFAULT_TIMEOUT)
if "timeout" not in timeout:
timeout["timeout"] = self.DEFAULT_EXPECT_TIMEOUT
return self._expect_multi(True, expect_items, **timeout)
@staticmethod
def _format_ts(ts):
return "{}:{}".format(time.strftime("%m-%d %H:%M:%S", time.localtime(ts)), str(ts % 1)[2:5])
def print_debug_info(self):
"""
Print debug info of current DUT. Currently we will print debug info for expect failures.
"""
Utility.console_log("DUT debug info for DUT: {}:".format(self.name), color="orange")
for failure in self.expect_failures:
Utility.console_log(u"\t[pattern]: {}\r\n\t[data]: {}\r\n\t[time]: {} - {}\r\n"
.format(failure["pattern"], failure["data"],
self._format_ts(failure["start"]), self._format_ts(failure["end"])),
color="orange")
class SerialDUT(BaseDUT):
""" serial with logging received data feature """
DEFAULT_UART_CONFIG = {
"baudrate": 115200,
"bytesize": serial.EIGHTBITS,
"parity": serial.PARITY_NONE,
"stopbits": serial.STOPBITS_ONE,
"timeout": 0.05,
"xonxoff": False,
"rtscts": False,
}
def __init__(self, name, port, log_file, app, **kwargs):
self.port_inst = None
self.serial_configs = self.DEFAULT_UART_CONFIG.copy()
self.serial_configs.update(kwargs)
super(SerialDUT, self).__init__(name, port, log_file, app, **kwargs)
def _format_data(self, data):
"""
format data for logging. do decode and add timestamp.
:param data: raw data from read
:return: formatted data (str)
"""
timestamp = "[{}]".format(self._format_ts(time.time()))
formatted_data = timestamp.encode() + b"\r\n" + data + b"\r\n"
return formatted_data
def _port_open(self):
self.port_inst = serial.Serial(self.port, **self.serial_configs)
def _port_close(self):
self.port_inst.close()
def _port_read(self, size=1):
data = self.port_inst.read(size)
if data:
self._save_dut_log(self._format_data(data))
return data
def _port_write(self, data):
if isinstance(data, str):
data = data.encode()
self.port_inst.write(data)
@classmethod
def list_available_ports(cls):
return [x.device for x in list_ports.comports()]

View File

@@ -0,0 +1,195 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http:#www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Test Env, manages DUT, App and EnvConfig, interface for test cases to access these components """
import os
import threading
import functools
import netifaces
import EnvConfig
def _synced(func):
@functools.wraps(func)
def decorator(self, *args, **kwargs):
with self.lock:
ret = func(self, *args, **kwargs)
return ret
decorator.__doc__ = func.__doc__
return decorator
class Env(object):
"""
test env, manages DUTs and env configs.
:keyword app: class for default application
:keyword dut: class for default DUT
:keyword env_tag: test env tag, used to select configs from env config file
:keyword env_config_file: test env config file path
:keyword test_name: test suite name, used when generate log folder name
"""
def __init__(self,
app=None,
dut=None,
env_tag=None,
env_config_file=None,
test_suite_name=None,
**kwargs):
self.app_cls = app
self.default_dut_cls = dut
self.config = EnvConfig.Config(env_config_file, env_tag)
self.log_path = self.app_cls.get_log_folder(test_suite_name)
if not os.path.exists(self.log_path):
os.makedirs(self.log_path)
self.allocated_duts = dict()
self.lock = threading.RLock()
@_synced
def get_dut(self, dut_name, app_path, dut_class=None, app_class=None, app_config_name=None, **dut_init_args):
"""
get_dut(dut_name, app_path, dut_class=None, app_class=None)
:param dut_name: user defined name for DUT
:param app_path: application path, app instance will use this path to process application info
:param dut_class: dut class, if not specified will use default dut class of env
:param app_class: app class, if not specified will use default app of env
:param app_config_name: app build config
:keyword dut_init_args: extra kwargs used when creating DUT instance
:return: dut instance
"""
if dut_name in self.allocated_duts:
dut = self.allocated_duts[dut_name]["dut"]
else:
if dut_class is None:
dut_class = self.default_dut_cls
if app_class is None:
app_class = self.app_cls
detected_target = None
try:
port = self.config.get_variable(dut_name)
except ValueError:
# try to auto detect ports
allocated_ports = [self.allocated_duts[x]["port"] for x in self.allocated_duts]
available_ports = dut_class.list_available_ports()
for port in available_ports:
if port not in allocated_ports:
result, detected_target = dut_class.confirm_dut(port)
if result:
break
else:
port = None
app_target = dut_class.TARGET
if not app_target:
app_target = detected_target
if not app_target:
raise ValueError("DUT class doesn't specify the target, and autodetection failed")
app_inst = app_class(app_path, app_config_name, app_target)
if port:
try:
dut_config = self.get_variable(dut_name + "_port_config")
except ValueError:
dut_config = dict()
dut_config.update(dut_init_args)
dut = dut_class(dut_name, port,
os.path.join(self.log_path, dut_name + ".log"),
app_inst,
**dut_config)
self.allocated_duts[dut_name] = {"port": port, "dut": dut}
else:
raise ValueError("Failed to get DUT")
return dut
@_synced
def close_dut(self, dut_name):
"""
close_dut(dut_name)
close one DUT by name if DUT name is valid (the name used by ``get_dut``). otherwise will do nothing.
:param dut_name: user defined name for DUT
:return: None
"""
try:
dut = self.allocated_duts.pop(dut_name)["dut"]
dut.close()
except KeyError:
pass
@_synced
def get_variable(self, variable_name):
"""
get_variable(variable_name)
get variable from config file. If failed then try to auto-detected it.
:param variable_name: name of the variable
:return: value of variable if successfully found. otherwise None.
"""
return self.config.get_variable(variable_name)
PROTO_MAP = {
"ipv4": netifaces.AF_INET,
"ipv6": netifaces.AF_INET6,
"mac": netifaces.AF_LINK,
}
@_synced
def get_pc_nic_info(self, nic_name="pc_nic", proto="ipv4"):
"""
get_pc_nic_info(nic_name="pc_nic")
try to get info of a specified NIC and protocol.
:param nic_name: pc nic name. allows passing variable name, nic name value.
:param proto: "ipv4", "ipv6" or "mac"
:return: a dict of nic info if successfully found. otherwise None.
nic info keys could be different for different protocols.
key "addr" is available for both mac, ipv4 and ipv6 pic info.
"""
interfaces = netifaces.interfaces()
if nic_name in interfaces:
# the name is in the interface list, we regard it as NIC name
if_addr = netifaces.ifaddresses(nic_name)
else:
# it's not in interface name list, we assume it's variable name
_nic_name = self.get_variable(nic_name)
if_addr = netifaces.ifaddresses(_nic_name)
return if_addr[self.PROTO_MAP[proto]][0]
@_synced
def close(self, dut_debug=False):
"""
close()
close all DUTs of the Env.
:param dut_debug: if dut_debug is True, then print all dut expect failures before close it
:return: exceptions during close DUT
"""
dut_close_errors = []
for dut_name in self.allocated_duts:
dut = self.allocated_duts[dut_name]["dut"]
if dut_debug:
dut.print_debug_info()
try:
dut.close()
except Exception as e:
dut_close_errors.append(e)
self.allocated_duts = dict()
return dut_close_errors

View File

@@ -0,0 +1,79 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http:#www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
The test env could change when we running test from different computers.
Test env config provide ``get_variable`` method to allow user get test environment related variables.
It will first try to get variable from config file.
If failed, then it will try to auto detect (Not supported yet).
Config file format is yaml. it's a set of key-value pair. The following is an example of config file::
Example_WIFI:
ap_ssid: "myssid"
ap_password: "mypassword"
Example_ShieldBox:
attenuator_port: "/dev/ttyUSB2"
ap_ssid: "myssid"
ap_password: "mypassword"
It will first define the env tag for each environment, then add its key-value pairs.
This will prevent test cases from getting configs from other env when there're configs for multiple env in one file.
"""
import yaml
try:
from yaml import CLoader as Loader
except ImportError:
from yaml import Loader as Loader
class Config(object):
""" Test Env Config """
def __init__(self, config_file, env_tag):
self.configs = self.load_config_file(config_file, env_tag)
@staticmethod
def load_config_file(config_file, env_name):
"""
load configs from config file.
:param config_file: config file path
:param env_name: env tag name
:return: configs for the test env
"""
try:
with open(config_file) as f:
configs = yaml.load(f, Loader=Loader)[env_name]
except (OSError, TypeError, IOError):
configs = dict()
return configs
def get_variable(self, variable_name):
"""
first try to get from config file. if not found, try to auto detect the variable.
:param variable_name: name of variable
:return: value or None
"""
try:
value = self.configs[variable_name]
except KeyError:
# TODO: to support auto get variable here
value = None
if value is None:
raise ValueError("Failed to get variable")
return value

View File

@@ -0,0 +1,6 @@
.external_ap: &external_ap
ap_ssid: "myssid"
ap_password: "mypassword"
Examples_WIFI:
<<: external_ap

View File

@@ -0,0 +1,230 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http:#www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Interface for test cases. """
import os
import time
import traceback
import functools
import socket
from datetime import datetime
import junit_xml
import Env
import DUT
import App
import Utility
class DefaultEnvConfig(object):
"""
default test configs. There're 3 places to set configs, priority is (high -> low):
1. overwrite set by caller of test method
2. values set by test_method decorator
3. default env config get from this class
"""
DEFAULT_CONFIG = {
"app": App.BaseApp,
"dut": DUT.BaseDUT,
"env_tag": "default",
"env_config_file": None,
"test_suite_name": None,
}
@classmethod
def set_default_config(cls, **kwargs):
"""
:param kwargs: configs need to be updated
:return: None
"""
cls.DEFAULT_CONFIG.update(kwargs)
@classmethod
def get_default_config(cls):
"""
:return: current default config
"""
return cls.DEFAULT_CONFIG.copy()
set_default_config = DefaultEnvConfig.set_default_config
get_default_config = DefaultEnvConfig.get_default_config
MANDATORY_INFO = {
"execution_time": 1,
"env_tag": "default",
"category": "function",
"ignore": False,
}
class JunitReport(object):
# wrapper for junit test report
# TODO: JunitReport methods are not thread safe (although not likely to be used this way).
JUNIT_FILE_NAME = "XUNIT_RESULT.xml"
JUNIT_DEFAULT_TEST_SUITE = "test-suite"
JUNIT_TEST_SUITE = junit_xml.TestSuite(JUNIT_DEFAULT_TEST_SUITE,
hostname=socket.gethostname(),
timestamp=datetime.utcnow().isoformat())
JUNIT_CURRENT_TEST_CASE = None
_TEST_CASE_CREATED_TS = 0
@classmethod
def output_report(cls, junit_file_path):
""" Output current test result to file. """
with open(os.path.join(junit_file_path, cls.JUNIT_FILE_NAME), "w") as f:
cls.JUNIT_TEST_SUITE.to_file(f, [cls.JUNIT_TEST_SUITE], prettyprint=False)
@classmethod
def get_current_test_case(cls):
"""
By default, the test framework will handle junit test report automatically.
While some test case might want to update some info to test report.
They can use this method to get current test case created by test framework.
:return: current junit test case instance created by ``JunitTestReport.create_test_case``
"""
return cls.JUNIT_CURRENT_TEST_CASE
@classmethod
def test_case_finish(cls, test_case):
"""
Append the test case to test suite so it can be output to file.
Execution time will be automatically updated (compared to ``create_test_case``).
"""
test_case.elapsed_sec = time.time() - cls._TEST_CASE_CREATED_TS
cls.JUNIT_TEST_SUITE.test_cases.append(test_case)
@classmethod
def create_test_case(cls, name):
"""
Extend ``junit_xml.TestCase`` with:
1. save create test case so it can be get by ``get_current_test_case``
2. log create timestamp, so ``elapsed_sec`` can be auto updated in ``test_case_finish``.
:param name: test case name
:return: instance of ``junit_xml.TestCase``
"""
# set stdout to empty string, so we can always append string to stdout.
# It won't affect output logic. If stdout is empty, it won't be put to report.
test_case = junit_xml.TestCase(name, stdout="")
cls.JUNIT_CURRENT_TEST_CASE = test_case
cls._TEST_CASE_CREATED_TS = time.time()
return test_case
@classmethod
def update_performance(cls, performance_items):
"""
Update performance results to ``stdout`` of current test case.
:param performance_items: a list of performance items. each performance item is a key-value pair.
"""
assert cls.JUNIT_CURRENT_TEST_CASE
for item in performance_items:
cls.JUNIT_CURRENT_TEST_CASE.stdout += "[{}]: {}\n".format(item[0], item[1])
def test_method(**kwargs):
"""
decorator for test case function.
The following keyword arguments are pre-defined.
Any other keyword arguments will be regarded as filter for the test case,
able to access them by ``case_info`` attribute of test method.
:keyword app: class for test app. see :doc:`App <App>` for details
:keyword dut: class for current dut. see :doc:`DUT <DUT>` for details
:keyword env_tag: name for test environment, used to select configs from config file
:keyword env_config_file: test env config file. usually will not set this keyword when define case
:keyword test_suite_name: test suite name, used for generating log folder name and adding xunit format test result.
usually will not set this keyword when define case
:keyword junit_report_by_case: By default the test fw will handle junit report generation.
In some cases, one test function might test many test cases.
If this flag is set, test case can update junit report by its own.
"""
def test(test_func):
case_info = MANDATORY_INFO.copy()
case_info["name"] = case_info["ID"] = test_func.__name__
case_info["junit_report_by_case"] = False
case_info.update(kwargs)
@functools.wraps(test_func)
def handle_test(extra_data=None, **overwrite):
"""
create env, run test and record test results
:param extra_data: extra data that runner or main passed to test case
:param overwrite: args that runner or main want to overwrite
:return: None
"""
# create env instance
env_config = DefaultEnvConfig.get_default_config()
for key in kwargs:
if key in env_config:
env_config[key] = kwargs[key]
env_config.update(overwrite)
env_inst = Env.Env(**env_config)
# prepare for xunit test results
junit_file_path = env_inst.app_cls.get_log_folder(env_config["test_suite_name"])
junit_test_case = JunitReport.create_test_case(case_info["ID"])
result = False
try:
Utility.console_log("starting running test: " + test_func.__name__, color="green")
# execute test function
test_func(env_inst, extra_data)
# if finish without exception, test result is True
result = True
except Exception as e:
# handle all the exceptions here
traceback.print_exc()
# log failure
junit_test_case.add_failure_info(str(e) + ":\r\n" + traceback.format_exc())
finally:
# do close all DUTs, if result is False then print DUT debug info
close_errors = env_inst.close(dut_debug=(not result))
# We have a hook in DUT close, allow DUT to raise error to fail test case.
# For example, we don't allow DUT exception (reset) during test execution.
# We don't want to implement in exception detection in test function logic,
# as we need to add it to every test case.
# We can implement it in DUT receive thread,
# and raise exception in DUT close to fail test case if reset detected.
if close_errors:
for error in close_errors:
junit_test_case.add_failure_info(str(error))
result = False
if not case_info["junit_report_by_case"]:
JunitReport.test_case_finish(junit_test_case)
# end case and output result
JunitReport.output_report(junit_file_path)
if result:
Utility.console_log("Test Succeed: " + test_func.__name__, color="green")
else:
Utility.console_log(("Test Fail: " + test_func.__name__), color="red")
return result
handle_test.case_info = case_info
handle_test.test_method = True
return handle_test
return test

View File

@@ -0,0 +1,327 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http:#www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Common logic to assign test cases to CI jobs.
Some background knowledge about Gitlab CI and use flow in esp-idf:
* Gitlab CI jobs are static in ``.gitlab-ci.yml``. We can't dynamically create test jobs
* For test job running on DUT, we use ``tags`` to select runners with different test environment
* We have ``assign_test`` stage, will collect cases, and then assign them to correct test jobs
* ``assign_test`` will fail if failed to assign any cases
* with ``assign_test``, we can:
* dynamically filter test case we want to test
* alert user if they forget to add CI jobs and guide how to add test jobs
* the last step of ``assign_test`` is to output config files, then test jobs will run these cases
The Basic logic to assign test cases is as follow:
1. do search all the cases
2. do filter case (if filter is specified by @bot)
3. put cases to different groups according to rule of ``Group``
* try to put them in existed groups
* if failed then create a new group and add this case
4. parse and filter the test jobs from CI config file
5. try to assign all groups to jobs according to tags
6. output config files for jobs
"""
import os
import re
import json
import yaml
try:
from yaml import CLoader as Loader
except ImportError:
from yaml import Loader as Loader
from . import (CaseConfig, SearchCases, GitlabCIJob, console_log)
class Group(object):
MAX_EXECUTION_TIME = 30
MAX_CASE = 15
SORT_KEYS = ["env_tag"]
# Matching CI job rules could be different from the way we want to group test cases.
# For example, when assign unit test cases, different test cases need to use different test functions.
# We need to put them into different groups.
# But these groups can be assigned to jobs with same tags, as they use the same test environment.
CI_JOB_MATCH_KEYS = SORT_KEYS
def __init__(self, case):
self.execution_time = 0
self.case_list = [case]
self.filters = dict(zip(self.SORT_KEYS, [self._get_case_attr(case, x) for x in self.SORT_KEYS]))
# we use ci_job_match_keys to match CI job tags. It's a set of required tags.
self.ci_job_match_keys = set([self._get_case_attr(case, x) for x in self.CI_JOB_MATCH_KEYS])
@staticmethod
def _get_case_attr(case, attr):
# we might use different type for case (dict or test_func)
# this method will do get attribute form cases
return case.case_info[attr]
def accept_new_case(self):
"""
check if allowed to add any case to this group
:return: True or False
"""
max_time = (sum([self._get_case_attr(x, "execution_time") for x in self.case_list])
< self.MAX_EXECUTION_TIME)
max_case = (len(self.case_list) < self.MAX_CASE)
return max_time and max_case
def add_case(self, case):
"""
add case to current group
:param case: test case
:return: True if add succeed, else False
"""
added = False
if self.accept_new_case():
for key in self.filters:
if self._get_case_attr(case, key) != self.filters[key]:
break
else:
self.case_list.append(case)
added = True
return added
def add_extra_case(self, case):
"""
By default (``add_case`` method), cases will only be added when have equal values of all filters with group.
But in some cases, we also want to add cases which are not best fit.
For example, one group has can run cases require (A, B). It can also accept cases require (A, ) and (B, ).
When assign failed by best fit, we will use this method to try if we can assign all failed cases.
If subclass want to retry, they need to overwrite this method.
Logic can be applied to handle such scenario could be different for different cases.
:return: True if accepted else False
"""
pass
def output(self):
"""
output data for job configs
:return: {"Filter": case filter, "CaseConfig": list of case configs for cases in this group}
"""
output_data = {
"Filter": self.filters,
"CaseConfig": [{"name": self._get_case_attr(x, "name")} for x in self.case_list],
}
return output_data
class AssignTest(object):
"""
Auto assign tests to CI jobs.
:param test_case_path: path of test case file(s)
:param ci_config_file: path of ``.gitlab-ci.yml``
"""
# subclass need to rewrite CI test job pattern, to filter all test jobs
CI_TEST_JOB_PATTERN = re.compile(r"^test_.+")
# by default we only run function in CI, as other tests could take long time
DEFAULT_FILTER = {
"category": "function",
"ignore": False,
}
def __init__(self, test_case_path, ci_config_file, case_group=Group):
self.test_case_path = test_case_path
self.test_cases = []
self.jobs = self._parse_gitlab_ci_config(ci_config_file)
self.case_group = case_group
@staticmethod
def _handle_parallel_attribute(job_name, job):
jobs_out = []
try:
for i in range(job["parallel"]):
jobs_out.append(GitlabCIJob.Job(job, job_name + "_{}".format(i + 1)))
except KeyError:
# Gitlab don't allow to set parallel to 1.
# to make test job name same ($CI_JOB_NAME_$CI_NODE_INDEX),
# we append "_" to jobs don't have parallel attribute
jobs_out.append(GitlabCIJob.Job(job, job_name + "_"))
return jobs_out
def _parse_gitlab_ci_config(self, ci_config_file):
with open(ci_config_file, "r") as f:
ci_config = yaml.load(f, Loader=Loader)
job_list = list()
for job_name in ci_config:
if self.CI_TEST_JOB_PATTERN.search(job_name) is not None:
job_list.extend(self._handle_parallel_attribute(job_name, ci_config[job_name]))
job_list.sort(key=lambda x: x["name"])
return job_list
def _search_cases(self, test_case_path, case_filter=None):
"""
:param test_case_path: path contains test case folder
:param case_filter: filter for test cases. the filter to use is default filter updated with case_filter param.
:return: filtered test case list
"""
_case_filter = self.DEFAULT_FILTER.copy()
if case_filter:
_case_filter.update(case_filter)
test_methods = SearchCases.Search.search_test_cases(test_case_path)
return CaseConfig.filter_test_cases(test_methods, _case_filter)
def _group_cases(self):
"""
separate all cases into groups according group rules. each group will be executed by one CI job.
:return: test case groups.
"""
groups = []
for case in self.test_cases:
for group in groups:
# add to current group
if group.add_case(case):
break
else:
# create new group
groups.append(self.case_group(case))
return groups
def _assign_failed_cases(self, assigned_groups, failed_groups):
""" try to assign failed cases to already assigned test groups """
still_failed_groups = []
failed_cases = []
for group in failed_groups:
failed_cases.extend(group.case_list)
for case in failed_cases:
# first try to assign to already assigned groups
for group in assigned_groups:
if group.add_extra_case(case):
break
else:
# if failed, group the failed cases
for group in still_failed_groups:
if group.add_case(case):
break
else:
still_failed_groups.append(self.case_group(case))
return still_failed_groups
@staticmethod
def _apply_bot_filter():
"""
we support customize CI test with bot.
here we process from and return the filter which ``_search_cases`` accepts.
:return: filter for search test cases
"""
bot_filter = os.getenv("BOT_CASE_FILTER")
if bot_filter:
bot_filter = json.loads(bot_filter)
else:
bot_filter = dict()
return bot_filter
def _apply_bot_test_count(self):
"""
Bot could also pass test count.
If filtered cases need to be tested for several times, then we do duplicate them here.
"""
test_count = os.getenv("BOT_TEST_COUNT")
if test_count:
test_count = int(test_count)
self.test_cases *= test_count
@staticmethod
def _count_groups_by_keys(test_groups):
"""
Count the number of test groups by job match keys.
It's an important information to update CI config file.
"""
group_count = dict()
for group in test_groups:
key = ",".join(group.ci_job_match_keys)
try:
group_count[key] += 1
except KeyError:
group_count[key] = 1
return group_count
def assign_cases(self):
"""
separate test cases to groups and assign test cases to CI jobs.
:raise AssertError: if failed to assign any case to CI job.
:return: None
"""
failed_to_assign = []
assigned_groups = []
case_filter = self._apply_bot_filter()
self.test_cases = self._search_cases(self.test_case_path, case_filter)
self._apply_bot_test_count()
test_groups = self._group_cases()
for group in test_groups:
for job in self.jobs:
if job.match_group(group):
job.assign_group(group)
assigned_groups.append(group)
break
else:
failed_to_assign.append(group)
if failed_to_assign:
failed_to_assign = self._assign_failed_cases(assigned_groups, failed_to_assign)
# print debug info
# total requirement of current pipeline
required_group_count = self._count_groups_by_keys(test_groups)
console_log("Required job count by tags:")
for tags in required_group_count:
console_log("\t{}: {}".format(tags, required_group_count[tags]))
# number of unused jobs
not_used_jobs = [job for job in self.jobs if "case group" not in job]
if not_used_jobs:
console_log("{} jobs not used. Please check if you define too much jobs".format(len(not_used_jobs)), "O")
for job in not_used_jobs:
console_log("\t{}".format(job["name"]), "O")
# failures
if failed_to_assign:
console_log("Too many test cases vs jobs to run. "
"Please increase parallel count in tools/ci/config/target-test.yml "
"for jobs with specific tags:", "R")
failed_group_count = self._count_groups_by_keys(failed_to_assign)
for tags in failed_group_count:
console_log("\t{}: {}".format(tags, failed_group_count[tags]), "R")
raise RuntimeError("Failed to assign test case to CI jobs")
def output_configs(self, output_path):
"""
:param output_path: path to output config files for each CI job
:return: None
"""
if not os.path.exists(output_path):
os.makedirs(output_path)
for job in self.jobs:
job.output_config(output_path)

View File

@@ -0,0 +1,225 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http:#www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Processing case config files.
This is mainly designed for CI, we need to auto create and assign test jobs.
Template Config File::
TestConfig:
app:
package: ttfw_idf
class: Example
dut:
path:
class:
config_file: /somewhere/config_file_for_runner
test_name: CI_test_job_1
Filter:
chip: ESP32
env_tag: default
CaseConfig:
- name: test_examples_protocol_https_request
# optional
extra_data: some extra data passed to case with kwarg extra_data
overwrite: # overwrite test configs
app:
package: ttfw_idf
class: Example
- name: xxx
"""
import importlib
import yaml
try:
from yaml import CLoader as Loader
except ImportError:
from yaml import Loader as Loader
from . import TestCase
def _convert_to_lower_case_bytes(item):
"""
bot filter is always lower case string.
this function will convert to all string to lower case.
Note: Unicode strings are converted to bytes.
"""
if isinstance(item, (tuple, list)):
output = [_convert_to_lower_case_bytes(v) for v in item]
elif isinstance(item, type(b'')):
output = item.lower()
elif isinstance(item, type(u'')):
output = item.encode().lower()
else:
output = item
return output
def _filter_one_case(test_method, case_filter):
""" Apply filter for one case (the filter logic is the same as described in ``filter_test_cases``) """
filter_result = True
# filter keys are lower case. Do map lower case keys with original keys.
key_mapping = {x.lower(): x for x in test_method.case_info.keys()}
for orig_key in case_filter:
key = key_mapping[orig_key]
if key in test_method.case_info:
# the filter key is both in case and filter
# we need to check if they match
filter_item = _convert_to_lower_case_bytes(case_filter[orig_key])
accepted_item = _convert_to_lower_case_bytes(test_method.case_info[key])
if isinstance(filter_item, (tuple, list)) \
and isinstance(accepted_item, (tuple, list)):
# both list/tuple, check if they have common item
filter_result = True if set(filter_item) & set(accepted_item) else False
elif isinstance(filter_item, (tuple, list)):
# filter item list/tuple, check if case accepted value in filter item list/tuple
filter_result = True if accepted_item in filter_item else False
elif isinstance(accepted_item, (tuple, list)):
# accepted item list/tuple, check if case filter value is in accept item list/tuple
filter_result = True if filter_item in accepted_item else False
else:
if type(filter_item) != type(accepted_item):
# This will catch silent ignores of test cases when Unicode and bytes are compared
raise AssertionError(filter_item, '!=', accepted_item)
# both string/int, just do string compare
filter_result = (filter_item == accepted_item)
else:
# key in filter only, which means the case supports all values for this filter key, match succeed
pass
if not filter_result:
# match failed
break
return filter_result
def filter_test_cases(test_methods, case_filter):
"""
filter test case. filter logic:
1. if filter key both in case attribute and filter:
* if both value is string/int, then directly compare
* if one is list/tuple, the other one is string/int, then check if string/int is in list/tuple
* if both are list/tuple, then check if they have common item
2. if only case attribute or filter have the key, filter succeed
3. will do case insensitive compare for string
for example, the following are match succeed scenarios
(the rule is symmetric, result is same if exchange values for user filter and case attribute):
* user case filter is ``chip: ["esp32", "esp32c"]``, case doesn't have ``chip`` attribute
* user case filter is ``chip: ["esp32", "esp32c"]``, case attribute is ``chip: "esp32"``
* user case filter is ``chip: "esp32"``, case attribute is ``chip: "esp32"``
:param test_methods: a list of test methods functions
:param case_filter: case filter
:return: filtered test methods
"""
filtered_test_methods = []
for test_method in test_methods:
if _filter_one_case(test_method, case_filter):
filtered_test_methods.append(test_method)
return filtered_test_methods
class Parser(object):
DEFAULT_CONFIG = {
"TestConfig": dict(),
"Filter": dict(),
"CaseConfig": [{"extra_data": None}],
}
@classmethod
def parse_config_file(cls, config_file):
"""
parse from config file and then update to default config.
:param config_file: config file path
:return: configs
"""
configs = cls.DEFAULT_CONFIG.copy()
if config_file:
with open(config_file, "r") as f:
configs.update(yaml.load(f, Loader=Loader))
return configs
@classmethod
def handle_overwrite_args(cls, overwrite):
"""
handle overwrite configs. import module from path and then get the required class.
:param overwrite: overwrite args
:return: dict of (original key: class)
"""
output = dict()
for key in overwrite:
module = importlib.import_module(overwrite[key]["package"])
output[key] = module.__getattribute__(overwrite[key]["class"])
return output
@classmethod
def apply_config(cls, test_methods, config_file):
"""
apply config for test methods
:param test_methods: a list of test methods functions
:param config_file: case filter file
:return: filtered cases
"""
configs = cls.parse_config_file(config_file)
test_case_list = []
for _config in configs["CaseConfig"]:
_filter = configs["Filter"].copy()
_overwrite = cls.handle_overwrite_args(_config.pop("overwrite", dict()))
_extra_data = _config.pop("extra_data", None)
_filter.update(_config)
for test_method in test_methods:
if _filter_one_case(test_method, _filter):
test_case_list.append(TestCase.TestCase(test_method, _extra_data, **_overwrite))
return test_case_list
class Generator(object):
""" Case config file generator """
def __init__(self):
self.default_config = {
"TestConfig": dict(),
"Filter": dict(),
}
def set_default_configs(self, test_config, case_filter):
"""
:param test_config: "TestConfig" value
:param case_filter: "Filter" value
:return: None
"""
self.default_config = {"TestConfig": test_config, "Filter": case_filter}
def generate_config(self, case_configs, output_file):
"""
:param case_configs: "CaseConfig" value
:param output_file: output file path
:return: None
"""
config = self.default_config.copy()
config.update({"CaseConfig": case_configs})
with open(output_file, "w") as f:
yaml.dump(config, f)

View File

@@ -0,0 +1,65 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http:#www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import yaml
class Job(dict):
"""
Gitlab CI job
:param job: job data loaded from .gitlab-ci.yml
:param job_name: job name
"""
def __init__(self, job, job_name):
super(Job, self).__init__(job)
self["name"] = job_name
self.tags = set(self["tags"])
def match_group(self, group):
"""
Match group by tags of job.
All filters values of group should be included in tags.
:param group: case group to match
:return: True or False
"""
match_result = False
if "case group" not in self and group.ci_job_match_keys == self.tags:
# group not assigned and all tags match
match_result = True
return match_result
def assign_group(self, group):
"""
assign a case group to a test job.
:param group: the case group to assign
"""
self["case group"] = group
def output_config(self, file_path):
"""
output test config to the given path.
file name will be job_name.yml
:param file_path: output file path
:return: None
"""
file_name = os.path.join(file_path, self["name"] + ".yml")
if "case group" in self:
with open(file_name, "w") as f:
yaml.dump(self["case group"].output(), f, default_flow_style=False)

View File

@@ -0,0 +1,111 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http:#www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" search test cases from a given file or path """
import os
import fnmatch
import types
import copy
from . import load_source
class Search(object):
TEST_CASE_FILE_PATTERN = "*_test.py"
@classmethod
def _search_cases_from_file(cls, file_name):
""" get test cases from test case .py file """
print("Try to get cases from: " + file_name)
test_functions = []
try:
mod = load_source(file_name)
for func in [mod.__getattribute__(x) for x in dir(mod)
if isinstance(mod.__getattribute__(x), types.FunctionType)]:
try:
# test method decorator will add test_method attribute to test function
if func.test_method:
test_functions.append(func)
except AttributeError:
continue
except ImportError as e:
print("ImportError: \r\n\tFile:" + file_name + "\r\n\tError:" + str(e))
for i, test_function in enumerate(test_functions):
print("\t{}. ".format(i + 1) + test_function.case_info["name"])
return test_functions
@classmethod
def _search_test_case_files(cls, test_case, file_pattern):
""" search all test case files recursively of a path """
if not os.path.exists(test_case):
raise OSError("test case path not exist")
if os.path.isdir(test_case):
test_case_files = []
for root, _, file_names in os.walk(test_case):
for filename in fnmatch.filter(file_names, file_pattern):
test_case_files.append(os.path.join(root, filename))
else:
test_case_files = [test_case]
return test_case_files
@classmethod
def replicate_case(cls, case):
"""
Replicate cases according to its filter values.
If one case has specified filter chip=(ESP32, ESP32C),
it will create 2 cases, one for ESP32 and on for ESP32C.
Once the cases are replicated, it's easy to filter those we want to execute.
:param case: the original case
:return: a list of replicated cases
"""
replicate_config = []
for key in case.case_info:
if isinstance(case.case_info[key], (list, tuple)):
replicate_config.append(key)
def _replicate_for_key(case_list, replicate_key, replicate_list):
case_out = []
for _case in case_list:
for value in replicate_list:
new_case = copy.deepcopy(_case)
new_case.case_info[replicate_key] = value
case_out.append(new_case)
return case_out
replicated_cases = [case]
for key in replicate_config:
replicated_cases = _replicate_for_key(replicated_cases, key, case.case_info[key])
return replicated_cases
@classmethod
def search_test_cases(cls, test_case):
"""
search all test cases from a folder or file, and then do case replicate.
:param test_case: test case file(s) path
:return: a list of replicated test methods
"""
test_case_files = cls._search_test_case_files(test_case, cls.TEST_CASE_FILE_PATTERN)
test_cases = []
for test_case_file in test_case_files:
test_cases += cls._search_cases_from_file(test_case_file)
# handle replicate cases
test_case_out = []
for case in test_cases:
test_case_out += cls.replicate_case(case)
return test_case_out

View File

@@ -0,0 +1,58 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http:#www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import yaml
try:
from yaml import CLoader as Loader
except ImportError:
from yaml import Loader as Loader
class TestCase(object):
"""
Test Case Object, mainly used with runner.
runner can parse all test cases from a given path, set data and config for test case in prepare stage.
TestCase instance will record these data, provide run method to let runner execute test case.
:param test_method: test function
:param extra_data: data passed to test function
:param overwrite_args: kwargs that overwrite original test case configs
"""
DEFAULT_CASE_DOC = dict()
def __init__(self, test_method, extra_data, **overwrite_args):
self.test_method = test_method
self.extra_data = extra_data
self.overwrite_args = overwrite_args
def run(self):
""" execute the test case """
return self.test_method(self.extra_data, **self.overwrite_args)
def document(self):
"""
generate test case document.
parse the case doc with yaml parser and update to original case attributes.
:return: case document, dict of case attributes and values
"""
doc_string = self.test_method.__doc__
try:
doc = yaml.load(doc_string, Loader=Loader)
except (AttributeError, OSError, UnicodeDecodeError):
doc = self.DEFAULT_CASE_DOC
doc.update(self.test_method.env_args)
doc.update(self.test_method.accepted_filter)
return doc

View File

@@ -0,0 +1,71 @@
from __future__ import print_function
import os.path
import sys
_COLOR_CODES = {
"white": u'\033[0m',
"red": u'\033[31m',
"green": u'\033[32m',
"orange": u'\033[33m',
"blue": u'\033[34m',
"purple": u'\033[35m',
"W": u'\033[0m',
"R": u'\033[31m',
"G": u'\033[32m',
"O": u'\033[33m',
"B": u'\033[34m',
"P": u'\033[35m'
}
def console_log(data, color="white", end="\n"):
"""
log data to console.
(if not flush console log, Gitlab-CI won't update logs during job execution)
:param data: data content
:param color: color
"""
if color not in _COLOR_CODES:
color = "white"
color_codes = _COLOR_CODES[color]
if isinstance(data, type(b'')):
data = data.decode('utf-8', 'replace')
print(color_codes + data, end=end)
if color not in ["white", "W"]:
# reset color to white for later logs
print(_COLOR_CODES["white"] + u"\r")
sys.stdout.flush()
__LOADED_MODULES = dict()
# we should only load one module once.
# if we load one module twice,
# python will regard the same object loaded in the first time and second time as different objects.
# it will lead to strange errors like `isinstance(object, type_of_this_object)` return False
def load_source(path):
"""
Dynamic loading python file. Note that this function SHOULD NOT be used to replace ``import``.
It should only be used when the package path is only available in runtime.
:param path: The path of python file
:return: Loaded object
"""
path = os.path.realpath(path)
# load name need to be unique, otherwise it will update the already loaded module
load_name = str(len(__LOADED_MODULES))
try:
return __LOADED_MODULES[path]
except KeyError:
try:
from importlib.machinery import SourceFileLoader
ret = SourceFileLoader(load_name, path).load_module()
except ImportError:
# importlib.machinery doesn't exists in Python 2 so we will use imp (deprecated in Python 3)
import imp
ret = imp.load_source(load_name, path)
__LOADED_MODULES[path] = ret
return ret

View File

@@ -0,0 +1,84 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http:#www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Command line interface to run test cases from a given path.
* search and run test cases of a given path
* config file which support to filter test cases and passing data to test case
Use ``python Runner.py test_case_path -c config_file -e env_config_file`` to run test cases.
"""
import os
import sys
import argparse
import threading
from tiny_test_fw import TinyFW
from tiny_test_fw.Utility import SearchCases, CaseConfig
class Runner(threading.Thread):
"""
:param test_case: test case file or folder
:param case_config: case config file, allow to filter test cases and pass data to test case
:param env_config_file: env config file
"""
def __init__(self, test_case, case_config, env_config_file=None):
super(Runner, self).__init__()
self.setDaemon(True)
if case_config:
test_suite_name = os.path.splitext(os.path.basename(case_config))[0]
else:
test_suite_name = "TestRunner"
TinyFW.set_default_config(env_config_file=env_config_file, test_suite_name=test_suite_name)
test_methods = SearchCases.Search.search_test_cases(test_case)
self.test_cases = CaseConfig.Parser.apply_config(test_methods, case_config)
self.test_result = []
def run(self):
for case in self.test_cases:
result = case.run()
self.test_result.append(result)
def get_test_result(self):
return self.test_result and all(self.test_result)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("test_case",
help="test case folder or file")
parser.add_argument("--case_config", "-c", default=None,
help="case filter/config file")
parser.add_argument("--env_config_file", "-e", default=None,
help="test env config file")
args = parser.parse_args()
runner = Runner(args.test_case, args.case_config, args.env_config_file)
runner.start()
while True:
try:
runner.join(1)
if not runner.isAlive():
break
except KeyboardInterrupt:
print("exit by Ctrl-C")
break
if not runner.get_test_result():
sys.exit(1)

View File

@@ -0,0 +1,55 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http:#www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" example of writing test with TinyTestFW """
import re
import os
import sys
try:
import TinyFW
except ImportError:
# if we want to run test case outside `tiny-test-fw` folder,
# we need to insert tiny-test-fw path into sys path
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW
import IDF
from IDF.IDFDUT import ESP32DUT
@IDF.idf_example_test(env_tag="Example_WIFI")
def test_examples_protocol_https_request(env, extra_data):
"""
steps: |
1. join AP
2. connect to www.howsmyssl.com:443
3. send http request
"""
dut1 = env.get_dut("https_request", "examples/protocols/https_request", dut_class=ESP32DUT)
dut1.start_app()
dut1.expect(re.compile(r"Connecting to www.howsmyssl.com:443"), timeout=30)
dut1.expect("Performing the SSL/TLS handshake")
dut1.expect("Certificate verified.", timeout=15)
dut1.expect_all(re.compile(r"Cipher suite is TLS-ECDHE-RSA-WITH-AES-128-GCM-SHA256"),
"Reading HTTP response",
timeout=20)
dut1.expect(re.compile(r"Completed (\d) requests"))
if __name__ == '__main__':
TinyFW.set_default_config(env_config_file="EnvConfigTemplate.yml", dut=IDF.IDFDUT)
test_examples_protocol_https_request()

View File

@@ -0,0 +1,26 @@
# Minimal makefile for Sphinx documentation
#
# You can set these variables from the command line.
SPHINXOPTS =
SPHINXAPI = sphinx-apidoc
SPHINXAPISRC = ..
SPHINXBUILD = python -msphinx
SPHINXPROJ = TinyTestFW
SOURCEDIR = .
BUILDDIR = _build
# define the files to be excluded here
EXCLUEDLIST = "$(SPHINXAPISRC)/example.py"
# Put it first so that "make" without argument is like "make help".
help:
@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
.PHONY: help Makefile
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
@$(SPHINXAPI) -o $(SOURCEDIR) $(SPHINXAPISRC) $(EXCLUEDLIST)
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)

View File

@@ -0,0 +1,157 @@
# -*- coding: utf-8 -*-
#
# TinyTestFW documentation build configuration file, created by
# sphinx-quickstart on Thu Sep 21 20:19:12 2017.
#
# This file is execfile()d with the current directory set to its
# containing dir.
#
# Note that not all possible configuration values are present in this
# autogenerated file.
#
# All configuration values have a default; values that are commented out
# serve to show the default.
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#
import os
import sys
sys.path.insert(0, os.path.abspath('..'))
# import sphinx_rtd_theme
# -- General configuration ------------------------------------------------
# If your documentation needs a minimal Sphinx version, state it here.
#
# needs_sphinx = '1.0'
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = ['sphinx.ext.autodoc',
'sphinx.ext.viewcode',
'plantweb.directive']
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# The suffix(es) of source filenames.
# You can specify multiple suffix as a list of string:
#
# source_suffix = ['.rst', '.md']
source_suffix = '.rst'
# The master toctree document.
master_doc = 'index'
# General information about the project.
project = u'TinyTestFW'
copyright = u'2017, Espressif'
author = u'Espressif'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# The short X.Y version.
version = u'0.1'
# The full version, including alpha/beta/rc tags.
release = u'0.1'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#
# This is also used if you do content translation via gettext catalogs.
# Usually you set "language" from the command line for these cases.
language = None
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This patterns also effect to html_static_path and html_extra_path
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
# If true, `todo` and `todoList` produce output, else they produce nothing.
todo_include_todos = False
# -- Options for HTML output ----------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
html_theme = 'sphinx_rtd_theme'
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
#
# html_theme_options = {}
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
# -- Options for HTMLHelp output ------------------------------------------
# Output file base name for HTML help builder.
htmlhelp_basename = 'TinyTestFWdoc'
# -- Options for LaTeX output ---------------------------------------------
latex_elements = {
# The paper size ('letterpaper' or 'a4paper').
#
# 'papersize': 'letterpaper',
# The font size ('10pt', '11pt' or '12pt').
#
# 'pointsize': '10pt',
# Additional stuff for the LaTeX preamble.
#
# 'preamble': '',
# Latex figure (float) alignment
#
# 'figure_align': 'htbp',
}
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title,
# author, documentclass [howto, manual, or own class]).
latex_documents = [
(master_doc, 'TinyTestFW.tex', u'TinyTestFW Documentation',
u'He Yinling', 'manual'),
]
# -- Options for manual page output ---------------------------------------
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
(master_doc, 'tinytestfw', u'TinyTestFW Documentation',
[author], 1)
]
# -- Options for Texinfo output -------------------------------------------
# Grouping the document tree into Texinfo files. List of tuples
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
(master_doc, 'TinyTestFW', u'TinyTestFW Documentation',
author, 'TinyTestFW', 'One line description of project.',
'Miscellaneous'),
]

View File

@@ -0,0 +1,204 @@
.. TinyTestFW documentation master file, created by
sphinx-quickstart on Thu Sep 21 20:19:12 2017.
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
Welcome to TinyTestFW's documentation!
======================================
We have a lot of test which depends on interact with DUT via communication port.
Usually we send command to the port and then check response to see if the test succeed.
TinyTestFW is designed for such scenarios.
It supports ESP-IDF applications and can be adapted to other applications by writing new bundles.
Example
-------
Let's first check a simple example::
import re
import os
import sys
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path:
sys.path.insert(0, test_fw_path)
import TinyFW
from IDF import IDFApp, IDFDUT
@TinyFW.test_method(app=IDFApp.Example, dut=IDFDUT.IDFDUT, env_tag="Example_WIFI",
chip="ESP32", module="examples", execution_time=1)
def test_examples_protocol_https_request(env, extra_data):
"""
steps: |
1. join AP
2. connect to www.howsmyssl.com:443
3. send http request
"""
dut1 = env.get_dut("https_request", "examples/protocols/https_request")
dut1.start_app()
dut1.expect("Connecting to www.howsmyssl.com:443", timeout=30)
dut1.expect("Performing the SSL/TLS handshake")
dut1.expect("Certificate verified.", timeout=15)
dut1.expect_all(re.compile(r"Cipher suite is TLS-ECDHE-RSA-WITH-AES-128-GCM-SHA256"),
"Reading HTTP response",
timeout=20)
dut1.expect(re.compile(r"Completed (\d) requests"))
if __name__ == '__main__':
TinyFW.set_default_config(env_config_file="EnvConfigTemplate.yml")
test_examples_protocol_https_request()
SOP for adding test cases
-------------------------
1. Import test framework:
^^^^^^^^^^^^^^^^^^^^^^^^^
* We assume ``TEST_FW_PATH`` is pre-defined before running the tests
* Then we can import python packages and files from ``TEST_FW_PATH``
2. Define test case:
^^^^^^^^^^^^^^^^^^^^
1. Define test case ``test_xxx(env, extra_data)``
* env: instance of test env, see :doc:`Test Env <Env>` for details
* extra_data: extra data passed from test case caller
2. Add decorator for test case
* add decorator ``TinyFW.test_method`` to test method
* define default case configs and filters in decorator, see :doc:`TinyFW.test_method <TinyFW>`
3. Execute test cases:
^^^^^^^^^^^^^^^^^^^^^^
* define in ``main`` section and execute from this file
1. set preset configs(optional). If the config is not define in case decorator, it will use the preset configs.
2. call test case method:
* if you don't pass any arguments, it will use default values
* you can pass ``extra_data`` to test case by adding ``extra_data=some_data`` as kwarg of test case method.
default value for extra_data is None.
* you can overwrite test case config by adding them as kwarg of test case method.
It will overwrite preset configs and case default configs.
Examples::
test_examples_protocol_https_request(extra_data=["data1", "data2"], dut=SomeOtherDUT, env_tag="OtherEnv")
* or, use ``runner`` to execute. see :doc:`runner <Runner>` for details
Test FW features
----------------
1. Test Environment:
1. DUT: DUT class provides methods to interact with DUT
* read/write through port
* expect method which supports expect one or multiple string or RegEx
* tool methods provided by the tool bundle, like ``start_app``, ``reset``
2. App:
* provide some specific features to the test application of DUT, for example:
* SDK path
* SDK tools
* application information like partition table, download configs
3. Environment Configs:
* support get env configs from config file or auto-detect from current PC
* provide ``get_variable`` method to get variables
2. Allow to customize components (DUT, App) to support different devices
3. Integrate to CI:
* provide interfaces for Gitlab-CI
* provide ``search case`` and ``runner`` interfaces, able to integrate with other CI
Class Diagram
=============
.. uml::
class BaseDUT {
{field} app
{method} expect
{method} expect_any
{method} expect_all
{method} read
{method} write
{method} start_receive
{method} stop_receive
{method} close
}
class SerialDUT {
{method} _port_read
{method} _port_write
{method} _port_open
{method} _port_close
}
class IDFDUT {
{method} reset
{method} start_app
}
class BaseApp {
{method} get_sdk_path
{method} get_log_folder
}
class IDFApp {
{field} flash_files
{field} flash_settings
{field} partition_table
}
class Example {
{method} get_binary_path
}
class EnvConfig {
{method} get_variable
}
class Env {
{field} config
{field} allocated_duts
{field} app_cls
{method} get_dut
{method} close_dut
{method} get_variable
{method} get_pc_nic_info
{method} close
}
SerialDUT --|> BaseDUT
IDFDUT --|> SerialDUT
IDFApp --|> BaseApp
Example --|> IDFApp
Env *-- EnvConfig
Env *-- BaseDUT
Env o-- BaseApp
BaseDUT o-- BaseApp
.. toctree::
:maxdepth: 2
:caption: Contents:
modules
Dependencies
============
Support for both Python2 and Python3 (tested on python 2.7.13 and 3.6.2).
The following 3rd party lib is required:
* pyserial
* pyyaml
* junit_xml
* netifaces
* matplotlib (if use Utility.LineChart)
These libraries can be installed by running ``pip install --user -r requirements.txt`` in tiny-test-fw directory.
To build document, we need to install ``Sphinx``, ``plantweb`` and ``sphinx-rtd-theme`` (you may replace this with your own theme). ``plantweb`` requires internet access during building document.
Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`

View File

@@ -0,0 +1,5 @@
pyserial
pyyaml
junit_xml
netifaces
matplotlib