# (removed duplicated scraper page-header residue: "178 lines / 6.4 KiB / Python")
import pandas as pd
|
|
from fastapi import APIRouter, Depends, Request
|
|
from motor.motor_asyncio import AsyncIOMotorDatabase
|
|
|
|
import crud, schemas
|
|
from common import *
|
|
|
|
from api import deps
|
|
from db import get_database
|
|
from db.ckdb import get_ck_db, CKDrive
|
|
from db.redisdb import get_redis_pool, RedisDrive
|
|
|
|
from models.behavior_analysis import BehaviorAnalysis
|
|
|
|
# Shared router for the behavior-analysis query endpoints below;
# mounted by the application elsewhere (prefix not visible here).
router = APIRouter()
@router.post("/sql")
async def query_sql(
        request: Request,
        data_in: schemas.Sql,
        ckdb: CKDrive = Depends(get_ck_db),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Execute a raw SQL statement against ClickHouse and return its result.

    The statement in ``data_in.sql`` is run verbatim.  Access is gated by
    authentication only — NOTE(review): the SQL itself is not validated or
    restricted here; confirm this endpoint is meant for trusted users only.
    """
    return schemas.Msg(code=0, msg='ok', data=await ckdb.execute(data_in.sql))
@router.post("/event_model_sql")
async def event_model_sql(
        request: Request,
        game: str,
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Build and return the event-analysis model SQL without executing it."""
    await analysis.init()
    return schemas.Msg(code=0, msg='ok', data=analysis.event_model_sql())
@router.post("/event_model")
async def event_model(
        request: Request,
        game: str,
        data_in: schemas.CkQuery,
        ckdb: CKDrive = Depends(get_ck_db),
        rdb: RedisDrive = Depends(get_redis_pool),
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Event-analysis model.

    For each event produced by ``analysis.event_model_sql()``, runs the
    generated SQL against ClickHouse and builds one series (or one series
    per group) over the requested date range, zero-padding dates that are
    missing from the query result so every series is aligned to the range.
    """
    await analysis.init()
    sqls = analysis.event_model_sql()
    res = []
    for item in sqls:
        q = {
            'groups': [],
            'values': [],
            'event_name': item['event_name'],
            'date_range': item['date_range'],
        }
        sql = item['sql']
        groupby = item['groupby']
        date_range = item['date_range']
        df = await ckdb.query_dataframe(sql)
        if df.shape[0] == 0:
            # NOTE(review): an empty result for ONE event aborts the whole
            # request and returns a bare dict instead of the usual list of
            # per-event dicts. Kept as-is for compatibility — confirm this
            # early return is intended rather than `continue`.
            return schemas.Msg(code=0, msg='ok', data=q)
        if groupby:
            # Grouped query: emit one value series per group.
            for group, df_group in df.groupby(groupby):
                df_group.reset_index(drop=True, inplace=True)
                q['groups'].append(group)
                # Zero-pad dates that the query returned no rows for.
                missing_dates = set(date_range) - set(df_group['date'])
                if len(groupby) > 1:
                    pad_rows = [(d, *group, 0) for d in missing_dates]
                else:
                    pad_rows = [(d, group, 0) for d in missing_dates]
                df_group = pd.concat([df_group, pd.DataFrame(pad_rows, columns=df_group.columns)])
                df_group.sort_values('date', inplace=True)
                q['values'].append(df_group['values'].to_list())
        else:
            # Ungrouped query: a single zero-padded series.
            pad_rows = [(d, 0) for d in set(date_range) - set(df['date'])]
            df = pd.concat([df, pd.DataFrame(pad_rows, columns=df.columns)])
            # FIX: sort chronologically by 'date' (was sort by 'values'),
            # matching the grouped branch above — sorting by the measure
            # scrambled the time-series order of the returned values.
            df.sort_values('date', inplace=True)
            q['values'].append(df['values'].to_list())
        q['date_range'] = [d.strftime('%Y-%m-%d %H:%M:%S') for d in q['date_range']]
        res.append(q)
    return schemas.Msg(code=0, msg='ok', data=res)
@router.post("/retention_model_sql")
async def retention_model_sql(
        request: Request,
        game: str,
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Build and return the retention-model SQL (wrapped in a one-item list)
    without executing it."""
    await analysis.init()
    sql_info = analysis.retention_model_sql()
    return schemas.Msg(code=0, msg='ok', data=[sql_info])
@router.post("/retention_model")
async def retention_model(
        request: Request,
        game: str,
        ckdb: CKDrive = Depends(get_ck_db),
        db: AsyncIOMotorDatabase = Depends(get_database),
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Retention-analysis model.

    For each start date ``d1`` in the range, computes the share of users who
    performed ``event_a`` on ``d1`` and ``event_b`` on each subsequent date
    ``d2`` (up to ``unit_num`` steps out) — first overall, then per group.
    """
    await analysis.init()
    res = analysis.retention_model_sql()
    sql = res['sql']
    date_range = res['date_range']
    event_a, event_b = res['event_name']
    unit_num = res['unit_num']
    title = await crud.event_mana.get_show_name(db, game, event_a)
    title = f'{title}用户数'
    df = await ckdb.query_dataframe(sql)
    # (A dead `pd.concat` with an always-empty DataFrame was removed here —
    # the padding list was never populated, so it added no rows.)

    # --- Overall retention, aggregated across all groups -----------------
    # NOTE(review): 'values' looks like per-row user-id collections that
    # sum() merges per (date, event) before set() dedupes them — confirm
    # against the generated SQL.
    summary_df = df.groupby(['date', 'event_name'])[['values', 'amount']].sum()
    summary_values = {}
    for i, d1 in enumerate(date_range):
        a = set(summary_df.loc[(d1, event_a)]['values']) if (d1, event_a) in summary_df.index else set()
        if not a:
            # Nobody performed event_a on d1 — no cohort to follow.
            continue
        key = d1.strftime('%Y-%m-%d')
        for j, d2 in enumerate(date_range[i:]):
            if j > unit_num:
                break
            b = set(summary_df.loc[(d2, event_b)]['values']) if (d2, event_b) in summary_df.index else set()
            tmp = summary_values.setdefault(key, {})
            tmp.setdefault('d0', len(a))  # cohort size on day 0
            tmp.setdefault('p', []).append(division(len(a & b) * 100, len(a)))  # retention %
            tmp.setdefault('n', []).append(len(a & b))  # retained user count

    # --- Per-group retention, indexed by (date, event_name, *group) ------
    groups = set(tuple(row) for row in df[res['groupby'][2:]].values)
    df.set_index(res['groupby'], inplace=True)
    df.sort_index(inplace=True)
    values = {}
    # Day offsets shown to the client, capped at unit_num.
    # NOTE(review): the loops above append up to unit_num + 1 entries per key
    # while 'days' holds at most unit_num — confirm the off-by-one is intended.
    days = list(range((date_range[-1] - date_range[0]).days + 1))[:unit_num]
    for i, d1 in enumerate(date_range):
        for g in groups:
            a = set(df.loc[(d1, event_a, *g)]['values']) if (d1, event_a, *g) in df.index else set()
            if not a:
                continue
            key = ','.join((d1.strftime("%Y-%m-%d"), *g))
            for j, d2 in enumerate(date_range[i:]):
                if j > unit_num:
                    break
                b = set(df.loc[(d2, event_b, *g)]['values']) if (d2, event_b, *g) in df.index else set()
                tmp = values.setdefault(key, {})
                tmp.setdefault('d0', len(a))
                tmp.setdefault('p', []).append(division(len(a & b) * 100, len(a)))
                tmp.setdefault('n', []).append(len(a & b))
    data = {
        'summary_values': summary_values,
        'values': values,
        'days': days,
        'date_range': [d.strftime('%Y-%m-%d') for d in date_range][:unit_num],
        'title': title
    }
    return schemas.Msg(code=0, msg='ok', data=data)