affinity_test.py
Go to the documentation of this file.
# Copyright 2021 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14 import logging
15 import time
16 from typing import List
17 
18 from absl import flags
19 from absl.testing import absltest
20 from google.protobuf import json_format
21 
22 from framework import xds_k8s_testcase
23 from framework import xds_url_map_testcase
24 from framework.helpers import skips
25 from framework.rpc import grpc_channelz
26 
# Module-level logger; named after the module so test output can be filtered.
logger = logging.getLogger(__name__)
# Re-export the key flags declared by xds_k8s_testcase as this module's own.
flags.adopt_module_key_flags(xds_k8s_testcase)

# Type aliases
_XdsTestServer = xds_k8s_testcase.XdsTestServer
_XdsTestClient = xds_k8s_testcase.XdsTestClient
_ChannelzChannelState = grpc_channelz.ChannelState
_Lang = skips.Lang

# Testing consts
# Metadata (header) key the backend service hashes on for affinity.
_TEST_AFFINITY_METADATA_KEY = 'xds_md'
# Seconds to sleep between CSDS polls while waiting for a config change.
_TD_PROPAGATE_CHECK_INTERVAL_SEC = 10
# Upper bound, in seconds, on the wait for a config change to reach the client.
_TD_PROPAGATE_TIMEOUT = 600
# Number of test server replicas started by the test.
_REPLICA_COUNT = 3
# Number of RPCs sampled for each distribution check.
_RPC_COUNT = 100
42 
43 
class AffinityTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
    """Checks header-based affinity: RPCs carrying the same metadata value
    stick to a single backend, and move to a different single backend once
    the first one is set to NOT_SERVING.
    """

    @staticmethod
    def is_supported(config: skips.TestConfig) -> bool:
        """Return whether this test applies to the client under test.

        Args:
            config: Test configuration exposing `client_lang` and
                `version_gte()`.

        Returns:
            True when the client language/version combination is supported.
        """
        if config.client_lang in _Lang.CPP | _Lang.JAVA:
            return config.version_gte('v1.40.x')
        if config.client_lang == _Lang.GO:
            return config.version_gte('v1.41.x')
        if config.client_lang == _Lang.PYTHON:
            # TODO(https://github.com/grpc/grpc/issues/27430): supported after
            #      the issue is fixed.
            return False
        return True

    def _fetch_parsed_xds_config(
            self, test_client: _XdsTestClient
    ) -> xds_url_map_testcase.DumpedXdsConfig:
        """Fetch the client's CSDS status and return it parsed."""
        config = test_client.csds.fetch_client_status(log_level=logging.INFO)
        self.assertIsNotNone(config)
        return xds_url_map_testcase.DumpedXdsConfig(
            json_format.MessageToDict(config))

    def _get_rpc_distribution(
            self, test_client: _XdsTestClient
    ) -> xds_url_map_testcase.RpcDistributionStats:
        """Collect stats for the next `_RPC_COUNT` RPCs, grouped by peer."""
        rpc_stats = self.getClientRpcStats(test_client, _RPC_COUNT)
        return xds_url_map_testcase.RpcDistributionStats(
            json_format.MessageToDict(rpc_stats))

    def test_affinity(self) -> None:  # pylint: disable=too-many-statements
        """Run the affinity scenario as a sequence of ordered subtests."""

        with self.subTest('00_create_health_check'):
            self.td.create_health_check()

        with self.subTest('01_create_backend_services'):
            self.td.create_backend_service(
                affinity_header=_TEST_AFFINITY_METADATA_KEY)

        with self.subTest('02_create_url_map'):
            self.td.create_url_map(self.server_xds_host, self.server_xds_port)

        with self.subTest('03_create_target_proxy'):
            self.td.create_target_proxy()

        with self.subTest('04_create_forwarding_rule'):
            self.td.create_forwarding_rule(self.server_xds_port)

        test_servers: List[_XdsTestServer]
        with self.subTest('05_start_test_servers'):
            test_servers = self.startTestServers(replica_count=_REPLICA_COUNT)

        with self.subTest('06_add_server_backends_to_backend_services'):
            self.setupServerBackends()

        test_client: _XdsTestClient
        with self.subTest('07_start_test_client'):
            test_client = self.startTestClient(
                test_servers[0],
                rpc='EmptyCall',
                metadata=f'EmptyCall:{_TEST_AFFINITY_METADATA_KEY}:123')
            # Validate the number of received endpoints and affinity configs.
            parsed = self._fetch_parsed_xds_config(test_client)
            logger.info('Client received CSDS response: %s', parsed)
            self.assertLen(parsed.endpoints, _REPLICA_COUNT)
            self.assertEqual(
                parsed.rds['virtualHosts'][0]['routes'][0]['route']
                ['hashPolicy'][0]['header']['headerName'],
                _TEST_AFFINITY_METADATA_KEY)
            self.assertEqual(parsed.cds[0]['lbPolicy'], 'RING_HASH')

        with self.subTest('08_test_client_xds_config_exists'):
            self.assertXdsConfigExists(test_client)

        with self.subTest('09_test_server_received_rpcs_from_test_client'):
            self.assertSuccessfulRpcs(test_client)

        with self.subTest('10_first_100_affinity_rpcs_pick_same_backend'):
            rpc_distribution = self._get_rpc_distribution(test_client)
            self.assertEqual(1, rpc_distribution.num_peers)
            # Exactly one subchannel is expected to be connected; the
            # remaining replicas are expected to stay idle.
            self.assertLen(
                test_client.find_subchannels_with_state(
                    _ChannelzChannelState.READY),
                1,
            )
            self.assertLen(
                test_client.find_subchannels_with_state(
                    _ChannelzChannelState.IDLE),
                _REPLICA_COUNT - 1,
            )
            # Remember the backend inuse, and turn it down later.
            first_backend_inuse = next(iter(rpc_distribution.raw['rpcsByPeer']))

        with self.subTest('11_turn_down_server_in_use'):
            servers_in_use = [
                server for server in test_servers
                if server.pod_name == first_backend_inuse
            ]
            # Fail right away instead of waiting out the propagation timeout
            # when the reported peer matches none of the started servers.
            self.assertTrue(
                servers_in_use,
                msg=f'no test server matches backend {first_backend_inuse!r}')
            for server in servers_in_use:
                logger.info('setting backend %s to NOT_SERVING',
                            server.pod_name)
                server.set_not_serving()

        with self.subTest('12_wait_for_unhealth_status_propagation'):
            # Monotonic clock: the deadline must not move with wall-clock
            # adjustments.
            deadline = time.monotonic() + _TD_PROPAGATE_TIMEOUT
            parsed = None
            try:
                while time.monotonic() < deadline:
                    parsed = self._fetch_parsed_xds_config(test_client)
                    if len(parsed.endpoints) == _REPLICA_COUNT - 1:
                        break
                    logger.info(
                        'CSDS got unexpected endpoints, will retry after %d seconds',
                        _TD_PROPAGATE_CHECK_INTERVAL_SEC)
                    time.sleep(_TD_PROPAGATE_CHECK_INTERVAL_SEC)
                else:
                    # Loop ran out of time without seeing the expected count.
                    self.fail('unhealthy status did not propagate after '
                              f'{_TD_PROPAGATE_TIMEOUT} seconds')
            finally:
                # Always log the last parsed config to aid debugging.
                logger.info('Client received CSDS response: %s', parsed)

        with self.subTest('13_next_100_affinity_rpcs_pick_different_backend'):
            rpc_distribution = self._get_rpc_distribution(test_client)
            self.assertEqual(1, rpc_distribution.num_peers)
            new_backend_inuse = next(iter(rpc_distribution.raw['rpcsByPeer']))
            self.assertNotEqual(new_backend_inuse, first_backend_inuse)
167 
168 
if __name__ == '__main__':
    # failfast: stop at the first failure, since each subtest depends on
    # the state set up by the ones before it.
    absltest.main(failfast=True)
tests.affinity_test.AffinityTest
Definition: affinity_test.py:44
tests.affinity_test.AffinityTest.test_affinity
None test_affinity(self)
Definition: affinity_test.py:58
keys
const void * keys
Definition: abseil-cpp/absl/random/internal/randen.cc:49
google::protobuf
Definition: bloaty/third_party/protobuf/benchmarks/util/data_proto2_to_proto3_util.h:12
run_xds_tests.create_target_proxy
def create_target_proxy(gcp, name, validate_for_proxyless=True, url_map=None)
Definition: run_xds_tests.py:2618
run_xds_tests.create_url_map
def create_url_map(gcp, name, backend_service, host_name)
Definition: run_xds_tests.py:2582
framework.helpers
Definition: tools/run_tests/xds_k8s_test_driver/framework/helpers/__init__.py:1
framework.rpc
Definition: tools/run_tests/xds_k8s_test_driver/framework/rpc/__init__.py:1
tests.affinity_test.AffinityTest.is_supported
bool is_supported(skips.TestConfig config)
Definition: affinity_test.py:47
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
run_xds_tests.create_health_check
def create_health_check(gcp, name)
Definition: run_xds_tests.py:2515


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:29