diff options
Diffstat (limited to 'grpc/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py')
-rw-r--r-- | grpc/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py | 84 |
1 files changed, 48 insertions, 36 deletions
diff --git a/grpc/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py b/grpc/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py index 31ec6660..23dddf95 100644 --- a/grpc/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py +++ b/grpc/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py @@ -72,10 +72,10 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): return _ChannelzServiceClient(self._make_channel(self.maintenance_port)) def get_load_balancer_stats( - self, - *, - num_rpcs: int, - timeout_sec: Optional[int] = None, + self, + *, + num_rpcs: int, + timeout_sec: Optional[int] = None, ) -> grpc_testing.LoadBalancerStatsResponse: """ Shortcut to LoadBalancerStatsServiceClient.get_client_stats() @@ -83,9 +83,6 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): return self.load_balancer_stats.get_client_stats( num_rpcs=num_rpcs, timeout_sec=timeout_sec) - def get_server_channels(self) -> Iterator[_ChannelzChannel]: - return self.channelz.find_channels_for_target(self.server_target) - def wait_for_active_server_channel(self) -> _ChannelzChannel: """Wait for the channel to the server to transition to READY. @@ -94,16 +91,9 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): """ return self.wait_for_server_channel_state(_ChannelzChannelState.READY) - def get_active_server_channel(self) -> _ChannelzChannel: - """Return a READY channel to the server. - - Raises: - GrpcApp.NotFound: If there's no READY channel to the server. - """ - return self.find_server_channel_with_state(_ChannelzChannelState.READY) - def get_active_server_channel_socket(self) -> _ChannelzSocket: - channel = self.get_active_server_channel() + channel = self.find_server_channel_with_state( + _ChannelzChannelState.READY) # Get the first subchannel of the active channel to the server. logger.debug( 'Retrieving client -> server socket, ' @@ -121,32 +111,45 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): logger.debug('Found client -> server socket: %s', socket.ref.name) return socket - def wait_for_server_channel_state(self, - state: _ChannelzChannelState, - *, - timeout: Optional[_timedelta] = None - ) -> _ChannelzChannel: + def wait_for_server_channel_state( + self, + state: _ChannelzChannelState, + *, + timeout: Optional[_timedelta] = None, + rpc_deadline: Optional[_timedelta] = None) -> _ChannelzChannel: + # When polling for a state, prefer smaller wait times to avoid + # exhausting all allowed time on a single long RPC. + if rpc_deadline is None: + rpc_deadline = _timedelta(seconds=30) + # Fine-tuned to wait for the channel to the server. retryer = retryers.exponential_retryer_with_timeout( wait_min=_timedelta(seconds=10), wait_max=_timedelta(seconds=25), - timeout=_timedelta(minutes=3) if timeout is None else timeout) + timeout=_timedelta(minutes=5) if timeout is None else timeout) logger.info('Waiting for client %s to report a %s channel to %s', self.ip, _ChannelzChannelState.Name(state), self.server_target) - channel = retryer(self.find_server_channel_with_state, state) + channel = retryer(self.find_server_channel_with_state, + state, + rpc_deadline=rpc_deadline) logger.info('Client %s channel to %s transitioned to state %s:\n%s', self.ip, self.server_target, _ChannelzChannelState.Name(state), channel) return channel - def find_server_channel_with_state(self, - state: _ChannelzChannelState, - *, - check_subchannel=True - ) -> _ChannelzChannel: - for channel in self.get_server_channels(): + def find_server_channel_with_state( + self, + state: _ChannelzChannelState, + *, + rpc_deadline: Optional[_timedelta] = None, + check_subchannel=True) -> _ChannelzChannel: + rpc_params = {} + if rpc_deadline is not None: + rpc_params['deadline_sec'] = rpc_deadline.total_seconds() + + for channel in self.get_server_channels(**rpc_params): channel_state: _ChannelzChannelState = channel.data.state.state logger.info('Server channel: %s, state: %s', channel.ref.name, _ChannelzChannelState.Name(channel_state)) @@ -156,8 +159,9 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): # one subchannel in the requested state. try: subchannel = self.find_subchannel_with_state( - channel, state) - logger.info('Found subchannel in state %s: %s', state, + channel, state, **rpc_params) + logger.info('Found subchannel in state %s: %s', + _ChannelzChannelState.Name(state), subchannel) except self.NotFound as e: # Otherwise, keep searching. @@ -169,10 +173,15 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): f'Client has no {_ChannelzChannelState.Name(state)} channel with ' 'the server') + def get_server_channels(self, **kwargs) -> Iterator[_ChannelzChannel]: + return self.channelz.find_channels_for_target(self.server_target, + **kwargs) + def find_subchannel_with_state(self, channel: _ChannelzChannel, - state: _ChannelzChannelState - ) -> _ChannelzSubchannel: - for subchannel in self.channelz.list_channel_subchannels(channel): + state: _ChannelzChannelState, + **kwargs) -> _ChannelzSubchannel: + subchannels = self.channelz.list_channel_subchannels(channel, **kwargs) + for subchannel in subchannels: if subchannel.data.state.state is state: return subchannel @@ -190,9 +199,10 @@ class KubernetesClientRunner(base_runner.KubernetesBaseRunner): image_name, gcp_service_account, td_bootstrap_image, + xds_server_uri=None, + network='default', service_account_name=None, stats_port=8079, - network='default', deployment_template='client.deployment.yaml', service_account_template='service-account.yaml', reuse_namespace=False, @@ -208,6 +218,7 @@ class KubernetesClientRunner(base_runner.KubernetesBaseRunner): self.stats_port = stats_port # xDS bootstrap generator self.td_bootstrap_image = td_bootstrap_image + self.xds_server_uri = xds_server_uri self.network = network self.deployment_template = deployment_template self.service_account_template = service_account_template @@ -243,7 +254,8 @@ class KubernetesClientRunner(base_runner.KubernetesBaseRunner): namespace_name=self.k8s_namespace.name, service_account_name=self.service_account_name, td_bootstrap_image=self.td_bootstrap_image, - network_name=self.network, + xds_server_uri=self.xds_server_uri, + network=self.network, stats_port=self.stats_port, server_target=server_target, rpc=rpc, |