summaryrefslogtreecommitdiff
path: root/grpc/src/core/ext/xds/xds_client.h
diff options
context:
space:
mode:
Diffstat (limited to 'grpc/src/core/ext/xds/xds_client.h')
-rw-r--r--grpc/src/core/ext/xds/xds_client.h293
1 files changed, 131 insertions, 162 deletions
diff --git a/grpc/src/core/ext/xds/xds_client.h b/grpc/src/core/ext/xds/xds_client.h
index 0ee84e78..5fe050cc 100644
--- a/grpc/src/core/ext/xds/xds_client.h
+++ b/grpc/src/core/ext/xds/xds_client.h
@@ -28,6 +28,7 @@
#include "src/core/ext/xds/xds_api.h"
#include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/ext/xds/xds_client_stats.h"
+#include "src/core/ext/xds/xds_resource_type.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/gprpp/dual_ref_counted.h"
#include "src/core/lib/gprpp/memory.h"
@@ -35,6 +36,8 @@
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/iomgr/work_serializer.h"
+#include "src/core/lib/uri/uri_parser.h"
namespace grpc_core {
@@ -43,40 +46,19 @@ extern TraceFlag grpc_xds_client_refcount_trace;
class XdsClient : public DualRefCounted<XdsClient> {
public:
- // Listener data watcher interface. Implemented by callers.
- class ListenerWatcherInterface {
+ // Resource watcher interface. Implemented by callers.
+ // Note: Most callers will not use this API directly but rather via a
+ // resource-type-specific wrapper API provided by the relevant
+ // XdsResourceType implementation.
+ class ResourceWatcherInterface : public RefCounted<ResourceWatcherInterface> {
public:
- virtual ~ListenerWatcherInterface() = default;
- virtual void OnListenerChanged(XdsApi::LdsUpdate listener) = 0;
- virtual void OnError(grpc_error_handle error) = 0;
- virtual void OnResourceDoesNotExist() = 0;
- };
-
- // RouteConfiguration data watcher interface. Implemented by callers.
- class RouteConfigWatcherInterface {
- public:
- virtual ~RouteConfigWatcherInterface() = default;
- virtual void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) = 0;
- virtual void OnError(grpc_error_handle error) = 0;
- virtual void OnResourceDoesNotExist() = 0;
- };
-
- // Cluster data watcher interface. Implemented by callers.
- class ClusterWatcherInterface {
- public:
- virtual ~ClusterWatcherInterface() = default;
- virtual void OnClusterChanged(XdsApi::CdsUpdate cluster_data) = 0;
- virtual void OnError(grpc_error_handle error) = 0;
- virtual void OnResourceDoesNotExist() = 0;
- };
-
- // Endpoint data watcher interface. Implemented by callers.
- class EndpointWatcherInterface {
- public:
- virtual ~EndpointWatcherInterface() = default;
- virtual void OnEndpointChanged(XdsApi::EdsUpdate update) = 0;
- virtual void OnError(grpc_error_handle error) = 0;
- virtual void OnResourceDoesNotExist() = 0;
+ virtual void OnGenericResourceChanged(
+ const XdsResourceType::ResourceData* resource)
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
+ virtual void OnError(absl::Status status)
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
+ virtual void OnResourceDoesNotExist()
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
};
// Factory function to get or create the global XdsClient instance.
@@ -102,78 +84,39 @@ class XdsClient : public DualRefCounted<XdsClient> {
grpc_pollset_set* interested_parties() const { return interested_parties_; }
- // TODO(roth): When we add federation, there will be multiple channels
- // inside the XdsClient, and the set of channels may change over time,
- // but not every channel may use every one of the child channels, so
- // this API will need to change. At minumum, we will need to hold a
- // ref to the parent channelz node so that we can update its list of
- // children as the set of xDS channels changes. However, we may also
- // want to make this a bit more selective such that only those
- // channels on which a given parent channel is actually requesting
- // resources will actually be marked as its children.
- void AddChannelzLinkage(channelz::ChannelNode* parent_channelz_node);
- void RemoveChannelzLinkage(channelz::ChannelNode* parent_channelz_node);
-
void Orphan() override;
- // Start and cancel listener data watch for a listener.
- // The XdsClient takes ownership of the watcher, but the caller may
- // keep a raw pointer to the watcher, which may be used only for
- // cancellation. (Because the caller does not own the watcher, the
- // pointer must not be used for any other purpose.)
- // If the caller is going to start a new watch after cancelling the
- // old one, it should set delay_unsubscription to true.
- void WatchListenerData(absl::string_view listener_name,
- std::unique_ptr<ListenerWatcherInterface> watcher);
- void CancelListenerDataWatch(absl::string_view listener_name,
- ListenerWatcherInterface* watcher,
- bool delay_unsubscription = false);
-
- // Start and cancel route config data watch for a listener.
- // The XdsClient takes ownership of the watcher, but the caller may
- // keep a raw pointer to the watcher, which may be used only for
- // cancellation. (Because the caller does not own the watcher, the
- // pointer must not be used for any other purpose.)
- // If the caller is going to start a new watch after cancelling the
- // old one, it should set delay_unsubscription to true.
- void WatchRouteConfigData(
- absl::string_view route_config_name,
- std::unique_ptr<RouteConfigWatcherInterface> watcher);
- void CancelRouteConfigDataWatch(absl::string_view route_config_name,
- RouteConfigWatcherInterface* watcher,
- bool delay_unsubscription = false);
-
- // Start and cancel cluster data watch for a cluster.
- // The XdsClient takes ownership of the watcher, but the caller may
- // keep a raw pointer to the watcher, which may be used only for
- // cancellation. (Because the caller does not own the watcher, the
- // pointer must not be used for any other purpose.)
- // If the caller is going to start a new watch after cancelling the
- // old one, it should set delay_unsubscription to true.
- void WatchClusterData(absl::string_view cluster_name,
- std::unique_ptr<ClusterWatcherInterface> watcher);
- void CancelClusterDataWatch(absl::string_view cluster_name,
- ClusterWatcherInterface* watcher,
- bool delay_unsubscription = false);
-
- // Start and cancel endpoint data watch for a cluster.
+ // Start and cancel watch for a resource.
+ //
// The XdsClient takes ownership of the watcher, but the caller may
// keep a raw pointer to the watcher, which may be used only for
// cancellation. (Because the caller does not own the watcher, the
// pointer must not be used for any other purpose.)
// If the caller is going to start a new watch after cancelling the
// old one, it should set delay_unsubscription to true.
- void WatchEndpointData(absl::string_view eds_service_name,
- std::unique_ptr<EndpointWatcherInterface> watcher);
- void CancelEndpointDataWatch(absl::string_view eds_service_name,
- EndpointWatcherInterface* watcher,
- bool delay_unsubscription = false);
+ //
+ // The resource type object must be a global singleton, since the first
+ // time the XdsClient sees a particular resource type object, it will
+ // store the pointer to that object as the authoritative implementation for
+ // its type URLs. The resource type object must outlive the XdsClient object,
+ // and it is illegal to start a subsequent watch for the same type URLs using
+ // a different resource type object.
+ //
+ // Note: Most callers will not use this API directly but rather via a
+ // resource-type-specific wrapper API provided by the relevant
+ // XdsResourceType implementation.
+ void WatchResource(const XdsResourceType* type, absl::string_view name,
+ RefCountedPtr<ResourceWatcherInterface> watcher);
+ void CancelResourceWatch(const XdsResourceType* type,
+ absl::string_view listener_name,
+ ResourceWatcherInterface* watcher,
+ bool delay_unsubscription = false);
// Adds and removes drop stats for cluster_name and eds_service_name.
RefCountedPtr<XdsClusterDropStats> AddClusterDropStats(
- absl::string_view lrs_server, absl::string_view cluster_name,
+ const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
absl::string_view eds_service_name);
- void RemoveClusterDropStats(absl::string_view /*lrs_server*/,
+ void RemoveClusterDropStats(const XdsBootstrap::XdsServer& xds_server,
absl::string_view cluster_name,
absl::string_view eds_service_name,
XdsClusterDropStats* cluster_drop_stats);
@@ -181,11 +124,11 @@ class XdsClient : public DualRefCounted<XdsClient> {
// Adds and removes locality stats for cluster_name and eds_service_name
// for the specified locality.
RefCountedPtr<XdsClusterLocalityStats> AddClusterLocalityStats(
- absl::string_view lrs_server, absl::string_view cluster_name,
+ const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
absl::string_view eds_service_name,
RefCountedPtr<XdsLocalityName> locality);
void RemoveClusterLocalityStats(
- absl::string_view /*lrs_server*/, absl::string_view cluster_name,
+ const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
absl::string_view eds_service_name,
const RefCountedPtr<XdsLocalityName>& locality,
XdsClusterLocalityStats* cluster_locality_stats);
@@ -208,14 +151,25 @@ class XdsClient : public DualRefCounted<XdsClient> {
const grpc_channel_args& args);
private:
+ struct XdsResourceKey {
+ std::string id;
+ std::vector<URI::QueryParam> query_params;
+
+ bool operator<(const XdsResourceKey& other) const {
+ int c = id.compare(other.id);
+ if (c != 0) return c < 0;
+ return query_params < other.query_params;
+ }
+ };
+
+ struct XdsResourceName {
+ std::string authority;
+ XdsResourceKey key;
+ };
+
// Contains a channel to the xds server and all the data related to the
// channel. Holds a ref to the xds client object.
- //
- // Currently, there is only one ChannelState object per XdsClient
- // object, and it has essentially the same lifetime. But in the
- // future, when we add federation support, a single XdsClient may have
- // multiple underlying channels to talk to different xDS servers.
- class ChannelState : public InternallyRefCounted<ChannelState> {
+ class ChannelState : public DualRefCounted<ChannelState> {
public:
template <typename T>
class RetryableCall;
@@ -235,17 +189,20 @@ class XdsClient : public DualRefCounted<XdsClient> {
LrsCallState* lrs_calld() const;
void MaybeStartLrsCall();
- void StopLrsCall();
+ void StopLrsCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
bool HasAdsCall() const;
bool HasActiveAdsCall() const;
- void StartConnectivityWatchLocked();
+ void StartConnectivityWatchLocked()
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
void CancelConnectivityWatchLocked();
- void SubscribeLocked(const std::string& type_url, const std::string& name)
+ void SubscribeLocked(const XdsResourceType* type,
+ const XdsResourceName& name)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
- void UnsubscribeLocked(const std::string& type_url, const std::string& name,
+ void UnsubscribeLocked(const XdsResourceType* type,
+ const XdsResourceName& name,
bool delay_unsubscription)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
@@ -260,46 +217,29 @@ class XdsClient : public DualRefCounted<XdsClient> {
// The channel and its status.
grpc_channel* channel_;
bool shutting_down_ = false;
- StateWatcher* watcher_ = nullptr;
+ StateWatcher* watcher_;
// The retryable XDS calls.
OrphanablePtr<RetryableCall<AdsCallState>> ads_calld_;
OrphanablePtr<RetryableCall<LrsCallState>> lrs_calld_;
- };
- struct ListenerState {
- std::map<ListenerWatcherInterface*,
- std::unique_ptr<ListenerWatcherInterface>>
- watchers;
- // The latest data seen from LDS.
- absl::optional<XdsApi::LdsUpdate> update;
- XdsApi::ResourceMetadata meta;
- };
-
- struct RouteConfigState {
- std::map<RouteConfigWatcherInterface*,
- std::unique_ptr<RouteConfigWatcherInterface>>
- watchers;
- // The latest data seen from RDS.
- absl::optional<XdsApi::RdsUpdate> update;
- XdsApi::ResourceMetadata meta;
+ // Stores the most recent accepted resource version for each resource type.
+ std::map<const XdsResourceType*, std::string /*version*/>
+ resource_type_version_map_;
};
- struct ClusterState {
- std::map<ClusterWatcherInterface*, std::unique_ptr<ClusterWatcherInterface>>
+ struct ResourceState {
+ std::map<ResourceWatcherInterface*, RefCountedPtr<ResourceWatcherInterface>>
watchers;
- // The latest data seen from CDS.
- absl::optional<XdsApi::CdsUpdate> update;
+ // The latest data seen for the resource.
+ std::unique_ptr<XdsResourceType::ResourceData> resource;
XdsApi::ResourceMetadata meta;
};
- struct EndpointState {
- std::map<EndpointWatcherInterface*,
- std::unique_ptr<EndpointWatcherInterface>>
- watchers;
- // The latest data seen from EDS.
- absl::optional<XdsApi::EdsUpdate> update;
- XdsApi::ResourceMetadata meta;
+ struct AuthorityState {
+ RefCountedPtr<ChannelState> channel_state;
+ std::map<const XdsResourceType*, std::map<XdsResourceKey, ResourceState>>
+ resource_map;
};
struct LoadReportState {
@@ -313,55 +253,84 @@ class XdsClient : public DualRefCounted<XdsClient> {
std::map<RefCountedPtr<XdsLocalityName>, LocalityState,
XdsLocalityName::Less>
locality_stats;
- grpc_millis last_report_time = ExecCtx::Get()->Now();
+ Timestamp last_report_time = ExecCtx::Get()->Now();
+ };
+
+ // Load report data.
+ using LoadReportMap = std::map<
+ std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>,
+ LoadReportState>;
+
+ struct LoadReportServer {
+ RefCountedPtr<ChannelState> channel_state;
+ LoadReportMap load_report_map;
};
// Sends an error notification to all watchers.
- void NotifyOnErrorLocked(grpc_error_handle error)
+ void NotifyOnErrorLocked(absl::Status status)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
-
- XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked(
- bool send_all_clusters, const std::set<std::string>& clusters)
+ // Sends an error notification to a specific set of watchers.
+ void NotifyWatchersOnErrorLocked(
+ const std::map<ResourceWatcherInterface*,
+ RefCountedPtr<ResourceWatcherInterface>>& watchers,
+ absl::Status status);
+ // Sends a resource-does-not-exist notification to a specific set of watchers.
+ void NotifyWatchersOnResourceDoesNotExist(
+ const std::map<ResourceWatcherInterface*,
+ RefCountedPtr<ResourceWatcherInterface>>& watchers);
+
+ void MaybeRegisterResourceTypeLocked(const XdsResourceType* resource_type)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
- void UpdateResourceMetadataWithFailedParseResultLocked(
- grpc_millis update_time, const XdsApi::AdsParseResult& result)
+ // Gets the type for resource_type, or null if the type is unknown.
+ const XdsResourceType* GetResourceTypeLocked(absl::string_view resource_type)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
+ absl::StatusOr<XdsResourceName> ParseXdsResourceName(
+ absl::string_view name, const XdsResourceType* type);
+ static std::string ConstructFullXdsResourceName(
+ absl::string_view authority, absl::string_view resource_type,
+ const XdsResourceKey& key);
+
+ XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked(
+ const XdsBootstrap::XdsServer& xds_server, bool send_all_clusters,
+ const std::set<std::string>& clusters) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
+
+ RefCountedPtr<ChannelState> GetOrCreateChannelStateLocked(
+ const XdsBootstrap::XdsServer& server) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
+
std::unique_ptr<XdsBootstrap> bootstrap_;
grpc_channel_args* args_;
- const grpc_millis request_timeout_;
+ const Duration request_timeout_;
+ const bool xds_federation_enabled_;
grpc_pollset_set* interested_parties_;
OrphanablePtr<CertificateProviderStore> certificate_provider_store_;
XdsApi api_;
+ WorkSerializer work_serializer_;
Mutex mu_;
- // The channel for communicating with the xds server.
- OrphanablePtr<ChannelState> chand_ ABSL_GUARDED_BY(mu_);
+ // Stores resource type objects seen by type URL.
+ std::map<absl::string_view /*resource_type*/, const XdsResourceType*>
+ resource_types_ ABSL_GUARDED_BY(mu_);
+ std::map<absl::string_view /*v2_resource_type*/, const XdsResourceType*>
+ v2_resource_types_ ABSL_GUARDED_BY(mu_);
+ upb::SymbolTable symtab_ ABSL_GUARDED_BY(mu_);
- // One entry for each watched LDS resource.
- std::map<std::string /*listener_name*/, ListenerState> listener_map_
- ABSL_GUARDED_BY(mu_);
- // One entry for each watched RDS resource.
- std::map<std::string /*route_config_name*/, RouteConfigState>
- route_config_map_ ABSL_GUARDED_BY(mu_);
- // One entry for each watched CDS resource.
- std::map<std::string /*cluster_name*/, ClusterState> cluster_map_
+ // Map of existing xDS server channels.
+ std::map<XdsBootstrap::XdsServer, ChannelState*> xds_server_channel_map_
ABSL_GUARDED_BY(mu_);
- // One entry for each watched EDS resource.
- std::map<std::string /*eds_service_name*/, EndpointState> endpoint_map_
+
+ std::map<std::string /*authority*/, AuthorityState> authority_state_map_
ABSL_GUARDED_BY(mu_);
- // Load report data.
- std::map<
- std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>,
- LoadReportState>
- load_report_map_ ABSL_GUARDED_BY(mu_);
+ std::map<XdsBootstrap::XdsServer, LoadReportServer>
+ xds_load_report_server_map_ ABSL_GUARDED_BY(mu_);
- // Stores the most recent accepted resource version for each resource type.
- std::map<std::string /*type*/, std::string /*version*/> resource_version_map_
- ABSL_GUARDED_BY(mu_);
+ // Stores started watchers whose resource name was not parsed successfully,
+ // waiting to be cancelled or reset in Orphan().
+ std::map<ResourceWatcherInterface*, RefCountedPtr<ResourceWatcherInterface>>
+ invalid_watchers_ ABSL_GUARDED_BY(mu_);
bool shutting_down_ ABSL_GUARDED_BY(mu_) = false;
};