xbackend/api/api_v1/endpoints/xquery.py
2021-10-29 16:01:14 +08:00

125 lines
3.9 KiB
Python

import mimetypes
from collections import defaultdict
from urllib.parse import quote
import pandas as pd
import numpy as np
from fastapi import APIRouter, Depends, Request
from motor.motor_asyncio import AsyncIOMotorDatabase
from starlette.responses import StreamingResponse
import crud, schemas
from common import *
from api import deps
from db import get_database
from db.ckdb import get_ck_db, CKDrive
from db.redisdb import get_redis_pool, RedisDrive
from models.behavior_analysis import BehaviorAnalysis
from models.user_analysis import UserAnalysis
from models.x_analysis import XAnalysis
from utils import DfToStream
router = APIRouter()
@router.post("/ltv_model_sql")
async def ltv_model_sql(
request: Request,
game: str,
analysis: XAnalysis = Depends(XAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
""" ltv模型sql """
await analysis.init(data_where=current_user.data_where)
data = analysis.ltv_model_sql()
return schemas.Msg(code=0, msg='ok', data=[data])
@router.post("/ltv_model")
async def ltv_model_sql(
request: Request,
game: str,
analysis: XAnalysis = Depends(XAnalysis),
ckdb: CKDrive = Depends(get_ck_db),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
""" ltv模型sql """
await analysis.init(data_where=current_user.data_where)
res = analysis.ltv_model_sql()
sql = res['sql']
quota = res['quota']
ltv_n = res['ltv_n']
df = await ckdb.query_dataframe(sql)
if df.empty:
return schemas.Msg(code=-9, msg='查无数据')
df.fillna(0, inplace=True)
for d in set(res['date_range']) - set(df['date']):
df.loc[len(df)] = 0
df.loc[len(df) - 1, 'date'] = d
days = (pd.Timestamp.now().date() - d).days
# if days + 2 >= ltv_len:
# continue
df.iloc[len(df) - 1, days+3:] = '-'
df.sort_values('date', inplace=True)
df.rename(columns={'date': '注册日期'}, inplace=True)
cat = '角色数'
if quota == '#distinct_id':
cat = '设备数'
df.rename(columns={'cnt1': cat}, inplace=True)
df1 = df[['注册日期', cat, *[f'LTV{i}' for i in ltv_n]]]
df2 = df[['注册日期', cat, *[f'sumpay_{i}' for i in ltv_n]]]
df2.replace('-', 0, inplace=True)
avg_ltv = (df2[[f'sumpay_{i}' for i in ltv_n]].sum() / df2[cat].sum()).round(2)
df1.loc[len(df1)] = ['均值', df2[cat].sum(), *avg_ltv]
df1.insert(2, '累计LTV', 0)
last_ltv = []
for items in df1.values:
for item in items[::-1]:
if item != '-':
last_ltv.append(item)
break
df1['累计LTV'] = last_ltv
days = (pd.Timestamp.now().date() - pd.to_datetime(res['start_date']).date()).days
df1.iloc[len(df1) - 1, days + 4:] = '-'
data = {
'title': df1.columns.tolist(),
'rows': df1.values.tolist(),
'start_date': res['start_date'],
'end_date': res['end_date']
}
return schemas.Msg(code=0, msg='ok', data=data)
@router.post("/ltv_model_export")
async def ltv_model_export(request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
analysis: XAnalysis = Depends(XAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
):
""" ltv分析 数据导出"""
await analysis.init(data_where=current_user.data_where)
data = analysis.ltv_model_sql()
file_name = quote(f'lvt.xlsx')
mime = mimetypes.guess_type(file_name)[0]
sql = data['sql']
df = await ckdb.query_dataframe(sql)
if df.empty:
return schemas.Msg(code=-9, msg='查无数据')
df_to_stream = DfToStream((df, 'ltv'))
with df_to_stream as d:
export = d.to_stream()
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})