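"""Analysis endpoints.

Exposes the behavior/user analysis models over ClickHouse: raw SQL, event,
retention, funnel, distribution (scatter), path (trace) and user-property
analysis, plus Excel-export variants of several of them.
"""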
import datetime
import mimetypes
from collections import defaultdict
from urllib.parse import quote

import numpy as np
import pandas as pd
from fastapi import APIRouter, Depends, Request
from fastapi.responses import StreamingResponse
from motor.motor_asyncio import AsyncIOMotorDatabase

import crud
import schemas
from common import *  # star import: provides helpers used below, e.g. division()

from api import deps
from db import get_database
from db.ckdb import CKDrive, get_ck_db
from db.redisdb import RedisDrive, get_redis_pool

from models.behavior_analysis import BehaviorAnalysis, CombinationEvent
from models.user_analysis import UserAnalysis
from utils import DfToStream

router = APIRouter()
@router.post("/sql")
|
|
async def query_sql(
|
|
request: Request,
|
|
game: str,
|
|
data_in: schemas.Sql,
|
|
ckdb: CKDrive = Depends(get_ck_db),
|
|
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
|
) -> schemas.Msg:
|
|
"""原 sql 查询 """
|
|
sql = data_in.sql
|
|
sql = sql.replace('$game', game)
|
|
data = await ckdb.execute(sql)
|
|
return schemas.Msg(code=0, msg='ok', data=data)
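
# Illustrative example (the game name and table are invented): with
# game == 'shooter', a body of "SELECT count() FROM $game.event" is
# executed as "SELECT count() FROM shooter.event".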


@router.post("/sql_export")
async def query_sql_export(
        request: Request,
        game: str,
        data_in: schemas.Sql,
        ckdb: CKDrive = Depends(get_ck_db),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
):
    """Export the result of a raw SQL query as an Excel file."""
    file_name = quote('result.xlsx')
    mime = mimetypes.guess_type(file_name)[0]

    sql = data_in.sql.replace('$game', game)
    df = await ckdb.query_dataframe(sql)
    df_to_stream = DfToStream((df, 'result'))
    with df_to_stream as d:
        export = d.to_stream()
    return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})


@router.post("/event_model_sql")
async def event_model_sql(
        request: Request,
        game: str,
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Event-analysis model: return the generated SQL only."""
    await analysis.init(data_where=current_user.data_where)
    data = analysis.event_model_sql()
    return schemas.Msg(code=0, msg='ok', data=data)


@router.post("/event_model_export")
async def event_model_export(request: Request,
                             game: str,
                             ckdb: CKDrive = Depends(get_ck_db),
                             analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
                             current_user: schemas.UserDB = Depends(deps.get_current_user)
                             ):
    """Event-analysis model: export the data as an Excel workbook."""
    await analysis.init(data_where=current_user.data_where)
    sqls = analysis.event_model_sql()
    file_name = quote(f'{sqls[0]["report_name"]}.xlsx')
    mime = mimetypes.guess_type(file_name)[0]
    excels = []
    for item in sqls:
        # Combination events are derived from other events' results and carry
        # no standalone SQL, so they cannot be exported directly.
        if item.get('combination_event'):
            continue
        sql = item['sql']
        event_name = item['event_name']
        df = await ckdb.query_dataframe(sql)
        excels.append((df, event_name))
    df_to_stream = DfToStream(*excels)
    with df_to_stream as d:
        export = d.to_stream()
    return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})


# Legacy GET variant of event_model_export, kept for reference:
#
# @router.get("/event_model_export")
# async def event_model_export(request: Request,
#                              game: str,
#                              report_id: str,
#                              ckdb: CKDrive = Depends(get_ck_db),
#                              current_user: schemas.UserDB = Depends(deps.get_current_user)
#                              ):
#     """Event-analysis model: data export."""
#     analysis = BehaviorAnalysis(game, schemas.CkQuery(report_id=report_id), get_redis_pool())
#     await analysis.init(data_where=current_user.data_where)
#     sqls = analysis.event_model_sql()
#     file_name = f'{sqls[0]["report_name"]}.xlsx'
#     mime = mimetypes.guess_type(file_name)[0]
#     for item in sqls[:1]:
#         sql = item['sql']
#         event_name = item['event_name']
#         df = await ckdb.query_dataframe(sql)
#         file = df_to_stream(df, event_name)
#     return StreamingResponse(file, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
@router.post("/event_model")
|
|
async def event_model(
|
|
request: Request,
|
|
game: str,
|
|
data_in: schemas.CkQuery,
|
|
ckdb: CKDrive = Depends(get_ck_db),
|
|
rdb: RedisDrive = Depends(get_redis_pool),
|
|
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
|
|
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
|
) -> schemas.Msg:
|
|
""" 事件分析"""
|
|
await analysis.init(data_where=current_user.data_where)
|
|
sqls = analysis.event_model_sql()
|
|
res = []
|
|
for item in sqls:
|
|
q = {
|
|
'groups': [],
|
|
'values': [],
|
|
'sum': [],
|
|
'avg': [],
|
|
'event_name': item['event_name'],
|
|
'format': item['format'],
|
|
'last_value': 0,
|
|
'start_date': item['start_date'],
|
|
'end_date': item['end_date'],
|
|
'time_particle': item['time_particle']
|
|
}
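
        # `q` is the response skeleton for this event; the branches below fill
        # groups/values/sum/avg, and the ungrouped branch may also add
        # chain_ratio (period-over-period %) and wow (week-over-week %).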
        # Combination events are computed from the per-event series already
        # collected in `res`, not from their own SQL query.
        if item.get('combination_event'):
            combination_event = CombinationEvent(res, item.get('combination_event'), item['format'])
            values, sum_, avg = combination_event.parse()
            q['values'].append(values)
            q['sum'].append(sum_)
            q['avg'].append(avg)
            q['date_range'] = item['date_range']
            res.append(q)
            continue

        sql = item['sql']
        groupby = item['groupby']
        date_range = item['date_range']
        q['date_range'] = date_range
        df = await ckdb.query_dataframe(sql)
        df.fillna(0, inplace=True)
        if df.shape[0] == 0:
            # No rows at all: substitute an all-zero series over the date range.
            df = pd.DataFrame({'date': date_range, 'values': [0] * len(date_range)})
        if item['time_particle'] == 'total':
            # 'total' is a single aggregate over the whole window: there is no
            # time axis, so group labels (or '合计', "grand total") stand in
            # for the date range.
            q['groups'].append(groupby)
            q['values'].append(df['values'].to_list())
            q['sum'].append(round(float(df['values'].sum()), 2))
            q['avg'].append(round(float(df['values'].mean()), 2))
            for last_value in df['values'].values[::-1]:
                if last_value > 0:
                    q['last_value'] = float(last_value)
                    break
            if groupby:
                q['date_range'] = [f'{i}' for i in df.set_index(groupby).index]
            else:
                q['date_range'] = ['合计']

            res.append(q)
            continue
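
        # For the remaining granularities, pad each series with zero rows for
        # any date in date_range the query returned nothing for, so all series
        # share the same length and x-axis.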
        if groupby:
            # Grouped: one series per group value.
            for group, df_group in df.groupby(groupby):
                df_group.reset_index(drop=True, inplace=True)
                q['groups'].append(str(group))

                concat_data = []
                for i in set(date_range) - set(df_group['date']):
                    if len(groupby) > 1:
                        concat_data.append((i, *group, 0))
                    else:
                        concat_data.append((i, group, 0))
                df_group = pd.concat([df_group, pd.DataFrame(concat_data, columns=df_group.columns)])
                df_group.sort_values('date', inplace=True)
                q['values'].append(df_group['values'].to_list())
                q['sum'].append(round(float(df_group['values'].sum()), 2))
                q['avg'].append(round(float(df_group['values'].mean()), 2))
                # Note: this scans the whole frame rather than df_group;
                # q['last_value'] is a single overall scalar, not per group.
                for last_value in df['values'].values[::-1]:
                    if last_value > 0:
                        q['last_value'] = float(last_value)
                        break
        else:
            # Ungrouped: a single series over the date range.
            concat_data = []
            for i in set(date_range) - set(df['date']):
                concat_data.append((i, 0))
            df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
            df.sort_values('date', inplace=True)
            if len(df) >= 2:
                # Chain ratio: % change of the last period vs. the previous one.
                q['chain_ratio'] = division((df.iloc[-1, 1] - df.iloc[-2, 1]) * 100, df.iloc[-2, 1], 2)
            if len(df) >= 8:
                # wow: % change vs. seven periods earlier (week-over-week at
                # daily granularity).
                q['wow'] = division((df.iloc[-1, 1] - df.iloc[-8, 1]) * 100, df.iloc[-8, 1], 2) or 0
            q['values'].append(df['values'].to_list())
            for last_value in df['values'].values[::-1]:
                if last_value > 0:
                    q['last_value'] = float(last_value)
                    break
            q['sum'].append(round(float(df['values'].sum()), 2))
            q['avg'].append(round(float(df['values'].mean()), 2))

        res.append(q)

    # Format the date axis by granularity, then sort every series by its
    # total, descending.
    for item in res:
        try:
            if item['time_particle'] in ('P1D', 'P1W'):
                item['date_range'] = [d.strftime('%Y-%m-%d') for d in item['date_range']]
            elif item['time_particle'] in ('P1M',):
                item['date_range'] = [d.strftime('%Y-%m') for d in item['date_range']]
            else:
                item['date_range'] = [d.strftime('%Y-%m-%d %H:%M:%S') for d in item['date_range']]
        except AttributeError:
            # Entries may already be strings (group labels or '合计').
            pass

        sort_key = np.argsort(np.array(item['sum']))[::-1]
        if item.get('groups'):
            item['groups'] = np.array(item['groups'])[sort_key].tolist()
        item['values'] = np.array(item['values'])[sort_key].tolist()
        item['sum'] = np.array(item['sum'])[sort_key].tolist()
        item['avg'] = np.array(item['avg'])[sort_key].tolist()

    return schemas.Msg(code=0, msg='ok', data=res)
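
# Each item in `data` above has roughly this shape (illustrative; 'login' and
# all numbers are invented):
# {'groups': ['android'], 'values': [[10, 12]], 'sum': [22.0], 'avg': [11.0],
#  'event_name': 'login', 'last_value': 12.0, 'time_particle': 'P1D',
#  'date_range': ['2021-06-01', '2021-06-02'], ...}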


@router.post("/retention_model_sql")
async def retention_model_sql(
        request: Request,
        game: str,
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Retention query: return the generated SQL only."""
    await analysis.init(data_where=current_user.data_where)
    data = analysis.retention_model_sql2()
    return schemas.Msg(code=0, msg='ok', data=[data])


@router.post("/retention_model")
async def retention_model(request: Request,
                          game: str,
                          ckdb: CKDrive = Depends(get_ck_db),
                          db: AsyncIOMotorDatabase = Depends(get_database),
                          analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
                          current_user: schemas.UserDB = Depends(deps.get_current_user)
                          ) -> schemas.Msg:
    """Retention analysis."""
    await analysis.init(data_where=current_user.data_where)
    res = analysis.retention_model_sql2()
    sql = res['sql']
    df = await ckdb.query_dataframe(sql)
    if len(df) == 0:
        return schemas.Msg(code=0, msg='无数据', data=None)  # '无数据' = no data

    title = '用户数'  # '用户数' = user count
    date_range = res['date_range']
    unit_num = res['unit_num']
    filter_item_type = res['filter_item_type']
    filter_item = res['filter_item']
    df.set_index('reg_date', inplace=True)
    # Pad registration dates that returned no rows with all-zero rows.
    for i in set(date_range) - set(df.index):
        df.loc[i] = 0
    df.sort_index(inplace=True)
    days = list(range(unit_num + 1))
    summary_values = {}
    today = datetime.datetime.today().date()
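
    # One row per registration-date cohort: cnt0 is the cohort size, and the
    # SQL is assumed to expose cnt{i} (retained users) and p{i} (retention %)
    # for each later day i, read via getattr below.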
    for date, value in df.T.items():
        tmp = summary_values.setdefault(date.strftime('%Y-%m-%d'), dict())
        tmp['d0'] = int(value.cnt0)
        tmp['p'] = []
        tmp['n'] = []
        tmp['p_outflow'] = []
        tmp['n_outflow'] = []
        # Only days that have already elapsed for this cohort, capped at unit_num.
        for i in range((today - date).days + 1):
            if i > unit_num:
                break
            p = float(getattr(value, f'p{i + 1}'))
            n = int(getattr(value, f'cnt{i + 1}'))
            p_outflow = round(100 - p, 2)  # churn % is the complement of retention %
            n_outflow = value.cnt0 - n
            tmp['p'].append(p)
            tmp['n'].append(n)
            tmp['p_outflow'].append(p_outflow)
            tmp['n_outflow'].append(n_outflow)

    resp = {
        'summary_values': summary_values,
        'days': days,
        'date_range': [d.strftime('%Y-%m-%d') for d in date_range][:unit_num + 1],
        'title': title,
        'filter_item_type': filter_item_type,
        'filter_item': filter_item,
        'start_date': res['start_date'],
        'end_date': res['end_date'],
        'time_particle': res['time_particle']
    }
    return schemas.Msg(code=0, msg='ok', data=resp)
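
# Shape of one summary_values entry above (illustrative, numbers invented):
# {'d0': 1000, 'p': [35.2, ...], 'n': [352, ...],
#  'p_outflow': [64.8, ...], 'n_outflow': [648, ...]}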


@router.post("/retention_model_export")
async def retention_model_export(request: Request,
                                 game: str,
                                 ckdb: CKDrive = Depends(get_ck_db),
                                 analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
                                 current_user: schemas.UserDB = Depends(deps.get_current_user)
                                 ):
    """Retention analysis model: export the data as an Excel file."""
    await analysis.init(data_where=current_user.data_where)
    data = analysis.retention_model_sql2()
    file_name = quote('留存分析.xlsx')  # '留存分析' = retention analysis
    mime = mimetypes.guess_type(file_name)[0]

    sql = data['sql']
    df = await ckdb.query_dataframe(sql)
    df_to_stream = DfToStream((df, '留存分析'))
    with df_to_stream as d:
        export = d.to_stream()
    return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})


@router.post("/retention_model_del", deprecated=True)
async def retention_model_del(
        request: Request,
        game: str,
        ckdb: CKDrive = Depends(get_ck_db),
        db: AsyncIOMotorDatabase = Depends(get_database),
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Retention data model (deprecated)."""
    await analysis.init(data_where=current_user.data_where)
    res = analysis.retention_model_sql()
    sql = res['sql']
    date_range = res['date_range']
    event_a, event_b = res['event_name']
    unit_num = res['unit_num']
    title = await crud.event_mana.get_show_name(db, game, event_a)
    title = f'{title}用户数'
    df = await ckdb.query_dataframe(sql)
    df['date'] = df['date'].apply(lambda x: x.date())
    # Overall retention across all groups.
    summary_df = df.groupby('date')[['val_a', 'val_b', 'amount_a']].sum()
    summary_values = {}
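    # Retention of d1's event_a cohort on day d2 is |A ∩ B| / |A| * 100, where
    # A is the set of distinct users doing event_a on d1 and B the set doing
    # event_b on d2; outflow is the complement.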
    for i, d1 in enumerate(date_range):
        a = set(summary_df.loc[d1]['val_a']) if d1 in summary_df.index else set()
        if not a:
            continue
        key = d1.strftime('%Y-%m-%d')
        for j, d2 in enumerate(date_range[i:]):
            if j > unit_num:
                break
            b = set(summary_df.loc[d2]['val_b']) if d2 in summary_df.index else set()
            tmp = summary_values.setdefault(key, {})
            tmp.setdefault('d0', len(a))
            tmp.setdefault('p', []).append(division(len(a & b) * 100, len(a)))
            tmp.setdefault('n', []).append(len(a & b))
            tmp.setdefault('p_outflow', []).append(round(100 - division(len(a & b) * 100, len(a)), 2))
            tmp.setdefault('n_outflow', []).append(len(a) - len(a & b))
    # Per-group retention.
    groups = set([tuple(i) for i in df[res['groupby']].values])
    df.set_index(res['groupby'], inplace=True)
    df.sort_index(inplace=True)
    values = {}
    days = list(range((date_range[-1] - date_range[0]).days + 1))[:unit_num + 1]
    for i, d1 in enumerate(date_range):
        for g in groups:
            if len(g) == 1:
                continue
            a = set(df.loc[g]['val_a']) if g in df.index else set()
            if not a:
                continue
            key = d1.strftime("%Y-%m-%d")
            tmp_g = values.setdefault(key, {})
            for j, d2 in enumerate(date_range[i:]):
                if j > unit_num:
                    break
                b = set(df.loc[g]['val_b']) if g in df.index else set()
                tmp = tmp_g.setdefault(','.join(g[1:]), {})
                tmp.setdefault('d0', len(a))
                tmp.setdefault('p', []).append(division(len(a & b) * 100, len(a)))
                tmp.setdefault('n', []).append(len(a & b))
    data = {
        'summary_values': summary_values,
        'values': values,
        'days': days,
        'date_range': [d.strftime('%Y-%m-%d') for d in date_range][:unit_num + 1],
        'title': title,
        'start_date': res['start_date'],
        'end_date': res['end_date'],
        'time_particle': res['time_particle']
    }
    return schemas.Msg(code=0, msg='ok', data=data)


@router.post("/funnel_model_sql")
async def funnel_model_sql(
        request: Request,
        game: str,
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Funnel data model: return the generated SQL only."""
    await analysis.init(data_where=current_user.data_where)
    data = analysis.funnel_model_sql()
    return schemas.Msg(code=0, msg='ok', data=[data])


@router.post("/funnel_model")
async def funnel_model(
        request: Request,
        game: str,
        ckdb: CKDrive = Depends(get_ck_db),
        db: AsyncIOMotorDatabase = Depends(get_database),
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Funnel data model."""
    await analysis.init(data_where=current_user.data_where)
    res = analysis.funnel_model_sql()
    sql = res['sql']
    date_range = res['date_range']
    cond_level = res['cond_level']
    groupby = res['groupby']

    df = await ckdb.query_dataframe(sql)
    # Back-fill missing levels with zero rows so every (date, group) slice
    # covers all funnel steps.
    concat_data = []
    for key, tmp_df in df.groupby(['date'] + groupby):
        not_exists_level = {i for i in range(1, len(cond_level) + 1)} - set(tmp_df['level'])
        for item in not_exists_level:
            key = key if isinstance(key, tuple) else (key,)
            concat_data.append((*key, item, 0))
    df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])

    data_list = []
    date_data = {}
    if df.shape == (0, 0):
        return schemas.Msg(code=0, msg='ok', data={'list': data_list, 'level': cond_level})
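
    # The per-level frames below are assumed to hold, for each level, the
    # number of users whose furthest step was exactly that level; the loop
    # `tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()` rewrites them
    # as cumulative "reached at least level i" counts, e.g. exact counts
    # {1: 40, 2: 35, 3: 25} become {1: 100, 2: 60, 3: 25}.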
    # Whole-window overall funnel ('总体' = overall).
    tmp = {'title': '总体'}
    tmp_df = df[['level', 'values']].groupby('level').sum()
    tmp_df.sort_index(inplace=True)
    for i in tmp_df.index:
        tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()

    tmp['n'] = tmp_df['values'].to_list()
    tmp['p1'] = [100]  # conversion % relative to step 1
    for i, v in tmp_df.loc[2:, 'values'].items():
        tmp['p1'].append(round(v * 100 / tmp_df.loc[1, 'values'], 2))
    data_list.append(tmp)

    # Back-fill missing dates, then compute a per-date overall funnel.
    all_idx = {(dt, lv) for dt in date_range for lv in range(1, len(cond_level) + 1)}
    concat_data = []
    for i in all_idx - set(df.set_index(['date', 'level']).index):
        concat_data.append((*i, 0))
    summary_df = pd.concat(
        [df[['date', 'level', 'values']], pd.DataFrame(concat_data, columns=['date', 'level', 'values'])])
    for key, tmp_df in summary_df.groupby('date'):
        tmp_df = tmp_df.groupby('level').sum()
        tmp_df.sort_index(inplace=True)
        for i in tmp_df.index:
            tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()

        tmp = dict()
        tmp['n'] = tmp_df['values'].to_list()
        tmp['p1'] = [100]
        for i, v in tmp_df.loc[2:, 'values'].items():
            var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
            var = 0 if np.isnan(var) else var
            tmp['p1'].append(var)

        _ = date_data.setdefault(key.strftime('%Y-%m-%d'), {})
        _['总体'] = tmp
    if groupby:
        # Back-fill missing levels for every (date, group) slice.
        concat_data = []
        idx = set(df.set_index(['date'] + groupby).index)
        all_idx = {(*j, i) for i in range(1, len(cond_level) + 1) for j in idx}
        for i in all_idx - set(df.set_index(list(('date', *groupby, 'level'))).index):
            concat_data.append((*i, 0))

        df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])

        # Per-group funnel over the whole window.
        for key, tmp_df in df.groupby(groupby):
            tmp = {'title': key}
            tmp_df = tmp_df.groupby('level').sum()
            tmp_df.sort_index(inplace=True)
            for i in tmp_df.index:
                tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()

            tmp['n'] = tmp_df['values'].to_list()
            tmp['p1'] = [100]
            for i, v in tmp_df.loc[2:, 'values'].items():
                var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
                var = 0 if np.isnan(var) else var
                tmp['p1'].append(var)
            data_list.append(tmp)

        # Per-group funnel for each date.
        for key, tmp_df in df.groupby(['date'] + groupby):
            tmp_df = tmp_df.groupby('level').sum()
            tmp_df.sort_index(inplace=True)
            for i in tmp_df.index:
                tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()

            tmp = dict()
            tmp['n'] = tmp_df['values'].to_list()
            tmp['p1'] = [100]
            for i, v in tmp_df.loc[2:, 'values'].items():
                var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
                var = 0 if np.isnan(var) else var
                tmp['p1'].append(var)

            _ = date_data.setdefault(key[0].strftime('%Y-%m-%d'), {})
            _[key[1]] = tmp

    title = (groupby or ['总体']) + cond_level
    resp = {'list': data_list,
            'date_data': date_data,
            'title': title,
            'level': cond_level,
            'start_date': res['start_date'],
            'end_date': res['end_date'],
            'time_particle': res['time_particle']
            }
    return schemas.Msg(code=0, msg='ok', data=resp)
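
# In the funnel response, `list` holds whole-window funnels (overall first,
# then one per group) and `date_data` holds the same figures per date,
# keyed '%Y-%m-%d' and then by group title.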


@router.post("/scatter_model_sql")
async def scatter_model_sql(
        request: Request,
        game: str,
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Distribution (scatter) analysis: return the generated SQL only."""
    await analysis.init(data_where=current_user.data_where)
    data = analysis.scatter_model_sql()
    return schemas.Msg(code=0, msg='ok', data=[data])


@router.post("/scatter_model")
async def scatter_model(
        request: Request,
        game: str,
        ckdb: CKDrive = Depends(get_ck_db),
        db: AsyncIOMotorDatabase = Depends(get_database),
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Distribution (scatter) analysis model."""
    await analysis.init(data_where=current_user.data_where)
    res = analysis.scatter_model_sql()
    sql = res['sql']
    df = await ckdb.query_dataframe(sql)
    interval_type = res['interval_type']
    analysis_type = res['analysis']  # local name; keeps the injected `analysis` unshadowed
    groupby = res['groupby']
    quota_interval_arr = res['quota_interval_arr']
    if analysis_type != 'number_of_days':
        max_v = int(df['values'].max())
        min_v = int(df['values'].min())
        interval = (max_v - min_v) // 10 or 1
        resp = {'list': dict(),
                'start_date': res['start_date'],
                'end_date': res['end_date'],
                'time_particle': res['time_particle']
                }

        if not quota_interval_arr:
            # Ten equal-width, left-closed bins spanning [min_v, max_v].
            resp['label'] = [f'[{i},{i + interval})' for i in range(min_v, max_v, interval)]
            bins = list(range(min_v, max_v + interval, interval))
        else:
            # User-supplied cut points, padded with -inf/+inf at both ends.
            quota_interval_arr = [-float('inf')] + quota_interval_arr + [float('inf')]
            resp['label'] = []
            bins = [quota_interval_arr[0]]
            for i, v in enumerate(quota_interval_arr[1:]):
                resp['label'].append(f'[{quota_interval_arr[i]},{v})')
                bins.append(v)
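
        # pd.cut(..., right=False) assigns each value to its left-closed bin
        # [a, b); value_counts() then yields per-bin frequencies that line up
        # with resp['label'] once sorted by bin.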
        # Overall histogram per date ('总体' = overall).
        for key, tmp_df in df.groupby('date'):
            bins_s = pd.cut(tmp_df['values'], bins=bins,
                            right=False).value_counts()
            bins_s.sort_index(inplace=True)
            total = int(bins_s.sum())
            resp['list'][key.strftime('%Y-%m-%d')] = dict()
            resp['list'][key.strftime('%Y-%m-%d')]['总体'] = {'n': bins_s.to_list(), 'total': total,
                                                            'p': round(bins_s * 100 / total, 2).to_list(),
                                                            'title': '总体'}
        # Per-group histograms.
        if groupby:
            for key, tmp_df in df.groupby(['date', *groupby]):
                bins_s = pd.cut(tmp_df['values'], bins=bins,
                                right=False).value_counts()
                bins_s.sort_index(inplace=True)
                total = int(bins_s.sum())
                title = '.'.join(key[1:])
                date = key[0]
                resp['list'][date.strftime('%Y-%m-%d')][title] = {'n': bins_s.to_list(), 'total': total,
                                                                  'p': round(bins_s * 100 / total, 2).to_list(),
                                                                  'title': title}

        return schemas.Msg(code=0, msg='ok', data=resp)

    if interval_type == 'def' and analysis_type == 'number_of_days':
        resp = {'list': {}}
        for key, tmp_df in df.groupby('date'):
            total = round(tmp_df['values'].sum(), 2)
            resp['list'][key.strftime('%Y-%m-%d')] = {'n': total, 'total': total, 'p': 100}
        return schemas.Msg(code=0, msg='ok', data=resp)


@router.post("/trace_model_sql")
async def trace_model_sql(
        request: Request,
        game: str,
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Path (trace) analysis: return the generated SQL only."""
    await analysis.init(data_where=current_user.data_where)
    data = analysis.trace_model_sql()
    return schemas.Msg(code=0, msg='ok', data=[data])


@router.post("/trace_model")
async def trace_model(
        request: Request,
        game: str,
        ckdb: CKDrive = Depends(get_ck_db),
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Path (trace) analysis."""
    await analysis.init(data_where=current_user.data_where)
    res = analysis.trace_model_sql()
    sql = res['sql']
    df = await ckdb.query_dataframe(sql)
    chain_dict = defaultdict(dict)
    nodes = {'流失'}  # '流失' = the drop-off sink node
    for event_names, count in zip(df['event_chain'], df['values']):
        chain_len = len(event_names)
        for i, event_name in enumerate(event_names):
            if i >= 10:  # cap the rendered chain at 10 steps
                continue
            next_event = event_names[i + 1] if i < chain_len - 1 else '流失'
            # Suffix events with their step index so the same event at
            # different steps maps to distinct nodes.
            key = (f'{event_name}{i}', f'{next_event}{i + 1}')
            nodes.update(key)
            chain_dict[i][key] = chain_dict[i].setdefault(key, 0) + count
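
    # Flatten the per-step edge weights into Sankey-style links
    # ({source, target, value}), one per (step, event-pair).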
    links = []
    for _, items in chain_dict.items():
        for keys, val in items.items():
            links.append({
                "source": keys[0],
                "target": keys[1],
                "value": val
            })
    data = {
        'nodes': [{'name': item} for item in nodes],
        'links': links,
        'start_date': res['start_date'],
        'end_date': res['end_date'],
        'time_particle': res['time_particle']
    }
    return schemas.Msg(code=0, msg='ok', data=data)


@router.post("/user_property_model_sql")
async def user_property_sql(
        request: Request,
        game: str,
        analysis: UserAnalysis = Depends(UserAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """User-property analysis: return the generated SQL only."""
    await analysis.init(data_where=current_user.data_where)
    data = analysis.property_model()
    return schemas.Msg(code=0, msg='ok', data=[data])


@router.post("/user_property_model_export")
async def user_property_model_export(
        request: Request,
        game: str,
        ckdb: CKDrive = Depends(get_ck_db),
        analysis: UserAnalysis = Depends(UserAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
):
    """User-property analysis: export as an Excel file."""
    await analysis.init(data_where=current_user.data_where)
    data = analysis.property_model()
    file_name = quote('用户属性.xlsx')  # '用户属性' = user properties
    mime = mimetypes.guess_type(file_name)[0]

    sql = data['sql']
    df = await ckdb.query_dataframe(sql)
    df_to_stream = DfToStream((df, '用户属性'))
    with df_to_stream as d:
        export = d.to_stream()
    return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})


@router.post("/user_property_model")
async def user_property_model(
        request: Request,
        game: str,
        analysis: UserAnalysis = Depends(UserAnalysis),
        ckdb: CKDrive = Depends(get_ck_db),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """User-property analysis."""
    await analysis.init(data_where=current_user.data_where)
    res = analysis.property_model()
    sql = res['sql']
    quota = res['quota']
    groupby = res['groupby']
    df = await ckdb.query_dataframe(sql)
    data = {'groupby': groupby}
    title = []

    if not groupby:
        # No grouping: a single overall value ('总体' = overall).
        data['总体'] = int(df['values'][0])
        title = ['总体', quota]
    else:
        # Grouped: sum the quota per group, joining composite keys with commas.
        sum_s = df.groupby(groupby)['values'].sum()
        data = dict()
        for key, val in sum_s.items():
            if isinstance(key, tuple):
                key = ','.join([str(i) for i in key])
            else:
                key = str(key)
            data[key] = val
        title = ['.'.join(groupby), quota]

    return schemas.Msg(code=0, msg='ok', data={
        'value': data,
        'title': title
    })