| # Copyright 2021 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """Tests for running fuzzers.""" |
| import json |
| import os |
| import sys |
| import shutil |
| import tempfile |
| import unittest |
| from unittest import mock |
| |
| import parameterized |
| from pyfakefs import fake_filesystem_unittest |
| |
| import build_fuzzers |
| import fuzz_target |
| import run_fuzzers |
| |
| # pylint: disable=wrong-import-position |
| INFRA_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
| sys.path.append(INFRA_DIR) |
| |
| import helper |
| import test_helpers |
| |
| # NOTE: This integration test relies on |
| # https://github.com/google/oss-fuzz/tree/master/projects/example project. |
| EXAMPLE_PROJECT = 'example' |
| |
| # Location of files used for testing. |
| TEST_DATA_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), |
| 'test_data') |
| |
| MEMORY_FUZZER_DIR = os.path.join(TEST_DATA_PATH, 'memory') |
| MEMORY_FUZZER = 'curl_fuzzer_memory' |
| |
| UNDEFINED_FUZZER_DIR = os.path.join(TEST_DATA_PATH, 'undefined') |
| UNDEFINED_FUZZER = 'curl_fuzzer_undefined' |
| |
| FUZZ_SECONDS = 10 |
| |
| |
| class RunFuzzerIntegrationTestMixin: # pylint: disable=too-few-public-methods,invalid-name |
| """Mixin for integration test classes that runbuild_fuzzers on builds of a |
| specific sanitizer.""" |
| # These must be defined by children. |
| FUZZER_DIR = None |
| FUZZER = None |
| |
| def setUp(self): |
| """Patch the environ so that we can execute runner scripts.""" |
| test_helpers.patch_environ(self, runner=True) |
| |
| def _test_run_with_sanitizer(self, fuzzer_dir, sanitizer): |
| """Calls run_fuzzers on fuzzer_dir and |sanitizer| and asserts |
| the run succeeded and that no bug was found.""" |
| with test_helpers.temp_dir_copy(fuzzer_dir) as fuzzer_dir_copy: |
| config = test_helpers.create_run_config(fuzz_seconds=FUZZ_SECONDS, |
| workspace=fuzzer_dir_copy, |
| oss_fuzz_project_name='curl', |
| sanitizer=sanitizer) |
| result = run_fuzzers.run_fuzzers(config) |
| self.assertEqual(result, run_fuzzers.RunFuzzersResult.NO_BUG_FOUND) |
| |
| |
| @unittest.skipIf(not os.getenv('INTEGRATION_TESTS'), |
| 'INTEGRATION_TESTS=1 not set') |
| class RunMemoryFuzzerIntegrationTest(RunFuzzerIntegrationTestMixin, |
| unittest.TestCase): |
| """Integration test for build_fuzzers with an MSAN build.""" |
| FUZZER_DIR = MEMORY_FUZZER_DIR |
| FUZZER = MEMORY_FUZZER |
| |
| def test_run_with_memory_sanitizer(self): |
| """Tests run_fuzzers with a valid MSAN build.""" |
| self._test_run_with_sanitizer(self.FUZZER_DIR, 'memory') |
| |
| |
| @unittest.skipIf(not os.getenv('INTEGRATION_TESTS'), |
| 'INTEGRATION_TESTS=1 not set') |
| class RunUndefinedFuzzerIntegrationTest(RunFuzzerIntegrationTestMixin, |
| unittest.TestCase): |
| """Integration test for build_fuzzers with an UBSAN build.""" |
| FUZZER_DIR = UNDEFINED_FUZZER_DIR |
| FUZZER = UNDEFINED_FUZZER |
| |
| def test_run_with_undefined_sanitizer(self): |
| """Tests run_fuzzers with a valid UBSAN build.""" |
| self._test_run_with_sanitizer(self.FUZZER_DIR, 'undefined') |
| |
| |
| class BaseFuzzTargetRunnerTest(unittest.TestCase): |
| """Tests BaseFuzzTargetRunner.""" |
| |
| def _create_runner(self, **kwargs): # pylint: disable=no-self-use |
| defaults = { |
| 'fuzz_seconds': FUZZ_SECONDS, |
| 'oss_fuzz_project_name': EXAMPLE_PROJECT |
| } |
| for default_key, default_value in defaults.items(): |
| if default_key not in kwargs: |
| kwargs[default_key] = default_value |
| |
| config = test_helpers.create_run_config(**kwargs) |
| return run_fuzzers.BaseFuzzTargetRunner(config) |
| |
| def _test_initialize_fail(self, expected_error_args, **create_runner_kwargs): |
| with mock.patch('logging.error') as mock_error: |
| runner = self._create_runner(**create_runner_kwargs) |
| self.assertFalse(runner.initialize()) |
| mock_error.assert_called_with(*expected_error_args) |
| |
| @parameterized.parameterized.expand([(0,), (None,), (-1,)]) |
| def test_initialize_invalid_fuzz_seconds(self, fuzz_seconds): |
| """Tests initialize fails with an invalid fuzz seconds.""" |
| expected_error_args = ('Fuzz_seconds argument must be greater than 1, ' |
| 'but was: %s.', fuzz_seconds) |
| with tempfile.TemporaryDirectory() as tmp_dir: |
| out_path = os.path.join(tmp_dir, 'build-out') |
| os.mkdir(out_path) |
| with mock.patch('utils.get_fuzz_targets') as mock_get_fuzz_targets: |
| mock_get_fuzz_targets.return_value = [ |
| os.path.join(out_path, 'fuzz_target') |
| ] |
| self._test_initialize_fail(expected_error_args, |
| fuzz_seconds=fuzz_seconds, |
| workspace=tmp_dir) |
| |
| def test_initialize_no_out_dir(self): |
| """Tests initialize fails with no out dir.""" |
| with tempfile.TemporaryDirectory() as tmp_dir: |
| out_path = os.path.join(tmp_dir, 'build-out') |
| expected_error_args = ('Out directory: %s does not exist.', out_path) |
| self._test_initialize_fail(expected_error_args, workspace=tmp_dir) |
| |
| def test_initialize_nonempty_artifacts(self): |
| """Tests initialize with a file artifacts path.""" |
| with tempfile.TemporaryDirectory() as tmp_dir: |
| out_path = os.path.join(tmp_dir, 'build-out') |
| os.mkdir(out_path) |
| os.makedirs(os.path.join(tmp_dir, 'out')) |
| artifacts_path = os.path.join(tmp_dir, 'out', 'artifacts') |
| with open(artifacts_path, 'w') as artifacts_handle: |
| artifacts_handle.write('fake') |
| expected_error_args = ( |
| 'Artifacts path: %s exists and is not an empty directory.', |
| artifacts_path) |
| self._test_initialize_fail(expected_error_args, workspace=tmp_dir) |
| |
| def test_initialize_bad_artifacts(self): |
| """Tests initialize with a non-empty artifacts path.""" |
| with tempfile.TemporaryDirectory() as tmp_dir: |
| out_path = os.path.join(tmp_dir, 'build-out') |
| os.mkdir(out_path) |
| artifacts_path = os.path.join(tmp_dir, 'out', 'artifacts') |
| os.makedirs(artifacts_path) |
| artifact_path = os.path.join(artifacts_path, 'artifact') |
| with open(artifact_path, 'w') as artifact_handle: |
| artifact_handle.write('fake') |
| expected_error_args = ( |
| 'Artifacts path: %s exists and is not an empty directory.', |
| artifacts_path) |
| self._test_initialize_fail(expected_error_args, workspace=tmp_dir) |
| |
| @mock.patch('utils.get_fuzz_targets') |
| @mock.patch('logging.error') |
| def test_initialize_empty_artifacts(self, mock_log_error, |
| mock_get_fuzz_targets): |
| """Tests initialize with an empty artifacts dir.""" |
| mock_get_fuzz_targets.return_value = ['fuzz-target'] |
| with tempfile.TemporaryDirectory() as tmp_dir: |
| out_path = os.path.join(tmp_dir, 'build-out') |
| os.mkdir(out_path) |
| artifacts_path = os.path.join(tmp_dir, 'out', 'artifacts') |
| os.makedirs(artifacts_path) |
| runner = self._create_runner(workspace=tmp_dir) |
| self.assertTrue(runner.initialize()) |
| mock_log_error.assert_not_called() |
| self.assertTrue(os.path.isdir(artifacts_path)) |
| |
| @mock.patch('utils.get_fuzz_targets') |
| @mock.patch('logging.error') |
| def test_initialize_no_artifacts(self, mock_log_error, mock_get_fuzz_targets): |
| """Tests initialize with no artifacts dir (the expected setting).""" |
| mock_get_fuzz_targets.return_value = ['fuzz-target'] |
| with tempfile.TemporaryDirectory() as tmp_dir: |
| out_path = os.path.join(tmp_dir, 'build-out') |
| os.mkdir(out_path) |
| runner = self._create_runner(workspace=tmp_dir) |
| self.assertTrue(runner.initialize()) |
| mock_log_error.assert_not_called() |
| self.assertTrue(os.path.isdir(os.path.join(tmp_dir, 'out', 'artifacts'))) |
| |
| def test_initialize_no_fuzz_targets(self): |
| """Tests initialize with no fuzz targets.""" |
| with tempfile.TemporaryDirectory() as tmp_dir: |
| out_path = os.path.join(tmp_dir, 'build-out') |
| os.makedirs(out_path) |
| expected_error_args = ('No fuzz targets were found in out directory: %s.', |
| out_path) |
| self._test_initialize_fail(expected_error_args, workspace=tmp_dir) |
| |
| def test_get_fuzz_target_artifact(self): |
| """Tests that get_fuzz_target_artifact works as intended.""" |
| with tempfile.TemporaryDirectory() as tmp_dir: |
| runner = self._create_runner(workspace=tmp_dir) |
| crashes_dir = 'crashes-dir' |
| runner.crashes_dir = crashes_dir |
| artifact_name = 'artifact-name' |
| target = mock.MagicMock() |
| target_name = 'target_name' |
| target.target_name = target_name |
| |
| fuzz_target_artifact = runner.get_fuzz_target_artifact( |
| target, artifact_name) |
| expected_fuzz_target_artifact = os.path.join( |
| tmp_dir, 'out', 'artifacts', 'target_name-address-artifact-name') |
| |
| self.assertEqual(fuzz_target_artifact, expected_fuzz_target_artifact) |
| |
| |
| class CiFuzzTargetRunnerTest(fake_filesystem_unittest.TestCase): |
| """Tests that CiFuzzTargetRunner works as intended.""" |
| |
| def setUp(self): |
| self.setUpPyfakefs() |
| |
| @mock.patch('utils.get_fuzz_targets') |
| @mock.patch('run_fuzzers.CiFuzzTargetRunner.run_fuzz_target') |
| @mock.patch('run_fuzzers.CiFuzzTargetRunner.create_fuzz_target_obj') |
| def test_run_fuzz_targets_quits(self, mock_create_fuzz_target_obj, |
| mock_run_fuzz_target, mock_get_fuzz_targets): |
| """Tests that run_fuzz_targets quits on the first crash it finds.""" |
| workspace = 'workspace' |
| out_path = os.path.join(workspace, 'build-out') |
| self.fs.create_dir(out_path) |
| config = test_helpers.create_run_config( |
| fuzz_seconds=FUZZ_SECONDS, |
| workspace=workspace, |
| oss_fuzz_project_name=EXAMPLE_PROJECT) |
| runner = run_fuzzers.CiFuzzTargetRunner(config) |
| |
| mock_get_fuzz_targets.return_value = ['target1', 'target2'] |
| runner.initialize() |
| testcase = os.path.join(workspace, 'testcase') |
| self.fs.create_file(testcase) |
| stacktrace = 'stacktrace' |
| corpus_dir = 'corpus' |
| self.fs.create_dir(corpus_dir) |
| mock_run_fuzz_target.return_value = fuzz_target.FuzzResult( |
| testcase, stacktrace, corpus_dir) |
| magic_mock = mock.MagicMock() |
| magic_mock.target_name = 'target1' |
| mock_create_fuzz_target_obj.return_value = magic_mock |
| self.assertTrue(runner.run_fuzz_targets()) |
| self.assertIn('target1-address-testcase', |
| os.listdir(runner.workspace.artifacts)) |
| self.assertEqual(mock_run_fuzz_target.call_count, 1) |
| |
| |
| class BatchFuzzTargetRunnerTest(fake_filesystem_unittest.TestCase): |
| """Tests that BatchFuzzTargetRunnerTest works as intended.""" |
| WORKSPACE = 'workspace' |
| STACKTRACE = 'stacktrace' |
| CORPUS_DIR = 'corpus' |
| |
| def setUp(self): |
| self.setUpPyfakefs() |
| out_dir = os.path.join(self.WORKSPACE, 'build-out') |
| self.fs.create_dir(out_dir) |
| self.testcase1 = os.path.join(out_dir, 'testcase-aaa') |
| self.fs.create_file(self.testcase1) |
| self.testcase2 = os.path.join(out_dir, 'testcase-bbb') |
| self.fs.create_file(self.testcase2) |
| self.config = test_helpers.create_run_config(fuzz_seconds=FUZZ_SECONDS, |
| workspace=self.WORKSPACE, |
| is_github=True) |
| |
| @mock.patch('utils.get_fuzz_targets', return_value=['target1', 'target2']) |
| @mock.patch('clusterfuzz_deployment.ClusterFuzzLite.upload_build', |
| return_value=True) |
| @mock.patch('run_fuzzers.BatchFuzzTargetRunner.run_fuzz_target') |
| @mock.patch('run_fuzzers.BatchFuzzTargetRunner.create_fuzz_target_obj') |
| def test_run_fuzz_targets_quits(self, mock_create_fuzz_target_obj, |
| mock_run_fuzz_target, _, __): |
| """Tests that run_fuzz_targets doesn't quit on the first crash it finds.""" |
| runner = run_fuzzers.BatchFuzzTargetRunner(self.config) |
| runner.initialize() |
| |
| call_count = 0 |
| |
| def mock_run_fuzz_target_impl(_): |
| nonlocal call_count |
| if call_count == 0: |
| testcase = self.testcase1 |
| elif call_count == 1: |
| testcase = self.testcase2 |
| assert call_count != 2 |
| call_count += 1 |
| if not os.path.exists(self.CORPUS_DIR): |
| self.fs.create_dir(self.CORPUS_DIR) |
| return fuzz_target.FuzzResult(testcase, self.STACKTRACE, self.CORPUS_DIR) |
| |
| mock_run_fuzz_target.side_effect = mock_run_fuzz_target_impl |
| magic_mock = mock.MagicMock() |
| magic_mock.target_name = 'target1' |
| mock_create_fuzz_target_obj.return_value = magic_mock |
| self.assertTrue(runner.run_fuzz_targets()) |
| self.assertEqual(mock_run_fuzz_target.call_count, 2) |
| |
| @mock.patch('run_fuzzers.BaseFuzzTargetRunner.run_fuzz_targets', |
| return_value=False) |
| @mock.patch('clusterfuzz_deployment.ClusterFuzzLite.upload_crashes') |
| def test_run_fuzz_targets_upload_crashes_and_builds(self, mock_upload_crashes, |
| _): |
| """Tests that run_fuzz_targets uploads crashes and builds correctly.""" |
| runner = run_fuzzers.BatchFuzzTargetRunner(self.config) |
| # TODO(metzman): Don't rely on this failing gracefully. |
| runner.initialize() |
| |
| self.assertFalse(runner.run_fuzz_targets()) |
| self.assertEqual(mock_upload_crashes.call_count, 1) |
| |
| |
| @unittest.skipIf(not os.getenv('INTEGRATION_TESTS'), |
| 'INTEGRATION_TESTS=1 not set') |
| class CoverageReportIntegrationTest(unittest.TestCase): |
| """Integration tests for coverage reports.""" |
| SANITIZER = 'coverage' |
| |
| def setUp(self): |
| test_helpers.patch_environ(self, runner=True) |
| |
| @mock.patch('filestore.github_actions._upload_artifact_with_upload_js') |
| def test_coverage_report(self, _): |
| """Tests generation of coverage reports end-to-end, from building to |
| generation.""" |
| |
| with test_helpers.docker_temp_dir() as temp_dir: |
| shared = os.path.join(temp_dir, 'shared') |
| os.mkdir(shared) |
| copy_command = ('cp -r /opt/code_coverage /shared && ' |
| 'cp $(which llvm-profdata) /shared && ' |
| 'cp $(which llvm-cov) /shared') |
| assert helper.docker_run([ |
| '-v', f'{shared}:/shared', 'gcr.io/oss-fuzz-base/base-runner', 'bash', |
| '-c', copy_command |
| ]) |
| |
| os.environ['CODE_COVERAGE_SRC'] = os.path.join(shared, 'code_coverage') |
| os.environ['PATH'] += os.pathsep + shared |
| # Do coverage build. |
| build_config = test_helpers.create_build_config( |
| oss_fuzz_project_name=EXAMPLE_PROJECT, |
| project_repo_name='oss-fuzz', |
| workspace=temp_dir, |
| commit_sha='0b95fe1039ed7c38fea1f97078316bfc1030c523', |
| base_commit='da0746452433dc18bae699e355a9821285d863c8', |
| sanitizer=self.SANITIZER, |
| is_github=True, |
| # Needed for test not to fail because of permissions issues. |
| bad_build_check=False) |
| self.assertTrue(build_fuzzers.build_fuzzers(build_config)) |
| |
| # TODO(metzman): Get rid of this here and make 'compile' do this. |
| chmod_command = ('chmod -R +r /out && ' |
| 'find /out -type d -exec chmod +x {} +') |
| |
| assert helper.docker_run([ |
| '-v', f'{os.path.join(temp_dir, "build-out")}:/out', |
| 'gcr.io/oss-fuzz-base/base-builder', 'bash', '-c', chmod_command |
| ]) |
| |
| # Generate report. |
| run_config = test_helpers.create_run_config(fuzz_seconds=FUZZ_SECONDS, |
| workspace=temp_dir, |
| sanitizer=self.SANITIZER, |
| run_fuzzers_mode='coverage', |
| is_github=True) |
| result = run_fuzzers.run_fuzzers(run_config) |
| self.assertEqual(result, run_fuzzers.RunFuzzersResult.NO_BUG_FOUND) |
| expected_summary_path = os.path.join( |
| TEST_DATA_PATH, 'example_coverage_report_summary.json') |
| with open(expected_summary_path) as file_handle: |
| expected_summary = json.loads(file_handle.read()) |
| actual_summary_path = os.path.join(temp_dir, 'cifuzz-coverage', |
| 'report', 'linux', 'summary.json') |
| with open(actual_summary_path) as file_handle: |
| actual_summary = json.loads(file_handle.read()) |
| self.assertEqual(expected_summary, actual_summary) |
| |
| |
| @unittest.skipIf(not os.getenv('INTEGRATION_TESTS'), |
| 'INTEGRATION_TESTS=1 not set') |
| class RunAddressFuzzersIntegrationTest(RunFuzzerIntegrationTestMixin, |
| unittest.TestCase): |
| """Integration tests for build_fuzzers with an ASAN build.""" |
| |
| BUILD_DIR_NAME = 'cifuzz-latest-build' |
| |
| def test_new_bug_found(self): |
| """Tests run_fuzzers with a valid ASAN build.""" |
| # Set the first return value to True, then the second to False to |
| # emulate a bug existing in the current PR but not on the downloaded |
| # OSS-Fuzz build. |
| with mock.patch('fuzz_target.FuzzTarget.is_reproducible', |
| side_effect=[True, False]): |
| with tempfile.TemporaryDirectory() as tmp_dir: |
| workspace = os.path.join(tmp_dir, 'workspace') |
| shutil.copytree(TEST_DATA_PATH, workspace) |
| config = test_helpers.create_run_config( |
| fuzz_seconds=FUZZ_SECONDS, |
| workspace=workspace, |
| oss_fuzz_project_name=EXAMPLE_PROJECT) |
| result = run_fuzzers.run_fuzzers(config) |
| self.assertEqual(result, run_fuzzers.RunFuzzersResult.BUG_FOUND) |
| |
| @mock.patch('fuzz_target.FuzzTarget.is_reproducible', |
| side_effect=[True, True]) |
| def test_old_bug_found(self, _): |
| """Tests run_fuzzers with a bug found in OSS-Fuzz before.""" |
| with tempfile.TemporaryDirectory() as tmp_dir: |
| workspace = os.path.join(tmp_dir, 'workspace') |
| shutil.copytree(TEST_DATA_PATH, workspace) |
| config = test_helpers.create_run_config( |
| fuzz_seconds=FUZZ_SECONDS, |
| workspace=workspace, |
| oss_fuzz_project_name=EXAMPLE_PROJECT) |
| result = run_fuzzers.run_fuzzers(config) |
| self.assertEqual(result, run_fuzzers.RunFuzzersResult.NO_BUG_FOUND) |
| |
| def test_invalid_build(self): |
| """Tests run_fuzzers with an invalid ASAN build.""" |
| with tempfile.TemporaryDirectory() as tmp_dir: |
| out_path = os.path.join(tmp_dir, 'build-out') |
| os.mkdir(out_path) |
| config = test_helpers.create_run_config( |
| fuzz_seconds=FUZZ_SECONDS, |
| workspace=tmp_dir, |
| oss_fuzz_project_name=EXAMPLE_PROJECT) |
| result = run_fuzzers.run_fuzzers(config) |
| self.assertEqual(result, run_fuzzers.RunFuzzersResult.ERROR) |
| |
| |
| class GetFuzzTargetRunnerTest(unittest.TestCase): |
| """Tests for get_fuzz_fuzz_target_runner.""" |
| |
| @parameterized.parameterized.expand([ |
| ('batch', run_fuzzers.BatchFuzzTargetRunner), |
| ('ci', run_fuzzers.CiFuzzTargetRunner), |
| ('coverage', run_fuzzers.CoverageTargetRunner) |
| ]) |
| def test_get_fuzz_target_runner(self, run_fuzzers_mode, |
| fuzz_target_runner_cls): |
| """Tests that get_fuzz_target_runner returns the correct runner based on the |
| specified run_fuzzers_mode.""" |
| with tempfile.TemporaryDirectory() as tmp_dir: |
| run_config = test_helpers.create_run_config( |
| fuzz_seconds=FUZZ_SECONDS, |
| workspace=tmp_dir, |
| oss_fuzz_project_name='example', |
| run_fuzzers_mode=run_fuzzers_mode) |
| runner = run_fuzzers.get_fuzz_target_runner(run_config) |
| self.assertTrue(isinstance(runner, fuzz_target_runner_cls)) |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |