backend/api/api_v1/endpoints/query.py
李伟 5c0e2bf60e 1. Fix the device LTV calculation formula
2. Fix the multi-payment ratio formula
3. Fix the exception raised when values exceed the allowed range
4. Fix missing percentages in the exported multi-payment ratio data
5. Fix the misleading prompt text shown after dashboard settings are changed
2021-12-13 13:59:08 +08:00


import datetime
from collections import defaultdict
import mimetypes
from urllib.parse import quote
import pandas as pd
import numpy as np
from fastapi import APIRouter, Depends, Request
from fastapi.encoders import jsonable_encoder
from motor.motor_asyncio import AsyncIOMotorDatabase
from fastapi.responses import StreamingResponse
import crud, schemas
from common import *
from api import deps
from db import get_database
from db.ckdb import get_ck_db, CKDrive
from db.redisdb import get_redis_pool, RedisDrive
from models.behavior_analysis import BehaviorAnalysis, CombinationEvent
from models.user_analysis import UserAnalysis
from utils import DfToStream
router = APIRouter()
@router.post("/sql")
async def query_sql(
request: Request,
game: str,
data_in: schemas.Sql,
ckdb: CKDrive = Depends(get_ck_db),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""原 sql 查询 """
sql = data_in.sql
sql = sql.replace('$game', game)
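# Illustrative only: with game="demo", a body of {"sql": "SELECT COUNT() FROM $game.event"}
# becomes "SELECT COUNT() FROM demo.event" before execution; the table name here is an
# assumption, the endpoint simply substitutes $game into whatever SQL the caller sends.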
data = await ckdb.execute(sql)
return schemas.Msg(code=0, msg='ok', data=data)
@router.post("/sql_export")
async def query_sql_export(
request: Request,
game: str,
data_in: schemas.Sql,
ckdb: CKDrive = Depends(get_ck_db),
current_user: schemas.UserDB = Depends(deps.get_current_user)
):
"""sql 导出 """
file_name = quote(f'result.xlsx')
mime = mimetypes.guess_type(file_name)[0]
sql = data_in.sql
sql = sql.replace('$game', game)
df = await ckdb.query_dataframe(sql)
if df.empty:
return schemas.Msg(code=-9, msg='无数据', data=None)
df_to_stream = DfToStream((df, 'result'))
with df_to_stream as d:
export = d.to_stream()
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
@router.post("/event_model_sql")
async def event_model_sql(
request: Request,
game: str,
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
""" 事件分析模型 sql"""
await analysis.init(data_where=current_user.data_where)
data = await analysis.event_model_sql()
return schemas.Msg(code=0, msg='ok', data=data)
@router.post("/event_model_export")
async def event_model_export(request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
):
""" 事件分析模型 数据导出"""
await analysis.init(data_where=current_user.data_where)
sqls = await analysis.event_model_sql()
file_name = quote(f'{sqls[0]["report_name"]}.xlsx')
mime = mimetypes.guess_type(file_name)[0]
excels = []
for item in sqls:
if item.get('combination_event'):
continue
sql = item['sql']
event_name = item['event_name']
df = await ckdb.query_dataframe(sql)
if df.empty:
continue
if 'date' in df:
df.sort_values('date', inplace=True)
try:
df['date'] = df['date'].dt.tz_localize(None)
except:
pass
excels.append((df, event_name))
df_to_stream = DfToStream(*excels)
with df_to_stream as d:
export = d.to_stream()
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
#
# @router.get("/event_model_export")
# async def event_model_export(request: Request,
# game: str,
# report_id: str,
# ckdb: CKDrive = Depends(get_ck_db),
# # analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
# current_user: schemas.UserDB = Depends(deps.get_current_user)
# ):
# """ 事件分析模型 数据导出"""
# analysis = BehaviorAnalysis(game, schemas.CkQuery(report_id=report_id), get_redis_pool())
# await analysis.init(data_where=current_user.data_where)
# sqls = analysis.event_model_sql()
# res = []
# file_name = f'{sqls[0]["report_name"]}.xlsx'
# mime = mimetypes.guess_type(file_name)[0]
# for item in sqls[:1]:
# sql = item['sql']
# event_name = item['event_name']
# df = await ckdb.query_dataframe(sql)
# file = df_to_stream(df, event_name)
# return StreamingResponse(file, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
#
@router.post("/event_model")
async def event_model(
request: Request,
game: str,
data_in: schemas.CkQuery,
ckdb: CKDrive = Depends(get_ck_db),
rdb: RedisDrive = Depends(get_redis_pool),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
""" 事件分析"""
await analysis.init(data_where=current_user.data_where)
sqls = await analysis.event_model_sql()
res = []
is_hide = []
for idx, item in enumerate(sqls): # iterate with the index
if item.get('is_show') is False:
is_hide.append(idx)
# event_name: the event's name, e.g. '日充总额' (daily recharge total)
# format: the value format, e.g. float
q = {
'groups': [],
'values': [],
'sum': [],
'avg': [],
'event_name': item['event_name'],
'format': item['format'],
'last_value': 0,
'start_date': item['start_date'],
'end_date': item['end_date'],
'time_particle': item['time_particle']
}
# combination events: when combination_event is present, derive the values from the results collected so far and skip the per-SQL handling below
if item.get('combination_event'):
combination_event = CombinationEvent(res, item.get('combination_event'), item['format'])
values, sum_, avg = combination_event.parse()
q['values'].append(values)
q['sum'].append(sum_)
q['avg'].append(avg)
q['date_range'] = item['date_range']
for last_value in values[::-1]:
if last_value > 0:
q['last_value'] = float(last_value)
break
res.append(q)
continue
# the SQL statement
sql = item['sql']
groupby = item['groupby']
date_range = item['date_range'] # every date that should be queried
q['date_range'] = date_range # add the query date range to q
df = await ckdb.query_dataframe(sql) # run the SQL and get a DataFrame
df.fillna(0, inplace=True) # fill missing values with 0
# if the query returned no rows, fall back to a zero value for every date
if df.shape[0] == 0:
df = pd.DataFrame({'date': date_range, 'values': [0] * len(date_range)})
# continue
# return schemas.Msg(code=0, msg='ok', data=[q])
if item['time_particle'] == 'total':
# for group, df_group in df.groupby(groupby):
# df_group.reset_index(drop=True, inplace=True)
q['groups'].append(groupby)
q['values'].append(df['values'].to_list())
q['sum'].append(round(float(df['values'].sum()), 2))
q['avg'].append(round(float(df['values'].mean()), 2))
for last_value in df['values'].values[::-1]:
if last_value > 0:
q['last_value'] = float(last_value)
break
if groupby and (set(groupby) & set(df) == set(groupby)):
q['date_range'] = [f'{i}' for i in df.set_index(groupby).index]
else:
q['date_range'] = ['合计']
res.append(q)
continue
if groupby and (set(groupby) & set(df)) == set(groupby):
# grouped
for group, df_group in df.groupby(groupby):
# reset the index in place and drop the old index column
df_group.reset_index(drop=True, inplace=True)
q['groups'].append(str(group))
concat_data = []
for i in set(date_range) - set(df_group['date']):
if len(groupby) > 1:
concat_data.append((i, *group, 0))
else:
concat_data.append((i, group, 0))
df_group = pd.concat([df_group, pd.DataFrame(concat_data, columns=df_group.columns)])
df_group.sort_values('date', inplace=True)
q['values'].append(df_group['values'].to_list())
q['sum'].append(round(float(df_group['values'].sum()), 2))
q['avg'].append(round(float(df_group['values'].mean()), 2))
for last_value in df['values'].values[::-1]:
if last_value > 0:
q['last_value'] = float(last_value)
break
else:
# no grouping
concat_data = []
for i in set(date_range) - set(df['date']):
concat_data.append((i, 0))
# concatenate the two frames vertically
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
# sort by date in place
df.sort_values('date', inplace=True)
if len(df) >= 2:
q['chain_ratio'] = division((df.iloc[-1, 1] - df.iloc[-2, 1]) * 100, df.iloc[-2, 1], 2)
if len(df) >= 8:
q['wow'] = division((df.iloc[-1, 1] - df.iloc[-8, 1]) * 100, df.iloc[-8, 1], 2) or 0
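# chain_ratio: day-over-day change of the latest point, (latest - previous) / previous * 100;
# wow: week-over-week change, comparing the latest point with the one 7 rows earlier.
# division() is assumed to guard against a zero denominator; the `or 0` coerces a falsy result to 0.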
q['values'].append(df['values'].to_list())
for last_value in df['values'].values[::-1]:
if last_value > 0:
q['last_value'] = float(last_value)
break
# sum of all values
q['sum'].append(round(float(df['values'].sum()), 2))
# mean of all values
q['avg'].append(round(float(df['values'].mean()), 2))
# q['eventNameDisplay']=item['event_name_display']
res.append(q)
# format dates and sort each result by total (sum), descending
for item in res:
try:
if item['time_particle'] in ('P1D', 'P1W'): # format as YYYY-MM-DD
item['date_range'] = [d.strftime('%Y-%m-%d') for d in item['date_range']]
elif item['time_particle'] in ('P1M',):
item['date_range'] = [d.strftime('%Y-%m') for d in item['date_range']]
else:
item['date_range'] = [d.strftime('%Y-%m-%d %H:%M:%S') for d in item['date_range']]
except:
pass
sort_key = np.argsort(np.array(item['sum']))[::-1] # indices that would sort 'sum' ascending, reversed to give descending order
if item.get('groups'):
item['groups'] = np.array(item['groups'])[sort_key].tolist()
item['values'] = np.array(item['values'])[sort_key].tolist()
item['sum'] = np.array(item['sum'])[sort_key].tolist()
item['avg'] = np.array(item['avg'])[sort_key].tolist()
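# e.g. with sums [3.0, 10.5, 7.2], np.argsort gives [0, 2, 1] and reversing yields [1, 2, 0],
# so groups / values / sum / avg are all reordered largest-total first.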
res = [item for idx, item in enumerate(res) if idx not in is_hide]
return schemas.Msg(code=0, msg='ok', data=res)
@router.post("/retention_model_sql")
async def retention_model_sql(
request: Request,
game: str,
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""留存查询 sql"""
await analysis.init(data_where=current_user.data_where)
data = await analysis.retention_model_sql2()
return schemas.Msg(code=0, msg='ok', data=[data])
@router.post("/retention_model")
async def retention_model(request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
db: AsyncIOMotorDatabase = Depends(get_database),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""Retention analysis model."""
await analysis.init(data_where=current_user.data_where)
res = await analysis.retention_model_sql2() # dict holding start/end dates, the SQL, etc.
sql = res['sql'] # the SQL statement
df = await ckdb.query_dataframe(sql)
if df.empty:
return schemas.Msg(code=-9, msg='无数据', data=None)
date_range = res['date_range'] # list of dates
unit_num = res['unit_num'] # int
retention_n = res['retention_n'] # list of ints (retention days)
filter_item_type = res['filter_item_type'] # e.g. 'all'
filter_item = res['filter_item'] # e.g. [0, 1, 3, 7, 14, 21, 30]
df.set_index('reg_date', inplace=True)
for d in set(res['date_range']) - set(df.index):
df.loc[d] = 0
df.sort_index(inplace=True)
summary_values = {'均值': {}}
max_retention_n = 1
avg = {}
avgo = {}
for date, v in df.T.items():
# fetch the per-date dict, creating an empty one if it does not exist yet
tmp = summary_values.setdefault(date, dict())
tmp['d0'] = int(v.cnt0)
tmp['p'] = []
tmp['n'] = []
tmp['p_outflow'] = []
tmp['n_outflow'] = []
for i in retention_n:
n = (pd.Timestamp.now().date() - date).days
if i > n:
continue
# max_retention_n = i if i > max_retention_n else max_retention_n
avg[i] = avg.setdefault(i, 0) + v[f'cnt{i}']
avgo[i] = avgo.setdefault(i, 0) + v[f'on{i}']
tmp['p'].append(v[f'p{i}'])
tmp['n'].append(v[f'cnt{i}'])
tmp['p_outflow'].append(v[f'op{i}'])
tmp['n_outflow'].append(v[f'on{i}'])
tmp = summary_values['均值']
retention_avg_dict = {}
for rn in retention_n:
for rt, rd in df.T.items():
if rt + datetime.timedelta(days=rn) <= pd.Timestamp.now().date():
retention_avg_dict.setdefault(rn, {'cnt0': 0, 'cntn': 0,'o_cnt0':0,'o_cntn':0})
retention_avg_dict[rn]['cnt0'] += rd['cnt0']
retention_avg_dict[rn]['cntn'] += rd[f'cnt{rn}']
retention_avg_dict[rn]['o_cnt0'] += rd['cnt0']
retention_avg_dict[rn]['o_cntn'] += rd[f'on{rn}']
tmp['p'] = []
tmp['n'] = []
tmp['p_outflow'] = []
tmp['n_outflow'] = []
tmp['d0'] = 0
for rt, rd in retention_avg_dict.items():
tmp['d0'] = int(df['cnt0'].sum())
n = round(rd['cntn'] * 100 / rd['cnt0'],2)
n = 0 if np.isnan(n) else n
tmp['p'].append(n)
tmp['n'].append(rd['cntn'])
n = round(rd['o_cntn'] * 100 / rd['cnt0'],2)
n = 0 if np.isnan(n) else n
tmp['p_outflow'].append(n)
tmp['n_outflow'].append(rd['o_cntn'])
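# Average row ('均值'): for each retention day n, only registration dates at least n days old
# contribute; p = retained / day-0 users * 100 and p_outflow = churned / day-0 users * 100,
# both computed from the summed counts rather than by averaging per-day percentages.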
title = ['日期', '用户数', '次留', *[f'{i + 1}' for i in retention_n[1:]]]
# pad retention days that have not been reached yet with '-'
retention_length = len(retention_n)
for _, items in summary_values.items():
for key in ['p', 'n', 'p_outflow', 'n_outflow']:
items[key].extend(['-'] * (retention_length - len(items[key])))
resp = {
'summary_values': summary_values,
# 'values': values,
'date_range': [d.strftime('%Y-%m-%d') for d in date_range],
'title': title,
'filter_item_type': filter_item_type,
'filter_item': filter_item,
'start_date': res['start_date'],
'end_date': res['end_date'],
'time_particle': res['time_particle']
}
return schemas.Msg(code=0, msg='ok', data=resp)
@router.post("/retention_model_export")
async def retention_model_export(request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
):
""" 留存分析模型 数据导出"""
await analysis.init(data_where=current_user.data_where)
data = await analysis.retention_model_sql2()
file_name = quote(f'留存分析.xlsx')
mime = mimetypes.guess_type(file_name)[0]
sql = data['sql']
df = await ckdb.query_dataframe(sql)
if df.empty:
return schemas.Msg(code=-9, msg='无数据', data=None)
df_to_stream = DfToStream((df, '留存分析'))
with df_to_stream as d:
export = d.to_stream()
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
@router.post("/retention_model_del", deprecated=True)
async def retention_model_del(
request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
db: AsyncIOMotorDatabase = Depends(get_database),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""留存数据模型"""
await analysis.init(data_where=current_user.data_where)
res = await analysis.retention_model_sql()
sql = res['sql']
date_range = res['date_range']
event_a, event_b = res['event_name']
unit_num = res['unit_num']
title = await crud.event_mana.get_show_name(db, game, event_a)
title = f'{title}用户数'
df = await ckdb.query_dataframe(sql)
if df.empty:
return schemas.Msg(code=-9, msg='无数据', data=None)
concat_data = []
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
df['date'] = df['date'].apply(lambda x: x.date())
# compute the overall summary
summary_df = df.groupby('date')[['val_a', 'val_b', 'amount_a']].sum()
summary_values = {}
for i, d1 in enumerate(date_range):
a = set(summary_df.loc[d1]['val_a']) if d1 in summary_df.index else set()
if not a:
continue
key = d1.strftime('%Y-%m-%d')
for j, d2 in enumerate(date_range[i:]):
if j > unit_num:
break
b = set(summary_df.loc[d2]['val_b']) if d2 in summary_df.index else set()
tmp = summary_values.setdefault(key, {})
tmp.setdefault('d0', len(a))
tmp.setdefault('p', []).append(division(len(a & b) * 100, len(a)))
tmp.setdefault('n', []).append(len(a & b))
tmp.setdefault('p_outflow', []).append(round(100 - division(len(a & b) * 100, len(a)), 2))
tmp.setdefault('n_outflow', []).append(len(a) - len(a & b))
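# a = users who did event_a on d1, b = users who did event_b on d2 (within unit_num days);
# retention p = |a ∩ b| / |a| * 100, n = |a ∩ b|, and the outflow pair is the complement.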
groups = set([tuple(i) for i in df[res['groupby']].values])
df.set_index(res['groupby'], inplace=True)
df.sort_index(inplace=True)
values = {}
days = [i for i in range((date_range[-1] - date_range[0]).days + 1)][:unit_num + 1]
for i, d1 in enumerate(date_range):
for g in groups:
if len(g) == 1:
continue
a = set(df.loc[g]['val_a']) if g in df.index else set()
if not a:
continue
key = d1.strftime("%Y-%m-%d")
tmp_g = values.setdefault(key, {})
for j, d2 in enumerate(date_range[i:]):
if j > unit_num:
break
b = set(df.loc[g]['val_b']) if g in df.index else set()
tmp = tmp_g.setdefault(','.join(g[1:]), {})
tmp.setdefault('d0', len(a))
tmp.setdefault('p', []).append(division(len(a & b) * 100, len(a)))
tmp.setdefault('n', []).append(len(a & b))
data = {
'summary_values': summary_values,
'values': values,
'days': days,
'date_range': [d.strftime('%Y-%m-%d') for d in date_range][:unit_num + 1],
'title': title,
'start_date': res['start_date'],
'end_date': res['end_date'],
'time_particle': res['time_particle']
}
return schemas.Msg(code=0, msg='ok', data=data)
@router.post("/funnel_model_sql")
async def funnel_model_sql(
request: Request,
game: str,
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""漏斗数据模型 sql"""
await analysis.init(data_where=current_user.data_where)
data = await analysis.funnel_model_sql()
return schemas.Msg(code=0, msg='ok', data=[data])
@router.post("/funnel_model")
async def funnel_model(
request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
db: AsyncIOMotorDatabase = Depends(get_database),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""漏斗数据模型"""
await analysis.init(data_where=current_user.data_where)
res = await analysis.funnel_model_sql()
sql = res['sql']
# query date range
date_range = res['date_range']
cond_level = res['cond_level']
groupby = res['groupby']
df = await ckdb.query_dataframe(sql)
if df.empty:
return schemas.Msg(code=-9, msg='无数据', data=None)
# fill in missing funnel levels
concat_data = []
for key, tmp_df in df.groupby(['date'] + groupby):
not_exists_level = {i for i in range(1, len(cond_level) + 1)} - set(tmp_df['level'])
for item in not_exists_level:
key = key if isinstance(key, tuple) else (key,)
concat_data.append((*key, item, 0))
# merge the padded rows back in
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
# df.set_index('date',inplace=True)
data_list = []
date_data = {}
if df.shape == (0, 0):
return schemas.Msg(code=0, msg='ok', data={'list': data_list, 'level': cond_level})
tmp = {'title': '总体'}
# sum of values grouped by level
tmp_df = df[['level', 'values']].groupby('level').sum()
# sort the index in place
tmp_df.sort_index(inplace=True)
for i in tmp_df.index:
tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
tmp['n'] = tmp_df['values'].to_list()
tmp['p1'] = [100]
# tmp['p2'] = []
for i, v in tmp_df.loc[2:, 'values'].items():
tmp['p1'].append(round(v * 100 / tmp_df.loc[1, 'values'], 2))
# tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
data_list.append(tmp)
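# Each level is rewritten as the cumulative count from that level downwards, so it holds the
# number of users who reached at least that step (assuming the SQL returns per-level counts),
# and p1 expresses every step as a percentage of level 1.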
# fill in missing dates
all_idx = {(dt, lv) for dt in date_range for lv in range(1, len(cond_level) + 1)}
concat_data = []
for i in all_idx - set(df.set_index(['date', 'level']).index):
concat_data.append((*i, 0))
summary_df = pd.concat(
[df[['date', 'level', 'values']], pd.DataFrame(concat_data, columns=['date', 'level', 'values'])])
for key, tmp_df in summary_df.groupby('date'):
tmp_df = tmp_df.groupby('level').sum()
tmp_df.sort_index(inplace=True)
for i in tmp_df.index:
tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
tmp = dict()
tmp['n'] = tmp_df['values'].to_list()
tmp['p1'] = [100]
# tmp['p2'] = []
for i, v in tmp_df.loc[2:, 'values'].items():
var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
var = 0 if np.isnan(var) else var
tmp['p1'].append(var)
# tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
_ = date_data.setdefault(key.strftime('%Y-%m-%d'), {})
_['总体'] = tmp
if groupby:
# fill in missing date/group/level combinations
concat_data = []
idx = set(df.set_index(['date'] + groupby).index)
all_idx = {(*j, i) for i in range(1, len(cond_level) + 1) for j in idx}
for i in all_idx - set(df.set_index(list(('date', *groupby, 'level'))).index):
concat_data.append((*i, 0))
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
# df.sort_values(list((*groupby, 'level')), inplace=True, ascending=False)
for key, tmp_df in df.groupby(groupby):
tmp = {'title': key}
tmp_df = tmp_df.groupby('level').sum()
tmp_df.sort_index(inplace=True)
for i in tmp_df.index:
tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
tmp['n'] = tmp_df['values'].to_list()
tmp['p1'] = [100]
# tmp['p2'] = []
for i, v in tmp_df.loc[2:, 'values'].items():
var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
var = 0 if np.isnan(var) else var
tmp['p1'].append(var)
# tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
data_list.append(tmp)
for key, tmp_df in df.groupby(['date'] + groupby):
tmp_df = tmp_df.groupby('level').sum()
tmp_df.sort_index(inplace=True)
for i in tmp_df.index:
tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
tmp = dict()
tmp['n'] = tmp_df['values'].to_list()
tmp['p1'] = [100]
# tmp['p2'] = []
for i, v in tmp_df.loc[2:, 'values'].items():
var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
var = 0 if np.isnan(var) else var
tmp['p1'].append(var)
# tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
_ = date_data.setdefault(key[0].strftime('%Y-%m-%d'), {})
_[key[1]] = tmp
title = (groupby or ['总体']) + cond_level
resp = {'list': data_list,
'date_data': date_data,
'title': title,
'level': cond_level,
'start_date': res['start_date'],
'end_date': res['end_date'],
'time_particle': res['time_particle']
}
return schemas.Msg(code=0, msg='ok', data=resp)
@router.post("/scatter_model_sql")
async def scatter_model_sql(
request: Request,
game: str,
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""分布分析 sql"""
await analysis.init(data_where=current_user.data_where)
data = await analysis.scatter_model_sql()
return schemas.Msg(code=0, msg='ok', data=[data])
@router.post("/scatter_model_export")
async def scatter_model_export(request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
):
""" 分布分析 数据导出"""
await analysis.init(data_where=current_user.data_where)
res = await analysis.scatter_model_sql()
file_name = quote(f'分布分析.xlsx')
mime = mimetypes.guess_type(file_name)[0]
sql = res['sql']
df = await ckdb.query_dataframe(sql)
if df.empty:
return schemas.Msg(code=-9, msg='无数据', data=None)
interval_type = res['interval_type']
analysis = res['analysis']
groupby = res['groupby']
quota_interval_arr = res['quota_interval_arr']
# handle the 'total' time granularity
if res['time_particle'] == 'total':
df['date'] = '合计'
if analysis != 'number_of_days' and interval_type != 'discrete':
max_v = int(df['values'].max())
min_v = int(df['values'].min())
interval = (max_v - min_v) // 10 or 1
resp = {'list': dict(),
'start_date': res['start_date'],
'end_date': res['end_date'],
'time_particle': res['time_particle']
}
if not quota_interval_arr:
resp['label'] = [f'[{i},{i + interval})' for i in range(min_v, max_v, interval)]
bins = [i for i in range(min_v, max_v + interval, interval)]
else:
quota_interval_arr = [-float('inf')] + quota_interval_arr + [float('inf')]
resp['label'] = []
bins = [quota_interval_arr[0]]
for i, v in enumerate(quota_interval_arr[1:]):
resp['label'].append(f'[{quota_interval_arr[i]},{v})')
bins.append(v)
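# Bins are left-closed, right-open: either ~10 automatic steps of width `interval`, or the
# user-supplied quota_interval_arr extended with ±inf; pd.cut(..., right=False) then counts
# how many values fall into each bin for every date.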
# overall (ungrouped) distribution
for key, tmp_df in df.groupby('date'):
bins_s = pd.cut(tmp_df['values'], bins=bins,
right=False).value_counts()
bins_s.sort_index(inplace=True)
total = int(bins_s.sum())
if res['time_particle'] == 'total':
resp['list']['合计'] = dict()
resp['list']['合计']['总体'] = {'n': bins_s.to_list(), 'total': total,
'p': round(bins_s * 100 / total, 2).to_list(),
'title': '总体'}
else:
resp['list'][key.strftime('%Y-%m-%d')] = dict()
resp['list'][key.strftime('%Y-%m-%d')]['总体'] = {'n': bins_s.to_list(), 'total': total,
'p': round(bins_s * 100 / total, 2).to_list(),
'title': '总体'}
# grouped distribution
if groupby:
export_df = pd.DataFrame(columns=resp['label'])
for key, tmp_df in df.groupby(['date', *groupby]):
bins_s = pd.cut(tmp_df['values'], bins=bins,
right=False).value_counts()
bins_s.sort_index(inplace=True)
total = int(bins_s.sum())
title = '.'.join(key[1:])
date = key[0]
resp['list'][date.strftime('%Y-%m-%d')][title] = {'n': bins_s.to_list(), 'total': total,
'p': round(bins_s * 100 / total, 2).to_list(),
'title': title
}
export_df.loc[(date.strftime('%Y-%m-%d'), title)] = bins_s.to_list()
df_to_stream = DfToStream((export_df, '分布分析'), (df, '分布分析原始数据'), index=True)
with df_to_stream as d:
export = d.to_stream()
return StreamingResponse(export, media_type=mime,
headers={'Content-Disposition': f'filename="{file_name}"'})
# elif analysis == 'number_of_days':
else:
resp = {'list': {}, 'label': [],
'start_date': res['start_date'],
'end_date': res['end_date'],
'time_particle': res['time_particle']
}
labels = [str(i) for i in sorted(df['values'].unique())]
resp['label'] = labels
for key, tmp_df in df.groupby(['date']):
total = len(tmp_df)
if res['time_particle'] == 'total':
dt = '合计'
else:
dt = key.strftime('%Y-%m-%d')
labels_dict = {}
for key2, tmp_df2 in tmp_df.groupby('values'):
label = str(key2)
n = len(tmp_df2)
labels_dict[label] = n
resp['list'][dt] = {'总体': {'n': [labels_dict.get(i, 0) for i in labels], 'total': total,
'p': [round(labels_dict.get(i, 0) * 100 / total, 2) for i in labels]}}
export_df = pd.DataFrame(columns=resp['label'])
for d, v in resp['list'].items():
export_df.loc[d] = v['总体']['n']
df_to_stream = DfToStream((export_df, '分布分析'), (df, '分布分析原始数据'), index=True)
with df_to_stream as d:
export = d.to_stream()
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
@router.post("/scatter_model")
async def scatter_model(
request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
db: AsyncIOMotorDatabase = Depends(get_database),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""分布分析 模型"""
await analysis.init(data_where=current_user.data_where)
res = await analysis.scatter_model_sql()
sql = res['sql']
# (commented-out example) query the daily level distribution of players registered via the 'kuaiyou3' acquisition channel
# sql_list=sql.split("GROUP BY")
# sql01 = """and xiangsu.event.owner_name='kuaiyou3'GROUP BY"""""
# new_sql=sql_list[0]+sql01+sql_list[1]
df = await ckdb.query_dataframe(sql)
if df.empty:
return schemas.Msg(code=-9, msg='无数据', data=None)
df.fillna(0, inplace=True)
# cast values to int
df['values'] = df['values'].astype(int)
interval_type = res['interval_type']
analysis = res['analysis']
groupby = res['groupby']
quota_interval_arr = res['quota_interval_arr']
# handle the 'total' time granularity
if res['time_particle'] == 'total':
df['date'] = '合计'
if analysis != 'number_of_days' and interval_type != 'discrete':
max_v = int(df['values'].max())
min_v = int(df['values'].min())
interval = (max_v - min_v) // 10 or 1
resp = {'list': dict(),
'start_date': res['start_date'],
'end_date': res['end_date'],
'time_particle': res['time_particle']
}
if not quota_interval_arr:
resp['label'] = [f'[{i},{i + interval})' for i in range(min_v, max_v, interval)]
bins = [i for i in range(min_v, max_v + interval, interval)]
else:
quota_interval_arr = [-float('inf')] + quota_interval_arr + [float('inf')]
resp['label'] = []
bins = [quota_interval_arr[0]]
for i, v in enumerate(quota_interval_arr[1:]):
resp['label'].append(f'[{quota_interval_arr[i]},{v})')
bins.append(v)
# overall (ungrouped) distribution
for key, tmp_df in df.groupby('date'):
bins_s = pd.cut(tmp_df['values'], bins=bins,
right=False).value_counts()
bins_s.sort_index(inplace=True)
total = int(bins_s.sum())
if res['time_particle'] == 'total':
resp['list']['合计'] = dict()
resp['list']['合计']['总体'] = {'n': bins_s.to_list(), 'total': total,
'p': round(bins_s * 100 / total, 2).to_list(),
'title': '总体'}
else:
resp['list'][key.strftime('%Y-%m-%d')] = dict()
resp['list'][key.strftime('%Y-%m-%d')]['总体'] = {'n': bins_s.to_list(), 'total': total,
'p': round(bins_s * 100 / total, 2).to_list(),
'title': '总体'}
# grouped distribution
if groupby:
for key, tmp_df in df.groupby(['date', *groupby]):
bins_s = pd.cut(tmp_df['values'], bins=bins,
right=False).value_counts()
bins_s.sort_index(inplace=True)
total = int(bins_s.sum())
title = '.'.join(key[1:])
date = key[0]
resp['list'][date.strftime('%Y-%m-%d')][title] = {'n': bins_s.to_list(), 'total': total,
'p': round((bins_s * 100 / total).fillna(0),
2).to_list(),
'title': title
}
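# resp['list'][date][group] holds the per-group histogram: n = counts per bin, total, and
# p = each bin as a percentage of the group's total (NaN from an empty group is filled with 0).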
return schemas.Msg(code=0, msg='ok', data=resp)
# elif analysis == 'number_of_days':
else:
resp = {'list': {}, 'label': [],
'start_date': res['start_date'],
'end_date': res['end_date'],
'time_particle': res['time_particle']
}
labels = [str(i) for i in sorted(df['values'].unique())]
resp['label'] = labels
for key, tmp_df in df.groupby(['date']):
total = len(tmp_df)
if res['time_particle'] == 'total':
dt = '合计'
else:
dt = key.strftime('%Y-%m-%d')
labels_dict = {}
for key2, tmp_df2 in tmp_df.groupby('values'):
label = str(key2)
n = len(tmp_df2)
labels_dict[label] = n
# labels_dict01 holds cumulative counts: '1' keeps the exact count, every other label i
# gets the number of users whose value is >= i (used for the multi-payment ratio)
labels_dict01 = {}
for i in labels:
if int(i) == 1:
labels_dict01["1"] = labels_dict.get("1", 0)
else:
# sum the counts of every label at or above i; matching by value avoids
# misalignment when this date is missing some of the global labels
labels_dict01[i] = sum(n for lab, n in labels_dict.items() if int(lab) >= int(i))
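# Worked example (illustrative values): with per-value counts {1: 50, 2: 30, 3: 20},
# labels_dict01 becomes {'1': 50, '2': 50, '3': 20}: '1' keeps the exact count while
# '2' sums everyone who paid 2 or more times.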
# build percentage strings for the response
list_p=[]
for i in labels:
number_int=round(labels_dict01.get(i, 0) * 100 / total, 2)
number_str=str(number_int)+'%'
list_p.append(number_str)
resp['list'][dt] = {'总体': {'n': [labels_dict01.get(i, 0) for i in labels], 'total': total,
'p': list_p}}
# resp['list'][dt] = {'总体': {'n': [labels_dict.get(i, 0) for i in labels], 'total': total,
# 'p': [round(labels_dict.get(i, 0) * 100 / total, 2) for i in labels]}}
return schemas.Msg(code=0, msg='ok', data=resp)
# bins_s = pd.cut(tmp_df['values'], bins=bins,
# right=False).value_counts()
# bins_s.sort_index(inplace=True)
# total = int(bins_s.sum())
# resp['list'][key.strftime('%Y-%m-%d')] = dict()
# resp['list'][key.strftime('%Y-%m-%d')]['总体'] = {'n': bins_s.to_list(), 'total': total,
# 'p': round(bins_s * 100 / total, 2).to_list(),
# 'title': '总体'}
@router.post("/trace_model_sql")
async def trace_model_sql(
request: Request,
game: str,
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""路径分析 sql"""
await analysis.init(data_where=current_user.data_where)
data = await analysis.trace_model_sql()
return schemas.Msg(code=0, msg='ok', data=[data])
@router.post("/trace_model")
async def trace_model(
request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""路径分析"""
await analysis.init(data_where=current_user.data_where)
res = await analysis.trace_model_sql()
sql = res['sql']
df = await ckdb.query_dataframe(sql)
if df.empty:
return schemas.Msg(code=-9, msg='无数据', data=None)
chain_dict = defaultdict(dict)
nodes = {'流失'}
for event_names, count in zip(df['event_chain'], df['values']):
chain_len = len(event_names)
for i, event_name in enumerate(event_names):
if i >= 10:
continue
next_event = event_names[i + 1] if i < chain_len - 1 else '流失'
key = (f'{event_name}{i}', f'{next_event}{i + 1}')
nodes.update(key)
chain_dict[i][key] = chain_dict[i].setdefault(key, 0) + count
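# Each event name is suffixed with its step index so the same event at different depths becomes
# a distinct sankey node; events beyond the 10th step are skipped, the final step links to the
# artificial '流失' (churn) node, and chain_dict[i] accumulates the edge weight per step.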
links = []
for _, items in chain_dict.items():
for keys, val in items.items():
links.append({
"source": keys[0],
"target": keys[1],
"value": val
})
# nodes = set()
# for item in links:
# nodes.update((
# item['source'],
# item['target'])
# )
data = {
'nodes': [{'name': item} for item in nodes],
'links': links,
'start_date': res['start_date'],
'end_date': res['end_date'],
'time_particle': res['time_particle']
}
return schemas.Msg(code=0, msg='ok', data=data)
@router.post("/user_property_model_sql")
async def user_property_sql(
request: Request,
game: str,
analysis: UserAnalysis = Depends(UserAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""用户属性sql"""
await analysis.init(data_where=current_user.data_where)
data = analysis.property_model()
return schemas.Msg(code=0, msg='ok', data=[data])
@router.post("/user_property_model_export")
async def user_property_model_export(
request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
analysis: UserAnalysis = Depends(UserAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
):
"""用户属性 导出"""
await analysis.init(data_where=current_user.data_where)
data = analysis.property_model()
file_name = quote(f'用户属性.xlsx')
mime = mimetypes.guess_type(file_name)[0]
sql = data['sql']
df = await ckdb.query_dataframe(sql)
if df.empty:
return schemas.Msg(code=-9, msg='无数据', data=None)
df_to_stream = DfToStream((df, '用户属性'))
with df_to_stream as d:
export = d.to_stream()
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
@router.post("/user_property_model")
async def user_property_model(
request: Request,
game: str,
analysis: UserAnalysis = Depends(UserAnalysis),
ckdb: CKDrive = Depends(get_ck_db),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""用户属性分析"""
await analysis.init(data_where=current_user.data_where)
res = analysis.property_model()
sql = res['sql']
quota = res['quota']
groupby = res['groupby']
df = await ckdb.query_dataframe(sql)
if df.empty:
return schemas.Msg(code=-9, msg='无数据', data=None)
# no grouping
data = {}
if not groupby:
data['总体'] = int(df['values'][0])
title = ['总体', quota]
else:
sum_s = df.groupby(groupby)['values'].sum()
for key, val in sum_s.items():
if isinstance(key, tuple):
key = ','.join([str(i) for i in key])
else:
key = str(key)
data[key] = val
title = ['.'.join(groupby), quota]
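# With grouping, each key is the comma-joined group value mapped to the summed quota; without
# grouping a single '总体' (overall) entry is returned, and title pairs the grouping with the quota.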
return schemas.Msg(code=0, msg='ok', data={
'value': data,
'title': title
})