diff options
author | Eric Anderson <ejona@google.com> | 2019-06-13 16:46:00 -0700 |
---|---|---|
committer | Eric Anderson <ejona@google.com> | 2019-06-14 09:01:29 -0700 |
commit | e795f14bedc56af2f2ba60eef1b23add6106ab99 (patch) | |
tree | 8c5be7c04262b20a5e366b9d5389a060813154d5 | |
parent | 0b27e2862da2da952d5073b333ba9a8b9c999c4a (diff) | |
download | grpc-grpc-java-e795f14bedc56af2f2ba60eef1b23add6106ab99.tar.gz |
interop-testing: Observe flow control in TestServiceImpl
-rw-r--r-- | interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java | 14 |
1 files changed, 12 insertions, 2 deletions
diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java index 5fe7248b2..8d0c4f421 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java @@ -228,7 +228,7 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { private class ResponseDispatcher { private final Chunk completionChunk = new Chunk(0, 0, 0); private final Queue<Chunk> chunks; - private final StreamObserver<StreamingOutputCallResponse> responseStream; + private final ServerCallStreamObserver<StreamingOutputCallResponse> responseStream; private boolean scheduled; @GuardedBy("this") private boolean cancelled; private Throwable failure; @@ -268,7 +268,12 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { */ public ResponseDispatcher(StreamObserver<StreamingOutputCallResponse> responseStream) { this.chunks = Queues.newLinkedBlockingQueue(); - this.responseStream = responseStream; + this.responseStream = (ServerCallStreamObserver<StreamingOutputCallResponse>) responseStream; + this.responseStream.setOnReadyHandler(new Runnable() { + @Override public void run() { + scheduleNextChunk(); + } + }); } /** @@ -349,6 +354,11 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { return; } + if (chunks.peek() != completionChunk && !responseStream.isReady()) { + // Wait for the onReady handler to be called. + return; + } + // Schedule the next response chunk if there is one. Chunk nextChunk = chunks.peek(); if (nextChunk != null) { |