aboutsummaryrefslogtreecommitdiff
path: root/google/api_core/iam.py
diff options
context:
space:
mode:
Diffstat (limited to 'google/api_core/iam.py')
-rw-r--r--google/api_core/iam.py427
1 files changed, 427 insertions, 0 deletions
diff --git a/google/api_core/iam.py b/google/api_core/iam.py
new file mode 100644
index 0000000..4437c70
--- /dev/null
+++ b/google/api_core/iam.py
@@ -0,0 +1,427 @@
+# Copyright 2017 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Non-API-specific IAM policy definitions
+
+For allowed roles / permissions, see:
+https://cloud.google.com/iam/docs/understanding-roles
+
+Example usage:
+
+.. code-block:: python
+
+ # ``get_iam_policy`` returns a :class:'~google.api_core.iam.Policy`.
+ policy = resource.get_iam_policy(requested_policy_version=3)
+
+ phred = "user:phred@example.com"
+ admin_group = "group:admins@groups.example.com"
+ account = "serviceAccount:account-1234@accounts.example.com"
+
+ policy.version = 3
+ policy.bindings = [
+ {
+ "role": "roles/owner",
+ "members": {phred, admin_group, account}
+ },
+ {
+ "role": "roles/editor",
+ "members": {"allAuthenticatedUsers"}
+ },
+ {
+ "role": "roles/viewer",
+ "members": {"allUsers"}
+ "condition": {
+ "title": "request_time",
+ "description": "Requests made before 2021-01-01T00:00:00Z",
+ "expression": "request.time < timestamp(\"2021-01-01T00:00:00Z\")"
+ }
+ }
+ ]
+
+ resource.set_iam_policy(policy)
+"""
+
+import collections
+import collections.abc
+import operator
+import warnings
+
+# Generic IAM roles
+
+OWNER_ROLE = "roles/owner"
+"""Generic role implying all rights to an object."""
+
+EDITOR_ROLE = "roles/editor"
+"""Generic role implying rights to modify an object."""
+
+VIEWER_ROLE = "roles/viewer"
+"""Generic role implying rights to access an object."""
+
+_ASSIGNMENT_DEPRECATED_MSG = """\
+Assigning to '{}' is deprecated. Use the `policy.bindings` property to modify bindings instead."""
+
+_DICT_ACCESS_MSG = """\
+Dict access is not supported on policies with version > 1 or with conditional bindings."""
+
+
+class InvalidOperationException(Exception):
+ """Raised when trying to use Policy class as a dict."""
+
+ pass
+
+
+class Policy(collections.abc.MutableMapping):
+ """IAM Policy
+
+ Args:
+ etag (Optional[str]): ETag used to identify a unique of the policy
+ version (Optional[int]): The syntax schema version of the policy.
+
+ Note:
+ Using conditions in bindings requires the policy's version to be set
+ to `3` or greater, depending on the versions that are currently supported.
+
+ Accessing the policy using dict operations will raise InvalidOperationException
+ when the policy's version is set to 3.
+
+ Use the policy.bindings getter/setter to retrieve and modify the policy's bindings.
+
+ See:
+ IAM Policy https://cloud.google.com/iam/reference/rest/v1/Policy
+ Policy versions https://cloud.google.com/iam/docs/policies#versions
+ Conditions overview https://cloud.google.com/iam/docs/conditions-overview.
+ """
+
+ _OWNER_ROLES = (OWNER_ROLE,)
+ """Roles mapped onto our ``owners`` attribute."""
+
+ _EDITOR_ROLES = (EDITOR_ROLE,)
+ """Roles mapped onto our ``editors`` attribute."""
+
+ _VIEWER_ROLES = (VIEWER_ROLE,)
+ """Roles mapped onto our ``viewers`` attribute."""
+
+ def __init__(self, etag=None, version=None):
+ self.etag = etag
+ self.version = version
+ self._bindings = []
+
+ def __iter__(self):
+ self.__check_version__()
+ # Exclude bindings with no members
+ return (binding["role"] for binding in self._bindings if binding["members"])
+
+ def __len__(self):
+ self.__check_version__()
+ # Exclude bindings with no members
+ return len(list(self.__iter__()))
+
+ def __getitem__(self, key):
+ self.__check_version__()
+ for b in self._bindings:
+ if b["role"] == key:
+ return b["members"]
+ # If the binding does not yet exist, create one
+ # NOTE: This will create bindings with no members
+ # which are ignored by __iter__ and __len__
+ new_binding = {"role": key, "members": set()}
+ self._bindings.append(new_binding)
+ return new_binding["members"]
+
+ def __setitem__(self, key, value):
+ self.__check_version__()
+ value = set(value)
+ for binding in self._bindings:
+ if binding["role"] == key:
+ binding["members"] = value
+ return
+ self._bindings.append({"role": key, "members": value})
+
+ def __delitem__(self, key):
+ self.__check_version__()
+ for b in self._bindings:
+ if b["role"] == key:
+ self._bindings.remove(b)
+ return
+ raise KeyError(key)
+
+ def __check_version__(self):
+ """Raise InvalidOperationException if version is greater than 1 or policy contains conditions."""
+ raise_version = self.version is not None and self.version > 1
+
+ if raise_version or self._contains_conditions():
+ raise InvalidOperationException(_DICT_ACCESS_MSG)
+
+ def _contains_conditions(self):
+ for b in self._bindings:
+ if b.get("condition") is not None:
+ return True
+ return False
+
+ @property
+ def bindings(self):
+ """The policy's list of bindings.
+
+ A binding is specified by a dictionary with keys:
+
+ * role (str): Role that is assigned to `members`.
+
+ * members (:obj:`set` of str): Specifies the identities associated to this binding.
+
+ * condition (:obj:`dict` of str:str): Specifies a condition under which this binding will apply.
+
+ * title (str): Title for the condition.
+
+ * description (:obj:str, optional): Description of the condition.
+
+ * expression: A CEL expression.
+
+ Type:
+ :obj:`list` of :obj:`dict`
+
+ See:
+ Policy versions https://cloud.google.com/iam/docs/policies#versions
+ Conditions overview https://cloud.google.com/iam/docs/conditions-overview.
+
+ Example:
+
+ .. code-block:: python
+
+ USER = "user:phred@example.com"
+ ADMIN_GROUP = "group:admins@groups.example.com"
+ SERVICE_ACCOUNT = "serviceAccount:account-1234@accounts.example.com"
+ CONDITION = {
+ "title": "request_time",
+ "description": "Requests made before 2021-01-01T00:00:00Z", # Optional
+ "expression": "request.time < timestamp(\"2021-01-01T00:00:00Z\")"
+ }
+
+ # Set policy's version to 3 before setting bindings containing conditions.
+ policy.version = 3
+
+ policy.bindings = [
+ {
+ "role": "roles/viewer",
+ "members": {USER, ADMIN_GROUP, SERVICE_ACCOUNT},
+ "condition": CONDITION
+ },
+ ...
+ ]
+ """
+ return self._bindings
+
+ @bindings.setter
+ def bindings(self, bindings):
+ self._bindings = bindings
+
+ @property
+ def owners(self):
+ """Legacy access to owner role.
+
+ Raise InvalidOperationException if version is greater than 1 or policy contains conditions.
+
+ DEPRECATED: use `policy.bindings` to access bindings instead.
+ """
+ result = set()
+ for role in self._OWNER_ROLES:
+ for member in self.get(role, ()):
+ result.add(member)
+ return frozenset(result)
+
+ @owners.setter
+ def owners(self, value):
+ """Update owners.
+
+ Raise InvalidOperationException if version is greater than 1 or policy contains conditions.
+
+ DEPRECATED: use `policy.bindings` to access bindings instead.
+ """
+ warnings.warn(
+ _ASSIGNMENT_DEPRECATED_MSG.format("owners", OWNER_ROLE), DeprecationWarning
+ )
+ self[OWNER_ROLE] = value
+
+ @property
+ def editors(self):
+ """Legacy access to editor role.
+
+ Raise InvalidOperationException if version is greater than 1 or policy contains conditions.
+
+ DEPRECATED: use `policy.bindings` to access bindings instead.
+ """
+ result = set()
+ for role in self._EDITOR_ROLES:
+ for member in self.get(role, ()):
+ result.add(member)
+ return frozenset(result)
+
+ @editors.setter
+ def editors(self, value):
+ """Update editors.
+
+ Raise InvalidOperationException if version is greater than 1 or policy contains conditions.
+
+ DEPRECATED: use `policy.bindings` to modify bindings instead.
+ """
+ warnings.warn(
+ _ASSIGNMENT_DEPRECATED_MSG.format("editors", EDITOR_ROLE),
+ DeprecationWarning,
+ )
+ self[EDITOR_ROLE] = value
+
+ @property
+ def viewers(self):
+ """Legacy access to viewer role.
+
+ Raise InvalidOperationException if version is greater than 1 or policy contains conditions.
+
+ DEPRECATED: use `policy.bindings` to modify bindings instead.
+ """
+ result = set()
+ for role in self._VIEWER_ROLES:
+ for member in self.get(role, ()):
+ result.add(member)
+ return frozenset(result)
+
+ @viewers.setter
+ def viewers(self, value):
+ """Update viewers.
+
+ Raise InvalidOperationException if version is greater than 1 or policy contains conditions.
+
+ DEPRECATED: use `policy.bindings` to modify bindings instead.
+ """
+ warnings.warn(
+ _ASSIGNMENT_DEPRECATED_MSG.format("viewers", VIEWER_ROLE),
+ DeprecationWarning,
+ )
+ self[VIEWER_ROLE] = value
+
+ @staticmethod
+ def user(email):
+ """Factory method for a user member.
+
+ Args:
+ email (str): E-mail for this particular user.
+
+ Returns:
+ str: A member string corresponding to the given user.
+ """
+ return "user:%s" % (email,)
+
+ @staticmethod
+ def service_account(email):
+ """Factory method for a service account member.
+
+ Args:
+ email (str): E-mail for this particular service account.
+
+ Returns:
+ str: A member string corresponding to the given service account.
+
+ """
+ return "serviceAccount:%s" % (email,)
+
+ @staticmethod
+ def group(email):
+ """Factory method for a group member.
+
+ Args:
+ email (str): An id or e-mail for this particular group.
+
+ Returns:
+ str: A member string corresponding to the given group.
+ """
+ return "group:%s" % (email,)
+
+ @staticmethod
+ def domain(domain):
+ """Factory method for a domain member.
+
+ Args:
+ domain (str): The domain for this member.
+
+ Returns:
+ str: A member string corresponding to the given domain.
+ """
+ return "domain:%s" % (domain,)
+
+ @staticmethod
+ def all_users():
+ """Factory method for a member representing all users.
+
+ Returns:
+ str: A member string representing all users.
+ """
+ return "allUsers"
+
+ @staticmethod
+ def authenticated_users():
+ """Factory method for a member representing all authenticated users.
+
+ Returns:
+ str: A member string representing all authenticated users.
+ """
+ return "allAuthenticatedUsers"
+
+ @classmethod
+ def from_api_repr(cls, resource):
+ """Factory: create a policy from a JSON resource.
+
+ Args:
+ resource (dict): policy resource returned by ``getIamPolicy`` API.
+
+ Returns:
+ :class:`Policy`: the parsed policy
+ """
+ version = resource.get("version")
+ etag = resource.get("etag")
+ policy = cls(etag, version)
+ policy.bindings = resource.get("bindings", [])
+
+ for binding in policy.bindings:
+ binding["members"] = set(binding.get("members", ()))
+
+ return policy
+
+ def to_api_repr(self):
+ """Render a JSON policy resource.
+
+ Returns:
+ dict: a resource to be passed to the ``setIamPolicy`` API.
+ """
+ resource = {}
+
+ if self.etag is not None:
+ resource["etag"] = self.etag
+
+ if self.version is not None:
+ resource["version"] = self.version
+
+ if self._bindings and len(self._bindings) > 0:
+ bindings = []
+ for binding in self._bindings:
+ members = binding.get("members")
+ if members:
+ new_binding = {"role": binding["role"], "members": sorted(members)}
+ condition = binding.get("condition")
+ if condition:
+ new_binding["condition"] = condition
+ bindings.append(new_binding)
+
+ if bindings:
+ # Sort bindings by role
+ key = operator.itemgetter("role")
+ resource["bindings"] = sorted(bindings, key=key)
+
+ return resource