aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbojeil-google <bojeil-google@users.noreply.github.com>2021-08-16 12:34:28 -0700
committerGitHub <noreply@github.com>2021-08-16 12:34:28 -0700
commited8e035045dcc3c398efae9e57ba2e204fbcc354 (patch)
treeecef3215c6c4ae82254e610ad804447afd9d5f2d
parent5286d6f2bfe78ece3ccbb73c96254800705b0b21 (diff)
downloadgoogle-auth-library-python-ed8e035045dcc3c398efae9e57ba2e204fbcc354.tar.gz
test: Add integration tests for downscoping (#837)
This tests creates a temporary bucket and 2 objects in it. A downscoped token is then created to access only one of the objects (readonly). The test would then check: - Read access to accessible object is successful. - Write access to that object is unsuccessful. - Read access to the inaccessible object is not successful.
-rw-r--r--CONTRIBUTING.rst1
-rw-r--r--system_tests/noxfile.py14
-rw-r--r--system_tests/system_tests_sync/test_downscoping.py163
3 files changed, 178 insertions, 0 deletions
diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index 8410899..0e784af 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -97,6 +97,7 @@ Grant the account associated with ``service_account.json`` the following roles.
- Pub/Sub Viewer (for gRPC tests)
- Storage Object Viewer (for impersonated credentials tests)
- DNS Viewer (for workload identity federation tests)
+- GCE Storage Bucket Admin (for downscoping tests)
``impersonated_service_account.json``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
diff --git a/system_tests/noxfile.py b/system_tests/noxfile.py
index 33e49c3..540727e 100644
--- a/system_tests/noxfile.py
+++ b/system_tests/noxfile.py
@@ -350,6 +350,20 @@ def external_accounts(session):
)
+@nox.session(python=PYTHON_VERSIONS_SYNC)
+def downscoping(session):
+ session.install(
+ *TEST_DEPENDENCIES_SYNC,
+ "google-auth",
+ "google-cloud-storage",
+ )
+ default(
+ session,
+ "system_tests_sync/test_downscoping.py",
+ *session.posargs,
+ )
+
+
# ASYNC SYSTEM TESTS
@nox.session(python=PYTHON_VERSIONS_ASYNC)
diff --git a/system_tests/system_tests_sync/test_downscoping.py b/system_tests/system_tests_sync/test_downscoping.py
new file mode 100644
index 0000000..3e62aba
--- /dev/null
+++ b/system_tests/system_tests_sync/test_downscoping.py
@@ -0,0 +1,163 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import re
+import uuid
+
+import google.auth
+
+from google.auth import downscoped
+from google.auth.transport import requests
+from google.cloud import exceptions
+from google.cloud import storage
+from google.oauth2 import credentials
+
+import pytest
+
+ # The object prefix used to test access to files beginning with this prefix.
+_OBJECT_PREFIX = "customer-a"
+# The object name of the object inaccessible by the downscoped token.
+_ACCESSIBLE_OBJECT_NAME = f"{_OBJECT_PREFIX}-data.txt"
+# The content of the object accessible by the downscoped token.
+_ACCESSIBLE_CONTENT = "hello world"
+# The content of the object inaccessible by the downscoped token.
+_INACCESSIBLE_CONTENT = "secret content"
+# The object name of the object inaccessible by the downscoped token.
+_INACCESSIBLE_OBJECT_NAME = "other-customer-data.txt"
+
+
+@pytest.fixture
+def temp_bucket():
+ """Yields a bucket that is deleted after the test completes."""
+ bucket = None
+ while bucket is None or bucket.exists():
+ bucket_name = "bucket-downscoping-test-{}".format(uuid.uuid4())
+ bucket = storage.Client().bucket(bucket_name)
+ bucket = storage.Client().create_bucket(bucket.name)
+ yield bucket
+ bucket.delete(force=True)
+
+
+@pytest.fixture
+def temp_blobs(temp_bucket):
+ """Yields a blob that is deleted after the test completes."""
+ bucket = temp_bucket
+ # Downscoped tokens will have readonly access to this blob.
+ accessible_blob = bucket.blob(_ACCESSIBLE_OBJECT_NAME)
+ accessible_blob.upload_from_string(_ACCESSIBLE_CONTENT)
+ # Downscoped tokens will have no access to this blob.
+ inaccessible_blob = bucket.blob(_INACCESSIBLE_OBJECT_NAME)
+ inaccessible_blob.upload_from_string(_INACCESSIBLE_CONTENT)
+ yield (accessible_blob, inaccessible_blob)
+
+
+def get_token_from_broker(bucket_name, object_prefix):
+ """Simulates token broker generating downscoped tokens for specified bucket.
+
+ Args:
+ bucket_name (str): The name of the Cloud Storage bucket.
+ object_prefix (str): The prefix string of the object name. This is used
+ to ensure access is restricted to only objects starting with this
+ prefix string.
+
+ Returns:
+ Tuple[str, datetime.datetime]: The downscoped access token and its expiry date.
+ """
+ # Initialize the Credential Access Boundary rules.
+ available_resource = f"//storage.googleapis.com/projects/_/buckets/{bucket_name}"
+ # Downscoped credentials will have readonly access to the resource.
+ available_permissions = ["inRole:roles/storage.objectViewer"]
+ # Only objects starting with the specified prefix string in the object name
+ # will be allowed read access.
+ availability_expression = (
+ "resource.name.startsWith('projects/_/buckets/{}/objects/{}')".format(
+ bucket_name, object_prefix
+ )
+ )
+ availability_condition = downscoped.AvailabilityCondition(availability_expression)
+ # Define the single access boundary rule using the above properties.
+ rule = downscoped.AccessBoundaryRule(
+ available_resource=available_resource,
+ available_permissions=available_permissions,
+ availability_condition=availability_condition,
+ )
+ # Define the Credential Access Boundary with all the relevant rules.
+ credential_access_boundary = downscoped.CredentialAccessBoundary(rules=[rule])
+
+ # Retrieve the source credentials via ADC.
+ source_credentials, _ = google.auth.default()
+ if source_credentials.requires_scopes:
+ source_credentials = source_credentials.with_scopes(
+ ["https://www.googleapis.com/auth/cloud-platform"]
+ )
+
+ # Create the downscoped credentials.
+ downscoped_credentials = downscoped.Credentials(
+ source_credentials=source_credentials,
+ credential_access_boundary=credential_access_boundary,
+ )
+
+ # Refresh the tokens.
+ downscoped_credentials.refresh(requests.Request())
+
+ # These values will need to be passed to the token consumer.
+ access_token = downscoped_credentials.token
+ expiry = downscoped_credentials.expiry
+ return (access_token, expiry)
+
+
+def test_downscoping(temp_blobs):
+ """Tests token consumer access to cloud storage using downscoped tokens.
+
+ Args:
+ temp_blobs (Tuple[google.cloud.storage.blob.Blob, ...]): The temporarily
+ created test cloud storage blobs (one readonly accessible, the other
+ not).
+ """
+ accessible_blob, inaccessible_blob = temp_blobs
+ bucket_name = accessible_blob.bucket.name
+ # Create the OAuth credentials from the downscoped token and pass a
+ # refresh handler to handle token expiration. We are passing a
+ # refresh_handler instead of a one-time access token/expiry pair.
+ # This will allow testing this on-demand method for getting access tokens.
+ def refresh_handler(request, scopes=None):
+ # Get readonly access tokens to objects with accessible prefix in
+ # the temporarily created bucket.
+ return get_token_from_broker(bucket_name, _OBJECT_PREFIX)
+
+ creds = credentials.Credentials(
+ None,
+ scopes=["https://www.googleapis.com/auth/cloud-platform"],
+ refresh_handler=refresh_handler,
+ )
+
+ # Initialize a Cloud Storage client with the oauth2 credentials.
+ storage_client = storage.Client(credentials=creds)
+
+ # Test read access succeeds to accessible blob.
+ bucket = storage_client.bucket(bucket_name)
+ blob = bucket.blob(accessible_blob.name)
+ assert blob.download_as_bytes().decode("utf-8") == _ACCESSIBLE_CONTENT
+
+ # Test write access fails.
+ with pytest.raises(exceptions.GoogleCloudError) as excinfo:
+ blob.upload_from_string("Write operations are not allowed")
+
+ assert excinfo.match(r"does not have storage.objects.create access")
+
+ # Test read access fails to inaccessible blob.
+ with pytest.raises(exceptions.GoogleCloudError) as excinfo:
+ bucket.blob(inaccessible_blob.name).download_as_bytes()
+
+ assert excinfo.match(r"does not have storage.objects.get access")