aboutsummaryrefslogtreecommitdiff
path: root/tests/oauth2/test__client.py
diff options
context:
space:
mode:
authorchojoyce <chojoyce@google.com>2022-01-05 04:25:17 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2022-01-05 04:25:17 +0000
commitca3e33f7be9f05a4ac2dab617d3c71f12d4d8482 (patch)
treec5f0cf22de9f5323689a83dfef06907864fdd028 /tests/oauth2/test__client.py
parentd75f43fb5f091b4542704d5ebed57e9dc920fae5 (diff)
parent8c673285f7eda845e99cc693855307b589b4ce7f (diff)
downloadgoogle-auth-library-python-ca3e33f7be9f05a4ac2dab617d3c71f12d4d8482.tar.gz
Merge platform/external/python/google-auth-library-python v2.3.3 am: 8c673285f7
Original change: https://android-review.googlesource.com/c/platform/external/python/google-auth-library-python/+/1930534 Change-Id: I63201633a4b26c75d30d22e9bc937c3c2ca7a7d5
Diffstat (limited to 'tests/oauth2/test__client.py')
-rw-r--r--tests/oauth2/test__client.py329
1 files changed, 329 insertions, 0 deletions
diff --git a/tests/oauth2/test__client.py b/tests/oauth2/test__client.py
new file mode 100644
index 0000000..54686df
--- /dev/null
+++ b/tests/oauth2/test__client.py
@@ -0,0 +1,329 @@
+# Copyright 2016 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import json
+import os
+
+import mock
+import pytest
+import six
+from six.moves import http_client
+from six.moves import urllib
+
+from google.auth import _helpers
+from google.auth import crypt
+from google.auth import exceptions
+from google.auth import jwt
+from google.auth import transport
+from google.oauth2 import _client
+
+
+DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "data")
+
+with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh:
+ PRIVATE_KEY_BYTES = fh.read()
+
+SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1")
+
+SCOPES_AS_LIST = [
+ "https://www.googleapis.com/auth/pubsub",
+ "https://www.googleapis.com/auth/logging.write",
+]
+SCOPES_AS_STRING = (
+ "https://www.googleapis.com/auth/pubsub"
+ " https://www.googleapis.com/auth/logging.write"
+)
+
+
+def test__handle_error_response():
+ response_data = {"error": "help", "error_description": "I'm alive"}
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ _client._handle_error_response(response_data)
+
+ assert excinfo.match(r"help: I\'m alive")
+
+
+def test__handle_error_response_non_json():
+ response_data = {"foo": "bar"}
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ _client._handle_error_response(response_data)
+
+ assert excinfo.match(r"{\"foo\": \"bar\"}")
+
+
+@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
+def test__parse_expiry(unused_utcnow):
+ result = _client._parse_expiry({"expires_in": 500})
+ assert result == datetime.datetime.min + datetime.timedelta(seconds=500)
+
+
+def test__parse_expiry_none():
+ assert _client._parse_expiry({}) is None
+
+
+def make_request(response_data, status=http_client.OK):
+ response = mock.create_autospec(transport.Response, instance=True)
+ response.status = status
+ response.data = json.dumps(response_data).encode("utf-8")
+ request = mock.create_autospec(transport.Request)
+ request.return_value = response
+ return request
+
+
+def test__token_endpoint_request():
+ request = make_request({"test": "response"})
+
+ result = _client._token_endpoint_request(
+ request, "http://example.com", {"test": "params"}
+ )
+
+ # Check request call
+ request.assert_called_with(
+ method="POST",
+ url="http://example.com",
+ headers={"Content-Type": "application/x-www-form-urlencoded"},
+ body="test=params".encode("utf-8"),
+ )
+
+ # Check result
+ assert result == {"test": "response"}
+
+
+def test__token_endpoint_request_use_json():
+ request = make_request({"test": "response"})
+
+ result = _client._token_endpoint_request(
+ request,
+ "http://example.com",
+ {"test": "params"},
+ access_token="access_token",
+ use_json=True,
+ )
+
+ # Check request call
+ request.assert_called_with(
+ method="POST",
+ url="http://example.com",
+ headers={
+ "Content-Type": "application/json",
+ "Authorization": "Bearer access_token",
+ },
+ body=b'{"test": "params"}',
+ )
+
+ # Check result
+ assert result == {"test": "response"}
+
+
+def test__token_endpoint_request_error():
+ request = make_request({}, status=http_client.BAD_REQUEST)
+
+ with pytest.raises(exceptions.RefreshError):
+ _client._token_endpoint_request(request, "http://example.com", {})
+
+
+def test__token_endpoint_request_internal_failure_error():
+ request = make_request(
+ {"error_description": "internal_failure"}, status=http_client.BAD_REQUEST
+ )
+
+ with pytest.raises(exceptions.RefreshError):
+ _client._token_endpoint_request(
+ request, "http://example.com", {"error_description": "internal_failure"}
+ )
+
+ request = make_request(
+ {"error": "internal_failure"}, status=http_client.BAD_REQUEST
+ )
+
+ with pytest.raises(exceptions.RefreshError):
+ _client._token_endpoint_request(
+ request, "http://example.com", {"error": "internal_failure"}
+ )
+
+
+def verify_request_params(request, params):
+ request_body = request.call_args[1]["body"].decode("utf-8")
+ request_params = urllib.parse.parse_qs(request_body)
+
+ for key, value in six.iteritems(params):
+ assert request_params[key][0] == value
+
+
+@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
+def test_jwt_grant(utcnow):
+ request = make_request(
+ {"access_token": "token", "expires_in": 500, "extra": "data"}
+ )
+
+ token, expiry, extra_data = _client.jwt_grant(
+ request, "http://example.com", "assertion_value"
+ )
+
+ # Check request call
+ verify_request_params(
+ request, {"grant_type": _client._JWT_GRANT_TYPE, "assertion": "assertion_value"}
+ )
+
+ # Check result
+ assert token == "token"
+ assert expiry == utcnow() + datetime.timedelta(seconds=500)
+ assert extra_data["extra"] == "data"
+
+
+def test_jwt_grant_no_access_token():
+ request = make_request(
+ {
+ # No access token.
+ "expires_in": 500,
+ "extra": "data",
+ }
+ )
+
+ with pytest.raises(exceptions.RefreshError):
+ _client.jwt_grant(request, "http://example.com", "assertion_value")
+
+
+def test_id_token_jwt_grant():
+ now = _helpers.utcnow()
+ id_token_expiry = _helpers.datetime_to_secs(now)
+ id_token = jwt.encode(SIGNER, {"exp": id_token_expiry}).decode("utf-8")
+ request = make_request({"id_token": id_token, "extra": "data"})
+
+ token, expiry, extra_data = _client.id_token_jwt_grant(
+ request, "http://example.com", "assertion_value"
+ )
+
+ # Check request call
+ verify_request_params(
+ request, {"grant_type": _client._JWT_GRANT_TYPE, "assertion": "assertion_value"}
+ )
+
+ # Check result
+ assert token == id_token
+ # JWT does not store microseconds
+ now = now.replace(microsecond=0)
+ assert expiry == now
+ assert extra_data["extra"] == "data"
+
+
+def test_id_token_jwt_grant_no_access_token():
+ request = make_request(
+ {
+ # No access token.
+ "expires_in": 500,
+ "extra": "data",
+ }
+ )
+
+ with pytest.raises(exceptions.RefreshError):
+ _client.id_token_jwt_grant(request, "http://example.com", "assertion_value")
+
+
+@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
+def test_refresh_grant(unused_utcnow):
+ request = make_request(
+ {
+ "access_token": "token",
+ "refresh_token": "new_refresh_token",
+ "expires_in": 500,
+ "extra": "data",
+ }
+ )
+
+ token, refresh_token, expiry, extra_data = _client.refresh_grant(
+ request,
+ "http://example.com",
+ "refresh_token",
+ "client_id",
+ "client_secret",
+ rapt_token="rapt_token",
+ )
+
+ # Check request call
+ verify_request_params(
+ request,
+ {
+ "grant_type": _client._REFRESH_GRANT_TYPE,
+ "refresh_token": "refresh_token",
+ "client_id": "client_id",
+ "client_secret": "client_secret",
+ "rapt": "rapt_token",
+ },
+ )
+
+ # Check result
+ assert token == "token"
+ assert refresh_token == "new_refresh_token"
+ assert expiry == datetime.datetime.min + datetime.timedelta(seconds=500)
+ assert extra_data["extra"] == "data"
+
+
+@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
+def test_refresh_grant_with_scopes(unused_utcnow):
+ request = make_request(
+ {
+ "access_token": "token",
+ "refresh_token": "new_refresh_token",
+ "expires_in": 500,
+ "extra": "data",
+ "scope": SCOPES_AS_STRING,
+ }
+ )
+
+ token, refresh_token, expiry, extra_data = _client.refresh_grant(
+ request,
+ "http://example.com",
+ "refresh_token",
+ "client_id",
+ "client_secret",
+ SCOPES_AS_LIST,
+ )
+
+ # Check request call.
+ verify_request_params(
+ request,
+ {
+ "grant_type": _client._REFRESH_GRANT_TYPE,
+ "refresh_token": "refresh_token",
+ "client_id": "client_id",
+ "client_secret": "client_secret",
+ "scope": SCOPES_AS_STRING,
+ },
+ )
+
+ # Check result.
+ assert token == "token"
+ assert refresh_token == "new_refresh_token"
+ assert expiry == datetime.datetime.min + datetime.timedelta(seconds=500)
+ assert extra_data["extra"] == "data"
+
+
+def test_refresh_grant_no_access_token():
+ request = make_request(
+ {
+ # No access token.
+ "refresh_token": "new_refresh_token",
+ "expires_in": 500,
+ "extra": "data",
+ }
+ )
+
+ with pytest.raises(exceptions.RefreshError):
+ _client.refresh_grant(
+ request, "http://example.com", "refresh_token", "client_id", "client_secret"
+ )