133 lines
4.3 KiB
Python
133 lines
4.3 KiB
Python
import pymongo
|
|
from fastapi import APIRouter, Depends, Request
|
|
from motor.motor_asyncio import AsyncIOMotorDatabase
|
|
import crud, schemas
|
|
from core.config import settings
|
|
from core.security import get_password_hash
|
|
|
|
from db import get_database
|
|
from api import deps
|
|
from utils import casbin_enforcer
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
@router.get("/api_list")
|
|
async def api_list(request: Request, current_user: schemas.UserDB = Depends(deps.get_current_user)) -> schemas.Msg:
|
|
"""api 列表"""
|
|
app = request.app
|
|
data = []
|
|
for r in app.routes:
|
|
path = r.path
|
|
name = r.description if hasattr(r, 'description') else r.name
|
|
data.append({'api': path, 'name': name})
|
|
return schemas.Msg(code=0, msg='ok', data=data)
|
|
|
|
|
|
@router.post("/add_role")
|
|
async def add_role(request: Request, data_in: schemas.CasbinRoleCreate,
|
|
db: AsyncIOMotorDatabase = Depends(get_database),
|
|
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
|
) -> schemas.Msg:
|
|
"""创建角色"""
|
|
role = (
|
|
'g',
|
|
'root',
|
|
data_in.role_name,
|
|
None
|
|
)
|
|
await crud.authority.create(db, role)
|
|
|
|
for item in data_in.role_api:
|
|
await crud.authority.create(db, (
|
|
'p',
|
|
data_in.role_name,
|
|
item,
|
|
'*'
|
|
))
|
|
return schemas.Msg(code=0, msg='ok')
|
|
|
|
|
|
@router.post("/add_account")
|
|
async def add_account(request: Request,
|
|
data_in: schemas.AccountCreate,
|
|
db: AsyncIOMotorDatabase = Depends(get_database),
|
|
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
|
) -> schemas.Msg:
|
|
"""创建账号 并设置角色"""
|
|
|
|
account = schemas.UserCreate(name=data_in.name, password=settings.DEFAULT_PASSWORD)
|
|
try:
|
|
await crud.user.create(db, account)
|
|
except pymongo.errors.DuplicateKeyError:
|
|
return schemas.Msg(code=-1, msg='用户名已存在')
|
|
rule = (
|
|
'g',
|
|
data_in.name,
|
|
data_in.role_name,
|
|
None
|
|
)
|
|
await crud.authority.create(db, rule)
|
|
|
|
return schemas.Msg(code=0, msg='ok')
|
|
|
|
|
|
@router.get("/all_role")
|
|
async def all_role(request: Request,
|
|
db: AsyncIOMotorDatabase = Depends(get_database),
|
|
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
|
) -> schemas.Msg:
|
|
"""获取所有角色 和 角色权限"""
|
|
routes = {}
|
|
for item in request.app.routes:
|
|
routes[item.path] = item.description if hasattr(item, 'description') else item.name
|
|
roles = casbin_enforcer.get_all_roles()
|
|
permissions = {}
|
|
for role in roles:
|
|
for _, path, _ in casbin_enforcer.get_permissions_for_user(role):
|
|
permissions.setdefault(role, [])
|
|
if path == '*':
|
|
permissions[role].clear()
|
|
|
|
permissions[role] = [{
|
|
'path': k,
|
|
'name': v
|
|
} for k, v in routes.items()]
|
|
break
|
|
|
|
if path in routes:
|
|
permissions[role].append(
|
|
{
|
|
'path': path,
|
|
'name': routes[path]
|
|
}
|
|
)
|
|
|
|
return schemas.Msg(code=0, msg='ok', data={'roles': roles, 'permissions': permissions})
|
|
|
|
|
|
@router.post("/set_role")
|
|
async def set_role(request: Request,
|
|
data_id: schemas.AccountSetRole,
|
|
db: AsyncIOMotorDatabase = Depends(get_database),
|
|
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
|
) -> schemas.Msg:
|
|
"""设置账号角色"""
|
|
casbin_enforcer.delete_user(data_id.name)
|
|
casbin_enforcer.add_role_for_user(data_id.name, data_id.role_name)
|
|
crud.authority.update_upsert(db, {'prtype': 'g', 'v0': data_id.name}, v1=data_id.role_name)
|
|
|
|
return schemas.Msg(code=0, msg='ok')
|
|
|
|
|
|
|
|
|
|
# @router.get("/delete_user")
|
|
# async def delete_user(request: Request,
|
|
# data_id: schemas.AccountDeleteUser,
|
|
# db: AsyncIOMotorDatabase = Depends(get_database),
|
|
# current_user: schemas.UserDB = Depends(deps.get_current_user)
|
|
# ) -> schemas.Msg:
|
|
# pass
|
|
# return schemas.Msg(code=0, msg='暂时没有')
|