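from typing import Any

from fastapi import APIRouter, Depends, Request
from motor.motor_asyncio import AsyncIOMotorDatabase

import crud
import schemas
from api import deps
from db import get_database
from db.ckdb import CKDrive, get_ck_db
from db.redisdb import RedisDrive, get_redis_pool
from models.behavior_analysis import BehaviorAnalysis
from utils import casbin_enforcer

# Admin endpoints that edit the casbin policy store (role bindings and
# role -> API-path rules per game "domain") plus the Mongo-backed API/role lists.
#
# NOTE(review): every handler depends only on deps.get_current_user, i.e.
# authentication. Nothing in this module checks that the caller is allowed to
# manage roles or policies for the requested game; presumably a casbin check
# runs in middleware or inside deps -- confirm, because these routes modify the
# policy store itself.
#
# Handler docstrings are kept verbatim on purpose: FastAPI exposes them as
# route.description, and update_api_list() below persists that text as the API
# name, so they are runtime data rather than plain documentation.
router = APIRouter()


def _is_safe_game_key(game: str) -> bool:
    """Return True when *game* is usable as one MongoDB field-name segment.

    The owner endpoints interpolate the client-supplied ``game`` into field
    paths such as ``data_where.<game>``. A dot would address a nested path, a
    leading ``$`` would be read as an operator, and NUL is not valid in a field
    name, so those values (and the empty string) are rejected.
    """
    if not game or game.startswith('$'):
        return False
    return '.' not in game and '\x00' not in game


# Bind each (username, role_id, game) item in the request to casbin.
# Items whose role does not exist in the given game are skipped silently, so the
# success message is returned even when nothing was added.
@router.post("/add_role_domain")
async def add_role_domain(
        request: Request,
        data_in: schemas.AddRoleForUsersInDomain,
        db: AsyncIOMotorDatabase = Depends(get_database),
        current_user: schemas.UserDB = Depends(deps.get_current_user)):
    """
    在域内为用户添加角色
    """
    # username role dom
    for item in data_in.data:
        role_exists = await crud.role.check(db, _id=item.role_id, game=item.game)
        if not role_exists:
            continue
        casbin_enforcer.add_role_for_user_in_domain(user=item.username,
                                                    role=item.role_id,
                                                    domain=item.game)

    return schemas.Msg(code=0, msg='添加成功', data=True)


# Return the full API list, each entry flagged with is_authz according to
# whether the role already holds a policy for that path in the game.
@router.post("/get_permissions_for_user_in_domain")
async def get_permissions_for_user_in_domain(
        request: Request,
        data_in: schemas.GetPermissionsForUserInDomain,
        db: AsyncIOMotorDatabase = Depends(get_database),
        current_user: schemas.UserDB = Depends(deps.get_current_user)):
    """
    获取域内用户或角色的权限
    """
    # data is a list of policy rows; index 2 is compared with the API path below
    data = casbin_enforcer.get_permissions_for_user_in_domain(data_in.role_id, data_in.game)
    paths = {i[2] for i in data}
    # coll_name in list form
    all_api = await crud.api_list.all_api(db)
    for item in all_api:
        item['is_authz'] = item['path'] in paths

    return schemas.Msg(code=0, msg='ok', data=all_api)


# Remove a user's role binding inside one game domain.
@router.post("/del_role_user_domain")
async def del_role_domain(
        request: Request,
        data_in: schemas.DeleteRolesForUserInDomain,
        db: AsyncIOMotorDatabase = Depends(get_database),
        current_user: schemas.UserDB = Depends(deps.get_current_user)):
    """
    删除用户角色域
    """
    # username role dom
    res = casbin_enforcer.delete_roles_for_user_in_domain(user=data_in.username,
                                                          role=data_in.role_id,
                                                          domain=data_in.game)
    # NOTE(review): the role record itself is left in place; the call below
    # appears to have been disabled in the original -- confirm.
    # await crud.role.delete_id(db, data_in.role_id)
    return schemas.Msg(code=0, msg='ok', data=res)


# Add one policy rule per path in path_list for (role_id, game, act).
# Only the result of the last add_policy call is returned (0 for an empty list).
@router.post("/add_policy")
async def add_policy(
        request: Request,
        data_id: schemas.AddPolicy,
        db: AsyncIOMotorDatabase = Depends(get_database),
        current_user: schemas.UserDB = Depends(deps.get_current_user)):
    """
    向当前策略添加授权规则
    """
    res = 0
    for path in data_id.path_list:
        res = casbin_enforcer.add_policy(data_id.role_id, data_id.game, path, data_id.act)
    return schemas.Msg(code=0, msg='ok', data=res)


# Remove a single (role_id, game, path, act) policy rule.
@router.post("/del_policy")
async def remove_policy(
        request: Request,
        data_id: schemas.DelPolicy,
        current_user: schemas.UserDB = Depends(deps.get_current_user)):
    """
    删除角色api权限
    """
    res = casbin_enforcer.remove_policy(data_id.role_id, data_id.game, data_id.path, data_id.act)
    return schemas.Msg(code=0, msg='ok', data=res)


# List every stored API record.
@router.get("/api_list")
async def api_list(
        request: Request,
        db: AsyncIOMotorDatabase = Depends(get_database),
        current_user: schemas.UserDB = Depends(deps.get_current_user)):
    """
    GetPermissionsForUserInDomain 所有的api
    """
    res = await crud.api_list.all_api(db)
    return schemas.Msg(code=0, msg='ok', data=res)


# Store a new API record.
@router.post("/add_api")
async def add_api(
        request: Request,
        data_in: schemas.AddApi,
        db: AsyncIOMotorDatabase = Depends(get_database),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """
    添加api
    """
    try:
        res = await crud.api_list.add_api(db, data_in)
    except Exception:
        # NOTE(review): every failure, not only a duplicate key, is reported to
        # the client as "already exists" and is not logged -- consider catching
        # the duplicate-key error specifically and logging the rest.
        return schemas.Msg(code=-1, msg='已经存在')
    return schemas.Msg(code=0, msg='ok', data=res.matched_count)


# Delete API records and the casbin rules that reference their paths.
@router.post("/del_api")
async def del_api(
        request: Request,
        data_in: schemas.DelApi,
        db: AsyncIOMotorDatabase = Depends(get_database),
        current_user: schemas.UserDB = Depends(deps.get_current_user)) -> schemas.Msg:
    """
    删除api
    """
    # Remove the policy rules first: field index 2 is filtered on the path, so
    # the rule is dropped for every role and game that referenced it.
    paths = await crud.api_list.find_ids(db, data_in.ids, {'path': 1})
    for item in paths:
        casbin_enforcer.remove_filtered_policy(2, item['path'])

    # Then delete the stored records.
    res = await crud.api_list.del_api(db, data_in)
    return schemas.Msg(code=0, msg='ok', data=res.deleted_count)


# Edit a stored API record.
@router.post("/edit_api")
async def edit_api(
        request: Request,
        data_in: schemas.EditApi,
        db: AsyncIOMotorDatabase = Depends(get_database),
        current_user: schemas.UserDB = Depends(deps.get_current_user)) -> schemas.Msg:
    """
    编辑api
    """
    res = await crud.api_list.edit_api(db, data_in)
    return schemas.Msg(code=0, msg='ok', data=res.matched_count)


# List the selectable domains (game codes).
@router.get("/domain")
async def domain_list(
        request: Request,
        db: AsyncIOMotorDatabase = Depends(get_database),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """
    可选择域 游戏代号
    """
    # role dom path *
    res = await crud.project.all_game(db)
    return schemas.Msg(code=0, msg='ok', data=res)


# Create a role.
@router.post("/add_roles")
async def add_roles(
        request: Request,
        data_in: schemas.AddRole,
        db: AsyncIOMotorDatabase = Depends(get_database),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """
    新建角色
    """
    try:
        res = await crud.role.add_role(db, data_in)
        return schemas.Msg(code=0, msg='ok', data=res.upserted_id)
    except Exception as e:
        # NOTE(review): the raw exception text goes back to the client and may
        # expose driver or schema details; consider logging it server-side and
        # returning a generic message instead.
        return schemas.Msg(code=-1, msg='添加失败', data=str(e))


# List all roles defined in one game domain.
@router.get("/roles")
async def roles(
        request: Request,
        game: str,
        db: AsyncIOMotorDatabase = Depends(get_database),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """
    域内所有角色
    """
    res = await crud.role.dom_roles(db, game)
    return schemas.Msg(code=0, msg='ok', data=res)


# Rename a role.
@router.post("/edit_role")
async def edit_role(
        request: Request,
        date_in: schemas.EditRole,
        db: AsyncIOMotorDatabase = Depends(get_database),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """
    修改角色名
    """
    res = await crud.role.edit_role(db, date_in)
    return schemas.Msg(code=0, msg='ok', data=res.matched_count)


# Rebuild the stored API list from the application's registered routes.
# NOTE(review): this writes to the database from a GET request; a
# state-changing route is normally POST so that it cannot be triggered by a
# plain link or prefetch -- confirm how callers use it before changing.
@router.get("/update_api_list")
async def update_api_list(
        request: Request,
        db: AsyncIOMotorDatabase = Depends(get_database),
        current_user: schemas.UserDB = Depends(deps.get_current_user),
):
    """更新 api 列表"""
    app = request.app
    grouped = {}
    for r in app.routes:
        # Only routes that carry a description are considered. A route with an
        # empty tag list is skipped instead of raising IndexError on tags[0].
        tags = r.tags if hasattr(r, 'description') else None
        title = tags[0] if tags else None
        if not title:
            continue
        grouped.setdefault(title, {'list': []})
        path = r.path
        name = r.description if hasattr(r, 'description') else r.name
        grouped[title]['list'].append({'api': path, 'title': name})

    data = [{'title': k, 'list': v['list']} for k, v in grouped.items()]
    for item in data:
        for entry in item['list']:
            add_data = schemas.UpdateApi(path=entry['api'], name=entry['title'])
            await crud.api_list.update_api(db, add_data)

    return schemas.Msg(code=0, msg='ok', data=1)


# For every account, report the owner_name filter values configured for *game*.
@router.get("/account_owner_list")
async def account_owner_list(request: Request,
                             game: str,
                             db: AsyncIOMotorDatabase = Depends(get_database),
                             current_user: schemas.UserDB = Depends(deps.get_current_user)) -> schemas.Msg:
    """获取账号owner权限"""
    # game comes from the query string and becomes part of a projection key.
    if not _is_safe_game_key(game):
        return schemas.Msg(code=-1, msg='invalid game')

    account_infos = await crud.user.find_many(db, {},
                                              {'_id': False, 'name': True, 'nickname': True,
                                               f'data_where.{game}': True})
    resp = []
    for account_info in account_infos:
        row = {
            'name': account_info.get('name'),
            'nickname': account_info.get('nickname'),
            'owner_list': ''
        }
        resp.append(row)
        for item in account_info.get('data_where', {}).get(game, []):
            if item.get('columnName') == 'owner_name':
                row['owner_list'] = ','.join(item.get('ftv', []))
                break
    return schemas.Msg(code=0, msg='ok', data=resp)


# @router.post("/git_owner")
# async def git_owner(request: Request,
#                     game: str,
#                     db: AsyncIOMotorDatabase = Depends(get_database),
#                     current_user: schemas.UserDB = Depends(deps.get_current_user)) -> schemas.Msg:
#     user=await crud.user

# Set, replace or clear the owner_name filter of one account for *game*.
# NOTE(review): this handler reuses the name account_owner_list and rebinds the
# module-level name; both routes stay registered because the decorators ran at
# definition time, but a distinct name would be clearer.
@router.post("/update_account_owner")
async def account_owner_list(request: Request,
                             game: str,
                             data_in: schemas.OwnerList,
                             db: AsyncIOMotorDatabase = Depends(get_database),
                             current_user: schemas.UserDB = Depends(deps.get_current_user)) -> schemas.Msg:
    """设置账号owner权限"""
    # game comes from the query string and is interpolated into the field paths
    # of $set/$push/$pull below, so it must be a single plain key.
    if not _is_safe_game_key(game):
        return schemas.Msg(code=-1, msg='invalid game')

    field = f'data_where.{game}'
    set_data = {
        "columnName": "owner_name",
        "tableType": "event",
        "comparator": "in",
        "ftv": data_in.owners
    }
    has_game = {'name': data_in.account_name, field: {'$exists': True}}
    has_owner = {**has_game, f'{field}.columnName': 'owner_name'}

    # An empty first owner clears the filter. An empty list is treated the same
    # way instead of raising IndexError -- assumes "no owners" means "clear";
    # TODO confirm against the caller.
    if not data_in.owners or not data_in.owners[0]:
        res = await crud.user.update_one(db, has_game,
                                         {'$pull': {field: {'columnName': 'owner_name'}}})
        return schemas.Msg(code=0, msg='ok', data=res.raw_result)

    if await crud.user.find_one(db, has_game):
        if await crud.user.find_one(db, has_owner):
            # Replace the matched owner_name entry in place (positional $).
            await crud.user.update_one(db, has_owner, {'$set': {f'{field}.$': set_data}})
        else:
            await crud.user.update_one(db, has_game, {'$push': {field: set_data}})
    else:
        # No filter list for this game yet: create it with the single entry.
        await crud.user.update_one(db, {'name': data_in.account_name},
                                   {'$set': {field: [set_data]}})
    return schemas.Msg(code=0, msg='ok')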