aboutsummaryrefslogtreecommitdiff
path: root/system_tests/system_tests_sync/test_downscoping.py
diff options
context:
space:
mode:
Diffstat (limited to 'system_tests/system_tests_sync/test_downscoping.py')
-rw-r--r--system_tests/system_tests_sync/test_downscoping.py162
1 files changed, 162 insertions, 0 deletions
diff --git a/system_tests/system_tests_sync/test_downscoping.py b/system_tests/system_tests_sync/test_downscoping.py
new file mode 100644
index 0000000..fdb4efa
--- /dev/null
+++ b/system_tests/system_tests_sync/test_downscoping.py
@@ -0,0 +1,162 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import re
+import uuid
+
+import google.auth
+
+from google.auth import downscoped
+from google.auth.transport import requests
+from google.cloud import exceptions
+from google.cloud import storage
+from google.oauth2 import credentials
+
+import pytest
+
+ # The object prefix used to test access to files beginning with this prefix.
+_OBJECT_PREFIX = "customer-a"
+# The object name of the object inaccessible by the downscoped token.
+_ACCESSIBLE_OBJECT_NAME = "{0}-data.txt".format(_OBJECT_PREFIX)
+# The content of the object accessible by the downscoped token.
+_ACCESSIBLE_CONTENT = "hello world"
+# The content of the object inaccessible by the downscoped token.
+_INACCESSIBLE_CONTENT = "secret content"
+# The object name of the object inaccessible by the downscoped token.
+_INACCESSIBLE_OBJECT_NAME = "other-customer-data.txt"
+
+
+@pytest.fixture(scope="module")
+def temp_bucket():
+ """Yields a bucket that is deleted after the test completes."""
+ bucket = None
+ while bucket is None or bucket.exists():
+ bucket_name = "auth-python-downscope-test-{}".format(uuid.uuid4())
+ bucket = storage.Client().bucket(bucket_name)
+ bucket = storage.Client().create_bucket(bucket.name)
+ yield bucket
+ bucket.delete(force=True)
+
+
+@pytest.fixture(scope="module")
+def temp_blobs(temp_bucket):
+ """Yields two blobs that are deleted after the test completes."""
+ bucket = temp_bucket
+ # Downscoped tokens will have readonly access to this blob.
+ accessible_blob = bucket.blob(_ACCESSIBLE_OBJECT_NAME)
+ accessible_blob.upload_from_string(_ACCESSIBLE_CONTENT)
+ # Downscoped tokens will have no access to this blob.
+ inaccessible_blob = bucket.blob(_INACCESSIBLE_OBJECT_NAME)
+ inaccessible_blob.upload_from_string(_INACCESSIBLE_CONTENT)
+ yield (accessible_blob, inaccessible_blob)
+ bucket.delete_blobs([accessible_blob, inaccessible_blob])
+
+
+def get_token_from_broker(bucket_name, object_prefix):
+ """Simulates token broker generating downscoped tokens for specified bucket.
+
+ Args:
+ bucket_name (str): The name of the Cloud Storage bucket.
+ object_prefix (str): The prefix string of the object name. This is used
+ to ensure access is restricted to only objects starting with this
+ prefix string.
+
+ Returns:
+ Tuple[str, datetime.datetime]: The downscoped access token and its expiry date.
+ """
+ # Initialize the Credential Access Boundary rules.
+ available_resource = "//storage.googleapis.com/projects/_/buckets/{0}".format(bucket_name)
+ # Downscoped credentials will have readonly access to the resource.
+ available_permissions = ["inRole:roles/storage.objectViewer"]
+ # Only objects starting with the specified prefix string in the object name
+ # will be allowed read access.
+ availability_expression = (
+ "resource.name.startsWith('projects/_/buckets/{0}/objects/{1}')".format(bucket_name, object_prefix)
+ )
+ availability_condition = downscoped.AvailabilityCondition(availability_expression)
+ # Define the single access boundary rule using the above properties.
+ rule = downscoped.AccessBoundaryRule(
+ available_resource=available_resource,
+ available_permissions=available_permissions,
+ availability_condition=availability_condition,
+ )
+ # Define the Credential Access Boundary with all the relevant rules.
+ credential_access_boundary = downscoped.CredentialAccessBoundary(rules=[rule])
+
+ # Retrieve the source credentials via ADC.
+ source_credentials, _ = google.auth.default()
+ if source_credentials.requires_scopes:
+ source_credentials = source_credentials.with_scopes(
+ ["https://www.googleapis.com/auth/cloud-platform"]
+ )
+
+ # Create the downscoped credentials.
+ downscoped_credentials = downscoped.Credentials(
+ source_credentials=source_credentials,
+ credential_access_boundary=credential_access_boundary,
+ )
+
+ # Refresh the tokens.
+ downscoped_credentials.refresh(requests.Request())
+
+ # These values will need to be passed to the token consumer.
+ access_token = downscoped_credentials.token
+ expiry = downscoped_credentials.expiry
+ return (access_token, expiry)
+
+
+def test_downscoping(temp_blobs):
+ """Tests token consumer access to cloud storage using downscoped tokens.
+
+ Args:
+ temp_blobs (Tuple[google.cloud.storage.blob.Blob, ...]): The temporarily
+ created test cloud storage blobs (one readonly accessible, the other
+ not).
+ """
+ accessible_blob, inaccessible_blob = temp_blobs
+ bucket_name = accessible_blob.bucket.name
+ # Create the OAuth credentials from the downscoped token and pass a
+ # refresh handler to handle token expiration. We are passing a
+ # refresh_handler instead of a one-time access token/expiry pair.
+ # This will allow testing this on-demand method for getting access tokens.
+ def refresh_handler(request, scopes=None):
+ # Get readonly access tokens to objects with accessible prefix in
+ # the temporarily created bucket.
+ return get_token_from_broker(bucket_name, _OBJECT_PREFIX)
+
+ creds = credentials.Credentials(
+ None,
+ scopes=["https://www.googleapis.com/auth/cloud-platform"],
+ refresh_handler=refresh_handler,
+ )
+
+ # Initialize a Cloud Storage client with the oauth2 credentials.
+ storage_client = storage.Client(credentials=creds)
+
+ # Test read access succeeds to accessible blob.
+ bucket = storage_client.bucket(bucket_name)
+ blob = bucket.blob(accessible_blob.name)
+ assert blob.download_as_bytes().decode("utf-8") == _ACCESSIBLE_CONTENT
+
+ # Test write access fails.
+ with pytest.raises(exceptions.Forbidden) as excinfo:
+ blob.upload_from_string("Write operations are not allowed")
+
+ assert excinfo.match(r"does not have storage.objects.create access")
+
+ # Test read access fails to inaccessible blob.
+ with pytest.raises(exceptions.Forbidden) as excinfo:
+ bucket.blob(inaccessible_blob.name).download_as_bytes()
+
+ assert excinfo.match(r"does not have storage.objects.get access")