aboutsummaryrefslogtreecommitdiff
path: root/pw_rpc_transport/public/pw_rpc_transport/egress_ingress.h
diff options
context:
space:
mode:
Diffstat (limited to 'pw_rpc_transport/public/pw_rpc_transport/egress_ingress.h')
-rw-r--r--pw_rpc_transport/public/pw_rpc_transport/egress_ingress.h170
1 files changed, 170 insertions, 0 deletions
diff --git a/pw_rpc_transport/public/pw_rpc_transport/egress_ingress.h b/pw_rpc_transport/public/pw_rpc_transport/egress_ingress.h
new file mode 100644
index 000000000..d263081c6
--- /dev/null
+++ b/pw_rpc_transport/public/pw_rpc_transport/egress_ingress.h
@@ -0,0 +1,170 @@
+// Copyright 2023 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#pragma once
+
+#include <sys/types.h>
+
+#include <mutex>
+
+#include "pw_bytes/span.h"
+#include "pw_metric/metric.h"
+#include "pw_rpc/channel.h"
+#include "pw_rpc/packet_meta.h"
+#include "pw_rpc_transport/hdlc_framing.h"
+#include "pw_rpc_transport/rpc_transport.h"
+#include "pw_rpc_transport/simple_framing.h"
+#include "pw_status/status.h"
+#include "pw_sync/lock_annotations.h"
+#include "pw_sync/mutex.h"
+#include "rpc_transport.h"
+
+namespace pw::rpc {
+namespace internal {
+
+void LogBadPacket();
+void LogChannelIdOverflow(uint32_t channel_id, uint32_t max_channel_id);
+void LogMissingEgressForChannel(uint32_t channel_id);
+void LogIngressSendFailure(uint32_t channel_id, Status status);
+
+} // namespace internal
+
+// Ties RPC transport and RPC frame encoder together.
+template <typename Encoder>
+class RpcEgress : public RpcEgressHandler, public ChannelOutput {
+ public:
+ RpcEgress(std::string_view channel_name, RpcFrameSender& transport)
+ : ChannelOutput(channel_name.data()), transport_(transport) {}
+
+ // Implements both rpc::ChannelOutput and RpcEgressHandler. Encodes the
+ // provided packet using the target transport's MTU as max frame size and
+ // sends it over that transport.
+ //
+ // Sending a packet may result in multiple RpcTransport::Write calls which
+ // must not be interleaved in order for the packet to be successfully
+ // reassembled from the transport-level frames by the receiver. RpcEgress
+ // is using a mutex to ensure this. Technically we could just rely on pw_rpc
+ // global lock but that would unnecessarily couple transport logic to pw_rpc
+ // internals.
+ Status SendRpcPacket(ConstByteSpan rpc_packet) override {
+ std::lock_guard lock(mutex_);
+ return encoder_.Encode(rpc_packet,
+ transport_.MaximumTransmissionUnit(),
+ [this](RpcFrame& frame) {
+ // Encoders must call this callback inline so that
+ // we're still holding `mutex_` here. Unfortunately
+ // the lock annotations cannot be used on
+ // `transport_` to enforce this.
+ return transport_.Send(frame);
+ });
+ }
+
+ // Implements ChannelOutput.
+ Status Send(ConstByteSpan buffer) override { return SendRpcPacket(buffer); }
+
+ private:
+ sync::Mutex mutex_;
+ RpcFrameSender& transport_;
+ Encoder encoder_ PW_GUARDED_BY(mutex_);
+};
+
+// Ties a channel id and the egress that packets on that channel should be sent
+// to.
+struct ChannelEgress {
+ ChannelEgress(uint32_t id, RpcEgressHandler& egress_handler)
+ : channel_id(id), egress(&egress_handler) {}
+
+ const uint32_t channel_id;
+ RpcEgressHandler* const egress = nullptr;
+};
+
+// Handler for incoming RPC packets. RpcIngress is not thread-safe and must be
+// accessed from a single thread (typically the RPC RX thread).
+template <typename Decoder>
+class RpcIngress : public RpcIngressHandler {
+ public:
+ static constexpr size_t kMaxChannelId = 64;
+ RpcIngress() = default;
+
+ explicit RpcIngress(span<ChannelEgress> channel_egresses) {
+ for (auto& channel : channel_egresses) {
+ PW_ASSERT(channel.channel_id <= kMaxChannelId);
+ channel_egresses_[channel.channel_id] = channel.egress;
+ }
+ }
+
+ const metric::Group& metrics() const { return metrics_; }
+
+ uint32_t num_bad_packets() const { return bad_packets_.value(); }
+
+ uint32_t num_overflow_channel_ids() const {
+ return overflow_channel_ids_.value();
+ }
+
+ uint32_t num_missing_egresses() const { return missing_egresses_.value(); }
+
+ uint32_t num_egress_errors() const { return egress_errors_.value(); }
+
+ // Finds RPC packets in `buffer`, extracts pw_rpc channel ID from each
+ // packet and sends the packet to the egress registered for that channel.
+ Status ProcessIncomingData(ConstByteSpan buffer) override {
+ return decoder_.Decode(buffer, [this](ConstByteSpan packet) {
+ const auto packet_meta = rpc::PacketMeta::FromBuffer(packet);
+ if (!packet_meta.ok()) {
+ bad_packets_.Increment();
+ internal::LogBadPacket();
+ return;
+ }
+ if (packet_meta->channel_id() > kMaxChannelId) {
+ overflow_channel_ids_.Increment();
+ internal::LogChannelIdOverflow(packet_meta->channel_id(),
+ kMaxChannelId);
+ return;
+ }
+ auto* egress = channel_egresses_[packet_meta->channel_id()];
+ if (egress == nullptr) {
+ missing_egresses_.Increment();
+ internal::LogMissingEgressForChannel(packet_meta->channel_id());
+ return;
+ }
+ const auto status = egress->SendRpcPacket(packet);
+ if (!status.ok()) {
+ egress_errors_.Increment();
+ internal::LogIngressSendFailure(packet_meta->channel_id(), status);
+ }
+ });
+ }
+
+ private:
+ std::array<RpcEgressHandler*, kMaxChannelId + 1> channel_egresses_{};
+ Decoder decoder_;
+ PW_METRIC_GROUP(metrics_, "pw_rpc_transport");
+ PW_METRIC(metrics_, bad_packets_, "bad_packets", 0u);
+ PW_METRIC(metrics_, overflow_channel_ids_, "overflow_channel_ids", 0u);
+ PW_METRIC(metrics_, missing_egresses_, "missing_egresses", 0u);
+ PW_METRIC(metrics_, egress_errors_, "egress_errors", 0u);
+};
+
+template <size_t kMaxPacketSize>
+using HdlcRpcEgress = RpcEgress<HdlcRpcPacketEncoder<kMaxPacketSize>>;
+
+template <size_t kMaxPacketSize>
+using HdlcRpcIngress = RpcIngress<HdlcRpcPacketDecoder<kMaxPacketSize>>;
+
+template <size_t kMaxPacketSize>
+using SimpleRpcEgress = RpcEgress<SimpleRpcPacketEncoder<kMaxPacketSize>>;
+
+template <size_t kMaxPacketSize>
+using SimpleRpcIngress = RpcIngress<SimpleRpcPacketDecoder<kMaxPacketSize>>;
+
+} // namespace pw::rpc