aboutsummaryrefslogtreecommitdiff
path: root/tests/test_file.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_file.py')
-rw-r--r--tests/test_file.py171
1 files changed, 100 insertions, 71 deletions
diff --git a/tests/test_file.py b/tests/test_file.py
index 924acb4..80324d6 100644
--- a/tests/test_file.py
+++ b/tests/test_file.py
@@ -12,10 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-"""Oauth2client.file tests
-
-Unit tests for oauth2client.file
-"""
+"""Unit tests for oauth2client.file."""
import copy
import datetime
@@ -24,28 +21,31 @@ import os
import pickle
import stat
import tempfile
+import unittest
+import warnings
+import mock
import six
from six.moves import http_client
-import unittest2
+from six.moves import urllib_parse
+from oauth2client import _helpers
from oauth2client import client
-from oauth2client import file
-from .http_mock import HttpMockSequence
+from oauth2client import file as file_module
+from oauth2client import transport
+from tests import http_mock
try:
# Python2
from future_builtins import oct
-except: # pragma: NO COVER
+except ImportError: # pragma: NO COVER
pass
-__author__ = 'jcgregorio@google.com (Joe Gregorio)'
-
_filehandle, FILENAME = tempfile.mkstemp('oauth2client_test.data')
os.close(_filehandle)
-class OAuth2ClientFileTests(unittest2.TestCase):
+class OAuth2ClientFileTests(unittest.TestCase):
def tearDown(self):
try:
@@ -54,6 +54,7 @@ class OAuth2ClientFileTests(unittest2.TestCase):
pass
def setUp(self):
+ warnings.simplefilter("ignore")
try:
os.unlink(FILENAME)
except OSError:
@@ -74,19 +75,31 @@ class OAuth2ClientFileTests(unittest2.TestCase):
user_agent)
return credentials
- def test_non_existent_file_storage(self):
- s = file.Storage(FILENAME)
- credentials = s.get()
- self.assertEquals(None, credentials)
+ @mock.patch('warnings.warn')
+ def test_non_existent_file_storage(self, warn_mock):
+ storage = file_module.Storage(FILENAME)
+ credentials = storage.get()
+ warn_mock.assert_called_with(
+ _helpers._MISSING_FILE_MESSAGE.format(FILENAME))
+ self.assertIsNone(credentials)
+
+ def test_directory_file_storage(self):
+ storage = file_module.Storage(FILENAME)
+ os.mkdir(FILENAME)
+ try:
+ with self.assertRaises(IOError):
+ storage.get()
+ finally:
+ os.rmdir(FILENAME)
- @unittest2.skipIf(not hasattr(os, 'symlink'), 'No symlink available')
+ @unittest.skipIf(not hasattr(os, 'symlink'), 'No symlink available')
def test_no_sym_link_credentials(self):
SYMFILENAME = FILENAME + '.sym'
os.symlink(FILENAME, SYMFILENAME)
- s = file.Storage(SYMFILENAME)
+ storage = file_module.Storage(SYMFILENAME)
try:
- with self.assertRaises(file.CredentialsFileSymbolicLinkError):
- s.get()
+ with self.assertRaises(IOError):
+ storage.get()
finally:
os.unlink(SYMFILENAME)
@@ -94,20 +107,20 @@ class OAuth2ClientFileTests(unittest2.TestCase):
# Write a file with a pickled OAuth2Credentials.
credentials = self._create_test_credentials()
- f = open(FILENAME, 'wb')
- pickle.dump(credentials, f)
- f.close()
+ credentials_file = open(FILENAME, 'wb')
+ pickle.dump(credentials, credentials_file)
+ credentials_file.close()
# Storage should be not be able to read that object, as the capability
# to read and write credentials as pickled objects has been removed.
- s = file.Storage(FILENAME)
- read_credentials = s.get()
- self.assertEquals(None, read_credentials)
+ storage = file_module.Storage(FILENAME)
+ read_credentials = storage.get()
+ self.assertIsNone(read_credentials)
# Now write it back out and confirm it has been rewritten as JSON
- s.put(credentials)
- with open(FILENAME) as f:
- data = json.load(f)
+ storage.put(credentials)
+ with open(FILENAME) as credentials_file:
+ data = json.load(credentials_file)
self.assertEquals(data['access_token'], 'foo')
self.assertEquals(data['_class'], 'OAuth2Credentials')
@@ -118,22 +131,38 @@ class OAuth2ClientFileTests(unittest2.TestCase):
datetime.timedelta(minutes=15))
credentials = self._create_test_credentials(expiration=expiration)
- s = file.Storage(FILENAME)
- s.put(credentials)
- credentials = s.get()
+ storage = file_module.Storage(FILENAME)
+ storage.put(credentials)
+ credentials = storage.get()
new_cred = copy.copy(credentials)
new_cred.access_token = 'bar'
- s.put(new_cred)
+ storage.put(new_cred)
access_token = '1/3w'
token_response = {'access_token': access_token, 'expires_in': 3600}
- http = HttpMockSequence([
- ({'status': '200'}, json.dumps(token_response).encode('utf-8')),
- ])
+ response_content = json.dumps(token_response).encode('utf-8')
+ http = http_mock.HttpMock(data=response_content)
- credentials._refresh(http.request)
+ credentials._refresh(http)
self.assertEquals(credentials.access_token, access_token)
+ # Verify mocks.
+ self.assertEqual(http.requests, 1)
+ self.assertEqual(http.uri, credentials.token_uri)
+ self.assertEqual(http.method, 'POST')
+ expected_body = {
+ 'grant_type': ['refresh_token'],
+ 'client_id': [credentials.client_id],
+ 'client_secret': [credentials.client_secret],
+ 'refresh_token': [credentials.refresh_token],
+ }
+ self.assertEqual(urllib_parse.parse_qs(http.body), expected_body)
+ expected_headers = {
+ 'content-type': 'application/x-www-form-urlencoded',
+ 'user-agent': credentials.user_agent,
+ }
+ self.assertEqual(http.headers, expected_headers)
+
def test_token_refresh_store_expires_soon(self):
# Tests the case where an access token that is valid when it is read
# from the store expires before the original request succeeds.
@@ -141,28 +170,28 @@ class OAuth2ClientFileTests(unittest2.TestCase):
datetime.timedelta(minutes=15))
credentials = self._create_test_credentials(expiration=expiration)
- s = file.Storage(FILENAME)
- s.put(credentials)
- credentials = s.get()
+ storage = file_module.Storage(FILENAME)
+ storage.put(credentials)
+ credentials = storage.get()
new_cred = copy.copy(credentials)
new_cred.access_token = 'bar'
- s.put(new_cred)
+ storage.put(new_cred)
access_token = '1/3w'
token_response = {'access_token': access_token, 'expires_in': 3600}
- http = HttpMockSequence([
- ({'status': str(int(http_client.UNAUTHORIZED))},
+ http = http_mock.HttpMockSequence([
+ ({'status': http_client.UNAUTHORIZED},
b'Initial token expired'),
- ({'status': str(int(http_client.UNAUTHORIZED))},
+ ({'status': http_client.UNAUTHORIZED},
b'Store token expired'),
- ({'status': str(int(http_client.OK))},
+ ({'status': http_client.OK},
json.dumps(token_response).encode('utf-8')),
- ({'status': str(int(http_client.OK))},
+ ({'status': http_client.OK},
b'Valid response to original request')
])
credentials.authorize(http)
- http.request('https://example.com')
+ transport.request(http, 'https://example.com')
self.assertEqual(credentials.access_token, access_token)
def test_token_refresh_good_store(self):
@@ -170,12 +199,12 @@ class OAuth2ClientFileTests(unittest2.TestCase):
datetime.timedelta(minutes=15))
credentials = self._create_test_credentials(expiration=expiration)
- s = file.Storage(FILENAME)
- s.put(credentials)
- credentials = s.get()
+ storage = file_module.Storage(FILENAME)
+ storage.put(credentials)
+ credentials = storage.get()
new_cred = copy.copy(credentials)
new_cred.access_token = 'bar'
- s.put(new_cred)
+ storage.put(new_cred)
credentials._refresh(None)
self.assertEquals(credentials.access_token, 'bar')
@@ -185,43 +214,43 @@ class OAuth2ClientFileTests(unittest2.TestCase):
datetime.timedelta(minutes=15))
credentials = self._create_test_credentials(expiration=expiration)
- s = file.Storage(FILENAME)
- s.put(credentials)
- credentials = s.get()
+ storage = file_module.Storage(FILENAME)
+ storage.put(credentials)
+ credentials = storage.get()
new_cred = copy.copy(credentials)
new_cred.access_token = 'bar'
- s.put(new_cred)
+ storage.put(new_cred)
valid_access_token = '1/3w'
token_response = {'access_token': valid_access_token,
'expires_in': 3600}
- http = HttpMockSequence([
- ({'status': str(int(http_client.UNAUTHORIZED))},
+ http = http_mock.HttpMockSequence([
+ ({'status': http_client.UNAUTHORIZED},
b'Initial token expired'),
- ({'status': str(int(http_client.UNAUTHORIZED))},
+ ({'status': http_client.UNAUTHORIZED},
b'Store token expired'),
- ({'status': str(int(http_client.OK))},
+ ({'status': http_client.OK},
json.dumps(token_response).encode('utf-8')),
- ({'status': str(int(http_client.OK))}, 'echo_request_body')
+ ({'status': http_client.OK}, 'echo_request_body')
])
body = six.StringIO('streaming body')
credentials.authorize(http)
- _, content = http.request('https://example.com', body=body)
+ _, content = transport.request(http, 'https://example.com', body=body)
self.assertEqual(content, 'streaming body')
self.assertEqual(credentials.access_token, valid_access_token)
def test_credentials_delete(self):
credentials = self._create_test_credentials()
- s = file.Storage(FILENAME)
- s.put(credentials)
- credentials = s.get()
- self.assertNotEquals(None, credentials)
- s.delete()
- credentials = s.get()
- self.assertEquals(None, credentials)
+ storage = file_module.Storage(FILENAME)
+ storage.put(credentials)
+ credentials = storage.get()
+ self.assertIsNotNone(credentials)
+ storage.delete()
+ credentials = storage.get()
+ self.assertIsNone(credentials)
def test_access_token_credentials(self):
access_token = 'foo'
@@ -229,11 +258,11 @@ class OAuth2ClientFileTests(unittest2.TestCase):
credentials = client.AccessTokenCredentials(access_token, user_agent)
- s = file.Storage(FILENAME)
- credentials = s.put(credentials)
- credentials = s.get()
+ storage = file_module.Storage(FILENAME)
+ credentials = storage.put(credentials)
+ credentials = storage.get()
- self.assertNotEquals(None, credentials)
+ self.assertIsNotNone(credentials)
self.assertEquals('foo', credentials.access_token)
self.assertTrue(os.path.exists(FILENAME))