aboutsummaryrefslogtreecommitdiff
path: root/pw_rpc
diff options
context:
space:
mode:
authorAlexei Frolov <frolv@google.com>2020-08-04 10:19:24 -0700
committerCQ Bot Account <commit-bot@chromium.org>2020-09-30 16:31:44 +0000
commit4d2adde7c674a20e57a043b4f6456824a4d6f41a (patch)
tree1a8b79deef2d73adc62823daf90e0792ed4b6c4f /pw_rpc
parentbb57d9cdd56031f81e3d8af2a71ee0cf7320a2a5 (diff)
downloadpigweed-4d2adde7c674a20e57a043b4f6456824a4d6f41a.tar.gz
pw_rpc: Basic client implementation
This implements an RPC client class which is used to make RPC requests to a server. Like the server, the client exists globally and processes incoming RPC packets. The client keeps track of active RPC request contexts through ClientCall objects, and dispatches packets to them when an expected response is received. A nanopb implementation of a ClientCall is added as well, supporting unary and server-streaming RPC calls, with a codegen-ready API. Change-Id: If9615877199e0d4bc468c33d3d9ecf85da32440a Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/15741 Commit-Queue: Alexei Frolov <frolv@google.com> Reviewed-by: Wyatt Hepler <hepler@google.com>
Diffstat (limited to 'pw_rpc')
-rw-r--r--pw_rpc/BUILD66
-rw-r--r--pw_rpc/BUILD.gn70
-rw-r--r--pw_rpc/base_client_call.cc44
-rw-r--r--pw_rpc/base_client_call_test.cc72
-rw-r--r--pw_rpc/client.cc92
-rw-r--r--pw_rpc/client_test.cc78
-rw-r--r--pw_rpc/docs.rst69
-rw-r--r--pw_rpc/nanopb/BUILD.gn43
-rw-r--r--pw_rpc/nanopb/docs.rst7
-rw-r--r--pw_rpc/nanopb/nanopb_client_call.cc33
-rw-r--r--pw_rpc/nanopb/nanopb_client_call_test.cc273
-rw-r--r--pw_rpc/nanopb/nanopb_common.cc55
-rw-r--r--pw_rpc/nanopb/nanopb_method.cc38
-rw-r--r--pw_rpc/nanopb/nanopb_method_test.cc31
-rw-r--r--pw_rpc/nanopb/public/pw_rpc/internal/nanopb_common.h63
-rw-r--r--pw_rpc/nanopb/public/pw_rpc/internal/nanopb_method.h29
-rw-r--r--pw_rpc/nanopb/public/pw_rpc/nanopb_client_call.h167
-rw-r--r--pw_rpc/nanopb/pw_rpc_nanopb_private/internal_test_utils.h62
-rw-r--r--pw_rpc/public/pw_rpc/channel.h19
-rw-r--r--pw_rpc/public/pw_rpc/client.h62
-rw-r--r--pw_rpc/public/pw_rpc/internal/base_client_call.h82
-rw-r--r--pw_rpc/public/pw_rpc/internal/method_type.h25
-rw-r--r--pw_rpc/pw_rpc_private/internal_test_utils.h43
-rw-r--r--pw_rpc/pw_rpc_test_protos/test.options15
24 files changed, 1441 insertions, 97 deletions
diff --git a/pw_rpc/BUILD b/pw_rpc/BUILD
index a5eb04614..63d9cff87 100644
--- a/pw_rpc/BUILD
+++ b/pw_rpc/BUILD
@@ -23,29 +23,57 @@ package(default_visibility = ["//visibility:public"])
licenses(["notice"]) # Apache License 2.0
pw_cc_library(
+ name = "client",
+ srcs = [
+ "client.cc",
+ "base_client_call.cc",
+ ],
+ hdrs = [
+ "public/pw_rpc/client.h",
+ "public/pw_rpc/internal/base_client_call.h",
+ ],
+ deps = [
+ ":common",
+ ]
+)
+
+pw_cc_library(
name = "server",
srcs = [
"base_server_writer.cc",
- "channel.cc",
- "packet.cc",
"public/pw_rpc/internal/base_server_writer.h",
"public/pw_rpc/internal/call.h",
- "public/pw_rpc/internal/channel.h",
"public/pw_rpc/internal/hash.h",
"public/pw_rpc/internal/method.h",
- "public/pw_rpc/internal/packet.h",
"public/pw_rpc/internal/server.h",
"server.cc",
"service.cc",
],
hdrs = [
- "public/pw_rpc/channel.h",
"public/pw_rpc/server.h",
"public/pw_rpc/server_context.h",
"public/pw_rpc/service.h",
],
includes = ["public"],
deps = [
+ ":common",
+ ],
+)
+
+pw_cc_library(
+ name = "common",
+ srcs = [
+ "channel.cc",
+ "packet.cc",
+ "public/pw_rpc/internal/channel.h",
+ "public/pw_rpc/internal/method_type.h",
+ "public/pw_rpc/internal/packet.h",
+ ],
+ hdrs = [
+ "public/pw_rpc/channel.h",
+ ],
+ includes = ["public"],
+ deps = [
"//pw_assert",
"//pw_log",
"//pw_span",
@@ -73,12 +101,18 @@ filegroup(
srcs = [
"nanopb/codegen_test.cc",
"nanopb/echo_service_test.cc",
+ "nanopb/nanopb_client_call.cc",
+ "nanopb/nanopb_client_call_test.cc",
+ "nanopb/nanopb_common.cc",
"nanopb/nanopb_method.cc",
"nanopb/nanopb_method_test.cc",
"nanopb/public/pw_rpc/echo_service_nanopb.h",
+ "nanopb/public/pw_rpc/nanopb_client_call.h",
+ "nanopb/public/pw_rpc/internal/nanopb_common.h",
"nanopb/public/pw_rpc/internal/nanopb_method.h",
"nanopb/public/pw_rpc/internal/service_method_traits.h",
"nanopb/public/pw_rpc/test_method_context.h",
+ "nanopb/pw_rpc_nanopb_private/internal_test_utils.h",
"nanopb/service_method_traits_test.cc",
"nanopb/test.pb.c",
"nanopb/test.pb.h",
@@ -98,6 +132,28 @@ pw_cc_test(
)
pw_cc_test(
+ name = "base_client_call_test",
+ srcs = [
+ "base_client_call_test.cc",
+ ],
+ deps = [
+ ":client",
+ ":internal_test_utils",
+ ],
+)
+
+pw_cc_test(
+ name = "client_test",
+ srcs = [
+ "client_test.cc",
+ ],
+ deps = [
+ ":client",
+ ":internal_test_utils",
+ ],
+)
+
+pw_cc_test(
name = "channel_test",
srcs = ["channel_test.cc"],
deps = [
diff --git a/pw_rpc/BUILD.gn b/pw_rpc/BUILD.gn
index fbf3a7442..3ec6c520f 100644
--- a/pw_rpc/BUILD.gn
+++ b/pw_rpc/BUILD.gn
@@ -28,30 +28,19 @@ config("default_config") {
pw_source_set("server") {
public_configs = [ ":default_config" ]
- public_deps = [
- ":protos_pwpb",
- "$dir_pw_containers:intrusive_list",
- dir_pw_assert,
- dir_pw_span,
- dir_pw_status,
- ]
+ public_deps = [ ":common" ]
deps = [ dir_pw_log ]
public = [
- "public/pw_rpc/channel.h",
"public/pw_rpc/server.h",
"public/pw_rpc/server_context.h",
"public/pw_rpc/service.h",
]
sources = [
"base_server_writer.cc",
- "channel.cc",
- "packet.cc",
"public/pw_rpc/internal/base_server_writer.h",
"public/pw_rpc/internal/call.h",
- "public/pw_rpc/internal/channel.h",
"public/pw_rpc/internal/hash.h",
"public/pw_rpc/internal/method.h",
- "public/pw_rpc/internal/packet.h",
"public/pw_rpc/internal/server.h",
"server.cc",
"service.cc",
@@ -59,6 +48,43 @@ pw_source_set("server") {
friend = [ "./*" ]
}
+pw_source_set("client") {
+ public_configs = [ ":default_config" ]
+ public_deps = [ ":common" ]
+ deps = [ dir_pw_log ]
+ public = [
+ "public/pw_rpc/client.h",
+ "public/pw_rpc/internal/base_client_call.h",
+ ]
+ sources = [
+ "base_client_call.cc",
+ "client.cc",
+ ]
+}
+
+# Classes shared by the server and client.
+pw_source_set("common") {
+ public_configs = [ ":default_config" ]
+ public_deps = [
+ ":protos_pwpb",
+ "$dir_pw_containers:intrusive_list",
+ dir_pw_assert,
+ dir_pw_bytes,
+ dir_pw_span,
+ dir_pw_status,
+ ]
+ deps = [ dir_pw_log ]
+ public = [ "public/pw_rpc/channel.h" ]
+ sources = [
+ "channel.cc",
+ "packet.cc",
+ "public/pw_rpc/internal/channel.h",
+ "public/pw_rpc/internal/method_type.h",
+ "public/pw_rpc/internal/packet.h",
+ ]
+ friend = [ "./*" ]
+}
+
pw_source_set("test_utils") {
public = [
"public/pw_rpc/internal/test_method.h",
@@ -66,6 +92,7 @@ pw_source_set("test_utils") {
]
public_configs = [ ":private_includes" ]
public_deps = [
+ ":client",
":server",
dir_pw_span,
]
@@ -145,8 +172,10 @@ pw_size_report("server_size") {
pw_test_group("tests") {
tests = [
+ ":base_client_call_test",
":base_server_writer_test",
":channel_test",
+ ":client_test",
":packet_test",
":server_test",
":service_test",
@@ -156,6 +185,7 @@ pw_test_group("tests") {
pw_proto_library("test_protos") {
sources = [ "pw_rpc_test_protos/test.proto" ]
+ inputs = [ "pw_rpc_test_protos/test.options" ]
visibility = [ "./*" ]
}
@@ -198,6 +228,22 @@ pw_test("service_test") {
sources = [ "service_test.cc" ]
}
+pw_test("client_test") {
+ deps = [
+ ":client",
+ ":test_utils",
+ ]
+ sources = [ "client_test.cc" ]
+}
+
+pw_test("base_client_call_test") {
+ deps = [
+ ":client",
+ ":test_utils",
+ ]
+ sources = [ "base_client_call_test.cc" ]
+}
+
pw_test("server_test") {
deps = [
":protos_pwpb",
diff --git a/pw_rpc/base_client_call.cc b/pw_rpc/base_client_call.cc
new file mode 100644
index 000000000..91dd28f5a
--- /dev/null
+++ b/pw_rpc/base_client_call.cc
@@ -0,0 +1,44 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_rpc/internal/base_client_call.h"
+
+#include "pw_rpc/client.h"
+
+namespace pw::rpc::internal {
+
+void BaseClientCall::Cancel() {
+ channel_->Send(NewPacket(PacketType::CANCEL_SERVER_STREAM));
+}
+
+std::span<std::byte> BaseClientCall::AcquirePayloadBuffer() {
+ request_ = channel_->AcquireBuffer();
+ return request_.payload(NewPacket(PacketType::REQUEST));
+}
+
+Status BaseClientCall::ReleasePayloadBuffer(
+ std::span<const std::byte> payload) {
+ return channel_->Send(request_, NewPacket(PacketType::REQUEST, payload));
+}
+
+Packet BaseClientCall::NewPacket(PacketType type,
+ std::span<const std::byte> payload) const {
+ return Packet(type, channel_->id(), service_id_, method_id_, payload);
+}
+
+void BaseClientCall::Register() { channel_->client()->RegisterCall(*this); }
+
+void BaseClientCall::Unregister() { channel_->client()->RemoveCall(*this); }
+
+} // namespace pw::rpc::internal
diff --git a/pw_rpc/base_client_call_test.cc b/pw_rpc/base_client_call_test.cc
new file mode 100644
index 000000000..44a32437f
--- /dev/null
+++ b/pw_rpc/base_client_call_test.cc
@@ -0,0 +1,72 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_rpc/internal/base_client_call.h"
+
+#include "gtest/gtest.h"
+#include "pw_rpc_private/internal_test_utils.h"
+
+namespace pw::rpc::internal {
+namespace {
+
+TEST(BaseClientCall, RegistersAndRemovesItselfFromClient) {
+ ClientContextForTest context;
+ EXPECT_EQ(context.client().active_calls(), 0u);
+
+ {
+ BaseClientCall call(&context.channel(),
+ context.kServiceId,
+ context.kMethodId,
+ [](BaseClientCall&, const Packet&) {});
+ EXPECT_EQ(context.client().active_calls(), 1u);
+ }
+
+ EXPECT_EQ(context.client().active_calls(), 0u);
+}
+
+class FakeClientCall : public BaseClientCall {
+ public:
+ constexpr FakeClientCall(rpc::Channel* channel,
+ uint32_t service_id,
+ uint32_t method_id,
+ ResponseHandler handler)
+ : BaseClientCall(channel, service_id, method_id, handler) {}
+
+ Status SendPacket(std::span<const std::byte> payload) {
+ std::span buffer = AcquirePayloadBuffer();
+ std::memcpy(buffer.data(), payload.data(), payload.size());
+ return ReleasePayloadBuffer(buffer.first(payload.size()));
+ }
+};
+
+TEST(BaseClientCall, SendsPacketWithPayload) {
+ ClientContextForTest context;
+ FakeClientCall call(&context.channel(),
+ context.kServiceId,
+ context.kMethodId,
+ [](BaseClientCall&, const Packet&) {});
+
+ constexpr std::byte payload[]{std::byte{0x08}, std::byte{0x39}};
+ call.SendPacket(payload);
+
+ EXPECT_EQ(context.output().packet_count(), 1u);
+ Packet packet = context.output().sent_packet();
+ EXPECT_EQ(packet.channel_id(), context.channel().id());
+ EXPECT_EQ(packet.service_id(), context.kServiceId);
+ EXPECT_EQ(packet.method_id(), context.kMethodId);
+ EXPECT_EQ(std::memcmp(packet.payload().data(), payload, sizeof(payload)), 0);
+}
+
+} // namespace
+} // namespace pw::rpc::internal
diff --git a/pw_rpc/client.cc b/pw_rpc/client.cc
new file mode 100644
index 000000000..73b2f0010
--- /dev/null
+++ b/pw_rpc/client.cc
@@ -0,0 +1,92 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_rpc/client.h"
+
+#include "pw_log/log.h"
+#include "pw_rpc/internal/packet.h"
+
+namespace pw::rpc {
+namespace {
+
+using internal::BaseClientCall;
+using internal::Packet;
+using internal::PacketType;
+
+} // namespace
+
+Status Client::ProcessPacket(ConstByteSpan data) {
+ Packet packet;
+
+ if (Status status = Packet::FromBuffer(data, packet); !status.ok()) {
+ PW_LOG_WARN("RPC client failed to decode incoming packet");
+ return Status::DataLoss();
+ }
+
+ if (packet.destination() != Packet::kClient) {
+ return Status::InvalidArgument();
+ }
+
+ if (packet.channel_id() == Channel::kUnassignedChannelId ||
+ packet.service_id() == 0 || packet.method_id() == 0) {
+ PW_LOG_WARN("RPC client received a malformed packet");
+ return Status::DataLoss();
+ }
+
+ auto call = std::find_if(calls_.begin(), calls_.end(), [&](auto& c) {
+ return c.channel().id() == packet.channel_id() &&
+ c.service_id() == packet.service_id() &&
+ c.method_id() == packet.method_id();
+ });
+
+ if (call == calls_.end()) {
+ PW_LOG_WARN("RPC client received a packet for a request it did not make");
+ return Status::NotFound();
+ }
+
+ switch (packet.type()) {
+ case PacketType::RESPONSE:
+ call->HandleResponse(packet);
+ break;
+ case PacketType::SERVER_STREAM_END:
+ call->HandleResponse(packet);
+ RemoveCall(*call);
+ break;
+ case PacketType::SERVER_ERROR:
+ // TODO(frolv): Handle errors.
+ break;
+ default:
+ return Status::Unimplemented();
+ }
+
+ return Status::Ok();
+}
+
+Status Client::RegisterCall(BaseClientCall& call) {
+ auto existing_call = std::find_if(calls_.begin(), calls_.end(), [&](auto& c) {
+ return c.channel().id() == call.channel().id() &&
+ c.service_id() == call.service_id() &&
+ c.method_id() == call.method_id();
+ });
+ if (existing_call != calls_.end()) {
+ PW_LOG_WARN(
+ "RPC client tried to call same method multiple times; aborting.");
+ return Status::FailedPrecondition();
+ }
+
+ calls_.push_front(call);
+ return Status::Ok();
+}
+
+} // namespace pw::rpc
diff --git a/pw_rpc/client_test.cc b/pw_rpc/client_test.cc
new file mode 100644
index 000000000..13a7cbe42
--- /dev/null
+++ b/pw_rpc/client_test.cc
@@ -0,0 +1,78 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_rpc/client.h"
+
+#include "gtest/gtest.h"
+#include "pw_rpc/internal/packet.h"
+#include "pw_rpc_private/internal_test_utils.h"
+
+namespace pw::rpc {
+namespace {
+
+using internal::BaseClientCall;
+using internal::Packet;
+using internal::PacketType;
+
+class TestClientCall : public BaseClientCall {
+ public:
+ constexpr TestClientCall(Channel* channel,
+ uint32_t service_id,
+ uint32_t method_id)
+ : BaseClientCall(channel, service_id, method_id, ProcessPacket) {}
+
+ static void ProcessPacket(BaseClientCall& call, const Packet& packet) {
+ static_cast<TestClientCall&>(call).HandlePacket(packet);
+ }
+
+ void HandlePacket(const Packet&) { invoked_ = true; }
+
+ constexpr bool invoked() const { return invoked_; }
+
+ private:
+ bool invoked_ = false;
+};
+
+TEST(Client, ProcessPacket_InvokesARegisteredClientCall) {
+ ClientContextForTest context;
+
+ TestClientCall call(
+ &context.channel(), context.kServiceId, context.kMethodId);
+ EXPECT_EQ(context.SendResponse(Status::Ok(), {}), Status::Ok());
+
+ EXPECT_TRUE(call.invoked());
+}
+
+TEST(Client, ProcessPacket_ReturnsNotFoundOnUnregisteredCall) {
+ ClientContextForTest context;
+ EXPECT_EQ(context.SendResponse(Status::OK, {}), Status::NotFound());
+}
+
+TEST(Client, ProcessPacket_ReturnsDataLossOnBadPacket) {
+ ClientContextForTest context;
+
+ constexpr std::byte bad_packet[]{
+ std::byte{0xab}, std::byte{0xcd}, std::byte{0xef}};
+ EXPECT_EQ(context.client().ProcessPacket(bad_packet), Status::DataLoss());
+}
+
+TEST(Client, ProcessPacket_ReturnsInvalidArgumentOnServerPacket) {
+ ClientContextForTest context;
+ EXPECT_EQ(context.SendPacket(PacketType::REQUEST), Status::InvalidArgument());
+ EXPECT_EQ(context.SendPacket(PacketType::CANCEL_SERVER_STREAM),
+ Status::InvalidArgument());
+}
+
+} // namespace
+} // namespace pw::rpc
diff --git a/pw_rpc/docs.rst b/pw_rpc/docs.rst
index 520d03025..807663989 100644
--- a/pw_rpc/docs.rst
+++ b/pw_rpc/docs.rst
@@ -23,6 +23,11 @@ user-defined RPCs are implemented.
``pw_rpc`` supports multiple protobuf libraries, and the generated code API
depends on which is used.
+.. _pw-rpc-protobuf-apis:
+
+Protobuf library APIs
+=====================
+
.. toctree::
:maxdepth: 1
@@ -533,3 +538,67 @@ Responses
method -> server -> channel;
channel -> packets [folded];
}
+
+RPC client
+==========
+The RPC client is used to send requests to a server and manages the contexts of
+ongoing RPCs.
+
+Setting up a client
+-------------------
+The ``pw::rpc::Client`` class is instantiated with a list of channels that it
+uses to communicate. These channels can be shared with a server, but multiple
+clients cannot use the same channels.
+
+To send incoming RPC packets from the transport layer to be processed by a
+client, the client's ``ProcessPacket`` function is called with the packet data.
+
+.. code:: c++
+
+ #include "pw_rpc/client.h"
+
+ namespace {
+
+ pw::rpc::Channel my_channels[] = {
+ pw::rpc::Channel::Create<1>(&my_channel_output)};
+ pw::rpc::Client my_client(my_channels);
+
+ } // namespace
+
+ // Called when the transport layer receives an RPC packet.
+ void ProcessRpcPacket(ConstByteSpan packet) {
+ my_client.ProcessPacket(packet);
+ }
+
+Making RPC calls
+----------------
+RPC calls are not made directly through the client, but using one of its
+registered channels instead. A service client class is generated from a .proto
+file for each selected protobuf library, which is then used to send RPC requests
+through a given channel. The API for this depends on the protobuf library;
+please refer to the :ref:`appropriate documentation <pw-rpc-protobuf-apis>`.
+Multiple service client implementations can exist simulatenously and share the
+same ``Client`` class.
+
+When a call is made, a ``pw::rpc::ClientCall`` object is returned to the caller.
+This object tracks the ongoing RPC call, and can be used to manage it. An RPC
+call is only active as long as its ``ClientCall`` object is alive.
+
+.. tip::
+ Use ``std::move`` when passing around ``ClientCall`` objects to keep RPCs
+ alive.
+
+Client implementation details
+-----------------------------
+
+The ClientCall class
+^^^^^^^^^^^^^^^^^^^^
+``ClientCall`` stores the context of an active RPC, and serves as the user's
+interface to the RPC client. The core RPC library provides a base ``ClientCall``
+class with common functionality, which is then extended for RPC client
+implementations tied to different protobuf libraries to provide convenient
+interfaces for working with RPCs.
+
+The RPC server stores a list of all of active ``ClientCall`` objects. When an
+incoming packet is recieved, it dispatches to one of its active calls, which
+then decodes the payload and presents it to the user.
diff --git a/pw_rpc/nanopb/BUILD.gn b/pw_rpc/nanopb/BUILD.gn
index a3d204ce6..3261e8a42 100644
--- a/pw_rpc/nanopb/BUILD.gn
+++ b/pw_rpc/nanopb/BUILD.gn
@@ -28,8 +28,28 @@ pw_source_set("method") {
public_configs = [ ":public" ]
public = [ "public/pw_rpc/internal/nanopb_method.h" ]
sources = [ "nanopb_method.cc" ]
- public_deps = [ "..:server" ]
+ public_deps = [
+ ":common",
+ "..:server",
+ ]
deps = [ dir_pw_log ]
+}
+
+pw_source_set("client") {
+ public_configs = [ ":public" ]
+ public_deps = [
+ ":common",
+ "..:client",
+ ]
+ public = [ "public/pw_rpc/nanopb_client_call.h" ]
+ sources = [ "nanopb_client_call.cc" ]
+}
+
+pw_source_set("common") {
+ public_deps = [ dir_pw_bytes ]
+ public_configs = [ ":public" ]
+ public = [ "public/pw_rpc/internal/nanopb_common.h" ]
+ sources = [ "nanopb_common.cc" ]
if (dir_pw_third_party_nanopb != "") {
public_deps += [ "$dir_pw_third_party/nanopb" ]
@@ -56,6 +76,14 @@ pw_source_set("test_method_context") {
]
}
+pw_source_set("internal_test_utils") {
+ public = [ "pw_rpc_nanopb_private/internal_test_utils.h" ]
+ public_deps = []
+ if (dir_pw_third_party_nanopb != "") {
+ public_deps += [ "$dir_pw_third_party/nanopb" ]
+ }
+}
+
pw_source_set("echo_service") {
public_configs = [ ":public" ]
public_deps = [ "..:echo_service_proto_nanopb_rpc" ]
@@ -68,6 +96,7 @@ pw_doc_group("docs") {
pw_test_group("tests") {
tests = [
+ ":client_call_test",
":codegen_test",
":echo_service_test",
":nanopb_method_test",
@@ -75,6 +104,17 @@ pw_test_group("tests") {
]
}
+pw_test("client_call_test") {
+ deps = [
+ ":client",
+ ":internal_test_utils",
+ "..:test_protos_nanopb",
+ "..:test_utils",
+ ]
+ sources = [ "nanopb_client_call_test.cc" ]
+ enable_if = dir_pw_third_party_nanopb != ""
+}
+
pw_test("codegen_test") {
deps = [
":test_method_context",
@@ -87,6 +127,7 @@ pw_test("codegen_test") {
pw_test("nanopb_method_test") {
deps = [
+ ":internal_test_utils",
":method",
"..:server",
"..:test_protos_nanopb",
diff --git a/pw_rpc/nanopb/docs.rst b/pw_rpc/nanopb/docs.rst
index 32f87fb0a..9ac3ab92f 100644
--- a/pw_rpc/nanopb/docs.rst
+++ b/pw_rpc/nanopb/docs.rst
@@ -151,3 +151,10 @@ Bidirectional streaming RPC
.. attention::
``pw_rpc`` does not yet support bidirectional streaming RPCs.
+
+Client-side
+-----------
+
+.. admonition:: TODO
+
+ Document the generated client interface
diff --git a/pw_rpc/nanopb/nanopb_client_call.cc b/pw_rpc/nanopb/nanopb_client_call.cc
new file mode 100644
index 000000000..8b4cc2d34
--- /dev/null
+++ b/pw_rpc/nanopb/nanopb_client_call.cc
@@ -0,0 +1,33 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_rpc/nanopb_client_call.h"
+
+namespace pw::rpc {
+namespace internal {
+
+Status BaseNanopbClientCall::SendRequest(const void* request_struct) {
+ std::span<std::byte> buffer = AcquirePayloadBuffer();
+
+ StatusWithSize sws = serde_.EncodeRequest(buffer, request_struct);
+ if (!sws.ok()) {
+ ReleasePayloadBuffer({});
+ return sws.status();
+ }
+
+ return ReleasePayloadBuffer(buffer.first(sws.size()));
+}
+
+} // namespace internal
+} // namespace pw::rpc
diff --git a/pw_rpc/nanopb/nanopb_client_call_test.cc b/pw_rpc/nanopb/nanopb_client_call_test.cc
new file mode 100644
index 000000000..f8374b717
--- /dev/null
+++ b/pw_rpc/nanopb/nanopb_client_call_test.cc
@@ -0,0 +1,273 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_rpc/nanopb_client_call.h"
+
+#include "gtest/gtest.h"
+#include "pw_rpc_nanopb_private/internal_test_utils.h"
+#include "pw_rpc_private/internal_test_utils.h"
+#include "pw_rpc_test_protos/test.pb.h"
+
+namespace pw::rpc {
+namespace {
+
+constexpr uint32_t kServiceId = 16;
+constexpr uint32_t kUnaryMethodId = 111;
+constexpr uint32_t kServerStreamingMethodId = 112;
+
+class FakeGeneratedServiceClient {
+ public:
+ static NanopbClientCall<UnaryResponseHandler<pw_rpc_test_TestResponse>>
+ TestRpc(Channel& channel,
+ const pw_rpc_test_TestRequest& request,
+ UnaryResponseHandler<pw_rpc_test_TestResponse>& callback) {
+ auto call = NanopbClientCall(&channel,
+ kServiceId,
+ kUnaryMethodId,
+ callback,
+ pw_rpc_test_TestRequest_fields,
+ pw_rpc_test_TestResponse_fields);
+ call.SendRequest(&request);
+ return call;
+ }
+
+ static NanopbClientCall<
+ ServerStreamingResponseHandler<pw_rpc_test_TestStreamResponse>>
+ TestStreamRpc(Channel& channel,
+ const pw_rpc_test_TestRequest& request,
+ ServerStreamingResponseHandler<pw_rpc_test_TestStreamResponse>&
+ callback) {
+ auto call = NanopbClientCall(&channel,
+ kServiceId,
+ kServerStreamingMethodId,
+ callback,
+ pw_rpc_test_TestRequest_fields,
+ pw_rpc_test_TestStreamResponse_fields);
+ call.SendRequest(&request);
+ return call;
+ }
+};
+
+// TODO(frolv): Maybe extract these into a utils header as it could be useful
+// for other tests.
+template <typename Response>
+class TestUnaryResponseHandler : public UnaryResponseHandler<Response> {
+ public:
+ void ReceivedResponse(Status status, const Response& response) override {
+ last_status_ = status;
+ last_response_ = response;
+ ++responses_received_;
+ }
+
+ constexpr Status last_status() const { return last_status_; }
+ constexpr const Response& last_response() const& { return last_response_; }
+ constexpr size_t responses_received() const { return responses_received_; }
+
+ private:
+ Status last_status_;
+ Response last_response_;
+ size_t responses_received_ = 0;
+};
+
+template <typename Response>
+class TestServerStreamingResponseHandler
+ : public ServerStreamingResponseHandler<Response> {
+ public:
+ void ReceivedResponse(const Response& response) override {
+ last_response_ = response;
+ ++responses_received_;
+ }
+
+ void Complete(Status status) override {
+ active_ = false;
+ status_ = status;
+ }
+
+ constexpr bool active() const { return active_; }
+ constexpr Status status() const { return status_; }
+ constexpr const Response& last_response() const& { return last_response_; }
+ constexpr size_t responses_received() const { return responses_received_; }
+
+ private:
+ Status status_;
+ Response last_response_;
+ size_t responses_received_ = 0;
+ bool active_ = true;
+};
+
+TEST(NanopbClientCall, Unary_SendsRequestPacket) {
+ ClientContextForTest context;
+ TestUnaryResponseHandler<pw_rpc_test_TestResponse> handler;
+
+ auto call = FakeGeneratedServiceClient::TestRpc(
+ context.channel(), {.integer = 123, .status_code = 0}, handler);
+
+ EXPECT_EQ(context.output().packet_count(), 1u);
+ auto packet = context.output().sent_packet();
+ EXPECT_EQ(packet.channel_id(), context.channel().id());
+ EXPECT_EQ(packet.service_id(), kServiceId);
+ EXPECT_EQ(packet.method_id(), kUnaryMethodId);
+
+ PW_DECODE_PB(pw_rpc_test_TestRequest, sent_proto, packet.payload());
+ EXPECT_EQ(sent_proto.integer, 123);
+}
+
+TEST(NanopbClientCall, Unary_InvokesCallbackOnValidResponse) {
+ ClientContextForTest context;
+ TestUnaryResponseHandler<pw_rpc_test_TestResponse> handler;
+
+ auto call = FakeGeneratedServiceClient::TestRpc(
+ context.channel(), {.integer = 123, .status_code = 0}, handler);
+
+ PW_ENCODE_PB(pw_rpc_test_TestResponse, response, .value = 42);
+ context.SendResponse(Status::Ok(), response);
+
+ ASSERT_EQ(handler.responses_received(), 1u);
+ EXPECT_EQ(handler.last_status(), Status::Ok());
+ EXPECT_EQ(handler.last_response().value, 42);
+}
+
+TEST(NanopbClientCall, Unary_DoesNothingOnInvalidResponse) {
+ ClientContextForTest context;
+ TestUnaryResponseHandler<pw_rpc_test_TestResponse> handler;
+
+ auto call = FakeGeneratedServiceClient::TestRpc(
+ context.channel(), {.integer = 123, .status_code = 0}, handler);
+
+ constexpr std::byte bad_payload[]{
+ std::byte{0xab}, std::byte{0xcd}, std::byte{0xef}};
+ context.SendResponse(Status::Ok(), bad_payload);
+
+ EXPECT_EQ(handler.responses_received(), 0u);
+}
+
+TEST(NanopbClientCall, Unary_OnlyReceivesOneResponse) {
+ ClientContextForTest context;
+ TestUnaryResponseHandler<pw_rpc_test_TestResponse> handler;
+
+ auto call = FakeGeneratedServiceClient::TestRpc(
+ context.channel(), {.integer = 123, .status_code = 0}, handler);
+
+ PW_ENCODE_PB(pw_rpc_test_TestResponse, r1, .value = 42);
+ context.SendResponse(Status::Unimplemented(), r1);
+ PW_ENCODE_PB(pw_rpc_test_TestResponse, r2, .value = 44);
+ context.SendResponse(Status::OutOfRange(), r2);
+ PW_ENCODE_PB(pw_rpc_test_TestResponse, r3, .value = 46);
+ context.SendResponse(Status::Internal(), r3);
+
+ EXPECT_EQ(handler.responses_received(), 1u);
+ EXPECT_EQ(handler.last_status(), Status::Unimplemented());
+ EXPECT_EQ(handler.last_response().value, 42);
+}
+
+TEST(NanopbClientCall, ServerStreaming_SendsRequestPacket) {
+ ClientContextForTest<128, 128, 99, kServiceId, kServerStreamingMethodId>
+ context;
+ TestServerStreamingResponseHandler<pw_rpc_test_TestStreamResponse> handler;
+
+ auto call = FakeGeneratedServiceClient::TestStreamRpc(
+ context.channel(), {.integer = 71, .status_code = 0}, handler);
+
+ EXPECT_EQ(context.output().packet_count(), 1u);
+ auto packet = context.output().sent_packet();
+ EXPECT_EQ(packet.channel_id(), context.channel().id());
+ EXPECT_EQ(packet.service_id(), kServiceId);
+ EXPECT_EQ(packet.method_id(), kServerStreamingMethodId);
+
+ PW_DECODE_PB(pw_rpc_test_TestRequest, sent_proto, packet.payload());
+ EXPECT_EQ(sent_proto.integer, 71);
+}
+
+TEST(NanopbClientCall, ServerStreaming_InvokesCallbackOnValidResponse) {
+ ClientContextForTest<128, 128, 99, kServiceId, kServerStreamingMethodId>
+ context;
+ TestServerStreamingResponseHandler<pw_rpc_test_TestStreamResponse> handler;
+
+ auto call = FakeGeneratedServiceClient::TestStreamRpc(
+ context.channel(), {.integer = 71, .status_code = 0}, handler);
+
+ PW_ENCODE_PB(pw_rpc_test_TestStreamResponse, r1, .chunk = {}, .number = 11u);
+ context.SendResponse(Status::Ok(), r1);
+ EXPECT_TRUE(handler.active());
+ EXPECT_EQ(handler.responses_received(), 1u);
+ EXPECT_EQ(handler.last_response().number, 11u);
+
+ PW_ENCODE_PB(pw_rpc_test_TestStreamResponse, r2, .chunk = {}, .number = 22u);
+ context.SendResponse(Status::Ok(), r2);
+ EXPECT_TRUE(handler.active());
+ EXPECT_EQ(handler.responses_received(), 2u);
+ EXPECT_EQ(handler.last_response().number, 22u);
+
+ PW_ENCODE_PB(pw_rpc_test_TestStreamResponse, r3, .chunk = {}, .number = 33u);
+ context.SendResponse(Status::Ok(), r3);
+ EXPECT_TRUE(handler.active());
+ EXPECT_EQ(handler.responses_received(), 3u);
+ EXPECT_EQ(handler.last_response().number, 33u);
+}
+
+TEST(NanopbClientCall, ServerStreaming_ClosesOnFinish) {
+ ClientContextForTest<128, 128, 99, kServiceId, kServerStreamingMethodId>
+ context;
+ TestServerStreamingResponseHandler<pw_rpc_test_TestStreamResponse> handler;
+
+ auto call = FakeGeneratedServiceClient::TestStreamRpc(
+ context.channel(), {.integer = 71, .status_code = 0}, handler);
+
+ PW_ENCODE_PB(pw_rpc_test_TestStreamResponse, r1, .chunk = {}, .number = 11u);
+ context.SendResponse(Status::Ok(), r1);
+ EXPECT_TRUE(handler.active());
+
+ PW_ENCODE_PB(pw_rpc_test_TestStreamResponse, r2, .chunk = {}, .number = 22u);
+ context.SendResponse(Status::Ok(), r2);
+ EXPECT_TRUE(handler.active());
+
+ // Close the stream.
+ context.SendPacket(internal::PacketType::SERVER_STREAM_END,
+ Status::NotFound());
+
+ PW_ENCODE_PB(pw_rpc_test_TestStreamResponse, r3, .chunk = {}, .number = 33u);
+ context.SendResponse(Status::Ok(), r3);
+ EXPECT_FALSE(handler.active());
+
+ EXPECT_EQ(handler.responses_received(), 2u);
+}
+
+TEST(NanopbClientCall, ServerStreaming_IgnoresInvalidResponses) {
+ ClientContextForTest<128, 128, 99, kServiceId, kServerStreamingMethodId>
+ context;
+ TestServerStreamingResponseHandler<pw_rpc_test_TestStreamResponse> handler;
+
+ auto call = FakeGeneratedServiceClient::TestStreamRpc(
+ context.channel(), {.integer = 71, .status_code = 0}, handler);
+
+ PW_ENCODE_PB(pw_rpc_test_TestStreamResponse, r1, .chunk = {}, .number = 11u);
+ context.SendResponse(Status::Ok(), r1);
+ EXPECT_TRUE(handler.active());
+ EXPECT_EQ(handler.responses_received(), 1u);
+ EXPECT_EQ(handler.last_response().number, 11u);
+
+ constexpr std::byte bad_payload[]{
+ std::byte{0xab}, std::byte{0xcd}, std::byte{0xef}};
+ context.SendResponse(Status::Ok(), bad_payload);
+ EXPECT_EQ(handler.responses_received(), 1u);
+
+ PW_ENCODE_PB(pw_rpc_test_TestStreamResponse, r2, .chunk = {}, .number = 22u);
+ context.SendResponse(Status::Ok(), r2);
+ EXPECT_TRUE(handler.active());
+ EXPECT_EQ(handler.responses_received(), 2u);
+ EXPECT_EQ(handler.last_response().number, 22u);
+}
+
+} // namespace
+} // namespace pw::rpc
diff --git a/pw_rpc/nanopb/nanopb_common.cc b/pw_rpc/nanopb/nanopb_common.cc
new file mode 100644
index 000000000..d3339c6f4
--- /dev/null
+++ b/pw_rpc/nanopb/nanopb_common.cc
@@ -0,0 +1,55 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_rpc/internal/nanopb_common.h"
+
+#include "pb_decode.h"
+#include "pb_encode.h"
+
+namespace pw::rpc::internal {
+
+// Nanopb 3 uses pb_field_s and Nanopb 4 uses pb_msgdesc_s for fields. The
+// Nanopb version macro is difficult to use, so deduce the correct type from the
+// pb_decode function.
+template <typename DecodeFunction>
+struct NanopbTraits;
+
+template <typename FieldsType>
+struct NanopbTraits<bool(pb_istream_t*, FieldsType, void*)> {
+ using Fields = FieldsType;
+};
+
+using Fields = typename NanopbTraits<decltype(pb_decode)>::Fields;
+
+StatusWithSize NanopbMethodSerde::Encode(NanopbMessageDescriptor fields,
+ ByteSpan buffer,
+ const void* proto_struct) const {
+ auto output = pb_ostream_from_buffer(
+ reinterpret_cast<pb_byte_t*>(buffer.data()), buffer.size());
+ if (!pb_encode(&output, static_cast<Fields>(fields), proto_struct)) {
+ return StatusWithSize::Internal();
+ }
+
+ return StatusWithSize::Ok(output.bytes_written);
+}
+
+bool NanopbMethodSerde::Decode(NanopbMessageDescriptor fields,
+ void* proto_struct,
+ ConstByteSpan buffer) const {
+ auto input = pb_istream_from_buffer(
+ reinterpret_cast<const pb_byte_t*>(buffer.data()), buffer.size());
+ return pb_decode(&input, static_cast<Fields>(fields), proto_struct);
+}
+
+} // namespace pw::rpc::internal
diff --git a/pw_rpc/nanopb/nanopb_method.cc b/pw_rpc/nanopb/nanopb_method.cc
index 20e307c09..72a0d7a78 100644
--- a/pw_rpc/nanopb/nanopb_method.cc
+++ b/pw_rpc/nanopb/nanopb_method.cc
@@ -20,42 +20,9 @@
#include "pw_rpc/internal/packet.h"
namespace pw::rpc::internal {
-namespace {
-
-// Nanopb 3 uses pb_field_s and Nanopb 4 uses pb_msgdesc_s for fields. The
-// Nanopb version macro is difficult to use, so deduce the correct type from the
-// pb_decode function.
-template <typename DecodeFunction>
-struct NanopbTraits;
-
-template <typename FieldsType>
-struct NanopbTraits<bool(pb_istream_t*, FieldsType, void*)> {
- using Fields = FieldsType;
-};
-
-using Fields = typename NanopbTraits<decltype(pb_decode)>::Fields;
-
-} // namespace
using std::byte;
-StatusWithSize NanopbMethod::EncodeResponse(const void* proto_struct,
- std::span<byte> buffer) const {
- auto output = pb_ostream_from_buffer(
- reinterpret_cast<pb_byte_t*>(buffer.data()), buffer.size());
- if (pb_encode(&output, static_cast<Fields>(response_fields_), proto_struct)) {
- return StatusWithSize(output.bytes_written);
- }
- return StatusWithSize::INTERNAL;
-}
-
-bool NanopbMethod::DecodeResponse(std::span<const byte> response,
- void* proto_struct) const {
- auto input = pb_istream_from_buffer(
- reinterpret_cast<const pb_byte_t*>(response.data()), response.size());
- return pb_decode(&input, static_cast<Fields>(response_fields_), proto_struct);
-}
-
void NanopbMethod::CallUnary(ServerCall& call,
const Packet& request,
void* request_struct,
@@ -82,10 +49,7 @@ void NanopbMethod::CallServerStreaming(ServerCall& call,
bool NanopbMethod::DecodeRequest(Channel& channel,
const Packet& request,
void* proto_struct) const {
- auto input = pb_istream_from_buffer(
- reinterpret_cast<const pb_byte_t*>(request.payload().data()),
- request.payload().size());
- if (pb_decode(&input, static_cast<Fields>(request_fields_), proto_struct)) {
+ if (serde_.DecodeRequest(proto_struct, request.payload())) {
return true;
}
diff --git a/pw_rpc/nanopb/nanopb_method_test.cc b/pw_rpc/nanopb/nanopb_method_test.cc
index 4c964a4d2..31b23978d 100644
--- a/pw_rpc/nanopb/nanopb_method_test.cc
+++ b/pw_rpc/nanopb/nanopb_method_test.cc
@@ -17,9 +17,9 @@
#include <array>
#include "gtest/gtest.h"
-#include "pb_encode.h"
#include "pw_rpc/server_context.h"
#include "pw_rpc/service.h"
+#include "pw_rpc_nanopb_private/internal_test_utils.h"
#include "pw_rpc_private/internal_test_utils.h"
#include "pw_rpc_test_protos/test.pb.h"
@@ -28,25 +28,6 @@ namespace {
using std::byte;
-#define ENCODE_PB(proto, result, ...) \
- _ENCODE_PB_EXPAND(proto, result, __LINE__, __VA_ARGS__)
-
-#define _ENCODE_PB_EXPAND(proto, result, unique, ...) \
- _ENCODE_PB_IMPL(proto, result, unique, __VA_ARGS__)
-
-#define _ENCODE_PB_IMPL(proto, result, unique, ...) \
- std::array<pb_byte_t, 2 * sizeof(proto)> _pb_buffer_##unique{}; \
- const std::span result = EncodeProtobuf<proto, proto##_fields>( \
- proto{__VA_ARGS__}, _pb_buffer_##unique)
-
-template <typename T, auto fields>
-std::span<const byte> EncodeProtobuf(const T& protobuf,
- std::span<pb_byte_t> buffer) {
- auto output = pb_ostream_from_buffer(buffer.data(), buffer.size());
- EXPECT_TRUE(pb_encode(&output, fields, &protobuf));
- return std::as_bytes(buffer.first(output.bytes_written));
-}
-
template <typename Implementation>
class FakeGeneratedService : public Service {
public:
@@ -114,7 +95,8 @@ class FakeGeneratedServiceImpl
};
TEST(NanopbMethod, UnaryRpc_SendsResponse) {
- ENCODE_PB(pw_rpc_test_TestRequest, request, .integer = 123, .status_code = 0);
+ PW_ENCODE_PB(
+ pw_rpc_test_TestRequest, request, .integer = 123, .status_code = 0);
const NanopbMethod& method = std::get<1>(FakeGeneratedServiceImpl::kMethods);
ServerContextForTest<FakeGeneratedServiceImpl> context(method);
@@ -150,7 +132,7 @@ TEST(NanopbMethod, UnaryRpc_InvalidPayload_SendsError) {
TEST(NanopbMethod, UnaryRpc_BufferTooSmallForResponse_SendsInternalError) {
constexpr int64_t value = 0x7FFFFFFF'FFFFFF00ll;
- ENCODE_PB(
+ PW_ENCODE_PB(
pw_rpc_test_TestRequest, request, .integer = value, .status_code = 0);
const NanopbMethod& method = std::get<1>(FakeGeneratedServiceImpl::kMethods);
@@ -171,7 +153,8 @@ TEST(NanopbMethod, UnaryRpc_BufferTooSmallForResponse_SendsInternalError) {
}
TEST(NanopbMethod, ServerStreamingRpc_SendsNothingWhenInitiallyCalled) {
- ENCODE_PB(pw_rpc_test_TestRequest, request, .integer = 555, .status_code = 0);
+ PW_ENCODE_PB(
+ pw_rpc_test_TestRequest, request, .integer = 555, .status_code = 0);
const NanopbMethod& method = std::get<2>(FakeGeneratedServiceImpl::kMethods);
ServerContextForTest<FakeGeneratedServiceImpl> context(method);
@@ -190,7 +173,7 @@ TEST(NanopbMethod, ServerWriter_SendsResponse) {
EXPECT_EQ(Status::OK, last_writer.Write({.value = 100}));
- ENCODE_PB(pw_rpc_test_TestResponse, payload, .value = 100);
+ PW_ENCODE_PB(pw_rpc_test_TestResponse, payload, .value = 100);
std::array<byte, 128> encoded_response = {};
auto encoded = context.packet(payload).Encode(encoded_response);
ASSERT_EQ(Status::OK, encoded.status());
diff --git a/pw_rpc/nanopb/public/pw_rpc/internal/nanopb_common.h b/pw_rpc/nanopb/public/pw_rpc/internal/nanopb_common.h
new file mode 100644
index 000000000..de3146331
--- /dev/null
+++ b/pw_rpc/nanopb/public/pw_rpc/internal/nanopb_common.h
@@ -0,0 +1,63 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#pragma once
+
+#include "pw_bytes/span.h"
+#include "pw_status/status_with_size.h"
+
+namespace pw::rpc::internal {
+
+// Use a void* to cover both Nanopb 3's pb_field_s and Nanopb 4's pb_msgdesc_s.
+using NanopbMessageDescriptor = const void*;
+
+// Serializer/deserializer for nanopb message request and response structs in an
+// RPC method.
+class NanopbMethodSerde {
+ public:
+ constexpr NanopbMethodSerde(NanopbMessageDescriptor request_fields,
+ NanopbMessageDescriptor response_fields)
+ : request_fields_(request_fields), response_fields_(response_fields) {}
+
+ StatusWithSize EncodeRequest(ByteSpan buffer,
+ const void* proto_struct) const {
+ return Encode(request_fields_, buffer, proto_struct);
+ }
+ StatusWithSize EncodeResponse(ByteSpan buffer,
+ const void* proto_struct) const {
+ return Encode(response_fields_, buffer, proto_struct);
+ }
+
+ bool DecodeRequest(void* proto_struct, ConstByteSpan buffer) const {
+ return Decode(request_fields_, proto_struct, buffer);
+ }
+ bool DecodeResponse(void* proto_struct, ConstByteSpan buffer) const {
+ return Decode(response_fields_, proto_struct, buffer);
+ }
+
+ private:
+ // Encodes a nanopb protobuf struct to serialized wire format.
+ StatusWithSize Encode(NanopbMessageDescriptor fields,
+ ByteSpan buffer,
+ const void* proto_struct) const;
+
+ // Decodes a serialized protobuf to a nanopb struct.
+ bool Decode(NanopbMessageDescriptor fields,
+ void* proto_struct,
+ ConstByteSpan buffer) const;
+
+ NanopbMessageDescriptor request_fields_;
+ NanopbMessageDescriptor response_fields_;
+};
+
+} // namespace pw::rpc::internal
diff --git a/pw_rpc/nanopb/public/pw_rpc/internal/nanopb_method.h b/pw_rpc/nanopb/public/pw_rpc/internal/nanopb_method.h
index 3d521b1e4..3a1508ca4 100644
--- a/pw_rpc/nanopb/public/pw_rpc/internal/nanopb_method.h
+++ b/pw_rpc/nanopb/public/pw_rpc/internal/nanopb_method.h
@@ -21,6 +21,8 @@
#include "pw_rpc/internal/base_server_writer.h"
#include "pw_rpc/internal/method.h"
+#include "pw_rpc/internal/method_type.h"
+#include "pw_rpc/internal/nanopb_common.h"
#include "pw_rpc/server_context.h"
#include "pw_status/status.h"
#include "pw_status/status_with_size.h"
@@ -53,11 +55,6 @@ namespace internal {
class Packet;
-// Use a void* to cover both Nanopb 3's pb_field_s and Nanopb 4's pb_msgdesc_s.
-using NanopbMessageDescriptor = const void*;
-
-enum class Type { kUnary, kServerStreaming, kClientStreaming, kBidiStreaming };
-
// Templated false value for use in static_assert(false) statements.
template <typename...>
constexpr std::false_type kFalse{};
@@ -75,7 +72,7 @@ struct RpcTraits<Status (*)(ServerCall&, const RequestType&, ResponseType&)> {
using Request = RequestType;
using Response = ResponseType;
- static constexpr Type kType = Type::kUnary;
+ static constexpr MethodType kType = MethodType::kUnary;
static constexpr bool kServerStreaming = false;
static constexpr bool kClientStreaming = false;
};
@@ -87,7 +84,7 @@ struct RpcTraits<void (*)(
using Request = RequestType;
using Response = ResponseType;
- static constexpr Type kType = Type::kServerStreaming;
+ static constexpr MethodType kType = MethodType::kServerStreaming;
static constexpr bool kServerStreaming = true;
static constexpr bool kClientStreaming = false;
};
@@ -178,12 +175,16 @@ class NanopbMethod : public Method {
// Encodes a response protobuf with Nanopb to the provided buffer.
StatusWithSize EncodeResponse(const void* proto_struct,
- std::span<std::byte> buffer) const;
+ std::span<std::byte> buffer) const {
+ return serde_.EncodeResponse(buffer, proto_struct);
+ }
// Decodes a response protobuf with Nanopb to the provided buffer. For testing
// use.
bool DecodeResponse(std::span<const std::byte> response,
- void* proto_struct) const;
+ void* proto_struct) const {
+ return serde_.DecodeResponse(proto_struct, response);
+ }
private:
// Generic version of the unary RPC function signature:
@@ -223,10 +224,7 @@ class NanopbMethod : public Method {
Function function,
NanopbMessageDescriptor request,
NanopbMessageDescriptor response)
- : Method(id, invoker),
- function_(function),
- request_fields_(request),
- response_fields_(response) {}
+ : Method(id, invoker), function_(function), serde_(request, response) {}
void CallUnary(ServerCall& call,
const Packet& request,
@@ -284,9 +282,8 @@ class NanopbMethod : public Method {
// Stores the user-defined RPC in a generic wrapper.
Function function_;
- // Pointers to the descriptors used to encode and decode Nanopb structs.
- NanopbMessageDescriptor request_fields_;
- NanopbMessageDescriptor response_fields_;
+ // Serde used to encode and decode Nanopb structs.
+ NanopbMethodSerde serde_;
};
} // namespace internal
diff --git a/pw_rpc/nanopb/public/pw_rpc/nanopb_client_call.h b/pw_rpc/nanopb/public/pw_rpc/nanopb_client_call.h
new file mode 100644
index 000000000..21b9297e1
--- /dev/null
+++ b/pw_rpc/nanopb/public/pw_rpc/nanopb_client_call.h
@@ -0,0 +1,167 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#pragma once
+
+#include <new>
+
+#include "pw_bytes/span.h"
+#include "pw_rpc/internal/base_client_call.h"
+#include "pw_rpc/internal/method_type.h"
+#include "pw_rpc/internal/nanopb_common.h"
+#include "pw_status/status.h"
+
+namespace pw::rpc {
+
+// Response handler callback for unary RPC methods.
+template <typename Response>
+class UnaryResponseHandler {
+ public:
+ virtual ~UnaryResponseHandler() = default;
+
+ // Called when the response is received from the server with the method's
+ // status and the deserialized response struct.
+ virtual void ReceivedResponse(Status status, const Response& response) = 0;
+};
+
+// Response handler callbacks for server streaming RPC methods.
+template <typename Response>
+class ServerStreamingResponseHandler {
+ public:
+ virtual ~ServerStreamingResponseHandler() = default;
+
+ // Called on every response received from the server with the deserialized
+ // response struct.
+ virtual void ReceivedResponse(const Response& response) = 0;
+
+ // Called when the server ends the stream with the overall RPC status.
+ virtual void Complete(Status status) = 0;
+};
+
+namespace internal {
+
+// Non-templated nanopb base class providing protobuf encoding and decoding.
+class BaseNanopbClientCall : public BaseClientCall {
+ public:
+ Status SendRequest(const void* request_struct);
+
+ protected:
+ constexpr BaseNanopbClientCall(
+ rpc::Channel* channel,
+ uint32_t service_id,
+ uint32_t method_id,
+ ResponseHandler handler,
+ internal::NanopbMessageDescriptor request_fields,
+ internal::NanopbMessageDescriptor response_fields)
+ : BaseClientCall(channel, service_id, method_id, handler),
+ serde_(request_fields, response_fields) {}
+
+ constexpr const internal::NanopbMethodSerde& serde() const { return serde_; }
+
+ private:
+ internal::NanopbMethodSerde serde_;
+};
+
+template <typename Callback>
+struct CallbackTraits {};
+
+template <typename ResponseType>
+struct CallbackTraits<UnaryResponseHandler<ResponseType>> {
+ using Response = ResponseType;
+
+ static constexpr MethodType kType = MethodType::kUnary;
+};
+
+template <typename ResponseType>
+struct CallbackTraits<ServerStreamingResponseHandler<ResponseType>> {
+ using Response = ResponseType;
+
+ static constexpr MethodType kType = MethodType::kServerStreaming;
+};
+
+} // namespace internal
+
+template <typename Callback>
+class NanopbClientCall : public internal::BaseNanopbClientCall {
+ public:
+ constexpr NanopbClientCall(Channel* channel,
+ uint32_t service_id,
+ uint32_t method_id,
+ Callback& callback,
+ internal::NanopbMessageDescriptor request_fields,
+ internal::NanopbMessageDescriptor response_fields)
+ : BaseNanopbClientCall(channel,
+ service_id,
+ method_id,
+ &ResponseHandler,
+ request_fields,
+ response_fields),
+ callback_(callback) {}
+
+ private:
+ using Traits = internal::CallbackTraits<Callback>;
+ using Response = typename Traits::Response;
+
+ // Buffer into which the nanopb struct is decoded. Its contents are unknown,
+ // so it is aligned to maximum alignment to accommodate any type.
+ using ResponseBuffer =
+ std::aligned_storage_t<sizeof(Response), alignof(std::max_align_t)>;
+
+ friend class Client;
+
+ static void ResponseHandler(internal::BaseClientCall& call,
+ const internal::Packet& packet) {
+ static_cast<NanopbClientCall<Callback>&>(call).HandleResponse(packet);
+ }
+
+ void HandleResponse(const internal::Packet& packet) {
+ if constexpr (Traits::kType == internal::MethodType::kUnary) {
+ InvokeUnaryCallback(packet);
+ }
+ if constexpr (Traits::kType == internal::MethodType::kServerStreaming) {
+ InvokeServerStreamingCallback(packet);
+ }
+ }
+
+ void InvokeUnaryCallback(const internal::Packet& packet) {
+ ResponseBuffer response_struct{};
+
+ // TODO(frolv): Report an error to the caller if the decode fails.
+ if (serde().DecodeResponse(&response_struct, packet.payload())) {
+ callback_.ReceivedResponse(
+ packet.status(),
+ *std::launder(reinterpret_cast<Response*>(&response_struct)));
+ }
+
+ Unregister();
+ }
+
+ void InvokeServerStreamingCallback(const internal::Packet& packet) {
+ if (packet.type() == internal::PacketType::SERVER_STREAM_END) {
+ callback_.Complete(packet.status());
+ return;
+ }
+
+ ResponseBuffer response_struct{};
+
+ // TODO(frolv): Report an error to the caller if the decode fails.
+ if (serde().DecodeResponse(&response_struct, packet.payload())) {
+ callback_.ReceivedResponse(
+ *std::launder(reinterpret_cast<Response*>(&response_struct)));
+ }
+ }
+
+ Callback& callback_;
+};
+
+} // namespace pw::rpc
diff --git a/pw_rpc/nanopb/pw_rpc_nanopb_private/internal_test_utils.h b/pw_rpc/nanopb/pw_rpc_nanopb_private/internal_test_utils.h
new file mode 100644
index 000000000..5312463e6
--- /dev/null
+++ b/pw_rpc/nanopb/pw_rpc_nanopb_private/internal_test_utils.h
@@ -0,0 +1,62 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#pragma once
+
+#include <span>
+
+#include "pb_decode.h"
+#include "pb_encode.h"
+
+namespace pw::rpc::internal {
+
+// Encodes a protobuf to a local span named by result from a list of nanopb
+// struct initializers.
+//
+// PW_ENCODE_PB(pw_rpc_TestProto, encoded, .value = 42);
+//
+#define PW_ENCODE_PB(proto, result, ...) \
+ _PW_ENCODE_PB_EXPAND(proto, result, __LINE__, __VA_ARGS__)
+
+#define _PW_ENCODE_PB_EXPAND(proto, result, unique, ...) \
+ _PW_ENCODE_PB_IMPL(proto, result, unique, __VA_ARGS__)
+
+#define _PW_ENCODE_PB_IMPL(proto, result, unique, ...) \
+ std::array<pb_byte_t, 2 * sizeof(proto)> _pb_buffer_##unique{}; \
+ const std::span result = \
+ ::pw::rpc::internal::EncodeProtobuf<proto, proto##_fields>( \
+ proto{__VA_ARGS__}, _pb_buffer_##unique)
+
+template <typename T, auto fields>
+std::span<const std::byte> EncodeProtobuf(const T& protobuf,
+ std::span<pb_byte_t> buffer) {
+ auto output = pb_ostream_from_buffer(buffer.data(), buffer.size());
+ EXPECT_TRUE(pb_encode(&output, fields, &protobuf));
+ return std::as_bytes(buffer.first(output.bytes_written));
+}
+
+// Decodes a protobuf to a nanopb struct named by result.
+#define PW_DECODE_PB(proto, result, buffer) \
+ proto result; \
+ ::pw::rpc::internal::DecodeProtobuf<proto, proto##_fields>( \
+ std::span(reinterpret_cast<const pb_byte_t*>(buffer.data()), \
+ buffer.size()), \
+ result);
+
+template <typename T, auto fields>
+void DecodeProtobuf(std::span<const pb_byte_t> buffer, T& protobuf) {
+ auto input = pb_istream_from_buffer(buffer.data(), buffer.size());
+ EXPECT_TRUE(pb_decode(&input, fields, &protobuf));
+}
+
+} // namespace pw::rpc::internal
diff --git a/pw_rpc/public/pw_rpc/channel.h b/pw_rpc/public/pw_rpc/channel.h
index bcfbd21e3..094d269a1 100644
--- a/pw_rpc/public/pw_rpc/channel.h
+++ b/pw_rpc/public/pw_rpc/channel.h
@@ -20,6 +20,13 @@
#include "pw_status/status.h"
namespace pw::rpc {
+namespace internal {
+
+class BaseClientCall;
+
+} // namespace internal
+
+class Client;
class ChannelOutput {
public:
@@ -49,7 +56,8 @@ class Channel {
static constexpr uint32_t kUnassignedChannelId = 0;
// Creates a dynamically assignable channel without a set ID or output.
- constexpr Channel() : id_(kUnassignedChannelId), output_(nullptr) {}
+ constexpr Channel()
+ : id_(kUnassignedChannelId), output_(nullptr), client_(nullptr) {}
// Creates a channel with a static ID. The channel's output can also be
// static, or it can set to null to allow dynamically opening connections
@@ -65,7 +73,7 @@ class Channel {
protected:
constexpr Channel(uint32_t id, ChannelOutput* output)
- : id_(id), output_(output) {
+ : id_(id), output_(output), client_(nullptr) {
// TODO(pwbug/246): Use PW_ASSERT when that is available.
// PW_ASSERT(id != kUnassignedChannelId);
}
@@ -77,8 +85,15 @@ class Channel {
}
private:
+ friend class internal::BaseClientCall;
+ friend class Client;
+
+ constexpr Client* client() const { return client_; }
+ constexpr void set_client(Client* client) { client_ = client; }
+
uint32_t id_;
ChannelOutput* output_;
+ Client* client_;
};
} // namespace pw::rpc
diff --git a/pw_rpc/public/pw_rpc/client.h b/pw_rpc/public/pw_rpc/client.h
new file mode 100644
index 000000000..6e7def15a
--- /dev/null
+++ b/pw_rpc/public/pw_rpc/client.h
@@ -0,0 +1,62 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#pragma once
+
+#include <cstddef>
+#include <span>
+
+#include "pw_bytes/span.h"
+#include "pw_rpc/internal/base_client_call.h"
+#include "pw_rpc/internal/channel.h"
+
+namespace pw::rpc {
+
+class Client {
+ public:
+ // Creates a client that uses a set of RPC channels. Channels can be shared
+ // between a client and a server, but not between multiple clients.
+ constexpr Client(std::span<Channel> channels)
+ : channels_(static_cast<internal::Channel*>(channels.data()),
+ channels.size()) {
+ for (Channel& channel : channels_) {
+ channel.set_client(this);
+ };
+ }
+
+ // Processes an incoming RPC packet. The packet may be an RPC response or a
+ // control packet, the result of which is processed in this function. Returns
+ // whether the packet was able to be processed:
+ //
+ // OK - The packet was processed by the client.
+ // DATA_LOSS - Failed to decode the packet.
+ // INVALID_ARGUMENT - The packet is intended for a server, not a client.
+ // NOT_FOUND - The packet belongs to an unknown RPC call.
+ // UNIMPLEMENTED - Received a type of packet that the client doesn't know
+ // how to handle.
+ //
+ Status ProcessPacket(ConstByteSpan data);
+
+ size_t active_calls() const { return calls_.size(); }
+
+ private:
+ friend class internal::BaseClientCall;
+
+ Status RegisterCall(internal::BaseClientCall& call);
+ void RemoveCall(const internal::BaseClientCall& call) { calls_.remove(call); }
+
+ std::span<internal::Channel> channels_;
+ IntrusiveList<internal::BaseClientCall> calls_;
+};
+
+} // namespace pw::rpc
diff --git a/pw_rpc/public/pw_rpc/internal/base_client_call.h b/pw_rpc/public/pw_rpc/internal/base_client_call.h
new file mode 100644
index 000000000..275d870ec
--- /dev/null
+++ b/pw_rpc/public/pw_rpc/internal/base_client_call.h
@@ -0,0 +1,82 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#pragma once
+
+#include "pw_assert/assert.h"
+#include "pw_containers/intrusive_list.h"
+#include "pw_rpc/internal/channel.h"
+#include "pw_rpc/internal/packet.h"
+#include "pw_status/status.h"
+
+namespace pw::rpc::internal {
+
+// Base class representing an active client-side RPC call. Implementations
+// derive from this class and provide a packet handler function which is
+// called with a reference to the ClientCall object and the received packet.
+class BaseClientCall : public IntrusiveList<BaseClientCall>::Item {
+ public:
+ using ResponseHandler = void (*)(BaseClientCall&, const Packet&);
+
+ constexpr BaseClientCall(rpc::Channel* channel,
+ uint32_t service_id,
+ uint32_t method_id,
+ ResponseHandler handler)
+ : channel_(static_cast<Channel*>(channel)),
+ service_id_(service_id),
+ method_id_(method_id),
+ handler_(handler) {
+ // TODO(pwbug/246): Use PW_ASSERT when that is available.
+ // PW_ASSERT(channel_ != nullptr);
+
+ Register();
+ }
+
+ ~BaseClientCall() { Unregister(); }
+
+ BaseClientCall(const BaseClientCall&) = delete;
+ BaseClientCall& operator=(const BaseClientCall&) = delete;
+
+ BaseClientCall(BaseClientCall&& other) { *this = std::move(other); }
+ BaseClientCall& operator=(BaseClientCall&& other);
+
+ void Cancel();
+
+ protected:
+ constexpr Channel& channel() const { return *channel_; }
+ constexpr uint32_t service_id() const { return service_id_; }
+ constexpr uint32_t method_id() const { return method_id_; }
+
+ std::span<std::byte> AcquirePayloadBuffer();
+ Status ReleasePayloadBuffer(std::span<const std::byte> payload);
+
+ void Unregister();
+
+ private:
+ friend class rpc::Client;
+
+ void Register();
+
+ void HandleResponse(const Packet& packet) { handler_(*this, packet); }
+
+ Packet NewPacket(PacketType type,
+ std::span<const std::byte> payload = {}) const;
+
+ Channel* channel_;
+ uint32_t service_id_;
+ uint32_t method_id_;
+ Channel::OutputBuffer request_;
+ ResponseHandler handler_;
+};
+
+} // namespace pw::rpc::internal
diff --git a/pw_rpc/public/pw_rpc/internal/method_type.h b/pw_rpc/public/pw_rpc/internal/method_type.h
new file mode 100644
index 000000000..9a44e7a12
--- /dev/null
+++ b/pw_rpc/public/pw_rpc/internal/method_type.h
@@ -0,0 +1,25 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#pragma once
+
+namespace pw::rpc::internal {
+
+enum class MethodType {
+ kUnary,
+ kServerStreaming,
+ kClientStreaming,
+ kBidirectionalStreaming,
+};
+
+} // namespace pw::rpc::internal
diff --git a/pw_rpc/pw_rpc_private/internal_test_utils.h b/pw_rpc/pw_rpc_private/internal_test_utils.h
index e58519186..c3f958de5 100644
--- a/pw_rpc/pw_rpc_private/internal_test_utils.h
+++ b/pw_rpc/pw_rpc_private/internal_test_utils.h
@@ -21,6 +21,7 @@
#include <cstdint>
#include <span>
+#include "pw_rpc/client.h"
#include "pw_rpc/internal/channel.h"
#include "pw_rpc/internal/method.h"
#include "pw_rpc/internal/packet.h"
@@ -121,4 +122,46 @@ class ServerContextForTest {
internal::ServerCall context_;
};
+template <size_t output_buffer_size = 128,
+ size_t input_buffer_size = 128,
+ uint32_t channel_id = 99,
+ uint32_t service_id = 16,
+ uint32_t method_id = 111>
+class ClientContextForTest {
+ public:
+ static constexpr uint32_t kChannelId = channel_id;
+ static constexpr uint32_t kServiceId = service_id;
+ static constexpr uint32_t kMethodId = method_id;
+
+ ClientContextForTest()
+ : channel_(Channel::Create<kChannelId>(&output_)),
+ client_(std::span(&channel_, 1)) {}
+
+ const auto& output() const { return output_; }
+ Channel& channel() { return channel_; }
+ Client& client() { return client_; }
+
+ // Sends a packet to be processed by the client. Returns the client's
+ // ProcessPacket status.
+ Status SendPacket(internal::PacketType type,
+ Status status = Status::OK,
+ std::span<const std::byte> payload = {}) {
+ internal::Packet packet(
+ type, kChannelId, kServiceId, kMethodId, payload, status);
+ std::byte buffer[input_buffer_size];
+ StatusWithSize sws = packet.Encode(buffer);
+ EXPECT_EQ(sws.status(), Status::OK);
+ return client_.ProcessPacket(std::span(buffer, sws.size()));
+ }
+
+ Status SendResponse(Status status, std::span<const std::byte> payload) {
+ return SendPacket(internal::PacketType::RESPONSE, status, payload);
+ }
+
+ private:
+ TestOutput<output_buffer_size> output_;
+ Channel channel_;
+ Client client_;
+};
+
} // namespace pw::rpc
diff --git a/pw_rpc/pw_rpc_test_protos/test.options b/pw_rpc/pw_rpc_test_protos/test.options
new file mode 100644
index 000000000..a10a02f95
--- /dev/null
+++ b/pw_rpc/pw_rpc_test_protos/test.options
@@ -0,0 +1,15 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+pw.rpc.test.TestStreamResponse.chunk max_size:32