path: root/grpc/tools/run_tests/run_xds_tests.py
Diffstat (limited to 'grpc/tools/run_tests/run_xds_tests.py')
-rwxr-xr-x  grpc/tools/run_tests/run_xds_tests.py | 1164
1 file changed, 1105 insertions(+), 59 deletions(-)
diff --git a/grpc/tools/run_tests/run_xds_tests.py b/grpc/tools/run_tests/run_xds_tests.py
index 6b397d49..7743df1b 100755
--- a/grpc/tools/run_tests/run_xds_tests.py
+++ b/grpc/tools/run_tests/run_xds_tests.py
@@ -15,6 +15,7 @@
"""Run xDS integration tests on GCP using Traffic Director."""
import argparse
+import datetime
import googleapiclient.discovery
import grpc
import json
@@ -27,8 +28,10 @@ import subprocess
import sys
import tempfile
import time
+import uuid
from oauth2client.client import GoogleCredentials
+from google.protobuf import json_format
import python_utils.jobset as jobset
import python_utils.report_utils as report_utils
@@ -39,6 +42,20 @@ from src.proto.grpc.testing import empty_pb2
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import test_pb2_grpc
+# Envoy protos provided by PyPI package xds-protos
+# The generated Python files must be imported to load their descriptors
+try:
+ from envoy.service.status.v3 import csds_pb2
+ from envoy.service.status.v3 import csds_pb2_grpc
+ from envoy.extensions.filters.network.http_connection_manager.v3 import http_connection_manager_pb2
+ from envoy.extensions.filters.common.fault.v3 import fault_pb2
+ from envoy.extensions.filters.http.fault.v3 import fault_pb2
+ from envoy.extensions.filters.http.router.v3 import router_pb2
+except ImportError:
+    # These protos are required by the CSDS test. We should not fail the
+    # entire script for one test case.
+ pass
+
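A quick aside on why loading these descriptors matters (a minimal sketch, assuming the xds-protos package is installed): json_format can only expand a google.protobuf.Any field whose type is registered in the default descriptor pool, and importing the generated *_pb2 modules above is what registers those types.

    from google.protobuf import any_pb2, json_format
    from envoy.service.status.v3 import csds_pb2  # registers the CSDS descriptors

    packed = any_pb2.Any()
    packed.Pack(csds_pb2.ClientStatusRequest())
    # Resolving the type URL below only works because csds_pb2 was imported;
    # without that import, MessageToJson raises TypeError for the unknown type URL.
    print(json_format.MessageToJson(packed))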
logger = logging.getLogger()
console_handler = logging.StreamHandler()
formatter = logging.Formatter(fmt='%(asctime)s: %(levelname)-8s %(message)s')
@@ -47,27 +64,49 @@ logger.handlers = []
logger.addHandler(console_handler)
logger.setLevel(logging.WARNING)
+# Suppress excessive logs for gRPC Python
+original_grpc_trace = os.environ.pop('GRPC_TRACE', None)
+original_grpc_verbosity = os.environ.pop('GRPC_VERBOSITY', None)
+# Suppress non-essential logs for GCP clients
+logging.getLogger('google_auth_httplib2').setLevel(logging.WARNING)
+logging.getLogger('googleapiclient.discovery').setLevel(logging.WARNING)
+
_TEST_CASES = [
'backends_restart',
'change_backend_service',
'gentle_failover',
+ 'load_report_based_failover',
'ping_pong',
'remove_instance_group',
'round_robin',
'secondary_locality_gets_no_requests_on_partial_primary_failure',
'secondary_locality_gets_requests_on_primary_failure',
'traffic_splitting',
+ 'path_matching',
+ 'header_matching',
+ 'forwarding_rule_port_match',
+ 'forwarding_rule_default_port',
+ 'metadata_filter',
]
+
# Valid test cases, but not yet supported in all languages. So the tests can
# only be run manually, and aren't enabled automatically for all languages.
#
# TODO: Move them into _TEST_CASES when support is ready in all languages.
_ADDITIONAL_TEST_CASES = [
- 'path_matching',
- 'header_matching',
'circuit_breaking',
+ 'timeout',
+ 'fault_injection',
+ 'csds',
+ 'api_listener', # TODO(b/187352987) Relieve quota pressure
]
+# Test cases that require the V3 API. Skipped unless --xds_v3_support is set.
+_V3_TEST_CASES = frozenset(['timeout', 'fault_injection', 'csds'])
+
+# Test cases that require the alpha API. Skipped for stable API runs.
+_ALPHA_TEST_CASES = frozenset(['timeout'])
+
def parse_test_cases(arg):
if arg == '':
@@ -96,7 +135,11 @@ def parse_port_range(port_arg):
argp = argparse.ArgumentParser(description='Run xDS interop tests on GCP')
-argp.add_argument('--project_id', help='GCP project id')
+# TODO(zdapeng): remove default value of project_id and project_num
+argp.add_argument('--project_id', default='grpc-testing', help='GCP project id')
+argp.add_argument('--project_num',
+ default='830293263384',
+ help='GCP project number')
argp.add_argument(
'--gcp_suffix',
default='',
@@ -175,8 +218,9 @@ argp.add_argument(
argp.add_argument('--network',
default='global/networks/default',
help='GCP network to use')
+_DEFAULT_PORT_RANGE = '8080:8280'
argp.add_argument('--service_port_range',
- default='8080:8110',
+ default=_DEFAULT_PORT_RANGE,
type=parse_port_range,
help='Listening port for created gRPC backends. Specified as '
'either a single int or as a range in the format min:max, in '
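parse_port_range itself is not shown in this hunk; a plausible sketch that is consistent with the help text above and with how the result is iterated elsewhere in the script (a list of candidate ports, inclusive of both ends of min:max) could look like the following. The name and details are illustrative, not the file's actual implementation.

    def parse_port_range_sketch(port_arg):
        # Illustrative only; the real parse_port_range may differ in validation.
        try:
            port = int(port_arg)  # single port, e.g. '80'
            return [port]
        except ValueError:
            port_min, port_max = port_arg.split(':')  # range, e.g. '8080:8280'
            return list(range(int(port_min), int(port_max) + 1))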
@@ -235,14 +279,23 @@ CLIENT_HOSTS = []
if args.client_hosts:
CLIENT_HOSTS = args.client_hosts.split(',')
+# Each config propagation in the control plane should finish within 600s.
+# Otherwise, it indicates a bug in the control plane. Config propagation
+# covers all kinds of traffic config updates, such as updating the urlMap,
+# creating the resources for the first time, updating a BackendService, and
+# changing the status of endpoints in a BackendService.
+_WAIT_FOR_URL_MAP_PATCH_SEC = 600
+# In general, fetching load balancing stats only takes ~10s. However, slow
+# config update could lead to empty EDS or similar symptoms causing the
+# connection to hang for a long period of time. So, we want to extend the stats
+# wait time to be the same as urlMap patch time.
+_WAIT_FOR_STATS_SEC = _WAIT_FOR_URL_MAP_PATCH_SEC
+
_DEFAULT_SERVICE_PORT = 80
_WAIT_FOR_BACKEND_SEC = args.wait_for_backend_sec
_WAIT_FOR_OPERATION_SEC = 1200
_INSTANCE_GROUP_SIZE = args.instance_group_size
_NUM_TEST_RPCS = 10 * args.qps
-_WAIT_FOR_STATS_SEC = 360
-_WAIT_FOR_VALID_CONFIG_SEC = 60
-_WAIT_FOR_URL_MAP_PATCH_SEC = 300
_CONNECTION_TIMEOUT_SEC = 60
_GCP_API_RETRIES = 5
_BOOTSTRAP_TEMPLATE = """
@@ -250,7 +303,8 @@ _BOOTSTRAP_TEMPLATE = """
"node": {{
"id": "{node_id}",
"metadata": {{
- "TRAFFICDIRECTOR_NETWORK_NAME": "%s"
+ "TRAFFICDIRECTOR_NETWORK_NAME": "%s",
+ "com.googleapis.trafficdirector.config_time_trace": "TRUE"
}},
"locality": {{
"zone": "%s"
@@ -279,6 +333,9 @@ _TESTS_TO_SEND_METADATA = ['header_matching']
_TEST_METADATA_KEY = 'xds_md'
_TEST_METADATA_VALUE_UNARY = 'unary_yranu'
_TEST_METADATA_VALUE_EMPTY = 'empty_ytpme'
+# Extra RPC metadata whose value is a number, sent with UnaryCall only.
+_TEST_METADATA_NUMERIC_KEY = 'xds_md_numeric'
+_TEST_METADATA_NUMERIC_VALUE = '159'
_PATH_MATCHER_NAME = 'path-matcher'
_BASE_TEMPLATE_NAME = 'test-template'
_BASE_INSTANCE_GROUP_NAME = 'test-ig'
@@ -313,7 +370,8 @@ def get_client_stats(num_rpcs, timeout_sec):
response = stub.GetClientStats(request,
wait_for_ready=True,
timeout=rpc_timeout)
- logger.debug('Invoked GetClientStats RPC to %s: %s', host, response)
+ logger.debug('Invoked GetClientStats RPC to %s: %s', host,
+ json_format.MessageToJson(response))
return response
@@ -336,7 +394,33 @@ def get_client_accumulated_stats():
return response
-def configure_client(rpc_types, metadata):
+def get_client_xds_config_dump():
+ if CLIENT_HOSTS:
+ hosts = CLIENT_HOSTS
+ else:
+ hosts = ['localhost']
+ for host in hosts:
+ server_address = '%s:%d' % (host, args.stats_port)
+ with grpc.insecure_channel(server_address) as channel:
+ stub = csds_pb2_grpc.ClientStatusDiscoveryServiceStub(channel)
+ logger.debug('Fetching xDS config dump from %s', server_address)
+ response = stub.FetchClientStatus(csds_pb2.ClientStatusRequest(),
+ wait_for_ready=True,
+ timeout=_CONNECTION_TIMEOUT_SEC)
+ logger.debug('Fetched xDS config dump from %s', server_address)
+ if len(response.config) != 1:
+ logger.error('Unexpected number of ClientConfigs %d: %s',
+ len(response.config), response)
+ return None
+ else:
+                # Convert the ClientStatusResponse into JSON, because many
+                # fields are packed in google.protobuf.Any. Unpacking the
+                # proto messages and inspecting the values directly would
+                # require a lot of duplicated code.
+ return json_format.MessageToDict(
+ response.config[0], preserving_proto_field_name=True)
+
+
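One detail worth spelling out: preserving_proto_field_name=True keeps the original snake_case field names, which is what test_csds below relies on when indexing the dict (e.g. 'listener_config' rather than 'listenerConfig'). A small illustration using a message this script already imports:

    from google.protobuf import json_format
    from src.proto.grpc.testing import messages_pb2

    req = messages_pb2.SimpleRequest(response_size=1)
    print(json_format.MessageToDict(req))
    # {'responseSize': 1}
    print(json_format.MessageToDict(req, preserving_proto_field_name=True))
    # {'response_size': 1}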
+def configure_client(rpc_types, metadata=[], timeout_sec=None):
if CLIENT_HOSTS:
hosts = CLIENT_HOSTS
else:
@@ -352,6 +436,8 @@ def configure_client(rpc_types, metadata):
md.type = rpc_type
md.key = md_key
md.value = md_value
+ if timeout_sec:
+ request.timeout_sec = timeout_sec
logger.debug(
'Invoking XdsUpdateClientConfigureService RPC to %s:%d: %s',
host, args.stats_port, request)
@@ -407,6 +493,21 @@ def wait_until_all_rpcs_go_to_given_backends(backends,
allow_failures=False)
+def wait_until_no_rpcs_go_to_given_backends(backends, timeout_sec):
+ start_time = time.time()
+ while time.time() - start_time <= timeout_sec:
+ stats = get_client_stats(_NUM_TEST_RPCS, timeout_sec)
+ error_msg = None
+ rpcs_by_peer = stats.rpcs_by_peer
+ for backend in backends:
+ if backend in rpcs_by_peer:
+ error_msg = 'Unexpected backend %s receives load' % backend
+ break
+ if not error_msg:
+ return
+ raise Exception('Unexpected RPCs going to given backends')
+
+
def wait_until_rpcs_in_flight(rpc_type, timeout_sec, num_rpcs, threshold):
'''Block until the test client reaches the state with the given number
of RPCs being outstanding stably.
@@ -619,6 +720,56 @@ def test_gentle_failover(gcp,
_WAIT_FOR_BACKEND_SEC)
+def test_load_report_based_failover(gcp, backend_service,
+ primary_instance_group,
+ secondary_instance_group):
+ logger.info('Running test_load_report_based_failover')
+ try:
+ patch_backend_service(
+ gcp, backend_service,
+ [primary_instance_group, secondary_instance_group])
+ primary_instance_names = get_instance_names(gcp, primary_instance_group)
+ secondary_instance_names = get_instance_names(gcp,
+ secondary_instance_group)
+ wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
+ wait_for_healthy_backends(gcp, backend_service,
+ secondary_instance_group)
+ wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
+ _WAIT_FOR_STATS_SEC)
+ # Set primary locality's balance mode to RATE, and RPS to 20% of the
+ # client's QPS. The secondary locality will be used.
+ max_rate = int(args.qps * 1 / 5)
+ logger.info('Patching backend service to RATE with %d max_rate',
+ max_rate)
+ patch_backend_service(
+ gcp,
+ backend_service, [primary_instance_group, secondary_instance_group],
+ balancing_mode='RATE',
+ max_rate=max_rate)
+ wait_until_all_rpcs_go_to_given_backends(
+ primary_instance_names + secondary_instance_names,
+ _WAIT_FOR_BACKEND_SEC)
+
+ # Set primary locality's balance mode to RATE, and RPS to 120% of the
+ # client's QPS. Only the primary locality will be used.
+ max_rate = int(args.qps * 6 / 5)
+ logger.info('Patching backend service to RATE with %d max_rate',
+ max_rate)
+ patch_backend_service(
+ gcp,
+ backend_service, [primary_instance_group, secondary_instance_group],
+ balancing_mode='RATE',
+ max_rate=max_rate)
+ wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
+ _WAIT_FOR_BACKEND_SEC)
+ logger.info("success")
+ finally:
+ patch_backend_service(gcp, backend_service, [primary_instance_group])
+ instance_names = get_instance_names(gcp, primary_instance_group)
+ wait_until_all_rpcs_go_to_given_backends(instance_names,
+ _WAIT_FOR_BACKEND_SEC)
+
+
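To make the RATE thresholds in the test above concrete, a quick worked example (using --qps=100 purely as an illustrative value):

    qps = 100                         # illustrative client QPS
    low_max_rate = int(qps * 1 / 5)   # 20 RPS: below the client's rate, so traffic
                                      # spills over into the secondary locality
    high_max_rate = int(qps * 6 / 5)  # 120 RPS: above the client's rate, so the
                                      # primary locality serves everything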
def test_ping_pong(gcp, backend_service, instance_group):
logger.info('Running test_ping_pong')
wait_for_healthy_backends(gcp, backend_service, instance_group)
@@ -829,6 +980,303 @@ def prepare_services_for_urlmap_tests(gcp, original_backend_service,
return original_backend_instances, alternate_backend_instances
+def test_metadata_filter(gcp, original_backend_service, instance_group,
+ alternate_backend_service, same_zone_instance_group):
+ logger.info("Running test_metadata_filter")
+ wait_for_healthy_backends(gcp, original_backend_service, instance_group)
+ original_backend_instances = get_instance_names(gcp, instance_group)
+ alternate_backend_instances = get_instance_names(gcp,
+ same_zone_instance_group)
+ patch_backend_service(gcp, alternate_backend_service,
+ [same_zone_instance_group])
+ wait_for_healthy_backends(gcp, alternate_backend_service,
+ same_zone_instance_group)
+ try:
+ with open(bootstrap_path) as f:
+ md = json.load(f)['node']['metadata']
+ match_labels = []
+ for k, v in md.items():
+ match_labels.append({'name': k, 'value': v})
+
+ not_match_labels = [{'name': 'fake', 'value': 'fail'}]
+ test_route_rules = [
+ # test MATCH_ALL
+ [
+ {
+ 'priority': 0,
+ 'matchRules': [{
+ 'prefixMatch':
+ '/',
+ 'metadataFilters': [{
+ 'filterMatchCriteria': 'MATCH_ALL',
+ 'filterLabels': not_match_labels
+ }]
+ }],
+ 'service': original_backend_service.url
+ },
+ {
+ 'priority': 1,
+ 'matchRules': [{
+ 'prefixMatch':
+ '/',
+ 'metadataFilters': [{
+ 'filterMatchCriteria': 'MATCH_ALL',
+ 'filterLabels': match_labels
+ }]
+ }],
+ 'service': alternate_backend_service.url
+ },
+ ],
+ # test mixing MATCH_ALL and MATCH_ANY
+            # test MATCH_ALL: a superset of labels won't match
+ [
+ {
+ 'priority': 0,
+ 'matchRules': [{
+ 'prefixMatch':
+ '/',
+ 'metadataFilters': [{
+ 'filterMatchCriteria': 'MATCH_ALL',
+ 'filterLabels': not_match_labels + match_labels
+ }]
+ }],
+ 'service': original_backend_service.url
+ },
+ {
+ 'priority': 1,
+ 'matchRules': [{
+ 'prefixMatch':
+ '/',
+ 'metadataFilters': [{
+ 'filterMatchCriteria': 'MATCH_ANY',
+ 'filterLabels': not_match_labels + match_labels
+ }]
+ }],
+ 'service': alternate_backend_service.url
+ },
+ ],
+ # test MATCH_ANY
+ [
+ {
+ 'priority': 0,
+ 'matchRules': [{
+ 'prefixMatch':
+ '/',
+ 'metadataFilters': [{
+ 'filterMatchCriteria': 'MATCH_ANY',
+ 'filterLabels': not_match_labels
+ }]
+ }],
+ 'service': original_backend_service.url
+ },
+ {
+ 'priority': 1,
+ 'matchRules': [{
+ 'prefixMatch':
+ '/',
+ 'metadataFilters': [{
+ 'filterMatchCriteria': 'MATCH_ANY',
+ 'filterLabels': not_match_labels + match_labels
+ }]
+ }],
+ 'service': alternate_backend_service.url
+ },
+ ],
+ # test match multiple route rules
+ [
+ {
+ 'priority': 0,
+ 'matchRules': [{
+ 'prefixMatch':
+ '/',
+ 'metadataFilters': [{
+ 'filterMatchCriteria': 'MATCH_ANY',
+ 'filterLabels': match_labels
+ }]
+ }],
+ 'service': alternate_backend_service.url
+ },
+ {
+ 'priority': 1,
+ 'matchRules': [{
+ 'prefixMatch':
+ '/',
+ 'metadataFilters': [{
+ 'filterMatchCriteria': 'MATCH_ALL',
+ 'filterLabels': match_labels
+ }]
+ }],
+ 'service': original_backend_service.url
+ },
+ ]
+ ]
+
+ for route_rules in test_route_rules:
+ wait_until_all_rpcs_go_to_given_backends(original_backend_instances,
+ _WAIT_FOR_STATS_SEC)
+ patch_url_map_backend_service(gcp,
+ original_backend_service,
+ route_rules=route_rules)
+ wait_until_no_rpcs_go_to_given_backends(original_backend_instances,
+ _WAIT_FOR_STATS_SEC)
+ wait_until_all_rpcs_go_to_given_backends(
+ alternate_backend_instances, _WAIT_FOR_STATS_SEC)
+ patch_url_map_backend_service(gcp, original_backend_service)
+ finally:
+ patch_backend_service(gcp, alternate_backend_service, [])
+
+
+def test_api_listener(gcp, backend_service, instance_group,
+ alternate_backend_service):
+ logger.info("Running api_listener")
+ try:
+ wait_for_healthy_backends(gcp, backend_service, instance_group)
+ backend_instances = get_instance_names(gcp, instance_group)
+ wait_until_all_rpcs_go_to_given_backends(backend_instances,
+ _WAIT_FOR_STATS_SEC)
+        # Create a second suite of url-map + target-proxy + forwarding-rule
+        # with the same host name in the host rule. Proxyless validation has
+        # to be disabled because it requires a `0.0.0.0` IP address in the
+        # forwarding rule, and this test also intentionally violates ip:port
+        # uniqueness. See https://github.com/grpc/grpc-java/issues/8009
+ new_config_suffix = '2'
+ create_url_map(gcp, url_map_name + new_config_suffix, backend_service,
+ service_host_name)
+ create_target_proxy(gcp, target_proxy_name + new_config_suffix, False)
+ if not gcp.service_port:
+ raise Exception(
+                'Failed to find a valid port for the forwarding rule')
+ potential_ip_addresses = []
+ max_attempts = 10
+ for i in range(max_attempts):
+ potential_ip_addresses.append('10.10.10.%d' %
+ (random.randint(0, 255)))
+ create_global_forwarding_rule(gcp,
+ forwarding_rule_name + new_config_suffix,
+ [gcp.service_port],
+ potential_ip_addresses)
+ if gcp.service_port != _DEFAULT_SERVICE_PORT:
+ patch_url_map_host_rule_with_port(gcp,
+ url_map_name + new_config_suffix,
+ backend_service,
+ service_host_name)
+ wait_until_all_rpcs_go_to_given_backends(backend_instances,
+ _WAIT_FOR_STATS_SEC)
+
+ delete_global_forwarding_rule(gcp, forwarding_rule_name)
+ delete_target_proxy(gcp, target_proxy_name)
+ delete_url_map(gcp, url_map_name)
+ verify_attempts = int(_WAIT_FOR_URL_MAP_PATCH_SEC / _NUM_TEST_RPCS *
+ args.qps)
+ for i in range(verify_attempts):
+ wait_until_all_rpcs_go_to_given_backends(backend_instances,
+ _WAIT_FOR_STATS_SEC)
+ # delete host rule for the original host name
+ patch_url_map_backend_service(gcp, alternate_backend_service)
+ wait_until_no_rpcs_go_to_given_backends(backend_instances,
+ _WAIT_FOR_STATS_SEC)
+
+ finally:
+ delete_global_forwarding_rule(gcp,
+ forwarding_rule_name + new_config_suffix)
+ delete_target_proxy(gcp, target_proxy_name + new_config_suffix)
+ delete_url_map(gcp, url_map_name + new_config_suffix)
+ create_url_map(gcp, url_map_name, backend_service, service_host_name)
+ create_target_proxy(gcp, target_proxy_name)
+ create_global_forwarding_rule(gcp, forwarding_rule_name,
+ potential_service_ports)
+ if gcp.service_port != _DEFAULT_SERVICE_PORT:
+ patch_url_map_host_rule_with_port(gcp, url_map_name,
+ backend_service,
+ service_host_name)
+ server_uri = service_host_name + ':' + str(gcp.service_port)
+ else:
+ server_uri = service_host_name
+ return server_uri
+
+
+def test_forwarding_rule_port_match(gcp, backend_service, instance_group):
+ logger.info("Running test_forwarding_rule_port_match")
+ try:
+ wait_for_healthy_backends(gcp, backend_service, instance_group)
+ backend_instances = get_instance_names(gcp, instance_group)
+ wait_until_all_rpcs_go_to_given_backends(backend_instances,
+ _WAIT_FOR_STATS_SEC)
+ delete_global_forwarding_rule(gcp)
+ create_global_forwarding_rule(gcp, forwarding_rule_name, [
+ x for x in parse_port_range(_DEFAULT_PORT_RANGE)
+ if x != gcp.service_port
+ ])
+ wait_until_no_rpcs_go_to_given_backends(backend_instances,
+ _WAIT_FOR_STATS_SEC)
+ finally:
+ delete_global_forwarding_rule(gcp)
+ create_global_forwarding_rule(gcp, forwarding_rule_name,
+ potential_service_ports)
+ if gcp.service_port != _DEFAULT_SERVICE_PORT:
+ patch_url_map_host_rule_with_port(gcp, url_map_name,
+ backend_service,
+ service_host_name)
+ server_uri = service_host_name + ':' + str(gcp.service_port)
+ else:
+ server_uri = service_host_name
+ return server_uri
+
+
+def test_forwarding_rule_default_port(gcp, backend_service, instance_group):
+ logger.info("Running test_forwarding_rule_default_port")
+ try:
+ wait_for_healthy_backends(gcp, backend_service, instance_group)
+ backend_instances = get_instance_names(gcp, instance_group)
+ if gcp.service_port == _DEFAULT_SERVICE_PORT:
+ wait_until_all_rpcs_go_to_given_backends(backend_instances,
+ _WAIT_FOR_STATS_SEC)
+ delete_global_forwarding_rule(gcp)
+ create_global_forwarding_rule(gcp, forwarding_rule_name,
+ parse_port_range(_DEFAULT_PORT_RANGE))
+ patch_url_map_host_rule_with_port(gcp, url_map_name,
+ backend_service,
+ service_host_name)
+ wait_until_no_rpcs_go_to_given_backends(backend_instances,
+ _WAIT_FOR_STATS_SEC)
+        # Expect success when there is no port in the client request URI and no port in the url-map.
+ delete_global_forwarding_rule(gcp)
+ delete_target_proxy(gcp)
+ delete_url_map(gcp)
+ create_url_map(gcp, url_map_name, backend_service, service_host_name)
+ create_target_proxy(gcp, gcp.target_proxy.name, False)
+ potential_ip_addresses = []
+ max_attempts = 10
+ for i in range(max_attempts):
+ potential_ip_addresses.append('10.10.10.%d' %
+ (random.randint(0, 255)))
+ create_global_forwarding_rule(gcp, forwarding_rule_name, [80],
+ potential_ip_addresses)
+ wait_until_all_rpcs_go_to_given_backends(backend_instances,
+ _WAIT_FOR_STATS_SEC)
+
+        # Expect failure when there is no port in the client request URI but a port is specified in the url-map.
+ patch_url_map_host_rule_with_port(gcp, url_map_name, backend_service,
+ service_host_name)
+ wait_until_no_rpcs_go_to_given_backends(backend_instances,
+ _WAIT_FOR_STATS_SEC)
+ finally:
+ delete_global_forwarding_rule(gcp)
+ delete_target_proxy(gcp)
+ delete_url_map(gcp)
+ create_url_map(gcp, url_map_name, backend_service, service_host_name)
+ create_target_proxy(gcp, target_proxy_name)
+ create_global_forwarding_rule(gcp, forwarding_rule_name,
+ potential_service_ports)
+ if gcp.service_port != _DEFAULT_SERVICE_PORT:
+ patch_url_map_host_rule_with_port(gcp, url_map_name,
+ backend_service,
+ service_host_name)
+ server_uri = service_host_name + ':' + str(gcp.service_port)
+ else:
+ server_uri = service_host_name
+ return server_uri
+
+
def test_traffic_splitting(gcp, original_backend_service, instance_group,
alternate_backend_service, same_zone_instance_group):
    # This test starts with all traffic going to original_backend_service. Then
@@ -974,7 +1422,36 @@ def test_path_matching(gcp, original_backend_service, instance_group,
{
"UnaryCall": original_backend_instances,
"EmptyCall": alternate_backend_instances
- })
+ }),
+ (
+ [{
+ 'priority': 0,
+ # Regex UnaryCall -> alternate_backend_service.
+ 'matchRules': [{
+ 'regexMatch':
+                    '^\/.*\/UnaryCall$'  # UnaryCall of any service.
+ }],
+ 'service': alternate_backend_service.url
+ }],
+ {
+ "UnaryCall": alternate_backend_instances,
+ "EmptyCall": original_backend_instances
+ }),
+ (
+ [{
+ 'priority': 0,
+ # ignoreCase EmptyCall -> alternate_backend_service.
+ 'matchRules': [{
+ # Case insensitive matching.
+ 'fullPathMatch': '/gRpC.tEsTinG.tEstseRvice/empTycaLl',
+ 'ignoreCase': True,
+ }],
+ 'service': alternate_backend_service.url
+ }],
+ {
+ "UnaryCall": original_backend_instances,
+ "EmptyCall": alternate_backend_instances
+ }),
]
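A quick sanity check (illustrative, not part of the test script) of what the two new match rules above do against gRPC :path values:

    import re

    # Regex rule: UnaryCall of any service goes to the alternate service.
    pattern = re.compile(r'^\/.*\/UnaryCall$')
    assert pattern.match('/grpc.testing.TestService/UnaryCall')
    assert not pattern.match('/grpc.testing.TestService/EmptyCall')

    # ignoreCase rule: the oddly-cased fullPathMatch compares case-insensitively.
    assert ('/gRpC.tEsTinG.tEstseRvice/empTycaLl'.lower() ==
            '/grpc.testing.TestService/EmptyCall'.lower())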
for (route_rules, expected_instances) in test_cases:
@@ -991,8 +1468,8 @@ def test_path_matching(gcp, original_backend_service, instance_group,
original_backend_instances + alternate_backend_instances,
_WAIT_FOR_STATS_SEC)
- retry_count = 20
- # Each attempt takes about 10 seconds, 20 retries is equivalent to 200
+ retry_count = 80
+ # Each attempt takes about 5 seconds, 80 retries is equivalent to 400
# seconds timeout.
for i in range(retry_count):
stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
@@ -1004,6 +1481,10 @@ def test_path_matching(gcp, original_backend_service, instance_group,
if compare_expected_instances(stats, expected_instances):
logger.info("success")
break
+ elif i == retry_count - 1:
+ raise Exception(
+ 'timeout waiting for RPCs to the expected instances: %s'
+ % expected_instances)
finally:
patch_url_map_backend_service(gcp, original_backend_service)
patch_backend_service(gcp, alternate_backend_service, [])
@@ -1087,23 +1568,87 @@ def test_header_matching(gcp, original_backend_service, instance_group,
(
[{
'priority': 0,
+ # Header 'xds_md_numeric' present -> alternate_backend_service.
+ # UnaryCall is sent with the metadata, so will be sent to alternative.
+ 'matchRules': [{
+ 'prefixMatch':
+ '/',
+ 'headerMatches': [{
+ 'headerName': _TEST_METADATA_NUMERIC_KEY,
+ 'presentMatch': True
+ }]
+ }],
+ 'service': alternate_backend_service.url
+ }],
+ {
+ "EmptyCall": original_backend_instances,
+ "UnaryCall": alternate_backend_instances
+ }),
+ (
+ [{
+ 'priority': 0,
# Header invert ExactMatch -> alternate_backend_service.
- # EmptyCall is sent with the metadata, so will be sent to original.
+ # UnaryCall is sent with the metadata, so will be sent to
+ # original. EmptyCall will be sent to alternative.
'matchRules': [{
'prefixMatch':
'/',
'headerMatches': [{
'headerName': _TEST_METADATA_KEY,
- 'exactMatch': _TEST_METADATA_VALUE_EMPTY,
+ 'exactMatch': _TEST_METADATA_VALUE_UNARY,
'invertMatch': True
}]
}],
'service': alternate_backend_service.url
}],
{
+ "EmptyCall": alternate_backend_instances,
+ "UnaryCall": original_backend_instances
+ }),
+ (
+ [{
+ 'priority': 0,
+ # Header 'xds_md_numeric' range [100,200] -> alternate_backend_service.
+ # UnaryCall is sent with the metadata in range.
+ 'matchRules': [{
+ 'prefixMatch':
+ '/',
+ 'headerMatches': [{
+ 'headerName': _TEST_METADATA_NUMERIC_KEY,
+ 'rangeMatch': {
+ 'rangeStart': '100',
+ 'rangeEnd': '200'
+ }
+ }]
+ }],
+ 'service': alternate_backend_service.url
+ }],
+ {
"EmptyCall": original_backend_instances,
"UnaryCall": alternate_backend_instances
}),
+ (
+ [{
+ 'priority': 0,
+ # Header RegexMatch -> alternate_backend_service.
+ # EmptyCall is sent with the metadata.
+ 'matchRules': [{
+ 'prefixMatch':
+ '/',
+ 'headerMatches': [{
+ 'headerName':
+ _TEST_METADATA_KEY,
+ 'regexMatch':
+ "^%s.*%s$" % (_TEST_METADATA_VALUE_EMPTY[:2],
+ _TEST_METADATA_VALUE_EMPTY[-2:])
+ }]
+ }],
+ 'service': alternate_backend_service.url
+ }],
+ {
+ "EmptyCall": alternate_backend_instances,
+ "UnaryCall": original_backend_instances
+ }),
]
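Similarly, the header regexMatch added above expands, with _TEST_METADATA_VALUE_EMPTY = 'empty_ytpme', to '^em.*me$'. A quick check (illustrative) that it matches the EmptyCall metadata value but not the UnaryCall one:

    import re

    value_empty = 'empty_ytpme'
    value_unary = 'unary_yranu'
    pattern = '^%s.*%s$' % (value_empty[:2], value_empty[-2:])
    assert pattern == '^em.*me$'
    assert re.match(pattern, value_empty)
    assert not re.match(pattern, value_unary)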
for (route_rules, expected_instances) in test_cases:
@@ -1121,8 +1666,8 @@ def test_header_matching(gcp, original_backend_service, instance_group,
original_backend_instances + alternate_backend_instances,
_WAIT_FOR_STATS_SEC)
- retry_count = 20
- # Each attempt takes about 10 seconds, 10 retries is equivalent to 100
+ retry_count = 80
+ # Each attempt takes about 5 seconds, 80 retries is equivalent to 400
# seconds timeout.
for i in range(retry_count):
stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
@@ -1134,6 +1679,10 @@ def test_header_matching(gcp, original_backend_service, instance_group,
if compare_expected_instances(stats, expected_instances):
logger.info("success")
break
+ elif i == retry_count - 1:
+ raise Exception(
+ 'timeout waiting for RPCs to the expected instances: %s'
+ % expected_instances)
finally:
patch_url_map_backend_service(gcp, original_backend_service)
patch_backend_service(gcp, alternate_backend_service, [])
@@ -1235,7 +1784,7 @@ def test_circuit_breaking(gcp, original_backend_service, instance_group,
configure_client([
messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL
- ], [])
+ ])
logger.info('Patching url map with %s', route_rules)
patch_url_map_backend_service(gcp,
extra_backend_service,
@@ -1285,7 +1834,7 @@ def test_circuit_breaking(gcp, original_backend_service, instance_group,
# Avoid new RPCs being outstanding (some test clients create threads
# for sending RPCs) after restoring backend services.
configure_client(
- [messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL], [])
+ [messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL])
finally:
patch_url_map_backend_service(gcp, original_backend_service)
patch_backend_service(gcp, original_backend_service, [instance_group])
@@ -1294,6 +1843,426 @@ def test_circuit_breaking(gcp, original_backend_service, instance_group,
set_validate_for_proxyless(gcp, True)
+def test_timeout(gcp, original_backend_service, instance_group):
+ logger.info('Running test_timeout')
+
+ logger.info('waiting for original backends to become healthy')
+ wait_for_healthy_backends(gcp, original_backend_service, instance_group)
+
+ # UnaryCall -> maxStreamDuration:3s
+ route_rules = [{
+ 'priority': 0,
+ 'matchRules': [{
+ 'fullPathMatch': '/grpc.testing.TestService/UnaryCall'
+ }],
+ 'service': original_backend_service.url,
+ 'routeAction': {
+ 'maxStreamDuration': {
+ 'seconds': 3,
+ },
+ },
+ }]
+ patch_url_map_backend_service(gcp,
+ original_backend_service,
+ route_rules=route_rules)
+ # A list of tuples (testcase_name, {client_config}, {expected_results})
+ test_cases = [
+ (
+ 'timeout_exceeded (UNARY_CALL), timeout_different_route (EMPTY_CALL)',
+ # UnaryCall and EmptyCall both sleep-4.
+            # UnaryCall times out, EmptyCall succeeds.
+ {
+ 'rpc_types': [
+ messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
+ messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL,
+ ],
+ 'metadata': [
+ (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
+ 'rpc-behavior', 'sleep-4'),
+ (messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL,
+ 'rpc-behavior', 'sleep-4'),
+ ],
+ },
+ {
+ 'UNARY_CALL': 4, # DEADLINE_EXCEEDED
+ 'EMPTY_CALL': 0,
+ },
+ ),
+ (
+ 'app_timeout_exceeded',
+            # UnaryCall only with sleep-2; timeout=1s; calls time out.
+ {
+ 'rpc_types': [
+ messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
+ ],
+ 'metadata': [
+ (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
+ 'rpc-behavior', 'sleep-2'),
+ ],
+ 'timeout_sec': 1,
+ },
+ {
+ 'UNARY_CALL': 4, # DEADLINE_EXCEEDED
+ },
+ ),
+ (
+ 'timeout_not_exceeded',
+ # UnaryCall only with no sleep; calls succeed.
+ {
+ 'rpc_types': [
+ messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
+ ],
+ },
+ {
+ 'UNARY_CALL': 0,
+ },
+ )
+ ]
+
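The integer status codes used in expected_results here (and in the fault injection cases further below) are gRPC status codes; a quick way to confirm the mapping, independent of this script:

    import grpc

    assert grpc.StatusCode.OK.value[0] == 0
    assert grpc.StatusCode.DEADLINE_EXCEEDED.value[0] == 4
    assert grpc.StatusCode.UNAUTHENTICATED.value[0] == 16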
+ try:
+ first_case = True
+ for (testcase_name, client_config, expected_results) in test_cases:
+ logger.info('starting case %s', testcase_name)
+ configure_client(**client_config)
+ # wait a second to help ensure the client stops sending RPCs with
+ # the old config. We will make multiple attempts if it is failing,
+ # but this improves confidence that the test is valid if the
+ # previous client_config would lead to the same results.
+ time.sleep(1)
+ # Each attempt takes 10 seconds; 20 attempts is equivalent to 200
+ # second timeout.
+ attempt_count = 20
+ if first_case:
+ attempt_count = 120
+ first_case = False
+ before_stats = get_client_accumulated_stats()
+ if not before_stats.stats_per_method:
+ raise ValueError(
+ 'stats.stats_per_method is None, the interop client stats service does not support this test case'
+ )
+ for i in range(attempt_count):
+ logger.info('%s: attempt %d', testcase_name, i)
+
+ test_runtime_secs = 10
+ time.sleep(test_runtime_secs)
+ after_stats = get_client_accumulated_stats()
+
+ success = True
+ for rpc, status in expected_results.items():
+ qty = (after_stats.stats_per_method[rpc].result[status] -
+ before_stats.stats_per_method[rpc].result[status])
+ want = test_runtime_secs * args.qps
+ # Allow 10% deviation from expectation to reduce flakiness
+ if qty < (want * .9) or qty > (want * 1.1):
+ logger.info('%s: failed due to %s[%s]: got %d want ~%d',
+ testcase_name, rpc, status, qty, want)
+ success = False
+ if success:
+ logger.info('success')
+ break
+ logger.info('%s attempt %d failed', testcase_name, i)
+ before_stats = after_stats
+ else:
+ raise Exception(
+ '%s: timeout waiting for expected results: %s; got %s' %
+ (testcase_name, expected_results,
+ after_stats.stats_per_method))
+ finally:
+ patch_url_map_backend_service(gcp, original_backend_service)
+
+
+def test_fault_injection(gcp, original_backend_service, instance_group):
+ logger.info('Running test_fault_injection')
+
+ logger.info('waiting for original backends to become healthy')
+ wait_for_healthy_backends(gcp, original_backend_service, instance_group)
+
+ testcase_header = 'fi_testcase'
+
+ def _route(pri, name, fi_policy):
+ return {
+ 'priority': pri,
+ 'matchRules': [{
+ 'prefixMatch':
+ '/',
+ 'headerMatches': [{
+ 'headerName': testcase_header,
+ 'exactMatch': name,
+ }],
+ }],
+ 'service': original_backend_service.url,
+ 'routeAction': {
+ 'faultInjectionPolicy': fi_policy
+ },
+ }
+
+ def _abort(pct):
+ return {
+ 'abort': {
+ 'httpStatus': 401,
+ 'percentage': pct,
+ }
+ }
+
+ def _delay(pct):
+ return {
+ 'delay': {
+ 'fixedDelay': {
+ 'seconds': '20'
+ },
+ 'percentage': pct,
+ }
+ }
+
+ zero_route = _abort(0)
+ zero_route.update(_delay(0))
+ route_rules = [
+ _route(0, 'zero_percent_fault_injection', zero_route),
+ _route(1, 'always_delay', _delay(100)),
+ _route(2, 'always_abort', _abort(100)),
+ _route(3, 'delay_half', _delay(50)),
+ _route(4, 'abort_half', _abort(50)),
+ {
+ 'priority': 5,
+ 'matchRules': [{
+ 'prefixMatch': '/'
+ }],
+ 'service': original_backend_service.url,
+ },
+ ]
+ set_validate_for_proxyless(gcp, False)
+ patch_url_map_backend_service(gcp,
+ original_backend_service,
+ route_rules=route_rules)
+ # A list of tuples (testcase_name, {client_config}, {code: percent}). Each
+ # test case will set the testcase_header with the testcase_name for routing
+ # to the appropriate config for the case, defined above.
+ test_cases = [
+ (
+ 'zero_percent_fault_injection',
+ {},
+ {
+ 0: 1
+ }, # OK
+ ),
+ (
+ 'non_matching_fault_injection', # Not in route_rules, above.
+ {},
+ {
+ 0: 1
+ }, # OK
+ ),
+ (
+ 'always_delay',
+ {
+ 'timeout_sec': 2
+ },
+ {
+ 4: 1
+ }, # DEADLINE_EXCEEDED
+ ),
+ (
+ 'always_abort',
+ {},
+ {
+ 16: 1
+ }, # UNAUTHENTICATED
+ ),
+ (
+ 'delay_half',
+ {
+ 'timeout_sec': 2
+ },
+ {
+ 4: .5,
+ 0: .5
+ }, # DEADLINE_EXCEEDED / OK: 50% / 50%
+ ),
+ (
+ 'abort_half',
+ {},
+ {
+ 16: .5,
+ 0: .5
+ }, # UNAUTHENTICATED / OK: 50% / 50%
+ )
+ ]
+
+ try:
+ first_case = True
+ for (testcase_name, client_config, expected_results) in test_cases:
+ logger.info('starting case %s', testcase_name)
+
+ client_config['metadata'] = [
+ (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
+ testcase_header, testcase_name)
+ ]
+ client_config['rpc_types'] = [
+ messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
+ ]
+ configure_client(**client_config)
+ # wait a second to help ensure the client stops sending RPCs with
+ # the old config. We will make multiple attempts if it is failing,
+ # but this improves confidence that the test is valid if the
+ # previous client_config would lead to the same results.
+ time.sleep(1)
+ # Each attempt takes 10 seconds; 20 attempts is equivalent to 200
+ # second timeout.
+ attempt_count = 20
+ if first_case:
+ attempt_count = 120
+ first_case = False
+ before_stats = get_client_accumulated_stats()
+ if not before_stats.stats_per_method:
+ raise ValueError(
+ 'stats.stats_per_method is None, the interop client stats service does not support this test case'
+ )
+ for i in range(attempt_count):
+ logger.info('%s: attempt %d', testcase_name, i)
+
+ test_runtime_secs = 10
+ time.sleep(test_runtime_secs)
+ after_stats = get_client_accumulated_stats()
+
+ success = True
+ for status, pct in expected_results.items():
+ rpc = 'UNARY_CALL'
+ qty = (after_stats.stats_per_method[rpc].result[status] -
+ before_stats.stats_per_method[rpc].result[status])
+ want = pct * args.qps * test_runtime_secs
+ # Allow 10% deviation from expectation to reduce flakiness
+ VARIANCE_ALLOWED = 0.1
+ if abs(qty - want) > want * VARIANCE_ALLOWED:
+ logger.info('%s: failed due to %s[%s]: got %d want ~%d',
+ testcase_name, rpc, status, qty, want)
+ success = False
+ if success:
+ logger.info('success')
+ break
+ logger.info('%s attempt %d failed', testcase_name, i)
+ before_stats = after_stats
+ else:
+ raise Exception(
+ '%s: timeout waiting for expected results: %s; got %s' %
+ (testcase_name, expected_results,
+ after_stats.stats_per_method))
+ finally:
+ patch_url_map_backend_service(gcp, original_backend_service)
+ set_validate_for_proxyless(gcp, True)
+
+
+def test_csds(gcp, original_backend_service, instance_group, server_uri):
+ test_csds_timeout_s = datetime.timedelta(minutes=5).total_seconds()
+ sleep_interval_between_attempts_s = datetime.timedelta(
+ seconds=2).total_seconds()
+ logger.info('Running test_csds')
+
+ logger.info('waiting for original backends to become healthy')
+ wait_for_healthy_backends(gcp, original_backend_service, instance_group)
+
+ # Test case timeout: 5 minutes
+ deadline = time.time() + test_csds_timeout_s
+ cnt = 0
+ while time.time() <= deadline:
+ client_config = get_client_xds_config_dump()
+ logger.info('test_csds attempt %d: received xDS config %s', cnt,
+ json.dumps(client_config, indent=2))
+ if client_config is not None:
+ # Got the xDS config dump, now validate it
+ ok = True
+ try:
+ if client_config['node']['locality']['zone'] != args.zone:
+ logger.info('Invalid zone %s != %s',
+ client_config['node']['locality']['zone'],
+ args.zone)
+ ok = False
+ seen = set()
+ for xds_config in client_config['xds_config']:
+ if 'listener_config' in xds_config:
+ listener_name = xds_config['listener_config'][
+ 'dynamic_listeners'][0]['active_state']['listener'][
+ 'name']
+ if listener_name != server_uri:
+ logger.info('Invalid Listener name %s != %s',
+ listener_name, server_uri)
+ ok = False
+ else:
+ seen.add('lds')
+ elif 'route_config' in xds_config:
+ num_vh = len(
+ xds_config['route_config']['dynamic_route_configs']
+ [0]['route_config']['virtual_hosts'])
+ if num_vh <= 0:
+ logger.info('Invalid number of VirtualHosts %s',
+ num_vh)
+ ok = False
+ else:
+ seen.add('rds')
+ elif 'cluster_config' in xds_config:
+ cluster_type = xds_config['cluster_config'][
+ 'dynamic_active_clusters'][0]['cluster']['type']
+ if cluster_type != 'EDS':
+ logger.info('Invalid cluster type %s != EDS',
+ cluster_type)
+ ok = False
+ else:
+ seen.add('cds')
+ elif 'endpoint_config' in xds_config:
+ sub_zone = xds_config["endpoint_config"][
+ "dynamic_endpoint_configs"][0]["endpoint_config"][
+ "endpoints"][0]["locality"]["sub_zone"]
+ if args.zone not in sub_zone:
+ logger.info('Invalid endpoint sub_zone %s',
+ sub_zone)
+ ok = False
+ else:
+ seen.add('eds')
+ want = {'lds', 'rds', 'cds', 'eds'}
+ if seen != want:
+ logger.info('Incomplete xDS config dump, seen=%s', seen)
+ ok = False
+ except:
+ logger.exception('Error in xDS config dump:')
+ ok = False
+ finally:
+ if ok:
+                    # Successfully fetched the xDS config, and it looks good.
+ logger.info('success')
+ return
+ logger.info('test_csds attempt %d failed', cnt)
+ # Give the client some time to fetch xDS resources
+ time.sleep(sleep_interval_between_attempts_s)
+ cnt += 1
+
+ raise RuntimeError('failed to receive a valid xDS config in %s seconds' %
+ test_csds_timeout_s)
+
+
+def maybe_write_sponge_properties():
+    """Write test info to enable more advanced testgrid searches."""
+ if 'KOKORO_ARTIFACTS_DIR' not in os.environ:
+ return
+ if 'GIT_ORIGIN_URL' not in os.environ:
+ return
+ if 'GIT_COMMIT_SHORT' not in os.environ:
+ return
+ properties = [
+        # Technically, 'TESTS_FORMAT_VERSION' is not required for run_xds_tests.
+        # We keep it here so that one day the process of writing sponge
+        # properties can be merged across test scripts.
+ 'TESTS_FORMAT_VERSION,2',
+ 'TESTGRID_EXCLUDE,%s' % os.environ.get('TESTGRID_EXCLUDE', 0),
+ 'GIT_ORIGIN_URL,%s' % os.environ['GIT_ORIGIN_URL'],
+ 'GIT_COMMIT_SHORT,%s' % os.environ['GIT_COMMIT_SHORT'],
+ ]
+ logger.info('Writing Sponge configs: %s', properties)
+ with open(
+ os.path.join(os.environ['KOKORO_ARTIFACTS_DIR'],
+ "custom_sponge_config.csv"), 'w') as f:
+ f.write("\n".join(properties))
+ f.write("\n")
+
+
def set_validate_for_proxyless(gcp, validate_for_proxyless):
if not gcp.alpha_compute:
logger.debug(
@@ -1350,7 +2319,7 @@ def is_primary_instance_group(gcp, instance_group):
def get_startup_script(path_to_server_binary, service_port):
if path_to_server_binary:
- return "nohup %s --port=%d 1>/dev/null &" % (path_to_server_binary,
+ return 'nohup %s --port=%d 1>/dev/null &' % (path_to_server_binary,
service_port)
else:
return """#!/bin/bash
@@ -1561,34 +2530,39 @@ def create_target_proxy(gcp, name, validate_for_proxyless=True):
gcp.target_proxy = GcpResource(config['name'], result['targetLink'])
-def create_global_forwarding_rule(gcp, name, potential_ports):
+def create_global_forwarding_rule(gcp,
+ name,
+ potential_ports,
+ potential_ip_addresses=['0.0.0.0']):
if gcp.alpha_compute:
compute_to_use = gcp.alpha_compute
else:
compute_to_use = gcp.compute
for port in potential_ports:
- try:
- config = {
- 'name': name,
- 'loadBalancingScheme': 'INTERNAL_SELF_MANAGED',
- 'portRange': str(port),
- 'IPAddress': '0.0.0.0',
- 'network': args.network,
- 'target': gcp.target_proxy.url,
- }
- logger.debug('Sending GCP request with body=%s', config)
- result = compute_to_use.globalForwardingRules().insert(
- project=gcp.project,
- body=config).execute(num_retries=_GCP_API_RETRIES)
- wait_for_global_operation(gcp, result['name'])
- gcp.global_forwarding_rule = GcpResource(config['name'],
- result['targetLink'])
- gcp.service_port = port
- return
- except googleapiclient.errors.HttpError as http_error:
- logger.warning(
- 'Got error %s when attempting to create forwarding rule to '
- '0.0.0.0:%d. Retrying with another port.' % (http_error, port))
+ for ip_address in potential_ip_addresses:
+ try:
+ config = {
+ 'name': name,
+ 'loadBalancingScheme': 'INTERNAL_SELF_MANAGED',
+ 'portRange': str(port),
+ 'IPAddress': ip_address,
+ 'network': args.network,
+ 'target': gcp.target_proxy.url,
+ }
+ logger.debug('Sending GCP request with body=%s', config)
+ result = compute_to_use.globalForwardingRules().insert(
+ project=gcp.project,
+ body=config).execute(num_retries=_GCP_API_RETRIES)
+ wait_for_global_operation(gcp, result['name'])
+ gcp.global_forwarding_rule = GcpResource(
+ config['name'], result['targetLink'])
+ gcp.service_port = port
+ return
+ except googleapiclient.errors.HttpError as http_error:
+ logger.warning(
+ 'Got error %s when attempting to create forwarding rule to '
+ '%s:%d. Retrying with another port.' %
+ (http_error, ip_address, port))
def get_health_check(gcp, health_check_name):
@@ -1652,39 +2626,49 @@ def get_instance_group(gcp, zone, instance_group_name):
return instance_group
-def delete_global_forwarding_rule(gcp):
+def delete_global_forwarding_rule(gcp, name=None):
+ if name:
+ forwarding_rule_to_delete = name
+ else:
+ forwarding_rule_to_delete = gcp.global_forwarding_rule.name
try:
result = gcp.compute.globalForwardingRules().delete(
project=gcp.project,
- forwardingRule=gcp.global_forwarding_rule.name).execute(
+ forwardingRule=forwarding_rule_to_delete).execute(
num_retries=_GCP_API_RETRIES)
wait_for_global_operation(gcp, result['name'])
except googleapiclient.errors.HttpError as http_error:
logger.info('Delete failed: %s', http_error)
-def delete_target_proxy(gcp):
+def delete_target_proxy(gcp, name=None):
+ if name:
+ proxy_to_delete = name
+ else:
+ proxy_to_delete = gcp.target_proxy.name
try:
if gcp.alpha_compute:
result = gcp.alpha_compute.targetGrpcProxies().delete(
- project=gcp.project,
- targetGrpcProxy=gcp.target_proxy.name).execute(
+ project=gcp.project, targetGrpcProxy=proxy_to_delete).execute(
num_retries=_GCP_API_RETRIES)
else:
result = gcp.compute.targetHttpProxies().delete(
- project=gcp.project,
- targetHttpProxy=gcp.target_proxy.name).execute(
+ project=gcp.project, targetHttpProxy=proxy_to_delete).execute(
num_retries=_GCP_API_RETRIES)
wait_for_global_operation(gcp, result['name'])
except googleapiclient.errors.HttpError as http_error:
logger.info('Delete failed: %s', http_error)
-def delete_url_map(gcp):
+def delete_url_map(gcp, name=None):
+ if name:
+ url_map_to_delete = name
+ else:
+ url_map_to_delete = gcp.url_map.name
try:
result = gcp.compute.urlMaps().delete(
project=gcp.project,
- urlMap=gcp.url_map.name).execute(num_retries=_GCP_API_RETRIES)
+ urlMap=url_map_to_delete).execute(num_retries=_GCP_API_RETRIES)
wait_for_global_operation(gcp, result['name'])
except googleapiclient.errors.HttpError as http_error:
logger.info('Delete failed: %s', http_error)
@@ -1757,6 +2741,7 @@ def patch_backend_service(gcp,
backend_service,
instance_groups,
balancing_mode='UTILIZATION',
+ max_rate=1,
circuit_breakers=None):
if gcp.alpha_compute:
compute_to_use = gcp.alpha_compute
@@ -1766,7 +2751,7 @@ def patch_backend_service(gcp,
'backends': [{
'group': instance_group.url,
'balancingMode': balancing_mode,
- 'maxRate': 1 if balancing_mode == 'RATE' else None
+ 'maxRate': max_rate if balancing_mode == 'RATE' else None
} for instance_group in instance_groups],
'circuitBreakers': circuit_breakers,
}
@@ -1804,6 +2789,11 @@ def patch_url_map_backend_service(gcp,
Only one of backend_service and service_with_weights can be not None.
'''
+ if gcp.alpha_compute:
+ compute_to_use = gcp.alpha_compute
+ else:
+ compute_to_use = gcp.compute
+
if backend_service and services_with_weights:
raise ValueError(
'both backend_service and service_with_weights are not None.')
@@ -1825,7 +2815,7 @@ def patch_url_map_backend_service(gcp,
}]
}
logger.debug('Sending GCP request with body=%s', config)
- result = gcp.compute.urlMaps().patch(
+ result = compute_to_use.urlMaps().patch(
project=gcp.project, urlMap=gcp.url_map.name,
body=config).execute(num_retries=_GCP_API_RETRIES)
wait_for_global_operation(gcp, result['name'])
@@ -1973,10 +2963,11 @@ class GcpResource(object):
class GcpState(object):
- def __init__(self, compute, alpha_compute, project):
+ def __init__(self, compute, alpha_compute, project, project_num):
self.compute = compute
self.alpha_compute = alpha_compute
self.project = project
+ self.project_num = project_num
self.health_check = None
self.health_check_firewall_rule = None
self.backend_services = []
@@ -1988,6 +2979,7 @@ class GcpState(object):
self.instance_groups = []
+maybe_write_sponge_properties()
alpha_compute = None
if args.compute_discovery_document:
with open(args.compute_discovery_document, 'r') as discovery_doc:
@@ -2003,7 +2995,7 @@ else:
alpha_compute = googleapiclient.discovery.build('compute', 'alpha')
try:
- gcp = GcpState(compute, alpha_compute, args.project_id)
+ gcp = GcpState(compute, alpha_compute, args.project_id, args.project_num)
gcp_suffix = args.gcp_suffix
health_check_name = _BASE_HEALTH_CHECK_NAME + gcp_suffix
if not args.use_existing_gcp_resources:
@@ -2095,7 +3087,12 @@ try:
if args.test_case:
client_env = dict(os.environ)
+ if original_grpc_trace:
+ client_env['GRPC_TRACE'] = original_grpc_trace
+ if original_grpc_verbosity:
+ client_env['GRPC_VERBOSITY'] = original_grpc_verbosity
bootstrap_server_features = []
+
if gcp.service_port == _DEFAULT_SERVICE_PORT:
server_uri = service_host_name
else:
@@ -2109,15 +3106,38 @@ try:
with tempfile.NamedTemporaryFile(delete=False) as bootstrap_file:
bootstrap_file.write(
_BOOTSTRAP_TEMPLATE.format(
- node_id=socket.gethostname(),
+ node_id='projects/%s/networks/%s/nodes/%s' %
+ (gcp.project_num, args.network.split('/')[-1],
+ uuid.uuid1()),
server_features=json.dumps(
bootstrap_server_features)).encode('utf-8'))
bootstrap_path = bootstrap_file.name
client_env['GRPC_XDS_BOOTSTRAP'] = bootstrap_path
client_env['GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING'] = 'true'
+ client_env['GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT'] = 'true'
+ client_env['GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION'] = 'true'
test_results = {}
failed_tests = []
for test_case in args.test_case:
+ if test_case in _V3_TEST_CASES and not args.xds_v3_support:
+ logger.info('skipping test %s due to missing v3 support',
+ test_case)
+ continue
+ if test_case in _ALPHA_TEST_CASES and not gcp.alpha_compute:
+ logger.info('skipping test %s due to missing alpha support',
+ test_case)
+ continue
+ if test_case in [
+ 'api_listener', 'forwarding_rule_port_match',
+ 'forwarding_rule_default_port'
+ ] and CLIENT_HOSTS:
+ logger.info(
+                'skipping test %s because test configuration is '
+                'not compatible with client processes on existing '
+                'client hosts', test_case)
+ continue
+ if test_case == 'forwarding_rule_default_port':
+ server_uri = service_host_name
result = jobset.JobResult()
log_dir = os.path.join(_TEST_LOG_BASE_DIR, test_case)
if not os.path.exists(log_dir):
@@ -2132,11 +3152,13 @@ try:
rpcs_to_send = '--rpc="UnaryCall"'
if test_case in _TESTS_TO_SEND_METADATA:
- metadata_to_send = '--metadata="EmptyCall:{keyE}:{valueE},UnaryCall:{keyU}:{valueU}"'.format(
+ metadata_to_send = '--metadata="EmptyCall:{keyE}:{valueE},UnaryCall:{keyU}:{valueU},UnaryCall:{keyNU}:{valueNU}"'.format(
keyE=_TEST_METADATA_KEY,
valueE=_TEST_METADATA_VALUE_EMPTY,
keyU=_TEST_METADATA_KEY,
- valueU=_TEST_METADATA_VALUE_UNARY)
+ valueU=_TEST_METADATA_VALUE_UNARY,
+ keyNU=_TEST_METADATA_NUMERIC_KEY,
+ valueNU=_TEST_METADATA_NUMERIC_VALUE)
else:
# Setting the arg explicitly to empty with '--metadata=""'
# makes C# client fail
@@ -2185,6 +3207,10 @@ try:
elif test_case == 'gentle_failover':
test_gentle_failover(gcp, backend_service, instance_group,
secondary_zone_instance_group)
+ elif test_case == 'load_report_based_failover':
+ test_load_report_based_failover(
+ gcp, backend_service, instance_group,
+ secondary_zone_instance_group)
elif test_case == 'ping_pong':
test_ping_pong(gcp, backend_service, instance_group)
elif test_case == 'remove_instance_group':
@@ -2216,6 +3242,26 @@ try:
elif test_case == 'circuit_breaking':
test_circuit_breaking(gcp, backend_service, instance_group,
same_zone_instance_group)
+ elif test_case == 'timeout':
+ test_timeout(gcp, backend_service, instance_group)
+ elif test_case == 'fault_injection':
+ test_fault_injection(gcp, backend_service, instance_group)
+ elif test_case == 'api_listener':
+ server_uri = test_api_listener(gcp, backend_service,
+ instance_group,
+ alternate_backend_service)
+ elif test_case == 'forwarding_rule_port_match':
+ server_uri = test_forwarding_rule_port_match(
+ gcp, backend_service, instance_group)
+ elif test_case == 'forwarding_rule_default_port':
+ server_uri = test_forwarding_rule_default_port(
+ gcp, backend_service, instance_group)
+ elif test_case == 'metadata_filter':
+ test_metadata_filter(gcp, backend_service, instance_group,
+ alternate_backend_service,
+ same_zone_instance_group)
+ elif test_case == 'csds':
+ test_csds(gcp, backend_service, instance_group, server_uri)
else:
logger.error('Unknown test case: %s', test_case)
sys.exit(1)