summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYu Shan <shanyu@google.com>2018-10-02 11:53:14 -0700
committerYu Shan <shanyu@google.com>2018-10-02 16:11:04 -0700
commit7712c6c2d444d61fd8aae49037c2d39456b9dbf2 (patch)
tree1a761399eb0e873f9ba66d58f37e95b1552743ff
parent8e5b70fa4285db480c6d5d048129a4c1c2b8d32a (diff)
downloadattestation-master-cuttlefish-testing-release.tar.gz
[ATFT] Change unit tests to use pyfakefs.master-cuttlefish-testing-release
Use pyfakefs for fake file system operation in the unit tests. Test: All unit tests still pass. Bug: None Change-Id: I9254290bfdf06a821ce1899fbff300d4fcaa4da2
-rw-r--r--at-factory-tool/atft_unittest.py599
1 files changed, 279 insertions, 320 deletions
diff --git a/at-factory-tool/atft_unittest.py b/at-factory-tool/atft_unittest.py
index 5d80a80..6b4f9ab 100644
--- a/at-factory-tool/atft_unittest.py
+++ b/at-factory-tool/atft_unittest.py
@@ -20,12 +20,14 @@ import atft
from atftman import ProvisionStatus
from atftman import ProvisionState
import fastboot_exceptions
+from pyfakefs.fake_filesystem_unittest import TestCase
from mock import call
from mock import MagicMock
from mock import patch
from mock import mock_open
import os
import sets
+import shutil
import wx
@@ -108,7 +110,7 @@ class TestDeviceInfo(object):
self.provision_status)
-class AtftTest(unittest.TestCase):
+class AtftTest(TestCase):
TEST_SERIAL1 = 'test-serial1'
TEST_LOCATION1 = 'test-location1'
TEST_SERIAL2 = 'test-serial2'
@@ -127,6 +129,8 @@ class AtftTest(unittest.TestCase):
TEST_PASSWORD2 = 'PassWord 2!'
TEST_FILENAME = 'filename'
TEST_ATTEST_UUID = 'test attest uuid'
+ TEST_ATFA_ID1 ="ATFATEST1"
+ TEST_ATFA_ID2 = "ATFATEST2"
def setUp(self):
self.test_target_devs = []
@@ -137,6 +141,7 @@ class AtftTest(unittest.TestCase):
self.test_text_window = ''
self.atfa_keys = None
self.device_map = {}
+ self.setUpPyfakefs()
def AppendTargetDevice(self, device):
self.test_target_devs.append(device)
@@ -1947,8 +1952,7 @@ class AtftTest(unittest.TestCase):
mock_atft._HandleException.assert_called_once()
# Test atft._GetRegFile
- @patch('__builtin__.open')
- def testGetRegFile(self, my_mock_open):
+ def testGetRegFile(self):
mock_atft = MockAtft()
mock_atft.atft_manager = MagicMock()
mock_atft.atft_manager.GetATFADevice = MagicMock()
@@ -1961,32 +1965,17 @@ class AtftTest(unittest.TestCase):
mock_atft.ResumeRefresh = MagicMock()
mock_atft._HandleException = MagicMock()
mock_atft._SendAlertEvent = MagicMock()
- mock_path = MagicMock()
- mock_path.encode.return_value = mock_path
- mock_atft._GetRegFile(mock_path)
- my_mock_open.assert_called_once_with(mock_path, 'w+')
- mock_atfa.Upload.assert_called_once_with(mock_path)
+ mock_atft._GetRegFile(self.TEST_TEXT)
+ mock_atfa.Upload.assert_called_once_with(self.TEST_TEXT)
mock_atft.atft_manager.PrepareFile.assert_called_once_with('reg')
mock_atft.PauseRefresh.assert_called_once()
mock_atft.ResumeRefresh.assert_called_once()
mock_atft._HandleException.assert_not_called()
mock_atft._SendOperationStartEvent.assert_called_once()
mock_atft._SendOperationSucceedEvent.assert_called_once()
-
- # Cannot create file
- mock_atft._HandleException.reset_mock()
- mock_atft._SendOperationStartEvent.reset_mock()
- mock_atft._SendOperationSucceedEvent.reset_mock()
- mock_atft.PauseRefresh.reset_mock()
- mock_atft.ResumeRefresh.reset_mock()
- my_mock_open.side_effect = IOError
- mock_atft._GetRegFile(mock_path)
- mock_atft.PauseRefresh.assert_called_once()
- mock_atft.ResumeRefresh.assert_called_once()
- mock_atft._HandleException.assert_called_once()
- mock_atft._SendOperationStartEvent.assert_called_once()
- mock_atft._SendOperationSucceedEvent.assert_not_called()
+ self.assertEqual(True, os.path.exists(self.TEST_TEXT))
+ os.remove(self.TEST_TEXT)
def testGetRegFileFailure(self):
self.TestGetRegFileFailureCommon(
@@ -1998,40 +1987,36 @@ class AtftTest(unittest.TestCase):
self.TestGetRegFileFailureCommon(
fastboot_exceptions.DeviceNotFoundException, True)
- def TestGetRegFileFailureCommon(
- self, exception, upload_fail=False):
- with patch('__builtin__.open') as my_mock_open:
- mock_atft = MockAtft()
- mock_atft.atft_manager = MagicMock()
+ def TestGetRegFileFailureCommon(self, exception, upload_fail=False):
+ mock_atft = MockAtft()
+ mock_atft.atft_manager = MagicMock()
+ mock_atft.atft_manager.GetATFADevice = MagicMock()
+ mock_atft.atft_manager.GetATFADevice.return_value = MagicMock()
+ mock_atft._UpdateKeysLeftInATFA = MagicMock()
+ mock_atft._SendOperationStartEvent = MagicMock()
+ mock_atft._SendOperationSucceedEvent = MagicMock()
+ mock_atft.PauseRefresh = MagicMock()
+ mock_atft.ResumeRefresh = MagicMock()
+ mock_atft._HandleException = MagicMock()
+ mock_atft._SendAlertEvent = MagicMock()
+ if not upload_fail:
+ mock_atft.atft_manager.PrepareFile.side_effect = exception
+ else:
mock_atft.atft_manager.GetATFADevice = MagicMock()
- mock_atft.atft_manager.GetATFADevice.return_value = MagicMock()
- mock_atft._UpdateKeysLeftInATFA = MagicMock()
- mock_atft._SendOperationStartEvent = MagicMock()
- mock_atft._SendOperationSucceedEvent = MagicMock()
- mock_atft.PauseRefresh = MagicMock()
- mock_atft.ResumeRefresh = MagicMock()
- mock_atft._HandleException = MagicMock()
- mock_atft._SendAlertEvent = MagicMock()
- mock_path = MagicMock()
- mock_path.encode.return_value = mock_path
- my_mock_open.side_effect = IOError
- if not upload_fail:
- mock_atft.atft_manager.PrepareFile.side_effect = exception
- else:
- mock_atft.atft_manager.GetATFADevice = MagicMock()
- mock_atft.atft_manager.GetATFADevice.return_value.Upload.side_effect = exception
+ (mock_atft.atft_manager.GetATFADevice.return_value.Upload.
+ side_effect) = exception
- mock_atft._GetRegFile(mock_path)
+ # This invalid path would cause an IOError while creating file.
+ mock_atft._GetRegFile('/123/123')
- mock_atft.PauseRefresh.assert_called_once()
- mock_atft.ResumeRefresh.assert_called_once()
- mock_atft._HandleException.assert_called_once()
- mock_atft._SendOperationStartEvent.assert_called_once()
- mock_atft._SendOperationSucceedEvent.assert_not_called()
+ mock_atft.PauseRefresh.assert_called_once()
+ mock_atft.ResumeRefresh.assert_called_once()
+ mock_atft._HandleException.assert_called_once()
+ mock_atft._SendOperationStartEvent.assert_called_once()
+ mock_atft._SendOperationSucceedEvent.assert_not_called()
# Test atft._GetAuditFile
- @patch('__builtin__.open')
- def testGetAuditFile(self, my_mock_open):
+ def testGetAuditFile(self):
mock_atft = MockAtft()
mock_atft.atft_manager = MagicMock()
mock_atft.atft_manager.GetATFADevice = MagicMock()
@@ -2044,32 +2029,17 @@ class AtftTest(unittest.TestCase):
mock_atft.ResumeRefresh = MagicMock()
mock_atft._HandleException = MagicMock()
mock_atft._SendAlertEvent = MagicMock()
- mock_path = MagicMock()
- mock_path.encode.return_value = mock_path
- mock_atft._GetAuditFile(mock_path)
- my_mock_open.assert_called_once_with(mock_path, 'w+')
- mock_atfa.Upload.assert_called_once_with(mock_path)
+ mock_atft._GetAuditFile(self.TEST_TEXT)
+ mock_atfa.Upload.assert_called_once_with(self.TEST_TEXT)
mock_atft.atft_manager.PrepareFile.assert_called_once_with('audit')
mock_atft.PauseRefresh.assert_called_once()
mock_atft.ResumeRefresh.assert_called_once()
mock_atft._HandleException.assert_not_called()
mock_atft._SendOperationStartEvent.assert_called_once()
mock_atft._SendOperationSucceedEvent.assert_called_once()
-
- # Cannot create file
- mock_atft._HandleException.reset_mock()
- mock_atft._SendOperationStartEvent.reset_mock()
- mock_atft._SendOperationSucceedEvent.reset_mock()
- mock_atft.PauseRefresh.reset_mock()
- mock_atft.ResumeRefresh.reset_mock()
- my_mock_open.side_effect = IOError
- mock_atft._GetAuditFile(mock_path)
- mock_atft.PauseRefresh.assert_called_once()
- mock_atft.ResumeRefresh.assert_called_once()
- mock_atft._HandleException.assert_called_once()
- mock_atft._SendOperationStartEvent.assert_called_once()
- mock_atft._SendOperationSucceedEvent.assert_not_called()
+ self.assertEqual(True, os.path.exists(self.TEST_TEXT))
+ os.remove(self.TEST_TEXT)
def testGetAuditFileFailure(self):
self.TestGetAuditFileFailureCommon(
@@ -2081,133 +2051,83 @@ class AtftTest(unittest.TestCase):
self.TestGetAuditFileFailureCommon(
fastboot_exceptions.DeviceNotFoundException, True)
- def TestGetAuditFileFailureCommon(
- self, exception, upload_fail=False):
- with patch('__builtin__.open') as my_mock_open:
- mock_atft = MockAtft()
- mock_atft.atft_manager = MagicMock()
+ def TestGetAuditFileFailureCommon(self, exception, upload_fail=False):
+ mock_atft = MockAtft()
+ mock_atft.atft_manager = MagicMock()
+ mock_atft.atft_manager.GetATFADevice = MagicMock()
+ mock_atft.atft_manager.GetATFADevice.return_value = MagicMock()
+ mock_atft._UpdateKeysLeftInATFA = MagicMock()
+ mock_atft._SendOperationStartEvent = MagicMock()
+ mock_atft._SendOperationSucceedEvent = MagicMock()
+ mock_atft.PauseRefresh = MagicMock()
+ mock_atft.ResumeRefresh = MagicMock()
+ mock_atft._HandleException = MagicMock()
+ mock_atft._SendAlertEvent = MagicMock()
+ if not upload_fail:
+ mock_atft.atft_manager.PrepareFile.side_effect = exception
+ else:
mock_atft.atft_manager.GetATFADevice = MagicMock()
- mock_atft.atft_manager.GetATFADevice.return_value = MagicMock()
- mock_atft._UpdateKeysLeftInATFA = MagicMock()
- mock_atft._SendOperationStartEvent = MagicMock()
- mock_atft._SendOperationSucceedEvent = MagicMock()
- mock_atft.PauseRefresh = MagicMock()
- mock_atft.ResumeRefresh = MagicMock()
- mock_atft._HandleException = MagicMock()
- mock_atft._SendAlertEvent = MagicMock()
- mock_path = MagicMock()
- mock_path.encode.return_value = mock_path
- my_mock_open.side_effect = IOError
- if not upload_fail:
- mock_atft.atft_manager.PrepareFile.side_effect = exception
- else:
- mock_atft.atft_manager.GetATFADevice = MagicMock()
- mock_atft.atft_manager.GetATFADevice.return_value.Upload.side_effect = exception
+ (mock_atft.atft_manager.GetATFADevice.return_value.Upload.
+ side_effect) = exception
- mock_atft._GetAuditFile(mock_path)
+ # This invalid path would cause an IOError while creating file.
+ mock_atft._GetAuditFile('/123/123')
- mock_atft.PauseRefresh.assert_called_once()
- mock_atft.ResumeRefresh.assert_called_once()
- mock_atft._HandleException.assert_called_once()
- mock_atft._SendOperationStartEvent.assert_called_once()
- mock_atft._SendOperationSucceedEvent.assert_not_called()
+ mock_atft.PauseRefresh.assert_called_once()
+ mock_atft.ResumeRefresh.assert_called_once()
+ mock_atft._HandleException.assert_called_once()
+ mock_atft._SendOperationStartEvent.assert_called_once()
+ mock_atft._SendOperationSucceedEvent.assert_not_called()
- # Test AtftLog.Initialize()
- @patch('atft.AtftLog.Info', MagicMock())
- @patch('atft.AtftLog._CreateLogFile')
- @patch('os.path.isfile')
- @patch('os.path.exists')
- @patch('os.mkdir')
- @patch('os.listdir')
- @patch('__builtin__.open')
- def testAtftLogCreate(self, my_mock_open, mock_listdir, mock_makedir,
- mock_path_exists, mock_isfile, mock_createfile):
- log_dir = self.LOG_DIR
- log_size = 10
- log_file_number = 1
- mock_listdir.return_value = ['atft_log_1', 'atft_log_2']
- # Log directory not exist
- mock_path_exists.return_value = False
- # listdir return value is file
- mock_isfile.return_value = True
- atft_log = atft.AtftLog(log_dir, log_size, log_file_number)
- mock_makedir.assert_called_once_with(self.LOG_DIR)
- mock_createfile.assert_not_called()
- self.assertEqual(atft_log.log_dir_file, os.path.join(
- self.LOG_DIR, 'atft_log_2'))
@patch('atft.AtftLog.Info', MagicMock())
@patch('atft.AtftLog._CreateLogFile')
- @patch('os.path.isfile')
- @patch('os.path.exists')
- @patch('os.mkdir')
- @patch('os.listdir')
- @patch('__builtin__.open')
- def testAtftLogCreateDirExists(
- self, my_mock_open, mock_listdir, mock_makedir,
- mock_path_exists, mock_isfile, mock_createfile):
+ def testAtftLogCreate(self, mock_createfile):
+ # Test AtftLog.Initialize(), log dir does not exist, create it.
log_dir = self.LOG_DIR
log_size = 10
log_file_number = 1
- mock_listdir.return_value = ['atft_log_1', 'atft_log_2']
- # Log directory already exists
- mock_path_exists.return_value = True
- # listdir return value is file
- mock_isfile.return_value = True
atft_log = atft.AtftLog(log_dir, log_size, log_file_number)
- mock_makedir.assert_not_called()
- mock_createfile.assert_not_called()
+ mock_createfile.assert_called_once()
+ self.assertEqual(os.path.exists(log_dir))
+ shutil.rmtree(log_dir)
@patch('atft.AtftLog.Info', MagicMock())
@patch('atft.AtftLog._CreateLogFile')
- @patch('os.path.isfile')
- @patch('os.path.exists')
- @patch('os.mkdir')
- @patch('os.listdir')
- @patch('__builtin__.open')
- def testAtftLogCreateFirstLog(
- self, my_mock_open, mock_listdir, mock_makedir,
- mock_path_exists, mock_isfile, mock_createfile):
+ def testAtftLogCreateDirExists(self, mock_createfile):
+ # Log directory exists, should check for existing log files.
log_dir = self.LOG_DIR
+ self.fs.create_dir(log_dir)
+ self.fs.create_file(os.path.join(log_dir, 'atft_log_1'))
+ self.fs.create_file(os.path.join(log_dir, 'atft_log_2'))
+ log_size = 10
log_size = 10
log_file_number = 1
- mock_listdir.return_value = []
- # Log directory already exists
- mock_path_exists.return_value = True
- # listdir return value is file
- mock_isfile.return_value = True
atft_log = atft.AtftLog(log_dir, log_size, log_file_number)
- mock_createfile.assert_called_once()
-
- # Test AtftLog._CreateLogFile()
- def MockCheckPath(self, path):
- if path == self.LOG_DIR:
- return True
- else:
- return False
+ # Create the first log file.
+ mock_createfile.assert_not_called()
+ self.assertEqual(atft_log.log_dir_file, os.path.join(
+ self.LOG_DIR, 'atft_log_2'))
+ shutil.rmtree(log_dir)
@patch('atft.AtftLog.Initialize', MagicMock())
- @patch('os.path.exists')
- @patch('os.listdir')
- @patch('__builtin__.open')
- def testAtftLogCreate(self, my_mock_open, mock_listdir, mock_path_exists):
+ def testAtftLogCreate(self):
log_dir = self.LOG_DIR
+ self.fs.create_dir(log_dir)
log_size = 10
log_file_number = 1
- mock_listdir.return_value = []
- mock_path_exists.side_effect = self.MockCheckPath
atft_log = atft.AtftLog(log_dir, log_size, log_file_number)
atft_log.Info = MagicMock()
mock_get_time = MagicMock()
atft_log._GetCurrentTimestamp = mock_get_time
mock_get_time.return_value = self.TEST_TIMESTAMP
atft_log._CreateLogFile()
- my_mock_open.assert_called_once()
- log_file = my_mock_open.call_args[0][0]
mock_get_time.assert_called_once()
- self.assertEqual(log_file, atft_log.log_dir_file)
- self.assertEqual(os.path.join(
- self.LOG_DIR, 'atft_log_' + str(self.TEST_TIMESTAMP)), log_file)
+ log_file_path = os.path.join(
+ log_dir, 'atft_log_' + str(self.TEST_TIMESTAMP))
+ self.assertEqual(log_file_path, atft_log.log_dir_file)
+ self.assertEqual(True, os.path.exists(log_file_path))
+ shutil.rmtree(log_dir)
# Test AtftLog._LimitSize()
def MockListDir(self, dir):
@@ -2216,35 +2136,31 @@ class AtftTest(unittest.TestCase):
def MockCreateFile(self, add_file):
self.files.append(add_file)
- @patch('os.path.getsize')
@patch('atft.AtftLog.Initialize', MagicMock())
- @patch('os.path.isfile')
- @patch('os.path.exists')
- @patch('os.listdir')
- @patch('os.remove')
- @patch('__builtin__.open')
- def testLimitSize(
- self, my_mock_open, mock_remove, mock_listdir, mock_path_exists, mock_isfile,
- mock_getsize):
+ def testLimitSize(self):
log_dir = self.LOG_DIR
- log_size = 10
+ # This means each file would have a maximum size of 10.
+ log_size = 20
log_file_number = 2
- # 1 is older than 2
- self.files = ['atft_log_1', 'atft_log_2']
- mock_listdir.side_effect = self.MockListDir
- mock_isfile.return_value = True
- mock_getsize.return_value = 5
+ self.fs.create_dir(log_dir)
+ log_file1 = os.path.join(log_dir, 'atft_log_1')
+ log_file2 = os.path.join(log_dir, 'atft_log_2')
+ log_file3 = os.path.join(log_dir, 'atft_log_3')
+ self.fs.create_file(log_file1, contents='abcde')
+ self.fs.create_file(log_file2, contents='abcde')
atft_log = atft.AtftLog(log_dir, log_size, log_file_number)
atft_log.Info = MagicMock()
- mock_createfile = MagicMock()
- mock_createfile.side_effect = (
- lambda file='atft_log_3': self.MockCreateFile(file))
- atft_log._CreateLogFile = mock_createfile
- # 5 + 6 should be larger than 10
- atft_log._LimitSize('abcdefg')
- atft_log._CreateLogFile.assert_called_once()
- mock_remove.assert_called_once_with(os.path.join(
- self.LOG_DIR, 'atft_log_1'))
+ mock_get_time = MagicMock()
+ atft_log._GetCurrentTimestamp = mock_get_time
+ mock_get_time.return_value = 3
+ atft_log.log_dir_file = log_file2
+ # Now atft_log_2 should have 11 characters which is larger than 10.
+ # A new file should be created and atft_log_1 should be removed.
+ atft_log._LimitSize('abcdef')
+ self.assertEqual(False, os.path.exists(log_file1))
+ self.assertEqual(True, os.path.exists(log_file2))
+ self.assertEqual(True, os.path.exists(log_file3))
+ shutil.rmtree(log_dir)
# Test ChangeThresholdDialog.OnSave()
def testChangeThresholdDialogSaveNormal(self):
@@ -2367,40 +2283,45 @@ class AtftTest(unittest.TestCase):
mock_atft._SendAlertEvent.assert_called_once()
# Test _SaveFileEventHandler
- @patch('os.path.isfile')
- @patch('os.path.isdir')
@patch('wx.DirDialog')
- def testSaveFileEvent(self, mock_create_dialog, mock_isdir, mock_isfile):
+ def testSaveFileEvent(self, mock_create_dialog):
mock_atft = MockAtft()
message = self.TEST_TEXT
filename = self.TEST_FILENAME
callback = MagicMock()
- mock_isdir.return_value = False
- mock_isfile.return_value = False
- mock_create_dialog.ShowModal = MagicMock()
- mock_create_dialog.ShowModal.return_value = wx.ID_YES
+ mock_dialog = MagicMock()
+ mock_dialog_instance = MagicMock()
+ mock_create_dialog.return_value = mock_dialog
+ mock_dialog.__enter__.return_value = mock_dialog_instance
+ mock_dialog_instance.ShowModal = MagicMock()
+ mock_dialog_instance.ShowModal.return_value = wx.ID_YES
+ mock_dialog_instance.GetPath = MagicMock()
+ mock_dialog_instance.GetPath.return_value = self.TEST_TEXT
data = mock_atft.SaveFileArg(message, filename, callback)
event = MagicMock()
event.GetValue.return_value = data
mock_atft._SaveFileEventHandler(event)
callback.assert_called_once()
- @patch('os.path.isfile')
- @patch('os.path.isdir')
@patch('wx.DirDialog')
def testSaveFileEventFileExists(
- self, mock_create_dialog, mock_isdir, mock_isfile):
+ self, mock_create_dialog):
mock_atft = MockAtft()
mock_atft._ShowWarning = MagicMock()
mock_atft._ShowWarning.return_value = True
message = self.TEST_TEXT
filename = self.TEST_FILENAME
callback = MagicMock()
- mock_isdir.return_value = False
# File already exists, need to give a warning.
- mock_isfile.return_value = True
- mock_create_dialog.ShowModal = MagicMock()
- mock_create_dialog.ShowModal.return_value = wx.ID_YES
+ self.fs.create_file(os.path.join(self.TEST_TEXT, self.TEST_FILENAME))
+ mock_dialog = MagicMock()
+ mock_dialog_instance = MagicMock()
+ mock_create_dialog.return_value = mock_dialog
+ mock_dialog.__enter__.return_value = mock_dialog_instance
+ mock_dialog_instance.ShowModal = MagicMock()
+ mock_dialog_instance.ShowModal.return_value = wx.ID_YES
+ mock_dialog_instance.GetPath = MagicMock()
+ mock_dialog_instance.GetPath.return_value = self.TEST_TEXT
data = mock_atft.SaveFileArg(message, filename, callback)
event = MagicMock()
event.GetValue.return_value = data
@@ -2414,6 +2335,9 @@ class AtftTest(unittest.TestCase):
mock_atft._SaveFileEventHandler(event)
callback.assert_not_called()
+ # Clean the fake fs state.
+ os.remove(os.path.join(self.TEST_TEXT, self.TEST_FILENAME))
+
def testCheckProvisionStepsSuccess(self):
mock_atft = MockAtft()
mock_atft._SendAlertEvent = MagicMock()
@@ -2558,31 +2482,20 @@ class AtftTest(unittest.TestCase):
self.mock_time += 1
return str(self.mock_time - 1)
- def CreateAuditFiles(self, file_path, file_type, show_alert, blocking):
- self.mock_audit_files.append(os.path.basename(file_path))
+ def CreateAuditFiles(self, filepath, file_type, show_alert, blocking):
+ self.fs.create_file(filepath)
return True
- def RemoveAuditFiles(self, file_path):
- self.mock_audit_files.remove(os.path.basename(file_path))
-
- @patch('os.mkdir')
@patch('datetime.datetime')
- @patch('os.path.isfile')
- @patch('os.remove')
- @patch('os.listdir')
- def testAtftAudit(
- self, mock_listdir, mock_remove, mock_isfile, mock_datetime,
- mock_makedir):
+ def testAtftAudit(self, mock_datetime):
download_interval = 2
get_file_handler = MagicMock()
get_file_handler.side_effect = self.CreateAuditFiles
get_atfa_serial = MagicMock()
get_atfa_serial.return_value = self.TEST_SERIAL1
- mock_remove.side_effect = self.RemoveAuditFiles
mock_time = MagicMock()
mock_datetime.utcnow.return_value = mock_time
mock_time.strftime.side_effect = self.IncreaseMockTime
- mock_isfile.return_value = True
handle_exception_handler = MagicMock()
audit_file_path_0 = os.path.join(
@@ -2594,9 +2507,7 @@ class AtftTest(unittest.TestCase):
audit_file_path_3 = os.path.join(
self.AUDIT_DIR, self.TEST_SERIAL1 + '_3.audit')
- self.mock_audit_files = []
self.mock_time = 0
- mock_listdir.return_value = self.mock_audit_files
test_audit = atft.AtftAudit(
self.AUDIT_DIR,
@@ -2604,52 +2515,58 @@ class AtftTest(unittest.TestCase):
get_file_handler,
handle_exception_handler,
get_atfa_serial)
- mock_makedir.assert_called_with(self.AUDIT_DIR)
+
+ self.assertEqual(True, os.path.exists(self.AUDIT_DIR))
test_audit.PullAudit(10)
get_file_handler.assert_called_once_with(
audit_file_path_0, 'audit', False, True)
- mock_remove.assert_not_called()
get_file_handler.reset_mock()
+ self.assertEqual(True, os.path.isfile(audit_file_path_0))
+
test_audit.PullAudit(9)
get_file_handler.assert_not_called()
+ self.assertEqual(True, os.path.isfile(audit_file_path_0))
+
test_audit.PullAudit(8)
get_file_handler.assert_called_once_with(
audit_file_path_1, 'audit', False, True)
- mock_remove.assert_called_once_with(audit_file_path_0)
- mock_remove.reset_mock()
+ self.assertEqual(False, os.path.isfile(audit_file_path_0))
+ self.assertEqual(True, os.path.isfile(audit_file_path_1))
+
get_file_handler.reset_mock()
test_audit.PullAudit(7)
get_file_handler.assert_not_called()
+ self.assertEqual(False, os.path.isfile(audit_file_path_0))
+ self.assertEqual(True, os.path.isfile(audit_file_path_1))
+
test_audit.PullAudit(6)
get_file_handler.assert_called_once_with(
audit_file_path_2, 'audit', False, True)
get_file_handler.reset_mock()
- mock_remove.assert_called_once_with(audit_file_path_1)
- mock_remove.reset_mock()
+ self.assertEqual(False, os.path.isfile(audit_file_path_1))
+ self.assertEqual(True, os.path.isfile(audit_file_path_2))
+
test_audit.ResetKeysLeft()
test_audit.PullAudit(10)
get_file_handler.assert_called_once_with(
audit_file_path_3, 'audit', False, True)
- mock_remove.assert_called_once_with(audit_file_path_2)
+ self.assertEqual(False, os.path.isfile(audit_file_path_2))
+ self.assertEqual(True, os.path.isfile(audit_file_path_3))
+
+ # Clear state
+ shutil.rmtree(self.AUDIT_DIR)
- @patch('os.mkdir')
@patch('datetime.datetime')
- @patch('os.path.isfile')
- @patch('os.remove')
- @patch('os.listdir')
- def testAtftAuditRemoveMultipleFiles(
- self, mock_listdir, mock_remove, mock_isfile, mock_datetime,
- mock_makedir):
+ def testAtftAuditRemoveMultipleFiles(self, mock_datetime):
+ # If more than one files for one ATFA left, must remove them all.
download_interval = 2
get_file_handler = MagicMock()
get_file_handler.side_effect = self.CreateAuditFiles
get_atfa_serial = MagicMock()
get_atfa_serial.return_value = self.TEST_SERIAL1
- mock_remove.side_effect = self.RemoveAuditFiles
mock_time = MagicMock()
mock_datetime.utcnow.return_value = mock_time
mock_time.strftime.side_effect = self.IncreaseMockTime
- mock_isfile.return_value = True
handle_exception_handler = MagicMock()
audit_file_path_0 = os.path.join(
@@ -2661,13 +2578,13 @@ class AtftTest(unittest.TestCase):
audit_file_path_3 = os.path.join(
self.AUDIT_DIR, self.TEST_SERIAL1 + '_3.audit')
- # If more than one files for one ATFA left, must remove them all.
- self.mock_audit_files = [
+ mock_audit_files = [
self.TEST_SERIAL1 + '_0.audit',
self.TEST_SERIAL1 + '_1.audit',
self.TEST_SERIAL1 + '_2.audit']
self.mock_time = 3
- mock_listdir.return_value = self.mock_audit_files
+ for mock_audit_file in mock_audit_files:
+ self.fs.create_file(os.path.join(self.AUDIT_DIR, mock_audit_file))
test_audit = atft.AtftAudit(
self.AUDIT_DIR,
@@ -2678,33 +2595,27 @@ class AtftTest(unittest.TestCase):
test_audit.PullAudit(10)
get_file_handler.assert_called_once_with(
audit_file_path_3, 'audit', False, True)
- mock_remove.assert_has_calls([
- call(audit_file_path_0),
- call(audit_file_path_1),
- call(audit_file_path_2)])
+ self.assertEqual(False, os.path.isfile(audit_file_path_0))
+ self.assertEqual(False, os.path.isfile(audit_file_path_1))
+ self.assertEqual(False, os.path.isfile(audit_file_path_2))
+ self.assertEqual(True, os.path.isfile(audit_file_path_3))
+ # Clear state
+ shutil.rmtree(self.AUDIT_DIR)
- @patch('os.mkdir')
@patch('datetime.datetime')
- @patch('os.path.isfile')
- @patch('os.remove')
- @patch('os.listdir')
- def testAtftAuditRemoveMultipleFiles(
- self, mock_listdir, mock_remove, mock_isfile, mock_datetime,
- mock_makedir):
+ def testAtftAuditRemoveMultipleFilesFailure(self, mock_datetime):
+ # If get file fails, must not remove file.
download_interval = 2
get_file_handler = MagicMock()
+ # Get file handler fails.
+ get_file_handler.return_value = False
get_atfa_serial = MagicMock()
get_atfa_serial.return_value = self.TEST_SERIAL1
- mock_remove.side_effect = self.RemoveAuditFiles
mock_time = MagicMock()
mock_datetime.utcnow.return_value = mock_time
mock_time.strftime.side_effect = self.IncreaseMockTime
- mock_isfile.return_value = True
handle_exception_handler = MagicMock()
- # If get file fails, must not remove file.
- get_file_handler.return_value = False
-
audit_file_path_0 = os.path.join(
self.AUDIT_DIR, self.TEST_SERIAL1 + '_0.audit')
audit_file_path_1 = os.path.join(
@@ -2714,12 +2625,13 @@ class AtftTest(unittest.TestCase):
audit_file_path_3 = os.path.join(
self.AUDIT_DIR, self.TEST_SERIAL1 + '_3.audit')
- self.mock_audit_files = [
+ mock_audit_files = [
self.TEST_SERIAL1 + '_0.audit',
self.TEST_SERIAL1 + '_1.audit',
self.TEST_SERIAL1 + '_2.audit']
self.mock_time = 3
- mock_listdir.return_value = self.mock_audit_files
+ for mock_audit_file in mock_audit_files:
+ self.fs.create_file(os.path.join(self.AUDIT_DIR, mock_audit_file))
test_audit = atft.AtftAudit(
self.AUDIT_DIR,
@@ -2730,26 +2642,23 @@ class AtftTest(unittest.TestCase):
test_audit.PullAudit(10)
get_file_handler.assert_called_once_with(
audit_file_path_3, 'audit', False, True)
- mock_remove.assert_not_called()
+ self.assertEqual(True, os.path.isfile(audit_file_path_0))
+ self.assertEqual(True, os.path.isfile(audit_file_path_1))
+ self.assertEqual(True, os.path.isfile(audit_file_path_2))
+ self.assertEqual(False, os.path.isfile(audit_file_path_3))
+ # Clear state
+ shutil.rmtree(self.AUDIT_DIR)
- @patch('os.mkdir')
@patch('datetime.datetime')
- @patch('os.path.isfile')
- @patch('os.remove')
- @patch('os.listdir')
- def testAtftAuditMultipleATFAs(
- self, mock_listdir, mock_remove, mock_isfile, mock_datetime,
- mock_makedir):
+ def testAtftAuditMultipleATFAs(self, mock_datetime):
download_interval = 2
get_file_handler = MagicMock()
get_file_handler.side_effect = self.CreateAuditFiles
get_atfa_serial = MagicMock()
get_atfa_serial.return_value = self.TEST_SERIAL1
- mock_remove.side_effect = self.RemoveAuditFiles
mock_time = MagicMock()
mock_datetime.utcnow.return_value = mock_time
mock_time.strftime.side_effect = self.IncreaseMockTime
- mock_isfile.return_value = True
handle_exception_handler = MagicMock()
audit_file_path_1_0 = os.path.join(
@@ -2761,9 +2670,7 @@ class AtftTest(unittest.TestCase):
audit_file_path_2_3 = os.path.join(
self.AUDIT_DIR, self.TEST_SERIAL2 + '_3.audit')
- self.mock_audit_files = []
self.mock_time = 0
- mock_listdir.return_value = self.mock_audit_files
test_audit = atft.AtftAudit(
self.AUDIT_DIR,
@@ -2774,8 +2681,8 @@ class AtftTest(unittest.TestCase):
test_audit.PullAudit(10)
get_file_handler.assert_called_once_with(
audit_file_path_1_0, 'audit', False, True)
- mock_remove.assert_not_called()
get_file_handler.reset_mock()
+ self.assertEqual(True, os.path.isfile(audit_file_path_1_0))
# Insert a new ATFA
test_audit.ResetKeysLeft()
@@ -2783,8 +2690,9 @@ class AtftTest(unittest.TestCase):
test_audit.PullAudit(10)
get_file_handler.assert_called_once_with(
audit_file_path_2_1, 'audit', False, True)
- mock_remove.assert_not_called()
get_file_handler.reset_mock()
+ self.assertEqual(True, os.path.isfile(audit_file_path_1_0))
+ self.assertEqual(True, os.path.isfile(audit_file_path_2_1))
# Insert the old one back
test_audit.ResetKeysLeft()
@@ -2792,9 +2700,10 @@ class AtftTest(unittest.TestCase):
test_audit.PullAudit(9)
get_file_handler.assert_called_once_with(
audit_file_path_1_2, 'audit', False, True)
- mock_remove.assert_called_once_with(audit_file_path_1_0)
- mock_remove.reset_mock()
get_file_handler.reset_mock()
+ self.assertEqual(False, os.path.isfile(audit_file_path_1_0))
+ self.assertEqual(True, os.path.isfile(audit_file_path_1_2))
+ self.assertEqual(True, os.path.isfile(audit_file_path_2_1))
# Insert ATFA2 back
test_audit.ResetKeysLeft()
@@ -2802,33 +2711,27 @@ class AtftTest(unittest.TestCase):
test_audit.PullAudit(9)
get_file_handler.assert_called_once_with(
audit_file_path_2_3, 'audit', False, True)
- mock_remove.assert_called_once_with(audit_file_path_2_1)
- mock_remove.reset_mock()
get_file_handler.reset_mock()
+ self.assertEqual(False, os.path.isfile(audit_file_path_1_0))
+ self.assertEqual(True, os.path.isfile(audit_file_path_1_2))
+ self.assertEqual(False, os.path.isfile(audit_file_path_2_1))
+ self.assertEqual(True, os.path.isfile(audit_file_path_2_3))
+ # Clear state
+ shutil.rmtree(self.AUDIT_DIR)
- @patch('os.mkdir')
@patch('datetime.datetime')
- @patch('os.path.isfile')
- @patch('os.remove')
- @patch('os.listdir')
- def testAtftAuditFastbootFailure(
- self, mock_listdir, mock_remove, mock_isfile, mock_datetime,
- mock_makedir):
+ def testAtftAuditFastbootFailure(self, mock_datetime):
download_interval = 2
get_file_handler = MagicMock()
get_file_handler.side_effect = self.CreateAuditFiles
get_atfa_serial = MagicMock()
get_atfa_serial.side_effect = fastboot_exceptions.FastbootFailure('')
- mock_remove.side_effect = self.RemoveAuditFiles
mock_time = MagicMock()
mock_datetime.utcnow.return_value = mock_time
mock_time.strftime.side_effect = self.IncreaseMockTime
- mock_isfile.return_value = True
handle_exception_handler = MagicMock()
- self.mock_audit_files = []
self.mock_time = 0
- mock_listdir.return_value = self.mock_audit_files
test_audit = atft.AtftAudit(
self.AUDIT_DIR,
@@ -2840,6 +2743,9 @@ class AtftTest(unittest.TestCase):
get_file_handler.assert_not_called()
handle_exception_handler.assert_called_once()
+ # Clear state
+ shutil.rmtree(self.AUDIT_DIR)
+
def testAutoMapUSBLocationToSlot(self):
mock_atft = MockAtft()
mock_atft.device_usb_locations = []
@@ -3053,71 +2959,124 @@ class AtftTest(unittest.TestCase):
mock_atft._SendAlertEvent.assert_called_once()
mock_atft.SendUpdateMappingEvent.assert_not_called()
- @patch('os.mkdir')
- @patch('os.path.exists')
- @patch('os.path.isfile')
- @patch('os.listdir')
@patch('threading.Timer')
- @patch('__builtin__.open')
def testProcessKeyFileAutomatically(self,
- mock_create_timer,
- mock_open,
- mock_listdir,
- mock_isfile,
- mock_exists,
- mock_mkdir):
+ mock_create_timer):
+ # Create fake folders.
+ self.fs.create_dir(self.LOG_DIR)
+ self.fs.create_dir(self.KEY_DIR)
key_extension = '*.atfa'
- key1 = self.TEST_TEXT + '_1.atfa'
- key2 = self.TEST_TEXT + '_2.atfa'
+ key1 = self.TEST_ATFA_ID1 + '_1234.atfa'
+ key2 = self.TEST_ATFA_ID2 + '_1234.atfa'
+ key3 = self.TEST_ATFA_ID2 + '_2345.atfa'
process_key_handler = MagicMock()
handle_exception_handler = MagicMock()
get_atfa_serial = MagicMock()
- mock_isfile.return_value = True
- mock_exists.return_value = True
atft_key_handler = atft.AtftKeyHandler(self.KEY_DIR,
self.LOG_DIR,
key_extension,
process_key_handler,
handle_exception_handler,
get_atfa_serial)
- atft_key_handler.processed_keys = {}
+
+ self.fs.create_file(os.path.join(self.KEY_DIR, key1))
+ get_atfa_serial.return_value = self.TEST_ATFA_ID1
atft_key_handler.key_dir = self.KEY_DIR
- mock_listdir.return_value = [key1]
- get_atfa_serial.return_value = self.TEST_TEXT
- atft_key_handler.ProcessKeyFile()
+ atft_key_handler.StartProcessKey()
process_key_handler.assert_called_once_with(
os.path.join(self.KEY_DIR, key1), True)
mock_create_timer.assert_called_once()
- mock_open.assert_called_once()
- self.assertEqual(1, len(atft_key_handler.processed_keys[self.TEST_TEXT]))
- self.assertEqual(True,
- key1 in atft_key_handler.processed_keys[self.TEST_TEXT])
+ self.assertEqual(
+ True, self.TEST_ATFA_ID1 in atft_key_handler.processed_keys)
+ self.assertEqual(
+ 1, len(atft_key_handler.processed_keys[self.TEST_ATFA_ID1]))
+ self.assertEqual(
+ True, key1 in atft_key_handler.processed_keys[self.TEST_ATFA_ID1])
+
+ # If the key file is not for this ATFA, do not process it.
+ get_atfa_serial.return_value = self.TEST_ATFA_ID2
+ process_key_handler.reset_mock()
+ atft_key_handler.ProcessKeyFile()
+ process_key_handler.assert_not_called()
# If the error is DeviceNotFound, than don't record it to processed keys.
- mock_listdir.return_value = [key2]
+ get_atfa_serial.return_value = self.TEST_ATFA_ID2
+ self.fs.create_file(os.path.join(self.KEY_DIR, key2))
process_key_handler.reset_mock()
process_key_handler.side_effect = (
fastboot_exceptions.DeviceNotFoundException)
- mock_listdir.return_value = [key2]
atft_key_handler.ProcessKeyFile()
process_key_handler.assert_called_once()
- self.assertEqual(1, len(atft_key_handler.processed_keys[self.TEST_TEXT]))
- self.assertEqual(False,
- key2 in atft_key_handler.processed_keys[self.TEST_TEXT])
+ self.assertEqual(
+ False, self.TEST_ATFA_ID2 in atft_key_handler.processed_keys)
# If the error is FastbootFailure, do not add it to processed keys.
- mock_listdir.return_value = [key2]
process_key_handler.reset_mock()
process_key_handler.side_effect = (
fastboot_exceptions.FastbootFailure(''))
- mock_listdir.return_value = [key2]
atft_key_handler.ProcessKeyFile()
process_key_handler.assert_called_once()
- self.assertEqual(1, len(atft_key_handler.processed_keys[self.TEST_TEXT]))
- self.assertEqual(False,
- key2 in atft_key_handler.processed_keys[self.TEST_TEXT])
+ self.assertEqual(
+ False, self.TEST_ATFA_ID2 in atft_key_handler.processed_keys)
handle_exception_handler.assert_called_once()
+ # No error happens, add key2 to processed keys for ATFA_ID2
+ process_key_handler.side_effect = None
+ process_key_handler.reset_mock()
+ handle_exception_handler.reset_mock()
+ atft_key_handler.ProcessKeyFile()
+ process_key_handler.assert_called_once()
+ self.assertEqual(
+ True, self.TEST_ATFA_ID2 in atft_key_handler.processed_keys)
+ self.assertEqual(
+ 1, len(atft_key_handler.processed_keys[self.TEST_ATFA_ID2]))
+ self.assertEqual(
+ True, key2 in atft_key_handler.processed_keys[self.TEST_ATFA_ID2])
+ handle_exception_handler.assert_not_called()
+
+ # If another key is added to the folder, process it.
+ self.fs.create_file(os.path.join(self.KEY_DIR, key3))
+ process_key_handler.side_effect = None
+ process_key_handler.reset_mock()
+ handle_exception_handler.reset_mock()
+ atft_key_handler.ProcessKeyFile()
+ process_key_handler.assert_called_once()
+ self.assertEqual(
+ True, self.TEST_ATFA_ID2 in atft_key_handler.processed_keys)
+ self.assertEqual(
+ 2, len(atft_key_handler.processed_keys[self.TEST_ATFA_ID2]))
+ self.assertEqual(
+ True, key2 in atft_key_handler.processed_keys[self.TEST_ATFA_ID2])
+ self.assertEqual(
+ True, key3 in atft_key_handler.processed_keys[self.TEST_ATFA_ID2])
+ handle_exception_handler.assert_not_called()
+
+ # Assume the user opens the tool again, processed status should be recovered
+ # from the log.
+ handle_exception_handler.reset_mock()
+ process_key_handler.reset_mock()
+ atft_key_handler.StartProcessKey()
+ process_key_handler.assert_not_called()
+ handle_exception_handler.assert_not_called()
+ self.assertEqual(
+ True, self.TEST_ATFA_ID1 in atft_key_handler.processed_keys)
+ self.assertEqual(
+ 1, len(atft_key_handler.processed_keys[self.TEST_ATFA_ID1]))
+ self.assertEqual(
+ True, key1 in atft_key_handler.processed_keys[self.TEST_ATFA_ID1])
+ self.assertEqual(
+ True, self.TEST_ATFA_ID2 in atft_key_handler.processed_keys)
+ self.assertEqual(
+ 2, len(atft_key_handler.processed_keys[self.TEST_ATFA_ID2]))
+ self.assertEqual(
+ True, key2 in atft_key_handler.processed_keys[self.TEST_ATFA_ID2])
+ self.assertEqual(
+ True, key3 in atft_key_handler.processed_keys[self.TEST_ATFA_ID2])
+
+ # Clear fake fs state
+ shutil.rmtree(self.KEY_DIR)
+ shutil.rmtree(self.LOG_DIR)
+
if __name__ == '__main__':
unittest.main()