style: format python files with isort and double-quote-string-fixer

This commit is contained in:
Fu Hanxi
2021-01-26 10:49:01 +08:00
parent dc8402ea61
commit 0146f258d7
276 changed files with 8241 additions and 8162 deletions

View File

@@ -13,14 +13,15 @@
# limitations under the License.
""" DUT for IDF applications """
import functools
import os
import os.path
import sys
import re
import functools
import tempfile
import subprocess
import sys
import tempfile
import time
import pexpect
# python2 and python3 queue package name is different
@@ -29,18 +30,16 @@ try:
except ImportError:
import queue as _queue
from serial.tools import list_ports
from tiny_test_fw import DUT, Utility
try:
import esptool
except ImportError: # cheat and use IDF's copy of esptool if available
idf_path = os.getenv("IDF_PATH")
idf_path = os.getenv('IDF_PATH')
if not idf_path or not os.path.exists(idf_path):
raise
sys.path.insert(0, os.path.join(idf_path, "components", "esptool_py", "esptool"))
sys.path.insert(0, os.path.join(idf_path, 'components', 'esptool_py', 'esptool'))
import esptool
@@ -54,14 +53,14 @@ class IDFDUTException(RuntimeError):
class IDFRecvThread(DUT.RecvThread):
PERFORMANCE_PATTERN = re.compile(r"\[Performance]\[(\w+)]: ([^\r\n]+)\r?\n")
PERFORMANCE_PATTERN = re.compile(r'\[Performance]\[(\w+)]: ([^\r\n]+)\r?\n')
EXCEPTION_PATTERNS = [
re.compile(r"(Guru Meditation Error: Core\s+\d panic'ed \([\w].*?\))"),
re.compile(r"(abort\(\) was called at PC 0x[a-fA-F\d]{8} on core \d)"),
re.compile(r"(rst 0x\d+ \(TG\dWDT_SYS_RESET|TGWDT_CPU_RESET\))")
re.compile(r'(abort\(\) was called at PC 0x[a-fA-F\d]{8} on core \d)'),
re.compile(r'(rst 0x\d+ \(TG\dWDT_SYS_RESET|TGWDT_CPU_RESET\))')
]
BACKTRACE_PATTERN = re.compile(r"Backtrace:((\s(0x[0-9a-f]{8}):0x[0-9a-f]{8})+)")
BACKTRACE_ADDRESS_PATTERN = re.compile(r"(0x[0-9a-f]{8}):0x[0-9a-f]{8}")
BACKTRACE_PATTERN = re.compile(r'Backtrace:((\s(0x[0-9a-f]{8}):0x[0-9a-f]{8})+)')
BACKTRACE_ADDRESS_PATTERN = re.compile(r'(0x[0-9a-f]{8}):0x[0-9a-f]{8}')
def __init__(self, read, dut):
super(IDFRecvThread, self).__init__(read, dut)
@@ -71,8 +70,8 @@ class IDFRecvThread(DUT.RecvThread):
def collect_performance(self, comp_data):
matches = self.PERFORMANCE_PATTERN.findall(comp_data)
for match in matches:
Utility.console_log("[Performance][{}]: {}".format(match[0], match[1]),
color="orange")
Utility.console_log('[Performance][{}]: {}'.format(match[0], match[1]),
color='orange')
self.performance_items.put((match[0], match[1]))
def detect_exception(self, comp_data):
@@ -83,7 +82,7 @@ class IDFRecvThread(DUT.RecvThread):
if match:
start = match.end()
self.exceptions.put(match.group(0))
Utility.console_log("[Exception]: {}".format(match.group(0)), color="red")
Utility.console_log('[Exception]: {}'.format(match.group(0)), color='red')
else:
break
@@ -93,18 +92,18 @@ class IDFRecvThread(DUT.RecvThread):
match = self.BACKTRACE_PATTERN.search(comp_data, pos=start)
if match:
start = match.end()
Utility.console_log("[Backtrace]:{}".format(match.group(1)), color="red")
Utility.console_log('[Backtrace]:{}'.format(match.group(1)), color='red')
# translate backtrace
addresses = self.BACKTRACE_ADDRESS_PATTERN.findall(match.group(1))
translated_backtrace = ""
translated_backtrace = ''
for addr in addresses:
ret = self.dut.lookup_pc_address(addr)
if ret:
translated_backtrace += ret + "\n"
translated_backtrace += ret + '\n'
if translated_backtrace:
Utility.console_log("Translated backtrace\n:" + translated_backtrace, color="yellow")
Utility.console_log('Translated backtrace\n:' + translated_backtrace, color='yellow')
else:
Utility.console_log("Failed to translate backtrace", color="yellow")
Utility.console_log('Failed to translate backtrace', color='yellow')
else:
break
@@ -149,7 +148,7 @@ class IDFDUT(DUT.SerialDUT):
# /dev/ttyAMA0 port is listed in Raspberry Pi
# /dev/tty.Bluetooth-Incoming-Port port is listed in Mac
INVALID_PORT_PATTERN = re.compile(r"AMA|Bluetooth")
INVALID_PORT_PATTERN = re.compile(r'AMA|Bluetooth')
# if need to erase NVS partition in start app
ERASE_NVS = True
RECV_THREAD_CLS = IDFRecvThread
@@ -163,7 +162,7 @@ class IDFDUT(DUT.SerialDUT):
@classmethod
def _get_rom(cls):
raise NotImplementedError("This is an abstraction class, method not defined.")
raise NotImplementedError('This is an abstraction class, method not defined.')
@classmethod
def get_mac(cls, app, port):
@@ -200,7 +199,7 @@ class IDFDUT(DUT.SerialDUT):
# Otherwise overwrite it in ESP8266DUT
inst = esptool.ESPLoader.detect_chip(port)
if expected_rom_class and type(inst) != expected_rom_class:
raise RuntimeError("Target not expected")
raise RuntimeError('Target not expected')
return inst.read_mac() is not None, get_target_by_rom_class(type(inst))
except(esptool.FatalError, RuntimeError):
return False, None
@@ -227,7 +226,7 @@ class IDFDUT(DUT.SerialDUT):
# and encrypt_files contains the ones to flash encrypted.
flash_files = self.app.flash_files
encrypt_files = self.app.encrypt_files
encrypt = self.app.flash_settings.get("encrypt", False)
encrypt = self.app.flash_settings.get('encrypt', False)
if encrypt:
flash_files = encrypt_files
encrypt_files = []
@@ -236,12 +235,12 @@ class IDFDUT(DUT.SerialDUT):
for entry in flash_files
if entry not in encrypt_files]
flash_files = [(offs, open(path, "rb")) for (offs, path) in flash_files]
encrypt_files = [(offs, open(path, "rb")) for (offs, path) in encrypt_files]
flash_files = [(offs, open(path, 'rb')) for (offs, path) in flash_files]
encrypt_files = [(offs, open(path, 'rb')) for (offs, path) in encrypt_files]
if erase_nvs:
address = self.app.partition_table["nvs"]["offset"]
size = self.app.partition_table["nvs"]["size"]
address = self.app.partition_table['nvs']['offset']
size = self.app.partition_table['nvs']['size']
nvs_file = tempfile.TemporaryFile()
nvs_file.write(b'\xff' * size)
nvs_file.seek(0)
@@ -252,7 +251,7 @@ class IDFDUT(DUT.SerialDUT):
# Get the CONFIG_SECURE_FLASH_ENCRYPTION_MODE_DEVELOPMENT macro
# value. If it is set to True, then NVS is always encrypted.
sdkconfig_dict = self.app.get_sdkconfig()
macro_encryption = "CONFIG_SECURE_FLASH_ENCRYPTION_MODE_DEVELOPMENT" in sdkconfig_dict
macro_encryption = 'CONFIG_SECURE_FLASH_ENCRYPTION_MODE_DEVELOPMENT' in sdkconfig_dict
# If the macro is not enabled (plain text flash) or all files
# must be encrypted, add NVS to flash_files.
if not macro_encryption or encrypt:
@@ -270,9 +269,9 @@ class IDFDUT(DUT.SerialDUT):
# write_flash expects the parameter encrypt_files to be None and not
# an empty list, so perform the check here
flash_args = FlashArgs({
'flash_size': self.app.flash_settings["flash_size"],
'flash_mode': self.app.flash_settings["flash_mode"],
'flash_freq': self.app.flash_settings["flash_freq"],
'flash_size': self.app.flash_settings['flash_size'],
'flash_mode': self.app.flash_settings['flash_mode'],
'flash_freq': self.app.flash_settings['flash_freq'],
'addr_filename': flash_files,
'encrypt_files': encrypt_files or None,
'no_stub': False,
@@ -328,9 +327,9 @@ class IDFDUT(DUT.SerialDUT):
"""
raise NotImplementedError() # TODO: implement this
# address = self.app.partition_table[partition]["offset"]
size = self.app.partition_table[partition]["size"]
size = self.app.partition_table[partition]['size']
# TODO can use esp.erase_region() instead of this, I think
with open(".erase_partition.tmp", "wb") as f:
with open('.erase_partition.tmp', 'wb') as f:
f.write(chr(0xFF) * size)
@_uses_esptool
@@ -356,18 +355,18 @@ class IDFDUT(DUT.SerialDUT):
"""
if os.path.isabs(output_file) is False:
output_file = os.path.relpath(output_file, self.app.get_log_folder())
if "partition" in kwargs:
partition = self.app.partition_table[kwargs["partition"]]
_address = partition["offset"]
_size = partition["size"]
elif "address" in kwargs and "size" in kwargs:
_address = kwargs["address"]
_size = kwargs["size"]
if 'partition' in kwargs:
partition = self.app.partition_table[kwargs['partition']]
_address = partition['offset']
_size = partition['size']
elif 'address' in kwargs and 'size' in kwargs:
_address = kwargs['address']
_size = kwargs['size']
else:
raise IDFToolError("You must specify 'partition' or ('address' and 'size') to dump flash")
content = esp.read_flash(_address, _size)
with open(output_file, "wb") as f:
with open(output_file, 'wb') as f:
f.write(content)
@classmethod
@@ -405,9 +404,9 @@ class IDFDUT(DUT.SerialDUT):
return ports
def lookup_pc_address(self, pc_addr):
cmd = ["%saddr2line" % self.TOOLCHAIN_PREFIX,
"-pfiaC", "-e", self.app.elf_file, pc_addr]
ret = ""
cmd = ['%saddr2line' % self.TOOLCHAIN_PREFIX,
'-pfiaC', '-e', self.app.elf_file, pc_addr]
ret = ''
try:
translation = subprocess.check_output(cmd)
ret = translation.decode()
@@ -439,7 +438,7 @@ class IDFDUT(DUT.SerialDUT):
def stop_receive(self):
if self.receive_thread:
for name in ["performance_items", "exceptions"]:
for name in ['performance_items', 'exceptions']:
source_queue = getattr(self.receive_thread, name)
dest_queue = getattr(self, name)
self._queue_copy(source_queue, dest_queue)
@@ -447,7 +446,7 @@ class IDFDUT(DUT.SerialDUT):
def get_exceptions(self):
""" Get exceptions detected by DUT receive thread. """
return self._get_from_queue("exceptions")
return self._get_from_queue('exceptions')
def get_performance_items(self):
"""
@@ -456,18 +455,18 @@ class IDFDUT(DUT.SerialDUT):
:return: a list of performance items.
"""
return self._get_from_queue("performance_items")
return self._get_from_queue('performance_items')
def close(self):
super(IDFDUT, self).close()
if not self.allow_dut_exception and self.get_exceptions():
Utility.console_log("DUT exception detected on {}".format(self), color="red")
Utility.console_log('DUT exception detected on {}'.format(self), color='red')
raise IDFDUTException()
class ESP32DUT(IDFDUT):
TARGET = "esp32"
TOOLCHAIN_PREFIX = "xtensa-esp32-elf-"
TARGET = 'esp32'
TOOLCHAIN_PREFIX = 'xtensa-esp32-elf-'
@classmethod
def _get_rom(cls):
@@ -478,8 +477,8 @@ class ESP32DUT(IDFDUT):
class ESP32S2DUT(IDFDUT):
TARGET = "esp32s2"
TOOLCHAIN_PREFIX = "xtensa-esp32s2-elf-"
TARGET = 'esp32s2'
TOOLCHAIN_PREFIX = 'xtensa-esp32s2-elf-'
@classmethod
def _get_rom(cls):
@@ -490,8 +489,8 @@ class ESP32S2DUT(IDFDUT):
class ESP32C3DUT(IDFDUT):
TARGET = "esp32c3"
TOOLCHAIN_PREFIX = "riscv32-esp-elf-"
TARGET = 'esp32c3'
TOOLCHAIN_PREFIX = 'riscv32-esp-elf-'
@classmethod
def _get_rom(cls):
@@ -502,8 +501,8 @@ class ESP32C3DUT(IDFDUT):
class ESP8266DUT(IDFDUT):
TARGET = "esp8266"
TOOLCHAIN_PREFIX = "xtensa-lx106-elf-"
TARGET = 'esp8266'
TOOLCHAIN_PREFIX = 'xtensa-lx106-elf-'
@classmethod
def _get_rom(cls):
@@ -528,34 +527,34 @@ class IDFQEMUDUT(IDFDUT):
QEMU_SERIAL_PORT = 3334
def __init__(self, name, port, log_file, app, allow_dut_exception=False, **kwargs):
self.flash_image = tempfile.NamedTemporaryFile('rb+', suffix=".bin", prefix="qemu_flash_img")
self.flash_image = tempfile.NamedTemporaryFile('rb+', suffix='.bin', prefix='qemu_flash_img')
self.app = app
self.flash_size = 4 * 1024 * 1024
self._write_flash_img()
args = [
"qemu-system-xtensa",
"-nographic",
"-machine", self.TARGET,
"-drive", "file={},if=mtd,format=raw".format(self.flash_image.name),
"-nic", "user,model=open_eth",
"-serial", "tcp::{},server,nowait".format(self.QEMU_SERIAL_PORT),
"-S",
"-global driver=timer.esp32.timg,property=wdt_disable,value=true"]
'qemu-system-xtensa',
'-nographic',
'-machine', self.TARGET,
'-drive', 'file={},if=mtd,format=raw'.format(self.flash_image.name),
'-nic', 'user,model=open_eth',
'-serial', 'tcp::{},server,nowait'.format(self.QEMU_SERIAL_PORT),
'-S',
'-global driver=timer.esp32.timg,property=wdt_disable,value=true']
# TODO(IDF-1242): generate a temporary efuse binary, pass it to QEMU
if "QEMU_BIOS_PATH" in os.environ:
args += ["-L", os.environ["QEMU_BIOS_PATH"]]
if 'QEMU_BIOS_PATH' in os.environ:
args += ['-L', os.environ['QEMU_BIOS_PATH']]
self.qemu = pexpect.spawn(" ".join(args), timeout=self.DEFAULT_EXPECT_TIMEOUT)
self.qemu.expect_exact(b"(qemu)")
self.qemu = pexpect.spawn(' '.join(args), timeout=self.DEFAULT_EXPECT_TIMEOUT)
self.qemu.expect_exact(b'(qemu)')
super(IDFQEMUDUT, self).__init__(name, port, log_file, app, allow_dut_exception=allow_dut_exception, **kwargs)
def _write_flash_img(self):
self.flash_image.seek(0)
self.flash_image.write(b'\x00' * self.flash_size)
for offs, path in self.app.flash_files:
with open(path, "rb") as flash_file:
with open(path, 'rb') as flash_file:
contents = flash_file.read()
self.flash_image.seek(offs)
self.flash_image.write(contents)
@@ -568,7 +567,7 @@ class IDFQEMUDUT(IDFDUT):
@classmethod
def get_mac(cls, app, port):
# TODO(IDF-1242): get this from QEMU/efuse binary
return "11:22:33:44:55:66"
return '11:22:33:44:55:66'
@classmethod
def confirm_dut(cls, port, **kwargs):
@@ -577,30 +576,30 @@ class IDFQEMUDUT(IDFDUT):
def start_app(self, erase_nvs=ERASE_NVS):
# TODO: implement erase_nvs
# since the flash image is generated every time in the constructor, maybe this isn't needed...
self.qemu.sendline(b"cont\n")
self.qemu.expect_exact(b"(qemu)")
self.qemu.sendline(b'cont\n')
self.qemu.expect_exact(b'(qemu)')
def reset(self):
self.qemu.sendline(b"system_reset\n")
self.qemu.expect_exact(b"(qemu)")
self.qemu.sendline(b'system_reset\n')
self.qemu.expect_exact(b'(qemu)')
def erase_partition(self, partition):
raise NotImplementedError("method erase_partition not implemented")
raise NotImplementedError('method erase_partition not implemented')
def erase_flash(self):
raise NotImplementedError("method erase_flash not implemented")
raise NotImplementedError('method erase_flash not implemented')
def dump_flash(self, output_file, **kwargs):
raise NotImplementedError("method dump_flash not implemented")
raise NotImplementedError('method dump_flash not implemented')
@classmethod
def list_available_ports(cls):
return ["socket://localhost:{}".format(cls.QEMU_SERIAL_PORT)]
return ['socket://localhost:{}'.format(cls.QEMU_SERIAL_PORT)]
def close(self):
super(IDFQEMUDUT, self).close()
self.qemu.sendline(b"q\n")
self.qemu.expect_exact(b"(qemu)")
self.qemu.sendline(b'q\n')
self.qemu.expect_exact(b'(qemu)')
for _ in range(self.DEFAULT_EXPECT_TIMEOUT):
if not self.qemu.isalive():
break
@@ -610,5 +609,5 @@ class IDFQEMUDUT(IDFDUT):
class ESP32QEMUDUT(IDFQEMUDUT):
TARGET = "esp32"
TOOLCHAIN_PREFIX = "xtensa-esp32-elf-"
TARGET = 'esp32'
TOOLCHAIN_PREFIX = 'xtensa-esp32-elf-'