aboutsummaryrefslogtreecommitdiff
path: root/tests/unit/test_grpc_helpers.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unit/test_grpc_helpers.py')
-rw-r--r--tests/unit/test_grpc_helpers.py860
1 files changed, 860 insertions, 0 deletions
diff --git a/tests/unit/test_grpc_helpers.py b/tests/unit/test_grpc_helpers.py
new file mode 100644
index 0000000..ca969e4
--- /dev/null
+++ b/tests/unit/test_grpc_helpers.py
@@ -0,0 +1,860 @@
+# Copyright 2017 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+import pytest
+
+try:
+ import grpc
+except ImportError:
+ pytest.skip("No GRPC", allow_module_level=True)
+
+from google.api_core import exceptions
+from google.api_core import grpc_helpers
+import google.auth.credentials
+from google.longrunning import operations_pb2
+
+
+def test__patch_callable_name():
+ callable = mock.Mock(spec=["__class__"])
+ callable.__class__ = mock.Mock(spec=["__name__"])
+ callable.__class__.__name__ = "TestCallable"
+
+ grpc_helpers._patch_callable_name(callable)
+
+ assert callable.__name__ == "TestCallable"
+
+
+def test__patch_callable_name_no_op():
+ callable = mock.Mock(spec=["__name__"])
+ callable.__name__ = "test_callable"
+
+ grpc_helpers._patch_callable_name(callable)
+
+ assert callable.__name__ == "test_callable"
+
+
+class RpcErrorImpl(grpc.RpcError, grpc.Call):
+ def __init__(self, code):
+ super(RpcErrorImpl, self).__init__()
+ self._code = code
+
+ def code(self):
+ return self._code
+
+ def details(self):
+ return None
+
+ def trailing_metadata(self):
+ return None
+
+
+def test_wrap_unary_errors():
+ grpc_error = RpcErrorImpl(grpc.StatusCode.INVALID_ARGUMENT)
+ callable_ = mock.Mock(spec=["__call__"], side_effect=grpc_error)
+
+ wrapped_callable = grpc_helpers._wrap_unary_errors(callable_)
+
+ with pytest.raises(exceptions.InvalidArgument) as exc_info:
+ wrapped_callable(1, 2, three="four")
+
+ callable_.assert_called_once_with(1, 2, three="four")
+ assert exc_info.value.response == grpc_error
+
+
+class Test_StreamingResponseIterator:
+ @staticmethod
+ def _make_wrapped(*items):
+ return iter(items)
+
+ @staticmethod
+ def _make_one(wrapped, **kw):
+ return grpc_helpers._StreamingResponseIterator(wrapped, **kw)
+
+ def test_ctor_defaults(self):
+ wrapped = self._make_wrapped("a", "b", "c")
+ iterator = self._make_one(wrapped)
+ assert iterator._stored_first_result == "a"
+ assert list(wrapped) == ["b", "c"]
+
+ def test_ctor_explicit(self):
+ wrapped = self._make_wrapped("a", "b", "c")
+ iterator = self._make_one(wrapped, prefetch_first_result=False)
+ assert getattr(iterator, "_stored_first_result", self) is self
+ assert list(wrapped) == ["a", "b", "c"]
+
+ def test_ctor_w_rpc_error_on_prefetch(self):
+ wrapped = mock.MagicMock()
+ wrapped.__next__.side_effect = grpc.RpcError()
+
+ with pytest.raises(grpc.RpcError):
+ self._make_one(wrapped)
+
+ def test___iter__(self):
+ wrapped = self._make_wrapped("a", "b", "c")
+ iterator = self._make_one(wrapped)
+ assert iter(iterator) is iterator
+
+ def test___next___w_cached_first_result(self):
+ wrapped = self._make_wrapped("a", "b", "c")
+ iterator = self._make_one(wrapped)
+ assert next(iterator) == "a"
+ iterator = self._make_one(wrapped, prefetch_first_result=False)
+ assert next(iterator) == "b"
+ assert next(iterator) == "c"
+
+ def test___next___wo_cached_first_result(self):
+ wrapped = self._make_wrapped("a", "b", "c")
+ iterator = self._make_one(wrapped, prefetch_first_result=False)
+ assert next(iterator) == "a"
+ assert next(iterator) == "b"
+ assert next(iterator) == "c"
+
+ def test___next___w_rpc_error(self):
+ wrapped = mock.MagicMock()
+ wrapped.__next__.side_effect = grpc.RpcError()
+ iterator = self._make_one(wrapped, prefetch_first_result=False)
+
+ with pytest.raises(exceptions.GoogleAPICallError):
+ next(iterator)
+
+ def test_add_callback(self):
+ wrapped = mock.MagicMock()
+ callback = mock.Mock(spec={})
+ iterator = self._make_one(wrapped, prefetch_first_result=False)
+
+ assert iterator.add_callback(callback) is wrapped.add_callback.return_value
+
+ wrapped.add_callback.assert_called_once_with(callback)
+
+ def test_cancel(self):
+ wrapped = mock.MagicMock()
+ iterator = self._make_one(wrapped, prefetch_first_result=False)
+
+ assert iterator.cancel() is wrapped.cancel.return_value
+
+ wrapped.cancel.assert_called_once_with()
+
+ def test_code(self):
+ wrapped = mock.MagicMock()
+ iterator = self._make_one(wrapped, prefetch_first_result=False)
+
+ assert iterator.code() is wrapped.code.return_value
+
+ wrapped.code.assert_called_once_with()
+
+ def test_details(self):
+ wrapped = mock.MagicMock()
+ iterator = self._make_one(wrapped, prefetch_first_result=False)
+
+ assert iterator.details() is wrapped.details.return_value
+
+ wrapped.details.assert_called_once_with()
+
+ def test_initial_metadata(self):
+ wrapped = mock.MagicMock()
+ iterator = self._make_one(wrapped, prefetch_first_result=False)
+
+ assert iterator.initial_metadata() is wrapped.initial_metadata.return_value
+
+ wrapped.initial_metadata.assert_called_once_with()
+
+ def test_is_active(self):
+ wrapped = mock.MagicMock()
+ iterator = self._make_one(wrapped, prefetch_first_result=False)
+
+ assert iterator.is_active() is wrapped.is_active.return_value
+
+ wrapped.is_active.assert_called_once_with()
+
+ def test_time_remaining(self):
+ wrapped = mock.MagicMock()
+ iterator = self._make_one(wrapped, prefetch_first_result=False)
+
+ assert iterator.time_remaining() is wrapped.time_remaining.return_value
+
+ wrapped.time_remaining.assert_called_once_with()
+
+ def test_trailing_metadata(self):
+ wrapped = mock.MagicMock()
+ iterator = self._make_one(wrapped, prefetch_first_result=False)
+
+ assert iterator.trailing_metadata() is wrapped.trailing_metadata.return_value
+
+ wrapped.trailing_metadata.assert_called_once_with()
+
+
+def test_wrap_stream_okay():
+ expected_responses = [1, 2, 3]
+ callable_ = mock.Mock(spec=["__call__"], return_value=iter(expected_responses))
+
+ wrapped_callable = grpc_helpers._wrap_stream_errors(callable_)
+
+ got_iterator = wrapped_callable(1, 2, three="four")
+
+ responses = list(got_iterator)
+
+ callable_.assert_called_once_with(1, 2, three="four")
+ assert responses == expected_responses
+
+
+def test_wrap_stream_prefetch_disabled():
+ responses = [1, 2, 3]
+ iter_responses = iter(responses)
+ callable_ = mock.Mock(spec=["__call__"], return_value=iter_responses)
+ callable_._prefetch_first_result_ = False
+
+ wrapped_callable = grpc_helpers._wrap_stream_errors(callable_)
+ wrapped_callable(1, 2, three="four")
+
+ assert list(iter_responses) == responses # no items should have been pre-fetched
+ callable_.assert_called_once_with(1, 2, three="four")
+
+
+def test_wrap_stream_iterable_iterface():
+ response_iter = mock.create_autospec(grpc.Call, instance=True)
+ callable_ = mock.Mock(spec=["__call__"], return_value=response_iter)
+
+ wrapped_callable = grpc_helpers._wrap_stream_errors(callable_)
+
+ got_iterator = wrapped_callable()
+
+ callable_.assert_called_once_with()
+
+ # Check each aliased method in the grpc.Call interface
+ got_iterator.add_callback(mock.sentinel.callback)
+ response_iter.add_callback.assert_called_once_with(mock.sentinel.callback)
+
+ got_iterator.cancel()
+ response_iter.cancel.assert_called_once_with()
+
+ got_iterator.code()
+ response_iter.code.assert_called_once_with()
+
+ got_iterator.details()
+ response_iter.details.assert_called_once_with()
+
+ got_iterator.initial_metadata()
+ response_iter.initial_metadata.assert_called_once_with()
+
+ got_iterator.is_active()
+ response_iter.is_active.assert_called_once_with()
+
+ got_iterator.time_remaining()
+ response_iter.time_remaining.assert_called_once_with()
+
+ got_iterator.trailing_metadata()
+ response_iter.trailing_metadata.assert_called_once_with()
+
+
+def test_wrap_stream_errors_invocation():
+ grpc_error = RpcErrorImpl(grpc.StatusCode.INVALID_ARGUMENT)
+ callable_ = mock.Mock(spec=["__call__"], side_effect=grpc_error)
+
+ wrapped_callable = grpc_helpers._wrap_stream_errors(callable_)
+
+ with pytest.raises(exceptions.InvalidArgument) as exc_info:
+ wrapped_callable(1, 2, three="four")
+
+ callable_.assert_called_once_with(1, 2, three="four")
+ assert exc_info.value.response == grpc_error
+
+
+def test_wrap_stream_empty_iterator():
+ expected_responses = []
+ callable_ = mock.Mock(spec=["__call__"], return_value=iter(expected_responses))
+
+ wrapped_callable = grpc_helpers._wrap_stream_errors(callable_)
+
+ got_iterator = wrapped_callable()
+
+ responses = list(got_iterator)
+
+ callable_.assert_called_once_with()
+ assert responses == expected_responses
+
+
+class RpcResponseIteratorImpl(object):
+ def __init__(self, iterable):
+ self._iterable = iter(iterable)
+
+ def next(self):
+ next_item = next(self._iterable)
+ if isinstance(next_item, RpcErrorImpl):
+ raise next_item
+ return next_item
+
+ __next__ = next
+
+
+def test_wrap_stream_errors_iterator_initialization():
+ grpc_error = RpcErrorImpl(grpc.StatusCode.UNAVAILABLE)
+ response_iter = RpcResponseIteratorImpl([grpc_error])
+ callable_ = mock.Mock(spec=["__call__"], return_value=response_iter)
+
+ wrapped_callable = grpc_helpers._wrap_stream_errors(callable_)
+
+ with pytest.raises(exceptions.ServiceUnavailable) as exc_info:
+ wrapped_callable(1, 2, three="four")
+
+ callable_.assert_called_once_with(1, 2, three="four")
+ assert exc_info.value.response == grpc_error
+
+
+def test_wrap_stream_errors_during_iteration():
+ grpc_error = RpcErrorImpl(grpc.StatusCode.UNAVAILABLE)
+ response_iter = RpcResponseIteratorImpl([1, grpc_error])
+ callable_ = mock.Mock(spec=["__call__"], return_value=response_iter)
+
+ wrapped_callable = grpc_helpers._wrap_stream_errors(callable_)
+ got_iterator = wrapped_callable(1, 2, three="four")
+ next(got_iterator)
+
+ with pytest.raises(exceptions.ServiceUnavailable) as exc_info:
+ next(got_iterator)
+
+ callable_.assert_called_once_with(1, 2, three="four")
+ assert exc_info.value.response == grpc_error
+
+
+@mock.patch("google.api_core.grpc_helpers._wrap_unary_errors")
+def test_wrap_errors_non_streaming(wrap_unary_errors):
+ callable_ = mock.create_autospec(grpc.UnaryUnaryMultiCallable)
+
+ result = grpc_helpers.wrap_errors(callable_)
+
+ assert result == wrap_unary_errors.return_value
+ wrap_unary_errors.assert_called_once_with(callable_)
+
+
+@mock.patch("google.api_core.grpc_helpers._wrap_stream_errors")
+def test_wrap_errors_streaming(wrap_stream_errors):
+ callable_ = mock.create_autospec(grpc.UnaryStreamMultiCallable)
+
+ result = grpc_helpers.wrap_errors(callable_)
+
+ assert result == wrap_stream_errors.return_value
+ wrap_stream_errors.assert_called_once_with(callable_)
+
+
+@mock.patch("grpc.composite_channel_credentials")
+@mock.patch(
+ "google.auth.default",
+ autospec=True,
+ return_value=(mock.sentinel.credentials, mock.sentinel.projet),
+)
+@mock.patch("grpc.secure_channel")
+def test_create_channel_implicit(grpc_secure_channel, default, composite_creds_call):
+ target = "example.com:443"
+ composite_creds = composite_creds_call.return_value
+
+ channel = grpc_helpers.create_channel(target)
+
+ assert channel is grpc_secure_channel.return_value
+
+ default.assert_called_once_with(scopes=None, default_scopes=None)
+
+ if grpc_helpers.HAS_GRPC_GCP:
+ grpc_secure_channel.assert_called_once_with(target, composite_creds, None)
+ else:
+ grpc_secure_channel.assert_called_once_with(target, composite_creds)
+
+
+@mock.patch("google.auth.transport.grpc.AuthMetadataPlugin", autospec=True)
+@mock.patch(
+ "google.auth.transport.requests.Request",
+ autospec=True,
+ return_value=mock.sentinel.Request,
+)
+@mock.patch("grpc.composite_channel_credentials")
+@mock.patch(
+ "google.auth.default",
+ autospec=True,
+ return_value=(mock.sentinel.credentials, mock.sentinel.project),
+)
+@mock.patch("grpc.secure_channel")
+def test_create_channel_implicit_with_default_host(
+ grpc_secure_channel, default, composite_creds_call, request, auth_metadata_plugin
+):
+ target = "example.com:443"
+ default_host = "example.com"
+ composite_creds = composite_creds_call.return_value
+
+ channel = grpc_helpers.create_channel(target, default_host=default_host)
+
+ assert channel is grpc_secure_channel.return_value
+
+ default.assert_called_once_with(scopes=None, default_scopes=None)
+ auth_metadata_plugin.assert_called_once_with(
+ mock.sentinel.credentials, mock.sentinel.Request, default_host=default_host
+ )
+
+ if grpc_helpers.HAS_GRPC_GCP:
+ grpc_secure_channel.assert_called_once_with(target, composite_creds, None)
+ else:
+ grpc_secure_channel.assert_called_once_with(target, composite_creds)
+
+
+@mock.patch("grpc.composite_channel_credentials")
+@mock.patch(
+ "google.auth.default",
+ autospec=True,
+ return_value=(mock.sentinel.credentials, mock.sentinel.projet),
+)
+@mock.patch("grpc.secure_channel")
+def test_create_channel_implicit_with_ssl_creds(
+ grpc_secure_channel, default, composite_creds_call
+):
+ target = "example.com:443"
+
+ ssl_creds = grpc.ssl_channel_credentials()
+
+ grpc_helpers.create_channel(target, ssl_credentials=ssl_creds)
+
+ default.assert_called_once_with(scopes=None, default_scopes=None)
+
+ composite_creds_call.assert_called_once_with(ssl_creds, mock.ANY)
+ composite_creds = composite_creds_call.return_value
+ if grpc_helpers.HAS_GRPC_GCP:
+ grpc_secure_channel.assert_called_once_with(target, composite_creds, None)
+ else:
+ grpc_secure_channel.assert_called_once_with(target, composite_creds)
+
+
+@mock.patch("grpc.composite_channel_credentials")
+@mock.patch(
+ "google.auth.default",
+ autospec=True,
+ return_value=(mock.sentinel.credentials, mock.sentinel.projet),
+)
+@mock.patch("grpc.secure_channel")
+def test_create_channel_implicit_with_scopes(
+ grpc_secure_channel, default, composite_creds_call
+):
+ target = "example.com:443"
+ composite_creds = composite_creds_call.return_value
+
+ channel = grpc_helpers.create_channel(target, scopes=["one", "two"])
+
+ assert channel is grpc_secure_channel.return_value
+
+ default.assert_called_once_with(scopes=["one", "two"], default_scopes=None)
+
+ if grpc_helpers.HAS_GRPC_GCP:
+ grpc_secure_channel.assert_called_once_with(target, composite_creds, None)
+ else:
+ grpc_secure_channel.assert_called_once_with(target, composite_creds)
+
+
+@mock.patch("grpc.composite_channel_credentials")
+@mock.patch(
+ "google.auth.default",
+ autospec=True,
+ return_value=(mock.sentinel.credentials, mock.sentinel.projet),
+)
+@mock.patch("grpc.secure_channel")
+def test_create_channel_implicit_with_default_scopes(
+ grpc_secure_channel, default, composite_creds_call
+):
+ target = "example.com:443"
+ composite_creds = composite_creds_call.return_value
+
+ channel = grpc_helpers.create_channel(target, default_scopes=["three", "four"])
+
+ assert channel is grpc_secure_channel.return_value
+
+ default.assert_called_once_with(scopes=None, default_scopes=["three", "four"])
+
+ if grpc_helpers.HAS_GRPC_GCP:
+ grpc_secure_channel.assert_called_once_with(target, composite_creds, None)
+ else:
+ grpc_secure_channel.assert_called_once_with(target, composite_creds)
+
+
+def test_create_channel_explicit_with_duplicate_credentials():
+ target = "example.com:443"
+
+ with pytest.raises(exceptions.DuplicateCredentialArgs):
+ grpc_helpers.create_channel(
+ target,
+ credentials_file="credentials.json",
+ credentials=mock.sentinel.credentials,
+ )
+
+
+@mock.patch("grpc.composite_channel_credentials")
+@mock.patch("google.auth.credentials.with_scopes_if_required", autospec=True)
+@mock.patch("grpc.secure_channel")
+def test_create_channel_explicit(grpc_secure_channel, auth_creds, composite_creds_call):
+ target = "example.com:443"
+ composite_creds = composite_creds_call.return_value
+
+ channel = grpc_helpers.create_channel(target, credentials=mock.sentinel.credentials)
+
+ auth_creds.assert_called_once_with(
+ mock.sentinel.credentials, scopes=None, default_scopes=None
+ )
+
+ assert channel is grpc_secure_channel.return_value
+ if grpc_helpers.HAS_GRPC_GCP:
+ grpc_secure_channel.assert_called_once_with(target, composite_creds, None)
+ else:
+ grpc_secure_channel.assert_called_once_with(target, composite_creds)
+
+
+@mock.patch("grpc.composite_channel_credentials")
+@mock.patch("grpc.secure_channel")
+def test_create_channel_explicit_scoped(grpc_secure_channel, composite_creds_call):
+ target = "example.com:443"
+ scopes = ["1", "2"]
+ composite_creds = composite_creds_call.return_value
+
+ credentials = mock.create_autospec(google.auth.credentials.Scoped, instance=True)
+ credentials.requires_scopes = True
+
+ channel = grpc_helpers.create_channel(
+ target, credentials=credentials, scopes=scopes
+ )
+
+ credentials.with_scopes.assert_called_once_with(scopes, default_scopes=None)
+
+ assert channel is grpc_secure_channel.return_value
+ if grpc_helpers.HAS_GRPC_GCP:
+ grpc_secure_channel.assert_called_once_with(target, composite_creds, None)
+ else:
+ grpc_secure_channel.assert_called_once_with(target, composite_creds)
+
+
+@mock.patch("grpc.composite_channel_credentials")
+@mock.patch("grpc.secure_channel")
+def test_create_channel_explicit_default_scopes(
+ grpc_secure_channel, composite_creds_call
+):
+ target = "example.com:443"
+ default_scopes = ["3", "4"]
+ composite_creds = composite_creds_call.return_value
+
+ credentials = mock.create_autospec(google.auth.credentials.Scoped, instance=True)
+ credentials.requires_scopes = True
+
+ channel = grpc_helpers.create_channel(
+ target, credentials=credentials, default_scopes=default_scopes
+ )
+
+ credentials.with_scopes.assert_called_once_with(
+ scopes=None, default_scopes=default_scopes
+ )
+
+ assert channel is grpc_secure_channel.return_value
+ if grpc_helpers.HAS_GRPC_GCP:
+ grpc_secure_channel.assert_called_once_with(target, composite_creds, None)
+ else:
+ grpc_secure_channel.assert_called_once_with(target, composite_creds)
+
+
+@mock.patch("grpc.composite_channel_credentials")
+@mock.patch("grpc.secure_channel")
+def test_create_channel_explicit_with_quota_project(
+ grpc_secure_channel, composite_creds_call
+):
+ target = "example.com:443"
+ composite_creds = composite_creds_call.return_value
+
+ credentials = mock.create_autospec(
+ google.auth.credentials.CredentialsWithQuotaProject, instance=True
+ )
+
+ channel = grpc_helpers.create_channel(
+ target, credentials=credentials, quota_project_id="project-foo"
+ )
+
+ credentials.with_quota_project.assert_called_once_with("project-foo")
+
+ assert channel is grpc_secure_channel.return_value
+ if grpc_helpers.HAS_GRPC_GCP:
+ grpc_secure_channel.assert_called_once_with(target, composite_creds, None)
+ else:
+ grpc_secure_channel.assert_called_once_with(target, composite_creds)
+
+
+@mock.patch("grpc.composite_channel_credentials")
+@mock.patch("grpc.secure_channel")
+@mock.patch(
+ "google.auth.load_credentials_from_file",
+ autospec=True,
+ return_value=(mock.sentinel.credentials, mock.sentinel.project),
+)
+def test_create_channel_with_credentials_file(
+ load_credentials_from_file, grpc_secure_channel, composite_creds_call
+):
+ target = "example.com:443"
+
+ credentials_file = "/path/to/credentials/file.json"
+ composite_creds = composite_creds_call.return_value
+
+ channel = grpc_helpers.create_channel(target, credentials_file=credentials_file)
+
+ google.auth.load_credentials_from_file.assert_called_once_with(
+ credentials_file, scopes=None, default_scopes=None
+ )
+
+ assert channel is grpc_secure_channel.return_value
+ if grpc_helpers.HAS_GRPC_GCP:
+ grpc_secure_channel.assert_called_once_with(target, composite_creds, None)
+ else:
+ grpc_secure_channel.assert_called_once_with(target, composite_creds)
+
+
+@mock.patch("grpc.composite_channel_credentials")
+@mock.patch("grpc.secure_channel")
+@mock.patch(
+ "google.auth.load_credentials_from_file",
+ autospec=True,
+ return_value=(mock.sentinel.credentials, mock.sentinel.project),
+)
+def test_create_channel_with_credentials_file_and_scopes(
+ load_credentials_from_file, grpc_secure_channel, composite_creds_call
+):
+ target = "example.com:443"
+ scopes = ["1", "2"]
+
+ credentials_file = "/path/to/credentials/file.json"
+ composite_creds = composite_creds_call.return_value
+
+ channel = grpc_helpers.create_channel(
+ target, credentials_file=credentials_file, scopes=scopes
+ )
+
+ google.auth.load_credentials_from_file.assert_called_once_with(
+ credentials_file, scopes=scopes, default_scopes=None
+ )
+
+ assert channel is grpc_secure_channel.return_value
+ if grpc_helpers.HAS_GRPC_GCP:
+ grpc_secure_channel.assert_called_once_with(target, composite_creds, None)
+ else:
+ grpc_secure_channel.assert_called_once_with(target, composite_creds)
+
+
+@mock.patch("grpc.composite_channel_credentials")
+@mock.patch("grpc.secure_channel")
+@mock.patch(
+ "google.auth.load_credentials_from_file",
+ autospec=True,
+ return_value=(mock.sentinel.credentials, mock.sentinel.project),
+)
+def test_create_channel_with_credentials_file_and_default_scopes(
+ load_credentials_from_file, grpc_secure_channel, composite_creds_call
+):
+ target = "example.com:443"
+ default_scopes = ["3", "4"]
+
+ credentials_file = "/path/to/credentials/file.json"
+ composite_creds = composite_creds_call.return_value
+
+ channel = grpc_helpers.create_channel(
+ target, credentials_file=credentials_file, default_scopes=default_scopes
+ )
+
+ load_credentials_from_file.assert_called_once_with(
+ credentials_file, scopes=None, default_scopes=default_scopes
+ )
+
+ assert channel is grpc_secure_channel.return_value
+ if grpc_helpers.HAS_GRPC_GCP:
+ grpc_secure_channel.assert_called_once_with(target, composite_creds, None)
+ else:
+ grpc_secure_channel.assert_called_once_with(target, composite_creds)
+
+
+@pytest.mark.skipif(
+ not grpc_helpers.HAS_GRPC_GCP, reason="grpc_gcp module not available"
+)
+@mock.patch("grpc_gcp.secure_channel")
+def test_create_channel_with_grpc_gcp(grpc_gcp_secure_channel):
+ target = "example.com:443"
+ scopes = ["test_scope"]
+
+ credentials = mock.create_autospec(google.auth.credentials.Scoped, instance=True)
+ credentials.requires_scopes = True
+
+ grpc_helpers.create_channel(target, credentials=credentials, scopes=scopes)
+ grpc_gcp_secure_channel.assert_called()
+
+ credentials.with_scopes.assert_called_once_with(scopes, default_scopes=None)
+
+
+@pytest.mark.skipif(grpc_helpers.HAS_GRPC_GCP, reason="grpc_gcp module not available")
+@mock.patch("grpc.secure_channel")
+def test_create_channel_without_grpc_gcp(grpc_secure_channel):
+ target = "example.com:443"
+ scopes = ["test_scope"]
+
+ credentials = mock.create_autospec(google.auth.credentials.Scoped, instance=True)
+ credentials.requires_scopes = True
+
+ grpc_helpers.create_channel(target, credentials=credentials, scopes=scopes)
+ grpc_secure_channel.assert_called()
+
+ credentials.with_scopes.assert_called_once_with(scopes, default_scopes=None)
+
+
+class TestChannelStub(object):
+ def test_single_response(self):
+ channel = grpc_helpers.ChannelStub()
+ stub = operations_pb2.OperationsStub(channel)
+ expected_request = operations_pb2.GetOperationRequest(name="meep")
+ expected_response = operations_pb2.Operation(name="moop")
+
+ channel.GetOperation.response = expected_response
+
+ response = stub.GetOperation(expected_request)
+
+ assert response == expected_response
+ assert channel.requests == [("GetOperation", expected_request)]
+ assert channel.GetOperation.requests == [expected_request]
+
+ def test_no_response(self):
+ channel = grpc_helpers.ChannelStub()
+ stub = operations_pb2.OperationsStub(channel)
+ expected_request = operations_pb2.GetOperationRequest(name="meep")
+
+ with pytest.raises(ValueError) as exc_info:
+ stub.GetOperation(expected_request)
+
+ assert exc_info.match("GetOperation")
+
+ def test_missing_method(self):
+ channel = grpc_helpers.ChannelStub()
+
+ with pytest.raises(AttributeError):
+ channel.DoesNotExist.response
+
+ def test_exception_response(self):
+ channel = grpc_helpers.ChannelStub()
+ stub = operations_pb2.OperationsStub(channel)
+ expected_request = operations_pb2.GetOperationRequest(name="meep")
+
+ channel.GetOperation.response = RuntimeError()
+
+ with pytest.raises(RuntimeError):
+ stub.GetOperation(expected_request)
+
+ def test_callable_response(self):
+ channel = grpc_helpers.ChannelStub()
+ stub = operations_pb2.OperationsStub(channel)
+ expected_request = operations_pb2.GetOperationRequest(name="meep")
+ expected_response = operations_pb2.Operation(name="moop")
+
+ on_get_operation = mock.Mock(spec=("__call__",), return_value=expected_response)
+
+ channel.GetOperation.response = on_get_operation
+
+ response = stub.GetOperation(expected_request)
+
+ assert response == expected_response
+ on_get_operation.assert_called_once_with(expected_request)
+
+ def test_multiple_responses(self):
+ channel = grpc_helpers.ChannelStub()
+ stub = operations_pb2.OperationsStub(channel)
+ expected_request = operations_pb2.GetOperationRequest(name="meep")
+ expected_responses = [
+ operations_pb2.Operation(name="foo"),
+ operations_pb2.Operation(name="bar"),
+ operations_pb2.Operation(name="baz"),
+ ]
+
+ channel.GetOperation.responses = iter(expected_responses)
+
+ response1 = stub.GetOperation(expected_request)
+ response2 = stub.GetOperation(expected_request)
+ response3 = stub.GetOperation(expected_request)
+
+ assert response1 == expected_responses[0]
+ assert response2 == expected_responses[1]
+ assert response3 == expected_responses[2]
+ assert channel.requests == [("GetOperation", expected_request)] * 3
+ assert channel.GetOperation.requests == [expected_request] * 3
+
+ with pytest.raises(StopIteration):
+ stub.GetOperation(expected_request)
+
+ def test_multiple_responses_and_single_response_error(self):
+ channel = grpc_helpers.ChannelStub()
+ stub = operations_pb2.OperationsStub(channel)
+ channel.GetOperation.responses = []
+ channel.GetOperation.response = mock.sentinel.response
+
+ with pytest.raises(ValueError):
+ stub.GetOperation(operations_pb2.GetOperationRequest())
+
+ def test_call_info(self):
+ channel = grpc_helpers.ChannelStub()
+ stub = operations_pb2.OperationsStub(channel)
+ expected_request = operations_pb2.GetOperationRequest(name="meep")
+ expected_response = operations_pb2.Operation(name="moop")
+ expected_metadata = [("red", "blue"), ("two", "shoe")]
+ expected_credentials = mock.sentinel.credentials
+ channel.GetOperation.response = expected_response
+
+ response = stub.GetOperation(
+ expected_request,
+ timeout=42,
+ metadata=expected_metadata,
+ credentials=expected_credentials,
+ )
+
+ assert response == expected_response
+ assert channel.requests == [("GetOperation", expected_request)]
+ assert channel.GetOperation.calls == [
+ (expected_request, 42, expected_metadata, expected_credentials)
+ ]
+
+ def test_unary_unary(self):
+ channel = grpc_helpers.ChannelStub()
+ method_name = "GetOperation"
+ callable_stub = channel.unary_unary(method_name)
+ assert callable_stub._method == method_name
+ assert callable_stub._channel == channel
+
+ def test_unary_stream(self):
+ channel = grpc_helpers.ChannelStub()
+ method_name = "GetOperation"
+ callable_stub = channel.unary_stream(method_name)
+ assert callable_stub._method == method_name
+ assert callable_stub._channel == channel
+
+ def test_stream_unary(self):
+ channel = grpc_helpers.ChannelStub()
+ method_name = "GetOperation"
+ callable_stub = channel.stream_unary(method_name)
+ assert callable_stub._method == method_name
+ assert callable_stub._channel == channel
+
+ def test_stream_stream(self):
+ channel = grpc_helpers.ChannelStub()
+ method_name = "GetOperation"
+ callable_stub = channel.stream_stream(method_name)
+ assert callable_stub._method == method_name
+ assert callable_stub._channel == channel
+
+ def test_subscribe_unsubscribe(self):
+ channel = grpc_helpers.ChannelStub()
+ assert channel.subscribe(None) is None
+ assert channel.unsubscribe(None) is None
+
+ def test_close(self):
+ channel = grpc_helpers.ChannelStub()
+ assert channel.close() is None