import datetime
import mimetypes
import re
import time
from collections import defaultdict
from urllib.parse import quote

import numpy as np
import pandas as pd
from clickhouse_driver import Client
from fastapi import APIRouter, Depends, Request
from motor.motor_asyncio import AsyncIOMotorDatabase
from pandas import DataFrame
from starlette.responses import StreamingResponse

import crud, schemas
from common import *
from api import deps
from db import get_database
from db.ckdb import get_ck_db, CKDrive, ckdb
from db.redisdb import get_redis_pool, RedisDrive
from models.behavior_analysis import BehaviorAnalysis
from models.user_analysis import UserAnalysis
from models.x_analysis import XAnalysis
from utils import DfToStream, get_bijiao

router = APIRouter()


@router.post("/ltv_model_sql")
async def ltv_model_sql(
        request: Request,
        game: str,
        analysis: XAnalysis = Depends(XAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """LTV model SQL."""
    await analysis.init(data_where=current_user.data_where)
    data = await analysis.ltv_model_sql()
    return schemas.Msg(code=0, msg='ok', data=[data])


@router.post("/ltv_model")
async def ltv_model(
        request: Request,
        game: str,
        analysis: XAnalysis = Depends(XAnalysis),
        ckdb: CKDrive = Depends(get_ck_db),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """LTV model report."""
    await analysis.init(data_where=current_user.data_where)
    res = await analysis.ltv_model_sql()
    # Flatten the generated SQL and strip stray list brackets.
    sql = res['sql'].replace('\n', ' ').replace('[', '').replace(']', '')
    # With a single filter condition, GM is filtered out and all data is fetched.
    # if len(analysis.global_filters) == 1 and analysis.global_filters[0]['strftv'] == 'GM':
    try:
        df = await ckdb.query_dataframe(sql)
    except Exception:
        return schemas.Msg(code=-9, msg='报表配置参数异常')
    # With several filter conditions, a new SQL statement used to be synthesised:
    # else:
    #     new_sql = ""
    #     # Split the SQL on its 'AND 1' placeholders.
    #     split_sql = sql.split('AND 1')
    #     # Walk every global filter condition.
    #     for i in analysis.global_filters:
    #         # Skip GM.
    #         if i['strftv'] != 'GM':
    #             # Comparison operator of the condition.
    #             bijiao = get_bijiao(i["comparator"])
    #             # Values of the condition.
    #             condition = tuple(i['ftv'])
    #             # Event column name.
    #             columnName = i['columnName']
    #             new_sql += f""" AND {game}.event.{columnName} {bijiao} {condition}"""
    #     split_ = """AND 1 """
    #     news_sql = (split_sql[0] + split_ + new_sql + split_sql[1] + split_ + new_sql
    #                 + split_sql[2] + split_ + split_sql[3])
    #     df = await ckdb.query_dataframe(news_sql)

    # Special-case data from before 2021-11-23:
    # list_data_range = analysis.date_range
    # liststr_data_range = [str(i) for i in list_data_range]
    # quota = analysis.event_view['quota']
    # # Only for device LTV; skipped for role real-payment LTV.
    # if quota == '#distinct_id':
    #     if '2021-11-22' in liststr_data_range or '2021-11-22' >= liststr_data_range[-1]:
    #         # The queried range ends before 2021-11-23.
    #         if '2021-11-22' >= liststr_data_range[-1]:
    #             split_sql = sql.split('AND is_new_device = 1')
    #             news_sql = split_sql[0] + split_sql[1] + split_sql[2]
    #             df_twenty_three = await ckdb.query_dataframe(news_sql)
    #         # The queried range straddles 2021-11-23.
    #         else:
    #             start_date = str(list_data_range[0])
    #             end_date = '2021-11-22'
    #             news_sql = ''.join(sql.split('AND is_new_device = 1'))
    #             # Cut the dates out with a regular expression.
    #             zhengze_time = r'\d{4}-\d{1,2}-\d{1,2}'
    #             zhengze_sql = re.split(zhengze_time, news_sql)
    #             zz_news_sql = (zhengze_sql[0] + start_date + zhengze_sql[1] + end_date
    #                            + zhengze_sql[2] + start_date + zhengze_sql[3] + end_date
    #                            + zhengze_sql[4])
    #             df_twenty_three = await ckdb.query_dataframe(zz_news_sql)
    #         # Concatenate the two result sets vertically, ignoring the old index.
    #         df = pd.concat([df, df_twenty_three], axis=0, ignore_index=True)
    #         df.sort_values('date', inplace=True)
    #         # Deduplicate.
    #         # df.drop_duplicates(inplace=True)
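    # Shape of `res` as consumed below (inferred from this handler; the
    # authoritative contract is XAnalysis.ltv_model_sql):
    #   res['sql']          generated ClickHouse query
    #   res['quota']        metric field name; '#distinct_id' means device LTV
    #   res['ltv_n']        LTV day offsets, e.g. 1, 2, 3, ... 360
    #   res['date_range']   every date of the requested range
    #   res['start_date'] / res['end_date']   range bounds echoed back to the client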
    quota = res['quota']  # metric field name
    ltv_n = res['ltv_n']
    if df.empty:
        return schemas.Msg(code=-9, msg='查无数据')
    df.fillna(0, inplace=True)  # fill missing values with 0, in place
    # Previous padding logic filled missing dates with 0 and masked the
    # not-yet-reached LTV columns with '-':
    # for d in set(res['date_range']) - set(df['date']):
    #     df.loc[len(df)] = 0
    #     df.loc[len(df) - 1, 'date'] = d
    #     days = (pd.Timestamp.now().date() - d).days
    #     df.iloc[len(df) - 1, days + 3:] = '-'
    # Append one all-'-' row for every date of the requested range that is
    # missing from the result, then fill in its date cell.
    for d in set(res['date_range']) - set(df['date']):
        df.loc[len(df)] = '-'
        df.loc[len(df) - 1, 'date'] = d
    df.sort_values('date', inplace=True)
    df.rename(columns={'date': '注册日期'}, inplace=True)
    # '#distinct_id' means device LTV; anything else is role real-payment LTV.
    cat = '角色数'
    if quota == '#distinct_id':
        cat = '设备数'
    df.rename(columns={'cnt1': cat}, inplace=True)
    # LTV1, LTV2, ... up to LTV360.
    df1 = df[['注册日期', cat, *[f'LTV{i}' for i in ltv_n]]].copy()
    df2 = df[['注册日期', cat, *[f'sumpay_{i}' for i in ltv_n]]].copy()
    df2.replace('-', 0, inplace=True)  # replace the '-' placeholders with 0, in place
    # Drop the rows whose sumpay_2 is 0 and re-index.
    new_df2 = df2.drop(df2[df2.sumpay_2 == 0].index)
    new_df2 = new_df2.reset_index(drop=True)
    # Days elapsed between the first registration date and today, inclusive.
    str_time = str(df2['注册日期'][0])
    split_time = str_time.split('-')
    now_time = time.strftime("%Y-%m-%d", time.localtime())
    split_now_time = now_time.split('-')
    today = datetime.datetime(int(split_time[0]), int(split_time[1]), int(split_time[2]))
    now_day = datetime.datetime(int(split_now_time[0]), int(split_now_time[1]), int(split_now_time[2]))
    newday = (now_day - today).days + 1
    # Mean of every LTV column that the elapsed time can already cover.
    _listData = {}
    for i in ltv_n:
        if i <= newday:
            # Since 2021-12-20 the denominator keeps the new devices added on
            # the current day that have not paid yet, so the mean is slightly
            # smaller than the variant that drops them:
            # avgLtv = (new_df2[[f'sumpay_{i}']][0:newday + 1 - i].sum()
            #           / new_df2[cat][0:newday + 1 - i].sum()).round(2)
            avgLtv = (df2[[f'sumpay_{i}']][0:newday + 1 - i].sum()
                      / df2[cat][0:newday + 1 - i].sum()).round(2)
            # Parse the scalar out of the one-row Series repr.
            new_avgLtv = str(avgLtv).split('\n')[0].split(' ')
            new_avgLtv01 = new_avgLtv[-1]
            if new_avgLtv01 == 'NaN':
                _listData[f'sumpay_{i}'] = '-'
            else:
                _listData[f'sumpay_{i}'] = new_avgLtv01
        else:
            _listData[f'sumpay_{i}'] = '-'
    avgLtvlist = pd.Series(_listData)
    # Find the highest-day sumpay_* column that has a mean, for the final
    # cumulative-LTV value.
    _listname = [k for k, v in _listData.items() if v != 0 and v != '-']
    max_col = max(_listname, key=lambda c: int(c.split('_')[1]))
    max_num = (df2[[max_col]].sum() / df2[cat].sum()).round(2)
    max_number = str(max_num.iloc[0])
    df1.loc[len(df1)] = ['均值', df2[cat].sum(), *avgLtvlist]
    # Previous implementation: a plain whole-range mean without the per-day cut-off.
    # avg_ltv = (df2[[f'sumpay_{i}' for i in ltv_n]].sum() / df2[cat].sum()).round(2)
    # df1.loc[len(df1)] = ['均值', df2[cat].sum(), *avg_ltv]
    df1.insert(2, '累计LTV', 0)
    # For every row the cumulative LTV is the right-most value that is not '-'.
    last_ltv = []
    for items in df1.values:
        for item in items[::-1]:
            if item != '-':
                last_ltv.append(item)
                break
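    # Worked illustration of the scan above (toy values): a row
    # ['2021-11-01', 100, 0, 0.8, 1.2, 1.5, '-', '-'] read from the right
    # yields 1.5, the newest LTV that has real data; a fully padded row
    # falls back to the 0 seeded into the 累计LTV column.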
    # Overwrite the final cumulative-LTV value (the mean row) with the overall mean.
    last_ltv[-1] = max_number
    df1['累计LTV'] = last_ltv
    # Alternative: turn a cumulative LTV of 0 into '-'.
    # df1.loc[df1['累计LTV'] == 0, '累计LTV'] = '-'
    # Mask the LTV columns the mean row cannot reach yet.
    days = (pd.Timestamp.now().date() - pd.to_datetime(res['start_date']).date()).days
    df1.iloc[len(df1) - 1, days + 4:] = '-'
    # Drop the padded rows that never had data (their LTV1 is still '-').
    df3 = df1.drop(df1[df1.LTV1 == '-'].index)
    # df3 = df1.drop(df1[df1.累计LTV == '-'].index)
    data = {
        'title': df3.columns.tolist(),
        'rows': df3.values.tolist(),
        'start_date': res['start_date'],
        'end_date': res['end_date']
    }
    return schemas.Msg(code=0, msg='ok', data=data)


@router.post("/ltv_model_export")
async def ltv_model_export(
        request: Request,
        game: str,
        ckdb: CKDrive = Depends(get_ck_db),
        analysis: XAnalysis = Depends(XAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
):
    """LTV analysis data export."""
    await analysis.init(data_where=current_user.data_where)
    data = await analysis.ltv_model_sql()
    file_name = quote('ltv.xlsx')
    mime = mimetypes.guess_type(file_name)[0]
    sql = data['sql']
    df = await ckdb.query_dataframe(sql)
    if df.empty:
        return schemas.Msg(code=-9, msg='查无数据')
    df_to_stream = DfToStream((df, 'ltv'))
    with df_to_stream as d:
        export = d.to_stream()
    return StreamingResponse(export, media_type=mime,
                             headers={'Content-Disposition': f'filename="{file_name}"'})
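
# Minimal usage sketch (hypothetical host and payload; the real request body is
# whatever the XAnalysis report designer produces):
#
#   import httpx
#   r = httpx.post('http://localhost:8000/ltv_model?game=demo',
#                  json={...}, headers={'Authorization': 'Bearer <token>'})
#   table = r.json()['data']   # {'title': [...], 'rows': [...], 'start_date': ..., 'end_date': ...}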