aboutsummaryrefslogtreecommitdiff
path: root/toolchain_utils_githooks/check-presubmit.py
diff options
context:
space:
mode:
Diffstat (limited to 'toolchain_utils_githooks/check-presubmit.py')
-rwxr-xr-xtoolchain_utils_githooks/check-presubmit.py1189
1 files changed, 665 insertions, 524 deletions
diff --git a/toolchain_utils_githooks/check-presubmit.py b/toolchain_utils_githooks/check-presubmit.py
index 99500acd..485737d5 100755
--- a/toolchain_utils_githooks/check-presubmit.py
+++ b/toolchain_utils_githooks/check-presubmit.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
-# Copyright 2019 The Chromium OS Authors. All rights reserved.
+# Copyright 2019 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
@@ -12,6 +12,7 @@ import datetime
import multiprocessing
import multiprocessing.pool
import os
+from pathlib import Path
import re
import shlex
import shutil
@@ -22,55 +23,55 @@ import traceback
import typing as t
-def run_command_unchecked(command: t.List[str],
- cwd: str,
- env: t.Dict[str, str] = None) -> t.Tuple[int, str]:
- """Runs a command in the given dir, returning its exit code and stdio."""
- p = subprocess.Popen(
- command,
- cwd=cwd,
- stdin=subprocess.DEVNULL,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
- env=env,
- )
-
- stdout, _ = p.communicate()
- exit_code = p.wait()
- return exit_code, stdout.decode('utf-8', 'replace')
+def run_command_unchecked(
+ command: t.List[str], cwd: str, env: t.Dict[str, str] = None
+) -> t.Tuple[int, str]:
+ """Runs a command in the given dir, returning its exit code and stdio."""
+ p = subprocess.run(
+ command,
+ check=False,
+ cwd=cwd,
+ stdin=subprocess.DEVNULL,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ env=env,
+ encoding="utf-8",
+ errors="replace",
+ )
+ return p.returncode, p.stdout
def has_executable_on_path(exe: str) -> bool:
- """Returns whether we have `exe` somewhere on our $PATH"""
- return shutil.which(exe) is not None
+ """Returns whether we have `exe` somewhere on our $PATH"""
+ return shutil.which(exe) is not None
def escape_command(command: t.Iterable[str]) -> str:
- """Returns a human-readable and copy-pastable shell command.
+ """Returns a human-readable and copy-pastable shell command.
- Only intended for use in output to users. shell=True is strongly discouraged.
- """
- return ' '.join(shlex.quote(x) for x in command)
+ Only intended for use in output to users. shell=True is strongly discouraged.
+ """
+ return " ".join(shlex.quote(x) for x in command)
def remove_deleted_files(files: t.Iterable[str]) -> t.List[str]:
- return [f for f in files if os.path.exists(f)]
+ return [f for f in files if os.path.exists(f)]
def is_file_executable(file_path: str) -> bool:
- return os.access(file_path, os.X_OK)
+ return os.access(file_path, os.X_OK)
# As noted in our docs, some of our Python code depends on modules that sit in
# toolchain-utils/. Add that to PYTHONPATH to ensure that things like `cros
# lint` are kept happy.
def env_with_pythonpath(toolchain_utils_root: str) -> t.Dict[str, str]:
- env = dict(os.environ)
- if 'PYTHONPATH' in env:
- env['PYTHONPATH'] += ':' + toolchain_utils_root
- else:
- env['PYTHONPATH'] = toolchain_utils_root
- return env
+ env = dict(os.environ)
+ if "PYTHONPATH" in env:
+ env["PYTHONPATH"] += ":" + toolchain_utils_root
+ else:
+ env["PYTHONPATH"] = toolchain_utils_root
+ return env
# Each checker represents an independent check that's done on our sources.
@@ -85,564 +86,704 @@ def env_with_pythonpath(toolchain_utils_root: str) -> t.Dict[str, str]:
# in the pool. In order words, blocking on results from the provided
# threadpool is OK.
CheckResult = t.NamedTuple(
- 'CheckResult',
+ "CheckResult",
(
- ('ok', bool),
- ('output', str),
- ('autofix_commands', t.List[t.List[str]]),
+ ("ok", bool),
+ ("output", str),
+ ("autofix_commands", t.List[t.List[str]]),
),
)
def get_check_result_or_catch(
- task: multiprocessing.pool.ApplyResult) -> CheckResult:
- """Returns the result of task(); if that raises, returns a CheckResult.
-
- The task is expected to return a CheckResult on get().
- """
- try:
- return task.get()
- except Exception:
- return CheckResult(
- ok=False,
- output='Check exited with an unexpected exception:\n%s' %
- traceback.format_exc(),
- autofix_commands=[],
+ task: multiprocessing.pool.ApplyResult,
+) -> CheckResult:
+ """Returns the result of task(); if that raises, returns a CheckResult.
+
+ The task is expected to return a CheckResult on get().
+ """
+ try:
+ return task.get()
+ except Exception:
+ return CheckResult(
+ ok=False,
+ output="Check exited with an unexpected exception:\n%s"
+ % traceback.format_exc(),
+ autofix_commands=[],
+ )
+
+
+def check_isort(
+ toolchain_utils_root: str, python_files: t.Iterable[str]
+) -> CheckResult:
+ """Subchecker of check_py_format. Checks python file formats with isort"""
+ chromite = Path("/mnt/host/source/chromite")
+ isort = chromite / "scripts" / "isort"
+ config_file = chromite / ".isort.cfg"
+
+ if not (isort.exists() and config_file.exists()):
+ return CheckResult(
+ ok=True,
+ output="isort not found; skipping",
+ autofix_commands=[],
+ )
+
+ config_file_flag = f"--settings-file={config_file}"
+ command = [isort, "-c", config_file_flag] + python_files
+ exit_code, stdout_and_stderr = run_command_unchecked(
+ command, cwd=toolchain_utils_root
)
-
-def check_yapf(toolchain_utils_root: str,
- python_files: t.Iterable[str]) -> CheckResult:
- """Subchecker of check_py_format. Checks python file formats with yapf"""
- command = ['yapf', '-d'] + python_files
- exit_code, stdout_and_stderr = run_command_unchecked(
- command, cwd=toolchain_utils_root)
-
- # yapf fails when files are poorly formatted.
- if exit_code == 0:
+ # isort fails when files have broken formatting.
+ if not exit_code:
+ return CheckResult(
+ ok=True,
+ output="",
+ autofix_commands=[],
+ )
+
+ bad_files = []
+ bad_file_re = re.compile(
+ r"^ERROR: (.*) Imports are incorrectly sorted and/or formatted\.$"
+ )
+ for line in stdout_and_stderr.splitlines():
+ m = bad_file_re.match(line)
+ if m:
+ (file_name,) = m.groups()
+ bad_files.append(file_name.strip())
+
+ if not bad_files:
+ return CheckResult(
+ ok=False,
+ output="`%s` failed; stdout/stderr:\n%s"
+ % (escape_command(command), stdout_and_stderr),
+ autofix_commands=[],
+ )
+
+ autofix = [str(isort), config_file_flag] + bad_files
return CheckResult(
- ok=True,
- output='',
- autofix_commands=[],
+ ok=False,
+ output="The following file(s) have formatting errors: %s" % bad_files,
+ autofix_commands=[autofix],
)
- bad_files = []
- bad_file_re = re.compile(r'^--- (.*)\s+\(original\)\s*$')
- for line in stdout_and_stderr.splitlines():
- m = bad_file_re.match(line)
- if not m:
- continue
- file_name, = m.groups()
- bad_files.append(file_name.strip())
-
- # ... and doesn't really differentiate "your files have broken formatting"
- # errors from general ones. So if we see nothing diffed, assume that a
- # general error happened.
- if not bad_files:
+def check_black(
+ toolchain_utils_root: str, black: Path, python_files: t.Iterable[str]
+) -> CheckResult:
+ """Subchecker of check_py_format. Checks python file formats with black"""
+ # Folks have been bitten by accidentally using multiple formatter versions in
+ # the past. This is an issue, since newer versions of black may format things
+ # differently. Make the version obvious.
+ command = [black, "--version"]
+ exit_code, stdout_and_stderr = run_command_unchecked(
+ command, cwd=toolchain_utils_root
+ )
+ if exit_code:
+ return CheckResult(
+ ok=False,
+ output=f"Failed getting black version; stdstreams: {stdout_and_stderr}",
+ autofix_commands=[],
+ )
+
+ black_version = stdout_and_stderr.strip()
+ black_invocation: t.List[str] = [str(black), "--line-length=80"]
+ command = black_invocation + ["--check"] + list(python_files)
+ exit_code, stdout_and_stderr = run_command_unchecked(
+ command, cwd=toolchain_utils_root
+ )
+ # black fails when files are poorly formatted.
+ if exit_code == 0:
+ return CheckResult(
+ ok=True,
+ output=f"Using {black_version!r}, no issues were found.",
+ autofix_commands=[],
+ )
+
+ # Output format looks something like:
+ # f'{complaints}\nOh no!{emojis}\n{summary}'
+ # Whittle it down to complaints.
+ complaints = stdout_and_stderr.split("\nOh no!", 1)
+ if len(complaints) != 2:
+ return CheckResult(
+ ok=False,
+ output=f"Unparseable `black` output:\n{stdout_and_stderr}",
+ autofix_commands=[],
+ )
+
+ bad_files = []
+ errors = []
+ refmt_prefix = "would reformat "
+ for line in complaints[0].strip().splitlines():
+ line = line.strip()
+ if line.startswith("error:"):
+ errors.append(line)
+ continue
+
+ if not line.startswith(refmt_prefix):
+ return CheckResult(
+ ok=False,
+ output=f"Unparseable `black` output:\n{stdout_and_stderr}",
+ autofix_commands=[],
+ )
+
+ bad_files.append(line[len(refmt_prefix) :].strip())
+
+ # If black had internal errors that it could handle, print them out and exit
+ # without an autofix.
+ if errors:
+ err_str = "\n".join(errors)
+ return CheckResult(
+ ok=False,
+ output=f"Using {black_version!r} had the following errors:\n{err_str}",
+ autofix_commands=[],
+ )
+
+ autofix = black_invocation + bad_files
return CheckResult(
ok=False,
- output='`%s` failed; stdout/stderr:\n%s' % (escape_command(command),
- stdout_and_stderr),
- autofix_commands=[],
+ output=f"Using {black_version!r}, these file(s) have formatting errors: "
+ f"{bad_files}",
+ autofix_commands=[autofix],
)
- autofix = ['yapf', '-i'] + bad_files
- return CheckResult(
- ok=False,
- output='The following file(s) have formatting errors: %s' % bad_files,
- autofix_commands=[autofix],
- )
-
def check_python_file_headers(python_files: t.Iterable[str]) -> CheckResult:
- """Subchecker of check_py_format. Checks python #!s"""
- add_hashbang = []
- remove_hashbang = []
-
- for python_file in python_files:
- needs_hashbang = is_file_executable(python_file)
- with open(python_file, encoding='utf-8') as f:
- has_hashbang = f.read(2) == '#!'
- if needs_hashbang == has_hashbang:
- continue
-
- if needs_hashbang:
- add_hashbang.append(python_file)
- else:
- remove_hashbang.append(python_file)
-
- autofix = []
- output = []
- if add_hashbang:
- output.append(
- 'The following files have no #!, but need one: %s' % add_hashbang)
- autofix.append(['sed', '-i', '1i#!/usr/bin/env python3'] + add_hashbang)
-
- if remove_hashbang:
- output.append(
- "The following files have a #!, but shouldn't: %s" % remove_hashbang)
- autofix.append(['sed', '-i', '1d'] + remove_hashbang)
-
- if not output:
- return CheckResult(
- ok=True,
- output='',
- autofix_commands=[],
- )
- return CheckResult(
- ok=False,
- output='\n'.join(output),
- autofix_commands=autofix,
- )
-
-
-def check_py_format(toolchain_utils_root: str,
- thread_pool: multiprocessing.pool.ThreadPool,
- files: t.Iterable[str]) -> CheckResult:
- """Runs yapf on files to check for style bugs. Also checks for #!s."""
- yapf = 'yapf'
- if not has_executable_on_path(yapf):
+ """Subchecker of check_py_format. Checks python #!s"""
+ add_hashbang = []
+ remove_hashbang = []
+
+ for python_file in python_files:
+ needs_hashbang = is_file_executable(python_file)
+ with open(python_file, encoding="utf-8") as f:
+ has_hashbang = f.read(2) == "#!"
+ if needs_hashbang == has_hashbang:
+ continue
+
+ if needs_hashbang:
+ add_hashbang.append(python_file)
+ else:
+ remove_hashbang.append(python_file)
+
+ autofix = []
+ output = []
+ if add_hashbang:
+ output.append(
+ "The following files have no #!, but need one: %s" % add_hashbang
+ )
+ autofix.append(["sed", "-i", "1i#!/usr/bin/env python3"] + add_hashbang)
+
+ if remove_hashbang:
+ output.append(
+ "The following files have a #!, but shouldn't: %s" % remove_hashbang
+ )
+ autofix.append(["sed", "-i", "1d"] + remove_hashbang)
+
+ if not output:
+ return CheckResult(
+ ok=True,
+ output="",
+ autofix_commands=[],
+ )
return CheckResult(
ok=False,
- output="yapf isn't available on your $PATH. Please either "
- 'enter a chroot, or place depot_tools on your $PATH.',
- autofix_commands=[],
+ output="\n".join(output),
+ autofix_commands=autofix,
)
- python_files = [f for f in remove_deleted_files(files) if f.endswith('.py')]
- if not python_files:
- return CheckResult(
- ok=True,
- output='no python files to check',
- autofix_commands=[],
- )
- tasks = [
- ('check_yapf',
- thread_pool.apply_async(check_yapf,
- (toolchain_utils_root, python_files))),
- ('check_file_headers',
- thread_pool.apply_async(check_python_file_headers, (python_files,))),
- ]
- return [(name, get_check_result_or_catch(task)) for name, task in tasks]
+def check_py_format(
+ toolchain_utils_root: str,
+ thread_pool: multiprocessing.pool.ThreadPool,
+ files: t.Iterable[str],
+) -> t.List[CheckResult]:
+ """Runs yapf on files to check for style bugs. Also checks for #!s."""
+ black = "black"
+ if not has_executable_on_path(black):
+ return CheckResult(
+ ok=False,
+ output="black isn't available on your $PATH. Please either "
+ "enter a chroot, or place depot_tools on your $PATH.",
+ autofix_commands=[],
+ )
+
+ python_files = [f for f in remove_deleted_files(files) if f.endswith(".py")]
+ if not python_files:
+ return CheckResult(
+ ok=True,
+ output="no python files to check",
+ autofix_commands=[],
+ )
+
+ tasks = [
+ (
+ "check_black",
+ thread_pool.apply_async(
+ check_black, (toolchain_utils_root, black, python_files)
+ ),
+ ),
+ (
+ "check_isort",
+ thread_pool.apply_async(
+ check_isort, (toolchain_utils_root, python_files)
+ ),
+ ),
+ (
+ "check_file_headers",
+ thread_pool.apply_async(check_python_file_headers, (python_files,)),
+ ),
+ ]
+ return [(name, get_check_result_or_catch(task)) for name, task in tasks]
def find_chromeos_root_directory() -> t.Optional[str]:
- return os.getenv('CHROMEOS_ROOT_DIRECTORY')
+ return os.getenv("CHROMEOS_ROOT_DIRECTORY")
def check_cros_lint(
- toolchain_utils_root: str, thread_pool: multiprocessing.pool.ThreadPool,
- files: t.Iterable[str]) -> t.Union[t.List[CheckResult], CheckResult]:
- """Runs `cros lint`"""
-
- fixed_env = env_with_pythonpath(toolchain_utils_root)
-
- # We have to support users who don't have a chroot. So we either run `cros
- # lint` (if it's been made available to us), or we try a mix of
- # pylint+golint.
- def try_run_cros_lint(cros_binary: str) -> t.Optional[CheckResult]:
- exit_code, output = run_command_unchecked(
- [cros_binary, 'lint', '--'] + files,
- toolchain_utils_root,
- env=fixed_env)
-
- # This is returned specifically if cros couldn't find the Chrome OS tree
- # root.
- if exit_code == 127:
- return None
-
- return CheckResult(
- ok=exit_code == 0,
- output=output,
- autofix_commands=[],
- )
-
- cros_lint = try_run_cros_lint('cros')
- if cros_lint is not None:
- return cros_lint
-
- cros_root = find_chromeos_root_directory()
- if cros_root:
- cros_lint = try_run_cros_lint(os.path.join(cros_root, 'chromite/bin/cros'))
+ toolchain_utils_root: str,
+ thread_pool: multiprocessing.pool.ThreadPool,
+ files: t.Iterable[str],
+) -> t.Union[t.List[CheckResult], CheckResult]:
+ """Runs `cros lint`"""
+
+ fixed_env = env_with_pythonpath(toolchain_utils_root)
+
+ # We have to support users who don't have a chroot. So we either run `cros
+ # lint` (if it's been made available to us), or we try a mix of
+ # pylint+golint.
+ def try_run_cros_lint(cros_binary: str) -> t.Optional[CheckResult]:
+ exit_code, output = run_command_unchecked(
+ [cros_binary, "lint", "--"] + files,
+ toolchain_utils_root,
+ env=fixed_env,
+ )
+
+ # This is returned specifically if cros couldn't find the ChromeOS tree
+ # root.
+ if exit_code == 127:
+ return None
+
+ return CheckResult(
+ ok=exit_code == 0,
+ output=output,
+ autofix_commands=[],
+ )
+
+ cros_lint = try_run_cros_lint("cros")
if cros_lint is not None:
- return cros_lint
-
- tasks = []
-
- def check_result_from_command(command: t.List[str]) -> CheckResult:
- exit_code, output = run_command_unchecked(
- command, toolchain_utils_root, env=fixed_env)
- return CheckResult(
- ok=exit_code == 0,
- output=output,
- autofix_commands=[],
+ return cros_lint
+
+ cros_root = find_chromeos_root_directory()
+ if cros_root:
+ cros_lint = try_run_cros_lint(
+ os.path.join(cros_root, "chromite/bin/cros")
+ )
+ if cros_lint is not None:
+ return cros_lint
+
+ tasks = []
+
+ def check_result_from_command(command: t.List[str]) -> CheckResult:
+ exit_code, output = run_command_unchecked(
+ command, toolchain_utils_root, env=fixed_env
+ )
+ return CheckResult(
+ ok=exit_code == 0,
+ output=output,
+ autofix_commands=[],
+ )
+
+ python_files = [f for f in remove_deleted_files(files) if f.endswith(".py")]
+ if python_files:
+
+ def run_pylint() -> CheckResult:
+ # pylint is required. Fail hard if it DNE.
+ return check_result_from_command(["pylint"] + python_files)
+
+ tasks.append(("pylint", thread_pool.apply_async(run_pylint)))
+
+ go_files = [f for f in remove_deleted_files(files) if f.endswith(".go")]
+ if go_files:
+
+ def run_golint() -> CheckResult:
+ if has_executable_on_path("golint"):
+ return check_result_from_command(
+ ["golint", "-set_exit_status"] + go_files
+ )
+
+ complaint = "\n".join(
+ (
+ "WARNING: go linting disabled. golint is not on your $PATH.",
+ "Please either enter a chroot, or install go locally. Continuing.",
+ )
+ )
+ return CheckResult(
+ ok=True,
+ output=complaint,
+ autofix_commands=[],
+ )
+
+ tasks.append(("golint", thread_pool.apply_async(run_golint)))
+
+ complaint = "\n".join(
+ (
+ "WARNING: No ChromeOS checkout detected, and no viable CrOS tree",
+ "found; falling back to linting only python and go. If you have a",
+ "ChromeOS checkout, please either develop from inside of the source",
+ "tree, or set $CHROMEOS_ROOT_DIRECTORY to the root of it.",
+ )
)
- python_files = [f for f in remove_deleted_files(files) if f.endswith('.py')]
- if python_files:
+ results = [(name, get_check_result_or_catch(task)) for name, task in tasks]
+ if not results:
+ return CheckResult(
+ ok=True,
+ output=complaint,
+ autofix_commands=[],
+ )
- def run_pylint() -> CheckResult:
- # pylint is required. Fail hard if it DNE.
- return check_result_from_command(['pylint'] + python_files)
-
- tasks.append(('pylint', thread_pool.apply_async(run_pylint)))
-
- go_files = [f for f in remove_deleted_files(files) if f.endswith('.go')]
- if go_files:
-
- def run_golint() -> CheckResult:
- if has_executable_on_path('golint'):
- return check_result_from_command(['golint', '-set_exit_status'] +
- go_files)
-
- complaint = '\n'.join((
- 'WARNING: go linting disabled. golint is not on your $PATH.',
- 'Please either enter a chroot, or install go locally. Continuing.',
- ))
- return CheckResult(
- ok=True,
- output=complaint,
- autofix_commands=[],
- )
-
- tasks.append(('golint', thread_pool.apply_async(run_golint)))
-
- complaint = '\n'.join((
- 'WARNING: No Chrome OS checkout detected, and no viable CrOS tree',
- 'found; falling back to linting only python and go. If you have a',
- 'Chrome OS checkout, please either develop from inside of the source',
- 'tree, or set $CHROMEOS_ROOT_DIRECTORY to the root of it.',
- ))
-
- results = [(name, get_check_result_or_catch(task)) for name, task in tasks]
- if not results:
- return CheckResult(
- ok=True,
- output=complaint,
- autofix_commands=[],
- )
-
- # We need to complain _somewhere_.
- name, angry_result = results[0]
- angry_complaint = (complaint + '\n\n' + angry_result.output).strip()
- results[0] = (name, angry_result._replace(output=angry_complaint))
- return results
+ # We need to complain _somewhere_.
+ name, angry_result = results[0]
+ angry_complaint = (complaint + "\n\n" + angry_result.output).strip()
+ results[0] = (name, angry_result._replace(output=angry_complaint))
+ return results
def check_go_format(toolchain_utils_root, _thread_pool, files):
- """Runs gofmt on files to check for style bugs."""
- gofmt = 'gofmt'
- if not has_executable_on_path(gofmt):
- return CheckResult(
- ok=False,
- output="gofmt isn't available on your $PATH. Please either "
- 'enter a chroot, or place your go bin/ directory on your $PATH.',
- autofix_commands=[],
- )
+ """Runs gofmt on files to check for style bugs."""
+ gofmt = "gofmt"
+ if not has_executable_on_path(gofmt):
+ return CheckResult(
+ ok=False,
+ output="gofmt isn't available on your $PATH. Please either "
+ "enter a chroot, or place your go bin/ directory on your $PATH.",
+ autofix_commands=[],
+ )
+
+ go_files = [f for f in remove_deleted_files(files) if f.endswith(".go")]
+ if not go_files:
+ return CheckResult(
+ ok=True,
+ output="no go files to check",
+ autofix_commands=[],
+ )
+
+ command = [gofmt, "-l"] + go_files
+ exit_code, output = run_command_unchecked(command, cwd=toolchain_utils_root)
- go_files = [f for f in remove_deleted_files(files) if f.endswith('.go')]
- if not go_files:
+ if exit_code:
+ return CheckResult(
+ ok=False,
+ output="%s failed; stdout/stderr:\n%s"
+ % (escape_command(command), output),
+ autofix_commands=[],
+ )
+
+ output = output.strip()
+ if not output:
+ return CheckResult(
+ ok=True,
+ output="",
+ autofix_commands=[],
+ )
+
+ broken_files = [x.strip() for x in output.splitlines()]
+ autofix = [gofmt, "-w"] + broken_files
return CheckResult(
- ok=True,
- output='no go files to check',
- autofix_commands=[],
+ ok=False,
+ output="The following Go files have incorrect "
+ "formatting: %s" % broken_files,
+ autofix_commands=[autofix],
)
- command = [gofmt, '-l'] + go_files
- exit_code, output = run_command_unchecked(command, cwd=toolchain_utils_root)
- if exit_code:
- return CheckResult(
- ok=False,
- output='%s failed; stdout/stderr:\n%s' % (escape_command(command),
- output),
- autofix_commands=[],
+def check_tests(
+ toolchain_utils_root: str,
+ _thread_pool: multiprocessing.pool.ThreadPool,
+ files: t.List[str],
+) -> CheckResult:
+ """Runs tests."""
+ exit_code, stdout_and_stderr = run_command_unchecked(
+ [os.path.join(toolchain_utils_root, "run_tests_for.py"), "--"] + files,
+ toolchain_utils_root,
)
-
- output = output.strip()
- if not output:
return CheckResult(
- ok=True,
- output='',
+ ok=exit_code == 0,
+ output=stdout_and_stderr,
autofix_commands=[],
)
- broken_files = [x.strip() for x in output.splitlines()]
- autofix = [gofmt, '-w'] + broken_files
- return CheckResult(
- ok=False,
- output='The following Go files have incorrect '
- 'formatting: %s' % broken_files,
- autofix_commands=[autofix],
- )
-
-
-def check_tests(toolchain_utils_root: str,
- _thread_pool: multiprocessing.pool.ThreadPool,
- files: t.List[str]) -> CheckResult:
- """Runs tests."""
- exit_code, stdout_and_stderr = run_command_unchecked(
- [os.path.join(toolchain_utils_root, 'run_tests_for.py'), '--'] + files,
- toolchain_utils_root)
- return CheckResult(
- ok=exit_code == 0,
- output=stdout_and_stderr,
- autofix_commands=[],
- )
-
def detect_toolchain_utils_root() -> str:
- return os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+ return os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
def process_check_result(
- check_name: str, check_results: t.Union[t.List[CheckResult], CheckResult],
- start_time: datetime.datetime) -> t.Tuple[bool, t.List[t.List[str]]]:
- """Prints human-readable output for the given check_results."""
- indent = ' '
-
- def indent_block(text: str) -> str:
- return indent + text.replace('\n', '\n' + indent)
-
- if isinstance(check_results, CheckResult):
- ok, output, autofix_commands = check_results
- if not ok and autofix_commands:
- recommendation = ('Recommended command(s) to fix this: %s' %
- [escape_command(x) for x in autofix_commands])
- if output:
- output += '\n' + recommendation
- else:
- output = recommendation
- else:
- output_pieces = []
- autofix_commands = []
- for subname, (ok, output, autofix) in check_results:
- status = 'succeeded' if ok else 'failed'
- message = ['*** %s.%s %s' % (check_name, subname, status)]
- if output:
- message.append(indent_block(output))
- if not ok and autofix:
- message.append(
- indent_block('Recommended command(s) to fix this: %s' %
- [escape_command(x) for x in autofix]))
-
- output_pieces.append('\n'.join(message))
- autofix_commands += autofix
-
- ok = all(x.ok for _, x in check_results)
- output = '\n\n'.join(output_pieces)
-
- time_taken = datetime.datetime.now() - start_time
- if ok:
- print('*** %s succeeded after %s' % (check_name, time_taken))
- else:
- print('*** %s failed after %s' % (check_name, time_taken))
-
- if output:
- print(indent_block(output))
-
- print()
- return ok, autofix_commands
-
-
-def try_autofix(all_autofix_commands: t.List[t.List[str]],
- toolchain_utils_root: str) -> None:
- """Tries to run all given autofix commands, if appropriate."""
- if not all_autofix_commands:
- return
-
- exit_code, output = run_command_unchecked(['git', 'status', '--porcelain'],
- cwd=toolchain_utils_root)
- if exit_code != 0:
- print("Autofix aborted: couldn't get toolchain-utils git status.")
- return
-
- if output.strip():
- # A clean repo makes checking/undoing autofix commands trivial. A dirty
- # one... less so. :)
- print('Git repo seems dirty; skipping autofix.')
- return
-
- anything_succeeded = False
- for command in all_autofix_commands:
- exit_code, output = run_command_unchecked(command, cwd=toolchain_utils_root)
-
- if exit_code:
- print('*** Autofix command `%s` exited with code %d; stdout/stderr:' %
- (escape_command(command), exit_code))
- print(output)
+ check_name: str,
+ check_results: t.Union[t.List[CheckResult], CheckResult],
+ start_time: datetime.datetime,
+) -> t.Tuple[bool, t.List[t.List[str]]]:
+ """Prints human-readable output for the given check_results."""
+ indent = " "
+
+ def indent_block(text: str) -> str:
+ return indent + text.replace("\n", "\n" + indent)
+
+ if isinstance(check_results, CheckResult):
+ ok, output, autofix_commands = check_results
+ if not ok and autofix_commands:
+ recommendation = "Recommended command(s) to fix this: %s" % [
+ escape_command(x) for x in autofix_commands
+ ]
+ if output:
+ output += "\n" + recommendation
+ else:
+ output = recommendation
+ else:
+ output_pieces = []
+ autofix_commands = []
+ for subname, (ok, output, autofix) in check_results:
+ status = "succeeded" if ok else "failed"
+ message = ["*** %s.%s %s" % (check_name, subname, status)]
+ if output:
+ message.append(indent_block(output))
+ if not ok and autofix:
+ message.append(
+ indent_block(
+ "Recommended command(s) to fix this: %s"
+ % [escape_command(x) for x in autofix]
+ )
+ )
+
+ output_pieces.append("\n".join(message))
+ autofix_commands += autofix
+
+ ok = all(x.ok for _, x in check_results)
+ output = "\n\n".join(output_pieces)
+
+ time_taken = datetime.datetime.now() - start_time
+ if ok:
+ print("*** %s succeeded after %s" % (check_name, time_taken))
else:
- print('*** Autofix `%s` succeeded' % escape_command(command))
- anything_succeeded = True
+ print("*** %s failed after %s" % (check_name, time_taken))
+
+ if output:
+ print(indent_block(output))
+
+ print()
+ return ok, autofix_commands
- if anything_succeeded:
- print('NOTE: Autofixes have been applied. Please check your tree, since '
- 'some lints may now be fixed')
+
+def try_autofix(
+ all_autofix_commands: t.List[t.List[str]], toolchain_utils_root: str
+) -> None:
+ """Tries to run all given autofix commands, if appropriate."""
+ if not all_autofix_commands:
+ return
+
+ exit_code, output = run_command_unchecked(
+ ["git", "status", "--porcelain"], cwd=toolchain_utils_root
+ )
+ if exit_code != 0:
+ print("Autofix aborted: couldn't get toolchain-utils git status.")
+ return
+
+ if output.strip():
+ # A clean repo makes checking/undoing autofix commands trivial. A dirty
+ # one... less so. :)
+ print("Git repo seems dirty; skipping autofix.")
+ return
+
+ anything_succeeded = False
+ for command in all_autofix_commands:
+ exit_code, output = run_command_unchecked(
+ command, cwd=toolchain_utils_root
+ )
+
+ if exit_code:
+ print(
+ "*** Autofix command `%s` exited with code %d; stdout/stderr:"
+ % (escape_command(command), exit_code)
+ )
+ print(output)
+ else:
+ print("*** Autofix `%s` succeeded" % escape_command(command))
+ anything_succeeded = True
+
+ if anything_succeeded:
+ print(
+ "NOTE: Autofixes have been applied. Please check your tree, since "
+ "some lints may now be fixed"
+ )
def find_repo_root(base_dir: str) -> t.Optional[str]:
- current = base_dir
- while current != '/':
- if os.path.isdir(os.path.join(current, '.repo')):
- return current
- current = os.path.dirname(current)
- return None
+ current = base_dir
+ while current != "/":
+ if os.path.isdir(os.path.join(current, ".repo")):
+ return current
+ current = os.path.dirname(current)
+ return None
def is_in_chroot() -> bool:
- return os.path.exists('/etc/cros_chroot_version')
+ return os.path.exists("/etc/cros_chroot_version")
def maybe_reexec_inside_chroot(autofix: bool, files: t.List[str]) -> None:
- if is_in_chroot():
- return
-
- enter_chroot = True
- chdir_to = None
- toolchain_utils = detect_toolchain_utils_root()
- if find_repo_root(toolchain_utils) is None:
- chromeos_root_dir = find_chromeos_root_directory()
- if chromeos_root_dir is None:
- print('Standalone toolchain-utils checkout detected; cannot enter '
- 'chroot.')
- enter_chroot = False
+ if is_in_chroot():
+ return
+
+ enter_chroot = True
+ chdir_to = None
+ toolchain_utils = detect_toolchain_utils_root()
+ if find_repo_root(toolchain_utils) is None:
+ chromeos_root_dir = find_chromeos_root_directory()
+ if chromeos_root_dir is None:
+ print(
+ "Standalone toolchain-utils checkout detected; cannot enter "
+ "chroot."
+ )
+ enter_chroot = False
+ else:
+ chdir_to = chromeos_root_dir
+
+ if not has_executable_on_path("cros_sdk"):
+ print("No `cros_sdk` detected on $PATH; cannot enter chroot.")
+ enter_chroot = False
+
+ if not enter_chroot:
+ print(
+ "Giving up on entering the chroot; be warned that some presubmits "
+ "may be broken."
+ )
+ return
+
+ # We'll be changing ${PWD}, so make everything relative to toolchain-utils,
+ # which resides at a well-known place inside of the chroot.
+ chroot_toolchain_utils = "/mnt/host/source/src/third_party/toolchain-utils"
+
+ def rebase_path(path: str) -> str:
+ return os.path.join(
+ chroot_toolchain_utils, os.path.relpath(path, toolchain_utils)
+ )
+
+ args = [
+ "cros_sdk",
+ "--enter",
+ "--",
+ rebase_path(__file__),
+ ]
+
+ if not autofix:
+ args.append("--no_autofix")
+ args.extend(rebase_path(x) for x in files)
+
+ if chdir_to is None:
+ print("Attempting to enter the chroot...")
else:
- chdir_to = chromeos_root_dir
-
- if not has_executable_on_path('cros_sdk'):
- print('No `cros_sdk` detected on $PATH; cannot enter chroot.')
- enter_chroot = False
-
- if not enter_chroot:
- print('Giving up on entering the chroot; be warned that some presubmits '
- 'may be broken.')
- return
-
- # We'll be changing ${PWD}, so make everything relative to toolchain-utils,
- # which resides at a well-known place inside of the chroot.
- chroot_toolchain_utils = '/mnt/host/source/src/third_party/toolchain-utils'
-
- def rebase_path(path: str) -> str:
- return os.path.join(chroot_toolchain_utils,
- os.path.relpath(path, toolchain_utils))
-
- args = [
- 'cros_sdk',
- '--enter',
- '--',
- rebase_path(__file__),
- ]
-
- if not autofix:
- args.append('--no_autofix')
- args.extend(rebase_path(x) for x in files)
-
- if chdir_to is None:
- print('Attempting to enter the chroot...')
- else:
- print(f'Attempting to enter the chroot for tree at {chdir_to}...')
- os.chdir(chdir_to)
- os.execvp(args[0], args)
-
-
-# FIXME(crbug.com/980719): we probably want a better way of handling this. For
-# now, as a workaround, ensure we have all dependencies installed as a part of
-# presubmits. pip and scipy are fast enough to install (they take <1min
-# combined on my machine), so hoooopefully users won't get too impatient.
-def ensure_scipy_installed() -> None:
- if not has_executable_on_path('pip'):
- print('Autoinstalling `pip`...')
- subprocess.check_call(['sudo', 'emerge', 'dev-python/pip'])
-
- exit_code = subprocess.call(
- ['python3', '-c', 'import scipy'],
- stdout=subprocess.DEVNULL,
- stderr=subprocess.DEVNULL,
- )
- if exit_code != 0:
- print('Autoinstalling `scipy`...')
- subprocess.check_call(['pip', 'install', '--user', 'scipy'])
+ print(f"Attempting to enter the chroot for tree at {chdir_to}...")
+ os.chdir(chdir_to)
+ os.execvp(args[0], args)
+
+
+def ensure_pip_deps_installed() -> None:
+ if not has_executable_on_path("pip"):
+ print("Autoinstalling `pip`...")
+ subprocess.check_call(["sudo", "emerge", "dev-python/pip"])
+
+ for package in ("scipy", "yapf"):
+ exit_code = subprocess.call(
+ ["python3", "-c", f"import {package}"],
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ )
+ if exit_code != 0:
+ print(f"Autoinstalling `{package}`...")
+ subprocess.check_call(["pip", "install", "--user", package])
def main(argv: t.List[str]) -> int:
- parser = argparse.ArgumentParser(description=__doc__)
- parser.add_argument(
- '--no_autofix',
- dest='autofix',
- action='store_false',
- help="Don't run any autofix commands.")
- parser.add_argument(
- '--no_enter_chroot',
- dest='enter_chroot',
- action='store_false',
- help="Prevent auto-entering the chroot if we're not already in it.")
- parser.add_argument('files', nargs='*')
- opts = parser.parse_args(argv)
-
- files = opts.files
- if not files:
+ parser = argparse.ArgumentParser(description=__doc__)
+ parser.add_argument(
+ "--no_autofix",
+ dest="autofix",
+ action="store_false",
+ help="Don't run any autofix commands.",
+ )
+ parser.add_argument(
+ "--no_enter_chroot",
+ dest="enter_chroot",
+ action="store_false",
+ help="Prevent auto-entering the chroot if we're not already in it.",
+ )
+ parser.add_argument("files", nargs="*")
+ opts = parser.parse_args(argv)
+
+ files = opts.files
+ if not files:
+ return 0
+
+ if opts.enter_chroot:
+ maybe_reexec_inside_chroot(opts.autofix, opts.files)
+
+ # If you ask for --no_enter_chroot, you're on your own for installing these
+ # things.
+ if is_in_chroot():
+ ensure_pip_deps_installed()
+
+ files = [os.path.abspath(f) for f in files]
+
+ # Note that we extract .__name__s from these, so please name them in a
+ # user-friendly way.
+ checks = [
+ check_cros_lint,
+ check_py_format,
+ check_go_format,
+ check_tests,
+ ]
+
+ toolchain_utils_root = detect_toolchain_utils_root()
+
+ # NOTE: As mentioned above, checks can block on threads they spawn in this
+ # pool, so we need at least len(checks)+1 threads to avoid deadlock. Use *2
+ # so all checks can make progress at a decent rate.
+ num_threads = max(multiprocessing.cpu_count(), len(checks) * 2)
+ start_time = datetime.datetime.now()
+
+ # For our single print statement...
+ spawn_print_lock = threading.RLock()
+
+ def run_check(check_fn):
+ name = check_fn.__name__
+ with spawn_print_lock:
+ print("*** Spawning %s" % name)
+ return name, check_fn(toolchain_utils_root, pool, files)
+
+ # ThreadPool is a ContextManager in py3.
+ # pylint: disable=not-context-manager
+ with multiprocessing.pool.ThreadPool(num_threads) as pool:
+ all_checks_ok = True
+ all_autofix_commands = []
+ for check_name, result in pool.imap_unordered(run_check, checks):
+ ok, autofix_commands = process_check_result(
+ check_name, result, start_time
+ )
+ all_checks_ok = ok and all_checks_ok
+ all_autofix_commands += autofix_commands
+
+ # Run these after everything settles, so:
+ # - we don't collide with checkers that are running concurrently
+ # - we clearly print out everything that went wrong ahead of time, in case
+ # any of these fail
+ if opts.autofix:
+ try_autofix(all_autofix_commands, toolchain_utils_root)
+
+ if not all_checks_ok:
+ return 1
return 0
- if opts.enter_chroot:
- maybe_reexec_inside_chroot(opts.autofix, opts.files)
-
- # If you ask for --no_enter_chroot, you're on your own for installing these
- # things.
- if is_in_chroot():
- ensure_scipy_installed()
-
- files = [os.path.abspath(f) for f in files]
-
- # Note that we extract .__name__s from these, so please name them in a
- # user-friendly way.
- checks = [
- check_cros_lint,
- check_py_format,
- check_go_format,
- check_tests,
- ]
-
- toolchain_utils_root = detect_toolchain_utils_root()
-
- # NOTE: As mentioned above, checks can block on threads they spawn in this
- # pool, so we need at least len(checks)+1 threads to avoid deadlock. Use *2
- # so all checks can make progress at a decent rate.
- num_threads = max(multiprocessing.cpu_count(), len(checks) * 2)
- start_time = datetime.datetime.now()
-
- # For our single print statement...
- spawn_print_lock = threading.RLock()
-
- def run_check(check_fn):
- name = check_fn.__name__
- with spawn_print_lock:
- print('*** Spawning %s' % name)
- return name, check_fn(toolchain_utils_root, pool, files)
-
- # ThreadPool is a ContextManager in py3.
- # pylint: disable=not-context-manager
- with multiprocessing.pool.ThreadPool(num_threads) as pool:
- all_checks_ok = True
- all_autofix_commands = []
- for check_name, result in pool.imap_unordered(run_check, checks):
- ok, autofix_commands = process_check_result(check_name, result,
- start_time)
- all_checks_ok = ok and all_checks_ok
- all_autofix_commands += autofix_commands
-
- # Run these after everything settles, so:
- # - we don't collide with checkers that are running concurrently
- # - we clearly print out everything that went wrong ahead of time, in case
- # any of these fail
- if opts.autofix:
- try_autofix(all_autofix_commands, toolchain_utils_root)
-
- if not all_checks_ok:
- return 1
- return 0
-
-
-if __name__ == '__main__':
- sys.exit(main(sys.argv[1:]))
+
+if __name__ == "__main__":
+ sys.exit(main(sys.argv[1:]))