aboutsummaryrefslogtreecommitdiff
path: root/tests/test_aws.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_aws.py')
-rw-r--r--tests/test_aws.py1497
1 files changed, 1497 insertions, 0 deletions
diff --git a/tests/test_aws.py b/tests/test_aws.py
new file mode 100644
index 0000000..9ca08d5
--- /dev/null
+++ b/tests/test_aws.py
@@ -0,0 +1,1497 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import json
+
+import mock
+import pytest
+from six.moves import http_client
+from six.moves import urllib
+
+from google.auth import _helpers
+from google.auth import aws
+from google.auth import environment_vars
+from google.auth import exceptions
+from google.auth import transport
+
+
+CLIENT_ID = "username"
+CLIENT_SECRET = "password"
+# Base64 encoding of "username:password".
+BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
+SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com"
+SERVICE_ACCOUNT_IMPERSONATION_URL = (
+ "https://us-east1-iamcredentials.googleapis.com/v1/projects/-"
+ + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL)
+)
+QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
+SCOPES = ["scope1", "scope2"]
+TOKEN_URL = "https://sts.googleapis.com/v1/token"
+SUBJECT_TOKEN_TYPE = "urn:ietf:params:aws:token-type:aws4_request"
+AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID"
+REGION_URL = "http://169.254.169.254/latest/meta-data/placement/availability-zone"
+SECURITY_CREDS_URL = "http://169.254.169.254/latest/meta-data/iam/security-credentials"
+CRED_VERIFICATION_URL = (
+ "https://sts.{region}.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15"
+)
+# Sample AWS security credentials to be used with tests that require a session token.
+ACCESS_KEY_ID = "ASIARD4OQDT6A77FR3CL"
+SECRET_ACCESS_KEY = "Y8AfSaucF37G4PpvfguKZ3/l7Id4uocLXxX0+VTx"
+TOKEN = "IQoJb3JpZ2luX2VjEIz//////////wEaCXVzLWVhc3QtMiJGMEQCIH7MHX/Oy/OB8OlLQa9GrqU1B914+iMikqWQW7vPCKlgAiA/Lsv8Jcafn14owfxXn95FURZNKaaphj0ykpmS+Ki+CSq0AwhlEAAaDDA3NzA3MTM5MTk5NiIMx9sAeP1ovlMTMKLjKpEDwuJQg41/QUKx0laTZYjPlQvjwSqS3OB9P1KAXPWSLkliVMMqaHqelvMF/WO/glv3KwuTfQsavRNs3v5pcSEm4SPO3l7mCs7KrQUHwGP0neZhIKxEXy+Ls//1C/Bqt53NL+LSbaGv6RPHaX82laz2qElphg95aVLdYgIFY6JWV5fzyjgnhz0DQmy62/Vi8pNcM2/VnxeCQ8CC8dRDSt52ry2v+nc77vstuI9xV5k8mPtnaPoJDRANh0bjwY5Sdwkbp+mGRUJBAQRlNgHUJusefXQgVKBCiyJY4w3Csd8Bgj9IyDV+Azuy1jQqfFZWgP68LSz5bURyIjlWDQunO82stZ0BgplKKAa/KJHBPCp8Qi6i99uy7qh76FQAqgVTsnDuU6fGpHDcsDSGoCls2HgZjZFPeOj8mmRhFk1Xqvkbjuz8V1cJk54d3gIJvQt8gD2D6yJQZecnuGWd5K2e2HohvCc8Fc9kBl1300nUJPV+k4tr/A5R/0QfEKOZL1/k5lf1g9CREnrM8LVkGxCgdYMxLQow1uTL+QU67AHRRSp5PhhGX4Rek+01vdYSnJCMaPhSEgcLqDlQkhk6MPsyT91QMXcWmyO+cAZwUPwnRamFepuP4K8k2KVXs/LIJHLELwAZ0ekyaS7CptgOqS7uaSTFG3U+vzFZLEnGvWQ7y9IPNQZ+Dffgh4p3vF4J68y9049sI6Sr5d5wbKkcbm8hdCDHZcv4lnqohquPirLiFQ3q7B17V9krMPu3mz1cg4Ekgcrn/E09NTsxAqD8NcZ7C7ECom9r+X3zkDOxaajW6hu3Az8hGlyylDaMiFfRbBJpTIlxp7jfa7CxikNgNtEKLH9iCzvuSg2vhA=="
+# To avoid json.dumps() differing behavior from one version to other,
+# the JSON payload is hardcoded.
+REQUEST_PARAMS = '{"KeySchema":[{"KeyType":"HASH","AttributeName":"Id"}],"TableName":"TestTable","AttributeDefinitions":[{"AttributeName":"Id","AttributeType":"S"}],"ProvisionedThroughput":{"WriteCapacityUnits":5,"ReadCapacityUnits":5}}'
+# Each tuple contains the following entries:
+# region, time, credentials, original_request, signed_request
+TEST_FIXTURES = [
+ # GET request (AWS botocore tests).
+ # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla.req
+ # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla.sreq
+ (
+ "us-east-1",
+ "2011-09-09T23:36:00Z",
+ {
+ "access_key_id": "AKIDEXAMPLE",
+ "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
+ },
+ {
+ "method": "GET",
+ "url": "https://host.foo.com",
+ "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
+ },
+ {
+ "url": "https://host.foo.com",
+ "method": "GET",
+ "headers": {
+ "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=b27ccfbfa7df52a200ff74193ca6e32d4b48b8856fab7ebf1c595d0670a7e470",
+ "host": "host.foo.com",
+ "date": "Mon, 09 Sep 2011 23:36:00 GMT",
+ },
+ },
+ ),
+ # GET request with relative path (AWS botocore tests).
+ # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-relative-relative.req
+ # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-relative-relative.sreq
+ (
+ "us-east-1",
+ "2011-09-09T23:36:00Z",
+ {
+ "access_key_id": "AKIDEXAMPLE",
+ "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
+ },
+ {
+ "method": "GET",
+ "url": "https://host.foo.com/foo/bar/../..",
+ "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
+ },
+ {
+ "url": "https://host.foo.com/foo/bar/../..",
+ "method": "GET",
+ "headers": {
+ "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=b27ccfbfa7df52a200ff74193ca6e32d4b48b8856fab7ebf1c595d0670a7e470",
+ "host": "host.foo.com",
+ "date": "Mon, 09 Sep 2011 23:36:00 GMT",
+ },
+ },
+ ),
+ # GET request with /./ path (AWS botocore tests).
+ # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-slash-dot-slash.req
+ # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-slash-dot-slash.sreq
+ (
+ "us-east-1",
+ "2011-09-09T23:36:00Z",
+ {
+ "access_key_id": "AKIDEXAMPLE",
+ "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
+ },
+ {
+ "method": "GET",
+ "url": "https://host.foo.com/./",
+ "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
+ },
+ {
+ "url": "https://host.foo.com/./",
+ "method": "GET",
+ "headers": {
+ "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=b27ccfbfa7df52a200ff74193ca6e32d4b48b8856fab7ebf1c595d0670a7e470",
+ "host": "host.foo.com",
+ "date": "Mon, 09 Sep 2011 23:36:00 GMT",
+ },
+ },
+ ),
+ # GET request with pointless dot path (AWS botocore tests).
+ # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-slash-pointless-dot.req
+ # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-slash-pointless-dot.sreq
+ (
+ "us-east-1",
+ "2011-09-09T23:36:00Z",
+ {
+ "access_key_id": "AKIDEXAMPLE",
+ "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
+ },
+ {
+ "method": "GET",
+ "url": "https://host.foo.com/./foo",
+ "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
+ },
+ {
+ "url": "https://host.foo.com/./foo",
+ "method": "GET",
+ "headers": {
+ "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=910e4d6c9abafaf87898e1eb4c929135782ea25bb0279703146455745391e63a",
+ "host": "host.foo.com",
+ "date": "Mon, 09 Sep 2011 23:36:00 GMT",
+ },
+ },
+ ),
+ # GET request with utf8 path (AWS botocore tests).
+ # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-utf8.req
+ # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-utf8.sreq
+ (
+ "us-east-1",
+ "2011-09-09T23:36:00Z",
+ {
+ "access_key_id": "AKIDEXAMPLE",
+ "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
+ },
+ {
+ "method": "GET",
+ "url": "https://host.foo.com/%E1%88%B4",
+ "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
+ },
+ {
+ "url": "https://host.foo.com/%E1%88%B4",
+ "method": "GET",
+ "headers": {
+ "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=8d6634c189aa8c75c2e51e106b6b5121bed103fdb351f7d7d4381c738823af74",
+ "host": "host.foo.com",
+ "date": "Mon, 09 Sep 2011 23:36:00 GMT",
+ },
+ },
+ ),
+ # GET request with duplicate query key (AWS botocore tests).
+ # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-query-order-key-case.req
+ # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-query-order-key-case.sreq
+ (
+ "us-east-1",
+ "2011-09-09T23:36:00Z",
+ {
+ "access_key_id": "AKIDEXAMPLE",
+ "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
+ },
+ {
+ "method": "GET",
+ "url": "https://host.foo.com/?foo=Zoo&foo=aha",
+ "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
+ },
+ {
+ "url": "https://host.foo.com/?foo=Zoo&foo=aha",
+ "method": "GET",
+ "headers": {
+ "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=be7148d34ebccdc6423b19085378aa0bee970bdc61d144bd1a8c48c33079ab09",
+ "host": "host.foo.com",
+ "date": "Mon, 09 Sep 2011 23:36:00 GMT",
+ },
+ },
+ ),
+ # GET request with duplicate out of order query key (AWS botocore tests).
+ # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-query-order-value.req
+ # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-query-order-value.sreq
+ (
+ "us-east-1",
+ "2011-09-09T23:36:00Z",
+ {
+ "access_key_id": "AKIDEXAMPLE",
+ "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
+ },
+ {
+ "method": "GET",
+ "url": "https://host.foo.com/?foo=b&foo=a",
+ "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
+ },
+ {
+ "url": "https://host.foo.com/?foo=b&foo=a",
+ "method": "GET",
+ "headers": {
+ "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=feb926e49e382bec75c9d7dcb2a1b6dc8aa50ca43c25d2bc51143768c0875acc",
+ "host": "host.foo.com",
+ "date": "Mon, 09 Sep 2011 23:36:00 GMT",
+ },
+ },
+ ),
+ # GET request with utf8 query (AWS botocore tests).
+ # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-ut8-query.req
+ # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-ut8-query.sreq
+ (
+ "us-east-1",
+ "2011-09-09T23:36:00Z",
+ {
+ "access_key_id": "AKIDEXAMPLE",
+ "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
+ },
+ {
+ "method": "GET",
+ "url": "https://host.foo.com/?{}=bar".format(
+ urllib.parse.unquote("%E1%88%B4")
+ ),
+ "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
+ },
+ {
+ "url": "https://host.foo.com/?{}=bar".format(
+ urllib.parse.unquote("%E1%88%B4")
+ ),
+ "method": "GET",
+ "headers": {
+ "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=6fb359e9a05394cc7074e0feb42573a2601abc0c869a953e8c5c12e4e01f1a8c",
+ "host": "host.foo.com",
+ "date": "Mon, 09 Sep 2011 23:36:00 GMT",
+ },
+ },
+ ),
+ # POST request with sorted headers (AWS botocore tests).
+ # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-header-key-sort.req
+ # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-header-key-sort.sreq
+ (
+ "us-east-1",
+ "2011-09-09T23:36:00Z",
+ {
+ "access_key_id": "AKIDEXAMPLE",
+ "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
+ },
+ {
+ "method": "POST",
+ "url": "https://host.foo.com/",
+ "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT", "ZOO": "zoobar"},
+ },
+ {
+ "url": "https://host.foo.com/",
+ "method": "POST",
+ "headers": {
+ "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host;zoo, Signature=b7a95a52518abbca0964a999a880429ab734f35ebbf1235bd79a5de87756dc4a",
+ "host": "host.foo.com",
+ "date": "Mon, 09 Sep 2011 23:36:00 GMT",
+ "ZOO": "zoobar",
+ },
+ },
+ ),
+ # POST request with upper case header value from AWS Python test harness.
+ # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-header-value-case.req
+ # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-header-value-case.sreq
+ (
+ "us-east-1",
+ "2011-09-09T23:36:00Z",
+ {
+ "access_key_id": "AKIDEXAMPLE",
+ "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
+ },
+ {
+ "method": "POST",
+ "url": "https://host.foo.com/",
+ "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT", "zoo": "ZOOBAR"},
+ },
+ {
+ "url": "https://host.foo.com/",
+ "method": "POST",
+ "headers": {
+ "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host;zoo, Signature=273313af9d0c265c531e11db70bbd653f3ba074c1009239e8559d3987039cad7",
+ "host": "host.foo.com",
+ "date": "Mon, 09 Sep 2011 23:36:00 GMT",
+ "zoo": "ZOOBAR",
+ },
+ },
+ ),
+ # POST request with header and no body (AWS botocore tests).
+ # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-header-value-trim.req
+ # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-header-value-trim.sreq
+ (
+ "us-east-1",
+ "2011-09-09T23:36:00Z",
+ {
+ "access_key_id": "AKIDEXAMPLE",
+ "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
+ },
+ {
+ "method": "POST",
+ "url": "https://host.foo.com/",
+ "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT", "p": "phfft"},
+ },
+ {
+ "url": "https://host.foo.com/",
+ "method": "POST",
+ "headers": {
+ "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host;p, Signature=debf546796015d6f6ded8626f5ce98597c33b47b9164cf6b17b4642036fcb592",
+ "host": "host.foo.com",
+ "date": "Mon, 09 Sep 2011 23:36:00 GMT",
+ "p": "phfft",
+ },
+ },
+ ),
+ # POST request with body and no header (AWS botocore tests).
+ # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-x-www-form-urlencoded.req
+ # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-x-www-form-urlencoded.sreq
+ (
+ "us-east-1",
+ "2011-09-09T23:36:00Z",
+ {
+ "access_key_id": "AKIDEXAMPLE",
+ "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
+ },
+ {
+ "method": "POST",
+ "url": "https://host.foo.com/",
+ "headers": {
+ "Content-Type": "application/x-www-form-urlencoded",
+ "date": "Mon, 09 Sep 2011 23:36:00 GMT",
+ },
+ "data": "foo=bar",
+ },
+ {
+ "url": "https://host.foo.com/",
+ "method": "POST",
+ "headers": {
+ "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=content-type;date;host, Signature=5a15b22cf462f047318703b92e6f4f38884e4a7ab7b1d6426ca46a8bd1c26cbc",
+ "host": "host.foo.com",
+ "Content-Type": "application/x-www-form-urlencoded",
+ "date": "Mon, 09 Sep 2011 23:36:00 GMT",
+ },
+ "data": "foo=bar",
+ },
+ ),
+ # POST request with querystring (AWS botocore tests).
+ # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-vanilla-query.req
+ # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-vanilla-query.sreq
+ (
+ "us-east-1",
+ "2011-09-09T23:36:00Z",
+ {
+ "access_key_id": "AKIDEXAMPLE",
+ "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
+ },
+ {
+ "method": "POST",
+ "url": "https://host.foo.com/?foo=bar",
+ "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
+ },
+ {
+ "url": "https://host.foo.com/?foo=bar",
+ "method": "POST",
+ "headers": {
+ "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=b6e3b79003ce0743a491606ba1035a804593b0efb1e20a11cba83f8c25a57a92",
+ "host": "host.foo.com",
+ "date": "Mon, 09 Sep 2011 23:36:00 GMT",
+ },
+ },
+ ),
+ # GET request with session token credentials.
+ (
+ "us-east-2",
+ "2020-08-11T06:55:22Z",
+ {
+ "access_key_id": ACCESS_KEY_ID,
+ "secret_access_key": SECRET_ACCESS_KEY,
+ "security_token": TOKEN,
+ },
+ {
+ "method": "GET",
+ "url": "https://ec2.us-east-2.amazonaws.com?Action=DescribeRegions&Version=2013-10-15",
+ },
+ {
+ "url": "https://ec2.us-east-2.amazonaws.com?Action=DescribeRegions&Version=2013-10-15",
+ "method": "GET",
+ "headers": {
+ "Authorization": "AWS4-HMAC-SHA256 Credential="
+ + ACCESS_KEY_ID
+ + "/20200811/us-east-2/ec2/aws4_request, SignedHeaders=host;x-amz-date;x-amz-security-token, Signature=631ea80cddfaa545fdadb120dc92c9f18166e38a5c47b50fab9fce476e022855",
+ "host": "ec2.us-east-2.amazonaws.com",
+ "x-amz-date": "20200811T065522Z",
+ "x-amz-security-token": TOKEN,
+ },
+ },
+ ),
+ # POST request with session token credentials.
+ (
+ "us-east-2",
+ "2020-08-11T06:55:22Z",
+ {
+ "access_key_id": ACCESS_KEY_ID,
+ "secret_access_key": SECRET_ACCESS_KEY,
+ "security_token": TOKEN,
+ },
+ {
+ "method": "POST",
+ "url": "https://sts.us-east-2.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15",
+ },
+ {
+ "url": "https://sts.us-east-2.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15",
+ "method": "POST",
+ "headers": {
+ "Authorization": "AWS4-HMAC-SHA256 Credential="
+ + ACCESS_KEY_ID
+ + "/20200811/us-east-2/sts/aws4_request, SignedHeaders=host;x-amz-date;x-amz-security-token, Signature=73452984e4a880ffdc5c392355733ec3f5ba310d5e0609a89244440cadfe7a7a",
+ "host": "sts.us-east-2.amazonaws.com",
+ "x-amz-date": "20200811T065522Z",
+ "x-amz-security-token": TOKEN,
+ },
+ },
+ ),
+ # POST request with computed x-amz-date and no data.
+ (
+ "us-east-2",
+ "2020-08-11T06:55:22Z",
+ {"access_key_id": ACCESS_KEY_ID, "secret_access_key": SECRET_ACCESS_KEY},
+ {
+ "method": "POST",
+ "url": "https://sts.us-east-2.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15",
+ },
+ {
+ "url": "https://sts.us-east-2.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15",
+ "method": "POST",
+ "headers": {
+ "Authorization": "AWS4-HMAC-SHA256 Credential="
+ + ACCESS_KEY_ID
+ + "/20200811/us-east-2/sts/aws4_request, SignedHeaders=host;x-amz-date, Signature=d095ba304919cd0d5570ba8a3787884ee78b860f268ed040ba23831d55536d56",
+ "host": "sts.us-east-2.amazonaws.com",
+ "x-amz-date": "20200811T065522Z",
+ },
+ },
+ ),
+ # POST request with session token and additional headers/data.
+ (
+ "us-east-2",
+ "2020-08-11T06:55:22Z",
+ {
+ "access_key_id": ACCESS_KEY_ID,
+ "secret_access_key": SECRET_ACCESS_KEY,
+ "security_token": TOKEN,
+ },
+ {
+ "method": "POST",
+ "url": "https://dynamodb.us-east-2.amazonaws.com/",
+ "headers": {
+ "Content-Type": "application/x-amz-json-1.0",
+ "x-amz-target": "DynamoDB_20120810.CreateTable",
+ },
+ "data": REQUEST_PARAMS,
+ },
+ {
+ "url": "https://dynamodb.us-east-2.amazonaws.com/",
+ "method": "POST",
+ "headers": {
+ "Authorization": "AWS4-HMAC-SHA256 Credential="
+ + ACCESS_KEY_ID
+ + "/20200811/us-east-2/dynamodb/aws4_request, SignedHeaders=content-type;host;x-amz-date;x-amz-security-token;x-amz-target, Signature=fdaa5b9cc9c86b80fe61eaf504141c0b3523780349120f2bd8145448456e0385",
+ "host": "dynamodb.us-east-2.amazonaws.com",
+ "x-amz-date": "20200811T065522Z",
+ "Content-Type": "application/x-amz-json-1.0",
+ "x-amz-target": "DynamoDB_20120810.CreateTable",
+ "x-amz-security-token": TOKEN,
+ },
+ "data": REQUEST_PARAMS,
+ },
+ ),
+]
+
+
+class TestRequestSigner(object):
+ @pytest.mark.parametrize(
+ "region, time, credentials, original_request, signed_request", TEST_FIXTURES
+ )
+ @mock.patch("google.auth._helpers.utcnow")
+ def test_get_request_options(
+ self, utcnow, region, time, credentials, original_request, signed_request
+ ):
+ utcnow.return_value = datetime.datetime.strptime(time, "%Y-%m-%dT%H:%M:%SZ")
+ request_signer = aws.RequestSigner(region)
+ actual_signed_request = request_signer.get_request_options(
+ credentials,
+ original_request.get("url"),
+ original_request.get("method"),
+ original_request.get("data"),
+ original_request.get("headers"),
+ )
+
+ assert actual_signed_request == signed_request
+
+ def test_get_request_options_with_missing_scheme_url(self):
+ request_signer = aws.RequestSigner("us-east-2")
+
+ with pytest.raises(ValueError) as excinfo:
+ request_signer.get_request_options(
+ {
+ "access_key_id": ACCESS_KEY_ID,
+ "secret_access_key": SECRET_ACCESS_KEY,
+ },
+ "invalid",
+ "POST",
+ )
+
+ assert excinfo.match(r"Invalid AWS service URL")
+
+ def test_get_request_options_with_invalid_scheme_url(self):
+ request_signer = aws.RequestSigner("us-east-2")
+
+ with pytest.raises(ValueError) as excinfo:
+ request_signer.get_request_options(
+ {
+ "access_key_id": ACCESS_KEY_ID,
+ "secret_access_key": SECRET_ACCESS_KEY,
+ },
+ "http://invalid",
+ "POST",
+ )
+
+ assert excinfo.match(r"Invalid AWS service URL")
+
+ def test_get_request_options_with_missing_hostname_url(self):
+ request_signer = aws.RequestSigner("us-east-2")
+
+ with pytest.raises(ValueError) as excinfo:
+ request_signer.get_request_options(
+ {
+ "access_key_id": ACCESS_KEY_ID,
+ "secret_access_key": SECRET_ACCESS_KEY,
+ },
+ "https://",
+ "POST",
+ )
+
+ assert excinfo.match(r"Invalid AWS service URL")
+
+
+class TestCredentials(object):
+ AWS_REGION = "us-east-2"
+ AWS_ROLE = "gcp-aws-role"
+ AWS_SECURITY_CREDENTIALS_RESPONSE = {
+ "AccessKeyId": ACCESS_KEY_ID,
+ "SecretAccessKey": SECRET_ACCESS_KEY,
+ "Token": TOKEN,
+ }
+ AWS_SIGNATURE_TIME = "2020-08-11T06:55:22Z"
+ CREDENTIAL_SOURCE = {
+ "environment_id": "aws1",
+ "region_url": REGION_URL,
+ "url": SECURITY_CREDS_URL,
+ "regional_cred_verification_url": CRED_VERIFICATION_URL,
+ }
+ SUCCESS_RESPONSE = {
+ "access_token": "ACCESS_TOKEN",
+ "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
+ "token_type": "Bearer",
+ "expires_in": 3600,
+ "scope": " ".join(SCOPES),
+ }
+
+ @classmethod
+ def make_serialized_aws_signed_request(
+ cls,
+ aws_security_credentials,
+ region_name="us-east-2",
+ url="https://sts.us-east-2.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15",
+ ):
+ """Utility to generate serialize AWS signed requests.
+ This makes it easy to assert generated subject tokens based on the
+ provided AWS security credentials, regions and AWS STS endpoint.
+ """
+ request_signer = aws.RequestSigner(region_name)
+ signed_request = request_signer.get_request_options(
+ aws_security_credentials, url, "POST"
+ )
+ reformatted_signed_request = {
+ "url": signed_request.get("url"),
+ "method": signed_request.get("method"),
+ "headers": [
+ {
+ "key": "Authorization",
+ "value": signed_request.get("headers").get("Authorization"),
+ },
+ {"key": "host", "value": signed_request.get("headers").get("host")},
+ {
+ "key": "x-amz-date",
+ "value": signed_request.get("headers").get("x-amz-date"),
+ },
+ ],
+ }
+ # Include security token if available.
+ if "security_token" in aws_security_credentials:
+ reformatted_signed_request.get("headers").append(
+ {
+ "key": "x-amz-security-token",
+ "value": signed_request.get("headers").get("x-amz-security-token"),
+ }
+ )
+ # Append x-goog-cloud-target-resource header.
+ reformatted_signed_request.get("headers").append(
+ {"key": "x-goog-cloud-target-resource", "value": AUDIENCE}
+ ),
+ return urllib.parse.quote(
+ json.dumps(
+ reformatted_signed_request, separators=(",", ":"), sort_keys=True
+ )
+ )
+
+ @classmethod
+ def make_mock_request(
+ cls,
+ region_status=None,
+ region_name=None,
+ role_status=None,
+ role_name=None,
+ security_credentials_status=None,
+ security_credentials_data=None,
+ token_status=None,
+ token_data=None,
+ impersonation_status=None,
+ impersonation_data=None,
+ ):
+ """Utility function to generate a mock HTTP request object.
+ This will facilitate testing various edge cases by specify how the
+ various endpoints will respond while generating a Google Access token
+ in an AWS environment.
+ """
+ responses = []
+ if region_status:
+ # AWS region request.
+ region_response = mock.create_autospec(transport.Response, instance=True)
+ region_response.status = region_status
+ if region_name:
+ region_response.data = "{}b".format(region_name).encode("utf-8")
+ responses.append(region_response)
+
+ if role_status:
+ # AWS role name request.
+ role_response = mock.create_autospec(transport.Response, instance=True)
+ role_response.status = role_status
+ if role_name:
+ role_response.data = role_name.encode("utf-8")
+ responses.append(role_response)
+
+ if security_credentials_status:
+ # AWS security credentials request.
+ security_credentials_response = mock.create_autospec(
+ transport.Response, instance=True
+ )
+ security_credentials_response.status = security_credentials_status
+ if security_credentials_data:
+ security_credentials_response.data = json.dumps(
+ security_credentials_data
+ ).encode("utf-8")
+ responses.append(security_credentials_response)
+
+ if token_status:
+ # GCP token exchange request.
+ token_response = mock.create_autospec(transport.Response, instance=True)
+ token_response.status = token_status
+ token_response.data = json.dumps(token_data).encode("utf-8")
+ responses.append(token_response)
+
+ if impersonation_status:
+ # Service account impersonation request.
+ impersonation_response = mock.create_autospec(
+ transport.Response, instance=True
+ )
+ impersonation_response.status = impersonation_status
+ impersonation_response.data = json.dumps(impersonation_data).encode("utf-8")
+ responses.append(impersonation_response)
+
+ request = mock.create_autospec(transport.Request)
+ request.side_effect = responses
+
+ return request
+
+ @classmethod
+ def make_credentials(
+ cls,
+ credential_source,
+ client_id=None,
+ client_secret=None,
+ quota_project_id=None,
+ scopes=None,
+ default_scopes=None,
+ service_account_impersonation_url=None,
+ ):
+ return aws.Credentials(
+ audience=AUDIENCE,
+ subject_token_type=SUBJECT_TOKEN_TYPE,
+ token_url=TOKEN_URL,
+ service_account_impersonation_url=service_account_impersonation_url,
+ credential_source=credential_source,
+ client_id=client_id,
+ client_secret=client_secret,
+ quota_project_id=quota_project_id,
+ scopes=scopes,
+ default_scopes=default_scopes,
+ )
+
+ @classmethod
+ def assert_aws_metadata_request_kwargs(cls, request_kwargs, url, headers=None):
+ assert request_kwargs["url"] == url
+ # All used AWS metadata server endpoints use GET HTTP method.
+ assert request_kwargs["method"] == "GET"
+ if headers:
+ assert request_kwargs["headers"] == headers
+ else:
+ assert "headers" not in request_kwargs
+ # None of the endpoints used require any data in request.
+ assert "body" not in request_kwargs
+
+ @classmethod
+ def assert_token_request_kwargs(
+ cls, request_kwargs, headers, request_data, token_url=TOKEN_URL
+ ):
+ assert request_kwargs["url"] == token_url
+ assert request_kwargs["method"] == "POST"
+ assert request_kwargs["headers"] == headers
+ assert request_kwargs["body"] is not None
+ body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
+ assert len(body_tuples) == len(request_data.keys())
+ for (k, v) in body_tuples:
+ assert v.decode("utf-8") == request_data[k.decode("utf-8")]
+
+ @classmethod
+ def assert_impersonation_request_kwargs(
+ cls,
+ request_kwargs,
+ headers,
+ request_data,
+ service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
+ ):
+ assert request_kwargs["url"] == service_account_impersonation_url
+ assert request_kwargs["method"] == "POST"
+ assert request_kwargs["headers"] == headers
+ assert request_kwargs["body"] is not None
+ body_json = json.loads(request_kwargs["body"].decode("utf-8"))
+ assert body_json == request_data
+
+ @mock.patch.object(aws.Credentials, "__init__", return_value=None)
+ def test_from_info_full_options(self, mock_init):
+ credentials = aws.Credentials.from_info(
+ {
+ "audience": AUDIENCE,
+ "subject_token_type": SUBJECT_TOKEN_TYPE,
+ "token_url": TOKEN_URL,
+ "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
+ "client_id": CLIENT_ID,
+ "client_secret": CLIENT_SECRET,
+ "quota_project_id": QUOTA_PROJECT_ID,
+ "credential_source": self.CREDENTIAL_SOURCE,
+ }
+ )
+
+ # Confirm aws.Credentials instance initialized with the expected parameters.
+ assert isinstance(credentials, aws.Credentials)
+ mock_init.assert_called_once_with(
+ audience=AUDIENCE,
+ subject_token_type=SUBJECT_TOKEN_TYPE,
+ token_url=TOKEN_URL,
+ service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
+ client_id=CLIENT_ID,
+ client_secret=CLIENT_SECRET,
+ credential_source=self.CREDENTIAL_SOURCE,
+ quota_project_id=QUOTA_PROJECT_ID,
+ )
+
+ @mock.patch.object(aws.Credentials, "__init__", return_value=None)
+ def test_from_info_required_options_only(self, mock_init):
+ credentials = aws.Credentials.from_info(
+ {
+ "audience": AUDIENCE,
+ "subject_token_type": SUBJECT_TOKEN_TYPE,
+ "token_url": TOKEN_URL,
+ "credential_source": self.CREDENTIAL_SOURCE,
+ }
+ )
+
+ # Confirm aws.Credentials instance initialized with the expected parameters.
+ assert isinstance(credentials, aws.Credentials)
+ mock_init.assert_called_once_with(
+ audience=AUDIENCE,
+ subject_token_type=SUBJECT_TOKEN_TYPE,
+ token_url=TOKEN_URL,
+ service_account_impersonation_url=None,
+ client_id=None,
+ client_secret=None,
+ credential_source=self.CREDENTIAL_SOURCE,
+ quota_project_id=None,
+ )
+
+ @mock.patch.object(aws.Credentials, "__init__", return_value=None)
+ def test_from_file_full_options(self, mock_init, tmpdir):
+ info = {
+ "audience": AUDIENCE,
+ "subject_token_type": SUBJECT_TOKEN_TYPE,
+ "token_url": TOKEN_URL,
+ "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
+ "client_id": CLIENT_ID,
+ "client_secret": CLIENT_SECRET,
+ "quota_project_id": QUOTA_PROJECT_ID,
+ "credential_source": self.CREDENTIAL_SOURCE,
+ }
+ config_file = tmpdir.join("config.json")
+ config_file.write(json.dumps(info))
+ credentials = aws.Credentials.from_file(str(config_file))
+
+ # Confirm aws.Credentials instance initialized with the expected parameters.
+ assert isinstance(credentials, aws.Credentials)
+ mock_init.assert_called_once_with(
+ audience=AUDIENCE,
+ subject_token_type=SUBJECT_TOKEN_TYPE,
+ token_url=TOKEN_URL,
+ service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
+ client_id=CLIENT_ID,
+ client_secret=CLIENT_SECRET,
+ credential_source=self.CREDENTIAL_SOURCE,
+ quota_project_id=QUOTA_PROJECT_ID,
+ )
+
+ @mock.patch.object(aws.Credentials, "__init__", return_value=None)
+ def test_from_file_required_options_only(self, mock_init, tmpdir):
+ info = {
+ "audience": AUDIENCE,
+ "subject_token_type": SUBJECT_TOKEN_TYPE,
+ "token_url": TOKEN_URL,
+ "credential_source": self.CREDENTIAL_SOURCE,
+ }
+ config_file = tmpdir.join("config.json")
+ config_file.write(json.dumps(info))
+ credentials = aws.Credentials.from_file(str(config_file))
+
+ # Confirm aws.Credentials instance initialized with the expected parameters.
+ assert isinstance(credentials, aws.Credentials)
+ mock_init.assert_called_once_with(
+ audience=AUDIENCE,
+ subject_token_type=SUBJECT_TOKEN_TYPE,
+ token_url=TOKEN_URL,
+ service_account_impersonation_url=None,
+ client_id=None,
+ client_secret=None,
+ credential_source=self.CREDENTIAL_SOURCE,
+ quota_project_id=None,
+ )
+
+ def test_constructor_invalid_credential_source(self):
+ # Provide invalid credential source.
+ credential_source = {"unsupported": "value"}
+
+ with pytest.raises(ValueError) as excinfo:
+ self.make_credentials(credential_source=credential_source)
+
+ assert excinfo.match(r"No valid AWS 'credential_source' provided")
+
+ def test_constructor_invalid_environment_id(self):
+ # Provide invalid environment_id.
+ credential_source = self.CREDENTIAL_SOURCE.copy()
+ credential_source["environment_id"] = "azure1"
+
+ with pytest.raises(ValueError) as excinfo:
+ self.make_credentials(credential_source=credential_source)
+
+ assert excinfo.match(r"No valid AWS 'credential_source' provided")
+
+ def test_constructor_missing_cred_verification_url(self):
+ # regional_cred_verification_url is a required field.
+ credential_source = self.CREDENTIAL_SOURCE.copy()
+ credential_source.pop("regional_cred_verification_url")
+
+ with pytest.raises(ValueError) as excinfo:
+ self.make_credentials(credential_source=credential_source)
+
+ assert excinfo.match(r"No valid AWS 'credential_source' provided")
+
+ def test_constructor_invalid_environment_id_version(self):
+ # Provide an unsupported version.
+ credential_source = self.CREDENTIAL_SOURCE.copy()
+ credential_source["environment_id"] = "aws3"
+
+ with pytest.raises(ValueError) as excinfo:
+ self.make_credentials(credential_source=credential_source)
+
+ assert excinfo.match(r"aws version '3' is not supported in the current build.")
+
+ def test_info(self):
+ credentials = self.make_credentials(
+ credential_source=self.CREDENTIAL_SOURCE.copy()
+ )
+
+ assert credentials.info == {
+ "type": "external_account",
+ "audience": AUDIENCE,
+ "subject_token_type": SUBJECT_TOKEN_TYPE,
+ "token_url": TOKEN_URL,
+ "credential_source": self.CREDENTIAL_SOURCE,
+ }
+
+ def test_retrieve_subject_token_missing_region_url(self):
+ # When AWS_REGION envvar is not available, region_url is required for
+ # determining the current AWS region.
+ credential_source = self.CREDENTIAL_SOURCE.copy()
+ credential_source.pop("region_url")
+ credentials = self.make_credentials(credential_source=credential_source)
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ credentials.retrieve_subject_token(None)
+
+ assert excinfo.match(r"Unable to determine AWS region")
+
+ @mock.patch("google.auth._helpers.utcnow")
+ def test_retrieve_subject_token_success_temp_creds_no_environment_vars(
+ self, utcnow
+ ):
+ utcnow.return_value = datetime.datetime.strptime(
+ self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
+ )
+ request = self.make_mock_request(
+ region_status=http_client.OK,
+ region_name=self.AWS_REGION,
+ role_status=http_client.OK,
+ role_name=self.AWS_ROLE,
+ security_credentials_status=http_client.OK,
+ security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
+ )
+ credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
+
+ subject_token = credentials.retrieve_subject_token(request)
+
+ assert subject_token == self.make_serialized_aws_signed_request(
+ {
+ "access_key_id": ACCESS_KEY_ID,
+ "secret_access_key": SECRET_ACCESS_KEY,
+ "security_token": TOKEN,
+ }
+ )
+ # Assert region request.
+ self.assert_aws_metadata_request_kwargs(
+ request.call_args_list[0][1], REGION_URL
+ )
+ # Assert role request.
+ self.assert_aws_metadata_request_kwargs(
+ request.call_args_list[1][1], SECURITY_CREDS_URL
+ )
+ # Assert security credentials request.
+ self.assert_aws_metadata_request_kwargs(
+ request.call_args_list[2][1],
+ "{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
+ {"Content-Type": "application/json"},
+ )
+
+ # Retrieve subject_token again. Region should not be queried again.
+ new_request = self.make_mock_request(
+ role_status=http_client.OK,
+ role_name=self.AWS_ROLE,
+ security_credentials_status=http_client.OK,
+ security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
+ )
+
+ credentials.retrieve_subject_token(new_request)
+
+ # Only 2 requests should be sent as the region is cached.
+ assert len(new_request.call_args_list) == 2
+ # Assert role request.
+ self.assert_aws_metadata_request_kwargs(
+ new_request.call_args_list[0][1], SECURITY_CREDS_URL
+ )
+ # Assert security credentials request.
+ self.assert_aws_metadata_request_kwargs(
+ new_request.call_args_list[1][1],
+ "{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
+ {"Content-Type": "application/json"},
+ )
+
+ @mock.patch("google.auth._helpers.utcnow")
+ def test_retrieve_subject_token_success_permanent_creds_no_environment_vars(
+ self, utcnow
+ ):
+ # Simualte a permanent credential without a session token is
+ # returned by the security-credentials endpoint.
+ security_creds_response = self.AWS_SECURITY_CREDENTIALS_RESPONSE.copy()
+ security_creds_response.pop("Token")
+ utcnow.return_value = datetime.datetime.strptime(
+ self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
+ )
+ request = self.make_mock_request(
+ region_status=http_client.OK,
+ region_name=self.AWS_REGION,
+ role_status=http_client.OK,
+ role_name=self.AWS_ROLE,
+ security_credentials_status=http_client.OK,
+ security_credentials_data=security_creds_response,
+ )
+ credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
+
+ subject_token = credentials.retrieve_subject_token(request)
+
+ assert subject_token == self.make_serialized_aws_signed_request(
+ {"access_key_id": ACCESS_KEY_ID, "secret_access_key": SECRET_ACCESS_KEY}
+ )
+
+ @mock.patch("google.auth._helpers.utcnow")
+ def test_retrieve_subject_token_success_environment_vars(self, utcnow, monkeypatch):
+ monkeypatch.setenv(environment_vars.AWS_ACCESS_KEY_ID, ACCESS_KEY_ID)
+ monkeypatch.setenv(environment_vars.AWS_SECRET_ACCESS_KEY, SECRET_ACCESS_KEY)
+ monkeypatch.setenv(environment_vars.AWS_SESSION_TOKEN, TOKEN)
+ monkeypatch.setenv(environment_vars.AWS_REGION, self.AWS_REGION)
+ utcnow.return_value = datetime.datetime.strptime(
+ self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
+ )
+ credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
+
+ subject_token = credentials.retrieve_subject_token(None)
+
+ assert subject_token == self.make_serialized_aws_signed_request(
+ {
+ "access_key_id": ACCESS_KEY_ID,
+ "secret_access_key": SECRET_ACCESS_KEY,
+ "security_token": TOKEN,
+ }
+ )
+
+ @mock.patch("google.auth._helpers.utcnow")
+ def test_retrieve_subject_token_success_environment_vars_with_default_region(
+ self, utcnow, monkeypatch
+ ):
+ monkeypatch.setenv(environment_vars.AWS_ACCESS_KEY_ID, ACCESS_KEY_ID)
+ monkeypatch.setenv(environment_vars.AWS_SECRET_ACCESS_KEY, SECRET_ACCESS_KEY)
+ monkeypatch.setenv(environment_vars.AWS_SESSION_TOKEN, TOKEN)
+ monkeypatch.setenv(environment_vars.AWS_DEFAULT_REGION, self.AWS_REGION)
+ utcnow.return_value = datetime.datetime.strptime(
+ self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
+ )
+ credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
+
+ subject_token = credentials.retrieve_subject_token(None)
+
+ assert subject_token == self.make_serialized_aws_signed_request(
+ {
+ "access_key_id": ACCESS_KEY_ID,
+ "secret_access_key": SECRET_ACCESS_KEY,
+ "security_token": TOKEN,
+ }
+ )
+
+ @mock.patch("google.auth._helpers.utcnow")
+ def test_retrieve_subject_token_success_environment_vars_with_both_regions_set(
+ self, utcnow, monkeypatch
+ ):
+ monkeypatch.setenv(environment_vars.AWS_ACCESS_KEY_ID, ACCESS_KEY_ID)
+ monkeypatch.setenv(environment_vars.AWS_SECRET_ACCESS_KEY, SECRET_ACCESS_KEY)
+ monkeypatch.setenv(environment_vars.AWS_SESSION_TOKEN, TOKEN)
+ monkeypatch.setenv(environment_vars.AWS_DEFAULT_REGION, "Malformed AWS Region")
+ # This test makes sure that the AWS_REGION gets used over AWS_DEFAULT_REGION,
+ # So, AWS_DEFAULT_REGION is set to something that would cause the test to fail,
+ # And AWS_REGION is set to the a valid value, and it should succeed
+ monkeypatch.setenv(environment_vars.AWS_REGION, self.AWS_REGION)
+ utcnow.return_value = datetime.datetime.strptime(
+ self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
+ )
+ credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
+
+ subject_token = credentials.retrieve_subject_token(None)
+
+ assert subject_token == self.make_serialized_aws_signed_request(
+ {
+ "access_key_id": ACCESS_KEY_ID,
+ "secret_access_key": SECRET_ACCESS_KEY,
+ "security_token": TOKEN,
+ }
+ )
+
+ @mock.patch("google.auth._helpers.utcnow")
+ def test_retrieve_subject_token_success_environment_vars_no_session_token(
+ self, utcnow, monkeypatch
+ ):
+ monkeypatch.setenv(environment_vars.AWS_ACCESS_KEY_ID, ACCESS_KEY_ID)
+ monkeypatch.setenv(environment_vars.AWS_SECRET_ACCESS_KEY, SECRET_ACCESS_KEY)
+ monkeypatch.setenv(environment_vars.AWS_REGION, self.AWS_REGION)
+ utcnow.return_value = datetime.datetime.strptime(
+ self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
+ )
+ credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
+
+ subject_token = credentials.retrieve_subject_token(None)
+
+ assert subject_token == self.make_serialized_aws_signed_request(
+ {"access_key_id": ACCESS_KEY_ID, "secret_access_key": SECRET_ACCESS_KEY}
+ )
+
+ @mock.patch("google.auth._helpers.utcnow")
+ def test_retrieve_subject_token_success_environment_vars_except_region(
+ self, utcnow, monkeypatch
+ ):
+ monkeypatch.setenv(environment_vars.AWS_ACCESS_KEY_ID, ACCESS_KEY_ID)
+ monkeypatch.setenv(environment_vars.AWS_SECRET_ACCESS_KEY, SECRET_ACCESS_KEY)
+ monkeypatch.setenv(environment_vars.AWS_SESSION_TOKEN, TOKEN)
+ utcnow.return_value = datetime.datetime.strptime(
+ self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
+ )
+ # Region will be queried since it is not found in envvars.
+ request = self.make_mock_request(
+ region_status=http_client.OK, region_name=self.AWS_REGION
+ )
+ credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
+
+ subject_token = credentials.retrieve_subject_token(request)
+
+ assert subject_token == self.make_serialized_aws_signed_request(
+ {
+ "access_key_id": ACCESS_KEY_ID,
+ "secret_access_key": SECRET_ACCESS_KEY,
+ "security_token": TOKEN,
+ }
+ )
+
+ def test_retrieve_subject_token_error_determining_aws_region(self):
+ # Simulate error in retrieving the AWS region.
+ request = self.make_mock_request(region_status=http_client.BAD_REQUEST)
+ credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ credentials.retrieve_subject_token(request)
+
+ assert excinfo.match(r"Unable to retrieve AWS region")
+
+ def test_retrieve_subject_token_error_determining_aws_role(self):
+ # Simulate error in retrieving the AWS role name.
+ request = self.make_mock_request(
+ region_status=http_client.OK,
+ region_name=self.AWS_REGION,
+ role_status=http_client.BAD_REQUEST,
+ )
+ credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ credentials.retrieve_subject_token(request)
+
+ assert excinfo.match(r"Unable to retrieve AWS role name")
+
+ def test_retrieve_subject_token_error_determining_security_creds_url(self):
+ # Simulate the security-credentials url is missing. This is needed for
+ # determining the AWS security credentials when not found in envvars.
+ credential_source = self.CREDENTIAL_SOURCE.copy()
+ credential_source.pop("url")
+ request = self.make_mock_request(
+ region_status=http_client.OK, region_name=self.AWS_REGION
+ )
+ credentials = self.make_credentials(credential_source=credential_source)
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ credentials.retrieve_subject_token(request)
+
+ assert excinfo.match(
+ r"Unable to determine the AWS metadata server security credentials endpoint"
+ )
+
+ def test_retrieve_subject_token_error_determining_aws_security_creds(self):
+ # Simulate error in retrieving the AWS security credentials.
+ request = self.make_mock_request(
+ region_status=http_client.OK,
+ region_name=self.AWS_REGION,
+ role_status=http_client.OK,
+ role_name=self.AWS_ROLE,
+ security_credentials_status=http_client.BAD_REQUEST,
+ )
+ credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ credentials.retrieve_subject_token(request)
+
+ assert excinfo.match(r"Unable to retrieve AWS security credentials")
+
+ @mock.patch("google.auth._helpers.utcnow")
+ def test_refresh_success_without_impersonation_ignore_default_scopes(self, utcnow):
+ utcnow.return_value = datetime.datetime.strptime(
+ self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
+ )
+ expected_subject_token = self.make_serialized_aws_signed_request(
+ {
+ "access_key_id": ACCESS_KEY_ID,
+ "secret_access_key": SECRET_ACCESS_KEY,
+ "security_token": TOKEN,
+ }
+ )
+ token_headers = {
+ "Content-Type": "application/x-www-form-urlencoded",
+ "Authorization": "Basic " + BASIC_AUTH_ENCODING,
+ }
+ token_request_data = {
+ "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
+ "audience": AUDIENCE,
+ "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
+ "scope": " ".join(SCOPES),
+ "subject_token": expected_subject_token,
+ "subject_token_type": SUBJECT_TOKEN_TYPE,
+ }
+ request = self.make_mock_request(
+ region_status=http_client.OK,
+ region_name=self.AWS_REGION,
+ role_status=http_client.OK,
+ role_name=self.AWS_ROLE,
+ security_credentials_status=http_client.OK,
+ security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
+ token_status=http_client.OK,
+ token_data=self.SUCCESS_RESPONSE,
+ )
+ credentials = self.make_credentials(
+ client_id=CLIENT_ID,
+ client_secret=CLIENT_SECRET,
+ credential_source=self.CREDENTIAL_SOURCE,
+ quota_project_id=QUOTA_PROJECT_ID,
+ scopes=SCOPES,
+ # Default scopes should be ignored.
+ default_scopes=["ignored"],
+ )
+
+ credentials.refresh(request)
+
+ assert len(request.call_args_list) == 4
+ # Fourth request should be sent to GCP STS endpoint.
+ self.assert_token_request_kwargs(
+ request.call_args_list[3][1], token_headers, token_request_data
+ )
+ assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
+ assert credentials.quota_project_id == QUOTA_PROJECT_ID
+ assert credentials.scopes == SCOPES
+ assert credentials.default_scopes == ["ignored"]
+
+ @mock.patch("google.auth._helpers.utcnow")
+ def test_refresh_success_without_impersonation_use_default_scopes(self, utcnow):
+ utcnow.return_value = datetime.datetime.strptime(
+ self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
+ )
+ expected_subject_token = self.make_serialized_aws_signed_request(
+ {
+ "access_key_id": ACCESS_KEY_ID,
+ "secret_access_key": SECRET_ACCESS_KEY,
+ "security_token": TOKEN,
+ }
+ )
+ token_headers = {
+ "Content-Type": "application/x-www-form-urlencoded",
+ "Authorization": "Basic " + BASIC_AUTH_ENCODING,
+ }
+ token_request_data = {
+ "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
+ "audience": AUDIENCE,
+ "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
+ "scope": " ".join(SCOPES),
+ "subject_token": expected_subject_token,
+ "subject_token_type": SUBJECT_TOKEN_TYPE,
+ }
+ request = self.make_mock_request(
+ region_status=http_client.OK,
+ region_name=self.AWS_REGION,
+ role_status=http_client.OK,
+ role_name=self.AWS_ROLE,
+ security_credentials_status=http_client.OK,
+ security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
+ token_status=http_client.OK,
+ token_data=self.SUCCESS_RESPONSE,
+ )
+ credentials = self.make_credentials(
+ client_id=CLIENT_ID,
+ client_secret=CLIENT_SECRET,
+ credential_source=self.CREDENTIAL_SOURCE,
+ quota_project_id=QUOTA_PROJECT_ID,
+ scopes=None,
+ # Default scopes should be used since user specified scopes are none.
+ default_scopes=SCOPES,
+ )
+
+ credentials.refresh(request)
+
+ assert len(request.call_args_list) == 4
+ # Fourth request should be sent to GCP STS endpoint.
+ self.assert_token_request_kwargs(
+ request.call_args_list[3][1], token_headers, token_request_data
+ )
+ assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
+ assert credentials.quota_project_id == QUOTA_PROJECT_ID
+ assert credentials.scopes is None
+ assert credentials.default_scopes == SCOPES
+
+ @mock.patch("google.auth._helpers.utcnow")
+ def test_refresh_success_with_impersonation_ignore_default_scopes(self, utcnow):
+ utcnow.return_value = datetime.datetime.strptime(
+ self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
+ )
+ expire_time = (
+ _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
+ ).isoformat("T") + "Z"
+ expected_subject_token = self.make_serialized_aws_signed_request(
+ {
+ "access_key_id": ACCESS_KEY_ID,
+ "secret_access_key": SECRET_ACCESS_KEY,
+ "security_token": TOKEN,
+ }
+ )
+ token_headers = {
+ "Content-Type": "application/x-www-form-urlencoded",
+ "Authorization": "Basic " + BASIC_AUTH_ENCODING,
+ }
+ token_request_data = {
+ "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
+ "audience": AUDIENCE,
+ "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
+ "scope": "https://www.googleapis.com/auth/iam",
+ "subject_token": expected_subject_token,
+ "subject_token_type": SUBJECT_TOKEN_TYPE,
+ }
+ # Service account impersonation request/response.
+ impersonation_response = {
+ "accessToken": "SA_ACCESS_TOKEN",
+ "expireTime": expire_time,
+ }
+ impersonation_headers = {
+ "Content-Type": "application/json",
+ "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
+ "x-goog-user-project": QUOTA_PROJECT_ID,
+ }
+ impersonation_request_data = {
+ "delegates": None,
+ "scope": SCOPES,
+ "lifetime": "3600s",
+ }
+ request = self.make_mock_request(
+ region_status=http_client.OK,
+ region_name=self.AWS_REGION,
+ role_status=http_client.OK,
+ role_name=self.AWS_ROLE,
+ security_credentials_status=http_client.OK,
+ security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
+ token_status=http_client.OK,
+ token_data=self.SUCCESS_RESPONSE,
+ impersonation_status=http_client.OK,
+ impersonation_data=impersonation_response,
+ )
+ credentials = self.make_credentials(
+ client_id=CLIENT_ID,
+ client_secret=CLIENT_SECRET,
+ credential_source=self.CREDENTIAL_SOURCE,
+ service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
+ quota_project_id=QUOTA_PROJECT_ID,
+ scopes=SCOPES,
+ # Default scopes should be ignored.
+ default_scopes=["ignored"],
+ )
+
+ credentials.refresh(request)
+
+ assert len(request.call_args_list) == 5
+ # Fourth request should be sent to GCP STS endpoint.
+ self.assert_token_request_kwargs(
+ request.call_args_list[3][1], token_headers, token_request_data
+ )
+ # Fifth request should be sent to iamcredentials endpoint for service
+ # account impersonation.
+ self.assert_impersonation_request_kwargs(
+ request.call_args_list[4][1],
+ impersonation_headers,
+ impersonation_request_data,
+ )
+ assert credentials.token == impersonation_response["accessToken"]
+ assert credentials.quota_project_id == QUOTA_PROJECT_ID
+ assert credentials.scopes == SCOPES
+ assert credentials.default_scopes == ["ignored"]
+
+ @mock.patch("google.auth._helpers.utcnow")
+ def test_refresh_success_with_impersonation_use_default_scopes(self, utcnow):
+ utcnow.return_value = datetime.datetime.strptime(
+ self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
+ )
+ expire_time = (
+ _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
+ ).isoformat("T") + "Z"
+ expected_subject_token = self.make_serialized_aws_signed_request(
+ {
+ "access_key_id": ACCESS_KEY_ID,
+ "secret_access_key": SECRET_ACCESS_KEY,
+ "security_token": TOKEN,
+ }
+ )
+ token_headers = {
+ "Content-Type": "application/x-www-form-urlencoded",
+ "Authorization": "Basic " + BASIC_AUTH_ENCODING,
+ }
+ token_request_data = {
+ "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
+ "audience": AUDIENCE,
+ "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
+ "scope": "https://www.googleapis.com/auth/iam",
+ "subject_token": expected_subject_token,
+ "subject_token_type": SUBJECT_TOKEN_TYPE,
+ }
+ # Service account impersonation request/response.
+ impersonation_response = {
+ "accessToken": "SA_ACCESS_TOKEN",
+ "expireTime": expire_time,
+ }
+ impersonation_headers = {
+ "Content-Type": "application/json",
+ "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
+ "x-goog-user-project": QUOTA_PROJECT_ID,
+ }
+ impersonation_request_data = {
+ "delegates": None,
+ "scope": SCOPES,
+ "lifetime": "3600s",
+ }
+ request = self.make_mock_request(
+ region_status=http_client.OK,
+ region_name=self.AWS_REGION,
+ role_status=http_client.OK,
+ role_name=self.AWS_ROLE,
+ security_credentials_status=http_client.OK,
+ security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
+ token_status=http_client.OK,
+ token_data=self.SUCCESS_RESPONSE,
+ impersonation_status=http_client.OK,
+ impersonation_data=impersonation_response,
+ )
+ credentials = self.make_credentials(
+ client_id=CLIENT_ID,
+ client_secret=CLIENT_SECRET,
+ credential_source=self.CREDENTIAL_SOURCE,
+ service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
+ quota_project_id=QUOTA_PROJECT_ID,
+ scopes=None,
+ # Default scopes should be used since user specified scopes are none.
+ default_scopes=SCOPES,
+ )
+
+ credentials.refresh(request)
+
+ assert len(request.call_args_list) == 5
+ # Fourth request should be sent to GCP STS endpoint.
+ self.assert_token_request_kwargs(
+ request.call_args_list[3][1], token_headers, token_request_data
+ )
+ # Fifth request should be sent to iamcredentials endpoint for service
+ # account impersonation.
+ self.assert_impersonation_request_kwargs(
+ request.call_args_list[4][1],
+ impersonation_headers,
+ impersonation_request_data,
+ )
+ assert credentials.token == impersonation_response["accessToken"]
+ assert credentials.quota_project_id == QUOTA_PROJECT_ID
+ assert credentials.scopes is None
+ assert credentials.default_scopes == SCOPES
+
+ def test_refresh_with_retrieve_subject_token_error(self):
+ request = self.make_mock_request(region_status=http_client.BAD_REQUEST)
+ credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ credentials.refresh(request)
+
+ assert excinfo.match(r"Unable to retrieve AWS region")