api.py
Go to the documentation of this file.
1 # Copyright 2020 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import abc
15 import contextlib
16 import functools
17 import json
18 import logging
19 from typing import Any, Dict, List, Optional
20 
21 from absl import flags
22 from google.cloud import secretmanager_v1
23 from google.longrunning import operations_pb2
24 from google.protobuf import json_format
25 from google.rpc import code_pb2
26 from google.rpc import error_details_pb2
27 from google.rpc import status_pb2
28 from googleapiclient import discovery
29 import googleapiclient.errors
30 import googleapiclient.http
31 import tenacity
32 import yaml
33 
35 
36 logger = logging.getLogger(__name__)
37 PRIVATE_API_KEY_SECRET_NAME = flags.DEFINE_string(
38  "private_api_key_secret_name",
39  default=None,
40  help="Load Private API access key from the latest version of the secret "
41  "with the given name, in the format projects/*/secrets/*")
42 V1_DISCOVERY_URI = flags.DEFINE_string("v1_discovery_uri",
43  default=discovery.V1_DISCOVERY_URI,
44  help="Override v1 Discovery URI")
45 V2_DISCOVERY_URI = flags.DEFINE_string("v2_discovery_uri",
46  default=discovery.V2_DISCOVERY_URI,
47  help="Override v2 Discovery URI")
48 COMPUTE_V1_DISCOVERY_FILE = flags.DEFINE_string(
49  "compute_v1_discovery_file",
50  default=None,
51  help="Load compute v1 from discovery file")
52 GCP_UI_URL = flags.DEFINE_string("gcp_ui_url",
53  default="console.cloud.google.com",
54  help="Override GCP UI URL.")
55 
56 # Type aliases
57 _HttpError = googleapiclient.errors.HttpError
58 _HttpLib2Error = googleapiclient.http.httplib2.HttpLib2Error
60 Operation = operations_pb2.Operation
61 HttpRequest = googleapiclient.http.HttpRequest
62 
63 
65 
66  def __init__(self,
67  *,
68  v1_discovery_uri=None,
69  v2_discovery_uri=None,
70  compute_v1_discovery_file=None,
71  private_api_key_secret_name=None,
72  gcp_ui_url=None):
73  self.v1_discovery_uri = v1_discovery_uri or V1_DISCOVERY_URI.value
74  self.v2_discovery_uri = v2_discovery_uri or V2_DISCOVERY_URI.value
75  self.compute_v1_discovery_file = (compute_v1_discovery_file or
76  COMPUTE_V1_DISCOVERY_FILE.value)
77  self.private_api_key_secret_name = (private_api_key_secret_name or
78  PRIVATE_API_KEY_SECRET_NAME.value)
79  self.gcp_ui_url = gcp_ui_url or GCP_UI_URL.value
80  # TODO(sergiitk): add options to pass google Credentials
81  self._exit_stack = contextlib.ExitStack()
82 
83  def close(self):
84  self._exit_stack.close()
85 
86  @property
87  @functools.lru_cache(None)
88  def private_api_key(self):
89  """
90  Private API key.
91 
92  Return API key credential that identifies a GCP project allow-listed for
93  accessing private API discovery documents.
94  https://console.cloud.google.com/apis/credentials
95 
96  This method lazy-loads the content of the key from the Secret Manager.
97  https://console.cloud.google.com/security/secret-manager
98  """
99  if not self.private_api_key_secret_name:
100  raise ValueError('private_api_key_secret_name must be set to '
101  'access private_api_key.')
102 
103  secrets_api = self.secrets('v1')
104  version_resource_path = secrets_api.secret_version_path(
105  **secrets_api.parse_secret_path(self.private_api_key_secret_name),
106  secret_version='latest')
107  secret: secretmanager_v1.AccessSecretVersionResponse
108  secret = secrets_api.access_secret_version(name=version_resource_path)
109  return secret.payload.data.decode()
110 
111  @functools.lru_cache(None)
112  def compute(self, version):
113  api_name = 'compute'
114  if version == 'v1':
117  else:
118  return self._build_from_discovery_v1(api_name, version)
119  elif version == 'v1alpha':
120  return self._build_from_discovery_v1(api_name, 'alpha')
121 
122  raise NotImplementedError(f'Compute {version} not supported')
123 
124  @functools.lru_cache(None)
125  def networksecurity(self, version):
126  api_name = 'networksecurity'
127  if version == 'v1alpha1':
128  return self._build_from_discovery_v2(
129  api_name,
130  version,
131  api_key=self.private_api_key,
132  visibility_labels=['NETWORKSECURITY_ALPHA'])
133  elif version == 'v1beta1':
134  return self._build_from_discovery_v2(api_name, version)
135 
136  raise NotImplementedError(f'Network Security {version} not supported')
137 
138  @functools.lru_cache(None)
139  def networkservices(self, version):
140  api_name = 'networkservices'
141  if version == 'v1alpha1':
142  return self._build_from_discovery_v2(
143  api_name,
144  version,
145  api_key=self.private_api_key,
146  visibility_labels=['NETWORKSERVICES_ALPHA'])
147  elif version == 'v1beta1':
148  return self._build_from_discovery_v2(api_name, version)
149 
150  raise NotImplementedError(f'Network Services {version} not supported')
151 
152  @staticmethod
153  @functools.lru_cache(None)
154  def secrets(version: str):
155  if version == 'v1':
156  return secretmanager_v1.SecretManagerServiceClient()
157 
158  raise NotImplementedError(f'Secret Manager {version} not supported')
159 
160  @functools.lru_cache(None)
161  def iam(self, version: str) -> discovery.Resource:
162  """Identity and Access Management (IAM) API.
163 
164  https://cloud.google.com/iam/docs/reference/rest
165  https://googleapis.github.io/google-api-python-client/docs/dyn/iam_v1.html
166  """
167  api_name = 'iam'
168  if version == 'v1':
169  return self._build_from_discovery_v1(api_name, version)
170 
171  raise NotImplementedError(
172  f'Identity and Access Management (IAM) {version} not supported')
173 
174  def _build_from_discovery_v1(self, api_name, version):
175  api = discovery.build(api_name,
176  version,
177  cache_discovery=False,
178  discoveryServiceUrl=self.v1_discovery_uri)
179  self._exit_stack.enter_context(api)
180  return api
181 
183  api_name,
184  version,
185  *,
186  api_key: Optional[str] = None,
187  visibility_labels: Optional[List] = None):
188  params = {}
189  if api_key:
190  params['key'] = api_key
191  if visibility_labels:
192  # Dash-separated list of labels.
193  params['labels'] = '_'.join(visibility_labels)
194 
195  params_str = ''
196  if params:
197  params_str = '&' + ('&'.join(f'{k}={v}' for k, v in params.items()))
198 
199  api = discovery.build(
200  api_name,
201  version,
202  cache_discovery=False,
203  discoveryServiceUrl=f'{self.v2_discovery_uri}{params_str}')
204  self._exit_stack.enter_context(api)
205  return api
206 
207  def _build_from_file(self, discovery_file):
208  with open(discovery_file, 'r') as f:
209  api = discovery.build_from_document(f.read())
210  self._exit_stack.enter_context(api)
211  return api
212 
213 
214 class Error(Exception):
215  """Base error class for GCP API errors."""
216 
217 
218 class ResponseError(Error):
219  """The response was not a 2xx."""
220  reason: str
221  uri: str
222  error_details: Optional[str]
223  status: Optional[int]
224  cause: _HttpError
225 
226  def __init__(self, cause: _HttpError):
227  # TODO(sergiitk): cleanup when we upgrade googleapiclient:
228  # - remove _get_reason()
229  # - remove error_details note
230  # - use status_code()
231  self.reason = cause._get_reason().strip() # noqa
232  self.uri = cause.uri
233  self.error_details = cause.error_details # NOTE: Must after _get_reason
234  self.status = None
235  if cause.resp and cause.resp.status:
236  self.status = cause.resp.status
237  self.cause = cause
238  super().__init__()
239 
240  def __repr__(self):
241  return (f'<ResponseError {self.status} when requesting {self.uri} '
242  f'returned "{self.reason}". Details: "{self.error_details}">')
243 
244 
246  """A transport error has occurred."""
247  cause: _HttpLib2Error
248 
249  def __init__(self, cause: _HttpLib2Error):
250  self.cause = cause
251  super().__init__()
252 
253  def __repr__(self):
254  return f'<TransportError cause: {self.cause!r}>'
255 
256 
258  """
259  Operation was not successful.
260 
261  Assuming Operation based on Google API Style Guide:
262  https://cloud.google.com/apis/design/design_patterns#long_running_operations
263  https://github.com/googleapis/googleapis/blob/master/google/longrunning/operations.proto
264  """
265  api_name: str
266  name: str
267  metadata: Any
268  code_name: code_pb2.Code
269  error: status_pb2.Status
270 
271  def __init__(self, api_name: str, response: dict):
272  self.api_name = api_name
273 
274  # Operation.metadata field is Any specific to the API. It may not be
275  # present in the default descriptor pool, and that's expected.
276  # To avoid json_format.ParseError, handle it separately.
277  self.metadata = response.pop('metadata', {})
278 
279  # Must be after removing metadata field.
280  operation: Operation = self._parse_operation_response(response)
281  self.name = operation.name or 'unknown'
282  self.code_name = code_pb2.Code.Name(operation.error.code)
283  self.error = operation.error
284  super().__init__()
285 
286  @staticmethod
287  def _parse_operation_response(operation_response: dict) -> Operation:
288  try:
289  return json_format.ParseDict(
290  operation_response,
291  Operation(),
292  ignore_unknown_fields=True,
293  descriptor_pool=error_details_pb2.DESCRIPTOR.pool)
294  except (json_format.Error, TypeError) as e:
295  # Swallow parsing errors if any. Building correct OperationError()
296  # is more important than losing debug information. Details still
297  # can be extracted from the warning.
298  logger.warning(
299  ("Can't parse response while processing OperationError: '%r', "
300  "error %r"), operation_response, e)
301  return Operation()
302 
303  def __str__(self):
304  indent_l1 = ' ' * 2
305  indent_l2 = indent_l1 * 2
306 
307  result = (f'{self.api_name} operation "{self.name}" failed.\n'
308  f'{indent_l1}code: {self.error.code} ({self.code_name})\n'
309  f'{indent_l1}message: "{self.error.message}"')
310 
311  if self.error.details:
312  result += f'\n{indent_l1}details: [\n'
313  for any_error in self.error.details:
314  error_str = json_format.MessageToJson(any_error)
315  for line in error_str.splitlines():
316  result += indent_l2 + line + '\n'
317  result += f'{indent_l1}]'
318 
319  if self.metadata:
320  result += f'\n metadata: \n'
321  metadata_str = json.dumps(self.metadata, indent=2)
322  for line in metadata_str.splitlines():
323  result += indent_l2 + line + '\n'
324  result = result.rstrip()
325 
326  return result
327 
328 
330  # TODO(sergiitk): move someplace better
331  _WAIT_FOR_OPERATION_SEC = 60 * 10
332  _WAIT_FIXED_SEC = 2
333  _GCP_API_RETRIES = 5
334 
335  def __init__(self, api: discovery.Resource, project: str):
336  self.api: discovery.Resource = api
337  self.project: str = project
339 
340  # TODO(sergiitk): in upcoming GCP refactoring, differentiate between
341  # _execute for LRO (Long Running Operations), and immediate operations.
342  def _execute(
343  self,
344  request: HttpRequest,
345  *,
346  num_retries: Optional[int] = _GCP_API_RETRIES) -> Dict[str, Any]:
347  """Execute the immediate request.
348 
349  Returns:
350  Unmarshalled response as a dictionary.
351 
352  Raises:
353  ResponseError if the response was not a 2xx.
354  TransportError if a transport error has occurred.
355  """
356  if num_retries is None:
357  num_retries = self._GCP_API_RETRIES
358  try:
359  return request.execute(num_retries=num_retries)
360  except _HttpError as error:
361  raise ResponseError(error)
362  except _HttpLib2Error as error:
363  raise TransportError(error)
364 
365  def resource_pretty_format(self, body: dict) -> str:
366  """Return a string with pretty-printed resource body."""
367  yaml_out: str = yaml.dump(body, explicit_start=True, explicit_end=True)
368  return self._highlighter.highlight(yaml_out)
369 
370  @staticmethod
371  def wait_for_operation(operation_request,
372  test_success_fn,
373  timeout_sec=_WAIT_FOR_OPERATION_SEC,
374  wait_sec=_WAIT_FIXED_SEC):
375  retryer = tenacity.Retrying(
376  retry=(tenacity.retry_if_not_result(test_success_fn) |
377  tenacity.retry_if_exception_type()),
378  wait=tenacity.wait_fixed(wait_sec),
379  stop=tenacity.stop_after_delay(timeout_sec),
380  after=tenacity.after_log(logger, logging.DEBUG),
381  reraise=True)
382  return retryer(operation_request.execute)
383 
384 
386  GLOBAL_LOCATION = 'global'
387 
388  def parent(self, location: Optional[str] = GLOBAL_LOCATION):
389  if location is None:
390  location = self.GLOBAL_LOCATION
391  return f'projects/{self.project}/locations/{location}'
392 
393  def resource_full_name(self, name, collection_name):
394  return f'{self.parent()}/{collection_name}/{name}'
395 
396  def _create_resource(self, collection: discovery.Resource, body: dict,
397  **kwargs):
398  logger.info("Creating %s resource:\n%s", self.api_name,
399  self.resource_pretty_format(body))
400  create_req = collection.create(parent=self.parent(),
401  body=body,
402  **kwargs)
403  self._execute(create_req)
404 
405  @property
406  @abc.abstractmethod
407  def api_name(self) -> str:
408  raise NotImplementedError
409 
410  @property
411  @abc.abstractmethod
412  def api_version(self) -> str:
413  raise NotImplementedError
414 
415  def _get_resource(self, collection: discovery.Resource, full_name):
416  resource = collection.get(name=full_name).execute()
417  logger.info('Loaded %s:\n%s', full_name,
418  self.resource_pretty_format(resource))
419  return resource
420 
421  def _delete_resource(self, collection: discovery.Resource,
422  full_name: str) -> bool:
423  logger.debug("Deleting %s", full_name)
424  try:
425  self._execute(collection.delete(name=full_name))
426  return True
427  except _HttpError as error:
428  if error.resp and error.resp.status == 404:
429  logger.info('%s not deleted since it does not exist', full_name)
430  else:
431  logger.warning('Failed to delete %s, %r', full_name, error)
432  return False
433 
434  # TODO(sergiitk): Use ResponseError and TransportError
435  def _execute( # pylint: disable=arguments-differ
436  self,
437  request: HttpRequest,
438  timeout_sec=GcpProjectApiResource._WAIT_FOR_OPERATION_SEC):
439  operation = request.execute(num_retries=self._GCP_API_RETRIES)
440  self._wait(operation, timeout_sec)
441 
442  def _wait(self,
443  operation,
444  timeout_sec=GcpProjectApiResource._WAIT_FOR_OPERATION_SEC):
445  op_name = operation['name']
446  logger.debug('Waiting for %s operation, timeout %s sec: %s',
447  self.api_name, timeout_sec, op_name)
448 
449  op_request = self.api.projects().locations().operations().get(
450  name=op_name)
451  operation = self.wait_for_operation(
452  operation_request=op_request,
453  test_success_fn=lambda result: result['done'],
454  timeout_sec=timeout_sec)
455 
456  logger.debug('Completed operation: %s', operation)
457  if 'error' in operation:
458  raise OperationError(self.api_name, operation)
framework.infrastructure.gcp.api.GcpStandardCloudApiResource._create_resource
def _create_resource(self, discovery.Resource collection, dict body, **kwargs)
Definition: api.py:396
framework.infrastructure.gcp.api.GcpApiManager.v1_discovery_uri
v1_discovery_uri
Definition: api.py:67
framework.infrastructure.gcp.api.GcpStandardCloudApiResource.api_version
str api_version(self)
Definition: api.py:412
framework.infrastructure.gcp.api.GcpApiManager.close
def close(self)
Definition: api.py:83
framework.infrastructure.gcp.api.GcpStandardCloudApiResource._delete_resource
bool _delete_resource(self, discovery.Resource collection, str full_name)
Definition: api.py:421
framework.infrastructure.gcp.api.GcpProjectApiResource.resource_pretty_format
str resource_pretty_format(self, dict body)
Definition: api.py:365
framework.infrastructure.gcp.api.GcpApiManager.compute_v1_discovery_file
compute_v1_discovery_file
Definition: api.py:69
framework.infrastructure.gcp.api.GcpProjectApiResource
Definition: api.py:329
framework.infrastructure.gcp.api.OperationError.metadata
metadata
Definition: api.py:277
get
absl::string_view get(const Cont &c)
Definition: abseil-cpp/absl/strings/str_replace_test.cc:185
framework.infrastructure.gcp.api.GcpStandardCloudApiResource._wait
def _wait(self, operation, timeout_sec=GcpProjectApiResource._WAIT_FOR_OPERATION_SEC)
Definition: api.py:442
framework.infrastructure.gcp.api.ResponseError.uri
uri
Definition: api.py:232
framework.infrastructure.gcp.api.GcpApiManager.private_api_key
def private_api_key(self)
Definition: api.py:88
framework.infrastructure.gcp.api.GcpApiManager.iam
discovery.Resource iam(self, str version)
Definition: api.py:161
framework.infrastructure.gcp.api.OperationError.__init__
def __init__(self, str api_name, dict response)
Definition: api.py:271
google::protobuf
Definition: bloaty/third_party/protobuf/benchmarks/util/data_proto2_to_proto3_util.h:12
framework.infrastructure.gcp.api.GcpProjectApiResource._highlighter
_highlighter
Definition: api.py:338
framework.infrastructure.gcp.api.ResponseError.status
status
Definition: api.py:234
framework.infrastructure.gcp.api.ResponseError.reason
reason
Definition: api.py:231
framework.infrastructure.gcp.api.GcpApiManager.__init__
def __init__(self, *v1_discovery_uri=None, v2_discovery_uri=None, compute_v1_discovery_file=None, private_api_key_secret_name=None, gcp_ui_url=None)
Definition: api.py:66
framework.infrastructure.gcp.api.TransportError.__init__
def __init__(self, _HttpLib2Error cause)
Definition: api.py:249
framework.infrastructure.gcp.api.GcpStandardCloudApiResource._get_resource
def _get_resource(self, discovery.Resource collection, full_name)
Definition: api.py:415
framework.infrastructure.gcp.api.GcpStandardCloudApiResource
Definition: api.py:385
framework.infrastructure.gcp.api.GcpStandardCloudApiResource.GLOBAL_LOCATION
string GLOBAL_LOCATION
Definition: api.py:386
framework.infrastructure.gcp.api.GcpApiManager.private_api_key_secret_name
private_api_key_secret_name
Definition: api.py:71
framework.infrastructure.gcp.api.ResponseError
Definition: api.py:218
framework.infrastructure.gcp.api.ResponseError.cause
cause
Definition: api.py:237
framework.infrastructure.gcp.api.GcpStandardCloudApiResource.api_name
str api_name(self)
Definition: api.py:407
framework.infrastructure.gcp.api.GcpApiManager.gcp_ui_url
gcp_ui_url
Definition: api.py:73
framework.infrastructure.gcp.api.GcpApiManager.networksecurity
def networksecurity(self, version)
Definition: api.py:125
framework.infrastructure.gcp.api.GcpApiManager._build_from_file
def _build_from_file(self, discovery_file)
Definition: api.py:207
framework.infrastructure.gcp.api._HighlighterYaml
_HighlighterYaml
Definition: api.py:59
framework.infrastructure.gcp.api.GcpApiManager._build_from_discovery_v2
def _build_from_discovery_v2(self, api_name, version, *Optional[str] api_key=None, Optional[List] visibility_labels=None)
Definition: api.py:182
framework.helpers.highlighter.HighlighterYaml
Definition: highlighter.py:91
framework.infrastructure.gcp.api.Error
Definition: api.py:214
framework.infrastructure.gcp.api.OperationError.code_name
code_name
Definition: api.py:282
framework.infrastructure.gcp.api.TransportError.__repr__
def __repr__(self)
Definition: api.py:253
framework.infrastructure.gcp.api.GcpStandardCloudApiResource._execute
def _execute(self, HttpRequest request, timeout_sec=GcpProjectApiResource._WAIT_FOR_OPERATION_SEC)
Definition: api.py:435
framework.infrastructure.gcp.api.GcpApiManager.v2_discovery_uri
v2_discovery_uri
Definition: api.py:68
framework.infrastructure.gcp.api.OperationError.__str__
def __str__(self)
Definition: api.py:303
framework.infrastructure.gcp.api.GcpApiManager._build_from_discovery_v1
def _build_from_discovery_v1(self, api_name, version)
Definition: api.py:174
framework.infrastructure.gcp.api.GcpProjectApiResource.wait_for_operation
def wait_for_operation(operation_request, test_success_fn, timeout_sec=_WAIT_FOR_OPERATION_SEC, wait_sec=_WAIT_FIXED_SEC)
Definition: api.py:371
framework.infrastructure.gcp.api.TransportError.cause
cause
Definition: api.py:250
framework.infrastructure.gcp.api.TransportError
Definition: api.py:245
framework.infrastructure.gcp.api.ResponseError.error_details
error_details
Definition: api.py:233
framework.infrastructure.gcp.api.GcpStandardCloudApiResource.resource_full_name
def resource_full_name(self, name, collection_name)
Definition: api.py:393
framework.infrastructure.gcp.api.GcpProjectApiResource._GCP_API_RETRIES
int _GCP_API_RETRIES
Definition: api.py:333
framework.infrastructure.gcp.api.OperationError
Definition: api.py:257
framework.infrastructure.gcp.api.OperationError._parse_operation_response
Operation _parse_operation_response(dict operation_response)
Definition: api.py:287
open
#define open
Definition: test-fs.c:46
framework.infrastructure.gcp.api.GcpProjectApiResource.__init__
def __init__(self, discovery.Resource api, str project)
Definition: api.py:335
framework.infrastructure.gcp.api.OperationError.name
name
Definition: api.py:281
framework.infrastructure.gcp.api.GcpApiManager.secrets
def secrets(str version)
Definition: api.py:154
framework.infrastructure.gcp.api.GcpApiManager._exit_stack
_exit_stack
Definition: api.py:75
framework.infrastructure.gcp.api.OperationError.error
error
Definition: api.py:283
framework.infrastructure.gcp.api.GcpApiManager.networkservices
def networkservices(self, version)
Definition: api.py:139
framework.infrastructure.gcp.api.ResponseError.__init__
def __init__(self, _HttpError cause)
Definition: api.py:226
framework.infrastructure.gcp.api.GcpApiManager.compute
def compute(self, version)
Definition: api.py:112
framework.infrastructure.gcp.api.OperationError.api_name
api_name
Definition: api.py:272
framework.infrastructure.gcp.api.ResponseError.__repr__
def __repr__(self)
Definition: api.py:240
framework.infrastructure.gcp.api.Operation
Operation
Definition: api.py:60
framework.infrastructure.gcp.api.GcpApiManager
Definition: api.py:64
framework.infrastructure.gcp.api.GcpProjectApiResource._execute
Dict[str, Any] _execute(self, HttpRequest request, *Optional[int] num_retries=_GCP_API_RETRIES)
Definition: api.py:342
framework.infrastructure.gcp.api.GcpStandardCloudApiResource.parent
def parent(self, Optional[str] location=GLOBAL_LOCATION)
Definition: api.py:388
framework.helpers.highlighter
Definition: highlighter.py:1


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:31