#!/usr/bin/env python3 # -*- coding: utf-8 -*- # # Copyright 2019 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Runs presubmit checks against a bundle of files.""" import argparse import datetime import multiprocessing import multiprocessing.pool import os import re import shlex import shutil import subprocess import sys import threading import traceback import typing as t def run_command_unchecked(command: t.List[str], cwd: str, env: t.Dict[str, str] = None) -> t.Tuple[int, str]: """Runs a command in the given dir, returning its exit code and stdio.""" p = subprocess.Popen( command, cwd=cwd, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env, ) stdout, _ = p.communicate() exit_code = p.wait() return exit_code, stdout.decode('utf-8', 'replace') def has_executable_on_path(exe: str) -> bool: """Returns whether we have `exe` somewhere on our $PATH""" return shutil.which(exe) is not None def escape_command(command: t.Iterable[str]) -> str: """Returns a human-readable and copy-pastable shell command. Only intended for use in output to users. shell=True is strongly discouraged. """ return ' '.join(shlex.quote(x) for x in command) def remove_deleted_files(files: t.Iterable[str]) -> t.List[str]: return [f for f in files if os.path.exists(f)] def is_file_executable(file_path: str) -> bool: return os.access(file_path, os.X_OK) # As noted in our docs, some of our Python code depends on modules that sit in # toolchain-utils/. Add that to PYTHONPATH to ensure that things like `cros # lint` are kept happy. def env_with_pythonpath(toolchain_utils_root: str) -> t.Dict[str, str]: env = dict(os.environ) if 'PYTHONPATH' in env: env['PYTHONPATH'] += ':' + toolchain_utils_root else: env['PYTHONPATH'] = toolchain_utils_root return env # Each checker represents an independent check that's done on our sources. # # They should: # - never write to stdout/stderr or read from stdin directly # - return either a CheckResult, or a list of [(subcheck_name, CheckResult)] # - ideally use thread_pool to check things concurrently # - though it's important to note that these *also* live on the threadpool # we've provided. It's the caller's responsibility to guarantee that at # least ${number_of_concurrently_running_checkers}+1 threads are present # in the pool. In order words, blocking on results from the provided # threadpool is OK. CheckResult = t.NamedTuple( 'CheckResult', ( ('ok', bool), ('output', str), ('autofix_commands', t.List[t.List[str]]), ), ) def get_check_result_or_catch( task: multiprocessing.pool.ApplyResult) -> CheckResult: """Returns the result of task(); if that raises, returns a CheckResult. The task is expected to return a CheckResult on get(). """ try: return task.get() except Exception: return CheckResult( ok=False, output='Check exited with an unexpected exception:\n%s' % traceback.format_exc(), autofix_commands=[], ) def check_yapf(toolchain_utils_root: str, python_files: t.Iterable[str]) -> CheckResult: """Subchecker of check_py_format. Checks python file formats with yapf""" command = ['yapf', '-d'] + python_files exit_code, stdout_and_stderr = run_command_unchecked( command, cwd=toolchain_utils_root) # yapf fails when files are poorly formatted. if exit_code == 0: return CheckResult( ok=True, output='', autofix_commands=[], ) bad_files = [] bad_file_re = re.compile(r'^--- (.*)\s+\(original\)\s*$') for line in stdout_and_stderr.splitlines(): m = bad_file_re.match(line) if not m: continue file_name, = m.groups() bad_files.append(file_name.strip()) # ... and doesn't really differentiate "your files have broken formatting" # errors from general ones. So if we see nothing diffed, assume that a # general error happened. if not bad_files: return CheckResult( ok=False, output='`%s` failed; stdout/stderr:\n%s' % (escape_command(command), stdout_and_stderr), autofix_commands=[], ) autofix = ['yapf', '-i'] + bad_files return CheckResult( ok=False, output='The following file(s) have formatting errors: %s' % bad_files, autofix_commands=[autofix], ) def check_python_file_headers(python_files: t.Iterable[str]) -> CheckResult: """Subchecker of check_py_format. Checks python #!s""" add_hashbang = [] remove_hashbang = [] for python_file in python_files: needs_hashbang = is_file_executable(python_file) with open(python_file, encoding='utf-8') as f: has_hashbang = f.read(2) == '#!' if needs_hashbang == has_hashbang: continue if needs_hashbang: add_hashbang.append(python_file) else: remove_hashbang.append(python_file) autofix = [] output = [] if add_hashbang: output.append( 'The following files have no #!, but need one: %s' % add_hashbang) autofix.append(['sed', '-i', '1i#!/usr/bin/env python3'] + add_hashbang) if remove_hashbang: output.append( "The following files have a #!, but shouldn't: %s" % remove_hashbang) autofix.append(['sed', '-i', '1d'] + remove_hashbang) if not output: return CheckResult( ok=True, output='', autofix_commands=[], ) return CheckResult( ok=False, output='\n'.join(output), autofix_commands=autofix, ) def check_py_format(toolchain_utils_root: str, thread_pool: multiprocessing.pool.ThreadPool, files: t.Iterable[str]) -> CheckResult: """Runs yapf on files to check for style bugs. Also checks for #!s.""" yapf = 'yapf' if not has_executable_on_path(yapf): return CheckResult( ok=False, output="yapf isn't available on your $PATH. Please either " 'enter a chroot, or place depot_tools on your $PATH.', autofix_commands=[], ) python_files = [f for f in remove_deleted_files(files) if f.endswith('.py')] if not python_files: return CheckResult( ok=True, output='no python files to check', autofix_commands=[], ) tasks = [ ('check_yapf', thread_pool.apply_async(check_yapf, (toolchain_utils_root, python_files))), ('check_file_headers', thread_pool.apply_async(check_python_file_headers, (python_files,))), ] return [(name, get_check_result_or_catch(task)) for name, task in tasks] def find_chromeos_root_directory() -> t.Optional[str]: return os.getenv('CHROMEOS_ROOT_DIRECTORY') def check_cros_lint( toolchain_utils_root: str, thread_pool: multiprocessing.pool.ThreadPool, files: t.Iterable[str]) -> t.Union[t.List[CheckResult], CheckResult]: """Runs `cros lint`""" fixed_env = env_with_pythonpath(toolchain_utils_root) # We have to support users who don't have a chroot. So we either run `cros # lint` (if it's been made available to us), or we try a mix of # pylint+golint. def try_run_cros_lint(cros_binary: str) -> t.Optional[CheckResult]: exit_code, output = run_command_unchecked( [cros_binary, 'lint', '--'] + files, toolchain_utils_root, env=fixed_env) # This is returned specifically if cros couldn't find the Chrome OS tree # root. if exit_code == 127: return None return CheckResult( ok=exit_code == 0, output=output, autofix_commands=[], ) cros_lint = try_run_cros_lint('cros') if cros_lint is not None: return cros_lint cros_root = find_chromeos_root_directory() if cros_root: cros_lint = try_run_cros_lint(os.path.join(cros_root, 'chromite/bin/cros')) if cros_lint is not None: return cros_lint tasks = [] def check_result_from_command(command: t.List[str]) -> CheckResult: exit_code, output = run_command_unchecked( command, toolchain_utils_root, env=fixed_env) return CheckResult( ok=exit_code == 0, output=output, autofix_commands=[], ) python_files = [f for f in remove_deleted_files(files) if f.endswith('.py')] if python_files: def run_pylint() -> CheckResult: # pylint is required. Fail hard if it DNE. return check_result_from_command(['pylint'] + python_files) tasks.append(('pylint', thread_pool.apply_async(run_pylint))) go_files = [f for f in remove_deleted_files(files) if f.endswith('.go')] if go_files: def run_golint() -> CheckResult: if has_executable_on_path('golint'): return check_result_from_command(['golint', '-set_exit_status'] + go_files) complaint = '\n'.join(( 'WARNING: go linting disabled. golint is not on your $PATH.', 'Please either enter a chroot, or install go locally. Continuing.', )) return CheckResult( ok=True, output=complaint, autofix_commands=[], ) tasks.append(('golint', thread_pool.apply_async(run_golint))) complaint = '\n'.join(( 'WARNING: No Chrome OS checkout detected, and no viable CrOS tree', 'found; falling back to linting only python and go. If you have a', 'Chrome OS checkout, please either develop from inside of the source', 'tree, or set $CHROMEOS_ROOT_DIRECTORY to the root of it.', )) results = [(name, get_check_result_or_catch(task)) for name, task in tasks] if not results: return CheckResult( ok=True, output=complaint, autofix_commands=[], ) # We need to complain _somewhere_. name, angry_result = results[0] angry_complaint = (complaint + '\n\n' + angry_result.output).strip() results[0] = (name, angry_result._replace(output=angry_complaint)) return results def check_go_format(toolchain_utils_root, _thread_pool, files): """Runs gofmt on files to check for style bugs.""" gofmt = 'gofmt' if not has_executable_on_path(gofmt): return CheckResult( ok=False, output="gofmt isn't available on your $PATH. Please either " 'enter a chroot, or place your go bin/ directory on your $PATH.', autofix_commands=[], ) go_files = [f for f in remove_deleted_files(files) if f.endswith('.go')] if not go_files: return CheckResult( ok=True, output='no go files to check', autofix_commands=[], ) command = [gofmt, '-l'] + go_files exit_code, output = run_command_unchecked(command, cwd=toolchain_utils_root) if exit_code: return CheckResult( ok=False, output='%s failed; stdout/stderr:\n%s' % (escape_command(command), output), autofix_commands=[], ) output = output.strip() if not output: return CheckResult( ok=True, output='', autofix_commands=[], ) broken_files = [x.strip() for x in output.splitlines()] autofix = [gofmt, '-w'] + broken_files return CheckResult( ok=False, output='The following Go files have incorrect ' 'formatting: %s' % broken_files, autofix_commands=[autofix], ) def check_tests(toolchain_utils_root: str, _thread_pool: multiprocessing.pool.ThreadPool, files: t.List[str]) -> CheckResult: """Runs tests.""" exit_code, stdout_and_stderr = run_command_unchecked( [os.path.join(toolchain_utils_root, 'run_tests_for.py'), '--'] + files, toolchain_utils_root) return CheckResult( ok=exit_code == 0, output=stdout_and_stderr, autofix_commands=[], ) def detect_toolchain_utils_root() -> str: return os.path.dirname(os.path.dirname(os.path.abspath(__file__))) def process_check_result( check_name: str, check_results: t.Union[t.List[CheckResult], CheckResult], start_time: datetime.datetime) -> t.Tuple[bool, t.List[t.List[str]]]: """Prints human-readable output for the given check_results.""" indent = ' ' def indent_block(text: str) -> str: return indent + text.replace('\n', '\n' + indent) if isinstance(check_results, CheckResult): ok, output, autofix_commands = check_results if not ok and autofix_commands: recommendation = ('Recommended command(s) to fix this: %s' % [escape_command(x) for x in autofix_commands]) if output: output += '\n' + recommendation else: output = recommendation else: output_pieces = [] autofix_commands = [] for subname, (ok, output, autofix) in check_results: status = 'succeeded' if ok else 'failed' message = ['*** %s.%s %s' % (check_name, subname, status)] if output: message.append(indent_block(output)) if not ok and autofix: message.append( indent_block('Recommended command(s) to fix this: %s' % [escape_command(x) for x in autofix])) output_pieces.append('\n'.join(message)) autofix_commands += autofix ok = all(x.ok for _, x in check_results) output = '\n\n'.join(output_pieces) time_taken = datetime.datetime.now() - start_time if ok: print('*** %s succeeded after %s' % (check_name, time_taken)) else: print('*** %s failed after %s' % (check_name, time_taken)) if output: print(indent_block(output)) print() return ok, autofix_commands def try_autofix(all_autofix_commands: t.List[t.List[str]], toolchain_utils_root: str) -> None: """Tries to run all given autofix commands, if appropriate.""" if not all_autofix_commands: return exit_code, output = run_command_unchecked(['git', 'status', '--porcelain'], cwd=toolchain_utils_root) if exit_code != 0: print("Autofix aborted: couldn't get toolchain-utils git status.") return if output.strip(): # A clean repo makes checking/undoing autofix commands trivial. A dirty # one... less so. :) print('Git repo seems dirty; skipping autofix.') return anything_succeeded = False for command in all_autofix_commands: exit_code, output = run_command_unchecked(command, cwd=toolchain_utils_root) if exit_code: print('*** Autofix command `%s` exited with code %d; stdout/stderr:' % (escape_command(command), exit_code)) print(output) else: print('*** Autofix `%s` succeeded' % escape_command(command)) anything_succeeded = True if anything_succeeded: print('NOTE: Autofixes have been applied. Please check your tree, since ' 'some lints may now be fixed') def find_repo_root(base_dir: str) -> t.Optional[str]: current = base_dir while current != '/': if os.path.isdir(os.path.join(current, '.repo')): return current current = os.path.dirname(current) return None def is_in_chroot() -> bool: return os.path.exists('/etc/cros_chroot_version') def maybe_reexec_inside_chroot(autofix: bool, files: t.List[str]) -> None: if is_in_chroot(): return enter_chroot = True chdir_to = None toolchain_utils = detect_toolchain_utils_root() if find_repo_root(toolchain_utils) is None: chromeos_root_dir = find_chromeos_root_directory() if chromeos_root_dir is None: print('Standalone toolchain-utils checkout detected; cannot enter ' 'chroot.') enter_chroot = False else: chdir_to = chromeos_root_dir if not has_executable_on_path('cros_sdk'): print('No `cros_sdk` detected on $PATH; cannot enter chroot.') enter_chroot = False if not enter_chroot: print('Giving up on entering the chroot; be warned that some presubmits ' 'may be broken.') return # We'll be changing ${PWD}, so make everything relative to toolchain-utils, # which resides at a well-known place inside of the chroot. chroot_toolchain_utils = '/mnt/host/source/src/third_party/toolchain-utils' def rebase_path(path: str) -> str: return os.path.join(chroot_toolchain_utils, os.path.relpath(path, toolchain_utils)) args = [ 'cros_sdk', '--enter', '--', rebase_path(__file__), ] if not autofix: args.append('--no_autofix') args.extend(rebase_path(x) for x in files) if chdir_to is None: print('Attempting to enter the chroot...') else: print(f'Attempting to enter the chroot for tree at {chdir_to}...') os.chdir(chdir_to) os.execvp(args[0], args) # FIXME(crbug.com/980719): we probably want a better way of handling this. For # now, as a workaround, ensure we have all dependencies installed as a part of # presubmits. pip and scipy are fast enough to install (they take <1min # combined on my machine), so hoooopefully users won't get too impatient. def ensure_scipy_installed() -> None: if not has_executable_on_path('pip'): print('Autoinstalling `pip`...') subprocess.check_call(['sudo', 'emerge', 'dev-python/pip']) exit_code = subprocess.call( ['python3', '-c', 'import scipy'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) if exit_code != 0: print('Autoinstalling `scipy`...') subprocess.check_call(['pip', 'install', '--user', 'scipy']) def main(argv: t.List[str]) -> int: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( '--no_autofix', dest='autofix', action='store_false', help="Don't run any autofix commands.") parser.add_argument( '--no_enter_chroot', dest='enter_chroot', action='store_false', help="Prevent auto-entering the chroot if we're not already in it.") parser.add_argument('files', nargs='*') opts = parser.parse_args(argv) files = opts.files if not files: return 0 if opts.enter_chroot: maybe_reexec_inside_chroot(opts.autofix, opts.files) # If you ask for --no_enter_chroot, you're on your own for installing these # things. if is_in_chroot(): ensure_scipy_installed() files = [os.path.abspath(f) for f in files] # Note that we extract .__name__s from these, so please name them in a # user-friendly way. checks = [ check_cros_lint, check_py_format, check_go_format, check_tests, ] toolchain_utils_root = detect_toolchain_utils_root() # NOTE: As mentioned above, checks can block on threads they spawn in this # pool, so we need at least len(checks)+1 threads to avoid deadlock. Use *2 # so all checks can make progress at a decent rate. num_threads = max(multiprocessing.cpu_count(), len(checks) * 2) start_time = datetime.datetime.now() # For our single print statement... spawn_print_lock = threading.RLock() def run_check(check_fn): name = check_fn.__name__ with spawn_print_lock: print('*** Spawning %s' % name) return name, check_fn(toolchain_utils_root, pool, files) # ThreadPool is a ContextManager in py3. # pylint: disable=not-context-manager with multiprocessing.pool.ThreadPool(num_threads) as pool: all_checks_ok = True all_autofix_commands = [] for check_name, result in pool.imap_unordered(run_check, checks): ok, autofix_commands = process_check_result(check_name, result, start_time) all_checks_ok = ok and all_checks_ok all_autofix_commands += autofix_commands # Run these after everything settles, so: # - we don't collide with checkers that are running concurrently # - we clearly print out everything that went wrong ahead of time, in case # any of these fail if opts.autofix: try_autofix(all_autofix_commands, toolchain_utils_root) if not all_checks_ok: return 1 return 0 if __name__ == '__main__': sys.exit(main(sys.argv[1:]))