| #!/usr/bin/env python3 |
| # |
| # Copyright (C) 2020 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """Unit tests for apexer_with_DCLA_preprocessing.""" |
| import hashlib |
| import logging |
| import os |
| import shutil |
| import stat |
| import subprocess |
| import tempfile |
| from typing import List |
| import unittest |
| import zipfile |
| |
| TEST_PRIVATE_KEY = os.path.join('testdata', 'com.android.example.apex.pem') |
| TEST_APEX = 'com.android.example.apex' |
| |
| # In order to debug test failures, set DEBUG_TEST to True and run the test from |
| # local workstation bypassing atest, e.g.: |
| # $ m apexer_with_DCLA_preprocessing_test && \ |
| # out/host/linux-x86/nativetest64/apexer_with_DCLA_preprocessing_test/\ |
| # apexer_with_DCLA_preprocessing_test |
| # |
| # the test will print out the command used, and the temporary files used by the |
| # test. |
| DEBUG_TEST = False |
| |
| def get_current_dir(): |
| """returns the current dir, relative to the script dir.""" |
| # The script dir is the one we want, which could be different from pwd. |
| current_dir = os.path.dirname(os.path.realpath(__file__)) |
| return current_dir |
| |
| # TODO: consolidate these common test utilities into a common python_library_host |
| # to be shared across tests under system/apex |
| def run_command(cmd: List[str]) -> None: |
| """Run a command.""" |
| try: |
| if DEBUG_TEST: |
| cmd_str = ' '.join(cmd) |
| print(f'\nRunning: \n{cmd_str}\n') |
| res = subprocess.run( |
| cmd, |
| check=True, |
| stdout=subprocess.PIPE, |
| universal_newlines=True, |
| stderr=subprocess.PIPE) |
| except subprocess.CalledProcessError as err: |
| print(err.stderr) |
| print(err.output) |
| raise err |
| |
| def get_apexer_with_DCLA_preprocessing() -> str: |
| tool_binary = os.path.join(get_current_dir(), 'apexer_with_DCLA_preprocessing') |
| if not os.path.isfile(tool_binary): |
| raise FileNotFoundError(f'cannot find tooling apexer with DCLA preprocessing') |
| else: |
| os.chmod(tool_binary, stat.S_IRUSR | stat.S_IXUSR); |
| return tool_binary |
| |
| def get_host_tool(tool_name: str) -> str: |
| """get host tools.""" |
| tool_binary = os.path.join(get_current_dir(), 'bin', tool_name) |
| if not os.path.isfile(tool_binary): |
| host_build_top = os.environ.get('ANDROID_BUILD_TOP') |
| if host_build_top: |
| host_command_dir = os.path.join(host_build_top, 'out/host/linux-x86/bin') |
| tool_binary = os.path.join(host_command_dir, tool_name) |
| if not os.path.isfile(tool_binary): |
| host_command_dir = os.path.join(host_build_top, 'prebuilts/sdk/current/public') |
| tool_binary = os.path.join(host_command_dir, tool_name) |
| else: |
| tool_binary = shutil.which(tool_name) |
| |
| if not tool_binary or not os.path.isfile(tool_binary): |
| raise FileNotFoundError(f'cannot find tooling {tool_name}') |
| else: |
| return tool_binary |
| |
| def get_digest(file_path: str) -> str: |
| """Get sha512 digest of a file """ |
| digester = hashlib.sha512() |
| with open(file_path, 'rb') as f: |
| bytes_to_digest = f.read() |
| digester.update(bytes_to_digest) |
| return digester.hexdigest() |
| |
| class ApexerWithDCLAPreprocessingTest(unittest.TestCase): |
| |
| def setUp(self): |
| self._to_cleanup = [] |
| tools_zip_file = os.path.join(get_current_dir(), 'apexer_test_host_tools.zip') |
| self.unzip_host_tools(tools_zip_file) |
| |
| def tearDown(self): |
| if not DEBUG_TEST: |
| for i in self._to_cleanup: |
| if os.path.isdir(i): |
| shutil.rmtree(i, ignore_errors=True) |
| else: |
| os.remove(i) |
| del self._to_cleanup[:] |
| else: |
| print('Cleanup: ' + str(self._to_cleanup)) |
| |
| def create_temp_dir(self) -> str: |
| tmp_dir = tempfile.mkdtemp() |
| self._to_cleanup.append(tmp_dir) |
| return tmp_dir |
| |
| def expand_apex(self, apex_file: str) -> None: |
| """expand an apex file include apex_payload.""" |
| apex_dir = self.create_temp_dir() |
| with zipfile.ZipFile(apex_file, 'r') as apex_zip: |
| apex_zip.extractall(apex_dir) |
| payload_img = os.path.join(apex_dir, 'apex_payload.img') |
| extract_dir = os.path.join(apex_dir, 'payload_extract') |
| os.mkdir(extract_dir) |
| debugfs = get_host_tool('debugfs_static') |
| run_command([debugfs, payload_img, '-R', f'rdump / {extract_dir}']) |
| |
| # remove /etc and /lost+found and /payload_extract/apex_manifest.pb |
| lost_and_found = os.path.join(extract_dir, 'lost+found') |
| etc_dir = os.path.join(extract_dir, 'etc') |
| os.remove(os.path.join(extract_dir, 'apex_manifest.pb')) |
| if os.path.isdir(lost_and_found): |
| shutil.rmtree(lost_and_found) |
| if os.path.isdir(etc_dir): |
| shutil.rmtree(etc_dir) |
| |
| return apex_dir |
| |
| def unzip_host_tools(self, host_tools_file_path: str) -> None: |
| dir_name = get_current_dir() |
| if os.path.isfile(host_tools_file_path): |
| with zipfile.ZipFile(host_tools_file_path, 'r') as zip_obj: |
| zip_obj.extractall(dir_name) |
| |
| for i in ["apexer", "deapexer", "avbtool", "mke2fs", "sefcontext_compile", "e2fsdroid", |
| "resize2fs", "soong_zip", "aapt2", "merge_zips", "zipalign", "debugfs_static", |
| "signapk.jar", "android.jar"]: |
| file_path = os.path.join(dir_name, "bin", i) |
| if os.path.exists(file_path): |
| os.chmod(file_path, stat.S_IRUSR | stat.S_IXUSR); |
| |
| def test_DCLA_preprocessing(self): |
| """test DCLA preprocessing done properly.""" |
| apex_file = os.path.join(get_current_dir(), TEST_APEX + '.apex') |
| apex_dir = self.expand_apex(apex_file) |
| |
| # create apex canned_fs_config file, TEST_APEX does not come with one |
| canned_fs_config_file = os.path.join(apex_dir, 'canned_fs_config') |
| with open(canned_fs_config_file, 'w') as f: |
| # add /lib/foo.so file |
| lib_dir = os.path.join(apex_dir, 'payload_extract', 'lib') |
| os.makedirs(lib_dir) |
| foo_file = os.path.join(lib_dir, 'foo.so') |
| with open(foo_file, 'w') as lib_file: |
| lib_file.write('This is a placeholder lib file.') |
| foo_digest = get_digest(foo_file) |
| |
| # add /lib dir and /lib/foo.so in canned_fs_config |
| f.write(f'/lib 0 2000 0755\n') |
| f.write(f'/lib/foo.so 1000 1000 0644\n') |
| |
| # add /lib/bar.so file |
| lib_dir = os.path.join(apex_dir, 'payload_extract', 'lib64') |
| os.makedirs(lib_dir) |
| bar_file = os.path.join(lib_dir, 'bar.so') |
| with open(bar_file, 'w') as lib_file: |
| lib_file.write('This is another placeholder lib file.') |
| bar_digest = get_digest(bar_file) |
| |
| # add /lib dir and /lib/foo.so in canned_fs_config |
| f.write(f'/lib64 0 2000 0755\n') |
| f.write(f'/lib64/bar.so 1000 1000 0644\n') |
| |
| f.write(f'/ 0 2000 0755\n') |
| f.write(f'/apex_manifest.pb 1000 1000 0644\n') |
| |
| # call apexer_with_DCLA_preprocessing |
| manifest_file = os.path.join(apex_dir, 'apex_manifest.pb') |
| build_info_file = os.path.join(apex_dir, 'apex_build_info.pb') |
| key_file = os.path.join(get_current_dir(), TEST_PRIVATE_KEY) |
| apexer= get_host_tool('apexer') |
| apexer_wrapper = get_apexer_with_DCLA_preprocessing() |
| android_jar = get_host_tool('android.jar') |
| apex_out = os.path.join(apex_dir, 'DCLA_preprocessed_output.apex') |
| run_command([apexer_wrapper, |
| '--apexer', apexer, |
| '--canned_fs_config', canned_fs_config_file, |
| os.path.join(apex_dir, 'payload_extract'), |
| apex_out, |
| '--', |
| '--android_jar_path', android_jar, |
| '--apexer_tool_path', os.path.dirname(apexer), |
| '--key', key_file, |
| '--manifest', manifest_file, |
| '--build_info', build_info_file, |
| '--payload_fs_type', 'ext4', |
| '--payload_type', 'image', |
| '--force' |
| ]) |
| |
| # check the existence of updated canned_fs_config |
| updated_canned_fs_config = os.path.join(apex_dir, 'updated_canned_fs_config') |
| self.assertTrue( |
| os.path.isfile(updated_canned_fs_config), |
| 'missing updated canned_fs_config file named updated_canned_fs_config') |
| |
| # check the resulting apex, it should have /lib/foo.so/<hash>/foo.so and |
| # /lib64/bar.so/<hash>/bar.so |
| result_apex_dir = self.expand_apex(apex_out) |
| replaced_foo = os.path.join( |
| result_apex_dir, f'payload_extract/lib/foo.so/{foo_digest}/foo.so') |
| replaced_bar = os.path.join( |
| result_apex_dir, f'payload_extract/lib64/bar.so/{bar_digest}/bar.so') |
| self.assertTrue( |
| os.path.isfile(replaced_foo), |
| f'expecting /lib/foo.so/{foo_digest}/foo.so') |
| self.assertTrue( |
| os.path.isfile(replaced_bar), |
| f'expecting /lib64/bar.so/{bar_digest}/bar.so') |
| |
| if __name__ == '__main__': |
| unittest.main(verbosity=2) |