summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYu Shan <shanyu@google.com>2018-05-08 18:41:32 -0700
committerYu Shan <shanyu@google.com>2018-05-23 15:17:58 -0700
commitcef5ea2799ead3a8272475c0cda0d2c06665292f (patch)
tree31b1282680c4423b04dc13733be4551c08a1215e
parent9e730f24d0d3afc055c80ed5aa4169f738c4f479 (diff)
downloadattestation-stable.tar.gz
[ATFT] Add unlock avb support.stable
Enhance provision_steps configuration, now support steps in any order. The device would only show complete (green) if all the steps are done. Support 'UnlockAvb' in the provision step. Add 'UNLOCK_CREDENTIAL' (optional) to config file to support customized unlock credential. Change-Id: Ib3fee0b333b1e3d21e379bbe950b235d59ee1c29 Test: Manual test, unit tests. Bug: b/79164608
-rw-r--r--at-factory-tool/atft.py187
-rw-r--r--at-factory-tool/atft_unittest.py345
-rw-r--r--at-factory-tool/atftman.py54
-rw-r--r--at-factory-tool/atftman_unittest.py56
4 files changed, 608 insertions, 34 deletions
diff --git a/at-factory-tool/atft.py b/at-factory-tool/atft.py
index 470554a..ff69770 100644
--- a/at-factory-tool/atft.py
+++ b/at-factory-tool/atft.py
@@ -20,6 +20,7 @@ This tool allows for easy graphical access to common ATFA commands. It also
locates Fastboot devices and can initiate communication between the ATFA and
an Android Things device.
"""
+import copy
from datetime import datetime
import json
import math
@@ -30,6 +31,7 @@ import threading
import time
from atftman import AtftManager
+from atftman import ProvisionState
from atftman import ProvisionStatus
from fastboot_exceptions import DeviceNotFoundException
from fastboot_exceptions import FastbootFailure
@@ -275,6 +277,19 @@ class Atft(wx.Frame):
ID_TOOL_CLEAR = 2
def __init__(self):
+ # If this is set to True, no prerequisites would be checked against manual
+ # operation, such as you can do key provisioning before fusing the vboot key.
+ self.TEST_MODE = False
+
+ self.PROVISION_STEPS = []
+
+ # The default steps included in the auto provisioning process.
+ self.DEFAULT_PROVISION_STEPS = [
+ 'FuseVbootKey', 'FusePermAttr', 'LockAvb', 'Provision']
+
+ # The available provision steps.
+ self.AVAILABLE_PROVISION_STEPS = [
+ 'FuseVbootKey', 'FusePermAttr', 'LockAvb', 'Provision', 'UnlockAvb']
self.configs = self.ParseConfigFile()
@@ -336,6 +351,8 @@ class Atft(wx.Frame):
if not self.log.log_dir_file:
self._SendAlertEvent(self.ALERT_FAIL_TO_CREATE_LOG)
+ self._CheckProvisionSteps()
+
self.StartRefreshingDevices()
self.ChooseProduct(None)
@@ -371,14 +388,6 @@ class Atft(wx.Frame):
self.REBOOT_TIMEOUT = 0
self.PRODUCT_ATTRIBUTE_FILE_EXTENSION = '*.atpa'
- # If this is set to True, no prerequisites would be checked against manual
- # operation, such as you can do key provisioning before fusing the vboot key.
- self.TEST_MODE = False
-
- # The steps included in the auto provisioning process.
- self.PROVISION_STEPS = ["FuseVbootKey", "FusePermAttr", "LockAvb",
- "Provision"]
-
config_file_path = os.path.join(self._GetCurrentPath(), self.CONFIG_FILE)
if not os.path.exists(config_file_path):
return None
@@ -410,6 +419,74 @@ class Atft(wx.Frame):
return configs
+ def _CheckProvisionSteps(self):
+ """Check whether the "PROVISION_STEPS" config is valid.
+
+ Check the format of "PROVISION_STEPS" config. If TEST_MODE is not set to
+ True, verify that the customized provision steps meet the necessary security
+ requirement.
+ """
+ if not self.PROVISION_STEPS:
+ self.PROVISION_STEPS = self.DEFAULT_PROVISION_STEPS
+ return
+ try:
+ provision_steps_verified = (
+ self._VerifyProvisionSteps(self.PROVISION_STEPS))
+ except ValueError:
+ self.PROVISION_STEPS = self.DEFAULT_PROVISION_STEPS
+ self._SendAlertEvent(self.ALERT_PROVISION_STEPS_SYNTAX_ERROR)
+ return
+
+ if self.TEST_MODE:
+ return
+ if not provision_steps_verified:
+ self.PROVISION_STEPS = self.DEFAULT_PROVISION_STEPS
+ self._SendAlertEvent(self.ALERT_PROVISION_STEPS_SECURITY_REQ)
+
+
+ def _VerifyProvisionSteps(self, provision_steps):
+ """Verify if the customized provision steps meet security requirements.
+
+ Args:
+ provision_steps: The customized provision steps to verify.
+ Raises:
+ ValueError: If the syntax for provision_steps is not correct.
+ """
+ if not isinstance(provision_steps, list):
+ raise ValueError()
+ provision_state = ProvisionState()
+ for operation in provision_steps:
+ if operation not in self.AVAILABLE_PROVISION_STEPS:
+ raise ValueError()
+ if operation == 'FuseVbootKey':
+ provision_state.bootloader_locked = True
+ continue
+ elif operation == 'FusePermAttr':
+ if (not provision_state.bootloader_locked or
+ provision_state.avb_perm_attr_set):
+ return False
+ else:
+ provision_state.avb_perm_attr_set = True
+ continue
+ elif operation == 'LockAvb':
+ if (not provision_state.bootloader_locked or
+ not provision_state.avb_perm_attr_set):
+ return False
+ else:
+ provision_state.avb_locked = True
+ continue
+ elif operation == 'UnlockAvb':
+ provision_state.avb_locked = False
+ continue
+ elif operation == 'Provision':
+ if (not provision_state.bootloader_locked or
+ not provision_state.avb_perm_attr_set or
+ provision_state.provisioned):
+ return False
+ else:
+ provision_state.provisioned = True
+ return True
+
def _StoreConfigToFile(self):
"""Store the configuration to the configuration file.
@@ -590,12 +667,24 @@ class Atft(wx.Frame):
'Cannot lock android verified boot for device that is not fused '
'permanent attributes or already locked!',
'无法锁定一个没有烧录过产品信息或者已经锁定AVB的设备!'][index]
+ self.ALERT_UNLOCKAVB_UNLOCKED = [
+ 'Cannot unlock android verified boot for device that is already '
+ 'unlocked!',
+ '无法解锁一个已经解锁AVB的设备'][index]
self.ALERT_PROV_PROVED = [
'Cannot provision device that is not ready for provisioning or '
'already provisioned!',
'无法传输密钥给一个不在正确状态或者已经拥有密钥的设备!'][index]
-
-
+ self.ALERT_PROVISION_STEPS_SYNTAX_ERROR = [
+ 'Config "PROVISION_STEPS" is not an array or contains unsupported '
+ 'operations',
+ '设置项"PROVISION_STEPS"不是一个数组或者包含不支持的步骤'][index]
+ self.ALERT_PROVISION_STEPS_SECURITY_REQ = [
+ 'Config "PROVISION_STEPS" does not meet the necessary security '
+ 'requirement, please check again or set TEST_MODE to true if you are '
+ 'really sure what you are doing.',
+ '设置项"PROVISION_STEPS"不符合必要的信息安全要求,请检查或者设定TEST_MODE为True'
+ '如果你明确此行为带来的后果.'][index]
def InitializeUI(self):
"""Initialize the application UI."""
@@ -1249,7 +1338,7 @@ class Atft(wx.Frame):
self.StopRefresh()
self.Destroy()
- def _is_provision_success(self, target):
+ def _is_provision_steps_finished(self, provision_state):
"""Check if the target device has successfully finished provision steps.
Args:
@@ -1258,17 +1347,23 @@ class Atft(wx.Frame):
success if the target device has already gone through the provision steps
successfully.
"""
- if len(self.PROVISION_STEPS) == 0:
- return True;
- if self.PROVISION_STEPS[-1] == 'Provision':
- return target.provision_state.provisioned
- if self.PROVISION_STEPS[-1] == 'LockAvb':
- return target.provision_state.avb_locked
- if self.PROVISION_STEPS[-1] == 'FusePermAttr':
- return target.provision_state.avb_perm_attr_set
- if self.PROVISION_STEPS[-1] == 'FuseVbootKey':
- return target.provision_state.bootloader_locked
- return True;
+ final_state = copy.deepcopy(provision_state)
+ for operation in self.PROVISION_STEPS:
+ if operation == 'FuseVbootKey':
+ final_state.bootloader_locked = True
+ continue
+ elif operation == 'FusePermAttr':
+ final_state.avb_perm_attr_set = True
+ continue
+ elif operation == 'LockAvb':
+ final_state.avb_locked = True
+ continue
+ elif operation == 'UnlockAvb':
+ final_state.avb_locked = False
+ continue
+ elif operation == 'Provision':
+ final_state.provisioned = True
+ return (provision_state == final_state);
def _HandleAutoProv(self):
"""Do the state transition for devices if in auto provisioning mode.
@@ -1277,7 +1372,7 @@ class Atft(wx.Frame):
# All idle devices -> waiting.
for target_dev in self.atft_manager.target_devs:
if (target_dev.serial_number not in self.auto_dev_serials and
- not self._is_provision_success(target_dev) and
+ not self._is_provision_steps_finished(target_dev.provision_state) and
not ProvisionStatus.isFailed(target_dev.provision_status)
):
self.auto_dev_serials.append(target_dev.serial_number)
@@ -1828,6 +1923,46 @@ class Atft(wx.Frame):
self._SendOperationSucceedEvent(operation, target)
+ def _UnlockAvb(self, selected_serials):
+ """Unlock android verified boot for selected devices.
+
+ Args:
+ selected_serials: The list of serial numbers for the selected devices.
+ """
+ pending_targets = []
+ for serial in selected_serials:
+ target = self.atft_manager.GetTargetDevice(serial)
+ if not target:
+ continue
+ if (self.TEST_MODE or target.provision_state.avb_locked):
+ target.provision_status = ProvisionStatus.WAITING
+ pending_targets.append(target)
+ else:
+ self._SendAlertEvent(self.ALERT_UNLOCKAVB_UNLOCKED)
+
+ for target in pending_targets:
+ self._UnlockAvbTarget(target)
+
+ def _UnlockAvbTarget(self, target):
+ """Unlock android verified boot for the specific target device.
+
+ Args:
+ target: The target device DeviceInfo object.
+ """
+ operation = 'Unlock android verified boot'
+ self._SendOperationStartEvent(operation, target)
+ self.PauseRefresh()
+
+ try:
+ self.atft_manager.UnlockAvb(target)
+ except FastbootFailure as e:
+ self._HandleException('E', e, operation)
+ return
+ finally:
+ self.ResumeRefresh()
+
+ self._SendOperationSucceedEvent(operation, target)
+
def _CheckLowKeyAlert(self):
"""Check whether the attestation key is lower than the threshold.
@@ -1995,10 +2130,12 @@ class Atft(wx.Frame):
operation == 'FusePermAttr'):
self._FusePermAttrTarget(target)
continue
- elif (not target.provision_state.avb_locked and
- operation == 'LockAvb'):
+ elif (not target.provision_state.avb_locked and operation == 'LockAvb'):
self._LockAvbTarget(target)
continue
+ elif (target.provision_state.avb_locked and operation == 'UnlockAvb'):
+ self._UnlockAvbTarget(target)
+ continue
elif (not target.provision_state.provisioned and
operation == 'Provision'):
self._ProvisionTarget(target)
diff --git a/at-factory-tool/atft_unittest.py b/at-factory-tool/atft_unittest.py
index 122704d..187de2a 100644
--- a/at-factory-tool/atft_unittest.py
+++ b/at-factory-tool/atft_unittest.py
@@ -49,10 +49,6 @@ class MockAtft(atft.Atft):
self.LANGUAGE = 'ENG'
self.REBOOT_TIMEOUT = 1.0
self.PRODUCT_ATTRIBUTE_FILE_EXTENSION = '*.atpa'
- # Disable the test mode. (This mode is just for usage test, not unit test)
- self.TEST_MODE = False
- self.PROVISION_STEPS = ["FuseVbootKey", "FusePermAttr", "LockAvb",
- "Provision"]
return {}
@@ -359,6 +355,9 @@ class AtftTest(unittest.TestCase):
mock_atft = MockAtft()
test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
ProvisionStatus.PROVISION_SUCCESS)
+ test_dev1.provision_state.bootloader_locked = True
+ test_dev1.provision_state.avb_perm_attr_set = True
+ test_dev1.provision_state.avb_locked = True
test_dev1.provision_state.provisioned = True
test_dev2 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION1,
ProvisionStatus.IDLE)
@@ -416,7 +415,9 @@ class AtftTest(unittest.TestCase):
# Test atft._HandleStateTransition
def MockStateChange(self, target, state):
- target.provision_status = state
+ if ProvisionStatus.isFailed(state):
+ target.provision_status = state
+ return
if state == ProvisionStatus.REBOOT_SUCCESS:
target.provision_state.bootloader_locked = True
if state == ProvisionStatus.FUSEATTR_SUCCESS:
@@ -425,6 +426,19 @@ class AtftTest(unittest.TestCase):
target.provision_state.avb_locked = True
if state == ProvisionStatus.PROVISION_SUCCESS:
target.provision_state.provisioned = True
+ if state == ProvisionStatus.UNLOCKAVB_SUCCESS:
+ target.provision_state.avb_locked = False
+ if target.provision_state.provisioned:
+ target.provision_status = ProvisionStatus.PROVISION_SUCCESS
+ return
+ if target.provision_state.avb_locked:
+ target.provision_status = ProvisionStatus.LOCKAVB_SUCCESS
+ return
+ if target.provision_state.avb_perm_attr_set:
+ target.provision_status = ProvisionStatus.FUSEATTR_SUCCESS
+ return
+ if target.provision_state.bootloader_locked:
+ target.provision_status = ProvisionStatus.FUSEVBOOT_SUCCESS
def testHandleStateTransition(self):
mock_atft = MockAtft()
@@ -638,6 +652,189 @@ class AtftTest(unittest.TestCase):
self.assertEqual(True, test_dev1.provision_state.avb_locked)
self.assertEqual(True, test_dev1.provision_state.provisioned)
+ def testHandleStateTransitionIncludeUnlock(self):
+ """Test the provision_steps that unlock avb after provisioning.
+
+ We assume that the device would be locked avb during fuse vboot key and
+ we want the final state to be avb unlocked.
+ """
+ mock_atft = MockAtft()
+ test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
+ ProvisionStatus.WAITING)
+ mock_atft._FuseVbootKeyTarget = MagicMock()
+ mock_atft._FuseVbootKeyTarget.side_effect = (
+ lambda target=mock_atft, state=ProvisionStatus.REBOOT_SUCCESS:
+ self.MockStateChange(target, state))
+ mock_atft._FusePermAttrTarget = MagicMock()
+ mock_atft._FusePermAttrTarget.side_effect = (
+ lambda target=mock_atft, state=ProvisionStatus.FUSEATTR_SUCCESS:
+ self.MockStateChange(target, state))
+ mock_atft._ProvisionTarget = MagicMock()
+ mock_atft._ProvisionTarget.side_effect = (
+ lambda target=mock_atft, state=ProvisionStatus.PROVISION_SUCCESS:
+ self.MockStateChange(target, state))
+ mock_atft._UnlockAvbTarget = MagicMock()
+ mock_atft._UnlockAvbTarget.side_effect = (
+ lambda target=mock_atft, state=ProvisionStatus.UNLOCKAVB_SUCCESS:
+ self.MockStateChange(target, state))
+ mock_atft._LockAvbTarget = MagicMock()
+ mock_atft._LockAvbTarget.side_effect = (
+ lambda target=mock_atft, state=ProvisionStatus.LOCKAVB_SUCCESS:
+ self.MockStateChange(target, state))
+
+ mock_atft.PROVISION_STEPS = ['FuseVbootKey', 'FusePermAttr', 'LockAvb',
+ 'Provision', 'UnlockAvb']
+ mock_atft.auto_dev_serials = [self.TEST_SERIAL1]
+ mock_atft.auto_prov = True
+ mock_atft.atft_manager = MagicMock()
+ mock_atft.atft_manager.GetTargetDevice = MagicMock()
+ mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1
+ mock_atft._HandleStateTransition(test_dev1)
+ mock_atft._FuseVbootKeyTarget.assert_called_once()
+ mock_atft._LockAvbTarget.assert_called_once()
+ mock_atft._UnlockAvbTarget.assert_called_once()
+ mock_atft._FusePermAttrTarget.assert_called_once()
+ mock_atft._ProvisionTarget.assert_called_once()
+ self.assertEqual(True, test_dev1.provision_state.bootloader_locked)
+ self.assertEqual(True, test_dev1.provision_state.avb_perm_attr_set)
+ self.assertEqual(False, test_dev1.provision_state.avb_locked)
+ self.assertEqual(True, test_dev1.provision_state.provisioned)
+ self.assertEqual(
+ ProvisionStatus.PROVISION_SUCCESS, test_dev1.provision_status)
+ self.assertEqual(
+ True, mock_atft._is_provision_steps_finished(test_dev1.provision_state))
+
+ def testHandleStateTransitionLockUnlockLock(self):
+ mock_atft = MockAtft()
+ test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
+ ProvisionStatus.WAITING)
+ mock_atft._FuseVbootKeyTarget = MagicMock()
+ mock_atft._FuseVbootKeyTarget.side_effect = (
+ lambda target=mock_atft, state=ProvisionStatus.REBOOT_SUCCESS:
+ self.MockStateChange(target, state))
+ mock_atft._FusePermAttrTarget = MagicMock()
+ mock_atft._FusePermAttrTarget.side_effect = (
+ lambda target=mock_atft, state=ProvisionStatus.FUSEATTR_SUCCESS:
+ self.MockStateChange(target, state))
+ mock_atft._ProvisionTarget = MagicMock()
+ mock_atft._ProvisionTarget.side_effect = (
+ lambda target=mock_atft, state=ProvisionStatus.PROVISION_SUCCESS:
+ self.MockStateChange(target, state))
+ mock_atft._UnlockAvbTarget = MagicMock()
+ mock_atft._UnlockAvbTarget.side_effect = (
+ lambda target=mock_atft, state=ProvisionStatus.UNLOCKAVB_SUCCESS:
+ self.MockStateChange(target, state))
+ mock_atft._LockAvbTarget = MagicMock()
+ mock_atft._LockAvbTarget.side_effect = (
+ lambda target=mock_atft, state=ProvisionStatus.LOCKAVB_SUCCESS:
+ self.MockStateChange(target, state))
+
+ mock_atft.PROVISION_STEPS = ['FuseVbootKey', 'FusePermAttr', 'LockAvb',
+ 'Provision', 'UnlockAvb', 'LockAvb', 'UnlockAvb']
+ mock_atft.auto_dev_serials = [self.TEST_SERIAL1]
+ mock_atft.auto_prov = True
+ mock_atft.atft_manager = MagicMock()
+ mock_atft.atft_manager.GetTargetDevice = MagicMock()
+ mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1
+ mock_atft._HandleStateTransition(test_dev1)
+ self.assertEqual(True, test_dev1.provision_state.bootloader_locked)
+ self.assertEqual(True, test_dev1.provision_state.avb_perm_attr_set)
+ self.assertEqual(False, test_dev1.provision_state.avb_locked)
+ self.assertEqual(True, test_dev1.provision_state.provisioned)
+ self.assertEqual(
+ ProvisionStatus.PROVISION_SUCCESS, test_dev1.provision_status)
+ self.assertEqual(
+ True, mock_atft._is_provision_steps_finished(test_dev1.provision_state))
+
+ def testHandleStateTransitionReorder(self):
+ """Test the provision_steps that has been reordered.
+
+ We should make sure all the steps are executed even if they are reordered.
+ """
+ mock_atft = MockAtft()
+ test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
+ ProvisionStatus.WAITING)
+ mock_atft._FuseVbootKeyTarget = MagicMock()
+ mock_atft._FuseVbootKeyTarget.side_effect = (
+ lambda target=mock_atft, state=ProvisionStatus.REBOOT_SUCCESS:
+ self.MockStateChange(target, state))
+ mock_atft._FusePermAttrTarget = MagicMock()
+ mock_atft._FusePermAttrTarget.side_effect = (
+ lambda target=mock_atft, state=ProvisionStatus.FUSEATTR_SUCCESS:
+ self.MockStateChange(target, state))
+ mock_atft._ProvisionTarget = MagicMock()
+ mock_atft._ProvisionTarget.side_effect = (
+ lambda target=mock_atft, state=ProvisionStatus.PROVISION_SUCCESS:
+ self.MockStateChange(target, state))
+ mock_atft._LockAvbTarget = MagicMock()
+ mock_atft._LockAvbTarget.side_effect = (
+ lambda target=mock_atft, state=ProvisionStatus.LOCKAVB_SUCCESS:
+ self.MockStateChange(target, state))
+
+ mock_atft.PROVISION_STEPS = ['FusePermAttr', 'FuseVbootKey', 'Provision',
+ 'LockAvb']
+ mock_atft.auto_dev_serials = [self.TEST_SERIAL1]
+ mock_atft.auto_prov = True
+ mock_atft.atft_manager = MagicMock()
+ mock_atft.atft_manager.GetTargetDevice = MagicMock()
+ mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1
+ mock_atft._HandleStateTransition(test_dev1)
+ mock_atft._FuseVbootKeyTarget.assert_called_once()
+ mock_atft._LockAvbTarget.assert_called_once()
+ mock_atft._FusePermAttrTarget.assert_called_once()
+ mock_atft._ProvisionTarget.assert_called_once()
+ self.assertEqual(True, test_dev1.provision_state.bootloader_locked)
+ self.assertEqual(True, test_dev1.provision_state.avb_perm_attr_set)
+ self.assertEqual(True, test_dev1.provision_state.avb_locked)
+ self.assertEqual(True, test_dev1.provision_state.provisioned)
+ self.assertEqual(
+ ProvisionStatus.PROVISION_SUCCESS, test_dev1.provision_status)
+ self.assertEqual(
+ True, mock_atft._is_provision_steps_finished(test_dev1.provision_state))
+
+ def testHandleStateTransitionNoProvision(self):
+ """Test the provision_steps that does not provision key.
+ """
+ mock_atft = MockAtft()
+ test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
+ ProvisionStatus.WAITING)
+ mock_atft._FuseVbootKeyTarget = MagicMock()
+ mock_atft._FuseVbootKeyTarget.side_effect = (
+ lambda target=mock_atft, state=ProvisionStatus.REBOOT_SUCCESS:
+ self.MockStateChange(target, state))
+ mock_atft._FusePermAttrTarget = MagicMock()
+ mock_atft._FusePermAttrTarget.side_effect = (
+ lambda target=mock_atft, state=ProvisionStatus.FUSEATTR_SUCCESS:
+ self.MockStateChange(target, state))
+ mock_atft._ProvisionTarget = MagicMock()
+ mock_atft._ProvisionTarget.side_effect = (
+ lambda target=mock_atft, state=ProvisionStatus.PROVISION_SUCCESS:
+ self.MockStateChange(target, state))
+ mock_atft._LockAvbTarget = MagicMock()
+ mock_atft._LockAvbTarget.side_effect = (
+ lambda target=mock_atft, state=ProvisionStatus.LOCKAVB_SUCCESS:
+ self.MockStateChange(target, state))
+
+ mock_atft.PROVISION_STEPS = ['FusePermAttr', 'FuseVbootKey', 'LockAvb']
+ mock_atft.auto_dev_serials = [self.TEST_SERIAL1]
+ mock_atft.auto_prov = True
+ mock_atft.atft_manager = MagicMock()
+ mock_atft.atft_manager.GetTargetDevice = MagicMock()
+ mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1
+ mock_atft._HandleStateTransition(test_dev1)
+ mock_atft._FuseVbootKeyTarget.assert_called_once()
+ mock_atft._LockAvbTarget.assert_called_once()
+ mock_atft._FusePermAttrTarget.assert_called_once()
+ mock_atft._ProvisionTarget.assert_not_called()
+ self.assertEqual(True, test_dev1.provision_state.bootloader_locked)
+ self.assertEqual(True, test_dev1.provision_state.avb_perm_attr_set)
+ self.assertEqual(True, test_dev1.provision_state.avb_locked)
+ self.assertEqual(False, test_dev1.provision_state.provisioned)
+ self.assertEqual(
+ ProvisionStatus.LOCKAVB_SUCCESS, test_dev1.provision_status)
+ self.assertEqual(
+ True, mock_atft._is_provision_steps_finished(test_dev1.provision_state))
+
# Test atft._CheckATFAStatus
def testCheckATFAStatus(self):
mock_atft = MockAtft()
@@ -1213,6 +1410,144 @@ class AtftTest(unittest.TestCase):
mock_atft._SendOperationStartEvent.assert_called_once()
mock_atft._SendOperationSucceedEvent.assert_not_called()
+ def testCheckProvisionStepsSuccess(self):
+ mock_atft = MockAtft()
+ mock_atft._SendAlertEvent = MagicMock()
+ mock_atft.PROVISION_STEPS = []
+ mock_atft._CheckProvisionSteps()
+ mock_atft._SendAlertEvent.assert_not_called()
+ self.assertEqual(
+ mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS)
+
+ # Test [step1], [step1, step2], [step1, step2, step3] ...
+ for i in range(1, len(mock_atft.DEFAULT_PROVISION_STEPS)):
+ provision_steps = []
+ for j in range(0, i):
+ provision_steps.append(mock_atft.DEFAULT_PROVISION_STEPS[j])
+ mock_atft.PROVISION_STEPS = provision_steps
+ mock_atft._CheckProvisionSteps()
+ mock_atft._SendAlertEvent.assert_not_called()
+ self.assertEqual(
+ provision_steps, mock_atft.PROVISION_STEPS)
+
+ def testCheckProvisionStepsInvalidSyntax(self):
+ mock_atft = MockAtft()
+ mock_atft._SendAlertEvent = MagicMock()
+ # Test invalid format (not array).
+ mock_atft.PROVISION_STEPS = '1234'
+ mock_atft._CheckProvisionSteps()
+ mock_atft._SendAlertEvent.assert_called_once()
+ mock_atft._SendAlertEvent.reset_mock()
+ self.assertEqual(
+ mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS)
+
+ # Test invalid operation.
+ mock_atft.PROVISION_STEPS = ['1234']
+ mock_atft._CheckProvisionSteps()
+ mock_atft._SendAlertEvent.assert_called_once()
+ mock_atft._SendAlertEvent.reset_mock()
+ self.assertEqual(
+ mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS)
+
+ # Even if TEST_MODE is true, syntax error is still failure.
+ mock_atft.TEST_MODE = True
+ mock_atft.PROVISION_STEPS = '1234'
+ mock_atft._CheckProvisionSteps()
+ mock_atft._SendAlertEvent.assert_called_once()
+ mock_atft._SendAlertEvent.reset_mock()
+ self.assertEqual(
+ mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS)
+
+ mock_atft.PROVISION_STEPS = ['1234']
+ mock_atft._CheckProvisionSteps()
+ mock_atft._SendAlertEvent.assert_called_once()
+ mock_atft._SendAlertEvent.reset_mock()
+ self.assertEqual(
+ mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS)
+
+ def testCheckProvisionStepsSecurityReq(self):
+ # Test cases when the provision steps do not meet security requirement.
+ mock_atft = MockAtft()
+ mock_atft._SendAlertEvent = MagicMock()
+ # Test fuse perm attr without fusing vboot key.
+ mock_atft.PROVISION_STEPS = ['FusePermAttr']
+ mock_atft._CheckProvisionSteps()
+ mock_atft._SendAlertEvent.assert_called_once()
+ mock_atft._SendAlertEvent.reset_mock()
+ self.assertEqual(
+ mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS)
+
+ # Test fuse perm attr when already fused.
+ mock_atft.PROVISION_STEPS = ['FuseVbootKey', 'FusePermAttr', 'FusePermAttr']
+ mock_atft._CheckProvisionSteps()
+ mock_atft._SendAlertEvent.assert_called_once()
+ mock_atft._SendAlertEvent.reset_mock()
+ self.assertEqual(
+ mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS)
+
+ # Test LockAvb when vboot key is not fused.
+ mock_atft.PROVISION_STEPS = ['LockAvb']
+ mock_atft._CheckProvisionSteps()
+ mock_atft._SendAlertEvent.assert_called_once()
+ mock_atft._SendAlertEvent.reset_mock()
+ self.assertEqual(
+ mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS)
+
+ # Test LockAvb when perm attr not fused.
+ mock_atft.PROVISION_STEPS = ['FuseVbootKey', 'LockAvb']
+ mock_atft._CheckProvisionSteps()
+ mock_atft._SendAlertEvent.assert_called_once()
+ mock_atft._SendAlertEvent.reset_mock()
+ self.assertEqual(
+ mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS)
+
+ # Test provision when perm attr not fused.
+ mock_atft.PROVISION_STEPS = [
+ 'FuseVbootKey', 'LockAvb', 'Provision']
+ mock_atft._CheckProvisionSteps()
+ mock_atft._SendAlertEvent.assert_called_once()
+ mock_atft._SendAlertEvent.reset_mock()
+ self.assertEqual(
+ mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS)
+
+ # All the tests should succeed if TEST_MODE is set to True
+
+ # Test fuse perm attr without fusing vboot key.
+ mock_atft.TEST_MODE = True
+ mock_atft.PROVISION_STEPS = ['FusePermAttr']
+ mock_atft._CheckProvisionSteps()
+ mock_atft._SendAlertEvent.assert_not_called()
+ self.assertNotEqual(
+ mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS)
+
+ # Test fuse perm attr when already fused.
+ mock_atft.PROVISION_STEPS = ['FuseVbootKey', 'FusePermAttr', 'FusePermAttr']
+ mock_atft._CheckProvisionSteps()
+ mock_atft._SendAlertEvent.assert_not_called()
+ self.assertNotEqual(
+ mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS)
+
+ # Test LockAvb when vboot key is not fused.
+ mock_atft.PROVISION_STEPS = ['LockAvb']
+ mock_atft._CheckProvisionSteps()
+ mock_atft._SendAlertEvent.assert_not_called()
+ self.assertNotEqual(
+ mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS)
+
+ # Test LockAvb when perm attr not fused.
+ mock_atft.PROVISION_STEPS = ['FuseVbootKey', 'LockAvb']
+ mock_atft._CheckProvisionSteps()
+ mock_atft._SendAlertEvent.assert_not_called()
+ self.assertNotEqual(
+ mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS)
+
+ # Test provision when perm attr not fused.
+ mock_atft.PROVISION_STEPS = [
+ 'FuseVbootKey', 'LockAvb', 'Provision']
+ mock_atft._CheckProvisionSteps()
+ mock_atft._SendAlertEvent.assert_not_called()
+ self.assertNotEqual(
+ mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS)
if __name__ == '__main__':
unittest.main()
diff --git a/at-factory-tool/atftman.py b/at-factory-tool/atftman.py
index f47dfba..d2569a8 100644
--- a/at-factory-tool/atftman.py
+++ b/at-factory-tool/atftman.py
@@ -65,7 +65,11 @@ class ProvisionStatus(object):
LOCKAVB_FAILED = (40 + _FAILED)
PROVISION_ING = (50 + _PROCESSING)
PROVISION_SUCCESS = (50 + _SUCCESS)
- PROVISION_FAILED = ( + _FAILED)
+ PROVISION_FAILED = (50 + _FAILED)
+ UNLOCKAVB_ING = (60 + _PROCESSING)
+ UNLOCKAVB_SUCCESS = (60 + _SUCCESS)
+ UNLOCKAVB_FAILED = (60 + _FAILED)
+
STRING_MAP = {
IDLE : ['Idle', '初始'],
@@ -85,8 +89,10 @@ class ProvisionStatus(object):
LOCKAVB_FAILED : ['Lock Android Verified Boot Failed', '锁定AVB失败'],
PROVISION_ING : ['Provisioning Attestation Key', '传输密钥中...'],
PROVISION_SUCCESS : ['Attestation Key Provisioned', '传输密钥成功'],
- PROVISION_FAILED : ['Provision Attestation Key Failed', '传输密钥失败']
-
+ PROVISION_FAILED : ['Provision Attestation Key Failed', '传输密钥失败'],
+ UNLOCKAVB_ING : ['Unlocking AVB', '解锁AVB中...'],
+ UNLOCKAVB_SUCCESS : ['AVB Unlocked', '已解锁AVB'],
+ UNLOCKAVB_FAILED : ['Unlock AVB Failed', '解锁AVB失败']
}
@staticmethod
@@ -107,12 +113,28 @@ class ProvisionStatus(object):
class ProvisionState(object):
- """The provision state of the target device."""
+ """The provision state of the target device.
+
+ Attributes:
+ bootloader_locked: Whether bootloader is locked.
+ avb_perm_attr_set: Whether permanent attribute is set.
+ avb_locked: Whether avb is locked.
+ provisioned: Whether the device has product key provisioned.
+ """
bootloader_locked = False
avb_perm_attr_set = False
avb_locked = False
provisioned = False
+ def __eq__(self, other):
+ return (self.bootloader_locked == other.bootloader_locked and
+ self.avb_perm_attr_set == other.avb_perm_attr_set and
+ self.avb_locked == other.avb_locked and
+ self.provisioned == other.provisioned)
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
class ProductInfo(object):
"""The information about a product.
@@ -267,6 +289,9 @@ class AtftManager(object):
self.ATFA_REBOOT_TIMEOUT = float(configs['ATFA_REBOOT_TIMEOUT'])
except ValueError:
pass
+ self.UNLOCK_CREDENTIAL = None
+ if configs and 'UNLOCK_CREDENTIAL' in configs:
+ self.UNLOCK_CREDENTIAL = configs['UNLOCK_CREDENTIAL']
# The serial numbers for the devices that are at least seen twice.
self.stable_serials = []
@@ -768,6 +793,27 @@ class AtftManager(object):
target.provision_status = ProvisionStatus.LOCKAVB_FAILED
raise e
+ def UnlockAvb(self, target):
+ """Unlock the android verified boot for the target.
+
+ Args:
+ target: The target device.
+ Raises:
+ FastbootFailure: When fastboot command fails.
+ """
+ try:
+ target.provision_status = ProvisionStatus.UNLOCKAVB_ING
+ unlock_command = 'at-unlock-vboot'
+ if self.UNLOCK_CREDENTIAL:
+ unlock_command += ' ' + self.UNLOCK_CREDENTIAL
+ target.Oem(unlock_command)
+ self.CheckProvisionStatus(target)
+ if target.provision_state.avb_locked:
+ raise FastbootFailure('Status not updated')
+ except FastbootFailure as e:
+ target.provision_status = ProvisionStatus.UNLOCKAVB_FAILED
+ raise e
+
def Reboot(self, target, timeout, success_callback, timeout_callback):
"""Reboot the target device.
diff --git a/at-factory-tool/atftman_unittest.py b/at-factory-tool/atftman_unittest.py
index 0c2ea10..c101f27 100644
--- a/at-factory-tool/atftman_unittest.py
+++ b/at-factory-tool/atftman_unittest.py
@@ -100,6 +100,7 @@ class AtftManTest(unittest.TestCase):
self.configs = {}
self.configs['ATFA_REBOOT_TIMEOUT'] = 30
self.configs['DEFAULT_KEY_THRESHOLD'] = 100
+ self.configs['UNLOCK_CREDENTIAL'] = None
# Test AtftManager.ListDevices
class MockInstantTimer(object):
@@ -1172,6 +1173,61 @@ class AtftManTest(unittest.TestCase):
self.assertEqual(
ProvisionStatus.LOCKAVB_FAILED, mock_target.provision_status)
+ # Test AtftManager.LockAvb
+ def MockSetUnlockAvbSuccess(self, target):
+ target.provision_status = ProvisionStatus.UNLOCKAVB_SUCCESS
+ target.provision_state = ProvisionState()
+ target.provision_state.avb_locked = False
+
+ def MockSetUnlockAvbFail(self, target):
+ target.provision_status = ProvisionStatus.UNLOCKAVB_FAILED
+ target.provision_state = ProvisionState()
+ target.provision_state.avb_locked = True
+
+ def testUnlockAvb(self):
+ atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
+ self.mock_serial_mapper, self.configs)
+ mock_target = MagicMock()
+ atft_manager.CheckProvisionStatus = MagicMock()
+ atft_manager.CheckProvisionStatus.side_effect = self.MockSetUnlockAvbSuccess
+ atft_manager.UnlockAvb(mock_target)
+ mock_target.Oem.assert_called_once_with('at-unlock-vboot')
+ self.assertEqual(
+ ProvisionStatus.UNLOCKAVB_SUCCESS, mock_target.provision_status)
+
+ def testUnlockAvbWithCredential(self):
+ atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
+ self.mock_serial_mapper, self.configs)
+ mock_target = MagicMock()
+ atft_manager.CheckProvisionStatus = MagicMock()
+ atft_manager.CheckProvisionStatus.side_effect = self.MockSetUnlockAvbSuccess
+ atft_manager.UNLOCK_CREDENTIAL = 'test'
+ atft_manager.UnlockAvb(mock_target)
+ mock_target.Oem.assert_called_once_with('at-unlock-vboot test')
+ self.assertEqual(
+ ProvisionStatus.UNLOCKAVB_SUCCESS, mock_target.provision_status)
+
+ def testUnlockAvbFail(self):
+ atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
+ self.mock_serial_mapper, self.configs)
+ mock_target = MagicMock()
+ atft_manager.CheckProvisionStatus = MagicMock()
+ atft_manager.CheckProvisionStatus.side_effect = self.MockSetUnlockAvbFail
+ with self.assertRaises(FastbootFailure):
+ atft_manager.UnlockAvb(mock_target)
+
+ def testUnlockAvbFastbootFailure(self):
+ atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
+ self.mock_serial_mapper, self.configs)
+ mock_target = MagicMock()
+ atft_manager.CheckProvisionStatus = MagicMock()
+ atft_manager.CheckProvisionStatus.side_effect = self.MockSetUnlockAvbSuccess
+ mock_target.Oem.side_effect = FastbootFailure('')
+ with self.assertRaises(FastbootFailure):
+ atft_manager.UnlockAvb(mock_target)
+ self.assertEqual(
+ ProvisionStatus.UNLOCKAVB_FAILED, mock_target.provision_status)
+
# Test AtftManager.Reboot
class MockTimer(object):
def __init__(self, interval, callback):