标签生效逻辑

This commit is contained in:
李宗振 2022-07-07 15:56:41 +08:00
parent de39d145aa
commit 49c7668169
4 changed files with 393 additions and 140 deletions

View File

@ -616,16 +616,29 @@ async def retention_model(request: Request,
tmp['n_outflow'].append(v[f'on{i}']) tmp['n_outflow'].append(v[f'on{i}'])
tmp = summary_values['均值'] tmp = summary_values['均值']
retention_avg_dict = {} retention_avg_dict = {}
group_label = res['group_label']
# 多个分组项时,合成列表返回 # 多个分组项时,合成列表返回
if len(groupby)>1: if not group_label:
if len(groupby) > 1:
summary_valuess = {}
for k, v in summary_values.items():
if 'str' in str(type(k)):
summary_valuess[str([k])] = v
else:
summary_valuess[str(list(k))] = v
else:
summary_valuess = summary_values
# 包含标签分组项
else:
summary_valuess = {} summary_valuess = {}
for k, v in summary_values.items(): for k, v in summary_values.items():
if 'str' in str(type(k)): key = list(k)
summary_valuess[str([k])] = v # 增加标签分组到对应的key里面
else: for name, index in group_label.items():
summary_valuess[str(list(k))] = v key.insert(index, name)
else: summary_valuess[str(key)] = v
summary_valuess=summary_values
for rn in retention_n: for rn in retention_n:
for rt, rd in df.T.items(): for rt, rd in df.T.items():
@ -1436,26 +1449,29 @@ async def scatter_model(
event_type = analysis.events[0]['eventName'] event_type = analysis.events[0]['eventName']
where = analysis.events[-1]['quotaname'] where = analysis.events[-1]['quotaname']
sql = res['sql'] sql = res['sql']
columnName = analysis.event_view['groupBy'][-1]['columnName'] group_by = analysis.event_view['groupBy']
if analysis.event_view['groupBy'] != []: # 排除标签
true_group = [i for i in group_by if i['data_type'] != "user_label"]
if columnName != '': columnName = true_group[-1]['columnName']
#按天分组 if true_group != []:
sql = sql.replace(f'toDate(addHours({game}.event."#event_time", 8)) AS date', f'`{columnName}` as va', # if columnName != '':
1) # # 按天分组
sql = sql.replace(f'toDate(addHours({game}.event."#event_time", 8))', f'`{columnName}`', 1) # sql = sql.replace(f'toDate(addHours({game}.event."#event_time", 8)) AS date', f'`{columnName}` as va',
#按周分组 # 1)
sql = sql.replace(f'toStartOfWeek(addHours({game}.event."#event_time", 8)) AS date', f'`{columnName}` as va', # sql = sql.replace(f'toDate(addHours({game}.event."#event_time", 8))', columnName, 1)
1) # # 按周分组
sql = sql.replace(f'toStartOfWeek(addHours({game}.event."#event_time", 8))', f'`{columnName}`', 1) # sql = sql.replace(f'toStartOfWeek(addHours({game}.event."#event_time", 8)) AS date',
#按月分组 # f'`{columnName}` as va',
sql = sql.replace(f'toStartOfMonth(addHours({game}.event."#event_time", 8)) AS date', f'`{columnName}` as va', # 1)
1) # sql = sql.replace(f'toStartOfWeek(addHours({game}.event."#event_time", 8))', columnName, 1)
sql = sql.replace(f'toStartOfMonth(addHours({game}.event."#event_time", 8))', f'`{columnName}`', 1) # # 按月分组
#合计 # sql = sql.replace(f'toStartOfMonth(addHours({game}.event."#event_time", 8)) AS date',
if analysis.event_view.get('timeParticleSize') == "total": # f'`{columnName}` as va',
sql = sql.replace(f'SELECT', f'SELECT `{columnName}` as va,', 1) # 1)
print(sql) # sql = sql.replace(f'toStartOfMonth(addHours({game}.event."#event_time", 8))', columnName, 1)
# # 合计
# if analysis.event_view.get('timeParticleSize') == "total":
# sql = sql.replace(f'SELECT', f'SELECT {columnName} as va,', 1)
df = await ckdb.query_dataframe(sql) df = await ckdb.query_dataframe(sql)
if df.empty: if df.empty:
return schemas.Msg(code=-9, msg='无数据', data=None) return schemas.Msg(code=-9, msg='无数据', data=None)
@ -1472,6 +1488,8 @@ async def scatter_model(
interval_type = res['interval_type'] interval_type = res['interval_type']
analysi = res['analysis'] analysi = res['analysis']
groupby = res['groupby'] groupby = res['groupby']
true_df = df.groupby(groupby).sum()
group_label = res['group_label']
quota_interval_arr = res['quota_interval_arr'] quota_interval_arr = res['quota_interval_arr']
# 兼容合计的 # 兼容合计的
# if res['time_particle'] == 'total': # if res['time_particle'] == 'total':
@ -1480,20 +1498,25 @@ async def scatter_model(
if analysi != 'number_of_days' and interval_type != 'discrete': if analysi != 'number_of_days' and interval_type != 'discrete':
# 默认区间 # 默认区间
max_v = int(df['values'].max()) max_v = int(true_df['values'].max())
min_v = int(df['values'].min()) min_v = int(true_df['values'].min())
interval = (max_v - min_v) // 10 or 1 interval = (max_v - min_v) // 10 or 1
resp = {'list': dict(), resp = {'list': dict(),
'start_date': res['start_date'], 'start_date': res['start_date'],
'end_date': res['end_date'], 'end_date': res['end_date'],
'time_particle': res['time_particle'], 'time_particle': res['time_particle'],
'biaotou': columnName 'biaotou': groupby
} }
if 'float' in str(df.dtypes['va']):
df['va'] = df['va'].astype(int) # if 'float' in str(df.dtypes['va']):
if 'list' in str(type(df['va'][0])): # df['va'] = df['va'].astype(int)
f = lambda x: x[0] # for index, gi in enumerate(groupby):
df['va'] = df['va'].map(f) # resp['list'][str(index)] = dict()
# if 'float' in str(df.dtypes[gi]):
# df[gi] = df[gi].astype(int)
# if 'list' in str(type(df[gi][0])):
# f = lambda x: x[0]
# df[gi] = df[gi].map(f)
if not quota_interval_arr: if not quota_interval_arr:
resp['label'] = [f'[{i},{i + interval})' for i in range(min_v, max_v, interval)] resp['label'] = [f'[{i},{i + interval})' for i in range(min_v, max_v, interval)]
bins = [i for i in range(min_v, max_v + interval, interval)] bins = [i for i in range(min_v, max_v + interval, interval)]
@ -1510,12 +1533,19 @@ async def scatter_model(
# f = lambda x: x[0] # f = lambda x: x[0]
# df['va'] = df['va'].map(f) # df['va'] = df['va'].map(f)
# 这是分组的 # 这是分组的
for key, tmp_df in df.groupby('va'): for key, tmp_df in true_df.groupby(groupby):
bins_s = pd.cut(tmp_df['values'], bins=bins, bins_s = pd.cut(tmp_df['values'], bins=bins,
right=True, include_lowest=True).value_counts() right=True, include_lowest=True).value_counts()
bins_s.sort_index(inplace=True) bins_s.sort_index(inplace=True)
total = int(bins_s.sum()) total = int(bins_s.sum())
if group_label:
if isinstance(key, str):
key = [key]
key = list(key)
for name, idx in group_label.items():
key.insert(idx, name)
key = str(key)
if res['time_particle'] == 'total111': if res['time_particle'] == 'total111':
resp['list']['合计'] = dict() resp['list']['合计'] = dict()
@ -1528,32 +1558,36 @@ async def scatter_model(
if str(p[i]) == 'nan': if str(p[i]) == 'nan':
p[i] = 0 p[i] = 0
# 映射对应的埋点数据 # 映射对应的埋点数据
re = await crud.select_map.get_list(db, game) # re = await crud.select_map.get_list(db, game)
re_list = [i['attr_name'] for i in re] # re_list = [i['attr_name'] for i in re]
if columnName in re_list: # if gi in re_list:
for i in re: # for i in re:
if columnName == i['attr_name']: # if gi == i['attr_name']:
for datas in i['map_']: # for datas in i['map_']:
if key == datas['id']: # if key == datas['id']:
key = datas['title'] # key = datas['title']
break # break
break # break
if 'time' not in columnName: # if 'time' not in groupby:
resp['list'][key] = dict() resp['list'][str(key)] = dict()
resp['list'][key] = {'n': bins_s.to_list(), 'total': total, resp['list'][str(key)] = {'n': bins_s.to_list(), 'total': total,
'p': [str(i) + '%' for i in p], 'p': [str(i) + '%' for i in p],
'title': '总体'} 'title': '总体'}
else: # else:
resp['list'][key.strftime('%Y-%m-%d %H:%M:%S')] = dict() # resp['list'][key.strftime('%Y-%m-%d %H:%M:%S')] = dict()
resp['list'][key.strftime('%Y-%m-%d %H:%M:%S')] = {'n': bins_s.to_list(), 'total': total, # resp['list'][key.strftime('%Y-%m-%d %H:%M:%S')] = {'n': bins_s.to_list(), 'total': total,
'p': [str(i) + '%' for i in p], # 'p': [str(i) + '%' for i in p],
'title': '总体'} # 'title': '总体'}
# 兼容下载功能 # 兼容下载功能
download = analysis.event_view.get('download', '') download = analysis.event_view.get('download', '')
if download == 1: if download == 1:
create_df = create_neidf(resp, columnName) create_df = create_neidf(resp, columnName)
Download = Download_xlsx(create_df, '分布分析') Download = Download_xlsx(create_df, '分布分析')
return Download return Download
if group_label:
for name, idx in group_label.items():
resp['biaotou'].insert(idx, name)
return schemas.Msg(code=0, msg='ok', data=resp) return schemas.Msg(code=0, msg='ok', data=resp)
else: else:
# 离散数字 # 离散数字
@ -1561,15 +1595,17 @@ async def scatter_model(
'start_date': res['start_date'], 'start_date': res['start_date'],
'end_date': res['end_date'], 'end_date': res['end_date'],
'time_particle': res['time_particle'], 'time_particle': res['time_particle'],
'biaotou': columnName 'biaotou': groupby
} }
labels = [str(i) for i in sorted(df['values'].unique())] labels = [str(i) for i in sorted(true_df['values'].unique())]
resp['label'] = labels resp['label'] = labels
if 'list' in str(type(df['va'][0])): # for index, gi in enumerate(groupby):
f = lambda x: x[0] # resp['list'][str(index)] = dict()
df['va'] = df['va'].map(f) # if 'list' in str(type(df[gi][0])):
# f = lambda x: x[0]
# df[gi] = df[gi].map(f)
shaixuan = analysis.events[0].get('analysis') shaixuan = analysis.events[0].get('analysis')
for key, tmp_df in df.groupby(['va']): for key, tmp_df in true_df.groupby(groupby):
if shaixuan == 'uniqExact': if shaixuan == 'uniqExact':
total = len(set(tmp_df['uid'])) total = len(set(tmp_df['uid']))
else: else:
@ -1578,20 +1614,28 @@ async def scatter_model(
dt = '合计' dt = '合计'
else: else:
# 映射对应的埋点数据 # 映射对应的埋点数据
re = await crud.select_map.get_list(db, game) # re = await crud.select_map.get_list(db, game)
re_list = [i['attr_name'] for i in re] # re_list = [i['attr_name'] for i in re]
if columnName in re_list: # if gi in re_list:
for i in re: # for i in re:
if columnName == i['attr_name']: # if gi == i['attr_name']:
for datas in i['map_']: # for datas in i['map_']:
if key == datas['id']: # if key == datas['id']:
key = datas['title'] # key = datas['title']
break # break
break # break
dt = key dt = key
# dt = key.strftime('%Y-%m-%d') # dt = key.strftime('%Y-%m-%d')
# dt='合计' # dt='合计'
# 存在标签分组
if group_label:
if isinstance(dt, str):
dt = [dt]
dt = list(dt)
for name, idx in group_label.items():
dt.insert(idx, name)
dt = str(dt)
labels_dict = {} labels_dict = {}
for key2, tmp_df2 in tmp_df.groupby('values'): for key2, tmp_df2 in tmp_df.groupby('values'):
label = str(key2) label = str(key2)
@ -1618,7 +1662,7 @@ async def scatter_model(
number_str = str(number_int) + '%' number_str = str(number_int) + '%'
list_p.append(number_str) list_p.append(number_str)
resp['list'][dt] = {'n': [labels_dict01.get(i, 0) for i in labels], 'total': total, resp['list'][str(dt)] = {'n': [labels_dict01.get(i, 0) for i in labels], 'total': total,
'p': list_p} 'p': list_p}
else: else:
list_p = [] list_p = []
@ -1626,7 +1670,7 @@ async def scatter_model(
number_int = round(labels_dict.get(i, 0) * 100 / total, 2) number_int = round(labels_dict.get(i, 0) * 100 / total, 2)
number_str = str(number_int) + '%' number_str = str(number_int) + '%'
list_p.append(number_str) list_p.append(number_str)
resp['list'][dt] = {'n': [labels_dict.get(i, 0) for i in labels], 'total': total, resp['list'][str(dt)] = {'n': [labels_dict.get(i, 0) for i in labels], 'total': total,
'p': list_p} 'p': list_p}
# resp['list'][dt] = {'总体': {'n': [labels_dict.get(i, 0) for i in labels], 'total': total, # resp['list'][dt] = {'总体': {'n': [labels_dict.get(i, 0) for i in labels], 'total': total,
# 'p': [round(labels_dict.get(i, 0) * 100 / total, 2) for i in labels]}} # 'p': [round(labels_dict.get(i, 0) * 100 / total, 2) for i in labels]}}
@ -1641,6 +1685,10 @@ async def scatter_model(
create_df = create_neidf(resp, columnName) create_df = create_neidf(resp, columnName)
Download = Download_xlsx(create_df, '分布分析') Download = Download_xlsx(create_df, '分布分析')
return Download return Download
if group_label:
for name, idx in group_label.items():
resp['biaotou'].insert(idx, name)
return schemas.Msg(code=0, msg='ok', data=resp) return schemas.Msg(code=0, msg='ok', data=resp)
else: else:
return schemas.Msg(code=-9, msg='没有添加分组项', data='') return schemas.Msg(code=-9, msg='没有添加分组项', data='')

View File

@ -51,34 +51,34 @@ async def ltv_model_sql(
) -> schemas.Msg: ) -> schemas.Msg:
""" ltv模型sql """ """ ltv模型sql """
await analysis.init(data_where=current_user.data_where) await analysis.init(data_where=current_user.data_where)
res = analysis.ltv_model_sql() res = await analysis.ltv_model_sql()
sql = res['sql'] sql = res['sql'].replace('/n','').replace('[','').replace(']','')
#仅一条筛选条件则是把GM过滤后获取全部数据 #仅一条筛选条件则是把GM过滤后获取全部数据
if len(analysis.global_filters)==1 and analysis.global_filters[0]['strftv']=='GM': # if len(analysis.global_filters)==1 and analysis.global_filters[0]['strftv']=='GM':
try: try:
df = await ckdb.query_dataframe(sql) df = await ckdb.query_dataframe(sql)
except Exception as e: except Exception as e:
return schemas.Msg(code=-9, msg='报表配置参数异常') return schemas.Msg(code=-9, msg='报表配置参数异常')
#多条筛选条件则合成新的sql #多条筛选条件则合成新的sql
else: # else:
new_sql="""""" # new_sql=""""""
#拆分sql # #拆分sql
split_sql = sql.split('AND 1') # split_sql = sql.split('AND 1')
#获取每一条筛选条件 # #获取每一条筛选条件
for i in analysis.global_filters: # for i in analysis.global_filters:
#剔除GM # #剔除GM
if i['strftv'] != 'GM': # if i['strftv'] != 'GM':
#获取筛选条件的包含关系 # #获取筛选条件的包含关系
bijiao=get_bijiao(i["comparator"]) # bijiao=get_bijiao(i["comparator"])
#获取筛选条件的值 # #获取筛选条件的值
condition=tuple(i['ftv']) # condition=tuple(i['ftv'])
#获取事件名 # #获取事件名
columnName=i['columnName'] # columnName=i['columnName']
dd = f""" AND {game}.event.{columnName} {bijiao} {condition}""" # dd = f""" AND {game}.event.{columnName} {bijiao} {condition}"""
new_sql+=dd # new_sql+=dd
split_="""AND 1 """ # split_="""AND 1 """
news_sql = split_sql[0] + split_+new_sql + split_sql[1] + split_+new_sql+ split_sql[2]+split_+split_sql[3] # news_sql = split_sql[0] + split_+new_sql + split_sql[1] + split_+new_sql+ split_sql[2]+split_+split_sql[3]
df = await ckdb.query_dataframe(news_sql) # df = await ckdb.query_dataframe(news_sql)
# # 判断11月23号之前的数据 # # 判断11月23号之前的数据
# list_data_range=analysis.date_range # list_data_range=analysis.date_range
# liststr_data_range=[] # liststr_data_range=[]
@ -248,6 +248,7 @@ async def ltv_model_sql(
return schemas.Msg(code=0, msg='ok', data=data) return schemas.Msg(code=0, msg='ok', data=data)
@router.post("/ltv_model_export") @router.post("/ltv_model_export")
async def ltv_model_export(request: Request, async def ltv_model_export(request: Request,
game: str, game: str,

View File

@ -1,6 +1,6 @@
import re import re
from typing import Tuple from typing import Tuple
from copy import deepcopy
import arrow import arrow
import sqlalchemy as sa import sqlalchemy as sa
import json import json
@ -196,14 +196,24 @@ class BehaviorAnalysis:
if item['data_type'] == 'user_label': if item['data_type'] == 'user_label':
item.update({ item.update({
'comparator': "in", 'comparator': "in",
'comparator_name': "" 'comparator_name': "",
'tableType': "user_label"
}) })
# 加入分组标签 # 加入分组标签
self.group_label.update({item['columnDesc']: idx}) self.group_label.update({item['columnDesc']: idx})
# 加入events中每个event的filts条件中
if self.events: # 事件分析,分布分析
for i in self.events: if self.event_view['cksql'] in ['event', 'scatter']:
i['filts'].append(item) # 加入events中每个event的filts条件中
if self.events:
for i in self.events:
i['filts'].append(item)
# 留存分析,漏斗分析
if self.event_view['cksql'] in ['retention', 'funnel']:
# 加入全局删选filts条件中
self.event_view['filts'].append(item)
continue continue
# 不是标签加入分组项中 # 不是标签加入分组项中
res.append(getattr(self.event_tbl.c, item['columnName'])) res.append(getattr(self.event_tbl.c, item['columnName']))
@ -230,7 +240,29 @@ class BehaviorAnalysis:
return start_date, end_date, date_range return start_date, end_date, date_range
def _get_global_filters(self): def _get_global_filters(self):
return self.event_view.get('filts') or [] _res = []
if self.event_view.get('filts'):
_res = self.event_view.get('filts')
# 漏斗分析,特殊处理
if self.event_view['cksql'] in ['funnel']:
_trueRes = []
for i in _res:
if 'comparator' in i:
trueI = deepcopy(i)
_trueRes.append(trueI)
continue
comparator = i.pop('comparator_id')
comparatorName = i.pop('comparator_name')
i.update({
'comparator': comparator,
'comparatorName': comparatorName,
'tableType': "user_label"
})
trueI = deepcopy(i)
_trueRes.append(trueI)
_res = _trueRes
return _res
async def _init_table(self): async def _init_table(self):
""" """
@ -248,6 +280,87 @@ class BehaviorAnalysis:
# self.event_tbl = sa.Table('event_view', metadata, *[sa.Column(column) for column in columns]) # self.event_tbl = sa.Table('event_view', metadata, *[sa.Column(column) for column in columns])
self.event_tbl = sa.Table('event', metadata, *[sa.Column(column) for column in columns]) self.event_tbl = sa.Table('event', metadata, *[sa.Column(column) for column in columns])
async def handler_trace_filts(self, *filters):
    """Translate trace-analysis filter items into SQLAlchemy conditions.

    Each positional argument is one group: an iterable of filter dicts.
    Every dict is read for ``comparator_id`` and ``tableType``; ``user`` and
    ``event`` items additionally need ``columnName`` and ``ftv`` (the list of
    comparison values) and may carry ``data_type``.

    * ``tableType == 'user'``  -> condition on a column of ``self.user_tbl``
    * ``tableType == 'event'`` -> condition on a column of ``self.event_tbl``
    * ``tableType == 'user_label'`` -> ``#account_id`` IN / NOT IN the
      sub-query produced by ``UserClusterDef(...).to_sql_qry()``; this is
      always added to the event-side conditions.
    * any other ``tableType`` is ignored.

    Conditions inside one group are AND-ed together.

    NOTE(review): the indentation of the trailing ``else`` was lost in the
    source this was recovered from. It is read here as the ``else`` of the
    inner ``for`` loop, which has no ``break`` and therefore always runs, so
    it is written as ordinary post-loop code. Confirm against the repository.

    :param filters: zero or more iterables of filter dicts
    :return: ``(event_filters, user_filters)`` - two lists holding one
        ``and_(...)`` clause per group that produced conditions
    """
    user_filters = []
    event_filters = []
    for filts in filters:
        user_filter = []
        event_filter = []
        for item in filts:
            comparator = item['comparator_id']
            table_type = item['tableType']
            if table_type == 'user':
                where = user_filter
            elif table_type == 'event':
                where = event_filter
            elif table_type == 'user_label':
                # A label filter is a membership test on the account id.
                user_cluster_def = UserClusterDef(self.game, item['columnName'], self.data_where)
                await user_cluster_def.init()
                sub_qry = user_cluster_def.to_sql_qry()
                account_col = sa.Column('#account_id')
                if comparator == 'in':
                    event_filter.append(account_col.in_(sub_qry))
                else:
                    event_filter.append(account_col.notin_(sub_qry))
                continue
            else:
                # Unknown table type: nothing to filter on.
                continue

            tbl = getattr(self, f'{table_type}_tbl')
            col = getattr(tbl.c, item['columnName'])
            # Datetime columns are shifted by the configured zone offset.
            if item.get('data_type') == 'datetime':
                col = func.addHours(col, self.zone_time)

            ftv = item['ftv']
            if comparator == '==':
                # Several values for "==" mean "equals any of them".
                if len(ftv) > 1:
                    where.append(or_(*[col == v for v in ftv]))
                else:
                    where.append(col == ftv[0])
            elif comparator == '>=':
                where.append(col >= ftv[0])
            elif comparator == '<=':
                where.append(col <= ftv[0])
            elif comparator == '>':
                where.append(col > ftv[0])
            elif comparator == '<':
                where.append(col < ftv[0])
            elif comparator == 'range':
                # Half-open interval: (ftv[0], ftv[1]]
                where.append(col > ftv[0])
                where.append(col <= ftv[1])
            elif comparator == 'is not null':
                where.append(col.isnot(None))
            elif comparator == 'is null':
                where.append(col.is_(None))
            elif comparator == 'like':
                where.append(col.like(f'%{ftv[0]}%'))
            elif comparator == 'not like':
                where.append(col.notlike(f'%{ftv[0]}%'))
            elif comparator == 'in':
                where.append(col.in_(ftv))
            elif comparator == '!=':
                where.append(col != ftv[0])

        # Collapse this group's conditions into a single AND clause each.
        if event_filter:
            event_filters.append(and_(*event_filter))
        if user_filter:
            user_filters.append(and_(*user_filter))

    return event_filters, user_filters
async def handler_filts(self, *filters): async def handler_filts(self, *filters):
""" """
@ -684,6 +797,7 @@ ORDER BY level
sql = sql.replace('_windows_gap_', f"({windows_gap})") sql = sql.replace('_windows_gap_', f"({windows_gap})")
print(sql) print(sql)
return {'sql': sql, return {'sql': sql,
'group_label': self.group_label,
'groupby': [i.key for i in self.groupby], 'groupby': [i.key for i in self.groupby],
'date_range': self.date_range, 'date_range': self.date_range,
'cond_level': cond_level, 'cond_level': cond_level,
@ -773,6 +887,7 @@ ORDER BY level
print(sql) print(sql)
return { return {
'sql': sql, 'sql': sql,
'group_label': self.group_label,
'interval_type': event['intervalType'], 'interval_type': event['intervalType'],
'analysis': analysis, 'analysis': analysis,
'quota_interval_arr': quota_interval_arr, 'quota_interval_arr': quota_interval_arr,
@ -815,6 +930,7 @@ ORDER BY level
print(sql) print(sql)
return { return {
'sql': sql, 'sql': sql,
'group_label': self.group_label,
'interval_type': event['intervalType'], 'interval_type': event['intervalType'],
'analysis': analysis, 'analysis': analysis,
'quota_interval_arr': quota_interval_arr, 'quota_interval_arr': quota_interval_arr,
@ -837,17 +953,19 @@ ORDER BY level
event_names = self.events.get('event_names') event_names = self.events.get('event_names')
source_event = self.events.get('source_event', {}).get('eventName') source_event = self.events.get('source_event', {}).get('eventName')
source_type = self.events.get('source_event', {}).get('source_type') source_type = self.events.get('source_event', {}).get('source_type')
wheres = self.events['user_filter']['filts'] filters = self.events.get('user_filter', {}).get('filts', [])
sql_one='' event_filter, user_filter = await self.handler_trace_filts(filters)
if wheres != []: # 有筛选条件的时候使用 where_a = '1'
columnName=wheres[0]['columnName'] # 获取字段名 if event_filter:
event=await get_event(columnName,self.game) # 获取字段属于那个事件,或者是否是基础属性 qry = sa.select().where(*event_filter)
if event == '基础属性': sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
sql_one=f""" and `{columnName}` {wheres[0]['comparator']} '{wheres[0]['ftv'][0]}' and `#event_name` in evnet_all) """ where_a = 'WHERE '.join(sql.split('WHERE ')[1:])
else:# 如果包含有事件则进入下面的逻辑 where_b = '1'
sql_one=f"""and `#account_id` in ( SELECT `#account_id` FROM {self.game}.event WHERE `#event_name`= '{event}' and `{columnName}` = if user_filter:
'{wheres[0]['ftv'][0]}' and addHours(`#event_time`, {self.zone_time}) >= start_data qry = sa.select().where(*user_filter)
and addHours(`#event_time`, {self.zone_time}) <= end_data ) and `#event_name` in evnet_all)""" sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
where_b = sql.split('WHERE ')[1]
sql_a = f"""with sql_a = f"""with
'{source_event}' as start_event, '{source_event}' as start_event,
{tuple(event_names)} as evnet_all, {tuple(event_names)} as evnet_all,
@ -889,7 +1007,10 @@ from (with
group by `#account_id` group by `#account_id`
HAVING has_midway_hit = 1 HAVING has_midway_hit = 1
) )
where arrayElement(event_chain, 1) = start_event where arrayElement(event_chain, 1) = start_event and {where_a} and "#account_id" IN (SELECT "#account_id"
FROM (SELECT "#account_id"
FROM {self.game}.user_view
WHERE {where_b}) AS anon_b)
GROUP BY event_chain,`#account_id` GROUP BY event_chain,`#account_id`
ORDER BY values desc ORDER BY values desc
""" """
@ -933,13 +1054,14 @@ from (with
group by `#account_id` group by `#account_id`
HAVING has_midway_hit = 1 HAVING has_midway_hit = 1
) )
where arrayElement(event_chain, -1) = end_event where arrayElement(event_chain, -1) = end_event and {where_a} and "#account_id" IN (SELECT "#account_id"
FROM (SELECT "#account_id"
FROM {self.game}.user_view
WHERE {where_b}) AS anon_b)
GROUP BY event_chain,`#account_id` GROUP BY event_chain,`#account_id`
ORDER BY values desc""" ORDER BY values desc"""
sql = sql_a if source_type == 'initial_event' else sql_b sql = sql_a if source_type == 'initial_event' else sql_b
if sql_one != '':
sql= sql.replace('and `#event_name` in evnet_all)',sql_one,1)
print(sql) print(sql)
return { return {
'sql': sql, 'sql': sql,

View File

@ -14,9 +14,8 @@ import crud
import schemas import schemas
from core.config import settings from core.config import settings
from db import get_database from db import get_database
from db.redisdb import get_redis_pool, RedisDrive
from models.user_label import UserClusterDef from models.user_label import UserClusterDef
from utils import get_event from db.redisdb import get_redis_pool, RedisDrive
class XAnalysis: class XAnalysis:
@ -25,20 +24,28 @@ class XAnalysis:
self.game = game self.game = game
self.event_view = dict() self.event_view = dict()
self.events = [] self.events = []
self.zone_time: int = 0
self.global_filters = [] self.global_filters = []
self.account_filters = [] self.account_filters = []
self.global_relation = 'and' self.global_relation = 'and'
self.date_range = [] self.date_range = []
self.ext_filters = (self.data_in.ext_filter.get('filts', []), self.data_in.ext_filter.get('relation', 'and')) self.ext_filters = (self.data_in.ext_filter.get('filts', []), self.data_in.ext_filter.get('relation', 'and'))
self.start_date = None
self.end_date = None
self.data_where=[]
self.event_tbl=None
self.zone_time:int = 0
def _get_global_filters(self): def _get_global_filters(self):
return self.event_view.get('filts') or [] #获取event_view字典里面filts的值或返回空列表 _res = self.event_view.get('filts', [])
if _res:
for idx, item in enumerate(_res):
if item['data_type'] == 'user_label':
_res[idx].update({
'tableType': item['data_type'],
})
else:
_res[idx].update({
'tableType': item['table_type'],
})
return _res # 获取event_view字典里面filts的值或返回空列表
async def init(self, *args, **kwargs): async def init(self, *args, **kwargs):
if self.data_in.report_id: if self.data_in.report_id:
@ -70,8 +77,69 @@ class XAnalysis:
if 'data_where' in kwargs: if 'data_where' in kwargs:
self.account_filters = kwargs['data_where'].get(self.game, []) self.account_filters = kwargs['data_where'].get(self.game, [])
def handler_filts(self, *filters): # def handler_filts(self, *filters):
# """
# :param filters: (filts:list,relation:str)
# :param g_f:
# :param relation:
# :return:
# """
#
# event_filters = []
# for filter in filters:
# filts = filter[0]
# relation = filter[1]
# event_filter = []
# for item in filts:
#
# where = event_filter
#
# col = sa.Column(item['columnName'])
#
# comparator = item['comparator']
# ftv = item['ftv']
# if comparator == '==':
# if len(ftv) > 1:
# where.append(or_(*[col == v for v in ftv]))
# else:
# where.append(col == ftv[0])
# elif comparator == '>=':
# where.append(col >= ftv[0])
# elif comparator == '<=':
# where.append(col <= ftv[0])
# elif comparator == '>':
# where.append(col > ftv[0])
# elif comparator == '<':
# where.append(col < ftv[0])
#
# elif comparator == 'is not null':
# where.append(col.isnot(None))
# elif comparator == 'is null':
# where.append(col.is_(None))
#
# elif comparator == 'like':
# where.append(col.like(f'%{ftv[0]}%'))
#
# elif comparator == 'not like':
# where.append(col.notlike(f'%{ftv[0]}%'))
#
# elif comparator == 'in':
# where.append(col.in_(ftv))
#
# elif comparator == '!=':
# where.append(col != ftv[0])
# if relation == 'and':
# if event_filter:
# event_filters.append(and_(*event_filter))
# else:
# if event_filter:
# event_filters.append(or_(*event_filter))
#
# return event_filters
async def handler_filts(self, *filters):
""" """
:param filters: (filts:list,relation:str) :param filters: (filts:list,relation:str)
:param g_f: :param g_f:
:param relation: :param relation:
@ -82,10 +150,26 @@ class XAnalysis:
for filter in filters: for filter in filters:
filts = filter[0] filts = filter[0]
relation = filter[1] relation = filter[1]
user_filter = []
event_filter = [] event_filter = []
for item in filts: for item in filts:
comparator = item['comparator']
if item['tableType'] == 'user':
where = user_filter
elif item['tableType'] == 'event':
where = event_filter
elif item['tableType'] == 'user_label':
user_cluster_def = UserClusterDef(self.game, item['columnName'], self.account_filters)
await user_cluster_def.init()
sub_qry = user_cluster_def.to_sql_qry()
if comparator == 'in':
event_filter.append(sa.Column('#account_id').in_(sub_qry))
else:
event_filter.append(sa.Column('#account_id').notin_(sub_qry))
where = event_filter continue
else:
continue
col = sa.Column(item['columnName']) col = sa.Column(item['columnName'])
@ -130,8 +214,7 @@ class XAnalysis:
return event_filters return event_filters
def ltv_model_sql(self): async def ltv_model_sql(self):
# ltv的生成sql
days = (arrow.get(self.event_view['endTime']).date() - arrow.get(self.event_view['startTime']).date()).days days = (arrow.get(self.event_view['endTime']).date() - arrow.get(self.event_view['startTime']).date()).days
quota = self.event_view['quota'] quota = self.event_view['quota']
select_ltv = [] select_ltv = []
@ -163,17 +246,16 @@ class XAnalysis:
sql = str(qry.compile(compile_kwargs={"literal_binds": True})) sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
where_str = sql.split('WHERE ')[1] where_str = sql.split('WHERE ')[1]
where_order = self.handler_filts((self.global_filters, self.global_relation)) #global_relation就是 and where_order = await self.handler_filts((self.global_filters, self.global_relation)) # global_relation就是 and
where_order_str = 1 where_order_str = 1
if where_order: if where_order:
qry = sa.select().where(*where_order) qry = sa.select().where(*where_order)
sql = str(qry.compile(compile_kwargs={"literal_binds": True})) sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
where_order_str = sql.split('WHERE ')[1] where_order_str = 'WHERE '.join(sql.split('WHERE ')[1:])
where_account = self.handler_filts((self.account_filters, 'and'), self.ext_filters) where_account = await self.handler_filts((self.account_filters, 'and'), self.ext_filters)
where_account_str = 1 where_account_str = 1
if where_account: if where_account:
qry = sa.select().where(*where_account) qry = sa.select().where(*where_account)
sql = str(qry.compile(compile_kwargs={"literal_binds": True})) sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
where_account_str = sql.split('WHERE ')[1] where_account_str = sql.split('WHERE ')[1]
@ -206,4 +288,4 @@ class XAnalysis:
'end_date': self.event_view['endTime'][:10], 'end_date': self.event_view['endTime'][:10],
'date_range': self.date_range, 'date_range': self.date_range,
'ltv_n': ltv_n 'ltv_n': ltv_n
} }