diff options
Diffstat (limited to 'pw_rpc_transport/public/pw_rpc_transport/egress_ingress.h')
-rw-r--r-- | pw_rpc_transport/public/pw_rpc_transport/egress_ingress.h | 170 |
1 files changed, 170 insertions, 0 deletions
diff --git a/pw_rpc_transport/public/pw_rpc_transport/egress_ingress.h b/pw_rpc_transport/public/pw_rpc_transport/egress_ingress.h new file mode 100644 index 000000000..d263081c6 --- /dev/null +++ b/pw_rpc_transport/public/pw_rpc_transport/egress_ingress.h @@ -0,0 +1,170 @@ +// Copyright 2023 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. +#pragma once + +#include <sys/types.h> + +#include <mutex> + +#include "pw_bytes/span.h" +#include "pw_metric/metric.h" +#include "pw_rpc/channel.h" +#include "pw_rpc/packet_meta.h" +#include "pw_rpc_transport/hdlc_framing.h" +#include "pw_rpc_transport/rpc_transport.h" +#include "pw_rpc_transport/simple_framing.h" +#include "pw_status/status.h" +#include "pw_sync/lock_annotations.h" +#include "pw_sync/mutex.h" +#include "rpc_transport.h" + +namespace pw::rpc { +namespace internal { + +void LogBadPacket(); +void LogChannelIdOverflow(uint32_t channel_id, uint32_t max_channel_id); +void LogMissingEgressForChannel(uint32_t channel_id); +void LogIngressSendFailure(uint32_t channel_id, Status status); + +} // namespace internal + +// Ties RPC transport and RPC frame encoder together. +template <typename Encoder> +class RpcEgress : public RpcEgressHandler, public ChannelOutput { + public: + RpcEgress(std::string_view channel_name, RpcFrameSender& transport) + : ChannelOutput(channel_name.data()), transport_(transport) {} + + // Implements both rpc::ChannelOutput and RpcEgressHandler. Encodes the + // provided packet using the target transport's MTU as max frame size and + // sends it over that transport. + // + // Sending a packet may result in multiple RpcTransport::Write calls which + // must not be interleaved in order for the packet to be successfully + // reassembled from the transport-level frames by the receiver. RpcEgress + // is using a mutex to ensure this. Technically we could just rely on pw_rpc + // global lock but that would unnecessarily couple transport logic to pw_rpc + // internals. + Status SendRpcPacket(ConstByteSpan rpc_packet) override { + std::lock_guard lock(mutex_); + return encoder_.Encode(rpc_packet, + transport_.MaximumTransmissionUnit(), + [this](RpcFrame& frame) { + // Encoders must call this callback inline so that + // we're still holding `mutex_` here. Unfortunately + // the lock annotations cannot be used on + // `transport_` to enforce this. + return transport_.Send(frame); + }); + } + + // Implements ChannelOutput. + Status Send(ConstByteSpan buffer) override { return SendRpcPacket(buffer); } + + private: + sync::Mutex mutex_; + RpcFrameSender& transport_; + Encoder encoder_ PW_GUARDED_BY(mutex_); +}; + +// Ties a channel id and the egress that packets on that channel should be sent +// to. +struct ChannelEgress { + ChannelEgress(uint32_t id, RpcEgressHandler& egress_handler) + : channel_id(id), egress(&egress_handler) {} + + const uint32_t channel_id; + RpcEgressHandler* const egress = nullptr; +}; + +// Handler for incoming RPC packets. RpcIngress is not thread-safe and must be +// accessed from a single thread (typically the RPC RX thread). +template <typename Decoder> +class RpcIngress : public RpcIngressHandler { + public: + static constexpr size_t kMaxChannelId = 64; + RpcIngress() = default; + + explicit RpcIngress(span<ChannelEgress> channel_egresses) { + for (auto& channel : channel_egresses) { + PW_ASSERT(channel.channel_id <= kMaxChannelId); + channel_egresses_[channel.channel_id] = channel.egress; + } + } + + const metric::Group& metrics() const { return metrics_; } + + uint32_t num_bad_packets() const { return bad_packets_.value(); } + + uint32_t num_overflow_channel_ids() const { + return overflow_channel_ids_.value(); + } + + uint32_t num_missing_egresses() const { return missing_egresses_.value(); } + + uint32_t num_egress_errors() const { return egress_errors_.value(); } + + // Finds RPC packets in `buffer`, extracts pw_rpc channel ID from each + // packet and sends the packet to the egress registered for that channel. + Status ProcessIncomingData(ConstByteSpan buffer) override { + return decoder_.Decode(buffer, [this](ConstByteSpan packet) { + const auto packet_meta = rpc::PacketMeta::FromBuffer(packet); + if (!packet_meta.ok()) { + bad_packets_.Increment(); + internal::LogBadPacket(); + return; + } + if (packet_meta->channel_id() > kMaxChannelId) { + overflow_channel_ids_.Increment(); + internal::LogChannelIdOverflow(packet_meta->channel_id(), + kMaxChannelId); + return; + } + auto* egress = channel_egresses_[packet_meta->channel_id()]; + if (egress == nullptr) { + missing_egresses_.Increment(); + internal::LogMissingEgressForChannel(packet_meta->channel_id()); + return; + } + const auto status = egress->SendRpcPacket(packet); + if (!status.ok()) { + egress_errors_.Increment(); + internal::LogIngressSendFailure(packet_meta->channel_id(), status); + } + }); + } + + private: + std::array<RpcEgressHandler*, kMaxChannelId + 1> channel_egresses_{}; + Decoder decoder_; + PW_METRIC_GROUP(metrics_, "pw_rpc_transport"); + PW_METRIC(metrics_, bad_packets_, "bad_packets", 0u); + PW_METRIC(metrics_, overflow_channel_ids_, "overflow_channel_ids", 0u); + PW_METRIC(metrics_, missing_egresses_, "missing_egresses", 0u); + PW_METRIC(metrics_, egress_errors_, "egress_errors", 0u); +}; + +template <size_t kMaxPacketSize> +using HdlcRpcEgress = RpcEgress<HdlcRpcPacketEncoder<kMaxPacketSize>>; + +template <size_t kMaxPacketSize> +using HdlcRpcIngress = RpcIngress<HdlcRpcPacketDecoder<kMaxPacketSize>>; + +template <size_t kMaxPacketSize> +using SimpleRpcEgress = RpcEgress<SimpleRpcPacketEncoder<kMaxPacketSize>>; + +template <size_t kMaxPacketSize> +using SimpleRpcIngress = RpcIngress<SimpleRpcPacketDecoder<kMaxPacketSize>>; + +} // namespace pw::rpc |