aboutsummaryrefslogtreecommitdiff
path: root/pw_log_rpc
diff options
context:
space:
mode:
authorCarlos Chinchilla <cachinchilla@google.com>2021-09-16 17:30:31 -0700
committerCQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com>2021-09-23 16:36:59 +0000
commit6f253ac64c9bba1f6b9951634ed051684bdae0fc (patch)
tree288d3d667ebcd4ef90a2b85e68c6941276c1dbe5 /pw_log_rpc
parent54891aeecdbbd29c0ba65ce39bb26942ac5edd20 (diff)
downloadpigweed-6f253ac64c9bba1f6b9951634ed051684bdae0fc.tar.gz
pw_log_rpc: Add unit tests with open server writer
Test RPC log streams with provided open server writer. No-Docs-Update-Reason: added unit tests. Change-Id: I81acdd59361b1883f25e34fde4e83d5a089f70af Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/61422 Commit-Queue: Carlos Chinchilla <cachinchilla@google.com> Pigweed-Auto-Submit: Carlos Chinchilla <cachinchilla@google.com> Reviewed-by: Wyatt Hepler <hepler@google.com>
Diffstat (limited to 'pw_log_rpc')
-rw-r--r--pw_log_rpc/BUILD.bazel2
-rw-r--r--pw_log_rpc/BUILD.gn6
-rw-r--r--pw_log_rpc/log_service_test.cc187
-rw-r--r--pw_log_rpc/rpc_log_drain_test.cc80
4 files changed, 265 insertions, 10 deletions
diff --git a/pw_log_rpc/BUILD.bazel b/pw_log_rpc/BUILD.bazel
index 27c722117..29048a904 100644
--- a/pw_log_rpc/BUILD.bazel
+++ b/pw_log_rpc/BUILD.bazel
@@ -94,7 +94,9 @@ pw_cc_test(
"rpc_log_drain_test.cc",
],
deps = [
+ ":log_service",
":rpc_log_drain",
+ "//pw_rpc/raw:test_method_context",
"//pw_unit_test",
],
)
diff --git a/pw_log_rpc/BUILD.gn b/pw_log_rpc/BUILD.gn
index 122fccb9d..4e4189d96 100644
--- a/pw_log_rpc/BUILD.gn
+++ b/pw_log_rpc/BUILD.gn
@@ -91,7 +91,11 @@ pw_test("log_service_test") {
pw_test("rpc_log_drain_test") {
sources = [ "rpc_log_drain_test.cc" ]
- deps = [ ":rpc_log_drain" ]
+ deps = [
+ ":log_service",
+ ":rpc_log_drain",
+ "$dir_pw_rpc/raw:test_method_context",
+ ]
}
# TODO(cachinchilla): update docs.
diff --git a/pw_log_rpc/log_service_test.cc b/pw_log_rpc/log_service_test.cc
index 620ea46e6..5737c447e 100644
--- a/pw_log_rpc/log_service_test.cc
+++ b/pw_log_rpc/log_service_test.cc
@@ -19,12 +19,15 @@
#include <limits>
#include "gtest/gtest.h"
+#include "pw_assert/check.h"
#include "pw_containers/vector.h"
#include "pw_log/log.h"
#include "pw_log/proto/log.pwpb.h"
#include "pw_log/proto_utils.h"
#include "pw_protobuf/decoder.h"
#include "pw_result/result.h"
+#include "pw_rpc/channel.h"
+#include "pw_rpc/raw/fake_channel_output.h"
#include "pw_rpc/raw/test_method_context.h"
#include "pw_string/string_builder.h"
#include "pw_sync/mutex.h"
@@ -91,15 +94,17 @@ class LogServiceTest : public ::testing::Test {
std::array<std::byte, kMaxLogEntrySize> drain_buffer1_;
std::array<std::byte, kMaxLogEntrySize> drain_buffer2_;
std::array<std::byte, RpcLogDrain::kMinEntryBufferSize> small_buffer_;
+ static constexpr uint32_t kIgnoreWriterErrorsDrainId = 1;
+ static constexpr uint32_t kCloseWriterOnErrorDrainId = 2;
static constexpr uint32_t kSmallBufferDrainId = 3;
sync::Mutex shared_mutex_;
std::array<RpcLogDrain, kMaxDrains> drains_{
- RpcLogDrain(1,
+ RpcLogDrain(kIgnoreWriterErrorsDrainId,
drain_buffer1_,
shared_mutex_,
RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors),
RpcLogDrain(
- 2,
+ kCloseWriterOnErrorDrainId,
drain_buffer2_,
shared_mutex_,
RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError),
@@ -119,6 +124,12 @@ void VerifyLogEntry(protobuf::Decoder& entry_decoder,
EXPECT_TRUE(entry_decoder.Next().ok()); // message [tokenized]
EXPECT_EQ(1U, entry_decoder.FieldNumber());
EXPECT_TRUE(entry_decoder.ReadBytes(&tokenized_data).ok());
+ if (tokenized_data.size() != expected_tokenized_data.size()) {
+ PW_LOG_ERROR(
+ "actual: '%s', expected: '%s'",
+ reinterpret_cast<const char*>(tokenized_data.begin()),
+ reinterpret_cast<const char*>(expected_tokenized_data.begin()));
+ }
EXPECT_EQ(tokenized_data.size(), expected_tokenized_data.size());
EXPECT_EQ(std::memcmp(tokenized_data.begin(),
expected_tokenized_data.begin(),
@@ -176,6 +187,14 @@ size_t VerifyLogEntries(protobuf::Decoder& entries_decoder,
return entries_found;
}
+size_t CountLogEntries(protobuf::Decoder& entries_decoder) {
+ size_t entries_found = 0;
+ while (entries_decoder.Next().ok()) {
+ ++entries_found;
+ }
+ return entries_found;
+}
+
TEST_F(LogServiceTest, AssignWriter) {
// Drains don't have writers.
for (auto& drain : drain_map_.drains()) {
@@ -386,9 +405,167 @@ TEST_F(LogServiceTest, LargeLogEntry) {
entry_decoder, expected_metadata, expected_message, expected_timestamp);
}
-// TODO(pwbug/469): add tests for an open RawServerWriter that closes or fails
-// while flushing, then on re-open the drain sends a counts. The drain mus have
-// ignore_writer_error disabled.
+TEST_F(LogServiceTest, InterruptedLogStreamSendsDropCount) {
+ const uint32_t drain_channel_id = kCloseWriterOnErrorDrainId;
+ auto drain = drain_map_.GetDrainFromChannelId(drain_channel_id);
+ ASSERT_TRUE(drain.ok());
+
+ LogService log_service(drain_map_);
+ const uint32_t output_buffer_size = 100;
+ rpc::RawFakeChannelOutput<output_buffer_size, 10> output(
+ rpc::MethodType::kServerStreaming);
+ rpc::Channel channel(rpc::Channel::Create<drain_channel_id>(&output));
+ rpc::Server server(std::span(&channel, 1));
+
+ // Add as many entries needed to have multiple packets send.
+ const uint32_t min_packets_sent = 4;
+ const uint32_t max_messages_per_response =
+ output_buffer_size / sizeof(kMessage);
+ const size_t total_entries = min_packets_sent * max_messages_per_response;
+ AddLogEntries(total_entries, kMessage);
+
+ // Interrupt log stream with an error.
+ const uint32_t successful_packets_sent = min_packets_sent - 2;
+ output.set_send_status(Status::Unavailable(), successful_packets_sent);
+
+ // Request logs.
+ rpc::RawServerWriter writer =
+ rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
+ server, drain_channel_id, log_service);
+ EXPECT_EQ(drain.value()->Open(writer), OkStatus());
+ // This drain closes on errors.
+ EXPECT_EQ(drain.value()->Flush(), Status::Aborted());
+ EXPECT_TRUE(output.done());
+
+ // Make sure not all packets were sent.
+ ASSERT_EQ(output.total_stream_packets(), successful_packets_sent);
+
+ // Verify data in responses.
+ Vector<ConstByteSpan, total_entries> message_stack;
+ for (size_t i = 0; i < total_entries; ++i) {
+ message_stack.push_back(
+ std::as_bytes(std::span(std::string_view(kMessage))));
+ }
+ size_t entries_found = 0;
+ for (auto& response : output.responses()) {
+ protobuf::Decoder entry_decoder(response);
+ entries_found += VerifyLogEntries(entry_decoder, message_stack);
+ }
+
+ // Verify that not all the entries were sent.
+ EXPECT_LE(entries_found, total_entries);
+
+ // Reset channel output and resume log stream with a new writer.
+ output.clear();
+ writer = rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
+ server, drain_channel_id, log_service);
+ EXPECT_EQ(drain.value()->Open(writer), OkStatus());
+ EXPECT_EQ(drain.value()->Flush(), OkStatus());
+
+ // Add expected messages to the stack in the reverse order they are received.
+ message_stack.clear();
+ // One full packet was dropped. Since all messages are the same length, there
+ // are entries_found / successful_packets_sent per packet.
+ const uint32_t total_drop_count = entries_found / successful_packets_sent;
+ const uint32_t remaining_entries = total_entries - total_drop_count;
+ for (size_t i = 0; i < remaining_entries; ++i) {
+ message_stack.push_back(
+ std::as_bytes(std::span(std::string_view(kMessage))));
+ }
+ StringBuffer<32> message;
+ message.Format("Dropped %u", static_cast<unsigned int>(total_drop_count));
+ message_stack.push_back(std::as_bytes(std::span(std::string_view(message))));
+
+ for (auto& response : output.responses()) {
+ protobuf::Decoder entry_decoder(response);
+ entries_found += VerifyLogEntries(entry_decoder, message_stack);
+ }
+ // All entries are accounted for, including the drop message.
+ EXPECT_EQ(entries_found, remaining_entries + 1);
+}
+
+TEST_F(LogServiceTest, InterruptedLogStreamIgnoresErrors) {
+ const uint32_t drain_channel_id = kIgnoreWriterErrorsDrainId;
+ auto drain = drain_map_.GetDrainFromChannelId(drain_channel_id);
+ ASSERT_TRUE(drain.ok());
+
+ LogService log_service(drain_map_);
+ const uint32_t output_buffer_size = 100;
+ rpc::RawFakeChannelOutput<output_buffer_size, 10> output(
+ rpc::MethodType::kServerStreaming);
+ rpc::Channel channel(rpc::Channel::Create<drain_channel_id>(&output));
+ rpc::Server server(std::span(&channel, 1));
+
+ // Add as many entries needed to have multiple packets send.
+ const uint32_t min_packets_sent = 4;
+ const uint32_t max_messages_per_response =
+ output_buffer_size / sizeof(kMessage);
+ const size_t total_entries = min_packets_sent * max_messages_per_response;
+ AddLogEntries(total_entries, kMessage);
+
+ // Interrupt log stream with an error.
+ const uint32_t error_on_packet_count = min_packets_sent;
+ output.set_send_status(Status::Unavailable(), min_packets_sent);
+
+ // Request logs.
+ rpc::RawServerWriter writer =
+ rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
+ server, drain_channel_id, log_service);
+ EXPECT_EQ(drain.value()->Open(writer), OkStatus());
+ // This drain ignores errors.
+ EXPECT_EQ(drain.value()->Flush(), OkStatus());
+ EXPECT_FALSE(output.done());
+
+ // Make some packets were sent.
+ ASSERT_GE(output.total_stream_packets(), min_packets_sent);
+
+ // Verify that not all the entries were sent.
+ size_t entries_found = 0;
+ for (auto& response : output.responses()) {
+ protobuf::Decoder entry_decoder(response);
+ entries_found += CountLogEntries(entry_decoder);
+ }
+ EXPECT_LE(entries_found, total_entries);
+
+ // Verify that a drop message count is found.
+ // Don't account the drop count message in the total drop count.
+ const uint32_t total_drop_count = total_entries - entries_found + 1;
+ // Since all messages are the same, the is a constant `total_drop_count`
+ // number of entries per packet.
+ const uint32_t entry_count_before_error =
+ error_on_packet_count * total_drop_count;
+ const uint32_t entry_count_after_error =
+ entries_found - 1 - entry_count_before_error;
+ Vector<ConstByteSpan, total_entries> message_stack;
+ // Add messages to the stack in the reverse order they are sent.
+ for (size_t i = 0; i < entry_count_after_error; ++i) {
+ message_stack.push_back(
+ std::as_bytes(std::span(std::string_view(kMessage))));
+ }
+ StringBuffer<32> message;
+ message.Format("Dropped %u", static_cast<unsigned int>(total_drop_count));
+ message_stack.push_back(std::as_bytes(std::span(std::string_view(message))));
+ for (size_t i = 0; i < entry_count_before_error; ++i) {
+ message_stack.push_back(
+ std::as_bytes(std::span(std::string_view(kMessage))));
+ }
+
+ for (auto& response : output.responses()) {
+ protobuf::Decoder entry_decoder(response);
+ VerifyLogEntries(entry_decoder, message_stack);
+ }
+
+ // More calls to flush with errors will not affect this stubborn drain.
+ const size_t previous_stream_packet_count = output.total_stream_packets();
+ output.set_send_status(Status::Unavailable());
+ EXPECT_EQ(drain.value()->Flush(), OkStatus());
+ EXPECT_FALSE(output.done());
+ ASSERT_EQ(output.total_stream_packets(), previous_stream_packet_count);
+
+ output.clear();
+ EXPECT_EQ(drain.value()->Close(), OkStatus());
+ EXPECT_TRUE(output.done());
+}
} // namespace
} // namespace pw::log_rpc
diff --git a/pw_log_rpc/rpc_log_drain_test.cc b/pw_log_rpc/rpc_log_drain_test.cc
index 1eba11876..5913171c6 100644
--- a/pw_log_rpc/rpc_log_drain_test.cc
+++ b/pw_log_rpc/rpc_log_drain_test.cc
@@ -19,8 +19,12 @@
#include <span>
#include "gtest/gtest.h"
+#include "pw_log_rpc/log_service.h"
#include "pw_log_rpc/rpc_log_drain_map.h"
#include "pw_multisink/multisink.h"
+#include "pw_rpc/channel.h"
+#include "pw_rpc/raw/fake_channel_output.h"
+#include "pw_rpc/raw/server_reader_writer.h"
#include "pw_status/status.h"
#include "pw_sync/mutex.h"
@@ -32,11 +36,11 @@ static constexpr size_t kBufferSize =
TEST(RpcLogDrain, TryFlushDrainWithClosedWriter) {
// Drain without a writer.
const uint32_t drain_id = 1;
- std::array<std::byte, kBufferSize> buffer1;
+ std::array<std::byte, kBufferSize> buffer;
sync::Mutex mutex;
RpcLogDrain drain(
drain_id,
- buffer1,
+ buffer,
mutex,
RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError);
EXPECT_EQ(drain.channel_id(), drain_id);
@@ -87,8 +91,76 @@ TEST(RpcLogDrainMap, GetDrainsByIdFromDrainMap) {
}
}
-// TODO(cachinchilla): add tests for passing an open RawServerWriter when there
-// is a way to create an one manually.
+TEST(RpcLogDrain, FlushingDrainWithOpenWriter) {
+ const uint32_t drain_id = 1;
+ std::array<std::byte, kBufferSize> buffer;
+ sync::Mutex mutex;
+ std::array<RpcLogDrain, 1> drains{
+ RpcLogDrain(
+ drain_id,
+ buffer,
+ mutex,
+ RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError),
+ };
+ RpcLogDrainMap drain_map(drains);
+ LogService log_service(drain_map);
+
+ rpc::RawFakeChannelOutput<128, 1> output(rpc::MethodType::kServerStreaming);
+ rpc::Channel channel(rpc::Channel::Create<drain_id>(&output));
+ rpc::Server server(std::span(&channel, 1));
+
+ // Attach drain to a MultiSink.
+ RpcLogDrain& drain = drains[0];
+ std::array<std::byte, kBufferSize * 2> multisink_buffer;
+ multisink::MultiSink multisink(multisink_buffer);
+ multisink.AttachDrain(drain);
+ EXPECT_EQ(drain.Flush(), Status::Unavailable());
+
+ rpc::RawServerWriter writer =
+ rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
+ server, drain_id, log_service);
+ ASSERT_TRUE(writer.open());
+ EXPECT_EQ(drain.Open(writer), OkStatus());
+ EXPECT_EQ(drain.Flush(), OkStatus());
+ // Can call multliple times until closed on error.
+ EXPECT_EQ(drain.Flush(), OkStatus());
+ EXPECT_EQ(drain.Close(), OkStatus());
+ rpc::RawServerWriter& writer_ref = writer;
+ ASSERT_FALSE(writer_ref.open());
+ EXPECT_EQ(drain.Flush(), Status::Unavailable());
+}
+
+TEST(RpcLogDrain, TryReopenOpenedDrain) {
+ const uint32_t drain_id = 1;
+ std::array<std::byte, kBufferSize> buffer;
+ sync::Mutex mutex;
+ std::array<RpcLogDrain, 1> drains{
+ RpcLogDrain(
+ drain_id,
+ buffer,
+ mutex,
+ RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError),
+ };
+ RpcLogDrainMap drain_map(drains);
+ LogService log_service(drain_map);
+
+ rpc::RawFakeChannelOutput<128, 1> output(rpc::MethodType::kServerStreaming);
+ rpc::Channel channel(rpc::Channel::Create<drain_id>(&output));
+ rpc::Server server(std::span(&channel, 1));
+
+ // Open Drain and try to open with a new writer.
+ rpc::RawServerWriter writer =
+ rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
+ server, drain_id, log_service);
+ ASSERT_TRUE(writer.open());
+ RpcLogDrain& drain = drains[0];
+ EXPECT_EQ(drain.Open(writer), OkStatus());
+ rpc::RawServerWriter second_writer =
+ rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
+ server, drain_id, log_service);
+ ASSERT_TRUE(second_writer.open());
+ EXPECT_EQ(drain.Open(second_writer), Status::AlreadyExists());
+}
} // namespace
} // namespace pw::log_rpc