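"""Analysis report routes.

ClickHouse-backed endpoints for raw SQL query/export, the event model,
retention, funnel, distribution (scatter), path (trace) and user-property analysis.
"""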
import datetime
import mimetypes
import os
from collections import defaultdict
from copy import deepcopy
from urllib.parse import quote

import numpy as np
import pandas as pd
from fastapi import APIRouter, Depends, File, Request
from fastapi.encoders import jsonable_encoder
from fastapi.responses import StreamingResponse
from motor.motor_asyncio import AsyncIOMotorDatabase

import crud
import schemas
from common import *

from api import deps
from db import get_database
from db.ckdb import get_ck_db, CKDrive
from db.redisdb import get_redis_pool, RedisDrive
from models.behavior_analysis import BehaviorAnalysis, CombinationEvent
from models.user_analysis import UserAnalysis
from models.x_analysis import XAnalysis
from utils import DfToStream, getEveryDay, Download_xlsx, jiange_insert, create_df, create_neidf

router = APIRouter()
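# Note: every endpoint below takes a bare `game` parameter. A minimal mounting sketch,
# assuming the application exposes it as a path parameter (not confirmed in this file):
#   app.include_router(router, prefix="/{game}/analysis", tags=["analysis"])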


@router.post("/sql")
async def query_sql(
        request: Request,
        game: str,
        data_in: schemas.Sql,
        ckdb: CKDrive = Depends(get_ck_db),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Run a raw SQL query."""
    sql = data_in.sql
    lowered = sql.lower()
    # Only plain SELECT statements are allowed; reject anything that writes data.
    if 'select' in lowered and not any(kw in lowered for kw in ('insert', 'update', 'delete')):
        sql = sql.replace('$game', game)
        data = await ckdb.execute(sql)
        return schemas.Msg(code=0, msg='ok', data=data)
    else:
        return schemas.Msg(code=0, msg='ok', data='当前只支持SQL查询语句!')
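# Example body for POST /sql (schemas.Sql is assumed to carry a single `sql` field, since
# only `data_in.sql` is read above); `$game` is substituted with the current game's database:
#   {"sql": "select `#event_name`, count() as cnt from $game.event group by `#event_name`"}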


@router.post("/sql_export")
async def query_sql_export(
        request: Request,
        game: str,
        data_in: schemas.Sql,
        ckdb: CKDrive = Depends(get_ck_db),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
):
    """Export the result of a raw SQL query as an xlsx file."""
    file_name = quote('result.xlsx')
    mime = mimetypes.guess_type(file_name)[0]

    sql = data_in.sql
    sql = sql.replace('$game', game)
    df = await ckdb.query_dataframe(sql)
    if df.empty:
        return schemas.Msg(code=-9, msg='无数据', data=None)
    df_to_stream = DfToStream((df, 'result'))
    with df_to_stream as d:
        export = d.to_stream()
    return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})


@router.post("/event_model_sql")
async def event_model_sql(
        request: Request,
        game: str,
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Return the generated SQL for the event analysis model."""
    await analysis.init(data_where=current_user.data_where)
    data = await analysis.event_model_sql()
    return schemas.Msg(code=0, msg='ok', data=data)


@router.post("/event_model_export")
async def event_model_export(request: Request,
                             game: str,
                             ckdb: CKDrive = Depends(get_ck_db),
                             analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
                             current_user: schemas.UserDB = Depends(deps.get_current_user)
                             ):
    """Export event analysis model data as an xlsx file, one sheet per event."""
    await analysis.init(data_where=current_user.data_where)
    sqls = await analysis.event_model_sql()
    file_name = quote(f'{sqls[0]["report_name"]}.xlsx')
    mime = mimetypes.guess_type(file_name)[0]
    excels = []
    for item in sqls:
        # Combination (virtual) events have no SQL of their own, skip them.
        if item.get('combination_event'):
            continue
        sql = item['sql']
        event_name = item['event_name']
        df = await ckdb.query_dataframe(sql)
        if df.empty:
            continue
        if 'date' in df:
            df.sort_values('date', inplace=True)
            try:
                # Excel cannot store timezone-aware datetimes.
                df['date'] = df['date'].dt.tz_localize(None)
            except Exception:
                pass
        excels.append((df, event_name))
    df_to_stream = DfToStream(*excels)
    with df_to_stream as d:
        export = d.to_stream()
    return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})


@router.post("/event_model_pay")
async def event_model_pay(request: Request,
                          game: str,
                          data_in: schemas.Times,
                          ckdb: CKDrive = Depends(get_ck_db)
                          ):
    """Download the detailed order data behind the daily total recharge metric."""
    sql = f"""select * FROM {game}.event WHERE addHours(`#event_time`, 8) >= '{data_in.start_time}' and addHours(`#event_time`, 8) <= '{data_in.end_time}' and `#event_name` = 'pay' and
     orderid NOT LIKE '%GM%' order by `#event_time`"""
    df = await ckdb.query_dataframe(sql)
    # Drop columns whose first value is empty, and strip the timezone suffix from time columns.
    drop_list = []
    for col in list(df.columns.values):
        if df[col][0] is None or df[col][0] == [] or df[col][0] == '':
            drop_list.append(col)
        elif 'time' in col:
            df[col] = df[col].astype(str)
            for nu in range(len(df)):
                df.replace(to_replace=df[col][nu], value=df[col][nu].split('+')[0], inplace=True)
    df.drop(drop_list, axis=1, inplace=True)
    file_name = quote('订单详情.xlsx')
    mime = mimetypes.guess_type(file_name)[0]
    df_to_stream = DfToStream((df, '订单详情'))
    with df_to_stream as d:
        export = d.to_stream()
    return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
@router.post("/event_model")
|
||
async def event_model(
|
||
request: Request,
|
||
game: str,
|
||
data_in: schemas.CkQuery,
|
||
ckdb: CKDrive = Depends(get_ck_db),
|
||
db: AsyncIOMotorDatabase = Depends(get_database),
|
||
rdb: RedisDrive = Depends(get_redis_pool),
|
||
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
) -> schemas.Msg:
|
||
""" 事件分析"""
|
||
await analysis.init(data_where=current_user.data_where)
|
||
try:
|
||
sqls = await analysis.event_model_sql()
|
||
except Exception as e:
|
||
return schemas.Msg(code=-9, msg='报表配置参数异常')
|
||
res = []
|
||
is_hide = []
|
||
group_label = {}
|
||
for idx, item in enumerate(sqls): #列出索引下标
|
||
if item.get('is_show') == False:
|
||
is_hide.append(idx)
|
||
#event_name:事件名,日充总额
|
||
#format:float浮点型
|
||
q = {
|
||
'groups': [],
|
||
'values': [],
|
||
'sum': [],
|
||
'avg': [],
|
||
'event_name': item['event_name'],
|
||
'format': item['format'],
|
||
'last_value': 0,
|
||
'start_date': item['start_date'],
|
||
'end_date': item['end_date'],
|
||
'time_particle': item['time_particle']
|
||
}
|
||
# 处理组合问题,如combination_event不存在则跳过
|
||
if item.get('combination_event'):
|
||
combination_event = CombinationEvent(res, item.get('combination_event'), item['format'])
|
||
values, sum_, avg = combination_event.parse()
|
||
|
||
# q['values'].append(values)
|
||
#q['sum'].append(sum_)
|
||
q['avg'].append(avg)
|
||
q['date_range'] = item['date_range']
|
||
for last_value in values[::-1]:
|
||
if last_value > 0:
|
||
q['last_value'] = float(last_value)
|
||
break
|
||
if list(item.get('event_name'))[-1] == '率':
|
||
for i in range(len(values)):
|
||
values[i]=str((values[i]))+'%'
|
||
q['values'].append(values)
|
||
q['sum'].append(str(sum_)+'%')
|
||
elif '比' in item['event_name']:
|
||
for i in range(len(values)):
|
||
values[i]=str(int(float(values[i])*100))+'%'
|
||
q['values'].append(values)
|
||
q['sum'].append(str(int(float(sum_)*100))+'%')
|
||
else:
|
||
q['values'].append(values)
|
||
q['sum'].append(sum_)
|
||
res.append(q)
|
||
continue
|
||
#sql语句
|
||
sql = item['sql']
|
||
groupby = item['groupby']
|
||
date_range = item['date_range'] #获取的要查询的每一天的时间
|
||
q['date_range'] = date_range #把要查询的时间加入q字典中
|
||
df = await ckdb.query_dataframe(sql) #以sql语句查出数据,df是二维列表
|
||
df.fillna(0, inplace=True)#以0填补空数据
|
||
#映射对应中文返回给前端展示
|
||
for i in groupby:
|
||
if i == 'svrindex':
|
||
if game == 'mfmh5':
|
||
game = 'mzmfmh5'
|
||
chinese={}
|
||
resp = await crud.select_map.get_one(db, game,i)
|
||
for ii in resp:
|
||
chinese[ii['id']]=ii['title']
|
||
for k,v in chinese.items():
|
||
#开始映射
|
||
df.loc[df[i] == k, i] = v
|
||
|
||
#获取第一矩阵的长度
|
||
if df.shape[0] == 0:
|
||
df = pd.DataFrame({'date': date_range, 'values': 0 * len(date_range)})
|
||
# continue
|
||
# return schemas.Msg(code=0, msg='ok', data=[q])
|
||
if item['time_particle'] == 'total':
|
||
# for group, df_group in df.groupby(groupby):
|
||
# df_group.reset_index(drop=True, inplace=True)
|
||
q['groups'].append(groupby)
|
||
q['values'].append(df['values'].to_list())
|
||
q['sum'].append(round(float(df['values'].sum()), 2))
|
||
q['avg'].append(round(float(df['values'].mean()), 2))
|
||
for last_value in df['values'].values[::-1]:
|
||
if last_value > 0:
|
||
q['last_value'] = float(last_value)
|
||
break
|
||
if groupby and (set(groupby) & set(df) == set(groupby)):
|
||
q['date_range'] = [f'{i}' for i in df.set_index(groupby).index]
|
||
else:
|
||
q['date_range'] = ['合计']
|
||
# 暂时只执行像素的计费点加别名
|
||
if game == 'xiangsu':
|
||
if item['groupby'][0] == 'proid' and analysis.events[0]['event_name'] == 'pay':
|
||
# 将对应英文的中文意思按位置一一对应返回给前端
|
||
proid_dict = await crud.proid_map.get_all_show_name(db, game)
|
||
res_list = []
|
||
for i in q['date_range']:
|
||
try:
|
||
name = proid_dict[i]
|
||
res_list.append(name)
|
||
except:
|
||
pass
|
||
|
||
q['proid_name'] = res_list
|
||
# 将proid字段和金额money按对应关系组合成字典并算出对应的总额返回给前端
|
||
money_dict = await crud.proid_map.get_all_show_money(db, game)
|
||
add_money = []
|
||
number = q['values'][0]
|
||
next = -1
|
||
for i in q['date_range']:
|
||
next += 1
|
||
try:
|
||
mongey = money_dict[i]
|
||
add = number[next] * mongey
|
||
add_money.append(add)
|
||
except:
|
||
pass
|
||
q['proid_money'] = add_money
|
||
# 首充金额分布
|
||
# if item['groupby'][0] == 'money' and analysis.events[0]['event_name'] == 'pay':
|
||
# # 将proid字段和金额money按对应关系组合成字典并算出对应的总额返回给前端
|
||
# money_dict = await crud.proid_map.get_all_show_money(db, game)
|
||
# add_money = []
|
||
# number = q['values'][0]
|
||
# next = -1
|
||
# for i in q['date_range']:
|
||
# next += 1
|
||
# mongey = money_dict[i]
|
||
# add = number[next] * mongey
|
||
# add_money.append(add)
|
||
# q['proid_money'] = add_money
|
||
res.append(q)
|
||
continue
|
||
if groupby and (set(groupby) & set(df)) == set(groupby):
|
||
columns = groupby[0]
|
||
df[columns] = df[columns].astype(str)
|
||
# 有分组
|
||
for group, df_group in df.groupby(groupby):
|
||
#在原数据上将索引重新转换为列,新索引的列删除
|
||
df_group.reset_index(drop=True, inplace=True)
|
||
#判断为0的改成未知城市
|
||
if str(group) == '0' and analysis.event_view['groupBy'][0]['columnDesc']== '城市':
|
||
q['groups'].append('未知城市')
|
||
else:
|
||
if 'str' in str(type(group)):
|
||
q['groups'].append(str(group))
|
||
else:
|
||
q['groups'].append(str(list(group)))
|
||
concat_data = []
|
||
for i in set(date_range) - set(df_group['date']):
|
||
if len(groupby) > 1:
|
||
concat_data.append((i, *group, 0))
|
||
else:
|
||
concat_data.append((i, group, 0))
|
||
df_group = pd.concat([df_group, pd.DataFrame(concat_data, columns=df_group.columns)])
|
||
df_group.sort_values('date', inplace=True)
|
||
q['values'].append(df_group['values'].to_list())
|
||
q['sum'].append(round(float(df_group['values'].sum()), 2))
|
||
q['avg'].append(round(float(df_group['values'].mean()), 2))
|
||
for last_value in df['values'].values[::-1]:
|
||
if last_value > 0:
|
||
q['last_value'] = float(last_value)
|
||
break
|
||
|
||
else:
|
||
# 无分组
|
||
concat_data = []
|
||
for i in set(date_range) - set(df['date']):
|
||
concat_data.append((i, 0))
|
||
#纵向拼接两个表
|
||
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
|
||
#在原数据上按data排序
|
||
df.sort_values('date', inplace=True)
|
||
if len(df) >= 2:
|
||
q['chain_ratio'] = division((df.iloc[-1, 1] - df.iloc[-2, 1]) * 100, df.iloc[-2, 1], 2)
|
||
if len(df) >= 8:
|
||
q['wow'] = division((df.iloc[-1, 1] - df.iloc[-8, 1]) * 100, df.iloc[-8, 1], 2) or 0
|
||
q['values'].append(abs(df['values']).to_list())
|
||
for last_value in df['values'].values[::-1]:
|
||
if last_value > 0:
|
||
q['last_value'] = float(last_value)
|
||
break
|
||
#求所有值的和
|
||
q['sum'].append(round(abs(float(df['values'].sum())), 2))
|
||
#求平均值
|
||
q['avg'].append(round(float(df['values'].mean()), 2))
|
||
|
||
# q['eventNameDisplay']=item['event_name_display']
|
||
res.append(q)
|
||
group_label = item['group_label']
|
||
# 按总和排序
|
||
for item in res:
|
||
try:
|
||
if item['time_particle'] in ('P1D', 'P1W'): #按格式修改年月日
|
||
item['date_range'] = [d.strftime('%Y-%m-%d') for d in item['date_range']]
|
||
elif item['time_particle'] in ('P1M',):
|
||
item['date_range'] = [d.strftime('%Y-%m') for d in item['date_range']]
|
||
else:
|
||
item['date_range'] = [d.strftime('%Y-%m-%d %H:%M:%S') for d in item['date_range']]
|
||
except:
|
||
pass
|
||
|
||
sort_key = np.argsort(np.array(item['sum']))[::-1]#将sum中的元素从小到大排列后的结果,提取其对应的索引,然后倒着输出到变量之中
|
||
if item.get('groups'):
|
||
item['groups'] = np.array(item['groups'])[sort_key].tolist()
|
||
groups = []
|
||
for gitem in item['groups']:
|
||
gb = []
|
||
if '(' in gitem:
|
||
gitem = gitem.strip('(').strip(')').replace(' ', '').replace("'", '')
|
||
true_list = gitem.split(',')
|
||
for gstr in true_list:
|
||
gb.append(gstr)
|
||
# 存在标签分组项
|
||
if group_label:
|
||
for name, idx in group_label.items():
|
||
gb.insert(idx, name)
|
||
groups.append(str(gb))
|
||
item['groups'] = groups
|
||
item['values'] = np.array(item['values'])[sort_key].tolist()
|
||
item['sum'] = np.array(item['sum'])[sort_key].tolist()
|
||
item['avg'] = np.array(item['avg'])[sort_key].tolist()
|
||
res = [item for idx, item in enumerate(res) if idx not in is_hide]
|
||
|
||
return schemas.Msg(code=0, msg='ok', data=res)


@router.post("/retention_model_sql")
async def retention_model_sql(
        request: Request,
        game: str,
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Return the generated SQL for the retention model."""
    await analysis.init(data_where=current_user.data_where)
    data = await analysis.retention_model_sql2()
    return schemas.Msg(code=0, msg='ok', data=[data])


@router.post("/retention_model")
async def retention_model(request: Request,
                          game: str,
                          ckdb: CKDrive = Depends(get_ck_db),
                          db: AsyncIOMotorDatabase = Depends(get_database),
                          analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
                          current_user: schemas.UserDB = Depends(deps.get_current_user)
                          ) -> schemas.Msg:
    """Retention analysis model."""
    await analysis.init(data_where=current_user.data_where)
    try:
        res = await analysis.retention_model_sql2()  # start/end dates and the generated SQL
    except Exception:
        return schemas.Msg(code=-9, msg='报表配置参数异常')
    sql = res['sql']
    df = await ckdb.query_dataframe(sql)
    if df.empty:
        return schemas.Msg(code=-9, msg='无数据', data=None)

    date_range = res['date_range']              # list of dates to report on
    unit_num = res['unit_num']                  # int
    retention_n = res['retention_n']            # list of ints, e.g. 0, 1, 3, 7, 14, 21, 30
    filter_item_type = res['filter_item_type']  # e.g. 'all'
    filter_item = res['filter_item']
    df.set_index('reg_date', inplace=True)
    # Fill the dates that have no rows with zeros.
    for d in set(res['date_range']) - set(df.index):
        df.loc[d] = 0

    df.sort_index(inplace=True)
    summary_values = {'均值': {}}
    max_retention_n = 1
    # retained users per day offset
    avg = {}
    # churned users per day offset
    avgo = {}
    for date, v in df.T.items():
        # Keep the existing entry for this date; otherwise start with an empty dict.
        tmp = summary_values.setdefault(date, dict())
        tmp['d0'] = int(v.cnt0)
        tmp['p'] = []
        tmp['n'] = []
        tmp['p_outflow'] = []
        tmp['n_outflow'] = []
        for i in retention_n:
            n = (pd.Timestamp.now().date() - date).days
            if i > n:
                continue
            # retained
            avg[i] = avg.setdefault(i, 0) + v[f'cnt{i}']
            # churned
            avgo[i] = avgo.setdefault(i, 0) + v[f'on{i}']
            tmp['p'].append(v[f'p{i}'])
            tmp['n'].append(v[f'cnt{i}'])
            tmp['p_outflow'].append(v[f'op{i}'])
            tmp['n_outflow'].append(v[f'on{i}'])
    tmp = summary_values['均值']
    retention_avg_dict = {}

    for rn in retention_n:
        for rt, rd in df.T.items():
            if rt + datetime.timedelta(days=rn) <= pd.Timestamp.now().date():
                retention_avg_dict.setdefault(rn, {'cnt0': 0, 'cntn': 0, 'o_cnt0': 0, 'o_cntn': 0})
                retention_avg_dict[rn]['cnt0'] += rd['cnt0']
                retention_avg_dict[rn]['cntn'] += rd[f'cnt{rn}']
                retention_avg_dict[rn]['o_cnt0'] += rd['cnt0']
                retention_avg_dict[rn]['o_cntn'] += rd[f'on{rn}']

    tmp['p'] = []
    tmp['n'] = []
    tmp['p_outflow'] = []
    tmp['n_outflow'] = []
    tmp['d0'] = 0
    for rt, rd in retention_avg_dict.items():
        tmp['d0'] = int(df['cnt0'].sum())
        n = round(rd['cntn'] * 100 / rd['cnt0'], 2)
        n = 0 if np.isnan(n) else n
        tmp['p'].append(n)
        tmp['n'].append(rd['cntn'])
        n = round(rd['o_cntn'] * 100 / rd['cnt0'], 2)
        n = 0 if np.isnan(n) else n
        tmp['p_outflow'].append(n)
        tmp['n_outflow'].append(rd['o_cntn'])

    # Column titles: date, user count, next-day retention, then the remaining retention days.
    title = ['日期', '用户数', '次留', *[f'{i + 1}留' for i in retention_n[1:]]]

    # Dates that have not been reached yet are padded with '-'.
    retention_length = len(retention_n)
    for _, items in summary_values.items():
        for key in ['p', 'n', 'p_outflow', 'n_outflow']:
            items[key].extend(['-'] * (retention_length - len(items[key])))

    resp = {
        'summary_values': summary_values,
        'date_range': [d.strftime('%Y-%m-%d') for d in date_range],
        'title': title,
        'filter_item_type': filter_item_type,
        'filter_item': filter_item,
        'start_date': res['start_date'],
        'end_date': res['end_date'],
        'time_particle': res['time_particle']
    }
    return schemas.Msg(code=0, msg='ok', data=resp)
|
||
|
||
|
||
async def retention_model01(request: Request,
|
||
game: str,
|
||
ckdb: CKDrive = Depends(get_ck_db),
|
||
db: AsyncIOMotorDatabase = Depends(get_database),
|
||
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
) -> schemas.Msg:
|
||
await analysis.init(data_where=current_user.data_where)
|
||
res = await analysis.retention_model_sql2() #初始化开始时间结束时间,sql语句 字典
|
||
sql = res['sql'] #获取到sql语句
|
||
df = await ckdb.query_dataframe(sql)
|
||
if df.empty:
|
||
return schemas.Msg(code=-9, msg='无数据', data=None)
|
||
|
||
date_range = res['date_range'] #时间 列表
|
||
unit_num = res['unit_num'] #int
|
||
retention_n = res['retention_n'] #列表 int
|
||
filter_item_type = res['filter_item_type'] #all
|
||
filter_item = res['filter_item'] #列表 0,1,3,7,14,21,30
|
||
df.set_index('reg_date', inplace=True)
|
||
for d in set(res['date_range']) - set(df.index):
|
||
df.loc[d] = 0
|
||
|
||
df.sort_index(inplace=True)
|
||
summary_values = {'均值': {}}
|
||
max_retention_n = 1
|
||
avg = {}
|
||
avgo = {}
|
||
for date, v in df.T.items():
|
||
#字典中data存在时不替换,否则将data替换成空字典
|
||
tmp = summary_values.setdefault(date, dict())
|
||
tmp['d0'] = int(v.cnt0)
|
||
tmp['p'] = []
|
||
tmp['n'] = []
|
||
tmp['p_outflow'] = []
|
||
tmp['n_outflow'] = []
|
||
for i in retention_n:
|
||
n = (pd.Timestamp.now().date() - date).days
|
||
if i > n:
|
||
continue
|
||
# max_retention_n = i if i > max_retention_n else max_retention_n
|
||
avg[i] = avg.setdefault(i, 0) + v[f'cnt{i}']
|
||
avgo[i] = avgo.setdefault(i, 0) + v[f'on{i}']
|
||
tmp['p'].append(round(100-v[f'p{i}'],2))
|
||
#tmp['p'].append(v[f'p{i}'])
|
||
tmp['n'].append(v[f'cnt{i}'])
|
||
tmp['p_outflow'].append(v[f'op{i}'])
|
||
tmp['n_outflow'].append(v[f'on{i}'])
|
||
tmp = summary_values['均值']
|
||
retention_avg_dict = {}
|
||
|
||
for rn in retention_n:
|
||
for rt, rd in df.T.items():
|
||
if rt + datetime.timedelta(days=rn) <= pd.datetime.now().date():
|
||
retention_avg_dict.setdefault(rn, {'cnt0': 0, 'cntn': 0,'o_cnt0':0,'o_cntn':0})
|
||
retention_avg_dict[rn]['cnt0'] += rd['cnt0']
|
||
retention_avg_dict[rn]['cntn'] += rd[f'cnt{rn}']
|
||
|
||
retention_avg_dict[rn]['o_cnt0'] += rd['cnt0']
|
||
retention_avg_dict[rn]['o_cntn'] += rd[f'on{rn}']
|
||
|
||
tmp['p'] = []
|
||
tmp['n'] = []
|
||
tmp['p_outflow'] = []
|
||
tmp['n_outflow'] = []
|
||
tmp['d0'] = 0
|
||
for rt, rd in retention_avg_dict.items():
|
||
tmp['d0'] = int(df['cnt0'].sum())
|
||
n = round(100-(rd['cntn'] * 100 / rd['cnt0']), 2)
|
||
#n = round(rd['cntn'] * 100 / rd['cnt0'],2)
|
||
n = 0 if np.isnan(n) else n
|
||
tmp['p'].append(n)
|
||
tmp['n'].append(rd['cntn'])
|
||
n = round(rd['o_cntn'] * 100 / rd['cnt0'],2)
|
||
n = 0 if np.isnan(n) else n
|
||
tmp['p_outflow'].append(n)
|
||
tmp['n_outflow'].append(rd['o_cntn'])
|
||
|
||
|
||
title = ['日期', '用户数', '次流失', *[f'{i + 1}流失' for i in retention_n[1:]]]
|
||
|
||
# 未到达的日期需要补齐-
|
||
retention_length = len(retention_n)
|
||
for _, items in summary_values.items():
|
||
for key in ['p', 'n', 'p_outflow', 'n_outflow']:
|
||
items[key].extend(['-'] * (retention_length - len(items[key])))
|
||
|
||
resp = {
|
||
'summary_values': summary_values,
|
||
# 'values': values,
|
||
'date_range': [d.strftime('%Y-%m-%d') for d in date_range],
|
||
'title': title,
|
||
'filter_item_type': filter_item_type,
|
||
'filter_item': filter_item,
|
||
'start_date': res['start_date'],
|
||
'end_date': res['end_date'],
|
||
'time_particle': res['time_particle']
|
||
|
||
}
|
||
return schemas.Msg(code=0, msg='ok', data=resp)
|
||
|
||
|
||
|
||
@router.post("/retention_model_export")
|
||
async def retention_model_export(request: Request,
|
||
game: str,
|
||
ckdb: CKDrive = Depends(get_ck_db),
|
||
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
):
|
||
""" 留存分析模型 数据导出"""
|
||
await analysis.init(data_where=current_user.data_where)
|
||
data = await analysis.retention_model_sql2()
|
||
file_name = quote(f'留存分析.xlsx')
|
||
mime = mimetypes.guess_type(file_name)[0]
|
||
|
||
sql = data['sql']
|
||
df = await ckdb.query_dataframe(sql)
|
||
if df.empty:
|
||
return schemas.Msg(code=-9, msg='无数据', data=None)
|
||
df_to_stream = DfToStream((df, '留存分析'))
|
||
with df_to_stream as d:
|
||
export = d.to_stream()
|
||
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
|
||
|
||
|
||
@router.post("/retention_model_del", deprecated=True)
|
||
async def retention_model_del(
|
||
request: Request,
|
||
game: str,
|
||
ckdb: CKDrive = Depends(get_ck_db),
|
||
db: AsyncIOMotorDatabase = Depends(get_database),
|
||
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
) -> schemas.Msg:
|
||
"""留存数据模型"""
|
||
await analysis.init(data_where=current_user.data_where)
|
||
res = await analysis.retention_model_sql()
|
||
sql = res['sql']
|
||
date_range = res['date_range']
|
||
event_a, event_b = res['event_name']
|
||
unit_num = res['unit_num']
|
||
title = await crud.event_mana.get_show_name(db, game, event_a)
|
||
title = f'{title}用户数'
|
||
df = await ckdb.query_dataframe(sql)
|
||
if df.empty:
|
||
return schemas.Msg(code=-9, msg='无数据', data=None)
|
||
concat_data = []
|
||
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
|
||
df['date'] = df['date'].apply(lambda x: x.date())
|
||
# 计算整体
|
||
summary_df = df.groupby('date')[['val_a', 'val_b', 'amount_a']].sum()
|
||
summary_values = {}
|
||
for i, d1 in enumerate(date_range):
|
||
a = set(summary_df.loc[d1]['val_a']) if d1 in summary_df.index else set()
|
||
if not a:
|
||
continue
|
||
key = d1.strftime('%Y-%m-%d')
|
||
for j, d2 in enumerate(date_range[i:]):
|
||
if j > unit_num:
|
||
break
|
||
b = set(summary_df.loc[d2]['val_b']) if d2 in summary_df.index else set()
|
||
tmp = summary_values.setdefault(key, {})
|
||
tmp.setdefault('d0', len(a))
|
||
tmp.setdefault('p', []).append(division(len(a & b) * 100, len(a)))
|
||
tmp.setdefault('n', []).append(len(a & b))
|
||
tmp.setdefault('p_outflow', []).append(round(100 - division(len(a & b) * 100, len(a)), 2))
|
||
tmp.setdefault('n_outflow', []).append(len(a) - len(a & b))
|
||
groups = set([tuple(i) for i in df[res['groupby']].values])
|
||
df.set_index(res['groupby'], inplace=True)
|
||
df.sort_index(inplace=True)
|
||
values = {}
|
||
days = [i for i in range((date_range[-1] - date_range[0]).days + 1)][:unit_num + 1]
|
||
for i, d1 in enumerate(date_range):
|
||
for g in groups:
|
||
if len(g) == 1:
|
||
continue
|
||
a = set(df.loc[g]['val_a']) if g in df.index else set()
|
||
if not a:
|
||
continue
|
||
key = d1.strftime("%Y-%m-%d")
|
||
tmp_g = values.setdefault(key, {})
|
||
for j, d2 in enumerate(date_range[i:]):
|
||
if j > unit_num:
|
||
break
|
||
b = set(df.loc[g]['val_b']) if g in df.index else set()
|
||
tmp = tmp_g.setdefault(','.join(g[1:]), {})
|
||
tmp.setdefault('d0', len(a))
|
||
tmp.setdefault('p', []).append(division(len(a & b) * 100, len(a)))
|
||
tmp.setdefault('n', []).append(len(a & b))
|
||
data = {
|
||
'summary_values': summary_values,
|
||
'values': values,
|
||
'days': days,
|
||
'date_range': [d.strftime('%Y-%m-%d') for d in date_range][:unit_num + 1],
|
||
'title': title,
|
||
'start_date': res['start_date'],
|
||
'end_date': res['end_date'],
|
||
'time_particle': res['time_particle']
|
||
}
|
||
return schemas.Msg(code=0, msg='ok', data=data)
|
||
|
||
|
||
@router.post("/funnel_model_sql")
|
||
async def funnel_model_sql(
|
||
request: Request,
|
||
game: str,
|
||
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
) -> schemas.Msg:
|
||
"""漏斗数据模型 sql"""
|
||
await analysis.init(data_where=current_user.data_where)
|
||
data = await analysis.funnel_model_sql()
|
||
return schemas.Msg(code=0, msg='ok', data=[data])
|
||
|
||
|
||
@router.post("/funnel_model")
|
||
async def funnel_model(
|
||
request: Request,
|
||
game: str,
|
||
ckdb: CKDrive = Depends(get_ck_db),
|
||
db: AsyncIOMotorDatabase = Depends(get_database),
|
||
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
) -> schemas.Msg:
|
||
"""漏斗数据模型"""
|
||
await analysis.init(data_where=current_user.data_where)
|
||
res = await analysis.funnel_model_sql()
|
||
sql = res['sql']
|
||
#查询的时间
|
||
date_range = res['date_range']
|
||
cond_level = res['cond_level']
|
||
groupby = res['groupby']
|
||
|
||
df = await ckdb.query_dataframe(sql)
|
||
if df.empty:
|
||
return schemas.Msg(code=-9, msg='无数据', data=None)
|
||
# 补齐level数据
|
||
concat_data = []
|
||
for key, tmp_df in df.groupby(['date'] + groupby):
|
||
not_exists_level = {i for i in range(1, len(cond_level) + 1)} - set(tmp_df['level'])
|
||
for item in not_exists_level:
|
||
key = key if isinstance(key, tuple) else (key,)
|
||
concat_data.append((*key, item, 0))
|
||
#合并数据
|
||
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
|
||
|
||
# df.set_index('date',inplace=True)
|
||
data_list = []
|
||
date_data = {}
|
||
if df.shape == (0, 0):
|
||
return schemas.Msg(code=0, msg='ok', data={'list': data_list, 'level': cond_level})
|
||
|
||
tmp = {'title': '总体'}
|
||
#以level分组后的和
|
||
tmp_df = df[['level', 'values']].groupby('level').sum()
|
||
#在原数据上对索引进行排序
|
||
tmp_df.sort_index(inplace=True)
|
||
for i in tmp_df.index:
|
||
tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
|
||
|
||
tmp['n'] = tmp_df['values'].to_list()
|
||
tmp['p1'] = [100]
|
||
# tmp['p2'] = []
|
||
for i, v in tmp_df.loc[2:, 'values'].items():
|
||
tmp['p1'].append(round(v * 100 / tmp_df.loc[1, 'values'], 2))
|
||
# tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
|
||
data_list.append(tmp)
|
||
|
||
# 补齐日期
|
||
all_idx = {(dt, lv) for dt in date_range for lv in range(1, len(cond_level) + 1)}
|
||
concat_data = []
|
||
for i in all_idx - set(df.set_index(['date', 'level']).index):
|
||
concat_data.append((*i, 0))
|
||
summary_df = pd.concat(
|
||
[df[['date', 'level', 'values']], pd.DataFrame(concat_data, columns=['date', 'level', 'values'])])
|
||
for key, tmp_df in summary_df.groupby('date'):
|
||
tmp_df = tmp_df.groupby('level').sum()
|
||
tmp_df.sort_index(inplace=True)
|
||
for i in tmp_df.index:
|
||
tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
|
||
|
||
tmp = dict()
|
||
|
||
tmp['n'] = tmp_df['values'].to_list()
|
||
tmp['p1'] = [100]
|
||
# tmp['p2'] = []
|
||
for i, v in tmp_df.loc[2:, 'values'].items():
|
||
var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
|
||
var = 0 if np.isnan(var) else var
|
||
tmp['p1'].append(var)
|
||
# tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
|
||
|
||
_ = date_data.setdefault(key.strftime('%Y-%m-%d'), {})
|
||
_['总体'] = tmp
|
||
|
||
if groupby:
|
||
# 补齐数据
|
||
concat_data = []
|
||
idx = set(df.set_index(['date'] + groupby).index)
|
||
all_idx = {(*j, i) for i in range(1, len(cond_level) + 1) for j in idx}
|
||
for i in all_idx - set(df.set_index(list(('date', *groupby, 'level'))).index):
|
||
concat_data.append((*i, 0))
|
||
|
||
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
|
||
# df.sort_values(list((*groupby, 'level')), inplace=True, ascending=False)
|
||
|
||
for key, tmp_df in df.groupby(groupby):
|
||
tmp = {'title': key}
|
||
tmp_df = tmp_df.groupby('level').sum()
|
||
tmp_df.sort_index(inplace=True)
|
||
for i in tmp_df.index:
|
||
tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
|
||
|
||
tmp['n'] = tmp_df['values'].to_list()
|
||
tmp['p1'] = [100]
|
||
# tmp['p2'] = []
|
||
for i, v in tmp_df.loc[2:, 'values'].items():
|
||
var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
|
||
var = 0 if np.isnan(var) else var
|
||
tmp['p1'].append(var)
|
||
# tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
|
||
data_list.append(tmp)
|
||
|
||
for key, tmp_df in df.groupby(['date'] + groupby):
|
||
|
||
tmp_df = tmp_df.groupby('level').sum()
|
||
tmp_df.sort_index(inplace=True)
|
||
for i in tmp_df.index:
|
||
tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
|
||
|
||
tmp = dict()
|
||
|
||
tmp['n'] = tmp_df['values'].to_list()
|
||
tmp['p1'] = [100]
|
||
# tmp['p2'] = []
|
||
for i, v in tmp_df.loc[2:, 'values'].items():
|
||
var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
|
||
var = 0 if np.isnan(var) else var
|
||
tmp['p1'].append(var)
|
||
# tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
|
||
|
||
_ = date_data.setdefault(key[0].strftime('%Y-%m-%d'), {})
|
||
_[key[1]] = tmp
|
||
title = (groupby or ['总体']) + cond_level
|
||
resp = {'list': data_list,
|
||
'date_data': date_data,
|
||
'title': title,
|
||
'level': cond_level,
|
||
'start_date': res['start_date'],
|
||
'end_date': res['end_date'],
|
||
'time_particle': res['time_particle']
|
||
}
|
||
return schemas.Msg(code=0, msg='ok', data=resp)
|
||
|
||
|
||
@router.post("/scatter_model_sql")
|
||
async def scatter_model_sql(
|
||
request: Request,
|
||
game: str,
|
||
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
) -> schemas.Msg:
|
||
"""分布分析 sql"""
|
||
await analysis.init(data_where=current_user.data_where)
|
||
data = await analysis.scatter_model_sql()
|
||
return schemas.Msg(code=0, msg='ok', data=[data])
|
||
|
||
|
||
@router.post("/scatter_model_export")
|
||
async def retention_model_export(request: Request,
|
||
game: str,
|
||
ckdb: CKDrive = Depends(get_ck_db),
|
||
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
):
|
||
""" 分布分析 数据导出"""
|
||
await analysis.init(data_where=current_user.data_where)
|
||
res = await analysis.scatter_model_sql()
|
||
file_name = quote(f'分布分析.xlsx')
|
||
mime = mimetypes.guess_type(file_name)[0]
|
||
sql = res['sql']
|
||
df = await ckdb.query_dataframe(sql)
|
||
if df.empty:
|
||
return schemas.Msg(code=-9, msg='无数据', data=None)
|
||
interval_type = res['interval_type']
|
||
analysis = res['analysis']
|
||
groupby = res['groupby']
|
||
quota_interval_arr = res['quota_interval_arr']
|
||
# 兼容合计的
|
||
if res['time_particle'] == 'total':
|
||
df['date'] = '合计'
|
||
if analysis != 'number_of_days' and interval_type != 'discrete':
|
||
max_v = int(df['values'].max())
|
||
min_v = int(df['values'].min())
|
||
interval = (max_v - min_v) // 10 or 1
|
||
resp = {'list': dict(),
|
||
'start_date': res['start_date'],
|
||
'end_date': res['end_date'],
|
||
'time_particle': res['time_particle']
|
||
}
|
||
|
||
if not quota_interval_arr:
|
||
resp['label'] = [f'[{i},{i + interval})' for i in range(min_v, max_v, interval)]
|
||
bins = [i for i in range(min_v, max_v + interval, interval)]
|
||
else:
|
||
quota_interval_arr = [-float('inf')] + quota_interval_arr + [float('inf')]
|
||
resp['label'] = []
|
||
bins = [quota_interval_arr[0]]
|
||
for i, v in enumerate(quota_interval_arr[1:]):
|
||
resp['label'].append(f'[{quota_interval_arr[i]},{v})')
|
||
bins.append(v)
|
||
|
||
# 这是整体的
|
||
for key, tmp_df in df.groupby('date'):
|
||
bins_s = pd.cut(tmp_df['values'], bins=bins,
|
||
right=False).value_counts()
|
||
bins_s.sort_index(inplace=True)
|
||
total = int(bins_s.sum())
|
||
if res['time_particle'] == 'total':
|
||
resp['list']['合计'] = dict()
|
||
resp['list']['合计']['总体'] = {'n': bins_s.to_list(), 'total': total,
|
||
'p': round(bins_s * 100 / total, 2).to_list(),
|
||
'title': '总体'}
|
||
else:
|
||
resp['list'][key.strftime('%Y-%m-%d')] = dict()
|
||
resp['list'][key.strftime('%Y-%m-%d')]['总体'] = {'n': bins_s.to_list(), 'total': total,
|
||
'p': round(bins_s * 100 / total, 2).to_list(),
|
||
'title': '总体'}
|
||
|
||
# 分组的
|
||
if groupby:
|
||
export_df = pd.DataFrame(columns=resp['label'])
|
||
|
||
for key, tmp_df in df.groupby(['date', *groupby]):
|
||
bins_s = pd.cut(tmp_df['values'], bins=bins,
|
||
right=False).value_counts()
|
||
bins_s.sort_index(inplace=True)
|
||
total = int(bins_s.sum())
|
||
title = '.'.join(key[1:])
|
||
date = key[0]
|
||
resp['list'][date.strftime('%Y-%m-%d')][title] = {'n': bins_s.to_list(), 'total': total,
|
||
'p': round(bins_s * 100 / total, 2).to_list(),
|
||
'title': title
|
||
}
|
||
|
||
export_df.loc[(date.strftime('%Y-%m-%d'), title)] = bins_s.to_list()
|
||
|
||
df_to_stream = DfToStream((export_df, '分布分析'), (df, '分布分析原始数据'), index=True)
|
||
with df_to_stream as d:
|
||
export = d.to_stream()
|
||
return StreamingResponse(export, media_type=mime,
|
||
headers={'Content-Disposition': f'filename="{file_name}"'})
|
||
|
||
|
||
# elif analysis == 'number_of_days':
|
||
else:
|
||
resp = {'list': {}, 'label': [],
|
||
'start_date': res['start_date'],
|
||
'end_date': res['end_date'],
|
||
'time_particle': res['time_particle']
|
||
}
|
||
labels = [str(i) for i in sorted(df['values'].unique())]
|
||
resp['label'] = labels
|
||
for key, tmp_df in df.groupby(['date']):
|
||
total = len(tmp_df)
|
||
if res['time_particle'] == 'total':
|
||
dt = '合计'
|
||
else:
|
||
dt = key.strftime('%Y-%m-%d')
|
||
labels_dict = {}
|
||
for key2, tmp_df2 in tmp_df.groupby('values'):
|
||
label = str(key2)
|
||
n = len(tmp_df2)
|
||
labels_dict[label] = n
|
||
|
||
resp['list'][dt] = {'总体': {'n': [labels_dict.get(i, 0) for i in labels], 'total': total,
|
||
'p': [round(labels_dict.get(i, 0) * 100 / total, 2) for i in labels]}}
|
||
|
||
export_df = pd.DataFrame(columns=resp['label'])
|
||
for d, v in resp['list'].items():
|
||
export_df.loc[d] = v['总体']['n']
|
||
|
||
df_to_stream = DfToStream((export_df, '分布分析'), (df, '分布分析原始数据'), index=True)
|
||
with df_to_stream as d:
|
||
export = d.to_stream()
|
||
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
|
||
|
||
|
||
@router.post("/scatter_model")
|
||
async def scatter_model(
|
||
request: Request,
|
||
game: str,
|
||
ckdb: CKDrive = Depends(get_ck_db),
|
||
db: AsyncIOMotorDatabase = Depends(get_database),
|
||
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
) -> schemas.Msg:
|
||
"""分布分析 模型"""
|
||
await analysis.init(data_where=current_user.data_where)
|
||
event_type = analysis.events[0]['eventName']
|
||
try:
|
||
res = await analysis.scatter_model_sql()
|
||
except Exception as e:
|
||
return schemas.Msg(code=-9, msg='报表配置参数异常')
|
||
end_date=analysis.end_date
|
||
start_date=analysis.start_date
|
||
where=analysis.events[-1]['quotaname']
|
||
sql = res['sql']
|
||
#columnName = analysis.events[-1]['label_id']
|
||
|
||
#查询买量渠道owner为kuaiyou3的日注册玩家等级分布
|
||
# sql_list=sql.split("GROUP BY")
|
||
# sql01 = """and xiangsu.event.owner_name='kuaiyou3'GROUP BY"""""
|
||
# new_sql=sql_list[0]+sql01+sql_list[1]
|
||
# if columnName != '':
|
||
# sql = sql.replace('SELECT', f'SELECT {columnName},', 1)
|
||
# sql += f',{columnName}'
|
||
df = await ckdb.query_dataframe(sql)
|
||
if df.empty:
|
||
return schemas.Msg(code=-9, msg='无数据', data=None)
|
||
if 'list' in str(type(df['values'][0])):
|
||
# f=lambda x:x[0]
|
||
# df['values']=df['values'].map(f)
|
||
df = df.explode("values").reset_index(drop=True)
|
||
#df['values']=df['values'].astype(str)
|
||
df.fillna(0, inplace=True)
|
||
#转换数据类型为int
|
||
df['values'] = df['values'].astype(int)
|
||
interval_type = res['interval_type']
|
||
analysi = res['analysis']
|
||
groupby = res['groupby']
|
||
quota_interval_arr = res['quota_interval_arr']
|
||
# 兼容合计的
|
||
if res['time_particle'] == 'total':
|
||
df['date'] = '合计'
|
||
|
||
if analysi != 'number_of_days' and interval_type != 'discrete':
|
||
max_v = int(df['values'].max())
|
||
min_v = int(df['values'].min())
|
||
interval = (max_v - min_v) // 10 or 1
|
||
resp = {'list': dict(),
|
||
'start_date': res['start_date'],
|
||
'end_date': res['end_date'],
|
||
'time_particle': res['time_particle']
|
||
}
|
||
|
||
if not quota_interval_arr:
|
||
resp['label'] = [f'[{i},{i + interval})' for i in range(min_v, max_v, interval)]
|
||
bins = [i for i in range(min_v, max_v + interval, interval)]
|
||
else:
|
||
quota_interval_arr = [-float('inf')] + quota_interval_arr + [float('inf')]
|
||
resp['label'] = []
|
||
bins = [quota_interval_arr[0]]
|
||
for i, v in enumerate(quota_interval_arr[1:]):
|
||
resp['label'].append(f'[{quota_interval_arr[i]},{v})')
|
||
bins.append(v)
|
||
|
||
# 这是整体的
|
||
for key, tmp_df in df.groupby('date'):
|
||
bins_s = pd.cut(tmp_df['values'], bins=bins,
|
||
right=False).value_counts()
|
||
bins_s.sort_index(inplace=True)
|
||
total = int(bins_s.sum())
|
||
if res['time_particle'] == 'total':
|
||
resp['list']['合计'] = dict()
|
||
p = list(round(bins_s * 100 / total, 2).to_list())
|
||
resp['list']['合计']['总体'] = {'n': bins_s.to_list(), 'total': total,
|
||
'p': [str(i)+'%' for i in p],
|
||
'title': '总体'}
|
||
else:
|
||
p=list(round(bins_s * 100 / total, 2).to_list())
|
||
resp['list'][key.strftime('%Y-%m-%d')] = dict()
|
||
resp['list'][key.strftime('%Y-%m-%d')]['总体'] = {'n': bins_s.to_list(), 'total': total,
|
||
'p':[str(i)+'%' for i in p],
|
||
'title': '总体'}
|
||
# 分组的
|
||
# if groupby:
|
||
# for key, tmp_df in df.groupby(['date', *groupby]):
|
||
# bins_s = pd.cut(tmp_df['values'], bins=bins,
|
||
# right=False).value_counts()
|
||
# bins_s.sort_index(inplace=True)
|
||
# total = int(bins_s.sum())
|
||
# title = '.'.join(key[1:])
|
||
# date = key[0]
|
||
# resp['list'][date.strftime('%Y-%m-%d')][title] = {'n': bins_s.to_list(), 'total': total,
|
||
# 'p': round((bins_s * 100 / total).fillna(0),
|
||
# 2).to_list(),
|
||
# 'title': title
|
||
# }
|
||
download=analysis.event_view.get('download','')
|
||
if download == 1:
|
||
creat_df = create_df(resp)
|
||
Download=Download_xlsx(creat_df, '分布分析')
|
||
return Download
|
||
return schemas.Msg(code=0, msg='ok', data=resp)
|
||
|
||
# elif analysis == 'number_of_days':
|
||
else:
|
||
resp = {'list': {}, 'label': [],
|
||
'start_date': res['start_date'],
|
||
'end_date': res['end_date'],
|
||
'time_particle': res['time_particle']
|
||
}
|
||
labels = [str(i) for i in sorted(df['values'].unique())]
|
||
resp['label'] = labels
|
||
for key, tmp_df in df.groupby(['date']):
|
||
total = len(tmp_df)
|
||
if res['time_particle'] == 'total':
|
||
dt = '合计'
|
||
else:
|
||
dt = key.strftime('%Y-%m-%d')
|
||
|
||
labels_dict = {}
|
||
for key2, tmp_df2 in tmp_df.groupby('values'):
|
||
label = str(key2)
|
||
n = len(tmp_df2)
|
||
labels_dict[label] = n
|
||
if event_type == 'pay':
|
||
#如需要2之后所有之和,则执行下面代码,返回值为字典的labels_dict01
|
||
labels_dict01={}
|
||
v=-1
|
||
for i in labels:
|
||
v +=1
|
||
if int(i) == 1:
|
||
labels_dict01["1"]=labels_dict["1"]
|
||
else:
|
||
# for number in labels_dict.keys():
|
||
# if number >=i:
|
||
values=list(labels_dict.values())
|
||
n=sum(values[v:])
|
||
labels_dict01[i]=n
|
||
#传入百分比数据
|
||
list_p=[]
|
||
for i in labels:
|
||
number_int=round(labels_dict01.get(i, 0) * 100 / total, 2)
|
||
number_str=str(number_int)+'%'
|
||
list_p.append(number_str)
|
||
|
||
resp['list'][dt] = {'总体': {'n': [labels_dict01.get(i, 0) for i in labels], 'total': total,
|
||
'p': list_p}}
|
||
else:
|
||
list_p=[]
|
||
for i in labels:
|
||
number_int=round(labels_dict.get(i, 0) * 100 / total, 2)
|
||
number_str=str(number_int)+'%'
|
||
list_p.append(number_str)
|
||
resp['list'][dt] = {'总体': {'n': [labels_dict.get(i, 0) for i in labels], 'total': total,
|
||
'p': list_p}}
|
||
#resp['list'][dt] = {'总体': {'n': [labels_dict.get(i, 0) for i in labels], 'total': total,
|
||
# 'p': [round(labels_dict.get(i, 0) * 100 / total, 2) for i in labels]}}
|
||
if where =="step_id" and event_type == "guide":
|
||
sql=f"""SELECT toDate(addHours({game}.event."#event_time", 8)) AS date, count(DISTINCT {game}.event."#account_id") AS values FROM {game}.event WHERE addHours({game}.event."#event_time", 8) >= '{start_date}' AND addHours({game}.event."#event_time", 8) <= '{end_date}' AND {game}.event."#event_name" = 'create_account' GROUP BY toDate(addHours({game}.event."#event_time", 8)) ORDER BY date"""
|
||
df = await ckdb.query_dataframe(sql)
|
||
for i in range(len(df)):
|
||
resp['list'][str(df['date'][i])]['总体']['total']=int(df['values'][i])
|
||
#兼容下载功能
|
||
download=analysis.event_view.get('download','')
|
||
if download == 1:
|
||
creat_df=create_df(resp)
|
||
Download=Download_xlsx(creat_df,'分布分析')
|
||
return Download
|
||
return schemas.Msg(code=0, msg='ok', data=resp)
|
||
|
||
# bins_s = pd.cut(tmp_df['values'], bins=bins,
|
||
# right=False).value_counts()
|
||
# bins_s.sort_index(inplace=True)
|
||
# total = int(bins_s.sum())
|
||
# resp['list'][key.strftime('%Y-%m-%d')] = dict()
|
||
# resp['list'][key.strftime('%Y-%m-%d')]['总体'] = {'n': bins_s.to_list(), 'total': total,
|
||
# 'p': round(bins_s * 100 / total, 2).to_list(),
|
||
# 'title': '总体'}
|
||
@router.post("/scatter_model_details")
|
||
async def scatter_model(
|
||
request: Request,
|
||
game: str,
|
||
ckdb: CKDrive = Depends(get_ck_db),
|
||
db: AsyncIOMotorDatabase = Depends(get_database),
|
||
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
) -> schemas.Msg:
|
||
await analysis.init(data_where=current_user.data_where)
|
||
try:
|
||
res = await analysis.scatter_model_sql()
|
||
except Exception as e:
|
||
return schemas.Msg(code=-9, msg='报表配置参数异常')
|
||
event_type = analysis.events[0]['eventName']
|
||
where = analysis.events[-1]['quotaname']
|
||
sql=res['sql']
|
||
columnName = analysis.event_view['groupBy'][-1]['columnName']
|
||
if analysis.event_view['groupBy'] != []:
|
||
|
||
if columnName != '':
|
||
sql = sql.replace(f'toDate(addHours({game}.event."#event_time", 8)) AS date', f'{columnName} as va',
|
||
1)
|
||
sql = sql.replace(f'toDate(addHours({game}.event."#event_time", 8))', columnName, 1)
|
||
df = await ckdb.query_dataframe(sql)
|
||
if df.empty:
|
||
return schemas.Msg(code=-9, msg='无数据', data=None)
|
||
if 'list' in str(type(df['values'][0])):
|
||
# f = lambda x: x[0]
|
||
# df['values'] = df['values'].map(f)
|
||
df = df.explode("values").reset_index(drop=True)
|
||
df.fillna(0, inplace=True)
|
||
# 转换数据类型为int
|
||
df['values'] = df['values'].astype(int)
|
||
interval_type = res['interval_type']
|
||
analysi = res['analysis']
|
||
groupby = res['groupby']
|
||
quota_interval_arr = res['quota_interval_arr']
|
||
# 兼容合计的
|
||
if res['time_particle'] == 'total':
|
||
df['date'] = '合计'
|
||
|
||
if analysi != 'number_of_days' and interval_type != 'discrete':
|
||
#默认区间
|
||
max_v = int(df['values'].max())
|
||
min_v = int(df['values'].min())
|
||
interval = (max_v - min_v) // 10 or 1
|
||
resp = {'list': dict(),
|
||
'start_date': res['start_date'],
|
||
'end_date': res['end_date'],
|
||
'time_particle': res['time_particle'],
|
||
'biaotou':columnName
|
||
}
|
||
if 'float' in str(df.dtypes['va']):
|
||
df['va'] = df['va'].astype(int)
|
||
if 'list' in str(type(df['va'][0])):
|
||
f = lambda x: x[0]
|
||
df['va'] = df['va'].map(f)
|
||
if not quota_interval_arr:
|
||
resp['label'] = [f'[{i},{i + interval})' for i in range(min_v, max_v, interval)]
|
||
bins = [i for i in range(min_v, max_v + interval, interval)]
|
||
else:
|
||
quota_interval_arr = [-float('inf')] + quota_interval_arr + [float('inf')]
|
||
resp['label'] = []
|
||
bins = [quota_interval_arr[0]]
|
||
for i, v in enumerate(quota_interval_arr[1:]):
|
||
resp['label'].append(f'[{quota_interval_arr[i]},{v})')
|
||
bins.append(v)
|
||
# if 'float' in str(df.dtypes['va']):
|
||
# df['va'] = df['va'].astype(int)
|
||
# if 'list' in str(type(df['va'][0])):
|
||
# f = lambda x: x[0]
|
||
# df['va'] = df['va'].map(f)
|
||
# 这是分组的
|
||
for key, tmp_df in df.groupby('va'):
|
||
|
||
bins_s = pd.cut(tmp_df['values'], bins=bins,
|
||
right=True,include_lowest=True).value_counts()
|
||
bins_s.sort_index(inplace=True)
|
||
total = int(bins_s.sum())
|
||
if res['time_particle'] == 'total':
|
||
resp['list']['合计'] = dict()
|
||
|
||
resp['list']['合计'] = {'n': bins_s.to_list(), 'total': total,
|
||
'p': round(bins_s * 100 / total, 2).to_list(),
|
||
'title': '总体'}
|
||
else:
|
||
p=round(bins_s * 100 / total, 2).to_list()
|
||
for i in range(len(p)):
|
||
if str(p[i]) == 'nan':
|
||
p[i] = 0
|
||
#映射对应的埋点数据
|
||
re = await crud.select_map.get_list(db, game)
|
||
re_list=[i['attr_name'] for i in re]
|
||
if columnName in re_list:
|
||
for i in re:
|
||
if columnName == i['attr_name']:
|
||
for datas in i['map_']:
|
||
if key == datas['id']:
|
||
key=datas['title']
|
||
break
|
||
break
|
||
if 'time' not in columnName:
|
||
resp['list'][key] = dict()
|
||
resp['list'][key] = {'n': bins_s.to_list(), 'total': total,
|
||
'p': [str(i)+'%' for i in p],
|
||
'title': '总体'}
|
||
else:
|
||
resp['list'][key.strftime('%Y-%m-%d %H:%M:%S')] = dict()
|
||
resp['list'][key.strftime('%Y-%m-%d %H:%M:%S')] = {'n': bins_s.to_list(), 'total': total,
|
||
'p': [str(i)+'%' for i in p],
|
||
'title': '总体'}
|
||
# 兼容下载功能
|
||
download = analysis.event_view.get('download', '')
|
||
if download == 1:
|
||
create_df = create_neidf(resp,columnName)
|
||
Download=Download_xlsx(create_df, '分布分析')
|
||
return Download
|
||
return schemas.Msg(code=0, msg='ok', data=resp)
|
||
else:
|
||
#离散数字
|
||
resp = {'list': {}, 'label': [],
|
||
'start_date': res['start_date'],
|
||
'end_date': res['end_date'],
|
||
'time_particle': res['time_particle'],
|
||
'biaotou':columnName
|
||
}
|
||
labels = [str(i) for i in sorted(df['values'].unique())]
|
||
resp['label'] = labels
|
||
if 'list' in str(type(df['va'][0])):
|
||
f = lambda x: x[0]
|
||
df['va'] = df['va'].map(f)
|
||
for key, tmp_df in df.groupby(['va']):
|
||
total = len(tmp_df)
|
||
if res['time_particle'] == 'total':
|
||
dt = '合计'
|
||
else:
|
||
#映射对应的埋点数据
|
||
re = await crud.select_map.get_list(db, game)
|
||
re_list=[i['attr_name'] for i in re]
|
||
if columnName in re_list:
|
||
for i in re:
|
||
if columnName == i['attr_name']:
|
||
for datas in i['map_']:
|
||
if key == datas['id']:
|
||
key=datas['title']
|
||
break
|
||
break
|
||
dt = key
|
||
#dt = key.strftime('%Y-%m-%d')
|
||
#dt='合计'
|
||
|
||
labels_dict = {}
|
||
for key2, tmp_df2 in tmp_df.groupby('values'):
|
||
label = str(key2)
|
||
n = len(tmp_df2)
|
||
labels_dict[label] = n
|
||
if event_type == 'pay':
|
||
# 如需要2之后所有之和,则执行下面代码,返回值为字典的labels_dict01
|
||
labels_dict01 = {}
|
||
v = -1
|
||
for i in labels:
|
||
v += 1
|
||
if int(i) == 1:
|
||
labels_dict01["1"] = labels_dict["1"]
|
||
else:
|
||
# for number in labels_dict.keys():
|
||
# if number >=i:
|
||
values = list(labels_dict.values())
|
||
n = sum(values[v:])
|
||
labels_dict01[i] = n
|
||
# 传入百分比数据
|
||
list_p = []
|
||
for i in labels:
|
||
number_int = round(labels_dict01.get(i, 0) * 100 / total, 2)
|
||
number_str = str(number_int) + '%'
|
||
list_p.append(number_str)
|
||
|
||
resp['list'][dt] = {'n': [labels_dict01.get(i, 0) for i in labels], 'total': total,
|
||
'p': list_p}
|
||
else:
|
||
list_p = []
|
||
for i in labels:
|
||
number_int = round(labels_dict.get(i, 0) * 100 / total, 2)
|
||
number_str = str(number_int) + '%'
|
||
list_p.append(number_str)
|
||
resp['list'][dt] = {'n': [labels_dict.get(i, 0) for i in labels], 'total': total,
|
||
'p': list_p}
|
||
# resp['list'][dt] = {'总体': {'n': [labels_dict.get(i, 0) for i in labels], 'total': total,
|
||
# 'p': [round(labels_dict.get(i, 0) * 100 / total, 2) for i in labels]}}
|
||
if where == "step_id" and event_type == "guide":
|
||
sql = f"""SELECT toDate(addHours({game}.event."#event_time", 8)) AS date, count(DISTINCT {game}.event."#account_id") AS values FROM {game}.event WHERE addHours({game}.event."#event_time", 8) >= '{start_date}' AND addHours({game}.event."#event_time", 8) <= '{end_date}' AND {game}.event."#event_name" = 'create_account' GROUP BY toDate(addHours({game}.event."#event_time", 8)) ORDER BY date"""
|
||
df = await ckdb.query_dataframe(sql)
|
||
for i in range(len(df)):
|
||
resp['list'][str(df['date'][i])]['total'] = int(df['values'][i])
|
||
# 兼容下载功能
|
||
download = analysis.event_view.get('download', '')
|
||
if download == 1:
|
||
create_df = create_neidf(resp,columnName)
|
||
Download=Download_xlsx(create_df, '分布分析')
|
||
return Download
|
||
return schemas.Msg(code=0, msg='ok', data=resp)
|
||
else:
|
||
return schemas.Msg(code=-9, msg='没有添加分组项', data='')
|
||
|
||
@router.post("/trace_model_sql")
|
||
async def trace_model_sql(
|
||
request: Request,
|
||
game: str,
|
||
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
) -> schemas.Msg:
|
||
"""路径分析 sql"""
|
||
await analysis.init(data_where=current_user.data_where)
|
||
data = await analysis.trace_model_sql()
|
||
return schemas.Msg(code=0, msg='ok', data=[data])
|
||
|
||
|
||
@router.post("/trace_model")
|
||
async def trace_model_sql(
|
||
request: Request,
|
||
game: str,
|
||
ckdb: CKDrive = Depends(get_ck_db),
|
||
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
) -> schemas.Msg:
|
||
"""路径分析"""
|
||
await analysis.init(data_where=current_user.data_where)
|
||
res = await analysis.trace_model_sql()
|
||
sql = res['sql']
|
||
df = await ckdb.query_dataframe(sql)
|
||
if df.empty:
|
||
return schemas.Msg(code=-9, msg='无数据', data=None)
|
||
chain_dict = defaultdict(dict)
|
||
event_num_dict={}
|
||
event_next_event={}
|
||
nodes = {'流失'}
|
||
name_list=analysis.events['event_namesdes']
|
||
name_dict={}
|
||
for i in name_list:
|
||
name_dict[i['event_name']]=i['event_desc']
|
||
for event_names, count in zip(df['event_chain'], df['values']):
|
||
fmt_keys=[]
|
||
chain_len = len(event_names)
|
||
for i, event_name in enumerate(event_names):
|
||
if i >= 10:
|
||
continue
|
||
next_event = event_names[i + 1] if i < chain_len - 1 else '流失'
|
||
#按对应的中文名显示
|
||
event_namess=name_dict.get(event_name,event_name)
|
||
next_eventss=name_dict.get(next_event,next_event)
|
||
key = (f'{event_namess}-{i}', f'{next_eventss}-{i + 1}')
|
||
#key = (f'{event_name}', f'{next_event}')
|
||
nodes.update(key)
|
||
chain_dict[i][key] = chain_dict[i].setdefault(key, 0) + count
|
||
keys = list(key)
|
||
for true_key in keys:
|
||
if true_key in fmt_keys:
|
||
continue
|
||
if true_key in event_num_dict:
|
||
event_num_dict[true_key] += count
|
||
else:
|
||
event_num_dict[true_key] = count
|
||
fmt_keys.append(true_key)
|
||
|
||
# 检测事件的后续事件有哪些
|
||
if keys[0] in event_next_event:
|
||
event_next_event[keys[0]].append(keys[1])
|
||
event_next_event[keys[0]] = list(set(event_next_event[keys[0]]))
|
||
else:
|
||
event_next_event[keys[0]] = [keys[1]]
|
||
|
||
links = []
|
||
for _, items in chain_dict.items():
|
||
for keys, val in items.items():
|
||
links.append({
|
||
"source": keys[0],
|
||
"target": keys[1],
|
||
"value": val
|
||
})
|
||
node=[ item for item in nodes]
|
||
node.sort()
|
||
#按固定的首尾排序
|
||
first = []
|
||
trail = []
|
||
nodes = []
|
||
for i in node:
|
||
if analysis.events['source_event']['eventDesc'] in i:
|
||
first.append(i)
|
||
elif '流失' in i:
|
||
trail.append(i)
|
||
else:
|
||
nodes.append(i)
|
||
first.sort(reverse=True)
|
||
for i in first:
|
||
nodes.insert(0, i)
|
||
for i in trail:
|
||
nodes.append(i)
|
||
# 处理event_next_event
|
||
event_new_next = {}
|
||
for key, key_list in event_next_event.items():
|
||
new_key_list = []
|
||
for key1 in key_list:
|
||
new_key_list.append({'event_name': key1, 'value': event_num_dict[key1]})
|
||
event_new_next[key] = new_key_list
|
||
|
||
data = {
|
||
#'nodes': [{'name': item} for item in nodes],
|
||
'nodes': [{'name': item} for item in nodes],
|
||
'links': links,
|
||
'event_num':event_num_dict,
|
||
'event_next':event_new_next,
|
||
'start_date': res['start_date'],
|
||
'end_date': res['end_date'],
|
||
'time_particle': res['time_particle']
|
||
}
|
||
return schemas.Msg(code=0, msg='ok', data=data)
|
||
|
||
|
||
@router.post("/trace_user_info_model")
|
||
async def trace_model_sql(
|
||
request: Request,
|
||
game: str,
|
||
ckdb: CKDrive = Depends(get_ck_db),
|
||
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
) -> schemas.Msg:
|
||
"""路径分析用户详情"""
|
||
await analysis.init(data_where=current_user.data_where)
|
||
res = await analysis.trace_model_sql()
|
||
sql = res['sql']
|
||
name_list = analysis.events['event_namesdes']
|
||
name_dict = {}
|
||
for i in name_list:
|
||
name_dict[i['event_name']] = i['event_desc']
|
||
# 获取事件对应的uid
|
||
event_uid_list = await ckdb.query_data_trace(sql, name_dict)
|
||
if not event_uid_list:
|
||
return schemas.Msg(code=-9, msg='无数据', data=None)
|
||
event_name = analysis.events['event_name']
|
||
page = analysis.events['page']
|
||
event_next = analysis.events['event_next']
|
||
starindex = (page - 1) * 10
|
||
all_uid_list = []
|
||
|
||
# 后续事件统计的用户详情
|
||
if event_name.startswith('follow'):
|
||
for event_next_list in event_next:
|
||
true_event_name = event_next_list['event_name']
|
||
if true_event_name.startswith('流失'):
|
||
continue
|
||
all_uid_list += event_uid_list[true_event_name]
|
||
all_uid_list = list(set(all_uid_list))
|
||
|
||
# 更多事件的用户详情
|
||
elif event_name.startswith('more'):
|
||
# 后续事件排序后取第8个开始
|
||
event_next_true_eventlist = sorted(event_next, key=lambda x: x['value'], reverse=True)[8:]
|
||
for event_next_list in event_next_true_eventlist:
|
||
true_event_name = event_next_list['event_name']
|
||
all_uid_list += event_uid_list[true_event_name]
|
||
all_uid_list = list(set(all_uid_list))
|
||
|
||
# 单个节点的用户详情
|
||
else:
|
||
all_uid_list = event_uid_list[event_name]
|
||
|
||
user_num = len(all_uid_list)
|
||
account_id = all_uid_list[starindex:10 * page]
|
||
new_sql = f"""select `#account_id`,`#ip`,`#distinct_id`,rmbmoney,owner_name,lv,zhanli,channel,
|
||
channel,svrindex,maxmapid,name,`exp`,vip,jinbi,last_account_login_time,binduid from {game}.`user` where `#account_id` in ({account_id})"""
|
||
df1 = await ckdb.query_dataframe(new_sql)
|
||
new_values = df1.values.tolist()
|
||
for i in range(len(new_values)):
|
||
if str(new_values[i][6]) == 'nan':
|
||
new_values[i][6] = 0
|
||
res = {
|
||
'user_num': user_num,
|
||
'details_data': {
|
||
'new_columns': df1.columns.tolist(),
|
||
'new_values': new_values
|
||
}}
|
||
|
||
return schemas.Msg(code=0, msg='ok', data=res)
|
||
|
||
|
||
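

# Illustrative sketch (hypothetical helper, not part of the API): the 1-based
# page slicing used above, 10 uids per page.
def _sketch_page_slice(uid_list: list, page: int, page_size: int = 10) -> list:
    """Return the uids that belong on the given 1-based page."""
    start = (page - 1) * page_size
    return uid_list[start:start + page_size]

# Example: _sketch_page_slice(list(range(25)), 3) -> [20, 21, 22, 23, 24]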
@router.post("/user_property_model_sql")
|
||
async def user_property_sql(
|
||
request: Request,
|
||
game: str,
|
||
analysis: UserAnalysis = Depends(UserAnalysis),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
) -> schemas.Msg:
|
||
"""用户属性sql"""
|
||
await analysis.init(data_where=current_user.data_where)
|
||
data = analysis.property_model()
|
||
return schemas.Msg(code=0, msg='ok', data=[data])
|
||
|
||
|
||
@router.post("/user_property_model_export")
|
||
async def user_property_model_export(
|
||
request: Request,
|
||
game: str,
|
||
ckdb: CKDrive = Depends(get_ck_db),
|
||
analysis: UserAnalysis = Depends(UserAnalysis),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
):
|
||
"""用户属性 导出"""
|
||
await analysis.init(data_where=current_user.data_where)
|
||
data = analysis.property_model()
|
||
file_name = quote(f'用户属性.xlsx')
|
||
mime = mimetypes.guess_type(file_name)[0]
|
||
|
||
sql = data['sql']
|
||
df = await ckdb.query_dataframe(sql)
|
||
if df.empty:
|
||
return schemas.Msg(code=-9, msg='无数据', data=None)
|
||
df_to_stream = DfToStream((df, '用户属性'))
|
||
with df_to_stream as d:
|
||
export = d.to_stream()
|
||
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
|
||
|
||
|
||
@router.post("/user_property_model")
|
||
async def user_property_model(
|
||
request: Request,
|
||
game: str,
|
||
analysis: UserAnalysis = Depends(UserAnalysis),
|
||
ckdb: CKDrive = Depends(get_ck_db),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
) -> schemas.Msg:
|
||
"""用户属性分析"""
|
||
await analysis.init(data_where=current_user.data_where)
|
||
res = analysis.property_model()
|
||
sql = res['sql']
|
||
quota = res['quota']
|
||
groupby = res['groupby']
|
||
df = await ckdb.query_dataframe(sql)
|
||
if df.empty:
|
||
return schemas.Msg(code=-9, msg='无数据', data=None)
|
||
|
||
# 没有分组
|
||
data = {}
|
||
|
||
if not groupby:
|
||
data['总体'] = int(df['values'][0])
|
||
title = ['总体', quota]
|
||
|
||
else:
|
||
sum_s = df.groupby(groupby)['values'].sum()
|
||
for key, val in sum_s.items():
|
||
if isinstance(key, tuple):
|
||
key = ','.join([str(i) for i in key])
|
||
else:
|
||
key = str(key)
|
||
data[key] = val
|
||
title = ['.'.join(groupby), quota]
|
||
|
||
return schemas.Msg(code=0, msg='ok', data={
|
||
'value': data,
|
||
'title': title
|
||
})
|
||
|
||
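

# Illustrative sketch (hypothetical data, not used by the route above): how the
# groupby/sum step turns the query result into the {group_key: total} mapping.
def _sketch_group_totals() -> dict:
    df = pd.DataFrame({
        'channel': ['ios', 'ios', 'android'],
        'svrindex': [1, 2, 1],
        'values': [10, 5, 7],
    })
    sum_s = df.groupby(['channel', 'svrindex'])['values'].sum()
    data = {}
    for key, val in sum_s.items():
        # Multi-column group keys arrive as tuples and are joined with ','
        key = ','.join(str(i) for i in key) if isinstance(key, tuple) else str(key)
        data[key] = int(val)
    return data

# _sketch_group_totals() -> {'android,1': 7, 'ios,1': 10, 'ios,2': 5}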
@router.post("/seek_user")
|
||
async def user_property_model(
|
||
request: Request,
|
||
game: str,
|
||
data_in: schemas.Ck_seek_user,
|
||
ckdb: CKDrive = Depends(get_ck_db)
|
||
) -> schemas.Msg:
|
||
"""游戏用户搜索功能"""
|
||
#判断的内容
|
||
data=data_in.condition
|
||
#需要判断的字段
|
||
ziduan=data_in.user_arrt_title
|
||
#筛选条件
|
||
tiaojian=data_in.comparator_id
|
||
if tiaojian == '==':
|
||
tiaojian = '='
|
||
#判断是否是时间类型
|
||
if data_in.user_arrt_type == 'datetime':
|
||
sql = f"""select `#account_id`,`{ziduan}` from {game}.`user` WHERE addHours(`{ziduan}`, 8) >= '{data_in.start_time}'
|
||
and addHours(`{ziduan}`, 8) <= '{data_in.end_time}' ORDER BY `#reg_time` LIMIT 10 OFFSET {(data_in.pages-1)*10}"""
|
||
#如果查询'#account_id',则不多余返回一个account_id
|
||
elif ziduan == '#account_id':
|
||
sql=f"""select `{ziduan}`,name from {game}.`user` WHERE `{ziduan}` {tiaojian} '{data_in.condition}' ORDER BY `#reg_time` LIMIT 10 OFFSET {(data_in.pages-1)*10} """
|
||
elif data_in.user_arrt_type == 'int':
|
||
sql=f"""select `#account_id`,`{ziduan}` from {game}.`user` WHERE `{ziduan}` {tiaojian} {data_in.condition} ORDER BY `#reg_time` LIMIT 10 OFFSET {(data_in.pages-1)*10}"""
|
||
else:
|
||
sql=f"""select `#account_id`,`{ziduan}` from `{game}`.`user` WHERE `{ziduan}` {tiaojian} '{data}' ORDER BY `#reg_time` LIMIT 10 OFFSET {(data_in.pages-1)*10}"""
|
||
# 查询数据
|
||
try:
|
||
df = await ckdb.query_dataframe(sql)
|
||
except Exception as e:
|
||
return schemas.Msg(code=0, msg='查询参数不匹配', data=e)
|
||
# 转换成列表返回
|
||
df.fillna(0, inplace=True)
|
||
account_id=list(df['#account_id'])
|
||
new_sql=f"""select `#account_id`,`#ip`,`#distinct_id`,rmbmoney,owner_name,lv,zhanli,channel,
|
||
channel,svrindex,maxmapid,name,`exp`,vip,jinbi,last_account_login_time,binduid from {game}.`user` where `#account_id` in ({account_id})"""
|
||
df1= await ckdb.query_dataframe(new_sql)
|
||
new_values=df1.values.tolist()
|
||
for i in range(len(new_values)):
|
||
if str(new_values[i][6]) == 'nan':
|
||
new_values[i][6]=0
|
||
res = {'refer':{
|
||
'columns': df.columns.tolist(),
|
||
'values': df.values.tolist()
|
||
},
|
||
'details_data':{
|
||
'new_columns':df1.columns.tolist(),
|
||
'new_values':new_values
|
||
}}
|
||
return schemas.Msg(code=0, msg='ok', data=res)
|
||
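

# Illustrative sketch (hypothetical names and values): the SQL the search above
# builds for an integer attribute, including the 1-based page offset.
def _sketch_seek_user_sql(game: str, field: str, comparator: str, value: int, page: int) -> str:
    comparator = '=' if comparator == '==' else comparator
    return (f"select `#account_id`,`{field}` from {game}.`user` "
            f"WHERE `{field}` {comparator} {value} "
            f"ORDER BY `#reg_time` LIMIT 10 OFFSET {(page - 1) * 10}")

# _sketch_seek_user_sql('demo_game', 'lv', '>=', 30, 2) ->
# "select `#account_id`,`lv` from demo_game.`user` WHERE `lv` >= 30 ORDER BY `#reg_time` LIMIT 10 OFFSET 10"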
@router.post("/seek_user_count")
|
||
async def user_property_model(
|
||
request: Request,
|
||
game: str,
|
||
data_in: schemas.Ck_seek_user,
|
||
ckdb: CKDrive = Depends(get_ck_db)
|
||
) -> schemas.Msg:
|
||
"""游戏用户搜索功能查询到的数量"""
|
||
#判断的内容
|
||
data=data_in.condition
|
||
#需要判断的字段
|
||
ziduan=data_in.user_arrt_title
|
||
#筛选条件
|
||
tiaojian=data_in.comparator_id
|
||
if tiaojian == '==':
|
||
tiaojian = '='
|
||
#判断是否是时间类型
|
||
if data_in.user_arrt_type == 'datetime':
|
||
sql = f"""select `#account_id`,`{ziduan}` from {game}.`user` WHERE addHours(`{ziduan}`, 8) >= '{data_in.start_time}'
|
||
and addHours(`{ziduan}`, 8) <= '{data_in.end_time}' ORDER BY `#reg_time`"""
|
||
#如果查询'#account_id',则不多余返回一个account_id
|
||
elif ziduan == '#account_id':
|
||
sql=f"""select `{ziduan}`,name from {game}.`user` WHERE `{ziduan}` {tiaojian} '{data_in.condition}' ORDER BY `#reg_time` """
|
||
elif data_in.user_arrt_type == 'int':
|
||
sql = f"""select `#account_id`,`{ziduan}` from {game}.`user` WHERE `{ziduan}` {tiaojian} {data_in.condition} ORDER BY `#reg_time`"""
|
||
else:
|
||
sql = f"""select `#account_id`,`{ziduan}` from `{game}`.`user` WHERE `{ziduan}` {tiaojian} '{data}' ORDER BY `#reg_time`"""
|
||
#查询数据
|
||
try:
|
||
df = await ckdb.query_dataframe(sql)
|
||
except Exception as e:
|
||
return schemas.Msg(code=0, msg='查询参数不匹配', data=e)
|
||
#返回查询到的数量
|
||
res=len(df)
|
||
return schemas.Msg(code=0, msg='ok', data=res)
|
||
|
||
@router.post("/download_user")
|
||
async def user_property_model(
|
||
request: Request,
|
||
game: str,
|
||
data_in: schemas.Ck_seek_user,
|
||
ckdb: CKDrive = Depends(get_ck_db)
|
||
):
|
||
"""下载查询到的所有数据"""
|
||
#判断的内容
|
||
data=data_in.condition
|
||
#需要判断的字段
|
||
ziduan=data_in.user_arrt_title
|
||
#筛选条件
|
||
tiaojian=data_in.comparator_id
|
||
if tiaojian == '==':
|
||
tiaojian = '='
|
||
#判断是否是时间类型
|
||
if data_in.user_arrt_type == 'datetime':
|
||
sql = f"""select `#account_id`,`{ziduan}` from {game}.`user` WHERE addHours(`{ziduan}`, 8) >= '{data_in.start_time}'
|
||
and addHours(`{ziduan}`, 8) <= '{data_in.end_time}' ORDER BY `#reg_time`"""
|
||
#如果查询'#account_id',则不多余返回一个account_id
|
||
elif ziduan == '#account_id':
|
||
sql=f"""select `{ziduan}`,name from {game}.`user` WHERE `{ziduan}` {tiaojian} '{data_in.condition}' ORDER BY `#reg_time` """
|
||
elif data_in.user_arrt_type == 'int':
|
||
sql = f"""select `#account_id`,`{ziduan}` from {game}.`user` WHERE `{ziduan}` {tiaojian} {data_in.condition} ORDER BY `#reg_time`"""
|
||
else:
|
||
sql = f"""select `#account_id`,`{ziduan}` from `{game}`.`user` WHERE `{ziduan}` {tiaojian} '{data}' ORDER BY `#reg_time`"""
|
||
#查询数据
|
||
try:
|
||
df = await ckdb.query_dataframe(sql)
|
||
except Exception as e:
|
||
return schemas.Msg(code=0, msg='查询参数不匹配', data=e)
|
||
if df.empty:
|
||
return schemas.Msg(code=-9, msg='无数据',data='')
|
||
account_id = list(df['#account_id'])
|
||
new_sql = f"""select `#account_id`,`#ip`,`#distinct_id`,rmbmoney,owner_name,lv,zhanli,channel,
|
||
channel,svrindex,maxmapid,name,`exp`,vip,jinbi,last_account_login_time,binduid from {game}.`user` where `#account_id` in ({account_id})"""
|
||
df1 = await ckdb.query_dataframe(new_sql)
|
||
file_name=quote(f'下载的用户搜索数据.xlsx')
|
||
mime = mimetypes.guess_type(file_name)[0]
|
||
df_to_stream = DfToStream((df1, '下载的用户搜索数据'))
|
||
with df_to_stream as d:
|
||
export = d.to_stream()
|
||
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
|
||
|
||
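

# Illustrative sketch (not the project's DfToStream helper): one way to turn a
# DataFrame into an in-memory xlsx stream suitable for a StreamingResponse,
# assuming openpyxl is installed.
def _sketch_df_to_xlsx_stream(df: pd.DataFrame, sheet_name: str):
    import io
    buffer = io.BytesIO()
    # Write the sheet into the buffer instead of a file on disk
    df.to_excel(buffer, sheet_name=sheet_name, index=False, engine='openpyxl')
    buffer.seek(0)
    return buffer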
@router.post("/solo_user")
|
||
async def user_property_model(
|
||
request: Request,
|
||
game: str,
|
||
data_in: schemas.Ck_solo_user,
|
||
ckdb: CKDrive = Depends(get_ck_db)
|
||
):
|
||
"""用户的详情"""
|
||
if data_in.event_list == []:
|
||
return schemas.Msg(code=-9, msg='请配置用户搜索模块事件', data=[])
|
||
event_dict={}
|
||
for i in data_in.event_list:
|
||
event_dict[i['event']]=i['event_name']
|
||
# event_dict={'pay':'充值','create_account':'创建角色','login':'登录','ta_app_end':'离开游戏','guide':'新手引导','level_up':'玩家等级',
|
||
# 'vip_level':'vip等级','sign':'签到','summon':'招募','ask_for_join_guild':'加入联盟','leave_guild':'离开联盟','create_guild':'创建联盟'}
|
||
sql=f"""select `#account_id`,`#ip`,`#distinct_id`,rmbmoney,owner_name,lv,zhanli,channel,
|
||
channel,svrindex,maxmapid,name,`exp`,vip,jinbi,last_account_login_time,binduid from {game}.`user` where `#account_id` = '{data_in.account_id}'"""
|
||
#获取用户基本详情
|
||
df= await ckdb.query_dataframe(sql)
|
||
#获取用户每天事件量
|
||
start_times=data_in.start_time.split(' ')[0]
|
||
end_times=data_in.end_time.split(' ')[0]
|
||
event = list(event_dict.keys())
|
||
sql1 = f"""select toDate(addHours(`#event_time`, `#zone_offset`)) as date,count(`#event_name`) as v from {game}.event
|
||
where `date`>='{start_times}' and `date`<='{end_times}'
|
||
and `#account_id`='{data_in.account_id}' and `#event_name` in ({event}) group by date ORDER by date"""
|
||
df1=await ckdb.query_dataframe(sql1)
|
||
#时间间隔天数
|
||
global event_values, data_list,game_details,zhanbi
|
||
if len(df1) >0:
|
||
time_interval=getEveryDay(start_times,end_times)
|
||
a = list(df1['date'])
|
||
aa=[]
|
||
for i in a:
|
||
aa.append(str(i))
|
||
for i in time_interval:
|
||
if i not in aa:
|
||
df1.loc[len(df1.index)] = [i, 0]
|
||
df1[['date']]=df1[['date']].astype(str)
|
||
df1.sort_values('date',inplace=True)
|
||
data_list=list(df1['date'])
|
||
event_values=list(df1['v'])
|
||
else:
|
||
data_list= [] #getEveryDay(start_times,end_times)
|
||
event_values=[]
|
||
#获取用户事件的详情
|
||
sql2=f"""select * FROM {game}.event WHERE `#account_id`='{data_in.account_id}' and addHours(`#event_time`, `#zone_offset`) >='{data_in.start_time}' and
|
||
addHours(`#event_time`, `#zone_offset`) <= '{data_in.end_time}' and `#event_name` in ({event}) order by `#event_time`"""
|
||
df2=await ckdb.query_dataframe(sql2)
|
||
if len(df2) > 0:
|
||
game_details={}
|
||
#区分天数
|
||
days=list(df2['#event_time'])
|
||
day_set=set()
|
||
for i in days:
|
||
day_set.add(str(i).split(' ')[0])
|
||
#总日期,一天的
|
||
day_list=list(day_set)
|
||
day_list.sort()
|
||
for day in day_list:
|
||
game_deta = []
|
||
for nu in range(len(df2)):
|
||
if day in str(df2['#event_time'][nu]):
|
||
#详细时间
|
||
game_detail={}
|
||
time_s=str(df2['#event_time'][nu]).split('+')[0]
|
||
game_detail['time']=time_s.split(' ')[1]
|
||
game_detail['event']=event_dict[df2['#event_name'][nu]]
|
||
a_list = []
|
||
#获取df的字段名
|
||
columns=df2.columns.values
|
||
for col in columns:
|
||
a=str(df2[col][nu])
|
||
if a != 'None' and a != '' and a != 'nan' and a != '[]':
|
||
a_list.append({'title':col,'val':a})
|
||
game_detail['xaingqing']=a_list
|
||
game_deta.append(game_detail)
|
||
game_details[day]=game_deta
|
||
else:
|
||
game_details = {}
|
||
#event_count = await ckdb.yesterday_event_count(game)
|
||
#求事件占比
|
||
sql3=f"""select `#event_name` as a,count(`#event_name`) as v from {game}.event
|
||
where addHours(`#event_time`, `#zone_offset`)>='{data_in.start_time}' and addHours(`#event_time`, `#zone_offset`)<='{data_in.end_time}'
|
||
and `#account_id`='{data_in.account_id}' and `#event_name` in ({event}) group by `#event_name`"""
|
||
df3 = await ckdb.query_dataframe(sql3)
|
||
if len(df3) > 0:
|
||
zhanbi=[]
|
||
sums=sum(list(df1['v']))
|
||
numbers=0
|
||
for i in range(len(df3)):
|
||
shuju={}
|
||
shuju['name']=event_dict[df3['a'][i]]
|
||
shuju['value']=int(df3['v'][i])
|
||
# if i != len(df3)-1:
|
||
# number1=round(int(df3['v'][i]) / sums, 2)
|
||
# number=round(number1*100,2)
|
||
# numbers+=number
|
||
# shuju['zhanbi'] = str(number) + '%'
|
||
# else:
|
||
# shuju['zhanbi']=str(100-numbers) + '%'
|
||
zhanbi.append(shuju)
|
||
else:
|
||
zhanbi = []
|
||
res = {
|
||
'details_data':{
|
||
'new_columns':df.columns.tolist(),
|
||
'new_values':df.values.tolist()},
|
||
'event_count':{
|
||
'date':data_list,
|
||
'event_values':event_values
|
||
},
|
||
'details_user':game_details,
|
||
'proportion':zhanbi
|
||
}
|
||
return schemas.Msg(code=0, msg='ok', data=res)
|
||
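

# Illustrative sketch (hypothetical data): padding per-day event counts so that
# days without events appear with 0, mirroring the gap filling done above with
# getEveryDay.
def _sketch_fill_missing_days(counts: dict, start: str, end: str) -> pd.DataFrame:
    """counts maps 'YYYY-MM-DD' -> event count for the days that had events."""
    all_days = pd.date_range(start, end).strftime('%Y-%m-%d')
    out = pd.DataFrame({'date': all_days})
    out['v'] = out['date'].map(counts).fillna(0).astype(int)
    return out

# _sketch_fill_missing_days({'2023-01-01': 5, '2023-01-03': 2}, '2023-01-01', '2023-01-03')
#          date  v
# 0  2023-01-01  5
# 1  2023-01-02  0
# 2  2023-01-03  2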
@router.get("/event_list")
|
||
async def event_list(
|
||
request: Request,
|
||
game: str,
|
||
db: AsyncIOMotorDatabase = Depends(get_database),
|
||
ckdb: CKDrive = Depends(get_ck_db),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
) -> schemas.Msg:
|
||
"""个人详情中的事件列表"""
|
||
#获取事件名
|
||
#event_list = await ckdb.distinct(game, 'event', '#event_name')
|
||
event_list = await crud.event_list.get_list(db,game)
|
||
if event_list == []:
|
||
return schemas.Msg(code=0, msg='请配置用户搜索模块事件', data=[])
|
||
else:
|
||
res=event_list[0]['details']
|
||
return schemas.Msg(code=0, msg='ok', data=res)
|
||
|
||
@router.post("/add_event_list")
|
||
async def add_select_map(
|
||
request: Request,
|
||
game: str,
|
||
file: bytes = File(...),
|
||
db: AsyncIOMotorDatabase = Depends(get_database),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
) -> schemas.Msg:
|
||
"""添加对应游戏事件选择映射"""
|
||
dfs = pd.read_excel(file, engine='openpyxl', sheet_name=None)
|
||
for attr_name, df in dfs.items():
|
||
#将id这列转换成字符串类型
|
||
if len(df) >0:
|
||
ColNames = df.columns.tolist()
|
||
event = df.to_dict('records')
|
||
details=[]
|
||
for i in event:
|
||
details_dict={}
|
||
details_dict['event']=i[ColNames[0]]
|
||
details_dict['event_name']=i[ColNames[1]]
|
||
details.append(details_dict)
|
||
data_in = schemas.Event_list(game=game, details=details)
|
||
await crud.event_list.save(db, data_in)
|
||
return schemas.Msg(code=0, msg='ok', data=1)
|
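

# Illustrative sketch (hypothetical spreadsheet content): the two-column sheet
# that /add_event_list expects and the `details` payload it is turned into.
def _sketch_excel_to_details() -> list:
    df = pd.DataFrame({'event': ['login', 'pay'], 'event_name': ['登录', '充值']})
    cols = df.columns.tolist()
    return [{'event': row[cols[0]], 'event_name': row[cols[1]]}
            for row in df.to_dict('records')]

# _sketch_excel_to_details()
# -> [{'event': 'login', 'event_name': '登录'}, {'event': 'pay', 'event_name': '充值'}]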