# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for running fuzzers."""
import json
import os
import sys
import shutil
import tempfile
import unittest
from unittest import mock

import parameterized
from pyfakefs import fake_filesystem_unittest

import build_fuzzers
import fuzz_target
import run_fuzzers

# pylint: disable=wrong-import-position
INFRA_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(INFRA_DIR)

import helper
import test_helpers

# NOTE: This integration test relies on
# https://github.com/google/oss-fuzz/tree/master/projects/example project.
EXAMPLE_PROJECT = 'example'

# Location of files used for testing.
TEST_DATA_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                              'test_data')

MEMORY_FUZZER_DIR = os.path.join(TEST_DATA_PATH, 'memory')
MEMORY_FUZZER = 'curl_fuzzer_memory'

UNDEFINED_FUZZER_DIR = os.path.join(TEST_DATA_PATH, 'undefined')
UNDEFINED_FUZZER = 'curl_fuzzer_undefined'

FUZZ_SECONDS = 10


class RunFuzzerIntegrationTestMixin:  # pylint: disable=too-few-public-methods,invalid-name
  """Mixin for integration test classes that run run_fuzzers on builds of a
  specific sanitizer."""
  # These must be defined by children.
  FUZZER_DIR = None
  FUZZER = None

  def setUp(self):
    """Patch the environ so that we can execute runner scripts."""
    test_helpers.patch_environ(self, runner=True)

  def _test_run_with_sanitizer(self, fuzzer_dir, sanitizer):
    """Calls run_fuzzers on fuzzer_dir and |sanitizer| and asserts
    the run succeeded and that no bug was found."""
    # Run against a temporary copy so the checked-in test data is not used as
    # the workspace directly.
    with test_helpers.temp_dir_copy(fuzzer_dir) as fuzzer_dir_copy:
      config = test_helpers.create_run_config(fuzz_seconds=FUZZ_SECONDS,
                                              workspace=fuzzer_dir_copy,
                                              oss_fuzz_project_name='curl',
                                              sanitizer=sanitizer)
      result = run_fuzzers.run_fuzzers(config)
    self.assertEqual(result, run_fuzzers.RunFuzzersResult.NO_BUG_FOUND)


@unittest.skipIf(not os.getenv('INTEGRATION_TESTS'),
                 'INTEGRATION_TESTS=1 not set')
class RunMemoryFuzzerIntegrationTest(RunFuzzerIntegrationTestMixin,
                                     unittest.TestCase):
  """Integration test for run_fuzzers with an MSAN build."""
  FUZZER_DIR = MEMORY_FUZZER_DIR
  FUZZER = MEMORY_FUZZER

  def test_run_with_memory_sanitizer(self):
    """Tests run_fuzzers with a valid MSAN build."""
    self._test_run_with_sanitizer(self.FUZZER_DIR, 'memory')


@unittest.skipIf(not os.getenv('INTEGRATION_TESTS'),
                 'INTEGRATION_TESTS=1 not set')
class RunUndefinedFuzzerIntegrationTest(RunFuzzerIntegrationTestMixin,
                                        unittest.TestCase):
  """Integration test for run_fuzzers with an UBSAN build."""
  FUZZER_DIR = UNDEFINED_FUZZER_DIR
  FUZZER = UNDEFINED_FUZZER

  def test_run_with_undefined_sanitizer(self):
    """Tests run_fuzzers with a valid UBSAN build."""
    self._test_run_with_sanitizer(self.FUZZER_DIR, 'undefined')


class BaseFuzzTargetRunnerTest(unittest.TestCase):
  """Tests BaseFuzzTargetRunner."""

  def _create_runner(self, **kwargs):  # pylint: disable=no-self-use
    """Returns a BaseFuzzTargetRunner whose run config is created from
    |kwargs|. 'fuzz_seconds' and 'oss_fuzz_project_name' get default values
    only when the caller did not pass them at all, so an explicit None is
    preserved."""
    kwargs.setdefault('fuzz_seconds', FUZZ_SECONDS)
    kwargs.setdefault('oss_fuzz_project_name', EXAMPLE_PROJECT)
    config = test_helpers.create_run_config(**kwargs)
    return run_fuzzers.BaseFuzzTargetRunner(config)

  def _test_initialize_fail(self, expected_error_args, **create_runner_kwargs):
    """Asserts that initialize() on a runner created with
    |create_runner_kwargs| returns a falsy value and that the last call to
    logging.error was made with |expected_error_args|."""
    with mock.patch('logging.error') as mock_error:
      runner = self._create_runner(**create_runner_kwargs)
      self.assertFalse(runner.initialize())
      mock_error.assert_called_with(*expected_error_args)

  @parameterized.parameterized.expand([(0,), (None,), (-1,)])
  def test_initialize_invalid_fuzz_seconds(self, fuzz_seconds):
    """Tests initialize fails with an invalid fuzz seconds."""
    expected_error_args = ('Fuzz_seconds argument must be greater than 1, '
                           'but was: %s.', fuzz_seconds)
    with tempfile.TemporaryDirectory() as tmp_dir:
      out_path = os.path.join(tmp_dir, 'build-out')
      os.mkdir(out_path)
      with mock.patch('utils.get_fuzz_targets') as mock_get_fuzz_targets:
        mock_get_fuzz_targets.return_value = [
            os.path.join(out_path, 'fuzz_target')
        ]
        self._test_initialize_fail(expected_error_args,
                                   fuzz_seconds=fuzz_seconds,
                                   workspace=tmp_dir)

  def test_initialize_no_out_dir(self):
    """Tests initialize fails with no out dir."""
    with tempfile.TemporaryDirectory() as tmp_dir:
      out_path = os.path.join(tmp_dir, 'build-out')
      expected_error_args = ('Out directory: %s does not exist.', out_path)
      self._test_initialize_fail(expected_error_args, workspace=tmp_dir)

  def test_initialize_nonempty_artifacts(self):
    """Tests initialize fails when the artifacts path exists but is a regular
    file rather than a directory."""
    with tempfile.TemporaryDirectory() as tmp_dir:
      out_path = os.path.join(tmp_dir, 'build-out')
      os.mkdir(out_path)
      os.makedirs(os.path.join(tmp_dir, 'out'))
      artifacts_path = os.path.join(tmp_dir, 'out', 'artifacts')
      with open(artifacts_path, 'w', encoding='utf-8') as artifacts_handle:
        artifacts_handle.write('fake')
      expected_error_args = (
          'Artifacts path: %s exists and is not an empty directory.',
          artifacts_path)
      self._test_initialize_fail(expected_error_args, workspace=tmp_dir)

  def test_initialize_bad_artifacts(self):
    """Tests initialize fails when the artifacts path is a directory that
    already contains a file."""
    with tempfile.TemporaryDirectory() as tmp_dir:
      out_path = os.path.join(tmp_dir, 'build-out')
      os.mkdir(out_path)
      artifacts_path = os.path.join(tmp_dir, 'out', 'artifacts')
      os.makedirs(artifacts_path)
      artifact_path = os.path.join(artifacts_path, 'artifact')
      with open(artifact_path, 'w', encoding='utf-8') as artifact_handle:
        artifact_handle.write('fake')
      expected_error_args = (
          'Artifacts path: %s exists and is not an empty directory.',
          artifacts_path)
      self._test_initialize_fail(expected_error_args, workspace=tmp_dir)

  @mock.patch('utils.get_fuzz_targets')
  @mock.patch('logging.error')
  def test_initialize_empty_artifacts(self, mock_log_error,
                                      mock_get_fuzz_targets):
    """Tests initialize with an empty artifacts dir."""
    mock_get_fuzz_targets.return_value = ['fuzz-target']
    with tempfile.TemporaryDirectory() as tmp_dir:
      out_path = os.path.join(tmp_dir, 'build-out')
      os.mkdir(out_path)
      artifacts_path = os.path.join(tmp_dir, 'out', 'artifacts')
      os.makedirs(artifacts_path)
      runner = self._create_runner(workspace=tmp_dir)
      self.assertTrue(runner.initialize())
      mock_log_error.assert_not_called()
      self.assertTrue(os.path.isdir(artifacts_path))

  @mock.patch('utils.get_fuzz_targets')
  @mock.patch('logging.error')
  def test_initialize_no_artifacts(self, mock_log_error, mock_get_fuzz_targets):
    """Tests initialize with no artifacts dir (the expected setting)."""
    mock_get_fuzz_targets.return_value = ['fuzz-target']
    with tempfile.TemporaryDirectory() as tmp_dir:
      out_path = os.path.join(tmp_dir, 'build-out')
      os.mkdir(out_path)
      runner = self._create_runner(workspace=tmp_dir)
      self.assertTrue(runner.initialize())
      mock_log_error.assert_not_called()
      self.assertTrue(os.path.isdir(os.path.join(tmp_dir, 'out', 'artifacts')))

  def test_initialize_no_fuzz_targets(self):
    """Tests initialize with no fuzz targets."""
    with tempfile.TemporaryDirectory() as tmp_dir:
      out_path = os.path.join(tmp_dir, 'build-out')
      os.makedirs(out_path)
      expected_error_args = ('No fuzz targets were found in out directory: %s.',
                             out_path)
      self._test_initialize_fail(expected_error_args, workspace=tmp_dir)

  def test_get_fuzz_target_artifact(self):
    """Tests that get_fuzz_target_artifact works as intended."""
    with tempfile.TemporaryDirectory() as tmp_dir:
      runner = self._create_runner(workspace=tmp_dir)
      crashes_dir = 'crashes-dir'
      runner.crashes_dir = crashes_dir
      artifact_name = 'artifact-name'
      target = mock.MagicMock()
      target_name = 'target_name'
      target.target_name = target_name

      fuzz_target_artifact = runner.get_fuzz_target_artifact(
          target, artifact_name)
      expected_fuzz_target_artifact = os.path.join(
          tmp_dir, 'out', 'artifacts', 'target_name-address-artifact-name')

      self.assertEqual(fuzz_target_artifact, expected_fuzz_target_artifact)


class CiFuzzTargetRunnerTest(fake_filesystem_unittest.TestCase):
  """Tests that CiFuzzTargetRunner works as intended."""

  def setUp(self):
    self.setUpPyfakefs()

  @mock.patch('utils.get_fuzz_targets')
  @mock.patch('run_fuzzers.CiFuzzTargetRunner.run_fuzz_target')
  @mock.patch('run_fuzzers.CiFuzzTargetRunner.create_fuzz_target_obj')
  def test_run_fuzz_targets_quits(self, mock_create_fuzz_target_obj,
                                  mock_run_fuzz_target, mock_get_fuzz_targets):
    """Tests that run_fuzz_targets quits on the first crash it finds."""
    workspace = 'workspace'
    out_path = os.path.join(workspace, 'build-out')
    self.fs.create_dir(out_path)
    config = test_helpers.create_run_config(
        fuzz_seconds=FUZZ_SECONDS,
        workspace=workspace,
        oss_fuzz_project_name=EXAMPLE_PROJECT)
    runner = run_fuzzers.CiFuzzTargetRunner(config)

    # Two targets are reported, but the assertions below expect only the
    # first one to be run.
    mock_get_fuzz_targets.return_value = ['target1', 'target2']
    runner.initialize()
    testcase = os.path.join(workspace, 'testcase')
    self.fs.create_file(testcase)
    stacktrace = 'stacktrace'
    corpus_dir = 'corpus'
    self.fs.create_dir(corpus_dir)
    mock_run_fuzz_target.return_value = fuzz_target.FuzzResult(
        testcase, stacktrace, corpus_dir)
    magic_mock = mock.MagicMock()
    magic_mock.target_name = 'target1'
    mock_create_fuzz_target_obj.return_value = magic_mock
    self.assertTrue(runner.run_fuzz_targets())
    self.assertIn('target1-address-testcase',
                  os.listdir(runner.workspace.artifacts))
    self.assertEqual(mock_run_fuzz_target.call_count, 1)
class BatchFuzzTargetRunnerTest(fake_filesystem_unittest.TestCase):
  """Tests that BatchFuzzTargetRunner works as intended."""
  WORKSPACE = 'workspace'
  STACKTRACE = 'stacktrace'
  CORPUS_DIR = 'corpus'

  def setUp(self):
    """Creates a fake filesystem containing a build-out directory with two
    testcase files, and a run config pointing at that workspace."""
    self.setUpPyfakefs()
    out_dir = os.path.join(self.WORKSPACE, 'build-out')
    self.fs.create_dir(out_dir)
    self.testcase1 = os.path.join(out_dir, 'testcase-aaa')
    self.fs.create_file(self.testcase1)
    self.testcase2 = os.path.join(out_dir, 'testcase-bbb')
    self.fs.create_file(self.testcase2)
    self.config = test_helpers.create_run_config(fuzz_seconds=FUZZ_SECONDS,
                                                 workspace=self.WORKSPACE,
                                                 is_github=True)

  @mock.patch('utils.get_fuzz_targets', return_value=['target1', 'target2'])
  @mock.patch('clusterfuzz_deployment.ClusterFuzzLite.upload_build',
              return_value=True)
  @mock.patch('run_fuzzers.BatchFuzzTargetRunner.run_fuzz_target')
  @mock.patch('run_fuzzers.BatchFuzzTargetRunner.create_fuzz_target_obj')
  def test_run_fuzz_targets_quits(self, mock_create_fuzz_target_obj,
                                  mock_run_fuzz_target, _, __):
    """Tests that run_fuzz_targets doesn't quit on the first crash it finds."""
    runner = run_fuzzers.BatchFuzzTargetRunner(self.config)
    runner.initialize()

    # One crashing testcase is handed out per expected run_fuzz_target call.
    testcases = (self.testcase1, self.testcase2)
    call_count = 0

    def mock_run_fuzz_target_impl(_):
      """Returns a crashing FuzzResult with a different testcase per call."""
      nonlocal call_count
      # Raise explicitly instead of using an assert statement so that an
      # unexpected extra call still fails the test when Python runs with -O.
      if call_count >= len(testcases):
        raise AssertionError(
            'run_fuzz_target was called more than %d times.' % len(testcases))
      testcase = testcases[call_count]
      call_count += 1
      # Recreate the corpus directory whenever it is missing at call time.
      if not os.path.exists(self.CORPUS_DIR):
        self.fs.create_dir(self.CORPUS_DIR)
      return fuzz_target.FuzzResult(testcase, self.STACKTRACE, self.CORPUS_DIR)

    mock_run_fuzz_target.side_effect = mock_run_fuzz_target_impl
    magic_mock = mock.MagicMock()
    magic_mock.target_name = 'target1'
    mock_create_fuzz_target_obj.return_value = magic_mock
    self.assertTrue(runner.run_fuzz_targets())
    self.assertEqual(mock_run_fuzz_target.call_count, 2)

  @mock.patch('run_fuzzers.BaseFuzzTargetRunner.run_fuzz_targets',
              return_value=False)
  @mock.patch('clusterfuzz_deployment.ClusterFuzzLite.upload_crashes')
  def test_run_fuzz_targets_upload_crashes_and_builds(self, mock_upload_crashes,
                                                      _):
    """Tests that run_fuzz_targets uploads crashes and builds correctly."""
    runner = run_fuzzers.BatchFuzzTargetRunner(self.config)
    # TODO(metzman): Don't rely on this failing gracefully.
    runner.initialize()

    self.assertFalse(runner.run_fuzz_targets())
    self.assertEqual(mock_upload_crashes.call_count, 1)


@unittest.skipIf(not os.getenv('INTEGRATION_TESTS'),
                 'INTEGRATION_TESTS=1 not set')
class CoverageReportIntegrationTest(unittest.TestCase):
  """Integration tests for coverage reports."""
  SANITIZER = 'coverage'

  def setUp(self):
    """Patch the environ so that we can execute runner scripts."""
    test_helpers.patch_environ(self, runner=True)

  @mock.patch('filestore.github_actions._upload_artifact_with_upload_js')
  def test_coverage_report(self, _):
    """Tests generation of coverage reports end-to-end, from building to
    generation."""

    with test_helpers.docker_temp_dir() as temp_dir:
      shared = os.path.join(temp_dir, 'shared')
      os.mkdir(shared)
      copy_command = ('cp -r /opt/code_coverage /shared && '
                      'cp $(which llvm-profdata) /shared && '
                      'cp $(which llvm-cov) /shared')
      # Use assertTrue rather than a bare assert statement: under python -O an
      # assert statement is removed, which would skip the docker command
      # itself and not only the check of its result.
      self.assertTrue(
          helper.docker_run([
              '-v', f'{shared}:/shared', 'gcr.io/oss-fuzz-base/base-runner',
              'bash', '-c', copy_command
          ]))

      os.environ['CODE_COVERAGE_SRC'] = os.path.join(shared, 'code_coverage')
      os.environ['PATH'] += os.pathsep + shared
      # Do coverage build.
      build_config = test_helpers.create_build_config(
          oss_fuzz_project_name=EXAMPLE_PROJECT,
          project_repo_name='oss-fuzz',
          workspace=temp_dir,
          commit_sha='0b95fe1039ed7c38fea1f97078316bfc1030c523',
          base_commit='da0746452433dc18bae699e355a9821285d863c8',
          sanitizer=self.SANITIZER,
          is_github=True,
          # Needed for test not to fail because of permissions issues.
          bad_build_check=False)
      self.assertTrue(build_fuzzers.build_fuzzers(build_config))

      # TODO(metzman): Get rid of this here and make 'compile' do this.
      chmod_command = ('chmod -R +r /out && '
                       'find /out -type d -exec chmod +x {} +')

      self.assertTrue(
          helper.docker_run([
              '-v', f'{os.path.join(temp_dir, "build-out")}:/out',
              'gcr.io/oss-fuzz-base/base-builder', 'bash', '-c', chmod_command
          ]))

      # Generate report.
      run_config = test_helpers.create_run_config(fuzz_seconds=FUZZ_SECONDS,
                                                  workspace=temp_dir,
                                                  sanitizer=self.SANITIZER,
                                                  run_fuzzers_mode='coverage',
                                                  is_github=True)
      result = run_fuzzers.run_fuzzers(run_config)
      self.assertEqual(result, run_fuzzers.RunFuzzersResult.NO_BUG_FOUND)
      expected_summary_path = os.path.join(
          TEST_DATA_PATH, 'example_coverage_report_summary.json')
      with open(expected_summary_path, encoding='utf-8') as file_handle:
        expected_summary = json.load(file_handle)
      actual_summary_path = os.path.join(temp_dir, 'cifuzz-coverage',
                                         'report', 'linux', 'summary.json')
      with open(actual_summary_path, encoding='utf-8') as file_handle:
        actual_summary = json.load(file_handle)
      self.assertEqual(expected_summary, actual_summary)


@unittest.skipIf(not os.getenv('INTEGRATION_TESTS'),
                 'INTEGRATION_TESTS=1 not set')
class RunAddressFuzzersIntegrationTest(RunFuzzerIntegrationTestMixin,
                                       unittest.TestCase):
  """Integration tests for run_fuzzers with an ASAN build."""

  BUILD_DIR_NAME = 'cifuzz-latest-build'

  # Set the first return value to True, then the second to False to
  # emulate a bug existing in the current PR but not on the downloaded
  # OSS-Fuzz build.
  @mock.patch('fuzz_target.FuzzTarget.is_reproducible',
              side_effect=[True, False])
  def test_new_bug_found(self, _):
    """Tests run_fuzzers with a valid ASAN build."""
    with tempfile.TemporaryDirectory() as tmp_dir:
      workspace = os.path.join(tmp_dir, 'workspace')
      shutil.copytree(TEST_DATA_PATH, workspace)
      config = test_helpers.create_run_config(
          fuzz_seconds=FUZZ_SECONDS,
          workspace=workspace,
          oss_fuzz_project_name=EXAMPLE_PROJECT)
      result = run_fuzzers.run_fuzzers(config)
      self.assertEqual(result, run_fuzzers.RunFuzzersResult.BUG_FOUND)

  @mock.patch('fuzz_target.FuzzTarget.is_reproducible',
              side_effect=[True, True])
  def test_old_bug_found(self, _):
    """Tests run_fuzzers with a bug found in OSS-Fuzz before."""
    with tempfile.TemporaryDirectory() as tmp_dir:
      workspace = os.path.join(tmp_dir, 'workspace')
      shutil.copytree(TEST_DATA_PATH, workspace)
      config = test_helpers.create_run_config(
          fuzz_seconds=FUZZ_SECONDS,
          workspace=workspace,
          oss_fuzz_project_name=EXAMPLE_PROJECT)
      result = run_fuzzers.run_fuzzers(config)
      self.assertEqual(result, run_fuzzers.RunFuzzersResult.NO_BUG_FOUND)

  def test_invalid_build(self):
    """Tests run_fuzzers with an invalid ASAN build."""
    with tempfile.TemporaryDirectory() as tmp_dir:
      # An empty build-out directory stands in for the invalid build.
      out_path = os.path.join(tmp_dir, 'build-out')
      os.mkdir(out_path)
      config = test_helpers.create_run_config(
          fuzz_seconds=FUZZ_SECONDS,
          workspace=tmp_dir,
          oss_fuzz_project_name=EXAMPLE_PROJECT)
      result = run_fuzzers.run_fuzzers(config)
    self.assertEqual(result, run_fuzzers.RunFuzzersResult.ERROR)


class GetFuzzTargetRunnerTest(unittest.TestCase):
  """Tests for get_fuzz_target_runner."""

  @parameterized.parameterized.expand([
      ('batch', run_fuzzers.BatchFuzzTargetRunner),
      ('ci', run_fuzzers.CiFuzzTargetRunner),
      ('coverage', run_fuzzers.CoverageTargetRunner)
  ])
  def test_get_fuzz_target_runner(self, run_fuzzers_mode,
                                  fuzz_target_runner_cls):
    """Tests that get_fuzz_target_runner returns the correct runner based on
    the specified run_fuzzers_mode."""
    with tempfile.TemporaryDirectory() as tmp_dir:
      run_config = test_helpers.create_run_config(
          fuzz_seconds=FUZZ_SECONDS,
          workspace=tmp_dir,
          oss_fuzz_project_name=EXAMPLE_PROJECT,
          run_fuzzers_mode=run_fuzzers_mode)
      runner = run_fuzzers.get_fuzz_target_runner(run_config)
      self.assertIsInstance(runner, fuzz_target_runner_cls)


if __name__ == '__main__':
  unittest.main()