diff options
author | Bu Sun Kim <8822365+busunkim96@users.noreply.github.com> | 2019-10-21 17:04:21 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-10-21 17:04:21 -0700 |
commit | 9eec091cd4340f8e57dd0e837a7e8df808f9f5bd (patch) | |
tree | c54e4f29e43bcdae0d1dedfafaed8671e7eb5011 /tests/test_impersonated_credentials.py | |
parent | ae3bafd0b55d41a6deabfccdef10276a39b7eb8c (diff) | |
download | google-auth-library-python-9eec091cd4340f8e57dd0e837a7e8df808f9f5bd.tar.gz |
chore: blacken (#375)
Diffstat (limited to 'tests/test_impersonated_credentials.py')
-rw-r--r-- | tests/test_impersonated_credentials.py | 260 |
1 files changed, 119 insertions, 141 deletions
diff --git a/tests/test_impersonated_credentials.py b/tests/test_impersonated_credentials.py index 9945401..1cfcc7c 100644 --- a/tests/test_impersonated_credentials.py +++ b/tests/test_impersonated_credentials.py @@ -28,35 +28,38 @@ from google.auth import transport from google.auth.impersonated_credentials import Credentials from google.oauth2 import service_account -DATA_DIR = os.path.join(os.path.dirname(__file__), '', 'data') +DATA_DIR = os.path.join(os.path.dirname(__file__), "", "data") -with open(os.path.join(DATA_DIR, 'privatekey.pem'), 'rb') as fh: +with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh: PRIVATE_KEY_BYTES = fh.read() -SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, 'service_account.json') +SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "service_account.json") -ID_TOKEN_DATA = ('eyJhbGciOiJSUzI1NiIsImtpZCI6ImRmMzc1ODkwOGI3OTIyOTNhZDk3N2Ew' - 'Yjk5MWQ5OGE3N2Y0ZWVlY2QiLCJ0eXAiOiJKV1QifQ.eyJhdWQiOiJodHRwc' - 'zovL2Zvby5iYXIiLCJhenAiOiIxMDIxMDE1NTA4MzQyMDA3MDg1NjgiLCJle' - 'HAiOjE1NjQ0NzUwNTEsImlhdCI6MTU2NDQ3MTQ1MSwiaXNzIjoiaHR0cHM6L' - 'y9hY2NvdW50cy5nb29nbGUuY29tIiwic3ViIjoiMTAyMTAxNTUwODM0MjAwN' - 'zA4NTY4In0.redacted') +ID_TOKEN_DATA = ( + "eyJhbGciOiJSUzI1NiIsImtpZCI6ImRmMzc1ODkwOGI3OTIyOTNhZDk3N2Ew" + "Yjk5MWQ5OGE3N2Y0ZWVlY2QiLCJ0eXAiOiJKV1QifQ.eyJhdWQiOiJodHRwc" + "zovL2Zvby5iYXIiLCJhenAiOiIxMDIxMDE1NTA4MzQyMDA3MDg1NjgiLCJle" + "HAiOjE1NjQ0NzUwNTEsImlhdCI6MTU2NDQ3MTQ1MSwiaXNzIjoiaHR0cHM6L" + "y9hY2NvdW50cy5nb29nbGUuY29tIiwic3ViIjoiMTAyMTAxNTUwODM0MjAwN" + "zA4NTY4In0.redacted" +) ID_TOKEN_EXPIRY = 1564475051 -with open(SERVICE_ACCOUNT_JSON_FILE, 'r') as fh: +with open(SERVICE_ACCOUNT_JSON_FILE, "r") as fh: SERVICE_ACCOUNT_INFO = json.load(fh) -SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, '1') -TOKEN_URI = 'https://example.com/oauth2/token' +SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1") +TOKEN_URI = "https://example.com/oauth2/token" @pytest.fixture def mock_donor_credentials(): - with mock.patch('google.oauth2._client.jwt_grant', autospec=True) as grant: + with mock.patch("google.oauth2._client.jwt_grant", autospec=True) as grant: grant.return_value = ( "source token", _helpers.utcnow() + datetime.timedelta(seconds=500), - {}) + {}, + ) yield grant @@ -71,54 +74,51 @@ class MockResponse: @pytest.fixture def mock_authorizedsession_sign(): - with mock.patch('google.auth.transport.requests.AuthorizedSession.request', - autospec=True) as auth_session: - data = { - "keyId": "1", - "signedBlob": "c2lnbmF0dXJl" - } + with mock.patch( + "google.auth.transport.requests.AuthorizedSession.request", autospec=True + ) as auth_session: + data = {"keyId": "1", "signedBlob": "c2lnbmF0dXJl"} auth_session.return_value = MockResponse(data, http_client.OK) yield auth_session @pytest.fixture def mock_authorizedsession_idtoken(): - with mock.patch('google.auth.transport.requests.AuthorizedSession.request', - autospec=True) as auth_session: - data = { - "token": ID_TOKEN_DATA - } + with mock.patch( + "google.auth.transport.requests.AuthorizedSession.request", autospec=True + ) as auth_session: + data = {"token": ID_TOKEN_DATA} auth_session.return_value = MockResponse(data, http_client.OK) yield auth_session class TestImpersonatedCredentials(object): - SERVICE_ACCOUNT_EMAIL = 'service-account@example.com' - TARGET_PRINCIPAL = 'impersonated@project.iam.gserviceaccount.com' - TARGET_SCOPES = ['https://www.googleapis.com/auth/devstorage.read_only'] + SERVICE_ACCOUNT_EMAIL = "service-account@example.com" + TARGET_PRINCIPAL = "impersonated@project.iam.gserviceaccount.com" + TARGET_SCOPES = ["https://www.googleapis.com/auth/devstorage.read_only"] DELEGATES = [] LIFETIME = 3600 SOURCE_CREDENTIALS = service_account.Credentials( - SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI) + SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI + ) - def make_credentials(self, lifetime=LIFETIME, - target_principal=TARGET_PRINCIPAL): + def make_credentials(self, lifetime=LIFETIME, target_principal=TARGET_PRINCIPAL): return Credentials( source_credentials=self.SOURCE_CREDENTIALS, target_principal=target_principal, target_scopes=self.TARGET_SCOPES, delegates=self.DELEGATES, - lifetime=lifetime) + lifetime=lifetime, + ) def test_default_state(self): credentials = self.make_credentials() assert not credentials.valid assert credentials.expired - def make_request(self, data, status=http_client.OK, - headers=None, side_effect=None): + def make_request(self, data, status=http_client.OK, headers=None, side_effect=None): response = mock.create_autospec(transport.Response, instance=False) response.status = status response.data = _helpers.to_bytes(data) @@ -132,40 +132,34 @@ class TestImpersonatedCredentials(object): def test_refresh_success(self, mock_donor_credentials): credentials = self.make_credentials(lifetime=None) - token = 'token' + token = "token" expire_time = ( - _helpers.utcnow().replace(microsecond=0) + - datetime.timedelta(seconds=500)).isoformat('T') + 'Z' - response_body = { - "accessToken": token, - "expireTime": expire_time - } + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + response_body = {"accessToken": token, "expireTime": expire_time} request = self.make_request( - data=json.dumps(response_body), - status=http_client.OK) + data=json.dumps(response_body), status=http_client.OK + ) credentials.refresh(request) assert credentials.valid assert not credentials.expired - def test_refresh_failure_malformed_expire_time( - self, mock_donor_credentials): + def test_refresh_failure_malformed_expire_time(self, mock_donor_credentials): credentials = self.make_credentials(lifetime=None) - token = 'token' + token = "token" - expire_time = ( - _helpers.utcnow() + datetime.timedelta(seconds=500)).isoformat('T') - response_body = { - "accessToken": token, - "expireTime": expire_time - } + expire_time = (_helpers.utcnow() + datetime.timedelta(seconds=500)).isoformat( + "T" + ) + response_body = {"accessToken": token, "expireTime": expire_time} request = self.make_request( - data=json.dumps(response_body), - status=http_client.OK) + data=json.dumps(response_body), status=http_client.OK + ) with pytest.raises(exceptions.RefreshError) as excinfo: credentials.refresh(request) @@ -180,15 +174,15 @@ class TestImpersonatedCredentials(object): response_body = { "error": { - "code": 403, - "message": "The caller does not have permission", - "status": "PERMISSION_DENIED" + "code": 403, + "message": "The caller does not have permission", + "status": "PERMISSION_DENIED", } } request = self.make_request( - data=json.dumps(response_body), - status=http_client.UNAUTHORIZED) + data=json.dumps(response_body), status=http_client.UNAUTHORIZED + ) with pytest.raises(exceptions.RefreshError) as excinfo: credentials.refresh(request) @@ -204,8 +198,8 @@ class TestImpersonatedCredentials(object): response_body = {} request = self.make_request( - data=json.dumps(response_body), - status=http_client.HTTPException) + data=json.dumps(response_body), status=http_client.HTTPException + ) with pytest.raises(exceptions.RefreshError) as excinfo: credentials.refresh(request) @@ -221,31 +215,24 @@ class TestImpersonatedCredentials(object): def test_signer(self): credentials = self.make_credentials() - assert isinstance(credentials.signer, - impersonated_credentials.Credentials) + assert isinstance(credentials.signer, impersonated_credentials.Credentials) def test_signer_email(self): - credentials = self.make_credentials( - target_principal=self.TARGET_PRINCIPAL) + credentials = self.make_credentials(target_principal=self.TARGET_PRINCIPAL) assert credentials.signer_email == self.TARGET_PRINCIPAL def test_service_account_email(self): - credentials = self.make_credentials( - target_principal=self.TARGET_PRINCIPAL) + credentials = self.make_credentials(target_principal=self.TARGET_PRINCIPAL) assert credentials.service_account_email == self.TARGET_PRINCIPAL - def test_sign_bytes(self, mock_donor_credentials, - mock_authorizedsession_sign): + def test_sign_bytes(self, mock_donor_credentials, mock_authorizedsession_sign): credentials = self.make_credentials(lifetime=None) - token = 'token' + token = "token" expire_time = ( - _helpers.utcnow().replace(microsecond=0) + - datetime.timedelta(seconds=500)).isoformat('T') + 'Z' - token_response_body = { - "accessToken": token, - "expireTime": expire_time - } + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + token_response_body = {"accessToken": token, "expireTime": expire_time} response = mock.create_autospec(transport.Response, instance=False) response.status = http_client.OK @@ -259,26 +246,24 @@ class TestImpersonatedCredentials(object): assert credentials.valid assert not credentials.expired - signature = credentials.sign_bytes(b'signed bytes') - assert signature == b'signature' + signature = credentials.sign_bytes(b"signed bytes") + assert signature == b"signature" - def test_id_token_success(self, mock_donor_credentials, - mock_authorizedsession_idtoken): + def test_id_token_success( + self, mock_donor_credentials, mock_authorizedsession_idtoken + ): credentials = self.make_credentials(lifetime=None) - token = 'token' - target_audience = 'https://foo.bar' + token = "token" + target_audience = "https://foo.bar" expire_time = ( - _helpers.utcnow().replace(microsecond=0) + - datetime.timedelta(seconds=500)).isoformat('T') + 'Z' - response_body = { - "accessToken": token, - "expireTime": expire_time - } + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + response_body = {"accessToken": token, "expireTime": expire_time} request = self.make_request( - data=json.dumps(response_body), - status=http_client.OK) + data=json.dumps(response_body), status=http_client.OK + ) credentials.refresh(request) @@ -286,30 +271,28 @@ class TestImpersonatedCredentials(object): assert not credentials.expired id_creds = impersonated_credentials.IDTokenCredentials( - credentials, target_audience=target_audience) + credentials, target_audience=target_audience + ) id_creds.refresh(request) assert id_creds.token == ID_TOKEN_DATA - assert id_creds.expiry == datetime.datetime.fromtimestamp( - ID_TOKEN_EXPIRY) + assert id_creds.expiry == datetime.datetime.fromtimestamp(ID_TOKEN_EXPIRY) - def test_id_token_from_credential(self, mock_donor_credentials, - mock_authorizedsession_idtoken): + def test_id_token_from_credential( + self, mock_donor_credentials, mock_authorizedsession_idtoken + ): credentials = self.make_credentials(lifetime=None) - token = 'token' - target_audience = 'https://foo.bar' + token = "token" + target_audience = "https://foo.bar" expire_time = ( - _helpers.utcnow().replace(microsecond=0) + - datetime.timedelta(seconds=500)).isoformat('T') + 'Z' - response_body = { - "accessToken": token, - "expireTime": expire_time - } + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + response_body = {"accessToken": token, "expireTime": expire_time} request = self.make_request( - data=json.dumps(response_body), - status=http_client.OK) + data=json.dumps(response_body), status=http_client.OK + ) credentials.refresh(request) @@ -317,72 +300,66 @@ class TestImpersonatedCredentials(object): assert not credentials.expired id_creds = impersonated_credentials.IDTokenCredentials( - credentials, target_audience=target_audience) + credentials, target_audience=target_audience + ) id_creds = id_creds.from_credentials(target_credentials=credentials) id_creds.refresh(request) assert id_creds.token == ID_TOKEN_DATA - def test_id_token_with_target_audience(self, mock_donor_credentials, - mock_authorizedsession_idtoken): + def test_id_token_with_target_audience( + self, mock_donor_credentials, mock_authorizedsession_idtoken + ): credentials = self.make_credentials(lifetime=None) - token = 'token' - target_audience = 'https://foo.bar' + token = "token" + target_audience = "https://foo.bar" expire_time = ( - _helpers.utcnow().replace(microsecond=0) + - datetime.timedelta(seconds=500)).isoformat('T') + 'Z' - response_body = { - "accessToken": token, - "expireTime": expire_time - } + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + response_body = {"accessToken": token, "expireTime": expire_time} request = self.make_request( - data=json.dumps(response_body), - status=http_client.OK) + data=json.dumps(response_body), status=http_client.OK + ) credentials.refresh(request) assert credentials.valid assert not credentials.expired - id_creds = impersonated_credentials.IDTokenCredentials( - credentials) - id_creds = id_creds.with_target_audience( - target_audience=target_audience) + id_creds = impersonated_credentials.IDTokenCredentials(credentials) + id_creds = id_creds.with_target_audience(target_audience=target_audience) id_creds.refresh(request) assert id_creds.token == ID_TOKEN_DATA - assert id_creds.expiry == datetime.datetime.fromtimestamp( - ID_TOKEN_EXPIRY) + assert id_creds.expiry == datetime.datetime.fromtimestamp(ID_TOKEN_EXPIRY) - def test_id_token_invalid_cred(self, mock_donor_credentials, - mock_authorizedsession_idtoken): + def test_id_token_invalid_cred( + self, mock_donor_credentials, mock_authorizedsession_idtoken + ): credentials = None with pytest.raises(exceptions.GoogleAuthError) as excinfo: impersonated_credentials.IDTokenCredentials(credentials) - assert excinfo.match('Provided Credential must be' - ' impersonated_credentials') + assert excinfo.match("Provided Credential must be" " impersonated_credentials") - def test_id_token_with_include_email(self, mock_donor_credentials, - mock_authorizedsession_idtoken): + def test_id_token_with_include_email( + self, mock_donor_credentials, mock_authorizedsession_idtoken + ): credentials = self.make_credentials(lifetime=None) - token = 'token' - target_audience = 'https://foo.bar' + token = "token" + target_audience = "https://foo.bar" expire_time = ( - _helpers.utcnow().replace(microsecond=0) + - datetime.timedelta(seconds=500)).isoformat('T') + 'Z' - response_body = { - "accessToken": token, - "expireTime": expire_time - } + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + response_body = {"accessToken": token, "expireTime": expire_time} request = self.make_request( - data=json.dumps(response_body), - status=http_client.OK) + data=json.dumps(response_body), status=http_client.OK + ) credentials.refresh(request) @@ -390,7 +367,8 @@ class TestImpersonatedCredentials(object): assert not credentials.expired id_creds = impersonated_credentials.IDTokenCredentials( - credentials, target_audience=target_audience) + credentials, target_audience=target_audience + ) id_creds = id_creds.with_include_email(True) id_creds.refresh(request) |