diff options
Diffstat (limited to 'tests/test_identity_pool.py')
-rw-r--r-- | tests/test_identity_pool.py | 213 |
1 files changed, 211 insertions, 2 deletions
diff --git a/tests/test_identity_pool.py b/tests/test_identity_pool.py index efe11b0..e90e288 100644 --- a/tests/test_identity_pool.py +++ b/tests/test_identity_pool.py @@ -53,6 +53,11 @@ with open(SUBJECT_TOKEN_JSON_FILE) as fh: TOKEN_URL = "https://sts.googleapis.com/v1/token" SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt" AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID" +WORKFORCE_AUDIENCE = ( + "//iam.googleapis.com/locations/global/workforcePools/POOL_ID/providers/PROVIDER_ID" +) +WORKFORCE_SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:id_token" +WORKFORCE_POOL_USER_PROJECT = "WORKFORCE_POOL_USER_PROJECT_NUMBER" class TestCredentials(object): @@ -158,6 +163,7 @@ class TestCredentials(object): credential_data=None, scopes=None, default_scopes=None, + workforce_pool_user_project=None, ): """Utility to assert that a credentials are initialized with the expected attributes by calling refresh functionality and confirming response matches @@ -183,6 +189,10 @@ class TestCredentials(object): "subject_token": subject_token, "subject_token_type": subject_token_type, } + if workforce_pool_user_project: + token_request_data["options"] = urllib.parse.quote( + json.dumps({"userProject": workforce_pool_user_project}) + ) if service_account_impersonation_url: # Service account impersonation request/response. @@ -250,6 +260,8 @@ class TestCredentials(object): @classmethod def make_credentials( cls, + audience=AUDIENCE, + subject_token_type=SUBJECT_TOKEN_TYPE, client_id=None, client_secret=None, quota_project_id=None, @@ -257,10 +269,11 @@ class TestCredentials(object): default_scopes=None, service_account_impersonation_url=None, credential_source=None, + workforce_pool_user_project=None, ): return identity_pool.Credentials( - audience=AUDIENCE, - subject_token_type=SUBJECT_TOKEN_TYPE, + audience=audience, + subject_token_type=subject_token_type, token_url=TOKEN_URL, service_account_impersonation_url=service_account_impersonation_url, credential_source=credential_source, @@ -269,6 +282,7 @@ class TestCredentials(object): quota_project_id=quota_project_id, scopes=scopes, default_scopes=default_scopes, + workforce_pool_user_project=workforce_pool_user_project, ) @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) @@ -297,6 +311,7 @@ class TestCredentials(object): client_secret=CLIENT_SECRET, credential_source=self.CREDENTIAL_SOURCE_TEXT, quota_project_id=QUOTA_PROJECT_ID, + workforce_pool_user_project=None, ) @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) @@ -321,6 +336,33 @@ class TestCredentials(object): client_secret=None, credential_source=self.CREDENTIAL_SOURCE_TEXT, quota_project_id=None, + workforce_pool_user_project=None, + ) + + @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) + def test_from_info_workforce_pool(self, mock_init): + credentials = identity_pool.Credentials.from_info( + { + "audience": WORKFORCE_AUDIENCE, + "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "credential_source": self.CREDENTIAL_SOURCE_TEXT, + "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT, + } + ) + + # Confirm identity_pool.Credentials instantiated with expected attributes. + assert isinstance(credentials, identity_pool.Credentials) + mock_init.assert_called_once_with( + audience=WORKFORCE_AUDIENCE, + subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=None, + client_id=None, + client_secret=None, + credential_source=self.CREDENTIAL_SOURCE_TEXT, + quota_project_id=None, + workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, ) @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) @@ -350,6 +392,7 @@ class TestCredentials(object): client_secret=CLIENT_SECRET, credential_source=self.CREDENTIAL_SOURCE_TEXT, quota_project_id=QUOTA_PROJECT_ID, + workforce_pool_user_project=None, ) @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) @@ -375,6 +418,46 @@ class TestCredentials(object): client_secret=None, credential_source=self.CREDENTIAL_SOURCE_TEXT, quota_project_id=None, + workforce_pool_user_project=None, + ) + + @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) + def test_from_file_workforce_pool(self, mock_init, tmpdir): + info = { + "audience": WORKFORCE_AUDIENCE, + "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "credential_source": self.CREDENTIAL_SOURCE_TEXT, + "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT, + } + config_file = tmpdir.join("config.json") + config_file.write(json.dumps(info)) + credentials = identity_pool.Credentials.from_file(str(config_file)) + + # Confirm identity_pool.Credentials instantiated with expected attributes. + assert isinstance(credentials, identity_pool.Credentials) + mock_init.assert_called_once_with( + audience=WORKFORCE_AUDIENCE, + subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=None, + client_id=None, + client_secret=None, + credential_source=self.CREDENTIAL_SOURCE_TEXT, + quota_project_id=None, + workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, + ) + + def test_constructor_nonworkforce_with_workforce_pool_user_project(self): + with pytest.raises(ValueError) as excinfo: + self.make_credentials( + audience=AUDIENCE, + workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, + ) + + assert excinfo.match( + "workforce_pool_user_project should not be set for non-workforce " + "pool credentials" ) def test_constructor_invalid_options(self): @@ -430,6 +513,23 @@ class TestCredentials(object): r"Missing subject_token_field_name for JSON credential_source format" ) + def test_info_with_workforce_pool_user_project(self): + credentials = self.make_credentials( + audience=WORKFORCE_AUDIENCE, + subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, + credential_source=self.CREDENTIAL_SOURCE_TEXT_URL.copy(), + workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, + ) + + assert credentials.info == { + "type": "external_account", + "audience": WORKFORCE_AUDIENCE, + "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "credential_source": self.CREDENTIAL_SOURCE_TEXT_URL, + "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT, + } + def test_info_with_file_credential_source(self): credentials = self.make_credentials( credential_source=self.CREDENTIAL_SOURCE_TEXT_URL.copy() @@ -557,6 +657,115 @@ class TestCredentials(object): default_scopes=["ignored"], ) + def test_refresh_workforce_success_with_client_auth_without_impersonation(self): + credentials = self.make_credentials( + audience=WORKFORCE_AUDIENCE, + subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, + client_id=CLIENT_ID, + client_secret=CLIENT_SECRET, + # Test with text format type. + credential_source=self.CREDENTIAL_SOURCE_TEXT, + scopes=SCOPES, + # This will be ignored in favor of client auth. + workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, + ) + + self.assert_underlying_credentials_refresh( + credentials=credentials, + audience=WORKFORCE_AUDIENCE, + subject_token=TEXT_FILE_SUBJECT_TOKEN, + subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=None, + basic_auth_encoding=BASIC_AUTH_ENCODING, + quota_project_id=None, + used_scopes=SCOPES, + scopes=SCOPES, + workforce_pool_user_project=None, + ) + + def test_refresh_workforce_success_with_client_auth_and_no_workforce_project(self): + credentials = self.make_credentials( + audience=WORKFORCE_AUDIENCE, + subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, + client_id=CLIENT_ID, + client_secret=CLIENT_SECRET, + # Test with text format type. + credential_source=self.CREDENTIAL_SOURCE_TEXT, + scopes=SCOPES, + # This is not needed when client Auth is used. + workforce_pool_user_project=None, + ) + + self.assert_underlying_credentials_refresh( + credentials=credentials, + audience=WORKFORCE_AUDIENCE, + subject_token=TEXT_FILE_SUBJECT_TOKEN, + subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=None, + basic_auth_encoding=BASIC_AUTH_ENCODING, + quota_project_id=None, + used_scopes=SCOPES, + scopes=SCOPES, + workforce_pool_user_project=None, + ) + + def test_refresh_workforce_success_without_client_auth_without_impersonation(self): + credentials = self.make_credentials( + audience=WORKFORCE_AUDIENCE, + subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, + client_id=None, + client_secret=None, + # Test with text format type. + credential_source=self.CREDENTIAL_SOURCE_TEXT, + scopes=SCOPES, + # This will not be ignored as client auth is not used. + workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, + ) + + self.assert_underlying_credentials_refresh( + credentials=credentials, + audience=WORKFORCE_AUDIENCE, + subject_token=TEXT_FILE_SUBJECT_TOKEN, + subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=None, + basic_auth_encoding=None, + quota_project_id=None, + used_scopes=SCOPES, + scopes=SCOPES, + workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, + ) + + def test_refresh_workforce_success_without_client_auth_with_impersonation(self): + credentials = self.make_credentials( + audience=WORKFORCE_AUDIENCE, + subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, + client_id=None, + client_secret=None, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + # Test with text format type. + credential_source=self.CREDENTIAL_SOURCE_TEXT, + scopes=SCOPES, + # This will not be ignored as client auth is not used. + workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, + ) + + self.assert_underlying_credentials_refresh( + credentials=credentials, + audience=WORKFORCE_AUDIENCE, + subject_token=TEXT_FILE_SUBJECT_TOKEN, + subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + basic_auth_encoding=None, + quota_project_id=None, + used_scopes=SCOPES, + scopes=SCOPES, + workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, + ) + def test_refresh_text_file_success_without_impersonation_use_default_scopes(self): credentials = self.make_credentials( client_id=CLIENT_ID, |