aboutsummaryrefslogtreecommitdiff
path: root/pw_log_rpc/log_service_test.cc
diff options
context:
space:
mode:
Diffstat (limited to 'pw_log_rpc/log_service_test.cc')
-rw-r--r--pw_log_rpc/log_service_test.cc277
1 files changed, 195 insertions, 82 deletions
diff --git a/pw_log_rpc/log_service_test.cc b/pw_log_rpc/log_service_test.cc
index dc8fe44ac..1072ec55b 100644
--- a/pw_log_rpc/log_service_test.cc
+++ b/pw_log_rpc/log_service_test.cc
@@ -46,7 +46,7 @@ using log::pw_rpc::raw::Logs;
constexpr size_t kMaxMessageSize = 50;
constexpr size_t kMaxLogEntrySize =
- RpcLogDrain::kMinEntrySizeWithoutPayload + kMaxMessageSize;
+ RpcLogDrain::kMinEntryBufferSize + kMaxMessageSize;
static_assert(RpcLogDrain::kMinEntryBufferSize < kMaxLogEntrySize);
constexpr size_t kMultiSinkBufferSize = kMaxLogEntrySize * 10;
constexpr size_t kMaxDrains = 3;
@@ -57,7 +57,8 @@ constexpr char kMessage[] = "message";
constexpr char kLongMessage[] =
"This is a long log message that will be dropped.";
static_assert(sizeof(kLongMessage) < kMaxMessageSize);
-static_assert(sizeof(kLongMessage) > RpcLogDrain::kMinEntryBufferSize);
+static_assert(sizeof(kLongMessage) + RpcLogDrain::kMinEntrySizeWithoutPayload >
+ RpcLogDrain::kMinEntryBufferSize);
std::array<std::byte, 1> rpc_request_buffer;
constexpr auto kSampleMetadata =
log_tokenized::Metadata::Set<PW_LOG_LEVEL_INFO, 123, 0x03, __LINE__>();
@@ -94,6 +95,7 @@ class LogServiceTest : public ::testing::Test {
log::EncodeTokenizedLog(metadata,
std::as_bytes(std::span(message)),
timestamp,
+ /*thread_name=*/{},
entry_encode_buffer_);
PW_TRY_WITH_SIZE(encoded_log_result.status());
multisink_.HandleEntry(encoded_log_result.value());
@@ -209,7 +211,7 @@ TEST_F(LogServiceTest, StartAndEndStream) {
// Not done until the stream is finished.
ASSERT_FALSE(context.done());
- active_drain.Close();
+ EXPECT_EQ(OkStatus(), active_drain.Close());
ASSERT_TRUE(context.done());
EXPECT_EQ(context.status(), OkStatus());
@@ -217,19 +219,22 @@ TEST_F(LogServiceTest, StartAndEndStream) {
EXPECT_GE(context.responses().size(), 1u);
// Verify data in responses.
- Vector<TestLogEntry, total_entries> message_stack;
+ Vector<TestLogEntry, total_entries> expected_messages;
for (size_t i = 0; i < total_entries; ++i) {
- message_stack.push_back({.metadata = kSampleMetadata,
- .timestamp = kSampleTimestamp,
- .tokenized_data = std::as_bytes(
- std::span(std::string_view(kMessage)))});
+ expected_messages.push_back({.metadata = kSampleMetadata,
+ .timestamp = kSampleTimestamp,
+ .tokenized_data = std::as_bytes(
+ std::span(std::string_view(kMessage)))});
}
size_t entries_found = 0;
uint32_t drop_count_found = 0;
for (auto& response : context.responses()) {
protobuf::Decoder entry_decoder(response);
- entries_found += VerifyLogEntries(
- entry_decoder, message_stack, entries_found, drop_count_found);
+ VerifyLogEntries(entry_decoder,
+ expected_messages,
+ entries_found,
+ entries_found,
+ drop_count_found);
}
EXPECT_EQ(entries_found, total_entries);
EXPECT_EQ(drop_count_found, 0u);
@@ -243,28 +248,44 @@ TEST_F(LogServiceTest, HandleDropped) {
// Add log entries.
const size_t total_entries = 5;
+ const size_t entries_before_drop = 1;
const uint32_t total_drop_count = 2;
- AddLogEntries(total_entries, kMessage, kSampleMetadata, kSampleTimestamp);
+
+ // Force a drop entry in between entries.
+ AddLogEntries(
+ entries_before_drop, kMessage, kSampleMetadata, kSampleTimestamp);
multisink_.HandleDropped(total_drop_count);
+ AddLogEntries(total_entries - entries_before_drop,
+ kMessage,
+ kSampleMetadata,
+ kSampleTimestamp);
// Request logs.
context.call(rpc_request_buffer);
EXPECT_EQ(active_drain.Flush(encoding_buffer_), OkStatus());
- active_drain.Close();
+ EXPECT_EQ(OkStatus(), active_drain.Close());
ASSERT_EQ(context.status(), OkStatus());
// There is at least 1 response with multiple log entries packed.
ASSERT_GE(context.responses().size(), 1u);
- // Add create expected messages in a stack to match the order they arrive
- // in.
- Vector<TestLogEntry, total_entries + 1> message_stack;
- message_stack.push_back(
- {.metadata = kDropMessageMetadata, .dropped = total_drop_count});
- for (size_t i = 0; i < total_entries; ++i) {
- message_stack.push_back({.metadata = kSampleMetadata,
- .timestamp = kSampleTimestamp,
- .tokenized_data = std::as_bytes(
- std::span(std::string_view(kMessage)))});
+ Vector<TestLogEntry, total_entries + 1> expected_messages;
+ size_t i = 0;
+ for (; i < entries_before_drop; ++i) {
+ expected_messages.push_back({.metadata = kSampleMetadata,
+ .timestamp = kSampleTimestamp,
+ .tokenized_data = std::as_bytes(
+ std::span(std::string_view(kMessage)))});
+ }
+ expected_messages.push_back(
+ {.metadata = kDropMessageMetadata,
+ .dropped = total_drop_count,
+ .tokenized_data = std::as_bytes(
+ std::span(std::string_view(RpcLogDrain::kIngressErrorMessage)))});
+ for (; i < total_entries; ++i) {
+ expected_messages.push_back({.metadata = kSampleMetadata,
+ .timestamp = kSampleTimestamp,
+ .tokenized_data = std::as_bytes(
+ std::span(std::string_view(kMessage)))});
}
// Verify data in responses.
@@ -272,45 +293,123 @@ TEST_F(LogServiceTest, HandleDropped) {
uint32_t drop_count_found = 0;
for (auto& response : context.responses()) {
protobuf::Decoder entry_decoder(response);
- entries_found += VerifyLogEntries(
- entry_decoder, message_stack, entries_found, drop_count_found);
+ VerifyLogEntries(entry_decoder,
+ expected_messages,
+ entries_found,
+ entries_found,
+ drop_count_found);
}
EXPECT_EQ(entries_found, total_entries);
EXPECT_EQ(drop_count_found, total_drop_count);
}
-TEST_F(LogServiceTest, HandleSmallBuffer) {
+TEST_F(LogServiceTest, HandleDroppedBetweenFilteredOutLogs) {
+ RpcLogDrain& active_drain = drains_[0];
+ const uint32_t drain_channel_id = active_drain.channel_id();
+ LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
+ context.set_channel_id(drain_channel_id);
+ // Set filter to drop INFO+ and keep DEBUG logs
+ rules1_[0].action = Filter::Rule::Action::kDrop;
+ rules1_[0].level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL;
+
+ // Add log entries.
+ const size_t total_entries = 5;
+ const uint32_t total_drop_count = total_entries - 1;
+
+ // Force a drop entry in between entries that will be filtered out.
+ for (size_t i = 1; i < total_entries; ++i) {
+ ASSERT_EQ(
+ OkStatus(),
+ AddLogEntry(kMessage, kSampleMetadata, kSampleTimestamp).status());
+ multisink_.HandleDropped(1);
+ }
+ // Add message that won't be filtered out.
+ constexpr auto metadata =
+ log_tokenized::Metadata::Set<PW_LOG_LEVEL_DEBUG, 0, 0, __LINE__>();
+ ASSERT_EQ(OkStatus(),
+ AddLogEntry(kMessage, metadata, kSampleTimestamp).status());
+
+ // Request logs.
+ context.call(rpc_request_buffer);
+ EXPECT_EQ(active_drain.Flush(encoding_buffer_), OkStatus());
+ EXPECT_EQ(OkStatus(), active_drain.Close());
+ ASSERT_EQ(context.status(), OkStatus());
+ // There is at least 1 response with multiple log entries packed.
+ ASSERT_GE(context.responses().size(), 1u);
+
+ Vector<TestLogEntry, 2> expected_messages;
+ expected_messages.push_back(
+ {.metadata = kDropMessageMetadata,
+ .dropped = total_drop_count,
+ .tokenized_data = std::as_bytes(
+ std::span(std::string_view(RpcLogDrain::kIngressErrorMessage)))});
+ expected_messages.push_back(
+ {.metadata = metadata,
+ .timestamp = kSampleTimestamp,
+ .tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))});
+
+ // Verify data in responses.
+ size_t entries_found = 0;
+ uint32_t drop_count_found = 0;
+ for (auto& response : context.responses()) {
+ protobuf::Decoder entry_decoder(response);
+ VerifyLogEntries(entry_decoder,
+ expected_messages,
+ entries_found,
+ entries_found,
+ drop_count_found);
+ }
+ EXPECT_EQ(entries_found, 1u);
+ EXPECT_EQ(drop_count_found, total_drop_count);
+}
+
+TEST_F(LogServiceTest, HandleSmallLogEntryBuffer) {
LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
context.set_channel_id(kSmallBufferDrainId);
auto small_buffer_drain =
drain_map_.GetDrainFromChannelId(kSmallBufferDrainId);
ASSERT_TRUE(small_buffer_drain.ok());
- // Add log entries.
+ // Add long entries that don't fit the drain's log entry buffer, except for
+ // one, since drop count messages are only sent when a log entry can be sent.
const size_t total_entries = 5;
- const uint32_t total_drop_count = total_entries;
- AddLogEntries(total_entries, kLongMessage, kSampleMetadata, kSampleTimestamp);
+ const uint32_t total_drop_count = total_entries - 1;
+ AddLogEntries(
+ total_drop_count, kLongMessage, kSampleMetadata, kSampleTimestamp);
+ EXPECT_EQ(OkStatus(),
+ AddLogEntry(kMessage, kSampleMetadata, kSampleTimestamp).status());
+
// Request logs.
context.call(rpc_request_buffer);
EXPECT_EQ(small_buffer_drain.value()->Flush(encoding_buffer_), OkStatus());
EXPECT_EQ(small_buffer_drain.value()->Close(), OkStatus());
ASSERT_EQ(context.status(), OkStatus());
- ASSERT_GE(context.responses().size(), 1u);
+ ASSERT_EQ(context.responses().size(), 1u);
- Vector<TestLogEntry, total_entries + 1> message_stack;
- message_stack.push_back(
- {.metadata = kDropMessageMetadata, .dropped = total_drop_count});
+ Vector<TestLogEntry, total_entries + 1> expected_messages;
+ expected_messages.push_back(
+ {.metadata = kDropMessageMetadata,
+ .dropped = total_drop_count,
+ .tokenized_data = std::as_bytes(std::span(
+ std::string_view(RpcLogDrain::kSmallStackBufferErrorMessage)))});
+ expected_messages.push_back(
+ {.metadata = kSampleMetadata,
+ .timestamp = kSampleTimestamp,
+ .tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))});
- // Verify data in responses.
+ // Expect one drop message with the total drop count, and the only message
+ // that fits the buffer.
size_t entries_found = 0;
uint32_t drop_count_found = 0;
for (auto& response : context.responses()) {
protobuf::Decoder entry_decoder(response);
- entries_found += VerifyLogEntries(
- entry_decoder, message_stack, entries_found, drop_count_found);
+ VerifyLogEntries(entry_decoder,
+ expected_messages,
+ entries_found,
+ entries_found,
+ drop_count_found);
}
- // No messages fit the buffer, expect a drop message.
- EXPECT_EQ(entries_found, 0u);
+ EXPECT_EQ(entries_found, 1u);
EXPECT_EQ(drop_count_found, total_drop_count);
}
@@ -366,7 +465,7 @@ TEST_F(LogServiceTest, LargeLogEntry) {
context.set_channel_id(drain_channel_id);
context.call(rpc_request_buffer);
ASSERT_EQ(active_drain.Flush(encoding_buffer_), OkStatus());
- active_drain.Close();
+ EXPECT_EQ(OkStatus(), active_drain.Close());
ASSERT_EQ(context.status(), OkStatus());
ASSERT_EQ(context.responses().size(), 1u);
@@ -423,19 +522,22 @@ TEST_F(LogServiceTest, InterruptedLogStreamSendsDropCount) {
ASSERT_EQ(output.payloads<Logs::Listen>().size(), successful_packets_sent);
// Verify data in responses.
- Vector<TestLogEntry, max_entries> message_stack;
+ Vector<TestLogEntry, max_entries> expected_messages;
for (size_t i = 0; i < total_entries; ++i) {
- message_stack.push_back({.metadata = kSampleMetadata,
- .timestamp = kSampleTimestamp,
- .tokenized_data = std::as_bytes(
- std::span(std::string_view(kMessage)))});
+ expected_messages.push_back({.metadata = kSampleMetadata,
+ .timestamp = kSampleTimestamp,
+ .tokenized_data = std::as_bytes(
+ std::span(std::string_view(kMessage)))});
}
size_t entries_found = 0;
uint32_t drop_count_found = 0;
for (auto& response : output.payloads<Logs::Listen>()) {
protobuf::Decoder entry_decoder(response);
- entries_found += VerifyLogEntries(
- entry_decoder, message_stack, entries_found, drop_count_found);
+ VerifyLogEntries(entry_decoder,
+ expected_messages,
+ entries_found,
+ entries_found,
+ drop_count_found);
}
// Verify that not all the entries were sent.
@@ -451,30 +553,37 @@ TEST_F(LogServiceTest, InterruptedLogStreamSendsDropCount) {
EXPECT_EQ(drain.value()->Open(writer), OkStatus());
EXPECT_EQ(drain.value()->Flush(encoding_buffer_), OkStatus());
- // Add expected messages to the stack in the reverse order they are
- // received.
- message_stack.clear();
// One full packet was dropped. Since all messages are the same length,
// there are entries_found / successful_packets_sent per packet.
const uint32_t total_drop_count = entries_found / successful_packets_sent;
+ Vector<TestLogEntry, max_entries> expected_messages_after_reset;
+ expected_messages_after_reset.push_back(
+ {.metadata = kDropMessageMetadata,
+ .dropped = total_drop_count,
+ .tokenized_data = std::as_bytes(
+ std::span(std::string_view(RpcLogDrain::kWriterErrorMessage)))});
+
const uint32_t remaining_entries = total_entries - total_drop_count;
for (size_t i = 0; i < remaining_entries; ++i) {
- message_stack.push_back({.metadata = kSampleMetadata,
- .timestamp = kSampleTimestamp,
- .tokenized_data = std::as_bytes(
- std::span(std::string_view(kMessage)))});
+ expected_messages_after_reset.push_back(
+ {.metadata = kSampleMetadata,
+ .timestamp = kSampleTimestamp,
+ .tokenized_data =
+ std::as_bytes(std::span(std::string_view(kMessage)))});
}
- message_stack.push_back(
- {.metadata = kDropMessageMetadata, .dropped = total_drop_count});
+ size_t entries_found_after_reset = 0;
for (auto& response : output.payloads<Logs::Listen>()) {
protobuf::Decoder entry_decoder(response);
- entries_found += VerifyLogEntries(entry_decoder,
- message_stack,
- entries_found + total_drop_count,
- drop_count_found);
+ uint32_t expected_sequence_id =
+ entries_found + entries_found_after_reset + total_drop_count;
+ VerifyLogEntries(entry_decoder,
+ expected_messages_after_reset,
+ expected_sequence_id,
+ entries_found_after_reset,
+ drop_count_found);
}
- EXPECT_EQ(entries_found, remaining_entries);
+ EXPECT_EQ(entries_found + entries_found_after_reset, remaining_entries);
EXPECT_EQ(drop_count_found, total_drop_count);
}
@@ -529,12 +638,12 @@ TEST_F(LogServiceTest, InterruptedLogStreamIgnoresErrors) {
// Verify that all messages were sent.
const uint32_t total_drop_count = total_entries - entries_found;
- Vector<TestLogEntry, max_entries> message_stack;
+ Vector<TestLogEntry, max_entries> expected_messages;
for (size_t i = 0; i < entries_found; ++i) {
- message_stack.push_back({.metadata = kSampleMetadata,
- .timestamp = kSampleTimestamp,
- .tokenized_data = std::as_bytes(
- std::span(std::string_view(kMessage)))});
+ expected_messages.push_back({.metadata = kSampleMetadata,
+ .timestamp = kSampleTimestamp,
+ .tokenized_data = std::as_bytes(
+ std::span(std::string_view(kMessage)))});
}
entries_found = 0;
@@ -542,15 +651,19 @@ TEST_F(LogServiceTest, InterruptedLogStreamIgnoresErrors) {
uint32_t i = 0;
for (; i < error_on_packet_count; ++i) {
protobuf::Decoder entry_decoder(output.payloads<Logs::Listen>()[i]);
- entries_found += VerifyLogEntries(
- entry_decoder, message_stack, entries_found, drop_count_found);
+ VerifyLogEntries(entry_decoder,
+ expected_messages,
+ entries_found,
+ entries_found,
+ drop_count_found);
}
for (; i < output.payloads<Logs::Listen>().size(); ++i) {
protobuf::Decoder entry_decoder(output.payloads<Logs::Listen>()[i]);
- entries_found += VerifyLogEntries(entry_decoder,
- message_stack,
- entries_found + total_drop_count,
- drop_count_found);
+ VerifyLogEntries(entry_decoder,
+ expected_messages,
+ entries_found + total_drop_count,
+ entries_found,
+ drop_count_found);
}
// This drain ignores errors and thus doesn't report drops on its own.
EXPECT_EQ(drop_count_found, 0u);
@@ -595,20 +708,17 @@ TEST_F(LogServiceTest, FilterLogs) {
ASSERT_TRUE(
AddLogEntry(kMessage, different_module_metadata, kSampleTimestamp).ok());
- // Add messages to the stack in the reverse order they are sent.
- Vector<TestLogEntry, 3> message_stack;
- message_stack.push_back(
- {.metadata = error_metadata,
+ Vector<TestLogEntry, 3> expected_messages{
+ {.metadata = info_metadata,
.timestamp = kSampleTimestamp,
- .tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))});
- message_stack.push_back(
+ .tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))},
{.metadata = warn_metadata,
.timestamp = kSampleTimestamp,
- .tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))});
- message_stack.push_back(
- {.metadata = info_metadata,
+ .tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))},
+ {.metadata = error_metadata,
.timestamp = kSampleTimestamp,
- .tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))});
+ .tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))},
+ };
// Set up filter rules for drain at drains_[1].
RpcLogDrain& drain = drains_[1];
@@ -639,8 +749,11 @@ TEST_F(LogServiceTest, FilterLogs) {
uint32_t drop_count_found = 0;
for (auto& response : context.responses()) {
protobuf::Decoder entry_decoder(response);
- entries_found += VerifyLogEntries(
- entry_decoder, message_stack, entries_found, drop_count_found);
+ VerifyLogEntries(entry_decoder,
+ expected_messages,
+ entries_found,
+ entries_found,
+ drop_count_found);
}
EXPECT_EQ(entries_found, 3u);
EXPECT_EQ(drop_count_found, 0u);