subsetting_test.py
Go to the documentation of this file.
1 # Copyright 2021 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 import collections
16 from typing import List
17 
18 from absl import flags
19 from absl import logging
20 from absl.testing import absltest
21 from google.protobuf import json_format
22 
23 from framework import xds_k8s_testcase
24 from framework import xds_url_map_testcase
25 from framework.helpers import skips
26 
27 flags.adopt_module_key_flags(xds_k8s_testcase)
28 
29 # Type aliases
30 _XdsTestServer = xds_k8s_testcase.XdsTestServer
31 _XdsTestClient = xds_k8s_testcase.XdsTestClient
32 
33 _SUBSET_SIZE = 4
34 _NUM_BACKENDS = 8
35 _NUM_CLIENTS = 3
36 
37 
38 class SubsettingTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
39 
40  @staticmethod
41  def is_supported(config: skips.TestConfig) -> bool:
42  # Subsetting is an experimental feature where most work is done on the
43  # server-side. We limit it to only run on master branch to save
44  # resources.
45  return config.version_gte('master')
46 
47  def test_subsetting_basic(self) -> None:
48  with self.subTest('00_create_health_check'):
49  self.td.create_health_check()
50 
51  with self.subTest('01_create_backend_services'):
52  self.td.create_backend_service(subset_size=_SUBSET_SIZE)
53 
54  with self.subTest('02_create_url_map'):
55  self.td.create_url_map(self.server_xds_host, self.server_xds_port)
56 
57  with self.subTest('03_create_target_proxy'):
58  self.td.create_target_proxy()
59 
60  with self.subTest('04_create_forwarding_rule'):
61  self.td.create_forwarding_rule(self.server_xds_port)
62 
63  test_servers: List[_XdsTestServer]
64  with self.subTest('05_start_test_servers'):
65  test_servers = self.startTestServers(replica_count=_NUM_BACKENDS)
66 
67  with self.subTest('06_add_server_backends_to_backend_services'):
68  self.setupServerBackends()
69 
70  rpc_distribution = collections.defaultdict(int)
71  with self.subTest('07_start_test_client'):
72  for i in range(_NUM_CLIENTS):
73  # Clean created client pods if there is any
74  self.client_runner.cleanup(force=True)
75  # Create a test client
76  test_client: _XdsTestClient = self.startTestClient(
77  test_servers[0])
78  # Validate the number of received endpoints
79  config = test_client.csds.fetch_client_status(
80  log_level=logging.INFO)
81  self.assertIsNotNone(config)
82  json_config = json_format.MessageToDict(config)
83  parsed = xds_url_map_testcase.DumpedXdsConfig(json_config)
84  logging.info('Client %d received endpoints (len=%s): %s', i,
85  len(parsed.endpoints), parsed.endpoints)
86  self.assertLen(parsed.endpoints, _SUBSET_SIZE)
87  # Record RPC stats
88  lb_stats = self.getClientRpcStats(test_client,
89  _NUM_BACKENDS * 25)
90  for key, value in lb_stats.rpcs_by_peer.items():
91  rpc_distribution[key] += value
92 
93  with self.subTest('08_log_rpc_distribution'):
94  server_entries = sorted(rpc_distribution.items(),
95  key=lambda x: -x[1])
96  # Validate if clients are receiving different sets of backends (3
97  # client received a total of 4 unique backends == FAIL, a total of 5
98  # unique backends == PASS)
99  self.assertGreater(len(server_entries), _SUBSET_SIZE)
100  logging.info('RPC distribution (len=%s): %s', len(server_entries),
101  server_entries)
102  peak = server_entries[0][1]
103  mean = sum(map(lambda x: x[1],
104  server_entries)) / len(server_entries)
105  logging.info('Peak=%d Mean=%.1f Peak-to-Mean-Ratio=%.2f', peak,
106  mean, peak / mean)
107 
108 
109 if __name__ == '__main__':
110  absltest.main(failfast=True)
tests.subsetting_test.SubsettingTest.is_supported
bool is_supported(skips.TestConfig config)
Definition: subsetting_test.py:41
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
grpc::testing::sum
double sum(const T &container, F functor)
Definition: test/cpp/qps/stats.h:30
google::protobuf
Definition: bloaty/third_party/protobuf/benchmarks/util/data_proto2_to_proto3_util.h:12
tests.subsetting_test.SubsettingTest
Definition: subsetting_test.py:38
run_xds_tests.create_target_proxy
def create_target_proxy(gcp, name, validate_for_proxyless=True, url_map=None)
Definition: run_xds_tests.py:2618
map
zval * map
Definition: php/ext/google/protobuf/encode_decode.c:480
run_xds_tests.create_url_map
def create_url_map(gcp, name, backend_service, host_name)
Definition: run_xds_tests.py:2582
framework.helpers
Definition: tools/run_tests/xds_k8s_test_driver/framework/helpers/__init__.py:1
tests.subsetting_test.SubsettingTest.test_subsetting_basic
None test_subsetting_basic(self)
Definition: subsetting_test.py:47
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
cleanup
Definition: cleanup.py:1
run_xds_tests.create_health_check
def create_health_check(gcp, name)
Definition: run_xds_tests.py:2515


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:27