16 from typing
import List
18 from absl
import flags
19 from absl.testing
import absltest
22 from framework
import xds_k8s_testcase
23 from framework
import xds_url_map_testcase
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Declare the key flags of xds_k8s_testcase as key flags of this module too,
# so they are treated as this test's own flags (e.g. in its short --help).
flags.adopt_module_key_flags(xds_k8s_testcase)

# Type aliases for the framework classes this test annotates and queries.
_XdsTestServer = xds_k8s_testcase.XdsTestServer
_XdsTestClient = xds_k8s_testcase.XdsTestClient
_ChannelzChannelState = grpc_channelz.ChannelState

# Testing constants.
# Metadata key used as the backend service's affinity header and sent by the
# test client with its RPCs.
_TEST_AFFINITY_METADATA_KEY = 'xds_md'
# Seconds to sleep between CSDS polls while waiting for the endpoint change
# to reach the client.
_TD_PROPAGATE_CHECK_INTERVAL_SEC = 10
# Overall budget, in seconds, for that wait (added to time.time() to form the
# polling deadline).
_TD_PROPAGATE_TIMEOUT = 600
48 if config.client_lang
in _Lang.CPP | _Lang.JAVA:
49 return config.version_gte(
'v1.40.x')
50 elif config.client_lang == _Lang.GO:
51 return config.version_gte(
'v1.41.x')
52 elif config.client_lang == _Lang.PYTHON:
60 with self.subTest(
'00_create_health_check'):
63 with self.subTest(
'01_create_backend_services'):
64 self.td.create_backend_service(
65 affinity_header=_TEST_AFFINITY_METADATA_KEY)
67 with self.subTest(
'02_create_url_map'):
70 with self.subTest(
'03_create_target_proxy'):
73 with self.subTest(
'04_create_forwarding_rule'):
74 self.td.create_forwarding_rule(self.server_xds_port)
76 test_servers: List[_XdsTestServer]
77 with self.subTest(
'05_start_test_servers'):
78 test_servers = self.startTestServers(replica_count=_REPLICA_COUNT)
80 with self.subTest(
'06_add_server_backends_to_backend_services'):
81 self.setupServerBackends()
83 test_client: _XdsTestClient
84 with self.subTest(
'07_start_test_client'):
85 test_client = self.startTestClient(test_servers[0],
87 metadata=
'EmptyCall:%s:123' %
88 _TEST_AFFINITY_METADATA_KEY)
90 config = test_client.csds.fetch_client_status(
91 log_level=logging.INFO)
92 self.assertIsNotNone(config)
93 json_config = json_format.MessageToDict(config)
94 parsed = xds_url_map_testcase.DumpedXdsConfig(json_config)
95 logging.info(
'Client received CSDS response: %s', parsed)
96 self.assertLen(parsed.endpoints, _REPLICA_COUNT)
98 parsed.rds[
'virtualHosts'][0][
'routes'][0][
'route']
99 [
'hashPolicy'][0][
'header'][
'headerName'],
100 _TEST_AFFINITY_METADATA_KEY)
101 self.assertEqual(parsed.cds[0][
'lbPolicy'],
'RING_HASH')
103 with self.subTest(
'08_test_client_xds_config_exists'):
104 self.assertXdsConfigExists(test_client)
106 with self.subTest(
'09_test_server_received_rpcs_from_test_client'):
107 self.assertSuccessfulRpcs(test_client)
109 with self.subTest(
'10_first_100_affinity_rpcs_pick_same_backend'):
110 rpc_stats = self.getClientRpcStats(test_client, _RPC_COUNT)
111 json_lb_stats = json_format.MessageToDict(rpc_stats)
112 rpc_distribution = xds_url_map_testcase.RpcDistributionStats(
114 self.assertEqual(1, rpc_distribution.num_peers)
116 test_client.find_subchannels_with_state(
117 _ChannelzChannelState.READY),
121 test_client.find_subchannels_with_state(
122 _ChannelzChannelState.IDLE),
126 first_backend_inuse = list(
127 rpc_distribution.raw[
'rpcsByPeer'].
keys())[0]
129 with self.subTest(
'11_turn_down_server_in_use'):
130 for s
in test_servers:
131 if s.pod_name == first_backend_inuse:
132 logging.info(
'setting backend %s to NOT_SERVING',
136 with self.subTest(
'12_wait_for_unhealth_status_propagation'):
137 deadline = time.time() + _TD_PROPAGATE_TIMEOUT
140 while time.time() < deadline:
141 config = test_client.csds.fetch_client_status(
142 log_level=logging.INFO)
143 self.assertIsNotNone(config)
144 json_config = json_format.MessageToDict(config)
145 parsed = xds_url_map_testcase.DumpedXdsConfig(json_config)
146 if len(parsed.endpoints) == _REPLICA_COUNT - 1:
149 'CSDS got unexpected endpoints, will retry after %d seconds',
150 _TD_PROPAGATE_CHECK_INTERVAL_SEC)
151 time.sleep(_TD_PROPAGATE_CHECK_INTERVAL_SEC)
154 'unhealthy status did not propagate after 600 seconds')
156 logging.info(
'Client received CSDS response: %s', parsed)
158 with self.subTest(
'12_next_100_affinity_rpcs_pick_different_backend'):
159 rpc_stats = self.getClientRpcStats(test_client, _RPC_COUNT)
160 json_lb_stats = json_format.MessageToDict(rpc_stats)
161 rpc_distribution = xds_url_map_testcase.RpcDistributionStats(
163 self.assertEqual(1, rpc_distribution.num_peers)
164 new_backend_inuse = list(
165 rpc_distribution.raw[
'rpcsByPeer'].
keys())[0]
166 self.assertNotEqual(new_backend_inuse, first_backend_inuse)
# Script entry point: hand control to absltest's runner. failfast=True is
# forwarded to the underlying unittest program, which stops the run at the
# first failing test.
if __name__ == '__main__':
    absltest.main(failfast=True)