diff options
Diffstat (limited to 'tests/contrib/test_multistore_file.py')
-rw-r--r-- | tests/contrib/test_multistore_file.py | 383 |
1 files changed, 383 insertions, 0 deletions
diff --git a/tests/contrib/test_multistore_file.py b/tests/contrib/test_multistore_file.py new file mode 100644 index 0000000..b5cb598 --- /dev/null +++ b/tests/contrib/test_multistore_file.py @@ -0,0 +1,383 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for oauth2client.multistore_file.""" + +import datetime +import errno +import os +import stat +import tempfile + +import mock +import unittest2 + +from oauth2client import client +from oauth2client import util +from oauth2client.contrib import locked_file +from oauth2client.contrib import multistore_file + +_filehandle, FILENAME = tempfile.mkstemp('oauth2client_test.data') +os.close(_filehandle) + + +class _MockLockedFile(object): + + def __init__(self, filename_str, error_class, error_code): + self.filename_str = filename_str + self.error_class = error_class + self.error_code = error_code + self.open_and_lock_called = False + + def open_and_lock(self): + self.open_and_lock_called = True + raise self.error_class(self.error_code, '') + + def is_locked(self): + return False + + def filename(self): + return self.filename_str + + +class Test__dict_to_tuple_key(unittest2.TestCase): + + def test_key_conversions(self): + key1, val1 = 'somekey', 'some value' + key2, val2 = 'another', 'something else' + key3, val3 = 'onemore', 'foo' + test_dict = { + key1: val1, + key2: val2, + key3: val3, + } + tuple_key = multistore_file._dict_to_tuple_key(test_dict) + + # the resulting key should be naturally sorted + expected_output = ( + (key2, val2), + (key3, val3), + (key1, val1), + ) + self.assertTupleEqual(expected_output, tuple_key) + # check we get the original dictionary back + self.assertDictEqual(test_dict, dict(tuple_key)) + + +class MultistoreFileTests(unittest2.TestCase): + + def tearDown(self): + try: + os.unlink(FILENAME) + except OSError: + pass + + def setUp(self): + try: + os.unlink(FILENAME) + except OSError: + pass + + def _create_test_credentials(self, client_id='some_client_id', + expiration=None): + access_token = 'foo' + client_secret = 'cOuDdkfjxxnv+' + refresh_token = '1/0/a.df219fjls0' + token_expiry = expiration or datetime.datetime.utcnow() + token_uri = 'https://www.google.com/accounts/o8/oauth2/token' + user_agent = 'refresh_checker/1.0' + + credentials = client.OAuth2Credentials( + access_token, client_id, client_secret, + refresh_token, token_expiry, token_uri, + user_agent) + return credentials + + def test_lock_file_raises_ioerror(self): + filehandle, filename = tempfile.mkstemp() + os.close(filehandle) + + try: + for error_code in (errno.EDEADLK, errno.ENOSYS, errno.ENOLCK, + errno.EACCES): + for error_class in (IOError, OSError): + multistore = multistore_file._MultiStore(filename) + multistore._file = _MockLockedFile( + filename, error_class, error_code) + # Should not raise though the underlying file class did. + multistore._lock() + self.assertTrue(multistore._file.open_and_lock_called) + finally: + os.unlink(filename) + + def test_lock_file_raise_unexpected_error(self): + filehandle, filename = tempfile.mkstemp() + os.close(filehandle) + + try: + multistore = multistore_file._MultiStore(filename) + multistore._file = _MockLockedFile(filename, IOError, errno.EBUSY) + with self.assertRaises(IOError): + multistore._lock() + self.assertTrue(multistore._file.open_and_lock_called) + finally: + os.unlink(filename) + + def test_read_only_file_fail_lock(self): + credentials = self._create_test_credentials() + + open(FILENAME, 'a+b').close() + os.chmod(FILENAME, 0o400) + + store = multistore_file.get_credential_storage( + FILENAME, + credentials.client_id, + credentials.user_agent, + ['some-scope', 'some-other-scope']) + + store.put(credentials) + if os.name == 'posix': # pragma: NO COVER + self.assertTrue(store._multistore._read_only) + os.chmod(FILENAME, 0o600) + + def test_read_only_file_fail_lock_no_warning(self): + open(FILENAME, 'a+b').close() + os.chmod(FILENAME, 0o400) + + multistore = multistore_file._MultiStore(FILENAME) + + with mock.patch.object(multistore_file.logger, 'warn') as mock_warn: + multistore._warn_on_readonly = False + multistore._lock() + self.assertFalse(mock_warn.called) + + def test_lock_skip_refresh(self): + with open(FILENAME, 'w') as f: + f.write('123') + os.chmod(FILENAME, 0o400) + + multistore = multistore_file._MultiStore(FILENAME) + + refresh_patch = mock.patch.object( + multistore, '_refresh_data_cache') + + with refresh_patch as refresh_mock: + multistore._data = {} + multistore._lock() + self.assertFalse(refresh_mock.called) + + @unittest2.skipIf(not hasattr(os, 'symlink'), 'No symlink available') + def test_multistore_no_symbolic_link_files(self): + SYMFILENAME = FILENAME + 'sym' + os.symlink(FILENAME, SYMFILENAME) + store = multistore_file.get_credential_storage( + SYMFILENAME, + 'some_client_id', + 'user-agent/1.0', + ['some-scope', 'some-other-scope']) + try: + with self.assertRaises( + locked_file.CredentialsFileSymbolicLinkError): + store.get() + finally: + os.unlink(SYMFILENAME) + + def test_multistore_non_existent_file(self): + store = multistore_file.get_credential_storage( + FILENAME, + 'some_client_id', + 'user-agent/1.0', + ['some-scope', 'some-other-scope']) + + credentials = store.get() + self.assertEquals(None, credentials) + + def test_multistore_file(self): + credentials = self._create_test_credentials() + + store = multistore_file.get_credential_storage( + FILENAME, + credentials.client_id, + credentials.user_agent, + ['some-scope', 'some-other-scope']) + + # Save credentials + store.put(credentials) + credentials = store.get() + + self.assertNotEquals(None, credentials) + self.assertEquals('foo', credentials.access_token) + + # Delete credentials + store.delete() + credentials = store.get() + + self.assertEquals(None, credentials) + + if os.name == 'posix': # pragma: NO COVER + self.assertEquals( + 0o600, stat.S_IMODE(os.stat(FILENAME).st_mode)) + + def test_multistore_file_custom_key(self): + credentials = self._create_test_credentials() + + custom_key = {'myapp': 'testing', 'clientid': 'some client'} + store = multistore_file.get_credential_storage_custom_key( + FILENAME, custom_key) + + store.put(credentials) + stored_credentials = store.get() + + self.assertNotEquals(None, stored_credentials) + self.assertEqual(credentials.access_token, + stored_credentials.access_token) + + store.delete() + stored_credentials = store.get() + + self.assertEquals(None, stored_credentials) + + def test_multistore_file_custom_string_key(self): + credentials = self._create_test_credentials() + + # store with string key + store = multistore_file.get_credential_storage_custom_string_key( + FILENAME, 'mykey') + + store.put(credentials) + stored_credentials = store.get() + + self.assertNotEquals(None, stored_credentials) + self.assertEqual(credentials.access_token, + stored_credentials.access_token) + + # try retrieving with a dictionary + multistore_file.get_credential_storage_custom_string_key( + FILENAME, {'key': 'mykey'}) + stored_credentials = store.get() + self.assertNotEquals(None, stored_credentials) + self.assertEqual(credentials.access_token, + stored_credentials.access_token) + + store.delete() + stored_credentials = store.get() + + self.assertEquals(None, stored_credentials) + + def test_multistore_file_backwards_compatibility(self): + credentials = self._create_test_credentials() + scopes = ['scope1', 'scope2'] + + # store the credentials using the legacy key method + store = multistore_file.get_credential_storage( + FILENAME, 'client_id', 'user_agent', scopes) + store.put(credentials) + + # retrieve the credentials using a custom key that matches the + # legacy key + key = {'clientId': 'client_id', 'userAgent': 'user_agent', + 'scope': util.scopes_to_string(scopes)} + store = multistore_file.get_credential_storage_custom_key( + FILENAME, key) + stored_credentials = store.get() + + self.assertEqual(credentials.access_token, + stored_credentials.access_token) + + def test_multistore_file_get_all_keys(self): + # start with no keys + keys = multistore_file.get_all_credential_keys(FILENAME) + self.assertEquals([], keys) + + # store credentials + credentials = self._create_test_credentials(client_id='client1') + custom_key = {'myapp': 'testing', 'clientid': 'client1'} + store1 = multistore_file.get_credential_storage_custom_key( + FILENAME, custom_key) + store1.put(credentials) + + keys = multistore_file.get_all_credential_keys(FILENAME) + self.assertEquals([custom_key], keys) + + # store more credentials + credentials = self._create_test_credentials(client_id='client2') + string_key = 'string_key' + store2 = multistore_file.get_credential_storage_custom_string_key( + FILENAME, string_key) + store2.put(credentials) + + keys = multistore_file.get_all_credential_keys(FILENAME) + self.assertEquals(2, len(keys)) + self.assertTrue(custom_key in keys) + self.assertTrue({'key': string_key} in keys) + + # back to no keys + store1.delete() + store2.delete() + keys = multistore_file.get_all_credential_keys(FILENAME) + self.assertEquals([], keys) + + def _refresh_data_cache_helper(self): + multistore = multistore_file._MultiStore(FILENAME) + json_patch = mock.patch.object(multistore, '_locked_json_read') + + return multistore, json_patch + + def test__refresh_data_cache_bad_json(self): + multistore, json_patch = self._refresh_data_cache_helper() + + with json_patch as json_mock: + json_mock.side_effect = ValueError('') + multistore._refresh_data_cache() + self.assertTrue(json_mock.called) + self.assertEqual(multistore._data, {}) + + def test__refresh_data_cache_bad_version(self): + multistore, json_patch = self._refresh_data_cache_helper() + + with json_patch as json_mock: + json_mock.return_value = {} + multistore._refresh_data_cache() + self.assertTrue(json_mock.called) + self.assertEqual(multistore._data, {}) + + def test__refresh_data_cache_newer_version(self): + multistore, json_patch = self._refresh_data_cache_helper() + + with json_patch as json_mock: + json_mock.return_value = {'file_version': 5} + with self.assertRaises(multistore_file.NewerCredentialStoreError): + multistore._refresh_data_cache() + self.assertTrue(json_mock.called) + + def test__refresh_data_cache_bad_credentials(self): + multistore, json_patch = self._refresh_data_cache_helper() + + with json_patch as json_mock: + json_mock.return_value = { + 'file_version': 1, + 'data': [ + {'lol': 'this is a bad credential object.'} + ]} + multistore._refresh_data_cache() + self.assertTrue(json_mock.called) + self.assertEqual(multistore._data, {}) + + def test__delete_credential_nonexistent(self): + multistore = multistore_file._MultiStore(FILENAME) + + with mock.patch.object(multistore, '_write') as write_mock: + multistore._data = {} + multistore._delete_credential('nonexistent_key') + self.assertTrue(write_mock.called) |