aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Sergii Tkachenko <sergiitk@google.com> 2023-05-30 22:58:49 -0400
committer: GitHub <noreply@github.com> 2023-05-30 19:58:49 -0700
commit: 531a6be335a7fcee4ad35d0b7b79d85106b89331 (patch)
tree: 5cc6b043ddba006ca4bb569c8ecb8a1a82d71780
parent: 3225a9f117317661120ddeb6118ba77263068470 (diff)
download: grpc-grpc-531a6be335a7fcee4ad35d0b7b79d85106b89331.tar.gz
[PSM Interop] Fix an issue with restarting k8s runner (#33280)
Fixes the issue introduced in https://github.com/grpc/grpc/pull/33104, where stopping the current run didn't reset `self.time_start_requested`, `self.time_start_completed`, and `self.time_stopped`. Because of this, the subsetting test (the only one [redeploying the client app](https://github.com/grpc/grpc/blob/10001d16a9d4f00c4c7962cb1c310c80d4b9c992/tools/run_tests/xds_k8s_test_driver/tests/subsetting_test.py#L73C1-L74)) started failing with:

```py
Traceback (most recent call last):
  File "xds_k8s_test_driver/tests/subsetting_test.py", line 76, in test_subsetting_basic
    test_client: _XdsTestClient = self.startTestClient(
  File "xds_k8s_test_driver/framework/xds_k8s_testcase.py", line 615, in startTestClient
    test_client = self.client_runner.run(server_target=test_server.xds_uri,
  File "xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_client_runner.py", line 110, in run
    super().run()
  File "xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_base_runner.py", line 112, in run
    raise RuntimeError(
RuntimeError: Deployment psm-grpc-client: has already been started at 2023-05-27T13:47:15.262461
```

This PR:

1. Instead of relying on `time_start_requested` and `time_stopped` to produce GCP links, tracks the run history of each deployment. This fixes the issue described above, and adds support for listing all past runs executed by a k8s runner.
2. Minor: removes the unnecessary call to `self.client_runner.cleanup(force=True)` when there are no past deployment runs (e.g. at the first iteration of `for i in range(_NUM_CLIENTS):`).
-rw-r--r--tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_base_runner.py98
-rw-r--r--tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_client_runner.py6
-rw-r--r--tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_server_runner.py6
-rw-r--r--tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py4
-rw-r--r--tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_testcase.py2
-rw-r--r--tools/run_tests/xds_k8s_test_driver/tests/subsetting_test.py7
6 files changed, 98 insertions, 25 deletions
diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_base_runner.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_base_runner.py
index 896894d025..f877656650 100644
--- a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_base_runner.py
+++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_base_runner.py
@@ -16,6 +16,7 @@ Common functionality for running xDS Test Client and Server on Kubernetes.
"""
from abc import ABCMeta
import contextlib
+import dataclasses
import datetime
import logging
import pathlib
@@ -42,6 +43,14 @@ _datetime = datetime.datetime
_timedelta = datetime.timedelta
+@dataclasses.dataclass(frozen=True)
+class RunHistory:
+ deployment_id: str
+ time_start_requested: _datetime
+ time_start_completed: Optional[_datetime]
+ time_stopped: _datetime
+
+
class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta):
# Pylint wants abstract classes to override abstract methods.
# pylint: disable=abstract-method
@@ -64,12 +73,16 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta):
namespace_template: str = 'namespace.yaml'
reuse_namespace: bool = False
- # Mutable state.
+ # Mutable state. Describes the current run.
+ namespace: Optional[k8s.V1Namespace] = None
deployment: Optional[k8s.V1Deployment] = None
+ deployment_id: Optional[str] = None
service_account: Optional[k8s.V1ServiceAccount] = None
time_start_requested: Optional[_datetime] = None
time_start_completed: Optional[_datetime] = None
time_stopped: Optional[_datetime] = None
+ # The history of all runs performed by this runner.
+ run_history: List[RunHistory]
def __init__(self,
k8s_namespace: k8s.KubernetesNamespace,
@@ -98,7 +111,7 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta):
self.reuse_namespace = reuse_namespace
# Mutable state
- self.namespace: Optional[k8s.V1Namespace] = None
+ self.run_history = []
self.pod_port_forwarders = []
self.pod_log_collectors = []
@@ -107,7 +120,7 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta):
def run(self, **kwargs):
del kwargs
- if self.time_start_requested:
+ if not self.time_stopped and self.time_start_requested:
if self.time_start_completed:
raise RuntimeError(
f"Deployment {self.deployment_name}: has already been"
@@ -117,15 +130,52 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta):
f"Deployment {self.deployment_name}: start has already been"
f" requested at {self.time_start_requested.isoformat()}")
+ self._reset_state()
self.time_start_requested = _datetime.now()
- self.logs_explorer_link()
+ self.logs_explorer_link()
if self.reuse_namespace:
self.namespace = self._reuse_namespace()
if not self.namespace:
self.namespace = self._create_namespace(
self.namespace_template, namespace_name=self.k8s_namespace.name)
+ def _start_completed(self):
+ self.time_start_completed = _datetime.now()
+
+ def _stop(self):
+ self.time_stopped = _datetime.now()
+ if self.time_start_requested and self.deployment_id:
+ run_history = RunHistory(
+ deployment_id=self.deployment_id,
+ time_start_requested=self.time_start_requested,
+ time_start_completed=self.time_start_completed,
+ time_stopped=self.time_stopped,
+ )
+ self.run_history.append(run_history)
+
+ def _reset_state(self):
+ """Reset the mutable state of the previous run."""
+ if self.pod_port_forwarders:
+ logger.warning(
+ "Port forwarders weren't cleaned up from the past run: %s",
+ len(self.pod_port_forwarders))
+
+ if self.pod_log_collectors:
+ logger.warning(
+ "Pod log collectors weren't cleaned up from the past run: %s",
+ len(self.pod_log_collectors))
+
+ self.namespace = None
+ self.deployment = None
+ self.deployment_id = None
+ self.service_account = None
+ self.time_start_requested = None
+ self.time_start_completed = None
+ self.time_stopped = None
+ self.pod_port_forwarders = []
+ self.pod_log_collectors = []
+
def _cleanup_namespace(self, *, force=False):
if (self.namespace and not self.reuse_namespace) or force:
self.delete_namespace()
@@ -312,7 +362,10 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta):
# \" or n, but found 9, error found in #10 byte of ...|ent_id'.
# Prepending deployment name forces deployment_id into a string,
# as well as it's just a better description.
- kwargs['deployment_id'] = f'{kwargs["deployment_name"]}-{rand_id}'
+ self.deployment_id = f'{kwargs["deployment_name"]}-{rand_id}'
+ kwargs['deployment_id'] = self.deployment_id
+ else:
+ self.deployment_id = kwargs['deployment_id']
deployment = self._create_from_template(template, **kwargs)
if not isinstance(deployment, k8s.V1Deployment):
@@ -361,6 +414,7 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta):
if wait_for_deletion:
self.k8s_namespace.wait_for_service_deleted(name)
+
logger.debug('Service %s deleted', name)
def _delete_service_account(self, name, wait_for_deletion=True):
@@ -460,17 +514,29 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta):
neg_zones)
def logs_explorer_link(self):
- if not self.time_start_requested:
- logger.warning(
- 'Skipped printing GCP log link for a non-started deployment %s',
- self.deployment_name)
- return
+ """Prints GCP Logs Explorer link to all runs of the deployment."""
self._logs_explorer_link(deployment_name=self.deployment_name,
namespace_name=self.k8s_namespace.name,
gcp_project=self.gcp_project,
- gcp_ui_url=self.gcp_ui_url,
- start_time=self.time_start_requested,
- end_time=self.time_stopped)
+ gcp_ui_url=self.gcp_ui_url)
+
+ def logs_explorer_run_history_links(self):
+ """Prints a separate GCP Logs Explorer link for each run *completed* by
+ the runner.
+
+ This excludes the current run, if it hasn't been completed.
+ """
+ if not self.run_history:
+ logger.info('No completed deployments of %s', self.deployment_name)
+ return
+ for run in self.run_history:
+ self._logs_explorer_link(deployment_name=self.deployment_name,
+ namespace_name=self.k8s_namespace.name,
+ gcp_project=self.gcp_project,
+ gcp_ui_url=self.gcp_ui_url,
+ deployment_id=run.deployment_id,
+ start_time=run.time_start_requested,
+ end_time=run.time_stopped)
@classmethod
def _logs_explorer_link(cls,
@@ -479,6 +545,7 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta):
namespace_name: str,
gcp_project: str,
gcp_ui_url: str,
+ deployment_id: Optional[str] = None,
start_time: Optional[_datetime] = None,
end_time: Optional[_datetime] = None):
"""Output the link to test server/client logs in GCP Logs Explorer."""
@@ -496,13 +563,16 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta):
'resource.labels.container_name': deployment_name,
'resource.labels.namespace_name': namespace_name,
}
+ if deployment_id:
+ query['labels."k8s-pod/deployment_id"'] = deployment_id
link = cls._logs_explorer_link_from_params(gcp_ui_url=gcp_ui_url,
gcp_project=gcp_project,
query=query,
request=request)
+ link_to = deployment_id if deployment_id else deployment_name
# A whitespace at the end to indicate the end of the url.
- logger.info("GCP Logs Explorer link to %s:\n%s ", deployment_name, link)
+ logger.info("GCP Logs Explorer link to %s:\n%s ", link_to, link)
@classmethod
def _make_namespace_name(cls, resource_prefix: str, resource_suffix: str,
diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_client_runner.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_client_runner.py
index 5b8cf73ffc..cc97125d1d 100644
--- a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_client_runner.py
+++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_client_runner.py
@@ -14,7 +14,6 @@
"""
Run xDS Test Client on Kubernetes.
"""
-import datetime
import logging
from typing import Optional
@@ -151,7 +150,7 @@ class KubernetesClientRunner(k8s_base_runner.KubernetesBaseRunner):
# Verify the deployment reports all pods started as well.
self._wait_deployment_with_available_replicas(self.deployment_name)
- self.time_start_completed = datetime.datetime.now()
+ self._start_completed()
return self._xds_test_client_for_pod(pod, server_target=server_target)
@@ -171,6 +170,7 @@ class KubernetesClientRunner(k8s_base_runner.KubernetesBaseRunner):
# pylint: disable=arguments-differ
def cleanup(self, *, force=False, force_namespace=False):
+ # TODO(sergiitk): rename to stop().
try:
if self.deployment or force:
self._delete_deployment(self.deployment_name)
@@ -185,7 +185,7 @@ class KubernetesClientRunner(k8s_base_runner.KubernetesBaseRunner):
self.service_account = None
self._cleanup_namespace(force=force_namespace and force)
finally:
- self.time_stopped = datetime.datetime.now()
+ self._stop()
# pylint: enable=arguments-differ
diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_server_runner.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_server_runner.py
index 20d3186c9c..b6de2f85e0 100644
--- a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_server_runner.py
+++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_server_runner.py
@@ -14,7 +14,6 @@
"""
Run xDS Test Client on Kubernetes.
"""
-import datetime
import logging
from typing import List, Optional
@@ -197,7 +196,7 @@ class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner):
# Verify the deployment reports all pods started as well.
self._wait_deployment_with_available_replicas(self.deployment_name,
replica_count)
- self.time_start_completed = datetime.datetime.now()
+ self._start_completed()
servers: List[XdsTestServer] = []
for pod in pods:
@@ -239,6 +238,7 @@ class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner):
# pylint: disable=arguments-differ
def cleanup(self, *, force=False, force_namespace=False):
+ # TODO(sergiitk): rename to stop().
try:
if self.deployment or force:
self._delete_deployment(self.deployment_name)
@@ -256,7 +256,7 @@ class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner):
self.service_account = None
self._cleanup_namespace(force=(force_namespace and force))
finally:
- self.time_stopped = datetime.datetime.now()
+ self._stop()
# pylint: enable=arguments-differ
diff --git a/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py b/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py
index bf314fed7c..7b5587f871 100644
--- a/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py
+++ b/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py
@@ -512,8 +512,8 @@ class IsolatedXdsKubernetesTestCase(XdsKubernetesBaseTestCase,
logger.exception('Got error during teardown')
finally:
logger.info('----- Test client/server logs -----')
- self.client_runner.logs_explorer_link()
- self.server_runner.logs_explorer_link()
+ self.client_runner.logs_explorer_run_history_links()
+ self.server_runner.logs_explorer_run_history_links()
# Fail if any of the pods restarted.
self.assertEqual(
diff --git a/tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_testcase.py b/tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_testcase.py
index c61f79fb79..c0d575fd22 100644
--- a/tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_testcase.py
+++ b/tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_testcase.py
@@ -395,7 +395,7 @@ class XdsUrlMapTestCase(absltest.TestCase, metaclass=_MetaXdsUrlMapTestCase):
finally:
if hasattr(cls, 'test_client_runner') and cls.test_client_runner:
logging.info('----- Test client logs -----')
- cls.test_client_runner.logs_explorer_link()
+ cls.test_client_runner.logs_explorer_run_history_links()
# Fail if any of the pods restarted.
error_msg = (
diff --git a/tools/run_tests/xds_k8s_test_driver/tests/subsetting_test.py b/tools/run_tests/xds_k8s_test_driver/tests/subsetting_test.py
index abe537323d..51379175b5 100644
--- a/tools/run_tests/xds_k8s_test_driver/tests/subsetting_test.py
+++ b/tools/run_tests/xds_k8s_test_driver/tests/subsetting_test.py
@@ -70,8 +70,11 @@ class SubsettingTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
rpc_distribution = collections.defaultdict(int)
with self.subTest('07_start_test_client'):
for i in range(_NUM_CLIENTS):
- # Clean created client pods if there is any
- self.client_runner.cleanup(force=True)
+ # Clean created client pods if there is any.
+ if self.client_runner.time_start_requested:
+ # TODO(sergiitk): Speed up by reusing the namespace.
+ self.client_runner.cleanup()
+
# Create a test client
test_client: _XdsTestClient = self.startTestClient(
test_servers[0])