aboutsummaryrefslogtreecommitdiff
path: root/pw_transfer/client_test.cc
diff options
context:
space:
mode:
authorXin Li <delphij@google.com>2023-08-14 15:38:30 -0700
committerXin Li <delphij@google.com>2023-08-14 15:38:30 -0700
commitbddf63953e111d742b591c1c0c7c34bcda8a51c7 (patch)
tree3a93128bff4b737b24b0c9581922c0b20410f0f4 /pw_transfer/client_test.cc
parentee890da55c82b95deca3518d5f3777e3d8ca9f0e (diff)
parentfbb9890f8922aa55fde183655a0017e69127ea4b (diff)
downloadpigweed-bddf63953e111d742b591c1c0c7c34bcda8a51c7.tar.gz
Merge Android U (ab/10368041)tmp_amf_298295554
Bug: 291102124 Merged-In: I10c41adb8fe3e126cfa4ff2f49b15863fff379de Change-Id: I66f7a6cccaafc173d3924dae62a736c6c53520c7
Diffstat (limited to 'pw_transfer/client_test.cc')
-rw-r--r--pw_transfer/client_test.cc1855
1 files changed, 1485 insertions, 370 deletions
diff --git a/pw_transfer/client_test.cc b/pw_transfer/client_test.cc
index cbf82d7ba..1b16d9142 100644
--- a/pw_transfer/client_test.cc
+++ b/pw_transfer/client_test.cc
@@ -20,7 +20,7 @@
#include "pw_assert/check.h"
#include "pw_bytes/array.h"
#include "pw_rpc/raw/client_testing.h"
-#include "pw_rpc/thread_testing.h"
+#include "pw_rpc/test_helpers.h"
#include "pw_thread/sleep.h"
#include "pw_thread/thread.h"
#include "pw_thread_stl/options.h"
@@ -34,9 +34,6 @@ using pw_rpc::raw::Transfer;
using namespace std::chrono_literals;
-PW_MODIFY_DIAGNOSTICS_PUSH();
-PW_MODIFY_DIAGNOSTIC(ignored, "-Wmissing-field-initializers");
-
thread::Options& TransferThreadOptions() {
static thread::stl::Options options;
return options;
@@ -52,7 +49,7 @@ class ReadTransfer : public ::testing::Test {
max_bytes_to_receive),
system_thread_(TransferThreadOptions(), transfer_thread_) {}
- ~ReadTransfer() {
+ ~ReadTransfer() override {
transfer_thread_.Terminate();
system_thread_.join();
}
@@ -89,20 +86,25 @@ TEST_F(ReadTransfer, SingleChunk) {
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads[0]);
- EXPECT_EQ(c0.transfer_id, 3u);
- EXPECT_EQ(c0.offset, 0u);
- EXPECT_EQ(c0.pending_bytes.value(), 64u);
+ EXPECT_EQ(c0.session_id(), 3u);
+ EXPECT_EQ(c0.offset(), 0u);
+ EXPECT_EQ(c0.window_end_offset(), 64u);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
- context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
- {.transfer_id = 3u, .offset = 0, .data = kData32, .remaining_bytes = 0}));
+ context_.server().SendServerStream<Transfer::Read>(
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+ .set_session_id(3)
+ .set_offset(0)
+ .set_payload(kData32)
+ .set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(payloads.size(), 2u);
- Chunk c1 = DecodeChunk(payloads[1]);
- EXPECT_EQ(c1.transfer_id, 3u);
- ASSERT_TRUE(c1.status.has_value());
- EXPECT_EQ(c1.status.value(), OkStatus());
+ Chunk c1 = DecodeChunk(payloads.back());
+ EXPECT_EQ(c1.session_id(), 3u);
+ ASSERT_TRUE(c1.status().has_value());
+ EXPECT_EQ(c1.status().value(), OkStatus());
EXPECT_EQ(transfer_status, OkStatus());
EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
@@ -127,30 +129,35 @@ TEST_F(ReadTransfer, MultiChunk) {
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads[0]);
- EXPECT_EQ(c0.transfer_id, 4u);
- EXPECT_EQ(c0.offset, 0u);
- EXPECT_EQ(c0.pending_bytes.value(), 64u);
+ EXPECT_EQ(c0.session_id(), 4u);
+ EXPECT_EQ(c0.offset(), 0u);
+ EXPECT_EQ(c0.window_end_offset(), 64u);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
constexpr ConstByteSpan data(kData32);
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk({.transfer_id = 4u, .offset = 0, .data = data.first(16)}));
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+ .set_session_id(4)
+ .set_offset(0)
+ .set_payload(data.first(16))));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(payloads.size(), 1u);
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk({.transfer_id = 4u,
- .offset = 16,
- .data = data.subspan(16),
- .remaining_bytes = 0}));
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+ .set_session_id(4)
+ .set_offset(16)
+ .set_payload(data.subspan(16))
+ .set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(payloads.size(), 2u);
Chunk c1 = DecodeChunk(payloads[1]);
- EXPECT_EQ(c1.transfer_id, 4u);
- ASSERT_TRUE(c1.status.has_value());
- EXPECT_EQ(c1.status.value(), OkStatus());
+ EXPECT_EQ(c1.session_id(), 4u);
+ ASSERT_TRUE(c1.status().has_value());
+ EXPECT_EQ(c1.status().value(), OkStatus());
EXPECT_EQ(transfer_status, OkStatus());
EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
@@ -167,8 +174,12 @@ TEST_F(ReadTransfer, MultipleTransfers) {
}));
transfer_thread_.WaitUntilEventIsProcessed();
- context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
- {.transfer_id = 3u, .offset = 0, .data = kData32, .remaining_bytes = 0}));
+ context_.server().SendServerStream<Transfer::Read>(
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+ .set_session_id(3)
+ .set_offset(0)
+ .set_payload(kData32)
+ .set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(transfer_status, OkStatus());
@@ -180,8 +191,12 @@ TEST_F(ReadTransfer, MultipleTransfers) {
}));
transfer_thread_.WaitUntilEventIsProcessed();
- context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
- {.transfer_id = 3u, .offset = 0, .data = kData32, .remaining_bytes = 0}));
+ context_.server().SendServerStream<Transfer::Read>(
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+ .set_session_id(3)
+ .set_offset(0)
+ .set_payload(kData32)
+ .set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_EQ(transfer_status, OkStatus());
@@ -203,9 +218,10 @@ TEST_F(ReadTransferMaxBytes32, SetsPendingBytesFromConstructorArg) {
ASSERT_EQ(payloads.size(), 1u);
Chunk c0 = DecodeChunk(payloads[0]);
- EXPECT_EQ(c0.transfer_id, 5u);
- EXPECT_EQ(c0.offset, 0u);
- ASSERT_EQ(c0.pending_bytes.value(), 32u);
+ EXPECT_EQ(c0.session_id(), 5u);
+ EXPECT_EQ(c0.offset(), 0u);
+ EXPECT_EQ(c0.window_end_offset(), 32u);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
}
TEST_F(ReadTransferMaxBytes32, SetsPendingBytesFromWriterLimit) {
@@ -219,9 +235,10 @@ TEST_F(ReadTransferMaxBytes32, SetsPendingBytesFromWriterLimit) {
ASSERT_EQ(payloads.size(), 1u);
Chunk c0 = DecodeChunk(payloads[0]);
- EXPECT_EQ(c0.transfer_id, 5u);
- EXPECT_EQ(c0.offset, 0u);
- ASSERT_EQ(c0.pending_bytes.value(), 16u);
+ EXPECT_EQ(c0.session_id(), 5u);
+ EXPECT_EQ(c0.offset(), 0u);
+ EXPECT_EQ(c0.window_end_offset(), 16u);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
}
TEST_F(ReadTransferMaxBytes32, MultiParameters) {
@@ -241,13 +258,16 @@ TEST_F(ReadTransferMaxBytes32, MultiParameters) {
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads[0]);
- EXPECT_EQ(c0.transfer_id, 6u);
- EXPECT_EQ(c0.offset, 0u);
- ASSERT_EQ(c0.pending_bytes.value(), 32u);
+ EXPECT_EQ(c0.session_id(), 6u);
+ EXPECT_EQ(c0.offset(), 0u);
+ ASSERT_EQ(c0.window_end_offset(), 32u);
constexpr ConstByteSpan data(kData64);
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk({.transfer_id = 6u, .offset = 0, .data = data.first(32)}));
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+ .set_session_id(6)
+ .set_offset(0)
+ .set_payload(data.first(32))));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(payloads.size(), 2u);
@@ -255,23 +275,24 @@ TEST_F(ReadTransferMaxBytes32, MultiParameters) {
// Second parameters chunk.
Chunk c1 = DecodeChunk(payloads[1]);
- EXPECT_EQ(c1.transfer_id, 6u);
- EXPECT_EQ(c1.offset, 32u);
- ASSERT_EQ(c1.pending_bytes.value(), 32u);
+ EXPECT_EQ(c1.session_id(), 6u);
+ EXPECT_EQ(c1.offset(), 32u);
+ ASSERT_EQ(c1.window_end_offset(), 64u);
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk({.transfer_id = 6u,
- .offset = 32,
- .data = data.subspan(32),
- .remaining_bytes = 0}));
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+ .set_session_id(6)
+ .set_offset(32)
+ .set_payload(data.subspan(32))
+ .set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(payloads.size(), 3u);
Chunk c2 = DecodeChunk(payloads[2]);
- EXPECT_EQ(c2.transfer_id, 6u);
- ASSERT_TRUE(c2.status.has_value());
- EXPECT_EQ(c2.status.value(), OkStatus());
+ EXPECT_EQ(c2.session_id(), 6u);
+ ASSERT_TRUE(c2.status().has_value());
+ EXPECT_EQ(c2.status().value(), OkStatus());
EXPECT_EQ(transfer_status, OkStatus());
EXPECT_EQ(std::memcmp(writer.data(), data.data(), writer.bytes_written()), 0);
@@ -294,13 +315,16 @@ TEST_F(ReadTransfer, UnexpectedOffset) {
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads[0]);
- EXPECT_EQ(c0.transfer_id, 7u);
- EXPECT_EQ(c0.offset, 0u);
- EXPECT_EQ(c0.pending_bytes.value(), 64u);
+ EXPECT_EQ(c0.session_id(), 7u);
+ EXPECT_EQ(c0.offset(), 0u);
+ EXPECT_EQ(c0.window_end_offset(), 64u);
constexpr ConstByteSpan data(kData32);
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk({.transfer_id = 7u, .offset = 0, .data = data.first(16)}));
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+ .set_session_id(7)
+ .set_offset(0)
+ .set_payload(data.first(16))));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(payloads.size(), 1u);
@@ -308,34 +332,36 @@ TEST_F(ReadTransfer, UnexpectedOffset) {
// Send a chunk with an incorrect offset. The client should resend parameters.
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk({.transfer_id = 7u,
- .offset = 8, // wrong!
- .data = data.subspan(16),
- .remaining_bytes = 0}));
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+ .set_session_id(7)
+ .set_offset(8) // wrong!
+ .set_payload(data.subspan(16))
+ .set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(payloads.size(), 2u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c1 = DecodeChunk(payloads[1]);
- EXPECT_EQ(c1.transfer_id, 7u);
- EXPECT_EQ(c1.offset, 16u);
- EXPECT_EQ(c1.pending_bytes.value(), 48u);
+ EXPECT_EQ(c1.session_id(), 7u);
+ EXPECT_EQ(c1.offset(), 16u);
+ EXPECT_EQ(c1.window_end_offset(), 64u);
// Send the correct chunk, completing the transfer.
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk({.transfer_id = 7u,
- .offset = 16,
- .data = data.subspan(16),
- .remaining_bytes = 0}));
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+ .set_session_id(7)
+ .set_offset(16)
+ .set_payload(data.subspan(16))
+ .set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(payloads.size(), 3u);
Chunk c2 = DecodeChunk(payloads[2]);
- EXPECT_EQ(c2.transfer_id, 7u);
- ASSERT_TRUE(c2.status.has_value());
- EXPECT_EQ(c2.status.value(), OkStatus());
+ EXPECT_EQ(c2.session_id(), 7u);
+ ASSERT_TRUE(c2.status().has_value());
+ EXPECT_EQ(c2.status().value(), OkStatus());
EXPECT_EQ(transfer_status, OkStatus());
EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
@@ -359,31 +385,40 @@ TEST_F(ReadTransferMaxBytes32, TooMuchData) {
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads[0]);
- EXPECT_EQ(c0.transfer_id, 8u);
- EXPECT_EQ(c0.offset, 0u);
- ASSERT_EQ(c0.pending_bytes.value(), 32u);
+ EXPECT_EQ(c0.session_id(), 8u);
+ EXPECT_EQ(c0.offset(), 0u);
+ ASSERT_EQ(c0.window_end_offset(), 32u);
constexpr ConstByteSpan data(kData64);
// pending_bytes == 32
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk({.transfer_id = 8u, .offset = 0, .data = data.first(16)}));
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+ .set_session_id(8)
+ .set_offset(0)
+ .set_payload(data.first(16))));
// pending_bytes == 16
- context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
- {.transfer_id = 8u, .offset = 16, .data = data.subspan(16, 8)}));
+ context_.server().SendServerStream<Transfer::Read>(
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+ .set_session_id(8)
+ .set_offset(16)
+ .set_payload(data.subspan(16, 8))));
// pending_bytes == 8, send 16 instead.
- context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
- {.transfer_id = 8u, .offset = 24, .data = data.subspan(24, 16)}));
+ context_.server().SendServerStream<Transfer::Read>(
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+ .set_session_id(8)
+ .set_offset(24)
+ .set_payload(data.subspan(24, 16))));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(payloads.size(), 4u);
Chunk c1 = DecodeChunk(payloads[3]);
- EXPECT_EQ(c1.transfer_id, 8u);
- ASSERT_TRUE(c1.status.has_value());
- EXPECT_EQ(c1.status.value(), Status::Internal());
+ EXPECT_EQ(c1.session_id(), 8u);
+ ASSERT_TRUE(c1.status().has_value());
+ EXPECT_EQ(c1.status().value(), Status::Internal());
EXPECT_EQ(transfer_status, Status::Internal());
}
@@ -405,14 +440,14 @@ TEST_F(ReadTransfer, ServerError) {
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads[0]);
- EXPECT_EQ(c0.transfer_id, 9u);
- EXPECT_EQ(c0.offset, 0u);
- ASSERT_EQ(c0.pending_bytes.value(), 64u);
+ EXPECT_EQ(c0.session_id(), 9u);
+ EXPECT_EQ(c0.offset(), 0u);
+ ASSERT_EQ(c0.window_end_offset(), 64u);
// Server sends an error. Client should not respond and terminate the
// transfer.
- context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk({.transfer_id = 9u, .status = Status::NotFound()}));
+ context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
+ Chunk::Final(ProtocolVersion::kLegacy, 9, Status::NotFound())));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(payloads.size(), 1u);
@@ -436,22 +471,26 @@ TEST_F(ReadTransfer, OnlySendsParametersOnceAfterDrop) {
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads[0]);
- EXPECT_EQ(c0.transfer_id, 10u);
- EXPECT_EQ(c0.offset, 0u);
- ASSERT_EQ(c0.pending_bytes.value(), 64u);
+ EXPECT_EQ(c0.session_id(), 10u);
+ EXPECT_EQ(c0.offset(), 0u);
+ ASSERT_EQ(c0.window_end_offset(), 64u);
- constexpr ConstByteSpan data(kData64);
+ constexpr ConstByteSpan data(kData32);
// Send the first 8 bytes of the transfer.
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk({.transfer_id = 10u, .offset = 0, .data = data.first(8)}));
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+ .set_session_id(10)
+ .set_offset(0)
+ .set_payload(data.first(8))));
// Skip offset 8, send the rest starting from 16.
for (uint32_t offset = 16; offset < data.size(); offset += 8) {
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk({.transfer_id = 10u,
- .offset = offset,
- .data = data.subspan(offset, 8)}));
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+ .set_session_id(10)
+ .set_offset(offset)
+ .set_payload(data.subspan(offset, 8))));
}
transfer_thread_.WaitUntilEventIsProcessed();
@@ -460,24 +499,25 @@ TEST_F(ReadTransfer, OnlySendsParametersOnceAfterDrop) {
ASSERT_EQ(payloads.size(), 2u);
Chunk c1 = DecodeChunk(payloads[1]);
- EXPECT_EQ(c1.transfer_id, 10u);
- EXPECT_EQ(c1.offset, 8u);
- ASSERT_EQ(c1.pending_bytes.value(), 56u);
+ EXPECT_EQ(c1.session_id(), 10u);
+ EXPECT_EQ(c1.offset(), 8u);
+ ASSERT_EQ(c1.window_end_offset(), 64u);
// Send the remaining data to complete the transfer.
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk({.transfer_id = 10u,
- .offset = 8,
- .data = data.subspan(8, 56),
- .remaining_bytes = 0}));
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+ .set_session_id(10)
+ .set_offset(8)
+ .set_payload(data.subspan(8))
+ .set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(payloads.size(), 3u);
Chunk c2 = DecodeChunk(payloads[2]);
- EXPECT_EQ(c2.transfer_id, 10u);
- ASSERT_TRUE(c2.status.has_value());
- EXPECT_EQ(c2.status.value(), OkStatus());
+ EXPECT_EQ(c2.session_id(), 10u);
+ ASSERT_TRUE(c2.status().has_value());
+ EXPECT_EQ(c2.status().value(), OkStatus());
EXPECT_EQ(transfer_status, OkStatus());
}
@@ -499,22 +539,26 @@ TEST_F(ReadTransfer, ResendsParametersIfSentRepeatedChunkDuringRecovery) {
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads[0]);
- EXPECT_EQ(c0.transfer_id, 11u);
- EXPECT_EQ(c0.offset, 0u);
- ASSERT_EQ(c0.pending_bytes.value(), 64u);
+ EXPECT_EQ(c0.session_id(), 11u);
+ EXPECT_EQ(c0.offset(), 0u);
+ ASSERT_EQ(c0.window_end_offset(), 64u);
- constexpr ConstByteSpan data(kData64);
+ constexpr ConstByteSpan data(kData32);
// Send the first 8 bytes of the transfer.
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk({.transfer_id = 11u, .offset = 0, .data = data.first(8)}));
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+ .set_session_id(11)
+ .set_offset(0)
+ .set_payload(data.first(8))));
// Skip offset 8, send the rest starting from 16.
for (uint32_t offset = 16; offset < data.size(); offset += 8) {
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk({.transfer_id = 11u,
- .offset = offset,
- .data = data.subspan(offset, 8)}));
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+ .set_session_id(11)
+ .set_offset(offset)
+ .set_payload(data.subspan(offset, 8))));
}
transfer_thread_.WaitUntilEventIsProcessed();
@@ -522,8 +566,10 @@ TEST_F(ReadTransfer, ResendsParametersIfSentRepeatedChunkDuringRecovery) {
// dropped packet.
ASSERT_EQ(payloads.size(), 2u);
- const Chunk last_chunk = {
- .transfer_id = 11u, .offset = 56, .data = data.subspan(56)};
+ const Chunk last_chunk = Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+ .set_session_id(11)
+ .set_offset(24)
+ .set_payload(data.subspan(24));
// Re-send the final chunk of the block.
context_.server().SendServerStream<Transfer::Read>(EncodeChunk(last_chunk));
@@ -532,9 +578,9 @@ TEST_F(ReadTransfer, ResendsParametersIfSentRepeatedChunkDuringRecovery) {
// The original drop parameters should be re-sent.
ASSERT_EQ(payloads.size(), 3u);
Chunk c2 = DecodeChunk(payloads[2]);
- EXPECT_EQ(c2.transfer_id, 11u);
- EXPECT_EQ(c2.offset, 8u);
- ASSERT_EQ(c2.pending_bytes.value(), 56u);
+ EXPECT_EQ(c2.session_id(), 11u);
+ EXPECT_EQ(c2.offset(), 8u);
+ ASSERT_EQ(c2.window_end_offset(), 64u);
// Do it again.
context_.server().SendServerStream<Transfer::Read>(EncodeChunk(last_chunk));
@@ -542,24 +588,25 @@ TEST_F(ReadTransfer, ResendsParametersIfSentRepeatedChunkDuringRecovery) {
ASSERT_EQ(payloads.size(), 4u);
Chunk c3 = DecodeChunk(payloads[3]);
- EXPECT_EQ(c3.transfer_id, 11u);
- EXPECT_EQ(c3.offset, 8u);
- ASSERT_EQ(c3.pending_bytes.value(), 56u);
+ EXPECT_EQ(c3.session_id(), 11u);
+ EXPECT_EQ(c3.offset(), 8u);
+ ASSERT_EQ(c3.window_end_offset(), 64u);
// Finish the transfer normally.
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk({.transfer_id = 11u,
- .offset = 8,
- .data = data.subspan(8, 56),
- .remaining_bytes = 0}));
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+ .set_session_id(11)
+ .set_offset(8)
+ .set_payload(data.subspan(8))
+ .set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(payloads.size(), 5u);
Chunk c4 = DecodeChunk(payloads[4]);
- EXPECT_EQ(c4.transfer_id, 11u);
- ASSERT_TRUE(c4.status.has_value());
- EXPECT_EQ(c4.status.value(), OkStatus());
+ EXPECT_EQ(c4.session_id(), 11u);
+ ASSERT_TRUE(c4.status().has_value());
+ EXPECT_EQ(c4.status().value(), OkStatus());
EXPECT_EQ(transfer_status, OkStatus());
}
@@ -587,37 +634,40 @@ TEST_F(ReadTransfer, Timeout_ResendsCurrentParameters) {
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads.back());
- EXPECT_EQ(c0.transfer_id, 12u);
- EXPECT_EQ(c0.offset, 0u);
- EXPECT_EQ(c0.pending_bytes.value(), 64u);
+ EXPECT_EQ(c0.session_id(), 12u);
+ EXPECT_EQ(c0.offset(), 0u);
+ EXPECT_EQ(c0.window_end_offset(), 64u);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
// Wait for the timeout to expire without doing anything. The client should
- // resend its parameters chunk.
+ // resend its initial parameters chunk.
transfer_thread_.SimulateClientTimeout(12);
ASSERT_EQ(payloads.size(), 2u);
Chunk c = DecodeChunk(payloads.back());
- EXPECT_EQ(c.transfer_id, 12u);
- EXPECT_EQ(c.offset, 0u);
- EXPECT_EQ(c.pending_bytes.value(), 64u);
+ EXPECT_EQ(c.session_id(), 12u);
+ EXPECT_EQ(c.offset(), 0u);
+ EXPECT_EQ(c.window_end_offset(), 64u);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
// Transfer has not yet completed.
EXPECT_EQ(transfer_status, Status::Unknown());
// Finish the transfer following the timeout.
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk({.transfer_id = 12u,
- .offset = 0,
- .data = kData32,
- .remaining_bytes = 0}));
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+ .set_session_id(12)
+ .set_offset(0)
+ .set_payload(kData32)
+ .set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(payloads.size(), 3u);
Chunk c4 = DecodeChunk(payloads.back());
- EXPECT_EQ(c4.transfer_id, 12u);
- ASSERT_TRUE(c4.status.has_value());
- EXPECT_EQ(c4.status.value(), OkStatus());
+ EXPECT_EQ(c4.session_id(), 12u);
+ ASSERT_TRUE(c4.status().has_value());
+ EXPECT_EQ(c4.status().value(), OkStatus());
EXPECT_EQ(transfer_status, OkStatus());
}
@@ -641,15 +691,19 @@ TEST_F(ReadTransfer, Timeout_ResendsUpdatedParameters) {
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads.back());
- EXPECT_EQ(c0.transfer_id, 13u);
- EXPECT_EQ(c0.offset, 0u);
- EXPECT_EQ(c0.pending_bytes.value(), 64u);
+ EXPECT_EQ(c0.session_id(), 13u);
+ EXPECT_EQ(c0.offset(), 0u);
+ EXPECT_EQ(c0.window_end_offset(), 64u);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
constexpr ConstByteSpan data(kData32);
// Send some data, but not everything.
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk({.transfer_id = 13u, .offset = 0, .data = data.first(16)}));
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+ .set_session_id(13)
+ .set_offset(0)
+ .set_payload(data.first(16))));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(payloads.size(), 1u);
@@ -660,27 +714,29 @@ TEST_F(ReadTransfer, Timeout_ResendsUpdatedParameters) {
ASSERT_EQ(payloads.size(), 2u);
Chunk c = DecodeChunk(payloads.back());
- EXPECT_EQ(c.transfer_id, 13u);
- EXPECT_EQ(c.offset, 16u);
- EXPECT_EQ(c.pending_bytes.value(), 48u);
+ EXPECT_EQ(c.session_id(), 13u);
+ EXPECT_EQ(c.offset(), 16u);
+ EXPECT_EQ(c.window_end_offset(), 64u);
+ EXPECT_EQ(c.type(), Chunk::Type::kParametersRetransmit);
// Transfer has not yet completed.
EXPECT_EQ(transfer_status, Status::Unknown());
// Send the rest of the data, finishing the transfer.
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk({.transfer_id = 13u,
- .offset = 16,
- .data = data.subspan(16),
- .remaining_bytes = 0}));
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+ .set_session_id(13)
+ .set_offset(16)
+ .set_payload(data.subspan(16))
+ .set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(payloads.size(), 3u);
Chunk c4 = DecodeChunk(payloads.back());
- EXPECT_EQ(c4.transfer_id, 13u);
- ASSERT_TRUE(c4.status.has_value());
- EXPECT_EQ(c4.status.value(), OkStatus());
+ EXPECT_EQ(c4.session_id(), 13u);
+ ASSERT_TRUE(c4.status().has_value());
+ EXPECT_EQ(c4.status().value(), OkStatus());
EXPECT_EQ(transfer_status, OkStatus());
}
@@ -704,9 +760,10 @@ TEST_F(ReadTransfer, Timeout_EndsTransferAfterMaxRetries) {
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads.back());
- EXPECT_EQ(c0.transfer_id, 14u);
- EXPECT_EQ(c0.offset, 0u);
- EXPECT_EQ(c0.pending_bytes.value(), 64u);
+ EXPECT_EQ(c0.session_id(), 14u);
+ EXPECT_EQ(c0.offset(), 0u);
+ EXPECT_EQ(c0.window_end_offset(), 64u);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
for (unsigned retry = 1; retry <= kTestRetries; ++retry) {
// Wait for the timeout to expire without doing anything. The client should
@@ -715,30 +772,26 @@ TEST_F(ReadTransfer, Timeout_EndsTransferAfterMaxRetries) {
ASSERT_EQ(payloads.size(), retry + 1);
Chunk c = DecodeChunk(payloads.back());
- EXPECT_EQ(c.transfer_id, 14u);
- EXPECT_EQ(c.offset, 0u);
- EXPECT_EQ(c.pending_bytes.value(), 64u);
+ EXPECT_EQ(c.session_id(), 14u);
+ EXPECT_EQ(c.offset(), 0u);
+ EXPECT_EQ(c.window_end_offset(), 64u);
// Transfer has not yet completed.
EXPECT_EQ(transfer_status, Status::Unknown());
}
// Sleep one more time after the final retry. The client should cancel the
- // transfer at this point and send a DEADLINE_EXCEEDED chunk.
+ // transfer at this point. As no packets were received from the server, no
+ // final status chunk should be sent.
transfer_thread_.SimulateClientTimeout(14);
- ASSERT_EQ(payloads.size(), 5u);
-
- Chunk c4 = DecodeChunk(payloads.back());
- EXPECT_EQ(c4.transfer_id, 14u);
- ASSERT_TRUE(c4.status.has_value());
- EXPECT_EQ(c4.status.value(), Status::DeadlineExceeded());
+ ASSERT_EQ(payloads.size(), 4u);
EXPECT_EQ(transfer_status, Status::DeadlineExceeded());
// After finishing the transfer, nothing else should be sent. Verify this by
// waiting for a bit.
this_thread::sleep_for(kTestTimeout * 4);
- ASSERT_EQ(payloads.size(), 5u);
+ ASSERT_EQ(payloads.size(), 4u);
}
TEST_F(ReadTransfer, Timeout_ReceivingDataResetsRetryCount) {
@@ -762,9 +815,9 @@ TEST_F(ReadTransfer, Timeout_ReceivingDataResetsRetryCount) {
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads.back());
- EXPECT_EQ(c0.transfer_id, 14u);
- EXPECT_EQ(c0.offset, 0u);
- EXPECT_EQ(c0.window_end_offset, 64u);
+ EXPECT_EQ(c0.session_id(), 14u);
+ EXPECT_EQ(c0.offset(), 0u);
+ EXPECT_EQ(c0.window_end_offset(), 64u);
// Simulate one less timeout than the maximum amount of retries.
for (unsigned retry = 1; retry <= kTestRetries - 1; ++retry) {
@@ -772,9 +825,9 @@ TEST_F(ReadTransfer, Timeout_ReceivingDataResetsRetryCount) {
ASSERT_EQ(payloads.size(), retry + 1);
Chunk c = DecodeChunk(payloads.back());
- EXPECT_EQ(c.transfer_id, 14u);
- EXPECT_EQ(c.offset, 0u);
- EXPECT_EQ(c.window_end_offset, 64u);
+ EXPECT_EQ(c.session_id(), 14u);
+ EXPECT_EQ(c.offset(), 0u);
+ EXPECT_EQ(c.window_end_offset(), 64u);
// Transfer has not yet completed.
EXPECT_EQ(transfer_status, Status::Unknown());
@@ -782,7 +835,10 @@ TEST_F(ReadTransfer, Timeout_ReceivingDataResetsRetryCount) {
// Send some data.
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk({.transfer_id = 14u, .offset = 0, .data = data.first(16)}));
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+ .set_session_id(14)
+ .set_offset(0)
+ .set_payload(data.first(16))));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(payloads.size(), 3u);
@@ -793,19 +849,23 @@ TEST_F(ReadTransfer, Timeout_ReceivingDataResetsRetryCount) {
ASSERT_EQ(payloads.size(), 4u);
Chunk c = DecodeChunk(payloads.back());
- EXPECT_FALSE(c.status.has_value());
- EXPECT_EQ(c.transfer_id, 14u);
- EXPECT_EQ(c.offset, 16u);
- EXPECT_EQ(c.window_end_offset, 64u);
+ EXPECT_FALSE(c.status().has_value());
+ EXPECT_EQ(c.session_id(), 14u);
+ EXPECT_EQ(c.offset(), 16u);
+ EXPECT_EQ(c.window_end_offset(), 64u);
transfer_thread_.SimulateClientTimeout(14);
ASSERT_EQ(payloads.size(), 5u);
c = DecodeChunk(payloads.back());
- EXPECT_FALSE(c.status.has_value());
- EXPECT_EQ(c.transfer_id, 14u);
- EXPECT_EQ(c.offset, 16u);
- EXPECT_EQ(c.window_end_offset, 64u);
+ EXPECT_FALSE(c.status().has_value());
+ EXPECT_EQ(c.session_id(), 14u);
+ EXPECT_EQ(c.offset(), 16u);
+ EXPECT_EQ(c.window_end_offset(), 64u);
+
+ // Ensure we don't leave a dangling reference to transfer_status.
+ client_.CancelTransfer(14);
+ transfer_thread_.WaitUntilEventIsProcessed();
}
TEST_F(ReadTransfer, InitialPacketFails_OnCompletedCalledWithDataLoss) {
@@ -836,7 +896,7 @@ class WriteTransfer : public ::testing::Test {
client_(context_.client(), context_.channel().id(), transfer_thread_),
system_thread_(TransferThreadOptions(), transfer_thread_) {}
- ~WriteTransfer() {
+ ~WriteTransfer() override {
transfer_thread_.Terminate();
system_thread_.join();
}
@@ -862,42 +922,47 @@ TEST_F(WriteTransfer, SingleChunk) {
}));
transfer_thread_.WaitUntilEventIsProcessed();
- // The client begins by just sending the transfer ID.
+ // The client begins by sending the ID of the resource to transfer.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Write>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads[0]);
- EXPECT_EQ(c0.transfer_id, 3u);
+ EXPECT_EQ(c0.session_id(), 3u);
+ EXPECT_EQ(c0.resource_id(), 3u);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
// Send transfer parameters. Client should send a data chunk and the final
// chunk.
rpc::test::WaitForPackets(context_.output(), 2, [this] {
- context_.server().SendServerStream<Transfer::Write>(
- EncodeChunk({.transfer_id = 3,
- .pending_bytes = 64,
- .max_chunk_size_bytes = 32,
- .offset = 0}));
+ context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
+ .set_session_id(3)
+ .set_offset(0)
+ .set_window_end_offset(64)
+ .set_max_chunk_size_bytes(32)));
});
ASSERT_EQ(payloads.size(), 3u);
Chunk c1 = DecodeChunk(payloads[1]);
- EXPECT_EQ(c1.transfer_id, 3u);
- EXPECT_EQ(c1.offset, 0u);
- EXPECT_EQ(std::memcmp(c1.data.data(), kData32.data(), c1.data.size()), 0);
+ EXPECT_EQ(c1.session_id(), 3u);
+ EXPECT_EQ(c1.offset(), 0u);
+ EXPECT_TRUE(c1.has_payload());
+ EXPECT_EQ(
+ std::memcmp(c1.payload().data(), kData32.data(), c1.payload().size()), 0);
Chunk c2 = DecodeChunk(payloads[2]);
- EXPECT_EQ(c2.transfer_id, 3u);
- ASSERT_TRUE(c2.remaining_bytes.has_value());
- EXPECT_EQ(c2.remaining_bytes.value(), 0u);
+ EXPECT_EQ(c2.session_id(), 3u);
+ ASSERT_TRUE(c2.remaining_bytes().has_value());
+ EXPECT_EQ(c2.remaining_bytes().value(), 0u);
EXPECT_EQ(transfer_status, Status::Unknown());
// Send the final status chunk to complete the transfer.
context_.server().SendServerStream<Transfer::Write>(
- EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
+ EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_EQ(payloads.size(), 3u);
@@ -914,50 +979,57 @@ TEST_F(WriteTransfer, MultiChunk) {
}));
transfer_thread_.WaitUntilEventIsProcessed();
- // The client begins by just sending the transfer ID.
+ // The client begins by sending the ID of the resource to transfer.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Write>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads[0]);
- EXPECT_EQ(c0.transfer_id, 4u);
+ EXPECT_EQ(c0.session_id(), 4u);
+ EXPECT_EQ(c0.resource_id(), 4u);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
// Send transfer parameters with a chunk size smaller than the data.
// Client should send two data chunks and the final chunk.
rpc::test::WaitForPackets(context_.output(), 3, [this] {
- context_.server().SendServerStream<Transfer::Write>(
- EncodeChunk({.transfer_id = 4,
- .pending_bytes = 64,
- .max_chunk_size_bytes = 16,
- .offset = 0}));
+ context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
+ .set_session_id(4)
+ .set_offset(0)
+ .set_window_end_offset(64)
+ .set_max_chunk_size_bytes(16)));
});
ASSERT_EQ(payloads.size(), 4u);
Chunk c1 = DecodeChunk(payloads[1]);
- EXPECT_EQ(c1.transfer_id, 4u);
- EXPECT_EQ(c1.offset, 0u);
- EXPECT_EQ(std::memcmp(c1.data.data(), kData32.data(), c1.data.size()), 0);
+ EXPECT_EQ(c1.session_id(), 4u);
+ EXPECT_EQ(c1.offset(), 0u);
+ EXPECT_TRUE(c1.has_payload());
+ EXPECT_EQ(
+ std::memcmp(c1.payload().data(), kData32.data(), c1.payload().size()), 0);
Chunk c2 = DecodeChunk(payloads[2]);
- EXPECT_EQ(c2.transfer_id, 4u);
- EXPECT_EQ(c2.offset, 16u);
- EXPECT_EQ(
- std::memcmp(c2.data.data(), kData32.data() + c2.offset, c2.data.size()),
- 0);
+ EXPECT_EQ(c2.session_id(), 4u);
+ EXPECT_EQ(c2.offset(), 16u);
+ EXPECT_TRUE(c2.has_payload());
+ EXPECT_EQ(std::memcmp(c2.payload().data(),
+ kData32.data() + c2.offset(),
+ c2.payload().size()),
+ 0);
Chunk c3 = DecodeChunk(payloads[3]);
- EXPECT_EQ(c3.transfer_id, 4u);
- ASSERT_TRUE(c3.remaining_bytes.has_value());
- EXPECT_EQ(c3.remaining_bytes.value(), 0u);
+ EXPECT_EQ(c3.session_id(), 4u);
+ ASSERT_TRUE(c3.remaining_bytes().has_value());
+ EXPECT_EQ(c3.remaining_bytes().value(), 0u);
EXPECT_EQ(transfer_status, Status::Unknown());
// Send the final status chunk to complete the transfer.
context_.server().SendServerStream<Transfer::Write>(
- EncodeChunk({.transfer_id = 4, .status = OkStatus()}));
+ EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 4, OkStatus())));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_EQ(payloads.size(), 4u);
@@ -974,44 +1046,49 @@ TEST_F(WriteTransfer, OutOfOrder_SeekSupported) {
}));
transfer_thread_.WaitUntilEventIsProcessed();
- // The client begins by just sending the transfer ID.
+ // The client begins by sending the ID of the resource to transfer.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Write>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads[0]);
- EXPECT_EQ(c0.transfer_id, 5u);
+ EXPECT_EQ(c0.session_id(), 5u);
+ EXPECT_EQ(c0.resource_id(), 5u);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
// Send transfer parameters with a nonzero offset, requesting a seek.
// Client should send a data chunk and the final chunk.
rpc::test::WaitForPackets(context_.output(), 2, [this] {
- context_.server().SendServerStream<Transfer::Write>(
- EncodeChunk({.transfer_id = 5,
- .pending_bytes = 64,
- .max_chunk_size_bytes = 32,
- .offset = 16}));
+ context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
+ .set_session_id(5)
+ .set_offset(16)
+ .set_window_end_offset(64)
+ .set_max_chunk_size_bytes(32)));
});
ASSERT_EQ(payloads.size(), 3u);
Chunk c1 = DecodeChunk(payloads[1]);
- EXPECT_EQ(c1.transfer_id, 5u);
- EXPECT_EQ(c1.offset, 16u);
- EXPECT_EQ(
- std::memcmp(c1.data.data(), kData32.data() + c1.offset, c1.data.size()),
- 0);
+ EXPECT_EQ(c1.session_id(), 5u);
+ EXPECT_EQ(c1.offset(), 16u);
+ EXPECT_TRUE(c1.has_payload());
+ EXPECT_EQ(std::memcmp(c1.payload().data(),
+ kData32.data() + c1.offset(),
+ c1.payload().size()),
+ 0);
Chunk c2 = DecodeChunk(payloads[2]);
- EXPECT_EQ(c2.transfer_id, 5u);
- ASSERT_TRUE(c2.remaining_bytes.has_value());
- EXPECT_EQ(c2.remaining_bytes.value(), 0u);
+ EXPECT_EQ(c2.session_id(), 5u);
+ ASSERT_TRUE(c2.remaining_bytes().has_value());
+ EXPECT_EQ(c2.remaining_bytes().value(), 0u);
EXPECT_EQ(transfer_status, Status::Unknown());
// Send the final status chunk to complete the transfer.
context_.server().SendServerStream<Transfer::Write>(
- EncodeChunk({.transfer_id = 5, .status = OkStatus()}));
+ EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 5, OkStatus())));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_EQ(payloads.size(), 3u);
@@ -1049,30 +1126,34 @@ TEST_F(WriteTransfer, OutOfOrder_SeekNotSupported) {
}));
transfer_thread_.WaitUntilEventIsProcessed();
- // The client begins by just sending the transfer ID.
+ // The client begins by sending the ID of the resource to transfer.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Write>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads[0]);
- EXPECT_EQ(c0.transfer_id, 6u);
+ EXPECT_EQ(c0.session_id(), 6u);
+ EXPECT_EQ(c0.resource_id(), 6u);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
// Send transfer parameters with a nonzero offset, requesting a seek.
- context_.server().SendServerStream<Transfer::Write>(
- EncodeChunk({.transfer_id = 6,
- .pending_bytes = 64,
- .max_chunk_size_bytes = 32,
- .offset = 16}));
+ context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
+ .set_session_id(6)
+ .set_offset(16)
+ .set_window_end_offset(64)
+ .set_max_chunk_size_bytes(32)));
transfer_thread_.WaitUntilEventIsProcessed();
// Client should send a status chunk and end the transfer.
ASSERT_EQ(payloads.size(), 2u);
Chunk c1 = DecodeChunk(payloads[1]);
- EXPECT_EQ(c1.transfer_id, 6u);
- ASSERT_TRUE(c1.status.has_value());
- EXPECT_EQ(c1.status.value(), Status::Unimplemented());
+ EXPECT_EQ(c1.session_id(), 6u);
+ EXPECT_EQ(c1.type(), Chunk::Type::kCompletion);
+ ASSERT_TRUE(c1.status().has_value());
+ EXPECT_EQ(c1.status().value(), Status::Unimplemented());
EXPECT_EQ(transfer_status, Status::Unimplemented());
}
@@ -1087,18 +1168,20 @@ TEST_F(WriteTransfer, ServerError) {
}));
transfer_thread_.WaitUntilEventIsProcessed();
- // The client begins by just sending the transfer ID.
+ // The client begins by sending the ID of the resource to transfer.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Write>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads[0]);
- EXPECT_EQ(c0.transfer_id, 7u);
+ EXPECT_EQ(c0.session_id(), 7u);
+ EXPECT_EQ(c0.resource_id(), 7u);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
// Send an error from the server.
- context_.server().SendServerStream<Transfer::Write>(
- EncodeChunk({.transfer_id = 7, .status = Status::NotFound()}));
+ context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
+ Chunk::Final(ProtocolVersion::kLegacy, 7, Status::NotFound())));
transfer_thread_.WaitUntilEventIsProcessed();
// Client should not respond and terminate the transfer.
@@ -1106,41 +1189,6 @@ TEST_F(WriteTransfer, ServerError) {
EXPECT_EQ(transfer_status, Status::NotFound());
}
-TEST_F(WriteTransfer, MalformedParametersChunk) {
- stream::MemoryReader reader(kData32);
- Status transfer_status = Status::Unknown();
-
- ASSERT_EQ(OkStatus(),
- client_.Write(8, reader, [&transfer_status](Status status) {
- transfer_status = status;
- }));
- transfer_thread_.WaitUntilEventIsProcessed();
-
- // The client begins by just sending the transfer ID.
- rpc::PayloadsView payloads =
- context_.output().payloads<Transfer::Write>(context_.channel().id());
- ASSERT_EQ(payloads.size(), 1u);
- EXPECT_EQ(transfer_status, Status::Unknown());
-
- Chunk c0 = DecodeChunk(payloads[0]);
- EXPECT_EQ(c0.transfer_id, 8u);
-
- // Send an invalid transfer parameters chunk without pending_bytes.
- context_.server().SendServerStream<Transfer::Write>(
- EncodeChunk({.transfer_id = 8, .max_chunk_size_bytes = 32}));
- transfer_thread_.WaitUntilEventIsProcessed();
-
- // Client should send a status chunk and end the transfer.
- ASSERT_EQ(payloads.size(), 2u);
-
- Chunk c1 = DecodeChunk(payloads[1]);
- EXPECT_EQ(c1.transfer_id, 8u);
- ASSERT_TRUE(c1.status.has_value());
- EXPECT_EQ(c1.status.value(), Status::InvalidArgument());
-
- EXPECT_EQ(transfer_status, Status::InvalidArgument());
-}
-
TEST_F(WriteTransfer, AbortIfZeroBytesAreRequested) {
stream::MemoryReader reader(kData32);
Status transfer_status = Status::Unknown();
@@ -1151,27 +1199,33 @@ TEST_F(WriteTransfer, AbortIfZeroBytesAreRequested) {
}));
transfer_thread_.WaitUntilEventIsProcessed();
- // The client begins by just sending the transfer ID.
+ // The client begins by sending the ID of the resource to transfer.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Write>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads[0]);
- EXPECT_EQ(c0.transfer_id, 9u);
+ EXPECT_EQ(c0.session_id(), 9u);
+ EXPECT_EQ(c0.resource_id(), 9u);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
- // Send an invalid transfer parameters chunk with 0 pending_bytes.
+ // Send an invalid transfer parameters chunk with 0 pending bytes.
context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
- {.transfer_id = 9, .pending_bytes = 0, .max_chunk_size_bytes = 32}));
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
+ .set_session_id(9)
+ .set_offset(0)
+ .set_window_end_offset(0)
+ .set_max_chunk_size_bytes(32)));
transfer_thread_.WaitUntilEventIsProcessed();
// Client should send a status chunk and end the transfer.
ASSERT_EQ(payloads.size(), 2u);
Chunk c1 = DecodeChunk(payloads[1]);
- EXPECT_EQ(c1.transfer_id, 9u);
- ASSERT_TRUE(c1.status.has_value());
- EXPECT_EQ(c1.status.value(), Status::ResourceExhausted());
+ EXPECT_EQ(c1.session_id(), 9u);
+ ASSERT_TRUE(c1.status().has_value());
+ EXPECT_EQ(c1.status().value(), Status::ResourceExhausted());
EXPECT_EQ(transfer_status, Status::ResourceExhausted());
}
@@ -1188,14 +1242,16 @@ TEST_F(WriteTransfer, Timeout_RetriesWithInitialChunk) {
kTestTimeout));
transfer_thread_.WaitUntilEventIsProcessed();
- // The client begins by just sending the transfer ID.
+ // The client begins by sending the ID of the resource to transfer.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Write>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads.back());
- EXPECT_EQ(c0.transfer_id, 10u);
+ EXPECT_EQ(c0.session_id(), 10u);
+ EXPECT_EQ(c0.resource_id(), 10u);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
// Wait for the timeout to expire without doing anything. The client should
// resend the initial transmit chunk.
@@ -1203,10 +1259,16 @@ TEST_F(WriteTransfer, Timeout_RetriesWithInitialChunk) {
ASSERT_EQ(payloads.size(), 2u);
Chunk c = DecodeChunk(payloads.back());
- EXPECT_EQ(c.transfer_id, 10u);
+ EXPECT_EQ(c.session_id(), 10u);
+ EXPECT_EQ(c.resource_id(), 10u);
+ EXPECT_EQ(c.type(), Chunk::Type::kStart);
// Transfer has not yet completed.
EXPECT_EQ(transfer_status, Status::Unknown());
+
+ // Ensure we don't leave a dangling reference to transfer_status.
+ client_.CancelTransfer(10);
+ transfer_thread_.WaitUntilEventIsProcessed();
}
TEST_F(WriteTransfer, Timeout_RetriesWithMostRecentChunk) {
@@ -1221,40 +1283,45 @@ TEST_F(WriteTransfer, Timeout_RetriesWithMostRecentChunk) {
kTestTimeout));
transfer_thread_.WaitUntilEventIsProcessed();
- // The client begins by just sending the transfer ID.
+ // The client begins by sending the ID of the resource to transfer.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Write>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads.back());
- EXPECT_EQ(c0.transfer_id, 11u);
+ EXPECT_EQ(c0.session_id(), 11u);
+ EXPECT_EQ(c0.resource_id(), 11u);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
// Send the first parameters chunk.
rpc::test::WaitForPackets(context_.output(), 2, [this] {
- context_.server().SendServerStream<Transfer::Write>(
- EncodeChunk({.transfer_id = 11,
- .pending_bytes = 16,
- .max_chunk_size_bytes = 8,
- .offset = 0}));
+ context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
+ .set_session_id(11)
+ .set_offset(0)
+ .set_window_end_offset(16)
+ .set_max_chunk_size_bytes(8)));
});
ASSERT_EQ(payloads.size(), 3u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c1 = DecodeChunk(payloads[1]);
- EXPECT_EQ(c1.transfer_id, 11u);
- EXPECT_EQ(c1.offset, 0u);
- EXPECT_EQ(c1.data.size(), 8u);
- EXPECT_EQ(std::memcmp(c1.data.data(), kData32.data(), c1.data.size()), 0);
+ EXPECT_EQ(c1.session_id(), 11u);
+ EXPECT_EQ(c1.offset(), 0u);
+ EXPECT_EQ(c1.payload().size(), 8u);
+ EXPECT_EQ(
+ std::memcmp(c1.payload().data(), kData32.data(), c1.payload().size()), 0);
Chunk c2 = DecodeChunk(payloads[2]);
- EXPECT_EQ(c2.transfer_id, 11u);
- EXPECT_EQ(c2.offset, 8u);
- EXPECT_EQ(c2.data.size(), 8u);
- EXPECT_EQ(
- std::memcmp(c2.data.data(), kData32.data() + c2.offset, c1.data.size()),
- 0);
+ EXPECT_EQ(c2.session_id(), 11u);
+ EXPECT_EQ(c2.offset(), 8u);
+ EXPECT_EQ(c2.payload().size(), 8u);
+ EXPECT_EQ(std::memcmp(c2.payload().data(),
+ kData32.data() + c2.offset(),
+ c2.payload().size()),
+ 0);
// Wait for the timeout to expire without doing anything. The client should
// resend the most recently sent chunk.
@@ -1262,13 +1329,19 @@ TEST_F(WriteTransfer, Timeout_RetriesWithMostRecentChunk) {
ASSERT_EQ(payloads.size(), 4u);
Chunk c3 = DecodeChunk(payloads[3]);
- EXPECT_EQ(c3.transfer_id, c2.transfer_id);
- EXPECT_EQ(c3.offset, c2.offset);
- EXPECT_EQ(c3.data.size(), c2.data.size());
- EXPECT_EQ(std::memcmp(c3.data.data(), c2.data.data(), c3.data.size()), 0);
+ EXPECT_EQ(c3.session_id(), c2.session_id());
+ EXPECT_EQ(c3.offset(), c2.offset());
+ EXPECT_EQ(c3.payload().size(), c2.payload().size());
+ EXPECT_EQ(std::memcmp(
+ c3.payload().data(), c2.payload().data(), c3.payload().size()),
+ 0);
// Transfer has not yet completed.
EXPECT_EQ(transfer_status, Status::Unknown());
+
+ // Ensure we don't leave a dangling reference to transfer_status.
+ client_.CancelTransfer(11);
+ transfer_thread_.WaitUntilEventIsProcessed();
}
TEST_F(WriteTransfer, Timeout_RetriesWithSingleChunkTransfer) {
@@ -1283,38 +1356,42 @@ TEST_F(WriteTransfer, Timeout_RetriesWithSingleChunkTransfer) {
kTestTimeout));
transfer_thread_.WaitUntilEventIsProcessed();
- // The client begins by just sending the transfer ID.
+ // The client begins by sending the ID of the resource to transfer.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Write>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads.back());
- EXPECT_EQ(c0.transfer_id, 12u);
+ EXPECT_EQ(c0.session_id(), 12u);
+ EXPECT_EQ(c0.resource_id(), 12u);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
// Send the first parameters chunk, requesting all the data. The client should
// respond with one data chunk and a remaining_bytes = 0 chunk.
rpc::test::WaitForPackets(context_.output(), 2, [this] {
- context_.server().SendServerStream<Transfer::Write>(
- EncodeChunk({.transfer_id = 12,
- .pending_bytes = 64,
- .max_chunk_size_bytes = 64,
- .offset = 0}));
+ context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
+ .set_session_id(12)
+ .set_offset(0)
+ .set_window_end_offset(64)
+ .set_max_chunk_size_bytes(64)));
});
ASSERT_EQ(payloads.size(), 3u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c1 = DecodeChunk(payloads[1]);
- EXPECT_EQ(c1.transfer_id, 12u);
- EXPECT_EQ(c1.offset, 0u);
- EXPECT_EQ(c1.data.size(), 32u);
- EXPECT_EQ(std::memcmp(c1.data.data(), kData32.data(), c1.data.size()), 0);
+ EXPECT_EQ(c1.session_id(), 12u);
+ EXPECT_EQ(c1.offset(), 0u);
+ EXPECT_EQ(c1.payload().size(), 32u);
+ EXPECT_EQ(
+ std::memcmp(c1.payload().data(), kData32.data(), c1.payload().size()), 0);
Chunk c2 = DecodeChunk(payloads[2]);
- EXPECT_EQ(c2.transfer_id, 12u);
- ASSERT_TRUE(c2.remaining_bytes.has_value());
- EXPECT_EQ(c2.remaining_bytes.value(), 0u);
+ EXPECT_EQ(c2.session_id(), 12u);
+ ASSERT_TRUE(c2.remaining_bytes().has_value());
+ EXPECT_EQ(c2.remaining_bytes().value(), 0u);
// Wait for the timeout to expire without doing anything. The client should
// resend the data chunk.
@@ -1322,28 +1399,30 @@ TEST_F(WriteTransfer, Timeout_RetriesWithSingleChunkTransfer) {
ASSERT_EQ(payloads.size(), 4u);
Chunk c3 = DecodeChunk(payloads[3]);
- EXPECT_EQ(c3.transfer_id, c1.transfer_id);
- EXPECT_EQ(c3.offset, c1.offset);
- EXPECT_EQ(c3.data.size(), c1.data.size());
- EXPECT_EQ(std::memcmp(c3.data.data(), c1.data.data(), c3.data.size()), 0);
+ EXPECT_EQ(c3.session_id(), c1.session_id());
+ EXPECT_EQ(c3.offset(), c1.offset());
+ EXPECT_EQ(c3.payload().size(), c1.payload().size());
+ EXPECT_EQ(std::memcmp(
+ c3.payload().data(), c1.payload().data(), c3.payload().size()),
+ 0);
// The remaining_bytes = 0 chunk should be resent on the next parameters.
- context_.server().SendServerStream<Transfer::Write>(
- EncodeChunk({.transfer_id = 12,
- .pending_bytes = 64,
- .max_chunk_size_bytes = 64,
- .offset = 32}));
+ context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
+ .set_session_id(12)
+ .set_offset(32)
+ .set_window_end_offset(64)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(payloads.size(), 5u);
Chunk c4 = DecodeChunk(payloads[4]);
- EXPECT_EQ(c4.transfer_id, 12u);
- ASSERT_TRUE(c4.remaining_bytes.has_value());
- EXPECT_EQ(c4.remaining_bytes.value(), 0u);
+ EXPECT_EQ(c4.session_id(), 12u);
+ ASSERT_TRUE(c4.remaining_bytes().has_value());
+ EXPECT_EQ(c4.remaining_bytes().value(), 0u);
context_.server().SendServerStream<Transfer::Write>(
- EncodeChunk({.transfer_id = 12, .status = OkStatus()}));
+ EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 12, OkStatus())));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_EQ(transfer_status, OkStatus());
@@ -1361,14 +1440,16 @@ TEST_F(WriteTransfer, Timeout_EndsTransferAfterMaxRetries) {
kTestTimeout));
transfer_thread_.WaitUntilEventIsProcessed();
- // The client begins by just sending the transfer ID.
+ // The client begins by sending the ID of the resource to transfer.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Write>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads.back());
- EXPECT_EQ(c0.transfer_id, 13u);
+ EXPECT_EQ(c0.session_id(), 13u);
+ EXPECT_EQ(c0.resource_id(), 13u);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
for (unsigned retry = 1; retry <= kTestRetries; ++retry) {
// Wait for the timeout to expire without doing anything. The client should
@@ -1377,28 +1458,30 @@ TEST_F(WriteTransfer, Timeout_EndsTransferAfterMaxRetries) {
ASSERT_EQ(payloads.size(), retry + 1);
Chunk c = DecodeChunk(payloads.back());
- EXPECT_EQ(c.transfer_id, 13u);
+ EXPECT_EQ(c.session_id(), 13u);
+ EXPECT_EQ(c.resource_id(), 13u);
+ EXPECT_EQ(c.type(), Chunk::Type::kStart);
// Transfer has not yet completed.
EXPECT_EQ(transfer_status, Status::Unknown());
}
// Sleep one more time after the final retry. The client should cancel the
- // transfer at this point and send a DEADLINE_EXCEEDED chunk.
+ // transfer at this point. As no packets were received from the server, no
+ // final status chunk should be sent.
transfer_thread_.SimulateClientTimeout(13);
- ASSERT_EQ(payloads.size(), 5u);
-
- Chunk c4 = DecodeChunk(payloads.back());
- EXPECT_EQ(c4.transfer_id, 13u);
- ASSERT_TRUE(c4.status.has_value());
- EXPECT_EQ(c4.status.value(), Status::DeadlineExceeded());
+ ASSERT_EQ(payloads.size(), 4u);
EXPECT_EQ(transfer_status, Status::DeadlineExceeded());
// After finishing the transfer, nothing else should be sent. Verify this by
// waiting for a bit.
this_thread::sleep_for(kTestTimeout * 4);
- ASSERT_EQ(payloads.size(), 5u);
+ ASSERT_EQ(payloads.size(), 4u);
+
+ // Ensure we don't leave a dangling reference to transfer_status.
+ client_.CancelTransfer(13);
+ transfer_thread_.WaitUntilEventIsProcessed();
}
TEST_F(WriteTransfer, Timeout_NonSeekableReaderEndsTransfer) {
@@ -1413,40 +1496,47 @@ TEST_F(WriteTransfer, Timeout_NonSeekableReaderEndsTransfer) {
kTestTimeout));
transfer_thread_.WaitUntilEventIsProcessed();
- // The client begins by just sending the transfer ID.
+ // The client begins by sending the ID of the resource to transfer.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Write>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads.back());
- EXPECT_EQ(c0.transfer_id, 14u);
+ EXPECT_EQ(c0.session_id(), 14u);
+ EXPECT_EQ(c0.resource_id(), 14u);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
// Send the first parameters chunk.
rpc::test::WaitForPackets(context_.output(), 2, [this] {
- context_.server().SendServerStream<Transfer::Write>(
- EncodeChunk({.transfer_id = 14,
- .pending_bytes = 16,
- .max_chunk_size_bytes = 8,
- .offset = 0}));
+ context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
+ .set_session_id(14)
+ .set_offset(0)
+ .set_window_end_offset(16)
+ .set_max_chunk_size_bytes(8)));
});
ASSERT_EQ(payloads.size(), 3u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c1 = DecodeChunk(payloads[1]);
- EXPECT_EQ(c1.transfer_id, 14u);
- EXPECT_EQ(c1.offset, 0u);
- EXPECT_EQ(c1.data.size(), 8u);
- EXPECT_EQ(std::memcmp(c1.data.data(), kData32.data(), c1.data.size()), 0);
+ EXPECT_EQ(c1.session_id(), 14u);
+ EXPECT_EQ(c1.offset(), 0u);
+ EXPECT_TRUE(c1.has_payload());
+ EXPECT_EQ(c1.payload().size(), 8u);
+ EXPECT_EQ(
+ std::memcmp(c1.payload().data(), kData32.data(), c1.payload().size()), 0);
Chunk c2 = DecodeChunk(payloads[2]);
- EXPECT_EQ(c2.transfer_id, 14u);
- EXPECT_EQ(c2.offset, 8u);
- EXPECT_EQ(c2.data.size(), 8u);
- EXPECT_EQ(
- std::memcmp(c2.data.data(), kData32.data() + c2.offset, c1.data.size()),
- 0);
+ EXPECT_EQ(c2.session_id(), 14u);
+ EXPECT_EQ(c2.offset(), 8u);
+ EXPECT_TRUE(c2.has_payload());
+ EXPECT_EQ(c2.payload().size(), 8u);
+ EXPECT_EQ(std::memcmp(c2.payload().data(),
+ kData32.data() + c2.offset(),
+ c2.payload().size()),
+ 0);
// Wait for the timeout to expire without doing anything. The client should
// fail to seek back and end the transfer.
@@ -1454,14 +1544,1039 @@ TEST_F(WriteTransfer, Timeout_NonSeekableReaderEndsTransfer) {
ASSERT_EQ(payloads.size(), 4u);
Chunk c3 = DecodeChunk(payloads[3]);
- EXPECT_EQ(c3.transfer_id, 14u);
- ASSERT_TRUE(c3.status.has_value());
- EXPECT_EQ(c3.status.value(), Status::DeadlineExceeded());
+ EXPECT_EQ(c3.protocol_version(), ProtocolVersion::kLegacy);
+ EXPECT_EQ(c3.session_id(), 14u);
+ ASSERT_TRUE(c3.status().has_value());
+ EXPECT_EQ(c3.status().value(), Status::DeadlineExceeded());
EXPECT_EQ(transfer_status, Status::DeadlineExceeded());
+
+ // Ensure we don't leave a dangling reference to transfer_status.
+ client_.CancelTransfer(14);
+ transfer_thread_.WaitUntilEventIsProcessed();
}
-PW_MODIFY_DIAGNOSTICS_POP();
+TEST_F(WriteTransfer, ManualCancel) {
+ stream::MemoryReader reader(kData32);
+ Status transfer_status = Status::Unknown();
+
+ ASSERT_EQ(OkStatus(),
+ client_.Write(
+ 15,
+ reader,
+ [&transfer_status](Status status) { transfer_status = status; },
+ kTestTimeout));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // The client begins by sending the ID of the resource to transfer.
+ rpc::PayloadsView payloads =
+ context_.output().payloads<Transfer::Write>(context_.channel().id());
+ ASSERT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ Chunk chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.session_id(), 15u);
+ EXPECT_EQ(chunk.resource_id(), 15u);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+
+ // Get a response from the server, then cancel the transfer.
+ context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
+ .set_session_id(15)
+ .set_offset(0)
+ .set_window_end_offset(64)
+ .set_max_chunk_size_bytes(32)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+ ASSERT_EQ(payloads.size(), 2u);
+
+ client_.CancelTransfer(15);
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // Client should send a cancellation chunk to the server.
+ ASSERT_EQ(payloads.size(), 3u);
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.session_id(), 15u);
+ ASSERT_EQ(chunk.type(), Chunk::Type::kCompletion);
+ EXPECT_EQ(chunk.status().value(), Status::Cancelled());
+
+ EXPECT_EQ(transfer_status, Status::Cancelled());
+}
+
+TEST_F(WriteTransfer, ManualCancel_NoContact) {
+ stream::MemoryReader reader(kData32);
+ Status transfer_status = Status::Unknown();
+
+ ASSERT_EQ(OkStatus(),
+ client_.Write(
+ 15,
+ reader,
+ [&transfer_status](Status status) { transfer_status = status; },
+ kTestTimeout));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // The client begins by sending the ID of the resource to transfer.
+ rpc::PayloadsView payloads =
+ context_.output().payloads<Transfer::Write>(context_.channel().id());
+ ASSERT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ Chunk chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.session_id(), 15u);
+ EXPECT_EQ(chunk.resource_id(), 15u);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+
+ // Cancel transfer without a server response. No final chunk should be sent.
+ client_.CancelTransfer(15);
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(payloads.size(), 1u);
+
+ EXPECT_EQ(transfer_status, Status::Cancelled());
+}
+
+TEST_F(ReadTransfer, Version2_SingleChunk) {
+ stream::MemoryWriterBuffer<64> writer;
+ Status transfer_status = Status::Unknown();
+
+ ASSERT_EQ(OkStatus(),
+ client_.Read(
+ 3,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultChunkTimeout,
+ ProtocolVersion::kVersionTwo));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // Initial chunk of the transfer is sent. This chunk should contain all the
+ // fields from both legacy and version 2 protocols for backwards
+ // compatibility.
+ rpc::PayloadsView payloads =
+ context_.output().payloads<Transfer::Read>(context_.channel().id());
+ ASSERT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ Chunk chunk = DecodeChunk(payloads[0]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 3u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_EQ(chunk.window_end_offset(), 64u);
+ EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+ // The server responds with a START_ACK, continuing the version 2 handshake
+ // and assigning a session_id to the transfer.
+ context_.server().SendServerStream<Transfer::Read>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
+ .set_session_id(29)
+ .set_resource_id(3)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(payloads.size(), 2u);
+
+ // Client should accept the session_id with a START_ACK_CONFIRMATION,
+ // additionally containing the initial parameters for the read transfer.
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 29u);
+ EXPECT_FALSE(chunk.resource_id().has_value());
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_EQ(chunk.window_end_offset(), 64u);
+ EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+ // Send all the transfer data. Client should accept it and complete the
+ // transfer.
+ context_.server().SendServerStream<Transfer::Read>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
+ .set_session_id(29)
+ .set_offset(0)
+ .set_payload(kData32)
+ .set_remaining_bytes(0)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(payloads.size(), 3u);
+
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.session_id(), 29u);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ ASSERT_TRUE(chunk.status().has_value());
+ EXPECT_EQ(chunk.status().value(), OkStatus());
+
+ EXPECT_EQ(transfer_status, OkStatus());
+ EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
+ 0);
+
+ context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
+ Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
+ .set_session_id(29)));
+}
+
+TEST_F(ReadTransfer, Version2_ServerRunsLegacy) {
+ stream::MemoryWriterBuffer<64> writer;
+ Status transfer_status = Status::Unknown();
+
+ ASSERT_EQ(OkStatus(),
+ client_.Read(
+ 3,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultChunkTimeout,
+ ProtocolVersion::kVersionTwo));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // Initial chunk of the transfer is sent. This chunk should contain all the
+ // fields from both legacy and version 2 protocols for backwards
+ // compatibility.
+ rpc::PayloadsView payloads =
+ context_.output().payloads<Transfer::Read>(context_.channel().id());
+ ASSERT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ Chunk chunk = DecodeChunk(payloads[0]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 3u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_EQ(chunk.window_end_offset(), 64u);
+ EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+ // Instead of a START_ACK to continue the handshake, the server responds with
+ // an immediate data chunk, indicating that it is running the legacy protocol
+ // version. Client should revert to legacy, using the resource_id of 3 as the
+ // session_id, and complete the transfer.
+ context_.server().SendServerStream<Transfer::Read>(
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+ .set_session_id(3)
+ .set_offset(0)
+ .set_payload(kData32)
+ .set_remaining_bytes(0)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(payloads.size(), 2u);
+
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.session_id(), 3u);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kLegacy);
+ ASSERT_TRUE(chunk.status().has_value());
+ EXPECT_EQ(chunk.status().value(), OkStatus());
+
+ EXPECT_EQ(transfer_status, OkStatus());
+ EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
+ 0);
+}
+
+TEST_F(ReadTransfer, Version2_TimeoutDuringHandshake) {
+ stream::MemoryWriterBuffer<64> writer;
+ Status transfer_status = Status::Unknown();
+
+ ASSERT_EQ(OkStatus(),
+ client_.Read(
+ 3,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultChunkTimeout,
+ ProtocolVersion::kVersionTwo));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // Initial chunk of the transfer is sent. This chunk should contain all the
+ // fields from both legacy and version 2 protocols for backwards
+ // compatibility.
+ rpc::PayloadsView payloads =
+ context_.output().payloads<Transfer::Read>(context_.channel().id());
+ ASSERT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ Chunk chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 3u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_EQ(chunk.window_end_offset(), 64u);
+ EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+ // Wait for the timeout to expire without doing anything. The client should
+ // resend the initial chunk.
+ transfer_thread_.SimulateClientTimeout(3);
+ ASSERT_EQ(payloads.size(), 2u);
+
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 3u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_EQ(chunk.window_end_offset(), 64u);
+ EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+ // This time, the server responds, continuing the handshake and transfer.
+ context_.server().SendServerStream<Transfer::Read>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
+ .set_session_id(31)
+ .set_resource_id(3)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(payloads.size(), 3u);
+
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 31u);
+ EXPECT_FALSE(chunk.resource_id().has_value());
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_EQ(chunk.window_end_offset(), 64u);
+ EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+ context_.server().SendServerStream<Transfer::Read>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
+ .set_session_id(31)
+ .set_offset(0)
+ .set_payload(kData32)
+ .set_remaining_bytes(0)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(payloads.size(), 4u);
+
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.session_id(), 31u);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ ASSERT_TRUE(chunk.status().has_value());
+ EXPECT_EQ(chunk.status().value(), OkStatus());
+
+ EXPECT_EQ(transfer_status, OkStatus());
+ EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
+ 0);
+
+ context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
+ Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
+ .set_session_id(31)));
+}
+
+TEST_F(ReadTransfer, Version2_TimeoutAfterHandshake) {
+ stream::MemoryWriterBuffer<64> writer;
+ Status transfer_status = Status::Unknown();
+
+ ASSERT_EQ(OkStatus(),
+ client_.Read(
+ 3,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultChunkTimeout,
+ ProtocolVersion::kVersionTwo));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // Initial chunk of the transfer is sent. This chunk should contain all the
+ // fields from both legacy and version 2 protocols for backwards
+ // compatibility.
+ rpc::PayloadsView payloads =
+ context_.output().payloads<Transfer::Read>(context_.channel().id());
+ ASSERT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ Chunk chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 3u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_EQ(chunk.window_end_offset(), 64u);
+ EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+ // The server responds with a START_ACK, continuing the version 2 handshake
+ // and assigning a session_id to the transfer.
+ context_.server().SendServerStream<Transfer::Read>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
+ .set_session_id(33)
+ .set_resource_id(3)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(payloads.size(), 2u);
+
+ // Client should accept the session_id with a START_ACK_CONFIRMATION,
+ // additionally containing the initial parameters for the read transfer.
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 33u);
+ EXPECT_FALSE(chunk.resource_id().has_value());
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_EQ(chunk.window_end_offset(), 64u);
+ EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+ // Wait for the timeout to expire without doing anything. The client should
+ // resend the confirmation chunk.
+ transfer_thread_.SimulateClientTimeout(33);
+ ASSERT_EQ(payloads.size(), 3u);
+
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 33u);
+ EXPECT_FALSE(chunk.resource_id().has_value());
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_EQ(chunk.window_end_offset(), 64u);
+ EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+ // The server responds and the transfer should continue normally.
+ context_.server().SendServerStream<Transfer::Read>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
+ .set_session_id(33)
+ .set_offset(0)
+ .set_payload(kData32)
+ .set_remaining_bytes(0)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(payloads.size(), 4u);
+
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.session_id(), 33u);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ ASSERT_TRUE(chunk.status().has_value());
+ EXPECT_EQ(chunk.status().value(), OkStatus());
+
+ EXPECT_EQ(transfer_status, OkStatus());
+ EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
+ 0);
+
+ context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
+ Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
+ .set_session_id(33)));
+}
+
+TEST_F(ReadTransfer, Version2_ServerErrorDuringHandshake) {
+ stream::MemoryWriterBuffer<64> writer;
+ Status transfer_status = Status::Unknown();
+
+ ASSERT_EQ(OkStatus(),
+ client_.Read(
+ 3,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultChunkTimeout,
+ ProtocolVersion::kVersionTwo));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // Initial chunk of the transfer is sent. This chunk should contain all the
+ // fields from both legacy and version 2 protocols for backwards
+ // compatibility.
+ rpc::PayloadsView payloads =
+ context_.output().payloads<Transfer::Read>(context_.channel().id());
+ ASSERT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ Chunk chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 3u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_EQ(chunk.window_end_offset(), 64u);
+ EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+ // The server responds to the start request with an error.
+ context_.server().SendServerStream<Transfer::Read>(EncodeChunk(Chunk::Final(
+ ProtocolVersion::kVersionTwo, 3, Status::Unauthenticated())));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ EXPECT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::Unauthenticated());
+}
+
+TEST_F(ReadTransfer, Version2_TimeoutWaitingForCompletionAckRetries) {
+ stream::MemoryWriterBuffer<64> writer;
+ Status transfer_status = Status::Unknown();
+
+ ASSERT_EQ(OkStatus(),
+ client_.Read(
+ 3,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultChunkTimeout,
+ ProtocolVersion::kVersionTwo));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // Initial chunk of the transfer is sent. This chunk should contain all the
+ // fields from both legacy and version 2 protocols for backwards
+ // compatibility.
+ rpc::PayloadsView payloads =
+ context_.output().payloads<Transfer::Read>(context_.channel().id());
+ ASSERT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ Chunk chunk = DecodeChunk(payloads[0]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 3u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_EQ(chunk.window_end_offset(), 64u);
+ EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+ // The server responds with a START_ACK, continuing the version 2 handshake
+ // and assigning a session_id to the transfer.
+ context_.server().SendServerStream<Transfer::Read>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
+ .set_session_id(29)
+ .set_resource_id(3)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(payloads.size(), 2u);
+
+ // Client should accept the session_id with a START_ACK_CONFIRMATION,
+ // additionally containing the initial parameters for the read transfer.
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 29u);
+ EXPECT_FALSE(chunk.resource_id().has_value());
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_EQ(chunk.window_end_offset(), 64u);
+ EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+ // Send all the transfer data. Client should accept it and complete the
+ // transfer.
+ context_.server().SendServerStream<Transfer::Read>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
+ .set_session_id(29)
+ .set_offset(0)
+ .set_payload(kData32)
+ .set_remaining_bytes(0)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(payloads.size(), 3u);
+
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.session_id(), 29u);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ ASSERT_TRUE(chunk.status().has_value());
+ EXPECT_EQ(chunk.status().value(), OkStatus());
+
+ EXPECT_EQ(transfer_status, OkStatus());
+ EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
+ 0);
+
+ // Time out instead of sending a completion ACK. THe transfer should resend
+ // its completion chunk.
+ transfer_thread_.SimulateClientTimeout(29);
+ ASSERT_EQ(payloads.size(), 4u);
+
+ // Reset transfer_status to check whether the handler is called again.
+ transfer_status = Status::Unknown();
+
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.session_id(), 29u);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ ASSERT_TRUE(chunk.status().has_value());
+ EXPECT_EQ(chunk.status().value(), OkStatus());
+
+ // Transfer handler should not be called a second time in response to the
+ // re-sent completion chunk.
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ // Send a completion ACK to end the transfer.
+ context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
+ Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
+ .set_session_id(29)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // No further chunks should be sent following the ACK.
+ transfer_thread_.SimulateClientTimeout(29);
+ ASSERT_EQ(payloads.size(), 4u);
+}
+
+TEST_F(ReadTransfer,
+ Version2_TimeoutWaitingForCompletionAckEndsTransferAfterRetries) {
+ stream::MemoryWriterBuffer<64> writer;
+ Status transfer_status = Status::Unknown();
+
+ ASSERT_EQ(OkStatus(),
+ client_.Read(
+ 3,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultChunkTimeout,
+ ProtocolVersion::kVersionTwo));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // Initial chunk of the transfer is sent. This chunk should contain all the
+ // fields from both legacy and version 2 protocols for backwards
+ // compatibility.
+ rpc::PayloadsView payloads =
+ context_.output().payloads<Transfer::Read>(context_.channel().id());
+ ASSERT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ Chunk chunk = DecodeChunk(payloads[0]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 3u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_EQ(chunk.window_end_offset(), 64u);
+ EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+ // The server responds with a START_ACK, continuing the version 2 handshake
+ // and assigning a session_id to the transfer.
+ context_.server().SendServerStream<Transfer::Read>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
+ .set_session_id(29)
+ .set_resource_id(3)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(payloads.size(), 2u);
+
+ // Client should accept the session_id with a START_ACK_CONFIRMATION,
+ // additionally containing the initial parameters for the read transfer.
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 29u);
+ EXPECT_FALSE(chunk.resource_id().has_value());
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_EQ(chunk.window_end_offset(), 64u);
+ EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+ // Send all the transfer data. Client should accept it and complete the
+ // transfer.
+ context_.server().SendServerStream<Transfer::Read>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
+ .set_session_id(29)
+ .set_offset(0)
+ .set_payload(kData32)
+ .set_remaining_bytes(0)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(payloads.size(), 3u);
+
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.session_id(), 29u);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ ASSERT_TRUE(chunk.status().has_value());
+ EXPECT_EQ(chunk.status().value(), OkStatus());
+
+ EXPECT_EQ(transfer_status, OkStatus());
+ EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
+ 0);
+
+ // Time out instead of sending a completion ACK. THe transfer should resend
+ // its completion chunk at first, then terminate after the maximum number of
+ // retries.
+ transfer_thread_.SimulateClientTimeout(29);
+ ASSERT_EQ(payloads.size(), 4u); // Retry 1.
+
+ transfer_thread_.SimulateClientTimeout(29);
+ ASSERT_EQ(payloads.size(), 5u); // Retry 2.
+
+ transfer_thread_.SimulateClientTimeout(29);
+ ASSERT_EQ(payloads.size(), 6u); // Retry 3.
+
+ transfer_thread_.SimulateClientTimeout(29);
+ ASSERT_EQ(payloads.size(), 6u); // No more retries; transfer has ended.
+}
+
+TEST_F(WriteTransfer, Version2_SingleChunk) {
+ stream::MemoryReader reader(kData32);
+ Status transfer_status = Status::Unknown();
+
+ ASSERT_EQ(OkStatus(),
+ client_.Write(
+ 3,
+ reader,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultChunkTimeout,
+ ProtocolVersion::kVersionTwo));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // The client begins by sending the ID of the resource to transfer.
+ rpc::PayloadsView payloads =
+ context_.output().payloads<Transfer::Write>(context_.channel().id());
+ ASSERT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ Chunk chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 3u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+
+ // The server responds with a START_ACK, continuing the version 2 handshake
+ // and assigning a session_id to the transfer.
+ context_.server().SendServerStream<Transfer::Write>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
+ .set_session_id(29)
+ .set_resource_id(3)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(payloads.size(), 2u);
+
+ // Client should accept the session_id with a START_ACK_CONFIRMATION.
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 29u);
+ EXPECT_FALSE(chunk.resource_id().has_value());
+
+ // The server can then begin the data transfer by sending its transfer
+ // parameters. Client should respond with a data chunk and the final chunk.
+ rpc::test::WaitForPackets(context_.output(), 2, [this] {
+ context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
+ Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit)
+ .set_session_id(29)
+ .set_offset(0)
+ .set_window_end_offset(64)
+ .set_max_chunk_size_bytes(32)));
+ });
+
+ ASSERT_EQ(payloads.size(), 4u);
+
+ chunk = DecodeChunk(payloads[2]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kData);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 29u);
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_TRUE(chunk.has_payload());
+ EXPECT_EQ(std::memcmp(
+ chunk.payload().data(), kData32.data(), chunk.payload().size()),
+ 0);
+
+ chunk = DecodeChunk(payloads[3]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kData);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 29u);
+ ASSERT_TRUE(chunk.remaining_bytes().has_value());
+ EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
+
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ // Send the final status chunk to complete the transfer.
+ context_.server().SendServerStream<Transfer::Write>(
+ EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 29, OkStatus())));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // Client should acknowledge the completion of the transfer.
+ EXPECT_EQ(payloads.size(), 5u);
+
+ chunk = DecodeChunk(payloads[4]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kCompletionAck);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 29u);
+
+ EXPECT_EQ(transfer_status, OkStatus());
+}
+
+TEST_F(WriteTransfer, Version2_ServerRunsLegacy) {
+ stream::MemoryReader reader(kData32);
+ Status transfer_status = Status::Unknown();
+
+ ASSERT_EQ(OkStatus(),
+ client_.Write(
+ 3,
+ reader,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultChunkTimeout,
+ ProtocolVersion::kVersionTwo));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // The client begins by sending the ID of the resource to transfer.
+ rpc::PayloadsView payloads =
+ context_.output().payloads<Transfer::Write>(context_.channel().id());
+ ASSERT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ Chunk chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 3u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+
+ // Instead of continuing the handshake with a START_ACK, the server
+ // immediately sends parameters, indicating that it only supports the legacy
+ // protocol. Client should switch over to legacy and continue the transfer.
+ rpc::test::WaitForPackets(context_.output(), 2, [this] {
+ context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
+ .set_session_id(3)
+ .set_offset(0)
+ .set_window_end_offset(64)
+ .set_max_chunk_size_bytes(32)));
+ });
+
+ ASSERT_EQ(payloads.size(), 3u);
+
+ chunk = DecodeChunk(payloads[1]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kData);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kLegacy);
+ EXPECT_EQ(chunk.session_id(), 3u);
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_TRUE(chunk.has_payload());
+ EXPECT_EQ(std::memcmp(
+ chunk.payload().data(), kData32.data(), chunk.payload().size()),
+ 0);
+
+ chunk = DecodeChunk(payloads[2]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kData);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kLegacy);
+ EXPECT_EQ(chunk.session_id(), 3u);
+ ASSERT_TRUE(chunk.remaining_bytes().has_value());
+ EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
+
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ // Send the final status chunk to complete the transfer.
+ context_.server().SendServerStream<Transfer::Write>(
+ EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ EXPECT_EQ(payloads.size(), 3u);
+ EXPECT_EQ(transfer_status, OkStatus());
+}
+
+TEST_F(WriteTransfer, Version2_RetryDuringHandshake) {
+ stream::MemoryReader reader(kData32);
+ Status transfer_status = Status::Unknown();
+
+ ASSERT_EQ(OkStatus(),
+ client_.Write(
+ 3,
+ reader,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultChunkTimeout,
+ ProtocolVersion::kVersionTwo));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // The client begins by sending the ID of the resource to transfer.
+ rpc::PayloadsView payloads =
+ context_.output().payloads<Transfer::Write>(context_.channel().id());
+ ASSERT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ Chunk chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 3u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+
+ // Time out waiting for a server response. The client should resend the
+ // initial packet.
+ transfer_thread_.SimulateClientTimeout(3);
+ ASSERT_EQ(payloads.size(), 2u);
+
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 3u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+
+ // This time, respond with the correct continuation packet. The transfer
+ // should resume and complete normally.
+ context_.server().SendServerStream<Transfer::Write>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
+ .set_session_id(31)
+ .set_resource_id(3)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(payloads.size(), 3u);
+
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 31u);
+ EXPECT_FALSE(chunk.resource_id().has_value());
+
+ rpc::test::WaitForPackets(context_.output(), 2, [this] {
+ context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
+ Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit)
+ .set_session_id(31)
+ .set_offset(0)
+ .set_window_end_offset(64)
+ .set_max_chunk_size_bytes(32)));
+ });
+
+ ASSERT_EQ(payloads.size(), 5u);
+
+ chunk = DecodeChunk(payloads[3]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kData);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 31u);
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_TRUE(chunk.has_payload());
+ EXPECT_EQ(std::memcmp(
+ chunk.payload().data(), kData32.data(), chunk.payload().size()),
+ 0);
+
+ chunk = DecodeChunk(payloads[4]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kData);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 31u);
+ ASSERT_TRUE(chunk.remaining_bytes().has_value());
+ EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
+
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ context_.server().SendServerStream<Transfer::Write>(
+ EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 31, OkStatus())));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // Client should acknowledge the completion of the transfer.
+ EXPECT_EQ(payloads.size(), 6u);
+
+ chunk = DecodeChunk(payloads[5]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kCompletionAck);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 31u);
+
+ EXPECT_EQ(transfer_status, OkStatus());
+}
+
+TEST_F(WriteTransfer, Version2_RetryAfterHandshake) {
+ stream::MemoryReader reader(kData32);
+ Status transfer_status = Status::Unknown();
+
+ ASSERT_EQ(OkStatus(),
+ client_.Write(
+ 3,
+ reader,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultChunkTimeout,
+ ProtocolVersion::kVersionTwo));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // The client begins by sending the ID of the resource to transfer.
+ rpc::PayloadsView payloads =
+ context_.output().payloads<Transfer::Write>(context_.channel().id());
+ ASSERT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ Chunk chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 3u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+
+ // The server responds with a START_ACK, continuing the version 2 handshake
+ // and assigning a session_id to the transfer.
+ context_.server().SendServerStream<Transfer::Write>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
+ .set_session_id(33)
+ .set_resource_id(3)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(payloads.size(), 2u);
+
+ // Client should accept the session_id with a START_ACK_CONFIRMATION.
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 33u);
+ EXPECT_FALSE(chunk.resource_id().has_value());
+
+ // Time out waiting for a server response. The client should resend the
+ // initial packet.
+ transfer_thread_.SimulateClientTimeout(33);
+ ASSERT_EQ(payloads.size(), 3u);
+
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 33u);
+ EXPECT_FALSE(chunk.resource_id().has_value());
+
+ // This time, respond with the first transfer parameters chunk. The transfer
+ // should resume and complete normally.
+ rpc::test::WaitForPackets(context_.output(), 2, [this] {
+ context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
+ Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit)
+ .set_session_id(33)
+ .set_offset(0)
+ .set_window_end_offset(64)
+ .set_max_chunk_size_bytes(32)));
+ });
+
+ ASSERT_EQ(payloads.size(), 5u);
+
+ chunk = DecodeChunk(payloads[3]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kData);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 33u);
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_TRUE(chunk.has_payload());
+ EXPECT_EQ(std::memcmp(
+ chunk.payload().data(), kData32.data(), chunk.payload().size()),
+ 0);
+
+ chunk = DecodeChunk(payloads[4]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kData);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 33u);
+ ASSERT_TRUE(chunk.remaining_bytes().has_value());
+ EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
+
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ context_.server().SendServerStream<Transfer::Write>(
+ EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 33, OkStatus())));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // Client should acknowledge the completion of the transfer.
+ EXPECT_EQ(payloads.size(), 6u);
+
+ chunk = DecodeChunk(payloads[5]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kCompletionAck);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 33u);
+
+ EXPECT_EQ(transfer_status, OkStatus());
+}
+
+TEST_F(WriteTransfer, Version2_ServerErrorDuringHandshake) {
+ stream::MemoryReader reader(kData32);
+ Status transfer_status = Status::Unknown();
+
+ ASSERT_EQ(OkStatus(),
+ client_.Write(
+ 3,
+ reader,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultChunkTimeout,
+ ProtocolVersion::kVersionTwo));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // The client begins by sending the ID of the resource to transfer.
+ rpc::PayloadsView payloads =
+ context_.output().payloads<Transfer::Write>(context_.channel().id());
+ ASSERT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ Chunk chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 3u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+
+ // The server responds to the start request with an error.
+ context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
+ Chunk::Final(ProtocolVersion::kVersionTwo, 3, Status::NotFound())));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ EXPECT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::NotFound());
+}
} // namespace
} // namespace pw::transfer::test