diff options
author | Jon Wayne Parrott <jonwayne@google.com> | 2018-03-06 15:55:19 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-03-06 15:55:19 -0800 |
commit | e1976858f124e16bf452013f6308bf8a1103f797 (patch) | |
tree | 640e30246c9102977a2f4f06fd60bf28387c5c7b /google | |
parent | 35e87e0aca52167029784379ca84e979098e1d6c (diff) | |
download | python-api-core-e1976858f124e16bf452013f6308bf8a1103f797.tar.gz |
Use a class to wrap grpc streaming errors instead of monkey-patching (#4995)
Diffstat (limited to 'google')
-rw-r--r-- | google/api_core/grpc_helpers.py | 62 |
1 files changed, 50 insertions, 12 deletions
diff --git a/google/api_core/grpc_helpers.py b/google/api_core/grpc_helpers.py index 7d81c75..329971e 100644 --- a/google/api_core/grpc_helpers.py +++ b/google/api_core/grpc_helpers.py @@ -58,6 +58,55 @@ def _wrap_unary_errors(callable_): return error_remapped_callable +class _StreamingResponseIterator(grpc.Call): + def __init__(self, wrapped): + self._wrapped = wrapped + + def __iter__(self): + """This iterator is also an iterable that returns itself.""" + return self + + def next(self): + """Get the next response from the stream. + + Returns: + protobuf.Message: A single response from the stream. + """ + try: + return six.next(self._wrapped) + except grpc.RpcError as exc: + six.raise_from(exceptions.from_grpc_error(exc), exc) + + # Alias needed for Python 2/3 support. + __next__ = next + + # grpc.Call & grpc.RpcContext interface + + def add_callback(self, callback): + return self._wrapped.add_callback(callback) + + def cancel(self): + return self._wrapped.cancel() + + def code(self): + return self._wrapped.code() + + def details(self): + return self._wrapped.details() + + def initial_metadata(self): + return self._wrapped.initial_metadata() + + def is_active(self): + return self._wrapped.is_active() + + def time_remaining(self): + return self._wrapped.time_remaining() + + def trailing_metadata(self): + return self._wrapped.trailing_metadata() + + def _wrap_stream_errors(callable_): """Wrap errors for Unary-Stream and Stream-Stream gRPC callables. @@ -71,18 +120,7 @@ def _wrap_stream_errors(callable_): def error_remapped_callable(*args, **kwargs): try: result = callable_(*args, **kwargs) - # Note: we are patching the private grpc._channel._Rendezvous._next - # method as magic methods (__next__ in this case) can not be - # patched on a per-instance basis (see - # https://docs.python.org/3/reference/datamodel.html - # #special-lookup). - # In an ideal world, gRPC would return a *specific* interface - # from *StreamMultiCallables, but they return a God class that's - # a combination of basically every interface in gRPC making it - # untenable for us to implement a wrapper object using the same - # interface. - result._next = _wrap_unary_errors(result._next) - return result + return _StreamingResponseIterator(result) except grpc.RpcError as exc: six.raise_from(exceptions.from_grpc_error(exc), exc) |