From 55c5040cb5bb0035eb741f20c8989247c505ffe9 Mon Sep 17 00:00:00 2001 From: Larry Safran Date: Fri, 18 Aug 2023 10:16:43 -0700 Subject: Remove ThreadlessExecutor from BlockingServerStream (#10496) * Remove ThreadlessExecutor from BlockingServerStream fixes #10490 --- stub/src/main/java/io/grpc/stub/ClientCalls.java | 63 ++++++------------------ 1 file changed, 14 insertions(+), 49 deletions(-) diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java index 74c315058..13fb00d3b 100644 --- a/stub/src/main/java/io/grpc/stub/ClientCalls.java +++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java @@ -133,9 +133,7 @@ public final class ClientCalls { public static RespT blockingUnaryCall(ClientCall call, ReqT req) { try { return getUnchecked(futureUnaryCall(call, req)); - } catch (RuntimeException e) { - throw cancelThrow(call, e); - } catch (Error e) { + } catch (RuntimeException | Error e) { throw cancelThrow(call, e); } } @@ -167,10 +165,7 @@ public final class ClientCalls { } executor.shutdown(); return getUnchecked(responseFuture); - } catch (RuntimeException e) { - // Something very bad happened. All bets are off; it may be dangerous to wait for onClose(). - throw cancelThrow(call, e); - } catch (Error e) { + } catch (RuntimeException | Error e) { // Something very bad happened. All bets are off; it may be dangerous to wait for onClose(). throw cancelThrow(call, e); } finally { @@ -206,14 +201,12 @@ public final class ClientCalls { * * @return an iterator over the response stream. */ - // TODO(louiscryan): Not clear if we want to use this idiom for 'simple' stubs. public static Iterator blockingServerStreamingCall( Channel channel, MethodDescriptor method, CallOptions callOptions, ReqT req) { - ThreadlessExecutor executor = new ThreadlessExecutor(); ClientCall call = channel.newCall(method, - callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING) - .withExecutor(executor)); - BlockingResponseStream result = new BlockingResponseStream<>(call, executor); + callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING)); + + BlockingResponseStream result = new BlockingResponseStream<>(call); asyncUnaryRequestCall(call, req, result.listener()); return result; } @@ -288,8 +281,7 @@ public final class ClientCalls { private static RuntimeException cancelThrow(ClientCall call, Throwable t) { try { call.cancel(null, t); - } catch (Throwable e) { - assert e instanceof RuntimeException || e instanceof Error; + } catch (RuntimeException | Error e) { logger.log(Level.SEVERE, "RuntimeException encountered while closing call", e); } if (t instanceof RuntimeException) { @@ -320,9 +312,7 @@ public final class ClientCalls { try { call.sendMessage(req); call.halfClose(); - } catch (RuntimeException e) { - throw cancelThrow(call, e); - } catch (Error e) { + } catch (RuntimeException | Error e) { throw cancelThrow(call, e); } } @@ -597,20 +587,12 @@ public final class ClientCalls { private final BlockingQueue buffer = new ArrayBlockingQueue<>(3); private final StartableListener listener = new QueuingListener(); private final ClientCall call; - /** May be null. */ - private final ThreadlessExecutor threadless; // Only accessed when iterating. private Object last; // Non private to avoid synthetic class BlockingResponseStream(ClientCall call) { - this(call, null); - } - - // Non private to avoid synthetic class - BlockingResponseStream(ClientCall call, ThreadlessExecutor threadless) { this.call = call; - this.threadless = threadless; } StartableListener listener() { @@ -620,31 +602,14 @@ public final class ClientCalls { private Object waitForNext() { boolean interrupt = false; try { - if (threadless == null) { - while (true) { - try { - return buffer.take(); - } catch (InterruptedException ie) { - interrupt = true; - call.cancel("Thread interrupted", ie); - // Now wait for onClose() to be called, to guarantee BlockingQueue doesn't fill - } - } - } else { - Object next; - while ((next = buffer.poll()) == null) { - try { - threadless.waitAndDrain(); - } catch (InterruptedException ie) { - interrupt = true; - call.cancel("Thread interrupted", ie); - // Now wait for onClose() to be called, so interceptors can clean up - } - } - if (next == this || next instanceof StatusRuntimeException) { - threadless.shutdown(); + while (true) { + try { + return buffer.take(); + } catch (InterruptedException ie) { + interrupt = true; + call.cancel("Thread interrupted", ie); + // Now wait for onClose() to be called, to guarantee BlockingQueue doesn't fill } - return next; } } finally { if (interrupt) { -- cgit v1.2.3