diff options
author | Wyatt Hepler <hepler@google.com> | 2022-11-14 22:39:19 +0000 |
---|---|---|
committer | CQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com> | 2022-11-14 22:39:19 +0000 |
commit | 49e27cb5be542c4737a9c0efe1a646c7424e65a6 (patch) | |
tree | 3ac3bff43ecaa7108ef43683d6644b1ccc22c8ca /pw_transfer | |
parent | 675e2b1b8d5592a2ba7681d812b3ecaf8e6d32c4 (diff) | |
download | pigweed-49e27cb5be542c4737a9c0efe1a646c7424e65a6.tar.gz |
pw_transfer: Update Java client to support v2 protocol
- Support the v2 protocol, including implementing starting and ending
handshakes.
- Move retry functionality into the base Transfer class to share more
code between read and write transfers.
- Have sendChunk() throw an exception rather than return false transfer
is aborted. This simplifies the logic around sendChunk() calls.
- Move the cleanUp() function into the Completed() state constructor.
This removes the need to check if cleanup has already occurred.
- Update resource/session ID handling to use resource ID until session
ID has been assigned.
- Improve log statements by adding a descriptive toString() for Transfer
objects.
- Make legacy and v2 versions of all of the unit tests.
- Have tests that need to timeout use a slightly different test version
of the transfer thread run loop to prevent test flakes.
- Add a small section about the Java client to the docs.
Bug: b/235517604, b/235387007, b/257343283
Change-Id: Ib969b843a1eac3d7e3d0007c492d5e50e4c0f8c9
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/117151
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
Diffstat (limited to 'pw_transfer')
10 files changed, 2029 insertions, 522 deletions
diff --git a/pw_transfer/docs.rst b/pw_transfer/docs.rst index 6ed214d14..e1ad8babc 100644 --- a/pw_transfer/docs.rst +++ b/pw_transfer/docs.rst @@ -311,7 +311,6 @@ Python Typescript ========== - Provides a simple interface for transferring bulk data over pw_rpc. **Example** @@ -339,6 +338,36 @@ Provides a simple interface for transferring bulk data over pw_rpc. console.log(`Failed to read: ${error.status}`); }); +Java +==== +pw_transfer provides a Java client. The transfer client returns a +`ListenableFuture <https://guava.dev/releases/21.0/api/docs/com/google/common/util/concurrent/ListenableFuture>`_ +to represent the results of a read or write transfer. + +.. code-block:: java + + import dev.pigweed.pw_transfer.TransferClient; + + // Create a new transfer client. + TransferClient client = new TransferClient( + transferReadMethodClient, + transferWriteMethodClient, + transferTimeoutMillis, + initialTransferTimeoutMillis, + maxRetries); + + // Start a read transfer. + ListenableFuture<byte[]> readTransfer = client.read(123); + + // Start a write transfer. + ListenableFuture<Void> writeTransfer = client.write(123, dataToWrite); + + // Get the data from the read transfer. + byte[] readData = readTransfer.get(); + + // Wait for the write transfer to complete. + writeTransfer.get(); + -------- Protocol -------- diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/ReadTransfer.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/ReadTransfer.java index f564029e0..ec98253fe 100644 --- a/pw_transfer/java/main/dev/pigweed/pw_transfer/ReadTransfer.java +++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/ReadTransfer.java @@ -50,6 +50,7 @@ class ReadTransfer extends Transfer<byte[]> { private int windowEndOffset; ReadTransfer(int resourceId, + ProtocolVersion desiredProtocolVersion, TransferInterface transferManager, int timeoutMillis, int initialTimeoutMillis, @@ -58,6 +59,7 @@ class ReadTransfer extends Transfer<byte[]> { Consumer<TransferProgress> progressCallback, BooleanSupplier shouldAbortCallback) { super(resourceId, + desiredProtocolVersion, transferManager, timeoutMillis, initialTimeoutMillis, @@ -74,10 +76,8 @@ class ReadTransfer extends Transfer<byte[]> { } @Override - VersionedChunk getInitialChunk(ProtocolVersion desiredProcotolVersion) { - return setTransferParameters( - VersionedChunk.createInitialChunk(desiredProcotolVersion, getResourceId())) - .build(); + void prepareInitialChunk(VersionedChunk.Builder chunk) { + setTransferParameters(chunk); } @Override @@ -90,18 +90,12 @@ class ReadTransfer extends Transfer<byte[]> { return chunk; } - class ReceivingData extends State { + class ReceivingData extends ActiveState { @Override - void handleTimeout() { - setState(new Recovery()); - } - - @Override - void handleDataChunk(VersionedChunk chunk) { + public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException { if (chunk.offset() != offset) { - logger.atFine().log( - "Transfer %d expected offset %d, received %d; resending transfer parameters", - getId(), + logger.atFine().log("%s expected offset %d, received %d; resending transfer parameters", + ReadTransfer.this, offset, chunk.offset()); @@ -120,7 +114,7 @@ class ReadTransfer extends Transfer<byte[]> { if (chunk.remainingBytes().isPresent()) { if (chunk.remainingBytes().getAsLong() == 0) { - setStateCompletedAndSendFinalChunk(Status.OK); + setStateTerminatingAndSendFinalChunk(Status.OK); return; } @@ -142,7 +136,7 @@ class ReadTransfer extends Transfer<byte[]> { if (remainingWindowSize == 0) { logger.atFiner().log( - "Transfer %d received all pending bytes; sending transfer parameters update", getId()); + "%s received all pending bytes; sending transfer parameters update", ReadTransfer.this); sendChunk(prepareTransferParameters(/*extend=*/false)); } else if (extendWindow) { sendChunk(prepareTransferParameters(/*extend=*/true)); diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java index 06ecf45b8..04027ab5b 100644 --- a/pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java +++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java @@ -20,8 +20,10 @@ import static dev.pigweed.pw_transfer.TransferProgress.UNKNOWN_TRANSFER_SIZE; import com.google.common.util.concurrent.SettableFuture; import dev.pigweed.pw_log.Logger; import dev.pigweed.pw_rpc.Status; +import dev.pigweed.pw_transfer.TransferEventHandler.TransferInterface; import java.time.Duration; import java.time.Instant; +import java.util.Locale; import java.util.function.BooleanSupplier; import java.util.function.Consumer; @@ -32,9 +34,6 @@ abstract class Transfer<T> { // Largest nanosecond instant. Used to block indefinitely when no transfers are pending. static final Instant NO_TIMEOUT = Instant.ofEpochSecond(0, Long.MAX_VALUE); - // TODO(hepler): Make this configurable - private static final ProtocolVersion DESIRED_PROTOCOL_VERSION = ProtocolVersion.LEGACY; - private final int resourceId; private final ProtocolVersion desiredProtocolVersion; private final TransferEventHandler.TransferInterface eventHandler; @@ -48,9 +47,8 @@ abstract class Transfer<T> { private int sessionId = VersionedChunk.UNASSIGNED_SESSION_ID; private ProtocolVersion configuredProtocolVersion = ProtocolVersion.UNKNOWN; - private State state = new Inactive(); private Instant deadline = NO_TIMEOUT; - private boolean isCleanedUp = false; + private State state; private VersionedChunk lastChunkSent; // The number of times this transfer has retried due to an RPC disconnection. Limit this to @@ -60,8 +58,8 @@ abstract class Transfer<T> { /** * Creates a new read or write transfer. - * * @param resourceId The resource ID of the transfer + * @param desiredProtocolVersion protocol version to request * @param eventHandler Interface to use to send a chunk. * @param timeoutMillis Maximum time to wait for a chunk from the server. * @param initialTimeoutMillis Maximum time to wait for the first chunk from the server. @@ -70,14 +68,15 @@ abstract class Transfer<T> { * @param shouldAbortCallback BooleanSupplier that returns true if a transfer should be aborted. */ Transfer(int resourceId, - TransferEventHandler.TransferInterface eventHandler, + ProtocolVersion desiredProtocolVersion, + TransferInterface eventHandler, int timeoutMillis, int initialTimeoutMillis, int maxRetries, Consumer<TransferProgress> progressCallback, BooleanSupplier shouldAbortCallback) { this.resourceId = resourceId; - this.desiredProtocolVersion = DESIRED_PROTOCOL_VERSION; + this.desiredProtocolVersion = desiredProtocolVersion; this.eventHandler = eventHandler; this.future = SettableFuture.create(); @@ -94,16 +93,44 @@ abstract class Transfer<T> { } }, directExecutor()); + if (desiredProtocolVersion == ProtocolVersion.LEGACY) { + // Legacy transfers skip protocol negotiation stage and use the resource ID as the session ID. + configuredProtocolVersion = ProtocolVersion.LEGACY; + assignSessionId(resourceId); + state = getWaitingForDataState(); + } else { + state = new Initiating(); + } + startTime = Instant.now(); } - final int getResourceId() { + @Override + public String toString() { + return String.format(Locale.ENGLISH, + "%s(%d:%d)[%s]", + this.getClass().getSimpleName(), + resourceId, + sessionId, + state.getClass().getSimpleName()); + } + + public final int getResourceId() { return resourceId; } - /** Returns the current ID for this transfer, which may be the session or resource ID. */ - final int getId() { - return sessionId != VersionedChunk.UNASSIGNED_SESSION_ID ? sessionId : resourceId; + public final int getSessionId() { + return sessionId; + } + + private void assignSessionId(int newSessionId) { + sessionId = newSessionId; + eventHandler.assignSessionId(this); + } + + /** Terminates the transfer without sending any packets. */ + public final void terminate(TransferError error) { + changeState(new Completed(error)); } final Instant getDeadline() { @@ -127,27 +154,20 @@ abstract class Transfer<T> { } final void start() { - logger.atFine().log( - "Transfer %d for resource %d starting with parameters: default timeout %d ms, initial timeout %d ms, %d max retires", - getId(), - getResourceId(), + logger.atInfo().log( + "%s starting with parameters: default timeout %d ms, initial timeout %d ms, %d max retires", + this, timeoutMillis, initialTimeoutMillis, maxRetries); - if (!sendChunk(getInitialChunk(desiredProtocolVersion))) { + VersionedChunk.Builder chunk = + VersionedChunk.createInitialChunk(desiredProtocolVersion, resourceId); + prepareInitialChunk(chunk); + try { + sendChunk(chunk.build()); + } catch (TransferAbortedException e) { return; // Sending failed, transfer is cancelled } - - if (desiredProtocolVersion == ProtocolVersion.LEGACY) { - // Legacy transfers skip the protocol negotiation stage and use the resource ID as the session - // ID. - configuredProtocolVersion = ProtocolVersion.LEGACY; - sessionId = resourceId; - setState(getWaitingForDataState()); - } else { - setState(new Initiating()); - throw new AssertionError("Cannot set desired protocol to v2; not implemented yet!"); - } setInitialTimeout(); } @@ -156,29 +176,28 @@ abstract class Transfer<T> { // Since a packet has been received, don't allow retries on disconnection; abort instead. disconnectionRetries = Integer.MAX_VALUE; - if (chunk.version() == ProtocolVersion.UNKNOWN) { - logger.atWarning().log( - "Cannot handle packet from unsupported session ID %d", chunk.sessionId()); - setStateCompletedAndSendFinalChunk(Status.INVALID_ARGUMENT); - return; - } - - if (chunk.type() == Chunk.Type.COMPLETION) { - logger.atFinest().log("Event: handle final chunk"); - state.handleFinalChunk(chunk.status().orElseGet(() -> { - logger.atWarning().log("Received terminating chunk with no status set; using INTERNAL"); - return Status.INTERNAL.code(); - })); - } else { - logger.atFinest().log("Event: handle data chunk"); - state.handleDataChunk(chunk); + try { + if (chunk.type() == Chunk.Type.COMPLETION) { + state.handleFinalChunk(chunk.status().orElseGet(() -> { + logger.atWarning().log("Received terminating chunk with no status set; using INTERNAL"); + return Status.INTERNAL.code(); + })); + } else { + state.handleDataChunk(chunk); + } + } catch (TransferAbortedException e) { + // Transfer was aborted; nothing else to do. } } final void handleTimeoutIfDeadlineExceeded() { if (Instant.now().isAfter(deadline)) { - logger.atFinest().log("Event: handleTimeout since %s is after %s", Instant.now(), deadline); - state.handleTimeout(); + logger.atFinest().log("%s timed out since the deadline %s has passed", this, deadline); + try { + state.handleTimeout(); + } catch (TransferAbortedException e) { + // Transfer was aborted; nothing else to do. + } } } @@ -195,24 +214,27 @@ abstract class Transfer<T> { // disconnectionRetries is set to Int.MAX_VALUE when a packet is received to prevent retries // after the initial packet. if (disconnectionRetries++ < maxRetries) { - logger.atFine().log("Restarting the pw_transfer RPC for transfer %d (attempt %d/%d)", - sessionId, + logger.atFine().log("Restarting the pw_transfer RPC for %s (attempt %d/%d)", + this, disconnectionRetries, maxRetries); - if (sendChunk(getChunkForRetry())) { - setInitialTimeout(); + try { + sendChunk(getChunkForRetry()); + } catch (TransferAbortedException e) { + return; // Transfer is aborted; nothing else to do. } + setInitialTimeout(); } else { - cleanUp(new TransferError( + changeState(new Completed(new TransferError( "Transfer " + sessionId + " restarted " + maxRetries + " times, aborting", - Status.INTERNAL)); + Status.INTERNAL))); } } /** Returns the State to enter immediately after sending the first packet. */ abstract State getWaitingForDataState(); - abstract VersionedChunk getInitialChunk(ProtocolVersion desiredProtocolVersion); + abstract void prepareInitialChunk(VersionedChunk.Builder chunk); /** * Returns the chunk to send for a retry. Returns the initial chunk if no chunks have been sent. @@ -224,78 +246,61 @@ abstract class Transfer<T> { final VersionedChunk.Builder newChunk(Chunk.Type type) { return VersionedChunk.builder() - .setVersion(configuredProtocolVersion) + .setVersion(configuredProtocolVersion != ProtocolVersion.UNKNOWN ? configuredProtocolVersion + : desiredProtocolVersion) .setType(type) - .setSessionId(getId()); + .setSessionId(sessionId); } final VersionedChunk getLastChunkSent() { return lastChunkSent; } - final void setState(State newState) { + final State changeState(State newState) { if (newState != state) { - logger.atFinest().log( - "Transfer %d state %s -> %s", getId(), state.getName(), newState.getName()); + logger.atFinest().log("%s state %s -> %s", + this, + state.getClass().getSimpleName(), + newState.getClass().getSimpleName()); } state = newState; + return state; } - /** Sends a chunk. Returns true if sent, false if sending failed and the transfer was aborted. */ - final boolean sendChunk(VersionedChunk chunk) { + /** Exception thrown when the transfer is aborted. */ + static class TransferAbortedException extends Exception {} + + /** + * Sends a chunk. + * + * If sending fails, the transfer cannot proceed. sendChunk() sets the state to completed and + * throws a TransferAbortedException. + */ + final void sendChunk(VersionedChunk chunk) throws TransferAbortedException { lastChunkSent = chunk; if (shouldAbortCallback.getAsBoolean()) { logger.atWarning().log("Abort signal received."); - cleanUp(new TransferError(sessionId, Status.ABORTED)); - return false; + changeState(new Completed(new TransferError(this, Status.ABORTED))); + throw new TransferAbortedException(); } try { + logger.atFinest().log("%s sending %s", this, chunk); eventHandler.sendChunk(chunk.toMessage()); } catch (TransferError transferError) { - cleanUp(transferError); - return false; + changeState(new Completed(transferError)); + throw new TransferAbortedException(); } - return true; - } - - /** Performs final cleanup of a completed transfer. No packets are sent to the server. */ - private void cleanUp(Status status) { - eventHandler.unregisterTransfer(sessionId); - - logger.atInfo().log("Transfer %d completed with status %s", sessionId, status); - if (status.ok()) { - setFutureResult(); - } else { - future.setException(new TransferError(sessionId, status)); - } - isCleanedUp = true; - } - - /** Finishes the transfer due to an exception. No packets are sent to the server. */ - final void cleanUp(TransferError exception) { - eventHandler.unregisterTransfer(sessionId); - - logger.atWarning().withCause(exception).log("Transfer %d terminated with exception", sessionId); - future.setException(exception); - isCleanedUp = true; } /** Sends a status chunk to the server and finishes the transfer. */ - final void setStateCompletedAndSendFinalChunk(Status status) { - setState(new Completed()); - - logger.atFine().log("Sending final chunk for transfer %d with status %s", sessionId, status); - // If the transfer was completed due to concurrently handling chunks, don't send. - if (isCleanedUp) { - logger.atFine().log("Skipping sending final chunk on already-completed transfer"); - return; - } - - // Only call finish() if the sendChunk was successful. If it wasn't, the exception would have - // already terminated the transfer. - if (sendChunk(newChunk(Chunk.Type.COMPLETION).setStatus(status).build())) { - cleanUp(status); + final void setStateTerminatingAndSendFinalChunk(Status status) throws TransferAbortedException { + logger.atFine().log("%s sending final chunk with status %s", this, status); + sendChunk(newChunk(Chunk.Type.COMPLETION).setStatus(status).build()); + if (configuredProtocolVersion == ProtocolVersion.VERSION_TWO) { + changeState(new Terminating(status)); + } else { + changeState(new Completed(status)); } } @@ -308,9 +313,9 @@ abstract class Transfer<T> { long durationNanos = Duration.between(startTime, Instant.now()).toNanos(); long totalRate = durationNanos == 0 ? 0 : (bytesSent * 1_000_000_000 / durationNanos); - logger.atFine().log("Transfer %d progress: " + logger.atFine().log("%s progress: " + "%5.1f%% (%d B sent, %d B confirmed received of %s B total) at %d B/s", - sessionId, + this, progress.percentReceived(), bytesSent, bytesConfirmedReceived, @@ -318,67 +323,135 @@ abstract class Transfer<T> { totalRate); } - /** Represents a state in the transfer state machine. */ - abstract class State { - private String getName() { - return getClass().getSimpleName(); - } - - /** Called to handle a non-final chunk for this transfer. */ - void handleDataChunk(VersionedChunk chunk) { - logger.atFine().log( - "Transfer %d [%s state]: Received unexpected data chunk", getId(), getName()); - } + interface State { + /** + * Called to handle a non-final chunk for this transfer. + */ + void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException; + + /** + * Called to handle the final chunk for this transfer. + */ + void handleFinalChunk(int statusCode) throws TransferAbortedException; + + /** + * Called when this transfer's deadline expires. + */ + void handleTimeout() throws TransferAbortedException; + + /** + * Called if the transfer is cancelled by the user. + */ + void handleCancellation(); + + /** + * Called when the transfer thread is shutting down. + */ + void handleTermination(); + } - /** Called to handle the final chunk for this transfer. */ - void handleFinalChunk(int statusCode) { + /** Represents an active state in the transfer state machine. */ + abstract class ActiveState implements State { + @Override + public final void handleFinalChunk(int statusCode) throws TransferAbortedException { Status status = Status.fromCode(statusCode); - if (status != null) { - cleanUp(status); - setState(new Completed()); - } else { - logger.atWarning().log("Received invalid status value %d", statusCode); - setStateCompletedAndSendFinalChunk(Status.INVALID_ARGUMENT); + if (status == null) { + logger.atWarning().log("Received invalid status value %d, using INTERNAL", statusCode); + status = Status.INTERNAL; } + + // If this is not version 2, immediately clean up. If it is, send the COMPLETION_ACK first and + // clean up if that succeeded. + if (configuredProtocolVersion == ProtocolVersion.VERSION_TWO) { + sendChunk(newChunk(Chunk.Type.COMPLETION_ACK).build()); + } + changeState(new Completed(status)); } - /** Called when this transfer's deadline expires. */ - void handleTimeout() { - logger.atFine().log("Transfer %d [%s state]: Ignoring timeout", getId(), getName()); + /** Enters the recovery state and returns to this state if recovery succeeds. */ + @Override + public void handleTimeout() throws TransferAbortedException { + changeState(new Recovery(this)).handleTimeout(); } - /** Called if the transfer is cancelled by the user. */ - void handleCancellation() { - setStateCompletedAndSendFinalChunk(Status.CANCELLED); + @Override + public final void handleCancellation() { + try { + setStateTerminatingAndSendFinalChunk(Status.CANCELLED); + } catch (TransferAbortedException e) { + // Transfer was aborted; nothing to do. + } } - /** Called when the transfer thread is shutting down. */ - void handleTermination() { - setStateCompletedAndSendFinalChunk(Status.ABORTED); + @Override + public final void handleTermination() { + try { + setStateTerminatingAndSendFinalChunk(Status.ABORTED); + } catch (TransferAbortedException e) { + // Transfer was aborted; nothing to do. + } } } - private class Inactive extends State {} + private class Initiating extends ActiveState { + @Override + public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException { + assignSessionId(chunk.sessionId()); + + if (chunk.version() == ProtocolVersion.UNKNOWN) { + logger.atWarning().log( + "%s aborting due to unsupported protocol version: %s", Transfer.this, chunk); + setStateTerminatingAndSendFinalChunk(Status.INVALID_ARGUMENT); + return; + } + + changeState(getWaitingForDataState()); + + if (chunk.type() != Chunk.Type.START_ACK) { + logger.atFine().log( + "%s got non-handshake chunk; reverting to legacy protocol", Transfer.this); + configuredProtocolVersion = ProtocolVersion.LEGACY; + state.handleDataChunk(chunk); + return; + } - private class Initiating extends State { - // TODO(hepler): Implement the starting handshake + if (chunk.version().compareTo(desiredProtocolVersion) <= 0) { + configuredProtocolVersion = chunk.version(); + } else { + configuredProtocolVersion = desiredProtocolVersion; + } + + logger.atFine().log("%s negotiated protocol %s (ours=%s, theirs=%s)", + Transfer.this, + configuredProtocolVersion, + desiredProtocolVersion, + chunk.version()); + + VersionedChunk.Builder startAckConfirmation = newChunk(Chunk.Type.START_ACK_CONFIRMATION); + prepareInitialChunk(startAckConfirmation); + sendChunk(startAckConfirmation.build()); + } } /** Recovering from an expired timeout. */ - class Recovery extends State { + class Recovery extends ActiveState { + private final State nextState; private int retries; + Recovery(State nextState) { + this.nextState = nextState; + } + @Override - void handleDataChunk(VersionedChunk chunk) { - setState(getWaitingForDataState()); - state.handleDataChunk(chunk); + public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException { + changeState(nextState).handleDataChunk(chunk); } @Override - void handleTimeout() { + public void handleTimeout() throws TransferAbortedException { if (retries < maxRetries) { - logger.atFiner().log("Transfer %d received no chunks for %d ms; retrying %d/%d", - getId(), + logger.atFiner().log("%s received no chunks for %d ms; retrying %d/%d", + Transfer.this, timeoutMillis, retries, maxRetries); @@ -388,20 +461,68 @@ abstract class Transfer<T> { return; } - setStateCompletedAndSendFinalChunk(Status.DEADLINE_EXCEEDED); + // If the transfer timed out, skip to the completed state. Don't send any more packets. + changeState(new Completed(Status.DEADLINE_EXCEEDED)); } } /** Transfer completed. Do nothing if the transfer is terminated or cancelled. */ - class Completed extends State { - Completed() { + class Terminating extends ActiveState { + private final Status status; + + Terminating(Status status) { + this.status = status; + } + + @Override + public void handleDataChunk(VersionedChunk chunk) { + if (chunk.type() == Chunk.Type.COMPLETION_ACK) { + changeState(new Completed(status)); + } + } + } + + class Completed implements State { + /** Performs final cleanup of a completed transfer. No packets are sent to the server. */ + Completed(Status status) { + cleanUp(); + logger.atInfo().log("%s completed with status %s", Transfer.this, status); + if (status.ok()) { + setFutureResult(); + } else { + future.setException(new TransferError(Transfer.this, status)); + } + } + + /** Finishes the transfer due to an exception. No packets are sent to the server. */ + Completed(TransferError exception) { + cleanUp(); + logger.atWarning().withCause(exception).log("%s terminated with exception", Transfer.this); + future.setException(exception); + } + + private void cleanUp() { deadline = NO_TIMEOUT; + eventHandler.unregisterTransfer(Transfer.this); + } + + @Override + public void handleDataChunk(VersionedChunk chunk) { + logger.atFiner().log("%s [Completed state]: Received unexpected data chunk", Transfer.this); } @Override - void handleTermination() {} + public void handleFinalChunk(int statusCode) { + logger.atFiner().log("%s [Completed state]: Received unexpected data chunk", Transfer.this); + } + + @Override + public void handleTimeout() {} + + @Override + public void handleTermination() {} @Override - void handleCancellation() {} + public void handleCancellation() {} } } diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferClient.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferClient.java index d6c1d1499..d1d81fdb5 100644 --- a/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferClient.java +++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferClient.java @@ -37,6 +37,8 @@ public class TransferClient { private final TransferEventHandler transferEventHandler; private final Thread transferEventHandlerThread; + private ProtocolVersion desiredProtocolVersion = ProtocolVersion.LEGACY; + /** * Creates a new transfer client for sending and receiving data with pw_transfer. * @@ -47,6 +49,25 @@ public class TransferClient { * @param initialTransferTimeoutMillis How long to wait for the initial communication from the * server. If the server delays longer than this, retry up to maxRetries times. * @param maxRetries How many times to retry if a communication times out. + */ + public TransferClient(MethodClient readMethod, + MethodClient writeMethod, + int transferTimeoutMillis, + int initialTransferTimeoutMillis, + int maxRetries) { + this(readMethod, + writeMethod, + transferTimeoutMillis, + initialTransferTimeoutMillis, + maxRetries, + () + -> false, // Never abort + TransferEventHandler::run); + } + + /** + * Creates a new transfer client with a callback that can be used to terminate transfers. + * * @param shouldAbortCallback BooleanSupplier that returns true if a transfer should be aborted. */ public TransferClient(MethodClient readMethod, @@ -55,13 +76,30 @@ public class TransferClient { int initialTransferTimeoutMillis, int maxRetries, BooleanSupplier shouldAbortCallback) { + this(readMethod, + writeMethod, + transferTimeoutMillis, + initialTransferTimeoutMillis, + maxRetries, + shouldAbortCallback, + TransferEventHandler::run); + } + + /** Constructor exposed to package for test use only. */ + TransferClient(MethodClient readMethod, + MethodClient writeMethod, + int transferTimeoutMillis, + int initialTransferTimeoutMillis, + int maxRetries, + BooleanSupplier shouldAbortCallback, + Consumer<TransferEventHandler> runFunction) { this.transferTimeoutMillis = transferTimeoutMillis; this.initialTransferTimeoutMillis = initialTransferTimeoutMillis; this.maxRetries = maxRetries; this.shouldAbortCallback = shouldAbortCallback; transferEventHandler = new TransferEventHandler(readMethod, writeMethod); - transferEventHandlerThread = new Thread(transferEventHandler::run); + transferEventHandlerThread = new Thread(() -> runFunction.accept(transferEventHandler)); transferEventHandlerThread.start(); } @@ -81,6 +119,7 @@ public class TransferClient { public ListenableFuture<Void> write( int resourceId, byte[] data, Consumer<TransferProgress> progressCallback) { return transferEventHandler.startWriteTransferAsClient(resourceId, + desiredProtocolVersion, transferTimeoutMillis, initialTransferTimeoutMillis, maxRetries, @@ -111,6 +150,7 @@ public class TransferClient { public ListenableFuture<byte[]> read( int resourceId, TransferParameters parameters, Consumer<TransferProgress> progressCallback) { return transferEventHandler.startReadTransferAsClient(resourceId, + desiredProtocolVersion, transferTimeoutMillis, initialTransferTimeoutMillis, maxRetries, @@ -119,6 +159,20 @@ public class TransferClient { shouldAbortCallback); } + /** + * Sets the protocol version to request for future transfers + * + * Does not affect ongoing transfers. Version cannot be set to UNKNOWN! + * + * @throws IllegalArgumentException if the protocol version is UNKNOWN + */ + public void setProtocolVersion(ProtocolVersion version) { + if (version == ProtocolVersion.UNKNOWN) { + throw new IllegalArgumentException("Cannot set protocol version to UNKNOWN!"); + } + desiredProtocolVersion = version; + } + /** Stops the background thread and waits until it terminates. */ public void close() throws InterruptedException { transferEventHandler.stop(); diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferError.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferError.java index fe448445c..c059c05c7 100644 --- a/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferError.java +++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferError.java @@ -26,9 +26,8 @@ public class TransferError extends Exception { error = Status.UNKNOWN; } - TransferError(int id, Status error) { - this(String.format(Locale.ENGLISH, "Transfer %d failed with status %s", id, error.name()), - error); + TransferError(Transfer<?> transfer, Status error) { + this(String.format(Locale.ENGLISH, "%s failed with status %s", transfer, error.name()), error); } TransferError(String msg, Status error) { diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferEventHandler.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferEventHandler.java index 35bba8886..e8d045812 100644 --- a/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferEventHandler.java +++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferEventHandler.java @@ -52,8 +52,9 @@ class TransferEventHandler { private final BlockingQueue<Event> events = new LinkedBlockingQueue<>(); - // Maps resource ID to transfer - private final Map<Integer, Transfer<?>> transfers = new HashMap<>(); + // Map resource ID to transfer, and session ID to resource ID. + private final Map<Integer, Transfer<?>> resourceIdToTransfer = new HashMap<>(); + private final Map<Integer, Integer> sessionToResourceId = new HashMap<>(); @Nullable private Call.ClientStreaming<Chunk> readStream = null; @Nullable private Call.ClientStreaming<Chunk> writeStream = null; @@ -65,6 +66,7 @@ class TransferEventHandler { } ListenableFuture<Void> startWriteTransferAsClient(int resourceId, + ProtocolVersion desiredProtocolVersion, int transferTimeoutMillis, int initialTransferTimeoutMillis, int maxRetries, @@ -72,6 +74,7 @@ class TransferEventHandler { Consumer<TransferProgress> progressCallback, BooleanSupplier shouldAbortCallback) { WriteTransfer transfer = new WriteTransfer(resourceId, + desiredProtocolVersion, new TransferInterface() { @Override Call.ClientStreaming<Chunk> getStream() throws ChannelOutputException { @@ -97,6 +100,7 @@ class TransferEventHandler { } ListenableFuture<byte[]> startReadTransferAsClient(int resourceId, + ProtocolVersion desiredProtocolVersion, int transferTimeoutMillis, int initialTransferTimeoutMillis, int maxRetries, @@ -104,6 +108,7 @@ class TransferEventHandler { Consumer<TransferProgress> progressCallback, BooleanSupplier shouldAbortCallback) { ReadTransfer transfer = new ReadTransfer(resourceId, + desiredProtocolVersion, new TransferInterface() { @Override Call.ClientStreaming<Chunk> getStream() throws ChannelOutputException { @@ -130,12 +135,14 @@ class TransferEventHandler { private void startTransferAsClient(Transfer<?> transfer) { enqueueEvent(() -> { - if (transfers.put(transfer.getResourceId(), transfer) != null) { - transfer.cleanUp(new TransferError("A transfer for resource ID " + transfer.getResourceId() + if (resourceIdToTransfer.containsKey(transfer.getResourceId())) { + transfer.terminate(new TransferError("A transfer for resource ID " + + transfer.getResourceId() + " is already in progress! Only one read/write transfer per resource is supported at a time", Status.ALREADY_EXISTS)); return; } + resourceIdToTransfer.put(transfer.getResourceId(), transfer); transfer.start(); }); } @@ -144,14 +151,34 @@ class TransferEventHandler { void run() { while (processEvents) { handleNextEvent(); + handleTimeouts(); } } + /** + * Test version of run() that processes all enqueued events before checking for timeouts. + * + * Tests that need to time out should process all enqueued events first to prevent flaky failures. + * If handling one of several queued packets takes longer than the timeout (which must be short + * for a unit test), then the test may fail spuriously. + * + * This run function is not used outside of tests because processing all incoming packets before + * checking for timeouts could delay the transfer client's outgoing write packets if there are + * lots of inbound packets. This could delay transfers and cause unnecessary timeouts. + */ + void runForTestsThatMustTimeOut() { + while (processEvents) { + while (!events.isEmpty()) { + handleNextEvent(); + } + handleTimeouts(); + } + } /** Stops the transfer event handler from processing events. */ void stop() { enqueueEvent(() -> { logger.atFine().log("Terminating TransferEventHandler"); - transfers.values().forEach(Transfer::handleTermination); + resourceIdToTransfer.values().forEach(Transfer::handleTermination); processEvents = false; }); } @@ -186,17 +213,19 @@ class TransferEventHandler { event.handle(); } } catch (InterruptedException e) { - // If interrupted, check for timeouts anyway. + // If interrupted, continue around the loop. } + } - for (Transfer<?> transfer : transfers.values()) { + private void handleTimeouts() { + for (Transfer<?> transfer : resourceIdToTransfer.values()) { transfer.handleTimeoutIfDeadlineExceeded(); } } private Instant getNextTimeout() { Optional<Transfer<?>> transfer = - transfers.values().stream().min(Comparator.comparing(Transfer::getDeadline)); + resourceIdToTransfer.values().stream().min(Comparator.comparing(Transfer::getDeadline)); return transfer.isPresent() ? transfer.get().getDeadline() : Transfer.NO_TIMEOUT; } @@ -207,7 +236,7 @@ class TransferEventHandler { /** * Sends the provided transfer chunk. * - * Must be called on the transfer therad. + * Must be called on the transfer thread. */ void sendChunk(Chunk chunk) throws TransferError { try { @@ -218,12 +247,22 @@ class TransferEventHandler { } /** + * Associates the transfer's session ID with its resource ID. + * + * Must be called on the transfer thread. + */ + void assignSessionId(Transfer<?> transfer) { + sessionToResourceId.put(transfer.getSessionId(), transfer.getResourceId()); + } + + /** * Removes this transfer from the list of active transfers. * - * Must be called on the transfer therad. + * Must be called on the transfer thread. */ - void unregisterTransfer(int sessionId) { - transfers.remove(sessionId); + void unregisterTransfer(Transfer<?> transfer) { + resourceIdToTransfer.remove(transfer.getResourceId()); + sessionToResourceId.remove(transfer.getSessionId()); } /** @@ -246,13 +285,21 @@ class TransferEventHandler { VersionedChunk chunk = VersionedChunk.fromMessage(chunkProto); enqueueEvent(() -> { - Transfer<?> transfer = transfers.get(chunk.sessionId()); + Transfer<?> transfer = null; + if (chunk.resourceId().isPresent()) { + transfer = resourceIdToTransfer.get(chunk.resourceId().getAsInt()); + } else { + Integer resourceId = sessionToResourceId.get(chunk.sessionId()); + if (resourceId != null) { + transfer = resourceIdToTransfer.get(resourceId); + } + } + if (transfer != null) { - logger.atFinest().log( - "Transfer %d received chunk: %s", transfer.getId(), chunkToString(chunkProto)); + logger.atFinest().log("%s received chunk: %s", transfer, chunk); transfer.handleChunk(chunk); } else { - logger.atWarning().log("Ignoring unrecognized transfer session ID %d", chunk.sessionId()); + logger.atInfo().log("Ignoring unrecognized transfer chunk: %s", chunk); } }); } @@ -268,7 +315,7 @@ class TransferEventHandler { resetStream(); // The transfers remove themselves from the Map during cleanup, iterate over a copied list. - List<Transfer<?>> activeTransfers = new ArrayList<>(transfers.values()); + List<Transfer<?>> activeTransfers = new ArrayList<>(resourceIdToTransfer.values()); // FAILED_PRECONDITION indicates that the stream packet was not recognized as the stream is // not open. This could occur if the server resets. Notify pending transfers that this has @@ -278,7 +325,7 @@ class TransferEventHandler { } else { TransferError error = new TransferError( "Transfer stream RPC closed unexpectedly with status " + status, Status.INTERNAL); - activeTransfers.forEach(t -> t.cleanUp(error)); + activeTransfers.forEach(t -> t.terminate(error)); } }); } @@ -286,42 +333,6 @@ class TransferEventHandler { abstract void resetStream(); } - private static String chunkToString(Chunk chunk) { - StringBuilder str = new StringBuilder(); - str.append("transferId:").append(chunk.getTransferId()).append(" "); - str.append("sessionId:").append(chunk.getSessionId()).append(" "); - str.append("resourceId:").append(chunk.getResourceId()).append(" "); - str.append("windowEndOffset:").append(chunk.getWindowEndOffset()).append(" "); - str.append("offset:").append(chunk.getOffset()).append(" "); - // Don't include the actual data; it's too much. - str.append("len(data):").append(chunk.getData().size()).append(" "); - if (chunk.hasPendingBytes()) { - str.append("pendingBytes:").append(chunk.getPendingBytes()).append(" "); - } - if (chunk.hasMaxChunkSizeBytes()) { - str.append("maxChunkSizeBytes:").append(chunk.getMaxChunkSizeBytes()).append(" "); - } - if (chunk.hasMinDelayMicroseconds()) { - str.append("minDelayMicroseconds:").append(chunk.getMinDelayMicroseconds()).append(" "); - } - if (chunk.hasRemainingBytes()) { - str.append("remainingBytes:").append(chunk.getRemainingBytes()).append(" "); - } - if (chunk.hasStatus()) { - str.append("status:").append(chunk.getStatus()).append(" "); - } - if (chunk.hasType()) { - str.append("type:").append(chunk.getTypeValue()).append(" "); - } - if (chunk.hasResourceId()) { - str.append("resourceId:").append(chunk.getSessionId()).append(" "); - } - if (chunk.hasSessionId()) { - str.append("sessionId:").append(chunk.getSessionId()).append(" "); - } - return str.toString(); - } - // Represents an event that occurs during a transfer private interface Event { void handle(); diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/VersionedChunk.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/VersionedChunk.java index c4c520ac8..2a253382c 100644 --- a/pw_transfer/java/main/dev/pigweed/pw_transfer/VersionedChunk.java +++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/VersionedChunk.java @@ -119,13 +119,18 @@ abstract class VersionedChunk { builder.setType(Chunk.Type.PARAMETERS_RETRANSMIT); } - if (version == ProtocolVersion.LEGACY && chunk.hasStatus()) { - builder.setType(Chunk.Type.COMPLETION); + // For legacy chunks, use the transfer ID as both the resource and session IDs. + if (version == ProtocolVersion.LEGACY) { + builder.setSessionId(chunk.getTransferId()); + builder.setResourceId(chunk.getTransferId()); + if (chunk.hasStatus()) { + builder.setType(Chunk.Type.COMPLETION); + } + } else { + builder.setSessionId(chunk.getSessionId()); } - builder.setSessionId(chunk.hasSessionId() ? chunk.getSessionId() : chunk.getTransferId()) - .setOffset((int) chunk.getOffset()) - .setData(chunk.getData()); + builder.setOffset((int) chunk.getOffset()).setData(chunk.getData()); if (chunk.hasResourceId()) { builder.setResourceId(chunk.getResourceId()); @@ -162,7 +167,6 @@ abstract class VersionedChunk { public Chunk toMessage() { Chunk.Builder chunk = Chunk.newBuilder() .setType(type()) - .setSessionId(sessionId()) .setOffset(offset()) .setWindowEndOffset(windowEndOffset()) .setData(data()); @@ -173,19 +177,19 @@ abstract class VersionedChunk { minDelayMicroseconds().ifPresent(chunk::setMinDelayMicroseconds); status().ifPresent(chunk::setStatus); + // session_id did not exist in the legacy protocol, so don't send it. + if (version() != ProtocolVersion.LEGACY && sessionId() != UNASSIGNED_SESSION_ID) { + chunk.setSessionId(sessionId()); + } + if (shouldEncodeLegacyFields()) { - chunk.setTransferId(resourceId().orElse(chunk.getSessionId())); + chunk.setTransferId(resourceId().orElse(sessionId())); if (chunk.getWindowEndOffset() != 0) { chunk.setPendingBytes(chunk.getWindowEndOffset() - offset()); } } - if (version() == ProtocolVersion.LEGACY) { - // session_id did not exist in the legacy protocol, so don't send it. - chunk.clearSessionId(); - } - if (isInitialHandshakeChunk()) { chunk.setProtocolVersion(version().ordinal()); } diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/WriteTransfer.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/WriteTransfer.java index d1386c0b2..fc16bb6d9 100644 --- a/pw_transfer/java/main/dev/pigweed/pw_transfer/WriteTransfer.java +++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/WriteTransfer.java @@ -38,6 +38,7 @@ class WriteTransfer extends Transfer<Void> { private final byte[] data; protected WriteTransfer(int resourceId, + ProtocolVersion desiredProtocolVersion, TransferInterface transferManager, int timeoutMillis, int initialTimeoutMillis, @@ -46,6 +47,7 @@ class WriteTransfer extends Transfer<Void> { Consumer<TransferProgress> progressCallback, BooleanSupplier shouldAbortCallback) { super(resourceId, + desiredProtocolVersion, transferManager, timeoutMillis, initialTimeoutMillis, @@ -56,10 +58,8 @@ class WriteTransfer extends Transfer<Void> { } @Override - VersionedChunk getInitialChunk(ProtocolVersion desiredProcotolVersion) { - return VersionedChunk.createInitialChunk(desiredProcotolVersion, getResourceId()) - .setRemainingBytes(data.length) - .build(); + void prepareInitialChunk(VersionedChunk.Builder chunk) { + chunk.setRemainingBytes(data.length); } @Override @@ -67,22 +67,15 @@ class WriteTransfer extends Transfer<Void> { return new WaitingForTransferParameters(); } - private class WaitingForTransferParameters extends State { + private class WaitingForTransferParameters extends ActiveState { @Override - void handleTimeout() { - Recovery recoveryState = new Recovery(); - setState(recoveryState); - recoveryState.handleTimeout(); - } - - @Override - void handleDataChunk(VersionedChunk chunk) { + public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException { updateTransferParameters(chunk); } } /** Transmitting a transfer window. */ - private class Transmitting extends State { + private class Transmitting extends ActiveState { private final int windowStartOffset; private final int windowEndOffset; @@ -92,26 +85,23 @@ class WriteTransfer extends Transfer<Void> { } @Override - void handleDataChunk(VersionedChunk chunk) { + public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException { updateTransferParameters(chunk); } @Override - void handleTimeout() { + public void handleTimeout() throws TransferAbortedException { ByteString chunkData = ByteString.copyFrom( data, sentOffset, min(windowEndOffset - sentOffset, maxChunkSizeBytes)); - logger.atFiner().log("Transfer %d: sending bytes %d-%d (%d B chunk, max size %d B)", - getId(), + logger.atFiner().log("%s sending bytes %d-%d (%d B chunk, max size %d B)", + WriteTransfer.this, sentOffset, sentOffset + chunkData.size() - 1, chunkData.size(), maxChunkSizeBytes); - if (!sendChunk(buildDataChunk(chunkData))) { - setState(new Completed()); - return; - } + sendChunk(buildDataChunk(chunkData)); sentOffset += chunkData.size(); updateProgress(sentOffset, windowStartOffset, data.length); @@ -121,7 +111,7 @@ class WriteTransfer extends Transfer<Void> { return; // Keep transmitting packets } setNextChunkTimeout(); - setState(new WaitingForTransferParameters()); + changeState(new WaitingForTransferParameters()); } } @@ -139,15 +129,11 @@ class WriteTransfer extends Transfer<Void> { getFuture().set(null); } - private void updateTransferParameters(VersionedChunk chunk) { - logger.atFiner().log("Transfer %d received new chunk (type=%s, offset=%d, windowEndOffset=%d)", - getId(), - chunk.type(), - chunk.offset(), - chunk.windowEndOffset()); + private void updateTransferParameters(VersionedChunk chunk) throws TransferAbortedException { + logger.atFiner().log("%s received new chunk %s", this, chunk); if (chunk.offset() > data.length) { - setStateCompletedAndSendFinalChunk(Status.OUT_OF_RANGE); + setStateTerminatingAndSendFinalChunk(Status.OUT_OF_RANGE); return; } @@ -156,15 +142,15 @@ class WriteTransfer extends Transfer<Void> { long droppedBytes = sentOffset - chunk.offset(); if (droppedBytes > 0) { totalDroppedBytes += droppedBytes; - logger.atFine().log("Transfer %d retransmitting %d B (%d retransmitted of %d sent)", - getId(), + logger.atFine().log("%s retransmitting %d B (%d retransmitted of %d sent)", + this, droppedBytes, totalDroppedBytes, sentOffset); } sentOffset = chunk.offset(); } else if (windowEndOffset <= sentOffset) { - logger.atFiner().log("Transfer %d: ignoring old rolling window packet", getId()); + logger.atFiner().log("%s ignoring old rolling window packet", this); setNextChunkTimeout(); return; // Received an old rolling window packet, ignore it. } @@ -179,17 +165,16 @@ class WriteTransfer extends Transfer<Void> { if (maxChunkSizeBytes == 0) { if (windowEndOffset == sentOffset) { - logger.atWarning().log("Server requested 0 bytes in write transfer %d; aborting", getId()); - setStateCompletedAndSendFinalChunk(Status.INVALID_ARGUMENT); + logger.atWarning().log("%s server requested 0 bytes; aborting", this); + setStateTerminatingAndSendFinalChunk(Status.INVALID_ARGUMENT); return; } // Default to sending the entire window if the max chunk size is not specified (or is 0). maxChunkSizeBytes = windowEndOffset - sentOffset; } - Transmitting transmittingState = new Transmitting(chunk.offset(), windowEndOffset); - setState(transmittingState); - transmittingState.handleTimeout(); // Immediately send the first packet + // Enter the transmitting state and immediately send the first packet + changeState(new Transmitting(chunk.offset(), windowEndOffset)).handleTimeout(); } private VersionedChunk buildDataChunk(ByteString chunkData) { @@ -198,7 +183,7 @@ class WriteTransfer extends Transfer<Void> { // If this is the last data chunk, setRemainingBytes to 0. if (sentOffset + chunkData.size() == data.length) { - logger.atFiner().log("Transfer %d sending final chunk with %d B", getId(), chunkData.size()); + logger.atFiner().log("%s sending final chunk with %d B", this, chunkData.size()); chunk.setRemainingBytes(0); } return chunk.build(); diff --git a/pw_transfer/java/test/dev/pigweed/pw_transfer/BUILD.bazel b/pw_transfer/java/test/dev/pigweed/pw_transfer/BUILD.bazel index a4b2955a0..1a0f8110a 100644 --- a/pw_transfer/java/test/dev/pigweed/pw_transfer/BUILD.bazel +++ b/pw_transfer/java/test/dev/pigweed/pw_transfer/BUILD.bazel @@ -20,7 +20,6 @@ java_test( name = "TransferClientTest", size = "small", srcs = ["TransferClientTest.java"], - tags = ["manual"], test_class = "dev.pigweed.pw_transfer.TransferClientTest", visibility = ["//visibility:public"], deps = [ diff --git a/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java b/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java index 78c2bb566..6c19b2939 100644 --- a/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java +++ b/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java @@ -51,7 +51,6 @@ public final class TransferClientTest { private static final TransferParameters TRANSFER_PARAMETERS = TransferParameters.create(50, 30, 0); private static final int MAX_RETRIES = 2; - private static final int ID = 123; private boolean shouldAbortFlag = false; private TestClient rpcClient; @@ -63,9 +62,6 @@ public final class TransferClientTest { @Before public void setup() { rpcClient = new TestClient(ImmutableList.of(TransferService.get())); - - // Default to a long timeout that should never trigger. - transferClient = createTransferClient(60000, 60000); } @After @@ -78,43 +74,50 @@ public final class TransferClientTest { } @Test - public void read_singleChunk_successful() throws Exception { + public void legacy_read_singleChunk_successful() throws Exception { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); ListenableFuture<byte[]> future = transferClient.read(1); assertThat(future.isDone()).isFalse(); - receiveReadChunks( - newChunk(Chunk.Type.DATA, 1).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0)); + receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 1) + .setOffset(0) + .setData(TEST_DATA_SHORT) + .setRemainingBytes(0)); assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray()); } @Test - public void read_failedPreconditionError_retriesInitialPacket() throws Exception { + public void legacy_read_failedPreconditionError_retriesInitialPacket() throws Exception { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); ListenableFuture<byte[]> future = transferClient.read(1, TRANSFER_PARAMETERS); - assertThat(lastChunks()).containsExactly(initialReadChunk(1)); + assertThat(lastChunks()).containsExactly(initialReadChunk(1, ProtocolVersion.LEGACY)); receiveReadServerError(Status.FAILED_PRECONDITION); - assertThat(lastChunks()).containsExactly(initialReadChunk(1)); + assertThat(lastChunks()).containsExactly(initialReadChunk(1, ProtocolVersion.LEGACY)); - receiveReadChunks( - newChunk(Chunk.Type.DATA, 1).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0)); + receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 1) + .setOffset(0) + .setData(TEST_DATA_SHORT) + .setRemainingBytes(0)); assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray()); } @Test - public void read_failedPreconditionError_abortsAfterInitialPacket() { + public void legacy_read_failedPreconditionError_abortsAfterInitialPacket() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); TransferParameters params = TransferParameters.create(50, 50, 0); ListenableFuture<byte[]> future = transferClient.read(1, params); - assertThat(lastChunks()).containsExactly(initialReadChunk(1, params)); + assertThat(lastChunks()).containsExactly(initialReadChunk(1, ProtocolVersion.LEGACY, params)); - receiveReadChunks(dataChunk(1, TEST_DATA_100B, 0, 50)); + receiveReadChunks(legacyDataChunk(1, TEST_DATA_100B, 0, 50)); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 1) + .containsExactly(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 1) .setOffset(50) .setPendingBytes(50) .setWindowEndOffset(100) @@ -129,14 +132,15 @@ public final class TransferClientTest { } @Test - public void read_failedPreconditionErrorMaxRetriesTimes_aborts() { + public void legacy_read_failedPreconditionErrorMaxRetriesTimes_aborts() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); ListenableFuture<byte[]> future = transferClient.read(1, TRANSFER_PARAMETERS); for (int i = 0; i < MAX_RETRIES; ++i) { receiveReadServerError(Status.FAILED_PRECONDITION); } - Chunk initialChunk = initialReadChunk(1); + Chunk initialChunk = initialReadChunk(1, ProtocolVersion.LEGACY); assertThat(lastChunks()) .containsExactlyElementsIn(Collections.nCopies(1 + MAX_RETRIES, initialChunk)); @@ -150,94 +154,126 @@ public final class TransferClientTest { } @Test - public void read_singleChunk_ignoresUnknownIdOrWriteChunks() throws Exception { + public void legacy_read_singleChunk_ignoresUnknownIdOrWriteChunks() throws Exception { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); ListenableFuture<byte[]> future = transferClient.read(1); assertThat(future.isDone()).isFalse(); - receiveReadChunks(finalChunk(2, Status.OK), - newChunk(Chunk.Type.DATA, 0).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0), - newChunk(Chunk.Type.DATA, 3).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0)); - receiveWriteChunks(finalChunk(1, Status.OK), - newChunk(Chunk.Type.DATA, 1).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0), - newChunk(Chunk.Type.DATA, 2).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0)); + receiveReadChunks(legacyFinalChunk(2, Status.OK), + newLegacyChunk(Chunk.Type.DATA, 0) + .setOffset(0) + .setData(TEST_DATA_100B) + .setRemainingBytes(0), + newLegacyChunk(Chunk.Type.DATA, 3) + .setOffset(0) + .setData(TEST_DATA_100B) + .setRemainingBytes(0)); + receiveWriteChunks(legacyFinalChunk(1, Status.OK), + newLegacyChunk(Chunk.Type.DATA, 1) + .setOffset(0) + .setData(TEST_DATA_100B) + .setRemainingBytes(0), + newLegacyChunk(Chunk.Type.DATA, 2) + .setOffset(0) + .setData(TEST_DATA_100B) + .setRemainingBytes(0)); assertThat(future.isDone()).isFalse(); - receiveReadChunks( - newChunk(Chunk.Type.DATA, 1).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0)); + receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 1) + .setOffset(0) + .setData(TEST_DATA_SHORT) + .setRemainingBytes(0)); assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray()); } @Test - public void read_empty() throws Exception { + public void legacy_read_empty() throws Exception { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); ListenableFuture<byte[]> future = transferClient.read(2); lastChunks(); // Discard initial chunk (tested elsewhere) - receiveReadChunks(newChunk(Chunk.Type.DATA, 2).setRemainingBytes(0)); + receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 2).setRemainingBytes(0)); - assertThat(lastChunks()).containsExactly(finalChunk(2, Status.OK)); + assertThat(lastChunks()).containsExactly(legacyFinalChunk(2, Status.OK)); assertThat(future.get()).isEqualTo(new byte[] {}); } @Test - public void read_sendsTransferParametersFirst() { + public void legacy_read_sendsTransferParametersFirst() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); TransferParameters params = TransferParameters.create(3, 2, 1); ListenableFuture<byte[]> future = transferClient.read(99, params); - assertThat(lastChunks()).containsExactly(initialReadChunk(99, params)); + assertThat(lastChunks()).containsExactly(initialReadChunk(99, ProtocolVersion.LEGACY, params)); assertThat(future.cancel(true)).isTrue(); } @Test - public void read_severalChunks() throws Exception { - ListenableFuture<byte[]> future = transferClient.read(ID, TRANSFER_PARAMETERS); + public void legacy_read_severalChunks() throws Exception { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); + ListenableFuture<byte[]> future = transferClient.read(123, TRANSFER_PARAMETERS); - assertThat(lastChunks()).containsExactly(initialReadChunk(ID)); + assertThat(lastChunks()).containsExactly(initialReadChunk(123, ProtocolVersion.LEGACY)); - receiveReadChunks( - newChunk(Chunk.Type.DATA, ID).setOffset(0).setData(range(0, 20)).setRemainingBytes(70), - newChunk(Chunk.Type.DATA, ID).setOffset(20).setData(range(20, 40))); + receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123) + .setOffset(0) + .setData(range(0, 20)) + .setRemainingBytes(70), + newLegacyChunk(Chunk.Type.DATA, 123).setOffset(20).setData(range(20, 40))); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID) + .containsExactly(newLegacyChunk(Chunk.Type.PARAMETERS_CONTINUE, 123) .setOffset(40) .setPendingBytes(50) .setMaxChunkSizeBytes(30) .setWindowEndOffset(90) .build()); - receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(40).setData(range(40, 70))); + receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123).setOffset(40).setData(range(40, 70))); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID) + .containsExactly(newLegacyChunk(Chunk.Type.PARAMETERS_CONTINUE, 123) .setOffset(70) .setPendingBytes(50) .setMaxChunkSizeBytes(30) .setWindowEndOffset(120) .build()); - receiveReadChunks( - newChunk(Chunk.Type.DATA, ID).setOffset(70).setData(range(70, 100)).setRemainingBytes(0)); + receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123) + .setOffset(70) + .setData(range(70, 100)) + .setRemainingBytes(0)); - assertThat(lastChunks()).containsExactly(finalChunk(ID, Status.OK)); + assertThat(lastChunks()).containsExactly(legacyFinalChunk(123, Status.OK)); assertThat(future.get()).isEqualTo(TEST_DATA_100B.toByteArray()); } @Test - public void read_progressCallbackIsCalled() { + public void legacy_read_progressCallbackIsCalled() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); ListenableFuture<byte[]> future = - transferClient.read(ID, TRANSFER_PARAMETERS, progressCallback); - - receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(0).setData(range(0, 30)), - newChunk(Chunk.Type.DATA, ID).setOffset(30).setData(range(30, 50)), - newChunk(Chunk.Type.DATA, ID).setOffset(50).setData(range(50, 60)).setRemainingBytes(5), - newChunk(Chunk.Type.DATA, ID).setOffset(60).setData(range(60, 70)), - newChunk(Chunk.Type.DATA, ID).setOffset(70).setData(range(70, 80)).setRemainingBytes(20), - newChunk(Chunk.Type.DATA, ID).setOffset(90).setData(range(90, 100)), - newChunk(Chunk.Type.DATA, ID).setOffset(80).setData(range(80, 100)).setRemainingBytes(0)); + transferClient.read(123, TRANSFER_PARAMETERS, progressCallback); + + receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)), + newLegacyChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50)), + newLegacyChunk(Chunk.Type.DATA, 123) + .setOffset(50) + .setData(range(50, 60)) + .setRemainingBytes(5), + newLegacyChunk(Chunk.Type.DATA, 123).setOffset(60).setData(range(60, 70)), + newLegacyChunk(Chunk.Type.DATA, 123) + .setOffset(70) + .setData(range(70, 80)) + .setRemainingBytes(20), + newLegacyChunk(Chunk.Type.DATA, 123).setOffset(90).setData(range(90, 100)), + newLegacyChunk(Chunk.Type.DATA, 123) + .setOffset(80) + .setData(range(80, 100)) + .setRemainingBytes(0)); verify(progressCallback, times(6)).accept(progress.capture()); assertThat(progress.getAllValues()) @@ -251,72 +287,82 @@ public final class TransferClientTest { } @Test - public void read_rewindWhenPacketsSkipped() throws Exception { - ListenableFuture<byte[]> future = transferClient.read(ID, TRANSFER_PARAMETERS); + public void legacy_read_rewindWhenPacketsSkipped() throws Exception { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); + ListenableFuture<byte[]> future = transferClient.read(123, TRANSFER_PARAMETERS); - assertThat(lastChunks()).containsExactly(initialReadChunk(ID)); + assertThat(lastChunks()).containsExactly(initialReadChunk(123, ProtocolVersion.LEGACY)); - receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(50).setData(range(30, 50))); + receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(30, 50))); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) + .containsExactly(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) .setPendingBytes(50) .setWindowEndOffset(50) .setMaxChunkSizeBytes(30) .setOffset(0) .build()); - receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(0).setData(range(0, 30)), - newChunk(Chunk.Type.DATA, ID).setOffset(30).setData(range(30, 50))); + receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)), + newLegacyChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50))); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID) + .containsExactly(newLegacyChunk(Chunk.Type.PARAMETERS_CONTINUE, 123) .setOffset(30) .setPendingBytes(50) .setWindowEndOffset(80) .setMaxChunkSizeBytes(30) .build()); - receiveReadChunks( - newChunk(Chunk.Type.DATA, ID).setOffset(80).setData(range(80, 100)).setRemainingBytes(0)); + receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123) + .setOffset(80) + .setData(range(80, 100)) + .setRemainingBytes(0)); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) + .containsExactly(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) .setOffset(50) .setPendingBytes(50) .setWindowEndOffset(100) .setMaxChunkSizeBytes(30) .build()); - receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(50).setData(range(50, 80)), - newChunk(Chunk.Type.DATA, ID).setOffset(80).setData(range(80, 100)).setRemainingBytes(0)); + receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(50, 80)), + newLegacyChunk(Chunk.Type.DATA, 123) + .setOffset(80) + .setData(range(80, 100)) + .setRemainingBytes(0)); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID) + .containsExactly(newLegacyChunk(Chunk.Type.PARAMETERS_CONTINUE, 123) .setOffset(80) .setPendingBytes(50) .setWindowEndOffset(130) .setMaxChunkSizeBytes(30) .build(), - finalChunk(ID, Status.OK)); + legacyFinalChunk(123, Status.OK)); assertThat(future.get()).isEqualTo(TEST_DATA_100B.toByteArray()); } @Test - public void read_multipleWithSameId_sequentially_successful() throws Exception { + public void legacy_read_multipleWithSameId_sequentially_successful() throws Exception { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); for (int i = 0; i < 3; ++i) { ListenableFuture<byte[]> future = transferClient.read(1); - receiveReadChunks( - newChunk(Chunk.Type.DATA, 1).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0)); + receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 1) + .setOffset(0) + .setData(TEST_DATA_SHORT) + .setRemainingBytes(0)); assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray()); } } @Test - public void read_multipleWithSameId_atSameTime_failsWithAlreadyExistsError() { + public void legacy_read_multipleWithSameId_atSameTime_failsWithAlreadyExistsError() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); ListenableFuture<byte[]> first = transferClient.read(123); ListenableFuture<byte[]> second = transferClient.read(123); @@ -328,7 +374,8 @@ public final class TransferClientTest { } @Test - public void read_sendErrorOnFirstPacket_fails() { + public void legacy_read_sendErrorOnFirstPacket_fails() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); ChannelOutputException exception = new ChannelOutputException("blah"); rpcClient.setChannelOutputException(exception); ListenableFuture<byte[]> future = transferClient.read(123); @@ -339,15 +386,16 @@ public final class TransferClientTest { } @Test - public void read_sendErrorOnLaterPacket_aborts() { - ListenableFuture<byte[]> future = transferClient.read(ID, TRANSFER_PARAMETERS); + public void legacy_read_sendErrorOnLaterPacket_aborts() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); + ListenableFuture<byte[]> future = transferClient.read(123, TRANSFER_PARAMETERS); - receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(0).setData(range(0, 20))); + receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 20))); ChannelOutputException exception = new ChannelOutputException("blah"); rpcClient.setChannelOutputException(exception); - receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(20).setData(range(20, 50))); + receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123).setOffset(20).setData(range(20, 50))); ExecutionException thrown = assertThrows(ExecutionException.class, future::get); assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); @@ -355,20 +403,22 @@ public final class TransferClientTest { } @Test - public void read_cancelFuture_abortsTransfer() { - ListenableFuture<byte[]> future = transferClient.read(ID, TRANSFER_PARAMETERS); + public void legacy_read_cancelFuture_abortsTransfer() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); + ListenableFuture<byte[]> future = transferClient.read(123, TRANSFER_PARAMETERS); assertThat(future.cancel(true)).isTrue(); - receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(30).setData(range(30, 50))); - assertThat(lastChunks()).contains(finalChunk(ID, Status.CANCELLED)); + receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50))); + assertThat(lastChunks()).contains(legacyFinalChunk(123, Status.CANCELLED)); } @Test - public void read_protocolError_aborts() { - ListenableFuture<byte[]> future = transferClient.read(ID); + public void legacy_read_transferProtocolError_aborts() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); + ListenableFuture<byte[]> future = transferClient.read(123); - receiveReadChunks(finalChunk(ID, Status.ALREADY_EXISTS)); + receiveReadChunks(legacyFinalChunk(123, Status.ALREADY_EXISTS)); ExecutionException thrown = assertThrows(ExecutionException.class, future::get); assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); @@ -377,7 +427,8 @@ public final class TransferClientTest { } @Test - public void read_rpcError_aborts() { + public void legacy_read_rpcError_aborts() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); ListenableFuture<byte[]> future = transferClient.read(2); receiveReadServerError(Status.NOT_FOUND); @@ -387,21 +438,9 @@ public final class TransferClientTest { } @Test - public void read_unknownVersion_invalidArgument() { - ListenableFuture<byte[]> future = transferClient.read(2, TRANSFER_PARAMETERS); - - receiveReadChunks(newChunk(Chunk.Type.DATA, 2).setProtocolVersion(3)); - - ExecutionException exception = assertThrows(ExecutionException.class, future::get); - assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.INVALID_ARGUMENT); - - assertThat(lastChunks()) - .containsExactly(initialReadChunk(2), finalChunk(2, Status.INVALID_ARGUMENT)); - } - @Test - public void read_timeout() { - transferClient = createTransferClient(1, 1); // Create a manager with a very short timeout. - ListenableFuture<byte[]> future = transferClient.read(ID, TRANSFER_PARAMETERS); + public void legacy_read_timeout() { + createTransferClientThatMayTimeOut(ProtocolVersion.LEGACY); + ListenableFuture<byte[]> future = transferClient.read(123, TRANSFER_PARAMETERS); // Call future.get() without sending any server-side packets. ExecutionException exception = assertThrows(ExecutionException.class, future::get); @@ -409,37 +448,38 @@ public final class TransferClientTest { // read should have retried sending the transfer parameters 2 times, for a total of 3 assertThat(lastChunks()) - .containsExactly(initialReadChunk(ID), - initialReadChunk(ID), - initialReadChunk(ID), - finalChunk(ID, Status.DEADLINE_EXCEEDED)); + .containsExactly(initialReadChunk(123, ProtocolVersion.LEGACY), + initialReadChunk(123, ProtocolVersion.LEGACY), + initialReadChunk(123, ProtocolVersion.LEGACY)); } @Test - public void write_singleChunk() throws Exception { + public void legacy_write_singleChunk() throws Exception { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); assertThat(future.isDone()).isFalse(); - receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2) + receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2) .setOffset(0) .setPendingBytes(1024) .setMaxChunkSizeBytes(128), - finalChunk(2, Status.OK)); + legacyFinalChunk(2, Status.OK)); assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. } @Test - public void write_platformTransferDisabled_aborted() { + public void legacy_write_platformTransferDisabled_aborted() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); assertThat(future.isDone()).isFalse(); shouldAbortFlag = true; - receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2) + receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2) .setOffset(0) .setPendingBytes(1024) .setMaxChunkSizeBytes(128), - finalChunk(2, Status.OK)); + legacyFinalChunk(2, Status.OK)); ExecutionException thrown = assertThrows(ExecutionException.class, future::get); assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); @@ -447,36 +487,40 @@ public final class TransferClientTest { } @Test - public void write_failedPreconditionError_retriesInitialPacket() throws Exception { + public void legacy_write_failedPreconditionError_retriesInitialPacket() throws Exception { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); - assertThat(lastChunks()).containsExactly(initialWriteChunk(2, TEST_DATA_SHORT.size())); + assertThat(lastChunks()) + .containsExactly(initialWriteChunk(2, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size())); receiveWriteServerError(Status.FAILED_PRECONDITION); - assertThat(lastChunks()).containsExactly(initialWriteChunk(2, TEST_DATA_SHORT.size())); + assertThat(lastChunks()) + .containsExactly(initialWriteChunk(2, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size())); - receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2) + receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2) .setOffset(0) .setPendingBytes(1024) .setMaxChunkSizeBytes(128), - finalChunk(2, Status.OK)); + legacyFinalChunk(2, Status.OK)); assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. } @Test - public void write_failedPreconditionError_abortsAfterInitialPacket() { + public void legacy_write_failedPreconditionError_abortsAfterInitialPacket() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_100B.toByteArray()); - receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2) + receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2) .setOffset(0) .setPendingBytes(50) .setMaxChunkSizeBytes(50)); assertThat(lastChunks()) - .containsExactly( - initialWriteChunk(2, TEST_DATA_100B.size()), dataChunk(2, TEST_DATA_100B, 0, 50)); + .containsExactly(initialWriteChunk(2, ProtocolVersion.LEGACY, TEST_DATA_100B.size()), + legacyDataChunk(2, TEST_DATA_100B, 0, 50)); receiveWriteServerError(Status.FAILED_PRECONDITION); @@ -486,14 +530,15 @@ public final class TransferClientTest { } @Test - public void write_failedPreconditionErrorMaxRetriesTimes_aborts() { + public void legacy_write_failedPreconditionErrorMaxRetriesTimes_aborts() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); for (int i = 0; i < MAX_RETRIES; ++i) { receiveWriteServerError(Status.FAILED_PRECONDITION); } - Chunk initialChunk = initialWriteChunk(2, TEST_DATA_SHORT.size()); + Chunk initialChunk = initialWriteChunk(2, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size()); assertThat(lastChunks()) .containsExactlyElementsIn(Collections.nCopies(1 + MAX_RETRIES, initialChunk)); @@ -507,47 +552,52 @@ public final class TransferClientTest { } @Test - public void write_empty() throws Exception { + public void legacy_write_empty() throws Exception { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); ListenableFuture<Void> future = transferClient.write(2, new byte[] {}); assertThat(future.isDone()).isFalse(); - receiveWriteChunks(finalChunk(2, Status.OK)); + receiveWriteChunks(legacyFinalChunk(2, Status.OK)); - assertThat(lastChunks()).containsExactly(initialWriteChunk(2, 0)); + assertThat(lastChunks()).containsExactly(initialWriteChunk(2, ProtocolVersion.LEGACY, 0)); assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. } @Test - public void write_severalChunks() throws Exception { - ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_100B.toByteArray()); + public void legacy_write_severalChunks() throws Exception { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); + ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray()); - assertThat(lastChunks()).containsExactly(initialWriteChunk(ID, TEST_DATA_100B.size())); + assertThat(lastChunks()) + .containsExactly(initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_100B.size())); - receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) + receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) .setOffset(0) .setPendingBytes(50) .setMaxChunkSizeBytes(30) .setMinDelayMicroseconds(1)); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.DATA, ID).setOffset(0).setData(range(0, 30)).build(), - newChunk(Chunk.Type.DATA, ID).setOffset(30).setData(range(30, 50)).build()); + .containsExactly( + newLegacyChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)).build(), + newLegacyChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50)).build()); - receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) + receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) .setOffset(50) .setPendingBytes(40) .setMaxChunkSizeBytes(25)); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.DATA, ID).setOffset(50).setData(range(50, 75)).build(), - newChunk(Chunk.Type.DATA, ID).setOffset(75).setData(range(75, 90)).build()); + .containsExactly( + newLegacyChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(50, 75)).build(), + newLegacyChunk(Chunk.Type.DATA, 123).setOffset(75).setData(range(75, 90)).build()); receiveWriteChunks( - newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID).setOffset(90).setPendingBytes(50)); + newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(90).setPendingBytes(50)); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.DATA, ID) + .containsExactly(newLegacyChunk(Chunk.Type.DATA, 123) .setOffset(90) .setData(range(90, 100)) .setRemainingBytes(0) @@ -555,18 +605,20 @@ public final class TransferClientTest { assertThat(future.isDone()).isFalse(); - receiveWriteChunks(finalChunk(ID, Status.OK)); + receiveWriteChunks(legacyFinalChunk(123, Status.OK)); assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. } @Test - public void write_parametersContinue() throws Exception { - ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_100B.toByteArray()); + public void legacy_write_parametersContinue() throws Exception { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); + ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray()); - assertThat(lastChunks()).containsExactly(initialWriteChunk(ID, TEST_DATA_100B.size())); + assertThat(lastChunks()) + .containsExactly(initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_100B.size())); - receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) + receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) .setOffset(0) .setPendingBytes(50) .setWindowEndOffset(50) @@ -574,10 +626,11 @@ public final class TransferClientTest { .setMinDelayMicroseconds(1)); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.DATA, ID).setOffset(0).setData(range(0, 30)).build(), - newChunk(Chunk.Type.DATA, ID).setOffset(30).setData(range(30, 50)).build()); + .containsExactly( + newLegacyChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)).build(), + newLegacyChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50)).build()); - receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID) + receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_CONTINUE, 123) .setOffset(30) .setPendingBytes(50) .setWindowEndOffset(80)); @@ -585,15 +638,15 @@ public final class TransferClientTest { // Transfer doesn't roll back to offset 30 but instead continues sending up to 80. assertThat(lastChunks()) .containsExactly( - newChunk(Chunk.Type.DATA, ID).setOffset(50).setData(range(50, 80)).build()); + newLegacyChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(50, 80)).build()); - receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID) + receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_CONTINUE, 123) .setOffset(80) .setPendingBytes(50) .setWindowEndOffset(130)); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.DATA, ID) + .containsExactly(newLegacyChunk(Chunk.Type.DATA, 123) .setOffset(80) .setData(range(80, 100)) .setRemainingBytes(0) @@ -601,18 +654,20 @@ public final class TransferClientTest { assertThat(future.isDone()).isFalse(); - receiveWriteChunks(finalChunk(ID, Status.OK)); + receiveWriteChunks(legacyFinalChunk(123, Status.OK)); assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. } @Test - public void write_continuePacketWithWindowEndBeforeOffsetIsIgnored() throws Exception { - ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_100B.toByteArray()); + public void legacy_write_continuePacketWithWindowEndBeforeOffsetIsIgnored() throws Exception { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); + ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray()); - assertThat(lastChunks()).containsExactly(initialWriteChunk(ID, TEST_DATA_100B.size())); + assertThat(lastChunks()) + .containsExactly(initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_100B.size())); - receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) + receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) .setOffset(0) .setPendingBytes(90) .setWindowEndOffset(90) @@ -620,40 +675,42 @@ public final class TransferClientTest { .setMinDelayMicroseconds(1)); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.DATA, ID).setOffset(0).setData(range(0, 90)).build()); + .containsExactly( + newLegacyChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 90)).build()); receiveWriteChunks( // This stale packet with a window end before the offset should be ignored. - newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID) + newLegacyChunk(Chunk.Type.PARAMETERS_CONTINUE, 123) .setOffset(25) .setPendingBytes(25) .setWindowEndOffset(50), // Start from an arbitrary offset before the current, but extend the window to the end. - newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID).setOffset(80).setWindowEndOffset(100)); + newLegacyChunk(Chunk.Type.PARAMETERS_CONTINUE, 123).setOffset(80).setWindowEndOffset(100)); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.DATA, ID) + .containsExactly(newLegacyChunk(Chunk.Type.DATA, 123) .setOffset(90) .setData(range(90, 100)) .setRemainingBytes(0) .build()); - receiveWriteChunks(finalChunk(ID, Status.OK)); + receiveWriteChunks(legacyFinalChunk(123, Status.OK)); assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. } @Test - public void write_progressCallbackIsCalled() { + public void legacy_write_progressCallbackIsCalled() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); ListenableFuture<Void> future = - transferClient.write(ID, TEST_DATA_100B.toByteArray(), progressCallback); + transferClient.write(123, TEST_DATA_100B.toByteArray(), progressCallback); - receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) + receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) .setOffset(0) .setPendingBytes(90) .setMaxChunkSizeBytes(30), - newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID).setOffset(50).setPendingBytes(50), - finalChunk(ID, Status.OK)); + newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(50).setPendingBytes(50), + legacyFinalChunk(123, Status.OK)); verify(progressCallback, times(6)).accept(progress.capture()); assertThat(progress.getAllValues()) @@ -667,28 +724,1039 @@ public final class TransferClientTest { } @Test - public void write_asksForFinalOffset_sendsFinalPacket() { - ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_100B.toByteArray()); + public void legacy_write_asksForFinalOffset_sendsFinalPacket() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); + ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray()); - receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) + receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) .setOffset(100) .setPendingBytes(40) .setMaxChunkSizeBytes(25)); assertThat(lastChunks()) - .containsExactly(initialWriteChunk(ID, TEST_DATA_100B.size()), - newChunk(Chunk.Type.DATA, ID).setOffset(100).setRemainingBytes(0).build()); + .containsExactly(initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_100B.size()), + newLegacyChunk(Chunk.Type.DATA, 123).setOffset(100).setRemainingBytes(0).build()); + assertThat(future.isDone()).isFalse(); + } + + @Test + public void legacy_write_multipleWithSameId_sequentially_successful() throws Exception { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); + for (int i = 0; i < 3; ++i) { + ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); + + receiveWriteChunks( + newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(0).setPendingBytes(50), + legacyFinalChunk(123, Status.OK)); + + future.get(); + } + } + + @Test + public void legacy_write_multipleWithSameId_atSameTime_failsWithAlreadyExistsError() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); + ListenableFuture<Void> first = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); + ListenableFuture<Void> second = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); + + assertThat(first.isDone()).isFalse(); + + ExecutionException thrown = assertThrows(ExecutionException.class, second::get); + assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); + assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.ALREADY_EXISTS); + } + + @Test + public void legacy_write_sendErrorOnFirstPacket_failsImmediately() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); + ChannelOutputException exception = new ChannelOutputException("blah"); + rpcClient.setChannelOutputException(exception); + ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); + + ExecutionException thrown = assertThrows(ExecutionException.class, future::get); + assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); + assertThat(thrown).hasCauseThat().hasCauseThat().isSameInstanceAs(exception); + } + + @Test + public void legacy_write_serviceRequestsNoData_aborts() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); + ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); + + receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(0)); + + ExecutionException thrown = assertThrows(ExecutionException.class, future::get); + assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INVALID_ARGUMENT); + } + + @Test + public void legacy_write_invalidOffset_aborts() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); + ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray()); + + receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + .setOffset(101) + .setPendingBytes(40) + .setMaxChunkSizeBytes(25)); + + assertThat(lastChunks()) + .containsExactly(initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_100B.size()), + legacyFinalChunk(123, Status.OUT_OF_RANGE)); + + ExecutionException thrown = assertThrows(ExecutionException.class, future::get); + assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.OUT_OF_RANGE); + } + + @Test + public void legacy_write_sendErrorOnLaterPacket_aborts() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); + ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); + + ChannelOutputException exception = new ChannelOutputException("blah"); + rpcClient.setChannelOutputException(exception); + + receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + .setOffset(0) + .setPendingBytes(50) + .setMaxChunkSizeBytes(30)); + + ExecutionException thrown = assertThrows(ExecutionException.class, future::get); + assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); + assertThat(thrown).hasCauseThat().hasCauseThat().isSameInstanceAs(exception); + } + + @Test + public void legacy_write_cancelFuture_abortsTransfer() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); + ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray()); + + assertThat(future.cancel(true)).isTrue(); + + receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + .setOffset(0) + .setPendingBytes(50) + .setMaxChunkSizeBytes(50)); + assertThat(lastChunks()).contains(legacyFinalChunk(123, Status.CANCELLED)); + } + + @Test + public void legacy_write_transferProtocolError_aborts() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); + ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); + + receiveWriteChunks(legacyFinalChunk(123, Status.NOT_FOUND)); + + ExecutionException thrown = assertThrows(ExecutionException.class, future::get); + assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); + + assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.NOT_FOUND); + } + + @Test + public void legacy_write_rpcError_aborts() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); + ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); + + receiveWriteServerError(Status.NOT_FOUND); + + ExecutionException exception = assertThrows(ExecutionException.class, future::get); + assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.INTERNAL); + } + + @Test + public void legacy_write_timeoutAfterInitialChunk() { + createTransferClientThatMayTimeOut(ProtocolVersion.LEGACY); + ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); + + // Call future.get() without sending any server-side packets. + ExecutionException exception = assertThrows(ExecutionException.class, future::get); + assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.DEADLINE_EXCEEDED); + + // Client should have resent the last chunk (the initial chunk in this case) for each timeout. + assertThat(lastChunks()) + .containsExactly( + initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size()), // initial + initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size()), // retry 1 + initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size())); // retry 2 + } + + @Test + public void legacy_write_timeoutAfterSingleChunk() { + createTransferClientThatMayTimeOut(ProtocolVersion.LEGACY); + + // Wait for two outgoing packets (Write RPC request and first chunk), then send the parameters. + enqueueWriteChunks(2, + newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + .setOffset(0) + .setPendingBytes(90) + .setMaxChunkSizeBytes(30)); + ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); + + ExecutionException exception = assertThrows(ExecutionException.class, future::get); + assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.DEADLINE_EXCEEDED); + + Chunk data = newLegacyChunk(Chunk.Type.DATA, 123) + .setOffset(0) + .setData(TEST_DATA_SHORT) + .setRemainingBytes(0) + .build(); + assertThat(lastChunks()) + .containsExactly( + initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size()), // initial + data, // data chunk + data, // retry 1 + data); // retry 2 + } + + @Test + public void legacy_write_multipleTimeoutsAndRecoveries() throws Exception { + createTransferClientThatMayTimeOut(ProtocolVersion.LEGACY); + + // Wait for two outgoing packets (Write RPC request and first chunk), then send the parameters. + enqueueWriteChunks(2, + newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + .setOffset(0) + .setWindowEndOffset(40) + .setMaxChunkSizeBytes(20)); + + // After the second retry, send more transfer parameters + enqueueWriteChunks(4, + newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + .setOffset(40) + .setWindowEndOffset(120) + .setMaxChunkSizeBytes(40)); + + // After the first retry, send more transfer parameters + enqueueWriteChunks(3, + newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + .setOffset(80) + .setWindowEndOffset(160) + .setMaxChunkSizeBytes(10)); + + // After the second retry, confirm completed + enqueueWriteChunks( + 4, newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setStatus(Status.OK.code())); + + ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray()); + + assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. + + assertThat(lastChunks()) + .containsExactly( + // initial + initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_100B.size()), + // after 2, receive parameters: 40 from 0 by 20 + legacyDataChunk(123, TEST_DATA_100B, 0, 20), // data 0-20 + legacyDataChunk(123, TEST_DATA_100B, 20, 40), // data 20-40 + legacyDataChunk(123, TEST_DATA_100B, 20, 40), // retry 1 + legacyDataChunk(123, TEST_DATA_100B, 20, 40), // retry 2 + // after 4, receive parameters: 80 from 40 by 40 + legacyDataChunk(123, TEST_DATA_100B, 40, 80), // data 40-80 + legacyDataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100 + legacyDataChunk(123, TEST_DATA_100B, 80, 100), // retry 1 + // after 3, receive parameters: 80 from 80 by 10 + legacyDataChunk(123, TEST_DATA_100B, 80, 90), // data 80-90 + legacyDataChunk(123, TEST_DATA_100B, 90, 100), // data 90-100 + legacyDataChunk(123, TEST_DATA_100B, 90, 100), // retry 1 + legacyDataChunk(123, TEST_DATA_100B, 90, 100)); // retry 2 + // after 4, receive final OK + } + + @Test + public void read_singleChunk_successful() throws Exception { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<byte[]> future = transferClient.read(3, TRANSFER_PARAMETERS); + assertThat(future.isDone()).isFalse(); + + assertThat(lastChunks()).containsExactly(initialReadChunk(3, ProtocolVersion.VERSION_TWO)); + + receiveReadChunks(newChunk(Chunk.Type.START_ACK, 321) + .setResourceId(3) + .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); + + assertThat(lastChunks()).containsExactly(readStartAckConfirmation(321, TRANSFER_PARAMETERS)); + + receiveReadChunks( + newChunk(Chunk.Type.DATA, 321).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0)); + + assertThat(lastChunks()) + .containsExactly(Chunk.newBuilder() + .setType(Chunk.Type.COMPLETION) + .setSessionId(321) + .setStatus(Status.OK.ordinal()) + .build()); + + assertThat(future.isDone()).isFalse(); + + receiveReadChunks(newChunk(Chunk.Type.COMPLETION_ACK, 321)); + + assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray()); + } + + @Test + public void read_requestV2ReceiveLegacy() throws Exception { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<byte[]> future = transferClient.read(1, TRANSFER_PARAMETERS); + assertThat(future.isDone()).isFalse(); + + assertThat(lastChunks()).containsExactly(initialReadChunk(1, ProtocolVersion.VERSION_TWO)); + + receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 1) + .setOffset(0) + .setData(TEST_DATA_SHORT) + .setRemainingBytes(0)); + + // No handshake packets since the server responded as legacy. + assertThat(lastChunks()).containsExactly(legacyFinalChunk(1, Status.OK)); + + assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray()); + } + + @Test + public void read_failedPreconditionError_retriesInitialPacket() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<byte[]> future = transferClient.read(1, TRANSFER_PARAMETERS); + + assertThat(lastChunks()).containsExactly(initialReadChunk(1, ProtocolVersion.VERSION_TWO)); + for (int i = 0; i < MAX_RETRIES; ++i) { + receiveReadServerError(Status.FAILED_PRECONDITION); + + assertThat(lastChunks()).containsExactly(initialReadChunk(1, ProtocolVersion.VERSION_TWO)); + } + + receiveReadChunks(newChunk(Chunk.Type.START_ACK, 54321) + .setResourceId(1) + .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); + + assertThat(lastChunks()).containsExactly(readStartAckConfirmation(54321, TRANSFER_PARAMETERS)); + } + + @Test + public void read_failedPreconditionError_abortsAfterInitial() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + TransferParameters params = TransferParameters.create(50, 50, 0); + ListenableFuture<byte[]> future = transferClient.read(1, params); + + assertThat(lastChunks()) + .containsExactly(initialReadChunk(1, ProtocolVersion.VERSION_TWO, params)); + + receiveReadChunks(newChunk(Chunk.Type.START_ACK, 555) + .setResourceId(1) + .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); + + receiveReadServerError(Status.FAILED_PRECONDITION); + + ExecutionException thrown = assertThrows(ExecutionException.class, future::get); + assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); + assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INTERNAL); + } + + @Test + public void read_failedPreconditionError_abortsAfterHandshake() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + TransferParameters params = TransferParameters.create(50, 50, 0); + ListenableFuture<byte[]> future = transferClient.read(1, params); + + assertThat(lastChunks()) + .containsExactly(initialReadChunk(1, ProtocolVersion.VERSION_TWO, params)); + + receiveReadChunks(newChunk(Chunk.Type.START_ACK, 555) + .setResourceId(1) + .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); + + assertThat(lastChunks()).containsExactly(readStartAckConfirmation(555, params)); + + receiveReadChunks(dataChunk(555, TEST_DATA_100B, 0, 50)); + + assertThat(lastChunks()) + .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 555) + .setOffset(50) + .setWindowEndOffset(100) + .setMaxChunkSizeBytes(50) + .build()); + + receiveReadServerError(Status.FAILED_PRECONDITION); + + ExecutionException thrown = assertThrows(ExecutionException.class, future::get); + assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); + assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INTERNAL); + } + + @Test + public void read_failedPreconditionErrorMaxRetriesTimes_aborts() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<byte[]> future = transferClient.read(1, TRANSFER_PARAMETERS); + + for (int i = 0; i < MAX_RETRIES; ++i) { + receiveReadServerError(Status.FAILED_PRECONDITION); + } + + Chunk initialChunk = initialReadChunk(1, ProtocolVersion.VERSION_TWO); + assertThat(lastChunks()) + .containsExactlyElementsIn(Collections.nCopies(1 + MAX_RETRIES, initialChunk)); + + receiveReadServerError(Status.FAILED_PRECONDITION); + + assertThat(lastChunks()).isEmpty(); + + ExecutionException thrown = assertThrows(ExecutionException.class, future::get); + assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); + assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INTERNAL); + } + + @Test + public void read_singleChunk_ignoresUnknownIdOrWriteChunks() throws Exception { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<byte[]> future = transferClient.read(1); + assertThat(future.isDone()).isFalse(); + + performReadStartHandshake(1, 99); + + receiveReadChunks(finalChunk(2, Status.OK), + newChunk(Chunk.Type.DATA, 1).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0), + newChunk(Chunk.Type.DATA, 3).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0)); + receiveWriteChunks(finalChunk(99, Status.INVALID_ARGUMENT), + newChunk(Chunk.Type.DATA, 99).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0), + newChunk(Chunk.Type.DATA, 2).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0)); + assertThat(future.isDone()).isFalse(); + + receiveReadChunks( + newChunk(Chunk.Type.DATA, 99).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0)); + + performReadCompletionHandshake(99, Status.OK); + + assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray()); + } + + @Test + public void read_empty() throws Exception { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<byte[]> future = transferClient.read(2); + performReadStartHandshake(2, 5678); + receiveReadChunks(newChunk(Chunk.Type.DATA, 5678).setRemainingBytes(0)); + + performReadCompletionHandshake(5678, Status.OK); + + assertThat(future.get()).isEqualTo(new byte[] {}); + } + + @Test + public void read_sendsTransferParametersFirst() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + TransferParameters params = TransferParameters.create(3, 2, 1); + ListenableFuture<byte[]> future = transferClient.read(99, params); + + assertThat(lastChunks()) + .containsExactly(initialReadChunk(99, ProtocolVersion.VERSION_TWO, params)); + assertThat(future.cancel(true)).isTrue(); + } + + @Test + public void read_severalChunks() throws Exception { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<byte[]> future = transferClient.read(7, TRANSFER_PARAMETERS); + + performReadStartHandshake(7, 123, TRANSFER_PARAMETERS); + + receiveReadChunks( + newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 20)).setRemainingBytes(70), + newChunk(Chunk.Type.DATA, 123).setOffset(20).setData(range(20, 40))); + + assertThat(lastChunks()) + .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123) + .setOffset(40) + .setMaxChunkSizeBytes(30) + .setWindowEndOffset(90) + .build()); + + receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(40).setData(range(40, 70))); + + assertThat(lastChunks()) + .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123) + .setOffset(70) + .setMaxChunkSizeBytes(30) + .setWindowEndOffset(120) + .build()); + + receiveReadChunks( + newChunk(Chunk.Type.DATA, 123).setOffset(70).setData(range(70, 100)).setRemainingBytes(0)); + + performReadCompletionHandshake(123, Status.OK); + + assertThat(future.get()).isEqualTo(TEST_DATA_100B.toByteArray()); + } + + @Test + public void read_progressCallbackIsCalled() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<byte[]> future = + transferClient.read(123, TRANSFER_PARAMETERS, progressCallback); + + performReadStartHandshake(123, 123, TRANSFER_PARAMETERS); + + receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)), + newChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50)), + newChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(50, 60)).setRemainingBytes(5), + newChunk(Chunk.Type.DATA, 123).setOffset(60).setData(range(60, 70)), + newChunk(Chunk.Type.DATA, 123).setOffset(70).setData(range(70, 80)).setRemainingBytes(20), + newChunk(Chunk.Type.DATA, 123).setOffset(90).setData(range(90, 100)), + newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30))); + lastChunks(); // Discard chunks; no need to inspect for this test + + receiveReadChunks( + newChunk(Chunk.Type.DATA, 123).setOffset(80).setData(range(80, 100)).setRemainingBytes(0)); + performReadCompletionHandshake(123, Status.OK); + + verify(progressCallback, times(6)).accept(progress.capture()); + assertThat(progress.getAllValues()) + .containsExactly(TransferProgress.create(30, 30, TransferProgress.UNKNOWN_TRANSFER_SIZE), + TransferProgress.create(50, 50, TransferProgress.UNKNOWN_TRANSFER_SIZE), + TransferProgress.create(60, 60, 65), + TransferProgress.create(70, 70, TransferProgress.UNKNOWN_TRANSFER_SIZE), + TransferProgress.create(80, 80, 100), + TransferProgress.create(100, 100, 100)); + + assertThat(future.isDone()).isTrue(); + } + + @Test + public void read_rewindWhenPacketsSkipped() throws Exception { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<byte[]> future = transferClient.read(123, TRANSFER_PARAMETERS); + + performReadStartHandshake(123, 123, TRANSFER_PARAMETERS); + + receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(30, 50))); + + assertThat(lastChunks()) + .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + .setWindowEndOffset(50) + .setMaxChunkSizeBytes(30) + .setOffset(0) + .build()); + + receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)), + newChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50))); + + assertThat(lastChunks()) + .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123) + .setOffset(30) + .setWindowEndOffset(80) + .setMaxChunkSizeBytes(30) + .build()); + + receiveReadChunks( + newChunk(Chunk.Type.DATA, 123).setOffset(80).setData(range(80, 100)).setRemainingBytes(0)); + + assertThat(lastChunks()) + .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + .setOffset(50) + .setWindowEndOffset(100) + .setMaxChunkSizeBytes(30) + .build()); + + receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(50, 80))); + + assertThat(lastChunks()) + .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123) + .setOffset(80) + .setWindowEndOffset(130) + .setMaxChunkSizeBytes(30) + .build()); + + receiveReadChunks( + newChunk(Chunk.Type.DATA, 123).setOffset(80).setData(range(80, 100)).setRemainingBytes(0)); + + performReadCompletionHandshake(123, Status.OK); + + assertThat(future.get()).isEqualTo(TEST_DATA_100B.toByteArray()); + } + + @Test + public void read_multipleWithSameId_sequentially_successful() throws Exception { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + for (int i = 0; i < 3; ++i) { + ListenableFuture<byte[]> future = transferClient.read(1); + + performReadStartHandshake(1, 100 + i); + + receiveReadChunks(newChunk(Chunk.Type.DATA, 100 + i) + .setOffset(0) + .setData(TEST_DATA_SHORT) + .setRemainingBytes(0)); + + performReadCompletionHandshake(100 + i, Status.OK); + + assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray()); + } + } + + @Test + public void read_multipleWithSameId_atSameTime_failsWithAlreadyExistsError() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<byte[]> first = transferClient.read(123); + ListenableFuture<byte[]> second = transferClient.read(123); + + assertThat(first.isDone()).isFalse(); + + ExecutionException thrown = assertThrows(ExecutionException.class, second::get); + assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); + assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.ALREADY_EXISTS); + } + + @Test + public void read_sendErrorOnFirstPacket_fails() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ChannelOutputException exception = new ChannelOutputException("blah"); + rpcClient.setChannelOutputException(exception); + ListenableFuture<byte[]> future = transferClient.read(123); + + ExecutionException thrown = assertThrows(ExecutionException.class, future::get); + assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); + assertThat(thrown).hasCauseThat().hasCauseThat().isSameInstanceAs(exception); + } + + @Test + public void read_sendErrorOnLaterPacket_aborts() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<byte[]> future = transferClient.read(1024, TRANSFER_PARAMETERS); + + performReadStartHandshake(1024, 123, TRANSFER_PARAMETERS); + + receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 20))); + + ChannelOutputException exception = new ChannelOutputException("blah"); + rpcClient.setChannelOutputException(exception); + + receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(20).setData(range(20, 50))); + + ExecutionException thrown = assertThrows(ExecutionException.class, future::get); + assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); + assertThat(thrown).hasCauseThat().hasCauseThat().isSameInstanceAs(exception); + } + + @Test + public void read_cancelFuture_abortsTransfer() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<byte[]> future = transferClient.read(1, TRANSFER_PARAMETERS); + + performReadStartHandshake(1, 123, TRANSFER_PARAMETERS); + + assertThat(future.cancel(true)).isTrue(); + + assertThat(lastChunks()).contains(finalChunk(123, Status.CANCELLED)); + } + + @Test + public void read_immediateTransferProtocolError_aborts() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<byte[]> future = transferClient.read(123); + + // Resource ID will be set since session ID hasn't been assigned yet. + receiveReadChunks(newChunk(Chunk.Type.COMPLETION, VersionedChunk.UNASSIGNED_SESSION_ID) + .setResourceId(123) + .setStatus(Status.ALREADY_EXISTS.ordinal())); + + ExecutionException thrown = assertThrows(ExecutionException.class, future::get); + assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); + + assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.ALREADY_EXISTS); + } + + @Test + public void read_laterTransferProtocolError_aborts() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<byte[]> future = transferClient.read(123); + + performReadStartHandshake(123, 514); + + receiveReadChunks(finalChunk(514, Status.ALREADY_EXISTS)); + + ExecutionException thrown = assertThrows(ExecutionException.class, future::get); + assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); + + assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.ALREADY_EXISTS); + } + + @Test + public void read_rpcError_aborts() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<byte[]> future = transferClient.read(2); + + receiveReadServerError(Status.NOT_FOUND); + + ExecutionException exception = assertThrows(ExecutionException.class, future::get); + assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.INTERNAL); + } + + @Test + public void read_serverRespondsWithUnknownVersion_invalidArgument() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<byte[]> future = transferClient.read(2, TRANSFER_PARAMETERS); + + assertThat(lastChunks()) + .containsExactly(initialReadChunk(2, ProtocolVersion.VERSION_TWO, TRANSFER_PARAMETERS)); + + receiveReadChunks( + newChunk(Chunk.Type.START_ACK, 99).setResourceId(2).setProtocolVersion(600613)); + + assertThat(lastChunks()).containsExactly(finalChunk(99, Status.INVALID_ARGUMENT)); + + ExecutionException exception = assertThrows(ExecutionException.class, future::get); + assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.INVALID_ARGUMENT); + } + + @Test + public void read_timeout() { + createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<byte[]> future = transferClient.read(123, TRANSFER_PARAMETERS); + + // Call future.get() without sending any server-side packets. + ExecutionException exception = assertThrows(ExecutionException.class, future::get); + assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.DEADLINE_EXCEEDED); + + // read should have retried sending the transfer parameters 2 times, for a total of 3 + assertThat(lastChunks()) + .containsExactly(initialReadChunk(123, ProtocolVersion.VERSION_TWO), + initialReadChunk(123, ProtocolVersion.VERSION_TWO), + initialReadChunk(123, ProtocolVersion.VERSION_TWO)); + } + + @Test + public void write_singleChunk() throws Exception { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); + + // Do the start handshake (equivalent to performWriteStartHandshake()). + assertThat(lastChunks()) + .containsExactly(initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size())); + + receiveWriteChunks(newChunk(Chunk.Type.START_ACK, 123) + .setResourceId(2) + .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); + + assertThat(lastChunks()) + .containsExactly(newChunk(Chunk.Type.START_ACK_CONFIRMATION, 123) + .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()) + .setRemainingBytes(TEST_DATA_SHORT.size()) + .build()); + + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + .setOffset(0) + .setWindowEndOffset(1024) + .setMaxChunkSizeBytes(128)); + + assertThat(lastChunks()) + .containsExactly(newChunk(Chunk.Type.DATA, 123) + .setOffset(0) + .setData(TEST_DATA_SHORT) + .setRemainingBytes(0) + .build()); + + receiveWriteChunks(finalChunk(123, Status.OK)); + + assertThat(lastChunks()).containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, 123).build()); + + assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. + } + + @Test + public void write_requestV2ReceiveLegacy() throws Exception { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); + + assertThat(lastChunks()) + .containsExactly(initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size())); + + receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2) + .setOffset(0) + .setWindowEndOffset(1024) + .setMaxChunkSizeBytes(128), + legacyFinalChunk(2, Status.OK)); + + assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. + } + + @Test + public void write_platformTransferDisabled_aborted() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); + assertThat(future.isDone()).isFalse(); + + shouldAbortFlag = true; + receiveWriteChunks(newChunk(Chunk.Type.START_ACK, 3).setResourceId(2)); + + ExecutionException thrown = assertThrows(ExecutionException.class, future::get); + assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); + assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.ABORTED); + } + + @Test + public void write_failedPreconditionError_retriesInitialPacket() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); + + assertThat(lastChunks()) + .containsExactly(initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size())); + for (int i = 0; i < MAX_RETRIES; ++i) { + receiveWriteServerError(Status.FAILED_PRECONDITION); + + assertThat(lastChunks()) + .containsExactly( + initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size())); + } + + receiveWriteChunks(newChunk(Chunk.Type.START_ACK, 54321) + .setResourceId(2) + .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); + + assertThat(lastChunks()) + .containsExactly(newChunk(Chunk.Type.START_ACK_CONFIRMATION, 54321) + .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()) + .setRemainingBytes(TEST_DATA_SHORT.size()) + .build()); + } + + @Test + public void write_failedPreconditionError_abortsAfterInitialPacket() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_100B.toByteArray()); + + assertThat(lastChunks()) + .containsExactly(initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_100B.size())); + + receiveWriteChunks(newChunk(Chunk.Type.START_ACK, 4) + .setResourceId(2) + .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); + + receiveWriteServerError(Status.FAILED_PRECONDITION); + + ExecutionException thrown = assertThrows(ExecutionException.class, future::get); + assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); + assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INTERNAL); + } + + @Test + public void write_failedPreconditionErrorMaxRetriesTimes_aborts() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); + + for (int i = 0; i < MAX_RETRIES; ++i) { + receiveWriteServerError(Status.FAILED_PRECONDITION); + } + + Chunk initialChunk = initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()); + assertThat(lastChunks()) + .containsExactlyElementsIn(Collections.nCopies(1 + MAX_RETRIES, initialChunk)); + + receiveWriteServerError(Status.FAILED_PRECONDITION); + + assertThat(lastChunks()).isEmpty(); + + ExecutionException thrown = assertThrows(ExecutionException.class, future::get); + assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); + assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INTERNAL); + } + + @Test + public void write_empty() throws Exception { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<Void> future = transferClient.write(2, new byte[] {}); + + performWriteStartHandshake(2, 123, 0); + + receiveWriteChunks(finalChunk(123, Status.OK)); + + assertThat(lastChunks()).containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, 123).build()); + + assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. + } + + @Test + public void write_severalChunks() throws Exception { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<Void> future = transferClient.write(500, TEST_DATA_100B.toByteArray()); + + performWriteStartHandshake(500, 123, TEST_DATA_100B.size()); + + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + .setOffset(0) + .setWindowEndOffset(50) + .setMaxChunkSizeBytes(30) + .setMinDelayMicroseconds(1)); + + assertThat(lastChunks()) + .containsExactly(newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)).build(), + newChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50)).build()); + + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + .setOffset(50) + .setWindowEndOffset(90) + .setMaxChunkSizeBytes(25)); + + assertThat(lastChunks()) + .containsExactly( + newChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(50, 75)).build(), + newChunk(Chunk.Type.DATA, 123).setOffset(75).setData(range(75, 90)).build()); + + receiveWriteChunks( + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(90).setWindowEndOffset(140)); + + assertThat(lastChunks()) + .containsExactly(newChunk(Chunk.Type.DATA, 123) + .setOffset(90) + .setData(range(90, 100)) + .setRemainingBytes(0) + .build()); + + assertThat(future.isDone()).isFalse(); + + receiveWriteChunks(finalChunk(123, Status.OK)); + + assertThat(lastChunks()).containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, 123).build()); + + assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. + } + + @Test + public void write_parametersContinue() throws Exception { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<Void> future = transferClient.write(321, TEST_DATA_100B.toByteArray()); + + performWriteStartHandshake(321, 123, TEST_DATA_100B.size()); + + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + .setOffset(0) + .setWindowEndOffset(50) + .setMaxChunkSizeBytes(30) + .setMinDelayMicroseconds(1)); + + assertThat(lastChunks()) + .containsExactly(newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)).build(), + newChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50)).build()); + + receiveWriteChunks( + newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123).setOffset(30).setWindowEndOffset(80)); + + // Transfer doesn't roll back to offset 30 but instead continues sending up to 80. + assertThat(lastChunks()) + .containsExactly( + newChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(50, 80)).build()); + + receiveWriteChunks( + newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123).setOffset(80).setWindowEndOffset(130)); + + assertThat(lastChunks()) + .containsExactly(newChunk(Chunk.Type.DATA, 123) + .setOffset(80) + .setData(range(80, 100)) + .setRemainingBytes(0) + .build()); + + assertThat(future.isDone()).isFalse(); + + receiveWriteChunks(finalChunk(123, Status.OK)); + + assertThat(lastChunks()).containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, 123).build()); + + assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. + } + + @Test + public void write_continuePacketWithWindowEndBeforeOffsetIsIgnored() throws Exception { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray()); + + performWriteStartHandshake(123, 555, TEST_DATA_100B.size()); + + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 555) + .setOffset(0) + .setWindowEndOffset(90) + .setMaxChunkSizeBytes(90) + .setMinDelayMicroseconds(1)); + + assertThat(lastChunks()) + .containsExactly(newChunk(Chunk.Type.DATA, 555).setOffset(0).setData(range(0, 90)).build()); + + receiveWriteChunks( + // This stale packet with a window end before the offset should be ignored. + newChunk(Chunk.Type.PARAMETERS_CONTINUE, 555).setOffset(25).setWindowEndOffset(50), + // Start from an arbitrary offset before the current, but extend the window to the end. + newChunk(Chunk.Type.PARAMETERS_CONTINUE, 555).setOffset(80).setWindowEndOffset(100)); + + assertThat(lastChunks()) + .containsExactly(newChunk(Chunk.Type.DATA, 555) + .setOffset(90) + .setData(range(90, 100)) + .setRemainingBytes(0) + .build()); + + receiveWriteChunks(finalChunk(555, Status.OK)); + assertThat(lastChunks()).containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, 555).build()); + + assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. + } + + @Test + public void write_progressCallbackIsCalled() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<Void> future = + transferClient.write(123, TEST_DATA_100B.toByteArray(), progressCallback); + + performWriteStartHandshake(123, 123, TEST_DATA_100B.size()); + + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + .setOffset(0) + .setWindowEndOffset(90) + .setMaxChunkSizeBytes(30), + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(50).setWindowEndOffset(100), + finalChunk(123, Status.OK)); + + verify(progressCallback, times(6)).accept(progress.capture()); + assertThat(progress.getAllValues()) + .containsExactly(TransferProgress.create(30, 0, 100), + TransferProgress.create(60, 0, 100), + TransferProgress.create(90, 0, 100), + TransferProgress.create(80, 50, 100), + TransferProgress.create(100, 50, 100), + TransferProgress.create(100, 100, 100)); + assertThat(future.isDone()).isTrue(); + } + + @Test + public void write_asksForFinalOffset_sendsFinalPacket() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray()); + + performWriteStartHandshake(123, 456, TEST_DATA_100B.size()); + + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 456) + .setOffset(100) + .setWindowEndOffset(140) + .setMaxChunkSizeBytes(25)); + + assertThat(lastChunks()) + .containsExactly( + newChunk(Chunk.Type.DATA, 456).setOffset(100).setRemainingBytes(0).build()); } @Test public void write_multipleWithSameId_sequentially_successful() throws Exception { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); for (int i = 0; i < 3; ++i) { - ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_SHORT.toByteArray()); + ListenableFuture<Void> future = transferClient.write(6, TEST_DATA_SHORT.toByteArray()); + + performWriteStartHandshake(6, 123, TEST_DATA_SHORT.size()); receiveWriteChunks( - newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID).setOffset(0).setPendingBytes(50), - finalChunk(ID, Status.OK)); + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(0).setWindowEndOffset(50), + finalChunk(123, Status.OK)); + + assertThat(lastChunks()) + .containsExactly( + newChunk(Chunk.Type.DATA, 123).setData(TEST_DATA_SHORT).setRemainingBytes(0).build(), + newChunk(Chunk.Type.COMPLETION_ACK, 123).build()); future.get(); } @@ -696,6 +1764,7 @@ public final class TransferClientTest { @Test public void write_multipleWithSameId_atSameTime_failsWithAlreadyExistsError() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<Void> first = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); ListenableFuture<Void> second = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); @@ -708,6 +1777,7 @@ public final class TransferClientTest { @Test public void write_sendErrorOnFirstPacket_failsImmediately() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ChannelOutputException exception = new ChannelOutputException("blah"); rpcClient.setChannelOutputException(exception); ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); @@ -719,9 +1789,15 @@ public final class TransferClientTest { @Test public void write_serviceRequestsNoData_aborts() { - ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_SHORT.toByteArray()); + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<Void> future = transferClient.write(7, TEST_DATA_SHORT.toByteArray()); + + performWriteStartHandshake(7, 123, TEST_DATA_SHORT.size()); + + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(0)); - receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID).setOffset(0)); + assertThat(lastChunks()).containsExactly(finalChunk(123, Status.INVALID_ARGUMENT)); + receiveWriteChunks(newChunk(Chunk.Type.COMPLETION_ACK, 123)); ExecutionException thrown = assertThrows(ExecutionException.class, future::get); assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INVALID_ARGUMENT); @@ -729,16 +1805,18 @@ public final class TransferClientTest { @Test public void write_invalidOffset_aborts() { - ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_100B.toByteArray()); + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<Void> future = transferClient.write(7, TEST_DATA_100B.toByteArray()); - receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) + performWriteStartHandshake(7, 123, TEST_DATA_100B.size()); + + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) .setOffset(101) - .setPendingBytes(40) + .setWindowEndOffset(141) .setMaxChunkSizeBytes(25)); - assertThat(lastChunks()) - .containsExactly( - initialWriteChunk(ID, TEST_DATA_100B.size()), finalChunk(ID, Status.OUT_OF_RANGE)); + assertThat(lastChunks()).containsExactly(finalChunk(123, Status.OUT_OF_RANGE)); + receiveWriteChunks(newChunk(Chunk.Type.COMPLETION_ACK, 123)); ExecutionException thrown = assertThrows(ExecutionException.class, future::get); assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.OUT_OF_RANGE); @@ -746,14 +1824,17 @@ public final class TransferClientTest { @Test public void write_sendErrorOnLaterPacket_aborts() { - ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_SHORT.toByteArray()); + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<Void> future = transferClient.write(7, TEST_DATA_SHORT.toByteArray()); + + performWriteStartHandshake(7, 123, TEST_DATA_SHORT.size()); ChannelOutputException exception = new ChannelOutputException("blah"); rpcClient.setChannelOutputException(exception); - receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) .setOffset(0) - .setPendingBytes(50) + .setWindowEndOffset(50) .setMaxChunkSizeBytes(30)); ExecutionException thrown = assertThrows(ExecutionException.class, future::get); @@ -763,22 +1844,46 @@ public final class TransferClientTest { @Test public void write_cancelFuture_abortsTransfer() { - ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_100B.toByteArray()); + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<Void> future = transferClient.write(7, TEST_DATA_100B.toByteArray()); + + performWriteStartHandshake(7, 123, TEST_DATA_100B.size()); assertThat(future.cancel(true)).isTrue(); + assertThat(future.isCancelled()).isTrue(); - receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) .setOffset(0) - .setPendingBytes(50) + .setWindowEndOffset(50) .setMaxChunkSizeBytes(50)); - assertThat(lastChunks()).contains(finalChunk(ID, Status.CANCELLED)); + + assertThat(lastChunks()).contains(finalChunk(123, Status.CANCELLED)); + receiveWriteChunks(newChunk(Chunk.Type.COMPLETION_ACK, 123)); + } + + @Test + public void write_immediateTransferProtocolError_aborts() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); + + receiveWriteChunks(newChunk(Chunk.Type.COMPLETION, VersionedChunk.UNASSIGNED_SESSION_ID) + .setResourceId(123) + .setStatus(Status.NOT_FOUND.ordinal())); + + ExecutionException thrown = assertThrows(ExecutionException.class, future::get); + assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); + + assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.NOT_FOUND); } @Test - public void write_protocolError_aborts() { - ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_SHORT.toByteArray()); + public void write_laterTransferProtocolError_aborts() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); + + performWriteStartHandshake(123, 123, TEST_DATA_SHORT.size()); - receiveWriteChunks(finalChunk(ID, Status.NOT_FOUND)); + receiveWriteChunks(finalChunk(123, Status.NOT_FOUND)); ExecutionException thrown = assertThrows(ExecutionException.class, future::get); assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); @@ -788,6 +1893,7 @@ public final class TransferClientTest { @Test public void write_rpcError_aborts() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); receiveWriteServerError(Status.NOT_FOUND); @@ -798,22 +1904,40 @@ public final class TransferClientTest { @Test public void write_unknownVersion_invalidArgument() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); - receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2).setProtocolVersion(3)); + receiveWriteChunks(newChunk(Chunk.Type.START_ACK, 3).setResourceId(2).setProtocolVersion(9)); ExecutionException exception = assertThrows(ExecutionException.class, future::get); assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.INVALID_ARGUMENT); assertThat(lastChunks()) - .containsExactly( - initialWriteChunk(2, TEST_DATA_SHORT.size()), finalChunk(2, Status.INVALID_ARGUMENT)); + .containsExactly(initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()), + finalChunk(3, Status.INVALID_ARGUMENT)); + } + + @Test + public void write_serverRespondsWithUnknownVersion_invalidArgument() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_100B.toByteArray()); + + assertThat(lastChunks()) + .containsExactly(initialWriteChunk(2, ProtocolVersion.VERSION_TWO, 100)); + + receiveWriteChunks( + newChunk(Chunk.Type.START_ACK, 99).setResourceId(2).setProtocolVersion(600613)); + + assertThat(lastChunks()).containsExactly(finalChunk(99, Status.INVALID_ARGUMENT)); + + ExecutionException exception = assertThrows(ExecutionException.class, future::get); + assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.INVALID_ARGUMENT); } @Test public void write_timeoutAfterInitialChunk() { - transferClient = createTransferClient(1, 1); // Create a manager with a very short timeout. - ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_SHORT.toByteArray()); + createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO); + ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); // Call future.get() without sending any server-side packets. ExecutionException exception = assertThrows(ExecutionException.class, future::get); @@ -821,94 +1945,183 @@ public final class TransferClientTest { // Client should have resent the last chunk (the initial chunk in this case) for each timeout. assertThat(lastChunks()) - .containsExactly(initialWriteChunk(ID, TEST_DATA_SHORT.size()), // initial - initialWriteChunk(ID, TEST_DATA_SHORT.size()), // retry 1 - initialWriteChunk(ID, TEST_DATA_SHORT.size()), // retry 2 - finalChunk(ID, Status.DEADLINE_EXCEEDED)); // abort + .containsExactly( + initialWriteChunk(123, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()), // initial + initialWriteChunk(123, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()), // retry 1 + initialWriteChunk(123, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size())); // retry 2 } @Test - public void write_timeoutAfterSingleChunk() throws Exception { - transferClient.close(); - transferClient = createTransferClient(10, 10); // Create a manager with a very short timeout. + public void write_timeoutAfterSingleChunk() { + createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO); - // Wait for two outgoing packets (Write RPC request and first chunk), then send the parameters. + // Wait for two outgoing packets (Write RPC request and first chunk), then do the handshake. enqueueWriteChunks(2, - newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) + newChunk(Chunk.Type.START_ACK, 123).setResourceId(9), + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) .setOffset(0) - .setPendingBytes(90) + .setWindowEndOffset(90) .setMaxChunkSizeBytes(30)); - ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_SHORT.toByteArray()); + ListenableFuture<Void> future = transferClient.write(9, TEST_DATA_SHORT.toByteArray()); ExecutionException exception = assertThrows(ExecutionException.class, future::get); assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.DEADLINE_EXCEEDED); - Chunk data = newChunk(Chunk.Type.DATA, ID) + Chunk data = newChunk(Chunk.Type.DATA, 123) .setOffset(0) .setData(TEST_DATA_SHORT) .setRemainingBytes(0) .build(); assertThat(lastChunks()) - .containsExactly(initialWriteChunk(ID, TEST_DATA_SHORT.size()), // initial + .containsExactly( + initialWriteChunk(9, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()), // initial + newChunk(Chunk.Type.START_ACK_CONFIRMATION, 123) + .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()) + .setRemainingBytes(TEST_DATA_SHORT.size()) + .build(), data, // data chunk data, // retry 1 - data, // retry 2 - finalChunk(ID, Status.DEADLINE_EXCEEDED)); // abort + data); // retry 2 + } + + @Test + public void write_timeoutAndRecoverDuringHandshakes() throws Exception { + createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO); + assertThat(MAX_RETRIES).isEqualTo(2); // This test assumes 2 retries + + // Wait for four outgoing packets (Write RPC request and START chunk + retry), then handshake. + enqueueWriteChunks(3, + newChunk(Chunk.Type.START_ACK, 123) + .setResourceId(5) + .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); + + // Wait for start ack confirmation + 2 retries, then request three packets. + enqueueWriteChunks(3, + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + .setOffset(0) + .setWindowEndOffset(60) + .setMaxChunkSizeBytes(20)); + + // After two packets, request the remainder of the packets. + enqueueWriteChunks( + 2, newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123).setOffset(20).setWindowEndOffset(200)); + + // Wait for last 3 data packets, then 2 final packet retries. + enqueueWriteChunks(5, + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + .setOffset(80) + .setWindowEndOffset(200) + .setMaxChunkSizeBytes(20), + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + .setOffset(80) + .setWindowEndOffset(200) + .setMaxChunkSizeBytes(20)); + + // After the retry, confirm completed multiple times; additional packets should be dropped + enqueueWriteChunks(1, + newChunk(Chunk.Type.COMPLETION, 123).setStatus(Status.OK.code()), + newChunk(Chunk.Type.COMPLETION, 123).setStatus(Status.OK.code()), + newChunk(Chunk.Type.COMPLETION, 123).setStatus(Status.OK.code()), + newChunk(Chunk.Type.COMPLETION, 123).setStatus(Status.OK.code())); + + ListenableFuture<Void> future = transferClient.write(5, TEST_DATA_100B.toByteArray()); + + assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. + + final Chunk startAckConfirmation = + newChunk(Chunk.Type.START_ACK_CONFIRMATION, 123) + .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()) + .setRemainingBytes(TEST_DATA_100B.size()) + .build(); + + assertThat(lastChunks()) + .containsExactly( + // initial handshake with retries + initialWriteChunk(5, ProtocolVersion.VERSION_TWO, TEST_DATA_100B.size()), + initialWriteChunk(5, ProtocolVersion.VERSION_TWO, TEST_DATA_100B.size()), + startAckConfirmation, + startAckConfirmation, + startAckConfirmation, + // send all data + dataChunk(123, TEST_DATA_100B, 0, 20), // data 0-20 + dataChunk(123, TEST_DATA_100B, 20, 40), // data 20-40 + dataChunk(123, TEST_DATA_100B, 40, 60), // data 40-60 + dataChunk(123, TEST_DATA_100B, 60, 80), // data 60-80 + dataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100 (final) + // retry last packet two times + dataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100 (final) + dataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100 (final) + // respond to two PARAMETERS_RETRANSMIT packets + dataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100 (final) + dataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100 (final) + // respond to OK packet + newChunk(Chunk.Type.COMPLETION_ACK, 123).build()); } @Test public void write_multipleTimeoutsAndRecoveries() throws Exception { - transferClient.close(); - transferClient = createTransferClient(10, 10); // Create a manager with short timeouts. + createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO); + assertThat(MAX_RETRIES).isEqualTo(2); // This test assumes 2 retries - // Wait for two outgoing packets (Write RPC request and first chunk), then send the parameters. + // Wait for two outgoing packets (Write RPC request and START chunk), then do the handshake. enqueueWriteChunks(2, - newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) + newChunk(Chunk.Type.START_ACK, 123) + .setResourceId(5) + .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); + + // Request two packets. + enqueueWriteChunks(1, + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) .setOffset(0) - .setPendingBytes(40) + .setWindowEndOffset(40) .setMaxChunkSizeBytes(20)); // After the second retry, send more transfer parameters enqueueWriteChunks(4, - newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) .setOffset(40) - .setPendingBytes(80) + .setWindowEndOffset(120) .setMaxChunkSizeBytes(40)); // After the first retry, send more transfer parameters enqueueWriteChunks(3, - newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) .setOffset(80) - .setPendingBytes(80) + .setWindowEndOffset(160) .setMaxChunkSizeBytes(10)); // After the second retry, confirm completed - enqueueWriteChunks( - 4, newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID).setStatus(Status.OK.code())); + enqueueWriteChunks(4, newChunk(Chunk.Type.COMPLETION, 123).setStatus(Status.OK.code())); + enqueueWriteChunks(1, newChunk(Chunk.Type.COMPLETION_ACK, 123)); - ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_100B.toByteArray()); + ListenableFuture<Void> future = transferClient.write(5, TEST_DATA_100B.toByteArray()); assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. assertThat(lastChunks()) .containsExactly( - // initial - initialWriteChunk(ID, TEST_DATA_100B.size()), + // initial handshake + initialWriteChunk(5, ProtocolVersion.VERSION_TWO, TEST_DATA_100B.size()), + newChunk(Chunk.Type.START_ACK_CONFIRMATION, 123) + .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()) + .setRemainingBytes(TEST_DATA_100B.size()) + .build(), // after 2, receive parameters: 40 from 0 by 20 - dataChunk(ID, TEST_DATA_100B, 0, 20), // data 0-20 - dataChunk(ID, TEST_DATA_100B, 20, 40), // data 20-40 - dataChunk(ID, TEST_DATA_100B, 20, 40), // retry 1 - dataChunk(ID, TEST_DATA_100B, 20, 40), // retry 2 + dataChunk(123, TEST_DATA_100B, 0, 20), // data 0-20 + dataChunk(123, TEST_DATA_100B, 20, 40), // data 20-40 + dataChunk(123, TEST_DATA_100B, 20, 40), // retry 1 + dataChunk(123, TEST_DATA_100B, 20, 40), // retry 2 // after 4, receive parameters: 80 from 40 by 40 - dataChunk(ID, TEST_DATA_100B, 40, 80), // data 40-80 - dataChunk(ID, TEST_DATA_100B, 80, 100), // data 80-100 - dataChunk(ID, TEST_DATA_100B, 80, 100), // retry 1 + dataChunk(123, TEST_DATA_100B, 40, 80), // data 40-80 + dataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100 + dataChunk(123, TEST_DATA_100B, 80, 100), // retry 1 // after 3, receive parameters: 80 from 80 by 10 - dataChunk(ID, TEST_DATA_100B, 80, 90), // data 80-90 - dataChunk(ID, TEST_DATA_100B, 90, 100), // data 90-100 - dataChunk(ID, TEST_DATA_100B, 90, 100), // retry 1 - dataChunk(ID, TEST_DATA_100B, 90, 100)); // retry 2 - // after 4, receive final OK + dataChunk(123, TEST_DATA_100B, 80, 90), // data 80-90 + dataChunk(123, TEST_DATA_100B, 90, 100), // data 90-100 + dataChunk(123, TEST_DATA_100B, 90, 100), // retry 1 + dataChunk(123, TEST_DATA_100B, 90, 100), // retry 2 + // after 4, receive final OK + newChunk(Chunk.Type.COMPLETION_ACK, 123).build()); } private static ByteString range(int startInclusive, int endExclusive) { @@ -922,39 +2135,79 @@ public final class TransferClientTest { return ByteString.copyFrom(bytes); } - private static Chunk.Builder newChunk(Chunk.Type type, int resourceId) { - return Chunk.newBuilder().setType(type).setTransferId(resourceId); + private static Chunk.Builder newLegacyChunk(Chunk.Type type, int transferId) { + return Chunk.newBuilder().setType(type).setTransferId(transferId); + } + + private static Chunk.Builder newChunk(Chunk.Type type, int sessionId) { + return Chunk.newBuilder().setType(type).setSessionId(sessionId); } - private static Chunk initialReadChunk(int resourceId) { - return initialReadChunk(resourceId, TRANSFER_PARAMETERS); + private static Chunk initialReadChunk(int resourceId, ProtocolVersion version) { + return initialReadChunk(resourceId, version, TRANSFER_PARAMETERS); } - private static Chunk initialReadChunk(int resourceId, TransferParameters params) { - Chunk.Builder chunk = newChunk(Chunk.Type.START, resourceId) + private static Chunk initialReadChunk( + int resourceId, ProtocolVersion version, TransferParameters params) { + Chunk.Builder chunk = newLegacyChunk(Chunk.Type.START, resourceId) .setResourceId(resourceId) .setPendingBytes(params.maxPendingBytes()) .setWindowEndOffset(params.maxPendingBytes()) .setMaxChunkSizeBytes(params.maxChunkSizeBytes()) .setOffset(0); + if (version != ProtocolVersion.LEGACY) { + chunk.setProtocolVersion(version.ordinal()); + } + if (params.chunkDelayMicroseconds() > 0) { + chunk.setMinDelayMicroseconds(params.chunkDelayMicroseconds()); + } + return chunk.build(); + } + private static Chunk readStartAckConfirmation(int sessionId, TransferParameters params) { + Chunk.Builder chunk = newChunk(Chunk.Type.START_ACK_CONFIRMATION, sessionId) + .setWindowEndOffset(params.maxPendingBytes()) + .setMaxChunkSizeBytes(params.maxChunkSizeBytes()) + .setOffset(0) + .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()); if (params.chunkDelayMicroseconds() > 0) { chunk.setMinDelayMicroseconds(params.chunkDelayMicroseconds()); } return chunk.build(); } - private static Chunk initialWriteChunk(int resourceId, int size) { - return newChunk(Chunk.Type.START, resourceId) - .setResourceId(resourceId) - .setRemainingBytes(size) - .build(); + private static Chunk initialWriteChunk(int resourceId, ProtocolVersion version, int size) { + Chunk.Builder chunk = newLegacyChunk(Chunk.Type.START, resourceId) + .setResourceId(resourceId) + .setRemainingBytes(size); + if (version != ProtocolVersion.LEGACY) { + chunk.setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()); + } + return chunk.build(); + } + + private static Chunk legacyFinalChunk(int sessionId, Status status) { + return newLegacyChunk(Chunk.Type.COMPLETION, sessionId).setStatus(status.code()).build(); } private static Chunk finalChunk(int sessionId, Status status) { return newChunk(Chunk.Type.COMPLETION, sessionId).setStatus(status.code()).build(); } + private static Chunk legacyDataChunk(int sessionId, ByteString data, int start, int end) { + if (start < 0 || end > data.size()) { + throw new IndexOutOfBoundsException("Invalid start or end"); + } + + Chunk.Builder chunk = newLegacyChunk(Chunk.Type.DATA, sessionId) + .setOffset(start) + .setData(data.substring(start, end)); + if (end == data.size()) { + chunk.setRemainingBytes(0); + } + return chunk.build(); + } + private static Chunk dataChunk(int sessionId, ByteString data, int start, int end) { if (start < 0 || end > data.size()) { throw new IndexOutOfBoundsException("Invalid start or end"); @@ -995,6 +2248,48 @@ public final class TransferClientTest { } } + private void performReadStartHandshake(int resourceId, int sessionId) { + performReadStartHandshake( + resourceId, sessionId, TransferClient.DEFAULT_READ_TRANSFER_PARAMETERS); + } + + private void performReadStartHandshake(int resourceId, int sessionId, TransferParameters params) { + assertThat(lastChunks()) + .containsExactly(initialReadChunk(resourceId, ProtocolVersion.VERSION_TWO, params)); + + receiveReadChunks(newChunk(Chunk.Type.START_ACK, sessionId) + .setResourceId(resourceId) + .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); + + assertThat(lastChunks()).containsExactly(readStartAckConfirmation(sessionId, params)); + } + + private void performReadCompletionHandshake(int sessionId, Status status) { + assertThat(lastChunks()) + .containsExactly(Chunk.newBuilder() + .setType(Chunk.Type.COMPLETION) + .setSessionId(sessionId) + .setStatus(status.ordinal()) + .build()); + + receiveReadChunks(newChunk(Chunk.Type.COMPLETION_ACK, sessionId)); + } + + private void performWriteStartHandshake(int resourceId, int sessionId, int dataSize) { + assertThat(lastChunks()) + .containsExactly(initialWriteChunk(resourceId, ProtocolVersion.VERSION_TWO, dataSize)); + + receiveWriteChunks(newChunk(Chunk.Type.START_ACK, sessionId) + .setResourceId(resourceId) + .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); + + assertThat(lastChunks()) + .containsExactly(newChunk(Chunk.Type.START_ACK_CONFIRMATION, sessionId) + .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()) + .setRemainingBytes(dataSize) + .build()); + } + /** Receive these chunks after a chunk is sent. */ private void enqueueWriteChunks(int afterPackets, Chunk.Builder... chunks) { syncWithTransferThread( @@ -1006,13 +2301,29 @@ public final class TransferClientTest { return rpcClient.lastClientStreams(Chunk.class); } - private TransferClient createTransferClient( - int transferTimeoutMillis, int initialTransferTimeoutMillis) { - return new TransferClient(rpcClient.client().method(CHANNEL_ID, SERVICE + "/Read"), + private void createTransferClientThatMayTimeOut(ProtocolVersion version) { + createTransferClient(version, 1, 1, TransferEventHandler::runForTestsThatMustTimeOut); + } + + private void createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion version) { + createTransferClient(version, 60000, 60000, TransferEventHandler::run); + } + + private void createTransferClient(ProtocolVersion version, + int transferTimeoutMillis, + int initialTransferTimeoutMillis, + Consumer<TransferEventHandler> eventHandlerFunction) { + if (transferClient != null) { + throw new AssertionError("createTransferClient must only be called once!"); + } + transferClient = new TransferClient(rpcClient.client().method(CHANNEL_ID, SERVICE + "/Read"), rpcClient.client().method(CHANNEL_ID, SERVICE + "/Write"), transferTimeoutMillis, initialTransferTimeoutMillis, MAX_RETRIES, - () -> this.shouldAbortFlag); + () + -> this.shouldAbortFlag, + eventHandlerFunction); + transferClient.setProtocolVersion(version); } } |