diff options
Diffstat (limited to 'tests/test_file.py')
-rw-r--r-- | tests/test_file.py | 171 |
1 files changed, 100 insertions, 71 deletions
diff --git a/tests/test_file.py b/tests/test_file.py index 924acb4..80324d6 100644 --- a/tests/test_file.py +++ b/tests/test_file.py @@ -12,10 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Oauth2client.file tests - -Unit tests for oauth2client.file -""" +"""Unit tests for oauth2client.file.""" import copy import datetime @@ -24,28 +21,31 @@ import os import pickle import stat import tempfile +import unittest +import warnings +import mock import six from six.moves import http_client -import unittest2 +from six.moves import urllib_parse +from oauth2client import _helpers from oauth2client import client -from oauth2client import file -from .http_mock import HttpMockSequence +from oauth2client import file as file_module +from oauth2client import transport +from tests import http_mock try: # Python2 from future_builtins import oct -except: # pragma: NO COVER +except ImportError: # pragma: NO COVER pass -__author__ = 'jcgregorio@google.com (Joe Gregorio)' - _filehandle, FILENAME = tempfile.mkstemp('oauth2client_test.data') os.close(_filehandle) -class OAuth2ClientFileTests(unittest2.TestCase): +class OAuth2ClientFileTests(unittest.TestCase): def tearDown(self): try: @@ -54,6 +54,7 @@ class OAuth2ClientFileTests(unittest2.TestCase): pass def setUp(self): + warnings.simplefilter("ignore") try: os.unlink(FILENAME) except OSError: @@ -74,19 +75,31 @@ class OAuth2ClientFileTests(unittest2.TestCase): user_agent) return credentials - def test_non_existent_file_storage(self): - s = file.Storage(FILENAME) - credentials = s.get() - self.assertEquals(None, credentials) + @mock.patch('warnings.warn') + def test_non_existent_file_storage(self, warn_mock): + storage = file_module.Storage(FILENAME) + credentials = storage.get() + warn_mock.assert_called_with( + _helpers._MISSING_FILE_MESSAGE.format(FILENAME)) + self.assertIsNone(credentials) + + def test_directory_file_storage(self): + storage = file_module.Storage(FILENAME) + os.mkdir(FILENAME) + try: + with self.assertRaises(IOError): + storage.get() + finally: + os.rmdir(FILENAME) - @unittest2.skipIf(not hasattr(os, 'symlink'), 'No symlink available') + @unittest.skipIf(not hasattr(os, 'symlink'), 'No symlink available') def test_no_sym_link_credentials(self): SYMFILENAME = FILENAME + '.sym' os.symlink(FILENAME, SYMFILENAME) - s = file.Storage(SYMFILENAME) + storage = file_module.Storage(SYMFILENAME) try: - with self.assertRaises(file.CredentialsFileSymbolicLinkError): - s.get() + with self.assertRaises(IOError): + storage.get() finally: os.unlink(SYMFILENAME) @@ -94,20 +107,20 @@ class OAuth2ClientFileTests(unittest2.TestCase): # Write a file with a pickled OAuth2Credentials. credentials = self._create_test_credentials() - f = open(FILENAME, 'wb') - pickle.dump(credentials, f) - f.close() + credentials_file = open(FILENAME, 'wb') + pickle.dump(credentials, credentials_file) + credentials_file.close() # Storage should be not be able to read that object, as the capability # to read and write credentials as pickled objects has been removed. - s = file.Storage(FILENAME) - read_credentials = s.get() - self.assertEquals(None, read_credentials) + storage = file_module.Storage(FILENAME) + read_credentials = storage.get() + self.assertIsNone(read_credentials) # Now write it back out and confirm it has been rewritten as JSON - s.put(credentials) - with open(FILENAME) as f: - data = json.load(f) + storage.put(credentials) + with open(FILENAME) as credentials_file: + data = json.load(credentials_file) self.assertEquals(data['access_token'], 'foo') self.assertEquals(data['_class'], 'OAuth2Credentials') @@ -118,22 +131,38 @@ class OAuth2ClientFileTests(unittest2.TestCase): datetime.timedelta(minutes=15)) credentials = self._create_test_credentials(expiration=expiration) - s = file.Storage(FILENAME) - s.put(credentials) - credentials = s.get() + storage = file_module.Storage(FILENAME) + storage.put(credentials) + credentials = storage.get() new_cred = copy.copy(credentials) new_cred.access_token = 'bar' - s.put(new_cred) + storage.put(new_cred) access_token = '1/3w' token_response = {'access_token': access_token, 'expires_in': 3600} - http = HttpMockSequence([ - ({'status': '200'}, json.dumps(token_response).encode('utf-8')), - ]) + response_content = json.dumps(token_response).encode('utf-8') + http = http_mock.HttpMock(data=response_content) - credentials._refresh(http.request) + credentials._refresh(http) self.assertEquals(credentials.access_token, access_token) + # Verify mocks. + self.assertEqual(http.requests, 1) + self.assertEqual(http.uri, credentials.token_uri) + self.assertEqual(http.method, 'POST') + expected_body = { + 'grant_type': ['refresh_token'], + 'client_id': [credentials.client_id], + 'client_secret': [credentials.client_secret], + 'refresh_token': [credentials.refresh_token], + } + self.assertEqual(urllib_parse.parse_qs(http.body), expected_body) + expected_headers = { + 'content-type': 'application/x-www-form-urlencoded', + 'user-agent': credentials.user_agent, + } + self.assertEqual(http.headers, expected_headers) + def test_token_refresh_store_expires_soon(self): # Tests the case where an access token that is valid when it is read # from the store expires before the original request succeeds. @@ -141,28 +170,28 @@ class OAuth2ClientFileTests(unittest2.TestCase): datetime.timedelta(minutes=15)) credentials = self._create_test_credentials(expiration=expiration) - s = file.Storage(FILENAME) - s.put(credentials) - credentials = s.get() + storage = file_module.Storage(FILENAME) + storage.put(credentials) + credentials = storage.get() new_cred = copy.copy(credentials) new_cred.access_token = 'bar' - s.put(new_cred) + storage.put(new_cred) access_token = '1/3w' token_response = {'access_token': access_token, 'expires_in': 3600} - http = HttpMockSequence([ - ({'status': str(int(http_client.UNAUTHORIZED))}, + http = http_mock.HttpMockSequence([ + ({'status': http_client.UNAUTHORIZED}, b'Initial token expired'), - ({'status': str(int(http_client.UNAUTHORIZED))}, + ({'status': http_client.UNAUTHORIZED}, b'Store token expired'), - ({'status': str(int(http_client.OK))}, + ({'status': http_client.OK}, json.dumps(token_response).encode('utf-8')), - ({'status': str(int(http_client.OK))}, + ({'status': http_client.OK}, b'Valid response to original request') ]) credentials.authorize(http) - http.request('https://example.com') + transport.request(http, 'https://example.com') self.assertEqual(credentials.access_token, access_token) def test_token_refresh_good_store(self): @@ -170,12 +199,12 @@ class OAuth2ClientFileTests(unittest2.TestCase): datetime.timedelta(minutes=15)) credentials = self._create_test_credentials(expiration=expiration) - s = file.Storage(FILENAME) - s.put(credentials) - credentials = s.get() + storage = file_module.Storage(FILENAME) + storage.put(credentials) + credentials = storage.get() new_cred = copy.copy(credentials) new_cred.access_token = 'bar' - s.put(new_cred) + storage.put(new_cred) credentials._refresh(None) self.assertEquals(credentials.access_token, 'bar') @@ -185,43 +214,43 @@ class OAuth2ClientFileTests(unittest2.TestCase): datetime.timedelta(minutes=15)) credentials = self._create_test_credentials(expiration=expiration) - s = file.Storage(FILENAME) - s.put(credentials) - credentials = s.get() + storage = file_module.Storage(FILENAME) + storage.put(credentials) + credentials = storage.get() new_cred = copy.copy(credentials) new_cred.access_token = 'bar' - s.put(new_cred) + storage.put(new_cred) valid_access_token = '1/3w' token_response = {'access_token': valid_access_token, 'expires_in': 3600} - http = HttpMockSequence([ - ({'status': str(int(http_client.UNAUTHORIZED))}, + http = http_mock.HttpMockSequence([ + ({'status': http_client.UNAUTHORIZED}, b'Initial token expired'), - ({'status': str(int(http_client.UNAUTHORIZED))}, + ({'status': http_client.UNAUTHORIZED}, b'Store token expired'), - ({'status': str(int(http_client.OK))}, + ({'status': http_client.OK}, json.dumps(token_response).encode('utf-8')), - ({'status': str(int(http_client.OK))}, 'echo_request_body') + ({'status': http_client.OK}, 'echo_request_body') ]) body = six.StringIO('streaming body') credentials.authorize(http) - _, content = http.request('https://example.com', body=body) + _, content = transport.request(http, 'https://example.com', body=body) self.assertEqual(content, 'streaming body') self.assertEqual(credentials.access_token, valid_access_token) def test_credentials_delete(self): credentials = self._create_test_credentials() - s = file.Storage(FILENAME) - s.put(credentials) - credentials = s.get() - self.assertNotEquals(None, credentials) - s.delete() - credentials = s.get() - self.assertEquals(None, credentials) + storage = file_module.Storage(FILENAME) + storage.put(credentials) + credentials = storage.get() + self.assertIsNotNone(credentials) + storage.delete() + credentials = storage.get() + self.assertIsNone(credentials) def test_access_token_credentials(self): access_token = 'foo' @@ -229,11 +258,11 @@ class OAuth2ClientFileTests(unittest2.TestCase): credentials = client.AccessTokenCredentials(access_token, user_agent) - s = file.Storage(FILENAME) - credentials = s.put(credentials) - credentials = s.get() + storage = file_module.Storage(FILENAME) + credentials = storage.put(credentials) + credentials = storage.get() - self.assertNotEquals(None, credentials) + self.assertIsNotNone(credentials) self.assertEquals('foo', credentials.access_token) self.assertTrue(os.path.exists(FILENAME)) |