diff options
Diffstat (limited to 'toolchain_utils_githooks')
-rwxr-xr-x | toolchain_utils_githooks/check-presubmit.py | 1189 | ||||
-rwxr-xr-x | toolchain_utils_githooks/pre-push | 2 | ||||
-rwxr-xr-x | toolchain_utils_githooks/pre-push.real | 2 |
3 files changed, 667 insertions, 526 deletions
diff --git a/toolchain_utils_githooks/check-presubmit.py b/toolchain_utils_githooks/check-presubmit.py index 99500acd..485737d5 100755 --- a/toolchain_utils_githooks/check-presubmit.py +++ b/toolchain_utils_githooks/check-presubmit.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- # -# Copyright 2019 The Chromium OS Authors. All rights reserved. +# Copyright 2019 The ChromiumOS Authors # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. @@ -12,6 +12,7 @@ import datetime import multiprocessing import multiprocessing.pool import os +from pathlib import Path import re import shlex import shutil @@ -22,55 +23,55 @@ import traceback import typing as t -def run_command_unchecked(command: t.List[str], - cwd: str, - env: t.Dict[str, str] = None) -> t.Tuple[int, str]: - """Runs a command in the given dir, returning its exit code and stdio.""" - p = subprocess.Popen( - command, - cwd=cwd, - stdin=subprocess.DEVNULL, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - env=env, - ) - - stdout, _ = p.communicate() - exit_code = p.wait() - return exit_code, stdout.decode('utf-8', 'replace') +def run_command_unchecked( + command: t.List[str], cwd: str, env: t.Dict[str, str] = None +) -> t.Tuple[int, str]: + """Runs a command in the given dir, returning its exit code and stdio.""" + p = subprocess.run( + command, + check=False, + cwd=cwd, + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + env=env, + encoding="utf-8", + errors="replace", + ) + return p.returncode, p.stdout def has_executable_on_path(exe: str) -> bool: - """Returns whether we have `exe` somewhere on our $PATH""" - return shutil.which(exe) is not None + """Returns whether we have `exe` somewhere on our $PATH""" + return shutil.which(exe) is not None def escape_command(command: t.Iterable[str]) -> str: - """Returns a human-readable and copy-pastable shell command. + """Returns a human-readable and copy-pastable shell command. - Only intended for use in output to users. shell=True is strongly discouraged. - """ - return ' '.join(shlex.quote(x) for x in command) + Only intended for use in output to users. shell=True is strongly discouraged. + """ + return " ".join(shlex.quote(x) for x in command) def remove_deleted_files(files: t.Iterable[str]) -> t.List[str]: - return [f for f in files if os.path.exists(f)] + return [f for f in files if os.path.exists(f)] def is_file_executable(file_path: str) -> bool: - return os.access(file_path, os.X_OK) + return os.access(file_path, os.X_OK) # As noted in our docs, some of our Python code depends on modules that sit in # toolchain-utils/. Add that to PYTHONPATH to ensure that things like `cros # lint` are kept happy. def env_with_pythonpath(toolchain_utils_root: str) -> t.Dict[str, str]: - env = dict(os.environ) - if 'PYTHONPATH' in env: - env['PYTHONPATH'] += ':' + toolchain_utils_root - else: - env['PYTHONPATH'] = toolchain_utils_root - return env + env = dict(os.environ) + if "PYTHONPATH" in env: + env["PYTHONPATH"] += ":" + toolchain_utils_root + else: + env["PYTHONPATH"] = toolchain_utils_root + return env # Each checker represents an independent check that's done on our sources. @@ -85,564 +86,704 @@ def env_with_pythonpath(toolchain_utils_root: str) -> t.Dict[str, str]: # in the pool. In order words, blocking on results from the provided # threadpool is OK. CheckResult = t.NamedTuple( - 'CheckResult', + "CheckResult", ( - ('ok', bool), - ('output', str), - ('autofix_commands', t.List[t.List[str]]), + ("ok", bool), + ("output", str), + ("autofix_commands", t.List[t.List[str]]), ), ) def get_check_result_or_catch( - task: multiprocessing.pool.ApplyResult) -> CheckResult: - """Returns the result of task(); if that raises, returns a CheckResult. - - The task is expected to return a CheckResult on get(). - """ - try: - return task.get() - except Exception: - return CheckResult( - ok=False, - output='Check exited with an unexpected exception:\n%s' % - traceback.format_exc(), - autofix_commands=[], + task: multiprocessing.pool.ApplyResult, +) -> CheckResult: + """Returns the result of task(); if that raises, returns a CheckResult. + + The task is expected to return a CheckResult on get(). + """ + try: + return task.get() + except Exception: + return CheckResult( + ok=False, + output="Check exited with an unexpected exception:\n%s" + % traceback.format_exc(), + autofix_commands=[], + ) + + +def check_isort( + toolchain_utils_root: str, python_files: t.Iterable[str] +) -> CheckResult: + """Subchecker of check_py_format. Checks python file formats with isort""" + chromite = Path("/mnt/host/source/chromite") + isort = chromite / "scripts" / "isort" + config_file = chromite / ".isort.cfg" + + if not (isort.exists() and config_file.exists()): + return CheckResult( + ok=True, + output="isort not found; skipping", + autofix_commands=[], + ) + + config_file_flag = f"--settings-file={config_file}" + command = [isort, "-c", config_file_flag] + python_files + exit_code, stdout_and_stderr = run_command_unchecked( + command, cwd=toolchain_utils_root ) - -def check_yapf(toolchain_utils_root: str, - python_files: t.Iterable[str]) -> CheckResult: - """Subchecker of check_py_format. Checks python file formats with yapf""" - command = ['yapf', '-d'] + python_files - exit_code, stdout_and_stderr = run_command_unchecked( - command, cwd=toolchain_utils_root) - - # yapf fails when files are poorly formatted. - if exit_code == 0: + # isort fails when files have broken formatting. + if not exit_code: + return CheckResult( + ok=True, + output="", + autofix_commands=[], + ) + + bad_files = [] + bad_file_re = re.compile( + r"^ERROR: (.*) Imports are incorrectly sorted and/or formatted\.$" + ) + for line in stdout_and_stderr.splitlines(): + m = bad_file_re.match(line) + if m: + (file_name,) = m.groups() + bad_files.append(file_name.strip()) + + if not bad_files: + return CheckResult( + ok=False, + output="`%s` failed; stdout/stderr:\n%s" + % (escape_command(command), stdout_and_stderr), + autofix_commands=[], + ) + + autofix = [str(isort), config_file_flag] + bad_files return CheckResult( - ok=True, - output='', - autofix_commands=[], + ok=False, + output="The following file(s) have formatting errors: %s" % bad_files, + autofix_commands=[autofix], ) - bad_files = [] - bad_file_re = re.compile(r'^--- (.*)\s+\(original\)\s*$') - for line in stdout_and_stderr.splitlines(): - m = bad_file_re.match(line) - if not m: - continue - file_name, = m.groups() - bad_files.append(file_name.strip()) - - # ... and doesn't really differentiate "your files have broken formatting" - # errors from general ones. So if we see nothing diffed, assume that a - # general error happened. - if not bad_files: +def check_black( + toolchain_utils_root: str, black: Path, python_files: t.Iterable[str] +) -> CheckResult: + """Subchecker of check_py_format. Checks python file formats with black""" + # Folks have been bitten by accidentally using multiple formatter versions in + # the past. This is an issue, since newer versions of black may format things + # differently. Make the version obvious. + command = [black, "--version"] + exit_code, stdout_and_stderr = run_command_unchecked( + command, cwd=toolchain_utils_root + ) + if exit_code: + return CheckResult( + ok=False, + output=f"Failed getting black version; stdstreams: {stdout_and_stderr}", + autofix_commands=[], + ) + + black_version = stdout_and_stderr.strip() + black_invocation: t.List[str] = [str(black), "--line-length=80"] + command = black_invocation + ["--check"] + list(python_files) + exit_code, stdout_and_stderr = run_command_unchecked( + command, cwd=toolchain_utils_root + ) + # black fails when files are poorly formatted. + if exit_code == 0: + return CheckResult( + ok=True, + output=f"Using {black_version!r}, no issues were found.", + autofix_commands=[], + ) + + # Output format looks something like: + # f'{complaints}\nOh no!{emojis}\n{summary}' + # Whittle it down to complaints. + complaints = stdout_and_stderr.split("\nOh no!", 1) + if len(complaints) != 2: + return CheckResult( + ok=False, + output=f"Unparseable `black` output:\n{stdout_and_stderr}", + autofix_commands=[], + ) + + bad_files = [] + errors = [] + refmt_prefix = "would reformat " + for line in complaints[0].strip().splitlines(): + line = line.strip() + if line.startswith("error:"): + errors.append(line) + continue + + if not line.startswith(refmt_prefix): + return CheckResult( + ok=False, + output=f"Unparseable `black` output:\n{stdout_and_stderr}", + autofix_commands=[], + ) + + bad_files.append(line[len(refmt_prefix) :].strip()) + + # If black had internal errors that it could handle, print them out and exit + # without an autofix. + if errors: + err_str = "\n".join(errors) + return CheckResult( + ok=False, + output=f"Using {black_version!r} had the following errors:\n{err_str}", + autofix_commands=[], + ) + + autofix = black_invocation + bad_files return CheckResult( ok=False, - output='`%s` failed; stdout/stderr:\n%s' % (escape_command(command), - stdout_and_stderr), - autofix_commands=[], + output=f"Using {black_version!r}, these file(s) have formatting errors: " + f"{bad_files}", + autofix_commands=[autofix], ) - autofix = ['yapf', '-i'] + bad_files - return CheckResult( - ok=False, - output='The following file(s) have formatting errors: %s' % bad_files, - autofix_commands=[autofix], - ) - def check_python_file_headers(python_files: t.Iterable[str]) -> CheckResult: - """Subchecker of check_py_format. Checks python #!s""" - add_hashbang = [] - remove_hashbang = [] - - for python_file in python_files: - needs_hashbang = is_file_executable(python_file) - with open(python_file, encoding='utf-8') as f: - has_hashbang = f.read(2) == '#!' - if needs_hashbang == has_hashbang: - continue - - if needs_hashbang: - add_hashbang.append(python_file) - else: - remove_hashbang.append(python_file) - - autofix = [] - output = [] - if add_hashbang: - output.append( - 'The following files have no #!, but need one: %s' % add_hashbang) - autofix.append(['sed', '-i', '1i#!/usr/bin/env python3'] + add_hashbang) - - if remove_hashbang: - output.append( - "The following files have a #!, but shouldn't: %s" % remove_hashbang) - autofix.append(['sed', '-i', '1d'] + remove_hashbang) - - if not output: - return CheckResult( - ok=True, - output='', - autofix_commands=[], - ) - return CheckResult( - ok=False, - output='\n'.join(output), - autofix_commands=autofix, - ) - - -def check_py_format(toolchain_utils_root: str, - thread_pool: multiprocessing.pool.ThreadPool, - files: t.Iterable[str]) -> CheckResult: - """Runs yapf on files to check for style bugs. Also checks for #!s.""" - yapf = 'yapf' - if not has_executable_on_path(yapf): + """Subchecker of check_py_format. Checks python #!s""" + add_hashbang = [] + remove_hashbang = [] + + for python_file in python_files: + needs_hashbang = is_file_executable(python_file) + with open(python_file, encoding="utf-8") as f: + has_hashbang = f.read(2) == "#!" + if needs_hashbang == has_hashbang: + continue + + if needs_hashbang: + add_hashbang.append(python_file) + else: + remove_hashbang.append(python_file) + + autofix = [] + output = [] + if add_hashbang: + output.append( + "The following files have no #!, but need one: %s" % add_hashbang + ) + autofix.append(["sed", "-i", "1i#!/usr/bin/env python3"] + add_hashbang) + + if remove_hashbang: + output.append( + "The following files have a #!, but shouldn't: %s" % remove_hashbang + ) + autofix.append(["sed", "-i", "1d"] + remove_hashbang) + + if not output: + return CheckResult( + ok=True, + output="", + autofix_commands=[], + ) return CheckResult( ok=False, - output="yapf isn't available on your $PATH. Please either " - 'enter a chroot, or place depot_tools on your $PATH.', - autofix_commands=[], + output="\n".join(output), + autofix_commands=autofix, ) - python_files = [f for f in remove_deleted_files(files) if f.endswith('.py')] - if not python_files: - return CheckResult( - ok=True, - output='no python files to check', - autofix_commands=[], - ) - tasks = [ - ('check_yapf', - thread_pool.apply_async(check_yapf, - (toolchain_utils_root, python_files))), - ('check_file_headers', - thread_pool.apply_async(check_python_file_headers, (python_files,))), - ] - return [(name, get_check_result_or_catch(task)) for name, task in tasks] +def check_py_format( + toolchain_utils_root: str, + thread_pool: multiprocessing.pool.ThreadPool, + files: t.Iterable[str], +) -> t.List[CheckResult]: + """Runs yapf on files to check for style bugs. Also checks for #!s.""" + black = "black" + if not has_executable_on_path(black): + return CheckResult( + ok=False, + output="black isn't available on your $PATH. Please either " + "enter a chroot, or place depot_tools on your $PATH.", + autofix_commands=[], + ) + + python_files = [f for f in remove_deleted_files(files) if f.endswith(".py")] + if not python_files: + return CheckResult( + ok=True, + output="no python files to check", + autofix_commands=[], + ) + + tasks = [ + ( + "check_black", + thread_pool.apply_async( + check_black, (toolchain_utils_root, black, python_files) + ), + ), + ( + "check_isort", + thread_pool.apply_async( + check_isort, (toolchain_utils_root, python_files) + ), + ), + ( + "check_file_headers", + thread_pool.apply_async(check_python_file_headers, (python_files,)), + ), + ] + return [(name, get_check_result_or_catch(task)) for name, task in tasks] def find_chromeos_root_directory() -> t.Optional[str]: - return os.getenv('CHROMEOS_ROOT_DIRECTORY') + return os.getenv("CHROMEOS_ROOT_DIRECTORY") def check_cros_lint( - toolchain_utils_root: str, thread_pool: multiprocessing.pool.ThreadPool, - files: t.Iterable[str]) -> t.Union[t.List[CheckResult], CheckResult]: - """Runs `cros lint`""" - - fixed_env = env_with_pythonpath(toolchain_utils_root) - - # We have to support users who don't have a chroot. So we either run `cros - # lint` (if it's been made available to us), or we try a mix of - # pylint+golint. - def try_run_cros_lint(cros_binary: str) -> t.Optional[CheckResult]: - exit_code, output = run_command_unchecked( - [cros_binary, 'lint', '--'] + files, - toolchain_utils_root, - env=fixed_env) - - # This is returned specifically if cros couldn't find the Chrome OS tree - # root. - if exit_code == 127: - return None - - return CheckResult( - ok=exit_code == 0, - output=output, - autofix_commands=[], - ) - - cros_lint = try_run_cros_lint('cros') - if cros_lint is not None: - return cros_lint - - cros_root = find_chromeos_root_directory() - if cros_root: - cros_lint = try_run_cros_lint(os.path.join(cros_root, 'chromite/bin/cros')) + toolchain_utils_root: str, + thread_pool: multiprocessing.pool.ThreadPool, + files: t.Iterable[str], +) -> t.Union[t.List[CheckResult], CheckResult]: + """Runs `cros lint`""" + + fixed_env = env_with_pythonpath(toolchain_utils_root) + + # We have to support users who don't have a chroot. So we either run `cros + # lint` (if it's been made available to us), or we try a mix of + # pylint+golint. + def try_run_cros_lint(cros_binary: str) -> t.Optional[CheckResult]: + exit_code, output = run_command_unchecked( + [cros_binary, "lint", "--"] + files, + toolchain_utils_root, + env=fixed_env, + ) + + # This is returned specifically if cros couldn't find the ChromeOS tree + # root. + if exit_code == 127: + return None + + return CheckResult( + ok=exit_code == 0, + output=output, + autofix_commands=[], + ) + + cros_lint = try_run_cros_lint("cros") if cros_lint is not None: - return cros_lint - - tasks = [] - - def check_result_from_command(command: t.List[str]) -> CheckResult: - exit_code, output = run_command_unchecked( - command, toolchain_utils_root, env=fixed_env) - return CheckResult( - ok=exit_code == 0, - output=output, - autofix_commands=[], + return cros_lint + + cros_root = find_chromeos_root_directory() + if cros_root: + cros_lint = try_run_cros_lint( + os.path.join(cros_root, "chromite/bin/cros") + ) + if cros_lint is not None: + return cros_lint + + tasks = [] + + def check_result_from_command(command: t.List[str]) -> CheckResult: + exit_code, output = run_command_unchecked( + command, toolchain_utils_root, env=fixed_env + ) + return CheckResult( + ok=exit_code == 0, + output=output, + autofix_commands=[], + ) + + python_files = [f for f in remove_deleted_files(files) if f.endswith(".py")] + if python_files: + + def run_pylint() -> CheckResult: + # pylint is required. Fail hard if it DNE. + return check_result_from_command(["pylint"] + python_files) + + tasks.append(("pylint", thread_pool.apply_async(run_pylint))) + + go_files = [f for f in remove_deleted_files(files) if f.endswith(".go")] + if go_files: + + def run_golint() -> CheckResult: + if has_executable_on_path("golint"): + return check_result_from_command( + ["golint", "-set_exit_status"] + go_files + ) + + complaint = "\n".join( + ( + "WARNING: go linting disabled. golint is not on your $PATH.", + "Please either enter a chroot, or install go locally. Continuing.", + ) + ) + return CheckResult( + ok=True, + output=complaint, + autofix_commands=[], + ) + + tasks.append(("golint", thread_pool.apply_async(run_golint))) + + complaint = "\n".join( + ( + "WARNING: No ChromeOS checkout detected, and no viable CrOS tree", + "found; falling back to linting only python and go. If you have a", + "ChromeOS checkout, please either develop from inside of the source", + "tree, or set $CHROMEOS_ROOT_DIRECTORY to the root of it.", + ) ) - python_files = [f for f in remove_deleted_files(files) if f.endswith('.py')] - if python_files: + results = [(name, get_check_result_or_catch(task)) for name, task in tasks] + if not results: + return CheckResult( + ok=True, + output=complaint, + autofix_commands=[], + ) - def run_pylint() -> CheckResult: - # pylint is required. Fail hard if it DNE. - return check_result_from_command(['pylint'] + python_files) - - tasks.append(('pylint', thread_pool.apply_async(run_pylint))) - - go_files = [f for f in remove_deleted_files(files) if f.endswith('.go')] - if go_files: - - def run_golint() -> CheckResult: - if has_executable_on_path('golint'): - return check_result_from_command(['golint', '-set_exit_status'] + - go_files) - - complaint = '\n'.join(( - 'WARNING: go linting disabled. golint is not on your $PATH.', - 'Please either enter a chroot, or install go locally. Continuing.', - )) - return CheckResult( - ok=True, - output=complaint, - autofix_commands=[], - ) - - tasks.append(('golint', thread_pool.apply_async(run_golint))) - - complaint = '\n'.join(( - 'WARNING: No Chrome OS checkout detected, and no viable CrOS tree', - 'found; falling back to linting only python and go. If you have a', - 'Chrome OS checkout, please either develop from inside of the source', - 'tree, or set $CHROMEOS_ROOT_DIRECTORY to the root of it.', - )) - - results = [(name, get_check_result_or_catch(task)) for name, task in tasks] - if not results: - return CheckResult( - ok=True, - output=complaint, - autofix_commands=[], - ) - - # We need to complain _somewhere_. - name, angry_result = results[0] - angry_complaint = (complaint + '\n\n' + angry_result.output).strip() - results[0] = (name, angry_result._replace(output=angry_complaint)) - return results + # We need to complain _somewhere_. + name, angry_result = results[0] + angry_complaint = (complaint + "\n\n" + angry_result.output).strip() + results[0] = (name, angry_result._replace(output=angry_complaint)) + return results def check_go_format(toolchain_utils_root, _thread_pool, files): - """Runs gofmt on files to check for style bugs.""" - gofmt = 'gofmt' - if not has_executable_on_path(gofmt): - return CheckResult( - ok=False, - output="gofmt isn't available on your $PATH. Please either " - 'enter a chroot, or place your go bin/ directory on your $PATH.', - autofix_commands=[], - ) + """Runs gofmt on files to check for style bugs.""" + gofmt = "gofmt" + if not has_executable_on_path(gofmt): + return CheckResult( + ok=False, + output="gofmt isn't available on your $PATH. Please either " + "enter a chroot, or place your go bin/ directory on your $PATH.", + autofix_commands=[], + ) + + go_files = [f for f in remove_deleted_files(files) if f.endswith(".go")] + if not go_files: + return CheckResult( + ok=True, + output="no go files to check", + autofix_commands=[], + ) + + command = [gofmt, "-l"] + go_files + exit_code, output = run_command_unchecked(command, cwd=toolchain_utils_root) - go_files = [f for f in remove_deleted_files(files) if f.endswith('.go')] - if not go_files: + if exit_code: + return CheckResult( + ok=False, + output="%s failed; stdout/stderr:\n%s" + % (escape_command(command), output), + autofix_commands=[], + ) + + output = output.strip() + if not output: + return CheckResult( + ok=True, + output="", + autofix_commands=[], + ) + + broken_files = [x.strip() for x in output.splitlines()] + autofix = [gofmt, "-w"] + broken_files return CheckResult( - ok=True, - output='no go files to check', - autofix_commands=[], + ok=False, + output="The following Go files have incorrect " + "formatting: %s" % broken_files, + autofix_commands=[autofix], ) - command = [gofmt, '-l'] + go_files - exit_code, output = run_command_unchecked(command, cwd=toolchain_utils_root) - if exit_code: - return CheckResult( - ok=False, - output='%s failed; stdout/stderr:\n%s' % (escape_command(command), - output), - autofix_commands=[], +def check_tests( + toolchain_utils_root: str, + _thread_pool: multiprocessing.pool.ThreadPool, + files: t.List[str], +) -> CheckResult: + """Runs tests.""" + exit_code, stdout_and_stderr = run_command_unchecked( + [os.path.join(toolchain_utils_root, "run_tests_for.py"), "--"] + files, + toolchain_utils_root, ) - - output = output.strip() - if not output: return CheckResult( - ok=True, - output='', + ok=exit_code == 0, + output=stdout_and_stderr, autofix_commands=[], ) - broken_files = [x.strip() for x in output.splitlines()] - autofix = [gofmt, '-w'] + broken_files - return CheckResult( - ok=False, - output='The following Go files have incorrect ' - 'formatting: %s' % broken_files, - autofix_commands=[autofix], - ) - - -def check_tests(toolchain_utils_root: str, - _thread_pool: multiprocessing.pool.ThreadPool, - files: t.List[str]) -> CheckResult: - """Runs tests.""" - exit_code, stdout_and_stderr = run_command_unchecked( - [os.path.join(toolchain_utils_root, 'run_tests_for.py'), '--'] + files, - toolchain_utils_root) - return CheckResult( - ok=exit_code == 0, - output=stdout_and_stderr, - autofix_commands=[], - ) - def detect_toolchain_utils_root() -> str: - return os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + return os.path.dirname(os.path.dirname(os.path.abspath(__file__))) def process_check_result( - check_name: str, check_results: t.Union[t.List[CheckResult], CheckResult], - start_time: datetime.datetime) -> t.Tuple[bool, t.List[t.List[str]]]: - """Prints human-readable output for the given check_results.""" - indent = ' ' - - def indent_block(text: str) -> str: - return indent + text.replace('\n', '\n' + indent) - - if isinstance(check_results, CheckResult): - ok, output, autofix_commands = check_results - if not ok and autofix_commands: - recommendation = ('Recommended command(s) to fix this: %s' % - [escape_command(x) for x in autofix_commands]) - if output: - output += '\n' + recommendation - else: - output = recommendation - else: - output_pieces = [] - autofix_commands = [] - for subname, (ok, output, autofix) in check_results: - status = 'succeeded' if ok else 'failed' - message = ['*** %s.%s %s' % (check_name, subname, status)] - if output: - message.append(indent_block(output)) - if not ok and autofix: - message.append( - indent_block('Recommended command(s) to fix this: %s' % - [escape_command(x) for x in autofix])) - - output_pieces.append('\n'.join(message)) - autofix_commands += autofix - - ok = all(x.ok for _, x in check_results) - output = '\n\n'.join(output_pieces) - - time_taken = datetime.datetime.now() - start_time - if ok: - print('*** %s succeeded after %s' % (check_name, time_taken)) - else: - print('*** %s failed after %s' % (check_name, time_taken)) - - if output: - print(indent_block(output)) - - print() - return ok, autofix_commands - - -def try_autofix(all_autofix_commands: t.List[t.List[str]], - toolchain_utils_root: str) -> None: - """Tries to run all given autofix commands, if appropriate.""" - if not all_autofix_commands: - return - - exit_code, output = run_command_unchecked(['git', 'status', '--porcelain'], - cwd=toolchain_utils_root) - if exit_code != 0: - print("Autofix aborted: couldn't get toolchain-utils git status.") - return - - if output.strip(): - # A clean repo makes checking/undoing autofix commands trivial. A dirty - # one... less so. :) - print('Git repo seems dirty; skipping autofix.') - return - - anything_succeeded = False - for command in all_autofix_commands: - exit_code, output = run_command_unchecked(command, cwd=toolchain_utils_root) - - if exit_code: - print('*** Autofix command `%s` exited with code %d; stdout/stderr:' % - (escape_command(command), exit_code)) - print(output) + check_name: str, + check_results: t.Union[t.List[CheckResult], CheckResult], + start_time: datetime.datetime, +) -> t.Tuple[bool, t.List[t.List[str]]]: + """Prints human-readable output for the given check_results.""" + indent = " " + + def indent_block(text: str) -> str: + return indent + text.replace("\n", "\n" + indent) + + if isinstance(check_results, CheckResult): + ok, output, autofix_commands = check_results + if not ok and autofix_commands: + recommendation = "Recommended command(s) to fix this: %s" % [ + escape_command(x) for x in autofix_commands + ] + if output: + output += "\n" + recommendation + else: + output = recommendation + else: + output_pieces = [] + autofix_commands = [] + for subname, (ok, output, autofix) in check_results: + status = "succeeded" if ok else "failed" + message = ["*** %s.%s %s" % (check_name, subname, status)] + if output: + message.append(indent_block(output)) + if not ok and autofix: + message.append( + indent_block( + "Recommended command(s) to fix this: %s" + % [escape_command(x) for x in autofix] + ) + ) + + output_pieces.append("\n".join(message)) + autofix_commands += autofix + + ok = all(x.ok for _, x in check_results) + output = "\n\n".join(output_pieces) + + time_taken = datetime.datetime.now() - start_time + if ok: + print("*** %s succeeded after %s" % (check_name, time_taken)) else: - print('*** Autofix `%s` succeeded' % escape_command(command)) - anything_succeeded = True + print("*** %s failed after %s" % (check_name, time_taken)) + + if output: + print(indent_block(output)) + + print() + return ok, autofix_commands - if anything_succeeded: - print('NOTE: Autofixes have been applied. Please check your tree, since ' - 'some lints may now be fixed') + +def try_autofix( + all_autofix_commands: t.List[t.List[str]], toolchain_utils_root: str +) -> None: + """Tries to run all given autofix commands, if appropriate.""" + if not all_autofix_commands: + return + + exit_code, output = run_command_unchecked( + ["git", "status", "--porcelain"], cwd=toolchain_utils_root + ) + if exit_code != 0: + print("Autofix aborted: couldn't get toolchain-utils git status.") + return + + if output.strip(): + # A clean repo makes checking/undoing autofix commands trivial. A dirty + # one... less so. :) + print("Git repo seems dirty; skipping autofix.") + return + + anything_succeeded = False + for command in all_autofix_commands: + exit_code, output = run_command_unchecked( + command, cwd=toolchain_utils_root + ) + + if exit_code: + print( + "*** Autofix command `%s` exited with code %d; stdout/stderr:" + % (escape_command(command), exit_code) + ) + print(output) + else: + print("*** Autofix `%s` succeeded" % escape_command(command)) + anything_succeeded = True + + if anything_succeeded: + print( + "NOTE: Autofixes have been applied. Please check your tree, since " + "some lints may now be fixed" + ) def find_repo_root(base_dir: str) -> t.Optional[str]: - current = base_dir - while current != '/': - if os.path.isdir(os.path.join(current, '.repo')): - return current - current = os.path.dirname(current) - return None + current = base_dir + while current != "/": + if os.path.isdir(os.path.join(current, ".repo")): + return current + current = os.path.dirname(current) + return None def is_in_chroot() -> bool: - return os.path.exists('/etc/cros_chroot_version') + return os.path.exists("/etc/cros_chroot_version") def maybe_reexec_inside_chroot(autofix: bool, files: t.List[str]) -> None: - if is_in_chroot(): - return - - enter_chroot = True - chdir_to = None - toolchain_utils = detect_toolchain_utils_root() - if find_repo_root(toolchain_utils) is None: - chromeos_root_dir = find_chromeos_root_directory() - if chromeos_root_dir is None: - print('Standalone toolchain-utils checkout detected; cannot enter ' - 'chroot.') - enter_chroot = False + if is_in_chroot(): + return + + enter_chroot = True + chdir_to = None + toolchain_utils = detect_toolchain_utils_root() + if find_repo_root(toolchain_utils) is None: + chromeos_root_dir = find_chromeos_root_directory() + if chromeos_root_dir is None: + print( + "Standalone toolchain-utils checkout detected; cannot enter " + "chroot." + ) + enter_chroot = False + else: + chdir_to = chromeos_root_dir + + if not has_executable_on_path("cros_sdk"): + print("No `cros_sdk` detected on $PATH; cannot enter chroot.") + enter_chroot = False + + if not enter_chroot: + print( + "Giving up on entering the chroot; be warned that some presubmits " + "may be broken." + ) + return + + # We'll be changing ${PWD}, so make everything relative to toolchain-utils, + # which resides at a well-known place inside of the chroot. + chroot_toolchain_utils = "/mnt/host/source/src/third_party/toolchain-utils" + + def rebase_path(path: str) -> str: + return os.path.join( + chroot_toolchain_utils, os.path.relpath(path, toolchain_utils) + ) + + args = [ + "cros_sdk", + "--enter", + "--", + rebase_path(__file__), + ] + + if not autofix: + args.append("--no_autofix") + args.extend(rebase_path(x) for x in files) + + if chdir_to is None: + print("Attempting to enter the chroot...") else: - chdir_to = chromeos_root_dir - - if not has_executable_on_path('cros_sdk'): - print('No `cros_sdk` detected on $PATH; cannot enter chroot.') - enter_chroot = False - - if not enter_chroot: - print('Giving up on entering the chroot; be warned that some presubmits ' - 'may be broken.') - return - - # We'll be changing ${PWD}, so make everything relative to toolchain-utils, - # which resides at a well-known place inside of the chroot. - chroot_toolchain_utils = '/mnt/host/source/src/third_party/toolchain-utils' - - def rebase_path(path: str) -> str: - return os.path.join(chroot_toolchain_utils, - os.path.relpath(path, toolchain_utils)) - - args = [ - 'cros_sdk', - '--enter', - '--', - rebase_path(__file__), - ] - - if not autofix: - args.append('--no_autofix') - args.extend(rebase_path(x) for x in files) - - if chdir_to is None: - print('Attempting to enter the chroot...') - else: - print(f'Attempting to enter the chroot for tree at {chdir_to}...') - os.chdir(chdir_to) - os.execvp(args[0], args) - - -# FIXME(crbug.com/980719): we probably want a better way of handling this. For -# now, as a workaround, ensure we have all dependencies installed as a part of -# presubmits. pip and scipy are fast enough to install (they take <1min -# combined on my machine), so hoooopefully users won't get too impatient. -def ensure_scipy_installed() -> None: - if not has_executable_on_path('pip'): - print('Autoinstalling `pip`...') - subprocess.check_call(['sudo', 'emerge', 'dev-python/pip']) - - exit_code = subprocess.call( - ['python3', '-c', 'import scipy'], - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - if exit_code != 0: - print('Autoinstalling `scipy`...') - subprocess.check_call(['pip', 'install', '--user', 'scipy']) + print(f"Attempting to enter the chroot for tree at {chdir_to}...") + os.chdir(chdir_to) + os.execvp(args[0], args) + + +def ensure_pip_deps_installed() -> None: + if not has_executable_on_path("pip"): + print("Autoinstalling `pip`...") + subprocess.check_call(["sudo", "emerge", "dev-python/pip"]) + + for package in ("scipy", "yapf"): + exit_code = subprocess.call( + ["python3", "-c", f"import {package}"], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + if exit_code != 0: + print(f"Autoinstalling `{package}`...") + subprocess.check_call(["pip", "install", "--user", package]) def main(argv: t.List[str]) -> int: - parser = argparse.ArgumentParser(description=__doc__) - parser.add_argument( - '--no_autofix', - dest='autofix', - action='store_false', - help="Don't run any autofix commands.") - parser.add_argument( - '--no_enter_chroot', - dest='enter_chroot', - action='store_false', - help="Prevent auto-entering the chroot if we're not already in it.") - parser.add_argument('files', nargs='*') - opts = parser.parse_args(argv) - - files = opts.files - if not files: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "--no_autofix", + dest="autofix", + action="store_false", + help="Don't run any autofix commands.", + ) + parser.add_argument( + "--no_enter_chroot", + dest="enter_chroot", + action="store_false", + help="Prevent auto-entering the chroot if we're not already in it.", + ) + parser.add_argument("files", nargs="*") + opts = parser.parse_args(argv) + + files = opts.files + if not files: + return 0 + + if opts.enter_chroot: + maybe_reexec_inside_chroot(opts.autofix, opts.files) + + # If you ask for --no_enter_chroot, you're on your own for installing these + # things. + if is_in_chroot(): + ensure_pip_deps_installed() + + files = [os.path.abspath(f) for f in files] + + # Note that we extract .__name__s from these, so please name them in a + # user-friendly way. + checks = [ + check_cros_lint, + check_py_format, + check_go_format, + check_tests, + ] + + toolchain_utils_root = detect_toolchain_utils_root() + + # NOTE: As mentioned above, checks can block on threads they spawn in this + # pool, so we need at least len(checks)+1 threads to avoid deadlock. Use *2 + # so all checks can make progress at a decent rate. + num_threads = max(multiprocessing.cpu_count(), len(checks) * 2) + start_time = datetime.datetime.now() + + # For our single print statement... + spawn_print_lock = threading.RLock() + + def run_check(check_fn): + name = check_fn.__name__ + with spawn_print_lock: + print("*** Spawning %s" % name) + return name, check_fn(toolchain_utils_root, pool, files) + + # ThreadPool is a ContextManager in py3. + # pylint: disable=not-context-manager + with multiprocessing.pool.ThreadPool(num_threads) as pool: + all_checks_ok = True + all_autofix_commands = [] + for check_name, result in pool.imap_unordered(run_check, checks): + ok, autofix_commands = process_check_result( + check_name, result, start_time + ) + all_checks_ok = ok and all_checks_ok + all_autofix_commands += autofix_commands + + # Run these after everything settles, so: + # - we don't collide with checkers that are running concurrently + # - we clearly print out everything that went wrong ahead of time, in case + # any of these fail + if opts.autofix: + try_autofix(all_autofix_commands, toolchain_utils_root) + + if not all_checks_ok: + return 1 return 0 - if opts.enter_chroot: - maybe_reexec_inside_chroot(opts.autofix, opts.files) - - # If you ask for --no_enter_chroot, you're on your own for installing these - # things. - if is_in_chroot(): - ensure_scipy_installed() - - files = [os.path.abspath(f) for f in files] - - # Note that we extract .__name__s from these, so please name them in a - # user-friendly way. - checks = [ - check_cros_lint, - check_py_format, - check_go_format, - check_tests, - ] - - toolchain_utils_root = detect_toolchain_utils_root() - - # NOTE: As mentioned above, checks can block on threads they spawn in this - # pool, so we need at least len(checks)+1 threads to avoid deadlock. Use *2 - # so all checks can make progress at a decent rate. - num_threads = max(multiprocessing.cpu_count(), len(checks) * 2) - start_time = datetime.datetime.now() - - # For our single print statement... - spawn_print_lock = threading.RLock() - - def run_check(check_fn): - name = check_fn.__name__ - with spawn_print_lock: - print('*** Spawning %s' % name) - return name, check_fn(toolchain_utils_root, pool, files) - - # ThreadPool is a ContextManager in py3. - # pylint: disable=not-context-manager - with multiprocessing.pool.ThreadPool(num_threads) as pool: - all_checks_ok = True - all_autofix_commands = [] - for check_name, result in pool.imap_unordered(run_check, checks): - ok, autofix_commands = process_check_result(check_name, result, - start_time) - all_checks_ok = ok and all_checks_ok - all_autofix_commands += autofix_commands - - # Run these after everything settles, so: - # - we don't collide with checkers that are running concurrently - # - we clearly print out everything that went wrong ahead of time, in case - # any of these fail - if opts.autofix: - try_autofix(all_autofix_commands, toolchain_utils_root) - - if not all_checks_ok: - return 1 - return 0 - - -if __name__ == '__main__': - sys.exit(main(sys.argv[1:])) + +if __name__ == "__main__": + sys.exit(main(sys.argv[1:])) diff --git a/toolchain_utils_githooks/pre-push b/toolchain_utils_githooks/pre-push index eef8a09a..49548a17 100755 --- a/toolchain_utils_githooks/pre-push +++ b/toolchain_utils_githooks/pre-push @@ -1,6 +1,6 @@ #!/bin/bash # -# Copyright (c) 2016 Google Inc. +# Copyright 2016 Google LLC # # Just execute our custom pre-push script. diff --git a/toolchain_utils_githooks/pre-push.real b/toolchain_utils_githooks/pre-push.real index 06aa6213..f913e802 100755 --- a/toolchain_utils_githooks/pre-push.real +++ b/toolchain_utils_githooks/pre-push.real @@ -1,6 +1,6 @@ #!/bin/bash # -# Copyright (c) 2015 Google Inc. +# Copyright 2015 Google LLC # # This is a pre-push hook that does the following before uploading a # CL for review: |