summaryrefslogtreecommitdiff
path: root/grpc/src/core/ext/filters/client_channel/subchannel.h
diff options
context:
space:
mode:
Diffstat (limited to 'grpc/src/core/ext/filters/client_channel/subchannel.h')
-rw-r--r--grpc/src/core/ext/filters/client_channel/subchannel.h167
1 files changed, 68 insertions, 99 deletions
diff --git a/grpc/src/core/ext/filters/client_channel/subchannel.h b/grpc/src/core/ext/filters/client_channel/subchannel.h
index 952a02a5..9667fe7f 100644
--- a/grpc/src/core/ext/filters/client_channel/subchannel.h
+++ b/grpc/src/core/ext/filters/client_channel/subchannel.h
@@ -30,6 +30,7 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/gpr/time_precise.h"
#include "src/core/lib/gprpp/arena.h"
+#include "src/core/lib/gprpp/dual_ref_counted.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
@@ -41,31 +42,6 @@
// Channel arg containing a URI indicating the address to connect to.
#define GRPC_ARG_SUBCHANNEL_ADDRESS "grpc.subchannel_address"
-// For debugging refcounting.
-#ifndef NDEBUG
-#define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref(__FILE__, __LINE__, (r))
-#define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) (p)->RefFromWeakRef()
-#define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref(__FILE__, __LINE__, (r))
-#define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef(__FILE__, __LINE__, (r))
-#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref(__FILE__, __LINE__, (r))
-#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS \
- const char *file, int line, const char *reason
-#define GRPC_SUBCHANNEL_REF_REASON reason
-#define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS \
- , GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char* purpose
-#define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x) , file, line, reason, x
-#else
-#define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref()
-#define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) (p)->RefFromWeakRef()
-#define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref()
-#define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef()
-#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref()
-#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS
-#define GRPC_SUBCHANNEL_REF_REASON ""
-#define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS
-#define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x)
-#endif
-
namespace grpc_core {
class SubchannelCall;
@@ -111,7 +87,8 @@ class SubchannelCall {
grpc_call_context_element* context;
CallCombiner* call_combiner;
};
- static RefCountedPtr<SubchannelCall> Create(Args args, grpc_error** error);
+ static RefCountedPtr<SubchannelCall> Create(Args args,
+ grpc_error_handle* error);
// Continues processing a transport stream op batch.
void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
@@ -137,20 +114,20 @@ class SubchannelCall {
template <typename T>
friend class RefCountedPtr;
- SubchannelCall(Args args, grpc_error** error);
+ SubchannelCall(Args args, grpc_error_handle* error);
// If channelz is enabled, intercepts recv_trailing so that we may check the
// status and associate it to a subchannel.
void MaybeInterceptRecvTrailingMetadata(
grpc_transport_stream_op_batch* batch);
- static void RecvTrailingMetadataReady(void* arg, grpc_error* error);
+ static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error);
// Interface of RefCounted<>.
void IncrementRefCount();
void IncrementRefCount(const DebugLocation& location, const char* reason);
- static void Destroy(void* arg, grpc_error* error);
+ static void Destroy(void* arg, grpc_error_handle error);
RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
grpc_closure* after_call_stack_destroy_ = nullptr;
@@ -168,7 +145,7 @@ class SubchannelCall {
// different from the SubchannelInterface that is exposed to LB policy
// implementations. The client channel provides an adaptor class
// (SubchannelWrapper) that "converts" between the two.
-class Subchannel {
+class Subchannel : public DualRefCounted<Subchannel> {
public:
class ConnectivityStateWatcherInterface
: public RefCounted<ConnectivityStateWatcherInterface> {
@@ -204,37 +181,29 @@ class Subchannel {
ConnectivityStateChange PopConnectivityStateChange();
private:
+ Mutex mu_; // protects the queue
// Keeps track of the updates that the watcher instance must be notified of.
// TODO(yashkt): This is currently needed to send the state updates in the
// right order when asynchronously notifying. This will no longer be
// necessary when we have access to EventManager.
- std::deque<ConnectivityStateChange> connectivity_state_queue_;
- Mutex mu_; // protects the queue
+ std::deque<ConnectivityStateChange> connectivity_state_queue_
+ ABSL_GUARDED_BY(&mu_);
};
+ // Creates a subchannel given \a connector and \a args.
+ static RefCountedPtr<Subchannel> Create(
+ OrphanablePtr<SubchannelConnector> connector,
+ const grpc_channel_args* args);
+
// The ctor and dtor are not intended to use directly.
- Subchannel(SubchannelKey* key, OrphanablePtr<SubchannelConnector> connector,
+ Subchannel(SubchannelKey key, OrphanablePtr<SubchannelConnector> connector,
const grpc_channel_args* args);
- ~Subchannel();
-
- // Creates a subchannel given \a connector and \a args.
- static Subchannel* Create(OrphanablePtr<SubchannelConnector> connector,
- const grpc_channel_args* args);
+ ~Subchannel() override;
// Throttles keepalive time to \a new_keepalive_time iff \a new_keepalive_time
// is larger than the subchannel's current keepalive time. The updated value
// will have an affect when the subchannel creates a new ConnectedSubchannel.
- void ThrottleKeepaliveTime(int new_keepalive_time);
-
- // Strong and weak refcounting.
- Subchannel* Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
- void Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
- Subchannel* WeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
- void WeakUnref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
- // Attempts to return a strong ref when only the weak refcount is guaranteed
- // non-zero. If the strong refcount is zero, does not alter the refcount and
- // returns null.
- Subchannel* RefFromWeakRef();
+ void ThrottleKeepaliveTime(int new_keepalive_time) ABSL_LOCKS_EXCLUDED(mu_);
// Gets the string representing the subchannel address.
// Caller doesn't take ownership.
@@ -251,7 +220,8 @@ class Subchannel {
// If the return value is GRPC_CHANNEL_READY, also sets *connected_subchannel.
grpc_connectivity_state CheckConnectivityState(
const absl::optional<std::string>& health_check_service_name,
- RefCountedPtr<ConnectedSubchannel>* connected_subchannel);
+ RefCountedPtr<ConnectedSubchannel>* connected_subchannel)
+ ABSL_LOCKS_EXCLUDED(mu_);
// Starts watching the subchannel's connectivity state.
// The first callback to the watcher will be delivered when the
@@ -264,23 +234,27 @@ class Subchannel {
void WatchConnectivityState(
grpc_connectivity_state initial_state,
const absl::optional<std::string>& health_check_service_name,
- RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
+ RefCountedPtr<ConnectivityStateWatcherInterface> watcher)
+ ABSL_LOCKS_EXCLUDED(mu_);
// Cancels a connectivity state watch.
// If the watcher has already been destroyed, this is a no-op.
void CancelConnectivityStateWatch(
const absl::optional<std::string>& health_check_service_name,
- ConnectivityStateWatcherInterface* watcher);
+ ConnectivityStateWatcherInterface* watcher) ABSL_LOCKS_EXCLUDED(mu_);
// Attempt to connect to the backend. Has no effect if already connected.
- void AttemptToConnect();
+ void AttemptToConnect() ABSL_LOCKS_EXCLUDED(mu_);
// Resets the connection backoff of the subchannel.
// TODO(roth): Move connection backoff out of subchannels and up into LB
// policy code (probably by adding a SubchannelGroup between
// SubchannelList and SubchannelData), at which point this method can
// go away.
- void ResetBackoff();
+ void ResetBackoff() ABSL_LOCKS_EXCLUDED(mu_);
+
+ // Tears down any existing connection, and arranges for destruction
+ void Orphan() override ABSL_LOCKS_EXCLUDED(mu_);
// Returns a new channel arg encoding the subchannel address as a URI
// string. Caller is responsible for freeing the string.
@@ -333,18 +307,20 @@ class Subchannel {
class HealthWatcherMap {
public:
void AddWatcherLocked(
- Subchannel* subchannel, grpc_connectivity_state initial_state,
+ WeakRefCountedPtr<Subchannel> subchannel,
+ grpc_connectivity_state initial_state,
const std::string& health_check_service_name,
RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
void RemoveWatcherLocked(const std::string& health_check_service_name,
ConnectivityStateWatcherInterface* watcher);
// Notifies the watcher when the subchannel's state changes.
- void NotifyLocked(grpc_connectivity_state state,
- const absl::Status& status);
+ void NotifyLocked(grpc_connectivity_state state, const absl::Status& status)
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&Subchannel::mu_);
grpc_connectivity_state CheckConnectivityStateLocked(
- Subchannel* subchannel, const std::string& health_check_service_name);
+ Subchannel* subchannel, const std::string& health_check_service_name)
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&Subchannel::mu_);
void ShutdownLocked();
@@ -360,72 +336,65 @@ class Subchannel {
// Sets the subchannel's connectivity state to \a state.
void SetConnectivityStateLocked(grpc_connectivity_state state,
- const absl::Status& status);
+ const absl::Status& status)
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Methods for connection.
- void MaybeStartConnectingLocked();
- static void OnRetryAlarm(void* arg, grpc_error* error);
- void ContinueConnectingLocked();
- static void OnConnectingFinished(void* arg, grpc_error* error);
- bool PublishTransportLocked();
- void Disconnect();
-
- gpr_atm RefMutate(gpr_atm delta,
- int barrier GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS);
+ void MaybeStartConnectingLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
+ static void OnRetryAlarm(void* arg, grpc_error_handle error)
+ ABSL_LOCKS_EXCLUDED(mu_);
+ void ContinueConnectingLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
+ static void OnConnectingFinished(void* arg, grpc_error_handle error)
+ ABSL_LOCKS_EXCLUDED(mu_);
+ bool PublishTransportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
// The subchannel pool this subchannel is in.
RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
// TODO(juanlishen): Consider using args_ as key_ directly.
// Subchannel key that identifies this subchannel in the subchannel pool.
- SubchannelKey* key_;
+ const SubchannelKey key_;
// Channel args.
grpc_channel_args* args_;
// pollset_set tracking who's interested in a connection being setup.
grpc_pollset_set* pollset_set_;
- // Protects the other members.
- Mutex mu_;
- // Refcount
- // - lower INTERNAL_REF_BITS bits are for internal references:
- // these do not keep the subchannel open.
- // - upper remaining bits are for public references: these do
- // keep the subchannel open
- gpr_atm ref_pair_;
-
- // Connection states.
+ // Channelz tracking.
+ RefCountedPtr<channelz::SubchannelNode> channelz_node_;
+
+ // Connection state.
OrphanablePtr<SubchannelConnector> connector_;
- // Set during connection.
SubchannelConnector::Result connecting_result_;
grpc_closure on_connecting_finished_;
+
+ // Protects the other members.
+ Mutex mu_;
+
// Active connection, or null.
- RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
- bool connecting_ = false;
- bool disconnected_ = false;
+ RefCountedPtr<ConnectedSubchannel> connected_subchannel_ ABSL_GUARDED_BY(mu_);
+ bool connecting_ ABSL_GUARDED_BY(mu_) = false;
+ bool disconnected_ ABSL_GUARDED_BY(mu_) = false;
// Connectivity state tracking.
- grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
- absl::Status status_;
+ grpc_connectivity_state state_ ABSL_GUARDED_BY(mu_) = GRPC_CHANNEL_IDLE;
+ absl::Status status_ ABSL_GUARDED_BY(mu_);
// The list of watchers without a health check service name.
- ConnectivityStateWatcherList watcher_list_;
+ ConnectivityStateWatcherList watcher_list_ ABSL_GUARDED_BY(mu_);
// The map of watchers with health check service names.
- HealthWatcherMap health_watcher_map_;
+ HealthWatcherMap health_watcher_map_ ABSL_GUARDED_BY(mu_);
// Backoff state.
- BackOff backoff_;
- grpc_millis next_attempt_deadline_;
- grpc_millis min_connect_timeout_ms_;
- bool backoff_begun_ = false;
+ BackOff backoff_ ABSL_GUARDED_BY(mu_);
+ grpc_millis next_attempt_deadline_ ABSL_GUARDED_BY(mu_);
+ grpc_millis min_connect_timeout_ms_ ABSL_GUARDED_BY(mu_);
+ bool backoff_begun_ ABSL_GUARDED_BY(mu_) = false;
// Retry alarm.
- grpc_timer retry_alarm_;
- grpc_closure on_retry_alarm_;
- bool have_retry_alarm_ = false;
+ grpc_timer retry_alarm_ ABSL_GUARDED_BY(mu_);
+ grpc_closure on_retry_alarm_ ABSL_GUARDED_BY(mu_);
+ bool have_retry_alarm_ ABSL_GUARDED_BY(mu_) = false;
// reset_backoff() was called while alarm was pending.
- bool retry_immediately_ = false;
+ bool retry_immediately_ ABSL_GUARDED_BY(mu_) = false;
// Keepalive time period (-1 for unset)
- int keepalive_time_ = -1;
-
- // Channelz tracking.
- RefCountedPtr<channelz::SubchannelNode> channelz_node_;
+ int keepalive_time_ ABSL_GUARDED_BY(mu_) = -1;
};
} // namespace grpc_core