xbackend/utils/casbin/enforcer.py
2021-08-23 17:42:18 +08:00

232 lines
7.8 KiB
Python

from utils.casbin.management_enforcer import ManagementEnforcer
from utils.casbin.util import join_slice, set_subtract
class Enforcer(ManagementEnforcer):
"""
Enforcer = ManagementEnforcer + RBAC_API + RBAC_WITH_DOMAIN_API
"""
"""creates an enforcer via file or DB.
File:
e = casbin.Enforcer("path/to/basic_model.conf", "path/to/basic_policy.csv")
MySQL DB:
a = mysqladapter.DBAdapter("mysql", "mysql_username:mysql_password@tcp(127.0.0.1:3306)/")
e = casbin.Enforcer("path/to/basic_model.conf", a)
"""
def get_roles_for_user(self, name):
""" gets the roles that a user has. """
return self.model.model["g"]["g"].rm.get_roles(name)
def get_users_for_role(self, name):
""" gets the users that has a role. """
return self.model.model["g"]["g"].rm.get_users(name)
def has_role_for_user(self, name, role):
""" determines whether a user has a role. """
roles = self.get_roles_for_user(name)
return any(r == role for r in roles)
def add_role_for_user(self, user, role):
"""
adds a role for a user.
Returns false if the user already has the role (aka not affected).
"""
return self.add_grouping_policy(user, role)
def delete_role_for_user(self, user, role):
"""
deletes a role for a user.
Returns false if the user does not have the role (aka not affected).
"""
return self.remove_grouping_policy(user, role)
def delete_roles_for_user(self, user):
"""
deletes all roles for a user.
Returns false if the user does not have any roles (aka not affected).
"""
return self.remove_filtered_grouping_policy(0, user)
def delete_user(self, user):
"""
deletes a user.
Returns false if the user does not exist (aka not affected).
"""
res1 = self.remove_filtered_grouping_policy(0, user)
res2 = self.remove_filtered_policy(0, user)
return res1 or res2
def delete_role(self, role):
"""
deletes a role.
Returns false if the role does not exist (aka not affected).
"""
res1 = self.remove_filtered_grouping_policy(1, role)
res2 = self.remove_filtered_policy(0, role)
return res1 or res2
def delete_permission(self, *permission):
"""
deletes a permission.
Returns false if the permission does not exist (aka not affected).
"""
return self.remove_filtered_policy(1, *permission)
def add_permission_for_user(self, user, *permission):
"""
adds a permission for a user or role.
Returns false if the user or role already has the permission (aka not affected).
"""
return self.add_policy(join_slice(user, *permission))
def delete_permission_for_user(self, user, *permission):
"""
deletes a permission for a user or role.
Returns false if the user or role does not have the permission (aka not affected).
"""
return self.remove_policy(join_slice(user, *permission))
def delete_permissions_for_user(self, user):
"""
deletes permissions for a user or role.
Returns false if the user or role does not have any permissions (aka not affected).
"""
return self.remove_filtered_policy(0, user)
def get_permissions_for_user(self, user):
"""
gets permissions for a user or role.
"""
return self.get_filtered_policy(0, user)
def has_permission_for_user(self, user, *permission):
"""
determines whether a user has a permission.
"""
return self.has_policy(join_slice(user, *permission))
def get_implicit_roles_for_user(self, name, domain=None):
"""
gets implicit roles that a user has.
Compared to get_roles_for_user(), this function retrieves indirect roles besides direct roles.
For example:
g, alice, role:admin
g, role:admin, role:user
get_roles_for_user("alice") can only get: ["role:admin"].
But get_implicit_roles_for_user("alice") will get: ["role:admin", "role:user"].
"""
res = []
queue = [name]
while queue:
name = queue.pop(0)
for rm in self.rm_map.values():
roles = rm.get_roles(name, domain)
for r in roles:
if r not in res:
res.append(r)
queue.append(r)
return res
def get_implicit_permissions_for_user(self, user, domain=None):
"""
gets implicit permissions for a user or role.
Compared to get_permissions_for_user(), this function retrieves permissions for inherited roles.
For example:
p, admin, data1, read
p, alice, data2, read
g, alice, admin
get_permissions_for_user("alice") can only get: [["alice", "data2", "read"]].
But get_implicit_permissions_for_user("alice") will get: [["admin", "data1", "read"], ["alice", "data2", "read"]].
"""
roles = self.get_implicit_roles_for_user(user, domain)
roles.insert(0, user)
res = []
for role in roles:
if domain:
permissions = self.get_permissions_for_user_in_domain(role, domain)
else:
permissions = self.get_permissions_for_user(role)
res.extend(permissions)
return res
def get_implicit_users_for_permission(self, *permission):
"""
gets implicit users for a permission.
For example:
p, admin, data1, read
p, bob, data1, read
g, alice, admin
get_implicit_users_for_permission("data1", "read") will get: ["alice", "bob"].
Note: only users will be returned, roles (2nd arg in "g") will be excluded.
"""
subjects = self.get_all_subjects()
roles = self.get_all_roles()
users = set_subtract(subjects, roles)
res = list()
for user in users:
req = join_slice(user, *permission)
allowed = self.enforce(*req)
if allowed:
res.append(user)
return res
def get_roles_for_user_in_domain(self, name, domain):
"""gets the roles that a user has inside a domain."""
return self.model.model['g']['g'].rm.get_roles(name, domain)
def get_users_for_role_in_domain(self, name, domain):
"""gets the users that has a role inside a domain."""
return self.model.model['g']['g'].rm.get_users(name, domain)
def add_role_for_user_in_domain(self, user, role, domain):
"""adds a role for a user inside a domain."""
"""Returns false if the user already has the role (aka not affected)."""
return self.add_grouping_policy(user, role, domain)
def delete_roles_for_user_in_domain(self, user, role, domain):
"""deletes a role for a user inside a domain."""
"""Returns false if the user does not have any roles (aka not affected)."""
return self.remove_filtered_grouping_policy(0, user, role, domain)
def get_permissions_for_user_in_domain(self, user, domain):
"""gets permissions for a user or role inside domain."""
return self.get_filtered_policy(0, user, domain)
def get_all_users_by_domain(self, domain):
"""获得所有与该域相关联的用户"""
data = self.get_filtered_grouping_policy(2, domain)
res = []
for item in data:
res.append({
'username': item[0],
'role_id': item[1]
})
return res
def get_domains_for_user(self, user):
"""获取用户拥有的所有域名"""
data = self.get_filtered_grouping_policy(0, user)
res = []
for item in data:
res.append(item[2])
return res