import pymongo
from fastapi import APIRouter, Depends, Request
from motor.motor_asyncio import AsyncIOMotorDatabase

import crud
import schemas
from core.config import settings
from core.security import get_password_hash
from db import get_database
from api import deps
from utils import casbin_enforcer

router = APIRouter()

# NOTE: the endpoint docstrings below are intentionally left untouched.
# FastAPI exposes an endpoint's docstring as the route ``description``, and
# ``api_list`` / ``all_role`` return that description to the client, so the
# docstring text is runtime data rather than plain documentation.


def _route_display_name(route):
    """Return the label reported for a route: its description if it has one, else its name."""
    return route.description if hasattr(route, 'description') else route.name


# GET /api_list
# Lists every route registered on the application as {'api': path, 'name': label}.
# Requires an authenticated user (deps.get_current_user).
@router.get("/api_list")
async def api_list(request: Request,
                   current_user: schemas.UserDB = Depends(deps.get_current_user)) -> schemas.Msg:
    """api 列表"""
    data = [
        {'api': route.path, 'name': _route_display_name(route)}
        for route in request.app.routes
    ]
    return schemas.Msg(code=0, msg='ok', data=data)


# POST /add_role
# Stores one 'g' rule ('g', 'root', <role_name>, None) followed by one
# 'p' rule ('p', <role_name>, <api>, '*') per entry in data_in.role_api.
@router.post("/add_role")
async def add_role(request: Request,
                   data_in: schemas.CasbinRoleCreate,
                   db: AsyncIOMotorDatabase = Depends(get_database),
                   current_user: schemas.UserDB = Depends(deps.get_current_user)
                   ) -> schemas.Msg:
    """创建角色"""
    role_name = data_in.role_name
    await crud.authority.create(db, ('g', 'root', role_name, None))

    for api_path in data_in.role_api:
        await crud.authority.create(db, ('p', role_name, api_path, '*'))

    return schemas.Msg(code=0, msg='ok')


# POST /add_account
# Creates a user with settings.DEFAULT_PASSWORD, then stores the 'g' rule
# binding the account name to data_in.role_name. Returns code -1 when the
# user insert raises DuplicateKeyError (name already taken).
@router.post("/add_account")
async def add_account(request: Request,
                      data_in: schemas.AccountCreate,
                      db: AsyncIOMotorDatabase = Depends(get_database),
                      current_user: schemas.UserDB = Depends(deps.get_current_user)
                      ) -> schemas.Msg:
    """创建账号 并设置角色"""
    account = schemas.UserCreate(name=data_in.name, password=settings.DEFAULT_PASSWORD)
    try:
        await crud.user.create(db, account)
    except pymongo.errors.DuplicateKeyError:
        return schemas.Msg(code=-1, msg='用户名已存在')

    # Only reached when the user was created, so no rule is written for a
    # rejected duplicate.
    await crud.authority.create(db, ('g', data_in.name, data_in.role_name, None))
    return schemas.Msg(code=0, msg='ok')


# GET /all_role
# Returns all roles known to the casbin enforcer plus, per role, the list of
# registered routes it may access as [{'path': ..., 'name': ...}, ...].
# A role only appears in 'permissions' if the enforcer reports at least one
# permission entry for it.
@router.get("/all_role")
async def all_role(request: Request,
                   db: AsyncIOMotorDatabase = Depends(get_database),
                   current_user: schemas.UserDB = Depends(deps.get_current_user)
                   ) -> schemas.Msg:
    """获取所有角色 和 角色权限"""
    # path -> label for every route registered on the application
    routes = {route.path: _route_display_name(route) for route in request.app.routes}

    roles = casbin_enforcer.get_all_roles()
    permissions = {}
    for role in roles:
        for _, path, _ in casbin_enforcer.get_permissions_for_user(role):
            granted = permissions.setdefault(role, [])
            if path == '*':
                # Wildcard: the role gets every registered route; anything
                # collected so far is superseded, so stop scanning.
                permissions[role] = [{'path': p, 'name': label} for p, label in routes.items()]
                break
            if path in routes:
                # Paths that are not registered routes are skipped.
                granted.append({'path': path, 'name': routes[path]})

    return schemas.Msg(code=0, msg='ok', data={'roles': roles, 'permissions': permissions})


# POST /set_role
# Replaces the role of an account: updates the casbin enforcer first, then
# mirrors the change through crud.authority.update_upsert.
@router.post("/set_role")
async def set_role(request: Request,
                   data_id: schemas.AccountSetRole,
                   db: AsyncIOMotorDatabase = Depends(get_database),
                   current_user: schemas.UserDB = Depends(deps.get_current_user)
                   ) -> schemas.Msg:
    """设置账号角色"""
    casbin_enforcer.delete_user(data_id.name)
    casbin_enforcer.add_role_for_user(data_id.name, data_id.role_name)

    # Fix: this call was previously not awaited. Every other crud call in this
    # module is awaited; an un-awaited coroutine never executes, so the role
    # change would not have been written through crud.
    # NOTE(review): the filter key is spelled 'prtype'; casbin rule storage
    # conventionally uses 'ptype' - confirm against the authority collection
    # schema before changing it.
    await crud.authority.update_upsert(db, {'prtype': 'g', 'v0': data_id.name}, v1=data_id.role_name)

    return schemas.Msg(code=0, msg='ok')

# @router.get("/delete_user")
# async def delete_user(request: Request,
#                       data_id: schemas.AccountDeleteUser,
#                       db: AsyncIOMotorDatabase = Depends(get_database),
#                       current_user: schemas.UserDB = Depends(deps.get_current_user)
#                       ) -> schemas.Msg:
#     pass
#     return schemas.Msg(code=0, msg='暂时没有')