2604 lines
110 KiB
Python
2604 lines
110 KiB
Python
import ast
|
||
import datetime
|
||
from collections import defaultdict
|
||
import mimetypes
|
||
from urllib.parse import quote
|
||
import os
|
||
from copy import deepcopy
|
||
import pandas as pd
|
||
import numpy as np
|
||
from fastapi import APIRouter, Depends, Request, File
|
||
from fastapi.encoders import jsonable_encoder
|
||
from motor.motor_asyncio import AsyncIOMotorDatabase
|
||
from fastapi.responses import StreamingResponse
|
||
# from datetime import datetime
|
||
import crud, schemas
|
||
from common import *
|
||
|
||
from api import deps
|
||
from db import get_database
|
||
from db.ckdb import get_ck_db, CKDrive
|
||
from db.redisdb import get_redis_pool, RedisDrive
|
||
|
||
from models.behavior_analysis import BehaviorAnalysis, CombinationEvent
|
||
from models.user_analysis import UserAnalysis
|
||
from models.x_analysis import XAnalysis
|
||
from utils import DfToStream, getEveryDay, Download_xlsx, jiange_insert, create_df, create_neidf
|
||
|
||
router = APIRouter()
|
||
|
||
|
||
@router.post("/sql")
|
||
async def query_sql(
|
||
request: Request,
|
||
game: str,
|
||
data_in: schemas.Sql,
|
||
ckdb: CKDrive = Depends(get_ck_db),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
) -> schemas.Msg:
|
||
"""原 sql 查询 """
|
||
sql = data_in.sql
|
||
sql1 = sql.lower()
|
||
if 'insert' not in sql1 and 'update' not in sql1 and 'delete' not in sql1 and 'select' in sql1:
|
||
sql = sql.replace('$game', game)
|
||
data = await ckdb.execute(sql)
|
||
return schemas.Msg(code=0, msg='ok', data=data)
|
||
else:
|
||
return schemas.Msg(code=0, msg='ok', data='当前只支持SQL查询语句!')
|
||
|
||
|
||
@router.post("/sql_export")
|
||
async def query_sql(
|
||
request: Request,
|
||
game: str,
|
||
data_in: schemas.Sql,
|
||
ckdb: CKDrive = Depends(get_ck_db),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
):
|
||
"""sql 导出 """
|
||
file_name = quote(f'result.xlsx')
|
||
mime = mimetypes.guess_type(file_name)[0]
|
||
|
||
sql = data_in.sql
|
||
sql = sql.replace('$game', game)
|
||
df = await ckdb.query_dataframe(sql)
|
||
if df.empty:
|
||
return schemas.Msg(code=-9, msg='无数据', data=None)
|
||
columns=df.columns.tolist()
|
||
for i in columns:
|
||
if 'time' in i:
|
||
df[i] = df[i].apply(lambda a: pd.to_datetime(a).strftime('%Y-%m-%d %H:%M:%S'))
|
||
df_to_stream = DfToStream((df, 'result'))
|
||
with df_to_stream as d:
|
||
export = d.to_stream()
|
||
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
|
||
|
||
|
||
@router.post("/event_model_sql")
|
||
async def event_model_sql(
|
||
request: Request,
|
||
game: str,
|
||
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
) -> schemas.Msg:
|
||
""" 事件分析模型 sql"""
|
||
|
||
await analysis.init(data_where=current_user.data_where)
|
||
data = await analysis.event_model_sql()
|
||
return schemas.Msg(code=0, msg='ok', data=data)
|
||
|
||
|
||
@router.post("/event_model_export")
|
||
async def event_model_export(request: Request,
|
||
game: str,
|
||
ckdb: CKDrive = Depends(get_ck_db),
|
||
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
):
|
||
""" 事件分析模型 数据导出"""
|
||
await analysis.init(data_where=current_user.data_where)
|
||
sqls = await analysis.event_model_sql()
|
||
file_name = quote(f'{sqls[0]["report_name"]}.xlsx')
|
||
mime = mimetypes.guess_type(file_name)[0]
|
||
excels = []
|
||
for item in sqls:
|
||
if item.get('combination_event'):
|
||
continue
|
||
sql = item['sql']
|
||
event_name = item['event_name']
|
||
df = await ckdb.query_dataframe(sql)
|
||
if df.empty:
|
||
continue
|
||
if 'date' in df:
|
||
df.sort_values('date', inplace=True)
|
||
try:
|
||
df['date'] = df['date'].dt.tz_localize(None)
|
||
except:
|
||
pass
|
||
excels.append((df, event_name))
|
||
df_to_stream = DfToStream(*excels)
|
||
with df_to_stream as d:
|
||
export = d.to_stream()
|
||
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
|
||
|
||
|
||
@router.post("/event_model_pay")
|
||
async def event_model_export(request: Request,
|
||
game: str,
|
||
data_in: schemas.Times,
|
||
ckdb: CKDrive = Depends(get_ck_db)
|
||
):
|
||
"""下载日充总额详细订单数据"""
|
||
sql = f"""select * FROM {game}.event WHERE addHours(`#event_time`, 8) >= '{data_in.start_time}' and addHours(`#event_time`, 8) <= '{data_in.end_time}' and `#event_name` = 'pay' and
|
||
orderid NOT LIKE '%GM%' order by `#event_time`"""
|
||
df = await ckdb.query_dataframe(sql)
|
||
list_columns = list(df.columns.values)
|
||
drop_list = []
|
||
for i in list_columns:
|
||
aa = type(df[i][0])
|
||
if df[i][0] == None or df[i][0] == [] or df[i][0] == '':
|
||
drop_list.append(i)
|
||
else:
|
||
if 'time' in i:
|
||
df[i] = df[i].astype(str)
|
||
for nu in range(len(df)):
|
||
df.replace(to_replace=df[i][nu], value=df[i][nu].split('+')[0], inplace=True)
|
||
|
||
df.drop(drop_list, axis=1, inplace=True)
|
||
file_name = quote(f'订单详情.xlsx')
|
||
mime = mimetypes.guess_type(file_name)[0]
|
||
df_to_stream = DfToStream((df, '订单详情'))
|
||
with df_to_stream as d:
|
||
export = d.to_stream()
|
||
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
|
||
|
||
|
||
# @router.get("/event_model_export")
|
||
# async def event_model_export(request: Request,
|
||
# game: str,
|
||
# report_id: str,
|
||
# ckdb: CKDrive = Depends(get_ck_db),
|
||
# # analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
|
||
# current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
# ):
|
||
# """ 事件分析模型 数据导出"""
|
||
# analysis = BehaviorAnalysis(game, schemas.CkQuery(report_id=report_id), get_redis_pool())
|
||
# await analysis.init(data_where=current_user.data_where)
|
||
# sqls = analysis.event_model_sql()
|
||
# res = []
|
||
# file_name = f'{sqls[0]["report_name"]}.xlsx'
|
||
# mime = mimetypes.guess_type(file_name)[0]
|
||
# for item in sqls[:1]:
|
||
# sql = item['sql']
|
||
# event_name = item['event_name']
|
||
# df = await ckdb.query_dataframe(sql)
|
||
# file = df_to_stream(df, event_name)
|
||
# return StreamingResponse(file, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
|
||
#
|
||
|
||
@router.post("/event_model")
|
||
async def event_model(
|
||
request: Request,
|
||
game: str,
|
||
data_in: schemas.CkQuery,
|
||
ckdb: CKDrive = Depends(get_ck_db), # ck 驱动
|
||
db: AsyncIOMotorDatabase = Depends(get_database), # mongodb 驱动
|
||
rdb: RedisDrive = Depends(get_redis_pool), # redis 驱动
|
||
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user) # 登录校验
|
||
) -> schemas.Msg:
|
||
""" 事件分析"""
|
||
await analysis.init(data_where=current_user.data_where) # 初始化请求参数
|
||
try:
|
||
sqls = await analysis.event_model_sql() # 生成SQL,查询时间范围,日期类型
|
||
except Exception as e:
|
||
return schemas.Msg(code=-9, msg='报表配置参数异常')
|
||
res = []
|
||
is_hide = []
|
||
group_label = {}
|
||
for idx, item in enumerate(sqls): # 列出索引下标
|
||
if item.get('is_show') == False:
|
||
is_hide.append(idx)
|
||
# event_name:事件名,日充总额
|
||
# format:float浮点型
|
||
q = {
|
||
'groups': [],
|
||
'values': [],
|
||
'sum': [],
|
||
'avg': [],
|
||
'event_name': item['event_name'],
|
||
'format': item['format'],
|
||
'last_value': 0,
|
||
'start_date': item['start_date'],
|
||
'end_date': item['end_date'],
|
||
'time_particle': item['time_particle']
|
||
}
|
||
# 处理组合问题,如combination_event不存在则跳过
|
||
if item.get('combination_event'):
|
||
combination_event = CombinationEvent(res, item.get('combination_event'), item['format'])
|
||
values, sum_, avg = combination_event.parse()
|
||
|
||
# q['values'].append(values)
|
||
# q['sum'].append(sum_)
|
||
q['avg'].append(avg)
|
||
q['date_range'] = item['date_range']
|
||
for last_value in values[::-1]:
|
||
if last_value > 0:
|
||
q['last_value'] = float(last_value)
|
||
break
|
||
if list(item.get('event_name'))[-1] == '率': # 比列类的全部加上百分比符号
|
||
for i in range(len(values)):
|
||
values[i] = str((values[i])) + '%'
|
||
q['values'].append(values)
|
||
q['sum'].append(str(sum_) + '%')
|
||
elif '比' in item['event_name']:
|
||
for i in range(len(values)):
|
||
values[i] = str(int(float(values[i]) * 100)) + '%'
|
||
q['values'].append(values)
|
||
q['sum'].append(str(int(float(sum_) * 100)) + '%')
|
||
else:
|
||
q['values'].append(values)
|
||
q['sum'].append(sum_)
|
||
res.append(q)
|
||
continue
|
||
# sql语句
|
||
sql = item['sql']
|
||
groupby = item['groupby'] # 分组项
|
||
date_range = item['date_range'] # 获取的要查询的每一天的时间
|
||
df = await ckdb.query_dataframe(sql) # 以sql语句查出数据,df是二维列表
|
||
if item['event_name'] == '月充总额': # 充值总额和月充总额是单独拿出来做处理返回数据
|
||
date_range=df['date'].tolist()
|
||
q['date_range']=[str(i).split('-')[0]+'-'+str(i).split('-')[1] for i in date_range]
|
||
else:
|
||
q['date_range'] = date_range # 把要查询的时间加入q字典中
|
||
df.fillna(0, inplace=True) # 以0填补空数据
|
||
# 映射对应中文返回给前端展示
|
||
for i in groupby:
|
||
if i == 'svrindex':
|
||
if game == 'mfmh5': # 只有mfmh5单独处理,因为同步区服的的游戏名不一样
|
||
game = 'mzmfmh5'
|
||
chinese = {}
|
||
resp = await crud.select_map.get_one(db, game, i)
|
||
if not resp:
|
||
continue
|
||
for ii in resp:
|
||
chinese[ii['id']] = ii['title']
|
||
for k, v in chinese.items():
|
||
# 开始映射
|
||
df.loc[df[i] == k, i] = v
|
||
|
||
# 获取第一矩阵的长度
|
||
if df.shape[0] == 0:
|
||
df = pd.DataFrame({'date': date_range, 'values': 0 * len(date_range)})
|
||
# continue
|
||
# return schemas.Msg(code=0, msg='ok', data=[q])
|
||
if item['time_particle'] == 'total' or item['event_name'] == '充值总额':
|
||
# for group, df_group in df.groupby(groupby):
|
||
# df_group.reset_index(drop=True, inplace=True)
|
||
if item['event_name'] == '充值总额':
|
||
q['groups']=[]
|
||
else:
|
||
q['groups'].append(groupby)
|
||
q['values'].append(df['values'].to_list())
|
||
q['sum'].append(round(float(df['values'].sum()), 2))
|
||
q['avg'].append(round(float(df['values'].mean()), 2))
|
||
for last_value in df['values'].values[::-1]:
|
||
if last_value > 0:
|
||
q['last_value'] = float(last_value)
|
||
break
|
||
if groupby and (set(groupby) & set(df) == set(groupby)):
|
||
q['date_range'] = [f'{i}' for i in df.set_index(groupby).index]
|
||
else:
|
||
q['date_range'] = ['合计']
|
||
# 暂时只执行像素的计费点加别名
|
||
if game == 'xiangsu':
|
||
if item['groupby'][0] == 'proid' and analysis.events[0]['event_name'] == 'pay':
|
||
# 将对应英文的中文意思按位置一一对应返回给前端
|
||
proid_dict = await crud.proid_map.get_all_show_name(db, game)
|
||
res_list = []
|
||
for i in q['date_range']:
|
||
try:
|
||
name = proid_dict[i]
|
||
res_list.append(name)
|
||
except:
|
||
pass
|
||
|
||
q['proid_name'] = res_list
|
||
# 将proid字段和金额money按对应关系组合成字典并算出对应的总额返回给前端
|
||
money_dict = await crud.proid_map.get_all_show_money(db, game)
|
||
add_money = []
|
||
number = q['values'][0]
|
||
next = -1
|
||
for i in q['date_range']:
|
||
next += 1
|
||
try:
|
||
mongey = money_dict[i]
|
||
add = number[next] * mongey
|
||
add_money.append(add)
|
||
except:
|
||
pass
|
||
q['proid_money'] = add_money
|
||
# 首充金额分布
|
||
# if item['groupby'][0] == 'money' and analysis.events[0]['event_name'] == 'pay':
|
||
# # 将proid字段和金额money按对应关系组合成字典并算出对应的总额返回给前端
|
||
# money_dict = await crud.proid_map.get_all_show_money(db, game)
|
||
# add_money = []
|
||
# number = q['values'][0]
|
||
# next = -1
|
||
# for i in q['date_range']:
|
||
# next += 1
|
||
# mongey = money_dict[i]
|
||
# add = number[next] * mongey
|
||
# add_money.append(add)
|
||
# q['proid_money'] = add_money
|
||
res.append(q)
|
||
continue
|
||
if groupby and (set(groupby) & set(df)) == set(groupby):
|
||
columns = groupby[0]
|
||
df[columns] = df[columns].astype(str)
|
||
# 有分组
|
||
for group, df_group in df.groupby(groupby):
|
||
# 在原数据上将索引重新转换为列,新索引的列删除
|
||
df_group.reset_index(drop=True, inplace=True)
|
||
# 判断为0的改成未知城市
|
||
if str(group) == '0' and analysis.event_view['groupBy'][0]['columnDesc'] == '城市':
|
||
q['groups'].append('未知城市')
|
||
else:
|
||
if 'str' in str(type(group)):
|
||
q['groups'].append(str(group))
|
||
else:
|
||
q['groups'].append(str(list(group)))
|
||
concat_data = []
|
||
for i in set(date_range) - set(df_group['date']):
|
||
if len(groupby) > 1:
|
||
concat_data.append((i, *group, 0))
|
||
else:
|
||
concat_data.append((i, group, 0))
|
||
df_group = pd.concat([df_group, pd.DataFrame(concat_data, columns=df_group.columns)])
|
||
df_group.sort_values('date', inplace=True) # 按date这一列的值在原数据排序
|
||
q['values'].append(df_group['values'].to_list())
|
||
q['sum'].append(round(float(df_group['values'].sum()), 2)) # 求和
|
||
q['avg'].append(round(float(df_group['values'].mean()), 2)) #平均数
|
||
for last_value in df['values'].values[::-1]:
|
||
if last_value > 0:
|
||
q['last_value'] = float(last_value)
|
||
break
|
||
|
||
else:
|
||
# 无分组
|
||
concat_data = []
|
||
|
||
for i in set(date_range) - set(df['date']):
|
||
concat_data.append((i, 0))
|
||
# 纵向拼接两个表
|
||
if item['event_name'] != '月充总额':
|
||
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
|
||
# 在原数据上按data排序
|
||
df.sort_values('date', inplace=True)
|
||
if len(df) >= 2:
|
||
q['chain_ratio'] = division((df.iloc[-1, 1] - df.iloc[-2, 1]) * 100, df.iloc[-2, 1], 2)
|
||
if len(df) >= 8:
|
||
q['wow'] = division((df.iloc[-1, 1] - df.iloc[-8, 1]) * 100, df.iloc[-8, 1], 2) or 0
|
||
q['values'].append(abs(df['values']).to_list())
|
||
for last_value in df['values'].values[::-1]:
|
||
if last_value > 0:
|
||
q['last_value'] = float(last_value)
|
||
break
|
||
# 求所有值的和
|
||
q['sum'].append(round(abs(float(df['values'].sum())), 2))
|
||
# 求平均值
|
||
q['avg'].append(round(float(df['values'].mean()), 2))
|
||
|
||
# q['eventNameDisplay']=item['event_name_display']
|
||
res.append(q)
|
||
group_label = item['group_label']
|
||
# 按总和排序
|
||
for item in res:
|
||
try:
|
||
if item['time_particle'] in ('P1D', 'P1W'): # 按格式修改年月日
|
||
item['date_range'] = [d.strftime('%Y-%m-%d') for d in item['date_range']]
|
||
elif item['time_particle'] in ('P1M',):
|
||
item['date_range'] = [d.strftime('%Y-%m') for d in item['date_range']]
|
||
else:
|
||
item['date_range'] = [d.strftime('%Y-%m-%d %H:%M:%S') for d in item['date_range']]
|
||
except:
|
||
pass
|
||
|
||
sort_key = np.argsort(np.array(item['sum']))[::-1] # 将sum中的元素从小到大排列后的结果,提取其对应的索引,然后倒着输出到变量之中
|
||
if item.get('groups'):
|
||
item['groups'] = np.array(item['groups'])[sort_key].tolist()
|
||
groups = []
|
||
# groupbys=analysis.event_view.get('groupBy')
|
||
# groupby_list=[i['columnName'] for i in groupbys]
|
||
for gitem in item['groups']:
|
||
gb = []
|
||
if '(' in gitem or '[' in gitem:
|
||
gitem = gitem.replace('(', '').replace(')', '').replace(' ', '').replace("'", '') \
|
||
.replace('[', '').replace(']', '')
|
||
if isinstance(gitem, list):
|
||
true_list = gitem
|
||
else:
|
||
true_list = gitem.split(',')
|
||
for gstr in true_list:
|
||
gb.append(gstr)
|
||
# 存在标签分组项
|
||
if group_label:
|
||
for name, idx in group_label.items():
|
||
gb.insert(idx, name)
|
||
|
||
# 去掉分组表现里面的''
|
||
appgb = str(gb).replace("'", '')
|
||
groups.append(appgb)
|
||
item['groups'] = groups
|
||
# 修改后的方案
|
||
# by_dict={}
|
||
# for i in range(len(gb)):
|
||
# by_dict[groupby_list[i]]=gb[i]
|
||
# groups.append(by_dict)
|
||
# item['groups'] = groups
|
||
else:
|
||
if group_label:
|
||
groups = []
|
||
gb = []
|
||
for name, idx in group_label.items():
|
||
gb.insert(idx, name)
|
||
# 去掉分组表现里面的''
|
||
appgb = str(gb).replace("'", '')
|
||
groups.append(appgb)
|
||
item['groups'] = groups
|
||
item['groupby'] = groupby
|
||
item['values'] = np.array(item['values'])[sort_key].tolist()
|
||
item['sum'] = np.array(item['sum'])[sort_key].tolist()
|
||
item['avg'] = np.array(item['avg'])[sort_key].tolist()
|
||
res = [item for idx, item in enumerate(res) if idx not in is_hide]
|
||
# 对单个为int的分组进行排序
|
||
if analysis.event_view.get('groupBy','') !=[] and analysis.event_view.get('groupBy','') != '':
|
||
if analysis.event_view.get('groupBy','')[0]['data_type'] == 'int' and len(analysis.event_view.get('groupBy','')) == 1 and len(res[0]['groups']) > 1:
|
||
short_list = sorted([ast.literal_eval(i)[0] for i in res[0]['groups']])
|
||
idx_list = []
|
||
for date in res[0]['groups']:
|
||
for id, i in enumerate(short_list):
|
||
if date == f'[{i}]':
|
||
idx_list.append(id)
|
||
break
|
||
for i in ['groups', 'values', 'sum', 'avg']:
|
||
ad = [0 for nu in range(len(idx_list))]
|
||
for idx, ii in enumerate(idx_list):
|
||
ad[ii] = res[0][i][idx]
|
||
res[0][i] = ad
|
||
# 如是充值排行,限制返回300条
|
||
if res[0].get('event_name','') == '充值排行':
|
||
res[0]['groups'] = res[0]['groups'][0:300]
|
||
res[0]['values'] = res[0]['values'][0:300]
|
||
res[0]['sum'] = res[0]['sum'][0:300]
|
||
res[0]['avg'] = res[0]['avg'][0:300]
|
||
return schemas.Msg(code=0, msg='ok', data=res)
|
||
|
||
|
||
@router.post("/retention_model_sql")
|
||
async def retention_model_sql(
|
||
request: Request,
|
||
game: str,
|
||
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
) -> schemas.Msg:
|
||
"""留存查询 sql"""
|
||
await analysis.init(data_where=current_user.data_where)
|
||
data = await analysis.retention_model_sql2()
|
||
return schemas.Msg(code=0, msg='ok', data=[data])
|
||
|
||
|
||
@router.post("/retention_model")
|
||
async def retention_model(request: Request,
|
||
game: str,
|
||
ckdb: CKDrive = Depends(get_ck_db),
|
||
db: AsyncIOMotorDatabase = Depends(get_database),
|
||
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
) -> schemas.Msg:
|
||
await analysis.init(data_where=current_user.data_where)
|
||
"""留存分析模型"""
|
||
try:
|
||
res = await analysis.retention_model_sql2() # 初始化开始时间结束时间,sql语句 字典
|
||
except Exception as e:
|
||
return schemas.Msg(code=-9, msg='报表配置参数异常')
|
||
sql = res['sql'] # 获取到sql语句
|
||
df = await ckdb.query_dataframe(sql) # 查询数据
|
||
if df.empty:
|
||
return schemas.Msg(code=-9, msg='无数据', data=None)
|
||
|
||
date_range = res['date_range'] # 时间 列表
|
||
unit_num = res['unit_num'] # int
|
||
retention_n = res['retention_n'] # 列表 int
|
||
filter_item_type = res['filter_item_type'] # all
|
||
filter_item = res['filter_item'] # 列表 0,1,3,7,14,21,30
|
||
df.set_index('reg_date', inplace=True) # 重新生成下标
|
||
# 补齐没有数据的日期
|
||
for d in set(res['date_range']) - set(df.index):
|
||
df.loc[d] = 0
|
||
|
||
df.sort_index(inplace=True) # 原数据排序
|
||
summary_values = {'均值': {}}
|
||
max_retention_n = 1
|
||
# 留存人数
|
||
avg = {}
|
||
# 流失人数
|
||
avgo = {}
|
||
for date, v in df.T.items():
|
||
# 字典中data存在时不替换,否则将data替换成空字典
|
||
tmp = summary_values.setdefault(date, dict())
|
||
tmp['d0'] = int(v.cnt0)
|
||
tmp['p'] = []
|
||
tmp['n'] = []
|
||
tmp['p_outflow'] = []
|
||
tmp['n_outflow'] = []
|
||
for i in retention_n:
|
||
n = (pd.Timestamp.now().date() - date).days
|
||
if i > n:
|
||
continue
|
||
# max_retention_n = i if i > max_retention_n else max_retention_n
|
||
# 留存的人数
|
||
avg[i] = avg.setdefault(i, 0) + v[f'cnt{i}']
|
||
# 流失的人数
|
||
avgo[i] = avgo.setdefault(i, 0) + v[f'on{i}']
|
||
tmp['p'].append(v[f'p{i}'])
|
||
tmp['n'].append(v[f'cnt{i}'])
|
||
tmp['p_outflow'].append(v[f'op{i}'])
|
||
tmp['n_outflow'].append(v[f'on{i}'])
|
||
tmp = summary_values['均值']
|
||
retention_avg_dict = {}
|
||
|
||
for rn in retention_n:
|
||
for rt, rd in df.T.items():
|
||
if rt + datetime.timedelta(days=rn) <= pd.datetime.now().date():
|
||
retention_avg_dict.setdefault(rn, {'cnt0': 0, 'cntn': 0, 'o_cnt0': 0, 'o_cntn': 0})
|
||
retention_avg_dict[rn]['cnt0'] += rd['cnt0']
|
||
retention_avg_dict[rn]['cntn'] += rd[f'cnt{rn}']
|
||
retention_avg_dict[rn]['o_cnt0'] += rd['cnt0']
|
||
retention_avg_dict[rn]['o_cntn'] += rd[f'on{rn}']
|
||
# 算均值
|
||
tmp['p'] = [] # 留存率
|
||
tmp['n'] = [] # 留存人数
|
||
tmp['p_outflow'] = [] # 流失率
|
||
tmp['n_outflow'] = [] # 流失人数
|
||
tmp['d0'] = 0 # 总人数
|
||
for rt, rd in retention_avg_dict.items():
|
||
tmp['d0'] = int(df['cnt0'].sum())
|
||
n = round(rd['cntn'] * 100 / rd['cnt0'], 2)
|
||
n = 0 if np.isnan(n) else n
|
||
tmp['p'].append(n)
|
||
tmp['n'].append(rd['cntn'])
|
||
n = round(rd['o_cntn'] * 100 / rd['cnt0'], 2)
|
||
n = 0 if np.isnan(n) else n
|
||
tmp['p_outflow'].append(n)
|
||
tmp['n_outflow'].append(rd['o_cntn'])
|
||
# 次留数
|
||
title = ['日期', '用户数', '次留', *[f'{i + 1}留' for i in retention_n[1:]]]
|
||
|
||
# 未到达的日期需要补齐-
|
||
retention_length = len(retention_n)
|
||
for _, items in summary_values.items():
|
||
for key in ['p', 'n', 'p_outflow', 'n_outflow']:
|
||
items[key].extend(['-'] * (retention_length - len(items[key])))
|
||
|
||
resp = {
|
||
'summary_values': summary_values, # 留存/流失数据
|
||
# 'values': values,
|
||
'date_range': [d.strftime('%Y-%m-%d') for d in date_range], # 时间范围
|
||
'title': title, # 表头
|
||
'filter_item_type': filter_item_type,
|
||
'filter_item': filter_item,
|
||
'start_date': res['start_date'], # 开始时间
|
||
'end_date': res['end_date'], # 结束时间
|
||
'time_particle': res['time_particle'] # 时间类型
|
||
|
||
}
|
||
return schemas.Msg(code=0, msg='ok', data=resp)
|
||
|
||
|
||
@router.post("/retention_model_details")
|
||
async def retention_model(request: Request,
|
||
game: str,
|
||
ckdb: CKDrive = Depends(get_ck_db),
|
||
db: AsyncIOMotorDatabase = Depends(get_database),
|
||
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
) -> schemas.Msg:
|
||
await analysis.init(data_where=current_user.data_where)
|
||
"""留存分析分组详情"""
|
||
try:
|
||
res = await analysis.retention_model_sql3() # 初始化开始时间结束时间,sql语句 字典
|
||
except Exception as e:
|
||
return schemas.Msg(code=-9, msg='报表配置参数异常')
|
||
sql = res['sql'] # 获取到sql语句
|
||
df = await ckdb.query_dataframe(sql)
|
||
if df.empty:
|
||
return schemas.Msg(code=-9, msg='无数据', data=None)
|
||
date_range = res['date_range'] # 时间 列表
|
||
# unit_num = res['unit_num'] # int
|
||
retention_n = res['retention_n'] # 列表 int
|
||
filter_item_type = res['filter_item_type'] # all
|
||
filter_item = res['filter_item'] # 列表 0,1,3,7,14,21,30
|
||
# 映射对应中文返回给前端展示
|
||
groupby_list = analysis.event_view.get('groupBy')
|
||
groupby = [i['columnName'] for i in groupby_list if i['tableType'] != 'user_label']
|
||
true_group = [] # 定义分组实际选择
|
||
for g_data in groupby_list:
|
||
data_type = g_data['data_type']
|
||
|
||
# 不是int类型
|
||
if data_type != "int":
|
||
true_group.append("str")
|
||
continue
|
||
|
||
# 自定义区间
|
||
if g_data['intervalType'] == 'user_defined':
|
||
int_range = g_data['quotaIntervalArr']
|
||
chk_range = []
|
||
for index, value in enumerate(int_range):
|
||
# 开头
|
||
if index == 0:
|
||
chk_range.append(['-', value])
|
||
# 只有两个数
|
||
if len(int_range) >= 2:
|
||
chk_range.append([value, int_range[index + 1]])
|
||
continue
|
||
# 结尾
|
||
if index + 1 >= len(int_range):
|
||
chk_range.append([value, '+'])
|
||
continue
|
||
# 中间
|
||
chk_range.append([value, int_range[index + 1]])
|
||
true_group.append(chk_range)
|
||
|
||
# 默认区间
|
||
elif g_data['intervalType'] == 'def':
|
||
zidai = []
|
||
max_v = int(df[g_data['columnName']].max())
|
||
min_v = int(df[g_data['columnName']].min())
|
||
interval = (max_v - min_v) // 10 or 1
|
||
for i in range(min_v, max_v + 1, interval):
|
||
zidai.append([i, i + interval])
|
||
true_group.append(zidai)
|
||
|
||
# 离散数字
|
||
else:
|
||
true_group.append('discrete')
|
||
|
||
if len(groupby_list) == 1:
|
||
max_v = int(df[groupby_list[0]['columnName']].max())
|
||
min_v = int(df[groupby_list[0]['columnName']].min())
|
||
for i in groupby: # 映射区服
|
||
if i == 'svrindex':
|
||
if game == 'mfmh5':
|
||
game = 'mzmfmh5'
|
||
chinese = {}
|
||
resp = await crud.select_map.get_one(db, game, i)
|
||
if not resp: # 如果没有配置相关的映射数据就跳过
|
||
continue
|
||
for ii in resp:
|
||
chinese[ii['id']] = ii['title']
|
||
for k, v in chinese.items():
|
||
# 开始映射
|
||
df.loc[df['svrindex'] == k, 'svrindex'] = v
|
||
times = df['reg_date'][0]
|
||
df.set_index(groupby, inplace=True) # 重新生成下标
|
||
# for d in set(res['date_range']) - set(df.index):
|
||
# df.loc[d] = 0
|
||
|
||
df.sort_index(inplace=True) # 排序
|
||
summary_values = {'均值': {}}
|
||
max_retention_n = 1
|
||
# 留存人数
|
||
avg = {}
|
||
# 流失人数
|
||
avgo = {}
|
||
for date, v in df.T.items():
|
||
# 字典中data存在时不替换,否则将data替换成空字典
|
||
tmp = summary_values.setdefault(date, dict())
|
||
tmp['d0'] = int(v.cnt0)
|
||
tmp['p'] = []
|
||
tmp['n'] = []
|
||
tmp['p_outflow'] = []
|
||
tmp['n_outflow'] = []
|
||
for i in retention_n:
|
||
n = (pd.Timestamp.now().date() - v[0]).days
|
||
if i > n:
|
||
continue
|
||
# max_retention_n = i if i > max_retention_n else max_retention_n
|
||
# 留存的人数
|
||
avg[i] = avg.setdefault(i, 0) + v[f'cnt{i}']
|
||
# 流失的人数
|
||
avgo[i] = avgo.setdefault(i, 0) + v[f'on{i}']
|
||
tmp['p'].append(v[f'p{i}'])
|
||
tmp['n'].append(v[f'cnt{i}'])
|
||
tmp['p_outflow'].append(v[f'op{i}'])
|
||
tmp['n_outflow'].append(v[f'on{i}'])
|
||
tmp = summary_values['均值']
|
||
retention_avg_dict = {}
|
||
group_label = res['group_label']
|
||
|
||
# 多个分组项时,合成列表返回
|
||
if not group_label:
|
||
if len(groupby) > 1:
|
||
summary_valuess = {}
|
||
for k, v in summary_values.items():
|
||
if 'str' in str(type(k)):
|
||
summary_valuess[str([k])] = v
|
||
else:
|
||
summary_valuess[str(list(k))] = v
|
||
|
||
else:
|
||
summary_valuess = summary_values
|
||
# 包含标签分组项
|
||
else:
|
||
summary_valuess = {}
|
||
for k, v in summary_values.items():
|
||
key = list(k)
|
||
# 增加标签分组到对应的key里面
|
||
for name, index in group_label.items():
|
||
key.insert(index, name)
|
||
summary_valuess[str(key)] = v
|
||
|
||
for rn in retention_n:
|
||
for rt, rd in df.T.items():
|
||
if times + datetime.timedelta(days=rn) <= pd.datetime.now().date():
|
||
retention_avg_dict.setdefault(rn, {'cnt0': 0, 'cntn': 0, 'o_cnt0': 0, 'o_cntn': 0})
|
||
retention_avg_dict[rn]['cnt0'] += rd['cnt0']
|
||
retention_avg_dict[rn]['cntn'] += rd[f'cnt{rn}']
|
||
retention_avg_dict[rn]['o_cnt0'] += rd['cnt0']
|
||
retention_avg_dict[rn]['o_cntn'] += rd[f'on{rn}']
|
||
|
||
tmp['p'] = []
|
||
tmp['n'] = []
|
||
tmp['p_outflow'] = []
|
||
tmp['n_outflow'] = []
|
||
tmp['d0'] = 0
|
||
for rt, rd in retention_avg_dict.items():
|
||
tmp['d0'] = int(df['cnt0'].sum())
|
||
n = round(rd['cntn'] * 100 / rd['cnt0'], 2)
|
||
n = 0 if np.isnan(n) else n
|
||
tmp['p'].append(n)
|
||
tmp['n'].append(rd['cntn'])
|
||
n = round(rd['o_cntn'] * 100 / rd['cnt0'], 2)
|
||
n = 0 if np.isnan(n) else n
|
||
tmp['p_outflow'].append(n)
|
||
tmp['n_outflow'].append(rd['o_cntn'])
|
||
# 如果分组项是int类型按选择的分组
|
||
if '均值' in summary_valuess:
|
||
summary_valuess.pop('均值')
|
||
if "['均值']" in summary_valuess:
|
||
summary_valuess.pop("['均值']")
|
||
new_summary_valuess = {}
|
||
for group_key, group_data in summary_valuess.items():
|
||
# 单个分组
|
||
if len(true_group) <= 1:
|
||
key_list = [group_key]
|
||
else:
|
||
key_list = eval(group_key)
|
||
true_key = [] # 重新定义后的分组
|
||
for index, value in enumerate(key_list):
|
||
|
||
true_group_index = true_group[index]
|
||
# 默认区间或者自定义区间
|
||
if isinstance(true_group_index, list):
|
||
for defined_list in true_group_index:
|
||
defined_list_max = defined_list[1]
|
||
defined_list_min = defined_list[0]
|
||
if defined_list_min == '-':
|
||
if value < defined_list_max:
|
||
true_key.append(defined_list)
|
||
break
|
||
else:
|
||
continue
|
||
if defined_list_max == '+':
|
||
if value >= defined_list_min:
|
||
true_key.append(defined_list)
|
||
break
|
||
else:
|
||
continue
|
||
|
||
if defined_list_min <= value < defined_list_max:
|
||
true_key.append(defined_list)
|
||
break
|
||
continue
|
||
continue
|
||
|
||
# 分组是字符串或者离散直接取这个值得str类型
|
||
if true_group_index in ['str', 'discrete']:
|
||
true_key.append(str(value))
|
||
continue
|
||
|
||
# 这个分组不存在:
|
||
if str(true_key) not in new_summary_valuess:
|
||
new_summary_valuess[str(true_key)] = group_data
|
||
continue
|
||
|
||
# 这个分组已存在
|
||
# d0相加
|
||
new_summary_valuess[str(true_key)]['d0'] += group_data['d0']
|
||
|
||
# n相加
|
||
n_list = new_summary_valuess[str(true_key)]['n']
|
||
n_list1 = group_data['n']
|
||
sum_n_lst = [x + y for x, y in zip(n_list, n_list1)]
|
||
new_summary_valuess[str(true_key)]['n'] = sum_n_lst
|
||
|
||
# n_outflow相加
|
||
n_outflow_list = new_summary_valuess[str(true_key)]['n_outflow']
|
||
n_outflow_list1 = group_data['n_outflow']
|
||
sum_n_ourflow_lst = [x + y for x, y in zip(n_outflow_list, n_outflow_list1)]
|
||
new_summary_valuess[str(true_key)]['n_outflow'] = sum_n_ourflow_lst
|
||
|
||
# 计算概率
|
||
for key1, value1 in new_summary_valuess.items():
|
||
new_summary_valuess[key1]['p'] = [round(i * 100 / value1['d0'], 2) for i in value1['n']]
|
||
new_summary_valuess[key1]['p_outflow'] = [round(i1 * 100 / value1['d0'], 2) for i1 in value1['n_outflow']]
|
||
|
||
title = ['分组项', '用户数', '次留', *[f'{i + 1}留' for i in retention_n[1:]]]
|
||
|
||
# 未到达的日期需要补齐-
|
||
retention_length = len(retention_n)
|
||
for _, items in new_summary_valuess.items():
|
||
for key in ['p', 'n', 'p_outflow', 'n_outflow']:
|
||
items[key].extend(['-'] * (retention_length - len(items[key])))
|
||
groupby_data = []
|
||
for i in new_summary_valuess.keys():
|
||
ac = ast.literal_eval(i)
|
||
ab = [str(ii) for ii in ac]
|
||
groupby_data.append(ab)
|
||
resp = {
|
||
'summary_values': new_summary_valuess,
|
||
# 'values': values,
|
||
'date_range': [d.strftime('%Y-%m-%d') for d in date_range],
|
||
'title': title,
|
||
'filter_item_type': filter_item_type,
|
||
'filter_item': filter_item,
|
||
'start_date': res['start_date'],
|
||
'end_date': res['end_date'],
|
||
'time_particle': res['time_particle'],
|
||
'groupby': [i['columnName'] for i in groupby_list],
|
||
'groupby_data': groupby_data
|
||
}
|
||
return schemas.Msg(code=0, msg='ok', data=resp)
|
||
|
||
# 废弃
|
||
async def retention_model01(request: Request,
|
||
game: str,
|
||
ckdb: CKDrive = Depends(get_ck_db),
|
||
db: AsyncIOMotorDatabase = Depends(get_database),
|
||
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
) -> schemas.Msg:
|
||
await analysis.init(data_where=current_user.data_where)
|
||
res = await analysis.retention_model_sql2() # 初始化开始时间结束时间,sql语句 字典
|
||
sql = res['sql'] # 获取到sql语句
|
||
df = await ckdb.query_dataframe(sql)
|
||
if df.empty:
|
||
return schemas.Msg(code=-9, msg='无数据', data=None)
|
||
|
||
date_range = res['date_range'] # 时间 列表
|
||
unit_num = res['unit_num'] # int
|
||
retention_n = res['retention_n'] # 列表 int
|
||
filter_item_type = res['filter_item_type'] # all
|
||
filter_item = res['filter_item'] # 列表 0,1,3,7,14,21,30
|
||
df.set_index('reg_date', inplace=True)
|
||
for d in set(res['date_range']) - set(df.index):
|
||
df.loc[d] = 0
|
||
|
||
df.sort_index(inplace=True)
|
||
summary_values = {'均值': {}}
|
||
max_retention_n = 1
|
||
avg = {}
|
||
avgo = {}
|
||
for date, v in df.T.items():
|
||
# 字典中data存在时不替换,否则将data替换成空字典
|
||
tmp = summary_values.setdefault(date, dict())
|
||
tmp['d0'] = int(v.cnt0)
|
||
tmp['p'] = []
|
||
tmp['n'] = []
|
||
tmp['p_outflow'] = []
|
||
tmp['n_outflow'] = []
|
||
for i in retention_n:
|
||
n = (pd.Timestamp.now().date() - date).days
|
||
if i > n:
|
||
continue
|
||
# max_retention_n = i if i > max_retention_n else max_retention_n
|
||
avg[i] = avg.setdefault(i, 0) + v[f'cnt{i}']
|
||
avgo[i] = avgo.setdefault(i, 0) + v[f'on{i}']
|
||
tmp['p'].append(round(100 - v[f'p{i}'], 2))
|
||
# tmp['p'].append(v[f'p{i}'])
|
||
tmp['n'].append(v[f'cnt{i}'])
|
||
tmp['p_outflow'].append(v[f'op{i}'])
|
||
tmp['n_outflow'].append(v[f'on{i}'])
|
||
tmp = summary_values['均值']
|
||
retention_avg_dict = {}
|
||
|
||
for rn in retention_n:
|
||
for rt, rd in df.T.items():
|
||
if rt + datetime.timedelta(days=rn) <= pd.datetime.now().date():
|
||
retention_avg_dict.setdefault(rn, {'cnt0': 0, 'cntn': 0, 'o_cnt0': 0, 'o_cntn': 0})
|
||
retention_avg_dict[rn]['cnt0'] += rd['cnt0']
|
||
retention_avg_dict[rn]['cntn'] += rd[f'cnt{rn}']
|
||
|
||
retention_avg_dict[rn]['o_cnt0'] += rd['cnt0']
|
||
retention_avg_dict[rn]['o_cntn'] += rd[f'on{rn}']
|
||
|
||
tmp['p'] = []
|
||
tmp['n'] = []
|
||
tmp['p_outflow'] = []
|
||
tmp['n_outflow'] = []
|
||
tmp['d0'] = 0
|
||
for rt, rd in retention_avg_dict.items():
|
||
tmp['d0'] = int(df['cnt0'].sum())
|
||
n = round(100 - (rd['cntn'] * 100 / rd['cnt0']), 2)
|
||
# n = round(rd['cntn'] * 100 / rd['cnt0'],2)
|
||
n = 0 if np.isnan(n) else n
|
||
tmp['p'].append(n)
|
||
tmp['n'].append(rd['cntn'])
|
||
n = round(rd['o_cntn'] * 100 / rd['cnt0'], 2)
|
||
n = 0 if np.isnan(n) else n
|
||
tmp['p_outflow'].append(n)
|
||
tmp['n_outflow'].append(rd['o_cntn'])
|
||
|
||
title = ['日期', '用户数', '次流失', *[f'{i + 1}流失' for i in retention_n[1:]]]
|
||
|
||
# 未到达的日期需要补齐-
|
||
retention_length = len(retention_n)
|
||
for _, items in summary_values.items():
|
||
for key in ['p', 'n', 'p_outflow', 'n_outflow']:
|
||
items[key].extend(['-'] * (retention_length - len(items[key])))
|
||
|
||
resp = {
|
||
'summary_values': summary_values,
|
||
# 'values': values,
|
||
'date_range': [d.strftime('%Y-%m-%d') for d in date_range],
|
||
'title': title,
|
||
'filter_item_type': filter_item_type,
|
||
'filter_item': filter_item,
|
||
'start_date': res['start_date'],
|
||
'end_date': res['end_date'],
|
||
'time_particle': res['time_particle']
|
||
|
||
}
|
||
return schemas.Msg(code=0, msg='ok', data=resp)
|
||
|
||
|
||
@router.post("/retention_model_export")
|
||
async def retention_model_export(request: Request,
|
||
game: str,
|
||
ckdb: CKDrive = Depends(get_ck_db),
|
||
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
):
|
||
""" 留存分析模型 数据导出"""
|
||
await analysis.init(data_where=current_user.data_where)
|
||
data = await analysis.retention_model_sql2()
|
||
file_name = quote(f'留存分析.xlsx')
|
||
mime = mimetypes.guess_type(file_name)[0]
|
||
|
||
sql = data['sql']
|
||
df = await ckdb.query_dataframe(sql)
|
||
if df.empty:
|
||
return schemas.Msg(code=-9, msg='无数据', data=None)
|
||
df_to_stream = DfToStream((df, '留存分析'))
|
||
with df_to_stream as d:
|
||
export = d.to_stream()
|
||
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
|
||
|
||
# 弃用
|
||
@router.post("/retention_model_del", deprecated=True)
|
||
async def retention_model_del(
|
||
request: Request,
|
||
game: str,
|
||
ckdb: CKDrive = Depends(get_ck_db),
|
||
db: AsyncIOMotorDatabase = Depends(get_database),
|
||
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
) -> schemas.Msg:
|
||
"""留存数据模型"""
|
||
await analysis.init(data_where=current_user.data_where)
|
||
res = await analysis.retention_model_sql()
|
||
sql = res['sql']
|
||
date_range = res['date_range']
|
||
event_a, event_b = res['event_name']
|
||
unit_num = res['unit_num']
|
||
title = await crud.event_mana.get_show_name(db, game, event_a)
|
||
title = f'{title}用户数'
|
||
df = await ckdb.query_dataframe(sql)
|
||
if df.empty:
|
||
return schemas.Msg(code=-9, msg='无数据', data=None)
|
||
concat_data = []
|
||
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
|
||
df['date'] = df['date'].apply(lambda x: x.date())
|
||
# 计算整体
|
||
summary_df = df.groupby('date')[['val_a', 'val_b', 'amount_a']].sum()
|
||
summary_values = {}
|
||
for i, d1 in enumerate(date_range):
|
||
a = set(summary_df.loc[d1]['val_a']) if d1 in summary_df.index else set()
|
||
if not a:
|
||
continue
|
||
key = d1.strftime('%Y-%m-%d')
|
||
for j, d2 in enumerate(date_range[i:]):
|
||
if j > unit_num:
|
||
break
|
||
b = set(summary_df.loc[d2]['val_b']) if d2 in summary_df.index else set()
|
||
tmp = summary_values.setdefault(key, {})
|
||
tmp.setdefault('d0', len(a))
|
||
tmp.setdefault('p', []).append(division(len(a & b) * 100, len(a)))
|
||
tmp.setdefault('n', []).append(len(a & b))
|
||
tmp.setdefault('p_outflow', []).append(round(100 - division(len(a & b) * 100, len(a)), 2))
|
||
tmp.setdefault('n_outflow', []).append(len(a) - len(a & b))
|
||
groups = set([tuple(i) for i in df[res['groupby']].values])
|
||
df.set_index(res['groupby'], inplace=True)
|
||
df.sort_index(inplace=True)
|
||
values = {}
|
||
days = [i for i in range((date_range[-1] - date_range[0]).days + 1)][:unit_num + 1]
|
||
for i, d1 in enumerate(date_range):
|
||
for g in groups:
|
||
if len(g) == 1:
|
||
continue
|
||
a = set(df.loc[g]['val_a']) if g in df.index else set()
|
||
if not a:
|
||
continue
|
||
key = d1.strftime("%Y-%m-%d")
|
||
tmp_g = values.setdefault(key, {})
|
||
for j, d2 in enumerate(date_range[i:]):
|
||
if j > unit_num:
|
||
break
|
||
b = set(df.loc[g]['val_b']) if g in df.index else set()
|
||
tmp = tmp_g.setdefault(','.join(g[1:]), {})
|
||
tmp.setdefault('d0', len(a))
|
||
tmp.setdefault('p', []).append(division(len(a & b) * 100, len(a)))
|
||
tmp.setdefault('n', []).append(len(a & b))
|
||
data = {
|
||
'summary_values': summary_values,
|
||
'values': values,
|
||
'days': days,
|
||
'date_range': [d.strftime('%Y-%m-%d') for d in date_range][:unit_num + 1],
|
||
'title': title,
|
||
'start_date': res['start_date'],
|
||
'end_date': res['end_date'],
|
||
'time_particle': res['time_particle']
|
||
}
|
||
return schemas.Msg(code=0, msg='ok', data=data)
|
||
|
||
|
||
@router.post("/funnel_model_sql")
|
||
async def funnel_model_sql(
|
||
request: Request,
|
||
game: str,
|
||
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
) -> schemas.Msg:
|
||
"""漏斗数据模型 sql"""
|
||
await analysis.init(data_where=current_user.data_where)
|
||
data = await analysis.funnel_model_sql()
|
||
return schemas.Msg(code=0, msg='ok', data=[data])
|
||
|
||
|
||
@router.post("/funnel_model")
|
||
async def funnel_model(
|
||
request: Request,
|
||
game: str,
|
||
ckdb: CKDrive = Depends(get_ck_db),
|
||
db: AsyncIOMotorDatabase = Depends(get_database),
|
||
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
) -> schemas.Msg:
|
||
"""漏斗数据模型"""
|
||
await analysis.init(data_where=current_user.data_where)
|
||
res = await analysis.funnel_model_sql()
|
||
sql = res['sql']
|
||
# 查询的时间
|
||
date_range = res['date_range']
|
||
cond_level = res['cond_level']
|
||
groupby = res['groupby']
|
||
switch_test = analysis.event_view.get('switchTest', True)
|
||
if switch_test: # 界面的开关是开的走这里
|
||
df = await ckdb.query_dataframe(sql)
|
||
if df.empty:
|
||
return schemas.Msg(code=-9, msg='无数据', data=None)
|
||
# 补齐level数据
|
||
concat_data = []
|
||
for key, tmp_df in df.groupby(['date'] + groupby):
|
||
not_exists_level = {i for i in range(1, len(cond_level) + 1)} - set(tmp_df['level'])
|
||
for item in not_exists_level:
|
||
key = key if isinstance(key, tuple) else (key,)
|
||
concat_data.append((*key, item, 0))
|
||
# 合并数据
|
||
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
|
||
|
||
# df.set_index('date',inplace=True)
|
||
data_list = []
|
||
date_data = {}
|
||
if df.shape == (0, 0): # 元组的第一个元素代表行数,第二个元素代表列数,判断数据大小
|
||
return schemas.Msg(code=0, msg='ok', data={'list': data_list, 'level': cond_level})
|
||
|
||
tmp = {'title': '总体'}
|
||
# 以level分组后的和
|
||
tmp_df = df[['level', 'values']].groupby('level').sum()
|
||
# 在原数据上对索引进行排序
|
||
tmp_df.sort_index(inplace=True)
|
||
for i in tmp_df.index:
|
||
tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
|
||
|
||
tmp['n'] = tmp_df['values'].to_list() # 取出values这一列的所有数据返回成列表
|
||
tmp['p1'] = [100]
|
||
# tmp['p2'] = []
|
||
for i, v in tmp_df.loc[2:, 'values'].items():
|
||
tmp['p1'].append(round(v * 100 / tmp_df.loc[1, 'values'], 2))
|
||
# tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
|
||
data_list.append(tmp)
|
||
|
||
# 补齐日期
|
||
all_idx = {(dt, lv) for dt in date_range for lv in range(1, len(cond_level) + 1)}
|
||
concat_data = []
|
||
for i in all_idx - set(df.set_index(['date', 'level']).index):
|
||
concat_data.append((*i, 0))
|
||
summary_df = pd.concat(
|
||
[df[['date', 'level', 'values']], pd.DataFrame(concat_data, columns=['date', 'level', 'values'])])
|
||
for key, tmp_df in summary_df.groupby('date'):
|
||
tmp_df = tmp_df.groupby('level').sum()
|
||
tmp_df.sort_index(inplace=True)
|
||
for i in tmp_df.index:
|
||
tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum() # 数据求和
|
||
|
||
tmp = dict()
|
||
|
||
tmp['n'] = tmp_df['values'].to_list()
|
||
tmp['p1'] = [100]
|
||
# tmp['p2'] = []
|
||
for i, v in tmp_df.loc[2:, 'values'].items():
|
||
var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
|
||
var = 0 if np.isnan(var) else var
|
||
tmp['p1'].append(var)
|
||
# tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
|
||
|
||
_ = date_data.setdefault(key.strftime('%Y-%m-%d'), {})
|
||
_['总体'] = tmp
|
||
# 分组
|
||
if groupby:
|
||
# 补齐数据
|
||
concat_data = []
|
||
idx = set(df.set_index(['date'] + groupby).index) # 下标去重
|
||
all_idx = {(*j, i) for i in range(1, len(cond_level) + 1) for j in idx}
|
||
for i in all_idx - set(df.set_index(list(('date', *groupby, 'level'))).index):
|
||
concat_data.append((*i, 0))
|
||
|
||
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)]) # 合并数据
|
||
# df.sort_values(list((*groupby, 'level')), inplace=True, ascending=False)
|
||
# 映射对应中文返回给前端展示
|
||
for i in groupby:
|
||
if i == 'svrindex':
|
||
if game == 'mfmh5':
|
||
game = 'mzmfmh5'
|
||
chinese = {}
|
||
resp = await crud.select_map.get_one(db, game, i)
|
||
if not resp:
|
||
continue
|
||
for ii in resp:
|
||
chinese[ii['id']] = ii['title']
|
||
for k, v in chinese.items():
|
||
# 开始映射
|
||
df.loc[df[i] == k, i] = v
|
||
for key, tmp_df in df.groupby(groupby):
|
||
tmp = {'title': key}
|
||
tmp_df = tmp_df.groupby('level').sum() #按level分组求和
|
||
tmp_df.sort_index(inplace=True)
|
||
for i in tmp_df.index:
|
||
tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
|
||
|
||
tmp['n'] = tmp_df['values'].to_list()
|
||
tmp['p1'] = [100]
|
||
# tmp['p2'] = []
|
||
for i, v in tmp_df.loc[2:, 'values'].items():
|
||
var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
|
||
var = 0 if np.isnan(var) else var
|
||
tmp['p1'].append(var)
|
||
# tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
|
||
data_list.append(tmp)
|
||
|
||
for key, tmp_df in df.groupby(['date'] + groupby):
|
||
|
||
tmp_df = tmp_df.groupby('level').sum()
|
||
tmp_df.sort_index(inplace=True)
|
||
for i in tmp_df.index:
|
||
tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
|
||
|
||
tmp = dict()
|
||
|
||
tmp['n'] = tmp_df['values'].to_list()
|
||
tmp['p1'] = [100]
|
||
# tmp['p2'] = []
|
||
for i, v in tmp_df.loc[2:, 'values'].items():
|
||
var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
|
||
var = 0 if np.isnan(var) else var
|
||
tmp['p1'].append(var)
|
||
# tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
|
||
|
||
_ = date_data.setdefault(key[0].strftime('%Y-%m-%d'), {})
|
||
# [key[1]] = tmp
|
||
title = (groupby or ['总体']) + cond_level
|
||
resp = {'list': data_list, # 数据列表
|
||
'date_data': date_data,
|
||
'title': title, # 表头
|
||
'level': cond_level, # 所选事件
|
||
'switch_test': switch_test,
|
||
'start_date': res['start_date'], # 开始时间
|
||
'end_date': res['end_date'], # 结束时间
|
||
'time_particle': res['time_particle'] # 时间类型
|
||
}
|
||
return schemas.Msg(code=0, msg='ok', data=resp)
|
||
else:
|
||
try:
|
||
res = await analysis.guide_model_sql()
|
||
except Exception as e:
|
||
return schemas.Msg(code=-9, msg='报表配置参数异常')
|
||
|
||
sql = res['sql']
|
||
df = await ckdb.query_dataframe(sql)
|
||
if df.empty:
|
||
return schemas.Msg(code=-9, msg='无数据', data=None)
|
||
|
||
group_str = res['analysis']
|
||
# 转int
|
||
df[group_str] = df[group_str].astype(int) # 声明这一列的数据类型为int
|
||
step_list = [str(i) for i in sorted(df[group_str].unique())] # 唯一性去重排序
|
||
dict_k = {}
|
||
df['values'] = 1
|
||
for k, nedf in df.groupby("date"):
|
||
ste_k = {}
|
||
for kk, ste_df in nedf.groupby(group_str):
|
||
value_list = ste_df.iloc[:, -1].to_list()
|
||
ste_k[str(kk)] = int(sum(value_list))
|
||
for ste in step_list:
|
||
if ste not in list(ste_k.keys()):
|
||
ste_k[ste] = 0
|
||
dict_k[str(k)] = ste_k
|
||
p_data = {}
|
||
data = {}
|
||
for dict_key, dict_data in dict_k.items():
|
||
dict_data1 = deepcopy(dict_data)
|
||
dict_k1 = {int(k): v for k, v in dict_data1.items()}
|
||
sorted(dict_k1.keys())
|
||
data_values = list(dict_k1.values())
|
||
p_values = [round(i / sum(data_values), 2) or 0 for i in data_values]
|
||
p_values.insert(0, dict_key)
|
||
data_values.insert(0, dict_key)
|
||
data[dict_key] = data_values
|
||
p_data[dict_key] = p_values
|
||
|
||
step_list.insert(0, '日期')
|
||
resp = {'list': data,
|
||
'date_data': p_data,
|
||
'title': '1',
|
||
'level': step_list,
|
||
'switch_test': switch_test,
|
||
'start_date': res['start_date'],
|
||
'end_date': res['end_date'],
|
||
'time_particle': res['time_particle']
|
||
}
|
||
return schemas.Msg(code=0, msg='ok', data=resp)
|
||
|
||
|
||
@router.post("/scatter_model_sql")
|
||
async def scatter_model_sql(
|
||
request: Request,
|
||
game: str,
|
||
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
) -> schemas.Msg:
|
||
"""分布分析 sql"""
|
||
await analysis.init(data_where=current_user.data_where)
|
||
data = await analysis.scatter_model_sql()
|
||
return schemas.Msg(code=0, msg='ok', data=[data])
|
||
|
||
|
||
@router.post("/scatter_model_export")
|
||
async def retention_model_export(request: Request,
|
||
game: str,
|
||
ckdb: CKDrive = Depends(get_ck_db),
|
||
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
):
|
||
""" 分布分析 数据导出"""
|
||
await analysis.init(data_where=current_user.data_where)
|
||
res = await analysis.scatter_model_sql()
|
||
file_name = quote(f'分布分析.xlsx')
|
||
mime = mimetypes.guess_type(file_name)[0]
|
||
sql = res['sql']
|
||
df = await ckdb.query_dataframe(sql)
|
||
if df.empty:
|
||
return schemas.Msg(code=-9, msg='无数据', data=None)
|
||
interval_type = res['interval_type']
|
||
analysis = res['analysis']
|
||
groupby = res['groupby']
|
||
quota_interval_arr = res['quota_interval_arr']
|
||
# 兼容合计的
|
||
if res['time_particle'] == 'total':
|
||
df['date'] = '合计'
|
||
if analysis != 'number_of_days' and interval_type != 'discrete':
|
||
max_v = int(df['values'].max())
|
||
min_v = int(df['values'].min())
|
||
interval = (max_v - min_v) // 10 or 1
|
||
resp = {'list': dict(),
|
||
'start_date': res['start_date'],
|
||
'end_date': res['end_date'],
|
||
'time_particle': res['time_particle']
|
||
}
|
||
|
||
if not quota_interval_arr:
|
||
resp['label'] = [f'[{i},{i + interval})' for i in range(min_v, max_v, interval)]
|
||
bins = [i for i in range(min_v, max_v + interval, interval)]
|
||
else:
|
||
quota_interval_arr = [-float('inf')] + quota_interval_arr + [float('inf')]
|
||
resp['label'] = []
|
||
bins = [quota_interval_arr[0]]
|
||
for i, v in enumerate(quota_interval_arr[1:]):
|
||
resp['label'].append(f'[{quota_interval_arr[i]},{v})')
|
||
bins.append(v)
|
||
|
||
# 这是整体的
|
||
for key, tmp_df in df.groupby('date'):
|
||
bins_s = pd.cut(tmp_df['values'], bins=bins,
|
||
right=False).value_counts()
|
||
bins_s.sort_index(inplace=True)
|
||
total = int(bins_s.sum())
|
||
if res['time_particle'] == 'total':
|
||
resp['list']['合计'] = dict()
|
||
resp['list']['合计']['总体'] = {'n': bins_s.to_list(), 'total': total,
|
||
'p': round(bins_s * 100 / total, 2).to_list(),
|
||
'title': '总体'}
|
||
else:
|
||
resp['list'][key.strftime('%Y-%m-%d')] = dict()
|
||
resp['list'][key.strftime('%Y-%m-%d')]['总体'] = {'n': bins_s.to_list(), 'total': total,
|
||
'p': round(bins_s * 100 / total, 2).to_list(),
|
||
'title': '总体'}
|
||
|
||
# 分组的
|
||
if groupby:
|
||
export_df = pd.DataFrame(columns=resp['label'])
|
||
|
||
for key, tmp_df in df.groupby(['date', *groupby]):
|
||
bins_s = pd.cut(tmp_df['values'], bins=bins,
|
||
right=False).value_counts()
|
||
bins_s.sort_index(inplace=True)
|
||
total = int(bins_s.sum())
|
||
title = '.'.join(key[1:])
|
||
date = key[0]
|
||
resp['list'][date.strftime('%Y-%m-%d')][title] = {'n': bins_s.to_list(), 'total': total,
|
||
'p': round(bins_s * 100 / total, 2).to_list(),
|
||
'title': title
|
||
}
|
||
|
||
export_df.loc[(date.strftime('%Y-%m-%d'), title)] = bins_s.to_list()
|
||
|
||
df_to_stream = DfToStream((export_df, '分布分析'), (df, '分布分析原始数据'), index=True)
|
||
with df_to_stream as d:
|
||
export = d.to_stream()
|
||
return StreamingResponse(export, media_type=mime,
|
||
headers={'Content-Disposition': f'filename="{file_name}"'})
|
||
|
||
|
||
# elif analysis == 'number_of_days':
|
||
else:
|
||
resp = {'list': {}, 'label': [],
|
||
'start_date': res['start_date'],
|
||
'end_date': res['end_date'],
|
||
'time_particle': res['time_particle']
|
||
}
|
||
labels = [str(i) for i in sorted(df['values'].unique())]
|
||
resp['label'] = labels
|
||
for key, tmp_df in df.groupby(['date']):
|
||
total = len(tmp_df)
|
||
if res['time_particle'] == 'total':
|
||
dt = '合计'
|
||
else:
|
||
dt = key.strftime('%Y-%m-%d')
|
||
labels_dict = {}
|
||
for key2, tmp_df2 in tmp_df.groupby('values'):
|
||
label = str(key2)
|
||
n = len(tmp_df2)
|
||
labels_dict[label] = n
|
||
|
||
resp['list'][dt] = {'总体': {'n': [labels_dict.get(i, 0) for i in labels], 'total': total,
|
||
'p': [round(labels_dict.get(i, 0) * 100 / total, 2) for i in labels]}}
|
||
|
||
export_df = pd.DataFrame(columns=resp['label'])
|
||
for d, v in resp['list'].items():
|
||
export_df.loc[d] = v['总体']['n']
|
||
|
||
df_to_stream = DfToStream((export_df, '分布分析'), (df, '分布分析原始数据'), index=True)
|
||
with df_to_stream as d:
|
||
export = d.to_stream()
|
||
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
|
||
|
||
|
||
@router.post("/scatter_model")
|
||
async def scatter_model(
|
||
request: Request,
|
||
game: str,
|
||
ckdb: CKDrive = Depends(get_ck_db),
|
||
db: AsyncIOMotorDatabase = Depends(get_database),
|
||
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
|
||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||
) -> schemas.Msg:
|
||
"""分布分析 模型"""
|
||
await analysis.init(data_where=current_user.data_where)
|
||
event_type = analysis.events[0]['eventName']
|
||
try:
|
||
res = await analysis.scatter_model_sql()
|
||
except Exception as e:
|
||
return schemas.Msg(code=-9, msg='报表配置参数异常')
|
||
end_date = analysis.end_date # 开始时间
|
||
start_date = analysis.start_date # 结束时间
|
||
where = analysis.events[-1]['quotaname'] # 查询条件的字段名
|
||
sql = res['sql']
|
||
# columnName = analysis.events[-1]['label_id']
|
||
|
||
# 查询买量渠道owner为kuaiyou3的日注册玩家等级分布
|
||
# sql_list=sql.split("GROUP BY")
|
||
# sql01 = """and xiangsu.event.owner_name='kuaiyou3'GROUP BY"""""
|
||
# new_sql=sql_list[0]+sql01+sql_list[1]
|
||
# if columnName != '':
|
||
# sql = sql.replace('SELECT', f'SELECT {columnName},', 1)
|
||
# sql += f',{columnName}'
|
||
df = await ckdb.query_dataframe(sql)
|
||
if df.empty:
|
||
return schemas.Msg(code=-9, msg='无数据', data=None)
|
||
if 'list' in str(type(df['values'][0])):
|
||
# f=lambda x:x[0]
|
||
# df['values']=df['values'].map(f)
|
||
df = df.explode("values").reset_index(drop=True) # 取字典类型数据并重置索引
|
||
# df['values']=df['values'].astype(str)
|
||
df.fillna(0, inplace=True) # 在原数据把缺失值填充为0
|
||
# 转换数据类型为int
|
||
if analysis.events[-1].get('analysis') != 'uniqExact': # 如果不是去重数的话声明值为int类型
|
||
df['values'] = df['values'].astype(int)
|
||
else:
|
||
df['values'] = df['values'].astype(str) # 统一声明使用去重数的时候为str
|
||
interval_type = res['interval_type'] # 在前端的展现形式,默认/离散/自定义
|
||
analysi = res['analysis']
|
||
groupby = res['groupby']
|
||
quota_interval_arr = res['quota_interval_arr']
|
||
# 兼容合计的
|
||
if res['time_particle'] == 'total': # 计算所有的数据走这里
|
||
df['date'] = '合计'
|
||
|
||
if analysi != 'number_of_days' and interval_type != 'discrete':
|
||
try:
|
||
max_v = int(df['values'].max()) # 数值中的最大值
|
||
except Exception as e:
|
||
return schemas.Msg(code=-9, msg='请用离散数字', data=None)
|
||
min_v = int(df['values'].min()) # 数值中的最小值
|
||
interval = (max_v - min_v) // 10 or 1 # 大小值之间10等分的区间数值
|
||
resp = {'list': dict(),
|
||
'start_date': res['start_date'],
|
||
'end_date': res['end_date'],
|
||
'time_particle': res['time_particle']
|
||
}
|
||
|
||
if not quota_interval_arr:
|
||
resp['label'] = [f'[{i},{i + interval})' for i in range(min_v, max_v, interval)] # 表头
|
||
bins = [i for i in range(min_v, max_v + interval, interval)] #区间
|
||
else:
|
||
quota_interval_arr = [-float('inf')] + quota_interval_arr + [float('inf')]
|
||
resp['label'] = []
|
||
bins = [quota_interval_arr[0]]
|
||
for i, v in enumerate(quota_interval_arr[1:]):
|
||
resp['label'].append(f'[{quota_interval_arr[i]},{v})')
|
||
bins.append(v)
|
||
|
||
# 这是整体的
|
||
for key, tmp_df in df.groupby('date'):
|
||
bins_s = pd.cut(tmp_df['values'], bins=bins,
|
||
right=False, include_lowest=True).value_counts() # 按区间分开
|
||
bins_s.sort_index(inplace=True) # 重新排序
|
||
total = int(bins_s.sum()) # 算出数量
|
||
if res['time_particle'] == 'total': # 合计的数据
|
||
resp['list']['合计'] = dict()
|
||
p = list(round(bins_s * 100 / total, 2).to_list())
|
||
resp['list']['合计']['总体'] = {'n': bins_s.to_list(), 'total': total,
|
||
'p': [str(i) + '%' for i in p],
|
||
'title': '总体'}
|
||
else: # 按时间分开算数据
|
||
p = list(round(bins_s * 100 / total, 2).to_list())
|
||
resp['list'][key.strftime('%Y-%m-%d')] = dict()
|
||
resp['list'][key.strftime('%Y-%m-%d')]['总体'] = {'n': bins_s.to_list(), 'total': total,
|
||
'p': [str(i) + '%' for i in p],
|
||
'title': '总体'}
|
||
# 分组的
|
||
# if groupby:
|
||
# for key, tmp_df in df.groupby(['date', *groupby]):
|
||
# bins_s = pd.cut(tmp_df['values'], bins=bins,
|
||
# right=False).value_counts()
|
||
# bins_s.sort_index(inplace=True)
|
||
# total = int(bins_s.sum())
|
||
# title = '.'.join(key[1:])
|
||
# date = key[0]
|
||
# resp['list'][date.strftime('%Y-%m-%d')][title] = {'n': bins_s.to_list(), 'total': total,
|
||
# 'p': round((bins_s * 100 / total).fillna(0),
|
||
# 2).to_list(),
|
||
# 'title': title
|
||
# }
|
||
download = analysis.event_view.get('download', '')
|
||
if download == 1: # 下载数据
|
||
creat_df = create_df(resp)
|
||
Download = Download_xlsx(creat_df, '分布分析')
|
||
return Download
|
||
return schemas.Msg(code=0, msg='ok', data=resp)
|
||
|
||
    # elif analysis == 'number_of_days':
    else:
        # discrete values
        resp = {'list': {}, 'label': [],
                'start_date': res['start_date'],
                'end_date': res['end_date'],
                'time_particle': res['time_particle']
                }
        labels = [str(i) for i in sorted(df['values'].unique())]
        resp['label'] = labels
        shaixuan = analysis.events[0].get('analysis')
        for key, tmp_df in df.groupby(['date']):  # group by date
            if shaixuan == 'uniqExact':
                total = len(set(tmp_df['uid']))
            else:
                total = len(tmp_df)
            if res['time_particle'] == 'total':
                dt = '合计'
            else:
                dt = key.strftime('%Y-%m-%d')

            labels_dict = {}
            for key2, tmp_df2 in tmp_df.groupby('values'):
                label = str(key2)
                n = len(tmp_df2)
                labels_dict[label] = n
            if event_type == 'pay':
                # if the sum of everything from 2 upward is needed, the code below builds
                # labels_dict01 holding those cumulative counts
                labels_dict01 = {}
                v = -1
                for i in labels:
                    v += 1
                    if int(i) == 1:
                        labels_dict01["1"] = labels_dict.get("1", 0)
                    else:
                        # for number in labels_dict.keys():
                        #     if number >=i:
                        values = list(labels_dict.values())
                        n = sum(values[v:])
                        labels_dict01[i] = n
                # percentages for the response
                list_p = []
                for i in labels:
                    number_int = round(labels_dict01.get(i, 0) * 100 / total, 2)
                    number_str = str(number_int) + '%'
                    list_p.append(number_str)

                resp['list'][dt] = {'总体': {'n': [labels_dict01.get(i, 0) for i in labels], 'total': total,
                                           'p': list_p}}
            else:
                list_p = []
                for i in labels:
                    number_int = round(labels_dict.get(i, 0) * 100 / total, 2)
                    number_str = str(number_int) + '%'
                    list_p.append(number_str)
                resp['list'][dt] = {'总体': {'n': [labels_dict.get(i, 0) for i in labels], 'total': total,
                                           'p': list_p}}  # assembled per-date data
            # resp['list'][dt] = {'总体': {'n': [labels_dict.get(i, 0) for i in labels], 'total': total,
            #                            'p': [round(labels_dict.get(i, 0) * 100 / total, 2) for i in labels]}}
        if where == "step_id" and event_type == "guide":
            sql = f"""SELECT toDate(addHours({game}.event."#event_time", 8)) AS date, count(DISTINCT {game}.event."#account_id") AS values FROM {game}.event WHERE addHours({game}.event."#event_time", 8) >= '{start_date}' AND addHours({game}.event."#event_time", 8) <= '{end_date}' AND {game}.event."#event_name" = 'create_account' GROUP BY toDate(addHours({game}.event."#event_time", 8)) ORDER BY date"""
            df = await ckdb.query_dataframe(sql)
            # for kk, ste_df in df.groupby('date'):
            #     if str(kk) not in resp['list']:
            #         continue
            #     value_list = ste_df.iloc[:, -1].to_list()
            #     resp['list'][str(kk)]['总体']['total'] = int(sum(value_list))
            for i in range(len(df)):
                if str(df['date'][i]) not in resp['list']:
                    continue
                resp['list'][str(df['date'][i])]['总体']['total'] = int(df['values'][i])
        # export support
        download = analysis.event_view.get('download', '')
        if download == 1:
            creat_df = create_df(resp)
            Download = Download_xlsx(creat_df, '分布分析')
            return Download
        return schemas.Msg(code=0, msg='ok', data=resp)

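# Illustrative sketch (comments only, not executed) of the 'pay' branch above, with
# hypothetical counts: it turns per-value counts into cumulative "paid at least N times"
# counts before computing percentages.
#   labels        = ['1', '2', '3']
#   labels_dict   = {'1': 5, '2': 3, '3': 2}      # players who paid exactly N times
#   labels_dict01 = {'1': 5, '2': 3 + 2, '3': 2}  # '1' keeps the exact count; every later
#                                                 # label n sums the counts from n upward
# with total = 10 the rendered percentages are ['50.0%', '50.0%', '20.0%'].
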
# bins_s = pd.cut(tmp_df['values'], bins=bins,
#                 right=False).value_counts()
# bins_s.sort_index(inplace=True)
# total = int(bins_s.sum())
# resp['list'][key.strftime('%Y-%m-%d')] = dict()
# resp['list'][key.strftime('%Y-%m-%d')]['总体'] = {'n': bins_s.to_list(), 'total': total,
#                                                 'p': round(bins_s * 100 / total, 2).to_list(),
#                                                 'title': '总体'}


# Possibly deprecated: a distribution analysis of the same type already exists
@router.post("/guide_model")
async def guide_model(
        request: Request,
        game: str,
        ckdb: CKDrive = Depends(get_ck_db),
        db: AsyncIOMotorDatabase = Depends(get_database),
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Distribution analysis model"""
    await analysis.init(data_where=current_user.data_where)
    event_type = analysis.events[0]['eventName']
    try:
        res = await analysis.guide_model_sql()
    except Exception as e:
        return schemas.Msg(code=-9, msg='报表配置参数异常')

    sql = res['sql']
    df = await ckdb.query_dataframe(sql)
    if df.empty:
        return schemas.Msg(code=-9, msg='无数据', data=None)

    group_str = res['analysis']
    # cast the step column to int
    df[group_str] = df[group_str].astype(int)
    step_list = [str(i) for i in sorted(df[group_str].unique())]
    dict_k = {}
    for k, nedf in df.groupby("date"):
        ste_k = {}
        for kk, ste_df in nedf.groupby(group_str):
            value_list = ste_df.iloc[:, -1].to_list()
            ste_k[str(kk)] = int(sum(value_list))
        for ste in step_list:
            if ste not in list(ste_k.keys()):
                ste_k[ste] = 0
        dict_k[str(k)] = ste_k
    p_data = {}
    data = {}
    for dict_key, dict_data in dict_k.items():
        dict_data1 = deepcopy(dict_data)  # work on a deep copy of the row
        dict_k1 = {int(k): v for k, v in dict_data1.items()}
        dict_k1 = {k: dict_k1[k] for k in sorted(dict_k1.keys())}  # order the steps by key
        data_values = list(dict_k1.values())
        total_values = sum(data_values)
        p_values = [round(i / total_values, 2) if total_values else 0 for i in data_values]
        p_values.insert(0, dict_key)
        data_values.insert(0, dict_key)
        data[dict_key] = data_values
        p_data[dict_key] = p_values

    step_list.insert(0, '日期')
    res_msg = {
        'level': step_list,
        'list': data,
        'p': p_data
    }
    return schemas.Msg(code=0, msg='ok', data=res_msg)

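# Illustrative sketch of the payload guide_model returns (hypothetical numbers):
#   step_list (level)    = ['日期', '1', '2', '3']
#   data['2021-01-01']   = ['2021-01-01', 120, 80, 40]        # per-step counts, date first
#   p_data['2021-01-01'] = ['2021-01-01', 0.5, 0.33, 0.17]    # each count / row total
# every row in 'list' and 'p' is prefixed with its date so it can be rendered directly
# as a table row.
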
@router.post("/first_event_model")
async def first_event_model(
        request: Request,
        game: str,
        ckdb: CKDrive = Depends(get_ck_db),
        db: AsyncIOMotorDatabase = Depends(get_database),
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """First-event trigger time analysis model"""
    await analysis.init(data_where=current_user.data_where)
    try:
        res = await analysis.first_event_model_sql()
    except Exception as e:
        return schemas.Msg(code=-9, msg='报表配置参数异常')

    sql = res['sql']
    df = await ckdb.query_dataframe(sql)
    if df.empty:
        return schemas.Msg(code=-9, msg='无数据', data=None)

    step_list = []
    groups = analysis.event_view.get('group', [30, 60, 120, 240])
    for index, num_int in enumerate(groups):
        # first bucket
        if index == 0:
            step_list.append(['-', num_int])

        if index + 1 < len(groups):
            end_num = groups[index + 1]
            step_list.append([num_int, end_num])

        # last boundary
        if index + 1 >= len(groups):
            step_list.append([num_int, '+'])

    dict_k = {}
    for k, nedf in df.groupby("dff_time"):
        value_list = nedf.iloc[:, -1].to_list()
        sum_num = sum(value_list)
        key = ''
        for i in step_list:
            if i[0] == '-':
                if k < i[1]:
                    key = str(i)
                    break
                else:
                    continue
            if i[1] == '+':
                if k >= i[0]:
                    key = str(i)
                    break
            if i[0] <= k < i[1]:
                key = str(i)
                break
        if key in dict_k:
            dict_k[key] += sum_num
        else:
            dict_k[key] = sum_num

    list_data = []
    for i in step_list:
        if str(i) not in dict_k:
            list_data.append(0)
            continue
        list_data.append(dict_k[str(i)])

    all_num = sum(list_data)
    p_data = [round(v * 100 / all_num, 2) for v in list_data]

    true_step = [str(i) for i in step_list]
    res_msg = {
        'level': true_step,
        'list': list_data,
        'start_date': res['start_date'],
        'end_date': res['end_date'],
        'p': p_data
    }
    return schemas.Msg(code=0, msg='ok', data=res_msg)

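# Illustrative sketch of how the buckets above come out of the configured boundaries
# (default [30, 60, 120, 240]; the unit is whatever dff_time is measured in):
#   step_list = [['-', 30], [30, 60], [60, 120], [120, 240], [240, '+']]
# a dff_time value k goes into the first bucket whose range contains it, e.g. k = 45
# lands in '[30, 60]' and k = 500 in "[240, '+']"; the response 'p' holds each bucket's
# share of the grand total in percent.
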
@router.post("/scatter_model_details")
async def scatter_model(
        request: Request,
        game: str,
        ckdb: CKDrive = Depends(get_ck_db),
        db: AsyncIOMotorDatabase = Depends(get_database),
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Distribution analysis: per-group details"""
    await analysis.init(data_where=current_user.data_where)
    try:
        res = await analysis.scatter_model_sql()
    except Exception as e:
        return schemas.Msg(code=-9, msg='报表配置参数异常')
    event_type = analysis.events[0]['eventName']
    where = analysis.events[-1]['quotaname']
    sql = res['sql']
    group_by = analysis.event_view['groupBy']
    # drop user-label groupings
    true_group = [i for i in group_by if i['data_type'] != "user_label"]
    # columnName = true_group[-1]['columnName']
    columnName = ''
    if true_group != []:
        # if columnName != '':
        #     # group by day
        #     sql = sql.replace(f'toDate(addHours({game}.event."#event_time", 8)) AS date', f'`{columnName}` as va',
        #                       1)
        #     sql = sql.replace(f'toDate(addHours({game}.event."#event_time", 8))', columnName, 1)
        #     # group by week
        #     sql = sql.replace(f'toStartOfWeek(addHours({game}.event."#event_time", 8)) AS date',
        #                       f'`{columnName}` as va',
        #                       1)
        #     sql = sql.replace(f'toStartOfWeek(addHours({game}.event."#event_time", 8))', columnName, 1)
        #     # group by month
        #     sql = sql.replace(f'toStartOfMonth(addHours({game}.event."#event_time", 8)) AS date',
        #                       f'`{columnName}` as va',
        #                       1)
        #     sql = sql.replace(f'toStartOfMonth(addHours({game}.event."#event_time", 8))', columnName, 1)
        #     # total
        #     if analysis.event_view.get('timeParticleSize') == "total":
        #         sql = sql.replace(f'SELECT', f'SELECT {columnName} as va,', 1)
        df = await ckdb.query_dataframe(sql)
        if df.empty:
            return schemas.Msg(code=-9, msg='无数据', data=None)
        if 'list' in str(type(df['values'][0])):
            # f = lambda x: x[0]
            # df['values'] = df['values'].map(f)
            df = df.explode("values").reset_index(drop=True)
        df.fillna(0, inplace=True)
        # cast the value column
        if analysis.events[-1].get('analysis') != 'uniqExact':
            df['values'] = df['values'].astype(int)
        else:
            df['values'] = df['values'].astype(str)  # keep values as str when counting distinct users
        interval_type = res['interval_type']
        analysi = res['analysis']
        groupby = res['groupby']  # grouping columns
        true_df = df.groupby(groupby).sum()  # sum within each group
        group_label = res['group_label']
        quota_interval_arr = res['quota_interval_arr']
        # compatibility with the 'total' particle (kept for reference)
        # if res['time_particle'] == 'total':
        #     if len(groupby) > 0:
        #         df['va'] = '合计'

        if analysi != 'number_of_days' and interval_type != 'discrete':
            # default bucket width
            max_v = int(true_df['values'].max())
            min_v = int(true_df['values'].min())
            interval = (max_v - min_v) // 10 or 1
            resp = {'list': dict(),
                    'start_date': res['start_date'],
                    'end_date': res['end_date'],
                    'time_particle': res['time_particle'],
                    'biaotou': groupby
                    }

            # if 'float' in str(df.dtypes['va']):
            #     df['va'] = df['va'].astype(int)
            # for index, gi in enumerate(groupby):
            #     resp['list'][str(index)] = dict()
            #     if 'float' in str(df.dtypes[gi]):
            #         df[gi] = df[gi].astype(int)
            #     if 'list' in str(type(df[gi][0])):
            #         f = lambda x: x[0]
            #         df[gi] = df[gi].map(f)
            if not quota_interval_arr:
                resp['label'] = [f'[{i},{i + interval})' for i in range(min_v, max_v, interval)]
                bins = [i for i in range(min_v, max_v + interval, interval)]
            else:
                quota_interval_arr = [-float('inf')] + quota_interval_arr + [float('inf')]
                resp['label'] = []
                bins = [quota_interval_arr[0]]
                for i, v in enumerate(quota_interval_arr[1:]):
                    resp['label'].append(f'[{quota_interval_arr[i]},{v})')
                    bins.append(v)
            # if 'float' in str(df.dtypes['va']):
            #     df['va'] = df['va'].astype(int)
            # if 'list' in str(type(df['va'][0])):
            #     f = lambda x: x[0]
            #     df['va'] = df['va'].map(f)
            # bucket the values for every group
            for key, tmp_df in true_df.groupby(groupby):

                bins_s = pd.cut(tmp_df['values'], bins=bins,
                                right=True, include_lowest=True).value_counts()
                bins_s.sort_index(inplace=True)
                total = int(bins_s.sum())
                if group_label:
                    if isinstance(key, str):
                        key = [key]
                    key = list(key)
                    for name, idx in group_label.items():
                        key.insert(idx, name)
                    key = str(key)
                if res['time_particle'] == 'total111':
                    resp['list']['合计'] = dict()

                    resp['list']['合计'] = {'n': bins_s.to_list(), 'total': total,
                                          'p': round(bins_s * 100 / total, 2).to_list(),
                                          'title': '总体'}
                else:
                    p = round(bins_s * 100 / total, 2).to_list()
                    for i in range(len(p)):
                        if str(p[i]) == 'nan':
                            p[i] = 0
                    # map the tracked values to display names (kept for reference)
                    # re = await crud.select_map.get_list(db, game)
                    # re_list = [i['attr_name'] for i in re]
                    # if gi in re_list:
                    #     for i in re:
                    #         if gi == i['attr_name']:
                    #             for datas in i['map_']:
                    #                 if key == datas['id']:
                    #                     key = datas['title']
                    #                     break
                    #             break
                    # if 'time' not in groupby:
                    resp['list'][str(key)] = dict()
                    resp['list'][str(key)] = {'n': bins_s.to_list(), 'total': total,
                                              'p': [str(i) + '%' for i in p],
                                              'title': '总体'}
                # else:
                #     resp['list'][key.strftime('%Y-%m-%d %H:%M:%S')] = dict()
                #     resp['list'][key.strftime('%Y-%m-%d %H:%M:%S')] = {'n': bins_s.to_list(), 'total': total,
                #                                                        'p': [str(i) + '%' for i in p],
                #                                                        'title': '总体'}
            # export support
            download = analysis.event_view.get('download', '')
            if download == 1:
                create_df = create_neidf(resp, columnName)
                Download = Download_xlsx(create_df, '分布分析')
                return Download

            if group_label:
                for name, idx in group_label.items():
                    resp['biaotou'].insert(idx, name)
            groupby_data = []
            for i in resp['list'].keys():
                ac = ast.literal_eval(i)
                ab = [str(ii) for ii in ac]
                groupby_data.append(ab)
                # if not isinstance(ac,int):
                #     ab = [str(ii) for ii in ac]
                #     groupby_data.append(ab)
                # else:
                #     groupby_data.append(ac)
            resp['groupby_data'] = groupby_data
            return schemas.Msg(code=0, msg='ok', data=resp)
        else:
            # discrete values
            resp = {'list': {}, 'label': [],
                    'start_date': res['start_date'],
                    'end_date': res['end_date'],
                    'time_particle': res['time_particle'],
                    'biaotou': groupby
                    }
            labels = [str(i) for i in sorted(true_df['values'].unique())]
            resp['label'] = labels
            # for index, gi in enumerate(groupby):
            #     resp['list'][str(index)] = dict()
            #     if 'list' in str(type(df[gi][0])):
            #         f = lambda x: x[0]
            #         df[gi] = df[gi].map(f)
            shaixuan = analysis.events[0].get('analysis')
            for key, tmp_df in true_df.groupby(groupby):
                if shaixuan == 'uniqExact':
                    total = len(set(tmp_df['uid']))
                else:
                    total = len(tmp_df)
                if res['time_particle'] == 'total11':
                    dt = '合计'
                else:
                    # map the tracked values to display names (kept for reference)
                    # re = await crud.select_map.get_list(db, game)
                    # re_list = [i['attr_name'] for i in re]
                    # if gi in re_list:
                    #     for i in re:
                    #         if gi == i['attr_name']:
                    #             for datas in i['map_']:
                    #                 if key == datas['id']:
                    #                     key = datas['title']
                    #                     break
                    #             break
                    dt = key
                    # dt = key.strftime('%Y-%m-%d')
                    # dt='合计'

                # a label grouping is present
                if group_label:
                    if isinstance(dt, str):
                        dt = [dt]
                    dt = list(dt)
                    for name, idx in group_label.items():
                        dt.insert(idx, name)
                    dt = str(dt)
                labels_dict = {}
                for key2, tmp_df2 in tmp_df.groupby('values'):
                    label = str(key2)
                    n = len(tmp_df2)
                    labels_dict[label] = n
                if event_type == 'pay':
                    # if the sum of everything from 2 upward is needed, the code below builds
                    # labels_dict01 holding those cumulative counts
                    labels_dict01 = {}
                    v = -1
                    for i in labels:
                        v += 1
                        if int(i) == 1:
                            labels_dict01["1"] = labels_dict.get("1", 0)
                        else:
                            # for number in labels_dict.keys():
                            #     if number >=i:
                            values = list(labels_dict.values())
                            n = sum(values[v:])
                            labels_dict01[i] = n
                    # percentages for the response
                    list_p = []
                    for i in labels:
                        number_int = round(labels_dict01.get(i, 0) * 100 / total, 2)
                        number_str = str(number_int) + '%'
                        list_p.append(number_str)

                    resp['list'][str(dt)] = {'n': [labels_dict01.get(i, 0) for i in labels], 'total': total,
                                             'p': list_p}
                else:
                    list_p = []
                    for i in labels:
                        number_int = round(labels_dict.get(i, 0) * 100 / total, 2)
                        number_str = str(number_int) + '%'
                        list_p.append(number_str)
                    resp['list'][str(dt)] = {'n': [labels_dict.get(i, 0) for i in labels], 'total': total,
                                             'p': list_p}
                # resp['list'][dt] = {'总体': {'n': [labels_dict.get(i, 0) for i in labels], 'total': total,
                #                            'p': [round(labels_dict.get(i, 0) * 100 / total, 2) for i in labels]}}
            if where == "step_id" and event_type == "guide":
                start_date = res['start_date']
                end_date = res['end_date']
                sql = f"""SELECT toDate(addHours({game}.event."#event_time", 8)) AS date, count(DISTINCT {game}.event."#account_id") AS values FROM {game}.event WHERE addHours({game}.event."#event_time", 8) >= '{start_date}' AND addHours({game}.event."#event_time", 8) <= '{end_date}' AND {game}.event."#event_name" = 'create_account' GROUP BY toDate(addHours({game}.event."#event_time", 8)) ORDER BY date"""
                df = await ckdb.query_dataframe(sql)
                for i in range(len(df)):
                    if str(df['date'][i]) not in resp['list']:
                        continue
                    resp['list'][str(df['date'][i])]['total'] = int(df['values'][i])
            # export support
            download = analysis.event_view.get('download', '')
            if download == 1:
                create_df = create_neidf(resp, columnName)
                Download = Download_xlsx(create_df, '分布分析')
                return Download

            if group_label:
                for name, idx in group_label.items():
                    resp['biaotou'].insert(idx, name)
            groupby_data = []
            for i in resp['list'].keys():
                ac = ast.literal_eval(i)
                ab = [str(ii) for ii in ac]
                groupby_data.append(ab)
            resp['groupby_data'] = groupby_data
            return schemas.Msg(code=0, msg='ok', data=resp)
    else:
        return schemas.Msg(code=-9, msg='没有添加分组项', data='')

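# Illustrative sketch of the interval branch above (hypothetical numbers): with
# min_v = 0, max_v = 50 and no quota_interval_arr,
#   resp['label'] = ['[0,5)', '[5,10)', ..., '[45,50)']
#   bins          = [0, 5, 10, ..., 50]
# and pd.cut(tmp_df['values'], bins=bins, right=True, include_lowest=True) counts how many
# summed rows fall into each bin for every groupby key. When group_label is set, the label
# names are spliced into each key (and into 'biaotou') at their configured index before the
# key is stringified.
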
@router.post("/trace_model_sql")
async def trace_model_sql(
        request: Request,
        game: str,
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Path analysis sql"""
    await analysis.init(data_where=current_user.data_where)
    data = await analysis.trace_model_sql()
    return schemas.Msg(code=0, msg='ok', data=[data])

@router.post("/trace_model")
async def trace_model_sql(
        request: Request,
        game: str,
        ckdb: CKDrive = Depends(get_ck_db),
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Path analysis"""
    await analysis.init(data_where=current_user.data_where)
    res = await analysis.trace_model_sql()
    sql = res['sql']
    df = await ckdb.query_dataframe(sql)
    if df.empty:
        return schemas.Msg(code=-9, msg='无数据', data=None)
    chain_dict = defaultdict(dict)
    event_num_dict = {}
    event_next_event = {}
    nodes = {'流失'}
    name_list = analysis.events['event_namesdes']  # events included in the analysis
    name_dict = {}
    for i in name_list:
        name_dict[i['event_name']] = i['event_desc']
    for event_names, count in zip(df['event_chain'], df['values']):
        fmt_keys = []
        chain_len = len(event_names)
        for i, event_name in enumerate(event_names):
            if i >= 10:
                continue
            next_event = event_names[i + 1] if i < chain_len - 1 else '流失'
            # display the events by their configured names
            event_namess = name_dict.get(event_name, event_name)
            next_eventss = name_dict.get(next_event, next_event)
            key = (f'{event_namess}-{i}', f'{next_eventss}-{i + 1}')
            # key = (f'{event_name}', f'{next_event}')
            nodes.update(key)
            chain_dict[i][key] = chain_dict[i].setdefault(key, 0) + count
            keys = list(key)
            for true_key in keys:
                if true_key in fmt_keys:
                    continue
                if true_key in event_num_dict:
                    event_num_dict[true_key] += count
                else:
                    event_num_dict[true_key] = count
                fmt_keys.append(true_key)

            # record which events follow each node
            if keys[0] in event_next_event:
                event_next_event[keys[0]].append(keys[1])
                event_next_event[keys[0]] = list(set(event_next_event[keys[0]]))
            else:
                event_next_event[keys[0]] = [keys[1]]

    links = []
    for _, items in chain_dict.items():
        for keys, val in items.items():
            links.append({
                "source": keys[0],
                "target": keys[1],
                "value": val
            })
    node = [item for item in nodes]
    node.sort()
    # fixed ordering: source events first, churn nodes last
    first = []
    trail = []
    nodes = []
    for i in node:
        if analysis.events['source_event']['eventDesc'] in i:
            first.append(i)
        elif '流失' in i:
            trail.append(i)
        else:
            nodes.append(i)
    first.sort(reverse=True)  # sort
    for i in first:
        nodes.insert(0, i)
    for i in trail:
        nodes.append(i)
    # reshape event_next_event into lists of {event_name, value}
    event_new_next = {}
    for key, key_list in event_next_event.items():
        new_key_list = []
        for key1 in key_list:
            new_key_list.append({'event_name': key1, 'value': event_num_dict[key1]})
        event_new_next[key] = new_key_list

    data = {
        # 'nodes': [{'name': item} for item in nodes],
        'nodes': [{'name': item} for item in nodes],
        'links': links,
        'event_num': event_num_dict,
        'event_next': event_new_next,
        'start_date': res['start_date'],
        'end_date': res['end_date'],
        'time_particle': res['time_particle']
    }
    return schemas.Msg(code=0, msg='ok', data=data)

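# Illustrative sketch of the sankey payload built above (hypothetical chain and names):
# an event_chain of ['login', 'battle'] with count = 7 and display names 登录/战斗 yields
#   nodes: '登录-0', '战斗-1', '流失-2'   (each name is suffixed with its position)
#   links: {'source': '登录-0', 'target': '战斗-1', 'value': 7},
#          {'source': '战斗-1', 'target': '流失-2', 'value': 7}
# event_num holds the per-node totals and event_next lists, for every node, its follow-up
# nodes together with those totals.
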
@router.post("/trace_user_info_model")
async def trace_model_sql(
        request: Request,
        game: str,
        ckdb: CKDrive = Depends(get_ck_db),
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Path analysis: user details"""
    await analysis.init(data_where=current_user.data_where)
    res = await analysis.trace_model_sql()
    sql = res['sql']
    name_list = analysis.events['event_namesdes']
    name_dict = {}
    for i in name_list:
        name_dict[i['event_name']] = i['event_desc']
    # uids behind every node
    event_uid_list = await ckdb.query_data_trace(sql, name_dict)
    if not event_uid_list:
        return schemas.Msg(code=-9, msg='无数据', data=None)
    event_name = analysis.events['event_name']
    page = analysis.events['page']
    event_next = analysis.events['event_next']
    starindex = (page - 1) * 10
    all_uid_list = []

    # user details for an aggregated follow-up node
    if event_name.startswith('follow'):
        for event_next_list in event_next:
            true_event_name = event_next_list['event_name']
            if true_event_name.startswith('流失'):
                continue
            all_uid_list += event_uid_list[true_event_name]
        all_uid_list = list(set(all_uid_list))

    # user details for the "more events" node
    elif event_name.startswith('more'):
        # sorted follow-up events, keeping those from index 8 onward
        event_next_true_eventlist = sorted(event_next, key=lambda x: x['value'], reverse=True)[8:]
        for event_next_list in event_next_true_eventlist:
            true_event_name = event_next_list['event_name']
            all_uid_list += event_uid_list[true_event_name]
        all_uid_list = list(set(all_uid_list))

    # user details for a single node
    else:
        all_uid_list = event_uid_list[event_name]

    user_num = len(all_uid_list)
    account_id = all_uid_list[starindex:10 * page]
    new_sql = f"""select `#account_id`,`#ip`,`#distinct_id`,rmbmoney,owner_name,lv,zhanli,channel,
    channel,svrindex,maxmapid,name,`exp`,vip,jinbi,last_account_login_time,binduid from {game}.`user` where `#account_id` in ({account_id})"""
    df1 = await ckdb.query_dataframe(new_sql)
    new_values = df1.values.tolist()
    for i in range(len(new_values)):
        if str(new_values[i][6]) == 'nan':
            new_values[i][6] = 0
    res = {
        'user_num': user_num,
        'details_data': {
            'new_columns': df1.columns.tolist(),
            'new_values': new_values
        }}

    return schemas.Msg(code=0, msg='ok', data=res)

@router.post("/user_property_model_sql")
async def user_property_sql(
        request: Request,
        game: str,
        analysis: UserAnalysis = Depends(UserAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """User property sql"""
    await analysis.init(data_where=current_user.data_where)
    data = analysis.property_model()
    return schemas.Msg(code=0, msg='ok', data=[data])

@router.post("/user_property_model_export")
async def user_property_model_export(
        request: Request,
        game: str,
        ckdb: CKDrive = Depends(get_ck_db),
        analysis: UserAnalysis = Depends(UserAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
):
    """User property export"""
    await analysis.init(data_where=current_user.data_where)
    data = analysis.property_model()
    file_name = quote(f'用户属性.xlsx')
    mime = mimetypes.guess_type(file_name)[0]

    sql = data['sql']
    df = await ckdb.query_dataframe(sql)
    if df.empty:
        return schemas.Msg(code=-9, msg='无数据', data=None)
    df_to_stream = DfToStream((df, '用户属性'))
    with df_to_stream as d:
        export = d.to_stream()
    return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})

@router.post("/user_property_model")
async def user_property_model(
        request: Request,
        game: str,
        analysis: UserAnalysis = Depends(UserAnalysis),
        ckdb: CKDrive = Depends(get_ck_db),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """User property analysis"""
    await analysis.init(data_where=current_user.data_where)
    res = analysis.property_model()
    sql = res['sql']
    quota = res['quota']
    groupby = res['groupby']
    df = await ckdb.query_dataframe(sql)
    if df.empty:
        return schemas.Msg(code=-9, msg='无数据', data=None)

    # no grouping
    data = {}

    if not groupby:
        data['总体'] = int(df['values'][0])
        title = ['总体', quota]

    else:
        sum_s = df.groupby(groupby)['values'].sum()
        for key, val in sum_s.items():
            if isinstance(key, tuple):
                key = ','.join([str(i) for i in key])
            else:
                key = str(key)
            data[key] = val
        title = ['.'.join(groupby), quota]
    value = list(data.values())
    return schemas.Msg(code=0, msg='ok', data={
        'value': value,
        'title': title,
        'key': list(data.keys())
    })

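# Illustrative sketch of the payload returned above (hypothetical values): with
# quota = 'rmbmoney' and groupby = ['channel'] the grouped sum
#   sum_s = {'appstore': 1200, 'google': 800}
# is flattened into
#   {'value': [1200, 800], 'key': ['appstore', 'google'], 'title': ['channel', 'rmbmoney']}
# without a groupby the single total is returned under the key '总体'.
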
@router.post("/seek_user")
async def user_property_model(
        request: Request,
        game: str,
        data_in: schemas.Ck_seek_user,
        ckdb: CKDrive = Depends(get_ck_db)
) -> schemas.Msg:
    """Game user search"""
    # value to compare against
    data = data_in.condition
    # user attribute (column) to filter on
    ziduan = data_in.user_arrt_title
    # comparison operator
    tiaojian = data_in.comparator_id
    if tiaojian == '==':
        tiaojian = '='
    # datetime attributes are filtered by a time range
    if data_in.user_arrt_type == 'datetime':
        sql = f"""select `#account_id`,`{ziduan}` from {game}.`user` WHERE addHours(`{ziduan}`, 8) >= '{data_in.start_time}'
        and addHours(`{ziduan}`, 8) <= '{data_in.end_time}' ORDER BY `#reg_time` LIMIT 10 OFFSET {(data_in.pages - 1) * 10}"""
    # when '#account_id' itself is queried, don't return the account id twice
    elif ziduan == '#account_id':
        sql = f"""select `{ziduan}`,name from {game}.`user` WHERE `{ziduan}` {tiaojian} '{data_in.condition}' ORDER BY `#reg_time` LIMIT 10 OFFSET {(data_in.pages - 1) * 10} """
    elif data_in.user_arrt_type == 'int':
        sql = f"""select `#account_id`,`{ziduan}` from {game}.`user` WHERE `{ziduan}` {tiaojian} {data_in.condition} ORDER BY `#reg_time` LIMIT 10 OFFSET {(data_in.pages - 1) * 10}"""
    else:
        sql = f"""select `#account_id`,`{ziduan}` from `{game}`.`user` WHERE `{ziduan}` {tiaojian} '{data}' ORDER BY `#reg_time` LIMIT 10 OFFSET {(data_in.pages - 1) * 10}"""
    # run the query
    try:
        df = await ckdb.query_dataframe(sql)
    except Exception as e:
        return schemas.Msg(code=0, msg='查询参数不匹配', data=str(e))
    if df.empty:
        return schemas.Msg(code=-9, msg='查无数据')
    # return the matches and their detail rows as lists
    df.fillna(0, inplace=True)
    account_id = list(df['#account_id'])
    new_sql = f"""select `#account_id`,`#ip`,`#distinct_id`,rmbmoney,owner_name,lv,zhanli,channel,
    channel,svrindex,maxmapid,name,`exp`,vip,jinbi,last_account_login_time,binduid from {game}.`user` where `#account_id` in ({account_id})"""
    df1 = await ckdb.query_dataframe(new_sql)  # detail rows for the matched users
    new_values = df1.values.tolist()  # all row values as lists
    for i in range(len(new_values)):
        if str(new_values[i][6]) == 'nan':
            new_values[i][6] = 0
    res = {'refer': {
        'columns': df.columns.tolist(),
        'values': df.values.tolist()
    },
        'details_data': {
            'new_columns': df1.columns.tolist(),
            'new_values': new_values
        }}
    return schemas.Msg(code=0, msg='ok', data=res)

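# Illustrative sketch of the paging math used above (hypothetical request): with
# data_in.pages = 3 the search query ends in "LIMIT 10 OFFSET 20", i.e. it returns rows
# 21-30 of the matching users ordered by `#reg_time`, and the second query then pulls the
# detail columns for exactly those ten `#account_id` values.
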
@router.post("/seek_user_count")
async def user_property_model(
        request: Request,
        game: str,
        data_in: schemas.Ck_seek_user,
        ckdb: CKDrive = Depends(get_ck_db)
) -> schemas.Msg:
    """Number of users matched by the game user search"""
    # value to compare against
    data = data_in.condition
    # user attribute (column) to filter on
    ziduan = data_in.user_arrt_title
    # comparison operator
    tiaojian = data_in.comparator_id
    if tiaojian == '==':
        tiaojian = '='
    # datetime attributes are filtered by a time range
    if data_in.user_arrt_type == 'datetime':
        sql = f"""select `#account_id`,`{ziduan}` from {game}.`user` WHERE addHours(`{ziduan}`, 8) >= '{data_in.start_time}'
        and addHours(`{ziduan}`, 8) <= '{data_in.end_time}' ORDER BY `#reg_time`"""
    # when '#account_id' itself is queried, don't return the account id twice
    elif ziduan == '#account_id':
        sql = f"""select `{ziduan}`,name from {game}.`user` WHERE `{ziduan}` {tiaojian} '{data_in.condition}' ORDER BY `#reg_time` """
    elif data_in.user_arrt_type == 'int':
        sql = f"""select `#account_id`,`{ziduan}` from {game}.`user` WHERE `{ziduan}` {tiaojian} {data_in.condition} ORDER BY `#reg_time`"""
    else:
        sql = f"""select `#account_id`,`{ziduan}` from `{game}`.`user` WHERE `{ziduan}` {tiaojian} '{data}' ORDER BY `#reg_time`"""
    # run the query
    try:
        df = await ckdb.query_dataframe(sql)
    except Exception as e:
        return schemas.Msg(code=0, msg='查询参数不匹配', data=str(e))
    # return the number of matched rows
    res = len(df)
    return schemas.Msg(code=0, msg='ok', data=res)

@router.post("/download_user")
async def user_property_model(
        request: Request,
        game: str,
        data_in: schemas.Ck_seek_user,
        ckdb: CKDrive = Depends(get_ck_db)
):
    """Export every user matched by the search"""
    # value to compare against
    data = data_in.condition
    # user attribute (column) to filter on
    ziduan = data_in.user_arrt_title
    # comparison operator
    tiaojian = data_in.comparator_id
    if tiaojian == '==':
        tiaojian = '='
    # datetime attributes are filtered by a time range
    if data_in.user_arrt_type == 'datetime':
        sql = f"""select `#account_id`,`{ziduan}` from {game}.`user` WHERE addHours(`{ziduan}`, 8) >= '{data_in.start_time}'
        and addHours(`{ziduan}`, 8) <= '{data_in.end_time}' ORDER BY `#reg_time`"""
    # when '#account_id' itself is queried, don't return the account id twice
    elif ziduan == '#account_id':
        sql = f"""select `{ziduan}`,name from {game}.`user` WHERE `{ziduan}` {tiaojian} '{data_in.condition}' ORDER BY `#reg_time` """
    elif data_in.user_arrt_type == 'int':
        sql = f"""select `#account_id`,`{ziduan}` from {game}.`user` WHERE `{ziduan}` {tiaojian} {data_in.condition} ORDER BY `#reg_time`"""
    else:
        sql = f"""select `#account_id`,`{ziduan}` from `{game}`.`user` WHERE `{ziduan}` {tiaojian} '{data}' ORDER BY `#reg_time`"""
    # run the query
    try:
        df = await ckdb.query_dataframe(sql)
    except Exception as e:
        return schemas.Msg(code=0, msg='查询参数不匹配', data=str(e))
    if df.empty:
        return schemas.Msg(code=-9, msg='无数据', data='')
    account_id = list(df['#account_id'])
    new_sql = f"""select `#account_id`,`#ip`,`#distinct_id`,rmbmoney,owner_name,lv,zhanli,channel,
    channel,svrindex,maxmapid,name,`exp`,vip,jinbi,last_account_login_time,binduid from {game}.`user` where `#account_id` in ({account_id})"""
    df1 = await ckdb.query_dataframe(new_sql)
    file_name = quote(f'下载的用户搜索数据.xlsx')
    mime = mimetypes.guess_type(file_name)[0]
    df_to_stream = DfToStream((df1, '下载的用户搜索数据'))
    with df_to_stream as d:
        export = d.to_stream()
    return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})

@router.post("/solo_user")
async def user_property_model(
        request: Request,
        game: str,
        data_in: schemas.Ck_solo_user,
        ckdb: CKDrive = Depends(get_ck_db)
):
    """Single user details"""
    if data_in.event_list == []:
        return schemas.Msg(code=-9, msg='请配置用户搜索模块事件', data=[])
    event_dict = {}
    for i in data_in.event_list:
        event_dict[i['event']] = i['event_name']
    # event_dict={'pay':'充值','create_account':'创建角色','login':'登录','ta_app_end':'离开游戏','guide':'新手引导','level_up':'玩家等级',
    #             'vip_level':'vip等级','sign':'签到','summon':'招募','ask_for_join_guild':'加入联盟','leave_guild':'离开联盟','create_guild':'创建联盟'}
    sql = f"""select `#account_id`,`#ip`,`#distinct_id`,rmbmoney,owner_name,lv,zhanli,channel,
    channel,svrindex,maxmapid,name,`exp`,vip,jinbi,last_account_login_time,binduid from {game}.`user` where `#account_id` = '{data_in.account_id}'"""
    # basic user profile
    df = await ckdb.query_dataframe(sql)
    # per-day event counts for the user
    start_times = data_in.start_time.split(' ')[0]
    end_times = data_in.end_time.split(' ')[0]
    event = list(event_dict.keys())
    sql1 = f"""select toDate(addHours(`#event_time`, `#zone_offset`)) as date,count(`#event_name`) as v from {game}.event
    where `date`>='{start_times}' and `date`<='{end_times}'
    and `#account_id`='{data_in.account_id}' and `#event_name` in ({event}) group by date ORDER by date"""
    df1 = await ckdb.query_dataframe(sql1)
    # fill every day in the interval, including days without events
    if len(df1) > 0:
        time_interval = getEveryDay(start_times, end_times)
        a = list(df1['date'])
        aa = []
        for i in a:
            aa.append(str(i))
        for i in time_interval:
            if i not in aa:
                df1.loc[len(df1.index)] = [i, 0]
        df1[['date']] = df1[['date']].astype(str)
        df1.sort_values('date', inplace=True)
        data_list = list(df1['date'])
        event_values = list(df1['v'])
    else:
        data_list = []  # getEveryDay(start_times,end_times)
        event_values = []
    # detailed event log for the user
    sql2 = f"""select * FROM {game}.event WHERE `#account_id`='{data_in.account_id}' and addHours(`#event_time`, `#zone_offset`) >='{data_in.start_time}' and
    addHours(`#event_time`, `#zone_offset`) <= '{data_in.end_time}' and `#event_name` in ({event}) order by `#event_time`"""
    df2 = await ckdb.query_dataframe(sql2)
    if len(df2) > 0:
        game_details = {}
        # split the events by day
        days = list(df2['#event_time'])
        day_set = set()
        for i in days:
            day_set.add(str(i).split(' ')[0])
        # distinct days covered
        day_list = list(day_set)
        day_list.sort()
        for day in day_list:
            game_deta = []
            for nu in range(len(df2)):
                if day in str(df2['#event_time'][nu]):
                    # event timestamp
                    game_detail = {}
                    time_s = str(df2['#event_time'][nu]).split('+')[0]
                    game_detail['time'] = time_s.split(' ')[1]
                    game_detail['event'] = event_dict[df2['#event_name'][nu]]
                    a_list = []
                    # column names of the event row
                    columns = df2.columns.values
                    for col in columns:
                        a = str(df2[col][nu])
                        if a != 'None' and a != '' and a != 'nan' and a != '[]':
                            a_list.append({'title': col, 'val': a})
                    game_detail['xaingqing'] = a_list
                    game_deta.append(game_detail)
            game_details[day] = game_deta
    else:
        game_details = {}
    # event_count = await ckdb.yesterday_event_count(game)
    # share of each event in the period
    sql3 = f"""select `#event_name` as a,count(`#event_name`) as v from {game}.event
    where addHours(`#event_time`, `#zone_offset`)>='{data_in.start_time}' and addHours(`#event_time`, `#zone_offset`)<='{data_in.end_time}'
    and `#account_id`='{data_in.account_id}' and `#event_name` in ({event}) group by `#event_name`"""
    df3 = await ckdb.query_dataframe(sql3)
    if len(df3) > 0:
        zhanbi = []
        sums = sum(list(df1['v']))
        numbers = 0
        for i in range(len(df3)):
            shuju = {}
            shuju['name'] = event_dict[df3['a'][i]]
            shuju['value'] = int(df3['v'][i])
            # if i != len(df3)-1:
            #     number1=round(int(df3['v'][i]) / sums, 2)
            #     number=round(number1*100,2)
            #     numbers+=number
            #     shuju['zhanbi'] = str(number) + '%'
            # else:
            #     shuju['zhanbi']=str(100-numbers) + '%'
            zhanbi.append(shuju)
    else:
        zhanbi = []
    res = {
        'details_data': {
            'new_columns': df.columns.tolist(),
            'new_values': df.values.tolist()},
        'event_count': {
            'date': data_list,
            'event_values': event_values
        },
        'details_user': game_details,
        'proportion': zhanbi
    }
    return schemas.Msg(code=0, msg='ok', data=res)

@router.get("/event_list")
async def event_list(
        request: Request,
        game: str,
        db: AsyncIOMotorDatabase = Depends(get_database),
        ckdb: CKDrive = Depends(get_ck_db),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Event list used by the single-user details page"""
    # fetch the configured event names
    # event_list = await ckdb.distinct(game, 'event', '#event_name')
    event_list = await crud.event_list.get_list(db, game)
    if event_list == []:
        return schemas.Msg(code=0, msg='请配置用户搜索模块事件', data=[])
    else:
        res = event_list[0]['details']
    return schemas.Msg(code=0, msg='ok', data=res)

@router.post("/add_event_list")
async def add_select_map(
        request: Request,
        game: str,
        file: bytes = File(...),
        db: AsyncIOMotorDatabase = Depends(get_database),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Add the event-selection mapping for a game"""
    dfs = pd.read_excel(file, engine='openpyxl', sheet_name=None)
    for attr_name, df in dfs.items():
        # convert the id column to string type
        if len(df) > 0:
            ColNames = df.columns.tolist()
            event = df.to_dict('records')
            details = []
            for i in event:
                details_dict = {}
                details_dict['event'] = i[ColNames[0]]
                details_dict['event_name'] = i[ColNames[1]]
                details.append(details_dict)
            data_in = schemas.Event_list(game=game, details=details)
            await crud.event_list.save(db, data_in)
    return schemas.Msg(code=0, msg='ok', data=1)