| # Copyright (c) 2014 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """A module containing rootfs handler class.""" |
| |
| import os |
| import re |
| |
| TMP_FILE_NAME = 'kernel_dump' |
| |
| _KERNEL_MAP = {'A': '2', 'B': '4'} |
| _ROOTFS_MAP = {'A': '3', 'B': '5'} |
| _DM_DEVICE = 'verifyroot' |
| _DM_DEV_PATH = os.path.join('/dev/mapper', _DM_DEVICE) |
| |
| |
| class RootfsHandler(object): |
| """An object to provide ChromeOS root FS related actions. |
| |
| It provides functions to verify the integrity of the root FS. |
| |
| @type os_if: autotest_lib.client.cros.faft.utils.os_interface.OSInterface |
| """ |
| |
| def __init__(self, os_if): |
| self.os_if = os_if |
| self.root_dev = None |
| self.kernel_dump_file = None |
| self.initialized = False |
| |
| def verify_rootfs(self, section): |
| """Verifies the integrity of the root FS. |
| |
| @param section: The rootfs to verify. May be A or B. |
| """ |
| kernel_path = self.os_if.join_part(self.root_dev, |
| _KERNEL_MAP[section.upper()]) |
| rootfs_path = self.os_if.join_part(self.root_dev, |
| _ROOTFS_MAP[section.upper()]) |
| # vbutil_kernel won't operate on a device, only a file. |
| self.os_if.run_shell_command( |
| 'dd if=%s of=%s' % (kernel_path, self.kernel_dump_file)) |
| vbutil_kernel = self.os_if.run_shell_command_get_output( |
| 'vbutil_kernel --verify %s --verbose' % self.kernel_dump_file) |
| DM_REGEXP = re.compile( |
| r'dm="(?:1 )?vroot none ro(?: 1)?,(0 (\d+) .+)"') |
| match = DM_REGEXP.search('\n'.join(vbutil_kernel)) |
| if not match: |
| return False |
| |
| table = match.group(1) |
| partition_size = int(match.group(2)) * 512 |
| |
| if table.find('PARTUUID=%U/PARTNROFF=1') < 0: |
| return False |
| table = table.replace('PARTUUID=%U/PARTNROFF=1', rootfs_path) |
| # Cause I/O error on invalid bytes |
| table += ' error_behavior=eio' |
| |
| self._remove_mapper() |
| assert not self.os_if.path_exists(_DM_DEV_PATH) |
| self.os_if.run_shell_command( |
| "dmsetup create -r %s --table '%s'" % (_DM_DEVICE, table)) |
| assert self.os_if.path_exists(_DM_DEV_PATH) |
| try: |
| count = self.os_if.get_file_size(_DM_DEV_PATH) |
| return count == partition_size |
| except: |
| return False |
| finally: |
| self._remove_mapper() |
| |
| def _remove_mapper(self): |
| """Removes the dm device mapper used by this class.""" |
| if self.os_if.path_exists(_DM_DEV_PATH): |
| self.os_if.run_shell_command_get_output( |
| 'dmsetup remove %s' % _DM_DEVICE) |
| |
| def init(self): |
| """Initialize the rootfs handler object.""" |
| self.root_dev = self.os_if.get_root_dev() |
| self.kernel_dump_file = self.os_if.state_dir_file(TMP_FILE_NAME) |
| self.initialized = True |