aboutsummaryrefslogtreecommitdiff
path: root/tests/test_file.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_file.py')
-rw-r--r--tests/test_file.py171
1 files changed, 71 insertions, 100 deletions
diff --git a/tests/test_file.py b/tests/test_file.py
index 80324d6..924acb4 100644
--- a/tests/test_file.py
+++ b/tests/test_file.py
@@ -12,7 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-"""Unit tests for oauth2client.file."""
+"""Oauth2client.file tests
+
+Unit tests for oauth2client.file
+"""
import copy
import datetime
@@ -21,31 +24,28 @@ import os
import pickle
import stat
import tempfile
-import unittest
-import warnings
-import mock
import six
from six.moves import http_client
-from six.moves import urllib_parse
+import unittest2
-from oauth2client import _helpers
from oauth2client import client
-from oauth2client import file as file_module
-from oauth2client import transport
-from tests import http_mock
+from oauth2client import file
+from .http_mock import HttpMockSequence
try:
# Python2
from future_builtins import oct
-except ImportError: # pragma: NO COVER
+except: # pragma: NO COVER
pass
+__author__ = 'jcgregorio@google.com (Joe Gregorio)'
+
_filehandle, FILENAME = tempfile.mkstemp('oauth2client_test.data')
os.close(_filehandle)
-class OAuth2ClientFileTests(unittest.TestCase):
+class OAuth2ClientFileTests(unittest2.TestCase):
def tearDown(self):
try:
@@ -54,7 +54,6 @@ class OAuth2ClientFileTests(unittest.TestCase):
pass
def setUp(self):
- warnings.simplefilter("ignore")
try:
os.unlink(FILENAME)
except OSError:
@@ -75,31 +74,19 @@ class OAuth2ClientFileTests(unittest.TestCase):
user_agent)
return credentials
- @mock.patch('warnings.warn')
- def test_non_existent_file_storage(self, warn_mock):
- storage = file_module.Storage(FILENAME)
- credentials = storage.get()
- warn_mock.assert_called_with(
- _helpers._MISSING_FILE_MESSAGE.format(FILENAME))
- self.assertIsNone(credentials)
-
- def test_directory_file_storage(self):
- storage = file_module.Storage(FILENAME)
- os.mkdir(FILENAME)
- try:
- with self.assertRaises(IOError):
- storage.get()
- finally:
- os.rmdir(FILENAME)
+ def test_non_existent_file_storage(self):
+ s = file.Storage(FILENAME)
+ credentials = s.get()
+ self.assertEquals(None, credentials)
- @unittest.skipIf(not hasattr(os, 'symlink'), 'No symlink available')
+ @unittest2.skipIf(not hasattr(os, 'symlink'), 'No symlink available')
def test_no_sym_link_credentials(self):
SYMFILENAME = FILENAME + '.sym'
os.symlink(FILENAME, SYMFILENAME)
- storage = file_module.Storage(SYMFILENAME)
+ s = file.Storage(SYMFILENAME)
try:
- with self.assertRaises(IOError):
- storage.get()
+ with self.assertRaises(file.CredentialsFileSymbolicLinkError):
+ s.get()
finally:
os.unlink(SYMFILENAME)
@@ -107,20 +94,20 @@ class OAuth2ClientFileTests(unittest.TestCase):
# Write a file with a pickled OAuth2Credentials.
credentials = self._create_test_credentials()
- credentials_file = open(FILENAME, 'wb')
- pickle.dump(credentials, credentials_file)
- credentials_file.close()
+ f = open(FILENAME, 'wb')
+ pickle.dump(credentials, f)
+ f.close()
# Storage should be not be able to read that object, as the capability
# to read and write credentials as pickled objects has been removed.
- storage = file_module.Storage(FILENAME)
- read_credentials = storage.get()
- self.assertIsNone(read_credentials)
+ s = file.Storage(FILENAME)
+ read_credentials = s.get()
+ self.assertEquals(None, read_credentials)
# Now write it back out and confirm it has been rewritten as JSON
- storage.put(credentials)
- with open(FILENAME) as credentials_file:
- data = json.load(credentials_file)
+ s.put(credentials)
+ with open(FILENAME) as f:
+ data = json.load(f)
self.assertEquals(data['access_token'], 'foo')
self.assertEquals(data['_class'], 'OAuth2Credentials')
@@ -131,38 +118,22 @@ class OAuth2ClientFileTests(unittest.TestCase):
datetime.timedelta(minutes=15))
credentials = self._create_test_credentials(expiration=expiration)
- storage = file_module.Storage(FILENAME)
- storage.put(credentials)
- credentials = storage.get()
+ s = file.Storage(FILENAME)
+ s.put(credentials)
+ credentials = s.get()
new_cred = copy.copy(credentials)
new_cred.access_token = 'bar'
- storage.put(new_cred)
+ s.put(new_cred)
access_token = '1/3w'
token_response = {'access_token': access_token, 'expires_in': 3600}
- response_content = json.dumps(token_response).encode('utf-8')
- http = http_mock.HttpMock(data=response_content)
+ http = HttpMockSequence([
+ ({'status': '200'}, json.dumps(token_response).encode('utf-8')),
+ ])
- credentials._refresh(http)
+ credentials._refresh(http.request)
self.assertEquals(credentials.access_token, access_token)
- # Verify mocks.
- self.assertEqual(http.requests, 1)
- self.assertEqual(http.uri, credentials.token_uri)
- self.assertEqual(http.method, 'POST')
- expected_body = {
- 'grant_type': ['refresh_token'],
- 'client_id': [credentials.client_id],
- 'client_secret': [credentials.client_secret],
- 'refresh_token': [credentials.refresh_token],
- }
- self.assertEqual(urllib_parse.parse_qs(http.body), expected_body)
- expected_headers = {
- 'content-type': 'application/x-www-form-urlencoded',
- 'user-agent': credentials.user_agent,
- }
- self.assertEqual(http.headers, expected_headers)
-
def test_token_refresh_store_expires_soon(self):
# Tests the case where an access token that is valid when it is read
# from the store expires before the original request succeeds.
@@ -170,28 +141,28 @@ class OAuth2ClientFileTests(unittest.TestCase):
datetime.timedelta(minutes=15))
credentials = self._create_test_credentials(expiration=expiration)
- storage = file_module.Storage(FILENAME)
- storage.put(credentials)
- credentials = storage.get()
+ s = file.Storage(FILENAME)
+ s.put(credentials)
+ credentials = s.get()
new_cred = copy.copy(credentials)
new_cred.access_token = 'bar'
- storage.put(new_cred)
+ s.put(new_cred)
access_token = '1/3w'
token_response = {'access_token': access_token, 'expires_in': 3600}
- http = http_mock.HttpMockSequence([
- ({'status': http_client.UNAUTHORIZED},
+ http = HttpMockSequence([
+ ({'status': str(int(http_client.UNAUTHORIZED))},
b'Initial token expired'),
- ({'status': http_client.UNAUTHORIZED},
+ ({'status': str(int(http_client.UNAUTHORIZED))},
b'Store token expired'),
- ({'status': http_client.OK},
+ ({'status': str(int(http_client.OK))},
json.dumps(token_response).encode('utf-8')),
- ({'status': http_client.OK},
+ ({'status': str(int(http_client.OK))},
b'Valid response to original request')
])
credentials.authorize(http)
- transport.request(http, 'https://example.com')
+ http.request('https://example.com')
self.assertEqual(credentials.access_token, access_token)
def test_token_refresh_good_store(self):
@@ -199,12 +170,12 @@ class OAuth2ClientFileTests(unittest.TestCase):
datetime.timedelta(minutes=15))
credentials = self._create_test_credentials(expiration=expiration)
- storage = file_module.Storage(FILENAME)
- storage.put(credentials)
- credentials = storage.get()
+ s = file.Storage(FILENAME)
+ s.put(credentials)
+ credentials = s.get()
new_cred = copy.copy(credentials)
new_cred.access_token = 'bar'
- storage.put(new_cred)
+ s.put(new_cred)
credentials._refresh(None)
self.assertEquals(credentials.access_token, 'bar')
@@ -214,43 +185,43 @@ class OAuth2ClientFileTests(unittest.TestCase):
datetime.timedelta(minutes=15))
credentials = self._create_test_credentials(expiration=expiration)
- storage = file_module.Storage(FILENAME)
- storage.put(credentials)
- credentials = storage.get()
+ s = file.Storage(FILENAME)
+ s.put(credentials)
+ credentials = s.get()
new_cred = copy.copy(credentials)
new_cred.access_token = 'bar'
- storage.put(new_cred)
+ s.put(new_cred)
valid_access_token = '1/3w'
token_response = {'access_token': valid_access_token,
'expires_in': 3600}
- http = http_mock.HttpMockSequence([
- ({'status': http_client.UNAUTHORIZED},
+ http = HttpMockSequence([
+ ({'status': str(int(http_client.UNAUTHORIZED))},
b'Initial token expired'),
- ({'status': http_client.UNAUTHORIZED},
+ ({'status': str(int(http_client.UNAUTHORIZED))},
b'Store token expired'),
- ({'status': http_client.OK},
+ ({'status': str(int(http_client.OK))},
json.dumps(token_response).encode('utf-8')),
- ({'status': http_client.OK}, 'echo_request_body')
+ ({'status': str(int(http_client.OK))}, 'echo_request_body')
])
body = six.StringIO('streaming body')
credentials.authorize(http)
- _, content = transport.request(http, 'https://example.com', body=body)
+ _, content = http.request('https://example.com', body=body)
self.assertEqual(content, 'streaming body')
self.assertEqual(credentials.access_token, valid_access_token)
def test_credentials_delete(self):
credentials = self._create_test_credentials()
- storage = file_module.Storage(FILENAME)
- storage.put(credentials)
- credentials = storage.get()
- self.assertIsNotNone(credentials)
- storage.delete()
- credentials = storage.get()
- self.assertIsNone(credentials)
+ s = file.Storage(FILENAME)
+ s.put(credentials)
+ credentials = s.get()
+ self.assertNotEquals(None, credentials)
+ s.delete()
+ credentials = s.get()
+ self.assertEquals(None, credentials)
def test_access_token_credentials(self):
access_token = 'foo'
@@ -258,11 +229,11 @@ class OAuth2ClientFileTests(unittest.TestCase):
credentials = client.AccessTokenCredentials(access_token, user_agent)
- storage = file_module.Storage(FILENAME)
- credentials = storage.put(credentials)
- credentials = storage.get()
+ s = file.Storage(FILENAME)
+ credentials = s.put(credentials)
+ credentials = s.get()
- self.assertIsNotNone(credentials)
+ self.assertNotEquals(None, credentials)
self.assertEquals('foo', credentials.access_token)
self.assertTrue(os.path.exists(FILENAME))