xds_url_map_test_resources.py
Go to the documentation of this file.
1 # Copyright 2021 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """A test framework built for urlMap related xDS test cases."""
15 
import functools
import inspect
from typing import Any, Iterable, Mapping, Optional, Tuple

from absl import flags
from absl import logging

from framework import xds_flags
from framework import xds_k8s_flags
import framework.helpers.rand
from framework.infrastructure import gcp
from framework.infrastructure import k8s
from framework.infrastructure import traffic_director
from framework.test_app import client_app
from framework.test_app import server_app
31 
# Declare the key flags of both shared flag modules as key flags of this
# module too.
flags.adopt_module_key_flags(xds_flags)
flags.adopt_module_key_flags(xds_k8s_flags)

# Selects how GcpResourceManager.setup() and GcpResourceManager.cleanup()
# treat the shared GCP resources.
STRATEGY = flags.DEFINE_enum(
    'strategy',
    enum_values=['create', 'keep', 'reuse'],
    default='reuse',
    help='Strategy of GCP resources management',
)

# Type aliases: placeholders that only name the role of a value.
UrlMapType = Any
HostRule = Any
PathMatcher = Any
44 
45 
class _UrlMapChangeAggregator:
    """Where all the urlMap change happens.

    Starts from an empty urlMap dictionary and appends one host rule and one
    path matcher for every test case class passed to apply_change().
    """

    def __init__(self, url_map_name: str):
        """Creates the urlMap skeleton.

        Args:
          url_map_name: Value stored under the "name" key of the urlMap.
        """
        self._map = {
            "name": url_map_name,
            "defaultService": GcpResourceManager().default_backend_service(),
            "hostRules": [],
            "pathMatchers": [],
        }

    def get_map(self) -> UrlMapType:
        """Returns the aggregated urlMap (the internal dict, not a copy)."""
        return self._map

    def apply_change(self, test_case: 'XdsUrlMapTestCase') -> None:
        """Merges the urlMap pieces of a single test case class.

        A default host rule and path matcher are built for the test case,
        handed to its url_map_change() hook, and whatever the hook returns is
        appended to the aggregated urlMap.

        Args:
          test_case: The test case class contributing to the urlMap.
        """
        logging.info('Apply urlMap change for test case: %s.%s',
                     test_case.short_module_name, test_case.__name__)
        url_map_parts = test_case.url_map_change(
            *self._get_test_case_url_map(test_case))
        self._set_test_case_url_map(*url_map_parts)

    @staticmethod
    def _get_test_case_url_map(
            test_case: 'XdsUrlMapTestCase') -> Tuple[HostRule, PathMatcher]:
        """Builds the default host rule and path matcher for a test case.

        Args:
          test_case: The test case class providing hostname() and
            path_matcher_name().

        Returns:
          A (host_rule, path_matcher) tuple; the path matcher routes to the
          default backend service.
        """
        host_rule = {
            "hosts": [test_case.hostname()],
            "pathMatcher": test_case.path_matcher_name(),
        }
        path_matcher = {
            "name": test_case.path_matcher_name(),
            "defaultService": GcpResourceManager().default_backend_service(),
        }
        return host_rule, path_matcher

    def _set_test_case_url_map(self, host_rule: HostRule,
                               path_matcher: PathMatcher) -> None:
        """Appends one host rule and one path matcher to the urlMap."""
        self._map["hostRules"].append(host_rule)
        self._map["pathMatchers"].append(path_matcher)
84 
85 
def _package_flags() -> Mapping[str, Any]:
    """Collects the current Abseil flag values into a dictionary.

    Flag values can only be read once the Abseil app has been initialized.
    Reading them from a metaclass __new__ would happen while modules are still
    being imported and would fail, so the metaclass calls this function from
    __call__ instead, deferring the read until a class is about to be
    instantiated.

    Returns:
      A dict mapping the lower-cased name of every FlagHolder found in
      xds_flags and xds_k8s_flags to its value, plus a 'strategy' entry.
    """
    packaged = {}
    for module in (xds_flags, xds_k8s_flags):
        packaged.update(
            (name.lower(), holder.value)
            for name, holder in inspect.getmembers(module)
            if isinstance(holder, flags.FlagHolder))
    packaged['strategy'] = STRATEGY.value
    return packaged
102 
103 
class _MetaSingletonAndAbslFlags(type):
    """Ensures singleton and injects flag values.

    Calling a class that uses this metaclass returns the one instance of that
    class, creating it on first use with the packaged Abseil flag values
    passed as the first positional constructor argument.
    """

    # Allow different subclasses to create different singletons.
    _instances = {}
    # But we only parse Abseil flags once.
    _flags = None

    def __call__(cls, *args, **kwargs):
        """Returns the singleton of `cls`, constructing it on the first call.

        Arguments are only used by the call that actually constructs the
        instance; later calls return the existing instance and ignore them.
        """
        meta = _MetaSingletonAndAbslFlags
        if cls in meta._instances:
            return meta._instances[cls]
        # Keep the flag cache on the metaclass itself. Assigning through
        # `cls` would bind it on each managed class separately, so unrelated
        # classes would each parse the flags again.
        if meta._flags is None:
            meta._flags = _package_flags()
        obj = super().__call__(meta._flags, *args, **kwargs)
        meta._instances[cls] = obj
        return obj
120 
121 
class GcpResourceManager(metaclass=_MetaSingletonAndAbslFlags):
    """Manages the lifecycle of GCP resources.

    The GCP resources including:
      - K8s deployments (client, default backends, alternative backends,
        affinity backends)
      - Full set of the Traffic Director stuff
      - Merged gigantic urlMap from all imported test cases

    All resources are intended to be used across test cases and multiple runs
    (except the client K8s deployment).
    """

    # Attributes of this class are set dynamically from the flag values, so
    # disable the "no-member" check.
    # pylint: disable=no-member

    def __init__(self, absl_flags: Optional[Mapping[str, Any]] = None):
        """Stores the flag values and builds the API managers and runners.

        Args:
          absl_flags: Mapping of flag name to flag value; every entry becomes
            an instance attribute of the same name. The metaclass passes it
            as the first positional argument.

        Raises:
          NotImplementedError: If a resource_suffix value was provided.
        """
        if absl_flags is not None:
            for key, value in absl_flags.items():
                setattr(self, key, value)
        # A predefined resource_suffix is rejected; the shared resources use
        # an empty suffix, and create_test_client_runner() picks the suffix
        # of each client namespace.
        if getattr(self, 'resource_suffix', None) is not None:
            raise NotImplementedError(
                'Predefined resource_suffix is not supported for UrlMap tests')
        self.resource_suffix = ""
        logging.info('GcpResourceManager: resource prefix=%s, suffix=%s',
                     self.resource_prefix, self.resource_suffix)
        # API managers
        self.k8s_api_manager = k8s.KubernetesApiManager(self.kube_context)
        self.gcp_api_manager = gcp.api.GcpApiManager()
        self.td = traffic_director.TrafficDirectorManager(
            self.gcp_api_manager,
            self.project,
            resource_prefix=self.resource_prefix,
            resource_suffix=(self.resource_suffix or ""),
            network=self.network,
            compute_api_version=self.compute_api_version,
        )
        # Kubernetes namespace
        self.k8s_namespace = k8s.KubernetesNamespace(self.k8s_api_manager,
                                                     self.resource_prefix)
        # Kubernetes Test Servers. The alternative and affinity servers share
        # the namespace of the default one.
        self.test_server_runner = self._create_server_runner()
        self.test_server_alternative_runner = self._create_server_runner(
            '-alternative', reuse_namespace=True)
        self.test_server_affinity_runner = self._create_server_runner(
            '-affinity', reuse_namespace=True)
        logging.info('Strategy of GCP resources management: %s', self.strategy)

    def _create_server_runner(self, name_suffix: str = '', **kwargs):
        """Builds a server runner in the shared Kubernetes namespace.

        Args:
          name_suffix: Appended to server_name to form the deployment name.
          **kwargs: Extra keyword arguments forwarded unchanged to
            KubernetesServerRunner.
        """
        return server_app.KubernetesServerRunner(
            self.k8s_namespace,
            deployment_name=self.server_name + name_suffix,
            image_name=self.server_image,
            gcp_project=self.project,
            gcp_api_manager=self.gcp_api_manager,
            gcp_service_account=self.gcp_service_account,
            td_bootstrap_image=self.td_bootstrap_image,
            xds_server_uri=self.xds_server_uri,
            network=self.network,
            enable_workload_identity=self.enable_workload_identity,
            **kwargs)

    def create_test_client_runner(self):
        """Creates a client runner that lives in its own namespace.

        The namespace name is built from resource_prefix and resource_suffix;
        when the suffix is empty a random one is generated instead.
        """
        client_namespace_suffix = (
            self.resource_suffix or
            framework.helpers.rand.random_resource_suffix())
        logging.info('GcpResourceManager: client_namespace_suffix=%s',
                     client_namespace_suffix)
        # Kubernetes Test Client
        return client_app.KubernetesClientRunner(
            k8s.KubernetesNamespace(
                self.k8s_api_manager,
                client_app.KubernetesClientRunner.make_namespace_name(
                    self.resource_prefix, client_namespace_suffix)),
            deployment_name=self.client_name,
            image_name=self.client_image,
            gcp_project=self.project,
            gcp_api_manager=self.gcp_api_manager,
            gcp_service_account=self.gcp_service_account,
            td_bootstrap_image=self.td_bootstrap_image,
            xds_server_uri=self.xds_server_uri,
            network=self.network,
            debug_use_port_forwarding=self.debug_use_port_forwarding,
            enable_workload_identity=self.enable_workload_identity,
            stats_port=self.client_port)

    def _pre_cleanup(self):
        """Removes Traffic Director resources and the server namespace."""
        # Cleanup existing debris
        logging.info('GcpResourceManager: pre clean-up')
        self.td.cleanup(force=True)
        self.test_server_runner.delete_namespace()

    def setup(self, test_case_classes: Iterable['XdsUrlMapTestCase']) -> None:
        """Creates all shared resources for the given test case classes.

        Does nothing except logging unless the strategy is 'create' or
        'keep'.

        Args:
          test_case_classes: Test case classes whose urlMap changes are merged
            into the single urlMap that gets created.
        """
        if self.strategy not in ('create', 'keep'):
            logging.info('GcpResourceManager: skipping setup for strategy [%s]',
                         self.strategy)
            return
        # Clean up debris from previous runs
        self._pre_cleanup()
        # Start creating GCP resources
        logging.info('GcpResourceManager: start setup')
        # Firewall
        if self.ensure_firewall:
            self.td.create_firewall_rule(
                allowed_ports=self.firewall_allowed_ports)
        # Health Checks
        self.td.create_health_check()
        # Backend Services
        self.td.create_backend_service()
        self.td.create_alternative_backend_service()
        self.td.create_affinity_backend_service()
        # Construct UrlMap from test classes
        aggregator = _UrlMapChangeAggregator(
            url_map_name=self.td.make_resource_name(self.td.URL_MAP_NAME))
        for test_case_class in test_case_classes:
            aggregator.apply_change(test_case_class)
        final_url_map = aggregator.get_map()
        # UrlMap
        self.td.create_url_map_with_content(final_url_map)
        # Target Proxy
        self.td.create_target_proxy()
        # Forwarding Rule
        self.td.create_forwarding_rule(self.server_xds_port)
        # Kubernetes Test Server
        self.test_server_runner.run(
            test_port=self.server_port,
            maintenance_port=self.server_maintenance_port)
        # Kubernetes Test Server Alternative
        self.test_server_alternative_runner.run(
            test_port=self.server_port,
            maintenance_port=self.server_maintenance_port)
        # Kubernetes Test Server Affinity. 3 endpoints to test that only the
        # picked sub-channel is connected.
        self.test_server_affinity_runner.run(
            test_port=self.server_port,
            maintenance_port=self.server_maintenance_port,
            replica_count=3)
        # Add backend to default backend service
        neg_name, neg_zones = self.k8s_namespace.get_service_neg(
            self.test_server_runner.service_name, self.server_port)
        self.td.backend_service_add_neg_backends(neg_name, neg_zones)
        # Add backend to alternative backend service
        neg_name_alt, neg_zones_alt = self.k8s_namespace.get_service_neg(
            self.test_server_alternative_runner.service_name, self.server_port)
        self.td.alternative_backend_service_add_neg_backends(
            neg_name_alt, neg_zones_alt)
        # Add backend to affinity backend service
        neg_name_affinity, neg_zones_affinity = (
            self.k8s_namespace.get_service_neg(
                self.test_server_affinity_runner.service_name,
                self.server_port))
        self.td.affinity_backend_service_add_neg_backends(
            neg_name_affinity, neg_zones_affinity)
        # Wait for healthy backends
        self.td.wait_for_backends_healthy_status()
        self.td.wait_for_alternative_backends_healthy_status()
        self.td.wait_for_affinity_backends_healthy_status()

    def cleanup(self) -> None:
        """Tears the shared resources down; only for the 'create' strategy.

        Each resource holder is cleaned up only if the attribute exists on
        this instance.
        """
        if self.strategy != 'create':
            logging.info(
                'GcpResourceManager: skipping tear down for strategy [%s]',
                self.strategy)
            return
        logging.info('GcpResourceManager: start tear down')
        if hasattr(self, 'td'):
            self.td.cleanup(force=True)
        if hasattr(self, 'test_server_runner'):
            self.test_server_runner.cleanup(force=True)
        if hasattr(self, 'test_server_alternative_runner'):
            self.test_server_alternative_runner.cleanup(force=True,
                                                        force_namespace=True)
        if hasattr(self, 'test_server_affinity_runner'):
            self.test_server_affinity_runner.cleanup(force=True,
                                                     force_namespace=True)

    # The caches below are keyed on `self` and unbounded, so each backend
    # service is loaded at most once per instance.
    @functools.lru_cache(None)
    def default_backend_service(self) -> str:
        """Returns default backend service URL."""
        self.td.load_backend_service()
        return self.td.backend_service.url

    @functools.lru_cache(None)
    def alternative_backend_service(self) -> str:
        """Returns alternative backend service URL."""
        self.td.load_alternative_backend_service()
        return self.td.alternative_backend_service.url

    @functools.lru_cache(None)
    def affinity_backend_service(self) -> str:
        """Returns affinity backend service URL."""
        self.td.load_affinity_backend_service()
        return self.td.affinity_backend_service.url
framework.helpers.rand
Definition: rand.py:1
framework.xds_url_map_test_resources.GcpResourceManager._pre_cleanup
def _pre_cleanup(self)
Definition: xds_url_map_test_resources.py:227
framework.xds_url_map_test_resources.GcpResourceManager.cleanup
None cleanup(self)
Definition: xds_url_map_test_resources.py:297
framework.xds_url_map_test_resources._UrlMapChangeAggregator._set_test_case_url_map
None _set_test_case_url_map(self, HostRule host_rule, PathMatcher path_matcher)
Definition: xds_url_map_test_resources.py:80
framework.xds_url_map_test_resources.GcpResourceManager.test_server_affinity_runner
test_server_affinity_runner
Definition: xds_url_map_test_resources.py:187
framework.xds_url_map_test_resources.GcpResourceManager.k8s_namespace
k8s_namespace
Definition: xds_url_map_test_resources.py:161
framework.xds_url_map_test_resources.GcpResourceManager.test_server_runner
test_server_runner
Definition: xds_url_map_test_resources.py:164
framework.xds_url_map_test_resources._MetaSingletonAndAbslFlags.__call__
def __call__(cls, *args, **kwargs)
Definition: xds_url_map_test_resources.py:112
framework.xds_url_map_test_resources.GcpResourceManager.gcp_api_manager
gcp_api_manager
Definition: xds_url_map_test_resources.py:151
run_xds_tests.create_target_proxy
def create_target_proxy(gcp, name, validate_for_proxyless=True, url_map=None)
Definition: run_xds_tests.py:2618
framework.xds_url_map_test_resources._UrlMapChangeAggregator
Definition: xds_url_map_test_resources.py:46
framework.xds_url_map_test_resources._UrlMapChangeAggregator.get_map
UrlMapType get_map(self)
Definition: xds_url_map_test_resources.py:57
framework.xds_url_map_test_resources._UrlMapChangeAggregator._get_test_case_url_map
Tuple[HostRule, PathMatcher] _get_test_case_url_map('XdsUrlMapTestCase' test_case)
Definition: xds_url_map_test_resources.py:68
framework.xds_url_map_test_resources._UrlMapChangeAggregator.apply_change
None apply_change(self, 'XdsUrlMapTestCase' test_case)
Definition: xds_url_map_test_resources.py:60
framework.test_app
Definition: tools/run_tests/xds_k8s_test_driver/framework/test_app/__init__.py:1
framework.xds_url_map_test_resources.GcpResourceManager.alternative_backend_service
str alternative_backend_service(self)
Definition: xds_url_map_test_resources.py:322
framework.xds_url_map_test_resources.GcpResourceManager.__init__
def __init__(self, Mapping[str, Any] absl_flags=None)
Definition: xds_url_map_test_resources.py:137
framework.xds_url_map_test_resources.GcpResourceManager.affinity_backend_service
str affinity_backend_service(self)
Definition: xds_url_map_test_resources.py:328
framework.xds_url_map_test_resources._MetaSingletonAndAbslFlags._flags
_flags
Definition: xds_url_map_test_resources.py:110
framework.xds_url_map_test_resources._UrlMapChangeAggregator.__init__
def __init__(self, str url_map_name)
Definition: xds_url_map_test_resources.py:49
framework.xds_url_map_test_resources.GcpResourceManager.create_test_client_runner
def create_test_client_runner(self)
Definition: xds_url_map_test_resources.py:201
framework.infrastructure
Definition: tools/run_tests/xds_k8s_test_driver/framework/infrastructure/__init__.py:1
framework.xds_url_map_test_resources._MetaSingletonAndAbslFlags
Definition: xds_url_map_test_resources.py:104
framework.xds_url_map_test_resources.GcpResourceManager
Definition: xds_url_map_test_resources.py:122
framework.xds_url_map_test_resources.GcpResourceManager.td
td
Definition: xds_url_map_test_resources.py:152
framework.xds_url_map_test_resources._package_flags
Mapping[str, Any] _package_flags()
Definition: xds_url_map_test_resources.py:86
client.run
def run()
Definition: examples/python/async_streaming/client.py:109
framework.helpers.rand.random_resource_suffix
str random_resource_suffix()
Definition: rand.py:38
framework.xds_url_map_test_resources.GcpResourceManager.k8s_api_manager
k8s_api_manager
Definition: xds_url_map_test_resources.py:150
framework.xds_url_map_test_resources._UrlMapChangeAggregator._map
_map
Definition: xds_url_map_test_resources.py:50
framework.xds_url_map_test_resources._MetaSingletonAndAbslFlags._instances
dictionary _instances
Definition: xds_url_map_test_resources.py:108
asyncio_get_stats.type
type
Definition: asyncio_get_stats.py:37
cleanup
Definition: cleanup.py:1
framework.xds_url_map_test_resources.GcpResourceManager.setup
None setup(self, Iterable['XdsUrlMapTestCase'] test_case_classes)
Definition: xds_url_map_test_resources.py:233
framework.xds_url_map_test_resources.GcpResourceManager.test_server_alternative_runner
test_server_alternative_runner
Definition: xds_url_map_test_resources.py:175
framework.xds_url_map_test_resources.GcpResourceManager.default_backend_service
str default_backend_service(self)
Definition: xds_url_map_test_resources.py:316
run_xds_tests.create_health_check
def create_health_check(gcp, name)
Definition: run_xds_tests.py:2515
framework.xds_url_map_test_resources.GcpResourceManager.resource_suffix
resource_suffix
Definition: xds_url_map_test_resources.py:143


grpc
Author(s):
autogenerated on Fri May 16 2025 03:01:00