aboutsummaryrefslogtreecommitdiff
path: root/tests/test_jwt.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_jwt.py')
-rw-r--r--tests/test_jwt.py85
1 files changed, 30 insertions, 55 deletions
diff --git a/tests/test_jwt.py b/tests/test_jwt.py
index 6502a4a..ecc58e8 100644
--- a/tests/test_jwt.py
+++ b/tests/test_jwt.py
@@ -17,18 +17,19 @@
import os
import tempfile
import time
-import unittest
import mock
-from six.moves import http_client
+import unittest2
from oauth2client import _helpers
from oauth2client import client
from oauth2client import crypt
-from oauth2client import file as file_module
+from oauth2client import file
from oauth2client import service_account
-from oauth2client import transport
-from tests import http_mock
+from .http_mock import HttpMockSequence
+
+
+__author__ = 'jcgregorio@google.com (Joe Gregorio)'
_FORMATS_TO_CONSTRUCTOR_ARGS = {
@@ -46,7 +47,7 @@ def datafile(filename):
return file_obj.read()
-class CryptTests(unittest.TestCase):
+class CryptTests(unittest2.TestCase):
def setUp(self):
self.format_ = 'p12'
@@ -113,30 +114,25 @@ class CryptTests(unittest.TestCase):
self.assertEqual('billy bob', contents['user'])
self.assertEqual('data', contents['metadata']['meta'])
- def _verify_http_mock(self, http):
- self.assertEqual(http.requests, 1)
- self.assertEqual(http.uri, client.ID_TOKEN_VERIFICATION_CERTS)
- self.assertEqual(http.method, 'GET')
- self.assertIsNone(http.body)
- self.assertIsNone(http.headers)
-
def test_verify_id_token_with_certs_uri(self):
jwt = self._create_signed_jwt()
- http = http_mock.HttpMock(data=datafile('certs.json'))
+ http = HttpMockSequence([
+ ({'status': '200'}, datafile('certs.json')),
+ ])
+
contents = client.verify_id_token(
jwt, 'some_audience_address@testing.gserviceaccount.com',
http=http)
self.assertEqual('billy bob', contents['user'])
self.assertEqual('data', contents['metadata']['meta'])
- # Verify mocks.
- self._verify_http_mock(http)
-
def test_verify_id_token_with_certs_uri_default_http(self):
jwt = self._create_signed_jwt()
- http = http_mock.HttpMock(data=datafile('certs.json'))
+ http = HttpMockSequence([
+ ({'status': '200'}, datafile('certs.json')),
+ ])
with mock.patch('oauth2client.transport._CACHED_HTTP', new=http):
contents = client.verify_id_token(
@@ -145,23 +141,17 @@ class CryptTests(unittest.TestCase):
self.assertEqual('billy bob', contents['user'])
self.assertEqual('data', contents['metadata']['meta'])
- # Verify mocks.
- self._verify_http_mock(http)
-
def test_verify_id_token_with_certs_uri_fails(self):
jwt = self._create_signed_jwt()
test_email = 'some_audience_address@testing.gserviceaccount.com'
- http = http_mock.HttpMock(
- headers={'status': http_client.NOT_FOUND},
- data=datafile('certs.json'))
+ http = HttpMockSequence([
+ ({'status': '404'}, datafile('certs.json')),
+ ])
with self.assertRaises(client.VerifyJwtTokenError):
client.verify_id_token(jwt, test_email, http=http)
- # Verify mocks.
- self._verify_http_mock(http)
-
def test_verify_id_token_bad_tokens(self):
private_key = datafile('privatekey.' + self.format_)
@@ -242,16 +232,12 @@ class PEMCryptTestsOpenSSL(CryptTests):
self.verifier = crypt.OpenSSLVerifier
-class SignedJwtAssertionCredentialsTests(unittest.TestCase):
+class SignedJwtAssertionCredentialsTests(unittest2.TestCase):
def setUp(self):
- self.orig_signer = crypt.Signer
self.format_ = 'p12'
crypt.Signer = crypt.OpenSSLSigner
- def tearDown(self):
- crypt.Signer = self.orig_signer
-
def _make_credentials(self):
private_key = datafile('privatekey.' + self.format_)
signer = crypt.Signer.from_string(private_key)
@@ -271,13 +257,12 @@ class SignedJwtAssertionCredentialsTests(unittest.TestCase):
def test_credentials_good(self):
credentials = self._make_credentials()
- http = http_mock.HttpMockSequence([
- ({'status': http_client.OK},
- b'{"access_token":"1/3w","expires_in":3600}'),
- ({'status': http_client.OK}, 'echo_request_headers'),
+ http = HttpMockSequence([
+ ({'status': '200'}, b'{"access_token":"1/3w","expires_in":3600}'),
+ ({'status': '200'}, 'echo_request_headers'),
])
http = credentials.authorize(http)
- resp, content = transport.request(http, 'http://example.org')
+ resp, content = http.request('http://example.org')
self.assertEqual(b'Bearer 1/3w', content[b'Authorization'])
def test_credentials_to_from_json(self):
@@ -291,16 +276,14 @@ class SignedJwtAssertionCredentialsTests(unittest.TestCase):
self.assertEqual(credentials._kwargs, restored._kwargs)
def _credentials_refresh(self, credentials):
- http = http_mock.HttpMockSequence([
- ({'status': http_client.OK},
- b'{"access_token":"1/3w","expires_in":3600}'),
- ({'status': http_client.UNAUTHORIZED}, b''),
- ({'status': http_client.OK},
- b'{"access_token":"3/3w","expires_in":3600}'),
- ({'status': http_client.OK}, 'echo_request_headers'),
+ http = HttpMockSequence([
+ ({'status': '200'}, b'{"access_token":"1/3w","expires_in":3600}'),
+ ({'status': '401'}, b''),
+ ({'status': '200'}, b'{"access_token":"3/3w","expires_in":3600}'),
+ ({'status': '200'}, 'echo_request_headers'),
])
http = credentials.authorize(http)
- _, content = transport.request(http, 'http://example.org')
+ _, content = http.request('http://example.org')
return content
def test_credentials_refresh_without_storage(self):
@@ -313,7 +296,7 @@ class SignedJwtAssertionCredentialsTests(unittest.TestCase):
filehandle, filename = tempfile.mkstemp()
os.close(filehandle)
- store = file_module.Storage(filename)
+ store = file.Storage(filename)
store.put(credentials)
credentials.set_store(store)
@@ -327,27 +310,19 @@ class PEMSignedJwtAssertionCredentialsOpenSSLTests(
SignedJwtAssertionCredentialsTests):
def setUp(self):
- self.orig_signer = crypt.Signer
self.format_ = 'pem'
crypt.Signer = crypt.OpenSSLSigner
- def tearDown(self):
- crypt.Signer = self.orig_signer
-
class PEMSignedJwtAssertionCredentialsPyCryptoTests(
SignedJwtAssertionCredentialsTests):
def setUp(self):
- self.orig_signer = crypt.Signer
self.format_ = 'pem'
crypt.Signer = crypt.PyCryptoSigner
- def tearDown(self):
- crypt.Signer = self.orig_signer
-
-class TestHasOpenSSLFlag(unittest.TestCase):
+class TestHasOpenSSLFlag(unittest2.TestCase):
def test_true(self):
self.assertEqual(True, client.HAS_OPENSSL)