import pymongo
from fastapi import APIRouter, Depends, Request
from motor.motor_asyncio import AsyncIOMotorDatabase

import crud
import schemas
from core.config import settings
from core.security import get_password_hash
from db import get_database
from api import deps
from db.ckdb import CKDrive, get_ck_db
from utils import casbin_enforcer

router = APIRouter()

# NOTE: the endpoint docstrings in this module are runtime data, not just
# documentation. FastAPI exposes an endpoint's docstring as
# ``route.description``, which this module reads back (see ``_route_name``),
# returns to clients from ``/api_list`` and ``/all_role`` and stores as
# ``api_name`` in ``/add_role`` and ``/add_sys_role``. They are therefore
# kept byte-for-byte; English explanations live in ``#`` comments instead.


def _route_title(route):
    """Return the first tag of an API route, or ``None`` when there is none.

    Routes without a ``description`` attribute are not treated as API routes
    and yield ``None``. A route whose ``tags`` is missing or empty also yields
    ``None`` instead of raising ``IndexError``.
    """
    if not hasattr(route, 'description'):
        return None
    tags = getattr(route, 'tags', None)
    return tags[0] if tags else None


def _route_name(route):
    """Return the display name of a route: its description if it has one, else its name."""
    return route.description if hasattr(route, 'description') else route.name


def _api_names(app):
    """Map every route path of ``app`` to its display name."""
    return {route.path: _route_name(route) for route in app.routes}


async def _roles_with_authority(db, role_game, dom, api_data):
    """Load the roles stored under ``role_game`` together with their authority in ``dom``.

    Returns a list of dicts with the keys ``role``, ``title``, ``id`` and
    ``authority``; ``authority`` is a list of ``{'title': ..., 'child': ...}``
    entries built from ``crud.authority.get_role_dom_authority``.
    """
    roles = await crud.authority.find_many(db, {'role_name': {'$exists': 1}, 'game': role_game})
    result = []
    for item in roles:
        entry = {'role': item['v1'], 'title': item['role_name'], 'id': str(item['_id'])}
        authority = await crud.authority.get_role_dom_authority(db, entry['role'], dom=dom, api_data=api_data)
        entry['authority'] = [{'title': k, 'child': v} for k, v in authority.items()]
        result.append(entry)
    return result


@router.get("/api_list")
async def api_list(request: Request,
                   current_user: schemas.UserDB = Depends(deps.get_current_user)) -> schemas.Msg:
    """api 列表"""
    # List the application's API routes grouped by their first tag.
    # Response data: [{'title': <tag>, 'list': [{'api': <path>, 'title': <name>}, ...]}, ...]
    grouped = {}
    for route in request.app.routes:
        title = _route_title(route)
        if not title:
            # Untagged or non-API routes are not listed.
            continue
        grouped.setdefault(title, []).append({'api': route.path, 'title': _route_name(route)})

    res = [{'title': title, 'list': apis} for title, apis in grouped.items()]
    return schemas.Msg(code=0, msg='ok', data=res)


@router.post('/set_data_auth')
async def set_data_auth(request: Request,
                        data_id: schemas.DataAuthSet,
                        game: str = Depends(deps.get_game_project),
                        db: AsyncIOMotorDatabase = Depends(get_database),
                        current_user: schemas.UserDB = Depends(deps.get_current_user)
                        ) -> schemas.Msg:
    """设置用户数据权限"""
    # Store the data-authority assignment for the current game and echo the
    # request payload back to the caller.
    await crud.authority.set_data_auth(db, data_id, game=game)
    return schemas.Msg(code=0, msg='ok', data=data_id)


@router.get('/get_user_data_auth')
async def get_user_data_auth(request: Request,
                             game: str = Depends(deps.get_game_project),
                             db: AsyncIOMotorDatabase = Depends(get_database),
                             ck: CKDrive = Depends(get_ck_db),
                             current_user: schemas.UserDB = Depends(deps.get_current_user)
                             ) -> schemas.Msg:
    """获取当前用户数据权限"""
    # Return the data authority of the requesting user for the current game.
    data_auth = await crud.authority.get_data_auth(db, username=request.user.name, game=game)
    if not data_auth:
        # No explicit assignment: fall back to the distinct '#event_name'
        # values returned by the CK driver for this game.
        values = await ck.distinct(game, 'event', '#event_name')
        return schemas.Msg(code=0, msg='ok', data={
            'data': values,
            'game': game,
            'name': '全部事件'
        })

    data_auth_id = data_auth['data_auth_id']
    # NOTE(review): unlike the other crud calls in this module, ``db`` is not
    # passed here - confirm against the signature of ``crud.data_auth.get``.
    data = await crud.data_auth.get(data_auth_id)
    return schemas.Msg(code=0, msg='ok', data=data)


# Disabled endpoint, kept for reference.
# @router.get('/get_users_data_auth')
# async def get_users_data_auth(request: Request,
#                               game: str = Depends(deps.get_game_project),
#                               db: AsyncIOMotorDatabase = Depends(get_database),
#                               ck: CKDrive = Depends(get_ck_db),
#                               current_user: schemas.UserDB = Depends(deps.get_current_user)
#                               ) -> schemas.Msg:
#     """获取当前项目所有用户数据权限"""
#
#     roles = await crud.authority.find_many(db, ptype='g', v2=game)
#     for item in roles:
#         user = item['v0']
#         data_auth = await crud.authority.get_data_auth(db, username=request.user.name, game=game)
#         if not data_auth:
#             values = await ck.distinct(game, 'event', '#event_name')
#             return schemas.Msg(code=0, msg='ok', data={
#                 'data': values,
#                 'game': game,
#                 'name': '全部事件'
#             })
#         data_auth_id = data_auth['data_auth_id']
#         data = await crud.data_auth.get(data_auth_id)
#     return schemas.Msg(code=0, msg='ok', data=data)
#
#     # data_auth = await crud.authority.get_data_auth(db, username=request.user.name, game=game)
#     # if not data_auth:
#     #     values = await ck.distinct(game, 'event', '#event_name')
#     #     return schemas.Msg(code=0, msg='ok', data={
#     #         'data': values,
#     #         'game': game,
#     #         'name': '全部事件'
#     #     })
#     # data_auth_id = data_auth['data_auth_id']
#     # data = await crud.data_auth.get(data_auth_id)
#     return schemas.Msg(code=0, msg='ok')


@router.post("/add_role")
async def add_role(request: Request,
                   data_in: schemas.CasbinRoleCreate,
                   game: str = Depends(deps.get_game_project),
                   db: AsyncIOMotorDatabase = Depends(get_database),
                   current_user: schemas.UserDB = Depends(deps.get_current_user)
                   ) -> schemas.Msg:
    """创建角色"""
    # Create a role scoped to the current game (the casbin domain).
    # A role name must not collide with an existing user name.
    if await crud.user.get_by_user(db, name=data_in.role_name):
        return schemas.Msg(code=-1, msg='请改个名字')

    role_dom = game
    api_dict = _api_names(request.app)

    # Grant the role each requested API and record the API's display name on
    # the matching policy document.
    for obj in data_in.role_api:
        casbin_enforcer.add_policy(data_in.role_name, role_dom, obj, '*')
        await crud.authority.update_one(db, {'ptype': 'p', 'v0': data_in.role_name, 'v1': role_dom, 'v2': obj},
                                        {'$set': {'api_name': api_dict.get(obj)}})

    # The superuser owns every role by default, which also makes the role
    # easy to read back from the database.
    await crud.authority.create(db, 'g', settings.SUPERUSER_NAME, data_in.role_name, role_dom, '*',
                                role_name=data_in.role_name,
                                game=role_dom)

    return schemas.Msg(code=0, msg='ok')


@router.post("/add_sys_role")
async def add_sys_role(request: Request,
                       data_in: schemas.CasbinRoleCreate,
                       game: str = Depends(deps.get_game_project),
                       db: AsyncIOMotorDatabase = Depends(get_database),
                       current_user: schemas.UserDB = Depends(deps.get_current_user)
                       ) -> schemas.Msg:
    """创建系统角色"""
    # Create a system role, i.e. a role registered under the '*' domain.
    # A role name must not collide with an existing user name.
    if await crud.user.get_by_user(db, name=data_in.role_name):
        return schemas.Msg(code=-1, msg='请改个名字')

    api_dict = _api_names(request.app)

    # Grant the role each requested API and persist the policy together with
    # the API's display name.
    for obj in data_in.role_api:
        casbin_enforcer.add_policy(data_in.role_name, '*', obj, '*')
        await crud.authority.create(db, 'p', data_in.role_name, '*', obj, '*', api_name=api_dict.get(obj))

    # The superuser owns every role by default, which also makes the role
    # easy to read back from the database.
    await crud.authority.create(db, 'g', settings.SUPERUSER_NAME, data_in.role_name,
                                role_name=data_in.role_name,
                                game='*')

    return schemas.Msg(code=0, msg='ok')


@router.post("/add_account")
async def add_account(request: Request,
                      data_in: schemas.AccountsCreate,
                      game: str = Depends(deps.get_game_project),
                      db: AsyncIOMotorDatabase = Depends(get_database),
                      current_user: schemas.UserDB = Depends(deps.get_current_user)
                      ) -> schemas.Msg:
    """添加账号"""
    # Create the requested accounts with the default password, bind each one
    # to its role and data authority in the current game, then add them all
    # to the project.
    accounts = {item.username for item in data_in.accounts}

    # A user name may neither duplicate a role name nor an existing user.
    roles = casbin_enforcer.get_all_roles()
    exists_user = await crud.user.get_all_user(db)
    taken = accounts & (set(roles) | set(exists_user))
    if taken:
        return schemas.Msg(code=-1, msg='已存在', data=list(taken))

    # Create each account and assign its role.
    for item in data_in.accounts:
        account = schemas.UserCreate(name=item.username, password=settings.DEFAULT_PASSWORD)
        try:
            await crud.user.create(db, account)
        except pymongo.errors.DuplicateKeyError:
            # NOTE: accounts created in earlier iterations are not rolled back.
            return schemas.Msg(code=-1, msg='用户名已存在')

        casbin_enforcer.add_grouping_policy(item.username, item.role_name, game)
        # Set the account's data authority.
        await crud.authority.set_data_auth(db,
                                           schemas.DataAuthSet(username=item.username,
                                                               data_auth_id=item.data_auth_id),
                                           game)

    # Add every new account to the project members.
    await crud.project.add_members(db,
                                   schemas.ProjectMember(project_id=data_in.project_id, members=list(accounts)))

    return schemas.Msg(code=0, msg='ok')


@router.get("/all_role")
async def all_role(request: Request,
                   db: AsyncIOMotorDatabase = Depends(get_database),
                   game: str = Depends(deps.get_game_project),
                   current_user: schemas.UserDB = Depends(deps.get_current_user)
                   ) -> schemas.Msg:
    """获取所有角色"""
    # Return the roles of the current game ('dom_role') and the system roles
    # ('sys_role'), each with its API authority.
    api_data = {}
    for route in request.app.routes:
        title = _route_title(route)
        if not title:
            # Untagged or non-API routes carry no authority information.
            continue
        api_data[route.path] = {
            'api': route.path,
            'title': title,
            'name': _route_name(route)
        }

    # Roles defined inside the current game (domain).
    dom_data = await _roles_with_authority(db, game, game, api_data)
    # System roles are stored under the '*' game; their authority is still
    # resolved against the current game.
    sys_data = await _roles_with_authority(db, '*', game, api_data)

    data = {
        'dom_role': dom_data,
        'sys_role': sys_data
    }
    return schemas.Msg(code=0, msg='ok', data=data)


# Disabled endpoint, kept for reference.
# @router.post("/set_role")
# async def set_role(request: Request,
#                    data_id: schemas.AccountSetRole,
#                    db: AsyncIOMotorDatabase = Depends(get_database),
#                    current_user: schemas.UserDB = Depends(deps.get_current_user)
#                    ) -> schemas.Msg:
#     """设置账号角色"""
#     casbin_enforcer.delete_user(data_id.name)
#     casbin_enforcer.add_role_for_user(data_id.name, data_id.role_name)
#     await crud.authority.update_one(db, {'ptype': 'g', 'v0': data_id.name}, dict(v1=data_id.role_name))
#
#     return schemas.Msg(code=0, msg='ok')


# Disabled endpoint, kept for reference.
# @router.get("/delete_user")
# async def delete_user(request: Request,
#                       data_id: schemas.AccountDeleteUser,
#                       db: AsyncIOMotorDatabase = Depends(get_database),
#                       current_user: schemas.UserDB = Depends(deps.get_current_user)
#                       ) -> schemas.Msg:
#     pass
#     return schemas.Msg(code=0, msg='暂时没有')