Add tool to copy files into FAT16 images

fat16copy.py can be used to copy files or directories into the root of
a FAT16 image. Directories are copied recursively.

Usage: fat16copy.py <image> <file> [<file> ...]

Test: Used to compose a Raspberry Pi 3 boot image
Bug: 28912590
Change-Id: I29c9eec3786e2c5cc94f9a160360bac850809a93
diff --git a/tools/fat16copy.py b/tools/fat16copy.py
new file mode 100755
index 0000000..1dd15b7
--- /dev/null
+++ b/tools/fat16copy.py
@@ -0,0 +1,789 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+import struct
+
+FAT_TABLE_START = 0x200  # Byte offset of the first FAT; assumes one 512-byte reserved sector
+DEL_MARKER = 0xe5  # First byte of a directory record that was deleted
+ESCAPE_DEL_MARKER = 0x05  # Stored first byte that read_dentry maps back to 0xe5
+
+ATTRIBUTE_READ_ONLY = 0x1
+ATTRIBUTE_HIDDEN = 0x2
+ATTRIBUTE_SYSTEM = 0x4
+ATTRIBUTE_VOLUME_LABEL = 0x8
+ATTRIBUTE_SUBDIRECTORY = 0x10
+ATTRIBUTE_ARCHIVE = 0x20
+ATTRIBUTE_DEVICE = 0x40
+
+LFN_ATTRIBUTES = \
+    ATTRIBUTE_VOLUME_LABEL | \
+    ATTRIBUTE_SYSTEM | \
+    ATTRIBUTE_HIDDEN | \
+    ATTRIBUTE_READ_ONLY
+LFN_ATTRIBUTES_BYTE = struct.pack("B", LFN_ATTRIBUTES)  # LFN record mask as one byte
+
+MAX_CLUSTER_ID = 0x7FFF  # FAT entries above this are treated as end of chain
+
+def read_le_short(f):
+  "Read a little-endian unsigned 2-byte integer from the given file-like object"
+  return struct.unpack("<H", f.read(2))[0]
+
+def read_le_long(f):
+  "Read a little-endian unsigned 4-byte integer from the given file-like object"
+  return struct.unpack("<L", f.read(4))[0]
+
+def read_byte(f):
+  "Read one byte from the given file-like object as an unsigned integer"
+  return struct.unpack("B", f.read(1))[0]
+
+def skip_bytes(f, n):
+  "Move the given file-like object n bytes forward (backward when n < 0)"
+  f.seek(n, os.SEEK_CUR)
+
+def skip_short(f):
+  "Advance the given file-like object past one 2-byte field"
+  f.seek(2, os.SEEK_CUR)
+
+def skip_byte(f):
+  "Advance the given file-like object past a single byte"
+  f.seek(1, os.SEEK_CUR)
+
+def rewind_bytes(f, n):
+  "Move the given file-like object n bytes back from its current position"
+  f.seek(-n, os.SEEK_CUR)
+
+def rewind_short(f):
+  "Move the given file-like object back over one 2-byte field"
+  f.seek(-2, os.SEEK_CUR)
+
+class fake_file(object):
+  """
+  Base for the file-like views we use to manipulate the image. Inheritors
+  must provide an idx member (the current file pointer) and a size member
+  (the total file size); seek() keeps idx clamped to the range [0, size].
+  """
+
+  def seek(self, amount, direction=0):
+    "Move idx like file.seek(): amount is relative to start, current or end."
+    if direction == os.SEEK_CUR:
+      target = self.idx + amount
+    elif direction == os.SEEK_END:
+      # As with real files, the offset is added to the end (negative = back).
+      target = self.size + amount
+    else:
+      target = amount
+
+    # Clamp rather than raise, so the pointer can never end up outside the
+    # file no matter what offset the caller asked for.
+    self.idx = max(0, min(target, self.size))
+
+class fat_file(fake_file):
+  """
+  File-like view of one cluster chain in the image. It knows nothing about
+  any dentry that may point at the chain; it only tracks the first cluster,
+  a logical size and the current position.
+  """
+
+  def __init__(self, fs, cluster, size=None):
+    """
+    fs: The fat() object for the image holding this file.
+    cluster: First cluster of the chain that stores the data.
+    size: Logical size in bytes. When omitted, the byte length of the whole
+          cluster chain starting at cluster is used instead.
+    """
+    self.fs = fs
+    self.start_cluster = cluster
+    self.idx = 0
+
+    if size is None:
+      size = fs.get_chain_size(cluster)
+
+    self.size = size
+
+  def read(self, size):
+    "Return up to size bytes from the current position and advance past them."
+    # Never ask the filesystem for bytes beyond the logical end of the file.
+    size = min(size, self.size - self.idx)
+    chunk = self.fs.read_file(self.start_cluster, self.idx, size)
+    self.idx += len(chunk)
+    return chunk
+
+  def write(self, data):
+    "Write data at the current position and advance past it."
+    self.fs.write_file(self.start_cluster, self.idx, data)
+    self.idx += len(data)
+
+    # Writing past the old end moves the end of the file.
+    self.size = max(self.size, self.idx)
+
+def shorten(name, index):
+  """
+  Build the stem of an 8.3 short name from a long name whose extension has
+  already been removed. Dots are dropped, the text is upper-cased, and it is
+  truncated so that "~" plus index still fits within 8 characters.
+  """
+  suffix = "~" + str(index)
+  stem = name.replace('.', '').upper()
+  return stem[:8 - len(suffix)] + suffix
+
+class fat_dir(object):
+  "A directory in our fat filesystem."
+
+  def __init__(self, backing):
+    """
+    backing: A file-like object from which we can read dentry info. Should have
+    an fs member allowing us to get to the underlying image.
+    """
+    self.backing = backing
+    self.dentries = []
+    to_read = self.backing.size // 32  # Directory records are 32 bytes each.
+
+    self.backing.seek(0)
+
+    while to_read > 0:
+      (dent, consumed) = self.backing.fs.read_dentry(self.backing)
+      to_read -= consumed
+
+      if dent:
+        self.dentries.append(dent)
+
+  def __str__(self):
+    return "\n".join([str(x) for x in self.dentries]) + "\n"
+
+  def add_dentry(self, attributes, shortname, ext, longname, first_cluster,
+      size):
+    """
+    Add a new dentry to this directory.
+    attributes: Attribute flags for this dentry. See the ATTRIBUTE_ constants
+                above.
+    shortname: Short name of this file. Up to 8 characters, no dots.
+    ext: Extension for this file. Up to 3 characters, no dots.
+    longname: The long name for this file, with extension. Largely unrestricted.
+    first_cluster: The first cluster in the cluster chain holding the contents
+                   of this file.
+    size: The size of this file. Set to 0 for subdirectories.
+    """
+    new_dentry = dentry(self.backing.fs, attributes, shortname, ext,
+        longname, first_cluster, size)
+    new_dentry.commit(self.backing)
+    self.dentries.append(new_dentry)
+    return new_dentry
+
+  def make_short_name(self, name):
+    """
+    Given a long file name, return an 8.3 short name as a (name, ext) tuple of
+    space-padded strings. Generated "~N" names avoid existing short names.
+    """
+    # Check the full long name now; `name` is reduced to its stem below.
+    assert all(d.longname != name for d in self.dentries), \
+        "File must not exist"
+    parts = name.rsplit('.', 1)
+    if len(parts) == 1:
+      parts.append('')
+
+    name = parts[0]
+    ext = parts[1].upper()
+    # Entries added in this session keep space-padded short names; strip them.
+    taken = set(d.shortname.rstrip() for d in self.dentries)
+    index = 1
+    shortened = shorten(name, index)
+    while shortened in taken:
+      index += 1
+      shortened = shorten(name, index)
+
+    if len(name) <= 8 and len(ext) <= 3 and not '.' in name:
+      return (name.upper().ljust(8), ext.ljust(3))
+
+    return (shortened.ljust(8), ext[:3].ljust(3))
+
+  def new_file(self, name, data=None):
+    """
+    Add a new regular file to this directory.
+    name: The name of the new file.
+    data: The contents of the new file. Given as a file-like object.
+    """
+    size = 0
+    if data:
+      data.seek(0, os.SEEK_END)
+      size = data.tell()
+
+    (shortname, ext) = self.make_short_name(name)  # Validate before allocating.
+    chunk = self.backing.fs.allocate(size or 1)  # Empty files get a cluster too.
+    self.add_dentry(0, shortname, ext, name, chunk, size)
+
+    if data is None:
+      return
+
+    data_file = fat_file(self.backing.fs, chunk, size)
+    data.seek(0)
+    data_file.write(data.read())
+
+  def new_subdirectory(self, name):
+    """
+    Create a new subdirectory of this directory with the given name and record
+    its dentry in self.dentries. Returns a fat_dir() for the new directory.
+    """
+    (shortname, ext) = self.make_short_name(name)
+    chunk = self.backing.fs.allocate(1)
+    # add_dentry commits the entry and tracks it for later collision checks.
+    new_dentry = self.add_dentry(ATTRIBUTE_SUBDIRECTORY, shortname, ext,
+        name, chunk, 0)
+    return new_dentry.open_directory()
+
+def lfn_checksum(name_data):
+  """
+  Compute the one-byte checksum of an 8.3 short name that every long file
+  name record belonging to it must carry. name_data is the 11 characters of
+  the short name: 8 of name then 3 of extension, with no dot in between.
+  """
+  assert len(name_data) == 11, "Name data should be exactly 11 characters"
+  codes = struct.unpack("11B", name_data)
+
+  checksum = 0
+
+  for code in codes:
+    # Rotate the running 8-bit value right by one bit, add the next byte,
+    # and keep only the low 8 bits of the sum.
+    rotated = (checksum >> 1) | ((checksum & 1) << 7)
+    checksum = (rotated + code) & 0xFF
+
+  return struct.pack("B", checksum)
+
+class dentry(object):
+  "A directory entry"
+  def __init__(self, fs, attributes, shortname, ext, longname,
+      first_cluster, size):
+    """
+    fs: The fat() object for the image we're stored in.
+    attributes: The attribute flags for this dentry. See the ATTRIBUTE_ flags
+                above.
+    shortname: The short name stored in this dentry. Up to 8 characters, no
+               dots.
+    ext: The file extension stored in this dentry. Up to 3 characters, no
+         dots.
+    longname: The long file name stored in this dentry.
+    first_cluster: The first cluster in the cluster chain backing the file
+                   this dentry points to.
+    size: Size of the file this dentry points to. 0 for subdirectories.
+    """
+    self.fs = fs
+    self.attributes = attributes
+    self.shortname = shortname
+    self.ext = ext
+    self.longname = longname
+    self.first_cluster = first_cluster
+    self.size = size
+
+  def name(self):
+    "A friendly text file name for this dentry."
+    if self.longname:
+      return self.longname
+
+    if not self.ext or len(self.ext) == 0:
+      return self.shortname
+
+    return self.shortname + "." + self.ext
+
+  def __str__(self):
+    return self.name() + " (" + str(self.size) + \
+      " bytes @ " + str(self.first_cluster) + ")"
+
+  def is_directory(self):
+    "Return whether this dentry points to a directory."
+    return (self.attributes & ATTRIBUTE_SUBDIRECTORY) != 0
+
+  def open_file(self):
+    "Open the target of this dentry if it is a regular file."
+    assert not self.is_directory(), "Cannot open directory as file"
+    return fat_file(self.fs, self.first_cluster, self.size)
+
+  def open_directory(self):
+    "Open the target of this dentry if it is a directory."
+    assert self.is_directory(), "Cannot open file as directory"
+    return fat_dir(fat_file(self.fs, self.first_cluster))
+
+  def longname_records(self, checksum):
+    """
+    Get the longname records necessary to store this dentry's long name, as
+    32-byte strings in on-disk order. checksum is stored in every record.
+    """
+    if not self.longname:  # None or "": no long name, so no records.
+      return []
+
+    encoded_long_name = self.longname.encode('utf-16-le')
+    # Pad only up to the next 26-byte record boundary; a name that already
+    # fills its last record exactly must not gain an extra all-NUL record.
+    long_name_padding = "\0" * (-len(encoded_long_name) % 26)
+    padded_long_name = encoded_long_name + long_name_padding
+
+    chunks = [padded_long_name[i:i+26] for i in range(0,
+      len(padded_long_name), 26)]
+    records = []
+    sequence_number = 1
+
+    for c in chunks:
+      sequence_byte = struct.pack("B", sequence_number)
+      sequence_number += 1
+      record = sequence_byte + c[:10] + LFN_ATTRIBUTES_BYTE + "\0" + \
+          checksum + c[10:22] + "\0\0" + c[22:]
+      records.append(record)
+
+    # Flag the record with the highest sequence number, then emit it first.
+    last = records.pop()
+    last_seq = struct.unpack("B", last[0])[0] | 0x40
+    records.append(struct.pack("B", last_seq) + last[1:])
+    records.reverse()
+
+    return records
+
+  def commit(self, f):
+    """
+    Write this dentry (long name records first) into the first run of free
+    records in f, a file-like view of a FAT directory, growing f if needed.
+    """
+    f.seek(0)
+    padded_short_name = self.shortname.ljust(8)
+    padded_ext = self.ext.ljust(3)
+    name_data = padded_short_name + padded_ext
+    longname_record_data = self.longname_records(lfn_checksum(name_data))
+    record = struct.pack("<11sBBBHHHHHHHL",
+        name_data,
+        self.attributes,
+        0,
+        0,
+        0,
+        0,
+        0,
+        0,
+        0,
+        0,
+        self.first_cluster,
+        self.size)
+    entry = "".join(longname_record_data + [record])
+    record_count = len(longname_record_data) + 1
+    # Look for record_count consecutive free (deleted or unused) records.
+    found_count = 0
+    while True:
+      record = f.read(32)
+      if record is None or len(record) != 32:
+        break
+      marker = struct.unpack("B", record[0])[0]
+      if marker == DEL_MARKER or marker == 0:
+        found_count += 1
+        if found_count == record_count:
+          break
+      else:
+        found_count = 0
+
+    if found_count != record_count:
+      # Not enough room: append one zeroed cluster. A view that cannot grow
+      # (the root directory) keeps its size, so report that instead.
+      size_before = f.size
+      f.write("\0" * self.fs.bytes_per_cluster)
+      if f.size == size_before:
+        raise IOError("Directory is full")
+      # Start at the trailing free records, if any, so that no zero (end of
+      # directory) record is left in front of the new entry.
+      f.seek(-(self.fs.bytes_per_cluster + found_count * 32), os.SEEK_CUR)
+    else:
+      f.seek(-(record_count * 32), os.SEEK_CUR)
+    f.write(entry)
+
+class root_dentry_file(fake_file):
+  """
+  File-like view of the root directory, which occupies a fixed region right
+  after the FAT tables rather than a cluster chain, so fat_file cannot be used.
+  """
+  def __init__(self, fs):
+    self.fs = fs
+    self.size = fs.root_entries * 32
+    self.idx = 0
+
+  def read(self, count):
+    image = self.fs.f
+    image.seek(self.fs.data_start() + self.idx)
+
+    # Clamp the request so it stops at the end of the root directory.
+    count = min(count, self.size - self.idx)
+
+    chunk = image.read(count)
+    self.idx += len(chunk)
+    return chunk
+
+  def write(self, data):
+    image = self.fs.f
+    image.seek(self.fs.data_start() + self.idx)
+
+    # Anything that would spill past the end of the region is dropped.
+    data = data[:self.size - self.idx]
+
+    image.write(data)
+    self.idx += len(data)
+    # The truncation above keeps idx within size; the bound is kept anyway.
+    self.size = max(self.size, self.idx)
+
+class fat(object):
+  "A FAT image"
+
+  def __init__(self, path):
+    """
+    path: Path to an image file containing a FAT file system.
+    """
+    f = open(path, "r+b")
+
+    self.f = f
+
+    f.seek(0xb)  # The header fields read below start at byte 11 of the image.
+    bytes_per_sector = read_le_short(f)
+    sectors_per_cluster = read_byte(f)
+
+    self.bytes_per_cluster = bytes_per_sector * sectors_per_cluster
+
+    reserved_sectors = read_le_short(f)
+    assert reserved_sectors == 1, \
+        "Can only handle FAT with 1 reserved sector"
+
+    fat_count = read_byte(f)
+    assert fat_count == 2, "Can only handle FAT with 2 tables"
+
+    self.root_entries = read_le_short(f)
+
+    skip_short(f) # Image size. Sort of. Useless field.
+    skip_byte(f) # Media type. We don't care.
+
+    self.fat_size = read_le_short(f) * bytes_per_sector
+    self.root = fat_dir(root_dentry_file(self))
+
+  def data_start(self):
+    """
+    Index of the first byte after the FAT tables.
+    """
+    return FAT_TABLE_START + self.fat_size * 2
+
+  def get_chain_size(self, head_cluster):
+    """
+    Return how many total bytes are in the cluster chain rooted at the given
+    cluster.
+    """
+    if head_cluster == 0:
+      return 0
+
+    f = self.f
+    f.seek(FAT_TABLE_START + head_cluster * 2)
+
+    cluster_count = 0
+
+    while head_cluster <= MAX_CLUSTER_ID:
+      cluster_count += 1
+      head_cluster = read_le_short(f)
+      f.seek(FAT_TABLE_START + head_cluster * 2)
+
+    return cluster_count * self.bytes_per_cluster
+
+  def read_dentry(self, f=None):
+    """
+    Read and decode a dentry from the given file-like object at its current
+    seek position.
+    """
+    f = f or self.f
+    attributes = None
+
+    consumed = 1
+
+    lfn_entries = {}
+
+    while True:
+      skip_bytes(f, 11)
+      attributes = read_byte(f)
+      rewind_bytes(f, 12)
+
+      if attributes & LFN_ATTRIBUTES != LFN_ATTRIBUTES:
+        break
+
+      consumed += 1
+
+      seq = read_byte(f)
+      chars = f.read(10)
+      skip_bytes(f, 3) # Attribute byte, a zero byte and the name checksum
+      chars += f.read(12)
+      skip_short(f) # Two zero bytes between the name pieces
+      chars += f.read(4)
+
+      chars = unicode(chars, "utf-16-le").encode("utf-8")
+
+      lfn_entries[seq] = chars
+
+    ind = read_byte(f)
+
+    if ind == 0 or ind == DEL_MARKER:
+      skip_bytes(f, 31)
+      return (None, consumed)
+
+    if ind == ESCAPE_DEL_MARKER:
+      ind = DEL_MARKER
+
+    ind = str(unichr(ind))
+
+    if ind == '.':
+      skip_bytes(f, 31)
+      return (None, consumed)
+
+    shortname = ind + f.read(7).rstrip()
+    ext = f.read(3).rstrip()
+    skip_bytes(f, 15) # Assorted flags, ctime/atime/mtime, etc.
+    first_cluster = read_le_short(f)
+    size = read_le_long(f)
+
+    lfn = lfn_entries.items()
+    lfn.sort(key=lambda x: x[0])
+    lfn = reduce(lambda x, y: x + y[1], lfn, "")
+
+    if len(lfn) == 0:
+      lfn = None
+    else:
+      lfn = lfn.split('\0', 1)[0]
+
+    return (dentry(self, attributes, shortname, ext, lfn, first_cluster,
+      size), consumed)
+
+  def read_file(self, head_cluster, start_byte, size):
+    """
+    Read from a given FAT file.
+    head_cluster: The first cluster in the file.
+    start_byte: How many bytes in to the file to begin the read.
+    size: How many bytes to read.
+    """
+    f = self.f
+
+    assert size >= 0, "Can't read a negative amount"
+    if size == 0:
+      return ""
+
+    got_data = ""
+
+    while True:
+      size_now = size
+      if start_byte + size > self.bytes_per_cluster:
+        size_now = self.bytes_per_cluster - start_byte
+
+      if start_byte < self.bytes_per_cluster:
+        size -= size_now
+
+        cluster_bytes_from_root = (head_cluster - 2) * \
+            self.bytes_per_cluster
+        bytes_from_root = cluster_bytes_from_root + start_byte
+        bytes_from_data_start = bytes_from_root + self.root_entries * 32
+
+        f.seek(self.data_start() + bytes_from_data_start)
+        line = f.read(size_now)
+        got_data += line
+
+        if size == 0:
+          return got_data
+
+      start_byte -= self.bytes_per_cluster
+
+      if start_byte < 0:
+        start_byte = 0
+
+      f.seek(FAT_TABLE_START + head_cluster * 2)
+      assert head_cluster <= MAX_CLUSTER_ID, "Out-of-bounds read"
+      head_cluster = read_le_short(f)
+      assert head_cluster > 0, "Read free cluster"
+
+    return got_data
+
+  def write_cluster_entry(self, entry):
+    """
+    Write a cluster entry to the FAT table. Assumes our backing file is already
+    seeked to the correct entry in the first FAT table.
+    """
+    f = self.f
+    f.write(struct.pack("<H", entry))
+    skip_bytes(f, self.fat_size - 2)
+    f.write(struct.pack("<H", entry))
+    rewind_bytes(f, self.fat_size)
+
+  def allocate(self, amount):
+    """
+    Allocate a new cluster chain big enough to hold at least the given amount
+    of bytes.
+    """
+    f = self.f
+    f.seek(FAT_TABLE_START + 4)
+
+    current = None
+    current_size = 0
+    free_zones = {}
+
+    pos = 2
+    while pos < self.fat_size // 2:
+      data = read_le_short(f)
+
+      if data == 0 and current is not None:
+        current_size += 1
+      elif data == 0:
+        current = pos
+        current_size = 1
+      elif current is not None:
+        free_zones[current] = current_size
+        current = None
+
+      pos += 1
+
+    if current is not None:
+      free_zones[current] = current_size
+
+    free_zones = free_zones.items()
+    free_zones.sort(key=lambda x: x[1])
+
+    grabbed_zones = []
+    grabbed = 0
+
+    while grabbed < amount and len(free_zones) > 0:
+      zone = free_zones.pop()
+      grabbed += zone[1] * self.bytes_per_cluster
+      grabbed_zones.append(zone)
+
+    if grabbed < amount:
+      return None
+
+    excess = (grabbed - amount) // self.bytes_per_cluster
+
+    grabbed_zones[-1] = (grabbed_zones[-1][0],
+        grabbed_zones[-1][1] - excess)
+
+    out = None
+    grabbed_zones.reverse()
+
+    for cluster, size in grabbed_zones:
+      entries = range(cluster + 1, cluster + size)
+      entries.append(out or 0xFFFF)
+      out = cluster
+      f.seek(FAT_TABLE_START + cluster * 2)
+      for entry in entries:
+        self.write_cluster_entry(entry)
+
+    return out
+
+  def extend_cluster(self, cluster, amount):
+    """
+    Given a cluster which is the *last* cluster in a chain, extend it to hold
+    at least `amount` more bytes. The added clusters come from allocate() and
+    need not be adjacent to `cluster`.
+
+    cluster: The current last cluster of the chain. Its FAT entry must be
+             the end-of-chain marker 0xFFFF.
+    amount: How many more bytes the chain has to be able to hold.
+
+    Returns the first cluster of the newly linked part of the chain.
+    Raises IOError when allocate() cannot find enough free clusters.
+    """
+    f = self.f
+    entry_offset = FAT_TABLE_START + cluster * 2
+
+    f.seek(entry_offset)
+    assert read_le_short(f) == 0xFFFF, "Extending from middle of chain"
+
+    # allocate() hands back a complete chain that is just long enough and is
+    # already terminated. Do not link neighbouring free clusters by hand
+    # here: that has to stop as soon as `amount` bytes are covered and has to
+    # terminate the chain before allocate() scans the table again, or the
+    # file ends up owning every free cluster that happens to follow it.
+    new_chunk = self.allocate(amount)
+
+    if new_chunk is None:
+      raise IOError("No space left in image to extend cluster chain")
+
+    # The old tail still holds 0xFFFF; point it at the head of the new
+    # clusters. write_cluster_entry() updates both copies of the FAT.
+    f.seek(entry_offset)
+    self.write_cluster_entry(new_chunk)
+
+    # write_file() carries on writing into the cluster returned here, so it
+    # must be the first cluster of the part that was just linked in.
+    return new_chunk
+
+  def write_file(self, head_cluster, start_byte, data):
+    """
+    Write to a given FAT file.
+
+    head_cluster: The first cluster in the file.
+    start_byte: How many bytes in to the file to begin the write.
+    data: The data to write.
+    """
+    f = self.f
+
+    while True:
+      if start_byte < self.bytes_per_cluster:
+        to_write = data[:self.bytes_per_cluster - start_byte]
+        data = data[self.bytes_per_cluster - start_byte:]
+
+        cluster_bytes_from_root = (head_cluster - 2) * \
+            self.bytes_per_cluster
+        bytes_from_root = cluster_bytes_from_root + start_byte
+        bytes_from_data_start = bytes_from_root + self.root_entries * 32
+
+        f.seek(self.data_start() + bytes_from_data_start)
+        f.write(to_write)
+
+        if len(data) == 0:
+          return
+
+      start_byte -= self.bytes_per_cluster
+
+      if start_byte < 0:
+        start_byte = 0
+
+      f.seek(FAT_TABLE_START + head_cluster * 2)
+      next_cluster = read_le_short(f)
+      if next_cluster > MAX_CLUSTER_ID:
+        head_cluster = self.extend_cluster(head_cluster, len(data))
+      else:
+        head_cluster = next_cluster
+      assert head_cluster > 0, "Cannot write free cluster"
+
+def add_item(directory, item):
+  """
+  Copy a file into the given FAT directory. If the path given is a directory,
+  copy recursively.
+  directory: fat_dir to copy the file in to
+  item: Path of local file or directory to copy
+  """
+  if os.path.isdir(item):
+    # Drop every trailing separator first, so that "dir/" and "dir//" both
+    # produce the name "dir" instead of an empty directory name.
+    base = os.path.basename(item.rstrip(os.sep))
+    sub = directory.new_subdirectory(base)
+    for next_item in os.listdir(item):
+      add_item(sub, os.path.join(item, next_item))
+  else:
+    with open(item, 'rb') as f:
+      directory.new_file(os.path.basename(item), f)
+
+if __name__ == "__main__":
+  # An image path and at least one item to copy are required.
+  if len(sys.argv) < 3:
+    print("Usage: fat16copy.py <image> <file> [<file> ...]")
+    print("Files are copied into the root of the image.")
+    print("Directories are copied recursively")
+    sys.exit(1)
+  image_path, items = sys.argv[1], sys.argv[2:]
+  root = fat(image_path).root
+  for p in items:
+    add_item(root, p)