diff options
Diffstat (limited to 'llvm_tools/patch_utils.py')
-rw-r--r-- | llvm_tools/patch_utils.py | 594 |
1 files changed, 594 insertions, 0 deletions
# Copyright 2022 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Provides patch utilities for PATCHES.json file handling."""

import collections
import contextlib
import dataclasses
import json
from pathlib import Path
import re
import subprocess
import sys
from typing import Any, Dict, IO, Iterable, List, Optional, Tuple, Union


# GNU patch announces every file it touches with "checking file <name>" when
# run with --dry-run and with "patching file <name>" otherwise. Both spellings
# are accepted so failures from a real application can be parsed as well.
CHECKED_FILE_RE = re.compile(r"^(?:checking|patching) file\s+(.*)$")
HUNK_FAILED_RE = re.compile(r"^Hunk #(\d+) FAILED at.*")
HUNK_HEADER_RE = re.compile(r"^@@\s+-(\d+),(\d+)\s+\+(\d+),(\d+)\s+@@")
HUNK_END_RE = re.compile(r"^--\s*$")
PATCH_SUBFILE_HEADER_RE = re.compile(r"^\+\+\+ [ab]/(.*)$")


@contextlib.contextmanager
def atomic_write(fp: Union[Path, str], mode="w", *args, **kwargs):
    """Write to a filepath atomically.

    This works by a temp file swap, created with a .tmp suffix in
    the same directory briefly until being renamed to the desired
    filepath.

    Args:
        fp: Filepath to open.
        mode: File mode; can be 'w', 'wb'. Default 'w'.
        *args: Passed to Path.open as nargs.
        **kwargs: Passed to Path.open as kwargs.

    Yields:
        The open temporary file object to write to.

    Raises:
        ValueError when the mode is invalid.
    """
    if isinstance(fp, str):
        fp = Path(fp)
    if mode not in ("w", "wb"):
        raise ValueError(f"mode {mode} not accepted")
    temp_fp = fp.with_suffix(fp.suffix + ".tmp")
    try:
        with temp_fp.open(mode, *args, **kwargs) as f:
            yield f
    except BaseException:
        # Any failure (including KeyboardInterrupt) must not leave the
        # half-written temp file behind; the destination stays untouched.
        if temp_fp.is_file():
            temp_fp.unlink()
        raise
    # replace() overwrites an existing destination on every platform,
    # whereas rename() does so only on POSIX.
    temp_fp.replace(fp)


@dataclasses.dataclass
class Hunk:
    """Represents a patch Hunk."""

    # Hunk ID for the current file (1-based, restarts for every file).
    hunk_id: int
    # The four numbers of the "@@ -a,b +c,d @@" hunk header, in order.
    orig_start: int
    orig_hunk_len: int
    patch_start: int
    patch_hunk_len: int
    # Zero-based index of the first line after the hunk header in the patch.
    patch_hunk_lineno_begin: int
    # Zero-based index of the line that ended the hunk (the next hunk header
    # or a "--" line); None if no such line was seen.
    patch_hunk_lineno_end: Optional[int]


def parse_patch_stream(patch_stream: IO[str]) -> Dict[str, List[Hunk]]:
    """Parse a patch file-like into Hunks.

    Args:
        patch_stream: A IO stream formatted like a git patch file.

    Returns:
        A dictionary mapping filenames to lists of Hunks present
        in the patch stream.

    Raises:
        RuntimeError when a file header carries no path, or when a hunk
        header shows up before any file header.
    """
    current_filepath = None
    current_hunk_id = 0
    current_hunk = None
    out = collections.defaultdict(list)
    for lineno, line in enumerate(patch_stream):
        subfile_header = PATCH_SUBFILE_HEADER_RE.match(line)
        if subfile_header:
            current_filepath = subfile_header.group(1)
            if not current_filepath:
                raise RuntimeError("Could not get file header in patch stream")
            # Need to reset the hunk id, as it's per-file.
            current_hunk_id = 0
            continue
        hunk_header = HUNK_HEADER_RE.match(line)
        if hunk_header:
            if not current_filepath:
                raise RuntimeError(
                    "Parsed hunk before file header in patch stream"
                )
            if current_hunk:
                # Already parsing a hunk; the new header closes it.
                current_hunk.patch_hunk_lineno_end = lineno
            current_hunk_id += 1
            current_hunk = Hunk(
                hunk_id=current_hunk_id,
                orig_start=int(hunk_header.group(1)),
                orig_hunk_len=int(hunk_header.group(2)),
                patch_start=int(hunk_header.group(3)),
                patch_hunk_len=int(hunk_header.group(4)),
                patch_hunk_lineno_begin=lineno + 1,
                patch_hunk_lineno_end=None,
            )
            out[current_filepath].append(current_hunk)
            continue
        if current_hunk and HUNK_END_RE.match(line):
            current_hunk.patch_hunk_lineno_end = lineno
    return out


def parse_failed_patch_output(text: str) -> Dict[str, List[int]]:
    """Extract the failed hunk IDs per file from `patch` output.

    Args:
        text: The stdout of a `patch` invocation, dry run or not.

    Returns:
        A dictionary mapping each file name to the IDs of its failed hunks.

    Raises:
        ValueError when a failed hunk is reported before any file name.
    """
    current_file = None
    failed_hunks = collections.defaultdict(list)
    for eline in text.split("\n"):
        checked_file_match = CHECKED_FILE_RE.match(eline)
        if checked_file_match:
            current_file = checked_file_match.group(1)
            continue
        failed_match = HUNK_FAILED_RE.match(eline)
        if failed_match:
            if not current_file:
                raise ValueError("Input stream was not parsable")
            hunk_id = int(failed_match.group(1))
            failed_hunks[current_file].append(hunk_id)
    return failed_hunks


@dataclasses.dataclass(frozen=True)
class PatchResult:
    """Result of a patch application."""

    succeeded: bool
    # Maps file name -> hunks that failed for that file.
    failed_hunks: Dict[str, List[Hunk]] = dataclasses.field(
        default_factory=dict
    )

    def __bool__(self):
        return self.succeeded

    def failure_info(self) -> str:
        """Return a human-readable summary of the failed hunks.

        Returns:
            An empty string on success, otherwise one section per file.
        """
        if self.succeeded:
            return ""
        parts = []
        for file, hunks in self.failed_hunks.items():
            parts.append(f"{file}:\n")
            parts.extend(
                f"Lines {h.orig_start} to {h.orig_start + h.orig_hunk_len}\n"
                for h in hunks
            )
            parts.append("--------------------\n")
        return "".join(parts)


@dataclasses.dataclass
class PatchEntry:
    """Object mapping of an entry of PATCHES.json."""

    # Storage location for the patches.
    workdir: Path
    metadata: Optional[Dict[str, Any]]
    platforms: Optional[List[str]]
    rel_patch_path: str
    version_range: Optional[Dict[str, Optional[int]]]
    # Deliberately unannotated so it is a plain class attribute rather than
    # a dataclass field; holds the cached result of parsed_hunks().
    _parsed_hunks = None

    def __post_init__(self):
        if not self.workdir.is_dir():
            raise ValueError(f"workdir {self.workdir} is not a directory")

    @classmethod
    def from_dict(cls, workdir: Path, data: Dict[str, Any]):
        """Instantiate from a dictionary.

        Dictionary must have at least the following key:

            {
                'rel_patch_path': '<relative patch path to workdir>',
            }

        Returns:
            A new PatchEntry.
        """
        return cls(
            workdir,
            data.get("metadata"),
            data.get("platforms"),
            data["rel_patch_path"],
            data.get("version_range"),
        )

    def to_dict(self) -> Dict[str, Any]:
        """Serialize back to the PATCHES.json dictionary layout."""
        out: Dict[str, Any] = {
            "metadata": self.metadata,
        }
        if self.platforms:
            # To match patch_sync, only serialized when
            # non-empty and non-null.
            out["platforms"] = sorted(self.platforms)
        out["rel_patch_path"] = self.rel_patch_path
        out["version_range"] = self.version_range
        return out

    def parsed_hunks(self) -> Dict[str, List[Hunk]]:
        """Return the hunks of the patch file, keyed by patched file name."""
        # Minor caching here because IO is slow. Compare against None so a
        # patch that legitimately has no hunks is not re-read on every call.
        if self._parsed_hunks is None:
            with self.patch_path().open(encoding="utf-8") as f:
                self._parsed_hunks = parse_patch_stream(f)
        return self._parsed_hunks

    def patch_path(self) -> Path:
        """Return the location of the patch file."""
        return self.workdir / self.rel_patch_path

    def can_patch_version(self, svn_version: int) -> bool:
        """Is this patch meant to apply to `svn_version`?"""
        # Sometimes the key is there, but it's set to None.
        if not self.version_range:
            return True
        from_v = self.version_range.get("from") or 0
        until_v = self.version_range.get("until")
        if until_v is None:
            until_v = sys.maxsize
        return from_v <= svn_version < until_v

    def is_old(self, svn_version: int) -> bool:
        """Is this patch old compared to `svn_version`?"""
        if not self.version_range:
            return False
        until_v = self.version_range.get("until")
        # Sometimes the key is there, but it's set to None.
        if until_v is None:
            until_v = sys.maxsize
        return svn_version >= until_v

    def apply(
        self, root_dir: Path, extra_args: Optional[List[str]] = None
    ) -> PatchResult:
        """Apply a patch to a given directory.

        Args:
            root_dir: Directory the patch is applied in (passed to patch -d).
            extra_args: Additional arguments appended to the patch command.

        Returns:
            A PatchResult; on failure it carries the hunks that failed.

        Raises:
            RuntimeError when the patch file does not exist.
        """
        # Cmd to apply a patch in the src unpack path.
        abs_patch_path = self.patch_path().absolute()
        if not abs_patch_path.is_file():
            raise RuntimeError(
                f"Cannot apply: patch {abs_patch_path} is not a file"
            )
        cmd = [
            "patch",
            "-d",
            str(root_dir.absolute()),
            "-f",
            "-p1",
            "--no-backup-if-mismatch",
            "-i",
            str(abs_patch_path),
        ] + (extra_args or [])
        try:
            subprocess.run(
                cmd, encoding="utf-8", check=True, stdout=subprocess.PIPE
            )
        except subprocess.CalledProcessError as e:
            parsed_hunks = self.parsed_hunks()
            failed_hunks_id_dict = parse_failed_patch_output(e.stdout)
            failed_hunks = {}
            for path, failed_hunk_ids in failed_hunks_id_dict.items():
                wanted_ids = set(failed_hunk_ids)
                # .get() keeps the cached mapping free of empty entries for
                # files patch mentioned but the parser did not see.
                failed_hunks[path] = [
                    hunk
                    for hunk in parsed_hunks.get(path, [])
                    if hunk.hunk_id in wanted_ids
                ]
            return PatchResult(succeeded=False, failed_hunks=failed_hunks)
        return PatchResult(succeeded=True)

    def test_apply(self, root_dir: Path) -> PatchResult:
        """Dry run applying a patch to a given directory."""
        return self.apply(root_dir, ["--dry-run"])

    def title(self) -> str:
        """Return the 'title' metadata entry, or '' when there is none."""
        if not self.metadata:
            return ""
        return self.metadata.get("title", "")


@dataclasses.dataclass(frozen=True)
class PatchInfo:
    """Holds info for a round of patch applications."""

    # str types are legacy. Patch lists should
    # probably be PatchEntries,
    applied_patches: List[PatchEntry]
    failed_patches: List[PatchEntry]
    # Can be deleted once legacy code is removed.
    non_applicable_patches: List[str]
    # Can be deleted once legacy code is removed.
    disabled_patches: List[str]
    # Can be deleted once legacy code is removed.
    removed_patches: List[str]
    # Can be deleted once legacy code is removed.
    modified_metadata: Optional[str]

    def _asdict(self):
        return dataclasses.asdict(self)


def json_to_patch_entries(workdir: Path, json_fd: IO[str]) -> List[PatchEntry]:
    """Convert a json IO object to List[PatchEntry].

    Examples:
        >>> f = open('PATCHES.json')
        >>> patch_entries = json_to_patch_entries(Path(), f)
    """
    return [PatchEntry.from_dict(workdir, d) for d in json.load(json_fd)]


def _print_failed_patch(pe: PatchEntry, failed_hunks: Dict[str, List[Hunk]]):
    """Print information about a single failing PatchEntry.

    Args:
        pe: A PatchEntry that failed.
        failed_hunks: Hunks for pe which failed as dict:
          filepath: [Hunk...]
    """
    print(f"Could not apply {pe.rel_patch_path}: {pe.title()}", file=sys.stderr)
    for fp, hunks in failed_hunks.items():
        print(f"{fp}:", file=sys.stderr)
        for h in hunks:
            print(
                f"- {pe.rel_patch_path} "
                f"l:{h.patch_hunk_lineno_begin}...{h.patch_hunk_lineno_end}",
                file=sys.stderr,
            )


def apply_all_from_json(
    svn_version: int,
    llvm_src_dir: Path,
    patches_json_fp: Path,
    continue_on_failure: bool = False,
) -> PatchInfo:
    """Attempt to apply some patches to a given LLVM source tree.

    This relies on a PATCHES.json file to be the primary way
    the patches are applied.

    Args:
        svn_version: LLVM Subversion revision to patch.
        llvm_src_dir: llvm-project root-level source directory to patch.
        patches_json_fp: Filepath to the PATCHES.json file.
        continue_on_failure: Skip any patches which failed to apply,
          rather than throw an Exception.

    Returns:
        PatchInfo listing the applied, failed and skipped patches.

    Raises:
        RuntimeError when a patch fails and continue_on_failure is False.
    """
    with patches_json_fp.open(encoding="utf-8") as f:
        patches = json_to_patch_entries(patches_json_fp.parent, f)
    skipped_patches = []
    failed_patches = []
    applied_patches = []
    for pe in patches:
        applied, failed_hunks = apply_single_patch_entry(
            svn_version, llvm_src_dir, pe
        )
        if applied:
            applied_patches.append(pe)
            continue
        if failed_hunks is None:
            # Didn't apply, didn't fail, it was skipped.
            skipped_patches.append(pe)
            continue
        if not continue_on_failure:
            _print_failed_patch(pe, failed_hunks)
            raise RuntimeError(
                "failed to apply patch " f"{pe.patch_path()}: {pe.title()}"
            )
        failed_patches.append(pe)
    return PatchInfo(
        non_applicable_patches=skipped_patches,
        applied_patches=applied_patches,
        failed_patches=failed_patches,
        disabled_patches=[],
        removed_patches=[],
        modified_metadata=None,
    )


def apply_single_patch_entry(
    svn_version: int,
    llvm_src_dir: Path,
    pe: PatchEntry,
    ignore_version_range: bool = False,
) -> Tuple[bool, Optional[Dict[str, List[Hunk]]]]:
    """Try to apply a single PatchEntry object.

    Args:
        svn_version: LLVM revision the source tree is at.
        llvm_src_dir: llvm-project directory to patch.
        pe: The patch entry to apply.
        ignore_version_range: Apply even if pe's version range excludes
          svn_version.

    Returns:
        Tuple where the first element indicates whether the patch applied,
        and the second element is a failed hunk mapping from file name to
        lists of hunks (if the patch didn't apply). The second element is
        None when the patch applied or was skipped for its version range.
    """
    # Don't apply patches outside of the version range.
    if not ignore_version_range and not pe.can_patch_version(svn_version):
        return False, None
    # Test first to avoid making changes.
    test_application = pe.test_apply(llvm_src_dir)
    if not test_application:
        return False, test_application.failed_hunks
    # Now actually make changes.
    application_result = pe.apply(llvm_src_dir)
    if not application_result:
        # This should be very rare/impossible.
        return False, application_result.failed_hunks
    return True, None


def is_git_dirty(git_root_dir: Path) -> bool:
    """Return whether the given git directory has uncommitted changes."""
    if not git_root_dir.is_dir():
        raise ValueError(f"git_root_dir {git_root_dir} is not a directory")
    cmd = ["git", "ls-files", "-m", "--other", "--exclude-standard"]
    listing = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        check=True,
        cwd=git_root_dir,
        encoding="utf-8",
    ).stdout
    # Any listed path means a modified or untracked, non-ignored file.
    return listing != ""


def clean_src_tree(src_path):
    """Cleans the source tree of the changes made in 'src_path'."""

    reset_src_tree_cmd = ["git", "-C", src_path, "reset", "HEAD", "--hard"]

    subprocess.run(reset_src_tree_cmd, check=True)

    clean_src_tree_cmd = ["git", "-C", src_path, "clean", "-fd"]

    subprocess.run(clean_src_tree_cmd, check=True)


@contextlib.contextmanager
def git_clean_context(git_root_dir: Path):
    """Cleans up a git directory when the context exits.

    Raises:
        RuntimeError when git_root_dir is already dirty on entry.
    """
    if is_git_dirty(git_root_dir):
        raise RuntimeError("Cannot setup clean context; git_root_dir is dirty")
    try:
        yield
    finally:
        clean_src_tree(git_root_dir)


def _write_json_changes(patches: List[Dict[str, Any]], file_io: IO[str]):
    """Write JSON changes to file, does not acquire new file lock."""
    json.dump(patches, file_io, indent=4, separators=(",", ": "))
    # Need to add a newline as json.dump omits it.
    file_io.write("\n")


def update_version_ranges(
    svn_version: int, llvm_src_dir: Path, patches_json_fp: Path
) -> PatchInfo:
    """Reduce the version ranges of failing patches.

    Patches which fail to apply will have their 'version_range.until'
    field reduced to the passed in svn_version.

    Modifies the contents of patches_json_fp.

    Args:
        svn_version: LLVM revision number.
        llvm_src_dir: llvm-project directory path.
        patches_json_fp: Filepath to the PATCHES.json file.

    Returns:
        PatchInfo for applied and disabled patches.
    """
    with patches_json_fp.open(encoding="utf-8") as f:
        patch_entries = json_to_patch_entries(
            patches_json_fp.parent,
            f,
        )
    modified_entries, applied_patches = update_version_ranges_with_entries(
        svn_version, llvm_src_dir, patch_entries
    )
    with atomic_write(patches_json_fp, encoding="utf-8") as f:
        _write_json_changes([p.to_dict() for p in patch_entries], f)
    for entry in modified_entries:
        print(
            f"Stopped applying {entry.rel_patch_path} ({entry.title()}) "
            f"for r{svn_version}"
        )
    return PatchInfo(
        non_applicable_patches=[],
        applied_patches=applied_patches,
        failed_patches=[],
        disabled_patches=[p.rel_patch_path for p in modified_entries],
        removed_patches=[],
        modified_metadata=str(patches_json_fp) if modified_entries else None,
    )


def update_version_ranges_with_entries(
    svn_version: int,
    llvm_src_dir: Path,
    patch_entries: Iterable[PatchEntry],
) -> Tuple[List[PatchEntry], List[PatchEntry]]:
    """Test-able helper for update_version_ranges.

    Args:
        svn_version: LLVM revision number.
        llvm_src_dir: llvm-project directory path.
        patch_entries: PatchEntry objects to modify.

    Returns:
        Tuple of (modified entries, applied patches)

    Raises:
        RuntimeError when a patch dry-runs cleanly but then fails to apply.

    Post:
        Modifies patch_entries in place.
    """
    modified_entries: List[PatchEntry] = []
    applied_patches: List[PatchEntry] = []
    active_patches = (pe for pe in patch_entries if not pe.is_old(svn_version))
    with git_clean_context(llvm_src_dir):
        for pe in active_patches:
            test_result = pe.test_apply(llvm_src_dir)
            if not test_result:
                if pe.version_range is None:
                    pe.version_range = {}
                pe.version_range["until"] = svn_version
                modified_entries.append(pe)
            else:
                # We have to actually apply the patch so that future patches
                # will stack properly.
                if not pe.apply(llvm_src_dir).succeeded:
                    raise RuntimeError(
                        "Could not apply patch that dry ran successfully"
                    )
                applied_patches.append(pe)

    return modified_entries, applied_patches


def remove_old_patches(
    svn_version: int, llvm_src_dir: Path, patches_json_fp: Path
) -> PatchInfo:
    """Remove patches that don't and will never apply for the future.

    Patches are determined to be "old" via the "is_old" method for
    each patch entry.

    Args:
        svn_version: LLVM SVN version.
        llvm_src_dir: LLVM source directory. Kept for interface
          compatibility; the patches live next to patches_json_fp.
        patches_json_fp: Location to edit patches on.

    Returns:
        PatchInfo for modified patches.
    """
    with patches_json_fp.open(encoding="utf-8") as f:
        patches_list = json.load(f)
    # The entries' workdir is where the patches are stored, i.e. the
    # directory holding PATCHES.json -- not the LLVM source tree.
    patches_dir = patches_json_fp.parent
    oldness = [
        (entry, entry.is_old(svn_version))
        for entry in (
            PatchEntry.from_dict(patches_dir, elem) for elem in patches_list
        )
    ]
    filtered_entries = [entry.to_dict() for entry, old in oldness if not old]
    with atomic_write(patches_json_fp, encoding="utf-8") as f:
        _write_json_changes(filtered_entries, f)
    removed_entries = [entry for entry, old in oldness if old]
    plural_patches = "patch" if len(removed_entries) == 1 else "patches"
    print(f"Removed {len(removed_entries)} old {plural_patches}:")
    for r in removed_entries:
        print(f"- {r.rel_patch_path}: {r.title()}")

    return PatchInfo(
        non_applicable_patches=[],
        applied_patches=[],
        failed_patches=[],
        disabled_patches=[],
        removed_patches=[p.rel_patch_path for p in removed_entries],
        modified_metadata=str(patches_json_fp) if removed_entries else None,
    )