csds_test.py
Go to the documentation of this file.
1 # Copyright 2021 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import logging
15 from typing import Tuple
16 
17 from absl import flags
18 from absl.testing import absltest
19 
20 from framework import xds_url_map_testcase
21 from framework.helpers import skips
22 from framework.test_app import client_app
23 
24 # Type aliases
25 HostRule = xds_url_map_testcase.HostRule
26 PathMatcher = xds_url_map_testcase.PathMatcher
27 GcpResourceManager = xds_url_map_testcase.GcpResourceManager
28 DumpedXdsConfig = xds_url_map_testcase.DumpedXdsConfig
29 RpcTypeUnaryCall = xds_url_map_testcase.RpcTypeUnaryCall
30 RpcTypeEmptyCall = xds_url_map_testcase.RpcTypeEmptyCall
31 XdsTestClient = client_app.XdsTestClient
32 _Lang = skips.Lang
33 
34 logger = logging.getLogger(__name__)
35 flags.adopt_module_key_flags(xds_url_map_testcase)
36 
37 _NUM_RPCS = 50
38 
39 
40 class TestBasicCsds(xds_url_map_testcase.XdsUrlMapTestCase):
41 
42  @staticmethod
43  def is_supported(config: skips.TestConfig) -> bool:
44  if config.client_lang == _Lang.NODE:
45  return config.version_gte('v1.5.x')
46  return True
47 
48  @staticmethod
50  host_rule: HostRule,
51  path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
52  return host_rule, path_matcher
53 
54  def xds_config_validate(self, xds_config: DumpedXdsConfig):
55  # Validate Endpoint Configs
56  self.assertNumEndpoints(xds_config, 1)
57  # Validate Node
58  self.assertEqual(self.test_client.ip,
59  xds_config['node']['metadata']['INSTANCE_IP'])
60  # Validate Listeners
61  self.assertIsNotNone(xds_config.lds)
62  self.assertEqual(self.hostname(), xds_config.lds['name'])
63  # Validate Route Configs
64  self.assertTrue(xds_config.rds['virtualHosts'])
65  # Validate Clusters
66  self.assertEqual(1, len(xds_config.cds))
67  self.assertEqual('EDS', xds_config.cds[0]['type'])
68 
69  def rpc_distribution_validate(self, test_client: XdsTestClient):
70  rpc_distribution = self.configure_and_send(
71  test_client,
72  rpc_types=[RpcTypeUnaryCall, RpcTypeEmptyCall],
73  num_rpcs=_NUM_RPCS)
74  self.assertEqual(_NUM_RPCS, rpc_distribution.num_oks)
75 
76 
77 if __name__ == '__main__':
78  absltest.main()
tests.url_map.csds_test.TestBasicCsds
Definition: csds_test.py:40
tests.url_map.csds_test.TestBasicCsds.rpc_distribution_validate
def rpc_distribution_validate(self, XdsTestClient test_client)
Definition: csds_test.py:69
framework.helpers
Definition: tools/run_tests/xds_k8s_test_driver/framework/helpers/__init__.py:1
tests.url_map.csds_test.TestBasicCsds.xds_config_validate
def xds_config_validate(self, DumpedXdsConfig xds_config)
Definition: csds_test.py:54
framework.test_app
Definition: tools/run_tests/xds_k8s_test_driver/framework/test_app/__init__.py:1
tests.url_map.csds_test.TestBasicCsds.is_supported
bool is_supported(skips.TestConfig config)
Definition: csds_test.py:43
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
tests.url_map.csds_test.TestBasicCsds.url_map_change
Tuple[HostRule, PathMatcher] url_map_change(HostRule host_rule, PathMatcher path_matcher)
Definition: csds_test.py:49


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:59:01