aboutsummaryrefslogtreecommitdiff
path: root/toolchain_utils_githooks/check-presubmit.py
diff options
context:
space:
mode:
Diffstat (limited to 'toolchain_utils_githooks/check-presubmit.py')
-rwxr-xr-xtoolchain_utils_githooks/check-presubmit.py96
1 files changed, 64 insertions, 32 deletions
diff --git a/toolchain_utils_githooks/check-presubmit.py b/toolchain_utils_githooks/check-presubmit.py
index 26373342..274a75f4 100755
--- a/toolchain_utils_githooks/check-presubmit.py
+++ b/toolchain_utils_githooks/check-presubmit.py
@@ -7,11 +7,7 @@
"""Runs presubmit checks against a bundle of files."""
-# To keep `cros lint` happy
-from __future__ import division, print_function
-
import argparse
-import collections
import datetime
import multiprocessing
import multiprocessing.pool
@@ -26,7 +22,9 @@ import traceback
import typing as t
-def run_command_unchecked(command, cwd, env=None):
+def run_command_unchecked(command: t.List[str],
+ cwd: str,
+ env: t.Dict[str, str] = None) -> t.Tuple[int, str]:
"""Runs a command in the given dir, returning its exit code and stdio."""
p = subprocess.Popen(
command,
@@ -42,12 +40,12 @@ def run_command_unchecked(command, cwd, env=None):
return exit_code, stdout.decode('utf-8', 'replace')
-def has_executable_on_path(exe):
+def has_executable_on_path(exe: str) -> bool:
"""Returns whether we have `exe` somewhere on our $PATH"""
return shutil.which(exe) is not None
-def escape_command(command):
+def escape_command(command: t.Iterable[str]) -> str:
"""Returns a human-readable and copy-pastable shell command.
Only intended for use in output to users. shell=True is strongly discouraged.
@@ -55,18 +53,18 @@ def escape_command(command):
return ' '.join(shlex.quote(x) for x in command)
-def remove_deleted_files(files):
+def remove_deleted_files(files: t.Iterable[str]) -> t.List[str]:
return [f for f in files if os.path.exists(f)]
-def is_file_executable(file_path):
+def is_file_executable(file_path: str) -> bool:
return os.access(file_path, os.X_OK)
# As noted in our docs, some of our Python code depends on modules that sit in
# toolchain-utils/. Add that to PYTHONPATH to ensure that things like `cros
# lint` are kept happy.
-def env_with_pythonpath(toolchain_utils_root):
+def env_with_pythonpath(toolchain_utils_root: str) -> t.Dict[str, str]:
env = dict(os.environ)
if 'PYTHONPATH' in env:
env['PYTHONPATH'] += ':' + toolchain_utils_root
@@ -86,12 +84,22 @@ def env_with_pythonpath(toolchain_utils_root):
# least ${number_of_concurrently_running_checkers}+1 threads are present
# in the pool. In order words, blocking on results from the provided
# threadpool is OK.
-CheckResult = collections.namedtuple('CheckResult',
- ('ok', 'output', 'autofix_commands'))
+CheckResult = t.NamedTuple(
+ 'CheckResult',
+ (
+ ('ok', bool),
+ ('output', str),
+ ('autofix_commands', t.List[t.List[str]]),
+ ),
+)
+
+def get_check_result_or_catch(
+ task: multiprocessing.pool.ApplyResult) -> CheckResult:
+ """Returns the result of task(); if that raises, returns a CheckResult.
-def get_check_result_or_catch(task):
- """Returns the result of task(); if that raises, returns a CheckResult."""
+ The task is expected to return a CheckResult on get().
+ """
try:
return task.get()
except Exception:
@@ -103,7 +111,8 @@ def get_check_result_or_catch(task):
)
-def check_yapf(toolchain_utils_root, python_files):
+def check_yapf(toolchain_utils_root: str,
+ python_files: t.Iterable[str]) -> CheckResult:
"""Subchecker of check_py_format. Checks python file formats with yapf"""
command = ['yapf', '-d'] + python_files
exit_code, stdout_and_stderr = run_command_unchecked(
@@ -146,7 +155,7 @@ def check_yapf(toolchain_utils_root, python_files):
)
-def check_python_file_headers(python_files):
+def check_python_file_headers(python_files: t.Iterable[str]) -> CheckResult:
"""Subchecker of check_py_format. Checks python #!s"""
add_hashbang = []
remove_hashbang = []
@@ -188,7 +197,9 @@ def check_python_file_headers(python_files):
)
-def check_py_format(toolchain_utils_root, thread_pool, files):
+def check_py_format(toolchain_utils_root: str,
+ thread_pool: multiprocessing.pool.ThreadPool,
+ files: t.Iterable[str]) -> CheckResult:
"""Runs yapf on files to check for style bugs. Also checks for #!s."""
yapf = 'yapf'
if not has_executable_on_path(yapf):
@@ -217,7 +228,13 @@ def check_py_format(toolchain_utils_root, thread_pool, files):
return [(name, get_check_result_or_catch(task)) for name, task in tasks]
-def check_cros_lint(toolchain_utils_root, thread_pool, files):
+def find_chromeos_root_directory() -> t.Optional[str]:
+ return os.getenv('CHROMEOS_ROOT_DIRECTORY')
+
+
+def check_cros_lint(
+ toolchain_utils_root: str, thread_pool: multiprocessing.pool.ThreadPool,
+ files: t.Iterable[str]) -> t.Union[t.List[CheckResult], CheckResult]:
"""Runs `cros lint`"""
fixed_env = env_with_pythonpath(toolchain_utils_root)
@@ -225,7 +242,7 @@ def check_cros_lint(toolchain_utils_root, thread_pool, files):
# We have to support users who don't have a chroot. So we either run `cros
# lint` (if it's been made available to us), or we try a mix of
# pylint+golint.
- def try_run_cros_lint(cros_binary):
+ def try_run_cros_lint(cros_binary: str) -> t.Optional[CheckResult]:
exit_code, output = run_command_unchecked(
[cros_binary, 'lint', '--py3', '--'] + files,
toolchain_utils_root,
@@ -246,7 +263,7 @@ def check_cros_lint(toolchain_utils_root, thread_pool, files):
if cros_lint is not None:
return cros_lint
- cros_root = os.getenv('CHROMEOS_ROOT_DIRECTORY')
+ cros_root = find_chromeos_root_directory()
if cros_root:
cros_lint = try_run_cros_lint(os.path.join(cros_root, 'chromite/bin/cros'))
if cros_lint is not None:
@@ -254,7 +271,7 @@ def check_cros_lint(toolchain_utils_root, thread_pool, files):
tasks = []
- def check_result_from_command(command):
+ def check_result_from_command(command: t.List[str]) -> CheckResult:
exit_code, output = run_command_unchecked(
command, toolchain_utils_root, env=fixed_env)
return CheckResult(
@@ -266,7 +283,7 @@ def check_cros_lint(toolchain_utils_root, thread_pool, files):
python_files = [f for f in remove_deleted_files(files) if f.endswith('.py')]
if python_files:
- def run_pylint():
+ def run_pylint() -> CheckResult:
# pylint is required. Fail hard if it DNE.
return check_result_from_command(['pylint'] + python_files)
@@ -275,7 +292,7 @@ def check_cros_lint(toolchain_utils_root, thread_pool, files):
go_files = [f for f in remove_deleted_files(files) if f.endswith('.go')]
if go_files:
- def run_golint():
+ def run_golint() -> CheckResult:
if has_executable_on_path('golint'):
return check_result_from_command(['golint', '-set_exit_status'] +
go_files)
@@ -362,7 +379,9 @@ def check_go_format(toolchain_utils_root, _thread_pool, files):
)
-def check_tests(toolchain_utils_root, _thread_pool, files):
+def check_tests(toolchain_utils_root: str,
+ _thread_pool: multiprocessing.pool.ThreadPool,
+ files: t.List[str]) -> CheckResult:
"""Runs tests."""
exit_code, stdout_and_stderr = run_command_unchecked(
[os.path.join(toolchain_utils_root, 'run_tests_for.py'), '--'] + files,
@@ -374,15 +393,17 @@ def check_tests(toolchain_utils_root, _thread_pool, files):
)
-def detect_toolchain_utils_root():
+def detect_toolchain_utils_root() -> str:
return os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-def process_check_result(check_name, check_results, start_time):
+def process_check_result(
+ check_name: str, check_results: t.Union[t.List[CheckResult], CheckResult],
+ start_time: datetime.datetime) -> t.Tuple[bool, t.List[t.List[str]]]:
"""Prints human-readable output for the given check_results."""
indent = ' '
- def indent_block(text):
+ def indent_block(text: str) -> str:
return indent + text.replace('\n', '\n' + indent)
if isinstance(check_results, CheckResult):
@@ -426,7 +447,8 @@ def process_check_result(check_name, check_results, start_time):
return ok, autofix_commands
-def try_autofix(all_autofix_commands, toolchain_utils_root):
+def try_autofix(all_autofix_commands: t.List[t.List[str]],
+ toolchain_utils_root: str) -> None:
"""Tries to run all given autofix commands, if appropriate."""
if not all_autofix_commands:
return
@@ -478,10 +500,16 @@ def maybe_reexec_inside_chroot(autofix: bool, files: t.List[str]) -> None:
return
enter_chroot = True
+ chdir_to = None
toolchain_utils = detect_toolchain_utils_root()
if find_repo_root(toolchain_utils) is None:
- print('Standalone toolchain-utils checkout detected; cannot enter chroot.')
- enter_chroot = False
+ chromeos_root_dir = find_chromeos_root_directory()
+ if chromeos_root_dir is None:
+ print('Standalone toolchain-utils checkout detected; cannot enter '
+ 'chroot.')
+ enter_chroot = False
+ else:
+ chdir_to = chromeos_root_dir
if not has_executable_on_path('cros_sdk'):
print('No `cros_sdk` detected on $PATH; cannot enter chroot.')
@@ -511,7 +539,11 @@ def maybe_reexec_inside_chroot(autofix: bool, files: t.List[str]) -> None:
args.append('--no_autofix')
args.extend(rebase_path(x) for x in files)
- print('Attempting to enter the chroot...')
+ if chdir_to is None:
+ print('Attempting to enter the chroot...')
+ else:
+ print(f'Attempting to enter the chroot for tree at {chdir_to}...')
+ os.chdir(chdir_to)
os.execvp(args[0], args)
@@ -534,7 +566,7 @@ def ensure_scipy_installed() -> None:
subprocess.check_call(['pip', 'install', '--user', 'scipy'])
-def main(argv):
+def main(argv: t.List[str]) -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
'--no_autofix',