Add single-event funnel

This commit is contained in:
李宗振 2022-08-18 19:13:55 +08:00
parent 8eb8da852c
commit f02ea6dd18
2 changed files with 217 additions and 26 deletions

View File

@@ -415,7 +415,7 @@ async def event_model(
appgb = str(gb).replace("'", '')
groups.append(appgb)
item['groups'] = groups
# revised approach
# by_dict={}
# for i in range(len(gb)):
# by_dict[groupby_list[i]]=gb[i]
@@ -431,7 +431,7 @@ async def event_model(
appgb = str(gb).replace("'", '')
groups.append(appgb)
item['groups'] = groups
item['groupby'] = groupby
item['values'] = np.array(item['values'])[sort_key].tolist()
item['sum'] = np.array(item['sum'])[sort_key].tolist()
item['avg'] = np.array(item['avg'])[sort_key].tolist()
@@ -587,7 +587,7 @@ async def retention_model(request: Request,
# map to the corresponding Chinese labels for front-end display
groupby_list = analysis.event_view.get('groupBy')
groupby = [i['columnName'] for i in groupby_list if i['tableType'] != 'user_label']
true_group = []  # the group values actually selected
for g_data in groupby_list:
data_type = g_data['data_type']
@@ -622,7 +622,7 @@ async def retention_model(request: Request,
max_v = int(df[g_data['columnName']].max())
min_v = int(df[g_data['columnName']].min())
interval = (max_v - min_v) // 10 or 1
for i in range(min_v, max_v + 1, interval):
zidai.append([i, i + interval])
true_group.append(zidai)
@@ -795,8 +795,8 @@ async def retention_model(request: Request,
# compute the retention probabilities
for key1, value1 in new_summary_valuess.items():
new_summary_valuess[key1]['p'] = [round(i * 100 / value1['d0'], 2) for i in value1['n']]
new_summary_valuess[key1]['p_outflow'] = [round(i1 * 100 / value1['d0'], 2) for i1 in value1['n_outflow']]
title = ['分组项', '用户数', '次留', *[f'{i + 1}' for i in retention_n[1:]]]
@@ -820,8 +820,8 @@ async def retention_model(request: Request,
'start_date': res['start_date'],
'end_date': res['end_date'],
'time_particle': res['time_particle'],
'groupby': [i['columnName'] for i in groupby_list],
'groupby_data': groupby_data
}
return schemas.Msg(code=0, msg='ok', data=resp)
@@ -1178,7 +1178,7 @@ async def funnel_model(
# tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
_ = date_data.setdefault(key[0].strftime('%Y-%m-%d'), {})
# [key[1]] = tmp
title = (groupby or ['总体']) + cond_level
resp = {'list': data_list,
'date_data': date_data,
@@ -1520,6 +1520,58 @@ async def scatter_model(
# 'title': '总体'}
@router.post("/guide_model")
async def guide_model(
request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
db: AsyncIOMotorDatabase = Depends(get_database),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""分布分析 模型"""
await analysis.init(data_where=current_user.data_where)
event_type = analysis.events[0]['eventName']
try:
res = await analysis.guide_model_sql()
except Exception as e:
return schemas.Msg(code=-9, msg='报表配置参数异常')
sql = res['sql']
df = await ckdb.query_dataframe(sql)
if df.empty:
return schemas.Msg(code=-9, msg='无数据', data=None)
group_str = res['analysis']
# cast the step column to int
df[group_str] = df[group_str].astype(int)
step_list = [str(i) for i in sorted(df[group_str].unique())]
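# Pivot the dataframe into {date: {step: value}}; any step missing on a given
# date is filled with 0 so every date exposes the full step list.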
dict_k = {}
for k, nedf in df.groupby("date"):
ste_k = {}
for kk, ste_df in nedf.groupby(group_str):
ste_df.reset_index(drop=True, inplace=True)
ste_k[str(kk)] = int(ste_df['values'][0])
for ste in step_list:
if ste not in list(ste_k.keys()):
ste_k[ste] = 0
dict_k[str(k)] = ste_k
data = {}
for dict_key, dict_data in dict_k.items():
dict_data1 = deepcopy(dict_data)
dict_k1 = {int(k): v for k, v in dict_data1.items()}
# order the per-step values by step number so they line up with step_list
data_values = [dict_k1[k] for k in sorted(dict_k1.keys())]
data_values.insert(0, dict_key)
data[dict_key] = data_values
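# Prepend the date header so 'level' lines up column-for-column with each row in 'list'.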
step_list.insert(0, '日期')
res_msg = {
'level': step_list,
'list': data
}
return schemas.Msg(code=0, msg='ok', data=res_msg)
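# Illustrative response shape for guide_model (hypothetical dates and counts),
# assuming three guide steps were observed:
# {
#     'level': ['日期', '1', '2', '3'],
#     'list': {'2022-08-18': ['2022-08-18', 120, 95, 60]}
# }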
@router.post("/scatter_model_details") @router.post("/scatter_model_details")
async def scatter_model( async def scatter_model(
request: Request, request: Request,
@@ -1683,7 +1735,7 @@ async def scatter_model(
ac = ast.literal_eval(i)
ab = [str(ii) for ii in ac]
groupby_data.append(ab)
resp['groupby_data'] = groupby_data
return schemas.Msg(code=0, msg='ok', data=resp)
else:
# discrete numeric values
@@ -1790,7 +1842,7 @@ async def scatter_model(
ac = ast.literal_eval(i)
ab = [str(ii) for ii in ac]
groupby_data.append(ab)
resp['groupby_data'] = groupby_data
return schemas.Msg(code=0, msg='ok', data=resp)
else:
return schemas.Msg(code=-9, msg='没有添加分组项', data='')

View File

@@ -400,15 +400,15 @@ class BehaviorAnalysis:
# column name
col = getattr(tbl.c, item['columnName'])
# check whether it is the same event
yuan_event = self.events[0].get('eventName') or self.events[0].get('event_name')  # event name from the metric
biao_event = self.events[0].get('customEvent', '').split('.')[0]
event = await get_event(item['columnName'], self.game)  # get the corresponding event name or base attribute
if event != yuan_event and event != biao_event and event != '基础属性' and self.game != 'debug':
event_time_col = getattr(self.event_tbl.c, '#event_time')
event_name_col = getattr(self.event_tbl.c, '#event_name')
base_where = [
func.addHours(event_time_col, self.zone_time) >= self.start_date,
func.addHours(event_time_col, self.zone_time) <= self.end_date, ]
event_name_where = []
event_name_where.append(settings.CK_CALC_SYMBO['=='](event_name_col, event))
ftv = item['ftv']
@@ -440,8 +440,8 @@ class BehaviorAnalysis:
event_name_where.append(col.in_(ftv))
elif comparator == '!=':
event_name_where.append(col != ftv[0])
sub_qry = sa.select(sa.Column('#account_id')).select_from(
sa.select(sa.Column('#account_id')).where(and_(*base_where, *event_name_where))
)
event_filter.append(sa.Column('#account_id').in_(sub_qry))
continue
@@ -727,7 +727,7 @@ addHours("#event_time", 8) >= '{stat_date}' AND addHours("#event_time", 8) <= '{
GROUP BY toDate(addHours("#event_time", 8))"""
sqls.append({'sql': sql,
'group_label': self.group_label,
'groupby': [i.key for i in self.groupby],
'date_range': self.date_range,
'event_name': event_name_display or event_name,
@@ -826,7 +826,7 @@ ORDER BY level
if time != None and time != '合计':
timeParticleSize = self.event_view.get('timeParticleSize')  # use a different time range depending on week / month / total granularity
if timeParticleSize == 'P1W':  # by week
start_date, end_date = get_week(time)
if start_date < strptime(self.start_date):  # first period of the range
where = [
func.addHours(event_time_col, self.zone_time) >= self.start_date,
@@ -835,13 +835,13 @@ ORDER BY level
elif end_date < strptime(self.end_date):  # middle periods of the range
where = [
func.addHours(event_time_col, self.zone_time) >= start_date,
func.addHours(event_time_col, self.zone_time) <= end_date, ]
else:  # last period of the range
where = [
func.addHours(event_time_col, self.zone_time) >= start_date,
func.addHours(event_time_col, self.zone_time) <= self.end_date, ]
elif timeParticleSize == 'P1M':  # by month
start_date, end_date = start_end_month(time)
if strptime(self.start_date) > strptime1(time):
where = [
func.addHours(event_time_col, self.zone_time) >= self.start_date,
@@ -855,7 +855,7 @@ ORDER BY level
else:
where = [
func.addHours(event_time_col, self.zone_time) >= self.start_date,
func.addHours(event_time_col, self.zone_time) <= self.end_date, ]
else:
where = [
func.addHours(event_time_col, self.zone_time) >= self.start_date,
@@ -873,7 +873,8 @@ ORDER BY level
if analysis in ['number_of_days', 'number_of_hours']:
values_col = func.count(func.distinct(e_account_id_col)).label('values')
if analysis in ['times', 'number_of_days', 'number_of_hours', 'sum', 'avg', 'median', 'max', 'min',
'distinct_count']:
if self.time_particle == 'total':
qry = sa.select(*self.groupby, values_col) \
.where(and_(*where)) \
@@ -940,6 +941,144 @@ ORDER BY level
'end_date': self.end_date[:10],
}
async def guide_model_sql(self):
# generate the SQL for the guide-step funnel
event = self.events[0]
event_name = event['eventName']
analysis = event['analysis']
if analysis in ['list_distinct', "set_distinct", "ele_distinct"]:
analysis = 'max'
e_account_id_col = getattr(self.event_tbl.c, '#account_id').label('uid')
u_account_id_col = getattr(self.user_tbl.c, '#account_id')
event_name_col = getattr(self.event_tbl.c, '#event_name')
event_time_col = getattr(self.event_tbl.c, '#event_time').label('date')
event_date_col = settings.TIME_GRAIN_EXPRESSIONS[self.time_particle](event_time_col, self.zone_time)
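# event_date_col buckets the event timestamp into the configured reporting
# granularity (e.g. day / week / month), shifted by zone_time.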
quota_interval_arr = event.get('quotaIntervalArr')
time = self.data_in.time
global where
# only the group-detail view inside the distribution analysis overrides the time range; all other cases take the else branch
if time != None and time != '合计':
timeParticleSize = self.event_view.get('timeParticleSize')  # use a different time range depending on week / month / total granularity
if timeParticleSize == 'P1W':  # by week
start_date, end_date = get_week(time)
if start_date < strptime(self.start_date):  # first period of the range
where = [
func.addHours(event_time_col, self.zone_time) >= self.start_date,
func.addHours(event_time_col, self.zone_time) <= end_date,
]
elif end_date < strptime(self.end_date):  # middle periods of the range
where = [
func.addHours(event_time_col, self.zone_time) >= start_date,
func.addHours(event_time_col, self.zone_time) <= end_date, ]
else:  # last period of the range
where = [
func.addHours(event_time_col, self.zone_time) >= start_date,
func.addHours(event_time_col, self.zone_time) <= self.end_date, ]
elif timeParticleSize == 'P1M':  # by month
start_date, end_date = start_end_month(time)
if strptime(self.start_date) > strptime1(time):
where = [
func.addHours(event_time_col, self.zone_time) >= self.start_date,
func.addHours(event_time_col, self.zone_time) <= end_date,
]
else:
where = [
func.addHours(event_time_col, self.zone_time) >= start_date,
func.addHours(event_time_col, self.zone_time) <= self.end_date,
]
else:
where = [
func.addHours(event_time_col, self.zone_time) >= self.start_date,
func.addHours(event_time_col, self.zone_time) <= self.end_date, ]
else:
where = [
func.addHours(event_time_col, self.zone_time) >= self.start_date,
func.addHours(event_time_col, self.zone_time) <= self.end_date,
]
if event_name != '*':
where.append(event_name_col == event_name)
event_filter, user_filter = await self.handler_filts((event['filts'], event.get('relation', 'and')),
(self.global_filters, self.global_relation)
, self.ext_filters)
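# user-table filters are applied through an "#account_id IN (subquery)" clause;
# event-table filters are appended to the WHERE conditions directly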
if user_filter:
where.append(e_account_id_col.in_(sa.select(u_account_id_col).where(*user_filter)))
where.extend(event_filter)
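# 'values' is a plain row count; for 'number_of_days' / 'number_of_hours' it becomes a distinct-account count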
values_col = func.count().label('values')
if analysis in ['number_of_days', 'number_of_hours']:
values_col = func.count(func.distinct(e_account_id_col)).label('values')
if analysis:
if self.time_particle == 'total':
qry = sa.select(*self.groupby, analysis, values_col) \
.where(and_(*where)) \
.group_by(*self.groupby, analysis, e_account_id_col)
else:
qry = sa.select(event_date_col, *self.groupby, values_col) \
.where(and_(*where)) \
.group_by(event_date_col, *self.groupby, e_account_id_col)
sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
print(sql)
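# The compiled SQL is post-processed as plain text: the guide-step column
# (held in `analysis` here) is prepended to the SELECT list and the GROUP BY
# clause, and an "is not null" condition on it is added to the WHERE clause.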
sqla = sql.replace('SELECT', f'SELECT {analysis}, ', 1)
sqlb = sqla.replace('GROUP BY', f'GROUP BY {analysis}, ', 1)
sqlc = sqlb.replace('WHERE', f'WHERE {analysis} is not null AND ', 1)
print(sqlc)
return {
'sql': sqlc,
'group_label': self.group_label,
'interval_type': event['intervalType'],
'analysis': analysis,
'quota_interval_arr': quota_interval_arr,
'groupby': [i.key for i in self.groupby],
'time_particle': self.time_particle,
'start_date': self.start_date[:10],
'end_date': self.end_date[:10],
}
elif event.get('quota'):
event_attr_col = getattr(self.event_tbl.c, event['quota'])
if self.time_particle == 'total':
if analysis == 'uniqExact':
# distinct count, over the whole date range ("total")
qry = sa.select(e_account_id_col,
event_attr_col.label('values')) \
.where(and_(*where)) \
.group_by(*self.groupby, e_account_id_col, event_attr_col)
else:
qry = sa.select(e_account_id_col,
settings.CK_FUNC[analysis](event_attr_col).label('values')) \
.where(and_(*where)) \
.group_by(*self.groupby, e_account_id_col)
else:
if analysis == 'uniqExact':
# distinct count
qry = sa.select(event_date_col, e_account_id_col,
event_attr_col.label('values')) \
.where(and_(*where)) \
.group_by(event_date_col, e_account_id_col, event_attr_col)
else:
qry = sa.select(event_date_col, e_account_id_col,
settings.CK_FUNC[analysis](event_attr_col).label('values')) \
.where(and_(*where)) \
.group_by(event_date_col, e_account_id_col)
sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
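# If a label column was configured (label_id), splice it into the SELECT list
# and append it to the trailing GROUP BY of the compiled SQL.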
columnName = event.get('label_id', '')
if columnName != '':
sql = sql.replace('SELECT', f'SELECT {columnName},', 1)
sql += f',{columnName}'
print(sql)
return {
'sql': sql,
'group_label': self.group_label,
'interval_type': event['intervalType'],
'analysis': analysis,
'quota_interval_arr': quota_interval_arr,
'groupby': [i.key for i in self.groupby],
'time_particle': self.time_particle,
'start_date': self.start_date[:10],
'end_date': self.end_date[:10],
}
async def trace_model_sql(self):
# generate the SQL for path (trace) analysis
session_interval = self.event_view.get('session_interval')
@@ -1164,7 +1303,7 @@ group by a.reg_date) log on reg.date=log.reg_date
filter_item = self.event_view.get('filter_item')
event_name_a = self.events[0]['eventName']  # initial event name
event_name_b = self.events[1]['eventName']  # return-visit event name
groupby_list = self.event_view.get('groupBy')
# decide whether each group column is a basic attribute or a user label
groupby = [i['columnName'] for i in groupby_list if i['tableType'] != 'user_label']
@@ -1206,7 +1345,7 @@ group by a.reg_date) log on reg.date=log.reg_date
if len(groupby) > 0:
groupbys = ','.join([f"`{i}`" for i in groupby])
groupby_on = ' and '.join([f"reg.{ii} = log.{ii}" for ii in [f"`{i}`" for i in groupby]])
sql = f"""
with '{event_name_a}' as start_event,
{event_name_b} as retuen_visit,
`{visit_name}` as visit,