diff options
author | Wyatt Hepler <hepler@google.com> | 2022-01-26 22:18:44 -0800 |
---|---|---|
committer | Wyatt Hepler <hepler@google.com> | 2022-02-01 16:56:19 +0000 |
commit | 3ecb339aec12dfbc28408738ec1ceff747fc5267 (patch) | |
tree | 3d009bc7f30438e4a5fbe57517342f1d8b7ff412 /pw_log_rpc | |
parent | c228f0d45c753ba7219dab750d8b2dedaad6b8cc (diff) | |
download | pigweed-3ecb339aec12dfbc28408738ec1ceff747fc5267.tar.gz |
pw_log_rpc: Remove use of pw_rpc channel payload buffer
Provide a buffer for encoding rather than using the deprecated
pw_rpc PayloadBuffer() API.
Change-Id: Id330a4198a820c709f789a2ea93ff45c7710dd36
Bug: 605
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/81700
Reviewed-by: Carlos Chinchilla <cachinchilla@google.com>
Reviewed-by: Armando Montanez <amontanez@google.com>
Diffstat (limited to 'pw_log_rpc')
-rw-r--r-- | pw_log_rpc/docs.rst | 12 | ||||
-rw-r--r-- | pw_log_rpc/log_service_test.cc | 34 | ||||
-rw-r--r-- | pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h | 43 | ||||
-rw-r--r-- | pw_log_rpc/public/pw_log_rpc/rpc_log_drain_thread.h | 33 | ||||
-rw-r--r-- | pw_log_rpc/rpc_log_drain.cc | 23 | ||||
-rw-r--r-- | pw_log_rpc/rpc_log_drain_test.cc | 16 |
6 files changed, 87 insertions, 74 deletions
diff --git a/pw_log_rpc/docs.rst b/pw_log_rpc/docs.rst index 304b58661..53f3dfc1a 100644 --- a/pw_log_rpc/docs.rst +++ b/pw_log_rpc/docs.rst @@ -150,10 +150,14 @@ channel ID. RpcLogDrainThread ----------------- -The module includes a sample thread that flushes each drain sequentially. Future -work might replace this with enqueueing the flush work on a work queue. The user -can also choose to have different threads flushing individual ``RpcLogDrain``\s -with different priorities. +The module includes a sample thread that flushes each drain sequentially. +``RpcLogDrainThread`` takes an encoding buffer span at construction. +``RpcLogDrainThreadWithBuffer`` takes a template parameter for the buffer size, +which must be large enough to fit at least one log entry. + +Future work might replace this with enqueueing the flush work on a work queue. +The user can also choose to have different threads flushing individual +``RpcLogDrain``\s with different priorities. Calling ``OpenUnrequestedLogStream()`` is a convenient way to set up a log stream that is started without the need to receive an RCP request for logs. diff --git a/pw_log_rpc/log_service_test.cc b/pw_log_rpc/log_service_test.cc index 5a8c6c041..a69eda4a9 100644 --- a/pw_log_rpc/log_service_test.cc +++ b/pw_log_rpc/log_service_test.cc @@ -144,6 +144,8 @@ class LogServiceTest : public ::testing::Test { RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors, &filters_[2]), }; + + std::array<std::byte, 128> encoding_buffer_ = {}; }; struct TestLogEntry { log_tokenized::Metadata metadata = kSampleMetadata; @@ -269,7 +271,7 @@ size_t CountLogEntries(protobuf::Decoder& entries_decoder) { TEST_F(LogServiceTest, AssignWriter) { // Drains don't have writers. for (auto& drain : drain_map_.drains()) { - EXPECT_EQ(drain.Flush(), Status::Unavailable()); + EXPECT_EQ(drain.Flush(encoding_buffer_), Status::Unavailable()); } // Create context directed to drain with ID 1. @@ -280,12 +282,12 @@ TEST_F(LogServiceTest, AssignWriter) { // Call RPC, which sets the drain's writer. context.call(rpc_request_buffer); - EXPECT_EQ(active_drain.Flush(), OkStatus()); + EXPECT_EQ(active_drain.Flush(encoding_buffer_), OkStatus()); // Other drains are still missing writers. for (auto& drain : drain_map_.drains()) { if (drain.channel_id() != drain_channel_id) { - EXPECT_EQ(drain.Flush(), Status::Unavailable()); + EXPECT_EQ(drain.Flush(encoding_buffer_), Status::Unavailable()); } } @@ -294,7 +296,7 @@ TEST_F(LogServiceTest, AssignWriter) { LOG_SERVICE_METHOD_CONTEXT second_call_context(drain_map_); second_call_context.set_channel_id(drain_channel_id); second_call_context.call(rpc_request_buffer); - EXPECT_EQ(active_drain.Flush(), OkStatus()); + EXPECT_EQ(active_drain.Flush(encoding_buffer_), OkStatus()); ASSERT_TRUE(second_call_context.done()); EXPECT_EQ(second_call_context.responses().size(), 0u); @@ -303,7 +305,7 @@ TEST_F(LogServiceTest, AssignWriter) { LOG_SERVICE_METHOD_CONTEXT third_call_context(drain_map_); third_call_context.set_channel_id(drain_channel_id); third_call_context.call(rpc_request_buffer); - EXPECT_EQ(active_drain.Flush(), OkStatus()); + EXPECT_EQ(active_drain.Flush(encoding_buffer_), OkStatus()); ASSERT_FALSE(third_call_context.done()); EXPECT_EQ(third_call_context.responses().size(), 0u); EXPECT_EQ(active_drain.Close(), OkStatus()); @@ -321,7 +323,7 @@ TEST_F(LogServiceTest, StartAndEndStream) { // Request logs. context.call(rpc_request_buffer); - EXPECT_EQ(active_drain.Flush(), OkStatus()); + EXPECT_EQ(active_drain.Flush(encoding_buffer_), OkStatus()); // Not done until the stream is finished. ASSERT_FALSE(context.done()); @@ -364,7 +366,7 @@ TEST_F(LogServiceTest, HandleDropped) { // Request logs. context.call(rpc_request_buffer); - EXPECT_EQ(active_drain.Flush(), OkStatus()); + EXPECT_EQ(active_drain.Flush(encoding_buffer_), OkStatus()); active_drain.Close(); ASSERT_EQ(context.status(), OkStatus()); // There is at least 1 response with multiple log entries packed. @@ -405,7 +407,7 @@ TEST_F(LogServiceTest, HandleSmallBuffer) { AddLogEntries(total_entries, kLongMessage, kSampleMetadata, kSampleTimestamp); // Request logs. context.call(rpc_request_buffer); - EXPECT_EQ(small_buffer_drain.value()->Flush(), OkStatus()); + EXPECT_EQ(small_buffer_drain.value()->Flush(encoding_buffer_), OkStatus()); EXPECT_EQ(small_buffer_drain.value()->Close(), OkStatus()); ASSERT_EQ(context.status(), OkStatus()); ASSERT_GE(context.responses().size(), 1u); @@ -478,7 +480,7 @@ TEST_F(LogServiceTest, LargeLogEntry) { LOG_SERVICE_METHOD_CONTEXT context(drain_map_); context.set_channel_id(drain_channel_id); context.call(rpc_request_buffer); - ASSERT_EQ(active_drain.Flush(), OkStatus()); + ASSERT_EQ(active_drain.Flush(encoding_buffer_), OkStatus()); active_drain.Close(); ASSERT_EQ(context.status(), OkStatus()); ASSERT_EQ(context.responses().size(), 1u); @@ -531,7 +533,7 @@ TEST_F(LogServiceTest, InterruptedLogStreamSendsDropCount) { server, drain_channel_id, log_service); EXPECT_EQ(drain.value()->Open(writer), OkStatus()); // This drain closes on errors. - EXPECT_EQ(drain.value()->Flush(), Status::Aborted()); + EXPECT_EQ(drain.value()->Flush(encoding_buffer_), Status::Aborted()); EXPECT_TRUE(output.done()); // Make sure not all packets were sent. @@ -563,7 +565,7 @@ TEST_F(LogServiceTest, InterruptedLogStreamSendsDropCount) { writer = rpc::RawServerWriter::Open<Logs::Listen>( server, drain_channel_id, log_service); EXPECT_EQ(drain.value()->Open(writer), OkStatus()); - EXPECT_EQ(drain.value()->Flush(), OkStatus()); + EXPECT_EQ(drain.value()->Flush(encoding_buffer_), OkStatus()); // Add expected messages to the stack in the reverse order they are received. message_stack.clear(); @@ -627,7 +629,7 @@ TEST_F(LogServiceTest, InterruptedLogStreamIgnoresErrors) { server, drain_channel_id, log_service); EXPECT_EQ(drain.value()->Open(writer), OkStatus()); // This drain ignores errors. - EXPECT_EQ(drain.value()->Flush(), OkStatus()); + EXPECT_EQ(drain.value()->Flush(encoding_buffer_), OkStatus()); EXPECT_FALSE(output.done()); // Make sure some packets were sent. @@ -672,7 +674,7 @@ TEST_F(LogServiceTest, InterruptedLogStreamIgnoresErrors) { const size_t previous_stream_packet_count = output.payloads<Logs::Listen>().size(); output.set_send_status(Status::Unavailable()); - EXPECT_EQ(drain.value()->Flush(), OkStatus()); + EXPECT_EQ(drain.value()->Flush(encoding_buffer_), OkStatus()); EXPECT_FALSE(output.done()); ASSERT_EQ(output.payloads<Logs::Listen>().size(), previous_stream_packet_count); @@ -746,7 +748,7 @@ TEST_F(LogServiceTest, FilterLogs) { LOG_SERVICE_METHOD_CONTEXT context(drain_map_); context.set_channel_id(drain.channel_id()); context.call({}); - ASSERT_EQ(drain.Flush(), OkStatus()); + ASSERT_EQ(drain.Flush(encoding_buffer_), OkStatus()); size_t entries_found = 0; uint32_t drop_count_found = 0; @@ -774,13 +776,13 @@ TEST_F(LogServiceTest, ReopenClosedLogStreamWithAcquiredBuffer) { server, drain_channel_id, log_service); EXPECT_EQ(drain.value()->Open(writer), OkStatus()); // This drain closes on errors. - EXPECT_EQ(drain.value()->Flush(), OkStatus()); + EXPECT_EQ(drain.value()->Flush(encoding_buffer_), OkStatus()); // Request log stream with a new writer. writer = rpc::RawServerWriter::Open<Logs::Listen>( server, drain_channel_id, log_service); EXPECT_EQ(drain.value()->Open(writer), OkStatus()); - EXPECT_EQ(drain.value()->Flush(), OkStatus()); + EXPECT_EQ(drain.value()->Flush(encoding_buffer_), OkStatus()); } } // namespace diff --git a/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h index 87aa61d3c..a4e404d8f 100644 --- a/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h +++ b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h @@ -57,41 +57,34 @@ class RpcLogDrain : public multisink::MultiSink::Drain { // must account for the max message size to avoid log entry drops. The dropped // field is not accounted since a dropped message has all other fields unset. static constexpr size_t kMinEntrySizeWithoutPayload = - // message - protobuf::FieldNumberSizeBytes(1) + - 1 // Assume minimum varint length, skip the payload bytes. - // line_level - + protobuf::FieldNumberSizeBytes(2) + - protobuf::kMaxSizeBytesUint32 - // flags - + protobuf::FieldNumberSizeBytes(3) + - protobuf::kMaxSizeBytesUint32 - // timestamp or time_since_last_entry - + protobuf::FieldNumberSizeBytes(4) + - protobuf::kMaxSizeBytesInt64 - // Module - + protobuf::FieldNumberSizeBytes(7) + - 1; // Assume minimum varint length, skip the module bytes. + protobuf::SizeOfFieldBytes(log::LogEntry::Fields::MESSAGE, 0) + + protobuf::SizeOfFieldUint32(log::LogEntry::Fields::LINE_LEVEL) + + protobuf::SizeOfFieldUint32(log::LogEntry::Fields::FLAGS) + + protobuf::SizeOfFieldInt64(log::LogEntry::Fields::TIMESTAMP) + + protobuf::SizeOfFieldBytes(log::LogEntry::Fields::MODULE, 0); + // The smallest buffer size must be able to fit a typical token size: 4 bytes. static constexpr size_t kMinEntryBufferSize = kMinEntrySizeWithoutPayload + 4; - // When encoding LogEntry in LogEntries, there are kLogEntryEncodeFrameSize + // When encoding LogEntry in LogEntries, there are kLogEntriesEncodeFrameSize // bytes added to the encoded LogEntry. This constant and kMinEntryBufferSize // can be used to calculate the minimum RPC ChannelOutput buffer size. - static constexpr size_t kLogEntryEncodeFrameSize = - protobuf::FieldNumberSizeBytes(1) // LogEntry - + protobuf::kMaxSizeOfLength; + static constexpr size_t kLogEntriesEncodeFrameSize = + protobuf::FieldNumberSizeBytes(log::LogEntries::Fields::ENTRIES) + + protobuf::kMaxSizeOfLength + + protobuf::SizeOfFieldUint32( + log::LogEntries::Fields::FIRST_ENTRY_SEQUENCE_ID); // Creates a closed log stream with a writer that can be set at a later time. // The provided buffer must be large enough to hold the largest transmittable // log::LogEntry or a drop count message at the very least. The user can // choose to provide a unique mutex for the drain, or share it to save RAM as // long as they are aware of contengency issues. - RpcLogDrain(const uint32_t channel_id, - ByteSpan log_entry_buffer, - sync::Mutex& mutex, - LogDrainErrorHandling error_handling, - Filter* filter = nullptr) + constexpr RpcLogDrain(const uint32_t channel_id, + ByteSpan log_entry_buffer, + sync::Mutex& mutex, + LogDrainErrorHandling error_handling, + Filter* filter = nullptr) : channel_id_(channel_id), error_handling_(error_handling), server_writer_(), @@ -128,7 +121,7 @@ class RpcLogDrain : public multisink::MultiSink::Drain { // OK - all entries were consumed. // ABORTED - there was an error writing the packet, and error_handling equals // `kCloseStreamOnWriterError`. - Status Flush() PW_LOCKS_EXCLUDED(mutex_); + Status Flush(ByteSpan encoding_buffer) PW_LOCKS_EXCLUDED(mutex_); // Ends RPC log stream without flushing. // diff --git a/pw_log_rpc/public/pw_log_rpc/rpc_log_drain_thread.h b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain_thread.h index 5358b35ad..c0506643e 100644 --- a/pw_log_rpc/public/pw_log_rpc/rpc_log_drain_thread.h +++ b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain_thread.h @@ -14,6 +14,7 @@ #pragma once +#include <cstddef> #include <span> #include "pw_log_rpc/log_service.h" @@ -32,11 +33,15 @@ namespace pw::log_rpc { // manages multiple log streams. It is a suitable option when a minimal // thread count is desired but comes with the cost of individual log streams // blocking each other's flushing. -class RpcLogDrainThread final : public thread::ThreadCore, - public multisink::MultiSink::Listener { +class RpcLogDrainThread : public thread::ThreadCore, + public multisink::MultiSink::Listener { public: - RpcLogDrainThread(multisink::MultiSink& multisink, RpcLogDrainMap& drain_map) - : drain_map_(drain_map), multisink_(multisink) {} + RpcLogDrainThread(multisink::MultiSink& multisink, + RpcLogDrainMap& drain_map, + std::span<std::byte> encoding_buffer) + : drain_map_(drain_map), + multisink_(multisink), + encoding_buffer_(encoding_buffer) {} void OnNewEntryAvailable() override { new_log_available_notification_.release(); @@ -51,7 +56,7 @@ class RpcLogDrainThread final : public thread::ThreadCore, while (true) { new_log_available_notification_.acquire(); for (auto& drain : drain_map_.drains()) { - drain.Flush().IgnoreError(); + drain.Flush(encoding_buffer_).IgnoreError(); } } } @@ -73,6 +78,24 @@ class RpcLogDrainThread final : public thread::ThreadCore, sync::ThreadNotification new_log_available_notification_; RpcLogDrainMap& drain_map_; multisink::MultiSink& multisink_; + std::span<std::byte> encoding_buffer_; +}; + +template <size_t kEncodingBufferSizeBytes> +class RpcLogDrainThreadWithBuffer final : public RpcLogDrainThread { + public: + RpcLogDrainThreadWithBuffer(multisink::MultiSink& multisink, + RpcLogDrainMap& drain_map) + : RpcLogDrainThread(multisink, drain_map, encoding_buffer_array_) {} + + private: + static_assert(kEncodingBufferSizeBytes >= + RpcLogDrain::kLogEntriesEncodeFrameSize + + RpcLogDrain::kMinEntryBufferSize, + "RpcLogDrainThread's encoding buffer must be large enough for " + "at least one entry"); + + std::byte encoding_buffer_array_[kEncodingBufferSizeBytes]; }; } // namespace pw::log_rpc diff --git a/pw_log_rpc/rpc_log_drain.cc b/pw_log_rpc/rpc_log_drain.cc index a4f3e4c3a..6abaeda5e 100644 --- a/pw_log_rpc/rpc_log_drain.cc +++ b/pw_log_rpc/rpc_log_drain.cc @@ -31,13 +31,6 @@ Result<ConstByteSpan> CreateEncodedDropMessage( return ConstByteSpan(encoder); } -// TODO(pwbug/605): Remove this hack for accessing the PayloadBuffer() API. -class AccessHiddenFunctions : public rpc::RawServerWriter { - public: - using RawServerWriter::PayloadBuffer; - using RawServerWriter::ReleaseBuffer; -}; - } // namespace Status RpcLogDrain::Open(rpc::RawServerWriter& writer) { @@ -52,7 +45,7 @@ Status RpcLogDrain::Open(rpc::RawServerWriter& writer) { return OkStatus(); } -Status RpcLogDrain::Flush() { +Status RpcLogDrain::Flush(ByteSpan encoding_buffer) { PW_CHECK_NOTNULL(multisink_); LogDrainState log_sink_state = LogDrainState::kMoreEntriesRemaining; @@ -61,17 +54,11 @@ Status RpcLogDrain::Flush() { if (!server_writer_.active()) { return Status::Unavailable(); } - log::LogEntries::MemoryEncoder encoder( - static_cast<AccessHiddenFunctions&>(server_writer_).PayloadBuffer()); + log::LogEntries::MemoryEncoder encoder(encoding_buffer); uint32_t packed_entry_count = 0; log_sink_state = EncodeOutgoingPacket(encoder, packed_entry_count); // Avoid sending empty packets. if (encoder.size() == 0) { - // Release buffer when still active to keep the writer in a replaceable - // state. - if (server_writer_.active()) { - static_cast<AccessHiddenFunctions&>(server_writer_).ReleaseBuffer(); - } continue; } @@ -110,7 +97,7 @@ RpcLogDrain::LogDrainState RpcLogDrain::EncodeOutgoingPacket( log_entry_buffer_); // Add encoded drop messsage if fits in buffer. if (drop_message_result.ok() && - drop_message_result.value().size() + kLogEntryEncodeFrameSize < + drop_message_result.value().size() + kLogEntriesEncodeFrameSize < encoder.ConservativeWriteLimit()) { PW_CHECK_OK(encoder.WriteBytes( static_cast<uint32_t>(log::LogEntries::Fields::ENTRIES), @@ -138,8 +125,8 @@ RpcLogDrain::LogDrainState RpcLogDrain::EncodeOutgoingPacket( // Check if the entry fits in encoder buffer. const size_t encoded_entry_size = - possible_entry.value().entry().size() + kLogEntryEncodeFrameSize; - if (encoded_entry_size + kLogEntryEncodeFrameSize > total_buffer_size) { + possible_entry.value().entry().size() + kLogEntriesEncodeFrameSize; + if (encoded_entry_size + kLogEntriesEncodeFrameSize > total_buffer_size) { // Entry is larger than the entire available buffer. ++committed_entry_drop_count_; PW_CHECK_OK(PopEntry(possible_entry.value())); diff --git a/pw_log_rpc/rpc_log_drain_test.cc b/pw_log_rpc/rpc_log_drain_test.cc index 7f9ad5f82..83fd467ba 100644 --- a/pw_log_rpc/rpc_log_drain_test.cc +++ b/pw_log_rpc/rpc_log_drain_test.cc @@ -47,16 +47,18 @@ TEST(RpcLogDrain, TryFlushDrainWithClosedWriter) { nullptr); EXPECT_EQ(drain.channel_id(), drain_id); + std::byte encoding_buffer[128] = {}; + // Attach drain to a MultiSink. std::array<std::byte, kBufferSize * 2> multisink_buffer; multisink::MultiSink multisink(multisink_buffer); multisink.AttachDrain(drain); - EXPECT_EQ(drain.Flush(), Status::Unavailable()); + EXPECT_EQ(drain.Flush(encoding_buffer), Status::Unavailable()); rpc::RawServerWriter writer; ASSERT_FALSE(writer.active()); EXPECT_EQ(drain.Open(writer), Status::FailedPrecondition()); - EXPECT_EQ(drain.Flush(), Status::Unavailable()); + EXPECT_EQ(drain.Flush(encoding_buffer), Status::Unavailable()); } TEST(RpcLogDrainMap, GetDrainsByIdFromDrainMap) { @@ -108,6 +110,8 @@ TEST(RpcLogDrain, FlushingDrainWithOpenWriter) { RpcLogDrainMap drain_map(drains); LogService log_service(drain_map); + std::byte encoding_buffer[128] = {}; + rpc::RawFakeChannelOutput<3, 128> output; rpc::Channel channel(rpc::Channel::Create<drain_id>(&output)); rpc::Server server(std::span(&channel, 1)); @@ -117,20 +121,20 @@ TEST(RpcLogDrain, FlushingDrainWithOpenWriter) { std::array<std::byte, kBufferSize * 2> multisink_buffer; multisink::MultiSink multisink(multisink_buffer); multisink.AttachDrain(drain); - EXPECT_EQ(drain.Flush(), Status::Unavailable()); + EXPECT_EQ(drain.Flush(encoding_buffer), Status::Unavailable()); rpc::RawServerWriter writer = rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>( server, drain_id, log_service); ASSERT_TRUE(writer.active()); EXPECT_EQ(drain.Open(writer), OkStatus()); - EXPECT_EQ(drain.Flush(), OkStatus()); + EXPECT_EQ(drain.Flush(encoding_buffer), OkStatus()); // Can call multliple times until closed on error. - EXPECT_EQ(drain.Flush(), OkStatus()); + EXPECT_EQ(drain.Flush(encoding_buffer), OkStatus()); EXPECT_EQ(drain.Close(), OkStatus()); rpc::RawServerWriter& writer_ref = writer; ASSERT_FALSE(writer_ref.active()); - EXPECT_EQ(drain.Flush(), Status::Unavailable()); + EXPECT_EQ(drain.Flush(encoding_buffer), Status::Unavailable()); } TEST(RpcLogDrain, TryReopenOpenedDrain) { |