summaryrefslogtreecommitdiff
path: root/grpc/src/core/ext/filters/client_channel/client_channel.h
diff options
context:
space:
mode:
Diffstat (limited to 'grpc/src/core/ext/filters/client_channel/client_channel.h')
-rw-r--r--grpc/src/core/ext/filters/client_channel/client_channel.h544
1 files changed, 488 insertions, 56 deletions
diff --git a/grpc/src/core/ext/filters/client_channel/client_channel.h b/grpc/src/core/ext/filters/client_channel/client_channel.h
index 202de8ae..b7dd1a19 100644
--- a/grpc/src/core/ext/filters/client_channel/client_channel.h
+++ b/grpc/src/core/ext/filters/client_channel/client_channel.h
@@ -1,78 +1,510 @@
-/*
- *
- * Copyright 2015 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
+//
+// Copyright 2015 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_H
#include <grpc/support/port_platform.h>
-#include "src/core/ext/filters/client_channel/client_channel_channelz.h"
+#include <map>
+#include <memory>
+#include <set>
+#include <string>
+
+#include "absl/status/status.h"
+#include "absl/types/optional.h"
+
+#include <grpc/support/log.h>
+
#include "src/core/ext/filters/client_channel/client_channel_factory.h"
+#include "src/core/ext/filters/client_channel/config_selector.h"
+#include "src/core/ext/filters/client_channel/dynamic_filters.h"
+#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/resolver.h"
-#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
+#include "src/core/ext/filters/client_channel/retry_throttle.h"
+#include "src/core/ext/filters/client_channel/service_config.h"
+#include "src/core/ext/filters/client_channel/subchannel.h"
+#include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
+#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/polling_entity.h"
+#include "src/core/lib/iomgr/work_serializer.h"
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/transport/connectivity_state.h"
+
+//
+// Client channel filter
+//
-extern grpc_core::TraceFlag grpc_client_channel_trace;
+// A client channel is a channel that begins disconnected, and can connect
+// to some endpoint on demand. If that endpoint disconnects, it will be
+// connected to again later.
+//
+// Calls on a disconnected client channel are queued until a connection is
+// established.
// Channel arg key for server URI string.
#define GRPC_ARG_SERVER_URI "grpc.server_uri"
-/* A client channel is a channel that begins disconnected, and can connect
- to some endpoint on demand. If that endpoint disconnects, it will be
- connected to again later.
+// Channel arg containing a pointer to the ClientChannel object.
+#define GRPC_ARG_CLIENT_CHANNEL "grpc.internal.client_channel"
+
+// Channel arg containing a pointer to the ServiceConfig object.
+#define GRPC_ARG_SERVICE_CONFIG_OBJ "grpc.internal.service_config_obj"
+
+// Max number of batches that can be pending on a call at any given
+// time. This includes one batch for each of the following ops:
+// recv_initial_metadata
+// send_initial_metadata
+// recv_message
+// send_message
+// recv_trailing_metadata
+// send_trailing_metadata
+#define MAX_PENDING_BATCHES 6
+
+namespace grpc_core {
+
+class ClientChannel {
+ public:
+ static const grpc_channel_filter kFilterVtable;
+
+ class LoadBalancedCall;
+
+ // Returns the ClientChannel object from channel, or null if channel
+ // is not a client channel.
+ static ClientChannel* GetFromChannel(grpc_channel* channel);
+
+ grpc_connectivity_state CheckConnectivityState(bool try_to_connect);
+
+ // Starts a one-time connectivity state watch. When the channel's state
+ // becomes different from *state, sets *state to the new state and
+ // schedules on_complete. The watcher_timer_init callback is invoked as
+ // soon as the watch is actually started (i.e., after hopping into the
+ // client channel combiner). I/O will be serviced via pollent.
+ //
+ // This is intended to be used when starting a watch from outside of C-core
+ // via grpc_channel_watch_connectivity_state(). It should not be used
+ // by other callers.
+ void AddExternalConnectivityWatcher(grpc_polling_entity pollent,
+ grpc_connectivity_state* state,
+ grpc_closure* on_complete,
+ grpc_closure* watcher_timer_init) {
+ new ExternalConnectivityWatcher(this, pollent, state, on_complete,
+ watcher_timer_init);
+ }
+
+ // Cancels a pending external watcher previously added by
+ // AddExternalConnectivityWatcher().
+ void CancelExternalConnectivityWatcher(grpc_closure* on_complete) {
+ ExternalConnectivityWatcher::RemoveWatcherFromExternalWatchersMap(
+ this, on_complete, /*cancel=*/true);
+ }
+
+ int NumExternalConnectivityWatchers() const {
+ MutexLock lock(&external_watchers_mu_);
+ return static_cast<int>(external_watchers_.size());
+ }
+
+ // Starts and stops a connectivity watch. The watcher will be initially
+ // notified as soon as the state changes from initial_state and then on
+ // every subsequent state change until either the watch is stopped or
+ // it is notified that the state has changed to SHUTDOWN.
+ //
+ // This is intended to be used when starting watches from code inside of
+ // C-core (e.g., for a nested control plane channel for things like xds).
+ void AddConnectivityWatcher(
+ grpc_connectivity_state initial_state,
+ OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher);
+ void RemoveConnectivityWatcher(
+ AsyncConnectivityStateWatcherInterface* watcher);
+
+ RefCountedPtr<LoadBalancedCall> CreateLoadBalancedCall(
+ const grpc_call_element_args& args, grpc_polling_entity* pollent,
+ grpc_closure* on_call_destruction_complete);
+
+ private:
+ class CallData;
+ class ResolverResultHandler;
+ class SubchannelWrapper;
+ class ClientChannelControlHelper;
+ class ConnectivityWatcherAdder;
+ class ConnectivityWatcherRemover;
+
+ // Represents a pending connectivity callback from an external caller
+ // via grpc_client_channel_watch_connectivity_state().
+ class ExternalConnectivityWatcher : public ConnectivityStateWatcherInterface {
+ public:
+ ExternalConnectivityWatcher(ClientChannel* chand,
+ grpc_polling_entity pollent,
+ grpc_connectivity_state* state,
+ grpc_closure* on_complete,
+ grpc_closure* watcher_timer_init);
+
+ ~ExternalConnectivityWatcher() override;
+
+ // Removes the watcher from the external_watchers_ map.
+ static void RemoveWatcherFromExternalWatchersMap(ClientChannel* chand,
+ grpc_closure* on_complete,
+ bool cancel);
+
+ void Notify(grpc_connectivity_state state,
+ const absl::Status& /* status */) override;
+
+ void Cancel();
+
+ private:
+ // Adds the watcher to state_tracker_. Consumes the ref that is passed to it
+ // from Start().
+ void AddWatcherLocked()
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_);
+ void RemoveWatcherLocked()
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_);
+
+ ClientChannel* chand_;
+ grpc_polling_entity pollent_;
+ grpc_connectivity_state initial_state_;
+ grpc_connectivity_state* state_;
+ grpc_closure* on_complete_;
+ grpc_closure* watcher_timer_init_;
+ Atomic<bool> done_{false};
+ };
+
+ struct ResolverQueuedCall {
+ grpc_call_element* elem;
+ ResolverQueuedCall* next = nullptr;
+ };
+ struct LbQueuedCall {
+ LoadBalancedCall* lb_call;
+ LbQueuedCall* next = nullptr;
+ };
+
+ ClientChannel(grpc_channel_element_args* args, grpc_error_handle* error);
+ ~ClientChannel();
+
+ // Filter vtable functions.
+ static grpc_error_handle Init(grpc_channel_element* elem,
+ grpc_channel_element_args* args);
+ static void Destroy(grpc_channel_element* elem);
+ static void StartTransportOp(grpc_channel_element* elem,
+ grpc_transport_op* op);
+ static void GetChannelInfo(grpc_channel_element* elem,
+ const grpc_channel_info* info);
- Calls on a disconnected client channel are queued until a connection is
- established. */
+ // Note: Does NOT return a new ref.
+ grpc_error_handle disconnect_error() const {
+ return disconnect_error_.Load(MemoryOrder::ACQUIRE);
+ }
-extern const grpc_channel_filter grpc_client_channel_filter;
+ // Note: All methods with "Locked" suffix must be invoked from within
+ // work_serializer_.
-grpc_connectivity_state grpc_client_channel_check_connectivity_state(
- grpc_channel_element* elem, int try_to_connect);
+ void OnResolverResultChangedLocked(Resolver::Result result)
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
+ void OnResolverErrorLocked(grpc_error_handle error)
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
-int grpc_client_channel_num_external_connectivity_watchers(
- grpc_channel_element* elem);
+ void CreateOrUpdateLbPolicyLocked(
+ RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
+ Resolver::Result result) ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
+ OrphanablePtr<LoadBalancingPolicy> CreateLbPolicyLocked(
+ const grpc_channel_args& args)
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
+
+ void UpdateStateAndPickerLocked(
+ grpc_connectivity_state state, const absl::Status& status,
+ const char* reason,
+ std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker)
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
+
+ void UpdateServiceConfigInControlPlaneLocked(
+ RefCountedPtr<ServiceConfig> service_config,
+ RefCountedPtr<ConfigSelector> config_selector,
+ const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
+ const char* lb_policy_name)
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
+
+ void UpdateServiceConfigInDataPlaneLocked()
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
+
+ void CreateResolverLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
+ void DestroyResolverAndLbPolicyLocked()
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
+
+ grpc_error_handle DoPingLocked(grpc_transport_op* op)
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
+
+ void StartTransportOpLocked(grpc_transport_op* op)
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
+
+ void TryToConnectLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
+
+ // These methods all require holding resolution_mu_.
+ void AddResolverQueuedCall(ResolverQueuedCall* call,
+ grpc_polling_entity* pollent)
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(resolution_mu_);
+ void RemoveResolverQueuedCall(ResolverQueuedCall* to_remove,
+ grpc_polling_entity* pollent)
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(resolution_mu_);
+
+ // These methods all require holding data_plane_mu_.
+ void AddLbQueuedCall(LbQueuedCall* call, grpc_polling_entity* pollent)
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_);
+ void RemoveLbQueuedCall(LbQueuedCall* to_remove, grpc_polling_entity* pollent)
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_);
+ RefCountedPtr<ConnectedSubchannel> GetConnectedSubchannelInDataPlane(
+ SubchannelInterface* subchannel) const
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_);
+
+ //
+ // Fields set at construction and never modified.
+ //
+ const bool deadline_checking_enabled_;
+ const bool enable_retries_;
+ grpc_channel_stack* owning_stack_;
+ ClientChannelFactory* client_channel_factory_;
+ const grpc_channel_args* channel_args_;
+ RefCountedPtr<ServiceConfig> default_service_config_;
+ std::string server_name_;
+ UniquePtr<char> target_uri_;
+ channelz::ChannelNode* channelz_node_;
+ grpc_pollset_set* interested_parties_;
+
+ //
+ // Fields related to name resolution. Guarded by resolution_mu_.
+ //
+ mutable Mutex resolution_mu_;
+ // Linked list of calls queued waiting for resolver result.
+ ResolverQueuedCall* resolver_queued_calls_ ABSL_GUARDED_BY(resolution_mu_) =
+ nullptr;
+ // Data from service config.
+ grpc_error_handle resolver_transient_failure_error_
+ ABSL_GUARDED_BY(resolution_mu_) = GRPC_ERROR_NONE;
+ bool received_service_config_data_ ABSL_GUARDED_BY(resolution_mu_) = false;
+ RefCountedPtr<ServiceConfig> service_config_ ABSL_GUARDED_BY(resolution_mu_);
+ RefCountedPtr<ConfigSelector> config_selector_
+ ABSL_GUARDED_BY(resolution_mu_);
+ RefCountedPtr<DynamicFilters> dynamic_filters_
+ ABSL_GUARDED_BY(resolution_mu_);
+
+ //
+ // Fields used in the data plane. Guarded by data_plane_mu_.
+ //
+ mutable Mutex data_plane_mu_;
+ std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker_
+ ABSL_GUARDED_BY(data_plane_mu_);
+ // Linked list of calls queued waiting for LB pick.
+ LbQueuedCall* lb_queued_calls_ ABSL_GUARDED_BY(data_plane_mu_) = nullptr;
+
+ //
+ // Fields used in the control plane. Guarded by work_serializer.
+ //
+ std::shared_ptr<WorkSerializer> work_serializer_;
+ ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(work_serializer_);
+ OrphanablePtr<Resolver> resolver_ ABSL_GUARDED_BY(work_serializer_);
+ bool previous_resolution_contained_addresses_
+ ABSL_GUARDED_BY(work_serializer_) = false;
+ RefCountedPtr<ServiceConfig> saved_service_config_
+ ABSL_GUARDED_BY(work_serializer_);
+ RefCountedPtr<ConfigSelector> saved_config_selector_
+ ABSL_GUARDED_BY(work_serializer_);
+ absl::optional<std::string> health_check_service_name_
+ ABSL_GUARDED_BY(work_serializer_);
+ OrphanablePtr<LoadBalancingPolicy> lb_policy_
+ ABSL_GUARDED_BY(work_serializer_);
+ RefCountedPtr<SubchannelPoolInterface> subchannel_pool_
+ ABSL_GUARDED_BY(work_serializer_);
+ // The number of SubchannelWrapper instances referencing a given Subchannel.
+ std::map<Subchannel*, int> subchannel_refcount_map_
+ ABSL_GUARDED_BY(work_serializer_);
+ // The set of SubchannelWrappers that currently exist.
+ // No need to hold a ref, since the map is updated in the control-plane
+ // work_serializer when the SubchannelWrappers are created and destroyed.
+ std::set<SubchannelWrapper*> subchannel_wrappers_
+ ABSL_GUARDED_BY(work_serializer_);
+ // Pending ConnectedSubchannel updates for each SubchannelWrapper.
+ // Updates are queued here in the control plane work_serializer and then
+ // applied in the data plane mutex when the picker is updated.
+ std::map<RefCountedPtr<SubchannelWrapper>, RefCountedPtr<ConnectedSubchannel>>
+ pending_subchannel_updates_ ABSL_GUARDED_BY(work_serializer_);
+ int keepalive_time_ ABSL_GUARDED_BY(work_serializer_) = -1;
+
+ //
+ // Fields accessed from both data plane mutex and control plane
+ // work_serializer.
+ //
+ Atomic<grpc_error_handle> disconnect_error_;
+
+ //
+ // Fields guarded by a mutex, since they need to be accessed
+ // synchronously via get_channel_info().
+ //
+ Mutex info_mu_;
+ UniquePtr<char> info_lb_policy_name_ ABSL_GUARDED_BY(info_mu_);
+ UniquePtr<char> info_service_config_json_ ABSL_GUARDED_BY(info_mu_);
+
+ //
+ // Fields guarded by a mutex, since they need to be accessed
+ // synchronously via grpc_channel_num_external_connectivity_watchers().
+ //
+ mutable Mutex external_watchers_mu_;
+ std::map<grpc_closure*, RefCountedPtr<ExternalConnectivityWatcher>>
+ external_watchers_ ABSL_GUARDED_BY(external_watchers_mu_);
+};
-// Starts a one-time connectivity state watch. When the channel's state
-// becomes different from *state, sets *state to the new state and
-// schedules on_complete. The watcher_timer_init callback is invoked as
-// soon as the watch is actually started (i.e., after hopping into the
-// client channel combiner). I/O will be serviced via pollent.
//
-// This is intended to be used when starting a watch from outside of C-core
-// via grpc_channel_watch_connectivity_state(). It should not be used
-// by other callers.
-void grpc_client_channel_watch_connectivity_state(
- grpc_channel_element* elem, grpc_polling_entity pollent,
- grpc_connectivity_state* state, grpc_closure* on_complete,
- grpc_closure* watcher_timer_init);
-
-// Starts and stops a connectivity watch. The watcher will be initially
-// notified as soon as the state changes from initial_state and then on
-// every subsequent state change until either the watch is stopped or
-// it is notified that the state has changed to SHUTDOWN.
+// ClientChannel::LoadBalancedCall
//
-// This is intended to be used when starting watches from code inside of
-// C-core (e.g., for a nested control plane channel for things like xds).
-void grpc_client_channel_start_connectivity_watch(
- grpc_channel_element* elem, grpc_connectivity_state initial_state,
- grpc_core::OrphanablePtr<grpc_core::AsyncConnectivityStateWatcherInterface>
- watcher);
-void grpc_client_channel_stop_connectivity_watch(
- grpc_channel_element* elem,
- grpc_core::AsyncConnectivityStateWatcherInterface* watcher);
+
+// This object is ref-counted, but it cannot inherit from RefCounted<>,
+// because it is allocated on the arena and can't free its memory when
+// its refcount goes to zero. So instead, it manually implements the
+// same API as RefCounted<>, so that it can be used with RefCountedPtr<>.
+class ClientChannel::LoadBalancedCall
+ : public RefCounted<LoadBalancedCall, PolymorphicRefCount, kUnrefCallDtor> {
+ public:
+ // If on_call_destruction_complete is non-null, then it will be
+ // invoked once the LoadBalancedCall is completely destroyed.
+ // If it is null, then the caller is responsible for checking whether
+ // the LB call has a subchannel call and ensuring that the
+ // on_call_destruction_complete closure passed down from the surface
+ // is not invoked until after the subchannel call stack is destroyed.
+ LoadBalancedCall(ClientChannel* chand, const grpc_call_element_args& args,
+ grpc_polling_entity* pollent,
+ grpc_closure* on_call_destruction_complete);
+ ~LoadBalancedCall() override;
+
+ void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
+
+ // Invoked by channel for queued LB picks when the picker is updated.
+ static void PickSubchannel(void* arg, grpc_error_handle error);
+ // Helper function for performing an LB pick while holding the data plane
+ // mutex. Returns true if the pick is complete, in which case the caller
+ // must invoke PickDone() or AsyncPickDone() with the returned error.
+ bool PickSubchannelLocked(grpc_error_handle* error)
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_);
+ // Schedules a callback to process the completed pick. The callback
+ // will not run until after this method returns.
+ void AsyncPickDone(grpc_error_handle error);
+
+ RefCountedPtr<SubchannelCall> subchannel_call() const {
+ return subchannel_call_;
+ }
+
+ private:
+ class LbQueuedCallCanceller;
+ class Metadata;
+ class LbCallState;
+
+ // Returns the index into pending_batches_ to be used for batch.
+ static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
+ void PendingBatchesAdd(grpc_transport_stream_op_batch* batch);
+ static void FailPendingBatchInCallCombiner(void* arg,
+ grpc_error_handle error);
+ // A predicate type and some useful implementations for PendingBatchesFail().
+ typedef bool (*YieldCallCombinerPredicate)(
+ const CallCombinerClosureList& closures);
+ static bool YieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
+ return true;
+ }
+ static bool NoYieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
+ return false;
+ }
+ static bool YieldCallCombinerIfPendingBatchesFound(
+ const CallCombinerClosureList& closures) {
+ return closures.size() > 0;
+ }
+ // Fails all pending batches.
+ // If yield_call_combiner_predicate returns true, assumes responsibility for
+ // yielding the call combiner.
+ void PendingBatchesFail(
+ grpc_error_handle error,
+ YieldCallCombinerPredicate yield_call_combiner_predicate);
+ static void ResumePendingBatchInCallCombiner(void* arg,
+ grpc_error_handle ignored);
+ // Resumes all pending batches on subchannel_call_.
+ void PendingBatchesResume();
+
+ static void RecvTrailingMetadataReadyForLoadBalancingPolicy(
+ void* arg, grpc_error_handle error);
+ void InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
+ grpc_transport_stream_op_batch* batch);
+
+ void CreateSubchannelCall();
+ // Invoked when a pick is completed, on both success or failure.
+ static void PickDone(void* arg, grpc_error_handle error);
+ // Removes the call from the channel's list of queued picks if present.
+ void MaybeRemoveCallFromLbQueuedCallsLocked()
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_);
+ // Adds the call to the channel's list of queued picks if not already present.
+ void MaybeAddCallToLbQueuedCallsLocked()
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_);
+
+ ClientChannel* chand_;
+
+ // TODO(roth): Instead of duplicating these fields in every filter
+ // that uses any one of them, we should store them in the call
+ // context. This will save per-call memory overhead.
+ grpc_slice path_; // Request path.
+ gpr_cycle_counter call_start_time_;
+ grpc_millis deadline_;
+ Arena* arena_;
+ grpc_call_stack* owning_call_;
+ CallCombiner* call_combiner_;
+ grpc_call_context_element* call_context_;
+ grpc_polling_entity* pollent_;
+ grpc_closure* on_call_destruction_complete_;
+
+ // Set when we get a cancel_stream op.
+ grpc_error_handle cancel_error_ = GRPC_ERROR_NONE;
+
+ // Set when we fail inside the LB call.
+ grpc_error_handle failure_error_ = GRPC_ERROR_NONE;
+
+ grpc_closure pick_closure_;
+
+ // Accessed while holding ClientChannel::data_plane_mu_.
+ ClientChannel::LbQueuedCall queued_call_
+ ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_);
+ bool queued_pending_lb_pick_ ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_) =
+ false;
+ LbQueuedCallCanceller* lb_call_canceller_
+ ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_) = nullptr;
+
+ RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
+ const LoadBalancingPolicy::BackendMetricData* backend_metric_data_ = nullptr;
+ std::function<void(grpc_error_handle, LoadBalancingPolicy::MetadataInterface*,
+ LoadBalancingPolicy::CallState*)>
+ lb_recv_trailing_metadata_ready_;
+
+ RefCountedPtr<SubchannelCall> subchannel_call_;
+
+ // For intercepting recv_trailing_metadata_ready for the LB policy.
+ grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
+ grpc_closure recv_trailing_metadata_ready_;
+ grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
+
+ // Batches are added to this list when received from above.
+ // They are removed when we are done handling the batch (i.e., when
+ // either we have invoked all of the batch's callbacks or we have
+ // passed the batch down to the subchannel call and are not
+ // intercepting any of its callbacks).
+ grpc_transport_stream_op_batch* pending_batches_[MAX_PENDING_BATCHES] = {};
+};
+
+} // namespace grpc_core
#endif // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_H