aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEric Anderson <ejona@google.com>2019-06-13 16:46:00 -0700
committerEric Anderson <ejona@google.com>2019-06-14 09:01:29 -0700
commite795f14bedc56af2f2ba60eef1b23add6106ab99 (patch)
tree8c5be7c04262b20a5e366b9d5389a060813154d5
parent0b27e2862da2da952d5073b333ba9a8b9c999c4a (diff)
downloadgrpc-grpc-java-e795f14bedc56af2f2ba60eef1b23add6106ab99.tar.gz
interop-testing: Observe flow control in TestServiceImpl
-rw-r--r--interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java14
1 files changed, 12 insertions, 2 deletions
diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java
index 5fe7248b2..8d0c4f421 100644
--- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java
+++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java
@@ -228,7 +228,7 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
private class ResponseDispatcher {
private final Chunk completionChunk = new Chunk(0, 0, 0);
private final Queue<Chunk> chunks;
- private final StreamObserver<StreamingOutputCallResponse> responseStream;
+ private final ServerCallStreamObserver<StreamingOutputCallResponse> responseStream;
private boolean scheduled;
@GuardedBy("this") private boolean cancelled;
private Throwable failure;
@@ -268,7 +268,12 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
*/
public ResponseDispatcher(StreamObserver<StreamingOutputCallResponse> responseStream) {
this.chunks = Queues.newLinkedBlockingQueue();
- this.responseStream = responseStream;
+ this.responseStream = (ServerCallStreamObserver<StreamingOutputCallResponse>) responseStream;
+ this.responseStream.setOnReadyHandler(new Runnable() {
+ @Override public void run() {
+ scheduleNextChunk();
+ }
+ });
}
/**
@@ -349,6 +354,11 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
return;
}
+ if (chunks.peek() != completionChunk && !responseStream.isReady()) {
+ // Wait for the onReady handler to be called.
+ return;
+ }
+
// Schedule the next response chunk if there is one.
Chunk nextChunk = chunks.peek();
if (nextChunk != null) {