diff options
Diffstat (limited to 'tests/test_downscoped.py')
-rw-r--r-- | tests/test_downscoped.py | 696 |
1 files changed, 696 insertions, 0 deletions
diff --git a/tests/test_downscoped.py b/tests/test_downscoped.py new file mode 100644 index 0000000..9ca95f5 --- /dev/null +++ b/tests/test_downscoped.py @@ -0,0 +1,696 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import json + +import mock +import pytest +from six.moves import http_client +from six.moves import urllib + +from google.auth import _helpers +from google.auth import credentials +from google.auth import downscoped +from google.auth import exceptions +from google.auth import transport + + +EXPRESSION = ( + "resource.name.startsWith('projects/_/buckets/example-bucket/objects/customer-a')" +) +TITLE = "customer-a-objects" +DESCRIPTION = ( + "Condition to make permissions available for objects starting with customer-a" +) +AVAILABLE_RESOURCE = "//storage.googleapis.com/projects/_/buckets/example-bucket" +AVAILABLE_PERMISSIONS = ["inRole:roles/storage.objectViewer"] + +OTHER_EXPRESSION = ( + "resource.name.startsWith('projects/_/buckets/example-bucket/objects/customer-b')" +) +OTHER_TITLE = "customer-b-objects" +OTHER_DESCRIPTION = ( + "Condition to make permissions available for objects starting with customer-b" +) +OTHER_AVAILABLE_RESOURCE = "//storage.googleapis.com/projects/_/buckets/other-bucket" +OTHER_AVAILABLE_PERMISSIONS = ["inRole:roles/storage.objectCreator"] +QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID" +GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange" +REQUESTED_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token" +TOKEN_EXCHANGE_ENDPOINT = "https://sts.googleapis.com/v1/token" +SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token" +SUCCESS_RESPONSE = { + "access_token": "ACCESS_TOKEN", + "issued_token_type": "urn:ietf:params:oauth:token-type:access_token", + "token_type": "Bearer", + "expires_in": 3600, +} +ERROR_RESPONSE = { + "error": "invalid_grant", + "error_description": "Subject token is invalid.", + "error_uri": "https://tools.ietf.org/html/rfc6749", +} +CREDENTIAL_ACCESS_BOUNDARY_JSON = { + "accessBoundary": { + "accessBoundaryRules": [ + { + "availablePermissions": AVAILABLE_PERMISSIONS, + "availableResource": AVAILABLE_RESOURCE, + "availabilityCondition": { + "expression": EXPRESSION, + "title": TITLE, + "description": DESCRIPTION, + }, + } + ] + } +} + + +class SourceCredentials(credentials.Credentials): + def __init__(self, raise_error=False, expires_in=3600): + super(SourceCredentials, self).__init__() + self._counter = 0 + self._raise_error = raise_error + self._expires_in = expires_in + + def refresh(self, request): + if self._raise_error: + raise exceptions.RefreshError( + "Failed to refresh access token in source credentials." + ) + now = _helpers.utcnow() + self._counter += 1 + self.token = "ACCESS_TOKEN_{}".format(self._counter) + self.expiry = now + datetime.timedelta(seconds=self._expires_in) + + +def make_availability_condition(expression, title=None, description=None): + return downscoped.AvailabilityCondition(expression, title, description) + + +def make_access_boundary_rule( + available_resource, available_permissions, availability_condition=None +): + return downscoped.AccessBoundaryRule( + available_resource, available_permissions, availability_condition + ) + + +def make_credential_access_boundary(rules): + return downscoped.CredentialAccessBoundary(rules) + + +class TestAvailabilityCondition(object): + def test_constructor(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + + assert availability_condition.expression == EXPRESSION + assert availability_condition.title == TITLE + assert availability_condition.description == DESCRIPTION + + def test_constructor_required_params_only(self): + availability_condition = make_availability_condition(EXPRESSION) + + assert availability_condition.expression == EXPRESSION + assert availability_condition.title is None + assert availability_condition.description is None + + def test_setters(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + availability_condition.expression = OTHER_EXPRESSION + availability_condition.title = OTHER_TITLE + availability_condition.description = OTHER_DESCRIPTION + + assert availability_condition.expression == OTHER_EXPRESSION + assert availability_condition.title == OTHER_TITLE + assert availability_condition.description == OTHER_DESCRIPTION + + def test_invalid_expression_type(self): + with pytest.raises(TypeError) as excinfo: + make_availability_condition([EXPRESSION], TITLE, DESCRIPTION) + + assert excinfo.match("The provided expression is not a string.") + + def test_invalid_title_type(self): + with pytest.raises(TypeError) as excinfo: + make_availability_condition(EXPRESSION, False, DESCRIPTION) + + assert excinfo.match("The provided title is not a string or None.") + + def test_invalid_description_type(self): + with pytest.raises(TypeError) as excinfo: + make_availability_condition(EXPRESSION, TITLE, False) + + assert excinfo.match("The provided description is not a string or None.") + + def test_to_json_required_params_only(self): + availability_condition = make_availability_condition(EXPRESSION) + + assert availability_condition.to_json() == {"expression": EXPRESSION} + + def test_to_json_(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + + assert availability_condition.to_json() == { + "expression": EXPRESSION, + "title": TITLE, + "description": DESCRIPTION, + } + + +class TestAccessBoundaryRule(object): + def test_constructor(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + access_boundary_rule = make_access_boundary_rule( + AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition + ) + + assert access_boundary_rule.available_resource == AVAILABLE_RESOURCE + assert access_boundary_rule.available_permissions == tuple( + AVAILABLE_PERMISSIONS + ) + assert access_boundary_rule.availability_condition == availability_condition + + def test_constructor_required_params_only(self): + access_boundary_rule = make_access_boundary_rule( + AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS + ) + + assert access_boundary_rule.available_resource == AVAILABLE_RESOURCE + assert access_boundary_rule.available_permissions == tuple( + AVAILABLE_PERMISSIONS + ) + assert access_boundary_rule.availability_condition is None + + def test_setters(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + other_availability_condition = make_availability_condition( + OTHER_EXPRESSION, OTHER_TITLE, OTHER_DESCRIPTION + ) + access_boundary_rule = make_access_boundary_rule( + AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition + ) + access_boundary_rule.available_resource = OTHER_AVAILABLE_RESOURCE + access_boundary_rule.available_permissions = OTHER_AVAILABLE_PERMISSIONS + access_boundary_rule.availability_condition = other_availability_condition + + assert access_boundary_rule.available_resource == OTHER_AVAILABLE_RESOURCE + assert access_boundary_rule.available_permissions == tuple( + OTHER_AVAILABLE_PERMISSIONS + ) + assert ( + access_boundary_rule.availability_condition == other_availability_condition + ) + + def test_invalid_available_resource_type(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + with pytest.raises(TypeError) as excinfo: + make_access_boundary_rule( + None, AVAILABLE_PERMISSIONS, availability_condition + ) + + assert excinfo.match("The provided available_resource is not a string.") + + def test_invalid_available_permissions_type(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + with pytest.raises(TypeError) as excinfo: + make_access_boundary_rule( + AVAILABLE_RESOURCE, [0, 1, 2], availability_condition + ) + + assert excinfo.match( + "Provided available_permissions are not a list of strings." + ) + + def test_invalid_available_permissions_value(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + with pytest.raises(ValueError) as excinfo: + make_access_boundary_rule( + AVAILABLE_RESOURCE, + ["roles/storage.objectViewer"], + availability_condition, + ) + + assert excinfo.match("available_permissions must be prefixed with 'inRole:'.") + + def test_invalid_availability_condition_type(self): + with pytest.raises(TypeError) as excinfo: + make_access_boundary_rule( + AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, {"foo": "bar"} + ) + + assert excinfo.match( + "The provided availability_condition is not a 'google.auth.downscoped.AvailabilityCondition' or None." + ) + + def test_to_json(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + access_boundary_rule = make_access_boundary_rule( + AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition + ) + + assert access_boundary_rule.to_json() == { + "availablePermissions": AVAILABLE_PERMISSIONS, + "availableResource": AVAILABLE_RESOURCE, + "availabilityCondition": { + "expression": EXPRESSION, + "title": TITLE, + "description": DESCRIPTION, + }, + } + + def test_to_json_required_params_only(self): + access_boundary_rule = make_access_boundary_rule( + AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS + ) + + assert access_boundary_rule.to_json() == { + "availablePermissions": AVAILABLE_PERMISSIONS, + "availableResource": AVAILABLE_RESOURCE, + } + + +class TestCredentialAccessBoundary(object): + def test_constructor(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + access_boundary_rule = make_access_boundary_rule( + AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition + ) + rules = [access_boundary_rule] + credential_access_boundary = make_credential_access_boundary(rules) + + assert credential_access_boundary.rules == tuple(rules) + + def test_setters(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + access_boundary_rule = make_access_boundary_rule( + AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition + ) + rules = [access_boundary_rule] + other_availability_condition = make_availability_condition( + OTHER_EXPRESSION, OTHER_TITLE, OTHER_DESCRIPTION + ) + other_access_boundary_rule = make_access_boundary_rule( + OTHER_AVAILABLE_RESOURCE, + OTHER_AVAILABLE_PERMISSIONS, + other_availability_condition, + ) + other_rules = [other_access_boundary_rule] + credential_access_boundary = make_credential_access_boundary(rules) + credential_access_boundary.rules = other_rules + + assert credential_access_boundary.rules == tuple(other_rules) + + def test_add_rule(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + access_boundary_rule = make_access_boundary_rule( + AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition + ) + rules = [access_boundary_rule] * 9 + credential_access_boundary = make_credential_access_boundary(rules) + + # Add one more rule. This should not raise an error. + additional_access_boundary_rule = make_access_boundary_rule( + OTHER_AVAILABLE_RESOURCE, OTHER_AVAILABLE_PERMISSIONS + ) + credential_access_boundary.add_rule(additional_access_boundary_rule) + + assert len(credential_access_boundary.rules) == 10 + assert credential_access_boundary.rules[9] == additional_access_boundary_rule + + def test_add_rule_invalid_value(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + access_boundary_rule = make_access_boundary_rule( + AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition + ) + rules = [access_boundary_rule] * 10 + credential_access_boundary = make_credential_access_boundary(rules) + + # Add one more rule to exceed maximum allowed rules. + with pytest.raises(ValueError) as excinfo: + credential_access_boundary.add_rule(access_boundary_rule) + + assert excinfo.match( + "Credential access boundary rules can have a maximum of 10 rules." + ) + assert len(credential_access_boundary.rules) == 10 + + def test_add_rule_invalid_type(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + access_boundary_rule = make_access_boundary_rule( + AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition + ) + rules = [access_boundary_rule] + credential_access_boundary = make_credential_access_boundary(rules) + + # Add an invalid rule to exceed maximum allowed rules. + with pytest.raises(TypeError) as excinfo: + credential_access_boundary.add_rule("invalid") + + assert excinfo.match( + "The provided rule does not contain a valid 'google.auth.downscoped.AccessBoundaryRule'." + ) + assert len(credential_access_boundary.rules) == 1 + assert credential_access_boundary.rules[0] == access_boundary_rule + + def test_invalid_rules_type(self): + with pytest.raises(TypeError) as excinfo: + make_credential_access_boundary(["invalid"]) + + assert excinfo.match( + "List of rules provided do not contain a valid 'google.auth.downscoped.AccessBoundaryRule'." + ) + + def test_invalid_rules_value(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + access_boundary_rule = make_access_boundary_rule( + AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition + ) + too_many_rules = [access_boundary_rule] * 11 + with pytest.raises(ValueError) as excinfo: + make_credential_access_boundary(too_many_rules) + + assert excinfo.match( + "Credential access boundary rules can have a maximum of 10 rules." + ) + + def test_to_json(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + access_boundary_rule = make_access_boundary_rule( + AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition + ) + rules = [access_boundary_rule] + credential_access_boundary = make_credential_access_boundary(rules) + + assert credential_access_boundary.to_json() == { + "accessBoundary": { + "accessBoundaryRules": [ + { + "availablePermissions": AVAILABLE_PERMISSIONS, + "availableResource": AVAILABLE_RESOURCE, + "availabilityCondition": { + "expression": EXPRESSION, + "title": TITLE, + "description": DESCRIPTION, + }, + } + ] + } + } + + +class TestCredentials(object): + @staticmethod + def make_credentials(source_credentials=SourceCredentials(), quota_project_id=None): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + access_boundary_rule = make_access_boundary_rule( + AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition + ) + rules = [access_boundary_rule] + credential_access_boundary = make_credential_access_boundary(rules) + + return downscoped.Credentials( + source_credentials, credential_access_boundary, quota_project_id + ) + + @staticmethod + def make_mock_request(data, status=http_client.OK): + response = mock.create_autospec(transport.Response, instance=True) + response.status = status + response.data = json.dumps(data).encode("utf-8") + + request = mock.create_autospec(transport.Request) + request.return_value = response + + return request + + @staticmethod + def assert_request_kwargs(request_kwargs, headers, request_data): + """Asserts the request was called with the expected parameters. + """ + assert request_kwargs["url"] == TOKEN_EXCHANGE_ENDPOINT + assert request_kwargs["method"] == "POST" + assert request_kwargs["headers"] == headers + assert request_kwargs["body"] is not None + body_tuples = urllib.parse.parse_qsl(request_kwargs["body"]) + for (k, v) in body_tuples: + assert v.decode("utf-8") == request_data[k.decode("utf-8")] + assert len(body_tuples) == len(request_data.keys()) + + def test_default_state(self): + credentials = self.make_credentials() + + # No token acquired yet. + assert not credentials.token + assert not credentials.valid + # Expiration hasn't been set yet. + assert not credentials.expiry + assert not credentials.expired + # No quota project ID set. + assert not credentials.quota_project_id + + def test_with_quota_project(self): + credentials = self.make_credentials() + + assert not credentials.quota_project_id + + quota_project_creds = credentials.with_quota_project("project-foo") + + assert quota_project_creds.quota_project_id == "project-foo" + + @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) + def test_refresh(self, unused_utcnow): + response = SUCCESS_RESPONSE.copy() + # Test custom expiration to confirm expiry is set correctly. + response["expires_in"] = 2800 + expected_expiry = datetime.datetime.min + datetime.timedelta( + seconds=response["expires_in"] + ) + headers = {"Content-Type": "application/x-www-form-urlencoded"} + request_data = { + "grant_type": GRANT_TYPE, + "subject_token": "ACCESS_TOKEN_1", + "subject_token_type": SUBJECT_TOKEN_TYPE, + "requested_token_type": REQUESTED_TOKEN_TYPE, + "options": urllib.parse.quote(json.dumps(CREDENTIAL_ACCESS_BOUNDARY_JSON)), + } + request = self.make_mock_request(status=http_client.OK, data=response) + source_credentials = SourceCredentials() + credentials = self.make_credentials(source_credentials=source_credentials) + + # Spy on calls to source credentials refresh to confirm the expected request + # instance is used. + with mock.patch.object( + source_credentials, "refresh", wraps=source_credentials.refresh + ) as wrapped_souce_cred_refresh: + credentials.refresh(request) + + self.assert_request_kwargs(request.call_args[1], headers, request_data) + assert credentials.valid + assert credentials.expiry == expected_expiry + assert not credentials.expired + assert credentials.token == response["access_token"] + # Confirm source credentials called with the same request instance. + wrapped_souce_cred_refresh.assert_called_with(request) + + @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) + def test_refresh_without_response_expires_in(self, unused_utcnow): + response = SUCCESS_RESPONSE.copy() + # Simulate the response is missing the expires_in field. + # The downscoped token expiration should match the source credentials + # expiration. + del response["expires_in"] + expected_expires_in = 1800 + # Simulate the source credentials generates a token with 1800 second + # expiration time. The generated downscoped token should have the same + # expiration time. + source_credentials = SourceCredentials(expires_in=expected_expires_in) + expected_expiry = datetime.datetime.min + datetime.timedelta( + seconds=expected_expires_in + ) + headers = {"Content-Type": "application/x-www-form-urlencoded"} + request_data = { + "grant_type": GRANT_TYPE, + "subject_token": "ACCESS_TOKEN_1", + "subject_token_type": SUBJECT_TOKEN_TYPE, + "requested_token_type": REQUESTED_TOKEN_TYPE, + "options": urllib.parse.quote(json.dumps(CREDENTIAL_ACCESS_BOUNDARY_JSON)), + } + request = self.make_mock_request(status=http_client.OK, data=response) + credentials = self.make_credentials(source_credentials=source_credentials) + + # Spy on calls to source credentials refresh to confirm the expected request + # instance is used. + with mock.patch.object( + source_credentials, "refresh", wraps=source_credentials.refresh + ) as wrapped_souce_cred_refresh: + credentials.refresh(request) + + self.assert_request_kwargs(request.call_args[1], headers, request_data) + assert credentials.valid + assert credentials.expiry == expected_expiry + assert not credentials.expired + assert credentials.token == response["access_token"] + # Confirm source credentials called with the same request instance. + wrapped_souce_cred_refresh.assert_called_with(request) + + def test_refresh_token_exchange_error(self): + request = self.make_mock_request( + status=http_client.BAD_REQUEST, data=ERROR_RESPONSE + ) + credentials = self.make_credentials() + + with pytest.raises(exceptions.OAuthError) as excinfo: + credentials.refresh(request) + + assert excinfo.match( + r"Error code invalid_grant: Subject token is invalid. - https://tools.ietf.org/html/rfc6749" + ) + assert not credentials.expired + assert credentials.token is None + + def test_refresh_source_credentials_refresh_error(self): + # Initialize downscoped credentials with source credentials that raise + # an error on refresh. + credentials = self.make_credentials( + source_credentials=SourceCredentials(raise_error=True) + ) + + with pytest.raises(exceptions.RefreshError) as excinfo: + credentials.refresh(mock.sentinel.request) + + assert excinfo.match(r"Failed to refresh access token in source credentials.") + assert not credentials.expired + assert credentials.token is None + + def test_apply_without_quota_project_id(self): + headers = {} + request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE) + credentials = self.make_credentials() + + credentials.refresh(request) + credentials.apply(headers) + + assert headers == { + "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]) + } + + def test_apply_with_quota_project_id(self): + headers = {"other": "header-value"} + request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE) + credentials = self.make_credentials(quota_project_id=QUOTA_PROJECT_ID) + + credentials.refresh(request) + credentials.apply(headers) + + assert headers == { + "other": "header-value", + "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]), + "x-goog-user-project": QUOTA_PROJECT_ID, + } + + def test_before_request(self): + headers = {"other": "header-value"} + request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE) + credentials = self.make_credentials() + + # First call should call refresh, setting the token. + credentials.before_request(request, "POST", "https://example.com/api", headers) + + assert headers == { + "other": "header-value", + "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]), + } + + # Second call shouldn't call refresh (request should be untouched). + credentials.before_request( + mock.sentinel.request, "POST", "https://example.com/api", headers + ) + + assert headers == { + "other": "header-value", + "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]), + } + + @mock.patch("google.auth._helpers.utcnow") + def test_before_request_expired(self, utcnow): + headers = {} + request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE) + credentials = self.make_credentials() + credentials.token = "token" + utcnow.return_value = datetime.datetime.min + # Set the expiration to one second more than now plus the clock skew + # accommodation. These credentials should be valid. + credentials.expiry = ( + datetime.datetime.min + + _helpers.REFRESH_THRESHOLD + + datetime.timedelta(seconds=1) + ) + + assert credentials.valid + assert not credentials.expired + + credentials.before_request(request, "POST", "https://example.com/api", headers) + + # Cached token should be used. + assert headers == {"authorization": "Bearer token"} + + # Next call should simulate 1 second passed. + utcnow.return_value = datetime.datetime.min + datetime.timedelta(seconds=1) + + assert not credentials.valid + assert credentials.expired + + credentials.before_request(request, "POST", "https://example.com/api", headers) + + # New token should be retrieved. + assert headers == { + "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]) + } |