# xbackend/api/api_v1/endpoints/query.py

import datetime
import mimetypes
from collections import defaultdict
from urllib.parse import quote

import numpy as np
import pandas as pd
from fastapi import APIRouter, Depends, Request
from fastapi.responses import StreamingResponse
from motor.motor_asyncio import AsyncIOMotorDatabase

import crud
import schemas
from common import *
from api import deps
from db import get_database
from db.ckdb import CKDrive, get_ck_db
from db.redisdb import RedisDrive, get_redis_pool
from models.behavior_analysis import BehaviorAnalysis
from models.user_analysis import UserAnalysis
from utils import DfToStream

router = APIRouter()


@router.post("/sql")
async def query_sql(
request: Request,
data_in: schemas.Sql,
ckdb: CKDrive = Depends(get_ck_db),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""原 sql 查询 """
data = await ckdb.execute(data_in.sql)
return schemas.Msg(code=0, msg='ok', data=data)
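

# Hedged sketch: `schemas.Msg` is the uniform response envelope used throughout
# this module. Judging from its call sites (code=0/'ok' on success, code=-1 on
# empty results), it is roughly (an assumption, not the actual schema):
#
#     from typing import Any
#     from pydantic import BaseModel
#
#     class Msg(BaseModel):
#         code: int            # 0 = ok, -1 = no data
#         msg: str
#         data: Any = None
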
@router.post("/event_model_sql")
async def event_model_sql(
request: Request,
game: str,
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
""" 事件分析模型 sql"""
await analysis.init()
data = analysis.event_model_sql()
return schemas.Msg(code=0, msg='ok', data=data)
@router.post("/event_model_export")
async def event_model_export(request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
):
""" 事件分析模型 数据导出"""
await analysis.init()
sqls = analysis.event_model_sql()
file_name = quote(f'{sqls[0]["report_name"]}.xlsx')
mime = mimetypes.guess_type(file_name)[0]
excels = []
for item in sqls:
sql = item['sql']
event_name = item['event_name']
df = await ckdb.query_dataframe(sql)
excels.append((df, event_name))
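    # Hedged sketch: `DfToStream` comes from `utils` and is not defined here.
    # Its use as a context manager below suggests it wraps an in-memory Excel
    # writer, one sheet per (DataFrame, sheet_name) pair, roughly (an
    # assumption, not the actual utils code):
    #
    #     import io
    #
    #     class DfToStream:
    #         def __init__(self, *frames):      # frames: (df, sheet_name) pairs
    #             self.frames = frames
    #         def __enter__(self):
    #             self.buffer = io.BytesIO()
    #             return self
    #         def __exit__(self, *exc):
    #             return False                  # keep the buffer open for streaming
    #         def to_stream(self):
    #             with pd.ExcelWriter(self.buffer) as writer:
    #                 for df, name in self.frames:
    #                     df.to_excel(writer, sheet_name=name, index=False)
    #             self.buffer.seek(0)
    #             return self.buffer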
df_to_stream = DfToStream(*excels)
with df_to_stream as d:
export = d.to_stream()
    return StreamingResponse(export, media_type=mime,
                             headers={'Content-Disposition': f'attachment; filename="{file_name}"'})


@router.post("/event_model")
async def event_model(
request: Request,
game: str,
data_in: schemas.CkQuery,
ckdb: CKDrive = Depends(get_ck_db),
rdb: RedisDrive = Depends(get_redis_pool),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
""" 事件分析"""
await analysis.init()
sqls = analysis.event_model_sql()
res = []
for item in sqls:
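        # One payload per event: parallel lists of group labels, value series,
        # per-series sums and means, plus last_value, the most recent non-zero
        # point in the series.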
q = {
'groups': [],
'values': [],
'sum': [],
'avg': [],
'event_name': item['event_name'],
'format': item['format'],
'last_value': 0
}
sql = item['sql']
groupby = item['groupby']
date_range = item['date_range']
q['date_range'] = date_range
df = await ckdb.query_dataframe(sql)
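        # Hedged sketch: CKDrive.query_dataframe is assumed to run the SQL on
        # ClickHouse and wrap the result set in a pandas DataFrame, roughly:
        #
        #     async def query_dataframe(self, sql):
        #         rows, cols = await self.execute(sql, with_column_types=True)
        #         return pd.DataFrame(rows, columns=[c[0] for c in cols])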
df.fillna(0, inplace=True)
if df.shape[0] == 0:
continue
        if item['time_particle'] == 'total':
q['groups'].append(groupby)
q['values'].append(df['values'].to_list())
q['sum'].append(float(df['values'].sum()))
q['avg'].append(float(df['values'].mean()))
for last_value in df['values'].values[::-1]:
if last_value > 0:
q['last_value'] = float(last_value)
break
if groupby:
q['date_range'] = [f'{i}' for i in df.set_index(groupby).index]
else:
                q['date_range'] = ['合计']  # '合计' = "total"
res.append(q)
continue
        if groupby:
            # Grouped: pad each group's missing dates with zero rows, then aggregate.
for group, df_group in df.groupby(groupby):
df_group.reset_index(drop=True, inplace=True)
q['groups'].append(str(group))
concat_data = []
for i in set(date_range) - set(df_group['date']):
if len(groupby) > 1:
concat_data.append((i, *group, 0))
else:
concat_data.append((i, group, 0))
df_group = pd.concat([df_group, pd.DataFrame(concat_data, columns=df_group.columns)])
df_group.sort_values('date', inplace=True)
q['values'].append(df_group['values'].to_list())
q['sum'].append(float(df_group['values'].sum()))
q['avg'].append(float(df_group['values'].mean()))
                for last_value in df_group['values'].values[::-1]:
if last_value > 0:
q['last_value'] = float(last_value)
break
        else:
            # Ungrouped: pad missing dates, then compute period-over-period ratios.
concat_data = []
for i in set(date_range) - set(df['date']):
concat_data.append((i, 0))
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
df.sort_values('date', inplace=True)
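            # Hedged sketch: `division` arrives via `from common import *`; its
            # call sites here suggest a zero-safe rounded quotient, roughly (an
            # assumption, not the actual common.division):
            #
            #     def division(numerator, denominator, precision=2):
            #         return round(numerator / denominator, precision) if denominator else 0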
if len(df) >= 2:
q['chain_ratio'] = division((df.iloc[-1, 1] - df.iloc[-2, 1]) * 100, df.iloc[-2, 1], 2)
if len(df) >= 8:
                q['wow'] = division((df.iloc[-1, 1] - df.iloc[-8, 1]) * 100, df.iloc[-8, 1], 2) or 0
q['values'].append(df['values'].to_list())
for last_value in df['values'].values[::-1]:
if last_value > 0:
q['last_value'] = float(last_value)
break
q['sum'].append(float(df['values'].sum()))
q['avg'].append(float(df['values'].mean()))
if item['time_particle'] in ('P1D', 'P1W'):
q['date_range'] = [d.strftime('%Y-%m-%d') for d in q['date_range']]
elif item['time_particle'] in ('P1M',):
q['date_range'] = [d.strftime('%Y-%m') for d in q['date_range']]
else:
q['date_range'] = [d.strftime('%Y-%m-%d %H:%M:%S') for d in q['date_range']]
res.append(q)
return schemas.Msg(code=0, msg='ok', data=res)
@router.post("/retention_model_sql")
async def retention_model_sql(
request: Request,
game: str,
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""留存查询 sql"""
await analysis.init()
data = analysis.retention_model_sql2()
return schemas.Msg(code=0, msg='ok', data=[data])
@router.post("/retention_model")
async def retention_model(request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
db: AsyncIOMotorDatabase = Depends(get_database),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Retention analysis."""
    await analysis.init()
res = analysis.retention_model_sql2()
sql = res['sql']
df = await ckdb.query_dataframe(sql)
if len(df) == 0:
return schemas.Msg(code=-1, msg='无数据', data=None)
    title = '用户数'  # "number of users"
date_range = res['date_range']
unit_num = res['unit_num']
df.set_index('reg_date', inplace=True)
for i in set(date_range) - set(df.index):
df.loc[i] = 0
df.sort_index(inplace=True)
    days = list(range(unit_num + 1))
summary_values = {}
today = datetime.datetime.today().date()
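    # Each row of df is one registration date with columns cnt0 (day-0 users),
    # cnt1..cntN (users retained on day i) and p1..pN (retention %), as built by
    # retention_model_sql2(); outflow is the complement: p_outflow = 100 - p_i,
    # n_outflow = cnt0 - cnt_i.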
for date, value in df.T.items():
tmp = summary_values.setdefault(date.strftime('%Y-%m-%d'), dict())
tmp['d0'] = int(value.cnt0)
tmp['p'] = []
tmp['n'] = []
tmp['p_outflow'] = []
tmp['n_outflow'] = []
for i in range((today - date).days + 1):
if i > unit_num:
break
p = float(getattr(value, f'p{i + 1}'))
n = int(getattr(value, f'cnt{i + 1}'))
p_outflow = round(100 - p, 2)
n_outflow = value.cnt0 - n
tmp['p'].append(p)
tmp['n'].append(n)
tmp['p_outflow'].append(p_outflow)
tmp['n_outflow'].append(n_outflow)
resp = {
'summary_values': summary_values,
# 'values': values,
'days': days,
'date_range': [d.strftime('%Y-%m-%d') for d in date_range][:unit_num + 1],
'title': title
}
return schemas.Msg(code=0, msg='ok', data=resp)
@router.post("/retention_model_del", deprecated=True)
async def retention_model(
request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
db: AsyncIOMotorDatabase = Depends(get_database),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""留存数据模型"""
await analysis.init()
res = analysis.retention_model_sql()
sql = res['sql']
date_range = res['date_range']
event_a, event_b = res['event_name']
unit_num = res['unit_num']
title = await crud.event_mana.get_show_name(db, game, event_a)
title = f'{title}用户数'
df = await ckdb.query_dataframe(sql)
concat_data = []
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
df['date'] = df['date'].apply(lambda x: x.date())
    # Overall (ungrouped) retention
summary_df = df.groupby('date')[['val_a', 'val_b', 'amount_a']].sum()
summary_values = {}
for i, d1 in enumerate(date_range):
a = set(summary_df.loc[d1]['val_a']) if d1 in summary_df.index else set()
if not a:
continue
key = d1.strftime('%Y-%m-%d')
for j, d2 in enumerate(date_range[i:]):
if j > unit_num:
break
b = set(summary_df.loc[d2]['val_b']) if d2 in summary_df.index else set()
tmp = summary_values.setdefault(key, {})
tmp.setdefault('d0', len(a))
tmp.setdefault('p', []).append(division(len(a & b) * 100, len(a)))
tmp.setdefault('n', []).append(len(a & b))
tmp.setdefault('p_outflow', []).append(round(100 - division(len(a & b) * 100, len(a)), 2))
tmp.setdefault('n_outflow', []).append(len(a) - len(a & b))
groups = set([tuple(i) for i in df[res['groupby']].values])
df.set_index(res['groupby'], inplace=True)
df.sort_index(inplace=True)
values = {}
    days = list(range((date_range[-1] - date_range[0]).days + 1))[:unit_num + 1]
for i, d1 in enumerate(date_range):
for g in groups:
if len(g) == 1:
continue
a = set(df.loc[g]['val_a']) if g in df.index else set()
if not a:
continue
key = d1.strftime("%Y-%m-%d")
tmp_g = values.setdefault(key, {})
for j, d2 in enumerate(date_range[i:]):
if j > unit_num:
break
b = set(df.loc[g]['val_b']) if g in df.index else set()
tmp = tmp_g.setdefault(','.join(g[1:]), {})
tmp.setdefault('d0', len(a))
tmp.setdefault('p', []).append(division(len(a & b) * 100, len(a)))
tmp.setdefault('n', []).append(len(a & b))
data = {
'summary_values': summary_values,
'values': values,
'days': days,
'date_range': [d.strftime('%Y-%m-%d') for d in date_range][:unit_num + 1],
'title': title
}
return schemas.Msg(code=0, msg='ok', data=data)
@router.post("/funnel_model_sql")
async def funnel_model_sql(
request: Request,
game: str,
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""漏斗数据模型 sql"""
await analysis.init()
data = analysis.funnel_model_sql()
return schemas.Msg(code=0, msg='ok', data=[data])
@router.post("/funnel_model")
async def funnel_model(
request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
db: AsyncIOMotorDatabase = Depends(get_database),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""漏斗数据模型"""
await analysis.init()
res = analysis.funnel_model_sql()
sql = res['sql']
date_range = res['date_range']
cond_level = res['cond_level']
groupby = res['groupby']
df = await ckdb.query_dataframe(sql)
    # Pad missing funnel levels with zero rows
concat_data = []
for key, tmp_df in df.groupby(['date'] + groupby):
not_exists_level = {i for i in range(1, len(cond_level) + 1)} - set(tmp_df['level'])
for item in not_exists_level:
key = key if isinstance(key, tuple) else (key,)
concat_data.append((*key, item, 0))
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
data_list = []
date_data = {}
    if df.empty:
return schemas.Msg(code=0, msg='ok', data={'list': data_list, 'level': cond_level})
tmp = {'title': '总体'}
tmp_df = df[['level', 'values']].groupby('level').sum()
tmp_df.sort_index(inplace=True)
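    # The ascending in-place pass below turns per-level counts into cumulative
    # "reached at least this level" counts: when row i is overwritten, the
    # loc[i:] suffix it sums still holds the original values. A vectorized
    # equivalent would be tmp_df['values'] = tmp_df['values'][::-1].cumsum()[::-1].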
for i in tmp_df.index:
tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
tmp['n'] = tmp_df['values'].to_list()
    tmp['p1'] = [100]
    for i, v in tmp_df.loc[2:, 'values'].items():
        var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
        tmp['p1'].append(0 if np.isnan(var) else var)
data_list.append(tmp)
    # Pad missing dates
all_idx = {(dt, lv) for dt in date_range for lv in range(1, len(cond_level) + 1)}
concat_data = []
for i in all_idx - set(df.set_index(['date', 'level']).index):
concat_data.append((*i, 0))
summary_df = pd.concat(
[df[['date', 'level', 'values']], pd.DataFrame(concat_data, columns=['date', 'level', 'values'])])
for key, tmp_df in summary_df.groupby('date'):
tmp_df = tmp_df.groupby('level').sum()
tmp_df.sort_index(inplace=True)
for i in tmp_df.index:
tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
        tmp = dict()
        tmp['n'] = tmp_df['values'].to_list()
        tmp['p1'] = [100]
        for i, v in tmp_df.loc[2:, 'values'].items():
            var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
            var = 0 if np.isnan(var) else var
            tmp['p1'].append(var)
        _ = date_data.setdefault(key.strftime('%Y-%m-%d'), {})
        _['总体'] = tmp
    if groupby:
        # Pad missing (date, group, level) combinations with zero rows
concat_data = []
idx = set(df.set_index(['date'] + groupby).index)
all_idx = {(*j, i) for i in range(1, len(cond_level) + 1) for j in idx}
for i in all_idx - set(df.set_index(list(('date', *groupby, 'level'))).index):
concat_data.append((*i, 0))
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
        for key, tmp_df in df.groupby(groupby):
tmp = {'title': key}
tmp_df = tmp_df.groupby('level').sum()
tmp_df.sort_index(inplace=True)
for i in tmp_df.index:
tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
tmp['n'] = tmp_df['values'].to_list()
            tmp['p1'] = [100]
            for i, v in tmp_df.loc[2:, 'values'].items():
                var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
                var = 0 if np.isnan(var) else var
                tmp['p1'].append(var)
            data_list.append(tmp)
for key, tmp_df in df.groupby(['date'] + groupby):
tmp_df = tmp_df.groupby('level').sum()
tmp_df.sort_index(inplace=True)
for i in tmp_df.index:
tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
            tmp = dict()
            tmp['n'] = tmp_df['values'].to_list()
            tmp['p1'] = [100]
            for i, v in tmp_df.loc[2:, 'values'].items():
                var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
                var = 0 if np.isnan(var) else var
                tmp['p1'].append(var)
_ = date_data.setdefault(key[0].strftime('%Y-%m-%d'), {})
            _['.'.join(map(str, key[1:]))] = tmp
title = (groupby or ['总体']) + cond_level
    resp = {'list': data_list, 'date_data': date_data, 'title': title, 'level': cond_level}
return schemas.Msg(code=0, msg='ok', data=resp)
@router.post("/scatter_model_sql")
async def scatter_model_sql(
request: Request,
game: str,
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""分布分析 sql"""
await analysis.init()
data = analysis.scatter_model_sql()
return schemas.Msg(code=0, msg='ok', data=[data])
@router.post("/scatter_model")
async def scatter_model(
request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
db: AsyncIOMotorDatabase = Depends(get_database),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""分布分析 模型"""
await analysis.init()
res = analysis.scatter_model_sql()
sql = res['sql']
    df = await ckdb.query_dataframe(sql)
    if df.empty:
        return schemas.Msg(code=-1, msg='无数据', data=None)
    interval_type = res['interval_type']
    analysis_type = res['analysis']
    groupby = res['groupby']
    quota_interval_arr = res['quota_interval_arr']
    if analysis_type != 'number_of_days':
max_v = int(df['values'].max())
min_v = int(df['values'].min())
interval = (max_v - min_v) // 10 or 1
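        # e.g. (hypothetical numbers) min_v=0, max_v=95 gives interval=9, i.e.
        # roughly ten buckets; the `or 1` guards against a zero-width interval
        # when max_v - min_v < 10.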
resp = {'list': dict()}
if not quota_interval_arr:
resp['label'] = [f'[{i},{i + interval})' for i in range(min_v, max_v, interval)]
bins = [i for i in range(min_v, max_v + interval, interval)]
else:
quota_interval_arr = [-float('inf')] + quota_interval_arr + [float('inf')]
resp['label'] = []
bins = [quota_interval_arr[0]]
for i, v in enumerate(quota_interval_arr[1:]):
resp['label'].append(f'[{quota_interval_arr[i]},{v})')
bins.append(v)
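        # With right=False, pd.cut buckets into half-open intervals, matching
        # the labels built above: e.g. (hypothetical) bins=[0, 10, 20] puts a
        # value of 10 into [10,20), and value_counts() yields per-bucket counts.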
        # Overall (per-date) distribution
for key, tmp_df in df.groupby('date'):
bins_s = pd.cut(tmp_df['values'], bins=bins,
right=False).value_counts()
bins_s.sort_index(inplace=True)
total = int(bins_s.sum())
resp['list'][key.strftime('%Y-%m-%d')] = dict()
resp['list'][key.strftime('%Y-%m-%d')]['总体'] = {'n': bins_s.to_list(), 'total': total,
'p': round(bins_s * 100 / total, 2).to_list(),
'title': '总体'}
        # Per-group distribution
if groupby:
for key, tmp_df in df.groupby(['date', *groupby]):
bins_s = pd.cut(tmp_df['values'], bins=bins,
right=False).value_counts()
bins_s.sort_index(inplace=True)
total = int(bins_s.sum())
title = '.'.join(key[1:])
date = key[0]
resp['list'][date.strftime('%Y-%m-%d')][title] = {'n': bins_s.to_list(), 'total': total,
'p': round(bins_s * 100 / total, 2).to_list(),
'title': title
}
return schemas.Msg(code=0, msg='ok', data=resp)
    if interval_type == 'def' and analysis_type == 'number_of_days':
resp = {'list': {}}
for key, tmp_df in df.groupby('date'):
total = round(tmp_df['values'].sum(), 2)
resp['list'][key.strftime('%Y-%m-%d')] = {'n': total, 'total': total, 'p': 100}
return schemas.Msg(code=0, msg='ok', data=resp)
@router.post("/trace_model_sql")
async def trace_model_sql(
request: Request,
game: str,
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""路径分析 sql"""
await analysis.init()
data = analysis.trace_model_sql()
return schemas.Msg(code=0, msg='ok', data=[data])
@router.post("/trace_model")
async def trace_model_sql(
request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""路径分析"""
await analysis.init()
res = analysis.trace_model_sql()
sql = res['sql']
df = await ckdb.query_dataframe(sql)
chain_dict = defaultdict(dict)
    nodes = {'流失'}  # '流失' ("churn") is the terminal drop-off node
for event_names, count in zip(df['event_chain'], df['values']):
chain_len = len(event_names)
for i, event_name in enumerate(event_names):
            if i >= 10:  # cap the rendered chain at 10 steps
                continue
next_event = event_names[i + 1] if i < chain_len - 1 else '流失'
key = (f'{event_name}{i}', f'{next_event}{i + 1}')
nodes.update(key)
chain_dict[i][key] = chain_dict[i].setdefault(key, 0) + count
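    # The step index is baked into each node name, so the same event at
    # different depths becomes a distinct Sankey node: e.g. (hypothetical
    # events) ('login0', 'battle1') links step-0 'login' to step-1 'battle',
    # while ('battle1', '流失2') marks drop-off after step 1.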
links = []
for _, items in chain_dict.items():
for keys, val in items.items():
links.append({
"source": keys[0],
"target": keys[1],
"value": val
})
data = {
'nodes': [{'name': item} for item in nodes],
'links': links
}
return schemas.Msg(code=0, msg='ok', data=data)
@router.post("/user_property_model_sql")
async def user_property_sql(
request: Request,
game: str,
analysis: UserAnalysis = Depends(UserAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""用户属性sql"""
await analysis.init()
data = analysis.property_model()
return schemas.Msg(code=0, msg='ok', data=[data])
@router.post("/user_property_model")
async def user_property_model(
request: Request,
game: str,
analysis: UserAnalysis = Depends(UserAnalysis),
ckdb: CKDrive = Depends(get_ck_db),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""用户属性分析"""
await analysis.init()
res = analysis.property_model()
sql = res['sql']
quota = res['quota']
groupby = res['groupby']
df = await ckdb.query_dataframe(sql)
    data = {'groupby': groupby}
    title = []
    if not groupby:
        # No grouping: a single overall value.
        data['总体'] = int(df['values'][0])
        title = ['总体', quota]
else:
sum_s = df.groupby(groupby)['values'].sum()
data = dict()
for key, val in sum_s.items():
if isinstance(key, tuple):
key = ','.join([str(i) for i in key])
else:
key = str(key)
            data[key] = float(val)
title = ['.'.join(groupby), quota]
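    # e.g. (hypothetical) groupby == ['country', 'channel'] yields
    # data == {'US,web': 123.0, ...} with title == ['country.channel', quota].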
return schemas.Msg(code=0, msg='ok', data={
'value': data,
'title': title
})