diff options
author | George Burgess IV <gbiv@google.com> | 2023-07-28 09:39:59 -0600 |
---|---|---|
committer | Chromeos LUCI <chromeos-scoped@luci-project-accounts.iam.gserviceaccount.com> | 2023-08-01 17:18:14 +0000 |
commit | f208ba28a6d1ab77fae68e5ce0f35d5cfc3c4943 (patch) | |
tree | 86226c9fb7ba4f3f3479a4174b25d9ca40d695bd | |
parent | 5bc51440df9a09360b0e52677cc880893a1b100e (diff) | |
download | toolchain-utils-f208ba28a6d1ab77fae68e5ce0f35d5cfc3c4943.tar.gz |
llvm_tools: remove 4c scripts
4c was turned down, so these scripts no longer have a purpose.
BUG=b:293166552
TEST=None
Change-Id: Ic2589c5c31cd6f6fd1af7501669bd2c663322c09
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/third_party/toolchain-utils/+/4729352
Commit-Queue: Ryan Beltran <ryanbeltran@chromium.org>
Tested-by: George Burgess <gbiv@chromium.org>
Reviewed-by: Ryan Beltran <ryanbeltran@chromium.org>
Auto-Submit: George Burgess <gbiv@chromium.org>
-rwxr-xr-x | llvm_tools/bisect_clang_crashes.py | 157 | ||||
-rwxr-xr-x | llvm_tools/bisect_clang_crashes_unittest.py | 101 | ||||
-rwxr-xr-x | llvm_tools/upload_lexan_crashes_to_forcey.py | 318 | ||||
-rwxr-xr-x | llvm_tools/upload_lexan_crashes_to_forcey_test.py | 207 |
4 files changed, 0 insertions, 783 deletions
diff --git a/llvm_tools/bisect_clang_crashes.py b/llvm_tools/bisect_clang_crashes.py deleted file mode 100755 index b2759051..00000000 --- a/llvm_tools/bisect_clang_crashes.py +++ /dev/null @@ -1,157 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -# Copyright 2020 The ChromiumOS Authors -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""Fetches and submits the artifacts from ChromeOS toolchain's crash bucket. -""" - -import argparse -import glob -import json -import logging -import os -import os.path -import shutil -import subprocess -import sys - -import chroot - - -def get_artifacts(pattern): - results = subprocess.check_output( - ["gsutil.py", "ls", pattern], stderr=subprocess.STDOUT, encoding="utf-8" - ) - return sorted(l.strip() for l in results.splitlines()) - - -def get_crash_reproducers(working_dir): - results = [] - for src in [ - f - for f in glob.glob("%s/*.c*" % working_dir) - if f.split(".")[-1] in ["c", "cc", "cpp"] - ]: - script = ".".join(src.split(".")[:-1]) + ".sh" - if not os.path.exists(script): - logging.warning("could not find the matching script of %s", src) - else: - results.append((src, script)) - return results - - -def submit_crash_to_forcey( - forcey: str, temporary_directory: str, buildbucket_id: str, url: str -) -> None: - dest_dir = os.path.join(temporary_directory, buildbucket_id) - dest_file = os.path.join(dest_dir, os.path.basename(url)) - logging.info("Downloading and submitting %r...", url) - subprocess.check_output( - ["gsutil.py", "cp", url, dest_file], stderr=subprocess.STDOUT - ) - subprocess.check_output(["tar", "-xJf", dest_file], cwd=dest_dir) - for src, script in get_crash_reproducers(dest_dir): - subprocess.check_output( - [ - forcey, - "reduce", - "-wait=false", - "-note", - "%s:%s" % (url, src), - "-sh_file", - script, - "-src_file", - src, - ] - ) - - -def main(argv): - chroot.VerifyOutsideChroot() - logging.basicConfig( - format="%(asctime)s: %(levelname)s: %(filename)s:%(lineno)d: %(message)s", - level=logging.INFO, - ) - cur_dir = os.path.dirname(os.path.abspath(__file__)) - parser = argparse.ArgumentParser(description=__doc__) - parser.add_argument( - "--4c", dest="forcey", required=True, help="Path to a 4c client binary" - ) - parser.add_argument( - "--state_file", - default=os.path.join(cur_dir, "chromeos-state.json"), - help="The path to the state file.", - ) - parser.add_argument( - "--nocleanup", - action="store_false", - dest="cleanup", - help="Keep temporary files created after the script finishes.", - ) - opts = parser.parse_args(argv) - - state_file = os.path.abspath(opts.state_file) - os.makedirs(os.path.dirname(state_file), exist_ok=True) - temporary_directory = "/tmp/bisect_clang_crashes" - os.makedirs(temporary_directory, exist_ok=True) - urls = get_artifacts( - "gs://chromeos-toolchain-artifacts/clang-crash-diagnoses" - "/**/*clang_crash_diagnoses.tar.xz" - ) - logging.info("%d crash URLs found", len(urls)) - - visited = {} - if os.path.exists(state_file): - buildbucket_ids = {url.split("/")[-2] for url in urls} - with open(state_file, encoding="utf-8") as f: - data = json.load(f) - visited = {k: v for k, v in data.items() if k in buildbucket_ids} - logging.info( - "Successfully loaded %d previously-submitted crashes", len(visited) - ) - - try: - for url in urls: - splits = url.split("/") - buildbucket_id = splits[-2] - # Skip the builds that has been processed - if buildbucket_id in visited: - continue - submit_crash_to_forcey( - forcey=opts.forcey, - temporary_directory=temporary_directory, - buildbucket_id=buildbucket_id, - url=url, - ) - visited[buildbucket_id] = url - - exception_in_flight = False - except: - exception_in_flight = True - raise - finally: - if exception_in_flight: - # This is best-effort. If the machine powers off or similar, we'll just - # resubmit the same crashes, which is suboptimal, but otherwise - # acceptable. - logging.error( - "Something went wrong; attempting to save our work..." - ) - else: - logging.info("Persisting state...") - - tmp_state_file = state_file + ".tmp" - with open(tmp_state_file, "w", encoding="utf-8") as f: - json.dump(visited, f, indent=2) - os.rename(tmp_state_file, state_file) - - logging.info("State successfully persisted") - - if opts.cleanup: - shutil.rmtree(temporary_directory) - - -if __name__ == "__main__": - sys.exit(main(sys.argv[1:])) diff --git a/llvm_tools/bisect_clang_crashes_unittest.py b/llvm_tools/bisect_clang_crashes_unittest.py deleted file mode 100755 index 22c9be19..00000000 --- a/llvm_tools/bisect_clang_crashes_unittest.py +++ /dev/null @@ -1,101 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -# Copyright 2020 The ChromiumOS Authors -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""Tests for bisect_clang_crashes.""" - -import glob -import logging -import os.path -import subprocess -import unittest -import unittest.mock as mock - -import bisect_clang_crashes - - -class Test(unittest.TestCase): - """Tests for bisect_clang_crashes.""" - - class _SilencingFilter(object): - """Silences all log messages. - - Also collects info about log messages that would've been emitted. - """ - - def __init__(self): - self.messages = [] - - def filter(self, record): - self.messages.append(record.getMessage()) - return 0 - - @mock.patch.object(subprocess, "check_output") - def test_get_artifacts(self, mock_gsutil_ls): - pattern = ( - "gs://chromeos-toolchain-artifacts/clang-crash-diagnoses/" - "**/*clang_crash_diagnoses.tar.xz" - ) - mock_gsutil_ls.return_value = "artifact1\nartifact2\nartifact3" - results = bisect_clang_crashes.get_artifacts(pattern) - self.assertEqual(results, ["artifact1", "artifact2", "artifact3"]) - mock_gsutil_ls.assert_called_once_with( - ["gsutil.py", "ls", pattern], - stderr=subprocess.STDOUT, - encoding="utf-8", - ) - - @mock.patch.object(os.path, "exists") - @mock.patch.object(glob, "glob") - def test_get_crash_reproducers_succeed( - self, mock_file_search, mock_file_check - ): - working_dir = "SomeDirectory" - mock_file_search.return_value = ["a.c", "b.cpp", "c.cc"] - mock_file_check.side_effect = [True, True, True] - results = bisect_clang_crashes.get_crash_reproducers(working_dir) - mock_file_search.assert_called_once_with("%s/*.c*" % working_dir) - self.assertEqual(mock_file_check.call_count, 3) - self.assertEqual(mock_file_check.call_args_list[0], mock.call("a.sh")) - self.assertEqual(mock_file_check.call_args_list[1], mock.call("b.sh")) - self.assertEqual(mock_file_check.call_args_list[2], mock.call("c.sh")) - self.assertEqual( - results, [("a.c", "a.sh"), ("b.cpp", "b.sh"), ("c.cc", "c.sh")] - ) - - @mock.patch.object(os.path, "exists") - @mock.patch.object(glob, "glob") - def test_get_crash_reproducers_no_matching_script( - self, mock_file_search, mock_file_check - ): - def silence_logging(): - root = logging.getLogger() - filt = self._SilencingFilter() - root.addFilter(filt) - self.addCleanup(root.removeFilter, filt) - return filt - - log_filter = silence_logging() - working_dir = "SomeDirectory" - mock_file_search.return_value = ["a.c", "b.cpp", "c.cc"] - mock_file_check.side_effect = [True, False, True] - results = bisect_clang_crashes.get_crash_reproducers(working_dir) - mock_file_search.assert_called_once_with("%s/*.c*" % working_dir) - self.assertEqual(mock_file_check.call_count, 3) - self.assertEqual(mock_file_check.call_args_list[0], mock.call("a.sh")) - self.assertEqual(mock_file_check.call_args_list[1], mock.call("b.sh")) - self.assertEqual(mock_file_check.call_args_list[2], mock.call("c.sh")) - self.assertEqual(results, [("a.c", "a.sh"), ("c.cc", "c.sh")]) - self.assertTrue( - any( - "could not find the matching script of b.cpp" in x - for x in log_filter.messages - ), - log_filter.messages, - ) - - -if __name__ == "__main__": - unittest.main() diff --git a/llvm_tools/upload_lexan_crashes_to_forcey.py b/llvm_tools/upload_lexan_crashes_to_forcey.py deleted file mode 100755 index f52a27f4..00000000 --- a/llvm_tools/upload_lexan_crashes_to_forcey.py +++ /dev/null @@ -1,318 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -# Copyright 2020 The ChromiumOS Authors -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. -"""Fetches and submits the latest test-cases from Lexan's crash bucket.""" - -import argparse -import contextlib -import datetime -import json -import logging -import os -import shutil -import subprocess -import sys -import tempfile -from typing import Generator, Iterable, List - - -gsurl_base = "gs://chrome-clang-crash-reports/v1" - - -def gsutil_ls(loc: str) -> List[str]: - results = subprocess.run( - ["gsutil.py", "ls", loc], - stdout=subprocess.PIPE, - check=True, - encoding="utf-8", - ) - return [l.strip() for l in results.stdout.splitlines()] - - -def gsurl_ls_last_numbers(url: str) -> List[int]: - return sorted(int(x.rstrip("/").split("/")[-1]) for x in gsutil_ls(url)) - - -def get_available_year_numbers() -> List[int]: - return gsurl_ls_last_numbers(gsurl_base) - - -def get_available_month_numbers(year: int) -> List[int]: - return gsurl_ls_last_numbers(f"{gsurl_base}/{year}") - - -def get_available_day_numbers(year: int, month: int) -> List[int]: - return gsurl_ls_last_numbers(f"{gsurl_base}/{year}/{month:02d}") - - -def get_available_test_case_urls(year: int, month: int, day: int) -> List[str]: - return gsutil_ls(f"{gsurl_base}/{year}/{month:02d}/{day:02d}") - - -def test_cases_on_or_after( - date: datetime.datetime, -) -> Generator[str, None, None]: - """Yields all test-cases submitted on or after the given date.""" - for year in get_available_year_numbers(): - if year < date.year: - continue - - for month in get_available_month_numbers(year): - if year == date.year and month < date.month: - continue - - for day in get_available_day_numbers(year, month): - when = datetime.date(year, month, day) - if when < date: - continue - - yield when, get_available_test_case_urls(year, month, day) - - -def to_ymd(date: datetime.date) -> str: - return date.strftime("%Y-%m-%d") - - -def from_ymd(date_str: str) -> datetime.date: - return datetime.datetime.strptime(date_str, "%Y-%m-%d").date() - - -def persist_state( - seen_urls: Iterable[str], state_file: str, current_date: datetime.date -): - tmp_state_file = state_file + ".tmp" - with open(tmp_state_file, "w", encoding="utf-8") as f: - json.dump( - { - "already_seen": sorted(seen_urls), - "most_recent_date": to_ymd(current_date), - }, - f, - ) - os.rename(tmp_state_file, state_file) - - -@contextlib.contextmanager -def temp_dir() -> Generator[str, None, None]: - loc = tempfile.mkdtemp("lexan-autosubmit") - try: - yield loc - finally: - shutil.rmtree(loc) - - -def fetch_gs_file_size(gs_url: str) -> int: - """Returns the size of the file at gs_url, in bytes.""" - du = subprocess.run( - ["gsutil.py", "du", gs_url], - check=True, - stdout=subprocess.PIPE, - encoding="utf-8", - ).stdout - - lines = du.splitlines() - assert len(lines) == 1, f"{lines}" - # Format is `size file_name`. - num_bytes = lines[0].lstrip().split(None, 1)[0] - return int(num_bytes) - - -def download_and_unpack_test_case(gs_url: str, tempdir: str) -> None: - suffix = os.path.splitext(gs_url)[1] - target_name = "test_case" + suffix - target = os.path.join(tempdir, target_name) - subprocess.run(["gsutil.py", "cp", gs_url, target], check=True) - subprocess.run(["tar", "xaf", target_name], check=True, cwd=tempdir) - os.unlink(target) - - -def submit_test_case(gs_url: str, cr_tool: str) -> None: - size_limit = 100 * 1024 - size_kb = fetch_gs_file_size(gs_url) // 1024 - if size_kb > size_limit: - logging.warning( - "Ignoring %s; it's %dKB, and the limit is %dKB", - gs_url, - size_kb, - size_limit, - ) - return - - logging.info("Downloading %s (%dKB)", gs_url, size_kb) - with temp_dir() as tempdir: - download_and_unpack_test_case(gs_url, tempdir) - - # Sometimes (e.g., in - # gs://chrome-clang-crash-reports/v1/2020/03/27/ - # chromium.clang-ToTiOS-12754-GTXToolKit-2bfcde.tgz) - # we'll get `.crash` files. Unclear why, but let's filter them out anyway. - repro_files = [ - os.path.join(tempdir, x) - for x in os.listdir(tempdir) - if not x.endswith(".crash") - ] - if len(repro_files) == 1 and repro_files[0].endswith(".tar"): - logging.info( - "Skipping submission of %s; it's a linker crash", gs_url - ) - return - - assert len(repro_files) == 2, repro_files - if repro_files[0].endswith(".sh"): - sh_file, src_file = repro_files - assert not src_file.endswith(".sh"), repro_files - else: - src_file, sh_file = repro_files - assert sh_file.endswith(".sh"), repro_files - - # Peephole: lexan got a crash upload with a way old clang. Ignore it. - with open(sh_file, encoding="utf-8") as f: - if "Crash reproducer for clang version 9.0.0" in f.read(): - logging.warning( - "Skipping upload for %s; seems to be with an old clang", - gs_url, - ) - return - - logging.info("Submitting %s", gs_url) - subprocess.run( - [ - cr_tool, - "reduce", - "-stream=false", - "-wait=false", - "-note", - gs_url, - "-sh_file", - os.path.join(tempdir, sh_file), - "-src_file", - os.path.join(tempdir, src_file), - ], - check=True, - ) - - -def submit_new_test_cases( - last_seen_test_cases: Iterable[str], - earliest_date_to_check: datetime.date, - forcey: str, - state_file_path: str, -) -> None: - """Submits new test-cases to forcey. - - This will persist state after each test-case is submitted. - - Args: - last_seen_test_cases: test-cases which have been submitted already, and - should be skipped if seen again. - earliest_date_to_check: the earliest date we should consider test-cases - from. - forcey: path to the forcey binary. - state_file_path: path to our state file. - """ - # `all_test_cases_seen` is the union of all test-cases seen on this and prior - # invocations. It guarantees, in all cases we care about, that we won't - # submit the same test-case twice. `test_cases_seen_this_invocation` is - # persisted as "all of the test-cases we've seen on this and prior - # invocations" if we successfully submit _all_ test-cases. - # - # Since you can visualize the test-cases this script considers as a sliding - # window that only moves forward, if we saw a test-case on a prior iteration - # but no longer see it, we'll never see it again (since it fell out of our - # sliding window by being too old). Hence, keeping it around is - # pointless. - # - # We only persist this minimized set of test-cases if _everything_ succeeds, - # since if something fails below, there's a chance that we haven't revisited - # test-cases that we've already seen. - all_test_cases_seen = set(last_seen_test_cases) - test_cases_seen_this_invocation = [] - most_recent_date = earliest_date_to_check - for date, candidates in test_cases_on_or_after(earliest_date_to_check): - most_recent_date = max(most_recent_date, date) - - for url in candidates: - test_cases_seen_this_invocation.append(url) - if url in all_test_cases_seen: - continue - - all_test_cases_seen.add(url) - submit_test_case(url, forcey) - - # Persisting on each iteration of this loop isn't free, but it's the - # easiest way to not resubmit test-cases, and it's good to keep in mind - # that: - # - the state file will be small (<12KB, since it only keeps a few days - # worth of test-cases after the first run) - # - in addition to this, we're downloading+unzipping+reuploading multiple - # MB of test-case bytes. - # - # So comparatively, the overhead here probably isn't an issue. - persist_state( - all_test_cases_seen, state_file_path, most_recent_date - ) - - persist_state( - test_cases_seen_this_invocation, state_file_path, most_recent_date - ) - - -def main(argv: List[str]): - logging.basicConfig( - format=">> %(asctime)s: %(levelname)s: %(filename)s:%(lineno)d: " - "%(message)s", - level=logging.INFO, - ) - - my_dir = os.path.dirname(os.path.abspath(__file__)) - - parser = argparse.ArgumentParser(description=__doc__) - parser.add_argument( - "--state_file", default=os.path.join(my_dir, "lexan-state.json") - ) - parser.add_argument( - "--last_date", - help="The earliest date that we care about. All test cases from here " - "on will be picked up. Format is YYYY-MM-DD.", - ) - parser.add_argument( - "--4c", dest="forcey", required=True, help="Path to a 4c client binary" - ) - opts = parser.parse_args(argv) - - forcey = opts.forcey - state_file = opts.state_file - last_date_str = opts.last_date - - os.makedirs(os.path.dirname(state_file), 0o755, exist_ok=True) - - if last_date_str is None: - with open(state_file, encoding="utf-8") as f: - data = json.load(f) - most_recent_date = from_ymd(data["most_recent_date"]) - submit_new_test_cases( - last_seen_test_cases=data["already_seen"], - # Note that we always subtract one day from this to avoid a race: - # uploads may appear slightly out-of-order (or builders may lag, or - # ...), so the last test-case uploaded for 2020/01/01 might appear - # _after_ the first test-case for 2020/01/02. Assuming that builders - # won't lag behind for over a day, the easiest way to handle this is to - # always check the previous and current days. - earliest_date_to_check=most_recent_date - - datetime.timedelta(days=1), - forcey=forcey, - state_file_path=state_file, - ) - else: - submit_new_test_cases( - last_seen_test_cases=(), - earliest_date_to_check=from_ymd(last_date_str), - forcey=forcey, - state_file_path=state_file, - ) - - -if __name__ == "__main__": - sys.exit(main(sys.argv[1:])) diff --git a/llvm_tools/upload_lexan_crashes_to_forcey_test.py b/llvm_tools/upload_lexan_crashes_to_forcey_test.py deleted file mode 100755 index 76e90e08..00000000 --- a/llvm_tools/upload_lexan_crashes_to_forcey_test.py +++ /dev/null @@ -1,207 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -# Copyright 2020 The ChromiumOS Authors -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""Tests for upload_lexan_crashes_to_forcey.""" - -import datetime -import os -import unittest -import unittest.mock - -import upload_lexan_crashes_to_forcey - - -class Test(unittest.TestCase): - """Tests for upload_lexan_crashes_to_forcey.""" - - def test_date_parsing_functions(self): - self.assertEqual( - datetime.date(2020, 2, 1), - upload_lexan_crashes_to_forcey.from_ymd("2020-02-01"), - ) - - @unittest.mock.patch( - "upload_lexan_crashes_to_forcey.test_cases_on_or_after", - return_value=( - ( - datetime.date(2020, 1, 1), - ("gs://test-case-1", "gs://test-case-1.1"), - ), - (datetime.date(2020, 1, 2), ("gs://test-case-2",)), - (datetime.date(2020, 1, 1), ("gs://test-case-3",)), - (datetime.date(2020, 1, 4), ("gs://test-case-4",)), - ), - ) - @unittest.mock.patch("upload_lexan_crashes_to_forcey.submit_test_case") - @unittest.mock.patch("upload_lexan_crashes_to_forcey.persist_state") - def test_new_test_case_submission_functions( - self, - persist_state_mock, - submit_test_case_mock, - test_cases_on_or_after_mock, - ): - forcey_path = "/path/to/4c" - real_state_file_path = "/path/to/state/file" - earliest_date = datetime.date(2020, 1, 1) - - persist_state_calls = [] - - # Since the set this gets is mutated, we need to copy it somehow. - def persist_state_side_effect( - test_cases_to_persist, state_file_path, most_recent_date - ): - self.assertEqual(state_file_path, real_state_file_path) - persist_state_calls.append( - (sorted(test_cases_to_persist), most_recent_date) - ) - - persist_state_mock.side_effect = persist_state_side_effect - - upload_lexan_crashes_to_forcey.submit_new_test_cases( - last_seen_test_cases=( - "gs://test-case-0", - "gs://test-case-1", - ), - earliest_date_to_check=earliest_date, - forcey=forcey_path, - state_file_path=real_state_file_path, - ) - - test_cases_on_or_after_mock.assert_called_once_with(earliest_date) - self.assertEqual( - submit_test_case_mock.call_args_list, - [ - unittest.mock.call("gs://test-case-1.1", forcey_path), - unittest.mock.call("gs://test-case-2", forcey_path), - unittest.mock.call("gs://test-case-3", forcey_path), - unittest.mock.call("gs://test-case-4", forcey_path), - ], - ) - - self.assertEqual( - persist_state_calls, - [ - ( - [ - "gs://test-case-0", - "gs://test-case-1", - "gs://test-case-1.1", - ], - datetime.date(2020, 1, 1), - ), - ( - [ - "gs://test-case-0", - "gs://test-case-1", - "gs://test-case-1.1", - "gs://test-case-2", - ], - datetime.date(2020, 1, 2), - ), - ( - [ - "gs://test-case-0", - "gs://test-case-1", - "gs://test-case-1.1", - "gs://test-case-2", - "gs://test-case-3", - ], - datetime.date(2020, 1, 2), - ), - ( - [ - "gs://test-case-0", - "gs://test-case-1", - "gs://test-case-1.1", - "gs://test-case-2", - "gs://test-case-3", - "gs://test-case-4", - ], - datetime.date(2020, 1, 4), - ), - ( - [ - "gs://test-case-1", - "gs://test-case-1.1", - "gs://test-case-2", - "gs://test-case-3", - "gs://test-case-4", - ], - datetime.date(2020, 1, 4), - ), - ], - ) - - @unittest.mock.patch( - "upload_lexan_crashes_to_forcey.download_and_unpack_test_case" - ) - @unittest.mock.patch("subprocess.run") - @unittest.mock.patch("upload_lexan_crashes_to_forcey.fetch_gs_file_size") - def test_test_case_submission_functions( - self, - fetch_gs_file_size_mock, - subprocess_run_mock, - download_and_unpack_mock, - ): - fetch_gs_file_size_mock.return_value = 1024 - mock_gs_url = "gs://foo/bar/baz" - - def side_effect(gs_url: str, tempdir: str) -> None: - self.assertEqual(gs_url, mock_gs_url) - - # All we need is an empty file here. - open(os.path.join(tempdir, "test_case.c"), "w").close() - with open( - os.path.join(tempdir, "test_case.sh"), "w", encoding="utf-8" - ) as f: - f.write("# Crash reproducer for clang version 9.0.0 (...)\n") - f.write("clang something or other\n") - - download_and_unpack_mock.side_effect = side_effect - upload_lexan_crashes_to_forcey.submit_test_case(mock_gs_url, "4c") - subprocess_run_mock.assert_not_called() - - @unittest.mock.patch("subprocess.run") - def test_file_size_getting_functions(self, subprocess_run_mock): - mock_gs_url = "gs://foo/bar/baz" - - def side_effect(cmd, **_kwargs) -> None: - self.assertEqual(cmd, ["gsutil.py", "du", mock_gs_url]) - result = unittest.mock.MagicMock() - result.stdout = f"1234 {mock_gs_url}" - return result - - subprocess_run_mock.side_effect = side_effect - size = upload_lexan_crashes_to_forcey.fetch_gs_file_size(mock_gs_url) - self.assertEqual(size, 1234) - subprocess_run_mock.assert_called_once() - - @unittest.mock.patch( - "upload_lexan_crashes_to_forcey.download_and_unpack_test_case" - ) - @unittest.mock.patch("subprocess.run") - @unittest.mock.patch("upload_lexan_crashes_to_forcey.fetch_gs_file_size") - def test_linker_tarballs_are_skipped( - self, - fetch_gs_file_size_mock, - subprocess_run_mock, - download_and_unpack_mock, - ): - fetch_gs_file_size_mock.return_value = 1024 - mock_gs_url = "gs://foo/bar/baz" - - def side_effect(gs_url: str, tempdir: str) -> None: - self.assertEqual(gs_url, mock_gs_url) - # All we need is an empty file here. - open(os.path.join(tempdir, "test_case.tar"), "w").close() - - download_and_unpack_mock.side_effect = side_effect - upload_lexan_crashes_to_forcey.submit_test_case(mock_gs_url, "4c") - subprocess_run_mock.assert_not_called() - - -if __name__ == "__main__": - unittest.main() |