diff options
Diffstat (limited to 'toolchain_utils_githooks/check-presubmit.py')
-rwxr-xr-x | toolchain_utils_githooks/check-presubmit.py | 180 |
1 files changed, 150 insertions, 30 deletions
diff --git a/toolchain_utils_githooks/check-presubmit.py b/toolchain_utils_githooks/check-presubmit.py index fc6ec9fc..274a75f4 100755 --- a/toolchain_utils_githooks/check-presubmit.py +++ b/toolchain_utils_githooks/check-presubmit.py @@ -7,11 +7,7 @@ """Runs presubmit checks against a bundle of files.""" -# To keep `cros lint` happy -from __future__ import division, print_function - import argparse -import collections import datetime import multiprocessing import multiprocessing.pool @@ -23,9 +19,12 @@ import subprocess import sys import threading import traceback +import typing as t -def run_command_unchecked(command, cwd, env=None): +def run_command_unchecked(command: t.List[str], + cwd: str, + env: t.Dict[str, str] = None) -> t.Tuple[int, str]: """Runs a command in the given dir, returning its exit code and stdio.""" p = subprocess.Popen( command, @@ -41,12 +40,12 @@ def run_command_unchecked(command, cwd, env=None): return exit_code, stdout.decode('utf-8', 'replace') -def has_executable_on_path(exe): +def has_executable_on_path(exe: str) -> bool: """Returns whether we have `exe` somewhere on our $PATH""" return shutil.which(exe) is not None -def escape_command(command): +def escape_command(command: t.Iterable[str]) -> str: """Returns a human-readable and copy-pastable shell command. Only intended for use in output to users. shell=True is strongly discouraged. @@ -54,18 +53,18 @@ def escape_command(command): return ' '.join(shlex.quote(x) for x in command) -def remove_deleted_files(files): +def remove_deleted_files(files: t.Iterable[str]) -> t.List[str]: return [f for f in files if os.path.exists(f)] -def is_file_executable(file_path): +def is_file_executable(file_path: str) -> bool: return os.access(file_path, os.X_OK) # As noted in our docs, some of our Python code depends on modules that sit in # toolchain-utils/. Add that to PYTHONPATH to ensure that things like `cros # lint` are kept happy. -def env_with_pythonpath(toolchain_utils_root): +def env_with_pythonpath(toolchain_utils_root: str) -> t.Dict[str, str]: env = dict(os.environ) if 'PYTHONPATH' in env: env['PYTHONPATH'] += ':' + toolchain_utils_root @@ -85,12 +84,22 @@ def env_with_pythonpath(toolchain_utils_root): # least ${number_of_concurrently_running_checkers}+1 threads are present # in the pool. In order words, blocking on results from the provided # threadpool is OK. -CheckResult = collections.namedtuple('CheckResult', - ('ok', 'output', 'autofix_commands')) +CheckResult = t.NamedTuple( + 'CheckResult', + ( + ('ok', bool), + ('output', str), + ('autofix_commands', t.List[t.List[str]]), + ), +) + +def get_check_result_or_catch( + task: multiprocessing.pool.ApplyResult) -> CheckResult: + """Returns the result of task(); if that raises, returns a CheckResult. -def get_check_result_or_catch(task): - """Returns the result of task(); if that raises, returns a CheckResult.""" + The task is expected to return a CheckResult on get(). + """ try: return task.get() except Exception: @@ -102,7 +111,8 @@ def get_check_result_or_catch(task): ) -def check_yapf(toolchain_utils_root, python_files): +def check_yapf(toolchain_utils_root: str, + python_files: t.Iterable[str]) -> CheckResult: """Subchecker of check_py_format. Checks python file formats with yapf""" command = ['yapf', '-d'] + python_files exit_code, stdout_and_stderr = run_command_unchecked( @@ -145,7 +155,7 @@ def check_yapf(toolchain_utils_root, python_files): ) -def check_python_file_headers(python_files): +def check_python_file_headers(python_files: t.Iterable[str]) -> CheckResult: """Subchecker of check_py_format. Checks python #!s""" add_hashbang = [] remove_hashbang = [] @@ -187,7 +197,9 @@ def check_python_file_headers(python_files): ) -def check_py_format(toolchain_utils_root, thread_pool, files): +def check_py_format(toolchain_utils_root: str, + thread_pool: multiprocessing.pool.ThreadPool, + files: t.Iterable[str]) -> CheckResult: """Runs yapf on files to check for style bugs. Also checks for #!s.""" yapf = 'yapf' if not has_executable_on_path(yapf): @@ -216,7 +228,13 @@ def check_py_format(toolchain_utils_root, thread_pool, files): return [(name, get_check_result_or_catch(task)) for name, task in tasks] -def check_cros_lint(toolchain_utils_root, thread_pool, files): +def find_chromeos_root_directory() -> t.Optional[str]: + return os.getenv('CHROMEOS_ROOT_DIRECTORY') + + +def check_cros_lint( + toolchain_utils_root: str, thread_pool: multiprocessing.pool.ThreadPool, + files: t.Iterable[str]) -> t.Union[t.List[CheckResult], CheckResult]: """Runs `cros lint`""" fixed_env = env_with_pythonpath(toolchain_utils_root) @@ -224,7 +242,7 @@ def check_cros_lint(toolchain_utils_root, thread_pool, files): # We have to support users who don't have a chroot. So we either run `cros # lint` (if it's been made available to us), or we try a mix of # pylint+golint. - def try_run_cros_lint(cros_binary): + def try_run_cros_lint(cros_binary: str) -> t.Optional[CheckResult]: exit_code, output = run_command_unchecked( [cros_binary, 'lint', '--py3', '--'] + files, toolchain_utils_root, @@ -245,7 +263,7 @@ def check_cros_lint(toolchain_utils_root, thread_pool, files): if cros_lint is not None: return cros_lint - cros_root = os.getenv('CHROMEOS_ROOT_DIRECTORY') + cros_root = find_chromeos_root_directory() if cros_root: cros_lint = try_run_cros_lint(os.path.join(cros_root, 'chromite/bin/cros')) if cros_lint is not None: @@ -253,7 +271,7 @@ def check_cros_lint(toolchain_utils_root, thread_pool, files): tasks = [] - def check_result_from_command(command): + def check_result_from_command(command: t.List[str]) -> CheckResult: exit_code, output = run_command_unchecked( command, toolchain_utils_root, env=fixed_env) return CheckResult( @@ -265,7 +283,7 @@ def check_cros_lint(toolchain_utils_root, thread_pool, files): python_files = [f for f in remove_deleted_files(files) if f.endswith('.py')] if python_files: - def run_pylint(): + def run_pylint() -> CheckResult: # pylint is required. Fail hard if it DNE. return check_result_from_command(['pylint'] + python_files) @@ -274,7 +292,7 @@ def check_cros_lint(toolchain_utils_root, thread_pool, files): go_files = [f for f in remove_deleted_files(files) if f.endswith('.go')] if go_files: - def run_golint(): + def run_golint() -> CheckResult: if has_executable_on_path('golint'): return check_result_from_command(['golint', '-set_exit_status'] + go_files) @@ -361,7 +379,9 @@ def check_go_format(toolchain_utils_root, _thread_pool, files): ) -def check_tests(toolchain_utils_root, _thread_pool, files): +def check_tests(toolchain_utils_root: str, + _thread_pool: multiprocessing.pool.ThreadPool, + files: t.List[str]) -> CheckResult: """Runs tests.""" exit_code, stdout_and_stderr = run_command_unchecked( [os.path.join(toolchain_utils_root, 'run_tests_for.py'), '--'] + files, @@ -373,15 +393,17 @@ def check_tests(toolchain_utils_root, _thread_pool, files): ) -def detect_toolchain_utils_root(): +def detect_toolchain_utils_root() -> str: return os.path.dirname(os.path.dirname(os.path.abspath(__file__))) -def process_check_result(check_name, check_results, start_time): +def process_check_result( + check_name: str, check_results: t.Union[t.List[CheckResult], CheckResult], + start_time: datetime.datetime) -> t.Tuple[bool, t.List[t.List[str]]]: """Prints human-readable output for the given check_results.""" indent = ' ' - def indent_block(text): + def indent_block(text: str) -> str: return indent + text.replace('\n', '\n' + indent) if isinstance(check_results, CheckResult): @@ -425,7 +447,8 @@ def process_check_result(check_name, check_results, start_time): return ok, autofix_commands -def try_autofix(all_autofix_commands, toolchain_utils_root): +def try_autofix(all_autofix_commands: t.List[t.List[str]], + toolchain_utils_root: str) -> None: """Tries to run all given autofix commands, if appropriate.""" if not all_autofix_commands: return @@ -459,13 +482,102 @@ def try_autofix(all_autofix_commands, toolchain_utils_root): 'some lints may now be fixed') -def main(argv): +def find_repo_root(base_dir: str) -> t.Optional[str]: + current = base_dir + while current != '/': + if os.path.isdir(os.path.join(current, '.repo')): + return current + current = os.path.dirname(current) + return None + + +def is_in_chroot() -> bool: + return os.path.exists('/etc/cros_chroot_version') + + +def maybe_reexec_inside_chroot(autofix: bool, files: t.List[str]) -> None: + if is_in_chroot(): + return + + enter_chroot = True + chdir_to = None + toolchain_utils = detect_toolchain_utils_root() + if find_repo_root(toolchain_utils) is None: + chromeos_root_dir = find_chromeos_root_directory() + if chromeos_root_dir is None: + print('Standalone toolchain-utils checkout detected; cannot enter ' + 'chroot.') + enter_chroot = False + else: + chdir_to = chromeos_root_dir + + if not has_executable_on_path('cros_sdk'): + print('No `cros_sdk` detected on $PATH; cannot enter chroot.') + enter_chroot = False + + if not enter_chroot: + print('Giving up on entering the chroot; be warned that some presubmits ' + 'may be broken.') + return + + # We'll be changing ${PWD}, so make everything relative to toolchain-utils, + # which resides at a well-known place inside of the chroot. + chroot_toolchain_utils = '/mnt/host/source/src/third_party/toolchain-utils' + + def rebase_path(path: str) -> str: + return os.path.join(chroot_toolchain_utils, + os.path.relpath(path, toolchain_utils)) + + args = [ + 'cros_sdk', + '--enter', + '--', + rebase_path(__file__), + ] + + if not autofix: + args.append('--no_autofix') + args.extend(rebase_path(x) for x in files) + + if chdir_to is None: + print('Attempting to enter the chroot...') + else: + print(f'Attempting to enter the chroot for tree at {chdir_to}...') + os.chdir(chdir_to) + os.execvp(args[0], args) + + +# FIXME(crbug.com/980719): we probably want a better way of handling this. For +# now, as a workaround, ensure we have all dependencies installed as a part of +# presubmits. pip and scipy are fast enough to install (they take <1min +# combined on my machine), so hoooopefully users won't get too impatient. +def ensure_scipy_installed() -> None: + if not has_executable_on_path('pip'): + print('Autoinstalling `pip`...') + subprocess.check_call(['sudo', 'emerge', 'dev-python/pip']) + + exit_code = subprocess.call( + ['python3', '-c', 'import scipy'], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + if exit_code != 0: + print('Autoinstalling `scipy`...') + subprocess.check_call(['pip', 'install', '--user', 'scipy']) + + +def main(argv: t.List[str]) -> int: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( '--no_autofix', dest='autofix', action='store_false', - help="Don't run any autofix commands") + help="Don't run any autofix commands.") + parser.add_argument( + '--no_enter_chroot', + dest='enter_chroot', + action='store_false', + help="Prevent auto-entering the chroot if we're not already in it.") parser.add_argument('files', nargs='*') opts = parser.parse_args(argv) @@ -473,6 +585,14 @@ def main(argv): if not files: return 0 + if opts.enter_chroot: + maybe_reexec_inside_chroot(opts.autofix, opts.files) + + # If you ask for --no_enter_chroot, you're on your own for installing these + # things. + if is_in_chroot(): + ensure_scipy_installed() + files = [os.path.abspath(f) for f in files] # Note that we extract .__name__s from these, so please name them in a |