import datetime import mimetypes from collections import defaultdict import time from urllib.parse import quote import pandas as pd import numpy as np from fastapi import APIRouter, Depends, Request from motor.motor_asyncio import AsyncIOMotorDatabase from pandas import DataFrame from starlette.responses import StreamingResponse import crud, schemas from common import * from api import deps from db import get_database from db.ckdb import get_ck_db, CKDrive from db.redisdb import get_redis_pool, RedisDrive from models.behavior_analysis import BehaviorAnalysis from models.user_analysis import UserAnalysis from models.x_analysis import XAnalysis from utils import DfToStream router = APIRouter() @router.post("/ltv_model_sql") async def ltv_model_sql( request: Request, game: str, analysis: XAnalysis = Depends(XAnalysis), current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> schemas.Msg: """ ltv模型sql """ await analysis.init(data_where=current_user.data_where) data = analysis.ltv_model_sql() return schemas.Msg(code=0, msg='ok', data=[data]) @router.post("/ltv_model") async def ltv_model_sql( request: Request, game: str, analysis: XAnalysis = Depends(XAnalysis), ckdb: CKDrive = Depends(get_ck_db), current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> schemas.Msg: """ ltv模型sql """ await analysis.init(data_where=current_user.data_where) res = analysis.ltv_model_sql() sql = res['sql'] quota = res['quota'] #字段名 ltv_n = res['ltv_n'] df = await ckdb.query_dataframe(sql) if df.empty: return schemas.Msg(code=-9, msg='查无数据') df.fillna(0, inplace=True) #修改原对象,以0填补空缺值 # for d in set(res['date_range']) - set(df['date']): # 时间的差集运算 最后为空 # df.loc[len(df)] = 0 # df.loc[len(df) - 1, 'date'] = d # days = (pd.Timestamp.now().date() - d).days # 时间差 # # if days + 2 >= ltv_len: # # continue # df.iloc[len(df) - 1, days + 3:] = '-' # df.sort_values('date', inplace=True) # 根据date进行倒叙排序 for d in set(res['date_range']) - set(df['date']): #在有效日期最后一行补充行数据(值都为0),补充的行数为两个集合的差集长度 df.loc[len(df)] = '-' #在date此列补充多行数据(值为两个集合差集的子元素) df.loc[len(df) - 1, 'date'] = d # days = (d-pd.Timestamp.now().date()).days # # if days + 2 >= ltv_len: # # continue # if days>0: # df.iloc[len(df) - 1, 1:] = '-' df.sort_values('date', inplace=True) df.rename(columns={'date': '注册日期'}, inplace=True) #True为将结果返回赋值给原变量,修改原对象,columns为列名 cat = '角色数' if quota == '#distinct_id': #如果字段名=字段名 cat = '设备数' df.rename(columns={'cnt1': cat}, inplace=True) #原数据基础上修改df里面列名为cnt1为设备数 df1 = df[['注册日期', cat, *[f'LTV{i}' for i in ltv_n]]] #1, 2, 3, 4, 5, 6, 7, 8, 9, ~~到360 df2 = df[['注册日期', cat, *[f'sumpay_{i}' for i in ltv_n]]] df2.replace('-', 0, inplace=True) #True改变原数据,前面是需要替换的值,后面是替换后的值。 在原数据把下划线替换成0 #修改下面代码 #求相差天数 str_time = str(res['date_range'][0]) split_time = str_time.split('-') now_time = time.strftime("%Y-%m-%d", time.localtime()) split_now_time = now_time.split('-') today = datetime.datetime(int(split_time[0]), int(split_time[1]), int(split_time[2])) now_day = datetime.datetime(int(split_now_time[0]), int(split_now_time[1]), int(split_now_time[2])) newday = (now_day - today).days + 1 #计算方法运算每个LTV的均值 _listData = {} for i in ltv_n: if i <=newday: avgLtv=(df2[[f'sumpay_{i}']][0:newday+1-i].sum()/df2[cat][0:newday+1-i].sum()).round(2) new_avgLtv=str(avgLtv).split('\n')[0].split(' ') new_avgLtv01=new_avgLtv[len(new_avgLtv)-1] if new_avgLtv01 == 'NaN': _listData[f'sumpay_{i}'] = '-' else: _listData[f'sumpay_{i}'] = new_avgLtv01 else: _listData[f'sumpay_{i}']='-' avgLtvlist = pd.Series(_listData) _listname=[] #计算总累计LTV最后一个值 for k, v in _listData.items(): if v !=0: _listname.append(k) max_nmu=max(_listname) max_num=(df2[[max_nmu]].sum()/df2[cat].sum()).round(2) max_number=str(max_num[0]) df1.loc[len(df1)] = ['均值', df2[cat].sum(), *avgLtvlist] # avg_ltv = (df2[[f'sumpay_{i}' for i in ltv_n]].sum() / df2[cat].sum()).round(2) #df1.loc[len(df1)] = ['均值', df2[cat].sum(), *avg_ltv] df1.insert(2, '累计LTV', 0) last_ltv = [] for items in df1.values: for item in items[::-1]: if item != '-': last_ltv.append(item) break #修改累计LTV中最后一个值 last_ltv[-1]=max_number df1['累计LTV'] = last_ltv #把列中累计LTV等于0的值改成'-' df1.loc[df1['累计LTV']==0, '累计LTV'] = '-' #剔除行,列的累计LTV=='-'的剔除出去 df3 = df1.drop(df1[(df1.累计LTV=='-')].index) days = (pd.Timestamp.now().date() - pd.to_datetime(res['start_date']).date()).days df1.iloc[len(df1) - 1, days + 4:] = '-' data = { #'title': df1.columns.tolist(), #'rows': df1.values.tolist(), 'title': df3.columns.tolist(), 'rows': df3.values.tolist(), 'start_date': res['start_date'], 'end_date': res['end_date'] } return schemas.Msg(code=0, msg='ok', data=data) @router.post("/ltv_model_export") async def ltv_model_export(request: Request, game: str, ckdb: CKDrive = Depends(get_ck_db), analysis: XAnalysis = Depends(XAnalysis), current_user: schemas.UserDB = Depends(deps.get_current_user) ): """ ltv分析 数据导出""" await analysis.init(data_where=current_user.data_where) data = analysis.ltv_model_sql() file_name = quote(f'lvt.xlsx') mime = mimetypes.guess_type(file_name)[0] sql = data['sql'] df = await ckdb.query_dataframe(sql) if df.empty: return schemas.Msg(code=-9, msg='查无数据') df_to_stream = DfToStream((df, 'ltv')) with df_to_stream as d: export = d.to_stream() return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})