summaryrefslogtreecommitdiff
path: root/grpc/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
diff options
context:
space:
mode:
Diffstat (limited to 'grpc/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc')
-rw-r--r--grpc/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc487
1 files changed, 316 insertions, 171 deletions
diff --git a/grpc/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc b/grpc/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
index 0133a713..bcb81945 100644
--- a/grpc/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
+++ b/grpc/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
@@ -29,8 +29,10 @@
#include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h"
+#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h"
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
+#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/xds/xds_channel_args.h"
@@ -80,39 +82,33 @@ class XdsClusterResolverLbConfig : public LoadBalancingPolicy::Config {
};
XdsClusterResolverLbConfig(
- std::vector<DiscoveryMechanism> discovery_mechanisms,
- Json locality_picking_policy, Json endpoint_picking_policy)
+ std::vector<DiscoveryMechanism> discovery_mechanisms, Json xds_lb_policy)
: discovery_mechanisms_(std::move(discovery_mechanisms)),
- locality_picking_policy_(std::move(locality_picking_policy)),
- endpoint_picking_policy_(std::move(endpoint_picking_policy)) {}
+ xds_lb_policy_(std::move(xds_lb_policy)) {}
const char* name() const override { return kXdsClusterResolver; }
-
const std::vector<DiscoveryMechanism>& discovery_mechanisms() const {
return discovery_mechanisms_;
}
- const Json& locality_picking_policy() const {
- return locality_picking_policy_;
- }
- const Json& endpoint_picking_policy() const {
- return endpoint_picking_policy_;
- }
+
+ const Json& xds_lb_policy() const { return xds_lb_policy_; }
private:
std::vector<DiscoveryMechanism> discovery_mechanisms_;
- Json locality_picking_policy_;
- Json endpoint_picking_policy_;
+ Json xds_lb_policy_;
};
// Xds Cluster Resolver LB policy.
class XdsClusterResolverLb : public LoadBalancingPolicy {
public:
- XdsClusterResolverLb(RefCountedPtr<XdsClient> xds_client, Args args);
+ XdsClusterResolverLb(RefCountedPtr<XdsClient> xds_client, Args args,
+ std::string server_name, bool is_xds_uri);
const char* name() const override { return kXdsClusterResolver; }
void UpdateLocked(UpdateArgs args) override;
void ResetBackoffLocked() override;
+ void ExitIdleLocked() override;
private:
// Discovery Mechanism Base class
@@ -132,9 +128,11 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
: parent_(std::move(xds_cluster_resolver_lb)), index_(index) {}
virtual void Start() = 0;
void Orphan() override = 0;
+ virtual Json::Array override_child_policy() = 0;
+ virtual bool disable_reresolution() = 0;
// Caller must ensure that config_ is set before calling.
- const absl::string_view GetXdsClusterResolverResourceName() const {
+ absl::string_view GetXdsClusterResolverResourceName() const {
if (!parent_->is_xds_uri_) return parent_->server_name_;
if (!parent_->config_->discovery_mechanisms()[index_]
.eds_service_name.empty()) {
@@ -172,6 +170,8 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
: DiscoveryMechanism(std::move(xds_cluster_resolver_lb), index) {}
void Start() override;
void Orphan() override;
+ Json::Array override_child_policy() override { return Json::Array{}; }
+ bool disable_reresolution() override { return true; }
private:
class EndpointWatcher : public XdsClient::EndpointWatcherInterface {
@@ -185,7 +185,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
void OnEndpointChanged(XdsApi::EdsUpdate update) override {
new Notifier(discovery_mechanism_, std::move(update));
}
- void OnError(grpc_error* error) override {
+ void OnError(grpc_error_handle error) override {
new Notifier(discovery_mechanism_, error);
}
void OnResourceDoesNotExist() override {
@@ -198,7 +198,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
Notifier(RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism,
XdsApi::EdsUpdate update);
Notifier(RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism,
- grpc_error* error);
+ grpc_error_handle error);
explicit Notifier(
RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism);
~Notifier() { discovery_mechanism_.reset(DEBUG_LOCATION, "Notifier"); }
@@ -206,8 +206,8 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
private:
enum Type { kUpdate, kError, kDoesNotExist };
- static void RunInExecCtx(void* arg, grpc_error* error);
- void RunInWorkSerializer(grpc_error* error);
+ static void RunInExecCtx(void* arg, grpc_error_handle error);
+ void RunInWorkSerializer(grpc_error_handle error);
RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism_;
grpc_closure closure_;
@@ -230,6 +230,14 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
: DiscoveryMechanism(std::move(xds_cluster_resolver_lb), index) {}
void Start() override;
void Orphan() override;
+ Json::Array override_child_policy() override {
+ return Json::Array{
+ Json::Object{
+ {"pick_first", Json::Object()},
+ },
+ };
+ }
+ bool disable_reresolution() override { return false; };
private:
class ResolverResultHandler : public Resolver::ResultHandler {
@@ -242,7 +250,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
void ReturnResult(Resolver::Result result) override;
- void ReturnError(grpc_error* error) override;
+ void ReturnError(grpc_error_handle error) override;
private:
RefCountedPtr<LogicalDNSDiscoveryMechanism> discovery_mechanism_;
@@ -296,7 +304,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
void ShutdownLocked() override;
void OnEndpointChanged(size_t index, XdsApi::EdsUpdate update);
- void OnError(size_t index, grpc_error* error);
+ void OnError(size_t index, grpc_error_handle error);
void OnResourceDoesNotExist(size_t index);
void MaybeDestroyChildPolicyLocked();
@@ -310,6 +318,9 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
grpc_channel_args* CreateChildPolicyArgsLocked(
const grpc_channel_args* args_in);
+ // The xds client and endpoint watcher.
+ RefCountedPtr<XdsClient> xds_client_;
+
// Server name from target URI.
std::string server_name_;
bool is_xds_uri_;
@@ -321,9 +332,6 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
// Internal state.
bool shutting_down_ = false;
- // The xds client and endpoint watcher.
- RefCountedPtr<XdsClient> xds_client_;
-
// Vector of discovery mechansism entries in priority order.
std::vector<DiscoveryMechanismEntry> discovery_mechanisms_;
@@ -422,7 +430,7 @@ XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier::
XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier::
Notifier(RefCountedPtr<XdsClusterResolverLb::EdsDiscoveryMechanism>
discovery_mechanism,
- grpc_error* error)
+ grpc_error_handle error)
: discovery_mechanism_(std::move(discovery_mechanism)), type_(kError) {
GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
ExecCtx::Run(DEBUG_LOCATION, &closure_, error);
@@ -438,7 +446,7 @@ XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier::
}
void XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier::
- RunInExecCtx(void* arg, grpc_error* error) {
+ RunInExecCtx(void* arg, grpc_error_handle error) {
Notifier* self = static_cast<Notifier*>(arg);
GRPC_ERROR_REF(error);
self->discovery_mechanism_->parent()->work_serializer()->Run(
@@ -446,7 +454,7 @@ void XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier::
}
void XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier::
- RunInWorkSerializer(grpc_error* error) {
+ RunInWorkSerializer(grpc_error_handle error) {
switch (type_) {
case kUpdate:
discovery_mechanism_->parent()->OnEndpointChanged(
@@ -469,11 +477,26 @@ void XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier::
//
void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::Start() {
+ std::string target = parent()->server_name_;
+ grpc_channel_args* args = nullptr;
+ FakeResolverResponseGenerator* fake_resolver_response_generator =
+ grpc_channel_args_find_pointer<FakeResolverResponseGenerator>(
+ parent()->args_,
+ GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR);
+ if (fake_resolver_response_generator != nullptr) {
+ target = absl::StrCat("fake:", target);
+ grpc_arg new_arg = FakeResolverResponseGenerator::MakeChannelArg(
+ fake_resolver_response_generator);
+ args = grpc_channel_args_copy_and_add(parent()->args_, &new_arg, 1);
+ } else {
+ args = grpc_channel_args_copy(parent()->args_);
+ }
resolver_ = ResolverRegistry::CreateResolver(
- parent()->server_name_.c_str(), parent()->args_,
- grpc_pollset_set_create(), parent()->work_serializer(),
+ target.c_str(), args, parent()->interested_parties(),
+ parent()->work_serializer(),
absl::make_unique<ResolverResultHandler>(
Ref(DEBUG_LOCATION, "LogicalDNSDiscoveryMechanism")));
+ grpc_channel_args_destroy(args);
if (resolver_ == nullptr) {
parent()->OnResourceDoesNotExist(index());
return;
@@ -509,15 +532,17 @@ void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler::
XdsApi::EdsUpdate update;
XdsApi::EdsUpdate::Priority::Locality locality;
locality.name = MakeRefCounted<XdsLocalityName>("", "", "");
+ locality.lb_weight = 1;
locality.endpoints = std::move(result.addresses);
- update.priorities[0].localities.emplace(locality.name.get(),
- std::move(locality));
+ XdsApi::EdsUpdate::Priority priority;
+ priority.localities.emplace(locality.name.get(), std::move(locality));
+ update.priorities.emplace_back(std::move(priority));
discovery_mechanism_->parent()->OnEndpointChanged(
discovery_mechanism_->index(), std::move(update));
}
void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler::
- ReturnError(grpc_error* error) {
+ ReturnError(grpc_error_handle error) {
discovery_mechanism_->parent()->OnError(discovery_mechanism_->index(), error);
}
@@ -526,26 +551,17 @@ void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler::
//
XdsClusterResolverLb::XdsClusterResolverLb(RefCountedPtr<XdsClient> xds_client,
- Args args)
- : LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) {
+ Args args, std::string server_name,
+ bool is_xds_uri)
+ : LoadBalancingPolicy(std::move(args)),
+ xds_client_(std::move(xds_client)),
+ server_name_(std::move(server_name)),
+ is_xds_uri_(is_xds_uri) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
gpr_log(GPR_INFO,
- "[xds_cluster_resolver_lb %p] created -- using xds client %p", this,
- xds_client_.get());
- }
- // Record server name.
- const char* server_uri =
- grpc_channel_args_find_string(args.args, GRPC_ARG_SERVER_URI);
- GPR_ASSERT(server_uri != nullptr);
- absl::StatusOr<URI> uri = URI::Parse(server_uri);
- GPR_ASSERT(uri.ok() && !uri->path().empty());
- server_name_ = std::string(absl::StripPrefix(uri->path(), "/"));
- is_xds_uri_ = uri->scheme() == "xds";
- if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
- gpr_log(GPR_INFO,
- "[xds_cluster_resolver_lb %p] server name from channel "
- "(is_xds_uri=%d): %s",
- this, is_xds_uri_, server_name_.c_str());
+ "[xds_cluster_resolver_lb %p] created -- xds_client=%p, "
+ "server_name=%s, is_xds_uri=%d",
+ this, xds_client_.get(), server_name_.c_str(), is_xds_uri_);
}
// EDS-only flow.
if (!is_xds_uri_) {
@@ -655,6 +671,10 @@ void XdsClusterResolverLb::ResetBackoffLocked() {
}
}
+void XdsClusterResolverLb::ExitIdleLocked() {
+ if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
+}
+
void XdsClusterResolverLb::OnEndpointChanged(size_t index,
XdsApi::EdsUpdate update) {
if (shutting_down_) return;
@@ -673,10 +693,17 @@ void XdsClusterResolverLb::OnEndpointChanged(size_t index,
discovery_mechanisms_[index].pending_priority_list =
std::move(update.priorities);
discovery_mechanisms_[index].first_update_received = true;
- if (!discovery_mechanisms_[0].first_update_received) {
- // We have not yet received an update for index 0, so wait until that
- // happens to create the child policy.
- return;
+ // If any discovery mechanism has not received its first update,
+ // wait until that happens before creating the child policy.
+ // TODO(roth): If this becomes problematic in the future (e.g., a
+ // secondary discovery mechanism delaying us from starting up at all),
+ // we can consider some sort of optimization whereby we can create the
+ // priority policy with only a subset of its children. But we need to
+ // make sure not to get into a situation where the priority policy
+ // will put the channel into TRANSIENT_FAILURE instead of CONNECTING
+ // while we're still waiting for the other discovery mechanism(s).
+ for (DiscoveryMechanismEntry& mechanism : discovery_mechanisms_) {
+ if (!mechanism.first_update_received) return;
}
// Construct new priority list.
XdsApi::EdsUpdate::PriorityList priority_list;
@@ -703,11 +730,11 @@ void XdsClusterResolverLb::OnEndpointChanged(size_t index,
UpdatePriorityList(std::move(priority_list));
}
-void XdsClusterResolverLb::OnError(size_t index, grpc_error* error) {
+void XdsClusterResolverLb::OnError(size_t index, grpc_error_handle error) {
gpr_log(GPR_ERROR,
"[xds_cluster_resolver_lb %p] discovery mechanism %" PRIuPTR
" xds watcher reported error: %s",
- this, index, grpc_error_string(error));
+ this, index, grpc_error_std_string(error).c_str());
GRPC_ERROR_UNREF(error);
if (shutting_down_) return;
if (!discovery_mechanisms_[index].first_update_received) {
@@ -813,7 +840,11 @@ ServerAddressList XdsClusterResolverLb::CreateChildPolicyAddressesLocked() {
MakeHierarchicalPathAttribute(hierarchical_path))
.WithAttribute(kXdsLocalityNameAttributeKey,
absl::make_unique<XdsLocalityAttribute>(
- locality_name->Ref())));
+ locality_name->Ref()))
+ .WithAttribute(ServerAddressWeightAttribute::
+ kServerAddressWeightAttributeKey,
+ absl::make_unique<ServerAddressWeightAttribute>(
+ locality.lb_weight)));
}
}
}
@@ -825,53 +856,76 @@ XdsClusterResolverLb::CreateChildPolicyConfigLocked() {
Json::Object priority_children;
Json::Array priority_priorities;
// Setting up index to iterate through the discovery mechanisms and keeping
- // track the discovery_mechanism each prioirty belongs to.
+ // track the discovery_mechanism each priority belongs to.
size_t discovery_index = 0;
// Setting up num_priorities_remaining to track the priorities in each
// discovery_mechanism.
size_t num_priorities_remaining_in_discovery =
discovery_mechanisms_[discovery_index].num_priorities;
for (size_t priority = 0; priority < priority_list_.size(); ++priority) {
- // Each prioirty in the priority_list_ should correspond to a priority in a
- // discovery mechanism in discovery_mechanisms_ (both in the same order).
- // Keeping track of the discovery_mechanism each prioirty belongs to.
- if (num_priorities_remaining_in_discovery == 0) {
- ++discovery_index;
- num_priorities_remaining_in_discovery =
- discovery_mechanisms_[discovery_index].num_priorities;
+ Json child_policy;
+ if (!discovery_mechanisms_[discovery_index]
+ .discovery_mechanism->override_child_policy()
+ .empty()) {
+ child_policy = discovery_mechanisms_[discovery_index]
+ .discovery_mechanism->override_child_policy();
} else {
- --num_priorities_remaining_in_discovery;
- }
- const auto& localities = priority_list_[priority].localities;
- Json::Object weighted_targets;
- for (const auto& p : localities) {
- XdsLocalityName* locality_name = p.first;
- const auto& locality = p.second;
- // Construct JSON object containing locality name.
- Json::Object locality_name_json;
- if (!locality_name->region().empty()) {
- locality_name_json["region"] = locality_name->region();
- }
- if (!locality_name->zone().empty()) {
- locality_name_json["zone"] = locality_name->zone();
- }
- if (!locality_name->sub_zone().empty()) {
- locality_name_json["subzone"] = locality_name->sub_zone();
+ const auto& xds_lb_policy = config_->xds_lb_policy().object_value();
+ if (xds_lb_policy.find("ROUND_ROBIN") != xds_lb_policy.end()) {
+ const auto& localities = priority_list_[priority].localities;
+ Json::Object weighted_targets;
+ for (const auto& p : localities) {
+ XdsLocalityName* locality_name = p.first;
+ const auto& locality = p.second;
+ // Construct JSON object containing locality name.
+ Json::Object locality_name_json;
+ if (!locality_name->region().empty()) {
+ locality_name_json["region"] = locality_name->region();
+ }
+ if (!locality_name->zone().empty()) {
+ locality_name_json["zone"] = locality_name->zone();
+ }
+ if (!locality_name->sub_zone().empty()) {
+ locality_name_json["sub_zone"] = locality_name->sub_zone();
+ }
+ // Add weighted target entry.
+ weighted_targets[locality_name->AsHumanReadableString()] =
+ Json::Object{
+ {"weight", locality.lb_weight},
+ {"childPolicy",
+ Json::Array{
+ Json::Object{
+ {"round_robin", Json::Object()},
+ },
+ }},
+ };
+ }
+ // Construct locality-picking policy.
+ // Start with field from our config and add the "targets" field.
+ child_policy = Json::Array{
+ Json::Object{
+ {"weighted_target_experimental",
+ Json::Object{
+ {"targets", Json::Object()},
+ }},
+ },
+ };
+ Json::Object& config =
+ *(*child_policy.mutable_array())[0].mutable_object();
+ auto it = config.begin();
+ GPR_ASSERT(it != config.end());
+ (*it->second.mutable_object())["targets"] = std::move(weighted_targets);
+ } else {
+ auto it = xds_lb_policy.find("RING_HASH");
+ GPR_ASSERT(it != xds_lb_policy.end());
+ Json::Object ring_hash_experimental_policy = it->second.object_value();
+ child_policy = Json::Array{
+ Json::Object{
+ {"ring_hash_experimental", ring_hash_experimental_policy},
+ },
+ };
}
- // Add weighted target entry.
- weighted_targets[locality_name->AsHumanReadableString()] = Json::Object{
- {"weight", locality.lb_weight},
- {"childPolicy", config_->endpoint_picking_policy()},
- };
}
- // Construct locality-picking policy.
- // Start with field from our config and add the "targets" field.
- Json locality_picking_config = config_->locality_picking_policy();
- Json::Object& config =
- *(*locality_picking_config.mutable_array())[0].mutable_object();
- auto it = config.begin();
- GPR_ASSERT(it != config.end());
- (*it->second.mutable_object())["targets"] = std::move(weighted_targets);
// Wrap it in the drop policy.
Json::Array drop_categories;
if (discovery_mechanisms_[discovery_index].drop_config != nullptr) {
@@ -887,7 +941,7 @@ XdsClusterResolverLb::CreateChildPolicyConfigLocked() {
.discovery_mechanism->GetLrsClusterKey();
Json::Object xds_cluster_impl_config = {
{"clusterName", std::string(lrs_key.first)},
- {"childPolicy", std::move(locality_picking_config)},
+ {"childPolicy", std::move(child_policy)},
{"dropCategories", std::move(drop_categories)},
{"maxConcurrentRequests",
config_->discovery_mechanisms()[discovery_index]
@@ -909,10 +963,24 @@ XdsClusterResolverLb::CreateChildPolicyConfigLocked() {
const size_t child_number = priority_child_numbers_[priority];
std::string child_name = absl::StrCat("child", child_number);
priority_priorities.emplace_back(child_name);
- priority_children[child_name] = Json::Object{
+ Json::Object child_config = {
{"config", std::move(locality_picking_policy)},
- {"ignore_reresolution_requests", true},
};
+ if (discovery_mechanisms_[discovery_index]
+ .discovery_mechanism->disable_reresolution()) {
+ child_config["ignore_reresolution_requests"] = true;
+ }
+ priority_children[child_name] = std::move(child_config);
+ // Each priority in the priority_list_ should correspond to a priority in a
+ // discovery mechanism in discovery_mechanisms_ (both in the same order).
+ // Keeping track of the discovery_mechanism each priority belongs to.
+ --num_priorities_remaining_in_discovery;
+ while (num_priorities_remaining_in_discovery == 0 &&
+ discovery_index < discovery_mechanisms_.size() - 1) {
+ ++discovery_index;
+ num_priorities_remaining_in_discovery =
+ discovery_mechanisms_[discovery_index].num_priorities;
+ }
}
// There should be matching number of priorities in discovery_mechanisms_ and
// in priority_list_; therefore at the end of looping through all the
@@ -934,7 +1002,7 @@ XdsClusterResolverLb::CreateChildPolicyConfigLocked() {
"[xds_cluster_resolver_lb %p] generated config for child policy: %s",
this, json_str.c_str());
}
- grpc_error* error = GRPC_ERROR_NONE;
+ grpc_error_handle error = GRPC_ERROR_NONE;
RefCountedPtr<LoadBalancingPolicy::Config> config =
LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(json, &error);
if (error != GRPC_ERROR_NONE) {
@@ -944,7 +1012,7 @@ XdsClusterResolverLb::CreateChildPolicyConfigLocked() {
"[xds_cluster_resolver_lb %p] error parsing generated child policy "
"config -- "
"will put channel in TRANSIENT_FAILURE: %s",
- this, grpc_error_string(error));
+ this, grpc_error_std_string(error).c_str());
error = grpc_error_set_int(
grpc_error_add_child(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"xds_cluster_resolver LB policy: error "
@@ -978,10 +1046,14 @@ void XdsClusterResolverLb::UpdateChildPolicyLocked() {
grpc_channel_args* XdsClusterResolverLb::CreateChildPolicyArgsLocked(
const grpc_channel_args* args) {
- // Inhibit client-side health checking, since the balancer does this for us.
- grpc_arg new_arg = grpc_channel_arg_integer_create(
- const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1);
- return grpc_channel_args_copy_and_add(args, &new_arg, 1);
+ absl::InlinedVector<grpc_arg, 2> new_args = {
+ // Inhibit client-side health checking, since the balancer does this
+ // for us.
+ grpc_channel_arg_integer_create(
+ const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1),
+ };
+ if (!is_xds_uri_) new_args.push_back(xds_client_->MakeChannelArg());
+ return grpc_channel_args_copy_and_add(args, new_args.data(), new_args.size());
}
OrphanablePtr<LoadBalancingPolicy>
@@ -1020,24 +1092,45 @@ class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
- grpc_error* error = GRPC_ERROR_NONE;
- RefCountedPtr<XdsClient> xds_client = XdsClient::GetOrCreate(&error);
- if (error != GRPC_ERROR_NONE) {
- gpr_log(GPR_ERROR,
- "cannot get XdsClient to instantiate xds_cluster_resolver LB "
- "policy: %s",
- grpc_error_string(error));
- GRPC_ERROR_UNREF(error);
- return nullptr;
+ // Find server name.
+ const char* server_uri =
+ grpc_channel_args_find_string(args.args, GRPC_ARG_SERVER_URI);
+ GPR_ASSERT(server_uri != nullptr);
+ absl::StatusOr<URI> uri = URI::Parse(server_uri);
+ GPR_ASSERT(uri.ok() && !uri->path().empty());
+ absl::string_view server_name = absl::StripPrefix(uri->path(), "/");
+ // Determine if it's an xds URI.
+ bool is_xds_uri = uri->scheme() == "xds";
+ // Get XdsClient.
+ RefCountedPtr<XdsClient> xds_client =
+ XdsClient::GetFromChannelArgs(*args.args);
+ if (xds_client == nullptr) {
+ if (!is_xds_uri) {
+ grpc_error_handle error = GRPC_ERROR_NONE;
+ xds_client = XdsClient::GetOrCreate(args.args, &error);
+ if (error != GRPC_ERROR_NONE) {
+ gpr_log(GPR_ERROR,
+ "cannot get or create XdsClient to instantiate "
+ "xds_cluster_resolver LB policy: %s",
+ grpc_error_std_string(error).c_str());
+ GRPC_ERROR_UNREF(error);
+ return nullptr;
+ }
+ } else {
+ gpr_log(GPR_ERROR,
+ "XdsClient not present in channel args -- cannot instantiate "
+ "xds_cluster_resolver LB policy");
+ return nullptr;
+ }
}
- return MakeOrphanable<XdsClusterResolverChildHandler>(std::move(xds_client),
- std::move(args));
+ return MakeOrphanable<XdsClusterResolverChildHandler>(
+ std::move(xds_client), std::move(args), server_name, is_xds_uri);
}
const char* name() const override { return kXdsClusterResolver; }
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
- const Json& json, grpc_error** error) const override {
+ const Json& json, grpc_error_handle* error) const override {
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
if (json.type() == Json::Type::JSON_NULL) {
// xds_cluster_resolver was mentioned as a policy in the deprecated
@@ -1048,7 +1141,7 @@ class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory {
"Please use loadBalancingConfig field of service config instead.");
return nullptr;
}
- std::vector<grpc_error*> error_list;
+ std::vector<grpc_error_handle> error_list;
std::vector<XdsClusterResolverLbConfig::DiscoveryMechanism>
discovery_mechanisms;
auto it = json.object_value().find("discoveryMechanisms");
@@ -1062,13 +1155,13 @@ class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory {
const Json::Array& array = it->second.array_value();
for (size_t i = 0; i < array.size(); ++i) {
XdsClusterResolverLbConfig::DiscoveryMechanism discovery_mechanism;
- std::vector<grpc_error*> discovery_mechanism_errors =
+ std::vector<grpc_error_handle> discovery_mechanism_errors =
ParseDiscoveryMechanism(array[i], &discovery_mechanism);
if (!discovery_mechanism_errors.empty()) {
- grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
+ grpc_error_handle error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat("field:discovery_mechanism element: ", i, " error")
.c_str());
- for (grpc_error* discovery_mechanism_error :
+ for (grpc_error_handle discovery_mechanism_error :
discovery_mechanism_errors) {
error = grpc_error_add_child(error, discovery_mechanism_error);
}
@@ -1077,58 +1170,104 @@ class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory {
discovery_mechanisms.emplace_back(std::move(discovery_mechanism));
}
}
- // Locality-picking policy.
- Json locality_picking_policy;
- it = json.object_value().find("localityPickingPolicy");
- if (it == json.object_value().end()) {
- locality_picking_policy = Json::Array{
- Json::Object{
- {"weighted_target_experimental",
- Json::Object{
- {"targets", Json::Object()},
- }},
- },
- };
- } else {
- locality_picking_policy = it->second;
- }
- grpc_error* parse_error = GRPC_ERROR_NONE;
- if (LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
- locality_picking_policy, &parse_error) == nullptr) {
- GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
- error_list.push_back(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "localityPickingPolicy", &parse_error, 1));
- GRPC_ERROR_UNREF(parse_error);
- }
- // Endpoint-picking policy. Called "childPolicy" for xds policy.
- Json endpoint_picking_policy;
- it = json.object_value().find("endpointPickingPolicy");
- if (it == json.object_value().end()) {
- endpoint_picking_policy = Json::Array{
- Json::Object{
- {"round_robin", Json::Object()},
- },
- };
- } else {
- endpoint_picking_policy = it->second;
- }
- parse_error = GRPC_ERROR_NONE;
- if (LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
- endpoint_picking_policy, &parse_error) == nullptr) {
- GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
- error_list.push_back(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "endpointPickingPolicy", &parse_error, 1));
- GRPC_ERROR_UNREF(parse_error);
- }
if (discovery_mechanisms.empty()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:discovery_mechanism error:list is missing or empty"));
}
+ Json xds_lb_policy = Json::Object{
+ {"ROUND_ROBIN", Json::Object()},
+ };
+ it = json.object_value().find("xdsLbPolicy");
+ if (it != json.object_value().end()) {
+ if (it->second.type() != Json::Type::ARRAY) {
+ error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "field:xdsLbPolicy error:type should be array"));
+ } else {
+ const Json::Array& array = it->second.array_value();
+ for (size_t i = 0; i < array.size(); ++i) {
+ if (array[i].type() != Json::Type::OBJECT) {
+ error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "field:xdsLbPolicy error:element should be of type object"));
+ continue;
+ }
+ const Json::Object& policy = array[i].object_value();
+ auto policy_it = policy.find("ROUND_ROBIN");
+ if (policy_it != policy.end()) {
+ if (policy_it->second.type() != Json::Type::OBJECT) {
+ error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "field:ROUND_ROBIN error:type should be object"));
+ }
+ break;
+ }
+ policy_it = policy.find("RING_HASH");
+ if (policy_it != policy.end()) {
+ if (policy_it->second.type() != Json::Type::OBJECT) {
+ error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "field:RING_HASH error:type should be object"));
+ continue;
+ }
+ // TODO(donnadionne): Move this to a method in
+ // ring_hash_experimental and call it here.
+ const Json::Object& ring_hash = policy_it->second.object_value();
+ xds_lb_policy = array[i];
+ size_t min_ring_size = 1024;
+ size_t max_ring_size = 8388608;
+ auto ring_hash_it = ring_hash.find("min_ring_size");
+ if (ring_hash_it == ring_hash.end()) {
+ error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "field:min_ring_size missing"));
+ } else if (ring_hash_it->second.type() != Json::Type::NUMBER) {
+ error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "field:min_ring_size error: should be of "
+ "number"));
+ } else {
+ min_ring_size = gpr_parse_nonnegative_int(
+ ring_hash_it->second.string_value().c_str());
+ }
+ ring_hash_it = ring_hash.find("max_ring_size");
+ if (ring_hash_it == ring_hash.end()) {
+ error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "field:max_ring_size missing"));
+ } else if (ring_hash_it->second.type() != Json::Type::NUMBER) {
+ error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "field:max_ring_size error: should be of "
+ "number"));
+ } else {
+ max_ring_size = gpr_parse_nonnegative_int(
+ ring_hash_it->second.string_value().c_str());
+ }
+ if (min_ring_size <= 0 || min_ring_size > 8388608 ||
+ max_ring_size <= 0 || max_ring_size > 8388608 ||
+ min_ring_size > max_ring_size) {
+ error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "field:max_ring_size and or min_ring_size error: "
+ "values need to be in the range of 1 to 8388608 "
+ "and max_ring_size cannot be smaller than "
+ "min_ring_size"));
+ }
+ ring_hash_it = ring_hash.find("hash_function");
+ if (ring_hash_it == ring_hash.end()) {
+ error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "field:hash_function missing"));
+ } else if (ring_hash_it->second.type() != Json::Type::STRING) {
+ error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "field:hash_function error: should be a "
+ "string"));
+ } else if (ring_hash_it->second.string_value() != "XX_HASH" &&
+ ring_hash_it->second.string_value() != "MURMUR_HASH_2") {
+ error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "field:hash_function error: unsupported "
+ "hash_function"));
+ }
+ break;
+ }
+ }
+ }
+ }
// Construct config.
if (error_list.empty()) {
return MakeRefCounted<XdsClusterResolverLbConfig>(
- std::move(discovery_mechanisms), std::move(locality_picking_policy),
- std::move(endpoint_picking_policy));
+ std::move(discovery_mechanisms), std::move(xds_lb_policy));
} else {
*error = GRPC_ERROR_CREATE_FROM_VECTOR(
"xds_cluster_resolver_experimental LB policy config", &error_list);
@@ -1137,10 +1276,10 @@ class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory {
}
private:
- static std::vector<grpc_error*> ParseDiscoveryMechanism(
+ static std::vector<grpc_error_handle> ParseDiscoveryMechanism(
const Json& json,
XdsClusterResolverLbConfig::DiscoveryMechanism* discovery_mechanism) {
- std::vector<grpc_error*> error_list;
+ std::vector<grpc_error_handle> error_list;
if (json.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"value should be of type object"));
@@ -1217,10 +1356,13 @@ class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory {
class XdsClusterResolverChildHandler : public ChildPolicyHandler {
public:
XdsClusterResolverChildHandler(RefCountedPtr<XdsClient> xds_client,
- Args args)
+ Args args, absl::string_view server_name,
+ bool is_xds_uri)
: ChildPolicyHandler(std::move(args),
&grpc_lb_xds_cluster_resolver_trace),
- xds_client_(std::move(xds_client)) {}
+ xds_client_(std::move(xds_client)),
+ server_name_(server_name),
+ is_xds_uri_(is_xds_uri) {}
bool ConfigChangeRequiresNewPolicyInstance(
LoadBalancingPolicy::Config* old_config,
@@ -1236,12 +1378,15 @@ class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory {
}
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
- const char* name, LoadBalancingPolicy::Args args) const override {
- return MakeOrphanable<XdsClusterResolverLb>(xds_client_, std::move(args));
+ const char* /*name*/, LoadBalancingPolicy::Args args) const override {
+ return MakeOrphanable<XdsClusterResolverLb>(xds_client_, std::move(args),
+ server_name_, is_xds_uri_);
}
private:
RefCountedPtr<XdsClient> xds_client_;
+ std::string server_name_;
+ bool is_xds_uri_;
};
};