aboutsummaryrefslogtreecommitdiff
path: root/pw_rpc/raw/public
diff options
context:
space:
mode:
Diffstat (limited to 'pw_rpc/raw/public')
-rw-r--r--pw_rpc/raw/public/pw_rpc/raw/client_reader_writer.h51
-rw-r--r--pw_rpc/raw/public/pw_rpc/raw/client_testing.h46
-rw-r--r--pw_rpc/raw/public/pw_rpc/raw/internal/method.h4
-rw-r--r--pw_rpc/raw/public/pw_rpc/raw/internal/method_union.h2
-rw-r--r--pw_rpc/raw/public/pw_rpc/raw/server_reader_writer.h87
-rw-r--r--pw_rpc/raw/public/pw_rpc/raw/test_method_context.h7
6 files changed, 123 insertions, 74 deletions
diff --git a/pw_rpc/raw/public/pw_rpc/raw/client_reader_writer.h b/pw_rpc/raw/public/pw_rpc/raw/client_reader_writer.h
index 5e21a59b6..2d780279b 100644
--- a/pw_rpc/raw/public/pw_rpc/raw/client_reader_writer.h
+++ b/pw_rpc/raw/public/pw_rpc/raw/client_reader_writer.h
@@ -1,4 +1,4 @@
-// Copyright 2021 The Pigweed Authors
+// Copyright 2022 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
@@ -38,6 +38,8 @@ class RawClientReaderWriter : private internal::StreamResponseClientCall {
using internal::Call::active;
using internal::Call::channel_id;
+ using internal::ClientCall::id;
+
// Functions for setting the callbacks.
using internal::StreamResponseClientCall::set_on_completed;
using internal::StreamResponseClientCall::set_on_error;
@@ -47,25 +49,35 @@ class RawClientReaderWriter : private internal::StreamResponseClientCall {
using internal::Call::Write;
// Notifies the server that no further client stream messages will be sent.
- using internal::Call::CloseClientStream;
+ using internal::ClientCall::CloseClientStream;
- // Cancels this RPC.
+ // Cancels this RPC. Closes the call locally and sends a CANCELLED error to
+ // the server.
using internal::Call::Cancel;
+ // Closes this RPC locally. Sends a CLIENT_STREAM_END, but no cancellation
+ // packet. Future packets for this RPC are dropped, and the client sends a
+ // FAILED_PRECONDITION error in response because the call is not active.
+ using internal::ClientCall::Abandon;
+
// Allow use as a generic RPC Writer.
using internal::Call::operator Writer&;
using internal::Call::operator const Writer&;
- protected:
+ private:
friend class internal::StreamResponseClientCall;
- RawClientReaderWriter(internal::Endpoint& client,
+ RawClientReaderWriter(internal::LockedEndpoint& client,
uint32_t channel_id,
uint32_t service_id,
- uint32_t method_id,
- MethodType type = MethodType::kBidirectionalStreaming)
+ uint32_t method_id)
+ PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock())
: StreamResponseClientCall(
- client, channel_id, service_id, method_id, type) {}
+ client,
+ channel_id,
+ service_id,
+ method_id,
+ RawCallProps(MethodType::kBidirectionalStreaming)) {}
};
// Handles responses for a server streaming RPC.
@@ -84,19 +96,21 @@ class RawClientReader : private internal::StreamResponseClientCall {
using internal::StreamResponseClientCall::set_on_next;
using internal::Call::Cancel;
+ using internal::ClientCall::Abandon;
private:
friend class internal::StreamResponseClientCall;
- RawClientReader(internal::Endpoint& client,
+ RawClientReader(internal::LockedEndpoint& client,
uint32_t channel_id,
uint32_t service_id,
uint32_t method_id)
+ PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock())
: StreamResponseClientCall(client,
channel_id,
service_id,
method_id,
- MethodType::kServerStreaming) {}
+ RawCallProps(MethodType::kServerStreaming)) {}
};
// Sends requests and handles the response for a client streaming RPC.
@@ -116,6 +130,7 @@ class RawClientWriter : private internal::UnaryResponseClientCall {
using internal::Call::Cancel;
using internal::Call::CloseClientStream;
using internal::Call::Write;
+ using internal::ClientCall::Abandon;
// Allow use as a generic RPC Writer.
using internal::Call::operator Writer&;
@@ -124,15 +139,16 @@ class RawClientWriter : private internal::UnaryResponseClientCall {
private:
friend class internal::UnaryResponseClientCall;
- RawClientWriter(internal::Endpoint& client,
+ RawClientWriter(internal::LockedEndpoint& client,
uint32_t channel_id,
uint32_t service_id,
uint32_t method_id)
+ PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock())
: UnaryResponseClientCall(client,
channel_id,
service_id,
method_id,
- MethodType::kClientStreaming) {}
+ RawCallProps(MethodType::kClientStreaming)) {}
};
// Handles the response for to unary RPC.
@@ -149,17 +165,22 @@ class RawUnaryReceiver : private internal::UnaryResponseClientCall {
using internal::UnaryResponseClientCall::set_on_completed;
using internal::UnaryResponseClientCall::set_on_error;
+ using internal::ClientCall::Abandon;
using internal::UnaryResponseClientCall::Cancel;
private:
friend class internal::UnaryResponseClientCall;
- RawUnaryReceiver(internal::Endpoint& client,
+ RawUnaryReceiver(internal::LockedEndpoint& client,
uint32_t channel_id,
uint32_t service_id,
uint32_t method_id)
- : UnaryResponseClientCall(
- client, channel_id, service_id, method_id, MethodType::kUnary) {}
+ PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock())
+ : UnaryResponseClientCall(client,
+ channel_id,
+ service_id,
+ method_id,
+ RawCallProps(MethodType::kUnary)) {}
};
} // namespace pw::rpc
diff --git a/pw_rpc/raw/public/pw_rpc/raw/client_testing.h b/pw_rpc/raw/public/pw_rpc/raw/client_testing.h
index 436e59e8e..dbda24d58 100644
--- a/pw_rpc/raw/public/pw_rpc/raw/client_testing.h
+++ b/pw_rpc/raw/public/pw_rpc/raw/client_testing.h
@@ -26,7 +26,7 @@
namespace pw::rpc {
-// TODO(pwbug/477): Document the client testing APIs.
+// TODO(b/234878467): Document the client testing APIs.
// Sends packets to an RPC client as if it were a pw_rpc server.
class FakeServer {
@@ -45,8 +45,10 @@ class FakeServer {
template <auto kMethod,
typename = std::enable_if_t<
HasServerStream(internal::MethodInfo<kMethod>::kType)>>
- void SendResponse(Status status) const {
- SendPacket<kMethod>(internal::PacketType::RESPONSE, {}, status);
+ void SendResponse(Status status,
+ std::optional<uint32_t> call_id = std::nullopt) const {
+ SendPacket<kMethod>(
+ internal::pwpb::PacketType::RESPONSE, {}, status, call_id);
}
// Sends a response packet for a unary or client streaming streaming RPC to
@@ -54,45 +56,57 @@ class FakeServer {
template <auto kMethod,
typename = std::enable_if_t<
!HasServerStream(internal::MethodInfo<kMethod>::kType)>>
- void SendResponse(ConstByteSpan payload, Status status) const {
- SendPacket<kMethod>(internal::PacketType::RESPONSE, payload, status);
+ void SendResponse(ConstByteSpan payload,
+ Status status,
+ std::optional<uint32_t> call_id = std::nullopt) const {
+ SendPacket<kMethod>(
+ internal::pwpb::PacketType::RESPONSE, payload, status, call_id);
}
// Sends a stream packet for a server or bidirectional streaming RPC to the
// client.
template <auto kMethod>
- void SendServerStream(ConstByteSpan payload) const {
+ void SendServerStream(ConstByteSpan payload,
+ std::optional<uint32_t> call_id = std::nullopt) const {
static_assert(HasServerStream(internal::MethodInfo<kMethod>::kType),
"Only server and bidirectional streaming methods can receive "
"server stream packets");
- SendPacket<kMethod>(internal::PacketType::SERVER_STREAM, payload);
+ SendPacket<kMethod>(internal::pwpb::PacketType::SERVER_STREAM,
+ payload,
+ OkStatus(),
+ call_id);
}
// Sends a server error packet to the client.
template <auto kMethod>
- void SendServerError(Status error) const {
- SendPacket<kMethod>(internal::PacketType::SERVER_ERROR, {}, error);
+ void SendServerError(Status error,
+ std::optional<uint32_t> call_id = std::nullopt) const {
+ SendPacket<kMethod>(
+ internal::pwpb::PacketType::SERVER_ERROR, {}, error, call_id);
}
private:
template <auto kMethod>
- void SendPacket(internal::PacketType type,
- ConstByteSpan payload = {},
- Status status = OkStatus()) const {
+ void SendPacket(internal::pwpb::PacketType type,
+ ConstByteSpan payload,
+ Status status,
+ std::optional<uint32_t> call_id) const {
using Info = internal::MethodInfo<kMethod>;
CheckProcessPacket(
- type, Info::kServiceId, Info::kMethodId, payload, status);
+ type, Info::kServiceId, Info::kMethodId, call_id, payload, status);
}
- void CheckProcessPacket(internal::PacketType type,
+ void CheckProcessPacket(internal::pwpb::PacketType type,
uint32_t service_id,
uint32_t method_id,
+ std::optional<uint32_t> call_id,
ConstByteSpan payload,
Status status) const;
- Status ProcessPacket(internal::PacketType type,
+ Status ProcessPacket(internal::pwpb::PacketType type,
uint32_t service_id,
uint32_t method_id,
+ std::optional<uint32_t> call_id,
ConstByteSpan payload,
Status status) const;
@@ -114,7 +128,7 @@ class RawClientTestContext {
constexpr RawClientTestContext()
: channel_(Channel::Create<kDefaultChannelId>(&channel_output_)),
- client_(std::span(&channel_, 1)),
+ client_(span(&channel_, 1)),
packet_buffer_{},
fake_server_(
channel_output_, client_, kDefaultChannelId, packet_buffer_) {}
diff --git a/pw_rpc/raw/public/pw_rpc/raw/internal/method.h b/pw_rpc/raw/public/pw_rpc/raw/internal/method.h
index 0ab022222..0d02d4a1c 100644
--- a/pw_rpc/raw/public/pw_rpc/raw/internal/method.h
+++ b/pw_rpc/raw/public/pw_rpc/raw/internal/method.h
@@ -115,10 +115,6 @@ class RawMethod : public Method {
constexpr RawMethod(uint32_t id, Invoker invoker, Function function)
: Method(id, invoker), function_(function) {}
- static void SynchronousUnaryInvoker(const CallContext& context,
- const Packet& request)
- PW_UNLOCK_FUNCTION(rpc_lock());
-
static void AsynchronousUnaryInvoker(const CallContext& context,
const Packet& request)
PW_UNLOCK_FUNCTION(rpc_lock());
diff --git a/pw_rpc/raw/public/pw_rpc/raw/internal/method_union.h b/pw_rpc/raw/public/pw_rpc/raw/internal/method_union.h
index d3eb2bc88..4158b0be7 100644
--- a/pw_rpc/raw/public/pw_rpc/raw/internal/method_union.h
+++ b/pw_rpc/raw/public/pw_rpc/raw/internal/method_union.h
@@ -45,6 +45,6 @@ constexpr RawMethod GetRawMethodFor(uint32_t id) {
} else {
return InvalidMethod<kMethod, kType, RawMethod>(id);
}
-};
+}
} // namespace pw::rpc::internal
diff --git a/pw_rpc/raw/public/pw_rpc/raw/server_reader_writer.h b/pw_rpc/raw/public/pw_rpc/raw/server_reader_writer.h
index 8ff018e55..4de87fc03 100644
--- a/pw_rpc/raw/public/pw_rpc/raw/server_reader_writer.h
+++ b/pw_rpc/raw/public/pw_rpc/raw/server_reader_writer.h
@@ -57,14 +57,16 @@ class RawServerReaderWriter : private internal::ServerCall {
template <auto kMethod, typename ServiceImpl>
[[nodiscard]] static RawServerReaderWriter Open(Server& server,
uint32_t channel_id,
- ServiceImpl& service) {
- internal::LockGuard lock(internal::rpc_lock());
- return {server.OpenContext<kMethod, MethodType::kBidirectionalStreaming>(
+ ServiceImpl& service)
+ PW_LOCKS_EXCLUDED(internal::rpc_lock()) {
+ return server.OpenCall<RawServerReaderWriter,
+ kMethod,
+ MethodType::kBidirectionalStreaming>(
channel_id,
service,
internal::MethodLookup::GetRawMethod<
ServiceImpl,
- internal::MethodInfo<kMethod>::kMethodId>())};
+ internal::MethodInfo<kMethod>::kMethodId>());
}
using internal::Call::active;
@@ -87,17 +89,26 @@ class RawServerReaderWriter : private internal::ServerCall {
using internal::Call::operator const Writer&;
protected:
- RawServerReaderWriter(const internal::CallContext& context,
- MethodType type = MethodType::kBidirectionalStreaming)
- : internal::ServerCall(context, type) {}
+ RawServerReaderWriter(const internal::LockedCallContext& context,
+ MethodType type)
+ PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock())
+ : internal::ServerCall(
+ context,
+ internal::CallProperties(
+ type, internal::kServerCall, internal::kRawProto)) {}
using internal::Call::CloseAndSendResponse;
private:
friend class internal::RawMethod; // Needed to construct
+ friend class Server;
template <typename, typename, uint32_t>
friend class internal::test::InvocationContext;
+
+ // Private constructor for test use
+ RawServerReaderWriter(const internal::LockedCallContext& context)
+ : RawServerReaderWriter(context, MethodType::kBidirectionalStreaming) {}
};
// The RawServerReader is used to receive messages and send a response in a
@@ -110,14 +121,15 @@ class RawServerReader : private RawServerReaderWriter {
template <auto kMethod, typename ServiceImpl>
[[nodiscard]] static RawServerReader Open(Server& server,
uint32_t channel_id,
- ServiceImpl& service) {
- internal::LockGuard lock(internal::rpc_lock());
- return {server.OpenContext<kMethod, MethodType::kClientStreaming>(
- channel_id,
- service,
- internal::MethodLookup::GetRawMethod<
- ServiceImpl,
- internal::MethodInfo<kMethod>::kMethodId>())};
+ ServiceImpl& service)
+ PW_LOCKS_EXCLUDED(internal::rpc_lock()) {
+ return server
+ .OpenCall<RawServerReader, kMethod, MethodType::kClientStreaming>(
+ channel_id,
+ service,
+ internal::MethodLookup::GetRawMethod<
+ ServiceImpl,
+ internal::MethodInfo<kMethod>::kMethodId>());
}
constexpr RawServerReader() = default;
@@ -138,11 +150,13 @@ class RawServerReader : private RawServerReaderWriter {
private:
friend class internal::RawMethod; // Needed for conversions from ReaderWriter
+ friend class Server;
template <typename, typename, uint32_t>
friend class internal::test::InvocationContext;
- RawServerReader(const internal::CallContext& context)
+ RawServerReader(const internal::LockedCallContext& context)
+ PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock())
: RawServerReaderWriter(context, MethodType::kClientStreaming) {}
};
@@ -155,14 +169,15 @@ class RawServerWriter : private RawServerReaderWriter {
template <auto kMethod, typename ServiceImpl>
[[nodiscard]] static RawServerWriter Open(Server& server,
uint32_t channel_id,
- ServiceImpl& service) {
- internal::LockGuard lock(internal::rpc_lock());
- return {server.OpenContext<kMethod, MethodType::kServerStreaming>(
- channel_id,
- service,
- internal::MethodLookup::GetRawMethod<
- ServiceImpl,
- internal::MethodInfo<kMethod>::kMethodId>())};
+ ServiceImpl& service)
+ PW_LOCKS_EXCLUDED(internal::rpc_lock()) {
+ return server
+ .OpenCall<RawServerWriter, kMethod, MethodType::kServerStreaming>(
+ channel_id,
+ service,
+ internal::MethodLookup::GetRawMethod<
+ ServiceImpl,
+ internal::MethodInfo<kMethod>::kMethodId>());
}
constexpr RawServerWriter() = default;
@@ -183,12 +198,14 @@ class RawServerWriter : private RawServerReaderWriter {
using internal::Call::operator const Writer&;
private:
+ friend class internal::RawMethod;
+ friend class Server;
+
template <typename, typename, uint32_t>
friend class internal::test::InvocationContext;
- friend class internal::RawMethod;
-
- RawServerWriter(const internal::CallContext& context)
+ RawServerWriter(const internal::LockedCallContext& context)
+ PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock())
: RawServerReaderWriter(context, MethodType::kServerStreaming) {}
};
@@ -201,14 +218,14 @@ class RawUnaryResponder : private RawServerReaderWriter {
template <auto kMethod, typename ServiceImpl>
[[nodiscard]] static RawUnaryResponder Open(Server& server,
uint32_t channel_id,
- ServiceImpl& service) {
- internal::LockGuard lock(internal::rpc_lock());
- return {server.OpenContext<kMethod, MethodType::kUnary>(
+ ServiceImpl& service)
+ PW_LOCKS_EXCLUDED(internal::rpc_lock()) {
+ return server.OpenCall<RawUnaryResponder, kMethod, MethodType::kUnary>(
channel_id,
service,
internal::MethodLookup::GetRawMethod<
ServiceImpl,
- internal::MethodInfo<kMethod>::kMethodId>())};
+ internal::MethodInfo<kMethod>::kMethodId>());
}
constexpr RawUnaryResponder() = default;
@@ -226,12 +243,14 @@ class RawUnaryResponder : private RawServerReaderWriter {
}
private:
+ friend class internal::RawMethod;
+ friend class Server;
+
template <typename, typename, uint32_t>
friend class internal::test::InvocationContext;
- friend class internal::RawMethod;
-
- RawUnaryResponder(const internal::CallContext& context)
+ RawUnaryResponder(const internal::LockedCallContext& context)
+ PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock())
: RawServerReaderWriter(context, MethodType::kUnary) {}
};
diff --git a/pw_rpc/raw/public/pw_rpc/raw/test_method_context.h b/pw_rpc/raw/public/pw_rpc/raw/test_method_context.h
index e91306604..fca027b5d 100644
--- a/pw_rpc/raw/public/pw_rpc/raw/test_method_context.h
+++ b/pw_rpc/raw/public/pw_rpc/raw/test_method_context.h
@@ -137,10 +137,9 @@ class UnaryContext
auto responder = Base::template GetResponder<RawUnaryResponder>();
std::byte response[kSynchronousResponseBufferSizeBytes] = {};
auto sws = CallMethodImplFunction<kMethod>(
- Base::service(), request, std::span(response));
- PW_ASSERT(
- responder.Finish(std::span(response).first(sws.size()), sws.status())
- .ok());
+ Base::service(), request, span(response));
+ PW_ASSERT(responder.Finish(span(response).first(sws.size()), sws.status())
+ .ok());
return sws;
} else {
Base::template call<kMethod, RawUnaryResponder>(request);