diff options
author | Xin Li <delphij@google.com> | 2024-01-17 22:13:58 -0800 |
---|---|---|
committer | Xin Li <delphij@google.com> | 2024-01-17 22:13:58 -0800 |
commit | 28d03a2a1cabbe01d7bcb6cf5166c10e50d3c2c6 (patch) | |
tree | c1643be8ab17fc607cea748a8bb1d621a5964873 /pw_rpc_transport/egress_ingress_test.cc | |
parent | ec2628a6ba2d0ecbe3ac10c8c772f6fc6acc345d (diff) | |
parent | f054515492af5132f685cb23fe11891ee77104c9 (diff) | |
download | pigweed-28d03a2a1cabbe01d7bcb6cf5166c10e50d3c2c6.tar.gz |
Merge Android 24Q1 Release (ab/11220357)temp_319669529
Bug: 319669529
Merged-In: Iba357b308a79d0c8b560acd4f72b5423c9c83294
Change-Id: Icdf552029fb97a34e83c6dd7799433fc473a2506
Diffstat (limited to 'pw_rpc_transport/egress_ingress_test.cc')
-rw-r--r-- | pw_rpc_transport/egress_ingress_test.cc | 406 |
1 files changed, 406 insertions, 0 deletions
diff --git a/pw_rpc_transport/egress_ingress_test.cc b/pw_rpc_transport/egress_ingress_test.cc new file mode 100644 index 000000000..5c3f8d317 --- /dev/null +++ b/pw_rpc_transport/egress_ingress_test.cc @@ -0,0 +1,406 @@ +// Copyright 2023 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +#include "pw_rpc_transport/egress_ingress.h" + +#include <random> + +#include "gtest/gtest.h" +#include "public/pw_rpc_transport/rpc_transport.h" +#include "pw_bytes/span.h" +#include "pw_metric/metric.h" +#include "pw_rpc/client_server.h" +#include "pw_rpc/packet_meta.h" +#include "pw_rpc_transport/hdlc_framing.h" +#include "pw_rpc_transport/internal/test.rpc.pwpb.h" +#include "pw_rpc_transport/rpc_transport.h" +#include "pw_rpc_transport/service_registry.h" +#include "pw_rpc_transport/simple_framing.h" +#include "pw_status/status.h" +#include "pw_string/string.h" +#include "pw_sync/thread_notification.h" + +namespace pw::rpc { +namespace { + +constexpr size_t kMaxPacketSize = 256; + +class TestService final + : public pw_rpc_transport::testing::pw_rpc::pwpb::TestService::Service< + TestService> { + public: + Status Echo( + const pw_rpc_transport::testing::pwpb::EchoMessage::Message& request, + pw_rpc_transport::testing::pwpb::EchoMessage::Message& response) { + response.msg = request.msg; + return OkStatus(); + } +}; + +// A transport that stores all received frames so they can be manually retrieved +// by the ingress later. +class TestTransport : public RpcFrameSender { + public: + explicit TestTransport(size_t mtu, bool is_faulty = false) + : mtu_(mtu), is_faulty_(is_faulty) {} + + size_t MaximumTransmissionUnit() const override { return mtu_; } + + Status Send(RpcFrame frame) override { + if (is_faulty_) { + return Status::Internal(); + } + std::copy( + frame.header.begin(), frame.header.end(), std::back_inserter(buffer_)); + std::copy(frame.payload.begin(), + frame.payload.end(), + std::back_inserter(buffer_)); + return OkStatus(); + } + + ByteSpan buffer() { return buffer_; } + + private: + size_t mtu_; + bool is_faulty_ = false; + std::vector<std::byte> buffer_; +}; + +// An egress handler that passes the received RPC packet to the service +// registry. +class TestLocalEgress : public RpcEgressHandler { + public: + Status SendRpcPacket(ConstByteSpan packet) override { + if (!registry_) { + return Status::FailedPrecondition(); + } + return registry_->ProcessRpcPacket(packet); + } + + void set_registry(ServiceRegistry& registry) { registry_ = ®istry; } + + private: + ServiceRegistry* registry_ = nullptr; +}; + +TEST(RpcEgressIngressTest, SimpleFramingRoundtrip) { + constexpr uint32_t kChannelAtoB = 1; + constexpr size_t kMaxMessageLength = 200; + constexpr size_t kAtoBMtu = 33; + constexpr size_t kBtoAMtu = 72; + + TestTransport transport_a_to_b(kAtoBMtu); + TestTransport transport_b_to_a(kBtoAMtu); + + SimpleRpcEgress<kMaxPacketSize> egress_a_to_b("a->b", transport_a_to_b); + SimpleRpcEgress<kMaxPacketSize> egress_b_to_a("b->a", transport_b_to_a); + + std::array a_tx_channels = { + rpc::Channel::Create<kChannelAtoB>(&egress_a_to_b)}; + std::array b_tx_channels = { + rpc::Channel::Create<kChannelAtoB>(&egress_b_to_a)}; + + ServiceRegistry registry_a(a_tx_channels); + ServiceRegistry registry_b(b_tx_channels); + + TestService test_service; + registry_b.RegisterService(test_service); + + TestLocalEgress local_egress_a; + local_egress_a.set_registry(registry_a); + + TestLocalEgress local_egress_b; + local_egress_b.set_registry(registry_b); + + std::array a_rx_channels = { + ChannelEgress{kChannelAtoB, local_egress_a}, + }; + std::array b_rx_channels = { + ChannelEgress{kChannelAtoB, local_egress_b}, + }; + + SimpleRpcIngress<kMaxPacketSize> ingress_a(a_rx_channels); + SimpleRpcIngress<kMaxPacketSize> ingress_b(b_rx_channels); + + auto client = + registry_a + .CreateClient<pw_rpc_transport::testing::pw_rpc::pwpb::TestService>( + kChannelAtoB); + + sync::ThreadNotification receiver1_done; + sync::ThreadNotification receiver2_done; + + struct ReceiverState { + InlineString<kMaxMessageLength> message; + sync::ThreadNotification done; + }; + + ReceiverState receiver1; + ReceiverState receiver2; + receiver1.message.append(2 * transport_a_to_b.MaximumTransmissionUnit(), '*'); + receiver2.message.append(2 * transport_b_to_a.MaximumTransmissionUnit(), '>'); + + auto call1 = client.Echo( + {.msg = receiver1.message}, + [&receiver1]( + const pw_rpc_transport::testing::pwpb::EchoMessage::Message& response, + Status status) { + EXPECT_EQ(status, OkStatus()); + EXPECT_EQ(response.msg, receiver1.message); + receiver1.done.release(); + }, + [&receiver1](Status status) { + EXPECT_EQ(status, OkStatus()); + receiver1.done.release(); + }); + + auto call2 = client.Echo( + {.msg = receiver2.message}, + [&receiver2]( + const pw_rpc_transport::testing::pwpb::EchoMessage::Message& response, + Status status) { + EXPECT_EQ(status, OkStatus()); + EXPECT_EQ(response.msg, receiver2.message); + receiver2.done.release(); + }, + [&receiver2](Status status) { + EXPECT_EQ(status, OkStatus()); + receiver2.done.release(); + }); + + // Calling `ingress_b.ProcessIncomingData` reads all packets from the + // transport and dispatches them according to the ingress configuration. + // Dispatching a packet generates a reply message: we then read it back at the + // sender by calling `ingress_a.ProcessIncomingData`. + EXPECT_EQ(ingress_b.ProcessIncomingData(transport_a_to_b.buffer()), + OkStatus()); + EXPECT_EQ(ingress_a.ProcessIncomingData(transport_b_to_a.buffer()), + OkStatus()); + + receiver1.done.acquire(); + receiver2.done.acquire(); +} + +TEST(RpcEgressIngressTest, HdlcFramingRoundtrip) { + constexpr uint32_t kChannelAtoB = 1; + constexpr size_t kMaxMessageLength = 200; + constexpr size_t kAtoBMtu = 33; + constexpr size_t kBtoAMtu = 72; + + TestTransport transport_a_to_b(kAtoBMtu); + TestTransport transport_b_to_a(kBtoAMtu); + + HdlcRpcEgress<kMaxPacketSize> egress_a_to_b("a->b", transport_a_to_b); + HdlcRpcEgress<kMaxPacketSize> egress_b_to_a("b->a", transport_b_to_a); + + std::array a_tx_channels = { + rpc::Channel::Create<kChannelAtoB>(&egress_a_to_b)}; + std::array b_tx_channels = { + rpc::Channel::Create<kChannelAtoB>(&egress_b_to_a)}; + + ServiceRegistry registry_a(a_tx_channels); + ServiceRegistry registry_b(b_tx_channels); + + TestService test_service; + registry_b.RegisterService(test_service); + + TestLocalEgress local_egress_a; + local_egress_a.set_registry(registry_a); + + TestLocalEgress local_egress_b; + local_egress_b.set_registry(registry_b); + + std::array a_rx_channels = { + ChannelEgress{kChannelAtoB, local_egress_a}, + }; + std::array b_rx_channels = { + ChannelEgress{kChannelAtoB, local_egress_b}, + }; + + HdlcRpcIngress<kMaxPacketSize> ingress_a(a_rx_channels); + HdlcRpcIngress<kMaxPacketSize> ingress_b(b_rx_channels); + + auto client = + registry_a + .CreateClient<pw_rpc_transport::testing::pw_rpc::pwpb::TestService>( + kChannelAtoB); + + sync::ThreadNotification receiver1_done; + sync::ThreadNotification receiver2_done; + + struct ReceiverState { + InlineString<kMaxMessageLength> message; + sync::ThreadNotification done; + }; + + ReceiverState receiver1; + ReceiverState receiver2; + receiver1.message.append(2 * transport_a_to_b.MaximumTransmissionUnit(), '*'); + receiver2.message.append(2 * transport_b_to_a.MaximumTransmissionUnit(), '>'); + + auto call1 = client.Echo( + {.msg = receiver1.message}, + [&receiver1]( + const pw_rpc_transport::testing::pwpb::EchoMessage::Message& response, + Status status) { + EXPECT_EQ(status, OkStatus()); + EXPECT_EQ(response.msg, receiver1.message); + receiver1.done.release(); + }, + [&receiver1](Status status) { + EXPECT_EQ(status, OkStatus()); + receiver1.done.release(); + }); + + auto call2 = client.Echo( + {.msg = receiver2.message}, + [&receiver2]( + const pw_rpc_transport::testing::pwpb::EchoMessage::Message& response, + Status status) { + EXPECT_EQ(status, OkStatus()); + EXPECT_EQ(response.msg, receiver2.message); + receiver2.done.release(); + }, + [&receiver2](Status status) { + EXPECT_EQ(status, OkStatus()); + receiver2.done.release(); + }); + + // Calling `ingress_b.ProcessIncomingData` reads all packets from the + // transport and dispatches them according to the ingress configuration. + // Dispatching a packet generates a reply message: we then read it back at the + // sender by calling `ingress_a.ProcessIncomingData`. + EXPECT_EQ(ingress_b.ProcessIncomingData(transport_a_to_b.buffer()), + OkStatus()); + EXPECT_EQ(ingress_a.ProcessIncomingData(transport_b_to_a.buffer()), + OkStatus()); + + receiver1.done.acquire(); + receiver2.done.acquire(); +} + +TEST(RpcEgressIngressTest, MalformedRpcPacket) { + constexpr uint32_t kTestChannel = 1; + constexpr size_t kMtu = 33; + std::vector<std::byte> kMalformedPacket = {std::byte{0x42}, std::byte{0x74}}; + + TestTransport transport(kMtu); + SimpleRpcEgress<kMaxPacketSize> egress("test", transport); + + TestLocalEgress local_egress; + std::array rx_channels = { + ChannelEgress{kTestChannel, local_egress}, + }; + + SimpleRpcIngress<kMaxPacketSize> ingress(rx_channels); + + EXPECT_EQ(egress.Send(kMalformedPacket), OkStatus()); + EXPECT_EQ(ingress.ProcessIncomingData(transport.buffer()), OkStatus()); + + EXPECT_EQ(ingress.num_bad_packets(), 1u); + EXPECT_EQ(ingress.num_overflow_channel_ids(), 0u); + EXPECT_EQ(ingress.num_missing_egresses(), 0u); + EXPECT_EQ(ingress.num_egress_errors(), 0u); +} + +TEST(RpcEgressIngressTest, ChannelIdOverflow) { + constexpr uint32_t kInvalidChannelId = 65; + constexpr size_t kMtu = 128; + + TestTransport transport(kMtu); + SimpleRpcEgress<kMaxPacketSize> egress("test", transport); + + std::array sender_tx_channels = { + rpc::Channel::Create<kInvalidChannelId>(&egress)}; + + ServiceRegistry registry(sender_tx_channels); + auto client = + registry + .CreateClient<pw_rpc_transport::testing::pw_rpc::pwpb::TestService>( + kInvalidChannelId); + + SimpleRpcIngress<kMaxPacketSize> ingress; + + auto receiver = client.Echo({.msg = "test"}); + + EXPECT_EQ(ingress.ProcessIncomingData(transport.buffer()), OkStatus()); + + EXPECT_EQ(ingress.num_bad_packets(), 0u); + EXPECT_EQ(ingress.num_overflow_channel_ids(), 1u); + EXPECT_EQ(ingress.num_missing_egresses(), 0u); + EXPECT_EQ(ingress.num_egress_errors(), 0u); +} + +TEST(RpcEgressIngressTest, MissingEgressForIncomingPacket) { + constexpr uint32_t kChannelA = 22; + constexpr uint32_t kChannelB = 33; + constexpr size_t kMtu = 128; + + TestTransport transport(kMtu); + SimpleRpcEgress<kMaxPacketSize> egress("test", transport); + + std::array sender_tx_channels = {rpc::Channel::Create<kChannelA>(&egress)}; + + ServiceRegistry registry(sender_tx_channels); + auto client = + registry + .CreateClient<pw_rpc_transport::testing::pw_rpc::pwpb::TestService>( + kChannelA); + + std::array ingress_channels = {ChannelEgress(kChannelB, egress)}; + SimpleRpcIngress<kMaxPacketSize> ingress(ingress_channels); + + auto receiver = client.Echo({.msg = "test"}); + + EXPECT_EQ(ingress.ProcessIncomingData(transport.buffer()), OkStatus()); + + EXPECT_EQ(ingress.num_bad_packets(), 0u); + EXPECT_EQ(ingress.num_overflow_channel_ids(), 0u); + EXPECT_EQ(ingress.num_missing_egresses(), 1u); + EXPECT_EQ(ingress.num_egress_errors(), 0u); +} + +TEST(RpcEgressIngressTest, EgressSendFailureForIncomingPacket) { + constexpr uint32_t kChannelId = 22; + constexpr size_t kMtu = 128; + + TestTransport good_transport(kMtu, /*is_faulty=*/false); + TestTransport bad_transport(kMtu, /*is_faulty=*/true); + SimpleRpcEgress<kMaxPacketSize> good_egress("test", good_transport); + SimpleRpcEgress<kMaxPacketSize> bad_egress("test", bad_transport); + + std::array sender_tx_channels = { + rpc::Channel::Create<kChannelId>(&good_egress)}; + + ServiceRegistry registry(sender_tx_channels); + auto client = + registry + .CreateClient<pw_rpc_transport::testing::pw_rpc::pwpb::TestService>( + kChannelId); + + std::array ingress_channels = {ChannelEgress(kChannelId, bad_egress)}; + SimpleRpcIngress<kMaxPacketSize> ingress(ingress_channels); + + auto receiver = client.Echo({.msg = "test"}); + + EXPECT_EQ(ingress.ProcessIncomingData(good_transport.buffer()), OkStatus()); + + EXPECT_EQ(ingress.num_bad_packets(), 0u); + EXPECT_EQ(ingress.num_overflow_channel_ids(), 0u); + EXPECT_EQ(ingress.num_missing_egresses(), 0u); + EXPECT_EQ(ingress.num_egress_errors(), 1u); +} + +} // namespace +} // namespace pw::rpc |