diff options
author | Mark D. Roth <roth@google.com> | 2019-06-13 09:50:37 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-06-13 09:50:37 -0700 |
commit | 82b06bef18955d8f66047f52878aeb9159041184 (patch) | |
tree | 6179003cd1042e3368de8c12a7ddf19dbcb11a6c /src/core/ext/filters/client_channel/lb_policy/subchannel_list.h | |
parent | fa6a71be5f1754fa8334b88b97d16cadc87702ca (diff) | |
parent | 7767fbe683fc95f562a6ae1a22c67828e80afebe (diff) | |
download | grpc-grpc-82b06bef18955d8f66047f52878aeb9159041184.tar.gz |
Merge pull request #19042 from markdroth/subchannel_interface_connected_subchannel
Hide ConnectedSubchannel from LB policy API.
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/subchannel_list.h')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/subchannel_list.h | 126 |
1 files changed, 25 insertions, 101 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 7d70928a83..34cd0f549f 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -39,7 +39,6 @@ #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/closure.h" -#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" @@ -64,8 +63,7 @@ class MySubchannelList }; */ -// All methods with a Locked() suffix must be called from within the -// client_channel combiner. +// All methods will be called from within the client_channel combiner. namespace grpc_core { @@ -93,20 +91,13 @@ class SubchannelData { // Returns a pointer to the subchannel. SubchannelInterface* subchannel() const { return subchannel_.get(); } - // Returns the connected subchannel. Will be null if the subchannel - // is not connected. - ConnectedSubchannelInterface* connected_subchannel() const { - return connected_subchannel_.get(); - } - // Synchronously checks the subchannel's connectivity state. // Must not be called while there is a connectivity notification // pending (i.e., between calling StartConnectivityWatchLocked() and // calling CancelConnectivityWatchLocked()). grpc_connectivity_state CheckConnectivityStateLocked() { GPR_ASSERT(pending_watcher_ == nullptr); - connectivity_state_ = - subchannel()->CheckConnectivityState(&connected_subchannel_); + connectivity_state_ = subchannel_->CheckConnectivityState(); return connectivity_state_; } @@ -144,7 +135,8 @@ class SubchannelData { private: // Watcher for subchannel connectivity state. - class Watcher : public SubchannelInterface::ConnectivityStateWatcher { + class Watcher + : public SubchannelInterface::ConnectivityStateWatcherInterface { public: Watcher( SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data, @@ -154,42 +146,13 @@ class SubchannelData { ~Watcher() { subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor"); } - void OnConnectivityStateChange(grpc_connectivity_state new_state, - RefCountedPtr<ConnectedSubchannelInterface> - connected_subchannel) override; + void OnConnectivityStateChange(grpc_connectivity_state new_state) override; grpc_pollset_set* interested_parties() override { return subchannel_list_->policy()->interested_parties(); } private: - // A fire-and-forget class that bounces into the combiner to process - // a connectivity state update. - class Updater { - public: - Updater( - SubchannelData<SubchannelListType, SubchannelDataType>* - subchannel_data, - RefCountedPtr<SubchannelList<SubchannelListType, SubchannelDataType>> - subchannel_list, - grpc_connectivity_state state, - RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel); - - ~Updater() { - subchannel_list_.reset(DEBUG_LOCATION, "Watcher::Updater dtor"); - } - - private: - static void OnUpdateLocked(void* arg, grpc_error* error); - - SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data_; - RefCountedPtr<SubchannelList<SubchannelListType, SubchannelDataType>> - subchannel_list_; - const grpc_connectivity_state state_; - RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel_; - grpc_closure closure_; - }; - SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data_; RefCountedPtr<SubchannelListType> subchannel_list_; }; @@ -202,10 +165,10 @@ class SubchannelData { // The subchannel. RefCountedPtr<SubchannelInterface> subchannel_; // Will be non-null when the subchannel's state is being watched. - SubchannelInterface::ConnectivityStateWatcher* pending_watcher_ = nullptr; + SubchannelInterface::ConnectivityStateWatcherInterface* pending_watcher_ = + nullptr; // Data updated by the watcher. grpc_connectivity_state connectivity_state_; - RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel_; }; // A list of subchannels. @@ -232,7 +195,6 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> { // the backoff code out of subchannels and into LB policies. void ResetBackoffLocked(); - // Note: Caller must ensure that this is invoked inside of the combiner. void Orphan() override { ShutdownLocked(); InternallyRefCounted<SubchannelListType>::Unref(DEBUG_LOCATION, "shutdown"); @@ -242,7 +204,7 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> { protected: SubchannelList(LoadBalancingPolicy* policy, TraceFlag* tracer, - const ServerAddressList& addresses, grpc_combiner* combiner, + const ServerAddressList& addresses, LoadBalancingPolicy::ChannelControlHelper* helper, const grpc_channel_args& args); @@ -263,8 +225,6 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> { TraceFlag* tracer_; - grpc_combiner* combiner_; - // The list of subchannels. SubchannelVector subchannels_; @@ -284,59 +244,26 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> { template <typename SubchannelListType, typename SubchannelDataType> void SubchannelData<SubchannelListType, SubchannelDataType>::Watcher:: - OnConnectivityStateChange( - grpc_connectivity_state new_state, - RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel) { - // Will delete itself. - New<Updater>(subchannel_data_, - subchannel_list_->Ref(DEBUG_LOCATION, "Watcher::Updater"), - new_state, std::move(connected_subchannel)); -} - -template <typename SubchannelListType, typename SubchannelDataType> -SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::Updater:: - Updater( - SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data, - RefCountedPtr<SubchannelList<SubchannelListType, SubchannelDataType>> - subchannel_list, - grpc_connectivity_state state, - RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel) - : subchannel_data_(subchannel_data), - subchannel_list_(std::move(subchannel_list)), - state_(state), - connected_subchannel_(std::move(connected_subchannel)) { - GRPC_CLOSURE_INIT(&closure_, &OnUpdateLocked, this, - grpc_combiner_scheduler(subchannel_list_->combiner_)); - GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE); -} - -template <typename SubchannelListType, typename SubchannelDataType> -void SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::Updater:: - OnUpdateLocked(void* arg, grpc_error* error) { - Updater* self = static_cast<Updater*>(arg); - SubchannelData* sd = self->subchannel_data_; - if (GRPC_TRACE_FLAG_ENABLED(*sd->subchannel_list_->tracer())) { + OnConnectivityStateChange(grpc_connectivity_state new_state) { + if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) { gpr_log(GPR_INFO, "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR " (subchannel %p): connectivity changed: state=%s, " - "connected_subchannel=%p, shutting_down=%d, pending_watcher=%p", - sd->subchannel_list_->tracer()->name(), - sd->subchannel_list_->policy(), sd->subchannel_list_, sd->Index(), - sd->subchannel_list_->num_subchannels(), sd->subchannel_.get(), - grpc_connectivity_state_name(self->state_), - self->connected_subchannel_.get(), - sd->subchannel_list_->shutting_down(), sd->pending_watcher_); + "shutting_down=%d, pending_watcher=%p", + subchannel_list_->tracer()->name(), subchannel_list_->policy(), + subchannel_list_.get(), subchannel_data_->Index(), + subchannel_list_->num_subchannels(), + subchannel_data_->subchannel_.get(), + grpc_connectivity_state_name(new_state), + subchannel_list_->shutting_down(), + subchannel_data_->pending_watcher_); } - if (!sd->subchannel_list_->shutting_down() && - sd->pending_watcher_ != nullptr) { - sd->connectivity_state_ = self->state_; - // Get or release ref to connected subchannel. - sd->connected_subchannel_ = std::move(self->connected_subchannel_); + if (!subchannel_list_->shutting_down() && + subchannel_data_->pending_watcher_ != nullptr) { + subchannel_data_->connectivity_state_ = new_state; // Call the subclass's ProcessConnectivityChangeLocked() method. - sd->ProcessConnectivityChangeLocked(sd->connectivity_state_); + subchannel_data_->ProcessConnectivityChangeLocked(new_state); } - // Clean up. - Delete(self); } // @@ -371,7 +298,6 @@ void SubchannelData<SubchannelListType, SubchannelDataType>:: subchannel_.get()); } subchannel_.reset(); - connected_subchannel_.reset(); } } @@ -400,7 +326,7 @@ void SubchannelData<SubchannelListType, New<Watcher>(this, subchannel_list()->Ref(DEBUG_LOCATION, "Watcher")); subchannel_->WatchConnectivityState( connectivity_state_, - UniquePtr<SubchannelInterface::ConnectivityStateWatcher>( + UniquePtr<SubchannelInterface::ConnectivityStateWatcherInterface>( pending_watcher_)); } @@ -434,13 +360,12 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::ShutdownLocked() { template <typename SubchannelListType, typename SubchannelDataType> SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList( LoadBalancingPolicy* policy, TraceFlag* tracer, - const ServerAddressList& addresses, grpc_combiner* combiner, + const ServerAddressList& addresses, LoadBalancingPolicy::ChannelControlHelper* helper, const grpc_channel_args& args) : InternallyRefCounted<SubchannelListType>(tracer), policy_(policy), - tracer_(tracer), - combiner_(GRPC_COMBINER_REF(combiner, "subchannel_list")) { + tracer_(tracer) { if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { gpr_log(GPR_INFO, "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels", @@ -509,7 +434,6 @@ SubchannelList<SubchannelListType, SubchannelDataType>::~SubchannelList() { gpr_log(GPR_INFO, "[%s %p] Destroying subchannel_list %p", tracer_->name(), policy_, this); } - GRPC_COMBINER_UNREF(combiner_, "subchannel_list"); } template <typename SubchannelListType, typename SubchannelDataType> |