summaryrefslogtreecommitdiff
path: root/grpc/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
diff options
context:
space:
mode:
Diffstat (limited to 'grpc/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc')
-rw-r--r--grpc/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc511
1 files changed, 318 insertions, 193 deletions
diff --git a/grpc/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc b/grpc/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
index 8a10db52..49bff276 100644
--- a/grpc/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
+++ b/grpc/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
@@ -63,40 +63,55 @@ class CdsLb : public LoadBalancingPolicy {
void UpdateLocked(UpdateArgs args) override;
void ResetBackoffLocked() override;
+ void ExitIdleLocked() override;
private:
// Watcher for getting cluster data from XdsClient.
class ClusterWatcher : public XdsClient::ClusterWatcherInterface {
public:
- explicit ClusterWatcher(RefCountedPtr<CdsLb> parent)
- : parent_(std::move(parent)) {}
+ ClusterWatcher(RefCountedPtr<CdsLb> parent, std::string name)
+ : parent_(std::move(parent)), name_(std::move(name)) {}
void OnClusterChanged(XdsApi::CdsUpdate cluster_data) override {
- new Notifier(parent_, std::move(cluster_data));
+ new Notifier(parent_, name_, std::move(cluster_data));
}
- void OnError(grpc_error* error) override { new Notifier(parent_, error); }
- void OnResourceDoesNotExist() override { new Notifier(parent_); }
+ void OnError(grpc_error_handle error) override {
+ new Notifier(parent_, name_, error);
+ }
+ void OnResourceDoesNotExist() override { new Notifier(parent_, name_); }
private:
class Notifier {
public:
- Notifier(RefCountedPtr<CdsLb> parent, XdsApi::CdsUpdate update);
- Notifier(RefCountedPtr<CdsLb> parent, grpc_error* error);
- explicit Notifier(RefCountedPtr<CdsLb> parent);
+ Notifier(RefCountedPtr<CdsLb> parent, std::string name,
+ XdsApi::CdsUpdate update);
+ Notifier(RefCountedPtr<CdsLb> parent, std::string name,
+ grpc_error_handle error);
+ explicit Notifier(RefCountedPtr<CdsLb> parent, std::string name);
private:
enum Type { kUpdate, kError, kDoesNotExist };
- static void RunInExecCtx(void* arg, grpc_error* error);
- void RunInWorkSerializer(grpc_error* error);
+ static void RunInExecCtx(void* arg, grpc_error_handle error);
+ void RunInWorkSerializer(grpc_error_handle error);
RefCountedPtr<CdsLb> parent_;
+ std::string name_;
grpc_closure closure_;
XdsApi::CdsUpdate update_;
Type type_;
};
RefCountedPtr<CdsLb> parent_;
+ std::string name_;
+ };
+
+ struct WatcherState {
+ // Pointer to watcher, to be used when cancelling.
+ // Not owned, so do not dereference.
+ ClusterWatcher* watcher = nullptr;
+ // Most recent update obtained from this watcher.
+ absl::optional<XdsApi::CdsUpdate> update;
};
// Delegating helper to be passed to child policy.
@@ -119,12 +134,20 @@ class CdsLb : public LoadBalancingPolicy {
void ShutdownLocked() override;
- void OnClusterChanged(XdsApi::CdsUpdate cluster_data);
- void OnError(grpc_error* error);
- void OnResourceDoesNotExist();
+ bool GenerateDiscoveryMechanismForCluster(
+ const std::string& name, Json::Array* discovery_mechanisms,
+ std::set<std::string>* clusters_needed);
+ void OnClusterChanged(const std::string& name,
+ XdsApi::CdsUpdate cluster_data);
+ void OnError(const std::string& name, grpc_error_handle error);
+ void OnResourceDoesNotExist(const std::string& name);
- grpc_error* UpdateXdsCertificateProvider(
- const XdsApi::CdsUpdate& cluster_data);
+ grpc_error_handle UpdateXdsCertificateProvider(
+ const std::string& cluster_name, const XdsApi::CdsUpdate& cluster_data);
+
+ void CancelClusterDataWatch(absl::string_view cluster_name,
+ XdsClient::ClusterWatcherInterface* watcher,
+ bool delay_unsubscription = false);
void MaybeDestroyChildPolicyLocked();
@@ -135,9 +158,10 @@ class CdsLb : public LoadBalancingPolicy {
// The xds client.
RefCountedPtr<XdsClient> xds_client_;
- // A pointer to the cluster watcher, to be used when cancelling the watch.
- // Note that this is not owned, so this pointer must never be derefernced.
- ClusterWatcher* cluster_watcher_ = nullptr;
+
+ // Maps from cluster name to the state for that cluster.
+ // The root of the tree is config_->cluster().
+ std::map<std::string, WatcherState> watchers_;
RefCountedPtr<grpc_tls_certificate_provider> root_certificate_provider_;
RefCountedPtr<grpc_tls_certificate_provider> identity_certificate_provider_;
@@ -155,43 +179,50 @@ class CdsLb : public LoadBalancingPolicy {
//
CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr<CdsLb> parent,
+ std::string name,
XdsApi::CdsUpdate update)
- : parent_(std::move(parent)), update_(std::move(update)), type_(kUpdate) {
+ : parent_(std::move(parent)),
+ name_(std::move(name)),
+ update_(std::move(update)),
+ type_(kUpdate) {
GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
}
CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr<CdsLb> parent,
- grpc_error* error)
- : parent_(std::move(parent)), type_(kError) {
+ std::string name,
+ grpc_error_handle error)
+ : parent_(std::move(parent)), name_(std::move(name)), type_(kError) {
GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
ExecCtx::Run(DEBUG_LOCATION, &closure_, error);
}
-CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr<CdsLb> parent)
- : parent_(std::move(parent)), type_(kDoesNotExist) {
+CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr<CdsLb> parent,
+ std::string name)
+ : parent_(std::move(parent)), name_(std::move(name)), type_(kDoesNotExist) {
GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
}
void CdsLb::ClusterWatcher::Notifier::RunInExecCtx(void* arg,
- grpc_error* error) {
+ grpc_error_handle error) {
Notifier* self = static_cast<Notifier*>(arg);
GRPC_ERROR_REF(error);
self->parent_->work_serializer()->Run(
[self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION);
}
-void CdsLb::ClusterWatcher::Notifier::RunInWorkSerializer(grpc_error* error) {
+void CdsLb::ClusterWatcher::Notifier::RunInWorkSerializer(
+ grpc_error_handle error) {
switch (type_) {
case kUpdate:
- parent_->OnClusterChanged(std::move(update_));
+ parent_->OnClusterChanged(name_, std::move(update_));
break;
case kError:
- parent_->OnError(error);
+ parent_->OnError(name_, error);
break;
case kDoesNotExist:
- parent_->OnResourceDoesNotExist();
+ parent_->OnResourceDoesNotExist(name_);
break;
};
delete this;
@@ -261,13 +292,15 @@ void CdsLb::ShutdownLocked() {
shutting_down_ = true;
MaybeDestroyChildPolicyLocked();
if (xds_client_ != nullptr) {
- if (cluster_watcher_ != nullptr) {
+ for (auto& watcher : watchers_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this,
- config_->cluster().c_str());
+ watcher.first.c_str());
}
- xds_client_->CancelClusterDataWatch(config_->cluster(), cluster_watcher_);
+ CancelClusterDataWatch(watcher.first, watcher.second.watcher,
+ /*delay_unsubscription=*/false);
}
+ watchers_.clear();
xds_client_.reset(DEBUG_LOCATION, "CdsLb");
}
grpc_channel_args_destroy(args_);
@@ -286,6 +319,10 @@ void CdsLb::ResetBackoffLocked() {
if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
}
+void CdsLb::ExitIdleLocked() {
+ if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
+}
+
void CdsLb::UpdateLocked(UpdateArgs args) {
// Update config.
auto old_config = std::move(config_);
@@ -301,119 +338,214 @@ void CdsLb::UpdateLocked(UpdateArgs args) {
// If cluster name changed, cancel watcher and restart.
if (old_config == nullptr || old_config->cluster() != config_->cluster()) {
if (old_config != nullptr) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
- gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this,
- old_config->cluster().c_str());
+ for (auto& watcher : watchers_) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
+ gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this,
+ watcher.first.c_str());
+ }
+ CancelClusterDataWatch(watcher.first, watcher.second.watcher,
+ /*delay_unsubscription=*/true);
}
- xds_client_->CancelClusterDataWatch(old_config->cluster(),
- cluster_watcher_,
- /*delay_unsubscription=*/true);
+ watchers_.clear();
}
- if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
- gpr_log(GPR_INFO, "[cdslb %p] starting watch for cluster %s", this,
- config_->cluster().c_str());
- }
- auto watcher = absl::make_unique<ClusterWatcher>(Ref());
- cluster_watcher_ = watcher.get();
+ auto watcher = absl::make_unique<ClusterWatcher>(Ref(), config_->cluster());
+ watchers_[config_->cluster()].watcher = watcher.get();
xds_client_->WatchClusterData(config_->cluster(), std::move(watcher));
}
}
-void CdsLb::OnClusterChanged(XdsApi::CdsUpdate cluster_data) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
- gpr_log(GPR_INFO, "[cdslb %p] received CDS update from xds client %p: %s",
- this, xds_client_.get(), cluster_data.ToString().c_str());
+// This method will attempt to generate one or multiple entries of discovery
+// mechanism recursively:
+// For cluster types EDS or LOGICAL_DNS, one discovery mechanism entry may be
+// generated cluster name, type and other data from the CdsUpdate inserted into
+// the entry and the entry appended to the array of entries.
+// Note, discovery mechanism entry can be generated if an CdsUpdate is
+// available; otherwise, just return false. For cluster type AGGREGATE,
+// recursively call the method for each child cluster.
+bool CdsLb::GenerateDiscoveryMechanismForCluster(
+ const std::string& name, Json::Array* discovery_mechanisms,
+ std::set<std::string>* clusters_needed) {
+ clusters_needed->insert(name);
+ auto& state = watchers_[name];
+ // Create a new watcher if needed.
+ if (state.watcher == nullptr) {
+ auto watcher = absl::make_unique<ClusterWatcher>(Ref(), name);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
+ gpr_log(GPR_INFO, "[cdslb %p] starting watch for cluster %s", this,
+ name.c_str());
+ }
+ state.watcher = watcher.get();
+ xds_client_->WatchClusterData(name, std::move(watcher));
+ return false;
}
- grpc_error* error = GRPC_ERROR_NONE;
- error = UpdateXdsCertificateProvider(cluster_data);
- if (error != GRPC_ERROR_NONE) {
- return OnError(error);
+ // Don't have the update we need yet.
+ if (!state.update.has_value()) return false;
+ // For AGGREGATE clusters, recursively expand to child clusters.
+ if (state.update->cluster_type == XdsApi::CdsUpdate::ClusterType::AGGREGATE) {
+ bool missing_cluster = false;
+ for (const std::string& child_name :
+ state.update->prioritized_cluster_names) {
+ if (!GenerateDiscoveryMechanismForCluster(
+ child_name, discovery_mechanisms, clusters_needed)) {
+ missing_cluster = true;
+ }
+ }
+ return !missing_cluster;
+ }
+ std::string type;
+ switch (state.update->cluster_type) {
+ case XdsApi::CdsUpdate::ClusterType::EDS:
+ type = "EDS";
+ break;
+ case XdsApi::CdsUpdate::ClusterType::LOGICAL_DNS:
+ type = "LOGICAL_DNS";
+ break;
+ default:
+ GPR_ASSERT(0);
+ break;
}
- // Construct config for child policy.
- Json::Object discovery_mechanism = {
- {"clusterName", config_->cluster()},
- {"max_concurrent_requests", cluster_data.max_concurrent_requests},
- {"type", "EDS"},
+ Json::Object mechanism = {
+ {"clusterName", name},
+ {"max_concurrent_requests", state.update->max_concurrent_requests},
+ {"type", std::move(type)},
};
- if (!cluster_data.eds_service_name.empty()) {
- discovery_mechanism["edsServiceName"] = cluster_data.eds_service_name;
+ if (!state.update->eds_service_name.empty()) {
+ mechanism["edsServiceName"] = state.update->eds_service_name;
}
- if (cluster_data.lrs_load_reporting_server_name.has_value()) {
- discovery_mechanism["lrsLoadReportingServerName"] =
- cluster_data.lrs_load_reporting_server_name.value();
+ if (state.update->lrs_load_reporting_server_name.has_value()) {
+ mechanism["lrsLoadReportingServerName"] =
+ state.update->lrs_load_reporting_server_name.value();
}
- Json::Object child_config = {
- {"discoveryMechanisms",
- Json::Array{
- discovery_mechanism,
- }},
- {"localityPickingPolicy",
- Json::Array{
- Json::Object{
- {"weighted_target_experimental",
- Json::Object{
- {"targets", Json::Object()},
- }},
- },
- }},
- {"endpointPickingPolicy",
- Json::Array{
- Json::Object{
- {"round_robin", Json::Object()},
- },
- }},
- };
- Json json = Json::Array{
- Json::Object{
- {"xds_cluster_resolver_experimental", std::move(child_config)},
- },
- };
+ discovery_mechanisms->emplace_back(std::move(mechanism));
+ return true;
+}
+
+void CdsLb::OnClusterChanged(const std::string& name,
+ XdsApi::CdsUpdate cluster_data) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
- std::string json_str = json.Dump(/*indent=*/1);
- gpr_log(GPR_INFO, "[cdslb %p] generated config for child policy: %s", this,
- json_str.c_str());
+ gpr_log(
+ GPR_INFO,
+ "[cdslb %p] received CDS update for cluster %s from xds client %p: %s",
+ this, name.c_str(), xds_client_.get(), cluster_data.ToString().c_str());
}
- RefCountedPtr<LoadBalancingPolicy::Config> config =
- LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(json, &error);
+ // Store the update in the map if we are still interested in watching this
+ // cluster (i.e., it is not cancelled already).
+ // If we've already deleted this entry, then this is an update notification
+ // that was scheduled before the deletion, so we can just ignore it.
+ auto it = watchers_.find(name);
+ if (it == watchers_.end()) return;
+ it->second.update = cluster_data;
+ // Take care of integration with new certificate code.
+ grpc_error_handle error = GRPC_ERROR_NONE;
+ error = UpdateXdsCertificateProvider(name, it->second.update.value());
if (error != GRPC_ERROR_NONE) {
- OnError(error);
- return;
+ return OnError(name, error);
}
- // Create child policy if not already present.
- if (child_policy_ == nullptr) {
- LoadBalancingPolicy::Args args;
- args.work_serializer = work_serializer();
- args.args = args_;
- args.channel_control_helper = absl::make_unique<Helper>(Ref());
- child_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
- config->name(), std::move(args));
- if (child_policy_ == nullptr) {
- OnError(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "failed to create child policy"));
- return;
+ // Scan the map starting from the root cluster to generate the list of
+ // discovery mechanisms. If we don't have some of the data we need (i.e., we
+ // just started up and not all watchers have returned data yet), then don't
+ // update the child policy at all.
+ Json::Array discovery_mechanisms;
+ std::set<std::string> clusters_needed;
+ if (GenerateDiscoveryMechanismForCluster(
+ config_->cluster(), &discovery_mechanisms, &clusters_needed)) {
+ // Construct config for child policy.
+ Json::Object xds_lb_policy;
+ if (cluster_data.lb_policy == "RING_HASH") {
+ std::string hash_function;
+ switch (cluster_data.hash_function) {
+ case XdsApi::CdsUpdate::HashFunction::XX_HASH:
+ hash_function = "XX_HASH";
+ break;
+ case XdsApi::CdsUpdate::HashFunction::MURMUR_HASH_2:
+ hash_function = "MURMUR_HASH_2";
+ break;
+ default:
+ GPR_ASSERT(0);
+ break;
+ }
+ xds_lb_policy["RING_HASH"] = Json::Object{
+ {"min_ring_size", cluster_data.min_ring_size},
+ {"max_ring_size", cluster_data.max_ring_size},
+ {"hash_function", hash_function},
+ };
+ } else {
+ xds_lb_policy["ROUND_ROBIN"] = Json::Object();
}
- grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(),
- interested_parties());
+ Json::Object child_config = {
+ {"xdsLbPolicy",
+ Json::Array{
+ xds_lb_policy,
+ }},
+ {"discoveryMechanisms", std::move(discovery_mechanisms)},
+ };
+ Json json = Json::Array{
+ Json::Object{
+ {"xds_cluster_resolver_experimental", std::move(child_config)},
+ },
+ };
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
- gpr_log(GPR_INFO, "[cdslb %p] created child policy %s (%p)", this,
- config->name(), child_policy_.get());
+ std::string json_str = json.Dump(/*indent=*/1);
+ gpr_log(GPR_INFO, "[cdslb %p] generated config for child policy: %s",
+ this, json_str.c_str());
}
+ RefCountedPtr<LoadBalancingPolicy::Config> config =
+ LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(json, &error);
+ if (error != GRPC_ERROR_NONE) {
+ OnError(name, error);
+ return;
+ }
+ // Create child policy if not already present.
+ if (child_policy_ == nullptr) {
+ LoadBalancingPolicy::Args args;
+ args.work_serializer = work_serializer();
+ args.args = args_;
+ args.channel_control_helper = absl::make_unique<Helper>(Ref());
+ child_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
+ config->name(), std::move(args));
+ if (child_policy_ == nullptr) {
+ OnError(name, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "failed to create child policy"));
+ return;
+ }
+ grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(),
+ interested_parties());
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
+ gpr_log(GPR_INFO, "[cdslb %p] created child policy %s (%p)", this,
+ config->name(), child_policy_.get());
+ }
+ }
+ // Update child policy.
+ UpdateArgs args;
+ args.config = std::move(config);
+ if (xds_certificate_provider_ != nullptr) {
+ grpc_arg arg_to_add = xds_certificate_provider_->MakeChannelArg();
+ args.args = grpc_channel_args_copy_and_add(args_, &arg_to_add, 1);
+ } else {
+ args.args = grpc_channel_args_copy(args_);
+ }
+ child_policy_->UpdateLocked(std::move(args));
}
- // Update child policy.
- UpdateArgs args;
- args.config = std::move(config);
- if (xds_certificate_provider_ != nullptr) {
- grpc_arg arg_to_add = xds_certificate_provider_->MakeChannelArg();
- args.args = grpc_channel_args_copy_and_add(args_, &arg_to_add, 1);
- } else {
- args.args = grpc_channel_args_copy(args_);
+ // Remove entries in watchers_ for any clusters not in clusters_needed
+ for (auto it = watchers_.begin(); it != watchers_.end();) {
+ const std::string& cluster_name = it->first;
+ if (clusters_needed.find(cluster_name) != clusters_needed.end()) {
+ ++it;
+ continue;
+ }
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
+ gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this,
+ cluster_name.c_str());
+ }
+ CancelClusterDataWatch(cluster_name, it->second.watcher,
+ /*delay_unsubscription=*/false);
+ it = watchers_.erase(it);
}
- child_policy_->UpdateLocked(std::move(args));
}
-void CdsLb::OnError(grpc_error* error) {
+void CdsLb::OnError(const std::string& name, grpc_error_handle error) {
gpr_log(GPR_ERROR, "[cdslb %p] xds error obtaining data for cluster %s: %s",
- this, config_->cluster().c_str(), grpc_error_string(error));
+ this, name.c_str(), grpc_error_std_string(error).c_str());
// Go into TRANSIENT_FAILURE if we have not yet created the child
// policy (i.e., we have not yet received data from xds). Otherwise,
// we keep running with the data we had previously.
@@ -426,12 +558,12 @@ void CdsLb::OnError(grpc_error* error) {
}
}
-void CdsLb::OnResourceDoesNotExist() {
+void CdsLb::OnResourceDoesNotExist(const std::string& name) {
gpr_log(GPR_ERROR,
"[cdslb %p] CDS resource for %s does not exist -- reporting "
"TRANSIENT_FAILURE",
- this, config_->cluster().c_str());
- grpc_error* error =
+ this, name.c_str());
+ grpc_error_handle error =
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat("CDS resource \"", config_->cluster(),
"\" does not exist")
@@ -443,8 +575,8 @@ void CdsLb::OnResourceDoesNotExist() {
MaybeDestroyChildPolicyLocked();
}
-grpc_error* CdsLb::UpdateXdsCertificateProvider(
- const XdsApi::CdsUpdate& cluster_data) {
+grpc_error_handle CdsLb::UpdateXdsCertificateProvider(
+ const std::string& cluster_name, const XdsApi::CdsUpdate& cluster_data) {
// Early out if channel is not configured to use xds security.
grpc_channel_credentials* channel_credentials =
grpc_channel_credentials_find_in_args(args_);
@@ -453,28 +585,28 @@ grpc_error* CdsLb::UpdateXdsCertificateProvider(
xds_certificate_provider_ = nullptr;
return GRPC_ERROR_NONE;
}
+ if (xds_certificate_provider_ == nullptr) {
+ xds_certificate_provider_ = MakeRefCounted<XdsCertificateProvider>();
+ }
+ // Configure root cert.
absl::string_view root_provider_instance_name =
cluster_data.common_tls_context.combined_validation_context
.validation_context_certificate_provider_instance.instance_name;
absl::string_view root_provider_cert_name =
cluster_data.common_tls_context.combined_validation_context
.validation_context_certificate_provider_instance.certificate_name;
- absl::string_view identity_provider_instance_name =
- cluster_data.common_tls_context
- .tls_certificate_certificate_provider_instance.instance_name;
- absl::string_view identity_provider_cert_name =
- cluster_data.common_tls_context
- .tls_certificate_certificate_provider_instance.certificate_name;
RefCountedPtr<XdsCertificateProvider> new_root_provider;
if (!root_provider_instance_name.empty()) {
new_root_provider =
xds_client_->certificate_provider_store()
.CreateOrGetCertificateProvider(root_provider_instance_name);
if (new_root_provider == nullptr) {
- return GRPC_ERROR_CREATE_FROM_COPIED_STRING(
- absl::StrCat("Certificate provider instance name: \"",
- root_provider_instance_name, "\" not recognized.")
- .c_str());
+ return grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_COPIED_STRING(
+ absl::StrCat("Certificate provider instance name: \"",
+ root_provider_instance_name, "\" not recognized.")
+ .c_str()),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
}
}
if (root_certificate_provider_ != new_root_provider) {
@@ -491,16 +623,31 @@ grpc_error* CdsLb::UpdateXdsCertificateProvider(
}
root_certificate_provider_ = std::move(new_root_provider);
}
+ xds_certificate_provider_->UpdateRootCertNameAndDistributor(
+ cluster_name, root_provider_cert_name,
+ root_certificate_provider_ == nullptr
+ ? nullptr
+ : root_certificate_provider_->distributor());
+ // Configure identity cert.
+ absl::string_view identity_provider_instance_name =
+ cluster_data.common_tls_context
+ .tls_certificate_certificate_provider_instance.instance_name;
+ absl::string_view identity_provider_cert_name =
+ cluster_data.common_tls_context
+ .tls_certificate_certificate_provider_instance.certificate_name;
RefCountedPtr<XdsCertificateProvider> new_identity_provider;
if (!identity_provider_instance_name.empty()) {
new_identity_provider =
xds_client_->certificate_provider_store()
.CreateOrGetCertificateProvider(identity_provider_instance_name);
if (new_identity_provider == nullptr) {
- return GRPC_ERROR_CREATE_FROM_COPIED_STRING(
- absl::StrCat("Certificate provider instance name: \"",
- identity_provider_instance_name, "\" not recognized.")
- .c_str());
+ return grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_COPIED_STRING(
+ absl::StrCat("Certificate provider instance name: \"",
+ identity_provider_instance_name,
+ "\" not recognized.")
+ .c_str()),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
}
}
if (identity_certificate_provider_ != new_identity_provider) {
@@ -517,56 +664,34 @@ grpc_error* CdsLb::UpdateXdsCertificateProvider(
}
identity_certificate_provider_ = std::move(new_identity_provider);
}
- const std::vector<XdsApi::StringMatcher>& match_subject_alt_names =
+ xds_certificate_provider_->UpdateIdentityCertNameAndDistributor(
+ cluster_name, identity_provider_cert_name,
+ identity_certificate_provider_ == nullptr
+ ? nullptr
+ : identity_certificate_provider_->distributor());
+ // Configure SAN matchers.
+ const std::vector<StringMatcher>& match_subject_alt_names =
cluster_data.common_tls_context.combined_validation_context
.default_validation_context.match_subject_alt_names;
- if (!root_provider_instance_name.empty() &&
- !identity_provider_instance_name.empty()) {
- // Using mTLS configuration
- if (xds_certificate_provider_ != nullptr &&
- xds_certificate_provider_->ProvidesRootCerts() &&
- xds_certificate_provider_->ProvidesIdentityCerts()) {
- xds_certificate_provider_->UpdateRootCertNameAndDistributor(
- root_provider_cert_name, root_certificate_provider_->distributor());
- xds_certificate_provider_->UpdateIdentityCertNameAndDistributor(
- identity_provider_cert_name,
- identity_certificate_provider_->distributor());
- xds_certificate_provider_->UpdateSubjectAlternativeNameMatchers(
- match_subject_alt_names);
- } else {
- // Existing xDS certificate provider does not have mTLS configuration.
- // Create new certificate provider so that new subchannel connectors are
- // created.
- xds_certificate_provider_ = MakeRefCounted<XdsCertificateProvider>(
- root_provider_cert_name, root_certificate_provider_->distributor(),
- identity_provider_cert_name,
- identity_certificate_provider_->distributor(),
- match_subject_alt_names);
- }
- } else if (!root_provider_instance_name.empty()) {
- // Using TLS configuration
- if (xds_certificate_provider_ != nullptr &&
- xds_certificate_provider_->ProvidesRootCerts() &&
- !xds_certificate_provider_->ProvidesIdentityCerts()) {
- xds_certificate_provider_->UpdateRootCertNameAndDistributor(
- root_provider_cert_name, root_certificate_provider_->distributor());
- xds_certificate_provider_->UpdateSubjectAlternativeNameMatchers(
- match_subject_alt_names);
- } else {
- // Existing xDS certificate provider does not have TLS configuration.
- // Create new certificate provider so that new subchannel connectors are
- // created.
- xds_certificate_provider_ = MakeRefCounted<XdsCertificateProvider>(
- root_provider_cert_name, root_certificate_provider_->distributor(),
- "", nullptr, match_subject_alt_names);
- }
- } else {
- // No configuration provided.
- xds_certificate_provider_ = nullptr;
- }
+ xds_certificate_provider_->UpdateSubjectAlternativeNameMatchers(
+ cluster_name, match_subject_alt_names);
return GRPC_ERROR_NONE;
}
+void CdsLb::CancelClusterDataWatch(absl::string_view cluster_name,
+ XdsClient::ClusterWatcherInterface* watcher,
+ bool delay_unsubscription) {
+ if (xds_certificate_provider_ != nullptr) {
+ std::string name(cluster_name);
+ xds_certificate_provider_->UpdateRootCertNameAndDistributor(name, "",
+ nullptr);
+ xds_certificate_provider_->UpdateIdentityCertNameAndDistributor(name, "",
+ nullptr);
+ xds_certificate_provider_->UpdateSubjectAlternativeNameMatchers(name, {});
+ }
+ xds_client_->CancelClusterDataWatch(cluster_name, watcher,
+ delay_unsubscription);
+}
//
// factory
//
@@ -575,13 +700,12 @@ class CdsLbFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
- grpc_error* error = GRPC_ERROR_NONE;
- RefCountedPtr<XdsClient> xds_client = XdsClient::GetOrCreate(&error);
- if (error != GRPC_ERROR_NONE) {
+ RefCountedPtr<XdsClient> xds_client =
+ XdsClient::GetFromChannelArgs(*args.args);
+ if (xds_client == nullptr) {
gpr_log(GPR_ERROR,
- "cannot get XdsClient to instantiate cds LB policy: %s",
- grpc_error_string(error));
- GRPC_ERROR_UNREF(error);
+ "XdsClient not present in channel args -- cannot instantiate "
+ "cds LB policy");
return nullptr;
}
return MakeOrphanable<CdsLb>(std::move(xds_client), std::move(args));
@@ -590,7 +714,7 @@ class CdsLbFactory : public LoadBalancingPolicyFactory {
const char* name() const override { return kCds; }
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
- const Json& json, grpc_error** error) const override {
+ const Json& json, grpc_error_handle* error) const override {
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
if (json.type() == Json::Type::JSON_NULL) {
// xds was mentioned as a policy in the deprecated loadBalancingPolicy
@@ -600,7 +724,8 @@ class CdsLbFactory : public LoadBalancingPolicyFactory {
"Please use loadBalancingConfig field of service config instead.");
return nullptr;
}
- std::vector<grpc_error*> error_list;
+ std::vector<grpc_error_handle> error_list;
+ // cluster name.
std::string cluster;
auto it = json.object_value().find("cluster");
if (it == json.object_value().end()) {