1.优化权限板块

2.新增用户搜索板块
This commit is contained in:
李伟 2022-04-14 16:08:01 +08:00
parent d7c4311f65
commit 84dcdad9b7
22 changed files with 1072 additions and 111 deletions

View File

@ -22,18 +22,42 @@ async def add_role_domain(
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)):
"""
域内为用户添加角色
当前项目为角色添加相应权限
"""
# username role dom
# for item in data_in.data:
# is_exists_role = await crud.role.check(db, _id=item.role_id, game=item.game)
# if not is_exists_role:
# continue
# casbin_enforcer.add_role_for_user_in_domain(user=item.username,
# role=item.role_id,
# domain=item.game)
#
# return schemas.Msg(code=0, msg='添加成功', data=True)
res = await crud.url_list.get_all(db)
role_id = {}
for i in res:
role_id[i['auth_id']] = i['name']
for item in data_in.data:
is_exists_role = await crud.role.check(db, _id=item.role_id, game=item.game)
if not is_exists_role:
continue
casbin_enforcer.add_role_for_user_in_domain(user=item.username,
role=item.role_id,
domain=item.game)
now_quanxian = await crud.user_url.get_quanxian(db, schemas.Url_quanxian(user_id=item.role_id))
# 如果不存在该用户其他游戏的权限,则新增一个
if now_quanxian == {}:
await crud.user_url.insert_quanxian(db, schemas.Url_quanxian(game=[item.game], user=item.username,
user_id=item.role_id,
quanxian=[role_id[item.auth_id]],
quanxian_id=[item.auth_id]))
# 存在则在这个用户加上要添加的游戏项目权限
else:
game = now_quanxian['game']
game.append(item.game)
quanxian = now_quanxian['quanxian']
quanxian.append(role_id[item.auth_id])
quanxian_id = now_quanxian['quanxian_id']
quanxian_id.append('auth_id')
await crud.user_url.updata_quanxian(db, schemas.Url_quanxian(game=game, user=item.username,
user_id=item.role_id, quanxian=quanxian,
quanxian_id=quanxian_id))
return schemas.Msg(code=0, msg='添加成功', data=True)
@ -67,47 +91,108 @@ async def del_role_domain(
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)):
"""
删除用户角色域
删除用户在当前项目中的权限
"""
# username role dom
# res = casbin_enforcer.delete_roles_for_user_in_domain(user=data_in.username,
# role=data_in.role_id,
# domain=data_in.game)
#
# #await crud.role.delete_id(db, data_in.role_id)
# return schemas.Msg(code=0, msg='ok', data=res)
res = await crud.user_url.get_all(db)
for i in res:
if i['user'] == data_in.username:
for nu in range(len(i['game'])):
if i['game'][nu] == data_in.game:
i['game'].remove(data_in.game)
i['quanxian_id'].remove(i['quanxian_id'][nu])
i['quanxian'].remove(data_in.role_id)
await crud.user_url.updata_quanxian(db, schemas.Url_quanxian(game=i['game'], user=data_in.username,
user_id=i['user_id'],
quanxian_id=i['quanxian_id'],
quanxian=i['quanxian']))
return schemas.Msg(code=0, msg='删除成功', data='')
res = casbin_enforcer.delete_roles_for_user_in_domain(user=data_in.username,
role=data_in.role_id,
domain=data_in.game)
#await crud.role.delete_id(db, data_in.role_id)
return schemas.Msg(code=0, msg='ok', data=res)
@router.post("/del_role_user")
async def del_role_domain(
request: Request,
data_in: schemas.DeleteRolesForUserInDomain,
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)):
"""
删除角色管理板块中的角色
"""
await crud.url_list.delete_name(db, data_in)
return schemas.Msg(code=0, msg="ok", data='')
@router.post("/add_policy")
async def add_policy(
request: Request,
data_id: schemas.AddPolicy,
data_in: schemas.Datalist,
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)):
"""
向当前策略添加授权规则
向当前权限添加新路由
"""
res = 0
for path in data_id.path_list:
res = casbin_enforcer.add_policy(data_id.role_id, data_id.game, path, data_id.act)
return schemas.Msg(code=0, msg='ok', data=res)
# res = 0
# for path in data_id.path_list:
# res = casbin_enforcer.add_policy(data_id.role_id, data_id.game, path, data_id.act)
# return schemas.Msg(code=0, msg='ok', data=res)
res = await crud.url_list.find_one_url(db, data_in)
for i in range(len(res['api_list'])):
if res['api_list'][i] == data_in.path:
res['state'][i] = True
await crud.url_list.update_url_url(db, res)
return schemas.Msg(code=0, msg='修改成功', data='')
@router.post("/del_policy")
async def remove_policy(
request: Request,
data_id: schemas.DelPolicy,
data_in: schemas.Del_role,
current_user: schemas.UserDB = Depends(deps.get_current_user)):
"""
删除角色api权限
修改角色api权限
"""
# res = casbin_enforcer.remove_policy(data_id.role_id, data_id.game, data_id.path, data_id.act)
# return schemas.Msg(code=0, msg='ok', data=res)
res = await crud.url_list.find_one_url(db, data_in)
for i in range(len(res['api_list'])):
if res['api_list'][i] == data_in.path:
res['state'][i] = False
await crud.url_list.update_url_url(db, res)
return schemas.Msg(code=0, msg='修改成功', data='')
res = casbin_enforcer.remove_policy(data_id.role_id, data_id.game, data_id.path, data_id.act)
return schemas.Msg(code=0, msg='ok', data=res)
@router.post("/del_api_module")
async def add_policy(
request: Request,
data_in: schemas.Add_module,
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)):
res = await crud.api_module.get_one_module(db, data_in)
for i in range(len(res['state'])):
if data_in.url == res['api_list'][i]:
res['state'][i] = False
await crud.api_module.update_one_module(db, res)
return schemas.Msg(code=0, msg='修改成功', data='')
@router.post("/add_api_module")
async def add_policy(
request: Request,
data_in: schemas.Add_module,
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)):
res = await crud.api_module.get_one_module(db, data_in)
for i in range(len(res['state'])):
if data_in.url == res['api_list'][i]:
res['state'][i] = True
await crud.api_module.update_one_module(db, res)
return schemas.Msg(code=0, msg='修改成功', data='')
@router.get("/api_list")
@ -120,10 +205,16 @@ async def api_list(
GetPermissionsForUserInDomain
所有的api
"""
res = await crud.api_list.all_api(db)
# res = await crud.api_list.all_api(db)
# return schemas.Msg(code=0, msg='ok', data=res)
re = await crud.api_module.get_api_module(db)
res = []
for i in re:
if i['path_name'] != 'root':
i['_id'] = str(i['_id'])
res.append(i)
return schemas.Msg(code=0, msg='ok', data=res)
@router.post("/add_api")
async def add_api(
request: Request,
@ -134,11 +225,46 @@ async def add_api(
"""
添加api
"""
try:
res = await crud.api_list.add_api(db, data_in)
except Exception as e:
return schemas.Msg(code=-1, msg='已经存在')
return schemas.Msg(code=0, msg='ok', data=res.matched_count)
# try:
# res = await crud.api_list.add_api(db, data_in)
# except Exception as e:
# return schemas.Msg(code=-1, msg='已经存在')
# return schemas.Msg(code=0, msg='ok', data=res.matched_count)
res = await crud.api_module.get_api_module(db)
for i in res:
if data_in.path in i['api_list']:
return schemas.Msg(code=0, msg='该路由已存在', data='')
path_list = []
for i in res:
path_list.append(i['path_name'])
if data_in.name in path_list:
for i in res:
if data_in.name == i['path_name']:
i['api_list'].append(data_in.path)
i['api_name'].append(data_in.desc)
i['state'].append(True)
await crud.api_module.updata_quanxian_module(db, schemas.Url_module(auth_id=i['auth_id'],
path_name=data_in.name,
api_list=i['api_list'],
api_name=i['api_name'],
state=i['state']))
return schemas.Msg(code=0, msg='ok', data='路由添加成功!')
else:
auth_list = []
for i in res:
auth_list.append(i['auth_id'])
auth_id = max(auth_list)
# api_data={}
# api_data['auth_id']='abc'+str(int(auth_id.split('c')[-1])+1)
# api_data['path_name']=data_in.name
# api_data['api_list']=[data_in.path]
# api_data['api_name']=[data_in.desc]
# api_data['state']=[True]
auth_id = 'abc' + str(int(auth_id.split('c')[-1]) + 1)
await crud.api_module.insert_quanxian(db, schemas.Url_module(auth_id=auth_id, path_name=data_in.name,
api_list=[data_in.path],
api_name=[data_in.desc], state=[True]))
return schemas.Msg(code=0, msg='ok', data='路由添加成功!')
@router.post("/del_api")
@ -181,29 +307,98 @@ async def domain_list(
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""
可选择域 游戏代号
获取所有项目
"""
# roel dom path *
res = await crud.project.all_game(db)
return schemas.Msg(code=0, msg='ok', data=res)
@router.post("/add_roles")
async def add_roles(
@router.get("/api_module")
async def domain_list(
request: Request,
data_in: schemas.AddRole,
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""
新建角色
角色管理创建角色时显示的各个模块
"""
try:
res = await crud.role.add_role(db, data_in)
return schemas.Msg(code=0, msg='ok', data=res.upserted_id)
except Exception as e:
return schemas.Msg(code=-1, msg='添加失败', data=str(e))
res = await crud.api_module.get_api_module(db)
api_module=[]
for i in res:
if i['path_name'] !='root':
data=[]
data.append(i['auth_id'])
data.append(i['path_name'])
api_module.append(data)
return schemas.Msg(code=0, msg='ok', data=api_module)
@router.post("/add_roles")
async def add_roles(
request: Request,
game:str,
data_in: schemas.Add_role,
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""
创建角色
"""
# try:
# res = await crud.role.add_role(db, data_in)
# return schemas.Msg(code=0, msg='ok', data=res.upserted_id)
# except Exception as e:
# return schemas.Msg(code=-1, msg='添加失败', data=str(e))
res = await crud.url_list.get_all(db)
for i in res:
if data_in.system == 1:
if data_in.name == i['name']:
return schemas.Msg(code=0, msg='该角色已存在!')
else:
if data_in.name == i['name'] and i['game'] == game:
return schemas.Msg(code=0, msg='该角色已存在!')
auth = []
if data_in.system == 1:
for i in res:
auth.append(i['auth_id'])
max_auth = 'ab' + str(int(max(auth).split('b')[-1]) + 1)
api_module = await crud.api_module.get_api_module(db)
for i in api_module:
if i['auth_id'] in data_in.path_name:
await crud.url_list.insert_url(db, schemas.Url_list(name=data_in.name, auth_id=max_auth,
path_name=i['path_name'], api_list=i['api_list'],
api_name=i['api_name'], state=i['state'],
system=data_in.system))
else:
state = []
for nu in range(len(i['state'])):
state.append(False)
if i['path_name'] != 'root':
await crud.url_list.insert_url(db, schemas.Url_list(name=data_in.name, auth_id=max_auth,
path_name=i['path_name'],
api_list=i['api_list'], api_name=i['api_name'],
state=state, system=data_in.system))
return schemas.Msg(code=0, msg='添加角色成功', data='')
else:
for i in res:
auth.append(i['auth_id'])
max_auth = 'ab' + str(int(max(auth).split('b')[-1]) + 1)
api_module = await crud.api_module.get_api_module(db)
for i in api_module:
if i['auth_id'] in data_in.path_name:
await crud.url_list.insert_urls(db, schemas.Url_lists(name=data_in.name, auth_id=max_auth,
path_name=i['path_name'], api_list=i['api_list'],
api_name=i['api_name'], state=i['state'],
system=data_in.system, game=game))
else:
state = []
for nu in range(len(i['state'])):
state.append(False)
if i['path_name'] != 'root':
await crud.url_list.insert_urls(db, schemas.Url_lists(name=data_in.name, auth_id=max_auth,
path_name=i['path_name'], game=game,
api_list=i['api_list'],
api_name=i['api_name'], state=state,
system=data_in.system))
return schemas.Msg(code=0, msg='添加角色成功', data='')
@router.get("/roles")
async def roles(
@ -213,25 +408,58 @@ async def roles(
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""
域内所有角色
获取所有的管理员用户
"""
res = await crud.role.dom_roles(db, game)
return schemas.Msg(code=0, msg='ok', data=res)
# res = await crud.role.dom_roles(db, game)
# return schemas.Msg(code=0, msg='ok', data=res)
res = await crud.url_list.get_all(db)
role = []
data = []
# 区分不同项目下的权限用户
for i in res:
if i['system'] == 1 and i['name'] != 'root':
role.append(i['name'])
if 'game' in i.keys():
if game == i['game']:
role.append(i['name'])
# 得到不同权限用户
role = list(set(role))
for id in role:
data_dcit = {}
data_dcit['name'] = id
auth_id = []
system = []
data_list = []
for i in res:
if i['name'] == id:
data_one = {}
auth_id.append(i['auth_id'])
system.append(i['system'])
data_one['path_name'] = i['path_name']
data_one['api_name'] = i['api_name']
data_one['api_list'] = i['api_list']
data_one['state'] = i['state']
data_list.append(data_one)
data_dcit['datalist'] = data_list
data_dcit['auth_id'] = auth_id[0]
data_dcit['system'] = system[0]
data.append(data_dcit)
return schemas.Msg(code=0, msg='ok', data=data)
@router.post("/edit_role")
async def edit_role(
request: Request,
date_in: schemas.EditRole,
date_in: schemas.Editname,
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""
修改角色名
"""
res = await crud.role.edit_role(db, date_in)
return schemas.Msg(code=0, msg='ok', data=res.matched_count)
# res = await crud.role.edit_role(db, date_in)
# return schemas.Msg(code=0, msg='ok', data=res.matched_count)
await crud.url_list.edit_name(db,date_in)
return schemas.Msg(code=0,msg="ok")
@router.get("/update_api_list")
async def update_api_list(
@ -335,3 +563,54 @@ async def account_owner_list(request: Request,
}, {'$set': {f'data_where.{game}': [set_data]}})
return schemas.Msg(code=0, msg='ok')
@router.get("/all_api_board")
async def all_api_board(request: Request,
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)) -> schemas.Msg:
"""显示创建项目时生成的所有api权限模板"""
res = await crud.api_board.all_api(db)
for i in res:
i['_id'] = str(i['_id'])
return schemas.Msg(code=0, msg='ok', data=res)
@router.post("/updata_api_board")
async def updata_api_board(
request: Request,
opinion: bool,
data_in: schemas.Api_board,
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)) -> schemas.Msg:
"""
修改api权限模板
"""
await crud.api_board.update(db, data_in,opinion)
return schemas.Msg(code=0, msg='ok')
@router.post("/add_api_board")
async def add_api_board(
request: Request,
data_in: schemas.Api_board,
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)) -> schemas.Msg:
"""
添加api权限模板
"""
res = await crud.api_board.all_api(db)
for i in res:
if data_in.name ==i['name'] and data_in.api_name == i['api_name'] and data_in.api_path == i['api_path']:
return schemas.Msg(code=-1, msg='该路径已存在')
await crud.api_board.insert(db, data_in)
return schemas.Msg(code=0, msg='ok')
@router.post("/del_api_board")
async def del_api_board(
request: Request,
data_in: schemas.Api_board,
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)) -> schemas.Msg:
"""
删除api权限模板
"""
await crud.api_board.del_api(db, data_in)
return schemas.Msg(code=0, msg='ok')

View File

@ -47,7 +47,55 @@ async def read_data_attr(
}
)
return schemas.Msg(code=0, msg='ok', data=res)
@router.post("/game_user_event_list")
async def read_data_attr(
request: Request,
game: str,
db: AsyncIOMotorDatabase = Depends(get_database),
ck: CKDrive = Depends(get_ck_db),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""用户搜索时显示的用户属性"""
# data = await rdb.get(f'{game}_{data_in.cat}')
# data = json.loads(data)
# res = list(data.keys())
#event_columns = await ck.get_columns(game, 'event')
user_columns = await ck.get_columns(game, 'user')
data_attr = await crud.data_attr.find_many(db, {'game': game, 'cat': 'user'})
data_attr = {item['name']: item for item in data_attr}
user_props = []
for item in user_columns:
data_type = settings.CK_TYPE_DICT.get(item['type'])
title = data_attr.get(item['name'], {}).get('show_name') or item['name']
user_prop = {
'id': item['name'],
'data_type': data_type,
'title': title,
}
user_props.append(user_prop)
user_label_props = []
user_label_docs = await crud.user_label.find_many(db, {'game': game}, {'qp': 0})
for item in user_label_docs:
tmp = {
'id': item['cluster_name'],
'data_type': 'user_label',
'title': item['display_name'],
}
user_label_props.append(tmp)
res = [
{
'title': '用户属性',
'id': 'user',
'category': user_props
},
{
'title': '用户标签',
'id': 'user_label',
'category': user_label_props
}
]
return schemas.Msg(code=0, msg='ok', data=res)
@router.post("/attr_edit")
async def edit_data_attr(

View File

@ -25,20 +25,18 @@ async def create(
try:
res_project = await crud.project.create(db, data_in, current_user=request.user)
await crud.project_number.createxiangmu(db, data_in)
# game = data_in.game
# name = ['乐谷管理员', '发行方管理员']
# # path列表
# res = await crud.api_board.all_api(db)
# for i in name:
# path_list = []
# for data in res:
# if data['name'] == i:
# path_list.append(data['api_path'])
# # 存管理员角色
# id = await crud.role.add_role_project(db, game, i)
# for path in path_list:
# # 存路径
# casbin_enforcer.add_policy(id, game, path, '*')
# 同步插入项目
# await crud.project_number.createxiangmu(db, data_in)
# 同步存入root权限中新项目的权限
user_url = await crud.user_url.get_quanxian(db,
schemas.Url_quanxian(user_id='04491842be9811eb8acdd5bd867f57d6'))
user_url['game'].append(data_in.game)
user_url['quanxian_id'].append('ab1')
user_url['quanxian'].append('root')
await crud.user_url.updata_quanxian(db, schemas.Url_quanxian(user=user_url['user'], user_id=user_url['user_id'],
game=user_url['game'],
quanxian_id=user_url['quanxian_id'],
quanxian=user_url['quanxian']))
except pymongo.errors.DuplicateKeyError:
return schemas.Msg(code=-1, msg='项目名已存在', data='项目名已存在')
@ -84,9 +82,10 @@ async def read_project(request: Request,
resp = await crud.project.all_game(db)
resp = sorted(resp, key=lambda x: x.get('sort') or 999)
else:
game_list = casbin_enforcer.get_domains_for_user(request.user.username)
resp = await crud.project.get_my_game(db, game_list)
# game_list = casbin_enforcer.get_domains_for_user(request.user.username)
# resp = await crud.project.get_my_game(db, game_list)
project_data = await crud.user_url.get_quanxian(db, schemas.Url_quanxian(user_id=request.user.id))
resp = await crud.project.get_my_game(db, project_data['game'])
return schemas.Msg(code=0, msg='ok', data=resp)
#获取项目名和渠道名project_name
@router.get("/project_name")
@ -176,7 +175,25 @@ async def add_members(request: Request,
members=[item.username for item in data_in.members]))
return schemas.Msg(code=0, msg='ok', data=data_in)
@router.post("/import_member")
async def import_member(request: Request,
game:str,
data_in:schemas.Import_project,
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)
):
"""成员管理中的导入其他项目的成员到本项目中"""
res=await crud.user_url.get_all(db)
for i in res:
for nu in range(len(i['game'])):
if data_in.games == i['game'][nu] and game not in i['game']:
i['game'].append(game)
i['quanxian_id'].append(i['quanxian_id'][nu])
i['quanxian'].append(i['quanxian'][nu])
await crud.user_url.updata_quanxian(db,schemas.Url_quanxian(game=i['game'],user=i['user'],user_id=i['user_id'],quanxian_id=i['quanxian_id'],
quanxian=i['quanxian']))
return schemas.Msg(code=0, msg='ok',data='' )
@router.post("/edit_member")
async def edit_member(request: Request,
@ -199,35 +216,54 @@ async def members(request: Request,
current_user: schemas.UserDB = Depends(deps.get_current_user)
):
"""查看项目成员"""
data = casbin_enforcer.get_all_users_by_domain(game)
# data = casbin_enforcer.get_all_users_by_domain(game)
# names = []
# role_ids = []
# for item in data:
# names.append(item['username'])
# role_ids.append(item['role_id'])
# users = await crud.user.get_by_users(db, {'name': {'$in': names}})
# roles = await crud.role.find_ids(db, role_ids)
# users = {item.name: item.dict() for item in users.data}
# roles = {item['_id']: item['name'] for item in roles}
# res = []
# for item in data:
# username = item['username']
# role_id = item['role_id']
# try:
# res.append({
# **users[username],
# 'role': roles[role_id],
# 'role_id': role_id,
# })
# except:
# pass
# # res.append({
# # **users[username],
# # 'role': roles[role_id],
# # 'role_id': role_id,
# # })
# return schemas.Msg(code=0, msg='ok', data=res)
res = await crud.user_url.get_all(db)
# 符合当前项目权限的用户
names = []
role_ids = []
for item in data:
names.append(item['username'])
role_ids.append(item['role_id'])
# 符合当前项目权限的用户的对应权限级别
quanxian = {}
quanxian_id = {}
for i in res:
for nu in range(len(i['game'])):
if game == i['game'][nu]:
names.append(i['user'])
quanxian[i['user']] = i['quanxian'][nu]
quanxian_id[i['user']] = i['quanxian_id'][nu]
users = await crud.user.get_by_users(db, {'name': {'$in': names}})
roles = await crud.role.find_ids(db, role_ids)
users = {item.name: item.dict() for item in users.data}
roles = {item['_id']: item['name'] for item in roles}
res = []
for item in data:
username = item['username']
role_id = item['role_id']
try:
res.append({
**users[username],
'role': roles[role_id],
'role_id': role_id,
})
except:
pass
# res.append({
# **users[username],
# 'role': roles[role_id],
# 'role_id': role_id,
# })
return schemas.Msg(code=0, msg='ok', data=res)
data = []
for use in users.data:
use = use.dict()
use['role'] = quanxian[use['name']]
use['role_id'] = quanxian[use['name']]
data.append(use)
return schemas.Msg(code=0, msg='ok', data=data)
# @router.post("/del_member")
# async def members(request: Request,
# game: str,

View File

@ -2,14 +2,14 @@ import datetime
from collections import defaultdict
import mimetypes
from urllib.parse import quote
import os
import pandas as pd
import numpy as np
from fastapi import APIRouter, Depends, Request
from fastapi.encoders import jsonable_encoder
from motor.motor_asyncio import AsyncIOMotorDatabase
from fastapi.responses import StreamingResponse
from datetime import datetime
import crud, schemas
from common import *
@ -21,7 +21,7 @@ from db.redisdb import get_redis_pool, RedisDrive
from models.behavior_analysis import BehaviorAnalysis, CombinationEvent
from models.user_analysis import UserAnalysis
from models.x_analysis import XAnalysis
from utils import DfToStream
from utils import DfToStream, getEveryDay
router = APIRouter()
@ -1255,4 +1255,256 @@ async def user_property_model(
'title': title
})
@router.post("/seek_user")
async def user_property_model(
request: Request,
game: str,
data_in: schemas.Ck_seek_user,
ckdb: CKDrive = Depends(get_ck_db)
) -> schemas.Msg:
"""游戏用户搜索功能"""
#判断的内容
data=data_in.condition
#需要判断的字段
ziduan=data_in.user_arrt_title
#筛选条件
tiaojian=data_in.comparator_id
if tiaojian == '==':
tiaojian = '='
#判断是否是时间类型
if data_in.user_arrt_type == 'datetime':
sql = f"""select `#account_id`,`{ziduan}` from {game}.`user` WHERE addHours(`{ziduan}`, 8) >= '{data_in.start_time}'
and addHours(`{ziduan}`, 8) <= '{data_in.end_time}' ORDER BY `#reg_time` LIMIT 10 OFFSET {(data_in.pages-1)*10}"""
#如果查询'#account_id'则不多余返回一个account_id
elif ziduan == '#account_id':
sql=f"""select `{ziduan}`,name from {game}.`user` WHERE `{ziduan}` {tiaojian} '{data_in.condition}' ORDER BY `#reg_time` LIMIT 10 OFFSET {(data_in.pages-1)*10} """
elif data_in.user_arrt_type == 'int':
sql=f"""select `#account_id`,`{ziduan}` from {game}.`user` WHERE `{ziduan}` {tiaojian} {data_in.condition} ORDER BY `#reg_time` LIMIT 10 OFFSET {(data_in.pages-1)*10}"""
else:
sql=f"""select `#account_id`,`{ziduan}` from `{game}`.`user` WHERE `{ziduan}` {tiaojian} '{data}' ORDER BY `#reg_time` LIMIT 10 OFFSET {(data_in.pages-1)*10}"""
# 查询数据
try:
df = await ckdb.query_dataframe(sql)
except Exception as e:
return schemas.Msg(code=0, msg='查询参数不匹配', data=e)
# 转换成列表返回
df.fillna(0, inplace=True)
account_id=list(df['#account_id'])
new_sql=f"""select `#account_id`,`#ip`,`#distinct_id`,rmbmoney,owner_name,lv,zhanli,channel,
channel,svrindex,maxmapid,name,`exp`,vip,jinbi,last_account_login_time,binduid from {game}.`user` where `#account_id` in ({account_id})"""
df1= await ckdb.query_dataframe(new_sql)
res = {'refer':{
'columns': df.columns.tolist(),
'values': df.values.tolist()
},
'details_data':{
'new_columns':df1.columns.tolist(),
'new_values':df1.values.tolist()
}}
return schemas.Msg(code=0, msg='ok', data=res)
@router.post("/seek_user_count")
async def user_property_model(
request: Request,
game: str,
data_in: schemas.Ck_seek_user,
ckdb: CKDrive = Depends(get_ck_db)
) -> schemas.Msg:
"""游戏用户搜索功能查询到的数量"""
#判断的内容
data=data_in.condition
#需要判断的字段
ziduan=data_in.user_arrt_title
#筛选条件
tiaojian=data_in.comparator_id
if tiaojian == '==':
tiaojian = '='
#判断是否是时间类型
if data_in.user_arrt_type == 'datetime':
sql = f"""select `#account_id`,`{ziduan}` from {game}.`user` WHERE addHours(`{ziduan}`, 8) >= '{data_in.start_time}'
and addHours(`{ziduan}`, 8) <= '{data_in.end_time}' ORDER BY `#reg_time`"""
#如果查询'#account_id'则不多余返回一个account_id
elif ziduan == '#account_id':
sql=f"""select `{ziduan}`,name from {game}.`user` WHERE `{ziduan}` {tiaojian} '{data_in.condition}' ORDER BY `#reg_time` """
elif data_in.user_arrt_type == 'int':
sql = f"""select `#account_id`,`{ziduan}` from {game}.`user` WHERE `{ziduan}` {tiaojian} {data_in.condition} ORDER BY `#reg_time`"""
else:
sql = f"""select `#account_id`,`{ziduan}` from `{game}`.`user` WHERE `{ziduan}` {tiaojian} '{data}' ORDER BY `#reg_time`"""
#查询数据
try:
df = await ckdb.query_dataframe(sql)
except Exception as e:
return schemas.Msg(code=0, msg='查询参数不匹配', data=e)
#返回查询到的数量
res=len(df)
return schemas.Msg(code=0, msg='ok', data=res)
@router.post("/download_user")
async def user_property_model(
request: Request,
game: str,
data_in: schemas.Ck_seek_user,
ckdb: CKDrive = Depends(get_ck_db)
):
"""下载查询到的所有数据"""
#判断的内容
data=data_in.condition
#需要判断的字段
ziduan=data_in.user_arrt_title
#筛选条件
tiaojian=data_in.comparator_id
if tiaojian == '==':
tiaojian = '='
#判断是否是时间类型
if data_in.user_arrt_type == 'datetime':
sql = f"""select `#account_id`,`{ziduan}` from {game}.`user` WHERE addHours(`{ziduan}`, 8) >= '{data_in.start_time}'
and addHours(`{ziduan}`, 8) <= '{data_in.end_time}' ORDER BY `#reg_time`"""
#如果查询'#account_id'则不多余返回一个account_id
elif ziduan == '#account_id':
sql=f"""select `{ziduan}`,name from {game}.`user` WHERE `{ziduan}` {tiaojian} '{data_in.condition}' ORDER BY `#reg_time` """
elif data_in.user_arrt_type == 'int':
sql = f"""select `#account_id`,`{ziduan}` from {game}.`user` WHERE `{ziduan}` {tiaojian} {data_in.condition} ORDER BY `#reg_time`"""
else:
sql = f"""select `#account_id`,`{ziduan}` from `{game}`.`user` WHERE `{ziduan}` {tiaojian} '{data}' ORDER BY `#reg_time`"""
#查询数据
try:
df = await ckdb.query_dataframe(sql)
except Exception as e:
return schemas.Msg(code=0, msg='查询参数不匹配', data=e)
if df.empty:
return schemas.Msg(code=-9, msg='无数据',data='')
account_id = list(df['#account_id'])
new_sql = f"""select `#account_id`,`#ip`,`#distinct_id`,rmbmoney,owner_name,lv,zhanli,channel,
channel,svrindex,maxmapid,name,`exp`,vip,jinbi,last_account_login_time,binduid from {game}.`user` where `#account_id` in ({account_id})"""
df1 = await ckdb.query_dataframe(new_sql)
file_name=quote(f'下载的用户搜索数据.xlsx')
mime = mimetypes.guess_type(file_name)[0]
df_to_stream = DfToStream((df1, '下载的用户搜索数据'))
with df_to_stream as d:
export = d.to_stream()
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
@router.post("/solo_user")
async def user_property_model(
request: Request,
game: str,
data_in: schemas.Ck_solo_user,
ckdb: CKDrive = Depends(get_ck_db)
):
"""用户的详情"""
event_dict={'pay':'充值','create_account':'创建角色','login':'登录','ta_app_end':'离开游戏','guide':'新手引导','level_up':'玩家等级',
'vip_level':'vip等级','sign':'签到','summon':'招募','ask_for_join_guild':'加入联盟','leave_guild':'离开联盟','create_guild':'创建联盟'}
sql=f"""select `#account_id`,`#ip`,`#distinct_id`,rmbmoney,owner_name,lv,zhanli,channel,
channel,svrindex,maxmapid,name,`exp`,vip,jinbi,last_account_login_time,binduid from {game}.`user` where `#account_id` = '{data_in.account_id}'"""
#获取用户基本详情
df= await ckdb.query_dataframe(sql)
#获取用户每天事件量
start_times=data_in.start_time.split(' ')[0]
end_times=data_in.end_time.split(' ')[0]
event = list(event_dict.keys())
sql1 = f"""select toDate(addHours(`#event_time`, `#zone_offset`)) as date,count(`#event_name`) as v from {game}.event
where `date`>='{start_times}' and `date`<='{end_times}'
and `#account_id`='{data_in.account_id}' and `#event_name` in ({event}) group by date ORDER by date"""
df1=await ckdb.query_dataframe(sql1)
#时间间隔天数
global event_values, data_list,game_details,zhanbi
if len(df1) >0:
time_interval=getEveryDay(start_times,end_times)
a = list(df1['date'])
aa=[]
for i in a:
aa.append(str(i))
for i in time_interval:
if i not in aa:
df1.loc[len(df1.index)] = [i, 0]
df1[['date']]=df1[['date']].astype(str)
df1.sort_values('date',inplace=True)
data_list=list(df1['date'])
event_values=list(df1['v'])
else:
data_list= [] #getEveryDay(start_times,end_times)
event_values=[]
#获取用户事件的详情
sql2=f"""select * FROM {game}.event WHERE `#account_id`='{data_in.account_id}' and addHours(`#event_time`, `#zone_offset`) >='{data_in.start_time}' and
addHours(`#event_time`, `#zone_offset`) <= '{data_in.end_time}' and `#event_name` in ({event}) order by `#event_time`"""
df2=await ckdb.query_dataframe(sql2)
if len(df2) > 0:
game_details={}
#区分天数
days=list(df2['#event_time'])
day_set=set()
for i in days:
day_set.add(str(i).split(' ')[0])
#总日期,一天的
day_list=list(day_set)
day_list.sort()
for day in day_list:
game_deta = []
for nu in range(len(df2)):
if day in str(df2['#event_time'][nu]):
#详细时间
game_detail={}
time_s=str(df2['#event_time'][nu]).split('+')[0]
game_detail['time']=time_s.split(' ')[1]
game_detail['event']=event_dict[df2['#event_name'][nu]]
a_list = []
#获取df的字段名
columns=df2.columns.values
for col in columns:
a=str(df2[col][nu])
if a != 'None' and a != '' and a != 'nan' and a != '[]':
a_list.append({'title':col,'val':a})
game_detail['xaingqing']=a_list
game_deta.append(game_detail)
game_details[day]=game_deta
else:
game_details = {}
#event_count = await ckdb.yesterday_event_count(game)
#求事件占比
sql3=f"""select `#event_name` as a,count(`#event_name`) as v from {game}.event
where addHours(`#event_time`, `#zone_offset`)>='{data_in.start_time}' and addHours(`#event_time`, `#zone_offset`)<='{data_in.end_time}'
and `#account_id`='{data_in.account_id}' and `#event_name` in ({event}) group by `#event_name`"""
df3 = await ckdb.query_dataframe(sql3)
if len(df3) > 0:
zhanbi=[]
sums=sum(list(df1['v']))
numbers=0
for i in range(len(df3)):
shuju={}
shuju['name']=event_dict[df3['a'][i]]
shuju['value']=int(df3['v'][i])
# if i != len(df3)-1:
# number1=round(int(df3['v'][i]) / sums, 2)
# number=round(number1*100,2)
# numbers+=number
# shuju['zhanbi'] = str(number) + '%'
# else:
# shuju['zhanbi']=str(100-numbers) + '%'
zhanbi.append(shuju)
else:
zhanbi = []
res = {
'details_data':{
'new_columns':df.columns.tolist(),
'new_values':df.values.tolist()},
'event_count':{
'date':data_list,
'event_values':event_values
},
'details_user':game_details,
'proportion':zhanbi
}
return schemas.Msg(code=0, msg='ok', data=res)
@router.get("/event_list")
async def event_list(
request: Request,
game: str,
db: AsyncIOMotorDatabase = Depends(get_database),
ckdb: CKDrive = Depends(get_ck_db),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""个人详情中的事件列表"""
#获取事件名
event_list = await ckdb.distinct(game, 'event', '#event_name')
return schemas.Msg(code=0, msg='ok', data=event_list)

View File

@ -9,7 +9,7 @@ import crud, schemas
from api import deps
from core import security
from core.config import settings
from utils import get_uid
from db import get_database
router = APIRouter()
@ -143,15 +143,19 @@ async def all_account(
创建新账号
"""
created = []
id = []
for name in data_in.account_list:
if is_exists := await crud.user.exists(db, {'name': name}):
continue
else:
new_account = schemas.UserCreate(name=name, password='123')
created.append(name)
await crud.user.create(db, new_account)
#创建账户并返回id
id_one=await crud.user.create(db, new_account)
id.append(id_one)
res = {
'created_account': created,
'password': '123'
'password': '123',
'id':id
}
return schemas.Msg(code=0, msg='ok', data=res)

View File

@ -15,4 +15,8 @@ from .crud_check_data import check_data
from .user_label import user_label
from .select_map import select_map
from .crud_project_number import project_number
from .crud_proid_map import proid_map
from .crud_proid_map import proid_map
from .crud_api_board import api_board
from .crud_url_list import url_list
from .crud_user_url import user_url
from .crud_api_module import api_module

35
crud/crud_api_board.py Normal file
View File

@ -0,0 +1,35 @@
from motor.motor_asyncio import AsyncIOMotorDatabase
import schemas
from crud.base import CRUDBase
__all__ = 'api_board',
from schemas import ProjectDB
from utils import get_uid
class CRUDProjectNumber(CRUDBase):
# 获取所有数据
async def all_api(self, db: AsyncIOMotorDatabase):
return await self.find_many(db)
# 修改数据
async def update(self, db: AsyncIOMotorDatabase, data_in: schemas.Api_board,opinion):
name = data_in.name
api_name=data_in.api_name
api_path=data_in.api_path
if opinion == True:
await self.update_one(db, {'name': name,'api_name':api_name}, {'$set': {'api_path': api_path}})
else:
await self.update_one(db, {'name': name, 'api_path': api_path}, {'$set': {'api_name':api_name}})
# 插入数据
async def insert(self, db: AsyncIOMotorDatabase, data_in: schemas.Api_board):
await self.insert_one(db, data_in.dict())
#删除数据
async def del_api(self, db: AsyncIOMotorDatabase, data_in: schemas.Api_board):
return await self.delete(db,data_in.dict())
api_board = CRUDProjectNumber('api_board')

35
crud/crud_api_module.py Normal file
View File

@ -0,0 +1,35 @@
from motor.motor_asyncio import AsyncIOMotorDatabase
import schemas
from crud.base import CRUDBase
__all__ = 'api_module',
class Api_module(CRUDBase):
# 获取权限模板信息
async def get_api_module(self, db: AsyncIOMotorDatabase):
return await self.find_many(db)
# 获取一个用户的权限信息
async def get_quanxian(self, db: AsyncIOMotorDatabase, data_in: schemas.Url_quanxian):
return await self.find_one(db, {'user_id': data_in.user_id})
# 插入一条全新的用户权限信息
async def insert_quanxian(self, db: AsyncIOMotorDatabase, data_in: schemas.Url_module):
return await self.insert_one(db, data_in.dict())
# 更新一条用户权限信息
async def updata_quanxian_module(self, db: AsyncIOMotorDatabase, data_in: schemas.Url_module):
return await self.update_one(db, {'auth_id': data_in.auth_id, 'path_name': data_in.path_name},
{'$set': {'api_list': data_in.api_list, 'api_name': data_in.api_name,
'state': data_in.state}})
#获取一条权限模板信息
async def get_one_module(self, db: AsyncIOMotorDatabase, data_in: schemas.Add_module):
return await self.find_one(db, {'auth_id': data_in.auth_id})
#更新一条权限模板状态
async def update_one_module(self, db: AsyncIOMotorDatabase, res):
return await self.update_one(db, {'_id':res['_id']}, {
'$set': {'state':res['state']}})
api_module = Api_module('api_module')

View File

@ -12,7 +12,12 @@ class CRUDApiList(CRUDBase):
data = {'$set': schemas.AddRoleDB(**data_in.dict()).dict(by_alias=True)}
return await self.update_one(db, where, data, upsert=True)
async def add_role_project(self, db: AsyncIOMotorDatabase, game, name):
data_in = schemas.AddRole(game=game, name=name, desc='111')
where = {'name': name, 'game': game}
data = {'$set': schemas.AddRoleDB(**data_in.dict()).dict(by_alias=True)}
await self.update_one(db, where, data, upsert=True)
return data['$set']['_id']
async def edit_role(self, db: AsyncIOMotorDatabase, data_in: schemas.EditRole):
data = data_in.dict()
where = {'_id': data.pop('role_id')}

42
crud/crud_url_list.py Normal file
View File

@ -0,0 +1,42 @@
from motor.motor_asyncio import AsyncIOMotorDatabase
import schemas
from crud.base import CRUDBase
__all__ = 'url_list',
class Url_list(CRUDBase):
# 获取所有级别权限的所有路由和路由状体
async def get_all(self, db: AsyncIOMotorDatabase):
return await self.find_many(db)
# 获取对应级别权限的所有路由和路由状体
async def get_url(self, db: AsyncIOMotorDatabase, data_in: schemas.Url_list):
return await self.find_many(db, {'name': data_in.name})
# 插入单条对应级别权限的路由和状态
async def insert_url(self, db: AsyncIOMotorDatabase, data_in: schemas.Url_list):
return await self.insert_one(db, data_in.dict())
async def insert_urls(self, db: AsyncIOMotorDatabase, data_in: schemas.Url_lists):
return await self.insert_one(db, data_in.dict())
# 更新单条对应级别权限的路由和状态
async def update_url_url(self, db: AsyncIOMotorDatabase, res):
return await self.update_one(db, {'_id':res['_id']}, {
'$set': {'state':res['state']}})
async def find_one_url(self, db: AsyncIOMotorDatabase, data_in: schemas.Datalist):
return await self.find_one(db, {'auth_id': data_in.role_id, 'path_name': data_in.path_name})
#修改权限用户名字
async def edit_name(self, db: AsyncIOMotorDatabase, data_in: schemas.Editname):
where = {'auth_id': data_in.role_id}
up_data = {'$set': {'name':data_in.name}}
return await self.update_many(db, where, up_data)
#删除一个权限用户
async def delete_name(self,db: AsyncIOMotorDatabase, data_in: schemas.Del_roles):
return await self.delete(db,{'auth_id':data_in.role_id})
url_list = Url_list('url_list')

View File

@ -41,8 +41,8 @@ class CRUDUser(CRUDBase):
nickname=obj_in.nickname,
_id=get_uid()
)
return await db[self.coll_name].insert_one(db_obj.dict(by_alias=True))
await db[self.coll_name].insert_one(db_obj.dict(by_alias=True))
return db_obj.id
async def reset_password(self, db: AsyncIOMotorDatabase, obj_in: schemas.UserRestPassword):
hashed_password = get_password_hash(obj_in.password)
await self.update_one(db, {'name': obj_in.username}, {'$set': {'hashed_password': hashed_password}})
@ -63,6 +63,9 @@ class CRUDUser(CRUDBase):
res = await self.find_many(db, *args, **kwargs)
return schemas.Users(data=res)
async def get_all_users(self,db,where):
return await self.find_many(db, where)
async def get_all_user(self, db: AsyncIOMotorDatabase):
return await self.distinct(db, 'name')

28
crud/crud_user_url.py Normal file
View File

@ -0,0 +1,28 @@
from motor.motor_asyncio import AsyncIOMotorDatabase
import schemas
from crud.base import CRUDBase
__all__ = 'user_url',
class User_url(CRUDBase):
# 获取一个用户的权限信息
async def get_quanxian(self, db: AsyncIOMotorDatabase, data_in: schemas.Url_quanxian):
return await self.find_one(db, {'user_id': data_in.user_id})
# 插入一条全新的用户权限信息
async def insert_quanxian(self, db: AsyncIOMotorDatabase, data_in: schemas.Url_quanxian):
return await self.insert_one(db, data_in.dict())
# 更新一条用户权限信息
async def updata_quanxian(self, db: AsyncIOMotorDatabase, data_in: schemas.Url_quanxian):
return await self.update_one(db, {'user': data_in.user, 'user_id': data_in.user_id},
{'$set': {'game': data_in.game,'quanxian_id':data_in.quanxian_id, 'quanxian': data_in.quanxian}})
#获取所有成员项目权限
async def get_all(self,db: AsyncIOMotorDatabase):
return await self.find_many(db)
user_url = User_url('user_url')

49
main.py
View File

@ -71,9 +71,56 @@ class BasicAuth(AuthenticationBackend):
def login_expired(conn: HTTPConnection, exc: Exception) -> Response:
return JSONResponse(schemas.Msg(code=-5, msg='请重新登录').dict(), status_code=200)
#处理路由权限问题
@app.middleware("http")
async def panduan_quanxian_url(request: Request, call_next):
#user_id=request.user.id
#user=request.user.username
start_time = int(time.time() * 1000)
response = await call_next(request)
process_time = int(time.time() * 1000) - start_time
response.headers["X-Process-Time"] = str(process_time)
url=request.url.path
if 'docs' in url or 'openapi.json' in url:
return response
if url == '/api/v1/user/login':
return response
game=request.url.query.split('=')[1]
if 'undefined' in game:
return response
if '&' in game:
game=game.split('&')[0]
judge_url = await crud.user_url.get_quanxian(get_database(), schemas.Url_quanxian(user_id=request.user.id))
if judge_url == {}:
# data='没有匹配这个游戏'
return Response(schemas.Msg(code=0, msg='没有操作权限',data='').json())
if game not in judge_url['game']:
#data='没有匹配这个游戏'
return Response(schemas.Msg(code=0, msg='没有操作权限',data='' ).json())
quanxian_dict={}
for i in range(len(judge_url['game'])):
quanxian_dict[judge_url['game'][i]]=judge_url['quanxian'][i]
user_list=await crud.url_list.get_url(get_database(),schemas.Url_list(name=quanxian_dict[game]))
api_list=[]
state_list=[]
api_dict={}
for i in user_list:
for api in i['api_list']:
api_list.append(api)
for quanxian in i['state']:
state_list.append(quanxian)
for i in range(len(api_list)):
api_dict[api_list[i]]=state_list[i]
if url not in api_list:
# data='没有对应路由'
return Response(schemas.Msg(code=0, msg='没有操作权限',data='').json())
elif api_dict[url] != True:
# data='路由为False'
return Response(schemas.Msg(code=0, msg='没有操作权限',data='').json())
return response
app.add_middleware(CasbinMiddleware, enforcer=casbin_enforcer)
#app.add_middleware(CasbinMiddleware, enforcer=casbin_enforcer)
app.add_middleware(AuthenticationMiddleware, backend=BasicAuth(), on_error=login_expired)
app.add_middleware(

View File

@ -19,4 +19,8 @@ from .check_data import *
from .userlabel import *
from .select_map import *
from .project_number import *
from .proid_map import *
from .proid_map import *
from .api_board import *
from .url_list import *
from .user_url import *
from .api_module import *

12
schemas/api_board.py Normal file
View File

@ -0,0 +1,12 @@
from typing import Any, List, Union
from pydantic import BaseModel, Field
from schemas import DBBase
from typing import Optional
class Api_board(BaseModel):
api_path: str = None
api_name: str = None
name: str

18
schemas/api_module.py Normal file
View File

@ -0,0 +1,18 @@
from typing import Any, List, Union
from pydantic import BaseModel, Field
from schemas import DBBase
from typing import Optional
class Url_module(BaseModel):
auth_id: str = None
path_name: str = None
api_list: List[str] = None
api_name: List[str] = None
state: List[bool] = None
class Add_module(BaseModel):
auth_id: str
url:str

View File

@ -8,6 +8,7 @@ class AddRoleForUserInDomain(BaseModel):
username: str
role_id: str
game: str
auth_id: str
class AddRoleForUsersInDomain(BaseModel):

View File

@ -35,6 +35,9 @@ class ProjectDetail(BaseModel):
class ProjectClean(BaseModel):
project_id: str
class Import_project(BaseModel):
game: str
games: str
class ProjectRename(BaseModel):
project_id: str

View File

@ -13,3 +13,21 @@ class CkQuery(BaseModel):
events: Union[List[dict], dict] = None
report_id: str = None
ext_filter: dict = dict()
class Ck_seek_user(BaseModel):
user_arrt_title: str # 用户属性
user_arrt_id: str # 用户属性id
user_arrt_type: str # 用户属性type
comparator_title: str # 筛选条件
comparator_id: str # 筛选条件id
condition: str # 手动输入条件,区间用~符号隔开如0~10
start_time: str # 开始时间
end_time: str # 结束时间
pages: int = 1 # 分页的当前页
class Ck_solo_user(BaseModel):
account_id : str # #account_id
start_time: str # 开始时间 例2022-04-02
end_time: str # 结束时间
event_list: List[str] #事件名

61
schemas/url_list.py Normal file
View File

@ -0,0 +1,61 @@
from typing import Any, List, Union
from pydantic import BaseModel, Field
from schemas import DBBase
from typing import Optional
class Url_list(BaseModel):
name: str = None
auth_id: str = None
path_name: str = None
api_list: List[str] = None
api_name: List[str] = None
state: List[bool] = None
system: int = None
class Url_lists(BaseModel):
name: str = None
auth_id: str = None
path_name: str = None
api_list: List[str] = None
api_name: List[str] = None
state: List[bool] = None
system: int = None
game: str = None
class Url_data(BaseModel):
api_list: List[str] = None
api_name: List[str] = None
path_name: str = None
stath_name: List[bool] = None
class Datalist(BaseModel):
path: str
path_name: str
role_id: str
system: int
class Add_role(BaseModel):
path_name: List[str]
system: int
name: str
class Del_role(BaseModel):
path: str
path_name: str
role_id: str
class Editname(BaseModel):
role_id: str #= Field(..., description='要编辑的id')
name: str = None
desc: str = None
class Del_roles(BaseModel):
game:str
role_id:str
username:str

14
schemas/user_url.py Normal file
View File

@ -0,0 +1,14 @@
from typing import Any, List, Union
from pydantic import BaseModel, Field
from schemas import DBBase
from typing import Optional
class Url_quanxian(BaseModel):
game: List[str] = None
user: str = None
user_id: str = None
quanxian: List[str] = None
quanxian_id:List[str] = None

View File

@ -1,6 +1,6 @@
import random
import time
import datetime
def get_uid():
return hex(int(time.time() * 10 ** 7) + random.randint(0, 10000))[2:]
@ -34,4 +34,16 @@ def dict_to_str(dic):
c += "\"%s\":\"%s\"," % (k, v)
else:
c += "\"%s\":\"%s\"}" % (k, v)
return c
return c
def getEveryDay(begin_date,end_date):
# 前闭后闭
date_list = []
begin_date = datetime.datetime.strptime(begin_date, "%Y-%m-%d")
end_date = datetime.datetime.strptime(end_date,"%Y-%m-%d")
while begin_date <= end_date:
date_str = begin_date.strftime("%Y-%m-%d")
date_list.append(date_str)
begin_date += datetime.timedelta(days=1)
return date_list
#print(getEveryDay('2016-01-01','2017-05-11'))