aboutsummaryrefslogtreecommitdiff
path: root/src/core/ext/filters/client_channel/lb_policy
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc9
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc30
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc20
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.h126
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/xds/xds.cc2
5 files changed, 134 insertions, 53 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index dad10f0ce8..a87dfda732 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -575,12 +575,13 @@ GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) {
result = child_picker_->Pick(args);
// If pick succeeded, add LB token to initial metadata.
if (result.type == PickResult::PICK_COMPLETE &&
- result.subchannel != nullptr) {
+ result.connected_subchannel != nullptr) {
const grpc_arg* arg = grpc_channel_args_find(
- result.subchannel->channel_args(), GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN);
+ result.connected_subchannel->args(), GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN);
if (arg == nullptr) {
- gpr_log(GPR_ERROR, "[grpclb %p picker %p] No LB token for subchannel %p",
- parent_, this, result.subchannel.get());
+ gpr_log(GPR_ERROR,
+ "[grpclb %p picker %p] No LB token for connected subchannel %p",
+ parent_, this, result.connected_subchannel.get());
abort();
}
grpc_mdelem lb_token = {reinterpret_cast<uintptr_t>(arg->value.pointer.p)};
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 8bf3d825b2..00036d8be6 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -28,6 +28,7 @@
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
@@ -84,8 +85,9 @@ class PickFirst : public LoadBalancingPolicy {
public:
PickFirstSubchannelList(PickFirst* policy, TraceFlag* tracer,
const ServerAddressList& addresses,
+ grpc_combiner* combiner,
const grpc_channel_args& args)
- : SubchannelList(policy, tracer, addresses,
+ : SubchannelList(policy, tracer, addresses, combiner,
policy->channel_control_helper(), args) {
// Need to maintain a ref to the LB policy as long as we maintain
// any references to subchannels, since the subchannels'
@@ -109,18 +111,19 @@ class PickFirst : public LoadBalancingPolicy {
class Picker : public SubchannelPicker {
public:
- explicit Picker(RefCountedPtr<SubchannelInterface> subchannel)
- : subchannel_(std::move(subchannel)) {}
+ explicit Picker(
+ RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel)
+ : connected_subchannel_(std::move(connected_subchannel)) {}
PickResult Pick(PickArgs args) override {
PickResult result;
result.type = PickResult::PICK_COMPLETE;
- result.subchannel = subchannel_;
+ result.connected_subchannel = connected_subchannel_;
return result;
}
private:
- RefCountedPtr<SubchannelInterface> subchannel_;
+ RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel_;
};
void ShutdownLocked() override;
@@ -163,9 +166,6 @@ void PickFirst::ShutdownLocked() {
void PickFirst::ExitIdleLocked() {
if (shutdown_) return;
if (idle_) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
- gpr_log(GPR_INFO, "Pick First %p exiting idle", this);
- }
idle_ = false;
if (subchannel_list_ == nullptr ||
subchannel_list_->num_subchannels() == 0) {
@@ -200,7 +200,7 @@ void PickFirst::UpdateLocked(UpdateArgs args) {
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add(args.args, &new_arg, 1);
auto subchannel_list = MakeOrphanable<PickFirstSubchannelList>(
- this, &grpc_lb_pick_first_trace, args.addresses, *new_args);
+ this, &grpc_lb_pick_first_trace, args.addresses, combiner(), *new_args);
grpc_channel_args_destroy(new_args);
if (subchannel_list->num_subchannels() == 0) {
// Empty update or no valid subchannels. Unsubscribe from all current
@@ -351,8 +351,8 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
// some connectivity state notifications.
if (connectivity_state == GRPC_CHANNEL_READY) {
p->channel_control_helper()->UpdateState(
- GRPC_CHANNEL_READY,
- UniquePtr<SubchannelPicker>(New<Picker>(subchannel()->Ref())));
+ GRPC_CHANNEL_READY, UniquePtr<SubchannelPicker>(New<Picker>(
+ connected_subchannel()->Ref())));
} else { // CONNECTING
p->channel_control_helper()->UpdateState(
connectivity_state, UniquePtr<SubchannelPicker>(New<QueuePicker>(
@@ -447,13 +447,13 @@ void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() {
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
}
// Cases 1 and 2.
- if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
- gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel());
- }
p->selected_ = this;
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_READY,
- UniquePtr<SubchannelPicker>(New<Picker>(subchannel()->Ref())));
+ UniquePtr<SubchannelPicker>(New<Picker>(connected_subchannel()->Ref())));
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel());
+ }
}
void PickFirst::PickFirstSubchannelData::
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index 04308ee254..c6655c7d9b 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -38,6 +38,7 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/static_metadata.h"
@@ -105,8 +106,9 @@ class RoundRobin : public LoadBalancingPolicy {
public:
RoundRobinSubchannelList(RoundRobin* policy, TraceFlag* tracer,
const ServerAddressList& addresses,
+ grpc_combiner* combiner,
const grpc_channel_args& args)
- : SubchannelList(policy, tracer, addresses,
+ : SubchannelList(policy, tracer, addresses, combiner,
policy->channel_control_helper(), args) {
// Need to maintain a ref to the LB policy as long as we maintain
// any references to subchannels, since the subchannels'
@@ -153,7 +155,7 @@ class RoundRobin : public LoadBalancingPolicy {
RoundRobin* parent_;
size_t last_picked_index_;
- InlinedVector<RefCountedPtr<SubchannelInterface>, 10> subchannels_;
+ InlinedVector<RefCountedPtr<ConnectedSubchannelInterface>, 10> subchannels_;
};
void ShutdownLocked() override;
@@ -178,9 +180,10 @@ RoundRobin::Picker::Picker(RoundRobin* parent,
RoundRobinSubchannelList* subchannel_list)
: parent_(parent) {
for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
- RoundRobinSubchannelData* sd = subchannel_list->subchannel(i);
- if (sd->connectivity_state() == GRPC_CHANNEL_READY) {
- subchannels_.push_back(sd->subchannel()->Ref());
+ auto* connected_subchannel =
+ subchannel_list->subchannel(i)->connected_subchannel();
+ if (connected_subchannel != nullptr) {
+ subchannels_.push_back(connected_subchannel->Ref());
}
}
// For discussion on why we generate a random starting index for
@@ -201,13 +204,14 @@ RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs args) {
last_picked_index_ = (last_picked_index_ + 1) % subchannels_.size();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO,
- "[RR %p picker %p] returning index %" PRIuPTR ", subchannel=%p",
+ "[RR %p picker %p] returning index %" PRIuPTR
+ ", connected_subchannel=%p",
parent_, this, last_picked_index_,
subchannels_[last_picked_index_].get());
}
PickResult result;
result.type = PickResult::PICK_COMPLETE;
- result.subchannel = subchannels_[last_picked_index_];
+ result.connected_subchannel = subchannels_[last_picked_index_];
return result;
}
@@ -420,7 +424,7 @@ void RoundRobin::UpdateLocked(UpdateArgs args) {
}
}
latest_pending_subchannel_list_ = MakeOrphanable<RoundRobinSubchannelList>(
- this, &grpc_lb_round_robin_trace, args.addresses, *args.args);
+ this, &grpc_lb_round_robin_trace, args.addresses, combiner(), *args.args);
if (latest_pending_subchannel_list_->num_subchannels() == 0) {
// If the new list is empty, immediately promote the new list to the
// current list and transition to TRANSIENT_FAILURE.
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index 34cd0f549f..7d70928a83 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -39,6 +39,7 @@
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
@@ -63,7 +64,8 @@ class MySubchannelList
};
*/
-// All methods will be called from within the client_channel combiner.
+// All methods with a Locked() suffix must be called from within the
+// client_channel combiner.
namespace grpc_core {
@@ -91,13 +93,20 @@ class SubchannelData {
// Returns a pointer to the subchannel.
SubchannelInterface* subchannel() const { return subchannel_.get(); }
+ // Returns the connected subchannel. Will be null if the subchannel
+ // is not connected.
+ ConnectedSubchannelInterface* connected_subchannel() const {
+ return connected_subchannel_.get();
+ }
+
// Synchronously checks the subchannel's connectivity state.
// Must not be called while there is a connectivity notification
// pending (i.e., between calling StartConnectivityWatchLocked() and
// calling CancelConnectivityWatchLocked()).
grpc_connectivity_state CheckConnectivityStateLocked() {
GPR_ASSERT(pending_watcher_ == nullptr);
- connectivity_state_ = subchannel_->CheckConnectivityState();
+ connectivity_state_ =
+ subchannel()->CheckConnectivityState(&connected_subchannel_);
return connectivity_state_;
}
@@ -135,8 +144,7 @@ class SubchannelData {
private:
// Watcher for subchannel connectivity state.
- class Watcher
- : public SubchannelInterface::ConnectivityStateWatcherInterface {
+ class Watcher : public SubchannelInterface::ConnectivityStateWatcher {
public:
Watcher(
SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data,
@@ -146,13 +154,42 @@ class SubchannelData {
~Watcher() { subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor"); }
- void OnConnectivityStateChange(grpc_connectivity_state new_state) override;
+ void OnConnectivityStateChange(grpc_connectivity_state new_state,
+ RefCountedPtr<ConnectedSubchannelInterface>
+ connected_subchannel) override;
grpc_pollset_set* interested_parties() override {
return subchannel_list_->policy()->interested_parties();
}
private:
+ // A fire-and-forget class that bounces into the combiner to process
+ // a connectivity state update.
+ class Updater {
+ public:
+ Updater(
+ SubchannelData<SubchannelListType, SubchannelDataType>*
+ subchannel_data,
+ RefCountedPtr<SubchannelList<SubchannelListType, SubchannelDataType>>
+ subchannel_list,
+ grpc_connectivity_state state,
+ RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel);
+
+ ~Updater() {
+ subchannel_list_.reset(DEBUG_LOCATION, "Watcher::Updater dtor");
+ }
+
+ private:
+ static void OnUpdateLocked(void* arg, grpc_error* error);
+
+ SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data_;
+ RefCountedPtr<SubchannelList<SubchannelListType, SubchannelDataType>>
+ subchannel_list_;
+ const grpc_connectivity_state state_;
+ RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel_;
+ grpc_closure closure_;
+ };
+
SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data_;
RefCountedPtr<SubchannelListType> subchannel_list_;
};
@@ -165,10 +202,10 @@ class SubchannelData {
// The subchannel.
RefCountedPtr<SubchannelInterface> subchannel_;
// Will be non-null when the subchannel's state is being watched.
- SubchannelInterface::ConnectivityStateWatcherInterface* pending_watcher_ =
- nullptr;
+ SubchannelInterface::ConnectivityStateWatcher* pending_watcher_ = nullptr;
// Data updated by the watcher.
grpc_connectivity_state connectivity_state_;
+ RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel_;
};
// A list of subchannels.
@@ -195,6 +232,7 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> {
// the backoff code out of subchannels and into LB policies.
void ResetBackoffLocked();
+ // Note: Caller must ensure that this is invoked inside of the combiner.
void Orphan() override {
ShutdownLocked();
InternallyRefCounted<SubchannelListType>::Unref(DEBUG_LOCATION, "shutdown");
@@ -204,7 +242,7 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> {
protected:
SubchannelList(LoadBalancingPolicy* policy, TraceFlag* tracer,
- const ServerAddressList& addresses,
+ const ServerAddressList& addresses, grpc_combiner* combiner,
LoadBalancingPolicy::ChannelControlHelper* helper,
const grpc_channel_args& args);
@@ -225,6 +263,8 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> {
TraceFlag* tracer_;
+ grpc_combiner* combiner_;
+
// The list of subchannels.
SubchannelVector subchannels_;
@@ -244,26 +284,59 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> {
template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::
- OnConnectivityStateChange(grpc_connectivity_state new_state) {
- if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
+ OnConnectivityStateChange(
+ grpc_connectivity_state new_state,
+ RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel) {
+ // Will delete itself.
+ New<Updater>(subchannel_data_,
+ subchannel_list_->Ref(DEBUG_LOCATION, "Watcher::Updater"),
+ new_state, std::move(connected_subchannel));
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::Updater::
+ Updater(
+ SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data,
+ RefCountedPtr<SubchannelList<SubchannelListType, SubchannelDataType>>
+ subchannel_list,
+ grpc_connectivity_state state,
+ RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel)
+ : subchannel_data_(subchannel_data),
+ subchannel_list_(std::move(subchannel_list)),
+ state_(state),
+ connected_subchannel_(std::move(connected_subchannel)) {
+ GRPC_CLOSURE_INIT(&closure_, &OnUpdateLocked, this,
+ grpc_combiner_scheduler(subchannel_list_->combiner_));
+ GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::Updater::
+ OnUpdateLocked(void* arg, grpc_error* error) {
+ Updater* self = static_cast<Updater*>(arg);
+ SubchannelData* sd = self->subchannel_data_;
+ if (GRPC_TRACE_FLAG_ENABLED(*sd->subchannel_list_->tracer())) {
gpr_log(GPR_INFO,
"[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
" (subchannel %p): connectivity changed: state=%s, "
- "shutting_down=%d, pending_watcher=%p",
- subchannel_list_->tracer()->name(), subchannel_list_->policy(),
- subchannel_list_.get(), subchannel_data_->Index(),
- subchannel_list_->num_subchannels(),
- subchannel_data_->subchannel_.get(),
- grpc_connectivity_state_name(new_state),
- subchannel_list_->shutting_down(),
- subchannel_data_->pending_watcher_);
+ "connected_subchannel=%p, shutting_down=%d, pending_watcher=%p",
+ sd->subchannel_list_->tracer()->name(),
+ sd->subchannel_list_->policy(), sd->subchannel_list_, sd->Index(),
+ sd->subchannel_list_->num_subchannels(), sd->subchannel_.get(),
+ grpc_connectivity_state_name(self->state_),
+ self->connected_subchannel_.get(),
+ sd->subchannel_list_->shutting_down(), sd->pending_watcher_);
}
- if (!subchannel_list_->shutting_down() &&
- subchannel_data_->pending_watcher_ != nullptr) {
- subchannel_data_->connectivity_state_ = new_state;
+ if (!sd->subchannel_list_->shutting_down() &&
+ sd->pending_watcher_ != nullptr) {
+ sd->connectivity_state_ = self->state_;
+ // Get or release ref to connected subchannel.
+ sd->connected_subchannel_ = std::move(self->connected_subchannel_);
// Call the subclass's ProcessConnectivityChangeLocked() method.
- subchannel_data_->ProcessConnectivityChangeLocked(new_state);
+ sd->ProcessConnectivityChangeLocked(sd->connectivity_state_);
}
+ // Clean up.
+ Delete(self);
}
//
@@ -298,6 +371,7 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::
subchannel_.get());
}
subchannel_.reset();
+ connected_subchannel_.reset();
}
}
@@ -326,7 +400,7 @@ void SubchannelData<SubchannelListType,
New<Watcher>(this, subchannel_list()->Ref(DEBUG_LOCATION, "Watcher"));
subchannel_->WatchConnectivityState(
connectivity_state_,
- UniquePtr<SubchannelInterface::ConnectivityStateWatcherInterface>(
+ UniquePtr<SubchannelInterface::ConnectivityStateWatcher>(
pending_watcher_));
}
@@ -360,12 +434,13 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
template <typename SubchannelListType, typename SubchannelDataType>
SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
LoadBalancingPolicy* policy, TraceFlag* tracer,
- const ServerAddressList& addresses,
+ const ServerAddressList& addresses, grpc_combiner* combiner,
LoadBalancingPolicy::ChannelControlHelper* helper,
const grpc_channel_args& args)
: InternallyRefCounted<SubchannelListType>(tracer),
policy_(policy),
- tracer_(tracer) {
+ tracer_(tracer),
+ combiner_(GRPC_COMBINER_REF(combiner, "subchannel_list")) {
if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
gpr_log(GPR_INFO,
"[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels",
@@ -434,6 +509,7 @@ SubchannelList<SubchannelListType, SubchannelDataType>::~SubchannelList() {
gpr_log(GPR_INFO, "[%s %p] Destroying subchannel_list %p", tracer_->name(),
policy_, this);
}
+ GRPC_COMBINER_UNREF(combiner_, "subchannel_list");
}
template <typename SubchannelListType, typename SubchannelDataType>
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
index 74660cec92..e790ec2552 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
@@ -570,7 +570,7 @@ XdsLb::PickResult XdsLb::Picker::Pick(PickArgs args) {
PickResult result = PickFromLocality(key, args);
// If pick succeeded, add client stats.
if (result.type == PickResult::PICK_COMPLETE &&
- result.subchannel != nullptr && client_stats_ != nullptr) {
+ result.connected_subchannel != nullptr && client_stats_ != nullptr) {
// TODO(roth): Add support for client stats.
}
return result;