aboutsummaryrefslogtreecommitdiff
path: root/pw_transfer
diff options
context:
space:
mode:
authorWyatt Hepler <hepler@google.com>2022-11-14 22:39:19 +0000
committerCQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com>2022-11-14 22:39:19 +0000
commit49e27cb5be542c4737a9c0efe1a646c7424e65a6 (patch)
tree3ac3bff43ecaa7108ef43683d6644b1ccc22c8ca /pw_transfer
parent675e2b1b8d5592a2ba7681d812b3ecaf8e6d32c4 (diff)
downloadpigweed-49e27cb5be542c4737a9c0efe1a646c7424e65a6.tar.gz
pw_transfer: Update Java client to support v2 protocol
- Support the v2 protocol, including implementing starting and ending handshakes. - Move retry functionality into the base Transfer class to share more code between read and write transfers. - Have sendChunk() throw an exception rather than return false transfer is aborted. This simplifies the logic around sendChunk() calls. - Move the cleanUp() function into the Completed() state constructor. This removes the need to check if cleanup has already occurred. - Update resource/session ID handling to use resource ID until session ID has been assigned. - Improve log statements by adding a descriptive toString() for Transfer objects. - Make legacy and v2 versions of all of the unit tests. - Have tests that need to timeout use a slightly different test version of the transfer thread run loop to prevent test flakes. - Add a small section about the Java client to the docs. Bug: b/235517604, b/235387007, b/257343283 Change-Id: Ib969b843a1eac3d7e3d0007c492d5e50e4c0f8c9 Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/117151 Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com> Reviewed-by: Alexei Frolov <frolv@google.com> Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
Diffstat (limited to 'pw_transfer')
-rw-r--r--pw_transfer/docs.rst31
-rw-r--r--pw_transfer/java/main/dev/pigweed/pw_transfer/ReadTransfer.java26
-rw-r--r--pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java413
-rw-r--r--pw_transfer/java/main/dev/pigweed/pw_transfer/TransferClient.java56
-rw-r--r--pw_transfer/java/main/dev/pigweed/pw_transfer/TransferError.java5
-rw-r--r--pw_transfer/java/main/dev/pigweed/pw_transfer/TransferEventHandler.java119
-rw-r--r--pw_transfer/java/main/dev/pigweed/pw_transfer/VersionedChunk.java28
-rw-r--r--pw_transfer/java/main/dev/pigweed/pw_transfer/WriteTransfer.java63
-rw-r--r--pw_transfer/java/test/dev/pigweed/pw_transfer/BUILD.bazel1
-rw-r--r--pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java1809
10 files changed, 2029 insertions, 522 deletions
diff --git a/pw_transfer/docs.rst b/pw_transfer/docs.rst
index 6ed214d14..e1ad8babc 100644
--- a/pw_transfer/docs.rst
+++ b/pw_transfer/docs.rst
@@ -311,7 +311,6 @@ Python
Typescript
==========
-
Provides a simple interface for transferring bulk data over pw_rpc.
**Example**
@@ -339,6 +338,36 @@ Provides a simple interface for transferring bulk data over pw_rpc.
console.log(`Failed to read: ${error.status}`);
});
+Java
+====
+pw_transfer provides a Java client. The transfer client returns a
+`ListenableFuture <https://guava.dev/releases/21.0/api/docs/com/google/common/util/concurrent/ListenableFuture>`_
+to represent the results of a read or write transfer.
+
+.. code-block:: java
+
+ import dev.pigweed.pw_transfer.TransferClient;
+
+ // Create a new transfer client.
+ TransferClient client = new TransferClient(
+ transferReadMethodClient,
+ transferWriteMethodClient,
+ transferTimeoutMillis,
+ initialTransferTimeoutMillis,
+ maxRetries);
+
+ // Start a read transfer.
+ ListenableFuture<byte[]> readTransfer = client.read(123);
+
+ // Start a write transfer.
+ ListenableFuture<Void> writeTransfer = client.write(123, dataToWrite);
+
+ // Get the data from the read transfer.
+ byte[] readData = readTransfer.get();
+
+ // Wait for the write transfer to complete.
+ writeTransfer.get();
+
--------
Protocol
--------
diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/ReadTransfer.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/ReadTransfer.java
index f564029e0..ec98253fe 100644
--- a/pw_transfer/java/main/dev/pigweed/pw_transfer/ReadTransfer.java
+++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/ReadTransfer.java
@@ -50,6 +50,7 @@ class ReadTransfer extends Transfer<byte[]> {
private int windowEndOffset;
ReadTransfer(int resourceId,
+ ProtocolVersion desiredProtocolVersion,
TransferInterface transferManager,
int timeoutMillis,
int initialTimeoutMillis,
@@ -58,6 +59,7 @@ class ReadTransfer extends Transfer<byte[]> {
Consumer<TransferProgress> progressCallback,
BooleanSupplier shouldAbortCallback) {
super(resourceId,
+ desiredProtocolVersion,
transferManager,
timeoutMillis,
initialTimeoutMillis,
@@ -74,10 +76,8 @@ class ReadTransfer extends Transfer<byte[]> {
}
@Override
- VersionedChunk getInitialChunk(ProtocolVersion desiredProcotolVersion) {
- return setTransferParameters(
- VersionedChunk.createInitialChunk(desiredProcotolVersion, getResourceId()))
- .build();
+ void prepareInitialChunk(VersionedChunk.Builder chunk) {
+ setTransferParameters(chunk);
}
@Override
@@ -90,18 +90,12 @@ class ReadTransfer extends Transfer<byte[]> {
return chunk;
}
- class ReceivingData extends State {
+ class ReceivingData extends ActiveState {
@Override
- void handleTimeout() {
- setState(new Recovery());
- }
-
- @Override
- void handleDataChunk(VersionedChunk chunk) {
+ public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException {
if (chunk.offset() != offset) {
- logger.atFine().log(
- "Transfer %d expected offset %d, received %d; resending transfer parameters",
- getId(),
+ logger.atFine().log("%s expected offset %d, received %d; resending transfer parameters",
+ ReadTransfer.this,
offset,
chunk.offset());
@@ -120,7 +114,7 @@ class ReadTransfer extends Transfer<byte[]> {
if (chunk.remainingBytes().isPresent()) {
if (chunk.remainingBytes().getAsLong() == 0) {
- setStateCompletedAndSendFinalChunk(Status.OK);
+ setStateTerminatingAndSendFinalChunk(Status.OK);
return;
}
@@ -142,7 +136,7 @@ class ReadTransfer extends Transfer<byte[]> {
if (remainingWindowSize == 0) {
logger.atFiner().log(
- "Transfer %d received all pending bytes; sending transfer parameters update", getId());
+ "%s received all pending bytes; sending transfer parameters update", ReadTransfer.this);
sendChunk(prepareTransferParameters(/*extend=*/false));
} else if (extendWindow) {
sendChunk(prepareTransferParameters(/*extend=*/true));
diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java
index 06ecf45b8..04027ab5b 100644
--- a/pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java
+++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java
@@ -20,8 +20,10 @@ import static dev.pigweed.pw_transfer.TransferProgress.UNKNOWN_TRANSFER_SIZE;
import com.google.common.util.concurrent.SettableFuture;
import dev.pigweed.pw_log.Logger;
import dev.pigweed.pw_rpc.Status;
+import dev.pigweed.pw_transfer.TransferEventHandler.TransferInterface;
import java.time.Duration;
import java.time.Instant;
+import java.util.Locale;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
@@ -32,9 +34,6 @@ abstract class Transfer<T> {
// Largest nanosecond instant. Used to block indefinitely when no transfers are pending.
static final Instant NO_TIMEOUT = Instant.ofEpochSecond(0, Long.MAX_VALUE);
- // TODO(hepler): Make this configurable
- private static final ProtocolVersion DESIRED_PROTOCOL_VERSION = ProtocolVersion.LEGACY;
-
private final int resourceId;
private final ProtocolVersion desiredProtocolVersion;
private final TransferEventHandler.TransferInterface eventHandler;
@@ -48,9 +47,8 @@ abstract class Transfer<T> {
private int sessionId = VersionedChunk.UNASSIGNED_SESSION_ID;
private ProtocolVersion configuredProtocolVersion = ProtocolVersion.UNKNOWN;
- private State state = new Inactive();
private Instant deadline = NO_TIMEOUT;
- private boolean isCleanedUp = false;
+ private State state;
private VersionedChunk lastChunkSent;
// The number of times this transfer has retried due to an RPC disconnection. Limit this to
@@ -60,8 +58,8 @@ abstract class Transfer<T> {
/**
* Creates a new read or write transfer.
- *
* @param resourceId The resource ID of the transfer
+ * @param desiredProtocolVersion protocol version to request
* @param eventHandler Interface to use to send a chunk.
* @param timeoutMillis Maximum time to wait for a chunk from the server.
* @param initialTimeoutMillis Maximum time to wait for the first chunk from the server.
@@ -70,14 +68,15 @@ abstract class Transfer<T> {
* @param shouldAbortCallback BooleanSupplier that returns true if a transfer should be aborted.
*/
Transfer(int resourceId,
- TransferEventHandler.TransferInterface eventHandler,
+ ProtocolVersion desiredProtocolVersion,
+ TransferInterface eventHandler,
int timeoutMillis,
int initialTimeoutMillis,
int maxRetries,
Consumer<TransferProgress> progressCallback,
BooleanSupplier shouldAbortCallback) {
this.resourceId = resourceId;
- this.desiredProtocolVersion = DESIRED_PROTOCOL_VERSION;
+ this.desiredProtocolVersion = desiredProtocolVersion;
this.eventHandler = eventHandler;
this.future = SettableFuture.create();
@@ -94,16 +93,44 @@ abstract class Transfer<T> {
}
}, directExecutor());
+ if (desiredProtocolVersion == ProtocolVersion.LEGACY) {
+ // Legacy transfers skip protocol negotiation stage and use the resource ID as the session ID.
+ configuredProtocolVersion = ProtocolVersion.LEGACY;
+ assignSessionId(resourceId);
+ state = getWaitingForDataState();
+ } else {
+ state = new Initiating();
+ }
+
startTime = Instant.now();
}
- final int getResourceId() {
+ @Override
+ public String toString() {
+ return String.format(Locale.ENGLISH,
+ "%s(%d:%d)[%s]",
+ this.getClass().getSimpleName(),
+ resourceId,
+ sessionId,
+ state.getClass().getSimpleName());
+ }
+
+ public final int getResourceId() {
return resourceId;
}
- /** Returns the current ID for this transfer, which may be the session or resource ID. */
- final int getId() {
- return sessionId != VersionedChunk.UNASSIGNED_SESSION_ID ? sessionId : resourceId;
+ public final int getSessionId() {
+ return sessionId;
+ }
+
+ private void assignSessionId(int newSessionId) {
+ sessionId = newSessionId;
+ eventHandler.assignSessionId(this);
+ }
+
+ /** Terminates the transfer without sending any packets. */
+ public final void terminate(TransferError error) {
+ changeState(new Completed(error));
}
final Instant getDeadline() {
@@ -127,27 +154,20 @@ abstract class Transfer<T> {
}
final void start() {
- logger.atFine().log(
- "Transfer %d for resource %d starting with parameters: default timeout %d ms, initial timeout %d ms, %d max retires",
- getId(),
- getResourceId(),
+ logger.atInfo().log(
+ "%s starting with parameters: default timeout %d ms, initial timeout %d ms, %d max retires",
+ this,
timeoutMillis,
initialTimeoutMillis,
maxRetries);
- if (!sendChunk(getInitialChunk(desiredProtocolVersion))) {
+ VersionedChunk.Builder chunk =
+ VersionedChunk.createInitialChunk(desiredProtocolVersion, resourceId);
+ prepareInitialChunk(chunk);
+ try {
+ sendChunk(chunk.build());
+ } catch (TransferAbortedException e) {
return; // Sending failed, transfer is cancelled
}
-
- if (desiredProtocolVersion == ProtocolVersion.LEGACY) {
- // Legacy transfers skip the protocol negotiation stage and use the resource ID as the session
- // ID.
- configuredProtocolVersion = ProtocolVersion.LEGACY;
- sessionId = resourceId;
- setState(getWaitingForDataState());
- } else {
- setState(new Initiating());
- throw new AssertionError("Cannot set desired protocol to v2; not implemented yet!");
- }
setInitialTimeout();
}
@@ -156,29 +176,28 @@ abstract class Transfer<T> {
// Since a packet has been received, don't allow retries on disconnection; abort instead.
disconnectionRetries = Integer.MAX_VALUE;
- if (chunk.version() == ProtocolVersion.UNKNOWN) {
- logger.atWarning().log(
- "Cannot handle packet from unsupported session ID %d", chunk.sessionId());
- setStateCompletedAndSendFinalChunk(Status.INVALID_ARGUMENT);
- return;
- }
-
- if (chunk.type() == Chunk.Type.COMPLETION) {
- logger.atFinest().log("Event: handle final chunk");
- state.handleFinalChunk(chunk.status().orElseGet(() -> {
- logger.atWarning().log("Received terminating chunk with no status set; using INTERNAL");
- return Status.INTERNAL.code();
- }));
- } else {
- logger.atFinest().log("Event: handle data chunk");
- state.handleDataChunk(chunk);
+ try {
+ if (chunk.type() == Chunk.Type.COMPLETION) {
+ state.handleFinalChunk(chunk.status().orElseGet(() -> {
+ logger.atWarning().log("Received terminating chunk with no status set; using INTERNAL");
+ return Status.INTERNAL.code();
+ }));
+ } else {
+ state.handleDataChunk(chunk);
+ }
+ } catch (TransferAbortedException e) {
+ // Transfer was aborted; nothing else to do.
}
}
final void handleTimeoutIfDeadlineExceeded() {
if (Instant.now().isAfter(deadline)) {
- logger.atFinest().log("Event: handleTimeout since %s is after %s", Instant.now(), deadline);
- state.handleTimeout();
+ logger.atFinest().log("%s timed out since the deadline %s has passed", this, deadline);
+ try {
+ state.handleTimeout();
+ } catch (TransferAbortedException e) {
+ // Transfer was aborted; nothing else to do.
+ }
}
}
@@ -195,24 +214,27 @@ abstract class Transfer<T> {
// disconnectionRetries is set to Int.MAX_VALUE when a packet is received to prevent retries
// after the initial packet.
if (disconnectionRetries++ < maxRetries) {
- logger.atFine().log("Restarting the pw_transfer RPC for transfer %d (attempt %d/%d)",
- sessionId,
+ logger.atFine().log("Restarting the pw_transfer RPC for %s (attempt %d/%d)",
+ this,
disconnectionRetries,
maxRetries);
- if (sendChunk(getChunkForRetry())) {
- setInitialTimeout();
+ try {
+ sendChunk(getChunkForRetry());
+ } catch (TransferAbortedException e) {
+ return; // Transfer is aborted; nothing else to do.
}
+ setInitialTimeout();
} else {
- cleanUp(new TransferError(
+ changeState(new Completed(new TransferError(
"Transfer " + sessionId + " restarted " + maxRetries + " times, aborting",
- Status.INTERNAL));
+ Status.INTERNAL)));
}
}
/** Returns the State to enter immediately after sending the first packet. */
abstract State getWaitingForDataState();
- abstract VersionedChunk getInitialChunk(ProtocolVersion desiredProtocolVersion);
+ abstract void prepareInitialChunk(VersionedChunk.Builder chunk);
/**
* Returns the chunk to send for a retry. Returns the initial chunk if no chunks have been sent.
@@ -224,78 +246,61 @@ abstract class Transfer<T> {
final VersionedChunk.Builder newChunk(Chunk.Type type) {
return VersionedChunk.builder()
- .setVersion(configuredProtocolVersion)
+ .setVersion(configuredProtocolVersion != ProtocolVersion.UNKNOWN ? configuredProtocolVersion
+ : desiredProtocolVersion)
.setType(type)
- .setSessionId(getId());
+ .setSessionId(sessionId);
}
final VersionedChunk getLastChunkSent() {
return lastChunkSent;
}
- final void setState(State newState) {
+ final State changeState(State newState) {
if (newState != state) {
- logger.atFinest().log(
- "Transfer %d state %s -> %s", getId(), state.getName(), newState.getName());
+ logger.atFinest().log("%s state %s -> %s",
+ this,
+ state.getClass().getSimpleName(),
+ newState.getClass().getSimpleName());
}
state = newState;
+ return state;
}
- /** Sends a chunk. Returns true if sent, false if sending failed and the transfer was aborted. */
- final boolean sendChunk(VersionedChunk chunk) {
+ /** Exception thrown when the transfer is aborted. */
+ static class TransferAbortedException extends Exception {}
+
+ /**
+ * Sends a chunk.
+ *
+ * If sending fails, the transfer cannot proceed. sendChunk() sets the state to completed and
+ * throws a TransferAbortedException.
+ */
+ final void sendChunk(VersionedChunk chunk) throws TransferAbortedException {
lastChunkSent = chunk;
if (shouldAbortCallback.getAsBoolean()) {
logger.atWarning().log("Abort signal received.");
- cleanUp(new TransferError(sessionId, Status.ABORTED));
- return false;
+ changeState(new Completed(new TransferError(this, Status.ABORTED)));
+ throw new TransferAbortedException();
}
try {
+ logger.atFinest().log("%s sending %s", this, chunk);
eventHandler.sendChunk(chunk.toMessage());
} catch (TransferError transferError) {
- cleanUp(transferError);
- return false;
+ changeState(new Completed(transferError));
+ throw new TransferAbortedException();
}
- return true;
- }
-
- /** Performs final cleanup of a completed transfer. No packets are sent to the server. */
- private void cleanUp(Status status) {
- eventHandler.unregisterTransfer(sessionId);
-
- logger.atInfo().log("Transfer %d completed with status %s", sessionId, status);
- if (status.ok()) {
- setFutureResult();
- } else {
- future.setException(new TransferError(sessionId, status));
- }
- isCleanedUp = true;
- }
-
- /** Finishes the transfer due to an exception. No packets are sent to the server. */
- final void cleanUp(TransferError exception) {
- eventHandler.unregisterTransfer(sessionId);
-
- logger.atWarning().withCause(exception).log("Transfer %d terminated with exception", sessionId);
- future.setException(exception);
- isCleanedUp = true;
}
/** Sends a status chunk to the server and finishes the transfer. */
- final void setStateCompletedAndSendFinalChunk(Status status) {
- setState(new Completed());
-
- logger.atFine().log("Sending final chunk for transfer %d with status %s", sessionId, status);
- // If the transfer was completed due to concurrently handling chunks, don't send.
- if (isCleanedUp) {
- logger.atFine().log("Skipping sending final chunk on already-completed transfer");
- return;
- }
-
- // Only call finish() if the sendChunk was successful. If it wasn't, the exception would have
- // already terminated the transfer.
- if (sendChunk(newChunk(Chunk.Type.COMPLETION).setStatus(status).build())) {
- cleanUp(status);
+ final void setStateTerminatingAndSendFinalChunk(Status status) throws TransferAbortedException {
+ logger.atFine().log("%s sending final chunk with status %s", this, status);
+ sendChunk(newChunk(Chunk.Type.COMPLETION).setStatus(status).build());
+ if (configuredProtocolVersion == ProtocolVersion.VERSION_TWO) {
+ changeState(new Terminating(status));
+ } else {
+ changeState(new Completed(status));
}
}
@@ -308,9 +313,9 @@ abstract class Transfer<T> {
long durationNanos = Duration.between(startTime, Instant.now()).toNanos();
long totalRate = durationNanos == 0 ? 0 : (bytesSent * 1_000_000_000 / durationNanos);
- logger.atFine().log("Transfer %d progress: "
+ logger.atFine().log("%s progress: "
+ "%5.1f%% (%d B sent, %d B confirmed received of %s B total) at %d B/s",
- sessionId,
+ this,
progress.percentReceived(),
bytesSent,
bytesConfirmedReceived,
@@ -318,67 +323,135 @@ abstract class Transfer<T> {
totalRate);
}
- /** Represents a state in the transfer state machine. */
- abstract class State {
- private String getName() {
- return getClass().getSimpleName();
- }
-
- /** Called to handle a non-final chunk for this transfer. */
- void handleDataChunk(VersionedChunk chunk) {
- logger.atFine().log(
- "Transfer %d [%s state]: Received unexpected data chunk", getId(), getName());
- }
+ interface State {
+ /**
+ * Called to handle a non-final chunk for this transfer.
+ */
+ void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException;
+
+ /**
+ * Called to handle the final chunk for this transfer.
+ */
+ void handleFinalChunk(int statusCode) throws TransferAbortedException;
+
+ /**
+ * Called when this transfer's deadline expires.
+ */
+ void handleTimeout() throws TransferAbortedException;
+
+ /**
+ * Called if the transfer is cancelled by the user.
+ */
+ void handleCancellation();
+
+ /**
+ * Called when the transfer thread is shutting down.
+ */
+ void handleTermination();
+ }
- /** Called to handle the final chunk for this transfer. */
- void handleFinalChunk(int statusCode) {
+ /** Represents an active state in the transfer state machine. */
+ abstract class ActiveState implements State {
+ @Override
+ public final void handleFinalChunk(int statusCode) throws TransferAbortedException {
Status status = Status.fromCode(statusCode);
- if (status != null) {
- cleanUp(status);
- setState(new Completed());
- } else {
- logger.atWarning().log("Received invalid status value %d", statusCode);
- setStateCompletedAndSendFinalChunk(Status.INVALID_ARGUMENT);
+ if (status == null) {
+ logger.atWarning().log("Received invalid status value %d, using INTERNAL", statusCode);
+ status = Status.INTERNAL;
}
+
+ // If this is not version 2, immediately clean up. If it is, send the COMPLETION_ACK first and
+ // clean up if that succeeded.
+ if (configuredProtocolVersion == ProtocolVersion.VERSION_TWO) {
+ sendChunk(newChunk(Chunk.Type.COMPLETION_ACK).build());
+ }
+ changeState(new Completed(status));
}
- /** Called when this transfer's deadline expires. */
- void handleTimeout() {
- logger.atFine().log("Transfer %d [%s state]: Ignoring timeout", getId(), getName());
+ /** Enters the recovery state and returns to this state if recovery succeeds. */
+ @Override
+ public void handleTimeout() throws TransferAbortedException {
+ changeState(new Recovery(this)).handleTimeout();
}
- /** Called if the transfer is cancelled by the user. */
- void handleCancellation() {
- setStateCompletedAndSendFinalChunk(Status.CANCELLED);
+ @Override
+ public final void handleCancellation() {
+ try {
+ setStateTerminatingAndSendFinalChunk(Status.CANCELLED);
+ } catch (TransferAbortedException e) {
+ // Transfer was aborted; nothing to do.
+ }
}
- /** Called when the transfer thread is shutting down. */
- void handleTermination() {
- setStateCompletedAndSendFinalChunk(Status.ABORTED);
+ @Override
+ public final void handleTermination() {
+ try {
+ setStateTerminatingAndSendFinalChunk(Status.ABORTED);
+ } catch (TransferAbortedException e) {
+ // Transfer was aborted; nothing to do.
+ }
}
}
- private class Inactive extends State {}
+ private class Initiating extends ActiveState {
+ @Override
+ public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException {
+ assignSessionId(chunk.sessionId());
+
+ if (chunk.version() == ProtocolVersion.UNKNOWN) {
+ logger.atWarning().log(
+ "%s aborting due to unsupported protocol version: %s", Transfer.this, chunk);
+ setStateTerminatingAndSendFinalChunk(Status.INVALID_ARGUMENT);
+ return;
+ }
+
+ changeState(getWaitingForDataState());
+
+ if (chunk.type() != Chunk.Type.START_ACK) {
+ logger.atFine().log(
+ "%s got non-handshake chunk; reverting to legacy protocol", Transfer.this);
+ configuredProtocolVersion = ProtocolVersion.LEGACY;
+ state.handleDataChunk(chunk);
+ return;
+ }
- private class Initiating extends State {
- // TODO(hepler): Implement the starting handshake
+ if (chunk.version().compareTo(desiredProtocolVersion) <= 0) {
+ configuredProtocolVersion = chunk.version();
+ } else {
+ configuredProtocolVersion = desiredProtocolVersion;
+ }
+
+ logger.atFine().log("%s negotiated protocol %s (ours=%s, theirs=%s)",
+ Transfer.this,
+ configuredProtocolVersion,
+ desiredProtocolVersion,
+ chunk.version());
+
+ VersionedChunk.Builder startAckConfirmation = newChunk(Chunk.Type.START_ACK_CONFIRMATION);
+ prepareInitialChunk(startAckConfirmation);
+ sendChunk(startAckConfirmation.build());
+ }
}
/** Recovering from an expired timeout. */
- class Recovery extends State {
+ class Recovery extends ActiveState {
+ private final State nextState;
private int retries;
+ Recovery(State nextState) {
+ this.nextState = nextState;
+ }
+
@Override
- void handleDataChunk(VersionedChunk chunk) {
- setState(getWaitingForDataState());
- state.handleDataChunk(chunk);
+ public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException {
+ changeState(nextState).handleDataChunk(chunk);
}
@Override
- void handleTimeout() {
+ public void handleTimeout() throws TransferAbortedException {
if (retries < maxRetries) {
- logger.atFiner().log("Transfer %d received no chunks for %d ms; retrying %d/%d",
- getId(),
+ logger.atFiner().log("%s received no chunks for %d ms; retrying %d/%d",
+ Transfer.this,
timeoutMillis,
retries,
maxRetries);
@@ -388,20 +461,68 @@ abstract class Transfer<T> {
return;
}
- setStateCompletedAndSendFinalChunk(Status.DEADLINE_EXCEEDED);
+ // If the transfer timed out, skip to the completed state. Don't send any more packets.
+ changeState(new Completed(Status.DEADLINE_EXCEEDED));
}
}
/** Transfer completed. Do nothing if the transfer is terminated or cancelled. */
- class Completed extends State {
- Completed() {
+ class Terminating extends ActiveState {
+ private final Status status;
+
+ Terminating(Status status) {
+ this.status = status;
+ }
+
+ @Override
+ public void handleDataChunk(VersionedChunk chunk) {
+ if (chunk.type() == Chunk.Type.COMPLETION_ACK) {
+ changeState(new Completed(status));
+ }
+ }
+ }
+
+ class Completed implements State {
+ /** Performs final cleanup of a completed transfer. No packets are sent to the server. */
+ Completed(Status status) {
+ cleanUp();
+ logger.atInfo().log("%s completed with status %s", Transfer.this, status);
+ if (status.ok()) {
+ setFutureResult();
+ } else {
+ future.setException(new TransferError(Transfer.this, status));
+ }
+ }
+
+ /** Finishes the transfer due to an exception. No packets are sent to the server. */
+ Completed(TransferError exception) {
+ cleanUp();
+ logger.atWarning().withCause(exception).log("%s terminated with exception", Transfer.this);
+ future.setException(exception);
+ }
+
+ private void cleanUp() {
deadline = NO_TIMEOUT;
+ eventHandler.unregisterTransfer(Transfer.this);
+ }
+
+ @Override
+ public void handleDataChunk(VersionedChunk chunk) {
+ logger.atFiner().log("%s [Completed state]: Received unexpected data chunk", Transfer.this);
}
@Override
- void handleTermination() {}
+ public void handleFinalChunk(int statusCode) {
+ logger.atFiner().log("%s [Completed state]: Received unexpected data chunk", Transfer.this);
+ }
+
+ @Override
+ public void handleTimeout() {}
+
+ @Override
+ public void handleTermination() {}
@Override
- void handleCancellation() {}
+ public void handleCancellation() {}
}
}
diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferClient.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferClient.java
index d6c1d1499..d1d81fdb5 100644
--- a/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferClient.java
+++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferClient.java
@@ -37,6 +37,8 @@ public class TransferClient {
private final TransferEventHandler transferEventHandler;
private final Thread transferEventHandlerThread;
+ private ProtocolVersion desiredProtocolVersion = ProtocolVersion.LEGACY;
+
/**
* Creates a new transfer client for sending and receiving data with pw_transfer.
*
@@ -47,6 +49,25 @@ public class TransferClient {
* @param initialTransferTimeoutMillis How long to wait for the initial communication from the
* server. If the server delays longer than this, retry up to maxRetries times.
* @param maxRetries How many times to retry if a communication times out.
+ */
+ public TransferClient(MethodClient readMethod,
+ MethodClient writeMethod,
+ int transferTimeoutMillis,
+ int initialTransferTimeoutMillis,
+ int maxRetries) {
+ this(readMethod,
+ writeMethod,
+ transferTimeoutMillis,
+ initialTransferTimeoutMillis,
+ maxRetries,
+ ()
+ -> false, // Never abort
+ TransferEventHandler::run);
+ }
+
+ /**
+ * Creates a new transfer client with a callback that can be used to terminate transfers.
+ *
* @param shouldAbortCallback BooleanSupplier that returns true if a transfer should be aborted.
*/
public TransferClient(MethodClient readMethod,
@@ -55,13 +76,30 @@ public class TransferClient {
int initialTransferTimeoutMillis,
int maxRetries,
BooleanSupplier shouldAbortCallback) {
+ this(readMethod,
+ writeMethod,
+ transferTimeoutMillis,
+ initialTransferTimeoutMillis,
+ maxRetries,
+ shouldAbortCallback,
+ TransferEventHandler::run);
+ }
+
+ /** Constructor exposed to package for test use only. */
+ TransferClient(MethodClient readMethod,
+ MethodClient writeMethod,
+ int transferTimeoutMillis,
+ int initialTransferTimeoutMillis,
+ int maxRetries,
+ BooleanSupplier shouldAbortCallback,
+ Consumer<TransferEventHandler> runFunction) {
this.transferTimeoutMillis = transferTimeoutMillis;
this.initialTransferTimeoutMillis = initialTransferTimeoutMillis;
this.maxRetries = maxRetries;
this.shouldAbortCallback = shouldAbortCallback;
transferEventHandler = new TransferEventHandler(readMethod, writeMethod);
- transferEventHandlerThread = new Thread(transferEventHandler::run);
+ transferEventHandlerThread = new Thread(() -> runFunction.accept(transferEventHandler));
transferEventHandlerThread.start();
}
@@ -81,6 +119,7 @@ public class TransferClient {
public ListenableFuture<Void> write(
int resourceId, byte[] data, Consumer<TransferProgress> progressCallback) {
return transferEventHandler.startWriteTransferAsClient(resourceId,
+ desiredProtocolVersion,
transferTimeoutMillis,
initialTransferTimeoutMillis,
maxRetries,
@@ -111,6 +150,7 @@ public class TransferClient {
public ListenableFuture<byte[]> read(
int resourceId, TransferParameters parameters, Consumer<TransferProgress> progressCallback) {
return transferEventHandler.startReadTransferAsClient(resourceId,
+ desiredProtocolVersion,
transferTimeoutMillis,
initialTransferTimeoutMillis,
maxRetries,
@@ -119,6 +159,20 @@ public class TransferClient {
shouldAbortCallback);
}
+ /**
+ * Sets the protocol version to request for future transfers
+ *
+ * Does not affect ongoing transfers. Version cannot be set to UNKNOWN!
+ *
+ * @throws IllegalArgumentException if the protocol version is UNKNOWN
+ */
+ public void setProtocolVersion(ProtocolVersion version) {
+ if (version == ProtocolVersion.UNKNOWN) {
+ throw new IllegalArgumentException("Cannot set protocol version to UNKNOWN!");
+ }
+ desiredProtocolVersion = version;
+ }
+
/** Stops the background thread and waits until it terminates. */
public void close() throws InterruptedException {
transferEventHandler.stop();
diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferError.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferError.java
index fe448445c..c059c05c7 100644
--- a/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferError.java
+++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferError.java
@@ -26,9 +26,8 @@ public class TransferError extends Exception {
error = Status.UNKNOWN;
}
- TransferError(int id, Status error) {
- this(String.format(Locale.ENGLISH, "Transfer %d failed with status %s", id, error.name()),
- error);
+ TransferError(Transfer<?> transfer, Status error) {
+ this(String.format(Locale.ENGLISH, "%s failed with status %s", transfer, error.name()), error);
}
TransferError(String msg, Status error) {
diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferEventHandler.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferEventHandler.java
index 35bba8886..e8d045812 100644
--- a/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferEventHandler.java
+++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferEventHandler.java
@@ -52,8 +52,9 @@ class TransferEventHandler {
private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
- // Maps resource ID to transfer
- private final Map<Integer, Transfer<?>> transfers = new HashMap<>();
+ // Map resource ID to transfer, and session ID to resource ID.
+ private final Map<Integer, Transfer<?>> resourceIdToTransfer = new HashMap<>();
+ private final Map<Integer, Integer> sessionToResourceId = new HashMap<>();
@Nullable private Call.ClientStreaming<Chunk> readStream = null;
@Nullable private Call.ClientStreaming<Chunk> writeStream = null;
@@ -65,6 +66,7 @@ class TransferEventHandler {
}
ListenableFuture<Void> startWriteTransferAsClient(int resourceId,
+ ProtocolVersion desiredProtocolVersion,
int transferTimeoutMillis,
int initialTransferTimeoutMillis,
int maxRetries,
@@ -72,6 +74,7 @@ class TransferEventHandler {
Consumer<TransferProgress> progressCallback,
BooleanSupplier shouldAbortCallback) {
WriteTransfer transfer = new WriteTransfer(resourceId,
+ desiredProtocolVersion,
new TransferInterface() {
@Override
Call.ClientStreaming<Chunk> getStream() throws ChannelOutputException {
@@ -97,6 +100,7 @@ class TransferEventHandler {
}
ListenableFuture<byte[]> startReadTransferAsClient(int resourceId,
+ ProtocolVersion desiredProtocolVersion,
int transferTimeoutMillis,
int initialTransferTimeoutMillis,
int maxRetries,
@@ -104,6 +108,7 @@ class TransferEventHandler {
Consumer<TransferProgress> progressCallback,
BooleanSupplier shouldAbortCallback) {
ReadTransfer transfer = new ReadTransfer(resourceId,
+ desiredProtocolVersion,
new TransferInterface() {
@Override
Call.ClientStreaming<Chunk> getStream() throws ChannelOutputException {
@@ -130,12 +135,14 @@ class TransferEventHandler {
private void startTransferAsClient(Transfer<?> transfer) {
enqueueEvent(() -> {
- if (transfers.put(transfer.getResourceId(), transfer) != null) {
- transfer.cleanUp(new TransferError("A transfer for resource ID " + transfer.getResourceId()
+ if (resourceIdToTransfer.containsKey(transfer.getResourceId())) {
+ transfer.terminate(new TransferError("A transfer for resource ID "
+ + transfer.getResourceId()
+ " is already in progress! Only one read/write transfer per resource is supported at a time",
Status.ALREADY_EXISTS));
return;
}
+ resourceIdToTransfer.put(transfer.getResourceId(), transfer);
transfer.start();
});
}
@@ -144,14 +151,34 @@ class TransferEventHandler {
void run() {
while (processEvents) {
handleNextEvent();
+ handleTimeouts();
}
}
+ /**
+ * Test version of run() that processes all enqueued events before checking for timeouts.
+ *
+ * Tests that need to time out should process all enqueued events first to prevent flaky failures.
+ * If handling one of several queued packets takes longer than the timeout (which must be short
+ * for a unit test), then the test may fail spuriously.
+ *
+ * This run function is not used outside of tests because processing all incoming packets before
+ * checking for timeouts could delay the transfer client's outgoing write packets if there are
+ * lots of inbound packets. This could delay transfers and cause unnecessary timeouts.
+ */
+ void runForTestsThatMustTimeOut() {
+ while (processEvents) {
+ while (!events.isEmpty()) {
+ handleNextEvent();
+ }
+ handleTimeouts();
+ }
+ }
/** Stops the transfer event handler from processing events. */
void stop() {
enqueueEvent(() -> {
logger.atFine().log("Terminating TransferEventHandler");
- transfers.values().forEach(Transfer::handleTermination);
+ resourceIdToTransfer.values().forEach(Transfer::handleTermination);
processEvents = false;
});
}
@@ -186,17 +213,19 @@ class TransferEventHandler {
event.handle();
}
} catch (InterruptedException e) {
- // If interrupted, check for timeouts anyway.
+ // If interrupted, continue around the loop.
}
+ }
- for (Transfer<?> transfer : transfers.values()) {
+ private void handleTimeouts() {
+ for (Transfer<?> transfer : resourceIdToTransfer.values()) {
transfer.handleTimeoutIfDeadlineExceeded();
}
}
private Instant getNextTimeout() {
Optional<Transfer<?>> transfer =
- transfers.values().stream().min(Comparator.comparing(Transfer::getDeadline));
+ resourceIdToTransfer.values().stream().min(Comparator.comparing(Transfer::getDeadline));
return transfer.isPresent() ? transfer.get().getDeadline() : Transfer.NO_TIMEOUT;
}
@@ -207,7 +236,7 @@ class TransferEventHandler {
/**
* Sends the provided transfer chunk.
*
- * Must be called on the transfer therad.
+ * Must be called on the transfer thread.
*/
void sendChunk(Chunk chunk) throws TransferError {
try {
@@ -218,12 +247,22 @@ class TransferEventHandler {
}
/**
+ * Associates the transfer's session ID with its resource ID.
+ *
+ * Must be called on the transfer thread.
+ */
+ void assignSessionId(Transfer<?> transfer) {
+ sessionToResourceId.put(transfer.getSessionId(), transfer.getResourceId());
+ }
+
+ /**
* Removes this transfer from the list of active transfers.
*
- * Must be called on the transfer therad.
+ * Must be called on the transfer thread.
*/
- void unregisterTransfer(int sessionId) {
- transfers.remove(sessionId);
+ void unregisterTransfer(Transfer<?> transfer) {
+ resourceIdToTransfer.remove(transfer.getResourceId());
+ sessionToResourceId.remove(transfer.getSessionId());
}
/**
@@ -246,13 +285,21 @@ class TransferEventHandler {
VersionedChunk chunk = VersionedChunk.fromMessage(chunkProto);
enqueueEvent(() -> {
- Transfer<?> transfer = transfers.get(chunk.sessionId());
+ Transfer<?> transfer = null;
+ if (chunk.resourceId().isPresent()) {
+ transfer = resourceIdToTransfer.get(chunk.resourceId().getAsInt());
+ } else {
+ Integer resourceId = sessionToResourceId.get(chunk.sessionId());
+ if (resourceId != null) {
+ transfer = resourceIdToTransfer.get(resourceId);
+ }
+ }
+
if (transfer != null) {
- logger.atFinest().log(
- "Transfer %d received chunk: %s", transfer.getId(), chunkToString(chunkProto));
+ logger.atFinest().log("%s received chunk: %s", transfer, chunk);
transfer.handleChunk(chunk);
} else {
- logger.atWarning().log("Ignoring unrecognized transfer session ID %d", chunk.sessionId());
+ logger.atInfo().log("Ignoring unrecognized transfer chunk: %s", chunk);
}
});
}
@@ -268,7 +315,7 @@ class TransferEventHandler {
resetStream();
// The transfers remove themselves from the Map during cleanup, iterate over a copied list.
- List<Transfer<?>> activeTransfers = new ArrayList<>(transfers.values());
+ List<Transfer<?>> activeTransfers = new ArrayList<>(resourceIdToTransfer.values());
// FAILED_PRECONDITION indicates that the stream packet was not recognized as the stream is
// not open. This could occur if the server resets. Notify pending transfers that this has
@@ -278,7 +325,7 @@ class TransferEventHandler {
} else {
TransferError error = new TransferError(
"Transfer stream RPC closed unexpectedly with status " + status, Status.INTERNAL);
- activeTransfers.forEach(t -> t.cleanUp(error));
+ activeTransfers.forEach(t -> t.terminate(error));
}
});
}
@@ -286,42 +333,6 @@ class TransferEventHandler {
abstract void resetStream();
}
- private static String chunkToString(Chunk chunk) {
- StringBuilder str = new StringBuilder();
- str.append("transferId:").append(chunk.getTransferId()).append(" ");
- str.append("sessionId:").append(chunk.getSessionId()).append(" ");
- str.append("resourceId:").append(chunk.getResourceId()).append(" ");
- str.append("windowEndOffset:").append(chunk.getWindowEndOffset()).append(" ");
- str.append("offset:").append(chunk.getOffset()).append(" ");
- // Don't include the actual data; it's too much.
- str.append("len(data):").append(chunk.getData().size()).append(" ");
- if (chunk.hasPendingBytes()) {
- str.append("pendingBytes:").append(chunk.getPendingBytes()).append(" ");
- }
- if (chunk.hasMaxChunkSizeBytes()) {
- str.append("maxChunkSizeBytes:").append(chunk.getMaxChunkSizeBytes()).append(" ");
- }
- if (chunk.hasMinDelayMicroseconds()) {
- str.append("minDelayMicroseconds:").append(chunk.getMinDelayMicroseconds()).append(" ");
- }
- if (chunk.hasRemainingBytes()) {
- str.append("remainingBytes:").append(chunk.getRemainingBytes()).append(" ");
- }
- if (chunk.hasStatus()) {
- str.append("status:").append(chunk.getStatus()).append(" ");
- }
- if (chunk.hasType()) {
- str.append("type:").append(chunk.getTypeValue()).append(" ");
- }
- if (chunk.hasResourceId()) {
- str.append("resourceId:").append(chunk.getSessionId()).append(" ");
- }
- if (chunk.hasSessionId()) {
- str.append("sessionId:").append(chunk.getSessionId()).append(" ");
- }
- return str.toString();
- }
-
// Represents an event that occurs during a transfer
private interface Event {
void handle();
diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/VersionedChunk.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/VersionedChunk.java
index c4c520ac8..2a253382c 100644
--- a/pw_transfer/java/main/dev/pigweed/pw_transfer/VersionedChunk.java
+++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/VersionedChunk.java
@@ -119,13 +119,18 @@ abstract class VersionedChunk {
builder.setType(Chunk.Type.PARAMETERS_RETRANSMIT);
}
- if (version == ProtocolVersion.LEGACY && chunk.hasStatus()) {
- builder.setType(Chunk.Type.COMPLETION);
+ // For legacy chunks, use the transfer ID as both the resource and session IDs.
+ if (version == ProtocolVersion.LEGACY) {
+ builder.setSessionId(chunk.getTransferId());
+ builder.setResourceId(chunk.getTransferId());
+ if (chunk.hasStatus()) {
+ builder.setType(Chunk.Type.COMPLETION);
+ }
+ } else {
+ builder.setSessionId(chunk.getSessionId());
}
- builder.setSessionId(chunk.hasSessionId() ? chunk.getSessionId() : chunk.getTransferId())
- .setOffset((int) chunk.getOffset())
- .setData(chunk.getData());
+ builder.setOffset((int) chunk.getOffset()).setData(chunk.getData());
if (chunk.hasResourceId()) {
builder.setResourceId(chunk.getResourceId());
@@ -162,7 +167,6 @@ abstract class VersionedChunk {
public Chunk toMessage() {
Chunk.Builder chunk = Chunk.newBuilder()
.setType(type())
- .setSessionId(sessionId())
.setOffset(offset())
.setWindowEndOffset(windowEndOffset())
.setData(data());
@@ -173,19 +177,19 @@ abstract class VersionedChunk {
minDelayMicroseconds().ifPresent(chunk::setMinDelayMicroseconds);
status().ifPresent(chunk::setStatus);
+ // session_id did not exist in the legacy protocol, so don't send it.
+ if (version() != ProtocolVersion.LEGACY && sessionId() != UNASSIGNED_SESSION_ID) {
+ chunk.setSessionId(sessionId());
+ }
+
if (shouldEncodeLegacyFields()) {
- chunk.setTransferId(resourceId().orElse(chunk.getSessionId()));
+ chunk.setTransferId(resourceId().orElse(sessionId()));
if (chunk.getWindowEndOffset() != 0) {
chunk.setPendingBytes(chunk.getWindowEndOffset() - offset());
}
}
- if (version() == ProtocolVersion.LEGACY) {
- // session_id did not exist in the legacy protocol, so don't send it.
- chunk.clearSessionId();
- }
-
if (isInitialHandshakeChunk()) {
chunk.setProtocolVersion(version().ordinal());
}
diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/WriteTransfer.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/WriteTransfer.java
index d1386c0b2..fc16bb6d9 100644
--- a/pw_transfer/java/main/dev/pigweed/pw_transfer/WriteTransfer.java
+++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/WriteTransfer.java
@@ -38,6 +38,7 @@ class WriteTransfer extends Transfer<Void> {
private final byte[] data;
protected WriteTransfer(int resourceId,
+ ProtocolVersion desiredProtocolVersion,
TransferInterface transferManager,
int timeoutMillis,
int initialTimeoutMillis,
@@ -46,6 +47,7 @@ class WriteTransfer extends Transfer<Void> {
Consumer<TransferProgress> progressCallback,
BooleanSupplier shouldAbortCallback) {
super(resourceId,
+ desiredProtocolVersion,
transferManager,
timeoutMillis,
initialTimeoutMillis,
@@ -56,10 +58,8 @@ class WriteTransfer extends Transfer<Void> {
}
@Override
- VersionedChunk getInitialChunk(ProtocolVersion desiredProcotolVersion) {
- return VersionedChunk.createInitialChunk(desiredProcotolVersion, getResourceId())
- .setRemainingBytes(data.length)
- .build();
+ void prepareInitialChunk(VersionedChunk.Builder chunk) {
+ chunk.setRemainingBytes(data.length);
}
@Override
@@ -67,22 +67,15 @@ class WriteTransfer extends Transfer<Void> {
return new WaitingForTransferParameters();
}
- private class WaitingForTransferParameters extends State {
+ private class WaitingForTransferParameters extends ActiveState {
@Override
- void handleTimeout() {
- Recovery recoveryState = new Recovery();
- setState(recoveryState);
- recoveryState.handleTimeout();
- }
-
- @Override
- void handleDataChunk(VersionedChunk chunk) {
+ public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException {
updateTransferParameters(chunk);
}
}
/** Transmitting a transfer window. */
- private class Transmitting extends State {
+ private class Transmitting extends ActiveState {
private final int windowStartOffset;
private final int windowEndOffset;
@@ -92,26 +85,23 @@ class WriteTransfer extends Transfer<Void> {
}
@Override
- void handleDataChunk(VersionedChunk chunk) {
+ public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException {
updateTransferParameters(chunk);
}
@Override
- void handleTimeout() {
+ public void handleTimeout() throws TransferAbortedException {
ByteString chunkData = ByteString.copyFrom(
data, sentOffset, min(windowEndOffset - sentOffset, maxChunkSizeBytes));
- logger.atFiner().log("Transfer %d: sending bytes %d-%d (%d B chunk, max size %d B)",
- getId(),
+ logger.atFiner().log("%s sending bytes %d-%d (%d B chunk, max size %d B)",
+ WriteTransfer.this,
sentOffset,
sentOffset + chunkData.size() - 1,
chunkData.size(),
maxChunkSizeBytes);
- if (!sendChunk(buildDataChunk(chunkData))) {
- setState(new Completed());
- return;
- }
+ sendChunk(buildDataChunk(chunkData));
sentOffset += chunkData.size();
updateProgress(sentOffset, windowStartOffset, data.length);
@@ -121,7 +111,7 @@ class WriteTransfer extends Transfer<Void> {
return; // Keep transmitting packets
}
setNextChunkTimeout();
- setState(new WaitingForTransferParameters());
+ changeState(new WaitingForTransferParameters());
}
}
@@ -139,15 +129,11 @@ class WriteTransfer extends Transfer<Void> {
getFuture().set(null);
}
- private void updateTransferParameters(VersionedChunk chunk) {
- logger.atFiner().log("Transfer %d received new chunk (type=%s, offset=%d, windowEndOffset=%d)",
- getId(),
- chunk.type(),
- chunk.offset(),
- chunk.windowEndOffset());
+ private void updateTransferParameters(VersionedChunk chunk) throws TransferAbortedException {
+ logger.atFiner().log("%s received new chunk %s", this, chunk);
if (chunk.offset() > data.length) {
- setStateCompletedAndSendFinalChunk(Status.OUT_OF_RANGE);
+ setStateTerminatingAndSendFinalChunk(Status.OUT_OF_RANGE);
return;
}
@@ -156,15 +142,15 @@ class WriteTransfer extends Transfer<Void> {
long droppedBytes = sentOffset - chunk.offset();
if (droppedBytes > 0) {
totalDroppedBytes += droppedBytes;
- logger.atFine().log("Transfer %d retransmitting %d B (%d retransmitted of %d sent)",
- getId(),
+ logger.atFine().log("%s retransmitting %d B (%d retransmitted of %d sent)",
+ this,
droppedBytes,
totalDroppedBytes,
sentOffset);
}
sentOffset = chunk.offset();
} else if (windowEndOffset <= sentOffset) {
- logger.atFiner().log("Transfer %d: ignoring old rolling window packet", getId());
+ logger.atFiner().log("%s ignoring old rolling window packet", this);
setNextChunkTimeout();
return; // Received an old rolling window packet, ignore it.
}
@@ -179,17 +165,16 @@ class WriteTransfer extends Transfer<Void> {
if (maxChunkSizeBytes == 0) {
if (windowEndOffset == sentOffset) {
- logger.atWarning().log("Server requested 0 bytes in write transfer %d; aborting", getId());
- setStateCompletedAndSendFinalChunk(Status.INVALID_ARGUMENT);
+ logger.atWarning().log("%s server requested 0 bytes; aborting", this);
+ setStateTerminatingAndSendFinalChunk(Status.INVALID_ARGUMENT);
return;
}
// Default to sending the entire window if the max chunk size is not specified (or is 0).
maxChunkSizeBytes = windowEndOffset - sentOffset;
}
- Transmitting transmittingState = new Transmitting(chunk.offset(), windowEndOffset);
- setState(transmittingState);
- transmittingState.handleTimeout(); // Immediately send the first packet
+ // Enter the transmitting state and immediately send the first packet
+ changeState(new Transmitting(chunk.offset(), windowEndOffset)).handleTimeout();
}
private VersionedChunk buildDataChunk(ByteString chunkData) {
@@ -198,7 +183,7 @@ class WriteTransfer extends Transfer<Void> {
// If this is the last data chunk, setRemainingBytes to 0.
if (sentOffset + chunkData.size() == data.length) {
- logger.atFiner().log("Transfer %d sending final chunk with %d B", getId(), chunkData.size());
+ logger.atFiner().log("%s sending final chunk with %d B", this, chunkData.size());
chunk.setRemainingBytes(0);
}
return chunk.build();
diff --git a/pw_transfer/java/test/dev/pigweed/pw_transfer/BUILD.bazel b/pw_transfer/java/test/dev/pigweed/pw_transfer/BUILD.bazel
index a4b2955a0..1a0f8110a 100644
--- a/pw_transfer/java/test/dev/pigweed/pw_transfer/BUILD.bazel
+++ b/pw_transfer/java/test/dev/pigweed/pw_transfer/BUILD.bazel
@@ -20,7 +20,6 @@ java_test(
name = "TransferClientTest",
size = "small",
srcs = ["TransferClientTest.java"],
- tags = ["manual"],
test_class = "dev.pigweed.pw_transfer.TransferClientTest",
visibility = ["//visibility:public"],
deps = [
diff --git a/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java b/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java
index 78c2bb566..6c19b2939 100644
--- a/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java
+++ b/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java
@@ -51,7 +51,6 @@ public final class TransferClientTest {
private static final TransferParameters TRANSFER_PARAMETERS =
TransferParameters.create(50, 30, 0);
private static final int MAX_RETRIES = 2;
- private static final int ID = 123;
private boolean shouldAbortFlag = false;
private TestClient rpcClient;
@@ -63,9 +62,6 @@ public final class TransferClientTest {
@Before
public void setup() {
rpcClient = new TestClient(ImmutableList.of(TransferService.get()));
-
- // Default to a long timeout that should never trigger.
- transferClient = createTransferClient(60000, 60000);
}
@After
@@ -78,43 +74,50 @@ public final class TransferClientTest {
}
@Test
- public void read_singleChunk_successful() throws Exception {
+ public void legacy_read_singleChunk_successful() throws Exception {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
ListenableFuture<byte[]> future = transferClient.read(1);
assertThat(future.isDone()).isFalse();
- receiveReadChunks(
- newChunk(Chunk.Type.DATA, 1).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0));
+ receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 1)
+ .setOffset(0)
+ .setData(TEST_DATA_SHORT)
+ .setRemainingBytes(0));
assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray());
}
@Test
- public void read_failedPreconditionError_retriesInitialPacket() throws Exception {
+ public void legacy_read_failedPreconditionError_retriesInitialPacket() throws Exception {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
ListenableFuture<byte[]> future = transferClient.read(1, TRANSFER_PARAMETERS);
- assertThat(lastChunks()).containsExactly(initialReadChunk(1));
+ assertThat(lastChunks()).containsExactly(initialReadChunk(1, ProtocolVersion.LEGACY));
receiveReadServerError(Status.FAILED_PRECONDITION);
- assertThat(lastChunks()).containsExactly(initialReadChunk(1));
+ assertThat(lastChunks()).containsExactly(initialReadChunk(1, ProtocolVersion.LEGACY));
- receiveReadChunks(
- newChunk(Chunk.Type.DATA, 1).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0));
+ receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 1)
+ .setOffset(0)
+ .setData(TEST_DATA_SHORT)
+ .setRemainingBytes(0));
assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray());
}
@Test
- public void read_failedPreconditionError_abortsAfterInitialPacket() {
+ public void legacy_read_failedPreconditionError_abortsAfterInitialPacket() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
TransferParameters params = TransferParameters.create(50, 50, 0);
ListenableFuture<byte[]> future = transferClient.read(1, params);
- assertThat(lastChunks()).containsExactly(initialReadChunk(1, params));
+ assertThat(lastChunks()).containsExactly(initialReadChunk(1, ProtocolVersion.LEGACY, params));
- receiveReadChunks(dataChunk(1, TEST_DATA_100B, 0, 50));
+ receiveReadChunks(legacyDataChunk(1, TEST_DATA_100B, 0, 50));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 1)
+ .containsExactly(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 1)
.setOffset(50)
.setPendingBytes(50)
.setWindowEndOffset(100)
@@ -129,14 +132,15 @@ public final class TransferClientTest {
}
@Test
- public void read_failedPreconditionErrorMaxRetriesTimes_aborts() {
+ public void legacy_read_failedPreconditionErrorMaxRetriesTimes_aborts() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
ListenableFuture<byte[]> future = transferClient.read(1, TRANSFER_PARAMETERS);
for (int i = 0; i < MAX_RETRIES; ++i) {
receiveReadServerError(Status.FAILED_PRECONDITION);
}
- Chunk initialChunk = initialReadChunk(1);
+ Chunk initialChunk = initialReadChunk(1, ProtocolVersion.LEGACY);
assertThat(lastChunks())
.containsExactlyElementsIn(Collections.nCopies(1 + MAX_RETRIES, initialChunk));
@@ -150,94 +154,126 @@ public final class TransferClientTest {
}
@Test
- public void read_singleChunk_ignoresUnknownIdOrWriteChunks() throws Exception {
+ public void legacy_read_singleChunk_ignoresUnknownIdOrWriteChunks() throws Exception {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
ListenableFuture<byte[]> future = transferClient.read(1);
assertThat(future.isDone()).isFalse();
- receiveReadChunks(finalChunk(2, Status.OK),
- newChunk(Chunk.Type.DATA, 0).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0),
- newChunk(Chunk.Type.DATA, 3).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0));
- receiveWriteChunks(finalChunk(1, Status.OK),
- newChunk(Chunk.Type.DATA, 1).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0),
- newChunk(Chunk.Type.DATA, 2).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0));
+ receiveReadChunks(legacyFinalChunk(2, Status.OK),
+ newLegacyChunk(Chunk.Type.DATA, 0)
+ .setOffset(0)
+ .setData(TEST_DATA_100B)
+ .setRemainingBytes(0),
+ newLegacyChunk(Chunk.Type.DATA, 3)
+ .setOffset(0)
+ .setData(TEST_DATA_100B)
+ .setRemainingBytes(0));
+ receiveWriteChunks(legacyFinalChunk(1, Status.OK),
+ newLegacyChunk(Chunk.Type.DATA, 1)
+ .setOffset(0)
+ .setData(TEST_DATA_100B)
+ .setRemainingBytes(0),
+ newLegacyChunk(Chunk.Type.DATA, 2)
+ .setOffset(0)
+ .setData(TEST_DATA_100B)
+ .setRemainingBytes(0));
assertThat(future.isDone()).isFalse();
- receiveReadChunks(
- newChunk(Chunk.Type.DATA, 1).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0));
+ receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 1)
+ .setOffset(0)
+ .setData(TEST_DATA_SHORT)
+ .setRemainingBytes(0));
assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray());
}
@Test
- public void read_empty() throws Exception {
+ public void legacy_read_empty() throws Exception {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
ListenableFuture<byte[]> future = transferClient.read(2);
lastChunks(); // Discard initial chunk (tested elsewhere)
- receiveReadChunks(newChunk(Chunk.Type.DATA, 2).setRemainingBytes(0));
+ receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 2).setRemainingBytes(0));
- assertThat(lastChunks()).containsExactly(finalChunk(2, Status.OK));
+ assertThat(lastChunks()).containsExactly(legacyFinalChunk(2, Status.OK));
assertThat(future.get()).isEqualTo(new byte[] {});
}
@Test
- public void read_sendsTransferParametersFirst() {
+ public void legacy_read_sendsTransferParametersFirst() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
TransferParameters params = TransferParameters.create(3, 2, 1);
ListenableFuture<byte[]> future = transferClient.read(99, params);
- assertThat(lastChunks()).containsExactly(initialReadChunk(99, params));
+ assertThat(lastChunks()).containsExactly(initialReadChunk(99, ProtocolVersion.LEGACY, params));
assertThat(future.cancel(true)).isTrue();
}
@Test
- public void read_severalChunks() throws Exception {
- ListenableFuture<byte[]> future = transferClient.read(ID, TRANSFER_PARAMETERS);
+ public void legacy_read_severalChunks() throws Exception {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
+ ListenableFuture<byte[]> future = transferClient.read(123, TRANSFER_PARAMETERS);
- assertThat(lastChunks()).containsExactly(initialReadChunk(ID));
+ assertThat(lastChunks()).containsExactly(initialReadChunk(123, ProtocolVersion.LEGACY));
- receiveReadChunks(
- newChunk(Chunk.Type.DATA, ID).setOffset(0).setData(range(0, 20)).setRemainingBytes(70),
- newChunk(Chunk.Type.DATA, ID).setOffset(20).setData(range(20, 40)));
+ receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123)
+ .setOffset(0)
+ .setData(range(0, 20))
+ .setRemainingBytes(70),
+ newLegacyChunk(Chunk.Type.DATA, 123).setOffset(20).setData(range(20, 40)));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID)
+ .containsExactly(newLegacyChunk(Chunk.Type.PARAMETERS_CONTINUE, 123)
.setOffset(40)
.setPendingBytes(50)
.setMaxChunkSizeBytes(30)
.setWindowEndOffset(90)
.build());
- receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(40).setData(range(40, 70)));
+ receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123).setOffset(40).setData(range(40, 70)));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID)
+ .containsExactly(newLegacyChunk(Chunk.Type.PARAMETERS_CONTINUE, 123)
.setOffset(70)
.setPendingBytes(50)
.setMaxChunkSizeBytes(30)
.setWindowEndOffset(120)
.build());
- receiveReadChunks(
- newChunk(Chunk.Type.DATA, ID).setOffset(70).setData(range(70, 100)).setRemainingBytes(0));
+ receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123)
+ .setOffset(70)
+ .setData(range(70, 100))
+ .setRemainingBytes(0));
- assertThat(lastChunks()).containsExactly(finalChunk(ID, Status.OK));
+ assertThat(lastChunks()).containsExactly(legacyFinalChunk(123, Status.OK));
assertThat(future.get()).isEqualTo(TEST_DATA_100B.toByteArray());
}
@Test
- public void read_progressCallbackIsCalled() {
+ public void legacy_read_progressCallbackIsCalled() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
ListenableFuture<byte[]> future =
- transferClient.read(ID, TRANSFER_PARAMETERS, progressCallback);
-
- receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(0).setData(range(0, 30)),
- newChunk(Chunk.Type.DATA, ID).setOffset(30).setData(range(30, 50)),
- newChunk(Chunk.Type.DATA, ID).setOffset(50).setData(range(50, 60)).setRemainingBytes(5),
- newChunk(Chunk.Type.DATA, ID).setOffset(60).setData(range(60, 70)),
- newChunk(Chunk.Type.DATA, ID).setOffset(70).setData(range(70, 80)).setRemainingBytes(20),
- newChunk(Chunk.Type.DATA, ID).setOffset(90).setData(range(90, 100)),
- newChunk(Chunk.Type.DATA, ID).setOffset(80).setData(range(80, 100)).setRemainingBytes(0));
+ transferClient.read(123, TRANSFER_PARAMETERS, progressCallback);
+
+ receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)),
+ newLegacyChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50)),
+ newLegacyChunk(Chunk.Type.DATA, 123)
+ .setOffset(50)
+ .setData(range(50, 60))
+ .setRemainingBytes(5),
+ newLegacyChunk(Chunk.Type.DATA, 123).setOffset(60).setData(range(60, 70)),
+ newLegacyChunk(Chunk.Type.DATA, 123)
+ .setOffset(70)
+ .setData(range(70, 80))
+ .setRemainingBytes(20),
+ newLegacyChunk(Chunk.Type.DATA, 123).setOffset(90).setData(range(90, 100)),
+ newLegacyChunk(Chunk.Type.DATA, 123)
+ .setOffset(80)
+ .setData(range(80, 100))
+ .setRemainingBytes(0));
verify(progressCallback, times(6)).accept(progress.capture());
assertThat(progress.getAllValues())
@@ -251,72 +287,82 @@ public final class TransferClientTest {
}
@Test
- public void read_rewindWhenPacketsSkipped() throws Exception {
- ListenableFuture<byte[]> future = transferClient.read(ID, TRANSFER_PARAMETERS);
+ public void legacy_read_rewindWhenPacketsSkipped() throws Exception {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
+ ListenableFuture<byte[]> future = transferClient.read(123, TRANSFER_PARAMETERS);
- assertThat(lastChunks()).containsExactly(initialReadChunk(ID));
+ assertThat(lastChunks()).containsExactly(initialReadChunk(123, ProtocolVersion.LEGACY));
- receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(50).setData(range(30, 50)));
+ receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(30, 50)));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
+ .containsExactly(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
.setPendingBytes(50)
.setWindowEndOffset(50)
.setMaxChunkSizeBytes(30)
.setOffset(0)
.build());
- receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(0).setData(range(0, 30)),
- newChunk(Chunk.Type.DATA, ID).setOffset(30).setData(range(30, 50)));
+ receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)),
+ newLegacyChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50)));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID)
+ .containsExactly(newLegacyChunk(Chunk.Type.PARAMETERS_CONTINUE, 123)
.setOffset(30)
.setPendingBytes(50)
.setWindowEndOffset(80)
.setMaxChunkSizeBytes(30)
.build());
- receiveReadChunks(
- newChunk(Chunk.Type.DATA, ID).setOffset(80).setData(range(80, 100)).setRemainingBytes(0));
+ receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123)
+ .setOffset(80)
+ .setData(range(80, 100))
+ .setRemainingBytes(0));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
+ .containsExactly(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
.setOffset(50)
.setPendingBytes(50)
.setWindowEndOffset(100)
.setMaxChunkSizeBytes(30)
.build());
- receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(50).setData(range(50, 80)),
- newChunk(Chunk.Type.DATA, ID).setOffset(80).setData(range(80, 100)).setRemainingBytes(0));
+ receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(50, 80)),
+ newLegacyChunk(Chunk.Type.DATA, 123)
+ .setOffset(80)
+ .setData(range(80, 100))
+ .setRemainingBytes(0));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID)
+ .containsExactly(newLegacyChunk(Chunk.Type.PARAMETERS_CONTINUE, 123)
.setOffset(80)
.setPendingBytes(50)
.setWindowEndOffset(130)
.setMaxChunkSizeBytes(30)
.build(),
- finalChunk(ID, Status.OK));
+ legacyFinalChunk(123, Status.OK));
assertThat(future.get()).isEqualTo(TEST_DATA_100B.toByteArray());
}
@Test
- public void read_multipleWithSameId_sequentially_successful() throws Exception {
+ public void legacy_read_multipleWithSameId_sequentially_successful() throws Exception {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
for (int i = 0; i < 3; ++i) {
ListenableFuture<byte[]> future = transferClient.read(1);
- receiveReadChunks(
- newChunk(Chunk.Type.DATA, 1).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0));
+ receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 1)
+ .setOffset(0)
+ .setData(TEST_DATA_SHORT)
+ .setRemainingBytes(0));
assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray());
}
}
@Test
- public void read_multipleWithSameId_atSameTime_failsWithAlreadyExistsError() {
+ public void legacy_read_multipleWithSameId_atSameTime_failsWithAlreadyExistsError() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
ListenableFuture<byte[]> first = transferClient.read(123);
ListenableFuture<byte[]> second = transferClient.read(123);
@@ -328,7 +374,8 @@ public final class TransferClientTest {
}
@Test
- public void read_sendErrorOnFirstPacket_fails() {
+ public void legacy_read_sendErrorOnFirstPacket_fails() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
ChannelOutputException exception = new ChannelOutputException("blah");
rpcClient.setChannelOutputException(exception);
ListenableFuture<byte[]> future = transferClient.read(123);
@@ -339,15 +386,16 @@ public final class TransferClientTest {
}
@Test
- public void read_sendErrorOnLaterPacket_aborts() {
- ListenableFuture<byte[]> future = transferClient.read(ID, TRANSFER_PARAMETERS);
+ public void legacy_read_sendErrorOnLaterPacket_aborts() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
+ ListenableFuture<byte[]> future = transferClient.read(123, TRANSFER_PARAMETERS);
- receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(0).setData(range(0, 20)));
+ receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 20)));
ChannelOutputException exception = new ChannelOutputException("blah");
rpcClient.setChannelOutputException(exception);
- receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(20).setData(range(20, 50)));
+ receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123).setOffset(20).setData(range(20, 50)));
ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
@@ -355,20 +403,22 @@ public final class TransferClientTest {
}
@Test
- public void read_cancelFuture_abortsTransfer() {
- ListenableFuture<byte[]> future = transferClient.read(ID, TRANSFER_PARAMETERS);
+ public void legacy_read_cancelFuture_abortsTransfer() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
+ ListenableFuture<byte[]> future = transferClient.read(123, TRANSFER_PARAMETERS);
assertThat(future.cancel(true)).isTrue();
- receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(30).setData(range(30, 50)));
- assertThat(lastChunks()).contains(finalChunk(ID, Status.CANCELLED));
+ receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50)));
+ assertThat(lastChunks()).contains(legacyFinalChunk(123, Status.CANCELLED));
}
@Test
- public void read_protocolError_aborts() {
- ListenableFuture<byte[]> future = transferClient.read(ID);
+ public void legacy_read_transferProtocolError_aborts() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
+ ListenableFuture<byte[]> future = transferClient.read(123);
- receiveReadChunks(finalChunk(ID, Status.ALREADY_EXISTS));
+ receiveReadChunks(legacyFinalChunk(123, Status.ALREADY_EXISTS));
ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
@@ -377,7 +427,8 @@ public final class TransferClientTest {
}
@Test
- public void read_rpcError_aborts() {
+ public void legacy_read_rpcError_aborts() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
ListenableFuture<byte[]> future = transferClient.read(2);
receiveReadServerError(Status.NOT_FOUND);
@@ -387,21 +438,9 @@ public final class TransferClientTest {
}
@Test
- public void read_unknownVersion_invalidArgument() {
- ListenableFuture<byte[]> future = transferClient.read(2, TRANSFER_PARAMETERS);
-
- receiveReadChunks(newChunk(Chunk.Type.DATA, 2).setProtocolVersion(3));
-
- ExecutionException exception = assertThrows(ExecutionException.class, future::get);
- assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.INVALID_ARGUMENT);
-
- assertThat(lastChunks())
- .containsExactly(initialReadChunk(2), finalChunk(2, Status.INVALID_ARGUMENT));
- }
- @Test
- public void read_timeout() {
- transferClient = createTransferClient(1, 1); // Create a manager with a very short timeout.
- ListenableFuture<byte[]> future = transferClient.read(ID, TRANSFER_PARAMETERS);
+ public void legacy_read_timeout() {
+ createTransferClientThatMayTimeOut(ProtocolVersion.LEGACY);
+ ListenableFuture<byte[]> future = transferClient.read(123, TRANSFER_PARAMETERS);
// Call future.get() without sending any server-side packets.
ExecutionException exception = assertThrows(ExecutionException.class, future::get);
@@ -409,37 +448,38 @@ public final class TransferClientTest {
// read should have retried sending the transfer parameters 2 times, for a total of 3
assertThat(lastChunks())
- .containsExactly(initialReadChunk(ID),
- initialReadChunk(ID),
- initialReadChunk(ID),
- finalChunk(ID, Status.DEADLINE_EXCEEDED));
+ .containsExactly(initialReadChunk(123, ProtocolVersion.LEGACY),
+ initialReadChunk(123, ProtocolVersion.LEGACY),
+ initialReadChunk(123, ProtocolVersion.LEGACY));
}
@Test
- public void write_singleChunk() throws Exception {
+ public void legacy_write_singleChunk() throws Exception {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray());
assertThat(future.isDone()).isFalse();
- receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2)
+ receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2)
.setOffset(0)
.setPendingBytes(1024)
.setMaxChunkSizeBytes(128),
- finalChunk(2, Status.OK));
+ legacyFinalChunk(2, Status.OK));
assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown.
}
@Test
- public void write_platformTransferDisabled_aborted() {
+ public void legacy_write_platformTransferDisabled_aborted() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray());
assertThat(future.isDone()).isFalse();
shouldAbortFlag = true;
- receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2)
+ receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2)
.setOffset(0)
.setPendingBytes(1024)
.setMaxChunkSizeBytes(128),
- finalChunk(2, Status.OK));
+ legacyFinalChunk(2, Status.OK));
ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
@@ -447,36 +487,40 @@ public final class TransferClientTest {
}
@Test
- public void write_failedPreconditionError_retriesInitialPacket() throws Exception {
+ public void legacy_write_failedPreconditionError_retriesInitialPacket() throws Exception {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray());
- assertThat(lastChunks()).containsExactly(initialWriteChunk(2, TEST_DATA_SHORT.size()));
+ assertThat(lastChunks())
+ .containsExactly(initialWriteChunk(2, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size()));
receiveWriteServerError(Status.FAILED_PRECONDITION);
- assertThat(lastChunks()).containsExactly(initialWriteChunk(2, TEST_DATA_SHORT.size()));
+ assertThat(lastChunks())
+ .containsExactly(initialWriteChunk(2, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size()));
- receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2)
+ receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2)
.setOffset(0)
.setPendingBytes(1024)
.setMaxChunkSizeBytes(128),
- finalChunk(2, Status.OK));
+ legacyFinalChunk(2, Status.OK));
assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown.
}
@Test
- public void write_failedPreconditionError_abortsAfterInitialPacket() {
+ public void legacy_write_failedPreconditionError_abortsAfterInitialPacket() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_100B.toByteArray());
- receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2)
+ receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2)
.setOffset(0)
.setPendingBytes(50)
.setMaxChunkSizeBytes(50));
assertThat(lastChunks())
- .containsExactly(
- initialWriteChunk(2, TEST_DATA_100B.size()), dataChunk(2, TEST_DATA_100B, 0, 50));
+ .containsExactly(initialWriteChunk(2, ProtocolVersion.LEGACY, TEST_DATA_100B.size()),
+ legacyDataChunk(2, TEST_DATA_100B, 0, 50));
receiveWriteServerError(Status.FAILED_PRECONDITION);
@@ -486,14 +530,15 @@ public final class TransferClientTest {
}
@Test
- public void write_failedPreconditionErrorMaxRetriesTimes_aborts() {
+ public void legacy_write_failedPreconditionErrorMaxRetriesTimes_aborts() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray());
for (int i = 0; i < MAX_RETRIES; ++i) {
receiveWriteServerError(Status.FAILED_PRECONDITION);
}
- Chunk initialChunk = initialWriteChunk(2, TEST_DATA_SHORT.size());
+ Chunk initialChunk = initialWriteChunk(2, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size());
assertThat(lastChunks())
.containsExactlyElementsIn(Collections.nCopies(1 + MAX_RETRIES, initialChunk));
@@ -507,47 +552,52 @@ public final class TransferClientTest {
}
@Test
- public void write_empty() throws Exception {
+ public void legacy_write_empty() throws Exception {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
ListenableFuture<Void> future = transferClient.write(2, new byte[] {});
assertThat(future.isDone()).isFalse();
- receiveWriteChunks(finalChunk(2, Status.OK));
+ receiveWriteChunks(legacyFinalChunk(2, Status.OK));
- assertThat(lastChunks()).containsExactly(initialWriteChunk(2, 0));
+ assertThat(lastChunks()).containsExactly(initialWriteChunk(2, ProtocolVersion.LEGACY, 0));
assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown.
}
@Test
- public void write_severalChunks() throws Exception {
- ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_100B.toByteArray());
+ public void legacy_write_severalChunks() throws Exception {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
+ ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray());
- assertThat(lastChunks()).containsExactly(initialWriteChunk(ID, TEST_DATA_100B.size()));
+ assertThat(lastChunks())
+ .containsExactly(initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_100B.size()));
- receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
+ receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
.setOffset(0)
.setPendingBytes(50)
.setMaxChunkSizeBytes(30)
.setMinDelayMicroseconds(1));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.DATA, ID).setOffset(0).setData(range(0, 30)).build(),
- newChunk(Chunk.Type.DATA, ID).setOffset(30).setData(range(30, 50)).build());
+ .containsExactly(
+ newLegacyChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)).build(),
+ newLegacyChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50)).build());
- receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
+ receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
.setOffset(50)
.setPendingBytes(40)
.setMaxChunkSizeBytes(25));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.DATA, ID).setOffset(50).setData(range(50, 75)).build(),
- newChunk(Chunk.Type.DATA, ID).setOffset(75).setData(range(75, 90)).build());
+ .containsExactly(
+ newLegacyChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(50, 75)).build(),
+ newLegacyChunk(Chunk.Type.DATA, 123).setOffset(75).setData(range(75, 90)).build());
receiveWriteChunks(
- newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID).setOffset(90).setPendingBytes(50));
+ newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(90).setPendingBytes(50));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.DATA, ID)
+ .containsExactly(newLegacyChunk(Chunk.Type.DATA, 123)
.setOffset(90)
.setData(range(90, 100))
.setRemainingBytes(0)
@@ -555,18 +605,20 @@ public final class TransferClientTest {
assertThat(future.isDone()).isFalse();
- receiveWriteChunks(finalChunk(ID, Status.OK));
+ receiveWriteChunks(legacyFinalChunk(123, Status.OK));
assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown.
}
@Test
- public void write_parametersContinue() throws Exception {
- ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_100B.toByteArray());
+ public void legacy_write_parametersContinue() throws Exception {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
+ ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray());
- assertThat(lastChunks()).containsExactly(initialWriteChunk(ID, TEST_DATA_100B.size()));
+ assertThat(lastChunks())
+ .containsExactly(initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_100B.size()));
- receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
+ receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
.setOffset(0)
.setPendingBytes(50)
.setWindowEndOffset(50)
@@ -574,10 +626,11 @@ public final class TransferClientTest {
.setMinDelayMicroseconds(1));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.DATA, ID).setOffset(0).setData(range(0, 30)).build(),
- newChunk(Chunk.Type.DATA, ID).setOffset(30).setData(range(30, 50)).build());
+ .containsExactly(
+ newLegacyChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)).build(),
+ newLegacyChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50)).build());
- receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID)
+ receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_CONTINUE, 123)
.setOffset(30)
.setPendingBytes(50)
.setWindowEndOffset(80));
@@ -585,15 +638,15 @@ public final class TransferClientTest {
// Transfer doesn't roll back to offset 30 but instead continues sending up to 80.
assertThat(lastChunks())
.containsExactly(
- newChunk(Chunk.Type.DATA, ID).setOffset(50).setData(range(50, 80)).build());
+ newLegacyChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(50, 80)).build());
- receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID)
+ receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_CONTINUE, 123)
.setOffset(80)
.setPendingBytes(50)
.setWindowEndOffset(130));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.DATA, ID)
+ .containsExactly(newLegacyChunk(Chunk.Type.DATA, 123)
.setOffset(80)
.setData(range(80, 100))
.setRemainingBytes(0)
@@ -601,18 +654,20 @@ public final class TransferClientTest {
assertThat(future.isDone()).isFalse();
- receiveWriteChunks(finalChunk(ID, Status.OK));
+ receiveWriteChunks(legacyFinalChunk(123, Status.OK));
assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown.
}
@Test
- public void write_continuePacketWithWindowEndBeforeOffsetIsIgnored() throws Exception {
- ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_100B.toByteArray());
+ public void legacy_write_continuePacketWithWindowEndBeforeOffsetIsIgnored() throws Exception {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
+ ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray());
- assertThat(lastChunks()).containsExactly(initialWriteChunk(ID, TEST_DATA_100B.size()));
+ assertThat(lastChunks())
+ .containsExactly(initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_100B.size()));
- receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
+ receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
.setOffset(0)
.setPendingBytes(90)
.setWindowEndOffset(90)
@@ -620,40 +675,42 @@ public final class TransferClientTest {
.setMinDelayMicroseconds(1));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.DATA, ID).setOffset(0).setData(range(0, 90)).build());
+ .containsExactly(
+ newLegacyChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 90)).build());
receiveWriteChunks(
// This stale packet with a window end before the offset should be ignored.
- newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID)
+ newLegacyChunk(Chunk.Type.PARAMETERS_CONTINUE, 123)
.setOffset(25)
.setPendingBytes(25)
.setWindowEndOffset(50),
// Start from an arbitrary offset before the current, but extend the window to the end.
- newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID).setOffset(80).setWindowEndOffset(100));
+ newLegacyChunk(Chunk.Type.PARAMETERS_CONTINUE, 123).setOffset(80).setWindowEndOffset(100));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.DATA, ID)
+ .containsExactly(newLegacyChunk(Chunk.Type.DATA, 123)
.setOffset(90)
.setData(range(90, 100))
.setRemainingBytes(0)
.build());
- receiveWriteChunks(finalChunk(ID, Status.OK));
+ receiveWriteChunks(legacyFinalChunk(123, Status.OK));
assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown.
}
@Test
- public void write_progressCallbackIsCalled() {
+ public void legacy_write_progressCallbackIsCalled() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
ListenableFuture<Void> future =
- transferClient.write(ID, TEST_DATA_100B.toByteArray(), progressCallback);
+ transferClient.write(123, TEST_DATA_100B.toByteArray(), progressCallback);
- receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
+ receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
.setOffset(0)
.setPendingBytes(90)
.setMaxChunkSizeBytes(30),
- newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID).setOffset(50).setPendingBytes(50),
- finalChunk(ID, Status.OK));
+ newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(50).setPendingBytes(50),
+ legacyFinalChunk(123, Status.OK));
verify(progressCallback, times(6)).accept(progress.capture());
assertThat(progress.getAllValues())
@@ -667,28 +724,1039 @@ public final class TransferClientTest {
}
@Test
- public void write_asksForFinalOffset_sendsFinalPacket() {
- ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_100B.toByteArray());
+ public void legacy_write_asksForFinalOffset_sendsFinalPacket() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
+ ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray());
- receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
+ receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
.setOffset(100)
.setPendingBytes(40)
.setMaxChunkSizeBytes(25));
assertThat(lastChunks())
- .containsExactly(initialWriteChunk(ID, TEST_DATA_100B.size()),
- newChunk(Chunk.Type.DATA, ID).setOffset(100).setRemainingBytes(0).build());
+ .containsExactly(initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_100B.size()),
+ newLegacyChunk(Chunk.Type.DATA, 123).setOffset(100).setRemainingBytes(0).build());
+ assertThat(future.isDone()).isFalse();
+ }
+
+ @Test
+ public void legacy_write_multipleWithSameId_sequentially_successful() throws Exception {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
+ for (int i = 0; i < 3; ++i) {
+ ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray());
+
+ receiveWriteChunks(
+ newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(0).setPendingBytes(50),
+ legacyFinalChunk(123, Status.OK));
+
+ future.get();
+ }
+ }
+
+ @Test
+ public void legacy_write_multipleWithSameId_atSameTime_failsWithAlreadyExistsError() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
+ ListenableFuture<Void> first = transferClient.write(123, TEST_DATA_SHORT.toByteArray());
+ ListenableFuture<Void> second = transferClient.write(123, TEST_DATA_SHORT.toByteArray());
+
+ assertThat(first.isDone()).isFalse();
+
+ ExecutionException thrown = assertThrows(ExecutionException.class, second::get);
+ assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
+ assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.ALREADY_EXISTS);
+ }
+
+ @Test
+ public void legacy_write_sendErrorOnFirstPacket_failsImmediately() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
+ ChannelOutputException exception = new ChannelOutputException("blah");
+ rpcClient.setChannelOutputException(exception);
+ ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray());
+
+ ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
+ assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
+ assertThat(thrown).hasCauseThat().hasCauseThat().isSameInstanceAs(exception);
+ }
+
+ @Test
+ public void legacy_write_serviceRequestsNoData_aborts() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
+ ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray());
+
+ receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(0));
+
+ ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
+ assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INVALID_ARGUMENT);
+ }
+
+ @Test
+ public void legacy_write_invalidOffset_aborts() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
+ ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray());
+
+ receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ .setOffset(101)
+ .setPendingBytes(40)
+ .setMaxChunkSizeBytes(25));
+
+ assertThat(lastChunks())
+ .containsExactly(initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_100B.size()),
+ legacyFinalChunk(123, Status.OUT_OF_RANGE));
+
+ ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
+ assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.OUT_OF_RANGE);
+ }
+
+ @Test
+ public void legacy_write_sendErrorOnLaterPacket_aborts() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
+ ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray());
+
+ ChannelOutputException exception = new ChannelOutputException("blah");
+ rpcClient.setChannelOutputException(exception);
+
+ receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ .setOffset(0)
+ .setPendingBytes(50)
+ .setMaxChunkSizeBytes(30));
+
+ ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
+ assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
+ assertThat(thrown).hasCauseThat().hasCauseThat().isSameInstanceAs(exception);
+ }
+
+ @Test
+ public void legacy_write_cancelFuture_abortsTransfer() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
+ ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray());
+
+ assertThat(future.cancel(true)).isTrue();
+
+ receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ .setOffset(0)
+ .setPendingBytes(50)
+ .setMaxChunkSizeBytes(50));
+ assertThat(lastChunks()).contains(legacyFinalChunk(123, Status.CANCELLED));
+ }
+
+ @Test
+ public void legacy_write_transferProtocolError_aborts() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
+ ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray());
+
+ receiveWriteChunks(legacyFinalChunk(123, Status.NOT_FOUND));
+
+ ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
+ assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
+
+ assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.NOT_FOUND);
+ }
+
+ @Test
+ public void legacy_write_rpcError_aborts() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
+ ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray());
+
+ receiveWriteServerError(Status.NOT_FOUND);
+
+ ExecutionException exception = assertThrows(ExecutionException.class, future::get);
+ assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.INTERNAL);
+ }
+
+ @Test
+ public void legacy_write_timeoutAfterInitialChunk() {
+ createTransferClientThatMayTimeOut(ProtocolVersion.LEGACY);
+ ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray());
+
+ // Call future.get() without sending any server-side packets.
+ ExecutionException exception = assertThrows(ExecutionException.class, future::get);
+ assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.DEADLINE_EXCEEDED);
+
+ // Client should have resent the last chunk (the initial chunk in this case) for each timeout.
+ assertThat(lastChunks())
+ .containsExactly(
+ initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size()), // initial
+ initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size()), // retry 1
+ initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size())); // retry 2
+ }
+
+ @Test
+ public void legacy_write_timeoutAfterSingleChunk() {
+ createTransferClientThatMayTimeOut(ProtocolVersion.LEGACY);
+
+ // Wait for two outgoing packets (Write RPC request and first chunk), then send the parameters.
+ enqueueWriteChunks(2,
+ newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ .setOffset(0)
+ .setPendingBytes(90)
+ .setMaxChunkSizeBytes(30));
+ ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray());
+
+ ExecutionException exception = assertThrows(ExecutionException.class, future::get);
+ assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.DEADLINE_EXCEEDED);
+
+ Chunk data = newLegacyChunk(Chunk.Type.DATA, 123)
+ .setOffset(0)
+ .setData(TEST_DATA_SHORT)
+ .setRemainingBytes(0)
+ .build();
+ assertThat(lastChunks())
+ .containsExactly(
+ initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size()), // initial
+ data, // data chunk
+ data, // retry 1
+ data); // retry 2
+ }
+
+ @Test
+ public void legacy_write_multipleTimeoutsAndRecoveries() throws Exception {
+ createTransferClientThatMayTimeOut(ProtocolVersion.LEGACY);
+
+ // Wait for two outgoing packets (Write RPC request and first chunk), then send the parameters.
+ enqueueWriteChunks(2,
+ newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ .setOffset(0)
+ .setWindowEndOffset(40)
+ .setMaxChunkSizeBytes(20));
+
+ // After the second retry, send more transfer parameters
+ enqueueWriteChunks(4,
+ newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ .setOffset(40)
+ .setWindowEndOffset(120)
+ .setMaxChunkSizeBytes(40));
+
+ // After the first retry, send more transfer parameters
+ enqueueWriteChunks(3,
+ newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ .setOffset(80)
+ .setWindowEndOffset(160)
+ .setMaxChunkSizeBytes(10));
+
+ // After the second retry, confirm completed
+ enqueueWriteChunks(
+ 4, newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setStatus(Status.OK.code()));
+
+ ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray());
+
+ assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown.
+
+ assertThat(lastChunks())
+ .containsExactly(
+ // initial
+ initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_100B.size()),
+ // after 2, receive parameters: 40 from 0 by 20
+ legacyDataChunk(123, TEST_DATA_100B, 0, 20), // data 0-20
+ legacyDataChunk(123, TEST_DATA_100B, 20, 40), // data 20-40
+ legacyDataChunk(123, TEST_DATA_100B, 20, 40), // retry 1
+ legacyDataChunk(123, TEST_DATA_100B, 20, 40), // retry 2
+ // after 4, receive parameters: 80 from 40 by 40
+ legacyDataChunk(123, TEST_DATA_100B, 40, 80), // data 40-80
+ legacyDataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100
+ legacyDataChunk(123, TEST_DATA_100B, 80, 100), // retry 1
+ // after 3, receive parameters: 80 from 80 by 10
+ legacyDataChunk(123, TEST_DATA_100B, 80, 90), // data 80-90
+ legacyDataChunk(123, TEST_DATA_100B, 90, 100), // data 90-100
+ legacyDataChunk(123, TEST_DATA_100B, 90, 100), // retry 1
+ legacyDataChunk(123, TEST_DATA_100B, 90, 100)); // retry 2
+ // after 4, receive final OK
+ }
+
+ @Test
+ public void read_singleChunk_successful() throws Exception {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<byte[]> future = transferClient.read(3, TRANSFER_PARAMETERS);
+ assertThat(future.isDone()).isFalse();
+
+ assertThat(lastChunks()).containsExactly(initialReadChunk(3, ProtocolVersion.VERSION_TWO));
+
+ receiveReadChunks(newChunk(Chunk.Type.START_ACK, 321)
+ .setResourceId(3)
+ .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()));
+
+ assertThat(lastChunks()).containsExactly(readStartAckConfirmation(321, TRANSFER_PARAMETERS));
+
+ receiveReadChunks(
+ newChunk(Chunk.Type.DATA, 321).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0));
+
+ assertThat(lastChunks())
+ .containsExactly(Chunk.newBuilder()
+ .setType(Chunk.Type.COMPLETION)
+ .setSessionId(321)
+ .setStatus(Status.OK.ordinal())
+ .build());
+
+ assertThat(future.isDone()).isFalse();
+
+ receiveReadChunks(newChunk(Chunk.Type.COMPLETION_ACK, 321));
+
+ assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray());
+ }
+
+ @Test
+ public void read_requestV2ReceiveLegacy() throws Exception {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<byte[]> future = transferClient.read(1, TRANSFER_PARAMETERS);
+ assertThat(future.isDone()).isFalse();
+
+ assertThat(lastChunks()).containsExactly(initialReadChunk(1, ProtocolVersion.VERSION_TWO));
+
+ receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 1)
+ .setOffset(0)
+ .setData(TEST_DATA_SHORT)
+ .setRemainingBytes(0));
+
+ // No handshake packets since the server responded as legacy.
+ assertThat(lastChunks()).containsExactly(legacyFinalChunk(1, Status.OK));
+
+ assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray());
+ }
+
+ @Test
+ public void read_failedPreconditionError_retriesInitialPacket() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<byte[]> future = transferClient.read(1, TRANSFER_PARAMETERS);
+
+ assertThat(lastChunks()).containsExactly(initialReadChunk(1, ProtocolVersion.VERSION_TWO));
+ for (int i = 0; i < MAX_RETRIES; ++i) {
+ receiveReadServerError(Status.FAILED_PRECONDITION);
+
+ assertThat(lastChunks()).containsExactly(initialReadChunk(1, ProtocolVersion.VERSION_TWO));
+ }
+
+ receiveReadChunks(newChunk(Chunk.Type.START_ACK, 54321)
+ .setResourceId(1)
+ .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()));
+
+ assertThat(lastChunks()).containsExactly(readStartAckConfirmation(54321, TRANSFER_PARAMETERS));
+ }
+
+ @Test
+ public void read_failedPreconditionError_abortsAfterInitial() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ TransferParameters params = TransferParameters.create(50, 50, 0);
+ ListenableFuture<byte[]> future = transferClient.read(1, params);
+
+ assertThat(lastChunks())
+ .containsExactly(initialReadChunk(1, ProtocolVersion.VERSION_TWO, params));
+
+ receiveReadChunks(newChunk(Chunk.Type.START_ACK, 555)
+ .setResourceId(1)
+ .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()));
+
+ receiveReadServerError(Status.FAILED_PRECONDITION);
+
+ ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
+ assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
+ assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INTERNAL);
+ }
+
+ @Test
+ public void read_failedPreconditionError_abortsAfterHandshake() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ TransferParameters params = TransferParameters.create(50, 50, 0);
+ ListenableFuture<byte[]> future = transferClient.read(1, params);
+
+ assertThat(lastChunks())
+ .containsExactly(initialReadChunk(1, ProtocolVersion.VERSION_TWO, params));
+
+ receiveReadChunks(newChunk(Chunk.Type.START_ACK, 555)
+ .setResourceId(1)
+ .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()));
+
+ assertThat(lastChunks()).containsExactly(readStartAckConfirmation(555, params));
+
+ receiveReadChunks(dataChunk(555, TEST_DATA_100B, 0, 50));
+
+ assertThat(lastChunks())
+ .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 555)
+ .setOffset(50)
+ .setWindowEndOffset(100)
+ .setMaxChunkSizeBytes(50)
+ .build());
+
+ receiveReadServerError(Status.FAILED_PRECONDITION);
+
+ ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
+ assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
+ assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INTERNAL);
+ }
+
+ @Test
+ public void read_failedPreconditionErrorMaxRetriesTimes_aborts() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<byte[]> future = transferClient.read(1, TRANSFER_PARAMETERS);
+
+ for (int i = 0; i < MAX_RETRIES; ++i) {
+ receiveReadServerError(Status.FAILED_PRECONDITION);
+ }
+
+ Chunk initialChunk = initialReadChunk(1, ProtocolVersion.VERSION_TWO);
+ assertThat(lastChunks())
+ .containsExactlyElementsIn(Collections.nCopies(1 + MAX_RETRIES, initialChunk));
+
+ receiveReadServerError(Status.FAILED_PRECONDITION);
+
+ assertThat(lastChunks()).isEmpty();
+
+ ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
+ assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
+ assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INTERNAL);
+ }
+
+ @Test
+ public void read_singleChunk_ignoresUnknownIdOrWriteChunks() throws Exception {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<byte[]> future = transferClient.read(1);
+ assertThat(future.isDone()).isFalse();
+
+ performReadStartHandshake(1, 99);
+
+ receiveReadChunks(finalChunk(2, Status.OK),
+ newChunk(Chunk.Type.DATA, 1).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0),
+ newChunk(Chunk.Type.DATA, 3).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0));
+ receiveWriteChunks(finalChunk(99, Status.INVALID_ARGUMENT),
+ newChunk(Chunk.Type.DATA, 99).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0),
+ newChunk(Chunk.Type.DATA, 2).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0));
+
assertThat(future.isDone()).isFalse();
+
+ receiveReadChunks(
+ newChunk(Chunk.Type.DATA, 99).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0));
+
+ performReadCompletionHandshake(99, Status.OK);
+
+ assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray());
+ }
+
+ @Test
+ public void read_empty() throws Exception {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<byte[]> future = transferClient.read(2);
+ performReadStartHandshake(2, 5678);
+ receiveReadChunks(newChunk(Chunk.Type.DATA, 5678).setRemainingBytes(0));
+
+ performReadCompletionHandshake(5678, Status.OK);
+
+ assertThat(future.get()).isEqualTo(new byte[] {});
+ }
+
+ @Test
+ public void read_sendsTransferParametersFirst() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ TransferParameters params = TransferParameters.create(3, 2, 1);
+ ListenableFuture<byte[]> future = transferClient.read(99, params);
+
+ assertThat(lastChunks())
+ .containsExactly(initialReadChunk(99, ProtocolVersion.VERSION_TWO, params));
+ assertThat(future.cancel(true)).isTrue();
+ }
+
+ @Test
+ public void read_severalChunks() throws Exception {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<byte[]> future = transferClient.read(7, TRANSFER_PARAMETERS);
+
+ performReadStartHandshake(7, 123, TRANSFER_PARAMETERS);
+
+ receiveReadChunks(
+ newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 20)).setRemainingBytes(70),
+ newChunk(Chunk.Type.DATA, 123).setOffset(20).setData(range(20, 40)));
+
+ assertThat(lastChunks())
+ .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123)
+ .setOffset(40)
+ .setMaxChunkSizeBytes(30)
+ .setWindowEndOffset(90)
+ .build());
+
+ receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(40).setData(range(40, 70)));
+
+ assertThat(lastChunks())
+ .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123)
+ .setOffset(70)
+ .setMaxChunkSizeBytes(30)
+ .setWindowEndOffset(120)
+ .build());
+
+ receiveReadChunks(
+ newChunk(Chunk.Type.DATA, 123).setOffset(70).setData(range(70, 100)).setRemainingBytes(0));
+
+ performReadCompletionHandshake(123, Status.OK);
+
+ assertThat(future.get()).isEqualTo(TEST_DATA_100B.toByteArray());
+ }
+
+ @Test
+ public void read_progressCallbackIsCalled() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<byte[]> future =
+ transferClient.read(123, TRANSFER_PARAMETERS, progressCallback);
+
+ performReadStartHandshake(123, 123, TRANSFER_PARAMETERS);
+
+ receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)),
+ newChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50)),
+ newChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(50, 60)).setRemainingBytes(5),
+ newChunk(Chunk.Type.DATA, 123).setOffset(60).setData(range(60, 70)),
+ newChunk(Chunk.Type.DATA, 123).setOffset(70).setData(range(70, 80)).setRemainingBytes(20),
+ newChunk(Chunk.Type.DATA, 123).setOffset(90).setData(range(90, 100)),
+ newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)));
+ lastChunks(); // Discard chunks; no need to inspect for this test
+
+ receiveReadChunks(
+ newChunk(Chunk.Type.DATA, 123).setOffset(80).setData(range(80, 100)).setRemainingBytes(0));
+ performReadCompletionHandshake(123, Status.OK);
+
+ verify(progressCallback, times(6)).accept(progress.capture());
+ assertThat(progress.getAllValues())
+ .containsExactly(TransferProgress.create(30, 30, TransferProgress.UNKNOWN_TRANSFER_SIZE),
+ TransferProgress.create(50, 50, TransferProgress.UNKNOWN_TRANSFER_SIZE),
+ TransferProgress.create(60, 60, 65),
+ TransferProgress.create(70, 70, TransferProgress.UNKNOWN_TRANSFER_SIZE),
+ TransferProgress.create(80, 80, 100),
+ TransferProgress.create(100, 100, 100));
+
+ assertThat(future.isDone()).isTrue();
+ }
+
+ @Test
+ public void read_rewindWhenPacketsSkipped() throws Exception {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<byte[]> future = transferClient.read(123, TRANSFER_PARAMETERS);
+
+ performReadStartHandshake(123, 123, TRANSFER_PARAMETERS);
+
+ receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(30, 50)));
+
+ assertThat(lastChunks())
+ .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ .setWindowEndOffset(50)
+ .setMaxChunkSizeBytes(30)
+ .setOffset(0)
+ .build());
+
+ receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)),
+ newChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50)));
+
+ assertThat(lastChunks())
+ .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123)
+ .setOffset(30)
+ .setWindowEndOffset(80)
+ .setMaxChunkSizeBytes(30)
+ .build());
+
+ receiveReadChunks(
+ newChunk(Chunk.Type.DATA, 123).setOffset(80).setData(range(80, 100)).setRemainingBytes(0));
+
+ assertThat(lastChunks())
+ .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ .setOffset(50)
+ .setWindowEndOffset(100)
+ .setMaxChunkSizeBytes(30)
+ .build());
+
+ receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(50, 80)));
+
+ assertThat(lastChunks())
+ .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123)
+ .setOffset(80)
+ .setWindowEndOffset(130)
+ .setMaxChunkSizeBytes(30)
+ .build());
+
+ receiveReadChunks(
+ newChunk(Chunk.Type.DATA, 123).setOffset(80).setData(range(80, 100)).setRemainingBytes(0));
+
+ performReadCompletionHandshake(123, Status.OK);
+
+ assertThat(future.get()).isEqualTo(TEST_DATA_100B.toByteArray());
+ }
+
+ @Test
+ public void read_multipleWithSameId_sequentially_successful() throws Exception {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ for (int i = 0; i < 3; ++i) {
+ ListenableFuture<byte[]> future = transferClient.read(1);
+
+ performReadStartHandshake(1, 100 + i);
+
+ receiveReadChunks(newChunk(Chunk.Type.DATA, 100 + i)
+ .setOffset(0)
+ .setData(TEST_DATA_SHORT)
+ .setRemainingBytes(0));
+
+ performReadCompletionHandshake(100 + i, Status.OK);
+
+ assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray());
+ }
+ }
+
+ @Test
+ public void read_multipleWithSameId_atSameTime_failsWithAlreadyExistsError() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<byte[]> first = transferClient.read(123);
+ ListenableFuture<byte[]> second = transferClient.read(123);
+
+ assertThat(first.isDone()).isFalse();
+
+ ExecutionException thrown = assertThrows(ExecutionException.class, second::get);
+ assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
+ assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.ALREADY_EXISTS);
+ }
+
+ @Test
+ public void read_sendErrorOnFirstPacket_fails() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ChannelOutputException exception = new ChannelOutputException("blah");
+ rpcClient.setChannelOutputException(exception);
+ ListenableFuture<byte[]> future = transferClient.read(123);
+
+ ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
+ assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
+ assertThat(thrown).hasCauseThat().hasCauseThat().isSameInstanceAs(exception);
+ }
+
+ @Test
+ public void read_sendErrorOnLaterPacket_aborts() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<byte[]> future = transferClient.read(1024, TRANSFER_PARAMETERS);
+
+ performReadStartHandshake(1024, 123, TRANSFER_PARAMETERS);
+
+ receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 20)));
+
+ ChannelOutputException exception = new ChannelOutputException("blah");
+ rpcClient.setChannelOutputException(exception);
+
+ receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(20).setData(range(20, 50)));
+
+ ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
+ assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
+ assertThat(thrown).hasCauseThat().hasCauseThat().isSameInstanceAs(exception);
+ }
+
+ @Test
+ public void read_cancelFuture_abortsTransfer() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<byte[]> future = transferClient.read(1, TRANSFER_PARAMETERS);
+
+ performReadStartHandshake(1, 123, TRANSFER_PARAMETERS);
+
+ assertThat(future.cancel(true)).isTrue();
+
+ assertThat(lastChunks()).contains(finalChunk(123, Status.CANCELLED));
+ }
+
+ @Test
+ public void read_immediateTransferProtocolError_aborts() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<byte[]> future = transferClient.read(123);
+
+ // Resource ID will be set since session ID hasn't been assigned yet.
+ receiveReadChunks(newChunk(Chunk.Type.COMPLETION, VersionedChunk.UNASSIGNED_SESSION_ID)
+ .setResourceId(123)
+ .setStatus(Status.ALREADY_EXISTS.ordinal()));
+
+ ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
+ assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
+
+ assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.ALREADY_EXISTS);
+ }
+
+ @Test
+ public void read_laterTransferProtocolError_aborts() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<byte[]> future = transferClient.read(123);
+
+ performReadStartHandshake(123, 514);
+
+ receiveReadChunks(finalChunk(514, Status.ALREADY_EXISTS));
+
+ ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
+ assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
+
+ assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.ALREADY_EXISTS);
+ }
+
+ @Test
+ public void read_rpcError_aborts() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<byte[]> future = transferClient.read(2);
+
+ receiveReadServerError(Status.NOT_FOUND);
+
+ ExecutionException exception = assertThrows(ExecutionException.class, future::get);
+ assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.INTERNAL);
+ }
+
+ @Test
+ public void read_serverRespondsWithUnknownVersion_invalidArgument() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<byte[]> future = transferClient.read(2, TRANSFER_PARAMETERS);
+
+ assertThat(lastChunks())
+ .containsExactly(initialReadChunk(2, ProtocolVersion.VERSION_TWO, TRANSFER_PARAMETERS));
+
+ receiveReadChunks(
+ newChunk(Chunk.Type.START_ACK, 99).setResourceId(2).setProtocolVersion(600613));
+
+ assertThat(lastChunks()).containsExactly(finalChunk(99, Status.INVALID_ARGUMENT));
+
+ ExecutionException exception = assertThrows(ExecutionException.class, future::get);
+ assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.INVALID_ARGUMENT);
+ }
+
+ @Test
+ public void read_timeout() {
+ createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<byte[]> future = transferClient.read(123, TRANSFER_PARAMETERS);
+
+ // Call future.get() without sending any server-side packets.
+ ExecutionException exception = assertThrows(ExecutionException.class, future::get);
+ assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.DEADLINE_EXCEEDED);
+
+ // read should have retried sending the transfer parameters 2 times, for a total of 3
+ assertThat(lastChunks())
+ .containsExactly(initialReadChunk(123, ProtocolVersion.VERSION_TWO),
+ initialReadChunk(123, ProtocolVersion.VERSION_TWO),
+ initialReadChunk(123, ProtocolVersion.VERSION_TWO));
+ }
+
+ @Test
+ public void write_singleChunk() throws Exception {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray());
+
+ // Do the start handshake (equivalent to performWriteStartHandshake()).
+ assertThat(lastChunks())
+ .containsExactly(initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()));
+
+ receiveWriteChunks(newChunk(Chunk.Type.START_ACK, 123)
+ .setResourceId(2)
+ .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()));
+
+ assertThat(lastChunks())
+ .containsExactly(newChunk(Chunk.Type.START_ACK_CONFIRMATION, 123)
+ .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())
+ .setRemainingBytes(TEST_DATA_SHORT.size())
+ .build());
+
+ receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ .setOffset(0)
+ .setWindowEndOffset(1024)
+ .setMaxChunkSizeBytes(128));
+
+ assertThat(lastChunks())
+ .containsExactly(newChunk(Chunk.Type.DATA, 123)
+ .setOffset(0)
+ .setData(TEST_DATA_SHORT)
+ .setRemainingBytes(0)
+ .build());
+
+ receiveWriteChunks(finalChunk(123, Status.OK));
+
+ assertThat(lastChunks()).containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, 123).build());
+
+ assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown.
+ }
+
+ @Test
+ public void write_requestV2ReceiveLegacy() throws Exception {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray());
+
+ assertThat(lastChunks())
+ .containsExactly(initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()));
+
+ receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2)
+ .setOffset(0)
+ .setWindowEndOffset(1024)
+ .setMaxChunkSizeBytes(128),
+ legacyFinalChunk(2, Status.OK));
+
+ assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown.
+ }
+
+ @Test
+ public void write_platformTransferDisabled_aborted() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray());
+ assertThat(future.isDone()).isFalse();
+
+ shouldAbortFlag = true;
+ receiveWriteChunks(newChunk(Chunk.Type.START_ACK, 3).setResourceId(2));
+
+ ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
+ assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
+ assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.ABORTED);
+ }
+
+ @Test
+ public void write_failedPreconditionError_retriesInitialPacket() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray());
+
+ assertThat(lastChunks())
+ .containsExactly(initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()));
+ for (int i = 0; i < MAX_RETRIES; ++i) {
+ receiveWriteServerError(Status.FAILED_PRECONDITION);
+
+ assertThat(lastChunks())
+ .containsExactly(
+ initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()));
+ }
+
+ receiveWriteChunks(newChunk(Chunk.Type.START_ACK, 54321)
+ .setResourceId(2)
+ .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()));
+
+ assertThat(lastChunks())
+ .containsExactly(newChunk(Chunk.Type.START_ACK_CONFIRMATION, 54321)
+ .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())
+ .setRemainingBytes(TEST_DATA_SHORT.size())
+ .build());
+ }
+
+ @Test
+ public void write_failedPreconditionError_abortsAfterInitialPacket() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_100B.toByteArray());
+
+ assertThat(lastChunks())
+ .containsExactly(initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_100B.size()));
+
+ receiveWriteChunks(newChunk(Chunk.Type.START_ACK, 4)
+ .setResourceId(2)
+ .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()));
+
+ receiveWriteServerError(Status.FAILED_PRECONDITION);
+
+ ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
+ assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
+ assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INTERNAL);
+ }
+
+ @Test
+ public void write_failedPreconditionErrorMaxRetriesTimes_aborts() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray());
+
+ for (int i = 0; i < MAX_RETRIES; ++i) {
+ receiveWriteServerError(Status.FAILED_PRECONDITION);
+ }
+
+ Chunk initialChunk = initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size());
+ assertThat(lastChunks())
+ .containsExactlyElementsIn(Collections.nCopies(1 + MAX_RETRIES, initialChunk));
+
+ receiveWriteServerError(Status.FAILED_PRECONDITION);
+
+ assertThat(lastChunks()).isEmpty();
+
+ ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
+ assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
+ assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INTERNAL);
+ }
+
+ @Test
+ public void write_empty() throws Exception {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<Void> future = transferClient.write(2, new byte[] {});
+
+ performWriteStartHandshake(2, 123, 0);
+
+ receiveWriteChunks(finalChunk(123, Status.OK));
+
+ assertThat(lastChunks()).containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, 123).build());
+
+ assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown.
+ }
+
+ @Test
+ public void write_severalChunks() throws Exception {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<Void> future = transferClient.write(500, TEST_DATA_100B.toByteArray());
+
+ performWriteStartHandshake(500, 123, TEST_DATA_100B.size());
+
+ receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ .setOffset(0)
+ .setWindowEndOffset(50)
+ .setMaxChunkSizeBytes(30)
+ .setMinDelayMicroseconds(1));
+
+ assertThat(lastChunks())
+ .containsExactly(newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)).build(),
+ newChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50)).build());
+
+ receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ .setOffset(50)
+ .setWindowEndOffset(90)
+ .setMaxChunkSizeBytes(25));
+
+ assertThat(lastChunks())
+ .containsExactly(
+ newChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(50, 75)).build(),
+ newChunk(Chunk.Type.DATA, 123).setOffset(75).setData(range(75, 90)).build());
+
+ receiveWriteChunks(
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(90).setWindowEndOffset(140));
+
+ assertThat(lastChunks())
+ .containsExactly(newChunk(Chunk.Type.DATA, 123)
+ .setOffset(90)
+ .setData(range(90, 100))
+ .setRemainingBytes(0)
+ .build());
+
+ assertThat(future.isDone()).isFalse();
+
+ receiveWriteChunks(finalChunk(123, Status.OK));
+
+ assertThat(lastChunks()).containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, 123).build());
+
+ assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown.
+ }
+
+ @Test
+ public void write_parametersContinue() throws Exception {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<Void> future = transferClient.write(321, TEST_DATA_100B.toByteArray());
+
+ performWriteStartHandshake(321, 123, TEST_DATA_100B.size());
+
+ receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ .setOffset(0)
+ .setWindowEndOffset(50)
+ .setMaxChunkSizeBytes(30)
+ .setMinDelayMicroseconds(1));
+
+ assertThat(lastChunks())
+ .containsExactly(newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)).build(),
+ newChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50)).build());
+
+ receiveWriteChunks(
+ newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123).setOffset(30).setWindowEndOffset(80));
+
+ // Transfer doesn't roll back to offset 30 but instead continues sending up to 80.
+ assertThat(lastChunks())
+ .containsExactly(
+ newChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(50, 80)).build());
+
+ receiveWriteChunks(
+ newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123).setOffset(80).setWindowEndOffset(130));
+
+ assertThat(lastChunks())
+ .containsExactly(newChunk(Chunk.Type.DATA, 123)
+ .setOffset(80)
+ .setData(range(80, 100))
+ .setRemainingBytes(0)
+ .build());
+
+ assertThat(future.isDone()).isFalse();
+
+ receiveWriteChunks(finalChunk(123, Status.OK));
+
+ assertThat(lastChunks()).containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, 123).build());
+
+ assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown.
+ }
+
+ @Test
+ public void write_continuePacketWithWindowEndBeforeOffsetIsIgnored() throws Exception {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray());
+
+ performWriteStartHandshake(123, 555, TEST_DATA_100B.size());
+
+ receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 555)
+ .setOffset(0)
+ .setWindowEndOffset(90)
+ .setMaxChunkSizeBytes(90)
+ .setMinDelayMicroseconds(1));
+
+ assertThat(lastChunks())
+ .containsExactly(newChunk(Chunk.Type.DATA, 555).setOffset(0).setData(range(0, 90)).build());
+
+ receiveWriteChunks(
+ // This stale packet with a window end before the offset should be ignored.
+ newChunk(Chunk.Type.PARAMETERS_CONTINUE, 555).setOffset(25).setWindowEndOffset(50),
+ // Start from an arbitrary offset before the current, but extend the window to the end.
+ newChunk(Chunk.Type.PARAMETERS_CONTINUE, 555).setOffset(80).setWindowEndOffset(100));
+
+ assertThat(lastChunks())
+ .containsExactly(newChunk(Chunk.Type.DATA, 555)
+ .setOffset(90)
+ .setData(range(90, 100))
+ .setRemainingBytes(0)
+ .build());
+
+ receiveWriteChunks(finalChunk(555, Status.OK));
+ assertThat(lastChunks()).containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, 555).build());
+
+ assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown.
+ }
+
+ @Test
+ public void write_progressCallbackIsCalled() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<Void> future =
+ transferClient.write(123, TEST_DATA_100B.toByteArray(), progressCallback);
+
+ performWriteStartHandshake(123, 123, TEST_DATA_100B.size());
+
+ receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ .setOffset(0)
+ .setWindowEndOffset(90)
+ .setMaxChunkSizeBytes(30),
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(50).setWindowEndOffset(100),
+ finalChunk(123, Status.OK));
+
+ verify(progressCallback, times(6)).accept(progress.capture());
+ assertThat(progress.getAllValues())
+ .containsExactly(TransferProgress.create(30, 0, 100),
+ TransferProgress.create(60, 0, 100),
+ TransferProgress.create(90, 0, 100),
+ TransferProgress.create(80, 50, 100),
+ TransferProgress.create(100, 50, 100),
+ TransferProgress.create(100, 100, 100));
+ assertThat(future.isDone()).isTrue();
+ }
+
+ @Test
+ public void write_asksForFinalOffset_sendsFinalPacket() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray());
+
+ performWriteStartHandshake(123, 456, TEST_DATA_100B.size());
+
+ receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 456)
+ .setOffset(100)
+ .setWindowEndOffset(140)
+ .setMaxChunkSizeBytes(25));
+
+ assertThat(lastChunks())
+ .containsExactly(
+ newChunk(Chunk.Type.DATA, 456).setOffset(100).setRemainingBytes(0).build());
}
@Test
public void write_multipleWithSameId_sequentially_successful() throws Exception {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
for (int i = 0; i < 3; ++i) {
- ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_SHORT.toByteArray());
+ ListenableFuture<Void> future = transferClient.write(6, TEST_DATA_SHORT.toByteArray());
+
+ performWriteStartHandshake(6, 123, TEST_DATA_SHORT.size());
receiveWriteChunks(
- newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID).setOffset(0).setPendingBytes(50),
- finalChunk(ID, Status.OK));
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(0).setWindowEndOffset(50),
+ finalChunk(123, Status.OK));
+
+ assertThat(lastChunks())
+ .containsExactly(
+ newChunk(Chunk.Type.DATA, 123).setData(TEST_DATA_SHORT).setRemainingBytes(0).build(),
+ newChunk(Chunk.Type.COMPLETION_ACK, 123).build());
future.get();
}
@@ -696,6 +1764,7 @@ public final class TransferClientTest {
@Test
public void write_multipleWithSameId_atSameTime_failsWithAlreadyExistsError() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<Void> first = transferClient.write(123, TEST_DATA_SHORT.toByteArray());
ListenableFuture<Void> second = transferClient.write(123, TEST_DATA_SHORT.toByteArray());
@@ -708,6 +1777,7 @@ public final class TransferClientTest {
@Test
public void write_sendErrorOnFirstPacket_failsImmediately() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ChannelOutputException exception = new ChannelOutputException("blah");
rpcClient.setChannelOutputException(exception);
ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray());
@@ -719,9 +1789,15 @@ public final class TransferClientTest {
@Test
public void write_serviceRequestsNoData_aborts() {
- ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_SHORT.toByteArray());
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<Void> future = transferClient.write(7, TEST_DATA_SHORT.toByteArray());
+
+ performWriteStartHandshake(7, 123, TEST_DATA_SHORT.size());
+
+ receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(0));
- receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID).setOffset(0));
+ assertThat(lastChunks()).containsExactly(finalChunk(123, Status.INVALID_ARGUMENT));
+ receiveWriteChunks(newChunk(Chunk.Type.COMPLETION_ACK, 123));
ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INVALID_ARGUMENT);
@@ -729,16 +1805,18 @@ public final class TransferClientTest {
@Test
public void write_invalidOffset_aborts() {
- ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_100B.toByteArray());
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<Void> future = transferClient.write(7, TEST_DATA_100B.toByteArray());
- receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
+ performWriteStartHandshake(7, 123, TEST_DATA_100B.size());
+
+ receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
.setOffset(101)
- .setPendingBytes(40)
+ .setWindowEndOffset(141)
.setMaxChunkSizeBytes(25));
- assertThat(lastChunks())
- .containsExactly(
- initialWriteChunk(ID, TEST_DATA_100B.size()), finalChunk(ID, Status.OUT_OF_RANGE));
+ assertThat(lastChunks()).containsExactly(finalChunk(123, Status.OUT_OF_RANGE));
+ receiveWriteChunks(newChunk(Chunk.Type.COMPLETION_ACK, 123));
ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.OUT_OF_RANGE);
@@ -746,14 +1824,17 @@ public final class TransferClientTest {
@Test
public void write_sendErrorOnLaterPacket_aborts() {
- ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_SHORT.toByteArray());
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<Void> future = transferClient.write(7, TEST_DATA_SHORT.toByteArray());
+
+ performWriteStartHandshake(7, 123, TEST_DATA_SHORT.size());
ChannelOutputException exception = new ChannelOutputException("blah");
rpcClient.setChannelOutputException(exception);
- receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
+ receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
.setOffset(0)
- .setPendingBytes(50)
+ .setWindowEndOffset(50)
.setMaxChunkSizeBytes(30));
ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
@@ -763,22 +1844,46 @@ public final class TransferClientTest {
@Test
public void write_cancelFuture_abortsTransfer() {
- ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_100B.toByteArray());
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<Void> future = transferClient.write(7, TEST_DATA_100B.toByteArray());
+
+ performWriteStartHandshake(7, 123, TEST_DATA_100B.size());
assertThat(future.cancel(true)).isTrue();
+ assertThat(future.isCancelled()).isTrue();
- receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
+ receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
.setOffset(0)
- .setPendingBytes(50)
+ .setWindowEndOffset(50)
.setMaxChunkSizeBytes(50));
- assertThat(lastChunks()).contains(finalChunk(ID, Status.CANCELLED));
+
+ assertThat(lastChunks()).contains(finalChunk(123, Status.CANCELLED));
+ receiveWriteChunks(newChunk(Chunk.Type.COMPLETION_ACK, 123));
+ }
+
+ @Test
+ public void write_immediateTransferProtocolError_aborts() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray());
+
+ receiveWriteChunks(newChunk(Chunk.Type.COMPLETION, VersionedChunk.UNASSIGNED_SESSION_ID)
+ .setResourceId(123)
+ .setStatus(Status.NOT_FOUND.ordinal()));
+
+ ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
+ assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
+
+ assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.NOT_FOUND);
}
@Test
- public void write_protocolError_aborts() {
- ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_SHORT.toByteArray());
+ public void write_laterTransferProtocolError_aborts() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray());
+
+ performWriteStartHandshake(123, 123, TEST_DATA_SHORT.size());
- receiveWriteChunks(finalChunk(ID, Status.NOT_FOUND));
+ receiveWriteChunks(finalChunk(123, Status.NOT_FOUND));
ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
@@ -788,6 +1893,7 @@ public final class TransferClientTest {
@Test
public void write_rpcError_aborts() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray());
receiveWriteServerError(Status.NOT_FOUND);
@@ -798,22 +1904,40 @@ public final class TransferClientTest {
@Test
public void write_unknownVersion_invalidArgument() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray());
- receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2).setProtocolVersion(3));
+ receiveWriteChunks(newChunk(Chunk.Type.START_ACK, 3).setResourceId(2).setProtocolVersion(9));
ExecutionException exception = assertThrows(ExecutionException.class, future::get);
assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.INVALID_ARGUMENT);
assertThat(lastChunks())
- .containsExactly(
- initialWriteChunk(2, TEST_DATA_SHORT.size()), finalChunk(2, Status.INVALID_ARGUMENT));
+ .containsExactly(initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()),
+ finalChunk(3, Status.INVALID_ARGUMENT));
+ }
+
+ @Test
+ public void write_serverRespondsWithUnknownVersion_invalidArgument() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_100B.toByteArray());
+
+ assertThat(lastChunks())
+ .containsExactly(initialWriteChunk(2, ProtocolVersion.VERSION_TWO, 100));
+
+ receiveWriteChunks(
+ newChunk(Chunk.Type.START_ACK, 99).setResourceId(2).setProtocolVersion(600613));
+
+ assertThat(lastChunks()).containsExactly(finalChunk(99, Status.INVALID_ARGUMENT));
+
+ ExecutionException exception = assertThrows(ExecutionException.class, future::get);
+ assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.INVALID_ARGUMENT);
}
@Test
public void write_timeoutAfterInitialChunk() {
- transferClient = createTransferClient(1, 1); // Create a manager with a very short timeout.
- ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_SHORT.toByteArray());
+ createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO);
+ ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray());
// Call future.get() without sending any server-side packets.
ExecutionException exception = assertThrows(ExecutionException.class, future::get);
@@ -821,94 +1945,183 @@ public final class TransferClientTest {
// Client should have resent the last chunk (the initial chunk in this case) for each timeout.
assertThat(lastChunks())
- .containsExactly(initialWriteChunk(ID, TEST_DATA_SHORT.size()), // initial
- initialWriteChunk(ID, TEST_DATA_SHORT.size()), // retry 1
- initialWriteChunk(ID, TEST_DATA_SHORT.size()), // retry 2
- finalChunk(ID, Status.DEADLINE_EXCEEDED)); // abort
+ .containsExactly(
+ initialWriteChunk(123, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()), // initial
+ initialWriteChunk(123, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()), // retry 1
+ initialWriteChunk(123, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size())); // retry 2
}
@Test
- public void write_timeoutAfterSingleChunk() throws Exception {
- transferClient.close();
- transferClient = createTransferClient(10, 10); // Create a manager with a very short timeout.
+ public void write_timeoutAfterSingleChunk() {
+ createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO);
- // Wait for two outgoing packets (Write RPC request and first chunk), then send the parameters.
+ // Wait for two outgoing packets (Write RPC request and first chunk), then do the handshake.
enqueueWriteChunks(2,
- newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
+ newChunk(Chunk.Type.START_ACK, 123).setResourceId(9),
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
.setOffset(0)
- .setPendingBytes(90)
+ .setWindowEndOffset(90)
.setMaxChunkSizeBytes(30));
- ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_SHORT.toByteArray());
+ ListenableFuture<Void> future = transferClient.write(9, TEST_DATA_SHORT.toByteArray());
ExecutionException exception = assertThrows(ExecutionException.class, future::get);
assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.DEADLINE_EXCEEDED);
- Chunk data = newChunk(Chunk.Type.DATA, ID)
+ Chunk data = newChunk(Chunk.Type.DATA, 123)
.setOffset(0)
.setData(TEST_DATA_SHORT)
.setRemainingBytes(0)
.build();
assertThat(lastChunks())
- .containsExactly(initialWriteChunk(ID, TEST_DATA_SHORT.size()), // initial
+ .containsExactly(
+ initialWriteChunk(9, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()), // initial
+ newChunk(Chunk.Type.START_ACK_CONFIRMATION, 123)
+ .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())
+ .setRemainingBytes(TEST_DATA_SHORT.size())
+ .build(),
data, // data chunk
data, // retry 1
- data, // retry 2
- finalChunk(ID, Status.DEADLINE_EXCEEDED)); // abort
+ data); // retry 2
+ }
+
+ @Test
+ public void write_timeoutAndRecoverDuringHandshakes() throws Exception {
+ createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO);
+ assertThat(MAX_RETRIES).isEqualTo(2); // This test assumes 2 retries
+
+ // Wait for four outgoing packets (Write RPC request and START chunk + retry), then handshake.
+ enqueueWriteChunks(3,
+ newChunk(Chunk.Type.START_ACK, 123)
+ .setResourceId(5)
+ .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()));
+
+ // Wait for start ack confirmation + 2 retries, then request three packets.
+ enqueueWriteChunks(3,
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ .setOffset(0)
+ .setWindowEndOffset(60)
+ .setMaxChunkSizeBytes(20));
+
+ // After two packets, request the remainder of the packets.
+ enqueueWriteChunks(
+ 2, newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123).setOffset(20).setWindowEndOffset(200));
+
+ // Wait for last 3 data packets, then 2 final packet retries.
+ enqueueWriteChunks(5,
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ .setOffset(80)
+ .setWindowEndOffset(200)
+ .setMaxChunkSizeBytes(20),
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ .setOffset(80)
+ .setWindowEndOffset(200)
+ .setMaxChunkSizeBytes(20));
+
+ // After the retry, confirm completed multiple times; additional packets should be dropped
+ enqueueWriteChunks(1,
+ newChunk(Chunk.Type.COMPLETION, 123).setStatus(Status.OK.code()),
+ newChunk(Chunk.Type.COMPLETION, 123).setStatus(Status.OK.code()),
+ newChunk(Chunk.Type.COMPLETION, 123).setStatus(Status.OK.code()),
+ newChunk(Chunk.Type.COMPLETION, 123).setStatus(Status.OK.code()));
+
+ ListenableFuture<Void> future = transferClient.write(5, TEST_DATA_100B.toByteArray());
+
+ assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown.
+
+ final Chunk startAckConfirmation =
+ newChunk(Chunk.Type.START_ACK_CONFIRMATION, 123)
+ .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())
+ .setRemainingBytes(TEST_DATA_100B.size())
+ .build();
+
+ assertThat(lastChunks())
+ .containsExactly(
+ // initial handshake with retries
+ initialWriteChunk(5, ProtocolVersion.VERSION_TWO, TEST_DATA_100B.size()),
+ initialWriteChunk(5, ProtocolVersion.VERSION_TWO, TEST_DATA_100B.size()),
+ startAckConfirmation,
+ startAckConfirmation,
+ startAckConfirmation,
+ // send all data
+ dataChunk(123, TEST_DATA_100B, 0, 20), // data 0-20
+ dataChunk(123, TEST_DATA_100B, 20, 40), // data 20-40
+ dataChunk(123, TEST_DATA_100B, 40, 60), // data 40-60
+ dataChunk(123, TEST_DATA_100B, 60, 80), // data 60-80
+ dataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100 (final)
+ // retry last packet two times
+ dataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100 (final)
+ dataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100 (final)
+ // respond to two PARAMETERS_RETRANSMIT packets
+ dataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100 (final)
+ dataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100 (final)
+ // respond to OK packet
+ newChunk(Chunk.Type.COMPLETION_ACK, 123).build());
}
@Test
public void write_multipleTimeoutsAndRecoveries() throws Exception {
- transferClient.close();
- transferClient = createTransferClient(10, 10); // Create a manager with short timeouts.
+ createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO);
+ assertThat(MAX_RETRIES).isEqualTo(2); // This test assumes 2 retries
- // Wait for two outgoing packets (Write RPC request and first chunk), then send the parameters.
+ // Wait for two outgoing packets (Write RPC request and START chunk), then do the handshake.
enqueueWriteChunks(2,
- newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
+ newChunk(Chunk.Type.START_ACK, 123)
+ .setResourceId(5)
+ .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()));
+
+ // Request two packets.
+ enqueueWriteChunks(1,
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
.setOffset(0)
- .setPendingBytes(40)
+ .setWindowEndOffset(40)
.setMaxChunkSizeBytes(20));
// After the second retry, send more transfer parameters
enqueueWriteChunks(4,
- newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
.setOffset(40)
- .setPendingBytes(80)
+ .setWindowEndOffset(120)
.setMaxChunkSizeBytes(40));
// After the first retry, send more transfer parameters
enqueueWriteChunks(3,
- newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
.setOffset(80)
- .setPendingBytes(80)
+ .setWindowEndOffset(160)
.setMaxChunkSizeBytes(10));
// After the second retry, confirm completed
- enqueueWriteChunks(
- 4, newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID).setStatus(Status.OK.code()));
+ enqueueWriteChunks(4, newChunk(Chunk.Type.COMPLETION, 123).setStatus(Status.OK.code()));
+ enqueueWriteChunks(1, newChunk(Chunk.Type.COMPLETION_ACK, 123));
- ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_100B.toByteArray());
+ ListenableFuture<Void> future = transferClient.write(5, TEST_DATA_100B.toByteArray());
assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown.
assertThat(lastChunks())
.containsExactly(
- // initial
- initialWriteChunk(ID, TEST_DATA_100B.size()),
+ // initial handshake
+ initialWriteChunk(5, ProtocolVersion.VERSION_TWO, TEST_DATA_100B.size()),
+ newChunk(Chunk.Type.START_ACK_CONFIRMATION, 123)
+ .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())
+ .setRemainingBytes(TEST_DATA_100B.size())
+ .build(),
// after 2, receive parameters: 40 from 0 by 20
- dataChunk(ID, TEST_DATA_100B, 0, 20), // data 0-20
- dataChunk(ID, TEST_DATA_100B, 20, 40), // data 20-40
- dataChunk(ID, TEST_DATA_100B, 20, 40), // retry 1
- dataChunk(ID, TEST_DATA_100B, 20, 40), // retry 2
+ dataChunk(123, TEST_DATA_100B, 0, 20), // data 0-20
+ dataChunk(123, TEST_DATA_100B, 20, 40), // data 20-40
+ dataChunk(123, TEST_DATA_100B, 20, 40), // retry 1
+ dataChunk(123, TEST_DATA_100B, 20, 40), // retry 2
// after 4, receive parameters: 80 from 40 by 40
- dataChunk(ID, TEST_DATA_100B, 40, 80), // data 40-80
- dataChunk(ID, TEST_DATA_100B, 80, 100), // data 80-100
- dataChunk(ID, TEST_DATA_100B, 80, 100), // retry 1
+ dataChunk(123, TEST_DATA_100B, 40, 80), // data 40-80
+ dataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100
+ dataChunk(123, TEST_DATA_100B, 80, 100), // retry 1
// after 3, receive parameters: 80 from 80 by 10
- dataChunk(ID, TEST_DATA_100B, 80, 90), // data 80-90
- dataChunk(ID, TEST_DATA_100B, 90, 100), // data 90-100
- dataChunk(ID, TEST_DATA_100B, 90, 100), // retry 1
- dataChunk(ID, TEST_DATA_100B, 90, 100)); // retry 2
- // after 4, receive final OK
+ dataChunk(123, TEST_DATA_100B, 80, 90), // data 80-90
+ dataChunk(123, TEST_DATA_100B, 90, 100), // data 90-100
+ dataChunk(123, TEST_DATA_100B, 90, 100), // retry 1
+ dataChunk(123, TEST_DATA_100B, 90, 100), // retry 2
+ // after 4, receive final OK
+ newChunk(Chunk.Type.COMPLETION_ACK, 123).build());
}
private static ByteString range(int startInclusive, int endExclusive) {
@@ -922,39 +2135,79 @@ public final class TransferClientTest {
return ByteString.copyFrom(bytes);
}
- private static Chunk.Builder newChunk(Chunk.Type type, int resourceId) {
- return Chunk.newBuilder().setType(type).setTransferId(resourceId);
+ private static Chunk.Builder newLegacyChunk(Chunk.Type type, int transferId) {
+ return Chunk.newBuilder().setType(type).setTransferId(transferId);
+ }
+
+ private static Chunk.Builder newChunk(Chunk.Type type, int sessionId) {
+ return Chunk.newBuilder().setType(type).setSessionId(sessionId);
}
- private static Chunk initialReadChunk(int resourceId) {
- return initialReadChunk(resourceId, TRANSFER_PARAMETERS);
+ private static Chunk initialReadChunk(int resourceId, ProtocolVersion version) {
+ return initialReadChunk(resourceId, version, TRANSFER_PARAMETERS);
}
- private static Chunk initialReadChunk(int resourceId, TransferParameters params) {
- Chunk.Builder chunk = newChunk(Chunk.Type.START, resourceId)
+ private static Chunk initialReadChunk(
+ int resourceId, ProtocolVersion version, TransferParameters params) {
+ Chunk.Builder chunk = newLegacyChunk(Chunk.Type.START, resourceId)
.setResourceId(resourceId)
.setPendingBytes(params.maxPendingBytes())
.setWindowEndOffset(params.maxPendingBytes())
.setMaxChunkSizeBytes(params.maxChunkSizeBytes())
.setOffset(0);
+ if (version != ProtocolVersion.LEGACY) {
+ chunk.setProtocolVersion(version.ordinal());
+ }
+ if (params.chunkDelayMicroseconds() > 0) {
+ chunk.setMinDelayMicroseconds(params.chunkDelayMicroseconds());
+ }
+ return chunk.build();
+ }
+ private static Chunk readStartAckConfirmation(int sessionId, TransferParameters params) {
+ Chunk.Builder chunk = newChunk(Chunk.Type.START_ACK_CONFIRMATION, sessionId)
+ .setWindowEndOffset(params.maxPendingBytes())
+ .setMaxChunkSizeBytes(params.maxChunkSizeBytes())
+ .setOffset(0)
+ .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal());
if (params.chunkDelayMicroseconds() > 0) {
chunk.setMinDelayMicroseconds(params.chunkDelayMicroseconds());
}
return chunk.build();
}
- private static Chunk initialWriteChunk(int resourceId, int size) {
- return newChunk(Chunk.Type.START, resourceId)
- .setResourceId(resourceId)
- .setRemainingBytes(size)
- .build();
+ private static Chunk initialWriteChunk(int resourceId, ProtocolVersion version, int size) {
+ Chunk.Builder chunk = newLegacyChunk(Chunk.Type.START, resourceId)
+ .setResourceId(resourceId)
+ .setRemainingBytes(size);
+ if (version != ProtocolVersion.LEGACY) {
+ chunk.setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal());
+ }
+ return chunk.build();
+ }
+
+ private static Chunk legacyFinalChunk(int sessionId, Status status) {
+ return newLegacyChunk(Chunk.Type.COMPLETION, sessionId).setStatus(status.code()).build();
}
private static Chunk finalChunk(int sessionId, Status status) {
return newChunk(Chunk.Type.COMPLETION, sessionId).setStatus(status.code()).build();
}
+ private static Chunk legacyDataChunk(int sessionId, ByteString data, int start, int end) {
+ if (start < 0 || end > data.size()) {
+ throw new IndexOutOfBoundsException("Invalid start or end");
+ }
+
+ Chunk.Builder chunk = newLegacyChunk(Chunk.Type.DATA, sessionId)
+ .setOffset(start)
+ .setData(data.substring(start, end));
+ if (end == data.size()) {
+ chunk.setRemainingBytes(0);
+ }
+ return chunk.build();
+ }
+
private static Chunk dataChunk(int sessionId, ByteString data, int start, int end) {
if (start < 0 || end > data.size()) {
throw new IndexOutOfBoundsException("Invalid start or end");
@@ -995,6 +2248,48 @@ public final class TransferClientTest {
}
}
+ private void performReadStartHandshake(int resourceId, int sessionId) {
+ performReadStartHandshake(
+ resourceId, sessionId, TransferClient.DEFAULT_READ_TRANSFER_PARAMETERS);
+ }
+
+ private void performReadStartHandshake(int resourceId, int sessionId, TransferParameters params) {
+ assertThat(lastChunks())
+ .containsExactly(initialReadChunk(resourceId, ProtocolVersion.VERSION_TWO, params));
+
+ receiveReadChunks(newChunk(Chunk.Type.START_ACK, sessionId)
+ .setResourceId(resourceId)
+ .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()));
+
+ assertThat(lastChunks()).containsExactly(readStartAckConfirmation(sessionId, params));
+ }
+
+ private void performReadCompletionHandshake(int sessionId, Status status) {
+ assertThat(lastChunks())
+ .containsExactly(Chunk.newBuilder()
+ .setType(Chunk.Type.COMPLETION)
+ .setSessionId(sessionId)
+ .setStatus(status.ordinal())
+ .build());
+
+ receiveReadChunks(newChunk(Chunk.Type.COMPLETION_ACK, sessionId));
+ }
+
+ private void performWriteStartHandshake(int resourceId, int sessionId, int dataSize) {
+ assertThat(lastChunks())
+ .containsExactly(initialWriteChunk(resourceId, ProtocolVersion.VERSION_TWO, dataSize));
+
+ receiveWriteChunks(newChunk(Chunk.Type.START_ACK, sessionId)
+ .setResourceId(resourceId)
+ .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()));
+
+ assertThat(lastChunks())
+ .containsExactly(newChunk(Chunk.Type.START_ACK_CONFIRMATION, sessionId)
+ .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())
+ .setRemainingBytes(dataSize)
+ .build());
+ }
+
/** Receive these chunks after a chunk is sent. */
private void enqueueWriteChunks(int afterPackets, Chunk.Builder... chunks) {
syncWithTransferThread(
@@ -1006,13 +2301,29 @@ public final class TransferClientTest {
return rpcClient.lastClientStreams(Chunk.class);
}
- private TransferClient createTransferClient(
- int transferTimeoutMillis, int initialTransferTimeoutMillis) {
- return new TransferClient(rpcClient.client().method(CHANNEL_ID, SERVICE + "/Read"),
+ private void createTransferClientThatMayTimeOut(ProtocolVersion version) {
+ createTransferClient(version, 1, 1, TransferEventHandler::runForTestsThatMustTimeOut);
+ }
+
+ private void createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion version) {
+ createTransferClient(version, 60000, 60000, TransferEventHandler::run);
+ }
+
+ private void createTransferClient(ProtocolVersion version,
+ int transferTimeoutMillis,
+ int initialTransferTimeoutMillis,
+ Consumer<TransferEventHandler> eventHandlerFunction) {
+ if (transferClient != null) {
+ throw new AssertionError("createTransferClient must only be called once!");
+ }
+ transferClient = new TransferClient(rpcClient.client().method(CHANNEL_ID, SERVICE + "/Read"),
rpcClient.client().method(CHANNEL_ID, SERVICE + "/Write"),
transferTimeoutMillis,
initialTransferTimeoutMillis,
MAX_RETRIES,
- () -> this.shouldAbortFlag);
+ ()
+ -> this.shouldAbortFlag,
+ eventHandlerFunction);
+ transferClient.setProtocolVersion(version);
}
}