diff options
author | Alexei Frolov <frolv@google.com> | 2022-03-25 11:12:06 -0700 |
---|---|---|
committer | CQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com> | 2022-03-28 22:03:00 +0000 |
commit | a6af061a2fdd1198d6214fe03a4e2afaecbf3f55 (patch) | |
tree | 4fa4a50775ea1aed03ea85d8056f1584bfa60f66 | |
parent | 5f49391161bf53c31323aa367f594bc91877af6a (diff) | |
download | pigweed-a6af061a2fdd1198d6214fe03a4e2afaecbf3f55.tar.gz |
pw_transfer: Always set chunk type field
This updates the implementations of pw_transfer to always set a chunk
type when encoding a chunk proto, including a new completion type. No
significant behavioral changes are made in response to the chunk type
field -- these will be rolled out in a future change.
Change-Id: I9d39eecff814cfc21285eb24ff7a0b5166ce1ff4
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/89101
Reviewed-by: Keir Mierle <keir@google.com>
Reviewed-by: Wyatt Hepler <hepler@google.com>
Commit-Queue: Alexei Frolov <frolv@google.com>
-rw-r--r-- | pw_transfer/context.cc | 40 | ||||
-rw-r--r-- | pw_transfer/public/pw_transfer/internal/chunk.h | 8 | ||||
-rw-r--r-- | pw_transfer/public/pw_transfer/internal/context.h | 9 | ||||
-rw-r--r-- | pw_transfer/py/pw_transfer/transfer.py | 39 | ||||
-rw-r--r-- | pw_transfer/py/tests/transfer_test.py | 20 | ||||
-rw-r--r-- | pw_transfer/transfer.proto | 19 | ||||
-rw-r--r-- | pw_transfer/transfer_test.cc | 104 | ||||
-rw-r--r-- | pw_transfer/ts/transfer.ts | 25 | ||||
-rw-r--r-- | pw_transfer/ts/transfer_test.ts | 3 |
9 files changed, 168 insertions, 99 deletions
diff --git a/pw_transfer/context.cc b/pw_transfer/context.cc index 624a977e2..04027036e 100644 --- a/pw_transfer/context.cc +++ b/pw_transfer/context.cc @@ -80,7 +80,7 @@ void Context::InitiateTransferAsClient() { if (type() == TransferType::kReceive) { // A receiver begins a new transfer with a parameters chunk telling the // transmitter what to send. - UpdateAndSendTransferParameters(kRetransmit); + UpdateAndSendTransferParameters(TransmitAction::kBegin); } else { SendInitialTransmitChunk(); } @@ -118,23 +118,33 @@ void Context::SendInitialTransmitChunk() { // A transmitter begins a transfer by just sending its ID. internal::Chunk chunk = {}; chunk.transfer_id = transfer_id_; + chunk.type = Chunk::Type::kTransferStart; EncodeAndSendChunk(chunk); } void Context::SendTransferParameters(TransmitAction action) { - const internal::Chunk parameters = { + internal::Chunk parameters = { .transfer_id = transfer_id_, .window_end_offset = window_end_offset_, .pending_bytes = pending_bytes_, .max_chunk_size_bytes = max_chunk_size_bytes_, .min_delay_microseconds = kDefaultChunkDelayMicroseconds, .offset = offset_, - .type = action == kRetransmit - ? internal::Chunk::Type::kParametersRetransmit - : internal::Chunk::Type::kParametersContinue, }; + switch (action) { + case TransmitAction::kBegin: + parameters.type = internal::Chunk::Type::kTransferStart; + break; + case TransmitAction::kRetransmit: + parameters.type = internal::Chunk::Type::kParametersRetransmit; + break; + case TransmitAction::kExtend: + parameters.type = internal::Chunk::Type::kParametersContinue; + break; + } + PW_LOG_DEBUG( "Transfer %u sending transfer parameters: " "offset=%u, window_end_offset=%u, pending_bytes=%u, chunk_size=%u", @@ -287,7 +297,8 @@ void Context::HandleTransferParametersUpdate(const Chunk& chunk) { bool retransmit = true; if (chunk.type.has_value()) { - retransmit = chunk.type == Chunk::Type::kParametersRetransmit; + retransmit = chunk.type == Chunk::Type::kParametersRetransmit || + chunk.type == Chunk::Type::kTransferStart; } if (retransmit) { @@ -360,6 +371,12 @@ void Context::TransmitNextChunk(bool retransmit_requested) { encoder.WriteTransferId(transfer_id_).IgnoreError(); encoder.WriteOffset(offset_).IgnoreError(); + // TODO(frolv): Type field presence is currently meaningful, so this type must + // be serialized. Once all users of transfer always set chunk types, the field + // can be made non-optional and this write can be removed as TRANSFER_DATA has + // the default proto value of 0. + encoder.WriteType(transfer::Chunk::Type::TRANSFER_DATA).IgnoreError(); + // Reserve space for the data proto field overhead and use the remainder of // the buffer for the chunk data. size_t reserved_size = encoder.size() + 1 /* data key */ + 5 /* data size */; @@ -473,7 +490,7 @@ void Context::HandleReceiveChunk(const Chunk& chunk) { static_cast<unsigned>(transfer_id_), static_cast<unsigned>(chunk.offset)); - UpdateAndSendTransferParameters(kRetransmit); + UpdateAndSendTransferParameters(TransmitAction::kRetransmit); if (transfer_state_ == TransferState::kCompleted) { SendFinalStatusChunk(); return; @@ -531,7 +548,7 @@ void Context::HandleReceivedData(const Chunk& chunk) { set_transfer_state(TransferState::kRecovery); SetTimeout(chunk_timeout_); - UpdateAndSendTransferParameters(kRetransmit); + UpdateAndSendTransferParameters(TransmitAction::kRetransmit); return; } @@ -598,7 +615,7 @@ void Context::HandleReceivedData(const Chunk& chunk) { if (pending_bytes_ == 0u) { // Received all pending data. Advance the transfer parameters. - UpdateAndSendTransferParameters(kRetransmit); + UpdateAndSendTransferParameters(TransmitAction::kRetransmit); return; } @@ -609,7 +626,7 @@ void Context::HandleReceivedData(const Chunk& chunk) { window_size_ / max_parameters_->extend_window_divisor(); if (extend_window) { - UpdateAndSendTransferParameters(kExtend); + UpdateAndSendTransferParameters(TransmitAction::kExtend); return; } } @@ -620,6 +637,7 @@ void Context::SendFinalStatusChunk() { internal::Chunk chunk = {}; chunk.transfer_id = transfer_id_; chunk.status = status_.code(); + chunk.type = Chunk::Type::kTransferCompletion; PW_LOG_DEBUG("Sending final chunk for transfer %u with status %u", static_cast<unsigned>(transfer_id_), @@ -699,7 +717,7 @@ void Context::Retry() { "Receive transfer %u timed out waiting for chunk; resending parameters", static_cast<unsigned>(transfer_id_)); - SendTransferParameters(kRetransmit); + SendTransferParameters(TransmitAction::kRetransmit); return; } diff --git a/pw_transfer/public/pw_transfer/internal/chunk.h b/pw_transfer/public/pw_transfer/internal/chunk.h index f4a0ee960..b20acf44f 100644 --- a/pw_transfer/public/pw_transfer/internal/chunk.h +++ b/pw_transfer/public/pw_transfer/internal/chunk.h @@ -28,13 +28,17 @@ struct Chunk { kTransferStart = 1, kParametersRetransmit = 2, kParametersContinue = 3, + kTransferCompletion = 4, + kTransferCompletionAck = 5, // Currently unused. }; // The initial chunk always has an offset of 0 and no data or status. // - // Pending bytes is required in all read chunks, so that is checked elsewhere. + // TODO(frolv): Going forward, all users of transfer should set a type for + // all chunks. This initial chunk assumption should be removed. constexpr bool IsInitialChunk() const { - return offset == 0 && data.empty() && !status.has_value(); + return type == Type::kTransferStart || + (offset == 0 && data.empty() && !status.has_value()); } // The final chunk from the transmitter sets remaining_bytes to 0 in both Read diff --git a/pw_transfer/public/pw_transfer/internal/context.h b/pw_transfer/public/pw_transfer/internal/context.h index 0a4d2b453..f825419dd 100644 --- a/pw_transfer/public/pw_transfer/internal/context.h +++ b/pw_transfer/public/pw_transfer/internal/context.h @@ -149,7 +149,14 @@ class Context { kRecovery, }; - enum TransmitAction : bool { kExtend, kRetransmit }; + enum class TransmitAction { + // Start of a new transfer. + kBegin, + // Extend the current window length. + kExtend, + // Retransmit from a specified offset. + kRetransmit, + }; void set_transfer_state(TransferState state) { transfer_state_ = state; } diff --git a/pw_transfer/py/pw_transfer/transfer.py b/pw_transfer/py/pw_transfer/transfer.py index bd3fa1d26..06574f4e7 100644 --- a/pw_transfer/py/pw_transfer/transfer.py +++ b/pw_transfer/py/pw_transfer/transfer.py @@ -1,4 +1,4 @@ -# Copyright 2021 The Pigweed Authors +# Copyright 2022 The Pigweed Authors # # Licensed under the Apache License, Version 2.0 (the "License"); you may not # use this file except in compliance with the License. You may obtain a copy of @@ -196,7 +196,10 @@ class Transfer(abc.ABC): def _send_error(self, error: Status) -> None: """Sends an error chunk to the server and finishes the transfer.""" - self._send_chunk(Chunk(transfer_id=self.id, status=error.value)) + self._send_chunk( + Chunk(transfer_id=self.id, + status=error.value, + type=Chunk.Type.TRANSFER_COMPLETION)) self.finish(error) @@ -236,7 +239,7 @@ class WriteTransfer(Transfer): return self._data def _initial_chunk(self) -> Chunk: - return Chunk(transfer_id=self.id) + return Chunk(transfer_id=self.id, type=Chunk.Type.TRANSFER_START) async def _handle_data_chunk(self, chunk: Chunk) -> None: """Processes an incoming chunk from the server. @@ -286,7 +289,8 @@ class WriteTransfer(Transfer): retransmit = True if chunk.HasField('type'): - retransmit = chunk.type == Chunk.Type.PARAMETERS_RETRANSMIT + retransmit = (chunk.type == Chunk.Type.PARAMETERS_RETRANSMIT + or chunk.type == Chunk.Type.TRANSFER_START) if chunk.offset > len(self.data): # Bad offset; terminate the transfer. @@ -339,7 +343,9 @@ class WriteTransfer(Transfer): def _next_chunk(self) -> Chunk: """Returns the next Chunk message to send in the data transfer.""" - chunk = Chunk(transfer_id=self.id, offset=self._offset) + chunk = Chunk(transfer_id=self.id, + offset=self._offset, + type=Chunk.Type.TRANSFER_DATA) max_bytes_in_chunk = min(self._max_chunk_size, self._window_end_offset - self._offset) @@ -400,7 +406,7 @@ class ReadTransfer(Transfer): return bytes(self._data) def _initial_chunk(self) -> Chunk: - return self._transfer_parameters() + return self._transfer_parameters(Chunk.Type.TRANSFER_START) async def _handle_data_chunk(self, chunk: Chunk) -> None: """Processes an incoming chunk from the server. @@ -413,7 +419,8 @@ class ReadTransfer(Transfer): # Initially, the transfer service only supports in-order transfers. # If data is received out of order, request that the server # retransmit from the previous offset. - self._send_chunk(self._transfer_parameters()) + self._send_chunk( + self._transfer_parameters(Chunk.Type.PARAMETERS_RETRANSMIT)) return self._data += chunk.data @@ -424,7 +431,9 @@ class ReadTransfer(Transfer): if chunk.remaining_bytes == 0: # No more data to read. Acknowledge receipt and finish. self._send_chunk( - Chunk(transfer_id=self.id, status=Status.OK.value)) + Chunk(transfer_id=self.id, + status=Status.OK.value, + type=Chunk.Type.TRANSFER_COMPLETION)) self.finish(Status.OK) return @@ -469,22 +478,22 @@ class ReadTransfer(Transfer): if self._pending_bytes == 0: # All pending data was received. Send out a new parameters chunk for # the next block. - self._send_chunk(self._transfer_parameters()) + self._send_chunk( + self._transfer_parameters(Chunk.Type.PARAMETERS_RETRANSMIT)) elif extend_window: - self._send_chunk(self._transfer_parameters(extend=True)) + self._send_chunk( + self._transfer_parameters(Chunk.Type.PARAMETERS_CONTINUE)) def _retry_after_timeout(self) -> None: - self._send_chunk(self._transfer_parameters()) + self._send_chunk( + self._transfer_parameters(Chunk.Type.PARAMETERS_RETRANSMIT)) - def _transfer_parameters(self, extend: bool = False) -> Chunk: + def _transfer_parameters(self, chunk_type: Any) -> Chunk: """Sends an updated transfer parameters chunk to the server.""" self._pending_bytes = self._max_bytes_to_receive self._window_end_offset = self._offset + self._max_bytes_to_receive - chunk_type = (Chunk.Type.PARAMETERS_CONTINUE - if extend else Chunk.Type.PARAMETERS_RETRANSMIT) - chunk = Chunk(transfer_id=self.id, pending_bytes=self._pending_bytes, window_end_offset=self._window_end_offset, diff --git a/pw_transfer/py/tests/transfer_test.py b/pw_transfer/py/tests/transfer_test.py index 718b276bf..e73c09ddf 100644 --- a/pw_transfer/py/tests/transfer_test.py +++ b/pw_transfer/py/tests/transfer_test.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# Copyright 2021 The Pigweed Authors +# Copyright 2022 The Pigweed Authors # # Licensed under the Apache License, Version 2.0 (the "License"); you may not # use this file except in compliance with the License. You may obtain a copy of @@ -462,9 +462,12 @@ class TransferManagerTest(unittest.TestCase): self.assertEqual( self._sent_chunks, [ - Chunk(transfer_id=22), # initial chunk - Chunk(transfer_id=22), # retry 1 - Chunk(transfer_id=22), # retry 2 + Chunk(transfer_id=22, + type=Chunk.Type.TRANSFER_START), # initial chunk + Chunk(transfer_id=22, + type=Chunk.Type.TRANSFER_START), # retry 1 + Chunk(transfer_id=22, + type=Chunk.Type.TRANSFER_START), # retry 2 ]) exception = context.exception @@ -489,13 +492,16 @@ class TransferManagerTest(unittest.TestCase): last_data_chunk = Chunk(transfer_id=22, data=b'56789', offset=5, - remaining_bytes=0) + remaining_bytes=0, + type=Chunk.Type.TRANSFER_DATA) self.assertEqual( self._sent_chunks, [ - Chunk(transfer_id=22), # start transfer - Chunk(transfer_id=22, data=b'01234'), + Chunk(transfer_id=22, type=Chunk.Type.TRANSFER_START), + Chunk(transfer_id=22, + data=b'01234', + type=Chunk.Type.TRANSFER_DATA), last_data_chunk, # last chunk last_data_chunk, # retry 1 last_data_chunk, # retry 2 diff --git a/pw_transfer/transfer.proto b/pw_transfer/transfer.proto index 8a9d35500..6f598a071 100644 --- a/pw_transfer/transfer.proto +++ b/pw_transfer/transfer.proto @@ -140,17 +140,24 @@ message Chunk { // Chunk containing transfer data. TRANSFER_DATA = 0; - // First chunk of a transfer (only sent by the client). Currently unused. + // First chunk of a transfer (only sent by the client). TRANSFER_START = 1; - // Transfer parameters indicating that the sender should retransmit from the - // specified offset. + // Transfer parameters indicating that the transmitter should retransmit + // from the specified offset. PARAMETERS_RETRANSMIT = 2; - // Transfer parameters telling the sender to continue sending up to index - // `offset + pending_bytes` of data. If the sender is already beyond offset, - // it does not have to rewind. + // Transfer parameters telling the transmitter to continue sending up to + // index `offset + pending_bytes` of data. If the transmitter is already + // beyond `offset`, it does not have to rewind. PARAMETERS_CONTINUE = 3; + + // Sender of the chunk is terminating the transfer. + TRANSFER_COMPLETION = 4; + + // Acknowledge the completion of a transfer. Currently unused. + // TODO(konkers): Implement this behavior. + TRANSFER_COMPLETION_ACK = 5; }; // The type of this chunk. This field should only be processed when present. diff --git a/pw_transfer/transfer_test.cc b/pw_transfer/transfer_test.cc index 725f32e50..76cfb0113 100644 --- a/pw_transfer/transfer_test.cc +++ b/pw_transfer/transfer_test.cc @@ -140,12 +140,11 @@ class ReadTransfer : public ::testing::Test { TEST_F(ReadTransfer, SingleChunk) { rpc::test::WaitForPackets(ctx_.output(), 2, [this] { - ctx_.SendClientStream( - EncodeChunk({.transfer_id = 3, - .window_end_offset = 64, - .pending_bytes = 64, - .offset = 0, - .type = Chunk::Type::kParametersRetransmit})); + ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, + .window_end_offset = 64, + .pending_bytes = 64, + .offset = 0, + .type = Chunk::Type::kTransferStart})); transfer_thread_.WaitUntilEventIsProcessed(); }); @@ -178,8 +177,10 @@ TEST_F(ReadTransfer, SingleChunk) { TEST_F(ReadTransfer, PendingBytes_SingleChunk) { rpc::test::WaitForPackets(ctx_.output(), 2, [this] { - ctx_.SendClientStream( - EncodeChunk({.transfer_id = 3, .pending_bytes = 64, .offset = 0})); + ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, + .pending_bytes = 64, + .offset = 0, + .type = Chunk::Type::kTransferStart})); transfer_thread_.WaitUntilEventIsProcessed(); }); @@ -211,12 +212,11 @@ TEST_F(ReadTransfer, PendingBytes_SingleChunk) { } TEST_F(ReadTransfer, MultiChunk) { - ctx_.SendClientStream( - EncodeChunk({.transfer_id = 3, - .window_end_offset = 16, - .pending_bytes = 16, - .offset = 0, - .type = Chunk::Type::kParametersRetransmit})); + ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, + .window_end_offset = 16, + .pending_bytes = 16, + .offset = 0, + .type = Chunk::Type::kTransferStart})); transfer_thread_.WaitUntilEventIsProcessed(); @@ -271,12 +271,11 @@ TEST_F(ReadTransfer, MultiChunk) { } TEST_F(ReadTransfer, MultiChunk_RepeatedContinuePackets) { - ctx_.SendClientStream( - EncodeChunk({.transfer_id = 3, - .window_end_offset = 16, - .pending_bytes = 16, - .offset = 0, - .type = Chunk::Type::kParametersRetransmit})); + ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, + .window_end_offset = 16, + .pending_bytes = 16, + .offset = 0, + .type = Chunk::Type::kTransferStart})); transfer_thread_.WaitUntilEventIsProcessed(); @@ -403,7 +402,8 @@ TEST_F(ReadTransfer, MaxChunkSize_Client) { ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .pending_bytes = 64, .max_chunk_size_bytes = 8, - .offset = 0})); + .offset = 0, + .type = Chunk::Type::kTransferStart})); }); EXPECT_TRUE(handler_.prepare_read_called); @@ -449,12 +449,11 @@ TEST_F(ReadTransfer, MaxChunkSize_Client) { } TEST_F(ReadTransfer, HandlerIsClearedAfterTransfer) { - ctx_.SendClientStream( - EncodeChunk({.transfer_id = 3, - .window_end_offset = 64, - .pending_bytes = 64, - .offset = 0, - .type = Chunk::Type::kParametersRetransmit})); + ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, + .window_end_offset = 64, + .pending_bytes = 64, + .offset = 0, + .type = Chunk::Type::kTransferStart})); ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()})); transfer_thread_.WaitUntilEventIsProcessed(); @@ -468,12 +467,11 @@ TEST_F(ReadTransfer, HandlerIsClearedAfterTransfer) { handler_.prepare_read_called = false; handler_.finalize_read_called = false; - ctx_.SendClientStream( - EncodeChunk({.transfer_id = 3, - .window_end_offset = 64, - .pending_bytes = 64, - .offset = 0, - .type = Chunk::Type::kParametersRetransmit})); + ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, + .window_end_offset = 64, + .pending_bytes = 64, + .offset = 0, + .type = Chunk::Type::kTransferStart})); transfer_thread_.WaitUntilEventIsProcessed(); // Prepare failed, so the handler should not have been stored in the context, @@ -493,7 +491,8 @@ TEST_F(ReadTransferMaxChunkSize8, MaxChunkSize_Server) { ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .pending_bytes = 64, .max_chunk_size_bytes = 16, - .offset = 0})); + .offset = 0, + .type = Chunk::Type::kTransferStart})); }); EXPECT_TRUE(handler_.prepare_read_called); @@ -539,8 +538,10 @@ TEST_F(ReadTransferMaxChunkSize8, MaxChunkSize_Server) { } TEST_F(ReadTransfer, ClientError) { - ctx_.SendClientStream( - EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 0})); + ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, + .pending_bytes = 16, + .offset = 0, + .type = Chunk::Type::kTransferStart})); transfer_thread_.WaitUntilEventIsProcessed(); @@ -575,8 +576,10 @@ TEST_F(ReadTransfer, MalformedParametersChunk) { } TEST_F(ReadTransfer, UnregisteredHandler) { - ctx_.SendClientStream( - EncodeChunk({.transfer_id = 11, .pending_bytes = 32, .offset = 0})); + ctx_.SendClientStream(EncodeChunk({.transfer_id = 11, + .pending_bytes = 32, + .offset = 0, + .type = Chunk::Type::kTransferStart})); transfer_thread_.WaitUntilEventIsProcessed(); ASSERT_EQ(ctx_.total_responses(), 1u); @@ -599,8 +602,10 @@ TEST_F(ReadTransfer, IgnoresNonPendingTransfers) { } TEST_F(ReadTransfer, AbortAndRestartIfInitialPacketIsReceived) { - ctx_.SendClientStream( - EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 0})); + ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, + .pending_bytes = 16, + .offset = 0, + .type = Chunk::Type::kTransferStart})); transfer_thread_.WaitUntilEventIsProcessed(); ASSERT_EQ(ctx_.total_responses(), 1u); @@ -610,7 +615,10 @@ TEST_F(ReadTransfer, AbortAndRestartIfInitialPacketIsReceived) { handler_.prepare_read_called = false; // Reset so can check if called again. ctx_.SendClientStream( // Resend starting chunk - EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 0})); + EncodeChunk({.transfer_id = 3, + .pending_bytes = 16, + .offset = 0, + .type = Chunk::Type::kTransferStart})); transfer_thread_.WaitUntilEventIsProcessed(); ASSERT_EQ(ctx_.total_responses(), 2u); @@ -631,7 +639,9 @@ TEST_F(ReadTransfer, AbortAndRestartIfInitialPacketIsReceived) { } TEST_F(ReadTransfer, ZeroPendingBytesWithRemainingData_Aborts) { - ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .pending_bytes = 0})); + ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, + .pending_bytes = 0, + .type = Chunk::Type::kTransferStart})); transfer_thread_.WaitUntilEventIsProcessed(); ASSERT_EQ(ctx_.total_responses(), 1u); @@ -646,7 +656,9 @@ TEST_F(ReadTransfer, ZeroPendingBytesNoRemainingData_Completes) { // Make the next read appear to be the end of the stream. handler_.set_read_status(Status::OutOfRange()); - ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .pending_bytes = 0})); + ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, + .pending_bytes = 0, + .type = Chunk::Type::kTransferStart})); transfer_thread_.WaitUntilEventIsProcessed(); Chunk chunk = DecodeChunk(ctx_.responses().back()); @@ -663,8 +675,10 @@ TEST_F(ReadTransfer, ZeroPendingBytesNoRemainingData_Completes) { TEST_F(ReadTransfer, SendsErrorIfChunkIsReceivedInCompletedState) { rpc::test::WaitForPackets(ctx_.output(), 2, [this] { - ctx_.SendClientStream( - EncodeChunk({.transfer_id = 3, .pending_bytes = 64, .offset = 0})); + ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, + .pending_bytes = 64, + .offset = 0, + .type = Chunk::Type::kTransferStart})); }); EXPECT_TRUE(handler_.prepare_read_called); diff --git a/pw_transfer/ts/transfer.ts b/pw_transfer/ts/transfer.ts index e8caaf21c..708f5cfcb 100644 --- a/pw_transfer/ts/transfer.ts +++ b/pw_transfer/ts/transfer.ts @@ -1,4 +1,4 @@ -// Copyright 2021 The Pigweed Authors +// Copyright 2022 The Pigweed Authors // // Licensed under the Apache License, Version 2.0 (the "License"); you may not // use this file except in compliance with the License. You may obtain a copy of @@ -138,6 +138,7 @@ export abstract class Transfer { const chunk = new Chunk(); chunk.setStatus(error); chunk.setTransferId(this.id); + chunk.setType(Chunk.Type.TRANSFER_COMPLETION); this.sendChunk(chunk); this.finish(error); } @@ -252,25 +253,21 @@ export class ReadTransfer extends Transfer { } protected get initialChunk(): Chunk { - return this.transferParameters(); + return this.transferParameters(Chunk.Type.TRANSFER_START); } /** Builds an updated transfer parameters chunk to send the server. */ - private transferParameters(extend = false): Chunk { + private transferParameters(type: Chunk.TypeMap[keyof Chunk.TypeMap]): Chunk { this.pendingBytes = this.maxBytesToReceive; this.windowEndOffset = this.offset + this.maxBytesToReceive; - const chunkType = extend - ? Chunk.Type.PARAMETERS_CONTINUE - : Chunk.Type.PARAMETERS_RETRANSMIT; - const chunk = new Chunk(); chunk.setTransferId(this.id); chunk.setPendingBytes(this.pendingBytes); chunk.setMaxChunkSizeBytes(this.maxChunkSize); chunk.setOffset(this.offset); chunk.setWindowEndOffset(this.windowEndOffset); - chunk.setType(chunkType); + chunk.setType(type); if (this.chunkDelayMicroS !== 0) { chunk.setMinDelayMicroseconds(this.chunkDelayMicroS!); @@ -289,7 +286,7 @@ export class ReadTransfer extends Transfer { // Initially, the transfer service only supports in-order transfers. // If data is received out of order, request that the server // retransmit from the previous offset. - this.sendChunk(this.transferParameters()); + this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_RETRANSMIT)); return; } @@ -308,6 +305,7 @@ export class ReadTransfer extends Transfer { const endChunk = new Chunk(); endChunk.setTransferId(this.id); endChunk.setStatus(Status.OK); + endChunk.setType(Chunk.Type.TRANSFER_COMPLETION); this.sendChunk(endChunk); this.finish(Status.OK); return; @@ -366,14 +364,14 @@ export class ReadTransfer extends Transfer { if (this.pendingBytes === 0) { // All pending data was received. Send out a new parameters chunk // for the next block. - this.sendChunk(this.transferParameters()); + this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_RETRANSMIT)); } else if (extendWindow) { - this.sendChunk(this.transferParameters(/*extend=*/ true)); + this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_CONTINUE)); } } protected retryAfterTimeout(): void { - this.sendChunk(this.transferParameters()); + this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_RETRANSMIT)); } } @@ -406,6 +404,7 @@ export class WriteTransfer extends Transfer { protected get initialChunk(): Chunk { const chunk = new Chunk(); chunk.setTransferId(this.id); + chunk.setType(Chunk.Type.TRANSFER_START); return chunk; } @@ -516,6 +515,8 @@ export class WriteTransfer extends Transfer { const chunk = new Chunk(); chunk.setTransferId(this.id); chunk.setOffset(this.offset); + chunk.setType(Chunk.Type.TRANSFER_DATA); + const maxBytesInChunk = Math.min( this.maxChunkSize, this.windowEndOffset - this.offset diff --git a/pw_transfer/ts/transfer_test.ts b/pw_transfer/ts/transfer_test.ts index 2eefdfdc6..1707a82f7 100644 --- a/pw_transfer/ts/transfer_test.ts +++ b/pw_transfer/ts/transfer_test.ts @@ -613,14 +613,17 @@ describe('Encoder', () => { .catch(error => { const expectedChunk1 = new Chunk(); expectedChunk1.setTransferId(22); + expectedChunk1.setType(Chunk.Type.TRANSFER_START); const expectedChunk2 = new Chunk(); expectedChunk2.setTransferId(22); expectedChunk2.setData(textEncoder.encode('01234')); + expectedChunk2.setType(Chunk.Type.TRANSFER_DATA); const lastChunk = new Chunk(); lastChunk.setTransferId(22); lastChunk.setData(textEncoder.encode('56789')); lastChunk.setOffset(5); lastChunk.setRemainingBytes(0); + lastChunk.setType(Chunk.Type.TRANSFER_DATA); const expectedChunks = [ expectedChunk1, |