aboutsummaryrefslogtreecommitdiff
path: root/pw_transfer
diff options
context:
space:
mode:
authorWyatt Hepler <hepler@google.com>2022-11-14 22:50:54 +0000
committerCQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com>2022-11-14 22:50:54 +0000
commit2e641088c546c6518272f114b1ac4f6401a58aee (patch)
tree03ef2506fb7e9f2f4db575475ea007ba1aa82c91 /pw_transfer
parent49e27cb5be542c4737a9c0efe1a646c7424e65a6 (diff)
downloadpigweed-2e641088c546c6518272f114b1ac4f6401a58aee.tar.gz
pw_transfer: Implement read packet drop recovery in Java
Previously, a transfer parameters packet would be sent in response to each data packet with an unexpected offset. When working with a large window, this caused a flood of transfer parameters packets. The server would repeatedly restart the window, dramatically slowing progress on unreliable links. The C++ and Python clients were previously updated to only send a transfer parameters packet in response to the first packet with an incorrect offset or a retried packet. This commit implements this in Java. With this change the test_1mb_read_dropped_data test in cross_language_large_read_test.py passes for Java, so enable it. Fixes: b/250976246 Change-Id: I62cea8362c8cce8a34b81134f331cf76de875204 Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/118270 Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com> Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com> Reviewed-by: Armando Montanez <amontanez@google.com> Commit-Queue: Wyatt Hepler <hepler@google.com>
Diffstat (limited to 'pw_transfer')
-rw-r--r--pw_transfer/integration_test/cross_language_large_read_test.py5
-rw-r--r--pw_transfer/java/main/dev/pigweed/pw_transfer/ReadTransfer.java46
-rw-r--r--pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java6
-rw-r--r--pw_transfer/java/main/dev/pigweed/pw_transfer/VersionedChunk.java6
-rw-r--r--pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java122
5 files changed, 166 insertions, 19 deletions
diff --git a/pw_transfer/integration_test/cross_language_large_read_test.py b/pw_transfer/integration_test/cross_language_large_read_test.py
index ad626a5d3..568f1b0cd 100644
--- a/pw_transfer/integration_test/cross_language_large_read_test.py
+++ b/pw_transfer/integration_test/cross_language_large_read_test.py
@@ -49,10 +49,7 @@ class LargeReadTransferIntegrationTest(TransferIntegrationTest):
@parameterized.expand([
("cpp"),
- # TODO(b/250976246): This test runs indefinitely (>24 hrs) when using
- # the java client, so it is disabled for now until the issue is
- # diagnosed.
- # ("java"),
+ ("java"),
("python"),
])
def test_1mb_read_dropped_data(self, client_type):
diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/ReadTransfer.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/ReadTransfer.java
index ec98253fe..f541fcc2c 100644
--- a/pw_transfer/java/main/dev/pigweed/pw_transfer/ReadTransfer.java
+++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/ReadTransfer.java
@@ -47,7 +47,9 @@ class ReadTransfer extends Transfer<byte[]> {
private long remainingTransferSize = UNKNOWN_TRANSFER_SIZE;
private int offset = 0;
- private int windowEndOffset;
+ private int windowEndOffset = 0;
+
+ private int lastReceivedOffset = 0;
ReadTransfer(int resourceId,
ProtocolVersion desiredProtocolVersion,
@@ -83,16 +85,20 @@ class ReadTransfer extends Transfer<byte[]> {
@Override
VersionedChunk getChunkForRetry() {
VersionedChunk chunk = getLastChunkSent();
- // Always send RETRANSMIT packets instead of CONTINUE packets on retries.
- if (chunk.type() == Chunk.Type.PARAMETERS_CONTINUE) {
- return chunk.withType(Chunk.Type.PARAMETERS_RETRANSMIT);
+ // If the last chunk sent was transfer parameters, send an updated RETRANSMIT chunk.
+ if (chunk.type() == Chunk.Type.PARAMETERS_CONTINUE
+ || chunk.type() == Chunk.Type.PARAMETERS_RETRANSMIT) {
+ return prepareTransferParameters(/*extend=*/false);
}
return chunk;
}
- class ReceivingData extends ActiveState {
+ private class ReceivingData extends ActiveState {
@Override
public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException {
+ // Track the last seen offset so the DropRecovery state can detect retried packets.
+ lastReceivedOffset = chunk.offset();
+
if (chunk.offset() != offset) {
logger.atFine().log("%s expected offset %d, received %d; resending transfer parameters",
ReadTransfer.this,
@@ -102,6 +108,7 @@ class ReadTransfer extends Transfer<byte[]> {
// For now, only in-order transfers are supported. If data is received out of order,
// discard this data and retransmit from the last received offset.
sendChunk(prepareTransferParameters(/*extend=*/false));
+ changeState(new DropRecovery());
setNextChunkTimeout();
return;
}
@@ -145,6 +152,35 @@ class ReadTransfer extends Transfer<byte[]> {
}
}
+ /** State for recovering from dropped packets. */
+ private class DropRecovery extends ActiveState {
+ @Override
+ public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException {
+ if (chunk.offset() == offset) {
+ logger.atFine().log(
+ "%s received expected offset %d, resuming transfer", ReadTransfer.this, offset);
+ changeState(new ReceivingData()).handleDataChunk(chunk);
+ return;
+ }
+
+ // To avoid a flood of identical parameters packets, only send one if a retry is detected.
+ if (chunk.offset() == lastReceivedOffset) {
+ logger.atFiner().log(
+ "%s received repeated offset %d: retry detected, resending transfer parameters",
+ ReadTransfer.this,
+ lastReceivedOffset);
+ sendChunk(prepareTransferParameters(/*extend=*/false));
+ } else {
+ lastReceivedOffset = chunk.offset();
+ logger.atFiner().log("%s expecting offset %d, ignoring received offset %d",
+ ReadTransfer.this,
+ offset,
+ chunk.offset());
+ }
+ setNextChunkTimeout();
+ }
+ }
+
@Override
void setFutureResult() {
updateProgress(totalDataSize, totalDataSize, totalDataSize);
diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java
index 04027ab5b..39118b5da 100644
--- a/pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java
+++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java
@@ -371,7 +371,7 @@ abstract class Transfer<T> {
/** Enters the recovery state and returns to this state if recovery succeeds. */
@Override
public void handleTimeout() throws TransferAbortedException {
- changeState(new Recovery(this)).handleTimeout();
+ changeState(new TimeoutRecovery(this)).handleTimeout();
}
@Override
@@ -434,11 +434,11 @@ abstract class Transfer<T> {
}
/** Recovering from an expired timeout. */
- class Recovery extends ActiveState {
+ class TimeoutRecovery extends ActiveState {
private final State nextState;
private int retries;
- Recovery(State nextState) {
+ TimeoutRecovery(State nextState) {
this.nextState = nextState;
}
diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/VersionedChunk.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/VersionedChunk.java
index 2a253382c..9fdfb4d1e 100644
--- a/pw_transfer/java/main/dev/pigweed/pw_transfer/VersionedChunk.java
+++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/VersionedChunk.java
@@ -88,8 +88,6 @@ abstract class VersionedChunk {
public abstract VersionedChunk build();
}
- abstract Builder toBuilder();
-
public static VersionedChunk fromMessage(Chunk chunk) {
Builder builder = builder();
@@ -160,10 +158,6 @@ abstract class VersionedChunk {
return builder().setVersion(desiredVersion).setType(Chunk.Type.START).setResourceId(resourceId);
}
- public final VersionedChunk withType(Chunk.Type type) {
- return toBuilder().setType(type).build();
- }
-
public Chunk toMessage() {
Chunk.Builder chunk = Chunk.newBuilder()
.setType(type())
diff --git a/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java b/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java
index 6c19b2939..eb0af3c73 100644
--- a/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java
+++ b/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java
@@ -1188,6 +1188,120 @@ public final class TransferClientTest {
}
@Test
+ public void read_onlySendsOneUpdateAfterDrops() throws Exception {
+ createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO);
+ TransferParameters params = TransferParameters.create(50, 10, 0);
+
+ // Handshake
+ enqueueReadChunks(2, // Wait for read RPC open & START packet
+ newChunk(Chunk.Type.START_ACK, 99)
+ .setResourceId(7)
+ .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()));
+ enqueueReadChunks(1, // Ignore the first START_ACK_CONFIRMATION
+ newChunk(Chunk.Type.START_ACK, 99)
+ .setResourceId(7)
+ .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()));
+
+ // Window 1: server waits for START_ACK_CONFIRMATION, drops 2nd packet
+ enqueueReadChunks(1,
+ newChunk(Chunk.Type.DATA, 99).setOffset(0).setData(range(0, 10)),
+ newChunk(Chunk.Type.DATA, 99).setOffset(20).setData(range(20, 30)),
+ newChunk(Chunk.Type.DATA, 99).setOffset(30).setData(range(30, 40)),
+ newChunk(Chunk.Type.DATA, 99).setOffset(40).setData(range(40, 50)));
+
+ // Window 2: server waits for retransmit, drops 1st packet
+ enqueueReadChunks(1,
+ newChunk(Chunk.Type.DATA, 99).setOffset(20).setData(range(20, 30)),
+ newChunk(Chunk.Type.DATA, 99).setOffset(30).setData(range(30, 40)),
+ newChunk(Chunk.Type.DATA, 99).setOffset(40).setData(range(40, 50)),
+ newChunk(Chunk.Type.DATA, 99).setOffset(50).setData(range(50, 60)));
+
+ // Window 3: server waits for retransmit, drops last packet
+ enqueueReadChunks(1,
+ newChunk(Chunk.Type.DATA, 99).setOffset(10).setData(range(10, 20)),
+ newChunk(Chunk.Type.DATA, 99).setOffset(20).setData(range(20, 30)),
+ newChunk(Chunk.Type.DATA, 99).setOffset(30).setData(range(30, 40)),
+ newChunk(Chunk.Type.DATA, 99).setOffset(40).setData(range(40, 50)));
+
+ // Window 4: server waits for continue and retransmit, normal window.
+ enqueueReadChunks(2,
+ newChunk(Chunk.Type.DATA, 99).setOffset(50).setData(range(50, 60)),
+ newChunk(Chunk.Type.DATA, 99).setOffset(60).setData(range(60, 70)),
+ newChunk(Chunk.Type.DATA, 99).setOffset(70).setData(range(70, 80)),
+ newChunk(Chunk.Type.DATA, 99).setOffset(80).setData(range(80, 90)),
+ newChunk(Chunk.Type.DATA, 99).setOffset(90).setData(range(90, 100)));
+ enqueueReadChunks(2, // Ignore continue and retransmit chunks, retry last packet in window
+ newChunk(Chunk.Type.DATA, 99).setOffset(90).setData(range(90, 100)),
+ newChunk(Chunk.Type.DATA, 99).setOffset(90).setData(range(90, 100)));
+
+ // Window 5: Final packet
+ enqueueReadChunks(2, // Receive two retries, then send final packet
+ newChunk(Chunk.Type.DATA, 99).setOffset(100).setData(range(100, 110)).setRemainingBytes(0));
+ enqueueReadChunks(1, // Ignore first COMPLETION packet
+ newChunk(Chunk.Type.DATA, 99).setOffset(100).setData(range(100, 110)).setRemainingBytes(0));
+ enqueueReadChunks(1, newChunk(Chunk.Type.COMPLETION_ACK, 99));
+
+ ListenableFuture<byte[]> future = transferClient.read(7, params);
+ // assertThat(future.get()).isEqualTo(range(0, 110).toByteArray());
+ while (!future.isDone()) {
+ }
+
+ assertThat(lastChunks())
+ .containsExactly(
+ // Handshake
+ initialReadChunk(7, ProtocolVersion.VERSION_TWO, params),
+ readStartAckConfirmation(99, params),
+ readStartAckConfirmation(99, params),
+ // Window 1: send one transfer parameters update after the drop
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99)
+ .setOffset(10)
+ .setWindowEndOffset(60)
+ .setMaxChunkSizeBytes(10)
+ .build(),
+ // Window 2: send one transfer parameters update after the drop
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99)
+ .setOffset(10)
+ .setWindowEndOffset(60)
+ .setMaxChunkSizeBytes(10)
+ .build(),
+ // Window 3: send one transfer parameters update after the drop, then continue packet
+ newChunk(Chunk.Type.PARAMETERS_CONTINUE, 99) // Not seen by server
+ .setOffset(40)
+ .setWindowEndOffset(90)
+ .setMaxChunkSizeBytes(10)
+ .build(),
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99) // Sent after timeout
+ .setOffset(50)
+ .setWindowEndOffset(100)
+ .setMaxChunkSizeBytes(10)
+ .build(),
+ // Window 4: send one transfer parameters update after the drop, then continue packet
+ newChunk(Chunk.Type.PARAMETERS_CONTINUE, 99) // Ignored by server
+ .setOffset(80)
+ .setWindowEndOffset(130)
+ .setMaxChunkSizeBytes(10)
+ .build(),
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99) // Sent after last packet
+ .setOffset(100)
+ .setWindowEndOffset(150)
+ .setMaxChunkSizeBytes(10)
+ .build(),
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99) // Sent due to repeated packet
+ .setOffset(100)
+ .setWindowEndOffset(150)
+ .setMaxChunkSizeBytes(10)
+ .build(),
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99) // Sent due to repeated packet
+ .setOffset(100)
+ .setWindowEndOffset(150)
+ .setMaxChunkSizeBytes(10)
+ .build(),
+ // Window 5: final packet and closing handshake
+ newChunk(Chunk.Type.COMPLETION, 99).setStatus(Status.OK.ordinal()).build(),
+ newChunk(Chunk.Type.COMPLETION, 99).setStatus(Status.OK.ordinal()).build());
+ }
+
+ @Test
public void read_progressCallbackIsCalled() {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<byte[]> future =
@@ -2290,7 +2404,13 @@ public final class TransferClientTest {
.build());
}
- /** Receive these chunks after a chunk is sent. */
+ /** Receive these read chunks after a chunk is sent. */
+ private void enqueueReadChunks(int afterPackets, Chunk.Builder... chunks) {
+ syncWithTransferThread(
+ () -> rpcClient.enqueueServerStream(SERVICE, "Read", afterPackets, chunks));
+ }
+
+ /** Receive these write chunks after a chunk is sent. */
private void enqueueWriteChunks(int afterPackets, Chunk.Builder... chunks) {
syncWithTransferThread(
() -> rpcClient.enqueueServerStream(SERVICE, "Write", afterPackets, chunks));