diff options
Diffstat (limited to 'grpc/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py')
-rw-r--r-- | grpc/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py | 133 |
1 files changed, 114 insertions, 19 deletions
diff --git a/grpc/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py b/grpc/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py index 23dddf95..c5d8c040 100644 --- a/grpc/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py +++ b/grpc/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py @@ -20,12 +20,14 @@ modules. import datetime import functools import logging -from typing import Iterator, Optional +from typing import Iterable, List, Optional from framework.helpers import retryers +from framework.infrastructure import gcp from framework.infrastructure import k8s import framework.rpc from framework.rpc import grpc_channelz +from framework.rpc import grpc_csds from framework.rpc import grpc_testing from framework.test_app import base_runner @@ -34,11 +36,13 @@ logger = logging.getLogger(__name__) # Type aliases _timedelta = datetime.timedelta _LoadBalancerStatsServiceClient = grpc_testing.LoadBalancerStatsServiceClient +_XdsUpdateClientConfigureServiceClient = grpc_testing.XdsUpdateClientConfigureServiceClient _ChannelzServiceClient = grpc_channelz.ChannelzServiceClient _ChannelzChannel = grpc_channelz.Channel _ChannelzChannelState = grpc_channelz.ChannelState _ChannelzSubchannel = grpc_channelz.Subchannel _ChannelzSocket = grpc_channelz.Socket +_CsdsClient = grpc_csds.CsdsClient class XdsTestClient(framework.rpc.grpc.GrpcApp): @@ -68,9 +72,20 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): @property @functools.lru_cache(None) + def update_config(self): + return _XdsUpdateClientConfigureServiceClient( + self._make_channel(self.rpc_port)) + + @property + @functools.lru_cache(None) def channelz(self) -> _ChannelzServiceClient: return _ChannelzServiceClient(self._make_channel(self.maintenance_port)) + @property + @functools.lru_cache(None) + def csds(self) -> _CsdsClient: + return _CsdsClient(self._make_channel(self.maintenance_port)) + def get_load_balancer_stats( self, *, @@ -83,6 +98,15 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): return self.load_balancer_stats.get_client_stats( num_rpcs=num_rpcs, timeout_sec=timeout_sec) + def get_load_balancer_accumulated_stats( + self, + *, + timeout_sec: Optional[int] = None, + ) -> grpc_testing.LoadBalancerAccumulatedStatsResponse: + """Shortcut to LoadBalancerStatsServiceClient.get_client_accumulated_stats()""" + return self.load_balancer_stats.get_client_accumulated_stats( + timeout_sec=timeout_sec) + def wait_for_active_server_channel(self) -> _ChannelzChannel: """Wait for the channel to the server to transition to READY. @@ -173,7 +197,7 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): f'Client has no {_ChannelzChannelState.Name(state)} channel with ' 'the server') - def get_server_channels(self, **kwargs) -> Iterator[_ChannelzChannel]: + def get_server_channels(self, **kwargs) -> Iterable[_ChannelzChannel]: return self.channelz.find_channels_for_target(self.server_target, **kwargs) @@ -189,6 +213,17 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): f'Not found a {_ChannelzChannelState.Name(state)} ' f'subchannel for channel_id {channel.ref.channel_id}') + def find_subchannels_with_state(self, state: _ChannelzChannelState, + **kwargs) -> List[_ChannelzSubchannel]: + subchannels = [] + for channel in self.channelz.find_channels_for_target( + self.server_target, **kwargs): + for subchannel in self.channelz.list_channel_subchannels( + channel, **kwargs): + if subchannel.data.state.state is state: + subchannels.append(subchannel) + return subchannels + class KubernetesClientRunner(base_runner.KubernetesBaseRunner): @@ -197,54 +232,92 @@ class KubernetesClientRunner(base_runner.KubernetesBaseRunner): *, deployment_name, image_name, - gcp_service_account, td_bootstrap_image, + gcp_api_manager: gcp.api.GcpApiManager, + gcp_project: str, + gcp_service_account: str, xds_server_uri=None, network='default', + config_scope=None, service_account_name=None, stats_port=8079, deployment_template='client.deployment.yaml', service_account_template='service-account.yaml', reuse_namespace=False, namespace_template=None, - debug_use_port_forwarding=False): + debug_use_port_forwarding=False, + enable_workload_identity=True): super().__init__(k8s_namespace, namespace_template, reuse_namespace) # Settings self.deployment_name = deployment_name self.image_name = image_name - self.gcp_service_account = gcp_service_account - self.service_account_name = service_account_name or deployment_name self.stats_port = stats_port # xDS bootstrap generator self.td_bootstrap_image = td_bootstrap_image self.xds_server_uri = xds_server_uri self.network = network + self.config_scope = config_scope self.deployment_template = deployment_template - self.service_account_template = service_account_template self.debug_use_port_forwarding = debug_use_port_forwarding + self.enable_workload_identity = enable_workload_identity + # Service account settings: + # Kubernetes service account + if self.enable_workload_identity: + self.service_account_name = service_account_name or deployment_name + self.service_account_template = service_account_template + else: + self.service_account_name = None + self.service_account_template = None + # GCP. + self.gcp_project = gcp_project + self.gcp_ui_url = gcp_api_manager.gcp_ui_url + # GCP service account to map to Kubernetes service account + self.gcp_service_account = gcp_service_account + # GCP IAM API used to grant allow workload service accounts permission + # to use GCP service account identity. + self.gcp_iam = gcp.iam.IamV1(gcp_api_manager, gcp_project) # Mutable state self.deployment: Optional[k8s.V1Deployment] = None self.service_account: Optional[k8s.V1ServiceAccount] = None - self.port_forwarder = None + self.port_forwarder: Optional[k8s.PortForwarder] = None + # TODO(sergiitk): make rpc UnaryCall enum or get it from proto def run(self, *, server_target, rpc='UnaryCall', qps=25, + metadata='', secure_mode=False, print_response=False) -> XdsTestClient: + logger.info( + 'Deploying xDS test client "%s" to k8s namespace %s: ' + 'server_target=%s rpc=%s qps=%s metadata=%r secure_mode=%s ' + 'print_response=%s', self.deployment_name, self.k8s_namespace.name, + server_target, rpc, qps, metadata, secure_mode, print_response) + self._logs_explorer_link(deployment_name=self.deployment_name, + namespace_name=self.k8s_namespace.name, + gcp_project=self.gcp_project, + gcp_ui_url=self.gcp_ui_url) + super().run() - # TODO(sergiitk): make rpc UnaryCall enum or get it from proto - # Create service account - self.service_account = self._create_service_account( - self.service_account_template, - service_account_name=self.service_account_name, - namespace_name=self.k8s_namespace.name, - gcp_service_account=self.gcp_service_account) + if self.enable_workload_identity: + # Allow Kubernetes service account to use the GCP service account + # identity. + self._grant_workload_identity_user( + gcp_iam=self.gcp_iam, + gcp_service_account=self.gcp_service_account, + service_account_name=self.service_account_name) + + # Create service account + self.service_account = self._create_service_account( + self.service_account_template, + service_account_name=self.service_account_name, + namespace_name=self.k8s_namespace.name, + gcp_service_account=self.gcp_service_account) # Always create a new deployment self.deployment = self._create_deployment( @@ -256,10 +329,12 @@ class KubernetesClientRunner(base_runner.KubernetesBaseRunner): td_bootstrap_image=self.td_bootstrap_image, xds_server_uri=self.xds_server_uri, network=self.network, + config_scope=self.config_scope, stats_port=self.stats_port, server_target=server_target, rpc=rpc, qps=qps, + metadata=metadata, secure_mode=secure_mode, print_response=print_response) @@ -269,6 +344,7 @@ class KubernetesClientRunner(base_runner.KubernetesBaseRunner): pod = self.k8s_namespace.list_deployment_pods(self.deployment)[0] self._wait_pod_started(pod.metadata.name) pod_ip = pod.status.pod_ip + rpc_port = self.stats_port rpc_host = None # Experimental, for local debugging. @@ -277,21 +353,40 @@ class KubernetesClientRunner(base_runner.KubernetesBaseRunner): pod_ip, self.stats_port) self.port_forwarder = self.k8s_namespace.port_forward_pod( pod, remote_port=self.stats_port) - rpc_host = self.k8s_namespace.PORT_FORWARD_LOCAL_ADDRESS + rpc_port = self.port_forwarder.local_port + rpc_host = self.port_forwarder.local_address return XdsTestClient(ip=pod_ip, - rpc_port=self.stats_port, + rpc_port=rpc_port, server_target=server_target, rpc_host=rpc_host) def cleanup(self, *, force=False, force_namespace=False): if self.port_forwarder: - self.k8s_namespace.port_forward_stop(self.port_forwarder) + self.port_forwarder.close() self.port_forwarder = None if self.deployment or force: self._delete_deployment(self.deployment_name) self.deployment = None - if self.service_account or force: + if self.enable_workload_identity and (self.service_account or force): + self._revoke_workload_identity_user( + gcp_iam=self.gcp_iam, + gcp_service_account=self.gcp_service_account, + service_account_name=self.service_account_name) self._delete_service_account(self.service_account_name) self.service_account = None super().cleanup(force=force_namespace and force) + + @classmethod + def make_namespace_name(cls, + resource_prefix: str, + resource_suffix: str, + name: str = 'client') -> str: + """A helper to make consistent XdsTestClient kubernetes namespace name + for given resource prefix and suffix. + + Note: the idea is to intentionally produce different namespace name for + the test server, and the test client, as that closely mimics real-world + deployments. + """ + return cls._make_namespace_name(resource_prefix, resource_suffix, name) |