aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexei Frolov <frolv@google.com>2022-03-25 11:12:06 -0700
committerCQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com>2022-03-28 22:03:00 +0000
commita6af061a2fdd1198d6214fe03a4e2afaecbf3f55 (patch)
tree4fa4a50775ea1aed03ea85d8056f1584bfa60f66
parent5f49391161bf53c31323aa367f594bc91877af6a (diff)
downloadpigweed-a6af061a2fdd1198d6214fe03a4e2afaecbf3f55.tar.gz
pw_transfer: Always set chunk type field
This updates the implementations of pw_transfer to always set a chunk type when encoding a chunk proto, including a new completion type. No significant behavioral changes are made in response to the chunk type field -- these will be rolled out in a future change. Change-Id: I9d39eecff814cfc21285eb24ff7a0b5166ce1ff4 Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/89101 Reviewed-by: Keir Mierle <keir@google.com> Reviewed-by: Wyatt Hepler <hepler@google.com> Commit-Queue: Alexei Frolov <frolv@google.com>
-rw-r--r--pw_transfer/context.cc40
-rw-r--r--pw_transfer/public/pw_transfer/internal/chunk.h8
-rw-r--r--pw_transfer/public/pw_transfer/internal/context.h9
-rw-r--r--pw_transfer/py/pw_transfer/transfer.py39
-rw-r--r--pw_transfer/py/tests/transfer_test.py20
-rw-r--r--pw_transfer/transfer.proto19
-rw-r--r--pw_transfer/transfer_test.cc104
-rw-r--r--pw_transfer/ts/transfer.ts25
-rw-r--r--pw_transfer/ts/transfer_test.ts3
9 files changed, 168 insertions, 99 deletions
diff --git a/pw_transfer/context.cc b/pw_transfer/context.cc
index 624a977e2..04027036e 100644
--- a/pw_transfer/context.cc
+++ b/pw_transfer/context.cc
@@ -80,7 +80,7 @@ void Context::InitiateTransferAsClient() {
if (type() == TransferType::kReceive) {
// A receiver begins a new transfer with a parameters chunk telling the
// transmitter what to send.
- UpdateAndSendTransferParameters(kRetransmit);
+ UpdateAndSendTransferParameters(TransmitAction::kBegin);
} else {
SendInitialTransmitChunk();
}
@@ -118,23 +118,33 @@ void Context::SendInitialTransmitChunk() {
// A transmitter begins a transfer by just sending its ID.
internal::Chunk chunk = {};
chunk.transfer_id = transfer_id_;
+ chunk.type = Chunk::Type::kTransferStart;
EncodeAndSendChunk(chunk);
}
void Context::SendTransferParameters(TransmitAction action) {
- const internal::Chunk parameters = {
+ internal::Chunk parameters = {
.transfer_id = transfer_id_,
.window_end_offset = window_end_offset_,
.pending_bytes = pending_bytes_,
.max_chunk_size_bytes = max_chunk_size_bytes_,
.min_delay_microseconds = kDefaultChunkDelayMicroseconds,
.offset = offset_,
- .type = action == kRetransmit
- ? internal::Chunk::Type::kParametersRetransmit
- : internal::Chunk::Type::kParametersContinue,
};
+ switch (action) {
+ case TransmitAction::kBegin:
+ parameters.type = internal::Chunk::Type::kTransferStart;
+ break;
+ case TransmitAction::kRetransmit:
+ parameters.type = internal::Chunk::Type::kParametersRetransmit;
+ break;
+ case TransmitAction::kExtend:
+ parameters.type = internal::Chunk::Type::kParametersContinue;
+ break;
+ }
+
PW_LOG_DEBUG(
"Transfer %u sending transfer parameters: "
"offset=%u, window_end_offset=%u, pending_bytes=%u, chunk_size=%u",
@@ -287,7 +297,8 @@ void Context::HandleTransferParametersUpdate(const Chunk& chunk) {
bool retransmit = true;
if (chunk.type.has_value()) {
- retransmit = chunk.type == Chunk::Type::kParametersRetransmit;
+ retransmit = chunk.type == Chunk::Type::kParametersRetransmit ||
+ chunk.type == Chunk::Type::kTransferStart;
}
if (retransmit) {
@@ -360,6 +371,12 @@ void Context::TransmitNextChunk(bool retransmit_requested) {
encoder.WriteTransferId(transfer_id_).IgnoreError();
encoder.WriteOffset(offset_).IgnoreError();
+ // TODO(frolv): Type field presence is currently meaningful, so this type must
+ // be serialized. Once all users of transfer always set chunk types, the field
+ // can be made non-optional and this write can be removed as TRANSFER_DATA has
+ // the default proto value of 0.
+ encoder.WriteType(transfer::Chunk::Type::TRANSFER_DATA).IgnoreError();
+
// Reserve space for the data proto field overhead and use the remainder of
// the buffer for the chunk data.
size_t reserved_size = encoder.size() + 1 /* data key */ + 5 /* data size */;
@@ -473,7 +490,7 @@ void Context::HandleReceiveChunk(const Chunk& chunk) {
static_cast<unsigned>(transfer_id_),
static_cast<unsigned>(chunk.offset));
- UpdateAndSendTransferParameters(kRetransmit);
+ UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
if (transfer_state_ == TransferState::kCompleted) {
SendFinalStatusChunk();
return;
@@ -531,7 +548,7 @@ void Context::HandleReceivedData(const Chunk& chunk) {
set_transfer_state(TransferState::kRecovery);
SetTimeout(chunk_timeout_);
- UpdateAndSendTransferParameters(kRetransmit);
+ UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
return;
}
@@ -598,7 +615,7 @@ void Context::HandleReceivedData(const Chunk& chunk) {
if (pending_bytes_ == 0u) {
// Received all pending data. Advance the transfer parameters.
- UpdateAndSendTransferParameters(kRetransmit);
+ UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
return;
}
@@ -609,7 +626,7 @@ void Context::HandleReceivedData(const Chunk& chunk) {
window_size_ / max_parameters_->extend_window_divisor();
if (extend_window) {
- UpdateAndSendTransferParameters(kExtend);
+ UpdateAndSendTransferParameters(TransmitAction::kExtend);
return;
}
}
@@ -620,6 +637,7 @@ void Context::SendFinalStatusChunk() {
internal::Chunk chunk = {};
chunk.transfer_id = transfer_id_;
chunk.status = status_.code();
+ chunk.type = Chunk::Type::kTransferCompletion;
PW_LOG_DEBUG("Sending final chunk for transfer %u with status %u",
static_cast<unsigned>(transfer_id_),
@@ -699,7 +717,7 @@ void Context::Retry() {
"Receive transfer %u timed out waiting for chunk; resending parameters",
static_cast<unsigned>(transfer_id_));
- SendTransferParameters(kRetransmit);
+ SendTransferParameters(TransmitAction::kRetransmit);
return;
}
diff --git a/pw_transfer/public/pw_transfer/internal/chunk.h b/pw_transfer/public/pw_transfer/internal/chunk.h
index f4a0ee960..b20acf44f 100644
--- a/pw_transfer/public/pw_transfer/internal/chunk.h
+++ b/pw_transfer/public/pw_transfer/internal/chunk.h
@@ -28,13 +28,17 @@ struct Chunk {
kTransferStart = 1,
kParametersRetransmit = 2,
kParametersContinue = 3,
+ kTransferCompletion = 4,
+ kTransferCompletionAck = 5, // Currently unused.
};
// The initial chunk always has an offset of 0 and no data or status.
//
- // Pending bytes is required in all read chunks, so that is checked elsewhere.
+ // TODO(frolv): Going forward, all users of transfer should set a type for
+ // all chunks. This initial chunk assumption should be removed.
constexpr bool IsInitialChunk() const {
- return offset == 0 && data.empty() && !status.has_value();
+ return type == Type::kTransferStart ||
+ (offset == 0 && data.empty() && !status.has_value());
}
// The final chunk from the transmitter sets remaining_bytes to 0 in both Read
diff --git a/pw_transfer/public/pw_transfer/internal/context.h b/pw_transfer/public/pw_transfer/internal/context.h
index 0a4d2b453..f825419dd 100644
--- a/pw_transfer/public/pw_transfer/internal/context.h
+++ b/pw_transfer/public/pw_transfer/internal/context.h
@@ -149,7 +149,14 @@ class Context {
kRecovery,
};
- enum TransmitAction : bool { kExtend, kRetransmit };
+ enum class TransmitAction {
+ // Start of a new transfer.
+ kBegin,
+ // Extend the current window length.
+ kExtend,
+ // Retransmit from a specified offset.
+ kRetransmit,
+ };
void set_transfer_state(TransferState state) { transfer_state_ = state; }
diff --git a/pw_transfer/py/pw_transfer/transfer.py b/pw_transfer/py/pw_transfer/transfer.py
index bd3fa1d26..06574f4e7 100644
--- a/pw_transfer/py/pw_transfer/transfer.py
+++ b/pw_transfer/py/pw_transfer/transfer.py
@@ -1,4 +1,4 @@
-# Copyright 2021 The Pigweed Authors
+# Copyright 2022 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
@@ -196,7 +196,10 @@ class Transfer(abc.ABC):
def _send_error(self, error: Status) -> None:
"""Sends an error chunk to the server and finishes the transfer."""
- self._send_chunk(Chunk(transfer_id=self.id, status=error.value))
+ self._send_chunk(
+ Chunk(transfer_id=self.id,
+ status=error.value,
+ type=Chunk.Type.TRANSFER_COMPLETION))
self.finish(error)
@@ -236,7 +239,7 @@ class WriteTransfer(Transfer):
return self._data
def _initial_chunk(self) -> Chunk:
- return Chunk(transfer_id=self.id)
+ return Chunk(transfer_id=self.id, type=Chunk.Type.TRANSFER_START)
async def _handle_data_chunk(self, chunk: Chunk) -> None:
"""Processes an incoming chunk from the server.
@@ -286,7 +289,8 @@ class WriteTransfer(Transfer):
retransmit = True
if chunk.HasField('type'):
- retransmit = chunk.type == Chunk.Type.PARAMETERS_RETRANSMIT
+ retransmit = (chunk.type == Chunk.Type.PARAMETERS_RETRANSMIT
+ or chunk.type == Chunk.Type.TRANSFER_START)
if chunk.offset > len(self.data):
# Bad offset; terminate the transfer.
@@ -339,7 +343,9 @@ class WriteTransfer(Transfer):
def _next_chunk(self) -> Chunk:
"""Returns the next Chunk message to send in the data transfer."""
- chunk = Chunk(transfer_id=self.id, offset=self._offset)
+ chunk = Chunk(transfer_id=self.id,
+ offset=self._offset,
+ type=Chunk.Type.TRANSFER_DATA)
max_bytes_in_chunk = min(self._max_chunk_size,
self._window_end_offset - self._offset)
@@ -400,7 +406,7 @@ class ReadTransfer(Transfer):
return bytes(self._data)
def _initial_chunk(self) -> Chunk:
- return self._transfer_parameters()
+ return self._transfer_parameters(Chunk.Type.TRANSFER_START)
async def _handle_data_chunk(self, chunk: Chunk) -> None:
"""Processes an incoming chunk from the server.
@@ -413,7 +419,8 @@ class ReadTransfer(Transfer):
# Initially, the transfer service only supports in-order transfers.
# If data is received out of order, request that the server
# retransmit from the previous offset.
- self._send_chunk(self._transfer_parameters())
+ self._send_chunk(
+ self._transfer_parameters(Chunk.Type.PARAMETERS_RETRANSMIT))
return
self._data += chunk.data
@@ -424,7 +431,9 @@ class ReadTransfer(Transfer):
if chunk.remaining_bytes == 0:
# No more data to read. Acknowledge receipt and finish.
self._send_chunk(
- Chunk(transfer_id=self.id, status=Status.OK.value))
+ Chunk(transfer_id=self.id,
+ status=Status.OK.value,
+ type=Chunk.Type.TRANSFER_COMPLETION))
self.finish(Status.OK)
return
@@ -469,22 +478,22 @@ class ReadTransfer(Transfer):
if self._pending_bytes == 0:
# All pending data was received. Send out a new parameters chunk for
# the next block.
- self._send_chunk(self._transfer_parameters())
+ self._send_chunk(
+ self._transfer_parameters(Chunk.Type.PARAMETERS_RETRANSMIT))
elif extend_window:
- self._send_chunk(self._transfer_parameters(extend=True))
+ self._send_chunk(
+ self._transfer_parameters(Chunk.Type.PARAMETERS_CONTINUE))
def _retry_after_timeout(self) -> None:
- self._send_chunk(self._transfer_parameters())
+ self._send_chunk(
+ self._transfer_parameters(Chunk.Type.PARAMETERS_RETRANSMIT))
- def _transfer_parameters(self, extend: bool = False) -> Chunk:
+ def _transfer_parameters(self, chunk_type: Any) -> Chunk:
"""Sends an updated transfer parameters chunk to the server."""
self._pending_bytes = self._max_bytes_to_receive
self._window_end_offset = self._offset + self._max_bytes_to_receive
- chunk_type = (Chunk.Type.PARAMETERS_CONTINUE
- if extend else Chunk.Type.PARAMETERS_RETRANSMIT)
-
chunk = Chunk(transfer_id=self.id,
pending_bytes=self._pending_bytes,
window_end_offset=self._window_end_offset,
diff --git a/pw_transfer/py/tests/transfer_test.py b/pw_transfer/py/tests/transfer_test.py
index 718b276bf..e73c09ddf 100644
--- a/pw_transfer/py/tests/transfer_test.py
+++ b/pw_transfer/py/tests/transfer_test.py
@@ -1,5 +1,5 @@
#!/usr/bin/env python3
-# Copyright 2021 The Pigweed Authors
+# Copyright 2022 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
@@ -462,9 +462,12 @@ class TransferManagerTest(unittest.TestCase):
self.assertEqual(
self._sent_chunks,
[
- Chunk(transfer_id=22), # initial chunk
- Chunk(transfer_id=22), # retry 1
- Chunk(transfer_id=22), # retry 2
+ Chunk(transfer_id=22,
+ type=Chunk.Type.TRANSFER_START), # initial chunk
+ Chunk(transfer_id=22,
+ type=Chunk.Type.TRANSFER_START), # retry 1
+ Chunk(transfer_id=22,
+ type=Chunk.Type.TRANSFER_START), # retry 2
])
exception = context.exception
@@ -489,13 +492,16 @@ class TransferManagerTest(unittest.TestCase):
last_data_chunk = Chunk(transfer_id=22,
data=b'56789',
offset=5,
- remaining_bytes=0)
+ remaining_bytes=0,
+ type=Chunk.Type.TRANSFER_DATA)
self.assertEqual(
self._sent_chunks,
[
- Chunk(transfer_id=22), # start transfer
- Chunk(transfer_id=22, data=b'01234'),
+ Chunk(transfer_id=22, type=Chunk.Type.TRANSFER_START),
+ Chunk(transfer_id=22,
+ data=b'01234',
+ type=Chunk.Type.TRANSFER_DATA),
last_data_chunk, # last chunk
last_data_chunk, # retry 1
last_data_chunk, # retry 2
diff --git a/pw_transfer/transfer.proto b/pw_transfer/transfer.proto
index 8a9d35500..6f598a071 100644
--- a/pw_transfer/transfer.proto
+++ b/pw_transfer/transfer.proto
@@ -140,17 +140,24 @@ message Chunk {
// Chunk containing transfer data.
TRANSFER_DATA = 0;
- // First chunk of a transfer (only sent by the client). Currently unused.
+ // First chunk of a transfer (only sent by the client).
TRANSFER_START = 1;
- // Transfer parameters indicating that the sender should retransmit from the
- // specified offset.
+ // Transfer parameters indicating that the transmitter should retransmit
+ // from the specified offset.
PARAMETERS_RETRANSMIT = 2;
- // Transfer parameters telling the sender to continue sending up to index
- // `offset + pending_bytes` of data. If the sender is already beyond offset,
- // it does not have to rewind.
+ // Transfer parameters telling the transmitter to continue sending up to
+ // index `offset + pending_bytes` of data. If the transmitter is already
+ // beyond `offset`, it does not have to rewind.
PARAMETERS_CONTINUE = 3;
+
+ // Sender of the chunk is terminating the transfer.
+ TRANSFER_COMPLETION = 4;
+
+ // Acknowledge the completion of a transfer. Currently unused.
+ // TODO(konkers): Implement this behavior.
+ TRANSFER_COMPLETION_ACK = 5;
};
// The type of this chunk. This field should only be processed when present.
diff --git a/pw_transfer/transfer_test.cc b/pw_transfer/transfer_test.cc
index 725f32e50..76cfb0113 100644
--- a/pw_transfer/transfer_test.cc
+++ b/pw_transfer/transfer_test.cc
@@ -140,12 +140,11 @@ class ReadTransfer : public ::testing::Test {
TEST_F(ReadTransfer, SingleChunk) {
rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
- ctx_.SendClientStream(
- EncodeChunk({.transfer_id = 3,
- .window_end_offset = 64,
- .pending_bytes = 64,
- .offset = 0,
- .type = Chunk::Type::kParametersRetransmit}));
+ ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
+ .window_end_offset = 64,
+ .pending_bytes = 64,
+ .offset = 0,
+ .type = Chunk::Type::kTransferStart}));
transfer_thread_.WaitUntilEventIsProcessed();
});
@@ -178,8 +177,10 @@ TEST_F(ReadTransfer, SingleChunk) {
TEST_F(ReadTransfer, PendingBytes_SingleChunk) {
rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
- ctx_.SendClientStream(
- EncodeChunk({.transfer_id = 3, .pending_bytes = 64, .offset = 0}));
+ ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
+ .pending_bytes = 64,
+ .offset = 0,
+ .type = Chunk::Type::kTransferStart}));
transfer_thread_.WaitUntilEventIsProcessed();
});
@@ -211,12 +212,11 @@ TEST_F(ReadTransfer, PendingBytes_SingleChunk) {
}
TEST_F(ReadTransfer, MultiChunk) {
- ctx_.SendClientStream(
- EncodeChunk({.transfer_id = 3,
- .window_end_offset = 16,
- .pending_bytes = 16,
- .offset = 0,
- .type = Chunk::Type::kParametersRetransmit}));
+ ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
+ .window_end_offset = 16,
+ .pending_bytes = 16,
+ .offset = 0,
+ .type = Chunk::Type::kTransferStart}));
transfer_thread_.WaitUntilEventIsProcessed();
@@ -271,12 +271,11 @@ TEST_F(ReadTransfer, MultiChunk) {
}
TEST_F(ReadTransfer, MultiChunk_RepeatedContinuePackets) {
- ctx_.SendClientStream(
- EncodeChunk({.transfer_id = 3,
- .window_end_offset = 16,
- .pending_bytes = 16,
- .offset = 0,
- .type = Chunk::Type::kParametersRetransmit}));
+ ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
+ .window_end_offset = 16,
+ .pending_bytes = 16,
+ .offset = 0,
+ .type = Chunk::Type::kTransferStart}));
transfer_thread_.WaitUntilEventIsProcessed();
@@ -403,7 +402,8 @@ TEST_F(ReadTransfer, MaxChunkSize_Client) {
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
.pending_bytes = 64,
.max_chunk_size_bytes = 8,
- .offset = 0}));
+ .offset = 0,
+ .type = Chunk::Type::kTransferStart}));
});
EXPECT_TRUE(handler_.prepare_read_called);
@@ -449,12 +449,11 @@ TEST_F(ReadTransfer, MaxChunkSize_Client) {
}
TEST_F(ReadTransfer, HandlerIsClearedAfterTransfer) {
- ctx_.SendClientStream(
- EncodeChunk({.transfer_id = 3,
- .window_end_offset = 64,
- .pending_bytes = 64,
- .offset = 0,
- .type = Chunk::Type::kParametersRetransmit}));
+ ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
+ .window_end_offset = 64,
+ .pending_bytes = 64,
+ .offset = 0,
+ .type = Chunk::Type::kTransferStart}));
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
transfer_thread_.WaitUntilEventIsProcessed();
@@ -468,12 +467,11 @@ TEST_F(ReadTransfer, HandlerIsClearedAfterTransfer) {
handler_.prepare_read_called = false;
handler_.finalize_read_called = false;
- ctx_.SendClientStream(
- EncodeChunk({.transfer_id = 3,
- .window_end_offset = 64,
- .pending_bytes = 64,
- .offset = 0,
- .type = Chunk::Type::kParametersRetransmit}));
+ ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
+ .window_end_offset = 64,
+ .pending_bytes = 64,
+ .offset = 0,
+ .type = Chunk::Type::kTransferStart}));
transfer_thread_.WaitUntilEventIsProcessed();
// Prepare failed, so the handler should not have been stored in the context,
@@ -493,7 +491,8 @@ TEST_F(ReadTransferMaxChunkSize8, MaxChunkSize_Server) {
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
.pending_bytes = 64,
.max_chunk_size_bytes = 16,
- .offset = 0}));
+ .offset = 0,
+ .type = Chunk::Type::kTransferStart}));
});
EXPECT_TRUE(handler_.prepare_read_called);
@@ -539,8 +538,10 @@ TEST_F(ReadTransferMaxChunkSize8, MaxChunkSize_Server) {
}
TEST_F(ReadTransfer, ClientError) {
- ctx_.SendClientStream(
- EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 0}));
+ ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
+ .pending_bytes = 16,
+ .offset = 0,
+ .type = Chunk::Type::kTransferStart}));
transfer_thread_.WaitUntilEventIsProcessed();
@@ -575,8 +576,10 @@ TEST_F(ReadTransfer, MalformedParametersChunk) {
}
TEST_F(ReadTransfer, UnregisteredHandler) {
- ctx_.SendClientStream(
- EncodeChunk({.transfer_id = 11, .pending_bytes = 32, .offset = 0}));
+ ctx_.SendClientStream(EncodeChunk({.transfer_id = 11,
+ .pending_bytes = 32,
+ .offset = 0,
+ .type = Chunk::Type::kTransferStart}));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 1u);
@@ -599,8 +602,10 @@ TEST_F(ReadTransfer, IgnoresNonPendingTransfers) {
}
TEST_F(ReadTransfer, AbortAndRestartIfInitialPacketIsReceived) {
- ctx_.SendClientStream(
- EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 0}));
+ ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
+ .pending_bytes = 16,
+ .offset = 0,
+ .type = Chunk::Type::kTransferStart}));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 1u);
@@ -610,7 +615,10 @@ TEST_F(ReadTransfer, AbortAndRestartIfInitialPacketIsReceived) {
handler_.prepare_read_called = false; // Reset so can check if called again.
ctx_.SendClientStream( // Resend starting chunk
- EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 0}));
+ EncodeChunk({.transfer_id = 3,
+ .pending_bytes = 16,
+ .offset = 0,
+ .type = Chunk::Type::kTransferStart}));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 2u);
@@ -631,7 +639,9 @@ TEST_F(ReadTransfer, AbortAndRestartIfInitialPacketIsReceived) {
}
TEST_F(ReadTransfer, ZeroPendingBytesWithRemainingData_Aborts) {
- ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .pending_bytes = 0}));
+ ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
+ .pending_bytes = 0,
+ .type = Chunk::Type::kTransferStart}));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 1u);
@@ -646,7 +656,9 @@ TEST_F(ReadTransfer, ZeroPendingBytesNoRemainingData_Completes) {
// Make the next read appear to be the end of the stream.
handler_.set_read_status(Status::OutOfRange());
- ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .pending_bytes = 0}));
+ ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
+ .pending_bytes = 0,
+ .type = Chunk::Type::kTransferStart}));
transfer_thread_.WaitUntilEventIsProcessed();
Chunk chunk = DecodeChunk(ctx_.responses().back());
@@ -663,8 +675,10 @@ TEST_F(ReadTransfer, ZeroPendingBytesNoRemainingData_Completes) {
TEST_F(ReadTransfer, SendsErrorIfChunkIsReceivedInCompletedState) {
rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
- ctx_.SendClientStream(
- EncodeChunk({.transfer_id = 3, .pending_bytes = 64, .offset = 0}));
+ ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
+ .pending_bytes = 64,
+ .offset = 0,
+ .type = Chunk::Type::kTransferStart}));
});
EXPECT_TRUE(handler_.prepare_read_called);
diff --git a/pw_transfer/ts/transfer.ts b/pw_transfer/ts/transfer.ts
index e8caaf21c..708f5cfcb 100644
--- a/pw_transfer/ts/transfer.ts
+++ b/pw_transfer/ts/transfer.ts
@@ -1,4 +1,4 @@
-// Copyright 2021 The Pigweed Authors
+// Copyright 2022 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
@@ -138,6 +138,7 @@ export abstract class Transfer {
const chunk = new Chunk();
chunk.setStatus(error);
chunk.setTransferId(this.id);
+ chunk.setType(Chunk.Type.TRANSFER_COMPLETION);
this.sendChunk(chunk);
this.finish(error);
}
@@ -252,25 +253,21 @@ export class ReadTransfer extends Transfer {
}
protected get initialChunk(): Chunk {
- return this.transferParameters();
+ return this.transferParameters(Chunk.Type.TRANSFER_START);
}
/** Builds an updated transfer parameters chunk to send the server. */
- private transferParameters(extend = false): Chunk {
+ private transferParameters(type: Chunk.TypeMap[keyof Chunk.TypeMap]): Chunk {
this.pendingBytes = this.maxBytesToReceive;
this.windowEndOffset = this.offset + this.maxBytesToReceive;
- const chunkType = extend
- ? Chunk.Type.PARAMETERS_CONTINUE
- : Chunk.Type.PARAMETERS_RETRANSMIT;
-
const chunk = new Chunk();
chunk.setTransferId(this.id);
chunk.setPendingBytes(this.pendingBytes);
chunk.setMaxChunkSizeBytes(this.maxChunkSize);
chunk.setOffset(this.offset);
chunk.setWindowEndOffset(this.windowEndOffset);
- chunk.setType(chunkType);
+ chunk.setType(type);
if (this.chunkDelayMicroS !== 0) {
chunk.setMinDelayMicroseconds(this.chunkDelayMicroS!);
@@ -289,7 +286,7 @@ export class ReadTransfer extends Transfer {
// Initially, the transfer service only supports in-order transfers.
// If data is received out of order, request that the server
// retransmit from the previous offset.
- this.sendChunk(this.transferParameters());
+ this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_RETRANSMIT));
return;
}
@@ -308,6 +305,7 @@ export class ReadTransfer extends Transfer {
const endChunk = new Chunk();
endChunk.setTransferId(this.id);
endChunk.setStatus(Status.OK);
+ endChunk.setType(Chunk.Type.TRANSFER_COMPLETION);
this.sendChunk(endChunk);
this.finish(Status.OK);
return;
@@ -366,14 +364,14 @@ export class ReadTransfer extends Transfer {
if (this.pendingBytes === 0) {
// All pending data was received. Send out a new parameters chunk
// for the next block.
- this.sendChunk(this.transferParameters());
+ this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_RETRANSMIT));
} else if (extendWindow) {
- this.sendChunk(this.transferParameters(/*extend=*/ true));
+ this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_CONTINUE));
}
}
protected retryAfterTimeout(): void {
- this.sendChunk(this.transferParameters());
+ this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_RETRANSMIT));
}
}
@@ -406,6 +404,7 @@ export class WriteTransfer extends Transfer {
protected get initialChunk(): Chunk {
const chunk = new Chunk();
chunk.setTransferId(this.id);
+ chunk.setType(Chunk.Type.TRANSFER_START);
return chunk;
}
@@ -516,6 +515,8 @@ export class WriteTransfer extends Transfer {
const chunk = new Chunk();
chunk.setTransferId(this.id);
chunk.setOffset(this.offset);
+ chunk.setType(Chunk.Type.TRANSFER_DATA);
+
const maxBytesInChunk = Math.min(
this.maxChunkSize,
this.windowEndOffset - this.offset
diff --git a/pw_transfer/ts/transfer_test.ts b/pw_transfer/ts/transfer_test.ts
index 2eefdfdc6..1707a82f7 100644
--- a/pw_transfer/ts/transfer_test.ts
+++ b/pw_transfer/ts/transfer_test.ts
@@ -613,14 +613,17 @@ describe('Encoder', () => {
.catch(error => {
const expectedChunk1 = new Chunk();
expectedChunk1.setTransferId(22);
+ expectedChunk1.setType(Chunk.Type.TRANSFER_START);
const expectedChunk2 = new Chunk();
expectedChunk2.setTransferId(22);
expectedChunk2.setData(textEncoder.encode('01234'));
+ expectedChunk2.setType(Chunk.Type.TRANSFER_DATA);
const lastChunk = new Chunk();
lastChunk.setTransferId(22);
lastChunk.setData(textEncoder.encode('56789'));
lastChunk.setOffset(5);
lastChunk.setRemainingBytes(0);
+ lastChunk.setType(Chunk.Type.TRANSFER_DATA);
const expectedChunks = [
expectedChunk1,