From e890214955dee0de3bbad07c1c720cc01405f808 Mon Sep 17 00:00:00 2001 From: Yifan Hong Date: Thu, 18 Apr 2024 16:38:44 -0700 Subject: kleaf: Refactor bazel.py for output stream mutation. Prepare for the changes for output filter (now renamed to output mutation). Bug: 234125794 Test: TH Change-Id: I6c79a0b82222065ba795e7e2e21f3774bb57ac38 --- kleaf/bazel.py | 146 ++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 119 insertions(+), 27 deletions(-) (limited to 'kleaf/bazel.py') diff --git a/kleaf/bazel.py b/kleaf/bazel.py index 42851537..2aa7cb60 100755 --- a/kleaf/bazel.py +++ b/kleaf/bazel.py @@ -13,6 +13,7 @@ # limitations under the License. import argparse +import dataclasses import os import pathlib import re @@ -20,7 +21,7 @@ import shlex import shutil import sys import textwrap -from typing import Tuple, Optional +from typing import BinaryIO, Tuple, Optional from kleaf_help import KleafHelpPrinter, FLAGS_BAZEL_RC @@ -39,6 +40,39 @@ _QUERY_ABI_TARGETS_ARG = 'kind("(update_source_file|abi_update) rule", //... exc _REPO_BOUNDARY_FILES = ("MODULE.bazel", "REPO.bazel", "WORKSPACE.bazel", "WORKSPACE") + +@dataclasses.dataclass +class BazelWrapperException(Exception): + """A generic Bazel-wrapper error.""" + + # error message + message: str = "" + + # exit code of the program. + # Default is 1, "Build failed". See https://bazel.build/run/scripts + code: int = 1 + + def __post_init__(self): + super().__init__(self, self.message) + + +class MultipleBazelWrapperException(BazelWrapperException): + """Wraps multiple BazelWrapperException into one.""" + + def __init__(self, errors: list[BazelWrapperException]): + """Wraps multiple BazelWrapperException into one. + + Args: + errors: a list of BazelWrapperException objects. + Must not be empty. 
+ """ + assert errors + super().__init__( + message="\n".join(error.message for error in errors), + code=errors[0].code + ) + + def _require_absolute_path(p: str | pathlib.Path) -> pathlib.Path: p = pathlib.Path(p) if not p.is_absolute(): @@ -519,22 +553,39 @@ class BazelWrapper(KleafHelpPrinter): print("Kleaf ABI update available targets:") self.transformed_command_args.append(_QUERY_ABI_TARGETS_ARG) - def run(self): + def run(self) -> int: + """Runs the wrapper. + + Returns: + exit code""" final_args = self._build_final_args() if self.known_startup_options.help or self.command == "help": self._print_help() - if self._should_run_as_subprocess(): - import asyncio + if not self._should_run_as_subprocess(): + os.execve(path=self.bazel_path, argv=final_args, env=self.env) + assert False, "os.execve should not return" + + output_mutator = OutputMutator( + filter_regex=self._get_output_filter_regex(), + ) + + import asyncio + try: asyncio.run(run( command=final_args, env=self.env, - filter_regex=self._get_output_filter_regex(), + output_mutator=output_mutator, epilog_coroutine=self._get_epilog_coroutine(), )) - else: - os.execve(path=self.bazel_path, argv=final_args, env=self.env) + except BazelWrapperException as exception: + if exception.message: + print(exception.message, file=sys.stderr) + return exception.code + + return 0 + def _should_run_as_subprocess(self): """Returns whether to run bazel command as subprocess""" @@ -565,23 +616,41 @@ class BazelWrapper(KleafHelpPrinter): shutil.rmtree(self.gen_bazelrc_dir, ignore_errors=True) -async def output_filter(input_stream, output_stream, filter_regex): - """Pipes input to output, optionally filtering lines with given filter_regex. +class OutputMutator: + """Helper class to filter and mutate an output stream.""" + def __init__( + self, + filter_regex: re.Pattern | None, + ): + self._filter_regex = filter_regex - If filter_regex is None, don't filter lines. 
- """ - while not input_stream.at_eof(): - output = await input_stream.readline() - if filter_regex: - output = re.sub(filter_regex, "", output.decode()).encode() - output_stream.buffer.write(output) - output_stream.flush() + async def mutate_stream( + self, + input_stream: BinaryIO, + output_stream: BinaryIO, + stream_name: str, # pylint: disable=unused-argument + ): + """Pipes input to output, optionally mutating lines. + + If filter_regex is None, don't filter lines. + """ + while not input_stream.at_eof(): + output = await input_stream.readline() + if self._filter_regex: + output_decoded = output.decode() + if self._filter_regex: + output_decoded = re.sub( + self._filter_regex, "", output_decoded) + output = output_decoded.encode() + output_stream.buffer.write(output) + output_stream.flush() -async def run(command, env, filter_regex, epilog_coroutine): + +async def run(command, env, epilog_coroutine, output_mutator): """Runs command with env asynchronously. - Outputs are filtered with filter_regex if it is not None. + Outputs are mutated with output_mutator. At the end, run the coroutine epilog_coroutine if it is not None. """ @@ -593,15 +662,38 @@ async def run(command, env, filter_regex, epilog_coroutine): env=env, ) - await asyncio.gather( - output_filter(process.stderr, sys.stderr, filter_regex), - output_filter(process.stdout, sys.stdout, filter_regex), - ) - await process.wait() + stderr_coroutine = output_mutator.mutate_stream( + input_stream=process.stderr, + output_stream=sys.stderr, + stream_name="stderr") + stdout_coroutine = output_mutator.mutate_stream( + input_stream=process.stdout, + output_stream=sys.stdout, + stream_name="stdout") + + # Wait for the process and stdout/stderr filters concurrently. 
+ coroutines = [ + stderr_coroutine, + stdout_coroutine, + process.wait(), + ] + tasks = [asyncio.Task(coroutine) for coroutine in coroutines] + done, _ = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED) + exceptions = [task.exception() for task in done if task.exception()] + + # epilog_coroutine needs to run after process finishes, so it cannot + # be in the coroutines list. if epilog_coroutine: - await epilog_coroutine + try: + await epilog_coroutine + except BazelWrapperException as exception: + exceptions.append(exception) + match len(exceptions): + case 0: pass + case 1: raise exceptions[0] + case _: raise MultipleBazelWrapperException(exceptions) if __name__ == "__main__": - BazelWrapper(kleaf_repo_dir=pathlib.Path(sys.argv[1]), - bazel_args=sys.argv[2:], env=os.environ).run() + sys.exit(BazelWrapper(kleaf_repo_dir=pathlib.Path(sys.argv[1]), + bazel_args=sys.argv[2:], env=os.environ).run()) -- cgit v1.2.3