diff options
Diffstat (limited to 'pw_rpc/raw/public')
-rw-r--r-- | pw_rpc/raw/public/pw_rpc/raw/client_reader_writer.h | 51 | ||||
-rw-r--r-- | pw_rpc/raw/public/pw_rpc/raw/client_testing.h | 46 | ||||
-rw-r--r-- | pw_rpc/raw/public/pw_rpc/raw/internal/method.h | 4 | ||||
-rw-r--r-- | pw_rpc/raw/public/pw_rpc/raw/internal/method_union.h | 2 | ||||
-rw-r--r-- | pw_rpc/raw/public/pw_rpc/raw/server_reader_writer.h | 87 | ||||
-rw-r--r-- | pw_rpc/raw/public/pw_rpc/raw/test_method_context.h | 7 |
6 files changed, 123 insertions, 74 deletions
diff --git a/pw_rpc/raw/public/pw_rpc/raw/client_reader_writer.h b/pw_rpc/raw/public/pw_rpc/raw/client_reader_writer.h index 5e21a59b6..2d780279b 100644 --- a/pw_rpc/raw/public/pw_rpc/raw/client_reader_writer.h +++ b/pw_rpc/raw/public/pw_rpc/raw/client_reader_writer.h @@ -1,4 +1,4 @@ -// Copyright 2021 The Pigweed Authors +// Copyright 2022 The Pigweed Authors // // Licensed under the Apache License, Version 2.0 (the "License"); you may not // use this file except in compliance with the License. You may obtain a copy of @@ -38,6 +38,8 @@ class RawClientReaderWriter : private internal::StreamResponseClientCall { using internal::Call::active; using internal::Call::channel_id; + using internal::ClientCall::id; + // Functions for setting the callbacks. using internal::StreamResponseClientCall::set_on_completed; using internal::StreamResponseClientCall::set_on_error; @@ -47,25 +49,35 @@ class RawClientReaderWriter : private internal::StreamResponseClientCall { using internal::Call::Write; // Notifies the server that no further client stream messages will be sent. - using internal::Call::CloseClientStream; + using internal::ClientCall::CloseClientStream; - // Cancels this RPC. + // Cancels this RPC. Closes the call locally and sends a CANCELLED error to + // the server. using internal::Call::Cancel; + // Closes this RPC locally. Sends a CLIENT_STREAM_END, but no cancellation + // packet. Future packets for this RPC are dropped, and the client sends a + // FAILED_PRECONDITION error in response because the call is not active. + using internal::ClientCall::Abandon; + // Allow use as a generic RPC Writer. using internal::Call::operator Writer&; using internal::Call::operator const Writer&; - protected: + private: friend class internal::StreamResponseClientCall; - RawClientReaderWriter(internal::Endpoint& client, + RawClientReaderWriter(internal::LockedEndpoint& client, uint32_t channel_id, uint32_t service_id, - uint32_t method_id, - MethodType type = MethodType::kBidirectionalStreaming) + uint32_t method_id) + PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock()) : StreamResponseClientCall( - client, channel_id, service_id, method_id, type) {} + client, + channel_id, + service_id, + method_id, + RawCallProps(MethodType::kBidirectionalStreaming)) {} }; // Handles responses for a server streaming RPC. @@ -84,19 +96,21 @@ class RawClientReader : private internal::StreamResponseClientCall { using internal::StreamResponseClientCall::set_on_next; using internal::Call::Cancel; + using internal::ClientCall::Abandon; private: friend class internal::StreamResponseClientCall; - RawClientReader(internal::Endpoint& client, + RawClientReader(internal::LockedEndpoint& client, uint32_t channel_id, uint32_t service_id, uint32_t method_id) + PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock()) : StreamResponseClientCall(client, channel_id, service_id, method_id, - MethodType::kServerStreaming) {} + RawCallProps(MethodType::kServerStreaming)) {} }; // Sends requests and handles the response for a client streaming RPC. @@ -116,6 +130,7 @@ class RawClientWriter : private internal::UnaryResponseClientCall { using internal::Call::Cancel; using internal::Call::CloseClientStream; using internal::Call::Write; + using internal::ClientCall::Abandon; // Allow use as a generic RPC Writer. using internal::Call::operator Writer&; @@ -124,15 +139,16 @@ class RawClientWriter : private internal::UnaryResponseClientCall { private: friend class internal::UnaryResponseClientCall; - RawClientWriter(internal::Endpoint& client, + RawClientWriter(internal::LockedEndpoint& client, uint32_t channel_id, uint32_t service_id, uint32_t method_id) + PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock()) : UnaryResponseClientCall(client, channel_id, service_id, method_id, - MethodType::kClientStreaming) {} + RawCallProps(MethodType::kClientStreaming)) {} }; // Handles the response for to unary RPC. @@ -149,17 +165,22 @@ class RawUnaryReceiver : private internal::UnaryResponseClientCall { using internal::UnaryResponseClientCall::set_on_completed; using internal::UnaryResponseClientCall::set_on_error; + using internal::ClientCall::Abandon; using internal::UnaryResponseClientCall::Cancel; private: friend class internal::UnaryResponseClientCall; - RawUnaryReceiver(internal::Endpoint& client, + RawUnaryReceiver(internal::LockedEndpoint& client, uint32_t channel_id, uint32_t service_id, uint32_t method_id) - : UnaryResponseClientCall( - client, channel_id, service_id, method_id, MethodType::kUnary) {} + PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock()) + : UnaryResponseClientCall(client, + channel_id, + service_id, + method_id, + RawCallProps(MethodType::kUnary)) {} }; } // namespace pw::rpc diff --git a/pw_rpc/raw/public/pw_rpc/raw/client_testing.h b/pw_rpc/raw/public/pw_rpc/raw/client_testing.h index 436e59e8e..dbda24d58 100644 --- a/pw_rpc/raw/public/pw_rpc/raw/client_testing.h +++ b/pw_rpc/raw/public/pw_rpc/raw/client_testing.h @@ -26,7 +26,7 @@ namespace pw::rpc { -// TODO(pwbug/477): Document the client testing APIs. +// TODO(b/234878467): Document the client testing APIs. // Sends packets to an RPC client as if it were a pw_rpc server. class FakeServer { @@ -45,8 +45,10 @@ class FakeServer { template <auto kMethod, typename = std::enable_if_t< HasServerStream(internal::MethodInfo<kMethod>::kType)>> - void SendResponse(Status status) const { - SendPacket<kMethod>(internal::PacketType::RESPONSE, {}, status); + void SendResponse(Status status, + std::optional<uint32_t> call_id = std::nullopt) const { + SendPacket<kMethod>( + internal::pwpb::PacketType::RESPONSE, {}, status, call_id); } // Sends a response packet for a unary or client streaming streaming RPC to @@ -54,45 +56,57 @@ class FakeServer { template <auto kMethod, typename = std::enable_if_t< !HasServerStream(internal::MethodInfo<kMethod>::kType)>> - void SendResponse(ConstByteSpan payload, Status status) const { - SendPacket<kMethod>(internal::PacketType::RESPONSE, payload, status); + void SendResponse(ConstByteSpan payload, + Status status, + std::optional<uint32_t> call_id = std::nullopt) const { + SendPacket<kMethod>( + internal::pwpb::PacketType::RESPONSE, payload, status, call_id); } // Sends a stream packet for a server or bidirectional streaming RPC to the // client. template <auto kMethod> - void SendServerStream(ConstByteSpan payload) const { + void SendServerStream(ConstByteSpan payload, + std::optional<uint32_t> call_id = std::nullopt) const { static_assert(HasServerStream(internal::MethodInfo<kMethod>::kType), "Only server and bidirectional streaming methods can receive " "server stream packets"); - SendPacket<kMethod>(internal::PacketType::SERVER_STREAM, payload); + SendPacket<kMethod>(internal::pwpb::PacketType::SERVER_STREAM, + payload, + OkStatus(), + call_id); } // Sends a server error packet to the client. template <auto kMethod> - void SendServerError(Status error) const { - SendPacket<kMethod>(internal::PacketType::SERVER_ERROR, {}, error); + void SendServerError(Status error, + std::optional<uint32_t> call_id = std::nullopt) const { + SendPacket<kMethod>( + internal::pwpb::PacketType::SERVER_ERROR, {}, error, call_id); } private: template <auto kMethod> - void SendPacket(internal::PacketType type, - ConstByteSpan payload = {}, - Status status = OkStatus()) const { + void SendPacket(internal::pwpb::PacketType type, + ConstByteSpan payload, + Status status, + std::optional<uint32_t> call_id) const { using Info = internal::MethodInfo<kMethod>; CheckProcessPacket( - type, Info::kServiceId, Info::kMethodId, payload, status); + type, Info::kServiceId, Info::kMethodId, call_id, payload, status); } - void CheckProcessPacket(internal::PacketType type, + void CheckProcessPacket(internal::pwpb::PacketType type, uint32_t service_id, uint32_t method_id, + std::optional<uint32_t> call_id, ConstByteSpan payload, Status status) const; - Status ProcessPacket(internal::PacketType type, + Status ProcessPacket(internal::pwpb::PacketType type, uint32_t service_id, uint32_t method_id, + std::optional<uint32_t> call_id, ConstByteSpan payload, Status status) const; @@ -114,7 +128,7 @@ class RawClientTestContext { constexpr RawClientTestContext() : channel_(Channel::Create<kDefaultChannelId>(&channel_output_)), - client_(std::span(&channel_, 1)), + client_(span(&channel_, 1)), packet_buffer_{}, fake_server_( channel_output_, client_, kDefaultChannelId, packet_buffer_) {} diff --git a/pw_rpc/raw/public/pw_rpc/raw/internal/method.h b/pw_rpc/raw/public/pw_rpc/raw/internal/method.h index 0ab022222..0d02d4a1c 100644 --- a/pw_rpc/raw/public/pw_rpc/raw/internal/method.h +++ b/pw_rpc/raw/public/pw_rpc/raw/internal/method.h @@ -115,10 +115,6 @@ class RawMethod : public Method { constexpr RawMethod(uint32_t id, Invoker invoker, Function function) : Method(id, invoker), function_(function) {} - static void SynchronousUnaryInvoker(const CallContext& context, - const Packet& request) - PW_UNLOCK_FUNCTION(rpc_lock()); - static void AsynchronousUnaryInvoker(const CallContext& context, const Packet& request) PW_UNLOCK_FUNCTION(rpc_lock()); diff --git a/pw_rpc/raw/public/pw_rpc/raw/internal/method_union.h b/pw_rpc/raw/public/pw_rpc/raw/internal/method_union.h index d3eb2bc88..4158b0be7 100644 --- a/pw_rpc/raw/public/pw_rpc/raw/internal/method_union.h +++ b/pw_rpc/raw/public/pw_rpc/raw/internal/method_union.h @@ -45,6 +45,6 @@ constexpr RawMethod GetRawMethodFor(uint32_t id) { } else { return InvalidMethod<kMethod, kType, RawMethod>(id); } -}; +} } // namespace pw::rpc::internal diff --git a/pw_rpc/raw/public/pw_rpc/raw/server_reader_writer.h b/pw_rpc/raw/public/pw_rpc/raw/server_reader_writer.h index 8ff018e55..4de87fc03 100644 --- a/pw_rpc/raw/public/pw_rpc/raw/server_reader_writer.h +++ b/pw_rpc/raw/public/pw_rpc/raw/server_reader_writer.h @@ -57,14 +57,16 @@ class RawServerReaderWriter : private internal::ServerCall { template <auto kMethod, typename ServiceImpl> [[nodiscard]] static RawServerReaderWriter Open(Server& server, uint32_t channel_id, - ServiceImpl& service) { - internal::LockGuard lock(internal::rpc_lock()); - return {server.OpenContext<kMethod, MethodType::kBidirectionalStreaming>( + ServiceImpl& service) + PW_LOCKS_EXCLUDED(internal::rpc_lock()) { + return server.OpenCall<RawServerReaderWriter, + kMethod, + MethodType::kBidirectionalStreaming>( channel_id, service, internal::MethodLookup::GetRawMethod< ServiceImpl, - internal::MethodInfo<kMethod>::kMethodId>())}; + internal::MethodInfo<kMethod>::kMethodId>()); } using internal::Call::active; @@ -87,17 +89,26 @@ class RawServerReaderWriter : private internal::ServerCall { using internal::Call::operator const Writer&; protected: - RawServerReaderWriter(const internal::CallContext& context, - MethodType type = MethodType::kBidirectionalStreaming) - : internal::ServerCall(context, type) {} + RawServerReaderWriter(const internal::LockedCallContext& context, + MethodType type) + PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock()) + : internal::ServerCall( + context, + internal::CallProperties( + type, internal::kServerCall, internal::kRawProto)) {} using internal::Call::CloseAndSendResponse; private: friend class internal::RawMethod; // Needed to construct + friend class Server; template <typename, typename, uint32_t> friend class internal::test::InvocationContext; + + // Private constructor for test use + RawServerReaderWriter(const internal::LockedCallContext& context) + : RawServerReaderWriter(context, MethodType::kBidirectionalStreaming) {} }; // The RawServerReader is used to receive messages and send a response in a @@ -110,14 +121,15 @@ class RawServerReader : private RawServerReaderWriter { template <auto kMethod, typename ServiceImpl> [[nodiscard]] static RawServerReader Open(Server& server, uint32_t channel_id, - ServiceImpl& service) { - internal::LockGuard lock(internal::rpc_lock()); - return {server.OpenContext<kMethod, MethodType::kClientStreaming>( - channel_id, - service, - internal::MethodLookup::GetRawMethod< - ServiceImpl, - internal::MethodInfo<kMethod>::kMethodId>())}; + ServiceImpl& service) + PW_LOCKS_EXCLUDED(internal::rpc_lock()) { + return server + .OpenCall<RawServerReader, kMethod, MethodType::kClientStreaming>( + channel_id, + service, + internal::MethodLookup::GetRawMethod< + ServiceImpl, + internal::MethodInfo<kMethod>::kMethodId>()); } constexpr RawServerReader() = default; @@ -138,11 +150,13 @@ class RawServerReader : private RawServerReaderWriter { private: friend class internal::RawMethod; // Needed for conversions from ReaderWriter + friend class Server; template <typename, typename, uint32_t> friend class internal::test::InvocationContext; - RawServerReader(const internal::CallContext& context) + RawServerReader(const internal::LockedCallContext& context) + PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock()) : RawServerReaderWriter(context, MethodType::kClientStreaming) {} }; @@ -155,14 +169,15 @@ class RawServerWriter : private RawServerReaderWriter { template <auto kMethod, typename ServiceImpl> [[nodiscard]] static RawServerWriter Open(Server& server, uint32_t channel_id, - ServiceImpl& service) { - internal::LockGuard lock(internal::rpc_lock()); - return {server.OpenContext<kMethod, MethodType::kServerStreaming>( - channel_id, - service, - internal::MethodLookup::GetRawMethod< - ServiceImpl, - internal::MethodInfo<kMethod>::kMethodId>())}; + ServiceImpl& service) + PW_LOCKS_EXCLUDED(internal::rpc_lock()) { + return server + .OpenCall<RawServerWriter, kMethod, MethodType::kServerStreaming>( + channel_id, + service, + internal::MethodLookup::GetRawMethod< + ServiceImpl, + internal::MethodInfo<kMethod>::kMethodId>()); } constexpr RawServerWriter() = default; @@ -183,12 +198,14 @@ class RawServerWriter : private RawServerReaderWriter { using internal::Call::operator const Writer&; private: + friend class internal::RawMethod; + friend class Server; + template <typename, typename, uint32_t> friend class internal::test::InvocationContext; - friend class internal::RawMethod; - - RawServerWriter(const internal::CallContext& context) + RawServerWriter(const internal::LockedCallContext& context) + PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock()) : RawServerReaderWriter(context, MethodType::kServerStreaming) {} }; @@ -201,14 +218,14 @@ class RawUnaryResponder : private RawServerReaderWriter { template <auto kMethod, typename ServiceImpl> [[nodiscard]] static RawUnaryResponder Open(Server& server, uint32_t channel_id, - ServiceImpl& service) { - internal::LockGuard lock(internal::rpc_lock()); - return {server.OpenContext<kMethod, MethodType::kUnary>( + ServiceImpl& service) + PW_LOCKS_EXCLUDED(internal::rpc_lock()) { + return server.OpenCall<RawUnaryResponder, kMethod, MethodType::kUnary>( channel_id, service, internal::MethodLookup::GetRawMethod< ServiceImpl, - internal::MethodInfo<kMethod>::kMethodId>())}; + internal::MethodInfo<kMethod>::kMethodId>()); } constexpr RawUnaryResponder() = default; @@ -226,12 +243,14 @@ class RawUnaryResponder : private RawServerReaderWriter { } private: + friend class internal::RawMethod; + friend class Server; + template <typename, typename, uint32_t> friend class internal::test::InvocationContext; - friend class internal::RawMethod; - - RawUnaryResponder(const internal::CallContext& context) + RawUnaryResponder(const internal::LockedCallContext& context) + PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock()) : RawServerReaderWriter(context, MethodType::kUnary) {} }; diff --git a/pw_rpc/raw/public/pw_rpc/raw/test_method_context.h b/pw_rpc/raw/public/pw_rpc/raw/test_method_context.h index e91306604..fca027b5d 100644 --- a/pw_rpc/raw/public/pw_rpc/raw/test_method_context.h +++ b/pw_rpc/raw/public/pw_rpc/raw/test_method_context.h @@ -137,10 +137,9 @@ class UnaryContext auto responder = Base::template GetResponder<RawUnaryResponder>(); std::byte response[kSynchronousResponseBufferSizeBytes] = {}; auto sws = CallMethodImplFunction<kMethod>( - Base::service(), request, std::span(response)); - PW_ASSERT( - responder.Finish(std::span(response).first(sws.size()), sws.status()) - .ok()); + Base::service(), request, span(response)); + PW_ASSERT(responder.Finish(span(response).first(sws.size()), sws.status()) + .ok()); return sws; } else { Base::template call<kMethod, RawUnaryResponder>(request); |