aboutsummaryrefslogtreecommitdiff
path: root/pw_log_rpc
diff options
context:
space:
mode:
authorCarlos Chinchilla <cachinchilla@google.com>2021-11-02 16:08:21 -0700
committerCQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com>2021-11-10 18:47:18 +0000
commit4663ef627204c314a482d299d351c54cecb9230b (patch)
tree8dd2f88f22ce72502ffeeeb2c88a23ba7457d065 /pw_log_rpc
parent61505e088df449a535419b9b0840be2681b3bd7e (diff)
downloadpigweed-4663ef627204c314a482d299d351c54cecb9230b.tar.gz
pw_log & pw_log_rpc: use dropped field count
- Use the dropped field to notify RPC clients of log drops and avoid the extra memory cost that may be added by the string builder. - Refactor unit tests to improve parsing log entries. Change-Id: I409995598cb4453c46ca4f04259da5ee9d3ac99c Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/67060 Reviewed-by: Keir Mierle <keir@google.com> Commit-Queue: Carlos Chinchilla <cachinchilla@google.com>
Diffstat (limited to 'pw_log_rpc')
-rw-r--r--pw_log_rpc/BUILD.bazel2
-rw-r--r--pw_log_rpc/BUILD.gn5
-rw-r--r--pw_log_rpc/docs.rst3
-rw-r--r--pw_log_rpc/log_service_test.cc285
-rw-r--r--pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h19
-rw-r--r--pw_log_rpc/rpc_log_drain.cc14
6 files changed, 175 insertions, 153 deletions
diff --git a/pw_log_rpc/BUILD.bazel b/pw_log_rpc/BUILD.bazel
index c5f411c1e..f1db98cfc 100644
--- a/pw_log_rpc/BUILD.bazel
+++ b/pw_log_rpc/BUILD.bazel
@@ -51,7 +51,6 @@ pw_cc_library(
"//pw_protobuf",
"//pw_result",
"//pw_status",
- "//pw_string",
"//pw_sync:lock_annotations",
"//pw_sync:mutex",
],
@@ -88,7 +87,6 @@ pw_cc_test(
"//pw_result",
"//pw_rpc/raw:test_method_context",
"//pw_status",
- "//pw_string",
"//pw_unit_test",
],
)
diff --git a/pw_log_rpc/BUILD.gn b/pw_log_rpc/BUILD.gn
index 5e15a1dc8..cbe304af1 100644
--- a/pw_log_rpc/BUILD.gn
+++ b/pw_log_rpc/BUILD.gn
@@ -44,10 +44,6 @@ pw_source_set("rpc_log_drain") {
"public/pw_log_rpc/rpc_log_drain_map.h",
]
sources = [ "rpc_log_drain.cc" ]
- deps = [
- "$dir_pw_log",
- "$dir_pw_string",
- ]
public_deps = [
"$dir_pw_assert",
"$dir_pw_log:protos.pwpb",
@@ -89,7 +85,6 @@ pw_test("log_service_test") {
"$dir_pw_result",
"$dir_pw_rpc/raw:test_method_context",
"$dir_pw_status",
- "$dir_pw_string",
]
}
diff --git a/pw_log_rpc/docs.rst b/pw_log_rpc/docs.rst
index b3db664d9..d93083c69 100644
--- a/pw_log_rpc/docs.rst
+++ b/pw_log_rpc/docs.rst
@@ -118,7 +118,8 @@ The user must provide a buffer large enough for the largest entry in the
``MultiSink`` while also accounting for the interface's Maximum Transmission
Unit (MTU). If the ``RpcLogDrain`` finds a drop message count as it reads the
``MultiSink`` it will insert a message in the stream with the drop message
-count.
+count in the log proto dropped optional field. The receiving end can display the
+count with the logs if desired.
RpcLogDrainMap
==============
diff --git a/pw_log_rpc/log_service_test.cc b/pw_log_rpc/log_service_test.cc
index 9c04caf92..e3a9259c3 100644
--- a/pw_log_rpc/log_service_test.cc
+++ b/pw_log_rpc/log_service_test.cc
@@ -24,12 +24,12 @@
#include "pw_log/log.h"
#include "pw_log/proto/log.pwpb.h"
#include "pw_log/proto_utils.h"
+#include "pw_log_tokenized/metadata.h"
#include "pw_protobuf/decoder.h"
#include "pw_result/result.h"
#include "pw_rpc/channel.h"
#include "pw_rpc/raw/fake_channel_output.h"
#include "pw_rpc/raw/test_method_context.h"
-#include "pw_string/string_builder.h"
#include "pw_sync/mutex.h"
namespace pw::log_rpc {
@@ -41,9 +41,9 @@ using log::pw_rpc::raw::Logs;
PW_RAW_TEST_METHOD_CONTEXT(LogService, Listen, 6, 128)
constexpr size_t kMaxMessageSize = 50;
-static_assert(RpcLogDrain::kMaxDropMessageSize < kMaxMessageSize);
constexpr size_t kMaxLogEntrySize =
RpcLogDrain::kMinEntrySizeWithoutPayload + kMaxMessageSize;
+static_assert(RpcLogDrain::kMinEntryBufferSize < kMaxLogEntrySize);
constexpr size_t kMultiSinkBufferSize = kMaxLogEntrySize * 10;
constexpr size_t kMaxDrains = 3;
constexpr char kMessage[] = "message";
@@ -52,8 +52,13 @@ constexpr char kMessage[] = "message";
constexpr char kLongMessage[] =
"This is a long log message that will be dropped.";
static_assert(sizeof(kLongMessage) < kMaxMessageSize);
-static_assert(sizeof(kLongMessage) > RpcLogDrain::kMaxDropMessageSize);
+static_assert(sizeof(kLongMessage) > RpcLogDrain::kMinEntryBufferSize);
std::array<std::byte, 1> rpc_request_buffer;
+constexpr auto kSampleMetadata =
+ log_tokenized::Metadata::Set<PW_LOG_LEVEL_INFO, 123, 0x03, __LINE__>();
+constexpr auto kDropMessageMetadata =
+ log_tokenized::Metadata::Set<0, 0, 0, 0>();
+constexpr int64_t kSampleTimestamp = 1000;
// `LogServiceTest` sets up a logging environment for testing with a `MultiSink`
// for log entries, and multiple `RpcLogDrain`s for consuming such log entries.
@@ -68,22 +73,26 @@ class LogServiceTest : public ::testing::Test {
}
}
- void AddLogEntries(size_t log_count, std::string_view message) {
+ void AddLogEntries(size_t log_count,
+ std::string_view message,
+ log_tokenized::Metadata metadata,
+ int64_t timestamp) {
for (size_t i = 0; i < log_count; ++i) {
- AddLogEntry(message);
+ ASSERT_TRUE(AddLogEntry(message, metadata, timestamp).ok());
}
}
- void AddLogEntry(std::string_view message) {
- auto metadata =
- log_tokenized::Metadata::Set<PW_LOG_LEVEL_WARN, __LINE__, 0, 0>();
+ StatusWithSize AddLogEntry(std::string_view message,
+ log_tokenized::Metadata metadata,
+ int64_t timestamp) {
Result<ConstByteSpan> encoded_log_result =
log::EncodeTokenizedLog(metadata,
std::as_bytes(std::span(message)),
- /*ticks_since_epoch=*/0,
+ timestamp,
entry_encode_buffer_);
- EXPECT_EQ(encoded_log_result.status(), OkStatus());
+ PW_TRY_WITH_SIZE(encoded_log_result.status());
multisink_.HandleEntry(encoded_log_result.value());
+ return StatusWithSize(encoded_log_result.value().size());
}
protected:
@@ -116,74 +125,80 @@ class LogServiceTest : public ::testing::Test {
RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors),
};
};
+struct TestLogEntry {
+ log_tokenized::Metadata metadata = kSampleMetadata;
+ int64_t timestamp = 0;
+ uint32_t dropped = 0;
+ ConstByteSpan tokenized_data = {};
+};
// Unpacks a `LogEntry` proto buffer and compares it with the expected data.
void VerifyLogEntry(protobuf::Decoder& entry_decoder,
- log_tokenized::Metadata expected_metadata,
- ConstByteSpan expected_tokenized_data,
- const int64_t expected_timestamp) {
+ const TestLogEntry& expected_entry) {
ConstByteSpan tokenized_data;
- EXPECT_TRUE(entry_decoder.Next().ok()); // message [tokenized]
- EXPECT_EQ(1U, entry_decoder.FieldNumber());
- EXPECT_TRUE(entry_decoder.ReadBytes(&tokenized_data).ok());
- if (tokenized_data.size() != expected_tokenized_data.size()) {
- PW_LOG_ERROR(
- "actual: '%s', expected: '%s'",
- reinterpret_cast<const char*>(tokenized_data.begin()),
- reinterpret_cast<const char*>(expected_tokenized_data.begin()));
+ if (!expected_entry.tokenized_data.empty()) {
+ ASSERT_EQ(entry_decoder.Next(), OkStatus());
+ ASSERT_EQ(entry_decoder.FieldNumber(), 1u); // message [tokenized]
+ ASSERT_TRUE(entry_decoder.ReadBytes(&tokenized_data).ok());
+ if (tokenized_data.size() != expected_entry.tokenized_data.size()) {
+ PW_LOG_ERROR(
+ "actual: '%s', expected: '%s'",
+ reinterpret_cast<const char*>(tokenized_data.begin()),
+ reinterpret_cast<const char*>(expected_entry.tokenized_data.begin()));
+ }
+ EXPECT_EQ(tokenized_data.size(), expected_entry.tokenized_data.size());
+ EXPECT_EQ(std::memcmp(tokenized_data.begin(),
+ expected_entry.tokenized_data.begin(),
+ expected_entry.tokenized_data.size()),
+ 0);
+ }
+ if (expected_entry.metadata.level()) {
+ ASSERT_EQ(entry_decoder.Next(), OkStatus());
+ ASSERT_EQ(entry_decoder.FieldNumber(), 2u); // line_level
+ uint32_t line_level;
+ ASSERT_TRUE(entry_decoder.ReadUint32(&line_level).ok());
+ EXPECT_EQ(expected_entry.metadata.level(),
+ line_level & PW_LOG_LEVEL_BITMASK);
+ EXPECT_EQ(expected_entry.metadata.line_number(),
+ (line_level & ~PW_LOG_LEVEL_BITMASK) >> PW_LOG_LEVEL_BITS);
}
- EXPECT_EQ(tokenized_data.size(), expected_tokenized_data.size());
- EXPECT_EQ(std::memcmp(tokenized_data.begin(),
- expected_tokenized_data.begin(),
- expected_tokenized_data.size()),
- 0);
-
- uint32_t line_level;
- EXPECT_TRUE(entry_decoder.Next().ok()); // line_level
- EXPECT_EQ(2U, entry_decoder.FieldNumber());
- EXPECT_TRUE(entry_decoder.ReadUint32(&line_level).ok());
- EXPECT_EQ(expected_metadata.level(), line_level & PW_LOG_LEVEL_BITMASK);
- EXPECT_EQ(expected_metadata.line_number(),
- (line_level & ~PW_LOG_LEVEL_BITMASK) >> PW_LOG_LEVEL_BITS);
-
- if (expected_metadata.flags() != 0) {
+ if (expected_entry.metadata.flags()) {
+ ASSERT_EQ(entry_decoder.Next(), OkStatus());
+ ASSERT_EQ(entry_decoder.FieldNumber(), 3u); // flags
uint32_t flags;
- EXPECT_TRUE(entry_decoder.Next().ok()); // flags
- EXPECT_EQ(3U, entry_decoder.FieldNumber());
- EXPECT_TRUE(entry_decoder.ReadUint32(&flags).ok());
- EXPECT_EQ(expected_metadata.flags(), flags);
+ ASSERT_TRUE(entry_decoder.ReadUint32(&flags).ok());
+ EXPECT_EQ(expected_entry.metadata.flags(), flags);
}
-
- const bool has_timestamp = entry_decoder.Next().ok(); // timestamp
- if (expected_timestamp == 0 && !has_timestamp) {
- return;
+ if (expected_entry.timestamp) {
+ ASSERT_EQ(entry_decoder.Next(), OkStatus());
+ ASSERT_TRUE(entry_decoder.FieldNumber() == 4u // timestamp
+ || entry_decoder.FieldNumber() == 5u); // time_since_last_entry
+ int64_t timestamp;
+ ASSERT_TRUE(entry_decoder.ReadInt64(&timestamp).ok());
+ EXPECT_EQ(expected_entry.timestamp, timestamp);
+ }
+ if (expected_entry.dropped) {
+ ASSERT_EQ(entry_decoder.Next(), OkStatus());
+ ASSERT_EQ(entry_decoder.FieldNumber(), 6u); // dropped
+ uint32_t dropped;
+ ASSERT_TRUE(entry_decoder.ReadUint32(&dropped).ok());
+ EXPECT_EQ(expected_entry.dropped, dropped);
}
- int64_t timestamp;
- EXPECT_TRUE(has_timestamp);
- EXPECT_EQ(4U, entry_decoder.FieldNumber());
- EXPECT_TRUE(entry_decoder.ReadInt64(&timestamp).ok());
- EXPECT_EQ(expected_timestamp, timestamp);
}
// Verifies a stream of log entries, returning the total count found.
size_t VerifyLogEntries(protobuf::Decoder& entries_decoder,
- Vector<ConstByteSpan>& message_stack) {
+ Vector<TestLogEntry>& expected_entries_stack) {
size_t entries_found = 0;
while (entries_decoder.Next().ok()) {
ConstByteSpan entry;
EXPECT_TRUE(entries_decoder.ReadBytes(&entry).ok());
protobuf::Decoder entry_decoder(entry);
- auto expected_metadata =
- log_tokenized::Metadata::Set<PW_LOG_LEVEL_WARN, __LINE__, 0, 0>();
- if (message_stack.empty()) {
+ if (expected_entries_stack.empty()) {
break;
}
- ConstByteSpan expected_message = message_stack.back();
- VerifyLogEntry(entry_decoder,
- expected_metadata,
- expected_message,
- /*expected_timestamp=*/0);
- message_stack.pop_back();
+ VerifyLogEntry(entry_decoder, expected_entries_stack.back());
+ expected_entries_stack.pop_back();
++entries_found;
}
return entries_found;
@@ -248,7 +263,7 @@ TEST_F(LogServiceTest, StartAndEndStream) {
// Add log entries.
const size_t total_entries = 10;
- AddLogEntries(total_entries, kMessage);
+ AddLogEntries(total_entries, kMessage, kSampleMetadata, kSampleTimestamp);
// Request logs.
context.call(rpc_request_buffer);
EXPECT_EQ(active_drain.Flush(), OkStatus());
@@ -263,10 +278,11 @@ TEST_F(LogServiceTest, StartAndEndStream) {
EXPECT_GE(context.responses().size(), 1u);
// Verify data in responses.
- Vector<ConstByteSpan, total_entries> message_stack;
+ Vector<TestLogEntry, total_entries> message_stack;
for (size_t i = 0; i < total_entries; ++i) {
- message_stack.push_back(
- std::as_bytes(std::span(std::string_view(kMessage))));
+ message_stack.push_back({.timestamp = kSampleTimestamp,
+ .tokenized_data = std::as_bytes(
+ std::span(std::string_view(kMessage)))});
}
size_t entries_found = 0;
for (auto& response : context.responses()) {
@@ -285,7 +301,7 @@ TEST_F(LogServiceTest, HandleDropped) {
// Add log entries.
const size_t total_entries = 5;
const uint32_t total_drop_count = 2;
- AddLogEntries(total_entries, kMessage);
+ AddLogEntries(total_entries, kMessage, kSampleMetadata, kSampleTimestamp);
multisink_.HandleDropped(total_drop_count);
// Request logs.
@@ -297,13 +313,13 @@ TEST_F(LogServiceTest, HandleDropped) {
ASSERT_GE(context.responses().size(), 1u);
// Add create expected messages in a stack to match the order they arrive in.
- Vector<ConstByteSpan, total_entries + 1> message_stack;
- StringBuffer<32> message;
- message.Format("Dropped %u", static_cast<unsigned int>(total_drop_count));
- message_stack.push_back(std::as_bytes(std::span(std::string_view(message))));
+ Vector<TestLogEntry, total_entries + 1> message_stack;
+ message_stack.push_back(
+ {.metadata = kDropMessageMetadata, .dropped = total_drop_count});
for (size_t i = 0; i < total_entries; ++i) {
- message_stack.push_back(
- std::as_bytes(std::span(std::string_view(kMessage))));
+ message_stack.push_back({.timestamp = kSampleTimestamp,
+ .tokenized_data = std::as_bytes(
+ std::span(std::string_view(kMessage)))});
}
// Verify data in responses.
@@ -326,7 +342,7 @@ TEST_F(LogServiceTest, HandleSmallBuffer) {
// Add log entries.
const size_t total_entries = 5;
const uint32_t total_drop_count = total_entries;
- AddLogEntries(total_entries, kLongMessage);
+ AddLogEntries(total_entries, kLongMessage, kSampleMetadata, kSampleTimestamp);
// Request logs.
context.call(rpc_request_buffer);
EXPECT_EQ(small_buffer_drain.value()->Flush(), OkStatus());
@@ -334,10 +350,9 @@ TEST_F(LogServiceTest, HandleSmallBuffer) {
ASSERT_EQ(context.status(), OkStatus());
ASSERT_GE(context.responses().size(), 1u);
- Vector<ConstByteSpan, total_entries + 1> message_stack;
- StringBuffer<32> message;
- message.Format("Dropped %u", static_cast<unsigned int>(total_drop_count));
- message_stack.push_back(std::as_bytes(std::span(std::string_view(message))));
+ Vector<TestLogEntry, total_entries + 1> message_stack;
+ message_stack.push_back(
+ {.metadata = kDropMessageMetadata, .dropped = total_drop_count});
// Verify data in responses.
size_t entries_found = 0;
@@ -357,7 +372,7 @@ TEST_F(LogServiceTest, FlushDrainWithoutMultisink) {
// Add log entries.
const size_t total_entries = 5;
- AddLogEntries(total_entries, kMessage);
+ AddLogEntries(total_entries, kMessage, kSampleMetadata, kSampleTimestamp);
// Request logs.
context.call(rpc_request_buffer);
EXPECT_EQ(detached_drain.Close(), OkStatus());
@@ -366,23 +381,25 @@ TEST_F(LogServiceTest, FlushDrainWithoutMultisink) {
}
TEST_F(LogServiceTest, LargeLogEntry) {
- const auto expected_metadata =
- log_tokenized::Metadata::Set<PW_LOG_LEVEL_WARN,
- (1 << PW_LOG_TOKENIZED_MODULE_BITS) - 1,
- (1 << PW_LOG_TOKENIZED_FLAG_BITS) - 1,
- (1 << PW_LOG_TOKENIZED_LINE_BITS) - 1>();
- ConstByteSpan expected_message = std::as_bytes(std::span(kMessage));
- const int64_t expected_timestamp = std::numeric_limits<int64_t>::max();
+ const TestLogEntry expected_entry{
+ .metadata =
+ log_tokenized::Metadata::Set<PW_LOG_LEVEL_WARN,
+ (1 << PW_LOG_TOKENIZED_MODULE_BITS) - 1,
+ (1 << PW_LOG_TOKENIZED_FLAG_BITS) - 1,
+ (1 << PW_LOG_TOKENIZED_LINE_BITS) - 1>(),
+ .timestamp = std::numeric_limits<int64_t>::max(),
+ .tokenized_data = std::as_bytes(std::span(kMessage)),
+ };
// Add entry to multisink.
log::LogEntry::MemoryEncoder encoder(entry_encode_buffer_);
- encoder.WriteMessage(expected_message);
+ encoder.WriteMessage(expected_entry.tokenized_data);
encoder.WriteLineLevel(
- (expected_metadata.level() & PW_LOG_LEVEL_BITMASK) |
- ((expected_metadata.line_number() << PW_LOG_LEVEL_BITS) &
+ (expected_entry.metadata.level() & PW_LOG_LEVEL_BITMASK) |
+ ((expected_entry.metadata.line_number() << PW_LOG_LEVEL_BITS) &
~PW_LOG_LEVEL_BITMASK));
- encoder.WriteFlags(expected_metadata.flags());
- encoder.WriteTimestamp(expected_timestamp);
+ encoder.WriteFlags(expected_entry.metadata.flags());
+ encoder.WriteTimestamp(expected_entry.timestamp);
ASSERT_EQ(encoder.status(), OkStatus());
multisink_.HandleEntry(encoder);
@@ -403,8 +420,7 @@ TEST_F(LogServiceTest, LargeLogEntry) {
ConstByteSpan entry;
EXPECT_TRUE(entries_decoder.ReadBytes(&entry).ok());
protobuf::Decoder entry_decoder(entry);
- VerifyLogEntry(
- entry_decoder, expected_metadata, expected_message, expected_timestamp);
+ VerifyLogEntry(entry_decoder, expected_entry);
}
TEST_F(LogServiceTest, InterruptedLogStreamSendsDropCount) {
@@ -413,20 +429,30 @@ TEST_F(LogServiceTest, InterruptedLogStreamSendsDropCount) {
ASSERT_TRUE(drain.ok());
LogService log_service(drain_map_);
- const uint32_t output_buffer_size = 100;
+ const size_t output_buffer_size = 128;
+ const size_t max_packets = 10;
rpc::RawFakeChannelOutput<10, output_buffer_size, 512> output;
rpc::Channel channel(rpc::Channel::Create<drain_channel_id>(&output));
rpc::Server server(std::span(&channel, 1));
// Add as many entries needed to have multiple packets send.
- const uint32_t min_packets_sent = 4;
- const uint32_t max_messages_per_response =
- output_buffer_size / sizeof(kMessage);
- const size_t total_entries = min_packets_sent * max_messages_per_response;
- AddLogEntries(total_entries, kMessage);
+ StatusWithSize status =
+ AddLogEntry(kMessage, kSampleMetadata, kSampleTimestamp);
+ ASSERT_TRUE(status.ok());
+
+ // In reality less than output_buffer_size is given as a buffer, since some
+ // bytes are used for the RPC framing.
+ const uint32_t max_messages_per_response = output_buffer_size / status.size();
+ // Send less packets than the max to avoid crashes.
+ const uint32_t packets_sent = max_packets / 2;
+ const size_t total_entries = packets_sent * max_messages_per_response;
+ const size_t max_entries = 50;
+ // Check we can test all these entries.
+ ASSERT_GE(max_entries, total_entries);
+ AddLogEntries(total_entries - 1, kMessage, kSampleMetadata, kSampleTimestamp);
// Interrupt log stream with an error.
- const uint32_t successful_packets_sent = min_packets_sent - 2;
+ const uint32_t successful_packets_sent = packets_sent / 2;
output.set_send_status(Status::Unavailable(), successful_packets_sent);
// Request logs.
@@ -441,10 +467,11 @@ TEST_F(LogServiceTest, InterruptedLogStreamSendsDropCount) {
ASSERT_EQ(output.payloads<Logs::Listen>().size(), successful_packets_sent);
// Verify data in responses.
- Vector<ConstByteSpan, total_entries> message_stack;
+ Vector<TestLogEntry, max_entries> message_stack;
for (size_t i = 0; i < total_entries; ++i) {
- message_stack.push_back(
- std::as_bytes(std::span(std::string_view(kMessage))));
+ message_stack.push_back({.timestamp = kSampleTimestamp,
+ .tokenized_data = std::as_bytes(
+ std::span(std::string_view(kMessage)))});
}
size_t entries_found = 0;
for (auto& response : output.payloads<Logs::Listen>()) {
@@ -469,12 +496,11 @@ TEST_F(LogServiceTest, InterruptedLogStreamSendsDropCount) {
const uint32_t total_drop_count = entries_found / successful_packets_sent;
const uint32_t remaining_entries = total_entries - total_drop_count;
for (size_t i = 0; i < remaining_entries; ++i) {
- message_stack.push_back(
- std::as_bytes(std::span(std::string_view(kMessage))));
+ message_stack.push_back({.tokenized_data = std::as_bytes(
+ std::span(std::string_view(kMessage)))});
}
- StringBuffer<32> message;
- message.Format("Dropped %u", static_cast<unsigned int>(total_drop_count));
- message_stack.push_back(std::as_bytes(std::span(std::string_view(message))));
+ message_stack.push_back(
+ {.metadata = kDropMessageMetadata, .dropped = total_drop_count});
for (auto& response : output.payloads<Logs::Listen>()) {
protobuf::Decoder entry_decoder(response);
@@ -490,21 +516,31 @@ TEST_F(LogServiceTest, InterruptedLogStreamIgnoresErrors) {
ASSERT_TRUE(drain.ok());
LogService log_service(drain_map_);
- const uint32_t output_buffer_size = 100;
- rpc::RawFakeChannelOutput<10, output_buffer_size, 768> output;
+ const size_t output_buffer_size = 50;
+ const size_t max_packets = 20;
+ rpc::RawFakeChannelOutput<max_packets, output_buffer_size, 512> output;
rpc::Channel channel(rpc::Channel::Create<drain_channel_id>(&output));
rpc::Server server(std::span(&channel, 1));
// Add as many entries needed to have multiple packets send.
- const uint32_t min_packets_sent = 4;
- const uint32_t max_messages_per_response =
- output_buffer_size / sizeof(kMessage);
- const size_t total_entries = min_packets_sent * max_messages_per_response;
- AddLogEntries(total_entries, kMessage);
+ StatusWithSize status =
+ AddLogEntry(kMessage, kSampleMetadata, kSampleTimestamp);
+ ASSERT_TRUE(status.ok());
+
+ // In reality less than output_buffer_size is given as a buffer, since some
+ // bytes are used for the RPC framing.
+ const uint32_t max_messages_per_response = output_buffer_size / status.size();
+ // Send less packets than the max to avoid crashes.
+ const uint32_t packets_sent = 4;
+ const size_t total_entries = packets_sent * max_messages_per_response;
+ const size_t max_entries = 50;
+ // Check we can test all these entries.q
+ ASSERT_GE(max_entries, total_entries);
+ AddLogEntries(total_entries - 1, kMessage, kSampleMetadata, kSampleTimestamp);
// Interrupt log stream with an error.
- const uint32_t error_on_packet_count = min_packets_sent;
- output.set_send_status(Status::Unavailable(), min_packets_sent);
+ const uint32_t error_on_packet_count = packets_sent / 2;
+ output.set_send_status(Status::Unavailable(), error_on_packet_count);
// Request logs.
rpc::RawServerWriter writer = rpc::RawServerWriter::Open<Logs::Listen>(
@@ -514,8 +550,8 @@ TEST_F(LogServiceTest, InterruptedLogStreamIgnoresErrors) {
EXPECT_EQ(drain.value()->Flush(), OkStatus());
EXPECT_FALSE(output.done());
- // Make some packets were sent.
- ASSERT_GE(output.payloads<Logs::Listen>().size(), min_packets_sent);
+ // Make sure some packets were sent.
+ ASSERT_GE(output.payloads<Logs::Listen>().size(), packets_sent);
// Verify that not all the entries were sent.
size_t entries_found = 0;
@@ -534,18 +570,19 @@ TEST_F(LogServiceTest, InterruptedLogStreamIgnoresErrors) {
error_on_packet_count * total_drop_count;
const uint32_t entry_count_after_error =
entries_found - 1 - entry_count_before_error;
- Vector<ConstByteSpan, total_entries> message_stack;
+ Vector<TestLogEntry, max_entries> message_stack;
// Add messages to the stack in the reverse order they are sent.
for (size_t i = 0; i < entry_count_after_error; ++i) {
- message_stack.push_back(
- std::as_bytes(std::span(std::string_view(kMessage))));
+ message_stack.push_back({.timestamp = kSampleTimestamp,
+ .tokenized_data = std::as_bytes(
+ std::span(std::string_view(kMessage)))});
}
- StringBuffer<32> message;
- message.Format("Dropped %u", static_cast<unsigned int>(total_drop_count));
- message_stack.push_back(std::as_bytes(std::span(std::string_view(message))));
+ message_stack.push_back(
+ {.metadata = kDropMessageMetadata, .dropped = total_drop_count});
for (size_t i = 0; i < entry_count_before_error; ++i) {
- message_stack.push_back(
- std::as_bytes(std::span(std::string_view(kMessage))));
+ message_stack.push_back({.timestamp = kSampleTimestamp,
+ .tokenized_data = std::as_bytes(
+ std::span(std::string_view(kMessage)))});
}
for (auto& response : output.payloads<Logs::Listen>()) {
diff --git a/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h
index cc1201461..3c03579b0 100644
--- a/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h
+++ b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h
@@ -52,7 +52,8 @@ class RpcLogDrain : public multisink::MultiSink::Drain {
// The minimum buffer size, without the message payload, needed to retrieve a
// log::LogEntry from the attached MultiSink. The user must account for the
- // max message size to avoid log entry drops.
+ // max message size to avoid log entry drops. The dropped field is not
+ // accounted since a dropped message has all other fields unset.
static constexpr size_t kMinEntrySizeWithoutPayload =
// message
protobuf::SizeOfFieldKey(1) +
@@ -65,13 +66,15 @@ class RpcLogDrain : public multisink::MultiSink::Drain {
protobuf::kMaxSizeBytesUint32
// timestamp or time_since_last_entry
+ protobuf::SizeOfFieldKey(4) + protobuf::kMaxSizeBytesInt64;
- // Message format to report the drop count.
- static constexpr char kDropMessageFormatString[] = "Dropped %u";
- // With a uint32_t number, "Dropped %u" is no more than 18 characters long.
- static constexpr size_t kMaxDropMessageSize = 18;
- // The smallest buffer size must be able to fit a drop message.
- static constexpr size_t kMinEntryBufferSize =
- kMaxDropMessageSize + kMinEntrySizeWithoutPayload;
+ // The smallest buffer size must be able to fit a typical token size: 4 bytes.
+ static constexpr size_t kMinEntryBufferSize = kMinEntrySizeWithoutPayload + 4;
+
+ // When encoding LogEntry in LogEntries, there are kLogEntryEncodeFrameSize
+ // bytes added to the encoded LogEntry. This constant and kMinEntryBufferSize
+ // can be used to calculate the minimum RPC ChannelOutput buffer size.
+ static constexpr size_t kLogEntryEncodeFrameSize =
+ protobuf::SizeOfFieldKey(1) // LogEntry
+ + protobuf::kMaxSizeOfLength;
// Creates a log stream with the provided open writer. Useful for streaming
// logs without a request.
diff --git a/pw_log_rpc/rpc_log_drain.cc b/pw_log_rpc/rpc_log_drain.cc
index 3759162bb..3cfbbff46 100644
--- a/pw_log_rpc/rpc_log_drain.cc
+++ b/pw_log_rpc/rpc_log_drain.cc
@@ -17,28 +17,16 @@
#include <mutex>
#include "pw_assert/check.h"
-#include "pw_log/log.h"
-#include "pw_string/string_builder.h"
namespace pw::log_rpc {
namespace {
-// When encoding LogEntry in LogEntries, there are kLogEntryEncodeFrameSize
-// bytes added to the encoded LogEntry.
-constexpr size_t kLogEntryEncodeFrameSize =
- protobuf::SizeOfFieldKey(1) // LogEntry
- + protobuf::kMaxSizeOfLength;
// Creates an encoded drop message on the provided buffer.
Result<ConstByteSpan> CreateEncodedDropMessage(
uint32_t drop_count, ByteSpan encoded_drop_message_buffer) {
- StringBuffer<RpcLogDrain::kMaxDropMessageSize> message;
- message.Format(RpcLogDrain::kDropMessageFormatString,
- static_cast<unsigned int>(drop_count));
-
// Encode message in protobuf.
log::LogEntry::MemoryEncoder encoder(encoded_drop_message_buffer);
- encoder.WriteMessage(std::as_bytes(std::span(std::string_view(message))));
- encoder.WriteLineLevel(PW_LOG_LEVEL_WARN & PW_LOG_LEVEL_BITMASK);
+ encoder.WriteDropped(drop_count);
PW_TRY(encoder.status());
return ConstByteSpan(encoder);
}