aboutsummaryrefslogtreecommitdiff
path: root/google
diff options
context:
space:
mode:
authorJon Wayne Parrott <jonwayne@google.com>2018-03-06 15:55:19 -0800
committerGitHub <noreply@github.com>2018-03-06 15:55:19 -0800
commite1976858f124e16bf452013f6308bf8a1103f797 (patch)
tree640e30246c9102977a2f4f06fd60bf28387c5c7b /google
parent35e87e0aca52167029784379ca84e979098e1d6c (diff)
downloadpython-api-core-e1976858f124e16bf452013f6308bf8a1103f797.tar.gz
Use a class to wrap grpc streaming errors instead of monkey-patching (#4995)
Diffstat (limited to 'google')
-rw-r--r--google/api_core/grpc_helpers.py62
1 files changed, 50 insertions, 12 deletions
diff --git a/google/api_core/grpc_helpers.py b/google/api_core/grpc_helpers.py
index 7d81c75..329971e 100644
--- a/google/api_core/grpc_helpers.py
+++ b/google/api_core/grpc_helpers.py
@@ -58,6 +58,55 @@ def _wrap_unary_errors(callable_):
return error_remapped_callable
+class _StreamingResponseIterator(grpc.Call):
+ def __init__(self, wrapped):
+ self._wrapped = wrapped
+
+ def __iter__(self):
+ """This iterator is also an iterable that returns itself."""
+ return self
+
+ def next(self):
+ """Get the next response from the stream.
+
+ Returns:
+ protobuf.Message: A single response from the stream.
+ """
+ try:
+ return six.next(self._wrapped)
+ except grpc.RpcError as exc:
+ six.raise_from(exceptions.from_grpc_error(exc), exc)
+
+ # Alias needed for Python 2/3 support.
+ __next__ = next
+
+ # grpc.Call & grpc.RpcContext interface
+
+ def add_callback(self, callback):
+ return self._wrapped.add_callback(callback)
+
+ def cancel(self):
+ return self._wrapped.cancel()
+
+ def code(self):
+ return self._wrapped.code()
+
+ def details(self):
+ return self._wrapped.details()
+
+ def initial_metadata(self):
+ return self._wrapped.initial_metadata()
+
+ def is_active(self):
+ return self._wrapped.is_active()
+
+ def time_remaining(self):
+ return self._wrapped.time_remaining()
+
+ def trailing_metadata(self):
+ return self._wrapped.trailing_metadata()
+
+
def _wrap_stream_errors(callable_):
"""Wrap errors for Unary-Stream and Stream-Stream gRPC callables.
@@ -71,18 +120,7 @@ def _wrap_stream_errors(callable_):
def error_remapped_callable(*args, **kwargs):
try:
result = callable_(*args, **kwargs)
- # Note: we are patching the private grpc._channel._Rendezvous._next
- # method as magic methods (__next__ in this case) can not be
- # patched on a per-instance basis (see
- # https://docs.python.org/3/reference/datamodel.html
- # #special-lookup).
- # In an ideal world, gRPC would return a *specific* interface
- # from *StreamMultiCallables, but they return a God class that's
- # a combination of basically every interface in gRPC making it
- # untenable for us to implement a wrapper object using the same
- # interface.
- result._next = _wrap_unary_errors(result._next)
- return result
+ return _StreamingResponseIterator(result)
except grpc.RpcError as exc:
six.raise_from(exceptions.from_grpc_error(exc), exc)