Diffstat (limited to 'grpc/src/core/ext/filters/client_channel/retry_filter.cc')
-rw-r--r-- | grpc/src/core/ext/filters/client_channel/retry_filter.cc | 2188 |
1 file changed, 2188 insertions, 0 deletions
diff --git a/grpc/src/core/ext/filters/client_channel/retry_filter.cc b/grpc/src/core/ext/filters/client_channel/retry_filter.cc new file mode 100644 index 00000000..a03d2da1 --- /dev/null +++ b/grpc/src/core/ext/filters/client_channel/retry_filter.cc @@ -0,0 +1,2188 @@ +// +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include <grpc/support/port_platform.h> + +#include "src/core/ext/filters/client_channel/retry_filter.h" + +#include "absl/container/inlined_vector.h" +#include "absl/status/statusor.h" +#include "absl/strings/strip.h" + +#include <grpc/support/log.h> + +#include "src/core/ext/filters/client_channel/client_channel.h" +#include "src/core/ext/filters/client_channel/retry_service_config.h" +#include "src/core/ext/filters/client_channel/retry_throttle.h" +#include "src/core/ext/filters/client_channel/service_config.h" +#include "src/core/ext/filters/client_channel/service_config_call_data.h" +#include "src/core/lib/backoff/backoff.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/channel/status_util.h" +#include "src/core/lib/gprpp/manual_constructor.h" +#include "src/core/lib/iomgr/polling_entity.h" +#include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/slice/slice_string_helpers.h" +#include "src/core/lib/transport/error_utils.h" +#include "src/core/lib/transport/metadata.h" +#include "src/core/lib/transport/metadata_batch.h" +#include "src/core/lib/transport/static_metadata.h" +#include "src/core/lib/transport/status_metadata.h" +#include "src/core/lib/uri/uri_parser.h" + +// +// Retry filter +// + +// This filter is intended to be used in the DynamicFilter stack in the +// client channel, which is situated between the name resolver and the +// LB policy. Normally, the last filter in the DynamicFilter stack is +// the DynamicTerminationFilter (see client_channel.cc), which creates a +// LoadBalancedCall and delegates to it. However, when retries are +// enabled, this filter is used instead of the DynamicTerminationFilter. +// +// In order to support retries, we act as a proxy for stream op batches. +// When we get a batch from the surface, we add it to our list of pending +// batches, and we then use those batches to construct separate "child" +// batches to be started on an LB call. When the child batches return, we +// then decide which pending batches have been completed and schedule their +// callbacks accordingly. If a call attempt fails and we want to retry it, +// we create a new LB call and start again, constructing new "child" batches +// for the new LB call. +// +// Note that retries are committed when receiving data from the server +// (except for Trailers-Only responses). However, there may be many +// send ops started before receiving any data, so we may have already +// completed some number of send ops (and returned the completions up to +// the surface) by the time we realize that we need to retry. 
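+// (For example, on a long-lived streaming call, send_initial_metadata and +// two send_message ops may already have completed, with their results +// returned to the surface, before the first recv op fails with a +// retryable status.)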
To deal +// with this, we cache data for send ops, so that we can replay them on a +// different LB call even after we have completed the original batches. +// +// The code is structured as follows: +// - In CallData (in the parent channel), we maintain a list of pending +// ops and cached data for send ops. +// - There is a CallData::CallAttempt object for each retry attempt. +// This object contains the LB call for that attempt and state to indicate +// which ops from the CallData object have already been sent down to that +// LB call. +// - There is a CallData::CallAttempt::BatchData object for each "child" +// batch sent on the LB call. +// +// When constructing the "child" batches, we compare the state in the +// CallAttempt object against the state in the CallData object to see +// which batches need to be sent on the LB call for a given attempt. + +// TODO(roth): In subsequent PRs: +// - add support for transparent retries (including initial metadata) +// - figure out how to record stats in census for retries +// (census filter is on top of this one) +// - add census stats for retries + +// By default, we buffer 256 KiB per RPC for retries. +// TODO(roth): Do we have any data to suggest a better value? +#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10) + +// This value was picked arbitrarily. It can be changed if there is +// any even moderately compelling reason to do so. +#define RETRY_BACKOFF_JITTER 0.2 + +namespace grpc_core { + +namespace { + +using internal::RetryGlobalConfig; +using internal::RetryMethodConfig; +using internal::RetryServiceConfigParser; +using internal::ServerRetryThrottleData; + +TraceFlag grpc_retry_trace(false, "retry"); + +// +// RetryFilter +// + +class RetryFilter { + public: + class CallData; + + static grpc_error_handle Init(grpc_channel_element* elem, + grpc_channel_element_args* args) { + GPR_ASSERT(args->is_last); + GPR_ASSERT(elem->filter == &kRetryFilterVtable); + grpc_error_handle error = GRPC_ERROR_NONE; + new (elem->channel_data) RetryFilter(args->channel_args, &error); + return error; + } + + static void Destroy(grpc_channel_element* elem) { + auto* chand = static_cast<RetryFilter*>(elem->channel_data); + chand->~RetryFilter(); + } + + // Will never be called. + static void StartTransportOp(grpc_channel_element* /*elem*/, + grpc_transport_op* /*op*/) {} + static void GetChannelInfo(grpc_channel_element* /*elem*/, + const grpc_channel_info* /*info*/) {} + + private: + static size_t GetMaxPerRpcRetryBufferSize(const grpc_channel_args* args) { + return static_cast<size_t>(grpc_channel_args_find_integer( + args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE, + {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX})); + } + + RetryFilter(const grpc_channel_args* args, grpc_error_handle* error) + : client_channel_(grpc_channel_args_find_pointer<ClientChannel>( + args, GRPC_ARG_CLIENT_CHANNEL)), + per_rpc_retry_buffer_size_(GetMaxPerRpcRetryBufferSize(args)) { + // Get retry throttling parameters from service config. + auto* service_config = grpc_channel_args_find_pointer<ServiceConfig>( + args, GRPC_ARG_SERVICE_CONFIG_OBJ); + if (service_config == nullptr) return; + const auto* config = static_cast<const RetryGlobalConfig*>( + service_config->GetGlobalParsedConfig( + RetryServiceConfigParser::ParserIndex())); + if (config == nullptr) return; + // Get server name from target URI. 
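+ // For example, for the target "dns:///example.com:443", URI::Parse() + // yields a path of "/example.com:443", so server_name becomes + // "example.com:443"; throttling state is then looked up per server + // rather than per call.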
+ const char* server_uri = + grpc_channel_args_find_string(args, GRPC_ARG_SERVER_URI); + if (server_uri == nullptr) { + *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "server URI channel arg missing or wrong type in client channel " + "filter"); + return; + } + absl::StatusOr<URI> uri = URI::Parse(server_uri); + if (!uri.ok() || uri->path().empty()) { + *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "could not extract server name from target URI"); + return; + } + std::string server_name(absl::StripPrefix(uri->path(), "/")); + // Get throttling config for server_name. + retry_throttle_data_ = internal::ServerRetryThrottleMap::GetDataForServer( + server_name, config->max_milli_tokens(), config->milli_token_ratio()); + } + + ClientChannel* client_channel_; + size_t per_rpc_retry_buffer_size_; + RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_; +}; + +// +// RetryFilter::CallData +// + +class RetryFilter::CallData { + public: + static grpc_error_handle Init(grpc_call_element* elem, + const grpc_call_element_args* args); + static void Destroy(grpc_call_element* elem, + const grpc_call_final_info* /*final_info*/, + grpc_closure* then_schedule_closure); + static void StartTransportStreamOpBatch( + grpc_call_element* elem, grpc_transport_stream_op_batch* batch); + static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent); + + private: + class Canceller; + class CallStackDestructionBarrier; + + // Pending batches stored in call data. + struct PendingBatch { + // The pending batch. If nullptr, this slot is empty. + grpc_transport_stream_op_batch* batch = nullptr; + // Indicates whether payload for send ops has been cached in CallData. + bool send_ops_cached = false; + }; + + // State associated with each call attempt. + // Allocated on the arena. + class CallAttempt + : public RefCounted<CallAttempt, PolymorphicRefCount, kUnrefCallDtor> { + public: + explicit CallAttempt(CallData* calld); + + ClientChannel::LoadBalancedCall* lb_call() const { return lb_call_.get(); } + + // Constructs and starts whatever batches are needed on this call + // attempt. + void StartRetriableBatches(); + + // Frees cached send ops that have already been completed after + // committing the call. + void FreeCachedSendOpDataAfterCommit(); + + private: + // State used for starting a retryable batch on the call attempt's LB call. + // This provides its own grpc_transport_stream_op_batch and other data + // structures needed to populate the ops in the batch. + // We allocate one struct on the arena for each attempt at starting a + // batch on a given LB call. + class BatchData + : public RefCounted<BatchData, PolymorphicRefCount, kUnrefCallDtor> { + public: + BatchData(RefCountedPtr<CallAttempt> call_attempt, int refcount, + bool set_on_complete); + ~BatchData() override; + + grpc_transport_stream_op_batch* batch() { return &batch_; } + + // Adds retriable send_initial_metadata op to batch_data. + void AddRetriableSendInitialMetadataOp(); + // Adds retriable send_message op to batch_data. + void AddRetriableSendMessageOp(); + // Adds retriable send_trailing_metadata op to batch_data. + void AddRetriableSendTrailingMetadataOp(); + // Adds retriable recv_initial_metadata op to batch_data. + void AddRetriableRecvInitialMetadataOp(); + // Adds retriable recv_message op to batch_data. + void AddRetriableRecvMessageOp(); + // Adds retriable recv_trailing_metadata op to batch_data. + void AddRetriableRecvTrailingMetadataOp(); + + private: + // Returns true if the call is being retried.
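+ // The checks, in order (a sketch of the definition below): LB drops + // never retry; absent a retry policy there is no retry; if a retry was + // already dispatched for this attempt, reports true immediately; + // otherwise the status must be in retryable_status_codes, the retry + // throttle must permit a retry, the call must not be committed, there + // must be attempts remaining under max_attempts, and any server + // push-back value must parse as a non-negative number of milliseconds.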
+ bool MaybeRetry(grpc_status_code status, grpc_mdelem* server_pushback_md, + bool is_lb_drop); + + // Frees cached send ops that were completed by the completed batch in + // batch_data. Used when batches are completed after the call is + // committed. + void FreeCachedSendOpDataForCompletedBatch(); + + // Invokes recv_initial_metadata_ready for a batch. + static void InvokeRecvInitialMetadataCallback(void* arg, + grpc_error_handle error); + // Intercepts recv_initial_metadata_ready callback for retries. + // Commits the call and returns the initial metadata up the stack. + static void RecvInitialMetadataReady(void* arg, grpc_error_handle error); + + // Invokes recv_message_ready for a batch. + static void InvokeRecvMessageCallback(void* arg, grpc_error_handle error); + // Intercepts recv_message_ready callback for retries. + // Commits the call and returns the message up the stack. + static void RecvMessageReady(void* arg, grpc_error_handle error); + + // Adds recv_trailing_metadata_ready closure to closures. + void AddClosureForRecvTrailingMetadataReady( + grpc_error_handle error, CallCombinerClosureList* closures); + // Adds any necessary closures for deferred recv_initial_metadata and + // recv_message callbacks to closures. + void AddClosuresForDeferredRecvCallbacks( + CallCombinerClosureList* closures); + // For any pending batch containing an op that has not yet been started, + // adds the pending batch's completion closures to closures. + void AddClosuresToFailUnstartedPendingBatches( + grpc_error_handle error, CallCombinerClosureList* closures); + // Runs necessary closures upon completion of a call attempt. + void RunClosuresForCompletedCall(grpc_error_handle error); + // Intercepts recv_trailing_metadata_ready callback for retries. + // Commits the call and returns the trailing metadata up the stack. + static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error); + + // Adds the on_complete closure for the pending batch completed in + // batch_data to closures. + void AddClosuresForCompletedPendingBatch( + grpc_error_handle error, CallCombinerClosureList* closures); + + // If there are any cached ops to replay or pending ops to start on the + // LB call, adds them to closures. + void AddClosuresForReplayOrPendingSendOps( + CallCombinerClosureList* closures); + + // Callback used to intercept on_complete from LB calls. + static void OnComplete(void* arg, grpc_error_handle error); + + RefCountedPtr<CallAttempt> call_attempt_; + // The batch to use in the LB call. + // Its payload field points to CallAttempt::batch_payload_. + grpc_transport_stream_op_batch batch_; + // For intercepting on_complete. + grpc_closure on_complete_; + }; + + // Creates a BatchData object on the call's arena with the + // specified refcount. If set_on_complete is true, the batch's + // on_complete callback will be set to point to on_complete(); + // otherwise, the batch's on_complete callback will be null. + BatchData* CreateBatch(int refcount, bool set_on_complete) { + return calld_->arena_->New<BatchData>(Ref(), refcount, set_on_complete); + } + + // If there are any cached send ops that need to be replayed on this + // call attempt, creates and returns a new batch to replay those ops. + // Otherwise, returns nullptr. + BatchData* MaybeCreateBatchForReplay(); + + // Adds batches for pending batches to closures. + void AddBatchesForPendingBatches(CallCombinerClosureList* closures); + + // Adds whatever batches are needed on this attempt to closures. 
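+ // (Replay batches for cached send ops are added first, via + // MaybeCreateBatchForReplay(), then batches for pending surface ops, + // via AddBatchesForPendingBatches().)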
+ void AddRetriableBatches(CallCombinerClosureList* closures); + + // Returns true if any op in the batch was not yet started on this attempt. + bool PendingBatchIsUnstarted(PendingBatch* pending); + + // Helper function used to start a recv_trailing_metadata batch. This + // is used in the case where a recv_initial_metadata or recv_message + // op fails in a way that we know the call is over but when the application + // has not yet started its own recv_trailing_metadata op. + void StartInternalRecvTrailingMetadata(); + + CallData* calld_; + RefCountedPtr<ClientChannel::LoadBalancedCall> lb_call_; + + // BatchData.batch.payload points to this. + grpc_transport_stream_op_batch_payload batch_payload_; + // For send_initial_metadata. + // Note that we need to make a copy of the initial metadata for each + // call attempt instead of just referring to the copy in call_data, + // because filters in the subchannel stack may modify the metadata, + // so we need to start in a pristine state for each attempt of the call. + grpc_linked_mdelem* send_initial_metadata_storage_; + grpc_metadata_batch send_initial_metadata_; + // For send_message. + // TODO(roth): Restructure this to eliminate use of ManualConstructor. + ManualConstructor<ByteStreamCache::CachingByteStream> send_message_; + // For send_trailing_metadata. + grpc_linked_mdelem* send_trailing_metadata_storage_; + grpc_metadata_batch send_trailing_metadata_; + // For intercepting recv_initial_metadata. + grpc_metadata_batch recv_initial_metadata_; + grpc_closure recv_initial_metadata_ready_; + bool trailing_metadata_available_ = false; + // For intercepting recv_message. + grpc_closure recv_message_ready_; + OrphanablePtr<ByteStream> recv_message_; + // For intercepting recv_trailing_metadata. + grpc_metadata_batch recv_trailing_metadata_; + grpc_transport_stream_stats collect_stats_; + grpc_closure recv_trailing_metadata_ready_; + // These fields indicate which ops have been started and completed on + // this call attempt. + size_t started_send_message_count_ = 0; + size_t completed_send_message_count_ = 0; + size_t started_recv_message_count_ = 0; + size_t completed_recv_message_count_ = 0; + bool started_send_initial_metadata_ : 1; + bool completed_send_initial_metadata_ : 1; + bool started_send_trailing_metadata_ : 1; + bool completed_send_trailing_metadata_ : 1; + bool started_recv_initial_metadata_ : 1; + bool completed_recv_initial_metadata_ : 1; + bool started_recv_trailing_metadata_ : 1; + bool completed_recv_trailing_metadata_ : 1; + // State for callback processing. + BatchData* recv_initial_metadata_ready_deferred_batch_ = nullptr; + grpc_error_handle recv_initial_metadata_error_ = GRPC_ERROR_NONE; + BatchData* recv_message_ready_deferred_batch_ = nullptr; + grpc_error_handle recv_message_error_ = GRPC_ERROR_NONE; + BatchData* recv_trailing_metadata_internal_batch_ = nullptr; + // NOTE: Do not move this next to the metadata bitfields above. That would + // save space but will also result in a data race because compiler + // will generate a 2 byte store which overwrites the meta-data + // fields upon setting this field. + bool retry_dispatched_ : 1; + }; + + CallData(RetryFilter* chand, const grpc_call_element_args& args); + ~CallData(); + + void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch); + + // Returns the index into pending_batches_ to be used for batch. 
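+ // (Batches are bucketed by the ops they contain, so at most + // MAX_PENDING_BATCHES batches can be pending at any one time.)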
+ static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch); + PendingBatch* PendingBatchesAdd(grpc_transport_stream_op_batch* batch); + void PendingBatchClear(PendingBatch* pending); + void MaybeClearPendingBatch(PendingBatch* pending); + static void FailPendingBatchInCallCombiner(void* arg, + grpc_error_handle error); + // Fails all pending batches. Does NOT yield call combiner. + void PendingBatchesFail(grpc_error_handle error); + // Returns a pointer to the first pending batch for which predicate(batch) + // returns true, or null if not found. + template <typename Predicate> + PendingBatch* PendingBatchFind(const char* log_message, Predicate predicate); + + // Caches data for send ops so that it can be retried later, if not + // already cached. + void MaybeCacheSendOpsForBatch(PendingBatch* pending); + void FreeCachedSendInitialMetadata(); + // Frees cached send_message at index idx. + void FreeCachedSendMessage(size_t idx); + void FreeCachedSendTrailingMetadata(); + void FreeAllCachedSendOpData(); + + // Commits the call so that no further retry attempts will be performed. + void RetryCommit(CallAttempt* call_attempt); + + // Starts a retry after appropriate back-off. + void DoRetry(grpc_millis server_pushback_ms); + static void OnRetryTimer(void* arg, grpc_error_handle error); + + RefCountedPtr<ClientChannel::LoadBalancedCall> CreateLoadBalancedCall(); + + void CreateCallAttempt(); + + // Adds a closure to closures that will execute batch in the call combiner. + void AddClosureForBatch(grpc_transport_stream_op_batch* batch, + CallCombinerClosureList* closures); + + RetryFilter* chand_; + grpc_polling_entity* pollent_; + RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_; + const RetryMethodConfig* retry_policy_ = nullptr; + BackOff retry_backoff_; + + grpc_slice path_; // Request path. + gpr_cycle_counter call_start_time_; + grpc_millis deadline_; + Arena* arena_; + grpc_call_stack* owning_call_; + CallCombiner* call_combiner_; + grpc_call_context_element* call_context_; + + RefCountedPtr<CallStackDestructionBarrier> call_stack_destruction_barrier_; + + // TODO(roth): As part of implementing hedging, we will need to maintain a + // list of all pending attempts, so that we can cancel them all if the call + // gets cancelled. + RefCountedPtr<CallAttempt> call_attempt_; + + // LB call used when the call is committed before any CallAttempt is + // created. + // TODO(roth): Change CallAttempt logic such that once we've committed + // and all cached send ops have been replayed, we move the LB call + // from the CallAttempt here, thus creating a fast path for the + // remainder of the streaming call. + RefCountedPtr<ClientChannel::LoadBalancedCall> committed_call_; + + // When we are not yet fully committed to a particular call (i.e., + // either we might still retry or we have committed to the call but + // there are still some cached ops to be replayed on the call), + // batches received from above will be added to this list, and they + // will not be removed until we have invoked their completion callbacks. + size_t bytes_buffered_for_retry_ = 0; + PendingBatch pending_batches_[MAX_PENDING_BATCHES]; + bool pending_send_initial_metadata_ : 1; + bool pending_send_message_ : 1; + bool pending_send_trailing_metadata_ : 1; + + // Retry state.
+ bool retry_committed_ : 1; + bool last_attempt_got_server_pushback_ : 1; + int num_attempts_completed_ = 0; + Mutex timer_mu_; + Canceller* canceller_ ABSL_GUARDED_BY(timer_mu_); + grpc_timer retry_timer_ ABSL_GUARDED_BY(timer_mu_); + grpc_closure retry_closure_; + + // The number of batches containing send ops that are currently in-flight + // on any call attempt. + // We hold a ref to the call stack while this is non-zero, since replay + // batches may not complete until after all callbacks have been returned + // to the surface, and we need to make sure that the call is not destroyed + // until all of these batches have completed. + // Note that we actually only need to track replay batches, but it's + // easier to track all batches with send ops. + int num_in_flight_call_attempt_send_batches_ = 0; + + // Cached data for retrying send ops. + // send_initial_metadata + bool seen_send_initial_metadata_ = false; + grpc_linked_mdelem* send_initial_metadata_storage_ = nullptr; + grpc_metadata_batch send_initial_metadata_; + uint32_t send_initial_metadata_flags_; + // TODO(roth): As part of implementing hedging, we'll probably need to + // have the LB call set a value in CallAttempt and then propagate it + // from CallAttempt to the parent call when we commit. Otherwise, we + // may leave this with a value for a peer other than the one we + // actually commit to. + gpr_atm* peer_string_; + // send_message + // When we get a send_message op, we replace the original byte stream + // with a CachingByteStream that caches the slices to a local buffer for + // use in retries. + // Note: We inline the cache for the first 3 send_message ops and use + // dynamic allocation after that. This number was essentially picked + // at random; it could be changed in the future to tune performance. + absl::InlinedVector<ByteStreamCache*, 3> send_messages_; + // send_trailing_metadata + bool seen_send_trailing_metadata_ = false; + grpc_linked_mdelem* send_trailing_metadata_storage_ = nullptr; + grpc_metadata_batch send_trailing_metadata_; +}; + +// +// RetryFilter::CallData::CallStackDestructionBarrier +// + +// A class to track the existence of LoadBalancedCall call stacks that +// we've created. We wait until all such call stacks have been +// destroyed before we return the on_call_stack_destruction closure up +// to the surface. +// +// The parent RetryFilter::CallData object holds a ref to this object. +// When it is destroyed, it will store the on_call_stack_destruction +// closure from the surface in this object and then release its ref. +// We also take a ref to this object for each LB call we create, and +// those refs are not released until the LB call stack is destroyed. +// When this object is destroyed, it will invoke the +// on_call_stack_destruction closure from the surface. +class RetryFilter::CallData::CallStackDestructionBarrier + : public RefCounted<CallStackDestructionBarrier, PolymorphicRefCount, + kUnrefCallDtor> { + public: + CallStackDestructionBarrier() {} + + ~CallStackDestructionBarrier() override { + // TODO(yashkt) : This can potentially be a Closure::Run + ExecCtx::Run(DEBUG_LOCATION, on_call_stack_destruction_, GRPC_ERROR_NONE); + } + + // Set the closure from the surface. This closure will be invoked + // when this object is destroyed. + void set_on_call_stack_destruction(grpc_closure* on_call_stack_destruction) { + on_call_stack_destruction_ = on_call_stack_destruction; + } + + // Invoked to get an on_call_stack_destruction closure for a new LB call. 
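+ // Ref flow (sketch): the parent CallData holds one ref; each closure + // returned here holds another, released in OnLbCallDestructionComplete() + // when the corresponding LB call stack is destroyed; the final Unref() + // runs the destructor, which schedules on_call_stack_destruction_.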
+ grpc_closure* MakeLbCallDestructionClosure(CallData* calld) { + Ref().release(); // Ref held by callback. + grpc_closure* on_lb_call_destruction_complete = + calld->arena_->New<grpc_closure>(); + GRPC_CLOSURE_INIT(on_lb_call_destruction_complete, + OnLbCallDestructionComplete, this, nullptr); + return on_lb_call_destruction_complete; + } + + private: + static void OnLbCallDestructionComplete(void* arg, + grpc_error_handle /*error*/) { + auto* self = static_cast<CallStackDestructionBarrier*>(arg); + self->Unref(); + } + + grpc_closure* on_call_stack_destruction_ = nullptr; +}; + +// +// RetryFilter::CallData::Canceller +// + +class RetryFilter::CallData::Canceller { + public: + explicit Canceller(CallData* calld) : calld_(calld) { + GRPC_CALL_STACK_REF(calld_->owning_call_, "RetryCanceller"); + GRPC_CLOSURE_INIT(&closure_, &Cancel, this, nullptr); + calld_->call_combiner_->SetNotifyOnCancel(&closure_); + } + + private: + static void Cancel(void* arg, grpc_error_handle error) { + auto* self = static_cast<Canceller*>(arg); + auto* calld = self->calld_; + { + MutexLock lock(&calld->timer_mu_); + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, + "calld=%p: cancelling retry timer: error=%s self=%p " + "calld->canceller_=%p", + calld, grpc_error_std_string(error).c_str(), self, + calld->canceller_); + } + if (calld->canceller_ == self && error != GRPC_ERROR_NONE) { + calld->canceller_ = nullptr; // Checked by OnRetryTimer(). + grpc_timer_cancel(&calld->retry_timer_); + calld->FreeAllCachedSendOpData(); + GRPC_CALL_COMBINER_STOP(calld->call_combiner_, "Canceller"); + } + } + GRPC_CALL_STACK_UNREF(calld->owning_call_, "RetryCanceller"); + delete self; + } + + CallData* calld_; + grpc_closure closure_; +}; + +// +// RetryFilter::CallData::CallAttempt +// + +RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld) + : calld_(calld), + batch_payload_(calld->call_context_), + started_send_initial_metadata_(false), + completed_send_initial_metadata_(false), + started_send_trailing_metadata_(false), + completed_send_trailing_metadata_(false), + started_recv_initial_metadata_(false), + completed_recv_initial_metadata_(false), + started_recv_trailing_metadata_(false), + completed_recv_trailing_metadata_(false), + retry_dispatched_(false) { + lb_call_ = calld->CreateLoadBalancedCall(); + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, "chand=%p calld=%p: attempt=%p: create lb_call=%p", + calld->chand_, calld, this, lb_call_.get()); + } +} + +void RetryFilter::CallData::CallAttempt::FreeCachedSendOpDataAfterCommit() { + // TODO(roth): When we implement hedging, this logic will need to get + // a bit more complex, because there may be other (now abandoned) call + // attempts still using this data. We may need to do some sort of + // ref-counting instead. + if (completed_send_initial_metadata_) { + calld_->FreeCachedSendInitialMetadata(); + } + for (size_t i = 0; i < completed_send_message_count_; ++i) { + calld_->FreeCachedSendMessage(i); + } + if (completed_send_trailing_metadata_) { + calld_->FreeCachedSendTrailingMetadata(); + } +} + +bool RetryFilter::CallData::CallAttempt::PendingBatchIsUnstarted( + PendingBatch* pending) { + // Only look at batches containing send ops, since batches containing + // only recv ops are always started immediately. 
+ if (pending->batch == nullptr || pending->batch->on_complete == nullptr) { + return false; + } + if (pending->batch->send_initial_metadata && + !started_send_initial_metadata_) { + return true; + } + if (pending->batch->send_message && + started_send_message_count_ < calld_->send_messages_.size()) { + return true; + } + if (pending->batch->send_trailing_metadata && + !started_send_trailing_metadata_) { + return true; + } + return false; +} + +void RetryFilter::CallData::CallAttempt::StartInternalRecvTrailingMetadata() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: call failed but recv_trailing_metadata not " + "started; starting it internally", + calld_->chand_, calld_); + } + // Create batch_data with 2 refs, since this batch will be unreffed twice: + // once for the recv_trailing_metadata_ready callback when the batch + // completes, and again when we actually get a recv_trailing_metadata + // op from the surface. + BatchData* batch_data = CreateBatch(2, false /* set_on_complete */); + batch_data->AddRetriableRecvTrailingMetadataOp(); + recv_trailing_metadata_internal_batch_ = batch_data; + // Note: This will release the call combiner. + lb_call_->StartTransportStreamOpBatch(batch_data->batch()); +} + +// If there are any cached send ops that need to be replayed on the +// current call attempt, creates and returns a new batch to replay those ops. +// Otherwise, returns nullptr. +RetryFilter::CallData::CallAttempt::BatchData* +RetryFilter::CallData::CallAttempt::MaybeCreateBatchForReplay() { + BatchData* replay_batch_data = nullptr; + // send_initial_metadata. + if (calld_->seen_send_initial_metadata_ && !started_send_initial_metadata_ && + !calld_->pending_send_initial_metadata_) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: replaying previously completed " + "send_initial_metadata op", + calld_->chand_, calld_); + } + replay_batch_data = CreateBatch(1, true /* set_on_complete */); + replay_batch_data->AddRetriableSendInitialMetadataOp(); + } + // send_message. + // Note that we can only have one send_message op in flight at a time. + if (started_send_message_count_ < calld_->send_messages_.size() && + started_send_message_count_ == completed_send_message_count_ && + !calld_->pending_send_message_) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: replaying previously completed " + "send_message op", + calld_->chand_, calld_); + } + if (replay_batch_data == nullptr) { + replay_batch_data = CreateBatch(1, true /* set_on_complete */); + } + replay_batch_data->AddRetriableSendMessageOp(); + } + // send_trailing_metadata. + // Note that we only add this op if we have no more send_message ops + // to start, since we can't send down any more send_message ops after + // send_trailing_metadata. 
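+ // For example, if three send_message ops are cached but only two have + // been started on this attempt, send_trailing_metadata is not replayed + // yet, even though it has already been seen from the surface.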
+ if (calld_->seen_send_trailing_metadata_ && + started_send_message_count_ == calld_->send_messages_.size() && + !started_send_trailing_metadata_ && + !calld_->pending_send_trailing_metadata_) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: replaying previously completed " + "send_trailing_metadata op", + calld_->chand_, calld_); + } + if (replay_batch_data == nullptr) { + replay_batch_data = CreateBatch(1, true /* set_on_complete */); + } + replay_batch_data->AddRetriableSendTrailingMetadataOp(); + } + return replay_batch_data; +} + +void RetryFilter::CallData::CallAttempt::AddBatchesForPendingBatches( + CallCombinerClosureList* closures) { + for (size_t i = 0; i < GPR_ARRAY_SIZE(calld_->pending_batches_); ++i) { + PendingBatch* pending = &calld_->pending_batches_[i]; + grpc_transport_stream_op_batch* batch = pending->batch; + if (batch == nullptr) continue; + // Skip any batch that either (a) has already been started on this + // call attempt or (b) we can't start yet because we're still + // replaying send ops that need to be completed first. + // TODO(roth): Note that if any one op in the batch can't be sent + // yet due to ops that we're replaying, we don't start any of the ops + // in the batch. This is probably okay, but it could conceivably + // lead to increased latency in some cases -- e.g., we could delay + // starting a recv op due to it being in the same batch with a send + // op. If/when we revamp the callback protocol in + // transport_stream_op_batch, we may be able to fix this. + if (batch->send_initial_metadata && started_send_initial_metadata_) { + continue; + } + if (batch->send_message && + completed_send_message_count_ < started_send_message_count_) { + continue; + } + // Note that we only start send_trailing_metadata if we have no more + // send_message ops to start, since we can't send down any more + // send_message ops after send_trailing_metadata. + if (batch->send_trailing_metadata && + (started_send_message_count_ + batch->send_message < + calld_->send_messages_.size() || + started_send_trailing_metadata_)) { + continue; + } + if (batch->recv_initial_metadata && started_recv_initial_metadata_) { + continue; + } + if (batch->recv_message && + completed_recv_message_count_ < started_recv_message_count_) { + continue; + } + if (batch->recv_trailing_metadata && started_recv_trailing_metadata_) { + // If we previously completed a recv_trailing_metadata op + // initiated by StartInternalRecvTrailingMetadata(), use the + // result of that instead of trying to re-start this op. + if (GPR_UNLIKELY(recv_trailing_metadata_internal_batch_ != nullptr)) { + // If the batch completed, then trigger the completion callback + // directly, so that we return the previously returned results to + // the application. Otherwise, just unref the internally started + // batch, since we'll propagate the completion when it completes. + if (completed_recv_trailing_metadata_) { + // Batches containing recv_trailing_metadata always succeed. + closures->Add( + &recv_trailing_metadata_ready_, GRPC_ERROR_NONE, + "re-executing recv_trailing_metadata_ready to propagate " + "internally triggered result"); + } else { + recv_trailing_metadata_internal_batch_->Unref(); + } + recv_trailing_metadata_internal_batch_ = nullptr; + } + continue; + } + // If we're already committed, just send the batch as-is. 
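+ // (No BatchData wrapper or callback interception is needed once the + // call is committed; the surface batch goes down as-is.)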
+ if (calld_->retry_committed_) { + calld_->AddClosureForBatch(batch, closures); + calld_->PendingBatchClear(pending); + continue; + } + // Create batch with the right number of callbacks. + const bool has_send_ops = batch->send_initial_metadata || + batch->send_message || + batch->send_trailing_metadata; + const int num_callbacks = has_send_ops + batch->recv_initial_metadata + + batch->recv_message + + batch->recv_trailing_metadata; + CallAttempt::BatchData* batch_data = + CreateBatch(num_callbacks, has_send_ops /* set_on_complete */); + // Cache send ops if needed. + calld_->MaybeCacheSendOpsForBatch(pending); + // send_initial_metadata. + if (batch->send_initial_metadata) { + batch_data->AddRetriableSendInitialMetadataOp(); + } + // send_message. + if (batch->send_message) { + batch_data->AddRetriableSendMessageOp(); + } + // send_trailing_metadata. + if (batch->send_trailing_metadata) { + batch_data->AddRetriableSendTrailingMetadataOp(); + } + // recv_initial_metadata. + if (batch->recv_initial_metadata) { + // recv_flags is only used on the server side. + GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr); + batch_data->AddRetriableRecvInitialMetadataOp(); + } + // recv_message. + if (batch->recv_message) { + batch_data->AddRetriableRecvMessageOp(); + } + // recv_trailing_metadata. + if (batch->recv_trailing_metadata) { + batch_data->AddRetriableRecvTrailingMetadataOp(); + } + calld_->AddClosureForBatch(batch_data->batch(), closures); + // Track number of in-flight send batches. + // If this is the first one, take a ref to the call stack. + if (batch->send_initial_metadata || batch->send_message || + batch->send_trailing_metadata) { + if (calld_->num_in_flight_call_attempt_send_batches_ == 0) { + GRPC_CALL_STACK_REF(calld_->owning_call_, "retriable_send_batches"); + } + ++calld_->num_in_flight_call_attempt_send_batches_; + } + } +} + +void RetryFilter::CallData::CallAttempt::AddRetriableBatches( + CallCombinerClosureList* closures) { + // Replay previously-returned send_* ops if needed. + BatchData* replay_batch_data = MaybeCreateBatchForReplay(); + if (replay_batch_data != nullptr) { + calld_->AddClosureForBatch(replay_batch_data->batch(), closures); + // Track number of pending send batches. + // If this is the first one, take a ref to the call stack. + if (calld_->num_in_flight_call_attempt_send_batches_ == 0) { + GRPC_CALL_STACK_REF(calld_->owning_call_, "retriable_send_batches"); + } + ++calld_->num_in_flight_call_attempt_send_batches_; + } + // Now add pending batches. + AddBatchesForPendingBatches(closures); +} + +void RetryFilter::CallData::CallAttempt::StartRetriableBatches() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches", + calld_->chand_, calld_); + } + // Construct list of closures to execute, one for each pending batch. + CallCombinerClosureList closures; + AddRetriableBatches(&closures); + // Note: This will yield the call combiner. + // Start batches on LB call. 
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: starting %" PRIuPTR + " retriable batches on lb_call=%p", + calld_->chand_, calld_, closures.size(), lb_call()); + } + closures.RunClosures(calld_->call_combiner_); +} + +// +// RetryFilter::CallData::CallAttempt::BatchData +// + +RetryFilter::CallData::CallAttempt::BatchData::BatchData( + RefCountedPtr<CallAttempt> attempt, int refcount, bool set_on_complete) + : RefCounted(nullptr, refcount), call_attempt_(std::move(attempt)) { + // TODO(roth): Consider holding this ref on the call stack in + // CallAttempt instead of here in BatchData. This would eliminate the + // need for CallData::num_in_flight_call_attempt_send_batches_. + // But it would require having a way to unref CallAttempt when it is + // no longer needed (i.e., when the call is committed and all cached + // send ops have been replayed and the LB call is moved into + // CallData::committed_call_). + GRPC_CALL_STACK_REF(call_attempt_->calld_->owning_call_, "CallAttempt"); + batch_.payload = &call_attempt_->batch_payload_; + if (set_on_complete) { + GRPC_CLOSURE_INIT(&on_complete_, OnComplete, this, + grpc_schedule_on_exec_ctx); + batch_.on_complete = &on_complete_; + } +} + +RetryFilter::CallData::CallAttempt::BatchData::~BatchData() { + if (batch_.send_initial_metadata) { + grpc_metadata_batch_destroy(&call_attempt_->send_initial_metadata_); + } + if (batch_.send_trailing_metadata) { + grpc_metadata_batch_destroy(&call_attempt_->send_trailing_metadata_); + } + if (batch_.recv_initial_metadata) { + grpc_metadata_batch_destroy(&call_attempt_->recv_initial_metadata_); + } + if (batch_.recv_trailing_metadata) { + grpc_metadata_batch_destroy(&call_attempt_->recv_trailing_metadata_); + } + GRPC_CALL_STACK_UNREF(call_attempt_->calld_->owning_call_, "CallAttempt"); +} + +void RetryFilter::CallData::CallAttempt::BatchData:: + FreeCachedSendOpDataForCompletedBatch() { + auto* calld = call_attempt_->calld_; + // TODO(roth): When we implement hedging, this logic will need to get + // a bit more complex, because there may be other (now abandoned) call + // attempts still using this data. We may need to do some sort of + // ref-counting instead. + if (batch_.send_initial_metadata) { + calld->FreeCachedSendInitialMetadata(); + } + if (batch_.send_message) { + calld->FreeCachedSendMessage(call_attempt_->completed_send_message_count_ - + 1); + } + if (batch_.send_trailing_metadata) { + calld->FreeCachedSendTrailingMetadata(); + } +} + +bool RetryFilter::CallData::CallAttempt::BatchData::MaybeRetry( + grpc_status_code status, grpc_mdelem* server_pushback_md, bool is_lb_drop) { + auto* calld = call_attempt_->calld_; + // LB drops always inhibit retries. + if (is_lb_drop) return false; + // Get retry policy. + if (calld->retry_policy_ == nullptr) return false; + // If we've already dispatched a retry from this call, return true. + // This catches the case where the batch has multiple callbacks + // (i.e., it includes either recv_message or recv_initial_metadata). + if (call_attempt_->retry_dispatched_) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", + calld->chand_, calld); + } + return true; + } + // Check status. 
+ if (GPR_LIKELY(status == GRPC_STATUS_OK)) { + if (calld->retry_throttle_data_ != nullptr) { + calld->retry_throttle_data_->RecordSuccess(); + } + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", calld->chand_, + calld); + } + return false; + } + // Status is not OK. Check whether the status is retryable. + if (!calld->retry_policy_->retryable_status_codes().Contains(status)) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: status %s not configured as retryable", + calld->chand_, calld, grpc_status_code_to_string(status)); + } + return false; + } + // Record the failure and check whether retries are throttled. + // Note that it's important for this check to come after the status + // code check above, since we should only record failures whose statuses + // match the configured retryable status codes, so that we don't count + // things like failures due to malformed requests (INVALID_ARGUMENT). + // Conversely, it's important for this to come before the remaining + // checks, so that we don't fail to record failures due to other factors. + if (calld->retry_throttle_data_ != nullptr && + !calld->retry_throttle_data_->RecordFailure()) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", calld->chand_, + calld); + } + return false; + } + // Check whether the call is committed. + if (calld->retry_committed_) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed", + calld->chand_, calld); + } + return false; + } + // Check whether we have retries remaining. + ++calld->num_attempts_completed_; + if (calld->num_attempts_completed_ >= calld->retry_policy_->max_attempts()) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts", + calld->chand_, calld, calld->retry_policy_->max_attempts()); + } + return false; + } + // Check server push-back. + grpc_millis server_pushback_ms = -1; + if (server_pushback_md != nullptr) { + // If the value is "-1" or any other unparseable string, we do not retry. + uint32_t ms; + if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: not retrying due to server push-back", + calld->chand_, calld); + } + return false; + } else { + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms", + calld->chand_, calld, ms); + } + server_pushback_ms = static_cast<grpc_millis>(ms); + } + } + // Do retry. + call_attempt_->retry_dispatched_ = true; + calld->DoRetry(server_pushback_ms); + return true; +} + +// +// recv_initial_metadata callback handling +// + +void RetryFilter::CallData::CallAttempt::BatchData:: + InvokeRecvInitialMetadataCallback(void* arg, grpc_error_handle error) { + auto* batch_data = static_cast<CallAttempt::BatchData*>(arg); + auto* call_attempt = batch_data->call_attempt_.get(); + // Find pending batch. + PendingBatch* pending = call_attempt->calld_->PendingBatchFind( + "invoking recv_initial_metadata_ready for", + [](grpc_transport_stream_op_batch* batch) { + return batch->recv_initial_metadata && + batch->payload->recv_initial_metadata + .recv_initial_metadata_ready != nullptr; + }); + GPR_ASSERT(pending != nullptr); + // Return metadata. 
+ grpc_metadata_batch_move( + &call_attempt->recv_initial_metadata_, + pending->batch->payload->recv_initial_metadata.recv_initial_metadata); + // Update bookkeeping. + // Note: Need to do this before invoking the callback, since invoking + // the callback will result in yielding the call combiner. + grpc_closure* recv_initial_metadata_ready = + pending->batch->payload->recv_initial_metadata + .recv_initial_metadata_ready; + pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready = + nullptr; + call_attempt->calld_->MaybeClearPendingBatch(pending); + batch_data->Unref(); + // Invoke callback. + Closure::Run(DEBUG_LOCATION, recv_initial_metadata_ready, + GRPC_ERROR_REF(error)); +} + +void RetryFilter::CallData::CallAttempt::BatchData::RecvInitialMetadataReady( + void* arg, grpc_error_handle error) { + CallAttempt::BatchData* batch_data = + static_cast<CallAttempt::BatchData*>(arg); + CallAttempt* call_attempt = batch_data->call_attempt_.get(); + CallData* calld = call_attempt->calld_; + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s", + calld->chand_, calld, grpc_error_std_string(error).c_str()); + } + call_attempt->completed_recv_initial_metadata_ = true; + // If a retry was already dispatched, then we're not going to use the + // result of this recv_initial_metadata op, so do nothing. + if (call_attempt->retry_dispatched_) { + GRPC_CALL_COMBINER_STOP( + calld->call_combiner_, + "recv_initial_metadata_ready after retry dispatched"); + return; + } + if (!calld->retry_committed_) { + // If we got an error or a Trailers-Only response and have not yet gotten + // the recv_trailing_metadata_ready callback, then defer propagating this + // callback back to the surface. We can evaluate whether to retry when + // recv_trailing_metadata comes back. + if (GPR_UNLIKELY((call_attempt->trailing_metadata_available_ || + error != GRPC_ERROR_NONE) && + !call_attempt->completed_recv_trailing_metadata_)) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: deferring recv_initial_metadata_ready " + "(Trailers-Only)", + calld->chand_, calld); + } + call_attempt->recv_initial_metadata_ready_deferred_batch_ = batch_data; + call_attempt->recv_initial_metadata_error_ = GRPC_ERROR_REF(error); + if (!call_attempt->started_recv_trailing_metadata_) { + // recv_trailing_metadata not yet started by application; start it + // ourselves to get status. + call_attempt->StartInternalRecvTrailingMetadata(); + } else { + GRPC_CALL_COMBINER_STOP( + calld->call_combiner_, + "recv_initial_metadata_ready trailers-only or error"); + } + return; + } + // Received valid initial metadata, so commit the call. + calld->RetryCommit(call_attempt); + } + // Invoke the callback to return the result to the surface. + // Manually invoking a callback function; it does not take ownership of error. + InvokeRecvInitialMetadataCallback(batch_data, error); +} + +// +// recv_message callback handling +// + +void RetryFilter::CallData::CallAttempt::BatchData::InvokeRecvMessageCallback( + void* arg, grpc_error_handle error) { + CallAttempt::BatchData* batch_data = + static_cast<CallAttempt::BatchData*>(arg); + CallAttempt* call_attempt = batch_data->call_attempt_.get(); + CallData* calld = call_attempt->calld_; + // Find pending op. 
+ PendingBatch* pending = calld->PendingBatchFind( + "invoking recv_message_ready for", + [](grpc_transport_stream_op_batch* batch) { + return batch->recv_message && + batch->payload->recv_message.recv_message_ready != nullptr; + }); + GPR_ASSERT(pending != nullptr); + // Return payload. + *pending->batch->payload->recv_message.recv_message = + std::move(call_attempt->recv_message_); + // Update bookkeeping. + // Note: Need to do this before invoking the callback, since invoking + // the callback will result in yielding the call combiner. + grpc_closure* recv_message_ready = + pending->batch->payload->recv_message.recv_message_ready; + pending->batch->payload->recv_message.recv_message_ready = nullptr; + calld->MaybeClearPendingBatch(pending); + batch_data->Unref(); + // Invoke callback. + Closure::Run(DEBUG_LOCATION, recv_message_ready, GRPC_ERROR_REF(error)); +} + +void RetryFilter::CallData::CallAttempt::BatchData::RecvMessageReady( + void* arg, grpc_error_handle error) { + CallAttempt::BatchData* batch_data = + static_cast<CallAttempt::BatchData*>(arg); + CallAttempt* call_attempt = batch_data->call_attempt_.get(); + CallData* calld = call_attempt->calld_; + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s", + calld->chand_, calld, grpc_error_std_string(error).c_str()); + } + ++call_attempt->completed_recv_message_count_; + // If a retry was already dispatched, then we're not going to use the + // result of this recv_message op, so do nothing. + if (call_attempt->retry_dispatched_) { + GRPC_CALL_COMBINER_STOP(calld->call_combiner_, + "recv_message_ready after retry dispatched"); + return; + } + if (!calld->retry_committed_) { + // If we got an error or the payload was nullptr and we have not yet gotten + // the recv_trailing_metadata_ready callback, then defer propagating this + // callback back to the surface. We can evaluate whether to retry when + // recv_trailing_metadata comes back. + if (GPR_UNLIKELY((call_attempt->recv_message_ == nullptr || + error != GRPC_ERROR_NONE) && + !call_attempt->completed_recv_trailing_metadata_)) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: deferring recv_message_ready (nullptr " + "message and recv_trailing_metadata pending)", + calld->chand_, calld); + } + call_attempt->recv_message_ready_deferred_batch_ = batch_data; + call_attempt->recv_message_error_ = GRPC_ERROR_REF(error); + if (!call_attempt->started_recv_trailing_metadata_) { + // recv_trailing_metadata not yet started by application; start it + // ourselves to get status. + call_attempt->StartInternalRecvTrailingMetadata(); + } else { + GRPC_CALL_COMBINER_STOP(calld->call_combiner_, + "recv_message_ready null"); + } + return; + } + // Received a valid message, so commit the call. + calld->RetryCommit(call_attempt); + } + // Invoke the callback to return the result to the surface. + // Manually invoking a callback function; it does not take ownership of error. + InvokeRecvMessageCallback(batch_data, error); +} + +// +// recv_trailing_metadata handling +// + +namespace { + +// Sets *status, *server_pushback_md, and *is_lb_drop based on md_batch +// and error. 
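+// For example, a Trailers-Only response carrying "grpc-status: 14" and +// "grpc-retry-pushback-ms: 1000" yields *status == GRPC_STATUS_UNAVAILABLE +// with *server_pushback_md pointing at the push-back element, while an LB +// drop arrives as an error tagged with GRPC_ERROR_INT_LB_POLICY_DROP, which +// sets *is_lb_drop to true.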
+void GetCallStatus(grpc_millis deadline, grpc_metadata_batch* md_batch, + grpc_error_handle error, grpc_status_code* status, + grpc_mdelem** server_pushback_md, bool* is_lb_drop) { + if (error != GRPC_ERROR_NONE) { + grpc_error_get_status(error, deadline, status, nullptr, nullptr, nullptr); + intptr_t value = 0; + if (grpc_error_get_int(error, GRPC_ERROR_INT_LB_POLICY_DROP, &value) && + value != 0) { + *is_lb_drop = true; + } + } else { + GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr); + *status = + grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md); + if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) { + *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md; + } + } + GRPC_ERROR_UNREF(error); +} + +} // namespace + +void RetryFilter::CallData::CallAttempt::BatchData:: + AddClosureForRecvTrailingMetadataReady(grpc_error_handle error, + CallCombinerClosureList* closures) { + auto* calld = call_attempt_->calld_; + // Find pending batch. + PendingBatch* pending = calld->PendingBatchFind( + "invoking recv_trailing_metadata for", + [](grpc_transport_stream_op_batch* batch) { + return batch->recv_trailing_metadata && + batch->payload->recv_trailing_metadata + .recv_trailing_metadata_ready != nullptr; + }); + // If we generated the recv_trailing_metadata op internally via + // StartInternalRecvTrailingMetadata(), then there will be no pending batch. + if (pending == nullptr) { + GRPC_ERROR_UNREF(error); + return; + } + // Return metadata. + grpc_metadata_batch_move( + &call_attempt_->recv_trailing_metadata_, + pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata); + // Add closure. + closures->Add(pending->batch->payload->recv_trailing_metadata + .recv_trailing_metadata_ready, + error, "recv_trailing_metadata_ready for pending batch"); + // Update bookkeeping. + pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + nullptr; + calld->MaybeClearPendingBatch(pending); +} + +void RetryFilter::CallData::CallAttempt::BatchData:: + AddClosuresForDeferredRecvCallbacks(CallCombinerClosureList* closures) { + if (batch_.recv_trailing_metadata) { + // Add closure for deferred recv_initial_metadata_ready. + if (GPR_UNLIKELY( + call_attempt_->recv_initial_metadata_ready_deferred_batch_ != + nullptr)) { + GRPC_CLOSURE_INIT( + &call_attempt_->recv_initial_metadata_ready_, + InvokeRecvInitialMetadataCallback, + call_attempt_->recv_initial_metadata_ready_deferred_batch_, + grpc_schedule_on_exec_ctx); + closures->Add(&call_attempt_->recv_initial_metadata_ready_, + call_attempt_->recv_initial_metadata_error_, + "resuming recv_initial_metadata_ready"); + call_attempt_->recv_initial_metadata_ready_deferred_batch_ = nullptr; + } + // Add closure for deferred recv_message_ready. 
+ if (GPR_UNLIKELY(call_attempt_->recv_message_ready_deferred_batch_ != + nullptr)) { + GRPC_CLOSURE_INIT(&call_attempt_->recv_message_ready_, + InvokeRecvMessageCallback, + call_attempt_->recv_message_ready_deferred_batch_, + grpc_schedule_on_exec_ctx); + closures->Add(&call_attempt_->recv_message_ready_, + call_attempt_->recv_message_error_, + "resuming recv_message_ready"); + call_attempt_->recv_message_ready_deferred_batch_ = nullptr; + } + } +} + +void RetryFilter::CallData::CallAttempt::BatchData:: + AddClosuresToFailUnstartedPendingBatches( + grpc_error_handle error, CallCombinerClosureList* closures) { + auto* calld = call_attempt_->calld_; + for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches_); ++i) { + PendingBatch* pending = &calld->pending_batches_[i]; + if (call_attempt_->PendingBatchIsUnstarted(pending)) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: failing unstarted pending batch at " + "index %" PRIuPTR, + calld->chand_, calld, i); + } + closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error), + "failing on_complete for pending batch"); + pending->batch->on_complete = nullptr; + calld->MaybeClearPendingBatch(pending); + } + } + GRPC_ERROR_UNREF(error); +} + +void RetryFilter::CallData::CallAttempt::BatchData::RunClosuresForCompletedCall( + grpc_error_handle error) { + // Construct list of closures to execute. + CallCombinerClosureList closures; + // First, add closure for recv_trailing_metadata_ready. + AddClosureForRecvTrailingMetadataReady(GRPC_ERROR_REF(error), &closures); + // If there are deferred recv_initial_metadata_ready or recv_message_ready + // callbacks, add them to closures. + AddClosuresForDeferredRecvCallbacks(&closures); + // Add closures to fail any pending batches that have not yet been started. + AddClosuresToFailUnstartedPendingBatches(GRPC_ERROR_REF(error), &closures); + // Schedule all of the closures identified above. + // Note: This will release the call combiner. + closures.RunClosures(call_attempt_->calld_->call_combiner_); + // Don't need batch_data anymore. + Unref(); + GRPC_ERROR_UNREF(error); +} + +void RetryFilter::CallData::CallAttempt::BatchData::RecvTrailingMetadataReady( + void* arg, grpc_error_handle error) { + CallAttempt::BatchData* batch_data = + static_cast<CallAttempt::BatchData*>(arg); + CallAttempt* call_attempt = batch_data->call_attempt_.get(); + CallData* calld = call_attempt->calld_; + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s", + calld->chand_, calld, grpc_error_std_string(error).c_str()); + } + call_attempt->completed_recv_trailing_metadata_ = true; + // Get the call's status and check for server pushback metadata. + grpc_status_code status = GRPC_STATUS_OK; + grpc_mdelem* server_pushback_md = nullptr; + grpc_metadata_batch* md_batch = + batch_data->batch_.payload->recv_trailing_metadata.recv_trailing_metadata; + bool is_lb_drop = false; + GetCallStatus(calld->deadline_, md_batch, GRPC_ERROR_REF(error), &status, + &server_pushback_md, &is_lb_drop); + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log( + GPR_INFO, "chand=%p calld=%p: call finished, status=%s is_lb_drop=%d", + calld->chand_, calld, grpc_status_code_to_string(status), is_lb_drop); + } + // Check if we should retry. + if (batch_data->MaybeRetry(status, server_pushback_md, is_lb_drop)) { + // Unref batch_data for deferred recv_initial_metadata_ready or + // recv_message_ready callbacks, if any. 
+ if (call_attempt->recv_initial_metadata_ready_deferred_batch_ != nullptr) { + GRPC_ERROR_UNREF(call_attempt->recv_initial_metadata_error_); + batch_data->Unref(); + } + if (call_attempt->recv_message_ready_deferred_batch_ != nullptr) { + GRPC_ERROR_UNREF(call_attempt->recv_message_error_); + batch_data->Unref(); + } + batch_data->Unref(); + return; + } + // Not retrying, so commit the call. + calld->RetryCommit(call_attempt); + // Run any necessary closures. + batch_data->RunClosuresForCompletedCall(GRPC_ERROR_REF(error)); +} + +// +// on_complete callback handling +// + +void RetryFilter::CallData::CallAttempt::BatchData:: + AddClosuresForCompletedPendingBatch(grpc_error_handle error, + CallCombinerClosureList* closures) { + auto* calld = call_attempt_->calld_; + PendingBatch* pending = calld->PendingBatchFind( + "completed", [this](grpc_transport_stream_op_batch* batch) { + // Match the pending batch with the same set of send ops as the + // batch we've just completed. + return batch->on_complete != nullptr && + batch_.send_initial_metadata == batch->send_initial_metadata && + batch_.send_message == batch->send_message && + batch_.send_trailing_metadata == batch->send_trailing_metadata; + }); + // If batch_data is a replay batch, then there will be no pending + // batch to complete. + if (pending == nullptr) { + GRPC_ERROR_UNREF(error); + return; + } + // Add closure. + closures->Add(pending->batch->on_complete, error, + "on_complete for pending batch"); + pending->batch->on_complete = nullptr; + calld->MaybeClearPendingBatch(pending); +} + +void RetryFilter::CallData::CallAttempt::BatchData:: + AddClosuresForReplayOrPendingSendOps(CallCombinerClosureList* closures) { + auto* calld = call_attempt_->calld_; + // We don't check send_initial_metadata here, because that op will always + // be started as soon as it is received from the surface, so it will + // never need to be started at this point. 
+ bool have_pending_send_message_ops = + call_attempt_->started_send_message_count_ < calld->send_messages_.size(); + bool have_pending_send_trailing_metadata_op = + calld->seen_send_trailing_metadata_ && + !call_attempt_->started_send_trailing_metadata_; + if (!have_pending_send_message_ops && + !have_pending_send_trailing_metadata_op) { + for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches_); ++i) { + PendingBatch* pending = &calld->pending_batches_[i]; + grpc_transport_stream_op_batch* batch = pending->batch; + if (batch == nullptr || pending->send_ops_cached) continue; + if (batch->send_message) have_pending_send_message_ops = true; + if (batch->send_trailing_metadata) { + have_pending_send_trailing_metadata_op = true; + } + } + } + if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: starting next batch for pending send op(s)", + calld->chand_, calld); + } + call_attempt_->AddRetriableBatches(closures); + } +} + +void RetryFilter::CallData::CallAttempt::BatchData::OnComplete( + void* arg, grpc_error_handle error) { + CallAttempt::BatchData* batch_data = + static_cast<CallAttempt::BatchData*>(arg); + CallAttempt* call_attempt = batch_data->call_attempt_.get(); + CallData* calld = call_attempt->calld_; + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s", + calld->chand_, calld, grpc_error_std_string(error).c_str(), + grpc_transport_stream_op_batch_string(&batch_data->batch_).c_str()); + } + // Update bookkeeping in call_attempt. + if (batch_data->batch_.send_initial_metadata) { + call_attempt->completed_send_initial_metadata_ = true; + } + if (batch_data->batch_.send_message) { + ++call_attempt->completed_send_message_count_; + } + if (batch_data->batch_.send_trailing_metadata) { + call_attempt->completed_send_trailing_metadata_ = true; + } + // If the call is committed, free cached data for send ops that we've just + // completed. + if (calld->retry_committed_) { + batch_data->FreeCachedSendOpDataForCompletedBatch(); + } + // Construct list of closures to execute. + CallCombinerClosureList closures; + // If a retry was already dispatched, that means we saw + // recv_trailing_metadata before this, so we do nothing here. + // Otherwise, invoke the callback to return the result to the surface. + if (!call_attempt->retry_dispatched_) { + // Add closure for the completed pending batch, if any. + batch_data->AddClosuresForCompletedPendingBatch(GRPC_ERROR_REF(error), + &closures); + // If needed, add a callback to start any replay or pending send ops on + // the LB call. + if (!call_attempt->completed_recv_trailing_metadata_) { + batch_data->AddClosuresForReplayOrPendingSendOps(&closures); + } + } + // Track number of in-flight send batches and determine if this was the + // last one. + --calld->num_in_flight_call_attempt_send_batches_; + const bool last_send_batch_complete = + calld->num_in_flight_call_attempt_send_batches_ == 0; + // Don't need batch_data anymore. + batch_data->Unref(); + // Schedule all of the closures identified above. + // Note: This yields the call combiner. + closures.RunClosures(calld->call_combiner_); + // If this was the last in-flight send batch, unref the call stack. 
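+  // (This releases the "retriable_send_batches" ref, which was taken
+  // when the in-flight send batch count first became non-zero.)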
+  if (last_send_batch_complete) {
+    GRPC_CALL_STACK_UNREF(calld->owning_call_, "retriable_send_batches");
+  }
+}
+
+//
+// retriable batch construction
+//
+
+void RetryFilter::CallData::CallAttempt::BatchData::
+    AddRetriableSendInitialMetadataOp() {
+  auto* calld = call_attempt_->calld_;
+  // Maps the number of retries to the corresponding metadata value slice.
+  const grpc_slice* retry_count_strings[] = {&GRPC_MDSTR_1, &GRPC_MDSTR_2,
+                                             &GRPC_MDSTR_3, &GRPC_MDSTR_4};
+  // We need to make a copy of the metadata batch for each attempt, since
+  // the filters in the subchannel stack may modify this batch, and we don't
+  // want those modifications to be passed forward to subsequent attempts.
+  //
+  // If we've already completed one or more attempts, add the
+  // grpc-previous-rpc-attempts header.
+  call_attempt_->send_initial_metadata_storage_ =
+      static_cast<grpc_linked_mdelem*>(
+          calld->arena_->Alloc(sizeof(grpc_linked_mdelem) *
+                               (calld->send_initial_metadata_.list.count +
+                                (calld->num_attempts_completed_ > 0))));
+  grpc_metadata_batch_copy(&calld->send_initial_metadata_,
+                           &call_attempt_->send_initial_metadata_,
+                           call_attempt_->send_initial_metadata_storage_);
+  if (GPR_UNLIKELY(call_attempt_->send_initial_metadata_.idx.named
+                       .grpc_previous_rpc_attempts != nullptr)) {
+    grpc_metadata_batch_remove(&call_attempt_->send_initial_metadata_,
+                               GRPC_BATCH_GRPC_PREVIOUS_RPC_ATTEMPTS);
+  }
+  if (GPR_UNLIKELY(calld->num_attempts_completed_ > 0)) {
+    grpc_mdelem retry_md = grpc_mdelem_create(
+        GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
+        *retry_count_strings[calld->num_attempts_completed_ - 1], nullptr);
+    grpc_error_handle error = grpc_metadata_batch_add_tail(
+        &call_attempt_->send_initial_metadata_,
+        &call_attempt_->send_initial_metadata_storage_
+            [calld->send_initial_metadata_.list.count],
+        retry_md, GRPC_BATCH_GRPC_PREVIOUS_RPC_ATTEMPTS);
+    if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
+      gpr_log(GPR_ERROR, "error adding retry metadata: %s",
+              grpc_error_std_string(error).c_str());
+      GPR_ASSERT(false);
+    }
+  }
+  call_attempt_->started_send_initial_metadata_ = true;
+  batch_.send_initial_metadata = true;
+  batch_.payload->send_initial_metadata.send_initial_metadata =
+      &call_attempt_->send_initial_metadata_;
+  batch_.payload->send_initial_metadata.send_initial_metadata_flags =
+      calld->send_initial_metadata_flags_;
+  batch_.payload->send_initial_metadata.peer_string = calld->peer_string_;
+}
+
+void RetryFilter::CallData::CallAttempt::BatchData::
+    AddRetriableSendMessageOp() {
+  auto* calld = call_attempt_->calld_;
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
+    gpr_log(GPR_INFO,
+            "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
+            calld->chand_, calld, call_attempt_->started_send_message_count_);
+  }
+  ByteStreamCache* cache =
+      calld->send_messages_[call_attempt_->started_send_message_count_];
+  ++call_attempt_->started_send_message_count_;
+  call_attempt_->send_message_.Init(cache);
+  batch_.send_message = true;
+  batch_.payload->send_message.send_message.reset(
+      call_attempt_->send_message_.get());
+}
+
+void RetryFilter::CallData::CallAttempt::BatchData::
+    AddRetriableSendTrailingMetadataOp() {
+  auto* calld = call_attempt_->calld_;
+  // We need to make a copy of the metadata batch for each attempt, since
+  // the filters in the subchannel stack may modify this batch, and we don't
+  // want those modifications to be passed forward to subsequent attempts.
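+  // The storage for the copied mdelems comes from the call's arena, so
+  // it is released along with the call's other arena allocations.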
+ call_attempt_->send_trailing_metadata_storage_ = + static_cast<grpc_linked_mdelem*>( + calld->arena_->Alloc(sizeof(grpc_linked_mdelem) * + calld->send_trailing_metadata_.list.count)); + grpc_metadata_batch_copy(&calld->send_trailing_metadata_, + &call_attempt_->send_trailing_metadata_, + call_attempt_->send_trailing_metadata_storage_); + call_attempt_->started_send_trailing_metadata_ = true; + batch_.send_trailing_metadata = true; + batch_.payload->send_trailing_metadata.send_trailing_metadata = + &call_attempt_->send_trailing_metadata_; +} + +void RetryFilter::CallData::CallAttempt::BatchData:: + AddRetriableRecvInitialMetadataOp() { + call_attempt_->started_recv_initial_metadata_ = true; + batch_.recv_initial_metadata = true; + grpc_metadata_batch_init(&call_attempt_->recv_initial_metadata_); + batch_.payload->recv_initial_metadata.recv_initial_metadata = + &call_attempt_->recv_initial_metadata_; + batch_.payload->recv_initial_metadata.trailing_metadata_available = + &call_attempt_->trailing_metadata_available_; + GRPC_CLOSURE_INIT(&call_attempt_->recv_initial_metadata_ready_, + RecvInitialMetadataReady, this, grpc_schedule_on_exec_ctx); + batch_.payload->recv_initial_metadata.recv_initial_metadata_ready = + &call_attempt_->recv_initial_metadata_ready_; +} + +void RetryFilter::CallData::CallAttempt::BatchData:: + AddRetriableRecvMessageOp() { + ++call_attempt_->started_recv_message_count_; + batch_.recv_message = true; + batch_.payload->recv_message.recv_message = &call_attempt_->recv_message_; + GRPC_CLOSURE_INIT(&call_attempt_->recv_message_ready_, RecvMessageReady, this, + grpc_schedule_on_exec_ctx); + batch_.payload->recv_message.recv_message_ready = + &call_attempt_->recv_message_ready_; +} + +void RetryFilter::CallData::CallAttempt::BatchData:: + AddRetriableRecvTrailingMetadataOp() { + call_attempt_->started_recv_trailing_metadata_ = true; + batch_.recv_trailing_metadata = true; + grpc_metadata_batch_init(&call_attempt_->recv_trailing_metadata_); + batch_.payload->recv_trailing_metadata.recv_trailing_metadata = + &call_attempt_->recv_trailing_metadata_; + batch_.payload->recv_trailing_metadata.collect_stats = + &call_attempt_->collect_stats_; + GRPC_CLOSURE_INIT(&call_attempt_->recv_trailing_metadata_ready_, + RecvTrailingMetadataReady, this, grpc_schedule_on_exec_ctx); + batch_.payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &call_attempt_->recv_trailing_metadata_ready_; +} + +// +// CallData vtable functions +// + +grpc_error_handle RetryFilter::CallData::Init( + grpc_call_element* elem, const grpc_call_element_args* args) { + auto* chand = static_cast<RetryFilter*>(elem->channel_data); + new (elem->call_data) CallData(chand, *args); + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, "chand=%p: created call=%p", chand, elem->call_data); + } + return GRPC_ERROR_NONE; +} + +void RetryFilter::CallData::Destroy(grpc_call_element* elem, + const grpc_call_final_info* /*final_info*/, + grpc_closure* then_schedule_closure) { + auto* calld = static_cast<CallData*>(elem->call_data); + // Save our ref to the CallStackDestructionBarrier until after our + // dtor is invoked. + RefCountedPtr<CallStackDestructionBarrier> call_stack_destruction_barrier = + std::move(calld->call_stack_destruction_barrier_); + calld->~CallData(); + // Now set the callback in the CallStackDestructionBarrier object, + // right before we release our ref to it (implicitly upon returning). + // The callback will be invoked when the CallStackDestructionBarrier + // is destroyed. 
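+  // The barrier prevents the call stack from being destroyed until all
+  // LB calls created for this call have themselves been destroyed.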
+ call_stack_destruction_barrier->set_on_call_stack_destruction( + then_schedule_closure); +} + +void RetryFilter::CallData::StartTransportStreamOpBatch( + grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { + auto* calld = static_cast<CallData*>(elem->call_data); + calld->StartTransportStreamOpBatch(batch); +} + +void RetryFilter::CallData::SetPollent(grpc_call_element* elem, + grpc_polling_entity* pollent) { + auto* calld = static_cast<CallData*>(elem->call_data); + calld->pollent_ = pollent; +} + +// +// CallData implementation +// + +const RetryMethodConfig* GetRetryPolicy( + const grpc_call_context_element* context) { + if (context == nullptr) return nullptr; + auto* svc_cfg_call_data = static_cast<ServiceConfigCallData*>( + context[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value); + if (svc_cfg_call_data == nullptr) return nullptr; + return static_cast<const RetryMethodConfig*>( + svc_cfg_call_data->GetMethodParsedConfig( + RetryServiceConfigParser::ParserIndex())); +} + +RetryFilter::CallData::CallData(RetryFilter* chand, + const grpc_call_element_args& args) + : chand_(chand), + retry_throttle_data_(chand->retry_throttle_data_), + retry_policy_(GetRetryPolicy(args.context)), + retry_backoff_( + BackOff::Options() + .set_initial_backoff(retry_policy_ == nullptr + ? 0 + : retry_policy_->initial_backoff()) + .set_multiplier(retry_policy_ == nullptr + ? 0 + : retry_policy_->backoff_multiplier()) + .set_jitter(RETRY_BACKOFF_JITTER) + .set_max_backoff( + retry_policy_ == nullptr ? 0 : retry_policy_->max_backoff())), + path_(grpc_slice_ref_internal(args.path)), + call_start_time_(args.start_time), + deadline_(args.deadline), + arena_(args.arena), + owning_call_(args.call_stack), + call_combiner_(args.call_combiner), + call_context_(args.context), + call_stack_destruction_barrier_( + arena_->New<CallStackDestructionBarrier>()), + pending_send_initial_metadata_(false), + pending_send_message_(false), + pending_send_trailing_metadata_(false), + retry_committed_(false), + last_attempt_got_server_pushback_(false) {} + +RetryFilter::CallData::~CallData() { + grpc_slice_unref_internal(path_); + // Make sure there are no remaining pending batches. + for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) { + GPR_ASSERT(pending_batches_[i].batch == nullptr); + } +} + +void RetryFilter::CallData::StartTransportStreamOpBatch( + grpc_transport_stream_op_batch* batch) { + // If we have an LB call, delegate to the LB call. + if (committed_call_ != nullptr) { + // Note: This will release the call combiner. + committed_call_->StartTransportStreamOpBatch(batch); + return; + } + // Handle cancellation. + if (GPR_UNLIKELY(batch->cancel_stream)) { + grpc_error_handle cancel_error = batch->payload->cancel_stream.cancel_error; + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, "chand=%p calld=%p: cancelled from surface: %s", chand_, + this, grpc_error_std_string(cancel_error).c_str()); + } + // If we have a current call attempt, commit the call, then send + // the cancellation down to that attempt. When the call fails, it + // will not be retried, because we have committed it here. + if (call_attempt_ != nullptr) { + RetryCommit(call_attempt_.get()); + // Note: This will release the call combiner. + call_attempt_->lb_call()->StartTransportStreamOpBatch(batch); + return; + } + // Fail pending batches. + PendingBatchesFail(GRPC_ERROR_REF(cancel_error)); + // Note: This will release the call combiner. 
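+    // The cancel_stream batch itself is completed with the same error,
+    // since there is no call attempt to send it down to.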
+ grpc_transport_stream_op_batch_finish_with_failure( + batch, GRPC_ERROR_REF(cancel_error), call_combiner_); + return; + } + // Add the batch to the pending list. + PendingBatch* pending = PendingBatchesAdd(batch); + if (call_attempt_ == nullptr) { + // If this is the first batch and retries are already committed + // (e.g., if this batch put the call above the buffer size limit), then + // immediately create an LB call and delegate the batch to it. This + // avoids the overhead of unnecessarily allocating a CallAttempt + // object or caching any of the send op data. + if (num_attempts_completed_ == 0 && retry_committed_) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: retry committed before first attempt; " + "creating LB call", + chand_, this); + } + PendingBatchClear(pending); + committed_call_ = CreateLoadBalancedCall(); + committed_call_->StartTransportStreamOpBatch(batch); + return; + } + // We do not yet have a call attempt, so create one. + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, "chand=%p calld=%p: creating call attempt", chand_, + this); + } + CreateCallAttempt(); + return; + } + // Send batches to call attempt. + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: starting batch on attempt=%p lb_call=%p", + chand_, this, call_attempt_.get(), call_attempt_->lb_call()); + } + call_attempt_->StartRetriableBatches(); +} + +RefCountedPtr<ClientChannel::LoadBalancedCall> +RetryFilter::CallData::CreateLoadBalancedCall() { + grpc_call_element_args args = {owning_call_, nullptr, call_context_, + path_, call_start_time_, deadline_, + arena_, call_combiner_}; + return chand_->client_channel_->CreateLoadBalancedCall( + args, pollent_, + // This callback holds a ref to the CallStackDestructionBarrier + // object until the LB call is destroyed. + call_stack_destruction_barrier_->MakeLbCallDestructionClosure(this)); +} + +void RetryFilter::CallData::CreateCallAttempt() { + call_attempt_.reset(arena_->New<CallAttempt>(this)); + call_attempt_->StartRetriableBatches(); + // TODO(roth): When implementing hedging, change this to start a timer + // for the next hedging attempt. +} + +namespace { + +void StartBatchInCallCombiner(void* arg, grpc_error_handle /*ignored*/) { + grpc_transport_stream_op_batch* batch = + static_cast<grpc_transport_stream_op_batch*>(arg); + auto* lb_call = static_cast<ClientChannel::LoadBalancedCall*>( + batch->handler_private.extra_arg); + // Note: This will release the call combiner. + lb_call->StartTransportStreamOpBatch(batch); +} + +} // namespace + +void RetryFilter::CallData::AddClosureForBatch( + grpc_transport_stream_op_batch* batch, CallCombinerClosureList* closures) { + batch->handler_private.extra_arg = call_attempt_->lb_call(); + GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner, + batch, grpc_schedule_on_exec_ctx); + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, "chand=%p calld=%p: starting batch on LB call: %s", + chand_, this, grpc_transport_stream_op_batch_string(batch).c_str()); + } + closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE, + "start_batch_on_lb_call"); +} + +// +// send op data caching +// + +void RetryFilter::CallData::MaybeCacheSendOpsForBatch(PendingBatch* pending) { + if (pending->send_ops_cached) return; + pending->send_ops_cached = true; + grpc_transport_stream_op_batch* batch = pending->batch; + // Save a copy of metadata for send_initial_metadata ops. 
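+  // (Caching the send op data allows a later attempt to replay these ops
+  // even after the original batch has been completed back to the surface.)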
+  if (batch->send_initial_metadata) {
+    seen_send_initial_metadata_ = true;
+    GPR_ASSERT(send_initial_metadata_storage_ == nullptr);
+    grpc_metadata_batch* send_initial_metadata =
+        batch->payload->send_initial_metadata.send_initial_metadata;
+    send_initial_metadata_storage_ =
+        static_cast<grpc_linked_mdelem*>(arena_->Alloc(
+            sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count));
+    grpc_metadata_batch_copy(send_initial_metadata, &send_initial_metadata_,
+                             send_initial_metadata_storage_);
+    send_initial_metadata_flags_ =
+        batch->payload->send_initial_metadata.send_initial_metadata_flags;
+    peer_string_ = batch->payload->send_initial_metadata.peer_string;
+  }
+  // Set up cache for send_message ops.
+  if (batch->send_message) {
+    ByteStreamCache* cache = arena_->New<ByteStreamCache>(
+        std::move(batch->payload->send_message.send_message));
+    send_messages_.push_back(cache);
+  }
+  // Save metadata batch for send_trailing_metadata ops.
+  if (batch->send_trailing_metadata) {
+    seen_send_trailing_metadata_ = true;
+    GPR_ASSERT(send_trailing_metadata_storage_ == nullptr);
+    grpc_metadata_batch* send_trailing_metadata =
+        batch->payload->send_trailing_metadata.send_trailing_metadata;
+    send_trailing_metadata_storage_ =
+        static_cast<grpc_linked_mdelem*>(arena_->Alloc(
+            sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count));
+    grpc_metadata_batch_copy(send_trailing_metadata, &send_trailing_metadata_,
+                             send_trailing_metadata_storage_);
+  }
+}
+
+void RetryFilter::CallData::FreeCachedSendInitialMetadata() {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
+    gpr_log(GPR_INFO, "chand=%p calld=%p: destroying send_initial_metadata",
+            chand_, this);
+  }
+  grpc_metadata_batch_destroy(&send_initial_metadata_);
+}
+
+void RetryFilter::CallData::FreeCachedSendMessage(size_t idx) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
+    gpr_log(GPR_INFO,
+            "chand=%p calld=%p: destroying send_messages[%" PRIuPTR "]",
+            chand_, this, idx);
+  }
+  send_messages_[idx]->Destroy();
+}
+
+void RetryFilter::CallData::FreeCachedSendTrailingMetadata() {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
+    gpr_log(GPR_INFO, "chand=%p calld=%p: destroying send_trailing_metadata",
+            chand_, this);
+  }
+  grpc_metadata_batch_destroy(&send_trailing_metadata_);
+}
+
+void RetryFilter::CallData::FreeAllCachedSendOpData() {
+  if (seen_send_initial_metadata_) {
+    FreeCachedSendInitialMetadata();
+  }
+  for (size_t i = 0; i < send_messages_.size(); ++i) {
+    FreeCachedSendMessage(i);
+  }
+  if (seen_send_trailing_metadata_) {
+    FreeCachedSendTrailingMetadata();
+  }
+}
+
+//
+// pending_batches management
+//
+
+size_t RetryFilter::CallData::GetBatchIndex(
+    grpc_transport_stream_op_batch* batch) {
+  if (batch->send_initial_metadata) return 0;
+  if (batch->send_message) return 1;
+  if (batch->send_trailing_metadata) return 2;
+  if (batch->recv_initial_metadata) return 3;
+  if (batch->recv_message) return 4;
+  if (batch->recv_trailing_metadata) return 5;
+  GPR_UNREACHABLE_CODE(return (size_t)-1);
+}
+
+// This is called via the call combiner, so access to calld is synchronized.
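+// Adds the batch to the slot in pending_batches_ chosen by GetBatchIndex()
+// and updates the retry buffer accounting for its send ops.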
+RetryFilter::CallData::PendingBatch* RetryFilter::CallData::PendingBatchesAdd(
+    grpc_transport_stream_op_batch* batch) {
+  const size_t idx = GetBatchIndex(batch);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
+    gpr_log(GPR_INFO,
+            "chand=%p calld=%p: adding pending batch at index %" PRIuPTR,
+            chand_, this, idx);
+  }
+  PendingBatch* pending = &pending_batches_[idx];
+  GPR_ASSERT(pending->batch == nullptr);
+  pending->batch = batch;
+  pending->send_ops_cached = false;
+  // Update state in calld about pending batches.
+  // Also check if the batch takes us over the retry buffer limit.
+  // Note: We don't check the size of trailing metadata here, because
+  // gRPC clients do not send trailing metadata.
+  if (batch->send_initial_metadata) {
+    pending_send_initial_metadata_ = true;
+    bytes_buffered_for_retry_ += grpc_metadata_batch_size(
+        batch->payload->send_initial_metadata.send_initial_metadata);
+  }
+  if (batch->send_message) {
+    pending_send_message_ = true;
+    bytes_buffered_for_retry_ +=
+        batch->payload->send_message.send_message->length();
+  }
+  if (batch->send_trailing_metadata) {
+    pending_send_trailing_metadata_ = true;
+  }
+  if (GPR_UNLIKELY(bytes_buffered_for_retry_ >
+                   chand_->per_rpc_retry_buffer_size_)) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p calld=%p: exceeded retry buffer size, committing",
+              chand_, this);
+    }
+    RetryCommit(call_attempt_.get());
+  }
+  return pending;
+}
+
+void RetryFilter::CallData::PendingBatchClear(PendingBatch* pending) {
+  if (pending->batch->send_initial_metadata) {
+    pending_send_initial_metadata_ = false;
+  }
+  if (pending->batch->send_message) {
+    pending_send_message_ = false;
+  }
+  if (pending->batch->send_trailing_metadata) {
+    pending_send_trailing_metadata_ = false;
+  }
+  pending->batch = nullptr;
+}
+
+void RetryFilter::CallData::MaybeClearPendingBatch(PendingBatch* pending) {
+  grpc_transport_stream_op_batch* batch = pending->batch;
+  // We clear the pending batch if all of its callbacks have been
+  // scheduled and reset to nullptr.
+  if (batch->on_complete == nullptr &&
+      (!batch->recv_initial_metadata ||
+       batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
+           nullptr) &&
+      (!batch->recv_message ||
+       batch->payload->recv_message.recv_message_ready == nullptr) &&
+      (!batch->recv_trailing_metadata ||
+       batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
+           nullptr)) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
+      gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand_,
+              this);
+    }
+    PendingBatchClear(pending);
+  }
+}
+
+// This is called via the call combiner, so access to calld is synchronized.
+void RetryFilter::CallData::FailPendingBatchInCallCombiner(
+    void* arg, grpc_error_handle error) {
+  grpc_transport_stream_op_batch* batch =
+      static_cast<grpc_transport_stream_op_batch*>(arg);
+  CallData* call = static_cast<CallData*>(batch->handler_private.extra_arg);
+  // Note: This will release the call combiner.
+  grpc_transport_stream_op_batch_finish_with_failure(
+      batch, GRPC_ERROR_REF(error), call->call_combiner_);
+}
+
+// This is called via the call combiner, so access to calld is synchronized.
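+// Fails all pending batches with the given error.  The resulting closures
+// are run under the call combiner without yielding it, since the caller
+// is still holding the combiner.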
+void RetryFilter::CallData::PendingBatchesFail(grpc_error_handle error) { + GPR_ASSERT(error != GRPC_ERROR_NONE); + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + size_t num_batches = 0; + for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) { + if (pending_batches_[i].batch != nullptr) ++num_batches; + } + gpr_log(GPR_INFO, + "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s", + chand_, this, num_batches, grpc_error_std_string(error).c_str()); + } + CallCombinerClosureList closures; + for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) { + PendingBatch* pending = &pending_batches_[i]; + grpc_transport_stream_op_batch* batch = pending->batch; + if (batch != nullptr) { + batch->handler_private.extra_arg = this; + GRPC_CLOSURE_INIT(&batch->handler_private.closure, + FailPendingBatchInCallCombiner, batch, + grpc_schedule_on_exec_ctx); + closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error), + "PendingBatchesFail"); + PendingBatchClear(pending); + } + } + closures.RunClosuresWithoutYielding(call_combiner_); + GRPC_ERROR_UNREF(error); +} + +template <typename Predicate> +RetryFilter::CallData::PendingBatch* RetryFilter::CallData::PendingBatchFind( + const char* log_message, Predicate predicate) { + for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) { + PendingBatch* pending = &pending_batches_[i]; + grpc_transport_stream_op_batch* batch = pending->batch; + if (batch != nullptr && predicate(batch)) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, + chand_, this, log_message, i); + } + return pending; + } + } + return nullptr; +} + +// +// retry code +// + +void RetryFilter::CallData::RetryCommit(CallAttempt* call_attempt) { + if (retry_committed_) return; + retry_committed_ = true; + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand_, this); + } + if (call_attempt != nullptr) { + call_attempt->FreeCachedSendOpDataAfterCommit(); + } +} + +void RetryFilter::CallData::DoRetry(grpc_millis server_pushback_ms) { + // Reset call attempt. + call_attempt_.reset(); + // Compute backoff delay. + grpc_millis next_attempt_time; + if (server_pushback_ms >= 0) { + next_attempt_time = ExecCtx::Get()->Now() + server_pushback_ms; + last_attempt_got_server_pushback_ = true; + } else { + if (num_attempts_completed_ == 1 || last_attempt_got_server_pushback_) { + last_attempt_got_server_pushback_ = false; + } + next_attempt_time = retry_backoff_.NextAttemptTime(); + } + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand_, + this, next_attempt_time - ExecCtx::Get()->Now()); + } + // Schedule retry after computed delay. 
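+  // The Canceller registered here lets a later cancellation prevent the
+  // timer from starting a new attempt (see OnRetryTimer() below).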
+  GRPC_CLOSURE_INIT(&retry_closure_, OnRetryTimer, this, nullptr);
+  GRPC_CALL_STACK_REF(owning_call_, "OnRetryTimer");
+  MutexLock lock(&timer_mu_);
+  canceller_ = new Canceller(this);
+  grpc_timer_init(&retry_timer_, next_attempt_time, &retry_closure_);
+}
+
+void RetryFilter::CallData::OnRetryTimer(void* arg, grpc_error_handle error) {
+  auto* calld = static_cast<CallData*>(arg);
+  if (error == GRPC_ERROR_NONE) {
+    bool start_attempt = false;
+    {
+      MutexLock lock(&calld->timer_mu_);
+      if (calld->canceller_ != nullptr) {
+        calld->canceller_ = nullptr;
+        start_attempt = true;
+      }
+    }
+    if (start_attempt) calld->CreateCallAttempt();
+  }
+  GRPC_CALL_STACK_UNREF(calld->owning_call_, "OnRetryTimer");
+}
+
+}  // namespace
+
+const grpc_channel_filter kRetryFilterVtable = {
+    RetryFilter::CallData::StartTransportStreamOpBatch,
+    RetryFilter::StartTransportOp,
+    sizeof(RetryFilter::CallData),
+    RetryFilter::CallData::Init,
+    RetryFilter::CallData::SetPollent,
+    RetryFilter::CallData::Destroy,
+    sizeof(RetryFilter),
+    RetryFilter::Init,
+    RetryFilter::Destroy,
+    RetryFilter::GetChannelInfo,
+    "retry_filter",
+};
+
+}  // namespace grpc_core