17 from typing
import Any, Dict, List, Optional
19 from googleapiclient
import discovery
20 import googleapiclient.errors
26 logger = logging.getLogger(__name__)
31 _WAIT_FOR_BACKEND_SEC = 60 * 10
32 _WAIT_FOR_OPERATION_SEC = 60 * 10
34 @dataclasses.dataclass(frozen=
True)
39 @dataclasses.dataclass(frozen=
True)
47 super().
__init__(api_manager.compute(version), project)
59 protocol: HealthCheckProtocol,
61 port: Optional[int] =
None) ->
'GcpResource':
63 health_check_field =
'tcpHealthCheck'
65 health_check_field =
'grpcHealthCheck'
67 raise TypeError(f
'Unexpected Health Check protocol: {protocol}')
69 health_check_settings = {}
71 health_check_settings[
'portSpecification'] =
'USE_SERVING_PORT'
73 health_check_settings[
'portSpecification'] =
'USE_FIXED_PORT'
74 health_check_settings[
'port'] = port
77 self.api.healthChecks(), {
79 'type': protocol.name,
80 health_check_field: health_check_settings,
90 source_ranges: List[str],
91 ports: List[str]) -> Optional[
'GcpResource']:
94 self.api.firewalls(), {
99 "direction":
"INGRESS",
101 "network": network_url,
103 "sourceRanges": source_ranges,
104 "targetTags": [
"allow-health-checks"]
106 except googleapiclient.errors.HttpError
as http_error:
108 if http_error.resp.status == 409:
109 logger.debug(
'Firewall rule %s already existed', name)
120 health_check:
'GcpResource',
121 affinity_header: Optional[str] =
None,
122 protocol: Optional[BackendServiceProtocol] =
None,
123 subset_size: Optional[int] =
None,
124 locality_lb_policies: Optional[List[dict]] =
None) ->
'GcpResource':
126 raise TypeError(f
'Unexpected Backend Service protocol: {protocol}')
129 'loadBalancingScheme':
'INTERNAL_SELF_MANAGED',
130 'healthChecks': [health_check.url],
131 'protocol': protocol.name,
136 body[
'sessionAffinity'] =
'HEADER_FIELD'
137 body[
'localityLbPolicy'] =
'RING_HASH'
138 body[
'consistentHash'] = {
139 'httpHeaderName': affinity_header,
142 body[
'subsetting'] = {
143 'policy':
'CONSISTENT_HASH_SUBSETTING',
144 'subsetSize': subset_size
146 if locality_lb_policies:
147 body[
'localityLbPolicies'] = locality_lb_policies
156 backendService=backend_service.name,
164 max_rate_per_endpoint: Optional[int] =
None):
165 if max_rate_per_endpoint
is None:
166 max_rate_per_endpoint = 5
168 'group': backend.url,
169 'balancingMode':
'RATE',
170 'maxRatePerEndpoint': max_rate_per_endpoint
171 }
for backend
in backends]
174 body={
'backends': backend_list},
175 backendService=backend_service.name)
179 body={
'backends': []},
180 backendService=backend_service.name)
191 dst_default_backend_service:
'GcpResource',
192 dst_host_rule_match_backend_service: Optional[
'GcpResource'] =
None,
194 if dst_host_rule_match_backend_service
is None:
195 dst_host_rule_match_backend_service = dst_default_backend_service
197 self.api.urlMaps(), {
201 dst_default_backend_service.url,
204 'pathMatcher': matcher_name,
207 'name': matcher_name,
208 'defaultService': dst_host_rule_match_backend_service.url,
227 url_map:
'GcpResource',
228 validate_for_proxyless: bool =
True,
231 self.api.targetGrpcProxies(), {
233 'url_map': url_map.url,
234 'validate_for_proxyless': validate_for_proxyless,
244 url_map:
'GcpResource',
248 'url_map': url_map.url,
258 target_proxy:
'GcpResource',
261 ip_address: str =
'0.0.0.0') ->
'GcpResource':
263 self.api.globalForwardingRules(),
266 'loadBalancingScheme':
267 'INTERNAL_SELF_MANAGED',
268 'portRange': src_port,
269 'IPAddress': ip_address,
270 'network': network_url,
271 'target': target_proxy.url,
279 filter_str = (f
'(portRange eq "{src_port}-{src_port}") '
280 f
'(IPAddress eq "0.0.0.0")'
281 f
'(loadBalancingScheme eq "INTERNAL_SELF_MANAGED")')
287 'forwardingRule', name)
291 return not neg
or neg.get(
'size', 0) == 0
296 stop_max_delay=60 * 1000,
298 def _wait_for_network_endpoint_group_ready():
302 'Waiting for endpoints: NEG %s in zone %s, '
303 'current count %s', neg[
'name'], zone, neg.get(
'size'))
304 except googleapiclient.errors.HttpError
as error:
306 reason = error._get_reason()
307 logger.debug(
'Retrying NEG load, got %s, details %s',
308 error.resp.status, reason)
312 network_endpoint_group = _wait_for_network_endpoint_group_ready()
315 network_endpoint_group[
'selfLink'], zone)
318 neg = self.api.networkEndpointGroups().
get(project=self.project,
319 networkEndpointGroup=name,
328 timeout_sec=_WAIT_FOR_BACKEND_SEC,
331 pending =
set(backends)
333 @retrying.retry(retry_on_result=
lambda result:
not result,
334 stop_max_delay=timeout_sec * 1000,
335 wait_fixed=wait_sec * 1000)
336 def _retry_backends_health():
337 for backend
in pending:
339 backend_service, backend)
341 if 'healthStatus' not in result:
342 logger.debug(
'Waiting for instances: backend %s, zone %s',
343 backend.name, backend.zone)
346 backend_healthy =
True
347 for instance
in result[
'healthStatus']:
349 'Backend %s in zone %s: instance %s:%s health: %s',
350 backend.name, backend.zone, instance[
'ipAddress'],
351 instance[
'port'], instance[
'healthState'])
352 if instance[
'healthState'] !=
'HEALTHY':
353 backend_healthy =
False
356 logger.info(
'Backend %s in zone %s reported healthy',
357 backend.name, backend.zone)
358 pending.remove(backend)
362 _retry_backends_health()
365 return self.api.backendServices().getHealth(
366 project=self.project,
367 backendService=backend_service.name,
373 **kwargs) ->
'GcpResource':
374 resp = collection.get(project=self.project, **kwargs).execute()
375 logger.info(
'Loaded compute resource:\n%s',
377 return self.
GcpResource(resp[
'name'], resp[
'selfLink'])
380 self, collection: discovery.Resource, filter: str) -> bool:
381 resp = collection.list(
382 project=self.project, filter=filter,
384 if 'kind' not in resp:
386 raise ValueError(
'List response "kind" is missing')
387 return 'items' in resp
and resp[
'items']
390 body: Dict[str, Any]) ->
'GcpResource':
391 logger.info(
'Creating compute resource:\n%s',
393 resp = self.
_execute(collection.insert(project=self.project, body=body))
394 return self.
GcpResource(body[
'name'], resp[
'targetLink'])
397 logger.info(
'Patching compute resource:\n%s',
400 collection.patch(project=self.project, body=body, **kwargs))
403 return collection.list(project=self.project).execute(
407 resource_type: str, resource_name: str) -> bool:
409 params = {
"project": self.project, resource_type: resource_name}
410 self.
_execute(collection.delete(**params))
412 except googleapiclient.errors.HttpError
as error:
413 if error.resp
and error.resp.status == 404:
415 'Resource %s "%s" not deleted since it does not exist',
416 resource_type, resource_name)
418 logger.warning(
'Failed to delete %s "%s", %r', resource_type,
419 resource_name, error)
424 return 'status' in operation
and operation[
'status'] ==
'DONE'
430 test_success_fn=None,
431 timeout_sec=_WAIT_FOR_OPERATION_SEC):
433 logger.debug(
'Response %s', operation)
437 operation_request = self.api.globalOperations().
get(
438 project=self.project, operation=operation[
'name'])
440 if test_success_fn
is None:
443 logger.debug(
'Waiting for global operation %s, timeout %s sec',
444 operation[
'name'], timeout_sec)
446 test_success_fn=test_success_fn,
447 timeout_sec=timeout_sec)
449 if 'error' in response:
450 logger.debug(
'Waiting for global operation failed, response: %r',
452 raise Exception(f
'Operation {operation["name"]} did not complete '
453 f
'within {timeout_sec}s, error={response["error"]}')