diff options
Diffstat (limited to 'grpc/src/core/ext/filters/client_channel/client_channel.h')
-rw-r--r-- | grpc/src/core/ext/filters/client_channel/client_channel.h | 544 |
1 files changed, 488 insertions, 56 deletions
diff --git a/grpc/src/core/ext/filters/client_channel/client_channel.h b/grpc/src/core/ext/filters/client_channel/client_channel.h index 202de8ae..b7dd1a19 100644 --- a/grpc/src/core/ext/filters/client_channel/client_channel.h +++ b/grpc/src/core/ext/filters/client_channel/client_channel.h @@ -1,78 +1,510 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_H #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_H #include <grpc/support/port_platform.h> -#include "src/core/ext/filters/client_channel/client_channel_channelz.h" +#include <map> +#include <memory> +#include <set> +#include <string> + +#include "absl/status/status.h" +#include "absl/types/optional.h" + +#include <grpc/support/log.h> + #include "src/core/ext/filters/client_channel/client_channel_factory.h" +#include "src/core/ext/filters/client_channel/config_selector.h" +#include "src/core/ext/filters/client_channel/dynamic_filters.h" +#include "src/core/ext/filters/client_channel/lb_policy.h" #include "src/core/ext/filters/client_channel/resolver.h" -#include "src/core/lib/channel/channel_stack.h" +#include "src/core/ext/filters/client_channel/resolver_result_parsing.h" +#include "src/core/ext/filters/client_channel/retry_throttle.h" +#include "src/core/ext/filters/client_channel/service_config.h" +#include "src/core/ext/filters/client_channel/subchannel.h" +#include "src/core/ext/filters/client_channel/subchannel_pool_interface.h" +#include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/polling_entity.h" +#include "src/core/lib/iomgr/work_serializer.h" +#include "src/core/lib/surface/channel.h" +#include "src/core/lib/transport/connectivity_state.h" + +// +// Client channel filter +// -extern grpc_core::TraceFlag grpc_client_channel_trace; +// A client channel is a channel that begins disconnected, and can connect +// to some endpoint on demand. If that endpoint disconnects, it will be +// connected to again later. +// +// Calls on a disconnected client channel are queued until a connection is +// established. // Channel arg key for server URI string. #define GRPC_ARG_SERVER_URI "grpc.server_uri" -/* A client channel is a channel that begins disconnected, and can connect - to some endpoint on demand. If that endpoint disconnects, it will be - connected to again later. +// Channel arg containing a pointer to the ClientChannel object. +#define GRPC_ARG_CLIENT_CHANNEL "grpc.internal.client_channel" + +// Channel arg containing a pointer to the ServiceConfig object. +#define GRPC_ARG_SERVICE_CONFIG_OBJ "grpc.internal.service_config_obj" + +// Max number of batches that can be pending on a call at any given +// time. This includes one batch for each of the following ops: +// recv_initial_metadata +// send_initial_metadata +// recv_message +// send_message +// recv_trailing_metadata +// send_trailing_metadata +#define MAX_PENDING_BATCHES 6 + +namespace grpc_core { + +class ClientChannel { + public: + static const grpc_channel_filter kFilterVtable; + + class LoadBalancedCall; + + // Returns the ClientChannel object from channel, or null if channel + // is not a client channel. + static ClientChannel* GetFromChannel(grpc_channel* channel); + + grpc_connectivity_state CheckConnectivityState(bool try_to_connect); + + // Starts a one-time connectivity state watch. When the channel's state + // becomes different from *state, sets *state to the new state and + // schedules on_complete. The watcher_timer_init callback is invoked as + // soon as the watch is actually started (i.e., after hopping into the + // client channel combiner). I/O will be serviced via pollent. + // + // This is intended to be used when starting a watch from outside of C-core + // via grpc_channel_watch_connectivity_state(). It should not be used + // by other callers. + void AddExternalConnectivityWatcher(grpc_polling_entity pollent, + grpc_connectivity_state* state, + grpc_closure* on_complete, + grpc_closure* watcher_timer_init) { + new ExternalConnectivityWatcher(this, pollent, state, on_complete, + watcher_timer_init); + } + + // Cancels a pending external watcher previously added by + // AddExternalConnectivityWatcher(). + void CancelExternalConnectivityWatcher(grpc_closure* on_complete) { + ExternalConnectivityWatcher::RemoveWatcherFromExternalWatchersMap( + this, on_complete, /*cancel=*/true); + } + + int NumExternalConnectivityWatchers() const { + MutexLock lock(&external_watchers_mu_); + return static_cast<int>(external_watchers_.size()); + } + + // Starts and stops a connectivity watch. The watcher will be initially + // notified as soon as the state changes from initial_state and then on + // every subsequent state change until either the watch is stopped or + // it is notified that the state has changed to SHUTDOWN. + // + // This is intended to be used when starting watches from code inside of + // C-core (e.g., for a nested control plane channel for things like xds). + void AddConnectivityWatcher( + grpc_connectivity_state initial_state, + OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher); + void RemoveConnectivityWatcher( + AsyncConnectivityStateWatcherInterface* watcher); + + RefCountedPtr<LoadBalancedCall> CreateLoadBalancedCall( + const grpc_call_element_args& args, grpc_polling_entity* pollent, + grpc_closure* on_call_destruction_complete); + + private: + class CallData; + class ResolverResultHandler; + class SubchannelWrapper; + class ClientChannelControlHelper; + class ConnectivityWatcherAdder; + class ConnectivityWatcherRemover; + + // Represents a pending connectivity callback from an external caller + // via grpc_client_channel_watch_connectivity_state(). + class ExternalConnectivityWatcher : public ConnectivityStateWatcherInterface { + public: + ExternalConnectivityWatcher(ClientChannel* chand, + grpc_polling_entity pollent, + grpc_connectivity_state* state, + grpc_closure* on_complete, + grpc_closure* watcher_timer_init); + + ~ExternalConnectivityWatcher() override; + + // Removes the watcher from the external_watchers_ map. + static void RemoveWatcherFromExternalWatchersMap(ClientChannel* chand, + grpc_closure* on_complete, + bool cancel); + + void Notify(grpc_connectivity_state state, + const absl::Status& /* status */) override; + + void Cancel(); + + private: + // Adds the watcher to state_tracker_. Consumes the ref that is passed to it + // from Start(). + void AddWatcherLocked() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_); + void RemoveWatcherLocked() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_); + + ClientChannel* chand_; + grpc_polling_entity pollent_; + grpc_connectivity_state initial_state_; + grpc_connectivity_state* state_; + grpc_closure* on_complete_; + grpc_closure* watcher_timer_init_; + Atomic<bool> done_{false}; + }; + + struct ResolverQueuedCall { + grpc_call_element* elem; + ResolverQueuedCall* next = nullptr; + }; + struct LbQueuedCall { + LoadBalancedCall* lb_call; + LbQueuedCall* next = nullptr; + }; + + ClientChannel(grpc_channel_element_args* args, grpc_error_handle* error); + ~ClientChannel(); + + // Filter vtable functions. + static grpc_error_handle Init(grpc_channel_element* elem, + grpc_channel_element_args* args); + static void Destroy(grpc_channel_element* elem); + static void StartTransportOp(grpc_channel_element* elem, + grpc_transport_op* op); + static void GetChannelInfo(grpc_channel_element* elem, + const grpc_channel_info* info); - Calls on a disconnected client channel are queued until a connection is - established. */ + // Note: Does NOT return a new ref. + grpc_error_handle disconnect_error() const { + return disconnect_error_.Load(MemoryOrder::ACQUIRE); + } -extern const grpc_channel_filter grpc_client_channel_filter; + // Note: All methods with "Locked" suffix must be invoked from within + // work_serializer_. -grpc_connectivity_state grpc_client_channel_check_connectivity_state( - grpc_channel_element* elem, int try_to_connect); + void OnResolverResultChangedLocked(Resolver::Result result) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_); + void OnResolverErrorLocked(grpc_error_handle error) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_); -int grpc_client_channel_num_external_connectivity_watchers( - grpc_channel_element* elem); + void CreateOrUpdateLbPolicyLocked( + RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config, + Resolver::Result result) ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_); + OrphanablePtr<LoadBalancingPolicy> CreateLbPolicyLocked( + const grpc_channel_args& args) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_); + + void UpdateStateAndPickerLocked( + grpc_connectivity_state state, const absl::Status& status, + const char* reason, + std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_); + + void UpdateServiceConfigInControlPlaneLocked( + RefCountedPtr<ServiceConfig> service_config, + RefCountedPtr<ConfigSelector> config_selector, + const internal::ClientChannelGlobalParsedConfig* parsed_service_config, + const char* lb_policy_name) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_); + + void UpdateServiceConfigInDataPlaneLocked() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_); + + void CreateResolverLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_); + void DestroyResolverAndLbPolicyLocked() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_); + + grpc_error_handle DoPingLocked(grpc_transport_op* op) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_); + + void StartTransportOpLocked(grpc_transport_op* op) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_); + + void TryToConnectLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_); + + // These methods all require holding resolution_mu_. + void AddResolverQueuedCall(ResolverQueuedCall* call, + grpc_polling_entity* pollent) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(resolution_mu_); + void RemoveResolverQueuedCall(ResolverQueuedCall* to_remove, + grpc_polling_entity* pollent) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(resolution_mu_); + + // These methods all require holding data_plane_mu_. + void AddLbQueuedCall(LbQueuedCall* call, grpc_polling_entity* pollent) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_); + void RemoveLbQueuedCall(LbQueuedCall* to_remove, grpc_polling_entity* pollent) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_); + RefCountedPtr<ConnectedSubchannel> GetConnectedSubchannelInDataPlane( + SubchannelInterface* subchannel) const + ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_); + + // + // Fields set at construction and never modified. + // + const bool deadline_checking_enabled_; + const bool enable_retries_; + grpc_channel_stack* owning_stack_; + ClientChannelFactory* client_channel_factory_; + const grpc_channel_args* channel_args_; + RefCountedPtr<ServiceConfig> default_service_config_; + std::string server_name_; + UniquePtr<char> target_uri_; + channelz::ChannelNode* channelz_node_; + grpc_pollset_set* interested_parties_; + + // + // Fields related to name resolution. Guarded by resolution_mu_. + // + mutable Mutex resolution_mu_; + // Linked list of calls queued waiting for resolver result. + ResolverQueuedCall* resolver_queued_calls_ ABSL_GUARDED_BY(resolution_mu_) = + nullptr; + // Data from service config. + grpc_error_handle resolver_transient_failure_error_ + ABSL_GUARDED_BY(resolution_mu_) = GRPC_ERROR_NONE; + bool received_service_config_data_ ABSL_GUARDED_BY(resolution_mu_) = false; + RefCountedPtr<ServiceConfig> service_config_ ABSL_GUARDED_BY(resolution_mu_); + RefCountedPtr<ConfigSelector> config_selector_ + ABSL_GUARDED_BY(resolution_mu_); + RefCountedPtr<DynamicFilters> dynamic_filters_ + ABSL_GUARDED_BY(resolution_mu_); + + // + // Fields used in the data plane. Guarded by data_plane_mu_. + // + mutable Mutex data_plane_mu_; + std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker_ + ABSL_GUARDED_BY(data_plane_mu_); + // Linked list of calls queued waiting for LB pick. + LbQueuedCall* lb_queued_calls_ ABSL_GUARDED_BY(data_plane_mu_) = nullptr; + + // + // Fields used in the control plane. Guarded by work_serializer. + // + std::shared_ptr<WorkSerializer> work_serializer_; + ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(work_serializer_); + OrphanablePtr<Resolver> resolver_ ABSL_GUARDED_BY(work_serializer_); + bool previous_resolution_contained_addresses_ + ABSL_GUARDED_BY(work_serializer_) = false; + RefCountedPtr<ServiceConfig> saved_service_config_ + ABSL_GUARDED_BY(work_serializer_); + RefCountedPtr<ConfigSelector> saved_config_selector_ + ABSL_GUARDED_BY(work_serializer_); + absl::optional<std::string> health_check_service_name_ + ABSL_GUARDED_BY(work_serializer_); + OrphanablePtr<LoadBalancingPolicy> lb_policy_ + ABSL_GUARDED_BY(work_serializer_); + RefCountedPtr<SubchannelPoolInterface> subchannel_pool_ + ABSL_GUARDED_BY(work_serializer_); + // The number of SubchannelWrapper instances referencing a given Subchannel. + std::map<Subchannel*, int> subchannel_refcount_map_ + ABSL_GUARDED_BY(work_serializer_); + // The set of SubchannelWrappers that currently exist. + // No need to hold a ref, since the map is updated in the control-plane + // work_serializer when the SubchannelWrappers are created and destroyed. + std::set<SubchannelWrapper*> subchannel_wrappers_ + ABSL_GUARDED_BY(work_serializer_); + // Pending ConnectedSubchannel updates for each SubchannelWrapper. + // Updates are queued here in the control plane work_serializer and then + // applied in the data plane mutex when the picker is updated. + std::map<RefCountedPtr<SubchannelWrapper>, RefCountedPtr<ConnectedSubchannel>> + pending_subchannel_updates_ ABSL_GUARDED_BY(work_serializer_); + int keepalive_time_ ABSL_GUARDED_BY(work_serializer_) = -1; + + // + // Fields accessed from both data plane mutex and control plane + // work_serializer. + // + Atomic<grpc_error_handle> disconnect_error_; + + // + // Fields guarded by a mutex, since they need to be accessed + // synchronously via get_channel_info(). + // + Mutex info_mu_; + UniquePtr<char> info_lb_policy_name_ ABSL_GUARDED_BY(info_mu_); + UniquePtr<char> info_service_config_json_ ABSL_GUARDED_BY(info_mu_); + + // + // Fields guarded by a mutex, since they need to be accessed + // synchronously via grpc_channel_num_external_connectivity_watchers(). + // + mutable Mutex external_watchers_mu_; + std::map<grpc_closure*, RefCountedPtr<ExternalConnectivityWatcher>> + external_watchers_ ABSL_GUARDED_BY(external_watchers_mu_); +}; -// Starts a one-time connectivity state watch. When the channel's state -// becomes different from *state, sets *state to the new state and -// schedules on_complete. The watcher_timer_init callback is invoked as -// soon as the watch is actually started (i.e., after hopping into the -// client channel combiner). I/O will be serviced via pollent. // -// This is intended to be used when starting a watch from outside of C-core -// via grpc_channel_watch_connectivity_state(). It should not be used -// by other callers. -void grpc_client_channel_watch_connectivity_state( - grpc_channel_element* elem, grpc_polling_entity pollent, - grpc_connectivity_state* state, grpc_closure* on_complete, - grpc_closure* watcher_timer_init); - -// Starts and stops a connectivity watch. The watcher will be initially -// notified as soon as the state changes from initial_state and then on -// every subsequent state change until either the watch is stopped or -// it is notified that the state has changed to SHUTDOWN. +// ClientChannel::LoadBalancedCall // -// This is intended to be used when starting watches from code inside of -// C-core (e.g., for a nested control plane channel for things like xds). -void grpc_client_channel_start_connectivity_watch( - grpc_channel_element* elem, grpc_connectivity_state initial_state, - grpc_core::OrphanablePtr<grpc_core::AsyncConnectivityStateWatcherInterface> - watcher); -void grpc_client_channel_stop_connectivity_watch( - grpc_channel_element* elem, - grpc_core::AsyncConnectivityStateWatcherInterface* watcher); + +// This object is ref-counted, but it cannot inherit from RefCounted<>, +// because it is allocated on the arena and can't free its memory when +// its refcount goes to zero. So instead, it manually implements the +// same API as RefCounted<>, so that it can be used with RefCountedPtr<>. +class ClientChannel::LoadBalancedCall + : public RefCounted<LoadBalancedCall, PolymorphicRefCount, kUnrefCallDtor> { + public: + // If on_call_destruction_complete is non-null, then it will be + // invoked once the LoadBalancedCall is completely destroyed. + // If it is null, then the caller is responsible for checking whether + // the LB call has a subchannel call and ensuring that the + // on_call_destruction_complete closure passed down from the surface + // is not invoked until after the subchannel call stack is destroyed. + LoadBalancedCall(ClientChannel* chand, const grpc_call_element_args& args, + grpc_polling_entity* pollent, + grpc_closure* on_call_destruction_complete); + ~LoadBalancedCall() override; + + void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch); + + // Invoked by channel for queued LB picks when the picker is updated. + static void PickSubchannel(void* arg, grpc_error_handle error); + // Helper function for performing an LB pick while holding the data plane + // mutex. Returns true if the pick is complete, in which case the caller + // must invoke PickDone() or AsyncPickDone() with the returned error. + bool PickSubchannelLocked(grpc_error_handle* error) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_); + // Schedules a callback to process the completed pick. The callback + // will not run until after this method returns. + void AsyncPickDone(grpc_error_handle error); + + RefCountedPtr<SubchannelCall> subchannel_call() const { + return subchannel_call_; + } + + private: + class LbQueuedCallCanceller; + class Metadata; + class LbCallState; + + // Returns the index into pending_batches_ to be used for batch. + static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch); + void PendingBatchesAdd(grpc_transport_stream_op_batch* batch); + static void FailPendingBatchInCallCombiner(void* arg, + grpc_error_handle error); + // A predicate type and some useful implementations for PendingBatchesFail(). + typedef bool (*YieldCallCombinerPredicate)( + const CallCombinerClosureList& closures); + static bool YieldCallCombiner(const CallCombinerClosureList& /*closures*/) { + return true; + } + static bool NoYieldCallCombiner(const CallCombinerClosureList& /*closures*/) { + return false; + } + static bool YieldCallCombinerIfPendingBatchesFound( + const CallCombinerClosureList& closures) { + return closures.size() > 0; + } + // Fails all pending batches. + // If yield_call_combiner_predicate returns true, assumes responsibility for + // yielding the call combiner. + void PendingBatchesFail( + grpc_error_handle error, + YieldCallCombinerPredicate yield_call_combiner_predicate); + static void ResumePendingBatchInCallCombiner(void* arg, + grpc_error_handle ignored); + // Resumes all pending batches on subchannel_call_. + void PendingBatchesResume(); + + static void RecvTrailingMetadataReadyForLoadBalancingPolicy( + void* arg, grpc_error_handle error); + void InjectRecvTrailingMetadataReadyForLoadBalancingPolicy( + grpc_transport_stream_op_batch* batch); + + void CreateSubchannelCall(); + // Invoked when a pick is completed, on both success or failure. + static void PickDone(void* arg, grpc_error_handle error); + // Removes the call from the channel's list of queued picks if present. + void MaybeRemoveCallFromLbQueuedCallsLocked() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_); + // Adds the call to the channel's list of queued picks if not already present. + void MaybeAddCallToLbQueuedCallsLocked() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_); + + ClientChannel* chand_; + + // TODO(roth): Instead of duplicating these fields in every filter + // that uses any one of them, we should store them in the call + // context. This will save per-call memory overhead. + grpc_slice path_; // Request path. + gpr_cycle_counter call_start_time_; + grpc_millis deadline_; + Arena* arena_; + grpc_call_stack* owning_call_; + CallCombiner* call_combiner_; + grpc_call_context_element* call_context_; + grpc_polling_entity* pollent_; + grpc_closure* on_call_destruction_complete_; + + // Set when we get a cancel_stream op. + grpc_error_handle cancel_error_ = GRPC_ERROR_NONE; + + // Set when we fail inside the LB call. + grpc_error_handle failure_error_ = GRPC_ERROR_NONE; + + grpc_closure pick_closure_; + + // Accessed while holding ClientChannel::data_plane_mu_. + ClientChannel::LbQueuedCall queued_call_ + ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_); + bool queued_pending_lb_pick_ ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_) = + false; + LbQueuedCallCanceller* lb_call_canceller_ + ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_) = nullptr; + + RefCountedPtr<ConnectedSubchannel> connected_subchannel_; + const LoadBalancingPolicy::BackendMetricData* backend_metric_data_ = nullptr; + std::function<void(grpc_error_handle, LoadBalancingPolicy::MetadataInterface*, + LoadBalancingPolicy::CallState*)> + lb_recv_trailing_metadata_ready_; + + RefCountedPtr<SubchannelCall> subchannel_call_; + + // For intercepting recv_trailing_metadata_ready for the LB policy. + grpc_metadata_batch* recv_trailing_metadata_ = nullptr; + grpc_closure recv_trailing_metadata_ready_; + grpc_closure* original_recv_trailing_metadata_ready_ = nullptr; + + // Batches are added to this list when received from above. + // They are removed when we are done handling the batch (i.e., when + // either we have invoked all of the batch's callbacks or we have + // passed the batch down to the subchannel call and are not + // intercepting any of its callbacks). + grpc_transport_stream_op_batch* pending_batches_[MAX_PENDING_BATCHES] = {}; +}; + +} // namespace grpc_core #endif // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_H |