diff options
author | Yu Shan <shanyu@google.com> | 2018-10-02 11:53:14 -0700 |
---|---|---|
committer | Yu Shan <shanyu@google.com> | 2018-10-02 16:11:04 -0700 |
commit | 7712c6c2d444d61fd8aae49037c2d39456b9dbf2 (patch) | |
tree | 1a761399eb0e873f9ba66d58f37e95b1552743ff | |
parent | 8e5b70fa4285db480c6d5d048129a4c1c2b8d32a (diff) | |
download | attestation-master-cuttlefish-testing-release.tar.gz |
[ATFT] Change unit tests to use pyfakefs.master-cuttlefish-testing-release
Use pyfakefs for fake file system operation in the unit tests.
Test: All unit tests still pass.
Bug: None
Change-Id: I9254290bfdf06a821ce1899fbff300d4fcaa4da2
-rw-r--r-- | at-factory-tool/atft_unittest.py | 599 |
1 files changed, 279 insertions, 320 deletions
diff --git a/at-factory-tool/atft_unittest.py b/at-factory-tool/atft_unittest.py index 5d80a80..6b4f9ab 100644 --- a/at-factory-tool/atft_unittest.py +++ b/at-factory-tool/atft_unittest.py @@ -20,12 +20,14 @@ import atft from atftman import ProvisionStatus from atftman import ProvisionState import fastboot_exceptions +from pyfakefs.fake_filesystem_unittest import TestCase from mock import call from mock import MagicMock from mock import patch from mock import mock_open import os import sets +import shutil import wx @@ -108,7 +110,7 @@ class TestDeviceInfo(object): self.provision_status) -class AtftTest(unittest.TestCase): +class AtftTest(TestCase): TEST_SERIAL1 = 'test-serial1' TEST_LOCATION1 = 'test-location1' TEST_SERIAL2 = 'test-serial2' @@ -127,6 +129,8 @@ class AtftTest(unittest.TestCase): TEST_PASSWORD2 = 'PassWord 2!' TEST_FILENAME = 'filename' TEST_ATTEST_UUID = 'test attest uuid' + TEST_ATFA_ID1 ="ATFATEST1" + TEST_ATFA_ID2 = "ATFATEST2" def setUp(self): self.test_target_devs = [] @@ -137,6 +141,7 @@ class AtftTest(unittest.TestCase): self.test_text_window = '' self.atfa_keys = None self.device_map = {} + self.setUpPyfakefs() def AppendTargetDevice(self, device): self.test_target_devs.append(device) @@ -1947,8 +1952,7 @@ class AtftTest(unittest.TestCase): mock_atft._HandleException.assert_called_once() # Test atft._GetRegFile - @patch('__builtin__.open') - def testGetRegFile(self, my_mock_open): + def testGetRegFile(self): mock_atft = MockAtft() mock_atft.atft_manager = MagicMock() mock_atft.atft_manager.GetATFADevice = MagicMock() @@ -1961,32 +1965,17 @@ class AtftTest(unittest.TestCase): mock_atft.ResumeRefresh = MagicMock() mock_atft._HandleException = MagicMock() mock_atft._SendAlertEvent = MagicMock() - mock_path = MagicMock() - mock_path.encode.return_value = mock_path - mock_atft._GetRegFile(mock_path) - my_mock_open.assert_called_once_with(mock_path, 'w+') - mock_atfa.Upload.assert_called_once_with(mock_path) + mock_atft._GetRegFile(self.TEST_TEXT) + mock_atfa.Upload.assert_called_once_with(self.TEST_TEXT) mock_atft.atft_manager.PrepareFile.assert_called_once_with('reg') mock_atft.PauseRefresh.assert_called_once() mock_atft.ResumeRefresh.assert_called_once() mock_atft._HandleException.assert_not_called() mock_atft._SendOperationStartEvent.assert_called_once() mock_atft._SendOperationSucceedEvent.assert_called_once() - - # Cannot create file - mock_atft._HandleException.reset_mock() - mock_atft._SendOperationStartEvent.reset_mock() - mock_atft._SendOperationSucceedEvent.reset_mock() - mock_atft.PauseRefresh.reset_mock() - mock_atft.ResumeRefresh.reset_mock() - my_mock_open.side_effect = IOError - mock_atft._GetRegFile(mock_path) - mock_atft.PauseRefresh.assert_called_once() - mock_atft.ResumeRefresh.assert_called_once() - mock_atft._HandleException.assert_called_once() - mock_atft._SendOperationStartEvent.assert_called_once() - mock_atft._SendOperationSucceedEvent.assert_not_called() + self.assertEqual(True, os.path.exists(self.TEST_TEXT)) + os.remove(self.TEST_TEXT) def testGetRegFileFailure(self): self.TestGetRegFileFailureCommon( @@ -1998,40 +1987,36 @@ class AtftTest(unittest.TestCase): self.TestGetRegFileFailureCommon( fastboot_exceptions.DeviceNotFoundException, True) - def TestGetRegFileFailureCommon( - self, exception, upload_fail=False): - with patch('__builtin__.open') as my_mock_open: - mock_atft = MockAtft() - mock_atft.atft_manager = MagicMock() + def TestGetRegFileFailureCommon(self, exception, upload_fail=False): + mock_atft = MockAtft() + mock_atft.atft_manager = MagicMock() + mock_atft.atft_manager.GetATFADevice = MagicMock() + mock_atft.atft_manager.GetATFADevice.return_value = MagicMock() + mock_atft._UpdateKeysLeftInATFA = MagicMock() + mock_atft._SendOperationStartEvent = MagicMock() + mock_atft._SendOperationSucceedEvent = MagicMock() + mock_atft.PauseRefresh = MagicMock() + mock_atft.ResumeRefresh = MagicMock() + mock_atft._HandleException = MagicMock() + mock_atft._SendAlertEvent = MagicMock() + if not upload_fail: + mock_atft.atft_manager.PrepareFile.side_effect = exception + else: mock_atft.atft_manager.GetATFADevice = MagicMock() - mock_atft.atft_manager.GetATFADevice.return_value = MagicMock() - mock_atft._UpdateKeysLeftInATFA = MagicMock() - mock_atft._SendOperationStartEvent = MagicMock() - mock_atft._SendOperationSucceedEvent = MagicMock() - mock_atft.PauseRefresh = MagicMock() - mock_atft.ResumeRefresh = MagicMock() - mock_atft._HandleException = MagicMock() - mock_atft._SendAlertEvent = MagicMock() - mock_path = MagicMock() - mock_path.encode.return_value = mock_path - my_mock_open.side_effect = IOError - if not upload_fail: - mock_atft.atft_manager.PrepareFile.side_effect = exception - else: - mock_atft.atft_manager.GetATFADevice = MagicMock() - mock_atft.atft_manager.GetATFADevice.return_value.Upload.side_effect = exception + (mock_atft.atft_manager.GetATFADevice.return_value.Upload. + side_effect) = exception - mock_atft._GetRegFile(mock_path) + # This invalid path would cause an IOError while creating file. + mock_atft._GetRegFile('/123/123') - mock_atft.PauseRefresh.assert_called_once() - mock_atft.ResumeRefresh.assert_called_once() - mock_atft._HandleException.assert_called_once() - mock_atft._SendOperationStartEvent.assert_called_once() - mock_atft._SendOperationSucceedEvent.assert_not_called() + mock_atft.PauseRefresh.assert_called_once() + mock_atft.ResumeRefresh.assert_called_once() + mock_atft._HandleException.assert_called_once() + mock_atft._SendOperationStartEvent.assert_called_once() + mock_atft._SendOperationSucceedEvent.assert_not_called() # Test atft._GetAuditFile - @patch('__builtin__.open') - def testGetAuditFile(self, my_mock_open): + def testGetAuditFile(self): mock_atft = MockAtft() mock_atft.atft_manager = MagicMock() mock_atft.atft_manager.GetATFADevice = MagicMock() @@ -2044,32 +2029,17 @@ class AtftTest(unittest.TestCase): mock_atft.ResumeRefresh = MagicMock() mock_atft._HandleException = MagicMock() mock_atft._SendAlertEvent = MagicMock() - mock_path = MagicMock() - mock_path.encode.return_value = mock_path - mock_atft._GetAuditFile(mock_path) - my_mock_open.assert_called_once_with(mock_path, 'w+') - mock_atfa.Upload.assert_called_once_with(mock_path) + mock_atft._GetAuditFile(self.TEST_TEXT) + mock_atfa.Upload.assert_called_once_with(self.TEST_TEXT) mock_atft.atft_manager.PrepareFile.assert_called_once_with('audit') mock_atft.PauseRefresh.assert_called_once() mock_atft.ResumeRefresh.assert_called_once() mock_atft._HandleException.assert_not_called() mock_atft._SendOperationStartEvent.assert_called_once() mock_atft._SendOperationSucceedEvent.assert_called_once() - - # Cannot create file - mock_atft._HandleException.reset_mock() - mock_atft._SendOperationStartEvent.reset_mock() - mock_atft._SendOperationSucceedEvent.reset_mock() - mock_atft.PauseRefresh.reset_mock() - mock_atft.ResumeRefresh.reset_mock() - my_mock_open.side_effect = IOError - mock_atft._GetAuditFile(mock_path) - mock_atft.PauseRefresh.assert_called_once() - mock_atft.ResumeRefresh.assert_called_once() - mock_atft._HandleException.assert_called_once() - mock_atft._SendOperationStartEvent.assert_called_once() - mock_atft._SendOperationSucceedEvent.assert_not_called() + self.assertEqual(True, os.path.exists(self.TEST_TEXT)) + os.remove(self.TEST_TEXT) def testGetAuditFileFailure(self): self.TestGetAuditFileFailureCommon( @@ -2081,133 +2051,83 @@ class AtftTest(unittest.TestCase): self.TestGetAuditFileFailureCommon( fastboot_exceptions.DeviceNotFoundException, True) - def TestGetAuditFileFailureCommon( - self, exception, upload_fail=False): - with patch('__builtin__.open') as my_mock_open: - mock_atft = MockAtft() - mock_atft.atft_manager = MagicMock() + def TestGetAuditFileFailureCommon(self, exception, upload_fail=False): + mock_atft = MockAtft() + mock_atft.atft_manager = MagicMock() + mock_atft.atft_manager.GetATFADevice = MagicMock() + mock_atft.atft_manager.GetATFADevice.return_value = MagicMock() + mock_atft._UpdateKeysLeftInATFA = MagicMock() + mock_atft._SendOperationStartEvent = MagicMock() + mock_atft._SendOperationSucceedEvent = MagicMock() + mock_atft.PauseRefresh = MagicMock() + mock_atft.ResumeRefresh = MagicMock() + mock_atft._HandleException = MagicMock() + mock_atft._SendAlertEvent = MagicMock() + if not upload_fail: + mock_atft.atft_manager.PrepareFile.side_effect = exception + else: mock_atft.atft_manager.GetATFADevice = MagicMock() - mock_atft.atft_manager.GetATFADevice.return_value = MagicMock() - mock_atft._UpdateKeysLeftInATFA = MagicMock() - mock_atft._SendOperationStartEvent = MagicMock() - mock_atft._SendOperationSucceedEvent = MagicMock() - mock_atft.PauseRefresh = MagicMock() - mock_atft.ResumeRefresh = MagicMock() - mock_atft._HandleException = MagicMock() - mock_atft._SendAlertEvent = MagicMock() - mock_path = MagicMock() - mock_path.encode.return_value = mock_path - my_mock_open.side_effect = IOError - if not upload_fail: - mock_atft.atft_manager.PrepareFile.side_effect = exception - else: - mock_atft.atft_manager.GetATFADevice = MagicMock() - mock_atft.atft_manager.GetATFADevice.return_value.Upload.side_effect = exception + (mock_atft.atft_manager.GetATFADevice.return_value.Upload. + side_effect) = exception - mock_atft._GetAuditFile(mock_path) + # This invalid path would cause an IOError while creating file. + mock_atft._GetAuditFile('/123/123') - mock_atft.PauseRefresh.assert_called_once() - mock_atft.ResumeRefresh.assert_called_once() - mock_atft._HandleException.assert_called_once() - mock_atft._SendOperationStartEvent.assert_called_once() - mock_atft._SendOperationSucceedEvent.assert_not_called() + mock_atft.PauseRefresh.assert_called_once() + mock_atft.ResumeRefresh.assert_called_once() + mock_atft._HandleException.assert_called_once() + mock_atft._SendOperationStartEvent.assert_called_once() + mock_atft._SendOperationSucceedEvent.assert_not_called() - # Test AtftLog.Initialize() - @patch('atft.AtftLog.Info', MagicMock()) - @patch('atft.AtftLog._CreateLogFile') - @patch('os.path.isfile') - @patch('os.path.exists') - @patch('os.mkdir') - @patch('os.listdir') - @patch('__builtin__.open') - def testAtftLogCreate(self, my_mock_open, mock_listdir, mock_makedir, - mock_path_exists, mock_isfile, mock_createfile): - log_dir = self.LOG_DIR - log_size = 10 - log_file_number = 1 - mock_listdir.return_value = ['atft_log_1', 'atft_log_2'] - # Log directory not exist - mock_path_exists.return_value = False - # listdir return value is file - mock_isfile.return_value = True - atft_log = atft.AtftLog(log_dir, log_size, log_file_number) - mock_makedir.assert_called_once_with(self.LOG_DIR) - mock_createfile.assert_not_called() - self.assertEqual(atft_log.log_dir_file, os.path.join( - self.LOG_DIR, 'atft_log_2')) @patch('atft.AtftLog.Info', MagicMock()) @patch('atft.AtftLog._CreateLogFile') - @patch('os.path.isfile') - @patch('os.path.exists') - @patch('os.mkdir') - @patch('os.listdir') - @patch('__builtin__.open') - def testAtftLogCreateDirExists( - self, my_mock_open, mock_listdir, mock_makedir, - mock_path_exists, mock_isfile, mock_createfile): + def testAtftLogCreate(self, mock_createfile): + # Test AtftLog.Initialize(), log dir does not exist, create it. log_dir = self.LOG_DIR log_size = 10 log_file_number = 1 - mock_listdir.return_value = ['atft_log_1', 'atft_log_2'] - # Log directory already exists - mock_path_exists.return_value = True - # listdir return value is file - mock_isfile.return_value = True atft_log = atft.AtftLog(log_dir, log_size, log_file_number) - mock_makedir.assert_not_called() - mock_createfile.assert_not_called() + mock_createfile.assert_called_once() + self.assertEqual(os.path.exists(log_dir)) + shutil.rmtree(log_dir) @patch('atft.AtftLog.Info', MagicMock()) @patch('atft.AtftLog._CreateLogFile') - @patch('os.path.isfile') - @patch('os.path.exists') - @patch('os.mkdir') - @patch('os.listdir') - @patch('__builtin__.open') - def testAtftLogCreateFirstLog( - self, my_mock_open, mock_listdir, mock_makedir, - mock_path_exists, mock_isfile, mock_createfile): + def testAtftLogCreateDirExists(self, mock_createfile): + # Log directory exists, should check for existing log files. log_dir = self.LOG_DIR + self.fs.create_dir(log_dir) + self.fs.create_file(os.path.join(log_dir, 'atft_log_1')) + self.fs.create_file(os.path.join(log_dir, 'atft_log_2')) + log_size = 10 log_size = 10 log_file_number = 1 - mock_listdir.return_value = [] - # Log directory already exists - mock_path_exists.return_value = True - # listdir return value is file - mock_isfile.return_value = True atft_log = atft.AtftLog(log_dir, log_size, log_file_number) - mock_createfile.assert_called_once() - - # Test AtftLog._CreateLogFile() - def MockCheckPath(self, path): - if path == self.LOG_DIR: - return True - else: - return False + # Create the first log file. + mock_createfile.assert_not_called() + self.assertEqual(atft_log.log_dir_file, os.path.join( + self.LOG_DIR, 'atft_log_2')) + shutil.rmtree(log_dir) @patch('atft.AtftLog.Initialize', MagicMock()) - @patch('os.path.exists') - @patch('os.listdir') - @patch('__builtin__.open') - def testAtftLogCreate(self, my_mock_open, mock_listdir, mock_path_exists): + def testAtftLogCreate(self): log_dir = self.LOG_DIR + self.fs.create_dir(log_dir) log_size = 10 log_file_number = 1 - mock_listdir.return_value = [] - mock_path_exists.side_effect = self.MockCheckPath atft_log = atft.AtftLog(log_dir, log_size, log_file_number) atft_log.Info = MagicMock() mock_get_time = MagicMock() atft_log._GetCurrentTimestamp = mock_get_time mock_get_time.return_value = self.TEST_TIMESTAMP atft_log._CreateLogFile() - my_mock_open.assert_called_once() - log_file = my_mock_open.call_args[0][0] mock_get_time.assert_called_once() - self.assertEqual(log_file, atft_log.log_dir_file) - self.assertEqual(os.path.join( - self.LOG_DIR, 'atft_log_' + str(self.TEST_TIMESTAMP)), log_file) + log_file_path = os.path.join( + log_dir, 'atft_log_' + str(self.TEST_TIMESTAMP)) + self.assertEqual(log_file_path, atft_log.log_dir_file) + self.assertEqual(True, os.path.exists(log_file_path)) + shutil.rmtree(log_dir) # Test AtftLog._LimitSize() def MockListDir(self, dir): @@ -2216,35 +2136,31 @@ class AtftTest(unittest.TestCase): def MockCreateFile(self, add_file): self.files.append(add_file) - @patch('os.path.getsize') @patch('atft.AtftLog.Initialize', MagicMock()) - @patch('os.path.isfile') - @patch('os.path.exists') - @patch('os.listdir') - @patch('os.remove') - @patch('__builtin__.open') - def testLimitSize( - self, my_mock_open, mock_remove, mock_listdir, mock_path_exists, mock_isfile, - mock_getsize): + def testLimitSize(self): log_dir = self.LOG_DIR - log_size = 10 + # This means each file would have a maximum size of 10. + log_size = 20 log_file_number = 2 - # 1 is older than 2 - self.files = ['atft_log_1', 'atft_log_2'] - mock_listdir.side_effect = self.MockListDir - mock_isfile.return_value = True - mock_getsize.return_value = 5 + self.fs.create_dir(log_dir) + log_file1 = os.path.join(log_dir, 'atft_log_1') + log_file2 = os.path.join(log_dir, 'atft_log_2') + log_file3 = os.path.join(log_dir, 'atft_log_3') + self.fs.create_file(log_file1, contents='abcde') + self.fs.create_file(log_file2, contents='abcde') atft_log = atft.AtftLog(log_dir, log_size, log_file_number) atft_log.Info = MagicMock() - mock_createfile = MagicMock() - mock_createfile.side_effect = ( - lambda file='atft_log_3': self.MockCreateFile(file)) - atft_log._CreateLogFile = mock_createfile - # 5 + 6 should be larger than 10 - atft_log._LimitSize('abcdefg') - atft_log._CreateLogFile.assert_called_once() - mock_remove.assert_called_once_with(os.path.join( - self.LOG_DIR, 'atft_log_1')) + mock_get_time = MagicMock() + atft_log._GetCurrentTimestamp = mock_get_time + mock_get_time.return_value = 3 + atft_log.log_dir_file = log_file2 + # Now atft_log_2 should have 11 characters which is larger than 10. + # A new file should be created and atft_log_1 should be removed. + atft_log._LimitSize('abcdef') + self.assertEqual(False, os.path.exists(log_file1)) + self.assertEqual(True, os.path.exists(log_file2)) + self.assertEqual(True, os.path.exists(log_file3)) + shutil.rmtree(log_dir) # Test ChangeThresholdDialog.OnSave() def testChangeThresholdDialogSaveNormal(self): @@ -2367,40 +2283,45 @@ class AtftTest(unittest.TestCase): mock_atft._SendAlertEvent.assert_called_once() # Test _SaveFileEventHandler - @patch('os.path.isfile') - @patch('os.path.isdir') @patch('wx.DirDialog') - def testSaveFileEvent(self, mock_create_dialog, mock_isdir, mock_isfile): + def testSaveFileEvent(self, mock_create_dialog): mock_atft = MockAtft() message = self.TEST_TEXT filename = self.TEST_FILENAME callback = MagicMock() - mock_isdir.return_value = False - mock_isfile.return_value = False - mock_create_dialog.ShowModal = MagicMock() - mock_create_dialog.ShowModal.return_value = wx.ID_YES + mock_dialog = MagicMock() + mock_dialog_instance = MagicMock() + mock_create_dialog.return_value = mock_dialog + mock_dialog.__enter__.return_value = mock_dialog_instance + mock_dialog_instance.ShowModal = MagicMock() + mock_dialog_instance.ShowModal.return_value = wx.ID_YES + mock_dialog_instance.GetPath = MagicMock() + mock_dialog_instance.GetPath.return_value = self.TEST_TEXT data = mock_atft.SaveFileArg(message, filename, callback) event = MagicMock() event.GetValue.return_value = data mock_atft._SaveFileEventHandler(event) callback.assert_called_once() - @patch('os.path.isfile') - @patch('os.path.isdir') @patch('wx.DirDialog') def testSaveFileEventFileExists( - self, mock_create_dialog, mock_isdir, mock_isfile): + self, mock_create_dialog): mock_atft = MockAtft() mock_atft._ShowWarning = MagicMock() mock_atft._ShowWarning.return_value = True message = self.TEST_TEXT filename = self.TEST_FILENAME callback = MagicMock() - mock_isdir.return_value = False # File already exists, need to give a warning. - mock_isfile.return_value = True - mock_create_dialog.ShowModal = MagicMock() - mock_create_dialog.ShowModal.return_value = wx.ID_YES + self.fs.create_file(os.path.join(self.TEST_TEXT, self.TEST_FILENAME)) + mock_dialog = MagicMock() + mock_dialog_instance = MagicMock() + mock_create_dialog.return_value = mock_dialog + mock_dialog.__enter__.return_value = mock_dialog_instance + mock_dialog_instance.ShowModal = MagicMock() + mock_dialog_instance.ShowModal.return_value = wx.ID_YES + mock_dialog_instance.GetPath = MagicMock() + mock_dialog_instance.GetPath.return_value = self.TEST_TEXT data = mock_atft.SaveFileArg(message, filename, callback) event = MagicMock() event.GetValue.return_value = data @@ -2414,6 +2335,9 @@ class AtftTest(unittest.TestCase): mock_atft._SaveFileEventHandler(event) callback.assert_not_called() + # Clean the fake fs state. + os.remove(os.path.join(self.TEST_TEXT, self.TEST_FILENAME)) + def testCheckProvisionStepsSuccess(self): mock_atft = MockAtft() mock_atft._SendAlertEvent = MagicMock() @@ -2558,31 +2482,20 @@ class AtftTest(unittest.TestCase): self.mock_time += 1 return str(self.mock_time - 1) - def CreateAuditFiles(self, file_path, file_type, show_alert, blocking): - self.mock_audit_files.append(os.path.basename(file_path)) + def CreateAuditFiles(self, filepath, file_type, show_alert, blocking): + self.fs.create_file(filepath) return True - def RemoveAuditFiles(self, file_path): - self.mock_audit_files.remove(os.path.basename(file_path)) - - @patch('os.mkdir') @patch('datetime.datetime') - @patch('os.path.isfile') - @patch('os.remove') - @patch('os.listdir') - def testAtftAudit( - self, mock_listdir, mock_remove, mock_isfile, mock_datetime, - mock_makedir): + def testAtftAudit(self, mock_datetime): download_interval = 2 get_file_handler = MagicMock() get_file_handler.side_effect = self.CreateAuditFiles get_atfa_serial = MagicMock() get_atfa_serial.return_value = self.TEST_SERIAL1 - mock_remove.side_effect = self.RemoveAuditFiles mock_time = MagicMock() mock_datetime.utcnow.return_value = mock_time mock_time.strftime.side_effect = self.IncreaseMockTime - mock_isfile.return_value = True handle_exception_handler = MagicMock() audit_file_path_0 = os.path.join( @@ -2594,9 +2507,7 @@ class AtftTest(unittest.TestCase): audit_file_path_3 = os.path.join( self.AUDIT_DIR, self.TEST_SERIAL1 + '_3.audit') - self.mock_audit_files = [] self.mock_time = 0 - mock_listdir.return_value = self.mock_audit_files test_audit = atft.AtftAudit( self.AUDIT_DIR, @@ -2604,52 +2515,58 @@ class AtftTest(unittest.TestCase): get_file_handler, handle_exception_handler, get_atfa_serial) - mock_makedir.assert_called_with(self.AUDIT_DIR) + + self.assertEqual(True, os.path.exists(self.AUDIT_DIR)) test_audit.PullAudit(10) get_file_handler.assert_called_once_with( audit_file_path_0, 'audit', False, True) - mock_remove.assert_not_called() get_file_handler.reset_mock() + self.assertEqual(True, os.path.isfile(audit_file_path_0)) + test_audit.PullAudit(9) get_file_handler.assert_not_called() + self.assertEqual(True, os.path.isfile(audit_file_path_0)) + test_audit.PullAudit(8) get_file_handler.assert_called_once_with( audit_file_path_1, 'audit', False, True) - mock_remove.assert_called_once_with(audit_file_path_0) - mock_remove.reset_mock() + self.assertEqual(False, os.path.isfile(audit_file_path_0)) + self.assertEqual(True, os.path.isfile(audit_file_path_1)) + get_file_handler.reset_mock() test_audit.PullAudit(7) get_file_handler.assert_not_called() + self.assertEqual(False, os.path.isfile(audit_file_path_0)) + self.assertEqual(True, os.path.isfile(audit_file_path_1)) + test_audit.PullAudit(6) get_file_handler.assert_called_once_with( audit_file_path_2, 'audit', False, True) get_file_handler.reset_mock() - mock_remove.assert_called_once_with(audit_file_path_1) - mock_remove.reset_mock() + self.assertEqual(False, os.path.isfile(audit_file_path_1)) + self.assertEqual(True, os.path.isfile(audit_file_path_2)) + test_audit.ResetKeysLeft() test_audit.PullAudit(10) get_file_handler.assert_called_once_with( audit_file_path_3, 'audit', False, True) - mock_remove.assert_called_once_with(audit_file_path_2) + self.assertEqual(False, os.path.isfile(audit_file_path_2)) + self.assertEqual(True, os.path.isfile(audit_file_path_3)) + + # Clear state + shutil.rmtree(self.AUDIT_DIR) - @patch('os.mkdir') @patch('datetime.datetime') - @patch('os.path.isfile') - @patch('os.remove') - @patch('os.listdir') - def testAtftAuditRemoveMultipleFiles( - self, mock_listdir, mock_remove, mock_isfile, mock_datetime, - mock_makedir): + def testAtftAuditRemoveMultipleFiles(self, mock_datetime): + # If more than one files for one ATFA left, must remove them all. download_interval = 2 get_file_handler = MagicMock() get_file_handler.side_effect = self.CreateAuditFiles get_atfa_serial = MagicMock() get_atfa_serial.return_value = self.TEST_SERIAL1 - mock_remove.side_effect = self.RemoveAuditFiles mock_time = MagicMock() mock_datetime.utcnow.return_value = mock_time mock_time.strftime.side_effect = self.IncreaseMockTime - mock_isfile.return_value = True handle_exception_handler = MagicMock() audit_file_path_0 = os.path.join( @@ -2661,13 +2578,13 @@ class AtftTest(unittest.TestCase): audit_file_path_3 = os.path.join( self.AUDIT_DIR, self.TEST_SERIAL1 + '_3.audit') - # If more than one files for one ATFA left, must remove them all. - self.mock_audit_files = [ + mock_audit_files = [ self.TEST_SERIAL1 + '_0.audit', self.TEST_SERIAL1 + '_1.audit', self.TEST_SERIAL1 + '_2.audit'] self.mock_time = 3 - mock_listdir.return_value = self.mock_audit_files + for mock_audit_file in mock_audit_files: + self.fs.create_file(os.path.join(self.AUDIT_DIR, mock_audit_file)) test_audit = atft.AtftAudit( self.AUDIT_DIR, @@ -2678,33 +2595,27 @@ class AtftTest(unittest.TestCase): test_audit.PullAudit(10) get_file_handler.assert_called_once_with( audit_file_path_3, 'audit', False, True) - mock_remove.assert_has_calls([ - call(audit_file_path_0), - call(audit_file_path_1), - call(audit_file_path_2)]) + self.assertEqual(False, os.path.isfile(audit_file_path_0)) + self.assertEqual(False, os.path.isfile(audit_file_path_1)) + self.assertEqual(False, os.path.isfile(audit_file_path_2)) + self.assertEqual(True, os.path.isfile(audit_file_path_3)) + # Clear state + shutil.rmtree(self.AUDIT_DIR) - @patch('os.mkdir') @patch('datetime.datetime') - @patch('os.path.isfile') - @patch('os.remove') - @patch('os.listdir') - def testAtftAuditRemoveMultipleFiles( - self, mock_listdir, mock_remove, mock_isfile, mock_datetime, - mock_makedir): + def testAtftAuditRemoveMultipleFilesFailure(self, mock_datetime): + # If get file fails, must not remove file. download_interval = 2 get_file_handler = MagicMock() + # Get file handler fails. + get_file_handler.return_value = False get_atfa_serial = MagicMock() get_atfa_serial.return_value = self.TEST_SERIAL1 - mock_remove.side_effect = self.RemoveAuditFiles mock_time = MagicMock() mock_datetime.utcnow.return_value = mock_time mock_time.strftime.side_effect = self.IncreaseMockTime - mock_isfile.return_value = True handle_exception_handler = MagicMock() - # If get file fails, must not remove file. - get_file_handler.return_value = False - audit_file_path_0 = os.path.join( self.AUDIT_DIR, self.TEST_SERIAL1 + '_0.audit') audit_file_path_1 = os.path.join( @@ -2714,12 +2625,13 @@ class AtftTest(unittest.TestCase): audit_file_path_3 = os.path.join( self.AUDIT_DIR, self.TEST_SERIAL1 + '_3.audit') - self.mock_audit_files = [ + mock_audit_files = [ self.TEST_SERIAL1 + '_0.audit', self.TEST_SERIAL1 + '_1.audit', self.TEST_SERIAL1 + '_2.audit'] self.mock_time = 3 - mock_listdir.return_value = self.mock_audit_files + for mock_audit_file in mock_audit_files: + self.fs.create_file(os.path.join(self.AUDIT_DIR, mock_audit_file)) test_audit = atft.AtftAudit( self.AUDIT_DIR, @@ -2730,26 +2642,23 @@ class AtftTest(unittest.TestCase): test_audit.PullAudit(10) get_file_handler.assert_called_once_with( audit_file_path_3, 'audit', False, True) - mock_remove.assert_not_called() + self.assertEqual(True, os.path.isfile(audit_file_path_0)) + self.assertEqual(True, os.path.isfile(audit_file_path_1)) + self.assertEqual(True, os.path.isfile(audit_file_path_2)) + self.assertEqual(False, os.path.isfile(audit_file_path_3)) + # Clear state + shutil.rmtree(self.AUDIT_DIR) - @patch('os.mkdir') @patch('datetime.datetime') - @patch('os.path.isfile') - @patch('os.remove') - @patch('os.listdir') - def testAtftAuditMultipleATFAs( - self, mock_listdir, mock_remove, mock_isfile, mock_datetime, - mock_makedir): + def testAtftAuditMultipleATFAs(self, mock_datetime): download_interval = 2 get_file_handler = MagicMock() get_file_handler.side_effect = self.CreateAuditFiles get_atfa_serial = MagicMock() get_atfa_serial.return_value = self.TEST_SERIAL1 - mock_remove.side_effect = self.RemoveAuditFiles mock_time = MagicMock() mock_datetime.utcnow.return_value = mock_time mock_time.strftime.side_effect = self.IncreaseMockTime - mock_isfile.return_value = True handle_exception_handler = MagicMock() audit_file_path_1_0 = os.path.join( @@ -2761,9 +2670,7 @@ class AtftTest(unittest.TestCase): audit_file_path_2_3 = os.path.join( self.AUDIT_DIR, self.TEST_SERIAL2 + '_3.audit') - self.mock_audit_files = [] self.mock_time = 0 - mock_listdir.return_value = self.mock_audit_files test_audit = atft.AtftAudit( self.AUDIT_DIR, @@ -2774,8 +2681,8 @@ class AtftTest(unittest.TestCase): test_audit.PullAudit(10) get_file_handler.assert_called_once_with( audit_file_path_1_0, 'audit', False, True) - mock_remove.assert_not_called() get_file_handler.reset_mock() + self.assertEqual(True, os.path.isfile(audit_file_path_1_0)) # Insert a new ATFA test_audit.ResetKeysLeft() @@ -2783,8 +2690,9 @@ class AtftTest(unittest.TestCase): test_audit.PullAudit(10) get_file_handler.assert_called_once_with( audit_file_path_2_1, 'audit', False, True) - mock_remove.assert_not_called() get_file_handler.reset_mock() + self.assertEqual(True, os.path.isfile(audit_file_path_1_0)) + self.assertEqual(True, os.path.isfile(audit_file_path_2_1)) # Insert the old one back test_audit.ResetKeysLeft() @@ -2792,9 +2700,10 @@ class AtftTest(unittest.TestCase): test_audit.PullAudit(9) get_file_handler.assert_called_once_with( audit_file_path_1_2, 'audit', False, True) - mock_remove.assert_called_once_with(audit_file_path_1_0) - mock_remove.reset_mock() get_file_handler.reset_mock() + self.assertEqual(False, os.path.isfile(audit_file_path_1_0)) + self.assertEqual(True, os.path.isfile(audit_file_path_1_2)) + self.assertEqual(True, os.path.isfile(audit_file_path_2_1)) # Insert ATFA2 back test_audit.ResetKeysLeft() @@ -2802,33 +2711,27 @@ class AtftTest(unittest.TestCase): test_audit.PullAudit(9) get_file_handler.assert_called_once_with( audit_file_path_2_3, 'audit', False, True) - mock_remove.assert_called_once_with(audit_file_path_2_1) - mock_remove.reset_mock() get_file_handler.reset_mock() + self.assertEqual(False, os.path.isfile(audit_file_path_1_0)) + self.assertEqual(True, os.path.isfile(audit_file_path_1_2)) + self.assertEqual(False, os.path.isfile(audit_file_path_2_1)) + self.assertEqual(True, os.path.isfile(audit_file_path_2_3)) + # Clear state + shutil.rmtree(self.AUDIT_DIR) - @patch('os.mkdir') @patch('datetime.datetime') - @patch('os.path.isfile') - @patch('os.remove') - @patch('os.listdir') - def testAtftAuditFastbootFailure( - self, mock_listdir, mock_remove, mock_isfile, mock_datetime, - mock_makedir): + def testAtftAuditFastbootFailure(self, mock_datetime): download_interval = 2 get_file_handler = MagicMock() get_file_handler.side_effect = self.CreateAuditFiles get_atfa_serial = MagicMock() get_atfa_serial.side_effect = fastboot_exceptions.FastbootFailure('') - mock_remove.side_effect = self.RemoveAuditFiles mock_time = MagicMock() mock_datetime.utcnow.return_value = mock_time mock_time.strftime.side_effect = self.IncreaseMockTime - mock_isfile.return_value = True handle_exception_handler = MagicMock() - self.mock_audit_files = [] self.mock_time = 0 - mock_listdir.return_value = self.mock_audit_files test_audit = atft.AtftAudit( self.AUDIT_DIR, @@ -2840,6 +2743,9 @@ class AtftTest(unittest.TestCase): get_file_handler.assert_not_called() handle_exception_handler.assert_called_once() + # Clear state + shutil.rmtree(self.AUDIT_DIR) + def testAutoMapUSBLocationToSlot(self): mock_atft = MockAtft() mock_atft.device_usb_locations = [] @@ -3053,71 +2959,124 @@ class AtftTest(unittest.TestCase): mock_atft._SendAlertEvent.assert_called_once() mock_atft.SendUpdateMappingEvent.assert_not_called() - @patch('os.mkdir') - @patch('os.path.exists') - @patch('os.path.isfile') - @patch('os.listdir') @patch('threading.Timer') - @patch('__builtin__.open') def testProcessKeyFileAutomatically(self, - mock_create_timer, - mock_open, - mock_listdir, - mock_isfile, - mock_exists, - mock_mkdir): + mock_create_timer): + # Create fake folders. + self.fs.create_dir(self.LOG_DIR) + self.fs.create_dir(self.KEY_DIR) key_extension = '*.atfa' - key1 = self.TEST_TEXT + '_1.atfa' - key2 = self.TEST_TEXT + '_2.atfa' + key1 = self.TEST_ATFA_ID1 + '_1234.atfa' + key2 = self.TEST_ATFA_ID2 + '_1234.atfa' + key3 = self.TEST_ATFA_ID2 + '_2345.atfa' process_key_handler = MagicMock() handle_exception_handler = MagicMock() get_atfa_serial = MagicMock() - mock_isfile.return_value = True - mock_exists.return_value = True atft_key_handler = atft.AtftKeyHandler(self.KEY_DIR, self.LOG_DIR, key_extension, process_key_handler, handle_exception_handler, get_atfa_serial) - atft_key_handler.processed_keys = {} + + self.fs.create_file(os.path.join(self.KEY_DIR, key1)) + get_atfa_serial.return_value = self.TEST_ATFA_ID1 atft_key_handler.key_dir = self.KEY_DIR - mock_listdir.return_value = [key1] - get_atfa_serial.return_value = self.TEST_TEXT - atft_key_handler.ProcessKeyFile() + atft_key_handler.StartProcessKey() process_key_handler.assert_called_once_with( os.path.join(self.KEY_DIR, key1), True) mock_create_timer.assert_called_once() - mock_open.assert_called_once() - self.assertEqual(1, len(atft_key_handler.processed_keys[self.TEST_TEXT])) - self.assertEqual(True, - key1 in atft_key_handler.processed_keys[self.TEST_TEXT]) + self.assertEqual( + True, self.TEST_ATFA_ID1 in atft_key_handler.processed_keys) + self.assertEqual( + 1, len(atft_key_handler.processed_keys[self.TEST_ATFA_ID1])) + self.assertEqual( + True, key1 in atft_key_handler.processed_keys[self.TEST_ATFA_ID1]) + + # If the key file is not for this ATFA, do not process it. + get_atfa_serial.return_value = self.TEST_ATFA_ID2 + process_key_handler.reset_mock() + atft_key_handler.ProcessKeyFile() + process_key_handler.assert_not_called() # If the error is DeviceNotFound, than don't record it to processed keys. - mock_listdir.return_value = [key2] + get_atfa_serial.return_value = self.TEST_ATFA_ID2 + self.fs.create_file(os.path.join(self.KEY_DIR, key2)) process_key_handler.reset_mock() process_key_handler.side_effect = ( fastboot_exceptions.DeviceNotFoundException) - mock_listdir.return_value = [key2] atft_key_handler.ProcessKeyFile() process_key_handler.assert_called_once() - self.assertEqual(1, len(atft_key_handler.processed_keys[self.TEST_TEXT])) - self.assertEqual(False, - key2 in atft_key_handler.processed_keys[self.TEST_TEXT]) + self.assertEqual( + False, self.TEST_ATFA_ID2 in atft_key_handler.processed_keys) # If the error is FastbootFailure, do not add it to processed keys. - mock_listdir.return_value = [key2] process_key_handler.reset_mock() process_key_handler.side_effect = ( fastboot_exceptions.FastbootFailure('')) - mock_listdir.return_value = [key2] atft_key_handler.ProcessKeyFile() process_key_handler.assert_called_once() - self.assertEqual(1, len(atft_key_handler.processed_keys[self.TEST_TEXT])) - self.assertEqual(False, - key2 in atft_key_handler.processed_keys[self.TEST_TEXT]) + self.assertEqual( + False, self.TEST_ATFA_ID2 in atft_key_handler.processed_keys) handle_exception_handler.assert_called_once() + # No error happens, add key2 to processed keys for ATFA_ID2 + process_key_handler.side_effect = None + process_key_handler.reset_mock() + handle_exception_handler.reset_mock() + atft_key_handler.ProcessKeyFile() + process_key_handler.assert_called_once() + self.assertEqual( + True, self.TEST_ATFA_ID2 in atft_key_handler.processed_keys) + self.assertEqual( + 1, len(atft_key_handler.processed_keys[self.TEST_ATFA_ID2])) + self.assertEqual( + True, key2 in atft_key_handler.processed_keys[self.TEST_ATFA_ID2]) + handle_exception_handler.assert_not_called() + + # If another key is added to the folder, process it. + self.fs.create_file(os.path.join(self.KEY_DIR, key3)) + process_key_handler.side_effect = None + process_key_handler.reset_mock() + handle_exception_handler.reset_mock() + atft_key_handler.ProcessKeyFile() + process_key_handler.assert_called_once() + self.assertEqual( + True, self.TEST_ATFA_ID2 in atft_key_handler.processed_keys) + self.assertEqual( + 2, len(atft_key_handler.processed_keys[self.TEST_ATFA_ID2])) + self.assertEqual( + True, key2 in atft_key_handler.processed_keys[self.TEST_ATFA_ID2]) + self.assertEqual( + True, key3 in atft_key_handler.processed_keys[self.TEST_ATFA_ID2]) + handle_exception_handler.assert_not_called() + + # Assume the user opens the tool again, processed status should be recovered + # from the log. + handle_exception_handler.reset_mock() + process_key_handler.reset_mock() + atft_key_handler.StartProcessKey() + process_key_handler.assert_not_called() + handle_exception_handler.assert_not_called() + self.assertEqual( + True, self.TEST_ATFA_ID1 in atft_key_handler.processed_keys) + self.assertEqual( + 1, len(atft_key_handler.processed_keys[self.TEST_ATFA_ID1])) + self.assertEqual( + True, key1 in atft_key_handler.processed_keys[self.TEST_ATFA_ID1]) + self.assertEqual( + True, self.TEST_ATFA_ID2 in atft_key_handler.processed_keys) + self.assertEqual( + 2, len(atft_key_handler.processed_keys[self.TEST_ATFA_ID2])) + self.assertEqual( + True, key2 in atft_key_handler.processed_keys[self.TEST_ATFA_ID2]) + self.assertEqual( + True, key3 in atft_key_handler.processed_keys[self.TEST_ATFA_ID2]) + + # Clear fake fs state + shutil.rmtree(self.KEY_DIR) + shutil.rmtree(self.LOG_DIR) + if __name__ == '__main__': unittest.main() |