diff options
author | bojeil-google <bojeil-google@users.noreply.github.com> | 2021-08-16 12:34:28 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-08-16 12:34:28 -0700 |
commit | ed8e035045dcc3c398efae9e57ba2e204fbcc354 (patch) | |
tree | ecef3215c6c4ae82254e610ad804447afd9d5f2d | |
parent | 5286d6f2bfe78ece3ccbb73c96254800705b0b21 (diff) | |
download | google-auth-library-python-ed8e035045dcc3c398efae9e57ba2e204fbcc354.tar.gz |
test: Add integration tests for downscoping (#837)
This tests creates a temporary bucket and 2 objects in it.
A downscoped token is then created to access only one of the objects (readonly).
The test would then check:
- Read access to accessible object is successful.
- Write access to that object is unsuccessful.
- Read access to the inaccessible object is not successful.
-rw-r--r-- | CONTRIBUTING.rst | 1 | ||||
-rw-r--r-- | system_tests/noxfile.py | 14 | ||||
-rw-r--r-- | system_tests/system_tests_sync/test_downscoping.py | 163 |
3 files changed, 178 insertions, 0 deletions
diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index 8410899..0e784af 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -97,6 +97,7 @@ Grant the account associated with ``service_account.json`` the following roles. - Pub/Sub Viewer (for gRPC tests) - Storage Object Viewer (for impersonated credentials tests) - DNS Viewer (for workload identity federation tests) +- GCE Storage Bucket Admin (for downscoping tests) ``impersonated_service_account.json`` ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/system_tests/noxfile.py b/system_tests/noxfile.py index 33e49c3..540727e 100644 --- a/system_tests/noxfile.py +++ b/system_tests/noxfile.py @@ -350,6 +350,20 @@ def external_accounts(session): ) +@nox.session(python=PYTHON_VERSIONS_SYNC) +def downscoping(session): + session.install( + *TEST_DEPENDENCIES_SYNC, + "google-auth", + "google-cloud-storage", + ) + default( + session, + "system_tests_sync/test_downscoping.py", + *session.posargs, + ) + + # ASYNC SYSTEM TESTS @nox.session(python=PYTHON_VERSIONS_ASYNC) diff --git a/system_tests/system_tests_sync/test_downscoping.py b/system_tests/system_tests_sync/test_downscoping.py new file mode 100644 index 0000000..3e62aba --- /dev/null +++ b/system_tests/system_tests_sync/test_downscoping.py @@ -0,0 +1,163 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import re +import uuid + +import google.auth + +from google.auth import downscoped +from google.auth.transport import requests +from google.cloud import exceptions +from google.cloud import storage +from google.oauth2 import credentials + +import pytest + + # The object prefix used to test access to files beginning with this prefix. +_OBJECT_PREFIX = "customer-a" +# The object name of the object inaccessible by the downscoped token. +_ACCESSIBLE_OBJECT_NAME = f"{_OBJECT_PREFIX}-data.txt" +# The content of the object accessible by the downscoped token. +_ACCESSIBLE_CONTENT = "hello world" +# The content of the object inaccessible by the downscoped token. +_INACCESSIBLE_CONTENT = "secret content" +# The object name of the object inaccessible by the downscoped token. +_INACCESSIBLE_OBJECT_NAME = "other-customer-data.txt" + + +@pytest.fixture +def temp_bucket(): + """Yields a bucket that is deleted after the test completes.""" + bucket = None + while bucket is None or bucket.exists(): + bucket_name = "bucket-downscoping-test-{}".format(uuid.uuid4()) + bucket = storage.Client().bucket(bucket_name) + bucket = storage.Client().create_bucket(bucket.name) + yield bucket + bucket.delete(force=True) + + +@pytest.fixture +def temp_blobs(temp_bucket): + """Yields a blob that is deleted after the test completes.""" + bucket = temp_bucket + # Downscoped tokens will have readonly access to this blob. + accessible_blob = bucket.blob(_ACCESSIBLE_OBJECT_NAME) + accessible_blob.upload_from_string(_ACCESSIBLE_CONTENT) + # Downscoped tokens will have no access to this blob. + inaccessible_blob = bucket.blob(_INACCESSIBLE_OBJECT_NAME) + inaccessible_blob.upload_from_string(_INACCESSIBLE_CONTENT) + yield (accessible_blob, inaccessible_blob) + + +def get_token_from_broker(bucket_name, object_prefix): + """Simulates token broker generating downscoped tokens for specified bucket. + + Args: + bucket_name (str): The name of the Cloud Storage bucket. + object_prefix (str): The prefix string of the object name. This is used + to ensure access is restricted to only objects starting with this + prefix string. + + Returns: + Tuple[str, datetime.datetime]: The downscoped access token and its expiry date. + """ + # Initialize the Credential Access Boundary rules. + available_resource = f"//storage.googleapis.com/projects/_/buckets/{bucket_name}" + # Downscoped credentials will have readonly access to the resource. + available_permissions = ["inRole:roles/storage.objectViewer"] + # Only objects starting with the specified prefix string in the object name + # will be allowed read access. + availability_expression = ( + "resource.name.startsWith('projects/_/buckets/{}/objects/{}')".format( + bucket_name, object_prefix + ) + ) + availability_condition = downscoped.AvailabilityCondition(availability_expression) + # Define the single access boundary rule using the above properties. + rule = downscoped.AccessBoundaryRule( + available_resource=available_resource, + available_permissions=available_permissions, + availability_condition=availability_condition, + ) + # Define the Credential Access Boundary with all the relevant rules. + credential_access_boundary = downscoped.CredentialAccessBoundary(rules=[rule]) + + # Retrieve the source credentials via ADC. + source_credentials, _ = google.auth.default() + if source_credentials.requires_scopes: + source_credentials = source_credentials.with_scopes( + ["https://www.googleapis.com/auth/cloud-platform"] + ) + + # Create the downscoped credentials. + downscoped_credentials = downscoped.Credentials( + source_credentials=source_credentials, + credential_access_boundary=credential_access_boundary, + ) + + # Refresh the tokens. + downscoped_credentials.refresh(requests.Request()) + + # These values will need to be passed to the token consumer. + access_token = downscoped_credentials.token + expiry = downscoped_credentials.expiry + return (access_token, expiry) + + +def test_downscoping(temp_blobs): + """Tests token consumer access to cloud storage using downscoped tokens. + + Args: + temp_blobs (Tuple[google.cloud.storage.blob.Blob, ...]): The temporarily + created test cloud storage blobs (one readonly accessible, the other + not). + """ + accessible_blob, inaccessible_blob = temp_blobs + bucket_name = accessible_blob.bucket.name + # Create the OAuth credentials from the downscoped token and pass a + # refresh handler to handle token expiration. We are passing a + # refresh_handler instead of a one-time access token/expiry pair. + # This will allow testing this on-demand method for getting access tokens. + def refresh_handler(request, scopes=None): + # Get readonly access tokens to objects with accessible prefix in + # the temporarily created bucket. + return get_token_from_broker(bucket_name, _OBJECT_PREFIX) + + creds = credentials.Credentials( + None, + scopes=["https://www.googleapis.com/auth/cloud-platform"], + refresh_handler=refresh_handler, + ) + + # Initialize a Cloud Storage client with the oauth2 credentials. + storage_client = storage.Client(credentials=creds) + + # Test read access succeeds to accessible blob. + bucket = storage_client.bucket(bucket_name) + blob = bucket.blob(accessible_blob.name) + assert blob.download_as_bytes().decode("utf-8") == _ACCESSIBLE_CONTENT + + # Test write access fails. + with pytest.raises(exceptions.GoogleCloudError) as excinfo: + blob.upload_from_string("Write operations are not allowed") + + assert excinfo.match(r"does not have storage.objects.create access") + + # Test read access fails to inaccessible blob. + with pytest.raises(exceptions.GoogleCloudError) as excinfo: + bucket.blob(inaccessible_blob.name).download_as_bytes() + + assert excinfo.match(r"does not have storage.objects.get access") |