diff options
Diffstat (limited to 'grpc/src/core/ext/filters/client_channel/subchannel.cc')
-rw-r--r-- | grpc/src/core/ext/filters/client_channel/subchannel.cc | 248 |
1 files changed, 86 insertions, 162 deletions
diff --git a/grpc/src/core/ext/filters/client_channel/subchannel.cc b/grpc/src/core/ext/filters/client_channel/subchannel.cc index dbac59af..a3db6096 100644 --- a/grpc/src/core/ext/filters/client_channel/subchannel.cc +++ b/grpc/src/core/ext/filters/client_channel/subchannel.cc @@ -36,6 +36,8 @@ #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h" #include "src/core/ext/filters/client_channel/service_config.h" #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h" +#include "src/core/lib/address_utils/parse_address.h" +#include "src/core/lib/address_utils/sockaddr_utils.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" @@ -45,8 +47,6 @@ #include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/sync.h" -#include "src/core/lib/iomgr/parse_address.h" -#include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/channel.h" @@ -131,7 +131,7 @@ size_t ConnectedSubchannel::GetInitialCallSizeEstimate() const { // RefCountedPtr<SubchannelCall> SubchannelCall::Create(Args args, - grpc_error** error) { + grpc_error_handle* error) { const size_t allocation_size = args.connected_subchannel->GetInitialCallSizeEstimate(); Arena* arena = args.arena; @@ -139,7 +139,7 @@ RefCountedPtr<SubchannelCall> SubchannelCall::Create(Args args, arena->Alloc(allocation_size)) SubchannelCall(std::move(args), error)); } -SubchannelCall::SubchannelCall(Args args, grpc_error** error) +SubchannelCall::SubchannelCall(Args args, grpc_error_handle* error) : connected_subchannel_(std::move(args.connected_subchannel)), deadline_(args.deadline) { grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(this); @@ -156,8 +156,7 @@ SubchannelCall::SubchannelCall(Args args, grpc_error** error) *error = grpc_call_stack_init(connected_subchannel_->channel_stack(), 1, SubchannelCall::Destroy, this, &call_args); if (GPR_UNLIKELY(*error != GRPC_ERROR_NONE)) { - const char* error_string = grpc_error_string(*error); - gpr_log(GPR_ERROR, "error: %s", error_string); + gpr_log(GPR_ERROR, "error: %s", grpc_error_std_string(*error).c_str()); return; } grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent); @@ -207,7 +206,7 @@ void SubchannelCall::Unref(const DebugLocation& /*location*/, GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason); } -void SubchannelCall::Destroy(void* arg, grpc_error* /*error*/) { +void SubchannelCall::Destroy(void* arg, grpc_error_handle /*error*/) { GPR_TIMER_SCOPE("subchannel_call_destroy", 0); SubchannelCall* self = static_cast<SubchannelCall*>(arg); // Keep some members before destroying the subchannel call. @@ -252,7 +251,7 @@ namespace { // Sets *status based on the rest of the parameters. void GetCallStatus(grpc_status_code* status, grpc_millis deadline, - grpc_metadata_batch* md_batch, grpc_error* error) { + grpc_metadata_batch* md_batch, grpc_error_handle error) { if (error != GRPC_ERROR_NONE) { grpc_error_get_status(error, deadline, status, nullptr, nullptr, nullptr); } else { @@ -268,7 +267,8 @@ void GetCallStatus(grpc_status_code* status, grpc_millis deadline, } // namespace -void SubchannelCall::RecvTrailingMetadataReady(void* arg, grpc_error* error) { +void SubchannelCall::RecvTrailingMetadataReady(void* arg, + grpc_error_handle error) { SubchannelCall* call = static_cast<SubchannelCall*>(arg); GPR_ASSERT(call->recv_trailing_metadata_ != nullptr); grpc_status_code status = GRPC_STATUS_OK; @@ -303,20 +303,17 @@ class Subchannel::ConnectedSubchannelStateWatcher : public AsyncConnectivityStateWatcherInterface { public: // Must be instantiated while holding c->mu. - explicit ConnectedSubchannelStateWatcher(Subchannel* c) : subchannel_(c) { - // Steal subchannel ref for connecting. - GRPC_SUBCHANNEL_WEAK_REF(subchannel_, "state_watcher"); - GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "connecting"); - } + explicit ConnectedSubchannelStateWatcher(WeakRefCountedPtr<Subchannel> c) + : subchannel_(std::move(c)) {} ~ConnectedSubchannelStateWatcher() override { - GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "state_watcher"); + subchannel_.reset(DEBUG_LOCATION, "state_watcher"); } private: void OnConnectivityStateChange(grpc_connectivity_state new_state, const absl::Status& status) override { - Subchannel* c = subchannel_; + Subchannel* c = subchannel_.get(); MutexLock lock(&c->mu_); switch (new_state) { case GRPC_CHANNEL_TRANSIENT_FAILURE: @@ -357,7 +354,7 @@ class Subchannel::ConnectedSubchannelStateWatcher } } - Subchannel* subchannel_; + WeakRefCountedPtr<Subchannel> subchannel_; }; // Asynchronously notifies the \a watcher of a change in the connectvity state @@ -378,7 +375,7 @@ class Subchannel::AsyncWatcherNotifierLocked { ExecCtx::Run(DEBUG_LOCATION, GRPC_CLOSURE_INIT( &closure_, - [](void* arg, grpc_error* /*error*/) { + [](void* arg, grpc_error_handle /*error*/) { auto* self = static_cast<AsyncWatcherNotifierLocked*>(arg); self->watcher_->OnConnectivityStateChange(); @@ -424,19 +421,19 @@ void Subchannel::ConnectivityStateWatcherList::NotifyLocked( class Subchannel::HealthWatcherMap::HealthWatcher : public AsyncConnectivityStateWatcherInterface { public: - HealthWatcher(Subchannel* c, std::string health_check_service_name, - grpc_connectivity_state subchannel_state) - : subchannel_(c), + HealthWatcher(WeakRefCountedPtr<Subchannel> c, + std::string health_check_service_name) + : subchannel_(std::move(c)), health_check_service_name_(std::move(health_check_service_name)), - state_(subchannel_state == GRPC_CHANNEL_READY ? GRPC_CHANNEL_CONNECTING - : subchannel_state) { - GRPC_SUBCHANNEL_WEAK_REF(subchannel_, "health_watcher"); + state_(subchannel_->state_ == GRPC_CHANNEL_READY + ? GRPC_CHANNEL_CONNECTING + : subchannel_->state_) { // If the subchannel is already connected, start health checking. - if (subchannel_state == GRPC_CHANNEL_READY) StartHealthCheckingLocked(); + if (subchannel_->state_ == GRPC_CHANNEL_READY) StartHealthCheckingLocked(); } ~HealthWatcher() override { - GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "health_watcher"); + subchannel_.reset(DEBUG_LOCATION, "health_watcher"); } const std::string& health_check_service_name() const { @@ -449,7 +446,8 @@ class Subchannel::HealthWatcherMap::HealthWatcher grpc_connectivity_state initial_state, RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface> watcher) { if (state_ != initial_state) { - new AsyncWatcherNotifierLocked(watcher, subchannel_, state_, status_); + new AsyncWatcherNotifierLocked(watcher, subchannel_.get(), state_, + status_); } watcher_list_.AddWatcherLocked(std::move(watcher)); } @@ -461,7 +459,8 @@ class Subchannel::HealthWatcherMap::HealthWatcher bool HasWatchers() const { return !watcher_list_.empty(); } - void NotifyLocked(grpc_connectivity_state state, const absl::Status& status) { + void NotifyLocked(grpc_connectivity_state state, const absl::Status& status) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(subchannel_->mu_) { if (state == GRPC_CHANNEL_READY) { // If we had not already notified for CONNECTING state, do so now. // (We may have missed this earlier, because if the transition @@ -470,14 +469,14 @@ class Subchannel::HealthWatcherMap::HealthWatcher if (state_ != GRPC_CHANNEL_CONNECTING) { state_ = GRPC_CHANNEL_CONNECTING; status_ = status; - watcher_list_.NotifyLocked(subchannel_, state_, status); + watcher_list_.NotifyLocked(subchannel_.get(), state_, status); } // If we've become connected, start health checking. StartHealthCheckingLocked(); } else { state_ = state; status_ = status; - watcher_list_.NotifyLocked(subchannel_, state_, status); + watcher_list_.NotifyLocked(subchannel_.get(), state_, status); // We're not connected, so stop health checking. health_check_client_.reset(); } @@ -496,18 +495,19 @@ class Subchannel::HealthWatcherMap::HealthWatcher if (new_state != GRPC_CHANNEL_SHUTDOWN && health_check_client_ != nullptr) { state_ = new_state; status_ = status; - watcher_list_.NotifyLocked(subchannel_, new_state, status); + watcher_list_.NotifyLocked(subchannel_.get(), new_state, status); } } - void StartHealthCheckingLocked() { + void StartHealthCheckingLocked() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(subchannel_->mu_) { GPR_ASSERT(health_check_client_ == nullptr); health_check_client_ = MakeOrphanable<HealthCheckClient>( health_check_service_name_, subchannel_->connected_subchannel_, subchannel_->pollset_set_, subchannel_->channelz_node_, Ref()); } - Subchannel* subchannel_; + WeakRefCountedPtr<Subchannel> subchannel_; std::string health_check_service_name_; OrphanablePtr<HealthCheckClient> health_check_client_; grpc_connectivity_state state_; @@ -520,7 +520,8 @@ class Subchannel::HealthWatcherMap::HealthWatcher // void Subchannel::HealthWatcherMap::AddWatcherLocked( - Subchannel* subchannel, grpc_connectivity_state initial_state, + WeakRefCountedPtr<Subchannel> subchannel, + grpc_connectivity_state initial_state, const std::string& health_check_service_name, RefCountedPtr<ConnectivityStateWatcherInterface> watcher) { // If the health check service name is not already present in the map, @@ -528,8 +529,8 @@ void Subchannel::HealthWatcherMap::AddWatcherLocked( auto it = map_.find(health_check_service_name); HealthWatcher* health_watcher; if (it == map_.end()) { - auto w = MakeOrphanable<HealthWatcher>( - subchannel, health_check_service_name, subchannel->state_); + auto w = MakeOrphanable<HealthWatcher>(std::move(subchannel), + health_check_service_name); health_watcher = w.get(); map_.emplace(health_check_service_name, std::move(w)); } else { @@ -647,14 +648,16 @@ Subchannel::ConnectivityStateWatcherInterface::PopConnectivityStateChange() { return state_change; } -Subchannel::Subchannel(SubchannelKey* key, +Subchannel::Subchannel(SubchannelKey key, OrphanablePtr<SubchannelConnector> connector, const grpc_channel_args* args) - : key_(key), + : DualRefCounted<Subchannel>( + GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel_refcount) ? "Subchannel" + : nullptr), + key_(std::move(key)), connector_(std::move(connector)), backoff_(ParseArgsForBackoffValues(args, &min_connect_timeout_ms_)) { GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED(); - gpr_atm_no_barrier_store(&ref_pair_, 1 << INTERNAL_REF_BITS); pollset_set_ = grpc_pollset_set_create(); grpc_resolved_address* addr = static_cast<grpc_resolved_address*>(gpr_malloc(sizeof(*addr))); @@ -704,26 +707,26 @@ Subchannel::~Subchannel() { grpc_channel_args_destroy(args_); connector_.reset(); grpc_pollset_set_destroy(pollset_set_); - delete key_; } -Subchannel* Subchannel::Create(OrphanablePtr<SubchannelConnector> connector, - const grpc_channel_args* args) { - SubchannelKey* key = new SubchannelKey(args); +RefCountedPtr<Subchannel> Subchannel::Create( + OrphanablePtr<SubchannelConnector> connector, + const grpc_channel_args* args) { + SubchannelKey key(args); SubchannelPoolInterface* subchannel_pool = SubchannelPoolInterface::GetSubchannelPoolFromChannelArgs(args); GPR_ASSERT(subchannel_pool != nullptr); - Subchannel* c = subchannel_pool->FindSubchannel(key); + RefCountedPtr<Subchannel> c = subchannel_pool->FindSubchannel(key); if (c != nullptr) { - delete key; return c; } - c = new Subchannel(key, std::move(connector), args); + c = MakeRefCounted<Subchannel>(std::move(key), std::move(connector), args); // Try to register the subchannel before setting the subchannel pool. // Otherwise, in case of a registration race, unreffing c in // RegisterSubchannel() will cause c to be tried to be unregistered, while // its key maps to a different subchannel. - Subchannel* registered = subchannel_pool->RegisterSubchannel(key, c); + RefCountedPtr<Subchannel> registered = + subchannel_pool->RegisterSubchannel(c->key_, c); if (registered == c) c->subchannel_pool_ = subchannel_pool->Ref(); return registered; } @@ -747,68 +750,6 @@ void Subchannel::ThrottleKeepaliveTime(int new_keepalive_time) { } } -Subchannel* Subchannel::Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - gpr_atm old_refs; - old_refs = RefMutate((1 << INTERNAL_REF_BITS), - 0 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("STRONG_REF")); - GPR_ASSERT((old_refs & STRONG_REF_MASK) != 0); - return this; -} - -void Subchannel::Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - gpr_atm old_refs; - // add a weak ref and subtract a strong ref (atomically) - old_refs = RefMutate( - static_cast<gpr_atm>(1) - static_cast<gpr_atm>(1 << INTERNAL_REF_BITS), - 1 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("STRONG_UNREF")); - if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) { - Disconnect(); - } - GRPC_SUBCHANNEL_WEAK_UNREF(this, "strong-unref"); -} - -Subchannel* Subchannel::WeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - gpr_atm old_refs; - old_refs = RefMutate(1, 0 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("WEAK_REF")); - GPR_ASSERT(old_refs != 0); - return this; -} - -namespace { - -void subchannel_destroy(void* arg, grpc_error* /*error*/) { - Subchannel* self = static_cast<Subchannel*>(arg); - delete self; -} - -} // namespace - -void Subchannel::WeakUnref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - gpr_atm old_refs; - old_refs = RefMutate(-static_cast<gpr_atm>(1), - 1 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("WEAK_UNREF")); - if (old_refs == 1) { - ExecCtx::Run(DEBUG_LOCATION, - GRPC_CLOSURE_CREATE(subchannel_destroy, this, - grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); - } -} - -Subchannel* Subchannel::RefFromWeakRef() { - for (;;) { - gpr_atm old_refs = gpr_atm_acq_load(&ref_pair_); - if (old_refs >= (1 << INTERNAL_REF_BITS)) { - gpr_atm new_refs = old_refs + (1 << INTERNAL_REF_BITS); - if (gpr_atm_rel_cas(&ref_pair_, old_refs, new_refs)) { - return this; - } - } else { - return nullptr; - } - } -} - const char* Subchannel::GetTargetAddress() { const grpc_arg* addr_arg = grpc_channel_args_find(args_, GRPC_ARG_SUBCHANNEL_ADDRESS); @@ -854,7 +795,8 @@ void Subchannel::WatchConnectivityState( watcher_list_.AddWatcherLocked(std::move(watcher)); } else { health_watcher_map_.AddWatcherLocked( - this, initial_state, *health_check_service_name, std::move(watcher)); + WeakRef(DEBUG_LOCATION, "health_watcher"), initial_state, + *health_check_service_name, std::move(watcher)); } } @@ -891,6 +833,21 @@ void Subchannel::ResetBackoff() { } } +void Subchannel::Orphan() { + // The subchannel_pool is only used once here in this subchannel, so the + // access can be outside of the lock. + if (subchannel_pool_ != nullptr) { + subchannel_pool_->UnregisterSubchannel(key_, this); + subchannel_pool_.reset(); + } + MutexLock lock(&mu_); + GPR_ASSERT(!disconnected_); + disconnected_ = true; + connector_.reset(); + connected_subchannel_.reset(); + health_watcher_map_.ShutdownLocked(); +} + grpc_arg Subchannel::CreateSubchannelAddressArg( const grpc_resolved_address* addr) { return grpc_channel_arg_string_create( @@ -984,7 +941,8 @@ void Subchannel::MaybeStartConnectingLocked() { return; } connecting_ = true; - GRPC_SUBCHANNEL_WEAK_REF(this, "connecting"); + WeakRef(DEBUG_LOCATION, "connecting") + .release(); // ref held by pending connect if (!backoff_begun_) { backoff_begun_ = true; ContinueConnectingLocked(); @@ -1005,11 +963,9 @@ void Subchannel::MaybeStartConnectingLocked() { } } -void Subchannel::OnRetryAlarm(void* arg, grpc_error* error) { - Subchannel* c = static_cast<Subchannel*>(arg); - // TODO(soheilhy): Once subchannel refcounting is simplified, we can get use - // MutexLock instead of ReleasableMutexLock, here. - ReleasableMutexLock lock(&c->mu_); +void Subchannel::OnRetryAlarm(void* arg, grpc_error_handle error) { + WeakRefCountedPtr<Subchannel> c(static_cast<Subchannel*>(arg)); + MutexLock lock(&c->mu_); c->have_retry_alarm_ = false; if (c->disconnected_) { error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected", @@ -1023,10 +979,9 @@ void Subchannel::OnRetryAlarm(void* arg, grpc_error* error) { if (error == GRPC_ERROR_NONE) { gpr_log(GPR_INFO, "Failed to connect to channel, retrying"); c->ContinueConnectingLocked(); - lock.Unlock(); - } else { - lock.Unlock(); - GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); + // Still connecting, keep ref around. Note that this stolen ref won't + // be dropped without first acquiring c->mu_. + c.release(); } GRPC_ERROR_UNREF(error); } @@ -1043,33 +998,30 @@ void Subchannel::ContinueConnectingLocked() { connector_->Connect(args, &connecting_result_, &on_connecting_finished_); } -void Subchannel::OnConnectingFinished(void* arg, grpc_error* error) { - auto* c = static_cast<Subchannel*>(arg); +void Subchannel::OnConnectingFinished(void* arg, grpc_error_handle error) { + WeakRefCountedPtr<Subchannel> c(static_cast<Subchannel*>(arg)); const grpc_channel_args* delete_channel_args = c->connecting_result_.channel_args; - GRPC_SUBCHANNEL_WEAK_REF(c, "on_connecting_finished"); { MutexLock lock(&c->mu_); c->connecting_ = false; if (c->connecting_result_.transport != nullptr && c->PublishTransportLocked()) { // Do nothing, transport was published. - } else if (c->disconnected_) { - GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); - } else { - gpr_log(GPR_INFO, "Connect failed: %s", grpc_error_string(error)); + } else if (!c->disconnected_) { + gpr_log(GPR_INFO, "Connect failed: %s", + grpc_error_std_string(error).c_str()); c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error)); - GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); } } - GRPC_SUBCHANNEL_WEAK_UNREF(c, "on_connecting_finished"); grpc_channel_args_destroy(delete_channel_args); + c.reset(DEBUG_LOCATION, "connecting"); } namespace { -void ConnectionDestroy(void* arg, grpc_error* /*error*/) { +void ConnectionDestroy(void* arg, grpc_error_handle /*error*/) { grpc_channel_stack* stk = static_cast<grpc_channel_stack*>(arg); grpc_channel_stack_destroy(stk); gpr_free(stk); @@ -1089,13 +1041,13 @@ bool Subchannel::PublishTransportLocked() { return false; } grpc_channel_stack* stk; - grpc_error* error = grpc_channel_stack_builder_finish( + grpc_error_handle error = grpc_channel_stack_builder_finish( builder, 0, 1, ConnectionDestroy, nullptr, reinterpret_cast<void**>(&stk)); if (error != GRPC_ERROR_NONE) { grpc_transport_destroy(connecting_result_.transport); gpr_log(GPR_ERROR, "error initializing subchannel stack: %s", - grpc_error_string(error)); + grpc_error_std_string(error).c_str()); GRPC_ERROR_UNREF(error); return false; } @@ -1117,39 +1069,11 @@ bool Subchannel::PublishTransportLocked() { } // Start watching connected subchannel. connected_subchannel_->StartWatch( - pollset_set_, MakeOrphanable<ConnectedSubchannelStateWatcher>(this)); + pollset_set_, MakeOrphanable<ConnectedSubchannelStateWatcher>( + WeakRef(DEBUG_LOCATION, "state_watcher"))); // Report initial state. SetConnectivityStateLocked(GRPC_CHANNEL_READY, absl::Status()); return true; } -void Subchannel::Disconnect() { - // The subchannel_pool is only used once here in this subchannel, so the - // access can be outside of the lock. - if (subchannel_pool_ != nullptr) { - subchannel_pool_->UnregisterSubchannel(key_); - subchannel_pool_.reset(); - } - MutexLock lock(&mu_); - GPR_ASSERT(!disconnected_); - disconnected_ = true; - connector_.reset(); - connected_subchannel_.reset(); - health_watcher_map_.ShutdownLocked(); -} - -gpr_atm Subchannel::RefMutate( - gpr_atm delta, int barrier GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS) { - gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&ref_pair_, delta) - : gpr_atm_no_barrier_fetch_add(&ref_pair_, delta); -#ifndef NDEBUG - if (grpc_trace_subchannel_refcount.enabled()) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "SUBCHANNEL: %p %12s 0x%" PRIxPTR " -> 0x%" PRIxPTR " [%s]", this, - purpose, old_val, old_val + delta, reason); - } -#endif - return old_val; -} - } // namespace grpc_core |