diff options
Diffstat (limited to 'tests/test_file.py')
-rw-r--r-- | tests/test_file.py | 171 |
1 files changed, 71 insertions, 100 deletions
diff --git a/tests/test_file.py b/tests/test_file.py index 80324d6..924acb4 100644 --- a/tests/test_file.py +++ b/tests/test_file.py @@ -12,7 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Unit tests for oauth2client.file.""" +"""Oauth2client.file tests + +Unit tests for oauth2client.file +""" import copy import datetime @@ -21,31 +24,28 @@ import os import pickle import stat import tempfile -import unittest -import warnings -import mock import six from six.moves import http_client -from six.moves import urllib_parse +import unittest2 -from oauth2client import _helpers from oauth2client import client -from oauth2client import file as file_module -from oauth2client import transport -from tests import http_mock +from oauth2client import file +from .http_mock import HttpMockSequence try: # Python2 from future_builtins import oct -except ImportError: # pragma: NO COVER +except: # pragma: NO COVER pass +__author__ = 'jcgregorio@google.com (Joe Gregorio)' + _filehandle, FILENAME = tempfile.mkstemp('oauth2client_test.data') os.close(_filehandle) -class OAuth2ClientFileTests(unittest.TestCase): +class OAuth2ClientFileTests(unittest2.TestCase): def tearDown(self): try: @@ -54,7 +54,6 @@ class OAuth2ClientFileTests(unittest.TestCase): pass def setUp(self): - warnings.simplefilter("ignore") try: os.unlink(FILENAME) except OSError: @@ -75,31 +74,19 @@ class OAuth2ClientFileTests(unittest.TestCase): user_agent) return credentials - @mock.patch('warnings.warn') - def test_non_existent_file_storage(self, warn_mock): - storage = file_module.Storage(FILENAME) - credentials = storage.get() - warn_mock.assert_called_with( - _helpers._MISSING_FILE_MESSAGE.format(FILENAME)) - self.assertIsNone(credentials) - - def test_directory_file_storage(self): - storage = file_module.Storage(FILENAME) - os.mkdir(FILENAME) - try: - with self.assertRaises(IOError): - storage.get() - finally: - os.rmdir(FILENAME) + def test_non_existent_file_storage(self): + s = file.Storage(FILENAME) + credentials = s.get() + self.assertEquals(None, credentials) - @unittest.skipIf(not hasattr(os, 'symlink'), 'No symlink available') + @unittest2.skipIf(not hasattr(os, 'symlink'), 'No symlink available') def test_no_sym_link_credentials(self): SYMFILENAME = FILENAME + '.sym' os.symlink(FILENAME, SYMFILENAME) - storage = file_module.Storage(SYMFILENAME) + s = file.Storage(SYMFILENAME) try: - with self.assertRaises(IOError): - storage.get() + with self.assertRaises(file.CredentialsFileSymbolicLinkError): + s.get() finally: os.unlink(SYMFILENAME) @@ -107,20 +94,20 @@ class OAuth2ClientFileTests(unittest.TestCase): # Write a file with a pickled OAuth2Credentials. credentials = self._create_test_credentials() - credentials_file = open(FILENAME, 'wb') - pickle.dump(credentials, credentials_file) - credentials_file.close() + f = open(FILENAME, 'wb') + pickle.dump(credentials, f) + f.close() # Storage should be not be able to read that object, as the capability # to read and write credentials as pickled objects has been removed. - storage = file_module.Storage(FILENAME) - read_credentials = storage.get() - self.assertIsNone(read_credentials) + s = file.Storage(FILENAME) + read_credentials = s.get() + self.assertEquals(None, read_credentials) # Now write it back out and confirm it has been rewritten as JSON - storage.put(credentials) - with open(FILENAME) as credentials_file: - data = json.load(credentials_file) + s.put(credentials) + with open(FILENAME) as f: + data = json.load(f) self.assertEquals(data['access_token'], 'foo') self.assertEquals(data['_class'], 'OAuth2Credentials') @@ -131,38 +118,22 @@ class OAuth2ClientFileTests(unittest.TestCase): datetime.timedelta(minutes=15)) credentials = self._create_test_credentials(expiration=expiration) - storage = file_module.Storage(FILENAME) - storage.put(credentials) - credentials = storage.get() + s = file.Storage(FILENAME) + s.put(credentials) + credentials = s.get() new_cred = copy.copy(credentials) new_cred.access_token = 'bar' - storage.put(new_cred) + s.put(new_cred) access_token = '1/3w' token_response = {'access_token': access_token, 'expires_in': 3600} - response_content = json.dumps(token_response).encode('utf-8') - http = http_mock.HttpMock(data=response_content) + http = HttpMockSequence([ + ({'status': '200'}, json.dumps(token_response).encode('utf-8')), + ]) - credentials._refresh(http) + credentials._refresh(http.request) self.assertEquals(credentials.access_token, access_token) - # Verify mocks. - self.assertEqual(http.requests, 1) - self.assertEqual(http.uri, credentials.token_uri) - self.assertEqual(http.method, 'POST') - expected_body = { - 'grant_type': ['refresh_token'], - 'client_id': [credentials.client_id], - 'client_secret': [credentials.client_secret], - 'refresh_token': [credentials.refresh_token], - } - self.assertEqual(urllib_parse.parse_qs(http.body), expected_body) - expected_headers = { - 'content-type': 'application/x-www-form-urlencoded', - 'user-agent': credentials.user_agent, - } - self.assertEqual(http.headers, expected_headers) - def test_token_refresh_store_expires_soon(self): # Tests the case where an access token that is valid when it is read # from the store expires before the original request succeeds. @@ -170,28 +141,28 @@ class OAuth2ClientFileTests(unittest.TestCase): datetime.timedelta(minutes=15)) credentials = self._create_test_credentials(expiration=expiration) - storage = file_module.Storage(FILENAME) - storage.put(credentials) - credentials = storage.get() + s = file.Storage(FILENAME) + s.put(credentials) + credentials = s.get() new_cred = copy.copy(credentials) new_cred.access_token = 'bar' - storage.put(new_cred) + s.put(new_cred) access_token = '1/3w' token_response = {'access_token': access_token, 'expires_in': 3600} - http = http_mock.HttpMockSequence([ - ({'status': http_client.UNAUTHORIZED}, + http = HttpMockSequence([ + ({'status': str(int(http_client.UNAUTHORIZED))}, b'Initial token expired'), - ({'status': http_client.UNAUTHORIZED}, + ({'status': str(int(http_client.UNAUTHORIZED))}, b'Store token expired'), - ({'status': http_client.OK}, + ({'status': str(int(http_client.OK))}, json.dumps(token_response).encode('utf-8')), - ({'status': http_client.OK}, + ({'status': str(int(http_client.OK))}, b'Valid response to original request') ]) credentials.authorize(http) - transport.request(http, 'https://example.com') + http.request('https://example.com') self.assertEqual(credentials.access_token, access_token) def test_token_refresh_good_store(self): @@ -199,12 +170,12 @@ class OAuth2ClientFileTests(unittest.TestCase): datetime.timedelta(minutes=15)) credentials = self._create_test_credentials(expiration=expiration) - storage = file_module.Storage(FILENAME) - storage.put(credentials) - credentials = storage.get() + s = file.Storage(FILENAME) + s.put(credentials) + credentials = s.get() new_cred = copy.copy(credentials) new_cred.access_token = 'bar' - storage.put(new_cred) + s.put(new_cred) credentials._refresh(None) self.assertEquals(credentials.access_token, 'bar') @@ -214,43 +185,43 @@ class OAuth2ClientFileTests(unittest.TestCase): datetime.timedelta(minutes=15)) credentials = self._create_test_credentials(expiration=expiration) - storage = file_module.Storage(FILENAME) - storage.put(credentials) - credentials = storage.get() + s = file.Storage(FILENAME) + s.put(credentials) + credentials = s.get() new_cred = copy.copy(credentials) new_cred.access_token = 'bar' - storage.put(new_cred) + s.put(new_cred) valid_access_token = '1/3w' token_response = {'access_token': valid_access_token, 'expires_in': 3600} - http = http_mock.HttpMockSequence([ - ({'status': http_client.UNAUTHORIZED}, + http = HttpMockSequence([ + ({'status': str(int(http_client.UNAUTHORIZED))}, b'Initial token expired'), - ({'status': http_client.UNAUTHORIZED}, + ({'status': str(int(http_client.UNAUTHORIZED))}, b'Store token expired'), - ({'status': http_client.OK}, + ({'status': str(int(http_client.OK))}, json.dumps(token_response).encode('utf-8')), - ({'status': http_client.OK}, 'echo_request_body') + ({'status': str(int(http_client.OK))}, 'echo_request_body') ]) body = six.StringIO('streaming body') credentials.authorize(http) - _, content = transport.request(http, 'https://example.com', body=body) + _, content = http.request('https://example.com', body=body) self.assertEqual(content, 'streaming body') self.assertEqual(credentials.access_token, valid_access_token) def test_credentials_delete(self): credentials = self._create_test_credentials() - storage = file_module.Storage(FILENAME) - storage.put(credentials) - credentials = storage.get() - self.assertIsNotNone(credentials) - storage.delete() - credentials = storage.get() - self.assertIsNone(credentials) + s = file.Storage(FILENAME) + s.put(credentials) + credentials = s.get() + self.assertNotEquals(None, credentials) + s.delete() + credentials = s.get() + self.assertEquals(None, credentials) def test_access_token_credentials(self): access_token = 'foo' @@ -258,11 +229,11 @@ class OAuth2ClientFileTests(unittest.TestCase): credentials = client.AccessTokenCredentials(access_token, user_agent) - storage = file_module.Storage(FILENAME) - credentials = storage.put(credentials) - credentials = storage.get() + s = file.Storage(FILENAME) + credentials = s.put(credentials) + credentials = s.get() - self.assertIsNotNone(credentials) + self.assertNotEquals(None, credentials) self.assertEquals('foo', credentials.access_token) self.assertTrue(os.path.exists(FILENAME)) |