18 from typing
import Any, Dict, FrozenSet, Optional
# Module-level logger, registered under this module's __name__.
23 logger = logging.getLogger(__name__)
# Private module-level aliases for datetime.timedelta and gcp.api.HttpRequest.
26 _timedelta = datetime.timedelta
27 _HttpRequest = gcp.api.HttpRequest
32 Indicates concurrent policy changes.
34 https://cloud.google.com/iam/docs/policies#etag
40 def wrap_retry_on_etag_conflict(*args, **kwargs):
41 retryer = retryers.exponential_retryer_with_timeout(
46 return retryer(func, *args, **kwargs)
48 return wrap_retry_on_etag_conflict
52 new_binding:
'Policy.Binding') ->
'Policy':
53 new_bindings =
set(policy.bindings)
54 new_bindings.discard(binding)
55 new_bindings.add(new_binding)
56 return dataclasses.replace(policy, bindings=frozenset(new_bindings))
59 @dataclasses.dataclass(frozen=
True)
61 """An IAM service account.
63 https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts
64 Note: "etag" field is skipped because it's deprecated
73 disabled: bool =
False
77 return cls(name=response[
'name'],
78 projectId=response[
'projectId'],
79 uniqueId=response[
'uniqueId'],
80 email=response[
'email'],
81 oauth2ClientId=response[
'oauth2ClientId'],
82 description=response.get(
'description',
''),
83 displayName=response.get(
'displayName',
''),
84 disabled=response.get(
'disabled',
False))
87 return dataclasses.asdict(self)
90 @dataclasses.dataclass(frozen=
True)
93 Represents a textual expression in the Common Expression Language syntax.
95 https://cloud.google.com/iam/docs/reference/rest/v1/Expr
104 return cls(**response)
107 return dataclasses.asdict(self)
110 @dataclasses.dataclass(frozen=
True)
112 """An Identity and Access Management (IAM) policy, which specifies
113 access controls for Google Cloud resources.
115 https://cloud.google.com/iam/docs/reference/rest/v1/Policy
116 Note: auditConfigs not supported by this implementation.
119 @dataclasses.dataclass(frozen=
True)
121 """Policy Binding. Associates members with a role.
123 https://cloud.google.com/iam/docs/reference/rest/v1/Policy#binding
126 members: FrozenSet[str]
127 condition: Optional[Expr] =
None
132 'role': response[
'role'],
133 'members': frozenset(response.get(
'members', [])),
135 if 'condition' in response:
136 fields[
'condition'] = Expr.from_response(response[
'condition'])
143 'members': list(self.members),
145 if self.condition
is not None:
146 result[
'condition'] = self.condition.
as_dict()
149 bindings: FrozenSet[Binding]
151 version: Optional[int] =
None
153 @functools.lru_cache(maxsize=128)
157 condition: Optional[Expr] =
None) -> Optional[
'Policy.Binding']:
158 results = (binding
for binding
in self.bindings
159 if binding.role == role
and binding.condition == condition)
160 return next(results,
None)
164 bindings = frozenset(
166 return cls(bindings=bindings,
167 etag=response[
'etag'],
168 version=response.get(
'version'))
172 'bindings': [binding.as_dict()
for binding
in self.bindings],
175 if self.version
is not None:
176 result[
'version'] = self.version
182 Identity and Access Management (IAM) API.
184 https://cloud.google.com/iam/docs/reference/rest
186 _service_accounts: gcp.api.discovery.Resource
192 POLICY_VERSION: int = 3
195 super().
__init__(api_manager.iam(
'v1'), project)
201 Returns full resource name of the service account.
203 The resource name of the service account in the following format:
204 projects/{PROJECT_ID}/serviceAccounts/{ACCOUNT}.
205 The ACCOUNT value can be the email address or the uniqueId of the
207 Ref https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts/get
210 account: The ACCOUNT value
212 return f
'projects/{self.project}/serviceAccounts/{account}'
217 response: Dict[str, Any] = self.
_execute(request)
218 logger.debug(
'Loaded Service Account:\n%s',
220 return ServiceAccount.from_response(response)
225 resource=resource_name,
226 options_requestedPolicyVersion=self.POLICY_VERSION)
227 response: Dict[str, Any] = self.
_execute(request)
228 logger.debug(
'Loaded Service Account Policy:\n%s',
230 return Policy.from_response(response)
233 policy: Policy) -> Policy:
234 """Sets the IAM policy that is attached to a service account.
236 https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts/setIamPolicy
239 body = {
'policy': policy.as_dict()}
240 logger.debug(
'Updating Service Account %s policy:\n%s', account,
244 resource=resource_name, body=body)
245 response: Dict[str, Any] = self.
_execute(request)
246 return Policy.from_response(response)
248 if error.status == 409:
251 raise EtagConflict
from error
254 @handle_etag_conflict
256 member: str) ->
None:
257 """Add an IAM policy binding to an IAM service account.
259 See for details on updating policy bindings:
260 https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts/setIamPolicy
263 binding: Optional[
Policy.Binding] = policy.find_binding_for_role(role)
264 if binding
and member
in binding.members:
265 logger.debug(
'Member %s already has role %s for Service Account %s',
266 member, role, account)
272 updated_members: FrozenSet[str] = binding.members.union({member})
275 members=updated_members)
280 logger.debug(
'Role %s granted to member %s for Service Account %s',
281 role, member, account)
283 @handle_etag_conflict
285 member: str) ->
None:
286 """Remove an IAM policy binding from the IAM policy of a service
289 See for details on updating policy bindings:
290 https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts/setIamPolicy
293 binding: Optional[
Policy.Binding] = policy.find_binding_for_role(role)
296 logger.debug(
'Noop: Service Account %s has no bindings for role %s',
299 if member
not in binding.members:
301 'Noop: Service Account %s binding for role %s has no member %s',
302 account, role, member)
305 updated_members: FrozenSet[str] = binding.members.difference({member})
308 members=updated_members)
312 logger.debug(
'Role %s revoked from member %s for Service Account %s',
313 role, member, account)