update_payload: import cros_payload_unittest.py as is

import from chromite/cli/cros/cros_payload_unittest.py

BUG=chromium:865845
TEST=none

Change-Id: I4b1d0a65e1e0d19cd230e90651075afad26eded2
Reviewed-on: https://chromium-review.googlesource.com/1144642
Commit-Ready: Amin Hassani <[email protected]>
Tested-by: Amin Hassani <[email protected]>
Reviewed-by: Xiaochu Liu <[email protected]>
diff --git a/scripts/payload_info_unittest.py b/scripts/payload_info_unittest.py
new file mode 100755
index 0000000..d0b43f7
--- /dev/null
+++ b/scripts/payload_info_unittest.py
@@ -0,0 +1,369 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""This module tests the cros payload command."""
+
+from __future__ import print_function
+
+import collections
+import sys
+
+from chromite.lib import constants
+from chromite.cli.cros import cros_payload
+from chromite.lib import cros_test_lib
+
+# Needed for the update_payload import below.
+sys.path.insert(0, constants.UPDATE_ENGINE_SCRIPTS_PATH)
+
+# TODO(alliewood)(chromium:454629) update once update_payload is moved
+# into chromite
+import update_payload
+from update_payload import update_metadata_pb2
+
+class FakePayloadError(Exception):
+  """A generic error when using the FakePayload."""
+
+class FakeOption(object):
+  """Fake options object for testing."""
+
+  def __init__(self, **kwargs):
+    self.list_ops = False
+    self.stats = False
+    self.signatures = False
+    for key, val in kwargs.iteritems():
+      setattr(self, key, val)
+    if not hasattr(self, 'payload_file'):
+      self.payload_file = None
+
+class FakeOp(object):
+  """Fake manifest operation for testing."""
+
+  def __init__(self, src_extents, dst_extents, op_type, **kwargs):
+    self.src_extents = src_extents
+    self.dst_extents = dst_extents
+    self.type = op_type
+    for key, val in kwargs.iteritems():
+      setattr(self, key, val)
+
+  def HasField(self, field):
+    return hasattr(self, field)
+
+class FakePartition(object):
+  """Fake PartitionUpdate field for testing."""
+
+  def __init__(self, partition_name, operations):
+    self.partition_name = partition_name
+    self.operations = operations
+
+class FakeManifest(object):
+  """Fake manifest for testing."""
+
+  def __init__(self, major_version):
+    FakeExtent = collections.namedtuple('FakeExtent',
+                                        ['start_block', 'num_blocks'])
+    self.install_operations = [FakeOp([],
+                                      [FakeExtent(1, 1), FakeExtent(2, 2)],
+                                      update_payload.common.OpType.REPLACE_BZ,
+                                      dst_length=3*4096,
+                                      data_offset=1,
+                                      data_length=1)]
+    self.kernel_install_operations = [FakeOp(
+        [FakeExtent(1, 1)],
+        [FakeExtent(x, x) for x in xrange(20)],
+        update_payload.common.OpType.SOURCE_COPY,
+        src_length=4096)]
+    if major_version == cros_payload.MAJOR_PAYLOAD_VERSION_BRILLO:
+      self.partitions = [FakePartition('rootfs', self.install_operations),
+                         FakePartition('kernel',
+                                       self.kernel_install_operations)]
+      self.install_operations = self.kernel_install_operations = []
+    self.block_size = 4096
+    self.minor_version = 4
+    FakePartInfo = collections.namedtuple('FakePartInfo', ['size'])
+    self.old_rootfs_info = FakePartInfo(1 * 4096)
+    self.old_kernel_info = FakePartInfo(2 * 4096)
+    self.new_rootfs_info = FakePartInfo(3 * 4096)
+    self.new_kernel_info = FakePartInfo(4 * 4096)
+    self.signatures_offset = None
+    self.signatures_size = None
+
+  def HasField(self, field_name):
+    """Fake HasField method based on the python members."""
+    return hasattr(self, field_name) and getattr(self, field_name) is not None
+
+class FakeHeader(object):
+  """Fake payload header for testing."""
+
+  def __init__(self, version, manifest_len, metadata_signature_len):
+    self.version = version
+    self.manifest_len = manifest_len
+    self.metadata_signature_len = metadata_signature_len
+
+  @property
+  def size(self):
+    return (20 if self.version == cros_payload.MAJOR_PAYLOAD_VERSION_CHROMEOS
+            else 24)
+
+
+class FakePayload(object):
+  """Fake payload for testing."""
+
+  def __init__(self, major_version):
+    self._header = FakeHeader(major_version, 222, 0)
+    self.header = None
+    self._manifest = FakeManifest(major_version)
+    self.manifest = None
+
+    self._blobs = {}
+    self._payload_signatures = update_metadata_pb2.Signatures()
+    self._metadata_signatures = update_metadata_pb2.Signatures()
+
+  def Init(self):
+    """Fake Init that sets header and manifest.
+
+    Failing to call Init() will not make header and manifest available to the
+    test.
+    """
+    self.header = self._header
+    self.manifest = self._manifest
+
+  def ReadDataBlob(self, offset, length):
+    """Return the blob that should be present at the offset location"""
+    if not offset in self._blobs:
+      raise FakePayloadError('Requested blob at unknown offset %d' % offset)
+    blob = self._blobs[offset]
+    if len(blob) != length:
+      raise FakePayloadError('Read blob with the wrong length (expect: %d, '
+                             'actual: %d)' % (len(blob), length))
+    return blob
+
+  @staticmethod
+  def _AddSignatureToProto(proto, **kwargs):
+    """Add a new Signature element to the passed proto."""
+    new_signature = proto.signatures.add()
+    for key, val in kwargs.iteritems():
+      setattr(new_signature, key, val)
+
+  def AddPayloadSignature(self, **kwargs):
+    self._AddSignatureToProto(self._payload_signatures, **kwargs)
+    blob = self._payload_signatures.SerializeToString()
+    self._manifest.signatures_offset = 1234
+    self._manifest.signatures_size = len(blob)
+    self._blobs[self._manifest.signatures_offset] = blob
+
+  def AddMetadataSignature(self, **kwargs):
+    self._AddSignatureToProto(self._metadata_signatures, **kwargs)
+    if self._header.metadata_signature_len:
+      del self._blobs[-self._header.metadata_signature_len]
+    blob = self._metadata_signatures.SerializeToString()
+    self._header.metadata_signature_len = len(blob)
+    self._blobs[-len(blob)] = blob
+
+
+class PayloadCommandTest(cros_test_lib.MockOutputTestCase):
+  """Test class for our PayloadCommand class."""
+
+  def testDisplayValue(self):
+    """Verify that DisplayValue prints what we expect."""
+    with self.OutputCapturer() as output:
+      cros_payload.DisplayValue('key', 'value')
+    stdout = output.GetStdout()
+    self.assertEquals(stdout, 'key:                     value\n')
+
+  def testRun(self):
+    """Verify that Run parses and displays the payload like we expect."""
+    payload_cmd = cros_payload.PayloadCommand(FakeOption(action='show'))
+    self.PatchObject(update_payload, 'Payload', return_value=FakePayload(
+        cros_payload.MAJOR_PAYLOAD_VERSION_CHROMEOS))
+
+    with self.OutputCapturer() as output:
+      payload_cmd.Run()
+
+    stdout = output.GetStdout()
+    expected_out = """Payload version:         1
+Manifest length:         222
+Number of operations:    1
+Number of kernel ops:    1
+Block size:              4096
+Minor version:           4
+"""
+    self.assertEquals(stdout, expected_out)
+
+  def testListOpsOnVersion1(self):
+    """Verify that the --list_ops option gives the correct output."""
+    payload_cmd = cros_payload.PayloadCommand(FakeOption(list_ops=True,
+                                                         action='show'))
+    self.PatchObject(update_payload, 'Payload', return_value=FakePayload(
+        cros_payload.MAJOR_PAYLOAD_VERSION_CHROMEOS))
+
+    with self.OutputCapturer() as output:
+      payload_cmd.Run()
+
+    stdout = output.GetStdout()
+    expected_out = """Payload version:         1
+Manifest length:         222
+Number of operations:    1
+Number of kernel ops:    1
+Block size:              4096
+Minor version:           4
+
+Install operations:
+  0: REPLACE_BZ
+    Data offset: 1
+    Data length: 1
+    Destination: 2 extents (3 blocks)
+      (1,1) (2,2)
+Kernel install operations:
+  0: SOURCE_COPY
+    Source: 1 extent (1 block)
+      (1,1)
+    Destination: 20 extents (190 blocks)
+      (0,0) (1,1) (2,2) (3,3) (4,4) (5,5) (6,6) (7,7) (8,8) (9,9) (10,10)
+      (11,11) (12,12) (13,13) (14,14) (15,15) (16,16) (17,17) (18,18) (19,19)
+"""
+    self.assertEquals(stdout, expected_out)
+
+  def testListOpsOnVersion2(self):
+    """Verify that the --list_ops option gives the correct output."""
+    payload_cmd = cros_payload.PayloadCommand(FakeOption(list_ops=True,
+                                                         action='show'))
+    self.PatchObject(update_payload, 'Payload', return_value=FakePayload(
+        cros_payload.MAJOR_PAYLOAD_VERSION_BRILLO))
+
+    with self.OutputCapturer() as output:
+      payload_cmd.Run()
+
+    stdout = output.GetStdout()
+    expected_out = """Payload version:         2
+Manifest length:         222
+Number of partitions:    2
+  Number of "rootfs" ops: 1
+  Number of "kernel" ops: 1
+Block size:              4096
+Minor version:           4
+
+rootfs install operations:
+  0: REPLACE_BZ
+    Data offset: 1
+    Data length: 1
+    Destination: 2 extents (3 blocks)
+      (1,1) (2,2)
+kernel install operations:
+  0: SOURCE_COPY
+    Source: 1 extent (1 block)
+      (1,1)
+    Destination: 20 extents (190 blocks)
+      (0,0) (1,1) (2,2) (3,3) (4,4) (5,5) (6,6) (7,7) (8,8) (9,9) (10,10)
+      (11,11) (12,12) (13,13) (14,14) (15,15) (16,16) (17,17) (18,18) (19,19)
+"""
+    self.assertEquals(stdout, expected_out)
+
+  def testStatsOnVersion1(self):
+    """Verify that the --stats option works correctly."""
+    payload_cmd = cros_payload.PayloadCommand(FakeOption(stats=True,
+                                                         action='show'))
+    self.PatchObject(update_payload, 'Payload', return_value=FakePayload(
+        cros_payload.MAJOR_PAYLOAD_VERSION_CHROMEOS))
+
+    with self.OutputCapturer() as output:
+      payload_cmd.Run()
+
+    stdout = output.GetStdout()
+    expected_out = """Payload version:         1
+Manifest length:         222
+Number of operations:    1
+Number of kernel ops:    1
+Block size:              4096
+Minor version:           4
+Blocks read:             11
+Blocks written:          193
+Seeks when writing:      18
+"""
+    self.assertEquals(stdout, expected_out)
+
+  def testStatsOnVersion2(self):
+    """Verify that the --stats option works correctly on version 2."""
+    payload_cmd = cros_payload.PayloadCommand(FakeOption(stats=True,
+                                                         action='show'))
+    self.PatchObject(update_payload, 'Payload', return_value=FakePayload(
+        cros_payload.MAJOR_PAYLOAD_VERSION_BRILLO))
+
+    with self.OutputCapturer() as output:
+      payload_cmd.Run()
+
+    stdout = output.GetStdout()
+    expected_out = """Payload version:         2
+Manifest length:         222
+Number of partitions:    2
+  Number of "rootfs" ops: 1
+  Number of "kernel" ops: 1
+Block size:              4096
+Minor version:           4
+Blocks read:             11
+Blocks written:          193
+Seeks when writing:      18
+"""
+    self.assertEquals(stdout, expected_out)
+
+  def testEmptySignatures(self):
+    """Verify that the --signatures option works with unsigned payloads."""
+    payload_cmd = cros_payload.PayloadCommand(
+        FakeOption(action='show', signatures=True))
+    self.PatchObject(update_payload, 'Payload', return_value=FakePayload(
+        cros_payload.MAJOR_PAYLOAD_VERSION_CHROMEOS))
+
+    with self.OutputCapturer() as output:
+      payload_cmd.Run()
+
+    stdout = output.GetStdout()
+    expected_out = """Payload version:         1
+Manifest length:         222
+Number of operations:    1
+Number of kernel ops:    1
+Block size:              4096
+Minor version:           4
+No metadata signatures stored in the payload
+No payload signatures stored in the payload
+"""
+    self.assertEquals(stdout, expected_out)
+
+
+  def testSignatures(self):
+    """Verify that the --signatures option shows the present signatures."""
+    payload_cmd = cros_payload.PayloadCommand(
+        FakeOption(action='show', signatures=True))
+    payload = FakePayload(cros_payload.MAJOR_PAYLOAD_VERSION_BRILLO)
+    payload.AddPayloadSignature(version=1,
+                                data='12345678abcdefgh\x00\x01\x02\x03')
+    payload.AddPayloadSignature(data='I am a signature so access is yes.')
+    payload.AddMetadataSignature(data='\x00\x0a\x0c')
+    self.PatchObject(update_payload, 'Payload', return_value=payload)
+
+    with self.OutputCapturer() as output:
+      payload_cmd.Run()
+
+    stdout = output.GetStdout()
+    expected_out = """Payload version:         2
+Manifest length:         222
+Number of partitions:    2
+  Number of "rootfs" ops: 1
+  Number of "kernel" ops: 1
+Block size:              4096
+Minor version:           4
+Metadata signatures blob: file_offset=246 (7 bytes)
+Metadata signatures: (1 entries)
+  version=None, hex_data: (3 bytes)
+    00 0a 0c                                        | ...
+Payload signatures blob: blob_offset=1234 (64 bytes)
+Payload signatures: (2 entries)
+  version=1, hex_data: (20 bytes)
+    31 32 33 34 35 36 37 38 61 62 63 64 65 66 67 68 | 12345678abcdefgh
+    00 01 02 03                                     | ....
+  version=None, hex_data: (34 bytes)
+    49 20 61 6d 20 61 20 73 69 67 6e 61 74 75 72 65 | I am a signature
+    20 73 6f 20 61 63 63 65 73 73 20 69 73 20 79 65 |  so access is ye
+    73 2e                                           | s.
+"""
+    self.assertEquals(stdout, expected_out)