232 lines
7.8 KiB
Python
232 lines
7.8 KiB
Python
from utils.casbin.management_enforcer import ManagementEnforcer
|
|
from utils.casbin.util import join_slice, set_subtract
|
|
|
|
|
|
class Enforcer(ManagementEnforcer):
|
|
"""
|
|
Enforcer = ManagementEnforcer + RBAC_API + RBAC_WITH_DOMAIN_API
|
|
"""
|
|
|
|
"""creates an enforcer via file or DB.
|
|
|
|
File:
|
|
e = casbin.Enforcer("path/to/basic_model.conf", "path/to/basic_policy.csv")
|
|
MySQL DB:
|
|
a = mysqladapter.DBAdapter("mysql", "mysql_username:mysql_password@tcp(127.0.0.1:3306)/")
|
|
e = casbin.Enforcer("path/to/basic_model.conf", a)
|
|
"""
|
|
|
|
def get_roles_for_user(self, name):
|
|
""" gets the roles that a user has. """
|
|
return self.model.model["g"]["g"].rm.get_roles(name)
|
|
|
|
def get_users_for_role(self, name):
|
|
""" gets the users that has a role. """
|
|
return self.model.model["g"]["g"].rm.get_users(name)
|
|
|
|
def has_role_for_user(self, name, role):
|
|
""" determines whether a user has a role. """
|
|
roles = self.get_roles_for_user(name)
|
|
return any(r == role for r in roles)
|
|
|
|
def add_role_for_user(self, user, role):
|
|
"""
|
|
adds a role for a user.
|
|
Returns false if the user already has the role (aka not affected).
|
|
"""
|
|
return self.add_grouping_policy(user, role)
|
|
|
|
def delete_role_for_user(self, user, role):
|
|
"""
|
|
deletes a role for a user.
|
|
Returns false if the user does not have the role (aka not affected).
|
|
"""
|
|
return self.remove_grouping_policy(user, role)
|
|
|
|
def delete_roles_for_user(self, user):
|
|
"""
|
|
deletes all roles for a user.
|
|
Returns false if the user does not have any roles (aka not affected).
|
|
"""
|
|
return self.remove_filtered_grouping_policy(0, user)
|
|
|
|
def delete_user(self, user):
|
|
"""
|
|
deletes a user.
|
|
Returns false if the user does not exist (aka not affected).
|
|
"""
|
|
res1 = self.remove_filtered_grouping_policy(0, user)
|
|
|
|
res2 = self.remove_filtered_policy(0, user)
|
|
return res1 or res2
|
|
|
|
def delete_role(self, role):
|
|
"""
|
|
deletes a role.
|
|
Returns false if the role does not exist (aka not affected).
|
|
"""
|
|
res1 = self.remove_filtered_grouping_policy(1, role)
|
|
|
|
res2 = self.remove_filtered_policy(0, role)
|
|
return res1 or res2
|
|
|
|
def delete_permission(self, *permission):
|
|
"""
|
|
deletes a permission.
|
|
Returns false if the permission does not exist (aka not affected).
|
|
"""
|
|
return self.remove_filtered_policy(1, *permission)
|
|
|
|
def add_permission_for_user(self, user, *permission):
|
|
"""
|
|
adds a permission for a user or role.
|
|
Returns false if the user or role already has the permission (aka not affected).
|
|
"""
|
|
return self.add_policy(join_slice(user, *permission))
|
|
|
|
def delete_permission_for_user(self, user, *permission):
|
|
"""
|
|
deletes a permission for a user or role.
|
|
Returns false if the user or role does not have the permission (aka not affected).
|
|
"""
|
|
return self.remove_policy(join_slice(user, *permission))
|
|
|
|
def delete_permissions_for_user(self, user):
|
|
"""
|
|
deletes permissions for a user or role.
|
|
Returns false if the user or role does not have any permissions (aka not affected).
|
|
"""
|
|
return self.remove_filtered_policy(0, user)
|
|
|
|
def get_permissions_for_user(self, user):
|
|
"""
|
|
gets permissions for a user or role.
|
|
"""
|
|
return self.get_filtered_policy(0, user)
|
|
|
|
def has_permission_for_user(self, user, *permission):
|
|
"""
|
|
determines whether a user has a permission.
|
|
"""
|
|
return self.has_policy(join_slice(user, *permission))
|
|
|
|
def get_implicit_roles_for_user(self, name, domain=None):
|
|
"""
|
|
gets implicit roles that a user has.
|
|
Compared to get_roles_for_user(), this function retrieves indirect roles besides direct roles.
|
|
For example:
|
|
g, alice, role:admin
|
|
g, role:admin, role:user
|
|
|
|
get_roles_for_user("alice") can only get: ["role:admin"].
|
|
But get_implicit_roles_for_user("alice") will get: ["role:admin", "role:user"].
|
|
"""
|
|
res = []
|
|
queue = [name]
|
|
|
|
while queue:
|
|
name = queue.pop(0)
|
|
|
|
for rm in self.rm_map.values():
|
|
roles = rm.get_roles(name, domain)
|
|
for r in roles:
|
|
if r not in res:
|
|
res.append(r)
|
|
queue.append(r)
|
|
|
|
return res
|
|
|
|
def get_implicit_permissions_for_user(self, user, domain=None):
|
|
"""
|
|
gets implicit permissions for a user or role.
|
|
Compared to get_permissions_for_user(), this function retrieves permissions for inherited roles.
|
|
For example:
|
|
p, admin, data1, read
|
|
p, alice, data2, read
|
|
g, alice, admin
|
|
|
|
get_permissions_for_user("alice") can only get: [["alice", "data2", "read"]].
|
|
But get_implicit_permissions_for_user("alice") will get: [["admin", "data1", "read"], ["alice", "data2", "read"]].
|
|
"""
|
|
roles = self.get_implicit_roles_for_user(user, domain)
|
|
|
|
roles.insert(0, user)
|
|
|
|
res = []
|
|
for role in roles:
|
|
if domain:
|
|
permissions = self.get_permissions_for_user_in_domain(role, domain)
|
|
else:
|
|
permissions = self.get_permissions_for_user(role)
|
|
|
|
res.extend(permissions)
|
|
|
|
return res
|
|
|
|
def get_implicit_users_for_permission(self, *permission):
|
|
"""
|
|
gets implicit users for a permission.
|
|
For example:
|
|
p, admin, data1, read
|
|
p, bob, data1, read
|
|
g, alice, admin
|
|
|
|
get_implicit_users_for_permission("data1", "read") will get: ["alice", "bob"].
|
|
Note: only users will be returned, roles (2nd arg in "g") will be excluded.
|
|
"""
|
|
subjects = self.get_all_subjects()
|
|
roles = self.get_all_roles()
|
|
|
|
users = set_subtract(subjects, roles)
|
|
|
|
res = list()
|
|
for user in users:
|
|
req = join_slice(user, *permission)
|
|
allowed = self.enforce(*req)
|
|
|
|
if allowed:
|
|
res.append(user)
|
|
|
|
return res
|
|
|
|
def get_roles_for_user_in_domain(self, name, domain):
|
|
"""gets the roles that a user has inside a domain."""
|
|
return self.model.model['g']['g'].rm.get_roles(name, domain)
|
|
|
|
def get_users_for_role_in_domain(self, name, domain):
|
|
"""gets the users that has a role inside a domain."""
|
|
return self.model.model['g']['g'].rm.get_users(name, domain)
|
|
|
|
def add_role_for_user_in_domain(self, user, role, domain):
|
|
"""adds a role for a user inside a domain."""
|
|
"""Returns false if the user already has the role (aka not affected)."""
|
|
return self.add_grouping_policy(user, role, domain)
|
|
|
|
def delete_roles_for_user_in_domain(self, user, role, domain):
|
|
"""deletes a role for a user inside a domain."""
|
|
"""Returns false if the user does not have any roles (aka not affected)."""
|
|
return self.remove_filtered_grouping_policy(0, user, role, domain)
|
|
|
|
def get_permissions_for_user_in_domain(self, user, domain):
|
|
"""gets permissions for a user or role inside domain."""
|
|
return self.get_filtered_policy(0, user, domain)
|
|
|
|
def get_all_users_by_domain(self, domain):
|
|
"""获得所有与该域相关联的用户"""
|
|
data = self.get_filtered_grouping_policy(2, domain)
|
|
res = []
|
|
for item in data:
|
|
res.append({
|
|
'username': item[0],
|
|
'role_id': item[1]
|
|
})
|
|
return res
|
|
|
|
def get_domains_for_user(self, user):
|
|
"""获取用户拥有的所有域名"""
|
|
data = self.get_filtered_grouping_policy(0, user)
|
|
res = []
|
|
for item in data:
|
|
res.append(item[2])
|
|
return res
|