aboutsummaryrefslogtreecommitdiff
path: root/pw_log_rpc/rpc_log_drain_test.cc
diff options
context:
space:
mode:
Diffstat (limited to 'pw_log_rpc/rpc_log_drain_test.cc')
-rw-r--r--pw_log_rpc/rpc_log_drain_test.cc157
1 files changed, 98 insertions, 59 deletions
diff --git a/pw_log_rpc/rpc_log_drain_test.cc b/pw_log_rpc/rpc_log_drain_test.cc
index 83918a7ac..157e1158d 100644
--- a/pw_log_rpc/rpc_log_drain_test.cc
+++ b/pw_log_rpc/rpc_log_drain_test.cc
@@ -43,48 +43,6 @@ namespace {
static constexpr size_t kBufferSize =
RpcLogDrain::kMinEntrySizeWithoutPayload + 32;
-// Verifies a stream of log entries and updates the total drop count found.
-// expected_entries is expected to be in the same order that messages were
-// added to the multisink.
-void VerifyLogEntriesInCorrectOrder(
- protobuf::Decoder& entries_decoder,
- const Vector<TestLogEntry>& expected_entries,
- uint32_t expected_first_entry_sequence_id,
- uint32_t& drop_count_out) {
- size_t entries_found = 0;
-
- while (entries_decoder.Next().ok()) {
- if (static_cast<pw::log::LogEntries::Fields>(
- entries_decoder.FieldNumber()) ==
- log::LogEntries::Fields::ENTRIES) {
- ConstByteSpan entry;
- EXPECT_EQ(entries_decoder.ReadBytes(&entry), OkStatus());
- protobuf::Decoder entry_decoder(entry);
- if (expected_entries.empty()) {
- break;
- }
-
- ASSERT_LT(entries_found, expected_entries.size());
-
- // Keep track of entries and drops respective counts.
- uint32_t current_drop_count = 0;
- VerifyLogEntry(
- entry_decoder, expected_entries[entries_found], current_drop_count);
- drop_count_out += current_drop_count;
- if (current_drop_count == 0) {
- ++entries_found;
- }
- } else if (static_cast<pw::log::LogEntries::Fields>(
- entries_decoder.FieldNumber()) ==
- log::LogEntries::Fields::FIRST_ENTRY_SEQUENCE_ID) {
- uint32_t first_entry_sequence_id = 0;
- EXPECT_EQ(entries_decoder.ReadUint32(&first_entry_sequence_id),
- OkStatus());
- EXPECT_EQ(expected_first_entry_sequence_id, first_entry_sequence_id);
- }
- }
-}
-
TEST(RpcLogDrain, TryFlushDrainWithClosedWriter) {
// Drain without a writer.
const uint32_t drain_id = 1;
@@ -245,12 +203,11 @@ class TrickleTest : public ::testing::Test {
server_(std::span(&channel_, 1)) {}
TestLogEntry BasicLog(std::string_view message) {
- constexpr log_tokenized::Metadata kSampleMetadata =
- log_tokenized::Metadata::Set<PW_LOG_LEVEL_INFO, 123, 0x03, __LINE__>();
return {.metadata = kSampleMetadata,
- .timestamp = 123,
+ .timestamp = kSampleTimestamp,
.dropped = 0,
- .tokenized_data = std::as_bytes(std::span(message))};
+ .tokenized_data = std::as_bytes(std::span(message)),
+ .thread = std::as_bytes(std::span(kSampleThreadName))};
}
void AttachDrain() { multisink_.AttachDrain(drains_[0]); }
@@ -264,14 +221,13 @@ class TrickleTest : public ::testing::Test {
log::EncodeTokenizedLog(entry.metadata,
entry.tokenized_data,
entry.timestamp,
+ entry.thread,
log_message_encode_buffer_);
ASSERT_EQ(encoded_log_result.status(), OkStatus());
EXPECT_LE(encoded_log_result.value().size(), kMaxMessageSize);
multisink_.HandleEntry(encoded_log_result.value());
}
- // VerifyLogEntriesInCorrectOrder() expects logs to be in the opposite
- // direction compared to when they were added to the multisink.
void AddLogEntries(const Vector<TestLogEntry>& entries) {
for (const TestLogEntry& entry : entries) {
AddLogEntry(entry);
@@ -280,8 +236,29 @@ class TrickleTest : public ::testing::Test {
static constexpr uint32_t kDrainChannelId = 1;
static constexpr size_t kMaxMessageSize = 60;
+
+ // Use the size of the encoded BasicLog entry to calculate buffer sizes and
+ // better control the number of entries in each sent bulk.
+ static constexpr log_tokenized::Metadata kSampleMetadata =
+ log_tokenized::Metadata::Set<PW_LOG_LEVEL_INFO, 123, 0x03, 300>();
+ static constexpr uint64_t kSampleTimestamp = 9000;
+ static constexpr std::string_view kSampleThreadName = "thread";
+ static constexpr size_t kBasicLogSizeWithoutPayload =
+ protobuf::SizeOfFieldBytes(log::LogEntry::Fields::MESSAGE, 0) +
+ protobuf::SizeOfFieldUint32(
+ log::LogEntry::Fields::LINE_LEVEL,
+ log::PackLineLevel(kSampleMetadata.line_number(),
+ kSampleMetadata.level())) +
+ protobuf::SizeOfFieldUint32(log::LogEntry::Fields::FLAGS,
+ kSampleMetadata.flags()) +
+ protobuf::SizeOfFieldInt64(log::LogEntry::Fields::TIMESTAMP,
+ kSampleTimestamp) +
+ protobuf::SizeOfFieldBytes(log::LogEntry::Fields::MODULE,
+ sizeof(kSampleMetadata.module())) +
+ protobuf::SizeOfFieldBytes(log::LogEntry::Fields::THREAD,
+ kSampleThreadName.size());
static constexpr size_t kDrainEncodeBufferSize =
- RpcLogDrain::kMinEntrySizeWithoutPayload + kMaxMessageSize;
+ kBasicLogSizeWithoutPayload + kMaxMessageSize;
static constexpr size_t kChannelEncodeBufferSize = kDrainEncodeBufferSize * 2;
std::array<std::byte, kMaxMessageSize> log_message_encode_buffer_;
std::array<std::byte, kDrainEncodeBufferSize> drain_encode_buffer_;
@@ -325,11 +302,13 @@ TEST_F(TrickleTest, EntriesAreFlushedToSinglePayload) {
EXPECT_EQ(payloads.size(), 1u);
uint32_t drop_count = 0;
+ size_t entries_count = 0;
protobuf::Decoder payload_decoder(payloads[0]);
payload_decoder.Reset(payloads[0]);
- VerifyLogEntriesInCorrectOrder(
- payload_decoder, kExpectedEntries, 0, drop_count);
+ VerifyLogEntries(
+ payload_decoder, kExpectedEntries, 0, entries_count, drop_count);
EXPECT_EQ(drop_count, 0u);
+ EXPECT_EQ(entries_count, 3u);
}
TEST_F(TrickleTest, ManyLogsOverflowToNextPayload) {
@@ -361,16 +340,20 @@ TEST_F(TrickleTest, ManyLogsOverflowToNextPayload) {
ASSERT_EQ(payloads.size(), 2u);
uint32_t drop_count = 0;
+ size_t entries_count = 0;
protobuf::Decoder payload_decoder(payloads[0]);
payload_decoder.Reset(payloads[0]);
- VerifyLogEntriesInCorrectOrder(
- payload_decoder, kFirstFlushedBundle, 0, drop_count);
+ VerifyLogEntries(
+ payload_decoder, kFirstFlushedBundle, 0, entries_count, drop_count);
EXPECT_EQ(drop_count, 0u);
+ EXPECT_EQ(entries_count, 3u);
+ entries_count = 0;
payload_decoder.Reset(payloads[1]);
- VerifyLogEntriesInCorrectOrder(
- payload_decoder, kSecondFlushedBundle, 3, drop_count);
+ VerifyLogEntries(
+ payload_decoder, kSecondFlushedBundle, 3, entries_count, drop_count);
EXPECT_EQ(drop_count, 0u);
+ EXPECT_EQ(entries_count, 3u);
}
TEST_F(TrickleTest, LimitedFlushOverflowsToNextPayload) {
@@ -406,22 +389,78 @@ TEST_F(TrickleTest, LimitedFlushOverflowsToNextPayload) {
output_.payloads<log::pw_rpc::raw::Logs::Listen>(kDrainChannelId);
ASSERT_EQ(first_flush_payloads.size(), 1u);
uint32_t drop_count = 0;
+ size_t entries_count = 0;
protobuf::Decoder payload_decoder(first_flush_payloads[0]);
payload_decoder.Reset(first_flush_payloads[0]);
- VerifyLogEntriesInCorrectOrder(
- payload_decoder, kFirstFlushedBundle, 0, drop_count);
+ VerifyLogEntries(
+ payload_decoder, kFirstFlushedBundle, 0, entries_count, drop_count);
+ EXPECT_EQ(entries_count, 3u);
// An additional flush should produce another payload.
min_delay = drains_[0].Trickle(channel_encode_buffer_);
EXPECT_EQ(min_delay.has_value(), false);
drop_count = 0;
+ entries_count = 0;
+
rpc::PayloadsView second_flush_payloads =
output_.payloads<log::pw_rpc::raw::Logs::Listen>(kDrainChannelId);
ASSERT_EQ(second_flush_payloads.size(), 2u);
payload_decoder.Reset(second_flush_payloads[1]);
- VerifyLogEntriesInCorrectOrder(
- payload_decoder, kSecondFlushedBundle, 3, drop_count);
+ VerifyLogEntries(
+ payload_decoder, kSecondFlushedBundle, 3, entries_count, drop_count);
EXPECT_EQ(drop_count, 0u);
+ EXPECT_EQ(entries_count, 3u);
+}
+
+TEST(RpcLogDrain, OnOpenCallbackCalled) {
+ // Create drain and log components.
+ const uint32_t drain_id = 1;
+ std::array<std::byte, kBufferSize> buffer;
+ sync::Mutex mutex;
+ RpcLogDrain drain(
+ drain_id,
+ buffer,
+ mutex,
+ RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError,
+ nullptr);
+ RpcLogDrainMap drain_map(std::span(&drain, 1));
+ LogService log_service(drain_map);
+ std::array<std::byte, kBufferSize * 2> multisink_buffer;
+ multisink::MultiSink multisink(multisink_buffer);
+ multisink.AttachDrain(drain);
+
+ // Create server writer.
+ rpc::RawFakeChannelOutput<3> output;
+ rpc::Channel channel(rpc::Channel::Create<drain_id>(&output));
+ rpc::Server server(std::span(&channel, 1));
+ rpc::RawServerWriter writer =
+ rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
+ server, drain_id, log_service);
+
+ int callback_call_times = 0;
+ Function<void()> callback = [&callback_call_times]() {
+ ++callback_call_times;
+ };
+
+ // Callback not called when not set.
+ ASSERT_TRUE(writer.active());
+ ASSERT_EQ(drain.Open(writer), OkStatus());
+ EXPECT_EQ(callback_call_times, 0);
+
+ drain.set_on_open_callback(std::move(callback));
+
+ // Callback called when writer is open.
+ writer = rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
+ server, drain_id, log_service);
+ ASSERT_TRUE(writer.active());
+ ASSERT_EQ(drain.Open(writer), OkStatus());
+ EXPECT_EQ(callback_call_times, 1);
+
+ // Callback not called when writer is closed.
+ rpc::RawServerWriter closed_writer;
+ ASSERT_FALSE(closed_writer.active());
+ ASSERT_EQ(drain.Open(closed_writer), Status::FailedPrecondition());
+ EXPECT_EQ(callback_call_times, 1);
}
} // namespace