aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsalrashid123 <salrashid123@gmail.com>2019-08-07 14:31:33 -0700
committerBu Sun Kim <8822365+busunkim96@users.noreply.github.com>2019-08-07 14:31:33 -0700
commit7a8641a7f0718c0dce413436f23691e8590face1 (patch)
treec098fa332aab95ffa59d4d846c77d46f589bdeee
parent1322d896ba725b8d73fd7ac4793601d9f574a839 (diff)
downloadgoogle-auth-library-python-7a8641a7f0718c0dce413436f23691e8590face1.tar.gz
Add support for imersonated_credentials.Sign, IDToken (#348)
-rw-r--r--docs/index.rst1
-rw-r--r--docs/user-guide.rst80
-rw-r--r--google/auth/impersonated_credentials.py125
-rw-r--r--tests/test_impersonated_credentials.py223
4 files changed, 423 insertions, 6 deletions
diff --git a/docs/index.rst b/docs/index.rst
index 1eb3d86..4287c3d 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -14,6 +14,7 @@ also provides integration with several HTTP libraries.
- Support for Google :func:`Application Default Credentials <google.auth.default>`.
- Support for signing and verifying :mod:`JWTs <google.auth.jwt>`.
+- Support for creating `Google ID Tokens <user-guide.html#identity-tokens>`__.
- Support for verifying and decoding :mod:`ID Tokens <google.oauth2.id_token>`.
- Support for Google :mod:`Service Account credentials <google.oauth2.service_account>`.
- Support for Google :mod:`Impersonated Credentials <google.auth.impersonated_credentials>`.
diff --git a/docs/user-guide.rst b/docs/user-guide.rst
index d43163f..0abe160 100644
--- a/docs/user-guide.rst
+++ b/docs/user-guide.rst
@@ -241,13 +241,91 @@ the "Service Account Token Creator" IAM role. ::
client = storage.Client(credentials=target_credentials)
buckets = client.list_buckets(project='your_project')
for bucket in buckets:
- print bucket.name
+ print(bucket.name)
In the example above `source_credentials` does not have direct access to list buckets
in the target project. Using `ImpersonatedCredentials` will allow the source_credentials
to assume the identity of a target_principal that does have access.
+Identity Tokens
++++++++++++++++
+
+`Google OpenID Connect`_ tokens are avaiable through :mod:`Service Account <google.oauth2.service_account>`,
+:mod:`Impersonated <google.auth.impersonated_credentials>`,
+and :mod:`Compute Engine <google.auth.compute_engine>`. These tokens can be used to
+authenticate against `Cloud Functions`_, `Cloud Run`_, a user service behind
+`Identity Aware Proxy`_ or any other service capable of verifying a `Google ID Token`_.
+
+ServiceAccount ::
+
+ from google.oauth2 import service_account
+
+ target_audience = 'https://example.com'
+
+ creds = service_account.IDTokenCredentials.from_service_account_file(
+ '/path/to/svc.json',
+ target_audience=target_audience)
+
+
+Compute ::
+
+ from google.auth import compute_engine
+ import google.auth.transport.requests
+
+ target_audience = 'https://example.com'
+
+ request = google.auth.transport.requests.Request()
+ creds = compute_engine.IDTokenCredentials(request,
+ target_audience=target_audience)
+
+Impersonated ::
+
+ from google.auth import impersonated_credentials
+
+ # get target_credentials from a source_credential
+
+ target_audience = 'https://example.com'
+
+ creds = impersonated_credentials.IDTokenCredentials(
+ target_credentials,
+ target_audience=target_audience)
+
+IDToken verification can be done for various type of IDTokens using the :class:`google.oauth2.id_token` module
+
+A sample end-to-end flow using an ID Token against a Cloud Run endpoint maybe ::
+
+ from google.oauth2 import id_token
+ from google.oauth2 import service_account
+ import google.auth
+ import google.auth.transport.requests
+ from google.auth.transport.requests import AuthorizedSession
+
+ target_audience = 'https://your-cloud-run-app.a.run.app'
+ url = 'https://your-cloud-run-app.a.run.app'
+
+ creds = service_account.IDTokenCredentials.from_service_account_file(
+ '/path/to/svc.json', target_audience=target_audience)
+
+ authed_session = AuthorizedSession(creds)
+
+ # make authenticated request and print the response, status_code
+ resp = authed_session.get(url)
+ print(resp.status_code)
+ print(resp.text)
+
+ # to verify an ID Token
+ request = google.auth.transport.requests.Request()
+ token = creds.token
+ print(token)
+ print(id_token.verify_token(token,request))
+
+.. _Cloud Functions: https://cloud.google.com/functions/
+.. _Cloud Run: https://cloud.google.com/run/
+.. _Identity Aware Proxy: https://cloud.google.com/iap/
+.. _Google OpenID Connect: https://developers.google.com/identity/protocols/OpenIDConnect
+.. _Google ID Token: https://developers.google.com/identity/protocols/OpenIDConnect#validatinganidtoken
+
Making authenticated requests
-----------------------------
diff --git a/google/auth/impersonated_credentials.py b/google/auth/impersonated_credentials.py
index 32dfe83..bb2bbf2 100644
--- a/google/auth/impersonated_credentials.py
+++ b/google/auth/impersonated_credentials.py
@@ -25,6 +25,7 @@ service account.
https://cloud.google.com/iam/credentials/reference/rest/
"""
+import base64
import copy
from datetime import datetime
import json
@@ -35,6 +36,8 @@ from six.moves import http_client
from google.auth import _helpers
from google.auth import credentials
from google.auth import exceptions
+from google.auth import jwt
+from google.auth.transport.requests import AuthorizedSession
_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
@@ -43,8 +46,18 @@ _IAM_SCOPE = ['https://www.googleapis.com/auth/iam']
_IAM_ENDPOINT = ('https://iamcredentials.googleapis.com/v1/projects/-' +
'/serviceAccounts/{}:generateAccessToken')
+_IAM_SIGN_ENDPOINT = ('https://iamcredentials.googleapis.com/v1/projects/-' +
+ '/serviceAccounts/{}:signBlob')
+
+_IAM_IDTOKEN_ENDPOINT = ('https://iamcredentials.googleapis.com/v1/' +
+ 'projects/-/serviceAccounts/{}:generateIdToken')
+
_REFRESH_ERROR = 'Unable to acquire impersonated credentials'
+_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
+
+_DEFAULT_TOKEN_URI = 'https://oauth2.googleapis.com/token'
+
def _make_iam_token_request(request, principal, headers, body):
"""Makes a request to the Google Cloud IAM service for an access token.
@@ -94,7 +107,7 @@ def _make_iam_token_request(request, principal, headers, body):
six.raise_from(new_exc, caught_exc)
-class Credentials(credentials.Credentials):
+class Credentials(credentials.Credentials, credentials.Signing):
"""This module defines impersonated credentials which are essentially
impersonated identities.
@@ -153,7 +166,7 @@ class Credentials(credentials.Credentials):
client = storage.Client(credentials=target_credentials)
buckets = client.list_buckets(project='your_project')
for bucket in buckets:
- print bucket.name
+ print(bucket.name)
"""
def __init__(self, source_credentials, target_principal,
@@ -172,7 +185,8 @@ class Credentials(credentials.Credentials):
granted to the prceeding identity. For example, if set to
[serviceAccountB, serviceAccountC], the source_credential
must have the Token Creator role on serviceAccountB.
- serviceAccountB must have the Token Creator on serviceAccountC.
+ serviceAccountB must have the Token Creator on
+ serviceAccountC.
Finally, C must have Token Creator on target_principal.
If left unset, source_credential must have that role on
target_principal.
@@ -229,3 +243,108 @@ class Credentials(credentials.Credentials):
principal=self._target_principal,
headers=headers,
body=body)
+
+ def sign_bytes(self, message):
+
+ iam_sign_endpoint = _IAM_SIGN_ENDPOINT.format(self._target_principal)
+
+ body = {
+ "payload": base64.b64encode(message),
+ "delegates": self._delegates
+ }
+
+ headers = {
+ 'Content-Type': 'application/json',
+ }
+
+ authed_session = AuthorizedSession(self._source_credentials)
+
+ response = authed_session.post(
+ url=iam_sign_endpoint,
+ headers=headers,
+ json=body)
+
+ return base64.b64decode(response.json()['signedBlob'])
+
+ @property
+ def signer_email(self):
+ return self._target_principal
+
+ @property
+ def service_account_email(self):
+ return self._target_principal
+
+ @property
+ def signer(self):
+ return self
+
+
+class IDTokenCredentials(credentials.Credentials):
+ """Open ID Connect ID Token-based service account credentials.
+
+ """
+ def __init__(self, target_credentials,
+ target_audience=None, include_email=False):
+ """
+ Args:
+ target_credentials (google.auth.Credentials): The target
+ credential used as to acquire the id tokens for.
+ target_audience (string): Audience to issue the token for.
+ include_email (bool): Include email in IdToken
+ """
+ super(IDTokenCredentials, self).__init__()
+
+ if not isinstance(target_credentials,
+ Credentials):
+ raise exceptions.GoogleAuthError("Provided Credential must be "
+ "impersonated_credentials")
+ self._target_credentials = target_credentials
+ self._target_audience = target_audience
+ self._include_email = include_email
+
+ def from_credentials(self, target_credentials,
+ target_audience=None):
+ return self.__class__(
+ target_credentials=self._target_credentials,
+ target_audience=target_audience)
+
+ def with_target_audience(self, target_audience):
+ return self.__class__(
+ target_credentials=self._target_credentials,
+ target_audience=target_audience)
+
+ def with_include_email(self, include_email):
+ return self.__class__(
+ target_credentials=self._target_credentials,
+ target_audience=self._target_audience,
+ include_email=include_email)
+
+ @_helpers.copy_docstring(credentials.Credentials)
+ def refresh(self, request):
+
+ iam_sign_endpoint = _IAM_IDTOKEN_ENDPOINT.format(self.
+ _target_credentials.
+ signer_email)
+
+ body = {
+ "audience": self._target_audience,
+ "delegates": self._target_credentials._delegates,
+ "includeEmail": self._include_email
+ }
+
+ headers = {
+ 'Content-Type': 'application/json',
+ }
+
+ authed_session = AuthorizedSession(self._target_credentials.
+ _source_credentials)
+
+ response = authed_session.post(
+ url=iam_sign_endpoint,
+ headers=headers,
+ data=json.dumps(body).encode('utf-8'))
+
+ id_token = response.json()['token']
+ self.token = id_token
+ self.expiry = datetime.fromtimestamp(jwt.decode(id_token,
+ verify=False)['exp'])
diff --git a/tests/test_impersonated_credentials.py b/tests/test_impersonated_credentials.py
index 68a2af8..9945401 100644
--- a/tests/test_impersonated_credentials.py
+++ b/tests/test_impersonated_credentials.py
@@ -35,6 +35,14 @@ with open(os.path.join(DATA_DIR, 'privatekey.pem'), 'rb') as fh:
SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, 'service_account.json')
+ID_TOKEN_DATA = ('eyJhbGciOiJSUzI1NiIsImtpZCI6ImRmMzc1ODkwOGI3OTIyOTNhZDk3N2Ew'
+ 'Yjk5MWQ5OGE3N2Y0ZWVlY2QiLCJ0eXAiOiJKV1QifQ.eyJhdWQiOiJodHRwc'
+ 'zovL2Zvby5iYXIiLCJhenAiOiIxMDIxMDE1NTA4MzQyMDA3MDg1NjgiLCJle'
+ 'HAiOjE1NjQ0NzUwNTEsImlhdCI6MTU2NDQ3MTQ1MSwiaXNzIjoiaHR0cHM6L'
+ 'y9hY2NvdW50cy5nb29nbGUuY29tIiwic3ViIjoiMTAyMTAxNTUwODM0MjAwN'
+ 'zA4NTY4In0.redacted')
+ID_TOKEN_EXPIRY = 1564475051
+
with open(SERVICE_ACCOUNT_JSON_FILE, 'r') as fh:
SERVICE_ACCOUNT_INFO = json.load(fh)
@@ -52,6 +60,38 @@ def mock_donor_credentials():
yield grant
+class MockResponse:
+ def __init__(self, json_data, status_code):
+ self.json_data = json_data
+ self.status_code = status_code
+
+ def json(self):
+ return self.json_data
+
+
+@pytest.fixture
+def mock_authorizedsession_sign():
+ with mock.patch('google.auth.transport.requests.AuthorizedSession.request',
+ autospec=True) as auth_session:
+ data = {
+ "keyId": "1",
+ "signedBlob": "c2lnbmF0dXJl"
+ }
+ auth_session.return_value = MockResponse(data, http_client.OK)
+ yield auth_session
+
+
+@pytest.fixture
+def mock_authorizedsession_idtoken():
+ with mock.patch('google.auth.transport.requests.AuthorizedSession.request',
+ autospec=True) as auth_session:
+ data = {
+ "token": ID_TOKEN_DATA
+ }
+ auth_session.return_value = MockResponse(data, http_client.OK)
+ yield auth_session
+
+
class TestImpersonatedCredentials(object):
SERVICE_ACCOUNT_EMAIL = 'service-account@example.com'
@@ -62,10 +102,12 @@ class TestImpersonatedCredentials(object):
SOURCE_CREDENTIALS = service_account.Credentials(
SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI)
- def make_credentials(self, lifetime=LIFETIME):
+ def make_credentials(self, lifetime=LIFETIME,
+ target_principal=TARGET_PRINCIPAL):
+
return Credentials(
source_credentials=self.SOURCE_CREDENTIALS,
- target_principal=self.TARGET_PRINCIPAL,
+ target_principal=target_principal,
target_scopes=self.TARGET_SCOPES,
delegates=self.DELEGATES,
lifetime=lifetime)
@@ -176,3 +218,180 @@ class TestImpersonatedCredentials(object):
def test_expired(self):
credentials = self.make_credentials(lifetime=None)
assert credentials.expired
+
+ def test_signer(self):
+ credentials = self.make_credentials()
+ assert isinstance(credentials.signer,
+ impersonated_credentials.Credentials)
+
+ def test_signer_email(self):
+ credentials = self.make_credentials(
+ target_principal=self.TARGET_PRINCIPAL)
+ assert credentials.signer_email == self.TARGET_PRINCIPAL
+
+ def test_service_account_email(self):
+ credentials = self.make_credentials(
+ target_principal=self.TARGET_PRINCIPAL)
+ assert credentials.service_account_email == self.TARGET_PRINCIPAL
+
+ def test_sign_bytes(self, mock_donor_credentials,
+ mock_authorizedsession_sign):
+ credentials = self.make_credentials(lifetime=None)
+ token = 'token'
+
+ expire_time = (
+ _helpers.utcnow().replace(microsecond=0) +
+ datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
+ token_response_body = {
+ "accessToken": token,
+ "expireTime": expire_time
+ }
+
+ response = mock.create_autospec(transport.Response, instance=False)
+ response.status = http_client.OK
+ response.data = _helpers.to_bytes(json.dumps(token_response_body))
+
+ request = mock.create_autospec(transport.Request, instance=False)
+ request.return_value = response
+
+ credentials.refresh(request)
+
+ assert credentials.valid
+ assert not credentials.expired
+
+ signature = credentials.sign_bytes(b'signed bytes')
+ assert signature == b'signature'
+
+ def test_id_token_success(self, mock_donor_credentials,
+ mock_authorizedsession_idtoken):
+ credentials = self.make_credentials(lifetime=None)
+ token = 'token'
+ target_audience = 'https://foo.bar'
+
+ expire_time = (
+ _helpers.utcnow().replace(microsecond=0) +
+ datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
+ response_body = {
+ "accessToken": token,
+ "expireTime": expire_time
+ }
+
+ request = self.make_request(
+ data=json.dumps(response_body),
+ status=http_client.OK)
+
+ credentials.refresh(request)
+
+ assert credentials.valid
+ assert not credentials.expired
+
+ id_creds = impersonated_credentials.IDTokenCredentials(
+ credentials, target_audience=target_audience)
+ id_creds.refresh(request)
+
+ assert id_creds.token == ID_TOKEN_DATA
+ assert id_creds.expiry == datetime.datetime.fromtimestamp(
+ ID_TOKEN_EXPIRY)
+
+ def test_id_token_from_credential(self, mock_donor_credentials,
+ mock_authorizedsession_idtoken):
+ credentials = self.make_credentials(lifetime=None)
+ token = 'token'
+ target_audience = 'https://foo.bar'
+
+ expire_time = (
+ _helpers.utcnow().replace(microsecond=0) +
+ datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
+ response_body = {
+ "accessToken": token,
+ "expireTime": expire_time
+ }
+
+ request = self.make_request(
+ data=json.dumps(response_body),
+ status=http_client.OK)
+
+ credentials.refresh(request)
+
+ assert credentials.valid
+ assert not credentials.expired
+
+ id_creds = impersonated_credentials.IDTokenCredentials(
+ credentials, target_audience=target_audience)
+ id_creds = id_creds.from_credentials(target_credentials=credentials)
+ id_creds.refresh(request)
+
+ assert id_creds.token == ID_TOKEN_DATA
+
+ def test_id_token_with_target_audience(self, mock_donor_credentials,
+ mock_authorizedsession_idtoken):
+ credentials = self.make_credentials(lifetime=None)
+ token = 'token'
+ target_audience = 'https://foo.bar'
+
+ expire_time = (
+ _helpers.utcnow().replace(microsecond=0) +
+ datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
+ response_body = {
+ "accessToken": token,
+ "expireTime": expire_time
+ }
+
+ request = self.make_request(
+ data=json.dumps(response_body),
+ status=http_client.OK)
+
+ credentials.refresh(request)
+
+ assert credentials.valid
+ assert not credentials.expired
+
+ id_creds = impersonated_credentials.IDTokenCredentials(
+ credentials)
+ id_creds = id_creds.with_target_audience(
+ target_audience=target_audience)
+ id_creds.refresh(request)
+
+ assert id_creds.token == ID_TOKEN_DATA
+ assert id_creds.expiry == datetime.datetime.fromtimestamp(
+ ID_TOKEN_EXPIRY)
+
+ def test_id_token_invalid_cred(self, mock_donor_credentials,
+ mock_authorizedsession_idtoken):
+ credentials = None
+
+ with pytest.raises(exceptions.GoogleAuthError) as excinfo:
+ impersonated_credentials.IDTokenCredentials(credentials)
+
+ assert excinfo.match('Provided Credential must be'
+ ' impersonated_credentials')
+
+ def test_id_token_with_include_email(self, mock_donor_credentials,
+ mock_authorizedsession_idtoken):
+ credentials = self.make_credentials(lifetime=None)
+ token = 'token'
+ target_audience = 'https://foo.bar'
+
+ expire_time = (
+ _helpers.utcnow().replace(microsecond=0) +
+ datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
+ response_body = {
+ "accessToken": token,
+ "expireTime": expire_time
+ }
+
+ request = self.make_request(
+ data=json.dumps(response_body),
+ status=http_client.OK)
+
+ credentials.refresh(request)
+
+ assert credentials.valid
+ assert not credentials.expired
+
+ id_creds = impersonated_credentials.IDTokenCredentials(
+ credentials, target_audience=target_audience)
+ id_creds = id_creds.with_include_email(True)
+ id_creds.refresh(request)
+
+ assert id_creds.token == ID_TOKEN_DATA