diff options
author | Wyatt Hepler <hepler@google.com> | 2023-04-11 01:27:36 +0000 |
---|---|---|
committer | CQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com> | 2023-04-11 01:27:36 +0000 |
commit | e850621049d267fbf5b78f9eb3f9b4895cdc4c92 (patch) | |
tree | 30504cb2541225f129353f84d3658b04fed96a5a /pw_rpc | |
parent | 3ceb704d2101ba27e064886414447b3365ca8f47 (diff) | |
download | pigweed-e850621049d267fbf5b78f9eb3f9b4895cdc4c92.tar.gz |
pw_rpc: Raw synchronous call API
Raw synchronous calls pass the raw response buffer to a callback
function.
Fixes: b/268216599
Change-Id: I0f9462e5d5baa987f754615c3156942e05f9215f
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/128371
Reviewed-by: Xiaofan Jiang <xiaofanj@google.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
Diffstat (limited to 'pw_rpc')
-rw-r--r-- | pw_rpc/public/pw_rpc/internal/synchronous_call_impl.h | 95 | ||||
-rw-r--r-- | pw_rpc/public/pw_rpc/synchronous_call.h | 126 | ||||
-rw-r--r-- | pw_rpc/raw/BUILD.bazel | 13 | ||||
-rw-r--r-- | pw_rpc/raw/BUILD.gn | 16 | ||||
-rw-r--r-- | pw_rpc/raw/synchronous_call_test.cc | 237 |
5 files changed, 475 insertions, 12 deletions
diff --git a/pw_rpc/public/pw_rpc/internal/synchronous_call_impl.h b/pw_rpc/public/pw_rpc/internal/synchronous_call_impl.h index 8a2d5546b..856adf7d7 100644 --- a/pw_rpc/public/pw_rpc/internal/synchronous_call_impl.h +++ b/pw_rpc/public/pw_rpc/internal/synchronous_call_impl.h @@ -13,6 +13,9 @@ // the License. #pragma once +#include <utility> + +#include "pw_rpc/internal/method_info.h" #include "pw_rpc/synchronous_call_result.h" #include "pw_sync/timed_thread_notification.h" @@ -38,4 +41,96 @@ struct SynchronousCallState { sync::TimedThreadNotification notify; }; +class RawSynchronousCallState { + public: + RawSynchronousCallState(Function<void(ConstByteSpan, Status)> on_completed) + : on_completed_(std::move(on_completed)) {} + + auto OnCompletedCallback() { + return [this](ConstByteSpan response, Status status) { + if (on_completed_) { + on_completed_(response, status); + } + notify.release(); + }; + } + + auto OnRpcErrorCallback() { + return [this](Status status) { + error = status; + notify.release(); + }; + } + + Status error; + sync::TimedThreadNotification notify; + + private: + Function<void(ConstByteSpan, Status)> on_completed_; +}; + +// Overloaded function to choose detween timeout and deadline APIs. +inline bool AcquireNotification(sync::TimedThreadNotification& notification, + chrono::SystemClock::duration timeout) { + return notification.try_acquire_for(timeout); +} + +inline bool AcquireNotification(sync::TimedThreadNotification& notification, + chrono::SystemClock::time_point timeout) { + return notification.try_acquire_until(timeout); +} + +// Template for a raw synchronous call. Used for SynchronousCall, +// SynchronousCallFor, and SynchronousCallUntil. The type of the timeout +// argument is used to determine the behavior. +template <auto kRpcMethod, typename DoCall, typename... TimeoutArg> +Status RawSynchronousCall(Function<void(ConstByteSpan, Status)>&& on_completed, + DoCall&& do_call, + TimeoutArg... timeout_arg) { + static_assert(MethodInfo<kRpcMethod>::kType == MethodType::kUnary, + "Only unary methods can be used with synchronous calls"); + + RawSynchronousCallState call_state{std::move(on_completed)}; + + auto call = std::forward<DoCall>(do_call)(call_state); + + // Wait for the notification based on the type of the timeout argument. + if constexpr (sizeof...(TimeoutArg) == 0) { + call_state.notify.acquire(); // Wait forever, since no timeout was given. + } else if (!AcquireNotification(call_state.notify, timeout_arg...)) { + return Status::DeadlineExceeded(); + } + + return call_state.error; +} + +// Invokes the RPC method free function using a call_state. +template <auto kRpcMethod> +constexpr auto CallFreeFunction(Client& client, + uint32_t channel_id, + const ConstByteSpan& request) { + return [&client, channel_id, &request]( + internal::RawSynchronousCallState& call_state) { + return kRpcMethod(client, + channel_id, + request, + call_state.OnCompletedCallback(), + call_state.OnRpcErrorCallback()); + }; +} + +// Invokes the RPC function on the generated service client using a call_state. +template <auto kRpcMethod> +constexpr auto CallGeneratedClient( + const typename MethodInfo<kRpcMethod>::GeneratedClient& client, + const ConstByteSpan& request) { + return [&client, &request](internal::RawSynchronousCallState& call_state) { + constexpr auto kMemberFunction = MethodInfo<kRpcMethod>::template Function< + typename MethodInfo<kRpcMethod>::GeneratedClient>(); + return (client.*kMemberFunction)(request, + call_state.OnCompletedCallback(), + call_state.OnRpcErrorCallback()); + }; +} + } // namespace pw::rpc::internal diff --git a/pw_rpc/public/pw_rpc/synchronous_call.h b/pw_rpc/public/pw_rpc/synchronous_call.h index 92bd20217..8a2be800a 100644 --- a/pw_rpc/public/pw_rpc/synchronous_call.h +++ b/pw_rpc/public/pw_rpc/synchronous_call.h @@ -25,25 +25,28 @@ /// /// `pw_rpc` provides wrappers that convert the asynchronous client API to a /// synchronous API. The `SynchronousCall<RpcMethod>` functions wrap the -/// asynchronous client RPC call with a timed thread notification and return -/// once a result is known or a timeout has occurred. These return a -/// `SynchronousCallResult<Response>` object, which can be queried to determine -/// whether any error scenarios occurred and, if not, access the response. +/// asynchronous client RPC call with a timed thread notification and returns +/// once a result is known or a timeout has occurred. Only unary methods are +/// supported. +/// +/// The Nanopb and pwpb APIs return a `SynchronousCallResult<Response>` object, +/// which can be queried to determine whether any error scenarios occurred and, +/// if not, access the response. The raw API executes a function when the call +/// completes or returns a `pw::Status` if it does not. /// /// `SynchronousCall<RpcMethod>` blocks indefinitely, whereas /// `SynchronousCallFor<RpcMethod>` and `SynchronousCallUntil<RpcMethod>` block /// for a given timeout or until a deadline, respectively. All wrappers work -/// with both the standalone static RPC functions and the generated Client -/// member methods. +/// with either the standalone static RPC functions or the generated service +/// client member methods. /// /// @note Use of the SynchronousCall wrappers requires a /// @cpp_class{pw::sync::TimedThreadNotification} backend. /// -/// @note Only nanopb and pw_protobuf unary RPC methods are supported. -/// -/// The following example blocks indefinitely. If you'd like to include a -/// timeout for how long the call should block for, use the -/// `SynchronousCallFor()` or `SynchronousCallUntil()` variants. +/// The following examples use the Nanopb API to make a call that blocks +/// indefinitely. If you'd like to include a timeout for how long the call +/// should block for, use the `SynchronousCallFor()` or `SynchronousCallUntil()` +/// variants. /// /// @code{.cpp} /// pw_rpc_EchoMessage request{.msg = "hello" }; @@ -58,7 +61,7 @@ /// /// Additionally, the use of a generated `Client` object is supported: /// -/// @code +/// @code{.cpp} /// pw_rpc::nanopb::EchoService::Client client(rpc_client, channel_id); /// pw_rpc_EchoMessage request{.msg = "hello" }; /// pw::rpc::SynchronousCallResult<pw_rpc_EchoMessage> result = @@ -69,6 +72,22 @@ /// } /// @endcode /// +/// The raw API works similarly to the Nanopb API, but takes a +/// @cpp_type{pw::Function} and returns a @cpp_class{pw::Status}. If the RPC +/// completes, the @cpp_type{pw::Function} is called with the response and +/// returned status, and the `SynchronousCall` invocation returns +/// @pw_status{OK}. If the RPC fails, `SynchronousCall` returns an error. +/// +/// @code{.cpp} +/// pw::Status rpc_status = pw::rpc::SynchronousCall<EchoService::Echo>( +/// rpc_client, channel_id, encoded_request, +/// [](pw::ConstByteSpan reply, pw::Status status) { +/// PW_LOG_INFO("Received %zu bytes with status %s", +/// reply.size(), +/// status.str()); +/// }); +/// @endcode +/// /// @warning These wrappers should not be used from any context that cannot be /// blocked! This method will block the calling thread until the RPC completes, /// and translate the response into a `pw::rpc::SynchronousCallResult` that @@ -134,6 +153,30 @@ SynchronousCall( return std::move(call_state.result); } +/// Invokes a unary RPC synchronously using the raw API. Blocks until a +/// response is received. +template <auto kRpcMethod> +Status SynchronousCall(Client& client, + uint32_t channel_id, + ConstByteSpan request, + Function<void(ConstByteSpan, Status)>&& on_completed) { + return internal::RawSynchronousCall<kRpcMethod>( + std::move(on_completed), + internal::CallFreeFunction<kRpcMethod>(client, channel_id, request)); +} + +/// Invokes a unary RPC synchronously using the raw API. Blocks until a +/// response is received. +template <auto kRpcMethod> +Status SynchronousCall( + const typename internal::MethodInfo<kRpcMethod>::GeneratedClient& client, + ConstByteSpan request, + Function<void(ConstByteSpan, Status)>&& on_completed) { + return internal::RawSynchronousCall<kRpcMethod>( + std::move(on_completed), + internal::CallGeneratedClient<kRpcMethod>(client, request)); +} + /// Invokes a unary RPC synchronously using Nanopb or pwpb. Blocks until a /// response is received or the provided timeout passes. /// @@ -201,6 +244,35 @@ SynchronousCallFor( return std::move(call_state.result); } +/// Invokes a unary RPC synchronously using the raw API. Blocks until a +/// response is received or the provided timeout passes. +template <auto kRpcMethod> +Status SynchronousCallFor( + Client& client, + uint32_t channel_id, + ConstByteSpan request, + chrono::SystemClock::duration timeout, + Function<void(ConstByteSpan, Status)>&& on_completed) { + return internal::RawSynchronousCall<kRpcMethod>( + std::move(on_completed), + internal::CallFreeFunction<kRpcMethod>(client, channel_id, request), + timeout); +} + +/// Invokes a unary RPC synchronously using the raw API. Blocks until a +/// response is received or the provided timeout passes. +template <auto kRpcMethod> +Status SynchronousCallFor( + const typename internal::MethodInfo<kRpcMethod>::GeneratedClient& client, + ConstByteSpan request, + chrono::SystemClock::duration timeout, + Function<void(ConstByteSpan, Status)>&& on_completed) { + return internal::RawSynchronousCall<kRpcMethod>( + std::move(on_completed), + internal::CallGeneratedClient<kRpcMethod>(client, request), + timeout); +} + /// Invokes a unary RPC synchronously using Nanopb or pwpb. Blocks until a /// response is received or the provided deadline arrives. /// @@ -267,4 +339,34 @@ SynchronousCallUntil( return std::move(call_state.result); } + +/// Invokes a unary RPC synchronously using the raw API. Blocks until a +/// response is received or the provided deadline arrives. +template <auto kRpcMethod> +Status SynchronousCallUntil( + Client& client, + uint32_t channel_id, + ConstByteSpan request, + chrono::SystemClock::time_point deadline, + Function<void(ConstByteSpan, Status)>&& on_completed) { + return internal::RawSynchronousCall<kRpcMethod>( + std::move(on_completed), + internal::CallFreeFunction<kRpcMethod>(client, channel_id, request), + deadline); +} + +/// Invokes a unary RPC synchronously using the raw API. Blocks until a +/// response is received or the provided deadline arrives. +template <auto kRpcMethod> +Status SynchronousCallUntil( + const typename internal::MethodInfo<kRpcMethod>::GeneratedClient& client, + ConstByteSpan request, + chrono::SystemClock::time_point deadline, + Function<void(ConstByteSpan, Status)>&& on_completed) { + return internal::RawSynchronousCall<kRpcMethod>( + std::move(on_completed), + internal::CallGeneratedClient<kRpcMethod>(client, request), + deadline); +} + } // namespace pw::rpc diff --git a/pw_rpc/raw/BUILD.bazel b/pw_rpc/raw/BUILD.bazel index ac9c10e8c..7d9033296 100644 --- a/pw_rpc/raw/BUILD.bazel +++ b/pw_rpc/raw/BUILD.bazel @@ -195,3 +195,16 @@ pw_cc_test( "//pw_rpc:pw_rpc_test_cc.raw_rpc", ], ) + +pw_cc_test( + name = "synchronous_call_test", + srcs = ["synchronous_call_test.cc"], + deps = [ + ":test_method_context", + "//pw_rpc:pw_rpc_test_cc.raw_rpc", + "//pw_rpc:synchronous_client_api", + "//pw_work_queue", + "//pw_work_queue:stl_test_thread", + "//pw_work_queue:test_thread_header", + ], +) diff --git a/pw_rpc/raw/BUILD.gn b/pw_rpc/raw/BUILD.gn index 48b35bf50..59db92dbf 100644 --- a/pw_rpc/raw/BUILD.gn +++ b/pw_rpc/raw/BUILD.gn @@ -17,6 +17,8 @@ import("//build_overrides/pigweed.gni") import("$dir_pw_build/target_types.gni") import("$dir_pw_compilation_testing/negative_compilation_test.gni") import("$dir_pw_docgen/docs.gni") +import("$dir_pw_sync/backend.gni") +import("$dir_pw_thread/backend.gni") import("$dir_pw_unit_test/test.gni") config("public") { @@ -85,6 +87,7 @@ pw_test_group("tests") { ":method_union_test", ":server_reader_writer_test", ":stub_generation_test", + ":synchronous_call_test", ] } @@ -163,6 +166,19 @@ pw_test("stub_generation_test") { sources = [ "stub_generation_test.cc" ] } +pw_test("synchronous_call_test") { + deps = [ + ":test_method_context", + "$dir_pw_work_queue:pw_work_queue", + "$dir_pw_work_queue:stl_test_thread", + "$dir_pw_work_queue:test_thread", + "..:synchronous_client_api", + "..:test_protos.raw_rpc", + ] + sources = [ "synchronous_call_test.cc" ] + enable_if = pw_sync_TIMED_THREAD_NOTIFICATION_BACKEND != "" +} + pw_cc_negative_compilation_test("service_nc_test") { sources = [ "service_nc_test.cc" ] deps = [ "..:test_protos.raw_rpc" ] diff --git a/pw_rpc/raw/synchronous_call_test.cc b/pw_rpc/raw/synchronous_call_test.cc new file mode 100644 index 000000000..79e92db6d --- /dev/null +++ b/pw_rpc/raw/synchronous_call_test.cc @@ -0,0 +1,237 @@ +// Copyright 2023 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +#include "pw_rpc/synchronous_call.h" + +#include <chrono> +#include <string_view> + +#include "gtest/gtest.h" +#include "pw_assert/check.h" +#include "pw_chrono/system_clock.h" +#include "pw_rpc/channel.h" +#include "pw_rpc/internal/packet.h" +#include "pw_rpc/raw/fake_channel_output.h" +#include "pw_rpc_test_protos/test.raw_rpc.pb.h" +#include "pw_status/status.h" +#include "pw_status/status_with_size.h" +#include "pw_thread/thread.h" +#include "pw_work_queue/test_thread.h" +#include "pw_work_queue/work_queue.h" + +namespace pw::rpc::test { +namespace { + +using ::pw::rpc::test::pw_rpc::raw::TestService; +using MethodInfo = internal::MethodInfo<TestService::TestUnaryRpc>; + +class RawSynchronousCallTest : public ::testing::Test { + public: + RawSynchronousCallTest() + : channels_({{Channel::Create<42>(&fake_output_)}}), client_(channels_) {} + + void SetUp() override { + work_thread_ = + thread::Thread(work_queue::test::WorkQueueThreadOptions(), work_queue_); + } + + void TearDown() override { + work_queue_.RequestStop(); + work_thread_.join(); + } + + protected: + void OnSend(span<const std::byte> buffer, Status status) { + if (!status.ok()) { + return; + } + auto result = internal::Packet::FromBuffer(buffer); + EXPECT_TRUE(result.ok()); + request_packet_ = *result; + + EXPECT_TRUE(work_queue_.PushWork([this]() { SendResponse(); }).ok()); + } + + void SendResponse() { + std::array<std::byte, 256> buffer; + std::array<char, 32> payload_buffer; + + PW_CHECK_UINT_LE(response_.size(), payload_buffer.size()); + size_t size = response_.copy(payload_buffer.data(), payload_buffer.size()); + + auto response = + internal::Packet::Response(request_packet_, response_status_); + response.set_payload(as_bytes(span(payload_buffer.data(), size))); + EXPECT_TRUE(client_.ProcessPacket(response.Encode(buffer).value()).ok()); + } + + void set_response(std::string_view response, + Status response_status = OkStatus()) { + response_ = response; + response_status_ = response_status; + output().set_on_send([this](span<const std::byte> buffer, Status status) { + OnSend(buffer, status); + }); + } + + MethodInfo::GeneratedClient generated_client() { + return MethodInfo::GeneratedClient(client(), channel().id()); + } + + RawFakeChannelOutput<2>& output() { return fake_output_; } + const Channel& channel() const { return channels_.front(); } + Client& client() { return client_; } + + private: + RawFakeChannelOutput<2> fake_output_; + std::array<Channel, 1> channels_; + Client client_; + thread::Thread work_thread_; + work_queue::WorkQueueWithBuffer<1> work_queue_; + std::string_view response_; + Status response_status_ = OkStatus(); + internal::Packet request_packet_; +}; + +template <Status::Code kExpectedStatus = OkStatus().code()> +auto CopyReply(InlineString<32>& reply) { + return [&reply](ConstByteSpan response, Status status) { + EXPECT_EQ(Status(kExpectedStatus), status); + reply.assign(reinterpret_cast<const char*>(response.data()), + response.size()); + }; +} + +void ExpectNoReply(ConstByteSpan, Status) { FAIL(); } + +TEST_F(RawSynchronousCallTest, SynchronousCallSuccess) { + set_response("jicama", OkStatus()); + + InlineString<32> reply; + ASSERT_EQ(OkStatus(), + SynchronousCall<TestService::TestUnaryRpc>( + client(), channel().id(), {}, CopyReply(reply))); + EXPECT_EQ("jicama", reply); +} + +TEST_F(RawSynchronousCallTest, SynchronousCallServerError) { + set_response("raddish", Status::Internal()); + + InlineString<32> reply; + ASSERT_EQ(OkStatus(), + SynchronousCall<TestService::TestUnaryRpc>( + client(), + channel().id(), + {}, + CopyReply<Status::Internal().code()>(reply))); + // We should still receive the response + EXPECT_EQ("raddish", reply); +} + +TEST_F(RawSynchronousCallTest, SynchronousCallRpcError) { + // Internally, if Channel receives a non-ok status from the + // ChannelOutput::Send, it will always return Unknown. + output().set_send_status(Status::Unknown()); + + EXPECT_EQ(Status::Unknown(), + SynchronousCall<TestService::TestUnaryRpc>( + client(), channel().id(), {}, ExpectNoReply)); +} + +TEST_F(RawSynchronousCallTest, SynchronousCallFor) { + set_response("broccoli", Status::NotFound()); + + InlineString<32> reply; + ASSERT_EQ(OkStatus(), + SynchronousCallFor<TestService::TestUnaryRpc>( + client(), + channel().id(), + {}, + chrono::SystemClock::for_at_least(std::chrono::seconds(1)), + [&reply](ConstByteSpan response, Status status) { + EXPECT_EQ(Status::NotFound(), status); + reply.assign(reinterpret_cast<const char*>(response.data()), + response.size()); + })); + EXPECT_EQ("broccoli", reply); +} + +TEST_F(RawSynchronousCallTest, SynchronousCallForTimeoutError) { + ASSERT_EQ(Status::DeadlineExceeded(), + SynchronousCallFor<TestService::TestUnaryRpc>( + client(), + channel().id(), + {}, + chrono::SystemClock::for_at_least(std::chrono::milliseconds(1)), + ExpectNoReply)); +} + +TEST_F(RawSynchronousCallTest, SynchronousCallUntilTimeoutError) { + EXPECT_EQ(Status::DeadlineExceeded(), + SynchronousCallUntil<TestService::TestUnaryRpc>( + client(), + channel().id(), + {}, + chrono::SystemClock::now(), + ExpectNoReply)); +} + +TEST_F(RawSynchronousCallTest, GeneratedClientSynchronousCallSuccess) { + set_response("lettuce", OkStatus()); + + InlineString<32> reply; + EXPECT_EQ(OkStatus(), + SynchronousCall<TestService::TestUnaryRpc>( + generated_client(), {}, CopyReply(reply))); + EXPECT_EQ("lettuce", reply); +} + +TEST_F(RawSynchronousCallTest, GeneratedClientSynchronousCallServerError) { + set_response("cabbage", Status::Internal()); + + InlineString<32> reply; + EXPECT_EQ( + OkStatus(), + SynchronousCall<TestService::TestUnaryRpc>( + generated_client(), {}, CopyReply<Status::Internal().code()>(reply))); + EXPECT_EQ("cabbage", reply); +} + +TEST_F(RawSynchronousCallTest, GeneratedClientSynchronousCallRpcError) { + output().set_send_status(Status::Unknown()); + + EXPECT_EQ(Status::Unknown(), + SynchronousCall<TestService::TestUnaryRpc>( + generated_client(), {}, ExpectNoReply)); +} + +TEST_F(RawSynchronousCallTest, GeneratedClientSynchronousCallForTimeoutError) { + EXPECT_EQ(Status::DeadlineExceeded(), + SynchronousCallFor<TestService::TestUnaryRpc>( + generated_client(), + {}, + chrono::SystemClock::for_at_least(std::chrono::milliseconds(1)), + ExpectNoReply)); +} + +TEST_F(RawSynchronousCallTest, + GeneratedClientSynchronousCallUntilTimeoutError) { + EXPECT_EQ( + Status::DeadlineExceeded(), + SynchronousCallUntil<TestService::TestUnaryRpc>( + generated_client(), {}, chrono::SystemClock::now(), ExpectNoReply)); +} + +} // namespace +} // namespace pw::rpc::test |