aboutsummaryrefslogtreecommitdiff
path: root/tests/test_identity_pool.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_identity_pool.py')
-rw-r--r--tests/test_identity_pool.py213
1 files changed, 211 insertions, 2 deletions
diff --git a/tests/test_identity_pool.py b/tests/test_identity_pool.py
index efe11b0..e90e288 100644
--- a/tests/test_identity_pool.py
+++ b/tests/test_identity_pool.py
@@ -53,6 +53,11 @@ with open(SUBJECT_TOKEN_JSON_FILE) as fh:
TOKEN_URL = "https://sts.googleapis.com/v1/token"
SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID"
+WORKFORCE_AUDIENCE = (
+ "//iam.googleapis.com/locations/global/workforcePools/POOL_ID/providers/PROVIDER_ID"
+)
+WORKFORCE_SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:id_token"
+WORKFORCE_POOL_USER_PROJECT = "WORKFORCE_POOL_USER_PROJECT_NUMBER"
class TestCredentials(object):
@@ -158,6 +163,7 @@ class TestCredentials(object):
credential_data=None,
scopes=None,
default_scopes=None,
+ workforce_pool_user_project=None,
):
"""Utility to assert that a credentials are initialized with the expected
attributes by calling refresh functionality and confirming response matches
@@ -183,6 +189,10 @@ class TestCredentials(object):
"subject_token": subject_token,
"subject_token_type": subject_token_type,
}
+ if workforce_pool_user_project:
+ token_request_data["options"] = urllib.parse.quote(
+ json.dumps({"userProject": workforce_pool_user_project})
+ )
if service_account_impersonation_url:
# Service account impersonation request/response.
@@ -250,6 +260,8 @@ class TestCredentials(object):
@classmethod
def make_credentials(
cls,
+ audience=AUDIENCE,
+ subject_token_type=SUBJECT_TOKEN_TYPE,
client_id=None,
client_secret=None,
quota_project_id=None,
@@ -257,10 +269,11 @@ class TestCredentials(object):
default_scopes=None,
service_account_impersonation_url=None,
credential_source=None,
+ workforce_pool_user_project=None,
):
return identity_pool.Credentials(
- audience=AUDIENCE,
- subject_token_type=SUBJECT_TOKEN_TYPE,
+ audience=audience,
+ subject_token_type=subject_token_type,
token_url=TOKEN_URL,
service_account_impersonation_url=service_account_impersonation_url,
credential_source=credential_source,
@@ -269,6 +282,7 @@ class TestCredentials(object):
quota_project_id=quota_project_id,
scopes=scopes,
default_scopes=default_scopes,
+ workforce_pool_user_project=workforce_pool_user_project,
)
@mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
@@ -297,6 +311,7 @@ class TestCredentials(object):
client_secret=CLIENT_SECRET,
credential_source=self.CREDENTIAL_SOURCE_TEXT,
quota_project_id=QUOTA_PROJECT_ID,
+ workforce_pool_user_project=None,
)
@mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
@@ -321,6 +336,33 @@ class TestCredentials(object):
client_secret=None,
credential_source=self.CREDENTIAL_SOURCE_TEXT,
quota_project_id=None,
+ workforce_pool_user_project=None,
+ )
+
+ @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
+ def test_from_info_workforce_pool(self, mock_init):
+ credentials = identity_pool.Credentials.from_info(
+ {
+ "audience": WORKFORCE_AUDIENCE,
+ "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE,
+ "token_url": TOKEN_URL,
+ "credential_source": self.CREDENTIAL_SOURCE_TEXT,
+ "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
+ }
+ )
+
+ # Confirm identity_pool.Credentials instantiated with expected attributes.
+ assert isinstance(credentials, identity_pool.Credentials)
+ mock_init.assert_called_once_with(
+ audience=WORKFORCE_AUDIENCE,
+ subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
+ token_url=TOKEN_URL,
+ service_account_impersonation_url=None,
+ client_id=None,
+ client_secret=None,
+ credential_source=self.CREDENTIAL_SOURCE_TEXT,
+ quota_project_id=None,
+ workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
)
@mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
@@ -350,6 +392,7 @@ class TestCredentials(object):
client_secret=CLIENT_SECRET,
credential_source=self.CREDENTIAL_SOURCE_TEXT,
quota_project_id=QUOTA_PROJECT_ID,
+ workforce_pool_user_project=None,
)
@mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
@@ -375,6 +418,46 @@ class TestCredentials(object):
client_secret=None,
credential_source=self.CREDENTIAL_SOURCE_TEXT,
quota_project_id=None,
+ workforce_pool_user_project=None,
+ )
+
+ @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
+ def test_from_file_workforce_pool(self, mock_init, tmpdir):
+ info = {
+ "audience": WORKFORCE_AUDIENCE,
+ "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE,
+ "token_url": TOKEN_URL,
+ "credential_source": self.CREDENTIAL_SOURCE_TEXT,
+ "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
+ }
+ config_file = tmpdir.join("config.json")
+ config_file.write(json.dumps(info))
+ credentials = identity_pool.Credentials.from_file(str(config_file))
+
+ # Confirm identity_pool.Credentials instantiated with expected attributes.
+ assert isinstance(credentials, identity_pool.Credentials)
+ mock_init.assert_called_once_with(
+ audience=WORKFORCE_AUDIENCE,
+ subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
+ token_url=TOKEN_URL,
+ service_account_impersonation_url=None,
+ client_id=None,
+ client_secret=None,
+ credential_source=self.CREDENTIAL_SOURCE_TEXT,
+ quota_project_id=None,
+ workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
+ )
+
+ def test_constructor_nonworkforce_with_workforce_pool_user_project(self):
+ with pytest.raises(ValueError) as excinfo:
+ self.make_credentials(
+ audience=AUDIENCE,
+ workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
+ )
+
+ assert excinfo.match(
+ "workforce_pool_user_project should not be set for non-workforce "
+ "pool credentials"
)
def test_constructor_invalid_options(self):
@@ -430,6 +513,23 @@ class TestCredentials(object):
r"Missing subject_token_field_name for JSON credential_source format"
)
+ def test_info_with_workforce_pool_user_project(self):
+ credentials = self.make_credentials(
+ audience=WORKFORCE_AUDIENCE,
+ subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
+ credential_source=self.CREDENTIAL_SOURCE_TEXT_URL.copy(),
+ workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
+ )
+
+ assert credentials.info == {
+ "type": "external_account",
+ "audience": WORKFORCE_AUDIENCE,
+ "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE,
+ "token_url": TOKEN_URL,
+ "credential_source": self.CREDENTIAL_SOURCE_TEXT_URL,
+ "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
+ }
+
def test_info_with_file_credential_source(self):
credentials = self.make_credentials(
credential_source=self.CREDENTIAL_SOURCE_TEXT_URL.copy()
@@ -557,6 +657,115 @@ class TestCredentials(object):
default_scopes=["ignored"],
)
+ def test_refresh_workforce_success_with_client_auth_without_impersonation(self):
+ credentials = self.make_credentials(
+ audience=WORKFORCE_AUDIENCE,
+ subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
+ client_id=CLIENT_ID,
+ client_secret=CLIENT_SECRET,
+ # Test with text format type.
+ credential_source=self.CREDENTIAL_SOURCE_TEXT,
+ scopes=SCOPES,
+ # This will be ignored in favor of client auth.
+ workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
+ )
+
+ self.assert_underlying_credentials_refresh(
+ credentials=credentials,
+ audience=WORKFORCE_AUDIENCE,
+ subject_token=TEXT_FILE_SUBJECT_TOKEN,
+ subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
+ token_url=TOKEN_URL,
+ service_account_impersonation_url=None,
+ basic_auth_encoding=BASIC_AUTH_ENCODING,
+ quota_project_id=None,
+ used_scopes=SCOPES,
+ scopes=SCOPES,
+ workforce_pool_user_project=None,
+ )
+
+ def test_refresh_workforce_success_with_client_auth_and_no_workforce_project(self):
+ credentials = self.make_credentials(
+ audience=WORKFORCE_AUDIENCE,
+ subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
+ client_id=CLIENT_ID,
+ client_secret=CLIENT_SECRET,
+ # Test with text format type.
+ credential_source=self.CREDENTIAL_SOURCE_TEXT,
+ scopes=SCOPES,
+ # This is not needed when client Auth is used.
+ workforce_pool_user_project=None,
+ )
+
+ self.assert_underlying_credentials_refresh(
+ credentials=credentials,
+ audience=WORKFORCE_AUDIENCE,
+ subject_token=TEXT_FILE_SUBJECT_TOKEN,
+ subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
+ token_url=TOKEN_URL,
+ service_account_impersonation_url=None,
+ basic_auth_encoding=BASIC_AUTH_ENCODING,
+ quota_project_id=None,
+ used_scopes=SCOPES,
+ scopes=SCOPES,
+ workforce_pool_user_project=None,
+ )
+
+ def test_refresh_workforce_success_without_client_auth_without_impersonation(self):
+ credentials = self.make_credentials(
+ audience=WORKFORCE_AUDIENCE,
+ subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
+ client_id=None,
+ client_secret=None,
+ # Test with text format type.
+ credential_source=self.CREDENTIAL_SOURCE_TEXT,
+ scopes=SCOPES,
+ # This will not be ignored as client auth is not used.
+ workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
+ )
+
+ self.assert_underlying_credentials_refresh(
+ credentials=credentials,
+ audience=WORKFORCE_AUDIENCE,
+ subject_token=TEXT_FILE_SUBJECT_TOKEN,
+ subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
+ token_url=TOKEN_URL,
+ service_account_impersonation_url=None,
+ basic_auth_encoding=None,
+ quota_project_id=None,
+ used_scopes=SCOPES,
+ scopes=SCOPES,
+ workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
+ )
+
+ def test_refresh_workforce_success_without_client_auth_with_impersonation(self):
+ credentials = self.make_credentials(
+ audience=WORKFORCE_AUDIENCE,
+ subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
+ client_id=None,
+ client_secret=None,
+ service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
+ # Test with text format type.
+ credential_source=self.CREDENTIAL_SOURCE_TEXT,
+ scopes=SCOPES,
+ # This will not be ignored as client auth is not used.
+ workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
+ )
+
+ self.assert_underlying_credentials_refresh(
+ credentials=credentials,
+ audience=WORKFORCE_AUDIENCE,
+ subject_token=TEXT_FILE_SUBJECT_TOKEN,
+ subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
+ token_url=TOKEN_URL,
+ service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
+ basic_auth_encoding=None,
+ quota_project_id=None,
+ used_scopes=SCOPES,
+ scopes=SCOPES,
+ workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
+ )
+
def test_refresh_text_file_success_without_impersonation_use_default_scopes(self):
credentials = self.make_credentials(
client_id=CLIENT_ID,