This commit is contained in:
wuaho 2021-06-11 16:39:53 +08:00
parent 58eafaaf7f
commit 9ddf741d3d
9 changed files with 430 additions and 25 deletions

View File

@@ -10,8 +10,10 @@ from .endpoints import data_mana
from .endpoints import query
from .endpoints import data_auth
from .endpoints import event_mana
from .endpoints import test
api_router = APIRouter()
api_router.include_router(test.router, tags=["test"], prefix='/test')
api_router.include_router(user.router, tags=["用户接口"], prefix='/user')
api_router.include_router(project.router, tags=["项目接口"], prefix='/project')

View File

@@ -104,12 +104,13 @@ async def my_event(request: Request,
return schemas.Msg(code=0, msg='ok', data=[])
event_list = []
event_show_name = await crud.event_mana.get_all_show_name(db, game)
event_list.append({'id': 'event', 'title': '全部事件', 'category': []})
for item in my_data_auth:
event_list[-1]['category'].append({
'event_name': item,
# TODO: add event property management later
'event_desc': item
'event_desc': event_show_name.get(item, item)
})
return schemas.Msg(code=0, msg='ok', data=event_list)
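The behavioural change in this hunk is that each entry's description now prefers the display name maintained through event management, falling back to the raw event name when no mapping exists. A small illustration of the lookup (the mapped value here is made up):

# event_show_name is the dict returned by crud.event_mana.get_all_show_name(db, game)
event_show_name = {'create_role': 'Create Role'}    # illustrative mapping
event_show_name.get('create_role', 'create_role')   # -> 'Create Role'
event_show_name.get('login', 'login')                # -> 'login', unmapped events keep their raw name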

View File

@@ -1,13 +1,16 @@
import json
import pandas as pd
from fastapi import APIRouter, Depends, Request
from motor.motor_asyncio import AsyncIOMotorDatabase
import crud, schemas
from common import *
from api import deps
from db import get_database
from db.ckdb import get_ck_db, CKDrive
from db.redisdb import get_redis_pool, RedisDrive
from models import ToSql
from models.behavior_analysis import BehaviorAnalysis
router = APIRouter()
@@ -28,21 +31,14 @@ async def query_sql(
async def event_model_sql(
request: Request,
game: str,
data_in: schemas.CkQuery,
ckdb: CKDrive = Depends(get_ck_db),
rdb: RedisDrive = Depends(get_redis_pool),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
""" 事件分析模型 sql"""
columns_json = await rdb.get(f'{game}_event')
event_columns = json.loads(columns_json)
columns_json = await rdb.get(f'{game}_event')
user_columns = json.loads(columns_json)
to_sql = ToSql(data_in.dict(), game, event_columns.keys(), user_columns.keys())
res = to_sql.get_sql_query_event_model()
return schemas.Msg(code=0, msg='ok', data=res)
await analysis.init()
data = analysis.event_model_sql()
return schemas.Msg(code=0, msg='ok', data=data)
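Both rewritten endpoints lean on FastAPI's class-as-dependency mechanism: Depends(BehaviorAnalysis) makes FastAPI construct the instance per request, resolving game, the CkQuery request body and the Redis pool from the parameters of BehaviorAnalysis.__init__ itself, so the route only has to await analysis.init(). A minimal, self-contained sketch of that pattern (QueryContext and /demo are illustrative, not part of this repo):

from fastapi import Depends, FastAPI

app = FastAPI()

class QueryContext:
    # FastAPI inspects __init__ like a route signature: query parameters,
    # request-body models and nested Depends() defaults are all resolved for you.
    def __init__(self, game: str, limit: int = 100):
        self.game = game
        self.limit = limit

@app.post("/demo")
async def demo(ctx: QueryContext = Depends(QueryContext)) -> dict:
    # ctx is a fully constructed QueryContext for this request
    return {"game": ctx.game, "limit": ctx.limit}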
@router.post("/event_model")
@@ -52,17 +48,12 @@ async def event_model(
data_in: schemas.CkQuery,
ckdb: CKDrive = Depends(get_ck_db),
rdb: RedisDrive = Depends(get_redis_pool),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
""" 事件分析"""
columns_json = await rdb.get(f'{game}_event')
event_columns = json.loads(columns_json)
columns_json = await rdb.get(f'{game}_event')
user_columns = json.loads(columns_json)
to_sql = ToSql(data_in.dict(), game, event_columns.keys(), user_columns.keys())
sqls = to_sql.get_sql_query_event_model()
await analysis.init()
sqls = analysis.event_model_sql()
res = []
for item in sqls:
q = {
@@ -71,7 +62,7 @@ async def event_model(
'event_name': item['event_name']
}
sql = item['sql']
groupby = item['groupby'][1:]
groupby = item['groupby']
date_range = item['date_range']
q['date_range'] = date_range
df = await ckdb.query_dataframe(sql)
@@ -105,3 +96,82 @@ async def event_model(
q['date_range'] = [d.strftime('%Y-%m-%d %H:%M:%S') for d in q['date_range']]
res.append(q)
return schemas.Msg(code=0, msg='ok', data=res)
@router.post("/retention_model_sql")
async def retention_model_sql(
request: Request,
game: str,
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""留存查询 sql"""
await analysis.init()
data = analysis.retention_model_sql()
return schemas.Msg(code=0, msg='ok', data=[data])
@router.post("/retention_model")
async def retention_model(
request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
db: AsyncIOMotorDatabase = Depends(get_database),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""留存数据模型"""
await analysis.init()
res = analysis.retention_model_sql()
sql = res['sql']
date_range = res['date_range']
event_a, event_b = res['event_name']
unit_num = res['unit_num']
title = await crud.event_mana.get_show_name(db, game, event_a)
title = f'{title}用户数'
df = await ckdb.query_dataframe(sql)
concat_data = []
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
# compute the overall (ungrouped) retention
summary_df = df.groupby(['date', 'event_name'])[['values', 'amount']].sum()
summary_values = {}
for i, d1 in enumerate(date_range):
a = set(summary_df.loc[(d1, event_a)]['values']) if (d1, event_a) in summary_df.index else set()
if not a:
continue
key = d1.strftime('%Y-%m-%d')
for j, d2 in enumerate(date_range[i:]):
if j >= unit_num:
break
b = set(summary_df.loc[(d2, event_b)]['values']) if (d2, event_b) in summary_df.index else set()
tmp = summary_values.setdefault(key, {})
tmp.setdefault('d0', len(a))
tmp.setdefault('p', []).append(division(len(a & b) * 100, len(a)))
tmp.setdefault('n', []).append(len(a & b))
groups = set([tuple(i) for i in df[res['groupby'][2:]].values])
df.set_index(res['groupby'], inplace=True)
df.sort_index(inplace=True)
values = {}
days = [i for i in range((date_range[-1] - date_range[0]).days + 1)][:unit_num]
for i, d1 in enumerate(date_range):
for g in groups:
a = set(df.loc[(d1, event_a, *g)]['values']) if (d1, event_a, *g) in df.index else set()
if not a:
continue
key = ','.join((d1.strftime("%Y-%m-%d"), *g))
for j, d2 in enumerate(date_range[i:]):
if j >= unit_num:
break
b = set(df.loc[(d2, event_b, *g)]['values']) if (d2, event_b, *g) in df.index else set()
tmp = values.setdefault(key, {})
tmp.setdefault('d0', len(a))
tmp.setdefault('p', []).append(division(len(a & b) * 100, len(a)))
tmp.setdefault('n', []).append(len(a & b))
data = {
'summary_values': summary_values,
'values': values,
'days': days,
'date_range': [d.strftime('%Y-%m-%d') for d in date_range][:unit_num],
'title': title
}
return schemas.Msg(code=0, msg='ok', data=data)
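The retention calculation works on sets of account ids: a is everyone who triggered event_a on day d1, b is everyone who triggered event_b on day d2, and the retained share is the size of their intersection over the size of a, expressed as a percentage via division() from common.compute. A worked example for a single cell, with made-up account ids:

a = {'u1', 'u2', 'u3', 'u4'}     # did event_a (e.g. create_role) on day d1
b = {'u2', 'u4', 'u9'}           # did event_b (e.g. login) on day d2
n = len(a & b)                   # 2 retained accounts, appended to tmp['n']
p = division(n * 100, len(a))    # 50.0, appended to tmp['p']
d0 = len(a)                      # 4, the cohort size stored once as tmp['d0']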

View File

@@ -0,0 +1,29 @@
from typing import Any
from fastapi import APIRouter, Depends, Request
import schemas
from api import deps
from db.ckdb import CKDrive, get_ck_db
from db.redisdb import RedisDrive, get_redis_pool
from models.behavior_analysis import BehaviorAnalysis
router = APIRouter()
@router.post("/test")
async def test(
request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
rdb: RedisDrive = Depends(get_redis_pool),
current_user: schemas.UserDB = Depends(deps.get_current_user),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis)) -> schemas.Msg:
await analysis.init()
query = analysis.funnel_model_sql()
data = {
'game': game,
'analysis': analysis.game,
'query': query
}
return schemas.Msg(code=0, msg='ok', data=data)

common/__init__.py Normal file (+1)
View File

@@ -0,0 +1 @@
from .compute import *

common/compute.py Normal file (+7)
View File

@@ -0,0 +1,7 @@
def division(a, b, n=2):
res = 0
try:
res = round(a / b, n)
except ZeroDivisionError:
pass
return res
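A few illustrative calls, mainly to show the zero-denominator behaviour that the retention endpoint relies on:

division(1, 3)        # -> 0.33, rounded to n=2 decimal places
division(1, 3, n=4)   # -> 0.3333
division(5, 0)        # -> 0, ZeroDivisionError is swallowed and the default result is returned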

View File

@@ -15,6 +15,17 @@ class CRUDEventMap(CRUDBase):
await self.update_one(db, {'game': game, 'event_name': data_id.show_name}, {'$set': data_id.dict()},
upsert=True)
async def get_show_name(self, db: AsyncIOMotorDatabase, game: str, event_name: str):
res = await self.find_one(db, {'game': game, 'event_name': event_name})
return res.get('show_name', event_name) if res else event_name
async def get_all_show_name(self, db: AsyncIOMotorDatabase, game: str):
cursor = self.find(db, {'game': game})
res = {}
async for item in self.to_list(cursor):
res[item['event_name']] = item['show_name']
return res
async def create_index(self, db: AsyncIOMotorDatabase):
await db[self.coll_name].create_index(
[('game', pymongo.DESCENDING), ('event_name', pymongo.DESCENDING)],

models/behavior_analysis.py Normal file (+271)
View File

@@ -0,0 +1,271 @@
from typing import Tuple
import sqlalchemy as sa
import json
from fastapi import Depends
import pandas as pd
from sqlalchemy import func, or_, and_, not_
import schemas
from core.config import settings
from db.redisdb import get_redis_pool, RedisDrive
class BehaviorAnalysis:
def __init__(self, game: str, data_in: schemas.CkQuery, rdb: RedisDrive = Depends(get_redis_pool)):
self.game = game
self.rdb = rdb
self.user_tbl = None
self.event_tbl = None
self.event_view = data_in.eventView
self.events = data_in.events
self.zone_time: int = 0
self.start_date = None
self.end_date = None
self.global_filters = None
self.groupby = None
self.time_particle = None
self.date_range = None
self.unit_num = None
async def init(self):
await self._init_table()
self.zone_time = self._get_zone_time()
self.time_particle = self._get_time_particle_size()
self.start_date, self.end_date, self.date_range = self._get_date_range()
self.global_filters = self._get_global_filters()
self.groupby = self._get_group_by()
self.unit_num = self._get_unit_num()
def _get_time_particle_size(self):
return self.event_view.get('timeParticleSize') or 'P1D'
def _get_unit_num(self):
return self.event_view.get('unitNum')
def _get_group_by(self):
return [getattr(self.event_tbl.c, item['columnName']) for item in (self.event_view.get('groupBy') or [])]
def _get_zone_time(self):
return int(self.event_view.get('zone_time', 8))
def _get_date_range(self) -> Tuple[str, str, list]:
start_date: str = self.event_view.get('startTime')
end_date: str = self.event_view.get('endTime')
date_range = pd.date_range(start_date, end_date, freq=settings.PROPHET_TIME_GRAIN_MAP[self.time_particle],
tz='UTC').tolist()
return start_date, end_date, date_range
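_get_date_range expands the view's start/end times into a list of pandas timestamps at the configured granularity; settings.PROPHET_TIME_GRAIN_MAP is assumed to map ISO-8601 durations such as 'P1D' onto pandas frequency strings. A sketch of the idea, with the mapping values assumed:

import pandas as pd

time_grain_map = {'P1D': 'D', 'PT1H': 'H'}   # assumed shape of PROPHET_TIME_GRAIN_MAP
date_range = pd.date_range('2021-06-01', '2021-06-04',
                           freq=time_grain_map['P1D'], tz='UTC').tolist()
# -> four daily timestamps, 2021-06-01 through 2021-06-04, which retention_model
#    later formats with strftime('%Y-%m-%d')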
def _get_global_filters(self):
return self.event_view.get('filts') or []
async def _init_table(self):
"""
Build SQLAlchemy table objects for user_view and event from the column names cached in Redis.
:return:
"""
res_json = await self.rdb.get(f'{self.game}_user')
columns = json.loads(res_json).keys()
metadata = sa.MetaData(schema=self.game)
self.user_tbl = sa.Table('user_view', metadata, *[sa.Column(column) for column in columns])
res_json = await self.rdb.get(f'{self.game}_event')
columns = json.loads(res_json).keys()
metadata = sa.MetaData(schema=self.game)
self.event_tbl = sa.Table('event', metadata, *[sa.Column(column) for column in columns])
def handler_filts(self, *ext_filters, g_f=True):
user_filter = []
event_filter = []
filters = (*self.global_filters, *ext_filters) if g_f else (*ext_filters,)
for item in filters:
if item['tableType'] == 'user':
where = user_filter
elif item['tableType'] == 'event':
where = event_filter
else:
continue
tbl = getattr(self, f'{item["tableType"]}_tbl')
col = getattr(tbl.c, item['columnName'])
comparator = item['comparator']
ftv = item['ftv']
if comparator == '==':
if len(ftv) > 1:
where.append(or_(*[col == v for v in ftv]))
else:
where.append(col == ftv[0])
elif comparator == '>=':
where.append(col >= ftv[0])
elif comparator == '<=':
where.append(col <= ftv[0])
elif comparator == '>':
where.append(col > ftv[0])
elif comparator == '<':
where.append(col < ftv[0])
elif comparator == 'is not null':
where.append(col.isnot(None))
elif comparator == 'is null':
where.append(col.is_(None))
elif comparator == '!=':
where.append(col != ftv[0])
return event_filter, user_filter
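handler_filts translates the UI's filter dicts into SQLAlchemy clauses, routing each one to the user or event table by tableType and building the comparison from comparator and ftv. One filter entry in the shape this method expects (values are illustrative):

filt = {
    'tableType': 'event',      # 'user' filters force a join against user_view
    'columnName': 'app_name',
    'comparator': '==',
    'ftv': ['demo_app'],       # single value -> col == 'demo_app'
}
# with several values in ftv, the '==' branch becomes or_(col == v for v in ftv)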
def retention_model_sql(self):
event_name_a = self.events[0]['eventName']
event_name_b = self.events[1]['eventName']
event_time_col = getattr(self.event_tbl.c, '#event_time')
event_name_col = getattr(self.event_tbl.c, '#event_name')
e_account_id_col = getattr(self.event_tbl.c, '#account_id')
u_account_id_col = getattr(self.user_tbl.c, '#account_id')
date_col = sa.Column('date')
selectd = [func.toStartOfDay(func.addHours(event_time_col, self.zone_time)).label('date'),
event_name_col.label('event_name'),
*self.groupby,
func.arrayDistinct(func.groupArray(e_account_id_col)).label('values'),
func.length(sa.Column('values')).label('amount')
]
base_where = [
func.addHours(event_time_col, self.zone_time) >= self.start_date,
func.addHours(event_time_col, self.zone_time) <= self.end_date,
event_name_col.in_([event_name_a, event_name_b]),
]
event_filter, user_filter = self.handler_filts()
groupby = [date_col, event_name_col] + self.groupby
oredrby = [date_col]
if user_filter:
qry = sa.select(selectd).select_from(
self.event_tbl.join(self.user_tbl, u_account_id_col == e_account_id_col)).where(
and_(*user_filter, *event_filter, *base_where)).group_by(*groupby).order_by(
*oredrby).limit(10000)
else:
qry = sa.select(selectd).where(and_(*base_where, *event_filter)).group_by(*groupby).order_by(
*oredrby).limit(10000)
sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
print(sql)
return {'sql': sql,
'groupby': ['date', 'event_name'] + [i.key for i in self.groupby],
'date_range': self.date_range,
'event_name': [event_name_a, event_name_b],
'unit_num': self.unit_num
}
def event_model_sql(self):
sqls = []
event_time_col = getattr(self.event_tbl.c, '#event_time')
select_exprs = [
settings.TIME_GRAIN_EXPRESSIONS[self.time_particle](event_time_col, self.zone_time)]
select_exprs += self.groupby
for event in self.events:
event_name = event['event_name']
event_name_col = getattr(self.event_tbl.c, '#event_name')
base_where = [
func.addHours(event_time_col, self.zone_time) >= self.start_date,
func.addHours(event_time_col, self.zone_time) <= self.end_date,
event_name_col == event_name
]
analysis = event['analysis']
event_filter, user_filter = self.handler_filts(*event['filts'])
u_account_id_col = getattr(self.user_tbl.c, '#account_id')
# aggregate by account id
e_account_id_col = getattr(self.event_tbl.c, '#account_id')
# aggregation method
if analysis == 'total_count':
selectd = select_exprs + [func.count().label('values')]
elif analysis == 'touch_user_count':
# keep a plain column list here, like the other branches; sa.select() is applied once below
selectd = select_exprs + [func.count(sa.distinct(e_account_id_col)).label('values')]
elif analysis == 'touch_user_avg':
selectd = select_exprs + [
func.round((func.count() / func.count(sa.distinct(e_account_id_col))), 2).label(
'values')]
elif analysis == 'distinct_count':
selectd = select_exprs + [
func.count(sa.distinct(getattr(self.event_tbl.c, event['event_attr_id']))).label('values')]
else:
selectd = select_exprs + [
func.round(getattr(func, analysis)(getattr(self.event_tbl.c, event['event_attr_id'])), 2).label(
'values')]
if user_filter:
qry = sa.select(selectd).select_from(
self.event_tbl.join(self.user_tbl, u_account_id_col == e_account_id_col)).where(
and_(*user_filter, *event_filter, *base_where))
else:
qry = sa.select(selectd).where(and_(*event_filter, *base_where))
qry = qry.group_by(*select_exprs)
qry = qry.order_by(sa.Column('date'))
qry = qry.limit(1000)
sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
print(sql)
sqls.append({'sql': sql,
'groupby': [i.key for i in self.groupby],
'date_range': self.date_range,
'event_name': event_name
})
return sqls
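event_model_sql expects every entry of data_in.events to name the event, the aggregation to apply and any per-event filters; event_attr_id is only read by the distinct_count branch and the generic aggregate branch. An illustrative entry (values made up):

event = {
    'event_name': 'pay',
    'analysis': 'sum',         # or total_count / touch_user_count / touch_user_avg / distinct_count
    'event_attr_id': 'money',  # the column to aggregate for sum / distinct_count etc.
    'filts': [],               # same shape as the global filters handled above
}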
def funnel_model_sql(self):
"""
SELECT
level,
count() AS values
FROM
(SELECT `#account_id`,
windowFunnel(864000)(`#event_time`, `#event_name` = 'create_role',`#event_name` = 'login') AS level
FROM event
WHERE (`#event_time` >= '2021-06-01 00:00:00')
AND (`#event_time` <= '2021-06-05 00:00:00')
GROUP BY `#account_id`)
GROUP BY level
ORDER BY level
:return:
"""
windows_gap = self.event_view['windows_gap'] * 86400
event_time_col = getattr(self.event_tbl.c, '#event_time')
event_name_col = getattr(self.event_tbl.c, '#event_name')
e_account_id_col = getattr(self.event_tbl.c, '#account_id')
conds = []
for item in self.events:
event_filter, _ = self.handler_filts(*item['filts'], g_f=False)
conds.append(
and_(event_name_col == item['eventName'], *event_filter)
)
# TODO: replace the _windows_gap_ placeholder (done via string substitution on the compiled SQL below)
subq = sa.select(func.windowFunnel_windows_gap__(event_time_col, *conds).alias('level'))
g_event_filter, _ = self.handler_filts()
where = [
func.addHours(event_time_col, self.zone_time) >= self.start_date,
func.addHours(event_time_col, self.zone_time) <= self.end_date,
*g_event_filter
]
subq = subq.where(and_(*where)).group_by(e_account_id_col)
subq = subq.subquery()
qry = sa.select(sa.Column('level'), func.count().label('values')).select_from(subq)
qry = qry.group_by(sa.Column('level')).order_by(sa.Column('level'))
sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
sql = sql.replace('_windows_gap_', f'({windows_gap})')
print(sql)
return sql
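The funnel SQL goes through a string placeholder because ClickHouse's windowFunnel(gap)(...) takes the window size as a parameter of the aggregate function itself, which is awkward to express through SQLAlchemy's func accessor; the generated function name is therefore patched after compilation, as the TODO above notes. A simplified sketch of the substitution step (the placeholder spelling here is illustrative, not the exact compiled output):

windows_gap = 1 * 86400   # event_view['windows_gap'] is given in days
sql = "windowFunnel_windows_gap_(`#event_time`, `#event_name` = 'create_role') AS level"
sql = sql.replace('_windows_gap_', f'({windows_gap})')
# -> "windowFunnel(86400)(`#event_time`, `#event_name` = 'create_role') AS level"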

sql/留存.sql Normal file (+13)
View File

@@ -0,0 +1,13 @@
SELECT toStartOfDay(addHours(shjy.event."#event_time", 8)) AS date,
shjy.event."#event_name" AS event_name,
`app_name`,
arrayDistinct(groupArray(shjy.event."#account_id")) AS values,
length(values) as num
FROM shjy.event
WHERE addHours(shjy.event."#event_time", 8) >= '2021-05-10 00:00:00'
AND addHours(shjy.event."#event_time", 8) < '2021-06-08 23:59:59'
AND shjy.event."#event_name" IN ('create_role', 'login')
GROUP BY toStartOfDay(addHours(shjy.event."#event_time", 8)), shjy.event."#event_name", `app_name`
ORDER BY date
LIMIT 1000