diff options
Diffstat (limited to 'grpc/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc')
-rw-r--r-- | grpc/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc | 487 |
1 files changed, 316 insertions, 171 deletions
diff --git a/grpc/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc b/grpc/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc index 0133a713..bcb81945 100644 --- a/grpc/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc +++ b/grpc/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc @@ -29,8 +29,10 @@ #include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h" #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" #include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h" +#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h" #include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" +#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/ext/xds/xds_channel_args.h" @@ -80,39 +82,33 @@ class XdsClusterResolverLbConfig : public LoadBalancingPolicy::Config { }; XdsClusterResolverLbConfig( - std::vector<DiscoveryMechanism> discovery_mechanisms, - Json locality_picking_policy, Json endpoint_picking_policy) + std::vector<DiscoveryMechanism> discovery_mechanisms, Json xds_lb_policy) : discovery_mechanisms_(std::move(discovery_mechanisms)), - locality_picking_policy_(std::move(locality_picking_policy)), - endpoint_picking_policy_(std::move(endpoint_picking_policy)) {} + xds_lb_policy_(std::move(xds_lb_policy)) {} const char* name() const override { return kXdsClusterResolver; } - const std::vector<DiscoveryMechanism>& discovery_mechanisms() const { return discovery_mechanisms_; } - const Json& locality_picking_policy() const { - return locality_picking_policy_; - } - const Json& endpoint_picking_policy() const { - return endpoint_picking_policy_; - } + + const Json& xds_lb_policy() const { return xds_lb_policy_; } private: std::vector<DiscoveryMechanism> discovery_mechanisms_; - Json locality_picking_policy_; - Json endpoint_picking_policy_; + Json xds_lb_policy_; }; // Xds Cluster Resolver LB policy. class XdsClusterResolverLb : public LoadBalancingPolicy { public: - XdsClusterResolverLb(RefCountedPtr<XdsClient> xds_client, Args args); + XdsClusterResolverLb(RefCountedPtr<XdsClient> xds_client, Args args, + std::string server_name, bool is_xds_uri); const char* name() const override { return kXdsClusterResolver; } void UpdateLocked(UpdateArgs args) override; void ResetBackoffLocked() override; + void ExitIdleLocked() override; private: // Discovery Mechanism Base class @@ -132,9 +128,11 @@ class XdsClusterResolverLb : public LoadBalancingPolicy { : parent_(std::move(xds_cluster_resolver_lb)), index_(index) {} virtual void Start() = 0; void Orphan() override = 0; + virtual Json::Array override_child_policy() = 0; + virtual bool disable_reresolution() = 0; // Caller must ensure that config_ is set before calling. - const absl::string_view GetXdsClusterResolverResourceName() const { + absl::string_view GetXdsClusterResolverResourceName() const { if (!parent_->is_xds_uri_) return parent_->server_name_; if (!parent_->config_->discovery_mechanisms()[index_] .eds_service_name.empty()) { @@ -172,6 +170,8 @@ class XdsClusterResolverLb : public LoadBalancingPolicy { : DiscoveryMechanism(std::move(xds_cluster_resolver_lb), index) {} void Start() override; void Orphan() override; + Json::Array override_child_policy() override { return Json::Array{}; } + bool disable_reresolution() override { return true; } private: class EndpointWatcher : public XdsClient::EndpointWatcherInterface { @@ -185,7 +185,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy { void OnEndpointChanged(XdsApi::EdsUpdate update) override { new Notifier(discovery_mechanism_, std::move(update)); } - void OnError(grpc_error* error) override { + void OnError(grpc_error_handle error) override { new Notifier(discovery_mechanism_, error); } void OnResourceDoesNotExist() override { @@ -198,7 +198,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy { Notifier(RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism, XdsApi::EdsUpdate update); Notifier(RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism, - grpc_error* error); + grpc_error_handle error); explicit Notifier( RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism); ~Notifier() { discovery_mechanism_.reset(DEBUG_LOCATION, "Notifier"); } @@ -206,8 +206,8 @@ class XdsClusterResolverLb : public LoadBalancingPolicy { private: enum Type { kUpdate, kError, kDoesNotExist }; - static void RunInExecCtx(void* arg, grpc_error* error); - void RunInWorkSerializer(grpc_error* error); + static void RunInExecCtx(void* arg, grpc_error_handle error); + void RunInWorkSerializer(grpc_error_handle error); RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism_; grpc_closure closure_; @@ -230,6 +230,14 @@ class XdsClusterResolverLb : public LoadBalancingPolicy { : DiscoveryMechanism(std::move(xds_cluster_resolver_lb), index) {} void Start() override; void Orphan() override; + Json::Array override_child_policy() override { + return Json::Array{ + Json::Object{ + {"pick_first", Json::Object()}, + }, + }; + } + bool disable_reresolution() override { return false; }; private: class ResolverResultHandler : public Resolver::ResultHandler { @@ -242,7 +250,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy { void ReturnResult(Resolver::Result result) override; - void ReturnError(grpc_error* error) override; + void ReturnError(grpc_error_handle error) override; private: RefCountedPtr<LogicalDNSDiscoveryMechanism> discovery_mechanism_; @@ -296,7 +304,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy { void ShutdownLocked() override; void OnEndpointChanged(size_t index, XdsApi::EdsUpdate update); - void OnError(size_t index, grpc_error* error); + void OnError(size_t index, grpc_error_handle error); void OnResourceDoesNotExist(size_t index); void MaybeDestroyChildPolicyLocked(); @@ -310,6 +318,9 @@ class XdsClusterResolverLb : public LoadBalancingPolicy { grpc_channel_args* CreateChildPolicyArgsLocked( const grpc_channel_args* args_in); + // The xds client and endpoint watcher. + RefCountedPtr<XdsClient> xds_client_; + // Server name from target URI. std::string server_name_; bool is_xds_uri_; @@ -321,9 +332,6 @@ class XdsClusterResolverLb : public LoadBalancingPolicy { // Internal state. bool shutting_down_ = false; - // The xds client and endpoint watcher. - RefCountedPtr<XdsClient> xds_client_; - // Vector of discovery mechansism entries in priority order. std::vector<DiscoveryMechanismEntry> discovery_mechanisms_; @@ -422,7 +430,7 @@ XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier:: XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier:: Notifier(RefCountedPtr<XdsClusterResolverLb::EdsDiscoveryMechanism> discovery_mechanism, - grpc_error* error) + grpc_error_handle error) : discovery_mechanism_(std::move(discovery_mechanism)), type_(kError) { GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); ExecCtx::Run(DEBUG_LOCATION, &closure_, error); @@ -438,7 +446,7 @@ XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier:: } void XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier:: - RunInExecCtx(void* arg, grpc_error* error) { + RunInExecCtx(void* arg, grpc_error_handle error) { Notifier* self = static_cast<Notifier*>(arg); GRPC_ERROR_REF(error); self->discovery_mechanism_->parent()->work_serializer()->Run( @@ -446,7 +454,7 @@ void XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier:: } void XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier:: - RunInWorkSerializer(grpc_error* error) { + RunInWorkSerializer(grpc_error_handle error) { switch (type_) { case kUpdate: discovery_mechanism_->parent()->OnEndpointChanged( @@ -469,11 +477,26 @@ void XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier:: // void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::Start() { + std::string target = parent()->server_name_; + grpc_channel_args* args = nullptr; + FakeResolverResponseGenerator* fake_resolver_response_generator = + grpc_channel_args_find_pointer<FakeResolverResponseGenerator>( + parent()->args_, + GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR); + if (fake_resolver_response_generator != nullptr) { + target = absl::StrCat("fake:", target); + grpc_arg new_arg = FakeResolverResponseGenerator::MakeChannelArg( + fake_resolver_response_generator); + args = grpc_channel_args_copy_and_add(parent()->args_, &new_arg, 1); + } else { + args = grpc_channel_args_copy(parent()->args_); + } resolver_ = ResolverRegistry::CreateResolver( - parent()->server_name_.c_str(), parent()->args_, - grpc_pollset_set_create(), parent()->work_serializer(), + target.c_str(), args, parent()->interested_parties(), + parent()->work_serializer(), absl::make_unique<ResolverResultHandler>( Ref(DEBUG_LOCATION, "LogicalDNSDiscoveryMechanism"))); + grpc_channel_args_destroy(args); if (resolver_ == nullptr) { parent()->OnResourceDoesNotExist(index()); return; @@ -509,15 +532,17 @@ void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler:: XdsApi::EdsUpdate update; XdsApi::EdsUpdate::Priority::Locality locality; locality.name = MakeRefCounted<XdsLocalityName>("", "", ""); + locality.lb_weight = 1; locality.endpoints = std::move(result.addresses); - update.priorities[0].localities.emplace(locality.name.get(), - std::move(locality)); + XdsApi::EdsUpdate::Priority priority; + priority.localities.emplace(locality.name.get(), std::move(locality)); + update.priorities.emplace_back(std::move(priority)); discovery_mechanism_->parent()->OnEndpointChanged( discovery_mechanism_->index(), std::move(update)); } void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler:: - ReturnError(grpc_error* error) { + ReturnError(grpc_error_handle error) { discovery_mechanism_->parent()->OnError(discovery_mechanism_->index(), error); } @@ -526,26 +551,17 @@ void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler:: // XdsClusterResolverLb::XdsClusterResolverLb(RefCountedPtr<XdsClient> xds_client, - Args args) - : LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) { + Args args, std::string server_name, + bool is_xds_uri) + : LoadBalancingPolicy(std::move(args)), + xds_client_(std::move(xds_client)), + server_name_(std::move(server_name)), + is_xds_uri_(is_xds_uri) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) { gpr_log(GPR_INFO, - "[xds_cluster_resolver_lb %p] created -- using xds client %p", this, - xds_client_.get()); - } - // Record server name. - const char* server_uri = - grpc_channel_args_find_string(args.args, GRPC_ARG_SERVER_URI); - GPR_ASSERT(server_uri != nullptr); - absl::StatusOr<URI> uri = URI::Parse(server_uri); - GPR_ASSERT(uri.ok() && !uri->path().empty()); - server_name_ = std::string(absl::StripPrefix(uri->path(), "/")); - is_xds_uri_ = uri->scheme() == "xds"; - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) { - gpr_log(GPR_INFO, - "[xds_cluster_resolver_lb %p] server name from channel " - "(is_xds_uri=%d): %s", - this, is_xds_uri_, server_name_.c_str()); + "[xds_cluster_resolver_lb %p] created -- xds_client=%p, " + "server_name=%s, is_xds_uri=%d", + this, xds_client_.get(), server_name_.c_str(), is_xds_uri_); } // EDS-only flow. if (!is_xds_uri_) { @@ -655,6 +671,10 @@ void XdsClusterResolverLb::ResetBackoffLocked() { } } +void XdsClusterResolverLb::ExitIdleLocked() { + if (child_policy_ != nullptr) child_policy_->ExitIdleLocked(); +} + void XdsClusterResolverLb::OnEndpointChanged(size_t index, XdsApi::EdsUpdate update) { if (shutting_down_) return; @@ -673,10 +693,17 @@ void XdsClusterResolverLb::OnEndpointChanged(size_t index, discovery_mechanisms_[index].pending_priority_list = std::move(update.priorities); discovery_mechanisms_[index].first_update_received = true; - if (!discovery_mechanisms_[0].first_update_received) { - // We have not yet received an update for index 0, so wait until that - // happens to create the child policy. - return; + // If any discovery mechanism has not received its first update, + // wait until that happens before creating the child policy. + // TODO(roth): If this becomes problematic in the future (e.g., a + // secondary discovery mechanism delaying us from starting up at all), + // we can consider some sort of optimization whereby we can create the + // priority policy with only a subset of its children. But we need to + // make sure not to get into a situation where the priority policy + // will put the channel into TRANSIENT_FAILURE instead of CONNECTING + // while we're still waiting for the other discovery mechanism(s). + for (DiscoveryMechanismEntry& mechanism : discovery_mechanisms_) { + if (!mechanism.first_update_received) return; } // Construct new priority list. XdsApi::EdsUpdate::PriorityList priority_list; @@ -703,11 +730,11 @@ void XdsClusterResolverLb::OnEndpointChanged(size_t index, UpdatePriorityList(std::move(priority_list)); } -void XdsClusterResolverLb::OnError(size_t index, grpc_error* error) { +void XdsClusterResolverLb::OnError(size_t index, grpc_error_handle error) { gpr_log(GPR_ERROR, "[xds_cluster_resolver_lb %p] discovery mechanism %" PRIuPTR " xds watcher reported error: %s", - this, index, grpc_error_string(error)); + this, index, grpc_error_std_string(error).c_str()); GRPC_ERROR_UNREF(error); if (shutting_down_) return; if (!discovery_mechanisms_[index].first_update_received) { @@ -813,7 +840,11 @@ ServerAddressList XdsClusterResolverLb::CreateChildPolicyAddressesLocked() { MakeHierarchicalPathAttribute(hierarchical_path)) .WithAttribute(kXdsLocalityNameAttributeKey, absl::make_unique<XdsLocalityAttribute>( - locality_name->Ref()))); + locality_name->Ref())) + .WithAttribute(ServerAddressWeightAttribute:: + kServerAddressWeightAttributeKey, + absl::make_unique<ServerAddressWeightAttribute>( + locality.lb_weight))); } } } @@ -825,53 +856,76 @@ XdsClusterResolverLb::CreateChildPolicyConfigLocked() { Json::Object priority_children; Json::Array priority_priorities; // Setting up index to iterate through the discovery mechanisms and keeping - // track the discovery_mechanism each prioirty belongs to. + // track the discovery_mechanism each priority belongs to. size_t discovery_index = 0; // Setting up num_priorities_remaining to track the priorities in each // discovery_mechanism. size_t num_priorities_remaining_in_discovery = discovery_mechanisms_[discovery_index].num_priorities; for (size_t priority = 0; priority < priority_list_.size(); ++priority) { - // Each prioirty in the priority_list_ should correspond to a priority in a - // discovery mechanism in discovery_mechanisms_ (both in the same order). - // Keeping track of the discovery_mechanism each prioirty belongs to. - if (num_priorities_remaining_in_discovery == 0) { - ++discovery_index; - num_priorities_remaining_in_discovery = - discovery_mechanisms_[discovery_index].num_priorities; + Json child_policy; + if (!discovery_mechanisms_[discovery_index] + .discovery_mechanism->override_child_policy() + .empty()) { + child_policy = discovery_mechanisms_[discovery_index] + .discovery_mechanism->override_child_policy(); } else { - --num_priorities_remaining_in_discovery; - } - const auto& localities = priority_list_[priority].localities; - Json::Object weighted_targets; - for (const auto& p : localities) { - XdsLocalityName* locality_name = p.first; - const auto& locality = p.second; - // Construct JSON object containing locality name. - Json::Object locality_name_json; - if (!locality_name->region().empty()) { - locality_name_json["region"] = locality_name->region(); - } - if (!locality_name->zone().empty()) { - locality_name_json["zone"] = locality_name->zone(); - } - if (!locality_name->sub_zone().empty()) { - locality_name_json["subzone"] = locality_name->sub_zone(); + const auto& xds_lb_policy = config_->xds_lb_policy().object_value(); + if (xds_lb_policy.find("ROUND_ROBIN") != xds_lb_policy.end()) { + const auto& localities = priority_list_[priority].localities; + Json::Object weighted_targets; + for (const auto& p : localities) { + XdsLocalityName* locality_name = p.first; + const auto& locality = p.second; + // Construct JSON object containing locality name. + Json::Object locality_name_json; + if (!locality_name->region().empty()) { + locality_name_json["region"] = locality_name->region(); + } + if (!locality_name->zone().empty()) { + locality_name_json["zone"] = locality_name->zone(); + } + if (!locality_name->sub_zone().empty()) { + locality_name_json["sub_zone"] = locality_name->sub_zone(); + } + // Add weighted target entry. + weighted_targets[locality_name->AsHumanReadableString()] = + Json::Object{ + {"weight", locality.lb_weight}, + {"childPolicy", + Json::Array{ + Json::Object{ + {"round_robin", Json::Object()}, + }, + }}, + }; + } + // Construct locality-picking policy. + // Start with field from our config and add the "targets" field. + child_policy = Json::Array{ + Json::Object{ + {"weighted_target_experimental", + Json::Object{ + {"targets", Json::Object()}, + }}, + }, + }; + Json::Object& config = + *(*child_policy.mutable_array())[0].mutable_object(); + auto it = config.begin(); + GPR_ASSERT(it != config.end()); + (*it->second.mutable_object())["targets"] = std::move(weighted_targets); + } else { + auto it = xds_lb_policy.find("RING_HASH"); + GPR_ASSERT(it != xds_lb_policy.end()); + Json::Object ring_hash_experimental_policy = it->second.object_value(); + child_policy = Json::Array{ + Json::Object{ + {"ring_hash_experimental", ring_hash_experimental_policy}, + }, + }; } - // Add weighted target entry. - weighted_targets[locality_name->AsHumanReadableString()] = Json::Object{ - {"weight", locality.lb_weight}, - {"childPolicy", config_->endpoint_picking_policy()}, - }; } - // Construct locality-picking policy. - // Start with field from our config and add the "targets" field. - Json locality_picking_config = config_->locality_picking_policy(); - Json::Object& config = - *(*locality_picking_config.mutable_array())[0].mutable_object(); - auto it = config.begin(); - GPR_ASSERT(it != config.end()); - (*it->second.mutable_object())["targets"] = std::move(weighted_targets); // Wrap it in the drop policy. Json::Array drop_categories; if (discovery_mechanisms_[discovery_index].drop_config != nullptr) { @@ -887,7 +941,7 @@ XdsClusterResolverLb::CreateChildPolicyConfigLocked() { .discovery_mechanism->GetLrsClusterKey(); Json::Object xds_cluster_impl_config = { {"clusterName", std::string(lrs_key.first)}, - {"childPolicy", std::move(locality_picking_config)}, + {"childPolicy", std::move(child_policy)}, {"dropCategories", std::move(drop_categories)}, {"maxConcurrentRequests", config_->discovery_mechanisms()[discovery_index] @@ -909,10 +963,24 @@ XdsClusterResolverLb::CreateChildPolicyConfigLocked() { const size_t child_number = priority_child_numbers_[priority]; std::string child_name = absl::StrCat("child", child_number); priority_priorities.emplace_back(child_name); - priority_children[child_name] = Json::Object{ + Json::Object child_config = { {"config", std::move(locality_picking_policy)}, - {"ignore_reresolution_requests", true}, }; + if (discovery_mechanisms_[discovery_index] + .discovery_mechanism->disable_reresolution()) { + child_config["ignore_reresolution_requests"] = true; + } + priority_children[child_name] = std::move(child_config); + // Each priority in the priority_list_ should correspond to a priority in a + // discovery mechanism in discovery_mechanisms_ (both in the same order). + // Keeping track of the discovery_mechanism each priority belongs to. + --num_priorities_remaining_in_discovery; + while (num_priorities_remaining_in_discovery == 0 && + discovery_index < discovery_mechanisms_.size() - 1) { + ++discovery_index; + num_priorities_remaining_in_discovery = + discovery_mechanisms_[discovery_index].num_priorities; + } } // There should be matching number of priorities in discovery_mechanisms_ and // in priority_list_; therefore at the end of looping through all the @@ -934,7 +1002,7 @@ XdsClusterResolverLb::CreateChildPolicyConfigLocked() { "[xds_cluster_resolver_lb %p] generated config for child policy: %s", this, json_str.c_str()); } - grpc_error* error = GRPC_ERROR_NONE; + grpc_error_handle error = GRPC_ERROR_NONE; RefCountedPtr<LoadBalancingPolicy::Config> config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(json, &error); if (error != GRPC_ERROR_NONE) { @@ -944,7 +1012,7 @@ XdsClusterResolverLb::CreateChildPolicyConfigLocked() { "[xds_cluster_resolver_lb %p] error parsing generated child policy " "config -- " "will put channel in TRANSIENT_FAILURE: %s", - this, grpc_error_string(error)); + this, grpc_error_std_string(error).c_str()); error = grpc_error_set_int( grpc_error_add_child(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "xds_cluster_resolver LB policy: error " @@ -978,10 +1046,14 @@ void XdsClusterResolverLb::UpdateChildPolicyLocked() { grpc_channel_args* XdsClusterResolverLb::CreateChildPolicyArgsLocked( const grpc_channel_args* args) { - // Inhibit client-side health checking, since the balancer does this for us. - grpc_arg new_arg = grpc_channel_arg_integer_create( - const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1); - return grpc_channel_args_copy_and_add(args, &new_arg, 1); + absl::InlinedVector<grpc_arg, 2> new_args = { + // Inhibit client-side health checking, since the balancer does this + // for us. + grpc_channel_arg_integer_create( + const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1), + }; + if (!is_xds_uri_) new_args.push_back(xds_client_->MakeChannelArg()); + return grpc_channel_args_copy_and_add(args, new_args.data(), new_args.size()); } OrphanablePtr<LoadBalancingPolicy> @@ -1020,24 +1092,45 @@ class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory { public: OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( LoadBalancingPolicy::Args args) const override { - grpc_error* error = GRPC_ERROR_NONE; - RefCountedPtr<XdsClient> xds_client = XdsClient::GetOrCreate(&error); - if (error != GRPC_ERROR_NONE) { - gpr_log(GPR_ERROR, - "cannot get XdsClient to instantiate xds_cluster_resolver LB " - "policy: %s", - grpc_error_string(error)); - GRPC_ERROR_UNREF(error); - return nullptr; + // Find server name. + const char* server_uri = + grpc_channel_args_find_string(args.args, GRPC_ARG_SERVER_URI); + GPR_ASSERT(server_uri != nullptr); + absl::StatusOr<URI> uri = URI::Parse(server_uri); + GPR_ASSERT(uri.ok() && !uri->path().empty()); + absl::string_view server_name = absl::StripPrefix(uri->path(), "/"); + // Determine if it's an xds URI. + bool is_xds_uri = uri->scheme() == "xds"; + // Get XdsClient. + RefCountedPtr<XdsClient> xds_client = + XdsClient::GetFromChannelArgs(*args.args); + if (xds_client == nullptr) { + if (!is_xds_uri) { + grpc_error_handle error = GRPC_ERROR_NONE; + xds_client = XdsClient::GetOrCreate(args.args, &error); + if (error != GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, + "cannot get or create XdsClient to instantiate " + "xds_cluster_resolver LB policy: %s", + grpc_error_std_string(error).c_str()); + GRPC_ERROR_UNREF(error); + return nullptr; + } + } else { + gpr_log(GPR_ERROR, + "XdsClient not present in channel args -- cannot instantiate " + "xds_cluster_resolver LB policy"); + return nullptr; + } } - return MakeOrphanable<XdsClusterResolverChildHandler>(std::move(xds_client), - std::move(args)); + return MakeOrphanable<XdsClusterResolverChildHandler>( + std::move(xds_client), std::move(args), server_name, is_xds_uri); } const char* name() const override { return kXdsClusterResolver; } RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig( - const Json& json, grpc_error** error) const override { + const Json& json, grpc_error_handle* error) const override { GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); if (json.type() == Json::Type::JSON_NULL) { // xds_cluster_resolver was mentioned as a policy in the deprecated @@ -1048,7 +1141,7 @@ class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory { "Please use loadBalancingConfig field of service config instead."); return nullptr; } - std::vector<grpc_error*> error_list; + std::vector<grpc_error_handle> error_list; std::vector<XdsClusterResolverLbConfig::DiscoveryMechanism> discovery_mechanisms; auto it = json.object_value().find("discoveryMechanisms"); @@ -1062,13 +1155,13 @@ class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory { const Json::Array& array = it->second.array_value(); for (size_t i = 0; i < array.size(); ++i) { XdsClusterResolverLbConfig::DiscoveryMechanism discovery_mechanism; - std::vector<grpc_error*> discovery_mechanism_errors = + std::vector<grpc_error_handle> discovery_mechanism_errors = ParseDiscoveryMechanism(array[i], &discovery_mechanism); if (!discovery_mechanism_errors.empty()) { - grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING( + grpc_error_handle error = GRPC_ERROR_CREATE_FROM_COPIED_STRING( absl::StrCat("field:discovery_mechanism element: ", i, " error") .c_str()); - for (grpc_error* discovery_mechanism_error : + for (grpc_error_handle discovery_mechanism_error : discovery_mechanism_errors) { error = grpc_error_add_child(error, discovery_mechanism_error); } @@ -1077,58 +1170,104 @@ class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory { discovery_mechanisms.emplace_back(std::move(discovery_mechanism)); } } - // Locality-picking policy. - Json locality_picking_policy; - it = json.object_value().find("localityPickingPolicy"); - if (it == json.object_value().end()) { - locality_picking_policy = Json::Array{ - Json::Object{ - {"weighted_target_experimental", - Json::Object{ - {"targets", Json::Object()}, - }}, - }, - }; - } else { - locality_picking_policy = it->second; - } - grpc_error* parse_error = GRPC_ERROR_NONE; - if (LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( - locality_picking_policy, &parse_error) == nullptr) { - GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); - error_list.push_back(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "localityPickingPolicy", &parse_error, 1)); - GRPC_ERROR_UNREF(parse_error); - } - // Endpoint-picking policy. Called "childPolicy" for xds policy. - Json endpoint_picking_policy; - it = json.object_value().find("endpointPickingPolicy"); - if (it == json.object_value().end()) { - endpoint_picking_policy = Json::Array{ - Json::Object{ - {"round_robin", Json::Object()}, - }, - }; - } else { - endpoint_picking_policy = it->second; - } - parse_error = GRPC_ERROR_NONE; - if (LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( - endpoint_picking_policy, &parse_error) == nullptr) { - GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); - error_list.push_back(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "endpointPickingPolicy", &parse_error, 1)); - GRPC_ERROR_UNREF(parse_error); - } if (discovery_mechanisms.empty()) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "field:discovery_mechanism error:list is missing or empty")); } + Json xds_lb_policy = Json::Object{ + {"ROUND_ROBIN", Json::Object()}, + }; + it = json.object_value().find("xdsLbPolicy"); + if (it != json.object_value().end()) { + if (it->second.type() != Json::Type::ARRAY) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:xdsLbPolicy error:type should be array")); + } else { + const Json::Array& array = it->second.array_value(); + for (size_t i = 0; i < array.size(); ++i) { + if (array[i].type() != Json::Type::OBJECT) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:xdsLbPolicy error:element should be of type object")); + continue; + } + const Json::Object& policy = array[i].object_value(); + auto policy_it = policy.find("ROUND_ROBIN"); + if (policy_it != policy.end()) { + if (policy_it->second.type() != Json::Type::OBJECT) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:ROUND_ROBIN error:type should be object")); + } + break; + } + policy_it = policy.find("RING_HASH"); + if (policy_it != policy.end()) { + if (policy_it->second.type() != Json::Type::OBJECT) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:RING_HASH error:type should be object")); + continue; + } + // TODO(donnadionne): Move this to a method in + // ring_hash_experimental and call it here. + const Json::Object& ring_hash = policy_it->second.object_value(); + xds_lb_policy = array[i]; + size_t min_ring_size = 1024; + size_t max_ring_size = 8388608; + auto ring_hash_it = ring_hash.find("min_ring_size"); + if (ring_hash_it == ring_hash.end()) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:min_ring_size missing")); + } else if (ring_hash_it->second.type() != Json::Type::NUMBER) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:min_ring_size error: should be of " + "number")); + } else { + min_ring_size = gpr_parse_nonnegative_int( + ring_hash_it->second.string_value().c_str()); + } + ring_hash_it = ring_hash.find("max_ring_size"); + if (ring_hash_it == ring_hash.end()) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:max_ring_size missing")); + } else if (ring_hash_it->second.type() != Json::Type::NUMBER) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:max_ring_size error: should be of " + "number")); + } else { + max_ring_size = gpr_parse_nonnegative_int( + ring_hash_it->second.string_value().c_str()); + } + if (min_ring_size <= 0 || min_ring_size > 8388608 || + max_ring_size <= 0 || max_ring_size > 8388608 || + min_ring_size > max_ring_size) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:max_ring_size and or min_ring_size error: " + "values need to be in the range of 1 to 8388608 " + "and max_ring_size cannot be smaller than " + "min_ring_size")); + } + ring_hash_it = ring_hash.find("hash_function"); + if (ring_hash_it == ring_hash.end()) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:hash_function missing")); + } else if (ring_hash_it->second.type() != Json::Type::STRING) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:hash_function error: should be a " + "string")); + } else if (ring_hash_it->second.string_value() != "XX_HASH" && + ring_hash_it->second.string_value() != "MURMUR_HASH_2") { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:hash_function error: unsupported " + "hash_function")); + } + break; + } + } + } + } // Construct config. if (error_list.empty()) { return MakeRefCounted<XdsClusterResolverLbConfig>( - std::move(discovery_mechanisms), std::move(locality_picking_policy), - std::move(endpoint_picking_policy)); + std::move(discovery_mechanisms), std::move(xds_lb_policy)); } else { *error = GRPC_ERROR_CREATE_FROM_VECTOR( "xds_cluster_resolver_experimental LB policy config", &error_list); @@ -1137,10 +1276,10 @@ class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory { } private: - static std::vector<grpc_error*> ParseDiscoveryMechanism( + static std::vector<grpc_error_handle> ParseDiscoveryMechanism( const Json& json, XdsClusterResolverLbConfig::DiscoveryMechanism* discovery_mechanism) { - std::vector<grpc_error*> error_list; + std::vector<grpc_error_handle> error_list; if (json.type() != Json::Type::OBJECT) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "value should be of type object")); @@ -1217,10 +1356,13 @@ class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory { class XdsClusterResolverChildHandler : public ChildPolicyHandler { public: XdsClusterResolverChildHandler(RefCountedPtr<XdsClient> xds_client, - Args args) + Args args, absl::string_view server_name, + bool is_xds_uri) : ChildPolicyHandler(std::move(args), &grpc_lb_xds_cluster_resolver_trace), - xds_client_(std::move(xds_client)) {} + xds_client_(std::move(xds_client)), + server_name_(server_name), + is_xds_uri_(is_xds_uri) {} bool ConfigChangeRequiresNewPolicyInstance( LoadBalancingPolicy::Config* old_config, @@ -1236,12 +1378,15 @@ class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory { } OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( - const char* name, LoadBalancingPolicy::Args args) const override { - return MakeOrphanable<XdsClusterResolverLb>(xds_client_, std::move(args)); + const char* /*name*/, LoadBalancingPolicy::Args args) const override { + return MakeOrphanable<XdsClusterResolverLb>(xds_client_, std::move(args), + server_name_, is_xds_uri_); } private: RefCountedPtr<XdsClient> xds_client_; + std::string server_name_; + bool is_xds_uri_; }; }; |