# Behavior-analysis query builder (≈835 lines, 35 KiB, Python)
import re
|
||
from typing import Tuple
|
||
|
||
import arrow
|
||
import sqlalchemy as sa
|
||
import json
|
||
|
||
from fastapi import Depends
|
||
|
||
import pandas as pd
|
||
import numpy as np
|
||
|
||
from sqlalchemy import func, or_, and_, not_
|
||
|
||
import crud
|
||
import schemas
|
||
from core.config import settings
|
||
from db import get_database
|
||
|
||
from db.redisdb import get_redis_pool, RedisDrive
|
||
from models.user_label import UserClusterDef
|
||
|
||
|
||
class CombinationEvent:
    """Evaluate an arithmetic combination of pre-computed event series.

    ``string`` is a formula such as ``"0+1"`` whose integer factors index
    into ``data`` (a sequence of dicts each carrying a ``'values'`` list of
    series); the operators between factors are applied pairwise, left to
    right, via ``settings.ARITHMETIC``.
    """

    def __init__(self, data, string, format):
        self.data = data
        self.string = string
        # Raw string: '\-' in a plain string is an invalid escape
        # (DeprecationWarning, SyntaxWarning on newer Pythons).
        self.pattern = re.compile(r'[+\-*/]')
        self.format = format
        self.events_name = []

    def parse(self):
        """Evaluate the formula over the referenced series.

        Returns:
            tuple: (per-bucket value list, rounded sum, rounded mean).
        """
        opts = self.pattern.findall(self.string)
        factors = self.pattern.split(self.string)
        # Seed with the first factor's series, then fold the remaining
        # factors in strictly left-to-right (no operator precedence).
        result = pd.Series(self.data[int(factors[0])]['values'][0])
        for i, opt in enumerate(opts):
            b = pd.Series(self.data[int(factors[i + 1])]['values'][0])
            # NaN from element-wise ops (e.g. 0/0) is zeroed immediately;
            # infinities are zeroed after formatting below.
            result = settings.ARITHMETIC[opt](result, b).fillna(0)
        if self.format == 'percent':
            result = round(result * 100, 2)
        elif self.format == 'float':
            result = round(result, 2)
        elif self.format == 'integer':
            result = result.astype(int)
        # Zero out both +inf and -inf — the original only handled +inf,
        # but a negative numerator divided by zero yields -inf.
        result.replace([np.inf, -np.inf], 0, inplace=True)
        return result.to_list(), round(result.sum(), 2), round(result.mean(), 2)
|
||
|
||
|
||
class CustomEvent:
    """Build a SQLAlchemy select expression from a custom-event formula.

    Formula factors are dot-separated specs resolved against the event
    table ``tbl``:

    * ``event.attr.agg`` — aggregate ``agg`` of ``attr`` for rows of ``event``;
    * ``event.metric``   — a predefined metric (``total_count``,
      ``touch_user_count`` or ``touch_user_avg``);
    * a bare integer     — a numeric constant.
    """

    def __init__(self, tbl, string, format):
        self.tbl = tbl
        self.string = string
        # Raw string: '\-' in a plain string is an invalid escape.
        self.pattern = re.compile(r'[+\-*/]')
        self.format = format
        self.events_name = []

    def _parse(self, s):
        """Resolve one formula factor into a SQLAlchemy expression (or int)."""
        m = s.split('.')
        if len(m) == 3:
            event_name, attr, comp = m
            self.events_name.append(event_name)
            # agg(if(#event_name == event, attr, 0))
            return getattr(func, comp)(getattr(func, 'if')(getattr(self.tbl.c, '#event_name') == event_name,
                                                           getattr(self.tbl.c, attr), 0))
        elif len(m) == 2:
            event_name, comp = m
            self.events_name.append(event_name)
            # Total number of occurrences of the event.
            if comp == 'total_count':
                return func.sum(getattr(func, 'if')(getattr(self.tbl.c, '#event_name') == event_name, 1, 0))
            elif comp == 'touch_user_count':
                return func.uniqCombined(getattr(func, 'if')(getattr(self.tbl.c, '#event_name') == event_name,
                                                             getattr(self.tbl.c, '#account_id'), None))
            elif comp == 'touch_user_avg':
                # occurrences / distinct touched accounts
                return func.divide(
                    func.sum(getattr(func, 'if')(getattr(self.tbl.c, '#event_name') == event_name, 1, 0)),
                    func.uniqCombined(getattr(func, 'if')(getattr(self.tbl.c, '#event_name') == event_name,
                                                          getattr(self.tbl.c, '#account_id'), None)))
        elif len(m) == 1:
            # Bare numeric constant.
            n = int(m[0])
            return n

    def str2obj(self, factors, opts):
        """Fold parsed factors together with their operators, left to right."""
        sel = None
        for i, factor in enumerate(factors):
            if i == 0:
                sel = self._parse(factor)
            else:
                tmp = self._parse(factor)
                sel = settings.ARITHMETIC[opts[i - 1]](sel, tmp)
        return sel

    def parse(self):
        """Parse the whole formula string.

        Returns:
            dict: ``event_name`` — events referenced by the formula;
            ``select`` — the rounded SQLAlchemy expression labelled 'values'.
        """
        factors = self.pattern.split(self.string)
        opts = self.pattern.findall(self.string)
        sel = self.str2obj(factors, opts)
        decimal = 2
        if self.format == 'percent':
            sel = sel * 100
        # Bug fix: the original compared the *builtin* ``format`` function
        # against these strings (always False), so 'integer' formulas never
        # got 0 decimal places. Compare the instance attribute instead.
        elif self.format == 'integer':
            decimal = 0
        elif self.format == 'float':
            decimal = 2
        sel = func.round(sel, decimal).label('values')
        res = {
            'event_name': self.events_name,
            'select': sel
        }
        return res
|
||
|
||
|
||
class BehaviorAnalysis:
    """Builds ClickHouse SQL for the behavior-analysis report models
    (event / retention / funnel / scatter / trace) from a query payload."""

    def __init__(self, game: str, data_in: schemas.CkQuery, rdb: RedisDrive = Depends(get_redis_pool)):
        # Game identifier; also used as the ClickHouse schema name.
        self.game = game
        # Redis handle used to fetch cached table-column metadata.
        self.rdb = rdb
        # SQLAlchemy Table objects, built later by _init_table().
        self.user_tbl = None
        self.event_tbl = None
        self.data_in = data_in
        # Query definition; filled in by init() from the request payload
        # or from a saved report.
        self.event_view = dict()
        self.events = [dict()]

        # Derived state, all computed by init().
        self.zone_time: int = 0
        self.start_date = None
        self.end_date = None
        self.global_filters = None
        self.groupby = None
        self.time_particle = None
        self.date_range = None
        self.unit_num = None
        self.report_name = None
        self.combination_event = []
        # Extra (filts, relation) pair carried alongside the main query.
        self.ext_filters = (self.data_in.ext_filter.get('filts', []), self.data_in.ext_filter.get('relation', 'and'))
        self.global_relation = 'and'
        # Per-game data restrictions; merged into global_filters by init().
        self.data_where = []
|
||
|
||
async def init(self, *args, **kwargs):
|
||
|
||
if self.data_in.report_id:
|
||
db = get_database()
|
||
report = await crud.report.get(db, id=self.data_in.report_id)
|
||
self.event_view = report['query']['eventView']
|
||
self.events = report['query']['events']
|
||
if self.event_view.get('date_type') == 'static':
|
||
pass
|
||
else:
|
||
try:
|
||
e_days = self.event_view['e_days']
|
||
s_days = self.event_view['s_days']
|
||
except:
|
||
# 兼容以前的
|
||
e_days, s_days = self.event_view['recentDay'].split('-')
|
||
|
||
self.event_view['endTime'] = arrow.get().shift(days=-int(e_days)).strftime('%Y-%m-%d 23:59:59')
|
||
self.event_view['startTime'] = arrow.get().shift(days=-int(s_days)).strftime('%Y-%m-%d 00:00:00')
|
||
|
||
self.event_view['startTime'] = self.data_in.ext_filter.get('startTime') or self.event_view['startTime']
|
||
self.event_view['endTime'] = self.data_in.ext_filter.get('endTime') or self.event_view['endTime']
|
||
|
||
self.report_name = report["name"]
|
||
|
||
|
||
else:
|
||
self.event_view = self.data_in.eventView
|
||
self.events = self.data_in.events
|
||
|
||
await self._init_table()
|
||
self.zone_time = self._get_zone_time()
|
||
self.time_particle = self._get_time_particle_size()
|
||
self.start_date, self.end_date, self.date_range = self._get_date_range()
|
||
self.global_filters = self._get_global_filters()
|
||
self.groupby = self._get_group_by()
|
||
self.unit_num = self._get_unit_num()
|
||
self.global_relation = self.event_view.get('relation', 'and')
|
||
|
||
# 用户自带过滤
|
||
if 'data_where' in kwargs:
|
||
self.data_where = kwargs['data_where'].get(self.game, [])
|
||
self.global_filters.extend(self.data_where)
|
||
# self.global_filters.extend(self.data_in.ext_filter.get('filts', []))
|
||
|
||
def _get_time_particle_size(self):
|
||
return self.event_view.get('timeParticleSize') or 'P1D'
|
||
|
||
def _get_unit_num(self):
|
||
return self.event_view.get('unitNum')
|
||
|
||
def _get_group_by(self):
|
||
|
||
return [getattr(self.event_tbl.c, item['columnName']) for item in self.event_view.get('groupBy', [])]
|
||
|
||
def _get_zone_time(self):
|
||
return int(self.event_view.get('zone_time', 8))
|
||
|
||
def _get_date_range(self) -> Tuple[str, str, list]:
|
||
start_date: str = self.event_view.get('startTime')
|
||
end_date: str = self.event_view.get('endTime')
|
||
if self.time_particle == 'HOUR':
|
||
date_range = [i for i in range(24)]
|
||
return start_date, end_date, date_range
|
||
|
||
date_range = pd.date_range(start_date, end_date, freq=settings.PROPHET_TIME_GRAIN_MAP[self.time_particle],
|
||
tz='UTC').tolist()
|
||
if self.time_particle in ('P1D', 'P1W', 'P1M'):
|
||
date_range = [item.date() for item in date_range]
|
||
# start_date = date_range[0].strftime('%Y-%m-%d')
|
||
# end_date = date_range[-1].strftime('%Y-%m-%d')
|
||
|
||
return start_date, end_date, date_range
|
||
|
||
def _get_global_filters(self):
|
||
return self.event_view.get('filts') or []
|
||
|
||
async def _init_table(self):
|
||
"""
|
||
从redis中取出表字段,构建表结构
|
||
:return:
|
||
"""
|
||
res_json = await self.rdb.get(f'{self.game}_user')
|
||
columns = json.loads(res_json).keys()
|
||
metadata = sa.MetaData(schema=self.game)
|
||
self.user_tbl = sa.Table('user_view', metadata, *[sa.Column(column) for column in columns])
|
||
|
||
res_json = await self.rdb.get(f'{self.game}_event')
|
||
columns = json.loads(res_json).keys()
|
||
metadata = sa.MetaData(schema=self.game)
|
||
# self.event_tbl = sa.Table('event_view', metadata, *[sa.Column(column) for column in columns])
|
||
self.event_tbl = sa.Table('event', metadata, *[sa.Column(column) for column in columns])
|
||
|
||
    async def handler_filts(self, *filters):
        """Translate filter descriptors into SQLAlchemy clause lists.

        :param filters: any number of ``(filts, relation)`` pairs, where
            ``filts`` is a list of filter-item dicts (``tableType``,
            ``columnName``, ``comparator``, ``ftv``, optional ``data_type``)
            and ``relation`` ('and'/'or') combines the items of that pair.
        :return: ``(event_filters, user_filters)`` — clause lists targeting
            the event table and the user table respectively.
        """

        user_filters = []
        event_filters = []
        for filter in filters:
            filts = filter[0]
            relation = filter[1]
            user_filter = []
            event_filter = []
            for item in filts:
                comparator = item['comparator']
                # Route the clause into the matching table's bucket.
                if item['tableType'] == 'user':
                    where = user_filter
                elif item['tableType'] == 'event':
                    where = event_filter
                elif item['tableType'] == 'user_label':
                    # User-cluster labels become an account-id subquery
                    # against the cluster definition.
                    user_cluster_def = UserClusterDef(self.game, item['columnName'], self.data_where)
                    await user_cluster_def.init()
                    sub_qry = user_cluster_def.to_sql_qry()
                    if comparator == 'in':
                        event_filter.append(sa.Column('#account_id').in_(sub_qry))
                    else:
                        event_filter.append(sa.Column('#account_id').notin_(sub_qry))

                    continue
                else:
                    # Unknown table types are skipped silently.
                    continue

                tbl = getattr(self, f'{item["tableType"]}_tbl')
                col = getattr(tbl.c, item['columnName'])
                # Datetime columns: apply the timezone offset first.
                if item.get('data_type') == 'datetime':
                    col = func.addHours(col, self.zone_time)

                ftv = item['ftv']
                if comparator == '==':
                    # Multiple equality values become an OR chain.
                    if len(ftv) > 1:
                        where.append(or_(*[col == v for v in ftv]))
                    else:
                        where.append(col == ftv[0])
                elif comparator == '>=':
                    where.append(col >= ftv[0])
                elif comparator == '<=':
                    where.append(col <= ftv[0])
                elif comparator == '>':
                    where.append(col > ftv[0])
                elif comparator == '<':
                    where.append(col < ftv[0])

                elif comparator == 'is not null':
                    where.append(col.isnot(None))
                elif comparator == 'is null':
                    where.append(col.is_(None))

                elif comparator == 'like':
                    where.append(col.like(f'%{ftv[0]}%'))

                elif comparator == 'not like':
                    where.append(col.notlike(f'%{ftv[0]}%'))

                elif comparator == 'in':
                    where.append(col.in_(ftv))

                elif comparator == '!=':
                    where.append(col != ftv[0])
            # Combine this pair's clauses under its relation.
            if relation == 'and':
                if event_filter:
                    event_filters.append(and_(*event_filter))
                if user_filter:
                    # NOTE(review): the trailing comma turns this statement
                    # into a 1-tuple expression; harmless but likely a typo.
                    user_filters.append(and_(*user_filter)),
            else:
                if event_filter:
                    event_filters.append(or_(*event_filter))
                if user_filter:
                    user_filters.append(or_(*user_filter))

        return event_filters, user_filters
|
||
|
||
    async def retention_model_sql(self):
        """Build the retention-model SQL: per day, the distinct users doing
        event A (``val_a``), event B (``val_b``) and both lists' lengths.

        Returns a dict with the compiled SQL plus the metadata the caller
        needs to shape the result (groupby keys, date range, event names...).
        """
        event_name_a = self.events[0]['eventName']
        event_name_b = self.events[1]['eventName']
        # Optional alternative id column to count instead of #account_id.
        visit_name = self.events[0].get('event_attr_id')
        event_time_col = getattr(self.event_tbl.c, '#event_time')
        event_name_col = getattr(self.event_tbl.c, '#event_name')
        e_account_id_col = getattr(self.event_tbl.c, '#account_id')
        u_account_id_col = getattr(self.user_tbl.c, '#account_id')
        date_col = sa.Column('date')
        who_visit = e_account_id_col
        if visit_name:
            who_visit = getattr(self.event_tbl.c, visit_name)

        filters, _ = await self.handler_filts((self.events[0]['filts'], self.events[0].get('relation')),
                                              self.ext_filters)
        # ClickHouse and() needs at least one operand; 1 is always-true.
        filters = filters or [1]
        selectd = [func.toStartOfDay(func.addHours(event_time_col, self.zone_time)).label('date'),
                   *self.groupby,
                   # Distinct users that performed event A under the filters.
                   func.arrayDistinct(
                       (func.groupArray(
                           func.if_(func.and_(event_name_col == event_name_a, *filters), who_visit, None)))).label(
                       'val_a'),

                   func.length(sa.Column('val_a')).label('amount_a'),
                   func.length(sa.Column('val_b')).label('amount_b'),
                   ]

        # val_b is a 1-tuple (trailing comma) unpacked into insert() so it
        # lands between val_a and the two length columns.
        if event_name_b == '*':
            # '*' = any event counts as a return visit.
            val_b = func.arrayDistinct(
                (func.groupArray(func.if_(1, who_visit, None)))).label('val_b'),
            selectd.insert(-2, *val_b)
        else:
            val_b = func.arrayDistinct(
                (func.groupArray(func.if_(event_name_col == event_name_b, who_visit, None)))).label('val_b'),
            selectd.insert(-2, *val_b)

        # Date window evaluated in the report's timezone.
        base_where = [
            func.addHours(event_time_col, self.zone_time) >= self.start_date,
            func.addHours(event_time_col, self.zone_time) <= self.end_date,
        ]

        event_filter, user_filter = await self.handler_filts(
            (self.global_filters, self.global_relation),
            self.ext_filters
        )

        groupby = [date_col] + self.groupby
        oredrby = [date_col]
        # Join the user table only when user-side filters exist.
        if user_filter:
            qry = sa.select(selectd).select_from(
                self.event_tbl.join(self.user_tbl, u_account_id_col == e_account_id_col)).where(
                and_(*user_filter, *event_filter, *base_where)).group_by(*groupby).order_by(
                *oredrby).limit(10000)
        else:
            qry = sa.select(selectd).where(and_(*base_where, *event_filter)).group_by(*groupby).order_by(
                *oredrby).limit(10000)
        sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
        print(sql)
        return {'sql': sql,
                'groupby': ['date'] + [i.key for i in self.groupby],
                'date_range': self.date_range,
                'event_name': [event_name_a, event_name_b],
                'unit_num': self.unit_num,
                'time_particle': self.time_particle,
                'start_date': self.start_date[:10],
                'end_date': self.end_date[:10],
                }
|
||
|
||
    async def event_model_sql(self):
        """Build one SQL (or combination descriptor) per configured event.

        Three event kinds are handled:
        * ``customType == 'formula'`` — a CustomEvent formula compiled into
          a single select expression;
        * ``customType == 'combination'`` — no SQL here; emits a descriptor
          to be combined from other results downstream;
        * plain events — aggregated by the configured ``analysis``.

        Returns a list of dicts, each with the SQL and result-shaping
        metadata.
        """
        sqls = []
        event_time_col = getattr(self.event_tbl.c, '#event_time')

        for event in self.events:
            event_name_display = event.get('eventNameDisplay')
            is_show = event.get('is_show', True)

            select_exprs = []
            # 'total' reports have no time bucket column.
            if self.time_particle != 'total':
                select_exprs.append(
                    settings.TIME_GRAIN_EXPRESSIONS[self.time_particle](event_time_col, self.zone_time))

            base_where = [
                func.addHours(event_time_col, self.zone_time) >= self.start_date,
                func.addHours(event_time_col, self.zone_time) <= self.end_date,
            ]
            event_name_col = getattr(self.event_tbl.c, '#event_name')
            format = event.get('format') or 'float'

            # Backward compatibility with the old payload structure.
            if event.get('customEvent'):
                event['customType'] = event.get('customType') or 'formula'

            if event.get('customType') == 'formula':
                if event.get('customEvent'):
                    formula = event.get('customEvent')
                    custom = CustomEvent(self.event_tbl, formula, format).parse()
                    event_name = custom['event_name']
                    where = [event_name_col.in_(event_name)]
                    event_filter, _ = await self.handler_filts((event['filts'], event.get('relation')),
                                                               (self.global_filters, self.global_relation),
                                                               self.ext_filters
                                                               )
                    select_exprs.extend(self.groupby)
                    qry = sa.select(
                        *select_exprs,
                        custom['select']
                    ).where(*base_where, *where, *event_filter)

            # Combination of metrics: computed downstream from other
            # results, so only a descriptor is emitted here.
            elif event.get('customType') == 'combination':
                sqls.append({'combination_event': event.get('customEvent'),
                             'time_particle': self.time_particle,
                             'start_date': self.start_date[:10],
                             'end_date': self.end_date[:10],
                             'event_name': event.get('eventNameDisplay'),
                             'format': event.get('format') or 'float',
                             'date_range': self.date_range,
                             'is_show': is_show,
                             }
                            )
                continue
            else:
                event_name = event['event_name']

                select_exprs += self.groupby
                if event_name != '*':
                    base_where.append(event_name_col == event_name)

                analysis = event['analysis']
                event_filter, user_filter = await self.handler_filts(
                    (event['filts'], event.get('relation', 'and')),
                    (self.global_filters, self.global_relation)
                    , self.ext_filters
                )

                u_account_id_col = getattr(self.user_tbl.c, '#account_id')
                # Aggregate by account id.
                e_account_id_col = getattr(self.event_tbl.c, '#account_id')

                # Aggregation method.
                if analysis == 'total_count':
                    selectd = select_exprs + [func.count().label('values')]
                elif analysis == 'touch_user_count':
                    selectd = select_exprs + [func.count(sa.distinct(e_account_id_col)).label('values')]
                elif analysis == 'touch_user_avg':
                    selectd = select_exprs + [
                        func.round((func.count() / func.count(sa.distinct(e_account_id_col))), 2).label(
                            'values')]
                else:
                    # Any other analysis is treated as a SQL aggregate
                    # function applied to the chosen event attribute.
                    selectd = select_exprs + [
                        func.round(getattr(func, analysis)(getattr(self.event_tbl.c, event['event_attr_id'])), 2).label(
                            'values')]

                # Join the user table only when user-side filters exist.
                if user_filter:
                    qry = sa.select(selectd).select_from(
                        self.event_tbl.join(self.user_tbl, u_account_id_col == e_account_id_col)).where(
                        and_(*user_filter, *event_filter, *base_where))

                else:
                    qry = sa.select(selectd).where(and_(*event_filter, *base_where))

            # Shared tail for formula and plain events (combination events
            # 'continue' above): group, order, compile and record the SQL.
            qry = qry.group_by(*select_exprs)
            if self.time_particle != 'total':
                qry = qry.order_by(sa.Column('date'))
            else:
                qry = qry.order_by(sa.Column('values').desc())
            qry = qry.limit(10000)

            sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
            print(sql)
            sqls.append({'sql': sql,
                         'groupby': [i.key for i in self.groupby],
                         'date_range': self.date_range,
                         'event_name': event_name_display or event_name,
                         'format': format,
                         'report_name': self.report_name or 'temp',
                         'time_particle': self.time_particle,
                         'start_date': self.start_date[:10],
                         'end_date': self.end_date[:10],
                         'is_show': is_show,
                         })

        return sqls
|
||
|
||
    async def funnel_model_sql(self):
        """Build the funnel-model SQL using ClickHouse's windowFunnel.

        Example of the generated shape:

        SELECT level, count(*) AS values
        FROM (SELECT windowFunnel(86400)(shjy.event."#event_time", shjy.event."#event_name" = 'create_role',
                                         shjy.event."#event_name" = 'login') AS level
              FROM shjy.event
              WHERE addHours(shjy.event."#event_time", 8) >= '2021-05-16 00:00:00'
                AND addHours(shjy.event."#event_time", 8) <= '2021-06-14 23:59:59'
              GROUP BY shjy.event."#account_id") AS anon_1
        GROUP BY level
        ORDER BY level

        :return: dict with the SQL and result-shaping metadata.
        """

        # Funnel window, configured in days, converted to seconds.
        windows_gap = self.event_view['windows_gap'] * 86400
        event_time_col = getattr(self.event_tbl.c, '#event_time')
        event_name_col = getattr(self.event_tbl.c, '#event_name')
        date_col = func.toStartOfDay(func.addHours(event_time_col, self.zone_time)).label('date')
        e_account_id_col = getattr(self.event_tbl.c, '#account_id')

        sub_group = [date_col, *self.groupby, e_account_id_col]
        conds = []
        cond_level = []
        # One condition per funnel step, in configured order.
        for item in self.events:
            event_filter, _ = await self.handler_filts((item['filts'], item.get('relation', 'and'))
                                                       , self.ext_filters)
            conds.append(
                and_(event_name_col == item['eventName'], *event_filter)
            )
            cond_level.append(item['eventName'])
        # TODO: replace the _windows_gap_ placeholder (see string
        # substitution on the compiled SQL below — SQLAlchemy cannot emit
        # ClickHouse's parameterized-function syntax directly).
        subq = sa.select(*[sa.Column(i.key) for i in self.groupby], date_col,
                         func.windowFunnel_windows_gap__(event_time_col, *conds).label('level')).select_from(
            self.event_tbl)

        g_event_filter, _ = await self.handler_filts((self.global_filters, self.global_relation)
                                                     , self.ext_filters)
        where = [
            func.addHours(event_time_col, self.zone_time) >= self.start_date,
            func.addHours(event_time_col, self.zone_time) <= self.end_date,
            *g_event_filter
        ]
        subq = subq.where(and_(*where)).group_by(*sub_group)
        subq = subq.subquery()

        # Outer query: count accounts per (date, groupby, funnel level),
        # keeping only accounts that entered the funnel (level > 0).
        qry = sa.select(sa.Column('date'), *[sa.Column(i.key) for i in self.groupby], sa.Column('level'),
                        func.count().label('values')).select_from(subq) \
            .where(sa.Column('level') > 0) \
            .group_by(sa.Column('date'), *[sa.Column(i.key) for i in self.groupby], sa.Column('level')) \
            .order_by(sa.Column('date'), *[sa.Column(i.key) for i in self.groupby], sa.Column('level'))
        sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
        # sql = sql.replace('_windows_gap_', f"({windows_gap},'strict_increase')")
        sql = sql.replace('_windows_gap_', f"({windows_gap})")
        print(sql)
        return {'sql': sql,
                'groupby': [i.key for i in self.groupby],
                'date_range': self.date_range,
                'cond_level': cond_level,
                'time_particle': self.time_particle,
                'start_date': self.start_date[:10],
                'end_date': self.end_date[:10],
                }
|
||
|
||
    async def scatter_model_sql(self):
        """Build the scatter/distribution-model SQL for the first event.

        Two branches:
        * count-style analyses ('times', 'number_of_days',
          'number_of_hours') — per-account event counts;
        * quota-based analyses — an aggregate over the chosen attribute.

        NOTE(review): if ``analysis`` matches neither branch and no
        'quota' is set, the method implicitly returns None.
        """
        event = self.events[0]
        event_name = event['eventName']
        analysis = event['analysis']
        e_account_id_col = getattr(self.event_tbl.c, '#account_id').label('uid')
        u_account_id_col = getattr(self.user_tbl.c, '#account_id')
        event_name_col = getattr(self.event_tbl.c, '#event_name')
        event_time_col = getattr(self.event_tbl.c, '#event_time').label('date')
        event_date_col = settings.TIME_GRAIN_EXPRESSIONS[self.time_particle](event_time_col, self.zone_time)

        # Optional bucket boundaries for the distribution chart.
        quota_interval_arr = event.get('quotaIntervalArr')

        where = [
            # event_date_col >= self.start_date,
            # event_date_col <= self.end_date,
            func.addHours(event_time_col, self.zone_time) >= self.start_date,
            func.addHours(event_time_col, self.zone_time) <= self.end_date,

        ]
        if event_name != '*':
            where.append(event_name_col == event_name)
        event_filter, user_filter = await self.handler_filts((event['filts'], event.get('relation', 'and')),
                                                             (self.global_filters, self.global_relation)
                                                             , self.ext_filters)
        # User-side filters become an account-id subquery (no join here).
        if user_filter:
            where.append(e_account_id_col.in_(sa.select(u_account_id_col).where(*user_filter)))
        where.extend(event_filter)
        values_col = func.count().label('values')
        if analysis in ['number_of_days', 'number_of_hours']:
            values_col = func.count(func.distinct(e_account_id_col)).label('values')

        if analysis in ['times', 'number_of_days', 'number_of_hours']:
            # Grouping by account yields one count per user (per bucket).
            if self.time_particle == 'total':
                qry = sa.select(*self.groupby, values_col) \
                    .where(and_(*where)) \
                    .group_by(*self.groupby, e_account_id_col)
            else:
                qry = sa.select(event_date_col, *self.groupby, values_col) \
                    .where(and_(*where)) \
                    .group_by(event_date_col, *self.groupby, e_account_id_col)

            sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
            print(sql)
            return {
                'sql': sql,
                'interval_type': event['intervalType'],
                'analysis': analysis,
                'quota_interval_arr': quota_interval_arr,
                'groupby': [i.key for i in self.groupby],
                'time_particle': self.time_particle,
                'start_date': self.start_date[:10],
                'end_date': self.end_date[:10],
            }
        elif event.get('quota'):
            # Aggregate the chosen attribute per account.
            event_attr_col = getattr(self.event_tbl.c, event['quota'])
            if self.time_particle == 'total':
                qry = sa.select(e_account_id_col,
                                settings.CK_FUNC[analysis](event_attr_col).label('values')) \
                    .where(and_(*where)) \
                    .group_by(*self.groupby, e_account_id_col)
            else:
                qry = sa.select(event_date_col, e_account_id_col,
                                settings.CK_FUNC[analysis](event_attr_col).label('values')) \
                    .where(and_(*where)) \
                    .group_by(event_date_col, *self.groupby, e_account_id_col)
            sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
            print(sql)
            return {
                'sql': sql,
                'interval_type': event['intervalType'],
                'analysis': analysis,
                'quota_interval_arr': quota_interval_arr,
                'groupby': [i.key for i in self.groupby],
                'time_particle': self.time_particle,
                'start_date': self.start_date[:10],
                'end_date': self.end_date[:10],
            }
|
||
|
||
    def trace_model_sql(self):
        """Build the event-trace (path analysis) SQL.

        Sessionizes each account's events around a source event, splitting
        chains wherever the gap between consecutive events exceeds the
        configured session interval, then counts the resulting chains.

        NOTE(review): here ``self.events`` is accessed as a dict
        (``event_names`` / ``source_event`` keys), unlike the list shape the
        other models use — presumably the trace payload differs; confirm
        against the request schema.
        """
        session_interval = self.event_view.get('session_interval')
        session_type = self.event_view.get('session_type')
        # Unit multipliers to convert the interval into seconds.
        session_type_map = {
            'minute': 60,
            'second': 1,
            'hour': 3600

        }
        # Session gap in seconds (defaults to minutes when type is unknown).
        interval_ts = session_interval * session_type_map.get(session_type, 60)
        event_names = self.events.get('event_names')
        source_event = self.events.get('source_event', {}).get('eventName')
        # 'initial_event' anchors chains at a starting event; otherwise
        # chains are anchored at a terminating event (sql_b).
        source_type = self.events.get('source_event', {}).get('source_type')

        # Chains that START with the source event.
        sql_a = f"""with
        '{source_event}' as start_event,
        {tuple(event_names)} as evnet_all,
        '{self.start_date}' as start_data,
        '{self.end_date}' as end_data
select event_chain,
       count() as values
from (with
          toUInt32(minIf(`#event_time`, `#event_name` = start_event)) AS start_event_ts,
          arraySort(
                  x ->
                      x.1,
                  arrayFilter(
                          x -> x.1 >= start_event_ts,
                          groupArray((toUInt32(`#event_time`), `#event_name`))
                      )
              ) AS sorted_events,
          arrayEnumerate(sorted_events) AS event_idxs,
          arrayFilter(
                  (x, y, z) -> z.1 >= start_event_ts and ((z.2 = start_event and y > {interval_ts}) or y > {interval_ts}) ,
                  event_idxs,
                  arrayDifference(sorted_events.1),
                  sorted_events
              ) AS gap_idxs,
          arrayMap(x -> x, gap_idxs) AS gap_idxs_,
          arrayMap(x -> if(has(gap_idxs_, x), 1, 0), event_idxs) AS gap_masks,
          arraySplit((x, y) -> y, sorted_events, gap_masks) AS split_events
      select `#account_id`,
             arrayJoin(split_events) AS event_chain_,
             arrayMap(x ->
                          x.2, event_chain_) AS event_chain,
             has(event_chain, start_event) AS has_midway_hit

      from (select `#event_time`, `#event_name`, `#account_id`
            from {self.game}.event
            where addHours(`#event_time`, {self.zone_time}) >= start_data
              and addHours(`#event_time`, {self.zone_time}) <= end_data
              and `#event_name` in evnet_all)
      group by `#account_id`
      HAVING has_midway_hit = 1
      )
where arrayElement(event_chain, 1) = start_event
GROUP BY event_chain
ORDER BY values desc
"""
        # Chains that END with the source event.
        sql_b = f"""with
        '{source_event}' as end_event,
        {tuple(event_names)} as evnet_all,
        '{self.start_date}' as start_data,
        '{self.end_date}' as end_data
select event_chain,
       count() as values
from (with
          toUInt32(maxIf(`#event_time`, `#event_name` = end_event)) AS end_event_ts,
          arraySort(
                  x ->
                      x.1,
                  arrayFilter(
                          x -> x.1 <= end_event_ts,
                          groupArray((toUInt32(`#event_time`), `#event_name`))
                      )
              ) AS sorted_events,
          arrayEnumerate(sorted_events) AS event_idxs,
          arrayFilter(
                  (x, y, z) -> z.1 <= end_event_ts and (z.2 = end_event and y>{interval_ts}) OR y > {interval_ts},
                  event_idxs,
                  arrayDifference(sorted_events.1),
                  sorted_events
              ) AS gap_idxs,
          arrayMap(x -> x+1, gap_idxs) AS gap_idxs_,
          arrayMap(x -> if(has(gap_idxs_, x), 1,0), event_idxs) AS gap_masks,
          arraySplit((x, y) -> y, sorted_events, gap_masks) AS split_events
      select `#account_id`,
             arrayJoin(split_events) AS event_chain_,
             arrayMap(x ->
                          x.2, event_chain_) AS event_chain,
             has(event_chain, end_event) AS has_midway_hit
      from (select `#event_time`, `#event_name`, `#account_id`
            from {self.game}.event
            where addHours(`#event_time`, {self.zone_time}) >= start_data
              and addHours(`#event_time`, {self.zone_time}) <= end_data
              and `#event_name` in evnet_all)
      group by `#account_id`
      HAVING has_midway_hit = 1
      )
where arrayElement(event_chain, -1) = end_event
GROUP BY event_chain
ORDER BY values desc"""

        sql = sql_a if source_type == 'initial_event' else sql_b
        print(sql)
        return {
            'sql': sql,
            'time_particle': self.time_particle,
            'start_date': self.start_date[:10],
            'end_date': self.end_date[:10],
        }
|
||
|
||
    async def retention_model_sql2(self):
        """Build the retention SQL (raw-string variant).

        Produces, per registration date, the initial cohort size (``cnt0``)
        and for every offset N in ``retention_n``: returners (``cntN``),
        retention rate (``pN``), churned (``onN``) and churn rate (``opN``).
        Per-event filters are compiled to SQL fragments via an empty
        SQLAlchemy select and spliced into the raw query.
        """
        filter_item_type = self.event_view.get('filter_item_type')
        filter_item = self.event_view.get('filter_item')
        event_name_a = self.events[0]['eventName']
        event_name_b = self.events[1]['eventName']

        # Optional alternative id column to count instead of #account_id.
        visit_name = self.events[0].get('event_attr_id')

        where, _ = await self.handler_filts((self.events[0]['filts'], self.events[0].get('relation', 'and')),
                                            (self.global_filters, self.global_relation)
                                            , self.ext_filters)
        # Render event-A filters to a raw WHERE fragment ('1' = no filter).
        where_a = '1'
        if where:
            qry = sa.select().where(*where)
            sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
            where_a = sql.split('WHERE ')[1]

        where, _ = await self.handler_filts((self.events[1]['filts'], self.events[1].get('relation', 'and')),
                                            (self.global_filters, self.global_relation)
                                            , self.ext_filters)
        where_b = '1'
        if where:
            qry = sa.select().where(*where)
            sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
            where_b = sql.split('WHERE ')[1]

        # Any event ('*') counts as a return visit.
        event_name_b = 1 if event_name_b == '*' else f"`#event_name` = '{event_name_b}'"

        days = (arrow.get(self.end_date).date() - arrow.get(self.start_date).date()).days
        keep = []
        cnt = []
        # Day offsets to report: every day up to 60, then sparse milestones.
        retention_n = [*[k for k in range(1, 60)], 70-1, 75-1, 80-1, 85-1, 90-1, 95-1, 100-1, 110-1, 120-1, 150-1, 180-1, 210-1, 240-1, 270-1, 300-1,
                       360-1]

        """
        cnt0-cnt1 as on1,
        round(on1 * 100 / cnt0, 2) as `0p1`,
        """

        for i in retention_n:
            keep.append(
                f"""cnt{i},
                round(cnt{i} * 100 / cnt0, 2) as `p{i}`,
                cnt0-cnt{i} as on{i},
                round(on{i} * 100 / cnt0, 2) as `op{i}`
                """)
            cnt.append(f"""sum(if(dateDiff('day',a.reg_date,b.visit_date)={i},1,0)) as cnt{i}""")
        keep_str = ','.join(keep)
        cnt_str = ','.join(cnt)

        sql = f"""
with '{event_name_a}' as start_event,
     {event_name_b} as retuen_visit,
     `{visit_name}` as visit,
     '{self.start_date}' as start_data,
     '{self.end_date}' as end_data,
     toDate(addHours(`#event_time`, {self.zone_time})) as date

select reg_date,
       cnt0 ,
       {keep_str}

from(select date, uniqExact(visit) as cnt0 from {self.game}.event
where `#event_name` = start_event and addHours(`#event_time`, {self.zone_time}) >= start_data and addHours(`#event_time`, {self.zone_time}) <= end_data and {where_a}
group by date) reg left join
(select a.reg_date,
       {cnt_str}
from (select date as reg_date, visit from {self.game}.event where `#event_name` = start_event and addHours(`#event_time`, {self.zone_time}) >= start_data and addHours(`#event_time`, {self.zone_time}) <= end_data and {where_a} group by reg_date, visit) a
left join (select date as visit_date, visit from {self.game}.event where retuen_visit and addHours(`#event_time`, {self.zone_time}) >= start_data group by visit_date, visit) b on
a.visit = b.visit
group by a.reg_date) log on reg.date=log.reg_date
"""
        print(sql)
        return {
            'sql': sql,
            'date_range': self.date_range,
            'unit_num': self.unit_num,
            'retention_n': retention_n,
            'filter_item_type': filter_item_type,
            'filter_item': filter_item,
            'time_particle': self.time_particle,
            'start_date': self.start_date[:10],
            'end_date': self.end_date[:10],
        }
|