19 from typing
import Any, Dict, List, Optional
21 from absl
import flags
22 from google.cloud
import secretmanager_v1
23 from google.longrunning
import operations_pb2
25 from google.rpc
import code_pb2
26 from google.rpc
import error_details_pb2
27 from google.rpc
import status_pb2
28 from googleapiclient
import discovery
29 import googleapiclient.errors
30 import googleapiclient.http
36 logger = logging.getLogger(__name__)
37 PRIVATE_API_KEY_SECRET_NAME = flags.DEFINE_string(
38 "private_api_key_secret_name",
40 help=
"Load Private API access key from the latest version of the secret "
41 "with the given name, in the format projects/*/secrets/*")
# Command-line override for the v1 Discovery URI; when the flag is not
# supplied it falls back to googleapiclient's discovery.V1_DISCOVERY_URI.
V1_DISCOVERY_URI = flags.DEFINE_string(
    "v1_discovery_uri",
    default=discovery.V1_DISCOVERY_URI,
    help="Override v1 Discovery URI",
)
# Command-line override for the v2 Discovery URI; when the flag is not
# supplied it falls back to googleapiclient's discovery.V2_DISCOVERY_URI.
V2_DISCOVERY_URI = flags.DEFINE_string(
    "v2_discovery_uri",
    default=discovery.V2_DISCOVERY_URI,
    help="Override v2 Discovery URI",
)
48 COMPUTE_V1_DISCOVERY_FILE = flags.DEFINE_string(
49 "compute_v1_discovery_file",
51 help=
"Load compute v1 from discovery file")
# Command-line override for the GCP UI URL; defaults to the public
# Cloud Console host.
GCP_UI_URL = flags.DEFINE_string(
    "gcp_ui_url",
    default="console.cloud.google.com",
    help="Override GCP UI URL.",
)
# Short module-level aliases for third-party types.
# googleapiclient error and request types:
_HttpError = googleapiclient.errors.HttpError
_HttpLib2Error = googleapiclient.http.httplib2.HttpLib2Error
HttpRequest = googleapiclient.http.HttpRequest
# Long-running operation protobuf message:
Operation = operations_pb2.Operation
68 v1_discovery_uri=None,
69 v2_discovery_uri=None,
70 compute_v1_discovery_file=None,
71 private_api_key_secret_name=None,
76 COMPUTE_V1_DISCOVERY_FILE.value)
78 PRIVATE_API_KEY_SECRET_NAME.value)
79 self.
gcp_ui_url = gcp_ui_url
or GCP_UI_URL.value
87 @functools.lru_cache(
None)
92 Return API key credential that identifies a GCP project allow-listed for
93 accessing private API discovery documents.
94 https://console.cloud.google.com/apis/credentials
96 This method lazy-loads the content of the key from the Secret Manager.
97 https://console.cloud.google.com/security/secret-manager
100 raise ValueError(
'private_api_key_secret_name must be set to '
101 'access private_api_key.')
103 secrets_api = self.
secrets(
'v1')
104 version_resource_path = secrets_api.secret_version_path(
106 secret_version=
'latest')
107 secret: secretmanager_v1.AccessSecretVersionResponse
108 secret = secrets_api.access_secret_version(name=version_resource_path)
109 return secret.payload.data.decode()
111 @functools.lru_cache(
None)
119 elif version ==
'v1alpha':
122 raise NotImplementedError(f
'Compute {version} not supported')
124 @functools.lru_cache(
None)
126 api_name =
'networksecurity'
127 if version ==
'v1alpha1':
132 visibility_labels=[
'NETWORKSECURITY_ALPHA'])
133 elif version ==
'v1beta1':
136 raise NotImplementedError(f
'Network Security {version} not supported')
138 @functools.lru_cache(
None)
140 api_name =
'networkservices'
141 if version ==
'v1alpha1':
146 visibility_labels=[
'NETWORKSERVICES_ALPHA'])
147 elif version ==
'v1beta1':
150 raise NotImplementedError(f
'Network Services {version} not supported')
153 @functools.lru_cache(
None)
156 return secretmanager_v1.SecretManagerServiceClient()
158 raise NotImplementedError(f
'Secret Manager {version} not supported')
160 @functools.lru_cache(
None)
161 def iam(self, version: str) -> discovery.Resource:
162 """Identity and Access Management (IAM) API.
164 https://cloud.google.com/iam/docs/reference/rest
165 https://googleapis.github.io/google-api-python-client/docs/dyn/iam_v1.html
171 raise NotImplementedError(
172 f
'Identity and Access Management (IAM) {version} not supported')
175 api = discovery.build(api_name,
177 cache_discovery=
False,
186 api_key: Optional[str] =
None,
187 visibility_labels: Optional[List] =
None):
190 params[
'key'] = api_key
191 if visibility_labels:
193 params[
'labels'] =
'_'.join(visibility_labels)
197 params_str =
'&' + (
'&'.join(f
'{k}={v}' for k, v
in params.items()))
199 api = discovery.build(
202 cache_discovery=
False,
203 discoveryServiceUrl=f
'{self.v2_discovery_uri}{params_str}')
208 with open(discovery_file,
'r')
as f:
209 api = discovery.build_from_document(f.read())
215 """Base error class for GCP API errors."""
218 class ResponseError(Error):
219 """The response was not a 2xx."""
222 error_details: Optional[str]
223 status: Optional[int]
231 self.
reason = cause._get_reason().strip()
235 if cause.resp
and cause.resp.status:
236 self.
status = cause.resp.status
241 return (f
'<ResponseError {self.status} when requesting {self.uri} '
242 f
'returned "{self.reason}". Details: "{self.error_details}">')
246 """A transport error has occurred."""
247 cause: _HttpLib2Error
254 return f
'<TransportError cause: {self.cause!r}>'
259 Operation was not successful.
261 Assuming Operation based on Google API Style Guide:
262 https://cloud.google.com/apis/design/design_patterns#long_running_operations
263 https://github.com/googleapis/googleapis/blob/master/google/longrunning/operations.proto
268 code_name: code_pb2.Code
269 error: status_pb2.Status
281 self.
name = operation.name
or 'unknown'
282 self.
code_name = code_pb2.Code.Name(operation.error.code)
289 return json_format.ParseDict(
292 ignore_unknown_fields=
True,
293 descriptor_pool=error_details_pb2.DESCRIPTOR.pool)
294 except (json_format.Error, TypeError)
as e:
299 (
"Can't parse response while processing OperationError: '%r', "
300 "error %r"), operation_response, e)
305 indent_l2 = indent_l1 * 2
307 result = (f
'{self.api_name} operation "{self.name}" failed.\n'
308 f
'{indent_l1}code: {self.error.code} ({self.code_name})\n'
309 f
'{indent_l1}message: "{self.error.message}"')
311 if self.
error.details:
312 result += f
'\n{indent_l1}details: [\n'
313 for any_error
in self.
error.details:
314 error_str = json_format.MessageToJson(any_error)
315 for line
in error_str.splitlines():
316 result += indent_l2 + line +
'\n'
317 result += f
'{indent_l1}]'
320 result += f
'\n metadata: \n'
321 metadata_str = json.dumps(self.
metadata, indent=2)
322 for line
in metadata_str.splitlines():
323 result += indent_l2 + line +
'\n'
324 result = result.rstrip()
331 _WAIT_FOR_OPERATION_SEC = 60 * 10
335 def __init__(self, api: discovery.Resource, project: str):
336 self.api: discovery.Resource = api
337 self.project: str = project
344 request: HttpRequest,
346 num_retries: Optional[int] = _GCP_API_RETRIES) -> Dict[str, Any]:
347 """Execute the immediate request.
350 Unmarshalled response as a dictionary.
353 ResponseError if the response was not a 2xx.
354 TransportError if a transport error has occurred.
356 if num_retries
is None:
359 return request.execute(num_retries=num_retries)
360 except _HttpError
as error:
362 except _HttpLib2Error
as error:
366 """Return a string with pretty-printed resource body."""
367 yaml_out: str = yaml.dump(body, explicit_start=
True, explicit_end=
True)
373 timeout_sec=_WAIT_FOR_OPERATION_SEC,
374 wait_sec=_WAIT_FIXED_SEC):
375 retryer = tenacity.Retrying(
376 retry=(tenacity.retry_if_not_result(test_success_fn) |
377 tenacity.retry_if_exception_type()),
378 wait=tenacity.wait_fixed(wait_sec),
379 stop=tenacity.stop_after_delay(timeout_sec),
380 after=tenacity.after_log(logger, logging.DEBUG),
382 return retryer(operation_request.execute)
386 GLOBAL_LOCATION =
'global'
388 def parent(self, location: Optional[str] = GLOBAL_LOCATION):
391 return f
'projects/{self.project}/locations/{location}'
394 return f
'{self.parent()}/{collection_name}/{name}'
398 logger.info(
"Creating %s resource:\n%s", self.
api_name,
400 create_req = collection.create(parent=self.
parent(),
408 raise NotImplementedError
413 raise NotImplementedError
416 resource = collection.get(name=full_name).execute()
417 logger.info(
'Loaded %s:\n%s', full_name,
422 full_name: str) -> bool:
423 logger.debug(
"Deleting %s", full_name)
425 self.
_execute(collection.delete(name=full_name))
427 except _HttpError
as error:
428 if error.resp
and error.resp.status == 404:
429 logger.info(
'%s not deleted since it does not exist', full_name)
431 logger.warning(
'Failed to delete %s, %r', full_name, error)
437 request: HttpRequest,
438 timeout_sec=GcpProjectApiResource._WAIT_FOR_OPERATION_SEC):
440 self.
_wait(operation, timeout_sec)
444 timeout_sec=GcpProjectApiResource._WAIT_FOR_OPERATION_SEC):
445 op_name = operation[
'name']
446 logger.debug(
'Waiting for %s operation, timeout %s sec: %s',
447 self.
api_name, timeout_sec, op_name)
449 op_request = self.api.projects().locations().operations().
get(
452 operation_request=op_request,
453 test_success_fn=
lambda result: result[
'done'],
454 timeout_sec=timeout_sec)
456 logger.debug(
'Completed operation: %s', operation)
457 if 'error' in operation: