import datetime
import mimetypes
from collections import defaultdict
import time
from urllib.parse import quote

import pandas as pd
import numpy as np
from fastapi import APIRouter, Depends, Request
from motor.motor_asyncio import AsyncIOMotorDatabase
from pandas import DataFrame
from starlette.responses import StreamingResponse

import crud, schemas
from common import *

from api import deps
from db import get_database
from db.ckdb import get_ck_db, CKDrive
from db.redisdb import get_redis_pool, RedisDrive

from models.behavior_analysis import BehaviorAnalysis
from models.user_analysis import UserAnalysis
from models.x_analysis import XAnalysis
from utils import DfToStream, get_bijiao

router = APIRouter()


@router.post("/ltv_model_sql")
async def ltv_model_sql(
        request: Request,
        game: str,
        analysis: XAnalysis = Depends(XAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Return the LTV-model query description without executing it.

    Initialises ``analysis`` with the caller's ``data_where`` and wraps the
    result of ``analysis.ltv_model_sql()`` in a one-element list.
    """
    await analysis.init(data_where=current_user.data_where)
    data = analysis.ltv_model_sql()
    return schemas.Msg(code=0, msg='ok', data=[data])


def _with_global_filters(sql, game, global_filters):
    """Splice the non-GM global filters into the generated LTV SQL.

    The SQL is split on the literal ``'AND 1'`` and re-joined so that the
    extra conditions follow the first two markers; the third marker is
    restored without extra conditions. Exactly the first four fragments are
    used, so SQL with fewer than three markers raises ``IndexError``.

    NOTE(review): the conditions are built by string interpolation of
    request-supplied values (``game``, ``columnName``, ``ftv``). This is
    kept as-is because the query API for bound parameters is not visible
    here, but it should be parameterised/escaped -- confirm and fix.
    """
    extra_conditions = "".join(
        f""" AND {game}.event.{flt['columnName']} {get_bijiao(flt["comparator"])} {tuple(flt['ftv'])}"""
        for flt in global_filters
        if flt['strftv'] != 'GM'
    )
    fragments = sql.split('AND 1')
    marker = """AND 1 """
    return (
        fragments[0] + marker + extra_conditions
        + fragments[1] + marker + extra_conditions
        + fragments[2] + marker + fragments[3]
    )


# NOTE(review): this handler reuses the name ``ltv_model_sql`` and therefore
# shadows the handler above at module level. Both routes are still registered
# because the decorator runs at definition time; the name is left unchanged so
# nothing that depends on it breaks -- consider renaming.
@router.post("/ltv_model")
async def ltv_model_sql(
        request: Request,
        game: str,
        analysis: XAnalysis = Depends(XAnalysis),
        ckdb: CKDrive = Depends(get_ck_db),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Run the LTV-model query and build the LTV table for the response.

    Steps:
      1. Run the generated SQL. Unless the only global filter is the ``GM``
         one, the remaining filters are spliced into the SQL first.
      2. Add a row filled with ``'-'`` for every date in ``res['date_range']``
         that the query did not return, then sort by date.
      3. Append a mean row (``'均值'``) with one mean per ``sumpay_<n>``
         column, and a ``'累计LTV'`` column holding each row's right-most
         value that is not ``'-'``.
      4. Drop rows whose ``LTV1`` is ``'-'`` and return titles and rows.

    Returns:
        ``schemas.Msg`` with ``code=0`` and the table, or ``code=-9`` when
        the query returns no rows.
    """
    await analysis.init(data_where=current_user.data_where)
    res = analysis.ltv_model_sql()
    sql = res['sql']

    filters = analysis.global_filters
    if len(filters) == 1 and filters[0]['strftv'] == 'GM':
        # Only the GM filter is present: the generated SQL is used as-is.
        df = await ckdb.query_dataframe(sql)
    else:
        df = await ckdb.query_dataframe(_with_global_filters(sql, game, filters))

    quota = res['quota']
    ltv_n = res['ltv_n']
    if df.empty:
        return schemas.Msg(code=-9, msg='查无数据')
    df.fillna(0, inplace=True)

    # Pad every date of the requested range that is missing from the result
    # with a row of '-' placeholders.
    for d in set(res['date_range']) - set(df['date']):
        df.loc[len(df)] = '-'
        df.loc[len(df) - 1, 'date'] = d
    df.sort_values('date', inplace=True)
    df.rename(columns={'date': '注册日期'}, inplace=True)

    cat = '设备数' if quota == '#distinct_id' else '角色数'
    df.rename(columns={'cnt1': cat}, inplace=True)

    # Explicit copies: both frames are modified below and must not be
    # treated as views of ``df``.
    df1 = df[['注册日期', cat, *[f'LTV{i}' for i in ltv_n]]].copy()
    df2 = df[['注册日期', cat, *[f'sumpay_{i}' for i in ltv_n]]].replace('-', 0)

    # The first date whose ``sumpay_2`` is non-zero anchors the day count.
    # If there is no such row, fall back to the earliest row instead of
    # failing on an empty selection.
    paying = df2[df2['sumpay_2'] != 0]
    first_date = (df2 if paying.empty else paying)['注册日期'].iloc[0]
    newday = (datetime.date.today() - pd.to_datetime(first_date).date()).days + 1

    # Mean per LTV column. Horizons longer than ``newday`` get '-'. The
    # denominator deliberately includes rows without any payment.
    mean_by_col = {}
    for i in ltv_n:
        col = f'sumpay_{i}'
        if i > newday:
            mean_by_col[col] = '-'
            continue
        window = newday + 1 - i
        mean = (df2[[col]][0:window].sum() / df2[cat][0:window].sum()).round(2).iloc[0]
        # Values stay strings, as before, so the response shape is unchanged.
        mean_by_col[col] = '-' if pd.isna(mean) else str(mean)

    # Overall cumulative LTV: taken from the longest horizon that has a valid
    # mean (compared numerically, not by column-name string order).
    valid_horizons = [i for i in ltv_n if mean_by_col[f'sumpay_{i}'] != '-']
    if valid_horizons:
        last_col = f'sumpay_{max(valid_horizons)}'
        max_number = str((df2[[last_col]].sum() / df2[cat].sum()).round(2).iloc[0])
    else:
        max_number = '-'

    df1.loc[len(df1)] = ['均值', df2[cat].sum(), *mean_by_col.values()]

    df1.insert(2, '累计LTV', 0)
    last_ltv = []
    for row in df1.values:
        # Right-most cell that is not the '-' placeholder.
        for cell in row[::-1]:
            if cell != '-':
                last_ltv.append(cell)
                break
    # The mean row's cumulative value is the overall figure computed above.
    last_ltv[-1] = max_number
    df1['累计LTV'] = last_ltv

    df3 = df1.drop(df1[df1['LTV1'] == '-'].index)

    # NOTE(review): this masks the mean row of ``df1`` after ``df3`` has
    # already been derived, and only ``df3`` is returned, so it does not
    # change the response. Kept unchanged -- confirm whether it was meant
    # to run before ``df3`` is built.
    days = (pd.Timestamp.now().date() - pd.to_datetime(res['start_date']).date()).days
    df1.iloc[len(df1) - 1, days + 4:] = '-'

    data = {
        'title': df3.columns.tolist(),
        'rows': df3.values.tolist(),
        'start_date': res['start_date'],
        'end_date': res['end_date']
    }
    return schemas.Msg(code=0, msg='ok', data=data)


@router.post("/ltv_model_export")
async def ltv_model_export(request: Request,
                           game: str,
                           ckdb: CKDrive = Depends(get_ck_db),
                           analysis: XAnalysis = Depends(XAnalysis),
                           current_user: schemas.UserDB = Depends(deps.get_current_user)
                           ):
    """Export the raw LTV query result as a spreadsheet stream.

    Runs the generated LTV SQL unmodified and streams the resulting frame
    through ``DfToStream``. Returns a ``schemas.Msg`` with ``code=-9`` when
    the query returns no rows.
    """
    await analysis.init(data_where=current_user.data_where)
    data = analysis.ltv_model_sql()
    file_name = quote('lvt.xlsx')
    mime = mimetypes.guess_type(file_name)[0]
    sql = data['sql']
    df = await ckdb.query_dataframe(sql)
    if df.empty:
        return schemas.Msg(code=-9, msg='查无数据')
    df_to_stream = DfToStream((df, 'ltv'))
    with df_to_stream as d:
        export = d.to_stream()
    return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})