aboutsummaryrefslogtreecommitdiff
path: root/pw_log_rpc
diff options
context:
space:
mode:
authorCarlos Chinchilla <cachinchilla@google.com>2021-08-02 20:48:33 -0700
committerCQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com>2021-09-03 17:31:12 +0000
commit5e645b2d2671d7f02c0d9b9d3fe76fdc14fd4a46 (patch)
tree2869a9d89082c9b9ca04fa0cad1ba23092e3125c /pw_log_rpc
parent505be03e3b3729fe0c184e0c4d20b0cb5dc3ff55 (diff)
downloadpigweed-5e645b2d2671d7f02c0d9b9d3fe76fdc14fd4a46.tar.gz
pw_log_rpc: Create Log RPC service & streams
An RpcLogStream matches a Multisink::Drain with an RPC writer and is identified by the RPC channel ID. A persistent log stream can be created by assigning an open writer to the RpcLogStream, or can be set when the stream is inactive by the RPC Log service. A map is provided to collect all log streams, persistent and dynamic. It makes it easy to assign a writer to a log stream, or to flush all the log streams. Flushing is delegated to whatever owns the log stream map. Included is a single thread, single Multisink::Listener that sequentially flushes the log streams. Future work can make use of a work queue. Test: sample app using raw strings log entries works with pw_console calling: ```for payload in rpcs.pw.log.LogService.Listen.invoke(): for entry in payload.entries: LOG.info("%s", entry.message)``` No-Docs-Update-Reason: module is still under work. Change-Id: I20fdbabe3aeb298a22c27e7a655e77c3cfaa2c13 Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/56001 Commit-Queue: Carlos Chinchilla <cachinchilla@google.com> Reviewed-by: Wyatt Hepler <hepler@google.com> Reviewed-by: Ewout van Bekkum <ewout@google.com>
Diffstat (limited to 'pw_log_rpc')
-rw-r--r--pw_log_rpc/BUILD.bazel52
-rw-r--r--pw_log_rpc/BUILD.gn55
-rw-r--r--pw_log_rpc/log_service.cc37
-rw-r--r--pw_log_rpc/logs_rpc.cc79
-rw-r--r--pw_log_rpc/logs_rpc_test.cc140
-rw-r--r--pw_log_rpc/public/pw_log_rpc/log_service.h40
-rw-r--r--pw_log_rpc/public/pw_log_rpc/logs_rpc.h55
-rw-r--r--pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h165
-rw-r--r--pw_log_rpc/public/pw_log_rpc/rpc_log_drain_map.h53
-rw-r--r--pw_log_rpc/public/pw_log_rpc/rpc_log_drain_thread.h58
-rw-r--r--pw_log_rpc/rpc_log_drain.cc149
11 files changed, 586 insertions, 297 deletions
diff --git a/pw_log_rpc/BUILD.bazel b/pw_log_rpc/BUILD.bazel
index 87e2ddab4..e570466b3 100644
--- a/pw_log_rpc/BUILD.bazel
+++ b/pw_log_rpc/BUILD.bazel
@@ -23,26 +23,52 @@ package(default_visibility = ["//visibility:public"])
licenses(["notice"])
pw_cc_library(
- name = "pw_logs",
- srcs = ["logs_rpc.cc"],
- hdrs = ["public/pw_log_rpc/logs_rpc.h"],
+ name = "log_service",
+ srcs = ["log_service.cc"],
+ hdrs = ["public/pw_log_rpc/log_service.h"],
includes = ["public"],
deps = [
- "//pw_bytes",
+ ":rpc_log_drain",
+ "//pw_log",
+ "//pw_log:protos.pwpb",
+ "//pw_log:protos.raw_rpc",
+ ],
+)
+
+pw_cc_library(
+ name = "rpc_log_drain",
+ srcs = ["rpc_log_drain.cc"],
+ hdrs = [
+ "public/pw_log_rpc/rpc_log_drain.h",
+ "public/pw_log_rpc/rpc_log_drain_map.h",
+ ],
+ includes = ["public"],
+ deps = [
+ "//pw_assert",
+ "//pw_log:protos.pwpb",
+ "//pw_log:protos.raw_rpc",
+ "//pw_multisink",
+ "//pw_protobuf",
"//pw_result",
- "//pw_ring_buffer",
"//pw_status",
+ "//pw_string",
+ "//pw_sync:lock_annotations",
+ "//pw_sync:mutex",
],
)
-pw_cc_test(
- name = "logs_rpc_test",
- srcs = [
- "logs_rpc_test.cc",
- ],
+pw_cc_library(
+ name = "rpc_log_drain_thread",
+ hdrs = ["public/pw_log_rpc/rpc_log_drain_thread.h"],
+ includes = ["public"],
deps = [
- ":pw_log_queue",
- "//pw_preprocessor",
- "//pw_unit_test",
+ ":rpc_log_drain",
+ "//pw_sync:thread_notification",
+ "//pw_thread:thread",
],
)
+
+# TODO(cachinchilla): implement tests.
+pw_cc_test(
+ name = "logs_rpc_test",
+)
diff --git a/pw_log_rpc/BUILD.gn b/pw_log_rpc/BUILD.gn
index b4c931eda..10f313959 100644
--- a/pw_log_rpc/BUILD.gn
+++ b/pw_log_rpc/BUILD.gn
@@ -23,26 +23,61 @@ config("default_config") {
visibility = [ ":*" ]
}
-pw_source_set("logs") {
+pw_source_set("log_service") {
public_configs = [ ":default_config" ]
- public = [ "public/pw_log_rpc/logs_rpc.h" ]
- sources = [ "logs_rpc.cc" ]
+ public = [ "public/pw_log_rpc/log_service.h" ]
+ sources = [ "log_service.cc" ]
+ deps = [
+ "$dir_pw_log",
+ "$dir_pw_log:protos.pwpb",
+ ]
public_deps = [
+ ":rpc_log_drain",
+ "$dir_pw_log:protos.raw_rpc",
+ ]
+}
+
+pw_source_set("rpc_log_drain") {
+ public_configs = [ ":default_config" ]
+ public = [
+ "public/pw_log_rpc/rpc_log_drain.h",
+ "public/pw_log_rpc/rpc_log_drain_map.h",
+ ]
+ sources = [ "rpc_log_drain.cc" ]
+ deps = [
+ "$dir_pw_log",
+ "$dir_pw_string",
+ ]
+ public_deps = [
+ "$dir_pw_assert",
"$dir_pw_log:protos.pwpb",
"$dir_pw_log:protos.raw_rpc",
+ "$dir_pw_multisink",
+ "$dir_pw_protobuf",
+ "$dir_pw_result",
+ "$dir_pw_status",
+ "$dir_pw_sync:lock_annotations",
+ "$dir_pw_sync:mutex",
+ ]
+}
+
+pw_source_set("rpc_log_drain_thread") {
+ public_configs = [ ":default_config" ]
+ public = [ "public/pw_log_rpc/rpc_log_drain_thread.h" ]
+ sources = []
+ public_deps = [
+ ":rpc_log_drain",
+ "$dir_pw_multisink",
+ "$dir_pw_sync:thread_notification",
+ "$dir_pw_thread:thread",
]
}
+# TODO(cachinchilla): implement tests.
pw_test("logs_rpc_test") {
- # TODO(cachinchilla): implement RPC log tests without pw_log_multisink when
- # ready.
- # deps = [
- # ":logs",
- # "$dir_pw_rpc/raw:test_method_context",
- # ]
- # sources = [ "logs_rpc_test.cc" ]
}
+# TODO(cachinchilla): update docs.
pw_doc_group("docs") {
sources = [ "docs.rst" ]
}
diff --git a/pw_log_rpc/log_service.cc b/pw_log_rpc/log_service.cc
new file mode 100644
index 000000000..509f51263
--- /dev/null
+++ b/pw_log_rpc/log_service.cc
@@ -0,0 +1,37 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_log_rpc/log_service.h"
+
+#include "pw_log/log.h"
+#include "pw_log/proto/log.pwpb.h"
+
+namespace pw::log_rpc {
+
+void LogService::Listen(ServerContext& context,
+ ConstByteSpan,
+ rpc::RawServerWriter& writer) {
+ uint32_t channel_id = context.channel_id();
+ Result<RpcLogDrain*> drain = drains_.GetDrainFromChannelId(channel_id);
+ if (!drain.ok()) {
+ return;
+ }
+
+ if (const Status status = drain.value()->Open(writer); !status.ok()) {
+ PW_LOG_ERROR("Could not start new log stream. %d",
+ static_cast<int>(status.code()));
+ }
+}
+
+} // namespace pw::log_rpc
diff --git a/pw_log_rpc/logs_rpc.cc b/pw_log_rpc/logs_rpc.cc
deleted file mode 100644
index 5630429d3..000000000
--- a/pw_log_rpc/logs_rpc.cc
+++ /dev/null
@@ -1,79 +0,0 @@
-// Copyright 2020 The Pigweed Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not
-// use this file except in compliance with the License. You may obtain a copy of
-// the License at
-//
-// https://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-// License for the specific language governing permissions and limitations under
-// the License.
-
-#include "pw_log_rpc/logs_rpc.h"
-
-#include "pw_log/log.h"
-#include "pw_log/proto/log.pwpb.h"
-#include "pw_status/try.h"
-
-namespace pw::log_rpc {
-namespace {
-
-// TODO(prashanthsw): Handle dropped messages.
-// Result<ConstByteSpan> GenerateDroppedEntryMessage(ByteSpan encode_buffer,
-// size_t dropped_entries) {
-// pw::log::LogEntry::MemoryEncoder encoder(encode_buffer);
-// encoder.WriteDropped(dropped_entries);
-// if (encoder.status().ok()) {
-// return ConstByteSpan(encoder);
-// }
-// return encoder.status();
-// }
-
-} // namespace
-
-void Logs::Get(ServerContext&, ConstByteSpan, rpc::RawServerWriter& writer) {
- response_writer_ = std::move(writer);
-}
-
-Status Logs::Flush() {
- // If the response writer was not initialized or has since been closed,
- // ignore the flush operation.
- if (!response_writer_.open()) {
- return OkStatus();
- }
-
- // If previous calls to flush resulted in dropped entries, generate a
- // dropped entry message and write it before further log messages.
- // TODO(prashanthsw): Handle dropped messages.
- // if (dropped_entries_ > 0) {
- // ByteSpan payload = response_writer_.PayloadBuffer();
- // Result dropped_log = GenerateDroppedEntryMessage(payload,
- // dropped_entries_); PW_TRY(dropped_log.status());
- // PW_TRY(response_writer_.Write(dropped_log.value()));
- // dropped_entries_ = 0;
- // }
-
- // Write logs to the response writer. An important limitation of this
- // implementation is that if this RPC call fails, the logs are lost -
- // a subsequent call to the RPC will produce a drop count message.
- ByteSpan payload = response_writer_.PayloadBuffer();
- Result possible_logs = log_queue_.PopMultiple(payload);
- PW_TRY(possible_logs.status());
- if (possible_logs.value().entry_count == 0) {
- return OkStatus();
- }
-
- Status status = response_writer_.Write(possible_logs.value().entries);
- if (!status.ok()) {
- // On a failure to send logs, track the dropped entries.
- dropped_entries_ = possible_logs.value().entry_count;
- return status;
- }
-
- return OkStatus();
-}
-
-} // namespace pw::log_rpc
diff --git a/pw_log_rpc/logs_rpc_test.cc b/pw_log_rpc/logs_rpc_test.cc
deleted file mode 100644
index 682931687..000000000
--- a/pw_log_rpc/logs_rpc_test.cc
+++ /dev/null
@@ -1,140 +0,0 @@
-// Copyright 2020 The Pigweed Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not
-// use this file except in compliance with the License. You may obtain a copy of
-// the License at
-//
-// https://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-// License for the specific language governing permissions and limitations under
-// the License.
-
-#include "pw_log_rpc/logs_rpc.h"
-
-#include "gtest/gtest.h"
-#include "pw_log/log.h"
-#include "pw_rpc/raw/test_method_context.h"
-
-namespace pw::log_rpc {
-namespace {
-
-#define LOGS_METHOD_CONTEXT PW_RAW_TEST_METHOD_CONTEXT(Logs, Get)
-
-constexpr size_t kEncodeBufferSize = 128;
-constexpr size_t kLogBufferSize = 4096;
-
-class LogQueueTester : public LogQueueWithEncodeBuffer<kLogBufferSize> {
- public:
- LogQueueTester(ByteSpan log_queue)
- : LogQueueWithEncodeBuffer<kLogBufferSize>(log_queue) {}
-
- void SetPopStatus(Status error_status) {
- pop_status_for_test_ = error_status;
- }
-};
-
-class LogsService : public ::testing::Test {
- public:
- LogsService() : log_queue_(log_queue_buffer_) {}
-
- protected:
- void AddLogs(const size_t log_count = 1) {
- constexpr char kTokenizedMessage[] = "message";
- for (size_t i = 0; i < log_count; i++) {
- EXPECT_EQ(
- OkStatus(),
- log_queue_.PushTokenizedMessage(
- std::as_bytes(std::span(kTokenizedMessage)), 0, 0, 0, 0, 0));
- }
- }
-
- static Logs& GetLogs(LOGS_METHOD_CONTEXT& context) {
- return (Logs&)(context.service());
- }
-
- std::array<std::byte, kEncodeBufferSize> log_queue_buffer_;
- LogQueueWithEncodeBuffer<kLogBufferSize> log_queue_;
-};
-
-TEST_F(LogsService, Get) {
- constexpr size_t kLogEntryCount = 3;
- std::array<std::byte, 1> rpc_buffer;
- LOGS_METHOD_CONTEXT context(log_queue_);
-
- context.call(rpc_buffer);
-
- // Flush all logs from the buffer, then close the RPC.
- AddLogs(kLogEntryCount);
- GetLogs(context)
- .Flush()
- .IgnoreError(); // TODO(pwbug/387): Handle Status properly
- GetLogs(context).Finish();
-
- EXPECT_TRUE(context.done());
- EXPECT_EQ(OkStatus(), context.status());
-
- // Although |kLogEntryCount| messages were in the queue, they are batched
- // before being written to the client, so there is only one response.
- EXPECT_EQ(1U, context.total_responses());
-}
-
-TEST_F(LogsService, GetMultiple) {
- constexpr size_t kLogEntryCount = 1;
- constexpr size_t kFlushCount = 3;
- std::array<std::byte, 1> rpc_buffer;
- LOGS_METHOD_CONTEXT context(log_queue_);
-
- context.call(rpc_buffer);
-
- for (size_t i = 0; i < kFlushCount; i++) {
- AddLogs(kLogEntryCount);
- GetLogs(context)
- .Flush()
- .IgnoreError(); // TODO(pwbug/387): Handle Status properly
- }
- GetLogs(context).Finish();
-
- EXPECT_TRUE(context.done());
- EXPECT_EQ(OkStatus(), context.status());
- EXPECT_EQ(kFlushCount, context.total_responses());
-}
-
-TEST_F(LogsService, NoEntriesOnEmptyQueue) {
- std::array<std::byte, 1> rpc_buffer;
- LOGS_METHOD_CONTEXT context(log_queue_);
-
- // Invoking flush with no logs in the queue should behave like a no-op.
- context.call(rpc_buffer);
- GetLogs(context)
- .Flush()
- .IgnoreError(); // TODO(pwbug/387): Handle Status properly
- GetLogs(context).Finish();
-
- EXPECT_TRUE(context.done());
- EXPECT_EQ(OkStatus(), context.status());
- EXPECT_EQ(0U, context.total_responses());
-}
-
-TEST_F(LogsService, QueueError) {
- std::array<std::byte, 1> rpc_buffer;
- LogQueueTester log_queue_tester(log_queue_buffer_);
- LOGS_METHOD_CONTEXT context(log_queue_tester);
-
- // Generate failure on log queue.
- log_queue_tester.SetPopStatus(Status::Internal());
- context.call(rpc_buffer);
- GetLogs(context)
- .Flush()
- .IgnoreError(); // TODO(pwbug/387): Handle Status properly
- GetLogs(context).Finish();
-
- EXPECT_TRUE(context.done());
- EXPECT_EQ(OkStatus(), context.status());
- EXPECT_EQ(0U, context.total_responses());
-}
-
-} // namespace
-} // namespace pw::log_rpc
diff --git a/pw_log_rpc/public/pw_log_rpc/log_service.h b/pw_log_rpc/public/pw_log_rpc/log_service.h
new file mode 100644
index 000000000..3cc648b03
--- /dev/null
+++ b/pw_log_rpc/public/pw_log_rpc/log_service.h
@@ -0,0 +1,40 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#pragma once
+
+#include "pw_log/proto/log.raw_rpc.pb.h"
+#include "pw_log_rpc/rpc_log_drain_map.h"
+
+namespace pw::log_rpc {
+
+// The RPC LogService provides a way to start a log stream on a known RPC
+// channel with a writer provided on a call. Log streams maintenance is flexible
+// and delegated outside the service.
+class LogService final : public log::generated::LogService<LogService> {
+ public:
+ LogService(RpcLogDrainMap& drains) : drains_(drains) {}
+
+ // Starts listening to logs on the given RPC channel and writer. The call is
+ // ignored if the channel was not pre-registered in the drain map. If there is
+ // an existent stream of logs for the given channel and previous writer, the
+ // writer in this call is closed without finishing the RPC call and the log
+ // stream using the previous writer continues.
+ void Listen(ServerContext&, ConstByteSpan, rpc::RawServerWriter& writer);
+
+ private:
+ RpcLogDrainMap& drains_;
+};
+
+} // namespace pw::log_rpc
diff --git a/pw_log_rpc/public/pw_log_rpc/logs_rpc.h b/pw_log_rpc/public/pw_log_rpc/logs_rpc.h
deleted file mode 100644
index a6fef55f4..000000000
--- a/pw_log_rpc/public/pw_log_rpc/logs_rpc.h
+++ /dev/null
@@ -1,55 +0,0 @@
-// Copyright 2020 The Pigweed Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not
-// use this file except in compliance with the License. You may obtain a copy of
-// the License at
-//
-// https://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-// License for the specific language governing permissions and limitations under
-// the License.
-
-#pragma once
-
-#include "pw_log/log.h"
-#include "pw_log/proto/log.raw_rpc.pb.h"
-#include "pw_log_multisink/log_queue.h"
-
-namespace pw::log_rpc {
-
-// The Logs RPC service will send logs when requested by Get(). For now, Get()
-// requests result in a stream of responses, containing all log entries from
-// the attached log queue.
-//
-// The Get() method will return logs in the current queue immediately, but
-// someone else is responsible for pumping the log queue using Flush().
-class Logs final : public pw::log::generated::Logs<Logs> {
- public:
- Logs(LogQueue& log_queue) : log_queue_(log_queue), dropped_entries_(0) {}
-
- // RPC API for the Logs that produces a log stream. This method will
- // return immediately, another class must call Flush() to push logs from
- // the queue to this stream.
- void Get(ServerContext&, ConstByteSpan, rpc::RawServerWriter& writer);
-
- // Interface for the owner of the service instance to flush all existing
- // logs to the writer, if one is attached.
- Status Flush();
-
- // Interface for the owner of the service instance to close the RPC, if
- // one is attached.
- void Finish() {
- response_writer_.Finish()
- .IgnoreError(); // TODO(pwbug/387): Handle Status properly
- }
-
- private:
- LogQueue& log_queue_;
- rpc::RawServerWriter response_writer_;
- size_t dropped_entries_;
-};
-
-} // namespace pw::log_rpc
diff --git a/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h
new file mode 100644
index 000000000..6933690bc
--- /dev/null
+++ b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h
@@ -0,0 +1,165 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#pragma once
+
+#include <array>
+#include <cstdint>
+
+#include "pw_assert/assert.h"
+#include "pw_bytes/span.h"
+#include "pw_log/proto/log.pwpb.h"
+#include "pw_multisink/multisink.h"
+#include "pw_protobuf/serialized_size.h"
+#include "pw_result/result.h"
+#include "pw_rpc/raw/server_reader_writer.h"
+#include "pw_status/status.h"
+#include "pw_sync/lock_annotations.h"
+#include "pw_sync/mutex.h"
+
+namespace pw::log_rpc {
+
+// RpcLogDrain matches a MultiSink::Drain with with an RPC channel's writer. A
+// RPC channel ID identifies this drain. The user must attach this drain
+// to a MultiSink that returns a log::LogEntry, and provide a buffer large
+// enough to hold the largest log::LogEntry transmittable. The user must call
+// Flush(), which, on every call, packs as many log::LogEntry items as possible
+// into a log::LogEntries message, writes the message to the provided writer,
+// then repeats the process until there are no more entries in the MultiSink or
+// the writer failed to write the outgoing package, in which case the RPC on
+// the writer is closed. When close_stream_on_writer_error is false the drain
+// will continue to retrieve log entries out of the MultiSink and attempt to
+// send them out ignoring the writer errors. Note: this behavior might change or
+// be removed in the future.
+class RpcLogDrain : public multisink::MultiSink::Drain {
+ public:
+ // The minimum buffer size, without the message payload, needed to retrieve a
+ // log::LogEntry from the attached MultiSink. The user must account for the
+ // max message size to avoid log entry drops.
+ static constexpr size_t kMinEntrySizeWithoutPayload =
+ // message
+ protobuf::SizeOfFieldKey(1) +
+ 1 // Assume minimum varint length, skip the payload bytes.
+ // line_level
+ + protobuf::SizeOfFieldKey(2) +
+ protobuf::kMaxSizeBytesUint32
+ // flags
+ + protobuf::SizeOfFieldKey(3) +
+ protobuf::kMaxSizeBytesUint32
+ // timestamp or time_since_last_entry
+ + protobuf::SizeOfFieldKey(4) + protobuf::kMaxSizeBytesInt64;
+ // Message format to report the drop count.
+ static constexpr char kDropMessageFormatString[] = "Dropped %u";
+ // With a uint32_t number, "Dropped %u" is no more than 18 characters long.
+ static constexpr size_t kMaxDropMessageSize = 18;
+ // The smallest buffer size must be able to fit a drop message.
+ static constexpr size_t kMinEntryBufferSize =
+ kMaxDropMessageSize + kMinEntrySizeWithoutPayload;
+
+ // Creates a log stream with the provided open writer. Useful for streaming
+ // logs without a request.
+ // The provided buffer must be large enough to hold the largest transmittable
+ // log::LogEntry or a drop count message at the very least. The user can
+ // choose to provide a unique mutex for the drain, or share it to save RAM as
+ // long as they are aware of contengency issues.
+ RpcLogDrain(uint32_t channel_id,
+ ByteSpan log_entry_buffer,
+ rpc::RawServerWriter writer,
+ sync::Mutex& mutex,
+ bool close_stream_on_writer_error)
+ : channel_id_(channel_id),
+ close_stream_on_writer_error_(close_stream_on_writer_error),
+ server_writer_(std::move(writer)),
+ log_entry_buffer_(log_entry_buffer),
+ committed_entry_drop_count_(0),
+ mutex_(mutex) {
+ PW_ASSERT(log_entry_buffer.size_bytes() >= kMinEntryBufferSize);
+ PW_ASSERT(writer.open());
+ }
+
+ // Creates a closed log stream with a writer that can be set at a later time.
+ // The provided buffer must be large enough to hold the largest transmittable
+ // log::LogEntry or a drop count message at the very least. The user can
+ // choose to provide a unique mutex for the drain, or share it to save RAM as
+ // long as they are aware of contengency issues.
+ RpcLogDrain(uint32_t channel_id,
+ ByteSpan log_entry_buffer,
+ sync::Mutex& mutex,
+ bool close_stream_on_writer_error)
+ : channel_id_(channel_id),
+ close_stream_on_writer_error_(close_stream_on_writer_error),
+ server_writer_(),
+ log_entry_buffer_(log_entry_buffer),
+ committed_entry_drop_count_(0),
+ mutex_(mutex) {
+ PW_ASSERT(log_entry_buffer.size_bytes() >= kMinEntryBufferSize);
+ }
+
+ // Not copyable.
+ RpcLogDrain(const RpcLogDrain&) = delete;
+ RpcLogDrain& operator=(const RpcLogDrain&) = delete;
+
+ // Configures the drain with a new open server writer if the current one is
+ // not open.
+ //
+ // Return values:
+ // OK - Successfully set the new open writer.
+ // FAILED_PRECONDITION - The given writer is not open.
+ // ALREADY_EXISTS - an open writer is already set.
+ Status Open(rpc::RawServerWriter& writer) PW_LOCKS_EXCLUDED(mutex_);
+
+ // Accesses log entries and sends them via the writer. Expected to be called
+ // frequently to avoid log drops. If the writer fails to send a packet with
+ // multiple log entries, the entries are dropped and a drop message with the
+ // count is sent. When close_stream_on_writer_error is set, the stream will
+ // automatically be closed and Flush will return the writer error.
+ //
+ // Precondition: the drain must be attached to a MultiSink.
+ //
+ // Return values:
+ // OK - all entries were consumed.
+ // ABORTED - there was an error writing the packet, and
+ // close_stream_on_writer_error is true.
+ Status Flush() PW_LOCKS_EXCLUDED(mutex_);
+
+ // Ends RPC log stream without flushing.
+ //
+ // Return values:
+ // OK - successfully closed the server writer.
+ // FAILED_PRECONDITION - The given writer is not open.
+ // Errors from the underlying writer send packet.
+ Status Close() PW_LOCKS_EXCLUDED(mutex_);
+
+ uint32_t channel_id() const { return channel_id_; }
+
+ private:
+ enum class LogDrainState {
+ kCaughtUp,
+ kMoreEntriesRemaining,
+ };
+
+ // Fills the outgoing buffer with as many entries as possible.
+ LogDrainState EncodeOutgoingPacket(log::LogEntries::MemoryEncoder& encoder,
+ uint32_t packed_entry_count_out)
+ PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
+
+ const uint32_t channel_id_;
+ const bool close_stream_on_writer_error_;
+ rpc::RawServerWriter server_writer_ PW_GUARDED_BY(mutex_);
+ const ByteSpan log_entry_buffer_ PW_GUARDED_BY(mutex_);
+ uint32_t committed_entry_drop_count_ PW_GUARDED_BY(mutex_);
+ sync::Mutex& mutex_;
+};
+
+} // namespace pw::log_rpc
diff --git a/pw_log_rpc/public/pw_log_rpc/rpc_log_drain_map.h b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain_map.h
new file mode 100644
index 000000000..d21c37bef
--- /dev/null
+++ b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain_map.h
@@ -0,0 +1,53 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#pragma once
+
+#include <span>
+
+#include "pw_log_rpc/rpc_log_drain.h"
+#include "pw_result/result.h"
+#include "pw_status/status.h"
+
+namespace pw::log_rpc {
+
+// Holds an inmutable map of RPC channel ID to RpcLogDrain to fascilitate the
+// maintenance of all RPC log streams.
+class RpcLogDrainMap {
+ public:
+ explicit constexpr RpcLogDrainMap(std::span<RpcLogDrain> drains)
+ : drains_(drains) {}
+
+ // Not copyable nor movable.
+ RpcLogDrainMap(RpcLogDrainMap const&) = delete;
+ RpcLogDrainMap& operator=(RpcLogDrainMap const&) = delete;
+ RpcLogDrainMap(RpcLogDrainMap&&) = delete;
+ RpcLogDrainMap& operator=(RpcLogDrainMap&&) = delete;
+
+ Result<RpcLogDrain*> GetDrainFromChannelId(uint32_t channel_id) const {
+ for (auto& drain : drains_) {
+ if (drain.channel_id() == channel_id) {
+ return &drain;
+ }
+ }
+ return Status::NotFound();
+ }
+
+ const std::span<RpcLogDrain>& drains() const { return drains_; }
+
+ protected:
+ const std::span<RpcLogDrain> drains_;
+};
+
+} // namespace pw::log_rpc
diff --git a/pw_log_rpc/public/pw_log_rpc/rpc_log_drain_thread.h b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain_thread.h
new file mode 100644
index 000000000..a4160cbba
--- /dev/null
+++ b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain_thread.h
@@ -0,0 +1,58 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#pragma once
+
+#include "pw_log_rpc/rpc_log_drain_map.h"
+#include "pw_multisink/multisink.h"
+#include "pw_sync/thread_notification.h"
+#include "pw_thread/thread_core.h"
+
+namespace pw::log_rpc {
+
+// RpcLogDrainThread is a single thread and single MultiSink::Listener that
+// manages multiple log streams. It is a suitable option when a minimal
+// thread count is desired but comes with the cost of individual log streams
+// blocking each other's flushing.
+class RpcLogDrainThread final : public thread::ThreadCore,
+ public multisink::MultiSink::Listener {
+ public:
+ RpcLogDrainThread(multisink::MultiSink& multisink, RpcLogDrainMap& drain_map)
+ : drain_map_(drain_map), multisink_(multisink) {}
+
+ void OnNewEntryAvailable() override {
+ new_log_available_notification_.release();
+ }
+
+ // Sequentially flushes each log stream.
+ void Run() override {
+ for (auto& drain : drain_map_.drains()) {
+ multisink_.AttachDrain(drain);
+ }
+ multisink_.AttachListener(*this);
+ while (true) {
+ new_log_available_notification_.acquire();
+ for (auto& drain : drain_map_.drains()) {
+ drain.Flush().IgnoreError();
+ }
+ }
+ }
+
+ private:
+ sync::ThreadNotification new_log_available_notification_;
+ RpcLogDrainMap& drain_map_;
+ multisink::MultiSink& multisink_;
+};
+
+} // namespace pw::log_rpc
diff --git a/pw_log_rpc/rpc_log_drain.cc b/pw_log_rpc/rpc_log_drain.cc
new file mode 100644
index 000000000..871cbe894
--- /dev/null
+++ b/pw_log_rpc/rpc_log_drain.cc
@@ -0,0 +1,149 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_log_rpc/rpc_log_drain.h"
+
+#include <mutex>
+
+#include "pw_assert/check.h"
+#include "pw_log/log.h"
+#include "pw_string/string_builder.h"
+
+namespace pw::log_rpc {
+namespace {
+// When encoding LogEntry in LogEntries, there are kLogEntryEncodeFrameSize
+// bytes added to the encoded LogEntry.
+constexpr size_t kLogEntryEncodeFrameSize =
+ protobuf::SizeOfFieldKey(1) // LogEntry
+ + protobuf::kMaxSizeOfLength;
+
+// Creates an encoded drop message on the provided buffer.
+Result<ConstByteSpan> CreateEncodedDropMessage(
+ uint32_t drop_count, ByteSpan encoded_drop_message_buffer) {
+ StringBuffer<RpcLogDrain::kMaxDropMessageSize> message;
+ message.Format(RpcLogDrain::kDropMessageFormatString,
+ static_cast<unsigned int>(drop_count));
+
+ // Encode message in protobuf.
+ log::LogEntry::MemoryEncoder encoder(encoded_drop_message_buffer);
+ encoder.WriteMessage(std::as_bytes(std::span(std::string_view(message))));
+ encoder.WriteLineLevel(PW_LOG_LEVEL_WARN & PW_LOG_LEVEL_BITMASK);
+ PW_TRY(encoder.status());
+ return ConstByteSpan(encoder);
+}
+} // namespace
+
+Status RpcLogDrain::Open(rpc::RawServerWriter& writer) {
+ if (!writer.open()) {
+ return Status::FailedPrecondition();
+ }
+ std::lock_guard lock(mutex_);
+ if (server_writer_.open()) {
+ return Status::AlreadyExists();
+ }
+ server_writer_ = std::move(writer);
+ return OkStatus();
+}
+
+Status RpcLogDrain::Flush() {
+ PW_CHECK_NOTNULL(multisink_);
+
+ LogDrainState log_sink_state = LogDrainState::kMoreEntriesRemaining;
+ std::lock_guard lock(mutex_);
+ do {
+ if (!server_writer_.open()) {
+ return Status::Unavailable();
+ }
+ log::LogEntries::MemoryEncoder encoder(server_writer_.PayloadBuffer());
+ uint32_t packed_entry_count = 0;
+ log_sink_state = EncodeOutgoingPacket(encoder, packed_entry_count);
+ if (const Status status = server_writer_.Write(encoder); !status.ok()) {
+ committed_entry_drop_count_ += packed_entry_count;
+ if (close_stream_on_writer_error_) {
+ server_writer_.Finish().IgnoreError();
+ return Status::Aborted();
+ }
+ }
+ } while (log_sink_state == LogDrainState::kMoreEntriesRemaining);
+ return OkStatus();
+}
+
+RpcLogDrain::LogDrainState RpcLogDrain::EncodeOutgoingPacket(
+ log::LogEntries::MemoryEncoder& encoder, uint32_t packed_entry_count_out) {
+ const size_t total_buffer_size = encoder.ConservativeWriteLimit();
+ do {
+ // Get entry and drop count from drain.
+ uint32_t drop_count = 0;
+ Result<multisink::MultiSink::Drain::PeekedEntry> possible_entry =
+ PeekEntry(log_entry_buffer_, drop_count);
+ if (possible_entry.status().IsResourceExhausted() ||
+ possible_entry.status().IsDataLoss()) {
+ continue;
+ }
+
+ // Report drop count if messages were dropped.
+ if (committed_entry_drop_count_ > 0 || drop_count > 0) {
+ // Reuse the log_entry_buffer_ to send a drop message.
+ const Result<ConstByteSpan> drop_message_result =
+ CreateEncodedDropMessage(committed_entry_drop_count_ + drop_count,
+ log_entry_buffer_);
+ // Add encoded drop messsage if fits in buffer.
+ if (drop_message_result.ok() &&
+ drop_message_result.value().size() + kLogEntryEncodeFrameSize <
+ encoder.ConservativeWriteLimit()) {
+ PW_CHECK_OK(encoder.WriteBytes(
+ static_cast<uint32_t>(log::LogEntries::Fields::ENTRIES),
+ drop_message_result.value()));
+ committed_entry_drop_count_ = 0;
+ }
+ if (possible_entry.ok()) {
+ PW_CHECK_OK(PeekEntry(log_entry_buffer_, drop_count).status());
+ }
+ }
+
+ if (possible_entry.status().IsOutOfRange()) {
+ return LogDrainState::kCaughtUp; // There are no more entries.
+ }
+ // At this point all expected error modes have been handled.
+ PW_CHECK_OK(possible_entry.status());
+
+ // Check if the entry fits in encoder buffer.
+ const size_t encoded_entry_size =
+ possible_entry.value().entry().size() + kLogEntryEncodeFrameSize;
+ if (encoded_entry_size + kLogEntryEncodeFrameSize > total_buffer_size) {
+ // Entry is larger than the entire available buffer.
+ ++committed_entry_drop_count_;
+ PW_CHECK_OK(PopEntry(possible_entry.value()));
+ continue;
+ } else if (encoded_entry_size > encoder.ConservativeWriteLimit()) {
+ // Entry does not fit in the partially filled encoder buffer. Notify the
+ // caller there are more entries to send.
+ return LogDrainState::kMoreEntriesRemaining;
+ }
+
+ // Encode log entry and remove it from multisink.
+ PW_CHECK_OK(encoder.WriteBytes(
+ static_cast<uint32_t>(log::LogEntries::Fields::ENTRIES),
+ possible_entry.value().entry()));
+ PW_CHECK_OK(PopEntry(possible_entry.value()));
+ ++packed_entry_count_out;
+ } while (true);
+}
+
+Status RpcLogDrain::Close() {
+ std::lock_guard lock(mutex_);
+ return server_writer_.Finish();
+}
+
+} // namespace pw::log_rpc