aboutsummaryrefslogtreecommitdiff
path: root/pw_transfer
diff options
context:
space:
mode:
Diffstat (limited to 'pw_transfer')
-rw-r--r--pw_transfer/BUILD.bazel26
-rw-r--r--pw_transfer/BUILD.gn3
-rw-r--r--pw_transfer/atomic_file_transfer_handler_test.cc2
-rw-r--r--pw_transfer/chunk.cc9
-rw-r--r--pw_transfer/chunk_test.cc2
-rw-r--r--pw_transfer/client.cc64
-rw-r--r--pw_transfer/client_context.cc7
-rw-r--r--pw_transfer/client_test.cc870
-rw-r--r--pw_transfer/context.cc68
-rw-r--r--pw_transfer/docs.rst206
-rw-r--r--pw_transfer/handler_test.cc2
-rw-r--r--pw_transfer/integration_test/BUILD.bazel36
-rw-r--r--pw_transfer/integration_test/JavaClient.java54
-rw-r--r--pw_transfer/integration_test/client.cc58
-rw-r--r--pw_transfer/integration_test/config.proto19
-rw-r--r--pw_transfer/integration_test/cross_language_medium_read_test.py56
-rw-r--r--pw_transfer/integration_test/cross_language_medium_write_test.py55
-rw-r--r--pw_transfer/integration_test/cross_language_small_test.py46
-rw-r--r--pw_transfer/integration_test/expected_errors_test.py95
-rw-r--r--pw_transfer/integration_test/proxy.py31
-rw-r--r--pw_transfer/integration_test/proxy_test.py114
-rw-r--r--pw_transfer/integration_test/python_client.py6
-rw-r--r--pw_transfer/integration_test/server.cc55
-rw-r--r--pw_transfer/integration_test/test_fixture.py34
-rw-r--r--pw_transfer/java/main/dev/pigweed/pw_transfer/BUILD.bazel47
-rw-r--r--pw_transfer/java/main/dev/pigweed/pw_transfer/ReadTransfer.java33
-rw-r--r--pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java28
-rw-r--r--pw_transfer/java/main/dev/pigweed/pw_transfer/TransferClient.java74
-rw-r--r--pw_transfer/java/main/dev/pigweed/pw_transfer/TransferEventHandler.java15
-rw-r--r--pw_transfer/java/main/dev/pigweed/pw_transfer/TransferService.java4
-rw-r--r--pw_transfer/java/main/dev/pigweed/pw_transfer/VersionedChunk.java12
-rw-r--r--pw_transfer/java/main/dev/pigweed/pw_transfer/WriteTransfer.java42
-rw-r--r--pw_transfer/java/test/dev/pigweed/pw_transfer/BUILD.bazel1
-rw-r--r--pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java6
-rw-r--r--pw_transfer/public/pw_transfer/client.h133
-rw-r--r--pw_transfer/public/pw_transfer/handler.h63
-rw-r--r--pw_transfer/public/pw_transfer/internal/chunk.h9
-rw-r--r--pw_transfer/public/pw_transfer/internal/client_context.h30
-rw-r--r--pw_transfer/public/pw_transfer/internal/config.h120
-rw-r--r--pw_transfer/public/pw_transfer/internal/context.h26
-rw-r--r--pw_transfer/public/pw_transfer/internal/event.h43
-rw-r--r--pw_transfer/public/pw_transfer/internal/server_context.h8
-rw-r--r--pw_transfer/public/pw_transfer/transfer.h11
-rw-r--r--pw_transfer/public/pw_transfer/transfer_thread.h76
-rw-r--r--pw_transfer/pw_transfer_private/chunk_testing.h2
-rw-r--r--pw_transfer/py/pw_transfer/__init__.py6
-rw-r--r--pw_transfer/py/pw_transfer/chunk.py7
-rw-r--r--pw_transfer/py/pw_transfer/client.py245
-rw-r--r--pw_transfer/py/pw_transfer/transfer.py58
-rw-r--r--pw_transfer/py/tests/transfer_test.py69
-rw-r--r--pw_transfer/server_context.cc4
-rw-r--r--pw_transfer/transfer.cc75
-rw-r--r--pw_transfer/transfer.proto37
-rw-r--r--pw_transfer/transfer_test.cc96
-rw-r--r--pw_transfer/transfer_thread.cc118
-rw-r--r--pw_transfer/transfer_thread_test.cc16
-rw-r--r--pw_transfer/ts/transfer_test.ts2
57 files changed, 2758 insertions, 676 deletions
diff --git a/pw_transfer/BUILD.bazel b/pw_transfer/BUILD.bazel
index 1a8bc16a2..b260c766a 100644
--- a/pw_transfer/BUILD.bazel
+++ b/pw_transfer/BUILD.bazel
@@ -12,25 +12,31 @@
# License for the specific language governing permissions and limitations under
# the License.
-load("@com_google_protobuf//:protobuf.bzl", "py_proto_library")
load("@rules_proto//proto:defs.bzl", "proto_library")
-load("//pw_build:pigweed.bzl", "pw_cc_library", "pw_cc_test")
+load("@rules_python//python:proto.bzl", "py_proto_library")
+load("//pw_build:pigweed.bzl", "pw_cc_test")
load("//pw_protobuf_compiler:pw_proto_library.bzl", "pw_proto_library")
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
-pw_cc_library(
+cc_library(
name = "config",
hdrs = ["public/pw_transfer/internal/config.h"],
includes = ["public"],
deps = [
+ ":config_override",
"//pw_chrono:system_clock",
],
)
-pw_cc_library(
+label_flag(
+ name = "config_override",
+ build_setting_default = "//pw_build:default_module_config",
+)
+
+cc_library(
name = "core",
srcs = [
"chunk.cc",
@@ -76,7 +82,7 @@ pw_cc_library(
],
)
-pw_cc_library(
+cc_library(
name = "pw_transfer",
srcs = [
"transfer.cc",
@@ -98,7 +104,7 @@ pw_cc_library(
],
)
-pw_cc_library(
+cc_library(
name = "client",
srcs = [
"client.cc",
@@ -116,7 +122,7 @@ pw_cc_library(
],
)
-pw_cc_library(
+cc_library(
name = "atomic_file_transfer_handler",
srcs = ["atomic_file_transfer_handler.cc"],
hdrs = [
@@ -131,14 +137,14 @@ pw_cc_library(
],
)
-pw_cc_library(
+cc_library(
name = "atomic_file_transfer_handler_internal",
srcs = [
"pw_transfer_private/filename_generator.h",
],
)
-pw_cc_library(
+cc_library(
name = "test_helpers",
srcs = [
"pw_transfer_private/chunk_testing.h",
@@ -241,7 +247,7 @@ pw_proto_library(
py_proto_library(
name = "transfer_proto_pb2",
- srcs = ["transfer.proto"],
+ deps = [":transfer_proto"],
)
java_lite_proto_library(
diff --git a/pw_transfer/BUILD.gn b/pw_transfer/BUILD.gn
index e4e15d8af..d00e42587 100644
--- a/pw_transfer/BUILD.gn
+++ b/pw_transfer/BUILD.gn
@@ -218,7 +218,7 @@ pw_test("transfer_test") {
pw_test("transfer_thread_test") {
enable_if = pw_thread_THREAD_BACKEND == "$dir_pw_thread_stl:thread" &&
- pw_unit_test_GOOGLETEST_BACKEND == "$dir_pw_unit_test:light"
+ pw_unit_test_BACKEND == "$dir_pw_unit_test:light"
sources = [ "transfer_thread_test.cc" ]
deps = [
":core",
@@ -273,6 +273,7 @@ pw_executable("integration_test_server") {
}
pw_executable("integration_test_client") {
+ testonly = pw_unit_test_TESTONLY
sources = [ "integration_test/client.cc" ]
deps = [
":client",
diff --git a/pw_transfer/atomic_file_transfer_handler_test.cc b/pw_transfer/atomic_file_transfer_handler_test.cc
index 77a2a90be..71696f820 100644
--- a/pw_transfer/atomic_file_transfer_handler_test.cc
+++ b/pw_transfer/atomic_file_transfer_handler_test.cc
@@ -20,13 +20,13 @@
#include <string>
#include <string_view>
-#include "gtest/gtest.h"
#include "pw_random/xor_shift.h"
#include "pw_result/result.h"
#include "pw_status/status.h"
#include "pw_string/string_builder.h"
#include "pw_transfer/transfer.h"
#include "pw_transfer_private/filename_generator.h"
+#include "pw_unit_test/framework.h"
namespace pw::transfer {
diff --git a/pw_transfer/chunk.cc b/pw_transfer/chunk.cc
index 900c66efc..2717a5500 100644
--- a/pw_transfer/chunk.cc
+++ b/pw_transfer/chunk.cc
@@ -182,6 +182,11 @@ Result<Chunk> Chunk::Parse(ConstByteSpan message) {
chunk.desired_session_id_ = value;
break;
+ case ProtoChunk::Fields::kInitialOffset:
+ PW_TRY(decoder.ReadUint32(&value));
+ chunk.set_initial_offset(value);
+ break;
+
// Silently ignore any unrecognized fields.
}
}
@@ -283,6 +288,10 @@ Result<ConstByteSpan> Chunk::Encode(ByteSpan buffer) const {
encoder.WriteOffset(offset_).IgnoreError();
}
+ if (initial_offset_ != 0) {
+ encoder.WriteInitialOffset(initial_offset_).IgnoreError();
+ }
+
if (remaining_bytes_.has_value()) {
encoder.WriteRemainingBytes(remaining_bytes_.value()).IgnoreError();
}
diff --git a/pw_transfer/chunk_test.cc b/pw_transfer/chunk_test.cc
index ecc2d687a..8faee8abe 100644
--- a/pw_transfer/chunk_test.cc
+++ b/pw_transfer/chunk_test.cc
@@ -14,8 +14,8 @@
#include "pw_transfer/internal/chunk.h"
-#include "gtest/gtest.h"
#include "pw_bytes/array.h"
+#include "pw_unit_test/framework.h"
namespace pw::transfer::internal {
namespace {
diff --git a/pw_transfer/client.cc b/pw_transfer/client.cc
index bbd3d0de7..b73a13079 100644
--- a/pw_transfer/client.cc
+++ b/pw_transfer/client.cc
@@ -1,4 +1,4 @@
-// Copyright 2023 The Pigweed Authors
+// Copyright 2024 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
@@ -20,17 +20,23 @@
namespace pw::transfer {
-Status Client::Read(uint32_t resource_id,
- stream::Writer& output,
- CompletionFunc&& on_completion,
- chrono::SystemClock::duration timeout,
- chrono::SystemClock::duration initial_chunk_timeout,
- ProtocolVersion protocol_version) {
+Result<Client::Handle> Client::Read(
+ uint32_t resource_id,
+ stream::Writer& output,
+ CompletionFunc&& on_completion,
+ ProtocolVersion protocol_version,
+ chrono::SystemClock::duration timeout,
+ chrono::SystemClock::duration initial_chunk_timeout,
+ uint32_t initial_offset) {
if (on_completion == nullptr ||
protocol_version == ProtocolVersion::kUnknown) {
return Status::InvalidArgument();
}
+ if (protocol_version < ProtocolVersion::kVersionTwo && initial_offset != 0) {
+ return Status::InvalidArgument();
+ }
+
if (!has_read_stream_) {
rpc::RawClientReaderWriter read_stream = client_.Read(
[this](ConstByteSpan chunk) {
@@ -43,30 +49,40 @@ Status Client::Read(uint32_t resource_id,
has_read_stream_ = true;
}
+ Handle handle = AssignHandle();
+
transfer_thread_.StartClientTransfer(internal::TransferType::kReceive,
protocol_version,
resource_id,
+ handle.id(),
&output,
max_parameters_,
std::move(on_completion),
timeout,
initial_chunk_timeout,
max_retries_,
- max_lifetime_retries_);
- return OkStatus();
+ max_lifetime_retries_,
+ initial_offset);
+ return handle;
}
-Status Client::Write(uint32_t resource_id,
- stream::Reader& input,
- CompletionFunc&& on_completion,
- chrono::SystemClock::duration timeout,
- chrono::SystemClock::duration initial_chunk_timeout,
- ProtocolVersion protocol_version) {
+Result<Client::Handle> Client::Write(
+ uint32_t resource_id,
+ stream::Reader& input,
+ CompletionFunc&& on_completion,
+ ProtocolVersion protocol_version,
+ chrono::SystemClock::duration timeout,
+ chrono::SystemClock::duration initial_chunk_timeout,
+ uint32_t initial_offset) {
if (on_completion == nullptr ||
protocol_version == ProtocolVersion::kUnknown) {
return Status::InvalidArgument();
}
+ if (protocol_version < ProtocolVersion::kVersionTwo && initial_offset != 0) {
+ return Status::InvalidArgument();
+ }
+
if (!has_write_stream_) {
rpc::RawClientReaderWriter write_stream = client_.Write(
[this](ConstByteSpan chunk) {
@@ -79,23 +95,31 @@ Status Client::Write(uint32_t resource_id,
has_write_stream_ = true;
}
+ Handle handle = AssignHandle();
+
transfer_thread_.StartClientTransfer(internal::TransferType::kTransmit,
protocol_version,
resource_id,
+ handle.id(),
&input,
max_parameters_,
std::move(on_completion),
timeout,
initial_chunk_timeout,
max_retries_,
- max_lifetime_retries_);
+ max_lifetime_retries_,
+ initial_offset);
- return OkStatus();
+ return handle;
}
-void Client::CancelTransfer(uint32_t resource_id) {
- transfer_thread_.EndClientTransfer(
- resource_id, Status::Cancelled(), /*send_status_chunk=*/true);
+Client::Handle Client::AssignHandle() {
+ uint32_t handle_id = next_handle_id_++;
+ if (handle_id == Handle::kUnassignedHandleId) {
+ handle_id = next_handle_id_++;
+ }
+
+ return Handle(this, handle_id);
}
void Client::OnRpcError(Status status, internal::TransferType type) {
diff --git a/pw_transfer/client_context.cc b/pw_transfer/client_context.cc
index 33c1aeb9f..2c627a4ed 100644
--- a/pw_transfer/client_context.cc
+++ b/pw_transfer/client_context.cc
@@ -1,4 +1,4 @@
-// Copyright 2021 The Pigweed Authors
+// Copyright 2023 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
@@ -23,7 +23,12 @@ Status ClientContext::FinalCleanup(Status status) {
if (on_completion_ != nullptr) {
on_completion_(status);
}
+ handle_id_ = 0;
return OkStatus();
}
+Status ClientContext::SeekReader(uint32_t offset) {
+ return reader().Seek(offset - initial_offset_);
+}
+
} // namespace pw::transfer::internal
diff --git a/pw_transfer/client_test.cc b/pw_transfer/client_test.cc
index 323570dcf..5af4a3bcd 100644
--- a/pw_transfer/client_test.cc
+++ b/pw_transfer/client_test.cc
@@ -1,4 +1,4 @@
-// Copyright 2023 The Pigweed Authors
+// Copyright 2024 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
@@ -16,7 +16,6 @@
#include <cstring>
-#include "gtest/gtest.h"
#include "pw_assert/check.h"
#include "pw_bytes/array.h"
#include "pw_rpc/raw/client_testing.h"
@@ -24,6 +23,7 @@
#include "pw_thread/thread.h"
#include "pw_thread_stl/options.h"
#include "pw_transfer_private/chunk_testing.h"
+#include "pw_unit_test/framework.h"
namespace pw::transfer::test {
namespace {
@@ -42,11 +42,17 @@ class ReadTransfer : public ::testing::Test {
protected:
ReadTransfer(size_t max_bytes_to_receive = 0)
: transfer_thread_(chunk_buffer_, encode_buffer_),
+ legacy_client_(context_.client(),
+ context_.channel().id(),
+ transfer_thread_,
+ max_bytes_to_receive),
client_(context_.client(),
context_.channel().id(),
transfer_thread_,
max_bytes_to_receive),
- system_thread_(TransferThreadOptions(), transfer_thread_) {}
+ system_thread_(TransferThreadOptions(), transfer_thread_) {
+ legacy_client_.set_protocol_version(ProtocolVersion::kLegacy);
+ }
~ReadTransfer() override {
transfer_thread_.Terminate();
@@ -56,6 +62,7 @@ class ReadTransfer : public ::testing::Test {
rpc::RawClientTestContext<> context_;
Thread<1, 1> transfer_thread_;
+ Client legacy_client_;
Client client_;
std::array<std::byte, 64> chunk_buffer_;
@@ -71,10 +78,13 @@ TEST_F(ReadTransfer, SingleChunk) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Read(3, writer, [&transfer_status](Status status) {
- transfer_status = status;
- }));
+ ASSERT_EQ(
+ OkStatus(),
+ legacy_client_
+ .Read(3,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; })
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
@@ -86,6 +96,7 @@ TEST_F(ReadTransfer, SingleChunk) {
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.session_id(), 3u);
+ EXPECT_EQ(c0.resource_id(), 3u);
EXPECT_EQ(c0.offset(), 0u);
EXPECT_EQ(c0.window_end_offset(), 64u);
EXPECT_EQ(c0.type(), Chunk::Type::kStart);
@@ -114,10 +125,13 @@ TEST_F(ReadTransfer, MultiChunk) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Read(4, writer, [&transfer_status](Status status) {
- transfer_status = status;
- }));
+ ASSERT_EQ(
+ OkStatus(),
+ legacy_client_
+ .Read(4,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; })
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
@@ -129,6 +143,7 @@ TEST_F(ReadTransfer, MultiChunk) {
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.session_id(), 4u);
+ EXPECT_EQ(c0.resource_id(), 4u);
EXPECT_EQ(c0.offset(), 0u);
EXPECT_EQ(c0.window_end_offset(), 64u);
EXPECT_EQ(c0.type(), Chunk::Type::kStart);
@@ -167,10 +182,13 @@ TEST_F(ReadTransfer, MultipleTransfers) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Read(3, writer, [&transfer_status](Status status) {
- transfer_status = status;
- }));
+ ASSERT_EQ(
+ OkStatus(),
+ legacy_client_
+ .Read(3,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; })
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
context_.server().SendServerStream<Transfer::Read>(
@@ -184,10 +202,13 @@ TEST_F(ReadTransfer, MultipleTransfers) {
ASSERT_EQ(transfer_status, OkStatus());
transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Read(3, writer, [&transfer_status](Status status) {
- transfer_status = status;
- }));
+ ASSERT_EQ(
+ OkStatus(),
+ legacy_client_
+ .Read(3,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; })
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
context_.server().SendServerStream<Transfer::Read>(
@@ -208,7 +229,7 @@ class ReadTransferMaxBytes32 : public ReadTransfer {
TEST_F(ReadTransferMaxBytes32, SetsPendingBytesFromConstructorArg) {
stream::MemoryWriterBuffer<64> writer;
- EXPECT_EQ(OkStatus(), client_.Read(5, writer, [](Status) {}));
+ EXPECT_EQ(OkStatus(), legacy_client_.Read(5, writer, [](Status) {}).status());
transfer_thread_.WaitUntilEventIsProcessed();
// First transfer parameters chunk is sent.
@@ -218,6 +239,7 @@ TEST_F(ReadTransferMaxBytes32, SetsPendingBytesFromConstructorArg) {
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.session_id(), 5u);
+ EXPECT_EQ(c0.resource_id(), 5u);
EXPECT_EQ(c0.offset(), 0u);
EXPECT_EQ(c0.window_end_offset(), 32u);
EXPECT_EQ(c0.type(), Chunk::Type::kStart);
@@ -225,7 +247,8 @@ TEST_F(ReadTransferMaxBytes32, SetsPendingBytesFromConstructorArg) {
TEST_F(ReadTransferMaxBytes32, SetsPendingBytesFromWriterLimit) {
stream::MemoryWriterBuffer<16> small_writer;
- EXPECT_EQ(OkStatus(), client_.Read(5, small_writer, [](Status) {}));
+ EXPECT_EQ(OkStatus(),
+ legacy_client_.Read(5, small_writer, [](Status) {}).status());
transfer_thread_.WaitUntilEventIsProcessed();
// First transfer parameters chunk is sent.
@@ -235,6 +258,7 @@ TEST_F(ReadTransferMaxBytes32, SetsPendingBytesFromWriterLimit) {
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.session_id(), 5u);
+ EXPECT_EQ(c0.resource_id(), 5u);
EXPECT_EQ(c0.offset(), 0u);
EXPECT_EQ(c0.window_end_offset(), 16u);
EXPECT_EQ(c0.type(), Chunk::Type::kStart);
@@ -244,10 +268,13 @@ TEST_F(ReadTransferMaxBytes32, MultiParameters) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Read(6, writer, [&transfer_status](Status status) {
- transfer_status = status;
- }));
+ ASSERT_EQ(
+ OkStatus(),
+ legacy_client_
+ .Read(6,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; })
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
// First transfer parameters chunk is sent.
@@ -258,6 +285,7 @@ TEST_F(ReadTransferMaxBytes32, MultiParameters) {
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.session_id(), 6u);
+ EXPECT_EQ(c0.resource_id(), 6u);
EXPECT_EQ(c0.offset(), 0u);
ASSERT_EQ(c0.window_end_offset(), 32u);
@@ -301,10 +329,13 @@ TEST_F(ReadTransfer, UnexpectedOffset) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Read(7, writer, [&transfer_status](Status status) {
- transfer_status = status;
- }));
+ ASSERT_EQ(
+ OkStatus(),
+ legacy_client_
+ .Read(7,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; })
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
// First transfer parameters chunk is sent.
@@ -315,6 +346,7 @@ TEST_F(ReadTransfer, UnexpectedOffset) {
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.session_id(), 7u);
+ EXPECT_EQ(c0.resource_id(), 7u);
EXPECT_EQ(c0.offset(), 0u);
EXPECT_EQ(c0.window_end_offset(), 64u);
@@ -371,10 +403,13 @@ TEST_F(ReadTransferMaxBytes32, TooMuchData) {
stream::MemoryWriterBuffer<32> writer;
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Read(8, writer, [&transfer_status](Status status) {
- transfer_status = status;
- }));
+ ASSERT_EQ(
+ OkStatus(),
+ legacy_client_
+ .Read(8,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; })
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
// First transfer parameters chunk is sent.
@@ -385,6 +420,7 @@ TEST_F(ReadTransferMaxBytes32, TooMuchData) {
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.session_id(), 8u);
+ EXPECT_EQ(c0.resource_id(), 8u);
EXPECT_EQ(c0.offset(), 0u);
ASSERT_EQ(c0.window_end_offset(), 32u);
@@ -426,10 +462,13 @@ TEST_F(ReadTransfer, ServerError) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Read(9, writer, [&transfer_status](Status status) {
- transfer_status = status;
- }));
+ ASSERT_EQ(
+ OkStatus(),
+ legacy_client_
+ .Read(9,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; })
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
// First transfer parameters chunk is sent.
@@ -440,6 +479,7 @@ TEST_F(ReadTransfer, ServerError) {
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.session_id(), 9u);
+ EXPECT_EQ(c0.resource_id(), 9u);
EXPECT_EQ(c0.offset(), 0u);
ASSERT_EQ(c0.window_end_offset(), 64u);
@@ -457,10 +497,13 @@ TEST_F(ReadTransfer, OnlySendsParametersOnceAfterDrop) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Read(10, writer, [&transfer_status](Status status) {
- transfer_status = status;
- }));
+ ASSERT_EQ(
+ OkStatus(),
+ legacy_client_
+ .Read(10,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; })
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
// First transfer parameters chunk is sent.
@@ -471,6 +514,7 @@ TEST_F(ReadTransfer, OnlySendsParametersOnceAfterDrop) {
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.session_id(), 10u);
+ EXPECT_EQ(c0.resource_id(), 10u);
EXPECT_EQ(c0.offset(), 0u);
ASSERT_EQ(c0.window_end_offset(), 64u);
@@ -525,10 +569,13 @@ TEST_F(ReadTransfer, ResendsParametersIfSentRepeatedChunkDuringRecovery) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Read(11, writer, [&transfer_status](Status status) {
- transfer_status = status;
- }));
+ ASSERT_EQ(
+ OkStatus(),
+ legacy_client_
+ .Read(11,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; })
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
// First transfer parameters chunk is sent.
@@ -539,6 +586,7 @@ TEST_F(ReadTransfer, ResendsParametersIfSentRepeatedChunkDuringRecovery) {
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.session_id(), 11u);
+ EXPECT_EQ(c0.resource_id(), 11u);
EXPECT_EQ(c0.offset(), 0u);
ASSERT_EQ(c0.window_end_offset(), 64u);
@@ -618,12 +666,16 @@ TEST_F(ReadTransfer, Timeout_ResendsCurrentParameters) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Read(
- 12,
- writer,
- [&transfer_status](Status status) { transfer_status = status; },
- kTestTimeout));
+ ASSERT_EQ(
+ OkStatus(),
+ legacy_client_
+ .Read(
+ 12,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; },
+ kTestTimeout,
+ kTestTimeout)
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
// First transfer parameters chunk is sent.
@@ -634,6 +686,7 @@ TEST_F(ReadTransfer, Timeout_ResendsCurrentParameters) {
Chunk c0 = DecodeChunk(payloads.back());
EXPECT_EQ(c0.session_id(), 12u);
+ EXPECT_EQ(c0.resource_id(), 12u);
EXPECT_EQ(c0.offset(), 0u);
EXPECT_EQ(c0.window_end_offset(), 64u);
EXPECT_EQ(c0.type(), Chunk::Type::kStart);
@@ -675,12 +728,15 @@ TEST_F(ReadTransfer, Timeout_ResendsUpdatedParameters) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Read(
- 13,
- writer,
- [&transfer_status](Status status) { transfer_status = status; },
- kTestTimeout));
+ ASSERT_EQ(
+ OkStatus(),
+ legacy_client_
+ .Read(
+ 13,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; },
+ kTestTimeout)
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
// First transfer parameters chunk is sent.
@@ -691,6 +747,7 @@ TEST_F(ReadTransfer, Timeout_ResendsUpdatedParameters) {
Chunk c0 = DecodeChunk(payloads.back());
EXPECT_EQ(c0.session_id(), 13u);
+ EXPECT_EQ(c0.resource_id(), 13u);
EXPECT_EQ(c0.offset(), 0u);
EXPECT_EQ(c0.window_end_offset(), 64u);
EXPECT_EQ(c0.type(), Chunk::Type::kStart);
@@ -744,12 +801,13 @@ TEST_F(ReadTransfer, Timeout_EndsTransferAfterMaxRetries) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Read(
- 14,
- writer,
- [&transfer_status](Status status) { transfer_status = status; },
- kTestTimeout));
+ Result<Client::Handle> handle = legacy_client_.Read(
+ 14,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; },
+ kTestTimeout,
+ kTestTimeout);
+ ASSERT_EQ(OkStatus(), handle.status());
transfer_thread_.WaitUntilEventIsProcessed();
// First transfer parameters chunk is sent.
@@ -760,6 +818,7 @@ TEST_F(ReadTransfer, Timeout_EndsTransferAfterMaxRetries) {
Chunk c0 = DecodeChunk(payloads.back());
EXPECT_EQ(c0.session_id(), 14u);
+ EXPECT_EQ(c0.resource_id(), 14u);
EXPECT_EQ(c0.offset(), 0u);
EXPECT_EQ(c0.window_end_offset(), 64u);
EXPECT_EQ(c0.type(), Chunk::Type::kStart);
@@ -800,12 +859,13 @@ TEST_F(ReadTransfer, Timeout_ReceivingDataResetsRetryCount) {
constexpr ConstByteSpan data(kData32);
- ASSERT_EQ(OkStatus(),
- client_.Read(
- 14,
- writer,
- [&transfer_status](Status status) { transfer_status = status; },
- kTestTimeout));
+ Result<Client::Handle> handle = legacy_client_.Read(
+ 14,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; },
+ kTestTimeout,
+ kTestTimeout);
+ ASSERT_EQ(OkStatus(), handle.status());
transfer_thread_.WaitUntilEventIsProcessed();
// First transfer parameters chunk is sent.
@@ -816,6 +876,7 @@ TEST_F(ReadTransfer, Timeout_ReceivingDataResetsRetryCount) {
Chunk c0 = DecodeChunk(payloads.back());
EXPECT_EQ(c0.session_id(), 14u);
+ EXPECT_EQ(c0.resource_id(), 14u);
EXPECT_EQ(c0.offset(), 0u);
EXPECT_EQ(c0.window_end_offset(), 64u);
@@ -864,7 +925,7 @@ TEST_F(ReadTransfer, Timeout_ReceivingDataResetsRetryCount) {
EXPECT_EQ(c.window_end_offset(), 64u);
// Ensure we don't leave a dangling reference to transfer_status.
- client_.CancelTransfer(14);
+ handle->Cancel();
transfer_thread_.WaitUntilEventIsProcessed();
}
@@ -875,15 +936,17 @@ TEST_F(ReadTransfer, InitialPacketFails_OnCompletedCalledWithDataLoss) {
context_.output().set_send_status(Status::Unauthenticated());
ASSERT_EQ(OkStatus(),
- client_.Read(
- 14,
- writer,
- [&transfer_status](Status status) {
- ASSERT_EQ(transfer_status,
- Status::Unknown()); // Must only call once
- transfer_status = status;
- },
- kTestTimeout));
+ legacy_client_
+ .Read(
+ 14,
+ writer,
+ [&transfer_status](Status status) {
+ ASSERT_EQ(transfer_status,
+ Status::Unknown()); // Must only call once
+ transfer_status = status;
+ },
+ kTestTimeout)
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_EQ(transfer_status, Status::Internal());
@@ -893,8 +956,12 @@ class WriteTransfer : public ::testing::Test {
protected:
WriteTransfer()
: transfer_thread_(chunk_buffer_, encode_buffer_),
+ legacy_client_(
+ context_.client(), context_.channel().id(), transfer_thread_),
client_(context_.client(), context_.channel().id(), transfer_thread_),
- system_thread_(TransferThreadOptions(), transfer_thread_) {}
+ system_thread_(TransferThreadOptions(), transfer_thread_) {
+ legacy_client_.set_protocol_version(ProtocolVersion::kLegacy);
+ }
~WriteTransfer() override {
transfer_thread_.Terminate();
@@ -904,6 +971,7 @@ class WriteTransfer : public ::testing::Test {
rpc::RawClientTestContext<> context_;
Thread<1, 1> transfer_thread_;
+ Client legacy_client_;
Client client_;
std::array<std::byte, 64> chunk_buffer_;
@@ -917,9 +985,13 @@ TEST_F(WriteTransfer, SingleChunk) {
Status transfer_status = Status::Unknown();
ASSERT_EQ(OkStatus(),
- client_.Write(3, reader, [&transfer_status](Status status) {
- transfer_status = status;
- }));
+ legacy_client_
+ .Write(3,
+ reader,
+ [&transfer_status](Status status) {
+ transfer_status = status;
+ })
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
// The client begins by sending the ID of the resource to transfer.
@@ -974,9 +1046,13 @@ TEST_F(WriteTransfer, MultiChunk) {
Status transfer_status = Status::Unknown();
ASSERT_EQ(OkStatus(),
- client_.Write(4, reader, [&transfer_status](Status status) {
- transfer_status = status;
- }));
+ legacy_client_
+ .Write(4,
+ reader,
+ [&transfer_status](Status status) {
+ transfer_status = status;
+ })
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
// The client begins by sending the ID of the resource to transfer.
@@ -1041,9 +1117,13 @@ TEST_F(WriteTransfer, OutOfOrder_SeekSupported) {
Status transfer_status = Status::Unknown();
ASSERT_EQ(OkStatus(),
- client_.Write(5, reader, [&transfer_status](Status status) {
- transfer_status = status;
- }));
+ legacy_client_
+ .Write(5,
+ reader,
+ [&transfer_status](Status status) {
+ transfer_status = status;
+ })
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
// The client begins by sending the ID of the resource to transfer.
@@ -1121,9 +1201,13 @@ TEST_F(WriteTransfer, OutOfOrder_SeekNotSupported) {
Status transfer_status = Status::Unknown();
ASSERT_EQ(OkStatus(),
- client_.Write(6, reader, [&transfer_status](Status status) {
- transfer_status = status;
- }));
+ legacy_client_
+ .Write(6,
+ reader,
+ [&transfer_status](Status status) {
+ transfer_status = status;
+ })
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
// The client begins by sending the ID of the resource to transfer.
@@ -1163,9 +1247,13 @@ TEST_F(WriteTransfer, ServerError) {
Status transfer_status = Status::Unknown();
ASSERT_EQ(OkStatus(),
- client_.Write(7, reader, [&transfer_status](Status status) {
- transfer_status = status;
- }));
+ legacy_client_
+ .Write(7,
+ reader,
+ [&transfer_status](Status status) {
+ transfer_status = status;
+ })
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
// The client begins by sending the ID of the resource to transfer.
@@ -1194,9 +1282,13 @@ TEST_F(WriteTransfer, AbortIfZeroBytesAreRequested) {
Status transfer_status = Status::Unknown();
ASSERT_EQ(OkStatus(),
- client_.Write(9, reader, [&transfer_status](Status status) {
- transfer_status = status;
- }));
+ legacy_client_
+ .Write(9,
+ reader,
+ [&transfer_status](Status status) {
+ transfer_status = status;
+ })
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
// The client begins by sending the ID of the resource to transfer.
@@ -1234,12 +1326,13 @@ TEST_F(WriteTransfer, Timeout_RetriesWithInitialChunk) {
stream::MemoryReader reader(kData32);
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Write(
- 10,
- reader,
- [&transfer_status](Status status) { transfer_status = status; },
- kTestTimeout));
+ Result<Client::Handle> handle = legacy_client_.Write(
+ 10,
+ reader,
+ [&transfer_status](Status status) { transfer_status = status; },
+ kTestTimeout,
+ kTestTimeout);
+ ASSERT_EQ(OkStatus(), handle.status());
transfer_thread_.WaitUntilEventIsProcessed();
// The client begins by sending the ID of the resource to transfer.
@@ -1267,7 +1360,7 @@ TEST_F(WriteTransfer, Timeout_RetriesWithInitialChunk) {
EXPECT_EQ(transfer_status, Status::Unknown());
// Ensure we don't leave a dangling reference to transfer_status.
- client_.CancelTransfer(10);
+ handle->Cancel();
transfer_thread_.WaitUntilEventIsProcessed();
}
@@ -1275,12 +1368,12 @@ TEST_F(WriteTransfer, Timeout_RetriesWithMostRecentChunk) {
stream::MemoryReader reader(kData32);
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Write(
- 11,
- reader,
- [&transfer_status](Status status) { transfer_status = status; },
- kTestTimeout));
+ Result<Client::Handle> handle = legacy_client_.Write(
+ 11,
+ reader,
+ [&transfer_status](Status status) { transfer_status = status; },
+ kTestTimeout);
+ ASSERT_EQ(OkStatus(), handle.status());
transfer_thread_.WaitUntilEventIsProcessed();
// The client begins by sending the ID of the resource to transfer.
@@ -1340,7 +1433,7 @@ TEST_F(WriteTransfer, Timeout_RetriesWithMostRecentChunk) {
EXPECT_EQ(transfer_status, Status::Unknown());
// Ensure we don't leave a dangling reference to transfer_status.
- client_.CancelTransfer(11);
+ handle->Cancel();
transfer_thread_.WaitUntilEventIsProcessed();
}
@@ -1348,12 +1441,12 @@ TEST_F(WriteTransfer, Timeout_RetriesWithSingleChunkTransfer) {
stream::MemoryReader reader(kData32);
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Write(
- 12,
- reader,
- [&transfer_status](Status status) { transfer_status = status; },
- kTestTimeout));
+ Result<Client::Handle> handle = legacy_client_.Write(
+ 12,
+ reader,
+ [&transfer_status](Status status) { transfer_status = status; },
+ kTestTimeout);
+ ASSERT_EQ(OkStatus(), handle.status());
transfer_thread_.WaitUntilEventIsProcessed();
// The client begins by sending the ID of the resource to transfer.
@@ -1432,12 +1525,13 @@ TEST_F(WriteTransfer, Timeout_EndsTransferAfterMaxRetries) {
stream::MemoryReader reader(kData32);
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Write(
- 13,
- reader,
- [&transfer_status](Status status) { transfer_status = status; },
- kTestTimeout));
+ Result<Client::Handle> handle = legacy_client_.Write(
+ 13,
+ reader,
+ [&transfer_status](Status status) { transfer_status = status; },
+ kTestTimeout,
+ kTestTimeout);
+ ASSERT_EQ(OkStatus(), handle.status());
transfer_thread_.WaitUntilEventIsProcessed();
// The client begins by sending the ID of the resource to transfer.
@@ -1481,7 +1575,7 @@ TEST_F(WriteTransfer, Timeout_EndsTransferAfterMaxRetries) {
ASSERT_EQ(payloads.size(), 4u);
// Ensure we don't leave a dangling reference to transfer_status.
- client_.CancelTransfer(13);
+ handle->Cancel();
transfer_thread_.WaitUntilEventIsProcessed();
}
@@ -1489,12 +1583,12 @@ TEST_F(WriteTransfer, Timeout_NonSeekableReaderEndsTransfer) {
FakeNonSeekableReader reader(kData32);
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Write(
- 14,
- reader,
- [&transfer_status](Status status) { transfer_status = status; },
- kTestTimeout));
+ Result<Client::Handle> handle = legacy_client_.Write(
+ 14,
+ reader,
+ [&transfer_status](Status status) { transfer_status = status; },
+ kTestTimeout);
+ ASSERT_EQ(OkStatus(), handle.status());
transfer_thread_.WaitUntilEventIsProcessed();
// The client begins by sending the ID of the resource to transfer.
@@ -1553,7 +1647,7 @@ TEST_F(WriteTransfer, Timeout_NonSeekableReaderEndsTransfer) {
EXPECT_EQ(transfer_status, Status::DeadlineExceeded());
// Ensure we don't leave a dangling reference to transfer_status.
- client_.CancelTransfer(14);
+ handle->Cancel();
transfer_thread_.WaitUntilEventIsProcessed();
}
@@ -1561,12 +1655,12 @@ TEST_F(WriteTransfer, ManualCancel) {
stream::MemoryReader reader(kData32);
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Write(
- 15,
- reader,
- [&transfer_status](Status status) { transfer_status = status; },
- kTestTimeout));
+ Result<Client::Handle> handle = legacy_client_.Write(
+ 15,
+ reader,
+ [&transfer_status](Status status) { transfer_status = status; },
+ kTestTimeout);
+ ASSERT_EQ(OkStatus(), handle.status());
transfer_thread_.WaitUntilEventIsProcessed();
// The client begins by sending the ID of the resource to transfer.
@@ -1592,7 +1686,7 @@ TEST_F(WriteTransfer, ManualCancel) {
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(payloads.size(), 2u);
- client_.CancelTransfer(15);
+ handle->Cancel();
transfer_thread_.WaitUntilEventIsProcessed();
// Client should send a cancellation chunk to the server.
@@ -1609,12 +1703,13 @@ TEST_F(WriteTransfer, ManualCancel_NoContact) {
stream::MemoryReader reader(kData32);
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Write(
- 15,
- reader,
- [&transfer_status](Status status) { transfer_status = status; },
- kTestTimeout));
+ Result<Client::Handle> handle = legacy_client_.Write(
+ 15,
+ reader,
+ [&transfer_status](Status status) { transfer_status = status; },
+ kTestTimeout,
+ kTestTimeout);
+ ASSERT_EQ(handle.status(), OkStatus());
transfer_thread_.WaitUntilEventIsProcessed();
// The client begins by sending the ID of the resource to transfer.
@@ -1629,7 +1724,7 @@ TEST_F(WriteTransfer, ManualCancel_NoContact) {
EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
// Cancel transfer without a server response. No final chunk should be sent.
- client_.CancelTransfer(15);
+ handle->Cancel();
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(payloads.size(), 1u);
@@ -1637,18 +1732,75 @@ TEST_F(WriteTransfer, ManualCancel_NoContact) {
EXPECT_EQ(transfer_status, Status::Cancelled());
}
+TEST_F(WriteTransfer, ManualCancel_Duplicate) {
+ stream::MemoryReader reader(kData32);
+ Status transfer_status = Status::Unknown();
+
+ Result<Client::Handle> handle = legacy_client_.Write(
+ 16,
+ reader,
+ [&transfer_status](Status status) { transfer_status = status; },
+ kTestTimeout);
+ ASSERT_EQ(OkStatus(), handle.status());
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // The client begins by sending the ID of the resource to transfer.
+ rpc::PayloadsView payloads =
+ context_.output().payloads<Transfer::Write>(context_.channel().id());
+ ASSERT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ Chunk chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.session_id(), 16u);
+ EXPECT_EQ(chunk.resource_id(), 16u);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+
+ // Get a response from the server, then cancel the transfer.
+ context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
+ .set_session_id(16)
+ .set_offset(0)
+ .set_window_end_offset(16) // Request only a single chunk.
+ .set_max_chunk_size_bytes(16)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+ ASSERT_EQ(payloads.size(), 2u);
+
+ handle->Cancel();
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // Client should send a cancellation chunk to the server.
+ ASSERT_EQ(payloads.size(), 3u);
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.session_id(), 16u);
+ ASSERT_EQ(chunk.type(), Chunk::Type::kCompletion);
+ EXPECT_EQ(chunk.status().value(), Status::Cancelled());
+
+ EXPECT_EQ(transfer_status, Status::Cancelled());
+
+ // Attempt to cancel the transfer again.
+ transfer_status = Status::Unknown();
+ handle->Cancel();
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // No further chunks should be sent.
+ EXPECT_EQ(payloads.size(), 3u);
+ EXPECT_EQ(transfer_status, Status::Unknown());
+}
+
TEST_F(ReadTransfer, Version2_SingleChunk) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Read(
- 3,
- writer,
- [&transfer_status](Status status) { transfer_status = status; },
- cfg::kDefaultChunkTimeout,
- cfg::kDefaultChunkTimeout,
- ProtocolVersion::kVersionTwo));
+ ASSERT_EQ(
+ OkStatus(),
+ client_
+ .Read(
+ 3,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultClientTimeout,
+ cfg::kDefaultClientTimeout)
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
@@ -1722,14 +1874,16 @@ TEST_F(ReadTransfer, Version2_ServerRunsLegacy) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Read(
- 3,
- writer,
- [&transfer_status](Status status) { transfer_status = status; },
- cfg::kDefaultChunkTimeout,
- cfg::kDefaultChunkTimeout,
- ProtocolVersion::kVersionTwo));
+ ASSERT_EQ(
+ OkStatus(),
+ client_
+ .Read(
+ 3,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultClientTimeout,
+ cfg::kDefaultClientTimeout)
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
@@ -1781,14 +1935,16 @@ TEST_F(ReadTransfer, Version2_TimeoutDuringHandshake) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Read(
- 3,
- writer,
- [&transfer_status](Status status) { transfer_status = status; },
- cfg::kDefaultChunkTimeout,
- cfg::kDefaultChunkTimeout,
- ProtocolVersion::kVersionTwo));
+ ASSERT_EQ(
+ OkStatus(),
+ client_
+ .Read(
+ 3,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultClientTimeout,
+ cfg::kDefaultClientTimeout)
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
@@ -1871,14 +2027,16 @@ TEST_F(ReadTransfer, Version2_TimeoutAfterHandshake) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Read(
- 3,
- writer,
- [&transfer_status](Status status) { transfer_status = status; },
- cfg::kDefaultChunkTimeout,
- cfg::kDefaultChunkTimeout,
- ProtocolVersion::kVersionTwo));
+ ASSERT_EQ(
+ OkStatus(),
+ client_
+ .Read(
+ 3,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultClientTimeout,
+ cfg::kDefaultClientTimeout)
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
@@ -1965,14 +2123,16 @@ TEST_F(ReadTransfer, Version2_ServerErrorDuringHandshake) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Read(
- 3,
- writer,
- [&transfer_status](Status status) { transfer_status = status; },
- cfg::kDefaultChunkTimeout,
- cfg::kDefaultChunkTimeout,
- ProtocolVersion::kVersionTwo));
+ ASSERT_EQ(
+ OkStatus(),
+ client_
+ .Read(
+ 3,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultClientTimeout,
+ cfg::kDefaultClientTimeout)
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
@@ -2006,14 +2166,16 @@ TEST_F(ReadTransfer, Version2_TimeoutWaitingForCompletionAckRetries) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Read(
- 3,
- writer,
- [&transfer_status](Status status) { transfer_status = status; },
- cfg::kDefaultChunkTimeout,
- cfg::kDefaultChunkTimeout,
- ProtocolVersion::kVersionTwo));
+ ASSERT_EQ(
+ OkStatus(),
+ client_
+ .Read(
+ 3,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultClientTimeout,
+ cfg::kDefaultClientTimeout)
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
@@ -2112,14 +2274,16 @@ TEST_F(ReadTransfer,
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Read(
- 3,
- writer,
- [&transfer_status](Status status) { transfer_status = status; },
- cfg::kDefaultChunkTimeout,
- cfg::kDefaultChunkTimeout,
- ProtocolVersion::kVersionTwo));
+ ASSERT_EQ(
+ OkStatus(),
+ client_
+ .Read(
+ 3,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultClientTimeout,
+ cfg::kDefaultClientTimeout)
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
@@ -2204,14 +2368,16 @@ TEST_F(WriteTransfer, Version2_SingleChunk) {
stream::MemoryReader reader(kData32);
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Write(
- 3,
- reader,
- [&transfer_status](Status status) { transfer_status = status; },
- cfg::kDefaultChunkTimeout,
- cfg::kDefaultChunkTimeout,
- ProtocolVersion::kVersionTwo));
+ ASSERT_EQ(
+ OkStatus(),
+ client_
+ .Write(
+ 3,
+ reader,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultClientTimeout,
+ cfg::kDefaultClientTimeout)
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
// The client begins by sending the ID of the resource to transfer.
@@ -2294,14 +2460,16 @@ TEST_F(WriteTransfer, Version2_ServerRunsLegacy) {
stream::MemoryReader reader(kData32);
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Write(
- 3,
- reader,
- [&transfer_status](Status status) { transfer_status = status; },
- cfg::kDefaultChunkTimeout,
- cfg::kDefaultChunkTimeout,
- ProtocolVersion::kVersionTwo));
+ ASSERT_EQ(
+ OkStatus(),
+ client_
+ .Write(
+ 3,
+ reader,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultClientTimeout,
+ cfg::kDefaultClientTimeout)
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
// The client begins by sending the ID of the resource to transfer.
@@ -2362,14 +2530,16 @@ TEST_F(WriteTransfer, Version2_RetryDuringHandshake) {
stream::MemoryReader reader(kData32);
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Write(
- 3,
- reader,
- [&transfer_status](Status status) { transfer_status = status; },
- cfg::kDefaultChunkTimeout,
- cfg::kDefaultChunkTimeout,
- ProtocolVersion::kVersionTwo));
+ ASSERT_EQ(
+ OkStatus(),
+ client_
+ .Write(
+ 3,
+ reader,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultClientTimeout,
+ cfg::kDefaultClientTimeout)
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
// The client begins by sending the ID of the resource to transfer.
@@ -2460,14 +2630,16 @@ TEST_F(WriteTransfer, Version2_RetryAfterHandshake) {
stream::MemoryReader reader(kData32);
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Write(
- 3,
- reader,
- [&transfer_status](Status status) { transfer_status = status; },
- cfg::kDefaultChunkTimeout,
- cfg::kDefaultChunkTimeout,
- ProtocolVersion::kVersionTwo));
+ ASSERT_EQ(
+ OkStatus(),
+ client_
+ .Write(
+ 3,
+ reader,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultClientTimeout,
+ cfg::kDefaultClientTimeout)
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
// The client begins by sending the ID of the resource to transfer.
@@ -2560,14 +2732,16 @@ TEST_F(WriteTransfer, Version2_ServerErrorDuringHandshake) {
stream::MemoryReader reader(kData32);
Status transfer_status = Status::Unknown();
- ASSERT_EQ(OkStatus(),
- client_.Write(
- 3,
- reader,
- [&transfer_status](Status status) { transfer_status = status; },
- cfg::kDefaultChunkTimeout,
- cfg::kDefaultChunkTimeout,
- ProtocolVersion::kVersionTwo));
+ ASSERT_EQ(
+ OkStatus(),
+ client_
+ .Write(
+ 3,
+ reader,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultClientTimeout,
+ cfg::kDefaultClientTimeout)
+ .status());
transfer_thread_.WaitUntilEventIsProcessed();
// The client begins by sending the ID of the resource to transfer.
@@ -2591,5 +2765,217 @@ TEST_F(WriteTransfer, Version2_ServerErrorDuringHandshake) {
EXPECT_EQ(transfer_status, Status::NotFound());
}
+TEST_F(WriteTransfer, Write_UpdateTransferSize) {
+ FakeNonSeekableReader reader(kData32);
+ Status transfer_status = Status::Unknown();
+
+ Result<Client::Handle> result = client_.Write(
+ 91,
+ reader,
+ [&transfer_status](Status status) { transfer_status = status; },
+ kTestTimeout);
+ ASSERT_EQ(OkStatus(), result.status());
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ Client::Handle handle = *result;
+ handle.SetTransferSize(kData32.size());
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // Initial chunk of the transfer is sent. This chunk should contain all the
+ // fields from both legacy and version 2 protocols for backwards
+ // compatibility.
+ rpc::PayloadsView payloads =
+ context_.output().payloads<Transfer::Write>(context_.channel().id());
+ ASSERT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ Chunk chunk = DecodeChunk(payloads[0]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.desired_session_id(), 1u);
+ EXPECT_EQ(chunk.resource_id(), 91u);
+ EXPECT_EQ(chunk.offset(), 0u);
+
+ // The server responds with a START_ACK, continuing the version 2 handshake.
+ context_.server().SendServerStream<Transfer::Write>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
+ .set_session_id(1)
+ .set_resource_id(91)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(payloads.size(), 2u);
+
+ // Client should accept the session_id with a START_ACK_CONFIRMATION.
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 1u);
+ EXPECT_FALSE(chunk.resource_id().has_value());
+
+ // The server can then begin the data transfer by sending its transfer
+ // parameters. Client should respond with data chunks.
+ rpc::test::WaitForPackets(context_.output(), 5, [this] {
+ context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
+ Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit)
+ .set_session_id(1)
+ .set_offset(0)
+ .set_window_end_offset(64)
+ .set_max_chunk_size_bytes(8)));
+ });
+
+ ASSERT_EQ(payloads.size(), 7u);
+
+ // Each 8-byte chunk of the 32-byte transfer should have an appropriate
+ // `remaining_bytes` value set.
+ chunk = DecodeChunk(payloads[2]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kData);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 1u);
+ EXPECT_EQ(chunk.offset(), 0u);
+ ASSERT_TRUE(chunk.remaining_bytes().has_value());
+ EXPECT_EQ(chunk.remaining_bytes().value(), 24u);
+
+ chunk = DecodeChunk(payloads[3]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kData);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 1u);
+ EXPECT_EQ(chunk.offset(), 8u);
+ ASSERT_TRUE(chunk.remaining_bytes().has_value());
+ EXPECT_EQ(chunk.remaining_bytes().value(), 16u);
+
+ chunk = DecodeChunk(payloads[4]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kData);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 1u);
+ EXPECT_EQ(chunk.offset(), 16u);
+ ASSERT_TRUE(chunk.remaining_bytes().has_value());
+ EXPECT_EQ(chunk.remaining_bytes().value(), 8u);
+
+ chunk = DecodeChunk(payloads[5]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kData);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 1u);
+ EXPECT_EQ(chunk.offset(), 24u);
+ ASSERT_TRUE(chunk.remaining_bytes().has_value());
+ EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
+
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ // Send the final status chunk to complete the transfer.
+ context_.server().SendServerStream<Transfer::Write>(
+ EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 1, OkStatus())));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // Client should acknowledge the completion of the transfer.
+ EXPECT_EQ(payloads.size(), 8u);
+
+ chunk = DecodeChunk(payloads[7]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kCompletionAck);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 1u);
+
+ EXPECT_EQ(transfer_status, OkStatus());
+
+ // Ensure we don't leave a dangling reference to transfer_status.
+ handle.Cancel();
+ transfer_thread_.WaitUntilEventIsProcessed();
+}
+
+TEST_F(WriteTransfer, Write_TransferSize_Large) {
+ FakeNonSeekableReader reader(kData64);
+ Status transfer_status = Status::Unknown();
+
+ Result<Client::Handle> result = client_.Write(
+ 91,
+ reader,
+ [&transfer_status](Status status) { transfer_status = status; },
+ kTestTimeout);
+ ASSERT_EQ(OkStatus(), result.status());
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // Set a large transfer size that will encode to a multibyte varint.
+ constexpr size_t kLargeRemainingBytes = 1u << 28;
+ Client::Handle handle = *result;
+ handle.SetTransferSize(kLargeRemainingBytes);
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // Initial chunk of the transfer is sent. This chunk should contain all the
+ // fields from both legacy and version 2 protocols for backwards
+ // compatibility.
+ rpc::PayloadsView payloads =
+ context_.output().payloads<Transfer::Write>(context_.channel().id());
+ ASSERT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ Chunk chunk = DecodeChunk(payloads[0]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.desired_session_id(), 1u);
+ EXPECT_EQ(chunk.resource_id(), 91u);
+ EXPECT_EQ(chunk.offset(), 0u);
+
+ // The server responds with a START_ACK, continuing the version 2 handshake.
+ context_.server().SendServerStream<Transfer::Write>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
+ .set_session_id(1)
+ .set_resource_id(91)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(payloads.size(), 2u);
+
+ // Client should accept the session_id with a START_ACK_CONFIRMATION.
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 1u);
+ EXPECT_FALSE(chunk.resource_id().has_value());
+
+ // The server can then begin the data transfer by sending its transfer
+ // parameters. Client should respond with data chunks.
+ rpc::test::WaitForPackets(context_.output(), 2, [this] {
+ context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
+ Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit)
+ .set_session_id(1)
+ .set_offset(0)
+ .set_window_end_offset(1024)
+ .set_max_chunk_size_bytes(64)));
+ });
+
+ ASSERT_EQ(payloads.size(), 4u);
+
+ // The transfer should reserve appropriate space for the `remaining_bytes`
+ // value and not fail to encode.
+ chunk = DecodeChunk(payloads[2]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kData);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 1u);
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_EQ(chunk.payload().size_bytes(), 48u);
+ ASSERT_TRUE(chunk.remaining_bytes().has_value());
+ EXPECT_EQ(chunk.remaining_bytes().value(),
+ kLargeRemainingBytes - chunk.payload().size_bytes());
+
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ // Send the final status chunk to complete the transfer.
+ context_.server().SendServerStream<Transfer::Write>(
+ EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 1, OkStatus())));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // Client should acknowledge the completion of the transfer.
+ EXPECT_EQ(payloads.size(), 5u);
+
+ chunk = DecodeChunk(payloads[4]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kCompletionAck);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 1u);
+
+ EXPECT_EQ(transfer_status, OkStatus());
+
+ // Ensure we don't leave a dangling reference to transfer_status.
+ handle.Cancel();
+ transfer_thread_.WaitUntilEventIsProcessed();
+}
+
} // namespace
} // namespace pw::transfer::test
diff --git a/pw_transfer/context.cc b/pw_transfer/context.cc
index 8f64488a3..4cb08c5b4 100644
--- a/pw_transfer/context.cc
+++ b/pw_transfer/context.cc
@@ -1,4 +1,4 @@
-// Copyright 2023 The Pigweed Authors
+// Copyright 2024 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
@@ -13,15 +13,17 @@
// the License.
#define PW_LOG_MODULE_NAME "TRN"
+#define PW_LOG_LEVEL PW_LOG_LEVEL_INFO
#include "pw_transfer/internal/context.h"
#include <chrono>
+#include <limits>
#include "pw_assert/check.h"
#include "pw_chrono/system_clock.h"
#include "pw_log/log.h"
-#include "pw_status/try.h"
+#include "pw_protobuf/serialized_size.h"
#include "pw_transfer/transfer.pwpb.h"
#include "pw_transfer/transfer_thread.h"
#include "pw_varint/varint.h"
@@ -87,6 +89,8 @@ void Context::HandleEvent(const Event& event) {
case EventType::kAddTransferHandler:
case EventType::kRemoveTransferHandler:
case EventType::kTerminate:
+ case EventType::kUpdateClientTransfer:
+ case EventType::kGetResourceStatus:
// These events are intended for the transfer thread and should never be
// forwarded through to a context.
PW_CRASH("Transfer context received a transfer thread event");
@@ -124,6 +128,7 @@ void Context::InitiateTransferAsClient() {
Chunk start_chunk(desired_protocol_version_, Chunk::Type::kStart);
start_chunk.set_desired_session_id(session_id_);
start_chunk.set_resource_id(resource_id_);
+ start_chunk.set_initial_offset(offset_);
if (type() == TransferType::kReceive) {
// Parameters should still be set on the initial chunk for backwards
@@ -135,15 +140,17 @@ void Context::InitiateTransferAsClient() {
}
bool Context::StartTransferAsServer(const NewTransferEvent& new_transfer) {
- PW_LOG_INFO("Starting %s transfer %u for resource %u",
+ PW_LOG_INFO("Starting %s transfer %u for resource %u with offset %u",
new_transfer.type == TransferType::kTransmit ? "read" : "write",
static_cast<unsigned>(new_transfer.session_id),
- static_cast<unsigned>(new_transfer.resource_id));
+ static_cast<unsigned>(new_transfer.resource_id),
+ static_cast<unsigned>(new_transfer.initial_offset));
LogTransferConfiguration();
flags_ |= kFlagsContactMade;
- if (Status status = new_transfer.handler->Prepare(new_transfer.type);
+ if (Status status = new_transfer.handler->Prepare(
+ new_transfer.type, new_transfer.initial_offset);
!status.ok()) {
PW_LOG_WARN("Transfer handler %u prepare failed with status %u",
static_cast<unsigned>(new_transfer.handler->id()),
@@ -154,7 +161,10 @@ bool Context::StartTransferAsServer(const NewTransferEvent& new_transfer) {
// the desired version here, as that comes from the client.
configured_protocol_version_ = desired_protocol_version_;
- status = status.IsPermissionDenied() ? status : Status::DataLoss();
+ status = (status.IsPermissionDenied() || status.IsUnimplemented() ||
+ status.IsResourceExhausted())
+ ? status
+ : Status::DataLoss();
TerminateTransfer(status, /*with_resource_id=*/true);
return false;
}
@@ -239,6 +249,10 @@ void Context::SendTransferParameters(TransmitAction action) {
void Context::EncodeAndSendChunk(const Chunk& chunk) {
last_chunk_sent_ = chunk.type();
+ PW_LOG_DEBUG("Transfer %u sending chunk type %u",
+ id_for_log(),
+ static_cast<unsigned>(last_chunk_sent_));
+
Result<ConstByteSpan> data = chunk.Encode(thread_->encode_buffer());
if (!data.ok()) {
PW_LOG_ERROR("Failed to encode chunk for transfer %u: %d",
@@ -293,7 +307,8 @@ void Context::Initialize(const NewTransferEvent& new_transfer) {
rpc_writer_ = new_transfer.rpc_writer;
stream_ = new_transfer.stream;
- offset_ = 0;
+ offset_ = new_transfer.initial_offset;
+ initial_offset_ = new_transfer.initial_offset;
window_size_ = 0;
window_end_offset_ = 0;
max_chunk_size_bytes_ = new_transfer.max_parameters->max_chunk_size_bytes();
@@ -349,11 +364,18 @@ void Context::PerformInitialHandshake(const Chunk& chunk) {
case Chunk::Type::kStart: {
UpdateLocalProtocolConfigurationFromPeer(chunk);
+ if (type() == TransferType::kReceive) {
+ // Update window end offset so it is valid.
+ window_end_offset_ = offset_;
+ }
+
// This cast is safe as we know we're running in a transfer server.
uint32_t resource_id = static_cast<ServerContext&>(*this).handler()->id();
Chunk start_ack(configured_protocol_version_, Chunk::Type::kStartAck);
- start_ack.set_session_id(session_id_).set_resource_id(resource_id);
+ start_ack.set_session_id(session_id_);
+ start_ack.set_resource_id(resource_id);
+ start_ack.set_initial_offset(offset_);
EncodeAndSendChunk(start_ack);
break;
@@ -362,6 +384,12 @@ void Context::PerformInitialHandshake(const Chunk& chunk) {
// Response packet sent from a server to a client, confirming the protocol
// version and session_id of the transfer.
case Chunk::Type::kStartAck: {
+ // This should confirm the offset we're starting at
+ if (offset_ != chunk.initial_offset()) {
+ TerminateTransfer(Status::Unimplemented());
+ break;
+ }
+
UpdateLocalProtocolConfigurationFromPeer(chunk);
Chunk start_ack_confirmation(configured_protocol_version_,
@@ -402,11 +430,20 @@ void Context::PerformInitialHandshake(const Chunk& chunk) {
case Chunk::Type::kData:
case Chunk::Type::kParametersRetransmit:
case Chunk::Type::kParametersContinue:
+
// Update the local session_id, which will map to the transfer_id of the
// legacy chunk.
session_id_ = chunk.session_id();
configured_protocol_version_ = ProtocolVersion::kLegacy;
+ // Cancel if we are not using at least version 2, and we tried to start a
+ // non-zero offset transfer
+ if (chunk.initial_offset() != 0) {
+ PW_LOG_ERROR("Legacy transfer does not support offset transfers!");
+ TerminateTransfer(Status::Internal());
+ break;
+ }
+
set_transfer_state(TransferState::kWaiting);
PW_LOG_DEBUG(
@@ -497,8 +534,7 @@ void Context::HandleTransferParametersUpdate(const Chunk& chunk) {
// readers support seeking; abort with UNIMPLEMENTED if this handler
// doesn't.
if (offset_ != chunk.offset()) {
- if (Status seek_status = reader().Seek(chunk.offset());
- !seek_status.ok()) {
+ if (Status seek_status = SeekReader(chunk.offset()); !seek_status.ok()) {
PW_LOG_WARN("Transfer %u seek to %u failed with status %u",
static_cast<unsigned>(session_id_),
static_cast<unsigned>(chunk.offset()),
@@ -559,6 +595,12 @@ void Context::TransmitNextChunk(bool retransmit_requested) {
size_t reserved_size =
chunk.EncodedSize() + 1 /* data key */ + 5 /* data size */;
+ size_t total_size = TransferSizeBytes();
+ if (total_size != std::numeric_limits<size_t>::max()) {
+ reserved_size += protobuf::SizeOfVarintField(
+ pwpb::Chunk::Fields::kRemainingBytes, total_size);
+ }
+
ByteSpan buffer = thread_->encode_buffer();
ByteSpan data_buffer = buffer.subspan(reserved_size);
@@ -603,6 +645,10 @@ void Context::TransmitNextChunk(bool retransmit_requested) {
chunk.set_payload(data.value());
last_chunk_offset_ = offset_;
offset_ += data.value().size();
+
+ if (total_size != std::numeric_limits<size_t>::max()) {
+ chunk.set_remaining_bytes(total_size - offset_);
+ }
} else {
PW_LOG_ERROR("Transfer %u Read() failed with status %u",
static_cast<unsigned>(session_id_),
@@ -1021,7 +1067,7 @@ void Context::Retry() {
// Otherwise, resend the most recent chunk. If the reader doesn't support
// seeking, this isn't possible, so just terminate the transfer immediately.
- if (!reader().Seek(last_chunk_offset_).ok()) {
+ if (!SeekReader(last_chunk_offset_).ok()) {
PW_LOG_ERROR("Transmit transfer %u timed out waiting for new parameters.",
id_for_log());
PW_LOG_ERROR("Retrying requires a seekable reader. Alas, ours is not.");
diff --git a/pw_transfer/docs.rst b/pw_transfer/docs.rst
index d63890c2a..5bf3ded4f 100644
--- a/pw_transfer/docs.rst
+++ b/pw_transfer/docs.rst
@@ -88,6 +88,7 @@ for additional information.
return transfer_thread;
}
+.. _pw_transfer-transfer-server:
Transfer server
---------------
@@ -129,6 +130,28 @@ support ephemeral transfer resources.
pw::stream::MemoryReader reader_;
};
+Handlers may optionally implement a `GetStatus` method, which allows clients to
+query the status of a resource with a handler registered. The application layer
+above transfer can choose how to fill and interpret this information. The status
+information is `readable_offset`, `writeable_offset`, `read_checksum`, and
+`write_checksum`.
+
+**Example GetStatus implementation**
+
+.. code-block:: cpp
+
+ Status GetStatus(uint64_t& readable_offset,
+ uint64_t& writeable_offset,
+ uint64_t& read_checksum,
+ uint64_t& write_checksum) {
+ readable_offset = resource.get_size();
+ writeable_offset = resource.get_writeable_offset();
+ read_checksum = resource.get_crc();
+ write_checksum = resource.calculate_crc(0, writeable_offset);
+
+ return pw::OkStatus();
+ }
+
The transfer service is instantiated with a reference to the system's transfer
thread and registered with the system's RPC server.
@@ -174,9 +197,9 @@ an RPC client.
Currently, a transfer client is only capable of running transfers on a single
RPC channel. This may be expanded in the future.
-The transfer client provides the following two APIs for starting data transfers:
+The transfer client provides the following APIs for managing data transfers:
-.. cpp:function:: pw::Status pw::transfer::Client::Read(uint32_t resource_id, pw::stream::Writer& output, CompletionFunc&& on_completion, pw::chrono::SystemClock::duration timeout = cfg::kDefaultChunkTimeout, pw::transfer::ProtocolVersion version = kDefaultProtocolVersion)
+.. cpp:function:: Result<pw::Transfer::Client::Handle> pw::transfer::Client::Read(uint32_t resource_id, pw::stream::Writer& output, CompletionFunc&& on_completion, pw::transfer::ProtocolVersion version = kDefaultProtocolVersion, pw::chrono::SystemClock::duration timeout = cfg::kDefaultClientTimeout, pw::chrono::SystemClock::duration initial_chunk_timeout = cfg::kDefaultInitialChunkTimeout, uint32_t initial_offset = 0u)
Reads data from a transfer server to the specified ``pw::stream::Writer``.
Invokes the provided callback function with the overall status of the
@@ -186,7 +209,9 @@ The transfer client provides the following two APIs for starting data transfers:
return a non-OK status if it is called with bad arguments. Otherwise, it will
return OK and errors will be reported through the completion callback.
-.. cpp:function:: pw::Status pw::transfer::Client::Write(uint32_t resource_id, pw::stream::Reader& input, CompletionFunc&& on_completion, pw::chrono::SystemClock::duration timeout = cfg::kDefaultChunkTimeout, pw::transfer::ProtocolVersion version = kDefaultProtocolVersion)
+ For using the offset parameter, please see :ref:`pw_transfer-nonzero-transfers`.
+
+.. cpp:function:: Result<pw::Transfer::Client::Handle> pw::transfer::Client::Write(uint32_t resource_id, pw::stream::Reader& input, CompletionFunc&& on_completion, pw::transfer::ProtocolVersion version = kDefaultProtocolVersion, pw::chrono::SystemClock::duration timeout = cfg::kDefaultClientTimeout, pw::chrono::SystemClock::duration initial_chunk_timeout = cfg::kDefaultInitialChunkTimeout, uint32_t initial_offset = 0u)
Writes data from a source ``pw::stream::Reader`` to a transfer server.
Invokes the provided callback function with the overall status of the
@@ -196,6 +221,21 @@ The transfer client provides the following two APIs for starting data transfers:
return a non-OK status if it is called with bad arguments. Otherwise, it will
return OK and errors will be reported through the completion callback.
+ For using the offset parameter, please see :ref:`pw_transfer-nonzero-transfers`.
+
+Transfer handles
+^^^^^^^^^^^^^^^^
+Each transfer session initiated by a client returns a ``Handle`` object which
+is used to manage the transfer. These handles support the following operations:
+
+.. cpp:function:: pw::Transfer::Client::Handle::Cancel()
+
+ Terminates the ongoing transfer.
+
+.. cpp:function:: pw::Transfer::Client::Handle::SetTransferSize(size_t size_bytes)
+
+ In a write transfer, indicates the total size of the transfer resource.
+
**Example client setup**
.. code-block:: cpp
@@ -220,19 +260,59 @@ The transfer client provides the following two APIs for starting data transfers:
pw::Status status;
} transfer_state;
- transfer_client.Read(
+ Result<pw::transfer::Client::Handle> handle = transfer_client.Read(
kMagicBufferResourceId,
writer,
[&transfer_state](pw::Status status) {
transfer_state.status = status;
transfer_state.notification.release();
});
+ if (!handle.ok()) {
+ return handle.status();
+ }
// Block until the transfer completes.
transfer_state.notification.acquire();
return transfer_state.status;
}
+Specifying Resource Sizes
+-------------------------
+Transfer data is sent and received through the ``pw::Stream`` interface, which
+does not have a concept of overall stream size. Users of transfers that are
+fixed-size may optionally indicate this to the transfer client and server,
+which will be shared with the transfer peer to enable features such as progress
+reporting.
+
+The transfer size can only be set on the transmitting side of the transfer;
+that is, the client in a ``Write`` transfer or the server in a ``Read``
+transfer.
+
+**Setting a transfer size from a transmitting client**
+
+.. code-block:: c++
+
+ Result<pw::transfer::Client::Handle> handle = client.Write(...);
+ if (handle.ok()) {
+ handle->SetTransferSize(kMyResourceSize);
+ }
+
+**Setting a transfer size on a server resource**
+
+ The ``TransferHandler`` interface allows overriding its ``ResourceSize``
+ function to return the size of its transfer resource.
+
+.. code-block:: c++
+
+ class MyResourceHandler : public pw::transfer::ReadOnlyHandler {
+ public:
+ Status PrepareRead() final;
+
+ virtual size_t ResourceSize() const final {
+ return kMyResourceSize;
+ }
+ };
+
Atomic File Transfer Handler
----------------------------
Transfers are handled using the generic `Handler` interface. A specialized
@@ -242,6 +322,8 @@ always in a correct state. A temporary file is written to prior to updating the
target file. If any transfer failure occurs, the transfer is aborted and the
target file is either not created or not updated.
+.. _module-pw_transfer-config:
+
Module Configuration Options
----------------------------
The following configurations can be adjusted via compile-time configuration of
@@ -249,10 +331,19 @@ this module, see the
:ref:`module documentation <module-structure-compile-time-configuration>` for
more details.
-.. c:macro:: PW_TRANSFER_DEFAULT_MAX_RETRIES
+.. c:macro:: PW_TRANSFER_DEFAULT_MAX_CLIENT_RETRIES
+
+ The default maximum number of times a transfer client should retry sending a
+ chunk when no response is received. Can later be configured per-transfer when
+ starting one.
- The default maximum number of times a transfer should retry sending a chunk
- when no response is received. This can later be configured per-transfer.
+.. c:macro:: PW_TRANSFER_DEFAULT_MAX_SERVER_RETRIES
+
+ The default maximum number of times a transfer server should retry sending a
+ chunk when no response is received.
+
+ In typical setups, retries are driven by the client, and timeouts on the
+ server are used only to clean up resources, so this defaults to 0.
.. c:macro:: PW_TRANSFER_DEFAULT_MAX_LIFETIME_RETRIES
@@ -263,10 +354,16 @@ more details.
expected. Its purpose is to prevent transfers from getting stuck in an
infinite loop.
-.. c:macro:: PW_TRANSFER_DEFAULT_TIMEOUT_MS
+.. c:macro:: PW_TRANSFER_DEFAULT_CLIENT_TIMEOUT_MS
+
+ The default amount of time, in milliseconds, to wait for a chunk to arrive
+ in a transfer client before retrying. This can later be configured
+ per-transfer.
+
+.. c:macro:: PW_TRANSFER_DEFAULT_SERVER_TIMEOUT_MS
The default amount of time, in milliseconds, to wait for a chunk to arrive
- before retrying. This can later be configured per-transfer.
+ on the server before retrying. This can later be configured per-transfer.
.. c:macro:: PW_TRANSFER_DEFAULT_INITIAL_TIMEOUT_MS
@@ -288,6 +385,44 @@ more details.
requested data has been received, a divisor of three will extend at a third
of the window, and so on.
+.. _pw_transfer-nonzero-transfers:
+
+Non-zero Starting Offset Transfers
+----------------------------------
+`pw_transfer` provides for transfers which read from or
+write to a server resource starting from a point after the beginning.
+Handling of read/write/erase boundaries of the resource storage backend must
+be handled by the user through the transfer handler interfaces of `GetStatus`
+and `PrepareRead/Write(uint32_t offset)`.
+
+A resource can be read or written from a non-zero starting offset simply by
+having the transfer client calling `read()` or `write()` with an offset
+parameter. The offset gets included in the starting handshake.
+
+.. note::
+ The data or stream passed to `read()` or `write()` will be used as-is. I.e.
+ no seeking will be applied; the user is expected to seek to the desired
+ location.
+
+On the server side, the offset is accepted, and passed to the transfer
+handler's `Prepare(uint32_t)` method. This method must be implemented
+specifically by the handler in order to support the offset transfer. The
+transfer handler confirms that the start offset is valid for the read/write
+operation, and the server responds with the offset to confirm the non-zero
+transfer operation. Older server sw will ignore the offset, so the clients
+check that the server has accepted the non-zero offset during the handshake, so
+users may elect to catch such errors. Clients return `Status.UNIMPLEMENTED` in
+such cases.
+
+Due to the need to seek streams by the handler to support the non-zero offset,
+it is recommended to return `Status.RESOURCE_EXHAUSTED` if a seek is requested
+past the end of the stream.
+
+See the :ref:`transfer handler <pw_transfer-transfer-server>` documentation for
+further information about configuring resources for non-zero transfers and the
+interface documentation in
+``pw/transfer/public/pw_transfer/handler.h``
+
Python
======
.. automodule:: pw_transfer
@@ -621,6 +756,8 @@ implementations detect if their transfer peer is running the legacy protocol and
automatically switch to it if required, even if they requested a newer protocol
version. It is **strongly** unadvised to use the legacy protocol in new code.
+.. _module-pw_transfer-integration-tests:
+
-----------------
Integration tests
-----------------
@@ -632,14 +769,14 @@ To run the tests on your machine, run
.. code-block:: bash
- $ bazel test --features=c++17 \
+ $ bazel test \
pw_transfer/integration_test:cross_language_small_test \
pw_transfer/integration_test:cross_language_medium_test
.. note:: There is a large test that tests transfers that are megabytes in size.
These are not run automatically, but can be run manually via the
- pw_transfer/integration_test:cross_language_large_test test. These are VERY
- slow, but exist for manual validation of real-world use cases.
+ ``pw_transfer/integration_test:cross_language_large_test`` test. These are
+ VERY slow, but exist for manual validation of real-world use cases.
The integration tests permit injection of client/server/proxy binaries to use
when running the tests. This allows manual testing of older versions of
@@ -665,8 +802,8 @@ The CIPD package contents can be created with this command:
.. code-block::bash
- $ bazel build --features=c++17 pw_transfer/integration_test:server \
- pw_transfer/integration_test:cpp_client
+ $ bazel build pw_transfer/integration_test:server \
+ pw_transfer/integration_test:cpp_client
$ mkdir pw_transfer_test_binaries
$ cp bazel-bin/pw_transfer/integration_test/server \
pw_transfer_test_binaries
@@ -678,7 +815,7 @@ updating a CIPD package <go/pigweed-cipd#installing-packages-into-cipd>`_.
CI/CQ integration
=================
-`Current status of the test in CI <https://ci.chromium.org/p/pigweed/builders/ci/pigweed-integration-transfer>`_.
+`Current status of the test in CI <https://ci.chromium.org/ui/p/pigweed/builders/luci.pigweed.pigweed.ci/pigweed-linux-bzl-integration>`_.
By default, these tests are not run in CQ (on presubmit) because they are too
slow. However, you can request that the tests be run in presubmit on your
@@ -686,7 +823,44 @@ change by adding to following line to the commit message footer:
.. code-block::
- Cq-Include-Trybots: luci.pigweed.try:pigweed-integration-transfer
+ Cq-Include-Trybots: luci.pigweed.try:pigweed-linux-bzl-integration
+
+.. _module-pw_transfer-parallel-tests:
+
+Running the tests many times
+============================
+Because the tests bind to network ports, you cannot run more than one instance
+of each test in parallel. However, you might want to do so, e.g. to debug
+flakes. This section describes a manual process that makes this possible.
+
+Linux
+-----
+On Linux, you can add the ``"block-network"`` tag to the tests (`example
+<https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/181297>`_). This
+enables network isolation for the tests, allowing you to run them in parallel
+via,
+
+.. code-block::
+
+ bazel test --runs_per_test=10 //pw_transfer/integration_tests/...
+
+MacOS
+-----
+Network isolation is not supported on MacOS because the OS doesn't support
+network virtualization (`gh#2669
+<https://github.com/bazelbuild/bazel/issues/2669>`_). The best you can do is to
+tag the tests ``"exclusive"``. This allows you to use ``--runs_per_test``, but
+will force each test to run by itself, with no parallelism.
+
+Why is this manual?
+-------------------
+Ideally, we would apply either the ``"block-network"`` or ``"exclusive"`` tag
+to the tests depending on the OS. But this is not supported, `gh#2971
+<https://github.com/bazelbuild/bazel/issues/2971>`_.
+
+We don't want to tag the tests ``"exclusive"`` by default because that will
+prevent *different* tests from running in parallel, significantly slowing them
+down.
.. toctree::
:hidden:
diff --git a/pw_transfer/handler_test.cc b/pw_transfer/handler_test.cc
index 4a15910cc..93a2b1e16 100644
--- a/pw_transfer/handler_test.cc
+++ b/pw_transfer/handler_test.cc
@@ -14,7 +14,7 @@
#include "pw_transfer/handler.h"
-#include "gtest/gtest.h"
+#include "pw_unit_test/framework.h"
namespace pw::transfer {
namespace {
diff --git a/pw_transfer/integration_test/BUILD.bazel b/pw_transfer/integration_test/BUILD.bazel
index e90d6b067..91c3340d7 100644
--- a/pw_transfer/integration_test/BUILD.bazel
+++ b/pw_transfer/integration_test/BUILD.bazel
@@ -12,8 +12,9 @@
# License for the specific language governing permissions and limitations under
# the License.
-load("@com_google_protobuf//:protobuf.bzl", "py_proto_library")
load("@rules_proto//proto:defs.bzl", "proto_library")
+load("@rules_python//python:defs.bzl", "py_binary", "py_library", "py_test")
+load("@rules_python//python:proto.bzl", "py_proto_library")
load("//pw_build:pigweed.bzl", "pw_cc_binary")
pw_cc_binary(
@@ -65,6 +66,7 @@ py_test(
proto_library(
name = "config_proto",
srcs = ["config.proto"],
+ deps = ["//pw_protobuf:status_proto"],
)
cc_proto_library(
@@ -74,7 +76,7 @@ cc_proto_library(
py_proto_library(
name = "config_pb2",
- srcs = ["config.proto"],
+ deps = [":config_proto"],
)
java_proto_library(
@@ -84,6 +86,7 @@ java_proto_library(
pw_cc_binary(
name = "cpp_client",
+ testonly = True,
srcs = ["client.cc"],
deps = [
":config_cc_proto",
@@ -101,6 +104,7 @@ pw_cc_binary(
py_library(
name = "integration_test_fixture",
+ testonly = True,
srcs = [
"test_fixture.py",
],
@@ -161,11 +165,13 @@ py_test(
# Uses ports 3304 and 3305.
py_test(
name = "cross_language_medium_read_test",
- timeout = "long",
+ timeout = "moderate",
srcs = [
"cross_language_medium_read_test.py",
],
- tags = ["integration"],
+ tags = [
+ "integration",
+ ],
deps = [
":config_pb2",
":integration_test_fixture",
@@ -181,7 +187,9 @@ py_test(
srcs = [
"cross_language_medium_write_test.py",
],
- tags = ["integration"],
+ tags = [
+ "integration",
+ ],
deps = [
":config_pb2",
":integration_test_fixture",
@@ -197,7 +205,9 @@ py_test(
srcs = [
"cross_language_small_test.py",
],
- tags = ["integration"],
+ tags = [
+ "integration",
+ ],
deps = [
":config_pb2",
":integration_test_fixture",
@@ -212,7 +222,9 @@ py_test(
srcs = [
"multi_transfer_test.py",
],
- tags = ["integration"],
+ tags = [
+ "integration",
+ ],
deps = [
":config_pb2",
":integration_test_fixture",
@@ -223,9 +235,11 @@ py_test(
# Uses ports 3312 and 3313.
py_test(
name = "expected_errors_test",
- timeout = "moderate",
+ timeout = "long",
srcs = ["expected_errors_test.py"],
- tags = ["integration"],
+ tags = [
+ "integration",
+ ],
deps = [
":config_pb2",
":integration_test_fixture",
@@ -243,7 +257,9 @@ py_test(
data = [
"@pw_transfer_test_binaries//:all",
],
- tags = ["integration"],
+ tags = [
+ "integration",
+ ],
# Legacy binaries were only built for linux-x86_64.
target_compatible_with = ["@platforms//os:linux"],
deps = [
diff --git a/pw_transfer/integration_test/JavaClient.java b/pw_transfer/integration_test/JavaClient.java
index 834faadcf..b8ad89a4c 100644
--- a/pw_transfer/integration_test/JavaClient.java
+++ b/pw_transfer/integration_test/JavaClient.java
@@ -162,11 +162,14 @@ public class JavaClient {
return config_builder.build();
}
- public static void ReadFromServer(
- int resourceId, Path fileName, TransferClient client, Status expected_status) {
+ public static void ReadFromServer(int resourceId,
+ Path fileName,
+ TransferClient client,
+ Status expected_status,
+ int initial_offset) {
byte[] data;
try {
- data = client.read(resourceId).get();
+ data = client.read(resourceId, initial_offset).get();
} catch (ExecutionException e) {
if (((TransferError) e.getCause()).status() != expected_status) {
throw new AssertionError("Unexpected transfer read failure", e);
@@ -178,6 +181,10 @@ public class JavaClient {
throw new AssertionError("Read from server failed", e);
}
+ if (expected_status != Status.OK) {
+ throw new AssertionError("Transfer succeeded unexpectedly");
+ }
+
try {
Files.write(fileName, data);
} catch (IOException e) {
@@ -186,8 +193,11 @@ public class JavaClient {
}
}
- public static void WriteToServer(
- int resourceId, Path fileName, TransferClient client, Status expected_status) {
+ public static void WriteToServer(int resourceId,
+ Path fileName,
+ TransferClient client,
+ Status expected_status,
+ int initial_offset) {
if (Files.notExists(fileName)) {
logger.atSevere().log("Input file `%s` does not exist", fileName);
}
@@ -201,14 +211,19 @@ public class JavaClient {
}
try {
- client.write(resourceId, data).get();
+ client.write(resourceId, data, initial_offset).get();
} catch (ExecutionException e) {
if (((TransferError) e.getCause()).status() != expected_status) {
throw new AssertionError("Unexpected transfer write failure", e);
}
+ return;
} catch (InterruptedException e) {
throw new AssertionError("Write to server failed", e);
}
+
+ if (expected_status != Status.OK) {
+ throw new AssertionError("Transfer succeeded unexpectedly");
+ }
}
public static void main(String[] args) {
@@ -273,14 +288,25 @@ public class JavaClient {
} else {
client.setProtocolVersion(ProtocolVersion.latest());
}
-
- if (action.getTransferType() == ConfigProtos.TransferAction.TransferType.WRITE_TO_SERVER) {
- WriteToServer(resourceId, fileName, client, Status.fromCode(action.getExpectedStatus()));
- } else if (action.getTransferType()
- == ConfigProtos.TransferAction.TransferType.READ_FROM_SERVER) {
- ReadFromServer(resourceId, fileName, client, Status.fromCode(action.getExpectedStatus()));
- } else {
- throw new AssertionError("Unknown transfer action type");
+ try {
+ if (action.getTransferType() == ConfigProtos.TransferAction.TransferType.WRITE_TO_SERVER) {
+ WriteToServer(resourceId,
+ fileName,
+ client,
+ Status.fromCode(action.getExpectedStatus().getNumber()),
+ action.getInitialOffset());
+ } else if (action.getTransferType()
+ == ConfigProtos.TransferAction.TransferType.READ_FROM_SERVER) {
+ ReadFromServer(resourceId,
+ fileName,
+ client,
+ Status.fromCode(action.getExpectedStatus().getNumber()),
+ action.getInitialOffset());
+ } else {
+ throw new AssertionError("Unknown transfer action type");
+ }
+ } catch (AssertionError e) {
+ System.exit(1);
}
}
diff --git a/pw_transfer/integration_test/client.cc b/pw_transfer/integration_test/client.cc
index 1d932209c..e412c4f15 100644
--- a/pw_transfer/integration_test/client.cc
+++ b/pw_transfer/integration_test/client.cc
@@ -57,10 +57,6 @@ namespace {
// smaller receive buffer size.
constexpr int kMaxSocketSendBufferSize = 1;
-// This client configures a socket read timeout to allow the RPC dispatch thread
-// to exit gracefully.
-constexpr timeval kSocketReadTimeout = {.tv_sec = 1, .tv_usec = 0};
-
thread::Options& TransferThreadOptions() {
static thread::stl::Options options;
return options;
@@ -116,36 +112,50 @@ pw::Status PerformTransferActions(const pw::transfer::ClientConfig& config) {
pw::transfer::TransferAction::TransferType::
TransferAction_TransferType_WRITE_TO_SERVER) {
pw::stream::StdFileReader input(action.file_path().c_str());
- client.Write(
+ pw::Result<pw::transfer::Client::Handle> handle = client.Write(
action.resource_id(),
input,
[&result](Status status) {
result.status = status;
result.completed.release();
},
- pw::transfer::cfg::kDefaultChunkTimeout,
+ protocol_version,
+ pw::transfer::cfg::kDefaultClientTimeout,
pw::transfer::cfg::kDefaultInitialChunkTimeout,
- protocol_version);
- // Wait for the transfer to complete. We need to do this here so that the
- // StdFileReader doesn't go out of scope.
- result.completed.acquire();
+ action.initial_offset());
+ if (handle.ok()) {
+ // Wait for the transfer to complete. We need to do this here so that
+ // the StdFileReader doesn't go out of scope.
+ result.completed.acquire();
+ } else {
+ result.status = handle.status();
+ }
+
+ input.Close();
} else if (action.transfer_type() ==
pw::transfer::TransferAction::TransferType::
TransferAction_TransferType_READ_FROM_SERVER) {
pw::stream::StdFileWriter output(action.file_path().c_str());
- client.Read(
+ pw::Result<pw::transfer::Client::Handle> handle = client.Read(
action.resource_id(),
output,
[&result](Status status) {
result.status = status;
result.completed.release();
},
- pw::transfer::cfg::kDefaultChunkTimeout,
+ protocol_version,
+ pw::transfer::cfg::kDefaultClientTimeout,
pw::transfer::cfg::kDefaultInitialChunkTimeout,
- protocol_version);
- // Wait for the transfer to complete.
- result.completed.acquire();
+ action.initial_offset());
+ if (handle.ok()) {
+ // Wait for the transfer to complete.
+ result.completed.acquire();
+ } else {
+ result.status = handle.status();
+ }
+
+ output.Close();
} else {
PW_LOG_ERROR("Unrecognized transfer action type %d",
action.transfer_type());
@@ -153,10 +163,10 @@ pw::Status PerformTransferActions(const pw::transfer::ClientConfig& config) {
break;
}
- if (result.status.code() != action.expected_status()) {
+ if (int(result.status.code()) != int(action.expected_status())) {
PW_LOG_ERROR("Failed to perform action:\n%s",
action.DebugString().c_str());
- status = result.status;
+ status = result.status.ok() ? Status::Unknown() : result.status;
break;
}
}
@@ -203,8 +213,7 @@ int main(int argc, char* argv[]) {
return 1;
}
- int retval = setsockopt(
- pw::rpc::integration_test::GetClientSocketFd(),
+ int retval = pw::rpc::integration_test::SetClientSockOpt(
SOL_SOCKET,
SO_SNDBUF,
&pw::transfer::integration_test::kMaxSocketSendBufferSize,
@@ -214,17 +223,6 @@ int main(int argc, char* argv[]) {
"Failed to configure socket send buffer size with errno=%d",
errno);
- retval =
- setsockopt(pw::rpc::integration_test::GetClientSocketFd(),
- SOL_SOCKET,
- SO_RCVTIMEO,
- &pw::transfer::integration_test::kSocketReadTimeout,
- sizeof(pw::transfer::integration_test::kSocketReadTimeout));
- PW_CHECK_INT_EQ(retval,
- 0,
- "Failed to configure socket receive timeout with errno=%d",
- errno);
-
if (!pw::transfer::integration_test::PerformTransferActions(config).ok()) {
PW_LOG_INFO("Failed to transfer!");
return 1;
diff --git a/pw_transfer/integration_test/config.proto b/pw_transfer/integration_test/config.proto
index 4fc7de336..8d872fbf2 100644
--- a/pw_transfer/integration_test/config.proto
+++ b/pw_transfer/integration_test/config.proto
@@ -18,6 +18,8 @@ package pw.transfer;
option java_package = "pw.transfer";
option java_outer_classname = "ConfigProtos";
+import "pw_protobuf_protos/status.proto";
+
message TransferAction {
enum ProtocolVersion {
option allow_alias = true;
@@ -48,13 +50,13 @@ message TransferAction {
TransferType transfer_type = 3;
// Expected final status of transfer operation.
- //
- // TODO: b/241456982 - This should be a pw.protobuf.StatusCode, but importing
- // other Pigweed protos doesn't work in Bazel.
- uint32 expected_status = 4;
+ pw.protobuf.StatusCode expected_status = 4;
// Protocol version to initiate the transfer with.
ProtocolVersion protocol_version = 5;
+
+ // Initial offset to start transfer with. Defaults to 0.
+ uint32 initial_offset = 6;
}
// Configuration for the integration test client.
@@ -107,6 +109,9 @@ message ServerResourceLocations {
// If destination_paths is exhausted or empty, this destination path can be
// reused as a fallback indefinitely.
string default_destination_path = 6;
+
+ // Defines whether or not the resource can be read from an offset
+ bool offsettable = 7;
}
// Configuration for the integration test server.
@@ -194,6 +199,12 @@ message ServerFailureConfig {
// repeated for each element in packets_before_failure. After that list
// is exhausted, ServerFailure will send all packets.
repeated uint32 packets_before_failure = 1;
+
+ // By default, the ServerFailure starts counting packets after receiving a
+ // transfer START packet. If `start_immediately` is set to `true`, the filter
+ // will begin counting packets as soon as it is initialized, before the first
+ // START is seen.
+ bool start_immediately = 2;
}
// Configuration for the WindowPacketDropper proxy filter.
diff --git a/pw_transfer/integration_test/cross_language_medium_read_test.py b/pw_transfer/integration_test/cross_language_medium_read_test.py
index 6adea14cd..3fee23fae 100644
--- a/pw_transfer/integration_test/cross_language_medium_read_test.py
+++ b/pw_transfer/integration_test/cross_language_medium_read_test.py
@@ -49,6 +49,12 @@ _ALL_LANGUAGES_AND_VERSIONS = tuple(
itertools.product(_ALL_LANGUAGES, _ALL_VERSIONS)
)
+_ALL_LANGUAGES_V2 = tuple(
+ itertools.product(
+ _ALL_LANGUAGES, [config_pb2.TransferAction.ProtocolVersion.V2]
+ )
+)
+
class MediumTransferReadIntegrationTest(test_fixture.TransferIntegrationTest):
# Each set of transfer tests uses a different client/server port pair to
@@ -151,6 +157,56 @@ class MediumTransferReadIntegrationTest(test_fixture.TransferIntegrationTest):
permanent_resource_id=True,
)
+ @parameterized.expand(_ALL_LANGUAGES_V2)
+ def test_medium_client_read_offset(self, client_type, protocol_version):
+ payload = random.Random(67336391945).randbytes(512)
+ config = self.default_config()
+
+ resource_id = 6
+ self.do_single_read(
+ client_type,
+ config,
+ resource_id,
+ payload,
+ protocol_version,
+ initial_offset=100,
+ offsettable_resources=True,
+ )
+
+ @parameterized.expand(_ALL_LANGUAGES_V2)
+ def test_medium_client_read_offset_with_drops(
+ self, client_type, protocol_version
+ ):
+ payload = random.Random(67336391945).randbytes(1024)
+ config = TransferConfig(
+ self.default_server_config(),
+ self.default_client_config(),
+ text_format.Parse(
+ """
+ client_filter_stack: [
+ { hdlc_packetizer: {} },
+ { keep_drop_queue: {keep_drop_queue: [5, 1]} }
+ ]
+
+ server_filter_stack: [
+ { hdlc_packetizer: {} },
+ { keep_drop_queue: {keep_drop_queue: [5, 1]} }
+ ]""",
+ config_pb2.ProxyConfig(),
+ ),
+ )
+
+ resource_id = 7
+ self.do_single_read(
+ client_type,
+ config,
+ resource_id,
+ payload,
+ protocol_version,
+ initial_offset=100,
+ offsettable_resources=True,
+ )
+
if __name__ == '__main__':
test_fixture.run_tests_for(MediumTransferReadIntegrationTest)
diff --git a/pw_transfer/integration_test/cross_language_medium_write_test.py b/pw_transfer/integration_test/cross_language_medium_write_test.py
index a4e33db9b..26030f9ad 100644
--- a/pw_transfer/integration_test/cross_language_medium_write_test.py
+++ b/pw_transfer/integration_test/cross_language_medium_write_test.py
@@ -48,6 +48,11 @@ _ALL_VERSIONS = (
_ALL_LANGUAGES_AND_VERSIONS = tuple(
itertools.product(_ALL_LANGUAGES, _ALL_VERSIONS)
)
+_ALL_LANGUAGES_V2 = tuple(
+ itertools.product(
+ _ALL_LANGUAGES, [config_pb2.TransferAction.ProtocolVersion.V2]
+ )
+)
class MediumTransferWriteIntegrationTest(test_fixture.TransferIntegrationTest):
@@ -145,6 +150,56 @@ class MediumTransferWriteIntegrationTest(test_fixture.TransferIntegrationTest):
client_type, config, resource_id, payload, protocol_version
)
+ @parameterized.expand(_ALL_LANGUAGES_V2)
+ def test_medium_client_write_offset(self, client_type, protocol_version):
+ """Runs a non-zero starting offset transfer."""
+ payload = random.Random(67336391945).randbytes(512)
+ config = self.default_config()
+ resource_id = 6
+ self.do_single_write(
+ client_type,
+ config,
+ resource_id,
+ payload,
+ protocol_version,
+ initial_offset=100,
+ offsettable_resources=True,
+ )
+
+ @parameterized.expand(_ALL_LANGUAGES_V2)
+ def test_medium_client_write_offset_with_drops(
+ self, client_type, protocol_version
+ ):
+ """Tests a non-zero starting offset transfer with periodic drops."""
+ payload = random.Random(67336391945).randbytes(1024)
+ config = TransferConfig(
+ self.default_server_config(),
+ self.default_client_config(),
+ text_format.Parse(
+ """
+ client_filter_stack: [
+ { hdlc_packetizer: {} },
+ { keep_drop_queue: {keep_drop_queue: [5, 1]} }
+ ]
+
+ server_filter_stack: [
+ { hdlc_packetizer: {} },
+ { keep_drop_queue: {keep_drop_queue: [5, 1]} }
+ ]""",
+ config_pb2.ProxyConfig(),
+ ),
+ )
+ resource_id = 6
+ self.do_single_write(
+ client_type,
+ config,
+ resource_id,
+ payload,
+ protocol_version,
+ initial_offset=100,
+ offsettable_resources=True,
+ )
+
if __name__ == '__main__':
test_fixture.run_tests_for(MediumTransferWriteIntegrationTest)
diff --git a/pw_transfer/integration_test/cross_language_small_test.py b/pw_transfer/integration_test/cross_language_small_test.py
index fd05b7802..611e2d9b6 100644
--- a/pw_transfer/integration_test/cross_language_small_test.py
+++ b/pw_transfer/integration_test/cross_language_small_test.py
@@ -135,6 +135,52 @@ class SmallTransferIntegrationTest(test_fixture.TransferIntegrationTest):
client_type, config, resource_id, payload, protocol_version
)
+ @parameterized.expand(
+ [
+ ("cpp"),
+ ("java"),
+ ("python"),
+ ]
+ )
+ def test_offset_read_offset_to_end(self, client_type):
+ payload = b"Rabbits are the best pets"
+ config = self.default_config()
+ resource_id = 6
+
+ config = self.default_config()
+
+ self.do_single_read(
+ client_type,
+ config,
+ resource_id,
+ payload,
+ initial_offset=len(payload),
+ offsettable_resources=True,
+ )
+
+ @parameterized.expand(
+ [
+ ("cpp"),
+ ("java"),
+ ("python"),
+ ]
+ )
+ def test_offset_write_offset_to_end(self, client_type):
+ payload = b"Rabbits are the best pets"
+ config = self.default_config()
+ resource_id = 6
+
+ config = self.default_config()
+
+ self.do_single_write(
+ client_type,
+ config,
+ resource_id,
+ payload,
+ initial_offset=len(payload) + 1,
+ offsettable_resources=True,
+ )
+
if __name__ == '__main__':
test_fixture.run_tests_for(SmallTransferIntegrationTest)
diff --git a/pw_transfer/integration_test/expected_errors_test.py b/pw_transfer/integration_test/expected_errors_test.py
index 9a12b6879..fdd3896f3 100644
--- a/pw_transfer/integration_test/expected_errors_test.py
+++ b/pw_transfer/integration_test/expected_errors_test.py
@@ -40,8 +40,8 @@ import tempfile
from google.protobuf import text_format
from pigweed.pw_transfer.integration_test import config_pb2
-from pigweed.pw_protobuf.pw_protobuf_protos import status_pb2
from pigweed.pw_transfer.integration_test import test_fixture
+from pw_protobuf_protos import status_pb2
from test_fixture import TransferIntegrationTestHarness, TransferConfig
@@ -105,7 +105,12 @@ class ErrorTransferIntegrationTest(test_fixture.TransferIntegrationTest):
"""
client_filter_stack: [
{ hdlc_packetizer: {} },
- { server_failure: {packets_before_failure: [5]} }
+ {
+ server_failure: {
+ packets_before_failure: [5],
+ start_immediately: true
+ }
+ }
]
server_filter_stack: [
@@ -212,7 +217,9 @@ class ErrorTransferIntegrationTest(test_fixture.TransferIntegrationTest):
]
)
def test_client_read_timeout(self, client_type):
- payload = random.Random(67336391945).randbytes(4321)
+ # This must be > 8192 in order to exceed the window_end default and
+ # cause a timeout on python client
+ payload = random.Random(67336391945).randbytes(10321)
config = TransferConfig(
self.default_server_config(),
self.default_client_config(),
@@ -220,7 +227,12 @@ class ErrorTransferIntegrationTest(test_fixture.TransferIntegrationTest):
"""
client_filter_stack: [
{ hdlc_packetizer: {} },
- { server_failure: {packets_before_failure: [5]} }
+ {
+ server_failure: {
+ packets_before_failure: [5],
+ start_immediately: true
+ }
+ }
]
server_filter_stack: [
@@ -282,11 +294,12 @@ class ErrorTransferIntegrationTest(test_fixture.TransferIntegrationTest):
expected_status=status_pb2.StatusCode.DEADLINE_EXCEEDED,
)
+ # TODO(b/322497823): Re-enable java and python tests when they are fixed.
@parameterized.expand(
[
("cpp"),
- ("java"),
- ("python"),
+ # ("java"),
+ # ("python"),
]
)
def test_data_drop_client_lifetime_timeout(self, client_type):
@@ -331,6 +344,76 @@ class ErrorTransferIntegrationTest(test_fixture.TransferIntegrationTest):
expected_status=status_pb2.StatusCode.DEADLINE_EXCEEDED,
)
+ @parameterized.expand(
+ [
+ ("cpp"),
+ ("java"),
+ ("python"),
+ ]
+ )
+ def test_offset_read_unimpl_handler(self, client_type):
+ payload = b"Rabbits are the best pets"
+ config = self.default_config()
+ resource_id = 5
+
+ config = self.default_config()
+
+ self.do_single_read(
+ client_type,
+ config,
+ resource_id,
+ payload,
+ initial_offset=len(payload),
+ expected_status=status_pb2.StatusCode.UNIMPLEMENTED,
+ )
+
+ @parameterized.expand(
+ [
+ ("cpp"),
+ ("java"),
+ ("python"),
+ ]
+ )
+ def test_offset_write_unimpl_handler(self, client_type):
+ payload = b"Rabbits are the best pets"
+ config = self.default_config()
+ resource_id = 5
+
+ config = self.default_config()
+
+ self.do_single_write(
+ client_type,
+ config,
+ resource_id,
+ payload,
+ initial_offset=len(payload),
+ expected_status=status_pb2.StatusCode.UNIMPLEMENTED,
+ )
+
+ @parameterized.expand(
+ [
+ ("cpp"),
+ ("java"),
+ ("python"),
+ ]
+ )
+ def test_offset_read_invalid_offset(self, client_type):
+ payload = b"Rabbits are the best pets"
+ config = self.default_config()
+ resource_id = 6
+
+ config = self.default_config()
+
+ self.do_single_read(
+ client_type,
+ config,
+ resource_id,
+ payload,
+ initial_offset=len(payload) + 1,
+ offsettable_resources=True,
+ expected_status=status_pb2.StatusCode.RESOURCE_EXHAUSTED,
+ )
+
if __name__ == '__main__':
test_fixture.run_tests_for(ErrorTransferIntegrationTest)
diff --git a/pw_transfer/integration_test/proxy.py b/pw_transfer/integration_test/proxy.py
index 553518f92..d6bf0118d 100644
--- a/pw_transfer/integration_test/proxy.py
+++ b/pw_transfer/integration_test/proxy.py
@@ -36,6 +36,7 @@ from pigweed.pw_rpc.internal import packet_pb2
from pigweed.pw_transfer import transfer_pb2
from pigweed.pw_transfer.integration_test import config_pb2
from pw_hdlc import decode
+from pw_transfer import ProtocolVersion
from pw_transfer.chunk import Chunk
_LOG = logging.getLogger('pw_transfer_intergration_test_proxy')
@@ -75,7 +76,6 @@ class Filter(abc.ABC):
def __init__(self, send_data: Callable[[bytes], Awaitable[None]]):
self.send_data = send_data
- pass
@abc.abstractmethod
async def process(self, data: bytes) -> None:
@@ -143,7 +143,7 @@ class KeepDropQueue(Filter):
queue is looped over unless a negative element is found. A negative number
is effectively the same as a value of infinity.
- This filter is typically most pratical when used with a packetizer so data
+ This filter is typically most practical when used with a packetizer so data
can be dropped as distinct packets.
Examples:
@@ -176,7 +176,7 @@ class KeepDropQueue(Filter):
self._name = name
async def process(self, data: bytes) -> None:
- # Move forward through the queue if neeeded.
+ # Move forward through the queue if needed.
while self._current_count == 0:
self._loop_idx += 1
self._current_count = self._keep_drop_queue[
@@ -296,12 +296,15 @@ class ServerFailure(Filter):
send_data: Callable[[bytes], Awaitable[None]],
name: str,
packets_before_failure_list: List[int],
+ start_immediately: bool = False,
):
super().__init__(send_data)
self._name = name
self._relay_packets = True
self._packets_before_failure_list = packets_before_failure_list
- self.advance_packets_before_failure()
+ self._packets_before_failure = None
+ if start_immediately:
+ self.advance_packets_before_failure()
def advance_packets_before_failure(self):
if len(self._packets_before_failure_list) > 0:
@@ -387,6 +390,7 @@ class EventFilter(Filter):
event_queue: asyncio.Queue,
):
super().__init__(send_data)
+ self._name = name
self._queue = event_queue
async def process(self, data: bytes) -> None:
@@ -417,11 +421,17 @@ def _extract_transfer_chunk(data: bytes) -> Chunk:
for frame in decoder.process(data):
packet = packet_pb2.RpcPacket()
packet.ParseFromString(frame.data)
- raw_chunk = transfer_pb2.Chunk()
- raw_chunk.ParseFromString(packet.payload)
- return Chunk.from_message(raw_chunk)
- raise ValueError("Invalid transfer frame")
+ if packet.payload:
+ raw_chunk = transfer_pb2.Chunk()
+ raw_chunk.ParseFromString(packet.payload)
+ return Chunk.from_message(raw_chunk)
+
+ # The incoming data is expected to be HDLC-packetized, so only one
+ # frame should exist.
+ break
+
+ raise ValueError("Invalid transfer chunk frame")
async def _handle_simplex_events(
@@ -476,7 +486,10 @@ async def _handle_simplex_connection(
elif filter_name == "server_failure":
server_failure = config.server_failure
filter_stack = ServerFailure(
- filter_stack, name, server_failure.packets_before_failure
+ filter_stack,
+ name,
+ server_failure.packets_before_failure,
+ server_failure.start_immediately,
)
event_handlers.append(filter_stack.handle_event)
elif filter_name == "keep_drop_queue":
diff --git a/pw_transfer/integration_test/proxy_test.py b/pw_transfer/integration_test/proxy_test.py
index 5a0599c96..982a7e035 100644
--- a/pw_transfer/integration_test/proxy_test.py
+++ b/pw_transfer/integration_test/proxy_test.py
@@ -24,8 +24,7 @@ import unittest
from pigweed.pw_rpc.internal import packet_pb2
from pigweed.pw_transfer import transfer_pb2
from pw_hdlc import encode
-from pw_transfer import ProtocolVersion
-from pw_transfer.chunk import Chunk
+from pw_transfer.chunk import Chunk, ProtocolVersion
import proxy
@@ -45,10 +44,13 @@ class MockRng(abc.ABC):
class ProxyTest(unittest.IsolatedAsyncioTestCase):
async def test_transposer_simple(self):
sent_packets: List[bytes] = []
+ new_packets_event: asyncio.Event = asyncio.Event()
# Async helper so DataTransposer can await on it.
async def append(list: List[bytes], data: bytes):
list.append(data)
+ # Notify that a new packet was "sent".
+ new_packets_event.set()
transposer = proxy.DataTransposer(
lambda data: append(sent_packets, data),
@@ -61,10 +63,21 @@ class ProxyTest(unittest.IsolatedAsyncioTestCase):
await transposer.process(b'aaaaaaaaaa')
await transposer.process(b'bbbbbbbbbb')
- # Give the transposer task time to process the data.
- await asyncio.sleep(0.05)
-
- self.assertEqual(sent_packets, [b'bbbbbbbbbb', b'aaaaaaaaaa'])
+ expected_packets = [b'bbbbbbbbbb', b'aaaaaaaaaa']
+ while True:
+ # Wait for new packets with a generous timeout.
+ try:
+ await asyncio.wait_for(new_packets_event.wait(), timeout=60.0)
+ except TimeoutError:
+ self.fail(
+ f'Timeout waiting for data. Packets sent: {sent_packets}'
+ )
+
+ # Only assert the sent packets are corrected when we've sent the
+ # expected number.
+ if len(sent_packets) == len(expected_packets):
+ self.assertEqual(sent_packets, expected_packets)
+ return
async def test_transposer_timeout(self):
sent_packets: List[bytes] = []
@@ -104,6 +117,7 @@ class ProxyTest(unittest.IsolatedAsyncioTestCase):
lambda data: append(sent_packets, data),
name="test",
packets_before_failure_list=packets_before_failure.copy(),
+ start_immediately=True,
)
# After passing the list to ServerFailure, add a test for no
@@ -245,6 +259,94 @@ class ProxyTest(unittest.IsolatedAsyncioTestCase):
self.assertEqual(sent_packets, expected_packets)
window_packet_dropper.handle_event(event)
+ async def test_event_filter(self):
+ sent_packets: List[bytes] = []
+
+ # Async helper so EventFilter can await on it.
+ async def append(list: List[bytes], data: bytes):
+ list.append(data)
+
+ queue = asyncio.Queue()
+
+ event_filter = proxy.EventFilter(
+ lambda data: append(sent_packets, data),
+ name="test",
+ event_queue=queue,
+ )
+
+ request = packet_pb2.RpcPacket(
+ type=packet_pb2.PacketType.REQUEST,
+ channel_id=101,
+ service_id=1001,
+ method_id=100001,
+ ).SerializeToString()
+
+ packets = [
+ request,
+ _encode_rpc_frame(
+ Chunk(
+ ProtocolVersion.VERSION_TWO, Chunk.Type.START, session_id=1
+ )
+ ),
+ _encode_rpc_frame(
+ Chunk(
+ ProtocolVersion.VERSION_TWO,
+ Chunk.Type.DATA,
+ session_id=1,
+ data=b'3',
+ )
+ ),
+ _encode_rpc_frame(
+ Chunk(
+ ProtocolVersion.VERSION_TWO,
+ Chunk.Type.DATA,
+ session_id=1,
+ data=b'3',
+ )
+ ),
+ request,
+ _encode_rpc_frame(
+ Chunk(
+ ProtocolVersion.VERSION_TWO, Chunk.Type.START, session_id=2
+ )
+ ),
+ _encode_rpc_frame(
+ Chunk(
+ ProtocolVersion.VERSION_TWO,
+ Chunk.Type.DATA,
+ session_id=2,
+ data=b'4',
+ )
+ ),
+ _encode_rpc_frame(
+ Chunk(
+ ProtocolVersion.VERSION_TWO,
+ Chunk.Type.DATA,
+ session_id=2,
+ data=b'5',
+ )
+ ),
+ ]
+
+ expected_events = [
+ None, # request
+ proxy.Event.TRANSFER_START, # start chunk
+ None, # data chunk
+ None, # data chunk
+ None, # request
+ proxy.Event.TRANSFER_START, # start chunk
+ None, # data chunk
+ None, # data chunk
+ ]
+
+ for packet, expected_event in zip(packets, expected_events):
+ await event_filter.process(packet)
+ try:
+ event = queue.get_nowait()
+ except asyncio.QueueEmpty:
+ event = None
+ self.assertEqual(event, expected_event)
+
def _encode_rpc_frame(chunk: Chunk) -> bytes:
packet = packet_pb2.RpcPacket(
diff --git a/pw_transfer/integration_test/python_client.py b/pw_transfer/integration_test/python_client.py
index 2474cf121..f26d45146 100644
--- a/pw_transfer/integration_test/python_client.py
+++ b/pw_transfer/integration_test/python_client.py
@@ -58,6 +58,7 @@ def _perform_transfer_action(
action.resource_id,
data,
protocol_version=protocol_version,
+ initial_offset=action.initial_offset,
)
except pw_transfer.client.Error as e:
if e.status != Status(action.expected_status):
@@ -65,6 +66,7 @@ def _perform_transfer_action(
"Unexpected error encountered during write transfer"
)
return False
+ return True
except:
_LOG.exception("Transfer (write to server) failed")
return False
@@ -76,6 +78,7 @@ def _perform_transfer_action(
data = transfer_manager.read(
action.resource_id,
protocol_version=protocol_version,
+ initial_offset=action.initial_offset,
)
except pw_transfer.client.Error as e:
if e.status != Status(action.expected_status):
@@ -97,6 +100,9 @@ def _perform_transfer_action(
else:
_LOG.critical("Unknown transfer type: %d", action.transfer_type)
return False
+ if Status(action.expected_status) != Status.OK:
+ _LOG.error("Transfer was not expected to succeed")
+ return False
return True
diff --git a/pw_transfer/integration_test/server.cc b/pw_transfer/integration_test/server.cc
index 98a29f045..92dadfd0d 100644
--- a/pw_transfer/integration_test/server.cc
+++ b/pw_transfer/integration_test/server.cc
@@ -73,12 +73,14 @@ class FileTransferHandler final : public ReadWriteHandler {
std::deque<std::string>&& sources,
std::deque<std::string>&& destinations,
std::string default_source_path,
- std::string default_destination_path)
+ std::string default_destination_path,
+ bool offsettable)
: ReadWriteHandler(resource_id),
sources_(sources),
destinations_(destinations),
default_source_path_(default_source_path),
- default_destination_path_(default_destination_path) {}
+ default_destination_path_(default_destination_path),
+ offsettable(offsettable) {}
~FileTransferHandler() = default;
@@ -101,6 +103,23 @@ class FileTransferHandler final : public ReadWriteHandler {
return OkStatus();
}
+ Status PrepareRead(uint32_t offset) final {
+ if (!offsettable) {
+ return Status::Unimplemented();
+ }
+
+ if (Status status = PrepareRead(); !status.ok()) {
+ return status;
+ }
+
+ if (offset >
+ std::get<stream::StdFileReader>(stream_).ConservativeReadLimit()) {
+ return Status::ResourceExhausted();
+ }
+
+ return std::get<stream::StdFileReader>(stream_).Seek(offset);
+ }
+
void FinalizeRead(Status) final {
std::get<stream::StdFileReader>(stream_).Close();
}
@@ -124,6 +143,24 @@ class FileTransferHandler final : public ReadWriteHandler {
return OkStatus();
}
+ Status PrepareWrite(uint32_t offset) final {
+ if (!offsettable) {
+ return Status::Unimplemented();
+ }
+
+ if (Status status = PrepareWrite(); !status.ok()) {
+ return status;
+ }
+
+ // It does not appear possible to hit this limit
+ if (offset >
+ std::get<stream::StdFileWriter>(stream_).ConservativeWriteLimit()) {
+ return Status::ResourceExhausted();
+ }
+
+ return std::get<stream::StdFileWriter>(stream_).Seek(offset);
+ }
+
Status FinalizeWrite(Status) final {
std::get<stream::StdFileWriter>(stream_).Close();
return OkStatus();
@@ -136,6 +173,7 @@ class FileTransferHandler final : public ReadWriteHandler {
std::string default_destination_path_;
std::variant<std::monostate, stream::StdFileReader, stream::StdFileWriter>
stream_;
+ bool offsettable;
};
void RunServer(int socket_port, ServerConfig config) {
@@ -158,11 +196,11 @@ void RunServer(int socket_port, ServerConfig config) {
thread::Thread transfer_thread_handle =
thread::Thread(thread::stl::Options(), transfer_thread);
- int retval = setsockopt(rpc::system_server::GetServerSocketFd(),
- SOL_SOCKET,
- SO_SNDBUF,
- &kMaxSocketSendBufferSize,
- sizeof(kMaxSocketSendBufferSize));
+ int retval =
+ rpc::system_server::SetServerSockOpt(SOL_SOCKET,
+ SO_SNDBUF,
+ &kMaxSocketSendBufferSize,
+ sizeof(kMaxSocketSendBufferSize));
PW_CHECK_INT_EQ(retval,
0,
"Failed to configure socket send buffer size with errno=%d",
@@ -183,7 +221,8 @@ void RunServer(int socket_port, ServerConfig config) {
std::move(source_paths),
std::move(destination_paths),
resource.second.default_source_path(),
- resource.second.default_destination_path());
+ resource.second.default_destination_path(),
+ resource.second.offsettable());
transfer_service.RegisterHandler(*handler);
handlers.push_back(std::move(handler));
diff --git a/pw_transfer/integration_test/test_fixture.py b/pw_transfer/integration_test/test_fixture.py
index 60323c223..f1877b0ee 100644
--- a/pw_transfer/integration_test/test_fixture.py
+++ b/pw_transfer/integration_test/test_fixture.py
@@ -27,7 +27,7 @@ import unittest
from google.protobuf import text_format
-from pigweed.pw_protobuf.pw_protobuf_protos import status_pb2
+from pw_protobuf_protos import status_pb2
from pigweed.pw_transfer.integration_test import config_pb2
from rules_python.python.runfiles import runfiles
@@ -415,6 +415,8 @@ class TransferIntegrationTest(unittest.TestCase):
protocol_version=config_pb2.TransferAction.ProtocolVersion.LATEST,
permanent_resource_id=False,
expected_status=status_pb2.StatusCode.OK,
+ initial_offset=0,
+ offsettable_resources=False,
) -> None:
"""Performs a single client-to-server write of the provided data."""
with tempfile.NamedTemporaryFile() as f_payload, tempfile.NamedTemporaryFile() as f_server_output:
@@ -426,13 +428,17 @@ class TransferIntegrationTest(unittest.TestCase):
config.server.resources[resource_id].destination_paths.append(
f_server_output.name
)
+ config.server.resources[
+ resource_id
+ ].offsettable = offsettable_resources
config.client.transfer_actions.append(
config_pb2.TransferAction(
resource_id=resource_id,
file_path=f_payload.name,
transfer_type=config_pb2.TransferAction.TransferType.WRITE_TO_SERVER,
protocol_version=protocol_version,
- expected_status=int(expected_status),
+ expected_status=expected_status,
+ initial_offset=initial_offset,
)
)
@@ -447,7 +453,15 @@ class TransferIntegrationTest(unittest.TestCase):
self.assertEqual(exit_codes.client, 0)
self.assertEqual(exit_codes.server, 0)
if expected_status == status_pb2.StatusCode.OK:
- self.assertEqual(f_server_output.read(), data)
+ bytes_output = f_server_output.read()
+ self.assertEqual(
+ bytes_output[initial_offset:],
+ data,
+ )
+ # Ensure we didn't write data to places before offset
+ self.assertEqual(
+ bytes_output[:initial_offset], b'\x00' * initial_offset
+ )
def do_single_read(
self,
@@ -458,6 +472,8 @@ class TransferIntegrationTest(unittest.TestCase):
protocol_version=config_pb2.TransferAction.ProtocolVersion.LATEST,
permanent_resource_id=False,
expected_status=status_pb2.StatusCode.OK,
+ initial_offset=0,
+ offsettable_resources=False,
) -> None:
"""Performs a single server-to-client read of the provided data."""
with tempfile.NamedTemporaryFile() as f_payload, tempfile.NamedTemporaryFile() as f_client_output:
@@ -469,13 +485,17 @@ class TransferIntegrationTest(unittest.TestCase):
config.server.resources[resource_id].source_paths.append(
f_payload.name
)
+ config.server.resources[
+ resource_id
+ ].offsettable = offsettable_resources
config.client.transfer_actions.append(
config_pb2.TransferAction(
resource_id=resource_id,
file_path=f_client_output.name,
transfer_type=config_pb2.TransferAction.TransferType.READ_FROM_SERVER,
protocol_version=protocol_version,
- expected_status=int(expected_status),
+ expected_status=expected_status,
+ initial_offset=initial_offset,
)
)
@@ -489,7 +509,11 @@ class TransferIntegrationTest(unittest.TestCase):
self.assertEqual(exit_codes.client, 0)
self.assertEqual(exit_codes.server, 0)
if expected_status == status_pb2.StatusCode.OK:
- self.assertEqual(f_client_output.read(), data)
+ bytes_output = f_client_output.read()
+ self.assertEqual(
+ bytes_output,
+ data[initial_offset:],
+ )
def do_basic_transfer_sequence(
self,
diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/BUILD.bazel b/pw_transfer/java/main/dev/pigweed/pw_transfer/BUILD.bazel
index dffe540cd..5ea8cc336 100644
--- a/pw_transfer/java/main/dev/pigweed/pw_transfer/BUILD.bazel
+++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/BUILD.bazel
@@ -15,22 +15,24 @@
# Client for the pw_transfer RPC service, which is used to send and receive data
# over pw_rpc.
+TRANSFER_CLIENT_SOURCES = [
+ "ProtocolVersion.java",
+ "ReadTransfer.java",
+ "Transfer.java",
+ "TransferClient.java",
+ "TransferError.java",
+ "TransferEventHandler.java",
+ "TransferParameters.java",
+ "TransferProgress.java",
+ "TransferService.java",
+ "TransferTimeoutSettings.java",
+ "VersionedChunk.java",
+ "WriteTransfer.java",
+]
+
java_library(
name = "client",
- srcs = [
- "ProtocolVersion.java",
- "ReadTransfer.java",
- "Transfer.java",
- "TransferClient.java",
- "TransferError.java",
- "TransferEventHandler.java",
- "TransferParameters.java",
- "TransferProgress.java",
- "TransferService.java",
- "TransferTimeoutSettings.java",
- "VersionedChunk.java",
- "WriteTransfer.java",
- ],
+ srcs = TRANSFER_CLIENT_SOURCES,
visibility = ["//visibility:public"],
deps = [
"//pw_log/java/main/dev/pigweed/pw_log",
@@ -43,3 +45,20 @@ java_library(
"@maven//:com_google_guava_guava",
],
)
+
+android_library(
+ name = "client_android",
+ srcs = TRANSFER_CLIENT_SOURCES,
+ tags = ["manual"], # TODO: b/227771184 - support Android in the Bazel build
+ visibility = ["//visibility:public"],
+ deps = [
+ "//pw_log/java/android_main/dev/pigweed/pw_log",
+ "//pw_rpc/java/main/dev/pigweed/pw_rpc:client_android",
+ "//pw_transfer:transfer_proto_java_lite",
+ "//third_party/google_auto:value",
+ "@com_google_protobuf//java/lite",
+ "@maven//:com_google_code_findbugs_jsr305",
+ "@maven//:com_google_guava_failureaccess",
+ "@maven//:com_google_guava_guava",
+ ],
+)
diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/ReadTransfer.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/ReadTransfer.java
index aa9389b77..7d8c03ffb 100644
--- a/pw_transfer/java/main/dev/pigweed/pw_transfer/ReadTransfer.java
+++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/ReadTransfer.java
@@ -46,7 +46,6 @@ class ReadTransfer extends Transfer<byte[]> {
private long remainingTransferSize = UNKNOWN_TRANSFER_SIZE;
- private int offset = 0;
private int windowEndOffset = 0;
private int lastReceivedOffset = 0;
@@ -58,14 +57,16 @@ class ReadTransfer extends Transfer<byte[]> {
TransferTimeoutSettings timeoutSettings,
TransferParameters transferParameters,
Consumer<TransferProgress> progressCallback,
- BooleanSupplier shouldAbortCallback) {
+ BooleanSupplier shouldAbortCallback,
+ int initialOffset) {
super(resourceId,
sessionId,
desiredProtocolVersion,
transferManager,
timeoutSettings,
progressCallback,
- shouldAbortCallback);
+ shouldAbortCallback,
+ initialOffset);
this.parameters = transferParameters;
this.windowEndOffset = parameters.maxPendingBytes();
}
@@ -81,6 +82,7 @@ class ReadTransfer extends Transfer<byte[]> {
@Override
void prepareInitialChunk(VersionedChunk.Builder chunk) {
+ chunk.setInitialOffset(getOffset());
setTransferParameters(chunk);
}
@@ -101,10 +103,10 @@ class ReadTransfer extends Transfer<byte[]> {
// Track the last seen offset so the DropRecovery state can detect retried packets.
lastReceivedOffset = chunk.offset();
- if (chunk.offset() != offset) {
+ if (chunk.offset() != getOffset()) {
logger.atFine().log("%s expected offset %d, received %d; resending transfer parameters",
ReadTransfer.this,
- offset,
+ getOffset(),
chunk.offset());
// For now, only in-order transfers are supported. If data is received out of order,
@@ -119,7 +121,7 @@ class ReadTransfer extends Transfer<byte[]> {
dataChunks.addAll(chunk.data().asReadOnlyByteBufferList());
totalDataSize += chunk.data().size();
- offset += chunk.data().size();
+ setOffset(getOffset() + chunk.data().size());
if (chunk.remainingBytes().isPresent()) {
if (chunk.remainingBytes().getAsLong() == 0) {
@@ -134,12 +136,12 @@ class ReadTransfer extends Transfer<byte[]> {
}
if (remainingTransferSize == UNKNOWN_TRANSFER_SIZE || remainingTransferSize == 0) {
- updateProgress(offset, offset, UNKNOWN_TRANSFER_SIZE);
+ updateProgress(getOffset(), getOffset(), UNKNOWN_TRANSFER_SIZE);
} else {
- updateProgress(offset, offset, offset + remainingTransferSize);
+ updateProgress(getOffset(), getOffset(), getOffset() + remainingTransferSize);
}
- int remainingWindowSize = windowEndOffset - offset;
+ int remainingWindowSize = windowEndOffset - getOffset();
boolean extendWindow =
remainingWindowSize <= parameters.maxPendingBytes() / EXTEND_WINDOW_DIVISOR;
@@ -158,9 +160,9 @@ class ReadTransfer extends Transfer<byte[]> {
private class DropRecovery extends ActiveState {
@Override
public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException {
- if (chunk.offset() == offset) {
+ if (chunk.offset() == getOffset()) {
logger.atFine().log(
- "%s received expected offset %d, resuming transfer", ReadTransfer.this, offset);
+ "%s received expected offset %d, resuming transfer", ReadTransfer.this, getOffset());
changeState(new ReceivingData()).handleDataChunk(chunk);
return;
}
@@ -176,7 +178,7 @@ class ReadTransfer extends Transfer<byte[]> {
lastReceivedOffset = chunk.offset();
logger.atFiner().log("%s expecting offset %d, ignoring received offset %d",
ReadTransfer.this,
- offset,
+ getOffset(),
chunk.offset());
}
setNextChunkTimeout();
@@ -193,16 +195,15 @@ class ReadTransfer extends Transfer<byte[]> {
}
private VersionedChunk prepareTransferParameters(boolean extend) {
- windowEndOffset = offset + parameters.maxPendingBytes();
+ windowEndOffset = getOffset() + parameters.maxPendingBytes();
Chunk.Type type = extend ? Chunk.Type.PARAMETERS_CONTINUE : Chunk.Type.PARAMETERS_RETRANSMIT;
return setTransferParameters(newChunk(type)).build();
}
private VersionedChunk.Builder setTransferParameters(VersionedChunk.Builder chunk) {
- chunk.setWindowEndOffset(offset + parameters.maxPendingBytes())
- .setMaxChunkSizeBytes(parameters.maxChunkSizeBytes())
- .setOffset(offset)
+ chunk.setMaxChunkSizeBytes(parameters.maxChunkSizeBytes())
+ .setOffset(getOffset())
.setWindowEndOffset(windowEndOffset);
if (parameters.chunkDelayMicroseconds() > 0) {
chunk.setMinDelayMicroseconds(parameters.chunkDelayMicroseconds());
diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java
index 8183fb848..b50b9055f 100644
--- a/pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java
+++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java
@@ -40,6 +40,7 @@ abstract class Transfer<T> extends AbstractFuture<T> {
private final int resourceId;
private final int sessionId;
+ private int offset;
private final ProtocolVersion desiredProtocolVersion;
private final TransferEventHandler.TransferInterface eventHandler;
private final TransferTimeoutSettings timeoutSettings;
@@ -73,9 +74,11 @@ abstract class Transfer<T> extends AbstractFuture<T> {
TransferInterface eventHandler,
TransferTimeoutSettings timeoutSettings,
Consumer<TransferProgress> progressCallback,
- BooleanSupplier shouldAbortCallback) {
+ BooleanSupplier shouldAbortCallback,
+ int initial_offset) {
this.resourceId = resourceId;
this.sessionId = sessionId;
+ this.offset = initial_offset;
this.desiredProtocolVersion = desiredProtocolVersion;
this.eventHandler = eventHandler;
@@ -119,7 +122,11 @@ abstract class Transfer<T> extends AbstractFuture<T> {
return sessionId;
}
- final ProtocolVersion getDesiredProtocolVersionForTest() {
+ public final int getOffset() {
+ return offset;
+ }
+
+ final ProtocolVersion getDesiredProtocolVersion() {
return desiredProtocolVersion;
}
@@ -132,6 +139,10 @@ abstract class Transfer<T> extends AbstractFuture<T> {
return deadline;
}
+ final void setOffset(int offset) {
+ this.offset = offset;
+ }
+
final void setNextChunkTimeout() {
deadline = Instant.now().plusMillis(timeoutSettings.timeoutMillis());
}
@@ -399,6 +410,12 @@ abstract class Transfer<T> extends AbstractFuture<T> {
changeState(getWaitingForDataState());
if (chunk.type() != Chunk.Type.START_ACK) {
+ if (offset != 0) {
+ logger.atWarning().log(
+ "%s aborting due to unsupported non-zero offset transfer: %s", Transfer.this, chunk);
+ setStateTerminatingAndSendFinalChunk(Status.INTERNAL);
+ return;
+ }
logger.atFine().log(
"%s got non-handshake chunk; reverting to legacy protocol", Transfer.this);
configuredProtocolVersion = ProtocolVersion.LEGACY;
@@ -418,6 +435,13 @@ abstract class Transfer<T> extends AbstractFuture<T> {
desiredProtocolVersion,
chunk.version());
+ if (offset != chunk.initialOffset()) {
+ logger.atWarning().log(
+ "%s aborting due to unconfirmed non-zero offset transfer: %s", Transfer.this, chunk);
+ setStateTerminatingAndSendFinalChunk(Status.UNIMPLEMENTED);
+ return;
+ }
+
VersionedChunk.Builder startAckConfirmation = newChunk(Chunk.Type.START_ACK_CONFIRMATION);
prepareInitialChunk(startAckConfirmation);
sendChunk(startAckConfirmation.build());
diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferClient.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferClient.java
index 13779005a..b0c7473f7 100644
--- a/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferClient.java
+++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferClient.java
@@ -35,7 +35,7 @@ public class TransferClient {
private final TransferEventHandler transferEventHandler;
private final Thread transferEventHandlerThread;
- private ProtocolVersion desiredProtocolVersion = ProtocolVersion.LEGACY;
+ private ProtocolVersion desiredProtocolVersion = ProtocolVersion.VERSION_TWO;
/**
* Creates a new transfer client for sending and receiving data with pw_transfer.
@@ -87,7 +87,23 @@ public class TransferClient {
/** Writes the provided data to the given transfer resource. */
public ListenableFuture<Void> write(int resourceId, byte[] data) {
- return write(resourceId, data, transferProgress -> {});
+ return write(resourceId, data, transferProgress -> {}, 0);
+ }
+
+ /**
+ * Writes the provided data to the given transfer resource, calling the progress callback as data
+ * is sent
+ */
+ public ListenableFuture<Void> write(
+ int resourceId, byte[] data, Consumer<TransferProgress> progressCallback) {
+ return write(resourceId, data, progressCallback, 0);
+ }
+
+ /**
+ * Writes the provided data to the given transfer resource, starting at the given initial offset
+ */
+ public ListenableFuture<Void> write(int resourceId, byte[] data, int initialOffset) {
+ return write(resourceId, data, transferProgress -> {}, initialOffset);
}
/**
@@ -97,27 +113,33 @@ public class TransferClient {
* @param resourceId The ID of the resource to which to write
* @param data the data to write
* @param progressCallback called each time a packet is sent
+ * @param initialOffset The offset to start writing to on the server side
*/
public ListenableFuture<Void> write(
- int resourceId, byte[] data, Consumer<TransferProgress> progressCallback) {
- return transferEventHandler.startWriteTransferAsClient(
- resourceId, desiredProtocolVersion, settings, data, progressCallback, shouldAbortCallback);
+ int resourceId, byte[] data, Consumer<TransferProgress> progressCallback, int initialOffset) {
+ return transferEventHandler.startWriteTransferAsClient(resourceId,
+ desiredProtocolVersion,
+ settings,
+ data,
+ progressCallback,
+ shouldAbortCallback,
+ initialOffset);
}
/** Reads the data from the given transfer resource ID. */
public ListenableFuture<byte[]> read(int resourceId) {
- return read(resourceId, DEFAULT_READ_TRANSFER_PARAMETERS, progressCallback -> {});
+ return read(resourceId, DEFAULT_READ_TRANSFER_PARAMETERS, progressCallback -> {}, 0);
}
/** Reads the data for a transfer resource, calling the progress callback as data is received. */
public ListenableFuture<byte[]> read(
int resourceId, Consumer<TransferProgress> progressCallback) {
- return read(resourceId, DEFAULT_READ_TRANSFER_PARAMETERS, progressCallback);
+ return read(resourceId, DEFAULT_READ_TRANSFER_PARAMETERS, progressCallback, 0);
}
/** Reads the data for a transfer resource, using the specified transfer parameters. */
public ListenableFuture<byte[]> read(int resourceId, TransferParameters parameters) {
- return read(resourceId, parameters, (progressCallback) -> {});
+ return read(resourceId, parameters, (progressCallback) -> {}, 0);
}
/**
@@ -125,12 +147,46 @@ public class TransferClient {
*/
public ListenableFuture<byte[]> read(
int resourceId, TransferParameters parameters, Consumer<TransferProgress> progressCallback) {
+ return read(resourceId, parameters, progressCallback, 0);
+ }
+
+ /** Reads the data from the given transfer resource ID. */
+ public ListenableFuture<byte[]> read(int resourceId, int initialOffset) {
+ return read(
+ resourceId, DEFAULT_READ_TRANSFER_PARAMETERS, progressCallback -> {}, initialOffset);
+ }
+
+ /** Reads the data for a transfer resource, calling the progress callback as data is received. */
+ public ListenableFuture<byte[]> read(
+ int resourceId, Consumer<TransferProgress> progressCallback, int initialOffset) {
+ return read(resourceId, DEFAULT_READ_TRANSFER_PARAMETERS, progressCallback, initialOffset);
+ }
+
+ /** Reads the data for a transfer resource, using the specified transfer parameters. */
+ public ListenableFuture<byte[]> read(
+ int resourceId, TransferParameters parameters, int initialOffset) {
+ return read(resourceId, parameters, (progressCallback) -> {}, initialOffset);
+ }
+
+ /**
+ * Reads the data for a transfer resource, using the specified parameters and progress callback.
+ *
+ * @param resourceId The ID of the resource to which to read
+ * @param parameters The transfer parameters to use
+ * @param progressCallback called each time a packet is sent
+ * @param initialOffset The offset to start reading from on the server side
+ */
+ public ListenableFuture<byte[]> read(int resourceId,
+ TransferParameters parameters,
+ Consumer<TransferProgress> progressCallback,
+ int initialOffset) {
return transferEventHandler.startReadTransferAsClient(resourceId,
desiredProtocolVersion,
settings,
parameters,
progressCallback,
- shouldAbortCallback);
+ shouldAbortCallback,
+ initialOffset);
}
/**
diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferEventHandler.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferEventHandler.java
index a37d05117..bbb167f65 100644
--- a/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferEventHandler.java
+++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferEventHandler.java
@@ -73,7 +73,8 @@ class TransferEventHandler {
TransferTimeoutSettings settings,
byte[] data,
Consumer<TransferProgress> progressCallback,
- BooleanSupplier shouldAbortCallback) {
+ BooleanSupplier shouldAbortCallback,
+ int initialOffset) {
WriteTransfer transfer = new WriteTransfer(
resourceId, assignSessionId(), desiredProtocolVersion, new TransferInterface() {
@Override
@@ -88,7 +89,7 @@ class TransferEventHandler {
}
return writeStream;
}
- }, settings, data, progressCallback, shouldAbortCallback);
+ }, settings, data, progressCallback, shouldAbortCallback, initialOffset);
startTransferAsClient(transfer);
return transfer;
}
@@ -98,7 +99,8 @@ class TransferEventHandler {
TransferTimeoutSettings settings,
TransferParameters parameters,
Consumer<TransferProgress> progressCallback,
- BooleanSupplier shouldAbortCallback) {
+ BooleanSupplier shouldAbortCallback,
+ int initialOffset) {
ReadTransfer transfer = new ReadTransfer(
resourceId, assignSessionId(), desiredProtocolVersion, new TransferInterface() {
@Override
@@ -113,7 +115,7 @@ class TransferEventHandler {
}
return readStream;
}
- }, settings, parameters, progressCallback, shouldAbortCallback);
+ }, settings, parameters, progressCallback, shouldAbortCallback, initialOffset);
startTransferAsClient(transfer);
return transfer;
}
@@ -124,6 +126,11 @@ class TransferEventHandler {
throw new AssertionError("Duplicate session ID " + transfer.getSessionId());
}
+ if (transfer.getDesiredProtocolVersion() == ProtocolVersion.LEGACY
+ && transfer.getOffset() != 0) {
+ throw new AssertionError("Cannot start non-zero offset transfer with legacy version");
+ }
+
// The v2 protocol supports multiple transfers for a single resource. For simplicity while
// supporting both protocols, only support a single transfer per resource.
if (legacyIdToSessionId.containsKey(transfer.getResourceId())) {
diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferService.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferService.java
index c11151fcd..a909f712b 100644
--- a/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferService.java
+++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/TransferService.java
@@ -15,6 +15,7 @@
package dev.pigweed.pw_transfer;
import static dev.pigweed.pw_rpc.Service.bidirectionalStreamingMethod;
+import static dev.pigweed.pw_rpc.Service.unaryMethod;
import dev.pigweed.pw_rpc.Service;
@@ -22,7 +23,8 @@ import dev.pigweed.pw_rpc.Service;
public class TransferService {
private static final Service SERVICE = new Service("pw.transfer.Transfer",
bidirectionalStreamingMethod("Read", Chunk.parser(), Chunk.parser()),
- bidirectionalStreamingMethod("Write", Chunk.parser(), Chunk.parser()));
+ bidirectionalStreamingMethod("Write", Chunk.parser(), Chunk.parser()),
+ unaryMethod("GetResourceStatus", ResourceStatusRequest.parser(), ResourceStatus.parser()));
public static Service get() {
return SERVICE;
diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/VersionedChunk.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/VersionedChunk.java
index 27261888b..9481e167e 100644
--- a/pw_transfer/java/main/dev/pigweed/pw_transfer/VersionedChunk.java
+++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/VersionedChunk.java
@@ -39,6 +39,8 @@ abstract class VersionedChunk {
public abstract int offset();
+ public abstract int initialOffset();
+
public abstract int windowEndOffset();
public abstract ByteString data();
@@ -58,7 +60,8 @@ abstract class VersionedChunk {
.setSessionId(UNKNOWN_SESSION_ID)
.setOffset(0)
.setWindowEndOffset(0)
- .setData(ByteString.EMPTY);
+ .setData(ByteString.EMPTY)
+ .setInitialOffset(0);
}
@AutoValue.Builder
@@ -73,6 +76,8 @@ abstract class VersionedChunk {
public abstract Builder setOffset(int offset);
+ public abstract Builder setInitialOffset(int initialOffset);
+
public abstract Builder setWindowEndOffset(int windowEndOffset);
public abstract Builder setData(ByteString data);
@@ -141,6 +146,8 @@ abstract class VersionedChunk {
builder.setOffset((int) chunk.getOffset()).setData(chunk.getData());
+ builder.setInitialOffset((int) chunk.getInitialOffset());
+
if (chunk.hasResourceId()) {
builder.setResourceId(chunk.getResourceId());
}
@@ -181,7 +188,8 @@ abstract class VersionedChunk {
.setType(type())
.setOffset(offset())
.setWindowEndOffset(windowEndOffset())
- .setData(data());
+ .setData(data())
+ .setInitialOffset(initialOffset());
remainingBytes().ifPresent(chunk::setRemainingBytes);
maxChunkSizeBytes().ifPresent(chunk::setMaxChunkSizeBytes);
diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/WriteTransfer.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/WriteTransfer.java
index d72149f5f..9ea5f45c5 100644
--- a/pw_transfer/java/main/dev/pigweed/pw_transfer/WriteTransfer.java
+++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/WriteTransfer.java
@@ -32,7 +32,7 @@ class WriteTransfer extends Transfer<Void> {
private int maxChunkSizeBytes = 0;
private int minChunkDelayMicros = 0;
- private int sentOffset;
+ private int initialOffset;
private long totalDroppedBytes;
private final byte[] data;
@@ -44,19 +44,23 @@ class WriteTransfer extends Transfer<Void> {
TransferTimeoutSettings timeoutSettings,
byte[] data,
Consumer<TransferProgress> progressCallback,
- BooleanSupplier shouldAbortCallback) {
+ BooleanSupplier shouldAbortCallback,
+ int initialOffset) {
super(resourceId,
sessionId,
desiredProtocolVersion,
transferManager,
timeoutSettings,
progressCallback,
- shouldAbortCallback);
+ shouldAbortCallback,
+ initialOffset);
this.data = data;
+ this.initialOffset = initialOffset;
}
@Override
void prepareInitialChunk(VersionedChunk.Builder chunk) {
+ chunk.setInitialOffset(getOffset());
chunk.setRemainingBytes(data.length);
}
@@ -90,23 +94,23 @@ class WriteTransfer extends Transfer<Void> {
@Override
public void handleTimeout() throws TransferAbortedException {
ByteString chunkData = ByteString.copyFrom(
- data, sentOffset, min(windowEndOffset - sentOffset, maxChunkSizeBytes));
+ data, getOffset() - initialOffset, min(windowEndOffset - getOffset(), maxChunkSizeBytes));
if (VERBOSE_LOGGING) {
logger.atFinest().log("%s sending bytes %d-%d (%d B chunk, max size %d B)",
WriteTransfer.this,
- sentOffset,
- sentOffset + chunkData.size() - 1,
+ getOffset(),
+ getOffset() + chunkData.size() - 1,
chunkData.size(),
maxChunkSizeBytes);
}
sendChunk(buildDataChunk(chunkData));
- sentOffset += chunkData.size();
- updateProgress(sentOffset, windowStartOffset, data.length);
+ setOffset(getOffset() + chunkData.size());
+ updateProgress(getOffset(), windowStartOffset, data.length + initialOffset);
- if (sentOffset < windowEndOffset) {
+ if (getOffset() < windowEndOffset) {
setTimeoutMicros(minChunkDelayMicros);
return; // Keep transmitting packets
}
@@ -132,24 +136,24 @@ class WriteTransfer extends Transfer<Void> {
private void updateTransferParameters(VersionedChunk chunk) throws TransferAbortedException {
logger.atFiner().log("%s received new chunk %s", this, chunk);
- if (chunk.offset() > data.length) {
+ if (chunk.offset() > data.length + initialOffset) {
setStateTerminatingAndSendFinalChunk(Status.OUT_OF_RANGE);
return;
}
- int windowEndOffset = min(chunk.windowEndOffset(), data.length);
+ int windowEndOffset = min(chunk.windowEndOffset(), data.length + initialOffset);
if (chunk.requestsTransmissionFromOffset()) {
- long droppedBytes = sentOffset - chunk.offset();
+ long droppedBytes = getOffset() - chunk.offset();
if (droppedBytes > 0) {
totalDroppedBytes += droppedBytes;
logger.atFine().log("%s retransmitting %d B (%d retransmitted of %d sent)",
this,
droppedBytes,
totalDroppedBytes,
- sentOffset);
+ getOffset());
}
- sentOffset = chunk.offset();
- } else if (windowEndOffset <= sentOffset) {
+ setOffset(chunk.offset());
+ } else if (windowEndOffset <= getOffset()) {
logger.atFiner().log("%s ignoring old rolling window packet", this);
setNextChunkTimeout();
return; // Received an old rolling window packet, ignore it.
@@ -164,13 +168,13 @@ class WriteTransfer extends Transfer<Void> {
});
if (maxChunkSizeBytes == 0) {
- if (windowEndOffset == sentOffset) {
+ if (windowEndOffset == getOffset()) {
logger.atWarning().log("%s server requested 0 bytes; aborting", this);
setStateTerminatingAndSendFinalChunk(Status.INVALID_ARGUMENT);
return;
}
// Default to sending the entire window if the max chunk size is not specified (or is 0).
- maxChunkSizeBytes = windowEndOffset - sentOffset;
+ maxChunkSizeBytes = windowEndOffset - getOffset();
}
// Enter the transmitting state and immediately send the first packet
@@ -179,10 +183,10 @@ class WriteTransfer extends Transfer<Void> {
private VersionedChunk buildDataChunk(ByteString chunkData) {
VersionedChunk.Builder chunk =
- newChunk(Chunk.Type.DATA).setOffset(sentOffset).setData(chunkData);
+ newChunk(Chunk.Type.DATA).setOffset(getOffset()).setData(chunkData);
// If this is the last data chunk, setRemainingBytes to 0.
- if (sentOffset + chunkData.size() == data.length) {
+ if (getOffset() + chunkData.size() == data.length + initialOffset) {
logger.atFiner().log("%s sending final chunk with %d B", this, chunkData.size());
chunk.setRemainingBytes(0);
}
diff --git a/pw_transfer/java/test/dev/pigweed/pw_transfer/BUILD.bazel b/pw_transfer/java/test/dev/pigweed/pw_transfer/BUILD.bazel
index af824bb39..38f53e3b8 100644
--- a/pw_transfer/java/test/dev/pigweed/pw_transfer/BUILD.bazel
+++ b/pw_transfer/java/test/dev/pigweed/pw_transfer/BUILD.bazel
@@ -23,7 +23,6 @@ java_test(
test_class = "dev.pigweed.pw_transfer.TransferClientTest",
visibility = ["//visibility:public"],
deps = [
- "//pw_log/java/main/dev/pigweed/pw_log",
"//pw_rpc/java/main/dev/pigweed/pw_rpc:client",
"//pw_rpc/java/test/dev/pigweed/pw_rpc:test_client",
"//pw_transfer:transfer_proto_java_lite",
diff --git a/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java b/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java
index 898f0e9b0..99488b2b1 100644
--- a/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java
+++ b/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java
@@ -2461,8 +2461,8 @@ public final class TransferClientTest {
.setWindowEndOffset(transfer.getParametersForTest().maxPendingBytes())
.setMaxChunkSizeBytes(transfer.getParametersForTest().maxChunkSizeBytes())
.setOffset(0);
- if (transfer.getDesiredProtocolVersionForTest() != ProtocolVersion.LEGACY) {
- chunk.setProtocolVersion(transfer.getDesiredProtocolVersionForTest().ordinal());
+ if (transfer.getDesiredProtocolVersion() != ProtocolVersion.LEGACY) {
+ chunk.setProtocolVersion(transfer.getDesiredProtocolVersion().ordinal());
chunk.setDesiredSessionId(transfer.getSessionId());
}
if (transfer.getParametersForTest().chunkDelayMicroseconds() > 0) {
@@ -2511,7 +2511,7 @@ public final class TransferClientTest {
Chunk.Builder chunk = newLegacyChunk(Chunk.Type.START, transfer.getResourceId())
.setResourceId(transfer.getResourceId())
.setRemainingBytes(size);
- if (transfer.getDesiredProtocolVersionForTest() != ProtocolVersion.LEGACY) {
+ if (transfer.getDesiredProtocolVersion() != ProtocolVersion.LEGACY) {
chunk.setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal());
chunk.setDesiredSessionId(transfer.getSessionId());
}
diff --git a/pw_transfer/public/pw_transfer/client.h b/pw_transfer/public/pw_transfer/client.h
index 62931daf8..90dcdebe2 100644
--- a/pw_transfer/public/pw_transfer/client.h
+++ b/pw_transfer/public/pw_transfer/client.h
@@ -1,4 +1,4 @@
-// Copyright 2023 The Pigweed Authors
+// Copyright 2024 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
@@ -13,12 +13,11 @@
// the License.
#pragma once
-#include <array>
-
#include "pw_function/function.h"
+#include "pw_result/result.h"
+#include "pw_rpc/client.h"
#include "pw_status/status.h"
#include "pw_stream/stream.h"
-#include "pw_transfer/internal/client_context.h"
#include "pw_transfer/internal/config.h"
#include "pw_transfer/transfer.raw_rpc.pb.h"
#include "pw_transfer/transfer_thread.h"
@@ -27,6 +26,41 @@ namespace pw::transfer {
class Client {
public:
+ /// A handle to an active transfer. Used to manage the transfer during its
+ /// operation.
+ class Handle {
+ public:
+ constexpr Handle() : client_(nullptr), id_(kUnassignedHandleId) {}
+
+ /// Terminates the transfer.
+ void Cancel() {
+ if (client_ != nullptr) {
+ client_->CancelTransfer(*this);
+ }
+ }
+
+ /// In a `Write()` transfer, updates the size of the resource being
+ /// transferred. This size will be indicated to the server.
+ void SetTransferSize(size_t size_bytes) {
+ if (client_ != nullptr) {
+ client_->UpdateTransferSize(*this, size_bytes);
+ }
+ }
+
+ private:
+ friend class Client;
+
+ static constexpr uint32_t kUnassignedHandleId = 0;
+
+ explicit constexpr Handle(Client* client, uint32_t id)
+ : client_(client), id_(id) {}
+ constexpr uint32_t id() const { return id_; }
+ constexpr bool is_unassigned() const { return id_ == kUnassignedHandleId; }
+
+ Client* client_;
+ uint32_t id_;
+ };
+
using CompletionFunc = Function<void(Status)>;
// Initializes a transfer client on a specified RPC client and channel.
@@ -57,14 +91,16 @@ class Client {
TransferThread& transfer_thread,
size_t max_bytes_to_receive = 0,
uint32_t extend_window_divisor = cfg::kDefaultExtendWindowDivisor)
- : client_(rpc_client, channel_id),
+ : default_protocol_version(ProtocolVersion::kLatest),
+ client_(rpc_client, channel_id),
transfer_thread_(transfer_thread),
+ next_handle_id_(1),
max_parameters_(max_bytes_to_receive > 0
? max_bytes_to_receive
: transfer_thread.max_chunk_size(),
transfer_thread.max_chunk_size(),
extend_window_divisor),
- max_retries_(cfg::kDefaultMaxRetries),
+ max_retries_(cfg::kDefaultMaxClientRetries),
max_lifetime_retries_(cfg::kDefaultMaxLifetimeRetries),
has_read_stream_(false),
has_write_stream_(false) {}
@@ -73,32 +109,63 @@ class Client {
// the server is written to the provided writer. Returns OK if the transfer is
// successfully started. When the transfer finishes (successfully or not), the
// completion callback is invoked with the overall status.
- Status Read(uint32_t resource_id,
- stream::Writer& output,
- CompletionFunc&& on_completion,
- chrono::SystemClock::duration timeout = cfg::kDefaultChunkTimeout,
- chrono::SystemClock::duration initial_chunk_timeout =
- cfg::kDefaultInitialChunkTimeout,
- ProtocolVersion version = kDefaultProtocolVersion);
+ Result<Handle> Read(
+ uint32_t resource_id,
+ stream::Writer& output,
+ CompletionFunc&& on_completion,
+ ProtocolVersion protocol_version,
+ chrono::SystemClock::duration timeout = cfg::kDefaultClientTimeout,
+ chrono::SystemClock::duration initial_chunk_timeout =
+ cfg::kDefaultInitialChunkTimeout,
+ uint32_t initial_offset = 0u);
+
+ Result<Handle> Read(
+ uint32_t resource_id,
+ stream::Writer& output,
+ CompletionFunc&& on_completion,
+ chrono::SystemClock::duration timeout = cfg::kDefaultClientTimeout,
+ chrono::SystemClock::duration initial_chunk_timeout =
+ cfg::kDefaultInitialChunkTimeout,
+ uint32_t initial_offset = 0u) {
+ return Read(resource_id,
+ output,
+ std::move(on_completion),
+ default_protocol_version,
+ timeout,
+ initial_chunk_timeout,
+ initial_offset);
+ }
// Begins a new write transfer for the given resource ID. Data from the
// provided reader is sent to the server. When the transfer finishes
// (successfully or not), the completion callback is invoked with the overall
// status.
- Status Write(
+ Result<Handle> Write(
uint32_t resource_id,
stream::Reader& input,
CompletionFunc&& on_completion,
- chrono::SystemClock::duration timeout = cfg::kDefaultChunkTimeout,
+ ProtocolVersion protocol_version,
+ chrono::SystemClock::duration timeout = cfg::kDefaultClientTimeout,
chrono::SystemClock::duration initial_chunk_timeout =
cfg::kDefaultInitialChunkTimeout,
- ProtocolVersion version = kDefaultProtocolVersion);
+ uint32_t initial_offset = 0u);
- // Terminates an ongoing transfer for the specified resource ID.
- //
- // TODO(frolv): This should not take a resource_id, but a handle to an active
- // transfer session.
- void CancelTransfer(uint32_t resource_id);
+ Result<Handle> Write(
+ uint32_t resource_id,
+ stream::Reader& input,
+ CompletionFunc&& on_completion,
+ chrono::SystemClock::duration timeout = cfg::kDefaultClientTimeout,
+ chrono::SystemClock::duration initial_chunk_timeout =
+ cfg::kDefaultInitialChunkTimeout,
+ uint32_t initial_offset = 0u) {
+ return Write(resource_id,
+ input,
+ std::move(on_completion),
+ default_protocol_version,
+ timeout,
+ initial_chunk_timeout,
+ initial_offset);
+ }
Status set_extend_window_divisor(uint32_t extend_window_divisor) {
if (extend_window_divisor <= 1) {
@@ -125,17 +192,37 @@ class Client {
return OkStatus();
}
+ constexpr void set_protocol_version(ProtocolVersion new_version) {
+ default_protocol_version = new_version;
+ }
+
private:
- static constexpr ProtocolVersion kDefaultProtocolVersion =
- ProtocolVersion::kLegacy;
+ // Terminates an ongoing transfer.
+ void CancelTransfer(Handle handle) {
+ if (!handle.is_unassigned()) {
+ transfer_thread_.CancelClientTransfer(handle.id());
+ }
+ }
+
+ void UpdateTransferSize(Handle handle, size_t transfer_size_bytes) {
+ if (!handle.is_unassigned()) {
+ transfer_thread_.UpdateClientTransfer(handle.id(), transfer_size_bytes);
+ }
+ }
+
+ ProtocolVersion default_protocol_version;
using Transfer = pw_rpc::raw::Transfer;
void OnRpcError(Status status, internal::TransferType type);
+ Handle AssignHandle();
+
Transfer::Client client_;
TransferThread& transfer_thread_;
+ uint32_t next_handle_id_;
+
internal::TransferParameters max_parameters_;
uint32_t max_retries_;
uint32_t max_lifetime_retries_;
diff --git a/pw_transfer/public/pw_transfer/handler.h b/pw_transfer/public/pw_transfer/handler.h
index 55df84db1..5f6332a51 100644
--- a/pw_transfer/public/pw_transfer/handler.h
+++ b/pw_transfer/public/pw_transfer/handler.h
@@ -1,4 +1,4 @@
-// Copyright 2022 The Pigweed Authors
+// Copyright 2024 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
@@ -13,6 +13,8 @@
// the License.
#pragma once
+#include <limits>
+
#include "pw_assert/assert.h"
#include "pw_containers/intrusive_list.h"
#include "pw_status/status.h"
@@ -45,6 +47,17 @@ class Handler : public IntrusiveList<Handler>::Item {
// Status::Unimplemented() indicates that reads are not supported.
virtual Status PrepareRead() = 0;
+ // Called at the beginning of a non-zero read transfer. The stream::Reader
+ // must be ready to read at the given offset after a successful PrepareRead()
+ // call. Returning a non-OK status aborts the read.
+ // The read handler should verify that the offset is a valid read point for
+ // the resource.
+ //
+ // Status::Unimplemented() indicates that non-zero reads are not supported.
+ virtual Status PrepareRead([[maybe_unused]] uint32_t offset) {
+ return Status::Unimplemented();
+ }
+
// FinalizeRead() is called at the end of a read transfer. The status argument
// indicates whether the data transfer was successful or not.
virtual void FinalizeRead(Status) {}
@@ -56,6 +69,17 @@ class Handler : public IntrusiveList<Handler>::Item {
// Status::Unimplemented() indicates that writes are not supported.
virtual Status PrepareWrite() = 0;
+ // Called at the beginning of a non-zero write transfer. The stream::writer
+ // must be ready to write at the given offset after a successful
+ // PrepareWrite() call. Returning a non-OK status aborts the write. The write
+ // handler should verify that the offset is a valid write point for the
+ // resource, and that the resource is prepared to write at that offset.
+ //
+ // Status::Unimplemented() indicates that non-zero writes are not supported.
+ virtual Status PrepareWrite([[maybe_unused]] uint32_t offset) {
+ return Status::Unimplemented();
+ }
+
// FinalizeWrite() is called at the end of a write transfer. The status
// argument indicates whether the data transfer was successful or not.
//
@@ -63,6 +87,29 @@ class Handler : public IntrusiveList<Handler>::Item {
// succeeded up to this point.
virtual Status FinalizeWrite(Status) { return OkStatus(); }
+ /// The total size of the transfer resource, if known. If unknown, returns
+ /// `std::numeric_limits<size_t>::max()`.
+ virtual size_t ResourceSize() const {
+ return std::numeric_limits<size_t>::max();
+ }
+
+ // GetStatus() is called to Transfer.GetResourceStatus RPC. The application
+ // layer invoking transfers should define the contents of these status
+ // variables for proper interpretation.
+ //
+ // Status::Unimplemented() indicates that the values have not been modifed by
+ // a handler.
+ virtual Status GetStatus(uint64_t& readable_offset,
+ uint64_t& writeable_offset,
+ uint64_t& read_checksum,
+ uint64_t& write_checksum) {
+ readable_offset = 0;
+ writeable_offset = 0;
+ read_checksum = 0;
+ write_checksum = 0;
+ return Status::Unimplemented();
+ }
+
protected:
constexpr Handler(uint32_t resource_id, stream::Reader* reader)
: resource_id_(resource_id), reader_(reader) {}
@@ -77,9 +124,14 @@ class Handler : public IntrusiveList<Handler>::Item {
friend class internal::Context;
// Prepares for either a read or write transfer.
- Status Prepare(internal::TransferType type) {
- return type == internal::TransferType::kTransmit ? PrepareRead()
- : PrepareWrite();
+ Status Prepare(internal::TransferType type, uint32_t offset = 0) {
+ if (offset == 0) {
+ return type == internal::TransferType::kTransmit ? PrepareRead()
+ : PrepareWrite();
+ }
+
+ return type == internal::TransferType::kTransmit ? PrepareRead(offset)
+ : PrepareWrite(offset);
}
// Only valid after a PrepareRead() or PrepareWrite() call that returns OK.
@@ -143,8 +195,7 @@ class ReadWriteHandler : public Handler {
public:
constexpr ReadWriteHandler(uint32_t resource_id)
: Handler(resource_id, static_cast<stream::Reader*>(nullptr)) {}
- constexpr ReadWriteHandler(uint32_t resource_id,
- stream::ReaderWriter& reader_writer)
+ ReadWriteHandler(uint32_t resource_id, stream::ReaderWriter& reader_writer)
: Handler(resource_id, &static_cast<stream::Reader&>(reader_writer)) {}
~ReadWriteHandler() override = default;
diff --git a/pw_transfer/public/pw_transfer/internal/chunk.h b/pw_transfer/public/pw_transfer/internal/chunk.h
index f2e03c8e0..bdbbc0877 100644
--- a/pw_transfer/public/pw_transfer/internal/chunk.h
+++ b/pw_transfer/public/pw_transfer/internal/chunk.h
@@ -140,6 +140,11 @@ class Chunk {
return *this;
}
+ constexpr Chunk& set_initial_offset(uint32_t offset) {
+ initial_offset_ = offset;
+ return *this;
+ }
+
// TODO(frolv): For some reason, the compiler complains if this setter is
// marked constexpr. Leaving it off for now, but this should be investigated
// and fixed.
@@ -215,6 +220,8 @@ class Chunk {
return Type::kParametersRetransmit;
}
+ constexpr uint32_t initial_offset() const { return initial_offset_; }
+
// Returns true if this parameters chunk is requesting that the transmitter
// transmit from its set offset instead of simply ACKing.
constexpr bool RequestsTransmissionFromOffset() const {
@@ -262,6 +269,7 @@ class Chunk {
max_chunk_size_bytes_(std::nullopt),
min_delay_microseconds_(std::nullopt),
offset_(0),
+ initial_offset_(0),
payload_({}),
remaining_bytes_(std::nullopt),
status_(std::nullopt),
@@ -289,6 +297,7 @@ class Chunk {
std::optional<uint32_t> max_chunk_size_bytes_;
std::optional<uint32_t> min_delay_microseconds_;
uint32_t offset_;
+ uint32_t initial_offset_;
ConstByteSpan payload_;
std::optional<uint64_t> remaining_bytes_;
std::optional<Status> status_;
diff --git a/pw_transfer/public/pw_transfer/internal/client_context.h b/pw_transfer/public/pw_transfer/internal/client_context.h
index aa7763eaa..f18b1a714 100644
--- a/pw_transfer/public/pw_transfer/internal/client_context.h
+++ b/pw_transfer/public/pw_transfer/internal/client_context.h
@@ -1,4 +1,4 @@
-// Copyright 2022 The Pigweed Authors
+// Copyright 2024 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
@@ -13,23 +13,47 @@
// the License.
#pragma once
+#include <limits>
+
#include "pw_function/function.h"
-#include "pw_rpc/raw/client_reader_writer.h"
#include "pw_transfer/internal/context.h"
namespace pw::transfer::internal {
class ClientContext final : public Context {
public:
- constexpr ClientContext() : on_completion_(nullptr) {}
+ constexpr ClientContext()
+ : handle_id_(0),
+ transfer_size_bytes_(std::numeric_limits<size_t>::max()),
+ on_completion_(nullptr) {}
void set_on_completion(Function<void(Status)>&& on_completion) {
on_completion_ = std::move(on_completion);
}
+ constexpr uint32_t handle_id() const { return handle_id_; }
+ constexpr void set_handle_id(uint32_t handle_id) { handle_id_ = handle_id; }
+
+ constexpr void set_transfer_size_bytes(size_t transfer_size_bytes) {
+ transfer_size_bytes_ = transfer_size_bytes;
+ }
+
private:
Status FinalCleanup(Status status) override;
+ size_t TransferSizeBytes() const override { return transfer_size_bytes_; }
+
+ // Seeks the reader to the offset, taking into account the client side reader
+ // needs to be shifted back for the initial offset.
+ Status SeekReader(uint32_t offset) override;
+
+ // Transfer clients assign a unique handle_id to all active transfer sessions.
+ // Unlike session or transfer IDs, this value is local to the client, not
+ // requiring any coordination with the transfer server, allowing users of the
+ // client to manage their ongoing transfers.
+ uint32_t handle_id_;
+
+ size_t transfer_size_bytes_;
Function<void(Status)> on_completion_;
};
diff --git a/pw_transfer/public/pw_transfer/internal/config.h b/pw_transfer/public/pw_transfer/internal/config.h
index 04bfa78f8..abc46b93a 100644
--- a/pw_transfer/public/pw_transfer/internal/config.h
+++ b/pw_transfer/public/pw_transfer/internal/config.h
@@ -21,15 +21,61 @@
#include "pw_chrono/system_clock.h"
-// The default maximum number of times a transfer should retry sending a chunk
-// when no response is received. This can later be configured per-transfer.
-#ifndef PW_TRANSFER_DEFAULT_MAX_RETRIES
-#define PW_TRANSFER_DEFAULT_MAX_RETRIES 3
+#ifdef PW_TRANSFER_DEFAULT_MAX_RETRIES
+#pragma message( \
+ "PW_TRANSFER_DEFAULT_MAX_RETRIES is deprecated; " \
+ "Use PW_TRANSFER_DEFAULT_MAX_CLIENT_RETRIES and " \
+ "PW_TRANSFER_DEFAULT_MAX_SERVER_RETRIES instead.")
+#endif // PW_TRANSFER_DEFAULT_MAX_RETRIES
+
+#ifdef PW_TRANSFER_DEFAULT_TIMEOUT_MS
+#pragma message( \
+ "PW_TRANSFER_DEFAULT_TIMEOUT_MS is deprecated; " \
+ "Use PW_TRANSFER_DEFAULT_CLIENT_TIMEOUT_MS and " \
+ "PW_TRANSFER_DEFAULT_SERVER_TIMEOUT_MS instead.")
+#endif // PW_TRANSFER_DEFAULT_TIMEOUT_MS
+
+// The default maximum number of times a transfer client should retry sending a
+// chunk when no response is received. Can later be configured per-transfer when
+// starting one.
+#ifndef PW_TRANSFER_DEFAULT_MAX_CLIENT_RETRIES
+
+// Continue to accept the old deprecated setting until projects have migrated.
+#ifdef PW_TRANSFER_DEFAULT_MAX_RETRIES
+#define PW_TRANSFER_DEFAULT_MAX_CLIENT_RETRIES PW_TRANSFER_DEFAULT_MAX_RETRIES
+#else
+#define PW_TRANSFER_DEFAULT_MAX_CLIENT_RETRIES 3
#endif // PW_TRANSFER_DEFAULT_MAX_RETRIES
-static_assert(PW_TRANSFER_DEFAULT_MAX_RETRIES > 0 &&
- PW_TRANSFER_DEFAULT_MAX_RETRIES <=
+#endif // PW_TRANSFER_DEFAULT_MAX_CLIENT_RETRIES
+
+static_assert(PW_TRANSFER_DEFAULT_MAX_CLIENT_RETRIES >= 0 &&
+ PW_TRANSFER_DEFAULT_MAX_CLIENT_RETRIES <=
+ static_cast<uint32_t>(std::numeric_limits<uint8_t>::max()));
+
+// The default maximum number of times a transfer server should retry sending a
+// chunk when no response is received.
+//
+// In typical setups, retries are driven by the client, and timeouts on the
+// server are used only to clean up resources, so this defaults to 0.
+#ifndef PW_TRANSFER_DEFAULT_MAX_SERVER_RETRIES
+
+// Continue to accept the old deprecated setting until projects have migrated.
+#ifdef PW_TRANSFER_DEFAULT_MAX_RETRIES
+#define PW_TRANSFER_DEFAULT_MAX_SERVER_RETRIES PW_TRANSFER_DEFAULT_MAX_RETRIES
+#else
+#define PW_TRANSFER_DEFAULT_MAX_SERVER_RETRIES 0
+#endif // PW_TRANSFER_DEFAULT_MAX_RETRIES
+
+#endif // PW_TRANSFER_DEFAULT_MAX_SERVER_RETRIES
+
+// GCC emits spurious -Wtype-limits warnings for the static_assert.
+PW_MODIFY_DIAGNOSTICS_PUSH();
+PW_MODIFY_DIAGNOSTIC_GCC(ignored, "-Wtype-limits");
+static_assert(PW_TRANSFER_DEFAULT_MAX_SERVER_RETRIES >= 0 &&
+ PW_TRANSFER_DEFAULT_MAX_SERVER_RETRIES <=
std::numeric_limits<uint8_t>::max());
+PW_MODIFY_DIAGNOSTICS_POP();
// The default maximum number of times a transfer should retry sending a chunk
// over the course of its entire lifetime.
@@ -38,31 +84,56 @@ static_assert(PW_TRANSFER_DEFAULT_MAX_RETRIES > 0 &&
// infinite loop.
#ifndef PW_TRANSFER_DEFAULT_MAX_LIFETIME_RETRIES
#define PW_TRANSFER_DEFAULT_MAX_LIFETIME_RETRIES \
- (static_cast<uint32_t>(PW_TRANSFER_DEFAULT_MAX_RETRIES) * 1000u)
+ (static_cast<uint32_t>(PW_TRANSFER_DEFAULT_MAX_CLIENT_RETRIES) * 1000u)
#endif // PW_TRANSFER_DEFAULT_MAX_LIFETIME_RETRIES
static_assert(PW_TRANSFER_DEFAULT_MAX_LIFETIME_RETRIES >
- PW_TRANSFER_DEFAULT_MAX_RETRIES &&
+ PW_TRANSFER_DEFAULT_MAX_CLIENT_RETRIES &&
PW_TRANSFER_DEFAULT_MAX_LIFETIME_RETRIES <=
std::numeric_limits<uint32_t>::max());
// The default amount of time, in milliseconds, to wait for a chunk to arrive
-// before retrying. This can later be configured per-transfer.
-#ifndef PW_TRANSFER_DEFAULT_TIMEOUT_MS
-#define PW_TRANSFER_DEFAULT_TIMEOUT_MS 2000
+// in a transfer client before retrying. This can later be configured
+// per-transfer.
+#ifndef PW_TRANSFER_DEFAULT_CLIENT_TIMEOUT_MS
+
+// Continue to accept the old deprecated setting until projects have migrated.
+#ifdef PW_TRANSFER_DEFAULT_TIMEOUT_MS
+#define PW_TRANSFER_DEFAULT_CLIENT_TIMEOUT_MS PW_TRANSFER_DEFAULT_TIMEOUT_MS
+#else
+#define PW_TRANSFER_DEFAULT_CLIENT_TIMEOUT_MS 2000
#endif // PW_TRANSFER_DEFAULT_TIMEOUT_MS
-static_assert(PW_TRANSFER_DEFAULT_TIMEOUT_MS > 0);
+#endif // PW_TRANSFER_DEFAULT_CLIENT_TIMEOUT_MS
-// The default amount of time, in milliseconds, to wait for an initial server
-// response to a transfer before retrying. This can later be configured
-// per-transfer.
+static_assert(PW_TRANSFER_DEFAULT_CLIENT_TIMEOUT_MS > 0);
+
+// The default amount of time, in milliseconds, to wait for a chunk to arrive
+// on the server before retrying. This can later be configured per-transfer.
+#ifndef PW_TRANSFER_DEFAULT_SERVER_TIMEOUT_MS
+
+// Continue to accept the old deprecated setting until projects have migrated.
+#ifdef PW_TRANSFER_DEFAULT_TIMEOUT_MS
+#define PW_TRANSFER_DEFAULT_SERVER_TIMEOUT_MS PW_TRANSFER_DEFAULT_TIMEOUT_MS
+#else
+#define PW_TRANSFER_DEFAULT_SERVER_TIMEOUT_MS \
+ (static_cast<uint32_t>(PW_TRANSFER_DEFAULT_CLIENT_TIMEOUT_MS) * 5u)
+#endif // PW_TRANSFER_DEFAULT_TIMEOUT_MS
+
+#endif // PW_TRANSFER_DEFAULT_SERVER_TIMEOUT_MS
+
+static_assert(PW_TRANSFER_DEFAULT_SERVER_TIMEOUT_MS > 0);
+
+// The default amount of time, in milliseconds, for a client to wait for an
+// initial response from the transfer server before retrying. This can later be
+// configured // per-transfer.
//
-// This is set separately to PW_TRANSFER_DEFAULT_TIMEOUT_MS as transfers may
-// require additional time for resource initialization (e.g. erasing a flash
+// This is set separately to PW_TRANSFER_DEFAULT_CLIENT_TIMEOUT_MS as transfers
+// may require additional time for resource initialization (e.g. erasing a flash
// region before writing to it).
#ifndef PW_TRANSFER_DEFAULT_INITIAL_TIMEOUT_MS
-#define PW_TRANSFER_DEFAULT_INITIAL_TIMEOUT_MS PW_TRANSFER_DEFAULT_TIMEOUT_MS
+#define PW_TRANSFER_DEFAULT_INITIAL_TIMEOUT_MS \
+ PW_TRANSFER_DEFAULT_CLIENT_TIMEOUT_MS
#endif // PW_TRANSFER_DEFAULT_INITIAL_TIMEOUT_MS
static_assert(PW_TRANSFER_DEFAULT_INITIAL_TIMEOUT_MS > 0);
@@ -82,13 +153,20 @@ static_assert(PW_TRANSFER_DEFAULT_EXTEND_WINDOW_DIVISOR > 1);
namespace pw::transfer::cfg {
-inline constexpr uint8_t kDefaultMaxRetries = PW_TRANSFER_DEFAULT_MAX_RETRIES;
+inline constexpr uint8_t kDefaultMaxClientRetries =
+ PW_TRANSFER_DEFAULT_MAX_CLIENT_RETRIES;
+inline constexpr uint8_t kDefaultMaxServerRetries =
+ PW_TRANSFER_DEFAULT_MAX_SERVER_RETRIES;
inline constexpr uint16_t kDefaultMaxLifetimeRetries =
PW_TRANSFER_DEFAULT_MAX_LIFETIME_RETRIES;
-inline constexpr chrono::SystemClock::duration kDefaultChunkTimeout =
+inline constexpr chrono::SystemClock::duration kDefaultClientTimeout =
chrono::SystemClock::for_at_least(
- std::chrono::milliseconds(PW_TRANSFER_DEFAULT_TIMEOUT_MS));
+ std::chrono::milliseconds(PW_TRANSFER_DEFAULT_CLIENT_TIMEOUT_MS));
+inline constexpr chrono::SystemClock::duration kDefaultServerTimeout =
+ chrono::SystemClock::for_at_least(
+ std::chrono::milliseconds(PW_TRANSFER_DEFAULT_SERVER_TIMEOUT_MS));
+
inline constexpr chrono::SystemClock::duration kDefaultInitialChunkTimeout =
chrono::SystemClock::for_at_least(
std::chrono::milliseconds(PW_TRANSFER_DEFAULT_INITIAL_TIMEOUT_MS));
diff --git a/pw_transfer/public/pw_transfer/internal/context.h b/pw_transfer/public/pw_transfer/internal/context.h
index b4a80ec98..2391ee6cc 100644
--- a/pw_transfer/public/pw_transfer/internal/context.h
+++ b/pw_transfer/public/pw_transfer/internal/context.h
@@ -1,4 +1,4 @@
-// Copyright 2023 The Pigweed Authors
+// Copyright 2024 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
@@ -108,7 +108,8 @@ class Context {
~Context() = default;
constexpr Context()
- : session_id_(kUnassignedSessionId),
+ : initial_offset_(0),
+ session_id_(kUnassignedSessionId),
resource_id_(0),
desired_protocol_version_(ProtocolVersion::kUnknown),
configured_protocol_version_(ProtocolVersion::kUnknown),
@@ -138,6 +139,13 @@ class Context {
return static_cast<TransferType>(flags_ & kFlagsType);
}
+ stream::Reader& reader() {
+ PW_DASSERT(active() && type() == TransferType::kTransmit);
+ return static_cast<stream::Reader&>(*stream_);
+ }
+
+ uint32_t initial_offset_;
+
private:
enum class TransferState : uint8_t {
// The context is available for use for a new transfer.
@@ -194,11 +202,6 @@ class Context {
return static_cast<unsigned>(session_id_);
}
- stream::Reader& reader() {
- PW_DASSERT(active() && type() == TransferType::kTransmit);
- return static_cast<stream::Reader&>(*stream_);
- }
-
stream::Writer& writer() {
PW_DASSERT(active() && type() == TransferType::kReceive);
return static_cast<stream::Writer&>(*stream_);
@@ -254,6 +257,15 @@ class Context {
// failed.
virtual Status FinalCleanup(Status status) = 0;
+ // Returns the total size of the transfer resource, or
+ // `std::numeric_limits<size_t>::max()` if unbounded.
+ virtual size_t TransferSizeBytes() const = 0;
+
+ // Seeks the reader source. Client may need to seek with reference to the
+ // initial offset, where the server shouldn't, so each context needs its own
+ // seek method.
+ virtual Status SeekReader(uint32_t offset) = 0;
+
// Processes a chunk in either a transfer or receive transfer.
void HandleChunkEvent(const ChunkEvent& event);
diff --git a/pw_transfer/public/pw_transfer/internal/event.h b/pw_transfer/public/pw_transfer/internal/event.h
index 472bc59bf..0c6021086 100644
--- a/pw_transfer/public/pw_transfer/internal/event.h
+++ b/pw_transfer/public/pw_transfer/internal/event.h
@@ -1,4 +1,4 @@
-// Copyright 2022 The Pigweed Authors
+// Copyright 2024 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
@@ -14,6 +14,8 @@
#pragma once
#include "pw_chrono/system_clock.h"
+#include "pw_function/function.h"
+#include "pw_rpc/raw/server_reader_writer.h"
#include "pw_rpc/writer.h"
#include "pw_stream/stream.h"
#include "pw_transfer/internal/protocol.h"
@@ -33,6 +35,12 @@ enum class TransferStream {
kServerWrite,
};
+enum class IdentifierType {
+ Session,
+ Resource,
+ Handle,
+};
+
enum class EventType {
// Begins a new transfer in an available context.
kNewClientTransfer,
@@ -55,12 +63,18 @@ enum class EventType {
// transfer context's completion handler; it is for out-of-band termination.
kSendStatusChunk,
+ // Changes parameters of an ongoing client transfer.
+ kUpdateClientTransfer,
+
// Manages the list of transfer handlers for a transfer service.
kAddTransferHandler,
kRemoveTransferHandler,
// For testing only: aborts the transfer thread.
kTerminate,
+
+ // Gets the status of a resource, if there is a handler registered for it.
+ kGetResourceStatus,
};
// Forward declarations required for events.
@@ -72,6 +86,7 @@ struct NewTransferEvent {
ProtocolVersion protocol_version;
uint32_t session_id;
uint32_t resource_id;
+ uint32_t handle_id;
rpc::Writer* rpc_writer;
const TransferParameters* max_parameters;
chrono::SystemClock::duration timeout;
@@ -87,6 +102,8 @@ struct NewTransferEvent {
const std::byte* raw_chunk_data;
size_t raw_chunk_size;
+
+ uint64_t initial_offset;
};
// A chunk received by a transfer client / server.
@@ -103,7 +120,8 @@ struct ChunkEvent {
};
struct EndTransferEvent {
- uint32_t session_id;
+ IdentifierType id_type;
+ uint32_t id;
Status::Code status;
bool send_status_chunk;
};
@@ -115,6 +133,25 @@ struct SendStatusChunkEvent {
TransferStream stream;
};
+struct UpdateTransferEvent {
+ uint32_t handle_id;
+ uint32_t transfer_size_bytes;
+};
+
+struct ResourceStatus {
+ uint32_t resource_id;
+ uint64_t readable_offset;
+ uint64_t writeable_offset;
+ uint64_t read_checksum;
+ uint64_t write_checksum;
+};
+
+using ResourceStatusCallback = Callback<void(Status, const ResourceStatus&)>;
+
+struct GetResourceStatusEvent {
+ uint32_t resource_id;
+};
+
struct Event {
EventType type;
@@ -123,8 +160,10 @@ struct Event {
ChunkEvent chunk;
EndTransferEvent end_transfer;
SendStatusChunkEvent send_status_chunk;
+ UpdateTransferEvent update_transfer;
Handler* add_transfer_handler;
Handler* remove_transfer_handler;
+ GetResourceStatusEvent resource_status;
};
};
diff --git a/pw_transfer/public/pw_transfer/internal/server_context.h b/pw_transfer/public/pw_transfer/internal/server_context.h
index ce9a78bbb..67173284f 100644
--- a/pw_transfer/public/pw_transfer/internal/server_context.h
+++ b/pw_transfer/public/pw_transfer/internal/server_context.h
@@ -1,4 +1,4 @@
-// Copyright 2022 The Pigweed Authors
+// Copyright 2024 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
@@ -44,6 +44,12 @@ class ServerContext final : public Context {
// Precondition: Transfer context is active.
Status FinalCleanup(Status status) override;
+ size_t TransferSizeBytes() const override { return handler_->ResourceSize(); }
+
+ // Seeks the reader to the given offset. Does not incorporate any initial
+ // offset
+ Status SeekReader(uint32_t offset) override;
+
Handler* handler_;
};
diff --git a/pw_transfer/public/pw_transfer/transfer.h b/pw_transfer/public/pw_transfer/transfer.h
index 93b39abe3..1bb5a0ea5 100644
--- a/pw_transfer/public/pw_transfer/transfer.h
+++ b/pw_transfer/public/pw_transfer/transfer.h
@@ -55,8 +55,8 @@ class TransferService : public pw_rpc::raw::Transfer::Service<TransferService> {
TransferService(
TransferThread& transfer_thread,
uint32_t max_pending_bytes,
- chrono::SystemClock::duration chunk_timeout = cfg::kDefaultChunkTimeout,
- uint8_t max_retries = cfg::kDefaultMaxRetries,
+ chrono::SystemClock::duration chunk_timeout = cfg::kDefaultServerTimeout,
+ uint8_t max_retries = cfg::kDefaultMaxServerRetries,
uint32_t extend_window_divisor = cfg::kDefaultExtendWindowDivisor,
uint32_t max_lifetime_retries = cfg::kDefaultMaxLifetimeRetries)
: max_parameters_(max_pending_bytes,
@@ -87,6 +87,9 @@ class TransferService : public pw_rpc::raw::Transfer::Service<TransferService> {
thread_.SetServerWriteStream(reader_writer);
}
+ void GetResourceStatus(ConstByteSpan request,
+ rpc::RawUnaryResponder& responder);
+
void RegisterHandler(Handler& handler) {
thread_.AddTransferHandler(handler);
}
@@ -120,8 +123,12 @@ class TransferService : public pw_rpc::raw::Transfer::Service<TransferService> {
return OkStatus();
}
+ rpc::RawUnaryResponder resource_responder_;
+
private:
void HandleChunk(ConstByteSpan message, internal::TransferType type);
+ void ResourceStatusCallback(Status status,
+ const internal::ResourceStatus& stats);
internal::TransferParameters max_parameters_;
TransferThread& thread_;
diff --git a/pw_transfer/public/pw_transfer/transfer_thread.h b/pw_transfer/public/pw_transfer/transfer_thread.h
index 93e724008..30779ce81 100644
--- a/pw_transfer/public/pw_transfer/transfer_thread.h
+++ b/pw_transfer/public/pw_transfer/transfer_thread.h
@@ -1,4 +1,4 @@
-// Copyright 2023 The Pigweed Authors
+// Copyright 2024 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
@@ -18,7 +18,6 @@
#include "pw_assert/assert.h"
#include "pw_chrono/system_clock.h"
#include "pw_function/function.h"
-#include "pw_preprocessor/compiler.h"
#include "pw_rpc/raw/client_reader_writer.h"
#include "pw_rpc/raw/server_reader_writer.h"
#include "pw_span/span.h"
@@ -32,6 +31,9 @@
#include "pw_transfer/internal/server_context.h"
namespace pw::transfer {
+
+class Client;
+
namespace internal {
class TransferThread : public thread::ThreadCore {
@@ -49,17 +51,20 @@ class TransferThread : public thread::ThreadCore {
void StartClientTransfer(TransferType type,
ProtocolVersion version,
uint32_t resource_id,
+ uint32_t handle_id,
stream::Stream* stream,
const TransferParameters& max_parameters,
Function<void(Status)>&& on_completion,
chrono::SystemClock::duration timeout,
chrono::SystemClock::duration initial_timeout,
uint8_t max_retries,
- uint32_t max_lifetime_retries) {
+ uint32_t max_lifetime_retries,
+ uint32_t initial_offset = 0) {
StartTransfer(type,
version,
Context::kUnassignedSessionId, // Assigned later.
resource_id,
+ handle_id,
/*raw_chunk=*/{},
stream,
max_parameters,
@@ -67,7 +72,8 @@ class TransferThread : public thread::ThreadCore {
timeout,
initial_timeout,
max_retries,
- max_lifetime_retries);
+ max_lifetime_retries,
+ initial_offset);
}
void StartServerTransfer(TransferType type,
@@ -78,11 +84,13 @@ class TransferThread : public thread::ThreadCore {
const TransferParameters& max_parameters,
chrono::SystemClock::duration timeout,
uint8_t max_retries,
- uint32_t max_lifetime_retries) {
+ uint32_t max_lifetime_retries,
+ uint32_t initial_offset = 0) {
StartTransfer(type,
version,
session_id,
resource_id,
+ /*handle_id=*/0,
raw_chunk,
/*stream=*/nullptr,
max_parameters,
@@ -90,7 +98,8 @@ class TransferThread : public thread::ThreadCore {
timeout,
timeout,
max_retries,
- max_lifetime_retries);
+ max_lifetime_retries,
+ initial_offset);
}
void ProcessClientChunk(ConstByteSpan chunk) {
@@ -112,18 +121,32 @@ class TransferThread : public thread::ThreadCore {
status);
}
+ void CancelClientTransfer(uint32_t handle_id) {
+ EndTransfer(EventType::kClientEndTransfer,
+ IdentifierType::Handle,
+ handle_id,
+ Status::Cancelled(),
+ /*send_status_chunk=*/true);
+ }
+
void EndClientTransfer(uint32_t session_id,
Status status,
bool send_status_chunk = false) {
- EndTransfer(
- EventType::kClientEndTransfer, session_id, status, send_status_chunk);
+ EndTransfer(EventType::kClientEndTransfer,
+ IdentifierType::Session,
+ session_id,
+ status,
+ send_status_chunk);
}
void EndServerTransfer(uint32_t session_id,
Status status,
bool send_status_chunk = false) {
- EndTransfer(
- EventType::kServerEndTransfer, session_id, status, send_status_chunk);
+ EndTransfer(EventType::kServerEndTransfer,
+ IdentifierType::Session,
+ session_id,
+ status,
+ send_status_chunk);
}
// Move the read/write streams on this thread instead of the transfer thread.
@@ -178,13 +201,19 @@ class TransferThread : public thread::ThreadCore {
SimulateTimeout(EventType::kServerTimeout, session_id);
}
+ void EnqueueResourceEvent(uint32_t resource_id,
+ ResourceStatusCallback&& callback);
+
private:
+ friend class transfer::Client;
friend class Context;
// Maximum amount of time between transfer thread runs.
static constexpr chrono::SystemClock::duration kMaxTimeout =
std::chrono::seconds(2);
+ void UpdateClientTransfer(uint32_t handle_id, size_t transfer_size_bytes);
+
// Finds an active server or client transfer, matching against its legacy ID.
template <typename T>
static Context* FindActiveTransferByLegacyId(const span<T>& transfers,
@@ -207,6 +236,16 @@ class TransferThread : public thread::ThreadCore {
return transfer != transfers.end() ? &*transfer : nullptr;
}
+ Context* FindClientTransferByHandleId(uint32_t handle_id) const {
+ auto transfer =
+ std::find_if(client_transfers_.begin(),
+ client_transfers_.end(),
+ [handle_id](auto& c) {
+ return c.initialized() && c.handle_id() == handle_id;
+ });
+ return transfer != client_transfers_.end() ? &*transfer : nullptr;
+ }
+
void SimulateTimeout(EventType type, uint32_t session_id);
// Finds an new server or client transfer.
@@ -240,13 +279,13 @@ class TransferThread : public thread::ThreadCore {
rpc::Writer& stream_for(TransferStream stream) {
switch (stream) {
case TransferStream::kClientRead:
- return client_read_stream_;
+ return client_read_stream_.as_writer();
case TransferStream::kClientWrite:
- return client_write_stream_;
+ return client_write_stream_.as_writer();
case TransferStream::kServerRead:
- return server_read_stream_;
+ return server_read_stream_.as_writer();
case TransferStream::kServerWrite:
- return server_write_stream_;
+ return server_write_stream_.as_writer();
}
// An unknown TransferStream value was passed, which means this function
// was passed an invalid enum value.
@@ -262,6 +301,7 @@ class TransferThread : public thread::ThreadCore {
ProtocolVersion version,
uint32_t session_id,
uint32_t resource_id,
+ uint32_t handle_id,
ConstByteSpan raw_chunk,
stream::Stream* stream,
const TransferParameters& max_parameters,
@@ -269,7 +309,8 @@ class TransferThread : public thread::ThreadCore {
chrono::SystemClock::duration timeout,
chrono::SystemClock::duration initial_timeout,
uint8_t max_retries,
- uint32_t max_lifetime_retries);
+ uint32_t max_lifetime_retries,
+ uint32_t initial_offset);
void ProcessChunk(EventType type, ConstByteSpan chunk);
@@ -279,6 +320,7 @@ class TransferThread : public thread::ThreadCore {
Status status);
void EndTransfer(EventType type,
+ IdentifierType id_type,
uint32_t session_id,
Status status,
bool send_status_chunk);
@@ -290,6 +332,8 @@ class TransferThread : public thread::ThreadCore {
void SendStatusChunk(const SendStatusChunkEvent& event);
+ void GetResourceState(uint32_t resource_id);
+
sync::TimedThreadNotification event_notification_;
sync::BinarySemaphore next_event_ownership_;
@@ -320,6 +364,8 @@ class TransferThread : public thread::ThreadCore {
// Buffer into which responses are encoded. Only ever used from within the
// transfer thread, so no locking is required.
ByteSpan encode_buffer_;
+
+ ResourceStatusCallback resource_status_callback_ = nullptr;
};
} // namespace internal
diff --git a/pw_transfer/pw_transfer_private/chunk_testing.h b/pw_transfer/pw_transfer_private/chunk_testing.h
index d909ab500..68087e0de 100644
--- a/pw_transfer/pw_transfer_private/chunk_testing.h
+++ b/pw_transfer/pw_transfer_private/chunk_testing.h
@@ -13,10 +13,10 @@
// the License.
#pragma once
-#include "gtest/gtest.h"
#include "pw_bytes/span.h"
#include "pw_containers/vector.h"
#include "pw_transfer/internal/chunk.h"
+#include "pw_unit_test/framework.h"
namespace pw::transfer::test {
diff --git a/pw_transfer/py/pw_transfer/__init__.py b/pw_transfer/py/pw_transfer/__init__.py
index 55a0b61d6..fafd07024 100644
--- a/pw_transfer/py/pw_transfer/__init__.py
+++ b/pw_transfer/py/pw_transfer/__init__.py
@@ -13,6 +13,12 @@
# the License.
"""Provides a simple interface for transferring bulk data over pw_rpc."""
+# The generated protos for this module overlap this `__init__.py` file's import
+# namespace, so we need to use extend_path() for them to be discoverable.
+# Note: this needs to be done in every nested `__init__.py` file as well (if
+# any exist).
+__path__ = __import__('pkgutil').extend_path(__path__, __name__)
+
from pw_transfer.transfer import (
ProgressCallback,
ProgressStats,
diff --git a/pw_transfer/py/pw_transfer/chunk.py b/pw_transfer/py/pw_transfer/chunk.py
index e527dafa4..6237743db 100644
--- a/pw_transfer/py/pw_transfer/chunk.py
+++ b/pw_transfer/py/pw_transfer/chunk.py
@@ -92,6 +92,7 @@ class Chunk:
max_chunk_size_bytes: Optional[int] = None,
min_delay_microseconds: Optional[int] = None,
status: Optional[Status] = None,
+ initial_offset: int = 0,
):
"""Creates a new transfer chunk.
@@ -113,6 +114,8 @@ class Chunk:
data chunk.
min_delay_microseconds: Delay between data chunks to be sent.
status: In a COMPLETION chunk, final status of the transfer.
+ initial_offset: Initial offset for non-zero starting offset
+ transfers
"""
self.protocol_version = protocol_version
self.type = chunk_type
@@ -126,6 +129,7 @@ class Chunk:
self.max_chunk_size_bytes = max_chunk_size_bytes
self.min_delay_microseconds = min_delay_microseconds
self.status = status
+ self.initial_offset = initial_offset
@classmethod
def from_message(cls, message: transfer_pb2.Chunk) -> 'Chunk':
@@ -156,6 +160,7 @@ class Chunk:
offset=message.offset,
window_end_offset=message.window_end_offset,
data=message.data,
+ initial_offset=message.initial_offset,
)
if message.HasField('session_id'):
@@ -251,6 +256,8 @@ class Chunk:
# explictly encoded.
message.protocol_version = self.protocol_version.value
+ message.initial_offset = self.initial_offset
+
return message
def id(self) -> int:
diff --git a/pw_transfer/py/pw_transfer/client.py b/pw_transfer/py/pw_transfer/client.py
index 6b2ab6dd2..946fe6928 100644
--- a/pw_transfer/py/pw_transfer/client.py
+++ b/pw_transfer/py/pw_transfer/client.py
@@ -1,4 +1,4 @@
-# Copyright 2022 The Pigweed Authors
+# Copyright 2023 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
@@ -17,7 +17,7 @@ import asyncio
import ctypes
import logging
import threading
-from typing import Any, Dict, Optional, Union
+from typing import Any, Callable, Dict, Optional, Union
from pw_rpc.callback_client import BidirectionalStreamingCall
from pw_status import Status
@@ -42,6 +42,68 @@ _LOG = logging.getLogger(__package__)
_TransferDict = Dict[int, Transfer]
+class _TransferStream:
+ def __init__(
+ self,
+ method,
+ chunk_handler: Callable[[Chunk], Any],
+ error_handler: Callable[[Status], Any],
+ max_reopen_attempts=3,
+ ):
+ self._method = method
+ self._chunk_handler = chunk_handler
+ self._error_handler = error_handler
+ self._call: Optional[BidirectionalStreamingCall] = None
+ self._reopen_attempts = 0
+ self._max_reopen_attempts = max_reopen_attempts
+
+ def is_open(self) -> bool:
+ return self._call is not None
+
+ def open(self, force: bool = False) -> None:
+ if force or self._call is None:
+ self._call = self._method.invoke(
+ lambda _, chunk: self._on_chunk_received(chunk),
+ on_error=lambda _, status: self._on_stream_error(status),
+ )
+
+ def close(self) -> None:
+ if self._call is not None:
+ self._call.cancel()
+ self._call = None
+
+ def send(self, chunk: Chunk) -> None:
+ assert self._call is not None
+ self._call.send(chunk.to_message())
+
+ def _on_chunk_received(self, chunk: Chunk) -> None:
+ self._reopen_attempts = 0
+ self._chunk_handler(chunk)
+
+ def _on_stream_error(self, rpc_status: Status) -> None:
+ if rpc_status is Status.FAILED_PRECONDITION:
+ # FAILED_PRECONDITION indicates that the stream packet was not
+ # recognized as the stream is not open. Attempt to re-open the
+ # stream to allow pending transfers to continue.
+ self._reopen_attempts += 1
+ if self._reopen_attempts > self._max_reopen_attempts:
+ _LOG.error(
+ 'Failed to reopen transfer stream after %d tries',
+ self._max_reopen_attempts,
+ )
+ self._error_handler(Status.UNAVAILABLE)
+ else:
+ _LOG.info(
+ 'Transfer stream failed to write; attempting to re-open'
+ )
+ self.open(force=True)
+ else:
+ # Other errors are unrecoverable; clear the stream.
+ _LOG.error('Transfer stream shut down with status %s', rpc_status)
+ self._call = None
+ self._error_handler(rpc_status)
+
+
class Manager: # pylint: disable=too-many-instance-attributes
"""A manager for transmitting data through an RPC TransferService.
@@ -61,7 +123,7 @@ class Manager: # pylint: disable=too-many-instance-attributes
initial_response_timeout_s: float = 4.0,
max_retries: int = 3,
max_lifetime_retries: int = 1500,
- default_protocol_version=ProtocolVersion.LEGACY,
+ default_protocol_version=ProtocolVersion.VERSION_TWO,
):
"""Initializes a Manager on top of a TransferService.
@@ -73,6 +135,8 @@ class Manager: # pylint: disable=too-many-instance-attributes
max_retires: number of times to retry a single package after a timeout
max_lifetime_retires: Cumulative maximum number of times to retry over
the course of the transfer before giving up.
+ default_protocol_version: Defaults to V2, can be set to legacy for
+ projects which use legacy devices.
"""
self._service: Any = rpc_transfer_service
self._default_response_timeout_s = default_response_timeout_s
@@ -86,11 +150,6 @@ class Manager: # pylint: disable=too-many-instance-attributes
self._write_transfers: _TransferDict = {}
self._next_session_id = ctypes.c_uint32(1)
- # RPC streams for read and write transfers. These are shareable by
- # multiple transfers of the same type.
- self._read_stream: Optional[BidirectionalStreamingCall] = None
- self._write_stream: Optional[BidirectionalStreamingCall] = None
-
self._loop = asyncio.new_event_loop()
# Set the event loop for the current thread.
asyncio.set_event_loop(self._loop)
@@ -106,6 +165,23 @@ class Manager: # pylint: disable=too-many-instance-attributes
target=self._start_event_loop_thread, daemon=True
)
+ # RPC streams for read and write transfers. These are shareable by
+ # multiple transfers of the same type.
+ self._read_stream = _TransferStream(
+ self._service.Read,
+ lambda chunk: self._loop.call_soon_threadsafe(
+ self._read_chunk_queue.put_nowait, chunk
+ ),
+ self._on_read_error,
+ )
+ self._write_stream = _TransferStream(
+ self._service.Write,
+ lambda chunk: self._loop.call_soon_threadsafe(
+ self._write_chunk_queue.put_nowait, chunk
+ ),
+ self._on_write_error,
+ )
+
self._thread.start()
def __del__(self):
@@ -120,6 +196,9 @@ class Manager: # pylint: disable=too-many-instance-attributes
resource_id: int,
progress_callback: Optional[ProgressCallback] = None,
protocol_version: Optional[ProtocolVersion] = None,
+ chunk_timeout_s: Optional[float] = None,
+ initial_timeout_s: Optional[float] = None,
+ initial_offset: int = 0,
) -> bytes:
"""Receives ("downloads") data from the server.
@@ -128,6 +207,17 @@ class Manager: # pylint: disable=too-many-instance-attributes
progress_callback: Optional callback periodically invoked throughout
the transfer with the transfer state. Can be used to provide user-
facing status updates such as progress bars.
+ protocol_version: The desired protocol version to use for this
+ transfer. Defaults to the version the manager was initialized
+ (typically VERSION_TWO).
+ chunk_timeout_s: Timeout for any individual chunk.
+ initial_timeout_s: Timeout for the first chunk, overrides
+ chunk_timeout_s.
+ initial_offset: Initial offset to start reading from. Must be
+ supported by the transfer handler. All transfers support starting
+ from 0, the default. Returned bytes will not have any padding
+ related to this initial offset. No seeking is done in the transfer
+ operation on the client side.
Raises:
Error: the transfer failed to complete
@@ -141,23 +231,36 @@ class Manager: # pylint: disable=too-many-instance-attributes
if protocol_version is None:
protocol_version = self._default_protocol_version
+ if protocol_version == ProtocolVersion.LEGACY and initial_offset != 0:
+ raise ValueError(
+ f'Unsupported transfer with offset {initial_offset} started '
+ + 'with legacy protocol'
+ )
+
session_id = (
resource_id
if protocol_version is ProtocolVersion.LEGACY
else self.assign_session_id()
)
+ if chunk_timeout_s is None:
+ chunk_timeout_s = self._default_response_timeout_s
+
+ if initial_timeout_s is None:
+ initial_timeout_s = self._initial_response_timeout_s
+
transfer = ReadTransfer(
session_id,
resource_id,
- self._send_read_chunk,
+ self._read_stream.send,
self._end_read_transfer,
- self._default_response_timeout_s,
- self._initial_response_timeout_s,
+ chunk_timeout_s,
+ initial_timeout_s,
self.max_retries,
self.max_lifetime_retries,
protocol_version,
progress_callback=progress_callback,
+ initial_offset=initial_offset,
)
self._start_read_transfer(transfer)
@@ -174,6 +277,9 @@ class Manager: # pylint: disable=too-many-instance-attributes
data: Union[bytes, str],
progress_callback: Optional[ProgressCallback] = None,
protocol_version: Optional[ProtocolVersion] = None,
+ chunk_timeout_s: Optional[Any] = None,
+ initial_timeout_s: Optional[Any] = None,
+ initial_offset: int = 0,
) -> None:
"""Transmits ("uploads") data to the server.
@@ -183,6 +289,17 @@ class Manager: # pylint: disable=too-many-instance-attributes
progress_callback: Optional callback periodically invoked throughout
the transfer with the transfer state. Can be used to provide user-
facing status updates such as progress bars.
+ protocol_version: The desired protocol version to use for this
+ transfer. Defaults to the version the manager was initialized
+ (defaults to LATEST).
+ chunk_timeout_s: Timeout for any individual chunk.
+ initial_timeout_s: Timeout for the first chunk, overrides
+ chunk_timeout_s.
+ initial_offset: Initial offset to start writing to. Must be supported
+ by the transfer handler. All transfers support starting from 0,
+ the default. data arg should start with the data you want to see
+ starting at this initial offset on the server. No seeking is done
+ in the transfer operation on the client side.
Raises:
Error: the transfer failed to complete
@@ -199,24 +316,40 @@ class Manager: # pylint: disable=too-many-instance-attributes
if protocol_version is None:
protocol_version = self._default_protocol_version
+ if (
+ protocol_version != ProtocolVersion.VERSION_TWO
+ and initial_offset != 0
+ ):
+ raise ValueError(
+ f'Unsupported transfer with offset {initial_offset} started '
+ + 'with legacy protocol'
+ )
+
session_id = (
resource_id
if protocol_version is ProtocolVersion.LEGACY
else self.assign_session_id()
)
+ if chunk_timeout_s is None:
+ chunk_timeout_s = self._default_response_timeout_s
+
+ if initial_timeout_s is None:
+ initial_timeout_s = self._initial_response_timeout_s
+
transfer = WriteTransfer(
session_id,
resource_id,
data,
- self._send_write_chunk,
+ self._write_stream.send,
self._end_write_transfer,
- self._default_response_timeout_s,
- self._initial_response_timeout_s,
+ chunk_timeout_s,
+ initial_timeout_s,
self.max_retries,
self.max_lifetime_retries,
protocol_version,
progress_callback=progress_callback,
+ initial_offset=initial_offset,
)
self._start_write_transfer(transfer)
@@ -225,14 +358,6 @@ class Manager: # pylint: disable=too-many-instance-attributes
if not transfer.status.ok():
raise Error(transfer.resource_id, transfer.status)
- def _send_read_chunk(self, chunk: Chunk) -> None:
- assert self._read_stream is not None
- self._read_stream.send(chunk.to_message())
-
- def _send_write_chunk(self, chunk: Chunk) -> None:
- assert self._write_stream is not None
- self._write_stream.send(chunk.to_message())
-
def assign_session_id(self) -> int:
new_id = self._next_session_id.value
@@ -336,71 +461,29 @@ class Manager: # pylint: disable=too-many-instance-attributes
await transfer.handle_chunk(chunk)
- def _open_read_stream(self) -> None:
- self._read_stream = self._service.Read.invoke(
- lambda _, chunk: self._loop.call_soon_threadsafe(
- self._read_chunk_queue.put_nowait, chunk
- ),
- on_error=lambda _, status: self._on_read_error(status),
- )
-
def _on_read_error(self, status: Status) -> None:
"""Callback for an RPC error in the read stream."""
- if status is Status.FAILED_PRECONDITION:
- # FAILED_PRECONDITION indicates that the stream packet was not
- # recognized as the stream is not open. This could occur if the
- # server resets during an active transfer. Re-open the stream to
- # allow pending transfers to continue.
- self._open_read_stream()
- else:
- # Other errors are unrecoverable. Clear the stream and cancel any
- # pending transfers with an INTERNAL status as this is a system
- # error.
- self._read_stream = None
-
- for transfer in self._read_transfers.values():
- transfer.finish(Status.INTERNAL, skip_callback=True)
- self._read_transfers.clear()
+ for transfer in self._read_transfers.values():
+ transfer.finish(Status.INTERNAL, skip_callback=True)
+ self._read_transfers.clear()
- _LOG.error('Read stream shut down: %s', status)
-
- def _open_write_stream(self) -> None:
- self._write_stream = self._service.Write.invoke(
- lambda _, chunk: self._loop.call_soon_threadsafe(
- self._write_chunk_queue.put_nowait, chunk
- ),
- on_error=lambda _, status: self._on_write_error(status),
- )
+ _LOG.error('Read stream shut down: %s', status)
def _on_write_error(self, status: Status) -> None:
"""Callback for an RPC error in the write stream."""
- if status is Status.FAILED_PRECONDITION:
- # FAILED_PRECONDITION indicates that the stream packet was not
- # recognized as the stream is not open. This could occur if the
- # server resets during an active transfer. Re-open the stream to
- # allow pending transfers to continue.
- self._open_write_stream()
- else:
- # Other errors are unrecoverable. Clear the stream and cancel any
- # pending transfers with an INTERNAL status as this is a system
- # error.
- self._write_stream = None
-
- for transfer in self._write_transfers.values():
- transfer.finish(Status.INTERNAL, skip_callback=True)
- self._write_transfers.clear()
+ for transfer in self._write_transfers.values():
+ transfer.finish(Status.INTERNAL, skip_callback=True)
+ self._write_transfers.clear()
- _LOG.error('Write stream shut down: %s', status)
+ _LOG.error('Write stream shut down: %s', status)
def _start_read_transfer(self, transfer: Transfer) -> None:
"""Begins a new read transfer, opening the stream if it isn't."""
self._read_transfers[transfer.resource_id] = transfer
-
- if not self._read_stream:
- self._open_read_stream()
+ self._read_stream.open()
_LOG.debug('Starting new read transfer %d', transfer.id)
self._loop.call_soon_threadsafe(
@@ -418,19 +501,15 @@ class Manager: # pylint: disable=too-many-instance-attributes
transfer.status,
)
- # TODO(frolv): This doesn't seem to work. Investigate why.
# If no more transfers are using the read stream, close it.
- # if not self._read_transfers and self._read_stream:
- # self._read_stream.cancel()
- # self._read_stream = None
+ if not self._read_transfers:
+ self._read_stream.close()
def _start_write_transfer(self, transfer: Transfer) -> None:
"""Begins a new write transfer, opening the stream if it isn't."""
self._write_transfers[transfer.resource_id] = transfer
-
- if not self._write_stream:
- self._open_write_stream()
+ self._write_stream.open()
_LOG.debug('Starting new write transfer %d', transfer.id)
self._loop.call_soon_threadsafe(
@@ -448,11 +527,9 @@ class Manager: # pylint: disable=too-many-instance-attributes
transfer.status,
)
- # TODO(frolv): This doesn't seem to work. Investigate why.
# If no more transfers are using the write stream, close it.
- # if not self._write_transfers and self._write_stream:
- # self._write_stream.cancel()
- # self._write_stream = None
+ if not self._write_transfers:
+ self._write_stream.close()
class Error(Exception):
diff --git a/pw_transfer/py/pw_transfer/transfer.py b/pw_transfer/py/pw_transfer/transfer.py
index 1cbd0d5cd..bb1bb2de8 100644
--- a/pw_transfer/py/pw_transfer/transfer.py
+++ b/pw_transfer/py/pw_transfer/transfer.py
@@ -35,7 +35,7 @@ class ProgressStats:
total_size_bytes: Optional[int]
def percent_received(self) -> float:
- if self.total_size_bytes is None:
+ if self.total_size_bytes is None or self.total_size_bytes == 0:
return math.nan
return self.bytes_confirmed_received / self.total_size_bytes * 100
@@ -131,12 +131,14 @@ class Transfer(abc.ABC):
max_lifetime_retries: int,
protocol_version: ProtocolVersion,
progress_callback: Optional[ProgressCallback] = None,
+ initial_offset: int = 0,
):
self.status = Status.OK
self.done = threading.Event()
self._session_id = session_id
self._resource_id = resource_id
+ self._offset = initial_offset
self._send_chunk_fn = send_chunk
self._end_transfer = end_transfer
@@ -186,6 +188,9 @@ class Transfer(abc.ABC):
resource_id=self._resource_id,
)
+ if self._offset != 0:
+ initial_chunk.initial_offset = self._offset
+
if self._desired_protocol_version is ProtocolVersion.VERSION_TWO:
initial_chunk.desired_session_id = self._session_id
@@ -255,8 +260,12 @@ class Transfer(abc.ABC):
# Expecting a completion ACK but didn't receive one. Go through
# the retry process.
self._on_timeout()
- else:
+ # Only ignoring START_ACK, tests were unhappy with other non-data chunks
+ elif chunk.type not in [Chunk.Type.START_ACK]:
await self._handle_data_chunk(chunk)
+ else:
+ _LOG.warning("Ignoring extra START_ACK chunk")
+ return
# Start the timeout for the server to send a chunk in response.
self._response_timer.start()
@@ -275,6 +284,13 @@ class Transfer(abc.ABC):
self.id,
)
+ if self._offset != 0:
+ _LOG.error(
+ 'Non-zero offset transfers not supported by legacy protocol'
+ )
+ self.finish(Status.INTERNAL)
+ return
+
self._configured_protocol_version = ProtocolVersion.LEGACY
self._state = Transfer._State.WAITING
@@ -298,6 +314,11 @@ class Transfer(abc.ABC):
chunk.protocol_version.value,
)
+ if self._offset != chunk.initial_offset:
+ # If our offsets don't match, let user handle it
+ self.finish(Status.UNIMPLEMENTED)
+ return
+
# Send a confirmation chunk to the server accepting the assigned session
# ID and protocol version. Tag any initial transfer parameters onto the
# chunk to begin the data transfer.
@@ -305,7 +326,9 @@ class Transfer(abc.ABC):
self._configured_protocol_version,
Chunk.Type.START_ACK_CONFIRMATION,
session_id=self._session_id,
+ offset=self._offset,
)
+
self._set_initial_chunk_fields(start_ack_confirmation)
self._state = Transfer._State.WAITING
@@ -435,6 +458,7 @@ class WriteTransfer(Transfer):
max_lifetime_retries: int,
protocol_version: ProtocolVersion,
progress_callback: Optional[ProgressCallback] = None,
+ initial_offset: int = 0,
):
super().__init__(
session_id,
@@ -447,10 +471,11 @@ class WriteTransfer(Transfer):
max_lifetime_retries,
protocol_version,
progress_callback,
+ initial_offset=initial_offset,
)
self._data = data
+ self.initial_offset = initial_offset
- self._offset = 0
self._window_end_offset = 0
self._max_chunk_size = 0
self._chunk_delay_us: Optional[int] = None
@@ -515,7 +540,9 @@ class WriteTransfer(Transfer):
self._send_chunk(chunk)
self._update_progress(
- self._offset, self._bytes_confirmed_received, len(self.data)
+ self._offset,
+ self._bytes_confirmed_received,
+ len(self.data) + self.initial_offset,
)
if sent_requested_bytes:
@@ -530,13 +557,13 @@ class WriteTransfer(Transfer):
def _handle_parameters_update(self, chunk: Chunk) -> bool:
"""Updates transfer state based on a transfer parameters update."""
- if chunk.offset > len(self.data):
+ if chunk.offset > len(self.data) + self.initial_offset:
# Bad offset; terminate the transfer.
_LOG.error(
'Transfer %d: server requested invalid offset %d (size %d)',
self.id,
chunk.offset,
- len(self.data),
+ len(self.data) + self.initial_offset,
)
self._send_final_chunk(Status.OUT_OF_RANGE)
@@ -551,7 +578,9 @@ class WriteTransfer(Transfer):
return False
# Extend the window to the new end offset specified by the server.
- self._window_end_offset = min(chunk.window_end_offset, len(self.data))
+ self._window_end_offset = min(
+ chunk.window_end_offset, len(self.data) + self.initial_offset
+ )
if chunk.requests_transmission_from_offset():
# Check whether the client has sent a previous data offset, which
@@ -593,10 +622,18 @@ class WriteTransfer(Transfer):
max_bytes_in_chunk = min(
self._max_chunk_size, self._window_end_offset - self._offset
)
- chunk.data = self.data[self._offset : self._offset + max_bytes_in_chunk]
+ chunk.data = self.data[
+ self._offset
+ - self.initial_offset : self._offset
+ - self.initial_offset
+ + max_bytes_in_chunk
+ ]
# Mark the final chunk of the transfer.
- if len(self.data) - self._offset <= max_bytes_in_chunk:
+ if (
+ len(self.data) - self._offset + self.initial_offset
+ <= max_bytes_in_chunk
+ ):
chunk.remaining_bytes = 0
return chunk
@@ -634,6 +671,7 @@ class ReadTransfer(Transfer):
max_chunk_size: int = 1024,
chunk_delay_us: Optional[int] = None,
progress_callback: Optional[ProgressCallback] = None,
+ initial_offset: int = 0,
):
super().__init__(
session_id,
@@ -646,6 +684,7 @@ class ReadTransfer(Transfer):
max_lifetime_retries,
protocol_version,
progress_callback,
+ initial_offset=initial_offset,
)
self._max_bytes_to_receive = max_bytes_to_receive
self._max_chunk_size = max_chunk_size
@@ -653,7 +692,6 @@ class ReadTransfer(Transfer):
self._remaining_transfer_size: Optional[int] = None
self._data = bytearray()
- self._offset = 0
self._window_end_offset = max_bytes_to_receive
self._last_chunk_offset: Optional[int] = None
diff --git a/pw_transfer/py/tests/transfer_test.py b/pw_transfer/py/tests/transfer_test.py
index 8b29293f3..5b4057d98 100644
--- a/pw_transfer/py/tests/transfer_test.py
+++ b/pw_transfer/py/tests/transfer_test.py
@@ -1,5 +1,5 @@
#!/usr/bin/env python3
-# Copyright 2022 The Pigweed Authors
+# Copyright 2023 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
@@ -61,7 +61,7 @@ class TransferManagerTest(unittest.TestCase):
self._service = self._client.channel(1).rpcs.pw.transfer.Transfer
self._sent_chunks: List[transfer_pb2.Chunk] = []
- self._packets_to_send: List[List[bytes]] = []
+ self._packets_to_send: List[List[packet_pb2.RpcPacket]] = []
def _enqueue_server_responses(
self, method: _Method, responses: Iterable[Iterable[transfer_pb2.Chunk]]
@@ -77,7 +77,7 @@ class TransferManagerTest(unittest.TestCase):
method_id=method.value,
status=Status.OK.value,
payload=response.SerializeToString(),
- ).SerializeToString()
+ )
)
self._packets_to_send.append(serialized_group)
@@ -90,7 +90,7 @@ class TransferManagerTest(unittest.TestCase):
service_id=_TRANSFER_SERVICE_ID,
method_id=method.value,
status=error.value,
- ).SerializeToString()
+ )
]
)
@@ -106,7 +106,8 @@ class TransferManagerTest(unittest.TestCase):
if self._packets_to_send:
responses = self._packets_to_send.pop(0)
for response in responses:
- self._client.process_packet(response)
+ response.call_id = packet.call_id
+ self._client.process_packet(response.SerializeToString())
def _received_data(self) -> bytearray:
data = bytearray()
@@ -401,6 +402,61 @@ class TransferManagerTest(unittest.TestCase):
self.assertEqual(exception.resource_id, 31)
self.assertEqual(exception.status, Status.NOT_FOUND)
+ def test_read_transfer_reopen(self) -> None:
+ manager = pw_transfer.Manager(
+ self._service,
+ initial_response_timeout_s=DEFAULT_TIMEOUT_S,
+ default_response_timeout_s=DEFAULT_TIMEOUT_S,
+ )
+
+ # A FAILED_PRECONDITION error should attempt a stream reopen.
+ self._enqueue_server_error(_Method.READ, Status.FAILED_PRECONDITION)
+ self._enqueue_server_responses(
+ _Method.READ,
+ (
+ (
+ transfer_pb2.Chunk(
+ transfer_id=3,
+ offset=0,
+ data=b'xyz',
+ remaining_bytes=0,
+ ),
+ ),
+ ),
+ )
+
+ # The transfer should complete following reopen, with the first chunk
+ # being retried.
+ data = manager.read(3)
+ self.assertEqual(data, b'xyz')
+ self.assertEqual(len(self._sent_chunks), 3)
+ self.assertEqual(self._sent_chunks[0], self._sent_chunks[1])
+ self.assertTrue(self._sent_chunks[-1].HasField('status'))
+ self.assertEqual(self._sent_chunks[-1].status, 0)
+
+ def test_read_transfer_reopen_max_attempts(self) -> None:
+ manager = pw_transfer.Manager(
+ self._service,
+ initial_response_timeout_s=DEFAULT_TIMEOUT_S,
+ default_response_timeout_s=DEFAULT_TIMEOUT_S,
+ )
+
+ # A FAILED_PRECONDITION error should attempt a stream reopen; enqueue
+ # several.
+ self._enqueue_server_error(_Method.READ, Status.FAILED_PRECONDITION)
+ self._enqueue_server_error(_Method.READ, Status.FAILED_PRECONDITION)
+ self._enqueue_server_error(_Method.READ, Status.FAILED_PRECONDITION)
+ self._enqueue_server_error(_Method.READ, Status.FAILED_PRECONDITION)
+ self._enqueue_server_error(_Method.READ, Status.FAILED_PRECONDITION)
+
+ with self.assertRaises(pw_transfer.Error) as context:
+ manager.read(81)
+
+ exception = context.exception
+ self.assertEqual(len(self._sent_chunks), 4)
+ self.assertEqual(exception.resource_id, 81)
+ self.assertEqual(exception.status, Status.INTERNAL)
+
def test_read_transfer_server_error(self) -> None:
manager = pw_transfer.Manager(
self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
@@ -417,7 +473,8 @@ class TransferManagerTest(unittest.TestCase):
def test_write_transfer_basic(self) -> None:
manager = pw_transfer.Manager(
- self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
+ self._service,
+ default_response_timeout_s=DEFAULT_TIMEOUT_S,
)
self._enqueue_server_responses(
diff --git a/pw_transfer/server_context.cc b/pw_transfer/server_context.cc
index 353258f5b..3ecf61d33 100644
--- a/pw_transfer/server_context.cc
+++ b/pw_transfer/server_context.cc
@@ -53,4 +53,8 @@ Status ServerContext::FinalCleanup(const Status status) {
return OkStatus();
}
+Status ServerContext::SeekReader(uint32_t offset) {
+ return reader().Seek(offset);
+}
+
} // namespace pw::transfer::internal
diff --git a/pw_transfer/transfer.cc b/pw_transfer/transfer.cc
index eb04aeeb1..14dd32a41 100644
--- a/pw_transfer/transfer.cc
+++ b/pw_transfer/transfer.cc
@@ -14,6 +14,7 @@
#include "pw_transfer/transfer.h"
+#include "public/pw_transfer/transfer.h"
#include "pw_assert/check.h"
#include "pw_log/log.h"
#include "pw_status/try.h"
@@ -47,6 +48,14 @@ void TransferService::HandleChunk(ConstByteSpan message,
return;
}
+ uint32_t initial_offset;
+
+ if (chunk->is_legacy()) {
+ initial_offset = 0;
+ } else {
+ initial_offset = chunk->initial_offset();
+ }
+
thread_.StartServerTransfer(type,
chunk->protocol_version(),
session_id,
@@ -55,10 +64,74 @@ void TransferService::HandleChunk(ConstByteSpan message,
max_parameters_,
chunk_timeout_,
max_retries_,
- max_lifetime_retries_);
+ max_lifetime_retries_,
+ initial_offset);
} else {
thread_.ProcessServerChunk(message);
}
}
+void TransferService::GetResourceStatus(pw::ConstByteSpan request,
+ pw::rpc::RawUnaryResponder& responder) {
+ uint32_t resource_id;
+ Status status;
+
+ protobuf::Decoder decoder(request);
+ if (status = decoder.Next(); status.IsOutOfRange()) {
+ resource_id = 0;
+ } else if (!status.ok()) {
+ responder.Finish({}, Status::DataLoss()).IgnoreError();
+ return;
+ } else if (static_cast<pwpb::ResourceStatusRequest::Fields>(
+ decoder.FieldNumber()) ==
+ pwpb::ResourceStatusRequest::Fields::kResourceId) {
+ if (status = decoder.ReadUint32(&resource_id); !status.ok()) {
+ responder.Finish({}, Status::DataLoss()).IgnoreError();
+ return;
+ }
+ } else {
+ responder.Finish({}, Status::DataLoss()).IgnoreError();
+ return;
+ }
+
+ if (TransferService::resource_responder_.active()) {
+ responder.Finish({}, Status::Unavailable()).IgnoreError();
+ return;
+ }
+
+ TransferService::resource_responder_ = std::move(responder);
+
+ thread_.EnqueueResourceEvent(
+ resource_id,
+ [this](Status call_status, const internal::ResourceStatus stats) {
+ this->ResourceStatusCallback(call_status, stats);
+ });
+}
+
+void TransferService::ResourceStatusCallback(
+ Status status, const internal::ResourceStatus& stats) {
+ PW_ASSERT(resource_responder_.active());
+
+ if (!status.ok()) {
+ resource_responder_.Finish({}, status).IgnoreError();
+ }
+
+ std::array<std::byte, pwpb::ResourceStatus::kMaxEncodedSizeBytes> buffer = {};
+ pwpb::ResourceStatus::MemoryEncoder encoder(buffer);
+
+ encoder.WriteResourceId(stats.resource_id).IgnoreError();
+ encoder.WriteStatus(status.code()).IgnoreError();
+ encoder.WriteReadableOffset(stats.readable_offset).IgnoreError();
+ encoder.WriteReadChecksum(stats.read_checksum).IgnoreError();
+ encoder.WriteWriteableOffset(stats.writeable_offset).IgnoreError();
+ encoder.WriteWriteChecksum(stats.write_checksum).IgnoreError();
+
+ if (!encoder.status().ok()) {
+ resource_responder_.Finish({}, encoder.status()).IgnoreError();
+ return;
+ }
+
+ resource_responder_.Finish(ConstByteSpan(encoder), status).IgnoreError();
+}
+
} // namespace pw::transfer
diff --git a/pw_transfer/transfer.proto b/pw_transfer/transfer.proto
index 2d3c5d4c3..9bbedaa07 100644
--- a/pw_transfer/transfer.proto
+++ b/pw_transfer/transfer.proto
@@ -28,6 +28,10 @@ service Transfer {
// Transfer data from the client to the server; an "upload" from the client's
// perspective.
rpc Write(stream Chunk) returns (stream Chunk);
+
+ // Query the status of a resource. Can be used for partially completed
+ // transfers
+ rpc GetResourceStatus(ResourceStatusRequest) returns (ResourceStatus);
}
// Represents a chunk of data sent by the transfer service. Includes fields for
@@ -224,4 +228,37 @@ message Chunk {
// Write → Requested ID of transfer session
// Write ← N/A
optional uint32 desired_session_id = 14;
+
+ // The initial offset to start the transfer from. Can be used for read or
+ // write transfers. Set by the client during start handshake.
+ // Needs to be accepted by the resource transfer handler in order for the
+ // non-zero offset transfer to start from the initial_offset.
+ //
+ // Read → Requested initial offset for the session
+ // Read ← Confirmed (matches) or denied (zero) initial offset
+ // Write → Requested initial offset for the session
+ // Write ← Confirmed (matches) or denied (zero) initial offset
+ uint64 initial_offset = 15;
+}
+
+// Request for GetResourceStatus, indicating the resource to get status from.
+message ResourceStatusRequest {
+ uint32 resource_id = 1;
+}
+
+// Response for GetResourceStatus
+message ResourceStatus {
+ // Resource id, matching request
+ uint32 resource_id = 1;
+
+ // Status of the resource, returns Unimplemented by default.
+ uint32 status = 2;
+ // The offset that can be written to (other than 0).
+ uint64 writeable_offset = 3;
+ // The offset that can be read from (other than 0).
+ uint64 readable_offset = 4;
+ // The checksum at the given write offset.
+ optional uint64 write_checksum = 5;
+ // The checksum at the given read offset.
+ optional uint64 read_checksum = 6;
}
diff --git a/pw_transfer/transfer_test.cc b/pw_transfer/transfer_test.cc
index fcc4e1999..95bb385e9 100644
--- a/pw_transfer/transfer_test.cc
+++ b/pw_transfer/transfer_test.cc
@@ -1,4 +1,4 @@
-// Copyright 2023 The Pigweed Authors
+// Copyright 2024 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
@@ -14,7 +14,8 @@
#include "pw_transfer/transfer.h"
-#include "gtest/gtest.h"
+#include <limits>
+
#include "pw_assert/check.h"
#include "pw_bytes/array.h"
#include "pw_containers/algorithm.h"
@@ -24,6 +25,7 @@
#include "pw_thread_stl/options.h"
#include "pw_transfer/transfer.pwpb.h"
#include "pw_transfer_private/chunk_testing.h"
+#include "pw_unit_test/framework.h"
namespace pw::transfer::test {
namespace {
@@ -74,6 +76,7 @@ class SimpleReadTransfer final : public ReadOnlyHandler {
prepare_read_called(false),
finalize_read_called(false),
finalize_read_status(Status::Unknown()),
+ resource_size_(std::numeric_limits<size_t>::max()),
reader_(data) {}
Status PrepareRead() final {
@@ -93,6 +96,11 @@ class SimpleReadTransfer final : public ReadOnlyHandler {
finalize_read_status = status;
}
+ size_t ResourceSize() const final { return resource_size_; }
+
+ void set_resource_size(size_t resource_size) {
+ resource_size_ = resource_size;
+ }
void set_seek_status(Status status) { reader_.seek_status = status; }
void set_read_status(Status status) { reader_.read_status = status; }
@@ -100,6 +108,7 @@ class SimpleReadTransfer final : public ReadOnlyHandler {
bool finalize_read_called;
Status prepare_read_return_status;
Status finalize_read_status;
+ size_t resource_size_;
private:
TestMemoryReader reader_;
@@ -135,7 +144,7 @@ class ReadTransfer : public ::testing::Test {
SimpleReadTransfer handler_;
Thread<1, 1> transfer_thread_;
- PW_RAW_TEST_METHOD_CONTEXT(TransferService, Read) ctx_;
+ PW_RAW_TEST_METHOD_CONTEXT(TransferService, Read, 8) ctx_;
thread::Thread system_thread_;
std::array<std::byte, 64> data_buffer_;
std::array<std::byte, 64> encode_buffer_;
@@ -742,7 +751,8 @@ class WriteTransfer : public ::testing::Test {
ctx_(transfer_thread_,
max_bytes_to_receive,
// Use a long timeout to avoid accidentally triggering timeouts.
- std::chrono::minutes(1)) {
+ std::chrono::minutes(1),
+ /*max_retries=*/3) {
ctx_.service().RegisterHandler(handler_);
PW_CHECK(!handler_.prepare_write_called);
@@ -1990,6 +2000,84 @@ TEST_F(ReadTransfer, Version2_PrepareError) {
EXPECT_EQ(chunk.status().value(), Status::DataLoss());
}
+TEST_F(ReadTransfer, Version2_HandlerSetsTransferSize) {
+ handler_.set_resource_size(kData.size());
+
+ ctx_.SendClientStream(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
+ .set_desired_session_id(kArbitrarySessionId)
+ .set_resource_id(3)));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ EXPECT_TRUE(handler_.prepare_read_called);
+ EXPECT_FALSE(handler_.finalize_read_called);
+
+ // First, the server responds with a START_ACK, accepting the session ID and
+ // confirming the protocol version.
+ ASSERT_EQ(ctx_.total_responses(), 1u);
+ Chunk chunk = DecodeChunk(ctx_.responses().back());
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
+ EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+
+ // Complete the handshake by confirming the server's ACK and sending the first
+ // read transfer parameters.
+ rpc::test::WaitForPackets(ctx_.output(), 5, [this] {
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
+ .set_session_id(kArbitrarySessionId)
+ .set_window_end_offset(64)
+ .set_max_chunk_size_bytes(8)
+ .set_offset(0)));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+ });
+
+ ASSERT_EQ(ctx_.total_responses(), 6u);
+
+ // Each of the sent chunks should have a remaining_bytes value set.
+ Chunk c1 = DecodeChunk(ctx_.responses()[1]);
+ EXPECT_EQ(c1.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(c1.type(), Chunk::Type::kData);
+ EXPECT_EQ(c1.session_id(), kArbitrarySessionId);
+ EXPECT_EQ(c1.offset(), 0u);
+ ASSERT_TRUE(c1.remaining_bytes().has_value());
+ EXPECT_EQ(c1.remaining_bytes().value(), 24u);
+
+ Chunk c2 = DecodeChunk(ctx_.responses()[2]);
+ EXPECT_EQ(c2.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(c2.type(), Chunk::Type::kData);
+ EXPECT_EQ(c2.session_id(), kArbitrarySessionId);
+ EXPECT_EQ(c2.offset(), 8u);
+ ASSERT_TRUE(c2.remaining_bytes().has_value());
+ EXPECT_EQ(c2.remaining_bytes().value(), 16u);
+
+ Chunk c3 = DecodeChunk(ctx_.responses()[3]);
+ EXPECT_EQ(c3.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(c3.type(), Chunk::Type::kData);
+ EXPECT_EQ(c3.session_id(), kArbitrarySessionId);
+ EXPECT_EQ(c3.offset(), 16u);
+ ASSERT_TRUE(c3.remaining_bytes().has_value());
+ EXPECT_EQ(c3.remaining_bytes().value(), 8u);
+
+ Chunk c4 = DecodeChunk(ctx_.responses()[4]);
+ EXPECT_EQ(c4.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(c4.type(), Chunk::Type::kData);
+ EXPECT_EQ(c4.session_id(), kArbitrarySessionId);
+ EXPECT_EQ(c4.offset(), 24u);
+ ASSERT_TRUE(c4.remaining_bytes().has_value());
+ EXPECT_EQ(c4.remaining_bytes().value(), 0u);
+
+ ctx_.SendClientStream(EncodeChunk(Chunk::Final(
+ ProtocolVersion::kVersionTwo, kArbitrarySessionId, OkStatus())));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ EXPECT_TRUE(handler_.finalize_read_called);
+ EXPECT_EQ(handler_.finalize_read_status, OkStatus());
+}
+
TEST_F(WriteTransfer, Version2_SimpleTransfer) {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
diff --git a/pw_transfer/transfer_thread.cc b/pw_transfer/transfer_thread.cc
index 84bf92651..51e388d79 100644
--- a/pw_transfer/transfer_thread.cc
+++ b/pw_transfer/transfer_thread.cc
@@ -1,4 +1,4 @@
-// Copyright 2023 The Pigweed Authors
+// Copyright 2024 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
@@ -19,6 +19,8 @@
#include "pw_assert/check.h"
#include "pw_log/log.h"
#include "pw_transfer/internal/chunk.h"
+#include "pw_transfer/internal/client_context.h"
+#include "pw_transfer/internal/event.h"
PW_MODIFY_DIAGNOSTICS_PUSH();
PW_MODIFY_DIAGNOSTIC(ignored, "-Wmissing-field-initializers");
@@ -103,6 +105,7 @@ void TransferThread::StartTransfer(
ProtocolVersion version,
uint32_t session_id,
uint32_t resource_id,
+ uint32_t handle_id,
ConstByteSpan raw_chunk,
stream::Stream* stream,
const TransferParameters& max_parameters,
@@ -110,7 +113,8 @@ void TransferThread::StartTransfer(
chrono::SystemClock::duration timeout,
chrono::SystemClock::duration initial_timeout,
uint8_t max_retries,
- uint32_t max_lifetime_retries) {
+ uint32_t max_lifetime_retries,
+ uint32_t initial_offset) {
// Block until the last event has been processed.
next_event_ownership_.acquire();
@@ -136,6 +140,7 @@ void TransferThread::StartTransfer(
.protocol_version = version,
.session_id = session_id,
.resource_id = resource_id,
+ .handle_id = handle_id,
.max_parameters = &max_parameters,
.timeout = timeout,
.initial_timeout = initial_timeout,
@@ -144,6 +149,7 @@ void TransferThread::StartTransfer(
.transfer_thread = this,
.raw_chunk_data = chunk_buffer_.data(),
.raw_chunk_size = raw_chunk.size(),
+ .initial_offset = initial_offset,
};
staged_on_completion_ = std::move(on_completion);
@@ -153,18 +159,20 @@ void TransferThread::StartTransfer(
// with the specified ID.
if (is_client_transfer) {
next_event_.new_transfer.stream = stream;
- next_event_.new_transfer.rpc_writer = &static_cast<rpc::Writer&>(
- type == TransferType::kTransmit ? client_write_stream_
- : client_read_stream_);
+ next_event_.new_transfer.rpc_writer =
+ &(type == TransferType::kTransmit ? client_write_stream_
+ : client_read_stream_)
+ .as_writer();
} else {
auto handler = std::find_if(handlers_.begin(),
handlers_.end(),
[&](auto& h) { return h.id() == resource_id; });
if (handler != handlers_.end()) {
next_event_.new_transfer.handler = &*handler;
- next_event_.new_transfer.rpc_writer = &static_cast<rpc::Writer&>(
- type == TransferType::kTransmit ? server_read_stream_
- : server_write_stream_);
+ next_event_.new_transfer.rpc_writer =
+ &(type == TransferType::kTransmit ? server_read_stream_
+ : server_write_stream_)
+ .as_writer();
} else {
// No handler exists for the transfer: return a NOT_FOUND.
next_event_.type = EventType::kSendStatusChunk;
@@ -230,7 +238,8 @@ void TransferThread::SendStatus(TransferStream stream,
}
void TransferThread::EndTransfer(EventType type,
- uint32_t session_id,
+ IdentifierType id_type,
+ uint32_t id,
Status status,
bool send_status_chunk) {
// Block until the last event has been processed.
@@ -238,7 +247,8 @@ void TransferThread::EndTransfer(EventType type,
next_event_.type = type;
next_event_.end_transfer = {
- .session_id = session_id,
+ .id_type = id_type,
+ .id = id,
.status = status.code(),
.send_status_chunk = send_status_chunk,
};
@@ -246,6 +256,18 @@ void TransferThread::EndTransfer(EventType type,
event_notification_.release();
}
+void TransferThread::UpdateClientTransfer(uint32_t handle_id,
+ size_t transfer_size_bytes) {
+ // Block until the last event has been processed.
+ next_event_ownership_.acquire();
+
+ next_event_.type = EventType::kUpdateClientTransfer;
+ next_event_.update_transfer.handle_id = handle_id;
+ next_event_.update_transfer.transfer_size_bytes = transfer_size_bytes;
+
+ event_notification_.release();
+}
+
void TransferThread::TransferHandlerEvent(EventType type, Handler& handler) {
// Block until the last event has been processed.
next_event_ownership_.acquire();
@@ -269,7 +291,8 @@ void TransferThread::HandleEvent(const internal::Event& event) {
.type = EventType::kServerEndTransfer,
.end_transfer =
EndTransferEvent{
- .session_id = server_context.session_id(),
+ .id_type = IdentifierType::Session,
+ .id = server_context.session_id(),
.status = Status::Aborted().code(),
.send_status_chunk = false,
},
@@ -282,7 +305,8 @@ void TransferThread::HandleEvent(const internal::Event& event) {
.type = EventType::kClientEndTransfer,
.end_transfer =
EndTransferEvent{
- .session_id = client_context.session_id(),
+ .id_type = IdentifierType::Session,
+ .id = client_context.session_id(),
.status = Status::Aborted().code(),
.send_status_chunk = false,
},
@@ -311,7 +335,8 @@ void TransferThread::HandleEvent(const internal::Event& event) {
.type = EventType::kServerEndTransfer,
.end_transfer =
EndTransferEvent{
- .session_id = server_context.session_id(),
+ .id_type = IdentifierType::Session,
+ .id = server_context.session_id(),
.status = Status::Aborted().code(),
.send_status_chunk = false,
},
@@ -321,6 +346,10 @@ void TransferThread::HandleEvent(const internal::Event& event) {
handlers_.remove(*event.remove_transfer_handler);
return;
+ case EventType::kGetResourceStatus:
+ GetResourceState(event.resource_status.resource_id);
+ return;
+
case EventType::kNewClientTransfer:
case EventType::kNewServerTransfer:
case EventType::kClientChunk:
@@ -329,6 +358,7 @@ void TransferThread::HandleEvent(const internal::Event& event) {
case EventType::kServerTimeout:
case EventType::kClientEndTransfer:
case EventType::kServerEndTransfer:
+ case EventType::kUpdateClientTransfer:
default:
// Other events are handled by individual transfer contexts.
break;
@@ -356,8 +386,15 @@ void TransferThread::HandleEvent(const internal::Event& event) {
if (event.type == EventType::kNewClientTransfer) {
// TODO(frolv): This is terrible.
- static_cast<ClientContext*>(ctx)->set_on_completion(
- std::move(staged_on_completion_));
+ ClientContext* cctx = static_cast<ClientContext*>(ctx);
+ cctx->set_on_completion(std::move(staged_on_completion_));
+ cctx->set_handle_id(event.new_transfer.handle_id);
+ }
+
+ if (event.type == EventType::kUpdateClientTransfer) {
+ static_cast<ClientContext&>(*ctx).set_transfer_size_bytes(
+ event.update_transfer.transfer_size_bytes);
+ return;
}
ctx->HandleEvent(event);
@@ -395,16 +432,24 @@ Context* TransferThread::FindContextForEvent(
event.chunk.context_identifier);
case EventType::kClientEndTransfer:
+ if (event.end_transfer.id_type == IdentifierType::Handle) {
+ return FindClientTransferByHandleId(event.end_transfer.id);
+ }
return FindActiveTransferByLegacyId(client_transfers_,
- event.end_transfer.session_id);
+ event.end_transfer.id);
case EventType::kServerEndTransfer:
+ PW_DCHECK(event.end_transfer.id_type != IdentifierType::Handle);
return FindActiveTransferByLegacyId(server_transfers_,
- event.end_transfer.session_id);
+ event.end_transfer.id);
+
+ case EventType::kUpdateClientTransfer:
+ return FindClientTransferByHandleId(event.update_transfer.handle_id);
case EventType::kSendStatusChunk:
case EventType::kAddTransferHandler:
case EventType::kRemoveTransferHandler:
case EventType::kTerminate:
+ case EventType::kGetResourceStatus:
default:
return nullptr;
}
@@ -440,6 +485,45 @@ uint32_t TransferThread::AssignSessionId() {
return session_id;
}
+// Adds GetResourceStatusEvent to the queue. Will fail if there is already a
+// GetResourceStatusEvent in process.
+void TransferThread::EnqueueResourceEvent(uint32_t resource_id,
+ ResourceStatusCallback&& callback) {
+ // Block until the last event has been processed.
+ next_event_ownership_.acquire();
+
+ next_event_.type = EventType::kGetResourceStatus;
+
+ resource_status_callback_ = std::move(callback);
+
+ next_event_.resource_status.resource_id = resource_id;
+
+ event_notification_.release();
+}
+
+// Should only be called when we got a valid callback and RPC responder from
+// GetResourceStatus transfer RPC.
+void TransferThread::GetResourceState(uint32_t resource_id) {
+ PW_ASSERT(resource_status_callback_ != nullptr);
+
+ auto handler = std::find_if(handlers_.begin(), handlers_.end(), [&](auto& h) {
+ return h.id() == resource_id;
+ });
+ internal::ResourceStatus stats;
+ stats.resource_id = resource_id;
+
+ if (handler != handlers_.end()) {
+ Status status = handler->GetStatus(stats.readable_offset,
+ stats.writeable_offset,
+ stats.read_checksum,
+ stats.write_checksum);
+
+ resource_status_callback_(status, stats);
+ } else {
+ resource_status_callback_(Status::NotFound(), stats);
+ }
+}
+
} // namespace pw::transfer::internal
PW_MODIFY_DIAGNOSTICS_POP();
diff --git a/pw_transfer/transfer_thread_test.cc b/pw_transfer/transfer_thread_test.cc
index 0df41ec6f..b956250d0 100644
--- a/pw_transfer/transfer_thread_test.cc
+++ b/pw_transfer/transfer_thread_test.cc
@@ -1,4 +1,4 @@
-// Copyright 2022 The Pigweed Authors
+// Copyright 2023 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
@@ -14,7 +14,6 @@
#include "pw_transfer/transfer_thread.h"
-#include "gtest/gtest.h"
#include "pw_assert/check.h"
#include "pw_bytes/array.h"
#include "pw_rpc/raw/client_testing.h"
@@ -26,6 +25,7 @@
#include "pw_transfer/transfer.h"
#include "pw_transfer/transfer.raw_rpc.pb.h"
#include "pw_transfer_private/chunk_testing.h"
+#include "pw_unit_test/framework.h"
namespace pw::transfer::test {
namespace {
@@ -220,8 +220,9 @@ TEST_F(TransferThreadTest, StartTransferExhausted_Server) {
EncodeChunk(
Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
.set_session_id(3)
+ // Ensure only one chunk is sent as end offset equals max size.
.set_window_end_offset(16)
- .set_max_chunk_size_bytes(8)
+ .set_max_chunk_size_bytes(16)
.set_offset(0)),
max_parameters_,
kNeverTimeout,
@@ -244,8 +245,9 @@ TEST_F(TransferThreadTest, StartTransferExhausted_Server) {
EncodeChunk(
Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
.set_session_id(4)
+ // Ensure only one chunk is sent as end offset equals max size.
.set_window_end_offset(16)
- .set_max_chunk_size_bytes(8)
+ .set_max_chunk_size_bytes(16)
.set_offset(0)),
max_parameters_,
kNeverTimeout,
@@ -279,7 +281,8 @@ TEST_F(TransferThreadTest, StartTransferExhausted_Client) {
transfer_thread_.StartClientTransfer(
internal::TransferType::kReceive,
ProtocolVersion::kLegacy,
- 3,
+ /*resource_id=*/3,
+ /*handle_id=*/27,
&buffer3,
max_parameters_,
[&status3](Status status) { status3 = status; },
@@ -297,7 +300,8 @@ TEST_F(TransferThreadTest, StartTransferExhausted_Client) {
transfer_thread_.StartClientTransfer(
internal::TransferType::kReceive,
ProtocolVersion::kLegacy,
- 4,
+ /*resource_id=*/4,
+ /*handle_id=*/27,
&buffer4,
max_parameters_,
[&status4](Status status) { status4 = status; },
diff --git a/pw_transfer/ts/transfer_test.ts b/pw_transfer/ts/transfer_test.ts
index 723539dff..7d6b2156b 100644
--- a/pw_transfer/ts/transfer_test.ts
+++ b/pw_transfer/ts/transfer_test.ts
@@ -89,6 +89,7 @@ describe('Transfer client', () => {
packet.setChannelId(1);
packet.setServiceId(service.id);
packet.setMethodId(method.id);
+ packet.setCallId(method.rpcs.nextCallId);
packet.setStatus(error);
packetsToSend.push([packet.serializeBinary()]);
}
@@ -102,6 +103,7 @@ describe('Transfer client', () => {
packet.setChannelId(1);
packet.setServiceId(service.id);
packet.setMethodId(method.id);
+ packet.setCallId(method.rpcs.nextCallId);
packet.setStatus(Status.OK);
packet.setPayload(response.serializeBinary());
serializedGroup.push(packet.serializeBinary());