aboutsummaryrefslogtreecommitdiff
path: root/pw_log_rpc/rpc_log_drain.cc
diff options
context:
space:
mode:
Diffstat (limited to 'pw_log_rpc/rpc_log_drain.cc')
-rw-r--r--pw_log_rpc/rpc_log_drain.cc145
1 files changed, 104 insertions, 41 deletions
diff --git a/pw_log_rpc/rpc_log_drain.cc b/pw_log_rpc/rpc_log_drain.cc
index 05884f404..fe12ba0c8 100644
--- a/pw_log_rpc/rpc_log_drain.cc
+++ b/pw_log_rpc/rpc_log_drain.cc
@@ -17,6 +17,8 @@
#include <limits>
#include <mutex>
#include <optional>
+#include <span>
+#include <string_view>
#include "pw_assert/check.h"
#include "pw_chrono/system_clock.h"
@@ -29,14 +31,29 @@
namespace pw::log_rpc {
namespace {
-// Creates an encoded drop message on the provided buffer.
-Result<ConstByteSpan> CreateEncodedDropMessage(
- uint32_t drop_count, ByteSpan encoded_drop_message_buffer) {
- // Encode message in protobuf.
+// Creates an encoded drop message on the provided buffer and adds it to the
+// bulk log entries. Resets the drop count when successfull.
+void TryEncodeDropMessage(ByteSpan encoded_drop_message_buffer,
+ std::string_view reason,
+ uint32_t& drop_count,
+ log::LogEntries::MemoryEncoder& entries_encoder) {
+ // Encode drop count and reason, if any, in log proto.
log::LogEntry::MemoryEncoder encoder(encoded_drop_message_buffer);
- encoder.WriteDropped(drop_count);
- PW_TRY(encoder.status());
- return ConstByteSpan(encoder);
+ if (!reason.empty()) {
+ encoder.WriteMessage(std::as_bytes(std::span(reason))).IgnoreError();
+ }
+ encoder.WriteDropped(drop_count).IgnoreError();
+ if (!encoder.status().ok()) {
+ return;
+ }
+ // Add encoded drop messsage if fits in buffer.
+ ConstByteSpan drop_message(encoder);
+ if (drop_message.size() + RpcLogDrain::kLogEntriesEncodeFrameSize <
+ entries_encoder.ConservativeWriteLimit()) {
+ PW_CHECK_OK(entries_encoder.WriteBytes(
+ static_cast<uint32_t>(log::LogEntries::Fields::ENTRIES), drop_message));
+ drop_count = 0;
+ }
}
} // namespace
@@ -50,6 +67,9 @@ Status RpcLogDrain::Open(rpc::RawServerWriter& writer) {
return Status::AlreadyExists();
}
server_writer_ = std::move(writer);
+ if (on_open_callback_ != nullptr) {
+ on_open_callback_();
+ }
return OkStatus();
}
@@ -102,7 +122,8 @@ RpcLogDrain::LogDrainState RpcLogDrain::SendLogs(size_t max_num_bundles,
continue;
}
- encoder.WriteFirstEntrySequenceId(sequence_id_);
+ encoder.WriteFirstEntrySequenceId(sequence_id_)
+ .IgnoreError(); // TODO(pwbug/387): Handle Status properly
sequence_id_ += packed_entry_count;
const Status status = server_writer_.Write(encoder);
sent_bundle_count++;
@@ -110,7 +131,7 @@ RpcLogDrain::LogDrainState RpcLogDrain::SendLogs(size_t max_num_bundles,
if (!status.ok() &&
error_handling_ == LogDrainErrorHandling::kCloseStreamOnWriterError) {
// Only update this drop count when writer errors are not ignored.
- committed_entry_drop_count_ += packed_entry_count;
+ drop_count_writer_error_ += packed_entry_count;
server_writer_.Finish().IgnoreError();
encoding_status_out = Status::Aborted();
return log_sink_state;
@@ -123,64 +144,106 @@ RpcLogDrain::LogDrainState RpcLogDrain::EncodeOutgoingPacket(
log::LogEntries::MemoryEncoder& encoder, uint32_t& packed_entry_count_out) {
const size_t total_buffer_size = encoder.ConservativeWriteLimit();
do {
- // Get entry and drop count from drain.
+ // Peek entry and get drop count from multisink.
uint32_t drop_count = 0;
+ uint32_t ingress_drop_count = 0;
Result<multisink::MultiSink::Drain::PeekedEntry> possible_entry =
- PeekEntry(log_entry_buffer_, drop_count);
+ PeekEntry(log_entry_buffer_, drop_count, ingress_drop_count);
+ drop_count_ingress_error_ += ingress_drop_count;
+
+ // Check if the entry fits in the entry buffer.
if (possible_entry.status().IsResourceExhausted()) {
+ ++drop_count_small_stack_buffer_;
continue;
}
- // Report drop count if messages were dropped.
- if (committed_entry_drop_count_ > 0 || drop_count > 0) {
- // Reuse the log_entry_buffer_ to send a drop message.
- const Result<ConstByteSpan> drop_message_result =
- CreateEncodedDropMessage(committed_entry_drop_count_ + drop_count,
- log_entry_buffer_);
- // Add encoded drop messsage if fits in buffer.
- if (drop_message_result.ok() &&
- drop_message_result.value().size() + kLogEntriesEncodeFrameSize <
- encoder.ConservativeWriteLimit()) {
- PW_CHECK_OK(encoder.WriteBytes(
- static_cast<uint32_t>(log::LogEntries::Fields::ENTRIES),
- drop_message_result.value()));
- committed_entry_drop_count_ = 0;
- }
- if (possible_entry.ok()) {
- PW_CHECK_OK(PeekEntry(log_entry_buffer_, drop_count).status());
- }
- }
-
+ // Check if there are any entries left.
if (possible_entry.status().IsOutOfRange()) {
+ // Stash multisink's reported drop count that will be reported later with
+ // any other drop counts.
+ drop_count_slow_drain_ += drop_count;
return LogDrainState::kCaughtUp; // There are no more entries.
}
- // At this point all expected error modes have been handled.
+ // At this point all expected errors have been handled.
PW_CHECK_OK(possible_entry.status());
- // TODO(pwbug/559): avoid sending multiple drop counts between filtered out
- // log entries.
+ // Check if the entry passes any set filter rules.
if (filter_ != nullptr &&
filter_->ShouldDropLog(possible_entry.value().entry())) {
+ // Add the drop count from the multisink peek, stored in `drop_count`, to
+ // the total drop count. Then drop the entry without counting it towards
+ // the total drop count. Drops will be reported later all together.
+ drop_count_slow_drain_ += drop_count;
PW_CHECK_OK(PopEntry(possible_entry.value()));
- return LogDrainState::kMoreEntriesRemaining;
+ continue;
}
- // Check if the entry fits in encoder buffer.
+ // Check if the entry fits in the encoder buffer by itself.
const size_t encoded_entry_size =
possible_entry.value().entry().size() + kLogEntriesEncodeFrameSize;
if (encoded_entry_size + kLogEntriesEncodeFrameSize > total_buffer_size) {
// Entry is larger than the entire available buffer.
- ++committed_entry_drop_count_;
+ ++drop_count_small_outbound_buffer_;
PW_CHECK_OK(PopEntry(possible_entry.value()));
continue;
- } else if (encoded_entry_size > encoder.ConservativeWriteLimit()) {
- // Entry does not fit in the partially filled encoder buffer. Notify the
- // caller there are more entries to send.
+ }
+
+ // At this point, we have a valid entry that may fit in the encode buffer.
+ // Report any drop counts combined reusing the log_entry_buffer_ to encode a
+ // drop message.
+ drop_count_slow_drain_ += drop_count;
+ // Account for dropped entries too large for stack buffer, which PeekEntry()
+ // also reports.
+ drop_count_slow_drain_ -= drop_count_small_stack_buffer_;
+ bool log_entry_buffer_has_valid_entry = possible_entry.ok();
+ if (drop_count_slow_drain_ > 0) {
+ TryEncodeDropMessage(log_entry_buffer_,
+ std::string_view(kSlowDrainErrorMessage),
+ drop_count_slow_drain_,
+ encoder);
+ log_entry_buffer_has_valid_entry = false;
+ }
+ if (drop_count_ingress_error_ > 0) {
+ TryEncodeDropMessage(log_entry_buffer_,
+ std::string_view(kIngressErrorMessage),
+ drop_count_ingress_error_,
+ encoder);
+ log_entry_buffer_has_valid_entry = false;
+ }
+ if (drop_count_small_stack_buffer_ > 0) {
+ TryEncodeDropMessage(log_entry_buffer_,
+ std::string_view(kSmallStackBufferErrorMessage),
+ drop_count_small_stack_buffer_,
+ encoder);
+ log_entry_buffer_has_valid_entry = false;
+ }
+ if (drop_count_small_outbound_buffer_ > 0) {
+ TryEncodeDropMessage(log_entry_buffer_,
+ std::string_view(kSmallOutboundBufferErrorMessage),
+ drop_count_small_outbound_buffer_,
+ encoder);
+ log_entry_buffer_has_valid_entry = false;
+ }
+ if (drop_count_writer_error_ > 0) {
+ TryEncodeDropMessage(log_entry_buffer_,
+ std::string_view(kWriterErrorMessage),
+ drop_count_writer_error_,
+ encoder);
+ log_entry_buffer_has_valid_entry = false;
+ }
+ if (possible_entry.ok() && !log_entry_buffer_has_valid_entry) {
+ PW_CHECK_OK(PeekEntry(log_entry_buffer_, drop_count, ingress_drop_count)
+ .status());
+ }
+
+ // Check if the entry fits in the partially filled encoder buffer.
+ if (encoded_entry_size > encoder.ConservativeWriteLimit()) {
+ // Notify the caller there are more entries to send.
return LogDrainState::kMoreEntriesRemaining;
}
- // Encode log entry and remove it from multisink.
+ // Encode the entry and remove it from multisink.
PW_CHECK_OK(encoder.WriteBytes(
static_cast<uint32_t>(log::LogEntries::Fields::ENTRIES),
possible_entry.value().entry()));