diff options
Diffstat (limited to 'pw_log_rpc/rpc_log_drain_test.cc')
-rw-r--r-- | pw_log_rpc/rpc_log_drain_test.cc | 157 |
1 files changed, 98 insertions, 59 deletions
diff --git a/pw_log_rpc/rpc_log_drain_test.cc b/pw_log_rpc/rpc_log_drain_test.cc index 83918a7ac..157e1158d 100644 --- a/pw_log_rpc/rpc_log_drain_test.cc +++ b/pw_log_rpc/rpc_log_drain_test.cc @@ -43,48 +43,6 @@ namespace { static constexpr size_t kBufferSize = RpcLogDrain::kMinEntrySizeWithoutPayload + 32; -// Verifies a stream of log entries and updates the total drop count found. -// expected_entries is expected to be in the same order that messages were -// added to the multisink. -void VerifyLogEntriesInCorrectOrder( - protobuf::Decoder& entries_decoder, - const Vector<TestLogEntry>& expected_entries, - uint32_t expected_first_entry_sequence_id, - uint32_t& drop_count_out) { - size_t entries_found = 0; - - while (entries_decoder.Next().ok()) { - if (static_cast<pw::log::LogEntries::Fields>( - entries_decoder.FieldNumber()) == - log::LogEntries::Fields::ENTRIES) { - ConstByteSpan entry; - EXPECT_EQ(entries_decoder.ReadBytes(&entry), OkStatus()); - protobuf::Decoder entry_decoder(entry); - if (expected_entries.empty()) { - break; - } - - ASSERT_LT(entries_found, expected_entries.size()); - - // Keep track of entries and drops respective counts. - uint32_t current_drop_count = 0; - VerifyLogEntry( - entry_decoder, expected_entries[entries_found], current_drop_count); - drop_count_out += current_drop_count; - if (current_drop_count == 0) { - ++entries_found; - } - } else if (static_cast<pw::log::LogEntries::Fields>( - entries_decoder.FieldNumber()) == - log::LogEntries::Fields::FIRST_ENTRY_SEQUENCE_ID) { - uint32_t first_entry_sequence_id = 0; - EXPECT_EQ(entries_decoder.ReadUint32(&first_entry_sequence_id), - OkStatus()); - EXPECT_EQ(expected_first_entry_sequence_id, first_entry_sequence_id); - } - } -} - TEST(RpcLogDrain, TryFlushDrainWithClosedWriter) { // Drain without a writer. const uint32_t drain_id = 1; @@ -245,12 +203,11 @@ class TrickleTest : public ::testing::Test { server_(std::span(&channel_, 1)) {} TestLogEntry BasicLog(std::string_view message) { - constexpr log_tokenized::Metadata kSampleMetadata = - log_tokenized::Metadata::Set<PW_LOG_LEVEL_INFO, 123, 0x03, __LINE__>(); return {.metadata = kSampleMetadata, - .timestamp = 123, + .timestamp = kSampleTimestamp, .dropped = 0, - .tokenized_data = std::as_bytes(std::span(message))}; + .tokenized_data = std::as_bytes(std::span(message)), + .thread = std::as_bytes(std::span(kSampleThreadName))}; } void AttachDrain() { multisink_.AttachDrain(drains_[0]); } @@ -264,14 +221,13 @@ class TrickleTest : public ::testing::Test { log::EncodeTokenizedLog(entry.metadata, entry.tokenized_data, entry.timestamp, + entry.thread, log_message_encode_buffer_); ASSERT_EQ(encoded_log_result.status(), OkStatus()); EXPECT_LE(encoded_log_result.value().size(), kMaxMessageSize); multisink_.HandleEntry(encoded_log_result.value()); } - // VerifyLogEntriesInCorrectOrder() expects logs to be in the opposite - // direction compared to when they were added to the multisink. void AddLogEntries(const Vector<TestLogEntry>& entries) { for (const TestLogEntry& entry : entries) { AddLogEntry(entry); @@ -280,8 +236,29 @@ class TrickleTest : public ::testing::Test { static constexpr uint32_t kDrainChannelId = 1; static constexpr size_t kMaxMessageSize = 60; + + // Use the size of the encoded BasicLog entry to calculate buffer sizes and + // better control the number of entries in each sent bulk. + static constexpr log_tokenized::Metadata kSampleMetadata = + log_tokenized::Metadata::Set<PW_LOG_LEVEL_INFO, 123, 0x03, 300>(); + static constexpr uint64_t kSampleTimestamp = 9000; + static constexpr std::string_view kSampleThreadName = "thread"; + static constexpr size_t kBasicLogSizeWithoutPayload = + protobuf::SizeOfFieldBytes(log::LogEntry::Fields::MESSAGE, 0) + + protobuf::SizeOfFieldUint32( + log::LogEntry::Fields::LINE_LEVEL, + log::PackLineLevel(kSampleMetadata.line_number(), + kSampleMetadata.level())) + + protobuf::SizeOfFieldUint32(log::LogEntry::Fields::FLAGS, + kSampleMetadata.flags()) + + protobuf::SizeOfFieldInt64(log::LogEntry::Fields::TIMESTAMP, + kSampleTimestamp) + + protobuf::SizeOfFieldBytes(log::LogEntry::Fields::MODULE, + sizeof(kSampleMetadata.module())) + + protobuf::SizeOfFieldBytes(log::LogEntry::Fields::THREAD, + kSampleThreadName.size()); static constexpr size_t kDrainEncodeBufferSize = - RpcLogDrain::kMinEntrySizeWithoutPayload + kMaxMessageSize; + kBasicLogSizeWithoutPayload + kMaxMessageSize; static constexpr size_t kChannelEncodeBufferSize = kDrainEncodeBufferSize * 2; std::array<std::byte, kMaxMessageSize> log_message_encode_buffer_; std::array<std::byte, kDrainEncodeBufferSize> drain_encode_buffer_; @@ -325,11 +302,13 @@ TEST_F(TrickleTest, EntriesAreFlushedToSinglePayload) { EXPECT_EQ(payloads.size(), 1u); uint32_t drop_count = 0; + size_t entries_count = 0; protobuf::Decoder payload_decoder(payloads[0]); payload_decoder.Reset(payloads[0]); - VerifyLogEntriesInCorrectOrder( - payload_decoder, kExpectedEntries, 0, drop_count); + VerifyLogEntries( + payload_decoder, kExpectedEntries, 0, entries_count, drop_count); EXPECT_EQ(drop_count, 0u); + EXPECT_EQ(entries_count, 3u); } TEST_F(TrickleTest, ManyLogsOverflowToNextPayload) { @@ -361,16 +340,20 @@ TEST_F(TrickleTest, ManyLogsOverflowToNextPayload) { ASSERT_EQ(payloads.size(), 2u); uint32_t drop_count = 0; + size_t entries_count = 0; protobuf::Decoder payload_decoder(payloads[0]); payload_decoder.Reset(payloads[0]); - VerifyLogEntriesInCorrectOrder( - payload_decoder, kFirstFlushedBundle, 0, drop_count); + VerifyLogEntries( + payload_decoder, kFirstFlushedBundle, 0, entries_count, drop_count); EXPECT_EQ(drop_count, 0u); + EXPECT_EQ(entries_count, 3u); + entries_count = 0; payload_decoder.Reset(payloads[1]); - VerifyLogEntriesInCorrectOrder( - payload_decoder, kSecondFlushedBundle, 3, drop_count); + VerifyLogEntries( + payload_decoder, kSecondFlushedBundle, 3, entries_count, drop_count); EXPECT_EQ(drop_count, 0u); + EXPECT_EQ(entries_count, 3u); } TEST_F(TrickleTest, LimitedFlushOverflowsToNextPayload) { @@ -406,22 +389,78 @@ TEST_F(TrickleTest, LimitedFlushOverflowsToNextPayload) { output_.payloads<log::pw_rpc::raw::Logs::Listen>(kDrainChannelId); ASSERT_EQ(first_flush_payloads.size(), 1u); uint32_t drop_count = 0; + size_t entries_count = 0; protobuf::Decoder payload_decoder(first_flush_payloads[0]); payload_decoder.Reset(first_flush_payloads[0]); - VerifyLogEntriesInCorrectOrder( - payload_decoder, kFirstFlushedBundle, 0, drop_count); + VerifyLogEntries( + payload_decoder, kFirstFlushedBundle, 0, entries_count, drop_count); + EXPECT_EQ(entries_count, 3u); // An additional flush should produce another payload. min_delay = drains_[0].Trickle(channel_encode_buffer_); EXPECT_EQ(min_delay.has_value(), false); drop_count = 0; + entries_count = 0; + rpc::PayloadsView second_flush_payloads = output_.payloads<log::pw_rpc::raw::Logs::Listen>(kDrainChannelId); ASSERT_EQ(second_flush_payloads.size(), 2u); payload_decoder.Reset(second_flush_payloads[1]); - VerifyLogEntriesInCorrectOrder( - payload_decoder, kSecondFlushedBundle, 3, drop_count); + VerifyLogEntries( + payload_decoder, kSecondFlushedBundle, 3, entries_count, drop_count); EXPECT_EQ(drop_count, 0u); + EXPECT_EQ(entries_count, 3u); +} + +TEST(RpcLogDrain, OnOpenCallbackCalled) { + // Create drain and log components. + const uint32_t drain_id = 1; + std::array<std::byte, kBufferSize> buffer; + sync::Mutex mutex; + RpcLogDrain drain( + drain_id, + buffer, + mutex, + RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError, + nullptr); + RpcLogDrainMap drain_map(std::span(&drain, 1)); + LogService log_service(drain_map); + std::array<std::byte, kBufferSize * 2> multisink_buffer; + multisink::MultiSink multisink(multisink_buffer); + multisink.AttachDrain(drain); + + // Create server writer. + rpc::RawFakeChannelOutput<3> output; + rpc::Channel channel(rpc::Channel::Create<drain_id>(&output)); + rpc::Server server(std::span(&channel, 1)); + rpc::RawServerWriter writer = + rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>( + server, drain_id, log_service); + + int callback_call_times = 0; + Function<void()> callback = [&callback_call_times]() { + ++callback_call_times; + }; + + // Callback not called when not set. + ASSERT_TRUE(writer.active()); + ASSERT_EQ(drain.Open(writer), OkStatus()); + EXPECT_EQ(callback_call_times, 0); + + drain.set_on_open_callback(std::move(callback)); + + // Callback called when writer is open. + writer = rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>( + server, drain_id, log_service); + ASSERT_TRUE(writer.active()); + ASSERT_EQ(drain.Open(writer), OkStatus()); + EXPECT_EQ(callback_call_times, 1); + + // Callback not called when writer is closed. + rpc::RawServerWriter closed_writer; + ASSERT_FALSE(closed_writer.active()); + ASSERT_EQ(drain.Open(closed_writer), Status::FailedPrecondition()); + EXPECT_EQ(callback_call_times, 1); } } // namespace |