diff options
author | Wyatt Hepler <hepler@google.com> | 2022-11-14 22:50:54 +0000 |
---|---|---|
committer | CQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com> | 2022-11-14 22:50:54 +0000 |
commit | 2e641088c546c6518272f114b1ac4f6401a58aee (patch) | |
tree | 03ef2506fb7e9f2f4db575475ea007ba1aa82c91 /pw_transfer | |
parent | 49e27cb5be542c4737a9c0efe1a646c7424e65a6 (diff) | |
download | pigweed-2e641088c546c6518272f114b1ac4f6401a58aee.tar.gz |
pw_transfer: Implement read packet drop recovery in Java
Previously, a transfer parameters packet would be sent in response to
each data packet with an unexpected offset. When working with a large
window, this caused a flood of transfer parameters packets. The server
would repeatedly restart the window, dramatically slowing progress on
unreliable links.
The C++ and Python clients were previously updated to only send a
transfer parameters packet in response to the first packet with an
incorrect offset or a retried packet. This commit implements the same
behavior in the Java client.
With this change, the test_1mb_read_dropped_data test in
cross_language_large_read_test.py passes for Java, so this commit enables it.
Fixes: b/250976246
Change-Id: I62cea8362c8cce8a34b81134f331cf76de875204
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/118270
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
Reviewed-by: Armando Montanez <amontanez@google.com>
Commit-Queue: Wyatt Hepler <hepler@google.com>
Diffstat (limited to 'pw_transfer')
5 files changed, 166 insertions, 19 deletions
diff --git a/pw_transfer/integration_test/cross_language_large_read_test.py b/pw_transfer/integration_test/cross_language_large_read_test.py index ad626a5d3..568f1b0cd 100644 --- a/pw_transfer/integration_test/cross_language_large_read_test.py +++ b/pw_transfer/integration_test/cross_language_large_read_test.py @@ -49,10 +49,7 @@ class LargeReadTransferIntegrationTest(TransferIntegrationTest): @parameterized.expand([ ("cpp"), - # TODO(b/250976246): This test runs indefinitely (>24 hrs) when using - # the java client, so it is disabled for now until the issue is - # diagnosed. - # ("java"), + ("java"), ("python"), ]) def test_1mb_read_dropped_data(self, client_type): diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/ReadTransfer.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/ReadTransfer.java index ec98253fe..f541fcc2c 100644 --- a/pw_transfer/java/main/dev/pigweed/pw_transfer/ReadTransfer.java +++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/ReadTransfer.java @@ -47,7 +47,9 @@ class ReadTransfer extends Transfer<byte[]> { private long remainingTransferSize = UNKNOWN_TRANSFER_SIZE; private int offset = 0; - private int windowEndOffset; + private int windowEndOffset = 0; + + private int lastReceivedOffset = 0; ReadTransfer(int resourceId, ProtocolVersion desiredProtocolVersion, @@ -83,16 +85,20 @@ class ReadTransfer extends Transfer<byte[]> { @Override VersionedChunk getChunkForRetry() { VersionedChunk chunk = getLastChunkSent(); - // Always send RETRANSMIT packets instead of CONTINUE packets on retries. - if (chunk.type() == Chunk.Type.PARAMETERS_CONTINUE) { - return chunk.withType(Chunk.Type.PARAMETERS_RETRANSMIT); + // If the last chunk sent was transfer parameters, send an updated RETRANSMIT chunk. 
+ if (chunk.type() == Chunk.Type.PARAMETERS_CONTINUE + || chunk.type() == Chunk.Type.PARAMETERS_RETRANSMIT) { + return prepareTransferParameters(/*extend=*/false); } return chunk; } - class ReceivingData extends ActiveState { + private class ReceivingData extends ActiveState { @Override public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException { + // Track the last seen offset so the DropRecovery state can detect retried packets. + lastReceivedOffset = chunk.offset(); + if (chunk.offset() != offset) { logger.atFine().log("%s expected offset %d, received %d; resending transfer parameters", ReadTransfer.this, @@ -102,6 +108,7 @@ class ReadTransfer extends Transfer<byte[]> { // For now, only in-order transfers are supported. If data is received out of order, // discard this data and retransmit from the last received offset. sendChunk(prepareTransferParameters(/*extend=*/false)); + changeState(new DropRecovery()); setNextChunkTimeout(); return; } @@ -145,6 +152,35 @@ class ReadTransfer extends Transfer<byte[]> { } } + /** State for recovering from dropped packets. */ + private class DropRecovery extends ActiveState { + @Override + public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException { + if (chunk.offset() == offset) { + logger.atFine().log( + "%s received expected offset %d, resuming transfer", ReadTransfer.this, offset); + changeState(new ReceivingData()).handleDataChunk(chunk); + return; + } + + // To avoid a flood of identical parameters packets, only send one if a retry is detected. 
+ if (chunk.offset() == lastReceivedOffset) { + logger.atFiner().log( + "%s received repeated offset %d: retry detected, resending transfer parameters", + ReadTransfer.this, + lastReceivedOffset); + sendChunk(prepareTransferParameters(/*extend=*/false)); + } else { + lastReceivedOffset = chunk.offset(); + logger.atFiner().log("%s expecting offset %d, ignoring received offset %d", + ReadTransfer.this, + offset, + chunk.offset()); + } + setNextChunkTimeout(); + } + } + @Override void setFutureResult() { updateProgress(totalDataSize, totalDataSize, totalDataSize); diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java index 04027ab5b..39118b5da 100644 --- a/pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java +++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java @@ -371,7 +371,7 @@ abstract class Transfer<T> { /** Enters the recovery state and returns to this state if recovery succeeds. */ @Override public void handleTimeout() throws TransferAbortedException { - changeState(new Recovery(this)).handleTimeout(); + changeState(new TimeoutRecovery(this)).handleTimeout(); } @Override @@ -434,11 +434,11 @@ abstract class Transfer<T> { } /** Recovering from an expired timeout. 
*/ - class Recovery extends ActiveState { + class TimeoutRecovery extends ActiveState { private final State nextState; private int retries; - Recovery(State nextState) { + TimeoutRecovery(State nextState) { this.nextState = nextState; } diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/VersionedChunk.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/VersionedChunk.java index 2a253382c..9fdfb4d1e 100644 --- a/pw_transfer/java/main/dev/pigweed/pw_transfer/VersionedChunk.java +++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/VersionedChunk.java @@ -88,8 +88,6 @@ abstract class VersionedChunk { public abstract VersionedChunk build(); } - abstract Builder toBuilder(); - public static VersionedChunk fromMessage(Chunk chunk) { Builder builder = builder(); @@ -160,10 +158,6 @@ abstract class VersionedChunk { return builder().setVersion(desiredVersion).setType(Chunk.Type.START).setResourceId(resourceId); } - public final VersionedChunk withType(Chunk.Type type) { - return toBuilder().setType(type).build(); - } - public Chunk toMessage() { Chunk.Builder chunk = Chunk.newBuilder() .setType(type()) diff --git a/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java b/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java index 6c19b2939..eb0af3c73 100644 --- a/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java +++ b/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java @@ -1188,6 +1188,120 @@ public final class TransferClientTest { } @Test + public void read_onlySendsOneUpdateAfterDrops() throws Exception { + createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO); + TransferParameters params = TransferParameters.create(50, 10, 0); + + // Handshake + enqueueReadChunks(2, // Wait for read RPC open & START packet + newChunk(Chunk.Type.START_ACK, 99) + .setResourceId(7) + .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); + enqueueReadChunks(1, // Ignore the first 
START_ACK_CONFIRMATION + newChunk(Chunk.Type.START_ACK, 99) + .setResourceId(7) + .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); + + // Window 1: server waits for START_ACK_CONFIRMATION, drops 2nd packet + enqueueReadChunks(1, + newChunk(Chunk.Type.DATA, 99).setOffset(0).setData(range(0, 10)), + newChunk(Chunk.Type.DATA, 99).setOffset(20).setData(range(20, 30)), + newChunk(Chunk.Type.DATA, 99).setOffset(30).setData(range(30, 40)), + newChunk(Chunk.Type.DATA, 99).setOffset(40).setData(range(40, 50))); + + // Window 2: server waits for retransmit, drops 1st packet + enqueueReadChunks(1, + newChunk(Chunk.Type.DATA, 99).setOffset(20).setData(range(20, 30)), + newChunk(Chunk.Type.DATA, 99).setOffset(30).setData(range(30, 40)), + newChunk(Chunk.Type.DATA, 99).setOffset(40).setData(range(40, 50)), + newChunk(Chunk.Type.DATA, 99).setOffset(50).setData(range(50, 60))); + + // Window 3: server waits for retransmit, drops last packet + enqueueReadChunks(1, + newChunk(Chunk.Type.DATA, 99).setOffset(10).setData(range(10, 20)), + newChunk(Chunk.Type.DATA, 99).setOffset(20).setData(range(20, 30)), + newChunk(Chunk.Type.DATA, 99).setOffset(30).setData(range(30, 40)), + newChunk(Chunk.Type.DATA, 99).setOffset(40).setData(range(40, 50))); + + // Window 4: server waits for continue and retransmit, normal window. 
+ enqueueReadChunks(2, + newChunk(Chunk.Type.DATA, 99).setOffset(50).setData(range(50, 60)), + newChunk(Chunk.Type.DATA, 99).setOffset(60).setData(range(60, 70)), + newChunk(Chunk.Type.DATA, 99).setOffset(70).setData(range(70, 80)), + newChunk(Chunk.Type.DATA, 99).setOffset(80).setData(range(80, 90)), + newChunk(Chunk.Type.DATA, 99).setOffset(90).setData(range(90, 100))); + enqueueReadChunks(2, // Ignore continue and retransmit chunks, retry last packet in window + newChunk(Chunk.Type.DATA, 99).setOffset(90).setData(range(90, 100)), + newChunk(Chunk.Type.DATA, 99).setOffset(90).setData(range(90, 100))); + + // Window 5: Final packet + enqueueReadChunks(2, // Receive two retries, then send final packet + newChunk(Chunk.Type.DATA, 99).setOffset(100).setData(range(100, 110)).setRemainingBytes(0)); + enqueueReadChunks(1, // Ignore first COMPLETION packet + newChunk(Chunk.Type.DATA, 99).setOffset(100).setData(range(100, 110)).setRemainingBytes(0)); + enqueueReadChunks(1, newChunk(Chunk.Type.COMPLETION_ACK, 99)); + + ListenableFuture<byte[]> future = transferClient.read(7, params); + // assertThat(future.get()).isEqualTo(range(0, 110).toByteArray()); + while (!future.isDone()) { + } + + assertThat(lastChunks()) + .containsExactly( + // Handshake + initialReadChunk(7, ProtocolVersion.VERSION_TWO, params), + readStartAckConfirmation(99, params), + readStartAckConfirmation(99, params), + // Window 1: send one transfer parameters update after the drop + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99) + .setOffset(10) + .setWindowEndOffset(60) + .setMaxChunkSizeBytes(10) + .build(), + // Window 2: send one transfer parameters update after the drop + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99) + .setOffset(10) + .setWindowEndOffset(60) + .setMaxChunkSizeBytes(10) + .build(), + // Window 3: send one transfer parameters update after the drop, then continue packet + newChunk(Chunk.Type.PARAMETERS_CONTINUE, 99) // Not seen by server + .setOffset(40) + .setWindowEndOffset(90) + 
.setMaxChunkSizeBytes(10) + .build(), + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99) // Sent after timeout + .setOffset(50) + .setWindowEndOffset(100) + .setMaxChunkSizeBytes(10) + .build(), + // Window 4: send one transfer parameters update after the drop, then continue packet + newChunk(Chunk.Type.PARAMETERS_CONTINUE, 99) // Ignored by server + .setOffset(80) + .setWindowEndOffset(130) + .setMaxChunkSizeBytes(10) + .build(), + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99) // Sent after last packet + .setOffset(100) + .setWindowEndOffset(150) + .setMaxChunkSizeBytes(10) + .build(), + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99) // Sent due to repeated packet + .setOffset(100) + .setWindowEndOffset(150) + .setMaxChunkSizeBytes(10) + .build(), + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99) // Sent due to repeated packet + .setOffset(100) + .setWindowEndOffset(150) + .setMaxChunkSizeBytes(10) + .build(), + // Window 5: final packet and closing handshake + newChunk(Chunk.Type.COMPLETION, 99).setStatus(Status.OK.ordinal()).build(), + newChunk(Chunk.Type.COMPLETION, 99).setStatus(Status.OK.ordinal()).build()); + } + + @Test public void read_progressCallbackIsCalled() { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<byte[]> future = @@ -2290,7 +2404,13 @@ public final class TransferClientTest { .build()); } - /** Receive these chunks after a chunk is sent. */ + /** Receive these read chunks after a chunk is sent. */ + private void enqueueReadChunks(int afterPackets, Chunk.Builder... chunks) { + syncWithTransferThread( + () -> rpcClient.enqueueServerStream(SERVICE, "Read", afterPackets, chunks)); + } + + /** Receive these write chunks after a chunk is sent. */ private void enqueueWriteChunks(int afterPackets, Chunk.Builder... chunks) { syncWithTransferThread( () -> rpcClient.enqueueServerStream(SERVICE, "Write", afterPackets, chunks)); |