aboutsummaryrefslogtreecommitdiff
path: root/tests/test_impersonated_credentials.py
diff options
context:
space:
mode:
authorBu Sun Kim <8822365+busunkim96@users.noreply.github.com>2019-10-21 17:04:21 -0700
committerGitHub <noreply@github.com>2019-10-21 17:04:21 -0700
commit9eec091cd4340f8e57dd0e837a7e8df808f9f5bd (patch)
treec54e4f29e43bcdae0d1dedfafaed8671e7eb5011 /tests/test_impersonated_credentials.py
parentae3bafd0b55d41a6deabfccdef10276a39b7eb8c (diff)
downloadgoogle-auth-library-python-9eec091cd4340f8e57dd0e837a7e8df808f9f5bd.tar.gz
chore: blacken (#375)
Diffstat (limited to 'tests/test_impersonated_credentials.py')
-rw-r--r--tests/test_impersonated_credentials.py260
1 files changed, 119 insertions, 141 deletions
diff --git a/tests/test_impersonated_credentials.py b/tests/test_impersonated_credentials.py
index 9945401..1cfcc7c 100644
--- a/tests/test_impersonated_credentials.py
+++ b/tests/test_impersonated_credentials.py
@@ -28,35 +28,38 @@ from google.auth import transport
from google.auth.impersonated_credentials import Credentials
from google.oauth2 import service_account
-DATA_DIR = os.path.join(os.path.dirname(__file__), '', 'data')
+DATA_DIR = os.path.join(os.path.dirname(__file__), "", "data")
-with open(os.path.join(DATA_DIR, 'privatekey.pem'), 'rb') as fh:
+with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh:
PRIVATE_KEY_BYTES = fh.read()
-SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, 'service_account.json')
+SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "service_account.json")
-ID_TOKEN_DATA = ('eyJhbGciOiJSUzI1NiIsImtpZCI6ImRmMzc1ODkwOGI3OTIyOTNhZDk3N2Ew'
- 'Yjk5MWQ5OGE3N2Y0ZWVlY2QiLCJ0eXAiOiJKV1QifQ.eyJhdWQiOiJodHRwc'
- 'zovL2Zvby5iYXIiLCJhenAiOiIxMDIxMDE1NTA4MzQyMDA3MDg1NjgiLCJle'
- 'HAiOjE1NjQ0NzUwNTEsImlhdCI6MTU2NDQ3MTQ1MSwiaXNzIjoiaHR0cHM6L'
- 'y9hY2NvdW50cy5nb29nbGUuY29tIiwic3ViIjoiMTAyMTAxNTUwODM0MjAwN'
- 'zA4NTY4In0.redacted')
+ID_TOKEN_DATA = (
+ "eyJhbGciOiJSUzI1NiIsImtpZCI6ImRmMzc1ODkwOGI3OTIyOTNhZDk3N2Ew"
+ "Yjk5MWQ5OGE3N2Y0ZWVlY2QiLCJ0eXAiOiJKV1QifQ.eyJhdWQiOiJodHRwc"
+ "zovL2Zvby5iYXIiLCJhenAiOiIxMDIxMDE1NTA4MzQyMDA3MDg1NjgiLCJle"
+ "HAiOjE1NjQ0NzUwNTEsImlhdCI6MTU2NDQ3MTQ1MSwiaXNzIjoiaHR0cHM6L"
+ "y9hY2NvdW50cy5nb29nbGUuY29tIiwic3ViIjoiMTAyMTAxNTUwODM0MjAwN"
+ "zA4NTY4In0.redacted"
+)
ID_TOKEN_EXPIRY = 1564475051
-with open(SERVICE_ACCOUNT_JSON_FILE, 'r') as fh:
+with open(SERVICE_ACCOUNT_JSON_FILE, "r") as fh:
SERVICE_ACCOUNT_INFO = json.load(fh)
-SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, '1')
-TOKEN_URI = 'https://example.com/oauth2/token'
+SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1")
+TOKEN_URI = "https://example.com/oauth2/token"
@pytest.fixture
def mock_donor_credentials():
- with mock.patch('google.oauth2._client.jwt_grant', autospec=True) as grant:
+ with mock.patch("google.oauth2._client.jwt_grant", autospec=True) as grant:
grant.return_value = (
"source token",
_helpers.utcnow() + datetime.timedelta(seconds=500),
- {})
+ {},
+ )
yield grant
@@ -71,54 +74,51 @@ class MockResponse:
@pytest.fixture
def mock_authorizedsession_sign():
- with mock.patch('google.auth.transport.requests.AuthorizedSession.request',
- autospec=True) as auth_session:
- data = {
- "keyId": "1",
- "signedBlob": "c2lnbmF0dXJl"
- }
+ with mock.patch(
+ "google.auth.transport.requests.AuthorizedSession.request", autospec=True
+ ) as auth_session:
+ data = {"keyId": "1", "signedBlob": "c2lnbmF0dXJl"}
auth_session.return_value = MockResponse(data, http_client.OK)
yield auth_session
@pytest.fixture
def mock_authorizedsession_idtoken():
- with mock.patch('google.auth.transport.requests.AuthorizedSession.request',
- autospec=True) as auth_session:
- data = {
- "token": ID_TOKEN_DATA
- }
+ with mock.patch(
+ "google.auth.transport.requests.AuthorizedSession.request", autospec=True
+ ) as auth_session:
+ data = {"token": ID_TOKEN_DATA}
auth_session.return_value = MockResponse(data, http_client.OK)
yield auth_session
class TestImpersonatedCredentials(object):
- SERVICE_ACCOUNT_EMAIL = 'service-account@example.com'
- TARGET_PRINCIPAL = 'impersonated@project.iam.gserviceaccount.com'
- TARGET_SCOPES = ['https://www.googleapis.com/auth/devstorage.read_only']
+ SERVICE_ACCOUNT_EMAIL = "service-account@example.com"
+ TARGET_PRINCIPAL = "impersonated@project.iam.gserviceaccount.com"
+ TARGET_SCOPES = ["https://www.googleapis.com/auth/devstorage.read_only"]
DELEGATES = []
LIFETIME = 3600
SOURCE_CREDENTIALS = service_account.Credentials(
- SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI)
+ SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI
+ )
- def make_credentials(self, lifetime=LIFETIME,
- target_principal=TARGET_PRINCIPAL):
+ def make_credentials(self, lifetime=LIFETIME, target_principal=TARGET_PRINCIPAL):
return Credentials(
source_credentials=self.SOURCE_CREDENTIALS,
target_principal=target_principal,
target_scopes=self.TARGET_SCOPES,
delegates=self.DELEGATES,
- lifetime=lifetime)
+ lifetime=lifetime,
+ )
def test_default_state(self):
credentials = self.make_credentials()
assert not credentials.valid
assert credentials.expired
- def make_request(self, data, status=http_client.OK,
- headers=None, side_effect=None):
+ def make_request(self, data, status=http_client.OK, headers=None, side_effect=None):
response = mock.create_autospec(transport.Response, instance=False)
response.status = status
response.data = _helpers.to_bytes(data)
@@ -132,40 +132,34 @@ class TestImpersonatedCredentials(object):
def test_refresh_success(self, mock_donor_credentials):
credentials = self.make_credentials(lifetime=None)
- token = 'token'
+ token = "token"
expire_time = (
- _helpers.utcnow().replace(microsecond=0) +
- datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
- response_body = {
- "accessToken": token,
- "expireTime": expire_time
- }
+ _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
+ ).isoformat("T") + "Z"
+ response_body = {"accessToken": token, "expireTime": expire_time}
request = self.make_request(
- data=json.dumps(response_body),
- status=http_client.OK)
+ data=json.dumps(response_body), status=http_client.OK
+ )
credentials.refresh(request)
assert credentials.valid
assert not credentials.expired
- def test_refresh_failure_malformed_expire_time(
- self, mock_donor_credentials):
+ def test_refresh_failure_malformed_expire_time(self, mock_donor_credentials):
credentials = self.make_credentials(lifetime=None)
- token = 'token'
+ token = "token"
- expire_time = (
- _helpers.utcnow() + datetime.timedelta(seconds=500)).isoformat('T')
- response_body = {
- "accessToken": token,
- "expireTime": expire_time
- }
+ expire_time = (_helpers.utcnow() + datetime.timedelta(seconds=500)).isoformat(
+ "T"
+ )
+ response_body = {"accessToken": token, "expireTime": expire_time}
request = self.make_request(
- data=json.dumps(response_body),
- status=http_client.OK)
+ data=json.dumps(response_body), status=http_client.OK
+ )
with pytest.raises(exceptions.RefreshError) as excinfo:
credentials.refresh(request)
@@ -180,15 +174,15 @@ class TestImpersonatedCredentials(object):
response_body = {
"error": {
- "code": 403,
- "message": "The caller does not have permission",
- "status": "PERMISSION_DENIED"
+ "code": 403,
+ "message": "The caller does not have permission",
+ "status": "PERMISSION_DENIED",
}
}
request = self.make_request(
- data=json.dumps(response_body),
- status=http_client.UNAUTHORIZED)
+ data=json.dumps(response_body), status=http_client.UNAUTHORIZED
+ )
with pytest.raises(exceptions.RefreshError) as excinfo:
credentials.refresh(request)
@@ -204,8 +198,8 @@ class TestImpersonatedCredentials(object):
response_body = {}
request = self.make_request(
- data=json.dumps(response_body),
- status=http_client.HTTPException)
+ data=json.dumps(response_body), status=http_client.HTTPException
+ )
with pytest.raises(exceptions.RefreshError) as excinfo:
credentials.refresh(request)
@@ -221,31 +215,24 @@ class TestImpersonatedCredentials(object):
def test_signer(self):
credentials = self.make_credentials()
- assert isinstance(credentials.signer,
- impersonated_credentials.Credentials)
+ assert isinstance(credentials.signer, impersonated_credentials.Credentials)
def test_signer_email(self):
- credentials = self.make_credentials(
- target_principal=self.TARGET_PRINCIPAL)
+ credentials = self.make_credentials(target_principal=self.TARGET_PRINCIPAL)
assert credentials.signer_email == self.TARGET_PRINCIPAL
def test_service_account_email(self):
- credentials = self.make_credentials(
- target_principal=self.TARGET_PRINCIPAL)
+ credentials = self.make_credentials(target_principal=self.TARGET_PRINCIPAL)
assert credentials.service_account_email == self.TARGET_PRINCIPAL
- def test_sign_bytes(self, mock_donor_credentials,
- mock_authorizedsession_sign):
+ def test_sign_bytes(self, mock_donor_credentials, mock_authorizedsession_sign):
credentials = self.make_credentials(lifetime=None)
- token = 'token'
+ token = "token"
expire_time = (
- _helpers.utcnow().replace(microsecond=0) +
- datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
- token_response_body = {
- "accessToken": token,
- "expireTime": expire_time
- }
+ _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
+ ).isoformat("T") + "Z"
+ token_response_body = {"accessToken": token, "expireTime": expire_time}
response = mock.create_autospec(transport.Response, instance=False)
response.status = http_client.OK
@@ -259,26 +246,24 @@ class TestImpersonatedCredentials(object):
assert credentials.valid
assert not credentials.expired
- signature = credentials.sign_bytes(b'signed bytes')
- assert signature == b'signature'
+ signature = credentials.sign_bytes(b"signed bytes")
+ assert signature == b"signature"
- def test_id_token_success(self, mock_donor_credentials,
- mock_authorizedsession_idtoken):
+ def test_id_token_success(
+ self, mock_donor_credentials, mock_authorizedsession_idtoken
+ ):
credentials = self.make_credentials(lifetime=None)
- token = 'token'
- target_audience = 'https://foo.bar'
+ token = "token"
+ target_audience = "https://foo.bar"
expire_time = (
- _helpers.utcnow().replace(microsecond=0) +
- datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
- response_body = {
- "accessToken": token,
- "expireTime": expire_time
- }
+ _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
+ ).isoformat("T") + "Z"
+ response_body = {"accessToken": token, "expireTime": expire_time}
request = self.make_request(
- data=json.dumps(response_body),
- status=http_client.OK)
+ data=json.dumps(response_body), status=http_client.OK
+ )
credentials.refresh(request)
@@ -286,30 +271,28 @@ class TestImpersonatedCredentials(object):
assert not credentials.expired
id_creds = impersonated_credentials.IDTokenCredentials(
- credentials, target_audience=target_audience)
+ credentials, target_audience=target_audience
+ )
id_creds.refresh(request)
assert id_creds.token == ID_TOKEN_DATA
- assert id_creds.expiry == datetime.datetime.fromtimestamp(
- ID_TOKEN_EXPIRY)
+ assert id_creds.expiry == datetime.datetime.fromtimestamp(ID_TOKEN_EXPIRY)
- def test_id_token_from_credential(self, mock_donor_credentials,
- mock_authorizedsession_idtoken):
+ def test_id_token_from_credential(
+ self, mock_donor_credentials, mock_authorizedsession_idtoken
+ ):
credentials = self.make_credentials(lifetime=None)
- token = 'token'
- target_audience = 'https://foo.bar'
+ token = "token"
+ target_audience = "https://foo.bar"
expire_time = (
- _helpers.utcnow().replace(microsecond=0) +
- datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
- response_body = {
- "accessToken": token,
- "expireTime": expire_time
- }
+ _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
+ ).isoformat("T") + "Z"
+ response_body = {"accessToken": token, "expireTime": expire_time}
request = self.make_request(
- data=json.dumps(response_body),
- status=http_client.OK)
+ data=json.dumps(response_body), status=http_client.OK
+ )
credentials.refresh(request)
@@ -317,72 +300,66 @@ class TestImpersonatedCredentials(object):
assert not credentials.expired
id_creds = impersonated_credentials.IDTokenCredentials(
- credentials, target_audience=target_audience)
+ credentials, target_audience=target_audience
+ )
id_creds = id_creds.from_credentials(target_credentials=credentials)
id_creds.refresh(request)
assert id_creds.token == ID_TOKEN_DATA
- def test_id_token_with_target_audience(self, mock_donor_credentials,
- mock_authorizedsession_idtoken):
+ def test_id_token_with_target_audience(
+ self, mock_donor_credentials, mock_authorizedsession_idtoken
+ ):
credentials = self.make_credentials(lifetime=None)
- token = 'token'
- target_audience = 'https://foo.bar'
+ token = "token"
+ target_audience = "https://foo.bar"
expire_time = (
- _helpers.utcnow().replace(microsecond=0) +
- datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
- response_body = {
- "accessToken": token,
- "expireTime": expire_time
- }
+ _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
+ ).isoformat("T") + "Z"
+ response_body = {"accessToken": token, "expireTime": expire_time}
request = self.make_request(
- data=json.dumps(response_body),
- status=http_client.OK)
+ data=json.dumps(response_body), status=http_client.OK
+ )
credentials.refresh(request)
assert credentials.valid
assert not credentials.expired
- id_creds = impersonated_credentials.IDTokenCredentials(
- credentials)
- id_creds = id_creds.with_target_audience(
- target_audience=target_audience)
+ id_creds = impersonated_credentials.IDTokenCredentials(credentials)
+ id_creds = id_creds.with_target_audience(target_audience=target_audience)
id_creds.refresh(request)
assert id_creds.token == ID_TOKEN_DATA
- assert id_creds.expiry == datetime.datetime.fromtimestamp(
- ID_TOKEN_EXPIRY)
+ assert id_creds.expiry == datetime.datetime.fromtimestamp(ID_TOKEN_EXPIRY)
- def test_id_token_invalid_cred(self, mock_donor_credentials,
- mock_authorizedsession_idtoken):
+ def test_id_token_invalid_cred(
+ self, mock_donor_credentials, mock_authorizedsession_idtoken
+ ):
credentials = None
with pytest.raises(exceptions.GoogleAuthError) as excinfo:
impersonated_credentials.IDTokenCredentials(credentials)
- assert excinfo.match('Provided Credential must be'
- ' impersonated_credentials')
+ assert excinfo.match("Provided Credential must be" " impersonated_credentials")
- def test_id_token_with_include_email(self, mock_donor_credentials,
- mock_authorizedsession_idtoken):
+ def test_id_token_with_include_email(
+ self, mock_donor_credentials, mock_authorizedsession_idtoken
+ ):
credentials = self.make_credentials(lifetime=None)
- token = 'token'
- target_audience = 'https://foo.bar'
+ token = "token"
+ target_audience = "https://foo.bar"
expire_time = (
- _helpers.utcnow().replace(microsecond=0) +
- datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
- response_body = {
- "accessToken": token,
- "expireTime": expire_time
- }
+ _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
+ ).isoformat("T") + "Z"
+ response_body = {"accessToken": token, "expireTime": expire_time}
request = self.make_request(
- data=json.dumps(response_body),
- status=http_client.OK)
+ data=json.dumps(response_body), status=http_client.OK
+ )
credentials.refresh(request)
@@ -390,7 +367,8 @@ class TestImpersonatedCredentials(object):
assert not credentials.expired
id_creds = impersonated_credentials.IDTokenCredentials(
- credentials, target_audience=target_audience)
+ credentials, target_audience=target_audience
+ )
id_creds = id_creds.with_include_email(True)
id_creds.refresh(request)