compute.py
Go to the documentation of this file.
1 # Copyright 2020 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import dataclasses
15 import enum
16 import logging
17 from typing import Any, Dict, List, Optional
18 
19 from googleapiclient import discovery
20 import googleapiclient.errors
21 # TODO(sergiitk): replace with tenacity
22 import retrying
23 
24 from framework.infrastructure import gcp
25 
26 logger = logging.getLogger(__name__)
27 
28 
29 class ComputeV1(gcp.api.GcpProjectApiResource): # pylint: disable=too-many-public-methods
30  # TODO(sergiitk): move someplace better
31  _WAIT_FOR_BACKEND_SEC = 60 * 10
32  _WAIT_FOR_OPERATION_SEC = 60 * 10
33 
@dataclasses.dataclass(frozen=True)
class GcpResource:
    """Immutable handle to a Compute resource created or loaded by this API."""
    # Resource name as passed to, or returned by, the Compute API.
    name: str
    # Resource URL; populated from the API's 'selfLink' / 'targetLink' fields.
    url: str


@dataclasses.dataclass(frozen=True)
class ZonalGcpResource(GcpResource):
    """A GcpResource that additionally records the zone it lives in."""
    # Zone name the resource was looked up in.
    zone: str
42 
def __init__(self,
             api_manager: gcp.api.GcpApiManager,
             project: str,
             version: str = 'v1'):
    """Bind this wrapper to the Compute API of the given project.

    Args:
        api_manager: Factory used to obtain the Compute API client.
        project: GCP project id passed to every request.
        version: Compute API version to request from the manager.
    """
    super().__init__(api_manager.compute(version), project)
48 
class HealthCheckProtocol(enum.Enum):
    """Health check kinds supported by create_health_check().

    The member name is sent to the API as the health check 'type'.
    """
    TCP = enum.auto()
    GRPC = enum.auto()
52 
class BackendServiceProtocol(enum.Enum):
    """Backend service protocols accepted when creating a backend service.

    The member name is sent to the API as the backend service 'protocol'.
    """
    HTTP2 = enum.auto()
    GRPC = enum.auto()
56 
def create_health_check(self,
                        name: str,
                        protocol: 'HealthCheckProtocol',
                        *,
                        port: Optional[int] = None) -> 'GcpResource':
    """Create a TCP or gRPC health check.

    Args:
        name: Name of the health check resource.
        protocol: Member of HealthCheckProtocol selecting the check type.
        port: Fixed port to probe. When omitted, the check is configured
            with 'USE_SERVING_PORT'.

    Returns:
        The created resource, as returned by _insert_resource().

    Raises:
        TypeError: If protocol is not a known HealthCheckProtocol member.
    """
    if protocol is self.HealthCheckProtocol.TCP:
        health_check_field = 'tcpHealthCheck'
    elif protocol is self.HealthCheckProtocol.GRPC:
        health_check_field = 'grpcHealthCheck'
    else:
        raise TypeError(f'Unexpected Health Check protocol: {protocol}')

    if port is None:
        health_check_settings = {'portSpecification': 'USE_SERVING_PORT'}
    else:
        health_check_settings = {
            'portSpecification': 'USE_FIXED_PORT',
            'port': port,
        }

    return self._insert_resource(
        self.api.healthChecks(), {
            'name': name,
            'type': protocol.name,
            health_check_field: health_check_settings,
        })
82 
def list_health_check(self):
    """Return the raw list response for the project's health checks."""
    return self._list_resource(self.api.healthChecks())
85 
def delete_health_check(self, name):
    """Delete the health check called `name` via _delete_resource()."""
    self._delete_resource(self.api.healthChecks(), 'healthCheck', name)
88 
def create_firewall_rule(self, name: str, network_url: str,
                         source_ranges: List[str],
                         ports: List[str]) -> Optional['GcpResource']:
    """Create an ingress TCP firewall rule tagged 'allow-health-checks'.

    Args:
        name: Name of the firewall rule.
        network_url: URL of the network the rule applies to.
        source_ranges: Source ranges allowed by the rule.
        ports: TCP ports (or port ranges) to allow.

    Returns:
        The created resource, or None when the API answered 409 because
        a rule with this name already exists.

    Raises:
        googleapiclient.errors.HttpError: For any HTTP error other than 409.
    """
    try:
        return self._insert_resource(
            self.api.firewalls(), {
                "allowed": [{
                    "IPProtocol": "tcp",
                    "ports": ports
                }],
                "direction": "INGRESS",
                "name": name,
                "network": network_url,
                "priority": 1000,
                "sourceRanges": source_ranges,
                "targetTags": ["allow-health-checks"]
            })
    except googleapiclient.errors.HttpError as http_error:
        # TODO(lidiz) use status_code() when we upgrade googleapiclient
        if http_error.resp.status != 409:
            raise
        # 409 Conflict: the rule is already there, which is fine.
        logger.debug('Firewall rule %s already existed', name)
        return None
113 
def delete_firewall_rule(self, name):
    """Delete the firewall rule called `name` via _delete_resource()."""
    self._delete_resource(self.api.firewalls(), 'firewall', name)
116 
def create_backend_service_traffic_director(
        self,
        name: str,
        health_check: 'GcpResource',
        affinity_header: Optional[str] = None,
        protocol: Optional['BackendServiceProtocol'] = None,
        subset_size: Optional[int] = None,
        locality_lb_policies: Optional[List[dict]] = None) -> 'GcpResource':
    """Create an INTERNAL_SELF_MANAGED (Traffic Director) backend service.

    Args:
        name: Name of the backend service.
        health_check: Health check resource; its URL is attached.
        affinity_header: When set, enables HEADER_FIELD session affinity
            with RING_HASH hashing on this HTTP header.
        protocol: Member of BackendServiceProtocol. Although the default
            is None, a valid member is required.
        subset_size: When truthy, enables CONSISTENT_HASH_SUBSETTING with
            this subset size.
        locality_lb_policies: When truthy, sent as 'localityLbPolicies'.

    Returns:
        The created resource, as returned by _insert_resource().

    Raises:
        TypeError: If protocol is not a BackendServiceProtocol member.
    """
    if not isinstance(protocol, self.BackendServiceProtocol):
        raise TypeError(f'Unexpected Backend Service protocol: {protocol}')
    body = {
        'name': name,
        'loadBalancingScheme': 'INTERNAL_SELF_MANAGED',  # Traffic Director
        'healthChecks': [health_check.url],
        'protocol': protocol.name,
    }
    # If affinity header is specified, config the backend service to support
    # affinity, and set affinity header to the one given.
    if affinity_header:
        body['sessionAffinity'] = 'HEADER_FIELD'
        body['localityLbPolicy'] = 'RING_HASH'
        body['consistentHash'] = {
            'httpHeaderName': affinity_header,
        }
    if subset_size:
        body['subsetting'] = {
            'policy': 'CONSISTENT_HASH_SUBSETTING',
            'subsetSize': subset_size
        }
    if locality_lb_policies:
        body['localityLbPolicies'] = locality_lb_policies
    return self._insert_resource(self.api.backendServices(), body)
149 
def get_backend_service_traffic_director(self, name: str) -> 'GcpResource':
    """Load the backend service called `name` via _get_resource()."""
    return self._get_resource(self.api.backendServices(),
                              backendService=name)
153 
def patch_backend_service(self, backend_service, body, **kwargs):
    """Patch a backend service with an arbitrary request body.

    Args:
        backend_service: Resource whose `name` identifies the service.
        body: Patch body forwarded unchanged.
        **kwargs: Extra keyword arguments forwarded to _patch_resource().
    """
    self._patch_resource(collection=self.api.backendServices(),
                         backendService=backend_service.name,
                         body=body,
                         **kwargs)
159 
def backend_service_patch_backends(
        self,
        backend_service,
        backends,
        max_rate_per_endpoint: Optional[int] = None):
    """Replace the backends of a backend service with RATE-balanced groups.

    Args:
        backend_service: Resource whose `name` identifies the service.
        backends: Iterable of resources; each one's `url` becomes a
            backend 'group'.
        max_rate_per_endpoint: 'maxRatePerEndpoint' for every backend;
            defaults to 5 when None.
    """
    if max_rate_per_endpoint is None:
        max_rate_per_endpoint = 5
    backend_list = [{
        'group': backend.url,
        'balancingMode': 'RATE',
        'maxRatePerEndpoint': max_rate_per_endpoint
    } for backend in backends]

    self._patch_resource(collection=self.api.backendServices(),
                         body={'backends': backend_list},
                         backendService=backend_service.name)
176 
def backend_service_remove_all_backends(self, backend_service):
    """Patch the backend service so that its 'backends' list is empty."""
    self._patch_resource(collection=self.api.backendServices(),
                         body={'backends': []},
                         backendService=backend_service.name)
181 
def delete_backend_service(self, name):
    """Delete the backend service called `name` via _delete_resource()."""
    self._delete_resource(self.api.backendServices(), 'backendService',
                          name)
185 
def create_url_map(
        self,
        name: str,
        matcher_name: str,
        src_hosts,
        dst_default_backend_service: 'GcpResource',
        dst_host_rule_match_backend_service: Optional['GcpResource'] = None,
) -> 'GcpResource':
    """Create a URL map with one host rule and one path matcher.

    Args:
        name: Name of the URL map.
        matcher_name: Name shared by the host rule and the path matcher.
        src_hosts: Hosts matched by the host rule.
        dst_default_backend_service: Backend service used as the map's
            default service.
        dst_host_rule_match_backend_service: Backend service used as the
            path matcher's default service; falls back to
            dst_default_backend_service when None.

    Returns:
        The created resource, as returned by _insert_resource().
    """
    if dst_host_rule_match_backend_service is None:
        dst_host_rule_match_backend_service = dst_default_backend_service
    return self._insert_resource(
        self.api.urlMaps(), {
            'name': name,
            'defaultService': dst_default_backend_service.url,
            'hostRules': [{
                'hosts': src_hosts,
                'pathMatcher': matcher_name,
            }],
            'pathMatchers': [{
                'name': matcher_name,
                'defaultService': dst_host_rule_match_backend_service.url,
            }],
        })
211 
def create_url_map_with_content(self, url_map_body: Any) -> 'GcpResource':
    """Create a URL map from a caller-supplied request body."""
    return self._insert_resource(self.api.urlMaps(), url_map_body)
214 
def patch_url_map(self, url_map: 'GcpResource', body, **kwargs):
    """Patch a URL map with an arbitrary request body.

    Args:
        url_map: Resource whose `name` identifies the URL map.
        body: Patch body forwarded unchanged.
        **kwargs: Extra keyword arguments forwarded to _patch_resource().
    """
    self._patch_resource(collection=self.api.urlMaps(),
                         urlMap=url_map.name,
                         body=body,
                         **kwargs)
220 
def delete_url_map(self, name):
    """Delete the URL map called `name` via _delete_resource()."""
    self._delete_resource(self.api.urlMaps(), 'urlMap', name)
223 
def create_target_grpc_proxy(
        self,
        name: str,
        url_map: 'GcpResource',
        validate_for_proxyless: bool = True,
) -> 'GcpResource':
    """Create a target gRPC proxy pointing at the given URL map.

    Args:
        name: Name of the target proxy.
        url_map: URL map resource; its URL is attached to the proxy.
        validate_for_proxyless: Sent as 'validate_for_proxyless'.

    Returns:
        The created resource, as returned by _insert_resource().
    """
    return self._insert_resource(
        self.api.targetGrpcProxies(), {
            'name': name,
            'url_map': url_map.url,
            'validate_for_proxyless': validate_for_proxyless,
        })
236 
def delete_target_grpc_proxy(self, name):
    """Delete the target gRPC proxy called `name` via _delete_resource()."""
    self._delete_resource(self.api.targetGrpcProxies(), 'targetGrpcProxy',
                          name)
240 
def create_target_http_proxy(
        self,
        name: str,
        url_map: 'GcpResource',
) -> 'GcpResource':
    """Create a target HTTP proxy pointing at the given URL map.

    Args:
        name: Name of the target proxy.
        url_map: URL map resource; its URL is attached to the proxy.

    Returns:
        The created resource, as returned by _insert_resource().
    """
    return self._insert_resource(self.api.targetHttpProxies(), {
        'name': name,
        'url_map': url_map.url,
    })
250 
def delete_target_http_proxy(self, name):
    """Delete the target HTTP proxy called `name` via _delete_resource()."""
    self._delete_resource(self.api.targetHttpProxies(), 'targetHttpProxy',
                          name)
254 
def create_forwarding_rule(self,
                           name: str,
                           src_port: int,
                           target_proxy: 'GcpResource',
                           network_url: str,
                           *,
                           ip_address: str = '0.0.0.0') -> 'GcpResource':
    """Create a global INTERNAL_SELF_MANAGED forwarding rule.

    Args:
        name: Name of the forwarding rule.
        src_port: Port sent as the rule's 'portRange'.
        target_proxy: Target proxy resource; its URL is the rule target.
        network_url: URL of the network the rule belongs to.
        ip_address: Sent as the rule's 'IPAddress'.

    Returns:
        The created resource, as returned by _insert_resource().
    """
    return self._insert_resource(
        self.api.globalForwardingRules(),
        {
            'name': name,
            'loadBalancingScheme':
                'INTERNAL_SELF_MANAGED',  # Traffic Director
            'portRange': src_port,
            'IPAddress': ip_address,
            'network': network_url,
            'target': target_proxy.url,
        })
273 
def exists_forwarding_rule(self, src_port) -> bool:
    """Report whether a Traffic Director forwarding rule uses `src_port`.

    Looks for global forwarding rules on 0.0.0.0 with the
    INTERNAL_SELF_MANAGED scheme whose port range is exactly
    `src_port`-`src_port`.

    Returns:
        True when at least one matching rule exists, False otherwise.
    """
    # TODO(sergiitk): Better approach for confirming the port is available.
    #   It's possible a rule allocates actual port range, e.g 8000-9000,
    #   and this wouldn't catch it. For now, we assume there's no
    #   port ranges used in the project.
    filter_str = (f'(portRange eq "{src_port}-{src_port}") '
                  f'(IPAddress eq "0.0.0.0")'
                  f'(loadBalancingScheme eq "INTERNAL_SELF_MANAGED")')
    # Coerce to a real bool so the result matches the annotation even if
    # the helper hands back the raw 'items' list.
    return bool(
        self._exists_resource(self.api.globalForwardingRules(),
                              filter=filter_str))
284 
def delete_forwarding_rule(self, name):
    """Delete the global forwarding rule `name` via _delete_resource()."""
    self._delete_resource(self.api.globalForwardingRules(),
                          'forwardingRule', name)
288 
@staticmethod
def _network_endpoint_group_not_ready(neg):
    """Return True while a NEG is missing or reports zero endpoints.

    Used as the retry predicate when waiting for a NEG: a falsy `neg`,
    a missing 'size' key, or a 'size' of 0 all count as "not ready".
    """
    return not neg or neg.get('size', 0) == 0
292 
def wait_for_network_endpoint_group(self, name, zone):
    """Poll a NEG until it reports at least one endpoint.

    The NEG is re-fetched every 2 seconds for up to 60 seconds, retrying
    while _network_endpoint_group_not_ready() says it is not ready.

    Args:
        name: Name of the network endpoint group.
        zone: Zone the NEG is looked up in.

    Returns:
        A ZonalGcpResource built from the NEG's 'name' and 'selfLink'.
    """

    @retrying.retry(retry_on_result=self._network_endpoint_group_not_ready,
                    stop_max_delay=60 * 1000,
                    wait_fixed=2 * 1000)
    def _wait_for_network_endpoint_group_ready():
        try:
            neg = self.get_network_endpoint_group(name, zone)
            logger.debug(
                'Waiting for endpoints: NEG %s in zone %s, '
                'current count %s', neg['name'], zone, neg.get('size'))
        except googleapiclient.errors.HttpError as error:
            # Log the failure, then re-raise it unchanged.
            # noinspection PyProtectedMember
            reason = error._get_reason()
            logger.debug('Retrying NEG load, got %s, details %s',
                         error.resp.status, reason)
            raise
        return neg

    network_endpoint_group = _wait_for_network_endpoint_group_ready()
    # TODO(sergiitk): dataclass
    return self.ZonalGcpResource(network_endpoint_group['name'],
                                 network_endpoint_group['selfLink'], zone)
316 
def get_network_endpoint_group(self, name, zone):
    """Fetch a zonal network endpoint group and return the raw response."""
    neg = self.api.networkEndpointGroups().get(project=self.project,
                                               networkEndpointGroup=name,
                                               zone=zone).execute()
    # TODO(sergiitk): dataclass
    return neg
323 
def wait_for_backends_healthy_status(
        self,
        backend_service,
        backends,
        timeout_sec=_WAIT_FOR_BACKEND_SEC,
        wait_sec=4,
):
    """Block until every backend reports all of its instances HEALTHY.

    Args:
        backend_service: Backend service the backends are attached to.
        backends: Iterable of hashable backend resources exposing
            `name`, `zone` and `url`.
        timeout_sec: Overall retry budget, in seconds.
        wait_sec: Delay between polling rounds, in seconds.
    """
    pending = set(backends)

    @retrying.retry(retry_on_result=lambda result: not result,
                    stop_max_delay=timeout_sec * 1000,
                    wait_fixed=wait_sec * 1000)
    def _retry_backends_health():
        # Iterate over a snapshot: backends are removed from `pending`
        # inside the loop, and mutating a set while iterating over it
        # raises "RuntimeError: Set changed size during iteration".
        for backend in tuple(pending):
            result = self.get_backend_service_backend_health(
                backend_service, backend)

            if 'healthStatus' not in result:
                logger.debug('Waiting for instances: backend %s, zone %s',
                             backend.name, backend.zone)
                continue

            backend_healthy = True
            for instance in result['healthStatus']:
                logger.debug(
                    'Backend %s in zone %s: instance %s:%s health: %s',
                    backend.name, backend.zone, instance['ipAddress'],
                    instance['port'], instance['healthState'])
                if instance['healthState'] != 'HEALTHY':
                    backend_healthy = False

            if backend_healthy:
                logger.info('Backend %s in zone %s reported healthy',
                            backend.name, backend.zone)
                pending.remove(backend)

        # A falsy result (backends still pending) triggers another round.
        return not pending

    _retry_backends_health()
363 
def get_backend_service_backend_health(self, backend_service, backend):
    """Query the health of one backend group of a backend service.

    Args:
        backend_service: Resource whose `name` identifies the service.
        backend: Resource whose `url` is sent as the 'group' to inspect.

    Returns:
        The raw getHealth response.
    """
    return self.api.backendServices().getHealth(
        project=self.project,
        backendService=backend_service.name,
        body={
            "group": backend.url
        }).execute()
371 
def _get_resource(self, collection: discovery.Resource,
                  **kwargs) -> 'GcpResource':
    """Load one resource from a collection and wrap it as a GcpResource.

    Args:
        collection: API collection to call `get` on.
        **kwargs: Identifying arguments forwarded to `get`.

    Returns:
        A GcpResource built from the response's 'name' and 'selfLink'.
    """
    resp = collection.get(project=self.project, **kwargs).execute()
    logger.info('Loaded compute resource:\n%s',
                self.resource_pretty_format(resp))
    return self.GcpResource(resp['name'], resp['selfLink'])
378 
def _exists_resource(
        self, collection: 'discovery.Resource', filter: str) -> bool:  # pylint: disable=redefined-builtin
    """Report whether a collection has any resource matching a filter.

    Args:
        collection: API collection to call `list` on.
        filter: Filter expression passed to the list call.

    Returns:
        True when the response carries a non-empty 'items' list.

    Raises:
        ValueError: If the list response has no 'kind' field.
    """
    resp = collection.list(
        project=self.project, filter=filter,
        maxResults=1).execute(num_retries=self._GCP_API_RETRIES)
    if 'kind' not in resp:
        # TODO(sergiitk): better error
        raise ValueError('List response "kind" is missing')
    # Return a real bool rather than leaking the 'items' list to callers.
    return bool(resp.get('items'))
388 
def _insert_resource(self, collection: discovery.Resource,
                     body: Dict[str, Any]) -> 'GcpResource':
    """Insert a resource and wait for the operation to finish.

    Args:
        collection: API collection to call `insert` on.
        body: Request body; must contain 'name'.

    Returns:
        A GcpResource built from body['name'] and the finished
        operation's 'targetLink'.
    """
    logger.info('Creating compute resource:\n%s',
                self.resource_pretty_format(body))
    resp = self._execute(collection.insert(project=self.project, body=body))
    return self.GcpResource(body['name'], resp['targetLink'])
395 
def _patch_resource(self, collection, body, **kwargs):
    """Patch a resource and wait for the operation to finish.

    Args:
        collection: API collection to call `patch` on.
        body: Patch body.
        **kwargs: Identifying arguments forwarded to `patch`.
    """
    logger.info('Patching compute resource:\n%s',
                self.resource_pretty_format(body))
    self._execute(
        collection.patch(project=self.project, body=body, **kwargs))
401 
def _list_resource(self, collection: 'discovery.Resource'):
    """Return the raw list response for a collection in this project."""
    return collection.list(project=self.project).execute(
        num_retries=self._GCP_API_RETRIES)
405 
def _delete_resource(self, collection: 'discovery.Resource',
                     resource_type: str, resource_name: str) -> bool:
    """Delete a resource, logging HTTP failures instead of raising them.

    Args:
        collection: API collection to call `delete` on.
        resource_type: Keyword argument name that identifies the resource
            in the delete call (e.g. 'healthCheck').
        resource_name: Name of the resource to delete.

    Returns:
        True when the delete operation completed; False when the API
        raised an HttpError (a 404 is logged at info level, anything
        else as a warning).
    """
    try:
        params = {"project": self.project, resource_type: resource_name}
        self._execute(collection.delete(**params))
        return True
    except googleapiclient.errors.HttpError as error:
        if error.resp and error.resp.status == 404:
            logger.info(
                'Resource %s "%s" not deleted since it does not exist',
                resource_type, resource_name)
        else:
            logger.warning('Failed to delete %s "%s", %r', resource_type,
                           resource_name, error)
    return False
421 
@staticmethod
def _operation_status_done(operation):
    """Return True when the operation dict reports status 'DONE'."""
    return operation.get('status') == 'DONE'
425 
def _execute(  # pylint: disable=arguments-differ
        self,
        request,
        *,
        test_success_fn=None,
        timeout_sec=_WAIT_FOR_OPERATION_SEC):
    """Execute a request and wait for the resulting global operation.

    Args:
        request: API request whose execution returns an operation.
        test_success_fn: Predicate applied to the polled operation to
            decide when to stop waiting; defaults to
            _operation_status_done.
        timeout_sec: How long to wait for the operation, in seconds.

    Returns:
        The final polled operation response.

    Raises:
        Exception: If the final response contains an 'error' field.
    """
    operation = request.execute(num_retries=self._GCP_API_RETRIES)
    logger.debug('Response %s', operation)

    # TODO(sergiitk) try using wait() here
    # https://googleapis.github.io/google-api-python-client/docs/dyn/compute_v1.globalOperations.html#wait
    operation_request = self.api.globalOperations().get(
        project=self.project, operation=operation['name'])

    if test_success_fn is None:
        test_success_fn = self._operation_status_done

    logger.debug('Waiting for global operation %s, timeout %s sec',
                 operation['name'], timeout_sec)
    response = self.wait_for_operation(operation_request=operation_request,
                                       test_success_fn=test_success_fn,
                                       timeout_sec=timeout_sec)

    if 'error' in response:
        logger.debug('Waiting for global operation failed, response: %r',
                     response)
        raise Exception(f'Operation {operation["name"]} did not complete '
                        f'within {timeout_sec}s, error={response["error"]}')
    return response
framework.infrastructure.gcp.api.GcpProjectApiResource.resource_pretty_format
str resource_pretty_format(self, dict body)
Definition: api.py:365
framework.infrastructure.gcp.compute.ComputeV1.create_firewall_rule
Optional[ 'GcpResource'] create_firewall_rule(self, str name, str network_url, List[str] source_ranges, List[str] ports)
Definition: compute.py:89
framework.infrastructure.gcp.api.GcpProjectApiResource
Definition: api.py:329
get
absl::string_view get(const Cont &c)
Definition: abseil-cpp/absl/strings/str_replace_test.cc:185
framework.infrastructure.gcp.compute.ComputeV1.create_health_check
'GcpResource' create_health_check(self, str name, HealthCheckProtocol protocol, *Optional[int] port=None)
Definition: compute.py:57
framework.infrastructure.gcp.compute.ComputeV1.get_network_endpoint_group
def get_network_endpoint_group(self, name, zone)
Definition: compute.py:317
framework.infrastructure.gcp.compute.ComputeV1._patch_resource
def _patch_resource(self, collection, body, **kwargs)
Definition: compute.py:396
framework.infrastructure.gcp.compute.ComputeV1.create_url_map_with_content
'GcpResource' create_url_map_with_content(self, Any url_map_body)
Definition: compute.py:212
framework.infrastructure.gcp.compute.ComputeV1._list_resource
def _list_resource(self, discovery.Resource collection)
Definition: compute.py:402
framework.infrastructure.gcp.compute.ComputeV1.backend_service_patch_backends
def backend_service_patch_backends(self, backend_service, backends, Optional[int] max_rate_per_endpoint=None)
Definition: compute.py:160
framework.infrastructure.gcp.compute.ComputeV1.GcpResource
Definition: compute.py:35
framework.infrastructure.gcp.compute.ComputeV1.wait_for_network_endpoint_group
def wait_for_network_endpoint_group(self, name, zone)
Definition: compute.py:293
framework.infrastructure.gcp.compute.ComputeV1.patch_backend_service
def patch_backend_service(self, backend_service, body, **kwargs)
Definition: compute.py:154
framework.infrastructure.gcp.compute.ComputeV1
Definition: compute.py:29
framework.infrastructure.gcp.compute.ComputeV1._network_endpoint_group_not_ready
def _network_endpoint_group_not_ready(neg)
Definition: compute.py:290
framework.infrastructure.gcp.compute.ComputeV1.BackendServiceProtocol
Definition: compute.py:53
framework.infrastructure.gcp.compute.ComputeV1._exists_resource
bool _exists_resource(self, discovery.Resource collection, str filter)
Definition: compute.py:379
framework.infrastructure.gcp.compute.ComputeV1.patch_url_map
def patch_url_map(self, 'GcpResource' url_map, body, **kwargs)
Definition: compute.py:215
framework.infrastructure.gcp.compute.ComputeV1.create_target_http_proxy
'GcpResource' create_target_http_proxy(self, str name, 'GcpResource' url_map)
Definition: compute.py:241
framework.infrastructure.gcp.compute.ComputeV1.delete_firewall_rule
def delete_firewall_rule(self, name)
Definition: compute.py:114
framework.infrastructure.gcp.compute.ComputeV1.delete_backend_service
def delete_backend_service(self, name)
Definition: compute.py:182
framework.infrastructure.gcp.compute.ComputeV1.delete_forwarding_rule
def delete_forwarding_rule(self, name)
Definition: compute.py:285
framework.infrastructure.gcp.compute.ComputeV1.delete_health_check
def delete_health_check(self, name)
Definition: compute.py:86
framework.infrastructure.gcp.compute.ComputeV1.backend_service_remove_all_backends
def backend_service_remove_all_backends(self, backend_service)
Definition: compute.py:177
framework.infrastructure.gcp.compute.ComputeV1.delete_target_grpc_proxy
def delete_target_grpc_proxy(self, name)
Definition: compute.py:237
framework.infrastructure.gcp.compute.ComputeV1.delete_url_map
def delete_url_map(self, name)
Definition: compute.py:221
framework.infrastructure.gcp.compute.ComputeV1.get_backend_service_traffic_director
'GcpResource' get_backend_service_traffic_director(self, str name)
Definition: compute.py:150
framework.infrastructure.gcp.compute.ComputeV1.delete_target_http_proxy
def delete_target_http_proxy(self, name)
Definition: compute.py:251
framework.infrastructure.gcp.compute.ComputeV1.ZonalGcpResource
Definition: compute.py:40
framework.infrastructure
Definition: tools/run_tests/xds_k8s_test_driver/framework/infrastructure/__init__.py:1
framework.infrastructure.gcp.compute.ComputeV1.HealthCheckProtocol
Definition: compute.py:49
framework.infrastructure.gcp.compute.ComputeV1.create_url_map
'GcpResource' create_url_map(self, str name, str matcher_name, src_hosts, 'GcpResource' dst_default_backend_service, Optional['GcpResource'] dst_host_rule_match_backend_service=None)
Definition: compute.py:186
framework.infrastructure.gcp.compute.ComputeV1._get_resource
'GcpResource' _get_resource(self, discovery.Resource collection, **kwargs)
Definition: compute.py:372
framework.infrastructure.gcp.compute.ComputeV1.get_backend_service_backend_health
def get_backend_service_backend_health(self, backend_service, backend)
Definition: compute.py:364
framework.infrastructure.gcp.api.GcpProjectApiResource.wait_for_operation
def wait_for_operation(operation_request, test_success_fn, timeout_sec=_WAIT_FOR_OPERATION_SEC, wait_sec=_WAIT_FIXED_SEC)
Definition: api.py:371
framework.infrastructure.gcp.compute.ComputeV1.create_target_grpc_proxy
'GcpResource' create_target_grpc_proxy(self, str name, 'GcpResource' url_map, bool validate_for_proxyless=True)
Definition: compute.py:224
cpp.gmock_class.set
set
Definition: bloaty/third_party/googletest/googlemock/scripts/generator/cpp/gmock_class.py:44
framework.infrastructure.gcp.compute.ComputeV1.__init__
def __init__(self, gcp.api.GcpApiManager api_manager, str project, str version='v1')
Definition: compute.py:43
framework.infrastructure.gcp.compute.ComputeV1._execute
def _execute(self, request, *test_success_fn=None, timeout_sec=_WAIT_FOR_OPERATION_SEC)
Definition: compute.py:426
framework.infrastructure.gcp.api.GcpProjectApiResource._GCP_API_RETRIES
int _GCP_API_RETRIES
Definition: api.py:333
framework.infrastructure.gcp.compute.ComputeV1._operation_status_done
def _operation_status_done(operation)
Definition: compute.py:423
framework.infrastructure.gcp.compute.ComputeV1.create_backend_service_traffic_director
'GcpResource' create_backend_service_traffic_director(self, str name, 'GcpResource' health_check, Optional[str] affinity_header=None, Optional[BackendServiceProtocol] protocol=None, Optional[int] subset_size=None, Optional[List[dict]] locality_lb_policies=None)
Definition: compute.py:117
framework.infrastructure.gcp.compute.ComputeV1.create_forwarding_rule
'GcpResource' create_forwarding_rule(self, str name, int src_port, 'GcpResource' target_proxy, str network_url, *str ip_address='0.0.0.0')
Definition: compute.py:255
framework.infrastructure.gcp.compute.ComputeV1.wait_for_backends_healthy_status
def wait_for_backends_healthy_status(self, backend_service, backends, timeout_sec=_WAIT_FOR_BACKEND_SEC, wait_sec=4)
Definition: compute.py:324
framework.infrastructure.gcp.compute.ComputeV1._insert_resource
'GcpResource' _insert_resource(self, discovery.Resource collection, Dict[str, Any] body)
Definition: compute.py:389
framework.infrastructure.gcp.compute.ComputeV1._delete_resource
bool _delete_resource(self, discovery.Resource collection, str resource_type, str resource_name)
Definition: compute.py:406
framework.infrastructure.gcp.compute.ComputeV1.list_health_check
def list_health_check(self)
Definition: compute.py:83
framework.infrastructure.gcp.api.GcpApiManager
Definition: api.py:64
framework.infrastructure.gcp.api.GcpProjectApiResource._execute
Dict[str, Any] _execute(self, HttpRequest request, *Optional[int] num_retries=_GCP_API_RETRIES)
Definition: api.py:342
framework.infrastructure.gcp.compute.ComputeV1.exists_forwarding_rule
bool exists_forwarding_rule(self, src_port)
Definition: compute.py:274


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:52