aboutsummaryrefslogtreecommitdiff
path: root/tests/contrib/test_gce.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/contrib/test_gce.py')
-rw-r--r--tests/contrib/test_gce.py69
1 files changed, 46 insertions, 23 deletions
diff --git a/tests/contrib/test_gce.py b/tests/contrib/test_gce.py
index e71bd44..5f34995 100644
--- a/tests/contrib/test_gce.py
+++ b/tests/contrib/test_gce.py
@@ -16,26 +16,28 @@
import datetime
import json
+import os
+import unittest
-import httplib2
import mock
from six.moves import http_client
-from tests.contrib.test_metadata import request_mock
-import unittest2
+from six.moves import reload_module
from oauth2client import client
+from oauth2client.contrib import _metadata
from oauth2client.contrib import gce
+from tests import http_mock
-__author__ = 'jcgregorio@google.com (Joe Gregorio)'
SERVICE_ACCOUNT_INFO = {
'scopes': ['a', 'b'],
'email': 'a@example.com',
'aliases': ['default']
}
+METADATA_PATH = 'instance/service-accounts/a@example.com/token'
-class AppAssertionCredentialsTests(unittest2.TestCase):
+class AppAssertionCredentialsTests(unittest.TestCase):
def test_constructor(self):
credentials = gce.AppAssertionCredentials()
@@ -68,8 +70,7 @@ class AppAssertionCredentialsTests(unittest2.TestCase):
@mock.patch('oauth2client.contrib._metadata.get_service_account_info',
return_value=SERVICE_ACCOUNT_INFO)
def test_refresh_token(self, get_info, get_token):
- http_request = mock.MagicMock()
- http_mock = mock.MagicMock(request=http_request)
+ http_mock = object()
credentials = gce.AppAssertionCredentials()
credentials.invalid = False
credentials.service_account_email = 'a@example.com'
@@ -77,26 +78,34 @@ class AppAssertionCredentialsTests(unittest2.TestCase):
credentials.get_access_token(http=http_mock)
self.assertEqual(credentials.access_token, 'A')
self.assertTrue(credentials.access_token_expired)
- get_token.assert_called_with(http_request,
+ get_token.assert_called_with(http_mock,
service_account='a@example.com')
credentials.get_access_token(http=http_mock)
self.assertEqual(credentials.access_token, 'B')
self.assertFalse(credentials.access_token_expired)
- get_token.assert_called_with(http_request,
+ get_token.assert_called_with(http_mock,
service_account='a@example.com')
get_info.assert_not_called()
def test_refresh_token_failed_fetch(self):
- http_request = request_mock(
- http_client.NOT_FOUND,
- 'application/json',
- json.dumps({'access_token': 'a', 'expires_in': 100})
- )
+ headers = {
+ 'status': http_client.NOT_FOUND,
+ 'content-type': 'application/json',
+ }
+ response = json.dumps({'access_token': 'a', 'expires_in': 100})
+ http = http_mock.HttpMock(headers=headers, data=response)
credentials = gce.AppAssertionCredentials()
credentials.invalid = False
credentials.service_account_email = 'a@example.com'
with self.assertRaises(client.HttpAccessTokenRefreshError):
- credentials._refresh(http_request)
+ credentials._refresh(http)
+ # Verify mock.
+ self.assertEqual(http.requests, 1)
+ expected_uri = _metadata.METADATA_ROOT + METADATA_PATH
+ self.assertEqual(http.uri, expected_uri)
+ self.assertEqual(http.method, 'GET')
+ self.assertIsNone(http.body)
+ self.assertEqual(http.headers, _metadata.METADATA_HEADERS)
def test_serialization_data(self):
credentials = gce.AppAssertionCredentials()
@@ -115,8 +124,7 @@ class AppAssertionCredentialsTests(unittest2.TestCase):
@mock.patch('oauth2client.contrib._metadata.get_service_account_info',
return_value=SERVICE_ACCOUNT_INFO)
def test_retrieve_scopes(self, metadata):
- http_request = mock.MagicMock()
- http_mock = mock.MagicMock(request=http_request)
+ http_mock = object()
credentials = gce.AppAssertionCredentials()
self.assertTrue(credentials.invalid)
self.assertIsNone(credentials.scopes)
@@ -125,19 +133,18 @@ class AppAssertionCredentialsTests(unittest2.TestCase):
self.assertFalse(credentials.invalid)
credentials.retrieve_scopes(http_mock)
# Assert scopes weren't refetched
- metadata.assert_called_once_with(http_request,
+ metadata.assert_called_once_with(http_mock,
service_account='default')
@mock.patch('oauth2client.contrib._metadata.get_service_account_info',
- side_effect=httplib2.HttpLib2Error('No Such Email'))
+ side_effect=http_client.HTTPException('No Such Email'))
def test_retrieve_scopes_bad_email(self, metadata):
- http_request = mock.MagicMock()
- http_mock = mock.MagicMock(request=http_request)
+ http_mock = object()
credentials = gce.AppAssertionCredentials(email='b@example.com')
- with self.assertRaises(httplib2.HttpLib2Error):
+ with self.assertRaises(http_client.HTTPException):
credentials.retrieve_scopes(http_mock)
- metadata.assert_called_once_with(http_request,
+ metadata.assert_called_once_with(http_mock,
service_account='b@example.com')
def test_save_to_well_known_file(self):
@@ -150,3 +157,19 @@ class AppAssertionCredentialsTests(unittest2.TestCase):
client.save_to_well_known_file(credentials)
finally:
os.path.isdir = ORIGINAL_ISDIR
+
+ def test_custom_metadata_root_from_env(self):
+ headers = {'content-type': 'application/json'}
+ http = http_mock.HttpMock(headers=headers, data='{}')
+ fake_metadata_root = 'another.metadata.service'
+ os.environ['GCE_METADATA_ROOT'] = fake_metadata_root
+ reload_module(_metadata)
+ try:
+ _metadata.get(http, '')
+ finally:
+ del os.environ['GCE_METADATA_ROOT']
+ reload_module(_metadata)
+ # Verify mock.
+ self.assertEqual(http.requests, 1)
+ expected_uri = 'http://{}/computeMetadata/v1/'.format(fake_metadata_root)
+ self.assertEqual(http.uri, expected_uri)