diff options
author | Joel Galenson <jgalenson@google.com> | 2021-04-02 14:59:08 -0700 |
---|---|---|
committer | Joel Galenson <jgalenson@google.com> | 2021-04-02 15:06:51 -0700 |
commit | 23c9e5ee44d2855e2bffb5ffe6dd0f3021be3ce2 (patch) | |
tree | 9ff33245fc40085284a777f947a77028b4a715f5 /grpc/src/core/ext/xds | |
parent | ac4642dab790b53d059713fb3d7ee44dbd605191 (diff) | |
download | grpcio-sys-23c9e5ee44d2855e2bffb5ffe6dd0f3021be3ce2.tar.gz |
Upgrade rust/crates/grpcio-sys to 0.8.1
Test: make
Change-Id: I333e35e7d00abaa92ea6beb00e9cf5b85840998f
Diffstat (limited to 'grpc/src/core/ext/xds')
18 files changed, 1943 insertions, 1123 deletions
diff --git a/grpc/src/core/ext/xds/certificate_provider_factory.h b/grpc/src/core/ext/xds/certificate_provider_factory.h index 244fdd9f..84c219e6 100644 --- a/grpc/src/core/ext/xds/certificate_provider_factory.h +++ b/grpc/src/core/ext/xds/certificate_provider_factory.h @@ -23,7 +23,7 @@ #include "src/core/lib/iomgr/error.h" #include "src/core/lib/json/json.h" -#include "src/core/lib/security/certificate_provider.h" +#include "src/core/lib/security/credentials/tls/grpc_tls_certificate_provider.h" namespace grpc_core { @@ -32,13 +32,15 @@ namespace grpc_core { class CertificateProviderFactory { public: // Interface for configs for CertificateProviders. - class Config { + class Config : public RefCounted<Config> { public: - virtual ~Config() = default; + ~Config() override = default; // Name of the type of the CertificateProvider. Unique to each type of // config. virtual const char* name() const = 0; + + virtual std::string ToString() const = 0; }; virtual ~CertificateProviderFactory() = default; @@ -46,12 +48,12 @@ class CertificateProviderFactory { // Name of the plugin. virtual const char* name() const = 0; - virtual std::unique_ptr<Config> CreateCertificateProviderConfig( + virtual RefCountedPtr<Config> CreateCertificateProviderConfig( const Json& config_json, grpc_error** error) = 0; // Create a CertificateProvider instance from config. virtual RefCountedPtr<grpc_tls_certificate_provider> - CreateCertificateProvider(std::unique_ptr<Config> config) = 0; + CreateCertificateProvider(RefCountedPtr<Config> config) = 0; }; } // namespace grpc_core diff --git a/grpc/src/core/ext/xds/certificate_provider_store.cc b/grpc/src/core/ext/xds/certificate_provider_store.cc new file mode 100644 index 00000000..dd66b97a --- /dev/null +++ b/grpc/src/core/ext/xds/certificate_provider_store.cc @@ -0,0 +1,87 @@ +// +// +// Copyright 2020 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#include <grpc/support/port_platform.h> + +#include "src/core/ext/xds/certificate_provider_store.h" + +#include "src/core/ext/xds/certificate_provider_registry.h" + +namespace grpc_core { + +// If a certificate provider is created, the CertificateProviderStore +// maintains a raw pointer to the created CertificateProviderWrapper so that +// future calls to `CreateOrGetCertificateProvider()` with the same key result +// in returning a ref to this created certificate provider. This entry is +// deleted when the refcount to this provider reaches zero. +RefCountedPtr<grpc_tls_certificate_provider> +CertificateProviderStore::CreateOrGetCertificateProvider( + absl::string_view key) { + RefCountedPtr<CertificateProviderWrapper> result; + MutexLock lock(&mu_); + auto it = certificate_providers_map_.find(key); + if (it == certificate_providers_map_.end()) { + result = CreateCertificateProviderLocked(key); + if (result != nullptr) { + certificate_providers_map_.insert({result->key(), result.get()}); + } + } else { + result = it->second->RefIfNonZero(); + if (result == nullptr) { + result = CreateCertificateProviderLocked(key); + it->second = result.get(); + } + } + return result; +} + +RefCountedPtr<CertificateProviderStore::CertificateProviderWrapper> +CertificateProviderStore::CreateCertificateProviderLocked( + absl::string_view key) { + auto plugin_config_it = plugin_config_map_.find(std::string(key)); + if (plugin_config_it == plugin_config_map_.end()) { + return nullptr; + } + CertificateProviderFactory* factory = + CertificateProviderRegistry::LookupCertificateProviderFactory( + plugin_config_it->second.plugin_name); + if (factory == nullptr) { + // This should never happen since an entry is only inserted in the + // plugin_config_map_ if the corresponding factory was found when parsing + // the xDS bootstrap file. + gpr_log(GPR_ERROR, "Certificate provider factory %s not found", + plugin_config_it->second.plugin_name.c_str()); + return nullptr; + } + return MakeRefCounted<CertificateProviderWrapper>( + factory->CreateCertificateProvider(plugin_config_it->second.config), + Ref(), plugin_config_it->first); +} + +void CertificateProviderStore::ReleaseCertificateProvider( + absl::string_view key, CertificateProviderWrapper* wrapper) { + MutexLock lock(&mu_); + auto it = certificate_providers_map_.find(key); + if (it != certificate_providers_map_.end()) { + if (it->second == wrapper) { + certificate_providers_map_.erase(it); + } + } +} + +} // namespace grpc_core diff --git a/grpc/src/core/ext/xds/certificate_provider_store.h b/grpc/src/core/ext/xds/certificate_provider_store.h index c6881a9b..0954bc5e 100644 --- a/grpc/src/core/ext/xds/certificate_provider_store.h +++ b/grpc/src/core/ext/xds/certificate_provider_store.h @@ -23,26 +23,88 @@ #include <map> +#include "absl/strings/string_view.h" + +#include "src/core/ext/xds/certificate_provider_factory.h" +#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/sync.h" -#include "src/core/lib/security/certificate_provider.h" +#include "src/core/lib/security/credentials/tls/grpc_tls_certificate_provider.h" namespace grpc_core { // Map for xDS based grpc_tls_certificate_provider instances. -class CertificateProviderStore { +class CertificateProviderStore + : public InternallyRefCounted<CertificateProviderStore> { public: - // If a provider corresponding to the config is found, a raw pointer to the - // grpc_tls_certificate_provider in the map is returned. If no provider is - // found for a key, a new provider is created. The CertificateProviderStore - // maintains a ref to the grpc_tls_certificate_provider for its entire - // lifetime. + struct PluginDefinition { + std::string plugin_name; + RefCountedPtr<CertificateProviderFactory::Config> config; + }; + + // Maps plugin instance (opaque) name to plugin defition. + typedef std::map<std::string, PluginDefinition> PluginDefinitionMap; + + explicit CertificateProviderStore(PluginDefinitionMap plugin_config_map) + : plugin_config_map_(std::move(plugin_config_map)) {} + + // If a certificate provider corresponding to the instance name \a key is + // found, a ref to the grpc_tls_certificate_provider is returned. If no + // provider is found for the key, a new provider is created from the plugin + // definition map. + // Returns nullptr on failure to get or create a new certificate provider. RefCountedPtr<grpc_tls_certificate_provider> CreateOrGetCertificateProvider( absl::string_view key); + void Orphan() override { Unref(); } + private: + // A thin wrapper around `grpc_tls_certificate_provider` which allows removing + // the entry from the CertificateProviderStore when the refcount reaches zero. + class CertificateProviderWrapper : public grpc_tls_certificate_provider { + public: + CertificateProviderWrapper( + RefCountedPtr<grpc_tls_certificate_provider> certificate_provider, + RefCountedPtr<CertificateProviderStore> store, absl::string_view key) + : certificate_provider_(std::move(certificate_provider)), + store_(std::move(store)), + key_(key) {} + + ~CertificateProviderWrapper() override { + store_->ReleaseCertificateProvider(key_, this); + } + + grpc_core::RefCountedPtr<grpc_tls_certificate_distributor> distributor() + const override { + return certificate_provider_->distributor(); + } + + grpc_pollset_set* interested_parties() const override { + return certificate_provider_->interested_parties(); + } + + absl::string_view key() const { return key_; } + + private: + RefCountedPtr<grpc_tls_certificate_provider> certificate_provider_; + RefCountedPtr<CertificateProviderStore> store_; + absl::string_view key_; + }; + + RefCountedPtr<CertificateProviderWrapper> CreateCertificateProviderLocked( + absl::string_view key); + + // Releases a previously created certificate provider from the certificate + // provider map if the value matches \a wrapper. + void ReleaseCertificateProvider(absl::string_view key, + CertificateProviderWrapper* wrapper); + + Mutex mu_; + // Map of plugin configurations + PluginDefinitionMap plugin_config_map_; // Underlying map for the providers. - std::map<std::string, RefCountedPtr<grpc_tls_certificate_provider>> map_; + std::map<absl::string_view, CertificateProviderWrapper*> + certificate_providers_map_; }; } // namespace grpc_core diff --git a/grpc/src/core/ext/xds/file_watcher_certificate_provider_factory.cc b/grpc/src/core/ext/xds/file_watcher_certificate_provider_factory.cc new file mode 100644 index 00000000..a5250eba --- /dev/null +++ b/grpc/src/core/ext/xds/file_watcher_certificate_provider_factory.cc @@ -0,0 +1,144 @@ +// +// +// Copyright 2020 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#include <grpc/support/port_platform.h> + +#include "src/core/ext/xds/file_watcher_certificate_provider_factory.h" + +#include "absl/strings/str_format.h" +#include "absl/strings/str_join.h" + +#include "src/core/ext/xds/certificate_provider_registry.h" +#include "src/core/lib/json/json_util.h" + +namespace grpc_core { + +namespace { + +const char* kFileWatcherPlugin = "file_watcher"; + +} // namespace + +// +// FileWatcherCertificateProviderFactory::Config +// + +const char* FileWatcherCertificateProviderFactory::Config::name() const { + return kFileWatcherPlugin; +} + +std::string FileWatcherCertificateProviderFactory::Config::ToString() const { + std::vector<std::string> parts; + parts.push_back("{"); + if (!identity_cert_file_.empty()) { + parts.push_back( + absl::StrFormat("certificate_file=\"%s\", ", identity_cert_file_)); + } + if (!identity_cert_file_.empty()) { + parts.push_back( + absl::StrFormat("private_key_file=\"%s\", ", private_key_file_)); + } + if (!identity_cert_file_.empty()) { + parts.push_back( + absl::StrFormat("ca_certificate_file=\"%s\", ", root_cert_file_)); + } + parts.push_back( + absl::StrFormat("refresh_interval=%ldms}", refresh_interval_ms_)); + return absl::StrJoin(parts, ""); +} + +RefCountedPtr<FileWatcherCertificateProviderFactory::Config> +FileWatcherCertificateProviderFactory::Config::Parse(const Json& config_json, + grpc_error** error) { + auto config = MakeRefCounted<FileWatcherCertificateProviderFactory::Config>(); + if (config_json.type() != Json::Type::OBJECT) { + *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "error:config type should be OBJECT."); + return nullptr; + } + std::vector<grpc_error*> error_list; + ParseJsonObjectField(config_json.object_value(), "certificate_file", + &config->identity_cert_file_, &error_list, false); + ParseJsonObjectField(config_json.object_value(), "private_key_file", + &config->private_key_file_, &error_list, false); + if (config->identity_cert_file_.empty() != + config->private_key_file_.empty()) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "fields \"certificate_file\" and \"private_key_file\" must be both set " + "or both unset.")); + } + ParseJsonObjectField(config_json.object_value(), "ca_certificate_file", + &config->root_cert_file_, &error_list, false); + if (config->identity_cert_file_.empty() && config->root_cert_file_.empty()) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "At least one of \"certificate_file\" and \"ca_certificate_file\" must " + "be specified.")); + } + if (!ParseJsonObjectFieldAsDuration( + config_json.object_value(), "refresh_interval", + &config->refresh_interval_ms_, &error_list, false)) { + config->refresh_interval_ms_ = 10 * 60 * 1000; // 10 minutes default + } + if (!error_list.empty()) { + *error = GRPC_ERROR_CREATE_FROM_VECTOR( + "Error parsing file watcher certificate provider config", &error_list); + return nullptr; + } + return config; +} + +// +// FileWatcherCertificateProviderFactory +// + +const char* FileWatcherCertificateProviderFactory::name() const { + return kFileWatcherPlugin; +} + +RefCountedPtr<CertificateProviderFactory::Config> +FileWatcherCertificateProviderFactory::CreateCertificateProviderConfig( + const Json& config_json, grpc_error** error) { + return FileWatcherCertificateProviderFactory::Config::Parse(config_json, + error); +} + +RefCountedPtr<grpc_tls_certificate_provider> +FileWatcherCertificateProviderFactory::CreateCertificateProvider( + RefCountedPtr<CertificateProviderFactory::Config> config) { + if (config->name() != name()) { + gpr_log(GPR_ERROR, "Wrong config type Actual:%s vs Expected:%s", + config->name(), name()); + return nullptr; + } + auto* file_watcher_config = + static_cast<FileWatcherCertificateProviderFactory::Config*>(config.get()); + return MakeRefCounted<FileWatcherCertificateProvider>( + file_watcher_config->private_key_file(), + file_watcher_config->identity_cert_file(), + file_watcher_config->root_cert_file(), + file_watcher_config->refresh_interval_ms() / GPR_MS_PER_SEC); +} + +void FileWatcherCertificateProviderInit() { + CertificateProviderRegistry::RegisterCertificateProviderFactory( + absl::make_unique<FileWatcherCertificateProviderFactory>()); +} + +void FileWatcherCertificateProviderShutdown() {} + +} // namespace grpc_core diff --git a/grpc/src/core/ext/xds/file_watcher_certificate_provider_factory.h b/grpc/src/core/ext/xds/file_watcher_certificate_provider_factory.h new file mode 100644 index 00000000..c5700625 --- /dev/null +++ b/grpc/src/core/ext/xds/file_watcher_certificate_provider_factory.h @@ -0,0 +1,69 @@ +// +// +// Copyright 2020 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#ifndef GRPC_CORE_EXT_XDS_FILE_WATCHER_CERTIFICATE_PROVIDER_FACTORY_H +#define GRPC_CORE_EXT_XDS_FILE_WATCHER_CERTIFICATE_PROVIDER_FACTORY_H + +#include <grpc/support/port_platform.h> + +#include "src/core/ext/xds/certificate_provider_factory.h" + +namespace grpc_core { + +class FileWatcherCertificateProviderFactory + : public CertificateProviderFactory { + public: + class Config : public CertificateProviderFactory::Config { + public: + static RefCountedPtr<Config> Parse(const Json& config_json, + grpc_error** error); + + const char* name() const override; + + std::string ToString() const override; + + const std::string& identity_cert_file() const { + return identity_cert_file_; + } + + const std::string& private_key_file() const { return private_key_file_; } + + const std::string& root_cert_file() const { return root_cert_file_; } + + grpc_millis refresh_interval_ms() const { return refresh_interval_ms_; } + + private: + std::string identity_cert_file_; + std::string private_key_file_; + std::string root_cert_file_; + grpc_millis refresh_interval_ms_; + }; + + const char* name() const override; + + RefCountedPtr<CertificateProviderFactory::Config> + CreateCertificateProviderConfig(const Json& config_json, + grpc_error** error) override; + + RefCountedPtr<grpc_tls_certificate_provider> CreateCertificateProvider( + RefCountedPtr<CertificateProviderFactory::Config> config) override; +}; + +} // namespace grpc_core + +#endif // GRPC_CORE_EXT_XDS_FILE_WATCHER_CERTIFICATE_PROVIDER_FACTORY_H diff --git a/grpc/src/core/ext/xds/google_mesh_ca_certificate_provider_factory.cc b/grpc/src/core/ext/xds/google_mesh_ca_certificate_provider_factory.cc index 8e1f7b5a..c1b7b84a 100644 --- a/grpc/src/core/ext/xds/google_mesh_ca_certificate_provider_factory.cc +++ b/grpc/src/core/ext/xds/google_mesh_ca_certificate_provider_factory.cc @@ -37,123 +37,6 @@ namespace { const char* kMeshCaPlugin = "meshCA"; -// -// Helper functions for extracting types from JSON -// -template <typename NumericType, typename ErrorVectorType> -bool ExtractJsonType(const Json& json, const std::string& field_name, - NumericType* output, ErrorVectorType* error_list) { - static_assert(std::is_integral<NumericType>::value, "Integral required"); - if (json.type() != Json::Type::NUMBER) { - error_list->push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("field:", field_name, " error:type should be NUMBER") - .c_str())); - return false; - } - std::istringstream ss(json.string_value()); - ss >> *output; - // The JSON parsing API should have dealt with parsing errors, but check - // anyway - if (GPR_UNLIKELY(ss.bad())) { - error_list->push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("field:", field_name, " error:failed to parse.").c_str())); - return false; - } - return true; -} - -template <typename ErrorVectorType> -bool ExtractJsonType(const Json& json, const std::string& field_name, - bool* output, ErrorVectorType* error_list) { - switch (json.type()) { - case Json::Type::JSON_TRUE: - *output = true; - return true; - case Json::Type::JSON_FALSE: - *output = false; - return true; - default: - error_list->push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("field:", field_name, " error:type should be BOOLEAN") - .c_str())); - return false; - } -} - -template <typename ErrorVectorType> -bool ExtractJsonType(const Json& json, const std::string& field_name, - std::string* output, ErrorVectorType* error_list) { - if (json.type() != Json::Type::STRING) { - *output = ""; - error_list->push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("field:", field_name, " error:type should be STRING") - .c_str())); - return false; - } - *output = json.string_value(); - return true; -} - -template <typename ErrorVectorType> -bool ExtractJsonType(const Json& json, const std::string& field_name, - const Json::Array** output, ErrorVectorType* error_list) { - if (json.type() != Json::Type::ARRAY) { - *output = nullptr; - error_list->push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("field:", field_name, " error:type should be ARRAY") - .c_str())); - return false; - } - *output = &json.array_value(); - return true; -} - -template <typename ErrorVectorType> -bool ExtractJsonType(const Json& json, const std::string& field_name, - const Json::Object** output, ErrorVectorType* error_list) { - if (json.type() != Json::Type::OBJECT) { - *output = nullptr; - error_list->push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("field:", field_name, " error:type should be OBJECT") - .c_str())); - return false; - } - *output = &json.object_value(); - return true; -} - -template <typename ErrorVectorType> -bool ExtractJsonType(const Json& json, const std::string& field_name, - grpc_millis* output, ErrorVectorType* error_list) { - if (!ParseDurationFromJson(json, output)) { - *output = GRPC_MILLIS_INF_PAST; - error_list->push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("field:", field_name, - " error:type should be STRING of the form given by " - "google.proto.Duration.") - .c_str())); - return false; - } - return true; -} - -template <typename T, typename ErrorVectorType> -bool ParseJsonObjectField(const Json::Object& object, - const std::string& field_name, T* output, - ErrorVectorType* error_list, bool optional = false) { - auto it = object.find(field_name); - if (it == object.end()) { - if (!optional) { - error_list->push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("field:", field_name, " error:does not exist.") - .c_str())); - } - return false; - } - auto& child_object_json = it->second; - return ExtractJsonType(child_object_json, field_name, output, error_list); -} - } // namespace // @@ -164,28 +47,33 @@ const char* GoogleMeshCaCertificateProviderFactory::Config::name() const { return kMeshCaPlugin; } +std::string GoogleMeshCaCertificateProviderFactory::Config::ToString() const { + // TODO(yashykt): To be filled + return "{}"; +} + std::vector<grpc_error*> GoogleMeshCaCertificateProviderFactory::Config::ParseJsonObjectStsService( const Json::Object& sts_service) { std::vector<grpc_error*> error_list_sts_service; if (!ParseJsonObjectField(sts_service, "token_exchange_service_uri", &sts_config_.token_exchange_service_uri, - &error_list_sts_service, true)) { + &error_list_sts_service, false)) { sts_config_.token_exchange_service_uri = "securetoken.googleapis.com"; // default } ParseJsonObjectField(sts_service, "resource", &sts_config_.resource, - &error_list_sts_service, true); + &error_list_sts_service, false); ParseJsonObjectField(sts_service, "audience", &sts_config_.audience, - &error_list_sts_service, true); + &error_list_sts_service, false); if (!ParseJsonObjectField(sts_service, "scope", &sts_config_.scope, - &error_list_sts_service, true)) { + &error_list_sts_service, false)) { sts_config_.scope = "https://www.googleapis.com/auth/cloud-platform"; // default } ParseJsonObjectField(sts_service, "requested_token_type", &sts_config_.requested_token_type, - &error_list_sts_service, true); + &error_list_sts_service, false); ParseJsonObjectField(sts_service, "subject_token_path", &sts_config_.subject_token_path, &error_list_sts_service); @@ -194,10 +82,10 @@ GoogleMeshCaCertificateProviderFactory::Config::ParseJsonObjectStsService( &error_list_sts_service); ParseJsonObjectField(sts_service, "actor_token_path", &sts_config_.actor_token_path, &error_list_sts_service, - true); + false); ParseJsonObjectField(sts_service, "actor_token_type", &sts_config_.actor_token_type, &error_list_sts_service, - true); + false); return error_list_sts_service; } @@ -223,7 +111,7 @@ GoogleMeshCaCertificateProviderFactory::Config::ParseJsonObjectGoogleGrpc( const Json::Object& google_grpc) { std::vector<grpc_error*> error_list_google_grpc; if (!ParseJsonObjectField(google_grpc, "target_uri", &endpoint_, - &error_list_google_grpc, true)) { + &error_list_google_grpc, false)) { endpoint_ = "meshca.googleapis.com"; // Default target } const Json::Array* call_credentials_array = nullptr; @@ -263,8 +151,8 @@ GoogleMeshCaCertificateProviderFactory::Config::ParseJsonObjectGrpcServices( "field:google_grpc", &error_list_google_grpc)); } } - if (!ParseJsonObjectField(grpc_service, "timeout", &timeout_, - &error_list_grpc_services, true)) { + if (!ParseJsonObjectFieldAsDuration(grpc_service, "timeout", &timeout_, + &error_list_grpc_services, false)) { timeout_ = 10 * 1000; // 10sec default } return error_list_grpc_services; @@ -276,7 +164,7 @@ GoogleMeshCaCertificateProviderFactory::Config::ParseJsonObjectServer( std::vector<grpc_error*> error_list_server; std::string api_type; if (ParseJsonObjectField(server, "api_type", &api_type, &error_list_server, - true)) { + false)) { if (api_type != "GRPC") { error_list_server.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "field:api_type error:Only GRPC is supported")); @@ -304,11 +192,11 @@ GoogleMeshCaCertificateProviderFactory::Config::ParseJsonObjectServer( return error_list_server; } -std::unique_ptr<GoogleMeshCaCertificateProviderFactory::Config> +RefCountedPtr<GoogleMeshCaCertificateProviderFactory::Config> GoogleMeshCaCertificateProviderFactory::Config::Parse(const Json& config_json, grpc_error** error) { auto config = - absl::make_unique<GoogleMeshCaCertificateProviderFactory::Config>(); + MakeRefCounted<GoogleMeshCaCertificateProviderFactory::Config>(); if (config_json.type() != Json::Type::OBJECT) { *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "error:config type should be OBJECT."); @@ -325,30 +213,30 @@ GoogleMeshCaCertificateProviderFactory::Config::Parse(const Json& config_json, GRPC_ERROR_CREATE_FROM_VECTOR("field:server", &error_list_server)); } } - if (!ParseJsonObjectField(config_json.object_value(), "certificate_lifetime", - &config->certificate_lifetime_, &error_list, - true)) { + if (!ParseJsonObjectFieldAsDuration( + config_json.object_value(), "certificate_lifetime", + &config->certificate_lifetime_, &error_list, false)) { config->certificate_lifetime_ = 24 * 60 * 60 * 1000; // 24hrs default } - if (!ParseJsonObjectField(config_json.object_value(), "renewal_grace_period", - &config->renewal_grace_period_, &error_list, - true)) { + if (!ParseJsonObjectFieldAsDuration( + config_json.object_value(), "renewal_grace_period", + &config->renewal_grace_period_, &error_list, false)) { config->renewal_grace_period_ = 12 * 60 * 60 * 1000; // 12hrs default } std::string key_type; if (ParseJsonObjectField(config_json.object_value(), "key_type", &key_type, - &error_list, true)) { + &error_list, false)) { if (key_type != "RSA") { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "field:key_type error:Only RSA is supported.")); } } if (!ParseJsonObjectField(config_json.object_value(), "key_size", - &config->key_size_, &error_list, true)) { + &config->key_size_, &error_list, false)) { config->key_size_ = 2048; // default 2048 bit key size } if (!ParseJsonObjectField(config_json.object_value(), "location", - &config->location_, &error_list, true)) { + &config->location_, &error_list, false)) { // GCE/GKE Metadata server needs to be contacted to get the value. } if (!error_list.empty()) { @@ -367,7 +255,7 @@ const char* GoogleMeshCaCertificateProviderFactory::name() const { return kMeshCaPlugin; } -std::unique_ptr<CertificateProviderFactory::Config> +RefCountedPtr<CertificateProviderFactory::Config> GoogleMeshCaCertificateProviderFactory::CreateCertificateProviderConfig( const Json& config_json, grpc_error** error) { return GoogleMeshCaCertificateProviderFactory::Config::Parse(config_json, diff --git a/grpc/src/core/ext/xds/google_mesh_ca_certificate_provider_factory.h b/grpc/src/core/ext/xds/google_mesh_ca_certificate_provider_factory.h index a48b1c65..f2765d6d 100644 --- a/grpc/src/core/ext/xds/google_mesh_ca_certificate_provider_factory.h +++ b/grpc/src/core/ext/xds/google_mesh_ca_certificate_provider_factory.h @@ -46,6 +46,8 @@ class GoogleMeshCaCertificateProviderFactory const char* name() const override; + std::string ToString() const override; + const std::string& endpoint() const { return endpoint_; } const StsConfig& sts_config() const { return sts_config_; } @@ -60,8 +62,8 @@ class GoogleMeshCaCertificateProviderFactory const std::string& location() const { return location_; } - static std::unique_ptr<Config> Parse(const Json& config_json, - grpc_error** error); + static RefCountedPtr<Config> Parse(const Json& config_json, + grpc_error** error); private: // Helpers for parsing the config @@ -86,12 +88,12 @@ class GoogleMeshCaCertificateProviderFactory const char* name() const override; - std::unique_ptr<CertificateProviderFactory::Config> + RefCountedPtr<CertificateProviderFactory::Config> CreateCertificateProviderConfig(const Json& config_json, grpc_error** error) override; RefCountedPtr<grpc_tls_certificate_provider> CreateCertificateProvider( - std::unique_ptr<CertificateProviderFactory::Config> config) override { + RefCountedPtr<CertificateProviderFactory::Config> config) override { // TODO(yashykt) : To be implemented return nullptr; } diff --git a/grpc/src/core/ext/xds/xds_api.cc b/grpc/src/core/ext/xds/xds_api.cc index bbd8d248..e9403c2c 100644 --- a/grpc/src/core/ext/xds/xds_api.cc +++ b/grpc/src/core/ext/xds/xds_api.cc @@ -41,29 +41,39 @@ #include "src/core/lib/gpr/useful.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/slice/slice_utils.h" #include "envoy/config/cluster/v3/circuit_breaker.upb.h" #include "envoy/config/cluster/v3/cluster.upb.h" +#include "envoy/config/cluster/v3/cluster.upbdefs.h" #include "envoy/config/core/v3/address.upb.h" #include "envoy/config/core/v3/base.upb.h" #include "envoy/config/core/v3/config_source.upb.h" #include "envoy/config/core/v3/health_check.upb.h" +#include "envoy/config/core/v3/protocol.upb.h" #include "envoy/config/endpoint/v3/endpoint.upb.h" +#include "envoy/config/endpoint/v3/endpoint.upbdefs.h" #include "envoy/config/endpoint/v3/endpoint_components.upb.h" #include "envoy/config/endpoint/v3/load_report.upb.h" #include "envoy/config/listener/v3/api_listener.upb.h" #include "envoy/config/listener/v3/listener.upb.h" #include "envoy/config/route/v3/route.upb.h" +#include "envoy/config/route/v3/route.upbdefs.h" #include "envoy/config/route/v3/route_components.upb.h" #include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.h" #include "envoy/extensions/transport_sockets/tls/v3/common.upb.h" #include "envoy/extensions/transport_sockets/tls/v3/tls.upb.h" #include "envoy/service/cluster/v3/cds.upb.h" +#include "envoy/service/cluster/v3/cds.upbdefs.h" #include "envoy/service/discovery/v3/discovery.upb.h" +#include "envoy/service/discovery/v3/discovery.upbdefs.h" #include "envoy/service/endpoint/v3/eds.upb.h" +#include "envoy/service/endpoint/v3/eds.upbdefs.h" #include "envoy/service/listener/v3/lds.upb.h" #include "envoy/service/load_stats/v3/lrs.upb.h" +#include "envoy/service/load_stats/v3/lrs.upbdefs.h" #include "envoy/service/route/v3/rds.upb.h" +#include "envoy/service/route/v3/rds.upbdefs.h" #include "envoy/type/matcher/v3/regex.upb.h" #include "envoy/type/matcher/v3/string.upb.h" #include "envoy/type/v3/percent.upb.h" @@ -73,18 +83,44 @@ #include "google/protobuf/struct.upb.h" #include "google/protobuf/wrappers.upb.h" #include "google/rpc/status.upb.h" +#include "upb/text_encode.h" #include "upb/upb.h" namespace grpc_core { +// TODO (donnadionne): Check to see if timeout is enabled, this will be +// removed once timeout feature is fully integration-tested and enabled by +// default. +bool XdsTimeoutEnabled() { + char* value = gpr_getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"); + bool parsed_value; + bool parse_succeeded = gpr_parse_bool_value(value, &parsed_value); + gpr_free(value); + return parse_succeeded && parsed_value; +} + +// TODO(yashykt): Check to see if xDS security is enabled. This will be +// removed once this feature is fully integration-tested and enabled by +// default. +bool XdsSecurityEnabled() { + char* value = gpr_getenv("GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT"); + bool parsed_value; + bool parse_succeeded = gpr_parse_bool_value(value, &parsed_value); + gpr_free(value); + return parse_succeeded && parsed_value; +} + // // XdsApi::Route::Matchers::PathMatcher // XdsApi::Route::Matchers::PathMatcher::PathMatcher(const PathMatcher& other) - : type(other.type) { + : type(other.type), case_sensitive(other.case_sensitive) { if (type == PathMatcherType::REGEX) { - regex_matcher = absl::make_unique<RE2>(other.regex_matcher->pattern()); + RE2::Options options; + options.set_case_sensitive(case_sensitive); + regex_matcher = + absl::make_unique<RE2>(other.regex_matcher->pattern(), options); } else { string_matcher = other.string_matcher; } @@ -93,8 +129,12 @@ XdsApi::Route::Matchers::PathMatcher::PathMatcher(const PathMatcher& other) XdsApi::Route::Matchers::PathMatcher& XdsApi::Route::Matchers::PathMatcher:: operator=(const PathMatcher& other) { type = other.type; + case_sensitive = other.case_sensitive; if (type == PathMatcherType::REGEX) { - regex_matcher = absl::make_unique<RE2>(other.regex_matcher->pattern()); + RE2::Options options; + options.set_case_sensitive(case_sensitive); + regex_matcher = + absl::make_unique<RE2>(other.regex_matcher->pattern(), options); } else { string_matcher = other.string_matcher; } @@ -104,6 +144,7 @@ operator=(const PathMatcher& other) { bool XdsApi::Route::Matchers::PathMatcher::operator==( const PathMatcher& other) const { if (type != other.type) return false; + if (case_sensitive != other.case_sensitive) return false; if (type == PathMatcherType::REGEX) { // Should never be null. if (regex_matcher == nullptr || other.regex_matcher == nullptr) { @@ -129,10 +170,11 @@ std::string XdsApi::Route::Matchers::PathMatcher::ToString() const { default: break; } - return absl::StrFormat("Path %s:%s", path_type_string, + return absl::StrFormat("Path %s:%s%s", path_type_string, type == PathMatcherType::REGEX ? regex_matcher->pattern() - : string_matcher); + : string_matcher, + case_sensitive ? "" : "[case_sensitive=false]"); } // @@ -255,6 +297,9 @@ std::string XdsApi::Route::ToString() const { for (const ClusterWeight& cluster_weight : weighted_clusters) { contents.push_back(cluster_weight.ToString()); } + if (max_stream_duration.has_value()) { + contents.push_back(max_stream_duration->ToString()); + } return absl::StrJoin(contents, "\n"); } @@ -294,9 +339,11 @@ enum MatchType { }; // Returns true if match succeeds. -bool DomainMatch(MatchType match_type, std::string domain_pattern, - std::string expected_host_name) { +bool DomainMatch(MatchType match_type, const std::string& domain_pattern_in, + const std::string& expected_host_name_in) { // Normalize the args to lower-case. Domain matching is case-insensitive. + std::string domain_pattern = domain_pattern_in; + std::string expected_host_name = expected_host_name_in; std::transform(domain_pattern.begin(), domain_pattern.end(), domain_pattern.begin(), [](unsigned char c) { return std::tolower(c); }); @@ -382,40 +429,209 @@ XdsApi::RdsUpdate::VirtualHost* XdsApi::RdsUpdate::FindVirtualHostForDomain( // XdsApi::StringMatcher // +XdsApi::StringMatcher::StringMatcher(StringMatcherType type, + const std::string& matcher, + bool ignore_case) + : type_(type), ignore_case_(ignore_case) { + if (type_ == StringMatcherType::SAFE_REGEX) { + regex_matcher_ = absl::make_unique<RE2>(matcher); + } else { + string_matcher_ = matcher; + } +} + XdsApi::StringMatcher::StringMatcher(const StringMatcher& other) - : type(other.type) { - switch (type) { + : type_(other.type_), ignore_case_(other.ignore_case_) { + switch (type_) { case StringMatcherType::SAFE_REGEX: - regex_match = absl::make_unique<RE2>(other.regex_match->pattern()); + regex_matcher_ = absl::make_unique<RE2>(other.regex_matcher_->pattern()); break; default: - string_matcher = other.string_matcher; + string_matcher_ = other.string_matcher_; } } XdsApi::StringMatcher& XdsApi::StringMatcher::operator=( const StringMatcher& other) { - type = other.type; - switch (type) { + type_ = other.type_; + switch (type_) { case StringMatcherType::SAFE_REGEX: - regex_match = absl::make_unique<RE2>(other.regex_match->pattern()); + regex_matcher_ = absl::make_unique<RE2>(other.regex_matcher_->pattern()); break; default: - string_matcher = other.string_matcher; + string_matcher_ = other.string_matcher_; } + ignore_case_ = other.ignore_case_; return *this; } bool XdsApi::StringMatcher::operator==(const StringMatcher& other) const { - if (type != other.type) return false; - switch (type) { + if (type_ != other.type_ || ignore_case_ != other.ignore_case_) return false; + switch (type_) { case StringMatcherType::SAFE_REGEX: - return regex_match->pattern() != other.regex_match->pattern(); + return regex_matcher_->pattern() == other.regex_matcher_->pattern(); default: - return string_matcher != other.string_matcher; + return string_matcher_ == other.string_matcher_; + } +} + +bool XdsApi::StringMatcher::Match(absl::string_view value) const { + switch (type_) { + case XdsApi::StringMatcher::StringMatcherType::EXACT: + return ignore_case_ ? absl::EqualsIgnoreCase(value, string_matcher_) + : value == string_matcher_; + case XdsApi::StringMatcher::StringMatcherType::PREFIX: + return ignore_case_ ? absl::StartsWithIgnoreCase(value, string_matcher_) + : absl::StartsWith(value, string_matcher_); + case XdsApi::StringMatcher::StringMatcherType::SUFFIX: + return ignore_case_ ? absl::EndsWithIgnoreCase(value, string_matcher_) + : absl::EndsWith(value, string_matcher_); + case XdsApi::StringMatcher::StringMatcherType::CONTAINS: + return ignore_case_ + ? absl::StrContains(absl::AsciiStrToLower(value), + absl::AsciiStrToLower(string_matcher_)) + : absl::StrContains(value, string_matcher_); + case XdsApi::StringMatcher::StringMatcherType::SAFE_REGEX: + // ignore_case_ is ignored for SAFE_REGEX + return RE2::FullMatch(std::string(value), *regex_matcher_); + default: + return false; } } +std::string XdsApi::StringMatcher::ToString() const { + switch (type_) { + case StringMatcherType::EXACT: + return absl::StrFormat("StringMatcher{exact=%s%s}", string_matcher_, + ignore_case_ ? ", ignore_case" : ""); + case StringMatcherType::PREFIX: + return absl::StrFormat("StringMatcher{prefix=%s%s}", string_matcher_, + ignore_case_ ? ", ignore_case" : ""); + case StringMatcherType::SUFFIX: + return absl::StrFormat("StringMatcher{suffix=%s%s}", string_matcher_, + ignore_case_ ? ", ignore_case" : ""); + case StringMatcherType::CONTAINS: + return absl::StrFormat("StringMatcher{contains=%s%s}", string_matcher_, + ignore_case_ ? ", ignore_case" : ""); + case StringMatcherType::SAFE_REGEX: + return absl::StrFormat("StringMatcher{safe_regex=%s}", + regex_matcher_->pattern()); + default: + return ""; + } +} + +// +// XdsApi::CommonTlsContext::CertificateValidationContext +// + +std::string XdsApi::CommonTlsContext::CertificateValidationContext::ToString() + const { + std::vector<std::string> contents; + for (const auto& match : match_subject_alt_names) { + contents.push_back(match.ToString()); + } + return absl::StrFormat("{match_subject_alt_names=[%s]}", + absl::StrJoin(contents, ", ")); +} + +bool XdsApi::CommonTlsContext::CertificateValidationContext::Empty() const { + return match_subject_alt_names.empty(); +} + +// +// XdsApi::CommonTlsContext::CertificateValidationContext +// + +std::string XdsApi::CommonTlsContext::CertificateProviderInstance::ToString() + const { + absl::InlinedVector<std::string, 2> contents; + if (!instance_name.empty()) { + contents.push_back(absl::StrFormat("instance_name=%s", instance_name)); + } + if (!certificate_name.empty()) { + contents.push_back( + absl::StrFormat("certificate_name=%s", certificate_name)); + } + return absl::StrCat("{", absl::StrJoin(contents, ", "), "}"); +} + +bool XdsApi::CommonTlsContext::CertificateProviderInstance::Empty() const { + return instance_name.empty() && certificate_name.empty(); +} + +// +// XdsApi::CommonTlsContext::CombinedCertificateValidationContext +// + +std::string +XdsApi::CommonTlsContext::CombinedCertificateValidationContext::ToString() + const { + absl::InlinedVector<std::string, 2> contents; + if (!default_validation_context.Empty()) { + contents.push_back(absl::StrFormat("default_validation_context=%s", + default_validation_context.ToString())); + } + if (!validation_context_certificate_provider_instance.Empty()) { + contents.push_back(absl::StrFormat( + "validation_context_certificate_provider_instance=%s", + validation_context_certificate_provider_instance.ToString())); + } + return absl::StrCat("{", absl::StrJoin(contents, ", "), "}"); +} + +bool XdsApi::CommonTlsContext::CombinedCertificateValidationContext::Empty() + const { + return default_validation_context.Empty() && + validation_context_certificate_provider_instance.Empty(); +} + +// +// XdsApi::CommonTlsContext +// + +std::string XdsApi::CommonTlsContext::ToString() const { + absl::InlinedVector<std::string, 2> contents; + if (!tls_certificate_certificate_provider_instance.Empty()) { + contents.push_back(absl::StrFormat( + "tls_certificate_certificate_provider_instance=%s", + tls_certificate_certificate_provider_instance.ToString())); + } + if (!combined_validation_context.Empty()) { + contents.push_back(absl::StrFormat("combined_validation_context=%s", + combined_validation_context.ToString())); + } + return absl::StrCat("{", absl::StrJoin(contents, ", "), "}"); +} + +bool XdsApi::CommonTlsContext::Empty() const { + return tls_certificate_certificate_provider_instance.Empty() && + combined_validation_context.Empty(); +} + +// +// XdsApi::CdsUpdate +// + +std::string XdsApi::CdsUpdate::ToString() const { + absl::InlinedVector<std::string, 4> contents; + if (!eds_service_name.empty()) { + contents.push_back( + absl::StrFormat("eds_service_name=%s", eds_service_name)); + } + if (!common_tls_context.Empty()) { + contents.push_back(absl::StrFormat("common_tls_context=%s", + common_tls_context.ToString())); + } + if (lrs_load_reporting_server_name.has_value()) { + contents.push_back(absl::StrFormat("lrs_load_reporting_server_name=%s", + lrs_load_reporting_server_name.value())); + } + contents.push_back( + absl::StrFormat("max_concurrent_requests=%d", max_concurrent_requests)); + return absl::StrCat("{", absl::StrJoin(contents, ", "), "}"); +} + // // XdsApi::EdsUpdate // @@ -527,11 +743,10 @@ bool IsEds(absl::string_view type_url) { } // namespace XdsApi::XdsApi(XdsClient* client, TraceFlag* tracer, - const XdsBootstrap* bootstrap) + const XdsBootstrap::Node* node) : client_(client), tracer_(tracer), - use_v3_(bootstrap != nullptr && bootstrap->server().ShouldUseV3()), - bootstrap_(bootstrap), + node_(node), build_version_(absl::StrCat("gRPC C-core ", GPR_PLATFORM_STRING, " ", grpc_version_string())), user_agent_name_(absl::StrCat("gRPC C-core ", GPR_PLATFORM_STRING)) {} @@ -632,11 +847,10 @@ void PopulateBuildVersion(upb_arena* arena, envoy_config_core_v3_Node* node_msg, encoded_build_version.size(), arena); } -void PopulateNode(upb_arena* arena, const XdsBootstrap* bootstrap, +void PopulateNode(upb_arena* arena, const XdsBootstrap::Node* node, bool use_v3, const std::string& build_version, const std::string& user_agent_name, envoy_config_core_v3_Node* node_msg) { - const XdsBootstrap::Node* node = bootstrap->node(); if (node != nullptr) { if (!node->id.empty()) { envoy_config_core_v3_Node_set_id(node_msg, @@ -669,7 +883,7 @@ void PopulateNode(upb_arena* arena, const XdsBootstrap* bootstrap, } } } - if (!bootstrap->server().ShouldUseV3()) { + if (!use_v3) { PopulateBuildVersion(arena, node_msg, build_version); } envoy_config_core_v3_Node_set_user_agent_name( @@ -689,172 +903,17 @@ inline std::string UpbStringToStdString(const upb_strview& str) { return std::string(str.data, str.size); } -inline void AddStringField(const char* name, const upb_strview& value, - std::vector<std::string>* fields, - bool add_if_empty = false) { - if (value.size > 0 || add_if_empty) { - fields->emplace_back( - absl::StrCat(name, ": \"", UpbStringToAbsl(value), "\"")); - } -} - -inline void AddUInt32ValueField(const char* name, - const google_protobuf_UInt32Value* value, - std::vector<std::string>* fields) { - if (value != nullptr) { - fields->emplace_back(absl::StrCat( - name, " { value: ", google_protobuf_UInt32Value_value(value), " }")); - } -} - -inline void AddLocalityField(int indent_level, - const envoy_config_core_v3_Locality* locality, - std::vector<std::string>* fields) { - std::string indent = - absl::StrJoin(std::vector<std::string>(indent_level, " "), ""); - // region - std::string field = absl::StrCat(indent, "region"); - AddStringField(field.c_str(), envoy_config_core_v3_Locality_region(locality), - fields); - // zone - field = absl::StrCat(indent, "zone"); - AddStringField(field.c_str(), envoy_config_core_v3_Locality_zone(locality), - fields); - // sub_zone - field = absl::StrCat(indent, "sub_zone"); - AddStringField(field.c_str(), - envoy_config_core_v3_Locality_sub_zone(locality), fields); -} - -void AddNodeLogFields(const envoy_config_core_v3_Node* node, - const std::string& build_version, - std::vector<std::string>* fields) { - fields->emplace_back("node {"); - // id - AddStringField(" id", envoy_config_core_v3_Node_id(node), fields); - // metadata - const google_protobuf_Struct* metadata = - envoy_config_core_v3_Node_metadata(node); - if (metadata != nullptr) { - fields->emplace_back(" metadata {"); - size_t entry_idx = UPB_MAP_BEGIN; - while (true) { - const google_protobuf_Struct_FieldsEntry* entry = - google_protobuf_Struct_fields_next(metadata, &entry_idx); - if (entry == nullptr) break; - fields->emplace_back(" field {"); - // key - AddStringField(" key", google_protobuf_Struct_FieldsEntry_key(entry), - fields); - // value - const google_protobuf_Value* value = - google_protobuf_Struct_FieldsEntry_value(entry); - if (value != nullptr) { - std::string value_str; - if (google_protobuf_Value_has_string_value(value)) { - value_str = absl::StrCat( - "string_value: \"", - UpbStringToAbsl(google_protobuf_Value_string_value(value)), "\""); - } else if (google_protobuf_Value_has_null_value(value)) { - value_str = "null_value: NULL_VALUE"; - } else if (google_protobuf_Value_has_number_value(value)) { - value_str = absl::StrCat("double_value: ", - google_protobuf_Value_number_value(value)); - } else if (google_protobuf_Value_has_bool_value(value)) { - value_str = absl::StrCat("bool_value: ", - google_protobuf_Value_bool_value(value)); - } else if (google_protobuf_Value_has_struct_value(value)) { - value_str = "struct_value: <not printed>"; - } else if (google_protobuf_Value_has_list_value(value)) { - value_str = "list_value: <not printed>"; - } else { - value_str = "<unknown>"; - } - fields->emplace_back(absl::StrCat(" value { ", value_str, " }")); - } - fields->emplace_back(" }"); - } - fields->emplace_back(" }"); - } - // locality - const envoy_config_core_v3_Locality* locality = - envoy_config_core_v3_Node_locality(node); - if (locality != nullptr) { - fields->emplace_back(" locality {"); - AddLocalityField(2, locality, fields); - fields->emplace_back(" }"); - } - // build_version (doesn't exist in v3 proto; this is a horrible hack) - if (!build_version.empty()) { - fields->emplace_back( - absl::StrCat(" build_version: \"", build_version, "\"")); - } - // user_agent_name - AddStringField(" user_agent_name", - envoy_config_core_v3_Node_user_agent_name(node), fields); - // user_agent_version - AddStringField(" user_agent_version", - envoy_config_core_v3_Node_user_agent_version(node), fields); - // client_features - size_t num_client_features; - const upb_strview* client_features = - envoy_config_core_v3_Node_client_features(node, &num_client_features); - for (size_t i = 0; i < num_client_features; ++i) { - AddStringField(" client_features", client_features[i], fields); - } - fields->emplace_back("}"); -} - void MaybeLogDiscoveryRequest( - XdsClient* client, TraceFlag* tracer, - const envoy_service_discovery_v3_DiscoveryRequest* request, - const std::string& build_version) { + XdsClient* client, TraceFlag* tracer, upb_symtab* symtab, + const envoy_service_discovery_v3_DiscoveryRequest* request) { if (GRPC_TRACE_FLAG_ENABLED(*tracer) && gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) { - // TODO(roth): When we can upgrade upb, use upb textformat code to dump - // the raw proto instead of doing this manually. - std::vector<std::string> fields; - // version_info - AddStringField( - "version_info", - envoy_service_discovery_v3_DiscoveryRequest_version_info(request), - &fields); - // node - const envoy_config_core_v3_Node* node = - envoy_service_discovery_v3_DiscoveryRequest_node(request); - if (node != nullptr) AddNodeLogFields(node, build_version, &fields); - // resource_names - size_t num_resource_names; - const upb_strview* resource_names = - envoy_service_discovery_v3_DiscoveryRequest_resource_names( - request, &num_resource_names); - for (size_t i = 0; i < num_resource_names; ++i) { - AddStringField("resource_names", resource_names[i], &fields); - } - // type_url - AddStringField( - "type_url", - envoy_service_discovery_v3_DiscoveryRequest_type_url(request), &fields); - // response_nonce - AddStringField( - "response_nonce", - envoy_service_discovery_v3_DiscoveryRequest_response_nonce(request), - &fields); - // error_detail - const struct google_rpc_Status* error_detail = - envoy_service_discovery_v3_DiscoveryRequest_error_detail(request); - if (error_detail != nullptr) { - fields.emplace_back("error_detail {"); - // code - int32_t code = google_rpc_Status_code(error_detail); - if (code != 0) fields.emplace_back(absl::StrCat(" code: ", code)); - // message - AddStringField(" message", google_rpc_Status_message(error_detail), - &fields); - fields.emplace_back("}"); - } + const upb_msgdef* msg_type = + envoy_service_discovery_v3_DiscoveryRequest_getmsgdef(symtab); + char buf[10240]; + upb_text_encode(request, msg_type, nullptr, 0, buf, sizeof(buf)); gpr_log(GPR_DEBUG, "[xds_client %p] constructed ADS request: %s", client, - absl::StrJoin(fields, "\n").c_str()); + buf); } } @@ -888,7 +947,7 @@ absl::string_view TypeUrlExternalToInternal(bool use_v3, } // namespace grpc_slice XdsApi::CreateAdsRequest( - const std::string& type_url, + const XdsBootstrap::XdsServer& server, const std::string& type_url, const std::set<absl::string_view>& resource_names, const std::string& version, const std::string& nonce, grpc_error* error, bool populate_node) { @@ -898,7 +957,7 @@ grpc_slice XdsApi::CreateAdsRequest( envoy_service_discovery_v3_DiscoveryRequest_new(arena.ptr()); // Set type_url. absl::string_view real_type_url = - TypeUrlExternalToInternal(use_v3_, type_url); + TypeUrlExternalToInternal(server.ShouldUseV3(), type_url); envoy_service_discovery_v3_DiscoveryRequest_set_type_url( request, StdStringToUpbString(real_type_url)); // Set version_info. @@ -913,16 +972,20 @@ grpc_slice XdsApi::CreateAdsRequest( } // Set error_detail if it's a NACK. if (error != GRPC_ERROR_NONE) { + google_rpc_Status* error_detail = + envoy_service_discovery_v3_DiscoveryRequest_mutable_error_detail( + request, arena.ptr()); + // Hard-code INVALID_ARGUMENT as the status code. + // TODO(roth): If at some point we decide we care about this value, + // we could attach a status code to the individual errors where we + // generate them in the parsing code, and then use that here. + google_rpc_Status_set_code(error_detail, GRPC_STATUS_INVALID_ARGUMENT); + // Error description comes from the error that was passed in. grpc_slice error_description_slice; GPR_ASSERT(grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION, &error_description_slice)); upb_strview error_description_strview = - upb_strview_make(reinterpret_cast<const char*>( - GPR_SLICE_START_PTR(error_description_slice)), - GPR_SLICE_LENGTH(error_description_slice)); - google_rpc_Status* error_detail = - envoy_service_discovery_v3_DiscoveryRequest_mutable_error_detail( - request, arena.ptr()); + StdStringToUpbString(StringViewFromSlice(error_description_slice)); google_rpc_Status_set_message(error_detail, error_description_strview); GRPC_ERROR_UNREF(error); } @@ -931,401 +994,84 @@ grpc_slice XdsApi::CreateAdsRequest( envoy_config_core_v3_Node* node_msg = envoy_service_discovery_v3_DiscoveryRequest_mutable_node(request, arena.ptr()); - PopulateNode(arena.ptr(), bootstrap_, build_version_, user_agent_name_, - node_msg); + PopulateNode(arena.ptr(), node_, server.ShouldUseV3(), build_version_, + user_agent_name_, node_msg); } // Add resource_names. for (const auto& resource_name : resource_names) { envoy_service_discovery_v3_DiscoveryRequest_add_resource_names( request, StdStringToUpbString(resource_name), arena.ptr()); } - MaybeLogDiscoveryRequest(client_, tracer_, request, build_version_); + MaybeLogDiscoveryRequest(client_, tracer_, symtab_.ptr(), request); return SerializeDiscoveryRequest(arena.ptr(), request); } namespace { void MaybeLogDiscoveryResponse( - XdsClient* client, TraceFlag* tracer, + XdsClient* client, TraceFlag* tracer, upb_symtab* symtab, const envoy_service_discovery_v3_DiscoveryResponse* response) { if (GRPC_TRACE_FLAG_ENABLED(*tracer) && gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) { - // TODO(roth): When we can upgrade upb, use upb textformat code to dump - // the raw proto instead of doing this manually. - std::vector<std::string> fields; - // version_info - AddStringField( - "version_info", - envoy_service_discovery_v3_DiscoveryResponse_version_info(response), - &fields); - // resources - size_t num_resources; - envoy_service_discovery_v3_DiscoveryResponse_resources(response, - &num_resources); - fields.emplace_back( - absl::StrCat("resources: <", num_resources, " element(s)>")); - // type_url - AddStringField( - "type_url", - envoy_service_discovery_v3_DiscoveryResponse_type_url(response), - &fields); - // nonce - AddStringField("nonce", - envoy_service_discovery_v3_DiscoveryResponse_nonce(response), - &fields); - gpr_log(GPR_DEBUG, "[xds_client %p] received response: %s", client, - absl::StrJoin(fields, "\n").c_str()); + const upb_msgdef* msg_type = + envoy_service_discovery_v3_DiscoveryResponse_getmsgdef(symtab); + char buf[10240]; + upb_text_encode(response, msg_type, nullptr, 0, buf, sizeof(buf)); + gpr_log(GPR_DEBUG, "[xds_client %p] received response: %s", client, buf); } } void MaybeLogRouteConfiguration( - XdsClient* client, TraceFlag* tracer, + XdsClient* client, TraceFlag* tracer, upb_symtab* symtab, const envoy_config_route_v3_RouteConfiguration* route_config) { if (GRPC_TRACE_FLAG_ENABLED(*tracer) && gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) { - // TODO(roth): When we can upgrade upb, use upb textformat code to dump - // the raw proto instead of doing this manually. - std::vector<std::string> fields; - // name - AddStringField("name", - envoy_config_route_v3_RouteConfiguration_name(route_config), - &fields); - // virtual_hosts - size_t num_virtual_hosts; - const envoy_config_route_v3_VirtualHost* const* virtual_hosts = - envoy_config_route_v3_RouteConfiguration_virtual_hosts( - route_config, &num_virtual_hosts); - for (size_t i = 0; i < num_virtual_hosts; ++i) { - const auto* virtual_host = virtual_hosts[i]; - fields.push_back("virtual_hosts {"); - // name - AddStringField(" name", - envoy_config_route_v3_VirtualHost_name(virtual_host), - &fields); - // domains - size_t num_domains; - const upb_strview* const domains = - envoy_config_route_v3_VirtualHost_domains(virtual_host, &num_domains); - for (size_t j = 0; j < num_domains; ++j) { - AddStringField(" domains", domains[j], &fields); - } - // routes - size_t num_routes; - const envoy_config_route_v3_Route* const* routes = - envoy_config_route_v3_VirtualHost_routes(virtual_host, &num_routes); - for (size_t j = 0; j < num_routes; ++j) { - const auto* route = routes[j]; - fields.push_back(" route {"); - // name - AddStringField(" name", envoy_config_route_v3_Route_name(route), - &fields); - // match - const envoy_config_route_v3_RouteMatch* match = - envoy_config_route_v3_Route_match(route); - if (match != nullptr) { - fields.emplace_back(" match {"); - // path matching - if (envoy_config_route_v3_RouteMatch_has_prefix(match)) { - AddStringField(" prefix", - envoy_config_route_v3_RouteMatch_prefix(match), - &fields, - /*add_if_empty=*/true); - } else if (envoy_config_route_v3_RouteMatch_has_path(match)) { - AddStringField(" path", - envoy_config_route_v3_RouteMatch_path(match), - &fields, - /*add_if_empty=*/true); - } else if (envoy_config_route_v3_RouteMatch_has_safe_regex(match)) { - fields.emplace_back(" safe_regex: <not printed>"); - } else { - fields.emplace_back(" <unknown path matching type>"); - } - // header matching - size_t num_headers; - envoy_config_route_v3_RouteMatch_headers(match, &num_headers); - if (num_headers > 0) { - fields.emplace_back( - absl::StrCat(" headers: <", num_headers, " element(s)>")); - } - fields.emplace_back(" }"); - } - // action - if (envoy_config_route_v3_Route_has_route(route)) { - const envoy_config_route_v3_RouteAction* action = - envoy_config_route_v3_Route_route(route); - fields.emplace_back(" route {"); - if (envoy_config_route_v3_RouteAction_has_cluster(action)) { - AddStringField(" cluster", - envoy_config_route_v3_RouteAction_cluster(action), - &fields); - } else if (envoy_config_route_v3_RouteAction_has_cluster_header( - action)) { - AddStringField( - " cluster_header", - envoy_config_route_v3_RouteAction_cluster_header(action), - &fields); - } else if (envoy_config_route_v3_RouteAction_has_weighted_clusters( - action)) { - const envoy_config_route_v3_WeightedCluster* weighted_clusters = - envoy_config_route_v3_RouteAction_weighted_clusters(action); - fields.emplace_back(" weighted_clusters {"); - size_t num_cluster_weights; - const envoy_config_route_v3_WeightedCluster_ClusterWeight* const* - cluster_weights = - envoy_config_route_v3_WeightedCluster_clusters( - weighted_clusters, &num_cluster_weights); - for (size_t i = 0; i < num_cluster_weights; ++i) { - const envoy_config_route_v3_WeightedCluster_ClusterWeight* - cluster_weight = cluster_weights[i]; - fields.emplace_back(" clusters {"); - AddStringField( - " name", - envoy_config_route_v3_WeightedCluster_ClusterWeight_name( - cluster_weight), - &fields); - AddUInt32ValueField( - " weight", - envoy_config_route_v3_WeightedCluster_ClusterWeight_weight( - cluster_weight), - &fields); - fields.emplace_back(" }"); - } - AddUInt32ValueField( - " total_weight", - envoy_config_route_v3_WeightedCluster_total_weight( - weighted_clusters), - &fields); - fields.emplace_back(" }"); - } - fields.emplace_back(" }"); - } else if (envoy_config_route_v3_Route_has_redirect(route)) { - fields.emplace_back(" redirect: <not printed>"); - } else if (envoy_config_route_v3_Route_has_direct_response(route)) { - fields.emplace_back(" direct_response: <not printed>"); - } else if (envoy_config_route_v3_Route_has_filter_action(route)) { - fields.emplace_back(" filter_action: <not printed>"); - } - fields.push_back(" }"); - } - fields.push_back("}"); - } - gpr_log(GPR_DEBUG, "[xds_client %p] RouteConfiguration: %s", client, - absl::StrJoin(fields, "\n").c_str()); + const upb_msgdef* msg_type = + envoy_config_route_v3_RouteConfiguration_getmsgdef(symtab); + char buf[10240]; + upb_text_encode(route_config, msg_type, nullptr, 0, buf, sizeof(buf)); + gpr_log(GPR_DEBUG, "[xds_client %p] RouteConfiguration: %s", client, buf); } } -void MaybeLogCluster(XdsClient* client, TraceFlag* tracer, +void MaybeLogCluster(XdsClient* client, TraceFlag* tracer, upb_symtab* symtab, const envoy_config_cluster_v3_Cluster* cluster) { if (GRPC_TRACE_FLAG_ENABLED(*tracer) && gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) { - // TODO(roth): When we can upgrade upb, use upb textformat code to dump - // the raw proto instead of doing this manually. - std::vector<std::string> fields; - // name - AddStringField("name", envoy_config_cluster_v3_Cluster_name(cluster), - &fields); - // type - if (envoy_config_cluster_v3_Cluster_has_type(cluster)) { - fields.emplace_back(absl::StrCat( - "type: ", envoy_config_cluster_v3_Cluster_type(cluster))); - } else if (envoy_config_cluster_v3_Cluster_has_cluster_type(cluster)) { - fields.emplace_back("cluster_type: <not printed>"); - } else { - fields.emplace_back("<unknown type>"); - } - // eds_cluster_config - const envoy_config_cluster_v3_Cluster_EdsClusterConfig* eds_cluster_config = - envoy_config_cluster_v3_Cluster_eds_cluster_config(cluster); - if (eds_cluster_config != nullptr) { - fields.emplace_back("eds_cluster_config {"); - // eds_config - const struct envoy_config_core_v3_ConfigSource* eds_config = - envoy_config_cluster_v3_Cluster_EdsClusterConfig_eds_config( - eds_cluster_config); - if (eds_config != nullptr) { - if (envoy_config_core_v3_ConfigSource_has_ads(eds_config)) { - fields.emplace_back(" eds_config { ads {} }"); - } else { - fields.emplace_back(" eds_config: <non-ADS type>"); - } - } - // service_name - AddStringField( - " service_name", - envoy_config_cluster_v3_Cluster_EdsClusterConfig_service_name( - eds_cluster_config), - &fields); - fields.emplace_back("}"); - } - // lb_policy - fields.emplace_back(absl::StrCat( - "lb_policy: ", envoy_config_cluster_v3_Cluster_lb_policy(cluster))); - // lrs_server - const envoy_config_core_v3_ConfigSource* lrs_server = - envoy_config_cluster_v3_Cluster_lrs_server(cluster); - if (lrs_server != nullptr) { - if (envoy_config_core_v3_ConfigSource_has_self(lrs_server)) { - fields.emplace_back("lrs_server { self {} }"); - } else { - fields.emplace_back("lrs_server: <non-self type>"); - } - } - gpr_log(GPR_DEBUG, "[xds_client %p] Cluster: %s", client, - absl::StrJoin(fields, "\n").c_str()); + const upb_msgdef* msg_type = + envoy_config_cluster_v3_Cluster_getmsgdef(symtab); + char buf[10240]; + upb_text_encode(cluster, msg_type, nullptr, 0, buf, sizeof(buf)); + gpr_log(GPR_DEBUG, "[xds_client %p] Cluster: %s", client, buf); } } void MaybeLogClusterLoadAssignment( - XdsClient* client, TraceFlag* tracer, + XdsClient* client, TraceFlag* tracer, upb_symtab* symtab, const envoy_config_endpoint_v3_ClusterLoadAssignment* cla) { if (GRPC_TRACE_FLAG_ENABLED(*tracer) && gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) { - // TODO(roth): When we can upgrade upb, use upb textformat code to dump - // the raw proto instead of doing this manually. - std::vector<std::string> fields; - // cluster_name - AddStringField( - "cluster_name", - envoy_config_endpoint_v3_ClusterLoadAssignment_cluster_name(cla), - &fields); - // endpoints - size_t num_localities; - const struct envoy_config_endpoint_v3_LocalityLbEndpoints* const* - locality_endpoints = - envoy_config_endpoint_v3_ClusterLoadAssignment_endpoints( - cla, &num_localities); - for (size_t i = 0; i < num_localities; ++i) { - const auto* locality_endpoint = locality_endpoints[i]; - fields.emplace_back("endpoints {"); - // locality - const auto* locality = - envoy_config_endpoint_v3_LocalityLbEndpoints_locality( - locality_endpoint); - if (locality != nullptr) { - fields.emplace_back(" locality {"); - AddLocalityField(2, locality, &fields); - fields.emplace_back(" }"); - } - // lb_endpoints - size_t num_lb_endpoints; - const envoy_config_endpoint_v3_LbEndpoint* const* lb_endpoints = - envoy_config_endpoint_v3_LocalityLbEndpoints_lb_endpoints( - locality_endpoint, &num_lb_endpoints); - for (size_t j = 0; j < num_lb_endpoints; ++j) { - const auto* lb_endpoint = lb_endpoints[j]; - fields.emplace_back(" lb_endpoints {"); - // health_status - uint32_t health_status = - envoy_config_endpoint_v3_LbEndpoint_health_status(lb_endpoint); - if (health_status > 0) { - fields.emplace_back( - absl::StrCat(" health_status: ", health_status)); - } - // endpoint - const envoy_config_endpoint_v3_Endpoint* endpoint = - envoy_config_endpoint_v3_LbEndpoint_endpoint(lb_endpoint); - if (endpoint != nullptr) { - fields.emplace_back(" endpoint {"); - // address - const auto* address = - envoy_config_endpoint_v3_Endpoint_address(endpoint); - if (address != nullptr) { - fields.emplace_back(" address {"); - // socket_address - const auto* socket_address = - envoy_config_core_v3_Address_socket_address(address); - if (socket_address != nullptr) { - fields.emplace_back(" socket_address {"); - // address - AddStringField( - " address", - envoy_config_core_v3_SocketAddress_address(socket_address), - &fields); - // port_value - if (envoy_config_core_v3_SocketAddress_has_port_value( - socket_address)) { - fields.emplace_back( - absl::StrCat(" port_value: ", - envoy_config_core_v3_SocketAddress_port_value( - socket_address))); - } else { - fields.emplace_back(" <non-numeric port>"); - } - fields.emplace_back(" }"); - } else { - fields.emplace_back(" <non-socket address>"); - } - fields.emplace_back(" }"); - } - fields.emplace_back(" }"); - } - fields.emplace_back(" }"); - } - // load_balancing_weight - AddUInt32ValueField( - " load_balancing_weight", - envoy_config_endpoint_v3_LocalityLbEndpoints_load_balancing_weight( - locality_endpoint), - &fields); - // priority - uint32_t priority = envoy_config_endpoint_v3_LocalityLbEndpoints_priority( - locality_endpoint); - if (priority > 0) { - fields.emplace_back(absl::StrCat(" priority: ", priority)); - } - fields.emplace_back("}"); - } - // policy - const envoy_config_endpoint_v3_ClusterLoadAssignment_Policy* policy = - envoy_config_endpoint_v3_ClusterLoadAssignment_policy(cla); - if (policy != nullptr) { - fields.emplace_back("policy {"); - // drop_overloads - size_t num_drop_overloads; - const envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload* const* - drop_overloads = - envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_drop_overloads( - policy, &num_drop_overloads); - for (size_t i = 0; i < num_drop_overloads; ++i) { - auto* drop_overload = drop_overloads[i]; - fields.emplace_back(" drop_overloads {"); - // category - AddStringField( - " category", - envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload_category( - drop_overload), - &fields); - // drop_percentage - const auto* drop_percentage = - envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload_drop_percentage( - drop_overload); - if (drop_percentage != nullptr) { - fields.emplace_back(" drop_percentage {"); - fields.emplace_back(absl::StrCat( - " numerator: ", - envoy_type_v3_FractionalPercent_numerator(drop_percentage))); - fields.emplace_back(absl::StrCat( - " denominator: ", - envoy_type_v3_FractionalPercent_denominator(drop_percentage))); - fields.emplace_back(" }"); - } - fields.emplace_back(" }"); - } - // overprovisioning_factor - fields.emplace_back("}"); - } + const upb_msgdef* msg_type = + envoy_config_endpoint_v3_ClusterLoadAssignment_getmsgdef(symtab); + char buf[10240]; + upb_text_encode(cla, msg_type, nullptr, 0, buf, sizeof(buf)); gpr_log(GPR_DEBUG, "[xds_client %p] ClusterLoadAssignment: %s", client, - absl::StrJoin(fields, "\n").c_str()); + buf); } } grpc_error* RoutePathMatchParse(const envoy_config_route_v3_RouteMatch* match, XdsApi::Route* route, bool* ignore_route) { + auto* case_sensitive = envoy_config_route_v3_RouteMatch_case_sensitive(match); + if (case_sensitive != nullptr) { + route->matchers.path_matcher.case_sensitive = + google_protobuf_BoolValue_value(case_sensitive); + } if (envoy_config_route_v3_RouteMatch_has_prefix(match)) { absl::string_view prefix = UpbStringToAbsl(envoy_config_route_v3_RouteMatch_prefix(match)); // Empty prefix "" is accepted. - if (prefix.size() > 0) { + if (!prefix.empty()) { // Prefix "/" is accepted. if (prefix[0] != '/') { // Prefix which does not start with a / will never match anything, so @@ -1351,7 +1097,7 @@ grpc_error* RoutePathMatchParse(const envoy_config_route_v3_RouteMatch* match, } else if (envoy_config_route_v3_RouteMatch_has_path(match)) { absl::string_view path = UpbStringToAbsl(envoy_config_route_v3_RouteMatch_path(match)); - if (path.size() == 0) { + if (path.empty()) { // Path that is empty will never match anything, so ignore this route. *ignore_route = true; return GRPC_ERROR_NONE; @@ -1389,7 +1135,9 @@ grpc_error* RoutePathMatchParse(const envoy_config_route_v3_RouteMatch* match, GPR_ASSERT(regex_matcher != nullptr); std::string matcher = UpbStringToStdString( envoy_type_matcher_v3_RegexMatcher_regex(regex_matcher)); - std::unique_ptr<RE2> regex = absl::make_unique<RE2>(std::move(matcher)); + RE2::Options options; + options.set_case_sensitive(route->matchers.path_matcher.case_sensitive); + auto regex = absl::make_unique<RE2>(std::move(matcher), options); if (!regex->ok()) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Invalid regex string specified in path matcher."); @@ -1518,7 +1266,7 @@ grpc_error* RouteActionParse(const envoy_config_route_v3_Route* route_msg, if (envoy_config_route_v3_RouteAction_has_cluster(route_action)) { route->cluster_name = UpbStringToStdString( envoy_config_route_v3_RouteAction_cluster(route_action)); - if (route->cluster_name.size() == 0) { + if (route->cluster_name.empty()) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "RouteAction cluster contains empty cluster name."); } @@ -1557,6 +1305,7 @@ grpc_error* RouteActionParse(const envoy_config_route_v3_Route* route_msg, "RouteAction weighted_cluster cluster missing weight"); } cluster.weight = google_protobuf_UInt32Value_value(weight); + if (cluster.weight == 0) continue; sum_of_weights += cluster.weight; route->weighted_clusters.emplace_back(std::move(cluster)); } @@ -1571,16 +1320,36 @@ grpc_error* RouteActionParse(const envoy_config_route_v3_Route* route_msg, } else { // No cluster or weighted_clusters found in RouteAction, ignore this route. *ignore_route = true; - return GRPC_ERROR_NONE; + } + if (XdsTimeoutEnabled() && !*ignore_route) { + const envoy_config_route_v3_RouteAction_MaxStreamDuration* + max_stream_duration = + envoy_config_route_v3_RouteAction_max_stream_duration(route_action); + if (max_stream_duration != nullptr) { + const google_protobuf_Duration* duration = + envoy_config_route_v3_RouteAction_MaxStreamDuration_grpc_timeout_header_max( + max_stream_duration); + if (duration == nullptr) { + duration = + envoy_config_route_v3_RouteAction_MaxStreamDuration_max_stream_duration( + max_stream_duration); + } + if (duration != nullptr) { + XdsApi::Duration duration_in_route; + duration_in_route.seconds = google_protobuf_Duration_seconds(duration); + duration_in_route.nanos = google_protobuf_Duration_nanos(duration); + route->max_stream_duration = duration_in_route; + } + } } return GRPC_ERROR_NONE; } grpc_error* RouteConfigParse( - XdsClient* client, TraceFlag* tracer, + XdsClient* client, TraceFlag* tracer, upb_symtab* symtab, const envoy_config_route_v3_RouteConfiguration* route_config, XdsApi::RdsUpdate* rds_update) { - MaybeLogRouteConfiguration(client, tracer, route_config); + MaybeLogRouteConfiguration(client, tracer, symtab, route_config); // Get the virtual hosts. size_t size; const envoy_config_route_v3_VirtualHost* const* virtual_hosts = @@ -1636,13 +1405,6 @@ grpc_error* RouteConfigParse( error = RouteActionParse(routes[j], &route, &ignore_route); if (error != GRPC_ERROR_NONE) return error; if (ignore_route) continue; - const google_protobuf_BoolValue* case_sensitive = - envoy_config_route_v3_RouteMatch_case_sensitive(match); - if (case_sensitive != nullptr && - !google_protobuf_BoolValue_value(case_sensitive)) { - return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "case_sensitive if set must be set to true."); - } vhost.routes.emplace_back(std::move(route)); } if (vhost.routes.empty()) { @@ -1653,7 +1415,7 @@ grpc_error* RouteConfigParse( } grpc_error* LdsResponseParse( - XdsClient* client, TraceFlag* tracer, + XdsClient* client, TraceFlag* tracer, upb_symtab* symtab, const envoy_service_discovery_v3_DiscoveryResponse* response, const std::set<absl::string_view>& expected_listener_names, XdsApi::LdsUpdateMap* lds_update_map, upb_arena* arena) { @@ -1708,6 +1470,23 @@ grpc_error* LdsResponseParse( return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Could not parse HttpConnectionManager config from ApiListener"); } + if (XdsTimeoutEnabled()) { + // Obtain max_stream_duration from Http Protocol Options. + const envoy_config_core_v3_HttpProtocolOptions* options = + envoy_extensions_filters_network_http_connection_manager_v3_HttpConnectionManager_common_http_protocol_options( + http_connection_manager); + if (options != nullptr) { + const google_protobuf_Duration* duration = + envoy_config_core_v3_HttpProtocolOptions_max_stream_duration( + options); + if (duration != nullptr) { + lds_update.http_max_stream_duration.seconds = + google_protobuf_Duration_seconds(duration); + lds_update.http_max_stream_duration.nanos = + google_protobuf_Duration_nanos(duration); + } + } + } // Found inlined route_config. Parse it to find the cluster_name. if (envoy_extensions_filters_network_http_connection_manager_v3_HttpConnectionManager_has_route_config( http_connection_manager)) { @@ -1716,7 +1495,7 @@ grpc_error* LdsResponseParse( http_connection_manager); XdsApi::RdsUpdate rds_update; grpc_error* error = - RouteConfigParse(client, tracer, route_config, &rds_update); + RouteConfigParse(client, tracer, symtab, route_config, &rds_update); if (error != GRPC_ERROR_NONE) return error; lds_update.rds_update = std::move(rds_update); continue; @@ -1751,7 +1530,7 @@ grpc_error* LdsResponseParse( } grpc_error* RdsResponseParse( - XdsClient* client, TraceFlag* tracer, + XdsClient* client, TraceFlag* tracer, upb_symtab* symtab, const envoy_service_discovery_v3_DiscoveryResponse* response, const std::set<absl::string_view>& expected_route_configuration_names, XdsApi::RdsUpdateMap* rds_update_map, upb_arena* arena) { @@ -1793,12 +1572,25 @@ grpc_error* RdsResponseParse( XdsApi::RdsUpdate& rds_update = (*rds_update_map)[std::move(route_config_name)]; grpc_error* error = - RouteConfigParse(client, tracer, route_config, &rds_update); + RouteConfigParse(client, tracer, symtab, route_config, &rds_update); if (error != GRPC_ERROR_NONE) return error; } return GRPC_ERROR_NONE; } +XdsApi::CommonTlsContext::CertificateProviderInstance +CertificateProviderInstanceParse( + const envoy_extensions_transport_sockets_tls_v3_CommonTlsContext_CertificateProviderInstance* + certificate_provider_instance_proto) { + return { + UpbStringToStdString( + envoy_extensions_transport_sockets_tls_v3_CommonTlsContext_CertificateProviderInstance_instance_name( + certificate_provider_instance_proto)), + UpbStringToStdString( + envoy_extensions_transport_sockets_tls_v3_CommonTlsContext_CertificateProviderInstance_certificate_name( + certificate_provider_instance_proto))}; +} + grpc_error* CommonTlsContextParse( const envoy_extensions_transport_sockets_tls_v3_CommonTlsContext* common_tls_context_proto, @@ -1820,47 +1612,59 @@ grpc_error* CommonTlsContextParse( envoy_extensions_transport_sockets_tls_v3_CertificateValidationContext_match_subject_alt_names( default_validation_context, &len); for (size_t i = 0; i < len; ++i) { - XdsApi::StringMatcher matcher; + XdsApi::StringMatcher::StringMatcherType type; + std::string matcher; if (envoy_type_matcher_v3_StringMatcher_has_exact( subject_alt_names_matchers[i])) { - matcher.type = XdsApi::StringMatcher::StringMatcherType::EXACT; - matcher.string_matcher = + type = XdsApi::StringMatcher::StringMatcherType::EXACT; + matcher = UpbStringToStdString(envoy_type_matcher_v3_StringMatcher_exact( subject_alt_names_matchers[i])); } else if (envoy_type_matcher_v3_StringMatcher_has_prefix( subject_alt_names_matchers[i])) { - matcher.type = XdsApi::StringMatcher::StringMatcherType::PREFIX; - matcher.string_matcher = + type = XdsApi::StringMatcher::StringMatcherType::PREFIX; + matcher = UpbStringToStdString(envoy_type_matcher_v3_StringMatcher_prefix( subject_alt_names_matchers[i])); } else if (envoy_type_matcher_v3_StringMatcher_has_suffix( subject_alt_names_matchers[i])) { - matcher.type = XdsApi::StringMatcher::StringMatcherType::SUFFIX; - matcher.string_matcher = + type = XdsApi::StringMatcher::StringMatcherType::SUFFIX; + matcher = UpbStringToStdString(envoy_type_matcher_v3_StringMatcher_suffix( subject_alt_names_matchers[i])); + } else if (envoy_type_matcher_v3_StringMatcher_has_contains( + subject_alt_names_matchers[i])) { + type = XdsApi::StringMatcher::StringMatcherType::CONTAINS; + matcher = + UpbStringToStdString(envoy_type_matcher_v3_StringMatcher_contains( + subject_alt_names_matchers[i])); } else if (envoy_type_matcher_v3_StringMatcher_has_safe_regex( subject_alt_names_matchers[i])) { - matcher.type = XdsApi::StringMatcher::StringMatcherType::SAFE_REGEX; + type = XdsApi::StringMatcher::StringMatcherType::SAFE_REGEX; auto* regex_matcher = envoy_type_matcher_v3_StringMatcher_safe_regex( subject_alt_names_matchers[i]); - std::unique_ptr<RE2> regex = - absl::make_unique<RE2>(UpbStringToStdString( - envoy_type_matcher_v3_RegexMatcher_regex(regex_matcher))); - if (!regex->ok()) { - return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Invalid regex string specified in string matcher."); - } - matcher.regex_match = std::move(regex); + matcher = UpbStringToStdString( + envoy_type_matcher_v3_RegexMatcher_regex(regex_matcher)); } else { return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Invalid StringMatcher specified"); } - matcher.ignore_case = envoy_type_matcher_v3_StringMatcher_ignore_case( + bool ignore_case = envoy_type_matcher_v3_StringMatcher_ignore_case( subject_alt_names_matchers[i]); + XdsApi::StringMatcher string_matcher(type, matcher, ignore_case); + if (type == XdsApi::StringMatcher::StringMatcherType::SAFE_REGEX) { + if (!string_matcher.regex_matcher()->ok()) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Invalid regex string specified in string matcher."); + } + if (ignore_case) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "StringMatcher: ignore_case has no effect for SAFE_REGEX."); + } + } common_tls_context->combined_validation_context - .default_validation_context.match_subject_alt_names.emplace_back( - matcher); + .default_validation_context.match_subject_alt_names.push_back( + std::move(string_matcher)); } } auto* validation_context_certificate_provider_instance = @@ -1868,25 +1672,24 @@ grpc_error* CommonTlsContextParse( combined_validation_context); if (validation_context_certificate_provider_instance != nullptr) { common_tls_context->combined_validation_context - .validation_context_certificate_provider_instance = UpbStringToStdString( - envoy_extensions_transport_sockets_tls_v3_CommonTlsContext_CertificateProviderInstance_instance_name( - validation_context_certificate_provider_instance)); + .validation_context_certificate_provider_instance = + CertificateProviderInstanceParse( + validation_context_certificate_provider_instance); } } auto* tls_certificate_certificate_provider_instance = envoy_extensions_transport_sockets_tls_v3_CommonTlsContext_tls_certificate_certificate_provider_instance( common_tls_context_proto); if (tls_certificate_certificate_provider_instance != nullptr) { - common_tls_context - ->tls_certificate_certificate_provider_instance = UpbStringToStdString( - envoy_extensions_transport_sockets_tls_v3_CommonTlsContext_CertificateProviderInstance_instance_name( - tls_certificate_certificate_provider_instance)); + common_tls_context->tls_certificate_certificate_provider_instance = + CertificateProviderInstanceParse( + tls_certificate_certificate_provider_instance); } return GRPC_ERROR_NONE; } grpc_error* CdsResponseParse( - XdsClient* client, TraceFlag* tracer, + XdsClient* client, TraceFlag* tracer, upb_symtab* symtab, const envoy_service_discovery_v3_DiscoveryResponse* response, const std::set<absl::string_view>& expected_cluster_names, XdsApi::CdsUpdateMap* cds_update_map, upb_arena* arena) { @@ -1910,7 +1713,7 @@ grpc_error* CdsResponseParse( if (cluster == nullptr) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Can't decode cluster."); } - MaybeLogCluster(client, tracer, cluster); + MaybeLogCluster(client, tracer, symtab, cluster); // Ignore unexpected cluster names. std::string cluster_name = UpbStringToStdString(envoy_config_cluster_v3_Cluster_name(cluster)); @@ -1956,33 +1759,43 @@ grpc_error* CdsResponseParse( return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "LB policy is not ROUND_ROBIN."); } - // Record Upstream tls context - auto* transport_socket = - envoy_config_cluster_v3_Cluster_transport_socket(cluster); - if (transport_socket != nullptr) { - absl::string_view name = UpbStringToAbsl( - envoy_config_core_v3_TransportSocket_name(transport_socket)); - if (name == "tls") { - auto* typed_config = - envoy_config_core_v3_TransportSocket_typed_config(transport_socket); - if (typed_config != nullptr) { - const upb_strview encoded_upstream_tls_context = - google_protobuf_Any_value(typed_config); - auto* upstream_tls_context = - envoy_extensions_transport_sockets_tls_v3_UpstreamTlsContext_parse( - encoded_upstream_tls_context.data, - encoded_upstream_tls_context.size, arena); - if (upstream_tls_context == nullptr) { - return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Can't decode upstream tls context."); + if (XdsSecurityEnabled()) { + // Record Upstream tls context + auto* transport_socket = + envoy_config_cluster_v3_Cluster_transport_socket(cluster); + if (transport_socket != nullptr) { + absl::string_view name = UpbStringToAbsl( + envoy_config_core_v3_TransportSocket_name(transport_socket)); + if (name == "envoy.transport_sockets.tls") { + auto* typed_config = + envoy_config_core_v3_TransportSocket_typed_config( + transport_socket); + if (typed_config != nullptr) { + const upb_strview encoded_upstream_tls_context = + google_protobuf_Any_value(typed_config); + auto* upstream_tls_context = + envoy_extensions_transport_sockets_tls_v3_UpstreamTlsContext_parse( + encoded_upstream_tls_context.data, + encoded_upstream_tls_context.size, arena); + if (upstream_tls_context == nullptr) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Can't decode upstream tls context."); + } + auto* common_tls_context = + envoy_extensions_transport_sockets_tls_v3_UpstreamTlsContext_common_tls_context( + upstream_tls_context); + if (common_tls_context != nullptr) { + grpc_error* error = CommonTlsContextParse( + common_tls_context, &cds_update.common_tls_context); + if (error != GRPC_ERROR_NONE) return error; + } } - auto* common_tls_context = - envoy_extensions_transport_sockets_tls_v3_UpstreamTlsContext_common_tls_context( - upstream_tls_context); - if (common_tls_context != nullptr) { - grpc_error* error = CommonTlsContextParse( - common_tls_context, &cds_update.common_tls_context); - if (error != GRPC_ERROR_NONE) return error; + if (cds_update.common_tls_context.combined_validation_context + .validation_context_certificate_provider_instance + .instance_name.empty()) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "TLS configuration provided but no " + "validation_context_certificate_provider_instance found."); } } } @@ -2107,7 +1920,7 @@ grpc_error* DropParseAndAppend( std::string category = UpbStringToStdString( envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload_category( drop_overload)); - if (category.size() == 0) { + if (category.empty()) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty drop category name"); } // Get the drop rate (per million). @@ -2139,7 +1952,7 @@ grpc_error* DropParseAndAppend( } grpc_error* EdsResponseParse( - XdsClient* client, TraceFlag* tracer, + XdsClient* client, TraceFlag* tracer, upb_symtab* symtab, const envoy_service_discovery_v3_DiscoveryResponse* response, const std::set<absl::string_view>& expected_eds_service_names, XdsApi::EdsUpdateMap* eds_update_map, upb_arena* arena) { @@ -2165,7 +1978,8 @@ grpc_error* EdsResponseParse( return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Can't parse cluster_load_assignment."); } - MaybeLogClusterLoadAssignment(client, tracer, cluster_load_assignment); + MaybeLogClusterLoadAssignment(client, tracer, symtab, + cluster_load_assignment); // Check the EDS service name. Ignore unexpected names. std::string eds_service_name = UpbStringToStdString( envoy_config_endpoint_v3_ClusterLoadAssignment_cluster_name( @@ -2263,7 +2077,7 @@ XdsApi::AdsParseResult XdsApi::ParseAdsResponse( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Can't decode DiscoveryResponse."); return result; } - MaybeLogDiscoveryResponse(client_, tracer_, response); + MaybeLogDiscoveryResponse(client_, tracer_, symtab_.ptr(), response); // Record the type_url, the version_info, and the nonce of the response. result.type_url = TypeUrlInternalToExternal(UpbStringToAbsl( envoy_service_discovery_v3_DiscoveryResponse_type_url(response))); @@ -2273,21 +2087,22 @@ XdsApi::AdsParseResult XdsApi::ParseAdsResponse( envoy_service_discovery_v3_DiscoveryResponse_nonce(response)); // Parse the response according to the resource type. if (IsLds(result.type_url)) { - result.parse_error = - LdsResponseParse(client_, tracer_, response, expected_listener_names, - &result.lds_update_map, arena.ptr()); + result.parse_error = LdsResponseParse(client_, tracer_, symtab_.ptr(), + response, expected_listener_names, + &result.lds_update_map, arena.ptr()); } else if (IsRds(result.type_url)) { - result.parse_error = RdsResponseParse(client_, tracer_, response, - expected_route_configuration_names, - &result.rds_update_map, arena.ptr()); - } else if (IsCds(result.type_url)) { result.parse_error = - CdsResponseParse(client_, tracer_, response, expected_cluster_names, - &result.cds_update_map, arena.ptr()); + RdsResponseParse(client_, tracer_, symtab_.ptr(), response, + expected_route_configuration_names, + &result.rds_update_map, arena.ptr()); + } else if (IsCds(result.type_url)) { + result.parse_error = CdsResponseParse(client_, tracer_, symtab_.ptr(), + response, expected_cluster_names, + &result.cds_update_map, arena.ptr()); } else if (IsEds(result.type_url)) { - result.parse_error = - EdsResponseParse(client_, tracer_, response, expected_eds_service_names, - &result.eds_update_map, arena.ptr()); + result.parse_error = EdsResponseParse(client_, tracer_, symtab_.ptr(), + response, expected_eds_service_names, + &result.eds_update_map, arena.ptr()); } return result; } @@ -2295,120 +2110,16 @@ XdsApi::AdsParseResult XdsApi::ParseAdsResponse( namespace { void MaybeLogLrsRequest( - XdsClient* client, TraceFlag* tracer, - const envoy_service_load_stats_v3_LoadStatsRequest* request, - const std::string& build_version) { + XdsClient* client, TraceFlag* tracer, upb_symtab* symtab, + const envoy_service_load_stats_v3_LoadStatsRequest* request) { if (GRPC_TRACE_FLAG_ENABLED(*tracer) && gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) { - // TODO(roth): When we can upgrade upb, use upb textformat code to dump - // the raw proto instead of doing this manually. - std::vector<std::string> fields; - // node - const auto* node = - envoy_service_load_stats_v3_LoadStatsRequest_node(request); - if (node != nullptr) { - AddNodeLogFields(node, build_version, &fields); - } - // cluster_stats - size_t num_cluster_stats; - const struct envoy_config_endpoint_v3_ClusterStats* const* cluster_stats = - envoy_service_load_stats_v3_LoadStatsRequest_cluster_stats( - request, &num_cluster_stats); - for (size_t i = 0; i < num_cluster_stats; ++i) { - const auto* cluster_stat = cluster_stats[i]; - fields.emplace_back("cluster_stats {"); - // cluster_name - AddStringField( - " cluster_name", - envoy_config_endpoint_v3_ClusterStats_cluster_name(cluster_stat), - &fields); - // cluster_service_name - AddStringField(" cluster_service_name", - envoy_config_endpoint_v3_ClusterStats_cluster_service_name( - cluster_stat), - &fields); - // upstream_locality_stats - size_t num_stats; - const envoy_config_endpoint_v3_UpstreamLocalityStats* const* stats = - envoy_config_endpoint_v3_ClusterStats_upstream_locality_stats( - cluster_stat, &num_stats); - for (size_t j = 0; j < num_stats; ++j) { - const auto* stat = stats[j]; - fields.emplace_back(" upstream_locality_stats {"); - // locality - const auto* locality = - envoy_config_endpoint_v3_UpstreamLocalityStats_locality(stat); - if (locality != nullptr) { - fields.emplace_back(" locality {"); - AddLocalityField(3, locality, &fields); - fields.emplace_back(" }"); - } - // total_successful_requests - fields.emplace_back(absl::StrCat( - " total_successful_requests: ", - envoy_config_endpoint_v3_UpstreamLocalityStats_total_successful_requests( - stat))); - // total_requests_in_progress - fields.emplace_back(absl::StrCat( - " total_requests_in_progress: ", - envoy_config_endpoint_v3_UpstreamLocalityStats_total_requests_in_progress( - stat))); - // total_error_requests - fields.emplace_back(absl::StrCat( - " total_error_requests: ", - envoy_config_endpoint_v3_UpstreamLocalityStats_total_error_requests( - stat))); - // total_issued_requests - fields.emplace_back(absl::StrCat( - " total_issued_requests: ", - envoy_config_endpoint_v3_UpstreamLocalityStats_total_issued_requests( - stat))); - fields.emplace_back(" }"); - } - // total_dropped_requests - fields.emplace_back(absl::StrCat( - " total_dropped_requests: ", - envoy_config_endpoint_v3_ClusterStats_total_dropped_requests( - cluster_stat))); - // dropped_requests - size_t num_drops; - const envoy_config_endpoint_v3_ClusterStats_DroppedRequests* const* - drops = envoy_config_endpoint_v3_ClusterStats_dropped_requests( - cluster_stat, &num_drops); - for (size_t j = 0; j < num_drops; ++j) { - const auto* drop = drops[j]; - fields.emplace_back(" dropped_requests {"); - // category - AddStringField( - " category", - envoy_config_endpoint_v3_ClusterStats_DroppedRequests_category( - drop), - &fields); - // dropped_count - fields.emplace_back(absl::StrCat( - " dropped_count: ", - envoy_config_endpoint_v3_ClusterStats_DroppedRequests_dropped_count( - drop))); - fields.emplace_back(" }"); - } - // load_report_interval - const auto* load_report_interval = - envoy_config_endpoint_v3_ClusterStats_load_report_interval( - cluster_stat); - if (load_report_interval != nullptr) { - fields.emplace_back(" load_report_interval {"); - fields.emplace_back(absl::StrCat( - " seconds: ", - google_protobuf_Duration_seconds(load_report_interval))); - fields.emplace_back( - absl::StrCat(" nanos: ", - google_protobuf_Duration_nanos(load_report_interval))); - fields.emplace_back(" }"); - } - fields.emplace_back("}"); - } + const upb_msgdef* msg_type = + envoy_service_load_stats_v3_LoadStatsRequest_getmsgdef(symtab); + char buf[10240]; + upb_text_encode(request, msg_type, nullptr, 0, buf, sizeof(buf)); gpr_log(GPR_DEBUG, "[xds_client %p] constructed LRS request: %s", client, - absl::StrJoin(fields, "\n").c_str()); + buf); } } @@ -2423,7 +2134,8 @@ grpc_slice SerializeLrsRequest( } // namespace -grpc_slice XdsApi::CreateLrsInitialRequest() { +grpc_slice XdsApi::CreateLrsInitialRequest( + const XdsBootstrap::XdsServer& server) { upb::Arena arena; // Create a request. envoy_service_load_stats_v3_LoadStatsRequest* request = @@ -2432,12 +2144,12 @@ grpc_slice XdsApi::CreateLrsInitialRequest() { envoy_config_core_v3_Node* node_msg = envoy_service_load_stats_v3_LoadStatsRequest_mutable_node(request, arena.ptr()); - PopulateNode(arena.ptr(), bootstrap_, build_version_, user_agent_name_, - node_msg); + PopulateNode(arena.ptr(), node_, server.ShouldUseV3(), build_version_, + user_agent_name_, node_msg); envoy_config_core_v3_Node_add_client_features( node_msg, upb_strview_makez("envoy.lrs.supports_send_all_clusters"), arena.ptr()); - MaybeLogLrsRequest(client_, tracer_, request, build_version_); + MaybeLogLrsRequest(client_, tracer_, symtab_.ptr(), request); return SerializeLrsRequest(request, arena.ptr()); } @@ -2549,7 +2261,7 @@ grpc_slice XdsApi::CreateLrsRequest( google_protobuf_Duration_set_seconds(load_report_interval, timespec.tv_sec); google_protobuf_Duration_set_nanos(load_report_interval, timespec.tv_nsec); } - MaybeLogLrsRequest(client_, tracer_, request, build_version_); + MaybeLogLrsRequest(client_, tracer_, symtab_.ptr(), request); return SerializeLrsRequest(request, arena.ptr()); } diff --git a/grpc/src/core/ext/xds/xds_api.h b/grpc/src/core/ext/xds/xds_api.h index 44e8a8b6..f2a67072 100644 --- a/grpc/src/core/ext/xds/xds_api.h +++ b/grpc/src/core/ext/xds/xds_api.h @@ -29,6 +29,8 @@ #include "absl/types/optional.h" #include "re2/re2.h" +#include "upb/def.hpp" + #include <grpc/slice_buffer.h> #include "src/core/ext/filters/client_channel/server_address.h" @@ -37,6 +39,11 @@ namespace grpc_core { +// TODO(yashykt): Check to see if xDS security is enabled. This will be +// removed once this feature is fully integration-tested and enabled by +// default. +bool XdsSecurityEnabled(); + class XdsClient; class XdsApi { @@ -46,6 +53,17 @@ class XdsApi { static const char* kCdsTypeUrl; static const char* kEdsTypeUrl; + struct Duration { + int64_t seconds = 0; + int32_t nanos = 0; + bool operator==(const Duration& other) const { + return (seconds == other.seconds && nanos == other.nanos); + } + std::string ToString() const { + return absl::StrFormat("Duration seconds: %ld, nanos %d", seconds, nanos); + } + }; + // TODO(donnadionne): When we can use absl::variant<>, consider using that // for: PathMatcher, HeaderMatcher, cluster_name and weighted_clusters struct Route { @@ -60,6 +78,7 @@ class XdsApi { PathMatcherType type; std::string string_matcher; std::unique_ptr<RE2> regex_matcher; + bool case_sensitive = true; PathMatcher() = default; PathMatcher(const PathMatcher& other); @@ -122,11 +141,17 @@ class XdsApi { std::string ToString() const; }; std::vector<ClusterWeight> weighted_clusters; + // Storing the timeout duration from route action: + // RouteAction.max_stream_duration.grpc_timeout_header_max or + // RouteAction.max_stream_duration.max_stream_duration if the former is + // not set. + absl::optional<Duration> max_stream_duration; bool operator==(const Route& other) const { return (matchers == other.matchers && cluster_name == other.cluster_name && - weighted_clusters == other.weighted_clusters); + weighted_clusters == other.weighted_clusters && + max_stream_duration == other.max_stream_duration); } std::string ToString() const; }; @@ -150,23 +175,40 @@ class XdsApi { VirtualHost* FindVirtualHostForDomain(const std::string& domain); }; - struct StringMatcher { + class StringMatcher { + public: enum class StringMatcherType { - EXACT, // value stored in string_matcher_field - PREFIX, // value stored in string_matcher_field - SUFFIX, // value stored in string_matcher_field - SAFE_REGEX, // use regex_match field - CONTAINS, // value stored in string_matcher_field + EXACT, // value stored in string_matcher_ field + PREFIX, // value stored in string_matcher_ field + SUFFIX, // value stored in string_matcher_ field + SAFE_REGEX, // pattern stored in regex_matcher_ field + CONTAINS, // value stored in string_matcher_ field }; - StringMatcherType type; - std::string string_matcher; - std::unique_ptr<RE2> regex_match; - bool ignore_case; StringMatcher() = default; StringMatcher(const StringMatcher& other); + StringMatcher(StringMatcherType type, const std::string& matcher, + bool ignore_case = false); StringMatcher& operator=(const StringMatcher& other); bool operator==(const StringMatcher& other) const; + + bool Match(absl::string_view value) const; + + std::string ToString() const; + + StringMatcherType type() const { return type_; } + + // Valid for EXACT, PREFIX, SUFFIX and CONTAINS + const std::string& string_matcher() const { return string_matcher_; } + + // Valid for SAFE_REGEX + RE2* regex_matcher() const { return regex_matcher_.get(); } + + private: + StringMatcherType type_ = StringMatcherType::EXACT; + std::string string_matcher_; + std::unique_ptr<RE2> regex_matcher_; + bool ignore_case_ = false; }; struct CommonTlsContext { @@ -176,20 +218,40 @@ class XdsApi { bool operator==(const CertificateValidationContext& other) const { return match_subject_alt_names == other.match_subject_alt_names; } + + std::string ToString() const; + bool Empty() const; + }; + + struct CertificateProviderInstance { + std::string instance_name; + std::string certificate_name; + + bool operator==(const CertificateProviderInstance& other) const { + return instance_name == other.instance_name && + certificate_name == other.certificate_name; + } + + std::string ToString() const; + bool Empty() const; }; struct CombinedCertificateValidationContext { CertificateValidationContext default_validation_context; - std::string validation_context_certificate_provider_instance; + CertificateProviderInstance + validation_context_certificate_provider_instance; bool operator==(const CombinedCertificateValidationContext& other) const { return default_validation_context == other.default_validation_context && validation_context_certificate_provider_instance == other.validation_context_certificate_provider_instance; } + + std::string ToString() const; + bool Empty() const; }; - std::string tls_certificate_certificate_provider_instance; + CertificateProviderInstance tls_certificate_certificate_provider_instance; CombinedCertificateValidationContext combined_validation_context; bool operator==(const CommonTlsContext& other) const { @@ -197,6 +259,9 @@ class XdsApi { other.tls_certificate_certificate_provider_instance && combined_validation_context == other.combined_validation_context; } + + std::string ToString() const; + bool Empty() const; }; // TODO(roth): When we can use absl::variant<>, consider using that @@ -204,13 +269,17 @@ class XdsApi { struct LdsUpdate { // The name to use in the RDS request. std::string route_config_name; + // Storing the Http Connection Manager Common Http Protocol Option + // max_stream_duration + Duration http_max_stream_duration; // The RouteConfiguration to use for this listener. // Present only if it is inlined in the LDS response. absl::optional<RdsUpdate> rds_update; bool operator==(const LdsUpdate& other) const { return route_config_name == other.route_config_name && - rds_update == other.rds_update; + rds_update == other.rds_update && + http_max_stream_duration == other.http_max_stream_duration; } }; @@ -240,6 +309,8 @@ class XdsApi { other.lrs_load_reporting_server_name && max_concurrent_requests == other.max_concurrent_requests; } + + std::string ToString() const; }; using CdsUpdateMap = std::map<std::string /*cluster_name*/, CdsUpdate>; @@ -339,11 +410,12 @@ class XdsApi { std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>, ClusterLoadReport>; - XdsApi(XdsClient* client, TraceFlag* tracer, const XdsBootstrap* bootstrap); + XdsApi(XdsClient* client, TraceFlag* tracer, const XdsBootstrap::Node* node); // Creates an ADS request. // Takes ownership of \a error. - grpc_slice CreateAdsRequest(const std::string& type_url, + grpc_slice CreateAdsRequest(const XdsBootstrap::XdsServer& server, + const std::string& type_url, const std::set<absl::string_view>& resource_names, const std::string& version, const std::string& nonce, grpc_error* error, @@ -370,7 +442,7 @@ class XdsApi { const std::set<absl::string_view>& expected_eds_service_names); // Creates an initial LRS request. - grpc_slice CreateLrsInitialRequest(); + grpc_slice CreateLrsInitialRequest(const XdsBootstrap::XdsServer& server); // Creates an LRS request sending a client-side load report. grpc_slice CreateLrsRequest(ClusterLoadReportMap cluster_load_report_map); @@ -386,8 +458,8 @@ class XdsApi { private: XdsClient* client_; TraceFlag* tracer_; - const bool use_v3_; - const XdsBootstrap* bootstrap_; // Do not own. + const XdsBootstrap::Node* node_; // Do not own. + upb::SymbolTable symtab_; const std::string build_version_; const std::string user_agent_name_; }; diff --git a/grpc/src/core/ext/xds/xds_bootstrap.cc b/grpc/src/core/ext/xds/xds_bootstrap.cc index 05c37031..e48d9827 100644 --- a/grpc/src/core/ext/xds/xds_bootstrap.cc +++ b/grpc/src/core/ext/xds/xds_bootstrap.cc @@ -28,17 +28,59 @@ #include "absl/strings/str_join.h" #include "absl/strings/string_view.h" +#include "src/core/ext/xds/certificate_provider_registry.h" +#include "src/core/ext/xds/xds_api.h" #include "src/core/lib/gpr/env.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/load_file.h" +#include "src/core/lib/security/credentials/credentials.h" +#include "src/core/lib/security/credentials/fake/fake_credentials.h" #include "src/core/lib/slice/slice_internal.h" namespace grpc_core { +// +// XdsChannelCredsRegistry +// + +bool XdsChannelCredsRegistry::IsSupported(const std::string& creds_type) { + return creds_type == "google_default" || creds_type == "insecure" || + creds_type == "fake"; +} + +bool XdsChannelCredsRegistry::IsValidConfig(const std::string& creds_type, + const Json& config) { + // Currently, none of the creds types actually take a config, but we + // ignore whatever might be specified in the bootstrap file for + // forward compatibility reasons. + return true; +} + +RefCountedPtr<grpc_channel_credentials> +XdsChannelCredsRegistry::MakeChannelCreds(const std::string& creds_type, + const Json& config) { + if (creds_type == "google_default") { + return grpc_google_default_credentials_create(nullptr); + } else if (creds_type == "insecure") { + return grpc_insecure_credentials_create(); + } else if (creds_type == "fake") { + return grpc_fake_transport_security_credentials_create(); + } + return nullptr; +} + +// +// XdsBootstrap::XdsServer +// + bool XdsBootstrap::XdsServer::ShouldUseV3() const { return server_features.find("xds_v3") != server_features.end(); } +// +// XdsBootstrap +// + namespace { std::string BootstrapString(const XdsBootstrap& bootstrap) { @@ -59,23 +101,34 @@ std::string BootstrapString(const XdsBootstrap& bootstrap) { bootstrap.node()->locality_region, bootstrap.node()->locality_zone, bootstrap.node()->locality_subzone, bootstrap.node()->metadata.Dump())); } - parts.push_back( - absl::StrFormat("servers=[\n" - " {\n" - " uri=\"%s\",\n" - " creds=[\n", - bootstrap.server().server_uri)); - for (const auto& creds : bootstrap.server().channel_creds) { - parts.push_back(absl::StrFormat(" {type=\"%s\", config=%s},\n", - creds.type, creds.config.Dump())); - } - parts.push_back(" ],\n"); + parts.push_back(absl::StrFormat( + "servers=[\n" + " {\n" + " uri=\"%s\",\n" + " creds_type=%s,\n", + bootstrap.server().server_uri, bootstrap.server().channel_creds_type)); + if (bootstrap.server().channel_creds_config.type() != Json::Type::JSON_NULL) { + parts.push_back( + absl::StrFormat(" creds_config=%s,", + bootstrap.server().channel_creds_config.Dump())); + } if (!bootstrap.server().server_features.empty()) { parts.push_back(absl::StrCat( " server_features=[", absl::StrJoin(bootstrap.server().server_features, ", "), "],\n")); } - parts.push_back(" }\n]"); + parts.push_back(" }\n],\n"); + parts.push_back("certificate_providers={\n"); + for (const auto& entry : bootstrap.certificate_providers()) { + parts.push_back( + absl::StrFormat(" %s={\n" + " plugin_name=%s\n" + " config=%s\n" + " },\n", + entry.first, entry.second.plugin_name, + entry.second.config->ToString())); + } + parts.push_back("}"); return absl::StrJoin(parts, ""); } @@ -152,6 +205,18 @@ XdsBootstrap::XdsBootstrap(Json json, grpc_error** error) { if (parse_error != GRPC_ERROR_NONE) error_list.push_back(parse_error); } } + if (XdsSecurityEnabled()) { + it = json.mutable_object()->find("certificate_providers"); + if (it != json.mutable_object()->end()) { + if (it->second.type() != Json::Type::OBJECT) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "\"certificate_providers\" field is not an object")); + } else { + grpc_error* parse_error = ParseCertificateProviders(&it->second); + if (parse_error != GRPC_ERROR_NONE) error_list.push_back(parse_error); + } + } + } *error = GRPC_ERROR_CREATE_FROM_VECTOR("errors parsing xds bootstrap file", &error_list); } @@ -187,14 +252,15 @@ grpc_error* XdsBootstrap::ParseXdsServer(Json* json, size_t idx) { server.server_uri = std::move(*it->second.mutable_string_value()); } it = json->mutable_object()->find("channel_creds"); - if (it != json->mutable_object()->end()) { - if (it->second.type() != Json::Type::ARRAY) { - error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "\"channel_creds\" field is not an array")); - } else { - grpc_error* parse_error = ParseChannelCredsArray(&it->second, &server); - if (parse_error != GRPC_ERROR_NONE) error_list.push_back(parse_error); - } + if (it == json->mutable_object()->end()) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "\"channel_creds\" field not present")); + } else if (it->second.type() != Json::Type::ARRAY) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "\"channel_creds\" field is not an array")); + } else { + grpc_error* parse_error = ParseChannelCredsArray(&it->second, &server); + if (parse_error != GRPC_ERROR_NONE) error_list.push_back(parse_error); } it = json->mutable_object()->find("server_features"); if (it != json->mutable_object()->end()) { @@ -230,6 +296,10 @@ grpc_error* XdsBootstrap::ParseChannelCredsArray(Json* json, if (parse_error != GRPC_ERROR_NONE) error_list.push_back(parse_error); } } + if (server->channel_creds_type.empty()) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "no known creds type found in \"channel_creds\"")); + } return GRPC_ERROR_CREATE_FROM_VECTOR("errors parsing \"channel_creds\" array", &error_list); } @@ -237,7 +307,7 @@ grpc_error* XdsBootstrap::ParseChannelCredsArray(Json* json, grpc_error* XdsBootstrap::ParseChannelCreds(Json* json, size_t idx, XdsServer* server) { std::vector<grpc_error*> error_list; - ChannelCreds channel_creds; + std::string type; auto it = json->mutable_object()->find("type"); if (it == json->mutable_object()->end()) { error_list.push_back( @@ -246,19 +316,28 @@ grpc_error* XdsBootstrap::ParseChannelCreds(Json* json, size_t idx, error_list.push_back( GRPC_ERROR_CREATE_FROM_STATIC_STRING("\"type\" field is not a string")); } else { - channel_creds.type = std::move(*it->second.mutable_string_value()); + type = std::move(*it->second.mutable_string_value()); } + Json config; it = json->mutable_object()->find("config"); if (it != json->mutable_object()->end()) { if (it->second.type() != Json::Type::OBJECT) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "\"config\" field is not an object")); } else { - channel_creds.config = std::move(it->second); + config = std::move(it->second); } } - if (!channel_creds.type.empty()) { - server->channel_creds.emplace_back(std::move(channel_creds)); + // Select the first channel creds type that we support. + if (server->channel_creds_type.empty() && + XdsChannelCredsRegistry::IsSupported(type)) { + if (!XdsChannelCredsRegistry::IsValidConfig(type, config)) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat("invalid config for channel creds type \"", type, "\"") + .c_str())); + } + server->channel_creds_type = std::move(type); + server->channel_creds_config = std::move(config); } // Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error // string is not static in this case. @@ -370,4 +449,72 @@ grpc_error* XdsBootstrap::ParseLocality(Json* json) { &error_list); } +grpc_error* XdsBootstrap::ParseCertificateProviders(Json* json) { + std::vector<grpc_error*> error_list; + for (auto& certificate_provider : *(json->mutable_object())) { + if (certificate_provider.second.type() != Json::Type::OBJECT) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat("element \"", certificate_provider.first, + "\" is not an object") + .c_str())); + } else { + grpc_error* parse_error = ParseCertificateProvider( + certificate_provider.first, &certificate_provider.second); + if (parse_error != GRPC_ERROR_NONE) error_list.push_back(parse_error); + } + } + return GRPC_ERROR_CREATE_FROM_VECTOR( + "errors parsing \"certificate_providers\" object", &error_list); +} + +grpc_error* XdsBootstrap::ParseCertificateProvider( + const std::string& instance_name, Json* certificate_provider_json) { + std::vector<grpc_error*> error_list; + auto it = certificate_provider_json->mutable_object()->find("plugin_name"); + if (it == certificate_provider_json->mutable_object()->end()) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "\"plugin_name\" field not present")); + } else if (it->second.type() != Json::Type::STRING) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "\"plugin_name\" field is not a string")); + } else { + std::string plugin_name = std::move(*(it->second.mutable_string_value())); + CertificateProviderFactory* factory = + CertificateProviderRegistry::LookupCertificateProviderFactory( + plugin_name); + if (factory != nullptr) { + RefCountedPtr<CertificateProviderFactory::Config> config; + it = certificate_provider_json->mutable_object()->find("config"); + if (it != certificate_provider_json->mutable_object()->end()) { + if (it->second.type() != Json::Type::OBJECT) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "\"config\" field is not an object")); + } else { + grpc_error* parse_error = GRPC_ERROR_NONE; + config = factory->CreateCertificateProviderConfig(it->second, + &parse_error); + if (parse_error != GRPC_ERROR_NONE) error_list.push_back(parse_error); + } + } else { + // "config" is an optional field, so create an empty JSON object. + grpc_error* parse_error = GRPC_ERROR_NONE; + config = factory->CreateCertificateProviderConfig(Json::Object(), + &parse_error); + if (parse_error != GRPC_ERROR_NONE) error_list.push_back(parse_error); + } + certificate_providers_.insert( + {instance_name, {std::move(plugin_name), std::move(config)}}); + } + } + // Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error + // string is not static in this case. + if (error_list.empty()) return GRPC_ERROR_NONE; + grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat("errors parsing element \"", instance_name, "\"").c_str()); + for (size_t i = 0; i < error_list.size(); ++i) { + error = grpc_error_add_child(error, error_list[i]); + } + return error; +} + } // namespace grpc_core diff --git a/grpc/src/core/ext/xds/xds_bootstrap.h b/grpc/src/core/ext/xds/xds_bootstrap.h index a43c8c0f..969d5d54 100644 --- a/grpc/src/core/ext/xds/xds_bootstrap.h +++ b/grpc/src/core/ext/xds/xds_bootstrap.h @@ -26,17 +26,27 @@ #include "absl/container/inlined_vector.h" -#include <grpc/impl/codegen/slice.h> +#include <grpc/slice.h> -#include "src/core/lib/gprpp/map.h" +#include "src/core/ext/xds/certificate_provider_store.h" #include "src/core/lib/gprpp/memory.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/json/json.h" +#include "src/core/lib/security/credentials/credentials.h" namespace grpc_core { class XdsClient; +class XdsChannelCredsRegistry { + public: + static bool IsSupported(const std::string& creds_type); + static bool IsValidConfig(const std::string& creds_type, const Json& config); + static RefCountedPtr<grpc_channel_credentials> MakeChannelCreds( + const std::string& creds_type, const Json& config); +}; + class XdsBootstrap { public: struct Node { @@ -48,14 +58,10 @@ class XdsBootstrap { Json metadata; }; - struct ChannelCreds { - std::string type; - Json config; - }; - struct XdsServer { std::string server_uri; - absl::InlinedVector<ChannelCreds, 1> channel_creds; + std::string channel_creds_type; + Json channel_creds_config; std::set<std::string> server_features; bool ShouldUseV3() const; @@ -75,6 +81,11 @@ class XdsBootstrap { const XdsServer& server() const { return servers_[0]; } const Node* node() const { return node_.get(); } + const CertificateProviderStore::PluginDefinitionMap& certificate_providers() + const { + return certificate_providers_; + } + private: grpc_error* ParseXdsServerList(Json* json); grpc_error* ParseXdsServer(Json* json, size_t idx); @@ -83,9 +94,13 @@ class XdsBootstrap { grpc_error* ParseServerFeaturesArray(Json* json, XdsServer* server); grpc_error* ParseNode(Json* json); grpc_error* ParseLocality(Json* json); + grpc_error* ParseCertificateProviders(Json* json); + grpc_error* ParseCertificateProvider(const std::string& instance_name, + Json* certificate_provider_json); absl::InlinedVector<XdsServer, 1> servers_; std::unique_ptr<Node> node_; + CertificateProviderStore::PluginDefinitionMap certificate_providers_; }; } // namespace grpc_core diff --git a/grpc/src/core/ext/xds/xds_certificate_provider.cc b/grpc/src/core/ext/xds/xds_certificate_provider.cc new file mode 100644 index 00000000..f285b6db --- /dev/null +++ b/grpc/src/core/ext/xds/xds_certificate_provider.cc @@ -0,0 +1,299 @@ +// +// +// Copyright 2020 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#include <grpc/support/port_platform.h> + +#include "src/core/ext/xds/xds_certificate_provider.h" + +#include "absl/functional/bind_front.h" +#include "absl/strings/str_cat.h" + +#include "src/core/lib/gpr/useful.h" + +namespace grpc_core { + +namespace { + +class RootCertificatesWatcher + : public grpc_tls_certificate_distributor::TlsCertificatesWatcherInterface { + public: + // Takes a ref to \a parent instead of a raw pointer since the watcher is + // owned by the root certificate distributor and not by \a parent. Note that + // presently, the watcher is immediately deleted when + // CancelTlsCertificatesWatch() is called, but that can potentially change in + // the future. + explicit RootCertificatesWatcher( + RefCountedPtr<grpc_tls_certificate_distributor> parent) + : parent_(std::move(parent)) {} + + void OnCertificatesChanged(absl::optional<absl::string_view> root_certs, + absl::optional<PemKeyCertPairList> + /* key_cert_pairs */) override { + if (root_certs.has_value()) { + parent_->SetKeyMaterials("", std::string(root_certs.value()), + absl::nullopt); + } + } + + void OnError(grpc_error* root_cert_error, + grpc_error* identity_cert_error) override { + if (root_cert_error != GRPC_ERROR_NONE) { + parent_->SetErrorForCert("", root_cert_error /* pass the ref */, + absl::nullopt); + } + GRPC_ERROR_UNREF(identity_cert_error); + } + + private: + RefCountedPtr<grpc_tls_certificate_distributor> parent_; +}; + +class IdentityCertificatesWatcher + : public grpc_tls_certificate_distributor::TlsCertificatesWatcherInterface { + public: + // Takes a ref to \a parent instead of a raw pointer since the watcher is + // owned by the root certificate distributor and not by \a parent. Note that + // presently, the watcher is immediately deleted when + // CancelTlsCertificatesWatch() is called, but that can potentially change in + // the future. + explicit IdentityCertificatesWatcher( + RefCountedPtr<grpc_tls_certificate_distributor> parent) + : parent_(std::move(parent)) {} + + void OnCertificatesChanged( + absl::optional<absl::string_view> /* root_certs */, + absl::optional<PemKeyCertPairList> key_cert_pairs) override { + if (key_cert_pairs.has_value()) { + parent_->SetKeyMaterials("", absl::nullopt, key_cert_pairs); + } + } + + void OnError(grpc_error* root_cert_error, + grpc_error* identity_cert_error) override { + if (identity_cert_error != GRPC_ERROR_NONE) { + parent_->SetErrorForCert("", absl::nullopt, + identity_cert_error /* pass the ref */); + } + GRPC_ERROR_UNREF(root_cert_error); + } + + private: + RefCountedPtr<grpc_tls_certificate_distributor> parent_; +}; + +} // namespace + +XdsCertificateProvider::XdsCertificateProvider( + absl::string_view root_cert_name, + RefCountedPtr<grpc_tls_certificate_distributor> root_cert_distributor, + absl::string_view identity_cert_name, + RefCountedPtr<grpc_tls_certificate_distributor> identity_cert_distributor, + std::vector<XdsApi::StringMatcher> san_matchers) + : root_cert_name_(root_cert_name), + identity_cert_name_(identity_cert_name), + root_cert_distributor_(std::move(root_cert_distributor)), + identity_cert_distributor_(std::move(identity_cert_distributor)), + san_matchers_(std::move(san_matchers)), + distributor_(MakeRefCounted<grpc_tls_certificate_distributor>()) { + distributor_->SetWatchStatusCallback( + absl::bind_front(&XdsCertificateProvider::WatchStatusCallback, this)); +} + +XdsCertificateProvider::~XdsCertificateProvider() { + distributor_->SetWatchStatusCallback(nullptr); +} + +void XdsCertificateProvider::UpdateRootCertNameAndDistributor( + absl::string_view root_cert_name, + RefCountedPtr<grpc_tls_certificate_distributor> root_cert_distributor) { + MutexLock lock(&mu_); + if (root_cert_name_ == root_cert_name && + root_cert_distributor_ == root_cert_distributor) { + return; + } + root_cert_name_ = std::string(root_cert_name); + if (watching_root_certs_) { + // The root certificates are being watched. Swap out the watcher. + if (root_cert_distributor_ != nullptr) { + root_cert_distributor_->CancelTlsCertificatesWatch(root_cert_watcher_); + } + if (root_cert_distributor != nullptr) { + UpdateRootCertWatcher(root_cert_distributor.get()); + } else { + root_cert_watcher_ = nullptr; + distributor_->SetErrorForCert( + "", + GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "No certificate provider available for root certificates"), + absl::nullopt); + } + } + // Swap out the root certificate distributor + root_cert_distributor_ = std::move(root_cert_distributor); +} + +void XdsCertificateProvider::UpdateIdentityCertNameAndDistributor( + absl::string_view identity_cert_name, + RefCountedPtr<grpc_tls_certificate_distributor> identity_cert_distributor) { + MutexLock lock(&mu_); + if (identity_cert_name_ == identity_cert_name && + identity_cert_distributor_ == identity_cert_distributor) { + return; + } + identity_cert_name_ = std::string(identity_cert_name); + if (watching_identity_certs_) { + // The identity certificates are being watched. Swap out the watcher. + if (identity_cert_distributor_ != nullptr) { + identity_cert_distributor_->CancelTlsCertificatesWatch( + identity_cert_watcher_); + } + if (identity_cert_distributor != nullptr) { + UpdateIdentityCertWatcher(identity_cert_distributor.get()); + } else { + identity_cert_watcher_ = nullptr; + distributor_->SetErrorForCert( + "", absl::nullopt, + GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "No certificate provider available for identity certificates")); + } + } + // Swap out the identity certificate distributor + identity_cert_distributor_ = std::move(identity_cert_distributor); +} + +void XdsCertificateProvider::UpdateSubjectAlternativeNameMatchers( + std::vector<XdsApi::StringMatcher> matchers) { + MutexLock lock(&san_matchers_mu_); + san_matchers_ = std::move(matchers); +} + +void XdsCertificateProvider::WatchStatusCallback(std::string cert_name, + bool root_being_watched, + bool identity_being_watched) { + // We aren't specially handling the case where root_cert_distributor is same + // as identity_cert_distributor. Always using two separate watchers + // irrespective of the fact results in a straightforward design, and using a + // single watcher does not seem to provide any benefit other than cutting down + // on the number of callbacks. + MutexLock lock(&mu_); + if (!cert_name.empty()) { + grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat("Illegal certificate name: \'", cert_name, + "\'. Should be empty.") + .c_str()); + distributor_->SetErrorForCert(cert_name, GRPC_ERROR_REF(error), + GRPC_ERROR_REF(error)); + GRPC_ERROR_UNREF(error); + return; + } + if (root_being_watched && !watching_root_certs_) { + // We need to start watching root certs. + watching_root_certs_ = true; + if (root_cert_distributor_ == nullptr) { + distributor_->SetErrorForCert( + "", + GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "No certificate provider available for root certificates"), + absl::nullopt); + } else { + UpdateRootCertWatcher(root_cert_distributor_.get()); + } + } else if (!root_being_watched && watching_root_certs_) { + // We need to cancel root certs watch. + watching_root_certs_ = false; + if (root_cert_distributor_ != nullptr) { + root_cert_distributor_->CancelTlsCertificatesWatch(root_cert_watcher_); + root_cert_watcher_ = nullptr; + } + GPR_ASSERT(root_cert_watcher_ == nullptr); + } + if (identity_being_watched && !watching_identity_certs_) { + watching_identity_certs_ = true; + if (identity_cert_distributor_ == nullptr) { + distributor_->SetErrorForCert( + "", absl::nullopt, + GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "No certificate provider available for identity certificates")); + } else { + UpdateIdentityCertWatcher(identity_cert_distributor_.get()); + } + } else if (!identity_being_watched && watching_identity_certs_) { + watching_identity_certs_ = false; + if (identity_cert_distributor_ != nullptr) { + identity_cert_distributor_->CancelTlsCertificatesWatch( + identity_cert_watcher_); + identity_cert_watcher_ = nullptr; + } + GPR_ASSERT(identity_cert_watcher_ == nullptr); + } +} + +void XdsCertificateProvider::UpdateRootCertWatcher( + grpc_tls_certificate_distributor* root_cert_distributor) { + auto watcher = absl::make_unique<RootCertificatesWatcher>(distributor()); + root_cert_watcher_ = watcher.get(); + root_cert_distributor->WatchTlsCertificates(std::move(watcher), + root_cert_name_, absl::nullopt); +} + +void XdsCertificateProvider::UpdateIdentityCertWatcher( + grpc_tls_certificate_distributor* identity_cert_distributor) { + auto watcher = absl::make_unique<IdentityCertificatesWatcher>(distributor()); + identity_cert_watcher_ = watcher.get(); + identity_cert_distributor->WatchTlsCertificates( + std::move(watcher), absl::nullopt, identity_cert_name_); +} + +namespace { + +void* XdsCertificateProviderArgCopy(void* p) { + XdsCertificateProvider* xds_certificate_provider = + static_cast<XdsCertificateProvider*>(p); + return xds_certificate_provider->Ref().release(); +} + +void XdsCertificateProviderArgDestroy(void* p) { + XdsCertificateProvider* xds_certificate_provider = + static_cast<XdsCertificateProvider*>(p); + xds_certificate_provider->Unref(); +} + +int XdsCertificateProviderArgCmp(void* p, void* q) { return GPR_ICMP(p, q); } + +const grpc_arg_pointer_vtable kChannelArgVtable = { + XdsCertificateProviderArgCopy, XdsCertificateProviderArgDestroy, + XdsCertificateProviderArgCmp}; + +} // namespace + +grpc_arg XdsCertificateProvider::MakeChannelArg() const { + return grpc_channel_arg_pointer_create( + const_cast<char*>(GRPC_ARG_XDS_CERTIFICATE_PROVIDER), + const_cast<XdsCertificateProvider*>(this), &kChannelArgVtable); +} + +RefCountedPtr<XdsCertificateProvider> +XdsCertificateProvider::GetFromChannelArgs(const grpc_channel_args* args) { + XdsCertificateProvider* xds_certificate_provider = + grpc_channel_args_find_pointer<XdsCertificateProvider>( + args, GRPC_ARG_XDS_CERTIFICATE_PROVIDER); + return xds_certificate_provider != nullptr ? xds_certificate_provider->Ref() + : nullptr; +} + +} // namespace grpc_core diff --git a/grpc/src/core/ext/xds/xds_certificate_provider.h b/grpc/src/core/ext/xds/xds_certificate_provider.h new file mode 100644 index 00000000..4d13423a --- /dev/null +++ b/grpc/src/core/ext/xds/xds_certificate_provider.h @@ -0,0 +1,112 @@ +// +// +// Copyright 2020 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#ifndef GRPC_CORE_EXT_XDS_XDS_CERTIFICATE_PROVIDER_H +#define GRPC_CORE_EXT_XDS_XDS_CERTIFICATE_PROVIDER_H + +#include <grpc/support/port_platform.h> + +#include "src/core/ext/xds/xds_api.h" +#include "src/core/lib/security/credentials/tls/grpc_tls_certificate_provider.h" + +#define GRPC_ARG_XDS_CERTIFICATE_PROVIDER \ + "grpc.internal.xds_certificate_provider" + +namespace grpc_core { + +class XdsCertificateProvider : public grpc_tls_certificate_provider { + public: + XdsCertificateProvider( + absl::string_view root_cert_name, + RefCountedPtr<grpc_tls_certificate_distributor> root_cert_distributor, + absl::string_view identity_cert_name, + RefCountedPtr<grpc_tls_certificate_distributor> identity_cert_distributor, + std::vector<XdsApi::StringMatcher> san_matchers); + + ~XdsCertificateProvider() override; + + void UpdateRootCertNameAndDistributor( + absl::string_view root_cert_name, + RefCountedPtr<grpc_tls_certificate_distributor> root_cert_distributor); + void UpdateIdentityCertNameAndDistributor( + absl::string_view identity_cert_name, + RefCountedPtr<grpc_tls_certificate_distributor> + identity_cert_distributor); + void UpdateSubjectAlternativeNameMatchers( + std::vector<XdsApi::StringMatcher> matchers); + + grpc_core::RefCountedPtr<grpc_tls_certificate_distributor> distributor() + const override { + return distributor_; + } + + bool ProvidesRootCerts() { + MutexLock lock(&mu_); + return root_cert_distributor_ != nullptr; + } + + bool ProvidesIdentityCerts() { + MutexLock lock(&mu_); + return identity_cert_distributor_ != nullptr; + } + + std::vector<XdsApi::StringMatcher> subject_alternative_name_matchers() { + MutexLock lock(&san_matchers_mu_); + return san_matchers_; + } + + grpc_arg MakeChannelArg() const; + + static RefCountedPtr<XdsCertificateProvider> GetFromChannelArgs( + const grpc_channel_args* args); + + private: + void WatchStatusCallback(std::string cert_name, bool root_being_watched, + bool identity_being_watched); + void UpdateRootCertWatcher( + grpc_tls_certificate_distributor* root_cert_distributor); + void UpdateIdentityCertWatcher( + grpc_tls_certificate_distributor* identity_cert_distributor); + + Mutex mu_; + // Use a separate mutex for san_matchers_ to avoid deadlocks since + // san_matchers_ needs to be accessed when a handshake is being done and we + // run into a possible deadlock scenario if using the same mutex. The mutex + // deadlock cycle is formed as - + // WatchStatusCallback() -> SetKeyMaterials() -> + // TlsChannelSecurityConnector::TlsChannelCertificateWatcher::OnCertificatesChanged() + // -> HandshakeManager::Add() -> SecurityHandshaker::DoHandshake() -> + // subject_alternative_names_matchers() + Mutex san_matchers_mu_; + bool watching_root_certs_ = false; + bool watching_identity_certs_ = false; + std::string root_cert_name_; + std::string identity_cert_name_; + RefCountedPtr<grpc_tls_certificate_distributor> root_cert_distributor_; + RefCountedPtr<grpc_tls_certificate_distributor> identity_cert_distributor_; + std::vector<XdsApi::StringMatcher> san_matchers_; + RefCountedPtr<grpc_tls_certificate_distributor> distributor_; + grpc_tls_certificate_distributor::TlsCertificatesWatcherInterface* + root_cert_watcher_ = nullptr; + grpc_tls_certificate_distributor::TlsCertificatesWatcherInterface* + identity_cert_watcher_ = nullptr; +}; + +} // namespace grpc_core + +#endif // GRPC_CORE_EXT_XDS_XDS_CERTIFICATE_PROVIDER_H diff --git a/grpc/src/core/ext/xds/xds_client.cc b/grpc/src/core/ext/xds/xds_client.cc index 6d448641..b31bbbdc 100644 --- a/grpc/src/core/ext/xds/xds_client.cc +++ b/grpc/src/core/ext/xds/xds_client.cc @@ -42,7 +42,6 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/gpr/string.h" -#include "src/core/lib/gprpp/map.h" #include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" @@ -50,8 +49,6 @@ #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/timer.h" -#include "src/core/lib/security/credentials/credentials.h" -#include "src/core/lib/security/credentials/fake/fake_credentials.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/surface/call.h" @@ -68,6 +65,7 @@ namespace grpc_core { TraceFlag grpc_xds_client_trace(false, "xds_client"); +TraceFlag grpc_xds_client_refcount_trace(false, "xds_client_refcount"); namespace { @@ -143,8 +141,11 @@ class XdsClient::ChannelState::AdsCallState private: class ResourceState : public InternallyRefCounted<ResourceState> { public: - ResourceState(const std::string& type_url, const std::string& name) - : type_url_(type_url), name_(name) { + ResourceState(const std::string& type_url, const std::string& name, + bool sent_initial_request) + : type_url_(type_url), + name_(name), + sent_initial_request_(sent_initial_request) { GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this, grpc_schedule_on_exec_ctx); } @@ -155,8 +156,8 @@ class XdsClient::ChannelState::AdsCallState } void Start(RefCountedPtr<AdsCallState> ads_calld) { - if (sent_) return; - sent_ = true; + if (sent_initial_request_) return; + sent_initial_request_ = true; ads_calld_ = std::move(ads_calld); Ref(DEBUG_LOCATION, "timer").release(); timer_pending_ = true; @@ -229,7 +230,7 @@ class XdsClient::ChannelState::AdsCallState const std::string name_; RefCountedPtr<AdsCallState> ads_calld_; - bool sent_ = false; + bool sent_initial_request_; bool timer_pending_ = false; grpc_timer timer_; grpc_closure timer_callback_; @@ -238,8 +239,7 @@ class XdsClient::ChannelState::AdsCallState struct ResourceTypeState { ~ResourceTypeState() { GRPC_ERROR_UNREF(error); } - // Version, nonce, and error for this resource type. - std::string version; + // Nonce and error for this resource type. std::string nonce; grpc_error* error = GRPC_ERROR_NONE; @@ -336,7 +336,7 @@ class XdsClient::ChannelState::LrsCallState void ScheduleNextReportLocked(); static void OnNextReportTimer(void* arg, grpc_error* error); bool OnNextReportTimerLocked(grpc_error* error); - void SendReportLocked(); + bool SendReportLocked(); static void OnReportDone(void* arg, grpc_error* error); bool OnReportDoneLocked(grpc_error* error); @@ -431,11 +431,45 @@ class XdsClient::ChannelState::StateWatcher // XdsClient::ChannelState // +namespace { + +grpc_channel* CreateXdsChannel(const XdsBootstrap::XdsServer& server) { + // Build channel args. + absl::InlinedVector<grpc_arg, 2> args_to_add = { + grpc_channel_arg_integer_create( + const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS), + 5 * 60 * GPR_MS_PER_SEC), + grpc_channel_arg_integer_create( + const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1), + }; + grpc_channel_args* new_args = grpc_channel_args_copy_and_add( + g_channel_args, args_to_add.data(), args_to_add.size()); + // Create channel creds. + RefCountedPtr<grpc_channel_credentials> channel_creds = + XdsChannelCredsRegistry::MakeChannelCreds(server.channel_creds_type, + server.channel_creds_config); + // Create channel. + grpc_channel* channel = grpc_secure_channel_create( + channel_creds.get(), server.server_uri.c_str(), new_args, nullptr); + grpc_channel_args_destroy(new_args); + return channel; +} + +} // namespace + XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client, - grpc_channel* channel) - : InternallyRefCounted<ChannelState>(&grpc_xds_client_trace), + const XdsBootstrap::XdsServer& server) + : InternallyRefCounted<ChannelState>( + GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) + ? "ChannelState" + : nullptr), xds_client_(std::move(xds_client)), - channel_(channel) { + server_(server) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s", + xds_client_.get(), server.server_uri.c_str()); + } + channel_ = CreateXdsChannel(server); GPR_ASSERT(channel_ != nullptr); StartConnectivityWatchLocked(); } @@ -634,7 +668,10 @@ void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimerLocked( XdsClient::ChannelState::AdsCallState::AdsCallState( RefCountedPtr<RetryableCall<AdsCallState>> parent) - : InternallyRefCounted<AdsCallState>(&grpc_xds_client_trace), + : InternallyRefCounted<AdsCallState>( + GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) + ? "AdsCallState" + : nullptr), parent_(std::move(parent)) { // Init the ADS call. Note that the call will progress every time there's // activity in xds_client()->interested_parties_, which is comprised of @@ -642,7 +679,7 @@ XdsClient::ChannelState::AdsCallState::AdsCallState( GPR_ASSERT(xds_client() != nullptr); // Create a call with the specified method name. const auto& method = - xds_client()->bootstrap_->server().ShouldUseV3() + chand()->server_.ShouldUseV3() ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V3_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V2_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES; call_ = grpc_channel_create_pollset_set_call( @@ -672,8 +709,8 @@ XdsClient::ChannelState::AdsCallState::AdsCallState( GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET; op->reserved = nullptr; op++; - call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops), - nullptr); + call_error = grpc_call_start_batch_and_execute( + call_, ops, static_cast<size_t>(op - ops), nullptr); GPR_ASSERT(GRPC_CALL_OK == call_error); // Op: send request message. GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this, @@ -707,8 +744,8 @@ XdsClient::ChannelState::AdsCallState::AdsCallState( Ref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked").release(); GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this, grpc_schedule_on_exec_ctx); - call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops), - &on_response_received_); + call_error = grpc_call_start_batch_and_execute( + call_, ops, static_cast<size_t>(op - ops), &on_response_received_); GPR_ASSERT(GRPC_CALL_OK == call_error); // Op: recv server status. op = ops; @@ -724,8 +761,8 @@ XdsClient::ChannelState::AdsCallState::AdsCallState( // unreffed. GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this, grpc_schedule_on_exec_ctx); - call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops), - &on_status_received_); + call_error = grpc_call_start_batch_and_execute( + call_, ops, static_cast<size_t>(op - ops), &on_status_received_); GPR_ASSERT(GRPC_CALL_OK == call_error); } @@ -763,7 +800,8 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked( std::set<absl::string_view> resource_names = ResourceNamesForRequest(type_url); request_payload_slice = xds_client()->api_.CreateAdsRequest( - type_url, resource_names, state.version, state.nonce, + chand()->server_, type_url, resource_names, + xds_client()->resource_version_map_[type_url], state.nonce, GRPC_ERROR_REF(state.error), !sent_initial_message_); if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl && type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) { @@ -774,7 +812,8 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked( gpr_log(GPR_INFO, "[xds_client %p] sending ADS request: type=%s version=%s nonce=%s " "error=%s resources=%s", - xds_client(), type_url.c_str(), state.version.c_str(), + xds_client(), type_url.c_str(), + xds_client()->resource_version_map_[type_url].c_str(), state.nonce.c_str(), grpc_error_string(state.error), absl::StrJoin(resource_names, " ").c_str()); } @@ -806,7 +845,8 @@ void XdsClient::ChannelState::AdsCallState::Subscribe( const std::string& type_url, const std::string& name) { auto& state = state_map_[type_url].subscribed_resources[name]; if (state == nullptr) { - state = MakeOrphanable<ResourceState>(type_url, name); + state = MakeOrphanable<ResourceState>( + type_url, name, !xds_client()->resource_version_map_[type_url].empty()); SendMessageLocked(type_url); } } @@ -967,13 +1007,8 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate( auto& state = cds_state.subscribed_resources[cluster_name]; if (state != nullptr) state->Finish(); if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, - "[xds_client %p] cluster=%s: eds_service_name=%s, " - "lrs_load_reporting_server_name=%s", - xds_client(), cluster_name, cds_update.eds_service_name.c_str(), - cds_update.lrs_load_reporting_server_name.has_value() - ? cds_update.lrs_load_reporting_server_name.value().c_str() - : "(N/A)"); + gpr_log(GPR_INFO, "[xds_client %p] cluster=%s: %s", xds_client(), + cluster_name, cds_update.ToString().c_str()); } // Record the EDS resource names seen. eds_resource_names_seen.insert(cds_update.eds_service_name.empty() @@ -1170,7 +1205,8 @@ bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() { } else if (result.type_url == XdsApi::kEdsTypeUrl) { AcceptEdsUpdate(std::move(result.eds_update_map)); } - state.version = std::move(result.version); + xds_client()->resource_version_map_[result.type_url] = + std::move(result.version); // ACK the update. SendMessageLocked(result.type_url); // Start load reporting if needed. @@ -1287,8 +1323,7 @@ bool XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked( GRPC_ERROR_UNREF(error); return true; } - SendReportLocked(); - return false; + return SendReportLocked(); } namespace { @@ -1307,7 +1342,7 @@ bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) { } // namespace -void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() { +bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() { // Construct snapshot from all reported stats. XdsApi::ClusterLoadReportMap snapshot = xds_client()->BuildLoadReportSnapshotLocked(parent_->send_all_clusters_, @@ -1317,8 +1352,12 @@ void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() { const bool old_val = last_report_counters_were_zero_; last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot); if (old_val && last_report_counters_were_zero_) { + if (xds_client()->load_report_map_.empty()) { + parent_->chand()->StopLrsCall(); + return true; + } ScheduleNextReportLocked(); - return; + return false; } // Create a request that contains the snapshot. grpc_slice request_payload_slice = @@ -1339,6 +1378,7 @@ void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() { xds_client(), this, call_error); GPR_ASSERT(GRPC_CALL_OK == call_error); } + return false; } void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone( @@ -1381,14 +1421,17 @@ bool XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked( XdsClient::ChannelState::LrsCallState::LrsCallState( RefCountedPtr<RetryableCall<LrsCallState>> parent) - : InternallyRefCounted<LrsCallState>(&grpc_xds_client_trace), + : InternallyRefCounted<LrsCallState>( + GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) + ? "LrsCallState" + : nullptr), parent_(std::move(parent)) { // Init the LRS call. Note that the call will progress every time there's // activity in xds_client()->interested_parties_, which is comprised of // the polling entities from client_channel. GPR_ASSERT(xds_client() != nullptr); const auto& method = - xds_client()->bootstrap_->server().ShouldUseV3() + chand()->server_.ShouldUseV3() ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V3_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V2_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS; call_ = grpc_channel_create_pollset_set_call( @@ -1398,7 +1441,7 @@ XdsClient::ChannelState::LrsCallState::LrsCallState( GPR_ASSERT(call_ != nullptr); // Init the request payload. grpc_slice request_payload_slice = - xds_client()->api_.CreateLrsInitialRequest(); + xds_client()->api_.CreateLrsInitialRequest(chand()->server_); send_message_payload_ = grpc_raw_byte_buffer_create(&request_payload_slice, 1); grpc_slice_unref_internal(request_payload_slice); @@ -1434,8 +1477,8 @@ XdsClient::ChannelState::LrsCallState::LrsCallState( Ref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked").release(); GRPC_CLOSURE_INIT(&on_initial_request_sent_, OnInitialRequestSent, this, grpc_schedule_on_exec_ctx); - call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops), - &on_initial_request_sent_); + call_error = grpc_call_start_batch_and_execute( + call_, ops, static_cast<size_t>(op - ops), &on_initial_request_sent_); GPR_ASSERT(GRPC_CALL_OK == call_error); // Op: recv initial metadata. op = ops; @@ -1454,8 +1497,8 @@ XdsClient::ChannelState::LrsCallState::LrsCallState( Ref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked").release(); GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this, grpc_schedule_on_exec_ctx); - call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops), - &on_response_received_); + call_error = grpc_call_start_batch_and_execute( + call_, ops, static_cast<size_t>(op - ops), &on_response_received_); GPR_ASSERT(GRPC_CALL_OK == call_error); // Op: recv server status. op = ops; @@ -1471,8 +1514,8 @@ XdsClient::ChannelState::LrsCallState::LrsCallState( // unreffed. GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this, grpc_schedule_on_exec_ctx); - call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops), - &on_status_received_); + call_error = grpc_call_start_batch_and_execute( + call_, ops, static_cast<size_t>(op - ops), &on_status_received_); GPR_ASSERT(GRPC_CALL_OK == call_error); } @@ -1689,56 +1732,22 @@ grpc_millis GetRequestTimeout() { {15000, 0, INT_MAX}); } -grpc_channel* CreateXdsChannel(const XdsBootstrap& bootstrap, - grpc_error** error) { - // Build channel args. - absl::InlinedVector<grpc_arg, 2> args_to_add = { - grpc_channel_arg_integer_create( - const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS), - 5 * 60 * GPR_MS_PER_SEC), - grpc_channel_arg_integer_create( - const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1), - }; - grpc_channel_args* new_args = grpc_channel_args_copy_and_add( - g_channel_args, args_to_add.data(), args_to_add.size()); - // Find credentials and create channel. - RefCountedPtr<grpc_channel_credentials> creds; - for (const auto& channel_creds : bootstrap.server().channel_creds) { - if (channel_creds.type == "google_default") { - creds.reset(grpc_google_default_credentials_create(nullptr)); - break; - } - if (channel_creds.type == "insecure") { - grpc_channel* channel = grpc_insecure_channel_create( - bootstrap.server().server_uri.c_str(), new_args, nullptr); - grpc_channel_args_destroy(new_args); - return channel; - } - if (channel_creds.type == "fake") { - creds.reset(grpc_fake_transport_security_credentials_create()); - break; - } - } - if (creds == nullptr) { - *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "no supported credential types found"); - return nullptr; - } - grpc_channel* channel = grpc_secure_channel_create( - creds.get(), bootstrap.server().server_uri.c_str(), new_args, nullptr); - grpc_channel_args_destroy(new_args); - return channel; -} - } // namespace XdsClient::XdsClient(grpc_error** error) - : DualRefCounted<XdsClient>(&grpc_xds_client_trace), + : DualRefCounted<XdsClient>( + GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) ? "XdsClient" + : nullptr), request_timeout_(GetRequestTimeout()), interested_parties_(grpc_pollset_set_create()), bootstrap_( XdsBootstrap::ReadFromFile(this, &grpc_xds_client_trace, error)), - api_(this, &grpc_xds_client_trace, bootstrap_.get()) { + certificate_provider_store_(MakeOrphanable<CertificateProviderStore>( + bootstrap_ == nullptr + ? CertificateProviderStore::PluginDefinitionMap() + : bootstrap_->certificate_providers())), + api_(this, &grpc_xds_client_trace, + bootstrap_ == nullptr ? nullptr : bootstrap_->node()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this); } @@ -1747,19 +1756,9 @@ XdsClient::XdsClient(grpc_error** error) this, grpc_error_string(*error)); return; } - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s", this, - bootstrap_->server().server_uri.c_str()); - } - grpc_channel* channel = CreateXdsChannel(*bootstrap_, error); - if (*error != GRPC_ERROR_NONE) { - gpr_log(GPR_ERROR, "[xds_client %p] failed to create xds channel: %s", this, - grpc_error_string(*error)); - return; - } // Create ChannelState object. chand_ = MakeOrphanable<ChannelState>( - WeakRef(DEBUG_LOCATION, "XdsClient+ChannelState"), channel); + WeakRef(DEBUG_LOCATION, "XdsClient+ChannelState"), bootstrap_->server()); } XdsClient::~XdsClient() { @@ -1982,10 +1981,22 @@ RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats( auto it = load_report_map_ .emplace(std::make_pair(std::move(key), LoadReportState())) .first; - auto cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>( - Ref(DEBUG_LOCATION, "DropStats"), lrs_server, - it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/); - it->second.drop_stats.insert(cluster_drop_stats.get()); + LoadReportState& load_report_state = it->second; + RefCountedPtr<XdsClusterDropStats> cluster_drop_stats; + if (load_report_state.drop_stats != nullptr) { + cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero(); + } + if (cluster_drop_stats == nullptr) { + if (load_report_state.drop_stats != nullptr) { + load_report_state.deleted_drop_stats += + load_report_state.drop_stats->GetSnapshotAndReset(); + } + cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>( + Ref(DEBUG_LOCATION, "DropStats"), lrs_server, + it->first.first /*cluster_name*/, + it->first.second /*eds_service_name*/); + load_report_state.drop_stats = cluster_drop_stats.get(); + } chand_->MaybeStartLrsCall(); return cluster_drop_stats; } @@ -1995,19 +2006,18 @@ void XdsClient::RemoveClusterDropStats( absl::string_view eds_service_name, XdsClusterDropStats* cluster_drop_stats) { MutexLock lock(&mu_); - auto load_report_it = load_report_map_.find( - std::make_pair(std::string(cluster_name), std::string(eds_service_name))); - if (load_report_it == load_report_map_.end()) return; - LoadReportState& load_report_state = load_report_it->second; // TODO(roth): When we add support for direct federation, use the // server name specified in lrs_server. - auto it = load_report_state.drop_stats.find(cluster_drop_stats); - if (it != load_report_state.drop_stats.end()) { - // Record final drop stats in deleted_drop_stats, which will be + auto it = load_report_map_.find( + std::make_pair(std::string(cluster_name), std::string(eds_service_name))); + if (it == load_report_map_.end()) return; + LoadReportState& load_report_state = it->second; + if (load_report_state.drop_stats == cluster_drop_stats) { + // Record final snapshot in deleted_drop_stats, which will be // added to the next load report. - auto dropped_requests = cluster_drop_stats->GetSnapshotAndReset(); - load_report_state.deleted_drop_stats += dropped_requests; - load_report_state.drop_stats.erase(it); + load_report_state.deleted_drop_stats += + load_report_state.drop_stats->GetSnapshotAndReset(); + load_report_state.drop_stats = nullptr; } } @@ -2026,12 +2036,24 @@ RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats( auto it = load_report_map_ .emplace(std::make_pair(std::move(key), LoadReportState())) .first; - auto cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>( - Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server, - it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/, - locality); - it->second.locality_stats[std::move(locality)].locality_stats.insert( - cluster_locality_stats.get()); + LoadReportState& load_report_state = it->second; + LoadReportState::LocalityState& locality_state = + load_report_state.locality_stats[locality]; + RefCountedPtr<XdsClusterLocalityStats> cluster_locality_stats; + if (locality_state.locality_stats != nullptr) { + cluster_locality_stats = locality_state.locality_stats->RefIfNonZero(); + } + if (cluster_locality_stats == nullptr) { + if (locality_state.locality_stats != nullptr) { + locality_state.deleted_locality_stats += + locality_state.locality_stats->GetSnapshotAndReset(); + } + cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>( + Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server, + it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/, + std::move(locality)); + locality_state.locality_stats = cluster_locality_stats.get(); + } chand_->MaybeStartLrsCall(); return cluster_locality_stats; } @@ -2042,22 +2064,21 @@ void XdsClient::RemoveClusterLocalityStats( const RefCountedPtr<XdsLocalityName>& locality, XdsClusterLocalityStats* cluster_locality_stats) { MutexLock lock(&mu_); - auto load_report_it = load_report_map_.find( - std::make_pair(std::string(cluster_name), std::string(eds_service_name))); - if (load_report_it == load_report_map_.end()) return; - LoadReportState& load_report_state = load_report_it->second; // TODO(roth): When we add support for direct federation, use the // server name specified in lrs_server. + auto it = load_report_map_.find( + std::make_pair(std::string(cluster_name), std::string(eds_service_name))); + if (it == load_report_map_.end()) return; + LoadReportState& load_report_state = it->second; auto locality_it = load_report_state.locality_stats.find(locality); if (locality_it == load_report_state.locality_stats.end()) return; - auto& locality_set = locality_it->second.locality_stats; - auto it = locality_set.find(cluster_locality_stats); - if (it != locality_set.end()) { + LoadReportState::LocalityState& locality_state = locality_it->second; + if (locality_state.locality_stats == cluster_locality_stats) { // Record final snapshot in deleted_locality_stats, which will be // added to the next load report. - locality_it->second.deleted_locality_stats.emplace_back( - cluster_locality_stats->GetSnapshotAndReset()); - locality_set.erase(it); + locality_state.deleted_locality_stats += + locality_state.locality_stats->GetSnapshotAndReset(); + locality_state.locality_stats = nullptr; } } @@ -2098,6 +2119,9 @@ void XdsClient::NotifyOnErrorLocked(grpc_error* error) { XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked( bool send_all_clusters, const std::set<std::string>& clusters) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, "[xds_client %p] start building load report", this); + } XdsApi::ClusterLoadReportMap snapshot_map; for (auto load_report_it = load_report_map_.begin(); load_report_it != load_report_map_.end();) { @@ -2116,9 +2140,15 @@ XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked( XdsApi::ClusterLoadReport snapshot; // Aggregate drop stats. snapshot.dropped_requests = std::move(load_report.deleted_drop_stats); - for (auto& drop_stats : load_report.drop_stats) { - auto dropped_requests = drop_stats->GetSnapshotAndReset(); - snapshot.dropped_requests += dropped_requests; + if (load_report.drop_stats != nullptr) { + snapshot.dropped_requests += + load_report.drop_stats->GetSnapshotAndReset(); + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, + "[xds_client %p] cluster=%s eds_service_name=%s drop_stats=%p", + this, cluster_key.first.c_str(), cluster_key.second.c_str(), + load_report.drop_stats); + } } // Aggregate locality stats. for (auto it = load_report.locality_stats.begin(); @@ -2127,34 +2157,39 @@ XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked( auto& locality_state = it->second; XdsClusterLocalityStats::Snapshot& locality_snapshot = snapshot.locality_stats[locality_name]; - for (auto& locality_stats : locality_state.locality_stats) { - locality_snapshot += locality_stats->GetSnapshotAndReset(); - } - // Add final snapshots from recently deleted locality stats objects. - for (auto& deleted_locality_stats : - locality_state.deleted_locality_stats) { - locality_snapshot += deleted_locality_stats; + locality_snapshot = std::move(locality_state.deleted_locality_stats); + if (locality_state.locality_stats != nullptr) { + locality_snapshot += + locality_state.locality_stats->GetSnapshotAndReset(); + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, + "[xds_client %p] cluster=%s eds_service_name=%s " + "locality=%s locality_stats=%p", + this, cluster_key.first.c_str(), cluster_key.second.c_str(), + locality_name->AsHumanReadableString().c_str(), + locality_state.locality_stats); + } } - locality_state.deleted_locality_stats.clear(); // If the only thing left in this entry was final snapshots from // deleted locality stats objects, remove the entry. - if (locality_state.locality_stats.empty()) { + if (locality_state.locality_stats == nullptr) { it = load_report.locality_stats.erase(it); } else { ++it; } } + // Compute load report interval. + const grpc_millis now = ExecCtx::Get()->Now(); + snapshot.load_report_interval = now - load_report.last_report_time; + load_report.last_report_time = now; + // Record snapshot. if (record_stats) { - // Compute load report interval. - const grpc_millis now = ExecCtx::Get()->Now(); - snapshot.load_report_interval = now - load_report.last_report_time; - load_report.last_report_time = now; - // Record snapshot. snapshot_map[cluster_key] = std::move(snapshot); } // If the only thing left in this entry was final snapshots from // deleted stats objects, remove the entry. - if (load_report.locality_stats.empty() && load_report.drop_stats.empty()) { + if (load_report.locality_stats.empty() && + load_report.drop_stats == nullptr) { load_report_it = load_report_map_.erase(load_report_it); } else { ++load_report_it; diff --git a/grpc/src/core/ext/xds/xds_client.h b/grpc/src/core/ext/xds/xds_client.h index 3e74c2c3..f1c64675 100644 --- a/grpc/src/core/ext/xds/xds_client.h +++ b/grpc/src/core/ext/xds/xds_client.h @@ -30,7 +30,6 @@ #include "src/core/ext/xds/xds_client_stats.h" #include "src/core/lib/channel/channelz.h" #include "src/core/lib/gprpp/dual_ref_counted.h" -#include "src/core/lib/gprpp/map.h" #include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted.h" @@ -39,7 +38,8 @@ namespace grpc_core { -extern TraceFlag xds_client_trace; +extern TraceFlag grpc_xds_client_trace; +extern TraceFlag grpc_xds_client_refcount_trace; class XdsClient : public DualRefCounted<XdsClient> { public: @@ -86,7 +86,11 @@ class XdsClient : public DualRefCounted<XdsClient> { // Callers should not instantiate directly. Use GetOrCreate() instead. explicit XdsClient(grpc_error** error); - ~XdsClient(); + ~XdsClient() override; + + CertificateProviderStore& certificate_provider_store() { + return *certificate_provider_store_; + } grpc_pollset_set* interested_parties() const { return interested_parties_; } @@ -198,8 +202,8 @@ class XdsClient : public DualRefCounted<XdsClient> { class LrsCallState; ChannelState(WeakRefCountedPtr<XdsClient> xds_client, - grpc_channel* channel); - ~ChannelState(); + const XdsBootstrap::XdsServer& server); + ~ChannelState() override; void Orphan() override; @@ -226,6 +230,8 @@ class XdsClient : public DualRefCounted<XdsClient> { // The owning xds client. WeakRefCountedPtr<XdsClient> xds_client_; + const XdsBootstrap::XdsServer& server_; + // The channel and its status. grpc_channel* channel_; bool shutting_down_ = false; @@ -267,17 +273,13 @@ class XdsClient : public DualRefCounted<XdsClient> { absl::optional<XdsApi::EdsUpdate> update; }; - // TODO(roth): Change this to store exactly one instance of - // XdsClusterDropStats and exactly one instance of - // XdsClusterLocalityStats per locality. We can return multiple refs - // to the same object instead of registering multiple objects. struct LoadReportState { struct LocalityState { - std::set<XdsClusterLocalityStats*> locality_stats; - std::vector<XdsClusterLocalityStats::Snapshot> deleted_locality_stats; + XdsClusterLocalityStats* locality_stats = nullptr; + XdsClusterLocalityStats::Snapshot deleted_locality_stats; }; - std::set<XdsClusterDropStats*> drop_stats; + XdsClusterDropStats* drop_stats = nullptr; XdsClusterDropStats::Snapshot deleted_drop_stats; std::map<RefCountedPtr<XdsLocalityName>, LocalityState, XdsLocalityName::Less> @@ -294,6 +296,7 @@ class XdsClient : public DualRefCounted<XdsClient> { const grpc_millis request_timeout_; grpc_pollset_set* interested_parties_; std::unique_ptr<XdsBootstrap> bootstrap_; + OrphanablePtr<CertificateProviderStore> certificate_provider_store_; XdsApi api_; Mutex mu_; @@ -317,6 +320,9 @@ class XdsClient : public DualRefCounted<XdsClient> { LoadReportState> load_report_map_; + // Stores the most recent accepted resource version for each resource type. + std::map<std::string /*type*/, std::string /*version*/> resource_version_map_; + bool shutting_down_ = false; }; diff --git a/grpc/src/core/ext/xds/xds_client_stats.cc b/grpc/src/core/ext/xds/xds_client_stats.cc index ba29ec08..de401a7b 100644 --- a/grpc/src/core/ext/xds/xds_client_stats.cc +++ b/grpc/src/core/ext/xds/xds_client_stats.cc @@ -45,12 +45,29 @@ XdsClusterDropStats::XdsClusterDropStats(RefCountedPtr<XdsClient> xds_client, absl::string_view lrs_server_name, absl::string_view cluster_name, absl::string_view eds_service_name) - : xds_client_(std::move(xds_client)), + : RefCounted(GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) + ? "XdsClusterDropStats" + : nullptr), + xds_client_(std::move(xds_client)), lrs_server_name_(lrs_server_name), cluster_name_(cluster_name), - eds_service_name_(eds_service_name) {} + eds_service_name_(eds_service_name) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, "[xds_client %p] created drop stats %p for {%s, %s, %s}", + xds_client_.get(), this, std::string(lrs_server_name_).c_str(), + std::string(cluster_name_).c_str(), + std::string(eds_service_name_).c_str()); + } +} XdsClusterDropStats::~XdsClusterDropStats() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, + "[xds_client %p] destroying drop stats %p for {%s, %s, %s}", + xds_client_.get(), this, std::string(lrs_server_name_).c_str(), + std::string(cluster_name_).c_str(), + std::string(eds_service_name_).c_str()); + } xds_client_->RemoveClusterDropStats(lrs_server_name_, cluster_name_, eds_service_name_, this); xds_client_.reset(DEBUG_LOCATION, "DropStats"); @@ -81,13 +98,33 @@ XdsClusterLocalityStats::XdsClusterLocalityStats( RefCountedPtr<XdsClient> xds_client, absl::string_view lrs_server_name, absl::string_view cluster_name, absl::string_view eds_service_name, RefCountedPtr<XdsLocalityName> name) - : xds_client_(std::move(xds_client)), + : RefCounted(GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) + ? "XdsClusterLocalityStats" + : nullptr), + xds_client_(std::move(xds_client)), lrs_server_name_(lrs_server_name), cluster_name_(cluster_name), eds_service_name_(eds_service_name), - name_(std::move(name)) {} + name_(std::move(name)) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, + "[xds_client %p] created locality stats %p for {%s, %s, %s, %s}", + xds_client_.get(), this, std::string(lrs_server_name_).c_str(), + std::string(cluster_name_).c_str(), + std::string(eds_service_name_).c_str(), + name_->AsHumanReadableString().c_str()); + } +} XdsClusterLocalityStats::~XdsClusterLocalityStats() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, + "[xds_client %p] destroying locality stats %p for {%s, %s, %s, %s}", + xds_client_.get(), this, std::string(lrs_server_name_).c_str(), + std::string(cluster_name_).c_str(), + std::string(eds_service_name_).c_str(), + name_->AsHumanReadableString().c_str()); + } xds_client_->RemoveClusterLocalityStats(lrs_server_name_, cluster_name_, eds_service_name_, name_, this); xds_client_.reset(DEBUG_LOCATION, "LocalityStats"); diff --git a/grpc/src/core/ext/xds/xds_client_stats.h b/grpc/src/core/ext/xds/xds_client_stats.h index 1906f08d..523ef111 100644 --- a/grpc/src/core/ext/xds/xds_client_stats.h +++ b/grpc/src/core/ext/xds/xds_client_stats.h @@ -131,7 +131,7 @@ class XdsClusterDropStats : public RefCounted<XdsClusterDropStats> { absl::string_view lrs_server_name, absl::string_view cluster_name, absl::string_view eds_service_name); - ~XdsClusterDropStats(); + ~XdsClusterDropStats() override; // Returns a snapshot of this instance and resets all the counters. Snapshot GetSnapshotAndReset(); @@ -206,7 +206,7 @@ class XdsClusterLocalityStats : public RefCounted<XdsClusterLocalityStats> { absl::string_view cluster_name, absl::string_view eds_service_name, RefCountedPtr<XdsLocalityName> name); - ~XdsClusterLocalityStats(); + ~XdsClusterLocalityStats() override; // Returns a snapshot of this instance and resets all the counters. Snapshot GetSnapshotAndReset(); diff --git a/grpc/src/core/ext/xds/xds_server_config_fetcher.cc b/grpc/src/core/ext/xds/xds_server_config_fetcher.cc new file mode 100644 index 00000000..5c5e8ee2 --- /dev/null +++ b/grpc/src/core/ext/xds/xds_server_config_fetcher.cc @@ -0,0 +1,131 @@ +// +// +// Copyright 2020 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#include <grpc/support/port_platform.h> + +#include "src/core/ext/xds/xds_client.h" +#include "src/core/lib/surface/api_trace.h" +#include "src/core/lib/surface/server.h" + +namespace grpc_core { +namespace { + +class XdsServerConfigFetcher : public grpc_server_config_fetcher { + public: + explicit XdsServerConfigFetcher(RefCountedPtr<XdsClient> xds_client) + : xds_client_(std::move(xds_client)) { + GPR_ASSERT(xds_client_ != nullptr); + } + + void StartWatch(std::string listening_address, + std::unique_ptr<grpc_server_config_fetcher::WatcherInterface> + watcher) override { + grpc_server_config_fetcher::WatcherInterface* watcher_ptr = watcher.get(); + auto listener_watcher = + absl::make_unique<ListenerWatcher>(std::move(watcher)); + auto* listener_watcher_ptr = listener_watcher.get(); + // TODO(yashykt): Get the resource name id from bootstrap + xds_client_->WatchListenerData( + absl::StrCat("grpc/server?xds.resource.listening_address=", + listening_address), + std::move(listener_watcher)); + MutexLock lock(&mu_); + auto& watcher_state = watchers_[watcher_ptr]; + watcher_state.listening_address = listening_address; + watcher_state.listener_watcher = listener_watcher_ptr; + } + + void CancelWatch( + grpc_server_config_fetcher::WatcherInterface* watcher) override { + MutexLock lock(&mu_); + auto it = watchers_.find(watcher); + if (it != watchers_.end()) { + // Cancel the watch on the listener before erasing + xds_client_->CancelListenerDataWatch(it->second.listening_address, + it->second.listener_watcher, + false /* delay_unsubscription */); + watchers_.erase(it); + } + } + + // Return the interested parties from the xds client so that it can be polled. + grpc_pollset_set* interested_parties() override { + return xds_client_->interested_parties(); + } + + private: + class ListenerWatcher : public XdsClient::ListenerWatcherInterface { + public: + explicit ListenerWatcher( + std::unique_ptr<grpc_server_config_fetcher::WatcherInterface> + server_config_watcher) + : server_config_watcher_(std::move(server_config_watcher)) {} + + void OnListenerChanged(XdsApi::LdsUpdate listener) override { + // TODO(yashykt): Construct channel args according to received update + server_config_watcher_->UpdateConfig(nullptr); + } + + void OnError(grpc_error* error) override { + gpr_log(GPR_ERROR, "ListenerWatcher:%p XdsClient reports error: %s", this, + grpc_error_string(error)); + GRPC_ERROR_UNREF(error); + // TODO(yashykt): We might want to bubble this error to the application. + } + + void OnResourceDoesNotExist() override { + gpr_log(GPR_ERROR, + "ListenerWatcher:%p XdsClient reports requested listener does " + "not exist", + this); + // TODO(yashykt): We might want to bubble this error to the application. + } + + private: + std::unique_ptr<grpc_server_config_fetcher::WatcherInterface> + server_config_watcher_; + }; + + struct WatcherState { + std::string listening_address; + ListenerWatcher* listener_watcher = nullptr; + }; + + RefCountedPtr<XdsClient> xds_client_; + Mutex mu_; + std::map<grpc_server_config_fetcher::WatcherInterface*, WatcherState> + watchers_; +}; + +} // namespace +} // namespace grpc_core + +grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create() { + grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; + grpc_core::ExecCtx exec_ctx; + GRPC_API_TRACE("grpc_server_config_fetcher_xds_create()", 0, ()); + grpc_error* error = GRPC_ERROR_NONE; + grpc_core::RefCountedPtr<grpc_core::XdsClient> xds_client = + grpc_core::XdsClient::GetOrCreate(&error); + if (error != GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, "Failed to create xds client: %s", + grpc_error_string(error)); + return nullptr; + } + return new grpc_core::XdsServerConfigFetcher(std::move(xds_client)); +} |