author | Yi Chou <yich@google.com> | 2022-04-11 18:26:30 +0800 |
---|---|---|
committer | Chromeos LUCI <chromeos-scoped@luci-project-accounts.iam.gserviceaccount.com> | 2022-05-31 13:35:37 +0000 |
commit | b08b3d00e6d8c138ff09e32f62e47420cfe9e7d9 (patch) | |
tree | afd4da58811210ce9aad554d6ecad073765abc82 | |
parent | fb359b20cc53859d69b4e98ee289a26aeebcdab4 (diff) | |
download | autotest-b08b3d00e6d8c138ff09e32f62e47420cfe9e7d9.tar.gz |
autotest: Replace the cryptohome to tpm_manager
The cryptohome --action=status command has been deprecated; we should use
tpm_manager_client instead.
BUG=b:228800370
TEST=CQ
Change-Id: Icc3cb5a4f6443c2adb5b1f5236573c5317079f32
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/third_party/autotest/+/3580403
Reviewed-by: Derek Beckett <dbeckett@chromium.org>
Tested-by: Yi Chou <yich@google.com>
Reviewed-by: Leo Lai <cylai@google.com>
Commit-Queue: Yi Chou <yich@google.com>
-rw-r--r-- | server/hosts/cros_repair.py | 85 | ||||
-rwxr-xr-x | server/hosts/cros_repair_unittest.py | 161 |
2 files changed, 70 insertions, 176 deletions
diff --git a/server/hosts/cros_repair.py b/server/hosts/cros_repair.py index b44448de3c..4d727122c2 100644 --- a/server/hosts/cros_repair.py +++ b/server/hosts/cros_repair.py @@ -7,7 +7,6 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import json import logging import math import six @@ -401,28 +400,19 @@ class TPMStatusVerifier(hosts.Verifier): return try: - status = CryptohomeStatus(host) + status = TpmStatus(host) except hosts.AutoservVerifyError: logging.info('Cannot determine the Cryptohome valid status - ' 'skipping check.') return try: - tpm = status['tpm'] - if not tpm['enabled']: + if not status['is_enabled']: raise hosts.AutoservVerifyError( 'TPM is not enabled -- Hardware is not working.') - if not tpm['can_connect']: - raise hosts.AutoservVerifyError( - ('TPM connect failed -- ' - 'last_error=%d.' % tpm['last_error'])) - if tpm['owned'] and not tpm['can_load_srk']: - raise hosts.AutoservVerifyError( - 'Cannot load the TPM SRK') - if tpm['can_load_srk'] and not tpm['can_load_srk_pubkey']: - raise hosts.AutoservVerifyError( - 'Cannot load the TPM SRK public key') + if status['is_owned'] and not status['is_srk_default_auth']: + raise hosts.AutoservVerifyError('Cannot load the TPM SRK') except KeyError: - logging.info('Cannot determine the Cryptohome valid status - ' + logging.info('Cannot determine the TPM valid status - ' 'skipping check.') @property @@ -691,7 +681,7 @@ class JetstreamTpmVerifier(hosts.Verifier): def verify(self, host): # pylint: disable=missing-docstring try: - status = CryptohomeStatus(host) + status = TpmStatus(host) if not status.tpm_enabled: raise hosts.AutoservVerifyError('TPM is not enabled') if not status.tpm_owned: 
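Review bot: In TPMStatusVerifier.verify, `if not status['is_enabled']:` relies on truthiness, but the new parser converts only the exact strings 'true' and 'false' to booleans and leaves every other value as a string, so any other non-empty value would count as enabled. Comparing explicitly, e.g. `if status['is_enabled'] is not True:` (and `status['is_srk_default_auth'] is not True` in the SRK check), keeps an unexpected value from passing the TPM health check. The first skip message in this hunk still reads `Cannot determine the Cryptohome valid status` while the second was changed to `TPM`; the two should match. The verify method starts and ends outside the hunk.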
@@ -1883,72 +1873,59 @@ def _is_virtual_machine(host): 'qemu' in output.stdout.lower()) -class CryptohomeStatus(dict): +class TpmStatus(dict): """Wrapper for getting cryptohome status from a host.""" def __init__(self, host): - super(CryptohomeStatus, self).__init__() - self.update(_get_cryptohome_status(host)) - self.tpm = self['tpm'] + super(TpmStatus, self).__init__() + self.update(_get_tpm_status(host)) @property def tpm_enabled(self): # pylint: disable=missing-docstring - return self.tpm.get('enabled') == True + return self.get('is_enabled') == True @property def tpm_owned(self): # pylint: disable=missing-docstring - return self.tpm.get('owned') == True + return self.get('is_owned') == True @property def tpm_can_load_srk(self): # pylint: disable=missing-docstring - return self.tpm.get('can_load_srk') == True + return self.tpm_owned and self.get('is_srk_default_auth') == True @property def tpm_can_load_srk_pubkey(self): # pylint: disable=missing-docstring - return self.tpm.get('can_load_srk_pubkey') == True + return self.tpm_owned and self.get('is_srk_default_auth') == True -def _get_cryptohome_status(host): - """Returns a dictionary containing the cryptohome status. +def _get_tpm_status(host): + """Returns a dictionary containing the TPM status. @param host: a hosts.Host object. - @returns A dictionary containing the cryptohome status. + @returns A dictionary containing the TPM status. @raises AutoservVerifyError: if the output could not be parsed or the TPM status is missing. @raises hosts.AutoservRunError: if the cryptohome command failed. """ - # This cryptohome command emits status information in JSON format. It - # looks something like this: - # { - # "installattrs": { - # ... - # }, - # "mounts": [ { - # ... 
- # } ], - # "tpm": { - # "being_owned": false, - # "can_connect": true, - # "can_decrypt": false, - # "can_encrypt": false, - # "can_load_srk": true, - # "can_load_srk_pubkey": true, - # "enabled": true, - # "has_context": true, - # "has_cryptohome_key": false, - # "has_key_handle": false, - # "last_error": 0, - # "owned": true - # } - # } try: - output = host.run('cryptohome --action=status').stdout.strip() - status = json.loads(output) - if 'tpm' not in status: + output = host.run( + 'tpm_manager_client status --nonsensitive').stdout.strip() + lines = output.split('\n')[1:-1] + status = {} + for item in lines: + item = item.split(':') + if not item[0]: + continue + if len(item) == 1: + item.append('') + item = [x.strip() for x in item] + item[1] = True if item[1] == 'true' else item[1] + item[1] = False if item[1] == 'false' else item[1] + status[item[0]] = item[1] + if status['status'] != 'STATUS_SUCCESS': raise hosts.AutoservVerifyError('TPM status is missing') return status except ValueError: diff --git a/server/hosts/cros_repair_unittest.py b/server/hosts/cros_repair_unittest.py index d5dab456a1..bc3979ab29 100755 --- a/server/hosts/cros_repair_unittest.py +++ b/server/hosts/cros_repair_unittest.py 
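Review bot: `status['status']` in _get_tpm_status raises KeyError when the command prints nothing or an unexpected reply (after `[1:-1]` the dict is then empty), and the only handler visible here is `except ValueError:`, which was written for json.loads and which nothing in the new try body appears to raise. The KeyError would then escape past the `except hosts.AutoservVerifyError` in the verifiers instead of becoming the documented AutoservVerifyError. Use `if status.get('status') != 'STATUS_SUCCESS':` so a missing field is reported the same way as a failed one, and `item.split(':', 1)` so a value containing a colon is not truncated. The TpmStatus class docstring and the `@raises hosts.AutoservRunError` line still say cryptohome. The handler body after `except ValueError:` lies outside the hunk.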
@@ -210,90 +210,36 @@ JETSTREAM_REPAIR_ACTIONS = ( )), ) -CRYPTOHOME_STATUS_OWNED = """{ - "installattrs": { - "first_install": true, - "initialized": true, - "invalid": false, - "lockbox_index": 536870916, - "lockbox_nvram_version": 2, - "secure": true, - "size": 0, - "version": 1 - }, - "mounts": [ ], - "tpm": { - "being_owned": false, - "can_connect": true, - "can_decrypt": false, - "can_encrypt": false, - "can_load_srk": true, - "can_load_srk_pubkey": true, - "enabled": true, - "has_context": true, - "has_cryptohome_key": false, - "has_key_handle": false, - "last_error": 0, - "owned": true - } +TPM_STATUS_OWNED = """ +Message Reply: [tpm_manager.GetTpmNonsensitiveStatusReply] { + status: STATUS_SUCCESS + is_enabled: true + is_owned: true + is_owner_password_present: true + has_reset_lock_permissions: true + is_srk_default_auth: true } """ -CRYPTOHOME_STATUS_NOT_OWNED = """{ - "installattrs": { - "first_install": true, - "initialized": true, - "invalid": false, - "lockbox_index": 536870916, - "lockbox_nvram_version": 2, - "secure": true, - "size": 0, - "version": 1 - }, - "mounts": [ ], - "tpm": { - "being_owned": false, - "can_connect": true, - "can_decrypt": false, - "can_encrypt": false, - "can_load_srk": false, - "can_load_srk_pubkey": false, - "enabled": true, - "has_context": true, - "has_cryptohome_key": false, - "has_key_handle": false, - "last_error": 0, - "owned": false - } +TPM_STATUS_NOT_OWNED = """ +Message Reply: [tpm_manager.GetTpmNonsensitiveStatusReply] { + status: STATUS_SUCCESS + is_enabled: true + is_owned: false + is_owner_password_present: false + has_reset_lock_permissions: false + is_srk_default_auth: true } """ -CRYPTOHOME_STATUS_CANNOT_LOAD_SRK = """{ - "installattrs": { - "first_install": true, - "initialized": true, - "invalid": false, - "lockbox_index": 536870916, - "lockbox_nvram_version": 2, - "secure": true, - "size": 0, - "version": 1 - }, - "mounts": [ ], - "tpm": { - "being_owned": false, - "can_connect": true, - "can_decrypt": 
false, - "can_encrypt": false, - "can_load_srk": false, - "can_load_srk_pubkey": false, - "enabled": true, - "has_context": true, - "has_cryptohome_key": false, - "has_key_handle": false, - "last_error": 0, - "owned": true - } +TPM_STATUS_CANNOT_LOAD_SRK = """ +Message Reply: [tpm_manager.GetTpmNonsensitiveStatusReply] { + status: STATUS_SUCCESS + is_enabled: true + is_owned: true + is_owner_password_present: false + has_reset_lock_permissions: false + is_srk_default_auth: false } """ 
@@ -359,47 +305,19 @@ class CrosRepairUnittests(unittest.TestCase): for label in deps + triggers: self.assertIn(label, verify_labels) - def test_get_cryptohome_status_owned(self): + def test_get_tpm_status_owned(self): mock_host = mock.Mock() - mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_OWNED - status = cros_repair.CryptohomeStatus(mock_host) - self.assertDictEqual({ - 'being_owned': False, - 'can_connect': True, - 'can_decrypt': False, - 'can_encrypt': False, - 'can_load_srk': True, - 'can_load_srk_pubkey': True, - 'enabled': True, - 'has_context': True, - 'has_cryptohome_key': False, - 'has_key_handle': False, - 'last_error': 0, - 'owned': True, - }, status['tpm']) + mock_host.run.return_value.stdout = TPM_STATUS_OWNED + status = cros_repair.TpmStatus(mock_host) self.assertTrue(status.tpm_enabled) self.assertTrue(status.tpm_owned) self.assertTrue(status.tpm_can_load_srk) self.assertTrue(status.tpm_can_load_srk_pubkey) - def test_get_cryptohome_status_not_owned(self): + def test_get_tpm_status_not_owned(self): mock_host = mock.Mock() - mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_NOT_OWNED - status = cros_repair.CryptohomeStatus(mock_host) - self.assertDictEqual({ - 'being_owned': False, - 'can_connect': True, - 'can_decrypt': False, - 'can_encrypt': False, - 'can_load_srk': False, - 'can_load_srk_pubkey': False, - 'enabled': True, - 'has_context': True, - 'has_cryptohome_key': False, - 'has_key_handle': False, - 'last_error': 0, - 'owned': False, - }, status['tpm']) + mock_host.run.return_value.stdout = TPM_STATUS_NOT_OWNED + status = cros_repair.TpmStatus(mock_host) self.assertTrue(status.tpm_enabled) self.assertFalse(status.tpm_owned) self.assertFalse(status.tpm_can_load_srk) 
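Review bot: With the assertDictEqual calls removed, test_get_tpm_status_owned and test_get_tpm_status_not_owned check only the four properties, so the text parser's actual output (key names, true/false conversion, the `status` field) is no longer pinned. Consider asserting on the parsed dict, e.g. `self.assertEqual(status['status'], 'STATUS_SUCCESS')` and `self.assertIs(status['is_owned'], True)`, and adding one case for a reply whose status is not STATUS_SUCCESS and one for empty stdout, since those are the paths on which the verifier logs and skips the TPM check. test_get_tpm_status_not_owned continues past the end of the hunk.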
@@ -409,7 +327,7 @@ class CrosRepairUnittests(unittest.TestCase): def test_tpm_status_verifier_owned(self, mock_is_virt): mock_is_virt.return_value = False mock_host = mock.Mock() - mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_OWNED + mock_host.run.return_value.stdout = TPM_STATUS_OWNED tpm_verifier = cros_repair.TPMStatusVerifier('test', []) tpm_verifier.verify(mock_host) @@ -417,7 +335,7 @@ class CrosRepairUnittests(unittest.TestCase): def test_tpm_status_verifier_not_owned(self, mock_is_virt): mock_is_virt.return_value = False mock_host = mock.Mock() - mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_NOT_OWNED + mock_host.run.return_value.stdout = TPM_STATUS_NOT_OWNED tpm_verifier = cros_repair.TPMStatusVerifier('test', []) tpm_verifier.verify(mock_host) @@ -425,7 +343,7 @@ class CrosRepairUnittests(unittest.TestCase): def test_tpm_status_verifier_cannot_load_srk_pubkey(self, mock_is_virt): mock_is_virt.return_value = False mock_host = mock.Mock() - mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_CANNOT_LOAD_SRK + mock_host.run.return_value.stdout = TPM_STATUS_CANNOT_LOAD_SRK tpm_verifier = cros_repair.TPMStatusVerifier('test', []) with self.assertRaises(hosts.AutoservVerifyError) as ctx: tpm_verifier.verify(mock_host) @@ -434,8 +352,8 @@ class CrosRepairUnittests(unittest.TestCase): def test_jetstream_tpm_owned(self): mock_host = mock.Mock() mock_host.run.side_effect = [ - mock.Mock(stdout=CRYPTOHOME_STATUS_OWNED), - mock.Mock(stdout=TPM_STATUS_READY), + mock.Mock(stdout=TPM_STATUS_OWNED), + mock.Mock(stdout=TPM_STATUS_READY), ] tpm_verifier = cros_repair.JetstreamTpmVerifier('test', []) tpm_verifier.verify(mock_host) 
@@ -446,7 +364,7 @@ class CrosRepairUnittests(unittest.TestCase): def test_jetstream_tpm_not_owned(self, mock_sleep, mock_time, mock_logging): mock_time.side_effect = itertools.count(0, 20) mock_host = mock.Mock() - mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_NOT_OWNED + mock_host.run.return_value.stdout = TPM_STATUS_NOT_OWNED tpm_verifier = cros_repair.JetstreamTpmVerifier('test', []) with self.assertRaises(hosts.AutoservVerifyError) as ctx: tpm_verifier.verify(mock_host) @@ -459,8 +377,8 @@ class CrosRepairUnittests(unittest.TestCase): mock_time.side_effect = itertools.count(0, 20) mock_host = mock.Mock() mock_host.run.side_effect = itertools.cycle([ - mock.Mock(stdout=CRYPTOHOME_STATUS_OWNED), - mock.Mock(stdout=TPM_STATUS_NOT_READY), + mock.Mock(stdout=TPM_STATUS_OWNED), + mock.Mock(stdout=TPM_STATUS_NOT_READY), ]) tpm_verifier = cros_repair.JetstreamTpmVerifier('test', []) with self.assertRaises(hosts.AutoservVerifyError) as ctx: @@ -470,8 +388,7 @@ class CrosRepairUnittests(unittest.TestCase): @mock.patch.object(retry.logging, 'warning') @mock.patch.object(retry.time, 'time') @mock.patch.object(retry.time, 'sleep') - def test_jetstream_cryptohome_missing(self, mock_sleep, mock_time, - mock_logging): + def test_jetstream_tpm_missing(self, mock_sleep, mock_time, mock_logging): mock_time.side_effect = itertools.count(0, 20) mock_host = mock.Mock() mock_host.run.side_effect = error.AutoservRunError('test', None) |