| #!/usr/bin/python2 |
| # Copyright 2017 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import itertools |
| import mock |
| import unittest |
| |
| import common |
| from autotest_lib.client.common_lib import error |
| from autotest_lib.client.common_lib import hosts |
| from autotest_lib.client.common_lib.cros import retry |
| from autotest_lib.server.hosts import cros_firmware |
| from autotest_lib.server.hosts import cros_repair |
| from autotest_lib.server.hosts import repair_utils |
| |
| |
| CROS_VERIFY_DAG = ( |
| (repair_utils.PingVerifier, 'ping', ()), |
| (repair_utils.SshVerifier, 'ssh', ('ping', )), |
| (cros_repair.ServoUSBDriveVerifier, 'usb_drive', ()), |
| (cros_repair.DevDefaultBootVerifier, 'dev_default_boot', ('ssh', )), |
| (cros_repair.DevModeVerifier, 'devmode', ('ssh', )), |
| (cros_repair.EnrollmentStateVerifier, 'enrollment_state', ('ssh', )), |
| (cros_repair.HWIDVerifier, 'hwid', ('ssh', )), |
| (cros_repair.ACPowerVerifier, 'power', ('ssh', )), |
| (cros_repair.EXT4fsErrorVerifier, 'ext4', ('ssh', )), |
| (cros_repair.WritableVerifier, 'writable', ('ssh', )), |
| (cros_repair.TPMStatusVerifier, 'tpm', ('ssh', )), |
| (cros_repair.UpdateSuccessVerifier, 'good_provision', ('ssh', )), |
| (cros_repair.FirmwareTpmVerifier, 'faft_tpm', ('ssh', )), |
| (cros_firmware.FirmwareStatusVerifier, 'fwstatus', ('ssh', )), |
| (cros_firmware.FirmwareVersionVerifier, 'rwfw', ('ssh', )), |
| (cros_repair.PythonVerifier, 'python', ('ssh', )), |
| (repair_utils.LegacyHostVerifier, 'cros', ('ssh', )), |
| (cros_repair.CrosVerisionVerifier, 'cros_version_label', ('ssh', )), |
| (cros_repair.StopStartUIVerifier, 'stop_start_ui', ('ssh', )), |
| (cros_repair.DUTStorageVerifier, 'storage', ('ssh', )), |
| ) |
| |
| CROS_REPAIR_ACTIONS = ( |
| (repair_utils.RPMCycleRepair, 'rpm', (), ( |
| 'ping', |
| 'ssh', |
| 'power', |
| )), |
| (cros_repair.ServoResetRepair, 'servoreset', (), ( |
| 'ping', |
| 'ssh', |
| 'stop_start_ui', |
| 'power', |
| )), |
| ( |
| cros_repair.ServoCr50RebootRepair, |
| 'cr50_reset', |
| (), |
| ('ping', 'ssh', 'stop_start_ui', 'power'), |
| ), |
| (cros_repair.ServoSysRqRepair, 'sysrq', (), ( |
| 'ping', |
| 'ssh', |
| )), |
| (cros_repair.LabelCleanupRepair, 'label_cleanup', ('ssh', ), |
| ('cros_version_label', )), |
| (cros_firmware.FaftFirmwareRepair, 'faft_firmware_repair', (), |
| ('ping', 'ssh', 'fwstatus', 'good_provision')), |
| (cros_repair.DevDefaultBootRepair, 'set_default_boot', ('ssh', ), |
| ('dev_default_boot', )), |
| (cros_repair.CrosRebootRepair, 'reboot', ('ssh', ), ( |
| 'devmode', |
| 'writable', |
| )), |
| (cros_repair.EnrollmentCleanupRepair, 'cleanup_enrollment', ('ssh', ), |
| ('enrollment_state', )), |
| (cros_repair.ProvisionRepair, 'provision', |
| ('ping', 'ssh', 'writable', 'stop_start_ui', 'tpm', 'good_provision', |
| 'ext4'), ('power', 'rwfw', 'fwstatus', 'python', 'hwid', 'cros', |
| 'dev_default_boot')), |
| (cros_repair.PowerWashRepair, 'powerwash', ('ping', 'ssh', 'writable', |
| 'stop_start_ui'), |
| ('tpm', 'good_provision', 'ext4', 'power', 'rwfw', 'fwstatus', |
| 'python', 'hwid', 'cros', 'dev_default_boot')), |
| (cros_repair.ServoInstallRepair, 'usb', ('usb_drive', ), |
| ('ping', 'ssh', 'writable', 'stop_start_ui', 'tpm', 'good_provision', |
| 'ext4', 'power', 'rwfw', 'fwstatus', 'python', 'hwid', 'cros', |
| 'dev_default_boot', 'faft_tpm')), |
| (cros_firmware.GeneralFirmwareRepair, 'general_firmware', |
| ('usb_drive', ), ( |
| 'ping', |
| 'ssh', |
| )), |
| ) |
| |
| MOBLAB_VERIFY_DAG = ( |
| (repair_utils.SshVerifier, 'ssh', ()), |
| (cros_repair.ACPowerVerifier, 'power', ('ssh',)), |
| (cros_repair.PythonVerifier, 'python', ('ssh',)), |
| (repair_utils.LegacyHostVerifier, 'cros', ('ssh',)), |
| ) |
| |
| MOBLAB_REPAIR_ACTIONS = ( |
| (repair_utils.RPMCycleRepair, 'rpm', (), ('ssh', 'power',)), |
| (cros_repair.ProvisionRepair, |
| 'provision', ('ssh',), ('power', 'python', 'cros',)), |
| ) |
| |
| JETSTREAM_VERIFY_DAG = ( |
| (repair_utils.PingVerifier, 'ping', ()), |
| (repair_utils.SshVerifier, 'ssh', ('ping', )), |
| (cros_repair.ServoUSBDriveVerifier, 'usb_drive', ()), |
| (cros_repair.DevDefaultBootVerifier, 'dev_default_boot', ('ssh', )), |
| (cros_repair.DevModeVerifier, 'devmode', ('ssh', )), |
| (cros_repair.EnrollmentStateVerifier, 'enrollment_state', ('ssh', )), |
| (cros_repair.HWIDVerifier, 'hwid', ('ssh', )), |
| (cros_repair.ACPowerVerifier, 'power', ('ssh', )), |
| (cros_repair.EXT4fsErrorVerifier, 'ext4', ('ssh', )), |
| (cros_repair.WritableVerifier, 'writable', ('ssh', )), |
| (cros_repair.TPMStatusVerifier, 'tpm', ('ssh', )), |
| (cros_repair.UpdateSuccessVerifier, 'good_provision', ('ssh', )), |
| (cros_repair.FirmwareTpmVerifier, 'faft_tpm', ('ssh', )), |
| (cros_firmware.FirmwareStatusVerifier, 'fwstatus', ('ssh', )), |
| (cros_firmware.FirmwareVersionVerifier, 'rwfw', ('ssh', )), |
| (cros_repair.PythonVerifier, 'python', ('ssh', )), |
| (repair_utils.LegacyHostVerifier, 'cros', ('ssh', )), |
| (cros_repair.CrosVerisionVerifier, 'cros_version_label', ('ssh', )), |
| (cros_repair.JetstreamTpmVerifier, 'jetstream_tpm', ('ssh', )), |
| (cros_repair.JetstreamAttestationVerifier, 'jetstream_attestation', |
| ('ssh', )), |
| (cros_repair.JetstreamServicesVerifier, 'jetstream_services', |
| ('ssh', )), |
| ) |
| |
| JETSTREAM_REPAIR_ACTIONS = ( |
| (repair_utils.RPMCycleRepair, 'rpm', (), ( |
| 'ping', |
| 'ssh', |
| 'power', |
| )), |
| (cros_repair.ServoResetRepair, 'servoreset', (), ( |
| 'ping', |
| 'ssh', |
| )), |
| (cros_repair.ServoCr50RebootRepair, 'cr50_reset', (), ( |
| 'ping', |
| 'ssh', |
| )), |
| (cros_repair.ServoSysRqRepair, 'sysrq', (), ( |
| 'ping', |
| 'ssh', |
| )), |
| (cros_repair.LabelCleanupRepair, 'label_cleanup', ('ssh', ), |
| ('cros_version_label', )), |
| (cros_firmware.FaftFirmwareRepair, 'faft_firmware_repair', (), |
| ('ping', 'ssh', 'fwstatus', 'good_provision')), |
| (cros_repair.DevDefaultBootRepair, 'set_default_boot', ('ssh', ), |
| ('dev_default_boot', )), |
| (cros_repair.CrosRebootRepair, 'reboot', ('ssh', ), ( |
| 'devmode', |
| 'writable', |
| )), |
| (cros_repair.EnrollmentCleanupRepair, 'cleanup_enrollment', ('ssh', ), |
| ('enrollment_state', )), |
| (cros_repair.JetstreamTpmRepair, 'jetstream_tpm_repair', |
| ('ping', 'ssh', 'writable', 'tpm', 'good_provision', 'ext4'), |
| ('power', 'rwfw', 'fwstatus', 'python', 'hwid', 'cros', |
| 'dev_default_boot', 'jetstream_tpm', 'jetstream_attestation')), |
| (cros_repair.JetstreamServiceRepair, 'jetstream_service_repair', |
| ('ping', 'ssh', 'writable', 'tpm', 'good_provision', 'ext4', |
| 'jetstream_tpm', 'jetstream_attestation'), |
| ('power', 'rwfw', 'fwstatus', 'python', 'hwid', 'cros', |
| 'dev_default_boot', 'jetstream_tpm', 'jetstream_attestation', |
| 'jetstream_services')), |
| (cros_repair.ProvisionRepair, 'provision', |
| ('ping', 'ssh', 'writable', 'tpm', 'good_provision', |
| 'ext4'), ('power', 'rwfw', 'fwstatus', 'python', 'hwid', 'cros', |
| 'dev_default_boot', 'jetstream_tpm', |
| 'jetstream_attestation', 'jetstream_services')), |
| (cros_repair.PowerWashRepair, 'powerwash', ('ping', 'ssh', 'writable'), |
| ('tpm', 'good_provision', 'ext4', 'power', 'rwfw', 'fwstatus', |
| 'python', 'hwid', 'cros', 'dev_default_boot', 'jetstream_tpm', |
| 'jetstream_attestation', 'jetstream_services')), |
| (cros_repair.ServoInstallRepair, 'usb', ('usb_drive', ), ( |
| 'ping', |
| 'ssh', |
| 'writable', |
| 'tpm', |
| 'good_provision', |
| 'ext4', |
| 'power', |
| 'rwfw', |
| 'fwstatus', |
| 'python', |
| 'hwid', |
| 'cros', |
| 'dev_default_boot', |
| 'jetstream_tpm', |
| 'jetstream_attestation', |
| 'jetstream_services', |
| 'faft_tpm', |
| )), |
| ) |
| |
| CRYPTOHOME_STATUS_OWNED = """{ |
| "installattrs": { |
| "first_install": true, |
| "initialized": true, |
| "invalid": false, |
| "lockbox_index": 536870916, |
| "lockbox_nvram_version": 2, |
| "secure": true, |
| "size": 0, |
| "version": 1 |
| }, |
| "mounts": [ ], |
| "tpm": { |
| "being_owned": false, |
| "can_connect": true, |
| "can_decrypt": false, |
| "can_encrypt": false, |
| "can_load_srk": true, |
| "can_load_srk_pubkey": true, |
| "enabled": true, |
| "has_context": true, |
| "has_cryptohome_key": false, |
| "has_key_handle": false, |
| "last_error": 0, |
| "owned": true |
| } |
| } |
| """ |
| |
| CRYPTOHOME_STATUS_NOT_OWNED = """{ |
| "installattrs": { |
| "first_install": true, |
| "initialized": true, |
| "invalid": false, |
| "lockbox_index": 536870916, |
| "lockbox_nvram_version": 2, |
| "secure": true, |
| "size": 0, |
| "version": 1 |
| }, |
| "mounts": [ ], |
| "tpm": { |
| "being_owned": false, |
| "can_connect": true, |
| "can_decrypt": false, |
| "can_encrypt": false, |
| "can_load_srk": false, |
| "can_load_srk_pubkey": false, |
| "enabled": true, |
| "has_context": true, |
| "has_cryptohome_key": false, |
| "has_key_handle": false, |
| "last_error": 0, |
| "owned": false |
| } |
| } |
| """ |
| |
| CRYPTOHOME_STATUS_CANNOT_LOAD_SRK = """{ |
| "installattrs": { |
| "first_install": true, |
| "initialized": true, |
| "invalid": false, |
| "lockbox_index": 536870916, |
| "lockbox_nvram_version": 2, |
| "secure": true, |
| "size": 0, |
| "version": 1 |
| }, |
| "mounts": [ ], |
| "tpm": { |
| "being_owned": false, |
| "can_connect": true, |
| "can_decrypt": false, |
| "can_encrypt": false, |
| "can_load_srk": false, |
| "can_load_srk_pubkey": false, |
| "enabled": true, |
| "has_context": true, |
| "has_cryptohome_key": false, |
| "has_key_handle": false, |
| "last_error": 0, |
| "owned": true |
| } |
| } |
| """ |
| |
| TPM_STATUS_READY = """ |
| TPM Enabled: true |
| TPM Owned: true |
| TPM Being Owned: false |
| TPM Ready: true |
| TPM Password: 9eaee4da8b4c |
| """ |
| |
| TPM_STATUS_NOT_READY = """ |
| TPM Enabled: true |
| TPM Owned: false |
| TPM Being Owned: true |
| TPM Ready: false |
| TPM Password: |
| """ |
| |
| |
| class CrosRepairUnittests(unittest.TestCase): |
| # pylint: disable=missing-docstring |
| |
| maxDiff = None |
| |
| def test_cros_repair(self): |
| verify_dag = cros_repair._cros_verify_dag() |
| self.assertTupleEqual(verify_dag, CROS_VERIFY_DAG) |
| self.check_verify_dag(verify_dag) |
| repair_actions = cros_repair._cros_repair_actions() |
| self.assertTupleEqual(repair_actions, CROS_REPAIR_ACTIONS) |
| self.check_repair_actions(verify_dag, repair_actions) |
| |
| def test_moblab_repair(self): |
| verify_dag = cros_repair._moblab_verify_dag() |
| self.assertTupleEqual(verify_dag, MOBLAB_VERIFY_DAG) |
| self.check_verify_dag(verify_dag) |
| repair_actions = cros_repair._moblab_repair_actions() |
| self.assertTupleEqual(repair_actions, MOBLAB_REPAIR_ACTIONS) |
| self.check_repair_actions(verify_dag, repair_actions) |
| |
| def test_jetstream_repair(self): |
| verify_dag = cros_repair._jetstream_verify_dag() |
| self.assertTupleEqual(verify_dag, JETSTREAM_VERIFY_DAG) |
| self.check_verify_dag(verify_dag) |
| repair_actions = cros_repair._jetstream_repair_actions() |
| self.assertTupleEqual(repair_actions, JETSTREAM_REPAIR_ACTIONS) |
| self.check_repair_actions(verify_dag, repair_actions) |
| |
| def check_verify_dag(self, verify_dag): |
| """Checks that dependency labels are defined.""" |
| labels = [n[1] for n in verify_dag] |
| for node in verify_dag: |
| for dep in node[2]: |
| self.assertIn(dep, labels) |
| |
| def check_repair_actions(self, verify_dag, repair_actions): |
| """Checks that dependency and trigger labels are defined.""" |
| verify_labels = [n[1] for n in verify_dag] |
| for action in repair_actions: |
| deps = action[2] |
| triggers = action[3] |
| for label in deps + triggers: |
| self.assertIn(label, verify_labels) |
| |
| def test_get_cryptohome_status_owned(self): |
| mock_host = mock.Mock() |
| mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_OWNED |
| status = cros_repair.CryptohomeStatus(mock_host) |
| self.assertDictEqual({ |
| 'being_owned': False, |
| 'can_connect': True, |
| 'can_decrypt': False, |
| 'can_encrypt': False, |
| 'can_load_srk': True, |
| 'can_load_srk_pubkey': True, |
| 'enabled': True, |
| 'has_context': True, |
| 'has_cryptohome_key': False, |
| 'has_key_handle': False, |
| 'last_error': 0, |
| 'owned': True, |
| }, status['tpm']) |
| self.assertTrue(status.tpm_enabled) |
| self.assertTrue(status.tpm_owned) |
| self.assertTrue(status.tpm_can_load_srk) |
| self.assertTrue(status.tpm_can_load_srk_pubkey) |
| |
| def test_get_cryptohome_status_not_owned(self): |
| mock_host = mock.Mock() |
| mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_NOT_OWNED |
| status = cros_repair.CryptohomeStatus(mock_host) |
| self.assertDictEqual({ |
| 'being_owned': False, |
| 'can_connect': True, |
| 'can_decrypt': False, |
| 'can_encrypt': False, |
| 'can_load_srk': False, |
| 'can_load_srk_pubkey': False, |
| 'enabled': True, |
| 'has_context': True, |
| 'has_cryptohome_key': False, |
| 'has_key_handle': False, |
| 'last_error': 0, |
| 'owned': False, |
| }, status['tpm']) |
| self.assertTrue(status.tpm_enabled) |
| self.assertFalse(status.tpm_owned) |
| self.assertFalse(status.tpm_can_load_srk) |
| self.assertFalse(status.tpm_can_load_srk_pubkey) |
| |
| @mock.patch.object(cros_repair, '_is_virtual_machine') |
| def test_tpm_status_verifier_owned(self, mock_is_virt): |
| mock_is_virt.return_value = False |
| mock_host = mock.Mock() |
| mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_OWNED |
| tpm_verifier = cros_repair.TPMStatusVerifier('test', []) |
| tpm_verifier.verify(mock_host) |
| |
| @mock.patch.object(cros_repair, '_is_virtual_machine') |
| def test_tpm_status_verifier_not_owned(self, mock_is_virt): |
| mock_is_virt.return_value = False |
| mock_host = mock.Mock() |
| mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_NOT_OWNED |
| tpm_verifier = cros_repair.TPMStatusVerifier('test', []) |
| tpm_verifier.verify(mock_host) |
| |
| @mock.patch.object(cros_repair, '_is_virtual_machine') |
| def test_tpm_status_verifier_cannot_load_srk_pubkey(self, mock_is_virt): |
| mock_is_virt.return_value = False |
| mock_host = mock.Mock() |
| mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_CANNOT_LOAD_SRK |
| tpm_verifier = cros_repair.TPMStatusVerifier('test', []) |
| with self.assertRaises(hosts.AutoservVerifyError) as ctx: |
| tpm_verifier.verify(mock_host) |
| self.assertEqual('Cannot load the TPM SRK', |
| ctx.exception.message) |
| |
| def test_jetstream_tpm_owned(self): |
| mock_host = mock.Mock() |
| mock_host.run.side_effect = [ |
| mock.Mock(stdout=CRYPTOHOME_STATUS_OWNED), |
| mock.Mock(stdout=TPM_STATUS_READY), |
| ] |
| tpm_verifier = cros_repair.JetstreamTpmVerifier('test', []) |
| tpm_verifier.verify(mock_host) |
| |
| @mock.patch.object(retry.logging, 'warning') |
| @mock.patch.object(retry.time, 'time') |
| @mock.patch.object(retry.time, 'sleep') |
| def test_jetstream_tpm_not_owned(self, mock_sleep, mock_time, mock_logging): |
| mock_time.side_effect = itertools.count(0, 20) |
| mock_host = mock.Mock() |
| mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_NOT_OWNED |
| tpm_verifier = cros_repair.JetstreamTpmVerifier('test', []) |
| with self.assertRaises(hosts.AutoservVerifyError) as ctx: |
| tpm_verifier.verify(mock_host) |
| self.assertEqual('TPM is not owned', ctx.exception.message) |
| |
| @mock.patch.object(retry.logging, 'warning') |
| @mock.patch.object(retry.time, 'time') |
| @mock.patch.object(retry.time, 'sleep') |
| def test_jetstream_tpm_not_ready(self, mock_sleep, mock_time, mock_logging): |
| mock_time.side_effect = itertools.count(0, 20) |
| mock_host = mock.Mock() |
| mock_host.run.side_effect = itertools.cycle([ |
| mock.Mock(stdout=CRYPTOHOME_STATUS_OWNED), |
| mock.Mock(stdout=TPM_STATUS_NOT_READY), |
| ]) |
| tpm_verifier = cros_repair.JetstreamTpmVerifier('test', []) |
| with self.assertRaises(hosts.AutoservVerifyError) as ctx: |
| tpm_verifier.verify(mock_host) |
| self.assertEqual('TPM is not ready', ctx.exception.message) |
| |
| @mock.patch.object(retry.logging, 'warning') |
| @mock.patch.object(retry.time, 'time') |
| @mock.patch.object(retry.time, 'sleep') |
| def test_jetstream_cryptohome_missing(self, mock_sleep, mock_time, |
| mock_logging): |
| mock_time.side_effect = itertools.count(0, 20) |
| mock_host = mock.Mock() |
| mock_host.run.side_effect = error.AutoservRunError('test', None) |
| tpm_verifier = cros_repair.JetstreamTpmVerifier('test', []) |
| with self.assertRaises(hosts.AutoservVerifyError) as ctx: |
| tpm_verifier.verify(mock_host) |
| self.assertEqual('Could not determine TPM status', |
| ctx.exception.message) |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |