diff options
author | Carlos Chinchilla <cachinchilla@google.com> | 2021-08-02 20:48:33 -0700 |
---|---|---|
committer | CQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com> | 2021-09-03 17:31:12 +0000 |
commit | 5e645b2d2671d7f02c0d9b9d3fe76fdc14fd4a46 (patch) | |
tree | 2869a9d89082c9b9ca04fa0cad1ba23092e3125c /pw_log_rpc | |
parent | 505be03e3b3729fe0c184e0c4d20b0cb5dc3ff55 (diff) | |
download | pigweed-5e645b2d2671d7f02c0d9b9d3fe76fdc14fd4a46.tar.gz |
pw_log_rpc: Create Log RPC service & streams
An RpcLogStream matches a Multisink::Drain with an RPC writer and is
identified by the RPC channel ID. A persistent log stream can be created
by assigning an open writer to the RpcLogStream, or can be set when the
stream is inactive by the RPC Log service.
A map is provided to collect all log streams, persistent and dynamic. It
makes it easy to assign a writer to a log stream, or to flush all the
log streams.
Flushing is delegated to whatever owns the log stream map. Included is a
single thread, single Multisink::Listener that sequentially flushes the
log streams. Future work can make use of a work queue.
Test: sample app using raw strings log entries works with pw_console
calling:
```for payload in rpcs.pw.log.LogService.Listen.invoke():
for entry in payload.entries:
LOG.info("%s", entry.message)```
No-Docs-Update-Reason: module is still under work.
Change-Id: I20fdbabe3aeb298a22c27e7a655e77c3cfaa2c13
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/56001
Commit-Queue: Carlos Chinchilla <cachinchilla@google.com>
Reviewed-by: Wyatt Hepler <hepler@google.com>
Reviewed-by: Ewout van Bekkum <ewout@google.com>
Diffstat (limited to 'pw_log_rpc')
-rw-r--r-- | pw_log_rpc/BUILD.bazel | 52 | ||||
-rw-r--r-- | pw_log_rpc/BUILD.gn | 55 | ||||
-rw-r--r-- | pw_log_rpc/log_service.cc | 37 | ||||
-rw-r--r-- | pw_log_rpc/logs_rpc.cc | 79 | ||||
-rw-r--r-- | pw_log_rpc/logs_rpc_test.cc | 140 | ||||
-rw-r--r-- | pw_log_rpc/public/pw_log_rpc/log_service.h | 40 | ||||
-rw-r--r-- | pw_log_rpc/public/pw_log_rpc/logs_rpc.h | 55 | ||||
-rw-r--r-- | pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h | 165 | ||||
-rw-r--r-- | pw_log_rpc/public/pw_log_rpc/rpc_log_drain_map.h | 53 | ||||
-rw-r--r-- | pw_log_rpc/public/pw_log_rpc/rpc_log_drain_thread.h | 58 | ||||
-rw-r--r-- | pw_log_rpc/rpc_log_drain.cc | 149 |
11 files changed, 586 insertions, 297 deletions
diff --git a/pw_log_rpc/BUILD.bazel b/pw_log_rpc/BUILD.bazel index 87e2ddab4..e570466b3 100644 --- a/pw_log_rpc/BUILD.bazel +++ b/pw_log_rpc/BUILD.bazel @@ -23,26 +23,52 @@ package(default_visibility = ["//visibility:public"]) licenses(["notice"]) pw_cc_library( - name = "pw_logs", - srcs = ["logs_rpc.cc"], - hdrs = ["public/pw_log_rpc/logs_rpc.h"], + name = "log_service", + srcs = ["log_service.cc"], + hdrs = ["public/pw_log_rpc/log_service.h"], includes = ["public"], deps = [ - "//pw_bytes", + ":rpc_log_drain", + "//pw_log", + "//pw_log:protos.pwpb", + "//pw_log:protos.raw_rpc", + ], +) + +pw_cc_library( + name = "rpc_log_drain", + srcs = ["rpc_log_drain.cc"], + hdrs = [ + "public/pw_log_rpc/rpc_log_drain.h", + "public/pw_log_rpc/rpc_log_drain_map.h", + ], + includes = ["public"], + deps = [ + "//pw_assert", + "//pw_log:protos.pwpb", + "//pw_log:protos.raw_rpc", + "//pw_multisink", + "//pw_protobuf", "//pw_result", - "//pw_ring_buffer", "//pw_status", + "//pw_string", + "//pw_sync:lock_annotations", + "//pw_sync:mutex", ], ) -pw_cc_test( - name = "logs_rpc_test", - srcs = [ - "logs_rpc_test.cc", - ], +pw_cc_library( + name = "rpc_log_drain_thread", + hdrs = ["public/pw_log_rpc/rpc_log_drain_thread.h"], + includes = ["public"], deps = [ - ":pw_log_queue", - "//pw_preprocessor", - "//pw_unit_test", + ":rpc_log_drain", + "//pw_sync:thread_notification", + "//pw_thread:thread", ], ) + +# TODO(cachinchilla): implement tests. +pw_cc_test( + name = "logs_rpc_test", +) diff --git a/pw_log_rpc/BUILD.gn b/pw_log_rpc/BUILD.gn index b4c931eda..10f313959 100644 --- a/pw_log_rpc/BUILD.gn +++ b/pw_log_rpc/BUILD.gn @@ -23,26 +23,61 @@ config("default_config") { visibility = [ ":*" ] } -pw_source_set("logs") { +pw_source_set("log_service") { public_configs = [ ":default_config" ] - public = [ "public/pw_log_rpc/logs_rpc.h" ] - sources = [ "logs_rpc.cc" ] + public = [ "public/pw_log_rpc/log_service.h" ] + sources = [ "log_service.cc" ] + deps = [ + "$dir_pw_log", + "$dir_pw_log:protos.pwpb", + ] public_deps = [ + ":rpc_log_drain", + "$dir_pw_log:protos.raw_rpc", + ] +} + +pw_source_set("rpc_log_drain") { + public_configs = [ ":default_config" ] + public = [ + "public/pw_log_rpc/rpc_log_drain.h", + "public/pw_log_rpc/rpc_log_drain_map.h", + ] + sources = [ "rpc_log_drain.cc" ] + deps = [ + "$dir_pw_log", + "$dir_pw_string", + ] + public_deps = [ + "$dir_pw_assert", "$dir_pw_log:protos.pwpb", "$dir_pw_log:protos.raw_rpc", + "$dir_pw_multisink", + "$dir_pw_protobuf", + "$dir_pw_result", + "$dir_pw_status", + "$dir_pw_sync:lock_annotations", + "$dir_pw_sync:mutex", + ] +} + +pw_source_set("rpc_log_drain_thread") { + public_configs = [ ":default_config" ] + public = [ "public/pw_log_rpc/rpc_log_drain_thread.h" ] + sources = [] + public_deps = [ + ":rpc_log_drain", + "$dir_pw_multisink", + "$dir_pw_sync:thread_notification", + "$dir_pw_thread:thread", ] } +# TODO(cachinchilla): implement tests. pw_test("logs_rpc_test") { - # TODO(cachinchilla): implement RPC log tests without pw_log_multisink when - # ready. - # deps = [ - # ":logs", - # "$dir_pw_rpc/raw:test_method_context", - # ] - # sources = [ "logs_rpc_test.cc" ] } +# TODO(cachinchilla): update docs. pw_doc_group("docs") { sources = [ "docs.rst" ] } diff --git a/pw_log_rpc/log_service.cc b/pw_log_rpc/log_service.cc new file mode 100644 index 000000000..509f51263 --- /dev/null +++ b/pw_log_rpc/log_service.cc @@ -0,0 +1,37 @@ +// Copyright 2020 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +#include "pw_log_rpc/log_service.h" + +#include "pw_log/log.h" +#include "pw_log/proto/log.pwpb.h" + +namespace pw::log_rpc { + +void LogService::Listen(ServerContext& context, + ConstByteSpan, + rpc::RawServerWriter& writer) { + uint32_t channel_id = context.channel_id(); + Result<RpcLogDrain*> drain = drains_.GetDrainFromChannelId(channel_id); + if (!drain.ok()) { + return; + } + + if (const Status status = drain.value()->Open(writer); !status.ok()) { + PW_LOG_ERROR("Could not start new log stream. %d", + static_cast<int>(status.code())); + } +} + +} // namespace pw::log_rpc diff --git a/pw_log_rpc/logs_rpc.cc b/pw_log_rpc/logs_rpc.cc deleted file mode 100644 index 5630429d3..000000000 --- a/pw_log_rpc/logs_rpc.cc +++ /dev/null @@ -1,79 +0,0 @@ -// Copyright 2020 The Pigweed Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not -// use this file except in compliance with the License. You may obtain a copy of -// the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -// License for the specific language governing permissions and limitations under -// the License. - -#include "pw_log_rpc/logs_rpc.h" - -#include "pw_log/log.h" -#include "pw_log/proto/log.pwpb.h" -#include "pw_status/try.h" - -namespace pw::log_rpc { -namespace { - -// TODO(prashanthsw): Handle dropped messages. -// Result<ConstByteSpan> GenerateDroppedEntryMessage(ByteSpan encode_buffer, -// size_t dropped_entries) { -// pw::log::LogEntry::MemoryEncoder encoder(encode_buffer); -// encoder.WriteDropped(dropped_entries); -// if (encoder.status().ok()) { -// return ConstByteSpan(encoder); -// } -// return encoder.status(); -// } - -} // namespace - -void Logs::Get(ServerContext&, ConstByteSpan, rpc::RawServerWriter& writer) { - response_writer_ = std::move(writer); -} - -Status Logs::Flush() { - // If the response writer was not initialized or has since been closed, - // ignore the flush operation. - if (!response_writer_.open()) { - return OkStatus(); - } - - // If previous calls to flush resulted in dropped entries, generate a - // dropped entry message and write it before further log messages. - // TODO(prashanthsw): Handle dropped messages. - // if (dropped_entries_ > 0) { - // ByteSpan payload = response_writer_.PayloadBuffer(); - // Result dropped_log = GenerateDroppedEntryMessage(payload, - // dropped_entries_); PW_TRY(dropped_log.status()); - // PW_TRY(response_writer_.Write(dropped_log.value())); - // dropped_entries_ = 0; - // } - - // Write logs to the response writer. An important limitation of this - // implementation is that if this RPC call fails, the logs are lost - - // a subsequent call to the RPC will produce a drop count message. - ByteSpan payload = response_writer_.PayloadBuffer(); - Result possible_logs = log_queue_.PopMultiple(payload); - PW_TRY(possible_logs.status()); - if (possible_logs.value().entry_count == 0) { - return OkStatus(); - } - - Status status = response_writer_.Write(possible_logs.value().entries); - if (!status.ok()) { - // On a failure to send logs, track the dropped entries. - dropped_entries_ = possible_logs.value().entry_count; - return status; - } - - return OkStatus(); -} - -} // namespace pw::log_rpc diff --git a/pw_log_rpc/logs_rpc_test.cc b/pw_log_rpc/logs_rpc_test.cc deleted file mode 100644 index 682931687..000000000 --- a/pw_log_rpc/logs_rpc_test.cc +++ /dev/null @@ -1,140 +0,0 @@ -// Copyright 2020 The Pigweed Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not -// use this file except in compliance with the License. You may obtain a copy of -// the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -// License for the specific language governing permissions and limitations under -// the License. - -#include "pw_log_rpc/logs_rpc.h" - -#include "gtest/gtest.h" -#include "pw_log/log.h" -#include "pw_rpc/raw/test_method_context.h" - -namespace pw::log_rpc { -namespace { - -#define LOGS_METHOD_CONTEXT PW_RAW_TEST_METHOD_CONTEXT(Logs, Get) - -constexpr size_t kEncodeBufferSize = 128; -constexpr size_t kLogBufferSize = 4096; - -class LogQueueTester : public LogQueueWithEncodeBuffer<kLogBufferSize> { - public: - LogQueueTester(ByteSpan log_queue) - : LogQueueWithEncodeBuffer<kLogBufferSize>(log_queue) {} - - void SetPopStatus(Status error_status) { - pop_status_for_test_ = error_status; - } -}; - -class LogsService : public ::testing::Test { - public: - LogsService() : log_queue_(log_queue_buffer_) {} - - protected: - void AddLogs(const size_t log_count = 1) { - constexpr char kTokenizedMessage[] = "message"; - for (size_t i = 0; i < log_count; i++) { - EXPECT_EQ( - OkStatus(), - log_queue_.PushTokenizedMessage( - std::as_bytes(std::span(kTokenizedMessage)), 0, 0, 0, 0, 0)); - } - } - - static Logs& GetLogs(LOGS_METHOD_CONTEXT& context) { - return (Logs&)(context.service()); - } - - std::array<std::byte, kEncodeBufferSize> log_queue_buffer_; - LogQueueWithEncodeBuffer<kLogBufferSize> log_queue_; -}; - -TEST_F(LogsService, Get) { - constexpr size_t kLogEntryCount = 3; - std::array<std::byte, 1> rpc_buffer; - LOGS_METHOD_CONTEXT context(log_queue_); - - context.call(rpc_buffer); - - // Flush all logs from the buffer, then close the RPC. - AddLogs(kLogEntryCount); - GetLogs(context) - .Flush() - .IgnoreError(); // TODO(pwbug/387): Handle Status properly - GetLogs(context).Finish(); - - EXPECT_TRUE(context.done()); - EXPECT_EQ(OkStatus(), context.status()); - - // Although |kLogEntryCount| messages were in the queue, they are batched - // before being written to the client, so there is only one response. - EXPECT_EQ(1U, context.total_responses()); -} - -TEST_F(LogsService, GetMultiple) { - constexpr size_t kLogEntryCount = 1; - constexpr size_t kFlushCount = 3; - std::array<std::byte, 1> rpc_buffer; - LOGS_METHOD_CONTEXT context(log_queue_); - - context.call(rpc_buffer); - - for (size_t i = 0; i < kFlushCount; i++) { - AddLogs(kLogEntryCount); - GetLogs(context) - .Flush() - .IgnoreError(); // TODO(pwbug/387): Handle Status properly - } - GetLogs(context).Finish(); - - EXPECT_TRUE(context.done()); - EXPECT_EQ(OkStatus(), context.status()); - EXPECT_EQ(kFlushCount, context.total_responses()); -} - -TEST_F(LogsService, NoEntriesOnEmptyQueue) { - std::array<std::byte, 1> rpc_buffer; - LOGS_METHOD_CONTEXT context(log_queue_); - - // Invoking flush with no logs in the queue should behave like a no-op. - context.call(rpc_buffer); - GetLogs(context) - .Flush() - .IgnoreError(); // TODO(pwbug/387): Handle Status properly - GetLogs(context).Finish(); - - EXPECT_TRUE(context.done()); - EXPECT_EQ(OkStatus(), context.status()); - EXPECT_EQ(0U, context.total_responses()); -} - -TEST_F(LogsService, QueueError) { - std::array<std::byte, 1> rpc_buffer; - LogQueueTester log_queue_tester(log_queue_buffer_); - LOGS_METHOD_CONTEXT context(log_queue_tester); - - // Generate failure on log queue. - log_queue_tester.SetPopStatus(Status::Internal()); - context.call(rpc_buffer); - GetLogs(context) - .Flush() - .IgnoreError(); // TODO(pwbug/387): Handle Status properly - GetLogs(context).Finish(); - - EXPECT_TRUE(context.done()); - EXPECT_EQ(OkStatus(), context.status()); - EXPECT_EQ(0U, context.total_responses()); -} - -} // namespace -} // namespace pw::log_rpc diff --git a/pw_log_rpc/public/pw_log_rpc/log_service.h b/pw_log_rpc/public/pw_log_rpc/log_service.h new file mode 100644 index 000000000..3cc648b03 --- /dev/null +++ b/pw_log_rpc/public/pw_log_rpc/log_service.h @@ -0,0 +1,40 @@ +// Copyright 2021 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +#pragma once + +#include "pw_log/proto/log.raw_rpc.pb.h" +#include "pw_log_rpc/rpc_log_drain_map.h" + +namespace pw::log_rpc { + +// The RPC LogService provides a way to start a log stream on a known RPC +// channel with a writer provided on a call. Log streams maintenance is flexible +// and delegated outside the service. +class LogService final : public log::generated::LogService<LogService> { + public: + LogService(RpcLogDrainMap& drains) : drains_(drains) {} + + // Starts listening to logs on the given RPC channel and writer. The call is + // ignored if the channel was not pre-registered in the drain map. If there is + // an existent stream of logs for the given channel and previous writer, the + // writer in this call is closed without finishing the RPC call and the log + // stream using the previous writer continues. + void Listen(ServerContext&, ConstByteSpan, rpc::RawServerWriter& writer); + + private: + RpcLogDrainMap& drains_; +}; + +} // namespace pw::log_rpc diff --git a/pw_log_rpc/public/pw_log_rpc/logs_rpc.h b/pw_log_rpc/public/pw_log_rpc/logs_rpc.h deleted file mode 100644 index a6fef55f4..000000000 --- a/pw_log_rpc/public/pw_log_rpc/logs_rpc.h +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright 2020 The Pigweed Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not -// use this file except in compliance with the License. You may obtain a copy of -// the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -// License for the specific language governing permissions and limitations under -// the License. - -#pragma once - -#include "pw_log/log.h" -#include "pw_log/proto/log.raw_rpc.pb.h" -#include "pw_log_multisink/log_queue.h" - -namespace pw::log_rpc { - -// The Logs RPC service will send logs when requested by Get(). For now, Get() -// requests result in a stream of responses, containing all log entries from -// the attached log queue. -// -// The Get() method will return logs in the current queue immediately, but -// someone else is responsible for pumping the log queue using Flush(). -class Logs final : public pw::log::generated::Logs<Logs> { - public: - Logs(LogQueue& log_queue) : log_queue_(log_queue), dropped_entries_(0) {} - - // RPC API for the Logs that produces a log stream. This method will - // return immediately, another class must call Flush() to push logs from - // the queue to this stream. - void Get(ServerContext&, ConstByteSpan, rpc::RawServerWriter& writer); - - // Interface for the owner of the service instance to flush all existing - // logs to the writer, if one is attached. - Status Flush(); - - // Interface for the owner of the service instance to close the RPC, if - // one is attached. - void Finish() { - response_writer_.Finish() - .IgnoreError(); // TODO(pwbug/387): Handle Status properly - } - - private: - LogQueue& log_queue_; - rpc::RawServerWriter response_writer_; - size_t dropped_entries_; -}; - -} // namespace pw::log_rpc diff --git a/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h new file mode 100644 index 000000000..6933690bc --- /dev/null +++ b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h @@ -0,0 +1,165 @@ +// Copyright 2021 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +#pragma once + +#include <array> +#include <cstdint> + +#include "pw_assert/assert.h" +#include "pw_bytes/span.h" +#include "pw_log/proto/log.pwpb.h" +#include "pw_multisink/multisink.h" +#include "pw_protobuf/serialized_size.h" +#include "pw_result/result.h" +#include "pw_rpc/raw/server_reader_writer.h" +#include "pw_status/status.h" +#include "pw_sync/lock_annotations.h" +#include "pw_sync/mutex.h" + +namespace pw::log_rpc { + +// RpcLogDrain matches a MultiSink::Drain with with an RPC channel's writer. A +// RPC channel ID identifies this drain. The user must attach this drain +// to a MultiSink that returns a log::LogEntry, and provide a buffer large +// enough to hold the largest log::LogEntry transmittable. The user must call +// Flush(), which, on every call, packs as many log::LogEntry items as possible +// into a log::LogEntries message, writes the message to the provided writer, +// then repeats the process until there are no more entries in the MultiSink or +// the writer failed to write the outgoing package, in which case the RPC on +// the writer is closed. When close_stream_on_writer_error is false the drain +// will continue to retrieve log entries out of the MultiSink and attempt to +// send them out ignoring the writer errors. Note: this behavior might change or +// be removed in the future. +class RpcLogDrain : public multisink::MultiSink::Drain { + public: + // The minimum buffer size, without the message payload, needed to retrieve a + // log::LogEntry from the attached MultiSink. The user must account for the + // max message size to avoid log entry drops. + static constexpr size_t kMinEntrySizeWithoutPayload = + // message + protobuf::SizeOfFieldKey(1) + + 1 // Assume minimum varint length, skip the payload bytes. + // line_level + + protobuf::SizeOfFieldKey(2) + + protobuf::kMaxSizeBytesUint32 + // flags + + protobuf::SizeOfFieldKey(3) + + protobuf::kMaxSizeBytesUint32 + // timestamp or time_since_last_entry + + protobuf::SizeOfFieldKey(4) + protobuf::kMaxSizeBytesInt64; + // Message format to report the drop count. + static constexpr char kDropMessageFormatString[] = "Dropped %u"; + // With a uint32_t number, "Dropped %u" is no more than 18 characters long. + static constexpr size_t kMaxDropMessageSize = 18; + // The smallest buffer size must be able to fit a drop message. + static constexpr size_t kMinEntryBufferSize = + kMaxDropMessageSize + kMinEntrySizeWithoutPayload; + + // Creates a log stream with the provided open writer. Useful for streaming + // logs without a request. + // The provided buffer must be large enough to hold the largest transmittable + // log::LogEntry or a drop count message at the very least. The user can + // choose to provide a unique mutex for the drain, or share it to save RAM as + // long as they are aware of contengency issues. + RpcLogDrain(uint32_t channel_id, + ByteSpan log_entry_buffer, + rpc::RawServerWriter writer, + sync::Mutex& mutex, + bool close_stream_on_writer_error) + : channel_id_(channel_id), + close_stream_on_writer_error_(close_stream_on_writer_error), + server_writer_(std::move(writer)), + log_entry_buffer_(log_entry_buffer), + committed_entry_drop_count_(0), + mutex_(mutex) { + PW_ASSERT(log_entry_buffer.size_bytes() >= kMinEntryBufferSize); + PW_ASSERT(writer.open()); + } + + // Creates a closed log stream with a writer that can be set at a later time. + // The provided buffer must be large enough to hold the largest transmittable + // log::LogEntry or a drop count message at the very least. The user can + // choose to provide a unique mutex for the drain, or share it to save RAM as + // long as they are aware of contengency issues. + RpcLogDrain(uint32_t channel_id, + ByteSpan log_entry_buffer, + sync::Mutex& mutex, + bool close_stream_on_writer_error) + : channel_id_(channel_id), + close_stream_on_writer_error_(close_stream_on_writer_error), + server_writer_(), + log_entry_buffer_(log_entry_buffer), + committed_entry_drop_count_(0), + mutex_(mutex) { + PW_ASSERT(log_entry_buffer.size_bytes() >= kMinEntryBufferSize); + } + + // Not copyable. + RpcLogDrain(const RpcLogDrain&) = delete; + RpcLogDrain& operator=(const RpcLogDrain&) = delete; + + // Configures the drain with a new open server writer if the current one is + // not open. + // + // Return values: + // OK - Successfully set the new open writer. + // FAILED_PRECONDITION - The given writer is not open. + // ALREADY_EXISTS - an open writer is already set. + Status Open(rpc::RawServerWriter& writer) PW_LOCKS_EXCLUDED(mutex_); + + // Accesses log entries and sends them via the writer. Expected to be called + // frequently to avoid log drops. If the writer fails to send a packet with + // multiple log entries, the entries are dropped and a drop message with the + // count is sent. When close_stream_on_writer_error is set, the stream will + // automatically be closed and Flush will return the writer error. + // + // Precondition: the drain must be attached to a MultiSink. + // + // Return values: + // OK - all entries were consumed. + // ABORTED - there was an error writing the packet, and + // close_stream_on_writer_error is true. + Status Flush() PW_LOCKS_EXCLUDED(mutex_); + + // Ends RPC log stream without flushing. + // + // Return values: + // OK - successfully closed the server writer. + // FAILED_PRECONDITION - The given writer is not open. + // Errors from the underlying writer send packet. + Status Close() PW_LOCKS_EXCLUDED(mutex_); + + uint32_t channel_id() const { return channel_id_; } + + private: + enum class LogDrainState { + kCaughtUp, + kMoreEntriesRemaining, + }; + + // Fills the outgoing buffer with as many entries as possible. + LogDrainState EncodeOutgoingPacket(log::LogEntries::MemoryEncoder& encoder, + uint32_t packed_entry_count_out) + PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_); + + const uint32_t channel_id_; + const bool close_stream_on_writer_error_; + rpc::RawServerWriter server_writer_ PW_GUARDED_BY(mutex_); + const ByteSpan log_entry_buffer_ PW_GUARDED_BY(mutex_); + uint32_t committed_entry_drop_count_ PW_GUARDED_BY(mutex_); + sync::Mutex& mutex_; +}; + +} // namespace pw::log_rpc diff --git a/pw_log_rpc/public/pw_log_rpc/rpc_log_drain_map.h b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain_map.h new file mode 100644 index 000000000..d21c37bef --- /dev/null +++ b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain_map.h @@ -0,0 +1,53 @@ +// Copyright 2021 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +#pragma once + +#include <span> + +#include "pw_log_rpc/rpc_log_drain.h" +#include "pw_result/result.h" +#include "pw_status/status.h" + +namespace pw::log_rpc { + +// Holds an inmutable map of RPC channel ID to RpcLogDrain to fascilitate the +// maintenance of all RPC log streams. +class RpcLogDrainMap { + public: + explicit constexpr RpcLogDrainMap(std::span<RpcLogDrain> drains) + : drains_(drains) {} + + // Not copyable nor movable. + RpcLogDrainMap(RpcLogDrainMap const&) = delete; + RpcLogDrainMap& operator=(RpcLogDrainMap const&) = delete; + RpcLogDrainMap(RpcLogDrainMap&&) = delete; + RpcLogDrainMap& operator=(RpcLogDrainMap&&) = delete; + + Result<RpcLogDrain*> GetDrainFromChannelId(uint32_t channel_id) const { + for (auto& drain : drains_) { + if (drain.channel_id() == channel_id) { + return &drain; + } + } + return Status::NotFound(); + } + + const std::span<RpcLogDrain>& drains() const { return drains_; } + + protected: + const std::span<RpcLogDrain> drains_; +}; + +} // namespace pw::log_rpc diff --git a/pw_log_rpc/public/pw_log_rpc/rpc_log_drain_thread.h b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain_thread.h new file mode 100644 index 000000000..a4160cbba --- /dev/null +++ b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain_thread.h @@ -0,0 +1,58 @@ +// Copyright 2021 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +#pragma once + +#include "pw_log_rpc/rpc_log_drain_map.h" +#include "pw_multisink/multisink.h" +#include "pw_sync/thread_notification.h" +#include "pw_thread/thread_core.h" + +namespace pw::log_rpc { + +// RpcLogDrainThread is a single thread and single MultiSink::Listener that +// manages multiple log streams. It is a suitable option when a minimal +// thread count is desired but comes with the cost of individual log streams +// blocking each other's flushing. +class RpcLogDrainThread final : public thread::ThreadCore, + public multisink::MultiSink::Listener { + public: + RpcLogDrainThread(multisink::MultiSink& multisink, RpcLogDrainMap& drain_map) + : drain_map_(drain_map), multisink_(multisink) {} + + void OnNewEntryAvailable() override { + new_log_available_notification_.release(); + } + + // Sequentially flushes each log stream. + void Run() override { + for (auto& drain : drain_map_.drains()) { + multisink_.AttachDrain(drain); + } + multisink_.AttachListener(*this); + while (true) { + new_log_available_notification_.acquire(); + for (auto& drain : drain_map_.drains()) { + drain.Flush().IgnoreError(); + } + } + } + + private: + sync::ThreadNotification new_log_available_notification_; + RpcLogDrainMap& drain_map_; + multisink::MultiSink& multisink_; +}; + +} // namespace pw::log_rpc diff --git a/pw_log_rpc/rpc_log_drain.cc b/pw_log_rpc/rpc_log_drain.cc new file mode 100644 index 000000000..871cbe894 --- /dev/null +++ b/pw_log_rpc/rpc_log_drain.cc @@ -0,0 +1,149 @@ +// Copyright 2021 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +#include "pw_log_rpc/rpc_log_drain.h" + +#include <mutex> + +#include "pw_assert/check.h" +#include "pw_log/log.h" +#include "pw_string/string_builder.h" + +namespace pw::log_rpc { +namespace { +// When encoding LogEntry in LogEntries, there are kLogEntryEncodeFrameSize +// bytes added to the encoded LogEntry. +constexpr size_t kLogEntryEncodeFrameSize = + protobuf::SizeOfFieldKey(1) // LogEntry + + protobuf::kMaxSizeOfLength; + +// Creates an encoded drop message on the provided buffer. +Result<ConstByteSpan> CreateEncodedDropMessage( + uint32_t drop_count, ByteSpan encoded_drop_message_buffer) { + StringBuffer<RpcLogDrain::kMaxDropMessageSize> message; + message.Format(RpcLogDrain::kDropMessageFormatString, + static_cast<unsigned int>(drop_count)); + + // Encode message in protobuf. + log::LogEntry::MemoryEncoder encoder(encoded_drop_message_buffer); + encoder.WriteMessage(std::as_bytes(std::span(std::string_view(message)))); + encoder.WriteLineLevel(PW_LOG_LEVEL_WARN & PW_LOG_LEVEL_BITMASK); + PW_TRY(encoder.status()); + return ConstByteSpan(encoder); +} +} // namespace + +Status RpcLogDrain::Open(rpc::RawServerWriter& writer) { + if (!writer.open()) { + return Status::FailedPrecondition(); + } + std::lock_guard lock(mutex_); + if (server_writer_.open()) { + return Status::AlreadyExists(); + } + server_writer_ = std::move(writer); + return OkStatus(); +} + +Status RpcLogDrain::Flush() { + PW_CHECK_NOTNULL(multisink_); + + LogDrainState log_sink_state = LogDrainState::kMoreEntriesRemaining; + std::lock_guard lock(mutex_); + do { + if (!server_writer_.open()) { + return Status::Unavailable(); + } + log::LogEntries::MemoryEncoder encoder(server_writer_.PayloadBuffer()); + uint32_t packed_entry_count = 0; + log_sink_state = EncodeOutgoingPacket(encoder, packed_entry_count); + if (const Status status = server_writer_.Write(encoder); !status.ok()) { + committed_entry_drop_count_ += packed_entry_count; + if (close_stream_on_writer_error_) { + server_writer_.Finish().IgnoreError(); + return Status::Aborted(); + } + } + } while (log_sink_state == LogDrainState::kMoreEntriesRemaining); + return OkStatus(); +} + +RpcLogDrain::LogDrainState RpcLogDrain::EncodeOutgoingPacket( + log::LogEntries::MemoryEncoder& encoder, uint32_t packed_entry_count_out) { + const size_t total_buffer_size = encoder.ConservativeWriteLimit(); + do { + // Get entry and drop count from drain. + uint32_t drop_count = 0; + Result<multisink::MultiSink::Drain::PeekedEntry> possible_entry = + PeekEntry(log_entry_buffer_, drop_count); + if (possible_entry.status().IsResourceExhausted() || + possible_entry.status().IsDataLoss()) { + continue; + } + + // Report drop count if messages were dropped. + if (committed_entry_drop_count_ > 0 || drop_count > 0) { + // Reuse the log_entry_buffer_ to send a drop message. + const Result<ConstByteSpan> drop_message_result = + CreateEncodedDropMessage(committed_entry_drop_count_ + drop_count, + log_entry_buffer_); + // Add encoded drop messsage if fits in buffer. + if (drop_message_result.ok() && + drop_message_result.value().size() + kLogEntryEncodeFrameSize < + encoder.ConservativeWriteLimit()) { + PW_CHECK_OK(encoder.WriteBytes( + static_cast<uint32_t>(log::LogEntries::Fields::ENTRIES), + drop_message_result.value())); + committed_entry_drop_count_ = 0; + } + if (possible_entry.ok()) { + PW_CHECK_OK(PeekEntry(log_entry_buffer_, drop_count).status()); + } + } + + if (possible_entry.status().IsOutOfRange()) { + return LogDrainState::kCaughtUp; // There are no more entries. + } + // At this point all expected error modes have been handled. + PW_CHECK_OK(possible_entry.status()); + + // Check if the entry fits in encoder buffer. + const size_t encoded_entry_size = + possible_entry.value().entry().size() + kLogEntryEncodeFrameSize; + if (encoded_entry_size + kLogEntryEncodeFrameSize > total_buffer_size) { + // Entry is larger than the entire available buffer. + ++committed_entry_drop_count_; + PW_CHECK_OK(PopEntry(possible_entry.value())); + continue; + } else if (encoded_entry_size > encoder.ConservativeWriteLimit()) { + // Entry does not fit in the partially filled encoder buffer. Notify the + // caller there are more entries to send. + return LogDrainState::kMoreEntriesRemaining; + } + + // Encode log entry and remove it from multisink. + PW_CHECK_OK(encoder.WriteBytes( + static_cast<uint32_t>(log::LogEntries::Fields::ENTRIES), + possible_entry.value().entry())); + PW_CHECK_OK(PopEntry(possible_entry.value())); + ++packed_entry_count_out; + } while (true); +} + +Status RpcLogDrain::Close() { + std::lock_guard lock(mutex_); + return server_writer_.Finish(); +} + +} // namespace pw::log_rpc |