From 4663ef627204c314a482d299d351c54cecb9230b Mon Sep 17 00:00:00 2001 From: Carlos Chinchilla Date: Tue, 2 Nov 2021 16:08:21 -0700 Subject: pw_log & pw_log_rpc: use dropped field count - Use the dropped field to notify RPC clients of log drops and avoid the extra memory cost that may be added by the string builder. - Refactor unit tests to improve parsing log entries. Change-Id: I409995598cb4453c46ca4f04259da5ee9d3ac99c Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/67060 Reviewed-by: Keir Mierle Commit-Queue: Carlos Chinchilla --- pw_log/log.proto | 15 +- pw_log_rpc/BUILD.bazel | 2 - pw_log_rpc/BUILD.gn | 5 - pw_log_rpc/docs.rst | 3 +- pw_log_rpc/log_service_test.cc | 285 +++++++++++++++------------ pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h | 19 +- pw_log_rpc/rpc_log_drain.cc | 14 +- 7 files changed, 180 insertions(+), 163 deletions(-) diff --git a/pw_log/log.proto b/pw_log/log.proto index 988f34a63..f329ca351 100644 --- a/pw_log/log.proto +++ b/pw_log/log.proto @@ -126,20 +126,15 @@ message LogEntry { int64 time_since_last_entry = 5; } + // When the log buffers are full but more logs come in, the logs are counted + // and a special log message is omitted with only counts for the number of + // messages dropped. + optional uint32 dropped = 6; + // The following fields are planned but will not be added until they are // needed. Protobuf field numbers over 15 use an extra byte, so these fields // are left out for now to avoid reserving field numbers unnecessarily. - // When the log buffers are full but more logs come in, the logs are counted - // and a special log message is omitted with only counts for the number of - // messages dropped. The timestamp indicates the time that the "missed logs" - // message was inserted into the queue. - // - // As an alternative to these fields, implementations may simply send a - // message stating the drop count. - // optional uint32 dropped = ?; - // optional uint32 dropped_warning_or_above = ?; - // Represents the device from which the log originated. The meaning of this // field is implementation defined // optional uint32 source_id = ?; diff --git a/pw_log_rpc/BUILD.bazel b/pw_log_rpc/BUILD.bazel index c5f411c1e..f1db98cfc 100644 --- a/pw_log_rpc/BUILD.bazel +++ b/pw_log_rpc/BUILD.bazel @@ -51,7 +51,6 @@ pw_cc_library( "//pw_protobuf", "//pw_result", "//pw_status", - "//pw_string", "//pw_sync:lock_annotations", "//pw_sync:mutex", ], @@ -88,7 +87,6 @@ pw_cc_test( "//pw_result", "//pw_rpc/raw:test_method_context", "//pw_status", - "//pw_string", "//pw_unit_test", ], ) diff --git a/pw_log_rpc/BUILD.gn b/pw_log_rpc/BUILD.gn index 5e15a1dc8..cbe304af1 100644 --- a/pw_log_rpc/BUILD.gn +++ b/pw_log_rpc/BUILD.gn @@ -44,10 +44,6 @@ pw_source_set("rpc_log_drain") { "public/pw_log_rpc/rpc_log_drain_map.h", ] sources = [ "rpc_log_drain.cc" ] - deps = [ - "$dir_pw_log", - "$dir_pw_string", - ] public_deps = [ "$dir_pw_assert", "$dir_pw_log:protos.pwpb", @@ -89,7 +85,6 @@ pw_test("log_service_test") { "$dir_pw_result", "$dir_pw_rpc/raw:test_method_context", "$dir_pw_status", - "$dir_pw_string", ] } diff --git a/pw_log_rpc/docs.rst b/pw_log_rpc/docs.rst index b3db664d9..d93083c69 100644 --- a/pw_log_rpc/docs.rst +++ b/pw_log_rpc/docs.rst @@ -118,7 +118,8 @@ The user must provide a buffer large enough for the largest entry in the ``MultiSink`` while also accounting for the interface's Maximum Transmission Unit (MTU). If the ``RpcLogDrain`` finds a drop message count as it reads the ``MultiSink`` it will insert a message in the stream with the drop message -count. +count in the log proto dropped optional field. The receiving end can display the +count with the logs if desired. RpcLogDrainMap ============== diff --git a/pw_log_rpc/log_service_test.cc b/pw_log_rpc/log_service_test.cc index 9c04caf92..e3a9259c3 100644 --- a/pw_log_rpc/log_service_test.cc +++ b/pw_log_rpc/log_service_test.cc @@ -24,12 +24,12 @@ #include "pw_log/log.h" #include "pw_log/proto/log.pwpb.h" #include "pw_log/proto_utils.h" +#include "pw_log_tokenized/metadata.h" #include "pw_protobuf/decoder.h" #include "pw_result/result.h" #include "pw_rpc/channel.h" #include "pw_rpc/raw/fake_channel_output.h" #include "pw_rpc/raw/test_method_context.h" -#include "pw_string/string_builder.h" #include "pw_sync/mutex.h" namespace pw::log_rpc { @@ -41,9 +41,9 @@ using log::pw_rpc::raw::Logs; PW_RAW_TEST_METHOD_CONTEXT(LogService, Listen, 6, 128) constexpr size_t kMaxMessageSize = 50; -static_assert(RpcLogDrain::kMaxDropMessageSize < kMaxMessageSize); constexpr size_t kMaxLogEntrySize = RpcLogDrain::kMinEntrySizeWithoutPayload + kMaxMessageSize; +static_assert(RpcLogDrain::kMinEntryBufferSize < kMaxLogEntrySize); constexpr size_t kMultiSinkBufferSize = kMaxLogEntrySize * 10; constexpr size_t kMaxDrains = 3; constexpr char kMessage[] = "message"; @@ -52,8 +52,13 @@ constexpr char kMessage[] = "message"; constexpr char kLongMessage[] = "This is a long log message that will be dropped."; static_assert(sizeof(kLongMessage) < kMaxMessageSize); -static_assert(sizeof(kLongMessage) > RpcLogDrain::kMaxDropMessageSize); +static_assert(sizeof(kLongMessage) > RpcLogDrain::kMinEntryBufferSize); std::array rpc_request_buffer; +constexpr auto kSampleMetadata = + log_tokenized::Metadata::Set(); +constexpr auto kDropMessageMetadata = + log_tokenized::Metadata::Set<0, 0, 0, 0>(); +constexpr int64_t kSampleTimestamp = 1000; // `LogServiceTest` sets up a logging environment for testing with a `MultiSink` // for log entries, and multiple `RpcLogDrain`s for consuming such log entries. @@ -68,22 +73,26 @@ class LogServiceTest : public ::testing::Test { } } - void AddLogEntries(size_t log_count, std::string_view message) { + void AddLogEntries(size_t log_count, + std::string_view message, + log_tokenized::Metadata metadata, + int64_t timestamp) { for (size_t i = 0; i < log_count; ++i) { - AddLogEntry(message); + ASSERT_TRUE(AddLogEntry(message, metadata, timestamp).ok()); } } - void AddLogEntry(std::string_view message) { - auto metadata = - log_tokenized::Metadata::Set(); + StatusWithSize AddLogEntry(std::string_view message, + log_tokenized::Metadata metadata, + int64_t timestamp) { Result encoded_log_result = log::EncodeTokenizedLog(metadata, std::as_bytes(std::span(message)), - /*ticks_since_epoch=*/0, + timestamp, entry_encode_buffer_); - EXPECT_EQ(encoded_log_result.status(), OkStatus()); + PW_TRY_WITH_SIZE(encoded_log_result.status()); multisink_.HandleEntry(encoded_log_result.value()); + return StatusWithSize(encoded_log_result.value().size()); } protected: @@ -116,74 +125,80 @@ class LogServiceTest : public ::testing::Test { RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors), }; }; +struct TestLogEntry { + log_tokenized::Metadata metadata = kSampleMetadata; + int64_t timestamp = 0; + uint32_t dropped = 0; + ConstByteSpan tokenized_data = {}; +}; // Unpacks a `LogEntry` proto buffer and compares it with the expected data. void VerifyLogEntry(protobuf::Decoder& entry_decoder, - log_tokenized::Metadata expected_metadata, - ConstByteSpan expected_tokenized_data, - const int64_t expected_timestamp) { + const TestLogEntry& expected_entry) { ConstByteSpan tokenized_data; - EXPECT_TRUE(entry_decoder.Next().ok()); // message [tokenized] - EXPECT_EQ(1U, entry_decoder.FieldNumber()); - EXPECT_TRUE(entry_decoder.ReadBytes(&tokenized_data).ok()); - if (tokenized_data.size() != expected_tokenized_data.size()) { - PW_LOG_ERROR( - "actual: '%s', expected: '%s'", - reinterpret_cast(tokenized_data.begin()), - reinterpret_cast(expected_tokenized_data.begin())); + if (!expected_entry.tokenized_data.empty()) { + ASSERT_EQ(entry_decoder.Next(), OkStatus()); + ASSERT_EQ(entry_decoder.FieldNumber(), 1u); // message [tokenized] + ASSERT_TRUE(entry_decoder.ReadBytes(&tokenized_data).ok()); + if (tokenized_data.size() != expected_entry.tokenized_data.size()) { + PW_LOG_ERROR( + "actual: '%s', expected: '%s'", + reinterpret_cast(tokenized_data.begin()), + reinterpret_cast(expected_entry.tokenized_data.begin())); + } + EXPECT_EQ(tokenized_data.size(), expected_entry.tokenized_data.size()); + EXPECT_EQ(std::memcmp(tokenized_data.begin(), + expected_entry.tokenized_data.begin(), + expected_entry.tokenized_data.size()), + 0); + } + if (expected_entry.metadata.level()) { + ASSERT_EQ(entry_decoder.Next(), OkStatus()); + ASSERT_EQ(entry_decoder.FieldNumber(), 2u); // line_level + uint32_t line_level; + ASSERT_TRUE(entry_decoder.ReadUint32(&line_level).ok()); + EXPECT_EQ(expected_entry.metadata.level(), + line_level & PW_LOG_LEVEL_BITMASK); + EXPECT_EQ(expected_entry.metadata.line_number(), + (line_level & ~PW_LOG_LEVEL_BITMASK) >> PW_LOG_LEVEL_BITS); } - EXPECT_EQ(tokenized_data.size(), expected_tokenized_data.size()); - EXPECT_EQ(std::memcmp(tokenized_data.begin(), - expected_tokenized_data.begin(), - expected_tokenized_data.size()), - 0); - - uint32_t line_level; - EXPECT_TRUE(entry_decoder.Next().ok()); // line_level - EXPECT_EQ(2U, entry_decoder.FieldNumber()); - EXPECT_TRUE(entry_decoder.ReadUint32(&line_level).ok()); - EXPECT_EQ(expected_metadata.level(), line_level & PW_LOG_LEVEL_BITMASK); - EXPECT_EQ(expected_metadata.line_number(), - (line_level & ~PW_LOG_LEVEL_BITMASK) >> PW_LOG_LEVEL_BITS); - - if (expected_metadata.flags() != 0) { + if (expected_entry.metadata.flags()) { + ASSERT_EQ(entry_decoder.Next(), OkStatus()); + ASSERT_EQ(entry_decoder.FieldNumber(), 3u); // flags uint32_t flags; - EXPECT_TRUE(entry_decoder.Next().ok()); // flags - EXPECT_EQ(3U, entry_decoder.FieldNumber()); - EXPECT_TRUE(entry_decoder.ReadUint32(&flags).ok()); - EXPECT_EQ(expected_metadata.flags(), flags); + ASSERT_TRUE(entry_decoder.ReadUint32(&flags).ok()); + EXPECT_EQ(expected_entry.metadata.flags(), flags); } - - const bool has_timestamp = entry_decoder.Next().ok(); // timestamp - if (expected_timestamp == 0 && !has_timestamp) { - return; + if (expected_entry.timestamp) { + ASSERT_EQ(entry_decoder.Next(), OkStatus()); + ASSERT_TRUE(entry_decoder.FieldNumber() == 4u // timestamp + || entry_decoder.FieldNumber() == 5u); // time_since_last_entry + int64_t timestamp; + ASSERT_TRUE(entry_decoder.ReadInt64(×tamp).ok()); + EXPECT_EQ(expected_entry.timestamp, timestamp); + } + if (expected_entry.dropped) { + ASSERT_EQ(entry_decoder.Next(), OkStatus()); + ASSERT_EQ(entry_decoder.FieldNumber(), 6u); // dropped + uint32_t dropped; + ASSERT_TRUE(entry_decoder.ReadUint32(&dropped).ok()); + EXPECT_EQ(expected_entry.dropped, dropped); } - int64_t timestamp; - EXPECT_TRUE(has_timestamp); - EXPECT_EQ(4U, entry_decoder.FieldNumber()); - EXPECT_TRUE(entry_decoder.ReadInt64(×tamp).ok()); - EXPECT_EQ(expected_timestamp, timestamp); } // Verifies a stream of log entries, returning the total count found. size_t VerifyLogEntries(protobuf::Decoder& entries_decoder, - Vector& message_stack) { + Vector& expected_entries_stack) { size_t entries_found = 0; while (entries_decoder.Next().ok()) { ConstByteSpan entry; EXPECT_TRUE(entries_decoder.ReadBytes(&entry).ok()); protobuf::Decoder entry_decoder(entry); - auto expected_metadata = - log_tokenized::Metadata::Set(); - if (message_stack.empty()) { + if (expected_entries_stack.empty()) { break; } - ConstByteSpan expected_message = message_stack.back(); - VerifyLogEntry(entry_decoder, - expected_metadata, - expected_message, - /*expected_timestamp=*/0); - message_stack.pop_back(); + VerifyLogEntry(entry_decoder, expected_entries_stack.back()); + expected_entries_stack.pop_back(); ++entries_found; } return entries_found; @@ -248,7 +263,7 @@ TEST_F(LogServiceTest, StartAndEndStream) { // Add log entries. const size_t total_entries = 10; - AddLogEntries(total_entries, kMessage); + AddLogEntries(total_entries, kMessage, kSampleMetadata, kSampleTimestamp); // Request logs. context.call(rpc_request_buffer); EXPECT_EQ(active_drain.Flush(), OkStatus()); @@ -263,10 +278,11 @@ TEST_F(LogServiceTest, StartAndEndStream) { EXPECT_GE(context.responses().size(), 1u); // Verify data in responses. - Vector message_stack; + Vector message_stack; for (size_t i = 0; i < total_entries; ++i) { - message_stack.push_back( - std::as_bytes(std::span(std::string_view(kMessage)))); + message_stack.push_back({.timestamp = kSampleTimestamp, + .tokenized_data = std::as_bytes( + std::span(std::string_view(kMessage)))}); } size_t entries_found = 0; for (auto& response : context.responses()) { @@ -285,7 +301,7 @@ TEST_F(LogServiceTest, HandleDropped) { // Add log entries. const size_t total_entries = 5; const uint32_t total_drop_count = 2; - AddLogEntries(total_entries, kMessage); + AddLogEntries(total_entries, kMessage, kSampleMetadata, kSampleTimestamp); multisink_.HandleDropped(total_drop_count); // Request logs. @@ -297,13 +313,13 @@ TEST_F(LogServiceTest, HandleDropped) { ASSERT_GE(context.responses().size(), 1u); // Add create expected messages in a stack to match the order they arrive in. - Vector message_stack; - StringBuffer<32> message; - message.Format("Dropped %u", static_cast(total_drop_count)); - message_stack.push_back(std::as_bytes(std::span(std::string_view(message)))); + Vector message_stack; + message_stack.push_back( + {.metadata = kDropMessageMetadata, .dropped = total_drop_count}); for (size_t i = 0; i < total_entries; ++i) { - message_stack.push_back( - std::as_bytes(std::span(std::string_view(kMessage)))); + message_stack.push_back({.timestamp = kSampleTimestamp, + .tokenized_data = std::as_bytes( + std::span(std::string_view(kMessage)))}); } // Verify data in responses. @@ -326,7 +342,7 @@ TEST_F(LogServiceTest, HandleSmallBuffer) { // Add log entries. const size_t total_entries = 5; const uint32_t total_drop_count = total_entries; - AddLogEntries(total_entries, kLongMessage); + AddLogEntries(total_entries, kLongMessage, kSampleMetadata, kSampleTimestamp); // Request logs. context.call(rpc_request_buffer); EXPECT_EQ(small_buffer_drain.value()->Flush(), OkStatus()); @@ -334,10 +350,9 @@ TEST_F(LogServiceTest, HandleSmallBuffer) { ASSERT_EQ(context.status(), OkStatus()); ASSERT_GE(context.responses().size(), 1u); - Vector message_stack; - StringBuffer<32> message; - message.Format("Dropped %u", static_cast(total_drop_count)); - message_stack.push_back(std::as_bytes(std::span(std::string_view(message)))); + Vector message_stack; + message_stack.push_back( + {.metadata = kDropMessageMetadata, .dropped = total_drop_count}); // Verify data in responses. size_t entries_found = 0; @@ -357,7 +372,7 @@ TEST_F(LogServiceTest, FlushDrainWithoutMultisink) { // Add log entries. const size_t total_entries = 5; - AddLogEntries(total_entries, kMessage); + AddLogEntries(total_entries, kMessage, kSampleMetadata, kSampleTimestamp); // Request logs. context.call(rpc_request_buffer); EXPECT_EQ(detached_drain.Close(), OkStatus()); @@ -366,23 +381,25 @@ TEST_F(LogServiceTest, FlushDrainWithoutMultisink) { } TEST_F(LogServiceTest, LargeLogEntry) { - const auto expected_metadata = - log_tokenized::Metadata::Set(); - ConstByteSpan expected_message = std::as_bytes(std::span(kMessage)); - const int64_t expected_timestamp = std::numeric_limits::max(); + const TestLogEntry expected_entry{ + .metadata = + log_tokenized::Metadata::Set(), + .timestamp = std::numeric_limits::max(), + .tokenized_data = std::as_bytes(std::span(kMessage)), + }; // Add entry to multisink. log::LogEntry::MemoryEncoder encoder(entry_encode_buffer_); - encoder.WriteMessage(expected_message); + encoder.WriteMessage(expected_entry.tokenized_data); encoder.WriteLineLevel( - (expected_metadata.level() & PW_LOG_LEVEL_BITMASK) | - ((expected_metadata.line_number() << PW_LOG_LEVEL_BITS) & + (expected_entry.metadata.level() & PW_LOG_LEVEL_BITMASK) | + ((expected_entry.metadata.line_number() << PW_LOG_LEVEL_BITS) & ~PW_LOG_LEVEL_BITMASK)); - encoder.WriteFlags(expected_metadata.flags()); - encoder.WriteTimestamp(expected_timestamp); + encoder.WriteFlags(expected_entry.metadata.flags()); + encoder.WriteTimestamp(expected_entry.timestamp); ASSERT_EQ(encoder.status(), OkStatus()); multisink_.HandleEntry(encoder); @@ -403,8 +420,7 @@ TEST_F(LogServiceTest, LargeLogEntry) { ConstByteSpan entry; EXPECT_TRUE(entries_decoder.ReadBytes(&entry).ok()); protobuf::Decoder entry_decoder(entry); - VerifyLogEntry( - entry_decoder, expected_metadata, expected_message, expected_timestamp); + VerifyLogEntry(entry_decoder, expected_entry); } TEST_F(LogServiceTest, InterruptedLogStreamSendsDropCount) { @@ -413,20 +429,30 @@ TEST_F(LogServiceTest, InterruptedLogStreamSendsDropCount) { ASSERT_TRUE(drain.ok()); LogService log_service(drain_map_); - const uint32_t output_buffer_size = 100; + const size_t output_buffer_size = 128; + const size_t max_packets = 10; rpc::RawFakeChannelOutput<10, output_buffer_size, 512> output; rpc::Channel channel(rpc::Channel::Create(&output)); rpc::Server server(std::span(&channel, 1)); // Add as many entries needed to have multiple packets send. - const uint32_t min_packets_sent = 4; - const uint32_t max_messages_per_response = - output_buffer_size / sizeof(kMessage); - const size_t total_entries = min_packets_sent * max_messages_per_response; - AddLogEntries(total_entries, kMessage); + StatusWithSize status = + AddLogEntry(kMessage, kSampleMetadata, kSampleTimestamp); + ASSERT_TRUE(status.ok()); + + // In reality less than output_buffer_size is given as a buffer, since some + // bytes are used for the RPC framing. + const uint32_t max_messages_per_response = output_buffer_size / status.size(); + // Send less packets than the max to avoid crashes. + const uint32_t packets_sent = max_packets / 2; + const size_t total_entries = packets_sent * max_messages_per_response; + const size_t max_entries = 50; + // Check we can test all these entries. + ASSERT_GE(max_entries, total_entries); + AddLogEntries(total_entries - 1, kMessage, kSampleMetadata, kSampleTimestamp); // Interrupt log stream with an error. - const uint32_t successful_packets_sent = min_packets_sent - 2; + const uint32_t successful_packets_sent = packets_sent / 2; output.set_send_status(Status::Unavailable(), successful_packets_sent); // Request logs. @@ -441,10 +467,11 @@ TEST_F(LogServiceTest, InterruptedLogStreamSendsDropCount) { ASSERT_EQ(output.payloads().size(), successful_packets_sent); // Verify data in responses. - Vector message_stack; + Vector message_stack; for (size_t i = 0; i < total_entries; ++i) { - message_stack.push_back( - std::as_bytes(std::span(std::string_view(kMessage)))); + message_stack.push_back({.timestamp = kSampleTimestamp, + .tokenized_data = std::as_bytes( + std::span(std::string_view(kMessage)))}); } size_t entries_found = 0; for (auto& response : output.payloads()) { @@ -469,12 +496,11 @@ TEST_F(LogServiceTest, InterruptedLogStreamSendsDropCount) { const uint32_t total_drop_count = entries_found / successful_packets_sent; const uint32_t remaining_entries = total_entries - total_drop_count; for (size_t i = 0; i < remaining_entries; ++i) { - message_stack.push_back( - std::as_bytes(std::span(std::string_view(kMessage)))); + message_stack.push_back({.tokenized_data = std::as_bytes( + std::span(std::string_view(kMessage)))}); } - StringBuffer<32> message; - message.Format("Dropped %u", static_cast(total_drop_count)); - message_stack.push_back(std::as_bytes(std::span(std::string_view(message)))); + message_stack.push_back( + {.metadata = kDropMessageMetadata, .dropped = total_drop_count}); for (auto& response : output.payloads()) { protobuf::Decoder entry_decoder(response); @@ -490,21 +516,31 @@ TEST_F(LogServiceTest, InterruptedLogStreamIgnoresErrors) { ASSERT_TRUE(drain.ok()); LogService log_service(drain_map_); - const uint32_t output_buffer_size = 100; - rpc::RawFakeChannelOutput<10, output_buffer_size, 768> output; + const size_t output_buffer_size = 50; + const size_t max_packets = 20; + rpc::RawFakeChannelOutput output; rpc::Channel channel(rpc::Channel::Create(&output)); rpc::Server server(std::span(&channel, 1)); // Add as many entries needed to have multiple packets send. - const uint32_t min_packets_sent = 4; - const uint32_t max_messages_per_response = - output_buffer_size / sizeof(kMessage); - const size_t total_entries = min_packets_sent * max_messages_per_response; - AddLogEntries(total_entries, kMessage); + StatusWithSize status = + AddLogEntry(kMessage, kSampleMetadata, kSampleTimestamp); + ASSERT_TRUE(status.ok()); + + // In reality less than output_buffer_size is given as a buffer, since some + // bytes are used for the RPC framing. + const uint32_t max_messages_per_response = output_buffer_size / status.size(); + // Send less packets than the max to avoid crashes. + const uint32_t packets_sent = 4; + const size_t total_entries = packets_sent * max_messages_per_response; + const size_t max_entries = 50; + // Check we can test all these entries.q + ASSERT_GE(max_entries, total_entries); + AddLogEntries(total_entries - 1, kMessage, kSampleMetadata, kSampleTimestamp); // Interrupt log stream with an error. - const uint32_t error_on_packet_count = min_packets_sent; - output.set_send_status(Status::Unavailable(), min_packets_sent); + const uint32_t error_on_packet_count = packets_sent / 2; + output.set_send_status(Status::Unavailable(), error_on_packet_count); // Request logs. rpc::RawServerWriter writer = rpc::RawServerWriter::Open( @@ -514,8 +550,8 @@ TEST_F(LogServiceTest, InterruptedLogStreamIgnoresErrors) { EXPECT_EQ(drain.value()->Flush(), OkStatus()); EXPECT_FALSE(output.done()); - // Make some packets were sent. - ASSERT_GE(output.payloads().size(), min_packets_sent); + // Make sure some packets were sent. + ASSERT_GE(output.payloads().size(), packets_sent); // Verify that not all the entries were sent. size_t entries_found = 0; @@ -534,18 +570,19 @@ TEST_F(LogServiceTest, InterruptedLogStreamIgnoresErrors) { error_on_packet_count * total_drop_count; const uint32_t entry_count_after_error = entries_found - 1 - entry_count_before_error; - Vector message_stack; + Vector message_stack; // Add messages to the stack in the reverse order they are sent. for (size_t i = 0; i < entry_count_after_error; ++i) { - message_stack.push_back( - std::as_bytes(std::span(std::string_view(kMessage)))); + message_stack.push_back({.timestamp = kSampleTimestamp, + .tokenized_data = std::as_bytes( + std::span(std::string_view(kMessage)))}); } - StringBuffer<32> message; - message.Format("Dropped %u", static_cast(total_drop_count)); - message_stack.push_back(std::as_bytes(std::span(std::string_view(message)))); + message_stack.push_back( + {.metadata = kDropMessageMetadata, .dropped = total_drop_count}); for (size_t i = 0; i < entry_count_before_error; ++i) { - message_stack.push_back( - std::as_bytes(std::span(std::string_view(kMessage)))); + message_stack.push_back({.timestamp = kSampleTimestamp, + .tokenized_data = std::as_bytes( + std::span(std::string_view(kMessage)))}); } for (auto& response : output.payloads()) { diff --git a/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h index cc1201461..3c03579b0 100644 --- a/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h +++ b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h @@ -52,7 +52,8 @@ class RpcLogDrain : public multisink::MultiSink::Drain { // The minimum buffer size, without the message payload, needed to retrieve a // log::LogEntry from the attached MultiSink. The user must account for the - // max message size to avoid log entry drops. + // max message size to avoid log entry drops. The dropped field is not + // accounted since a dropped message has all other fields unset. static constexpr size_t kMinEntrySizeWithoutPayload = // message protobuf::SizeOfFieldKey(1) + @@ -65,13 +66,15 @@ class RpcLogDrain : public multisink::MultiSink::Drain { protobuf::kMaxSizeBytesUint32 // timestamp or time_since_last_entry + protobuf::SizeOfFieldKey(4) + protobuf::kMaxSizeBytesInt64; - // Message format to report the drop count. - static constexpr char kDropMessageFormatString[] = "Dropped %u"; - // With a uint32_t number, "Dropped %u" is no more than 18 characters long. - static constexpr size_t kMaxDropMessageSize = 18; - // The smallest buffer size must be able to fit a drop message. - static constexpr size_t kMinEntryBufferSize = - kMaxDropMessageSize + kMinEntrySizeWithoutPayload; + // The smallest buffer size must be able to fit a typical token size: 4 bytes. + static constexpr size_t kMinEntryBufferSize = kMinEntrySizeWithoutPayload + 4; + + // When encoding LogEntry in LogEntries, there are kLogEntryEncodeFrameSize + // bytes added to the encoded LogEntry. This constant and kMinEntryBufferSize + // can be used to calculate the minimum RPC ChannelOutput buffer size. + static constexpr size_t kLogEntryEncodeFrameSize = + protobuf::SizeOfFieldKey(1) // LogEntry + + protobuf::kMaxSizeOfLength; // Creates a log stream with the provided open writer. Useful for streaming // logs without a request. diff --git a/pw_log_rpc/rpc_log_drain.cc b/pw_log_rpc/rpc_log_drain.cc index 3759162bb..3cfbbff46 100644 --- a/pw_log_rpc/rpc_log_drain.cc +++ b/pw_log_rpc/rpc_log_drain.cc @@ -17,28 +17,16 @@ #include #include "pw_assert/check.h" -#include "pw_log/log.h" -#include "pw_string/string_builder.h" namespace pw::log_rpc { namespace { -// When encoding LogEntry in LogEntries, there are kLogEntryEncodeFrameSize -// bytes added to the encoded LogEntry. -constexpr size_t kLogEntryEncodeFrameSize = - protobuf::SizeOfFieldKey(1) // LogEntry - + protobuf::kMaxSizeOfLength; // Creates an encoded drop message on the provided buffer. Result CreateEncodedDropMessage( uint32_t drop_count, ByteSpan encoded_drop_message_buffer) { - StringBuffer message; - message.Format(RpcLogDrain::kDropMessageFormatString, - static_cast(drop_count)); - // Encode message in protobuf. log::LogEntry::MemoryEncoder encoder(encoded_drop_message_buffer); - encoder.WriteMessage(std::as_bytes(std::span(std::string_view(message)))); - encoder.WriteLineLevel(PW_LOG_LEVEL_WARN & PW_LOG_LEVEL_BITMASK); + encoder.WriteDropped(drop_count); PW_TRY(encoder.status()); return ConstByteSpan(encoder); } -- cgit v1.2.3