309 lines
12 KiB
Python
309 lines
12 KiB
Python
from typing import Tuple
|
||
|
||
import arrow
|
||
import sqlalchemy as sa
|
||
import json
|
||
|
||
from fastapi import Depends
|
||
|
||
import pandas as pd
|
||
|
||
from sqlalchemy import func, or_, and_, not_, MetaData
|
||
|
||
import crud
|
||
import schemas
|
||
from core.config import settings
|
||
from db import get_database
|
||
from models.user_label import UserClusterDef
|
||
from db.redisdb import get_redis_pool, RedisDrive
|
||
|
||
|
||
class XAnalysis:
    """Prepare the filters of an LTV report and render its SQL statement.

    Typical use: construct, ``await init(...)`` to load the query definition
    (from a saved report or from the request itself), then
    ``await ltv_model_sql()`` to obtain the SQL text plus metadata.
    """

    def __init__(self, data_in: schemas.CkQuery, game: str):
        """Store the request and set every derived attribute to its empty default.

        :param data_in: query payload; ``report_id``, ``eventView``, ``events``
            and ``ext_filter`` are read from it by this class.
        :param game: game identifier; used as the database name in the SQL and
            as the key into the per-game ``data_where`` filters.
        """
        self.data_in = data_in
        self.game = game
        self.event_view = dict()
        self.events = []
        self.zone_time: int = 0
        self.global_filters = []
        self.account_filters = []
        self.global_relation = 'and'
        self.date_range = []

        # (filters, relation) pair in the shape expected by handler_filts().
        self.ext_filters = (self.data_in.ext_filter.get('filts', []), self.data_in.ext_filter.get('relation', 'and'))

    def _get_global_filters(self):
        """Return the ``filts`` list of ``event_view`` with ``tableType`` filled in.

        Every filter dict is updated in place: ``tableType`` becomes
        ``'user_label'`` when ``data_type`` is ``'user_label'``, otherwise it is
        copied from ``table_type``.

        :return: the (mutated) list of filter dicts, or an empty list when
            ``event_view`` has no ``filts`` or it is empty/``None``.
        :raises KeyError: if a non-``user_label`` filter has no ``table_type``.
        """
        filts = self.event_view.get('filts') or []
        for item in filts:
            # ``data_type`` is optional: filters without it fall back to
            # ``table_type`` instead of raising KeyError.
            if item.get('data_type') == 'user_label':
                item['tableType'] = 'user_label'
            else:
                item['tableType'] = item['table_type']
        return filts

    async def init(self, *args, **kwargs):
        """Load the query definition and derive date range and filters.

        When ``data_in.report_id`` is set, the saved report is fetched and its
        start/end times are recomputed relative to "now" from ``s_days`` /
        ``e_days``; otherwise the definition is taken from ``data_in``.

        :param kwargs: optional ``data_where`` mapping of game -> list of
            account-level filters; the entry for ``self.game`` is stored in
            ``self.account_filters``.
        """
        if self.data_in.report_id:
            db = get_database()
            report = await crud.report.get(db, id=self.data_in.report_id)
            self.event_view = report['query']['eventView']
            self.events = report['query']['events']
            try:
                e_days = self.event_view['e_days']
                s_days = self.event_view['s_days']
            except KeyError:
                # Backward compatibility: older reports only store
                # ``recentDay`` as an "<e_days>-<s_days>" string.
                e_days, s_days = self.event_view['recentDay'].split('-')

            # Relative window: N days back from today, whole days.
            # (An earlier revision shifted by ``-days + 1`` instead.)
            self.event_view['endTime'] = arrow.get().shift(days=-int(e_days)).strftime('%Y-%m-%d 23:59:59')
            self.event_view['startTime'] = arrow.get().shift(days=-int(s_days)).strftime('%Y-%m-%d 00:00:00')
        else:
            self.event_view = self.data_in.eventView
            self.events = self.data_in.events

        # One ``date`` per day between startTime and endTime.
        self.date_range.extend(
            day.date()
            for day in pd.date_range(self.event_view['startTime'], self.event_view['endTime'], freq='D', tz='UTC')
        )

        self.global_filters = self._get_global_filters()
        self.global_relation = self.event_view.get('relation', 'and')

        # Filters attached to the calling account, keyed by game.
        if 'data_where' in kwargs:
            self.account_filters = kwargs['data_where'].get(self.game, [])

    @staticmethod
    def _build_condition(col, comparator, ftv):
        """Translate one ``(column, comparator, values)`` triple into a condition.

        :param col: SQLAlchemy column the comparison applies to.
        :param comparator: one of ``==, >=, <=, >, <, is not null, is null,
            like, not like, in, !=``.
        :param ftv: list of filter values; only the first one is used except
            for ``==`` (OR over all values) and ``in``.
        :return: the SQLAlchemy expression, or ``None`` for an unknown comparator.
        """
        if comparator == '==':
            if len(ftv) > 1:
                return or_(*[col == value for value in ftv])
            return col == ftv[0]
        if comparator == '>=':
            return col >= ftv[0]
        if comparator == '<=':
            return col <= ftv[0]
        if comparator == '>':
            return col > ftv[0]
        if comparator == '<':
            return col < ftv[0]
        if comparator == 'is not null':
            return col.isnot(None)
        if comparator == 'is null':
            return col.is_(None)
        if comparator == 'like':
            return col.like(f'%{ftv[0]}%')
        if comparator == 'not like':
            return col.notlike(f'%{ftv[0]}%')
        if comparator == 'in':
            return col.in_(ftv)
        if comparator == '!=':
            return col != ftv[0]
        return None

    @staticmethod
    def _where_sql(conditions):
        """Compile ``SELECT ... WHERE <conditions>`` to SQL text with inlined literals."""
        return str(sa.select().where(*conditions).compile(compile_kwargs={"literal_binds": True}))

    async def handler_filts(self, *filters):
        """Convert filter groups into SQLAlchemy conditions.

        :param filters: any number of ``(filts, relation)`` pairs, where
            ``filts`` is a list of filter dicts (keys ``tableType``,
            ``columnName``, ``comparator`` and, for column comparisons,
            ``ftv``) and ``relation`` is ``'and'`` or anything else for OR.
        :return: list with one combined condition per group that produced at
            least one event-level condition.
        """
        event_filters = []
        for group in filters:
            filts = group[0]
            relation = group[1]
            # NOTE(review): conditions on the ``user`` table are built but never
            # returned, so they do not reach the generated SQL. Kept as in the
            # original; confirm this is intended.
            user_filter = []
            event_filter = []
            for item in filts:
                comparator = item['comparator']
                table_type = item['tableType']
                if table_type == 'user':
                    where = user_filter
                elif table_type == 'event':
                    where = event_filter
                elif table_type == 'user_label':
                    # A user label becomes an (NOT) IN sub-query on the
                    # account id; ``columnName`` identifies the label.
                    user_cluster_def = UserClusterDef(self.game, item['columnName'], self.account_filters)
                    await user_cluster_def.init()
                    sub_qry = user_cluster_def.to_sql_qry()
                    account_col = sa.Column('#account_id')
                    if comparator == 'in':
                        event_filter.append(account_col.in_(sub_qry))
                    else:
                        event_filter.append(account_col.notin_(sub_qry))
                    continue
                else:
                    # Unknown table types are ignored.
                    continue

                condition = self._build_condition(sa.Column(item['columnName']), comparator, item['ftv'])
                if condition is not None:
                    where.append(condition)

            if event_filter:
                combine = and_ if relation == 'and' else or_
                event_filters.append(combine(*event_filter))

        return event_filters

    async def ltv_model_sql(self):
        """Render the LTV SQL for the configured registration window.

        The registration cohort is selected by ``startTime``/``endTime``; pay
        events are deliberately not limited to that window, because the window
        only defines who registered while payments count up to the present.

        :return: dict with ``sql`` (statement text), ``quota`` (identity column
            used for counting), ``start_date`` / ``end_date`` (``YYYY-MM-DD``),
            ``date_range`` (list of dates) and ``ltv_n`` (list of day offsets).
        :raises ValueError: if ``quota`` contains a backtick.
        """
        quota = self.event_view['quota']
        # ``quota`` comes from the request and is interpolated as a
        # backtick-quoted identifier; a backtick would break out of the quoting.
        if '`' in str(quota):
            raise ValueError(f'invalid quota column: {quota!r}')
        # NOTE(review): ``self.game`` is interpolated unquoted as the database
        # name; make sure callers only pass trusted values.

        # Day offsets for which an LTV column is produced.
        ltv_n = [*range(1, 61), 70, 75, 80, 85, 90, 95, 100, 110, 120, 150, 180, 210, 240, 270, 300, 360]

        # LTV<i> is '-' while the cohort is younger than i-1 days.
        select_ltv_str = ','.join(
            f"if(dateDiff('day', reg.date, now())<{i - 1}, '-',toString(round(sumpay_{i} / cnt1, 2))) AS LTV{i}"
            for i in ltv_n
        )
        sumpay_str = ','.join(
            f"sum(if(dateDiff('day', a.date, b.date) < {i}, money, 0)) as sumpay_{i}"
            for i in ltv_n
        )
        sum_money_str = ','.join(f"sumpay_{i}" for i in ltv_n)

        # Registration-date window (date part only).
        where = [
            sa.Column('date') >= self.event_view['startTime'].split(' ')[0],
            sa.Column('date') <= self.event_view['endTime'].split(' ')[0],
        ]
        if quota == '#distinct_id':
            where.append(sa.Column('is_new_device') == 1)
        where_str = self._where_sql(where).split('WHERE ')[1]

        # Global filters of the report, combined by the global relation.
        where_order = await self.handler_filts((self.global_filters, self.global_relation))
        where_order_str = '1'
        where_account_str = '1'
        if where_order:
            parts = self._where_sql(where_order).split('WHERE ')
            where_order_str = 'WHERE '.join(parts[1:])
            # The account-level clause is derived from the same filters.
            # NOTE(review): this keeps only the text between the first and
            # second 'AND', or drops a lone ``orderid`` filter; it is a textual
            # heuristic preserved from the original, so verify it against the
            # filters actually in use.
            where_account_str = parts[1]
            if "AND" in where_account_str:
                where_account_str = where_account_str.split('AND')[1]
            elif "orderid" in where_account_str:
                where_account_str = "1=1"

        # Event that marks a registration, depending on game and quota.
        if self.game == 'huixie' and quota == '#distinct_id':
            event_n = 'new_device'
        elif self.game == 'yxwd_h5':
            event_n = 'role_create'
        else:
            event_n = 'create_account'

        sql = f"""SELECT reg.date as date,
       cnt1,
       {select_ltv_str},
       {sum_money_str}
FROM (SELECT toDate(addHours(`#event_time`, `#zone_offset`)) as date, uniqExact(`{quota}`) cnt1
      FROM {self.game}.event
      where `#event_name` = '{event_n}'
        AND {where_str} AND {where_account_str}
      GROUP BY toDate(addHours(`#event_time`, `#zone_offset`))) as reg
         left join
     (select a.date,
             {sumpay_str}
      from (SELECT toDate(addHours(`#event_time`, `#zone_offset`)) as date, `{quota}`
            FROM {self.game}.event
            where `#event_name` = '{event_n}'
              AND {where_str} AND {where_account_str} ) as a
               left join (select `{quota}`, unitPrice/100 as money, toDate(addHours(`#event_time`, `#zone_offset`)) as date
                          from {self.game}.event
                          where `#event_name` = 'pay' and {where_order_str} AND {where_account_str}) b
                         on a.`{quota}` = b.`{quota}`
      group by a.date) log on reg.date = log.date
order by date
"""
        print(sql)
        return {
            'sql': sql,
            'quota': quota,
            'start_date': self.event_view['startTime'][:10],
            'end_date': self.event_view['endTime'][:10],
            'date_range': self.date_range,
            'ltv_n': ltv_n,
        }
|