xbackend/api/api_v1/endpoints/xquery.py


import datetime
import mimetypes
import re
import time
from collections import defaultdict
from urllib.parse import quote

import pandas as pd
import numpy as np
from fastapi import APIRouter, Depends, Request
from motor.motor_asyncio import AsyncIOMotorDatabase
from pandas import DataFrame
from starlette.responses import StreamingResponse

import crud, schemas
from common import *
from api import deps
from db import get_database
from db.ckdb import get_ck_db, CKDrive
from db.redisdb import get_redis_pool, RedisDrive
from models.behavior_analysis import BehaviorAnalysis
from models.user_analysis import UserAnalysis
from models.x_analysis import XAnalysis
from utils import DfToStream, get_bijiao

router = APIRouter()

@router.post("/ltv_model_sql")
async def ltv_model_sql(
request: Request,
game: str,
analysis: XAnalysis = Depends(XAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
""" ltv模型sql """
await analysis.init(data_where=current_user.data_where)
data = analysis.ltv_model_sql()
return schemas.Msg(code=0, msg='ok', data=[data])
@router.post("/ltv_model")
async def ltv_model_sql(
request: Request,
game: str,
analysis: XAnalysis = Depends(XAnalysis),
ckdb: CKDrive = Depends(get_ck_db),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
""" ltv模型sql """
await analysis.init(data_where=current_user.data_where)
res = analysis.ltv_model_sql()
sql = res['sql']
#仅一条筛选条件则是把GM过滤后获取全部数据
if len(analysis.global_filters)==1 and analysis.global_filters[0]['strftv']=='GM':
df = await ckdb.query_dataframe(sql)
#多条筛选条件则合成新的sql
else:
new_sql=""""""
#拆分sql
split_sql = sql.split('AND 1')
#获取每一条筛选条件
for i in analysis.global_filters:
#剔除GM
if i['strftv'] != 'GM':
#获取筛选条件的包含关系
bijiao=get_bijiao(i["comparator"])
#获取筛选条件的值
condition=tuple(i['ftv'])
#获取事件名
columnName=i['columnName']
dd = f""" AND {game}.event.{columnName} {bijiao} {condition}"""
new_sql+=dd
split_="""AND 1 """
news_sql = split_sql[0] + split_+new_sql + split_sql[1] + split_+new_sql+ split_sql[2]+split_+split_sql[3]
df = await ckdb.query_dataframe(news_sql)
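    # For illustration: given a hypothetical filter
    #     {'columnName': 'channel', 'comparator': '==', 'ftv': ['appstore'], 'strftv': 'appstore'}
    # and get_bijiao mapping '==' to an inclusion operator, the spliced fragment
    # would read
    #     AND {game}.event.channel in ('appstore',)
    # and is appended at every 'AND 1' placeholder of the base SQL. (The filter
    # values above are made up; only the splicing mechanics come from this code.)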
    # Data before 2021-11-23 must be queried without the 'is_new_device = 1'
    # condition, so patch it in separately.
    list_data_range = analysis.date_range
    liststr_data_range = [str(i) for i in list_data_range]
    quota = analysis.event_view['quota']
    # Only device LTV ('#distinct_id') needs the patch; role recharge LTV does not.
    if quota == '#distinct_id':
        if '2021-11-22' in liststr_data_range or '2021-11-22' >= liststr_data_range[-1]:
            # The whole query range ends on or before 2021-11-22.
            if '2021-11-22' >= liststr_data_range[-1]:
                split_sql = sql.split('AND is_new_device = 1')
                news_sql = split_sql[0] + split_sql[1] + split_sql[2]
                df_twenty_three = await ckdb.query_dataframe(news_sql)
            # The range straddles 2021-11-23: query only the slice before it.
            else:
                start_date = str(list_data_range[0])
                end_date = '2021-11-22'
                news_sql = ''.join(sql.split('AND is_new_device = 1'))
                # Rewrite the four date literals with a regular expression.
                zhengze_time = r'\d{4}-\d{1,2}-\d{1,2}'
                zhengze_sql = re.split(zhengze_time, news_sql)
                zz_news_sql = (zhengze_sql[0] + start_date + zhengze_sql[1] + end_date
                               + zhengze_sql[2] + start_date + zhengze_sql[3] + end_date
                               + zhengze_sql[4])
                df_twenty_three = await ckdb.query_dataframe(zz_news_sql)
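                # Sketch of the rewrite: on a query like
                #     "... between '2022-01-01' and '2022-01-31' ..."
                # re.split(zhengze_time, ...) returns the text surrounding the
                # date literals, and the pieces are glued back together with
                # start_date/end_date in their place. (The sample SQL here is
                # illustrative, not the real generated query.)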
            # Stack both result sets vertically, discarding the old index.
            df = pd.concat([df, df_twenty_three], axis=0, ignore_index=True)
            df.sort_values('date', inplace=True)
            # Dedupe if the two slices ever overlap:
            # df.drop_duplicates(inplace=True)
    quota = res['quota']  # metric column name
    ltv_n = res['ltv_n']
    if df.empty:
        return schemas.Msg(code=-9, msg='查无数据')  # 'no data found'
    df.fillna(0, inplace=True)  # fill missing values with 0 in place
    # For every date in the requested range that returned no rows, append a
    # placeholder row of '-' and set its date.
    for d in set(res['date_range']) - set(df['date']):
        df.loc[len(df)] = '-'
        df.loc[len(df) - 1, 'date'] = d
    df.sort_values('date', inplace=True)
    df.rename(columns={'date': '注册日期'}, inplace=True)  # registration date
    cat = '角色数'  # role count
    if quota == '#distinct_id':  # device LTV
        cat = '设备数'  # device count
    df.rename(columns={'cnt1': cat}, inplace=True)
    df1 = df[['注册日期', cat, *[f'LTV{i}' for i in ltv_n]]]  # LTV1, LTV2, ... up to LTV360
    df2 = df[['注册日期', cat, *[f'sumpay_{i}' for i in ltv_n]]]
    df2.replace('-', 0, inplace=True)  # replace the '-' placeholders with 0
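    # Shape sketch: with ltv_n = [1, 2, 3], df1 holds
    #     ['注册日期', cat, 'LTV1', 'LTV2', 'LTV3']
    # and df2 the matching cumulative-recharge columns
    #     ['注册日期', cat, 'sumpay_1', 'sumpay_2', 'sumpay_3']
    # (the ltv_n value here is illustrative).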
    # Drop the rows whose sumpay_2 is 0.
    new_df2 = df2.drop(df2[(df2.sumpay_2 == 0)].index)
    # Re-index new_df2 from 0.
    new_df2 = new_df2.reset_index(drop=True)
    # Days elapsed (inclusive) from the first registration date to today.
    str_time = str(new_df2['注册日期'][0])
    split_time = str_time.split('-')
    now_time = time.strftime("%Y-%m-%d", time.localtime())
    split_now_time = now_time.split('-')
    today = datetime.datetime(int(split_time[0]), int(split_time[1]), int(split_time[2]))
    now_day = datetime.datetime(int(split_now_time[0]), int(split_now_time[1]), int(split_now_time[2]))
    newday = (now_day - today).days + 1
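    # Equivalent one-liner (a sketch with the same semantics as the block above):
    #     newday = (pd.Timestamp.now().normalize() - pd.to_datetime(str_time)).days + 1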
    # Compute the mean of every LTV_n horizon that has matured.
    _listData = {}
    for i in ltv_n:
        if i <= newday:
            # Mean LTV_i: total recharge through day i over the device/role count.
            # Since 12-20 the denominator keeps new devices that have not yet
            # recharged (df2 rather than new_df2); excluding them made the mean
            # larger than it should be.
            avgLtv = (df2[[f'sumpay_{i}']][0:newday + 1 - i].sum() / df2[cat][0:newday + 1 - i].sum()).round(2)
            # Pull the scalar back out of the printed Series.
            new_avgLtv = str(avgLtv).split('\n')[0].split(' ')
            new_avgLtv01 = new_avgLtv[len(new_avgLtv) - 1]
            if new_avgLtv01 == 'NaN':
                _listData[f'sumpay_{i}'] = '-'
            else:
                _listData[f'sumpay_{i}'] = new_avgLtv01
        else:
            # Not enough elapsed days for this horizon yet.
            _listData[f'sumpay_{i}'] = '-'
    avgLtvlist = pd.Series(_listData)
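    # Note: the string round-trip above just extracts the Series' single value;
    # a more direct equivalent (sketch) would be float(avgLtv.iloc[0]) on the
    # one-element Series.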
    _listname = []
    # Collect the horizons that produced a real value; the largest one anchors
    # the overall cumulative LTV.
    for k, v in _listData.items():
        if v != 0 and v != '-':
            _listname.append(k)
    # Compare horizons by their numeric suffix rather than lexicographically.
    max_nmu = max(_listname, key=lambda name: int(name.split('_')[1]))
    max_num = (df2[[max_nmu]].sum() / df2[cat].sum()).round(2)
    max_number = str(max_num[0])
    # Append the overall mean row ('均值').
    df1.loc[len(df1)] = ['均值', df2[cat].sum(), *avgLtvlist]
    df1.insert(2, '累计LTV', 0)  # cumulative LTV column
    last_ltv = []
    # For each row take the right-most value that is not '-' as its cumulative LTV.
    for items in df1.values:
        for item in items[::-1]:
            if item != '-':
                last_ltv.append(item)
                break
    # Overwrite the last entry (the mean row) with the overall cumulative LTV.
    last_ltv[-1] = max_number
    df1['累计LTV'] = last_ltv
    # Drop the placeholder rows whose LTV1 is '-'.
    df3 = df1.drop(df1[(df1.LTV1 == '-')].index)
    # Mask the not-yet-matured horizons on the mean row with '-'.
    days = (pd.Timestamp.now().date() - pd.to_datetime(res['start_date']).date()).days
    df1.iloc[len(df1) - 1, days + 4:] = '-'
    data = {
        'title': df3.columns.tolist(),
        'rows': df3.values.tolist(),
        'start_date': res['start_date'],
        'end_date': res['end_date']
    }
    return schemas.Msg(code=0, msg='ok', data=data)
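
# Response sketch for /ltv_model (values illustrative, shape from the code above):
#     {"code": 0, "msg": "ok",
#      "data": {"title": ["注册日期", "设备数", "累计LTV", "LTV1", ...],
#               "rows": [["2021-12-01", 1000, "3.5", "1.2", ...], ...],
#               "start_date": "...", "end_date": "..."}}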
@router.post("/ltv_model_export")
async def ltv_model_export(request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
analysis: XAnalysis = Depends(XAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
):
""" ltv分析 数据导出"""
await analysis.init(data_where=current_user.data_where)
data = analysis.ltv_model_sql()
file_name = quote(f'lvt.xlsx')
mime = mimetypes.guess_type(file_name)[0]
sql = data['sql']
df = await ckdb.query_dataframe(sql)
if df.empty:
return schemas.Msg(code=-9, msg='查无数据')
df_to_stream = DfToStream((df, 'ltv'))
with df_to_stream as d:
export = d.to_stream()
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
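
# Example client call (a sketch: the path prefix depends on how this router is
# mounted, and auth is whatever deps.get_current_user expects):
#     import httpx
#     resp = httpx.post("http://localhost:8000/ltv_model_export",
#                       params={"game": "demo"},
#                       json={...},  # the XAnalysis request body
#                       headers={"Authorization": "Bearer <token>"})
#     open("ltv.xlsx", "wb").write(resp.content)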