idf.py: Add automated hints on how to resolve errors

This commit is contained in:
simon.chupin
2021-12-08 18:29:14 +01:00
committed by BOT
parent cb74a64768
commit 43c69f0910
11 changed files with 283 additions and 79 deletions

View File

@@ -1,19 +1,24 @@
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import asyncio
import os
import re
import subprocess
import sys
from asyncio.subprocess import Process
from io import open
from typing import Any, List
from types import FunctionType
from typing import Any, Dict, List, Optional, TextIO, Tuple
import click
import yaml
from idf_monitor_base.output_helpers import yellow_print
from .constants import GENERATORS
from .errors import FatalError
def executable_exists(args):
def executable_exists(args: List) -> bool:
try:
subprocess.check_output(args)
return True
@@ -22,7 +27,7 @@ def executable_exists(args):
return False
def realpath(path):
def realpath(path: str) -> str:
"""
Return the cannonical path with normalized case.
@@ -32,7 +37,7 @@ def realpath(path):
return os.path.normcase(os.path.realpath(path))
def _idf_version_from_cmake():
def _idf_version_from_cmake() -> Optional[str]:
version_path = os.path.join(os.environ['IDF_PATH'], 'tools/cmake/version.cmake')
regex = re.compile(r'^\s*set\s*\(\s*IDF_VERSION_([A-Z]{5})\s+(\d+)')
ver = {}
@@ -50,17 +55,17 @@ def _idf_version_from_cmake():
return None
def get_target(path, sdkconfig_filename='sdkconfig'):
def get_target(path: str, sdkconfig_filename: str='sdkconfig') -> Optional[str]:
path = os.path.join(path, sdkconfig_filename)
return get_sdkconfig_value(path, 'CONFIG_IDF_TARGET')
def idf_version():
def idf_version() -> Optional[str]:
"""Print version of ESP-IDF"""
# Try to get version from git:
try:
version = subprocess.check_output([
version: Optional[str] = subprocess.check_output([
'git',
'--git-dir=%s' % os.path.join(os.environ['IDF_PATH'], '.git'),
'--work-tree=%s' % os.environ['IDF_PATH'],
@@ -74,56 +79,180 @@ def idf_version():
return version
def run_tool(tool_name, args, cwd, env=dict(), custom_error_handler=None):
def quote_arg(arg):
" Quote 'arg' if necessary "
if ' ' in arg and not (arg.startswith('"') or arg.startswith("'")):
return "'" + arg + "'"
return arg
def print_hints(*filenames: str) -> None:
"""Getting output files and printing hints on how to resolve errors based on the output."""
with open(os.path.join(os.path.dirname(__file__), 'hints.yml'), 'r') as file:
hints = yaml.safe_load(file)
for file_name in filenames:
with open(file_name, 'r') as file:
output = ' '.join(line.strip() for line in file if line.strip())
for hint in hints:
try:
match = re.compile(hint['re']).findall(output)
except KeyError:
raise KeyError("Argument 're' missing in {}. Check hints.yml file.".format(hint))
except re.error as e:
raise re.error('{} from hints.yml have {} problem. Check hints.yml file.'.format(hint['re'], e))
if match:
extra_info = ', '.join(match) if hint.get('match_to_output', '') else ''
try:
yellow_print(' '.join(['HINT:', hint['hint'].format(extra_info)]))
except KeyError:
raise KeyError("Argument 'hint' missing in {}. Check hints.yml file.".format(hint))
args = [str(arg) for arg in args]
display_args = ' '.join(quote_arg(arg) for arg in args)
print('Running %s in directory %s' % (tool_name, quote_arg(cwd)))
print('Executing "%s"...' % str(display_args))
env_copy = dict(os.environ)
env_copy.update(env)
def fit_text_in_terminal(out: str) -> str:
"""Fit text in terminal, if the string is not fit replace center with `...`"""
space_for_dots = 3 # Space for "..."
terminal_width, _ = os.get_terminal_size()
if terminal_width <= space_for_dots:
# if the wide of the terminal is too small just print dots
return '.' * terminal_width
if len(out) >= terminal_width:
elide_size = (terminal_width - space_for_dots) // 2
# cut out the middle part of the output if it does not fit in the terminal
return '...'.join([out[:elide_size], out[len(out) - elide_size:]])
return out
if sys.version_info[0] < 3:
# The subprocess lib cannot accept environment variables as "unicode". Convert to str.
# This encoding step is required only in Python 2.
for (key, val) in env_copy.items():
if not isinstance(val, str):
env_copy[key] = val.encode(sys.getfilesystemencoding() or 'utf-8')
try:
class RunTool:
def __init__(self, tool_name: str, args: List, cwd: str, env: Dict=None, custom_error_handler: FunctionType=None, build_dir: str=None,
hints: bool=False, force_progression: bool=False) -> None:
self.tool_name = tool_name
self.args = args
self.cwd = cwd
self.env = env
self.custom_error_handler = custom_error_handler
# build_dir sets by tools that do not use build directory as cwd
self.build_dir = build_dir or cwd
self.hints = hints
self.force_progression = force_progression
def __call__(self) -> None:
def quote_arg(arg: str) -> str:
""" Quote the `arg` with whitespace in them because it can cause problems when we call it from a subprocess."""
if re.match(r"^(?![\'\"]).*\s.*", arg):
return ''.join(["'", arg, "'"])
return arg
self.args = [str(arg) for arg in self.args]
display_args = ' '.join(quote_arg(arg) for arg in self.args)
print('Running %s in directory %s' % (self.tool_name, quote_arg(self.cwd)))
print('Executing "%s"...' % str(display_args))
env_copy = dict(os.environ)
env_copy.update(self.env or {})
process, stderr_output_file, stdout_output_file = asyncio.run(self.run_command(self.args, env_copy))
if process.returncode == 0:
return
if self.custom_error_handler:
self.custom_error_handler(process.returncode, stderr_output_file, stdout_output_file)
return
if stderr_output_file and stdout_output_file:
print_hints(stderr_output_file, stdout_output_file)
raise FatalError('{} failed with exit code {}, output of the command is in the {} and {}'.format(self.tool_name, process.returncode,
stderr_output_file, stdout_output_file))
raise FatalError('{} failed with exit code {}'.format(self.tool_name, process.returncode))
async def run_command(self, cmd: List, env_copy: Dict) -> Tuple[Process, Optional[str], Optional[str]]:
""" Run the `cmd` command with capturing stderr and stdout from that function and return returncode
and of the command, the id of the process, paths to captured output """
if not self.hints:
p = await asyncio.create_subprocess_exec(*cmd, env=env_copy, cwd=self.cwd)
await p.wait() # added for avoiding None returncode
return p, None, None
log_dir_name = 'log'
try:
os.mkdir(os.path.join(self.build_dir, log_dir_name))
except FileExistsError:
pass
# Note: we explicitly pass in os.environ here, as we may have set IDF_PATH there during startup
subprocess.check_call(args, env=env_copy, cwd=cwd)
except subprocess.CalledProcessError as e:
if custom_error_handler:
custom_error_handler(e)
else:
raise FatalError('%s failed with exit code %d' % (tool_name, e.returncode))
# limit was added for avoiding error in idf.py confserver
p = await asyncio.create_subprocess_exec(*cmd, env=env_copy, limit=1024 * 128, cwd=self.cwd, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE, )
stderr_output_file = os.path.join(self.build_dir, log_dir_name, f'idf_py_stderr_output_{p.pid}')
stdout_output_file = os.path.join(self.build_dir, log_dir_name, f'idf_py_stdout_output_{p.pid}')
if p.stderr and p.stdout: # it only to avoid None type in p.std
await asyncio.gather(
self.read_and_write_stream(p.stderr, stderr_output_file, sys.stderr),
self.read_and_write_stream(p.stdout, stdout_output_file))
await p.wait() # added for avoiding None returncode
return p, stderr_output_file, stdout_output_file
async def read_and_write_stream(self, input_stream: asyncio.StreamReader, output_filename: str,
output_stream: TextIO=sys.stdout) -> None:
"""read the output of the `input_stream` and then write it into `output_filename` and `output_stream`"""
def delete_ansi_escape(text: str) -> str:
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
return ansi_escape.sub('', text)
def prepare_for_print(out: bytes) -> str:
# errors='ignore' is here because some chips produce some garbage bytes
result = out.decode(errors='ignore')
if not output_stream.isatty():
# delete escape sequence if we printing in environments where ANSI coloring is disabled
return delete_ansi_escape(result)
return result
def print_progression(output: str) -> None:
# Print a new line on top of the previous line
sys.stdout.write('\x1b[K')
print('\r', end='')
print(fit_text_in_terminal(output.strip('\n\r')), end='', file=output_stream)
try:
with open(output_filename, 'w') as output_file:
while True:
out = await input_stream.readline()
if not out:
break
output = prepare_for_print(out)
output_file.write(output)
# print output in progression way but only the progression related (that started with '[') and if verbose flag is not set
if self.force_progression and output[0] == '[' and '-v' not in self.args:
print_progression(output)
else:
print(output, end='', file=output_stream)
except (RuntimeError, EnvironmentError) as e:
yellow_print('WARNING: The exception {} was raised and we can\'t capture all your {} and '
'hints on how to resolve errors can be not accurate.'.format(e, output_stream.name.strip('<>')))
def run_target(target_name, args, env=dict(), custom_error_handler=None):
def run_tool(*args: Any, **kwargs: Any) -> None:
# Added in case some one use run_tool externally in a idf.py extensions
return RunTool(*args, **kwargs)()
def run_target(target_name: str, args: 'PropertyDict', env: Optional[Dict]=None,
custom_error_handler: FunctionType=None, force_progression: bool=False, hints: bool=False) -> None:
"""Run target in build directory."""
if env is None:
env = {}
generator_cmd = GENERATORS[args.generator]['command']
env.update(GENERATORS[args.generator]['envvar'])
if args.verbose:
generator_cmd += [GENERATORS[args.generator]['verbose_flag']]
run_tool(generator_cmd[0], generator_cmd + [target_name], args.build_dir, env, custom_error_handler)
RunTool(generator_cmd[0], generator_cmd + [target_name], args.build_dir, env, custom_error_handler, hints=hints,
force_progression=force_progression)()
def _strip_quotes(value, regexp=re.compile(r"^\"(.*)\"$|^'(.*)'$|^(.*)$")):
def _strip_quotes(value: str, regexp: re.Pattern=re.compile(r"^\"(.*)\"$|^'(.*)'$|^(.*)$")) -> Optional[str]:
"""
Strip quotes like CMake does during parsing cache entries
"""
return [x for x in regexp.match(value).groups() if x is not None][0].rstrip()
matching_values = regexp.match(value)
return [x for x in matching_values.groups() if x is not None][0].rstrip() if matching_values is not None else None
def _parse_cmakecache(path):
def _parse_cmakecache(path: str) -> Dict:
"""
Parse the CMakeCache file at 'path'.
@@ -142,7 +271,7 @@ def _parse_cmakecache(path):
return result
def _new_cmakecache_entries(cache_path, new_cache_entries):
def _new_cmakecache_entries(cache_path: str, new_cache_entries: List) -> bool:
if not os.path.exists(cache_path):
return True
@@ -158,7 +287,7 @@ def _new_cmakecache_entries(cache_path, new_cache_entries):
return False
def _detect_cmake_generator(prog_name):
def _detect_cmake_generator(prog_name: str) -> Any:
"""
Find the default cmake generator, if none was specified. Raises an exception if no valid generator is found.
"""
@@ -168,7 +297,7 @@ def _detect_cmake_generator(prog_name):
raise FatalError("To use %s, either the 'ninja' or 'GNU make' build tool must be available in the PATH" % prog_name)
def ensure_build_directory(args, prog_name, always_run_cmake=False):
def ensure_build_directory(args: 'PropertyDict', prog_name: str, always_run_cmake: bool=False) -> None:
"""Check the build directory exists and that cmake has been run there.
If this isn't the case, create the build directory (if necessary) and
@@ -220,7 +349,8 @@ def ensure_build_directory(args, prog_name, always_run_cmake=False):
cmake_args += ['-D' + d for d in args.define_cache_entry]
cmake_args += [project_dir]
run_tool('cmake', cmake_args, cwd=args.build_dir)
hints = not args.no_hints
RunTool('cmake', cmake_args, cwd=args.build_dir, hints=hints)()
except Exception:
# don't allow partially valid CMakeCache.txt files,
# to keep the "should I run cmake?" logic simple
@@ -251,8 +381,8 @@ def ensure_build_directory(args, prog_name, always_run_cmake=False):
pass # if cmake failed part way, CMAKE_HOME_DIRECTORY may not be set yet
def merge_action_lists(*action_lists):
merged_actions = {
def merge_action_lists(*action_lists: Dict) -> Dict:
merged_actions: Dict = {
'global_options': [],
'actions': {},
'global_action_callbacks': [],
@@ -264,7 +394,7 @@ def merge_action_lists(*action_lists):
return merged_actions
def get_sdkconfig_value(sdkconfig_file, key):
def get_sdkconfig_value(sdkconfig_file: str, key: str) -> Optional[str]:
"""
Return the value of given key from sdkconfig_file.
If sdkconfig_file does not exist or the option is not present, returns None.
@@ -284,14 +414,14 @@ def get_sdkconfig_value(sdkconfig_file, key):
return value
def is_target_supported(project_path, supported_targets):
def is_target_supported(project_path: str, supported_targets: List) -> bool:
"""
Returns True if the active target is supported, or False otherwise.
"""
return get_target(project_path) in supported_targets
def _guess_or_check_idf_target(args, prog_name, cache):
def _guess_or_check_idf_target(args: 'PropertyDict', prog_name: str, cache: Dict) -> None:
"""
If CMakeCache.txt doesn't exist, and IDF_TARGET is not set in the environment, guess the value from
sdkconfig or sdkconfig.defaults, and pass it to CMake in IDF_TARGET variable.