aboutsummaryrefslogtreecommitdiff
path: root/tests/oauth2/test_sts.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/oauth2/test_sts.py')
-rw-r--r--tests/oauth2/test_sts.py395
1 files changed, 395 insertions, 0 deletions
diff --git a/tests/oauth2/test_sts.py b/tests/oauth2/test_sts.py
new file mode 100644
index 0000000..e8e008d
--- /dev/null
+++ b/tests/oauth2/test_sts.py
@@ -0,0 +1,395 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+import mock
+import pytest
+from six.moves import http_client
+from six.moves import urllib
+
+from google.auth import exceptions
+from google.auth import transport
+from google.oauth2 import sts
+from google.oauth2 import utils
+
+CLIENT_ID = "username"
+CLIENT_SECRET = "password"
+# Base64 encoding of "username:password"
+BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
+
+
+class TestStsClient(object):
+ GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange"
+ RESOURCE = "https://api.example.com/"
+ AUDIENCE = "urn:example:cooperation-context"
+ SCOPES = ["scope1", "scope2"]
+ REQUESTED_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
+ SUBJECT_TOKEN = "HEADER.SUBJECT_TOKEN_PAYLOAD.SIGNATURE"
+ SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
+ ACTOR_TOKEN = "HEADER.ACTOR_TOKEN_PAYLOAD.SIGNATURE"
+ ACTOR_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
+ TOKEN_EXCHANGE_ENDPOINT = "https://example.com/token.oauth2"
+ ADDON_HEADERS = {"x-client-version": "0.1.2"}
+ ADDON_OPTIONS = {"additional": {"non-standard": ["options"], "other": "some-value"}}
+ SUCCESS_RESPONSE = {
+ "access_token": "ACCESS_TOKEN",
+ "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
+ "token_type": "Bearer",
+ "expires_in": 3600,
+ "scope": "scope1 scope2",
+ }
+ ERROR_RESPONSE = {
+ "error": "invalid_request",
+ "error_description": "Invalid subject token",
+ "error_uri": "https://tools.ietf.org/html/rfc6749",
+ }
+ CLIENT_AUTH_BASIC = utils.ClientAuthentication(
+ utils.ClientAuthType.basic, CLIENT_ID, CLIENT_SECRET
+ )
+ CLIENT_AUTH_REQUEST_BODY = utils.ClientAuthentication(
+ utils.ClientAuthType.request_body, CLIENT_ID, CLIENT_SECRET
+ )
+
+ @classmethod
+ def make_client(cls, client_auth=None):
+ return sts.Client(cls.TOKEN_EXCHANGE_ENDPOINT, client_auth)
+
+ @classmethod
+ def make_mock_request(cls, data, status=http_client.OK):
+ response = mock.create_autospec(transport.Response, instance=True)
+ response.status = status
+ response.data = json.dumps(data).encode("utf-8")
+
+ request = mock.create_autospec(transport.Request)
+ request.return_value = response
+
+ return request
+
+ @classmethod
+ def assert_request_kwargs(cls, request_kwargs, headers, request_data):
+ """Asserts the request was called with the expected parameters.
+ """
+ assert request_kwargs["url"] == cls.TOKEN_EXCHANGE_ENDPOINT
+ assert request_kwargs["method"] == "POST"
+ assert request_kwargs["headers"] == headers
+ assert request_kwargs["body"] is not None
+ body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
+ for (k, v) in body_tuples:
+ assert v.decode("utf-8") == request_data[k.decode("utf-8")]
+ assert len(body_tuples) == len(request_data.keys())
+
+ def test_exchange_token_full_success_without_auth(self):
+ """Test token exchange success without client authentication using full
+ parameters.
+ """
+ client = self.make_client()
+ headers = self.ADDON_HEADERS.copy()
+ headers["Content-Type"] = "application/x-www-form-urlencoded"
+ request_data = {
+ "grant_type": self.GRANT_TYPE,
+ "resource": self.RESOURCE,
+ "audience": self.AUDIENCE,
+ "scope": " ".join(self.SCOPES),
+ "requested_token_type": self.REQUESTED_TOKEN_TYPE,
+ "subject_token": self.SUBJECT_TOKEN,
+ "subject_token_type": self.SUBJECT_TOKEN_TYPE,
+ "actor_token": self.ACTOR_TOKEN,
+ "actor_token_type": self.ACTOR_TOKEN_TYPE,
+ "options": urllib.parse.quote(json.dumps(self.ADDON_OPTIONS)),
+ }
+ request = self.make_mock_request(
+ status=http_client.OK, data=self.SUCCESS_RESPONSE
+ )
+
+ response = client.exchange_token(
+ request,
+ self.GRANT_TYPE,
+ self.SUBJECT_TOKEN,
+ self.SUBJECT_TOKEN_TYPE,
+ self.RESOURCE,
+ self.AUDIENCE,
+ self.SCOPES,
+ self.REQUESTED_TOKEN_TYPE,
+ self.ACTOR_TOKEN,
+ self.ACTOR_TOKEN_TYPE,
+ self.ADDON_OPTIONS,
+ self.ADDON_HEADERS,
+ )
+
+ self.assert_request_kwargs(request.call_args[1], headers, request_data)
+ assert response == self.SUCCESS_RESPONSE
+
+ def test_exchange_token_partial_success_without_auth(self):
+ """Test token exchange success without client authentication using
+ partial (required only) parameters.
+ """
+ client = self.make_client()
+ headers = {"Content-Type": "application/x-www-form-urlencoded"}
+ request_data = {
+ "grant_type": self.GRANT_TYPE,
+ "audience": self.AUDIENCE,
+ "requested_token_type": self.REQUESTED_TOKEN_TYPE,
+ "subject_token": self.SUBJECT_TOKEN,
+ "subject_token_type": self.SUBJECT_TOKEN_TYPE,
+ }
+ request = self.make_mock_request(
+ status=http_client.OK, data=self.SUCCESS_RESPONSE
+ )
+
+ response = client.exchange_token(
+ request,
+ grant_type=self.GRANT_TYPE,
+ subject_token=self.SUBJECT_TOKEN,
+ subject_token_type=self.SUBJECT_TOKEN_TYPE,
+ audience=self.AUDIENCE,
+ requested_token_type=self.REQUESTED_TOKEN_TYPE,
+ )
+
+ self.assert_request_kwargs(request.call_args[1], headers, request_data)
+ assert response == self.SUCCESS_RESPONSE
+
+ def test_exchange_token_non200_without_auth(self):
+ """Test token exchange without client auth responding with non-200 status.
+ """
+ client = self.make_client()
+ request = self.make_mock_request(
+ status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE
+ )
+
+ with pytest.raises(exceptions.OAuthError) as excinfo:
+ client.exchange_token(
+ request,
+ self.GRANT_TYPE,
+ self.SUBJECT_TOKEN,
+ self.SUBJECT_TOKEN_TYPE,
+ self.RESOURCE,
+ self.AUDIENCE,
+ self.SCOPES,
+ self.REQUESTED_TOKEN_TYPE,
+ self.ACTOR_TOKEN,
+ self.ACTOR_TOKEN_TYPE,
+ self.ADDON_OPTIONS,
+ self.ADDON_HEADERS,
+ )
+
+ assert excinfo.match(
+ r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"
+ )
+
+ def test_exchange_token_full_success_with_basic_auth(self):
+ """Test token exchange success with basic client authentication using full
+ parameters.
+ """
+ client = self.make_client(self.CLIENT_AUTH_BASIC)
+ headers = self.ADDON_HEADERS.copy()
+ headers["Content-Type"] = "application/x-www-form-urlencoded"
+ headers["Authorization"] = "Basic {}".format(BASIC_AUTH_ENCODING)
+ request_data = {
+ "grant_type": self.GRANT_TYPE,
+ "resource": self.RESOURCE,
+ "audience": self.AUDIENCE,
+ "scope": " ".join(self.SCOPES),
+ "requested_token_type": self.REQUESTED_TOKEN_TYPE,
+ "subject_token": self.SUBJECT_TOKEN,
+ "subject_token_type": self.SUBJECT_TOKEN_TYPE,
+ "actor_token": self.ACTOR_TOKEN,
+ "actor_token_type": self.ACTOR_TOKEN_TYPE,
+ "options": urllib.parse.quote(json.dumps(self.ADDON_OPTIONS)),
+ }
+ request = self.make_mock_request(
+ status=http_client.OK, data=self.SUCCESS_RESPONSE
+ )
+
+ response = client.exchange_token(
+ request,
+ self.GRANT_TYPE,
+ self.SUBJECT_TOKEN,
+ self.SUBJECT_TOKEN_TYPE,
+ self.RESOURCE,
+ self.AUDIENCE,
+ self.SCOPES,
+ self.REQUESTED_TOKEN_TYPE,
+ self.ACTOR_TOKEN,
+ self.ACTOR_TOKEN_TYPE,
+ self.ADDON_OPTIONS,
+ self.ADDON_HEADERS,
+ )
+
+ self.assert_request_kwargs(request.call_args[1], headers, request_data)
+ assert response == self.SUCCESS_RESPONSE
+
+ def test_exchange_token_partial_success_with_basic_auth(self):
+ """Test token exchange success with basic client authentication using
+ partial (required only) parameters.
+ """
+ client = self.make_client(self.CLIENT_AUTH_BASIC)
+ headers = {
+ "Content-Type": "application/x-www-form-urlencoded",
+ "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
+ }
+ request_data = {
+ "grant_type": self.GRANT_TYPE,
+ "audience": self.AUDIENCE,
+ "requested_token_type": self.REQUESTED_TOKEN_TYPE,
+ "subject_token": self.SUBJECT_TOKEN,
+ "subject_token_type": self.SUBJECT_TOKEN_TYPE,
+ }
+ request = self.make_mock_request(
+ status=http_client.OK, data=self.SUCCESS_RESPONSE
+ )
+
+ response = client.exchange_token(
+ request,
+ grant_type=self.GRANT_TYPE,
+ subject_token=self.SUBJECT_TOKEN,
+ subject_token_type=self.SUBJECT_TOKEN_TYPE,
+ audience=self.AUDIENCE,
+ requested_token_type=self.REQUESTED_TOKEN_TYPE,
+ )
+
+ self.assert_request_kwargs(request.call_args[1], headers, request_data)
+ assert response == self.SUCCESS_RESPONSE
+
+ def test_exchange_token_non200_with_basic_auth(self):
+ """Test token exchange with basic client auth responding with non-200
+ status.
+ """
+ client = self.make_client(self.CLIENT_AUTH_BASIC)
+ request = self.make_mock_request(
+ status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE
+ )
+
+ with pytest.raises(exceptions.OAuthError) as excinfo:
+ client.exchange_token(
+ request,
+ self.GRANT_TYPE,
+ self.SUBJECT_TOKEN,
+ self.SUBJECT_TOKEN_TYPE,
+ self.RESOURCE,
+ self.AUDIENCE,
+ self.SCOPES,
+ self.REQUESTED_TOKEN_TYPE,
+ self.ACTOR_TOKEN,
+ self.ACTOR_TOKEN_TYPE,
+ self.ADDON_OPTIONS,
+ self.ADDON_HEADERS,
+ )
+
+ assert excinfo.match(
+ r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"
+ )
+
+ def test_exchange_token_full_success_with_reqbody_auth(self):
+ """Test token exchange success with request body client authenticaiton
+ using full parameters.
+ """
+ client = self.make_client(self.CLIENT_AUTH_REQUEST_BODY)
+ headers = self.ADDON_HEADERS.copy()
+ headers["Content-Type"] = "application/x-www-form-urlencoded"
+ request_data = {
+ "grant_type": self.GRANT_TYPE,
+ "resource": self.RESOURCE,
+ "audience": self.AUDIENCE,
+ "scope": " ".join(self.SCOPES),
+ "requested_token_type": self.REQUESTED_TOKEN_TYPE,
+ "subject_token": self.SUBJECT_TOKEN,
+ "subject_token_type": self.SUBJECT_TOKEN_TYPE,
+ "actor_token": self.ACTOR_TOKEN,
+ "actor_token_type": self.ACTOR_TOKEN_TYPE,
+ "options": urllib.parse.quote(json.dumps(self.ADDON_OPTIONS)),
+ "client_id": CLIENT_ID,
+ "client_secret": CLIENT_SECRET,
+ }
+ request = self.make_mock_request(
+ status=http_client.OK, data=self.SUCCESS_RESPONSE
+ )
+
+ response = client.exchange_token(
+ request,
+ self.GRANT_TYPE,
+ self.SUBJECT_TOKEN,
+ self.SUBJECT_TOKEN_TYPE,
+ self.RESOURCE,
+ self.AUDIENCE,
+ self.SCOPES,
+ self.REQUESTED_TOKEN_TYPE,
+ self.ACTOR_TOKEN,
+ self.ACTOR_TOKEN_TYPE,
+ self.ADDON_OPTIONS,
+ self.ADDON_HEADERS,
+ )
+
+ self.assert_request_kwargs(request.call_args[1], headers, request_data)
+ assert response == self.SUCCESS_RESPONSE
+
+ def test_exchange_token_partial_success_with_reqbody_auth(self):
+ """Test token exchange success with request body client authentication
+ using partial (required only) parameters.
+ """
+ client = self.make_client(self.CLIENT_AUTH_REQUEST_BODY)
+ headers = {"Content-Type": "application/x-www-form-urlencoded"}
+ request_data = {
+ "grant_type": self.GRANT_TYPE,
+ "audience": self.AUDIENCE,
+ "requested_token_type": self.REQUESTED_TOKEN_TYPE,
+ "subject_token": self.SUBJECT_TOKEN,
+ "subject_token_type": self.SUBJECT_TOKEN_TYPE,
+ "client_id": CLIENT_ID,
+ "client_secret": CLIENT_SECRET,
+ }
+ request = self.make_mock_request(
+ status=http_client.OK, data=self.SUCCESS_RESPONSE
+ )
+
+ response = client.exchange_token(
+ request,
+ grant_type=self.GRANT_TYPE,
+ subject_token=self.SUBJECT_TOKEN,
+ subject_token_type=self.SUBJECT_TOKEN_TYPE,
+ audience=self.AUDIENCE,
+ requested_token_type=self.REQUESTED_TOKEN_TYPE,
+ )
+
+ self.assert_request_kwargs(request.call_args[1], headers, request_data)
+ assert response == self.SUCCESS_RESPONSE
+
+ def test_exchange_token_non200_with_reqbody_auth(self):
+ """Test token exchange with POST request body client auth responding
+ with non-200 status.
+ """
+ client = self.make_client(self.CLIENT_AUTH_REQUEST_BODY)
+ request = self.make_mock_request(
+ status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE
+ )
+
+ with pytest.raises(exceptions.OAuthError) as excinfo:
+ client.exchange_token(
+ request,
+ self.GRANT_TYPE,
+ self.SUBJECT_TOKEN,
+ self.SUBJECT_TOKEN_TYPE,
+ self.RESOURCE,
+ self.AUDIENCE,
+ self.SCOPES,
+ self.REQUESTED_TOKEN_TYPE,
+ self.ACTOR_TOKEN,
+ self.ACTOR_TOKEN_TYPE,
+ self.ADDON_OPTIONS,
+ self.ADDON_HEADERS,
+ )
+
+ assert excinfo.match(
+ r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"
+ )