Diffstat (limited to 'grpc/tools/run_tests/run_xds_tests.py')
-rwxr-xr-x | grpc/tools/run_tests/run_xds_tests.py | 1164
1 file changed, 1105 insertions(+), 59 deletions(-)
diff --git a/grpc/tools/run_tests/run_xds_tests.py b/grpc/tools/run_tests/run_xds_tests.py index 6b397d49..7743df1b 100755 --- a/grpc/tools/run_tests/run_xds_tests.py +++ b/grpc/tools/run_tests/run_xds_tests.py @@ -15,6 +15,7 @@ """Run xDS integration tests on GCP using Traffic Director.""" import argparse +import datetime import googleapiclient.discovery import grpc import json @@ -27,8 +28,10 @@ import subprocess import sys import tempfile import time +import uuid from oauth2client.client import GoogleCredentials +from google.protobuf import json_format import python_utils.jobset as jobset import python_utils.report_utils as report_utils @@ -39,6 +42,20 @@ from src.proto.grpc.testing import empty_pb2 from src.proto.grpc.testing import messages_pb2 from src.proto.grpc.testing import test_pb2_grpc +# Envoy protos provided by the PyPI package xds-protos +# The generated Python modules must be imported to load their descriptors +try: + from envoy.service.status.v3 import csds_pb2 + from envoy.service.status.v3 import csds_pb2_grpc + from envoy.extensions.filters.network.http_connection_manager.v3 import http_connection_manager_pb2 + from envoy.extensions.filters.common.fault.v3 import fault_pb2 + from envoy.extensions.filters.http.fault.v3 import fault_pb2 + from envoy.extensions.filters.http.router.v3 import router_pb2 +except ImportError: + # These protos are only required by the CSDS test; we should not fail the + # entire script over one test case. + pass + logger = logging.getLogger() console_handler = logging.StreamHandler() formatter = logging.Formatter(fmt='%(asctime)s: %(levelname)-8s %(message)s') @@ -47,27 +64,49 @@ logger.handlers = [] logger.addHandler(console_handler) logger.setLevel(logging.WARNING) +# Suppress excessive logs for gRPC Python +original_grpc_trace = os.environ.pop('GRPC_TRACE', None) +original_grpc_verbosity = os.environ.pop('GRPC_VERBOSITY', None) +# Suppress non-essential logs for GCP clients +logging.getLogger('google_auth_httplib2').setLevel(logging.WARNING) +logging.getLogger('googleapiclient.discovery').setLevel(logging.WARNING) + _TEST_CASES = [ 'backends_restart', 'change_backend_service', 'gentle_failover', + 'load_report_based_failover', 'ping_pong', 'remove_instance_group', 'round_robin', 'secondary_locality_gets_no_requests_on_partial_primary_failure', 'secondary_locality_gets_requests_on_primary_failure', 'traffic_splitting', + 'path_matching', + 'header_matching', + 'forwarding_rule_port_match', + 'forwarding_rule_default_port', + 'metadata_filter', ] + # Valid test cases that aren't yet supported in all languages, so they can # only be run manually and aren't enabled automatically for all languages. # # TODO: Move them into _TEST_CASES when support is ready in all languages. _ADDITIONAL_TEST_CASES = [ - 'path_matching', - 'header_matching', 'circuit_breaking', + 'timeout', + 'fault_injection', + 'csds', + 'api_listener', # TODO(b/187352987) Relieve quota pressure ] +# Test cases that require the xDS v3 API. Skipped unless --xds_v3_support is set. +_V3_TEST_CASES = frozenset(['timeout', 'fault_injection', 'csds']) + +# Test cases that require the alpha compute API. Skipped for stable API runs.
+_ALPHA_TEST_CASES = frozenset(['timeout']) + def parse_test_cases(arg): if arg == '': @@ -96,7 +135,11 @@ def parse_port_range(port_arg): argp = argparse.ArgumentParser(description='Run xDS interop tests on GCP') -argp.add_argument('--project_id', help='GCP project id') +# TODO(zdapeng): remove default value of project_id and project_num +argp.add_argument('--project_id', default='grpc-testing', help='GCP project id') +argp.add_argument('--project_num', + default='830293263384', + help='GCP project number') argp.add_argument( '--gcp_suffix', default='', @@ -175,8 +218,9 @@ argp.add_argument( argp.add_argument('--network', default='global/networks/default', help='GCP network to use') +_DEFAULT_PORT_RANGE = '8080:8280' argp.add_argument('--service_port_range', - default='8080:8110', + default=_DEFAULT_PORT_RANGE, type=parse_port_range, help='Listening port for created gRPC backends. Specified as ' 'either a single int or as a range in the format min:max, in ' @@ -235,14 +279,23 @@ CLIENT_HOSTS = [] if args.client_hosts: CLIENT_HOSTS = args.client_hosts.split(',') +# Each of the config propagation in the control plane should finish within 600s. +# Otherwise, it indicates a bug in the control plane. The config propagation +# includes all kinds of traffic config update, like updating urlMap, creating +# the resources for the first time, updating BackendService, and changing the +# status of endpoints in BackendService. +_WAIT_FOR_URL_MAP_PATCH_SEC = 600 +# In general, fetching load balancing stats only takes ~10s. However, slow +# config update could lead to empty EDS or similar symptoms causing the +# connection to hang for a long period of time. So, we want to extend the stats +# wait time to be the same as urlMap patch time. +_WAIT_FOR_STATS_SEC = _WAIT_FOR_URL_MAP_PATCH_SEC + _DEFAULT_SERVICE_PORT = 80 _WAIT_FOR_BACKEND_SEC = args.wait_for_backend_sec _WAIT_FOR_OPERATION_SEC = 1200 _INSTANCE_GROUP_SIZE = args.instance_group_size _NUM_TEST_RPCS = 10 * args.qps -_WAIT_FOR_STATS_SEC = 360 -_WAIT_FOR_VALID_CONFIG_SEC = 60 -_WAIT_FOR_URL_MAP_PATCH_SEC = 300 _CONNECTION_TIMEOUT_SEC = 60 _GCP_API_RETRIES = 5 _BOOTSTRAP_TEMPLATE = """ @@ -250,7 +303,8 @@ _BOOTSTRAP_TEMPLATE = """ "node": {{ "id": "{node_id}", "metadata": {{ - "TRAFFICDIRECTOR_NETWORK_NAME": "%s" + "TRAFFICDIRECTOR_NETWORK_NAME": "%s", + "com.googleapis.trafficdirector.config_time_trace": "TRUE" }}, "locality": {{ "zone": "%s" @@ -279,6 +333,9 @@ _TESTS_TO_SEND_METADATA = ['header_matching'] _TEST_METADATA_KEY = 'xds_md' _TEST_METADATA_VALUE_UNARY = 'unary_yranu' _TEST_METADATA_VALUE_EMPTY = 'empty_ytpme' +# Extra RPC metadata whose value is a number, sent with UnaryCall only. 
+_TEST_METADATA_NUMERIC_KEY = 'xds_md_numeric' +_TEST_METADATA_NUMERIC_VALUE = '159' _PATH_MATCHER_NAME = 'path-matcher' _BASE_TEMPLATE_NAME = 'test-template' _BASE_INSTANCE_GROUP_NAME = 'test-ig' @@ -313,7 +370,8 @@ def get_client_stats(num_rpcs, timeout_sec): response = stub.GetClientStats(request, wait_for_ready=True, timeout=rpc_timeout) - logger.debug('Invoked GetClientStats RPC to %s: %s', host, response) + logger.debug('Invoked GetClientStats RPC to %s: %s', host, + json_format.MessageToJson(response)) return response @@ -336,7 +394,33 @@ def get_client_accumulated_stats(): return response -def configure_client(rpc_types, metadata): +def get_client_xds_config_dump(): + if CLIENT_HOSTS: + hosts = CLIENT_HOSTS + else: + hosts = ['localhost'] + for host in hosts: + server_address = '%s:%d' % (host, args.stats_port) + with grpc.insecure_channel(server_address) as channel: + stub = csds_pb2_grpc.ClientStatusDiscoveryServiceStub(channel) + logger.debug('Fetching xDS config dump from %s', server_address) + response = stub.FetchClientStatus(csds_pb2.ClientStatusRequest(), + wait_for_ready=True, + timeout=_CONNECTION_TIMEOUT_SEC) + logger.debug('Fetched xDS config dump from %s', server_address) + if len(response.config) != 1: + logger.error('Unexpected number of ClientConfigs %d: %s', + len(response.config), response) + return None + else: + # Converting the ClientStatusResponse into JSON, because many + # fields are packed in google.protobuf.Any. It will require many + # duplicated code to unpack proto message and inspect values. + return json_format.MessageToDict( + response.config[0], preserving_proto_field_name=True) + + +def configure_client(rpc_types, metadata=[], timeout_sec=None): if CLIENT_HOSTS: hosts = CLIENT_HOSTS else: @@ -352,6 +436,8 @@ def configure_client(rpc_types, metadata): md.type = rpc_type md.key = md_key md.value = md_value + if timeout_sec: + request.timeout_sec = timeout_sec logger.debug( 'Invoking XdsUpdateClientConfigureService RPC to %s:%d: %s', host, args.stats_port, request) @@ -407,6 +493,21 @@ def wait_until_all_rpcs_go_to_given_backends(backends, allow_failures=False) +def wait_until_no_rpcs_go_to_given_backends(backends, timeout_sec): + start_time = time.time() + while time.time() - start_time <= timeout_sec: + stats = get_client_stats(_NUM_TEST_RPCS, timeout_sec) + error_msg = None + rpcs_by_peer = stats.rpcs_by_peer + for backend in backends: + if backend in rpcs_by_peer: + error_msg = 'Unexpected backend %s receives load' % backend + break + if not error_msg: + return + raise Exception('Unexpected RPCs going to given backends') + + def wait_until_rpcs_in_flight(rpc_type, timeout_sec, num_rpcs, threshold): '''Block until the test client reaches the state with the given number of RPCs being outstanding stably. 
@@ -619,6 +720,56 @@ def test_gentle_failover(gcp, _WAIT_FOR_BACKEND_SEC) +def test_load_report_based_failover(gcp, backend_service, + primary_instance_group, + secondary_instance_group): + logger.info('Running test_load_report_based_failover') + try: + patch_backend_service( + gcp, backend_service, + [primary_instance_group, secondary_instance_group]) + primary_instance_names = get_instance_names(gcp, primary_instance_group) + secondary_instance_names = get_instance_names(gcp, + secondary_instance_group) + wait_for_healthy_backends(gcp, backend_service, primary_instance_group) + wait_for_healthy_backends(gcp, backend_service, + secondary_instance_group) + wait_until_all_rpcs_go_to_given_backends(primary_instance_names, + _WAIT_FOR_STATS_SEC) + # Set primary locality's balance mode to RATE, and RPS to 20% of the + # client's QPS. The secondary locality will be used. + max_rate = int(args.qps * 1 / 5) + logger.info('Patching backend service to RATE with %d max_rate', + max_rate) + patch_backend_service( + gcp, + backend_service, [primary_instance_group, secondary_instance_group], + balancing_mode='RATE', + max_rate=max_rate) + wait_until_all_rpcs_go_to_given_backends( + primary_instance_names + secondary_instance_names, + _WAIT_FOR_BACKEND_SEC) + + # Set primary locality's balance mode to RATE, and RPS to 120% of the + # client's QPS. Only the primary locality will be used. + max_rate = int(args.qps * 6 / 5) + logger.info('Patching backend service to RATE with %d max_rate', + max_rate) + patch_backend_service( + gcp, + backend_service, [primary_instance_group, secondary_instance_group], + balancing_mode='RATE', + max_rate=max_rate) + wait_until_all_rpcs_go_to_given_backends(primary_instance_names, + _WAIT_FOR_BACKEND_SEC) + logger.info("success") + finally: + patch_backend_service(gcp, backend_service, [primary_instance_group]) + instance_names = get_instance_names(gcp, primary_instance_group) + wait_until_all_rpcs_go_to_given_backends(instance_names, + _WAIT_FOR_BACKEND_SEC) + + def test_ping_pong(gcp, backend_service, instance_group): logger.info('Running test_ping_pong') wait_for_healthy_backends(gcp, backend_service, instance_group) @@ -829,6 +980,303 @@ def prepare_services_for_urlmap_tests(gcp, original_backend_service, return original_backend_instances, alternate_backend_instances +def test_metadata_filter(gcp, original_backend_service, instance_group, + alternate_backend_service, same_zone_instance_group): + logger.info("Running test_metadata_filter") + wait_for_healthy_backends(gcp, original_backend_service, instance_group) + original_backend_instances = get_instance_names(gcp, instance_group) + alternate_backend_instances = get_instance_names(gcp, + same_zone_instance_group) + patch_backend_service(gcp, alternate_backend_service, + [same_zone_instance_group]) + wait_for_healthy_backends(gcp, alternate_backend_service, + same_zone_instance_group) + try: + with open(bootstrap_path) as f: + md = json.load(f)['node']['metadata'] + match_labels = [] + for k, v in md.items(): + match_labels.append({'name': k, 'value': v}) + + not_match_labels = [{'name': 'fake', 'value': 'fail'}] + test_route_rules = [ + # test MATCH_ALL + [ + { + 'priority': 0, + 'matchRules': [{ + 'prefixMatch': + '/', + 'metadataFilters': [{ + 'filterMatchCriteria': 'MATCH_ALL', + 'filterLabels': not_match_labels + }] + }], + 'service': original_backend_service.url + }, + { + 'priority': 1, + 'matchRules': [{ + 'prefixMatch': + '/', + 'metadataFilters': [{ + 'filterMatchCriteria': 'MATCH_ALL', + 'filterLabels': 
match_labels + }] + }], + 'service': alternate_backend_service.url + }, + ], + # test mixing MATCH_ALL and MATCH_ANY + # test MATCH_ALL: super set labels won't match + [ + { + 'priority': 0, + 'matchRules': [{ + 'prefixMatch': + '/', + 'metadataFilters': [{ + 'filterMatchCriteria': 'MATCH_ALL', + 'filterLabels': not_match_labels + match_labels + }] + }], + 'service': original_backend_service.url + }, + { + 'priority': 1, + 'matchRules': [{ + 'prefixMatch': + '/', + 'metadataFilters': [{ + 'filterMatchCriteria': 'MATCH_ANY', + 'filterLabels': not_match_labels + match_labels + }] + }], + 'service': alternate_backend_service.url + }, + ], + # test MATCH_ANY + [ + { + 'priority': 0, + 'matchRules': [{ + 'prefixMatch': + '/', + 'metadataFilters': [{ + 'filterMatchCriteria': 'MATCH_ANY', + 'filterLabels': not_match_labels + }] + }], + 'service': original_backend_service.url + }, + { + 'priority': 1, + 'matchRules': [{ + 'prefixMatch': + '/', + 'metadataFilters': [{ + 'filterMatchCriteria': 'MATCH_ANY', + 'filterLabels': not_match_labels + match_labels + }] + }], + 'service': alternate_backend_service.url + }, + ], + # test match multiple route rules + [ + { + 'priority': 0, + 'matchRules': [{ + 'prefixMatch': + '/', + 'metadataFilters': [{ + 'filterMatchCriteria': 'MATCH_ANY', + 'filterLabels': match_labels + }] + }], + 'service': alternate_backend_service.url + }, + { + 'priority': 1, + 'matchRules': [{ + 'prefixMatch': + '/', + 'metadataFilters': [{ + 'filterMatchCriteria': 'MATCH_ALL', + 'filterLabels': match_labels + }] + }], + 'service': original_backend_service.url + }, + ] + ] + + for route_rules in test_route_rules: + wait_until_all_rpcs_go_to_given_backends(original_backend_instances, + _WAIT_FOR_STATS_SEC) + patch_url_map_backend_service(gcp, + original_backend_service, + route_rules=route_rules) + wait_until_no_rpcs_go_to_given_backends(original_backend_instances, + _WAIT_FOR_STATS_SEC) + wait_until_all_rpcs_go_to_given_backends( + alternate_backend_instances, _WAIT_FOR_STATS_SEC) + patch_url_map_backend_service(gcp, original_backend_service) + finally: + patch_backend_service(gcp, alternate_backend_service, []) + + +def test_api_listener(gcp, backend_service, instance_group, + alternate_backend_service): + logger.info("Running api_listener") + try: + wait_for_healthy_backends(gcp, backend_service, instance_group) + backend_instances = get_instance_names(gcp, instance_group) + wait_until_all_rpcs_go_to_given_backends(backend_instances, + _WAIT_FOR_STATS_SEC) + # create a second suite of map+tp+fr with the same host name in host rule + # and we have to disable proxyless validation because it needs `0.0.0.0` + # ip address in fr for proxyless and also we violate ip:port uniqueness + # for test purpose. 
See https://github.com/grpc/grpc-java/issues/8009 + new_config_suffix = '2' + create_url_map(gcp, url_map_name + new_config_suffix, backend_service, + service_host_name) + create_target_proxy(gcp, target_proxy_name + new_config_suffix, False) + if not gcp.service_port: + raise Exception( + 'Failed to find a valid port for the forwarding rule') + potential_ip_addresses = [] + max_attempts = 10 + for i in range(max_attempts): + potential_ip_addresses.append('10.10.10.%d' % + (random.randint(0, 255))) + create_global_forwarding_rule(gcp, + forwarding_rule_name + new_config_suffix, + [gcp.service_port], + potential_ip_addresses) + if gcp.service_port != _DEFAULT_SERVICE_PORT: + patch_url_map_host_rule_with_port(gcp, + url_map_name + new_config_suffix, + backend_service, + service_host_name) + wait_until_all_rpcs_go_to_given_backends(backend_instances, + _WAIT_FOR_STATS_SEC) + + delete_global_forwarding_rule(gcp, forwarding_rule_name) + delete_target_proxy(gcp, target_proxy_name) + delete_url_map(gcp, url_map_name) + verify_attempts = int(_WAIT_FOR_URL_MAP_PATCH_SEC / _NUM_TEST_RPCS * + args.qps) + for i in range(verify_attempts): + wait_until_all_rpcs_go_to_given_backends(backend_instances, + _WAIT_FOR_STATS_SEC) + # delete host rule for the original host name + patch_url_map_backend_service(gcp, alternate_backend_service) + wait_until_no_rpcs_go_to_given_backends(backend_instances, + _WAIT_FOR_STATS_SEC) + + finally: + delete_global_forwarding_rule(gcp, + forwarding_rule_name + new_config_suffix) + delete_target_proxy(gcp, target_proxy_name + new_config_suffix) + delete_url_map(gcp, url_map_name + new_config_suffix) + create_url_map(gcp, url_map_name, backend_service, service_host_name) + create_target_proxy(gcp, target_proxy_name) + create_global_forwarding_rule(gcp, forwarding_rule_name, + potential_service_ports) + if gcp.service_port != _DEFAULT_SERVICE_PORT: + patch_url_map_host_rule_with_port(gcp, url_map_name, + backend_service, + service_host_name) + server_uri = service_host_name + ':' + str(gcp.service_port) + else: + server_uri = service_host_name + return server_uri + + def test_forwarding_rule_port_match(gcp, backend_service, instance_group): + logger.info("Running test_forwarding_rule_port_match") + try: + wait_for_healthy_backends(gcp, backend_service, instance_group) + backend_instances = get_instance_names(gcp, instance_group) + wait_until_all_rpcs_go_to_given_backends(backend_instances, + _WAIT_FOR_STATS_SEC) + delete_global_forwarding_rule(gcp) + create_global_forwarding_rule(gcp, forwarding_rule_name, [ + x for x in parse_port_range(_DEFAULT_PORT_RANGE) + if x != gcp.service_port + ]) + wait_until_no_rpcs_go_to_given_backends(backend_instances, + _WAIT_FOR_STATS_SEC) + finally: + delete_global_forwarding_rule(gcp) + create_global_forwarding_rule(gcp, forwarding_rule_name, + potential_service_ports) + if gcp.service_port != _DEFAULT_SERVICE_PORT: + patch_url_map_host_rule_with_port(gcp, url_map_name, + backend_service, + service_host_name) + server_uri = service_host_name + ':' + str(gcp.service_port) + else: + server_uri = service_host_name + return server_uri + + def test_forwarding_rule_default_port(gcp, backend_service, instance_group): + logger.info("Running test_forwarding_rule_default_port") + try: + wait_for_healthy_backends(gcp, backend_service, instance_group) + backend_instances = get_instance_names(gcp, instance_group) + if gcp.service_port == _DEFAULT_SERVICE_PORT: + wait_until_all_rpcs_go_to_given_backends(backend_instances, + _WAIT_FOR_STATS_SEC) + 
delete_global_forwarding_rule(gcp) + create_global_forwarding_rule(gcp, forwarding_rule_name, + parse_port_range(_DEFAULT_PORT_RANGE)) + patch_url_map_host_rule_with_port(gcp, url_map_name, + backend_service, + service_host_name) + wait_until_no_rpcs_go_to_given_backends(backend_instances, + _WAIT_FOR_STATS_SEC) + # expect success when no port in client request service uri, and no port in url-map + delete_global_forwarding_rule(gcp) + delete_target_proxy(gcp) + delete_url_map(gcp) + create_url_map(gcp, url_map_name, backend_service, service_host_name) + create_target_proxy(gcp, gcp.target_proxy.name, False) + potential_ip_addresses = [] + max_attempts = 10 + for i in range(max_attempts): + potential_ip_addresses.append('10.10.10.%d' % + (random.randint(0, 255))) + create_global_forwarding_rule(gcp, forwarding_rule_name, [80], + potential_ip_addresses) + wait_until_all_rpcs_go_to_given_backends(backend_instances, + _WAIT_FOR_STATS_SEC) + + # expect failure when no port in client request uri, but specify port in url-map + patch_url_map_host_rule_with_port(gcp, url_map_name, backend_service, + service_host_name) + wait_until_no_rpcs_go_to_given_backends(backend_instances, + _WAIT_FOR_STATS_SEC) + finally: + delete_global_forwarding_rule(gcp) + delete_target_proxy(gcp) + delete_url_map(gcp) + create_url_map(gcp, url_map_name, backend_service, service_host_name) + create_target_proxy(gcp, target_proxy_name) + create_global_forwarding_rule(gcp, forwarding_rule_name, + potential_service_ports) + if gcp.service_port != _DEFAULT_SERVICE_PORT: + patch_url_map_host_rule_with_port(gcp, url_map_name, + backend_service, + service_host_name) + server_uri = service_host_name + ':' + str(gcp.service_port) + else: + server_uri = service_host_name + return server_uri + + def test_traffic_splitting(gcp, original_backend_service, instance_group, alternate_backend_service, same_zone_instance_group): # This test start with all traffic going to original_backend_service. Then @@ -974,7 +1422,36 @@ def test_path_matching(gcp, original_backend_service, instance_group, { "UnaryCall": original_backend_instances, "EmptyCall": alternate_backend_instances - }) + }), + ( + [{ + 'priority': 0, + # Regex UnaryCall -> alternate_backend_service. + 'matchRules': [{ + 'regexMatch': + '^\/.*\/UnaryCall$' # Unary methods with any services. + }], + 'service': alternate_backend_service.url + }], + { + "UnaryCall": alternate_backend_instances, + "EmptyCall": original_backend_instances + }), + ( + [{ + 'priority': 0, + # ignoreCase EmptyCall -> alternate_backend_service. + 'matchRules': [{ + # Case insensitive matching. + 'fullPathMatch': '/gRpC.tEsTinG.tEstseRvice/empTycaLl', + 'ignoreCase': True, + }], + 'service': alternate_backend_service.url + }], + { + "UnaryCall": original_backend_instances, + "EmptyCall": alternate_backend_instances + }), ] for (route_rules, expected_instances) in test_cases: @@ -991,8 +1468,8 @@ def test_path_matching(gcp, original_backend_service, instance_group, original_backend_instances + alternate_backend_instances, _WAIT_FOR_STATS_SEC) - retry_count = 20 - # Each attempt takes about 10 seconds, 20 retries is equivalent to 200 + retry_count = 80 + # Each attempt takes about 5 seconds, 80 retries is equivalent to 400 # seconds timeout. 
for i in range(retry_count): stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) @@ -1004,6 +1481,10 @@ def test_path_matching(gcp, original_backend_service, instance_group, if compare_expected_instances(stats, expected_instances): logger.info("success") break + elif i == retry_count - 1: + raise Exception( + 'timeout waiting for RPCs to the expected instances: %s' + % expected_instances) finally: patch_url_map_backend_service(gcp, original_backend_service) patch_backend_service(gcp, alternate_backend_service, []) @@ -1087,23 +1568,87 @@ def test_header_matching(gcp, original_backend_service, instance_group, ( [{ 'priority': 0, + # Header 'xds_md_numeric' present -> alternate_backend_service. + # UnaryCall is sent with the metadata, so will be sent to alternative. + 'matchRules': [{ + 'prefixMatch': + '/', + 'headerMatches': [{ + 'headerName': _TEST_METADATA_NUMERIC_KEY, + 'presentMatch': True + }] + }], + 'service': alternate_backend_service.url + }], + { + "EmptyCall": original_backend_instances, + "UnaryCall": alternate_backend_instances + }), + ( + [{ + 'priority': 0, # Header invert ExactMatch -> alternate_backend_service. - # EmptyCall is sent with the metadata, so will be sent to original. + # UnaryCall is sent with the metadata, so will be sent to + # original. EmptyCall will be sent to alternative. 'matchRules': [{ 'prefixMatch': '/', 'headerMatches': [{ 'headerName': _TEST_METADATA_KEY, - 'exactMatch': _TEST_METADATA_VALUE_EMPTY, + 'exactMatch': _TEST_METADATA_VALUE_UNARY, 'invertMatch': True }] }], 'service': alternate_backend_service.url }], { + "EmptyCall": alternate_backend_instances, + "UnaryCall": original_backend_instances + }), + ( + [{ + 'priority': 0, + # Header 'xds_md_numeric' range [100,200] -> alternate_backend_service. + # UnaryCall is sent with the metadata in range. + 'matchRules': [{ + 'prefixMatch': + '/', + 'headerMatches': [{ + 'headerName': _TEST_METADATA_NUMERIC_KEY, + 'rangeMatch': { + 'rangeStart': '100', + 'rangeEnd': '200' + } + }] + }], + 'service': alternate_backend_service.url + }], + { "EmptyCall": original_backend_instances, "UnaryCall": alternate_backend_instances }), + ( + [{ + 'priority': 0, + # Header RegexMatch -> alternate_backend_service. + # EmptyCall is sent with the metadata. + 'matchRules': [{ + 'prefixMatch': + '/', + 'headerMatches': [{ + 'headerName': + _TEST_METADATA_KEY, + 'regexMatch': + "^%s.*%s$" % (_TEST_METADATA_VALUE_EMPTY[:2], + _TEST_METADATA_VALUE_EMPTY[-2:]) + }] + }], + 'service': alternate_backend_service.url + }], + { + "EmptyCall": alternate_backend_instances, + "UnaryCall": original_backend_instances + }), ] for (route_rules, expected_instances) in test_cases: @@ -1121,8 +1666,8 @@ def test_header_matching(gcp, original_backend_service, instance_group, original_backend_instances + alternate_backend_instances, _WAIT_FOR_STATS_SEC) - retry_count = 20 - # Each attempt takes about 10 seconds, 10 retries is equivalent to 100 + retry_count = 80 + # Each attempt takes about 5 seconds, 80 retries is equivalent to 400 # seconds timeout. 
for i in range(retry_count): stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) @@ -1134,6 +1679,10 @@ def test_header_matching(gcp, original_backend_service, instance_group, if compare_expected_instances(stats, expected_instances): logger.info("success") break + elif i == retry_count - 1: + raise Exception( + 'timeout waiting for RPCs to the expected instances: %s' + % expected_instances) finally: patch_url_map_backend_service(gcp, original_backend_service) patch_backend_service(gcp, alternate_backend_service, []) @@ -1235,7 +1784,7 @@ def test_circuit_breaking(gcp, original_backend_service, instance_group, configure_client([ messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL - ], []) + ]) logger.info('Patching url map with %s', route_rules) patch_url_map_backend_service(gcp, extra_backend_service, @@ -1285,7 +1834,7 @@ def test_circuit_breaking(gcp, original_backend_service, instance_group, # Avoid new RPCs being outstanding (some test clients create threads # for sending RPCs) after restoring backend services. configure_client( - [messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL], []) + [messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL]) finally: patch_url_map_backend_service(gcp, original_backend_service) patch_backend_service(gcp, original_backend_service, [instance_group]) @@ -1294,6 +1843,426 @@ def test_circuit_breaking(gcp, original_backend_service, instance_group, set_validate_for_proxyless(gcp, True) +def test_timeout(gcp, original_backend_service, instance_group): + logger.info('Running test_timeout') + + logger.info('waiting for original backends to become healthy') + wait_for_healthy_backends(gcp, original_backend_service, instance_group) + + # UnaryCall -> maxStreamDuration:3s + route_rules = [{ + 'priority': 0, + 'matchRules': [{ + 'fullPathMatch': '/grpc.testing.TestService/UnaryCall' + }], + 'service': original_backend_service.url, + 'routeAction': { + 'maxStreamDuration': { + 'seconds': 3, + }, + }, + }] + patch_url_map_backend_service(gcp, + original_backend_service, + route_rules=route_rules) + # A list of tuples (testcase_name, {client_config}, {expected_results}) + test_cases = [ + ( + 'timeout_exceeded (UNARY_CALL), timeout_different_route (EMPTY_CALL)', + # UnaryCall and EmptyCall both sleep-4. + # UnaryCall timeouts, EmptyCall succeeds. + { + 'rpc_types': [ + messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, + messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL, + ], + 'metadata': [ + (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, + 'rpc-behavior', 'sleep-4'), + (messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL, + 'rpc-behavior', 'sleep-4'), + ], + }, + { + 'UNARY_CALL': 4, # DEADLINE_EXCEEDED + 'EMPTY_CALL': 0, + }, + ), + ( + 'app_timeout_exceeded', + # UnaryCall only with sleep-2; timeout=1s; calls timeout. + { + 'rpc_types': [ + messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, + ], + 'metadata': [ + (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, + 'rpc-behavior', 'sleep-2'), + ], + 'timeout_sec': 1, + }, + { + 'UNARY_CALL': 4, # DEADLINE_EXCEEDED + }, + ), + ( + 'timeout_not_exceeded', + # UnaryCall only with no sleep; calls succeed. 
+ { + 'rpc_types': [ + messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, + ], + }, + { + 'UNARY_CALL': 0, + }, + ) + ] + + try: + first_case = True + for (testcase_name, client_config, expected_results) in test_cases: + logger.info('starting case %s', testcase_name) + configure_client(**client_config) + # wait a second to help ensure the client stops sending RPCs with + # the old config. We will make multiple attempts if it is failing, + # but this improves confidence that the test is valid if the + # previous client_config would lead to the same results. + time.sleep(1) + # Each attempt takes 10 seconds; 20 attempts is equivalent to 200 + # second timeout. + attempt_count = 20 + if first_case: + attempt_count = 120 + first_case = False + before_stats = get_client_accumulated_stats() + if not before_stats.stats_per_method: + raise ValueError( + 'stats.stats_per_method is None, the interop client stats service does not support this test case' + ) + for i in range(attempt_count): + logger.info('%s: attempt %d', testcase_name, i) + + test_runtime_secs = 10 + time.sleep(test_runtime_secs) + after_stats = get_client_accumulated_stats() + + success = True + for rpc, status in expected_results.items(): + qty = (after_stats.stats_per_method[rpc].result[status] - + before_stats.stats_per_method[rpc].result[status]) + want = test_runtime_secs * args.qps + # Allow 10% deviation from expectation to reduce flakiness + if qty < (want * .9) or qty > (want * 1.1): + logger.info('%s: failed due to %s[%s]: got %d want ~%d', + testcase_name, rpc, status, qty, want) + success = False + if success: + logger.info('success') + break + logger.info('%s attempt %d failed', testcase_name, i) + before_stats = after_stats + else: + raise Exception( + '%s: timeout waiting for expected results: %s; got %s' % + (testcase_name, expected_results, + after_stats.stats_per_method)) + finally: + patch_url_map_backend_service(gcp, original_backend_service) + + +def test_fault_injection(gcp, original_backend_service, instance_group): + logger.info('Running test_fault_injection') + + logger.info('waiting for original backends to become healthy') + wait_for_healthy_backends(gcp, original_backend_service, instance_group) + + testcase_header = 'fi_testcase' + + def _route(pri, name, fi_policy): + return { + 'priority': pri, + 'matchRules': [{ + 'prefixMatch': + '/', + 'headerMatches': [{ + 'headerName': testcase_header, + 'exactMatch': name, + }], + }], + 'service': original_backend_service.url, + 'routeAction': { + 'faultInjectionPolicy': fi_policy + }, + } + + def _abort(pct): + return { + 'abort': { + 'httpStatus': 401, + 'percentage': pct, + } + } + + def _delay(pct): + return { + 'delay': { + 'fixedDelay': { + 'seconds': '20' + }, + 'percentage': pct, + } + } + + zero_route = _abort(0) + zero_route.update(_delay(0)) + route_rules = [ + _route(0, 'zero_percent_fault_injection', zero_route), + _route(1, 'always_delay', _delay(100)), + _route(2, 'always_abort', _abort(100)), + _route(3, 'delay_half', _delay(50)), + _route(4, 'abort_half', _abort(50)), + { + 'priority': 5, + 'matchRules': [{ + 'prefixMatch': '/' + }], + 'service': original_backend_service.url, + }, + ] + set_validate_for_proxyless(gcp, False) + patch_url_map_backend_service(gcp, + original_backend_service, + route_rules=route_rules) + # A list of tuples (testcase_name, {client_config}, {code: percent}). Each + # test case will set the testcase_header with the testcase_name for routing + # to the appropriate config for the case, defined above. 
+ test_cases = [ + ( + 'zero_percent_fault_injection', + {}, + { + 0: 1 + }, # OK + ), + ( + 'non_matching_fault_injection', # Not in route_rules, above. + {}, + { + 0: 1 + }, # OK + ), + ( + 'always_delay', + { + 'timeout_sec': 2 + }, + { + 4: 1 + }, # DEADLINE_EXCEEDED + ), + ( + 'always_abort', + {}, + { + 16: 1 + }, # UNAUTHENTICATED + ), + ( + 'delay_half', + { + 'timeout_sec': 2 + }, + { + 4: .5, + 0: .5 + }, # DEADLINE_EXCEEDED / OK: 50% / 50% + ), + ( + 'abort_half', + {}, + { + 16: .5, + 0: .5 + }, # UNAUTHENTICATED / OK: 50% / 50% + ) + ] + + try: + first_case = True + for (testcase_name, client_config, expected_results) in test_cases: + logger.info('starting case %s', testcase_name) + + client_config['metadata'] = [ + (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, + testcase_header, testcase_name) + ] + client_config['rpc_types'] = [ + messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, + ] + configure_client(**client_config) + # wait a second to help ensure the client stops sending RPCs with + # the old config. We will make multiple attempts if it is failing, + # but this improves confidence that the test is valid if the + # previous client_config would lead to the same results. + time.sleep(1) + # Each attempt takes 10 seconds; 20 attempts is equivalent to 200 + # second timeout. + attempt_count = 20 + if first_case: + attempt_count = 120 + first_case = False + before_stats = get_client_accumulated_stats() + if not before_stats.stats_per_method: + raise ValueError( + 'stats.stats_per_method is None, the interop client stats service does not support this test case' + ) + for i in range(attempt_count): + logger.info('%s: attempt %d', testcase_name, i) + + test_runtime_secs = 10 + time.sleep(test_runtime_secs) + after_stats = get_client_accumulated_stats() + + success = True + for status, pct in expected_results.items(): + rpc = 'UNARY_CALL' + qty = (after_stats.stats_per_method[rpc].result[status] - + before_stats.stats_per_method[rpc].result[status]) + want = pct * args.qps * test_runtime_secs + # Allow 10% deviation from expectation to reduce flakiness + VARIANCE_ALLOWED = 0.1 + if abs(qty - want) > want * VARIANCE_ALLOWED: + logger.info('%s: failed due to %s[%s]: got %d want ~%d', + testcase_name, rpc, status, qty, want) + success = False + if success: + logger.info('success') + break + logger.info('%s attempt %d failed', testcase_name, i) + before_stats = after_stats + else: + raise Exception( + '%s: timeout waiting for expected results: %s; got %s' % + (testcase_name, expected_results, + after_stats.stats_per_method)) + finally: + patch_url_map_backend_service(gcp, original_backend_service) + set_validate_for_proxyless(gcp, True) + + +def test_csds(gcp, original_backend_service, instance_group, server_uri): + test_csds_timeout_s = datetime.timedelta(minutes=5).total_seconds() + sleep_interval_between_attempts_s = datetime.timedelta( + seconds=2).total_seconds() + logger.info('Running test_csds') + + logger.info('waiting for original backends to become healthy') + wait_for_healthy_backends(gcp, original_backend_service, instance_group) + + # Test case timeout: 5 minutes + deadline = time.time() + test_csds_timeout_s + cnt = 0 + while time.time() <= deadline: + client_config = get_client_xds_config_dump() + logger.info('test_csds attempt %d: received xDS config %s', cnt, + json.dumps(client_config, indent=2)) + if client_config is not None: + # Got the xDS config dump, now validate it + ok = True + try: + if client_config['node']['locality']['zone'] != 
args.zone: + logger.info('Invalid zone %s != %s', + client_config['node']['locality']['zone'], + args.zone) + ok = False + seen = set() + for xds_config in client_config['xds_config']: + if 'listener_config' in xds_config: + listener_name = xds_config['listener_config'][ + 'dynamic_listeners'][0]['active_state']['listener'][ + 'name'] + if listener_name != server_uri: + logger.info('Invalid Listener name %s != %s', + listener_name, server_uri) + ok = False + else: + seen.add('lds') + elif 'route_config' in xds_config: + num_vh = len( + xds_config['route_config']['dynamic_route_configs'] + [0]['route_config']['virtual_hosts']) + if num_vh <= 0: + logger.info('Invalid number of VirtualHosts %s', + num_vh) + ok = False + else: + seen.add('rds') + elif 'cluster_config' in xds_config: + cluster_type = xds_config['cluster_config'][ + 'dynamic_active_clusters'][0]['cluster']['type'] + if cluster_type != 'EDS': + logger.info('Invalid cluster type %s != EDS', + cluster_type) + ok = False + else: + seen.add('cds') + elif 'endpoint_config' in xds_config: + sub_zone = xds_config["endpoint_config"][ + "dynamic_endpoint_configs"][0]["endpoint_config"][ + "endpoints"][0]["locality"]["sub_zone"] + if args.zone not in sub_zone: + logger.info('Invalid endpoint sub_zone %s', + sub_zone) + ok = False + else: + seen.add('eds') + want = {'lds', 'rds', 'cds', 'eds'} + if seen != want: + logger.info('Incomplete xDS config dump, seen=%s', seen) + ok = False + except: + logger.exception('Error in xDS config dump:') + ok = False + finally: + if ok: + # Successfully fetched the xDS config, and it looks good. + logger.info('success') + return + logger.info('test_csds attempt %d failed', cnt) + # Give the client some time to fetch xDS resources + time.sleep(sleep_interval_between_attempts_s) + cnt += 1 + + raise RuntimeError('failed to receive a valid xDS config in %s seconds' % + test_csds_timeout_s) + + def maybe_write_sponge_properties(): + """Write test info to enable more advanced Testgrid searches.""" + if 'KOKORO_ARTIFACTS_DIR' not in os.environ: + return + if 'GIT_ORIGIN_URL' not in os.environ: + return + if 'GIT_COMMIT_SHORT' not in os.environ: + return + properties = [ + # Technically, 'TESTS_FORMAT_VERSION' is not required for run_xds_tests. + # We keep it here so one day we may merge the process of writing sponge + # properties. 
+ 'TESTS_FORMAT_VERSION,2', + 'TESTGRID_EXCLUDE,%s' % os.environ.get('TESTGRID_EXCLUDE', 0), + 'GIT_ORIGIN_URL,%s' % os.environ['GIT_ORIGIN_URL'], + 'GIT_COMMIT_SHORT,%s' % os.environ['GIT_COMMIT_SHORT'], + ] + logger.info('Writing Sponge configs: %s', properties) + with open( + os.path.join(os.environ['KOKORO_ARTIFACTS_DIR'], + "custom_sponge_config.csv"), 'w') as f: + f.write("\n".join(properties)) + f.write("\n") + + def set_validate_for_proxyless(gcp, validate_for_proxyless): if not gcp.alpha_compute: logger.debug( @@ -1350,7 +2319,7 @@ def is_primary_instance_group(gcp, instance_group): def get_startup_script(path_to_server_binary, service_port): if path_to_server_binary: - return "nohup %s --port=%d 1>/dev/null &" % (path_to_server_binary, + return 'nohup %s --port=%d 1>/dev/null &' % (path_to_server_binary, service_port) else: return """#!/bin/bash @@ -1561,34 +2530,39 @@ def create_target_proxy(gcp, name, validate_for_proxyless=True): gcp.target_proxy = GcpResource(config['name'], result['targetLink']) -def create_global_forwarding_rule(gcp, name, potential_ports): +def create_global_forwarding_rule(gcp, + name, + potential_ports, + potential_ip_addresses=['0.0.0.0']): if gcp.alpha_compute: compute_to_use = gcp.alpha_compute else: compute_to_use = gcp.compute for port in potential_ports: - try: - config = { - 'name': name, - 'loadBalancingScheme': 'INTERNAL_SELF_MANAGED', - 'portRange': str(port), - 'IPAddress': '0.0.0.0', - 'network': args.network, - 'target': gcp.target_proxy.url, - } - logger.debug('Sending GCP request with body=%s', config) - result = compute_to_use.globalForwardingRules().insert( - project=gcp.project, - body=config).execute(num_retries=_GCP_API_RETRIES) - wait_for_global_operation(gcp, result['name']) - gcp.global_forwarding_rule = GcpResource(config['name'], - result['targetLink']) - gcp.service_port = port - return - except googleapiclient.errors.HttpError as http_error: - logger.warning( - 'Got error %s when attempting to create forwarding rule to ' - '0.0.0.0:%d. Retrying with another port.' % (http_error, port)) + for ip_address in potential_ip_addresses: + try: + config = { + 'name': name, + 'loadBalancingScheme': 'INTERNAL_SELF_MANAGED', + 'portRange': str(port), + 'IPAddress': ip_address, + 'network': args.network, + 'target': gcp.target_proxy.url, + } + logger.debug('Sending GCP request with body=%s', config) + result = compute_to_use.globalForwardingRules().insert( + project=gcp.project, + body=config).execute(num_retries=_GCP_API_RETRIES) + wait_for_global_operation(gcp, result['name']) + gcp.global_forwarding_rule = GcpResource( + config['name'], result['targetLink']) + gcp.service_port = port + return + except googleapiclient.errors.HttpError as http_error: + logger.warning( + 'Got error %s when attempting to create forwarding rule to ' + '%s:%d. Retrying with another port.' 
% + (http_error, ip_address, port)) def get_health_check(gcp, health_check_name): @@ -1652,39 +2626,49 @@ def get_instance_group(gcp, zone, instance_group_name): return instance_group -def delete_global_forwarding_rule(gcp): +def delete_global_forwarding_rule(gcp, name=None): + if name: + forwarding_rule_to_delete = name + else: + forwarding_rule_to_delete = gcp.global_forwarding_rule.name try: result = gcp.compute.globalForwardingRules().delete( project=gcp.project, - forwardingRule=gcp.global_forwarding_rule.name).execute( + forwardingRule=forwarding_rule_to_delete).execute( num_retries=_GCP_API_RETRIES) wait_for_global_operation(gcp, result['name']) except googleapiclient.errors.HttpError as http_error: logger.info('Delete failed: %s', http_error) -def delete_target_proxy(gcp): +def delete_target_proxy(gcp, name=None): + if name: + proxy_to_delete = name + else: + proxy_to_delete = gcp.target_proxy.name try: if gcp.alpha_compute: result = gcp.alpha_compute.targetGrpcProxies().delete( - project=gcp.project, - targetGrpcProxy=gcp.target_proxy.name).execute( + project=gcp.project, targetGrpcProxy=proxy_to_delete).execute( num_retries=_GCP_API_RETRIES) else: result = gcp.compute.targetHttpProxies().delete( - project=gcp.project, - targetHttpProxy=gcp.target_proxy.name).execute( + project=gcp.project, targetHttpProxy=proxy_to_delete).execute( num_retries=_GCP_API_RETRIES) wait_for_global_operation(gcp, result['name']) except googleapiclient.errors.HttpError as http_error: logger.info('Delete failed: %s', http_error) -def delete_url_map(gcp): +def delete_url_map(gcp, name=None): + if name: + url_map_to_delete = name + else: + url_map_to_delete = gcp.url_map.name try: result = gcp.compute.urlMaps().delete( project=gcp.project, - urlMap=gcp.url_map.name).execute(num_retries=_GCP_API_RETRIES) + urlMap=url_map_to_delete).execute(num_retries=_GCP_API_RETRIES) wait_for_global_operation(gcp, result['name']) except googleapiclient.errors.HttpError as http_error: logger.info('Delete failed: %s', http_error) @@ -1757,6 +2741,7 @@ def patch_backend_service(gcp, backend_service, instance_groups, balancing_mode='UTILIZATION', + max_rate=1, circuit_breakers=None): if gcp.alpha_compute: compute_to_use = gcp.alpha_compute @@ -1766,7 +2751,7 @@ def patch_backend_service(gcp, 'backends': [{ 'group': instance_group.url, 'balancingMode': balancing_mode, - 'maxRate': 1 if balancing_mode == 'RATE' else None + 'maxRate': max_rate if balancing_mode == 'RATE' else None } for instance_group in instance_groups], 'circuitBreakers': circuit_breakers, } @@ -1804,6 +2789,11 @@ def patch_url_map_backend_service(gcp, Only one of backend_service and service_with_weights can be not None. 
''' + if gcp.alpha_compute: + compute_to_use = gcp.alpha_compute + else: + compute_to_use = gcp.compute + if backend_service and services_with_weights: raise ValueError( 'both backend_service and service_with_weights are not None.') @@ -1825,7 +2815,7 @@ }] } logger.debug('Sending GCP request with body=%s', config) - result = gcp.compute.urlMaps().patch( + result = compute_to_use.urlMaps().patch( project=gcp.project, urlMap=gcp.url_map.name, body=config).execute(num_retries=_GCP_API_RETRIES) wait_for_global_operation(gcp, result['name']) @@ -1973,10 +2963,11 @@ class GcpResource(object): class GcpState(object): - def __init__(self, compute, alpha_compute, project): + def __init__(self, compute, alpha_compute, project, project_num): self.compute = compute self.alpha_compute = alpha_compute self.project = project + self.project_num = project_num self.health_check = None self.health_check_firewall_rule = None self.backend_services = [] @@ -1988,6 +2979,7 @@ class GcpState(object): self.instance_groups = [] +maybe_write_sponge_properties() alpha_compute = None if args.compute_discovery_document: with open(args.compute_discovery_document, 'r') as discovery_doc: @@ -2003,7 +2995,7 @@ else: alpha_compute = googleapiclient.discovery.build('compute', 'alpha') try: - gcp = GcpState(compute, alpha_compute, args.project_id) + gcp = GcpState(compute, alpha_compute, args.project_id, args.project_num) gcp_suffix = args.gcp_suffix health_check_name = _BASE_HEALTH_CHECK_NAME + gcp_suffix if not args.use_existing_gcp_resources: @@ -2095,7 +3087,12 @@ try: if args.test_case: client_env = dict(os.environ) + if original_grpc_trace: + client_env['GRPC_TRACE'] = original_grpc_trace + if original_grpc_verbosity: + client_env['GRPC_VERBOSITY'] = original_grpc_verbosity bootstrap_server_features = [] + if gcp.service_port == _DEFAULT_SERVICE_PORT: server_uri = service_host_name else: @@ -2109,15 +3106,38 @@ try: with tempfile.NamedTemporaryFile(delete=False) as bootstrap_file: bootstrap_file.write( _BOOTSTRAP_TEMPLATE.format( - node_id=socket.gethostname(), + node_id='projects/%s/networks/%s/nodes/%s' % + (gcp.project_num, args.network.split('/')[-1], + uuid.uuid1()), server_features=json.dumps( bootstrap_server_features)).encode('utf-8')) bootstrap_path = bootstrap_file.name client_env['GRPC_XDS_BOOTSTRAP'] = bootstrap_path client_env['GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING'] = 'true' + client_env['GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT'] = 'true' + client_env['GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION'] = 'true' test_results = {} failed_tests = [] for test_case in args.test_case: + if test_case in _V3_TEST_CASES and not args.xds_v3_support: + logger.info('skipping test %s due to missing v3 support', + test_case) + continue + if test_case in _ALPHA_TEST_CASES and not gcp.alpha_compute: + logger.info('skipping test %s due to missing alpha support', + test_case) + continue + if test_case in [ + 'api_listener', 'forwarding_rule_port_match', + 'forwarding_rule_default_port' + ] and CLIENT_HOSTS: + logger.info( + 'skipping test %s because test configuration is ' + 'not compatible with client processes on existing ' + 'client hosts', test_case) + continue + if test_case == 'forwarding_rule_default_port': + server_uri = service_host_name result = jobset.JobResult() log_dir = os.path.join(_TEST_LOG_BASE_DIR, test_case) if not os.path.exists(log_dir): @@ -2132,11 +3152,13 @@ try: rpcs_to_send = '--rpc="UnaryCall"' if test_case in _TESTS_TO_SEND_METADATA: - metadata_to_send = 
'--metadata="EmptyCall:{keyE}:{valueE},UnaryCall:{keyU}:{valueU}"'.format( + metadata_to_send = '--metadata="EmptyCall:{keyE}:{valueE},UnaryCall:{keyU}:{valueU},UnaryCall:{keyNU}:{valueNU}"'.format( keyE=_TEST_METADATA_KEY, valueE=_TEST_METADATA_VALUE_EMPTY, keyU=_TEST_METADATA_KEY, - valueU=_TEST_METADATA_VALUE_UNARY) + valueU=_TEST_METADATA_VALUE_UNARY, + keyNU=_TEST_METADATA_NUMERIC_KEY, + valueNU=_TEST_METADATA_NUMERIC_VALUE) else: # Setting the arg explicitly to empty with '--metadata=""' # makes C# client fail @@ -2185,6 +3207,10 @@ try: elif test_case == 'gentle_failover': test_gentle_failover(gcp, backend_service, instance_group, secondary_zone_instance_group) + elif test_case == 'load_report_based_failover': + test_load_report_based_failover( + gcp, backend_service, instance_group, + secondary_zone_instance_group) elif test_case == 'ping_pong': test_ping_pong(gcp, backend_service, instance_group) elif test_case == 'remove_instance_group': @@ -2216,6 +3242,26 @@ elif test_case == 'circuit_breaking': test_circuit_breaking(gcp, backend_service, instance_group, same_zone_instance_group) + elif test_case == 'timeout': + test_timeout(gcp, backend_service, instance_group) + elif test_case == 'fault_injection': + test_fault_injection(gcp, backend_service, instance_group) + elif test_case == 'api_listener': + server_uri = test_api_listener(gcp, backend_service, + instance_group, + alternate_backend_service) + elif test_case == 'forwarding_rule_port_match': + server_uri = test_forwarding_rule_port_match( + gcp, backend_service, instance_group) + elif test_case == 'forwarding_rule_default_port': + server_uri = test_forwarding_rule_default_port( + gcp, backend_service, instance_group) + elif test_case == 'metadata_filter': + test_metadata_filter(gcp, backend_service, instance_group, + alternate_backend_service, + same_zone_instance_group) + elif test_case == 'csds': + test_csds(gcp, backend_service, instance_group, server_uri) else: logger.error('Unknown test case: %s', test_case) sys.exit(1)
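
Several of the mechanisms in this change are easier to see in isolation; the sketches below are illustrative, not excerpts. The GRPC_TRACE / GRPC_VERBOSITY handling is a pop-and-restore pattern: the variables are removed from the script's own environment at import time so its in-process Python gRPC channels stay quiet, then re-injected only into the spawned test client's environment. A minimal sketch, with a hypothetical client binary path:

    import os
    import subprocess

    # Pop tracing variables so in-process gRPC helper channels stay quiet.
    saved_trace = os.environ.pop('GRPC_TRACE', None)
    saved_verbosity = os.environ.pop('GRPC_VERBOSITY', None)

    # ... quiet in-process gRPC work happens here ...

    # Restore the variables only for the spawned test client.
    client_env = dict(os.environ)
    if saved_trace:
        client_env['GRPC_TRACE'] = saved_trace
    if saved_verbosity:
        client_env['GRPC_VERBOSITY'] = saved_verbosity
    subprocess.check_call(['/path/to/xds_interop_client'], env=client_env)  # hypothetical path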
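
The widened --service_port_range default ('8080:8280', up from '8080:8110') is parsed by parse_port_range, whose body falls outside this diff's hunks; a compatible sketch, assuming only the single-int-or-min:max convention described in the flag's help text:

    def parse_port_range(port_arg):
        # Accept either a single port ('8080') or an inclusive range ('8080:8280').
        try:
            port = int(port_arg)
            return range(port, port + 1)
        except ValueError:
            port_min, port_max = port_arg.split(':')
            return range(int(port_min), int(port_max) + 1)

    assert list(parse_port_range('8080:8082')) == [8080, 8081, 8082]
    assert list(parse_port_range('8080')) == [8080]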
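
get_client_xds_config_dump is the client half of the new csds test: it reuses the client's stats port for a ClientStatusDiscoveryService call and flattens the response into a dict, which is also why the envoy.* imports at the top exist (json_format can only expand google.protobuf.Any fields whose descriptors have been loaded). A trimmed standalone sketch, assuming the client serves CSDS on stats port 8079:

    import grpc
    from google.protobuf import json_format
    from envoy.service.status.v3 import csds_pb2, csds_pb2_grpc  # pip install xds-protos

    def fetch_config_dump(host='localhost', stats_port=8079, timeout_sec=60):
        # The interop client serves CSDS alongside its stats services.
        with grpc.insecure_channel('%s:%d' % (host, stats_port)) as channel:
            stub = csds_pb2_grpc.ClientStatusDiscoveryServiceStub(channel)
            response = stub.FetchClientStatus(csds_pb2.ClientStatusRequest(),
                                              wait_for_ready=True,
                                              timeout=timeout_sec)
        if len(response.config) != 1:
            return None  # exactly one ClientConfig is expected per client
        # MessageToDict expands the packed Any fields into plain dicts, keeping
        # snake_case field names so validation code can index them directly.
        return json_format.MessageToDict(response.config[0],
                                         preserving_proto_field_name=True)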
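
test_load_report_based_failover drives failover purely through the RATE balancing mode: capping each locality below the client's QPS (qps/5) forces spillover to the secondary locality, while a cap above it (6·qps/5) keeps everything on the primary. A sketch of the backend entries such a patch sends, mirroring the patch_backend_service change further down (the instance-group URLs are placeholders):

    def rate_backends(instance_group_urls, max_rate):
        # Backend entries for a compute backendServices().patch() body when the
        # balancing mode is RATE; maxRate caps requests/sec per group.
        return [{
            'group': url,
            'balancingMode': 'RATE',
            'maxRate': max_rate,
        } for url in instance_group_urls]

    qps = 100  # assumed client QPS for illustration
    spill = rate_backends(['primary-ig', 'secondary-ig'], int(qps * 1 / 5))
    no_spill = rate_backends(['primary-ig', 'secondary-ig'], int(qps * 6 / 5))
    assert spill[0]['maxRate'] < qps < no_spill[0]['maxRate']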
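
The metadata_filter cases turn on two matching modes: MATCH_ALL fires only when every filterLabel is present in the client node's metadata (so a superset that includes one bogus label fails), while MATCH_ANY fires on any single hit. A local model of the predicate, with a hypothetical metadata dict:

    def metadata_filter_matches(node_metadata, criteria, filter_labels):
        # Mirrors the urlMap metadataFilters semantics exercised by the test.
        hits = [node_metadata.get(l['name']) == l['value'] for l in filter_labels]
        return all(hits) if criteria == 'MATCH_ALL' else any(hits)

    md = {'TRAFFICDIRECTOR_NETWORK_NAME': 'default'}
    labels = [{'name': 'fake', 'value': 'fail'},
              {'name': 'TRAFFICDIRECTOR_NETWORK_NAME', 'value': 'default'}]
    assert not metadata_filter_matches(md, 'MATCH_ALL', labels)  # superset fails
    assert metadata_filter_matches(md, 'MATCH_ANY', labels)      # one hit suffices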
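
In test_api_listener, verify_attempts looks QPS-dependent but is not: _NUM_TEST_RPCS is defined above as 10 * args.qps, so the qps terms cancel and the verification loop always runs 60 rounds:

    _WAIT_FOR_URL_MAP_PATCH_SEC = 600
    qps = 50                        # any value; it cancels out
    _NUM_TEST_RPCS = 10 * qps
    verify_attempts = int(_WAIT_FOR_URL_MAP_PATCH_SEC / _NUM_TEST_RPCS * qps)
    assert verify_attempts == 60    # 600 / 10, independent of qps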
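
The two new path_matching cases exercise regex and case-insensitive full-path rules. A local model of those matchRules semantics, handy for sanity-checking rules before pushing them to the urlMap (the test's '^\/.*\/UnaryCall$' pattern is written without the escaped slashes here; the two are equivalent):

    import re

    def path_rule_matches(rule, path):
        # Local model of the regexMatch / fullPathMatch rules in the test.
        if 'regexMatch' in rule:
            return re.fullmatch(rule['regexMatch'], path) is not None
        if rule.get('ignoreCase'):
            return rule['fullPathMatch'].lower() == path.lower()
        return rule['fullPathMatch'] == path

    assert path_rule_matches({'regexMatch': r'^/.*/UnaryCall$'},
                             '/grpc.testing.TestService/UnaryCall')
    assert path_rule_matches(
        {'fullPathMatch': '/gRpC.tEsTinG.tEstseRvice/empTycaLl', 'ignoreCase': True},
        '/grpc.testing.TestService/EmptyCall')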
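
The added header_matching cases cover presentMatch, invertMatch, rangeMatch, and regexMatch. A compact model of a single headerMatches entry; rangeEnd is treated as exclusive here, following Envoy's Int64Range (the test's value 159 sits inside [100, 200) either way):

    import re

    def header_rule_matches(m, headers):
        value = headers.get(m['headerName'])
        if 'presentMatch' in m:
            matched = (value is not None) == m['presentMatch']
        elif 'exactMatch' in m:
            matched = value == m['exactMatch']
        elif 'regexMatch' in m:
            matched = value is not None and re.fullmatch(m['regexMatch'], value)
        elif 'rangeMatch' in m:
            r = m['rangeMatch']
            matched = (value is not None and value.isdigit() and
                       int(r['rangeStart']) <= int(value) < int(r['rangeEnd']))
        # invertMatch flips the verdict, as in the exactMatch/invertMatch case.
        return bool(matched) != m.get('invertMatch', False)

    assert header_rule_matches(
        {'headerName': 'xds_md_numeric',
         'rangeMatch': {'rangeStart': '100', 'rangeEnd': '200'}},
        {'xds_md_numeric': '159'})
    assert not header_rule_matches(
        {'headerName': 'xds_md', 'exactMatch': 'unary_yranu', 'invertMatch': True},
        {'xds_md': 'unary_yranu'})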
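
test_timeout keys its expected_results by numeric gRPC status code (4 is DEADLINE_EXCEEDED, 0 is OK) and judges each 10-second window against qps × runtime with 10% slack. The acceptance check, spelled out:

    import grpc

    # The numeric keys in expected_results are gRPC status codes.
    assert grpc.StatusCode.OK.value[0] == 0
    assert grpc.StatusCode.DEADLINE_EXCEEDED.value[0] == 4

    def within_tolerance(got, want, tolerance=0.1):
        # The test allows 10% deviation from qps * runtime to reduce flakiness.
        return want * (1 - tolerance) <= got <= want * (1 + tolerance)

    qps, runtime_secs = 100, 10     # assumed values
    assert within_tolerance(950, qps * runtime_secs)
    assert not within_tolerance(800, qps * runtime_secs)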
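
In test_fault_injection, always_abort expects status 16 because the injected HTTP 401 maps to gRPC UNAUTHENTICATED. A sketch of the policy shape behind the _abort/_delay helpers (simplified: the real zero-percent route emits explicit 0% abort and delay entries):

    import grpc

    assert grpc.StatusCode.UNAUTHENTICATED.value[0] == 16  # HTTP 401 -> code 16

    def fault_policy(abort_pct=0, delay_pct=0, delay_seconds='20'):
        # routeAction.faultInjectionPolicy as assembled by the test's helpers.
        policy = {}
        if abort_pct:
            policy['abort'] = {'httpStatus': 401, 'percentage': abort_pct}
        if delay_pct:
            policy['delay'] = {'fixedDelay': {'seconds': delay_seconds},
                               'percentage': delay_pct}
        return policy

    assert fault_policy(abort_pct=50) == {
        'abort': {'httpStatus': 401, 'percentage': 50}}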
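
test_csds only passes once the dump shows all four resource types, so the per-entry checks reduce to spotting which config keys are present (snake_case names courtesy of preserving_proto_field_name):

    def seen_xds_resources(client_config):
        # Which of the four xDS resource types appear in a CSDS config dump.
        seen = set()
        for xds_config in client_config.get('xds_config', []):
            for key, tag in (('listener_config', 'lds'), ('route_config', 'rds'),
                             ('cluster_config', 'cds'), ('endpoint_config', 'eds')):
                if key in xds_config:
                    seen.add(tag)
        return seen

    dump = {'xds_config': [{'listener_config': {}}, {'route_config': {}},
                           {'cluster_config': {}}, {'endpoint_config': {}}]}
    assert seen_xds_resources(dump) == {'lds', 'rds', 'cds', 'eds'}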
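
create_global_forwarding_rule now retries over the cross product of candidate ports and candidate IP addresses (defaulting to just 0.0.0.0), treating each HttpError as an ip:port collision. The retry skeleton, with the GCP insert call abstracted into a callback:

    import itertools

    def create_with_retries(insert_rule, potential_ports,
                            potential_ip_addresses=('0.0.0.0',)):
        # insert_rule(ip, port) stands in for globalForwardingRules().insert();
        # any failure is read as "this ip:port is taken", so try the next pair.
        for port, ip in itertools.product(potential_ports, potential_ip_addresses):
            try:
                insert_rule(ip, port)
                return ip, port
            except Exception:
                continue
        raise RuntimeError('no usable ip:port combination found')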
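
The bootstrap node id moves from the bare hostname to the projects/{project_num}/networks/{network}/nodes/{uuid} form, which is what the new --project_num flag feeds. The construction, isolated:

    import uuid

    def bootstrap_node_id(project_num, network_path='global/networks/default'):
        # Mirrors the node_id built for _BOOTSTRAP_TEMPLATE: the network name is
        # the last path segment, and uuid1 keeps concurrent runs distinct.
        return 'projects/%s/networks/%s/nodes/%s' % (
            project_num, network_path.split('/')[-1], uuid.uuid1())

    print(bootstrap_node_id('830293263384'))
    # projects/830293263384/networks/default/nodes/<uuid>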
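
Finally, the --metadata flag now carries the numeric key as a third triple; the format is comma-separated RpcType:key:value entries:

    def metadata_flag(entries):
        # Interop client --metadata flag: comma-separated RpcType:key:value.
        return '--metadata="%s"' % ','.join('%s:%s:%s' % e for e in entries)

    assert metadata_flag([
        ('EmptyCall', 'xds_md', 'empty_ytpme'),
        ('UnaryCall', 'xds_md', 'unary_yranu'),
        ('UnaryCall', 'xds_md_numeric', '159'),
    ]) == ('--metadata="EmptyCall:xds_md:empty_ytpme,'
           'UnaryCall:xds_md:unary_yranu,UnaryCall:xds_md_numeric:159"')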