This commit is contained in:
wuaho 2021-06-16 10:07:44 +08:00
parent 2337caa3e5
commit 54b53fcc20
2 changed files with 65 additions and 19 deletions

View File

@ -152,18 +152,21 @@ async def retention_model(
df.set_index(res['groupby'], inplace=True) df.set_index(res['groupby'], inplace=True)
df.sort_index(inplace=True) df.sort_index(inplace=True)
values = {} values = {}
days = [i for i in range((date_range[-1] - date_range[0]).days + 1)][:unit_num+1] days = [i for i in range((date_range[-1] - date_range[0]).days + 1)][:unit_num + 1]
for i, d1 in enumerate(date_range): for i, d1 in enumerate(date_range):
for g in groups: for g in groups:
if g == tuple():
continue
a = set(df.loc[(d1, event_a, *g)]['values']) if (d1, event_a, *g) in df.index else set() a = set(df.loc[(d1, event_a, *g)]['values']) if (d1, event_a, *g) in df.index else set()
if not a: if not a:
continue continue
key = ','.join((d1.strftime("%Y-%m-%d"), *g)) key = d1.strftime("%Y-%m-%d")
tmp_g = values.setdefault(key, {})
for j, d2 in enumerate(date_range[i:]): for j, d2 in enumerate(date_range[i:]):
if j > unit_num: if j > unit_num:
break break
b = set(df.loc[(d2, event_b, *g)]['values']) if (d2, event_b, *g) in df.index else set() b = set(df.loc[(d2, event_b, *g)]['values']) if (d2, event_b, *g) in df.index else set()
tmp = values.setdefault(key, {}) tmp = tmp_g.setdefault(','.join(g), {})
tmp.setdefault('d0', len(a)) tmp.setdefault('d0', len(a))
tmp.setdefault('p', []).append(division(len(a & b) * 100, len(a))) tmp.setdefault('p', []).append(division(len(a & b) * 100, len(a)))
tmp.setdefault('n', []).append(len(a & b)) tmp.setdefault('n', []).append(len(a & b))
@ -171,7 +174,42 @@ async def retention_model(
'summary_values': summary_values, 'summary_values': summary_values,
'values': values, 'values': values,
'days': days, 'days': days,
'date_range': [d.strftime('%Y-%m-%d') for d in date_range][:unit_num+1], 'date_range': [d.strftime('%Y-%m-%d') for d in date_range][:unit_num + 1],
'title': title 'title': title
} }
return schemas.Msg(code=0, msg='ok', data=data) return schemas.Msg(code=0, msg='ok', data=data)
@router.post("/funnel_model_sql")
async def funnel_model_sql(
        request: Request,
        game: str,
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Funnel-model SQL endpoint: return the generated funnel query for inspection.

    Initializes the analysis object from the request, then hands back the
    SQL-description dict produced by ``analysis.funnel_model_sql()`` wrapped
    in a one-element list inside the standard ``Msg`` envelope.
    """
    await analysis.init()
    sql_info = analysis.funnel_model_sql()
    return schemas.Msg(code=0, msg='ok', data=[sql_info])
@router.post("/funnel_model")
async def funnel_model(
    request: Request,
    game: str,
    ckdb: CKDrive = Depends(get_ck_db),
    db: AsyncIOMotorDatabase = Depends(get_database),
    analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
    current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Funnel data model endpoint.

    Builds the funnel SQL via the analysis object and runs it against
    ClickHouse.  NOTE(review): this handler is clearly work-in-progress —
    the query result ``df`` is fetched but never used, ``date_range`` is
    unused, and ``data`` is returned as an empty dict placeholder.
    """
    await analysis.init()
    # res contains 'sql', 'groupby' and 'date_range' (see funnel_model_sql on BehaviorAnalysis).
    res = analysis.funnel_model_sql()
    sql = res['sql']
    date_range = res['date_range']  # currently unused — presumably for bucketing the result; TODO confirm
    df = await ckdb.query_dataframe(sql)  # result not yet consumed — aggregation logic missing
    # TODO: transform ``df`` (level counts, optionally per group) into the response payload.
    data = {}
    return schemas.Msg(code=0, msg='ok', data=data)

View File

@ -226,16 +226,13 @@ class BehaviorAnalysis:
def funnel_model_sql(self): def funnel_model_sql(self):
""" """
SELECT SELECT level, count(*) AS values
level, FROM (SELECT windowFunnel(86400)(shjy.event."#event_time", shjy.event."#event_name" = 'create_role',
count() AS values shjy.event."#event_name" = 'login') AS level
FROM FROM shjy.event
(SELECT `#account_id`, WHERE addHours(shjy.event."#event_time", 8) >= '2021-05-16 00:00:00'
windowFunnel(864000)(`#event_time`, `#event_name` = 'create_role',`#event_name` = 'login') AS level AND addHours(shjy.event."#event_time", 8) <= '2021-06-14 23:59:59'
FROM event GROUP BY shjy.event."#account_id") AS anon_1
WHERE (`#event_time` >= '2021-06-01 00:00:00')
AND (`#event_time` <= '2021-06-05 00:00:00')
GROUP BY `#account_id`)
GROUP BY level GROUP BY level
ORDER BY level ORDER BY level
:return: :return:
@ -245,6 +242,8 @@ ORDER BY level
event_time_col = getattr(self.event_tbl.c, '#event_time') event_time_col = getattr(self.event_tbl.c, '#event_time')
event_name_col = getattr(self.event_tbl.c, '#event_name') event_name_col = getattr(self.event_tbl.c, '#event_name')
e_account_id_col = getattr(self.event_tbl.c, '#account_id') e_account_id_col = getattr(self.event_tbl.c, '#account_id')
sub_group = [*self.groupby, e_account_id_col]
conds = [] conds = []
for item in self.events: for item in self.events:
event_filter, _ = self.handler_filts(*item['filts'], g_f=False) event_filter, _ = self.handler_filts(*item['filts'], g_f=False)
@ -252,7 +251,9 @@ ORDER BY level
and_(event_name_col == item['eventName'], *event_filter) and_(event_name_col == item['eventName'], *event_filter)
) )
# todo 替换 _windows_gap_ # todo 替换 _windows_gap_
subq = sa.select(func.windowFunnel_windows_gap__(event_time_col, *conds).alias('level')) subq = sa.select(*[sa.Column(i.key) for i in self.groupby],
func.windowFunnel_windows_gap__(event_time_col, *conds).label('level')).select_from(
self.event_tbl)
g_event_filter, _ = self.handler_filts() g_event_filter, _ = self.handler_filts()
where = [ where = [
@ -260,11 +261,18 @@ ORDER BY level
func.addHours(event_time_col, self.zone_time) <= self.end_date, func.addHours(event_time_col, self.zone_time) <= self.end_date,
*g_event_filter *g_event_filter
] ]
subq = subq.where(and_(*where)).group_by(e_account_id_col) subq = subq.where(and_(*where)).group_by(*sub_group)
subq = subq.subquery() subq = subq.subquery()
qry = sa.select(sa.Column('level'), func.count()).select_from(subq) qry = sa.select(*[sa.Column(i.key) for i in self.groupby], sa.Column('level'),
sql = str(subq.compile(compile_kwargs={"literal_binds": True})) func.count().label('values')).select_from(subq) \
.where(sa.Column('level') > 0) \
.group_by(*[sa.Column(i.key) for i in self.groupby], sa.Column('level')) \
.order_by(*[sa.Column(i.key) for i in self.groupby], sa.Column('level'))
sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
sql = sql.replace('_windows_gap_', f'({windows_gap})') sql = sql.replace('_windows_gap_', f'({windows_gap})')
print(sql) print(sql)
return sql return {'sql': sql,
'groupby': [i.key for i in self.groupby],
'date_range': self.date_range,
}