update_payload: import cros_payload_unittest.py as is
import from chromite/cli/cros/cros_payload_unittest.py
BUG=chromium:865845
TEST=none
Change-Id: I4b1d0a65e1e0d19cd230e90651075afad26eded2
Reviewed-on: https://chromium-review.googlesource.com/1144642
Commit-Ready: Amin Hassani <[email protected]>
Tested-by: Amin Hassani <[email protected]>
Reviewed-by: Xiaochu Liu <[email protected]>
diff --git a/scripts/payload_info_unittest.py b/scripts/payload_info_unittest.py
new file mode 100755
index 0000000..d0b43f7
--- /dev/null
+++ b/scripts/payload_info_unittest.py
@@ -0,0 +1,369 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""This module tests the cros payload command."""
+
+from __future__ import print_function
+
+import collections
+import sys
+
+from chromite.lib import constants
+from chromite.cli.cros import cros_payload
+from chromite.lib import cros_test_lib
+
+# Needed for the update_payload import below.
+sys.path.insert(0, constants.UPDATE_ENGINE_SCRIPTS_PATH)
+
+# TODO(alliewood)(chromium:454629) update once update_payload is moved
+# into chromite
+import update_payload
+from update_payload import update_metadata_pb2
+
+class FakePayloadError(Exception):
+ """A generic error when using the FakePayload."""
+
+class FakeOption(object):
+ """Fake options object for testing."""
+
+ def __init__(self, **kwargs):
+ self.list_ops = False
+ self.stats = False
+ self.signatures = False
+ for key, val in kwargs.iteritems():
+ setattr(self, key, val)
+ if not hasattr(self, 'payload_file'):
+ self.payload_file = None
+
+class FakeOp(object):
+ """Fake manifest operation for testing."""
+
+ def __init__(self, src_extents, dst_extents, op_type, **kwargs):
+ self.src_extents = src_extents
+ self.dst_extents = dst_extents
+ self.type = op_type
+ for key, val in kwargs.iteritems():
+ setattr(self, key, val)
+
+ def HasField(self, field):
+ return hasattr(self, field)
+
+class FakePartition(object):
+ """Fake PartitionUpdate field for testing."""
+
+ def __init__(self, partition_name, operations):
+ self.partition_name = partition_name
+ self.operations = operations
+
+class FakeManifest(object):
+ """Fake manifest for testing."""
+
+ def __init__(self, major_version):
+ FakeExtent = collections.namedtuple('FakeExtent',
+ ['start_block', 'num_blocks'])
+ self.install_operations = [FakeOp([],
+ [FakeExtent(1, 1), FakeExtent(2, 2)],
+ update_payload.common.OpType.REPLACE_BZ,
+ dst_length=3*4096,
+ data_offset=1,
+ data_length=1)]
+ self.kernel_install_operations = [FakeOp(
+ [FakeExtent(1, 1)],
+ [FakeExtent(x, x) for x in xrange(20)],
+ update_payload.common.OpType.SOURCE_COPY,
+ src_length=4096)]
+ if major_version == cros_payload.MAJOR_PAYLOAD_VERSION_BRILLO:
+ self.partitions = [FakePartition('rootfs', self.install_operations),
+ FakePartition('kernel',
+ self.kernel_install_operations)]
+ self.install_operations = self.kernel_install_operations = []
+ self.block_size = 4096
+ self.minor_version = 4
+ FakePartInfo = collections.namedtuple('FakePartInfo', ['size'])
+ self.old_rootfs_info = FakePartInfo(1 * 4096)
+ self.old_kernel_info = FakePartInfo(2 * 4096)
+ self.new_rootfs_info = FakePartInfo(3 * 4096)
+ self.new_kernel_info = FakePartInfo(4 * 4096)
+ self.signatures_offset = None
+ self.signatures_size = None
+
+ def HasField(self, field_name):
+ """Fake HasField method based on the python members."""
+ return hasattr(self, field_name) and getattr(self, field_name) is not None
+
+class FakeHeader(object):
+ """Fake payload header for testing."""
+
+ def __init__(self, version, manifest_len, metadata_signature_len):
+ self.version = version
+ self.manifest_len = manifest_len
+ self.metadata_signature_len = metadata_signature_len
+
+ @property
+ def size(self):
+ return (20 if self.version == cros_payload.MAJOR_PAYLOAD_VERSION_CHROMEOS
+ else 24)
+
+
+class FakePayload(object):
+ """Fake payload for testing."""
+
+ def __init__(self, major_version):
+ self._header = FakeHeader(major_version, 222, 0)
+ self.header = None
+ self._manifest = FakeManifest(major_version)
+ self.manifest = None
+
+ self._blobs = {}
+ self._payload_signatures = update_metadata_pb2.Signatures()
+ self._metadata_signatures = update_metadata_pb2.Signatures()
+
+ def Init(self):
+ """Fake Init that sets header and manifest.
+
+ Failing to call Init() will not make header and manifest available to the
+ test.
+ """
+ self.header = self._header
+ self.manifest = self._manifest
+
+ def ReadDataBlob(self, offset, length):
+ """Return the blob that should be present at the offset location"""
+ if not offset in self._blobs:
+ raise FakePayloadError('Requested blob at unknown offset %d' % offset)
+ blob = self._blobs[offset]
+ if len(blob) != length:
+ raise FakePayloadError('Read blob with the wrong length (expect: %d, '
+ 'actual: %d)' % (len(blob), length))
+ return blob
+
+ @staticmethod
+ def _AddSignatureToProto(proto, **kwargs):
+ """Add a new Signature element to the passed proto."""
+ new_signature = proto.signatures.add()
+ for key, val in kwargs.iteritems():
+ setattr(new_signature, key, val)
+
+ def AddPayloadSignature(self, **kwargs):
+ self._AddSignatureToProto(self._payload_signatures, **kwargs)
+ blob = self._payload_signatures.SerializeToString()
+ self._manifest.signatures_offset = 1234
+ self._manifest.signatures_size = len(blob)
+ self._blobs[self._manifest.signatures_offset] = blob
+
+ def AddMetadataSignature(self, **kwargs):
+ self._AddSignatureToProto(self._metadata_signatures, **kwargs)
+ if self._header.metadata_signature_len:
+ del self._blobs[-self._header.metadata_signature_len]
+ blob = self._metadata_signatures.SerializeToString()
+ self._header.metadata_signature_len = len(blob)
+ self._blobs[-len(blob)] = blob
+
+
+class PayloadCommandTest(cros_test_lib.MockOutputTestCase):
+ """Test class for our PayloadCommand class."""
+
+ def testDisplayValue(self):
+ """Verify that DisplayValue prints what we expect."""
+ with self.OutputCapturer() as output:
+ cros_payload.DisplayValue('key', 'value')
+ stdout = output.GetStdout()
+ self.assertEquals(stdout, 'key: value\n')
+
+ def testRun(self):
+ """Verify that Run parses and displays the payload like we expect."""
+ payload_cmd = cros_payload.PayloadCommand(FakeOption(action='show'))
+ self.PatchObject(update_payload, 'Payload', return_value=FakePayload(
+ cros_payload.MAJOR_PAYLOAD_VERSION_CHROMEOS))
+
+ with self.OutputCapturer() as output:
+ payload_cmd.Run()
+
+ stdout = output.GetStdout()
+ expected_out = """Payload version: 1
+Manifest length: 222
+Number of operations: 1
+Number of kernel ops: 1
+Block size: 4096
+Minor version: 4
+"""
+ self.assertEquals(stdout, expected_out)
+
+ def testListOpsOnVersion1(self):
+ """Verify that the --list_ops option gives the correct output."""
+ payload_cmd = cros_payload.PayloadCommand(FakeOption(list_ops=True,
+ action='show'))
+ self.PatchObject(update_payload, 'Payload', return_value=FakePayload(
+ cros_payload.MAJOR_PAYLOAD_VERSION_CHROMEOS))
+
+ with self.OutputCapturer() as output:
+ payload_cmd.Run()
+
+ stdout = output.GetStdout()
+ expected_out = """Payload version: 1
+Manifest length: 222
+Number of operations: 1
+Number of kernel ops: 1
+Block size: 4096
+Minor version: 4
+
+Install operations:
+ 0: REPLACE_BZ
+ Data offset: 1
+ Data length: 1
+ Destination: 2 extents (3 blocks)
+ (1,1) (2,2)
+Kernel install operations:
+ 0: SOURCE_COPY
+ Source: 1 extent (1 block)
+ (1,1)
+ Destination: 20 extents (190 blocks)
+ (0,0) (1,1) (2,2) (3,3) (4,4) (5,5) (6,6) (7,7) (8,8) (9,9) (10,10)
+ (11,11) (12,12) (13,13) (14,14) (15,15) (16,16) (17,17) (18,18) (19,19)
+"""
+ self.assertEquals(stdout, expected_out)
+
+ def testListOpsOnVersion2(self):
+ """Verify that the --list_ops option gives the correct output."""
+ payload_cmd = cros_payload.PayloadCommand(FakeOption(list_ops=True,
+ action='show'))
+ self.PatchObject(update_payload, 'Payload', return_value=FakePayload(
+ cros_payload.MAJOR_PAYLOAD_VERSION_BRILLO))
+
+ with self.OutputCapturer() as output:
+ payload_cmd.Run()
+
+ stdout = output.GetStdout()
+ expected_out = """Payload version: 2
+Manifest length: 222
+Number of partitions: 2
+ Number of "rootfs" ops: 1
+ Number of "kernel" ops: 1
+Block size: 4096
+Minor version: 4
+
+rootfs install operations:
+ 0: REPLACE_BZ
+ Data offset: 1
+ Data length: 1
+ Destination: 2 extents (3 blocks)
+ (1,1) (2,2)
+kernel install operations:
+ 0: SOURCE_COPY
+ Source: 1 extent (1 block)
+ (1,1)
+ Destination: 20 extents (190 blocks)
+ (0,0) (1,1) (2,2) (3,3) (4,4) (5,5) (6,6) (7,7) (8,8) (9,9) (10,10)
+ (11,11) (12,12) (13,13) (14,14) (15,15) (16,16) (17,17) (18,18) (19,19)
+"""
+ self.assertEquals(stdout, expected_out)
+
+ def testStatsOnVersion1(self):
+ """Verify that the --stats option works correctly."""
+ payload_cmd = cros_payload.PayloadCommand(FakeOption(stats=True,
+ action='show'))
+ self.PatchObject(update_payload, 'Payload', return_value=FakePayload(
+ cros_payload.MAJOR_PAYLOAD_VERSION_CHROMEOS))
+
+ with self.OutputCapturer() as output:
+ payload_cmd.Run()
+
+ stdout = output.GetStdout()
+ expected_out = """Payload version: 1
+Manifest length: 222
+Number of operations: 1
+Number of kernel ops: 1
+Block size: 4096
+Minor version: 4
+Blocks read: 11
+Blocks written: 193
+Seeks when writing: 18
+"""
+ self.assertEquals(stdout, expected_out)
+
+ def testStatsOnVersion2(self):
+ """Verify that the --stats option works correctly on version 2."""
+ payload_cmd = cros_payload.PayloadCommand(FakeOption(stats=True,
+ action='show'))
+ self.PatchObject(update_payload, 'Payload', return_value=FakePayload(
+ cros_payload.MAJOR_PAYLOAD_VERSION_BRILLO))
+
+ with self.OutputCapturer() as output:
+ payload_cmd.Run()
+
+ stdout = output.GetStdout()
+ expected_out = """Payload version: 2
+Manifest length: 222
+Number of partitions: 2
+ Number of "rootfs" ops: 1
+ Number of "kernel" ops: 1
+Block size: 4096
+Minor version: 4
+Blocks read: 11
+Blocks written: 193
+Seeks when writing: 18
+"""
+ self.assertEquals(stdout, expected_out)
+
+ def testEmptySignatures(self):
+ """Verify that the --signatures option works with unsigned payloads."""
+ payload_cmd = cros_payload.PayloadCommand(
+ FakeOption(action='show', signatures=True))
+ self.PatchObject(update_payload, 'Payload', return_value=FakePayload(
+ cros_payload.MAJOR_PAYLOAD_VERSION_CHROMEOS))
+
+ with self.OutputCapturer() as output:
+ payload_cmd.Run()
+
+ stdout = output.GetStdout()
+ expected_out = """Payload version: 1
+Manifest length: 222
+Number of operations: 1
+Number of kernel ops: 1
+Block size: 4096
+Minor version: 4
+No metadata signatures stored in the payload
+No payload signatures stored in the payload
+"""
+ self.assertEquals(stdout, expected_out)
+
+
+ def testSignatures(self):
+ """Verify that the --signatures option shows the present signatures."""
+ payload_cmd = cros_payload.PayloadCommand(
+ FakeOption(action='show', signatures=True))
+ payload = FakePayload(cros_payload.MAJOR_PAYLOAD_VERSION_BRILLO)
+ payload.AddPayloadSignature(version=1,
+ data='12345678abcdefgh\x00\x01\x02\x03')
+ payload.AddPayloadSignature(data='I am a signature so access is yes.')
+ payload.AddMetadataSignature(data='\x00\x0a\x0c')
+ self.PatchObject(update_payload, 'Payload', return_value=payload)
+
+ with self.OutputCapturer() as output:
+ payload_cmd.Run()
+
+ stdout = output.GetStdout()
+ expected_out = """Payload version: 2
+Manifest length: 222
+Number of partitions: 2
+ Number of "rootfs" ops: 1
+ Number of "kernel" ops: 1
+Block size: 4096
+Minor version: 4
+Metadata signatures blob: file_offset=246 (7 bytes)
+Metadata signatures: (1 entries)
+ version=None, hex_data: (3 bytes)
+ 00 0a 0c | ...
+Payload signatures blob: blob_offset=1234 (64 bytes)
+Payload signatures: (2 entries)
+ version=1, hex_data: (20 bytes)
+ 31 32 33 34 35 36 37 38 61 62 63 64 65 66 67 68 | 12345678abcdefgh
+ 00 01 02 03 | ....
+ version=None, hex_data: (34 bytes)
+ 49 20 61 6d 20 61 20 73 69 67 6e 61 74 75 72 65 | I am a signature
+ 20 73 6f 20 61 63 63 65 73 73 20 69 73 20 79 65 | so access is ye
+ 73 2e | s.
+"""
+ self.assertEquals(stdout, expected_out)