tiny-test-fw: support detect exception in IDFDUT

This commit is contained in:
He Yin Ling
2019-03-16 20:07:52 +08:00
committed by bot
parent 11d141e87d
commit cbc438c807
4 changed files with 164 additions and 46 deletions

View File

@@ -42,19 +42,19 @@ import time
import re
import threading
import copy
import sys
import functools
# python2 and python3 queue package name is different
try:
import Queue as _queue
except ImportError:
import queue as _queue
import serial
from serial.tools import list_ports
import Utility
if sys.version_info[0] == 2:
import Queue as _queue
else:
import queue as _queue
class ExpectTimeout(ValueError):
""" timeout for expect method """
@@ -201,55 +201,61 @@ class _LogThread(threading.Thread, _queue.Queue):
self.flush_data()
class _RecvThread(threading.Thread):
class RecvThread(threading.Thread):
PERFORMANCE_PATTERN = re.compile(r"\[Performance]\[(\w+)]: ([^\r\n]+)\r?\n")
CHECK_FUNCTIONS = []
""" DUT subclass can define a few check functions to process received data. """
def __init__(self, read, data_cache, recorded_data, record_data_lock):
super(_RecvThread, self).__init__()
super(RecvThread, self).__init__()
self.exit_event = threading.Event()
self.setDaemon(True)
self.read = read
self.data_cache = data_cache
self.recorded_data = recorded_data
self.record_data_lock = record_data_lock
# cache the last line of recv data for collecting performance
self._line_cache = str()
def collect_performance(self, data):
""" collect performance """
if data:
decoded_data = _decode_data(data)
def _line_completion(self, data):
"""
Usually check functions requires to check for one complete line.
This method will do line completion for the first line, and strip incomplete last line.
"""
ret = self._line_cache
decoded_data = _decode_data(data)
matches = self.PERFORMANCE_PATTERN.findall(self._line_cache + decoded_data)
for match in matches:
Utility.console_log("[Performance][{}]: {}".format(match[0], match[1]),
color="orange")
# cache incomplete line to later process
lines = decoded_data.splitlines(True)
last_line = lines[-1]
# cache incomplete line to later process
lines = decoded_data.splitlines(True)
last_line = lines[-1]
if last_line[-1] != "\n":
if len(lines) == 1:
# only one line and the line is not finished, then append this to cache
self._line_cache += lines[-1]
else:
# more than one line and not finished, replace line cache
self._line_cache = lines[-1]
if last_line[-1] != "\n":
if len(lines) == 1:
# only one line and the line is not finished, then append this to cache
self._line_cache += lines[-1]
ret = str()
else:
# line finishes, flush cache
self._line_cache = str()
# more than one line and not finished, replace line cache
self._line_cache = lines[-1]
ret += "".join(lines[:-1])
else:
# line finishes, flush cache
self._line_cache = str()
ret += decoded_data
return ret
def run(self):
while not self.exit_event.isSet():
data = self.read(1000)
if data:
raw_data = self.read(1000)
if raw_data:
with self.record_data_lock:
self.data_cache.put(data)
self.data_cache.put(raw_data)
for capture_id in self.recorded_data:
self.recorded_data[capture_id].put(data)
self.collect_performance(data)
self.recorded_data[capture_id].put(raw_data)
# we need to do line completion before call check functions
comp_data = self._line_completion(raw_data)
for check_function in self.CHECK_FUNCTIONS:
check_function(self, comp_data)
def exit(self):
self.exit_event.set()
@@ -267,6 +273,9 @@ class BaseDUT(object):
DEFAULT_EXPECT_TIMEOUT = 10
MAX_EXPECT_FAILURES_TO_SAVED = 10
RECV_THREAD_CLS = RecvThread
""" DUT subclass can specify RECV_THREAD_CLS to do add some extra stuff when receive data.
For example, DUT can implement exception detect & analysis logic in receive thread subclass. """
LOG_THREAD = _LogThread()
LOG_THREAD.start()
@@ -398,8 +407,8 @@ class BaseDUT(object):
:return: None
"""
self.receive_thread = _RecvThread(self._port_read, self.data_cache,
self.recorded_data, self.record_data_lock)
self.receive_thread = self.RECV_THREAD_CLS(self._port_read, self.data_cache,
self.recorded_data, self.record_data_lock)
self.receive_thread.start()
def stop_receive(self):
@@ -429,9 +438,9 @@ class BaseDUT(object):
if isinstance(data, type(u'')):
try:
data = data.encode('utf-8')
except Exception:
except Exception as e:
print(u'Cannot encode {} of type {}'.format(data, type(data)))
raise
raise e
return data
def write(self, data, eol="\r\n", flush=True):