iam.py
Go to the documentation of this file.
1 # Copyright 2021 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import dataclasses
15 import datetime
16 import functools
17 import logging
18 from typing import Any, Dict, FrozenSet, Optional
19 
20 from framework.helpers import retryers
21 from framework.infrastructure import gcp
22 
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Type aliases: short local names used by the retry settings and the request
# annotations further down in this module.
_timedelta = datetime.timedelta
_HttpRequest = gcp.api.HttpRequest
28 
29 
# NOTE(review): the class header line was missing from this listing; the base
# class gcp.api.Error is taken from the symbol index - confirm.
class EtagConflict(gcp.api.Error):
    """
    Indicates concurrent policy changes.

    Raised by IamV1.set_service_account_iam_policy when the setIamPolicy
    call fails with HTTP status 409.

    https://cloud.google.com/iam/docs/policies#etag
    """
36 
37 
def handle_etag_conflict(func):
    """Decorator that retries ``func`` when an IAM policy update conflicts.

    The wrapped callable is invoked through
    ``retryers.exponential_retryer_with_timeout`` and is retried when it
    raises ``EtagConflict`` or ``gcp.api.TransportError``, waiting between
    1 and 10 seconds per attempt, for at most 2 minutes overall.

    Args:
        func: The callable to wrap; called with the wrapper's arguments.

    Returns:
        The wrapper, carrying ``func``'s name, docstring and ``__wrapped__``.
    """

    # functools.wraps keeps the decorated method's metadata, so logs,
    # tracebacks and generated docs show the real method name.
    @functools.wraps(func)
    def wrap_retry_on_etag_conflict(*args, **kwargs):
        retryer = retryers.exponential_retryer_with_timeout(
            retry_on_exceptions=(EtagConflict, gcp.api.TransportError),
            wait_min=_timedelta(seconds=1),
            wait_max=_timedelta(seconds=10),
            timeout=_timedelta(minutes=2))
        return retryer(func, *args, **kwargs)

    return wrap_retry_on_etag_conflict
49 
50 
def _replace_binding(policy: 'Policy', binding: 'Policy.Binding',
                     new_binding: 'Policy.Binding') -> 'Policy':
    """Return a copy of ``policy`` with ``binding`` swapped for ``new_binding``.

    ``binding`` is dropped from the copy if present (a value that is not in
    the policy, such as None, is simply ignored) and ``new_binding`` is
    added. The input policy is left untouched.
    """
    remaining = frozenset(policy.bindings).difference((binding,))
    swapped = remaining.union((new_binding,))
    return dataclasses.replace(policy, bindings=swapped)  # pylint: disable=too-many-function-args
57 
58 
@dataclasses.dataclass(frozen=True)
class ServiceAccount:
    """Immutable record describing an IAM service account.

    https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts
    Note: "etag" field is skipped because it's deprecated
    """
    name: str
    projectId: str
    uniqueId: str
    email: str
    oauth2ClientId: str
    displayName: str = ''
    description: str = ''
    disabled: bool = False

    @classmethod
    def from_response(cls, response: Dict[str, Any]) -> 'ServiceAccount':
        """Create a ServiceAccount from an API response dictionary.

        Raises:
            KeyError: if any of name, projectId, uniqueId, email or
                oauth2ClientId is absent. The remaining fields fall back to
                their defaults, and unrelated keys are ignored.
        """
        mandatory = {
            key: response[key] for key in ('name', 'projectId', 'uniqueId',
                                           'email', 'oauth2ClientId')
        }
        optional = {
            'description': response.get('description', ''),
            'displayName': response.get('displayName', ''),
            'disabled': response.get('disabled', False),
        }
        return cls(**mandatory, **optional)

    def as_dict(self) -> Dict[str, Any]:
        """Return all fields as a plain dictionary keyed by field name."""
        as_mapping: Dict[str, Any] = dataclasses.asdict(self)
        return as_mapping
88 
89 
@dataclasses.dataclass(frozen=True)
class Expr:
    """
    Represents a textual expression in the Common Expression Language syntax.

    https://cloud.google.com/iam/docs/reference/rest/v1/Expr
    """
    expression: str
    title: str = ''
    description: str = ''
    location: str = ''

    @classmethod
    def from_response(cls, response: Dict[str, Any]) -> 'Expr':
        """Create an Expr from an API response dictionary.

        Only keys matching the declared fields are used, so a response that
        carries an additional key does not break parsing.

        Raises:
            TypeError: if the required "expression" key is absent.
        """
        known_fields = {field.name for field in dataclasses.fields(cls)}
        return cls(**{
            key: value
            for key, value in response.items()
            if key in known_fields
        })

    def as_dict(self) -> Dict[str, Any]:
        """Return all fields as a plain dictionary keyed by field name."""
        return dataclasses.asdict(self)
108 
109 
@dataclasses.dataclass(frozen=True)
class Policy:
    """An Identity and Access Management (IAM) policy, which specifies
    access controls for Google Cloud resources.

    https://cloud.google.com/iam/docs/reference/rest/v1/Policy
    Note: auditConfigs not supported by this implementation.
    """

    @dataclasses.dataclass(frozen=True)
    class Binding:
        """Policy Binding. Associates members with a role.

        https://cloud.google.com/iam/docs/reference/rest/v1/Policy#binding
        """
        role: str
        members: FrozenSet[str]
        condition: Optional['Expr'] = None

        @classmethod
        def from_response(cls, response: Dict[str, Any]) -> 'Policy.Binding':
            """Create a Binding from an API response dictionary.

            "role" is required (KeyError when absent); "members" defaults to
            an empty set; "condition" is parsed only when present.
            """
            fields = {
                'role': response['role'],
                'members': frozenset(response.get('members', [])),
            }
            if 'condition' in response:
                fields['condition'] = Expr.from_response(response['condition'])

            return cls(**fields)

        def as_dict(self) -> Dict[str, Any]:
            """Return the binding as a dictionary for a request body.

            Members are emitted in sorted order so that request bodies and
            debug logs are deterministic. "condition" is omitted when unset.
            """
            result = {
                'role': self.role,
                'members': sorted(self.members),
            }
            if self.condition is not None:
                result['condition'] = self.condition.as_dict()
            return result

    bindings: FrozenSet[Binding]
    etag: str
    version: Optional[int] = None

    # Deliberately not memoized: an lru_cache on a method keys on ``self`` and
    # would keep Policy objects alive in a process-wide cache, while the scan
    # below is a single pass over the bindings.
    def find_binding_for_role(
            self,
            role: str,
            condition: Optional['Expr'] = None) -> Optional['Policy.Binding']:
        """Return a binding matching both ``role`` and ``condition``.

        Args:
            role: The role name to look for.
            condition: The condition the binding must carry; None (the
                default) matches only bindings without a condition.

        Returns:
            A matching binding, or None when there is none.
        """
        results = (binding for binding in self.bindings
                   if binding.role == role and binding.condition == condition)
        return next(results, None)

    @classmethod
    def from_response(cls, response: Dict[str, Any]) -> 'Policy':
        """Create a Policy from an API response dictionary.

        "etag" is required (KeyError when absent); "bindings" defaults to
        empty and "version" to None.
        """
        bindings = frozenset(
            cls.Binding.from_response(b) for b in response.get('bindings', []))
        return cls(bindings=bindings,
                   etag=response['etag'],
                   version=response.get('version'))

    def as_dict(self) -> Dict[str, Any]:
        """Return the policy as a dictionary for a request body.

        "version" is included only when it is set.
        """
        result = {
            'bindings': [binding.as_dict() for binding in self.bindings],
            'etag': self.etag,
        }
        if self.version is not None:
            result['version'] = self.version
        return result
178 
179 
# NOTE(review): the class header line was missing from this listing; the base
# class gcp.api.GcpProjectApiResource is taken from the symbol index and the
# super().__init__(api, project) call - confirm.
class IamV1(gcp.api.GcpProjectApiResource):
    """
    Identity and Access Management (IAM) API.

    https://cloud.google.com/iam/docs/reference/rest
    """
    _service_accounts: gcp.api.discovery.Resource

    # Operations that affect conditional role bindings must specify version 3.
    # Otherwise conditions are omitted, and role names returned with a suffix,
    # f.e. roles/iam.workloadIdentityUser_withcond_f1ec33c9beb41857dbf0
    # https://cloud.google.com/iam/docs/reference/rest/v1/Policy#FIELDS.version
    POLICY_VERSION: int = 3

    def __init__(self, api_manager: gcp.api.GcpApiManager, project: str):
        """Bind the IAM v1 API from ``api_manager`` to ``project``."""
        super().__init__(api_manager.iam('v1'), project)
        # Shortcut to projects/*/serviceAccounts/ endpoints
        self._service_accounts = self.api.projects().serviceAccounts()

    def service_account_resource_name(self, account: str) -> str:
        """
        Returns full resource name of the service account.

        The resource name of the service account in the following format:
        projects/{PROJECT_ID}/serviceAccounts/{ACCOUNT}.
        The ACCOUNT value can be the email address or the uniqueId of the
        service account.
        Ref https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts/get

        Args:
            account: The ACCOUNT value
        """
        return f'projects/{self.project}/serviceAccounts/{account}'

    def get_service_account(self, account: str) -> ServiceAccount:
        """Load the service account identified by ``account``."""
        resource_name = self.service_account_resource_name(account)
        request: _HttpRequest = self._service_accounts.get(name=resource_name)
        response: Dict[str, Any] = self._execute(request)
        logger.debug('Loaded Service Account:\n%s',
                     self.resource_pretty_format(response))
        return ServiceAccount.from_response(response)

    def get_service_account_iam_policy(self, account: str) -> Policy:
        """Load the IAM policy attached to the service account.

        The policy is requested at POLICY_VERSION so that conditional role
        bindings are returned with their conditions.
        """
        resource_name = self.service_account_resource_name(account)
        request: _HttpRequest = self._service_accounts.getIamPolicy(
            resource=resource_name,
            options_requestedPolicyVersion=self.POLICY_VERSION)
        response: Dict[str, Any] = self._execute(request)
        logger.debug('Loaded Service Account Policy:\n%s',
                     self.resource_pretty_format(response))
        return Policy.from_response(response)

    def set_service_account_iam_policy(self, account: str,
                                       policy: Policy) -> Policy:
        """Sets the IAM policy that is attached to a service account.

        https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts/setIamPolicy

        Returns:
            The policy parsed from the API response.

        Raises:
            EtagConflict: when the API responds with HTTP status 409.
            gcp.api.ResponseError: for any other response error.
        """
        resource_name = self.service_account_resource_name(account)
        body = {'policy': policy.as_dict()}
        logger.debug('Updating Service Account %s policy:\n%s', account,
                     self.resource_pretty_format(body))
        try:
            request: _HttpRequest = self._service_accounts.setIamPolicy(
                resource=resource_name, body=body)
            response: Dict[str, Any] = self._execute(request)
            return Policy.from_response(response)
        except gcp.api.ResponseError as error:
            if error.status == 409:
                # https://cloud.google.com/iam/docs/policies#etag
                logger.debug(error)
                raise EtagConflict from error
            raise

    @handle_etag_conflict
    def add_service_account_iam_policy_binding(self, account: str, role: str,
                                               member: str) -> None:
        """Add an IAM policy binding to an IAM service account.

        Reads the current policy, adds ``member`` to the unconditional
        binding for ``role`` (creating the binding when there is none) and
        writes the policy back. Does nothing when the member already has the
        role.

        See for details on updating policy bindings:
        https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts/setIamPolicy
        """
        policy: Policy = self.get_service_account_iam_policy(account)
        binding: Optional[Policy.Binding] = policy.find_binding_for_role(role)
        if binding is not None and member in binding.members:
            logger.debug('Member %s already has role %s for Service Account %s',
                         member, role, account)
            return

        if binding is None:
            updated_binding = Policy.Binding(role, frozenset([member]))
        else:
            updated_members: FrozenSet[str] = binding.members.union({member})
            updated_binding: Policy.Binding = dataclasses.replace(  # pylint: disable=too-many-function-args
                binding,
                members=updated_members)

        updated_policy: Policy = _replace_binding(policy, binding,
                                                  updated_binding)
        self.set_service_account_iam_policy(account, updated_policy)
        logger.debug('Role %s granted to member %s for Service Account %s',
                     role, member, account)

    @handle_etag_conflict
    def remove_service_account_iam_policy_binding(self, account: str, role: str,
                                                  member: str) -> None:
        """Remove an IAM policy binding from the IAM policy of a service
        account.

        Reads the current policy, removes ``member`` from the unconditional
        binding for ``role`` and writes the policy back. Does nothing when
        the role has no binding or the member is not in it. When the member
        was the last one, the whole binding is removed.

        See for details on updating policy bindings:
        https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts/setIamPolicy
        """
        policy: Policy = self.get_service_account_iam_policy(account)
        binding: Optional[Policy.Binding] = policy.find_binding_for_role(role)

        if binding is None:
            logger.debug('Noop: Service Account %s has no bindings for role %s',
                         account, role)
            return
        if member not in binding.members:
            logger.debug(
                'Noop: Service Account %s binding for role %s has no member %s',
                account, role, member)
            return

        updated_members: FrozenSet[str] = binding.members.difference({member})
        if updated_members:
            updated_binding: Policy.Binding = dataclasses.replace(  # pylint: disable=too-many-function-args
                binding,
                members=updated_members)
            updated_policy: Policy = _replace_binding(policy, binding,
                                                      updated_binding)
        else:
            # The IAM Policy reference requires every binding to have at least
            # one principal, so an emptied binding is dropped, not sent.
            updated_policy = dataclasses.replace(  # pylint: disable=too-many-function-args
                policy,
                bindings=policy.bindings.difference({binding}))
        self.set_service_account_iam_policy(account, updated_policy)
        logger.debug('Role %s revoked from member %s for Service Account %s',
                     role, member, account)
framework.infrastructure.gcp.iam.EtagConflict
Definition: iam.py:30
framework.infrastructure.gcp.iam.ServiceAccount.from_response
'ServiceAccount' from_response(cls, Dict[str, Any] response)
Definition: iam.py:76
framework.infrastructure.gcp.api.GcpProjectApiResource.resource_pretty_format
str resource_pretty_format(self, dict body)
Definition: api.py:365
framework.infrastructure.gcp.iam.Policy.Binding
Definition: iam.py:120
framework.infrastructure.gcp.api.GcpProjectApiResource
Definition: api.py:329
get
absl::string_view get(const Cont &c)
Definition: abseil-cpp/absl/strings/str_replace_test.cc:185
framework.infrastructure.gcp.iam.Policy.as_dict
Dict[str, Any] as_dict(self)
Definition: iam.py:170
framework.infrastructure.gcp.iam.IamV1.__init__
def __init__(self, gcp.api.GcpApiManager api_manager, str project)
Definition: iam.py:194
framework.infrastructure.gcp.iam.IamV1.get_service_account_iam_policy
Policy get_service_account_iam_policy(self, str account)
Definition: iam.py:222
framework.infrastructure.gcp.iam.ServiceAccount
Definition: iam.py:60
framework.infrastructure.gcp.iam.Expr
Definition: iam.py:91
framework.infrastructure.gcp.iam.handle_etag_conflict
def handle_etag_conflict(func)
Definition: iam.py:38
framework.infrastructure.gcp.iam._timedelta
_timedelta
Definition: iam.py:26
framework.helpers
Definition: tools/run_tests/xds_k8s_test_driver/framework/helpers/__init__.py:1
framework.infrastructure.gcp.api.ResponseError
Definition: api.py:218
framework.infrastructure.gcp.iam._replace_binding
'Policy' _replace_binding('Policy' policy, 'Policy.Binding' binding, 'Policy.Binding' new_binding)
Definition: iam.py:51
framework.infrastructure.gcp.iam.IamV1.service_account_resource_name
str service_account_resource_name(self, account)
Definition: iam.py:199
framework.infrastructure.gcp.iam.Expr.as_dict
Dict[str, Any] as_dict(self)
Definition: iam.py:106
framework.infrastructure.gcp.iam.Policy.Binding.from_response
'Policy.Binding' from_response(cls, Dict[str, Any] response)
Definition: iam.py:130
framework.infrastructure.gcp.iam.Policy.Binding.as_dict
Dict[str, Any] as_dict(self)
Definition: iam.py:140
framework.infrastructure.gcp.iam.Policy.find_binding_for_role
Optional[ 'Policy.Binding'] find_binding_for_role(self, str role, Optional[Expr] condition=None)
Definition: iam.py:154
framework.infrastructure
Definition: tools/run_tests/xds_k8s_test_driver/framework/infrastructure/__init__.py:1
framework.infrastructure.gcp.api.Error
Definition: api.py:214
framework.infrastructure.gcp.iam.Expr.from_response
'Expr' from_response(cls, Dict[str, Any] response)
Definition: iam.py:103
framework.infrastructure.gcp.iam.IamV1.set_service_account_iam_policy
Policy set_service_account_iam_policy(self, str account, Policy policy)
Definition: iam.py:232
framework.infrastructure.gcp.iam.Policy.from_response
'Policy' from_response(cls, Dict[str, Any] response)
Definition: iam.py:163
framework.infrastructure.gcp.iam.IamV1._service_accounts
_service_accounts
Definition: iam.py:197
next
AllocList * next[kMaxLevel]
Definition: abseil-cpp/absl/base/internal/low_level_alloc.cc:100
framework.infrastructure.gcp.iam.IamV1.get_service_account
ServiceAccount get_service_account(self, str account)
Definition: iam.py:214
framework.infrastructure.gcp.api.TransportError
Definition: api.py:245
cpp.gmock_class.set
set
Definition: bloaty/third_party/googletest/googlemock/scripts/generator/cpp/gmock_class.py:44
framework.infrastructure.gcp.iam.ServiceAccount.as_dict
Dict[str, Any] as_dict(self)
Definition: iam.py:86
framework.infrastructure.gcp.iam.Policy
Definition: iam.py:111
framework.infrastructure.gcp.iam.IamV1.remove_service_account_iam_policy_binding
None remove_service_account_iam_policy_binding(self, str account, str role, str member)
Definition: iam.py:284
framework.infrastructure.gcp.iam.IamV1.add_service_account_iam_policy_binding
None add_service_account_iam_policy_binding(self, str account, str role, str member)
Definition: iam.py:255
framework.infrastructure.gcp.api.GcpApiManager
Definition: api.py:64
framework.infrastructure.gcp.api.GcpProjectApiResource._execute
Dict[str, Any] _execute(self, HttpRequest request, *Optional[int] num_retries=_GCP_API_RETRIES)
Definition: api.py:342
framework.infrastructure.gcp.iam.IamV1
Definition: iam.py:180


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:00:14