diff options
Diffstat (limited to 'pw_log_rpc/rpc_log_drain.cc')
-rw-r--r-- | pw_log_rpc/rpc_log_drain.cc | 145 |
1 files changed, 104 insertions, 41 deletions
diff --git a/pw_log_rpc/rpc_log_drain.cc b/pw_log_rpc/rpc_log_drain.cc index 05884f404..fe12ba0c8 100644 --- a/pw_log_rpc/rpc_log_drain.cc +++ b/pw_log_rpc/rpc_log_drain.cc @@ -17,6 +17,8 @@ #include <limits> #include <mutex> #include <optional> +#include <span> +#include <string_view> #include "pw_assert/check.h" #include "pw_chrono/system_clock.h" @@ -29,14 +31,29 @@ namespace pw::log_rpc { namespace { -// Creates an encoded drop message on the provided buffer. -Result<ConstByteSpan> CreateEncodedDropMessage( - uint32_t drop_count, ByteSpan encoded_drop_message_buffer) { - // Encode message in protobuf. +// Creates an encoded drop message on the provided buffer and adds it to the +// bulk log entries. Resets the drop count when successfull. +void TryEncodeDropMessage(ByteSpan encoded_drop_message_buffer, + std::string_view reason, + uint32_t& drop_count, + log::LogEntries::MemoryEncoder& entries_encoder) { + // Encode drop count and reason, if any, in log proto. log::LogEntry::MemoryEncoder encoder(encoded_drop_message_buffer); - encoder.WriteDropped(drop_count); - PW_TRY(encoder.status()); - return ConstByteSpan(encoder); + if (!reason.empty()) { + encoder.WriteMessage(std::as_bytes(std::span(reason))).IgnoreError(); + } + encoder.WriteDropped(drop_count).IgnoreError(); + if (!encoder.status().ok()) { + return; + } + // Add encoded drop messsage if fits in buffer. + ConstByteSpan drop_message(encoder); + if (drop_message.size() + RpcLogDrain::kLogEntriesEncodeFrameSize < + entries_encoder.ConservativeWriteLimit()) { + PW_CHECK_OK(entries_encoder.WriteBytes( + static_cast<uint32_t>(log::LogEntries::Fields::ENTRIES), drop_message)); + drop_count = 0; + } } } // namespace @@ -50,6 +67,9 @@ Status RpcLogDrain::Open(rpc::RawServerWriter& writer) { return Status::AlreadyExists(); } server_writer_ = std::move(writer); + if (on_open_callback_ != nullptr) { + on_open_callback_(); + } return OkStatus(); } @@ -102,7 +122,8 @@ RpcLogDrain::LogDrainState RpcLogDrain::SendLogs(size_t max_num_bundles, continue; } - encoder.WriteFirstEntrySequenceId(sequence_id_); + encoder.WriteFirstEntrySequenceId(sequence_id_) + .IgnoreError(); // TODO(pwbug/387): Handle Status properly sequence_id_ += packed_entry_count; const Status status = server_writer_.Write(encoder); sent_bundle_count++; @@ -110,7 +131,7 @@ RpcLogDrain::LogDrainState RpcLogDrain::SendLogs(size_t max_num_bundles, if (!status.ok() && error_handling_ == LogDrainErrorHandling::kCloseStreamOnWriterError) { // Only update this drop count when writer errors are not ignored. - committed_entry_drop_count_ += packed_entry_count; + drop_count_writer_error_ += packed_entry_count; server_writer_.Finish().IgnoreError(); encoding_status_out = Status::Aborted(); return log_sink_state; @@ -123,64 +144,106 @@ RpcLogDrain::LogDrainState RpcLogDrain::EncodeOutgoingPacket( log::LogEntries::MemoryEncoder& encoder, uint32_t& packed_entry_count_out) { const size_t total_buffer_size = encoder.ConservativeWriteLimit(); do { - // Get entry and drop count from drain. + // Peek entry and get drop count from multisink. uint32_t drop_count = 0; + uint32_t ingress_drop_count = 0; Result<multisink::MultiSink::Drain::PeekedEntry> possible_entry = - PeekEntry(log_entry_buffer_, drop_count); + PeekEntry(log_entry_buffer_, drop_count, ingress_drop_count); + drop_count_ingress_error_ += ingress_drop_count; + + // Check if the entry fits in the entry buffer. if (possible_entry.status().IsResourceExhausted()) { + ++drop_count_small_stack_buffer_; continue; } - // Report drop count if messages were dropped. - if (committed_entry_drop_count_ > 0 || drop_count > 0) { - // Reuse the log_entry_buffer_ to send a drop message. - const Result<ConstByteSpan> drop_message_result = - CreateEncodedDropMessage(committed_entry_drop_count_ + drop_count, - log_entry_buffer_); - // Add encoded drop messsage if fits in buffer. - if (drop_message_result.ok() && - drop_message_result.value().size() + kLogEntriesEncodeFrameSize < - encoder.ConservativeWriteLimit()) { - PW_CHECK_OK(encoder.WriteBytes( - static_cast<uint32_t>(log::LogEntries::Fields::ENTRIES), - drop_message_result.value())); - committed_entry_drop_count_ = 0; - } - if (possible_entry.ok()) { - PW_CHECK_OK(PeekEntry(log_entry_buffer_, drop_count).status()); - } - } - + // Check if there are any entries left. if (possible_entry.status().IsOutOfRange()) { + // Stash multisink's reported drop count that will be reported later with + // any other drop counts. + drop_count_slow_drain_ += drop_count; return LogDrainState::kCaughtUp; // There are no more entries. } - // At this point all expected error modes have been handled. + // At this point all expected errors have been handled. PW_CHECK_OK(possible_entry.status()); - // TODO(pwbug/559): avoid sending multiple drop counts between filtered out - // log entries. + // Check if the entry passes any set filter rules. if (filter_ != nullptr && filter_->ShouldDropLog(possible_entry.value().entry())) { + // Add the drop count from the multisink peek, stored in `drop_count`, to + // the total drop count. Then drop the entry without counting it towards + // the total drop count. Drops will be reported later all together. + drop_count_slow_drain_ += drop_count; PW_CHECK_OK(PopEntry(possible_entry.value())); - return LogDrainState::kMoreEntriesRemaining; + continue; } - // Check if the entry fits in encoder buffer. + // Check if the entry fits in the encoder buffer by itself. const size_t encoded_entry_size = possible_entry.value().entry().size() + kLogEntriesEncodeFrameSize; if (encoded_entry_size + kLogEntriesEncodeFrameSize > total_buffer_size) { // Entry is larger than the entire available buffer. - ++committed_entry_drop_count_; + ++drop_count_small_outbound_buffer_; PW_CHECK_OK(PopEntry(possible_entry.value())); continue; - } else if (encoded_entry_size > encoder.ConservativeWriteLimit()) { - // Entry does not fit in the partially filled encoder buffer. Notify the - // caller there are more entries to send. + } + + // At this point, we have a valid entry that may fit in the encode buffer. + // Report any drop counts combined reusing the log_entry_buffer_ to encode a + // drop message. + drop_count_slow_drain_ += drop_count; + // Account for dropped entries too large for stack buffer, which PeekEntry() + // also reports. + drop_count_slow_drain_ -= drop_count_small_stack_buffer_; + bool log_entry_buffer_has_valid_entry = possible_entry.ok(); + if (drop_count_slow_drain_ > 0) { + TryEncodeDropMessage(log_entry_buffer_, + std::string_view(kSlowDrainErrorMessage), + drop_count_slow_drain_, + encoder); + log_entry_buffer_has_valid_entry = false; + } + if (drop_count_ingress_error_ > 0) { + TryEncodeDropMessage(log_entry_buffer_, + std::string_view(kIngressErrorMessage), + drop_count_ingress_error_, + encoder); + log_entry_buffer_has_valid_entry = false; + } + if (drop_count_small_stack_buffer_ > 0) { + TryEncodeDropMessage(log_entry_buffer_, + std::string_view(kSmallStackBufferErrorMessage), + drop_count_small_stack_buffer_, + encoder); + log_entry_buffer_has_valid_entry = false; + } + if (drop_count_small_outbound_buffer_ > 0) { + TryEncodeDropMessage(log_entry_buffer_, + std::string_view(kSmallOutboundBufferErrorMessage), + drop_count_small_outbound_buffer_, + encoder); + log_entry_buffer_has_valid_entry = false; + } + if (drop_count_writer_error_ > 0) { + TryEncodeDropMessage(log_entry_buffer_, + std::string_view(kWriterErrorMessage), + drop_count_writer_error_, + encoder); + log_entry_buffer_has_valid_entry = false; + } + if (possible_entry.ok() && !log_entry_buffer_has_valid_entry) { + PW_CHECK_OK(PeekEntry(log_entry_buffer_, drop_count, ingress_drop_count) + .status()); + } + + // Check if the entry fits in the partially filled encoder buffer. + if (encoded_entry_size > encoder.ConservativeWriteLimit()) { + // Notify the caller there are more entries to send. return LogDrainState::kMoreEntriesRemaining; } - // Encode log entry and remove it from multisink. + // Encode the entry and remove it from multisink. PW_CHECK_OK(encoder.WriteBytes( static_cast<uint32_t>(log::LogEntries::Fields::ENTRIES), possible_entry.value().entry())); |