# xbackend/api/api_v1/endpoints/query.py
from collections import defaultdict

import pandas as pd
import numpy as np
from fastapi import APIRouter, Depends, Request
from motor.motor_asyncio import AsyncIOMotorDatabase

import crud
import schemas
from common import *
from api import deps
from db import get_database
from db.ckdb import get_ck_db, CKDrive
from db.redisdb import get_redis_pool, RedisDrive
from models.behavior_analysis import BehaviorAnalysis

router = APIRouter()
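
# `division` is provided by the wildcard import from `common`; its definition
# is not visible in this file. A minimal sketch of the assumed behaviour
# (zero-safe percentage division, as used by the retention model below):
#
#     def division(dividend, divisor, precision=2):
#         return round(dividend / divisor, precision) if divisor else 0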
@router.post("/sql")
async def query_sql(
request: Request,
data_in: schemas.Sql,
ckdb: CKDrive = Depends(get_ck_db),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""原 sql 查询 """
data = await ckdb.execute(data_in.sql)
return schemas.Msg(code=0, msg='ok', data=data)
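
# A hedged usage sketch (the route prefix is assumed from the module path,
# and the example statement is illustrative):
#
#     POST /api/v1/query/sql
#     {"sql": "SELECT event_name, count() AS n FROM event GROUP BY event_name"}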
@router.post("/event_model_sql")
async def event_model_sql(
request: Request,
game: str,
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
""" 事件分析模型 sql"""
await analysis.init()
data = analysis.event_model_sql()
return schemas.Msg(code=0, msg='ok', data=data)
@router.post("/event_model")
async def event_model(
request: Request,
game: str,
data_in: schemas.CkQuery,
ckdb: CKDrive = Depends(get_ck_db),
rdb: RedisDrive = Depends(get_redis_pool),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
""" 事件分析"""
await analysis.init()
sqls = analysis.event_model_sql()
res = []
for item in sqls:
q = {
'groups': [],
'values': [],
'sum': [],
'event_name': item['event_name']
}
sql = item['sql']
groupby = item['groupby']
date_range = item['date_range']
q['date_range'] = date_range
df = await ckdb.query_dataframe(sql)
if df.shape[0] == 0:
return schemas.Msg(code=0, msg='ok', data=q)
if groupby:
# 有分组
for group, df_group in df.groupby(groupby):
df_group.reset_index(drop=True, inplace=True)
q['groups'].append(group)
concat_data = []
for i in set(date_range) - set(df_group['date']):
if len(groupby) > 1:
concat_data.append((i, *group, 0))
else:
concat_data.append((i, group, 0))
df_group = pd.concat([df_group, pd.DataFrame(concat_data, columns=df_group.columns)])
df_group.sort_values('date', inplace=True)
q['values'].append(df_group['values'].to_list())
q['sum'].append(int(df['values'].sum()))
else:
# 无分组
concat_data = []
for i in set(date_range) - set(df['date']):
concat_data.append((i, 0))
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
df.sort_values('date', inplace=True)
q['values'].append(df['values'].to_list())
q['sum'].append(int(df['values'].sum()))
q['date_range'] = [d.strftime('%Y-%m-%d %H:%M:%S') for d in q['date_range']]
res.append(q)
return schemas.Msg(code=0, msg='ok', data=res)
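
# A sketch of one entry in the event-analysis response list built above
# (event name and numbers are illustrative):
#
#     {
#         'event_name': 'login',
#         'groups': ['android', 'ios'],
#         'values': [[10, 12, 9], [7, 8, 11]],
#         'sum': [31, 26],
#         'date_range': ['2021-07-01 00:00:00', '2021-07-02 00:00:00', '2021-07-03 00:00:00']
#     }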
@router.post("/retention_model_sql")
async def retention_model_sql(
request: Request,
game: str,
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""留存查询 sql"""
await analysis.init()
data = analysis.retention_model_sql()
return schemas.Msg(code=0, msg='ok', data=[data])
@router.post("/retention_model")
async def retention_model(
request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
db: AsyncIOMotorDatabase = Depends(get_database),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""留存数据模型"""
await analysis.init()
res = analysis.retention_model_sql()
sql = res['sql']
date_range = res['date_range']
event_a, event_b = res['event_name']
unit_num = res['unit_num']
title = await crud.event_mana.get_show_name(db, game, event_a)
title = f'{title}用户数'
df = await ckdb.query_dataframe(sql)
concat_data = []
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
# 计算整体
summary_df = df.groupby(['date', 'event_name'])[['values', 'amount']].sum()
summary_values = {}
for i, d1 in enumerate(date_range):
a = set(summary_df.loc[(d1, event_a)]['values']) if (d1, event_a) in summary_df.index else set()
if not a:
continue
key = d1.strftime('%Y-%m-%d')
for j, d2 in enumerate(date_range[i:]):
if j > unit_num:
break
b = set(summary_df.loc[(d2, event_b)]['values']) if (d2, event_b) in summary_df.index else set()
tmp = summary_values.setdefault(key, {})
tmp.setdefault('d0', len(a))
tmp.setdefault('p', []).append(division(len(a & b) * 100, len(a)))
tmp.setdefault('n', []).append(len(a & b))
groups = set([tuple(i) for i in df[res['groupby'][2:]].values])
df.set_index(res['groupby'], inplace=True)
df.sort_index(inplace=True)
values = {}
days = [i for i in range((date_range[-1] - date_range[0]).days + 1)][:unit_num + 1]
for i, d1 in enumerate(date_range):
for g in groups:
if g == tuple():
continue
a = set(df.loc[(d1, event_a, *g)]['values']) if (d1, event_a, *g) in df.index else set()
if not a:
continue
key = d1.strftime("%Y-%m-%d")
tmp_g = values.setdefault(key, {})
for j, d2 in enumerate(date_range[i:]):
if j > unit_num:
break
b = set(df.loc[(d2, event_b, *g)]['values']) if (d2, event_b, *g) in df.index else set()
tmp = tmp_g.setdefault(','.join(g), {})
tmp.setdefault('d0', len(a))
tmp.setdefault('p', []).append(division(len(a & b) * 100, len(a)))
tmp.setdefault('n', []).append(len(a & b))
data = {
'summary_values': summary_values,
'values': values,
'days': days,
'date_range': [d.strftime('%Y-%m-%d') for d in date_range][:unit_num + 1],
'title': title
}
return schemas.Msg(code=0, msg='ok', data=data)
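
# A sketch of the retention payload assembled above, for a 2-day window
# (numbers illustrative; '用户数' is the display suffix appended to the title):
#
#     {
#         'summary_values': {'2021-07-01': {'d0': 100, 'p': [98.0, 35.0], 'n': [98, 35]}},
#         'values': {'2021-07-01': {'android': {'d0': 60, 'p': [96.67, 40.0], 'n': [58, 24]}}},
#         'days': [0, 1],
#         'date_range': ['2021-07-01', '2021-07-02'],
#         'title': '<event show name>用户数'
#     }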
@router.post("/funnel_model_sql")
async def funnel_model_sql(
request: Request,
game: str,
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""漏斗数据模型 sql"""
await analysis.init()
data = analysis.funnel_model_sql()
return schemas.Msg(code=0, msg='ok', data=[data])
@router.post("/funnel_model")
async def funnel_model(
request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
db: AsyncIOMotorDatabase = Depends(get_database),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""漏斗数据模型"""
await analysis.init()
res = analysis.funnel_model_sql()
sql = res['sql']
date_range = res['date_range']
cond_level = res['cond_level']
groupby = res['groupby']
df = await ckdb.query_dataframe(sql)
# 补齐level数据
concat_data = []
for key, tmp_df in df.groupby(['date'] + groupby):
not_exists_level = {i for i in range(1, len(cond_level) + 1)} - set(tmp_df['level'])
for item in not_exists_level:
key = key if isinstance(key, tuple) else (key,)
concat_data.append((*key, item, 0))
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
# df.set_index('date',inplace=True)
data_list = []
date_data = {}
if df.shape == (0, 0):
return schemas.Msg(code=0, msg='ok', data={'list': data_list, 'level': cond_level})
tmp = {'title': '总体'}
tmp_df = df[['level', 'values']].groupby('level').sum()
tmp_df.sort_index(inplace=True)
for i in tmp_df.index:
tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
tmp['n'] = tmp_df['values'].to_list()
tmp['p1'] = [100]
# tmp['p2'] = []
for i, v in tmp_df.loc[2:, 'values'].items():
tmp['p1'].append(round(v * 100 / tmp_df.loc[1, 'values'], 2))
# tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
data_list.append(tmp)
# 补齐日期
all_idx = {(dt, lv) for dt in date_range for lv in range(1, len(cond_level) + 1)}
concat_data = []
for i in all_idx - set(df.set_index(['date', 'level']).index):
concat_data.append((*i, 0))
summary_df = pd.concat(
[df[['date', 'level', 'values']], pd.DataFrame(concat_data, columns=['date', 'level', 'values'])])
for key, tmp_df in summary_df.groupby('date'):
tmp_df = tmp_df.groupby('level').sum()
tmp_df.sort_index(inplace=True)
for i in tmp_df.index:
tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
tmp = dict()
tmp['n'] = tmp_df['values'].to_list()
tmp['p1'] = [100]
# tmp['p2'] = []
for i, v in tmp_df.loc[2:, 'values'].items():
var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
var = 0 if np.isnan(var) else var
tmp['p1'].append(var)
# tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
_ = date_data.setdefault(key.strftime('%Y-%m-%d'), {})
_['总体'] = tmp
if groupby:
# 补齐数据
concat_data = []
idx = set(df.set_index(['date'] + groupby).index)
all_idx = {(*j, i) for i in range(1, len(cond_level) + 1) for j in idx}
for i in all_idx - set(df.set_index(list(('date', *groupby, 'level'))).index):
concat_data.append((*i, 0))
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
# df.sort_values(list((*groupby, 'level')), inplace=True, ascending=False)
for key, tmp_df in df.groupby(groupby):
tmp = {'title': key}
tmp_df = tmp_df.groupby('level').sum()
tmp_df.sort_index(inplace=True)
for i in tmp_df.index:
tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
tmp['n'] = tmp_df['values'].to_list()
tmp['p1'] = [100]
# tmp['p2'] = []
for i, v in tmp_df.loc[2:, 'values'].items():
var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
var = 0 if np.isnan(var) else var
tmp['p1'].append(var)
# tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
data_list.append(tmp)
for key, tmp_df in df.groupby(['date'] + groupby):
tmp_df = tmp_df.groupby('level').sum()
tmp_df.sort_index(inplace=True)
for i in tmp_df.index:
tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
tmp = dict()
tmp['n'] = tmp_df['values'].to_list()
tmp['p1'] = [100]
# tmp['p2'] = []
for i, v in tmp_df.loc[2:, 'values'].items():
var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
var = 0 if np.isnan(var) else var
tmp['p1'].append(var)
# tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
_ = date_data.setdefault(key[0].strftime('%Y-%m-%d'), {})
_[key[1]] = tmp
title = (groupby or ['总体']) + cond_level
resp = {'list': data_list, 'date_data': date_data, 'title': title, 'level': cond_level
}
return schemas.Msg(code=0, msg='ok', data=resp)
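
# A sketch of the funnel payload for a two-step funnel, ungrouped
# (step names and numbers are illustrative):
#
#     {
#         'list': [{'title': '总体', 'n': [120, 48], 'p1': [100, 40.0]}],
#         'date_data': {'2021-07-01': {'总体': {'n': [60, 21], 'p1': [100, 35.0]}}},
#         'title': ['总体', 'register', 'first_login'],
#         'level': ['register', 'first_login']
#     }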
@router.post("/scatter_model_sql")
async def scatter_model_sql(
request: Request,
game: str,
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""分布分析 sql"""
await analysis.init()
data = analysis.scatter_model_sql()
return schemas.Msg(code=0, msg='ok', data=[data])
@router.post("/scatter_model")
async def scatter_model(
request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
db: AsyncIOMotorDatabase = Depends(get_database),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""分布分析 模型"""
await analysis.init()
res = analysis.scatter_model_sql()
sql = res['sql']
df = await ckdb.query_dataframe(sql)
interval_type = res['interval_type']
analysis = res['analysis']
groupby = res['groupby']
quota_interval_arr = res['quota_interval_arr']
if analysis != 'number_of_days':
max_v = int(df['values'].max())
min_v = int(df['values'].min())
interval = (max_v - min_v) // 10 or 1
resp = {'list': dict()}
if not quota_interval_arr:
resp['label'] = [f'[{i},{i + interval})' for i in range(min_v, max_v, interval)]
bins = [i for i in range(min_v, max_v + interval, interval)]
else:
quota_interval_arr = [-float('inf')] + quota_interval_arr + [float('inf')]
resp['label'] = []
bins = [quota_interval_arr[0]]
for i, v in enumerate(quota_interval_arr[1:]):
resp['label'].append(f'[{quota_interval_arr[i]},{v})')
bins.append(v)
# 这是整体的
for key, tmp_df in df.groupby('date'):
bins_s = pd.cut(tmp_df['values'], bins=bins,
right=False).value_counts()
bins_s.sort_index(inplace=True)
total = int(bins_s.sum())
resp['list'][key.strftime('%Y-%m-%d')] = dict()
resp['list'][key.strftime('%Y-%m-%d')]['总体'] = {'n': bins_s.to_list(), 'total': total,
'p': round(bins_s * 100 / total, 2).to_list(),
'title': '总体'}
# 分组的
if groupby:
for key, tmp_df in df.groupby(['date', *groupby]):
bins_s = pd.cut(tmp_df['values'], bins=bins,
right=False).value_counts()
bins_s.sort_index(inplace=True)
total = int(bins_s.sum())
title = '.'.join(key[1:])
date = key[0]
resp['list'][date.strftime('%Y-%m-%d')][title] = {'n': bins_s.to_list(), 'total': total,
'p': round(bins_s * 100 / total, 2).to_list(),
'title': title
}
return schemas.Msg(code=0, msg='ok', data=resp)
if interval_type == 'def' and analysis == 'number_of_days':
resp = {'list': {}}
for key, tmp_df in df.groupby('date'):
total = int(tmp_df['values'].sum())
resp['list'][key.strftime('%Y-%m-%d')] = {'n': total, 'total': total, 'p': 100}
return schemas.Msg(code=0, msg='ok', data=resp)
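
# How the pd.cut binning above behaves, as a standalone sketch:
#
#     s = pd.Series([1, 3, 7, 7, 12])
#     pd.cut(s, bins=[0, 5, 10, 15], right=False).value_counts().sort_index()
#     # [0, 5)      2
#     # [5, 10)     2
#     # [10, 15)    1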
@router.post("/trace_model_sql")
async def trace_model_sql(
request: Request,
game: str,
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""路径分析 sql"""
await analysis.init()
data = analysis.trace_model_sql()
return schemas.Msg(code=0, msg='ok', data=[data])
@router.post("/trace_model")
async def trace_model_sql(
request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""路径分析"""
await analysis.init()
res = analysis.trace_model_sql()
sql = res['sql']
df = await ckdb.query_dataframe(sql)
chain_dict = defaultdict(dict)
nodes = {'流失'}
for event_names, count in zip(df['event_chain'], df['values']):
chain_len = len(event_names)
for i, event_name in enumerate(event_names):
if i >= 10:
continue
next_event = event_names[i + 1] if i < chain_len - 1 else '流失'
key = (f'{event_name}{i}', f'{next_event}{i + 1}')
nodes.update(key)
chain_dict[i][key] = chain_dict[i].setdefault(key, 0) + count
links = []
for _, items in chain_dict.items():
for keys, val in items.items():
links.append({
"source": keys[0],
"target": keys[1],
"value": val
})
# nodes = set()
# for item in links:
# nodes.update((
# item['source'],
# item['target'])
# )
data = {
'nodes': [{'name': item} for item in nodes],
'links': links
}
return schemas.Msg(code=0, msg='ok', data=data)
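
# The nodes/links shape produced above is the one commonly consumed by Sankey
# charts (e.g. the ECharts sankey series); names below are illustrative:
#
#     {
#         'nodes': [{'name': 'login0'}, {'name': 'battle1'}, {'name': '流失1'}],
#         'links': [
#             {'source': 'login0', 'target': 'battle1', 'value': 42},
#             {'source': 'login0', 'target': '流失1', 'value': 7}
#         ]
#     }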