aboutsummaryrefslogtreecommitdiff
path: root/toolchain_utils_githooks/check-presubmit.py
diff options
context:
space:
mode:
Diffstat (limited to 'toolchain_utils_githooks/check-presubmit.py')
-rwxr-xr-xtoolchain_utils_githooks/check-presubmit.py180
1 files changed, 150 insertions, 30 deletions
diff --git a/toolchain_utils_githooks/check-presubmit.py b/toolchain_utils_githooks/check-presubmit.py
index fc6ec9fc..274a75f4 100755
--- a/toolchain_utils_githooks/check-presubmit.py
+++ b/toolchain_utils_githooks/check-presubmit.py
@@ -7,11 +7,7 @@
"""Runs presubmit checks against a bundle of files."""
-# To keep `cros lint` happy
-from __future__ import division, print_function
-
import argparse
-import collections
import datetime
import multiprocessing
import multiprocessing.pool
@@ -23,9 +19,12 @@ import subprocess
import sys
import threading
import traceback
+import typing as t
-def run_command_unchecked(command, cwd, env=None):
+def run_command_unchecked(command: t.List[str],
+ cwd: str,
+ env: t.Dict[str, str] = None) -> t.Tuple[int, str]:
"""Runs a command in the given dir, returning its exit code and stdio."""
p = subprocess.Popen(
command,
@@ -41,12 +40,12 @@ def run_command_unchecked(command, cwd, env=None):
return exit_code, stdout.decode('utf-8', 'replace')
-def has_executable_on_path(exe):
+def has_executable_on_path(exe: str) -> bool:
"""Returns whether we have `exe` somewhere on our $PATH"""
return shutil.which(exe) is not None
-def escape_command(command):
+def escape_command(command: t.Iterable[str]) -> str:
"""Returns a human-readable and copy-pastable shell command.
Only intended for use in output to users. shell=True is strongly discouraged.
@@ -54,18 +53,18 @@ def escape_command(command):
return ' '.join(shlex.quote(x) for x in command)
-def remove_deleted_files(files):
+def remove_deleted_files(files: t.Iterable[str]) -> t.List[str]:
return [f for f in files if os.path.exists(f)]
-def is_file_executable(file_path):
+def is_file_executable(file_path: str) -> bool:
return os.access(file_path, os.X_OK)
# As noted in our docs, some of our Python code depends on modules that sit in
# toolchain-utils/. Add that to PYTHONPATH to ensure that things like `cros
# lint` are kept happy.
-def env_with_pythonpath(toolchain_utils_root):
+def env_with_pythonpath(toolchain_utils_root: str) -> t.Dict[str, str]:
env = dict(os.environ)
if 'PYTHONPATH' in env:
env['PYTHONPATH'] += ':' + toolchain_utils_root
@@ -85,12 +84,22 @@ def env_with_pythonpath(toolchain_utils_root):
# least ${number_of_concurrently_running_checkers}+1 threads are present
# in the pool. In order words, blocking on results from the provided
# threadpool is OK.
-CheckResult = collections.namedtuple('CheckResult',
- ('ok', 'output', 'autofix_commands'))
+CheckResult = t.NamedTuple(
+ 'CheckResult',
+ (
+ ('ok', bool),
+ ('output', str),
+ ('autofix_commands', t.List[t.List[str]]),
+ ),
+)
+
+def get_check_result_or_catch(
+ task: multiprocessing.pool.ApplyResult) -> CheckResult:
+ """Returns the result of task(); if that raises, returns a CheckResult.
-def get_check_result_or_catch(task):
- """Returns the result of task(); if that raises, returns a CheckResult."""
+ The task is expected to return a CheckResult on get().
+ """
try:
return task.get()
except Exception:
@@ -102,7 +111,8 @@ def get_check_result_or_catch(task):
)
-def check_yapf(toolchain_utils_root, python_files):
+def check_yapf(toolchain_utils_root: str,
+ python_files: t.Iterable[str]) -> CheckResult:
"""Subchecker of check_py_format. Checks python file formats with yapf"""
command = ['yapf', '-d'] + python_files
exit_code, stdout_and_stderr = run_command_unchecked(
@@ -145,7 +155,7 @@ def check_yapf(toolchain_utils_root, python_files):
)
-def check_python_file_headers(python_files):
+def check_python_file_headers(python_files: t.Iterable[str]) -> CheckResult:
"""Subchecker of check_py_format. Checks python #!s"""
add_hashbang = []
remove_hashbang = []
@@ -187,7 +197,9 @@ def check_python_file_headers(python_files):
)
-def check_py_format(toolchain_utils_root, thread_pool, files):
+def check_py_format(toolchain_utils_root: str,
+ thread_pool: multiprocessing.pool.ThreadPool,
+ files: t.Iterable[str]) -> CheckResult:
"""Runs yapf on files to check for style bugs. Also checks for #!s."""
yapf = 'yapf'
if not has_executable_on_path(yapf):
@@ -216,7 +228,13 @@ def check_py_format(toolchain_utils_root, thread_pool, files):
return [(name, get_check_result_or_catch(task)) for name, task in tasks]
-def check_cros_lint(toolchain_utils_root, thread_pool, files):
+def find_chromeos_root_directory() -> t.Optional[str]:
+ return os.getenv('CHROMEOS_ROOT_DIRECTORY')
+
+
+def check_cros_lint(
+ toolchain_utils_root: str, thread_pool: multiprocessing.pool.ThreadPool,
+ files: t.Iterable[str]) -> t.Union[t.List[CheckResult], CheckResult]:
"""Runs `cros lint`"""
fixed_env = env_with_pythonpath(toolchain_utils_root)
@@ -224,7 +242,7 @@ def check_cros_lint(toolchain_utils_root, thread_pool, files):
# We have to support users who don't have a chroot. So we either run `cros
# lint` (if it's been made available to us), or we try a mix of
# pylint+golint.
- def try_run_cros_lint(cros_binary):
+ def try_run_cros_lint(cros_binary: str) -> t.Optional[CheckResult]:
exit_code, output = run_command_unchecked(
[cros_binary, 'lint', '--py3', '--'] + files,
toolchain_utils_root,
@@ -245,7 +263,7 @@ def check_cros_lint(toolchain_utils_root, thread_pool, files):
if cros_lint is not None:
return cros_lint
- cros_root = os.getenv('CHROMEOS_ROOT_DIRECTORY')
+ cros_root = find_chromeos_root_directory()
if cros_root:
cros_lint = try_run_cros_lint(os.path.join(cros_root, 'chromite/bin/cros'))
if cros_lint is not None:
@@ -253,7 +271,7 @@ def check_cros_lint(toolchain_utils_root, thread_pool, files):
tasks = []
- def check_result_from_command(command):
+ def check_result_from_command(command: t.List[str]) -> CheckResult:
exit_code, output = run_command_unchecked(
command, toolchain_utils_root, env=fixed_env)
return CheckResult(
@@ -265,7 +283,7 @@ def check_cros_lint(toolchain_utils_root, thread_pool, files):
python_files = [f for f in remove_deleted_files(files) if f.endswith('.py')]
if python_files:
- def run_pylint():
+ def run_pylint() -> CheckResult:
# pylint is required. Fail hard if it DNE.
return check_result_from_command(['pylint'] + python_files)
@@ -274,7 +292,7 @@ def check_cros_lint(toolchain_utils_root, thread_pool, files):
go_files = [f for f in remove_deleted_files(files) if f.endswith('.go')]
if go_files:
- def run_golint():
+ def run_golint() -> CheckResult:
if has_executable_on_path('golint'):
return check_result_from_command(['golint', '-set_exit_status'] +
go_files)
@@ -361,7 +379,9 @@ def check_go_format(toolchain_utils_root, _thread_pool, files):
)
-def check_tests(toolchain_utils_root, _thread_pool, files):
+def check_tests(toolchain_utils_root: str,
+ _thread_pool: multiprocessing.pool.ThreadPool,
+ files: t.List[str]) -> CheckResult:
"""Runs tests."""
exit_code, stdout_and_stderr = run_command_unchecked(
[os.path.join(toolchain_utils_root, 'run_tests_for.py'), '--'] + files,
@@ -373,15 +393,17 @@ def check_tests(toolchain_utils_root, _thread_pool, files):
)
-def detect_toolchain_utils_root():
+def detect_toolchain_utils_root() -> str:
return os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-def process_check_result(check_name, check_results, start_time):
+def process_check_result(
+ check_name: str, check_results: t.Union[t.List[CheckResult], CheckResult],
+ start_time: datetime.datetime) -> t.Tuple[bool, t.List[t.List[str]]]:
"""Prints human-readable output for the given check_results."""
indent = ' '
- def indent_block(text):
+ def indent_block(text: str) -> str:
return indent + text.replace('\n', '\n' + indent)
if isinstance(check_results, CheckResult):
@@ -425,7 +447,8 @@ def process_check_result(check_name, check_results, start_time):
return ok, autofix_commands
-def try_autofix(all_autofix_commands, toolchain_utils_root):
+def try_autofix(all_autofix_commands: t.List[t.List[str]],
+ toolchain_utils_root: str) -> None:
"""Tries to run all given autofix commands, if appropriate."""
if not all_autofix_commands:
return
@@ -459,13 +482,102 @@ def try_autofix(all_autofix_commands, toolchain_utils_root):
'some lints may now be fixed')
-def main(argv):
+def find_repo_root(base_dir: str) -> t.Optional[str]:
+ current = base_dir
+ while current != '/':
+ if os.path.isdir(os.path.join(current, '.repo')):
+ return current
+ current = os.path.dirname(current)
+ return None
+
+
+def is_in_chroot() -> bool:
+ return os.path.exists('/etc/cros_chroot_version')
+
+
+def maybe_reexec_inside_chroot(autofix: bool, files: t.List[str]) -> None:
+ if is_in_chroot():
+ return
+
+ enter_chroot = True
+ chdir_to = None
+ toolchain_utils = detect_toolchain_utils_root()
+ if find_repo_root(toolchain_utils) is None:
+ chromeos_root_dir = find_chromeos_root_directory()
+ if chromeos_root_dir is None:
+ print('Standalone toolchain-utils checkout detected; cannot enter '
+ 'chroot.')
+ enter_chroot = False
+ else:
+ chdir_to = chromeos_root_dir
+
+ if not has_executable_on_path('cros_sdk'):
+ print('No `cros_sdk` detected on $PATH; cannot enter chroot.')
+ enter_chroot = False
+
+ if not enter_chroot:
+ print('Giving up on entering the chroot; be warned that some presubmits '
+ 'may be broken.')
+ return
+
+ # We'll be changing ${PWD}, so make everything relative to toolchain-utils,
+ # which resides at a well-known place inside of the chroot.
+ chroot_toolchain_utils = '/mnt/host/source/src/third_party/toolchain-utils'
+
+ def rebase_path(path: str) -> str:
+ return os.path.join(chroot_toolchain_utils,
+ os.path.relpath(path, toolchain_utils))
+
+ args = [
+ 'cros_sdk',
+ '--enter',
+ '--',
+ rebase_path(__file__),
+ ]
+
+ if not autofix:
+ args.append('--no_autofix')
+ args.extend(rebase_path(x) for x in files)
+
+ if chdir_to is None:
+ print('Attempting to enter the chroot...')
+ else:
+ print(f'Attempting to enter the chroot for tree at {chdir_to}...')
+ os.chdir(chdir_to)
+ os.execvp(args[0], args)
+
+
+# FIXME(crbug.com/980719): we probably want a better way of handling this. For
+# now, as a workaround, ensure we have all dependencies installed as a part of
+# presubmits. pip and scipy are fast enough to install (they take <1min
+# combined on my machine), so hoooopefully users won't get too impatient.
+def ensure_scipy_installed() -> None:
+ if not has_executable_on_path('pip'):
+ print('Autoinstalling `pip`...')
+ subprocess.check_call(['sudo', 'emerge', 'dev-python/pip'])
+
+ exit_code = subprocess.call(
+ ['python3', '-c', 'import scipy'],
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ )
+ if exit_code != 0:
+ print('Autoinstalling `scipy`...')
+ subprocess.check_call(['pip', 'install', '--user', 'scipy'])
+
+
+def main(argv: t.List[str]) -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
'--no_autofix',
dest='autofix',
action='store_false',
- help="Don't run any autofix commands")
+ help="Don't run any autofix commands.")
+ parser.add_argument(
+ '--no_enter_chroot',
+ dest='enter_chroot',
+ action='store_false',
+ help="Prevent auto-entering the chroot if we're not already in it.")
parser.add_argument('files', nargs='*')
opts = parser.parse_args(argv)
@@ -473,6 +585,14 @@ def main(argv):
if not files:
return 0
+ if opts.enter_chroot:
+ maybe_reexec_inside_chroot(opts.autofix, opts.files)
+
+ # If you ask for --no_enter_chroot, you're on your own for installing these
+ # things.
+ if is_in_chroot():
+ ensure_scipy_installed()
+
files = [os.path.abspath(f) for f in files]
# Note that we extract .__name__s from these, so please name them in a