import datetime
import mimetypes
import re
import time
from collections import defaultdict
from urllib.parse import quote

import numpy as np
import pandas as pd
from fastapi import APIRouter, Depends, Request
from motor.motor_asyncio import AsyncIOMotorDatabase
from pandas import DataFrame
from starlette.responses import StreamingResponse

import crud
import schemas
from api import deps
from common import *
from db import get_database
from db.ckdb import get_ck_db, CKDrive, ckdb
from db.redisdb import get_redis_pool, RedisDrive
from models.behavior_analysis import BehaviorAnalysis
from models.user_analysis import UserAnalysis
from models.x_analysis import XAnalysis
from utils import DfToStream, get_bijiao

router = APIRouter()


@router.post("/ltv_model_sql")
async def ltv_model_sql(
        request: Request,
        game: str,
        analysis: XAnalysis = Depends(XAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """LTV model SQL."""
    await analysis.init(data_where=current_user.data_where)
    data = analysis.ltv_model_sql()
    return schemas.Msg(code=0, msg='ok', data=[data])


@router.post("/ltv_model")
async def ltv_model(  # was also named ltv_model_sql, shadowing the endpoint above
        request: Request,
        game: str,
        analysis: XAnalysis = Depends(XAnalysis),
        ckdb: CKDrive = Depends(get_ck_db),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """LTV model."""
    await analysis.init(data_where=current_user.data_where)
    res = analysis.ltv_model_sql()
    sql = res['sql']
    # A single filter means only the GM filter: run the generated SQL as-is.
    if len(analysis.global_filters) == 1 and analysis.global_filters[0]['strftv'] == 'GM':
        df = await ckdb.query_dataframe(sql)
    # With several filters, splice the extra conditions into a new SQL statement.
    else:
        new_sql = ''
        # Split the SQL at each 'AND 1' placeholder.
        split_sql = sql.split('AND 1')
        # Build one condition fragment per global filter.
        for i in analysis.global_filters:
            # Skip the GM filter itself.
            if i['strftv'] != 'GM':
                # Comparison operator for this filter.
                bijiao = get_bijiao(i["comparator"])
                # Values to compare against.
                condition = tuple(i['ftv'])
                # Event property (column) name.
                columnName = i['columnName']
                new_sql += f""" AND {game}.event.{columnName} {bijiao} {condition}"""
        split_ = """AND 1 """
        news_sql = (split_sql[0] + split_ + new_sql + split_sql[1] + split_ + new_sql
                    + split_sql[2] + split_ + split_sql[3])
        df = await ckdb.query_dataframe(news_sql)

    # # Special handling for data before 2021-11-23 (disabled).
    # list_data_range = analysis.date_range
    # liststr_data_range = []
    # for i in list_data_range:
    #     liststr_data_range.append(str(i))
    # quota = analysis.event_view['quota']
    # # Only applies to device LTV; role real-payment LTV skips this branch.
    # if quota == '#distinct_id':
    #     if '2021-11-22' in liststr_data_range or '2021-11-22' >= liststr_data_range[-1]:
    #         # The whole requested range ends on or before 2021-11-22.
    #         if '2021-11-22' >= liststr_data_range[-1]:
    #             news_sql = ''
    #             split_sql = sql.split('AND is_new_device = 1')
    #             news_sql += split_sql[0] + split_sql[1] + split_sql[2]
    #             df_twenty_three = await ckdb.query_dataframe(news_sql)
    #         # The range straddles 2021-11-23: query the earlier part separately.
    #         else:
    #             start_date = str(list_data_range[0])
    #             end_date = '2021-11-22'
    #             news_sql = ''
    #             split_sql = sql.split('AND is_new_device = 1')
    #             for i in split_sql:
    #                 news_sql += i
    #             # Swap the dates in the SQL with a regex.
    #             zhengze_time = r'\d{4}-\d{1,2}-\d{1,2}'
    #             zhengze_sql = re.split(zhengze_time, news_sql)
    #             zz_news_sql = (zhengze_sql[0] + start_date + zhengze_sql[1] + end_date
    #                            + zhengze_sql[2] + start_date + zhengze_sql[3] + end_date
    #                            + zhengze_sql[4])
    #             df_twenty_three = await ckdb.query_dataframe(zz_news_sql)
    #         # Stack the two result sets, dropping the old index.
    #         df = pd.concat([df, df_twenty_three], axis=0, ignore_index=True)
    #         df.sort_values('date', inplace=True)
    #         # Deduplicate.
    #         # df.drop_duplicates(inplace=True)

    quota = res['quota']  # metric field name
    ltv_n = res['ltv_n']
    # df = await ckdb.query_dataframe(sql)
    if df.empty:
        return schemas.Msg(code=-9, msg='查无数据')
    df.fillna(0, inplace=True)  # fill missing values with 0, in place
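    # Illustrative sketch (not executed): the active padding loop below appends
    # one all-'-' row for every date that is in the requested range but absent
    # from the query result. Assuming a range of 2021-11-01..11-03 with data
    # only for 11-01 and 11-02, the loop body effectively does:
    #
    #   df.loc[len(df)] = '-'                                # new row, every cell '-'
    #   df.loc[len(df) - 1, 'date'] = datetime.date(2021, 11, 3)
    #
    # so the final table still shows a line for 2021-11-03.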
    # for d in set(res['date_range']) - set(df['date']):  # old padding (disabled); the set difference ends up empty
    #     df.loc[len(df)] = 0
    #     df.loc[len(df) - 1, 'date'] = d
    #     days = (pd.Timestamp.now().date() - d).days  # day difference
    #     # if days + 2 >= ltv_len:
    #     #     continue
    #     df.iloc[len(df) - 1, days + 3:] = '-'
    # df.sort_values('date', inplace=True)  # sort by date
    # Pad every date of the range that has no data with a row of '-' values;
    # the number of padded rows is the size of the set difference.
    for d in set(res['date_range']) - set(df['date']):
        df.loc[len(df)] = '-'
        # Write the missing date into the 'date' column of the new row.
        df.loc[len(df) - 1, 'date'] = d
        # days = (d - pd.Timestamp.now().date()).days
        # # if days + 2 >= ltv_len:
        # #     continue
        # if days > 0:
        #     df.iloc[len(df) - 1, 1:] = '-'
    df.sort_values('date', inplace=True)
    df.rename(columns={'date': '注册日期'}, inplace=True)  # inplace=True modifies df itself
    # Count column label: roles by default, devices for device LTV.
    cat = '角色数'
    if quota == '#distinct_id':
        cat = '设备数'
    df.rename(columns={'cnt1': cat}, inplace=True)  # rename the cnt1 count column
    df1 = df[['注册日期', cat, *[f'LTV{i}' for i in ltv_n]]]  # LTV1, LTV2, ... up to LTV360
    df2 = df[['注册日期', cat, *[f'sumpay_{i}' for i in ltv_n]]]
    df2.replace('-', 0, inplace=True)  # replace the '-' placeholders with 0, in place

    # Drop rows whose sumpay_2 is 0.
    new_df2 = df2.drop(df2[(df2.sumpay_2 == 0)].index)
    # Re-index new_df2.
    new_df2 = new_df2.reset_index(drop=True)

    # Days elapsed since the first registration date.
    str_time = df2['注册日期'][0]
    # str_time = new_df2['注册日期'][0]
    str_time01 = str(str_time)
    split_time = str_time01.split('-')
    # str_time = str(res['date_range'][0])
    # split_time = str_time.split('-')
    now_time = time.strftime("%Y-%m-%d", time.localtime())
    split_now_time = now_time.split('-')
    today = datetime.datetime(int(split_time[0]), int(split_time[1]), int(split_time[2]))
    now_day = datetime.datetime(int(split_now_time[0]), int(split_now_time[1]), int(split_now_time[2]))
    newday = (now_day - today).days + 1

    # Compute the mean of each LTVn over the days that have matured.
    _listData = {}
    for i in ltv_n:
        if i <= newday:
            # avgLtv = (new_df2[[f'sumpay_{i}']][0:newday + 1 - i].sum()
            #           / new_df2[cat][0:newday + 1 - i].sum()).round(2)
            # Since 12.20 the denominator includes new devices with no payment
            # that day, so the mean is slightly smaller than when they are excluded.
            avgLtv = (df2[[f'sumpay_{i}']][0:newday + 1 - i].sum()
                      / df2[cat][0:newday + 1 - i].sum()).round(2)
            # Pull the scalar out of the one-element Series' string repr.
            new_avgLtv = str(avgLtv).split('\n')[0].split(' ')
            new_avgLtv01 = new_avgLtv[len(new_avgLtv) - 1]
            if new_avgLtv01 == 'NaN':
                _listData[f'sumpay_{i}'] = '-'
            else:
                _listData[f'sumpay_{i}'] = new_avgLtv01
        else:
            _listData[f'sumpay_{i}'] = '-'
    avgLtvlist = pd.Series(_listData)

    # Find the last matured LTV column for the cumulative-LTV total.
    _listname = []
    for k, v in _listData.items():
        if v != '-':  # was `v != 0 or v != '-'`, which is always true
            _listname.append(k)
    # Compare by day number, not lexicographically ('sumpay_9' > 'sumpay_30' as strings).
    max_name = max(_listname, key=lambda s: int(s.split('_')[-1]))
    # max_num = (new_df2[[max_name]].sum() / new_df2[cat].sum()).round(2)
    max_num = (df2[[max_name]].sum() / df2[cat].sum()).round(2)
    max_number = str(max_num[0])
    df1.loc[len(df1)] = ['均值', df2[cat].sum(), *avgLtvlist]
    # Old mean-row computation:
    # avg_ltv = (df2[[f'sumpay_{i}' for i in ltv_n]].sum() / df2[cat].sum()).round(2)
    # df1.loc[len(df1)] = ['均值', df2[cat].sum(), *avg_ltv]
    df1.insert(2, '累计LTV', 0)
    # The cumulative LTV of a row is its last value that is not '-'.
    last_ltv = []
    for items in df1.values:
        for item in items[::-1]:
            if item != '-':
                last_ltv.append(item)
                break
    # Overwrite the mean row's cumulative LTV with the recomputed total.
    last_ltv[-1] = max_number
    df1['累计LTV'] = last_ltv
    # Turn cumulative-LTV zeros into '-' (disabled):
    # df1.loc[df1['累计LTV'] == 0, '累计LTV'] = '-'
    # Drop the padded rows (their LTV1 is '-').
    df3 = df1.drop(df1[(df1.LTV1 == '-')].index)
    # df3 = df1.drop(df1[(df1.累计LTV == '-')].index)
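    # Illustrative sketch (not executed): the masking just below blanks the
    # mean row's cells for LTV days that have not matured yet. Assuming
    # start_date is 3 days ago, `days` is 3, and the columns are
    # [注册日期, 设备数, 累计LTV, LTV1, LTV2, LTV3, LTV4, LTV5, ...], then
    # `df1.iloc[len(df1) - 1, days + 4:]` targets LTV5 onward:
    #
    #   days = 3
    #   df1.iloc[-1, 3 + 4:] = '-'   # LTV5..LTV360 of the mean row become '-'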
    days = (pd.Timestamp.now().date() - pd.to_datetime(res['start_date']).date()).days
    df1.iloc[len(df1) - 1, days + 4:] = '-'
    data = {
        # 'title': df1.columns.tolist(),
        # 'rows': df1.values.tolist(),
        'title': df3.columns.tolist(),
        'rows': df3.values.tolist(),
        'start_date': res['start_date'],
        'end_date': res['end_date']
    }
    return schemas.Msg(code=0, msg='ok', data=data)


@router.post("/ltv_model_export")
async def ltv_model_export(request: Request,
                           game: str,
                           ckdb: CKDrive = Depends(get_ck_db),
                           analysis: XAnalysis = Depends(XAnalysis),
                           current_user: schemas.UserDB = Depends(deps.get_current_user)
                           ):
    """LTV analysis data export."""
    await analysis.init(data_where=current_user.data_where)
    data = analysis.ltv_model_sql()
    file_name = quote('ltv.xlsx')  # was the typo 'lvt.xlsx'
    mime = mimetypes.guess_type(file_name)[0]
    sql = data['sql']
    df = await ckdb.query_dataframe(sql)
    if df.empty:
        return schemas.Msg(code=-9, msg='查无数据')
    df_to_stream = DfToStream((df, 'ltv'))
    with df_to_stream as d:
        export = d.to_stream()
    return StreamingResponse(export, media_type=mime,
                             headers={'Content-Disposition': f'attachment; filename="{file_name}"'})
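# --- Illustrative only -------------------------------------------------------
# A minimal sketch of how this router is typically mounted, assuming a FastAPI
# app object created elsewhere in the project (the prefix and tag below are
# assumptions, not taken from this file):
#
#     from fastapi import FastAPI
#     app = FastAPI()
#     app.include_router(router, prefix="/x_analysis", tags=["ltv"])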