aboutsummaryrefslogtreecommitdiff
path: root/pw_rpc_transport/egress_ingress_test.cc
diff options
context:
space:
mode:
Diffstat (limited to 'pw_rpc_transport/egress_ingress_test.cc')
-rw-r--r--pw_rpc_transport/egress_ingress_test.cc406
1 files changed, 406 insertions, 0 deletions
diff --git a/pw_rpc_transport/egress_ingress_test.cc b/pw_rpc_transport/egress_ingress_test.cc
new file mode 100644
index 000000000..5c3f8d317
--- /dev/null
+++ b/pw_rpc_transport/egress_ingress_test.cc
@@ -0,0 +1,406 @@
+// Copyright 2023 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_rpc_transport/egress_ingress.h"
+
+#include <random>
+
+#include "gtest/gtest.h"
+#include "public/pw_rpc_transport/rpc_transport.h"
+#include "pw_bytes/span.h"
+#include "pw_metric/metric.h"
+#include "pw_rpc/client_server.h"
+#include "pw_rpc/packet_meta.h"
+#include "pw_rpc_transport/hdlc_framing.h"
+#include "pw_rpc_transport/internal/test.rpc.pwpb.h"
+#include "pw_rpc_transport/rpc_transport.h"
+#include "pw_rpc_transport/service_registry.h"
+#include "pw_rpc_transport/simple_framing.h"
+#include "pw_status/status.h"
+#include "pw_string/string.h"
+#include "pw_sync/thread_notification.h"
+
+namespace pw::rpc {
+namespace {
+
+constexpr size_t kMaxPacketSize = 256;
+
+class TestService final
+ : public pw_rpc_transport::testing::pw_rpc::pwpb::TestService::Service<
+ TestService> {
+ public:
+ Status Echo(
+ const pw_rpc_transport::testing::pwpb::EchoMessage::Message& request,
+ pw_rpc_transport::testing::pwpb::EchoMessage::Message& response) {
+ response.msg = request.msg;
+ return OkStatus();
+ }
+};
+
+// A transport that stores all received frames so they can be manually retrieved
+// by the ingress later.
+class TestTransport : public RpcFrameSender {
+ public:
+ explicit TestTransport(size_t mtu, bool is_faulty = false)
+ : mtu_(mtu), is_faulty_(is_faulty) {}
+
+ size_t MaximumTransmissionUnit() const override { return mtu_; }
+
+ Status Send(RpcFrame frame) override {
+ if (is_faulty_) {
+ return Status::Internal();
+ }
+ std::copy(
+ frame.header.begin(), frame.header.end(), std::back_inserter(buffer_));
+ std::copy(frame.payload.begin(),
+ frame.payload.end(),
+ std::back_inserter(buffer_));
+ return OkStatus();
+ }
+
+ ByteSpan buffer() { return buffer_; }
+
+ private:
+ size_t mtu_;
+ bool is_faulty_ = false;
+ std::vector<std::byte> buffer_;
+};
+
+// An egress handler that passes the received RPC packet to the service
+// registry.
+class TestLocalEgress : public RpcEgressHandler {
+ public:
+ Status SendRpcPacket(ConstByteSpan packet) override {
+ if (!registry_) {
+ return Status::FailedPrecondition();
+ }
+ return registry_->ProcessRpcPacket(packet);
+ }
+
+ void set_registry(ServiceRegistry& registry) { registry_ = &registry; }
+
+ private:
+ ServiceRegistry* registry_ = nullptr;
+};
+
+TEST(RpcEgressIngressTest, SimpleFramingRoundtrip) {
+ constexpr uint32_t kChannelAtoB = 1;
+ constexpr size_t kMaxMessageLength = 200;
+ constexpr size_t kAtoBMtu = 33;
+ constexpr size_t kBtoAMtu = 72;
+
+ TestTransport transport_a_to_b(kAtoBMtu);
+ TestTransport transport_b_to_a(kBtoAMtu);
+
+ SimpleRpcEgress<kMaxPacketSize> egress_a_to_b("a->b", transport_a_to_b);
+ SimpleRpcEgress<kMaxPacketSize> egress_b_to_a("b->a", transport_b_to_a);
+
+ std::array a_tx_channels = {
+ rpc::Channel::Create<kChannelAtoB>(&egress_a_to_b)};
+ std::array b_tx_channels = {
+ rpc::Channel::Create<kChannelAtoB>(&egress_b_to_a)};
+
+ ServiceRegistry registry_a(a_tx_channels);
+ ServiceRegistry registry_b(b_tx_channels);
+
+ TestService test_service;
+ registry_b.RegisterService(test_service);
+
+ TestLocalEgress local_egress_a;
+ local_egress_a.set_registry(registry_a);
+
+ TestLocalEgress local_egress_b;
+ local_egress_b.set_registry(registry_b);
+
+ std::array a_rx_channels = {
+ ChannelEgress{kChannelAtoB, local_egress_a},
+ };
+ std::array b_rx_channels = {
+ ChannelEgress{kChannelAtoB, local_egress_b},
+ };
+
+ SimpleRpcIngress<kMaxPacketSize> ingress_a(a_rx_channels);
+ SimpleRpcIngress<kMaxPacketSize> ingress_b(b_rx_channels);
+
+ auto client =
+ registry_a
+ .CreateClient<pw_rpc_transport::testing::pw_rpc::pwpb::TestService>(
+ kChannelAtoB);
+
+ sync::ThreadNotification receiver1_done;
+ sync::ThreadNotification receiver2_done;
+
+ struct ReceiverState {
+ InlineString<kMaxMessageLength> message;
+ sync::ThreadNotification done;
+ };
+
+ ReceiverState receiver1;
+ ReceiverState receiver2;
+ receiver1.message.append(2 * transport_a_to_b.MaximumTransmissionUnit(), '*');
+ receiver2.message.append(2 * transport_b_to_a.MaximumTransmissionUnit(), '>');
+
+ auto call1 = client.Echo(
+ {.msg = receiver1.message},
+ [&receiver1](
+ const pw_rpc_transport::testing::pwpb::EchoMessage::Message& response,
+ Status status) {
+ EXPECT_EQ(status, OkStatus());
+ EXPECT_EQ(response.msg, receiver1.message);
+ receiver1.done.release();
+ },
+ [&receiver1](Status status) {
+ EXPECT_EQ(status, OkStatus());
+ receiver1.done.release();
+ });
+
+ auto call2 = client.Echo(
+ {.msg = receiver2.message},
+ [&receiver2](
+ const pw_rpc_transport::testing::pwpb::EchoMessage::Message& response,
+ Status status) {
+ EXPECT_EQ(status, OkStatus());
+ EXPECT_EQ(response.msg, receiver2.message);
+ receiver2.done.release();
+ },
+ [&receiver2](Status status) {
+ EXPECT_EQ(status, OkStatus());
+ receiver2.done.release();
+ });
+
+ // Calling `ingress_b.ProcessIncomingData` reads all packets from the
+ // transport and dispatches them according to the ingress configuration.
+ // Dispatching a packet generates a reply message: we then read it back at the
+ // sender by calling `ingress_a.ProcessIncomingData`.
+ EXPECT_EQ(ingress_b.ProcessIncomingData(transport_a_to_b.buffer()),
+ OkStatus());
+ EXPECT_EQ(ingress_a.ProcessIncomingData(transport_b_to_a.buffer()),
+ OkStatus());
+
+ receiver1.done.acquire();
+ receiver2.done.acquire();
+}
+
+TEST(RpcEgressIngressTest, HdlcFramingRoundtrip) {
+ constexpr uint32_t kChannelAtoB = 1;
+ constexpr size_t kMaxMessageLength = 200;
+ constexpr size_t kAtoBMtu = 33;
+ constexpr size_t kBtoAMtu = 72;
+
+ TestTransport transport_a_to_b(kAtoBMtu);
+ TestTransport transport_b_to_a(kBtoAMtu);
+
+ HdlcRpcEgress<kMaxPacketSize> egress_a_to_b("a->b", transport_a_to_b);
+ HdlcRpcEgress<kMaxPacketSize> egress_b_to_a("b->a", transport_b_to_a);
+
+ std::array a_tx_channels = {
+ rpc::Channel::Create<kChannelAtoB>(&egress_a_to_b)};
+ std::array b_tx_channels = {
+ rpc::Channel::Create<kChannelAtoB>(&egress_b_to_a)};
+
+ ServiceRegistry registry_a(a_tx_channels);
+ ServiceRegistry registry_b(b_tx_channels);
+
+ TestService test_service;
+ registry_b.RegisterService(test_service);
+
+ TestLocalEgress local_egress_a;
+ local_egress_a.set_registry(registry_a);
+
+ TestLocalEgress local_egress_b;
+ local_egress_b.set_registry(registry_b);
+
+ std::array a_rx_channels = {
+ ChannelEgress{kChannelAtoB, local_egress_a},
+ };
+ std::array b_rx_channels = {
+ ChannelEgress{kChannelAtoB, local_egress_b},
+ };
+
+ HdlcRpcIngress<kMaxPacketSize> ingress_a(a_rx_channels);
+ HdlcRpcIngress<kMaxPacketSize> ingress_b(b_rx_channels);
+
+ auto client =
+ registry_a
+ .CreateClient<pw_rpc_transport::testing::pw_rpc::pwpb::TestService>(
+ kChannelAtoB);
+
+ sync::ThreadNotification receiver1_done;
+ sync::ThreadNotification receiver2_done;
+
+ struct ReceiverState {
+ InlineString<kMaxMessageLength> message;
+ sync::ThreadNotification done;
+ };
+
+ ReceiverState receiver1;
+ ReceiverState receiver2;
+ receiver1.message.append(2 * transport_a_to_b.MaximumTransmissionUnit(), '*');
+ receiver2.message.append(2 * transport_b_to_a.MaximumTransmissionUnit(), '>');
+
+ auto call1 = client.Echo(
+ {.msg = receiver1.message},
+ [&receiver1](
+ const pw_rpc_transport::testing::pwpb::EchoMessage::Message& response,
+ Status status) {
+ EXPECT_EQ(status, OkStatus());
+ EXPECT_EQ(response.msg, receiver1.message);
+ receiver1.done.release();
+ },
+ [&receiver1](Status status) {
+ EXPECT_EQ(status, OkStatus());
+ receiver1.done.release();
+ });
+
+ auto call2 = client.Echo(
+ {.msg = receiver2.message},
+ [&receiver2](
+ const pw_rpc_transport::testing::pwpb::EchoMessage::Message& response,
+ Status status) {
+ EXPECT_EQ(status, OkStatus());
+ EXPECT_EQ(response.msg, receiver2.message);
+ receiver2.done.release();
+ },
+ [&receiver2](Status status) {
+ EXPECT_EQ(status, OkStatus());
+ receiver2.done.release();
+ });
+
+ // Calling `ingress_b.ProcessIncomingData` reads all packets from the
+ // transport and dispatches them according to the ingress configuration.
+ // Dispatching a packet generates a reply message: we then read it back at the
+ // sender by calling `ingress_a.ProcessIncomingData`.
+ EXPECT_EQ(ingress_b.ProcessIncomingData(transport_a_to_b.buffer()),
+ OkStatus());
+ EXPECT_EQ(ingress_a.ProcessIncomingData(transport_b_to_a.buffer()),
+ OkStatus());
+
+ receiver1.done.acquire();
+ receiver2.done.acquire();
+}
+
+TEST(RpcEgressIngressTest, MalformedRpcPacket) {
+ constexpr uint32_t kTestChannel = 1;
+ constexpr size_t kMtu = 33;
+ std::vector<std::byte> kMalformedPacket = {std::byte{0x42}, std::byte{0x74}};
+
+ TestTransport transport(kMtu);
+ SimpleRpcEgress<kMaxPacketSize> egress("test", transport);
+
+ TestLocalEgress local_egress;
+ std::array rx_channels = {
+ ChannelEgress{kTestChannel, local_egress},
+ };
+
+ SimpleRpcIngress<kMaxPacketSize> ingress(rx_channels);
+
+ EXPECT_EQ(egress.Send(kMalformedPacket), OkStatus());
+ EXPECT_EQ(ingress.ProcessIncomingData(transport.buffer()), OkStatus());
+
+ EXPECT_EQ(ingress.num_bad_packets(), 1u);
+ EXPECT_EQ(ingress.num_overflow_channel_ids(), 0u);
+ EXPECT_EQ(ingress.num_missing_egresses(), 0u);
+ EXPECT_EQ(ingress.num_egress_errors(), 0u);
+}
+
+TEST(RpcEgressIngressTest, ChannelIdOverflow) {
+ constexpr uint32_t kInvalidChannelId = 65;
+ constexpr size_t kMtu = 128;
+
+ TestTransport transport(kMtu);
+ SimpleRpcEgress<kMaxPacketSize> egress("test", transport);
+
+ std::array sender_tx_channels = {
+ rpc::Channel::Create<kInvalidChannelId>(&egress)};
+
+ ServiceRegistry registry(sender_tx_channels);
+ auto client =
+ registry
+ .CreateClient<pw_rpc_transport::testing::pw_rpc::pwpb::TestService>(
+ kInvalidChannelId);
+
+ SimpleRpcIngress<kMaxPacketSize> ingress;
+
+ auto receiver = client.Echo({.msg = "test"});
+
+ EXPECT_EQ(ingress.ProcessIncomingData(transport.buffer()), OkStatus());
+
+ EXPECT_EQ(ingress.num_bad_packets(), 0u);
+ EXPECT_EQ(ingress.num_overflow_channel_ids(), 1u);
+ EXPECT_EQ(ingress.num_missing_egresses(), 0u);
+ EXPECT_EQ(ingress.num_egress_errors(), 0u);
+}
+
+TEST(RpcEgressIngressTest, MissingEgressForIncomingPacket) {
+ constexpr uint32_t kChannelA = 22;
+ constexpr uint32_t kChannelB = 33;
+ constexpr size_t kMtu = 128;
+
+ TestTransport transport(kMtu);
+ SimpleRpcEgress<kMaxPacketSize> egress("test", transport);
+
+ std::array sender_tx_channels = {rpc::Channel::Create<kChannelA>(&egress)};
+
+ ServiceRegistry registry(sender_tx_channels);
+ auto client =
+ registry
+ .CreateClient<pw_rpc_transport::testing::pw_rpc::pwpb::TestService>(
+ kChannelA);
+
+ std::array ingress_channels = {ChannelEgress(kChannelB, egress)};
+ SimpleRpcIngress<kMaxPacketSize> ingress(ingress_channels);
+
+ auto receiver = client.Echo({.msg = "test"});
+
+ EXPECT_EQ(ingress.ProcessIncomingData(transport.buffer()), OkStatus());
+
+ EXPECT_EQ(ingress.num_bad_packets(), 0u);
+ EXPECT_EQ(ingress.num_overflow_channel_ids(), 0u);
+ EXPECT_EQ(ingress.num_missing_egresses(), 1u);
+ EXPECT_EQ(ingress.num_egress_errors(), 0u);
+}
+
+TEST(RpcEgressIngressTest, EgressSendFailureForIncomingPacket) {
+ constexpr uint32_t kChannelId = 22;
+ constexpr size_t kMtu = 128;
+
+ TestTransport good_transport(kMtu, /*is_faulty=*/false);
+ TestTransport bad_transport(kMtu, /*is_faulty=*/true);
+ SimpleRpcEgress<kMaxPacketSize> good_egress("test", good_transport);
+ SimpleRpcEgress<kMaxPacketSize> bad_egress("test", bad_transport);
+
+ std::array sender_tx_channels = {
+ rpc::Channel::Create<kChannelId>(&good_egress)};
+
+ ServiceRegistry registry(sender_tx_channels);
+ auto client =
+ registry
+ .CreateClient<pw_rpc_transport::testing::pw_rpc::pwpb::TestService>(
+ kChannelId);
+
+ std::array ingress_channels = {ChannelEgress(kChannelId, bad_egress)};
+ SimpleRpcIngress<kMaxPacketSize> ingress(ingress_channels);
+
+ auto receiver = client.Echo({.msg = "test"});
+
+ EXPECT_EQ(ingress.ProcessIncomingData(good_transport.buffer()), OkStatus());
+
+ EXPECT_EQ(ingress.num_bad_packets(), 0u);
+ EXPECT_EQ(ingress.num_overflow_channel_ids(), 0u);
+ EXPECT_EQ(ingress.num_missing_egresses(), 0u);
+ EXPECT_EQ(ingress.num_egress_errors(), 1u);
+}
+
+} // namespace
+} // namespace pw::rpc