diff options
author | Ryan Kohler <ryankohler@google.com> | 2021-04-14 11:14:41 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-04-14 09:14:41 -0600 |
commit | e3836367efad561db863bf50eb452e93293278cc (patch) | |
tree | 7281483784072420d964f135f93f8541819c90e4 | |
parent | 5832fc1626cf16f6862b4dc5f7ad97dc4b590e3d (diff) | |
download | google-auth-library-python-e3836367efad561db863bf50eb452e93293278cc.tar.gz |
test: Create AWS-based external account integration tests (#731)
-rw-r--r-- | scripts/setup_external_accounts.sh | 1 | ||||
-rw-r--r-- | system_tests/system_tests_sync/conftest.py | 32 | ||||
-rw-r--r-- | system_tests/system_tests_sync/test_external_accounts.py | 92 |
3 files changed, 122 insertions, 3 deletions
diff --git a/scripts/setup_external_accounts.sh b/scripts/setup_external_accounts.sh index 2fd04e2..ecc879b 100644 --- a/scripts/setup_external_accounts.sh +++ b/scripts/setup_external_accounts.sh @@ -110,3 +110,4 @@ gcloud iam service-accounts add-iam-policy-binding $service_account_email \ echo "OIDC audience: "$oidc_aud echo "AWS audience: "$aws_aud +echo "AWS role: arn:aws:iam::$aws_account_id:role/$aws_role_name" diff --git a/system_tests/system_tests_sync/conftest.py b/system_tests/system_tests_sync/conftest.py index 37a6fd3..16caa65 100644 --- a/system_tests/system_tests_sync/conftest.py +++ b/system_tests/system_tests_sync/conftest.py @@ -54,15 +54,41 @@ def authorized_user_file(): @pytest.fixture(params=["urllib3", "requests"]) -def http_request(request): +def request_type(request): + yield request.param + + +@pytest.fixture +def http_request(request_type): """A transport.request object.""" - if request.param == "urllib3": + if request_type == "urllib3": yield google.auth.transport.urllib3.Request(URLLIB3_HTTP) - elif request.param == "requests": + elif request_type == "requests": yield google.auth.transport.requests.Request(REQUESTS_SESSION) @pytest.fixture +def authenticated_request(request_type): + """A transport.request object that takes credentials""" + if request_type == "urllib3": + + def wrapper(credentials): + return google.auth.transport.urllib3.AuthorizedHttp( + credentials, http=URLLIB3_HTTP + ).request + + yield wrapper + elif request_type == "requests": + + def wrapper(credentials): + session = google.auth.transport.requests.AuthorizedSession(credentials) + session.verify = False + return google.auth.transport.requests.Request(session) + + yield wrapper + + +@pytest.fixture def token_info(http_request): """Returns a function that obtains OAuth2 token info.""" diff --git a/system_tests/system_tests_sync/test_external_accounts.py b/system_tests/system_tests_sync/test_external_accounts.py index db6f281..e24c7b4 100644 --- a/system_tests/system_tests_sync/test_external_accounts.py +++ b/system_tests/system_tests_sync/test_external_accounts.py @@ -48,6 +48,8 @@ from mock import patch # Populate values from the output of scripts/setup_external_accounts.sh. _AUDIENCE_OIDC = "//iam.googleapis.com/projects/79992041559/locations/global/workloadIdentityPools/pool-73wslmxn/providers/oidc-73wslmxn" +_AUDIENCE_AWS = "//iam.googleapis.com/projects/79992041559/locations/global/workloadIdentityPools/pool-73wslmxn/providers/aws-73wslmxn" +_ROLE_AWS = "arn:aws:iam::077071391996:role/ci-python-test" def dns_access_direct(request, project_id): @@ -100,6 +102,27 @@ def service_account_info(service_account_file): yield json.load(f) +@pytest.fixture +def aws_oidc_credentials( + service_account_file, service_account_info, authenticated_request +): + credentials = service_account.Credentials.from_service_account_file( + service_account_file, scopes=["https://www.googleapis.com/auth/cloud-platform"] + ) + result = authenticated_request(credentials)( + url="https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{}:generateIdToken".format( + service_account_info["client_email"] + ), + method="POST", + body=json.dumps( + {"audience": service_account_info["client_id"], "includeEmail": True} + ), + ) + assert result.status == 200 + + yield json.loads(result.data)["token"] + + # Our external accounts tests involve setting up some preconditions, setting a # credential file, and then making sure that our client libraries can work with # the set credentials. @@ -115,6 +138,14 @@ def get_project_dns(dns_access, credential_data): return dns_access() +def get_xml_value_by_tagname(data, tagname): + startIndex = data.index("<{}>".format(tagname)) + if startIndex >= 0: + endIndex = data.index("</{}>".format(tagname), startIndex) + if endIndex > startIndex: + return data[startIndex + len(tagname) + 2 : endIndex] + + # This test makes sure that setting an accesible credential file # works to allow access to Google resources. def test_file_based_external_account( @@ -211,3 +242,64 @@ def test_url_based_external_account(dns_access, oidc_credentials, service_accoun }, }, ) + + +# AWS provider tests for AWS credentials +# The test suite will also run tests for AWS credentials. This works as +# follows. (Note prequisite setup is needed. This is documented in +# setup_external_accounts.sh). +# - iamcredentials:generateIdToken is used to generate a Google ID token using +# the service account access token. The service account client_id is used as +# audience. +# - AWS STS AssumeRoleWithWebIdentity API is used to exchange this token for +# temporary AWS security credentials for a specified AWS ARN role. +# - AWS_REGION, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_SESSION_TOKEN +# environment variables are set using these credentials before the test is +# run simulating an AWS VM. +# - The test can now be run. +def test_aws_based_external_account( + aws_oidc_credentials, service_account_info, dns_access, http_request +): + + response = http_request( + url=( + "https://sts.amazonaws.com/" + "?Action=AssumeRoleWithWebIdentity" + "&Version=2011-06-15" + "&DurationSeconds=3600" + "&RoleSessionName=python-test" + "&RoleArn={}" + "&WebIdentityToken={}" + ).format(_ROLE_AWS, aws_oidc_credentials) + ) + assert response.status == 200 + + # The returned data is in XML, but loading an XML parser would be overkill. + # Searching the return text manually for the start and finish tag. + data = response.data.decode("utf-8") + + with patch.dict( + os.environ, + { + "AWS_REGION": "us-east-2", + "AWS_ACCESS_KEY_ID": get_xml_value_by_tagname(data, "AccessKeyId"), + "AWS_SECRET_ACCESS_KEY": get_xml_value_by_tagname(data, "SecretAccessKey"), + "AWS_SESSION_TOKEN": get_xml_value_by_tagname(data, "SessionToken"), + }, + ): + assert get_project_dns( + dns_access, + { + "type": "external_account", + "audience": _AUDIENCE_AWS, + "subject_token_type": "urn:ietf:params:aws:token-type:aws4_request", + "token_url": "https://sts.googleapis.com/v1/token", + "service_account_impersonation_url": "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{}:generateAccessToken".format( + service_account_info["client_email"] + ), + "credential_source": { + "environment_id": "aws1", + "regional_cred_verification_url": "https://sts.{region}.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15", + }, + }, + ) |