diff options
author | Sergii Tkachenko <sergiitk@google.com> | 2023-05-30 22:58:49 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-05-30 19:58:49 -0700 |
commit | 531a6be335a7fcee4ad35d0b7b79d85106b89331 (patch) | |
tree | 5cc6b043ddba006ca4bb569c8ecb8a1a82d71780 | |
parent | 3225a9f117317661120ddeb6118ba77263068470 (diff) | |
download | grpc-grpc-531a6be335a7fcee4ad35d0b7b79d85106b89331.tar.gz |
[PSM Interop] Fix an issue with restarting k8s runner (#33280)
Fixes the issue introduced in https://github.com/grpc/grpc/pull/33104,
where stopping the current run didn't reset `self.time_start_requested`,
`self.time_start_completed`, or `self.time_stopped`. Because of this,
the subsetting test (the only one [redeploying the client
app](https://github.com/grpc/grpc/blob/10001d16a9d4f00c4c7962cb1c310c80d4b9c992/tools/run_tests/xds_k8s_test_driver/tests/subsetting_test.py#L73C1-L74))
started failing with:
```py
Traceback (most recent call last):
File "xds_k8s_test_driver/tests/subsetting_test.py", line 76, in test_subsetting_basic
test_client: _XdsTestClient = self.startTestClient(
File "xds_k8s_test_driver/framework/xds_k8s_testcase.py", line 615, in startTestClient
test_client = self.client_runner.run(server_target=test_server.xds_uri,
File "xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_client_runner.py", line 110, in run
super().run()
File "xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_base_runner.py", line 112, in run
raise RuntimeError(
RuntimeError: Deployment psm-grpc-client: has already been started at 2023-05-27T13:47:15.262461
```
This PR:
1. Instead of relying on `time_start_requested` and `time_stopped` to
produce GCP links, tracks the run history of each deployment. This fixes
the issue described above and adds support for listing all past runs
executed by a k8s runner.
2. Minor: skip the unnecessary call to `self.client_runner.cleanup()` when
there are no past deployment runs (e.g. at the first iteration of `for i
in range(_NUM_CLIENTS):`)
6 files changed, 98 insertions, 25 deletions
diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_base_runner.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_base_runner.py index 896894d025..f877656650 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_base_runner.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_base_runner.py @@ -16,6 +16,7 @@ Common functionality for running xDS Test Client and Server on Kubernetes. """ from abc import ABCMeta import contextlib +import dataclasses import datetime import logging import pathlib @@ -42,6 +43,14 @@ _datetime = datetime.datetime _timedelta = datetime.timedelta +@dataclasses.dataclass(frozen=True) +class RunHistory: + deployment_id: str + time_start_requested: _datetime + time_start_completed: Optional[_datetime] + time_stopped: _datetime + + class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta): # Pylint wants abstract classes to override abstract methods. # pylint: disable=abstract-method @@ -64,12 +73,16 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta): namespace_template: str = 'namespace.yaml' reuse_namespace: bool = False - # Mutable state. + # Mutable state. Describes the current run. + namespace: Optional[k8s.V1Namespace] = None deployment: Optional[k8s.V1Deployment] = None + deployment_id: Optional[str] = None service_account: Optional[k8s.V1ServiceAccount] = None time_start_requested: Optional[_datetime] = None time_start_completed: Optional[_datetime] = None time_stopped: Optional[_datetime] = None + # The history of all runs performed by this runner. 
+ run_history: List[RunHistory] def __init__(self, k8s_namespace: k8s.KubernetesNamespace, @@ -98,7 +111,7 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta): self.reuse_namespace = reuse_namespace # Mutable state - self.namespace: Optional[k8s.V1Namespace] = None + self.run_history = [] self.pod_port_forwarders = [] self.pod_log_collectors = [] @@ -107,7 +120,7 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta): def run(self, **kwargs): del kwargs - if self.time_start_requested: + if not self.time_stopped and self.time_start_requested: if self.time_start_completed: raise RuntimeError( f"Deployment {self.deployment_name}: has already been" @@ -117,15 +130,52 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta): f"Deployment {self.deployment_name}: start has already been" f" requested at {self.time_start_requested.isoformat()}") + self._reset_state() self.time_start_requested = _datetime.now() - self.logs_explorer_link() + self.logs_explorer_link() if self.reuse_namespace: self.namespace = self._reuse_namespace() if not self.namespace: self.namespace = self._create_namespace( self.namespace_template, namespace_name=self.k8s_namespace.name) + def _start_completed(self): + self.time_start_completed = _datetime.now() + + def _stop(self): + self.time_stopped = _datetime.now() + if self.time_start_requested and self.deployment_id: + run_history = RunHistory( + deployment_id=self.deployment_id, + time_start_requested=self.time_start_requested, + time_start_completed=self.time_start_completed, + time_stopped=self.time_stopped, + ) + self.run_history.append(run_history) + + def _reset_state(self): + """Reset the mutable state of the previous run.""" + if self.pod_port_forwarders: + logger.warning( + "Port forwarders weren't cleaned up from the past run: %s", + len(self.pod_port_forwarders)) + + if self.pod_log_collectors: + logger.warning( + "Pod log collectors weren't cleaned up from the past run: %s", + 
len(self.pod_log_collectors)) + + self.namespace = None + self.deployment = None + self.deployment_id = None + self.service_account = None + self.time_start_requested = None + self.time_start_completed = None + self.time_stopped = None + self.pod_port_forwarders = [] + self.pod_log_collectors = [] + def _cleanup_namespace(self, *, force=False): if (self.namespace and not self.reuse_namespace) or force: self.delete_namespace() @@ -312,7 +362,10 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta): # \" or n, but found 9, error found in #10 byte of ...|ent_id'. # Prepending deployment name forces deployment_id into a string, # as well as it's just a better description. - kwargs['deployment_id'] = f'{kwargs["deployment_name"]}-{rand_id}' + self.deployment_id = f'{kwargs["deployment_name"]}-{rand_id}' + kwargs['deployment_id'] = self.deployment_id + else: + self.deployment_id = kwargs['deployment_id'] deployment = self._create_from_template(template, **kwargs) if not isinstance(deployment, k8s.V1Deployment): @@ -361,6 +414,7 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta): if wait_for_deletion: self.k8s_namespace.wait_for_service_deleted(name) + logger.debug('Service %s deleted', name) def _delete_service_account(self, name, wait_for_deletion=True): @@ -460,17 +514,29 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta): neg_zones) def logs_explorer_link(self): - if not self.time_start_requested: - logger.warning( - 'Skipped printing GCP log link for a non-started deployment %s', - self.deployment_name) - return + """Prints GCP Logs Explorer link to all runs of the deployment.""" self._logs_explorer_link(deployment_name=self.deployment_name, namespace_name=self.k8s_namespace.name, gcp_project=self.gcp_project, - gcp_ui_url=self.gcp_ui_url, - start_time=self.time_start_requested, - end_time=self.time_stopped) + gcp_ui_url=self.gcp_ui_url) + + def logs_explorer_run_history_links(self): + """Prints a 
separate GCP Logs Explorer link for each run *completed* by + the runner. + + This excludes the current run, if it hasn't been completed. + """ + if not self.run_history: + logger.info('No completed deployments of %s', self.deployment_name) + return + for run in self.run_history: + self._logs_explorer_link(deployment_name=self.deployment_name, + namespace_name=self.k8s_namespace.name, + gcp_project=self.gcp_project, + gcp_ui_url=self.gcp_ui_url, + deployment_id=run.deployment_id, + start_time=run.time_start_requested, + end_time=run.time_stopped) @classmethod def _logs_explorer_link(cls, @@ -479,6 +545,7 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta): namespace_name: str, gcp_project: str, gcp_ui_url: str, + deployment_id: Optional[str] = None, start_time: Optional[_datetime] = None, end_time: Optional[_datetime] = None): """Output the link to test server/client logs in GCP Logs Explorer.""" @@ -496,13 +563,16 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta): 'resource.labels.container_name': deployment_name, 'resource.labels.namespace_name': namespace_name, } + if deployment_id: + query['labels."k8s-pod/deployment_id"'] = deployment_id link = cls._logs_explorer_link_from_params(gcp_ui_url=gcp_ui_url, gcp_project=gcp_project, query=query, request=request) + link_to = deployment_id if deployment_id else deployment_name # A whitespace at the end to indicate the end of the url. 
- logger.info("GCP Logs Explorer link to %s:\n%s ", deployment_name, link) + logger.info("GCP Logs Explorer link to %s:\n%s ", link_to, link) @classmethod def _make_namespace_name(cls, resource_prefix: str, resource_suffix: str, diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_client_runner.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_client_runner.py index 5b8cf73ffc..cc97125d1d 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_client_runner.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_client_runner.py @@ -14,7 +14,6 @@ """ Run xDS Test Client on Kubernetes. """ -import datetime import logging from typing import Optional @@ -151,7 +150,7 @@ class KubernetesClientRunner(k8s_base_runner.KubernetesBaseRunner): # Verify the deployment reports all pods started as well. self._wait_deployment_with_available_replicas(self.deployment_name) - self.time_start_completed = datetime.datetime.now() + self._start_completed() return self._xds_test_client_for_pod(pod, server_target=server_target) @@ -171,6 +170,7 @@ class KubernetesClientRunner(k8s_base_runner.KubernetesBaseRunner): # pylint: disable=arguments-differ def cleanup(self, *, force=False, force_namespace=False): + # TODO(sergiitk): rename to stop(). 
try: if self.deployment or force: self._delete_deployment(self.deployment_name) @@ -185,7 +185,7 @@ class KubernetesClientRunner(k8s_base_runner.KubernetesBaseRunner): self.service_account = None self._cleanup_namespace(force=force_namespace and force) finally: - self.time_stopped = datetime.datetime.now() + self._stop() # pylint: enable=arguments-differ diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_server_runner.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_server_runner.py index 20d3186c9c..b6de2f85e0 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_server_runner.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_server_runner.py @@ -14,7 +14,6 @@ """ Run xDS Test Client on Kubernetes. """ -import datetime import logging from typing import List, Optional @@ -197,7 +196,7 @@ class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner): # Verify the deployment reports all pods started as well. self._wait_deployment_with_available_replicas(self.deployment_name, replica_count) - self.time_start_completed = datetime.datetime.now() + self._start_completed() servers: List[XdsTestServer] = [] for pod in pods: @@ -239,6 +238,7 @@ class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner): # pylint: disable=arguments-differ def cleanup(self, *, force=False, force_namespace=False): + # TODO(sergiitk): rename to stop(). 
try: if self.deployment or force: self._delete_deployment(self.deployment_name) @@ -256,7 +256,7 @@ class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner): self.service_account = None self._cleanup_namespace(force=(force_namespace and force)) finally: - self.time_stopped = datetime.datetime.now() + self._stop() # pylint: enable=arguments-differ diff --git a/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py b/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py index bf314fed7c..7b5587f871 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py @@ -512,8 +512,8 @@ class IsolatedXdsKubernetesTestCase(XdsKubernetesBaseTestCase, logger.exception('Got error during teardown') finally: logger.info('----- Test client/server logs -----') - self.client_runner.logs_explorer_link() - self.server_runner.logs_explorer_link() + self.client_runner.logs_explorer_run_history_links() + self.server_runner.logs_explorer_run_history_links() # Fail if any of the pods restarted. self.assertEqual( diff --git a/tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_testcase.py b/tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_testcase.py index c61f79fb79..c0d575fd22 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_testcase.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_testcase.py @@ -395,7 +395,7 @@ class XdsUrlMapTestCase(absltest.TestCase, metaclass=_MetaXdsUrlMapTestCase): finally: if hasattr(cls, 'test_client_runner') and cls.test_client_runner: logging.info('----- Test client logs -----') - cls.test_client_runner.logs_explorer_link() + cls.test_client_runner.logs_explorer_run_history_links() # Fail if any of the pods restarted. 
error_msg = ( diff --git a/tools/run_tests/xds_k8s_test_driver/tests/subsetting_test.py b/tools/run_tests/xds_k8s_test_driver/tests/subsetting_test.py index abe537323d..51379175b5 100644 --- a/tools/run_tests/xds_k8s_test_driver/tests/subsetting_test.py +++ b/tools/run_tests/xds_k8s_test_driver/tests/subsetting_test.py @@ -70,8 +70,11 @@ class SubsettingTest(xds_k8s_testcase.RegularXdsKubernetesTestCase): rpc_distribution = collections.defaultdict(int) with self.subTest('07_start_test_client'): for i in range(_NUM_CLIENTS): - # Clean created client pods if there is any - self.client_runner.cleanup(force=True) + # Clean created client pods if there is any. + if self.client_runner.time_start_requested: + # TODO(sergiitk): Speed up by reusing the namespace. + self.client_runner.cleanup() + # Create a test client test_client: _XdsTestClient = self.startTestClient( test_servers[0]) |