aboutsummaryrefslogtreecommitdiff
path: root/pw_rpc
diff options
context:
space:
mode:
Diffstat (limited to 'pw_rpc')
-rw-r--r--pw_rpc/BUILD.bazel53
-rw-r--r--pw_rpc/BUILD.gn4
-rw-r--r--pw_rpc/CMakeLists.txt2
-rw-r--r--pw_rpc/README.md1
-rw-r--r--pw_rpc/call_test.cc11
-rw-r--r--pw_rpc/callback_test.cc2
-rw-r--r--pw_rpc/channel_test.cc2
-rw-r--r--pw_rpc/client_integration_test.cc20
-rw-r--r--pw_rpc/client_server_test.cc13
-rw-r--r--pw_rpc/docs.rst62
-rw-r--r--pw_rpc/endpoint.cc9
-rw-r--r--pw_rpc/fake_channel_output_test.cc2
-rw-r--r--pw_rpc/fuzz/BUILD.gn1
-rw-r--r--pw_rpc/fuzz/alarm_timer_test.cc52
-rw-r--r--pw_rpc/fuzz/argparse_test.cc2
-rw-r--r--pw_rpc/fuzz/client_fuzzer.cc19
-rw-r--r--pw_rpc/fuzz/engine_test.cc2
-rw-r--r--pw_rpc/integration_testing.cc9
-rw-r--r--pw_rpc/java/main/dev/pigweed/pw_rpc/BUILD.bazel67
-rw-r--r--pw_rpc/java/test/dev/pigweed/pw_rpc/BUILD.bazel4
-rw-r--r--pw_rpc/method_test.cc2
-rw-r--r--pw_rpc/nanopb/BUILD.bazel19
-rw-r--r--pw_rpc/nanopb/BUILD.gn54
-rw-r--r--pw_rpc/nanopb/callback_test.cc2
-rw-r--r--pw_rpc/nanopb/client_call_test.cc2
-rw-r--r--pw_rpc/nanopb/client_integration_test.cc2
-rw-r--r--pw_rpc/nanopb/client_reader_writer_test.cc2
-rw-r--r--pw_rpc/nanopb/client_server_context_test.cc2
-rw-r--r--pw_rpc/nanopb/client_server_context_threaded_test.cc2
-rw-r--r--pw_rpc/nanopb/codegen_test.cc2
-rw-r--r--pw_rpc/nanopb/echo_service_test.cc2
-rw-r--r--pw_rpc/nanopb/fake_channel_output_test.cc2
-rw-r--r--pw_rpc/nanopb/method_info_test.cc2
-rw-r--r--pw_rpc/nanopb/method_lookup_test.cc2
-rw-r--r--pw_rpc/nanopb/method_test.cc2
-rw-r--r--pw_rpc/nanopb/method_union_test.cc2
-rw-r--r--pw_rpc/nanopb/public/pw_rpc/nanopb/client_reader_writer.h4
-rw-r--r--pw_rpc/nanopb/public/pw_rpc/nanopb/internal/method.h14
-rw-r--r--pw_rpc/nanopb/serde_test.cc2
-rw-r--r--pw_rpc/nanopb/server_callback_test.cc2
-rw-r--r--pw_rpc/nanopb/server_reader_writer_test.cc2
-rw-r--r--pw_rpc/nanopb/stub_generation_test.cc2
-rw-r--r--pw_rpc/nanopb/synchronous_call_test.cc2
-rw-r--r--pw_rpc/packet_meta_test.cc2
-rw-r--r--pw_rpc/packet_test.cc15
-rw-r--r--pw_rpc/public/pw_rpc/integration_test_socket_client.h17
-rw-r--r--pw_rpc/public/pw_rpc/integration_testing.h12
-rw-r--r--pw_rpc/public/pw_rpc/internal/call.h32
-rw-r--r--pw_rpc/public/pw_rpc/internal/config.h11
-rw-r--r--pw_rpc/public/pw_rpc/internal/endpoint.h15
-rw-r--r--pw_rpc/public/pw_rpc/internal/grpc.h21
-rw-r--r--pw_rpc/public/pw_rpc/internal/hash.h1
-rw-r--r--pw_rpc/public/pw_rpc/internal/method.h28
-rw-r--r--pw_rpc/public/pw_rpc/internal/method_impl_tester.h2
-rw-r--r--pw_rpc/public/pw_rpc/internal/packet.h9
-rw-r--r--pw_rpc/public/pw_rpc/internal/test_utils.h19
-rw-r--r--pw_rpc/public/pw_rpc/server.h20
-rw-r--r--pw_rpc/public/pw_rpc/writer.h36
-rw-r--r--pw_rpc/pw_rpc_private/test_method.h2
-rw-r--r--pw_rpc/pwpb/BUILD.bazel19
-rw-r--r--pw_rpc/pwpb/BUILD.gn60
-rw-r--r--pw_rpc/pwpb/client_call_test.cc2
-rw-r--r--pw_rpc/pwpb/client_integration_test.cc2
-rw-r--r--pw_rpc/pwpb/client_reader_writer_test.cc2
-rw-r--r--pw_rpc/pwpb/client_server_context_test.cc2
-rw-r--r--pw_rpc/pwpb/client_server_context_threaded_test.cc2
-rw-r--r--pw_rpc/pwpb/codegen_test.cc2
-rw-r--r--pw_rpc/pwpb/echo_service_test.cc2
-rw-r--r--pw_rpc/pwpb/fake_channel_output_test.cc2
-rw-r--r--pw_rpc/pwpb/method_info_test.cc2
-rw-r--r--pw_rpc/pwpb/method_lookup_test.cc2
-rw-r--r--pw_rpc/pwpb/method_test.cc2
-rw-r--r--pw_rpc/pwpb/method_union_test.cc2
-rw-r--r--pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h4
-rw-r--r--pw_rpc/pwpb/public/pw_rpc/pwpb/internal/method.h14
-rw-r--r--pw_rpc/pwpb/serde_test.cc2
-rw-r--r--pw_rpc/pwpb/server_callback_test.cc2
-rw-r--r--pw_rpc/pwpb/server_reader_writer_test.cc2
-rw-r--r--pw_rpc/pwpb/stub_generation_test.cc2
-rw-r--r--pw_rpc/pwpb/synchronous_call_test.cc2
-rw-r--r--pw_rpc/py/BUILD.bazel2
-rw-r--r--pw_rpc/py/pw_rpc/client.py9
-rw-r--r--pw_rpc/py/pw_rpc/descriptors.py32
-rw-r--r--pw_rpc/py/tests/descriptors_test.py13
-rwxr-xr-xpw_rpc/py/tests/ids_test.py2
-rw-r--r--pw_rpc/raw/BUILD.bazel11
-rw-r--r--pw_rpc/raw/BUILD.gn42
-rw-r--r--pw_rpc/raw/client_reader_writer_test.cc6
-rw-r--r--pw_rpc/raw/client_test.cc8
-rw-r--r--pw_rpc/raw/codegen_test.cc2
-rw-r--r--pw_rpc/raw/method_info_test.cc2
-rw-r--r--pw_rpc/raw/method_test.cc2
-rw-r--r--pw_rpc/raw/method_union_test.cc2
-rw-r--r--pw_rpc/raw/public/pw_rpc/raw/client_reader_writer.h6
-rw-r--r--pw_rpc/raw/public/pw_rpc/raw/internal/method.h37
-rw-r--r--pw_rpc/raw/public/pw_rpc/raw/server_reader_writer.h6
-rw-r--r--pw_rpc/raw/server_reader_writer_test.cc6
-rw-r--r--pw_rpc/raw/stub_generation_test.cc2
-rw-r--r--pw_rpc/raw/synchronous_call_test.cc2
-rw-r--r--pw_rpc/server.cc15
-rw-r--r--pw_rpc/server_test.cc77
-rw-r--r--pw_rpc/service_test.cc4
-rw-r--r--pw_rpc/system_server/BUILD.bazel11
-rw-r--r--pw_rpc/system_server/public/pw_rpc_system_server/socket.h8
-rw-r--r--pw_rpc/test_helpers_test.cc2
-rw-r--r--pw_rpc/ts/call.ts10
-rw-r--r--pw_rpc/ts/call_test.ts9
-rw-r--r--pw_rpc/ts/client.ts2
-rw-r--r--pw_rpc/ts/client_test.ts324
-rw-r--r--pw_rpc/ts/packets.ts10
-rw-r--r--pw_rpc/ts/packets_test.ts9
-rw-r--r--pw_rpc/ts/rpc_classes.ts85
112 files changed, 1157 insertions, 440 deletions
diff --git a/pw_rpc/BUILD.bazel b/pw_rpc/BUILD.bazel
index dca4569dd..50ff91565 100644
--- a/pw_rpc/BUILD.bazel
+++ b/pw_rpc/BUILD.bazel
@@ -12,9 +12,9 @@
# License for the specific language governing permissions and limitations under
# the License.
-load("@com_google_protobuf//:protobuf.bzl", "py_proto_library")
load("@rules_proto//proto:defs.bzl", "proto_library")
-load("//pw_build:pigweed.bzl", "pw_cc_library", "pw_cc_test")
+load("@rules_python//python:proto.bzl", "py_proto_library")
+load("//pw_build:pigweed.bzl", "pw_cc_test")
load("//pw_protobuf_compiler:pw_proto_library.bzl", "pw_proto_filegroup", "pw_proto_library")
package(default_visibility = ["//visibility:public"])
@@ -33,7 +33,7 @@ pw_proto_library(
deps = [":benchmark_proto"],
)
-pw_cc_library(
+cc_library(
name = "benchmark",
srcs = ["benchmark.cc"],
hdrs = ["public/pw_rpc/benchmark.h"],
@@ -55,14 +55,14 @@ filegroup(
# ],
)
-pw_cc_library(
+cc_library(
name = "client_server",
srcs = ["client_server.cc"],
hdrs = ["public/pw_rpc/client_server.h"],
deps = [":pw_rpc"],
)
-pw_cc_library(
+cc_library(
name = "pw_rpc",
srcs = [
"call.cc",
@@ -81,6 +81,7 @@ pw_cc_library(
"public/pw_rpc/internal/config.h",
"public/pw_rpc/internal/encoding_buffer.h",
"public/pw_rpc/internal/endpoint.h",
+ "public/pw_rpc/internal/grpc.h",
"public/pw_rpc/internal/hash.h",
"public/pw_rpc/internal/lock.h",
"public/pw_rpc/internal/log_config.h",
@@ -110,6 +111,7 @@ pw_cc_library(
],
includes = ["public"],
deps = [
+ ":config_override",
":internal_packet_cc.pwpb",
"//pw_assert",
"//pw_bytes",
@@ -127,7 +129,26 @@ pw_cc_library(
],
)
-pw_cc_library(
+label_flag(
+ name = "config_override",
+ build_setting_default = "//pw_build:default_module_config",
+)
+
+cc_library(
+ name = "completion_request_callback_config_enabled",
+ defines = [
+ "PW_RPC_COMPLETION_REQUEST_CALLBACK=1",
+ ],
+)
+
+config_setting(
+ name = "completion_request_callback_config_setting",
+ flag_values = {
+ ":config_override": ":completion_request_callback_config_enabled",
+ },
+)
+
+cc_library(
name = "synchronous_client_api",
srcs = ["public/pw_rpc/internal/synchronous_call_impl.h"],
hdrs = [
@@ -142,7 +163,7 @@ pw_cc_library(
],
)
-pw_cc_library(
+cc_library(
name = "client_server_testing",
hdrs = ["public/pw_rpc/internal/client_server_testing.h"],
includes = ["public"],
@@ -154,7 +175,7 @@ pw_cc_library(
],
)
-pw_cc_library(
+cc_library(
name = "client_server_testing_threaded",
hdrs = ["public/pw_rpc/internal/client_server_testing_threaded.h"],
includes = ["public"],
@@ -169,7 +190,7 @@ pw_cc_library(
],
)
-pw_cc_library(
+cc_library(
name = "test_helpers",
hdrs = ["public/pw_rpc/test_helpers.h"],
includes = ["public"],
@@ -186,14 +207,14 @@ pw_cc_library(
# thread_testing target is kept for backward compatibility.
# New code should use test_helpers instead.
-pw_cc_library(
+cc_library(
name = "thread_testing",
hdrs = ["public/pw_rpc/thread_testing.h"],
includes = ["public"],
deps = [":test_helpers"],
)
-pw_cc_library(
+cc_library(
name = "internal_test_utils",
srcs = ["fake_channel_output.cc"],
hdrs = [
@@ -224,8 +245,9 @@ pw_cc_library(
],
)
-pw_cc_library(
+cc_library(
name = "integration_testing",
+ testonly = True,
srcs = [
"integration_testing.cc",
],
@@ -236,7 +258,8 @@ pw_cc_library(
deps = [
":pw_rpc",
"//pw_assert",
- "//pw_hdlc:pw_rpc",
+ "//pw_hdlc",
+ "//pw_hdlc:default_addresses",
"//pw_hdlc:rpc_channel_output",
"//pw_log",
"//pw_stream:socket_stream",
@@ -386,7 +409,7 @@ java_lite_proto_library(
py_proto_library(
name = "internal_packet_proto_pb2",
- srcs = ["internal/packet.proto"],
+ deps = [":internal_packet_proto"],
)
pw_proto_library(
@@ -421,7 +444,7 @@ proto_library(
py_proto_library(
name = "echo_py_pb2",
- srcs = ["echo.proto"],
+ deps = [":echo_proto"],
)
pw_proto_library(
diff --git a/pw_rpc/BUILD.gn b/pw_rpc/BUILD.gn
index 97a32cec4..11710a798 100644
--- a/pw_rpc/BUILD.gn
+++ b/pw_rpc/BUILD.gn
@@ -215,6 +215,7 @@ pw_source_set("common") {
"public/pw_rpc/internal/channel_list.h",
"public/pw_rpc/internal/encoding_buffer.h",
"public/pw_rpc/internal/endpoint.h",
+ "public/pw_rpc/internal/grpc.h",
"public/pw_rpc/internal/lock.h",
"public/pw_rpc/internal/method_info.h",
"public/pw_rpc/internal/packet.h",
@@ -329,7 +330,8 @@ pw_source_set("integration_testing") {
sources = [ "integration_testing.cc" ]
public_deps = [
":client",
- "$dir_pw_hdlc:pw_rpc",
+ "$dir_pw_hdlc:decoder",
+ "$dir_pw_hdlc:default_addresses",
"$dir_pw_hdlc:rpc_channel_output",
"$dir_pw_stream:socket_stream",
"$dir_pw_unit_test:logging",
diff --git a/pw_rpc/CMakeLists.txt b/pw_rpc/CMakeLists.txt
index 4d95c180d..678c86f51 100644
--- a/pw_rpc/CMakeLists.txt
+++ b/pw_rpc/CMakeLists.txt
@@ -266,7 +266,7 @@ pw_add_library(pw_rpc.integration_testing STATIC
PUBLIC_DEPS
pw_assert
pw_function
- pw_hdlc.pw_rpc
+ pw_hdlc.default_addresses
pw_hdlc.rpc_channel_output
pw_rpc.client
pw_stream.socket_stream
diff --git a/pw_rpc/README.md b/pw_rpc/README.md
deleted file mode 100644
index f86c91595..000000000
--- a/pw_rpc/README.md
+++ /dev/null
@@ -1 +0,0 @@
-# pw\_rpc: Remote procedure calls
diff --git a/pw_rpc/call_test.cc b/pw_rpc/call_test.cc
index c3ae8be98..ed46e6e60 100644
--- a/pw_rpc/call_test.cc
+++ b/pw_rpc/call_test.cc
@@ -20,11 +20,11 @@
#include <cstring>
#include <optional>
-#include "gtest/gtest.h"
#include "pw_rpc/internal/test_utils.h"
#include "pw_rpc/service.h"
#include "pw_rpc_private/fake_server_reader_writer.h"
#include "pw_rpc_private/test_method.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc {
@@ -38,7 +38,12 @@ class TestService : public Service {
namespace internal {
namespace {
-constexpr Packet kPacket(pwpb::PacketType::REQUEST, 99, 16, 8);
+constexpr uint32_t kChannelId = 99;
+constexpr uint32_t kServiceId = 16;
+constexpr uint32_t kMethodId = 8;
+constexpr uint32_t kCallId = 327;
+constexpr Packet kPacket(
+ pwpb::PacketType::REQUEST, kChannelId, kServiceId, kMethodId, kCallId);
using ::pw::rpc::internal::test::FakeServerReader;
using ::pw::rpc::internal::test::FakeServerReaderWriter;
@@ -90,7 +95,7 @@ class ServerWriterTest : public Test {
writer_ = std::move(writer_temp);
}
- ServerContextForTest<TestService> context_;
+ ServerContextForTest<TestService, kChannelId, kServiceId, kCallId> context_;
FakeServerWriter writer_;
};
diff --git a/pw_rpc/callback_test.cc b/pw_rpc/callback_test.cc
index 80e0cb4cc..e1b969451 100644
--- a/pw_rpc/callback_test.cc
+++ b/pw_rpc/callback_test.cc
@@ -12,7 +12,6 @@
// License for the specific language governing permissions and limitations under
// the License.
-#include "gtest/gtest.h"
#include "pw_rpc/raw/client_testing.h"
#include "pw_rpc_test_protos/test.raw_rpc.pb.h"
#include "pw_sync/binary_semaphore.h"
@@ -20,6 +19,7 @@
#include "pw_thread/sleep.h"
#include "pw_thread/thread.h"
#include "pw_thread/yield.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc {
namespace {
diff --git a/pw_rpc/channel_test.cc b/pw_rpc/channel_test.cc
index e96a7bd9b..a6c021a7b 100644
--- a/pw_rpc/channel_test.cc
+++ b/pw_rpc/channel_test.cc
@@ -16,9 +16,9 @@
#include <cstddef>
-#include "gtest/gtest.h"
#include "pw_rpc/internal/packet.h"
#include "pw_rpc/internal/test_utils.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc::internal {
namespace {
diff --git a/pw_rpc/client_integration_test.cc b/pw_rpc/client_integration_test.cc
index 8f1021176..e8ecc0fd7 100644
--- a/pw_rpc/client_integration_test.cc
+++ b/pw_rpc/client_integration_test.cc
@@ -12,28 +12,22 @@
// License for the specific language governing permissions and limitations under
// the License.
-#include <sys/socket.h>
-
#include <algorithm>
#include <array>
#include <cstring>
-#include "gtest/gtest.h"
#include "pw_assert/check.h"
#include "pw_log/log.h"
#include "pw_rpc/benchmark.raw_rpc.pb.h"
#include "pw_rpc/integration_testing.h"
#include "pw_sync/binary_semaphore.h"
+#include "pw_unit_test/framework.h"
namespace rpc_test {
namespace {
constexpr int kIterations = 3;
-// This client configures a socket read timeout to allow the RPC dispatch thread
-// to exit gracefully.
-constexpr timeval kSocketReadTimeout = {.tv_sec = 1, .tv_usec = 0};
-
using namespace std::chrono_literals;
using pw::ByteSpan;
using pw::ConstByteSpan;
@@ -145,18 +139,6 @@ int main(int argc, char* argv[]) {
return 1;
}
- // Set read timout on socket to allow
- // pw::rpc::integration_test::TerminateClient() to complete.
- int retval = setsockopt(pw::rpc::integration_test::GetClientSocketFd(),
- SOL_SOCKET,
- SO_RCVTIMEO,
- &rpc_test::kSocketReadTimeout,
- sizeof(rpc_test::kSocketReadTimeout));
- PW_CHECK_INT_EQ(retval,
- 0,
- "Failed to configure socket receive timeout with errno=%d",
- errno);
-
int test_retval = RUN_ALL_TESTS();
pw::rpc::integration_test::TerminateClient();
diff --git a/pw_rpc/client_server_test.cc b/pw_rpc/client_server_test.cc
index bb2c72b0d..5f40faa3c 100644
--- a/pw_rpc/client_server_test.cc
+++ b/pw_rpc/client_server_test.cc
@@ -14,12 +14,12 @@
#include "pw_rpc/client_server.h"
-#include "gtest/gtest.h"
#include "pw_rpc/internal/packet.h"
#include "pw_rpc/internal/test_utils.h"
#include "pw_rpc/raw/fake_channel_output.h"
#include "pw_rpc/raw/internal/method_union.h"
#include "pw_rpc/service.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc::internal {
namespace {
@@ -27,6 +27,7 @@ namespace {
constexpr uint32_t kFakeChannelId = 1;
constexpr uint32_t kFakeServiceId = 3;
constexpr uint32_t kFakeMethodId = 10;
+constexpr uint32_t kFakeCallId = 239;
RawFakeChannelOutput<1> output;
rpc::Channel channels[] = {Channel::Create<kFakeChannelId>(&output)};
@@ -50,8 +51,11 @@ TEST(ClientServer, ProcessPacket_CallsServer) {
ClientServer client_server(channels);
client_server.server().RegisterService(service);
- Packet packet(
- pwpb::PacketType::REQUEST, kFakeChannelId, kFakeServiceId, kFakeMethodId);
+ Packet packet(pwpb::PacketType::REQUEST,
+ kFakeChannelId,
+ kFakeServiceId,
+ kFakeMethodId,
+ kFakeCallId);
std::array<std::byte, 32> buffer;
Result result = packet.Encode(buffer);
EXPECT_EQ(result.status(), OkStatus());
@@ -68,7 +72,8 @@ TEST(ClientServer, ProcessPacket_CallsClient) {
Packet packet(pwpb::PacketType::RESPONSE,
kFakeChannelId,
kFakeServiceId,
- kFakeMethodId);
+ kFakeMethodId,
+ kFakeCallId);
std::array<std::byte, 32> buffer;
Result result = packet.Encode(buffer);
EXPECT_EQ(result.status(), OkStatus());
diff --git a/pw_rpc/docs.rst b/pw_rpc/docs.rst
index 093e52a78..311c8f96d 100644
--- a/pw_rpc/docs.rst
+++ b/pw_rpc/docs.rst
@@ -76,7 +76,9 @@ In ``pw_rpc``, an RPC begins when the client sends an initial packet. The server
receives the packet, looks up the relevant service method, then calls into the
RPC function. The RPC is considered active until the server sends a status to
finish the RPC. The client may terminate an ongoing RPC by cancelling it.
-Multiple RPC requests to the same method may be made simultaneously.
+Multiple concurrent RPC requests to the same method may be made simultaneously
+(Note: Concurrent requests are not yet possible using the Java client. See
+`Issue 237418397 <https://issues.pigweed.dev/issues/237418397>`_).
Depending the type of RPC, the client and server exchange zero or more protobuf
request or response payloads. There are four RPC types:
@@ -147,6 +149,26 @@ appropriate reader/writer class must be used.
// Finish the RPC.
CHECK_OK(writer.Finish(OkStatus()));
+Errata
+------
+Prior to support for concurrent requests to a single method, no identifier was
+present to distinguish different calls to the same method. When a "call ID"
+feature was first introduced to solve this issue, existing clients and servers
+(1) set this value to zero and (2) ignored this value.
+
+When initial support for concurrent methods was added, a separate
+"open call ID" was introduced to distinguish unrequested responses. However,
+legacy servers built prior to this change continue to send unrequested
+responses with call ID zero. Prior to
+`this fix <https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/192311>`,
+clients which used "open call ID" would not accept unrequested responses from
+legacy servers. Clients built after that change will accept unrequested
+responses which use both "open call ID" and call ID zero.
+
+See
+`Issue 237418397 <https://issues.pigweed.dev/issues/237418397>`_
+for more details and discussion.
+
---------------
Creating an RPC
---------------
@@ -352,10 +374,27 @@ channel output and the example service.
RegisterServices();
// Declare a buffer for decoding incoming HDLC frames.
- std::array<std::byte, kMaxTransmissionUnit> input_buffer;
+ constexpr size_t kDecoderBufferSize =
+ pw::hdlc::Decoder::RequiredBufferSizeForFrameSize(kMaxTransmissionUnit);
+
+ std::array<std::byte, kDecoderBufferSize> input_buffer;
PW_LOG_INFO("Starting pw_rpc server");
- pw::hdlc::ReadAndProcessPackets(server, input_buffer);
+ pw::hdlc::Decoder decoder(input_buffer);
+
+ while (true) {
+ std::byte byte;
+ pw::Status ret_val = pw::sys_io::ReadByte(&byte);
+ if (!ret_val.ok()) {
+ return ret_val;
+ }
+ if (auto result = decoder.Process(byte); result.ok()) {
+ pw::hdlc::Frame& frame = result.value();
+ if (frame.address() == pw::hdlc::kDefaultRpcAddress) {
+ server.ProcessPacket(frame.data());
+ }
+ }
+ }
}
--------
@@ -731,7 +770,7 @@ status field indicates the type of error.
unrecoverable internal error.
* ``UNAVAILABLE`` -- Received a packet for an unknown channel.
-Inovking a service method
+Invoking a service method
=========================
Calling an RPC requires a specific sequence of packets. This section describes
the protocol for calling service methods of each type: unary, server streaming,
@@ -1744,7 +1783,8 @@ sharing code between servers and clients, ``pw_rpc`` provides the
streaming RPC call object (``ClientWriter`` or ``ClientReaderWriter``) can be
used as a ``pw::rpc::Writer&``. On the server side, a server or bidirectional
streaming RPC call object (``ServerWriter`` or ``ServerReaderWriter``) can be
-used as a ``pw::rpc::Writer&``.
+used as a ``pw::rpc::Writer&``. Call ``as_writer()`` to get a ``Writer&`` of the
+client or server call object.
Zephyr
======
@@ -1808,7 +1848,7 @@ interface.
No ``pw_rpc`` APIs may be accessed in this function! Implementations
MUST NOT access any RPC endpoints (:cpp:class:`pw::rpc::Client`,
:cpp:class:`pw::rpc::Server`) or call objects
- (:cpp:class:`pw::rpc::ServerReaderWriter`,
+ (:cpp:class:`pw::rpc::ServerReaderWriter`
:cpp:class:`pw::rpc::ClientReaderWriter`, etc.) inside the
:cpp:func:`Send` function or any descendent calls. Doing so will result
in deadlock! RPC APIs may be used by other threads, just not within
@@ -1817,3 +1857,13 @@ interface.
The buffer provided in ``packet`` must NOT be accessed outside of this
function. It must be sent immediately or copied elsewhere before the
function returns.
+
+Evolution
+=========
+Concurrent requests were not initially supported in pw_rpc (added in
+`C++ <https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/109077>`,
+`Python <https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/139610>`,
+and
+`TypeScript <https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/160792>`).
+As a result, some user-written service implementations may not expect or
+correctly support concurrent requests.
diff --git a/pw_rpc/endpoint.cc b/pw_rpc/endpoint.cc
index dd6cd53c8..1959a8643 100644
--- a/pw_rpc/endpoint.cc
+++ b/pw_rpc/endpoint.cc
@@ -131,12 +131,17 @@ Endpoint::FindIteratorsForCall(uint32_t channel_id,
while (call != calls_.end()) {
if (channel_id == call->channel_id_locked() &&
service_id == call->service_id() && method_id == call->method_id()) {
- if (call_id == call->id() || call_id == kOpenCallId) {
+ if (call_id == call->id() || call_id == kOpenCallId ||
+ call_id == kLegacyOpenCallId) {
break;
}
- if (call->id() == kOpenCallId) {
+ if (call->id() == kOpenCallId || call->id() == kLegacyOpenCallId) {
// Calls with ID of `kOpenCallId` were unrequested, and
// are updated to have the call ID of the first matching request.
+ //
+ // kLegacyOpenCallId is used for compatibility with old servers
+ // which do not specify a Call ID but expect to be able to send
+ // unrequested responses.
call->set_id(call_id);
break;
}
diff --git a/pw_rpc/fake_channel_output_test.cc b/pw_rpc/fake_channel_output_test.cc
index 7d892545d..8c279370f 100644
--- a/pw_rpc/fake_channel_output_test.cc
+++ b/pw_rpc/fake_channel_output_test.cc
@@ -18,10 +18,10 @@
#include <cstddef>
#include <memory>
-#include "gtest/gtest.h"
#include "pw_rpc/internal/channel.h"
#include "pw_rpc/internal/lock.h"
#include "pw_rpc/internal/packet.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc::internal::test {
namespace {
diff --git a/pw_rpc/fuzz/BUILD.gn b/pw_rpc/fuzz/BUILD.gn
index b2fd728b7..753d4cd1e 100644
--- a/pw_rpc/fuzz/BUILD.gn
+++ b/pw_rpc/fuzz/BUILD.gn
@@ -43,6 +43,7 @@ pw_test("alarm_timer_test") {
sources = [ "alarm_timer_test.cc" ]
deps = [
":alarm_timer",
+ "$dir_pw_chrono:system_clock",
"$dir_pw_sync:binary_semaphore",
]
}
diff --git a/pw_rpc/fuzz/alarm_timer_test.cc b/pw_rpc/fuzz/alarm_timer_test.cc
index a64156996..aac3ccb21 100644
--- a/pw_rpc/fuzz/alarm_timer_test.cc
+++ b/pw_rpc/fuzz/alarm_timer_test.cc
@@ -16,8 +16,9 @@
#include <chrono>
-#include "gtest/gtest.h"
+#include "pw_chrono/system_clock.h"
#include "pw_sync/binary_semaphore.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc::fuzz {
namespace {
@@ -27,25 +28,56 @@ using namespace std::chrono_literals;
TEST(AlarmTimerTest, Start) {
sync::BinarySemaphore sem;
AlarmTimer timer([&sem](chrono::SystemClock::time_point) { sem.release(); });
- timer.Start(10ms);
+ timer.Start(chrono::SystemClock::for_at_least(10ms));
sem.acquire();
}
TEST(AlarmTimerTest, Restart) {
- sync::BinarySemaphore sem;
- AlarmTimer timer([&sem](chrono::SystemClock::time_point) { sem.release(); });
- timer.Start(50ms);
- for (size_t i = 0; i < 10; ++i) {
+ sync::BinarySemaphore final_sem;
+ sync::BinarySemaphore kick_sem;
+ constexpr auto kTimerDuration = 200ms;
+ constexpr auto kTimerKickDuration = 10ms;
+ constexpr size_t kNumRestarts = 10;
+ static_assert(kTimerKickDuration < kTimerDuration);
+
+ AlarmTimer timer(
+ [&final_sem](chrono::SystemClock::time_point) { final_sem.release(); });
+ AlarmTimer timer_kicker(
+ [&kick_sem](chrono::SystemClock::time_point) { kick_sem.release(); });
+
+ timer.Start(chrono::SystemClock::for_at_least(kTimerDuration));
+
+ bool acquired = false;
+ const auto start = chrono::SystemClock::now();
+ for (size_t i = 0; i < kNumRestarts; ++i) {
+ // Be overly aggressive with restarting the timer, the point is to ensure
+ // that it doesn't time out when restareted. Since this tests timings, it
+ // inherrently is very prone to flake in some environments (e.g. heavy load
+ // on a Windows machine).
+ timer.Restart();
+ timer_kicker.Start(chrono::SystemClock::for_at_least(kTimerKickDuration));
timer.Restart();
- EXPECT_FALSE(sem.try_acquire_for(chrono::SystemClock::for_at_least(10us)));
+ kick_sem.acquire();
+ timer.Restart();
+
+ acquired = final_sem.try_acquire();
+ EXPECT_FALSE(acquired);
+ if (acquired) {
+ break;
+ }
}
- sem.acquire();
+
+ if (!acquired) {
+ final_sem.acquire();
+ }
+ auto end = chrono::SystemClock::now();
+ EXPECT_GT(end - start, kTimerKickDuration * kNumRestarts + kTimerDuration);
}
TEST(AlarmTimerTest, Cancel) {
sync::BinarySemaphore sem;
AlarmTimer timer([&sem](chrono::SystemClock::time_point) { sem.release(); });
- timer.Start(50ms);
+ timer.Start(chrono::SystemClock::for_at_least(50ms));
timer.Cancel();
EXPECT_FALSE(sem.try_acquire_for(chrono::SystemClock::for_at_least(100us)));
}
@@ -55,7 +87,7 @@ TEST(AlarmTimerTest, Destroy) {
{
AlarmTimer timer(
[&sem](chrono::SystemClock::time_point) { sem.release(); });
- timer.Start(50ms);
+ timer.Start(chrono::SystemClock::for_at_least(50ms));
}
EXPECT_FALSE(sem.try_acquire_for(chrono::SystemClock::for_at_least(100us)));
}
diff --git a/pw_rpc/fuzz/argparse_test.cc b/pw_rpc/fuzz/argparse_test.cc
index 93adea4d6..a76656077 100644
--- a/pw_rpc/fuzz/argparse_test.cc
+++ b/pw_rpc/fuzz/argparse_test.cc
@@ -17,8 +17,8 @@
#include <cstdint>
#include <limits>
-#include "gtest/gtest.h"
#include "pw_string/string_builder.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc::fuzz {
namespace {
diff --git a/pw_rpc/fuzz/client_fuzzer.cc b/pw_rpc/fuzz/client_fuzzer.cc
index c5086becf..d34b94af6 100644
--- a/pw_rpc/fuzz/client_fuzzer.cc
+++ b/pw_rpc/fuzz/client_fuzzer.cc
@@ -16,8 +16,6 @@
#include "pw_rpc/internal/log_config.h" // PW_LOG_* macros must be first.
// clang-format on
-#include <sys/socket.h>
-
#include <cstring>
#include "pw_log/log.h"
@@ -28,10 +26,6 @@
namespace pw::rpc::fuzz {
namespace {
-// This client configures a socket read timeout to allow the RPC dispatch thread
-// to exit gracefully.
-constexpr timeval kSocketReadTimeout = {.tv_sec = 1, .tv_usec = 0};
-
int FuzzClient(int argc, char** argv) {
// TODO(aarongreen): Incorporate descriptions into usage message.
Vector<ArgParserVariant, 5> parsers{
@@ -78,19 +72,6 @@ int FuzzClient(int argc, char** argv) {
return 1;
}
- // Set read timout on socket to allow
- // pw::rpc::integration_test::TerminateClient() to complete.
- int fd = integration_test::GetClientSocketFd();
- if (setsockopt(fd,
- SOL_SOCKET,
- SO_RCVTIMEO,
- &kSocketReadTimeout,
- sizeof(kSocketReadTimeout)) != 0) {
- PW_LOG_ERROR("Failed to configure socket receive timeout with errno=%d",
- errno);
- return 1;
- }
-
if (num_actions == 0) {
num_actions = std::numeric_limits<size_t>::max();
}
diff --git a/pw_rpc/fuzz/engine_test.cc b/pw_rpc/fuzz/engine_test.cc
index b8720fcb4..b3f2ce7f9 100644
--- a/pw_rpc/fuzz/engine_test.cc
+++ b/pw_rpc/fuzz/engine_test.cc
@@ -16,7 +16,6 @@
#include <chrono>
-#include "gtest/gtest.h"
#include "pw_containers/vector.h"
#include "pw_log/log.h"
#include "pw_rpc/benchmark.h"
@@ -24,6 +23,7 @@
#include "pw_rpc/internal/client_server_testing_threaded.h"
#include "pw_rpc/internal/fake_channel_output.h"
#include "pw_thread/non_portable_test_thread_options.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc::fuzz {
namespace {
diff --git a/pw_rpc/integration_testing.cc b/pw_rpc/integration_testing.cc
index d0b18fada..94b4e9e8e 100644
--- a/pw_rpc/integration_testing.cc
+++ b/pw_rpc/integration_testing.cc
@@ -16,9 +16,9 @@
#include <limits>
-#include "gtest/gtest.h"
#include "pw_log/log.h"
#include "pw_rpc/integration_test_socket_client.h"
+#include "pw_unit_test/framework.h"
#include "pw_unit_test/logging_event_handler.h"
namespace pw::rpc::integration_test {
@@ -33,7 +33,12 @@ unit_test::LoggingEventHandler log_test_events;
Client& client() { return context.client(); }
-int GetClientSocketFd() { return context.GetSocketFd(); }
+int SetClientSockOpt(int level,
+ int optname,
+ const void* optval,
+ unsigned int optlen) {
+ return context.SetSockOpt(level, optname, optval, optlen);
+}
void SetEgressChannelManipulator(ChannelManipulator* new_channel_manipulator) {
context.SetEgressChannelManipulator(new_channel_manipulator);
diff --git a/pw_rpc/java/main/dev/pigweed/pw_rpc/BUILD.bazel b/pw_rpc/java/main/dev/pigweed/pw_rpc/BUILD.bazel
index d7923d7d4..72fc45609 100644
--- a/pw_rpc/java/main/dev/pigweed/pw_rpc/BUILD.bazel
+++ b/pw_rpc/java/main/dev/pigweed/pw_rpc/BUILD.bazel
@@ -14,33 +14,35 @@
# Generic client for pw_rpc, Pigweed's RPC system.
+RPC_CLIENT_SOURCES = [
+ "AbstractCall.java",
+ "Call.java",
+ "Channel.java",
+ "ChannelOutputException.java",
+ "Client.java",
+ "Endpoint.java",
+ "FutureCall.java",
+ "Ids.java",
+ "InvalidRpcChannelException.java",
+ "InvalidRpcServiceException.java",
+ "InvalidRpcServiceMethodException.java",
+ "InvalidRpcStateException.java",
+ "Method.java",
+ "MethodClient.java",
+ "Packets.java",
+ "PendingRpc.java",
+ "RpcError.java",
+ "RpcKey.java",
+ "Service.java",
+ "Status.java",
+ "StreamObserver.java",
+ "StreamObserverCall.java",
+ "UnaryResult.java",
+]
+
java_library(
name = "client",
- srcs = [
- "AbstractCall.java",
- "Call.java",
- "Channel.java",
- "ChannelOutputException.java",
- "Client.java",
- "Endpoint.java",
- "FutureCall.java",
- "Ids.java",
- "InvalidRpcChannelException.java",
- "InvalidRpcServiceException.java",
- "InvalidRpcServiceMethodException.java",
- "InvalidRpcStateException.java",
- "Method.java",
- "MethodClient.java",
- "Packets.java",
- "PendingRpc.java",
- "RpcError.java",
- "RpcKey.java",
- "Service.java",
- "Status.java",
- "StreamObserver.java",
- "StreamObserverCall.java",
- "UnaryResult.java",
- ],
+ srcs = RPC_CLIENT_SOURCES,
visibility = ["//visibility:public"],
deps = [
"//pw_log/java/main/dev/pigweed/pw_log",
@@ -51,3 +53,18 @@ java_library(
"@maven//:com_google_guava_guava",
],
)
+
+android_library(
+ name = "client_android",
+ srcs = RPC_CLIENT_SOURCES,
+ tags = ["manual"], # TODO: b/227771184 - support Android in the Bazel build
+ visibility = ["//visibility:public"],
+ deps = [
+ "//pw_log/java/android_main/dev/pigweed/pw_log",
+ "//pw_rpc:packet_proto_java_lite",
+ "//third_party/google_auto:value",
+ "@com_google_protobuf//java/lite",
+ "@maven//:com_google_code_findbugs_jsr305",
+ "@maven//:com_google_guava_guava",
+ ],
+)
diff --git a/pw_rpc/java/test/dev/pigweed/pw_rpc/BUILD.bazel b/pw_rpc/java/test/dev/pigweed/pw_rpc/BUILD.bazel
index 5e16cc71e..b0c3e8c71 100644
--- a/pw_rpc/java/test/dev/pigweed/pw_rpc/BUILD.bazel
+++ b/pw_rpc/java/test/dev/pigweed/pw_rpc/BUILD.bazel
@@ -61,6 +61,7 @@ java_test(
"//pw_rpc/java/main/dev/pigweed/pw_rpc:client",
"@com_google_protobuf//java/lite",
"@maven//:com_google_flogger_flogger_system_backend",
+ "@maven//:com_google_guava_guava",
"@maven//:com_google_truth_truth",
"@maven//:org_mockito_mockito_core",
],
@@ -77,6 +78,7 @@ java_test(
"//pw_rpc/java/main/dev/pigweed/pw_rpc:client",
"@com_google_protobuf//java/lite",
"@maven//:com_google_flogger_flogger_system_backend",
+ "@maven//:com_google_guava_guava",
"@maven//:com_google_truth_truth",
"@maven//:org_mockito_mockito_core",
],
@@ -132,6 +134,7 @@ java_test(
"//pw_rpc:packet_proto_java_lite",
"//pw_rpc/java/main/dev/pigweed/pw_rpc:client",
"@maven//:com_google_flogger_flogger_system_backend",
+ "@maven//:com_google_guava_guava",
"@maven//:com_google_truth_truth",
"@maven//:org_mockito_mockito_core",
],
@@ -148,6 +151,7 @@ java_test(
"//pw_rpc/java/main/dev/pigweed/pw_rpc:client",
"@com_google_protobuf//java/lite",
"@maven//:com_google_flogger_flogger_system_backend",
+ "@maven//:com_google_guava_guava",
"@maven//:com_google_truth_truth",
"@maven//:org_mockito_mockito_core",
],
diff --git a/pw_rpc/method_test.cc b/pw_rpc/method_test.cc
index 39cd87c6c..a71f1bc1b 100644
--- a/pw_rpc/method_test.cc
+++ b/pw_rpc/method_test.cc
@@ -16,11 +16,11 @@
#include <array>
-#include "gtest/gtest.h"
#include "pw_rpc/internal/packet.h"
#include "pw_rpc/method_type.h"
#include "pw_rpc/server.h"
#include "pw_rpc_private/test_method.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc::internal {
namespace {
diff --git a/pw_rpc/nanopb/BUILD.bazel b/pw_rpc/nanopb/BUILD.bazel
index e28f6bf59..845ab0b16 100644
--- a/pw_rpc/nanopb/BUILD.bazel
+++ b/pw_rpc/nanopb/BUILD.bazel
@@ -14,7 +14,6 @@
load(
"//pw_build:pigweed.bzl",
- "pw_cc_library",
"pw_cc_test",
)
load(
@@ -26,7 +25,7 @@ package(default_visibility = ["//visibility:public"])
licenses(["notice"])
-pw_cc_library(
+cc_library(
name = "server_api",
srcs = [
"method.cc",
@@ -44,7 +43,7 @@ pw_cc_library(
],
)
-pw_cc_library(
+cc_library(
name = "client_api",
hdrs = ["public/pw_rpc/nanopb/client_reader_writer.h"],
includes = ["public"],
@@ -53,7 +52,7 @@ pw_cc_library(
],
)
-pw_cc_library(
+cc_library(
name = "common",
srcs = ["common.cc"],
hdrs = [
@@ -67,7 +66,7 @@ pw_cc_library(
],
)
-pw_cc_library(
+cc_library(
name = "test_method_context",
hdrs = [
"public/pw_rpc/nanopb/fake_channel_output.h",
@@ -80,7 +79,7 @@ pw_cc_library(
],
)
-pw_cc_library(
+cc_library(
name = "client_testing",
hdrs = [
"public/pw_rpc/nanopb/client_testing.h",
@@ -93,7 +92,7 @@ pw_cc_library(
],
)
-pw_cc_library(
+cc_library(
name = "client_server_testing",
hdrs = [
"public/pw_rpc/nanopb/client_server_testing.h",
@@ -105,7 +104,7 @@ pw_cc_library(
],
)
-pw_cc_library(
+cc_library(
name = "client_server_testing_threaded",
hdrs = [
"public/pw_rpc/nanopb/client_server_testing_threaded.h",
@@ -117,13 +116,13 @@ pw_cc_library(
],
)
-pw_cc_library(
+cc_library(
name = "internal_test_utils",
hdrs = ["pw_rpc_nanopb_private/internal_test_utils.h"],
deps = ["//pw_rpc:internal_test_utils"],
)
-pw_cc_library(
+cc_library(
name = "echo_service",
hdrs = ["public/pw_rpc/echo_service_nanopb.h"],
deps = [
diff --git a/pw_rpc/nanopb/BUILD.gn b/pw_rpc/nanopb/BUILD.gn
index 1fb9852d2..2a7eafd4a 100644
--- a/pw_rpc/nanopb/BUILD.gn
+++ b/pw_rpc/nanopb/BUILD.gn
@@ -199,6 +199,12 @@ pw_test("client_call_test") {
]
sources = [ "client_call_test.cc" ]
enable_if = dir_pw_third_party_nanopb != ""
+
+ # TODO: https://pwbug.dev/325509758 - Doesn't work on the Pico yet; hangs
+ # indefinitely.
+ if (pw_build_EXECUTABLE_TARGET_TYPE == "pico_executable") {
+ enable_if = false
+ }
}
pw_test("client_reader_writer_test") {
@@ -209,6 +215,12 @@ pw_test("client_reader_writer_test") {
]
sources = [ "client_reader_writer_test.cc" ]
enable_if = dir_pw_third_party_nanopb != ""
+
+ # TODO: https://pwbug.dev/325509758 - Doesn't work on the Pico yet; hangs
+ # indefinitely.
+ if (pw_build_EXECUTABLE_TARGET_TYPE == "pico_executable") {
+ enable_if = false
+ }
}
pw_test("client_server_context_test") {
@@ -220,6 +232,12 @@ pw_test("client_server_context_test") {
]
sources = [ "client_server_context_test.cc" ]
enable_if = dir_pw_third_party_nanopb != "" && pw_sync_MUTEX_BACKEND != ""
+
+ # TODO: https://pwbug.dev/325509758 - Doesn't work on the Pico yet; hangs
+ # indefinitely.
+ if (pw_build_EXECUTABLE_TARGET_TYPE == "pico_executable") {
+ enable_if = false
+ }
}
_stl_threading_and_nanopb_enabled =
@@ -252,6 +270,12 @@ pw_test("codegen_test") {
]
sources = [ "codegen_test.cc" ]
enable_if = dir_pw_third_party_nanopb != ""
+
+ # TODO: https://pwbug.dev/325509758 - Doesn't work on the Pico yet; hangs
+ # indefinitely.
+ if (pw_build_EXECUTABLE_TARGET_TYPE == "pico_executable") {
+ enable_if = false
+ }
}
pw_test("fake_channel_output_test") {
@@ -274,6 +298,12 @@ pw_test("method_test") {
]
sources = [ "method_test.cc" ]
enable_if = dir_pw_third_party_nanopb != ""
+
+ # TODO: https://pwbug.dev/325509758 - Doesn't work on the Pico yet; hangs
+ # indefinitely.
+ if (pw_build_EXECUTABLE_TARGET_TYPE == "pico_executable") {
+ enable_if = false
+ }
}
pw_test("method_info_test") {
@@ -296,6 +326,12 @@ pw_test("method_lookup_test") {
]
sources = [ "method_lookup_test.cc" ]
enable_if = dir_pw_third_party_nanopb != ""
+
+ # TODO: https://pwbug.dev/325509758 - Doesn't work on the Pico yet; hangs
+ # indefinitely.
+ if (pw_build_EXECUTABLE_TARGET_TYPE == "pico_executable") {
+ enable_if = false
+ }
}
pw_test("method_union_test") {
@@ -307,6 +343,12 @@ pw_test("method_union_test") {
]
sources = [ "method_union_test.cc" ]
enable_if = dir_pw_third_party_nanopb != ""
+
+ # TODO: https://pwbug.dev/325509758 - Doesn't work on the Pico yet; hangs
+ # indefinitely.
+ if (pw_build_EXECUTABLE_TARGET_TYPE == "pico_executable") {
+ enable_if = false
+ }
}
pw_test("echo_service_test") {
@@ -346,6 +388,12 @@ pw_test("server_reader_writer_test") {
]
sources = [ "server_reader_writer_test.cc" ]
enable_if = dir_pw_third_party_nanopb != ""
+
+ # TODO: https://pwbug.dev/325509758 - Doesn't work on the Pico yet; hangs
+ # indefinitely.
+ if (pw_build_EXECUTABLE_TARGET_TYPE == "pico_executable") {
+ enable_if = false
+ }
}
pw_test("stub_generation_test") {
@@ -366,4 +414,10 @@ pw_test("synchronous_call_test") {
sources = [ "synchronous_call_test.cc" ]
enable_if = dir_pw_third_party_nanopb != "" &&
pw_sync_TIMED_THREAD_NOTIFICATION_BACKEND != ""
+
+ # TODO: https://pwbug.dev/325509758 - Doesn't work on the Pico yet; hangs
+ # indefinitely.
+ if (pw_build_EXECUTABLE_TARGET_TYPE == "pico_executable") {
+ enable_if = false
+ }
}
diff --git a/pw_rpc/nanopb/callback_test.cc b/pw_rpc/nanopb/callback_test.cc
index bfd5088f6..1679a16c7 100644
--- a/pw_rpc/nanopb/callback_test.cc
+++ b/pw_rpc/nanopb/callback_test.cc
@@ -12,7 +12,6 @@
// License for the specific language governing permissions and limitations under
// the License.
-#include "gtest/gtest.h"
#include "pw_rpc/nanopb/client_testing.h"
#include "pw_rpc_test_protos/test.rpc.pb.h"
#include "pw_sync/binary_semaphore.h"
@@ -20,6 +19,7 @@
#include "pw_thread/sleep.h"
#include "pw_thread/thread.h"
#include "pw_thread/yield.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc {
namespace {
diff --git a/pw_rpc/nanopb/client_call_test.cc b/pw_rpc/nanopb/client_call_test.cc
index 19d7c8678..4d915ac2a 100644
--- a/pw_rpc/nanopb/client_call_test.cc
+++ b/pw_rpc/nanopb/client_call_test.cc
@@ -14,11 +14,11 @@
#include <optional>
-#include "gtest/gtest.h"
#include "pw_rpc/internal/test_utils.h"
#include "pw_rpc/nanopb/client_reader_writer.h"
#include "pw_rpc_nanopb_private/internal_test_utils.h"
#include "pw_rpc_test_protos/test.pb.h"
+#include "pw_unit_test/framework.h"
PW_MODIFY_DIAGNOSTICS_PUSH();
PW_MODIFY_DIAGNOSTIC(ignored, "-Wmissing-field-initializers");
diff --git a/pw_rpc/nanopb/client_integration_test.cc b/pw_rpc/nanopb/client_integration_test.cc
index 6163ba7a0..edf661967 100644
--- a/pw_rpc/nanopb/client_integration_test.cc
+++ b/pw_rpc/nanopb/client_integration_test.cc
@@ -12,11 +12,11 @@
// License for the specific language governing permissions and limitations under
// the License.
-#include "gtest/gtest.h"
#include "pw_assert/check.h"
#include "pw_rpc/benchmark.rpc.pb.h"
#include "pw_rpc/integration_testing.h"
#include "pw_sync/binary_semaphore.h"
+#include "pw_unit_test/framework.h"
namespace nanopb_rpc_test {
namespace {
diff --git a/pw_rpc/nanopb/client_reader_writer_test.cc b/pw_rpc/nanopb/client_reader_writer_test.cc
index 66da9ce43..153cd88a3 100644
--- a/pw_rpc/nanopb/client_reader_writer_test.cc
+++ b/pw_rpc/nanopb/client_reader_writer_test.cc
@@ -16,9 +16,9 @@
#include <optional>
-#include "gtest/gtest.h"
#include "pw_rpc/nanopb/client_testing.h"
#include "pw_rpc_test_protos/test.rpc.pb.h"
+#include "pw_unit_test/framework.h"
PW_MODIFY_DIAGNOSTICS_PUSH();
PW_MODIFY_DIAGNOSTIC(ignored, "-Wmissing-field-initializers");
diff --git a/pw_rpc/nanopb/client_server_context_test.cc b/pw_rpc/nanopb/client_server_context_test.cc
index 710efb311..a062a4b7a 100644
--- a/pw_rpc/nanopb/client_server_context_test.cc
+++ b/pw_rpc/nanopb/client_server_context_test.cc
@@ -13,10 +13,10 @@
// the License.
#include <array>
-#include "gtest/gtest.h"
#include "pw_rpc/nanopb/client_server_testing.h"
#include "pw_rpc_test_protos/test.rpc.pb.h"
#include "pw_sync/mutex.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc {
namespace {
diff --git a/pw_rpc/nanopb/client_server_context_threaded_test.cc b/pw_rpc/nanopb/client_server_context_threaded_test.cc
index 0ebefa48d..b10a42ad3 100644
--- a/pw_rpc/nanopb/client_server_context_threaded_test.cc
+++ b/pw_rpc/nanopb/client_server_context_threaded_test.cc
@@ -14,12 +14,12 @@
#include <atomic>
-#include "gtest/gtest.h"
#include "pw_rpc/nanopb/client_server_testing_threaded.h"
#include "pw_rpc_test_protos/test.rpc.pb.h"
#include "pw_sync/binary_semaphore.h"
#include "pw_sync/mutex.h"
#include "pw_thread/non_portable_test_thread_options.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc {
namespace {
diff --git a/pw_rpc/nanopb/codegen_test.cc b/pw_rpc/nanopb/codegen_test.cc
index 8a6b26cb1..2a011967b 100644
--- a/pw_rpc/nanopb/codegen_test.cc
+++ b/pw_rpc/nanopb/codegen_test.cc
@@ -12,13 +12,13 @@
// License for the specific language governing permissions and limitations under
// the License.
-#include "gtest/gtest.h"
#include "pw_preprocessor/compiler.h"
#include "pw_rpc/internal/hash.h"
#include "pw_rpc/internal/test_utils.h"
#include "pw_rpc/nanopb/test_method_context.h"
#include "pw_rpc_nanopb_private/internal_test_utils.h"
#include "pw_rpc_test_protos/test.rpc.pb.h"
+#include "pw_unit_test/framework.h"
PW_MODIFY_DIAGNOSTICS_PUSH();
PW_MODIFY_DIAGNOSTIC(ignored, "-Wmissing-field-initializers");
diff --git a/pw_rpc/nanopb/echo_service_test.cc b/pw_rpc/nanopb/echo_service_test.cc
index 4d134c23d..31a0a070c 100644
--- a/pw_rpc/nanopb/echo_service_test.cc
+++ b/pw_rpc/nanopb/echo_service_test.cc
@@ -12,9 +12,9 @@
// License for the specific language governing permissions and limitations under
// the License.
-#include "gtest/gtest.h"
#include "pw_rpc/echo_service_nanopb.h"
#include "pw_rpc/nanopb/test_method_context.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc {
namespace {
diff --git a/pw_rpc/nanopb/fake_channel_output_test.cc b/pw_rpc/nanopb/fake_channel_output_test.cc
index dcdf0c096..9ef16d268 100644
--- a/pw_rpc/nanopb/fake_channel_output_test.cc
+++ b/pw_rpc/nanopb/fake_channel_output_test.cc
@@ -18,10 +18,10 @@
#include <cstddef>
#include <memory>
-#include "gtest/gtest.h"
#include "pw_rpc/internal/channel.h"
#include "pw_rpc/internal/packet.h"
#include "pw_rpc_test_protos/test.rpc.pb.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc::internal::test {
namespace {
diff --git a/pw_rpc/nanopb/method_info_test.cc b/pw_rpc/nanopb/method_info_test.cc
index f200a12cf..3fc6b04df 100644
--- a/pw_rpc/nanopb/method_info_test.cc
+++ b/pw_rpc/nanopb/method_info_test.cc
@@ -14,9 +14,9 @@
#include "pw_rpc/internal/method_info.h"
-#include "gtest/gtest.h"
#include "pw_rpc/internal/method_info_tester.h"
#include "pw_rpc_test_protos/test.rpc.pb.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc::internal {
namespace {
diff --git a/pw_rpc/nanopb/method_lookup_test.cc b/pw_rpc/nanopb/method_lookup_test.cc
index 7d45879fd..ba3247220 100644
--- a/pw_rpc/nanopb/method_lookup_test.cc
+++ b/pw_rpc/nanopb/method_lookup_test.cc
@@ -12,10 +12,10 @@
// License for the specific language governing permissions and limitations under
// the License.
-#include "gtest/gtest.h"
#include "pw_rpc/nanopb/test_method_context.h"
#include "pw_rpc/raw/test_method_context.h"
#include "pw_rpc_test_protos/test.rpc.pb.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc {
namespace {
diff --git a/pw_rpc/nanopb/method_test.cc b/pw_rpc/nanopb/method_test.cc
index 9b84e1730..c5bbd9822 100644
--- a/pw_rpc/nanopb/method_test.cc
+++ b/pw_rpc/nanopb/method_test.cc
@@ -16,7 +16,6 @@
#include <array>
-#include "gtest/gtest.h"
#include "pw_containers/algorithm.h"
#include "pw_rpc/internal/lock.h"
#include "pw_rpc/internal/method_impl_tester.h"
@@ -25,6 +24,7 @@
#include "pw_rpc/service.h"
#include "pw_rpc_nanopb_private/internal_test_utils.h"
#include "pw_rpc_test_protos/test.pb.h"
+#include "pw_unit_test/framework.h"
PW_MODIFY_DIAGNOSTICS_PUSH();
PW_MODIFY_DIAGNOSTIC(ignored, "-Wmissing-field-initializers");
diff --git a/pw_rpc/nanopb/method_union_test.cc b/pw_rpc/nanopb/method_union_test.cc
index 3b9c2d114..fd61a7af9 100644
--- a/pw_rpc/nanopb/method_union_test.cc
+++ b/pw_rpc/nanopb/method_union_test.cc
@@ -17,10 +17,10 @@
#include <array>
-#include "gtest/gtest.h"
#include "pw_rpc/internal/test_utils.h"
#include "pw_rpc_nanopb_private/internal_test_utils.h"
#include "pw_rpc_test_protos/test.pb.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc::internal {
namespace {
diff --git a/pw_rpc/nanopb/public/pw_rpc/nanopb/client_reader_writer.h b/pw_rpc/nanopb/public/pw_rpc/nanopb/client_reader_writer.h
index bc3fbe814..5b2d34f36 100644
--- a/pw_rpc/nanopb/public/pw_rpc/nanopb/client_reader_writer.h
+++ b/pw_rpc/nanopb/public/pw_rpc/nanopb/client_reader_writer.h
@@ -58,7 +58,7 @@ class NanopbUnaryResponseClientCall : public UnaryResponseClientCall {
~NanopbUnaryResponseClientCall() { DestroyClientCall(); }
protected:
- constexpr NanopbUnaryResponseClientCall() = default;
+ constexpr NanopbUnaryResponseClientCall() : serde_(nullptr) {}
NanopbUnaryResponseClientCall(LockedEndpoint& client,
uint32_t channel_id,
@@ -151,7 +151,7 @@ class NanopbStreamResponseClientCall : public StreamResponseClientCall {
~NanopbStreamResponseClientCall() { DestroyClientCall(); }
protected:
- constexpr NanopbStreamResponseClientCall() = default;
+ constexpr NanopbStreamResponseClientCall() : serde_(nullptr) {}
NanopbStreamResponseClientCall(NanopbStreamResponseClientCall&& other)
PW_LOCKS_EXCLUDED(rpc_lock()) {
diff --git a/pw_rpc/nanopb/public/pw_rpc/nanopb/internal/method.h b/pw_rpc/nanopb/public/pw_rpc/nanopb/internal/method.h
index ef7bdd929..ea41b5519 100644
--- a/pw_rpc/nanopb/public/pw_rpc/nanopb/internal/method.h
+++ b/pw_rpc/nanopb/public/pw_rpc/nanopb/internal/method.h
@@ -185,6 +185,7 @@ class NanopbMethod : public Method {
id,
SynchronousUnaryInvoker<AllocateSpaceFor<Request<kMethod>>(),
AllocateSpaceFor<Response<kMethod>>()>,
+ MethodType::kUnary,
Function{.synchronous_unary = wrapper},
serde);
}
@@ -209,6 +210,7 @@ class NanopbMethod : public Method {
return NanopbMethod(
id,
AsynchronousUnaryInvoker<AllocateSpaceFor<Request<kMethod>>()>,
+ MethodType::kUnary,
Function{.unary_request = wrapper},
serde);
}
@@ -231,6 +233,7 @@ class NanopbMethod : public Method {
return NanopbMethod(
id,
ServerStreamingInvoker<AllocateSpaceFor<Request<kMethod>>()>,
+ MethodType::kServerStreaming,
Function{.unary_request = wrapper},
serde);
}
@@ -248,6 +251,7 @@ class NanopbMethod : public Method {
};
return NanopbMethod(id,
ClientStreamingInvoker<Request<kMethod>>,
+ MethodType::kClientStreaming,
Function{.stream_request = wrapper},
serde);
}
@@ -266,13 +270,18 @@ class NanopbMethod : public Method {
};
return NanopbMethod(id,
BidirectionalStreamingInvoker<Request<kMethod>>,
+ MethodType::kBidirectionalStreaming,
Function{.stream_request = wrapper},
serde);
}
// Represents an invalid method. Used to reduce error message verbosity.
static constexpr NanopbMethod Invalid() {
- return {0, InvalidInvoker, {}, NanopbMethodSerde(nullptr, nullptr)};
+ return {0,
+ InvalidInvoker,
+ MethodType::kUnary,
+ {},
+ NanopbMethodSerde(nullptr, nullptr)};
}
// Give access to the serializer/deserializer object for converting requests
@@ -313,9 +322,10 @@ class NanopbMethod : public Method {
constexpr NanopbMethod(uint32_t id,
Invoker invoker,
+ MethodType type,
Function function,
const NanopbMethodSerde& serde)
- : Method(id, invoker), function_(function), serde_(serde) {}
+ : Method(id, invoker, type), function_(function), serde_(serde) {}
void CallSynchronousUnary(const CallContext& context,
const Packet& request,
diff --git a/pw_rpc/nanopb/serde_test.cc b/pw_rpc/nanopb/serde_test.cc
index ebc337459..8bcb7771d 100644
--- a/pw_rpc/nanopb/serde_test.cc
+++ b/pw_rpc/nanopb/serde_test.cc
@@ -12,9 +12,9 @@
// License for the specific language governing permissions and limitations under
// the License.
-#include "gtest/gtest.h"
#include "pw_rpc/nanopb/internal/common.h"
#include "pw_rpc_test_protos/test.pb.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc::internal {
namespace {
diff --git a/pw_rpc/nanopb/server_callback_test.cc b/pw_rpc/nanopb/server_callback_test.cc
index 41bfd5729..57890b7fa 100644
--- a/pw_rpc/nanopb/server_callback_test.cc
+++ b/pw_rpc/nanopb/server_callback_test.cc
@@ -14,12 +14,12 @@
#include <array>
-#include "gtest/gtest.h"
#include "pb_decode.h"
#include "pb_encode.h"
#include "pw_rpc/nanopb/test_method_context.h"
#include "pw_rpc/service.h"
#include "pw_rpc_test_protos/test.rpc.pb.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc {
diff --git a/pw_rpc/nanopb/server_reader_writer_test.cc b/pw_rpc/nanopb/server_reader_writer_test.cc
index 909063de0..92bf3ed5e 100644
--- a/pw_rpc/nanopb/server_reader_writer_test.cc
+++ b/pw_rpc/nanopb/server_reader_writer_test.cc
@@ -16,11 +16,11 @@
#include <optional>
-#include "gtest/gtest.h"
#include "pw_rpc/nanopb/fake_channel_output.h"
#include "pw_rpc/nanopb/test_method_context.h"
#include "pw_rpc/service.h"
#include "pw_rpc_test_protos/test.rpc.pb.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc {
diff --git a/pw_rpc/nanopb/stub_generation_test.cc b/pw_rpc/nanopb/stub_generation_test.cc
index 18777bc56..6ae4fd9c7 100644
--- a/pw_rpc/nanopb/stub_generation_test.cc
+++ b/pw_rpc/nanopb/stub_generation_test.cc
@@ -16,8 +16,8 @@
// so that the generated stubs can be tested.
#define _PW_RPC_COMPILE_GENERATED_SERVICE_STUBS
-#include "gtest/gtest.h"
#include "pw_rpc_test_protos/test.rpc.pb.h"
+#include "pw_unit_test/framework.h"
namespace {
diff --git a/pw_rpc/nanopb/synchronous_call_test.cc b/pw_rpc/nanopb/synchronous_call_test.cc
index 4ea8402d6..1905f96d4 100644
--- a/pw_rpc/nanopb/synchronous_call_test.cc
+++ b/pw_rpc/nanopb/synchronous_call_test.cc
@@ -16,7 +16,6 @@
#include <chrono>
-#include "gtest/gtest.h"
#include "pw_chrono/system_clock.h"
#include "pw_rpc/channel.h"
#include "pw_rpc/internal/packet.h"
@@ -25,6 +24,7 @@
#include "pw_status/status.h"
#include "pw_status/status_with_size.h"
#include "pw_thread/thread.h"
+#include "pw_unit_test/framework.h"
#include "pw_work_queue/test_thread.h"
#include "pw_work_queue/work_queue.h"
diff --git a/pw_rpc/packet_meta_test.cc b/pw_rpc/packet_meta_test.cc
index 0f8a9683a..bfa943628 100644
--- a/pw_rpc/packet_meta_test.cc
+++ b/pw_rpc/packet_meta_test.cc
@@ -14,9 +14,9 @@
#include "pw_rpc/packet_meta.h"
-#include "gtest/gtest.h"
#include "pw_fuzzer/fuzztest.h"
#include "pw_rpc/internal/packet.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc {
namespace {
diff --git a/pw_rpc/packet_test.cc b/pw_rpc/packet_test.cc
index b1051950f..c4f211cf8 100644
--- a/pw_rpc/packet_test.cc
+++ b/pw_rpc/packet_test.cc
@@ -14,10 +14,10 @@
#include "pw_rpc/internal/packet.h"
-#include "gtest/gtest.h"
#include "pw_bytes/array.h"
#include "pw_fuzzer/fuzztest.h"
#include "pw_protobuf/wire_format.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc::internal {
namespace {
@@ -91,7 +91,7 @@ TEST(Packet, Encode) {
TEST(Packet, Encode_BufferTooSmall) {
byte buffer[2];
- Packet packet(PacketType::RESPONSE, 1, 42, 100, 0, kPayload);
+ Packet packet(PacketType::RESPONSE, 1, 42, 100, 12, kPayload);
auto result = packet.Encode(buffer);
EXPECT_EQ(Status::ResourceExhausted(), result.status());
@@ -172,14 +172,15 @@ constexpr size_t kReservedSize = 2 /* type */ + 2 /* channel */ +
2 /* payload key */ + 2 /* status */;
TEST(Packet, PayloadUsableSpace_ExactFit) {
- EXPECT_EQ(kReservedSize,
- Packet(PacketType::RESPONSE, 1, 42, 100).MinEncodedSizeBytes());
+ EXPECT_EQ(
+ kReservedSize,
+ Packet(PacketType::RESPONSE, 1, 42, 100, 28282).MinEncodedSizeBytes());
}
TEST(Packet, PayloadUsableSpace_LargerVarints) {
- EXPECT_EQ(
- kReservedSize + 2 /* channel */, // service and method are Fixed32
- Packet(PacketType::RESPONSE, 17000, 200, 200).MinEncodedSizeBytes());
+ EXPECT_EQ(kReservedSize + 2 /* channel */, // service and method are Fixed32
+ Packet(PacketType::RESPONSE, 17000, 200, 200, 28282)
+ .MinEncodedSizeBytes());
}
} // namespace
diff --git a/pw_rpc/public/pw_rpc/integration_test_socket_client.h b/pw_rpc/public/pw_rpc/integration_test_socket_client.h
index f8847874b..b8aabe98b 100644
--- a/pw_rpc/public/pw_rpc/integration_test_socket_client.h
+++ b/pw_rpc/public/pw_rpc/integration_test_socket_client.h
@@ -18,9 +18,10 @@
#include <optional>
#include <thread>
+#include "pw_hdlc/decoder.h"
+#include "pw_hdlc/default_addresses.h"
#include "pw_hdlc/encoded_size.h"
#include "pw_hdlc/rpc_channel.h"
-#include "pw_hdlc/rpc_packets.h"
#include "pw_rpc/integration_testing.h"
#include "pw_span/span.h"
#include "pw_status/try.h"
@@ -53,17 +54,21 @@ class SocketClientContext {
}
// Terminates the client, joining the RPC dispatch thread.
- //
- // WARNING: This may block forever if the socket is configured to block
- // indefinitely on reads. Configuring the client socket's `SO_RCVTIMEO` to a
- // nonzero timeout will allow the dispatch thread to always return.
void Terminate() {
PW_ASSERT(rpc_dispatch_thread_handle_.has_value());
should_terminate_.test_and_set();
+ // Close the stream to avoid blocking forever on a socket read.
+ stream_.Close();
rpc_dispatch_thread_handle_->join();
}
- int GetSocketFd() { return stream_.connection_fd(); }
+ // Configure options for the socket associated with the client.
+ int SetSockOpt(int level,
+ int optname,
+ const void* optval,
+ unsigned int optlen) {
+ return stream_.SetSockOpt(level, optname, optval, optlen);
+ }
void SetEgressChannelManipulator(
ChannelManipulator* new_channel_manipulator) {
diff --git a/pw_rpc/public/pw_rpc/integration_testing.h b/pw_rpc/public/pw_rpc/integration_testing.h
index 8d6dd17c8..2071c53cc 100644
--- a/pw_rpc/public/pw_rpc/integration_testing.h
+++ b/pw_rpc/public/pw_rpc/integration_testing.h
@@ -85,9 +85,11 @@ void SetIngressChannelManipulator(ChannelManipulator* new_channel_manipulator);
// Returns the global RPC client for integration test use.
Client& client();
-// The file descriptor for the socket associated with the client. This may be
-// used to configure socket options.
-int GetClientSocketFd();
+// Configure options for the socket associated with the client.
+int SetClientSockOpt(int level,
+ int optname,
+ const void* optval,
+ unsigned int optlen);
// Initializes logging and the global RPC client for integration testing. Starts
// a background thread that processes incoming.
@@ -98,10 +100,6 @@ Status InitializeClient(int argc,
Status InitializeClient(int port);
// Terminates the client, joining the RPC dispatch thread.
-//
-// WARNING: This may block forever if the socket is configured to block
-// indefinitely on reads. Configuring the client socket's `SO_RCVTIMEO` to a
-// nonzero timeout will allow the dispatch thread to always return.
void TerminateClient();
} // namespace pw::rpc::integration_test
diff --git a/pw_rpc/public/pw_rpc/internal/call.h b/pw_rpc/public/pw_rpc/internal/call.h
index bb6cdc7e1..4c3b0d2f6 100644
--- a/pw_rpc/public/pw_rpc/internal/call.h
+++ b/pw_rpc/public/pw_rpc/internal/call.h
@@ -27,14 +27,12 @@
#include "pw_rpc/internal/packet.h"
#include "pw_rpc/method_type.h"
#include "pw_rpc/service.h"
+#include "pw_rpc/writer.h"
#include "pw_span/span.h"
#include "pw_status/status.h"
#include "pw_sync/lock_annotations.h"
namespace pw::rpc {
-
-class Writer;
-
namespace internal {
class Endpoint;
@@ -94,6 +92,10 @@ class CallProperties {
// it will match a calls with this ID if one exists.
inline constexpr uint32_t kOpenCallId = std::numeric_limits<uint32_t>::max();
+// Legacy clients and servers didn't make use of call IDs at all, and will send
+// unrequested responses with an "empty" (zero) call ID.
+inline constexpr uint32_t kLegacyOpenCallId = 0;
+
// Internal RPC Call class. The Call is used to respond to any type of RPC.
// Public classes like ServerWriters inherit from it with private inheritance
// and provide a public API for their use case. The Call's public API is used by
@@ -115,7 +117,7 @@ inline constexpr uint32_t kOpenCallId = std::numeric_limits<uint32_t>::max();
// At the top level, `ServerCall` and `ClientCall` invoke `DestroyServerCall`
// `DestroyClientCall` respectively to perform cleanup in the case where no
// subclass carries additional state.
-class Call : public IntrusiveList<Call>::Item {
+class Call : public IntrusiveList<Call>::Item, private rpc::Writer {
public:
Call(const Call&) = delete;
@@ -393,10 +395,9 @@ class Call : public IntrusiveList<Call>::Item {
// active or inactive when this is called.
void UnregisterAndMarkClosed() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
- // Define conversions to the generic server/client RPC writer class. These
- // functions are defined in pw_rpc/writer.h after the Writer class is defined.
- constexpr operator Writer&();
- constexpr operator const Writer&() const;
+ // Define conversions to the generic server/client RPC writer class.
+ constexpr Writer& as_writer() { return *this; }
+ constexpr const Writer& as_writer() const { return *this; }
// Indicates if the on_next and unary on_completed callbacks are internal
// wrappers that decode the raw proto before invoking the user's callback. If
@@ -478,6 +479,8 @@ class Call : public IntrusiveList<Call>::Item {
PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
private:
+ friend class rpc::Writer;
+
enum State : uint8_t {
kActive = 0b001,
kClientRequestedCompletion = 0b010,
@@ -575,4 +578,17 @@ class Call : public IntrusiveList<Call>::Item {
};
} // namespace internal
+
+inline bool Writer::active() const {
+ return static_cast<const internal::Call*>(this)->active();
+}
+
+inline uint32_t Writer::channel_id() const {
+ return static_cast<const internal::Call*>(this)->channel_id();
+}
+
+inline Status Writer::Write(ConstByteSpan payload) {
+ return static_cast<internal::Call*>(this)->Write(payload);
+}
+
} // namespace pw::rpc
diff --git a/pw_rpc/public/pw_rpc/internal/config.h b/pw_rpc/public/pw_rpc/internal/config.h
index dec868b05..59c848a8b 100644
--- a/pw_rpc/public/pw_rpc/internal/config.h
+++ b/pw_rpc/public/pw_rpc/internal/config.h
@@ -43,6 +43,14 @@
#define PW_RPC_COMPLETION_REQUEST_CALLBACK 0
#endif // PW_RPC_COMPLETION_REQUEST_CALLBACK
+/// pw_rpc Method's can include their MethodType as a runtime accessible
+/// variable.
+///
+/// This isn't needed for most applications so is disabled by default.
+#ifndef PW_RPC_METHOD_STORES_TYPE
+#define PW_RPC_METHOD_STORES_TYPE 0
+#endif // PW_RPC_METHOD_STORES_TYPE
+
/// The Nanopb-based pw_rpc implementation allocates memory to use for Nanopb
/// structs for the request and response protobufs. The template function that
/// allocates these structs rounds struct sizes up to this value so that
@@ -239,6 +247,9 @@ constexpr std::bool_constant<PW_RPC_COMPLETION_REQUEST_CALLBACK>
kClientStreamEndCallbackEnabled;
template <typename...>
+constexpr std::bool_constant<PW_RPC_METHOD_STORES_TYPE> kMethodStoresType;
+
+template <typename...>
constexpr std::bool_constant<PW_RPC_DYNAMIC_ALLOCATION>
kDynamicAllocationEnabled;
diff --git a/pw_rpc/public/pw_rpc/internal/endpoint.h b/pw_rpc/public/pw_rpc/internal/endpoint.h
index 93a8239a0..43aecb65f 100644
--- a/pw_rpc/public/pw_rpc/internal/endpoint.h
+++ b/pw_rpc/public/pw_rpc/internal/endpoint.h
@@ -180,7 +180,16 @@ class Endpoint {
// Call IDs are varint encoded. Limit the varint size to 2 bytes (14 usable
// bits).
constexpr uint32_t kMaxCallId = 1 << 14;
- return (++next_call_id_) % kMaxCallId;
+ auto call_id = next_call_id_;
+ next_call_id_ = (next_call_id_ + 1) % kMaxCallId;
+
+ // Skip call_id `0` to avoid confusion with legacy servers which use
+ // call_id `0` as `kOpenCallId` or which do not provide call_id at all.
+ if (next_call_id_ == 0) {
+ next_call_id_ = 1;
+ }
+
+ return call_id;
}
// Adds a call to the internal call registry. If a matching call already
@@ -245,7 +254,9 @@ class Endpoint {
// problematic.
IntrusiveList<Call> to_cleanup_ PW_GUARDED_BY(rpc_lock());
- uint32_t next_call_id_ PW_GUARDED_BY(rpc_lock()) = 0;
+ // Skip call_id `0` to avoid confusion with legacy servers which use
+ // call_id `0` as `kOpenCallId` or which do not provide call_id at all.
+ uint32_t next_call_id_ PW_GUARDED_BY(rpc_lock()) = 1;
};
// An `Endpoint` indicating that `rpc_lock()` is held.
diff --git a/pw_rpc/public/pw_rpc/internal/grpc.h b/pw_rpc/public/pw_rpc/internal/grpc.h
new file mode 100644
index 000000000..51a267888
--- /dev/null
+++ b/pw_rpc/public/pw_rpc/internal/grpc.h
@@ -0,0 +1,21 @@
+// Copyright 2024 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#pragma once
+
+namespace pw::grpc {
+
+// Forward declarations to provide friend access to private pw::rpc methods.
+class PwRpcHandler;
+
+} // namespace pw::grpc
diff --git a/pw_rpc/public/pw_rpc/internal/hash.h b/pw_rpc/public/pw_rpc/internal/hash.h
index ae4efe903..2c65e7c8f 100644
--- a/pw_rpc/public/pw_rpc/internal/hash.h
+++ b/pw_rpc/public/pw_rpc/internal/hash.h
@@ -13,6 +13,7 @@
// the License.
#pragma once
+#include <cstdint>
#include <string_view>
#include "pw_preprocessor/compiler.h"
diff --git a/pw_rpc/public/pw_rpc/internal/method.h b/pw_rpc/public/pw_rpc/internal/method.h
index 58fb4e331..ac08e32bc 100644
--- a/pw_rpc/public/pw_rpc/internal/method.h
+++ b/pw_rpc/public/pw_rpc/internal/method.h
@@ -19,6 +19,7 @@
#include "pw_rpc/internal/call_context.h"
#include "pw_rpc/internal/lock.h"
+#include "pw_rpc/method_type.h"
namespace pw::rpc {
@@ -62,6 +63,18 @@ class Method {
public:
constexpr uint32_t id() const { return id_; }
+ template <typename UnusedType = void>
+ constexpr MethodType type() const {
+ static_assert(cfg::kMethodStoresType<UnusedType>,
+ "The MethodType accessor is disabled. To enable set "
+ "PW_RPC_METHOD_STORES_TYPE to 1.");
+#if PW_RPC_METHOD_STORES_TYPE
+ return type_;
+#else
+ return MethodType::kUnary;
+#endif
+ }
+
// The pw::rpc::Server calls method.Invoke to call a user-defined RPC. Invoke
// calls the invoker function, which handles the RPC request and response
// according to the RPC type and protobuf implementation and calls the
@@ -79,11 +92,24 @@ class Method {
static constexpr void InvalidInvoker(const CallContext&, const Packet&) {}
- constexpr Method(uint32_t id, Invoker invoker) : id_(id), invoker_(invoker) {}
+ constexpr Method(uint32_t id,
+ Invoker invoker,
+ [[maybe_unused]] MethodType type)
+ : id_(id),
+ invoker_(invoker)
+#if PW_RPC_METHOD_STORES_TYPE
+ ,
+ type_(type)
+#endif
+ {
+ }
private:
uint32_t id_;
Invoker invoker_;
+#if PW_RPC_METHOD_STORES_TYPE
+ MethodType type_;
+#endif
};
// MethodTraits inspects an RPC implementation function. It determines which
diff --git a/pw_rpc/public/pw_rpc/internal/method_impl_tester.h b/pw_rpc/public/pw_rpc/internal/method_impl_tester.h
index 2b411d2a6..9fce90942 100644
--- a/pw_rpc/public/pw_rpc/internal/method_impl_tester.h
+++ b/pw_rpc/public/pw_rpc/internal/method_impl_tester.h
@@ -16,9 +16,9 @@
#include <tuple>
#include <type_traits>
-#include "gtest/gtest.h"
#include "pw_rpc/internal/packet.h"
#include "pw_rpc/raw/internal/method.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc::internal {
diff --git a/pw_rpc/public/pw_rpc/internal/packet.h b/pw_rpc/public/pw_rpc/internal/packet.h
index 237356164..6e4bcd71b 100644
--- a/pw_rpc/public/pw_rpc/internal/packet.h
+++ b/pw_rpc/public/pw_rpc/internal/packet.h
@@ -86,14 +86,17 @@ class Packet {
// Creates an empty packet.
constexpr Packet()
- : Packet(
- pwpb::PacketType{}, kUnassignedId, kUnassignedId, kUnassignedId) {}
+ : Packet(pwpb::PacketType{},
+ kUnassignedId,
+ kUnassignedId,
+ kUnassignedId,
+ kUnassignedId) {}
constexpr Packet(pwpb::PacketType type,
uint32_t channel_id,
uint32_t service_id,
uint32_t method_id,
- uint32_t call_id = kUnassignedId,
+ uint32_t call_id,
ConstByteSpan payload = {},
Status status = OkStatus())
: type_(type),
diff --git a/pw_rpc/public/pw_rpc/internal/test_utils.h b/pw_rpc/public/pw_rpc/internal/test_utils.h
index 773b18e42..9e5bc7a05 100644
--- a/pw_rpc/public/pw_rpc/internal/test_utils.h
+++ b/pw_rpc/public/pw_rpc/internal/test_utils.h
@@ -20,7 +20,6 @@
#include <cstddef>
#include <cstdint>
-#include "gtest/gtest.h"
#include "pw_assert/assert.h"
#include "pw_rpc/client.h"
#include "pw_rpc/internal/channel.h"
@@ -29,6 +28,7 @@
#include "pw_rpc/raw/fake_channel_output.h"
#include "pw_rpc/server.h"
#include "pw_span/span.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc::internal {
@@ -40,7 +40,10 @@ class TestServer : public Server {
using Server::FindCall;
};
-template <typename Service, uint32_t kChannelId = 99, uint32_t kServiceId = 16>
+template <typename Service,
+ uint32_t kChannelId = 99,
+ uint32_t kServiceId = 16,
+ uint32_t kDefaultCallId = 437>
class ServerContextForTest {
public:
static constexpr uint32_t channel_id() { return kChannelId; }
@@ -50,7 +53,7 @@ class ServerContextForTest {
: channel_(Channel::Create<kChannelId>(&output_)),
server_(span(&channel_, 1)),
service_(kServiceId),
- context_(server_, channel_.id(), service_, method, 0) {
+ context_(server_, channel_.id(), service_, method, kDefaultCallId) {
server_.RegisterService(service_);
}
@@ -60,7 +63,7 @@ class ServerContextForTest {
kChannelId,
kServiceId,
context_.method().id(),
- 0,
+ kDefaultCallId,
payload);
}
@@ -70,7 +73,7 @@ class ServerContextForTest {
kChannelId,
kServiceId,
context_.method().id(),
- 0,
+ kDefaultCallId,
payload,
status);
}
@@ -80,7 +83,7 @@ class ServerContextForTest {
kChannelId,
kServiceId,
context_.method().id(),
- 0,
+ kDefaultCallId,
payload);
}
@@ -89,11 +92,11 @@ class ServerContextForTest {
kChannelId,
kServiceId,
context_.method().id(),
- 0,
+ kDefaultCallId,
payload);
}
- CallContext get(uint32_t id = 0) const {
+ CallContext get(uint32_t id = kDefaultCallId) const {
return CallContext(context_.server(),
context_.channel_id(),
context_.service(),
diff --git a/pw_rpc/public/pw_rpc/server.h b/pw_rpc/public/pw_rpc/server.h
index 2369c46a1..3fabe74b6 100644
--- a/pw_rpc/public/pw_rpc/server.h
+++ b/pw_rpc/public/pw_rpc/server.h
@@ -21,6 +21,7 @@
#include "pw_rpc/internal/call.h"
#include "pw_rpc/internal/channel.h"
#include "pw_rpc/internal/endpoint.h"
+#include "pw_rpc/internal/grpc.h"
#include "pw_rpc/internal/lock.h"
#include "pw_rpc/internal/method.h"
#include "pw_rpc/internal/method_info.h"
@@ -99,6 +100,10 @@ class Server : public internal::Endpoint {
private:
friend class internal::Call;
+ friend class ServerTestHelper;
+
+ // Give gRPC integration access to FindMethod
+ friend class pw::grpc::PwRpcHandler;
// Give call classes access to OpenCall.
friend class RawServerReaderWriter;
@@ -167,10 +172,21 @@ class Server : public internal::Endpoint {
return call;
}
- std::tuple<Service*, const internal::Method*> FindMethod(
- const internal::Packet& packet)
+ std::tuple<Service*, const internal::Method*> FindMethod(uint32_t service_id,
+ uint32_t method_id)
+ PW_LOCKS_EXCLUDED(internal::rpc_lock());
+
+ std::tuple<Service*, const internal::Method*> FindMethodLocked(
+ uint32_t service_id, uint32_t method_id)
PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock());
+ std::tuple<Service*, const internal::Method*> FindMethodLocked(
+ const internal::Packet& packet)
+ PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock()) {
+ // Packets always include service and method IDs.
+ return FindMethodLocked(packet.service_id(), packet.method_id());
+ }
+
void HandleCompletionRequest(const internal::Packet& packet,
internal::Channel& channel,
IntrusiveList<internal::Call>::iterator call)
diff --git a/pw_rpc/public/pw_rpc/writer.h b/pw_rpc/public/pw_rpc/writer.h
index 57b1e63d2..3774af100 100644
--- a/pw_rpc/public/pw_rpc/writer.h
+++ b/pw_rpc/public/pw_rpc/writer.h
@@ -13,41 +13,41 @@
// the License.
#pragma once
-#include "pw_rpc/internal/call.h"
+#include <cstdint>
+
+#include "pw_bytes/span.h"
+#include "pw_rpc/internal/lock.h"
+#include "pw_status/status.h"
namespace pw::rpc {
+namespace internal {
+class Call;
+}
// The Writer class allows writing requests or responses to a streaming RPC.
// ClientWriter, ClientReaderWriter, ServerWriter, and ServerReaderWriter
// classes can be used as a generic Writer.
-class Writer final : private internal::Call {
+class Writer {
public:
- // Writers cannot be created directly. They may only be used as a reference to
- // an existing call object.
- Writer() = delete;
-
Writer(const Writer&) = delete;
Writer(Writer&&) = delete;
Writer& operator=(const Writer&) = delete;
Writer& operator=(Writer&&) = delete;
- using internal::Call::active;
- using internal::Call::channel_id;
+ bool active() const;
+ uint32_t channel_id() const;
- using internal::Call::Write;
+ Status Write(ConstByteSpan payload) PW_LOCKS_EXCLUDED(internal::rpc_lock());
private:
+ // Only allow Call to inherit from Writer. This guarantees that Writers can
+ // always safely downcast to Call.
friend class internal::Call;
-};
-namespace internal {
-
-constexpr Call::operator Writer&() { return static_cast<Writer&>(*this); }
-
-constexpr Call::operator const Writer&() const {
- return static_cast<const Writer&>(*this);
-}
+ // Writers cannot be created directly. They may only be used as a reference to
+ // an existing call object.
+ constexpr Writer() = default;
+};
-} // namespace internal
} // namespace pw::rpc
diff --git a/pw_rpc/pw_rpc_private/test_method.h b/pw_rpc/pw_rpc_private/test_method.h
index 1c4ea5b0a..231402368 100644
--- a/pw_rpc/pw_rpc_private/test_method.h
+++ b/pw_rpc/pw_rpc_private/test_method.h
@@ -34,7 +34,7 @@ namespace pw::rpc::internal {
class TestMethod : public Method {
public:
constexpr TestMethod(uint32_t id, MethodType type = MethodType::kUnary)
- : Method(id, GetInvoker(type)),
+ : Method(id, GetInvoker(type), type),
last_channel_id_(0),
invocations_(0),
move_to_call_(nullptr) {}
diff --git a/pw_rpc/pwpb/BUILD.bazel b/pw_rpc/pwpb/BUILD.bazel
index 0fd8ff726..b8c62044c 100644
--- a/pw_rpc/pwpb/BUILD.bazel
+++ b/pw_rpc/pwpb/BUILD.bazel
@@ -14,7 +14,6 @@
load(
"//pw_build:pigweed.bzl",
- "pw_cc_library",
"pw_cc_test",
)
load(
@@ -26,7 +25,7 @@ package(default_visibility = ["//visibility:public"])
licenses(["notice"])
-pw_cc_library(
+cc_library(
name = "server_api",
srcs = [
"server_reader_writer.cc",
@@ -43,7 +42,7 @@ pw_cc_library(
],
)
-pw_cc_library(
+cc_library(
name = "client_api",
hdrs = [
"public/pw_rpc/pwpb/client_reader_writer.h",
@@ -54,7 +53,7 @@ pw_cc_library(
],
)
-pw_cc_library(
+cc_library(
name = "common",
hdrs = [
"public/pw_rpc/pwpb/internal/common.h",
@@ -67,7 +66,7 @@ pw_cc_library(
],
)
-pw_cc_library(
+cc_library(
name = "test_method_context",
hdrs = [
"public/pw_rpc/pwpb/fake_channel_output.h",
@@ -84,7 +83,7 @@ pw_cc_library(
],
)
-pw_cc_library(
+cc_library(
name = "client_testing",
hdrs = [
"public/pw_rpc/pwpb/client_testing.h",
@@ -97,7 +96,7 @@ pw_cc_library(
],
)
-pw_cc_library(
+cc_library(
name = "client_server_testing",
hdrs = [
"public/pw_rpc/pwpb/client_server_testing.h",
@@ -110,7 +109,7 @@ pw_cc_library(
],
)
-pw_cc_library(
+cc_library(
name = "client_server_testing_threaded",
hdrs = [
"public/pw_rpc/pwpb/client_server_testing_threaded.h",
@@ -122,13 +121,13 @@ pw_cc_library(
],
)
-pw_cc_library(
+cc_library(
name = "internal_test_utils",
hdrs = ["pw_rpc_pwpb_private/internal_test_utils.h"],
deps = ["//pw_rpc:internal_test_utils"],
)
-pw_cc_library(
+cc_library(
name = "echo_service",
hdrs = ["public/pw_rpc/echo_service_pwpb.h"],
deps = [
diff --git a/pw_rpc/pwpb/BUILD.gn b/pw_rpc/pwpb/BUILD.gn
index e036a8019..d5f1cecfe 100644
--- a/pw_rpc/pwpb/BUILD.gn
+++ b/pw_rpc/pwpb/BUILD.gn
@@ -178,6 +178,12 @@ pw_test("client_call_test") {
"..:test_utils",
]
sources = [ "client_call_test.cc" ]
+
+ # TODO: https://pwbug.dev/325509758 - Doesn't work on the Pico yet; hangs
+ # indefinitely.
+ if (pw_build_EXECUTABLE_TARGET_TYPE == "pico_executable") {
+ enable_if = false
+ }
}
pw_test("client_reader_writer_test") {
@@ -189,6 +195,12 @@ pw_test("client_reader_writer_test") {
]
enable_if = pw_sync_MUTEX_BACKEND != ""
sources = [ "client_reader_writer_test.cc" ]
+
+ # TODO: https://pwbug.dev/325509758 - Doesn't work on the Pico yet; hangs
+ # indefinitely.
+ if (pw_build_EXECUTABLE_TARGET_TYPE == "pico_executable") {
+ enable_if = false
+ }
}
pw_test("client_server_context_test") {
@@ -198,6 +210,12 @@ pw_test("client_server_context_test") {
"..:test_protos.pwpb_rpc",
]
sources = [ "client_server_context_test.cc" ]
+
+ # TODO: https://pwbug.dev/325509758 - Doesn't work on the Pico yet; hangs
+ # indefinitely.
+ if (pw_build_EXECUTABLE_TARGET_TYPE == "pico_executable") {
+ enable_if = false
+ }
}
_stl_threading_enabled =
@@ -228,6 +246,12 @@ pw_test("codegen_test") {
"..:test_utils",
]
sources = [ "codegen_test.cc" ]
+
+ # TODO: https://pwbug.dev/325509758 - Doesn't work on the Pico yet; hangs
+ # indefinitely.
+ if (pw_build_EXECUTABLE_TARGET_TYPE == "pico_executable") {
+ enable_if = false
+ }
}
pw_test("echo_service_test") {
@@ -237,6 +261,12 @@ pw_test("echo_service_test") {
":test_method_context",
]
sources = [ "echo_service_test.cc" ]
+
+ # TODO: https://pwbug.dev/325509758 - Doesn't work on the Pico yet; hangs
+ # indefinitely.
+ if (pw_build_EXECUTABLE_TARGET_TYPE == "pico_executable") {
+ enable_if = false
+ }
}
pw_test("fake_channel_output_test") {
@@ -257,6 +287,12 @@ pw_test("method_test") {
"..:test_utils",
]
sources = [ "method_test.cc" ]
+
+ # TODO: https://pwbug.dev/325509758 - Doesn't work on the Pico yet; hangs
+ # indefinitely.
+ if (pw_build_EXECUTABLE_TARGET_TYPE == "pico_executable") {
+ enable_if = false
+ }
}
pw_test("method_info_test") {
@@ -277,6 +313,12 @@ pw_test("method_lookup_test") {
"../raw:test_method_context",
]
sources = [ "method_lookup_test.cc" ]
+
+ # TODO: https://pwbug.dev/325509758 - Doesn't work on the Pico yet; hangs
+ # indefinitely.
+ if (pw_build_EXECUTABLE_TARGET_TYPE == "pico_executable") {
+ enable_if = false
+ }
}
pw_test("method_union_test") {
@@ -287,6 +329,12 @@ pw_test("method_union_test") {
"..:test_utils",
]
sources = [ "method_union_test.cc" ]
+
+ # TODO: https://pwbug.dev/325509758 - Doesn't work on the Pico yet; hangs
+ # indefinitely.
+ if (pw_build_EXECUTABLE_TARGET_TYPE == "pico_executable") {
+ enable_if = false
+ }
}
pw_test("serde_test") {
@@ -313,6 +361,12 @@ pw_test("server_reader_writer_test") {
"..:test_protos.pwpb_rpc",
]
sources = [ "server_reader_writer_test.cc" ]
+
+ # TODO: https://pwbug.dev/325509758 - Doesn't work on the Pico yet; hangs
+ # indefinitely.
+ if (pw_build_EXECUTABLE_TARGET_TYPE == "pico_executable") {
+ enable_if = false
+ }
}
pw_test("stub_generation_test") {
@@ -333,4 +387,10 @@ pw_test("synchronous_call_test") {
]
sources = [ "synchronous_call_test.cc" ]
enable_if = pw_sync_TIMED_THREAD_NOTIFICATION_BACKEND != ""
+
+ # TODO: https://pwbug.dev/325509758 - Doesn't work on the Pico yet; hangs
+ # indefinitely.
+ if (pw_build_EXECUTABLE_TARGET_TYPE == "pico_executable") {
+ enable_if = false
+ }
}
diff --git a/pw_rpc/pwpb/client_call_test.cc b/pw_rpc/pwpb/client_call_test.cc
index e382ea82e..2d145210d 100644
--- a/pw_rpc/pwpb/client_call_test.cc
+++ b/pw_rpc/pwpb/client_call_test.cc
@@ -14,11 +14,11 @@
#include <optional>
-#include "gtest/gtest.h"
#include "pw_rpc/internal/test_utils.h"
#include "pw_rpc/pwpb/client_reader_writer.h"
#include "pw_rpc_pwpb_private/internal_test_utils.h"
#include "pw_rpc_test_protos/test.pwpb.h"
+#include "pw_unit_test/framework.h"
PW_MODIFY_DIAGNOSTICS_PUSH();
PW_MODIFY_DIAGNOSTIC(ignored, "-Wmissing-field-initializers");
diff --git a/pw_rpc/pwpb/client_integration_test.cc b/pw_rpc/pwpb/client_integration_test.cc
index af5bfc745..ec3a111af 100644
--- a/pw_rpc/pwpb/client_integration_test.cc
+++ b/pw_rpc/pwpb/client_integration_test.cc
@@ -12,11 +12,11 @@
// License for the specific language governing permissions and limitations under
// the License.
-#include "gtest/gtest.h"
#include "pw_assert/check.h"
#include "pw_rpc/benchmark.rpc.pwpb.h"
#include "pw_rpc/integration_testing.h"
#include "pw_sync/binary_semaphore.h"
+#include "pw_unit_test/framework.h"
namespace pwpb_rpc_test {
namespace {
diff --git a/pw_rpc/pwpb/client_reader_writer_test.cc b/pw_rpc/pwpb/client_reader_writer_test.cc
index f22313d82..11184a612 100644
--- a/pw_rpc/pwpb/client_reader_writer_test.cc
+++ b/pw_rpc/pwpb/client_reader_writer_test.cc
@@ -16,9 +16,9 @@
#include <optional>
-#include "gtest/gtest.h"
#include "pw_rpc/pwpb/client_testing.h"
#include "pw_rpc_test_protos/test.rpc.pwpb.h"
+#include "pw_unit_test/framework.h"
PW_MODIFY_DIAGNOSTICS_PUSH();
PW_MODIFY_DIAGNOSTIC(ignored, "-Wmissing-field-initializers");
diff --git a/pw_rpc/pwpb/client_server_context_test.cc b/pw_rpc/pwpb/client_server_context_test.cc
index 184bc3948..c22c99906 100644
--- a/pw_rpc/pwpb/client_server_context_test.cc
+++ b/pw_rpc/pwpb/client_server_context_test.cc
@@ -15,10 +15,10 @@
#include <mutex>
#include <utility>
-#include "gtest/gtest.h"
#include "pw_rpc/pwpb/client_server_testing.h"
#include "pw_rpc_test_protos/test.rpc.pwpb.h"
#include "pw_sync/mutex.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc {
namespace {
diff --git a/pw_rpc/pwpb/client_server_context_threaded_test.cc b/pw_rpc/pwpb/client_server_context_threaded_test.cc
index d9d6f7f65..269ea400b 100644
--- a/pw_rpc/pwpb/client_server_context_threaded_test.cc
+++ b/pw_rpc/pwpb/client_server_context_threaded_test.cc
@@ -15,12 +15,12 @@
#include <atomic>
#include <iostream>
-#include "gtest/gtest.h"
#include "pw_function/function.h"
#include "pw_rpc/pwpb/client_server_testing_threaded.h"
#include "pw_rpc_test_protos/test.rpc.pwpb.h"
#include "pw_sync/binary_semaphore.h"
#include "pw_thread/non_portable_test_thread_options.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc {
namespace {
diff --git a/pw_rpc/pwpb/codegen_test.cc b/pw_rpc/pwpb/codegen_test.cc
index cd5b29b19..9c91980b3 100644
--- a/pw_rpc/pwpb/codegen_test.cc
+++ b/pw_rpc/pwpb/codegen_test.cc
@@ -12,13 +12,13 @@
// License for the specific language governing permissions and limitations under
// the License.
-#include "gtest/gtest.h"
#include "pw_preprocessor/compiler.h"
#include "pw_rpc/internal/hash.h"
#include "pw_rpc/internal/test_utils.h"
#include "pw_rpc/pwpb/test_method_context.h"
#include "pw_rpc_pwpb_private/internal_test_utils.h"
#include "pw_rpc_test_protos/test.rpc.pwpb.h"
+#include "pw_unit_test/framework.h"
PW_MODIFY_DIAGNOSTICS_PUSH();
PW_MODIFY_DIAGNOSTIC(ignored, "-Wmissing-field-initializers");
diff --git a/pw_rpc/pwpb/echo_service_test.cc b/pw_rpc/pwpb/echo_service_test.cc
index 80651c65c..3e1b0580b 100644
--- a/pw_rpc/pwpb/echo_service_test.cc
+++ b/pw_rpc/pwpb/echo_service_test.cc
@@ -14,9 +14,9 @@
#include <string_view>
-#include "gtest/gtest.h"
#include "pw_rpc/echo_service_pwpb.h"
#include "pw_rpc/pwpb/test_method_context.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc {
namespace {
diff --git a/pw_rpc/pwpb/fake_channel_output_test.cc b/pw_rpc/pwpb/fake_channel_output_test.cc
index 039092b9f..410123fd2 100644
--- a/pw_rpc/pwpb/fake_channel_output_test.cc
+++ b/pw_rpc/pwpb/fake_channel_output_test.cc
@@ -18,10 +18,10 @@
#include <cstddef>
#include <memory>
-#include "gtest/gtest.h"
#include "pw_rpc/internal/channel.h"
#include "pw_rpc/internal/packet.h"
#include "pw_rpc_test_protos/test.rpc.pwpb.h"
+#include "pw_unit_test/framework.h"
PW_MODIFY_DIAGNOSTICS_PUSH();
PW_MODIFY_DIAGNOSTIC(ignored, "-Wmissing-field-initializers");
diff --git a/pw_rpc/pwpb/method_info_test.cc b/pw_rpc/pwpb/method_info_test.cc
index 8102be561..0379e5f74 100644
--- a/pw_rpc/pwpb/method_info_test.cc
+++ b/pw_rpc/pwpb/method_info_test.cc
@@ -14,10 +14,10 @@
#include "pw_rpc/internal/method_info.h"
-#include "gtest/gtest.h"
#include "pw_rpc/internal/method_info_tester.h"
#include "pw_rpc_test_protos/test.rpc.pwpb.h"
#include "pw_status/status.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc {
diff --git a/pw_rpc/pwpb/method_lookup_test.cc b/pw_rpc/pwpb/method_lookup_test.cc
index d0a578e6a..b33204350 100644
--- a/pw_rpc/pwpb/method_lookup_test.cc
+++ b/pw_rpc/pwpb/method_lookup_test.cc
@@ -12,10 +12,10 @@
// License for the specific language governing permissions and limitations under
// the License.
-#include "gtest/gtest.h"
#include "pw_rpc/pwpb/test_method_context.h"
#include "pw_rpc/raw/test_method_context.h"
#include "pw_rpc_test_protos/test.rpc.pwpb.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc {
namespace {
diff --git a/pw_rpc/pwpb/method_test.cc b/pw_rpc/pwpb/method_test.cc
index 76f728f59..395b10f81 100644
--- a/pw_rpc/pwpb/method_test.cc
+++ b/pw_rpc/pwpb/method_test.cc
@@ -16,7 +16,6 @@
#include <array>
-#include "gtest/gtest.h"
#include "pw_containers/algorithm.h"
#include "pw_rpc/internal/lock.h"
#include "pw_rpc/internal/method_impl_tester.h"
@@ -25,6 +24,7 @@
#include "pw_rpc/service.h"
#include "pw_rpc_pwpb_private/internal_test_utils.h"
#include "pw_rpc_test_protos/test.pwpb.h"
+#include "pw_unit_test/framework.h"
PW_MODIFY_DIAGNOSTICS_PUSH();
PW_MODIFY_DIAGNOSTIC(ignored, "-Wmissing-field-initializers");
diff --git a/pw_rpc/pwpb/method_union_test.cc b/pw_rpc/pwpb/method_union_test.cc
index f474f15e1..875423d9f 100644
--- a/pw_rpc/pwpb/method_union_test.cc
+++ b/pw_rpc/pwpb/method_union_test.cc
@@ -17,11 +17,11 @@
#include <array>
-#include "gtest/gtest.h"
#include "pw_rpc/internal/test_utils.h"
#include "pw_rpc/service.h"
#include "pw_rpc_pwpb_private/internal_test_utils.h"
#include "pw_rpc_test_protos/test.pwpb.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc::internal {
namespace {
diff --git a/pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h b/pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h
index d47582165..d1fc8d9d3 100644
--- a/pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h
+++ b/pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h
@@ -90,7 +90,7 @@ class PwpbUnaryResponseClientCall : public UnaryResponseClientCall {
protected:
// Derived classes allow default construction so that users can declare a
// variable into which to move client reader/writers from RPC calls.
- constexpr PwpbUnaryResponseClientCall() = default;
+ constexpr PwpbUnaryResponseClientCall() : serde_(nullptr) {}
PwpbUnaryResponseClientCall(LockedEndpoint& client,
uint32_t channel_id,
@@ -252,7 +252,7 @@ class PwpbStreamResponseClientCall : public StreamResponseClientCall {
protected:
// Derived classes allow default construction so that users can declare a
// variable into which to move client reader/writers from RPC calls.
- constexpr PwpbStreamResponseClientCall() = default;
+ constexpr PwpbStreamResponseClientCall() : serde_(nullptr) {}
PwpbStreamResponseClientCall(LockedEndpoint& client,
uint32_t channel_id,
diff --git a/pw_rpc/pwpb/public/pw_rpc/pwpb/internal/method.h b/pw_rpc/pwpb/public/pw_rpc/pwpb/internal/method.h
index a8a7b9903..12ad47619 100644
--- a/pw_rpc/pwpb/public/pw_rpc/pwpb/internal/method.h
+++ b/pw_rpc/pwpb/public/pw_rpc/pwpb/internal/method.h
@@ -92,6 +92,7 @@ class PwpbMethod : public Method {
return PwpbMethod(
id,
SynchronousUnaryInvoker<Request<kMethod>, Response<kMethod>>,
+ MethodType::kUnary,
Function{.synchronous_unary = wrapper},
serde);
}
@@ -120,6 +121,7 @@ class PwpbMethod : public Method {
};
return PwpbMethod(id,
AsynchronousUnaryInvoker<Request<kMethod>>,
+ MethodType::kUnary,
Function{.unary_request = wrapper},
serde);
}
@@ -146,6 +148,7 @@ class PwpbMethod : public Method {
};
return PwpbMethod(id,
ServerStreamingInvoker<Request<kMethod>>,
+ MethodType::kServerStreaming,
Function{.unary_request = wrapper},
serde);
}
@@ -171,6 +174,7 @@ class PwpbMethod : public Method {
};
return PwpbMethod(id,
ClientStreamingInvoker<Request<kMethod>>,
+ MethodType::kClientStreaming,
Function{.stream_request = wrapper},
serde);
}
@@ -196,13 +200,18 @@ class PwpbMethod : public Method {
};
return PwpbMethod(id,
BidirectionalStreamingInvoker<Request<kMethod>>,
+ MethodType::kBidirectionalStreaming,
Function{.stream_request = wrapper},
serde);
}
// Represents an invalid method. Used to reduce error message verbosity.
static constexpr PwpbMethod Invalid() {
- return {0, InvalidInvoker, {}, PwpbMethodSerde(nullptr, nullptr)};
+ return {0,
+ InvalidInvoker,
+ MethodType::kUnary,
+ {},
+ PwpbMethodSerde(nullptr, nullptr)};
}
// Give access to the serializer/deserializer object for converting requests
@@ -236,9 +245,10 @@ class PwpbMethod : public Method {
constexpr PwpbMethod(uint32_t id,
Invoker invoker,
+ MethodType type,
Function function,
const PwpbMethodSerde& serde)
- : Method(id, invoker), function_(function), serde_(serde) {}
+ : Method(id, invoker, type), function_(function), serde_(serde) {}
template <typename Request, typename Response>
void CallSynchronousUnary(const CallContext& context,
diff --git a/pw_rpc/pwpb/serde_test.cc b/pw_rpc/pwpb/serde_test.cc
index d9da19749..cc4e31c39 100644
--- a/pw_rpc/pwpb/serde_test.cc
+++ b/pw_rpc/pwpb/serde_test.cc
@@ -12,11 +12,11 @@
// License for the specific language governing permissions and limitations under
// the License.
-#include "gtest/gtest.h"
#include "pw_rpc/pwpb/internal/common.h"
#include "pw_rpc_test_protos/test.pwpb.h"
#include "pw_span/span.h"
#include "pw_status/status_with_size.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc::internal {
namespace {
diff --git a/pw_rpc/pwpb/server_callback_test.cc b/pw_rpc/pwpb/server_callback_test.cc
index 780279a3d..2a174f39e 100644
--- a/pw_rpc/pwpb/server_callback_test.cc
+++ b/pw_rpc/pwpb/server_callback_test.cc
@@ -14,11 +14,11 @@
#include <array>
-#include "gtest/gtest.h"
#include "pw_containers/vector.h"
#include "pw_rpc/pwpb/test_method_context.h"
#include "pw_rpc/service.h"
#include "pw_rpc_test_protos/test.rpc.pwpb.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc {
namespace {
diff --git a/pw_rpc/pwpb/server_reader_writer_test.cc b/pw_rpc/pwpb/server_reader_writer_test.cc
index 07b96c826..80cd0e58d 100644
--- a/pw_rpc/pwpb/server_reader_writer_test.cc
+++ b/pw_rpc/pwpb/server_reader_writer_test.cc
@@ -16,11 +16,11 @@
#include <optional>
-#include "gtest/gtest.h"
#include "pw_rpc/pwpb/fake_channel_output.h"
#include "pw_rpc/pwpb/test_method_context.h"
#include "pw_rpc/service.h"
#include "pw_rpc_test_protos/test.rpc.pwpb.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc {
namespace {
diff --git a/pw_rpc/pwpb/stub_generation_test.cc b/pw_rpc/pwpb/stub_generation_test.cc
index cf1919ff8..32894b5d7 100644
--- a/pw_rpc/pwpb/stub_generation_test.cc
+++ b/pw_rpc/pwpb/stub_generation_test.cc
@@ -16,8 +16,8 @@
// so that the generated stubs can be tested.
#define _PW_RPC_COMPILE_GENERATED_SERVICE_STUBS
-#include "gtest/gtest.h"
#include "pw_rpc_test_protos/test.rpc.pwpb.h"
+#include "pw_unit_test/framework.h"
namespace {
diff --git a/pw_rpc/pwpb/synchronous_call_test.cc b/pw_rpc/pwpb/synchronous_call_test.cc
index b5f18e658..e657b667a 100644
--- a/pw_rpc/pwpb/synchronous_call_test.cc
+++ b/pw_rpc/pwpb/synchronous_call_test.cc
@@ -16,7 +16,6 @@
#include <chrono>
-#include "gtest/gtest.h"
#include "pw_chrono/system_clock.h"
#include "pw_rpc/channel.h"
#include "pw_rpc/internal/packet.h"
@@ -26,6 +25,7 @@
#include "pw_status/status.h"
#include "pw_status/status_with_size.h"
#include "pw_thread/thread.h"
+#include "pw_unit_test/framework.h"
#include "pw_work_queue/test_thread.h"
#include "pw_work_queue/work_queue.h"
diff --git a/pw_rpc/py/BUILD.bazel b/pw_rpc/py/BUILD.bazel
index ed78e3065..fb8f59cad 100644
--- a/pw_rpc/py/BUILD.bazel
+++ b/pw_rpc/py/BUILD.bazel
@@ -12,7 +12,7 @@
# License for the specific language governing permissions and limitations under
# the License.
-load("@rules_python//python:defs.bzl", "py_binary")
+load("@rules_python//python:defs.bzl", "py_binary", "py_library", "py_test")
package(default_visibility = ["//visibility:public"])
diff --git a/pw_rpc/py/pw_rpc/client.py b/pw_rpc/py/pw_rpc/client.py
index df7413751..068d698af 100644
--- a/pw_rpc/py/pw_rpc/client.py
+++ b/pw_rpc/py/pw_rpc/client.py
@@ -37,6 +37,7 @@ _LOG = logging.getLogger(__package__)
# Calls with ID of `kOpenCallId` were unrequested, and are updated to have the
# call ID of the first matching request.
+LEGACY_OPEN_CALL_ID: int = 0
OPEN_CALL_ID: int = (2**32) - 1
_MAX_CALL_ID: int = 1 << 14
@@ -82,11 +83,15 @@ class PendingRpcs:
def __init__(self) -> None:
self._pending: Dict[PendingRpc, _PendingRpcMetadata] = {}
- self._next_call_id: int = 0
+ # We skip call_id = 0 in order to avoid LEGACY_OPEN_CALL_ID.
+ self._next_call_id: int = 1
def allocate_call_id(self) -> int:
call_id = self._next_call_id
self._next_call_id = (self._next_call_id + 1) % _MAX_CALL_ID
+ # We skip call_id = 0 in order to avoid LEGACY_OPEN_CALL_ID.
+ if self._next_call_id == 0:
+ self._next_call_id = 1
return call_id
def request(
@@ -205,7 +210,7 @@ class PendingRpcs:
def get_pending(self, rpc: PendingRpc, status: Optional[Status]):
"""Gets the pending RPC's context. If status is set, clears the RPC."""
- if rpc.call_id == OPEN_CALL_ID:
+ if rpc.call_id == OPEN_CALL_ID or rpc.call_id == LEGACY_OPEN_CALL_ID:
# Calls with ID `OPEN_CALL_ID` were unrequested, and are updated to
# have the call ID of the first matching request.
for pending in self._pending:
diff --git a/pw_rpc/py/pw_rpc/descriptors.py b/pw_rpc/py/pw_rpc/descriptors.py
index 728061e88..9a1bb3cfc 100644
--- a/pw_rpc/py/pw_rpc/descriptors.py
+++ b/pw_rpc/py/pw_rpc/descriptors.py
@@ -194,9 +194,7 @@ _PROTO_FIELD_TYPES = {
def _field_type_annotation(field: FieldDescriptor):
"""Creates a field type annotation to use in the help message only."""
if field.type == FieldDescriptor.TYPE_MESSAGE:
- annotation = message_factory.MessageFactory(
- field.message_type.file.pool
- ).GetPrototype(field.message_type)
+ annotation = message_factory.GetMessageClass(field.message_type)
else:
annotation = _PROTO_FIELD_TYPES.get(field.type, Parameter.empty)
@@ -223,18 +221,6 @@ def field_help(proto_message, *, annotations: bool = False) -> Iterator[str]:
yield f'{field.name}={value}'
-def _message_is_type(proto, expected_type) -> bool:
- """Returns true if the protobuf instance is the expected type."""
- # Getting protobuf classes from google.protobuf.message_factory may create a
- # new, unique generated proto class. Any generated classes for a particular
- # proto message share the same MessageDescriptor instance and are
- # interchangeable, so check the descriptors in addition to the types.
- return isinstance(proto, expected_type) or (
- isinstance(proto, Message)
- and proto.DESCRIPTOR is expected_type.DESCRIPTOR
- )
-
-
@dataclass(frozen=True, eq=False)
class Method:
"""Describes a method in a service."""
@@ -248,20 +234,16 @@ class Method:
response_type: Any
@classmethod
- def from_descriptor(cls, descriptor: MethodDescriptor, service: Service):
- input_factory = message_factory.MessageFactory(
- descriptor.input_type.file.pool
- )
- output_factory = message_factory.MessageFactory(
- descriptor.output_type.file.pool
- )
+ def from_descriptor(
+ cls, descriptor: MethodDescriptor, service: Service
+ ) -> 'Method':
return Method(
descriptor,
service,
ids.calculate(descriptor.name),
*_streaming_attributes(descriptor),
- input_factory.GetPrototype(descriptor.input_type),
- output_factory.GetPrototype(descriptor.output_type),
+ message_factory.GetMessageClass(descriptor.input_type),
+ message_factory.GetMessageClass(descriptor.output_type),
)
class Type(enum.Enum):
@@ -323,7 +305,7 @@ class Method:
if proto is None:
return self.request_type(**proto_kwargs)
- if not _message_is_type(proto, self.request_type):
+ if not isinstance(proto, self.request_type):
try:
bad_type = proto.DESCRIPTOR.full_name
except AttributeError:
diff --git a/pw_rpc/py/tests/descriptors_test.py b/pw_rpc/py/tests/descriptors_test.py
index b328559a8..9b2352fe6 100644
--- a/pw_rpc/py/tests/descriptors_test.py
+++ b/pw_rpc/py/tests/descriptors_test.py
@@ -15,7 +15,7 @@
import unittest
-from google.protobuf.message_factory import MessageFactory
+from google.protobuf.message_factory import GetMessageClass
from pw_protobuf_compiler import python_protos
from pw_rpc import descriptors
@@ -80,15 +80,12 @@ class MethodTest(unittest.TestCase):
self._method.get_request(msg, {})
def test_get_request_with_different_copy_of_same_message_class(self):
- some_message_clone = MessageFactory(
- self._method.request_type.DESCRIPTOR.file.pool
- ).GetPrototype(self._method.request_type.DESCRIPTOR)
-
+ some_message_clone = GetMessageClass(
+ self._method.request_type.DESCRIPTOR
+ )
msg = some_message_clone()
- # Protobuf classes obtained with a MessageFactory may or may not be a
- # unique type, but will always use the same descriptor instance.
- self.assertIsInstance(msg, some_message_clone)
+ self.assertIsInstance(msg, self._method.request_type)
self.assertIs(msg.DESCRIPTOR, self._method.request_type.DESCRIPTOR)
result = self._method.get_request(msg, {})
diff --git a/pw_rpc/py/tests/ids_test.py b/pw_rpc/py/tests/ids_test.py
index fed5b787d..88a2668b5 100755
--- a/pw_rpc/py/tests/ids_test.py
+++ b/pw_rpc/py/tests/ids_test.py
@@ -48,7 +48,7 @@ IdsTest = _TESTS.python_tests('IdsTest', _define_py_test)
_CC_HEADER = """\
#include <string_view>
-#include "gtest/gtest.h"
+#include "pw_unit_test/framework.h"
#include "pw_rpc/internal/hash.h"
namespace pw::rpc::internal {
diff --git a/pw_rpc/raw/BUILD.bazel b/pw_rpc/raw/BUILD.bazel
index 7d9033296..bdb115484 100644
--- a/pw_rpc/raw/BUILD.bazel
+++ b/pw_rpc/raw/BUILD.bazel
@@ -14,7 +14,6 @@
load(
"//pw_build:pigweed.bzl",
- "pw_cc_library",
"pw_cc_test",
)
@@ -22,7 +21,7 @@ package(default_visibility = ["//visibility:public"])
licenses(["notice"])
-pw_cc_library(
+cc_library(
name = "server_api",
srcs = [
"method.cc",
@@ -41,7 +40,7 @@ pw_cc_library(
],
)
-pw_cc_library(
+cc_library(
name = "client_api",
hdrs = ["public/pw_rpc/raw/client_reader_writer.h"],
includes = ["public"],
@@ -52,7 +51,7 @@ pw_cc_library(
],
)
-pw_cc_library(
+cc_library(
name = "fake_channel_output",
hdrs = ["public/pw_rpc/raw/fake_channel_output.h"],
includes = ["public"],
@@ -63,7 +62,7 @@ pw_cc_library(
],
)
-pw_cc_library(
+cc_library(
name = "test_method_context",
hdrs = ["public/pw_rpc/raw/test_method_context.h"],
includes = ["public"],
@@ -78,7 +77,7 @@ pw_cc_library(
],
)
-pw_cc_library(
+cc_library(
name = "client_testing",
srcs = ["client_testing.cc"],
hdrs = ["public/pw_rpc/raw/client_testing.h"],
diff --git a/pw_rpc/raw/BUILD.gn b/pw_rpc/raw/BUILD.gn
index ca43d6f1c..4afcd48ae 100644
--- a/pw_rpc/raw/BUILD.gn
+++ b/pw_rpc/raw/BUILD.gn
@@ -100,6 +100,12 @@ pw_test("codegen_test") {
dir_pw_protobuf,
]
sources = [ "codegen_test.cc" ]
+
+ # TODO: https://pwbug.dev/325509758 - Doesn't work on the Pico yet; hangs
+ # indefinitely.
+ if (pw_build_EXECUTABLE_TARGET_TYPE == "pico_executable") {
+ enable_if = false
+ }
}
pw_test("client_test") {
@@ -109,6 +115,12 @@ pw_test("client_test") {
"..:test_utils",
]
sources = [ "client_test.cc" ]
+
+ # TODO: https://pwbug.dev/325509758 - Doesn't work on the Pico yet; hangs
+ # indefinitely.
+ if (pw_build_EXECUTABLE_TARGET_TYPE == "pico_executable") {
+ enable_if = false
+ }
}
pw_test("client_reader_writer_test") {
@@ -118,6 +130,12 @@ pw_test("client_reader_writer_test") {
"..:test_protos.raw_rpc",
]
sources = [ "client_reader_writer_test.cc" ]
+
+ # TODO: https://pwbug.dev/325509758 - Doesn't work on the Pico yet; hangs
+ # indefinitely.
+ if (pw_build_EXECUTABLE_TARGET_TYPE == "pico_executable") {
+ enable_if = false
+ }
}
pw_test("method_test") {
@@ -131,6 +149,12 @@ pw_test("method_test") {
dir_pw_protobuf,
]
sources = [ "method_test.cc" ]
+
+ # TODO: https://pwbug.dev/325509758 - Doesn't work on the Pico yet; hangs
+ # indefinitely.
+ if (pw_build_EXECUTABLE_TARGET_TYPE == "pico_executable") {
+ enable_if = false
+ }
}
pw_test("method_info_test") {
@@ -150,6 +174,12 @@ pw_test("method_union_test") {
dir_pw_protobuf,
]
sources = [ "method_union_test.cc" ]
+
+ # TODO: https://pwbug.dev/325509758 - Doesn't work on the Pico yet; hangs
+ # indefinitely.
+ if (pw_build_EXECUTABLE_TARGET_TYPE == "pico_executable") {
+ enable_if = false
+ }
}
pw_test("server_reader_writer_test") {
@@ -159,6 +189,12 @@ pw_test("server_reader_writer_test") {
"..:test_protos.raw_rpc",
]
sources = [ "server_reader_writer_test.cc" ]
+
+ # TODO: https://pwbug.dev/325509758 - Doesn't work on the Pico yet; hangs
+ # indefinitely.
+ if (pw_build_EXECUTABLE_TARGET_TYPE == "pico_executable") {
+ enable_if = false
+ }
}
pw_test("stub_generation_test") {
@@ -178,6 +214,12 @@ pw_test("synchronous_call_test") {
]
sources = [ "synchronous_call_test.cc" ]
enable_if = pw_sync_TIMED_THREAD_NOTIFICATION_BACKEND != ""
+
+ # TODO: https://pwbug.dev/325509758 - Doesn't work on the Pico yet; hangs
+ # indefinitely.
+ if (pw_build_EXECUTABLE_TARGET_TYPE == "pico_executable") {
+ enable_if = false
+ }
}
pw_cc_negative_compilation_test("service_nc_test") {
diff --git a/pw_rpc/raw/client_reader_writer_test.cc b/pw_rpc/raw/client_reader_writer_test.cc
index c8dd07aa2..24ba2c35a 100644
--- a/pw_rpc/raw/client_reader_writer_test.cc
+++ b/pw_rpc/raw/client_reader_writer_test.cc
@@ -16,10 +16,10 @@
#include <optional>
-#include "gtest/gtest.h"
#include "pw_rpc/raw/client_testing.h"
#include "pw_rpc/writer.h"
#include "pw_rpc_test_protos/test.raw_rpc.pb.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc {
namespace {
@@ -441,7 +441,7 @@ TEST(RawClientWriter, UsableAsWriter) {
RawClientWriter call = TestService::TestClientStreamRpc(
ctx.client(), ctx.channel().id(), FailIfOnCompletedCalled, FailIfCalled);
- WriteAsWriter(call);
+ WriteAsWriter(call.as_writer());
EXPECT_STREQ(reinterpret_cast<const char*>(
ctx.output()
@@ -460,7 +460,7 @@ TEST(RawClientReaderWriter, UsableAsWriter) {
FailIfCalled,
FailIfCalled);
- WriteAsWriter(call);
+ WriteAsWriter(call.as_writer());
EXPECT_STREQ(reinterpret_cast<const char*>(
ctx.output()
diff --git a/pw_rpc/raw/client_test.cc b/pw_rpc/raw/client_test.cc
index b78fa2307..2606097ba 100644
--- a/pw_rpc/raw/client_test.cc
+++ b/pw_rpc/raw/client_test.cc
@@ -16,11 +16,11 @@
#include <optional>
-#include "gtest/gtest.h"
#include "pw_rpc/internal/client_call.h"
#include "pw_rpc/internal/packet.h"
#include "pw_rpc/raw/client_reader_writer.h"
#include "pw_rpc/raw/client_testing.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc {
@@ -157,12 +157,14 @@ TEST(Client, ProcessPacket_UnassignedChannelId_ReturnsDataLoss) {
auto call_cts = StartStreamCall<BidirectionalStreamMethod>(context);
std::byte encoded[64];
+ uint32_t arbitrary_call_id = 24602;
Result<span<const std::byte>> result =
internal::Packet(
internal::pwpb::PacketType::kResponse,
Channel::kUnassignedChannelId,
internal::MethodInfo<BidirectionalStreamMethod>::kServiceId,
- internal::MethodInfo<BidirectionalStreamMethod>::kMethodId)
+ internal::MethodInfo<BidirectionalStreamMethod>::kMethodId,
+ arbitrary_call_id)
.Encode(encoded);
ASSERT_TRUE(result.ok());
@@ -210,7 +212,7 @@ TEST(Client, ProcessPacket_ReturnsInvalidArgumentOnServerPacket) {
std::byte encoded[64];
Result<span<const std::byte>> result =
- internal::Packet(internal::pwpb::PacketType::REQUEST, 1, 2, 3)
+ internal::Packet(internal::pwpb::PacketType::REQUEST, 1, 2, 3, 4)
.Encode(encoded);
ASSERT_TRUE(result.ok());
diff --git a/pw_rpc/raw/codegen_test.cc b/pw_rpc/raw/codegen_test.cc
index 072f87139..1c8981b65 100644
--- a/pw_rpc/raw/codegen_test.cc
+++ b/pw_rpc/raw/codegen_test.cc
@@ -14,7 +14,6 @@
#include <optional>
-#include "gtest/gtest.h"
#include "pw_protobuf/decoder.h"
#include "pw_rpc/internal/hash.h"
#include "pw_rpc/raw/client_testing.h"
@@ -22,6 +21,7 @@
#include "pw_rpc_test_protos/no_package.raw_rpc.pb.h"
#include "pw_rpc_test_protos/test.pwpb.h"
#include "pw_rpc_test_protos/test.raw_rpc.pb.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc {
namespace {
diff --git a/pw_rpc/raw/method_info_test.cc b/pw_rpc/raw/method_info_test.cc
index 80817de84..08c2dd527 100644
--- a/pw_rpc/raw/method_info_test.cc
+++ b/pw_rpc/raw/method_info_test.cc
@@ -14,9 +14,9 @@
#include "pw_rpc/internal/method_info.h"
-#include "gtest/gtest.h"
#include "pw_rpc/internal/method_info_tester.h"
#include "pw_rpc_test_protos/test.raw_rpc.pb.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc::internal {
namespace {
diff --git a/pw_rpc/raw/method_test.cc b/pw_rpc/raw/method_test.cc
index f2facba54..239915116 100644
--- a/pw_rpc/raw/method_test.cc
+++ b/pw_rpc/raw/method_test.cc
@@ -16,7 +16,6 @@
#include <array>
-#include "gtest/gtest.h"
#include "pw_bytes/array.h"
#include "pw_containers/algorithm.h"
#include "pw_protobuf/decoder.h"
@@ -27,6 +26,7 @@
#include "pw_rpc/raw/internal/method_union.h"
#include "pw_rpc/service.h"
#include "pw_rpc_test_protos/test.pwpb.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc::internal {
namespace {
diff --git a/pw_rpc/raw/method_union_test.cc b/pw_rpc/raw/method_union_test.cc
index 12428dcbb..c47b89dbc 100644
--- a/pw_rpc/raw/method_union_test.cc
+++ b/pw_rpc/raw/method_union_test.cc
@@ -16,13 +16,13 @@
#include <array>
-#include "gtest/gtest.h"
#include "pw_bytes/array.h"
#include "pw_protobuf/decoder.h"
#include "pw_protobuf/encoder.h"
#include "pw_rpc/internal/test_utils.h"
#include "pw_rpc/service.h"
#include "pw_rpc_test_protos/test.pwpb.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc::internal {
namespace {
diff --git a/pw_rpc/raw/public/pw_rpc/raw/client_reader_writer.h b/pw_rpc/raw/public/pw_rpc/raw/client_reader_writer.h
index e4ae19f3f..80ca890b5 100644
--- a/pw_rpc/raw/public/pw_rpc/raw/client_reader_writer.h
+++ b/pw_rpc/raw/public/pw_rpc/raw/client_reader_writer.h
@@ -63,8 +63,7 @@ class RawClientReaderWriter : private internal::StreamResponseClientCall {
using internal::ClientCall::Abandon;
// Allow use as a generic RPC Writer.
- using internal::Call::operator Writer&;
- using internal::Call::operator const Writer&;
+ using internal::Call::as_writer;
private:
friend class internal::StreamResponseClientCall;
@@ -140,8 +139,7 @@ class RawClientWriter : private internal::UnaryResponseClientCall {
using internal::ClientCall::Abandon;
// Allow use as a generic RPC Writer.
- using internal::Call::operator Writer&;
- using internal::Call::operator const Writer&;
+ using internal::Call::as_writer;
private:
friend class internal::UnaryResponseClientCall;
diff --git a/pw_rpc/raw/public/pw_rpc/raw/internal/method.h b/pw_rpc/raw/public/pw_rpc/raw/internal/method.h
index 0d02d4a1c..81af41305 100644
--- a/pw_rpc/raw/public/pw_rpc/raw/internal/method.h
+++ b/pw_rpc/raw/public/pw_rpc/raw/internal/method.h
@@ -42,7 +42,7 @@ class RawMethod : public Method {
"Use an asynchronous unary method instead: "
"void MethodName(pw::ConstByteSpan request, "
"pw::rpc::RawUnaryResponder& responder)");
- return {id, InvalidInvoker, {}};
+ return {id, InvalidInvoker, MethodType::kUnary, {}};
}
template <auto kMethod>
@@ -51,8 +51,10 @@ class RawMethod : public Method {
[](Service& service, ConstByteSpan req, RawUnaryResponder& responder) {
return CallMethodImplFunction<kMethod>(service, req, responder);
};
- return RawMethod(
- id, AsynchronousUnaryInvoker, Function{.asynchronous_unary = wrapper});
+ return RawMethod(id,
+ AsynchronousUnaryInvoker,
+ MethodType::kUnary,
+ Function{.asynchronous_unary = wrapper});
}
template <auto kMethod>
@@ -61,8 +63,10 @@ class RawMethod : public Method {
[](Service& service, ConstByteSpan request, RawServerWriter& writer) {
return CallMethodImplFunction<kMethod>(service, request, writer);
};
- return RawMethod(
- id, ServerStreamingInvoker, Function{.server_streaming = wrapper});
+ return RawMethod(id,
+ ServerStreamingInvoker,
+ MethodType::kServerStreaming,
+ Function{.server_streaming = wrapper});
}
template <auto kMethod>
@@ -72,8 +76,10 @@ class RawMethod : public Method {
return CallMethodImplFunction<kMethod>(
service, static_cast<RawServerReader&>(reader));
};
- return RawMethod(
- id, ClientStreamingInvoker, Function{.stream_request = wrapper});
+ return RawMethod(id,
+ ClientStreamingInvoker,
+ MethodType::kClientStreaming,
+ Function{.stream_request = wrapper});
}
template <auto kMethod>
@@ -82,12 +88,16 @@ class RawMethod : public Method {
[](Service& service, RawServerReaderWriter& reader_writer) {
return CallMethodImplFunction<kMethod>(service, reader_writer);
};
- return RawMethod(
- id, BidirectionalStreamingInvoker, Function{.stream_request = wrapper});
+ return RawMethod(id,
+ BidirectionalStreamingInvoker,
+ MethodType::kBidirectionalStreaming,
+ Function{.stream_request = wrapper});
}
// Represents an invalid method. Used to reduce error message verbosity.
- static constexpr RawMethod Invalid() { return {0, InvalidInvoker, {}}; }
+ static constexpr RawMethod Invalid() {
+ return {0, InvalidInvoker, MethodType::kUnary, {}};
+ }
private:
// Wraps the user-defined functions.
@@ -112,8 +122,11 @@ class RawMethod : public Method {
StreamRequestFunction stream_request;
};
- constexpr RawMethod(uint32_t id, Invoker invoker, Function function)
- : Method(id, invoker), function_(function) {}
+ constexpr RawMethod(uint32_t id,
+ Invoker invoker,
+ MethodType type,
+ Function function)
+ : Method(id, invoker, type), function_(function) {}
static void AsynchronousUnaryInvoker(const CallContext& context,
const Packet& request)
diff --git a/pw_rpc/raw/public/pw_rpc/raw/server_reader_writer.h b/pw_rpc/raw/public/pw_rpc/raw/server_reader_writer.h
index 91a85e552..73c4eaa38 100644
--- a/pw_rpc/raw/public/pw_rpc/raw/server_reader_writer.h
+++ b/pw_rpc/raw/public/pw_rpc/raw/server_reader_writer.h
@@ -86,8 +86,7 @@ class RawServerReaderWriter : private internal::ServerCall {
}
// Allow use as a generic RPC Writer.
- using internal::Call::operator Writer&;
- using internal::Call::operator const Writer&;
+ using internal::Call::as_writer;
protected:
RawServerReaderWriter(const internal::LockedCallContext& context,
@@ -198,8 +197,7 @@ class RawServerWriter : private RawServerReaderWriter {
using RawServerReaderWriter::Write;
// Allow use as a generic RPC Writer.
- using internal::Call::operator Writer&;
- using internal::Call::operator const Writer&;
+ using internal::Call::as_writer;
private:
friend class internal::RawMethod;
diff --git a/pw_rpc/raw/server_reader_writer_test.cc b/pw_rpc/raw/server_reader_writer_test.cc
index cadc3d81f..7b58b976a 100644
--- a/pw_rpc/raw/server_reader_writer_test.cc
+++ b/pw_rpc/raw/server_reader_writer_test.cc
@@ -16,12 +16,12 @@
#include <optional>
-#include "gtest/gtest.h"
#include "pw_rpc/internal/lock.h"
#include "pw_rpc/raw/fake_channel_output.h"
#include "pw_rpc/service.h"
#include "pw_rpc/writer.h"
#include "pw_rpc_test_protos/test.raw_rpc.pb.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc {
@@ -407,7 +407,7 @@ TEST(RawServerWriter, UsableAsWriter) {
RawServerWriter::Open<TestService::TestServerStreamRpc>(
ctx.server, ctx.channel.id(), ctx.service);
- WriteAsWriter(call);
+ WriteAsWriter(call.as_writer());
EXPECT_STREQ(reinterpret_cast<const char*>(
ctx.output.payloads<TestService::TestServerStreamRpc>()
@@ -422,7 +422,7 @@ TEST(RawServerReaderWriter, UsableAsWriter) {
RawServerReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
ctx.server, ctx.channel.id(), ctx.service);
- WriteAsWriter(call);
+ WriteAsWriter(call.as_writer());
EXPECT_STREQ(
reinterpret_cast<const char*>(
diff --git a/pw_rpc/raw/stub_generation_test.cc b/pw_rpc/raw/stub_generation_test.cc
index dc67a02ac..61732f414 100644
--- a/pw_rpc/raw/stub_generation_test.cc
+++ b/pw_rpc/raw/stub_generation_test.cc
@@ -16,8 +16,8 @@
// so that the generated stubs can be tested.
#define _PW_RPC_COMPILE_GENERATED_SERVICE_STUBS
-#include "gtest/gtest.h"
#include "pw_rpc_test_protos/test.raw_rpc.pb.h"
+#include "pw_unit_test/framework.h"
namespace {
diff --git a/pw_rpc/raw/synchronous_call_test.cc b/pw_rpc/raw/synchronous_call_test.cc
index 8e00631a2..8b8f8a666 100644
--- a/pw_rpc/raw/synchronous_call_test.cc
+++ b/pw_rpc/raw/synchronous_call_test.cc
@@ -17,7 +17,6 @@
#include <chrono>
#include <string_view>
-#include "gtest/gtest.h"
#include "pw_assert/check.h"
#include "pw_chrono/system_clock.h"
#include "pw_rpc/channel.h"
@@ -27,6 +26,7 @@
#include "pw_status/status.h"
#include "pw_status/status_with_size.h"
#include "pw_thread/thread.h"
+#include "pw_unit_test/framework.h"
#include "pw_work_queue/test_thread.h"
#include "pw_work_queue/work_queue.h"
diff --git a/pw_rpc/server.cc b/pw_rpc/server.cc
index 8822a0fd0..1ec470dcd 100644
--- a/pw_rpc/server.cc
+++ b/pw_rpc/server.cc
@@ -54,7 +54,7 @@ Status Server::ProcessPacket(ConstByteSpan packet_data) {
return Status::Unavailable();
}
- const auto [service, method] = FindMethod(packet);
+ const auto [service, method] = FindMethodLocked(packet);
if (method == nullptr) {
// Don't send responses to errors to avoid infinite error cycles.
@@ -109,17 +109,22 @@ Status Server::ProcessPacket(ConstByteSpan packet_data) {
}
std::tuple<Service*, const internal::Method*> Server::FindMethod(
- const internal::Packet& packet) {
- // Packets always include service and method IDs.
+ uint32_t service_id, uint32_t method_id) {
+ internal::RpcLockGuard lock;
+ return FindMethodLocked(service_id, method_id);
+}
+
+std::tuple<Service*, const internal::Method*> Server::FindMethodLocked(
+ uint32_t service_id, uint32_t method_id) {
auto service = std::find_if(services_.begin(), services_.end(), [&](auto& s) {
- return internal::UnwrapServiceId(s.service_id()) == packet.service_id();
+ return internal::UnwrapServiceId(s.service_id()) == service_id;
});
if (service == services_.end()) {
return {};
}
- return {&(*service), service->FindMethod(packet.method_id())};
+ return {&(*service), service->FindMethod(method_id)};
}
void Server::HandleCompletionRequest(
diff --git a/pw_rpc/server_test.cc b/pw_rpc/server_test.cc
index bbdb62421..48f264607 100644
--- a/pw_rpc/server_test.cc
+++ b/pw_rpc/server_test.cc
@@ -17,7 +17,6 @@
#include <array>
#include <cstdint>
-#include "gtest/gtest.h"
#include "pw_assert/check.h"
#include "pw_rpc/internal/call.h"
#include "pw_rpc/internal/method.h"
@@ -26,8 +25,18 @@
#include "pw_rpc/service.h"
#include "pw_rpc_private/fake_server_reader_writer.h"
#include "pw_rpc_private/test_method.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc {
+
+class ServerTestHelper {
+ public:
+ static std::tuple<Service*, const internal::Method*> FindMethod(
+ Server& server, uint32_t service_id, uint32_t method_id) {
+ return server.FindMethod(service_id, method_id);
+ }
+};
+
namespace {
using std::byte;
@@ -396,6 +405,31 @@ TEST_F(BasicServer, OpenChannel_AdditionalSlot) {
EXPECT_EQ(kExpected, server_.OpenChannel(19823, output_));
}
+TEST_F(BasicServer, FindMethod_FoundOkOptionallyCheckType) {
+ const auto [service, method] = ServerTestHelper::FindMethod(server_, 1, 100);
+ ASSERT_TRUE(service != nullptr);
+ ASSERT_TRUE(method != nullptr);
+#if PW_RPC_METHOD_STORES_TYPE
+ EXPECT_EQ(MethodType::kBidirectionalStreaming, method->type());
+#endif
+}
+
+TEST_F(BasicServer, FindMethod_NotFound) {
+ {
+ const auto [service, method] =
+ ServerTestHelper::FindMethod(server_, 2, 100);
+ ASSERT_TRUE(service == nullptr);
+ ASSERT_TRUE(method == nullptr);
+ }
+
+ {
+ const auto [service, method] =
+ ServerTestHelper::FindMethod(server_, 1, 101);
+ ASSERT_TRUE(service != nullptr);
+ ASSERT_TRUE(method == nullptr);
+ }
+}
+
class BidiMethod : public BasicServer {
protected:
BidiMethod() {
@@ -577,6 +611,20 @@ TEST_F(BidiMethod, ClientStream_CallsCallbackOnCallWithOpenId) {
EXPECT_STREQ(span_as_cstr(data), "hello");
}
+TEST_F(BidiMethod, ClientStream_CallsCallbackOnCallWithLegacyOpenId) {
+ ConstByteSpan data = as_bytes(span("?"));
+ responder_.set_on_next([&data](ConstByteSpan payload) { data = payload; });
+
+ ASSERT_EQ(OkStatus(),
+ server_.ProcessPacket(PacketForRpc(PacketType::CLIENT_STREAM,
+ {},
+ "hello",
+ internal::kLegacyOpenCallId)));
+
+ EXPECT_EQ(output_.total_packets(), 0u);
+ EXPECT_STREQ(span_as_cstr(data), "hello");
+}
+
TEST_F(BidiMethod, ClientStream_CallsOpenIdOnCallWithDifferentId) {
const uint32_t kSecondCallId = 1625;
internal::CallContext context(server_,
@@ -604,6 +652,33 @@ TEST_F(BidiMethod, ClientStream_CallsOpenIdOnCallWithDifferentId) {
EXPECT_EQ(responder_.as_server_call().id(), kSecondCallId);
}
+TEST_F(BidiMethod, ClientStream_CallsLegacyOpenIdOnCallWithDifferentId) {
+ const uint32_t kSecondCallId = 1625;
+ internal::CallContext context(server_,
+ channels_[0].id(),
+ service_42_,
+ service_42_.method(100),
+ internal::kLegacyOpenCallId);
+ internal::rpc_lock().lock();
+ auto temp_responder =
+ internal::test::FakeServerReaderWriter(context.ClaimLocked());
+ internal::rpc_lock().unlock();
+ responder_ = std::move(temp_responder);
+
+ ConstByteSpan data = as_bytes(span("?"));
+ responder_.set_on_next([&data](ConstByteSpan payload) { data = payload; });
+
+ ASSERT_EQ(OkStatus(),
+ server_.ProcessPacket(PacketForRpc(
+ PacketType::CLIENT_STREAM, {}, "hello", kSecondCallId)));
+
+ EXPECT_EQ(output_.total_packets(), 0u);
+ EXPECT_STREQ(span_as_cstr(data), "hello");
+
+ internal::RpcLockGuard lock;
+ EXPECT_EQ(responder_.as_server_call().id(), kSecondCallId);
+}
+
TEST_F(BidiMethod, UnregsiterService_AbortsActiveCalls) {
ASSERT_TRUE(responder_.active());
diff --git a/pw_rpc/service_test.cc b/pw_rpc/service_test.cc
index 174de725d..763fa1e2e 100644
--- a/pw_rpc/service_test.cc
+++ b/pw_rpc/service_test.cc
@@ -14,8 +14,8 @@
#include "pw_rpc/service.h"
-#include "gtest/gtest.h"
#include "pw_rpc/internal/method.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc {
@@ -33,7 +33,7 @@ void InvokeIt(const internal::CallContext&, const internal::Packet&) {}
class ServiceTestMethod : public internal::Method {
public:
constexpr ServiceTestMethod(uint32_t id, char the_value)
- : internal::Method(id, InvokeIt), value(the_value) {}
+ : internal::Method(id, InvokeIt, MethodType::kUnary), value(the_value) {}
char value; // Add a member so the class is larger than the base Method.
};
diff --git a/pw_rpc/system_server/BUILD.bazel b/pw_rpc/system_server/BUILD.bazel
index eca9534d0..c7996dfa7 100644
--- a/pw_rpc/system_server/BUILD.bazel
+++ b/pw_rpc/system_server/BUILD.bazel
@@ -12,16 +12,11 @@
# License for the specific language governing permissions and limitations under
# the License.
-load(
- "//pw_build:pigweed.bzl",
- "pw_cc_library",
-)
-
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
-pw_cc_library(
+cc_library(
name = "facade",
hdrs = ["public/pw_rpc_system_server/rpc_server.h"],
includes = ["public"],
@@ -32,7 +27,7 @@ pw_cc_library(
],
)
-pw_cc_library(
+cc_library(
name = "system_server",
hdrs = [
"public/pw_rpc_system_server/rpc_server.h",
@@ -48,7 +43,7 @@ pw_cc_library(
],
)
-pw_cc_library(
+cc_library(
name = "system_server_backend_multiplexer",
visibility = ["@pigweed//targets:__pkg__"],
deps = select({
diff --git a/pw_rpc/system_server/public/pw_rpc_system_server/socket.h b/pw_rpc/system_server/public/pw_rpc_system_server/socket.h
index dc34e9335..96b79ada5 100644
--- a/pw_rpc/system_server/public/pw_rpc_system_server/socket.h
+++ b/pw_rpc/system_server/public/pw_rpc_system_server/socket.h
@@ -20,8 +20,10 @@ namespace pw::rpc::system_server {
// Sets the port to use for pw::rpc::system_server backends that use sockets.
void set_socket_port(uint16_t port);
-// The file descriptor for the socket associated with the server. This may be
-// used to configure socket options.
-int GetServerSocketFd();
+// Configure options for the socket associated with the server.
+int SetServerSockOpt(int level,
+ int optname,
+ const void* optval,
+ unsigned int optlen);
} // namespace pw::rpc::system_server
diff --git a/pw_rpc/test_helpers_test.cc b/pw_rpc/test_helpers_test.cc
index 7b4b6e2a9..cc0860421 100644
--- a/pw_rpc/test_helpers_test.cc
+++ b/pw_rpc/test_helpers_test.cc
@@ -16,7 +16,6 @@
#include <mutex>
-#include "gtest/gtest.h"
#include "pw_chrono/system_clock.h"
#include "pw_containers/vector.h"
#include "pw_result/result.h"
@@ -28,6 +27,7 @@
#include "pw_sync/interrupt_spin_lock.h"
#include "pw_sync/lock_annotations.h"
#include "pw_sync/timed_thread_notification.h"
+#include "pw_unit_test/framework.h"
namespace pw::rpc::test {
namespace {
diff --git a/pw_rpc/ts/call.ts b/pw_rpc/ts/call.ts
index e418760c3..438fc6265 100644
--- a/pw_rpc/ts/call.ts
+++ b/pw_rpc/ts/call.ts
@@ -55,7 +55,8 @@ export class Call {
protected responses: Message[] = [];
private rpcs: PendingCalls;
- private rpc: Rpc;
+ rpc: Rpc;
+ readonly callId: number;
private onNext: Callback;
private onCompleted: Callback;
@@ -78,6 +79,7 @@ export class Call {
this.onNext = onNext;
this.onCompleted = onCompleted;
this.onError = onError;
+ this.callId = rpcs.allocateCallId();
}
/* Calls the RPC. This must be called immediately after construction. */
@@ -201,7 +203,7 @@ export class Call {
}
this.error = Status.CANCELLED;
- return this.rpcs.sendCancel(this.rpc);
+ return this.rpcs.sendCancel(this.rpc, this.callId);
}
private checkErrors(): void {
@@ -241,7 +243,7 @@ export class Call {
if (this.status !== undefined) {
throw new RpcError(this.rpc, Status.FAILED_PRECONDITION);
}
- this.rpcs.sendClientStream(this.rpc, request);
+ this.rpcs.sendClientStream(this.rpc, request, this.callId);
}
protected finishClientStream(requests: Message[]) {
@@ -250,7 +252,7 @@ export class Call {
}
if (!this.completed) {
- this.rpcs.sendClientStreamEnd(this.rpc);
+ this.rpcs.sendClientStreamEnd(this.rpc, this.callId);
}
}
}
diff --git a/pw_rpc/ts/call_test.ts b/pw_rpc/ts/call_test.ts
index 98590c88a..d4a6d768c 100644
--- a/pw_rpc/ts/call_test.ts
+++ b/pw_rpc/ts/call_test.ts
@@ -25,8 +25,13 @@ class FakeRpc {
readonly service: any = undefined;
readonly method: any = undefined;
- idSet: [number, number, number] = [1, 2, 3];
- idString = '1.2.3';
+ getIdSet(callId: number): [number, number, number, number] {
+ return [1, 2, 3, callId];
+ }
+
+ getIdString(callId: number): string {
+ return '1.2.3.' + callId;
+ }
}
describe('Call', () => {
diff --git a/pw_rpc/ts/client.ts b/pw_rpc/ts/client.ts
index e99554877..d61d7a237 100644
--- a/pw_rpc/ts/client.ts
+++ b/pw_rpc/ts/client.ts
@@ -281,7 +281,7 @@ export class Client {
status = Status.DATA_LOSS;
}
- const call = this.rpcs.getPending(rpc, status);
+ const call = this.rpcs.getPending(rpc, packet.getCallId(), status);
if (call === undefined) {
this.sendClientError(channelClient, packet, Status.FAILED_PRECONDITION);
console.debug(`Discarding response for ${rpc}, which is not pending`);
diff --git a/pw_rpc/ts/client_test.ts b/pw_rpc/ts/client_test.ts
index 0535fddb9..fbd21c7da 100644
--- a/pw_rpc/ts/client_test.ts
+++ b/pw_rpc/ts/client_test.ts
@@ -34,7 +34,8 @@ import {
} from './method';
import * as packets from './packets';
-const TEST_PROTO_PATH = 'pw_rpc/ts/test_protos-descriptor-set.proto.bin';
+const LEGACY_OPEN_CALL_ID = 0;
+const OPEN_CALL_ID = 2 ** 32 - 1;
describe('Client', () => {
let protoCollection: ProtoCollection;
@@ -95,18 +96,19 @@ describe('Client', () => {
});
it('processPacket for unrecognized channel', () => {
- const packet = packets.encodeResponse([123, 456, 789], new Request());
+ const packet = packets.encodeResponse([123, 456, 789, 456], new Request());
expect(client.processPacket(packet)).toEqual(Status.NOT_FOUND);
});
it('processPacket for unrecognized service', () => {
- const packet = packets.encodeResponse([1, 456, 789], new Request());
+ const packet = packets.encodeResponse([1, 456, 789, 456], new Request());
const status = client.processPacket(packet);
expect(client.processPacket(packet)).toEqual(Status.OK);
expect(lastPacketSent.getChannelId()).toEqual(1);
expect(lastPacketSent.getServiceId()).toEqual(456);
expect(lastPacketSent.getMethodId()).toEqual(789);
+ expect(lastPacketSent.getCallId()).toEqual(456);
expect(lastPacketSent.getType()).toEqual(PacketType.CLIENT_ERROR);
expect(lastPacketSent.getStatus()).toEqual(Status.NOT_FOUND);
});
@@ -114,13 +116,17 @@ describe('Client', () => {
it('processPacket for unrecognized method', () => {
const service = client.services.values().next().value;
- const packet = packets.encodeResponse([1, service.id, 789], new Request());
+ const packet = packets.encodeResponse(
+ [1, service.id, 789, 456],
+ new Request(),
+ );
const status = client.processPacket(packet);
expect(client.processPacket(packet)).toEqual(Status.OK);
expect(lastPacketSent.getChannelId()).toEqual(1);
expect(lastPacketSent.getServiceId()).toEqual(service.id);
expect(lastPacketSent.getMethodId()).toEqual(789);
+ expect(lastPacketSent.getCallId()).toEqual(456);
expect(lastPacketSent.getType()).toEqual(PacketType.CLIENT_ERROR);
expect(lastPacketSent.getStatus()).toEqual(Status.NOT_FOUND);
});
@@ -130,7 +136,7 @@ describe('Client', () => {
const method = service.methods.values().next().value;
const packet = packets.encodeResponse(
- [1, service.id, method.id],
+ [1, service.id, method.id, 456],
new Request(),
);
const status = client.processPacket(packet);
@@ -187,6 +193,7 @@ describe('RPC', () => {
channelId: number,
method: Method,
status: Status,
+ callId: number,
response?: Message,
) {
const packet = new RpcPacket();
@@ -194,6 +201,7 @@ describe('RPC', () => {
packet.setChannelId(channelId);
packet.setServiceId(method.service.id);
packet.setMethodId(method.id);
+ packet.setCallId(callId);
packet.setStatus(status);
if (response === undefined) {
packet.setPayload(new Uint8Array(0));
@@ -207,6 +215,7 @@ describe('RPC', () => {
channelId: number,
method: Method,
response: Message,
+ callId: number,
status: Status = Status.OK,
) {
const packet = new RpcPacket();
@@ -214,6 +223,7 @@ describe('RPC', () => {
packet.setChannelId(channelId);
packet.setServiceId(method.service.id);
packet.setMethodId(method.id);
+ packet.setCallId(callId);
packet.setPayload(response.serializeBinary());
packet.setStatus(status);
nextPackets.push([packet.serializeBinary(), status]);
@@ -224,12 +234,14 @@ describe('RPC', () => {
method: Method,
status: Status,
processStatus: Status,
+ callId: number,
) {
const packet = new RpcPacket();
packet.setType(PacketType.SERVER_ERROR);
packet.setChannelId(channelId);
packet.setServiceId(method.service.id);
packet.setMethodId(method.id);
+ packet.setCallId(callId);
packet.setStatus(status);
nextPackets.push([packet.serializeBinary(), processStatus]);
@@ -263,6 +275,7 @@ describe('RPC', () => {
function processEnqueuedPackets(): void {
// Avoid infinite recursion when processing a packet causes another packet
// to send.
+ if (responseLock) return;
responseLock = true;
for (const [packet, status] of nextPackets) {
expect(client.processPacket(packet)).toEqual(status);
@@ -282,12 +295,37 @@ describe('RPC', () => {
) as UnaryMethodStub;
});
+ const openCallIds = [
+ ['OPEN_CALL_ID', OPEN_CALL_ID],
+ ['LEGACY_OPEN_CALL_ID', LEGACY_OPEN_CALL_ID],
+ ];
+ openCallIds.forEach(([idName, callId]) => {
+ it(`matches responses with ${idName} to requests with arbitrary IDs`, async () => {
+ const promisedResponse = unaryStub.call(newRequest(6));
+ enqueueResponse(
+ 1,
+ unaryStub.method,
+ Status.ABORTED,
+ OPEN_CALL_ID,
+ newResponse('is unrequested'),
+ );
+
+ processEnqueuedPackets();
+ const [status, response] = await promisedResponse;
+
+ expect(sentPayload(Request).getMagicNumber()).toEqual(6);
+ expect(status).toEqual(Status.ABORTED);
+ expect(response).toEqual(newResponse('is unrequested'));
+ });
+ });
+
it('blocking call', async () => {
for (let i = 0; i < 3; i++) {
enqueueResponse(
1,
unaryStub.method,
Status.ABORTED,
+ unaryStub.rpcs.nextCallId,
newResponse('0_o'),
);
const [status, response] = await unaryStub.call(newRequest(6));
@@ -301,7 +339,13 @@ describe('RPC', () => {
it('nonblocking call', () => {
for (let i = 0; i < 3; i++) {
const response = newResponse('hello world');
- enqueueResponse(1, unaryStub.method, Status.ABORTED, response);
+ enqueueResponse(
+ 1,
+ unaryStub.method,
+ Status.ABORTED,
+ unaryStub.rpcs.nextCallId,
+ response,
+ );
const onNext = jest.fn();
const onCompleted = jest.fn();
@@ -325,7 +369,13 @@ describe('RPC', () => {
for (let i = 0; i < 3; i++) {
const response = newResponse('hello world');
- enqueueResponse(1, unaryStub.method, Status.ABORTED, response);
+ enqueueResponse(
+ 1,
+ unaryStub.method,
+ Status.ABORTED,
+ unaryStub.rpcs.nextCallId,
+ response,
+ );
const onNext = jest.fn();
const onCompleted = jest.fn();
@@ -341,9 +391,55 @@ describe('RPC', () => {
}
});
+ it('nonblocking concurrent call', () => {
+ // Start several calls to the same method
+ const callsAndCallbacks = [];
+ for (let i = 0; i < 3; i++) {
+ const onNext = jest.fn();
+ const onCompleted = jest.fn();
+ const onError = jest.fn();
+
+ const call = unaryStub.invoke(
+ newRequest(5),
+ onNext,
+ onCompleted,
+ onError,
+ );
+ callsAndCallbacks.push([call, onNext, onCompleted, onError]);
+
+ expect(sentPayload(Request).getMagicNumber()).toEqual(5);
+ }
+ // Respond only to the last call
+ const [lastCall, lastCallback] = callsAndCallbacks.pop();
+ const lastResponse = newResponse('last payload');
+
+ enqueueResponse(
+ 1,
+ unaryStub.method,
+ Status.OK,
+ lastCall.callId,
+ lastResponse,
+ );
+ processEnqueuedPackets();
+
+ expect(lastCallback).toHaveBeenCalledWith(lastResponse);
+ for (const i in callsAndCallbacks) {
+ const [_call, onNext, onCompleted, onError] = callsAndCallbacks[i];
+ expect(onNext).toBeCalledTimes(0);
+ expect(onCompleted).toBeCalledTimes(0);
+ expect(onError).toBeCalledTimes(0);
+ }
+ });
+
it('blocking server error', async () => {
for (let i = 0; i < 3; i++) {
- enqueueError(1, unaryStub.method, Status.NOT_FOUND, Status.OK);
+ enqueueError(
+ 1,
+ unaryStub.method,
+ Status.NOT_FOUND,
+ Status.OK,
+ unaryStub.rpcs.nextCallId,
+ );
try {
await unaryStub.call(newRequest());
@@ -380,21 +476,17 @@ describe('RPC', () => {
}
});
- it('nonblocking duplicate calls first is cancelled', () => {
- const firstCall = unaryStub.invoke(newRequest());
- expect(firstCall.completed).toBe(false);
-
- const secondCall = unaryStub.invoke(newRequest());
- expect(firstCall.error).toEqual(Status.CANCELLED);
- expect(secondCall.completed).toBe(false);
- });
-
it('nonblocking exception in callback', () => {
const errorCallback = () => {
throw Error('Something went wrong!');
};
- enqueueResponse(1, unaryStub.method, Status.OK);
+ enqueueResponse(
+ 1,
+ unaryStub.method,
+ Status.OK,
+ unaryStub.rpcs.nextCallId,
+ );
const call = unaryStub.invoke(newRequest(), errorCallback);
expect(call.callbackException!.name).toEqual('Error');
expect(call.callbackException!.message).toEqual('Something went wrong!');
@@ -417,9 +509,24 @@ describe('RPC', () => {
const response2 = newResponse('?');
for (let i = 0; i < 3; i++) {
- enqueueServerStream(1, serverStreaming.method, response1);
- enqueueServerStream(1, serverStreaming.method, response2);
- enqueueResponse(1, serverStreaming.method, Status.ABORTED);
+ enqueueServerStream(
+ 1,
+ serverStreaming.method,
+ response1,
+ serverStreaming.rpcs.nextCallId,
+ );
+ enqueueServerStream(
+ 1,
+ serverStreaming.method,
+ response2,
+ serverStreaming.rpcs.nextCallId,
+ );
+ enqueueResponse(
+ 1,
+ serverStreaming.method,
+ Status.ABORTED,
+ serverStreaming.rpcs.nextCallId,
+ );
const onNext = jest.fn();
const onCompleted = jest.fn();
@@ -443,9 +550,24 @@ describe('RPC', () => {
const response2 = newResponse('?');
for (let i = 0; i < 3; i++) {
- enqueueServerStream(1, serverStreaming.method, response1);
- enqueueServerStream(1, serverStreaming.method, response2);
- enqueueResponse(1, serverStreaming.method, Status.ABORTED);
+ enqueueServerStream(
+ 1,
+ serverStreaming.method,
+ response1,
+ serverStreaming.rpcs.nextCallId,
+ );
+ enqueueServerStream(
+ 1,
+ serverStreaming.method,
+ response2,
+ serverStreaming.rpcs.nextCallId,
+ );
+ enqueueResponse(
+ 1,
+ serverStreaming.method,
+ Status.ABORTED,
+ serverStreaming.rpcs.nextCallId,
+ );
const onNext = jest.fn();
const onCompleted = jest.fn();
@@ -478,7 +600,12 @@ describe('RPC', () => {
it('non-blocking cancel', () => {
const testResponse = newResponse('!!!');
- enqueueServerStream(1, serverStreaming.method, testResponse);
+ enqueueServerStream(
+ 1,
+ serverStreaming.method,
+ testResponse,
+ serverStreaming.rpcs.nextCallId,
+ );
const onNext = jest.fn();
const onCompleted = jest.fn();
@@ -493,8 +620,18 @@ describe('RPC', () => {
expect(lastRequest().getStatus()).toEqual(Status.CANCELLED);
// Ensure the RPC can be called after being cancelled.
- enqueueServerStream(1, serverStreaming.method, testResponse);
- enqueueResponse(1, serverStreaming.method, Status.OK);
+ enqueueServerStream(
+ 1,
+ serverStreaming.method,
+ testResponse,
+ serverStreaming.rpcs.nextCallId,
+ );
+ enqueueResponse(
+ 1,
+ serverStreaming.method,
+ Status.OK,
+ serverStreaming.rpcs.nextCallId,
+ );
call = serverStreaming.invoke(newRequest(), onNext, onCompleted, onError);
expect(onNext).toHaveBeenNthCalledWith(2, testResponse);
expect(onError).not.toHaveBeenCalled();
@@ -527,7 +664,13 @@ describe('RPC', () => {
expect(stream.completed).toBe(false);
// Enqueue the server response to be sent after the next message.
- enqueueResponse(1, clientStreaming.method, Status.OK, testResponse);
+ enqueueResponse(
+ 1,
+ clientStreaming.method,
+ Status.OK,
+ stream.callId,
+ testResponse,
+ );
stream.send(newRequest(32));
expect(lastRequest().getType()).toEqual(PacketType.CLIENT_STREAM);
@@ -545,7 +688,13 @@ describe('RPC', () => {
const response = newResponse('!!!');
for (let i = 0; i < 3; i++) {
- enqueueResponse(1, clientStreaming.method, Status.OK, response);
+ enqueueResponse(
+ 1,
+ clientStreaming.method,
+ Status.OK,
+ clientStreaming.rpcs.nextCallId,
+ response,
+ );
const onNext = jest.fn();
const onCompleted = jest.fn();
@@ -583,7 +732,13 @@ describe('RPC', () => {
expect(stream.completed).toBe(false);
// Enqueue the server response to be sent after the next message.
- enqueueResponse(1, clientStreaming.method, Status.OK, testResponse);
+ enqueueResponse(
+ 1,
+ clientStreaming.method,
+ Status.OK,
+ stream.callId,
+ testResponse,
+ );
stream.finishAndWait();
expect(lastRequest().getType()).toEqual(
@@ -619,6 +774,7 @@ describe('RPC', () => {
clientStreaming.method,
Status.INVALID_ARGUMENT,
Status.OK,
+ stream.callId,
);
stream.send(newRequest());
@@ -643,6 +799,7 @@ describe('RPC', () => {
clientStreaming.method,
Status.INVALID_ARGUMENT,
Status.OK,
+ stream.callId,
);
await stream
@@ -678,6 +835,7 @@ describe('RPC', () => {
1,
clientStreaming.method,
Status.UNAVAILABLE,
+ clientStreaming.rpcs.nextCallId,
enqueuedResponse,
);
@@ -690,7 +848,13 @@ describe('RPC', () => {
});
it('non-blocking finish after error', async () => {
- enqueueError(1, clientStreaming.method, Status.UNAVAILABLE, Status.OK);
+ enqueueError(
+ 1,
+ clientStreaming.method,
+ Status.UNAVAILABLE,
+ Status.OK,
+ clientStreaming.rpcs.nextCallId,
+ );
const stream = clientStreaming.invoke();
for (let i = 0; i < 3; i++) {
@@ -706,15 +870,6 @@ describe('RPC', () => {
});
}
});
-
- it('non-blocking duplicate calls first is cancelled', () => {
- const firstCall = clientStreaming.invoke();
- expect(firstCall.completed).toBe(false);
-
- const secondCall = clientStreaming.invoke();
- expect(firstCall.error).toEqual(Status.CANCELLED);
- expect(secondCall.completed).toBe(false);
- });
});
describe('BidirectionalStreaming', () => {
@@ -732,7 +887,12 @@ describe('RPC', () => {
const testRequests = [newRequest(123), newRequest(456)];
sendResponsesAfterPackets = 3;
- enqueueResponse(1, bidiStreaming.method, Status.NOT_FOUND);
+ enqueueResponse(
+ 1,
+ bidiStreaming.method,
+ Status.NOT_FOUND,
+ bidiStreaming.rpcs.nextCallId,
+ );
const results = await bidiStreaming.call(testRequests);
expect(results[0]).toEqual(Status.NOT_FOUND);
@@ -741,7 +901,13 @@ describe('RPC', () => {
it('blocking server error', async () => {
const testRequests = [newRequest(123)];
- enqueueError(1, bidiStreaming.method, Status.NOT_FOUND, Status.OK);
+ enqueueError(
+ 1,
+ bidiStreaming.method,
+ Status.NOT_FOUND,
+ Status.OK,
+ bidiStreaming.rpcs.nextCallId,
+ );
await bidiStreaming
.call(testRequests)
@@ -770,8 +936,8 @@ describe('RPC', () => {
expect(stream.completed).toBe(false);
expect(testResponses).toEqual([]);
- enqueueServerStream(1, bidiStreaming.method, rep1);
- enqueueServerStream(1, bidiStreaming.method, rep2);
+ enqueueServerStream(1, bidiStreaming.method, rep1, stream.callId);
+ enqueueServerStream(1, bidiStreaming.method, rep2, stream.callId);
stream.send(newRequest(66));
expect(lastRequest().getType()).toEqual(PacketType.CLIENT_STREAM);
@@ -779,7 +945,7 @@ describe('RPC', () => {
expect(stream.completed).toBe(false);
expect(testResponses).toEqual([rep1, rep2]);
- enqueueResponse(1, bidiStreaming.method, Status.OK);
+ enqueueResponse(1, bidiStreaming.method, Status.OK, stream.callId);
stream.send(newRequest(77));
expect(stream.completed).toBe(true);
@@ -795,9 +961,24 @@ describe('RPC', () => {
const response2 = newResponse('?');
for (let i = 0; i < 3; i++) {
- enqueueServerStream(1, bidiStreaming.method, response1);
- enqueueServerStream(1, bidiStreaming.method, response2);
- enqueueResponse(1, bidiStreaming.method, Status.OK);
+ enqueueServerStream(
+ 1,
+ bidiStreaming.method,
+ response1,
+ bidiStreaming.rpcs.nextCallId,
+ );
+ enqueueServerStream(
+ 1,
+ bidiStreaming.method,
+ response2,
+ bidiStreaming.rpcs.nextCallId,
+ );
+ enqueueResponse(
+ 1,
+ bidiStreaming.method,
+ Status.OK,
+ bidiStreaming.rpcs.nextCallId,
+ );
const onNext = jest.fn();
const onCompleted = jest.fn();
@@ -833,13 +1014,19 @@ describe('RPC', () => {
});
expect(stream.completed).toBe(false);
- enqueueServerStream(1, bidiStreaming.method, response);
+ enqueueServerStream(1, bidiStreaming.method, response, stream.callId);
stream.send(newRequest(55));
expect(stream.completed).toBe(false);
expect(testResponses).toEqual([response]);
- enqueueError(1, bidiStreaming.method, Status.OUT_OF_RANGE, Status.OK);
+ enqueueError(
+ 1,
+ bidiStreaming.method,
+ Status.OUT_OF_RANGE,
+ Status.OK,
+ stream.callId,
+ );
stream.send(newRequest(999));
expect(stream.completed).toBe(true);
@@ -867,6 +1054,7 @@ describe('RPC', () => {
bidiStreaming.method,
Status.INVALID_ARGUMENT,
Status.OK,
+ stream.callId,
);
await stream
@@ -894,8 +1082,18 @@ describe('RPC', () => {
it('non-blocking finish after completed', async () => {
const response = newResponse('!?');
- enqueueServerStream(1, bidiStreaming.method, response);
- enqueueResponse(1, bidiStreaming.method, Status.UNAVAILABLE);
+ enqueueServerStream(
+ 1,
+ bidiStreaming.method,
+ response,
+ bidiStreaming.rpcs.nextCallId,
+ );
+ enqueueResponse(
+ 1,
+ bidiStreaming.method,
+ Status.UNAVAILABLE,
+ bidiStreaming.rpcs.nextCallId,
+ );
const stream = bidiStreaming.invoke();
const result = await stream.finishAndWait();
@@ -907,8 +1105,19 @@ describe('RPC', () => {
it('non-blocking finish after error', async () => {
const response = newResponse('!?');
- enqueueServerStream(1, bidiStreaming.method, response);
- enqueueError(1, bidiStreaming.method, Status.UNAVAILABLE, Status.OK);
+ enqueueServerStream(
+ 1,
+ bidiStreaming.method,
+ response,
+ bidiStreaming.rpcs.nextCallId,
+ );
+ enqueueError(
+ 1,
+ bidiStreaming.method,
+ Status.UNAVAILABLE,
+ Status.OK,
+ bidiStreaming.rpcs.nextCallId,
+ );
const stream = bidiStreaming.invoke();
@@ -924,12 +1133,5 @@ describe('RPC', () => {
});
}
});
- it('non-blocking duplicate calls first is cancelled', () => {
- const firstCall = bidiStreaming.invoke();
- expect(firstCall.completed).toBe(false);
- const secondCall = bidiStreaming.invoke();
- expect(firstCall.error).toEqual(Status.CANCELLED);
- expect(secondCall.completed).toBe(false);
- });
});
});
diff --git a/pw_rpc/ts/packets.ts b/pw_rpc/ts/packets.ts
index b6cb71395..7d72f1c48 100644
--- a/pw_rpc/ts/packets.ts
+++ b/pw_rpc/ts/packets.ts
@@ -21,8 +21,8 @@ import {
} from 'pigweedjs/protos/pw_rpc/internal/packet_pb';
import { Status } from 'pigweedjs/pw_status';
-// Channel, Service, Method
-type idSet = [number, number, number];
+// Channel, Service, Method, CallId
+type idSet = [number, number, number, number];
export function decode(data: Uint8Array): RpcPacket {
return RpcPacket.deserializeBinary(data);
@@ -45,6 +45,7 @@ export function encodeClientError(
errorPacket.setChannelId(packet.getChannelId());
errorPacket.setMethodId(packet.getMethodId());
errorPacket.setServiceId(packet.getServiceId());
+ errorPacket.setCallId(packet.getCallId());
errorPacket.setStatus(status);
return errorPacket.serializeBinary();
}
@@ -55,6 +56,7 @@ export function encodeClientStream(ids: idSet, message: Message): Uint8Array {
streamPacket.setChannelId(ids[0]);
streamPacket.setServiceId(ids[1]);
streamPacket.setMethodId(ids[2]);
+ streamPacket.setCallId(ids[3]);
const msgSerialized = (message as any)['serializeBinary']();
streamPacket.setPayload(msgSerialized);
return streamPacket.serializeBinary();
@@ -66,6 +68,7 @@ export function encodeClientStreamEnd(ids: idSet): Uint8Array {
streamEnd.setChannelId(ids[0]);
streamEnd.setServiceId(ids[1]);
streamEnd.setMethodId(ids[2]);
+ streamEnd.setCallId(ids[3]);
return streamEnd.serializeBinary();
}
@@ -80,6 +83,7 @@ export function encodeRequest(ids: idSet, request?: Message): Uint8Array {
packet.setChannelId(ids[0]);
packet.setServiceId(ids[1]);
packet.setMethodId(ids[2]);
+ packet.setCallId(ids[3]);
packet.setPayload(payload);
return packet.serializeBinary();
}
@@ -90,6 +94,7 @@ export function encodeResponse(ids: idSet, response: Message): Uint8Array {
packet.setChannelId(ids[0]);
packet.setServiceId(ids[1]);
packet.setMethodId(ids[2]);
+ packet.setCallId(ids[3]);
const msgSerialized = (response as any)['serializeBinary']();
packet.setPayload(msgSerialized);
return packet.serializeBinary();
@@ -102,5 +107,6 @@ export function encodeCancel(ids: idSet): Uint8Array {
packet.setChannelId(ids[0]);
packet.setServiceId(ids[1]);
packet.setMethodId(ids[2]);
+ packet.setCallId(ids[3]);
return packet.serializeBinary();
}
diff --git a/pw_rpc/ts/packets_test.ts b/pw_rpc/ts/packets_test.ts
index 50cdb3f1a..2805b1194 100644
--- a/pw_rpc/ts/packets_test.ts
+++ b/pw_rpc/ts/packets_test.ts
@@ -27,6 +27,7 @@ function addTestData(packet: RpcPacket) {
packet.setChannelId(1);
packet.setServiceId(2);
packet.setMethodId(3);
+ packet.setCallId(4);
packet.setPayload(payload.serializeBinary());
}
@@ -42,7 +43,7 @@ describe('Packets', () => {
const dataPacket = new RpcPacket();
dataPacket.setStatus(321);
- const data = packets.encodeRequest([1, 2, 3], dataPacket);
+ const data = packets.encodeRequest([1, 2, 3, 4], dataPacket);
const packet = RpcPacket.deserializeBinary(data);
expect(packet.toObject()).toEqual(goldenRequest.toObject());
@@ -55,7 +56,7 @@ describe('Packets', () => {
const dataPacket = new RpcPacket();
dataPacket.setStatus(321);
- const data = packets.encodeResponse([1, 2, 3], dataPacket);
+ const data = packets.encodeResponse([1, 2, 3, 4], dataPacket);
const packet = RpcPacket.deserializeBinary(data);
expect(packet.toObject()).toEqual(goldenResponse.toObject());
@@ -73,6 +74,7 @@ describe('Packets', () => {
golden.setChannelId(1);
golden.setServiceId(2);
golden.setMethodId(3);
+ golden.setCallId(4);
golden.setStatus(Status.NOT_FOUND);
expect(errorPacket.toObject()).toEqual(golden.toObject());
@@ -85,8 +87,9 @@ describe('Packets', () => {
goldenCancel.setChannelId(1);
goldenCancel.setServiceId(2);
goldenCancel.setMethodId(3);
+ goldenCancel.setCallId(4);
- const data = packets.encodeCancel([1, 2, 3]);
+ const data = packets.encodeCancel([1, 2, 3, 4]);
const packet = RpcPacket.deserializeBinary(data);
expect(packet.toObject()).toEqual(goldenCancel.toObject());
});
diff --git a/pw_rpc/ts/rpc_classes.ts b/pw_rpc/ts/rpc_classes.ts
index 83001e268..cfe08d4cb 100644
--- a/pw_rpc/ts/rpc_classes.ts
+++ b/pw_rpc/ts/rpc_classes.ts
@@ -19,6 +19,13 @@ import { Call } from './call';
import { Channel, Method, Service } from './descriptors';
import * as packets from './packets';
+/** Max number that can fit into a 2-byte varint */
+const MAX_CALL_ID = 1 << 14;
+/** Calls with ID of `kOpenCallId` were unrequested, and are updated to have the
+ call ID of the first matching request. */
+const LEGACY_OPEN_CALL_ID = 0;
+const OPEN_CALL_ID = 2 ** 32 - 1;
+
/** Data class for a pending RPC call. */
export class Rpc {
readonly channel: Channel;
@@ -31,19 +38,19 @@ export class Rpc {
this.method = method;
}
- /** Returns channel service method id tuple */
- get idSet(): [number, number, number] {
- return [this.channel.id, this.service.id, this.method.id];
+ /** Returns channel service method callId tuple */
+ getIdSet(callId: number): [number, number, number, number] {
+ return [this.channel.id, this.service.id, this.method.id, callId];
}
/**
- * Returns a string sequence to uniquely identify channel, service, and
- * method. This can be used to hash the Rpc.
+ * Returns a string sequence to uniquely identify channel, service, method
+ * and call ID. This can be used to hash the Rpc.
*
- * For example: "12346789.23452345.12341234"
+ * For example: "12346789.23452345.12341234.34"
*/
- get idString(): string {
- return `${this.channel.id}.${this.service.id}.${this.method.id}`;
+ getIdString(callId: number): string {
+ return `${this.channel.id}.${this.service.id}.${this.method.id}.${callId}`;
}
toString(): string {
@@ -57,12 +64,24 @@ export class Rpc {
/** Tracks pending RPCs and encodes outgoing RPC packets. */
export class PendingCalls {
pending: Map<string, Call> = new Map();
+ // We skip callId zero to avoid LEGACY_OPEN_CALL_ID.
+ nextCallId: number = 1;
/** Starts the provided RPC and returns the encoded packet to send. */
request(rpc: Rpc, request: Message, call: Call): Uint8Array {
this.open(rpc, call);
console.log(`Starting ${rpc}`);
- return packets.encodeRequest(rpc.idSet, request);
+ return packets.encodeRequest(rpc.getIdSet(call.callId), request);
+ }
+
+ allocateCallId(): number {
+ const callId = this.nextCallId;
+ this.nextCallId = (this.nextCallId + 1) % MAX_CALL_ID;
+ // We skip callId zero to avoid LEGACY_OPEN_CALL_ID.
+ if (this.nextCallId == 0) {
+ this.nextCallId = 1;
+ }
+ return callId;
}
/** Calls request and sends the resulting packet to the channel. */
@@ -73,7 +92,7 @@ export class PendingCalls {
request?: Message,
): Call | undefined {
const previous = this.open(rpc, call);
- const packet = packets.encodeRequest(rpc.idSet, request);
+ const packet = packets.encodeRequest(rpc.getIdSet(call.callId), request);
try {
rpc.channel.send(packet);
} catch (error) {
@@ -93,37 +112,37 @@ export class PendingCalls {
*/
open(rpc: Rpc, call: Call): Call | undefined {
console.debug(`Starting ${rpc}`);
- const previous = this.pending.get(rpc.idString);
- this.pending.set(rpc.idString, call);
+ const previous = this.pending.get(rpc.getIdString(call.callId));
+ this.pending.set(rpc.getIdString(call.callId), call);
return previous;
}
- sendClientStream(rpc: Rpc, message: Message) {
- if (this.getPending(rpc) === undefined) {
+ sendClientStream(rpc: Rpc, message: Message, callId: number) {
+ if (this.getPending(rpc, callId) === undefined) {
throw new Error(`Attempt to send client stream for inactive RPC: ${rpc}`);
}
- rpc.channel.send(packets.encodeClientStream(rpc.idSet, message));
+ rpc.channel.send(packets.encodeClientStream(rpc.getIdSet(callId), message));
}
- sendClientStreamEnd(rpc: Rpc) {
- if (this.getPending(rpc) === undefined) {
+ sendClientStreamEnd(rpc: Rpc, callId: number) {
+ if (this.getPending(rpc, callId) === undefined) {
throw new Error(`Attempt to send client stream for inactive RPC: ${rpc}`);
}
- rpc.channel.send(packets.encodeClientStreamEnd(rpc.idSet));
+ rpc.channel.send(packets.encodeClientStreamEnd(rpc.getIdSet(callId)));
}
/** Cancels the RPC. Returns the CLIENT_ERROR packet to send. */
- cancel(rpc: Rpc): Uint8Array {
+ cancel(rpc: Rpc, callId: number): Uint8Array {
console.debug(`Cancelling ${rpc}`);
- this.pending.delete(rpc.idString);
- return packets.encodeCancel(rpc.idSet);
+ this.pending.delete(rpc.getIdString(callId));
+ return packets.encodeCancel(rpc.getIdSet(callId));
}
/** Calls cancel and sends the cancel packet, if any, to the channel. */
- sendCancel(rpc: Rpc): boolean {
+ sendCancel(rpc: Rpc, callId: number): boolean {
let packet: Uint8Array | undefined;
try {
- packet = this.cancel(rpc);
+ packet = this.cancel(rpc, callId);
} catch (err) {
return false;
}
@@ -135,13 +154,25 @@ export class PendingCalls {
}
/** Gets the pending RPC's call. If status is set, clears the RPC. */
- getPending(rpc: Rpc, status?: Status): Call | undefined {
+ getPending(rpc: Rpc, callId: number, status?: Status): Call | undefined {
+ let call: Call | undefined = this.pending.get(rpc.getIdString(callId));
+ if (callId === LEGACY_OPEN_CALL_ID || callId === OPEN_CALL_ID) {
+ // Calls with ID `OPEN_CALL_ID` were unrequested, and are updated to
+ // have the call ID of the first matching request.
+ const allPendingCalls = Array.from(this.pending.values());
+ for (const pending in allPendingCalls) {
+ const curCall = allPendingCalls[pending];
+ if (curCall.rpc.getIdString(0) === rpc.getIdString(0)) {
+ call = curCall;
+ break;
+ }
+ }
+ }
if (status === undefined) {
- return this.pending.get(rpc.idString);
+ return call;
}
- const call = this.pending.get(rpc.idString);
- this.pending.delete(rpc.idString);
+ this.pending.delete(rpc.getIdString(callId));
return call;
}
}