diff options
author | Jordan R Abrahams-Whitehead <ajordanr@google.com> | 2022-05-21 02:21:53 +0000 |
---|---|---|
committer | Chromeos LUCI <chromeos-scoped@luci-project-accounts.iam.gserviceaccount.com> | 2022-06-01 17:00:18 +0000 |
commit | 69f5ec0984e02c8a7f0748522a333fca57f357aa (patch) | |
tree | fcb712a011d7090bd496bec431ed9d692322880d /llvm_tools/patch_utils.py | |
parent | 49d95342584d81f73b33cd25a178f0c8854cd826 (diff) | |
download | toolchain-utils-69f5ec0984e02c8a7f0748522a333fca57f357aa.tar.gz |
llvm_tools: Add patch_utils.py and unittests
This introduces the patch_utils.py library, which
contains various PATCHES.json and patch_manager
utilities that will be useful for future patch manager
restructuring.
In particular, patch_manager.py doesn't explain why its
patches fail, or give any information as to what is wrong with
its patch applications.
patch_utils.py provides the PatchEntry class, a self-contained
object that can provide this diagnostic information.
This module will later be incorporated into patch_manager.py
and get_upstream_patches.py.
BUG=b:188465085, b:227216280
TEST=./patch_utils_unittest.py
Change-Id: I6f6e24e6449ea68f6751fbcad14fca76c1bbaec8
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/third_party/toolchain-utils/+/3648887
Tested-by: Jordan Abrahams-Whitehead <ajordanr@google.com>
Reviewed-by: George Burgess <gbiv@chromium.org>
Commit-Queue: Jordan Abrahams-Whitehead <ajordanr@google.com>
Diffstat (limited to 'llvm_tools/patch_utils.py')
-rw-r--r-- | llvm_tools/patch_utils.py | 214 |
1 files changed, 214 insertions, 0 deletions
# Extracted from: diff --git a/llvm_tools/patch_utils.py
# b/llvm_tools/patch_utils.py (new file, index 00000000..2f282990).

# Copyright 2022 The ChromiumOS Authors.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Provides patch utilities for PATCHES.json file handling."""

import collections
import dataclasses
import io
from pathlib import Path
import re
import subprocess
import sys
from typing import Any, Dict, List, Optional


# `patch` announces each file as "checking file X" under --dry-run and as
# "patching file X" on a real run; accept both so that failed hunks can be
# attributed to a file in either mode.
CHECKED_FILE_RE = re.compile(r'^(?:checking|patching) file\s+(.*)$')
HUNK_FAILED_RE = re.compile(r'^Hunk #(\d+) FAILED at.*')
# Unified diffs omit ",<len>" when a side of the hunk is exactly one line
# long, so both length groups are optional (missing means 1).
HUNK_HEADER_RE = re.compile(
    r'^@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@')
HUNK_END_RE = re.compile(r'^--\s*$')
PATCH_SUBFILE_HEADER_RE = re.compile(r'^\+\+\+ [ab]/(.*)$')


@dataclasses.dataclass
class Hunk:
    """Represents a patch Hunk."""
    # Hunk ID for the current file (1-based, restarts for every file).
    hunk_id: int
    # Start line and line count on the original ("-") side of the header.
    orig_start: int
    orig_hunk_len: int
    # Start line and line count on the patched ("+") side of the header.
    patch_start: int
    patch_hunk_len: int
    # 0-based index, within the patch stream, of the line right after the
    # "@@" header.
    patch_hunk_lineno_begin: int
    # 0-based index of the line that terminated the hunk (the next "@@"
    # header or a line matching HUNK_END_RE). None if no terminator was seen.
    patch_hunk_lineno_end: Optional[int]


def parse_patch_stream(patch_stream: io.TextIOBase) -> Dict[str, List[Hunk]]:
    """Parse a patch file-like into Hunks.

    Args:
        patch_stream: A IO stream formatted like a git patch file.

    Returns:
        A dictionary mapping filenames to lists of Hunks present
        in the patch stream.

    Raises:
        RuntimeError: If a "+++" file header carries no path, or a hunk
            header appears before any file header.
    """
    current_filepath = None
    current_hunk_id = 0
    current_hunk = None
    out = collections.defaultdict(list)
    # Iterate the stream directly; there is no need to hold every line.
    for lineno, line in enumerate(patch_stream):
        subfile_header = PATCH_SUBFILE_HEADER_RE.match(line)
        if subfile_header:
            current_filepath = subfile_header.group(1)
            if not current_filepath:
                raise RuntimeError('Could not get file header in patch stream')
            # Need to reset the hunk id, as it's per-file.
            current_hunk_id = 0
            continue

        hunk_header = HUNK_HEADER_RE.match(line)
        if hunk_header:
            if not current_filepath:
                raise RuntimeError(
                    'Parsed hunk before file header in patch stream')
            if current_hunk is not None:
                # Already parsing a hunk; this header terminates it.
                current_hunk.patch_hunk_lineno_end = lineno
            current_hunk_id += 1
            orig_start, orig_len, new_start, new_len = hunk_header.groups()
            current_hunk = Hunk(
                hunk_id=current_hunk_id,
                orig_start=int(orig_start),
                # An omitted length means the hunk side is a single line.
                orig_hunk_len=1 if orig_len is None else int(orig_len),
                patch_start=int(new_start),
                patch_hunk_len=1 if new_len is None else int(new_len),
                patch_hunk_lineno_begin=lineno + 1,
                patch_hunk_lineno_end=None,
            )
            out[current_filepath].append(current_hunk)
            continue

        # A "--" line (e.g. the signature separator git format-patch writes)
        # ends the hunk. The hunk is deliberately left "current" so that a
        # later match overrides an earlier one (a removed line whose content
        # is just "-" also looks like "--").
        if current_hunk is not None and HUNK_END_RE.match(line):
            current_hunk.patch_hunk_lineno_end = lineno
    return out


def parse_failed_patch_output(text: str) -> Dict[str, List[int]]:
    """Extract the failed hunk ids per file from `patch` output.

    Args:
        text: Captured stdout of a `patch` invocation (real or --dry-run).

    Returns:
        A dictionary mapping each file name to the ids of the hunks that
        `patch` reported as FAILED for it. Files without failures are absent.

    Raises:
        ValueError: If a failed hunk is reported before any file line.
    """
    current_file = None
    failed_hunks = collections.defaultdict(list)
    for eline in text.split('\n'):
        checked_file_match = CHECKED_FILE_RE.match(eline)
        if checked_file_match:
            current_file = checked_file_match.group(1)
            continue
        failed_match = HUNK_FAILED_RE.match(eline)
        if failed_match:
            if not current_file:
                raise ValueError('Input stream was not parsable')
            failed_hunks[current_file].append(int(failed_match.group(1)))
    return failed_hunks


@dataclasses.dataclass(frozen=True)
class PatchResult:
    """Result of a patch application.

    Truthy exactly when the patch applied; `failed_hunks` maps file names to
    the hunks that failed to apply.
    """
    succeeded: bool
    failed_hunks: Dict[str, List[Hunk]] = dataclasses.field(
        default_factory=dict)

    def __bool__(self):
        return self.succeeded


@dataclasses.dataclass
class PatchEntry:
    """Object mapping of an entry of PATCHES.json."""
    workdir: Path
    metadata: Dict[str, Any]
    platforms: List[str]
    rel_patch_path: str
    version_range: Dict[str, int]
    # Deliberately unannotated so it is NOT a dataclass field: it is a
    # per-instance cache filled lazily by parsed_hunks().
    _parsed_hunks = None

    def __post_init__(self):
        if not self.workdir.is_dir():
            raise ValueError(f'workdir {self.workdir} is not a directory')

    @classmethod
    def from_dict(cls, workdir: Path, data: Dict[str, Any]):
        """Instantiate from a dictionary.

        Dictionary must have at least the following keys:

          {
            'metadata': {
              'title': '<title>'
            },
            'platforms': ['<platform>'],
            'rel_patch_path': '<relative patch path to workdir>',
            'version_range': {
              'from': <int>,
              'until': <int>,
            },
          }

        Returns:
            A new PatchEntry.
        """
        return cls(workdir, data['metadata'], data['platforms'],
                   data['rel_patch_path'], data['version_range'])

    def to_dict(self) -> Dict[str, Any]:
        """Return the PATCHES.json representation (workdir is not included)."""
        return {
            'metadata': self.metadata,
            'platforms': self.platforms,
            'rel_patch_path': self.rel_patch_path,
            'version_range': self.version_range,
        }

    def parsed_hunks(self) -> Dict[str, List[Hunk]]:
        """Return the hunks of the patch file, parsing it on first use."""
        # Minor caching here because IO is slow. Compare against None so a
        # patch that legitimately parses to no hunks is cached as well.
        if self._parsed_hunks is None:
            with self.patch_path().open(encoding='utf-8') as f:
                self._parsed_hunks = parse_patch_stream(f)
        return self._parsed_hunks

    def patch_path(self) -> Path:
        """Return the patch file location: workdir joined with the rel path."""
        return self.workdir / self.rel_patch_path

    def can_patch_version(self, svn_version: int) -> bool:
        """Is this patch meant to apply to `svn_version`?"""
        # Sometimes the key is there, but it's set to None.
        from_v = self.version_range.get('from') or 0
        until_v = self.version_range.get('until')
        if until_v is None:
            until_v = sys.maxsize
        return from_v <= svn_version < until_v

    def is_old(self, svn_version: int) -> bool:
        """Is this patch old compared to `svn_version`?"""
        until_v = self.version_range.get('until')
        # Sometimes the key is there, but it's set to None.
        if until_v is None:
            until_v = sys.maxsize
        return svn_version >= until_v

    def apply(self,
              root_dir: Path,
              extra_args: Optional[List[str]] = None) -> PatchResult:
        """Apply a patch to a given directory.

        Args:
            root_dir: Directory `patch` is run against (passed via -d).
            extra_args: Additional arguments appended to the `patch` command.

        Returns:
            A truthy PatchResult on success. If `patch` exits non-zero, a
            falsy PatchResult whose failed_hunks maps each file to the Hunks
            that `patch` reported as FAILED.
        """
        if not extra_args:
            extra_args = []
        # Cmd to apply a patch in the src unpack path.
        cmd = [
            'patch', '-d',
            root_dir.absolute(), '-f', '-p1', '--no-backup-if-mismatch', '-i',
            self.patch_path().absolute()
        ] + extra_args
        try:
            subprocess.run(cmd,
                           encoding='utf-8',
                           check=True,
                           stdout=subprocess.PIPE)
        except subprocess.CalledProcessError as e:
            parsed_hunks = self.parsed_hunks()
            failed_hunks_id_dict = parse_failed_patch_output(e.stdout)
            failed_hunks = {}
            for path, failed_hunk_ids in failed_hunks_id_dict.items():
                wanted_ids = set(failed_hunk_ids)
                failed_hunks[path] = [
                    hunk for hunk in parsed_hunks[path]
                    if hunk.hunk_id in wanted_ids
                ]
            return PatchResult(succeeded=False, failed_hunks=failed_hunks)
        return PatchResult(succeeded=True)

    def test_apply(self, root_dir: Path) -> PatchResult:
        """Dry run applying a patch to a given directory."""
        return self.apply(root_dir, ['--dry-run'])

    def title(self) -> str:
        """Return the 'title' entry of the metadata dictionary."""
        return self.metadata['title']