diff options
Diffstat (limited to 'grpc/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc')
-rw-r--r-- | grpc/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc | 511 |
1 files changed, 318 insertions, 193 deletions
diff --git a/grpc/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc b/grpc/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc index 8a10db52..49bff276 100644 --- a/grpc/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc +++ b/grpc/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc @@ -63,40 +63,55 @@ class CdsLb : public LoadBalancingPolicy { void UpdateLocked(UpdateArgs args) override; void ResetBackoffLocked() override; + void ExitIdleLocked() override; private: // Watcher for getting cluster data from XdsClient. class ClusterWatcher : public XdsClient::ClusterWatcherInterface { public: - explicit ClusterWatcher(RefCountedPtr<CdsLb> parent) - : parent_(std::move(parent)) {} + ClusterWatcher(RefCountedPtr<CdsLb> parent, std::string name) + : parent_(std::move(parent)), name_(std::move(name)) {} void OnClusterChanged(XdsApi::CdsUpdate cluster_data) override { - new Notifier(parent_, std::move(cluster_data)); + new Notifier(parent_, name_, std::move(cluster_data)); } - void OnError(grpc_error* error) override { new Notifier(parent_, error); } - void OnResourceDoesNotExist() override { new Notifier(parent_); } + void OnError(grpc_error_handle error) override { + new Notifier(parent_, name_, error); + } + void OnResourceDoesNotExist() override { new Notifier(parent_, name_); } private: class Notifier { public: - Notifier(RefCountedPtr<CdsLb> parent, XdsApi::CdsUpdate update); - Notifier(RefCountedPtr<CdsLb> parent, grpc_error* error); - explicit Notifier(RefCountedPtr<CdsLb> parent); + Notifier(RefCountedPtr<CdsLb> parent, std::string name, + XdsApi::CdsUpdate update); + Notifier(RefCountedPtr<CdsLb> parent, std::string name, + grpc_error_handle error); + explicit Notifier(RefCountedPtr<CdsLb> parent, std::string name); private: enum Type { kUpdate, kError, kDoesNotExist }; - static void RunInExecCtx(void* arg, grpc_error* error); - void RunInWorkSerializer(grpc_error* error); + static void RunInExecCtx(void* arg, grpc_error_handle error); + void RunInWorkSerializer(grpc_error_handle error); RefCountedPtr<CdsLb> parent_; + std::string name_; grpc_closure closure_; XdsApi::CdsUpdate update_; Type type_; }; RefCountedPtr<CdsLb> parent_; + std::string name_; + }; + + struct WatcherState { + // Pointer to watcher, to be used when cancelling. + // Not owned, so do not dereference. + ClusterWatcher* watcher = nullptr; + // Most recent update obtained from this watcher. + absl::optional<XdsApi::CdsUpdate> update; }; // Delegating helper to be passed to child policy. @@ -119,12 +134,20 @@ class CdsLb : public LoadBalancingPolicy { void ShutdownLocked() override; - void OnClusterChanged(XdsApi::CdsUpdate cluster_data); - void OnError(grpc_error* error); - void OnResourceDoesNotExist(); + bool GenerateDiscoveryMechanismForCluster( + const std::string& name, Json::Array* discovery_mechanisms, + std::set<std::string>* clusters_needed); + void OnClusterChanged(const std::string& name, + XdsApi::CdsUpdate cluster_data); + void OnError(const std::string& name, grpc_error_handle error); + void OnResourceDoesNotExist(const std::string& name); - grpc_error* UpdateXdsCertificateProvider( - const XdsApi::CdsUpdate& cluster_data); + grpc_error_handle UpdateXdsCertificateProvider( + const std::string& cluster_name, const XdsApi::CdsUpdate& cluster_data); + + void CancelClusterDataWatch(absl::string_view cluster_name, + XdsClient::ClusterWatcherInterface* watcher, + bool delay_unsubscription = false); void MaybeDestroyChildPolicyLocked(); @@ -135,9 +158,10 @@ class CdsLb : public LoadBalancingPolicy { // The xds client. RefCountedPtr<XdsClient> xds_client_; - // A pointer to the cluster watcher, to be used when cancelling the watch. - // Note that this is not owned, so this pointer must never be derefernced. - ClusterWatcher* cluster_watcher_ = nullptr; + + // Maps from cluster name to the state for that cluster. + // The root of the tree is config_->cluster(). + std::map<std::string, WatcherState> watchers_; RefCountedPtr<grpc_tls_certificate_provider> root_certificate_provider_; RefCountedPtr<grpc_tls_certificate_provider> identity_certificate_provider_; @@ -155,43 +179,50 @@ class CdsLb : public LoadBalancingPolicy { // CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr<CdsLb> parent, + std::string name, XdsApi::CdsUpdate update) - : parent_(std::move(parent)), update_(std::move(update)), type_(kUpdate) { + : parent_(std::move(parent)), + name_(std::move(name)), + update_(std::move(update)), + type_(kUpdate) { GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE); } CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr<CdsLb> parent, - grpc_error* error) - : parent_(std::move(parent)), type_(kError) { + std::string name, + grpc_error_handle error) + : parent_(std::move(parent)), name_(std::move(name)), type_(kError) { GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); ExecCtx::Run(DEBUG_LOCATION, &closure_, error); } -CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr<CdsLb> parent) - : parent_(std::move(parent)), type_(kDoesNotExist) { +CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr<CdsLb> parent, + std::string name) + : parent_(std::move(parent)), name_(std::move(name)), type_(kDoesNotExist) { GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE); } void CdsLb::ClusterWatcher::Notifier::RunInExecCtx(void* arg, - grpc_error* error) { + grpc_error_handle error) { Notifier* self = static_cast<Notifier*>(arg); GRPC_ERROR_REF(error); self->parent_->work_serializer()->Run( [self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION); } -void CdsLb::ClusterWatcher::Notifier::RunInWorkSerializer(grpc_error* error) { +void CdsLb::ClusterWatcher::Notifier::RunInWorkSerializer( + grpc_error_handle error) { switch (type_) { case kUpdate: - parent_->OnClusterChanged(std::move(update_)); + parent_->OnClusterChanged(name_, std::move(update_)); break; case kError: - parent_->OnError(error); + parent_->OnError(name_, error); break; case kDoesNotExist: - parent_->OnResourceDoesNotExist(); + parent_->OnResourceDoesNotExist(name_); break; }; delete this; @@ -261,13 +292,15 @@ void CdsLb::ShutdownLocked() { shutting_down_ = true; MaybeDestroyChildPolicyLocked(); if (xds_client_ != nullptr) { - if (cluster_watcher_ != nullptr) { + for (auto& watcher : watchers_) { if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this, - config_->cluster().c_str()); + watcher.first.c_str()); } - xds_client_->CancelClusterDataWatch(config_->cluster(), cluster_watcher_); + CancelClusterDataWatch(watcher.first, watcher.second.watcher, + /*delay_unsubscription=*/false); } + watchers_.clear(); xds_client_.reset(DEBUG_LOCATION, "CdsLb"); } grpc_channel_args_destroy(args_); @@ -286,6 +319,10 @@ void CdsLb::ResetBackoffLocked() { if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked(); } +void CdsLb::ExitIdleLocked() { + if (child_policy_ != nullptr) child_policy_->ExitIdleLocked(); +} + void CdsLb::UpdateLocked(UpdateArgs args) { // Update config. auto old_config = std::move(config_); @@ -301,119 +338,214 @@ void CdsLb::UpdateLocked(UpdateArgs args) { // If cluster name changed, cancel watcher and restart. if (old_config == nullptr || old_config->cluster() != config_->cluster()) { if (old_config != nullptr) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { - gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this, - old_config->cluster().c_str()); + for (auto& watcher : watchers_) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { + gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this, + watcher.first.c_str()); + } + CancelClusterDataWatch(watcher.first, watcher.second.watcher, + /*delay_unsubscription=*/true); } - xds_client_->CancelClusterDataWatch(old_config->cluster(), - cluster_watcher_, - /*delay_unsubscription=*/true); + watchers_.clear(); } - if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { - gpr_log(GPR_INFO, "[cdslb %p] starting watch for cluster %s", this, - config_->cluster().c_str()); - } - auto watcher = absl::make_unique<ClusterWatcher>(Ref()); - cluster_watcher_ = watcher.get(); + auto watcher = absl::make_unique<ClusterWatcher>(Ref(), config_->cluster()); + watchers_[config_->cluster()].watcher = watcher.get(); xds_client_->WatchClusterData(config_->cluster(), std::move(watcher)); } } -void CdsLb::OnClusterChanged(XdsApi::CdsUpdate cluster_data) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { - gpr_log(GPR_INFO, "[cdslb %p] received CDS update from xds client %p: %s", - this, xds_client_.get(), cluster_data.ToString().c_str()); +// This method will attempt to generate one or multiple entries of discovery +// mechanism recursively: +// For cluster types EDS or LOGICAL_DNS, one discovery mechanism entry may be +// generated cluster name, type and other data from the CdsUpdate inserted into +// the entry and the entry appended to the array of entries. +// Note, discovery mechanism entry can be generated if an CdsUpdate is +// available; otherwise, just return false. For cluster type AGGREGATE, +// recursively call the method for each child cluster. +bool CdsLb::GenerateDiscoveryMechanismForCluster( + const std::string& name, Json::Array* discovery_mechanisms, + std::set<std::string>* clusters_needed) { + clusters_needed->insert(name); + auto& state = watchers_[name]; + // Create a new watcher if needed. + if (state.watcher == nullptr) { + auto watcher = absl::make_unique<ClusterWatcher>(Ref(), name); + if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { + gpr_log(GPR_INFO, "[cdslb %p] starting watch for cluster %s", this, + name.c_str()); + } + state.watcher = watcher.get(); + xds_client_->WatchClusterData(name, std::move(watcher)); + return false; } - grpc_error* error = GRPC_ERROR_NONE; - error = UpdateXdsCertificateProvider(cluster_data); - if (error != GRPC_ERROR_NONE) { - return OnError(error); + // Don't have the update we need yet. + if (!state.update.has_value()) return false; + // For AGGREGATE clusters, recursively expand to child clusters. + if (state.update->cluster_type == XdsApi::CdsUpdate::ClusterType::AGGREGATE) { + bool missing_cluster = false; + for (const std::string& child_name : + state.update->prioritized_cluster_names) { + if (!GenerateDiscoveryMechanismForCluster( + child_name, discovery_mechanisms, clusters_needed)) { + missing_cluster = true; + } + } + return !missing_cluster; + } + std::string type; + switch (state.update->cluster_type) { + case XdsApi::CdsUpdate::ClusterType::EDS: + type = "EDS"; + break; + case XdsApi::CdsUpdate::ClusterType::LOGICAL_DNS: + type = "LOGICAL_DNS"; + break; + default: + GPR_ASSERT(0); + break; } - // Construct config for child policy. - Json::Object discovery_mechanism = { - {"clusterName", config_->cluster()}, - {"max_concurrent_requests", cluster_data.max_concurrent_requests}, - {"type", "EDS"}, + Json::Object mechanism = { + {"clusterName", name}, + {"max_concurrent_requests", state.update->max_concurrent_requests}, + {"type", std::move(type)}, }; - if (!cluster_data.eds_service_name.empty()) { - discovery_mechanism["edsServiceName"] = cluster_data.eds_service_name; + if (!state.update->eds_service_name.empty()) { + mechanism["edsServiceName"] = state.update->eds_service_name; } - if (cluster_data.lrs_load_reporting_server_name.has_value()) { - discovery_mechanism["lrsLoadReportingServerName"] = - cluster_data.lrs_load_reporting_server_name.value(); + if (state.update->lrs_load_reporting_server_name.has_value()) { + mechanism["lrsLoadReportingServerName"] = + state.update->lrs_load_reporting_server_name.value(); } - Json::Object child_config = { - {"discoveryMechanisms", - Json::Array{ - discovery_mechanism, - }}, - {"localityPickingPolicy", - Json::Array{ - Json::Object{ - {"weighted_target_experimental", - Json::Object{ - {"targets", Json::Object()}, - }}, - }, - }}, - {"endpointPickingPolicy", - Json::Array{ - Json::Object{ - {"round_robin", Json::Object()}, - }, - }}, - }; - Json json = Json::Array{ - Json::Object{ - {"xds_cluster_resolver_experimental", std::move(child_config)}, - }, - }; + discovery_mechanisms->emplace_back(std::move(mechanism)); + return true; +} + +void CdsLb::OnClusterChanged(const std::string& name, + XdsApi::CdsUpdate cluster_data) { if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { - std::string json_str = json.Dump(/*indent=*/1); - gpr_log(GPR_INFO, "[cdslb %p] generated config for child policy: %s", this, - json_str.c_str()); + gpr_log( + GPR_INFO, + "[cdslb %p] received CDS update for cluster %s from xds client %p: %s", + this, name.c_str(), xds_client_.get(), cluster_data.ToString().c_str()); } - RefCountedPtr<LoadBalancingPolicy::Config> config = - LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(json, &error); + // Store the update in the map if we are still interested in watching this + // cluster (i.e., it is not cancelled already). + // If we've already deleted this entry, then this is an update notification + // that was scheduled before the deletion, so we can just ignore it. + auto it = watchers_.find(name); + if (it == watchers_.end()) return; + it->second.update = cluster_data; + // Take care of integration with new certificate code. + grpc_error_handle error = GRPC_ERROR_NONE; + error = UpdateXdsCertificateProvider(name, it->second.update.value()); if (error != GRPC_ERROR_NONE) { - OnError(error); - return; + return OnError(name, error); } - // Create child policy if not already present. - if (child_policy_ == nullptr) { - LoadBalancingPolicy::Args args; - args.work_serializer = work_serializer(); - args.args = args_; - args.channel_control_helper = absl::make_unique<Helper>(Ref()); - child_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( - config->name(), std::move(args)); - if (child_policy_ == nullptr) { - OnError(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "failed to create child policy")); - return; + // Scan the map starting from the root cluster to generate the list of + // discovery mechanisms. If we don't have some of the data we need (i.e., we + // just started up and not all watchers have returned data yet), then don't + // update the child policy at all. + Json::Array discovery_mechanisms; + std::set<std::string> clusters_needed; + if (GenerateDiscoveryMechanismForCluster( + config_->cluster(), &discovery_mechanisms, &clusters_needed)) { + // Construct config for child policy. + Json::Object xds_lb_policy; + if (cluster_data.lb_policy == "RING_HASH") { + std::string hash_function; + switch (cluster_data.hash_function) { + case XdsApi::CdsUpdate::HashFunction::XX_HASH: + hash_function = "XX_HASH"; + break; + case XdsApi::CdsUpdate::HashFunction::MURMUR_HASH_2: + hash_function = "MURMUR_HASH_2"; + break; + default: + GPR_ASSERT(0); + break; + } + xds_lb_policy["RING_HASH"] = Json::Object{ + {"min_ring_size", cluster_data.min_ring_size}, + {"max_ring_size", cluster_data.max_ring_size}, + {"hash_function", hash_function}, + }; + } else { + xds_lb_policy["ROUND_ROBIN"] = Json::Object(); } - grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(), - interested_parties()); + Json::Object child_config = { + {"xdsLbPolicy", + Json::Array{ + xds_lb_policy, + }}, + {"discoveryMechanisms", std::move(discovery_mechanisms)}, + }; + Json json = Json::Array{ + Json::Object{ + {"xds_cluster_resolver_experimental", std::move(child_config)}, + }, + }; if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { - gpr_log(GPR_INFO, "[cdslb %p] created child policy %s (%p)", this, - config->name(), child_policy_.get()); + std::string json_str = json.Dump(/*indent=*/1); + gpr_log(GPR_INFO, "[cdslb %p] generated config for child policy: %s", + this, json_str.c_str()); } + RefCountedPtr<LoadBalancingPolicy::Config> config = + LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(json, &error); + if (error != GRPC_ERROR_NONE) { + OnError(name, error); + return; + } + // Create child policy if not already present. + if (child_policy_ == nullptr) { + LoadBalancingPolicy::Args args; + args.work_serializer = work_serializer(); + args.args = args_; + args.channel_control_helper = absl::make_unique<Helper>(Ref()); + child_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( + config->name(), std::move(args)); + if (child_policy_ == nullptr) { + OnError(name, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "failed to create child policy")); + return; + } + grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(), + interested_parties()); + if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { + gpr_log(GPR_INFO, "[cdslb %p] created child policy %s (%p)", this, + config->name(), child_policy_.get()); + } + } + // Update child policy. + UpdateArgs args; + args.config = std::move(config); + if (xds_certificate_provider_ != nullptr) { + grpc_arg arg_to_add = xds_certificate_provider_->MakeChannelArg(); + args.args = grpc_channel_args_copy_and_add(args_, &arg_to_add, 1); + } else { + args.args = grpc_channel_args_copy(args_); + } + child_policy_->UpdateLocked(std::move(args)); } - // Update child policy. - UpdateArgs args; - args.config = std::move(config); - if (xds_certificate_provider_ != nullptr) { - grpc_arg arg_to_add = xds_certificate_provider_->MakeChannelArg(); - args.args = grpc_channel_args_copy_and_add(args_, &arg_to_add, 1); - } else { - args.args = grpc_channel_args_copy(args_); + // Remove entries in watchers_ for any clusters not in clusters_needed + for (auto it = watchers_.begin(); it != watchers_.end();) { + const std::string& cluster_name = it->first; + if (clusters_needed.find(cluster_name) != clusters_needed.end()) { + ++it; + continue; + } + if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { + gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this, + cluster_name.c_str()); + } + CancelClusterDataWatch(cluster_name, it->second.watcher, + /*delay_unsubscription=*/false); + it = watchers_.erase(it); } - child_policy_->UpdateLocked(std::move(args)); } -void CdsLb::OnError(grpc_error* error) { +void CdsLb::OnError(const std::string& name, grpc_error_handle error) { gpr_log(GPR_ERROR, "[cdslb %p] xds error obtaining data for cluster %s: %s", - this, config_->cluster().c_str(), grpc_error_string(error)); + this, name.c_str(), grpc_error_std_string(error).c_str()); // Go into TRANSIENT_FAILURE if we have not yet created the child // policy (i.e., we have not yet received data from xds). Otherwise, // we keep running with the data we had previously. @@ -426,12 +558,12 @@ void CdsLb::OnError(grpc_error* error) { } } -void CdsLb::OnResourceDoesNotExist() { +void CdsLb::OnResourceDoesNotExist(const std::string& name) { gpr_log(GPR_ERROR, "[cdslb %p] CDS resource for %s does not exist -- reporting " "TRANSIENT_FAILURE", - this, config_->cluster().c_str()); - grpc_error* error = + this, name.c_str()); + grpc_error_handle error = grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING( absl::StrCat("CDS resource \"", config_->cluster(), "\" does not exist") @@ -443,8 +575,8 @@ void CdsLb::OnResourceDoesNotExist() { MaybeDestroyChildPolicyLocked(); } -grpc_error* CdsLb::UpdateXdsCertificateProvider( - const XdsApi::CdsUpdate& cluster_data) { +grpc_error_handle CdsLb::UpdateXdsCertificateProvider( + const std::string& cluster_name, const XdsApi::CdsUpdate& cluster_data) { // Early out if channel is not configured to use xds security. grpc_channel_credentials* channel_credentials = grpc_channel_credentials_find_in_args(args_); @@ -453,28 +585,28 @@ grpc_error* CdsLb::UpdateXdsCertificateProvider( xds_certificate_provider_ = nullptr; return GRPC_ERROR_NONE; } + if (xds_certificate_provider_ == nullptr) { + xds_certificate_provider_ = MakeRefCounted<XdsCertificateProvider>(); + } + // Configure root cert. absl::string_view root_provider_instance_name = cluster_data.common_tls_context.combined_validation_context .validation_context_certificate_provider_instance.instance_name; absl::string_view root_provider_cert_name = cluster_data.common_tls_context.combined_validation_context .validation_context_certificate_provider_instance.certificate_name; - absl::string_view identity_provider_instance_name = - cluster_data.common_tls_context - .tls_certificate_certificate_provider_instance.instance_name; - absl::string_view identity_provider_cert_name = - cluster_data.common_tls_context - .tls_certificate_certificate_provider_instance.certificate_name; RefCountedPtr<XdsCertificateProvider> new_root_provider; if (!root_provider_instance_name.empty()) { new_root_provider = xds_client_->certificate_provider_store() .CreateOrGetCertificateProvider(root_provider_instance_name); if (new_root_provider == nullptr) { - return GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("Certificate provider instance name: \"", - root_provider_instance_name, "\" not recognized.") - .c_str()); + return grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat("Certificate provider instance name: \"", + root_provider_instance_name, "\" not recognized.") + .c_str()), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); } } if (root_certificate_provider_ != new_root_provider) { @@ -491,16 +623,31 @@ grpc_error* CdsLb::UpdateXdsCertificateProvider( } root_certificate_provider_ = std::move(new_root_provider); } + xds_certificate_provider_->UpdateRootCertNameAndDistributor( + cluster_name, root_provider_cert_name, + root_certificate_provider_ == nullptr + ? nullptr + : root_certificate_provider_->distributor()); + // Configure identity cert. + absl::string_view identity_provider_instance_name = + cluster_data.common_tls_context + .tls_certificate_certificate_provider_instance.instance_name; + absl::string_view identity_provider_cert_name = + cluster_data.common_tls_context + .tls_certificate_certificate_provider_instance.certificate_name; RefCountedPtr<XdsCertificateProvider> new_identity_provider; if (!identity_provider_instance_name.empty()) { new_identity_provider = xds_client_->certificate_provider_store() .CreateOrGetCertificateProvider(identity_provider_instance_name); if (new_identity_provider == nullptr) { - return GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("Certificate provider instance name: \"", - identity_provider_instance_name, "\" not recognized.") - .c_str()); + return grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat("Certificate provider instance name: \"", + identity_provider_instance_name, + "\" not recognized.") + .c_str()), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); } } if (identity_certificate_provider_ != new_identity_provider) { @@ -517,56 +664,34 @@ grpc_error* CdsLb::UpdateXdsCertificateProvider( } identity_certificate_provider_ = std::move(new_identity_provider); } - const std::vector<XdsApi::StringMatcher>& match_subject_alt_names = + xds_certificate_provider_->UpdateIdentityCertNameAndDistributor( + cluster_name, identity_provider_cert_name, + identity_certificate_provider_ == nullptr + ? nullptr + : identity_certificate_provider_->distributor()); + // Configure SAN matchers. + const std::vector<StringMatcher>& match_subject_alt_names = cluster_data.common_tls_context.combined_validation_context .default_validation_context.match_subject_alt_names; - if (!root_provider_instance_name.empty() && - !identity_provider_instance_name.empty()) { - // Using mTLS configuration - if (xds_certificate_provider_ != nullptr && - xds_certificate_provider_->ProvidesRootCerts() && - xds_certificate_provider_->ProvidesIdentityCerts()) { - xds_certificate_provider_->UpdateRootCertNameAndDistributor( - root_provider_cert_name, root_certificate_provider_->distributor()); - xds_certificate_provider_->UpdateIdentityCertNameAndDistributor( - identity_provider_cert_name, - identity_certificate_provider_->distributor()); - xds_certificate_provider_->UpdateSubjectAlternativeNameMatchers( - match_subject_alt_names); - } else { - // Existing xDS certificate provider does not have mTLS configuration. - // Create new certificate provider so that new subchannel connectors are - // created. - xds_certificate_provider_ = MakeRefCounted<XdsCertificateProvider>( - root_provider_cert_name, root_certificate_provider_->distributor(), - identity_provider_cert_name, - identity_certificate_provider_->distributor(), - match_subject_alt_names); - } - } else if (!root_provider_instance_name.empty()) { - // Using TLS configuration - if (xds_certificate_provider_ != nullptr && - xds_certificate_provider_->ProvidesRootCerts() && - !xds_certificate_provider_->ProvidesIdentityCerts()) { - xds_certificate_provider_->UpdateRootCertNameAndDistributor( - root_provider_cert_name, root_certificate_provider_->distributor()); - xds_certificate_provider_->UpdateSubjectAlternativeNameMatchers( - match_subject_alt_names); - } else { - // Existing xDS certificate provider does not have TLS configuration. - // Create new certificate provider so that new subchannel connectors are - // created. - xds_certificate_provider_ = MakeRefCounted<XdsCertificateProvider>( - root_provider_cert_name, root_certificate_provider_->distributor(), - "", nullptr, match_subject_alt_names); - } - } else { - // No configuration provided. - xds_certificate_provider_ = nullptr; - } + xds_certificate_provider_->UpdateSubjectAlternativeNameMatchers( + cluster_name, match_subject_alt_names); return GRPC_ERROR_NONE; } +void CdsLb::CancelClusterDataWatch(absl::string_view cluster_name, + XdsClient::ClusterWatcherInterface* watcher, + bool delay_unsubscription) { + if (xds_certificate_provider_ != nullptr) { + std::string name(cluster_name); + xds_certificate_provider_->UpdateRootCertNameAndDistributor(name, "", + nullptr); + xds_certificate_provider_->UpdateIdentityCertNameAndDistributor(name, "", + nullptr); + xds_certificate_provider_->UpdateSubjectAlternativeNameMatchers(name, {}); + } + xds_client_->CancelClusterDataWatch(cluster_name, watcher, + delay_unsubscription); +} // // factory // @@ -575,13 +700,12 @@ class CdsLbFactory : public LoadBalancingPolicyFactory { public: OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( LoadBalancingPolicy::Args args) const override { - grpc_error* error = GRPC_ERROR_NONE; - RefCountedPtr<XdsClient> xds_client = XdsClient::GetOrCreate(&error); - if (error != GRPC_ERROR_NONE) { + RefCountedPtr<XdsClient> xds_client = + XdsClient::GetFromChannelArgs(*args.args); + if (xds_client == nullptr) { gpr_log(GPR_ERROR, - "cannot get XdsClient to instantiate cds LB policy: %s", - grpc_error_string(error)); - GRPC_ERROR_UNREF(error); + "XdsClient not present in channel args -- cannot instantiate " + "cds LB policy"); return nullptr; } return MakeOrphanable<CdsLb>(std::move(xds_client), std::move(args)); @@ -590,7 +714,7 @@ class CdsLbFactory : public LoadBalancingPolicyFactory { const char* name() const override { return kCds; } RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig( - const Json& json, grpc_error** error) const override { + const Json& json, grpc_error_handle* error) const override { GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); if (json.type() == Json::Type::JSON_NULL) { // xds was mentioned as a policy in the deprecated loadBalancingPolicy @@ -600,7 +724,8 @@ class CdsLbFactory : public LoadBalancingPolicyFactory { "Please use loadBalancingConfig field of service config instead."); return nullptr; } - std::vector<grpc_error*> error_list; + std::vector<grpc_error_handle> error_list; + // cluster name. std::string cluster; auto it = json.object_value().find("cluster"); if (it == json.object_value().end()) { |