aboutsummaryrefslogtreecommitdiff
path: root/tests/test_jwt.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_jwt.py')
-rw-r--r--tests/test_jwt.py85
1 files changed, 55 insertions, 30 deletions
diff --git a/tests/test_jwt.py b/tests/test_jwt.py
index ecc58e8..6502a4a 100644
--- a/tests/test_jwt.py
+++ b/tests/test_jwt.py
@@ -17,19 +17,18 @@
import os
import tempfile
import time
+import unittest
import mock
-import unittest2
+from six.moves import http_client
from oauth2client import _helpers
from oauth2client import client
from oauth2client import crypt
-from oauth2client import file
+from oauth2client import file as file_module
from oauth2client import service_account
-from .http_mock import HttpMockSequence
-
-
-__author__ = 'jcgregorio@google.com (Joe Gregorio)'
+from oauth2client import transport
+from tests import http_mock
_FORMATS_TO_CONSTRUCTOR_ARGS = {
@@ -47,7 +46,7 @@ def datafile(filename):
return file_obj.read()
-class CryptTests(unittest2.TestCase):
+class CryptTests(unittest.TestCase):
def setUp(self):
self.format_ = 'p12'
@@ -114,25 +113,30 @@ class CryptTests(unittest2.TestCase):
self.assertEqual('billy bob', contents['user'])
self.assertEqual('data', contents['metadata']['meta'])
+ def _verify_http_mock(self, http):
+ self.assertEqual(http.requests, 1)
+ self.assertEqual(http.uri, client.ID_TOKEN_VERIFICATION_CERTS)
+ self.assertEqual(http.method, 'GET')
+ self.assertIsNone(http.body)
+ self.assertIsNone(http.headers)
+
def test_verify_id_token_with_certs_uri(self):
jwt = self._create_signed_jwt()
- http = HttpMockSequence([
- ({'status': '200'}, datafile('certs.json')),
- ])
-
+ http = http_mock.HttpMock(data=datafile('certs.json'))
contents = client.verify_id_token(
jwt, 'some_audience_address@testing.gserviceaccount.com',
http=http)
self.assertEqual('billy bob', contents['user'])
self.assertEqual('data', contents['metadata']['meta'])
+ # Verify mocks.
+ self._verify_http_mock(http)
+
def test_verify_id_token_with_certs_uri_default_http(self):
jwt = self._create_signed_jwt()
- http = HttpMockSequence([
- ({'status': '200'}, datafile('certs.json')),
- ])
+ http = http_mock.HttpMock(data=datafile('certs.json'))
with mock.patch('oauth2client.transport._CACHED_HTTP', new=http):
contents = client.verify_id_token(
@@ -141,17 +145,23 @@ class CryptTests(unittest2.TestCase):
self.assertEqual('billy bob', contents['user'])
self.assertEqual('data', contents['metadata']['meta'])
+ # Verify mocks.
+ self._verify_http_mock(http)
+
def test_verify_id_token_with_certs_uri_fails(self):
jwt = self._create_signed_jwt()
test_email = 'some_audience_address@testing.gserviceaccount.com'
- http = HttpMockSequence([
- ({'status': '404'}, datafile('certs.json')),
- ])
+ http = http_mock.HttpMock(
+ headers={'status': http_client.NOT_FOUND},
+ data=datafile('certs.json'))
with self.assertRaises(client.VerifyJwtTokenError):
client.verify_id_token(jwt, test_email, http=http)
+ # Verify mocks.
+ self._verify_http_mock(http)
+
def test_verify_id_token_bad_tokens(self):
private_key = datafile('privatekey.' + self.format_)
@@ -232,12 +242,16 @@ class PEMCryptTestsOpenSSL(CryptTests):
self.verifier = crypt.OpenSSLVerifier
-class SignedJwtAssertionCredentialsTests(unittest2.TestCase):
+class SignedJwtAssertionCredentialsTests(unittest.TestCase):
def setUp(self):
+ self.orig_signer = crypt.Signer
self.format_ = 'p12'
crypt.Signer = crypt.OpenSSLSigner
+ def tearDown(self):
+ crypt.Signer = self.orig_signer
+
def _make_credentials(self):
private_key = datafile('privatekey.' + self.format_)
signer = crypt.Signer.from_string(private_key)
@@ -257,12 +271,13 @@ class SignedJwtAssertionCredentialsTests(unittest2.TestCase):
def test_credentials_good(self):
credentials = self._make_credentials()
- http = HttpMockSequence([
- ({'status': '200'}, b'{"access_token":"1/3w","expires_in":3600}'),
- ({'status': '200'}, 'echo_request_headers'),
+ http = http_mock.HttpMockSequence([
+ ({'status': http_client.OK},
+ b'{"access_token":"1/3w","expires_in":3600}'),
+ ({'status': http_client.OK}, 'echo_request_headers'),
])
http = credentials.authorize(http)
- resp, content = http.request('http://example.org')
+ resp, content = transport.request(http, 'http://example.org')
self.assertEqual(b'Bearer 1/3w', content[b'Authorization'])
def test_credentials_to_from_json(self):
@@ -276,14 +291,16 @@ class SignedJwtAssertionCredentialsTests(unittest2.TestCase):
self.assertEqual(credentials._kwargs, restored._kwargs)
def _credentials_refresh(self, credentials):
- http = HttpMockSequence([
- ({'status': '200'}, b'{"access_token":"1/3w","expires_in":3600}'),
- ({'status': '401'}, b''),
- ({'status': '200'}, b'{"access_token":"3/3w","expires_in":3600}'),
- ({'status': '200'}, 'echo_request_headers'),
+ http = http_mock.HttpMockSequence([
+ ({'status': http_client.OK},
+ b'{"access_token":"1/3w","expires_in":3600}'),
+ ({'status': http_client.UNAUTHORIZED}, b''),
+ ({'status': http_client.OK},
+ b'{"access_token":"3/3w","expires_in":3600}'),
+ ({'status': http_client.OK}, 'echo_request_headers'),
])
http = credentials.authorize(http)
- _, content = http.request('http://example.org')
+ _, content = transport.request(http, 'http://example.org')
return content
def test_credentials_refresh_without_storage(self):
@@ -296,7 +313,7 @@ class SignedJwtAssertionCredentialsTests(unittest2.TestCase):
filehandle, filename = tempfile.mkstemp()
os.close(filehandle)
- store = file.Storage(filename)
+ store = file_module.Storage(filename)
store.put(credentials)
credentials.set_store(store)
@@ -310,19 +327,27 @@ class PEMSignedJwtAssertionCredentialsOpenSSLTests(
SignedJwtAssertionCredentialsTests):
def setUp(self):
+ self.orig_signer = crypt.Signer
self.format_ = 'pem'
crypt.Signer = crypt.OpenSSLSigner
+ def tearDown(self):
+ crypt.Signer = self.orig_signer
+
class PEMSignedJwtAssertionCredentialsPyCryptoTests(
SignedJwtAssertionCredentialsTests):
def setUp(self):
+ self.orig_signer = crypt.Signer
self.format_ = 'pem'
crypt.Signer = crypt.PyCryptoSigner
+ def tearDown(self):
+ crypt.Signer = self.orig_signer
+
-class TestHasOpenSSLFlag(unittest2.TestCase):
+class TestHasOpenSSLFlag(unittest.TestCase):
def test_true(self):
self.assertEqual(True, client.HAS_OPENSSL)