import mimetypes
from urllib.parse import quote

import pandas as pd
from fastapi import APIRouter, Depends, Request
from starlette.responses import StreamingResponse

import schemas
from common import *
from api import deps
from db.ckdb import get_ck_db, CKDrive
from models.x_analysis import XAnalysis
from utils import DfToStream

router = APIRouter()


@router.post("/ltv_model_sql")
async def ltv_model_sql(
        request: Request,
        game: str,
        analysis: XAnalysis = Depends(XAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """LTV model SQL"""
    await analysis.init(data_where=current_user.data_where)
    data = analysis.ltv_model_sql()
    return schemas.Msg(code=0, msg='ok', data=[data])


@router.post("/ltv_model")
async def ltv_model(
        request: Request,
        game: str,
        analysis: XAnalysis = Depends(XAnalysis),
        ckdb: CKDrive = Depends(get_ck_db),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """LTV model"""
    await analysis.init(data_where=current_user.data_where)
    res = analysis.ltv_model_sql()
    sql = res['sql']
    quota = res['quota']
    ltv_n = res['ltv_n']
    df = await ckdb.query_dataframe(sql)
    if df.empty:
        return schemas.Msg(code=-1, msg='查无数据')  # no data found
    df.fillna(0, inplace=True)
    # Pad the result with a zero row for every date in the requested range that
    # returned no data, then mask the LTV columns for days that have not
    # elapsed yet with the '-' placeholder.
    for d in set(res['date_range']) - set(df['date']):
        df.loc[len(df)] = 0
        df.loc[len(df) - 1, 'date'] = d
        days = (pd.Timestamp.now().date() - d).days
        df.iloc[len(df) - 1, days + 3:] = '-'
    df.sort_values('date', inplace=True)
    df.rename(columns={'date': '注册日期'}, inplace=True)  # registration date
    # Count column label: roles by default, devices when grouping by #distinct_id.
    cat = '角色数'  # role count
    if quota == '#distinct_id':
        cat = '设备数'  # device count
    df.rename(columns={'cnt1': cat}, inplace=True)
    # df1 holds the per-day LTV values, df2 the raw cumulative pay sums.
    # Copies avoid pandas chained-assignment warnings on the writes below.
    df1 = df[['注册日期', cat, *[f'LTV{i}' for i in ltv_n]]].copy()
    df2 = df[['注册日期', cat, *[f'sumpay_{i}' for i in ltv_n]]].copy()
    df2.replace('-', 0, inplace=True)
    # Weighted average LTV over the whole range: total pay / total users.
    avg_ltv = (df2[[f'sumpay_{i}' for i in ltv_n]].sum() / df2[cat].sum()).round(2)
    df1.loc[len(df1)] = ['均值', df2[cat].sum(), *avg_ltv]  # mean row
    df1.insert(2, '累计LTV', 0)  # cumulative LTV
    # A row's cumulative LTV is its last value that is not the '-' placeholder.
    last_ltv = []
    for items in df1.values:
        for item in items[::-1]:
            if item != '-':
                last_ltv.append(item)
                break
    df1['累计LTV'] = last_ltv
    data = {
        'title': df1.columns.tolist(),
        'rows': df1.values.tolist(),
        'start_date': res['start_date'],
        'end_date': res['end_date']
    }
    return schemas.Msg(code=0, msg='ok', data=data)


@router.post("/ltv_model_export")
async def ltv_model_export(
        request: Request,
        game: str,
        ckdb: CKDrive = Depends(get_ck_db),
        analysis: XAnalysis = Depends(XAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
):
    """LTV analysis data export"""
    await analysis.init(data_where=current_user.data_where)
    data = analysis.ltv_model_sql()
    file_name = quote('ltv.xlsx')
    mime = mimetypes.guess_type(file_name)[0]
    sql = data['sql']
    df = await ckdb.query_dataframe(sql)
    df_to_stream = DfToStream((df, 'ltv'))
    with df_to_stream as d:
        export = d.to_stream()
    return StreamingResponse(export, media_type=mime,
                             headers={'Content-Disposition': f'attachment; filename="{file_name}"'})
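
# A minimal standalone sketch (not part of the router) of how the '累计LTV'
# column in /ltv_model is derived: for each row, the cumulative LTV is the
# last value that is not the '-' placeholder used for days that have not
# elapsed yet. The DataFrame below is hypothetical demo data, not ClickHouse output.
if __name__ == '__main__':
    demo = pd.DataFrame({
        '注册日期': ['2021-01-01', '2021-01-02'],
        '角色数': [10, 8],
        'LTV1': [1.5, 2.0],
        'LTV2': [3.0, '-'],  # second cohort is too recent to have an LTV2
    })
    last_ltv = []
    for items in demo.values:
        for item in items[::-1]:
            if item != '-':
                last_ltv.append(item)
                break
    print(last_ltv)  # [3.0, 2.0]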