aboutsummaryrefslogtreecommitdiff
path: root/pw_log_rpc
diff options
context:
space:
mode:
authorWyatt Hepler <hepler@google.com>2022-01-26 22:18:44 -0800
committerWyatt Hepler <hepler@google.com>2022-02-01 16:56:19 +0000
commit3ecb339aec12dfbc28408738ec1ceff747fc5267 (patch)
tree3d009bc7f30438e4a5fbe57517342f1d8b7ff412 /pw_log_rpc
parentc228f0d45c753ba7219dab750d8b2dedaad6b8cc (diff)
downloadpigweed-3ecb339aec12dfbc28408738ec1ceff747fc5267.tar.gz
pw_log_rpc: Remove use of pw_rpc channel payload buffer
Provide a buffer for encoding rather than using the deprecated pw_rpc PayloadBuffer() API. Change-Id: Id330a4198a820c709f789a2ea93ff45c7710dd36 Bug: 605 Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/81700 Reviewed-by: Carlos Chinchilla <cachinchilla@google.com> Reviewed-by: Armando Montanez <amontanez@google.com>
Diffstat (limited to 'pw_log_rpc')
-rw-r--r--pw_log_rpc/docs.rst12
-rw-r--r--pw_log_rpc/log_service_test.cc34
-rw-r--r--pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h43
-rw-r--r--pw_log_rpc/public/pw_log_rpc/rpc_log_drain_thread.h33
-rw-r--r--pw_log_rpc/rpc_log_drain.cc23
-rw-r--r--pw_log_rpc/rpc_log_drain_test.cc16
6 files changed, 87 insertions, 74 deletions
diff --git a/pw_log_rpc/docs.rst b/pw_log_rpc/docs.rst
index 304b58661..53f3dfc1a 100644
--- a/pw_log_rpc/docs.rst
+++ b/pw_log_rpc/docs.rst
@@ -150,10 +150,14 @@ channel ID.
RpcLogDrainThread
-----------------
-The module includes a sample thread that flushes each drain sequentially. Future
-work might replace this with enqueueing the flush work on a work queue. The user
-can also choose to have different threads flushing individual ``RpcLogDrain``\s
-with different priorities.
+The module includes a sample thread that flushes each drain sequentially.
+``RpcLogDrainThread`` takes an encoding buffer span at construction.
+``RpcLogDrainThreadWithBuffer`` takes a template parameter for the buffer size,
+which must be large enough to fit at least one log entry.
+
+Future work might replace this with enqueueing the flush work on a work queue.
+The user can also choose to have different threads flushing individual
+``RpcLogDrain``\s with different priorities.
Calling ``OpenUnrequestedLogStream()`` is a convenient way to set up a log
stream that is started without the need to receive an RCP request for logs.
diff --git a/pw_log_rpc/log_service_test.cc b/pw_log_rpc/log_service_test.cc
index 5a8c6c041..a69eda4a9 100644
--- a/pw_log_rpc/log_service_test.cc
+++ b/pw_log_rpc/log_service_test.cc
@@ -144,6 +144,8 @@ class LogServiceTest : public ::testing::Test {
RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors,
&filters_[2]),
};
+
+ std::array<std::byte, 128> encoding_buffer_ = {};
};
struct TestLogEntry {
log_tokenized::Metadata metadata = kSampleMetadata;
@@ -269,7 +271,7 @@ size_t CountLogEntries(protobuf::Decoder& entries_decoder) {
TEST_F(LogServiceTest, AssignWriter) {
// Drains don't have writers.
for (auto& drain : drain_map_.drains()) {
- EXPECT_EQ(drain.Flush(), Status::Unavailable());
+ EXPECT_EQ(drain.Flush(encoding_buffer_), Status::Unavailable());
}
// Create context directed to drain with ID 1.
@@ -280,12 +282,12 @@ TEST_F(LogServiceTest, AssignWriter) {
// Call RPC, which sets the drain's writer.
context.call(rpc_request_buffer);
- EXPECT_EQ(active_drain.Flush(), OkStatus());
+ EXPECT_EQ(active_drain.Flush(encoding_buffer_), OkStatus());
// Other drains are still missing writers.
for (auto& drain : drain_map_.drains()) {
if (drain.channel_id() != drain_channel_id) {
- EXPECT_EQ(drain.Flush(), Status::Unavailable());
+ EXPECT_EQ(drain.Flush(encoding_buffer_), Status::Unavailable());
}
}
@@ -294,7 +296,7 @@ TEST_F(LogServiceTest, AssignWriter) {
LOG_SERVICE_METHOD_CONTEXT second_call_context(drain_map_);
second_call_context.set_channel_id(drain_channel_id);
second_call_context.call(rpc_request_buffer);
- EXPECT_EQ(active_drain.Flush(), OkStatus());
+ EXPECT_EQ(active_drain.Flush(encoding_buffer_), OkStatus());
ASSERT_TRUE(second_call_context.done());
EXPECT_EQ(second_call_context.responses().size(), 0u);
@@ -303,7 +305,7 @@ TEST_F(LogServiceTest, AssignWriter) {
LOG_SERVICE_METHOD_CONTEXT third_call_context(drain_map_);
third_call_context.set_channel_id(drain_channel_id);
third_call_context.call(rpc_request_buffer);
- EXPECT_EQ(active_drain.Flush(), OkStatus());
+ EXPECT_EQ(active_drain.Flush(encoding_buffer_), OkStatus());
ASSERT_FALSE(third_call_context.done());
EXPECT_EQ(third_call_context.responses().size(), 0u);
EXPECT_EQ(active_drain.Close(), OkStatus());
@@ -321,7 +323,7 @@ TEST_F(LogServiceTest, StartAndEndStream) {
// Request logs.
context.call(rpc_request_buffer);
- EXPECT_EQ(active_drain.Flush(), OkStatus());
+ EXPECT_EQ(active_drain.Flush(encoding_buffer_), OkStatus());
// Not done until the stream is finished.
ASSERT_FALSE(context.done());
@@ -364,7 +366,7 @@ TEST_F(LogServiceTest, HandleDropped) {
// Request logs.
context.call(rpc_request_buffer);
- EXPECT_EQ(active_drain.Flush(), OkStatus());
+ EXPECT_EQ(active_drain.Flush(encoding_buffer_), OkStatus());
active_drain.Close();
ASSERT_EQ(context.status(), OkStatus());
// There is at least 1 response with multiple log entries packed.
@@ -405,7 +407,7 @@ TEST_F(LogServiceTest, HandleSmallBuffer) {
AddLogEntries(total_entries, kLongMessage, kSampleMetadata, kSampleTimestamp);
// Request logs.
context.call(rpc_request_buffer);
- EXPECT_EQ(small_buffer_drain.value()->Flush(), OkStatus());
+ EXPECT_EQ(small_buffer_drain.value()->Flush(encoding_buffer_), OkStatus());
EXPECT_EQ(small_buffer_drain.value()->Close(), OkStatus());
ASSERT_EQ(context.status(), OkStatus());
ASSERT_GE(context.responses().size(), 1u);
@@ -478,7 +480,7 @@ TEST_F(LogServiceTest, LargeLogEntry) {
LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
context.set_channel_id(drain_channel_id);
context.call(rpc_request_buffer);
- ASSERT_EQ(active_drain.Flush(), OkStatus());
+ ASSERT_EQ(active_drain.Flush(encoding_buffer_), OkStatus());
active_drain.Close();
ASSERT_EQ(context.status(), OkStatus());
ASSERT_EQ(context.responses().size(), 1u);
@@ -531,7 +533,7 @@ TEST_F(LogServiceTest, InterruptedLogStreamSendsDropCount) {
server, drain_channel_id, log_service);
EXPECT_EQ(drain.value()->Open(writer), OkStatus());
// This drain closes on errors.
- EXPECT_EQ(drain.value()->Flush(), Status::Aborted());
+ EXPECT_EQ(drain.value()->Flush(encoding_buffer_), Status::Aborted());
EXPECT_TRUE(output.done());
// Make sure not all packets were sent.
@@ -563,7 +565,7 @@ TEST_F(LogServiceTest, InterruptedLogStreamSendsDropCount) {
writer = rpc::RawServerWriter::Open<Logs::Listen>(
server, drain_channel_id, log_service);
EXPECT_EQ(drain.value()->Open(writer), OkStatus());
- EXPECT_EQ(drain.value()->Flush(), OkStatus());
+ EXPECT_EQ(drain.value()->Flush(encoding_buffer_), OkStatus());
// Add expected messages to the stack in the reverse order they are received.
message_stack.clear();
@@ -627,7 +629,7 @@ TEST_F(LogServiceTest, InterruptedLogStreamIgnoresErrors) {
server, drain_channel_id, log_service);
EXPECT_EQ(drain.value()->Open(writer), OkStatus());
// This drain ignores errors.
- EXPECT_EQ(drain.value()->Flush(), OkStatus());
+ EXPECT_EQ(drain.value()->Flush(encoding_buffer_), OkStatus());
EXPECT_FALSE(output.done());
// Make sure some packets were sent.
@@ -672,7 +674,7 @@ TEST_F(LogServiceTest, InterruptedLogStreamIgnoresErrors) {
const size_t previous_stream_packet_count =
output.payloads<Logs::Listen>().size();
output.set_send_status(Status::Unavailable());
- EXPECT_EQ(drain.value()->Flush(), OkStatus());
+ EXPECT_EQ(drain.value()->Flush(encoding_buffer_), OkStatus());
EXPECT_FALSE(output.done());
ASSERT_EQ(output.payloads<Logs::Listen>().size(),
previous_stream_packet_count);
@@ -746,7 +748,7 @@ TEST_F(LogServiceTest, FilterLogs) {
LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
context.set_channel_id(drain.channel_id());
context.call({});
- ASSERT_EQ(drain.Flush(), OkStatus());
+ ASSERT_EQ(drain.Flush(encoding_buffer_), OkStatus());
size_t entries_found = 0;
uint32_t drop_count_found = 0;
@@ -774,13 +776,13 @@ TEST_F(LogServiceTest, ReopenClosedLogStreamWithAcquiredBuffer) {
server, drain_channel_id, log_service);
EXPECT_EQ(drain.value()->Open(writer), OkStatus());
// This drain closes on errors.
- EXPECT_EQ(drain.value()->Flush(), OkStatus());
+ EXPECT_EQ(drain.value()->Flush(encoding_buffer_), OkStatus());
// Request log stream with a new writer.
writer = rpc::RawServerWriter::Open<Logs::Listen>(
server, drain_channel_id, log_service);
EXPECT_EQ(drain.value()->Open(writer), OkStatus());
- EXPECT_EQ(drain.value()->Flush(), OkStatus());
+ EXPECT_EQ(drain.value()->Flush(encoding_buffer_), OkStatus());
}
} // namespace
diff --git a/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h
index 87aa61d3c..a4e404d8f 100644
--- a/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h
+++ b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h
@@ -57,41 +57,34 @@ class RpcLogDrain : public multisink::MultiSink::Drain {
// must account for the max message size to avoid log entry drops. The dropped
// field is not accounted since a dropped message has all other fields unset.
static constexpr size_t kMinEntrySizeWithoutPayload =
- // message
- protobuf::FieldNumberSizeBytes(1) +
- 1 // Assume minimum varint length, skip the payload bytes.
- // line_level
- + protobuf::FieldNumberSizeBytes(2) +
- protobuf::kMaxSizeBytesUint32
- // flags
- + protobuf::FieldNumberSizeBytes(3) +
- protobuf::kMaxSizeBytesUint32
- // timestamp or time_since_last_entry
- + protobuf::FieldNumberSizeBytes(4) +
- protobuf::kMaxSizeBytesInt64
- // Module
- + protobuf::FieldNumberSizeBytes(7) +
- 1; // Assume minimum varint length, skip the module bytes.
+ protobuf::SizeOfFieldBytes(log::LogEntry::Fields::MESSAGE, 0) +
+ protobuf::SizeOfFieldUint32(log::LogEntry::Fields::LINE_LEVEL) +
+ protobuf::SizeOfFieldUint32(log::LogEntry::Fields::FLAGS) +
+ protobuf::SizeOfFieldInt64(log::LogEntry::Fields::TIMESTAMP) +
+ protobuf::SizeOfFieldBytes(log::LogEntry::Fields::MODULE, 0);
+
// The smallest buffer size must be able to fit a typical token size: 4 bytes.
static constexpr size_t kMinEntryBufferSize = kMinEntrySizeWithoutPayload + 4;
- // When encoding LogEntry in LogEntries, there are kLogEntryEncodeFrameSize
+ // When encoding LogEntry in LogEntries, there are kLogEntriesEncodeFrameSize
// bytes added to the encoded LogEntry. This constant and kMinEntryBufferSize
// can be used to calculate the minimum RPC ChannelOutput buffer size.
- static constexpr size_t kLogEntryEncodeFrameSize =
- protobuf::FieldNumberSizeBytes(1) // LogEntry
- + protobuf::kMaxSizeOfLength;
+ static constexpr size_t kLogEntriesEncodeFrameSize =
+ protobuf::FieldNumberSizeBytes(log::LogEntries::Fields::ENTRIES) +
+ protobuf::kMaxSizeOfLength +
+ protobuf::SizeOfFieldUint32(
+ log::LogEntries::Fields::FIRST_ENTRY_SEQUENCE_ID);
// Creates a closed log stream with a writer that can be set at a later time.
// The provided buffer must be large enough to hold the largest transmittable
// log::LogEntry or a drop count message at the very least. The user can
// choose to provide a unique mutex for the drain, or share it to save RAM as
// long as they are aware of contengency issues.
- RpcLogDrain(const uint32_t channel_id,
- ByteSpan log_entry_buffer,
- sync::Mutex& mutex,
- LogDrainErrorHandling error_handling,
- Filter* filter = nullptr)
+ constexpr RpcLogDrain(const uint32_t channel_id,
+ ByteSpan log_entry_buffer,
+ sync::Mutex& mutex,
+ LogDrainErrorHandling error_handling,
+ Filter* filter = nullptr)
: channel_id_(channel_id),
error_handling_(error_handling),
server_writer_(),
@@ -128,7 +121,7 @@ class RpcLogDrain : public multisink::MultiSink::Drain {
// OK - all entries were consumed.
// ABORTED - there was an error writing the packet, and error_handling equals
// `kCloseStreamOnWriterError`.
- Status Flush() PW_LOCKS_EXCLUDED(mutex_);
+ Status Flush(ByteSpan encoding_buffer) PW_LOCKS_EXCLUDED(mutex_);
// Ends RPC log stream without flushing.
//
diff --git a/pw_log_rpc/public/pw_log_rpc/rpc_log_drain_thread.h b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain_thread.h
index 5358b35ad..c0506643e 100644
--- a/pw_log_rpc/public/pw_log_rpc/rpc_log_drain_thread.h
+++ b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain_thread.h
@@ -14,6 +14,7 @@
#pragma once
+#include <cstddef>
#include <span>
#include "pw_log_rpc/log_service.h"
@@ -32,11 +33,15 @@ namespace pw::log_rpc {
// manages multiple log streams. It is a suitable option when a minimal
// thread count is desired but comes with the cost of individual log streams
// blocking each other's flushing.
-class RpcLogDrainThread final : public thread::ThreadCore,
- public multisink::MultiSink::Listener {
+class RpcLogDrainThread : public thread::ThreadCore,
+ public multisink::MultiSink::Listener {
public:
- RpcLogDrainThread(multisink::MultiSink& multisink, RpcLogDrainMap& drain_map)
- : drain_map_(drain_map), multisink_(multisink) {}
+ RpcLogDrainThread(multisink::MultiSink& multisink,
+ RpcLogDrainMap& drain_map,
+ std::span<std::byte> encoding_buffer)
+ : drain_map_(drain_map),
+ multisink_(multisink),
+ encoding_buffer_(encoding_buffer) {}
void OnNewEntryAvailable() override {
new_log_available_notification_.release();
@@ -51,7 +56,7 @@ class RpcLogDrainThread final : public thread::ThreadCore,
while (true) {
new_log_available_notification_.acquire();
for (auto& drain : drain_map_.drains()) {
- drain.Flush().IgnoreError();
+ drain.Flush(encoding_buffer_).IgnoreError();
}
}
}
@@ -73,6 +78,24 @@ class RpcLogDrainThread final : public thread::ThreadCore,
sync::ThreadNotification new_log_available_notification_;
RpcLogDrainMap& drain_map_;
multisink::MultiSink& multisink_;
+ std::span<std::byte> encoding_buffer_;
+};
+
+template <size_t kEncodingBufferSizeBytes>
+class RpcLogDrainThreadWithBuffer final : public RpcLogDrainThread {
+ public:
+ RpcLogDrainThreadWithBuffer(multisink::MultiSink& multisink,
+ RpcLogDrainMap& drain_map)
+ : RpcLogDrainThread(multisink, drain_map, encoding_buffer_array_) {}
+
+ private:
+ static_assert(kEncodingBufferSizeBytes >=
+ RpcLogDrain::kLogEntriesEncodeFrameSize +
+ RpcLogDrain::kMinEntryBufferSize,
+ "RpcLogDrainThread's encoding buffer must be large enough for "
+ "at least one entry");
+
+ std::byte encoding_buffer_array_[kEncodingBufferSizeBytes];
};
} // namespace pw::log_rpc
diff --git a/pw_log_rpc/rpc_log_drain.cc b/pw_log_rpc/rpc_log_drain.cc
index a4f3e4c3a..6abaeda5e 100644
--- a/pw_log_rpc/rpc_log_drain.cc
+++ b/pw_log_rpc/rpc_log_drain.cc
@@ -31,13 +31,6 @@ Result<ConstByteSpan> CreateEncodedDropMessage(
return ConstByteSpan(encoder);
}
-// TODO(pwbug/605): Remove this hack for accessing the PayloadBuffer() API.
-class AccessHiddenFunctions : public rpc::RawServerWriter {
- public:
- using RawServerWriter::PayloadBuffer;
- using RawServerWriter::ReleaseBuffer;
-};
-
} // namespace
Status RpcLogDrain::Open(rpc::RawServerWriter& writer) {
@@ -52,7 +45,7 @@ Status RpcLogDrain::Open(rpc::RawServerWriter& writer) {
return OkStatus();
}
-Status RpcLogDrain::Flush() {
+Status RpcLogDrain::Flush(ByteSpan encoding_buffer) {
PW_CHECK_NOTNULL(multisink_);
LogDrainState log_sink_state = LogDrainState::kMoreEntriesRemaining;
@@ -61,17 +54,11 @@ Status RpcLogDrain::Flush() {
if (!server_writer_.active()) {
return Status::Unavailable();
}
- log::LogEntries::MemoryEncoder encoder(
- static_cast<AccessHiddenFunctions&>(server_writer_).PayloadBuffer());
+ log::LogEntries::MemoryEncoder encoder(encoding_buffer);
uint32_t packed_entry_count = 0;
log_sink_state = EncodeOutgoingPacket(encoder, packed_entry_count);
// Avoid sending empty packets.
if (encoder.size() == 0) {
- // Release buffer when still active to keep the writer in a replaceable
- // state.
- if (server_writer_.active()) {
- static_cast<AccessHiddenFunctions&>(server_writer_).ReleaseBuffer();
- }
continue;
}
@@ -110,7 +97,7 @@ RpcLogDrain::LogDrainState RpcLogDrain::EncodeOutgoingPacket(
log_entry_buffer_);
// Add encoded drop messsage if fits in buffer.
if (drop_message_result.ok() &&
- drop_message_result.value().size() + kLogEntryEncodeFrameSize <
+ drop_message_result.value().size() + kLogEntriesEncodeFrameSize <
encoder.ConservativeWriteLimit()) {
PW_CHECK_OK(encoder.WriteBytes(
static_cast<uint32_t>(log::LogEntries::Fields::ENTRIES),
@@ -138,8 +125,8 @@ RpcLogDrain::LogDrainState RpcLogDrain::EncodeOutgoingPacket(
// Check if the entry fits in encoder buffer.
const size_t encoded_entry_size =
- possible_entry.value().entry().size() + kLogEntryEncodeFrameSize;
- if (encoded_entry_size + kLogEntryEncodeFrameSize > total_buffer_size) {
+ possible_entry.value().entry().size() + kLogEntriesEncodeFrameSize;
+ if (encoded_entry_size + kLogEntriesEncodeFrameSize > total_buffer_size) {
// Entry is larger than the entire available buffer.
++committed_entry_drop_count_;
PW_CHECK_OK(PopEntry(possible_entry.value()));
diff --git a/pw_log_rpc/rpc_log_drain_test.cc b/pw_log_rpc/rpc_log_drain_test.cc
index 7f9ad5f82..83fd467ba 100644
--- a/pw_log_rpc/rpc_log_drain_test.cc
+++ b/pw_log_rpc/rpc_log_drain_test.cc
@@ -47,16 +47,18 @@ TEST(RpcLogDrain, TryFlushDrainWithClosedWriter) {
nullptr);
EXPECT_EQ(drain.channel_id(), drain_id);
+ std::byte encoding_buffer[128] = {};
+
// Attach drain to a MultiSink.
std::array<std::byte, kBufferSize * 2> multisink_buffer;
multisink::MultiSink multisink(multisink_buffer);
multisink.AttachDrain(drain);
- EXPECT_EQ(drain.Flush(), Status::Unavailable());
+ EXPECT_EQ(drain.Flush(encoding_buffer), Status::Unavailable());
rpc::RawServerWriter writer;
ASSERT_FALSE(writer.active());
EXPECT_EQ(drain.Open(writer), Status::FailedPrecondition());
- EXPECT_EQ(drain.Flush(), Status::Unavailable());
+ EXPECT_EQ(drain.Flush(encoding_buffer), Status::Unavailable());
}
TEST(RpcLogDrainMap, GetDrainsByIdFromDrainMap) {
@@ -108,6 +110,8 @@ TEST(RpcLogDrain, FlushingDrainWithOpenWriter) {
RpcLogDrainMap drain_map(drains);
LogService log_service(drain_map);
+ std::byte encoding_buffer[128] = {};
+
rpc::RawFakeChannelOutput<3, 128> output;
rpc::Channel channel(rpc::Channel::Create<drain_id>(&output));
rpc::Server server(std::span(&channel, 1));
@@ -117,20 +121,20 @@ TEST(RpcLogDrain, FlushingDrainWithOpenWriter) {
std::array<std::byte, kBufferSize * 2> multisink_buffer;
multisink::MultiSink multisink(multisink_buffer);
multisink.AttachDrain(drain);
- EXPECT_EQ(drain.Flush(), Status::Unavailable());
+ EXPECT_EQ(drain.Flush(encoding_buffer), Status::Unavailable());
rpc::RawServerWriter writer =
rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
server, drain_id, log_service);
ASSERT_TRUE(writer.active());
EXPECT_EQ(drain.Open(writer), OkStatus());
- EXPECT_EQ(drain.Flush(), OkStatus());
+ EXPECT_EQ(drain.Flush(encoding_buffer), OkStatus());
// Can call multliple times until closed on error.
- EXPECT_EQ(drain.Flush(), OkStatus());
+ EXPECT_EQ(drain.Flush(encoding_buffer), OkStatus());
EXPECT_EQ(drain.Close(), OkStatus());
rpc::RawServerWriter& writer_ref = writer;
ASSERT_FALSE(writer_ref.active());
- EXPECT_EQ(drain.Flush(), Status::Unavailable());
+ EXPECT_EQ(drain.Flush(encoding_buffer), Status::Unavailable());
}
TEST(RpcLogDrain, TryReopenOpenedDrain) {