diff options
author | Yu Shan <shanyu@google.com> | 2018-05-08 18:41:32 -0700 |
---|---|---|
committer | Yu Shan <shanyu@google.com> | 2018-05-23 15:17:58 -0700 |
commit | cef5ea2799ead3a8272475c0cda0d2c06665292f (patch) | |
tree | 31b1282680c4423b04dc13733be4551c08a1215e | |
parent | 9e730f24d0d3afc055c80ed5aa4169f738c4f479 (diff) | |
download | attestation-stable.tar.gz |
[ATFT] Add unlock avb support.stable
Enhance provision_steps configuration, now support steps in any
order. The device would only show complete (green) if all the
steps are done. Support 'UnlockAvb' in the provision step.
Add 'UNLOCK_CREDENTIAL' (optional) to config file to support
customized unlock credential.
Change-Id: Ib3fee0b333b1e3d21e379bbe950b235d59ee1c29
Test: Manual test, unit tests.
Bug: b/79164608
-rw-r--r-- | at-factory-tool/atft.py | 187 | ||||
-rw-r--r-- | at-factory-tool/atft_unittest.py | 345 | ||||
-rw-r--r-- | at-factory-tool/atftman.py | 54 | ||||
-rw-r--r-- | at-factory-tool/atftman_unittest.py | 56 |
4 files changed, 608 insertions, 34 deletions
diff --git a/at-factory-tool/atft.py b/at-factory-tool/atft.py index 470554a..ff69770 100644 --- a/at-factory-tool/atft.py +++ b/at-factory-tool/atft.py @@ -20,6 +20,7 @@ This tool allows for easy graphical access to common ATFA commands. It also locates Fastboot devices and can initiate communication between the ATFA and an Android Things device. """ +import copy from datetime import datetime import json import math @@ -30,6 +31,7 @@ import threading import time from atftman import AtftManager +from atftman import ProvisionState from atftman import ProvisionStatus from fastboot_exceptions import DeviceNotFoundException from fastboot_exceptions import FastbootFailure @@ -275,6 +277,19 @@ class Atft(wx.Frame): ID_TOOL_CLEAR = 2 def __init__(self): + # If this is set to True, no prerequisites would be checked against manual + # operation, such as you can do key provisioning before fusing the vboot key. + self.TEST_MODE = False + + self.PROVISION_STEPS = [] + + # The default steps included in the auto provisioning process. + self.DEFAULT_PROVISION_STEPS = [ + 'FuseVbootKey', 'FusePermAttr', 'LockAvb', 'Provision'] + + # The available provision steps. + self.AVAILABLE_PROVISION_STEPS = [ + 'FuseVbootKey', 'FusePermAttr', 'LockAvb', 'Provision', 'UnlockAvb'] self.configs = self.ParseConfigFile() @@ -336,6 +351,8 @@ class Atft(wx.Frame): if not self.log.log_dir_file: self._SendAlertEvent(self.ALERT_FAIL_TO_CREATE_LOG) + self._CheckProvisionSteps() + self.StartRefreshingDevices() self.ChooseProduct(None) @@ -371,14 +388,6 @@ class Atft(wx.Frame): self.REBOOT_TIMEOUT = 0 self.PRODUCT_ATTRIBUTE_FILE_EXTENSION = '*.atpa' - # If this is set to True, no prerequisites would be checked against manual - # operation, such as you can do key provisioning before fusing the vboot key. - self.TEST_MODE = False - - # The steps included in the auto provisioning process. - self.PROVISION_STEPS = ["FuseVbootKey", "FusePermAttr", "LockAvb", - "Provision"] - config_file_path = os.path.join(self._GetCurrentPath(), self.CONFIG_FILE) if not os.path.exists(config_file_path): return None @@ -410,6 +419,74 @@ class Atft(wx.Frame): return configs + def _CheckProvisionSteps(self): + """Check whether the "PROVISION_STEPS" config is valid. + + Check the format of "PROVISION_STEPS" config. If TEST_MODE is not set to + True, verify that the customized provision steps meet the necessary security + requirement. + """ + if not self.PROVISION_STEPS: + self.PROVISION_STEPS = self.DEFAULT_PROVISION_STEPS + return + try: + provision_steps_verified = ( + self._VerifyProvisionSteps(self.PROVISION_STEPS)) + except ValueError: + self.PROVISION_STEPS = self.DEFAULT_PROVISION_STEPS + self._SendAlertEvent(self.ALERT_PROVISION_STEPS_SYNTAX_ERROR) + return + + if self.TEST_MODE: + return + if not provision_steps_verified: + self.PROVISION_STEPS = self.DEFAULT_PROVISION_STEPS + self._SendAlertEvent(self.ALERT_PROVISION_STEPS_SECURITY_REQ) + + + def _VerifyProvisionSteps(self, provision_steps): + """Verify if the customized provision steps meet security requirements. + + Args: + provision_steps: The customized provision steps to verify. + Raises: + ValueError: If the syntax for provision_steps is not correct. + """ + if not isinstance(provision_steps, list): + raise ValueError() + provision_state = ProvisionState() + for operation in provision_steps: + if operation not in self.AVAILABLE_PROVISION_STEPS: + raise ValueError() + if operation == 'FuseVbootKey': + provision_state.bootloader_locked = True + continue + elif operation == 'FusePermAttr': + if (not provision_state.bootloader_locked or + provision_state.avb_perm_attr_set): + return False + else: + provision_state.avb_perm_attr_set = True + continue + elif operation == 'LockAvb': + if (not provision_state.bootloader_locked or + not provision_state.avb_perm_attr_set): + return False + else: + provision_state.avb_locked = True + continue + elif operation == 'UnlockAvb': + provision_state.avb_locked = False + continue + elif operation == 'Provision': + if (not provision_state.bootloader_locked or + not provision_state.avb_perm_attr_set or + provision_state.provisioned): + return False + else: + provision_state.provisioned = True + return True + def _StoreConfigToFile(self): """Store the configuration to the configuration file. @@ -590,12 +667,24 @@ class Atft(wx.Frame): 'Cannot lock android verified boot for device that is not fused ' 'permanent attributes or already locked!', '无法锁定一个没有烧录过产品信息或者已经锁定AVB的设备!'][index] + self.ALERT_UNLOCKAVB_UNLOCKED = [ + 'Cannot unlock android verified boot for device that is already ' + 'unlocked!', + '无法解锁一个已经解锁AVB的设备'][index] self.ALERT_PROV_PROVED = [ 'Cannot provision device that is not ready for provisioning or ' 'already provisioned!', '无法传输密钥给一个不在正确状态或者已经拥有密钥的设备!'][index] - - + self.ALERT_PROVISION_STEPS_SYNTAX_ERROR = [ + 'Config "PROVISION_STEPS" is not an array or contains unsupported ' + 'operations', + '设置项"PROVISION_STEPS"不是一个数组或者包含不支持的步骤'][index] + self.ALERT_PROVISION_STEPS_SECURITY_REQ = [ + 'Config "PROVISION_STEPS" does not meet the necessary security ' + 'requirement, please check again or set TEST_MODE to true if you are ' + 'really sure what you are doing.', + '设置项"PROVISION_STEPS"不符合必要的信息安全要求,请检查或者设定TEST_MODE为True' + '如果你明确此行为带来的后果.'][index] def InitializeUI(self): """Initialize the application UI.""" @@ -1249,7 +1338,7 @@ class Atft(wx.Frame): self.StopRefresh() self.Destroy() - def _is_provision_success(self, target): + def _is_provision_steps_finished(self, provision_state): """Check if the target device has successfully finished provision steps. Args: @@ -1258,17 +1347,23 @@ class Atft(wx.Frame): success if the target device has already gone through the provision steps successfully. """ - if len(self.PROVISION_STEPS) == 0: - return True; - if self.PROVISION_STEPS[-1] == 'Provision': - return target.provision_state.provisioned - if self.PROVISION_STEPS[-1] == 'LockAvb': - return target.provision_state.avb_locked - if self.PROVISION_STEPS[-1] == 'FusePermAttr': - return target.provision_state.avb_perm_attr_set - if self.PROVISION_STEPS[-1] == 'FuseVbootKey': - return target.provision_state.bootloader_locked - return True; + final_state = copy.deepcopy(provision_state) + for operation in self.PROVISION_STEPS: + if operation == 'FuseVbootKey': + final_state.bootloader_locked = True + continue + elif operation == 'FusePermAttr': + final_state.avb_perm_attr_set = True + continue + elif operation == 'LockAvb': + final_state.avb_locked = True + continue + elif operation == 'UnlockAvb': + final_state.avb_locked = False + continue + elif operation == 'Provision': + final_state.provisioned = True + return (provision_state == final_state); def _HandleAutoProv(self): """Do the state transition for devices if in auto provisioning mode. @@ -1277,7 +1372,7 @@ class Atft(wx.Frame): # All idle devices -> waiting. for target_dev in self.atft_manager.target_devs: if (target_dev.serial_number not in self.auto_dev_serials and - not self._is_provision_success(target_dev) and + not self._is_provision_steps_finished(target_dev.provision_state) and not ProvisionStatus.isFailed(target_dev.provision_status) ): self.auto_dev_serials.append(target_dev.serial_number) @@ -1828,6 +1923,46 @@ class Atft(wx.Frame): self._SendOperationSucceedEvent(operation, target) + def _UnlockAvb(self, selected_serials): + """Unlock android verified boot for selected devices. + + Args: + selected_serials: The list of serial numbers for the selected devices. + """ + pending_targets = [] + for serial in selected_serials: + target = self.atft_manager.GetTargetDevice(serial) + if not target: + continue + if (self.TEST_MODE or target.provision_state.avb_locked): + target.provision_status = ProvisionStatus.WAITING + pending_targets.append(target) + else: + self._SendAlertEvent(self.ALERT_UNLOCKAVB_UNLOCKED) + + for target in pending_targets: + self._UnlockAvbTarget(target) + + def _UnlockAvbTarget(self, target): + """Unlock android verified boot for the specific target device. + + Args: + target: The target device DeviceInfo object. + """ + operation = 'Unlock android verified boot' + self._SendOperationStartEvent(operation, target) + self.PauseRefresh() + + try: + self.atft_manager.UnlockAvb(target) + except FastbootFailure as e: + self._HandleException('E', e, operation) + return + finally: + self.ResumeRefresh() + + self._SendOperationSucceedEvent(operation, target) + def _CheckLowKeyAlert(self): """Check whether the attestation key is lower than the threshold. @@ -1995,10 +2130,12 @@ class Atft(wx.Frame): operation == 'FusePermAttr'): self._FusePermAttrTarget(target) continue - elif (not target.provision_state.avb_locked and - operation == 'LockAvb'): + elif (not target.provision_state.avb_locked and operation == 'LockAvb'): self._LockAvbTarget(target) continue + elif (target.provision_state.avb_locked and operation == 'UnlockAvb'): + self._UnlockAvbTarget(target) + continue elif (not target.provision_state.provisioned and operation == 'Provision'): self._ProvisionTarget(target) diff --git a/at-factory-tool/atft_unittest.py b/at-factory-tool/atft_unittest.py index 122704d..187de2a 100644 --- a/at-factory-tool/atft_unittest.py +++ b/at-factory-tool/atft_unittest.py @@ -49,10 +49,6 @@ class MockAtft(atft.Atft): self.LANGUAGE = 'ENG' self.REBOOT_TIMEOUT = 1.0 self.PRODUCT_ATTRIBUTE_FILE_EXTENSION = '*.atpa' - # Disable the test mode. (This mode is just for usage test, not unit test) - self.TEST_MODE = False - self.PROVISION_STEPS = ["FuseVbootKey", "FusePermAttr", "LockAvb", - "Provision"] return {} @@ -359,6 +355,9 @@ class AtftTest(unittest.TestCase): mock_atft = MockAtft() test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, ProvisionStatus.PROVISION_SUCCESS) + test_dev1.provision_state.bootloader_locked = True + test_dev1.provision_state.avb_perm_attr_set = True + test_dev1.provision_state.avb_locked = True test_dev1.provision_state.provisioned = True test_dev2 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION1, ProvisionStatus.IDLE) @@ -416,7 +415,9 @@ class AtftTest(unittest.TestCase): # Test atft._HandleStateTransition def MockStateChange(self, target, state): - target.provision_status = state + if ProvisionStatus.isFailed(state): + target.provision_status = state + return if state == ProvisionStatus.REBOOT_SUCCESS: target.provision_state.bootloader_locked = True if state == ProvisionStatus.FUSEATTR_SUCCESS: @@ -425,6 +426,19 @@ class AtftTest(unittest.TestCase): target.provision_state.avb_locked = True if state == ProvisionStatus.PROVISION_SUCCESS: target.provision_state.provisioned = True + if state == ProvisionStatus.UNLOCKAVB_SUCCESS: + target.provision_state.avb_locked = False + if target.provision_state.provisioned: + target.provision_status = ProvisionStatus.PROVISION_SUCCESS + return + if target.provision_state.avb_locked: + target.provision_status = ProvisionStatus.LOCKAVB_SUCCESS + return + if target.provision_state.avb_perm_attr_set: + target.provision_status = ProvisionStatus.FUSEATTR_SUCCESS + return + if target.provision_state.bootloader_locked: + target.provision_status = ProvisionStatus.FUSEVBOOT_SUCCESS def testHandleStateTransition(self): mock_atft = MockAtft() @@ -638,6 +652,189 @@ class AtftTest(unittest.TestCase): self.assertEqual(True, test_dev1.provision_state.avb_locked) self.assertEqual(True, test_dev1.provision_state.provisioned) + def testHandleStateTransitionIncludeUnlock(self): + """Test the provision_steps that unlock avb after provisioning. + + We assume that the device would be locked avb during fuse vboot key and + we want the final state to be avb unlocked. + """ + mock_atft = MockAtft() + test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, + ProvisionStatus.WAITING) + mock_atft._FuseVbootKeyTarget = MagicMock() + mock_atft._FuseVbootKeyTarget.side_effect = ( + lambda target=mock_atft, state=ProvisionStatus.REBOOT_SUCCESS: + self.MockStateChange(target, state)) + mock_atft._FusePermAttrTarget = MagicMock() + mock_atft._FusePermAttrTarget.side_effect = ( + lambda target=mock_atft, state=ProvisionStatus.FUSEATTR_SUCCESS: + self.MockStateChange(target, state)) + mock_atft._ProvisionTarget = MagicMock() + mock_atft._ProvisionTarget.side_effect = ( + lambda target=mock_atft, state=ProvisionStatus.PROVISION_SUCCESS: + self.MockStateChange(target, state)) + mock_atft._UnlockAvbTarget = MagicMock() + mock_atft._UnlockAvbTarget.side_effect = ( + lambda target=mock_atft, state=ProvisionStatus.UNLOCKAVB_SUCCESS: + self.MockStateChange(target, state)) + mock_atft._LockAvbTarget = MagicMock() + mock_atft._LockAvbTarget.side_effect = ( + lambda target=mock_atft, state=ProvisionStatus.LOCKAVB_SUCCESS: + self.MockStateChange(target, state)) + + mock_atft.PROVISION_STEPS = ['FuseVbootKey', 'FusePermAttr', 'LockAvb', + 'Provision', 'UnlockAvb'] + mock_atft.auto_dev_serials = [self.TEST_SERIAL1] + mock_atft.auto_prov = True + mock_atft.atft_manager = MagicMock() + mock_atft.atft_manager.GetTargetDevice = MagicMock() + mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1 + mock_atft._HandleStateTransition(test_dev1) + mock_atft._FuseVbootKeyTarget.assert_called_once() + mock_atft._LockAvbTarget.assert_called_once() + mock_atft._UnlockAvbTarget.assert_called_once() + mock_atft._FusePermAttrTarget.assert_called_once() + mock_atft._ProvisionTarget.assert_called_once() + self.assertEqual(True, test_dev1.provision_state.bootloader_locked) + self.assertEqual(True, test_dev1.provision_state.avb_perm_attr_set) + self.assertEqual(False, test_dev1.provision_state.avb_locked) + self.assertEqual(True, test_dev1.provision_state.provisioned) + self.assertEqual( + ProvisionStatus.PROVISION_SUCCESS, test_dev1.provision_status) + self.assertEqual( + True, mock_atft._is_provision_steps_finished(test_dev1.provision_state)) + + def testHandleStateTransitionLockUnlockLock(self): + mock_atft = MockAtft() + test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, + ProvisionStatus.WAITING) + mock_atft._FuseVbootKeyTarget = MagicMock() + mock_atft._FuseVbootKeyTarget.side_effect = ( + lambda target=mock_atft, state=ProvisionStatus.REBOOT_SUCCESS: + self.MockStateChange(target, state)) + mock_atft._FusePermAttrTarget = MagicMock() + mock_atft._FusePermAttrTarget.side_effect = ( + lambda target=mock_atft, state=ProvisionStatus.FUSEATTR_SUCCESS: + self.MockStateChange(target, state)) + mock_atft._ProvisionTarget = MagicMock() + mock_atft._ProvisionTarget.side_effect = ( + lambda target=mock_atft, state=ProvisionStatus.PROVISION_SUCCESS: + self.MockStateChange(target, state)) + mock_atft._UnlockAvbTarget = MagicMock() + mock_atft._UnlockAvbTarget.side_effect = ( + lambda target=mock_atft, state=ProvisionStatus.UNLOCKAVB_SUCCESS: + self.MockStateChange(target, state)) + mock_atft._LockAvbTarget = MagicMock() + mock_atft._LockAvbTarget.side_effect = ( + lambda target=mock_atft, state=ProvisionStatus.LOCKAVB_SUCCESS: + self.MockStateChange(target, state)) + + mock_atft.PROVISION_STEPS = ['FuseVbootKey', 'FusePermAttr', 'LockAvb', + 'Provision', 'UnlockAvb', 'LockAvb', 'UnlockAvb'] + mock_atft.auto_dev_serials = [self.TEST_SERIAL1] + mock_atft.auto_prov = True + mock_atft.atft_manager = MagicMock() + mock_atft.atft_manager.GetTargetDevice = MagicMock() + mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1 + mock_atft._HandleStateTransition(test_dev1) + self.assertEqual(True, test_dev1.provision_state.bootloader_locked) + self.assertEqual(True, test_dev1.provision_state.avb_perm_attr_set) + self.assertEqual(False, test_dev1.provision_state.avb_locked) + self.assertEqual(True, test_dev1.provision_state.provisioned) + self.assertEqual( + ProvisionStatus.PROVISION_SUCCESS, test_dev1.provision_status) + self.assertEqual( + True, mock_atft._is_provision_steps_finished(test_dev1.provision_state)) + + def testHandleStateTransitionReorder(self): + """Test the provision_steps that has been reordered. + + We should make sure all the steps are executed even if they are reordered. + """ + mock_atft = MockAtft() + test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, + ProvisionStatus.WAITING) + mock_atft._FuseVbootKeyTarget = MagicMock() + mock_atft._FuseVbootKeyTarget.side_effect = ( + lambda target=mock_atft, state=ProvisionStatus.REBOOT_SUCCESS: + self.MockStateChange(target, state)) + mock_atft._FusePermAttrTarget = MagicMock() + mock_atft._FusePermAttrTarget.side_effect = ( + lambda target=mock_atft, state=ProvisionStatus.FUSEATTR_SUCCESS: + self.MockStateChange(target, state)) + mock_atft._ProvisionTarget = MagicMock() + mock_atft._ProvisionTarget.side_effect = ( + lambda target=mock_atft, state=ProvisionStatus.PROVISION_SUCCESS: + self.MockStateChange(target, state)) + mock_atft._LockAvbTarget = MagicMock() + mock_atft._LockAvbTarget.side_effect = ( + lambda target=mock_atft, state=ProvisionStatus.LOCKAVB_SUCCESS: + self.MockStateChange(target, state)) + + mock_atft.PROVISION_STEPS = ['FusePermAttr', 'FuseVbootKey', 'Provision', + 'LockAvb'] + mock_atft.auto_dev_serials = [self.TEST_SERIAL1] + mock_atft.auto_prov = True + mock_atft.atft_manager = MagicMock() + mock_atft.atft_manager.GetTargetDevice = MagicMock() + mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1 + mock_atft._HandleStateTransition(test_dev1) + mock_atft._FuseVbootKeyTarget.assert_called_once() + mock_atft._LockAvbTarget.assert_called_once() + mock_atft._FusePermAttrTarget.assert_called_once() + mock_atft._ProvisionTarget.assert_called_once() + self.assertEqual(True, test_dev1.provision_state.bootloader_locked) + self.assertEqual(True, test_dev1.provision_state.avb_perm_attr_set) + self.assertEqual(True, test_dev1.provision_state.avb_locked) + self.assertEqual(True, test_dev1.provision_state.provisioned) + self.assertEqual( + ProvisionStatus.PROVISION_SUCCESS, test_dev1.provision_status) + self.assertEqual( + True, mock_atft._is_provision_steps_finished(test_dev1.provision_state)) + + def testHandleStateTransitionNoProvision(self): + """Test the provision_steps that does not provision key. + """ + mock_atft = MockAtft() + test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, + ProvisionStatus.WAITING) + mock_atft._FuseVbootKeyTarget = MagicMock() + mock_atft._FuseVbootKeyTarget.side_effect = ( + lambda target=mock_atft, state=ProvisionStatus.REBOOT_SUCCESS: + self.MockStateChange(target, state)) + mock_atft._FusePermAttrTarget = MagicMock() + mock_atft._FusePermAttrTarget.side_effect = ( + lambda target=mock_atft, state=ProvisionStatus.FUSEATTR_SUCCESS: + self.MockStateChange(target, state)) + mock_atft._ProvisionTarget = MagicMock() + mock_atft._ProvisionTarget.side_effect = ( + lambda target=mock_atft, state=ProvisionStatus.PROVISION_SUCCESS: + self.MockStateChange(target, state)) + mock_atft._LockAvbTarget = MagicMock() + mock_atft._LockAvbTarget.side_effect = ( + lambda target=mock_atft, state=ProvisionStatus.LOCKAVB_SUCCESS: + self.MockStateChange(target, state)) + + mock_atft.PROVISION_STEPS = ['FusePermAttr', 'FuseVbootKey', 'LockAvb'] + mock_atft.auto_dev_serials = [self.TEST_SERIAL1] + mock_atft.auto_prov = True + mock_atft.atft_manager = MagicMock() + mock_atft.atft_manager.GetTargetDevice = MagicMock() + mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1 + mock_atft._HandleStateTransition(test_dev1) + mock_atft._FuseVbootKeyTarget.assert_called_once() + mock_atft._LockAvbTarget.assert_called_once() + mock_atft._FusePermAttrTarget.assert_called_once() + mock_atft._ProvisionTarget.assert_not_called() + self.assertEqual(True, test_dev1.provision_state.bootloader_locked) + self.assertEqual(True, test_dev1.provision_state.avb_perm_attr_set) + self.assertEqual(True, test_dev1.provision_state.avb_locked) + self.assertEqual(False, test_dev1.provision_state.provisioned) + self.assertEqual( + ProvisionStatus.LOCKAVB_SUCCESS, test_dev1.provision_status) + self.assertEqual( + True, mock_atft._is_provision_steps_finished(test_dev1.provision_state)) + # Test atft._CheckATFAStatus def testCheckATFAStatus(self): mock_atft = MockAtft() @@ -1213,6 +1410,144 @@ class AtftTest(unittest.TestCase): mock_atft._SendOperationStartEvent.assert_called_once() mock_atft._SendOperationSucceedEvent.assert_not_called() + def testCheckProvisionStepsSuccess(self): + mock_atft = MockAtft() + mock_atft._SendAlertEvent = MagicMock() + mock_atft.PROVISION_STEPS = [] + mock_atft._CheckProvisionSteps() + mock_atft._SendAlertEvent.assert_not_called() + self.assertEqual( + mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS) + + # Test [step1], [step1, step2], [step1, step2, step3] ... + for i in range(1, len(mock_atft.DEFAULT_PROVISION_STEPS)): + provision_steps = [] + for j in range(0, i): + provision_steps.append(mock_atft.DEFAULT_PROVISION_STEPS[j]) + mock_atft.PROVISION_STEPS = provision_steps + mock_atft._CheckProvisionSteps() + mock_atft._SendAlertEvent.assert_not_called() + self.assertEqual( + provision_steps, mock_atft.PROVISION_STEPS) + + def testCheckProvisionStepsInvalidSyntax(self): + mock_atft = MockAtft() + mock_atft._SendAlertEvent = MagicMock() + # Test invalid format (not array). + mock_atft.PROVISION_STEPS = '1234' + mock_atft._CheckProvisionSteps() + mock_atft._SendAlertEvent.assert_called_once() + mock_atft._SendAlertEvent.reset_mock() + self.assertEqual( + mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS) + + # Test invalid operation. + mock_atft.PROVISION_STEPS = ['1234'] + mock_atft._CheckProvisionSteps() + mock_atft._SendAlertEvent.assert_called_once() + mock_atft._SendAlertEvent.reset_mock() + self.assertEqual( + mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS) + + # Even if TEST_MODE is true, syntax error is still failure. + mock_atft.TEST_MODE = True + mock_atft.PROVISION_STEPS = '1234' + mock_atft._CheckProvisionSteps() + mock_atft._SendAlertEvent.assert_called_once() + mock_atft._SendAlertEvent.reset_mock() + self.assertEqual( + mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS) + + mock_atft.PROVISION_STEPS = ['1234'] + mock_atft._CheckProvisionSteps() + mock_atft._SendAlertEvent.assert_called_once() + mock_atft._SendAlertEvent.reset_mock() + self.assertEqual( + mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS) + + def testCheckProvisionStepsSecurityReq(self): + # Test cases when the provision steps do not meet security requirement. + mock_atft = MockAtft() + mock_atft._SendAlertEvent = MagicMock() + # Test fuse perm attr without fusing vboot key. + mock_atft.PROVISION_STEPS = ['FusePermAttr'] + mock_atft._CheckProvisionSteps() + mock_atft._SendAlertEvent.assert_called_once() + mock_atft._SendAlertEvent.reset_mock() + self.assertEqual( + mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS) + + # Test fuse perm attr when already fused. + mock_atft.PROVISION_STEPS = ['FuseVbootKey', 'FusePermAttr', 'FusePermAttr'] + mock_atft._CheckProvisionSteps() + mock_atft._SendAlertEvent.assert_called_once() + mock_atft._SendAlertEvent.reset_mock() + self.assertEqual( + mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS) + + # Test LockAvb when vboot key is not fused. + mock_atft.PROVISION_STEPS = ['LockAvb'] + mock_atft._CheckProvisionSteps() + mock_atft._SendAlertEvent.assert_called_once() + mock_atft._SendAlertEvent.reset_mock() + self.assertEqual( + mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS) + + # Test LockAvb when perm attr not fused. + mock_atft.PROVISION_STEPS = ['FuseVbootKey', 'LockAvb'] + mock_atft._CheckProvisionSteps() + mock_atft._SendAlertEvent.assert_called_once() + mock_atft._SendAlertEvent.reset_mock() + self.assertEqual( + mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS) + + # Test provision when perm attr not fused. + mock_atft.PROVISION_STEPS = [ + 'FuseVbootKey', 'LockAvb', 'Provision'] + mock_atft._CheckProvisionSteps() + mock_atft._SendAlertEvent.assert_called_once() + mock_atft._SendAlertEvent.reset_mock() + self.assertEqual( + mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS) + + # All the tests should succeed if TEST_MODE is set to True + + # Test fuse perm attr without fusing vboot key. + mock_atft.TEST_MODE = True + mock_atft.PROVISION_STEPS = ['FusePermAttr'] + mock_atft._CheckProvisionSteps() + mock_atft._SendAlertEvent.assert_not_called() + self.assertNotEqual( + mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS) + + # Test fuse perm attr when already fused. + mock_atft.PROVISION_STEPS = ['FuseVbootKey', 'FusePermAttr', 'FusePermAttr'] + mock_atft._CheckProvisionSteps() + mock_atft._SendAlertEvent.assert_not_called() + self.assertNotEqual( + mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS) + + # Test LockAvb when vboot key is not fused. + mock_atft.PROVISION_STEPS = ['LockAvb'] + mock_atft._CheckProvisionSteps() + mock_atft._SendAlertEvent.assert_not_called() + self.assertNotEqual( + mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS) + + # Test LockAvb when perm attr not fused. + mock_atft.PROVISION_STEPS = ['FuseVbootKey', 'LockAvb'] + mock_atft._CheckProvisionSteps() + mock_atft._SendAlertEvent.assert_not_called() + self.assertNotEqual( + mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS) + + # Test provision when perm attr not fused. + mock_atft.PROVISION_STEPS = [ + 'FuseVbootKey', 'LockAvb', 'Provision'] + mock_atft._CheckProvisionSteps() + mock_atft._SendAlertEvent.assert_not_called() + self.assertNotEqual( + mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS) if __name__ == '__main__': unittest.main() diff --git a/at-factory-tool/atftman.py b/at-factory-tool/atftman.py index f47dfba..d2569a8 100644 --- a/at-factory-tool/atftman.py +++ b/at-factory-tool/atftman.py @@ -65,7 +65,11 @@ class ProvisionStatus(object): LOCKAVB_FAILED = (40 + _FAILED) PROVISION_ING = (50 + _PROCESSING) PROVISION_SUCCESS = (50 + _SUCCESS) - PROVISION_FAILED = ( + _FAILED) + PROVISION_FAILED = (50 + _FAILED) + UNLOCKAVB_ING = (60 + _PROCESSING) + UNLOCKAVB_SUCCESS = (60 + _SUCCESS) + UNLOCKAVB_FAILED = (60 + _FAILED) + STRING_MAP = { IDLE : ['Idle', '初始'], @@ -85,8 +89,10 @@ class ProvisionStatus(object): LOCKAVB_FAILED : ['Lock Android Verified Boot Failed', '锁定AVB失败'], PROVISION_ING : ['Provisioning Attestation Key', '传输密钥中...'], PROVISION_SUCCESS : ['Attestation Key Provisioned', '传输密钥成功'], - PROVISION_FAILED : ['Provision Attestation Key Failed', '传输密钥失败'] - + PROVISION_FAILED : ['Provision Attestation Key Failed', '传输密钥失败'], + UNLOCKAVB_ING : ['Unlocking AVB', '解锁AVB中...'], + UNLOCKAVB_SUCCESS : ['AVB Unlocked', '已解锁AVB'], + UNLOCKAVB_FAILED : ['Unlock AVB Failed', '解锁AVB失败'] } @staticmethod @@ -107,12 +113,28 @@ class ProvisionStatus(object): class ProvisionState(object): - """The provision state of the target device.""" + """The provision state of the target device. + + Attributes: + bootloader_locked: Whether bootloader is locked. + avb_perm_attr_set: Whether permanent attribute is set. + avb_locked: Whether avb is locked. + provisioned: Whether the device has product key provisioned. + """ bootloader_locked = False avb_perm_attr_set = False avb_locked = False provisioned = False + def __eq__(self, other): + return (self.bootloader_locked == other.bootloader_locked and + self.avb_perm_attr_set == other.avb_perm_attr_set and + self.avb_locked == other.avb_locked and + self.provisioned == other.provisioned) + + def __ne__(self, other): + return not self.__eq__(other) + class ProductInfo(object): """The information about a product. @@ -267,6 +289,9 @@ class AtftManager(object): self.ATFA_REBOOT_TIMEOUT = float(configs['ATFA_REBOOT_TIMEOUT']) except ValueError: pass + self.UNLOCK_CREDENTIAL = None + if configs and 'UNLOCK_CREDENTIAL' in configs: + self.UNLOCK_CREDENTIAL = configs['UNLOCK_CREDENTIAL'] # The serial numbers for the devices that are at least seen twice. self.stable_serials = [] @@ -768,6 +793,27 @@ class AtftManager(object): target.provision_status = ProvisionStatus.LOCKAVB_FAILED raise e + def UnlockAvb(self, target): + """Unlock the android verified boot for the target. + + Args: + target: The target device. + Raises: + FastbootFailure: When fastboot command fails. + """ + try: + target.provision_status = ProvisionStatus.UNLOCKAVB_ING + unlock_command = 'at-unlock-vboot' + if self.UNLOCK_CREDENTIAL: + unlock_command += ' ' + self.UNLOCK_CREDENTIAL + target.Oem(unlock_command) + self.CheckProvisionStatus(target) + if target.provision_state.avb_locked: + raise FastbootFailure('Status not updated') + except FastbootFailure as e: + target.provision_status = ProvisionStatus.UNLOCKAVB_FAILED + raise e + def Reboot(self, target, timeout, success_callback, timeout_callback): """Reboot the target device. diff --git a/at-factory-tool/atftman_unittest.py b/at-factory-tool/atftman_unittest.py index 0c2ea10..c101f27 100644 --- a/at-factory-tool/atftman_unittest.py +++ b/at-factory-tool/atftman_unittest.py @@ -100,6 +100,7 @@ class AtftManTest(unittest.TestCase): self.configs = {} self.configs['ATFA_REBOOT_TIMEOUT'] = 30 self.configs['DEFAULT_KEY_THRESHOLD'] = 100 + self.configs['UNLOCK_CREDENTIAL'] = None # Test AtftManager.ListDevices class MockInstantTimer(object): @@ -1172,6 +1173,61 @@ class AtftManTest(unittest.TestCase): self.assertEqual( ProvisionStatus.LOCKAVB_FAILED, mock_target.provision_status) + # Test AtftManager.LockAvb + def MockSetUnlockAvbSuccess(self, target): + target.provision_status = ProvisionStatus.UNLOCKAVB_SUCCESS + target.provision_state = ProvisionState() + target.provision_state.avb_locked = False + + def MockSetUnlockAvbFail(self, target): + target.provision_status = ProvisionStatus.UNLOCKAVB_FAILED + target.provision_state = ProvisionState() + target.provision_state.avb_locked = True + + def testUnlockAvb(self): + atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, + self.mock_serial_mapper, self.configs) + mock_target = MagicMock() + atft_manager.CheckProvisionStatus = MagicMock() + atft_manager.CheckProvisionStatus.side_effect = self.MockSetUnlockAvbSuccess + atft_manager.UnlockAvb(mock_target) + mock_target.Oem.assert_called_once_with('at-unlock-vboot') + self.assertEqual( + ProvisionStatus.UNLOCKAVB_SUCCESS, mock_target.provision_status) + + def testUnlockAvbWithCredential(self): + atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, + self.mock_serial_mapper, self.configs) + mock_target = MagicMock() + atft_manager.CheckProvisionStatus = MagicMock() + atft_manager.CheckProvisionStatus.side_effect = self.MockSetUnlockAvbSuccess + atft_manager.UNLOCK_CREDENTIAL = 'test' + atft_manager.UnlockAvb(mock_target) + mock_target.Oem.assert_called_once_with('at-unlock-vboot test') + self.assertEqual( + ProvisionStatus.UNLOCKAVB_SUCCESS, mock_target.provision_status) + + def testUnlockAvbFail(self): + atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, + self.mock_serial_mapper, self.configs) + mock_target = MagicMock() + atft_manager.CheckProvisionStatus = MagicMock() + atft_manager.CheckProvisionStatus.side_effect = self.MockSetUnlockAvbFail + with self.assertRaises(FastbootFailure): + atft_manager.UnlockAvb(mock_target) + + def testUnlockAvbFastbootFailure(self): + atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, + self.mock_serial_mapper, self.configs) + mock_target = MagicMock() + atft_manager.CheckProvisionStatus = MagicMock() + atft_manager.CheckProvisionStatus.side_effect = self.MockSetUnlockAvbSuccess + mock_target.Oem.side_effect = FastbootFailure('') + with self.assertRaises(FastbootFailure): + atft_manager.UnlockAvb(mock_target) + self.assertEqual( + ProvisionStatus.UNLOCKAVB_FAILED, mock_target.provision_status) + # Test AtftManager.Reboot class MockTimer(object): def __init__(self, interval, callback): |