diff options
Diffstat (limited to 'grpc/src/core/ext/filters/client_channel/subchannel.h')
-rw-r--r-- | grpc/src/core/ext/filters/client_channel/subchannel.h | 167 |
1 files changed, 68 insertions, 99 deletions
diff --git a/grpc/src/core/ext/filters/client_channel/subchannel.h b/grpc/src/core/ext/filters/client_channel/subchannel.h index 952a02a5..9667fe7f 100644 --- a/grpc/src/core/ext/filters/client_channel/subchannel.h +++ b/grpc/src/core/ext/filters/client_channel/subchannel.h @@ -30,6 +30,7 @@ #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/gpr/time_precise.h" #include "src/core/lib/gprpp/arena.h" +#include "src/core/lib/gprpp/dual_ref_counted.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/sync.h" @@ -41,31 +42,6 @@ // Channel arg containing a URI indicating the address to connect to. #define GRPC_ARG_SUBCHANNEL_ADDRESS "grpc.subchannel_address" -// For debugging refcounting. -#ifndef NDEBUG -#define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref(__FILE__, __LINE__, (r)) -#define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) (p)->RefFromWeakRef() -#define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref(__FILE__, __LINE__, (r)) -#define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef(__FILE__, __LINE__, (r)) -#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref(__FILE__, __LINE__, (r)) -#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS \ - const char *file, int line, const char *reason -#define GRPC_SUBCHANNEL_REF_REASON reason -#define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS \ - , GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char* purpose -#define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x) , file, line, reason, x -#else -#define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref() -#define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) (p)->RefFromWeakRef() -#define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref() -#define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef() -#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref() -#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS -#define GRPC_SUBCHANNEL_REF_REASON "" -#define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS -#define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x) -#endif - namespace grpc_core { class SubchannelCall; @@ -111,7 +87,8 @@ class SubchannelCall { grpc_call_context_element* context; CallCombiner* call_combiner; }; - static RefCountedPtr<SubchannelCall> Create(Args args, grpc_error** error); + static RefCountedPtr<SubchannelCall> Create(Args args, + grpc_error_handle* error); // Continues processing a transport stream op batch. void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch); @@ -137,20 +114,20 @@ class SubchannelCall { template <typename T> friend class RefCountedPtr; - SubchannelCall(Args args, grpc_error** error); + SubchannelCall(Args args, grpc_error_handle* error); // If channelz is enabled, intercepts recv_trailing so that we may check the // status and associate it to a subchannel. void MaybeInterceptRecvTrailingMetadata( grpc_transport_stream_op_batch* batch); - static void RecvTrailingMetadataReady(void* arg, grpc_error* error); + static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error); // Interface of RefCounted<>. void IncrementRefCount(); void IncrementRefCount(const DebugLocation& location, const char* reason); - static void Destroy(void* arg, grpc_error* error); + static void Destroy(void* arg, grpc_error_handle error); RefCountedPtr<ConnectedSubchannel> connected_subchannel_; grpc_closure* after_call_stack_destroy_ = nullptr; @@ -168,7 +145,7 @@ class SubchannelCall { // different from the SubchannelInterface that is exposed to LB policy // implementations. The client channel provides an adaptor class // (SubchannelWrapper) that "converts" between the two. -class Subchannel { +class Subchannel : public DualRefCounted<Subchannel> { public: class ConnectivityStateWatcherInterface : public RefCounted<ConnectivityStateWatcherInterface> { @@ -204,37 +181,29 @@ class Subchannel { ConnectivityStateChange PopConnectivityStateChange(); private: + Mutex mu_; // protects the queue // Keeps track of the updates that the watcher instance must be notified of. // TODO(yashkt): This is currently needed to send the state updates in the // right order when asynchronously notifying. This will no longer be // necessary when we have access to EventManager. - std::deque<ConnectivityStateChange> connectivity_state_queue_; - Mutex mu_; // protects the queue + std::deque<ConnectivityStateChange> connectivity_state_queue_ + ABSL_GUARDED_BY(&mu_); }; + // Creates a subchannel given \a connector and \a args. + static RefCountedPtr<Subchannel> Create( + OrphanablePtr<SubchannelConnector> connector, + const grpc_channel_args* args); + // The ctor and dtor are not intended to use directly. - Subchannel(SubchannelKey* key, OrphanablePtr<SubchannelConnector> connector, + Subchannel(SubchannelKey key, OrphanablePtr<SubchannelConnector> connector, const grpc_channel_args* args); - ~Subchannel(); - - // Creates a subchannel given \a connector and \a args. - static Subchannel* Create(OrphanablePtr<SubchannelConnector> connector, - const grpc_channel_args* args); + ~Subchannel() override; // Throttles keepalive time to \a new_keepalive_time iff \a new_keepalive_time // is larger than the subchannel's current keepalive time. The updated value // will have an affect when the subchannel creates a new ConnectedSubchannel. - void ThrottleKeepaliveTime(int new_keepalive_time); - - // Strong and weak refcounting. - Subchannel* Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS); - void Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS); - Subchannel* WeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS); - void WeakUnref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS); - // Attempts to return a strong ref when only the weak refcount is guaranteed - // non-zero. If the strong refcount is zero, does not alter the refcount and - // returns null. - Subchannel* RefFromWeakRef(); + void ThrottleKeepaliveTime(int new_keepalive_time) ABSL_LOCKS_EXCLUDED(mu_); // Gets the string representing the subchannel address. // Caller doesn't take ownership. @@ -251,7 +220,8 @@ class Subchannel { // If the return value is GRPC_CHANNEL_READY, also sets *connected_subchannel. grpc_connectivity_state CheckConnectivityState( const absl::optional<std::string>& health_check_service_name, - RefCountedPtr<ConnectedSubchannel>* connected_subchannel); + RefCountedPtr<ConnectedSubchannel>* connected_subchannel) + ABSL_LOCKS_EXCLUDED(mu_); // Starts watching the subchannel's connectivity state. // The first callback to the watcher will be delivered when the @@ -264,23 +234,27 @@ class Subchannel { void WatchConnectivityState( grpc_connectivity_state initial_state, const absl::optional<std::string>& health_check_service_name, - RefCountedPtr<ConnectivityStateWatcherInterface> watcher); + RefCountedPtr<ConnectivityStateWatcherInterface> watcher) + ABSL_LOCKS_EXCLUDED(mu_); // Cancels a connectivity state watch. // If the watcher has already been destroyed, this is a no-op. void CancelConnectivityStateWatch( const absl::optional<std::string>& health_check_service_name, - ConnectivityStateWatcherInterface* watcher); + ConnectivityStateWatcherInterface* watcher) ABSL_LOCKS_EXCLUDED(mu_); // Attempt to connect to the backend. Has no effect if already connected. - void AttemptToConnect(); + void AttemptToConnect() ABSL_LOCKS_EXCLUDED(mu_); // Resets the connection backoff of the subchannel. // TODO(roth): Move connection backoff out of subchannels and up into LB // policy code (probably by adding a SubchannelGroup between // SubchannelList and SubchannelData), at which point this method can // go away. - void ResetBackoff(); + void ResetBackoff() ABSL_LOCKS_EXCLUDED(mu_); + + // Tears down any existing connection, and arranges for destruction + void Orphan() override ABSL_LOCKS_EXCLUDED(mu_); // Returns a new channel arg encoding the subchannel address as a URI // string. Caller is responsible for freeing the string. @@ -333,18 +307,20 @@ class Subchannel { class HealthWatcherMap { public: void AddWatcherLocked( - Subchannel* subchannel, grpc_connectivity_state initial_state, + WeakRefCountedPtr<Subchannel> subchannel, + grpc_connectivity_state initial_state, const std::string& health_check_service_name, RefCountedPtr<ConnectivityStateWatcherInterface> watcher); void RemoveWatcherLocked(const std::string& health_check_service_name, ConnectivityStateWatcherInterface* watcher); // Notifies the watcher when the subchannel's state changes. - void NotifyLocked(grpc_connectivity_state state, - const absl::Status& status); + void NotifyLocked(grpc_connectivity_state state, const absl::Status& status) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&Subchannel::mu_); grpc_connectivity_state CheckConnectivityStateLocked( - Subchannel* subchannel, const std::string& health_check_service_name); + Subchannel* subchannel, const std::string& health_check_service_name) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&Subchannel::mu_); void ShutdownLocked(); @@ -360,72 +336,65 @@ class Subchannel { // Sets the subchannel's connectivity state to \a state. void SetConnectivityStateLocked(grpc_connectivity_state state, - const absl::Status& status); + const absl::Status& status) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); // Methods for connection. - void MaybeStartConnectingLocked(); - static void OnRetryAlarm(void* arg, grpc_error* error); - void ContinueConnectingLocked(); - static void OnConnectingFinished(void* arg, grpc_error* error); - bool PublishTransportLocked(); - void Disconnect(); - - gpr_atm RefMutate(gpr_atm delta, - int barrier GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS); + void MaybeStartConnectingLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + static void OnRetryAlarm(void* arg, grpc_error_handle error) + ABSL_LOCKS_EXCLUDED(mu_); + void ContinueConnectingLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + static void OnConnectingFinished(void* arg, grpc_error_handle error) + ABSL_LOCKS_EXCLUDED(mu_); + bool PublishTransportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); // The subchannel pool this subchannel is in. RefCountedPtr<SubchannelPoolInterface> subchannel_pool_; // TODO(juanlishen): Consider using args_ as key_ directly. // Subchannel key that identifies this subchannel in the subchannel pool. - SubchannelKey* key_; + const SubchannelKey key_; // Channel args. grpc_channel_args* args_; // pollset_set tracking who's interested in a connection being setup. grpc_pollset_set* pollset_set_; - // Protects the other members. - Mutex mu_; - // Refcount - // - lower INTERNAL_REF_BITS bits are for internal references: - // these do not keep the subchannel open. - // - upper remaining bits are for public references: these do - // keep the subchannel open - gpr_atm ref_pair_; - - // Connection states. + // Channelz tracking. + RefCountedPtr<channelz::SubchannelNode> channelz_node_; + + // Connection state. OrphanablePtr<SubchannelConnector> connector_; - // Set during connection. SubchannelConnector::Result connecting_result_; grpc_closure on_connecting_finished_; + + // Protects the other members. + Mutex mu_; + // Active connection, or null. - RefCountedPtr<ConnectedSubchannel> connected_subchannel_; - bool connecting_ = false; - bool disconnected_ = false; + RefCountedPtr<ConnectedSubchannel> connected_subchannel_ ABSL_GUARDED_BY(mu_); + bool connecting_ ABSL_GUARDED_BY(mu_) = false; + bool disconnected_ ABSL_GUARDED_BY(mu_) = false; // Connectivity state tracking. - grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE; - absl::Status status_; + grpc_connectivity_state state_ ABSL_GUARDED_BY(mu_) = GRPC_CHANNEL_IDLE; + absl::Status status_ ABSL_GUARDED_BY(mu_); // The list of watchers without a health check service name. - ConnectivityStateWatcherList watcher_list_; + ConnectivityStateWatcherList watcher_list_ ABSL_GUARDED_BY(mu_); // The map of watchers with health check service names. - HealthWatcherMap health_watcher_map_; + HealthWatcherMap health_watcher_map_ ABSL_GUARDED_BY(mu_); // Backoff state. - BackOff backoff_; - grpc_millis next_attempt_deadline_; - grpc_millis min_connect_timeout_ms_; - bool backoff_begun_ = false; + BackOff backoff_ ABSL_GUARDED_BY(mu_); + grpc_millis next_attempt_deadline_ ABSL_GUARDED_BY(mu_); + grpc_millis min_connect_timeout_ms_ ABSL_GUARDED_BY(mu_); + bool backoff_begun_ ABSL_GUARDED_BY(mu_) = false; // Retry alarm. - grpc_timer retry_alarm_; - grpc_closure on_retry_alarm_; - bool have_retry_alarm_ = false; + grpc_timer retry_alarm_ ABSL_GUARDED_BY(mu_); + grpc_closure on_retry_alarm_ ABSL_GUARDED_BY(mu_); + bool have_retry_alarm_ ABSL_GUARDED_BY(mu_) = false; // reset_backoff() was called while alarm was pending. - bool retry_immediately_ = false; + bool retry_immediately_ ABSL_GUARDED_BY(mu_) = false; // Keepalive time period (-1 for unset) - int keepalive_time_ = -1; - - // Channelz tracking. - RefCountedPtr<channelz::SubchannelNode> channelz_node_; + int keepalive_time_ ABSL_GUARDED_BY(mu_) = -1; }; } // namespace grpc_core |