backend/models/behavior_analysis.py
2022-05-13 11:10:44 +08:00

895 lines
39 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
from typing import Tuple
import arrow
import sqlalchemy as sa
import json
from fastapi import Depends
import pandas as pd
import numpy as np
from sqlalchemy import func, or_, and_, not_
import crud
import schemas
from core.config import settings
from db import get_database
from db.redisdb import get_redis_pool, RedisDrive
from models.user_label import UserClusterDef
class CombinationEvent:
    """Evaluate a combination-metric formula (e.g. ``'0+1/2'``) over
    pre-computed metric series.

    ``data`` is a list of per-metric results where ``data[i]['values'][0]``
    is the value series for metric *i*; ``string`` is an arithmetic formula
    whose operands are integer indexes into ``data``; ``format`` selects the
    output rounding: 'percent' | 'float' | 'integer'.
    """

    def __init__(self, data, string, format):
        self.data = data
        self.string = string
        # The four supported binary operators (raw string avoids the
        # ambiguous '\-' escape).
        self.pattern = re.compile(r'[+\-*/]')
        self.format = format
        self.events_name = []

    def parse(self):
        """Compute the formula and return ``(values_list, total, mean)``."""
        opts = self.pattern.findall(self.string)
        factors = self.pattern.split(self.string)
        result = pd.Series(self.data[int(factors[0])]['values'][0])
        for i, opt in enumerate(opts):
            b = pd.Series(self.data[int(factors[i + 1])]['values'][0])
            # settings.ARITHMETIC maps the operator symbol to a callable;
            # NaN (e.g. from 0/0) is treated as 0.
            result = settings.ARITHMETIC[opt](result, b).fillna(0)
        if self.format == 'percent':
            result = round(result * 100, 2)
        elif self.format == 'float':
            result = round(result, 2)
        elif self.format == 'integer':
            result = result.astype(int)
        # Fix: division by zero can also yield -inf (negative numerator);
        # the original only replaced +inf. Replace both with 0.
        result.replace([np.inf, -np.inf], 0, inplace=True)
        return result.to_list(), round(result.sum(), 2), round(result.mean(), 2)
class CustomEvent:
    """Build a SQLAlchemy select expression from a custom-event formula.

    ``string`` is an arithmetic expression whose operands reference event
    metrics as ``event.attr.agg`` (aggregate an attribute), ``event.agg``
    (count-style aggregates) or plain integer literals, e.g.
    ``pay.money.sum/login.total_count``. ``tbl`` is the event table.
    """

    def __init__(self, tbl, string, format):
        self.tbl = tbl
        self.string = string
        self.pattern = re.compile(r'[+\-*/]')
        self.format = format
        # Event names referenced by the formula, collected during parsing.
        self.events_name = []

    def _parse(self, s):
        """Translate a single formula operand into a SQL expression."""
        m = s.split('.')
        if len(m) == 3:
            # event.attribute.aggregation, e.g. pay.money.sum
            event_name, attr, comp = m
            self.events_name.append(event_name)
            return getattr(func, comp)(getattr(func, 'if')(getattr(self.tbl.c, '#event_name') == event_name,
                                                           getattr(self.tbl.c, attr), 0))
        elif len(m) == 2:
            event_name, comp = m
            self.events_name.append(event_name)
            # Total event count
            if comp == 'total_count':
                return func.sum(getattr(func, 'if')(getattr(self.tbl.c, '#event_name') == event_name, 1, 0))
            elif comp == 'touch_user_count':
                return func.uniqCombined(getattr(func, 'if')(getattr(self.tbl.c, '#event_name') == event_name,
                                                             getattr(self.tbl.c, '#account_id'), None))
            elif comp == 'touch_user_avg':
                return func.divide(
                    func.sum(getattr(func, 'if')(getattr(self.tbl.c, '#event_name') == event_name, 1, 0)),
                    func.uniqCombined(getattr(func, 'if')(getattr(self.tbl.c, '#event_name') == event_name,
                                                          getattr(self.tbl.c, '#account_id'), None)))
        elif len(m) == 1:
            # Bare integer literal operand.
            n = int(m[0])
            return n

    def str2obj(self, factors, opts):
        """Fold operands and operators left-to-right into one expression."""
        sel = None
        for i, factor in enumerate(factors):
            if i == 0:
                sel = self._parse(factor)
            else:
                tmp = self._parse(factor)
                sel = settings.ARITHMETIC[opts[i - 1]](sel, tmp)
        return sel

    def parse(self):
        """Return ``{'event_name': [...], 'select': labeled_expression}``."""
        factors = self.pattern.split(self.string)
        opts = self.pattern.findall(self.string)
        sel = self.str2obj(factors, opts)
        decimal = 2
        if self.format == 'percent':
            sel = sel * 100
        # Fix: the original compared the *builtin* ``format`` function with a
        # string (always False), so 'integer' output was never rounded to 0
        # decimal places. Compare against self.format instead.
        elif self.format == 'integer':
            decimal = 0
        elif self.format == 'float':
            decimal = 2
        sel = func.round(sel, decimal).label('values')
        res = {
            'event_name': self.events_name,
            'select': sel
        }
        return res
class BehaviorAnalysis:
def __init__(self, game: str, data_in: schemas.CkQuery, rdb: RedisDrive = Depends(get_redis_pool)):
    """Store the raw query payload; the real parsing happens in ``init()``."""
    self.game = game
    self.rdb = rdb
    self.data_in = data_in
    # Table objects are built later from the redis-cached column lists.
    self.user_tbl = None
    self.event_tbl = None
    # Query state, populated by init().
    self.event_view = {}
    self.events = [{}]
    self.zone_time: int = 0
    self.start_date = None
    self.end_date = None
    self.date_range = None
    self.time_particle = None
    self.global_filters = None
    self.global_relation = 'and'
    self.groupby = None
    self.unit_num = None
    self.report_name = None
    self.combination_event = []
    self.data_where = []
    # Externally supplied extra filters, as a (filts, relation) pair.
    ext = self.data_in.ext_filter
    self.ext_filters = (ext.get('filts', []), ext.get('relation', 'and'))
async def init(self, *args, **kwargs):
    """Load the query definition (inline, or from a saved report) and derive
    all per-request state: tables, timezone, date range, filters, grouping.

    ``kwargs['data_where']`` may carry per-game mandatory filters that are
    appended to the global filters (data-permission filtering).
    """
    if self.data_in.report_id:
        # Saved report: pull the stored query from the DB.
        db = get_database()
        report = await crud.report.get(db, id=self.data_in.report_id)
        self.event_view = report['query']['eventView']
        self.events = report['query']['events']
        if self.event_view.get('date_type') == 'static':
            # Fixed date range stored with the report — keep as-is.
            pass
        else:
            try:
                e_days = self.event_view['e_days']
                s_days = self.event_view['s_days']
            # Fix: was a bare ``except:`` — only the two dict lookups above
            # can fail here, so catch KeyError specifically.
            except KeyError:
                # Backward compatibility with the old 'recentDay' field,
                # e.g. '7-1' meaning "from 7 days ago to yesterday".
                e_days, s_days = self.event_view['recentDay'].split('-')
            self.event_view['endTime'] = arrow.get().shift(days=-int(e_days)).strftime('%Y-%m-%d 23:59:59')
            self.event_view['startTime'] = arrow.get().shift(days=-int(s_days)).strftime('%Y-%m-%d 00:00:00')
        # External overrides take precedence over the stored dates.
        self.event_view['startTime'] = self.data_in.ext_filter.get('startTime') or self.event_view['startTime']
        self.event_view['endTime'] = self.data_in.ext_filter.get('endTime') or self.event_view['endTime']
        self.report_name = report["name"]
    else:
        self.event_view = self.data_in.eventView
        self.events = self.data_in.events
    await self._init_table()
    self.zone_time = self._get_zone_time()
    self.time_particle = self._get_time_particle_size()
    self.start_date, self.end_date, self.date_range = self._get_date_range()
    self.global_filters = self._get_global_filters()
    self.groupby = self._get_group_by()
    self.unit_num = self._get_unit_num()
    self.global_relation = self.event_view.get('relation', 'and')
    # Game-level mandatory filters (e.g. data permissions).
    if 'data_where' in kwargs:
        self.data_where = kwargs['data_where'].get(self.game, [])
        self.global_filters.extend(self.data_where)
    # self.global_filters.extend(self.data_in.ext_filter.get('filts', []))
def _get_time_particle_size(self):
return self.event_view.get('timeParticleSize') or 'P1D'
def _get_unit_num(self):
return self.event_view.get('unitNum')
def _get_group_by(self):
return [getattr(self.event_tbl.c, item['columnName']) for item in self.event_view.get('groupBy', [])]
def _get_zone_time(self):
return int(self.event_view.get('zone_time', 8))
def _get_date_range(self) -> Tuple[str, str, list]:
start_date: str = self.event_view.get('startTime')
end_date: str = self.event_view.get('endTime')
if self.time_particle == 'HOUR':
date_range = [i for i in range(24)]
return start_date, end_date, date_range
date_range = pd.date_range(start_date, end_date, freq=settings.PROPHET_TIME_GRAIN_MAP[self.time_particle],
tz='UTC').tolist()
if self.time_particle in ('P1D', 'P1W', 'P1M'):
date_range = [item.date() for item in date_range]
# start_date = date_range[0].strftime('%Y-%m-%d')
# end_date = date_range[-1].strftime('%Y-%m-%d')
return start_date, end_date, date_range
def _get_global_filters(self):
return self.event_view.get('filts') or []
async def _init_table(self):
    """
    Build the user/event SQLAlchemy Table objects from the column names
    cached in redis under ``{game}_user`` / ``{game}_event``.
    :return:
    """
    user_cols = json.loads(await self.rdb.get(f'{self.game}_user')).keys()
    self.user_tbl = sa.Table('user_view', sa.MetaData(schema=self.game),
                             *(sa.Column(name) for name in user_cols))
    event_cols = json.loads(await self.rdb.get(f'{self.game}_event')).keys()
    # self.event_tbl = sa.Table('event_view', metadata, *[sa.Column(column) for column in columns])
    # Note: maps to the physical 'event' table, not the 'event_view'.
    self.event_tbl = sa.Table('event', sa.MetaData(schema=self.game),
                              *(sa.Column(name) for name in event_cols))
async def handler_filts(self, *filters):
    """Translate filter specifications into SQLAlchemy conditions.

    :param filters: each positional argument is a ``(filts, relation)``
        pair, where ``filts`` is a list of filter items and ``relation``
        is 'and' or 'or'. Items are routed by ``tableType`` to the event
        table, the user table, or — for 'user_label' — an IN / NOT IN
        subquery against the user-cluster definition.
    :return: ``(event_filters, user_filters)`` — one combined condition
        per input pair, for the event and user tables respectively.
    """
    user_filters = []
    event_filters = []
    # Fix: loop variable renamed from ``filter`` (shadowed the builtin).
    for flt in filters:
        filts = flt[0]
        relation = flt[1]
        user_filter = []
        event_filter = []
        for item in filts:
            comparator = item['comparator']
            if item['tableType'] == 'user':
                where = user_filter
            elif item['tableType'] == 'event':
                where = event_filter
            elif item['tableType'] == 'user_label':
                # User-cluster filters become an account-id subquery.
                user_cluster_def = UserClusterDef(self.game, item['columnName'], self.data_where)
                await user_cluster_def.init()
                sub_qry = user_cluster_def.to_sql_qry()
                if comparator == 'in':
                    event_filter.append(sa.Column('#account_id').in_(sub_qry))
                else:
                    event_filter.append(sa.Column('#account_id').notin_(sub_qry))
                continue
            else:
                # Unknown table type — skip the item.
                continue
            tbl = getattr(self, f'{item["tableType"]}_tbl')
            col = getattr(tbl.c, item['columnName'])
            # Datetime columns are compared in the query's timezone.
            if item.get('data_type') == 'datetime':
                col = func.addHours(col, self.zone_time)
            ftv = item['ftv']
            if comparator == '==':
                if len(ftv) > 1:
                    # Multiple equality values are OR-ed together.
                    where.append(or_(*[col == v for v in ftv]))
                else:
                    where.append(col == ftv[0])
            elif comparator == '>=':
                where.append(col >= ftv[0])
            elif comparator == '<=':
                where.append(col <= ftv[0])
            elif comparator == '>':
                where.append(col > ftv[0])
            elif comparator == '<':
                where.append(col < ftv[0])
            elif comparator == 'is not null':
                where.append(col.isnot(None))
            elif comparator == 'is null':
                where.append(col.is_(None))
            elif comparator == 'like':
                where.append(col.like(f'%{ftv[0]}%'))
            elif comparator == 'not like':
                where.append(col.notlike(f'%{ftv[0]}%'))
            elif comparator == 'in':
                where.append(col.in_(ftv))
            elif comparator == '!=':
                where.append(col != ftv[0])
        if relation == 'and':
            if event_filter:
                event_filters.append(and_(*event_filter))
            if user_filter:
                # Fix: the original statement ended with a stray trailing
                # comma, wrapping the append in a throwaway one-element
                # tuple (harmless but misleading).
                user_filters.append(and_(*user_filter))
        else:
            if event_filter:
                event_filters.append(or_(*event_filter))
            if user_filter:
                user_filters.append(or_(*user_filter))
    return event_filters, user_filters
async def retention_model_sql(self):
    """Build the retention-model SQL.

    Event A (``events[0]``) is the initial event and event B (``events[1]``)
    the return event; ``'*'`` for B means "any event". Per day (and per
    group) the query collects the distinct-visitor arrays for A and B plus
    both array lengths; the caller computes day-over-day retention from the
    returned arrays.
    """
    event_name_a = self.events[0]['eventName']
    event_name_b = self.events[1]['eventName']
    # Optional alternative identity attribute; defaults to #account_id.
    visit_name = self.events[0].get('event_attr_id')
    event_time_col = getattr(self.event_tbl.c, '#event_time')
    event_name_col = getattr(self.event_tbl.c, '#event_name')
    e_account_id_col = getattr(self.event_tbl.c, '#account_id')
    u_account_id_col = getattr(self.user_tbl.c, '#account_id')
    date_col = sa.Column('date')
    who_visit = e_account_id_col
    if visit_name:
        who_visit = getattr(self.event_tbl.c, visit_name)
    filters, _ = await self.handler_filts((self.events[0]['filts'], self.events[0].get('relation')),
                                          self.ext_filters)
    # Fall back to a constant-true condition when no filters apply.
    filters = filters or [1]
    selectd = [func.toStartOfDay(func.addHours(event_time_col, self.zone_time)).label('date'),
               *self.groupby,
               func.arrayDistinct(
                   (func.groupArray(
                       func.if_(func.and_(event_name_col == event_name_a, *filters), who_visit, None)))).label(
                   'val_a'),
               func.length(sa.Column('val_a')).label('amount_a'),
               func.length(sa.Column('val_b')).label('amount_b'),
               ]
    if event_name_b == '*':
        # Any event counts as a return visit.
        val_b = func.arrayDistinct(
            (func.groupArray(func.if_(1, who_visit, None)))).label('val_b'),
        selectd.insert(-2, *val_b)
    else:
        val_b = func.arrayDistinct(
            (func.groupArray(func.if_(event_name_col == event_name_b, who_visit, None)))).label('val_b'),
        selectd.insert(-2, *val_b)
    base_where = [
        func.addHours(event_time_col, self.zone_time) >= self.start_date,
        func.addHours(event_time_col, self.zone_time) <= self.end_date,
    ]
    event_filter, user_filter = await self.handler_filts(
        (self.global_filters, self.global_relation),
        self.ext_filters
    )
    groupby = [date_col] + self.groupby
    oredrby = [date_col]
    if user_filter:
        # User-property filters require joining the user table.
        qry = sa.select(selectd).select_from(
            self.event_tbl.join(self.user_tbl, u_account_id_col == e_account_id_col)).where(
            and_(*user_filter, *event_filter, *base_where)).group_by(*groupby).order_by(
            *oredrby).limit(10000)
    else:
        qry = sa.select(selectd).where(and_(*base_where, *event_filter)).group_by(*groupby).order_by(
            *oredrby).limit(10000)
    sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
    print(sql)
    return {'sql': sql,
            'groupby': ['date'] + [i.key for i in self.groupby],
            'date_range': self.date_range,
            'event_name': [event_name_a, event_name_b],
            'unit_num': self.unit_num,
            'time_particle': self.time_particle,
            'start_date': self.start_date[:10],
            'end_date': self.end_date[:10],
            }
async def event_model_sql(self):
    """Build one SQL statement (plus presentation metadata) per configured
    event metric.

    Each entry in ``self.events`` yields one dict in the returned list.
    'combination' entries generate no SQL of their own — they are passed
    through for post-aggregation arithmetic over the other metrics. Two
    hard-coded business metrics (payment rate, and new paying users by
    device) replace the generated SQL with hand-written ClickHouse queries.
    """
    sqls = []
    event_time_col = getattr(self.event_tbl.c, '#event_time')
    for event in self.events:
        operator_ = event.get('operator_val', '')
        # Display name shown at the head of this metric series
        event_name_display = event.get('eventNameDisplay')
        is_show = event.get('is_show', True)
        select_exprs = []
        if self.time_particle != 'total':
            select_exprs.append(
                settings.TIME_GRAIN_EXPRESSIONS[self.time_particle](event_time_col, self.zone_time))
        base_where = [
            func.addHours(event_time_col, self.zone_time) >= self.start_date,
            func.addHours(event_time_col, self.zone_time) <= self.end_date,
        ]
        event_name_col = getattr(self.event_tbl.c, '#event_name')
        format = event.get('format') or 'float'
        # Backward compatibility with the old payload structure
        if event.get('customEvent'):
            event['customType'] = event.get('customType') or 'formula'
        if event.get('customType') == 'formula':
            if event.get('customEvent'):
                # The custom formula expression
                formula = event.get('customEvent')
                custom = CustomEvent(self.event_tbl, formula, format).parse()
                event_name = custom['event_name']
                where = [event_name_col.in_(event_name)]
                event_filter, _ = await self.handler_filts((event['filts'], event.get('relation')),
                                                           (self.global_filters, self.global_relation),
                                                           self.ext_filters
                                                           )
                select_exprs.extend(self.groupby)
                qry = sa.select(
                    *select_exprs,
                    custom['select']
                ).where(*base_where, *where, *event_filter)
        # Combination of already-computed metrics: no SQL generated here
        elif event.get('customType') == 'combination':
            sqls.append({'combination_event': event.get('customEvent'),
                         'time_particle': self.time_particle,
                         'start_date': self.start_date[:10],
                         'end_date': self.end_date[:10],
                         'event_name': event.get('eventNameDisplay'),
                         'format': event.get('format') or 'float',
                         'date_range': self.date_range,
                         'is_show': is_show,
                         }
                        )
            continue
        else:
            event_name = event['event_name']
            select_exprs += self.groupby
            if event_name != '*':
                base_where.append(event_name_col == event_name)
            analysis = event['analysis']
            event_filter, user_filter = await self.handler_filts(
                (event['filts'], event.get('relation', 'and')),
                (self.global_filters, self.global_relation)
                , self.ext_filters
            )
            u_account_id_col = getattr(self.user_tbl.c, '#account_id')
            # Aggregate by account id
            e_account_id_col = getattr(self.event_tbl.c, '#account_id')
            if operator_ == '':
                # Aggregation kind
                if analysis == 'total_count':
                    selectd = select_exprs + [func.count().label('values')]
                elif analysis == 'touch_user_count':
                    selectd = select_exprs + [func.count(sa.distinct(e_account_id_col)).label('values')]
                elif analysis == 'touch_user_avg':
                    selectd = select_exprs + [
                        func.round((func.count() / func.count(sa.distinct(e_account_id_col))), 2).label(
                            'values')]
                else:
                    selectd = select_exprs + [
                        func.round(getattr(func, analysis)(getattr(self.event_tbl.c, event['event_attr_id'])), 2).label(
                            'values')]
            else:
                # A post-aggregation arithmetic adjustment was requested.
                operator_val = int(operator_)
                operator = event['operator']  # arithmetic operator symbol
                if analysis == 'total_count':
                    selectd = select_exprs + [settings.ARITHMETIC[operator](func.count(), operator_val).label('values')]
                elif analysis == 'touch_user_count':
                    selectd = select_exprs + [settings.ARITHMETIC[operator](func.count(sa.distinct(e_account_id_col)), operator_val).label('values')]
                elif analysis == 'touch_user_avg':
                    selectd = select_exprs + [
                        settings.ARITHMETIC[operator](func.round((func.count() / func.count(sa.distinct(e_account_id_col))), 2), operator_val).label(
                            'values')]
                else:
                    selectd = select_exprs + [
                        settings.ARITHMETIC[operator](func.round(getattr(func, analysis)(getattr(self.event_tbl.c, event['event_attr_id'])), 2), operator_val).label(
                            'values')]
            if user_filter:
                # User-property filters require joining the user table.
                qry = sa.select(selectd).select_from(
                    self.event_tbl.join(self.user_tbl, u_account_id_col == e_account_id_col)).where(
                    and_(*user_filter, *event_filter, *base_where))
            else:
                qry = sa.select(selectd).where(and_(*event_filter, *base_where))
        qry = qry.group_by(*select_exprs)
        if self.time_particle != 'total':
            qry = qry.order_by(sa.Column('date'))
        else:
            qry = qry.order_by(sa.Column('values').desc())
        qry = qry.limit(10000)
        sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
        print(sql)
        # Special case: payment rate gets a hand-written ClickHouse query
        if event.get('customEvent') == 'pay.touch_user_count/login.touch_user_count':
            stat_date = self.start_date
            end_date = self.end_date
            game = self.game
            sql = f"""
select aa.date as date,round((a/b)*100,2) as values from
(select toDate(addHours({game}.event."#event_time", 8)) AS date,uniqCombined(if({game}.event."#event_name" = 'pay', {game}.event."#account_id", NULL)) as a from {game}.event
WHERE addHours({game}.event."#event_time", 8) >= '{stat_date}' AND addHours({game}.event."#event_time", 8) <= '{end_date}'
AND {game}.event."#event_name"='pay' and orderid NOT LIKE '%GM%' GROUP BY toDate(addHours({game}.event."#event_time", 8))) as aa
LEFT join
(SELECT toDate(addHours({game}.event."#event_time", 8)) AS date, round(uniqExact({game}.event."#account_id"), 2) AS b
FROM {game}.event
WHERE addHours({game}.event."#event_time", 8) >= '{stat_date}' AND addHours({game}.event."#event_time", 8) <= '{end_date}'
GROUP BY toDate(addHours({game}.event."#event_time", 8))) as bb on aa.date = bb.date ORDER by date
"""
        # Special case: new paying user count, measured per device
        if event.get('event_attr') == '触发用户数' and ['is_new_device', 'orderid'] == [i['columnName'] for i in event.get('filts')]:
            stat_date = self.start_date
            end_date = self.end_date
            game = self.game
            sql = f"""SELECT toDate(addHours("#event_time", 8)) as date,
round(uniqExact("#distinct_id"), 2) AS values FROM
(SELECT toDate(addHours("#event_time", 8)) as date,"#event_time",`#event_name`,`#distinct_id`,`#account_id` from {game}.event WHERE
addHours("#event_time", 8) >= '{stat_date}' AND addHours("#event_time", 8) <= '{end_date}'
and `#event_name` = 'pay' and orderid NOT LIKE '%GM%') a
inner join
(SELECT toDate(addHours("#event_time", 8)) as date,"#event_time",is_new_device,`#distinct_id`,`#event_name`,`#account_id` from {game}.event WHERE
addHours("#event_time", 8) >= '{stat_date}' AND addHours("#event_time", 8) <= '{end_date}' and
`#event_name` = 'create_account' and is_new_device = 1) b on a.`#distinct_id`= b.`#distinct_id` and a.date = b.date
GROUP BY toDate(addHours("#event_time", 8))"""
        sqls.append({'sql': sql,
                     'groupby': [i.key for i in self.groupby],
                     'date_range': self.date_range,
                     'event_name': event_name_display or event_name,
                     'format': format,
                     'report_name': self.report_name or 'temp',
                     'time_particle': self.time_particle,
                     'start_date': self.start_date[:10],
                     'end_date': self.end_date[:10],
                     'is_show': is_show,
                     })
    return sqls
# Used by both the funnel-analysis and event-analysis models
async def funnel_model_sql(self):
    """
    Build the funnel-model SQL using ClickHouse ``windowFunnel``.

    Example of the generated shape:

    SELECT level, count(*) AS values
    FROM (SELECT windowFunnel(86400)(shjy.event."#event_time", shjy.event."#event_name" = 'create_role',
    shjy.event."#event_name" = 'login') AS level
    FROM shjy.event
    WHERE addHours(shjy.event."#event_time", 8) >= '2021-05-16 00:00:00'
    AND addHours(shjy.event."#event_time", 8) <= '2021-06-14 23:59:59'
    GROUP BY shjy.event."#account_id") AS anon_1
    GROUP BY level
    ORDER BY level
    :return:
    """
    # Funnel window is configured in days; convert to seconds.
    windows_gap = self.event_view['windows_gap'] * 86400
    event_time_col = getattr(self.event_tbl.c, '#event_time')
    event_name_col = getattr(self.event_tbl.c, '#event_name')
    date_col = func.toStartOfDay(func.addHours(event_time_col, self.zone_time)).label('date')
    e_account_id_col = getattr(self.event_tbl.c, '#account_id')
    sub_group = [date_col, *self.groupby, e_account_id_col]
    conds = []
    cond_level = []
    # One condition per funnel step, in configured order.
    for item in self.events:
        event_filter, _ = await self.handler_filts((item['filts'], item.get('relation', 'and'))
                                                   , self.ext_filters)
        conds.append(
            and_(event_name_col == item['eventName'], *event_filter)
        )
        cond_level.append(item['eventName'])
    # TODO: '_windows_gap_' in the function name is a placeholder; it is
    # replaced with the real window size in the compiled SQL string below.
    subq = sa.select(*[sa.Column(i.key) for i in self.groupby], date_col,
                     func.windowFunnel_windows_gap__(event_time_col, *conds).label('level')).select_from(
        self.event_tbl)
    g_event_filter, _ = await self.handler_filts((self.global_filters, self.global_relation)
                                                 , self.ext_filters)
    where = [
        func.addHours(event_time_col, self.zone_time) >= self.start_date,
        func.addHours(event_time_col, self.zone_time) <= self.end_date,
        *g_event_filter
    ]
    subq = subq.where(and_(*where)).group_by(*sub_group)
    subq = subq.subquery()
    qry = sa.select(sa.Column('date'), *[sa.Column(i.key) for i in self.groupby], sa.Column('level'),
                    func.count().label('values')).select_from(subq) \
        .where(sa.Column('level') > 0) \
        .group_by(sa.Column('date'), *[sa.Column(i.key) for i in self.groupby], sa.Column('level')) \
        .order_by(sa.Column('date'), *[sa.Column(i.key) for i in self.groupby], sa.Column('level'))
    sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
    # Strict-increase variant, kept for reference:
    # sql = sql.replace('_windows_gap_', f"({windows_gap},'strict_increase')")
    sql = sql.replace('_windows_gap_', f"({windows_gap})")
    print(sql)
    return {'sql': sql,
            'groupby': [i.key for i in self.groupby],
            'date_range': self.date_range,
            'cond_level': cond_level,
            'time_particle': self.time_particle,
            'start_date': self.start_date[:10],
            'end_date': self.end_date[:10],
            }
async def scatter_model_sql(self):
    """Build the distribution (scatter) model SQL.

    Produces one value per user (per time bucket unless the grain is
    'total'): a count for 'times' / 'number_of_days' / 'number_of_hours'
    analyses, otherwise an aggregate of the chosen quota attribute. The
    caller buckets the values into ``quota_interval_arr`` intervals.
    """
    event = self.events[0]
    event_name = event['eventName']
    analysis = event['analysis']
    # Distinct-collection style analyses are reduced with max server-side.
    if analysis in ['list_distinct', "set_distinct", "ele_distinct"]:
        analysis = 'max'
    e_account_id_col = getattr(self.event_tbl.c, '#account_id').label('uid')
    u_account_id_col = getattr(self.user_tbl.c, '#account_id')
    event_name_col = getattr(self.event_tbl.c, '#event_name')
    event_time_col = getattr(self.event_tbl.c, '#event_time').label('date')
    event_date_col = settings.TIME_GRAIN_EXPRESSIONS[self.time_particle](event_time_col, self.zone_time)
    quota_interval_arr = event.get('quotaIntervalArr')
    where = [
        # event_date_col >= self.start_date,
        # event_date_col <= self.end_date,
        func.addHours(event_time_col, self.zone_time) >= self.start_date,
        func.addHours(event_time_col, self.zone_time) <= self.end_date,
    ]
    if event_name != '*':
        where.append(event_name_col == event_name)
    event_filter, user_filter = await self.handler_filts((event['filts'], event.get('relation', 'and')),
                                                         (self.global_filters, self.global_relation)
                                                         , self.ext_filters)
    if user_filter:
        # User-property filters become an account-id subquery.
        where.append(e_account_id_col.in_(sa.select(u_account_id_col).where(*user_filter)))
    where.extend(event_filter)
    values_col = func.count().label('values')
    if analysis in ['number_of_days', 'number_of_hours']:
        values_col = func.count(func.distinct(e_account_id_col)).label('values')
    if analysis in ['times', 'number_of_days', 'number_of_hours']:
        if self.time_particle == 'total':
            qry = sa.select(*self.groupby, values_col) \
                .where(and_(*where)) \
                .group_by(*self.groupby, e_account_id_col)
        else:
            qry = sa.select(event_date_col, *self.groupby, values_col) \
                .where(and_(*where)) \
                .group_by(event_date_col, *self.groupby, e_account_id_col)
        sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
        print(sql)
        return {
            'sql': sql,
            'interval_type': event['intervalType'],
            'analysis': analysis,
            'quota_interval_arr': quota_interval_arr,
            'groupby': [i.key for i in self.groupby],
            'time_particle': self.time_particle,
            'start_date': self.start_date[:10],
            'end_date': self.end_date[:10],
        }
    elif event.get('quota'):
        # Aggregate a numeric event attribute per user.
        event_attr_col = getattr(self.event_tbl.c, event['quota'])
        if self.time_particle == 'total':
            qry = sa.select(e_account_id_col,
                            settings.CK_FUNC[analysis](event_attr_col).label('values')) \
                .where(and_(*where)) \
                .group_by(*self.groupby, e_account_id_col)
        else:
            # qry = sa.select(event_date_col, e_account_id_col,
            #                 settings.CK_FUNC[analysis](event_attr_col).label('values')) \
            #     .where(and_(*where)) \
            #     .group_by(event_date_col, *self.groupby, e_account_id_col)
            qry = sa.select(event_date_col, e_account_id_col,
                            settings.CK_FUNC[analysis](event_attr_col).label('values')) \
                .where(and_(*where)) \
                .group_by(event_date_col, e_account_id_col)
        sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
        columnName = event['label_id']
        # NOTE(review): label_id is spliced into the SQL text verbatim
        # (prepended to SELECT and appended to GROUP BY); assumed to be a
        # trusted column name — confirm upstream validation.
        if columnName != '':
            sql = sql.replace('SELECT', f'SELECT {columnName},', 1)
            sql += f',{columnName}'
        print(sql)
        return {
            'sql': sql,
            'interval_type': event['intervalType'],
            'analysis': analysis,
            'quota_interval_arr': quota_interval_arr,
            'groupby': [i.key for i in self.groupby],
            'time_particle': self.time_particle,
            'start_date': self.start_date[:10],
            'end_date': self.end_date[:10],
        }
async def trace_model_sql(self):
    """Build the path-tracing (event chain) model SQL.

    Sessions are split when the gap between consecutive events exceeds the
    configured interval; chains are anchored either at a start event
    (``source_type == 'initial_event'``, query A) or at an end event
    (query B), and counted per distinct chain.

    NOTE(review): ``{tuple(event_names)}`` renders a single-element list as
    ``('x',)`` whose trailing comma may not be valid ClickHouse — presumably
    the UI guarantees at least two events; confirm.
    """
    session_interval = self.event_view.get('session_interval')
    session_type = self.event_view.get('session_type')
    # Session-gap unit → seconds multiplier (default: minutes).
    session_type_map = {
        'minute': 60,
        'second': 1,
        'hour': 3600
    }
    interval_ts = session_interval * session_type_map.get(session_type, 60)
    event_names = self.events.get('event_names')
    source_event = self.events.get('source_event', {}).get('eventName')
    source_type = self.events.get('source_event', {}).get('source_type')
    # Query A: chains starting at source_event.
    sql_a = f"""with
'{source_event}' as start_event,
{tuple(event_names)} as evnet_all,
'{self.start_date}' as start_data,
'{self.end_date}' as end_data
select event_chain,
count() as values
from (with
toUInt32(minIf(`#event_time`, `#event_name` = start_event)) AS start_event_ts,
arraySort(
x ->
x.1,
arrayFilter(
x -> x.1 >= start_event_ts,
groupArray((toUInt32(`#event_time`), `#event_name`))
)
) AS sorted_events,
arrayEnumerate(sorted_events) AS event_idxs,
arrayFilter(
(x, y, z) -> z.1 >= start_event_ts and ((z.2 = start_event and y > {interval_ts}) or y > {interval_ts}) ,
event_idxs,
arrayDifference(sorted_events.1),
sorted_events
) AS gap_idxs,
arrayMap(x -> x, gap_idxs) AS gap_idxs_,
arrayMap(x -> if(has(gap_idxs_, x), 1, 0), event_idxs) AS gap_masks,
arraySplit((x, y) -> y, sorted_events, gap_masks) AS split_events
select `#account_id`,
arrayJoin(split_events) AS event_chain_,
arrayMap(x ->
x.2, event_chain_) AS event_chain,
has(event_chain, start_event) AS has_midway_hit
from (select `#event_time`, `#event_name`, `#account_id`
from {self.game}.event
where addHours(`#event_time`, {self.zone_time}) >= start_data
and addHours(`#event_time`, {self.zone_time}) <= end_data
and `#event_name` in evnet_all)
group by `#account_id`
HAVING has_midway_hit = 1
)
where arrayElement(event_chain, 1) = start_event
GROUP BY event_chain
ORDER BY values desc
"""
    # Query B: chains ending at source_event.
    sql_b = f"""with
'{source_event}' as end_event,
{tuple(event_names)} as evnet_all,
'{self.start_date}' as start_data,
'{self.end_date}' as end_data
select event_chain,
count() as values
from (with
toUInt32(maxIf(`#event_time`, `#event_name` = end_event)) AS end_event_ts,
arraySort(
x ->
x.1,
arrayFilter(
x -> x.1 <= end_event_ts,
groupArray((toUInt32(`#event_time`), `#event_name`))
)
) AS sorted_events,
arrayEnumerate(sorted_events) AS event_idxs,
arrayFilter(
(x, y, z) -> z.1 <= end_event_ts and (z.2 = end_event and y>{interval_ts}) OR y > {interval_ts},
event_idxs,
arrayDifference(sorted_events.1),
sorted_events
) AS gap_idxs,
arrayMap(x -> x+1, gap_idxs) AS gap_idxs_,
arrayMap(x -> if(has(gap_idxs_, x), 1,0), event_idxs) AS gap_masks,
arraySplit((x, y) -> y, sorted_events, gap_masks) AS split_events
select `#account_id`,
arrayJoin(split_events) AS event_chain_,
arrayMap(x ->
x.2, event_chain_) AS event_chain,
has(event_chain, end_event) AS has_midway_hit
from (select `#event_time`, `#event_name`, `#account_id`
from {self.game}.event
where addHours(`#event_time`, {self.zone_time}) >= start_data
and addHours(`#event_time`, {self.zone_time}) <= end_data
and `#event_name` in evnet_all)
group by `#account_id`
HAVING has_midway_hit = 1
)
where arrayElement(event_chain, -1) = end_event
GROUP BY event_chain
ORDER BY values desc"""
    sql = sql_a if source_type == 'initial_event' else sql_b
    print(sql)
    return {
        'sql': sql,
        'time_particle': self.time_particle,
        'start_date': self.start_date[:10],
        'end_date': self.end_date[:10],
    }
async def retention_model_sql2(self):
    """Build the second-generation retention SQL (hand-written ClickHouse).

    Event A (``events[0]``) defines the cohort per registration date; event
    B (``events[1]``, ``'*'`` meaning any event) defines return visits.
    For each day offset in ``retention_n`` the query emits: cnt{i} (retained
    count), p{i} (retention %), on{i} (churned count), op{i} (churn %).
    """
    filter_item_type = self.event_view.get('filter_item_type')
    filter_item = self.event_view.get('filter_item')
    event_name_a = self.events[0]['eventName']
    event_name_b = self.events[1]['eventName']
    visit_name = self.events[0].get('event_attr_id')
    # Compile event A's filters into a raw WHERE fragment by compiling a
    # throwaway select and keeping everything after 'WHERE '.
    where, _ = await self.handler_filts((self.events[0]['filts'], self.events[0].get('relation', 'and')),
                                        (self.global_filters, self.global_relation)
                                        , self.ext_filters)
    where_a = '1'
    if where:
        qry = sa.select().where(*where)
        sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
        where_a = 'WHERE '.join(sql.split('WHERE ')[1:])
    where, _ = await self.handler_filts((self.events[1]['filts'], self.events[1].get('relation', 'and')),
                                        (self.global_filters, self.global_relation)
                                        , self.ext_filters)
    where_b = '1'
    if where:
        qry = sa.select().where(*where)
        sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
        where_b = sql.split('WHERE ')[1]
    # Any event
    event_name_b = 1 if event_name_b == '*' else f"`#event_name` = '{event_name_b}'"
    days = (arrow.get(self.end_date).date() - arrow.get(self.start_date).date()).days
    keep = []
    cnt = []
    # Day offsets reported: every day up to 59, then sparse checkpoints
    # (the ``70-1`` style literals are the 1-based display day minus one).
    retention_n = [*[k for k in range(1, 60)], 70-1, 75-1, 80-1, 85-1, 90-1, 95-1, 100-1, 110-1, 120-1, 150-1, 180-1, 210-1, 240-1, 270-1, 300-1,
                   360-1]
    """
    cnt0-cnt1 as on1,
    round(on1 * 100 / cnt0, 2) as `0p1`,
    """
    for i in retention_n:
        keep.append(
            f"""cnt{i},
round(cnt{i} * 100 / cnt0, 2) as `p{i}`,
cnt0-cnt{i} as on{i},
round(on{i} * 100 / cnt0, 2) as `op{i}`
""")
        cnt.append(f"""sum(if(dateDiff('day',a.reg_date,b.visit_date)={i},1,0)) as cnt{i}""")
    keep_str = ','.join(keep)
    cnt_str = ','.join(cnt)
    sql = f"""
with '{event_name_a}' as start_event,
{event_name_b} as retuen_visit,
`{visit_name}` as visit,
'{self.start_date}' as start_data,
'{self.end_date}' as end_data,
toDate(addHours(`#event_time`, {self.zone_time})) as date
select reg_date,
cnt0 ,
{keep_str}
from(select date, uniqExact(visit) as cnt0 from {self.game}.event
where `#event_name` = start_event and addHours(`#event_time`, {self.zone_time}) >= start_data and addHours(`#event_time`, {self.zone_time}) <= end_data and {where_a}
group by date) reg left join
(select a.reg_date,
{cnt_str}
from (select date as reg_date, visit from {self.game}.event where `#event_name` = start_event and addHours(`#event_time`, {self.zone_time}) >= start_data and addHours(`#event_time`, {self.zone_time}) <= end_data and {where_a} group by reg_date, visit) a
left join (select date as visit_date, visit from {self.game}.event where retuen_visit and addHours(`#event_time`, {self.zone_time}) >= start_data group by visit_date, visit) b on
a.visit = b.visit
group by a.reg_date) log on reg.date=log.reg_date
"""
    print(sql)
    return {
        'sql': sql,
        'date_range': self.date_range,
        'unit_num': self.unit_num,
        'retention_n': retention_n,
        'filter_item_type': filter_item_type,
        'filter_item': filter_item,
        'time_particle': self.time_particle,
        'start_date': self.start_date[:10],
        'end_date': self.end_date[:10],
    }