diff options
author | Carlos Chinchilla <cachinchilla@google.com> | 2021-08-10 16:30:40 -0700 |
---|---|---|
committer | CQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com> | 2021-09-09 00:22:20 +0000 |
commit | c0cbc16ec823caa773c4d1a463dffea16240612f (patch) | |
tree | b7c500b75902cb20688a3714fa077b7f9fe16a7c /pw_log_rpc | |
parent | ff2ca8380ef3cf20ec78c1b217d88f77ed236f89 (diff) | |
download | pigweed-c0cbc16ec823caa773c4d1a463dffea16240612f.tar.gz |
pw_log_rpc: Add unit tests, docs, & error handling
Add pw_log_rpc unit tests and documentation.
Add an enum to dictacte how to handle server writer errors in
RpcLogDrain.
Test: unit tests pass
Change-Id: Ib22dd3414cf344fddd6f179d43fd4d533ba12fd8
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/56931
Pigweed-Auto-Submit: Carlos Chinchilla <cachinchilla@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
Reviewed-by: Keir Mierle <keir@google.com>
Diffstat (limited to 'pw_log_rpc')
-rw-r--r-- | pw_log_rpc/BUILD.bazel | 30 | ||||
-rw-r--r-- | pw_log_rpc/BUILD.gn | 26 | ||||
-rw-r--r-- | pw_log_rpc/docs.rst | 252 | ||||
-rw-r--r-- | pw_log_rpc/log_service_test.cc | 394 | ||||
-rw-r--r-- | pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h | 26 | ||||
-rw-r--r-- | pw_log_rpc/rpc_log_drain.cc | 2 | ||||
-rw-r--r-- | pw_log_rpc/rpc_log_drain_test.cc | 94 |
7 files changed, 806 insertions, 18 deletions
diff --git a/pw_log_rpc/BUILD.bazel b/pw_log_rpc/BUILD.bazel index e570466b3..27c722117 100644 --- a/pw_log_rpc/BUILD.bazel +++ b/pw_log_rpc/BUILD.bazel @@ -68,7 +68,33 @@ pw_cc_library( ], ) -# TODO(cachinchilla): implement tests. pw_cc_test( - name = "logs_rpc_test", + name = "log_service_test", + srcs = [ + "log_service_test.cc", + ], + deps = [ + ":log_service", + "//pw_containers:vector", + "//pw_log", + "//pw_log:proto_utils", + "//pw_log:protos.pwpb", + "//pw_protobuf", + "//pw_result", + "//pw_rpc/raw:test_method_context", + "//pw_status", + "//pw_string", + "//pw_unit_test", + ], +) + +pw_cc_test( + name = "rpc_log_drain_test", + srcs = [ + "rpc_log_drain_test.cc", + ], + deps = [ + ":rpc_log_drain", + "//pw_unit_test", + ], ) diff --git a/pw_log_rpc/BUILD.gn b/pw_log_rpc/BUILD.gn index 10f313959..122fccb9d 100644 --- a/pw_log_rpc/BUILD.gn +++ b/pw_log_rpc/BUILD.gn @@ -73,8 +73,25 @@ pw_source_set("rpc_log_drain_thread") { ] } -# TODO(cachinchilla): implement tests. -pw_test("logs_rpc_test") { +pw_test("log_service_test") { + sources = [ "log_service_test.cc" ] + deps = [ + ":log_service", + "$dir_pw_containers:vector", + "$dir_pw_log", + "$dir_pw_log:proto_utils", + "$dir_pw_log:protos.pwpb", + "$dir_pw_protobuf", + "$dir_pw_result", + "$dir_pw_rpc/raw:test_method_context", + "$dir_pw_status", + "$dir_pw_string", + ] +} + +pw_test("rpc_log_drain_test") { + sources = [ "rpc_log_drain_test.cc" ] + deps = [ ":rpc_log_drain" ] } # TODO(cachinchilla): update docs. @@ -83,5 +100,8 @@ pw_doc_group("docs") { } pw_test_group("tests") { - tests = [ ":logs_rpc_test" ] + tests = [ + ":log_service_test", + ":rpc_log_drain_test", + ] } diff --git a/pw_log_rpc/docs.rst b/pw_log_rpc/docs.rst index ff8b46696..54916a045 100644 --- a/pw_log_rpc/docs.rst +++ b/pw_log_rpc/docs.rst @@ -3,5 +3,253 @@ ---------- pw_log_rpc ---------- -This is a RPC-based logging backend for Pigweed. It is not ready for use, and -is under construction. +An RPC-based logging backend for Pigweed. + +.. warning:: + This module is under construction and might change in the future. + +How to use +========== +1. Set up RPC +------------- +Set up RPC for your target device. Basic deployments run RPC over a UART, with +HDLC on top for framing. See :ref:`module-pw_rpc` for details on how to enable +``pw_rpc``. + +2. Set up tokenized logging (optional) +-------------------------------------- +Set up the :ref:`module-pw_log_tokenized` log backend. + +3. Connect the tokenized logging handler to the MultiSink +--------------------------------------------------------- +Create a :ref:`MultiSink <module-pw_multisink>` instance to buffer log entries. +Then, make the log backend handler, +``pw_tokenizer_HandleEncodedMessageWithPayload``, encode log entries in the +``log::LogEntry`` format, and add them to the ``MultiSink``. + +4. Create log drains +-------------------- +Create an ``RpcLogDrainMap`` with one ``RpcLogDrain`` for each RPC channel used +to stream logs. Provide this map to the ``LogService`` and register the latter +with the application's RPC service. The ``RpcLogDrainMap`` provides a convenient +way to access and maintain each ``RpcLogDrain``. Attach each ``RpcLogDrain`` to +the ``MultiSink``. + +5. Flush the log drains in the background +----------------------------------------- +Depending on the product's requirements, create a thread to flush all +``RpcLogDrain``\s or one thread per drain. The thread(s) must continuously call +``RpcLogDrain::Flush()`` to pull entries from the ``MultiSink`` and send them to +the log listeners. + +Logging over RPC diagrams +========================= + +Sample RPC logs request +----------------------- +The log listener, e.g. a computer, requests logs via RPC. The log service +receives the request and sets up the corresponding ``RpcLogDrain`` to start the +log stream. + +.. mermaid:: + + graph TD + computer[Computer]-->pw_rpc; + pw_rpc-->log_service[LogService]; + log_service-->rpc_log_drain_pc[RpcLogDrain<br>streams to<br>computer];; + +Sample logging over RPC +------------------------ +Logs are streamed via RPC to a computer, and to another log listener. There can +also be internal log readers, i.e. ``MultiSink::Drain``\s, attached to the +``MultiSink``, such as a writer to persistent memory, for example. + +.. mermaid:: + + graph TD + source1[Source 1]-->log_api[pw_log API]; + source2[Source 2]-->log_api; + log_api-->log_backend[Log backend]; + log_backend-->multisink[MultiSink]; + multisink-->drain[MultiSink::Drain]; + multisink-->rpc_log_drain_pc[RpcLogDrain<br>streams to<br>computer]; + multisink-->rpc_log_drain_other[RpcLogDrain<br>streams to<br>other log listener]; + drain-->other_consumer[Other log consumer<br>e.g. persistent memory]; + rpc_log_drain_pc-->pw_rpc; + rpc_log_drain_other-->pw_rpc; + pw_rpc-->computer[Computer]; + pw_rpc-->other_listener[Other log<br>listener]; + +RPC log service +=============== +The ``LogService`` class is an RPC service that provides a way to request a log +stream sent via RPC. Thus, it helps avoid using a different protocol for logs +and RPCs over the same interface(s). It requires a map of ``RpcLogDrains`` to +assign stream writers and delegate the log stream flushing to the user's +preferred method. + +RpcLogDrain +=========== +An ``RpcLogDrain`` reads from the ``MultiSink`` instance that buffers logs, then +packs, and sends the retrieved log entries to the log listener. One +``RpcLogDrain`` is needed for each log listener. An ``RpcLogDrain`` needs a +thread to continuously call ``Flush()`` to maintain the log stream. A thread can +maintain multiple log streams, but it must not be the same thread used by the +RPC server, to avoid blocking it. + +Each ``RpcLogDrain`` is identified by a known RPC channel ID and requires a +``rpc::RawServerWriter`` to write the packed multiple log entries. This writer +is assigned by the ``LogService::Listen`` RPC. Future work will allow +``RpcLogDrain``\s to have an open RPC writer, to constantly stream logs without +the need to request them. This is useful in cases where the connection to the +client is dropped silently because the log stream can continue when reconnected +without the client requesting it. + +An ``RpcLogDrain`` must be attached to a ``MultiSink`` containing multiple +``log::LogEntry``\s. When ``Flush`` is called, the drain acquires the +``rpc::RawServerWriter`` 's write buffer, grabs one ``log::LogEntry`` from the +multisink, encodes it into a ``log::LogEntries`` stream, and repeats the process +until the write buffer is full. Then the drain calls +``rpc::RawServerWriter::Write`` to flush the write buffer and repeats the +process until all the entries in the ``MultiSink`` are read or an error is +found. + +The user must provide a buffer large enough for the largest entry in the +``MultiSink`` while also accounting for the interface's Maximum Transmission +Unit (MTU). If the ``RpcLogDrain`` finds a drop message count as it reads the +``MultiSink`` it will insert a message in the stream with the drop message +count. + +RpcLogDrainMap +============== +Provides a convenient way to access all or a single ``RpcLogDrain`` by its RPC +channel ID. + +RpcLogDrainThread +================= +The module includes a sample thread that flushes each drain sequentially. Future +work might replace this with enqueueing the flush work on a work queue. The user +can also choose to have different threads flushing individual ``RpcLogDrain``\s +with different priorities. + +Logging example +=============== +The following code shows a sample setup to defer the log handling to the +``RpcLogDrainThread`` to avoid having the log streaming block at the log +callsite. + +main.cc +------- +.. code-block:: cpp + + #include "foo/foo_log.h" + #include "pw_log/log.h" + #include "pw_thread/detached_thread.h" + #include "pw_thread_stl/options.h" + + namespace { + + void RegisterServices() { + pw::rpc::system_server::Server().RegisterService(foo_log::log_service); + } + } // namespace + + int main() { + PW_LOG_INFO("Deferred logging over RPC example"); + pw::rpc::system_server::Init(); + RegisterServices(); + pw::thread::DetachedThread(pw::thread::stl::Options(), foo_log::log_thread); + pw::rpc::system_server::Start(); + return 0; + } + +foo_log.cc +---------- +Example of a log backend implementation, where logs enter the ``MultiSink`` and +log drains are set up. + +.. code-block:: cpp + + #include "foo/foo_log.h" + + #include <array> + #include <cstdint> + + #include "pw_chrono/system_clock.h" + #include "pw_log/proto_utils.h" + #include "pw_log_rpc/log_service.h" + #include "pw_log_rpc/rpc_log_drain.h" + #include "pw_log_rpc/rpc_log_drain_map.h" + #include "pw_log_rpc/rpc_log_drain_thread.h" + #include "pw_rpc_system_server/rpc_server.h" + #include "pw_sync/interrupt_spin_lock.h" + #include "pw_sync/lock_annotations.h" + #include "pw_sync/mutex.h" + #include "pw_tokenizer/tokenize_to_global_handler_with_payload.h" + + namespace foo_log { + namespace { + constexpr size_t kLogBufferSize = 5000; + // Tokenized logs are typically 12-24 bytes. + constexpr size_t kMaxMessageSize = 32; + // kMaxLogEntrySize should be less than the MTU of the RPC channel output used + // by the provided server writer. + constexpr size_t kMaxLogEntrySize = + pw::log_rpc::RpcLogDrain::kMinEntrySizeWithoutPayload + kMaxMessageSize; + std::array<std::byte, kLogBufferSize> multisink_buffer; + + // To save RAM, share the mutex, since drains will be managed sequentially. + pw::sync::Mutex shared_mutex; + std::array<std::byte, kMaxEntrySize> client1_buffer + PW_GUARDED_BY(shared_mutex); + std::array<std::byte, kMaxEntrySize> client2_buffer + PW_GUARDED_BY(shared_mutex); + std::array<pw::log_rpc::RpcLogDrain, 2> drains = { + pw::log_rpc::RpcLogDrain( + 1, + client1_buffer, + shared_mutex, + RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors), + pw::log_rpc::RpcLogDrain( + 2, + client2_buffer, + shared_mutex, + RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors), + }; + + pw::sync::InterruptSpinLock log_encode_lock; + std::array<std::byte, kMaxLogEntrySize> log_encode_buffer + PW_GUARDED_BY(log_encode_lock); + + extern "C" void pw_tokenizer_HandleEncodedMessageWithPayload( + pw_tokenizer_Payload metadata, const uint8_t message[], size_t size_bytes) { + int64_t timestamp = + pw::chrono::SystemClock::now().time_since_epoch().count(); + std::lock_guard lock(log_encode_lock); + pw::Result<pw::ConstByteSpan> encoded_log_result = + pw::log::EncodeTokenizedLog( + metadata, message, size_bytes, timestamp, log_encode_buffer); + + if (!encoded_log_result.ok()) { + GetMultiSink().HandleDropped(); + return; + } + GetMultiSink().HandleEntry(encoded_log_result.value()); + } + } // namespace + + pw::log_rpc::RpcLogDrainMap drain_map(drains); + pw::log_rpc::RpcLogDrainThread log_thread(GetMultiSink(), drain_map); + pw::log_rpc::LogService log_service(drain_map); + + pw::multisink::MultiSink& GetMultiSink() { + static pw::multisink::MultiSink multisink(multisink_buffer); + return multisink; + } + } // namespace foo_log + +Logging in other source files +----------------------------- +To defer logging, other source files must simply include ``pw_log/log.h`` and +use the :ref:`module-pw_log` APIs, as long as the source set that includes +``foo_log.cc`` is setup as the log backend. diff --git a/pw_log_rpc/log_service_test.cc b/pw_log_rpc/log_service_test.cc new file mode 100644 index 000000000..620ea46e6 --- /dev/null +++ b/pw_log_rpc/log_service_test.cc @@ -0,0 +1,394 @@ +// Copyright 2021 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +#include "pw_log_rpc/log_service.h" + +#include <array> +#include <cstdint> +#include <limits> + +#include "gtest/gtest.h" +#include "pw_containers/vector.h" +#include "pw_log/log.h" +#include "pw_log/proto/log.pwpb.h" +#include "pw_log/proto_utils.h" +#include "pw_protobuf/decoder.h" +#include "pw_result/result.h" +#include "pw_rpc/raw/test_method_context.h" +#include "pw_string/string_builder.h" +#include "pw_sync/mutex.h" + +namespace pw::log_rpc { +namespace { + +#define LOG_SERVICE_METHOD_CONTEXT \ + PW_RAW_TEST_METHOD_CONTEXT(LogService, Listen) + +constexpr size_t kMaxMessageSize = 50; +static_assert(RpcLogDrain::kMaxDropMessageSize < kMaxMessageSize); +constexpr size_t kMaxLogEntrySize = + RpcLogDrain::kMinEntrySizeWithoutPayload + kMaxMessageSize; +constexpr size_t kMultiSinkBufferSize = kMaxLogEntrySize * 10; +constexpr size_t kMaxDrains = 3; +constexpr char kMessage[] = "message"; +// A message small enough to fit encoded in LogServiceTest::entry_encode_buffer_ +// but large enough to not fit in LogServiceTest::small_buffer_. +constexpr char kLongMessage[] = + "This is a long log message that will be dropped."; +static_assert(sizeof(kLongMessage) < kMaxMessageSize); +static_assert(sizeof(kLongMessage) > RpcLogDrain::kMaxDropMessageSize); +std::array<std::byte, 1> rpc_request_buffer; + +// `LogServiceTest` sets up a logging environment for testing with a `MultiSink` +// for log entries, and multiple `RpcLogDrain`s for consuming such log entries. +// It includes methods to add log entries to the `MultiSink`, and buffers for +// encoding and retrieving log entries. Tests can choose how many entries to +// add to the multisink, and which drain to use. +class LogServiceTest : public ::testing::Test { + public: + LogServiceTest() : multisink_(multisink_buffer_), drain_map_(drains_) { + for (auto& drain : drain_map_.drains()) { + multisink_.AttachDrain(drain); + } + } + + void AddLogEntries(size_t log_count, std::string_view message) { + for (size_t i = 0; i < log_count; ++i) { + AddLogEntry(message); + } + } + + void AddLogEntry(std::string_view message) { + auto metadata = + log_tokenized::Metadata::Set<PW_LOG_LEVEL_WARN, __LINE__, 0, 0>(); + Result<ConstByteSpan> encoded_log_result = + log::EncodeTokenizedLog(metadata, + std::as_bytes(std::span(message)), + /*ticks_since_epoch=*/0, + entry_encode_buffer_); + EXPECT_EQ(encoded_log_result.status(), OkStatus()); + multisink_.HandleEntry(encoded_log_result.value()); + } + + protected: + std::array<std::byte, kMultiSinkBufferSize> multisink_buffer_; + multisink::MultiSink multisink_; + RpcLogDrainMap drain_map_; + std::array<std::byte, kMaxLogEntrySize> entry_encode_buffer_; + + // Drain Buffers + std::array<std::byte, kMaxLogEntrySize> drain_buffer1_; + std::array<std::byte, kMaxLogEntrySize> drain_buffer2_; + std::array<std::byte, RpcLogDrain::kMinEntryBufferSize> small_buffer_; + static constexpr uint32_t kSmallBufferDrainId = 3; + sync::Mutex shared_mutex_; + std::array<RpcLogDrain, kMaxDrains> drains_{ + RpcLogDrain(1, + drain_buffer1_, + shared_mutex_, + RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors), + RpcLogDrain( + 2, + drain_buffer2_, + shared_mutex_, + RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError), + RpcLogDrain(kSmallBufferDrainId, + small_buffer_, + shared_mutex_, + RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors), + }; +}; + +// Unpacks a `LogEntry` proto buffer and compares it with the expected data. +void VerifyLogEntry(protobuf::Decoder& entry_decoder, + log_tokenized::Metadata expected_metadata, + ConstByteSpan expected_tokenized_data, + const int64_t expected_timestamp) { + ConstByteSpan tokenized_data; + EXPECT_TRUE(entry_decoder.Next().ok()); // message [tokenized] + EXPECT_EQ(1U, entry_decoder.FieldNumber()); + EXPECT_TRUE(entry_decoder.ReadBytes(&tokenized_data).ok()); + EXPECT_EQ(tokenized_data.size(), expected_tokenized_data.size()); + EXPECT_EQ(std::memcmp(tokenized_data.begin(), + expected_tokenized_data.begin(), + expected_tokenized_data.size()), + 0); + + uint32_t line_level; + EXPECT_TRUE(entry_decoder.Next().ok()); // line_level + EXPECT_EQ(2U, entry_decoder.FieldNumber()); + EXPECT_TRUE(entry_decoder.ReadUint32(&line_level).ok()); + EXPECT_EQ(expected_metadata.level(), line_level & PW_LOG_LEVEL_BITMASK); + EXPECT_EQ(expected_metadata.line_number(), + (line_level & ~PW_LOG_LEVEL_BITMASK) >> PW_LOG_LEVEL_BITS); + + if (expected_metadata.flags() != 0) { + uint32_t flags; + EXPECT_TRUE(entry_decoder.Next().ok()); // flags + EXPECT_EQ(3U, entry_decoder.FieldNumber()); + EXPECT_TRUE(entry_decoder.ReadUint32(&flags).ok()); + EXPECT_EQ(expected_metadata.flags(), flags); + } + + const bool has_timestamp = entry_decoder.Next().ok(); // timestamp + if (expected_timestamp == 0 && !has_timestamp) { + return; + } + int64_t timestamp; + EXPECT_TRUE(has_timestamp); + EXPECT_EQ(4U, entry_decoder.FieldNumber()); + EXPECT_TRUE(entry_decoder.ReadInt64(×tamp).ok()); + EXPECT_EQ(expected_timestamp, timestamp); +} + +// Verifies a stream of log entries, returning the total count found. +size_t VerifyLogEntries(protobuf::Decoder& entries_decoder, + Vector<ConstByteSpan>& message_stack) { + size_t entries_found = 0; + while (entries_decoder.Next().ok()) { + ConstByteSpan entry; + EXPECT_TRUE(entries_decoder.ReadBytes(&entry).ok()); + protobuf::Decoder entry_decoder(entry); + auto expected_metadata = + log_tokenized::Metadata::Set<PW_LOG_LEVEL_WARN, __LINE__, 0, 0>(); + if (message_stack.empty()) { + break; + } + ConstByteSpan expected_message = message_stack.back(); + VerifyLogEntry(entry_decoder, + expected_metadata, + expected_message, + /*expected_timestamp=*/0); + message_stack.pop_back(); + ++entries_found; + } + return entries_found; +} + +TEST_F(LogServiceTest, AssignWriter) { + // Drains don't have writers. + for (auto& drain : drain_map_.drains()) { + EXPECT_EQ(drain.Flush(), Status::Unavailable()); + } + + // Create context directed to drain with ID 1. + RpcLogDrain& active_drain = drains_[0]; + const uint32_t drain_channel_id = active_drain.channel_id(); + LOG_SERVICE_METHOD_CONTEXT context(drain_map_); + context.set_channel_id(drain_channel_id); + + // Call RPC, which sets the drain's writer. + context.call(rpc_request_buffer); + EXPECT_EQ(active_drain.Flush(), OkStatus()); + + // Other drains are still missing writers. + for (auto& drain : drain_map_.drains()) { + if (drain.channel_id() != drain_channel_id) { + EXPECT_EQ(drain.Flush(), Status::Unavailable()); + } + } + + // Calling an ongoing log stream must not change the active drain's + // writer, and the second writer must not get any responses. + LOG_SERVICE_METHOD_CONTEXT second_call_context(drain_map_); + second_call_context.set_channel_id(drain_channel_id); + second_call_context.call(rpc_request_buffer); + EXPECT_EQ(active_drain.Flush(), OkStatus()); + ASSERT_TRUE(second_call_context.done()); + EXPECT_EQ(second_call_context.responses().size(), 0u); + + // Setting a new writer on a closed stream is allowed. + ASSERT_EQ(active_drain.Close(), OkStatus()); + LOG_SERVICE_METHOD_CONTEXT third_call_context(drain_map_); + third_call_context.set_channel_id(drain_channel_id); + third_call_context.call(rpc_request_buffer); + EXPECT_EQ(active_drain.Flush(), OkStatus()); + ASSERT_FALSE(third_call_context.done()); + EXPECT_EQ(third_call_context.responses().size(), 1u); + EXPECT_EQ(active_drain.Close(), OkStatus()); +} + +TEST_F(LogServiceTest, StartAndEndStream) { + RpcLogDrain& active_drain = drains_[2]; + const uint32_t drain_channel_id = active_drain.channel_id(); + LOG_SERVICE_METHOD_CONTEXT context(drain_map_); + context.set_channel_id(drain_channel_id); + + // Add log entries. + const size_t total_entries = 10; + AddLogEntries(total_entries, kMessage); + // Request logs. + context.call(rpc_request_buffer); + EXPECT_EQ(active_drain.Flush(), OkStatus()); + + // Not done until the stream is finished. + ASSERT_FALSE(context.done()); + active_drain.Close(); + ASSERT_TRUE(context.done()); + + EXPECT_EQ(context.status(), OkStatus()); + // There is at least 1 response with multiple log entries packed. + EXPECT_GE(context.responses().size(), 1u); + + // Verify data in responses. + Vector<ConstByteSpan, total_entries> message_stack; + for (size_t i = 0; i < total_entries; ++i) { + message_stack.push_back( + std::as_bytes(std::span(std::string_view(kMessage)))); + } + size_t entries_found = 0; + for (auto& response : context.responses()) { + protobuf::Decoder entry_decoder(response); + entries_found += VerifyLogEntries(entry_decoder, message_stack); + } + EXPECT_EQ(entries_found, total_entries); +} + +TEST_F(LogServiceTest, HandleDropped) { + RpcLogDrain& active_drain = drains_[0]; + const uint32_t drain_channel_id = active_drain.channel_id(); + LOG_SERVICE_METHOD_CONTEXT context(drain_map_); + context.set_channel_id(drain_channel_id); + + // Add log entries. + const size_t total_entries = 5; + const uint32_t total_drop_count = 2; + AddLogEntries(total_entries, kMessage); + multisink_.HandleDropped(total_drop_count); + + // Request logs. + context.call(rpc_request_buffer); + EXPECT_EQ(active_drain.Flush(), OkStatus()); + active_drain.Close(); + ASSERT_EQ(context.status(), OkStatus()); + // There is at least 1 response with multiple log entries packed. + ASSERT_GE(context.responses().size(), 1u); + + // Add create expected messages in a stack to match the order they arrive in. + Vector<ConstByteSpan, total_entries + 1> message_stack; + StringBuffer<32> message; + message.Format("Dropped %u", static_cast<unsigned int>(total_drop_count)); + message_stack.push_back(std::as_bytes(std::span(std::string_view(message)))); + for (size_t i = 0; i < total_entries; ++i) { + message_stack.push_back( + std::as_bytes(std::span(std::string_view(kMessage)))); + } + + // Verify data in responses. + size_t entries_found = 0; + for (auto& response : context.responses()) { + protobuf::Decoder entry_decoder(response); + entries_found += VerifyLogEntries(entry_decoder, message_stack); + } + // Expect an extra message with the drop count. + EXPECT_EQ(entries_found, total_entries + 1); +} + +TEST_F(LogServiceTest, HandleSmallBuffer) { + LOG_SERVICE_METHOD_CONTEXT context(drain_map_); + context.set_channel_id(kSmallBufferDrainId); + auto small_buffer_drain = + drain_map_.GetDrainFromChannelId(kSmallBufferDrainId); + ASSERT_TRUE(small_buffer_drain.ok()); + + // Add log entries. + const size_t total_entries = 5; + const uint32_t total_drop_count = total_entries; + AddLogEntries(total_entries, kLongMessage); + // Request logs. + context.call(rpc_request_buffer); + EXPECT_EQ(small_buffer_drain.value()->Flush(), OkStatus()); + EXPECT_EQ(small_buffer_drain.value()->Close(), OkStatus()); + ASSERT_EQ(context.status(), OkStatus()); + ASSERT_GE(context.responses().size(), 1u); + + Vector<ConstByteSpan, total_entries + 1> message_stack; + StringBuffer<32> message; + message.Format("Dropped %u", static_cast<unsigned int>(total_drop_count)); + message_stack.push_back(std::as_bytes(std::span(std::string_view(message)))); + + // Verify data in responses. + size_t entries_found = 0; + for (auto& response : context.responses()) { + protobuf::Decoder entry_decoder(response); + entries_found += VerifyLogEntries(entry_decoder, message_stack); + } + // No messages fit the buffer, expect a drop message. + EXPECT_EQ(entries_found, 1u); +} + +TEST_F(LogServiceTest, FlushDrainWithoutMultisink) { + auto& detached_drain = drains_[0]; + multisink_.DetachDrain(detached_drain); + LOG_SERVICE_METHOD_CONTEXT context(drain_map_); + context.set_channel_id(detached_drain.channel_id()); + + // Add log entries. + const size_t total_entries = 5; + AddLogEntries(total_entries, kMessage); + // Request logs. + context.call(rpc_request_buffer); + EXPECT_EQ(detached_drain.Close(), OkStatus()); + ASSERT_EQ(context.status(), OkStatus()); + EXPECT_EQ(context.responses().size(), 0u); +} + +TEST_F(LogServiceTest, LargeLogEntry) { + const auto expected_metadata = + log_tokenized::Metadata::Set<PW_LOG_LEVEL_WARN, + (1 << PW_LOG_TOKENIZED_MODULE_BITS) - 1, + (1 << PW_LOG_TOKENIZED_FLAG_BITS) - 1, + (1 << PW_LOG_TOKENIZED_LINE_BITS) - 1>(); + ConstByteSpan expected_message = std::as_bytes(std::span(kMessage)); + const int64_t expected_timestamp = std::numeric_limits<int64_t>::max(); + + // Add entry to multisink. + log::LogEntry::MemoryEncoder encoder(entry_encode_buffer_); + encoder.WriteMessage(expected_message); + encoder.WriteLineLevel( + (expected_metadata.level() & PW_LOG_LEVEL_BITMASK) | + ((expected_metadata.line_number() << PW_LOG_LEVEL_BITS) & + ~PW_LOG_LEVEL_BITMASK)); + encoder.WriteFlags(expected_metadata.flags()); + encoder.WriteTimestamp(expected_timestamp); + ASSERT_EQ(encoder.status(), OkStatus()); + multisink_.HandleEntry(encoder); + + // Start log stream. + RpcLogDrain& active_drain = drains_[0]; + const uint32_t drain_channel_id = active_drain.channel_id(); + LOG_SERVICE_METHOD_CONTEXT context(drain_map_); + context.set_channel_id(drain_channel_id); + context.call(rpc_request_buffer); + ASSERT_EQ(active_drain.Flush(), OkStatus()); + active_drain.Close(); + ASSERT_EQ(context.status(), OkStatus()); + ASSERT_EQ(context.responses().size(), 1u); + + // Verify message. + protobuf::Decoder entries_decoder(context.responses()[0]); + ASSERT_TRUE(entries_decoder.Next().ok()); + ConstByteSpan entry; + EXPECT_TRUE(entries_decoder.ReadBytes(&entry).ok()); + protobuf::Decoder entry_decoder(entry); + VerifyLogEntry( + entry_decoder, expected_metadata, expected_message, expected_timestamp); +} + +// TODO(pwbug/469): add tests for an open RawServerWriter that closes or fails +// while flushing, then on re-open the drain sends a counts. The drain mus have +// ignore_writer_error disabled. + +} // namespace +} // namespace pw::log_rpc diff --git a/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h index 6933690bc..93d5358b7 100644 --- a/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h +++ b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h @@ -38,12 +38,18 @@ namespace pw::log_rpc { // into a log::LogEntries message, writes the message to the provided writer, // then repeats the process until there are no more entries in the MultiSink or // the writer failed to write the outgoing package, in which case the RPC on -// the writer is closed. When close_stream_on_writer_error is false the drain +// the writer is closed. When error_handling is `kIgnoreWriterErrors` the drain // will continue to retrieve log entries out of the MultiSink and attempt to // send them out ignoring the writer errors. Note: this behavior might change or // be removed in the future. class RpcLogDrain : public multisink::MultiSink::Drain { public: + // Dictates how to handle server writer errors. + enum class LogDrainErrorHandling { + kIgnoreWriterErrors, + kCloseStreamOnWriterError, + }; + // The minimum buffer size, without the message payload, needed to retrieve a // log::LogEntry from the attached MultiSink. The user must account for the // max message size to avoid log entry drops. @@ -77,9 +83,9 @@ class RpcLogDrain : public multisink::MultiSink::Drain { ByteSpan log_entry_buffer, rpc::RawServerWriter writer, sync::Mutex& mutex, - bool close_stream_on_writer_error) + LogDrainErrorHandling error_handling) : channel_id_(channel_id), - close_stream_on_writer_error_(close_stream_on_writer_error), + error_handling_(error_handling), server_writer_(std::move(writer)), log_entry_buffer_(log_entry_buffer), committed_entry_drop_count_(0), @@ -96,9 +102,9 @@ class RpcLogDrain : public multisink::MultiSink::Drain { RpcLogDrain(uint32_t channel_id, ByteSpan log_entry_buffer, sync::Mutex& mutex, - bool close_stream_on_writer_error) + LogDrainErrorHandling error_handling) : channel_id_(channel_id), - close_stream_on_writer_error_(close_stream_on_writer_error), + error_handling_(error_handling), server_writer_(), log_entry_buffer_(log_entry_buffer), committed_entry_drop_count_(0), @@ -122,15 +128,15 @@ class RpcLogDrain : public multisink::MultiSink::Drain { // Accesses log entries and sends them via the writer. Expected to be called // frequently to avoid log drops. If the writer fails to send a packet with // multiple log entries, the entries are dropped and a drop message with the - // count is sent. When close_stream_on_writer_error is set, the stream will - // automatically be closed and Flush will return the writer error. + // count is sent. When error_handling is kCloseStreamOnWriterError, the stream + // will automatically be closed and Flush will return the writer error. // // Precondition: the drain must be attached to a MultiSink. // // Return values: // OK - all entries were consumed. - // ABORTED - there was an error writing the packet, and - // close_stream_on_writer_error is true. + // ABORTED - there was an error writing the packet, and error_handling equals + // `kCloseStreamOnWriterError`. Status Flush() PW_LOCKS_EXCLUDED(mutex_); // Ends RPC log stream without flushing. @@ -155,7 +161,7 @@ class RpcLogDrain : public multisink::MultiSink::Drain { PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_); const uint32_t channel_id_; - const bool close_stream_on_writer_error_; + const LogDrainErrorHandling error_handling_; rpc::RawServerWriter server_writer_ PW_GUARDED_BY(mutex_); const ByteSpan log_entry_buffer_ PW_GUARDED_BY(mutex_); uint32_t committed_entry_drop_count_ PW_GUARDED_BY(mutex_); diff --git a/pw_log_rpc/rpc_log_drain.cc b/pw_log_rpc/rpc_log_drain.cc index 871cbe894..385b9e0dd 100644 --- a/pw_log_rpc/rpc_log_drain.cc +++ b/pw_log_rpc/rpc_log_drain.cc @@ -70,7 +70,7 @@ Status RpcLogDrain::Flush() { log_sink_state = EncodeOutgoingPacket(encoder, packed_entry_count); if (const Status status = server_writer_.Write(encoder); !status.ok()) { committed_entry_drop_count_ += packed_entry_count; - if (close_stream_on_writer_error_) { + if (error_handling_ == LogDrainErrorHandling::kCloseStreamOnWriterError) { server_writer_.Finish().IgnoreError(); return Status::Aborted(); } diff --git a/pw_log_rpc/rpc_log_drain_test.cc b/pw_log_rpc/rpc_log_drain_test.cc new file mode 100644 index 000000000..1eba11876 --- /dev/null +++ b/pw_log_rpc/rpc_log_drain_test.cc @@ -0,0 +1,94 @@ +// Copyright 2021 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +#include "pw_log_rpc/rpc_log_drain.h" + +#include <array> +#include <cstdint> +#include <span> + +#include "gtest/gtest.h" +#include "pw_log_rpc/rpc_log_drain_map.h" +#include "pw_multisink/multisink.h" +#include "pw_status/status.h" +#include "pw_sync/mutex.h" + +namespace pw::log_rpc { +namespace { +static constexpr size_t kBufferSize = + RpcLogDrain::kMinEntrySizeWithoutPayload + 32; + +TEST(RpcLogDrain, TryFlushDrainWithClosedWriter) { + // Drain without a writer. + const uint32_t drain_id = 1; + std::array<std::byte, kBufferSize> buffer1; + sync::Mutex mutex; + RpcLogDrain drain( + drain_id, + buffer1, + mutex, + RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError); + EXPECT_EQ(drain.channel_id(), drain_id); + + // Attach drain to a MultiSink. + std::array<std::byte, kBufferSize * 2> multisink_buffer; + multisink::MultiSink multisink(multisink_buffer); + multisink.AttachDrain(drain); + EXPECT_EQ(drain.Flush(), Status::Unavailable()); + + rpc::RawServerWriter writer; + ASSERT_FALSE(writer.open()); + EXPECT_EQ(drain.Open(writer), Status::FailedPrecondition()); + EXPECT_EQ(drain.Flush(), Status::Unavailable()); +} + +TEST(RpcLogDrainMap, GetDrainsByIdFromDrainMap) { + static constexpr size_t kMaxDrains = 3; + sync::Mutex mutex; + std::array<std::array<std::byte, kBufferSize>, kMaxDrains> buffers; + std::array<RpcLogDrain, kMaxDrains> drains{ + RpcLogDrain( + 0, + buffers[0], + mutex, + RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError), + RpcLogDrain( + 1, + buffers[1], + mutex, + RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError), + RpcLogDrain(2, + buffers[2], + mutex, + RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors), + }; + + RpcLogDrainMap drain_map(drains); + for (uint32_t channel_id = 0; channel_id < kMaxDrains; ++channel_id) { + auto drain_result = drain_map.GetDrainFromChannelId(channel_id); + ASSERT_TRUE(drain_result.ok()); + EXPECT_EQ(drain_result.value(), &drains[channel_id]); + } + const std::span<RpcLogDrain> mapped_drains = drain_map.drains(); + ASSERT_EQ(mapped_drains.size(), kMaxDrains); + for (uint32_t channel_id = 0; channel_id < kMaxDrains; ++channel_id) { + EXPECT_EQ(&mapped_drains[channel_id], &drains[channel_id]); + } +} + +// TODO(cachinchilla): add tests for passing an open RawServerWriter when there +// is a way to create an one manually. + +} // namespace +} // namespace pw::log_rpc |