#!/usr/bin/env python
#
# Copyright (C) 2019 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""deapexer is a tool that prints out content of an APEX.

To print content of an APEX to stdout:
  deapexer list foo.apex

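To extract content of an APEX to the given directory:
  deapexer extract foo.apex dest

To print the manifest of an APEX (or its type, with --print-type):
  deapexer info foo.apex

To decompress a compressed APEX into an uncompressed one:
  deapexer decompress --input foo.capex --output foo.apex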
"""
from __future__ import print_function

import argparse
import apex_manifest
import enum
import os
import shutil
import sys
import subprocess
import tempfile
import zipfile

# Number of bytes per block; extent positions reported in blocks are scaled by
# this value to obtain byte offsets and lengths.
BLOCK_SIZE = 4096

# See apexd/apex_file.cpp#RetrieveFsType
# Each item is (filesystem name, byte offset of the magic, magic bytes).
FS_TYPES = [
    ('f2fs', 0x400, b'\x10\x20\xf5\xf2'),
    ('ext4', 0x400 + 0x38, b'\x53\xef'),
    ('erofs', 0x400, b'\xe2\xe1\xf5\xe0'),
]


def RetrieveFileSystemType(file):
  """Identifies the filesystem of an image file from its magic number.

  Every candidate in FS_TYPES is probed in order and the first one whose magic
  bytes are found at the expected offset wins.

  Args:
    file: path to the filesystem image.

  Returns:
    The name of the matching filesystem, as listed in FS_TYPES.

  Raises:
    ValueError: if none of the known magic numbers is present.
  """
  with open(file, 'rb') as image:
    for fs_name, magic_offset, expected in FS_TYPES:
      image.seek(magic_offset, os.SEEK_SET)
      found = image.read(len(expected))
      # A read cut short by end-of-file is zero-padded to the magic's length
      # before comparing.
      if found.ljust(len(expected), b'\x00') == expected:
        return fs_name
  raise ValueError('Failed to retrieve filesystem type')

class ApexImageEntry(object):
  """A single file, directory or symlink found inside an APEX payload image.

  Instances are plain read-only records: every constructor argument is stored
  untouched and exposed through a property of the same name (base_dir is only
  reachable through `root` and `full_path`).
  """

  def __init__(self, name, base_dir, permissions, size, ino, extents,
               is_directory, is_symlink, security_context):
    """Stores the attributes of one directory entry.

    Args:
      name: entry name within its directory (may be '.' or '..').
      base_dir: path of the containing directory; './' for the image root.
      permissions: integer permission bits (e.g. 0o644).
      size: entry size; either a string or a number is accepted.
      ino: inode identifier of the entry.
      extents: list of (offset, length) tuples locating the file data, or []
        when the location is unknown or not applicable.
      is_directory: True if the entry is a directory.
      is_symlink: True if the entry is a symbolic link.
      security_context: value of the entry's security context attribute.
    """
    self._name = name
    self._base_dir = base_dir
    self._permissions = permissions
    self._size = size
    self._is_directory = is_directory
    self._is_symlink = is_symlink
    self._ino = ino
    self._extents = extents
    self._security_context = security_context

  @property
  def name(self):
    return self._name

  @property
  def root(self):
    """True only for the '.' entry of the top-level './' directory."""
    return self._base_dir == './' and self._name == '.'

  @property
  def full_path(self):
    """Path of the entry inside the image; directories end with '/'."""
    if self.root:
      return self._base_dir  # './'
    path = os.path.join(self._base_dir, self._name)
    if self.is_directory:
      path += '/'
    return path

  @property
  def is_directory(self):
    return self._is_directory

  @property
  def is_symlink(self):
    return self._is_symlink

  @property
  def is_regular_file(self):
    """True when the entry is neither a directory nor a symlink."""
    return not self.is_directory and not self.is_symlink

  @property
  def permissions(self):
    return self._permissions

  @property
  def size(self):
    return self._size

  @property
  def ino(self):
    return self._ino

  @property
  def extents(self):
    return self._extents

  @property
  def security_context(self):
    return self._security_context

  def __str__(self):
    """Formats the entry like one line of `ls -l`: '<mode> <size> <name>'."""
    if self._is_directory:
      kind = 'd'
    elif self._is_symlink:
      kind = 'l'
    else:
      kind = '-'

    def mask_as_string(m):
      # Only the three low bits (read/write/execute) of `m` are inspected.
      return ''.join(flag if m & bit else '-'
                     for flag, bit in (('r', 4), ('w', 2), ('x', 1)))

    mode = (mask_as_string(self._permissions >> 6) +
            mask_as_string(self._permissions >> 3) +
            mask_as_string(self._permissions))

    # str() keeps this working when size was supplied as a number rather than
    # as text; plain concatenation would raise TypeError.
    return '%s%s %s %s' % (kind, mode, str(self._size), self._name)


class ApexImageDirectory(object):
  """The entries of one directory of an APEX payload image, sorted by name."""

  def __init__(self, path, entries, apex):
    """Creates a directory view.

    Args:
      path: path of this directory inside the image, ending with '/'.
      entries: iterable of entry objects exposing `name` and `is_directory`.
      apex: owner object whose `_list(path)` returns the ApexImageDirectory
        for a given path; used to descend into subdirectories.
    """
    self._path = path
    self._entries = sorted(entries, key=lambda e: e.name)
    self._apex = apex

  def list(self, is_recursive=False):
    """Yields the entries of this directory in name order.

    Args:
      is_recursive: when True, the contents of every subdirectory are yielded
        (depth first) right after the subdirectory's own entry. When False,
        only the direct entries of this directory are yielded.

    Yields:
      Entry objects. The '.' and '..' entries are yielded like any other but
      are never descended into.
    """
    for entry in self._entries:
      yield entry
      if not is_recursive:
        continue
      # Descending into '.' or '..' would never terminate.
      if entry.is_directory and entry.name not in ('.', '..'):
        yield from self.enter_subdir(entry).list(is_recursive)

  def enter_subdir(self, entry):
    """Returns the ApexImageDirectory for the child directory `entry`."""
    return self._apex._list(self._path + entry.name + '/')


class Apex(object):
  """Access to the payload image stored inside an APEX file.

  On construction the 'apex_payload.img' member of the APEX zip is extracted
  into a freshly created temporary directory and its filesystem type is
  detected. Listing is implemented with the debugfs binary and supports ext4
  payloads only; extraction uses debugfs for ext4 and fsck.erofs for erofs.

  The temporary directory is removed when the object is finalized. The object
  can be used in a `with` statement, but leaving the block does not release
  anything.
  """

  def __init__(self, args):
    """Extracts the payload image of `args.apex` to a temporary directory.

    Args:
      args: object providing `debugfs_path`, `fsckerofs_path` and `apex`.

    Raises:
      ValueError: if the filesystem type of the payload is not recognised.
    """
    self._debugfs = args.debugfs_path
    self._fsckerofs = args.fsckerofs_path
    self._apex = args.apex
    self._tempdir = tempfile.mkdtemp()
    # TODO(b/139125405): support flattened APEXes.
    with zipfile.ZipFile(self._apex, 'r') as zip_ref:
      self._payload = zip_ref.extract('apex_payload.img', path=self._tempdir)
    self._payload_fs_type = RetrieveFileSystemType(self._payload)
    # Maps a directory path inside the image to its ApexImageDirectory.
    self._cache = {}

  def __del__(self):
    # The attribute is missing when __init__ failed before creating the
    # directory, and the directory may already be gone; neither case should
    # raise from a finalizer.
    tempdir = getattr(self, '_tempdir', None)
    if tempdir is not None:
      shutil.rmtree(tempdir, ignore_errors=True)

  def __enter__(self):
    return self

  def __exit__(self, type, value, traceback):
    pass

  def list(self, is_recursive=False):
    """Returns an iterator over the entries of the payload, from the root.

    Args:
      is_recursive: forwarded to ApexImageDirectory.list().

    Exits the program with an error message if the payload is not ext4.
    """
    if self._payload_fs_type != 'ext4':
      sys.exit(f"{self._payload_fs_type} is not supported for `list`.")

    root = self._list('./')
    return root.list(is_recursive)

  def _run_debugfs(self, request):
    """Runs one debugfs request on the payload and returns its stdout text.

    stderr is captured and dropped, and the exit status is not checked.
    """
    process = subprocess.Popen([self._debugfs, '-R', request, self._payload],
                               stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                               universal_newlines=True)
    stdout, _ = process.communicate()
    return stdout

  def _read_extents(self, ino, size):
    """Returns the (byte offset, byte length) extents of inode `ino`.

    Returns [] when the location could not be determined reliably.
    """
    # Output of dump_extents for an inode fragmented in 3 blocks (length and addresses represent
    # block-sized sections):
    # Level Entries       Logical      Physical Length Flags
    # 0/ 0   1/  3     0 -     0    18 -    18      1
    # 0/ 0   2/  3     1 -    15    20 -    34     15
    # 0/ 0   3/  3    16 -  1863    37 -  1884   1848
    lines = self._run_debugfs('dump_extents <%s>' % ino).splitlines()
    left_length = int(size)
    extents = []
    try:  # dump_extents sometimes has an unexpected output
      # The first line contains only column names. Slicing, unlike pop(0),
      # also copes with completely empty output.
      for line in lines[1:]:
        tokens = line.split()
        offset = int(tokens[7]) * BLOCK_SIZE
        length = min(int(tokens[-1]) * BLOCK_SIZE, left_length)
        left_length -= length
        extents.append((offset, length))
    except (ValueError, IndexError):
      return []
    if left_length != 0:  # dump_extents sometimes fails to display "hole" blocks
      return []
    return extents

  def _read_security_context(self, entry_path):
    """Returns the 'security.selinux' attribute of `entry_path`.

    Raises:
      subprocess.CalledProcessError: if debugfs exits with a non-zero status.
    """
    stdout = subprocess.check_output([
      self._debugfs,
      '-R',
      f'ea_get -V {entry_path} security.selinux',
      self._payload
    ], text=True, stderr=subprocess.DEVNULL)
    return stdout.rstrip('\n\x00')

  def _list(self, path):
    """Returns the ApexImageDirectory for directory `path` of the payload.

    The result is remembered per path, so debugfs is queried at most once for
    each directory.
    """
    if path in self._cache:
      return self._cache[path]

    entries = []
    for line in self._run_debugfs('ls -l -p %s' % path).split('\n'):
      if not line:
        continue
      # Each usable record is slash-separated into exactly 8 parts; this code
      # reads the inode (1), the mode (2), the name (5) and the size (6).
      parts = line.split('/')
      if len(parts) != 8:
        continue
      name = parts[5]
      if not name:
        continue
      ino = parts[1]
      bits = parts[2]
      size = parts[6]
      # The second character of the mode string tells the entry kind apart.
      is_symlink = bits[1] == '2'
      is_directory = bits[1] == '4'

      extents = []
      if not is_symlink and not is_directory:
        extents = self._read_extents(ino, size)

      security_context = self._read_security_context(os.path.join(path, name))

      entries.append(ApexImageEntry(name,
                                    base_dir=path,
                                    permissions=int(bits[3:], 8),
                                    size=size,
                                    is_directory=is_directory,
                                    is_symlink=is_symlink,
                                    ino=ino,
                                    extents=extents,
                                    security_context=security_context))

    directory = ApexImageDirectory(path, entries, self)
    self._cache[path] = directory
    return directory

  def extract(self, dest):
    """Extracts the whole payload into the directory `dest`.

    If the external tool exits with a non-zero status its stderr is printed
    to stderr; no exception is raised. Exits the program with an error
    message for filesystem types other than erofs and ext4.
    """
    if self._payload_fs_type == 'erofs':
      process = subprocess.Popen([self._fsckerofs, '--extract=%s' % (dest), '--overwrite', self._payload],
                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                 universal_newlines=True)
    elif self._payload_fs_type == 'ext4':
      process = subprocess.Popen([self._debugfs, '-R', 'rdump ./ %s' % (dest), self._payload],
                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                 universal_newlines=True)
    else:
      # TODO(b/279688635) f2fs is not supported yet.
      sys.exit(f"{self._payload_fs_type} is not supported for `extract`.")

    _, stderr = process.communicate()
    if process.returncode != 0:
      print(stderr, file=sys.stderr)


def RunList(args):
  """Implements `deapexer list`: prints one line per entry of the APEX.

  A compressed APEX is first decompressed into a temporary directory and
  `args.apex` is redirected to the decompressed copy before listing it.

  Args:
    args: parsed arguments providing `apex`, `size`, `extents` and `contexts`
      in addition to what Apex needs.
  """
  if GetType(args.apex) == ApexType.COMPRESSED:
    with tempfile.TemporaryDirectory() as scratch_dir:
      unpacked_apex = os.path.join(scratch_dir, 'temp.apex')
      decompress(args.apex, unpacked_apex)
      args.apex = unpacked_apex
      RunList(args)
    return

  with Apex(args) as apex:
    for entry in apex.list(is_recursive=True):
      # Skip the '.' and '..' directories, except for the root itself.
      if entry.name in ('.', '..') and not entry.root:
        continue
      pieces = []
      if args.size:
        pieces.append(entry.size + ' ')
      pieces.append(entry.full_path)
      if args.extents:
        joined = '-'.join(str(extent) for extent in entry.extents)
        pieces.append(' [' + joined + ']')
      if args.contexts:
        pieces.append(' ' + entry.security_context)
      print(''.join(pieces))


def RunExtract(args):
  """Implements `deapexer extract`: unpacks the APEX payload into args.dest.

  A compressed APEX is first decompressed into a temporary directory and
  `args.apex` is redirected to the decompressed copy. The destination
  directory is created if it does not exist, and a 'lost+found' directory
  left behind by the extraction is removed.

  Args:
    args: parsed arguments providing `apex` and `dest` in addition to what
      Apex needs.
  """
  if GetType(args.apex) == ApexType.COMPRESSED:
    with tempfile.TemporaryDirectory() as scratch_dir:
      unpacked_apex = os.path.join(scratch_dir, "temp.apex")
      decompress(args.apex, unpacked_apex)
      args.apex = unpacked_apex
      RunExtract(args)
    return

  with Apex(args) as apex:
    if not os.path.exists(args.dest):
      os.makedirs(args.dest, mode=0o755)
    apex.extract(args.dest)
    lost_and_found = os.path.join(args.dest, "lost+found")
    if os.path.isdir(lost_and_found):
      shutil.rmtree(lost_and_found)

class ApexType(enum.Enum):
  """Classification of an APEX archive, as returned by GetType()."""
  # Contains neither, or both, of 'apex_payload.img' and 'original_apex'.
  INVALID = 0
  # Contains 'apex_payload.img' only.
  UNCOMPRESSED = 1
  # Contains 'original_apex' only.
  COMPRESSED = 2


def GetType(apex_path):
  """Classifies the APEX at `apex_path` by the members of its zip archive.

  Args:
    apex_path: path to the APEX file.

  Returns:
    ApexType.UNCOMPRESSED if only 'apex_payload.img' is present,
    ApexType.COMPRESSED if only 'original_apex' is present, and
    ApexType.INVALID if both or neither are present.
  """
  with zipfile.ZipFile(apex_path, 'r') as archive:
    members = archive.namelist()
  payload_present = 'apex_payload.img' in members
  original_present = 'original_apex' in members
  # Exactly one of the two markers must be present for a valid APEX.
  if payload_present == original_present:
    return ApexType.INVALID
  return ApexType.UNCOMPRESSED if payload_present else ApexType.COMPRESSED


def RunInfo(args):
  """Implements `deapexer info`.

  Without --print-type the APEX manifest is printed as JSON. With it, the
  name of the APEX type is printed instead; for an invalid APEX a message is
  printed and the program exits with status 1.

  Args:
    args: parsed arguments providing `apex` and `print_type`.
  """
  if not args.print_type:
    manifest = apex_manifest.fromApex(args.apex)
    print(apex_manifest.toJsonString(manifest))
    return

  apex_type = GetType(args.apex)
  if apex_type == ApexType.INVALID:
    print(args.apex + ' is not a valid apex')
    sys.exit(1)
  print(apex_type.name)


def RunDecompress(args):
  """Implements `deapexer decompress`.

  Takes the path to a compressed APEX and writes the original uncompressed
  APEX it contains to the given output path.

  See apex_compression_tool.py#RunCompress for details on compressed APEX
  structure.

  Args:
      args.input: file path to compressed APEX
      args.output: file path to where decompressed APEX will be placed

  Returns:
      Whatever decompress() returns.
  """
  return decompress(args.input, args.output)

def decompress(compressed_apex_fp, decompressed_apex_fp):
  """Extracts the 'original_apex' member of a compressed APEX.

  Prints a message and exits with status 1 if the output path already exists
  or if the input archive has no 'original_apex' member.

  Args:
    compressed_apex_fp: file path of the compressed APEX.
    decompressed_apex_fp: file path the uncompressed APEX is written to.
  """
  if os.path.exists(decompressed_apex_fp):
    print("Output path '" + decompressed_apex_fp + "' already exists")
    sys.exit(1)

  target_dir, target_name = os.path.split(decompressed_apex_fp)
  with zipfile.ZipFile(compressed_apex_fp, 'r') as archive:
    try:
      member = archive.getinfo('original_apex')
    except KeyError:
      print(compressed_apex_fp + ' is not a compressed APEX. Missing '
                                 "'original_apex' file inside it.")
      sys.exit(1)
    # Renaming the member before extraction makes it land directly under the
    # file name the caller asked for.
    member.filename = target_name
    archive.extract(member, path=target_dir)


def main(argv):
  """Parses `argv` and runs the selected deapexer sub-command.

  The program exits with status 1 when a tool path needed by the chosen
  sub-command has not been provided.

  Args:
    argv: command-line arguments, without the program name.
  """
  parser = argparse.ArgumentParser()

  # Tool paths default to the binaries under $ANDROID_HOST_OUT/bin when that
  # variable is set; otherwise they have to be given on the command line.
  debugfs_default = None
  fsckerofs_default = None
  blkid_default = None
  if 'ANDROID_HOST_OUT' in os.environ:
    debugfs_default = '%s/bin/debugfs_static' % os.environ['ANDROID_HOST_OUT']
    fsckerofs_default = '%s/bin/fsck.erofs' % os.environ['ANDROID_HOST_OUT']
    blkid_default = '%s/bin/blkid_static' % os.environ['ANDROID_HOST_OUT']
  parser.add_argument('--debugfs_path', help='The path to debugfs binary', default=debugfs_default)
  parser.add_argument('--fsckerofs_path', help='The path to fsck.erofs binary', default=fsckerofs_default)
  # TODO(b/279858383) remove the argument
  # Still accepted so that existing invocations keep working, but nothing
  # reads it and it is therefore never required.
  parser.add_argument('--blkid_path', help='NOT USED', default=blkid_default)

  subparsers = parser.add_subparsers(required=True, dest='cmd')

  parser_list = subparsers.add_parser('list', help='prints content of an APEX to stdout')
  parser_list.add_argument('apex', type=str, help='APEX file')
  parser_list.add_argument('--size', help='also show the size of the files', action="store_true")
  parser_list.add_argument('--extents', help='also show the location of the files', action="store_true")
  parser_list.add_argument('-Z', '--contexts',
                           help='also show the security context of the files',
                           action='store_true')
  parser_list.set_defaults(func=RunList)

  parser_extract = subparsers.add_parser('extract', help='extracts content of an APEX to the given '
                                                         'directory')
  parser_extract.add_argument('apex', type=str, help='APEX file')
  parser_extract.add_argument('dest', type=str, help='Directory to extract content of APEX to')
  parser_extract.set_defaults(func=RunExtract)

  parser_info = subparsers.add_parser('info', help='prints APEX manifest')
  parser_info.add_argument('apex', type=str, help='APEX file')
  parser_info.add_argument('--print-type',
                           help='Prints type of the apex (COMPRESSED or UNCOMPRESSED)',
                           action='store_true')
  parser_info.set_defaults(func=RunInfo)

  # Handle sub-command "decompress"
  parser_decompress = subparsers.add_parser('decompress',
                                            help='decompresses a compressed '
                                                 'APEX')
  parser_decompress.add_argument('--input', type=str, required=True,
                                 help='path to compressed APEX file that '
                                      'will be decompressed')
  # The value is used as the name of the file to create, not as a directory.
  parser_decompress.add_argument('--output', type=str, required=True,
                                 help='output file path where the '
                                      'decompressed APEX will be written')
  parser_decompress.set_defaults(func=RunDecompress)

  args = parser.parse_args(argv)

  debugfs_required_for_cmd = ['list', 'extract']
  if args.cmd in debugfs_required_for_cmd and not args.debugfs_path:
    print('ANDROID_HOST_OUT environment variable is not defined, --debugfs_path must be set',
          file=sys.stderr)
    sys.exit(1)

  if args.cmd == 'extract':
    if not args.fsckerofs_path:
      print('ANDROID_HOST_OUT environment variable is not defined, --fsckerofs_path must be set',
            file=sys.stderr)
      sys.exit(1)

    if not os.path.isfile(args.fsckerofs_path):
      print(f'Cannot find fsck.erofs specified at {args.fsckerofs_path}',
            file=sys.stderr)
      sys.exit(1)

  args.func(args)


# When executed as a script, hand the command-line arguments (without the
# program name) to main().
if __name__ == '__main__':
  main(sys.argv[1:])
