diff options
author | Xin Li <delphij@google.com> | 2023-08-14 15:38:30 -0700 |
---|---|---|
committer | Xin Li <delphij@google.com> | 2023-08-14 15:38:30 -0700 |
commit | bddf63953e111d742b591c1c0c7c34bcda8a51c7 (patch) | |
tree | 3a93128bff4b737b24b0c9581922c0b20410f0f4 /pw_transfer/client_test.cc | |
parent | ee890da55c82b95deca3518d5f3777e3d8ca9f0e (diff) | |
parent | fbb9890f8922aa55fde183655a0017e69127ea4b (diff) | |
download | pigweed-bddf63953e111d742b591c1c0c7c34bcda8a51c7.tar.gz |
Merge Android U (ab/10368041)tmp_amf_298295554
Bug: 291102124
Merged-In: I10c41adb8fe3e126cfa4ff2f49b15863fff379de
Change-Id: I66f7a6cccaafc173d3924dae62a736c6c53520c7
Diffstat (limited to 'pw_transfer/client_test.cc')
-rw-r--r-- | pw_transfer/client_test.cc | 1855 |
1 files changed, 1485 insertions, 370 deletions
diff --git a/pw_transfer/client_test.cc b/pw_transfer/client_test.cc index cbf82d7ba..1b16d9142 100644 --- a/pw_transfer/client_test.cc +++ b/pw_transfer/client_test.cc @@ -20,7 +20,7 @@ #include "pw_assert/check.h" #include "pw_bytes/array.h" #include "pw_rpc/raw/client_testing.h" -#include "pw_rpc/thread_testing.h" +#include "pw_rpc/test_helpers.h" #include "pw_thread/sleep.h" #include "pw_thread/thread.h" #include "pw_thread_stl/options.h" @@ -34,9 +34,6 @@ using pw_rpc::raw::Transfer; using namespace std::chrono_literals; -PW_MODIFY_DIAGNOSTICS_PUSH(); -PW_MODIFY_DIAGNOSTIC(ignored, "-Wmissing-field-initializers"); - thread::Options& TransferThreadOptions() { static thread::stl::Options options; return options; @@ -52,7 +49,7 @@ class ReadTransfer : public ::testing::Test { max_bytes_to_receive), system_thread_(TransferThreadOptions(), transfer_thread_) {} - ~ReadTransfer() { + ~ReadTransfer() override { transfer_thread_.Terminate(); system_thread_.join(); } @@ -89,20 +86,25 @@ TEST_F(ReadTransfer, SingleChunk) { EXPECT_EQ(transfer_status, Status::Unknown()); Chunk c0 = DecodeChunk(payloads[0]); - EXPECT_EQ(c0.transfer_id, 3u); - EXPECT_EQ(c0.offset, 0u); - EXPECT_EQ(c0.pending_bytes.value(), 64u); + EXPECT_EQ(c0.session_id(), 3u); + EXPECT_EQ(c0.offset(), 0u); + EXPECT_EQ(c0.window_end_offset(), 64u); + EXPECT_EQ(c0.type(), Chunk::Type::kStart); - context_.server().SendServerStream<Transfer::Read>(EncodeChunk( - {.transfer_id = 3u, .offset = 0, .data = kData32, .remaining_bytes = 0})); + context_.server().SendServerStream<Transfer::Read>( + EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData) + .set_session_id(3) + .set_offset(0) + .set_payload(kData32) + .set_remaining_bytes(0))); transfer_thread_.WaitUntilEventIsProcessed(); ASSERT_EQ(payloads.size(), 2u); - Chunk c1 = DecodeChunk(payloads[1]); - EXPECT_EQ(c1.transfer_id, 3u); - ASSERT_TRUE(c1.status.has_value()); - EXPECT_EQ(c1.status.value(), OkStatus()); + Chunk c1 = DecodeChunk(payloads.back()); + EXPECT_EQ(c1.session_id(), 3u); + ASSERT_TRUE(c1.status().has_value()); + EXPECT_EQ(c1.status().value(), OkStatus()); EXPECT_EQ(transfer_status, OkStatus()); EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()), @@ -127,30 +129,35 @@ TEST_F(ReadTransfer, MultiChunk) { EXPECT_EQ(transfer_status, Status::Unknown()); Chunk c0 = DecodeChunk(payloads[0]); - EXPECT_EQ(c0.transfer_id, 4u); - EXPECT_EQ(c0.offset, 0u); - EXPECT_EQ(c0.pending_bytes.value(), 64u); + EXPECT_EQ(c0.session_id(), 4u); + EXPECT_EQ(c0.offset(), 0u); + EXPECT_EQ(c0.window_end_offset(), 64u); + EXPECT_EQ(c0.type(), Chunk::Type::kStart); constexpr ConstByteSpan data(kData32); context_.server().SendServerStream<Transfer::Read>( - EncodeChunk({.transfer_id = 4u, .offset = 0, .data = data.first(16)})); + EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData) + .set_session_id(4) + .set_offset(0) + .set_payload(data.first(16)))); transfer_thread_.WaitUntilEventIsProcessed(); ASSERT_EQ(payloads.size(), 1u); context_.server().SendServerStream<Transfer::Read>( - EncodeChunk({.transfer_id = 4u, - .offset = 16, - .data = data.subspan(16), - .remaining_bytes = 0})); + EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData) + .set_session_id(4) + .set_offset(16) + .set_payload(data.subspan(16)) + .set_remaining_bytes(0))); transfer_thread_.WaitUntilEventIsProcessed(); ASSERT_EQ(payloads.size(), 2u); Chunk c1 = DecodeChunk(payloads[1]); - EXPECT_EQ(c1.transfer_id, 4u); - ASSERT_TRUE(c1.status.has_value()); - EXPECT_EQ(c1.status.value(), OkStatus()); + EXPECT_EQ(c1.session_id(), 4u); + ASSERT_TRUE(c1.status().has_value()); + EXPECT_EQ(c1.status().value(), OkStatus()); EXPECT_EQ(transfer_status, OkStatus()); EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()), @@ -167,8 +174,12 @@ TEST_F(ReadTransfer, MultipleTransfers) { })); transfer_thread_.WaitUntilEventIsProcessed(); - context_.server().SendServerStream<Transfer::Read>(EncodeChunk( - {.transfer_id = 3u, .offset = 0, .data = kData32, .remaining_bytes = 0})); + context_.server().SendServerStream<Transfer::Read>( + EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData) + .set_session_id(3) + .set_offset(0) + .set_payload(kData32) + .set_remaining_bytes(0))); transfer_thread_.WaitUntilEventIsProcessed(); ASSERT_EQ(transfer_status, OkStatus()); @@ -180,8 +191,12 @@ TEST_F(ReadTransfer, MultipleTransfers) { })); transfer_thread_.WaitUntilEventIsProcessed(); - context_.server().SendServerStream<Transfer::Read>(EncodeChunk( - {.transfer_id = 3u, .offset = 0, .data = kData32, .remaining_bytes = 0})); + context_.server().SendServerStream<Transfer::Read>( + EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData) + .set_session_id(3) + .set_offset(0) + .set_payload(kData32) + .set_remaining_bytes(0))); transfer_thread_.WaitUntilEventIsProcessed(); EXPECT_EQ(transfer_status, OkStatus()); @@ -203,9 +218,10 @@ TEST_F(ReadTransferMaxBytes32, SetsPendingBytesFromConstructorArg) { ASSERT_EQ(payloads.size(), 1u); Chunk c0 = DecodeChunk(payloads[0]); - EXPECT_EQ(c0.transfer_id, 5u); - EXPECT_EQ(c0.offset, 0u); - ASSERT_EQ(c0.pending_bytes.value(), 32u); + EXPECT_EQ(c0.session_id(), 5u); + EXPECT_EQ(c0.offset(), 0u); + EXPECT_EQ(c0.window_end_offset(), 32u); + EXPECT_EQ(c0.type(), Chunk::Type::kStart); } TEST_F(ReadTransferMaxBytes32, SetsPendingBytesFromWriterLimit) { @@ -219,9 +235,10 @@ TEST_F(ReadTransferMaxBytes32, SetsPendingBytesFromWriterLimit) { ASSERT_EQ(payloads.size(), 1u); Chunk c0 = DecodeChunk(payloads[0]); - EXPECT_EQ(c0.transfer_id, 5u); - EXPECT_EQ(c0.offset, 0u); - ASSERT_EQ(c0.pending_bytes.value(), 16u); + EXPECT_EQ(c0.session_id(), 5u); + EXPECT_EQ(c0.offset(), 0u); + EXPECT_EQ(c0.window_end_offset(), 16u); + EXPECT_EQ(c0.type(), Chunk::Type::kStart); } TEST_F(ReadTransferMaxBytes32, MultiParameters) { @@ -241,13 +258,16 @@ TEST_F(ReadTransferMaxBytes32, MultiParameters) { EXPECT_EQ(transfer_status, Status::Unknown()); Chunk c0 = DecodeChunk(payloads[0]); - EXPECT_EQ(c0.transfer_id, 6u); - EXPECT_EQ(c0.offset, 0u); - ASSERT_EQ(c0.pending_bytes.value(), 32u); + EXPECT_EQ(c0.session_id(), 6u); + EXPECT_EQ(c0.offset(), 0u); + ASSERT_EQ(c0.window_end_offset(), 32u); constexpr ConstByteSpan data(kData64); context_.server().SendServerStream<Transfer::Read>( - EncodeChunk({.transfer_id = 6u, .offset = 0, .data = data.first(32)})); + EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData) + .set_session_id(6) + .set_offset(0) + .set_payload(data.first(32)))); transfer_thread_.WaitUntilEventIsProcessed(); ASSERT_EQ(payloads.size(), 2u); @@ -255,23 +275,24 @@ TEST_F(ReadTransferMaxBytes32, MultiParameters) { // Second parameters chunk. Chunk c1 = DecodeChunk(payloads[1]); - EXPECT_EQ(c1.transfer_id, 6u); - EXPECT_EQ(c1.offset, 32u); - ASSERT_EQ(c1.pending_bytes.value(), 32u); + EXPECT_EQ(c1.session_id(), 6u); + EXPECT_EQ(c1.offset(), 32u); + ASSERT_EQ(c1.window_end_offset(), 64u); context_.server().SendServerStream<Transfer::Read>( - EncodeChunk({.transfer_id = 6u, - .offset = 32, - .data = data.subspan(32), - .remaining_bytes = 0})); + EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData) + .set_session_id(6) + .set_offset(32) + .set_payload(data.subspan(32)) + .set_remaining_bytes(0))); transfer_thread_.WaitUntilEventIsProcessed(); ASSERT_EQ(payloads.size(), 3u); Chunk c2 = DecodeChunk(payloads[2]); - EXPECT_EQ(c2.transfer_id, 6u); - ASSERT_TRUE(c2.status.has_value()); - EXPECT_EQ(c2.status.value(), OkStatus()); + EXPECT_EQ(c2.session_id(), 6u); + ASSERT_TRUE(c2.status().has_value()); + EXPECT_EQ(c2.status().value(), OkStatus()); EXPECT_EQ(transfer_status, OkStatus()); EXPECT_EQ(std::memcmp(writer.data(), data.data(), writer.bytes_written()), 0); @@ -294,13 +315,16 @@ TEST_F(ReadTransfer, UnexpectedOffset) { EXPECT_EQ(transfer_status, Status::Unknown()); Chunk c0 = DecodeChunk(payloads[0]); - EXPECT_EQ(c0.transfer_id, 7u); - EXPECT_EQ(c0.offset, 0u); - EXPECT_EQ(c0.pending_bytes.value(), 64u); + EXPECT_EQ(c0.session_id(), 7u); + EXPECT_EQ(c0.offset(), 0u); + EXPECT_EQ(c0.window_end_offset(), 64u); constexpr ConstByteSpan data(kData32); context_.server().SendServerStream<Transfer::Read>( - EncodeChunk({.transfer_id = 7u, .offset = 0, .data = data.first(16)})); + EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData) + .set_session_id(7) + .set_offset(0) + .set_payload(data.first(16)))); transfer_thread_.WaitUntilEventIsProcessed(); ASSERT_EQ(payloads.size(), 1u); @@ -308,34 +332,36 @@ TEST_F(ReadTransfer, UnexpectedOffset) { // Send a chunk with an incorrect offset. The client should resend parameters. context_.server().SendServerStream<Transfer::Read>( - EncodeChunk({.transfer_id = 7u, - .offset = 8, // wrong! - .data = data.subspan(16), - .remaining_bytes = 0})); + EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData) + .set_session_id(7) + .set_offset(8) // wrong! + .set_payload(data.subspan(16)) + .set_remaining_bytes(0))); transfer_thread_.WaitUntilEventIsProcessed(); ASSERT_EQ(payloads.size(), 2u); EXPECT_EQ(transfer_status, Status::Unknown()); Chunk c1 = DecodeChunk(payloads[1]); - EXPECT_EQ(c1.transfer_id, 7u); - EXPECT_EQ(c1.offset, 16u); - EXPECT_EQ(c1.pending_bytes.value(), 48u); + EXPECT_EQ(c1.session_id(), 7u); + EXPECT_EQ(c1.offset(), 16u); + EXPECT_EQ(c1.window_end_offset(), 64u); // Send the correct chunk, completing the transfer. context_.server().SendServerStream<Transfer::Read>( - EncodeChunk({.transfer_id = 7u, - .offset = 16, - .data = data.subspan(16), - .remaining_bytes = 0})); + EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData) + .set_session_id(7) + .set_offset(16) + .set_payload(data.subspan(16)) + .set_remaining_bytes(0))); transfer_thread_.WaitUntilEventIsProcessed(); ASSERT_EQ(payloads.size(), 3u); Chunk c2 = DecodeChunk(payloads[2]); - EXPECT_EQ(c2.transfer_id, 7u); - ASSERT_TRUE(c2.status.has_value()); - EXPECT_EQ(c2.status.value(), OkStatus()); + EXPECT_EQ(c2.session_id(), 7u); + ASSERT_TRUE(c2.status().has_value()); + EXPECT_EQ(c2.status().value(), OkStatus()); EXPECT_EQ(transfer_status, OkStatus()); EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()), @@ -359,31 +385,40 @@ TEST_F(ReadTransferMaxBytes32, TooMuchData) { EXPECT_EQ(transfer_status, Status::Unknown()); Chunk c0 = DecodeChunk(payloads[0]); - EXPECT_EQ(c0.transfer_id, 8u); - EXPECT_EQ(c0.offset, 0u); - ASSERT_EQ(c0.pending_bytes.value(), 32u); + EXPECT_EQ(c0.session_id(), 8u); + EXPECT_EQ(c0.offset(), 0u); + ASSERT_EQ(c0.window_end_offset(), 32u); constexpr ConstByteSpan data(kData64); // pending_bytes == 32 context_.server().SendServerStream<Transfer::Read>( - EncodeChunk({.transfer_id = 8u, .offset = 0, .data = data.first(16)})); + EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData) + .set_session_id(8) + .set_offset(0) + .set_payload(data.first(16)))); // pending_bytes == 16 - context_.server().SendServerStream<Transfer::Read>(EncodeChunk( - {.transfer_id = 8u, .offset = 16, .data = data.subspan(16, 8)})); + context_.server().SendServerStream<Transfer::Read>( + EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData) + .set_session_id(8) + .set_offset(16) + .set_payload(data.subspan(16, 8)))); // pending_bytes == 8, send 16 instead. - context_.server().SendServerStream<Transfer::Read>(EncodeChunk( - {.transfer_id = 8u, .offset = 24, .data = data.subspan(24, 16)})); + context_.server().SendServerStream<Transfer::Read>( + EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData) + .set_session_id(8) + .set_offset(24) + .set_payload(data.subspan(24, 16)))); transfer_thread_.WaitUntilEventIsProcessed(); ASSERT_EQ(payloads.size(), 4u); Chunk c1 = DecodeChunk(payloads[3]); - EXPECT_EQ(c1.transfer_id, 8u); - ASSERT_TRUE(c1.status.has_value()); - EXPECT_EQ(c1.status.value(), Status::Internal()); + EXPECT_EQ(c1.session_id(), 8u); + ASSERT_TRUE(c1.status().has_value()); + EXPECT_EQ(c1.status().value(), Status::Internal()); EXPECT_EQ(transfer_status, Status::Internal()); } @@ -405,14 +440,14 @@ TEST_F(ReadTransfer, ServerError) { EXPECT_EQ(transfer_status, Status::Unknown()); Chunk c0 = DecodeChunk(payloads[0]); - EXPECT_EQ(c0.transfer_id, 9u); - EXPECT_EQ(c0.offset, 0u); - ASSERT_EQ(c0.pending_bytes.value(), 64u); + EXPECT_EQ(c0.session_id(), 9u); + EXPECT_EQ(c0.offset(), 0u); + ASSERT_EQ(c0.window_end_offset(), 64u); // Server sends an error. Client should not respond and terminate the // transfer. - context_.server().SendServerStream<Transfer::Read>( - EncodeChunk({.transfer_id = 9u, .status = Status::NotFound()})); + context_.server().SendServerStream<Transfer::Read>(EncodeChunk( + Chunk::Final(ProtocolVersion::kLegacy, 9, Status::NotFound()))); transfer_thread_.WaitUntilEventIsProcessed(); ASSERT_EQ(payloads.size(), 1u); @@ -436,22 +471,26 @@ TEST_F(ReadTransfer, OnlySendsParametersOnceAfterDrop) { EXPECT_EQ(transfer_status, Status::Unknown()); Chunk c0 = DecodeChunk(payloads[0]); - EXPECT_EQ(c0.transfer_id, 10u); - EXPECT_EQ(c0.offset, 0u); - ASSERT_EQ(c0.pending_bytes.value(), 64u); + EXPECT_EQ(c0.session_id(), 10u); + EXPECT_EQ(c0.offset(), 0u); + ASSERT_EQ(c0.window_end_offset(), 64u); - constexpr ConstByteSpan data(kData64); + constexpr ConstByteSpan data(kData32); // Send the first 8 bytes of the transfer. context_.server().SendServerStream<Transfer::Read>( - EncodeChunk({.transfer_id = 10u, .offset = 0, .data = data.first(8)})); + EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData) + .set_session_id(10) + .set_offset(0) + .set_payload(data.first(8)))); // Skip offset 8, send the rest starting from 16. for (uint32_t offset = 16; offset < data.size(); offset += 8) { context_.server().SendServerStream<Transfer::Read>( - EncodeChunk({.transfer_id = 10u, - .offset = offset, - .data = data.subspan(offset, 8)})); + EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData) + .set_session_id(10) + .set_offset(offset) + .set_payload(data.subspan(offset, 8)))); } transfer_thread_.WaitUntilEventIsProcessed(); @@ -460,24 +499,25 @@ TEST_F(ReadTransfer, OnlySendsParametersOnceAfterDrop) { ASSERT_EQ(payloads.size(), 2u); Chunk c1 = DecodeChunk(payloads[1]); - EXPECT_EQ(c1.transfer_id, 10u); - EXPECT_EQ(c1.offset, 8u); - ASSERT_EQ(c1.pending_bytes.value(), 56u); + EXPECT_EQ(c1.session_id(), 10u); + EXPECT_EQ(c1.offset(), 8u); + ASSERT_EQ(c1.window_end_offset(), 64u); // Send the remaining data to complete the transfer. context_.server().SendServerStream<Transfer::Read>( - EncodeChunk({.transfer_id = 10u, - .offset = 8, - .data = data.subspan(8, 56), - .remaining_bytes = 0})); + EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData) + .set_session_id(10) + .set_offset(8) + .set_payload(data.subspan(8)) + .set_remaining_bytes(0))); transfer_thread_.WaitUntilEventIsProcessed(); ASSERT_EQ(payloads.size(), 3u); Chunk c2 = DecodeChunk(payloads[2]); - EXPECT_EQ(c2.transfer_id, 10u); - ASSERT_TRUE(c2.status.has_value()); - EXPECT_EQ(c2.status.value(), OkStatus()); + EXPECT_EQ(c2.session_id(), 10u); + ASSERT_TRUE(c2.status().has_value()); + EXPECT_EQ(c2.status().value(), OkStatus()); EXPECT_EQ(transfer_status, OkStatus()); } @@ -499,22 +539,26 @@ TEST_F(ReadTransfer, ResendsParametersIfSentRepeatedChunkDuringRecovery) { EXPECT_EQ(transfer_status, Status::Unknown()); Chunk c0 = DecodeChunk(payloads[0]); - EXPECT_EQ(c0.transfer_id, 11u); - EXPECT_EQ(c0.offset, 0u); - ASSERT_EQ(c0.pending_bytes.value(), 64u); + EXPECT_EQ(c0.session_id(), 11u); + EXPECT_EQ(c0.offset(), 0u); + ASSERT_EQ(c0.window_end_offset(), 64u); - constexpr ConstByteSpan data(kData64); + constexpr ConstByteSpan data(kData32); // Send the first 8 bytes of the transfer. context_.server().SendServerStream<Transfer::Read>( - EncodeChunk({.transfer_id = 11u, .offset = 0, .data = data.first(8)})); + EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData) + .set_session_id(11) + .set_offset(0) + .set_payload(data.first(8)))); // Skip offset 8, send the rest starting from 16. for (uint32_t offset = 16; offset < data.size(); offset += 8) { context_.server().SendServerStream<Transfer::Read>( - EncodeChunk({.transfer_id = 11u, - .offset = offset, - .data = data.subspan(offset, 8)})); + EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData) + .set_session_id(11) + .set_offset(offset) + .set_payload(data.subspan(offset, 8)))); } transfer_thread_.WaitUntilEventIsProcessed(); @@ -522,8 +566,10 @@ TEST_F(ReadTransfer, ResendsParametersIfSentRepeatedChunkDuringRecovery) { // dropped packet. ASSERT_EQ(payloads.size(), 2u); - const Chunk last_chunk = { - .transfer_id = 11u, .offset = 56, .data = data.subspan(56)}; + const Chunk last_chunk = Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData) + .set_session_id(11) + .set_offset(24) + .set_payload(data.subspan(24)); // Re-send the final chunk of the block. context_.server().SendServerStream<Transfer::Read>(EncodeChunk(last_chunk)); @@ -532,9 +578,9 @@ TEST_F(ReadTransfer, ResendsParametersIfSentRepeatedChunkDuringRecovery) { // The original drop parameters should be re-sent. ASSERT_EQ(payloads.size(), 3u); Chunk c2 = DecodeChunk(payloads[2]); - EXPECT_EQ(c2.transfer_id, 11u); - EXPECT_EQ(c2.offset, 8u); - ASSERT_EQ(c2.pending_bytes.value(), 56u); + EXPECT_EQ(c2.session_id(), 11u); + EXPECT_EQ(c2.offset(), 8u); + ASSERT_EQ(c2.window_end_offset(), 64u); // Do it again. context_.server().SendServerStream<Transfer::Read>(EncodeChunk(last_chunk)); @@ -542,24 +588,25 @@ TEST_F(ReadTransfer, ResendsParametersIfSentRepeatedChunkDuringRecovery) { ASSERT_EQ(payloads.size(), 4u); Chunk c3 = DecodeChunk(payloads[3]); - EXPECT_EQ(c3.transfer_id, 11u); - EXPECT_EQ(c3.offset, 8u); - ASSERT_EQ(c3.pending_bytes.value(), 56u); + EXPECT_EQ(c3.session_id(), 11u); + EXPECT_EQ(c3.offset(), 8u); + ASSERT_EQ(c3.window_end_offset(), 64u); // Finish the transfer normally. context_.server().SendServerStream<Transfer::Read>( - EncodeChunk({.transfer_id = 11u, - .offset = 8, - .data = data.subspan(8, 56), - .remaining_bytes = 0})); + EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData) + .set_session_id(11) + .set_offset(8) + .set_payload(data.subspan(8)) + .set_remaining_bytes(0))); transfer_thread_.WaitUntilEventIsProcessed(); ASSERT_EQ(payloads.size(), 5u); Chunk c4 = DecodeChunk(payloads[4]); - EXPECT_EQ(c4.transfer_id, 11u); - ASSERT_TRUE(c4.status.has_value()); - EXPECT_EQ(c4.status.value(), OkStatus()); + EXPECT_EQ(c4.session_id(), 11u); + ASSERT_TRUE(c4.status().has_value()); + EXPECT_EQ(c4.status().value(), OkStatus()); EXPECT_EQ(transfer_status, OkStatus()); } @@ -587,37 +634,40 @@ TEST_F(ReadTransfer, Timeout_ResendsCurrentParameters) { EXPECT_EQ(transfer_status, Status::Unknown()); Chunk c0 = DecodeChunk(payloads.back()); - EXPECT_EQ(c0.transfer_id, 12u); - EXPECT_EQ(c0.offset, 0u); - EXPECT_EQ(c0.pending_bytes.value(), 64u); + EXPECT_EQ(c0.session_id(), 12u); + EXPECT_EQ(c0.offset(), 0u); + EXPECT_EQ(c0.window_end_offset(), 64u); + EXPECT_EQ(c0.type(), Chunk::Type::kStart); // Wait for the timeout to expire without doing anything. The client should - // resend its parameters chunk. + // resend its initial parameters chunk. transfer_thread_.SimulateClientTimeout(12); ASSERT_EQ(payloads.size(), 2u); Chunk c = DecodeChunk(payloads.back()); - EXPECT_EQ(c.transfer_id, 12u); - EXPECT_EQ(c.offset, 0u); - EXPECT_EQ(c.pending_bytes.value(), 64u); + EXPECT_EQ(c.session_id(), 12u); + EXPECT_EQ(c.offset(), 0u); + EXPECT_EQ(c.window_end_offset(), 64u); + EXPECT_EQ(c0.type(), Chunk::Type::kStart); // Transfer has not yet completed. EXPECT_EQ(transfer_status, Status::Unknown()); // Finish the transfer following the timeout. context_.server().SendServerStream<Transfer::Read>( - EncodeChunk({.transfer_id = 12u, - .offset = 0, - .data = kData32, - .remaining_bytes = 0})); + EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData) + .set_session_id(12) + .set_offset(0) + .set_payload(kData32) + .set_remaining_bytes(0))); transfer_thread_.WaitUntilEventIsProcessed(); ASSERT_EQ(payloads.size(), 3u); Chunk c4 = DecodeChunk(payloads.back()); - EXPECT_EQ(c4.transfer_id, 12u); - ASSERT_TRUE(c4.status.has_value()); - EXPECT_EQ(c4.status.value(), OkStatus()); + EXPECT_EQ(c4.session_id(), 12u); + ASSERT_TRUE(c4.status().has_value()); + EXPECT_EQ(c4.status().value(), OkStatus()); EXPECT_EQ(transfer_status, OkStatus()); } @@ -641,15 +691,19 @@ TEST_F(ReadTransfer, Timeout_ResendsUpdatedParameters) { EXPECT_EQ(transfer_status, Status::Unknown()); Chunk c0 = DecodeChunk(payloads.back()); - EXPECT_EQ(c0.transfer_id, 13u); - EXPECT_EQ(c0.offset, 0u); - EXPECT_EQ(c0.pending_bytes.value(), 64u); + EXPECT_EQ(c0.session_id(), 13u); + EXPECT_EQ(c0.offset(), 0u); + EXPECT_EQ(c0.window_end_offset(), 64u); + EXPECT_EQ(c0.type(), Chunk::Type::kStart); constexpr ConstByteSpan data(kData32); // Send some data, but not everything. context_.server().SendServerStream<Transfer::Read>( - EncodeChunk({.transfer_id = 13u, .offset = 0, .data = data.first(16)})); + EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData) + .set_session_id(13) + .set_offset(0) + .set_payload(data.first(16)))); transfer_thread_.WaitUntilEventIsProcessed(); ASSERT_EQ(payloads.size(), 1u); @@ -660,27 +714,29 @@ TEST_F(ReadTransfer, Timeout_ResendsUpdatedParameters) { ASSERT_EQ(payloads.size(), 2u); Chunk c = DecodeChunk(payloads.back()); - EXPECT_EQ(c.transfer_id, 13u); - EXPECT_EQ(c.offset, 16u); - EXPECT_EQ(c.pending_bytes.value(), 48u); + EXPECT_EQ(c.session_id(), 13u); + EXPECT_EQ(c.offset(), 16u); + EXPECT_EQ(c.window_end_offset(), 64u); + EXPECT_EQ(c.type(), Chunk::Type::kParametersRetransmit); // Transfer has not yet completed. EXPECT_EQ(transfer_status, Status::Unknown()); // Send the rest of the data, finishing the transfer. context_.server().SendServerStream<Transfer::Read>( - EncodeChunk({.transfer_id = 13u, - .offset = 16, - .data = data.subspan(16), - .remaining_bytes = 0})); + EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData) + .set_session_id(13) + .set_offset(16) + .set_payload(data.subspan(16)) + .set_remaining_bytes(0))); transfer_thread_.WaitUntilEventIsProcessed(); ASSERT_EQ(payloads.size(), 3u); Chunk c4 = DecodeChunk(payloads.back()); - EXPECT_EQ(c4.transfer_id, 13u); - ASSERT_TRUE(c4.status.has_value()); - EXPECT_EQ(c4.status.value(), OkStatus()); + EXPECT_EQ(c4.session_id(), 13u); + ASSERT_TRUE(c4.status().has_value()); + EXPECT_EQ(c4.status().value(), OkStatus()); EXPECT_EQ(transfer_status, OkStatus()); } @@ -704,9 +760,10 @@ TEST_F(ReadTransfer, Timeout_EndsTransferAfterMaxRetries) { EXPECT_EQ(transfer_status, Status::Unknown()); Chunk c0 = DecodeChunk(payloads.back()); - EXPECT_EQ(c0.transfer_id, 14u); - EXPECT_EQ(c0.offset, 0u); - EXPECT_EQ(c0.pending_bytes.value(), 64u); + EXPECT_EQ(c0.session_id(), 14u); + EXPECT_EQ(c0.offset(), 0u); + EXPECT_EQ(c0.window_end_offset(), 64u); + EXPECT_EQ(c0.type(), Chunk::Type::kStart); for (unsigned retry = 1; retry <= kTestRetries; ++retry) { // Wait for the timeout to expire without doing anything. The client should @@ -715,30 +772,26 @@ TEST_F(ReadTransfer, Timeout_EndsTransferAfterMaxRetries) { ASSERT_EQ(payloads.size(), retry + 1); Chunk c = DecodeChunk(payloads.back()); - EXPECT_EQ(c.transfer_id, 14u); - EXPECT_EQ(c.offset, 0u); - EXPECT_EQ(c.pending_bytes.value(), 64u); + EXPECT_EQ(c.session_id(), 14u); + EXPECT_EQ(c.offset(), 0u); + EXPECT_EQ(c.window_end_offset(), 64u); // Transfer has not yet completed. EXPECT_EQ(transfer_status, Status::Unknown()); } // Sleep one more time after the final retry. The client should cancel the - // transfer at this point and send a DEADLINE_EXCEEDED chunk. + // transfer at this point. As no packets were received from the server, no + // final status chunk should be sent. transfer_thread_.SimulateClientTimeout(14); - ASSERT_EQ(payloads.size(), 5u); - - Chunk c4 = DecodeChunk(payloads.back()); - EXPECT_EQ(c4.transfer_id, 14u); - ASSERT_TRUE(c4.status.has_value()); - EXPECT_EQ(c4.status.value(), Status::DeadlineExceeded()); + ASSERT_EQ(payloads.size(), 4u); EXPECT_EQ(transfer_status, Status::DeadlineExceeded()); // After finishing the transfer, nothing else should be sent. Verify this by // waiting for a bit. this_thread::sleep_for(kTestTimeout * 4); - ASSERT_EQ(payloads.size(), 5u); + ASSERT_EQ(payloads.size(), 4u); } TEST_F(ReadTransfer, Timeout_ReceivingDataResetsRetryCount) { @@ -762,9 +815,9 @@ TEST_F(ReadTransfer, Timeout_ReceivingDataResetsRetryCount) { EXPECT_EQ(transfer_status, Status::Unknown()); Chunk c0 = DecodeChunk(payloads.back()); - EXPECT_EQ(c0.transfer_id, 14u); - EXPECT_EQ(c0.offset, 0u); - EXPECT_EQ(c0.window_end_offset, 64u); + EXPECT_EQ(c0.session_id(), 14u); + EXPECT_EQ(c0.offset(), 0u); + EXPECT_EQ(c0.window_end_offset(), 64u); // Simulate one less timeout than the maximum amount of retries. for (unsigned retry = 1; retry <= kTestRetries - 1; ++retry) { @@ -772,9 +825,9 @@ TEST_F(ReadTransfer, Timeout_ReceivingDataResetsRetryCount) { ASSERT_EQ(payloads.size(), retry + 1); Chunk c = DecodeChunk(payloads.back()); - EXPECT_EQ(c.transfer_id, 14u); - EXPECT_EQ(c.offset, 0u); - EXPECT_EQ(c.window_end_offset, 64u); + EXPECT_EQ(c.session_id(), 14u); + EXPECT_EQ(c.offset(), 0u); + EXPECT_EQ(c.window_end_offset(), 64u); // Transfer has not yet completed. EXPECT_EQ(transfer_status, Status::Unknown()); @@ -782,7 +835,10 @@ TEST_F(ReadTransfer, Timeout_ReceivingDataResetsRetryCount) { // Send some data. context_.server().SendServerStream<Transfer::Read>( - EncodeChunk({.transfer_id = 14u, .offset = 0, .data = data.first(16)})); + EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData) + .set_session_id(14) + .set_offset(0) + .set_payload(data.first(16)))); transfer_thread_.WaitUntilEventIsProcessed(); ASSERT_EQ(payloads.size(), 3u); @@ -793,19 +849,23 @@ TEST_F(ReadTransfer, Timeout_ReceivingDataResetsRetryCount) { ASSERT_EQ(payloads.size(), 4u); Chunk c = DecodeChunk(payloads.back()); - EXPECT_FALSE(c.status.has_value()); - EXPECT_EQ(c.transfer_id, 14u); - EXPECT_EQ(c.offset, 16u); - EXPECT_EQ(c.window_end_offset, 64u); + EXPECT_FALSE(c.status().has_value()); + EXPECT_EQ(c.session_id(), 14u); + EXPECT_EQ(c.offset(), 16u); + EXPECT_EQ(c.window_end_offset(), 64u); transfer_thread_.SimulateClientTimeout(14); ASSERT_EQ(payloads.size(), 5u); c = DecodeChunk(payloads.back()); - EXPECT_FALSE(c.status.has_value()); - EXPECT_EQ(c.transfer_id, 14u); - EXPECT_EQ(c.offset, 16u); - EXPECT_EQ(c.window_end_offset, 64u); + EXPECT_FALSE(c.status().has_value()); + EXPECT_EQ(c.session_id(), 14u); + EXPECT_EQ(c.offset(), 16u); + EXPECT_EQ(c.window_end_offset(), 64u); + + // Ensure we don't leave a dangling reference to transfer_status. + client_.CancelTransfer(14); + transfer_thread_.WaitUntilEventIsProcessed(); } TEST_F(ReadTransfer, InitialPacketFails_OnCompletedCalledWithDataLoss) { @@ -836,7 +896,7 @@ class WriteTransfer : public ::testing::Test { client_(context_.client(), context_.channel().id(), transfer_thread_), system_thread_(TransferThreadOptions(), transfer_thread_) {} - ~WriteTransfer() { + ~WriteTransfer() override { transfer_thread_.Terminate(); system_thread_.join(); } @@ -862,42 +922,47 @@ TEST_F(WriteTransfer, SingleChunk) { })); transfer_thread_.WaitUntilEventIsProcessed(); - // The client begins by just sending the transfer ID. + // The client begins by sending the ID of the resource to transfer. rpc::PayloadsView payloads = context_.output().payloads<Transfer::Write>(context_.channel().id()); ASSERT_EQ(payloads.size(), 1u); EXPECT_EQ(transfer_status, Status::Unknown()); Chunk c0 = DecodeChunk(payloads[0]); - EXPECT_EQ(c0.transfer_id, 3u); + EXPECT_EQ(c0.session_id(), 3u); + EXPECT_EQ(c0.resource_id(), 3u); + EXPECT_EQ(c0.type(), Chunk::Type::kStart); // Send transfer parameters. Client should send a data chunk and the final // chunk. rpc::test::WaitForPackets(context_.output(), 2, [this] { - context_.server().SendServerStream<Transfer::Write>( - EncodeChunk({.transfer_id = 3, - .pending_bytes = 64, - .max_chunk_size_bytes = 32, - .offset = 0})); + context_.server().SendServerStream<Transfer::Write>(EncodeChunk( + Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit) + .set_session_id(3) + .set_offset(0) + .set_window_end_offset(64) + .set_max_chunk_size_bytes(32))); }); ASSERT_EQ(payloads.size(), 3u); Chunk c1 = DecodeChunk(payloads[1]); - EXPECT_EQ(c1.transfer_id, 3u); - EXPECT_EQ(c1.offset, 0u); - EXPECT_EQ(std::memcmp(c1.data.data(), kData32.data(), c1.data.size()), 0); + EXPECT_EQ(c1.session_id(), 3u); + EXPECT_EQ(c1.offset(), 0u); + EXPECT_TRUE(c1.has_payload()); + EXPECT_EQ( + std::memcmp(c1.payload().data(), kData32.data(), c1.payload().size()), 0); Chunk c2 = DecodeChunk(payloads[2]); - EXPECT_EQ(c2.transfer_id, 3u); - ASSERT_TRUE(c2.remaining_bytes.has_value()); - EXPECT_EQ(c2.remaining_bytes.value(), 0u); + EXPECT_EQ(c2.session_id(), 3u); + ASSERT_TRUE(c2.remaining_bytes().has_value()); + EXPECT_EQ(c2.remaining_bytes().value(), 0u); EXPECT_EQ(transfer_status, Status::Unknown()); // Send the final status chunk to complete the transfer. context_.server().SendServerStream<Transfer::Write>( - EncodeChunk({.transfer_id = 3, .status = OkStatus()})); + EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus()))); transfer_thread_.WaitUntilEventIsProcessed(); EXPECT_EQ(payloads.size(), 3u); @@ -914,50 +979,57 @@ TEST_F(WriteTransfer, MultiChunk) { })); transfer_thread_.WaitUntilEventIsProcessed(); - // The client begins by just sending the transfer ID. + // The client begins by sending the ID of the resource to transfer. rpc::PayloadsView payloads = context_.output().payloads<Transfer::Write>(context_.channel().id()); ASSERT_EQ(payloads.size(), 1u); EXPECT_EQ(transfer_status, Status::Unknown()); Chunk c0 = DecodeChunk(payloads[0]); - EXPECT_EQ(c0.transfer_id, 4u); + EXPECT_EQ(c0.session_id(), 4u); + EXPECT_EQ(c0.resource_id(), 4u); + EXPECT_EQ(c0.type(), Chunk::Type::kStart); // Send transfer parameters with a chunk size smaller than the data. // Client should send two data chunks and the final chunk. rpc::test::WaitForPackets(context_.output(), 3, [this] { - context_.server().SendServerStream<Transfer::Write>( - EncodeChunk({.transfer_id = 4, - .pending_bytes = 64, - .max_chunk_size_bytes = 16, - .offset = 0})); + context_.server().SendServerStream<Transfer::Write>(EncodeChunk( + Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit) + .set_session_id(4) + .set_offset(0) + .set_window_end_offset(64) + .set_max_chunk_size_bytes(16))); }); ASSERT_EQ(payloads.size(), 4u); Chunk c1 = DecodeChunk(payloads[1]); - EXPECT_EQ(c1.transfer_id, 4u); - EXPECT_EQ(c1.offset, 0u); - EXPECT_EQ(std::memcmp(c1.data.data(), kData32.data(), c1.data.size()), 0); + EXPECT_EQ(c1.session_id(), 4u); + EXPECT_EQ(c1.offset(), 0u); + EXPECT_TRUE(c1.has_payload()); + EXPECT_EQ( + std::memcmp(c1.payload().data(), kData32.data(), c1.payload().size()), 0); Chunk c2 = DecodeChunk(payloads[2]); - EXPECT_EQ(c2.transfer_id, 4u); - EXPECT_EQ(c2.offset, 16u); - EXPECT_EQ( - std::memcmp(c2.data.data(), kData32.data() + c2.offset, c2.data.size()), - 0); + EXPECT_EQ(c2.session_id(), 4u); + EXPECT_EQ(c2.offset(), 16u); + EXPECT_TRUE(c2.has_payload()); + EXPECT_EQ(std::memcmp(c2.payload().data(), + kData32.data() + c2.offset(), + c2.payload().size()), + 0); Chunk c3 = DecodeChunk(payloads[3]); - EXPECT_EQ(c3.transfer_id, 4u); - ASSERT_TRUE(c3.remaining_bytes.has_value()); - EXPECT_EQ(c3.remaining_bytes.value(), 0u); + EXPECT_EQ(c3.session_id(), 4u); + ASSERT_TRUE(c3.remaining_bytes().has_value()); + EXPECT_EQ(c3.remaining_bytes().value(), 0u); EXPECT_EQ(transfer_status, Status::Unknown()); // Send the final status chunk to complete the transfer. context_.server().SendServerStream<Transfer::Write>( - EncodeChunk({.transfer_id = 4, .status = OkStatus()})); + EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 4, OkStatus()))); transfer_thread_.WaitUntilEventIsProcessed(); EXPECT_EQ(payloads.size(), 4u); @@ -974,44 +1046,49 @@ TEST_F(WriteTransfer, OutOfOrder_SeekSupported) { })); transfer_thread_.WaitUntilEventIsProcessed(); - // The client begins by just sending the transfer ID. + // The client begins by sending the ID of the resource to transfer. rpc::PayloadsView payloads = context_.output().payloads<Transfer::Write>(context_.channel().id()); ASSERT_EQ(payloads.size(), 1u); EXPECT_EQ(transfer_status, Status::Unknown()); Chunk c0 = DecodeChunk(payloads[0]); - EXPECT_EQ(c0.transfer_id, 5u); + EXPECT_EQ(c0.session_id(), 5u); + EXPECT_EQ(c0.resource_id(), 5u); + EXPECT_EQ(c0.type(), Chunk::Type::kStart); // Send transfer parameters with a nonzero offset, requesting a seek. // Client should send a data chunk and the final chunk. rpc::test::WaitForPackets(context_.output(), 2, [this] { - context_.server().SendServerStream<Transfer::Write>( - EncodeChunk({.transfer_id = 5, - .pending_bytes = 64, - .max_chunk_size_bytes = 32, - .offset = 16})); + context_.server().SendServerStream<Transfer::Write>(EncodeChunk( + Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit) + .set_session_id(5) + .set_offset(16) + .set_window_end_offset(64) + .set_max_chunk_size_bytes(32))); }); ASSERT_EQ(payloads.size(), 3u); Chunk c1 = DecodeChunk(payloads[1]); - EXPECT_EQ(c1.transfer_id, 5u); - EXPECT_EQ(c1.offset, 16u); - EXPECT_EQ( - std::memcmp(c1.data.data(), kData32.data() + c1.offset, c1.data.size()), - 0); + EXPECT_EQ(c1.session_id(), 5u); + EXPECT_EQ(c1.offset(), 16u); + EXPECT_TRUE(c1.has_payload()); + EXPECT_EQ(std::memcmp(c1.payload().data(), + kData32.data() + c1.offset(), + c1.payload().size()), + 0); Chunk c2 = DecodeChunk(payloads[2]); - EXPECT_EQ(c2.transfer_id, 5u); - ASSERT_TRUE(c2.remaining_bytes.has_value()); - EXPECT_EQ(c2.remaining_bytes.value(), 0u); + EXPECT_EQ(c2.session_id(), 5u); + ASSERT_TRUE(c2.remaining_bytes().has_value()); + EXPECT_EQ(c2.remaining_bytes().value(), 0u); EXPECT_EQ(transfer_status, Status::Unknown()); // Send the final status chunk to complete the transfer. context_.server().SendServerStream<Transfer::Write>( - EncodeChunk({.transfer_id = 5, .status = OkStatus()})); + EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 5, OkStatus()))); transfer_thread_.WaitUntilEventIsProcessed(); EXPECT_EQ(payloads.size(), 3u); @@ -1049,30 +1126,34 @@ TEST_F(WriteTransfer, OutOfOrder_SeekNotSupported) { })); transfer_thread_.WaitUntilEventIsProcessed(); - // The client begins by just sending the transfer ID. + // The client begins by sending the ID of the resource to transfer. rpc::PayloadsView payloads = context_.output().payloads<Transfer::Write>(context_.channel().id()); ASSERT_EQ(payloads.size(), 1u); EXPECT_EQ(transfer_status, Status::Unknown()); Chunk c0 = DecodeChunk(payloads[0]); - EXPECT_EQ(c0.transfer_id, 6u); + EXPECT_EQ(c0.session_id(), 6u); + EXPECT_EQ(c0.resource_id(), 6u); + EXPECT_EQ(c0.type(), Chunk::Type::kStart); // Send transfer parameters with a nonzero offset, requesting a seek. - context_.server().SendServerStream<Transfer::Write>( - EncodeChunk({.transfer_id = 6, - .pending_bytes = 64, - .max_chunk_size_bytes = 32, - .offset = 16})); + context_.server().SendServerStream<Transfer::Write>(EncodeChunk( + Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit) + .set_session_id(6) + .set_offset(16) + .set_window_end_offset(64) + .set_max_chunk_size_bytes(32))); transfer_thread_.WaitUntilEventIsProcessed(); // Client should send a status chunk and end the transfer. ASSERT_EQ(payloads.size(), 2u); Chunk c1 = DecodeChunk(payloads[1]); - EXPECT_EQ(c1.transfer_id, 6u); - ASSERT_TRUE(c1.status.has_value()); - EXPECT_EQ(c1.status.value(), Status::Unimplemented()); + EXPECT_EQ(c1.session_id(), 6u); + EXPECT_EQ(c1.type(), Chunk::Type::kCompletion); + ASSERT_TRUE(c1.status().has_value()); + EXPECT_EQ(c1.status().value(), Status::Unimplemented()); EXPECT_EQ(transfer_status, Status::Unimplemented()); } @@ -1087,18 +1168,20 @@ TEST_F(WriteTransfer, ServerError) { })); transfer_thread_.WaitUntilEventIsProcessed(); - // The client begins by just sending the transfer ID. + // The client begins by sending the ID of the resource to transfer. rpc::PayloadsView payloads = context_.output().payloads<Transfer::Write>(context_.channel().id()); ASSERT_EQ(payloads.size(), 1u); EXPECT_EQ(transfer_status, Status::Unknown()); Chunk c0 = DecodeChunk(payloads[0]); - EXPECT_EQ(c0.transfer_id, 7u); + EXPECT_EQ(c0.session_id(), 7u); + EXPECT_EQ(c0.resource_id(), 7u); + EXPECT_EQ(c0.type(), Chunk::Type::kStart); // Send an error from the server. - context_.server().SendServerStream<Transfer::Write>( - EncodeChunk({.transfer_id = 7, .status = Status::NotFound()})); + context_.server().SendServerStream<Transfer::Write>(EncodeChunk( + Chunk::Final(ProtocolVersion::kLegacy, 7, Status::NotFound()))); transfer_thread_.WaitUntilEventIsProcessed(); // Client should not respond and terminate the transfer. @@ -1106,41 +1189,6 @@ TEST_F(WriteTransfer, ServerError) { EXPECT_EQ(transfer_status, Status::NotFound()); } -TEST_F(WriteTransfer, MalformedParametersChunk) { - stream::MemoryReader reader(kData32); - Status transfer_status = Status::Unknown(); - - ASSERT_EQ(OkStatus(), - client_.Write(8, reader, [&transfer_status](Status status) { - transfer_status = status; - })); - transfer_thread_.WaitUntilEventIsProcessed(); - - // The client begins by just sending the transfer ID. - rpc::PayloadsView payloads = - context_.output().payloads<Transfer::Write>(context_.channel().id()); - ASSERT_EQ(payloads.size(), 1u); - EXPECT_EQ(transfer_status, Status::Unknown()); - - Chunk c0 = DecodeChunk(payloads[0]); - EXPECT_EQ(c0.transfer_id, 8u); - - // Send an invalid transfer parameters chunk without pending_bytes. - context_.server().SendServerStream<Transfer::Write>( - EncodeChunk({.transfer_id = 8, .max_chunk_size_bytes = 32})); - transfer_thread_.WaitUntilEventIsProcessed(); - - // Client should send a status chunk and end the transfer. - ASSERT_EQ(payloads.size(), 2u); - - Chunk c1 = DecodeChunk(payloads[1]); - EXPECT_EQ(c1.transfer_id, 8u); - ASSERT_TRUE(c1.status.has_value()); - EXPECT_EQ(c1.status.value(), Status::InvalidArgument()); - - EXPECT_EQ(transfer_status, Status::InvalidArgument()); -} - TEST_F(WriteTransfer, AbortIfZeroBytesAreRequested) { stream::MemoryReader reader(kData32); Status transfer_status = Status::Unknown(); @@ -1151,27 +1199,33 @@ TEST_F(WriteTransfer, AbortIfZeroBytesAreRequested) { })); transfer_thread_.WaitUntilEventIsProcessed(); - // The client begins by just sending the transfer ID. + // The client begins by sending the ID of the resource to transfer. rpc::PayloadsView payloads = context_.output().payloads<Transfer::Write>(context_.channel().id()); ASSERT_EQ(payloads.size(), 1u); EXPECT_EQ(transfer_status, Status::Unknown()); Chunk c0 = DecodeChunk(payloads[0]); - EXPECT_EQ(c0.transfer_id, 9u); + EXPECT_EQ(c0.session_id(), 9u); + EXPECT_EQ(c0.resource_id(), 9u); + EXPECT_EQ(c0.type(), Chunk::Type::kStart); - // Send an invalid transfer parameters chunk with 0 pending_bytes. + // Send an invalid transfer parameters chunk with 0 pending bytes. context_.server().SendServerStream<Transfer::Write>(EncodeChunk( - {.transfer_id = 9, .pending_bytes = 0, .max_chunk_size_bytes = 32})); + Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit) + .set_session_id(9) + .set_offset(0) + .set_window_end_offset(0) + .set_max_chunk_size_bytes(32))); transfer_thread_.WaitUntilEventIsProcessed(); // Client should send a status chunk and end the transfer. ASSERT_EQ(payloads.size(), 2u); Chunk c1 = DecodeChunk(payloads[1]); - EXPECT_EQ(c1.transfer_id, 9u); - ASSERT_TRUE(c1.status.has_value()); - EXPECT_EQ(c1.status.value(), Status::ResourceExhausted()); + EXPECT_EQ(c1.session_id(), 9u); + ASSERT_TRUE(c1.status().has_value()); + EXPECT_EQ(c1.status().value(), Status::ResourceExhausted()); EXPECT_EQ(transfer_status, Status::ResourceExhausted()); } @@ -1188,14 +1242,16 @@ TEST_F(WriteTransfer, Timeout_RetriesWithInitialChunk) { kTestTimeout)); transfer_thread_.WaitUntilEventIsProcessed(); - // The client begins by just sending the transfer ID. + // The client begins by sending the ID of the resource to transfer. rpc::PayloadsView payloads = context_.output().payloads<Transfer::Write>(context_.channel().id()); ASSERT_EQ(payloads.size(), 1u); EXPECT_EQ(transfer_status, Status::Unknown()); Chunk c0 = DecodeChunk(payloads.back()); - EXPECT_EQ(c0.transfer_id, 10u); + EXPECT_EQ(c0.session_id(), 10u); + EXPECT_EQ(c0.resource_id(), 10u); + EXPECT_EQ(c0.type(), Chunk::Type::kStart); // Wait for the timeout to expire without doing anything. The client should // resend the initial transmit chunk. @@ -1203,10 +1259,16 @@ TEST_F(WriteTransfer, Timeout_RetriesWithInitialChunk) { ASSERT_EQ(payloads.size(), 2u); Chunk c = DecodeChunk(payloads.back()); - EXPECT_EQ(c.transfer_id, 10u); + EXPECT_EQ(c.session_id(), 10u); + EXPECT_EQ(c.resource_id(), 10u); + EXPECT_EQ(c.type(), Chunk::Type::kStart); // Transfer has not yet completed. EXPECT_EQ(transfer_status, Status::Unknown()); + + // Ensure we don't leave a dangling reference to transfer_status. + client_.CancelTransfer(10); + transfer_thread_.WaitUntilEventIsProcessed(); } TEST_F(WriteTransfer, Timeout_RetriesWithMostRecentChunk) { @@ -1221,40 +1283,45 @@ TEST_F(WriteTransfer, Timeout_RetriesWithMostRecentChunk) { kTestTimeout)); transfer_thread_.WaitUntilEventIsProcessed(); - // The client begins by just sending the transfer ID. + // The client begins by sending the ID of the resource to transfer. rpc::PayloadsView payloads = context_.output().payloads<Transfer::Write>(context_.channel().id()); ASSERT_EQ(payloads.size(), 1u); EXPECT_EQ(transfer_status, Status::Unknown()); Chunk c0 = DecodeChunk(payloads.back()); - EXPECT_EQ(c0.transfer_id, 11u); + EXPECT_EQ(c0.session_id(), 11u); + EXPECT_EQ(c0.resource_id(), 11u); + EXPECT_EQ(c0.type(), Chunk::Type::kStart); // Send the first parameters chunk. rpc::test::WaitForPackets(context_.output(), 2, [this] { - context_.server().SendServerStream<Transfer::Write>( - EncodeChunk({.transfer_id = 11, - .pending_bytes = 16, - .max_chunk_size_bytes = 8, - .offset = 0})); + context_.server().SendServerStream<Transfer::Write>(EncodeChunk( + Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit) + .set_session_id(11) + .set_offset(0) + .set_window_end_offset(16) + .set_max_chunk_size_bytes(8))); }); ASSERT_EQ(payloads.size(), 3u); EXPECT_EQ(transfer_status, Status::Unknown()); Chunk c1 = DecodeChunk(payloads[1]); - EXPECT_EQ(c1.transfer_id, 11u); - EXPECT_EQ(c1.offset, 0u); - EXPECT_EQ(c1.data.size(), 8u); - EXPECT_EQ(std::memcmp(c1.data.data(), kData32.data(), c1.data.size()), 0); + EXPECT_EQ(c1.session_id(), 11u); + EXPECT_EQ(c1.offset(), 0u); + EXPECT_EQ(c1.payload().size(), 8u); + EXPECT_EQ( + std::memcmp(c1.payload().data(), kData32.data(), c1.payload().size()), 0); Chunk c2 = DecodeChunk(payloads[2]); - EXPECT_EQ(c2.transfer_id, 11u); - EXPECT_EQ(c2.offset, 8u); - EXPECT_EQ(c2.data.size(), 8u); - EXPECT_EQ( - std::memcmp(c2.data.data(), kData32.data() + c2.offset, c1.data.size()), - 0); + EXPECT_EQ(c2.session_id(), 11u); + EXPECT_EQ(c2.offset(), 8u); + EXPECT_EQ(c2.payload().size(), 8u); + EXPECT_EQ(std::memcmp(c2.payload().data(), + kData32.data() + c2.offset(), + c2.payload().size()), + 0); // Wait for the timeout to expire without doing anything. The client should // resend the most recently sent chunk. @@ -1262,13 +1329,19 @@ TEST_F(WriteTransfer, Timeout_RetriesWithMostRecentChunk) { ASSERT_EQ(payloads.size(), 4u); Chunk c3 = DecodeChunk(payloads[3]); - EXPECT_EQ(c3.transfer_id, c2.transfer_id); - EXPECT_EQ(c3.offset, c2.offset); - EXPECT_EQ(c3.data.size(), c2.data.size()); - EXPECT_EQ(std::memcmp(c3.data.data(), c2.data.data(), c3.data.size()), 0); + EXPECT_EQ(c3.session_id(), c2.session_id()); + EXPECT_EQ(c3.offset(), c2.offset()); + EXPECT_EQ(c3.payload().size(), c2.payload().size()); + EXPECT_EQ(std::memcmp( + c3.payload().data(), c2.payload().data(), c3.payload().size()), + 0); // Transfer has not yet completed. EXPECT_EQ(transfer_status, Status::Unknown()); + + // Ensure we don't leave a dangling reference to transfer_status. + client_.CancelTransfer(11); + transfer_thread_.WaitUntilEventIsProcessed(); } TEST_F(WriteTransfer, Timeout_RetriesWithSingleChunkTransfer) { @@ -1283,38 +1356,42 @@ TEST_F(WriteTransfer, Timeout_RetriesWithSingleChunkTransfer) { kTestTimeout)); transfer_thread_.WaitUntilEventIsProcessed(); - // The client begins by just sending the transfer ID. + // The client begins by sending the ID of the resource to transfer. rpc::PayloadsView payloads = context_.output().payloads<Transfer::Write>(context_.channel().id()); ASSERT_EQ(payloads.size(), 1u); EXPECT_EQ(transfer_status, Status::Unknown()); Chunk c0 = DecodeChunk(payloads.back()); - EXPECT_EQ(c0.transfer_id, 12u); + EXPECT_EQ(c0.session_id(), 12u); + EXPECT_EQ(c0.resource_id(), 12u); + EXPECT_EQ(c0.type(), Chunk::Type::kStart); // Send the first parameters chunk, requesting all the data. The client should // respond with one data chunk and a remaining_bytes = 0 chunk. rpc::test::WaitForPackets(context_.output(), 2, [this] { - context_.server().SendServerStream<Transfer::Write>( - EncodeChunk({.transfer_id = 12, - .pending_bytes = 64, - .max_chunk_size_bytes = 64, - .offset = 0})); + context_.server().SendServerStream<Transfer::Write>(EncodeChunk( + Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit) + .set_session_id(12) + .set_offset(0) + .set_window_end_offset(64) + .set_max_chunk_size_bytes(64))); }); ASSERT_EQ(payloads.size(), 3u); EXPECT_EQ(transfer_status, Status::Unknown()); Chunk c1 = DecodeChunk(payloads[1]); - EXPECT_EQ(c1.transfer_id, 12u); - EXPECT_EQ(c1.offset, 0u); - EXPECT_EQ(c1.data.size(), 32u); - EXPECT_EQ(std::memcmp(c1.data.data(), kData32.data(), c1.data.size()), 0); + EXPECT_EQ(c1.session_id(), 12u); + EXPECT_EQ(c1.offset(), 0u); + EXPECT_EQ(c1.payload().size(), 32u); + EXPECT_EQ( + std::memcmp(c1.payload().data(), kData32.data(), c1.payload().size()), 0); Chunk c2 = DecodeChunk(payloads[2]); - EXPECT_EQ(c2.transfer_id, 12u); - ASSERT_TRUE(c2.remaining_bytes.has_value()); - EXPECT_EQ(c2.remaining_bytes.value(), 0u); + EXPECT_EQ(c2.session_id(), 12u); + ASSERT_TRUE(c2.remaining_bytes().has_value()); + EXPECT_EQ(c2.remaining_bytes().value(), 0u); // Wait for the timeout to expire without doing anything. The client should // resend the data chunk. @@ -1322,28 +1399,30 @@ TEST_F(WriteTransfer, Timeout_RetriesWithSingleChunkTransfer) { ASSERT_EQ(payloads.size(), 4u); Chunk c3 = DecodeChunk(payloads[3]); - EXPECT_EQ(c3.transfer_id, c1.transfer_id); - EXPECT_EQ(c3.offset, c1.offset); - EXPECT_EQ(c3.data.size(), c1.data.size()); - EXPECT_EQ(std::memcmp(c3.data.data(), c1.data.data(), c3.data.size()), 0); + EXPECT_EQ(c3.session_id(), c1.session_id()); + EXPECT_EQ(c3.offset(), c1.offset()); + EXPECT_EQ(c3.payload().size(), c1.payload().size()); + EXPECT_EQ(std::memcmp( + c3.payload().data(), c1.payload().data(), c3.payload().size()), + 0); // The remaining_bytes = 0 chunk should be resent on the next parameters. - context_.server().SendServerStream<Transfer::Write>( - EncodeChunk({.transfer_id = 12, - .pending_bytes = 64, - .max_chunk_size_bytes = 64, - .offset = 32})); + context_.server().SendServerStream<Transfer::Write>(EncodeChunk( + Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit) + .set_session_id(12) + .set_offset(32) + .set_window_end_offset(64))); transfer_thread_.WaitUntilEventIsProcessed(); ASSERT_EQ(payloads.size(), 5u); Chunk c4 = DecodeChunk(payloads[4]); - EXPECT_EQ(c4.transfer_id, 12u); - ASSERT_TRUE(c4.remaining_bytes.has_value()); - EXPECT_EQ(c4.remaining_bytes.value(), 0u); + EXPECT_EQ(c4.session_id(), 12u); + ASSERT_TRUE(c4.remaining_bytes().has_value()); + EXPECT_EQ(c4.remaining_bytes().value(), 0u); context_.server().SendServerStream<Transfer::Write>( - EncodeChunk({.transfer_id = 12, .status = OkStatus()})); + EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 12, OkStatus()))); transfer_thread_.WaitUntilEventIsProcessed(); EXPECT_EQ(transfer_status, OkStatus()); @@ -1361,14 +1440,16 @@ TEST_F(WriteTransfer, Timeout_EndsTransferAfterMaxRetries) { kTestTimeout)); transfer_thread_.WaitUntilEventIsProcessed(); - // The client begins by just sending the transfer ID. + // The client begins by sending the ID of the resource to transfer. rpc::PayloadsView payloads = context_.output().payloads<Transfer::Write>(context_.channel().id()); ASSERT_EQ(payloads.size(), 1u); EXPECT_EQ(transfer_status, Status::Unknown()); Chunk c0 = DecodeChunk(payloads.back()); - EXPECT_EQ(c0.transfer_id, 13u); + EXPECT_EQ(c0.session_id(), 13u); + EXPECT_EQ(c0.resource_id(), 13u); + EXPECT_EQ(c0.type(), Chunk::Type::kStart); for (unsigned retry = 1; retry <= kTestRetries; ++retry) { // Wait for the timeout to expire without doing anything. The client should @@ -1377,28 +1458,30 @@ TEST_F(WriteTransfer, Timeout_EndsTransferAfterMaxRetries) { ASSERT_EQ(payloads.size(), retry + 1); Chunk c = DecodeChunk(payloads.back()); - EXPECT_EQ(c.transfer_id, 13u); + EXPECT_EQ(c.session_id(), 13u); + EXPECT_EQ(c.resource_id(), 13u); + EXPECT_EQ(c.type(), Chunk::Type::kStart); // Transfer has not yet completed. EXPECT_EQ(transfer_status, Status::Unknown()); } // Sleep one more time after the final retry. The client should cancel the - // transfer at this point and send a DEADLINE_EXCEEDED chunk. + // transfer at this point. As no packets were received from the server, no + // final status chunk should be sent. transfer_thread_.SimulateClientTimeout(13); - ASSERT_EQ(payloads.size(), 5u); - - Chunk c4 = DecodeChunk(payloads.back()); - EXPECT_EQ(c4.transfer_id, 13u); - ASSERT_TRUE(c4.status.has_value()); - EXPECT_EQ(c4.status.value(), Status::DeadlineExceeded()); + ASSERT_EQ(payloads.size(), 4u); EXPECT_EQ(transfer_status, Status::DeadlineExceeded()); // After finishing the transfer, nothing else should be sent. Verify this by // waiting for a bit. this_thread::sleep_for(kTestTimeout * 4); - ASSERT_EQ(payloads.size(), 5u); + ASSERT_EQ(payloads.size(), 4u); + + // Ensure we don't leave a dangling reference to transfer_status. + client_.CancelTransfer(13); + transfer_thread_.WaitUntilEventIsProcessed(); } TEST_F(WriteTransfer, Timeout_NonSeekableReaderEndsTransfer) { @@ -1413,40 +1496,47 @@ TEST_F(WriteTransfer, Timeout_NonSeekableReaderEndsTransfer) { kTestTimeout)); transfer_thread_.WaitUntilEventIsProcessed(); - // The client begins by just sending the transfer ID. + // The client begins by sending the ID of the resource to transfer. rpc::PayloadsView payloads = context_.output().payloads<Transfer::Write>(context_.channel().id()); ASSERT_EQ(payloads.size(), 1u); EXPECT_EQ(transfer_status, Status::Unknown()); Chunk c0 = DecodeChunk(payloads.back()); - EXPECT_EQ(c0.transfer_id, 14u); + EXPECT_EQ(c0.session_id(), 14u); + EXPECT_EQ(c0.resource_id(), 14u); + EXPECT_EQ(c0.type(), Chunk::Type::kStart); // Send the first parameters chunk. rpc::test::WaitForPackets(context_.output(), 2, [this] { - context_.server().SendServerStream<Transfer::Write>( - EncodeChunk({.transfer_id = 14, - .pending_bytes = 16, - .max_chunk_size_bytes = 8, - .offset = 0})); + context_.server().SendServerStream<Transfer::Write>(EncodeChunk( + Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit) + .set_session_id(14) + .set_offset(0) + .set_window_end_offset(16) + .set_max_chunk_size_bytes(8))); }); ASSERT_EQ(payloads.size(), 3u); EXPECT_EQ(transfer_status, Status::Unknown()); Chunk c1 = DecodeChunk(payloads[1]); - EXPECT_EQ(c1.transfer_id, 14u); - EXPECT_EQ(c1.offset, 0u); - EXPECT_EQ(c1.data.size(), 8u); - EXPECT_EQ(std::memcmp(c1.data.data(), kData32.data(), c1.data.size()), 0); + EXPECT_EQ(c1.session_id(), 14u); + EXPECT_EQ(c1.offset(), 0u); + EXPECT_TRUE(c1.has_payload()); + EXPECT_EQ(c1.payload().size(), 8u); + EXPECT_EQ( + std::memcmp(c1.payload().data(), kData32.data(), c1.payload().size()), 0); Chunk c2 = DecodeChunk(payloads[2]); - EXPECT_EQ(c2.transfer_id, 14u); - EXPECT_EQ(c2.offset, 8u); - EXPECT_EQ(c2.data.size(), 8u); - EXPECT_EQ( - std::memcmp(c2.data.data(), kData32.data() + c2.offset, c1.data.size()), - 0); + EXPECT_EQ(c2.session_id(), 14u); + EXPECT_EQ(c2.offset(), 8u); + EXPECT_TRUE(c2.has_payload()); + EXPECT_EQ(c2.payload().size(), 8u); + EXPECT_EQ(std::memcmp(c2.payload().data(), + kData32.data() + c2.offset(), + c2.payload().size()), + 0); // Wait for the timeout to expire without doing anything. The client should // fail to seek back and end the transfer. @@ -1454,14 +1544,1039 @@ TEST_F(WriteTransfer, Timeout_NonSeekableReaderEndsTransfer) { ASSERT_EQ(payloads.size(), 4u); Chunk c3 = DecodeChunk(payloads[3]); - EXPECT_EQ(c3.transfer_id, 14u); - ASSERT_TRUE(c3.status.has_value()); - EXPECT_EQ(c3.status.value(), Status::DeadlineExceeded()); + EXPECT_EQ(c3.protocol_version(), ProtocolVersion::kLegacy); + EXPECT_EQ(c3.session_id(), 14u); + ASSERT_TRUE(c3.status().has_value()); + EXPECT_EQ(c3.status().value(), Status::DeadlineExceeded()); EXPECT_EQ(transfer_status, Status::DeadlineExceeded()); + + // Ensure we don't leave a dangling reference to transfer_status. + client_.CancelTransfer(14); + transfer_thread_.WaitUntilEventIsProcessed(); } -PW_MODIFY_DIAGNOSTICS_POP(); +TEST_F(WriteTransfer, ManualCancel) { + stream::MemoryReader reader(kData32); + Status transfer_status = Status::Unknown(); + + ASSERT_EQ(OkStatus(), + client_.Write( + 15, + reader, + [&transfer_status](Status status) { transfer_status = status; }, + kTestTimeout)); + transfer_thread_.WaitUntilEventIsProcessed(); + + // The client begins by sending the ID of the resource to transfer. + rpc::PayloadsView payloads = + context_.output().payloads<Transfer::Write>(context_.channel().id()); + ASSERT_EQ(payloads.size(), 1u); + EXPECT_EQ(transfer_status, Status::Unknown()); + + Chunk chunk = DecodeChunk(payloads.back()); + EXPECT_EQ(chunk.session_id(), 15u); + EXPECT_EQ(chunk.resource_id(), 15u); + EXPECT_EQ(chunk.type(), Chunk::Type::kStart); + + // Get a response from the server, then cancel the transfer. + context_.server().SendServerStream<Transfer::Write>(EncodeChunk( + Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit) + .set_session_id(15) + .set_offset(0) + .set_window_end_offset(64) + .set_max_chunk_size_bytes(32))); + transfer_thread_.WaitUntilEventIsProcessed(); + ASSERT_EQ(payloads.size(), 2u); + + client_.CancelTransfer(15); + transfer_thread_.WaitUntilEventIsProcessed(); + + // Client should send a cancellation chunk to the server. + ASSERT_EQ(payloads.size(), 3u); + chunk = DecodeChunk(payloads.back()); + EXPECT_EQ(chunk.session_id(), 15u); + ASSERT_EQ(chunk.type(), Chunk::Type::kCompletion); + EXPECT_EQ(chunk.status().value(), Status::Cancelled()); + + EXPECT_EQ(transfer_status, Status::Cancelled()); +} + +TEST_F(WriteTransfer, ManualCancel_NoContact) { + stream::MemoryReader reader(kData32); + Status transfer_status = Status::Unknown(); + + ASSERT_EQ(OkStatus(), + client_.Write( + 15, + reader, + [&transfer_status](Status status) { transfer_status = status; }, + kTestTimeout)); + transfer_thread_.WaitUntilEventIsProcessed(); + + // The client begins by sending the ID of the resource to transfer. + rpc::PayloadsView payloads = + context_.output().payloads<Transfer::Write>(context_.channel().id()); + ASSERT_EQ(payloads.size(), 1u); + EXPECT_EQ(transfer_status, Status::Unknown()); + + Chunk chunk = DecodeChunk(payloads.back()); + EXPECT_EQ(chunk.session_id(), 15u); + EXPECT_EQ(chunk.resource_id(), 15u); + EXPECT_EQ(chunk.type(), Chunk::Type::kStart); + + // Cancel transfer without a server response. No final chunk should be sent. + client_.CancelTransfer(15); + transfer_thread_.WaitUntilEventIsProcessed(); + + ASSERT_EQ(payloads.size(), 1u); + + EXPECT_EQ(transfer_status, Status::Cancelled()); +} + +TEST_F(ReadTransfer, Version2_SingleChunk) { + stream::MemoryWriterBuffer<64> writer; + Status transfer_status = Status::Unknown(); + + ASSERT_EQ(OkStatus(), + client_.Read( + 3, + writer, + [&transfer_status](Status status) { transfer_status = status; }, + cfg::kDefaultChunkTimeout, + ProtocolVersion::kVersionTwo)); + + transfer_thread_.WaitUntilEventIsProcessed(); + + // Initial chunk of the transfer is sent. This chunk should contain all the + // fields from both legacy and version 2 protocols for backwards + // compatibility. + rpc::PayloadsView payloads = + context_.output().payloads<Transfer::Read>(context_.channel().id()); + ASSERT_EQ(payloads.size(), 1u); + EXPECT_EQ(transfer_status, Status::Unknown()); + + Chunk chunk = DecodeChunk(payloads[0]); + EXPECT_EQ(chunk.type(), Chunk::Type::kStart); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 3u); + EXPECT_EQ(chunk.resource_id(), 3u); + EXPECT_EQ(chunk.offset(), 0u); + EXPECT_EQ(chunk.window_end_offset(), 64u); + EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u); + + // The server responds with a START_ACK, continuing the version 2 handshake + // and assigning a session_id to the transfer. + context_.server().SendServerStream<Transfer::Read>( + EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck) + .set_session_id(29) + .set_resource_id(3))); + transfer_thread_.WaitUntilEventIsProcessed(); + + ASSERT_EQ(payloads.size(), 2u); + + // Client should accept the session_id with a START_ACK_CONFIRMATION, + // additionally containing the initial parameters for the read transfer. + chunk = DecodeChunk(payloads.back()); + EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 29u); + EXPECT_FALSE(chunk.resource_id().has_value()); + EXPECT_EQ(chunk.offset(), 0u); + EXPECT_EQ(chunk.window_end_offset(), 64u); + EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u); + + // Send all the transfer data. Client should accept it and complete the + // transfer. + context_.server().SendServerStream<Transfer::Read>( + EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData) + .set_session_id(29) + .set_offset(0) + .set_payload(kData32) + .set_remaining_bytes(0))); + transfer_thread_.WaitUntilEventIsProcessed(); + + ASSERT_EQ(payloads.size(), 3u); + + chunk = DecodeChunk(payloads.back()); + EXPECT_EQ(chunk.session_id(), 29u); + EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + ASSERT_TRUE(chunk.status().has_value()); + EXPECT_EQ(chunk.status().value(), OkStatus()); + + EXPECT_EQ(transfer_status, OkStatus()); + EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()), + 0); + + context_.server().SendServerStream<Transfer::Read>(EncodeChunk( + Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck) + .set_session_id(29))); +} + +TEST_F(ReadTransfer, Version2_ServerRunsLegacy) { + stream::MemoryWriterBuffer<64> writer; + Status transfer_status = Status::Unknown(); + + ASSERT_EQ(OkStatus(), + client_.Read( + 3, + writer, + [&transfer_status](Status status) { transfer_status = status; }, + cfg::kDefaultChunkTimeout, + ProtocolVersion::kVersionTwo)); + + transfer_thread_.WaitUntilEventIsProcessed(); + + // Initial chunk of the transfer is sent. This chunk should contain all the + // fields from both legacy and version 2 protocols for backwards + // compatibility. + rpc::PayloadsView payloads = + context_.output().payloads<Transfer::Read>(context_.channel().id()); + ASSERT_EQ(payloads.size(), 1u); + EXPECT_EQ(transfer_status, Status::Unknown()); + + Chunk chunk = DecodeChunk(payloads[0]); + EXPECT_EQ(chunk.type(), Chunk::Type::kStart); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 3u); + EXPECT_EQ(chunk.resource_id(), 3u); + EXPECT_EQ(chunk.offset(), 0u); + EXPECT_EQ(chunk.window_end_offset(), 64u); + EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u); + + // Instead of a START_ACK to continue the handshake, the server responds with + // an immediate data chunk, indicating that it is running the legacy protocol + // version. Client should revert to legacy, using the resource_id of 3 as the + // session_id, and complete the transfer. + context_.server().SendServerStream<Transfer::Read>( + EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData) + .set_session_id(3) + .set_offset(0) + .set_payload(kData32) + .set_remaining_bytes(0))); + transfer_thread_.WaitUntilEventIsProcessed(); + + ASSERT_EQ(payloads.size(), 2u); + + chunk = DecodeChunk(payloads.back()); + EXPECT_EQ(chunk.session_id(), 3u); + EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kLegacy); + ASSERT_TRUE(chunk.status().has_value()); + EXPECT_EQ(chunk.status().value(), OkStatus()); + + EXPECT_EQ(transfer_status, OkStatus()); + EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()), + 0); +} + +TEST_F(ReadTransfer, Version2_TimeoutDuringHandshake) { + stream::MemoryWriterBuffer<64> writer; + Status transfer_status = Status::Unknown(); + + ASSERT_EQ(OkStatus(), + client_.Read( + 3, + writer, + [&transfer_status](Status status) { transfer_status = status; }, + cfg::kDefaultChunkTimeout, + ProtocolVersion::kVersionTwo)); + + transfer_thread_.WaitUntilEventIsProcessed(); + + // Initial chunk of the transfer is sent. This chunk should contain all the + // fields from both legacy and version 2 protocols for backwards + // compatibility. + rpc::PayloadsView payloads = + context_.output().payloads<Transfer::Read>(context_.channel().id()); + ASSERT_EQ(payloads.size(), 1u); + EXPECT_EQ(transfer_status, Status::Unknown()); + + Chunk chunk = DecodeChunk(payloads.back()); + EXPECT_EQ(chunk.type(), Chunk::Type::kStart); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 3u); + EXPECT_EQ(chunk.resource_id(), 3u); + EXPECT_EQ(chunk.offset(), 0u); + EXPECT_EQ(chunk.window_end_offset(), 64u); + EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u); + + // Wait for the timeout to expire without doing anything. The client should + // resend the initial chunk. + transfer_thread_.SimulateClientTimeout(3); + ASSERT_EQ(payloads.size(), 2u); + + chunk = DecodeChunk(payloads.back()); + EXPECT_EQ(chunk.type(), Chunk::Type::kStart); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 3u); + EXPECT_EQ(chunk.resource_id(), 3u); + EXPECT_EQ(chunk.offset(), 0u); + EXPECT_EQ(chunk.window_end_offset(), 64u); + EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u); + + // This time, the server responds, continuing the handshake and transfer. + context_.server().SendServerStream<Transfer::Read>( + EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck) + .set_session_id(31) + .set_resource_id(3))); + transfer_thread_.WaitUntilEventIsProcessed(); + + ASSERT_EQ(payloads.size(), 3u); + + chunk = DecodeChunk(payloads.back()); + EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 31u); + EXPECT_FALSE(chunk.resource_id().has_value()); + EXPECT_EQ(chunk.offset(), 0u); + EXPECT_EQ(chunk.window_end_offset(), 64u); + EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u); + + context_.server().SendServerStream<Transfer::Read>( + EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData) + .set_session_id(31) + .set_offset(0) + .set_payload(kData32) + .set_remaining_bytes(0))); + transfer_thread_.WaitUntilEventIsProcessed(); + + ASSERT_EQ(payloads.size(), 4u); + + chunk = DecodeChunk(payloads.back()); + EXPECT_EQ(chunk.session_id(), 31u); + EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + ASSERT_TRUE(chunk.status().has_value()); + EXPECT_EQ(chunk.status().value(), OkStatus()); + + EXPECT_EQ(transfer_status, OkStatus()); + EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()), + 0); + + context_.server().SendServerStream<Transfer::Read>(EncodeChunk( + Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck) + .set_session_id(31))); +} + +TEST_F(ReadTransfer, Version2_TimeoutAfterHandshake) { + stream::MemoryWriterBuffer<64> writer; + Status transfer_status = Status::Unknown(); + + ASSERT_EQ(OkStatus(), + client_.Read( + 3, + writer, + [&transfer_status](Status status) { transfer_status = status; }, + cfg::kDefaultChunkTimeout, + ProtocolVersion::kVersionTwo)); + + transfer_thread_.WaitUntilEventIsProcessed(); + + // Initial chunk of the transfer is sent. This chunk should contain all the + // fields from both legacy and version 2 protocols for backwards + // compatibility. + rpc::PayloadsView payloads = + context_.output().payloads<Transfer::Read>(context_.channel().id()); + ASSERT_EQ(payloads.size(), 1u); + EXPECT_EQ(transfer_status, Status::Unknown()); + + Chunk chunk = DecodeChunk(payloads.back()); + EXPECT_EQ(chunk.type(), Chunk::Type::kStart); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 3u); + EXPECT_EQ(chunk.resource_id(), 3u); + EXPECT_EQ(chunk.offset(), 0u); + EXPECT_EQ(chunk.window_end_offset(), 64u); + EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u); + + // The server responds with a START_ACK, continuing the version 2 handshake + // and assigning a session_id to the transfer. + context_.server().SendServerStream<Transfer::Read>( + EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck) + .set_session_id(33) + .set_resource_id(3))); + transfer_thread_.WaitUntilEventIsProcessed(); + + ASSERT_EQ(payloads.size(), 2u); + + // Client should accept the session_id with a START_ACK_CONFIRMATION, + // additionally containing the initial parameters for the read transfer. + chunk = DecodeChunk(payloads.back()); + EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 33u); + EXPECT_FALSE(chunk.resource_id().has_value()); + EXPECT_EQ(chunk.offset(), 0u); + EXPECT_EQ(chunk.window_end_offset(), 64u); + EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u); + + // Wait for the timeout to expire without doing anything. The client should + // resend the confirmation chunk. + transfer_thread_.SimulateClientTimeout(33); + ASSERT_EQ(payloads.size(), 3u); + + chunk = DecodeChunk(payloads.back()); + EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 33u); + EXPECT_FALSE(chunk.resource_id().has_value()); + EXPECT_EQ(chunk.offset(), 0u); + EXPECT_EQ(chunk.window_end_offset(), 64u); + EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u); + + // The server responds and the transfer should continue normally. + context_.server().SendServerStream<Transfer::Read>( + EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData) + .set_session_id(33) + .set_offset(0) + .set_payload(kData32) + .set_remaining_bytes(0))); + transfer_thread_.WaitUntilEventIsProcessed(); + + ASSERT_EQ(payloads.size(), 4u); + + chunk = DecodeChunk(payloads.back()); + EXPECT_EQ(chunk.session_id(), 33u); + EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + ASSERT_TRUE(chunk.status().has_value()); + EXPECT_EQ(chunk.status().value(), OkStatus()); + + EXPECT_EQ(transfer_status, OkStatus()); + EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()), + 0); + + context_.server().SendServerStream<Transfer::Read>(EncodeChunk( + Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck) + .set_session_id(33))); +} + +TEST_F(ReadTransfer, Version2_ServerErrorDuringHandshake) { + stream::MemoryWriterBuffer<64> writer; + Status transfer_status = Status::Unknown(); + + ASSERT_EQ(OkStatus(), + client_.Read( + 3, + writer, + [&transfer_status](Status status) { transfer_status = status; }, + cfg::kDefaultChunkTimeout, + ProtocolVersion::kVersionTwo)); + + transfer_thread_.WaitUntilEventIsProcessed(); + + // Initial chunk of the transfer is sent. This chunk should contain all the + // fields from both legacy and version 2 protocols for backwards + // compatibility. + rpc::PayloadsView payloads = + context_.output().payloads<Transfer::Read>(context_.channel().id()); + ASSERT_EQ(payloads.size(), 1u); + EXPECT_EQ(transfer_status, Status::Unknown()); + + Chunk chunk = DecodeChunk(payloads.back()); + EXPECT_EQ(chunk.type(), Chunk::Type::kStart); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 3u); + EXPECT_EQ(chunk.resource_id(), 3u); + EXPECT_EQ(chunk.offset(), 0u); + EXPECT_EQ(chunk.window_end_offset(), 64u); + EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u); + + // The server responds to the start request with an error. + context_.server().SendServerStream<Transfer::Read>(EncodeChunk(Chunk::Final( + ProtocolVersion::kVersionTwo, 3, Status::Unauthenticated()))); + transfer_thread_.WaitUntilEventIsProcessed(); + + EXPECT_EQ(payloads.size(), 1u); + EXPECT_EQ(transfer_status, Status::Unauthenticated()); +} + +TEST_F(ReadTransfer, Version2_TimeoutWaitingForCompletionAckRetries) { + stream::MemoryWriterBuffer<64> writer; + Status transfer_status = Status::Unknown(); + + ASSERT_EQ(OkStatus(), + client_.Read( + 3, + writer, + [&transfer_status](Status status) { transfer_status = status; }, + cfg::kDefaultChunkTimeout, + ProtocolVersion::kVersionTwo)); + + transfer_thread_.WaitUntilEventIsProcessed(); + + // Initial chunk of the transfer is sent. This chunk should contain all the + // fields from both legacy and version 2 protocols for backwards + // compatibility. + rpc::PayloadsView payloads = + context_.output().payloads<Transfer::Read>(context_.channel().id()); + ASSERT_EQ(payloads.size(), 1u); + EXPECT_EQ(transfer_status, Status::Unknown()); + + Chunk chunk = DecodeChunk(payloads[0]); + EXPECT_EQ(chunk.type(), Chunk::Type::kStart); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 3u); + EXPECT_EQ(chunk.resource_id(), 3u); + EXPECT_EQ(chunk.offset(), 0u); + EXPECT_EQ(chunk.window_end_offset(), 64u); + EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u); + + // The server responds with a START_ACK, continuing the version 2 handshake + // and assigning a session_id to the transfer. + context_.server().SendServerStream<Transfer::Read>( + EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck) + .set_session_id(29) + .set_resource_id(3))); + transfer_thread_.WaitUntilEventIsProcessed(); + + ASSERT_EQ(payloads.size(), 2u); + + // Client should accept the session_id with a START_ACK_CONFIRMATION, + // additionally containing the initial parameters for the read transfer. + chunk = DecodeChunk(payloads.back()); + EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 29u); + EXPECT_FALSE(chunk.resource_id().has_value()); + EXPECT_EQ(chunk.offset(), 0u); + EXPECT_EQ(chunk.window_end_offset(), 64u); + EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u); + + // Send all the transfer data. Client should accept it and complete the + // transfer. + context_.server().SendServerStream<Transfer::Read>( + EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData) + .set_session_id(29) + .set_offset(0) + .set_payload(kData32) + .set_remaining_bytes(0))); + transfer_thread_.WaitUntilEventIsProcessed(); + + ASSERT_EQ(payloads.size(), 3u); + + chunk = DecodeChunk(payloads.back()); + EXPECT_EQ(chunk.session_id(), 29u); + EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + ASSERT_TRUE(chunk.status().has_value()); + EXPECT_EQ(chunk.status().value(), OkStatus()); + + EXPECT_EQ(transfer_status, OkStatus()); + EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()), + 0); + + // Time out instead of sending a completion ACK. THe transfer should resend + // its completion chunk. + transfer_thread_.SimulateClientTimeout(29); + ASSERT_EQ(payloads.size(), 4u); + + // Reset transfer_status to check whether the handler is called again. + transfer_status = Status::Unknown(); + + chunk = DecodeChunk(payloads.back()); + EXPECT_EQ(chunk.session_id(), 29u); + EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + ASSERT_TRUE(chunk.status().has_value()); + EXPECT_EQ(chunk.status().value(), OkStatus()); + + // Transfer handler should not be called a second time in response to the + // re-sent completion chunk. + EXPECT_EQ(transfer_status, Status::Unknown()); + + // Send a completion ACK to end the transfer. + context_.server().SendServerStream<Transfer::Read>(EncodeChunk( + Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck) + .set_session_id(29))); + transfer_thread_.WaitUntilEventIsProcessed(); + + // No further chunks should be sent following the ACK. + transfer_thread_.SimulateClientTimeout(29); + ASSERT_EQ(payloads.size(), 4u); +} + +TEST_F(ReadTransfer, + Version2_TimeoutWaitingForCompletionAckEndsTransferAfterRetries) { + stream::MemoryWriterBuffer<64> writer; + Status transfer_status = Status::Unknown(); + + ASSERT_EQ(OkStatus(), + client_.Read( + 3, + writer, + [&transfer_status](Status status) { transfer_status = status; }, + cfg::kDefaultChunkTimeout, + ProtocolVersion::kVersionTwo)); + + transfer_thread_.WaitUntilEventIsProcessed(); + + // Initial chunk of the transfer is sent. This chunk should contain all the + // fields from both legacy and version 2 protocols for backwards + // compatibility. + rpc::PayloadsView payloads = + context_.output().payloads<Transfer::Read>(context_.channel().id()); + ASSERT_EQ(payloads.size(), 1u); + EXPECT_EQ(transfer_status, Status::Unknown()); + + Chunk chunk = DecodeChunk(payloads[0]); + EXPECT_EQ(chunk.type(), Chunk::Type::kStart); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 3u); + EXPECT_EQ(chunk.resource_id(), 3u); + EXPECT_EQ(chunk.offset(), 0u); + EXPECT_EQ(chunk.window_end_offset(), 64u); + EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u); + + // The server responds with a START_ACK, continuing the version 2 handshake + // and assigning a session_id to the transfer. + context_.server().SendServerStream<Transfer::Read>( + EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck) + .set_session_id(29) + .set_resource_id(3))); + transfer_thread_.WaitUntilEventIsProcessed(); + + ASSERT_EQ(payloads.size(), 2u); + + // Client should accept the session_id with a START_ACK_CONFIRMATION, + // additionally containing the initial parameters for the read transfer. + chunk = DecodeChunk(payloads.back()); + EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 29u); + EXPECT_FALSE(chunk.resource_id().has_value()); + EXPECT_EQ(chunk.offset(), 0u); + EXPECT_EQ(chunk.window_end_offset(), 64u); + EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u); + + // Send all the transfer data. Client should accept it and complete the + // transfer. + context_.server().SendServerStream<Transfer::Read>( + EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData) + .set_session_id(29) + .set_offset(0) + .set_payload(kData32) + .set_remaining_bytes(0))); + transfer_thread_.WaitUntilEventIsProcessed(); + + ASSERT_EQ(payloads.size(), 3u); + + chunk = DecodeChunk(payloads.back()); + EXPECT_EQ(chunk.session_id(), 29u); + EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + ASSERT_TRUE(chunk.status().has_value()); + EXPECT_EQ(chunk.status().value(), OkStatus()); + + EXPECT_EQ(transfer_status, OkStatus()); + EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()), + 0); + + // Time out instead of sending a completion ACK. THe transfer should resend + // its completion chunk at first, then terminate after the maximum number of + // retries. + transfer_thread_.SimulateClientTimeout(29); + ASSERT_EQ(payloads.size(), 4u); // Retry 1. + + transfer_thread_.SimulateClientTimeout(29); + ASSERT_EQ(payloads.size(), 5u); // Retry 2. + + transfer_thread_.SimulateClientTimeout(29); + ASSERT_EQ(payloads.size(), 6u); // Retry 3. + + transfer_thread_.SimulateClientTimeout(29); + ASSERT_EQ(payloads.size(), 6u); // No more retries; transfer has ended. +} + +TEST_F(WriteTransfer, Version2_SingleChunk) { + stream::MemoryReader reader(kData32); + Status transfer_status = Status::Unknown(); + + ASSERT_EQ(OkStatus(), + client_.Write( + 3, + reader, + [&transfer_status](Status status) { transfer_status = status; }, + cfg::kDefaultChunkTimeout, + ProtocolVersion::kVersionTwo)); + transfer_thread_.WaitUntilEventIsProcessed(); + + // The client begins by sending the ID of the resource to transfer. + rpc::PayloadsView payloads = + context_.output().payloads<Transfer::Write>(context_.channel().id()); + ASSERT_EQ(payloads.size(), 1u); + EXPECT_EQ(transfer_status, Status::Unknown()); + + Chunk chunk = DecodeChunk(payloads.back()); + EXPECT_EQ(chunk.type(), Chunk::Type::kStart); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 3u); + EXPECT_EQ(chunk.resource_id(), 3u); + + // The server responds with a START_ACK, continuing the version 2 handshake + // and assigning a session_id to the transfer. + context_.server().SendServerStream<Transfer::Write>( + EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck) + .set_session_id(29) + .set_resource_id(3))); + transfer_thread_.WaitUntilEventIsProcessed(); + + ASSERT_EQ(payloads.size(), 2u); + + // Client should accept the session_id with a START_ACK_CONFIRMATION. + chunk = DecodeChunk(payloads.back()); + EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 29u); + EXPECT_FALSE(chunk.resource_id().has_value()); + + // The server can then begin the data transfer by sending its transfer + // parameters. Client should respond with a data chunk and the final chunk. + rpc::test::WaitForPackets(context_.output(), 2, [this] { + context_.server().SendServerStream<Transfer::Write>(EncodeChunk( + Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit) + .set_session_id(29) + .set_offset(0) + .set_window_end_offset(64) + .set_max_chunk_size_bytes(32))); + }); + + ASSERT_EQ(payloads.size(), 4u); + + chunk = DecodeChunk(payloads[2]); + EXPECT_EQ(chunk.type(), Chunk::Type::kData); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 29u); + EXPECT_EQ(chunk.offset(), 0u); + EXPECT_TRUE(chunk.has_payload()); + EXPECT_EQ(std::memcmp( + chunk.payload().data(), kData32.data(), chunk.payload().size()), + 0); + + chunk = DecodeChunk(payloads[3]); + EXPECT_EQ(chunk.type(), Chunk::Type::kData); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 29u); + ASSERT_TRUE(chunk.remaining_bytes().has_value()); + EXPECT_EQ(chunk.remaining_bytes().value(), 0u); + + EXPECT_EQ(transfer_status, Status::Unknown()); + + // Send the final status chunk to complete the transfer. + context_.server().SendServerStream<Transfer::Write>( + EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 29, OkStatus()))); + transfer_thread_.WaitUntilEventIsProcessed(); + + // Client should acknowledge the completion of the transfer. + EXPECT_EQ(payloads.size(), 5u); + + chunk = DecodeChunk(payloads[4]); + EXPECT_EQ(chunk.type(), Chunk::Type::kCompletionAck); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 29u); + + EXPECT_EQ(transfer_status, OkStatus()); +} + +TEST_F(WriteTransfer, Version2_ServerRunsLegacy) { + stream::MemoryReader reader(kData32); + Status transfer_status = Status::Unknown(); + + ASSERT_EQ(OkStatus(), + client_.Write( + 3, + reader, + [&transfer_status](Status status) { transfer_status = status; }, + cfg::kDefaultChunkTimeout, + ProtocolVersion::kVersionTwo)); + transfer_thread_.WaitUntilEventIsProcessed(); + + // The client begins by sending the ID of the resource to transfer. + rpc::PayloadsView payloads = + context_.output().payloads<Transfer::Write>(context_.channel().id()); + ASSERT_EQ(payloads.size(), 1u); + EXPECT_EQ(transfer_status, Status::Unknown()); + + Chunk chunk = DecodeChunk(payloads.back()); + EXPECT_EQ(chunk.type(), Chunk::Type::kStart); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 3u); + EXPECT_EQ(chunk.resource_id(), 3u); + + // Instead of continuing the handshake with a START_ACK, the server + // immediately sends parameters, indicating that it only supports the legacy + // protocol. Client should switch over to legacy and continue the transfer. + rpc::test::WaitForPackets(context_.output(), 2, [this] { + context_.server().SendServerStream<Transfer::Write>(EncodeChunk( + Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit) + .set_session_id(3) + .set_offset(0) + .set_window_end_offset(64) + .set_max_chunk_size_bytes(32))); + }); + + ASSERT_EQ(payloads.size(), 3u); + + chunk = DecodeChunk(payloads[1]); + EXPECT_EQ(chunk.type(), Chunk::Type::kData); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kLegacy); + EXPECT_EQ(chunk.session_id(), 3u); + EXPECT_EQ(chunk.offset(), 0u); + EXPECT_TRUE(chunk.has_payload()); + EXPECT_EQ(std::memcmp( + chunk.payload().data(), kData32.data(), chunk.payload().size()), + 0); + + chunk = DecodeChunk(payloads[2]); + EXPECT_EQ(chunk.type(), Chunk::Type::kData); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kLegacy); + EXPECT_EQ(chunk.session_id(), 3u); + ASSERT_TRUE(chunk.remaining_bytes().has_value()); + EXPECT_EQ(chunk.remaining_bytes().value(), 0u); + + EXPECT_EQ(transfer_status, Status::Unknown()); + + // Send the final status chunk to complete the transfer. + context_.server().SendServerStream<Transfer::Write>( + EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus()))); + transfer_thread_.WaitUntilEventIsProcessed(); + + EXPECT_EQ(payloads.size(), 3u); + EXPECT_EQ(transfer_status, OkStatus()); +} + +TEST_F(WriteTransfer, Version2_RetryDuringHandshake) { + stream::MemoryReader reader(kData32); + Status transfer_status = Status::Unknown(); + + ASSERT_EQ(OkStatus(), + client_.Write( + 3, + reader, + [&transfer_status](Status status) { transfer_status = status; }, + cfg::kDefaultChunkTimeout, + ProtocolVersion::kVersionTwo)); + transfer_thread_.WaitUntilEventIsProcessed(); + + // The client begins by sending the ID of the resource to transfer. + rpc::PayloadsView payloads = + context_.output().payloads<Transfer::Write>(context_.channel().id()); + ASSERT_EQ(payloads.size(), 1u); + EXPECT_EQ(transfer_status, Status::Unknown()); + + Chunk chunk = DecodeChunk(payloads.back()); + EXPECT_EQ(chunk.type(), Chunk::Type::kStart); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 3u); + EXPECT_EQ(chunk.resource_id(), 3u); + + // Time out waiting for a server response. The client should resend the + // initial packet. + transfer_thread_.SimulateClientTimeout(3); + ASSERT_EQ(payloads.size(), 2u); + + chunk = DecodeChunk(payloads.back()); + EXPECT_EQ(chunk.type(), Chunk::Type::kStart); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 3u); + EXPECT_EQ(chunk.resource_id(), 3u); + + // This time, respond with the correct continuation packet. The transfer + // should resume and complete normally. + context_.server().SendServerStream<Transfer::Write>( + EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck) + .set_session_id(31) + .set_resource_id(3))); + transfer_thread_.WaitUntilEventIsProcessed(); + + ASSERT_EQ(payloads.size(), 3u); + + chunk = DecodeChunk(payloads.back()); + EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 31u); + EXPECT_FALSE(chunk.resource_id().has_value()); + + rpc::test::WaitForPackets(context_.output(), 2, [this] { + context_.server().SendServerStream<Transfer::Write>(EncodeChunk( + Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit) + .set_session_id(31) + .set_offset(0) + .set_window_end_offset(64) + .set_max_chunk_size_bytes(32))); + }); + + ASSERT_EQ(payloads.size(), 5u); + + chunk = DecodeChunk(payloads[3]); + EXPECT_EQ(chunk.type(), Chunk::Type::kData); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 31u); + EXPECT_EQ(chunk.offset(), 0u); + EXPECT_TRUE(chunk.has_payload()); + EXPECT_EQ(std::memcmp( + chunk.payload().data(), kData32.data(), chunk.payload().size()), + 0); + + chunk = DecodeChunk(payloads[4]); + EXPECT_EQ(chunk.type(), Chunk::Type::kData); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 31u); + ASSERT_TRUE(chunk.remaining_bytes().has_value()); + EXPECT_EQ(chunk.remaining_bytes().value(), 0u); + + EXPECT_EQ(transfer_status, Status::Unknown()); + + context_.server().SendServerStream<Transfer::Write>( + EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 31, OkStatus()))); + transfer_thread_.WaitUntilEventIsProcessed(); + + // Client should acknowledge the completion of the transfer. + EXPECT_EQ(payloads.size(), 6u); + + chunk = DecodeChunk(payloads[5]); + EXPECT_EQ(chunk.type(), Chunk::Type::kCompletionAck); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 31u); + + EXPECT_EQ(transfer_status, OkStatus()); +} + +TEST_F(WriteTransfer, Version2_RetryAfterHandshake) { + stream::MemoryReader reader(kData32); + Status transfer_status = Status::Unknown(); + + ASSERT_EQ(OkStatus(), + client_.Write( + 3, + reader, + [&transfer_status](Status status) { transfer_status = status; }, + cfg::kDefaultChunkTimeout, + ProtocolVersion::kVersionTwo)); + transfer_thread_.WaitUntilEventIsProcessed(); + + // The client begins by sending the ID of the resource to transfer. + rpc::PayloadsView payloads = + context_.output().payloads<Transfer::Write>(context_.channel().id()); + ASSERT_EQ(payloads.size(), 1u); + EXPECT_EQ(transfer_status, Status::Unknown()); + + Chunk chunk = DecodeChunk(payloads.back()); + EXPECT_EQ(chunk.type(), Chunk::Type::kStart); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 3u); + EXPECT_EQ(chunk.resource_id(), 3u); + + // The server responds with a START_ACK, continuing the version 2 handshake + // and assigning a session_id to the transfer. + context_.server().SendServerStream<Transfer::Write>( + EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck) + .set_session_id(33) + .set_resource_id(3))); + transfer_thread_.WaitUntilEventIsProcessed(); + + ASSERT_EQ(payloads.size(), 2u); + + // Client should accept the session_id with a START_ACK_CONFIRMATION. + chunk = DecodeChunk(payloads.back()); + EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 33u); + EXPECT_FALSE(chunk.resource_id().has_value()); + + // Time out waiting for a server response. The client should resend the + // initial packet. + transfer_thread_.SimulateClientTimeout(33); + ASSERT_EQ(payloads.size(), 3u); + + chunk = DecodeChunk(payloads.back()); + EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 33u); + EXPECT_FALSE(chunk.resource_id().has_value()); + + // This time, respond with the first transfer parameters chunk. The transfer + // should resume and complete normally. + rpc::test::WaitForPackets(context_.output(), 2, [this] { + context_.server().SendServerStream<Transfer::Write>(EncodeChunk( + Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit) + .set_session_id(33) + .set_offset(0) + .set_window_end_offset(64) + .set_max_chunk_size_bytes(32))); + }); + + ASSERT_EQ(payloads.size(), 5u); + + chunk = DecodeChunk(payloads[3]); + EXPECT_EQ(chunk.type(), Chunk::Type::kData); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 33u); + EXPECT_EQ(chunk.offset(), 0u); + EXPECT_TRUE(chunk.has_payload()); + EXPECT_EQ(std::memcmp( + chunk.payload().data(), kData32.data(), chunk.payload().size()), + 0); + + chunk = DecodeChunk(payloads[4]); + EXPECT_EQ(chunk.type(), Chunk::Type::kData); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 33u); + ASSERT_TRUE(chunk.remaining_bytes().has_value()); + EXPECT_EQ(chunk.remaining_bytes().value(), 0u); + + EXPECT_EQ(transfer_status, Status::Unknown()); + + context_.server().SendServerStream<Transfer::Write>( + EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 33, OkStatus()))); + transfer_thread_.WaitUntilEventIsProcessed(); + + // Client should acknowledge the completion of the transfer. + EXPECT_EQ(payloads.size(), 6u); + + chunk = DecodeChunk(payloads[5]); + EXPECT_EQ(chunk.type(), Chunk::Type::kCompletionAck); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 33u); + + EXPECT_EQ(transfer_status, OkStatus()); +} + +TEST_F(WriteTransfer, Version2_ServerErrorDuringHandshake) { + stream::MemoryReader reader(kData32); + Status transfer_status = Status::Unknown(); + + ASSERT_EQ(OkStatus(), + client_.Write( + 3, + reader, + [&transfer_status](Status status) { transfer_status = status; }, + cfg::kDefaultChunkTimeout, + ProtocolVersion::kVersionTwo)); + transfer_thread_.WaitUntilEventIsProcessed(); + + // The client begins by sending the ID of the resource to transfer. + rpc::PayloadsView payloads = + context_.output().payloads<Transfer::Write>(context_.channel().id()); + ASSERT_EQ(payloads.size(), 1u); + EXPECT_EQ(transfer_status, Status::Unknown()); + + Chunk chunk = DecodeChunk(payloads.back()); + EXPECT_EQ(chunk.type(), Chunk::Type::kStart); + EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo); + EXPECT_EQ(chunk.session_id(), 3u); + EXPECT_EQ(chunk.resource_id(), 3u); + + // The server responds to the start request with an error. + context_.server().SendServerStream<Transfer::Write>(EncodeChunk( + Chunk::Final(ProtocolVersion::kVersionTwo, 3, Status::NotFound()))); + transfer_thread_.WaitUntilEventIsProcessed(); + + EXPECT_EQ(payloads.size(), 1u); + EXPECT_EQ(transfer_status, Status::NotFound()); +} } // namespace } // namespace pw::transfer::test |