diff options
Diffstat (limited to 'grpc/src/core/ext/xds/xds_client.h')
-rw-r--r-- | grpc/src/core/ext/xds/xds_client.h | 293 |
1 files changed, 131 insertions, 162 deletions
diff --git a/grpc/src/core/ext/xds/xds_client.h b/grpc/src/core/ext/xds/xds_client.h index 0ee84e78..5fe050cc 100644 --- a/grpc/src/core/ext/xds/xds_client.h +++ b/grpc/src/core/ext/xds/xds_client.h @@ -28,6 +28,7 @@ #include "src/core/ext/xds/xds_api.h" #include "src/core/ext/xds/xds_bootstrap.h" #include "src/core/ext/xds/xds_client_stats.h" +#include "src/core/ext/xds/xds_resource_type.h" #include "src/core/lib/channel/channelz.h" #include "src/core/lib/gprpp/dual_ref_counted.h" #include "src/core/lib/gprpp/memory.h" @@ -35,6 +36,8 @@ #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/iomgr/work_serializer.h" +#include "src/core/lib/uri/uri_parser.h" namespace grpc_core { @@ -43,40 +46,19 @@ extern TraceFlag grpc_xds_client_refcount_trace; class XdsClient : public DualRefCounted<XdsClient> { public: - // Listener data watcher interface. Implemented by callers. - class ListenerWatcherInterface { + // Resource watcher interface. Implemented by callers. + // Note: Most callers will not use this API directly but rather via a + // resource-type-specific wrapper API provided by the relevant + // XdsResourceType implementation. + class ResourceWatcherInterface : public RefCounted<ResourceWatcherInterface> { public: - virtual ~ListenerWatcherInterface() = default; - virtual void OnListenerChanged(XdsApi::LdsUpdate listener) = 0; - virtual void OnError(grpc_error_handle error) = 0; - virtual void OnResourceDoesNotExist() = 0; - }; - - // RouteConfiguration data watcher interface. Implemented by callers. - class RouteConfigWatcherInterface { - public: - virtual ~RouteConfigWatcherInterface() = default; - virtual void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) = 0; - virtual void OnError(grpc_error_handle error) = 0; - virtual void OnResourceDoesNotExist() = 0; - }; - - // Cluster data watcher interface. Implemented by callers. - class ClusterWatcherInterface { - public: - virtual ~ClusterWatcherInterface() = default; - virtual void OnClusterChanged(XdsApi::CdsUpdate cluster_data) = 0; - virtual void OnError(grpc_error_handle error) = 0; - virtual void OnResourceDoesNotExist() = 0; - }; - - // Endpoint data watcher interface. Implemented by callers. - class EndpointWatcherInterface { - public: - virtual ~EndpointWatcherInterface() = default; - virtual void OnEndpointChanged(XdsApi::EdsUpdate update) = 0; - virtual void OnError(grpc_error_handle error) = 0; - virtual void OnResourceDoesNotExist() = 0; + virtual void OnGenericResourceChanged( + const XdsResourceType::ResourceData* resource) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; + virtual void OnError(absl::Status status) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; + virtual void OnResourceDoesNotExist() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; }; // Factory function to get or create the global XdsClient instance. @@ -102,78 +84,39 @@ class XdsClient : public DualRefCounted<XdsClient> { grpc_pollset_set* interested_parties() const { return interested_parties_; } - // TODO(roth): When we add federation, there will be multiple channels - // inside the XdsClient, and the set of channels may change over time, - // but not every channel may use every one of the child channels, so - // this API will need to change. At minumum, we will need to hold a - // ref to the parent channelz node so that we can update its list of - // children as the set of xDS channels changes. However, we may also - // want to make this a bit more selective such that only those - // channels on which a given parent channel is actually requesting - // resources will actually be marked as its children. - void AddChannelzLinkage(channelz::ChannelNode* parent_channelz_node); - void RemoveChannelzLinkage(channelz::ChannelNode* parent_channelz_node); - void Orphan() override; - // Start and cancel listener data watch for a listener. - // The XdsClient takes ownership of the watcher, but the caller may - // keep a raw pointer to the watcher, which may be used only for - // cancellation. (Because the caller does not own the watcher, the - // pointer must not be used for any other purpose.) - // If the caller is going to start a new watch after cancelling the - // old one, it should set delay_unsubscription to true. - void WatchListenerData(absl::string_view listener_name, - std::unique_ptr<ListenerWatcherInterface> watcher); - void CancelListenerDataWatch(absl::string_view listener_name, - ListenerWatcherInterface* watcher, - bool delay_unsubscription = false); - - // Start and cancel route config data watch for a listener. - // The XdsClient takes ownership of the watcher, but the caller may - // keep a raw pointer to the watcher, which may be used only for - // cancellation. (Because the caller does not own the watcher, the - // pointer must not be used for any other purpose.) - // If the caller is going to start a new watch after cancelling the - // old one, it should set delay_unsubscription to true. - void WatchRouteConfigData( - absl::string_view route_config_name, - std::unique_ptr<RouteConfigWatcherInterface> watcher); - void CancelRouteConfigDataWatch(absl::string_view route_config_name, - RouteConfigWatcherInterface* watcher, - bool delay_unsubscription = false); - - // Start and cancel cluster data watch for a cluster. - // The XdsClient takes ownership of the watcher, but the caller may - // keep a raw pointer to the watcher, which may be used only for - // cancellation. (Because the caller does not own the watcher, the - // pointer must not be used for any other purpose.) - // If the caller is going to start a new watch after cancelling the - // old one, it should set delay_unsubscription to true. - void WatchClusterData(absl::string_view cluster_name, - std::unique_ptr<ClusterWatcherInterface> watcher); - void CancelClusterDataWatch(absl::string_view cluster_name, - ClusterWatcherInterface* watcher, - bool delay_unsubscription = false); - - // Start and cancel endpoint data watch for a cluster. + // Start and cancel watch for a resource. + // // The XdsClient takes ownership of the watcher, but the caller may // keep a raw pointer to the watcher, which may be used only for // cancellation. (Because the caller does not own the watcher, the // pointer must not be used for any other purpose.) // If the caller is going to start a new watch after cancelling the // old one, it should set delay_unsubscription to true. - void WatchEndpointData(absl::string_view eds_service_name, - std::unique_ptr<EndpointWatcherInterface> watcher); - void CancelEndpointDataWatch(absl::string_view eds_service_name, - EndpointWatcherInterface* watcher, - bool delay_unsubscription = false); + // + // The resource type object must be a global singleton, since the first + // time the XdsClient sees a particular resource type object, it will + // store the pointer to that object as the authoritative implementation for + // its type URLs. The resource type object must outlive the XdsClient object, + // and it is illegal to start a subsequent watch for the same type URLs using + // a different resource type object. + // + // Note: Most callers will not use this API directly but rather via a + // resource-type-specific wrapper API provided by the relevant + // XdsResourceType implementation. + void WatchResource(const XdsResourceType* type, absl::string_view name, + RefCountedPtr<ResourceWatcherInterface> watcher); + void CancelResourceWatch(const XdsResourceType* type, + absl::string_view listener_name, + ResourceWatcherInterface* watcher, + bool delay_unsubscription = false); // Adds and removes drop stats for cluster_name and eds_service_name. RefCountedPtr<XdsClusterDropStats> AddClusterDropStats( - absl::string_view lrs_server, absl::string_view cluster_name, + const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name, absl::string_view eds_service_name); - void RemoveClusterDropStats(absl::string_view /*lrs_server*/, + void RemoveClusterDropStats(const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name, absl::string_view eds_service_name, XdsClusterDropStats* cluster_drop_stats); @@ -181,11 +124,11 @@ class XdsClient : public DualRefCounted<XdsClient> { // Adds and removes locality stats for cluster_name and eds_service_name // for the specified locality. RefCountedPtr<XdsClusterLocalityStats> AddClusterLocalityStats( - absl::string_view lrs_server, absl::string_view cluster_name, + const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name, absl::string_view eds_service_name, RefCountedPtr<XdsLocalityName> locality); void RemoveClusterLocalityStats( - absl::string_view /*lrs_server*/, absl::string_view cluster_name, + const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name, absl::string_view eds_service_name, const RefCountedPtr<XdsLocalityName>& locality, XdsClusterLocalityStats* cluster_locality_stats); @@ -208,14 +151,25 @@ class XdsClient : public DualRefCounted<XdsClient> { const grpc_channel_args& args); private: + struct XdsResourceKey { + std::string id; + std::vector<URI::QueryParam> query_params; + + bool operator<(const XdsResourceKey& other) const { + int c = id.compare(other.id); + if (c != 0) return c < 0; + return query_params < other.query_params; + } + }; + + struct XdsResourceName { + std::string authority; + XdsResourceKey key; + }; + // Contains a channel to the xds server and all the data related to the // channel. Holds a ref to the xds client object. - // - // Currently, there is only one ChannelState object per XdsClient - // object, and it has essentially the same lifetime. But in the - // future, when we add federation support, a single XdsClient may have - // multiple underlying channels to talk to different xDS servers. - class ChannelState : public InternallyRefCounted<ChannelState> { + class ChannelState : public DualRefCounted<ChannelState> { public: template <typename T> class RetryableCall; @@ -235,17 +189,20 @@ class XdsClient : public DualRefCounted<XdsClient> { LrsCallState* lrs_calld() const; void MaybeStartLrsCall(); - void StopLrsCall(); + void StopLrsCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); bool HasAdsCall() const; bool HasActiveAdsCall() const; - void StartConnectivityWatchLocked(); + void StartConnectivityWatchLocked() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); void CancelConnectivityWatchLocked(); - void SubscribeLocked(const std::string& type_url, const std::string& name) + void SubscribeLocked(const XdsResourceType* type, + const XdsResourceName& name) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); - void UnsubscribeLocked(const std::string& type_url, const std::string& name, + void UnsubscribeLocked(const XdsResourceType* type, + const XdsResourceName& name, bool delay_unsubscription) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); @@ -260,46 +217,29 @@ class XdsClient : public DualRefCounted<XdsClient> { // The channel and its status. grpc_channel* channel_; bool shutting_down_ = false; - StateWatcher* watcher_ = nullptr; + StateWatcher* watcher_; // The retryable XDS calls. OrphanablePtr<RetryableCall<AdsCallState>> ads_calld_; OrphanablePtr<RetryableCall<LrsCallState>> lrs_calld_; - }; - struct ListenerState { - std::map<ListenerWatcherInterface*, - std::unique_ptr<ListenerWatcherInterface>> - watchers; - // The latest data seen from LDS. - absl::optional<XdsApi::LdsUpdate> update; - XdsApi::ResourceMetadata meta; - }; - - struct RouteConfigState { - std::map<RouteConfigWatcherInterface*, - std::unique_ptr<RouteConfigWatcherInterface>> - watchers; - // The latest data seen from RDS. - absl::optional<XdsApi::RdsUpdate> update; - XdsApi::ResourceMetadata meta; + // Stores the most recent accepted resource version for each resource type. + std::map<const XdsResourceType*, std::string /*version*/> + resource_type_version_map_; }; - struct ClusterState { - std::map<ClusterWatcherInterface*, std::unique_ptr<ClusterWatcherInterface>> + struct ResourceState { + std::map<ResourceWatcherInterface*, RefCountedPtr<ResourceWatcherInterface>> watchers; - // The latest data seen from CDS. - absl::optional<XdsApi::CdsUpdate> update; + // The latest data seen for the resource. + std::unique_ptr<XdsResourceType::ResourceData> resource; XdsApi::ResourceMetadata meta; }; - struct EndpointState { - std::map<EndpointWatcherInterface*, - std::unique_ptr<EndpointWatcherInterface>> - watchers; - // The latest data seen from EDS. - absl::optional<XdsApi::EdsUpdate> update; - XdsApi::ResourceMetadata meta; + struct AuthorityState { + RefCountedPtr<ChannelState> channel_state; + std::map<const XdsResourceType*, std::map<XdsResourceKey, ResourceState>> + resource_map; }; struct LoadReportState { @@ -313,55 +253,84 @@ class XdsClient : public DualRefCounted<XdsClient> { std::map<RefCountedPtr<XdsLocalityName>, LocalityState, XdsLocalityName::Less> locality_stats; - grpc_millis last_report_time = ExecCtx::Get()->Now(); + Timestamp last_report_time = ExecCtx::Get()->Now(); + }; + + // Load report data. + using LoadReportMap = std::map< + std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>, + LoadReportState>; + + struct LoadReportServer { + RefCountedPtr<ChannelState> channel_state; + LoadReportMap load_report_map; }; // Sends an error notification to all watchers. - void NotifyOnErrorLocked(grpc_error_handle error) + void NotifyOnErrorLocked(absl::Status status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); - - XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked( - bool send_all_clusters, const std::set<std::string>& clusters) + // Sends an error notification to a specific set of watchers. + void NotifyWatchersOnErrorLocked( + const std::map<ResourceWatcherInterface*, + RefCountedPtr<ResourceWatcherInterface>>& watchers, + absl::Status status); + // Sends a resource-does-not-exist notification to a specific set of watchers. + void NotifyWatchersOnResourceDoesNotExist( + const std::map<ResourceWatcherInterface*, + RefCountedPtr<ResourceWatcherInterface>>& watchers); + + void MaybeRegisterResourceTypeLocked(const XdsResourceType* resource_type) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); - void UpdateResourceMetadataWithFailedParseResultLocked( - grpc_millis update_time, const XdsApi::AdsParseResult& result) + // Gets the type for resource_type, or null if the type is unknown. + const XdsResourceType* GetResourceTypeLocked(absl::string_view resource_type) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + absl::StatusOr<XdsResourceName> ParseXdsResourceName( + absl::string_view name, const XdsResourceType* type); + static std::string ConstructFullXdsResourceName( + absl::string_view authority, absl::string_view resource_type, + const XdsResourceKey& key); + + XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked( + const XdsBootstrap::XdsServer& xds_server, bool send_all_clusters, + const std::set<std::string>& clusters) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + + RefCountedPtr<ChannelState> GetOrCreateChannelStateLocked( + const XdsBootstrap::XdsServer& server) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + std::unique_ptr<XdsBootstrap> bootstrap_; grpc_channel_args* args_; - const grpc_millis request_timeout_; + const Duration request_timeout_; + const bool xds_federation_enabled_; grpc_pollset_set* interested_parties_; OrphanablePtr<CertificateProviderStore> certificate_provider_store_; XdsApi api_; + WorkSerializer work_serializer_; Mutex mu_; - // The channel for communicating with the xds server. - OrphanablePtr<ChannelState> chand_ ABSL_GUARDED_BY(mu_); + // Stores resource type objects seen by type URL. + std::map<absl::string_view /*resource_type*/, const XdsResourceType*> + resource_types_ ABSL_GUARDED_BY(mu_); + std::map<absl::string_view /*v2_resource_type*/, const XdsResourceType*> + v2_resource_types_ ABSL_GUARDED_BY(mu_); + upb::SymbolTable symtab_ ABSL_GUARDED_BY(mu_); - // One entry for each watched LDS resource. - std::map<std::string /*listener_name*/, ListenerState> listener_map_ - ABSL_GUARDED_BY(mu_); - // One entry for each watched RDS resource. - std::map<std::string /*route_config_name*/, RouteConfigState> - route_config_map_ ABSL_GUARDED_BY(mu_); - // One entry for each watched CDS resource. - std::map<std::string /*cluster_name*/, ClusterState> cluster_map_ + // Map of existing xDS server channels. + std::map<XdsBootstrap::XdsServer, ChannelState*> xds_server_channel_map_ ABSL_GUARDED_BY(mu_); - // One entry for each watched EDS resource. - std::map<std::string /*eds_service_name*/, EndpointState> endpoint_map_ + + std::map<std::string /*authority*/, AuthorityState> authority_state_map_ ABSL_GUARDED_BY(mu_); - // Load report data. - std::map< - std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>, - LoadReportState> - load_report_map_ ABSL_GUARDED_BY(mu_); + std::map<XdsBootstrap::XdsServer, LoadReportServer> + xds_load_report_server_map_ ABSL_GUARDED_BY(mu_); - // Stores the most recent accepted resource version for each resource type. - std::map<std::string /*type*/, std::string /*version*/> resource_version_map_ - ABSL_GUARDED_BY(mu_); + // Stores started watchers whose resource name was not parsed successfully, + // waiting to be cancelled or reset in Orphan(). + std::map<ResourceWatcherInterface*, RefCountedPtr<ResourceWatcherInterface>> + invalid_watchers_ ABSL_GUARDED_BY(mu_); bool shutting_down_ ABSL_GUARDED_BY(mu_) = false; }; |