# backend/api/api_v1/endpoints/query.py
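"""Query endpoints: raw SQL plus the event, retention, funnel, distribution (scatter),
path (trace) and user-property analysis models, backed by ClickHouse, most with an
Excel export variant."""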
import datetime
from collections import defaultdict
import mimetypes
from urllib.parse import quote
import pandas as pd
import numpy as np
from fastapi import APIRouter, Depends, Request
from motor.motor_asyncio import AsyncIOMotorDatabase
from fastapi.responses import StreamingResponse
import crud, schemas
from common import *
from api import deps
from db import get_database
from db.ckdb import get_ck_db, CKDrive
from db.redisdb import get_redis_pool, RedisDrive
from models.behavior_analysis import BehaviorAnalysis, CombinationEvent
from models.user_analysis import UserAnalysis
from utils import DfToStream
router = APIRouter()
@router.post("/sql")
async def query_sql(
request: Request,
game: str,
data_in: schemas.Sql,
ckdb: CKDrive = Depends(get_ck_db),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""原 sql 查询 """
sql = data_in.sql
sql = sql.replace('$game', game)
data = await ckdb.execute(sql)
return schemas.Msg(code=0, msg='ok', data=data)
@router.post("/sql_export")
async def query_sql_export(
request: Request,
game: str,
data_in: schemas.Sql,
ckdb: CKDrive = Depends(get_ck_db),
current_user: schemas.UserDB = Depends(deps.get_current_user)
):
"""sql 导出 """
file_name = quote('result.xlsx')
mime = mimetypes.guess_type(file_name)[0]
sql = data_in.sql
sql = sql.replace('$game', game)
df = await ckdb.query_dataframe(sql)
df_to_stream = DfToStream((df, 'result'))
with df_to_stream as d:
export = d.to_stream()
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
@router.post("/event_model_sql")
async def event_model_sql(
request: Request,
game: str,
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
""" 事件分析模型 sql"""
await analysis.init(data_where=current_user.data_where)
data = analysis.event_model_sql()
return schemas.Msg(code=0, msg='ok', data=data)
@router.post("/event_model_export")
async def event_model_export(request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
):
""" 事件分析模型 数据导出"""
await analysis.init(data_where=current_user.data_where)
sqls = analysis.event_model_sql()
file_name = quote(f'{sqls[0]["report_name"]}.xlsx')
mime = mimetypes.guess_type(file_name)[0]
excels = []
for item in sqls:
if item.get('combination_event'):
continue
sql = item['sql']
event_name = item['event_name']
df = await ckdb.query_dataframe(sql)
if 'date' in df:
df.sort_values('date', inplace=True)
try:
df['date'] = df['date'].dt.tz_localize(None)
except Exception:  # column may already be tz-naive
pass
excels.append((df, event_name))
df_to_stream = DfToStream(*excels)
with df_to_stream as d:
export = d.to_stream()
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
#
# @router.get("/event_model_export")
# async def event_model_export(request: Request,
# game: str,
# report_id: str,
# ckdb: CKDrive = Depends(get_ck_db),
# # analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
# current_user: schemas.UserDB = Depends(deps.get_current_user)
# ):
# """ 事件分析模型 数据导出"""
# analysis = BehaviorAnalysis(game, schemas.CkQuery(report_id=report_id), get_redis_pool())
# await analysis.init(data_where=current_user.data_where)
# sqls = analysis.event_model_sql()
# res = []
# file_name = f'{sqls[0]["report_name"]}.xlsx'
# mime = mimetypes.guess_type(file_name)[0]
# for item in sqls[:1]:
# sql = item['sql']
# event_name = item['event_name']
# df = await ckdb.query_dataframe(sql)
# file = df_to_stream(df, event_name)
# return StreamingResponse(file, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
#
@router.post("/event_model")
async def event_model(
request: Request,
game: str,
data_in: schemas.CkQuery,
ckdb: CKDrive = Depends(get_ck_db),
rdb: RedisDrive = Depends(get_redis_pool),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
""" 事件分析"""
await analysis.init(data_where=current_user.data_where)
sqls = analysis.event_model_sql()
res = []
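# One result block per configured event. Combination (derived) events carry no SQL of
# their own; they are evaluated from the results collected so far.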
for item in sqls:
q = {
'groups': [],
'values': [],
'sum': [],
'avg': [],
'event_name': item['event_name'],
'format': item['format'],
'last_value': 0,
'start_date': item['start_date'],
'end_date': item['end_date'],
'time_particle': item['time_particle']
}
# Handle combination (derived) events
if item.get('combination_event'):
combination_event = CombinationEvent(res, item.get('combination_event'), item['format'])
values, sum_, avg = combination_event.parse()
q['values'].append(values)
q['sum'].append(sum_)
q['avg'].append(avg)
q['date_range'] = item['date_range']
res.append(q)
continue
sql = item['sql']
groupby = item['groupby']
date_range = item['date_range']
q['date_range'] = date_range
df = await ckdb.query_dataframe(sql)
df.fillna(0, inplace=True)
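# No rows returned: fall back to an all-zero series over the requested date range.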
if df.shape[0] == 0:
df = pd.DataFrame({'date': date_range, 'values': [0] * len(date_range)})
# continue
# return schemas.Msg(code=0, msg='ok', data=[q])
if item['time_particle'] == 'total':
# for group, df_group in df.groupby(groupby):
# df_group.reset_index(drop=True, inplace=True)
q['groups'].append(groupby)
q['values'].append(df['values'].to_list())
q['sum'].append(round(float(df['values'].sum()), 2))
q['avg'].append(round(float(df['values'].mean()), 2))
for last_value in df['values'].values[::-1]:
if last_value > 0:
q['last_value'] = float(last_value)
break
if groupby and (set(groupby) & set(df)) == set(groupby):
q['date_range'] = [f'{i}' for i in df.set_index(groupby).index]
else:
q['date_range'] = ['合计']
res.append(q)
continue
if groupby and (set(groupby) & set(df)) == set(groupby):
# With grouping
for group, df_group in df.groupby(groupby):
df_group.reset_index(drop=True, inplace=True)
q['groups'].append(str(group))
concat_data = []
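# Pad dates missing for this group with zero rows so every series spans the full date range.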
for i in set(date_range) - set(df_group['date']):
if len(groupby) > 1:
concat_data.append((i, *group, 0))
else:
concat_data.append((i, group, 0))
df_group = pd.concat([df_group, pd.DataFrame(concat_data, columns=df_group.columns)])
df_group.sort_values('date', inplace=True)
q['values'].append(df_group['values'].to_list())
q['sum'].append(round(float(df_group['values'].sum()), 2))
q['avg'].append(round(float(df_group['values'].mean()), 2))
for last_value in df['values'].values[::-1]:
if last_value > 0:
q['last_value'] = float(last_value)
break
else:
# Without grouping
concat_data = []
for i in set(date_range) - set(df['date']):
concat_data.append((i, 0))
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
df.sort_values('date', inplace=True)
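# chain_ratio: percentage change vs. the previous point; wow: change vs. the point
# 7 intervals earlier (week over week for daily data).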
if len(df) >= 2:
q['chain_ratio'] = division((df.iloc[-1, 1] - df.iloc[-2, 1]) * 100, df.iloc[-2, 1], 2)
if len(df) >= 8:
q['wow'] = division((df.iloc[-1, 1] - df.iloc[-8, 1]) * 100, df.iloc[-8, 1], 2) or 0
q['values'].append(df['values'].to_list())
for last_value in df['values'].values[::-1]:
if last_value > 0:
q['last_value'] = float(last_value)
break
q['sum'].append(round(float(df['values'].sum()), 2))
q['avg'].append(round(float(df['values'].mean()), 2))
# q['eventNameDisplay']=item['event_name_display']
res.append(q)
# Format date labels and sort each event's series by total (descending)
for item in res:
try:
if item['time_particle'] in ('P1D', 'P1W'):
item['date_range'] = [d.strftime('%Y-%m-%d') for d in item['date_range']]
elif item['time_particle'] in ('P1M',):
item['date_range'] = [d.strftime('%Y-%m') for d in item['date_range']]
else:
item['date_range'] = [d.strftime('%Y-%m-%d %H:%M:%S') for d in item['date_range']]
except Exception:  # date_range entries may already be strings (e.g. the 'total' granularity)
pass
sort_key = np.argsort(np.array(item['sum']))[::-1]
if item.get('groups'):
item['groups'] = np.array(item['groups'])[sort_key].tolist()
item['values'] = np.array(item['values'])[sort_key].tolist()
item['sum'] = np.array(item['sum'])[sort_key].tolist()
item['avg'] = np.array(item['avg'])[sort_key].tolist()
return schemas.Msg(code=0, msg='ok', data=res)
@router.post("/retention_model_sql")
async def retention_model_sql(
request: Request,
game: str,
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""留存查询 sql"""
await analysis.init(data_where=current_user.data_where)
data = analysis.retention_model_sql2()
return schemas.Msg(code=0, msg='ok', data=[data])
@router.post("/retention_model")
async def retention_model(request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
db: AsyncIOMotorDatabase = Depends(get_database),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
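"""Retention analysis."""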
await analysis.init(data_where=current_user.data_where)
res = analysis.retention_model_sql2()
sql = res['sql']
df = await ckdb.query_dataframe(sql)
if len(df) == 0:
return schemas.Msg(code=0, msg='无数据', data=None)
title = '用户数'
date_range = res['date_range']
unit_num = res['unit_num']
filter_item_type = res['filter_item_type']
filter_item = res['filter_item']
df.set_index('reg_date', inplace=True)
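# Fill registration dates that returned no rows with zeros.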
for i in set(date_range) - set(df.index):
df.loc[i] = 0
df.sort_index(inplace=True)
days = [i for i in range(1, unit_num + 1)]
summary_values = {}
today = datetime.datetime.today().date()
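# For each registration date build retention and churn series (rate and count)
# for up to unit_num periods, capped at today.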
for date, value in df.T.items():
tmp = summary_values.setdefault(date.strftime('%Y-%m-%d'), dict())
tmp['d0'] = int(value.cnt0)
tmp['p'] = []
tmp['n'] = []
tmp['p_outflow'] = []
tmp['n_outflow'] = []
for i in range(1, (today - date).days + 1):
if i > unit_num:
break
p = float(getattr(value, f'p{i + 1}'))
n = int(getattr(value, f'cnt{i + 1}'))
p_outflow = round(100 - p, 2)
n_outflow = value.cnt0 - n
tmp['p'].append(p)
tmp['n'].append(n)
tmp['p_outflow'].append(p_outflow)
tmp['n_outflow'].append(n_outflow)
resp = {
'summary_values': summary_values,
# 'values': values,
'days': days,
'date_range': [d.strftime('%Y-%m-%d') for d in date_range][:unit_num + 1],
'title': title,
'filter_item_type': filter_item_type,
'filter_item': filter_item,
'start_date': res['start_date'],
'end_date': res['end_date'],
'time_particle': res['time_particle']
}
return schemas.Msg(code=0, msg='ok', data=resp)
@router.post("/retention_model_export")
async def retention_model_export(request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
):
""" 留存分析模型 数据导出"""
await analysis.init(data_where=current_user.data_where)
data = analysis.retention_model_sql2()
file_name = quote('留存分析.xlsx')
mime = mimetypes.guess_type(file_name)[0]
sql = data['sql']
df = await ckdb.query_dataframe(sql)
df_to_stream = DfToStream((df, '留存分析'))
with df_to_stream as d:
export = d.to_stream()
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
@router.post("/retention_model_del", deprecated=True)
async def retention_model_del(
request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
db: AsyncIOMotorDatabase = Depends(get_database),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""留存数据模型"""
await analysis.init(data_where=current_user.data_where)
res = analysis.retention_model_sql()
sql = res['sql']
date_range = res['date_range']
event_a, event_b = res['event_name']
unit_num = res['unit_num']
title = await crud.event_mana.get_show_name(db, game, event_a)
title = f'{title}用户数'
df = await ckdb.query_dataframe(sql)
concat_data = []
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
df['date'] = df['date'].apply(lambda x: x.date())
# Compute the overall (ungrouped) summary
summary_df = df.groupby('date')[['val_a', 'val_b', 'amount_a']].sum()
summary_values = {}
for i, d1 in enumerate(date_range):
a = set(summary_df.loc[d1]['val_a']) if d1 in summary_df.index else set()
if not a:
continue
key = d1.strftime('%Y-%m-%d')
for j, d2 in enumerate(date_range[i:]):
if j > unit_num:
break
b = set(summary_df.loc[d2]['val_b']) if d2 in summary_df.index else set()
tmp = summary_values.setdefault(key, {})
tmp.setdefault('d0', len(a))
tmp.setdefault('p', []).append(division(len(a & b) * 100, len(a)))
tmp.setdefault('n', []).append(len(a & b))
tmp.setdefault('p_outflow', []).append(round(100 - division(len(a & b) * 100, len(a)), 2))
tmp.setdefault('n_outflow', []).append(len(a) - len(a & b))
groups = set([tuple(i) for i in df[res['groupby']].values])
df.set_index(res['groupby'], inplace=True)
df.sort_index(inplace=True)
values = {}
days = [i for i in range((date_range[-1] - date_range[0]).days + 1)][:unit_num + 1]
for i, d1 in enumerate(date_range):
for g in groups:
if len(g) == 1:
continue
a = set(df.loc[g]['val_a']) if g in df.index else set()
if not a:
continue
key = d1.strftime("%Y-%m-%d")
tmp_g = values.setdefault(key, {})
for j, d2 in enumerate(date_range[i:]):
if j > unit_num:
break
b = set(df.loc[g]['val_b']) if g in df.index else set()
tmp = tmp_g.setdefault(','.join(g[1:]), {})
tmp.setdefault('d0', len(a))
tmp.setdefault('p', []).append(division(len(a & b) * 100, len(a)))
tmp.setdefault('n', []).append(len(a & b))
data = {
'summary_values': summary_values,
'values': values,
'days': days,
'date_range': [d.strftime('%Y-%m-%d') for d in date_range][:unit_num + 1],
'title': title,
'start_date': res['start_date'],
'end_date': res['end_date'],
'time_particle': res['time_particle']
}
return schemas.Msg(code=0, msg='ok', data=data)
@router.post("/funnel_model_sql")
async def funnel_model_sql(
request: Request,
game: str,
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""漏斗数据模型 sql"""
await analysis.init(data_where=current_user.data_where)
data = analysis.funnel_model_sql()
return schemas.Msg(code=0, msg='ok', data=[data])
@router.post("/funnel_model")
async def funnel_model(
request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
db: AsyncIOMotorDatabase = Depends(get_database),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""漏斗数据模型"""
await analysis.init(data_where=current_user.data_where)
res = analysis.funnel_model_sql()
sql = res['sql']
date_range = res['date_range']
cond_level = res['cond_level']
groupby = res['groupby']
df = await ckdb.query_dataframe(sql)
# Fill in missing funnel levels with zero counts
concat_data = []
for key, tmp_df in df.groupby(['date'] + groupby):
not_exists_level = {i for i in range(1, len(cond_level) + 1)} - set(tmp_df['level'])
for item in not_exists_level:
key = key if isinstance(key, tuple) else (key,)
concat_data.append((*key, item, 0))
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
# df.set_index('date',inplace=True)
data_list = []
date_data = {}
if df.empty:
return schemas.Msg(code=0, msg='ok', data={'list': data_list, 'level': cond_level})
tmp = {'title': '总体'}
tmp_df = df[['level', 'values']].groupby('level').sum()
tmp_df.sort_index(inplace=True)
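# Turn per-level counts into cumulative counts: each level becomes the sum of itself
# and all deeper levels, i.e. users who reached at least that step.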
for i in tmp_df.index:
tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
tmp['n'] = tmp_df['values'].to_list()
tmp['p1'] = [100]
# tmp['p2'] = []
for i, v in tmp_df.loc[2:, 'values'].items():
tmp['p1'].append(round(v * 100 / tmp_df.loc[1, 'values'], 2))
# tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
data_list.append(tmp)
# Fill in missing dates
all_idx = {(dt, lv) for dt in date_range for lv in range(1, len(cond_level) + 1)}
concat_data = []
for i in all_idx - set(df.set_index(['date', 'level']).index):
concat_data.append((*i, 0))
summary_df = pd.concat(
[df[['date', 'level', 'values']], pd.DataFrame(concat_data, columns=['date', 'level', 'values'])])
for key, tmp_df in summary_df.groupby('date'):
tmp_df = tmp_df.groupby('level').sum()
tmp_df.sort_index(inplace=True)
for i in tmp_df.index:
tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
tmp = dict()
tmp['n'] = tmp_df['values'].to_list()
tmp['p1'] = [100]
# tmp['p2'] = []
for i, v in tmp_df.loc[2:, 'values'].items():
var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
var = 0 if np.isnan(var) else var
tmp['p1'].append(var)
# tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
_ = date_data.setdefault(key.strftime('%Y-%m-%d'), {})
_['总体'] = tmp
if groupby:
# Fill in missing (date, group, level) combinations with zero counts
concat_data = []
idx = set(df.set_index(['date'] + groupby).index)
all_idx = {(*j, i) for i in range(1, len(cond_level) + 1) for j in idx}
for i in all_idx - set(df.set_index(list(('date', *groupby, 'level'))).index):
concat_data.append((*i, 0))
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
# df.sort_values(list((*groupby, 'level')), inplace=True, ascending=False)
for key, tmp_df in df.groupby(groupby):
tmp = {'title': key}
tmp_df = tmp_df.groupby('level').sum()
tmp_df.sort_index(inplace=True)
for i in tmp_df.index:
tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
tmp['n'] = tmp_df['values'].to_list()
tmp['p1'] = [100]
# tmp['p2'] = []
for i, v in tmp_df.loc[2:, 'values'].items():
var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
var = 0 if np.isnan(var) else var
tmp['p1'].append(var)
# tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
data_list.append(tmp)
for key, tmp_df in df.groupby(['date'] + groupby):
tmp_df = tmp_df.groupby('level').sum()
tmp_df.sort_index(inplace=True)
for i in tmp_df.index:
tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
tmp = dict()
tmp['n'] = tmp_df['values'].to_list()
tmp['p1'] = [100]
# tmp['p2'] = []
for i, v in tmp_df.loc[2:, 'values'].items():
var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
var = 0 if np.isnan(var) else var
tmp['p1'].append(var)
# tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
_ = date_data.setdefault(key[0].strftime('%Y-%m-%d'), {})
_[key[1]] = tmp
title = (groupby or ['总体']) + cond_level
resp = {'list': data_list,
'date_data': date_data,
'title': title,
'level': cond_level,
'start_date': res['start_date'],
'end_date': res['end_date'],
'time_particle': res['time_particle']
}
return schemas.Msg(code=0, msg='ok', data=resp)
@router.post("/scatter_model_sql")
async def scatter_model_sql(
request: Request,
game: str,
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""分布分析 sql"""
await analysis.init(data_where=current_user.data_where)
data = analysis.scatter_model_sql()
return schemas.Msg(code=0, msg='ok', data=[data])
@router.post("/scatter_model_export")
async def scatter_model_export(request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
):
""" 分布分析 数据导出"""
await analysis.init(data_where=current_user.data_where)
res = analysis.scatter_model_sql()
file_name = quote('分布分析.xlsx')
mime = mimetypes.guess_type(file_name)[0]
sql = res['sql']
df = await ckdb.query_dataframe(sql)
interval_type = res['interval_type']
analysis_type = res['analysis']  # avoid shadowing the injected BehaviorAnalysis dependency
groupby = res['groupby']
quota_interval_arr = res['quota_interval_arr']
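# Continuous metrics: bucket values into roughly 10 equal-width intervals over the
# observed range (width at least 1), unless explicit interval boundaries were configured.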
if analysis_type != 'number_of_days' and interval_type != 'discrete':
max_v = int(df['values'].max())
min_v = int(df['values'].min())
interval = (max_v - min_v) // 10 or 1
resp = {'list': dict(),
'start_date': res['start_date'],
'end_date': res['end_date'],
'time_particle': res['time_particle']
}
if not quota_interval_arr:
resp['label'] = [f'[{i},{i + interval})' for i in range(min_v, max_v, interval)]
bins = [i for i in range(min_v, max_v + interval, interval)]
else:
quota_interval_arr = [-float('inf')] + quota_interval_arr + [float('inf')]
resp['label'] = []
bins = [quota_interval_arr[0]]
for i, v in enumerate(quota_interval_arr[1:]):
resp['label'].append(f'[{quota_interval_arr[i]},{v})')
bins.append(v)
# Overall (ungrouped) buckets per date
for key, tmp_df in df.groupby('date'):
bins_s = pd.cut(tmp_df['values'], bins=bins,
right=False).value_counts()
bins_s.sort_index(inplace=True)
total = int(bins_s.sum())
resp['list'][key.strftime('%Y-%m-%d')] = dict()
resp['list'][key.strftime('%Y-%m-%d')]['总体'] = {'n': bins_s.to_list(), 'total': total,
'p': round(bins_s * 100 / total, 2).to_list(),
'title': '总体'}
# Grouped buckets
if groupby:
export_df = pd.DataFrame(columns=resp['label'])
for key, tmp_df in df.groupby(['date', *groupby]):
bins_s = pd.cut(tmp_df['values'], bins=bins,
right=False).value_counts()
bins_s.sort_index(inplace=True)
total = int(bins_s.sum())
title = '.'.join(key[1:])
date = key[0]
resp['list'][date.strftime('%Y-%m-%d')][title] = {'n': bins_s.to_list(), 'total': total,
'p': round(bins_s * 100 / total, 2).to_list(),
'title': title
}
export_df.loc[(date.strftime('%Y-%m-%d'), title)] = bins_s.to_list()
df_to_stream = DfToStream((export_df, '分布分析'), (df, '分布分析原始数据'), index=True)
with df_to_stream as d:
export = d.to_stream()
return StreamingResponse(export, media_type=mime,
headers={'Content-Disposition': f'filename="{file_name}"'})
# elif analysis == 'number_of_days':
else:
resp = {'list': {}, 'label': [],
'start_date': res['start_date'],
'end_date': res['end_date'],
'time_particle': res['time_particle']
}
total_dict = {}
labels = [str(i) for i in sorted(df['values'].unique())]
resp['label'] = labels
for key, tmp_df in df.groupby(['date']):
total = len(tmp_df)
dt = key.strftime('%Y-%m-%d')
labels_dict = {}
for key2, tmp_df2 in tmp_df.groupby('values'):
label = str(key2)
n = len(tmp_df2)
labels_dict[label] = n
resp['list'][dt] = {'总体': {'n': [labels_dict.get(i, 0) for i in labels], 'total': total,
'p': [round(labels_dict.get(i, 0) * 100 / total, 2) for i in labels]}}
export_df = pd.DataFrame(columns=resp['label'])
for d, v in resp['list'].items():
export_df.loc[d] = v['总体']['n']
df_to_stream = DfToStream((export_df, '分布分析'), (df, '分布分析原始数据'), index=True)
with df_to_stream as d:
export = d.to_stream()
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
@router.post("/scatter_model")
async def scatter_model(
request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
db: AsyncIOMotorDatabase = Depends(get_database),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""分布分析 模型"""
await analysis.init(data_where=current_user.data_where)
res = analysis.scatter_model_sql()
sql = res['sql']
df = await ckdb.query_dataframe(sql)
df.fillna(0, inplace=True)
df['values'] = df['values'].astype(int)
interval_type = res['interval_type']
analysis_type = res['analysis']  # avoid shadowing the injected BehaviorAnalysis dependency
groupby = res['groupby']
quota_interval_arr = res['quota_interval_arr']
# Handle the 'total' (合计) time granularity
if res['time_particle'] == 'total':
df['date'] = '合计'
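# Continuous metrics are bucketed into roughly 10 equal-width intervals (width at least 1)
# unless explicit boundaries were configured; day counts and discrete metrics are handled
# in the else branch below.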
if analysis_type != 'number_of_days' and interval_type != 'discrete':
max_v = int(df['values'].max())
min_v = int(df['values'].min())
interval = (max_v - min_v) // 10 or 1
resp = {'list': dict(),
'start_date': res['start_date'],
'end_date': res['end_date'],
'time_particle': res['time_particle']
}
if not quota_interval_arr:
resp['label'] = [f'[{i},{i + interval})' for i in range(min_v, max_v, interval)]
bins = [i for i in range(min_v, max_v + interval, interval)]
else:
quota_interval_arr = [-float('inf')] + quota_interval_arr + [float('inf')]
resp['label'] = []
bins = [quota_interval_arr[0]]
for i, v in enumerate(quota_interval_arr[1:]):
resp['label'].append(f'[{quota_interval_arr[i]},{v})')
bins.append(v)
# Overall (ungrouped) buckets per date
for key, tmp_df in df.groupby('date'):
bins_s = pd.cut(tmp_df['values'], bins=bins,
right=False).value_counts()
bins_s.sort_index(inplace=True)
total = int(bins_s.sum())
if res['time_particle'] == 'total':
resp['list']['合计'] = dict()
resp['list']['合计']['总体'] = {'n': bins_s.to_list(), 'total': total,
'p': round(bins_s * 100 / total, 2).to_list(),
'title': '总体'}
else:
resp['list'][key.strftime('%Y-%m-%d')] = dict()
resp['list'][key.strftime('%Y-%m-%d')]['总体'] = {'n': bins_s.to_list(), 'total': total,
'p': round(bins_s * 100 / total, 2).to_list(),
'title': '总体'}
# Grouped buckets
if groupby:
for key, tmp_df in df.groupby(['date', *groupby]):
bins_s = pd.cut(tmp_df['values'], bins=bins,
right=False).value_counts()
bins_s.sort_index(inplace=True)
total = int(bins_s.sum())
title = '.'.join(key[1:])
date = key[0]
resp['list'][date.strftime('%Y-%m-%d')][title] = {'n': bins_s.to_list(), 'total': total,
'p': round(bins_s * 100 / total, 2).to_list(),
'title': title
}
return schemas.Msg(code=0, msg='ok', data=resp)
# elif analysis == 'number_of_days':
else:
resp = {'list': {}, 'label': [],
'start_date': res['start_date'],
'end_date': res['end_date'],
'time_particle': res['time_particle']
}
labels = [str(i) for i in sorted(df['values'].unique())]
resp['label'] = labels
for key, tmp_df in df.groupby(['date']):
total = len(tmp_df)
if res['time_particle'] == 'total':
dt = '合计'
else:
dt = key.strftime('%Y-%m-%d')
labels_dict = {}
for key2, tmp_df2 in tmp_df.groupby('values'):
label = str(key2)
n = len(tmp_df2)
labels_dict[label] = n
resp['list'][dt] = {'总体': {'n': [labels_dict.get(i, 0) for i in labels], 'total': total,
'p': [round(labels_dict.get(i, 0) * 100 / total, 2) for i in labels]}}
return schemas.Msg(code=0, msg='ok', data=resp)
# bins_s = pd.cut(tmp_df['values'], bins=bins,
# right=False).value_counts()
# bins_s.sort_index(inplace=True)
# total = int(bins_s.sum())
# resp['list'][key.strftime('%Y-%m-%d')] = dict()
# resp['list'][key.strftime('%Y-%m-%d')]['总体'] = {'n': bins_s.to_list(), 'total': total,
# 'p': round(bins_s * 100 / total, 2).to_list(),
# 'title': '总体'}
@router.post("/trace_model_sql")
async def trace_model_sql(
request: Request,
game: str,
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""路径分析 sql"""
await analysis.init(data_where=current_user.data_where)
data = analysis.trace_model_sql()
return schemas.Msg(code=0, msg='ok', data=[data])
@router.post("/trace_model")
async def trace_model(
request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""路径分析"""
await analysis.init(data_where=current_user.data_where)
res = analysis.trace_model_sql()
sql = res['sql']
df = await ckdb.query_dataframe(sql)
chain_dict = defaultdict(dict)
nodes = {'流失'}
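# Build the path graph: every consecutive pair in an event chain becomes a source/target
# link, with the step index appended to the event name so the same event at different
# steps maps to distinct nodes. Chains terminate in a '流失' (churn) node and only the
# first 10 steps are considered.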
for event_names, count in zip(df['event_chain'], df['values']):
chain_len = len(event_names)
for i, event_name in enumerate(event_names):
if i >= 10:
continue
next_event = event_names[i + 1] if i < chain_len - 1 else '流失'
key = (f'{event_name}{i}', f'{next_event}{i + 1}')
nodes.update(key)
chain_dict[i][key] = chain_dict[i].setdefault(key, 0) + count
links = []
for _, items in chain_dict.items():
for keys, val in items.items():
links.append({
"source": keys[0],
"target": keys[1],
"value": val
})
# nodes = set()
# for item in links:
# nodes.update((
# item['source'],
# item['target'])
# )
data = {
'nodes': [{'name': item} for item in nodes],
'links': links,
'start_date': res['start_date'],
'end_date': res['end_date'],
'time_particle': res['time_particle']
}
return schemas.Msg(code=0, msg='ok', data=data)
@router.post("/user_property_model_sql")
async def user_property_sql(
request: Request,
game: str,
analysis: UserAnalysis = Depends(UserAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""用户属性sql"""
await analysis.init(data_where=current_user.data_where)
data = analysis.property_model()
return schemas.Msg(code=0, msg='ok', data=[data])
@router.post("/user_property_model_export")
async def user_property_model_export(
request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
analysis: UserAnalysis = Depends(UserAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
):
"""用户属性 导出"""
await analysis.init(data_where=current_user.data_where)
data = analysis.property_model()
file_name = quote('用户属性.xlsx')
mime = mimetypes.guess_type(file_name)[0]
sql = data['sql']
df = await ckdb.query_dataframe(sql)
df_to_stream = DfToStream((df, '用户属性'))
with df_to_stream as d:
export = d.to_stream()
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
@router.post("/user_property_model")
async def user_property_model(
request: Request,
game: str,
analysis: UserAnalysis = Depends(UserAnalysis),
ckdb: CKDrive = Depends(get_ck_db),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""用户属性分析"""
await analysis.init(data_where=current_user.data_where)
res = analysis.property_model()
sql = res['sql']
quota = res['quota']
groupby = res['groupby']
df = await ckdb.query_dataframe(sql)
if len(df) == 0:
return schemas.Msg(code=0, msg='查无数据', data=None)
# No grouping
data = {'groupby': groupby}
title = []
if not groupby:
data['总体'] = int(df['values'][0])
title = ['总体', quota]
else:
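# With grouping: sum the metric per group and key the result by a comma-joined group label.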
sum_s = df.groupby(groupby)['values'].sum()
data = dict()
for key, val in sum_s.items():
if isinstance(key, tuple):
key = ','.join([str(i) for i in key])
else:
key = str(key)
data[key] = val
title = ['.'.join(groupby), quota]
return schemas.Msg(code=0, msg='ok', data={
'value': data,
'title': title
})