diff options
Diffstat (limited to 'tests/unit/test_grpc_helpers.py')
-rw-r--r-- | tests/unit/test_grpc_helpers.py | 860 |
1 files changed, 860 insertions, 0 deletions
diff --git a/tests/unit/test_grpc_helpers.py b/tests/unit/test_grpc_helpers.py new file mode 100644 index 0000000..ca969e4 --- /dev/null +++ b/tests/unit/test_grpc_helpers.py @@ -0,0 +1,860 @@ +# Copyright 2017 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock +import pytest + +try: + import grpc +except ImportError: + pytest.skip("No GRPC", allow_module_level=True) + +from google.api_core import exceptions +from google.api_core import grpc_helpers +import google.auth.credentials +from google.longrunning import operations_pb2 + + +def test__patch_callable_name(): + callable = mock.Mock(spec=["__class__"]) + callable.__class__ = mock.Mock(spec=["__name__"]) + callable.__class__.__name__ = "TestCallable" + + grpc_helpers._patch_callable_name(callable) + + assert callable.__name__ == "TestCallable" + + +def test__patch_callable_name_no_op(): + callable = mock.Mock(spec=["__name__"]) + callable.__name__ = "test_callable" + + grpc_helpers._patch_callable_name(callable) + + assert callable.__name__ == "test_callable" + + +class RpcErrorImpl(grpc.RpcError, grpc.Call): + def __init__(self, code): + super(RpcErrorImpl, self).__init__() + self._code = code + + def code(self): + return self._code + + def details(self): + return None + + def trailing_metadata(self): + return None + + +def test_wrap_unary_errors(): + grpc_error = RpcErrorImpl(grpc.StatusCode.INVALID_ARGUMENT) + callable_ = mock.Mock(spec=["__call__"], side_effect=grpc_error) + + wrapped_callable = grpc_helpers._wrap_unary_errors(callable_) + + with pytest.raises(exceptions.InvalidArgument) as exc_info: + wrapped_callable(1, 2, three="four") + + callable_.assert_called_once_with(1, 2, three="four") + assert exc_info.value.response == grpc_error + + +class Test_StreamingResponseIterator: + @staticmethod + def _make_wrapped(*items): + return iter(items) + + @staticmethod + def _make_one(wrapped, **kw): + return grpc_helpers._StreamingResponseIterator(wrapped, **kw) + + def test_ctor_defaults(self): + wrapped = self._make_wrapped("a", "b", "c") + iterator = self._make_one(wrapped) + assert iterator._stored_first_result == "a" + assert list(wrapped) == ["b", "c"] + + def test_ctor_explicit(self): + wrapped = self._make_wrapped("a", "b", "c") + iterator = self._make_one(wrapped, prefetch_first_result=False) + assert getattr(iterator, "_stored_first_result", self) is self + assert list(wrapped) == ["a", "b", "c"] + + def test_ctor_w_rpc_error_on_prefetch(self): + wrapped = mock.MagicMock() + wrapped.__next__.side_effect = grpc.RpcError() + + with pytest.raises(grpc.RpcError): + self._make_one(wrapped) + + def test___iter__(self): + wrapped = self._make_wrapped("a", "b", "c") + iterator = self._make_one(wrapped) + assert iter(iterator) is iterator + + def test___next___w_cached_first_result(self): + wrapped = self._make_wrapped("a", "b", "c") + iterator = self._make_one(wrapped) + assert next(iterator) == "a" + iterator = self._make_one(wrapped, prefetch_first_result=False) + assert next(iterator) == "b" + assert next(iterator) == "c" + + def test___next___wo_cached_first_result(self): + wrapped = self._make_wrapped("a", "b", "c") + iterator = self._make_one(wrapped, prefetch_first_result=False) + assert next(iterator) == "a" + assert next(iterator) == "b" + assert next(iterator) == "c" + + def test___next___w_rpc_error(self): + wrapped = mock.MagicMock() + wrapped.__next__.side_effect = grpc.RpcError() + iterator = self._make_one(wrapped, prefetch_first_result=False) + + with pytest.raises(exceptions.GoogleAPICallError): + next(iterator) + + def test_add_callback(self): + wrapped = mock.MagicMock() + callback = mock.Mock(spec={}) + iterator = self._make_one(wrapped, prefetch_first_result=False) + + assert iterator.add_callback(callback) is wrapped.add_callback.return_value + + wrapped.add_callback.assert_called_once_with(callback) + + def test_cancel(self): + wrapped = mock.MagicMock() + iterator = self._make_one(wrapped, prefetch_first_result=False) + + assert iterator.cancel() is wrapped.cancel.return_value + + wrapped.cancel.assert_called_once_with() + + def test_code(self): + wrapped = mock.MagicMock() + iterator = self._make_one(wrapped, prefetch_first_result=False) + + assert iterator.code() is wrapped.code.return_value + + wrapped.code.assert_called_once_with() + + def test_details(self): + wrapped = mock.MagicMock() + iterator = self._make_one(wrapped, prefetch_first_result=False) + + assert iterator.details() is wrapped.details.return_value + + wrapped.details.assert_called_once_with() + + def test_initial_metadata(self): + wrapped = mock.MagicMock() + iterator = self._make_one(wrapped, prefetch_first_result=False) + + assert iterator.initial_metadata() is wrapped.initial_metadata.return_value + + wrapped.initial_metadata.assert_called_once_with() + + def test_is_active(self): + wrapped = mock.MagicMock() + iterator = self._make_one(wrapped, prefetch_first_result=False) + + assert iterator.is_active() is wrapped.is_active.return_value + + wrapped.is_active.assert_called_once_with() + + def test_time_remaining(self): + wrapped = mock.MagicMock() + iterator = self._make_one(wrapped, prefetch_first_result=False) + + assert iterator.time_remaining() is wrapped.time_remaining.return_value + + wrapped.time_remaining.assert_called_once_with() + + def test_trailing_metadata(self): + wrapped = mock.MagicMock() + iterator = self._make_one(wrapped, prefetch_first_result=False) + + assert iterator.trailing_metadata() is wrapped.trailing_metadata.return_value + + wrapped.trailing_metadata.assert_called_once_with() + + +def test_wrap_stream_okay(): + expected_responses = [1, 2, 3] + callable_ = mock.Mock(spec=["__call__"], return_value=iter(expected_responses)) + + wrapped_callable = grpc_helpers._wrap_stream_errors(callable_) + + got_iterator = wrapped_callable(1, 2, three="four") + + responses = list(got_iterator) + + callable_.assert_called_once_with(1, 2, three="four") + assert responses == expected_responses + + +def test_wrap_stream_prefetch_disabled(): + responses = [1, 2, 3] + iter_responses = iter(responses) + callable_ = mock.Mock(spec=["__call__"], return_value=iter_responses) + callable_._prefetch_first_result_ = False + + wrapped_callable = grpc_helpers._wrap_stream_errors(callable_) + wrapped_callable(1, 2, three="four") + + assert list(iter_responses) == responses # no items should have been pre-fetched + callable_.assert_called_once_with(1, 2, three="four") + + +def test_wrap_stream_iterable_iterface(): + response_iter = mock.create_autospec(grpc.Call, instance=True) + callable_ = mock.Mock(spec=["__call__"], return_value=response_iter) + + wrapped_callable = grpc_helpers._wrap_stream_errors(callable_) + + got_iterator = wrapped_callable() + + callable_.assert_called_once_with() + + # Check each aliased method in the grpc.Call interface + got_iterator.add_callback(mock.sentinel.callback) + response_iter.add_callback.assert_called_once_with(mock.sentinel.callback) + + got_iterator.cancel() + response_iter.cancel.assert_called_once_with() + + got_iterator.code() + response_iter.code.assert_called_once_with() + + got_iterator.details() + response_iter.details.assert_called_once_with() + + got_iterator.initial_metadata() + response_iter.initial_metadata.assert_called_once_with() + + got_iterator.is_active() + response_iter.is_active.assert_called_once_with() + + got_iterator.time_remaining() + response_iter.time_remaining.assert_called_once_with() + + got_iterator.trailing_metadata() + response_iter.trailing_metadata.assert_called_once_with() + + +def test_wrap_stream_errors_invocation(): + grpc_error = RpcErrorImpl(grpc.StatusCode.INVALID_ARGUMENT) + callable_ = mock.Mock(spec=["__call__"], side_effect=grpc_error) + + wrapped_callable = grpc_helpers._wrap_stream_errors(callable_) + + with pytest.raises(exceptions.InvalidArgument) as exc_info: + wrapped_callable(1, 2, three="four") + + callable_.assert_called_once_with(1, 2, three="four") + assert exc_info.value.response == grpc_error + + +def test_wrap_stream_empty_iterator(): + expected_responses = [] + callable_ = mock.Mock(spec=["__call__"], return_value=iter(expected_responses)) + + wrapped_callable = grpc_helpers._wrap_stream_errors(callable_) + + got_iterator = wrapped_callable() + + responses = list(got_iterator) + + callable_.assert_called_once_with() + assert responses == expected_responses + + +class RpcResponseIteratorImpl(object): + def __init__(self, iterable): + self._iterable = iter(iterable) + + def next(self): + next_item = next(self._iterable) + if isinstance(next_item, RpcErrorImpl): + raise next_item + return next_item + + __next__ = next + + +def test_wrap_stream_errors_iterator_initialization(): + grpc_error = RpcErrorImpl(grpc.StatusCode.UNAVAILABLE) + response_iter = RpcResponseIteratorImpl([grpc_error]) + callable_ = mock.Mock(spec=["__call__"], return_value=response_iter) + + wrapped_callable = grpc_helpers._wrap_stream_errors(callable_) + + with pytest.raises(exceptions.ServiceUnavailable) as exc_info: + wrapped_callable(1, 2, three="four") + + callable_.assert_called_once_with(1, 2, three="four") + assert exc_info.value.response == grpc_error + + +def test_wrap_stream_errors_during_iteration(): + grpc_error = RpcErrorImpl(grpc.StatusCode.UNAVAILABLE) + response_iter = RpcResponseIteratorImpl([1, grpc_error]) + callable_ = mock.Mock(spec=["__call__"], return_value=response_iter) + + wrapped_callable = grpc_helpers._wrap_stream_errors(callable_) + got_iterator = wrapped_callable(1, 2, three="four") + next(got_iterator) + + with pytest.raises(exceptions.ServiceUnavailable) as exc_info: + next(got_iterator) + + callable_.assert_called_once_with(1, 2, three="four") + assert exc_info.value.response == grpc_error + + +@mock.patch("google.api_core.grpc_helpers._wrap_unary_errors") +def test_wrap_errors_non_streaming(wrap_unary_errors): + callable_ = mock.create_autospec(grpc.UnaryUnaryMultiCallable) + + result = grpc_helpers.wrap_errors(callable_) + + assert result == wrap_unary_errors.return_value + wrap_unary_errors.assert_called_once_with(callable_) + + +@mock.patch("google.api_core.grpc_helpers._wrap_stream_errors") +def test_wrap_errors_streaming(wrap_stream_errors): + callable_ = mock.create_autospec(grpc.UnaryStreamMultiCallable) + + result = grpc_helpers.wrap_errors(callable_) + + assert result == wrap_stream_errors.return_value + wrap_stream_errors.assert_called_once_with(callable_) + + +@mock.patch("grpc.composite_channel_credentials") +@mock.patch( + "google.auth.default", + autospec=True, + return_value=(mock.sentinel.credentials, mock.sentinel.projet), +) +@mock.patch("grpc.secure_channel") +def test_create_channel_implicit(grpc_secure_channel, default, composite_creds_call): + target = "example.com:443" + composite_creds = composite_creds_call.return_value + + channel = grpc_helpers.create_channel(target) + + assert channel is grpc_secure_channel.return_value + + default.assert_called_once_with(scopes=None, default_scopes=None) + + if grpc_helpers.HAS_GRPC_GCP: + grpc_secure_channel.assert_called_once_with(target, composite_creds, None) + else: + grpc_secure_channel.assert_called_once_with(target, composite_creds) + + +@mock.patch("google.auth.transport.grpc.AuthMetadataPlugin", autospec=True) +@mock.patch( + "google.auth.transport.requests.Request", + autospec=True, + return_value=mock.sentinel.Request, +) +@mock.patch("grpc.composite_channel_credentials") +@mock.patch( + "google.auth.default", + autospec=True, + return_value=(mock.sentinel.credentials, mock.sentinel.project), +) +@mock.patch("grpc.secure_channel") +def test_create_channel_implicit_with_default_host( + grpc_secure_channel, default, composite_creds_call, request, auth_metadata_plugin +): + target = "example.com:443" + default_host = "example.com" + composite_creds = composite_creds_call.return_value + + channel = grpc_helpers.create_channel(target, default_host=default_host) + + assert channel is grpc_secure_channel.return_value + + default.assert_called_once_with(scopes=None, default_scopes=None) + auth_metadata_plugin.assert_called_once_with( + mock.sentinel.credentials, mock.sentinel.Request, default_host=default_host + ) + + if grpc_helpers.HAS_GRPC_GCP: + grpc_secure_channel.assert_called_once_with(target, composite_creds, None) + else: + grpc_secure_channel.assert_called_once_with(target, composite_creds) + + +@mock.patch("grpc.composite_channel_credentials") +@mock.patch( + "google.auth.default", + autospec=True, + return_value=(mock.sentinel.credentials, mock.sentinel.projet), +) +@mock.patch("grpc.secure_channel") +def test_create_channel_implicit_with_ssl_creds( + grpc_secure_channel, default, composite_creds_call +): + target = "example.com:443" + + ssl_creds = grpc.ssl_channel_credentials() + + grpc_helpers.create_channel(target, ssl_credentials=ssl_creds) + + default.assert_called_once_with(scopes=None, default_scopes=None) + + composite_creds_call.assert_called_once_with(ssl_creds, mock.ANY) + composite_creds = composite_creds_call.return_value + if grpc_helpers.HAS_GRPC_GCP: + grpc_secure_channel.assert_called_once_with(target, composite_creds, None) + else: + grpc_secure_channel.assert_called_once_with(target, composite_creds) + + +@mock.patch("grpc.composite_channel_credentials") +@mock.patch( + "google.auth.default", + autospec=True, + return_value=(mock.sentinel.credentials, mock.sentinel.projet), +) +@mock.patch("grpc.secure_channel") +def test_create_channel_implicit_with_scopes( + grpc_secure_channel, default, composite_creds_call +): + target = "example.com:443" + composite_creds = composite_creds_call.return_value + + channel = grpc_helpers.create_channel(target, scopes=["one", "two"]) + + assert channel is grpc_secure_channel.return_value + + default.assert_called_once_with(scopes=["one", "two"], default_scopes=None) + + if grpc_helpers.HAS_GRPC_GCP: + grpc_secure_channel.assert_called_once_with(target, composite_creds, None) + else: + grpc_secure_channel.assert_called_once_with(target, composite_creds) + + +@mock.patch("grpc.composite_channel_credentials") +@mock.patch( + "google.auth.default", + autospec=True, + return_value=(mock.sentinel.credentials, mock.sentinel.projet), +) +@mock.patch("grpc.secure_channel") +def test_create_channel_implicit_with_default_scopes( + grpc_secure_channel, default, composite_creds_call +): + target = "example.com:443" + composite_creds = composite_creds_call.return_value + + channel = grpc_helpers.create_channel(target, default_scopes=["three", "four"]) + + assert channel is grpc_secure_channel.return_value + + default.assert_called_once_with(scopes=None, default_scopes=["three", "four"]) + + if grpc_helpers.HAS_GRPC_GCP: + grpc_secure_channel.assert_called_once_with(target, composite_creds, None) + else: + grpc_secure_channel.assert_called_once_with(target, composite_creds) + + +def test_create_channel_explicit_with_duplicate_credentials(): + target = "example.com:443" + + with pytest.raises(exceptions.DuplicateCredentialArgs): + grpc_helpers.create_channel( + target, + credentials_file="credentials.json", + credentials=mock.sentinel.credentials, + ) + + +@mock.patch("grpc.composite_channel_credentials") +@mock.patch("google.auth.credentials.with_scopes_if_required", autospec=True) +@mock.patch("grpc.secure_channel") +def test_create_channel_explicit(grpc_secure_channel, auth_creds, composite_creds_call): + target = "example.com:443" + composite_creds = composite_creds_call.return_value + + channel = grpc_helpers.create_channel(target, credentials=mock.sentinel.credentials) + + auth_creds.assert_called_once_with( + mock.sentinel.credentials, scopes=None, default_scopes=None + ) + + assert channel is grpc_secure_channel.return_value + if grpc_helpers.HAS_GRPC_GCP: + grpc_secure_channel.assert_called_once_with(target, composite_creds, None) + else: + grpc_secure_channel.assert_called_once_with(target, composite_creds) + + +@mock.patch("grpc.composite_channel_credentials") +@mock.patch("grpc.secure_channel") +def test_create_channel_explicit_scoped(grpc_secure_channel, composite_creds_call): + target = "example.com:443" + scopes = ["1", "2"] + composite_creds = composite_creds_call.return_value + + credentials = mock.create_autospec(google.auth.credentials.Scoped, instance=True) + credentials.requires_scopes = True + + channel = grpc_helpers.create_channel( + target, credentials=credentials, scopes=scopes + ) + + credentials.with_scopes.assert_called_once_with(scopes, default_scopes=None) + + assert channel is grpc_secure_channel.return_value + if grpc_helpers.HAS_GRPC_GCP: + grpc_secure_channel.assert_called_once_with(target, composite_creds, None) + else: + grpc_secure_channel.assert_called_once_with(target, composite_creds) + + +@mock.patch("grpc.composite_channel_credentials") +@mock.patch("grpc.secure_channel") +def test_create_channel_explicit_default_scopes( + grpc_secure_channel, composite_creds_call +): + target = "example.com:443" + default_scopes = ["3", "4"] + composite_creds = composite_creds_call.return_value + + credentials = mock.create_autospec(google.auth.credentials.Scoped, instance=True) + credentials.requires_scopes = True + + channel = grpc_helpers.create_channel( + target, credentials=credentials, default_scopes=default_scopes + ) + + credentials.with_scopes.assert_called_once_with( + scopes=None, default_scopes=default_scopes + ) + + assert channel is grpc_secure_channel.return_value + if grpc_helpers.HAS_GRPC_GCP: + grpc_secure_channel.assert_called_once_with(target, composite_creds, None) + else: + grpc_secure_channel.assert_called_once_with(target, composite_creds) + + +@mock.patch("grpc.composite_channel_credentials") +@mock.patch("grpc.secure_channel") +def test_create_channel_explicit_with_quota_project( + grpc_secure_channel, composite_creds_call +): + target = "example.com:443" + composite_creds = composite_creds_call.return_value + + credentials = mock.create_autospec( + google.auth.credentials.CredentialsWithQuotaProject, instance=True + ) + + channel = grpc_helpers.create_channel( + target, credentials=credentials, quota_project_id="project-foo" + ) + + credentials.with_quota_project.assert_called_once_with("project-foo") + + assert channel is grpc_secure_channel.return_value + if grpc_helpers.HAS_GRPC_GCP: + grpc_secure_channel.assert_called_once_with(target, composite_creds, None) + else: + grpc_secure_channel.assert_called_once_with(target, composite_creds) + + +@mock.patch("grpc.composite_channel_credentials") +@mock.patch("grpc.secure_channel") +@mock.patch( + "google.auth.load_credentials_from_file", + autospec=True, + return_value=(mock.sentinel.credentials, mock.sentinel.project), +) +def test_create_channel_with_credentials_file( + load_credentials_from_file, grpc_secure_channel, composite_creds_call +): + target = "example.com:443" + + credentials_file = "/path/to/credentials/file.json" + composite_creds = composite_creds_call.return_value + + channel = grpc_helpers.create_channel(target, credentials_file=credentials_file) + + google.auth.load_credentials_from_file.assert_called_once_with( + credentials_file, scopes=None, default_scopes=None + ) + + assert channel is grpc_secure_channel.return_value + if grpc_helpers.HAS_GRPC_GCP: + grpc_secure_channel.assert_called_once_with(target, composite_creds, None) + else: + grpc_secure_channel.assert_called_once_with(target, composite_creds) + + +@mock.patch("grpc.composite_channel_credentials") +@mock.patch("grpc.secure_channel") +@mock.patch( + "google.auth.load_credentials_from_file", + autospec=True, + return_value=(mock.sentinel.credentials, mock.sentinel.project), +) +def test_create_channel_with_credentials_file_and_scopes( + load_credentials_from_file, grpc_secure_channel, composite_creds_call +): + target = "example.com:443" + scopes = ["1", "2"] + + credentials_file = "/path/to/credentials/file.json" + composite_creds = composite_creds_call.return_value + + channel = grpc_helpers.create_channel( + target, credentials_file=credentials_file, scopes=scopes + ) + + google.auth.load_credentials_from_file.assert_called_once_with( + credentials_file, scopes=scopes, default_scopes=None + ) + + assert channel is grpc_secure_channel.return_value + if grpc_helpers.HAS_GRPC_GCP: + grpc_secure_channel.assert_called_once_with(target, composite_creds, None) + else: + grpc_secure_channel.assert_called_once_with(target, composite_creds) + + +@mock.patch("grpc.composite_channel_credentials") +@mock.patch("grpc.secure_channel") +@mock.patch( + "google.auth.load_credentials_from_file", + autospec=True, + return_value=(mock.sentinel.credentials, mock.sentinel.project), +) +def test_create_channel_with_credentials_file_and_default_scopes( + load_credentials_from_file, grpc_secure_channel, composite_creds_call +): + target = "example.com:443" + default_scopes = ["3", "4"] + + credentials_file = "/path/to/credentials/file.json" + composite_creds = composite_creds_call.return_value + + channel = grpc_helpers.create_channel( + target, credentials_file=credentials_file, default_scopes=default_scopes + ) + + load_credentials_from_file.assert_called_once_with( + credentials_file, scopes=None, default_scopes=default_scopes + ) + + assert channel is grpc_secure_channel.return_value + if grpc_helpers.HAS_GRPC_GCP: + grpc_secure_channel.assert_called_once_with(target, composite_creds, None) + else: + grpc_secure_channel.assert_called_once_with(target, composite_creds) + + +@pytest.mark.skipif( + not grpc_helpers.HAS_GRPC_GCP, reason="grpc_gcp module not available" +) +@mock.patch("grpc_gcp.secure_channel") +def test_create_channel_with_grpc_gcp(grpc_gcp_secure_channel): + target = "example.com:443" + scopes = ["test_scope"] + + credentials = mock.create_autospec(google.auth.credentials.Scoped, instance=True) + credentials.requires_scopes = True + + grpc_helpers.create_channel(target, credentials=credentials, scopes=scopes) + grpc_gcp_secure_channel.assert_called() + + credentials.with_scopes.assert_called_once_with(scopes, default_scopes=None) + + +@pytest.mark.skipif(grpc_helpers.HAS_GRPC_GCP, reason="grpc_gcp module not available") +@mock.patch("grpc.secure_channel") +def test_create_channel_without_grpc_gcp(grpc_secure_channel): + target = "example.com:443" + scopes = ["test_scope"] + + credentials = mock.create_autospec(google.auth.credentials.Scoped, instance=True) + credentials.requires_scopes = True + + grpc_helpers.create_channel(target, credentials=credentials, scopes=scopes) + grpc_secure_channel.assert_called() + + credentials.with_scopes.assert_called_once_with(scopes, default_scopes=None) + + +class TestChannelStub(object): + def test_single_response(self): + channel = grpc_helpers.ChannelStub() + stub = operations_pb2.OperationsStub(channel) + expected_request = operations_pb2.GetOperationRequest(name="meep") + expected_response = operations_pb2.Operation(name="moop") + + channel.GetOperation.response = expected_response + + response = stub.GetOperation(expected_request) + + assert response == expected_response + assert channel.requests == [("GetOperation", expected_request)] + assert channel.GetOperation.requests == [expected_request] + + def test_no_response(self): + channel = grpc_helpers.ChannelStub() + stub = operations_pb2.OperationsStub(channel) + expected_request = operations_pb2.GetOperationRequest(name="meep") + + with pytest.raises(ValueError) as exc_info: + stub.GetOperation(expected_request) + + assert exc_info.match("GetOperation") + + def test_missing_method(self): + channel = grpc_helpers.ChannelStub() + + with pytest.raises(AttributeError): + channel.DoesNotExist.response + + def test_exception_response(self): + channel = grpc_helpers.ChannelStub() + stub = operations_pb2.OperationsStub(channel) + expected_request = operations_pb2.GetOperationRequest(name="meep") + + channel.GetOperation.response = RuntimeError() + + with pytest.raises(RuntimeError): + stub.GetOperation(expected_request) + + def test_callable_response(self): + channel = grpc_helpers.ChannelStub() + stub = operations_pb2.OperationsStub(channel) + expected_request = operations_pb2.GetOperationRequest(name="meep") + expected_response = operations_pb2.Operation(name="moop") + + on_get_operation = mock.Mock(spec=("__call__",), return_value=expected_response) + + channel.GetOperation.response = on_get_operation + + response = stub.GetOperation(expected_request) + + assert response == expected_response + on_get_operation.assert_called_once_with(expected_request) + + def test_multiple_responses(self): + channel = grpc_helpers.ChannelStub() + stub = operations_pb2.OperationsStub(channel) + expected_request = operations_pb2.GetOperationRequest(name="meep") + expected_responses = [ + operations_pb2.Operation(name="foo"), + operations_pb2.Operation(name="bar"), + operations_pb2.Operation(name="baz"), + ] + + channel.GetOperation.responses = iter(expected_responses) + + response1 = stub.GetOperation(expected_request) + response2 = stub.GetOperation(expected_request) + response3 = stub.GetOperation(expected_request) + + assert response1 == expected_responses[0] + assert response2 == expected_responses[1] + assert response3 == expected_responses[2] + assert channel.requests == [("GetOperation", expected_request)] * 3 + assert channel.GetOperation.requests == [expected_request] * 3 + + with pytest.raises(StopIteration): + stub.GetOperation(expected_request) + + def test_multiple_responses_and_single_response_error(self): + channel = grpc_helpers.ChannelStub() + stub = operations_pb2.OperationsStub(channel) + channel.GetOperation.responses = [] + channel.GetOperation.response = mock.sentinel.response + + with pytest.raises(ValueError): + stub.GetOperation(operations_pb2.GetOperationRequest()) + + def test_call_info(self): + channel = grpc_helpers.ChannelStub() + stub = operations_pb2.OperationsStub(channel) + expected_request = operations_pb2.GetOperationRequest(name="meep") + expected_response = operations_pb2.Operation(name="moop") + expected_metadata = [("red", "blue"), ("two", "shoe")] + expected_credentials = mock.sentinel.credentials + channel.GetOperation.response = expected_response + + response = stub.GetOperation( + expected_request, + timeout=42, + metadata=expected_metadata, + credentials=expected_credentials, + ) + + assert response == expected_response + assert channel.requests == [("GetOperation", expected_request)] + assert channel.GetOperation.calls == [ + (expected_request, 42, expected_metadata, expected_credentials) + ] + + def test_unary_unary(self): + channel = grpc_helpers.ChannelStub() + method_name = "GetOperation" + callable_stub = channel.unary_unary(method_name) + assert callable_stub._method == method_name + assert callable_stub._channel == channel + + def test_unary_stream(self): + channel = grpc_helpers.ChannelStub() + method_name = "GetOperation" + callable_stub = channel.unary_stream(method_name) + assert callable_stub._method == method_name + assert callable_stub._channel == channel + + def test_stream_unary(self): + channel = grpc_helpers.ChannelStub() + method_name = "GetOperation" + callable_stub = channel.stream_unary(method_name) + assert callable_stub._method == method_name + assert callable_stub._channel == channel + + def test_stream_stream(self): + channel = grpc_helpers.ChannelStub() + method_name = "GetOperation" + callable_stub = channel.stream_stream(method_name) + assert callable_stub._method == method_name + assert callable_stub._channel == channel + + def test_subscribe_unsubscribe(self): + channel = grpc_helpers.ChannelStub() + assert channel.subscribe(None) is None + assert channel.unsubscribe(None) is None + + def test_close(self): + channel = grpc_helpers.ChannelStub() + assert channel.close() is None |