14 """A test framework built for urlMap related xDS test cases."""
17 from dataclasses
import dataclass
24 from typing
import Any, Iterable, Mapping, Optional, Tuple
27 from absl
import flags
28 from absl
import logging
29 from absl.testing
import absltest
33 from framework
import xds_k8s_testcase
34 from framework
import xds_url_map_test_resources
40 flags.adopt_module_key_flags(xds_k8s_testcase)
41 flags.adopt_module_key_flags(xds_url_map_test_resources)
44 QPS = flags.DEFINE_integer(
'qps', default=25, help=
'The QPS client is sending')
47 _URL_MAP_PROPAGATE_TIMEOUT_SEC = 600
50 _URL_MAP_PROPAGATE_CHECK_INTERVAL_SEC = 15
51 URL_MAP_TESTCASE_FILE_SUFFIX =
'_test.py'
52 _CLIENT_CONFIGURE_WAIT_SEC = 2
55 XdsTestClient = client_app.XdsTestClient
57 HostRule = xds_url_map_test_resources.HostRule
58 PathMatcher = xds_url_map_test_resources.PathMatcher
62 RpcTypeUnaryCall =
'UNARY_CALL'
63 RpcTypeEmptyCall =
'EMPTY_CALL'
67 """Turn camel case name to snake-case-like name."""
68 return ''.join(delimiter + c.lower()
if c.isupper()
else c
69 for c
in s).lstrip(delimiter)
73 """A convenience class to check xDS config.
75 Feel free to add more pre-compute fields.
87 for xds_config
in self.get(
'xdsConfig', []):
89 if 'listenerConfig' in xds_config:
90 self.
lds = xds_config[
'listenerConfig'][
'dynamicListeners'][
91 0][
'activeState'][
'listener']
92 elif 'routeConfig' in xds_config:
93 self.
rds = xds_config[
'routeConfig'][
'dynamicRouteConfigs'][
96 'dynamicRouteConfigs'][0][
'versionInfo']
97 elif 'clusterConfig' in xds_config:
98 for cluster
in xds_config[
'clusterConfig'][
99 'dynamicActiveClusters']:
100 self.
cds.append(cluster[
'cluster'])
101 elif 'endpointConfig' in xds_config:
102 for endpoint
in xds_config[
'endpointConfig'][
103 'dynamicEndpointConfigs']:
104 self.
eds.append(endpoint[
'endpointConfig'])
106 except Exception
as e:
107 logging.debug(
'Parsing dumped xDS config failed with %s: %s',
109 for generic_xds_config
in self.get(
'genericXdsConfigs', []):
111 if re.search(
r'\.Listener$', generic_xds_config[
'typeUrl']):
112 self.
lds = generic_xds_config[
"xdsConfig"]
113 elif re.search(
r'\.RouteConfiguration$',
114 generic_xds_config[
'typeUrl']):
115 self.
rds = generic_xds_config[
"xdsConfig"]
116 self.
rds_version = generic_xds_config[
"versionInfo"]
117 elif re.search(
r'\.Cluster$', generic_xds_config[
'typeUrl']):
118 self.
cds.append(generic_xds_config[
"xdsConfig"])
119 elif re.search(
r'\.ClusterLoadAssignment$',
120 generic_xds_config[
'typeUrl']):
121 self.
eds.append(generic_xds_config[
"xdsConfig"])
123 except Exception
as e:
124 logging.debug(
'Parsing dumped xDS config failed with %s: %s',
126 for endpoint_config
in self.
eds:
127 for endpoint
in endpoint_config.get(
'endpoints', {}):
128 for lb_endpoint
in endpoint.get(
'lbEndpoints', {}):
130 if lb_endpoint[
'healthStatus'] ==
'HEALTHY':
132 '%s:%s' % (lb_endpoint[
'endpoint'][
'address']
133 [
'socketAddress'][
'address'],
134 lb_endpoint[
'endpoint'][
'address']
135 [
'socketAddress'][
'portValue']))
137 except Exception
as e:
138 logging.debug(
'Parse endpoint failed with %s: %s',
142 return json.dumps(self, indent=2)
146 """A convenience class to check RPC distribution.
148 Feel free to add more pre-compute fields.
152 default_service_rpc_count: int
153 alternative_service_rpc_count: int
154 unary_call_default_service_rpc_count: int
155 empty_call_default_service_rpc_count: int
156 unary_call_alternative_service_rpc_count: int
157 empty_call_alternative_service_rpc_count: int
172 if 'rpcsByPeer' in json_lb_stats:
174 if 'rpcsByMethod' in json_lb_stats:
175 for rpc_type
in json_lb_stats[
'rpcsByMethod']:
176 for peer
in json_lb_stats[
'rpcsByMethod'][rpc_type][
178 count = json_lb_stats[
'rpcsByMethod'][rpc_type][
181 if rpc_type ==
'UnaryCall':
182 if 'alternative' in peer:
189 if 'alternative' in peer:
199 """Describes the expected result of assertRpcStatusCode method below."""
200 rpc_type: str = RpcTypeUnaryCall
206 """Tracking test case subclasses."""
209 _test_case_classes = []
213 _started_test_cases =
set()
214 _finished_test_cases =
set()
216 def __new__(cls, name: str, bases: Iterable[Any],
217 attrs: Mapping[str, Any]) -> Any:
224 module_name = os.path.split(
225 sys.modules[attrs[
'__module__']].__file__)[-1]
226 if module_name.endswith(URL_MAP_TESTCASE_FILE_SUFFIX):
227 module_name = module_name.replace(URL_MAP_TESTCASE_FILE_SUFFIX,
'')
228 attrs[
'short_module_name'] = module_name.replace(
'_',
'-')
230 new_class = type.__new__(cls, name, bases, attrs)
231 if name.startswith(
'Test'):
235 logging.debug(
'Skipping test case class: %s', name)
240 """XdsUrlMapTestCase is the base class for urlMap related tests.
242 The subclass is expected to implement 3 methods:
244 - url_map_change: Updates the urlMap components for this test case
245 - xds_config_validate: Validates if the client received legit xDS configs
246 - rpc_distribution_validate: Validates if the routing behavior is correct
251 """Allow the test case to decide whether it supports the given config.
254 A bool indicating whether the given config is supported.
261 """Updates the initial RPC configs for this test case.
263 Each test case will start a test client. The client takes RPC configs
264 and starts to send RPCs immediately. The config returned by this
265 function will be used to replace the default configs.
267 The default configs are passed in as arguments, so this method can
271 rpc: The default rpc config, specifying RPCs to send, format
272 'UnaryCall,EmptyCall'
273 metadata: The metadata config, specifying metadata to send with each
274 RPC, format 'EmptyCall:key1:value1,UnaryCall:key2:value2'.
277 A tuple containing the updated rpc and metadata config.
285 path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
286 """Updates the dedicated urlMap components for this test case.
288 Each test case will have a dedicated HostRule, where the hostname is
289 generated from the test case name. The HostRule will be linked to a
290 PathMatcher, which stores the routing logic.
293 host_rule: A HostRule GCP resource as a JSON dict.
294 path_matcher: A PathMatcher GCP resource as a JSON dict.
297 A tuple containing the updated version of the given HostRule and
303 """Validates received xDS config, if anything is wrong, raise.
305 This stage only ends when the control plane failed to send a valid
306 config within a given time range, like 600s.
309 xds_config: A DumpedXdsConfig instance can be used as a JSON dict,
310 but also provides helper fields for commonly checked xDS config.
315 """Validates the routing behavior, if any is wrong, raise.
318 test_client: An XdsTestClient instance for all sorts of end2end testing.
323 return "%s.%s:%s" % (cls.short_module_name,
_split_camel(
329 return "%s-%s-pm" % (cls.short_module_name,
_split_camel(cls.__name__))
333 logging.info(
'----- Testing %s -----', cls.__name__)
334 logging.info(
'Logs timezone: %s', time.localtime().tm_zone)
340 if not cls.started_test_cases:
343 cls.started_test_cases.
add(cls.__name__)
353 server_target=f
'xds:///{cls.hostname()}',
372 config = self.
test_client.csds.fetch_client_status(
373 log_level=logging.INFO)
374 self.assertIsNotNone(config)
380 def run(self, result: unittest.TestResult =
None) ->
None:
381 """Abort this test case if CSDS check is failed.
383 This prevents the test runner from wasting time on the RPC
384 distribution test, and yields a clearer signal.
386 if result.failures
or result.errors:
387 logging.info(
'Aborting %s', self.__class__.__name__)
392 retryer = retryers.constant_retryer(
393 wait_fixed=datetime.timedelta(
394 seconds=_URL_MAP_PROPAGATE_CHECK_INTERVAL_SEC),
395 timeout=datetime.timedelta(seconds=_URL_MAP_PROPAGATE_TIMEOUT_SEC),
397 log_level=logging.INFO)
402 'latest xDS config:\n%s',
412 rpc_types: Iterable[str],
413 metadata: Optional[Iterable[Tuple[str, str,
415 app_timeout: Optional[int] =
None,
416 num_rpcs: int) -> RpcDistributionStats:
417 test_client.update_config.configure(rpc_types=rpc_types,
419 app_timeout=app_timeout)
421 time.sleep(_CLIENT_CONFIGURE_WAIT_SEC)
422 json_lb_stats = json_format.MessageToDict(
423 test_client.get_load_balancer_stats(num_rpcs=num_rpcs))
425 'Received LoadBalancerStatsResponse from test client %s:\n%s',
426 test_client.ip, json.dumps(json_lb_stats, indent=2))
431 xds_config.endpoints, k,
432 f
'insufficient endpoints in EDS: want={k} seen={xds_config.endpoints}'
436 self, test_client: XdsTestClient, *,
437 expected: Iterable[ExpectedResult], length: int,
438 tolerance: float) ->
None:
439 """Assert the distribution of RPC statuses over a period of time."""
441 before_stats = test_client.get_load_balancer_accumulated_stats()
443 'Received LoadBalancerAccumulatedStatsResponse from test client %s: before:\n%s',
444 test_client.ip, before_stats)
446 after_stats = test_client.get_load_balancer_accumulated_stats()
448 'Received LoadBalancerAccumulatedStatsResponse from test client %s: after: \n%s',
449 test_client.ip, after_stats)
452 for expected_result
in expected:
453 rpc = expected_result.rpc_type
454 status = expected_result.status_code.value[0]
459 seen_after = after_stats.stats_per_method[rpc].result[status]
460 seen_before = before_stats.stats_per_method[rpc].result[status]
461 seen = seen_after - seen_before
463 stats_per_method_after = after_stats.stats_per_method.get(
464 rpc, {}).result.items()
466 x[1]
for x
in stats_per_method_after)
467 stats_per_method_before = before_stats.stats_per_method.get(
468 rpc, {}).result.items()
470 x[1]
for x
in stats_per_method_before)
471 total = total_after - total_before
473 want = total * expected_result.ratio
474 diff_ratio = abs(seen - want) / total
475 self.assertLessEqual(
476 diff_ratio, tolerance,
477 (f
'Expect rpc [{rpc}] to return '
478 f
'[{expected_result.status_code}] at '
479 f
'{expected_result.ratio:.2f} ratio: '
480 f
'seen={seen} want={want} total={total} '
481 f
'diff_ratio={diff_ratio:.4f} > {tolerance:.2f}'))