# Copyright 2016 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import datetime import os import time import mock import pytest from google.auth import _helpers from google.auth import credentials from google.auth import environment_vars from google.auth import exceptions from google.auth import transport from google.oauth2 import service_account try: # pylint: disable=ungrouped-imports import grpc import google.auth.transport.grpc HAS_GRPC = True except ImportError: # pragma: NO COVER HAS_GRPC = False DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "data") METADATA_PATH = os.path.join(DATA_DIR, "context_aware_metadata.json") with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh: PRIVATE_KEY_BYTES = fh.read() with open(os.path.join(DATA_DIR, "public_cert.pem"), "rb") as fh: PUBLIC_CERT_BYTES = fh.read() pytestmark = pytest.mark.skipif(not HAS_GRPC, reason="gRPC is unavailable.") class CredentialsStub(credentials.Credentials): def __init__(self, token="token"): super(CredentialsStub, self).__init__() self.token = token self.expiry = None def refresh(self, request): self.token += "1" def with_quota_project(self, quota_project_id): raise NotImplementedError() class TestAuthMetadataPlugin(object): def test_call_no_refresh(self): credentials = CredentialsStub() request = mock.create_autospec(transport.Request) plugin = google.auth.transport.grpc.AuthMetadataPlugin(credentials, request) context = mock.create_autospec(grpc.AuthMetadataContext, instance=True) context.method_name = mock.sentinel.method_name context.service_url = mock.sentinel.service_url callback = mock.create_autospec(grpc.AuthMetadataPluginCallback) plugin(context, callback) time.sleep(2) callback.assert_called_once_with( [("authorization", "Bearer {}".format(credentials.token))], None ) def test_call_refresh(self): credentials = CredentialsStub() credentials.expiry = datetime.datetime.min + _helpers.REFRESH_THRESHOLD request = mock.create_autospec(transport.Request) plugin = google.auth.transport.grpc.AuthMetadataPlugin(credentials, request) context = mock.create_autospec(grpc.AuthMetadataContext, instance=True) context.method_name = mock.sentinel.method_name context.service_url = mock.sentinel.service_url callback = mock.create_autospec(grpc.AuthMetadataPluginCallback) plugin(context, callback) time.sleep(2) assert credentials.token == "token1" callback.assert_called_once_with( [("authorization", "Bearer {}".format(credentials.token))], None ) def test__get_authorization_headers_with_service_account(self): credentials = mock.create_autospec(service_account.Credentials) request = mock.create_autospec(transport.Request) plugin = google.auth.transport.grpc.AuthMetadataPlugin(credentials, request) context = mock.create_autospec(grpc.AuthMetadataContext, instance=True) context.method_name = "methodName" context.service_url = "https://pubsub.googleapis.com/methodName" plugin._get_authorization_headers(context) credentials._create_self_signed_jwt.assert_called_once_with(None) def test__get_authorization_headers_with_service_account_and_default_host(self): credentials = mock.create_autospec(service_account.Credentials) request = mock.create_autospec(transport.Request) default_host = "pubsub.googleapis.com" plugin = google.auth.transport.grpc.AuthMetadataPlugin( credentials, request, default_host=default_host ) context = mock.create_autospec(grpc.AuthMetadataContext, instance=True) context.method_name = "methodName" context.service_url = "https://pubsub.googleapis.com/methodName" plugin._get_authorization_headers(context) credentials._create_self_signed_jwt.assert_called_once_with( "https://{}/".format(default_host) ) @mock.patch( "google.auth.transport._mtls_helper.get_client_ssl_credentials", autospec=True ) @mock.patch("grpc.composite_channel_credentials", autospec=True) @mock.patch("grpc.metadata_call_credentials", autospec=True) @mock.patch("grpc.ssl_channel_credentials", autospec=True) @mock.patch("grpc.secure_channel", autospec=True) class TestSecureAuthorizedChannel(object): @mock.patch( "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True ) @mock.patch( "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True ) def test_secure_authorized_channel_adc( self, check_dca_metadata_path, read_dca_metadata_file, secure_channel, ssl_channel_credentials, metadata_call_credentials, composite_channel_credentials, get_client_ssl_credentials, ): credentials = CredentialsStub() request = mock.create_autospec(transport.Request) target = "example.com:80" # Mock the context aware metadata and client cert/key so mTLS SSL channel # will be used. check_dca_metadata_path.return_value = METADATA_PATH read_dca_metadata_file.return_value = { "cert_provider_command": ["some command"] } get_client_ssl_credentials.return_value = ( True, PUBLIC_CERT_BYTES, PRIVATE_KEY_BYTES, None, ) channel = None with mock.patch.dict( os.environ, {environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE: "true"} ): channel = google.auth.transport.grpc.secure_authorized_channel( credentials, request, target, options=mock.sentinel.options ) # Check the auth plugin construction. auth_plugin = metadata_call_credentials.call_args[0][0] assert isinstance(auth_plugin, google.auth.transport.grpc.AuthMetadataPlugin) assert auth_plugin._credentials == credentials assert auth_plugin._request == request # Check the ssl channel call. ssl_channel_credentials.assert_called_once_with( certificate_chain=PUBLIC_CERT_BYTES, private_key=PRIVATE_KEY_BYTES ) # Check the composite credentials call. composite_channel_credentials.assert_called_once_with( ssl_channel_credentials.return_value, metadata_call_credentials.return_value ) # Check the channel call. secure_channel.assert_called_once_with( target, composite_channel_credentials.return_value, options=mock.sentinel.options, ) assert channel == secure_channel.return_value @mock.patch("google.auth.transport.grpc.SslCredentials", autospec=True) def test_secure_authorized_channel_adc_without_client_cert_env( self, ssl_credentials_adc_method, secure_channel, ssl_channel_credentials, metadata_call_credentials, composite_channel_credentials, get_client_ssl_credentials, ): # Test client cert won't be used if GOOGLE_API_USE_CLIENT_CERTIFICATE # environment variable is not set. credentials = CredentialsStub() request = mock.create_autospec(transport.Request) target = "example.com:80" channel = google.auth.transport.grpc.secure_authorized_channel( credentials, request, target, options=mock.sentinel.options ) # Check the auth plugin construction. auth_plugin = metadata_call_credentials.call_args[0][0] assert isinstance(auth_plugin, google.auth.transport.grpc.AuthMetadataPlugin) assert auth_plugin._credentials == credentials assert auth_plugin._request == request # Check the ssl channel call. ssl_channel_credentials.assert_called_once() ssl_credentials_adc_method.assert_not_called() # Check the composite credentials call. composite_channel_credentials.assert_called_once_with( ssl_channel_credentials.return_value, metadata_call_credentials.return_value ) # Check the channel call. secure_channel.assert_called_once_with( target, composite_channel_credentials.return_value, options=mock.sentinel.options, ) assert channel == secure_channel.return_value def test_secure_authorized_channel_explicit_ssl( self, secure_channel, ssl_channel_credentials, metadata_call_credentials, composite_channel_credentials, get_client_ssl_credentials, ): credentials = mock.Mock() request = mock.Mock() target = "example.com:80" ssl_credentials = mock.Mock() google.auth.transport.grpc.secure_authorized_channel( credentials, request, target, ssl_credentials=ssl_credentials ) # Since explicit SSL credentials are provided, get_client_ssl_credentials # shouldn't be called. assert not get_client_ssl_credentials.called # Check the ssl channel call. assert not ssl_channel_credentials.called # Check the composite credentials call. composite_channel_credentials.assert_called_once_with( ssl_credentials, metadata_call_credentials.return_value ) def test_secure_authorized_channel_mutual_exclusive( self, secure_channel, ssl_channel_credentials, metadata_call_credentials, composite_channel_credentials, get_client_ssl_credentials, ): credentials = mock.Mock() request = mock.Mock() target = "example.com:80" ssl_credentials = mock.Mock() client_cert_callback = mock.Mock() with pytest.raises(ValueError): google.auth.transport.grpc.secure_authorized_channel( credentials, request, target, ssl_credentials=ssl_credentials, client_cert_callback=client_cert_callback, ) def test_secure_authorized_channel_with_client_cert_callback_success( self, secure_channel, ssl_channel_credentials, metadata_call_credentials, composite_channel_credentials, get_client_ssl_credentials, ): credentials = mock.Mock() request = mock.Mock() target = "example.com:80" client_cert_callback = mock.Mock() client_cert_callback.return_value = (PUBLIC_CERT_BYTES, PRIVATE_KEY_BYTES) with mock.patch.dict( os.environ, {environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE: "true"} ): google.auth.transport.grpc.secure_authorized_channel( credentials, request, target, client_cert_callback=client_cert_callback ) client_cert_callback.assert_called_once() # Check we are using the cert and key provided by client_cert_callback. ssl_channel_credentials.assert_called_once_with( certificate_chain=PUBLIC_CERT_BYTES, private_key=PRIVATE_KEY_BYTES ) # Check the composite credentials call. composite_channel_credentials.assert_called_once_with( ssl_channel_credentials.return_value, metadata_call_credentials.return_value ) @mock.patch( "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True ) @mock.patch( "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True ) def test_secure_authorized_channel_with_client_cert_callback_failure( self, check_dca_metadata_path, read_dca_metadata_file, secure_channel, ssl_channel_credentials, metadata_call_credentials, composite_channel_credentials, get_client_ssl_credentials, ): credentials = mock.Mock() request = mock.Mock() target = "example.com:80" client_cert_callback = mock.Mock() client_cert_callback.side_effect = Exception("callback exception") with pytest.raises(Exception) as excinfo: with mock.patch.dict( os.environ, {environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE: "true"} ): google.auth.transport.grpc.secure_authorized_channel( credentials, request, target, client_cert_callback=client_cert_callback, ) assert str(excinfo.value) == "callback exception" def test_secure_authorized_channel_cert_callback_without_client_cert_env( self, secure_channel, ssl_channel_credentials, metadata_call_credentials, composite_channel_credentials, get_client_ssl_credentials, ): # Test client cert won't be used if GOOGLE_API_USE_CLIENT_CERTIFICATE # environment variable is not set. credentials = mock.Mock() request = mock.Mock() target = "example.com:80" client_cert_callback = mock.Mock() google.auth.transport.grpc.secure_authorized_channel( credentials, request, target, client_cert_callback=client_cert_callback ) # Check client_cert_callback is not called because GOOGLE_API_USE_CLIENT_CERTIFICATE # is not set. client_cert_callback.assert_not_called() ssl_channel_credentials.assert_called_once() # Check the composite credentials call. composite_channel_credentials.assert_called_once_with( ssl_channel_credentials.return_value, metadata_call_credentials.return_value ) @mock.patch("grpc.ssl_channel_credentials", autospec=True) @mock.patch( "google.auth.transport._mtls_helper.get_client_ssl_credentials", autospec=True ) @mock.patch("google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True) @mock.patch( "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True ) class TestSslCredentials(object): def test_no_context_aware_metadata( self, mock_check_dca_metadata_path, mock_read_dca_metadata_file, mock_get_client_ssl_credentials, mock_ssl_channel_credentials, ): # Mock that the metadata file doesn't exist. mock_check_dca_metadata_path.return_value = None with mock.patch.dict( os.environ, {environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE: "true"} ): ssl_credentials = google.auth.transport.grpc.SslCredentials() # Since no context aware metadata is found, we wouldn't call # get_client_ssl_credentials, and the SSL channel credentials created is # non mTLS. assert ssl_credentials.ssl_credentials is not None assert not ssl_credentials.is_mtls mock_get_client_ssl_credentials.assert_not_called() mock_ssl_channel_credentials.assert_called_once_with() def test_get_client_ssl_credentials_failure( self, mock_check_dca_metadata_path, mock_read_dca_metadata_file, mock_get_client_ssl_credentials, mock_ssl_channel_credentials, ): mock_check_dca_metadata_path.return_value = METADATA_PATH mock_read_dca_metadata_file.return_value = { "cert_provider_command": ["some command"] } # Mock that client cert and key are not loaded and exception is raised. mock_get_client_ssl_credentials.side_effect = exceptions.ClientCertError() with pytest.raises(exceptions.MutualTLSChannelError): with mock.patch.dict( os.environ, {environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE: "true"} ): assert google.auth.transport.grpc.SslCredentials().ssl_credentials def test_get_client_ssl_credentials_success( self, mock_check_dca_metadata_path, mock_read_dca_metadata_file, mock_get_client_ssl_credentials, mock_ssl_channel_credentials, ): mock_check_dca_metadata_path.return_value = METADATA_PATH mock_read_dca_metadata_file.return_value = { "cert_provider_command": ["some command"] } mock_get_client_ssl_credentials.return_value = ( True, PUBLIC_CERT_BYTES, PRIVATE_KEY_BYTES, None, ) with mock.patch.dict( os.environ, {environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE: "true"} ): ssl_credentials = google.auth.transport.grpc.SslCredentials() assert ssl_credentials.ssl_credentials is not None assert ssl_credentials.is_mtls mock_get_client_ssl_credentials.assert_called_once() mock_ssl_channel_credentials.assert_called_once_with( certificate_chain=PUBLIC_CERT_BYTES, private_key=PRIVATE_KEY_BYTES ) def test_get_client_ssl_credentials_without_client_cert_env( self, mock_check_dca_metadata_path, mock_read_dca_metadata_file, mock_get_client_ssl_credentials, mock_ssl_channel_credentials, ): # Test client cert won't be used if GOOGLE_API_USE_CLIENT_CERTIFICATE is not set. ssl_credentials = google.auth.transport.grpc.SslCredentials() assert ssl_credentials.ssl_credentials is not None assert not ssl_credentials.is_mtls mock_check_dca_metadata_path.assert_not_called() mock_read_dca_metadata_file.assert_not_called() mock_get_client_ssl_credentials.assert_not_called() mock_ssl_channel_credentials.assert_called_once()