summaryrefslogtreecommitdiff
path: root/grpc/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py
diff options
context:
space:
mode:
Diffstat (limited to 'grpc/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py')
-rw-r--r--grpc/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py133
1 files changed, 114 insertions, 19 deletions
diff --git a/grpc/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py b/grpc/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py
index 23dddf95..c5d8c040 100644
--- a/grpc/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py
+++ b/grpc/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py
@@ -20,12 +20,14 @@ modules.
import datetime
import functools
import logging
-from typing import Iterator, Optional
+from typing import Iterable, List, Optional
from framework.helpers import retryers
+from framework.infrastructure import gcp
from framework.infrastructure import k8s
import framework.rpc
from framework.rpc import grpc_channelz
+from framework.rpc import grpc_csds
from framework.rpc import grpc_testing
from framework.test_app import base_runner
@@ -34,11 +36,13 @@ logger = logging.getLogger(__name__)
# Type aliases
_timedelta = datetime.timedelta
_LoadBalancerStatsServiceClient = grpc_testing.LoadBalancerStatsServiceClient
+_XdsUpdateClientConfigureServiceClient = grpc_testing.XdsUpdateClientConfigureServiceClient
_ChannelzServiceClient = grpc_channelz.ChannelzServiceClient
_ChannelzChannel = grpc_channelz.Channel
_ChannelzChannelState = grpc_channelz.ChannelState
_ChannelzSubchannel = grpc_channelz.Subchannel
_ChannelzSocket = grpc_channelz.Socket
+_CsdsClient = grpc_csds.CsdsClient
class XdsTestClient(framework.rpc.grpc.GrpcApp):
@@ -68,9 +72,20 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
@property
@functools.lru_cache(None)
+ def update_config(self):
+ return _XdsUpdateClientConfigureServiceClient(
+ self._make_channel(self.rpc_port))
+
+ @property
+ @functools.lru_cache(None)
def channelz(self) -> _ChannelzServiceClient:
return _ChannelzServiceClient(self._make_channel(self.maintenance_port))
+ @property
+ @functools.lru_cache(None)
+ def csds(self) -> _CsdsClient:
+ return _CsdsClient(self._make_channel(self.maintenance_port))
+
def get_load_balancer_stats(
self,
*,
@@ -83,6 +98,15 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
return self.load_balancer_stats.get_client_stats(
num_rpcs=num_rpcs, timeout_sec=timeout_sec)
+ def get_load_balancer_accumulated_stats(
+ self,
+ *,
+ timeout_sec: Optional[int] = None,
+ ) -> grpc_testing.LoadBalancerAccumulatedStatsResponse:
+ """Shortcut to LoadBalancerStatsServiceClient.get_client_accumulated_stats()"""
+ return self.load_balancer_stats.get_client_accumulated_stats(
+ timeout_sec=timeout_sec)
+
def wait_for_active_server_channel(self) -> _ChannelzChannel:
"""Wait for the channel to the server to transition to READY.
@@ -173,7 +197,7 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
f'Client has no {_ChannelzChannelState.Name(state)} channel with '
'the server')
- def get_server_channels(self, **kwargs) -> Iterator[_ChannelzChannel]:
+ def get_server_channels(self, **kwargs) -> Iterable[_ChannelzChannel]:
return self.channelz.find_channels_for_target(self.server_target,
**kwargs)
@@ -189,6 +213,17 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
f'Not found a {_ChannelzChannelState.Name(state)} '
f'subchannel for channel_id {channel.ref.channel_id}')
+ def find_subchannels_with_state(self, state: _ChannelzChannelState,
+ **kwargs) -> List[_ChannelzSubchannel]:
+ subchannels = []
+ for channel in self.channelz.find_channels_for_target(
+ self.server_target, **kwargs):
+ for subchannel in self.channelz.list_channel_subchannels(
+ channel, **kwargs):
+ if subchannel.data.state.state is state:
+ subchannels.append(subchannel)
+ return subchannels
+
class KubernetesClientRunner(base_runner.KubernetesBaseRunner):
@@ -197,54 +232,92 @@ class KubernetesClientRunner(base_runner.KubernetesBaseRunner):
*,
deployment_name,
image_name,
- gcp_service_account,
td_bootstrap_image,
+ gcp_api_manager: gcp.api.GcpApiManager,
+ gcp_project: str,
+ gcp_service_account: str,
xds_server_uri=None,
network='default',
+ config_scope=None,
service_account_name=None,
stats_port=8079,
deployment_template='client.deployment.yaml',
service_account_template='service-account.yaml',
reuse_namespace=False,
namespace_template=None,
- debug_use_port_forwarding=False):
+ debug_use_port_forwarding=False,
+ enable_workload_identity=True):
super().__init__(k8s_namespace, namespace_template, reuse_namespace)
# Settings
self.deployment_name = deployment_name
self.image_name = image_name
- self.gcp_service_account = gcp_service_account
- self.service_account_name = service_account_name or deployment_name
self.stats_port = stats_port
# xDS bootstrap generator
self.td_bootstrap_image = td_bootstrap_image
self.xds_server_uri = xds_server_uri
self.network = network
+ self.config_scope = config_scope
self.deployment_template = deployment_template
- self.service_account_template = service_account_template
self.debug_use_port_forwarding = debug_use_port_forwarding
+ self.enable_workload_identity = enable_workload_identity
+ # Service account settings:
+ # Kubernetes service account
+ if self.enable_workload_identity:
+ self.service_account_name = service_account_name or deployment_name
+ self.service_account_template = service_account_template
+ else:
+ self.service_account_name = None
+ self.service_account_template = None
+ # GCP.
+ self.gcp_project = gcp_project
+ self.gcp_ui_url = gcp_api_manager.gcp_ui_url
+ # GCP service account to map to Kubernetes service account
+ self.gcp_service_account = gcp_service_account
+ # GCP IAM API used to grant allow workload service accounts permission
+ # to use GCP service account identity.
+ self.gcp_iam = gcp.iam.IamV1(gcp_api_manager, gcp_project)
# Mutable state
self.deployment: Optional[k8s.V1Deployment] = None
self.service_account: Optional[k8s.V1ServiceAccount] = None
- self.port_forwarder = None
+ self.port_forwarder: Optional[k8s.PortForwarder] = None
+ # TODO(sergiitk): make rpc UnaryCall enum or get it from proto
def run(self,
*,
server_target,
rpc='UnaryCall',
qps=25,
+ metadata='',
secure_mode=False,
print_response=False) -> XdsTestClient:
+ logger.info(
+ 'Deploying xDS test client "%s" to k8s namespace %s: '
+ 'server_target=%s rpc=%s qps=%s metadata=%r secure_mode=%s '
+ 'print_response=%s', self.deployment_name, self.k8s_namespace.name,
+ server_target, rpc, qps, metadata, secure_mode, print_response)
+ self._logs_explorer_link(deployment_name=self.deployment_name,
+ namespace_name=self.k8s_namespace.name,
+ gcp_project=self.gcp_project,
+ gcp_ui_url=self.gcp_ui_url)
+
super().run()
- # TODO(sergiitk): make rpc UnaryCall enum or get it from proto
- # Create service account
- self.service_account = self._create_service_account(
- self.service_account_template,
- service_account_name=self.service_account_name,
- namespace_name=self.k8s_namespace.name,
- gcp_service_account=self.gcp_service_account)
+ if self.enable_workload_identity:
+ # Allow Kubernetes service account to use the GCP service account
+ # identity.
+ self._grant_workload_identity_user(
+ gcp_iam=self.gcp_iam,
+ gcp_service_account=self.gcp_service_account,
+ service_account_name=self.service_account_name)
+
+ # Create service account
+ self.service_account = self._create_service_account(
+ self.service_account_template,
+ service_account_name=self.service_account_name,
+ namespace_name=self.k8s_namespace.name,
+ gcp_service_account=self.gcp_service_account)
# Always create a new deployment
self.deployment = self._create_deployment(
@@ -256,10 +329,12 @@ class KubernetesClientRunner(base_runner.KubernetesBaseRunner):
td_bootstrap_image=self.td_bootstrap_image,
xds_server_uri=self.xds_server_uri,
network=self.network,
+ config_scope=self.config_scope,
stats_port=self.stats_port,
server_target=server_target,
rpc=rpc,
qps=qps,
+ metadata=metadata,
secure_mode=secure_mode,
print_response=print_response)
@@ -269,6 +344,7 @@ class KubernetesClientRunner(base_runner.KubernetesBaseRunner):
pod = self.k8s_namespace.list_deployment_pods(self.deployment)[0]
self._wait_pod_started(pod.metadata.name)
pod_ip = pod.status.pod_ip
+ rpc_port = self.stats_port
rpc_host = None
# Experimental, for local debugging.
@@ -277,21 +353,40 @@ class KubernetesClientRunner(base_runner.KubernetesBaseRunner):
pod_ip, self.stats_port)
self.port_forwarder = self.k8s_namespace.port_forward_pod(
pod, remote_port=self.stats_port)
- rpc_host = self.k8s_namespace.PORT_FORWARD_LOCAL_ADDRESS
+ rpc_port = self.port_forwarder.local_port
+ rpc_host = self.port_forwarder.local_address
return XdsTestClient(ip=pod_ip,
- rpc_port=self.stats_port,
+ rpc_port=rpc_port,
server_target=server_target,
rpc_host=rpc_host)
def cleanup(self, *, force=False, force_namespace=False):
if self.port_forwarder:
- self.k8s_namespace.port_forward_stop(self.port_forwarder)
+ self.port_forwarder.close()
self.port_forwarder = None
if self.deployment or force:
self._delete_deployment(self.deployment_name)
self.deployment = None
- if self.service_account or force:
+ if self.enable_workload_identity and (self.service_account or force):
+ self._revoke_workload_identity_user(
+ gcp_iam=self.gcp_iam,
+ gcp_service_account=self.gcp_service_account,
+ service_account_name=self.service_account_name)
self._delete_service_account(self.service_account_name)
self.service_account = None
super().cleanup(force=force_namespace and force)
+
+ @classmethod
+ def make_namespace_name(cls,
+ resource_prefix: str,
+ resource_suffix: str,
+ name: str = 'client') -> str:
+ """A helper to make consistent XdsTestClient kubernetes namespace name
+ for given resource prefix and suffix.
+
+ Note: the idea is to intentionally produce different namespace name for
+ the test server, and the test client, as that closely mimics real-world
+ deployments.
+ """
+ return cls._make_namespace_name(resource_prefix, resource_suffix, name)