aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYi Chou <yich@google.com>2022-04-11 18:26:30 +0800
committerChromeos LUCI <chromeos-scoped@luci-project-accounts.iam.gserviceaccount.com>2022-05-31 13:35:37 +0000
commitb08b3d00e6d8c138ff09e32f62e47420cfe9e7d9 (patch)
treeafd4da58811210ce9aad554d6ecad073765abc82
parentfb359b20cc53859d69b4e98ee289a26aeebcdab4 (diff)
downloadautotest-b08b3d00e6d8c138ff09e32f62e47420cfe9e7d9.tar.gz
autotest: Replace the cryptohome to tpm_manager
The cryptohome --action=status had been deprecated, we should use the tpm_manager_client BUG=b:228800370 TEST=CQ Change-Id: Icc3cb5a4f6443c2adb5b1f5236573c5317079f32 Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/third_party/autotest/+/3580403 Reviewed-by: Derek Beckett <dbeckett@chromium.org> Tested-by: Yi Chou <yich@google.com> Reviewed-by: Leo Lai <cylai@google.com> Commit-Queue: Yi Chou <yich@google.com>
-rw-r--r--server/hosts/cros_repair.py85
-rwxr-xr-xserver/hosts/cros_repair_unittest.py161
2 files changed, 70 insertions, 176 deletions
diff --git a/server/hosts/cros_repair.py b/server/hosts/cros_repair.py
index b44448de3c..4d727122c2 100644
--- a/server/hosts/cros_repair.py
+++ b/server/hosts/cros_repair.py
@@ -7,7 +7,6 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
-import json
import logging
import math
import six
@@ -401,28 +400,19 @@ class TPMStatusVerifier(hosts.Verifier):
return
try:
- status = CryptohomeStatus(host)
+ status = TpmStatus(host)
except hosts.AutoservVerifyError:
logging.info('Cannot determine the Cryptohome valid status - '
'skipping check.')
return
try:
- tpm = status['tpm']
- if not tpm['enabled']:
+ if not status['is_enabled']:
raise hosts.AutoservVerifyError(
'TPM is not enabled -- Hardware is not working.')
- if not tpm['can_connect']:
- raise hosts.AutoservVerifyError(
- ('TPM connect failed -- '
- 'last_error=%d.' % tpm['last_error']))
- if tpm['owned'] and not tpm['can_load_srk']:
- raise hosts.AutoservVerifyError(
- 'Cannot load the TPM SRK')
- if tpm['can_load_srk'] and not tpm['can_load_srk_pubkey']:
- raise hosts.AutoservVerifyError(
- 'Cannot load the TPM SRK public key')
+ if status['is_owned'] and not status['is_srk_default_auth']:
+ raise hosts.AutoservVerifyError('Cannot load the TPM SRK')
except KeyError:
- logging.info('Cannot determine the Cryptohome valid status - '
+ logging.info('Cannot determine the TPM valid status - '
'skipping check.')
@property
@@ -691,7 +681,7 @@ class JetstreamTpmVerifier(hosts.Verifier):
def verify(self, host):
# pylint: disable=missing-docstring
try:
- status = CryptohomeStatus(host)
+ status = TpmStatus(host)
if not status.tpm_enabled:
raise hosts.AutoservVerifyError('TPM is not enabled')
if not status.tpm_owned:
@@ -1883,72 +1873,59 @@ def _is_virtual_machine(host):
'qemu' in output.stdout.lower())
-class CryptohomeStatus(dict):
+class TpmStatus(dict):
"""Wrapper for getting cryptohome status from a host."""
def __init__(self, host):
- super(CryptohomeStatus, self).__init__()
- self.update(_get_cryptohome_status(host))
- self.tpm = self['tpm']
+ super(TpmStatus, self).__init__()
+ self.update(_get_tpm_status(host))
@property
def tpm_enabled(self):
# pylint: disable=missing-docstring
- return self.tpm.get('enabled') == True
+ return self.get('is_enabled') == True
@property
def tpm_owned(self):
# pylint: disable=missing-docstring
- return self.tpm.get('owned') == True
+ return self.get('is_owned') == True
@property
def tpm_can_load_srk(self):
# pylint: disable=missing-docstring
- return self.tpm.get('can_load_srk') == True
+ return self.tpm_owned and self.get('is_srk_default_auth') == True
@property
def tpm_can_load_srk_pubkey(self):
# pylint: disable=missing-docstring
- return self.tpm.get('can_load_srk_pubkey') == True
+ return self.tpm_owned and self.get('is_srk_default_auth') == True
-def _get_cryptohome_status(host):
- """Returns a dictionary containing the cryptohome status.
+def _get_tpm_status(host):
+ """Returns a dictionary containing the TPM status.
@param host: a hosts.Host object.
- @returns A dictionary containing the cryptohome status.
+ @returns A dictionary containing the TPM status.
@raises AutoservVerifyError: if the output could not be parsed or the TPM
status is missing.
@raises hosts.AutoservRunError: if the cryptohome command failed.
"""
- # This cryptohome command emits status information in JSON format. It
- # looks something like this:
- # {
- # "installattrs": {
- # ...
- # },
- # "mounts": [ {
- # ...
- # } ],
- # "tpm": {
- # "being_owned": false,
- # "can_connect": true,
- # "can_decrypt": false,
- # "can_encrypt": false,
- # "can_load_srk": true,
- # "can_load_srk_pubkey": true,
- # "enabled": true,
- # "has_context": true,
- # "has_cryptohome_key": false,
- # "has_key_handle": false,
- # "last_error": 0,
- # "owned": true
- # }
- # }
try:
- output = host.run('cryptohome --action=status').stdout.strip()
- status = json.loads(output)
- if 'tpm' not in status:
+ output = host.run(
+ 'tpm_manager_client status --nonsensitive').stdout.strip()
+ lines = output.split('\n')[1:-1]
+ status = {}
+ for item in lines:
+ item = item.split(':')
+ if not item[0]:
+ continue
+ if len(item) == 1:
+ item.append('')
+ item = [x.strip() for x in item]
+ item[1] = True if item[1] == 'true' else item[1]
+ item[1] = False if item[1] == 'false' else item[1]
+ status[item[0]] = item[1]
+ if status['status'] != 'STATUS_SUCCESS':
raise hosts.AutoservVerifyError('TPM status is missing')
return status
except ValueError:
diff --git a/server/hosts/cros_repair_unittest.py b/server/hosts/cros_repair_unittest.py
index d5dab456a1..bc3979ab29 100755
--- a/server/hosts/cros_repair_unittest.py
+++ b/server/hosts/cros_repair_unittest.py
@@ -210,90 +210,36 @@ JETSTREAM_REPAIR_ACTIONS = (
)),
)
-CRYPTOHOME_STATUS_OWNED = """{
- "installattrs": {
- "first_install": true,
- "initialized": true,
- "invalid": false,
- "lockbox_index": 536870916,
- "lockbox_nvram_version": 2,
- "secure": true,
- "size": 0,
- "version": 1
- },
- "mounts": [ ],
- "tpm": {
- "being_owned": false,
- "can_connect": true,
- "can_decrypt": false,
- "can_encrypt": false,
- "can_load_srk": true,
- "can_load_srk_pubkey": true,
- "enabled": true,
- "has_context": true,
- "has_cryptohome_key": false,
- "has_key_handle": false,
- "last_error": 0,
- "owned": true
- }
+TPM_STATUS_OWNED = """
+Message Reply: [tpm_manager.GetTpmNonsensitiveStatusReply] {
+ status: STATUS_SUCCESS
+ is_enabled: true
+ is_owned: true
+ is_owner_password_present: true
+ has_reset_lock_permissions: true
+ is_srk_default_auth: true
}
"""
-CRYPTOHOME_STATUS_NOT_OWNED = """{
- "installattrs": {
- "first_install": true,
- "initialized": true,
- "invalid": false,
- "lockbox_index": 536870916,
- "lockbox_nvram_version": 2,
- "secure": true,
- "size": 0,
- "version": 1
- },
- "mounts": [ ],
- "tpm": {
- "being_owned": false,
- "can_connect": true,
- "can_decrypt": false,
- "can_encrypt": false,
- "can_load_srk": false,
- "can_load_srk_pubkey": false,
- "enabled": true,
- "has_context": true,
- "has_cryptohome_key": false,
- "has_key_handle": false,
- "last_error": 0,
- "owned": false
- }
+TPM_STATUS_NOT_OWNED = """
+Message Reply: [tpm_manager.GetTpmNonsensitiveStatusReply] {
+ status: STATUS_SUCCESS
+ is_enabled: true
+ is_owned: false
+ is_owner_password_present: false
+ has_reset_lock_permissions: false
+ is_srk_default_auth: true
}
"""
-CRYPTOHOME_STATUS_CANNOT_LOAD_SRK = """{
- "installattrs": {
- "first_install": true,
- "initialized": true,
- "invalid": false,
- "lockbox_index": 536870916,
- "lockbox_nvram_version": 2,
- "secure": true,
- "size": 0,
- "version": 1
- },
- "mounts": [ ],
- "tpm": {
- "being_owned": false,
- "can_connect": true,
- "can_decrypt": false,
- "can_encrypt": false,
- "can_load_srk": false,
- "can_load_srk_pubkey": false,
- "enabled": true,
- "has_context": true,
- "has_cryptohome_key": false,
- "has_key_handle": false,
- "last_error": 0,
- "owned": true
- }
+TPM_STATUS_CANNOT_LOAD_SRK = """
+Message Reply: [tpm_manager.GetTpmNonsensitiveStatusReply] {
+ status: STATUS_SUCCESS
+ is_enabled: true
+ is_owned: true
+ is_owner_password_present: false
+ has_reset_lock_permissions: false
+ is_srk_default_auth: false
}
"""
@@ -359,47 +305,19 @@ class CrosRepairUnittests(unittest.TestCase):
for label in deps + triggers:
self.assertIn(label, verify_labels)
- def test_get_cryptohome_status_owned(self):
+ def test_get_tpm_status_owned(self):
mock_host = mock.Mock()
- mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_OWNED
- status = cros_repair.CryptohomeStatus(mock_host)
- self.assertDictEqual({
- 'being_owned': False,
- 'can_connect': True,
- 'can_decrypt': False,
- 'can_encrypt': False,
- 'can_load_srk': True,
- 'can_load_srk_pubkey': True,
- 'enabled': True,
- 'has_context': True,
- 'has_cryptohome_key': False,
- 'has_key_handle': False,
- 'last_error': 0,
- 'owned': True,
- }, status['tpm'])
+ mock_host.run.return_value.stdout = TPM_STATUS_OWNED
+ status = cros_repair.TpmStatus(mock_host)
self.assertTrue(status.tpm_enabled)
self.assertTrue(status.tpm_owned)
self.assertTrue(status.tpm_can_load_srk)
self.assertTrue(status.tpm_can_load_srk_pubkey)
- def test_get_cryptohome_status_not_owned(self):
+ def test_get_tpm_status_not_owned(self):
mock_host = mock.Mock()
- mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_NOT_OWNED
- status = cros_repair.CryptohomeStatus(mock_host)
- self.assertDictEqual({
- 'being_owned': False,
- 'can_connect': True,
- 'can_decrypt': False,
- 'can_encrypt': False,
- 'can_load_srk': False,
- 'can_load_srk_pubkey': False,
- 'enabled': True,
- 'has_context': True,
- 'has_cryptohome_key': False,
- 'has_key_handle': False,
- 'last_error': 0,
- 'owned': False,
- }, status['tpm'])
+ mock_host.run.return_value.stdout = TPM_STATUS_NOT_OWNED
+ status = cros_repair.TpmStatus(mock_host)
self.assertTrue(status.tpm_enabled)
self.assertFalse(status.tpm_owned)
self.assertFalse(status.tpm_can_load_srk)
@@ -409,7 +327,7 @@ class CrosRepairUnittests(unittest.TestCase):
def test_tpm_status_verifier_owned(self, mock_is_virt):
mock_is_virt.return_value = False
mock_host = mock.Mock()
- mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_OWNED
+ mock_host.run.return_value.stdout = TPM_STATUS_OWNED
tpm_verifier = cros_repair.TPMStatusVerifier('test', [])
tpm_verifier.verify(mock_host)
@@ -417,7 +335,7 @@ class CrosRepairUnittests(unittest.TestCase):
def test_tpm_status_verifier_not_owned(self, mock_is_virt):
mock_is_virt.return_value = False
mock_host = mock.Mock()
- mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_NOT_OWNED
+ mock_host.run.return_value.stdout = TPM_STATUS_NOT_OWNED
tpm_verifier = cros_repair.TPMStatusVerifier('test', [])
tpm_verifier.verify(mock_host)
@@ -425,7 +343,7 @@ class CrosRepairUnittests(unittest.TestCase):
def test_tpm_status_verifier_cannot_load_srk_pubkey(self, mock_is_virt):
mock_is_virt.return_value = False
mock_host = mock.Mock()
- mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_CANNOT_LOAD_SRK
+ mock_host.run.return_value.stdout = TPM_STATUS_CANNOT_LOAD_SRK
tpm_verifier = cros_repair.TPMStatusVerifier('test', [])
with self.assertRaises(hosts.AutoservVerifyError) as ctx:
tpm_verifier.verify(mock_host)
@@ -434,8 +352,8 @@ class CrosRepairUnittests(unittest.TestCase):
def test_jetstream_tpm_owned(self):
mock_host = mock.Mock()
mock_host.run.side_effect = [
- mock.Mock(stdout=CRYPTOHOME_STATUS_OWNED),
- mock.Mock(stdout=TPM_STATUS_READY),
+ mock.Mock(stdout=TPM_STATUS_OWNED),
+ mock.Mock(stdout=TPM_STATUS_READY),
]
tpm_verifier = cros_repair.JetstreamTpmVerifier('test', [])
tpm_verifier.verify(mock_host)
@@ -446,7 +364,7 @@ class CrosRepairUnittests(unittest.TestCase):
def test_jetstream_tpm_not_owned(self, mock_sleep, mock_time, mock_logging):
mock_time.side_effect = itertools.count(0, 20)
mock_host = mock.Mock()
- mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_NOT_OWNED
+ mock_host.run.return_value.stdout = TPM_STATUS_NOT_OWNED
tpm_verifier = cros_repair.JetstreamTpmVerifier('test', [])
with self.assertRaises(hosts.AutoservVerifyError) as ctx:
tpm_verifier.verify(mock_host)
@@ -459,8 +377,8 @@ class CrosRepairUnittests(unittest.TestCase):
mock_time.side_effect = itertools.count(0, 20)
mock_host = mock.Mock()
mock_host.run.side_effect = itertools.cycle([
- mock.Mock(stdout=CRYPTOHOME_STATUS_OWNED),
- mock.Mock(stdout=TPM_STATUS_NOT_READY),
+ mock.Mock(stdout=TPM_STATUS_OWNED),
+ mock.Mock(stdout=TPM_STATUS_NOT_READY),
])
tpm_verifier = cros_repair.JetstreamTpmVerifier('test', [])
with self.assertRaises(hosts.AutoservVerifyError) as ctx:
@@ -470,8 +388,7 @@ class CrosRepairUnittests(unittest.TestCase):
@mock.patch.object(retry.logging, 'warning')
@mock.patch.object(retry.time, 'time')
@mock.patch.object(retry.time, 'sleep')
- def test_jetstream_cryptohome_missing(self, mock_sleep, mock_time,
- mock_logging):
+ def test_jetstream_tpm_missing(self, mock_sleep, mock_time, mock_logging):
mock_time.side_effect = itertools.count(0, 20)
mock_host = mock.Mock()
mock_host.run.side_effect = error.AutoservRunError('test', None)