summaryrefslogtreecommitdiff
path: root/grpc/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py
diff options
context:
space:
mode:
Diffstat (limited to 'grpc/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py')
-rw-r--r--grpc/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py84
1 files changed, 48 insertions, 36 deletions
diff --git a/grpc/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py b/grpc/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py
index 31ec6660..23dddf95 100644
--- a/grpc/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py
+++ b/grpc/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py
@@ -72,10 +72,10 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
return _ChannelzServiceClient(self._make_channel(self.maintenance_port))
def get_load_balancer_stats(
- self,
- *,
- num_rpcs: int,
- timeout_sec: Optional[int] = None,
+ self,
+ *,
+ num_rpcs: int,
+ timeout_sec: Optional[int] = None,
) -> grpc_testing.LoadBalancerStatsResponse:
"""
Shortcut to LoadBalancerStatsServiceClient.get_client_stats()
@@ -83,9 +83,6 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
return self.load_balancer_stats.get_client_stats(
num_rpcs=num_rpcs, timeout_sec=timeout_sec)
- def get_server_channels(self) -> Iterator[_ChannelzChannel]:
- return self.channelz.find_channels_for_target(self.server_target)
-
def wait_for_active_server_channel(self) -> _ChannelzChannel:
"""Wait for the channel to the server to transition to READY.
@@ -94,16 +91,9 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
"""
return self.wait_for_server_channel_state(_ChannelzChannelState.READY)
- def get_active_server_channel(self) -> _ChannelzChannel:
- """Return a READY channel to the server.
-
- Raises:
- GrpcApp.NotFound: If there's no READY channel to the server.
- """
- return self.find_server_channel_with_state(_ChannelzChannelState.READY)
-
def get_active_server_channel_socket(self) -> _ChannelzSocket:
- channel = self.get_active_server_channel()
+ channel = self.find_server_channel_with_state(
+ _ChannelzChannelState.READY)
# Get the first subchannel of the active channel to the server.
logger.debug(
'Retrieving client -> server socket, '
@@ -121,32 +111,45 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
logger.debug('Found client -> server socket: %s', socket.ref.name)
return socket
- def wait_for_server_channel_state(self,
- state: _ChannelzChannelState,
- *,
- timeout: Optional[_timedelta] = None
- ) -> _ChannelzChannel:
+ def wait_for_server_channel_state(
+ self,
+ state: _ChannelzChannelState,
+ *,
+ timeout: Optional[_timedelta] = None,
+ rpc_deadline: Optional[_timedelta] = None) -> _ChannelzChannel:
+ # When polling for a state, prefer smaller wait times to avoid
+ # exhausting all allowed time on a single long RPC.
+ if rpc_deadline is None:
+ rpc_deadline = _timedelta(seconds=30)
+
# Fine-tuned to wait for the channel to the server.
retryer = retryers.exponential_retryer_with_timeout(
wait_min=_timedelta(seconds=10),
wait_max=_timedelta(seconds=25),
- timeout=_timedelta(minutes=3) if timeout is None else timeout)
+ timeout=_timedelta(minutes=5) if timeout is None else timeout)
logger.info('Waiting for client %s to report a %s channel to %s',
self.ip, _ChannelzChannelState.Name(state),
self.server_target)
- channel = retryer(self.find_server_channel_with_state, state)
+ channel = retryer(self.find_server_channel_with_state,
+ state,
+ rpc_deadline=rpc_deadline)
logger.info('Client %s channel to %s transitioned to state %s:\n%s',
self.ip, self.server_target,
_ChannelzChannelState.Name(state), channel)
return channel
- def find_server_channel_with_state(self,
- state: _ChannelzChannelState,
- *,
- check_subchannel=True
- ) -> _ChannelzChannel:
- for channel in self.get_server_channels():
+ def find_server_channel_with_state(
+ self,
+ state: _ChannelzChannelState,
+ *,
+ rpc_deadline: Optional[_timedelta] = None,
+ check_subchannel=True) -> _ChannelzChannel:
+ rpc_params = {}
+ if rpc_deadline is not None:
+ rpc_params['deadline_sec'] = rpc_deadline.total_seconds()
+
+ for channel in self.get_server_channels(**rpc_params):
channel_state: _ChannelzChannelState = channel.data.state.state
logger.info('Server channel: %s, state: %s', channel.ref.name,
_ChannelzChannelState.Name(channel_state))
@@ -156,8 +159,9 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
# one subchannel in the requested state.
try:
subchannel = self.find_subchannel_with_state(
- channel, state)
- logger.info('Found subchannel in state %s: %s', state,
+ channel, state, **rpc_params)
+ logger.info('Found subchannel in state %s: %s',
+ _ChannelzChannelState.Name(state),
subchannel)
except self.NotFound as e:
# Otherwise, keep searching.
@@ -169,10 +173,15 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
f'Client has no {_ChannelzChannelState.Name(state)} channel with '
'the server')
+ def get_server_channels(self, **kwargs) -> Iterator[_ChannelzChannel]:
+ return self.channelz.find_channels_for_target(self.server_target,
+ **kwargs)
+
def find_subchannel_with_state(self, channel: _ChannelzChannel,
- state: _ChannelzChannelState
- ) -> _ChannelzSubchannel:
- for subchannel in self.channelz.list_channel_subchannels(channel):
+ state: _ChannelzChannelState,
+ **kwargs) -> _ChannelzSubchannel:
+ subchannels = self.channelz.list_channel_subchannels(channel, **kwargs)
+ for subchannel in subchannels:
if subchannel.data.state.state is state:
return subchannel
@@ -190,9 +199,10 @@ class KubernetesClientRunner(base_runner.KubernetesBaseRunner):
image_name,
gcp_service_account,
td_bootstrap_image,
+ xds_server_uri=None,
+ network='default',
service_account_name=None,
stats_port=8079,
- network='default',
deployment_template='client.deployment.yaml',
service_account_template='service-account.yaml',
reuse_namespace=False,
@@ -208,6 +218,7 @@ class KubernetesClientRunner(base_runner.KubernetesBaseRunner):
self.stats_port = stats_port
# xDS bootstrap generator
self.td_bootstrap_image = td_bootstrap_image
+ self.xds_server_uri = xds_server_uri
self.network = network
self.deployment_template = deployment_template
self.service_account_template = service_account_template
@@ -243,7 +254,8 @@ class KubernetesClientRunner(base_runner.KubernetesBaseRunner):
namespace_name=self.k8s_namespace.name,
service_account_name=self.service_account_name,
td_bootstrap_image=self.td_bootstrap_image,
- network_name=self.network,
+ xds_server_uri=self.xds_server_uri,
+ network=self.network,
stats_port=self.stats_port,
server_target=server_target,
rpc=rpc,