aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVijay Pai <vpai@google.com>2019-06-18 09:33:05 -0700
committerVijay Pai <vpai@google.com>2019-06-18 10:37:15 -0700
commit6124a835d4b130f385e57e1e5b7840ed6766959b (patch)
tree677d28e0e5a06139d37d52f1c83cefd8782d2cb3
parent766ab783bc9701d315a685eb5fa0fb7526a7d9df (diff)
downloadgrpc-grpc-6124a835d4b130f385e57e1e5b7840ed6766959b.tar.gz
Revert "Hide ConnectedSubchannel from LB policy API."
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc420
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.h2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc9
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc30
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc20
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.h126
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/xds/xds.cc2
-rw-r--r--src/core/ext/filters/client_channel/subchannel.cc23
-rw-r--r--src/core/ext/filters/client_channel/subchannel.h71
-rw-r--r--src/core/ext/filters/client_channel/subchannel_interface.h45
-rw-r--r--src/core/lib/gprpp/map.h28
-rw-r--r--src/core/lib/transport/metadata.cc5
-rw-r--r--test/core/gprpp/map_test.cc29
-rw-r--r--test/core/util/test_lb_policies.cc2
14 files changed, 298 insertions, 514 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index a67792fcd3..0b612e67a3 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -147,9 +147,6 @@ class ChannelData {
return service_config_;
}
- RefCountedPtr<ConnectedSubchannel> GetConnectedSubchannelInDataPlane(
- SubchannelInterface* subchannel) const;
-
grpc_connectivity_state CheckConnectivityState(bool try_to_connect);
void AddExternalConnectivityWatcher(grpc_polling_entity pollent,
grpc_connectivity_state* state,
@@ -164,9 +161,9 @@ class ChannelData {
}
private:
- class SubchannelWrapper;
class ConnectivityStateAndPickerSetter;
class ServiceConfigSetter;
+ class GrpcSubchannel;
class ClientChannelControlHelper;
class ExternalConnectivityWatcher {
@@ -265,14 +262,7 @@ class ChannelData {
UniquePtr<char> health_check_service_name_;
RefCountedPtr<ServiceConfig> saved_service_config_;
bool received_first_resolver_result_ = false;
- // The number of SubchannelWrapper instances referencing a given Subchannel.
Map<Subchannel*, int> subchannel_refcount_map_;
- // Pending ConnectedSubchannel updates for each SubchannelWrapper.
- // Updates are queued here in the control plane combiner and then applied
- // in the data plane combiner when the picker is updated.
- Map<RefCountedPtr<SubchannelWrapper>, RefCountedPtr<ConnectedSubchannel>,
- RefCountedPtrLess<SubchannelWrapper>>
- pending_subchannel_updates_;
//
// Fields accessed from both data plane and control plane combiners.
@@ -717,247 +707,6 @@ class CallData {
};
//
-// ChannelData::SubchannelWrapper
-//
-
-// This class is a wrapper for Subchannel that hides details of the
-// channel's implementation (such as the health check service name and
-// connected subchannel) from the LB policy API.
-//
-// Note that no synchronization is needed here, because even if the
-// underlying subchannel is shared between channels, this wrapper will only
-// be used within one channel, so it will always be synchronized by the
-// control plane combiner.
-class ChannelData::SubchannelWrapper : public SubchannelInterface {
- public:
- SubchannelWrapper(ChannelData* chand, Subchannel* subchannel,
- UniquePtr<char> health_check_service_name)
- : SubchannelInterface(&grpc_client_channel_routing_trace),
- chand_(chand),
- subchannel_(subchannel),
- health_check_service_name_(std::move(health_check_service_name)) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
- gpr_log(GPR_INFO,
- "chand=%p: creating subchannel wrapper %p for subchannel %p",
- chand, this, subchannel_);
- }
- GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "SubchannelWrapper");
- auto* subchannel_node = subchannel_->channelz_node();
- if (subchannel_node != nullptr) {
- intptr_t subchannel_uuid = subchannel_node->uuid();
- auto it = chand_->subchannel_refcount_map_.find(subchannel_);
- if (it == chand_->subchannel_refcount_map_.end()) {
- chand_->channelz_node_->AddChildSubchannel(subchannel_uuid);
- it = chand_->subchannel_refcount_map_.emplace(subchannel_, 0).first;
- }
- ++it->second;
- }
- }
-
- ~SubchannelWrapper() {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
- gpr_log(GPR_INFO,
- "chand=%p: destroying subchannel wrapper %p for subchannel %p",
- chand_, this, subchannel_);
- }
- auto* subchannel_node = subchannel_->channelz_node();
- if (subchannel_node != nullptr) {
- intptr_t subchannel_uuid = subchannel_node->uuid();
- auto it = chand_->subchannel_refcount_map_.find(subchannel_);
- GPR_ASSERT(it != chand_->subchannel_refcount_map_.end());
- --it->second;
- if (it->second == 0) {
- chand_->channelz_node_->RemoveChildSubchannel(subchannel_uuid);
- chand_->subchannel_refcount_map_.erase(it);
- }
- }
- GRPC_SUBCHANNEL_UNREF(subchannel_, "unref from LB");
- GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "SubchannelWrapper");
- }
-
- grpc_connectivity_state CheckConnectivityState() override {
- RefCountedPtr<ConnectedSubchannel> connected_subchannel;
- grpc_connectivity_state connectivity_state =
- subchannel_->CheckConnectivityState(health_check_service_name_.get(),
- &connected_subchannel);
- MaybeUpdateConnectedSubchannel(std::move(connected_subchannel));
- return connectivity_state;
- }
-
- void WatchConnectivityState(
- grpc_connectivity_state initial_state,
- UniquePtr<ConnectivityStateWatcherInterface> watcher) override {
- auto& watcher_wrapper = watcher_map_[watcher.get()];
- GPR_ASSERT(watcher_wrapper == nullptr);
- watcher_wrapper = New<WatcherWrapper>(
- std::move(watcher), Ref(DEBUG_LOCATION, "WatcherWrapper"));
- subchannel_->WatchConnectivityState(
- initial_state,
- UniquePtr<char>(gpr_strdup(health_check_service_name_.get())),
- OrphanablePtr<Subchannel::ConnectivityStateWatcherInterface>(
- watcher_wrapper));
- }
-
- void CancelConnectivityStateWatch(
- ConnectivityStateWatcherInterface* watcher) override {
- auto it = watcher_map_.find(watcher);
- GPR_ASSERT(it != watcher_map_.end());
- subchannel_->CancelConnectivityStateWatch(health_check_service_name_.get(),
- it->second);
- watcher_map_.erase(it);
- }
-
- void AttemptToConnect() override { subchannel_->AttemptToConnect(); }
-
- void ResetBackoff() override { subchannel_->ResetBackoff(); }
-
- const grpc_channel_args* channel_args() override {
- return subchannel_->channel_args();
- }
-
- // Caller must be holding the control-plane combiner.
- ConnectedSubchannel* connected_subchannel() const {
- return connected_subchannel_.get();
- }
-
- // Caller must be holding the data-plane combiner.
- ConnectedSubchannel* connected_subchannel_in_data_plane() const {
- return connected_subchannel_in_data_plane_.get();
- }
- void set_connected_subchannel_in_data_plane(
- RefCountedPtr<ConnectedSubchannel> connected_subchannel) {
- connected_subchannel_in_data_plane_ = std::move(connected_subchannel);
- }
-
- private:
- // Subchannel and SubchannelInterface have different interfaces for
- // their respective ConnectivityStateWatcherInterface classes.
- // The one in Subchannel updates the ConnectedSubchannel along with
- // the state, whereas the one in SubchannelInterface does not expose
- // the ConnectedSubchannel.
- //
- // This wrapper provides a bridge between the two. It implements
- // Subchannel::ConnectivityStateWatcherInterface and wraps
- // the instance of SubchannelInterface::ConnectivityStateWatcherInterface
- // that was passed in by the LB policy. We pass an instance of this
- // class to the underlying Subchannel, and when we get updates from
- // the subchannel, we pass those on to the wrapped watcher to return
- // the update to the LB policy. This allows us to set the connected
- // subchannel before passing the result back to the LB policy.
- class WatcherWrapper : public Subchannel::ConnectivityStateWatcherInterface {
- public:
- WatcherWrapper(
- UniquePtr<SubchannelInterface::ConnectivityStateWatcherInterface>
- watcher,
- RefCountedPtr<SubchannelWrapper> parent)
- : watcher_(std::move(watcher)), parent_(std::move(parent)) {}
-
- ~WatcherWrapper() { parent_.reset(DEBUG_LOCATION, "WatcherWrapper"); }
-
- void Orphan() override { Unref(); }
-
- void OnConnectivityStateChange(
- grpc_connectivity_state new_state,
- RefCountedPtr<ConnectedSubchannel> connected_subchannel) override {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
- gpr_log(GPR_INFO,
- "chand=%p: connectivity change for subchannel wrapper %p "
- "subchannel %p (connected_subchannel=%p state=%s); "
- "hopping into combiner",
- parent_->chand_, parent_.get(), parent_->subchannel_,
- connected_subchannel.get(),
- grpc_connectivity_state_name(new_state));
- }
- // Will delete itself.
- New<Updater>(Ref(), new_state, std::move(connected_subchannel));
- }
-
- grpc_pollset_set* interested_parties() override {
- return watcher_->interested_parties();
- }
-
- private:
- class Updater {
- public:
- Updater(RefCountedPtr<WatcherWrapper> parent,
- grpc_connectivity_state new_state,
- RefCountedPtr<ConnectedSubchannel> connected_subchannel)
- : parent_(std::move(parent)),
- state_(new_state),
- connected_subchannel_(std::move(connected_subchannel)) {
- GRPC_CLOSURE_INIT(
- &closure_, ApplyUpdateInControlPlaneCombiner, this,
- grpc_combiner_scheduler(parent_->parent_->chand_->combiner_));
- GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
- }
-
- private:
- static void ApplyUpdateInControlPlaneCombiner(void* arg,
- grpc_error* error) {
- Updater* self = static_cast<Updater*>(arg);
- if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
- gpr_log(GPR_INFO,
- "chand=%p: processing connectivity change in combiner "
- "for subchannel wrapper %p subchannel %p "
- "(connected_subchannel=%p state=%s)",
- self->parent_->parent_->chand_, self->parent_->parent_.get(),
- self->parent_->parent_->subchannel_,
- self->connected_subchannel_.get(),
- grpc_connectivity_state_name(self->state_));
- }
- self->parent_->parent_->MaybeUpdateConnectedSubchannel(
- std::move(self->connected_subchannel_));
- self->parent_->watcher_->OnConnectivityStateChange(self->state_);
- Delete(self);
- }
-
- RefCountedPtr<WatcherWrapper> parent_;
- grpc_connectivity_state state_;
- RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
- grpc_closure closure_;
- };
-
- UniquePtr<SubchannelInterface::ConnectivityStateWatcherInterface> watcher_;
- RefCountedPtr<SubchannelWrapper> parent_;
- };
-
- void MaybeUpdateConnectedSubchannel(
- RefCountedPtr<ConnectedSubchannel> connected_subchannel) {
- // Update the connected subchannel only if the channel is not shutting
- // down. This is because once the channel is shutting down, we
- // ignore picker updates from the LB policy, which means that
- // ConnectivityStateAndPickerSetter will never process the entries
- // in chand_->pending_subchannel_updates_. So we don't want to add
- // entries there that will never be processed, since that would
- // leave dangling refs to the channel and prevent its destruction.
- grpc_error* disconnect_error = chand_->disconnect_error();
- if (disconnect_error != GRPC_ERROR_NONE) return;
- // Not shutting down, so do the update.
- if (connected_subchannel_ != connected_subchannel) {
- connected_subchannel_ = std::move(connected_subchannel);
- // Record the new connected subchannel so that it can be updated
- // in the data plane combiner the next time the picker is updated.
- chand_->pending_subchannel_updates_[Ref(
- DEBUG_LOCATION, "ConnectedSubchannelUpdate")] = connected_subchannel_;
- }
- }
-
- ChannelData* chand_;
- Subchannel* subchannel_;
- UniquePtr<char> health_check_service_name_;
- // Maps from the address of the watcher passed to us by the LB policy
- // to the address of the WrapperWatcher that we passed to the underlying
- // subchannel. This is needed so that when the LB policy calls
- // CancelConnectivityStateWatch() with its watcher, we know the
- // corresponding WrapperWatcher to cancel on the underlying subchannel.
- Map<ConnectivityStateWatcherInterface*, WatcherWrapper*> watcher_map_;
- // To be accessed only in the control plane combiner.
- RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
- // To be accessed only in the data plane combiner.
- RefCountedPtr<ConnectedSubchannel> connected_subchannel_in_data_plane_;
-};
-
-//
// ChannelData::ConnectivityStateAndPickerSetter
//
@@ -980,13 +729,10 @@ class ChannelData::ConnectivityStateAndPickerSetter {
grpc_slice_from_static_string(
GetChannelConnectivityStateChangeString(state)));
}
- // Grab any pending subchannel updates.
- pending_subchannel_updates_ =
- std::move(chand_->pending_subchannel_updates_);
// Bounce into the data plane combiner to reset the picker.
GRPC_CHANNEL_STACK_REF(chand->owning_stack_,
"ConnectivityStateAndPickerSetter");
- GRPC_CLOSURE_INIT(&closure_, SetPickerInDataPlane, this,
+ GRPC_CLOSURE_INIT(&closure_, SetPicker, this,
grpc_combiner_scheduler(chand->data_plane_combiner_));
GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
}
@@ -1009,38 +755,16 @@ class ChannelData::ConnectivityStateAndPickerSetter {
GPR_UNREACHABLE_CODE(return "UNKNOWN");
}
- static void SetPickerInDataPlane(void* arg, grpc_error* ignored) {
+ static void SetPicker(void* arg, grpc_error* ignored) {
auto* self = static_cast<ConnectivityStateAndPickerSetter*>(arg);
- // Handle subchannel updates.
- for (auto& p : self->pending_subchannel_updates_) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
- gpr_log(GPR_INFO,
- "chand=%p: updating subchannel wrapper %p data plane "
- "connected_subchannel to %p",
- self->chand_, p.first.get(), p.second.get());
- }
- p.first->set_connected_subchannel_in_data_plane(std::move(p.second));
- }
- // Swap out the picker. We hang on to the old picker so that it can
- // be deleted in the control-plane combiner, since that's where we need
- // to unref the subchannel wrappers that are reffed by the picker.
- self->picker_.swap(self->chand_->picker_);
+ // Update picker.
+ self->chand_->picker_ = std::move(self->picker_);
// Re-process queued picks.
for (QueuedPick* pick = self->chand_->queued_picks_; pick != nullptr;
pick = pick->next) {
CallData::StartPickLocked(pick->elem, GRPC_ERROR_NONE);
}
- // Pop back into the control plane combiner to delete ourself, so
- // that we make sure to unref subchannel wrappers there. This
- // includes both the ones reffed by the old picker (now stored in
- // self->picker_) and the ones in self->pending_subchannel_updates_.
- GRPC_CLOSURE_INIT(&self->closure_, CleanUpInControlPlane, self,
- grpc_combiner_scheduler(self->chand_->combiner_));
- GRPC_CLOSURE_SCHED(&self->closure_, GRPC_ERROR_NONE);
- }
-
- static void CleanUpInControlPlane(void* arg, grpc_error* ignored) {
- auto* self = static_cast<ConnectivityStateAndPickerSetter*>(arg);
+ // Clean up.
GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_,
"ConnectivityStateAndPickerSetter");
Delete(self);
@@ -1048,9 +772,6 @@ class ChannelData::ConnectivityStateAndPickerSetter {
ChannelData* chand_;
UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_;
- Map<RefCountedPtr<SubchannelWrapper>, RefCountedPtr<ConnectedSubchannel>,
- RefCountedPtrLess<SubchannelWrapper>>
- pending_subchannel_updates_;
grpc_closure closure_;
};
@@ -1226,6 +947,89 @@ void ChannelData::ExternalConnectivityWatcher::WatchConnectivityStateLocked(
}
//
+// ChannelData::GrpcSubchannel
+//
+
+// This class is a wrapper for Subchannel that hides details of the
+// channel's implementation (such as the health check service name) from
+// the LB policy API.
+//
+// Note that no synchronization is needed here, because even if the
+// underlying subchannel is shared between channels, this wrapper will only
+// be used within one channel, so it will always be synchronized by the
+// control plane combiner.
+class ChannelData::GrpcSubchannel : public SubchannelInterface {
+ public:
+ GrpcSubchannel(ChannelData* chand, Subchannel* subchannel,
+ UniquePtr<char> health_check_service_name)
+ : chand_(chand),
+ subchannel_(subchannel),
+ health_check_service_name_(std::move(health_check_service_name)) {
+ GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "GrpcSubchannel");
+ auto* subchannel_node = subchannel_->channelz_node();
+ if (subchannel_node != nullptr) {
+ intptr_t subchannel_uuid = subchannel_node->uuid();
+ auto it = chand_->subchannel_refcount_map_.find(subchannel_);
+ if (it == chand_->subchannel_refcount_map_.end()) {
+ chand_->channelz_node_->AddChildSubchannel(subchannel_uuid);
+ it = chand_->subchannel_refcount_map_.emplace(subchannel_, 0).first;
+ }
+ ++it->second;
+ }
+ }
+
+ ~GrpcSubchannel() {
+ auto* subchannel_node = subchannel_->channelz_node();
+ if (subchannel_node != nullptr) {
+ intptr_t subchannel_uuid = subchannel_node->uuid();
+ auto it = chand_->subchannel_refcount_map_.find(subchannel_);
+ GPR_ASSERT(it != chand_->subchannel_refcount_map_.end());
+ --it->second;
+ if (it->second == 0) {
+ chand_->channelz_node_->RemoveChildSubchannel(subchannel_uuid);
+ chand_->subchannel_refcount_map_.erase(it);
+ }
+ }
+ GRPC_SUBCHANNEL_UNREF(subchannel_, "unref from LB");
+ GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "GrpcSubchannel");
+ }
+
+ grpc_connectivity_state CheckConnectivityState(
+ RefCountedPtr<ConnectedSubchannelInterface>* connected_subchannel)
+ override {
+ RefCountedPtr<ConnectedSubchannel> tmp;
+ auto retval = subchannel_->CheckConnectivityState(
+ health_check_service_name_.get(), &tmp);
+ *connected_subchannel = std::move(tmp);
+ return retval;
+ }
+
+ void WatchConnectivityState(
+ grpc_connectivity_state initial_state,
+ UniquePtr<ConnectivityStateWatcher> watcher) override {
+ subchannel_->WatchConnectivityState(
+ initial_state,
+ UniquePtr<char>(gpr_strdup(health_check_service_name_.get())),
+ std::move(watcher));
+ }
+
+ void CancelConnectivityStateWatch(
+ ConnectivityStateWatcher* watcher) override {
+ subchannel_->CancelConnectivityStateWatch(health_check_service_name_.get(),
+ watcher);
+ }
+
+ void AttemptToConnect() override { subchannel_->AttemptToConnect(); }
+
+ void ResetBackoff() override { subchannel_->ResetBackoff(); }
+
+ private:
+ ChannelData* chand_;
+ Subchannel* subchannel_;
+ UniquePtr<char> health_check_service_name_;
+};
+
+//
// ChannelData::ClientChannelControlHelper
//
@@ -1262,8 +1066,8 @@ class ChannelData::ClientChannelControlHelper
chand_->client_channel_factory_->CreateSubchannel(new_args);
grpc_channel_args_destroy(new_args);
if (subchannel == nullptr) return nullptr;
- return MakeRefCounted<SubchannelWrapper>(
- chand_, subchannel, std::move(health_check_service_name));
+ return MakeRefCounted<GrpcSubchannel>(chand_, subchannel,
+ std::move(health_check_service_name));
}
grpc_channel* CreateChannel(const char* target,
@@ -1274,7 +1078,8 @@ class ChannelData::ClientChannelControlHelper
void UpdateState(
grpc_connectivity_state state,
UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
- grpc_error* disconnect_error = chand_->disconnect_error();
+ grpc_error* disconnect_error =
+ chand_->disconnect_error_.Load(MemoryOrder::ACQUIRE);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
const char* extra = disconnect_error == GRPC_ERROR_NONE
? ""
@@ -1646,13 +1451,9 @@ grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) {
}
LoadBalancingPolicy::PickResult result =
picker_->Pick(LoadBalancingPolicy::PickArgs());
- ConnectedSubchannel* connected_subchannel = nullptr;
- if (result.subchannel != nullptr) {
- SubchannelWrapper* subchannel =
- static_cast<SubchannelWrapper*>(result.subchannel.get());
- connected_subchannel = subchannel->connected_subchannel();
- }
- if (connected_subchannel != nullptr) {
+ if (result.connected_subchannel != nullptr) {
+ ConnectedSubchannel* connected_subchannel =
+ static_cast<ConnectedSubchannel*>(result.connected_subchannel.get());
connected_subchannel->Ping(op->send_ping.on_initiate, op->send_ping.on_ack);
} else {
if (result.error == GRPC_ERROR_NONE) {
@@ -1695,10 +1496,6 @@ void ChannelData::StartTransportOpLocked(void* arg, grpc_error* ignored) {
}
// Disconnect.
if (op->disconnect_with_error != GRPC_ERROR_NONE) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
- gpr_log(GPR_INFO, "chand=%p: channel shut down from API: %s", chand,
- grpc_error_string(op->disconnect_with_error));
- }
grpc_error* error = GRPC_ERROR_NONE;
GPR_ASSERT(chand->disconnect_error_.CompareExchangeStrong(
&error, op->disconnect_with_error, MemoryOrder::ACQ_REL,
@@ -1773,17 +1570,6 @@ void ChannelData::RemoveQueuedPick(QueuedPick* to_remove,
}
}
-RefCountedPtr<ConnectedSubchannel>
-ChannelData::GetConnectedSubchannelInDataPlane(
- SubchannelInterface* subchannel) const {
- SubchannelWrapper* subchannel_wrapper =
- static_cast<SubchannelWrapper*>(subchannel);
- ConnectedSubchannel* connected_subchannel =
- subchannel_wrapper->connected_subchannel_in_data_plane();
- if (connected_subchannel == nullptr) return nullptr;
- return connected_subchannel->Ref();
-}
-
void ChannelData::TryToConnectLocked(void* arg, grpc_error* error_ignored) {
auto* chand = static_cast<ChannelData*>(arg);
if (chand->resolving_lb_policy_ != nullptr) {
@@ -3711,9 +3497,10 @@ void CallData::StartPickLocked(void* arg, grpc_error* error) {
auto result = chand->picker()->Pick(pick_args);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
- "chand=%p calld=%p: LB pick returned %s (subchannel=%p, error=%s)",
+ "chand=%p calld=%p: LB pick returned %s (connected_subchannel=%p, "
+ "error=%s)",
chand, calld, PickResultTypeName(result.type),
- result.subchannel.get(), grpc_error_string(result.error));
+ result.connected_subchannel.get(), grpc_error_string(result.error));
}
switch (result.type) {
case LoadBalancingPolicy::PickResult::PICK_TRANSIENT_FAILURE: {
@@ -3755,16 +3542,11 @@ void CallData::StartPickLocked(void* arg, grpc_error* error) {
break;
default: // PICK_COMPLETE
// Handle drops.
- if (GPR_UNLIKELY(result.subchannel == nullptr)) {
+ if (GPR_UNLIKELY(result.connected_subchannel == nullptr)) {
result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Call dropped by load balancing policy");
- } else {
- // Grab a ref to the connected subchannel while we're still
- // holding the data plane combiner.
- calld->connected_subchannel_ =
- chand->GetConnectedSubchannelInDataPlane(result.subchannel.get());
- GPR_ASSERT(calld->connected_subchannel_ != nullptr);
}
+ calld->connected_subchannel_ = std::move(result.connected_subchannel);
calld->lb_recv_trailing_metadata_ready_ =
result.recv_trailing_metadata_ready;
calld->lb_recv_trailing_metadata_ready_user_data_ =
diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index c21e9d90ec..f98a41dee0 100644
--- a/src/core/ext/filters/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -128,7 +128,7 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// Used only if type is PICK_COMPLETE. Will be set to the selected
/// subchannel, or nullptr if the LB policy decides to drop the call.
- RefCountedPtr<SubchannelInterface> subchannel;
+ RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel;
/// Used only if type is PICK_TRANSIENT_FAILURE.
/// Error to be set when returning a transient failure.
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index dad10f0ce8..a87dfda732 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -575,12 +575,13 @@ GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) {
result = child_picker_->Pick(args);
// If pick succeeded, add LB token to initial metadata.
if (result.type == PickResult::PICK_COMPLETE &&
- result.subchannel != nullptr) {
+ result.connected_subchannel != nullptr) {
const grpc_arg* arg = grpc_channel_args_find(
- result.subchannel->channel_args(), GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN);
+ result.connected_subchannel->args(), GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN);
if (arg == nullptr) {
- gpr_log(GPR_ERROR, "[grpclb %p picker %p] No LB token for subchannel %p",
- parent_, this, result.subchannel.get());
+ gpr_log(GPR_ERROR,
+ "[grpclb %p picker %p] No LB token for connected subchannel %p",
+ parent_, this, result.connected_subchannel.get());
abort();
}
grpc_mdelem lb_token = {reinterpret_cast<uintptr_t>(arg->value.pointer.p)};
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 8bf3d825b2..00036d8be6 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -28,6 +28,7 @@
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
@@ -84,8 +85,9 @@ class PickFirst : public LoadBalancingPolicy {
public:
PickFirstSubchannelList(PickFirst* policy, TraceFlag* tracer,
const ServerAddressList& addresses,
+ grpc_combiner* combiner,
const grpc_channel_args& args)
- : SubchannelList(policy, tracer, addresses,
+ : SubchannelList(policy, tracer, addresses, combiner,
policy->channel_control_helper(), args) {
// Need to maintain a ref to the LB policy as long as we maintain
// any references to subchannels, since the subchannels'
@@ -109,18 +111,19 @@ class PickFirst : public LoadBalancingPolicy {
class Picker : public SubchannelPicker {
public:
- explicit Picker(RefCountedPtr<SubchannelInterface> subchannel)
- : subchannel_(std::move(subchannel)) {}
+ explicit Picker(
+ RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel)
+ : connected_subchannel_(std::move(connected_subchannel)) {}
PickResult Pick(PickArgs args) override {
PickResult result;
result.type = PickResult::PICK_COMPLETE;
- result.subchannel = subchannel_;
+ result.connected_subchannel = connected_subchannel_;
return result;
}
private:
- RefCountedPtr<SubchannelInterface> subchannel_;
+ RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel_;
};
void ShutdownLocked() override;
@@ -163,9 +166,6 @@ void PickFirst::ShutdownLocked() {
void PickFirst::ExitIdleLocked() {
if (shutdown_) return;
if (idle_) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
- gpr_log(GPR_INFO, "Pick First %p exiting idle", this);
- }
idle_ = false;
if (subchannel_list_ == nullptr ||
subchannel_list_->num_subchannels() == 0) {
@@ -200,7 +200,7 @@ void PickFirst::UpdateLocked(UpdateArgs args) {
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add(args.args, &new_arg, 1);
auto subchannel_list = MakeOrphanable<PickFirstSubchannelList>(
- this, &grpc_lb_pick_first_trace, args.addresses, *new_args);
+ this, &grpc_lb_pick_first_trace, args.addresses, combiner(), *new_args);
grpc_channel_args_destroy(new_args);
if (subchannel_list->num_subchannels() == 0) {
// Empty update or no valid subchannels. Unsubscribe from all current
@@ -351,8 +351,8 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
// some connectivity state notifications.
if (connectivity_state == GRPC_CHANNEL_READY) {
p->channel_control_helper()->UpdateState(
- GRPC_CHANNEL_READY,
- UniquePtr<SubchannelPicker>(New<Picker>(subchannel()->Ref())));
+ GRPC_CHANNEL_READY, UniquePtr<SubchannelPicker>(New<Picker>(
+ connected_subchannel()->Ref())));
} else { // CONNECTING
p->channel_control_helper()->UpdateState(
connectivity_state, UniquePtr<SubchannelPicker>(New<QueuePicker>(
@@ -447,13 +447,13 @@ void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() {
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
}
// Cases 1 and 2.
- if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
- gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel());
- }
p->selected_ = this;
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_READY,
- UniquePtr<SubchannelPicker>(New<Picker>(subchannel()->Ref())));
+ UniquePtr<SubchannelPicker>(New<Picker>(connected_subchannel()->Ref())));
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel());
+ }
}
void PickFirst::PickFirstSubchannelData::
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index 04308ee254..c6655c7d9b 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -38,6 +38,7 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/static_metadata.h"
@@ -105,8 +106,9 @@ class RoundRobin : public LoadBalancingPolicy {
public:
RoundRobinSubchannelList(RoundRobin* policy, TraceFlag* tracer,
const ServerAddressList& addresses,
+ grpc_combiner* combiner,
const grpc_channel_args& args)
- : SubchannelList(policy, tracer, addresses,
+ : SubchannelList(policy, tracer, addresses, combiner,
policy->channel_control_helper(), args) {
// Need to maintain a ref to the LB policy as long as we maintain
// any references to subchannels, since the subchannels'
@@ -153,7 +155,7 @@ class RoundRobin : public LoadBalancingPolicy {
RoundRobin* parent_;
size_t last_picked_index_;
- InlinedVector<RefCountedPtr<SubchannelInterface>, 10> subchannels_;
+ InlinedVector<RefCountedPtr<ConnectedSubchannelInterface>, 10> subchannels_;
};
void ShutdownLocked() override;
@@ -178,9 +180,10 @@ RoundRobin::Picker::Picker(RoundRobin* parent,
RoundRobinSubchannelList* subchannel_list)
: parent_(parent) {
for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
- RoundRobinSubchannelData* sd = subchannel_list->subchannel(i);
- if (sd->connectivity_state() == GRPC_CHANNEL_READY) {
- subchannels_.push_back(sd->subchannel()->Ref());
+ auto* connected_subchannel =
+ subchannel_list->subchannel(i)->connected_subchannel();
+ if (connected_subchannel != nullptr) {
+ subchannels_.push_back(connected_subchannel->Ref());
}
}
// For discussion on why we generate a random starting index for
@@ -201,13 +204,14 @@ RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs args) {
last_picked_index_ = (last_picked_index_ + 1) % subchannels_.size();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO,
- "[RR %p picker %p] returning index %" PRIuPTR ", subchannel=%p",
+ "[RR %p picker %p] returning index %" PRIuPTR
+ ", connected_subchannel=%p",
parent_, this, last_picked_index_,
subchannels_[last_picked_index_].get());
}
PickResult result;
result.type = PickResult::PICK_COMPLETE;
- result.subchannel = subchannels_[last_picked_index_];
+ result.connected_subchannel = subchannels_[last_picked_index_];
return result;
}
@@ -420,7 +424,7 @@ void RoundRobin::UpdateLocked(UpdateArgs args) {
}
}
latest_pending_subchannel_list_ = MakeOrphanable<RoundRobinSubchannelList>(
- this, &grpc_lb_round_robin_trace, args.addresses, *args.args);
+ this, &grpc_lb_round_robin_trace, args.addresses, combiner(), *args.args);
if (latest_pending_subchannel_list_->num_subchannels() == 0) {
// If the new list is empty, immediately promote the new list to the
// current list and transition to TRANSIENT_FAILURE.
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index 34cd0f549f..7d70928a83 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -39,6 +39,7 @@
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
@@ -63,7 +64,8 @@ class MySubchannelList
};
*/
-// All methods will be called from within the client_channel combiner.
+// All methods with a Locked() suffix must be called from within the
+// client_channel combiner.
namespace grpc_core {
@@ -91,13 +93,20 @@ class SubchannelData {
// Returns a pointer to the subchannel.
SubchannelInterface* subchannel() const { return subchannel_.get(); }
+ // Returns the connected subchannel. Will be null if the subchannel
+ // is not connected.
+ ConnectedSubchannelInterface* connected_subchannel() const {
+ return connected_subchannel_.get();
+ }
+
// Synchronously checks the subchannel's connectivity state.
// Must not be called while there is a connectivity notification
// pending (i.e., between calling StartConnectivityWatchLocked() and
// calling CancelConnectivityWatchLocked()).
grpc_connectivity_state CheckConnectivityStateLocked() {
GPR_ASSERT(pending_watcher_ == nullptr);
- connectivity_state_ = subchannel_->CheckConnectivityState();
+ connectivity_state_ =
+ subchannel()->CheckConnectivityState(&connected_subchannel_);
return connectivity_state_;
}
@@ -135,8 +144,7 @@ class SubchannelData {
private:
// Watcher for subchannel connectivity state.
- class Watcher
- : public SubchannelInterface::ConnectivityStateWatcherInterface {
+ class Watcher : public SubchannelInterface::ConnectivityStateWatcher {
public:
Watcher(
SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data,
@@ -146,13 +154,42 @@ class SubchannelData {
~Watcher() { subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor"); }
- void OnConnectivityStateChange(grpc_connectivity_state new_state) override;
+ void OnConnectivityStateChange(grpc_connectivity_state new_state,
+ RefCountedPtr<ConnectedSubchannelInterface>
+ connected_subchannel) override;
grpc_pollset_set* interested_parties() override {
return subchannel_list_->policy()->interested_parties();
}
private:
+ // A fire-and-forget class that bounces into the combiner to process
+ // a connectivity state update.
+ class Updater {
+ public:
+ Updater(
+ SubchannelData<SubchannelListType, SubchannelDataType>*
+ subchannel_data,
+ RefCountedPtr<SubchannelList<SubchannelListType, SubchannelDataType>>
+ subchannel_list,
+ grpc_connectivity_state state,
+ RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel);
+
+ ~Updater() {
+ subchannel_list_.reset(DEBUG_LOCATION, "Watcher::Updater dtor");
+ }
+
+ private:
+ static void OnUpdateLocked(void* arg, grpc_error* error);
+
+ SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data_;
+ RefCountedPtr<SubchannelList<SubchannelListType, SubchannelDataType>>
+ subchannel_list_;
+ const grpc_connectivity_state state_;
+ RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel_;
+ grpc_closure closure_;
+ };
+
SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data_;
RefCountedPtr<SubchannelListType> subchannel_list_;
};
@@ -165,10 +202,10 @@ class SubchannelData {
// The subchannel.
RefCountedPtr<SubchannelInterface> subchannel_;
// Will be non-null when the subchannel's state is being watched.
- SubchannelInterface::ConnectivityStateWatcherInterface* pending_watcher_ =
- nullptr;
+ SubchannelInterface::ConnectivityStateWatcher* pending_watcher_ = nullptr;
// Data updated by the watcher.
grpc_connectivity_state connectivity_state_;
+ RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel_;
};
// A list of subchannels.
@@ -195,6 +232,7 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> {
// the backoff code out of subchannels and into LB policies.
void ResetBackoffLocked();
+ // Note: Caller must ensure that this is invoked inside of the combiner.
void Orphan() override {
ShutdownLocked();
InternallyRefCounted<SubchannelListType>::Unref(DEBUG_LOCATION, "shutdown");
@@ -204,7 +242,7 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> {
protected:
SubchannelList(LoadBalancingPolicy* policy, TraceFlag* tracer,
- const ServerAddressList& addresses,
+ const ServerAddressList& addresses, grpc_combiner* combiner,
LoadBalancingPolicy::ChannelControlHelper* helper,
const grpc_channel_args& args);
@@ -225,6 +263,8 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> {
TraceFlag* tracer_;
+ grpc_combiner* combiner_;
+
// The list of subchannels.
SubchannelVector subchannels_;
@@ -244,26 +284,59 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> {
template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::
- OnConnectivityStateChange(grpc_connectivity_state new_state) {
- if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
+ OnConnectivityStateChange(
+ grpc_connectivity_state new_state,
+ RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel) {
+ // Will delete itself.
+ New<Updater>(subchannel_data_,
+ subchannel_list_->Ref(DEBUG_LOCATION, "Watcher::Updater"),
+ new_state, std::move(connected_subchannel));
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::Updater::
+ Updater(
+ SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data,
+ RefCountedPtr<SubchannelList<SubchannelListType, SubchannelDataType>>
+ subchannel_list,
+ grpc_connectivity_state state,
+ RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel)
+ : subchannel_data_(subchannel_data),
+ subchannel_list_(std::move(subchannel_list)),
+ state_(state),
+ connected_subchannel_(std::move(connected_subchannel)) {
+ GRPC_CLOSURE_INIT(&closure_, &OnUpdateLocked, this,
+ grpc_combiner_scheduler(subchannel_list_->combiner_));
+ GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::Updater::
+ OnUpdateLocked(void* arg, grpc_error* error) {
+ Updater* self = static_cast<Updater*>(arg);
+ SubchannelData* sd = self->subchannel_data_;
+ if (GRPC_TRACE_FLAG_ENABLED(*sd->subchannel_list_->tracer())) {
gpr_log(GPR_INFO,
"[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
" (subchannel %p): connectivity changed: state=%s, "
- "shutting_down=%d, pending_watcher=%p",
- subchannel_list_->tracer()->name(), subchannel_list_->policy(),
- subchannel_list_.get(), subchannel_data_->Index(),
- subchannel_list_->num_subchannels(),
- subchannel_data_->subchannel_.get(),
- grpc_connectivity_state_name(new_state),
- subchannel_list_->shutting_down(),
- subchannel_data_->pending_watcher_);
+ "connected_subchannel=%p, shutting_down=%d, pending_watcher=%p",
+ sd->subchannel_list_->tracer()->name(),
+ sd->subchannel_list_->policy(), sd->subchannel_list_, sd->Index(),
+ sd->subchannel_list_->num_subchannels(), sd->subchannel_.get(),
+ grpc_connectivity_state_name(self->state_),
+ self->connected_subchannel_.get(),
+ sd->subchannel_list_->shutting_down(), sd->pending_watcher_);
}
- if (!subchannel_list_->shutting_down() &&
- subchannel_data_->pending_watcher_ != nullptr) {
- subchannel_data_->connectivity_state_ = new_state;
+ if (!sd->subchannel_list_->shutting_down() &&
+ sd->pending_watcher_ != nullptr) {
+ sd->connectivity_state_ = self->state_;
+ // Get or release ref to connected subchannel.
+ sd->connected_subchannel_ = std::move(self->connected_subchannel_);
// Call the subclass's ProcessConnectivityChangeLocked() method.
- subchannel_data_->ProcessConnectivityChangeLocked(new_state);
+ sd->ProcessConnectivityChangeLocked(sd->connectivity_state_);
}
+ // Clean up.
+ Delete(self);
}
//
@@ -298,6 +371,7 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::
subchannel_.get());
}
subchannel_.reset();
+ connected_subchannel_.reset();
}
}
@@ -326,7 +400,7 @@ void SubchannelData<SubchannelListType,
New<Watcher>(this, subchannel_list()->Ref(DEBUG_LOCATION, "Watcher"));
subchannel_->WatchConnectivityState(
connectivity_state_,
- UniquePtr<SubchannelInterface::ConnectivityStateWatcherInterface>(
+ UniquePtr<SubchannelInterface::ConnectivityStateWatcher>(
pending_watcher_));
}
@@ -360,12 +434,13 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
template <typename SubchannelListType, typename SubchannelDataType>
SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
LoadBalancingPolicy* policy, TraceFlag* tracer,
- const ServerAddressList& addresses,
+ const ServerAddressList& addresses, grpc_combiner* combiner,
LoadBalancingPolicy::ChannelControlHelper* helper,
const grpc_channel_args& args)
: InternallyRefCounted<SubchannelListType>(tracer),
policy_(policy),
- tracer_(tracer) {
+ tracer_(tracer),
+ combiner_(GRPC_COMBINER_REF(combiner, "subchannel_list")) {
if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
gpr_log(GPR_INFO,
"[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels",
@@ -434,6 +509,7 @@ SubchannelList<SubchannelListType, SubchannelDataType>::~SubchannelList() {
gpr_log(GPR_INFO, "[%s %p] Destroying subchannel_list %p", tracer_->name(),
policy_, this);
}
+ GRPC_COMBINER_UNREF(combiner_, "subchannel_list");
}
template <typename SubchannelListType, typename SubchannelDataType>
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
index 74660cec92..e790ec2552 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
@@ -570,7 +570,7 @@ XdsLb::PickResult XdsLb::Picker::Pick(PickArgs args) {
PickResult result = PickFromLocality(key, args);
// If pick succeeded, add client stats.
if (result.type == PickResult::PICK_COMPLETE &&
- result.subchannel != nullptr && client_stats_ != nullptr) {
+ result.connected_subchannel != nullptr && client_stats_ != nullptr) {
// TODO(roth): Add support for client stats.
}
return result;
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index 99c7721e5e..dd16eded82 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -86,7 +86,7 @@ ConnectedSubchannel::ConnectedSubchannel(
grpc_channel_stack* channel_stack, const grpc_channel_args* args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel,
intptr_t socket_uuid)
- : RefCounted<ConnectedSubchannel>(&grpc_trace_subchannel_refcount),
+ : ConnectedSubchannelInterface(&grpc_trace_subchannel_refcount),
channel_stack_(channel_stack),
args_(grpc_channel_args_copy(args)),
channelz_subchannel_(std::move(channelz_subchannel)),
@@ -378,12 +378,12 @@ class Subchannel::ConnectedSubchannelStateWatcher {
//
void Subchannel::ConnectivityStateWatcherList::AddWatcherLocked(
- OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
+ UniquePtr<ConnectivityStateWatcher> watcher) {
watchers_.insert(MakePair(watcher.get(), std::move(watcher)));
}
void Subchannel::ConnectivityStateWatcherList::RemoveWatcherLocked(
- ConnectivityStateWatcherInterface* watcher) {
+ ConnectivityStateWatcher* watcher) {
watchers_.erase(watcher);
}
@@ -438,9 +438,8 @@ class Subchannel::HealthWatcherMap::HealthWatcher
grpc_connectivity_state state() const { return state_; }
- void AddWatcherLocked(
- grpc_connectivity_state initial_state,
- OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
+ void AddWatcherLocked(grpc_connectivity_state initial_state,
+ UniquePtr<ConnectivityStateWatcher> watcher) {
if (state_ != initial_state) {
RefCountedPtr<ConnectedSubchannel> connected_subchannel;
if (state_ == GRPC_CHANNEL_READY) {
@@ -452,7 +451,7 @@ class Subchannel::HealthWatcherMap::HealthWatcher
watcher_list_.AddWatcherLocked(std::move(watcher));
}
- void RemoveWatcherLocked(ConnectivityStateWatcherInterface* watcher) {
+ void RemoveWatcherLocked(ConnectivityStateWatcher* watcher) {
watcher_list_.RemoveWatcherLocked(watcher);
}
@@ -528,7 +527,7 @@ class Subchannel::HealthWatcherMap::HealthWatcher
void Subchannel::HealthWatcherMap::AddWatcherLocked(
Subchannel* subchannel, grpc_connectivity_state initial_state,
UniquePtr<char> health_check_service_name,
- OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
+ UniquePtr<ConnectivityStateWatcher> watcher) {
// If the health check service name is not already present in the map,
// add it.
auto it = map_.find(health_check_service_name.get());
@@ -547,8 +546,7 @@ void Subchannel::HealthWatcherMap::AddWatcherLocked(
}
void Subchannel::HealthWatcherMap::RemoveWatcherLocked(
- const char* health_check_service_name,
- ConnectivityStateWatcherInterface* watcher) {
+ const char* health_check_service_name, ConnectivityStateWatcher* watcher) {
auto it = map_.find(health_check_service_name);
GPR_ASSERT(it != map_.end());
it->second->RemoveWatcherLocked(watcher);
@@ -820,7 +818,7 @@ grpc_connectivity_state Subchannel::CheckConnectivityState(
void Subchannel::WatchConnectivityState(
grpc_connectivity_state initial_state,
UniquePtr<char> health_check_service_name,
- OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
+ UniquePtr<ConnectivityStateWatcher> watcher) {
MutexLock lock(&mu_);
grpc_pollset_set* interested_parties = watcher->interested_parties();
if (interested_parties != nullptr) {
@@ -839,8 +837,7 @@ void Subchannel::WatchConnectivityState(
}
void Subchannel::CancelConnectivityStateWatch(
- const char* health_check_service_name,
- ConnectivityStateWatcherInterface* watcher) {
+ const char* health_check_service_name, ConnectivityStateWatcher* watcher) {
MutexLock lock(&mu_);
grpc_pollset_set* interested_parties = watcher->interested_parties();
if (interested_parties != nullptr) {
diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h
index 9e8de76783..2f05792b87 100644
--- a/src/core/ext/filters/client_channel/subchannel.h
+++ b/src/core/ext/filters/client_channel/subchannel.h
@@ -23,6 +23,7 @@
#include "src/core/ext/filters/client_channel/client_channel_channelz.h"
#include "src/core/ext/filters/client_channel/connector.h"
+#include "src/core/ext/filters/client_channel/subchannel_interface.h"
#include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_stack.h"
@@ -69,7 +70,7 @@ namespace grpc_core {
class SubchannelCall;
-class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
+class ConnectedSubchannel : public ConnectedSubchannelInterface {
public:
struct CallArgs {
grpc_polling_entity* pollent;
@@ -96,7 +97,7 @@ class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
grpc_error** error);
grpc_channel_stack* channel_stack() const { return channel_stack_; }
- const grpc_channel_args* args() const { return args_; }
+ const grpc_channel_args* args() const override { return args_; }
channelz::SubchannelNode* channelz_subchannel() const {
return channelz_subchannel_.get();
}
@@ -175,35 +176,10 @@ class SubchannelCall {
// A subchannel that knows how to connect to exactly one target address. It
// provides a target for load balancing.
-//
-// Note that this is the "real" subchannel implementation, whose API is
-// different from the SubchannelInterface that is exposed to LB policy
-// implementations. The client channel provides an adaptor class
-// (SubchannelWrapper) that "converts" between the two.
class Subchannel {
public:
- class ConnectivityStateWatcherInterface
- : public InternallyRefCounted<ConnectivityStateWatcherInterface> {
- public:
- virtual ~ConnectivityStateWatcherInterface() = default;
-
- // Will be invoked whenever the subchannel's connectivity state
- // changes. There will be only one invocation of this method on a
- // given watcher instance at any given time.
- //
- // When the state changes to READY, connected_subchannel will
- // contain a ref to the connected subchannel. When it changes from
- // READY to some other state, the implementation must release its
- // ref to the connected subchannel.
- virtual void OnConnectivityStateChange(
- grpc_connectivity_state new_state,
- RefCountedPtr<ConnectedSubchannel> connected_subchannel) // NOLINT
- GRPC_ABSTRACT;
-
- virtual grpc_pollset_set* interested_parties() GRPC_ABSTRACT;
-
- GRPC_ABSTRACT_BASE_CLASS
- };
+ typedef SubchannelInterface::ConnectivityStateWatcher
+ ConnectivityStateWatcher;
// The ctor and dtor are not intended to use directly.
Subchannel(SubchannelKey* key, grpc_connector* connector,
@@ -230,8 +206,6 @@ class Subchannel {
// Caller doesn't take ownership.
const char* GetTargetAddress();
- const grpc_channel_args* channel_args() const { return args_; }
-
channelz::SubchannelNode* channelz_node();
// Returns the current connectivity state of the subchannel.
@@ -251,15 +225,14 @@ class Subchannel {
// changes.
// The watcher will be destroyed either when the subchannel is
// destroyed or when CancelConnectivityStateWatch() is called.
- void WatchConnectivityState(
- grpc_connectivity_state initial_state,
- UniquePtr<char> health_check_service_name,
- OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
+ void WatchConnectivityState(grpc_connectivity_state initial_state,
+ UniquePtr<char> health_check_service_name,
+ UniquePtr<ConnectivityStateWatcher> watcher);
// Cancels a connectivity state watch.
// If the watcher has already been destroyed, this is a no-op.
void CancelConnectivityStateWatch(const char* health_check_service_name,
- ConnectivityStateWatcherInterface* watcher);
+ ConnectivityStateWatcher* watcher);
// Attempt to connect to the backend. Has no effect if already connected.
void AttemptToConnect();
@@ -284,15 +257,14 @@ class Subchannel {
grpc_resolved_address* addr);
private:
- // A linked list of ConnectivityStateWatcherInterfaces that are monitoring
- // the subchannel's state.
+ // A linked list of ConnectivityStateWatchers that are monitoring the
+ // subchannel's state.
class ConnectivityStateWatcherList {
public:
~ConnectivityStateWatcherList() { Clear(); }
- void AddWatcherLocked(
- OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
- void RemoveWatcherLocked(ConnectivityStateWatcherInterface* watcher);
+ void AddWatcherLocked(UniquePtr<ConnectivityStateWatcher> watcher);
+ void RemoveWatcherLocked(ConnectivityStateWatcher* watcher);
// Notifies all watchers in the list about a change to state.
void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state);
@@ -304,13 +276,12 @@ class Subchannel {
private:
// TODO(roth): This could be a set instead of a map if we had a set
// implementation.
- Map<ConnectivityStateWatcherInterface*,
- OrphanablePtr<ConnectivityStateWatcherInterface>>
+ Map<ConnectivityStateWatcher*, UniquePtr<ConnectivityStateWatcher>>
watchers_;
};
- // A map that tracks ConnectivityStateWatcherInterfaces using a particular
- // health check service name.
+ // A map that tracks ConnectivityStateWatchers using a particular health
+ // check service name.
//
// There is one entry in the map for each health check service name.
// Entries exist only as long as there are watchers using the
@@ -320,12 +291,12 @@ class Subchannel {
// state READY.
class HealthWatcherMap {
public:
- void AddWatcherLocked(
- Subchannel* subchannel, grpc_connectivity_state initial_state,
- UniquePtr<char> health_check_service_name,
- OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
+ void AddWatcherLocked(Subchannel* subchannel,
+ grpc_connectivity_state initial_state,
+ UniquePtr<char> health_check_service_name,
+ UniquePtr<ConnectivityStateWatcher> watcher);
void RemoveWatcherLocked(const char* health_check_service_name,
- ConnectivityStateWatcherInterface* watcher);
+ ConnectivityStateWatcher* watcher);
// Notifies the watcher when the subchannel's state changes.
void NotifyLocked(grpc_connectivity_state state);
diff --git a/src/core/ext/filters/client_channel/subchannel_interface.h b/src/core/ext/filters/client_channel/subchannel_interface.h
index 2e448dc5a6..10b1bf124c 100644
--- a/src/core/ext/filters/client_channel/subchannel_interface.h
+++ b/src/core/ext/filters/client_channel/subchannel_interface.h
@@ -21,22 +21,42 @@
#include <grpc/support/port_platform.h>
+#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
namespace grpc_core {
-// The interface for subchannels that is exposed to LB policy implementations.
+// TODO(roth): In a subsequent PR, remove this from this API.
+class ConnectedSubchannelInterface
+ : public RefCounted<ConnectedSubchannelInterface> {
+ public:
+ virtual const grpc_channel_args* args() const GRPC_ABSTRACT;
+
+ protected:
+ template <typename TraceFlagT = TraceFlag>
+ explicit ConnectedSubchannelInterface(TraceFlagT* trace_flag = nullptr)
+ : RefCounted<ConnectedSubchannelInterface>(trace_flag) {}
+};
+
class SubchannelInterface : public RefCounted<SubchannelInterface> {
public:
- class ConnectivityStateWatcherInterface {
+ class ConnectivityStateWatcher {
public:
- virtual ~ConnectivityStateWatcherInterface() = default;
+ virtual ~ConnectivityStateWatcher() = default;
// Will be invoked whenever the subchannel's connectivity state
// changes. There will be only one invocation of this method on a
// given watcher instance at any given time.
- virtual void OnConnectivityStateChange(grpc_connectivity_state new_state)
+ //
+ // When the state changes to READY, connected_subchannel will
+ // contain a ref to the connected subchannel. When it changes from
+ // READY to some other state, the implementation must release its
+ // ref to the connected subchannel.
+ virtual void OnConnectivityStateChange(
+ grpc_connectivity_state new_state,
+ RefCountedPtr<ConnectedSubchannelInterface>
+ connected_subchannel) // NOLINT
GRPC_ABSTRACT;
// TODO(roth): Remove this as soon as we move to EventManager-based
@@ -46,14 +66,12 @@ class SubchannelInterface : public RefCounted<SubchannelInterface> {
GRPC_ABSTRACT_BASE_CLASS
};
- template <typename TraceFlagT = TraceFlag>
- explicit SubchannelInterface(TraceFlagT* trace_flag = nullptr)
- : RefCounted<SubchannelInterface>(trace_flag) {}
-
virtual ~SubchannelInterface() = default;
// Returns the current connectivity state of the subchannel.
- virtual grpc_connectivity_state CheckConnectivityState() GRPC_ABSTRACT;
+ virtual grpc_connectivity_state CheckConnectivityState(
+ RefCountedPtr<ConnectedSubchannelInterface>* connected_subchannel)
+ GRPC_ABSTRACT;
// Starts watching the subchannel's connectivity state.
// The first callback to the watcher will be delivered when the
@@ -68,12 +86,12 @@ class SubchannelInterface : public RefCounted<SubchannelInterface> {
// the previous watcher using CancelConnectivityStateWatch().
virtual void WatchConnectivityState(
grpc_connectivity_state initial_state,
- UniquePtr<ConnectivityStateWatcherInterface> watcher) GRPC_ABSTRACT;
+ UniquePtr<ConnectivityStateWatcher> watcher) GRPC_ABSTRACT;
// Cancels a connectivity state watch.
// If the watcher has already been destroyed, this is a no-op.
- virtual void CancelConnectivityStateWatch(
- ConnectivityStateWatcherInterface* watcher) GRPC_ABSTRACT;
+ virtual void CancelConnectivityStateWatch(ConnectivityStateWatcher* watcher)
+ GRPC_ABSTRACT;
// Attempt to connect to the backend. Has no effect if already connected.
// If the subchannel is currently in backoff delay due to a previously
@@ -87,9 +105,6 @@ class SubchannelInterface : public RefCounted<SubchannelInterface> {
// attempt will be started as soon as AttemptToConnect() is called.
virtual void ResetBackoff() GRPC_ABSTRACT;
- // TODO(roth): Need a better non-grpc-specific abstraction here.
- virtual const grpc_channel_args* channel_args() GRPC_ABSTRACT;
-
GRPC_ABSTRACT_BASE_CLASS
};
diff --git a/src/core/lib/gprpp/map.h b/src/core/lib/gprpp/map.h
index 566691df58..36e32d60c0 100644
--- a/src/core/lib/gprpp/map.h
+++ b/src/core/lib/gprpp/map.h
@@ -30,7 +30,6 @@
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/pair.h"
-#include "src/core/lib/gprpp/ref_counted_ptr.h"
namespace grpc_core {
struct StringLess {
@@ -42,13 +41,6 @@ struct StringLess {
}
};
-template <typename T>
-struct RefCountedPtrLess {
- bool operator()(const RefCountedPtr<T>& p1, const RefCountedPtr<T>& p2) {
- return p1.get() < p2.get();
- }
-};
-
namespace testing {
class MapTest;
}
@@ -63,28 +55,8 @@ class Map {
typedef Compare key_compare;
class iterator;
- Map() {}
~Map() { clear(); }
- // Copying not currently supported.
- Map(const Map& other) = delete;
-
- // Move support.
- Map(Map&& other) : root_(other.root_), size_(other.size_) {
- other.root_ = nullptr;
- other.size_ = 0;
- }
- Map& operator=(Map&& other) {
- if (this != &other) {
- clear();
- root_ = other.root_;
- size_ = other.size_;
- other.root_ = nullptr;
- other.size_ = 0;
- }
- return *this;
- }
-
T& operator[](key_type&& key);
T& operator[](const key_type& key);
iterator find(const key_type& k);
diff --git a/src/core/lib/transport/metadata.cc b/src/core/lib/transport/metadata.cc
index 7766ee186c..1523ced16d 100644
--- a/src/core/lib/transport/metadata.cc
+++ b/src/core/lib/transport/metadata.cc
@@ -222,12 +222,7 @@ void grpc_mdctx_global_shutdown() {
abort();
}
}
- // For ASAN builds, we don't want to crash here, because that will
- // prevent ASAN from providing leak detection information, which is
- // far more useful than this simple assertion.
-#ifndef GRPC_ASAN_ENABLED
GPR_DEBUG_ASSERT(shard->count == 0);
-#endif
gpr_free(shard->elems);
}
}
diff --git a/test/core/gprpp/map_test.cc b/test/core/gprpp/map_test.cc
index 21aeee8248..30d9eb0b20 100644
--- a/test/core/gprpp/map_test.cc
+++ b/test/core/gprpp/map_test.cc
@@ -437,35 +437,6 @@ TEST_F(MapTest, LowerBound) {
EXPECT_EQ(it, test_map.end());
}
-// Test move ctor
-TEST_F(MapTest, MoveCtor) {
- Map<const char*, Payload, StringLess> test_map;
- for (int i = 0; i < 5; i++) {
- test_map.emplace(kKeys[i], Payload(i));
- }
- Map<const char*, Payload, StringLess> test_map2 = std::move(test_map);
- for (int i = 0; i < 5; i++) {
- EXPECT_EQ(test_map.end(), test_map.find(kKeys[i]));
- EXPECT_EQ(i, test_map2.find(kKeys[i])->second.data());
- }
-}
-
-// Test move assignment
-TEST_F(MapTest, MoveAssignment) {
- Map<const char*, Payload, StringLess> test_map;
- for (int i = 0; i < 5; i++) {
- test_map.emplace(kKeys[i], Payload(i));
- }
- Map<const char*, Payload, StringLess> test_map2;
- test_map2.emplace("xxx", Payload(123));
- test_map2 = std::move(test_map);
- for (int i = 0; i < 5; i++) {
- EXPECT_EQ(test_map.end(), test_map.find(kKeys[i]));
- EXPECT_EQ(i, test_map2.find(kKeys[i])->second.data());
- }
- EXPECT_EQ(test_map2.end(), test_map2.find("xxx"));
-}
-
} // namespace testing
} // namespace grpc_core
diff --git a/test/core/util/test_lb_policies.cc b/test/core/util/test_lb_policies.cc
index 041ce1f45a..2c1f988d17 100644
--- a/test/core/util/test_lb_policies.cc
+++ b/test/core/util/test_lb_policies.cc
@@ -117,7 +117,7 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy
PickResult Pick(PickArgs args) override {
PickResult result = delegate_picker_->Pick(args);
if (result.type == PickResult::PICK_COMPLETE &&
- result.subchannel != nullptr) {
+ result.connected_subchannel != nullptr) {
new (args.call_state->Alloc(sizeof(TrailingMetadataHandler)))
TrailingMetadataHandler(&result, cb_, user_data_);
}