#!/usr/bin/env python
#
# Copyright (C) 2020 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Unit tests for apexer."""

import hashlib
import json
import logging
import os
import shutil
import stat
import subprocess
import tempfile
import unittest
from importlib import resources
from zipfile import ZipFile

from apex_manifest import ValidateApexManifest
from apex_manifest import ParseApexManifest

logger = logging.getLogger(__name__)

TEST_APEX = "com.android.example.apex"
TEST_APEX_LEGACY = "com.android.example-legacy.apex"
TEST_APEX_WITH_LOGGING_PARENT = "com.android.example-logging_parent.apex"
TEST_APEX_WITH_OVERRIDDEN_PACKAGE_NAME = "com.android.example-overridden_package_name.apex"

TEST_PRIVATE_KEY = os.path.join("testdata", "com.android.example.apex.pem")
TEST_X509_KEY = os.path.join("testdata", "com.android.example.apex.x509.pem")
TEST_PK8_KEY = os.path.join("testdata", "com.android.example.apex.pk8")
TEST_AVB_PUBLIC_KEY = os.path.join("testdata", "com.android.example.apex.avbpubkey")
TEST_MANIFEST_JSON = os.path.join("testdata", "manifest.json")

def run(args, verbose=None, **kwargs):
    """Creates and returns a subprocess.Popen object.

    Args:
      args: The command represented as a list of strings.
      verbose: Whether the commands should be shown. Default to the global
          verbosity if unspecified.
      kwargs: Any additional args to be passed to subprocess.Popen(), such as env,
          stdin, etc. stdout and stderr will default to subprocess.PIPE and
          subprocess.STDOUT respectively unless caller specifies any of them.
          universal_newlines will default to True, as most of the users in
          releasetools expect string output.

    Returns:
      A subprocess.Popen object.
    """
    if 'stdout' not in kwargs and 'stderr' not in kwargs:
        kwargs['stdout'] = subprocess.PIPE
        kwargs['stderr'] = subprocess.STDOUT
    if 'universal_newlines' not in kwargs:
        kwargs['universal_newlines'] = True
    # Don't log any if caller explicitly says so.
    if DEBUG_TEST:
        print("\nRunning: \n%s\n" % " ".join(args))
    if verbose:
        logger.info("  Running: \"%s\"", " ".join(args))
    return subprocess.Popen(args, **kwargs)


def run_host_command(args, verbose=None, **kwargs):
    host_build_top = os.environ.get("ANDROID_BUILD_TOP")
    if host_build_top:
        host_command_dir = os.path.join(host_build_top, "out/host/linux-x86/bin")
        args[0] = os.path.join(host_command_dir, args[0])
    return run_and_check_output(args, verbose, **kwargs)


def run_and_check_output(args, verbose=None, **kwargs):
    """Runs the given command and returns the output.

    Args:
      args: The command represented as a list of strings.
      verbose: Whether the commands should be shown. Default to the global
          verbosity if unspecified.
      kwargs: Any additional args to be passed to subprocess.Popen(), such as env,
          stdin, etc. stdout and stderr will default to subprocess.PIPE and
          subprocess.STDOUT respectively unless caller specifies any of them.

    Returns:
      The output string.

    Raises:
      ExternalError: On non-zero exit from the command.
    """
    proc = run(args, verbose=verbose, **kwargs)
    output, _ = proc.communicate()
    if output is None:
        output = ""
    # Don't log any if caller explicitly says so.
    if verbose:
        logger.info("%s", output.rstrip())
    if proc.returncode != 0:
        raise RuntimeError(
            "Failed to run command '{}' (exit code {}):\n{}".format(
                args, proc.returncode, output))
    return output


def get_sha1sum(file_path):
    h = hashlib.sha256()

    with open(file_path, 'rb') as file:
        while True:
            # Reading is buffered, so we can read smaller chunks.
            chunk = file.read(h.block_size)
            if not chunk:
                break
            h.update(chunk)

    return h.hexdigest()


def round_up(size, unit):
    assert unit & (unit - 1) == 0
    return (size + unit - 1) & (~(unit - 1))

# In order to debug test failures, set DEBUG_TEST to True and run the test from
# local workstation bypassing atest, e.g.:
# $ m apexer_test && out/host/linux-x86/nativetest64/apexer_test/apexer_test
#
# the test will print out the command used, and the temporary files used by the
# test. You need to compare e.g. /tmp/test_simple_apex_input_XXXXXXXX.apex with
# /tmp/test_simple_apex_repacked_YYYYYYYY.apex to check where they are
# different.
# A simple script to analyze the differences:
#
# FILE_INPUT=/tmp/test_simple_apex_input_XXXXXXXX.apex
# FILE_OUTPUT=/tmp/test_simple_apex_repacked_YYYYYYYY.apex
#
# cd ~/tmp/
# rm -rf input output
# mkdir input output
# unzip ${FILE_INPUT} -d input/
# unzip ${FILE_OUTPUT} -d output/
#
# diff -r input/ output/
#
# For analyzing binary diffs I had mild success using the vbindiff utility.
DEBUG_TEST = False


class ApexerRebuildTest(unittest.TestCase):
    def setUp(self):
        self._to_cleanup = []
        self._get_host_tools()

    def tearDown(self):
        if not DEBUG_TEST:
            for i in self._to_cleanup:
                if os.path.isdir(i):
                    shutil.rmtree(i, ignore_errors=True)
                else:
                    os.remove(i)
            del self._to_cleanup[:]
        else:
            print(self._to_cleanup)

    def _get_host_tools(self):
        dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_host_tools_")
        self._to_cleanup.append(dir_name)
        with resources.files("apexer_test").joinpath("apexer_test_host_tools.zip").open('rb') as f:
            with ZipFile(f, 'r') as zip_obj:
                zip_obj.extractall(path=dir_name)

        files = {}
        for i in ["apexer", "deapexer", "avbtool", "mke2fs", "sefcontext_compile", "e2fsdroid",
                  "resize2fs", "soong_zip", "aapt2", "merge_zips", "zipalign", "debugfs_static",
                  "signapk.jar", "android.jar", "mkfs.erofs", "fsck.erofs", "conv_apex_manifest"]:
            file_path = os.path.join(dir_name, "bin", i)
            if os.path.exists(file_path):
                os.chmod(file_path, stat.S_IRUSR | stat.S_IXUSR)
                files[i] = file_path
            else:
                files[i] = i
        self.host_tools = files
        self.host_tools_path = os.path.join(dir_name, "bin")

        path = self.host_tools_path
        if "PATH" in os.environ:
            path += ":" + os.environ["PATH"]
        os.environ["PATH"] = path

        ld_library_path = os.path.join(dir_name, "lib64")
        if "LD_LIBRARY_PATH" in os.environ:
            ld_library_path += ":" + os.environ["LD_LIBRARY_PATH"]
        if "ANDROID_HOST_OUT" in os.environ:
            ld_library_path += ":" + os.path.join(os.environ["ANDROID_HOST_OUT"], "lib64")
        os.environ["LD_LIBRARY_PATH"] = ld_library_path

    def _extract_resource(self, resource_name):
        with (
            resources.files("apexer_test").joinpath(resource_name).open('rb') as f,
            tempfile.NamedTemporaryFile(prefix=resource_name.replace('/', '_'), delete=False) as f2,
        ):
            self._to_cleanup.append(f2.name)
            shutil.copyfileobj(f, f2)
            return f2.name

    def _get_container_files(self, apex_file_path):
        dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_container_files_")
        self._to_cleanup.append(dir_name)
        with ZipFile(apex_file_path, 'r') as zip_obj:
            zip_obj.extractall(path=dir_name)
        files = {}
        for i in ["apex_manifest.json", "apex_manifest.pb",
                  "apex_build_info.pb", "assets",
                  "apex_payload.img", "apex_payload.zip"]:
            file_path = os.path.join(dir_name, i)
            if os.path.exists(file_path):
                files[i] = file_path
        self.assertIn("apex_manifest.pb", files)
        self.assertIn("apex_build_info.pb", files)

        image_file = None
        if "apex_payload.img" in files:
            image_file = files["apex_payload.img"]
        elif "apex_payload.zip" in files:
            image_file = files["apex_payload.zip"]
        self.assertIsNotNone(image_file)
        files["apex_payload"] = image_file

        return files

    def _extract_payload_from_img(self, img_file_path):
        dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_extracted_payload_")
        self._to_cleanup.append(dir_name)
        cmd = ["debugfs_static", '-R', 'rdump ./ %s' % dir_name, img_file_path]
        run_host_command(cmd)

        # Remove payload files added by apexer and e2fs tools.
        for i in ["apex_manifest.json", "apex_manifest.pb"]:
            if os.path.exists(os.path.join(dir_name, i)):
                os.remove(os.path.join(dir_name, i))
        if os.path.isdir(os.path.join(dir_name, "lost+found")):
            shutil.rmtree(os.path.join(dir_name, "lost+found"))
        return dir_name

    def _extract_payload(self, apex_file_path):
        dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_extracted_payload_")
        self._to_cleanup.append(dir_name)
        cmd = ["deapexer", "--debugfs_path", self.host_tools["debugfs_static"],
               "--fsckerofs_path", self.host_tools["fsck.erofs"], "extract",
               apex_file_path, dir_name]
        run_host_command(cmd)

        # Remove payload files added by apexer and e2fs tools.
        for i in ["apex_manifest.json", "apex_manifest.pb"]:
            if os.path.exists(os.path.join(dir_name, i)):
                os.remove(os.path.join(dir_name, i))
        if os.path.isdir(os.path.join(dir_name, "lost+found")):
            shutil.rmtree(os.path.join(dir_name, "lost+found"))
        return dir_name

    def _run_apexer(self, container_files, payload_dir, args=[]):
        unsigned_payload_only = False
        payload_only = False
        if "--unsigned_payload_only" in args:
            unsigned_payload_only = True
        if unsigned_payload_only or "--payload_only" in args:
            payload_only = True

        os.environ["APEXER_TOOL_PATH"] = (self.host_tools_path +
            ":out/host/linux-x86/bin:prebuilts/sdk/tools/linux/bin")
        cmd = ["apexer", "--force", "--include_build_info", "--do_not_check_keyname"]
        if DEBUG_TEST:
            cmd.append('-v')
        cmd.extend(["--apexer_tool_path", os.environ["APEXER_TOOL_PATH"]])
        cmd.extend(["--android_jar_path", self.host_tools["android.jar"]])
        cmd.extend(["--manifest", container_files["apex_manifest.pb"]])
        if "apex_manifest.json" in container_files:
            cmd.extend(["--manifest_json", container_files["apex_manifest.json"]])
        cmd.extend(["--build_info", container_files["apex_build_info.pb"]])
        if not payload_only and "assets" in container_files:
            cmd.extend(["--assets_dir", container_files["assets"]])
        if not unsigned_payload_only:
            cmd.extend(["--key", self._extract_resource(TEST_PRIVATE_KEY)])
            cmd.extend(["--pubkey", self._extract_resource(TEST_AVB_PUBLIC_KEY)])
        cmd.extend(args)

        # Decide on output file name
        apex_suffix = ".apex.unsigned"
        if payload_only:
            apex_suffix = ".payload"
        fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=apex_suffix)
        os.close(fd)
        self._to_cleanup.append(fn)
        cmd.extend([payload_dir, fn])

        run_host_command(cmd)
        return fn

    def _get_java_toolchain(self):
        java_toolchain = "java"
        if os.path.isfile("prebuilts/jdk/jdk21/linux-x86/bin/java"):
            java_toolchain = "prebuilts/jdk/jdk21/linux-x86/bin/java"
        elif os.path.isfile("/jdk/jdk21/linux-x86/bin/java"):
            java_toolchain = "/jdk/jdk21/linux-x86/bin/java"
        elif "ANDROID_JAVA_TOOLCHAIN" in os.environ:
            java_toolchain = os.path.join(os.environ["ANDROID_JAVA_TOOLCHAIN"], "java")
        elif "ANDROID_JAVA_HOME" in os.environ:
            java_toolchain = os.path.join(os.environ["ANDROID_JAVA_HOME"], "bin", "java")
        elif "JAVA_HOME" in os.environ:
            java_toolchain = os.path.join(os.environ["JAVA_HOME"], "bin", "java")

        java_dep_lib = os.environ["LD_LIBRARY_PATH"]
        if "ANDROID_HOST_OUT" in os.environ:
            java_dep_lib += ":" + os.path.join(os.environ["ANDROID_HOST_OUT"], "lib64")
        if "ANDROID_BUILD_TOP" in os.environ:
            java_dep_lib += ":" + os.path.join(os.environ["ANDROID_BUILD_TOP"],
                "out/host/linux-x86/lib64")

        return [java_toolchain, java_dep_lib]

    def _sign_apk_container(self, unsigned_apex):
        fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=".apex")
        os.close(fd)
        self._to_cleanup.append(fn)
        java_toolchain, java_dep_lib = self._get_java_toolchain()
        cmd = [
            java_toolchain,
            "-Djava.library.path=" + java_dep_lib,
            "-jar", self.host_tools['signapk.jar'],
            "-a", "4096", "--align-file-size",
            self._extract_resource(TEST_X509_KEY),
            self._extract_resource(TEST_PK8_KEY),
            unsigned_apex, fn]
        run_and_check_output(cmd)
        return fn

    def _sign_payload(self, container_files, unsigned_payload):
        fd, signed_payload = \
            tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=".payload")
        os.close(fd)
        self._to_cleanup.append(signed_payload)
        shutil.copyfile(unsigned_payload, signed_payload)

        cmd = ['avbtool']
        cmd.append('add_hashtree_footer')
        cmd.append('--do_not_generate_fec')
        cmd.extend(['--algorithm', 'SHA256_RSA4096'])
        cmd.extend(['--hash_algorithm', 'sha256'])
        cmd.extend(['--key', self._extract_resource(TEST_PRIVATE_KEY)])
        manifest_apex = ParseApexManifest(container_files["apex_manifest.pb"])
        ValidateApexManifest(manifest_apex)
        cmd.extend(['--prop', 'apex.key:' + manifest_apex.name])
        # Set up the salt based on manifest content which includes name
        # and version
        salt = hashlib.sha256(manifest_apex.SerializeToString()).hexdigest()
        cmd.extend(['--salt', salt])
        cmd.extend(['--image', signed_payload])
        run_and_check_output(cmd)

        return signed_payload

    def _verify_payload(self, payload):
        """Verifies that the payload is properly signed by avbtool"""
        cmd = ["avbtool", "verify_image", "--image", payload, "--accept_zeroed_hashtree"]
        run_and_check_output(cmd)

    def _run_build_test(self, apex_name):
        apex_file_path = self._extract_resource(apex_name + ".apex")
        if DEBUG_TEST:
            fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_input_", suffix=".apex")
            os.close(fd)
            shutil.copyfile(apex_file_path, fn)
            self._to_cleanup.append(fn)
        container_files = self._get_container_files(apex_file_path)
        payload_dir = self._extract_payload(apex_file_path)
        repack_apex_file_path = self._run_apexer(container_files, payload_dir)
        resigned_apex_file_path = self._sign_apk_container(repack_apex_file_path)
        self.assertEqual(get_sha1sum(apex_file_path), get_sha1sum(resigned_apex_file_path))

    def test_simple_apex(self):
        self._run_build_test(TEST_APEX)

    def test_legacy_apex(self):
        self._run_build_test(TEST_APEX_LEGACY)

    def test_output_payload_only(self):
        """Assert that payload-only output from apexer is same as the payload we get by unzipping
        apex.
        """
        apex_file_path = self._extract_resource(TEST_APEX + ".apex")
        container_files = self._get_container_files(apex_file_path)
        payload_dir = self._extract_payload(apex_file_path)
        payload_only_file_path = self._run_apexer(container_files, payload_dir, ["--payload_only"])
        self._verify_payload(payload_only_file_path)
        self.assertEqual(get_sha1sum(payload_only_file_path),
                         get_sha1sum(container_files["apex_payload"]))

    def test_output_unsigned_payload_only(self):
        """Assert that when unsigned-payload-only output from apexer is signed by the avb key, it is
        same as the payload we get by unzipping apex.
        """
        apex_file_path = self._extract_resource(TEST_APEX + ".apex")
        container_files = self._get_container_files(apex_file_path)
        payload_dir = self._extract_payload(apex_file_path)
        unsigned_payload_only_file_path = self._run_apexer(container_files, payload_dir,
                                                  ["--unsigned_payload_only"])
        with self.assertRaises(RuntimeError) as error:
            self._verify_payload(unsigned_payload_only_file_path)
        self.assertIn("Given image does not look like a vbmeta image", str(error.exception))
        signed_payload = self._sign_payload(container_files, unsigned_payload_only_file_path)
        self.assertEqual(get_sha1sum(signed_payload),
                         get_sha1sum(container_files["apex_payload"]))

    def test_apex_with_logging_parent(self):
      self._run_build_test(TEST_APEX_WITH_LOGGING_PARENT)

    def test_apex_with_overridden_package_name(self):
      self._run_build_test(TEST_APEX_WITH_OVERRIDDEN_PACKAGE_NAME)

    def test_conv_apex_manifest(self):
        # .pb generation from json
        manifest_json_path = self._extract_resource(TEST_MANIFEST_JSON)

        fd, fn = tempfile.mkstemp(prefix=self._testMethodName + "_manifest_", suffix=".pb")
        os.close(fd)
        self._to_cleanup.append(fn)
        cmd = [
            "conv_apex_manifest",
            "proto",
            manifest_json_path,
            "-o", fn]
        run_and_check_output(cmd)

        with open(manifest_json_path) as fd_json:
            manifest_json = json.load(fd_json)
        manifest_apex = ParseApexManifest(fn)
        ValidateApexManifest(manifest_apex)

        self.assertEqual(manifest_apex.name, manifest_json["name"])
        self.assertEqual(manifest_apex.version, manifest_json["version"])

        # setprop check on already generated .pb
        next_version = 20
        cmd = [
            "conv_apex_manifest",
            "setprop",
            "version", str(next_version),
            fn]
        run_and_check_output(cmd)

        manifest_apex = ParseApexManifest(fn)
        ValidateApexManifest(manifest_apex)

        self.assertEqual(manifest_apex.version, next_version)



if __name__ == '__main__':
    unittest.main(verbosity=2)
