diff options
author | Carlos Chinchilla <cachinchilla@google.com> | 2021-09-16 17:30:31 -0700 |
---|---|---|
committer | CQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com> | 2021-09-23 16:36:59 +0000 |
commit | 6f253ac64c9bba1f6b9951634ed051684bdae0fc (patch) | |
tree | 288d3d667ebcd4ef90a2b85e68c6941276c1dbe5 /pw_log_rpc | |
parent | 54891aeecdbbd29c0ba65ce39bb26942ac5edd20 (diff) | |
download | pigweed-6f253ac64c9bba1f6b9951634ed051684bdae0fc.tar.gz |
pw_log_rpc: Add unit tests with open server writer
Test RPC log streams with provided open server writer.
No-Docs-Update-Reason: added unit tests.
Change-Id: I81acdd59361b1883f25e34fde4e83d5a089f70af
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/61422
Commit-Queue: Carlos Chinchilla <cachinchilla@google.com>
Pigweed-Auto-Submit: Carlos Chinchilla <cachinchilla@google.com>
Reviewed-by: Wyatt Hepler <hepler@google.com>
Diffstat (limited to 'pw_log_rpc')
-rw-r--r-- | pw_log_rpc/BUILD.bazel | 2 | ||||
-rw-r--r-- | pw_log_rpc/BUILD.gn | 6 | ||||
-rw-r--r-- | pw_log_rpc/log_service_test.cc | 187 | ||||
-rw-r--r-- | pw_log_rpc/rpc_log_drain_test.cc | 80 |
4 files changed, 265 insertions, 10 deletions
diff --git a/pw_log_rpc/BUILD.bazel b/pw_log_rpc/BUILD.bazel index 27c722117..29048a904 100644 --- a/pw_log_rpc/BUILD.bazel +++ b/pw_log_rpc/BUILD.bazel @@ -94,7 +94,9 @@ pw_cc_test( "rpc_log_drain_test.cc", ], deps = [ + ":log_service", ":rpc_log_drain", + "//pw_rpc/raw:test_method_context", "//pw_unit_test", ], ) diff --git a/pw_log_rpc/BUILD.gn b/pw_log_rpc/BUILD.gn index 122fccb9d..4e4189d96 100644 --- a/pw_log_rpc/BUILD.gn +++ b/pw_log_rpc/BUILD.gn @@ -91,7 +91,11 @@ pw_test("log_service_test") { pw_test("rpc_log_drain_test") { sources = [ "rpc_log_drain_test.cc" ] - deps = [ ":rpc_log_drain" ] + deps = [ + ":log_service", + ":rpc_log_drain", + "$dir_pw_rpc/raw:test_method_context", + ] } # TODO(cachinchilla): update docs. diff --git a/pw_log_rpc/log_service_test.cc b/pw_log_rpc/log_service_test.cc index 620ea46e6..5737c447e 100644 --- a/pw_log_rpc/log_service_test.cc +++ b/pw_log_rpc/log_service_test.cc @@ -19,12 +19,15 @@ #include <limits> #include "gtest/gtest.h" +#include "pw_assert/check.h" #include "pw_containers/vector.h" #include "pw_log/log.h" #include "pw_log/proto/log.pwpb.h" #include "pw_log/proto_utils.h" #include "pw_protobuf/decoder.h" #include "pw_result/result.h" +#include "pw_rpc/channel.h" +#include "pw_rpc/raw/fake_channel_output.h" #include "pw_rpc/raw/test_method_context.h" #include "pw_string/string_builder.h" #include "pw_sync/mutex.h" @@ -91,15 +94,17 @@ class LogServiceTest : public ::testing::Test { std::array<std::byte, kMaxLogEntrySize> drain_buffer1_; std::array<std::byte, kMaxLogEntrySize> drain_buffer2_; std::array<std::byte, RpcLogDrain::kMinEntryBufferSize> small_buffer_; + static constexpr uint32_t kIgnoreWriterErrorsDrainId = 1; + static constexpr uint32_t kCloseWriterOnErrorDrainId = 2; static constexpr uint32_t kSmallBufferDrainId = 3; sync::Mutex shared_mutex_; std::array<RpcLogDrain, kMaxDrains> drains_{ - RpcLogDrain(1, + RpcLogDrain(kIgnoreWriterErrorsDrainId, drain_buffer1_, shared_mutex_, RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors), RpcLogDrain( - 2, + kCloseWriterOnErrorDrainId, drain_buffer2_, shared_mutex_, RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError), @@ -119,6 +124,12 @@ void VerifyLogEntry(protobuf::Decoder& entry_decoder, EXPECT_TRUE(entry_decoder.Next().ok()); // message [tokenized] EXPECT_EQ(1U, entry_decoder.FieldNumber()); EXPECT_TRUE(entry_decoder.ReadBytes(&tokenized_data).ok()); + if (tokenized_data.size() != expected_tokenized_data.size()) { + PW_LOG_ERROR( + "actual: '%s', expected: '%s'", + reinterpret_cast<const char*>(tokenized_data.begin()), + reinterpret_cast<const char*>(expected_tokenized_data.begin())); + } EXPECT_EQ(tokenized_data.size(), expected_tokenized_data.size()); EXPECT_EQ(std::memcmp(tokenized_data.begin(), expected_tokenized_data.begin(), @@ -176,6 +187,14 @@ size_t VerifyLogEntries(protobuf::Decoder& entries_decoder, return entries_found; } +size_t CountLogEntries(protobuf::Decoder& entries_decoder) { + size_t entries_found = 0; + while (entries_decoder.Next().ok()) { + ++entries_found; + } + return entries_found; +} + TEST_F(LogServiceTest, AssignWriter) { // Drains don't have writers. for (auto& drain : drain_map_.drains()) { @@ -386,9 +405,167 @@ TEST_F(LogServiceTest, LargeLogEntry) { entry_decoder, expected_metadata, expected_message, expected_timestamp); } -// TODO(pwbug/469): add tests for an open RawServerWriter that closes or fails -// while flushing, then on re-open the drain sends a counts. The drain mus have -// ignore_writer_error disabled. +TEST_F(LogServiceTest, InterruptedLogStreamSendsDropCount) { + const uint32_t drain_channel_id = kCloseWriterOnErrorDrainId; + auto drain = drain_map_.GetDrainFromChannelId(drain_channel_id); + ASSERT_TRUE(drain.ok()); + + LogService log_service(drain_map_); + const uint32_t output_buffer_size = 100; + rpc::RawFakeChannelOutput<output_buffer_size, 10> output( + rpc::MethodType::kServerStreaming); + rpc::Channel channel(rpc::Channel::Create<drain_channel_id>(&output)); + rpc::Server server(std::span(&channel, 1)); + + // Add as many entries needed to have multiple packets send. + const uint32_t min_packets_sent = 4; + const uint32_t max_messages_per_response = + output_buffer_size / sizeof(kMessage); + const size_t total_entries = min_packets_sent * max_messages_per_response; + AddLogEntries(total_entries, kMessage); + + // Interrupt log stream with an error. + const uint32_t successful_packets_sent = min_packets_sent - 2; + output.set_send_status(Status::Unavailable(), successful_packets_sent); + + // Request logs. + rpc::RawServerWriter writer = + rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>( + server, drain_channel_id, log_service); + EXPECT_EQ(drain.value()->Open(writer), OkStatus()); + // This drain closes on errors. + EXPECT_EQ(drain.value()->Flush(), Status::Aborted()); + EXPECT_TRUE(output.done()); + + // Make sure not all packets were sent. + ASSERT_EQ(output.total_stream_packets(), successful_packets_sent); + + // Verify data in responses. + Vector<ConstByteSpan, total_entries> message_stack; + for (size_t i = 0; i < total_entries; ++i) { + message_stack.push_back( + std::as_bytes(std::span(std::string_view(kMessage)))); + } + size_t entries_found = 0; + for (auto& response : output.responses()) { + protobuf::Decoder entry_decoder(response); + entries_found += VerifyLogEntries(entry_decoder, message_stack); + } + + // Verify that not all the entries were sent. + EXPECT_LE(entries_found, total_entries); + + // Reset channel output and resume log stream with a new writer. + output.clear(); + writer = rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>( + server, drain_channel_id, log_service); + EXPECT_EQ(drain.value()->Open(writer), OkStatus()); + EXPECT_EQ(drain.value()->Flush(), OkStatus()); + + // Add expected messages to the stack in the reverse order they are received. + message_stack.clear(); + // One full packet was dropped. Since all messages are the same length, there + // are entries_found / successful_packets_sent per packet. + const uint32_t total_drop_count = entries_found / successful_packets_sent; + const uint32_t remaining_entries = total_entries - total_drop_count; + for (size_t i = 0; i < remaining_entries; ++i) { + message_stack.push_back( + std::as_bytes(std::span(std::string_view(kMessage)))); + } + StringBuffer<32> message; + message.Format("Dropped %u", static_cast<unsigned int>(total_drop_count)); + message_stack.push_back(std::as_bytes(std::span(std::string_view(message)))); + + for (auto& response : output.responses()) { + protobuf::Decoder entry_decoder(response); + entries_found += VerifyLogEntries(entry_decoder, message_stack); + } + // All entries are accounted for, including the drop message. + EXPECT_EQ(entries_found, remaining_entries + 1); +} + +TEST_F(LogServiceTest, InterruptedLogStreamIgnoresErrors) { + const uint32_t drain_channel_id = kIgnoreWriterErrorsDrainId; + auto drain = drain_map_.GetDrainFromChannelId(drain_channel_id); + ASSERT_TRUE(drain.ok()); + + LogService log_service(drain_map_); + const uint32_t output_buffer_size = 100; + rpc::RawFakeChannelOutput<output_buffer_size, 10> output( + rpc::MethodType::kServerStreaming); + rpc::Channel channel(rpc::Channel::Create<drain_channel_id>(&output)); + rpc::Server server(std::span(&channel, 1)); + + // Add as many entries needed to have multiple packets send. + const uint32_t min_packets_sent = 4; + const uint32_t max_messages_per_response = + output_buffer_size / sizeof(kMessage); + const size_t total_entries = min_packets_sent * max_messages_per_response; + AddLogEntries(total_entries, kMessage); + + // Interrupt log stream with an error. + const uint32_t error_on_packet_count = min_packets_sent; + output.set_send_status(Status::Unavailable(), min_packets_sent); + + // Request logs. + rpc::RawServerWriter writer = + rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>( + server, drain_channel_id, log_service); + EXPECT_EQ(drain.value()->Open(writer), OkStatus()); + // This drain ignores errors. + EXPECT_EQ(drain.value()->Flush(), OkStatus()); + EXPECT_FALSE(output.done()); + + // Make some packets were sent. + ASSERT_GE(output.total_stream_packets(), min_packets_sent); + + // Verify that not all the entries were sent. + size_t entries_found = 0; + for (auto& response : output.responses()) { + protobuf::Decoder entry_decoder(response); + entries_found += CountLogEntries(entry_decoder); + } + EXPECT_LE(entries_found, total_entries); + + // Verify that a drop message count is found. + // Don't account the drop count message in the total drop count. + const uint32_t total_drop_count = total_entries - entries_found + 1; + // Since all messages are the same, the is a constant `total_drop_count` + // number of entries per packet. + const uint32_t entry_count_before_error = + error_on_packet_count * total_drop_count; + const uint32_t entry_count_after_error = + entries_found - 1 - entry_count_before_error; + Vector<ConstByteSpan, total_entries> message_stack; + // Add messages to the stack in the reverse order they are sent. + for (size_t i = 0; i < entry_count_after_error; ++i) { + message_stack.push_back( + std::as_bytes(std::span(std::string_view(kMessage)))); + } + StringBuffer<32> message; + message.Format("Dropped %u", static_cast<unsigned int>(total_drop_count)); + message_stack.push_back(std::as_bytes(std::span(std::string_view(message)))); + for (size_t i = 0; i < entry_count_before_error; ++i) { + message_stack.push_back( + std::as_bytes(std::span(std::string_view(kMessage)))); + } + + for (auto& response : output.responses()) { + protobuf::Decoder entry_decoder(response); + VerifyLogEntries(entry_decoder, message_stack); + } + + // More calls to flush with errors will not affect this stubborn drain. + const size_t previous_stream_packet_count = output.total_stream_packets(); + output.set_send_status(Status::Unavailable()); + EXPECT_EQ(drain.value()->Flush(), OkStatus()); + EXPECT_FALSE(output.done()); + ASSERT_EQ(output.total_stream_packets(), previous_stream_packet_count); + + output.clear(); + EXPECT_EQ(drain.value()->Close(), OkStatus()); + EXPECT_TRUE(output.done()); +} } // namespace } // namespace pw::log_rpc diff --git a/pw_log_rpc/rpc_log_drain_test.cc b/pw_log_rpc/rpc_log_drain_test.cc index 1eba11876..5913171c6 100644 --- a/pw_log_rpc/rpc_log_drain_test.cc +++ b/pw_log_rpc/rpc_log_drain_test.cc @@ -19,8 +19,12 @@ #include <span> #include "gtest/gtest.h" +#include "pw_log_rpc/log_service.h" #include "pw_log_rpc/rpc_log_drain_map.h" #include "pw_multisink/multisink.h" +#include "pw_rpc/channel.h" +#include "pw_rpc/raw/fake_channel_output.h" +#include "pw_rpc/raw/server_reader_writer.h" #include "pw_status/status.h" #include "pw_sync/mutex.h" @@ -32,11 +36,11 @@ static constexpr size_t kBufferSize = TEST(RpcLogDrain, TryFlushDrainWithClosedWriter) { // Drain without a writer. const uint32_t drain_id = 1; - std::array<std::byte, kBufferSize> buffer1; + std::array<std::byte, kBufferSize> buffer; sync::Mutex mutex; RpcLogDrain drain( drain_id, - buffer1, + buffer, mutex, RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError); EXPECT_EQ(drain.channel_id(), drain_id); @@ -87,8 +91,76 @@ TEST(RpcLogDrainMap, GetDrainsByIdFromDrainMap) { } } -// TODO(cachinchilla): add tests for passing an open RawServerWriter when there -// is a way to create an one manually. +TEST(RpcLogDrain, FlushingDrainWithOpenWriter) { + const uint32_t drain_id = 1; + std::array<std::byte, kBufferSize> buffer; + sync::Mutex mutex; + std::array<RpcLogDrain, 1> drains{ + RpcLogDrain( + drain_id, + buffer, + mutex, + RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError), + }; + RpcLogDrainMap drain_map(drains); + LogService log_service(drain_map); + + rpc::RawFakeChannelOutput<128, 1> output(rpc::MethodType::kServerStreaming); + rpc::Channel channel(rpc::Channel::Create<drain_id>(&output)); + rpc::Server server(std::span(&channel, 1)); + + // Attach drain to a MultiSink. + RpcLogDrain& drain = drains[0]; + std::array<std::byte, kBufferSize * 2> multisink_buffer; + multisink::MultiSink multisink(multisink_buffer); + multisink.AttachDrain(drain); + EXPECT_EQ(drain.Flush(), Status::Unavailable()); + + rpc::RawServerWriter writer = + rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>( + server, drain_id, log_service); + ASSERT_TRUE(writer.open()); + EXPECT_EQ(drain.Open(writer), OkStatus()); + EXPECT_EQ(drain.Flush(), OkStatus()); + // Can call multliple times until closed on error. + EXPECT_EQ(drain.Flush(), OkStatus()); + EXPECT_EQ(drain.Close(), OkStatus()); + rpc::RawServerWriter& writer_ref = writer; + ASSERT_FALSE(writer_ref.open()); + EXPECT_EQ(drain.Flush(), Status::Unavailable()); +} + +TEST(RpcLogDrain, TryReopenOpenedDrain) { + const uint32_t drain_id = 1; + std::array<std::byte, kBufferSize> buffer; + sync::Mutex mutex; + std::array<RpcLogDrain, 1> drains{ + RpcLogDrain( + drain_id, + buffer, + mutex, + RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError), + }; + RpcLogDrainMap drain_map(drains); + LogService log_service(drain_map); + + rpc::RawFakeChannelOutput<128, 1> output(rpc::MethodType::kServerStreaming); + rpc::Channel channel(rpc::Channel::Create<drain_id>(&output)); + rpc::Server server(std::span(&channel, 1)); + + // Open Drain and try to open with a new writer. + rpc::RawServerWriter writer = + rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>( + server, drain_id, log_service); + ASSERT_TRUE(writer.open()); + RpcLogDrain& drain = drains[0]; + EXPECT_EQ(drain.Open(writer), OkStatus()); + rpc::RawServerWriter second_writer = + rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>( + server, drain_id, log_service); + ASSERT_TRUE(second_writer.open()); + EXPECT_EQ(drain.Open(second_writer), Status::AlreadyExists()); +} } // namespace } // namespace pw::log_rpc |