diff --git a/api/api_v1/api.py b/api/api_v1/api.py
index 2c1a470..f12c669 100644
--- a/api/api_v1/api.py
+++ b/api/api_v1/api.py
@@ -10,8 +10,10 @@ from .endpoints import data_mana
 from .endpoints import query
 from .endpoints import data_auth
 from .endpoints import event_mana
+from .endpoints import test
 
 api_router = APIRouter()
+api_router.include_router(test.router, tags=["test"], prefix='/test')
 api_router.include_router(user.router, tags=["用户接口"], prefix='/user')
 api_router.include_router(project.router, tags=["项目接口"], prefix='/project')
diff --git a/api/api_v1/endpoints/data_auth.py b/api/api_v1/endpoints/data_auth.py
index 81f7c5c..ce95c05 100644
--- a/api/api_v1/endpoints/data_auth.py
+++ b/api/api_v1/endpoints/data_auth.py
@@ -104,12 +104,13 @@ async def my_event(request: Request,
         return schemas.Msg(code=0, msg='ok', data=[])
 
     event_list = []
+
+    event_show_name = await crud.event_mana.get_all_show_name(db, game)
     event_list.append({'id': 'event', 'title': '全部事件', 'category': []})
     for item in my_data_auth:
         event_list[-1]['category'].append({
             'event_name': item,
-            # TODO: add event attribute management later
-            'event_desc': item
+            'event_desc': event_show_name.get(item, item)
         })
     return schemas.Msg(code=0, msg='ok', data=event_list)
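
The data_auth change stops echoing the raw event name as its own description and instead resolves it through the display names maintained by event management, falling back to the raw name when none is configured. A minimal sketch of that fallback; the mapping contents and the pay_order event are invented for illustration:

    # Shape of what crud.event_mana.get_all_show_name returns: {event_name: show_name}.
    event_show_name = {'create_role': '创建角色', 'login': '登录'}
    my_data_auth = ['create_role', 'login', 'pay_order']

    category = [{'event_name': item, 'event_desc': event_show_name.get(item, item)}
                for item in my_data_auth]
    # 'pay_order' has no show name configured yet, so event_desc stays 'pay_order'.
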
diff --git a/api/api_v1/endpoints/query.py b/api/api_v1/endpoints/query.py
index 8ffa69f..815ad53 100644
--- a/api/api_v1/endpoints/query.py
+++ b/api/api_v1/endpoints/query.py
@@ -1,13 +1,16 @@
-import json
-
 import pandas as pd
 from fastapi import APIRouter, Depends, Request
+from motor.motor_asyncio import AsyncIOMotorDatabase
+
 import crud, schemas
+from common import *
 from api import deps
+from db import get_database
 from db.ckdb import get_ck_db, CKDrive
 from db.redisdb import get_redis_pool, RedisDrive
-from models import ToSql
+
+from models.behavior_analysis import BehaviorAnalysis
 
 router = APIRouter()
@@ -28,21 +31,14 @@ async def query_sql(
 async def event_model_sql(
         request: Request,
         game: str,
-        data_in: schemas.CkQuery,
-        ckdb: CKDrive = Depends(get_ck_db),
-        rdb: RedisDrive = Depends(get_redis_pool),
+        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
         current_user: schemas.UserDB = Depends(deps.get_current_user)
 ) -> schemas.Msg:
     """Event analysis model SQL"""
-    columns_json = await rdb.get(f'{game}_event')
-    event_columns = json.loads(columns_json)
-
-    columns_json = await rdb.get(f'{game}_event')
-    user_columns = json.loads(columns_json)
-    to_sql = ToSql(data_in.dict(), game, event_columns.keys(), user_columns.keys())
-    res = to_sql.get_sql_query_event_model()
-    return schemas.Msg(code=0, msg='ok', data=res)
+    await analysis.init()
+    data = analysis.event_model_sql()
+    return schemas.Msg(code=0, msg='ok', data=data)
 
 
 @router.post("/event_model")
@@ -52,17 +48,12 @@ async def event_model(
         request: Request,
         game: str,
         data_in: schemas.CkQuery,
         ckdb: CKDrive = Depends(get_ck_db),
         rdb: RedisDrive = Depends(get_redis_pool),
+        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
         current_user: schemas.UserDB = Depends(deps.get_current_user)
 ) -> schemas.Msg:
     """Event analysis"""
-    columns_json = await rdb.get(f'{game}_event')
-    event_columns = json.loads(columns_json)
-
-    columns_json = await rdb.get(f'{game}_event')
-    user_columns = json.loads(columns_json)
-
-    to_sql = ToSql(data_in.dict(), game, event_columns.keys(), user_columns.keys())
-    sqls = to_sql.get_sql_query_event_model()
+    await analysis.init()
+    sqls = analysis.event_model_sql()
     res = []
     for item in sqls:
         q = {
@@ -71,7 +62,7 @@ async def event_model(
             'event_name': item['event_name']
         }
         sql = item['sql']
-        groupby = item['groupby'][1:]
+        groupby = item['groupby']
         date_range = item['date_range']
         q['date_range'] = date_range
         df = await ckdb.query_dataframe(sql)
@@ -105,3 +96,80 @@ async def event_model(
         q['date_range'] = [d.strftime('%Y-%m-%d %H:%M:%S') for d in q['date_range']]
         res.append(q)
     return schemas.Msg(code=0, msg='ok', data=res)
+
+
+@router.post("/retention_model_sql")
+async def retention_model_sql(
+        request: Request,
+        game: str,
+        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
+        current_user: schemas.UserDB = Depends(deps.get_current_user)
+) -> schemas.Msg:
+    """Retention query SQL"""
+    await analysis.init()
+    data = analysis.retention_model_sql()
+    return schemas.Msg(code=0, msg='ok', data=[data])
+
+
+@router.post("/retention_model")
+async def retention_model(
+        request: Request,
+        game: str,
+        ckdb: CKDrive = Depends(get_ck_db),
+        db: AsyncIOMotorDatabase = Depends(get_database),
+        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
+        current_user: schemas.UserDB = Depends(deps.get_current_user)
+) -> schemas.Msg:
+    """Retention data model"""
+    await analysis.init()
+    res = analysis.retention_model_sql()
+    sql = res['sql']
+    date_range = res['date_range']
+    event_a, event_b = res['event_name']
+    unit_num = res['unit_num']
+    title = await crud.event_mana.get_show_name(db, game, event_a)
+    title = f'{title}用户数'
+    df = await ckdb.query_dataframe(sql)
+    # Overall (ungrouped) retention first.
+    summary_df = df.groupby(['date', 'event_name'])[['values', 'amount']].sum()
+    summary_values = {}
+    for i, d1 in enumerate(date_range):
+        a = set(summary_df.loc[(d1, event_a)]['values']) if (d1, event_a) in summary_df.index else set()
+        if not a:
+            continue
+        key = d1.strftime('%Y-%m-%d')
+        for j, d2 in enumerate(date_range[i:]):
+            if j >= unit_num:
+                break
+            b = set(summary_df.loc[(d2, event_b)]['values']) if (d2, event_b) in summary_df.index else set()
+            tmp = summary_values.setdefault(key, {})
+            tmp.setdefault('d0', len(a))
+            tmp.setdefault('p', []).append(division(len(a & b) * 100, len(a)))
+            tmp.setdefault('n', []).append(len(a & b))
+    groups = set([tuple(i) for i in df[res['groupby'][2:]].values])
+    df.set_index(res['groupby'], inplace=True)
+    df.sort_index(inplace=True)
+    values = {}
+    days = list(range((date_range[-1] - date_range[0]).days + 1))[:unit_num]
+    for i, d1 in enumerate(date_range):
+        for g in groups:
+            a = set(df.loc[(d1, event_a, *g)]['values']) if (d1, event_a, *g) in df.index else set()
+            if not a:
+                continue
+            key = ','.join((d1.strftime("%Y-%m-%d"), *g))
+            for j, d2 in enumerate(date_range[i:]):
+                if j >= unit_num:
+                    break
+                b = set(df.loc[(d2, event_b, *g)]['values']) if (d2, event_b, *g) in df.index else set()
+                tmp = values.setdefault(key, {})
+                tmp.setdefault('d0', len(a))
+                tmp.setdefault('p', []).append(division(len(a & b) * 100, len(a)))
+                tmp.setdefault('n', []).append(len(a & b))
+    data = {
+        'summary_values': summary_values,
+        'values': values,
+        'days': days,
+        'date_range': [d.strftime('%Y-%m-%d') for d in date_range][:unit_num],
+        'title': title
+    }
+    return schemas.Msg(code=0, msg='ok', data=data)
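
The retention endpoint reduces to set algebra per cohort: `a` is the set of accounts that did event_a on day d1, `b` the accounts that did event_b on a later day d2, `p` the percentage overlap, computed with the zero-safe `division` helper added in this PR. A self-contained sketch with invented account ids:

    from common.compute import division  # round(a / b, 2); returns 0 when b == 0

    a = {1, 2, 3, 4}                         # day-0 cohort for event_a (hypothetical ids)
    later_days = [{1, 2, 3}, {1, 4}, set()]  # event_b actives on d1+1, d1+2, d1+3

    d0 = len(a)                                                   # 4
    p = [division(len(a & b) * 100, len(a)) for b in later_days]  # [75.0, 50.0, 0.0]
    n = [len(a & b) for b in later_days]                          # [3, 2, 0]
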
diff --git a/api/api_v1/endpoints/test.py b/api/api_v1/endpoints/test.py
new file mode 100644
index 0000000..854d2af
--- /dev/null
+++ b/api/api_v1/endpoints/test.py
@@ -0,0 +1,29 @@
+from typing import Any
+
+from fastapi import APIRouter, Depends, Request
+
+import schemas
+from api import deps
+from db.ckdb import CKDrive, get_ck_db
+from db.redisdb import RedisDrive, get_redis_pool
+from models.behavior_analysis import BehaviorAnalysis
+
+router = APIRouter()
+
+
+@router.post("/test")
+async def test(
+        request: Request,
+        game: str,
+        ckdb: CKDrive = Depends(get_ck_db),
+        rdb: RedisDrive = Depends(get_redis_pool),
+        current_user: schemas.UserDB = Depends(deps.get_current_user),
+        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis)) -> schemas.Msg:
+    await analysis.init()
+    query = analysis.funnel_model_sql()
+    data = {
+        'game': game,
+        'analysis': analysis.game,
+        'query': query
+    }
+    return schemas.Msg(code=0, msg='ok', data=data)
diff --git a/common/__init__.py b/common/__init__.py
new file mode 100644
index 0000000..c8b272a
--- /dev/null
+++ b/common/__init__.py
@@ -0,0 +1 @@
+from .compute import *
\ No newline at end of file
diff --git a/common/compute.py b/common/compute.py
new file mode 100644
index 0000000..05a5aa6
--- /dev/null
+++ b/common/compute.py
@@ -0,0 +1,8 @@
+def division(a, b, n=2):
+    """Round a / b to n decimal places, returning 0 when b is 0."""
+    res = 0
+    try:
+        res = round(a / b, n)
+    except ZeroDivisionError:
+        pass
+    return res
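
`common.compute.division` is the zero-safe rounding helper the analysis endpoints rely on, so an empty cohort produces 0 instead of raising. Quick sanity check:

    from common.compute import division

    division(1, 3)       # 0.33
    division(5, 2, n=1)  # 2.5
    division(7, 0)       # 0 — the ZeroDivisionError is swallowed
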
diff --git a/crud/crud_event_mana.py b/crud/crud_event_mana.py
index 6f762d6..109bc01 100644
--- a/crud/crud_event_mana.py
+++ b/crud/crud_event_mana.py
@@ -15,6 +15,17 @@ class CRUDEventMap(CRUDBase):
         await self.update_one(db, {'game': game, 'event_name': data_id.show_name}, {'$set': data_id.dict()},
                               upsert=True)
 
+    async def get_show_name(self, db: AsyncIOMotorDatabase, game: str, event_name: str):
+        res = await self.find_one(db, {'game': game, 'event_name': event_name})
+        # Fall back to the raw event name when no document or show_name exists.
+        return res.get('show_name', event_name) if res else event_name
+
+    async def get_all_show_name(self, db: AsyncIOMotorDatabase, game: str):
+        cursor = self.find(db, {'game': game})
+        res = {}
+        async for item in self.to_list(cursor):
+            res[item['event_name']] = item.get('show_name', item['event_name'])
+        return res
+
     async def create_index(self, db: AsyncIOMotorDatabase):
         await db[self.coll_name].create_index(
             [('game', pymongo.DESCENDING), ('event_name', pymongo.DESCENDING)],
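
BehaviorAnalysis, added below, builds its SQLAlchemy `Table` objects on the fly from the column metadata cached in redis and compiles queries to literal SQL strings for ClickHouse. A minimal sketch of that core technique, reusing the column names and the shjy schema that appear in this PR (SQLAlchemy 1.4-style `select`, matching the new module; the query itself is illustrative):

    import sqlalchemy as sa
    from sqlalchemy import and_, func

    # Column names as json.loads(await rdb.get(f'{game}_event')).keys() would yield them.
    columns = ['#event_time', '#event_name', '#account_id']
    event_tbl = sa.Table('event', sa.MetaData(schema='shjy'),
                         *[sa.Column(c) for c in columns])

    time_col = getattr(event_tbl.c, '#event_time')
    name_col = getattr(event_tbl.c, '#event_name')

    qry = (sa.select(func.count(sa.distinct(getattr(event_tbl.c, '#account_id'))).label('values'))
           .where(and_(name_col == 'login',
                       func.addHours(time_col, 8) >= '2021-06-01 00:00:00')))

    # literal_binds inlines the parameters, yielding a plain string for the ClickHouse driver.
    print(qry.compile(compile_kwargs={"literal_binds": True}))
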
diff --git a/models/behavior_analysis.py b/models/behavior_analysis.py
new file mode 100644
index 0000000..929c231
--- /dev/null
+++ b/models/behavior_analysis.py
@@ -0,0 +1,271 @@
+from typing import Tuple
+
+import sqlalchemy as sa
+import json
+
+from fastapi import Depends
+
+import pandas as pd
+
+from sqlalchemy import func, or_, and_, not_
+
+import schemas
+from core.config import settings
+from db.redisdb import get_redis_pool, RedisDrive
+
+
+class BehaviorAnalysis:
+    def __init__(self, game: str, data_in: schemas.CkQuery, rdb: RedisDrive = Depends(get_redis_pool)):
+        self.game = game
+        self.rdb = rdb
+        self.user_tbl = None
+        self.event_tbl = None
+        self.event_view = data_in.eventView
+        self.events = data_in.events
+
+        self.zone_time: int = 0
+        self.start_date = None
+        self.end_date = None
+        self.global_filters = None
+        self.groupby = None
+        self.time_particle = None
+        self.date_range = None
+        self.unit_num = None
+
+    async def init(self):
+        await self._init_table()
+        self.zone_time = self._get_zone_time()
+        self.time_particle = self._get_time_particle_size()
+        self.start_date, self.end_date, self.date_range = self._get_date_range()
+        self.global_filters = self._get_global_filters()
+        self.groupby = self._get_group_by()
+        self.unit_num = self._get_unit_num()
+
+    def _get_time_particle_size(self):
+        return self.event_view.get('timeParticleSize') or 'P1D'
+
+    def _get_unit_num(self):
+        return self.event_view.get('unitNum')
+
+    def _get_group_by(self):
+        return [getattr(self.event_tbl.c, item['columnName']) for item in self.event_view.get('groupBy') or []]
+
+    def _get_zone_time(self):
+        return int(self.event_view.get('zone_time', 8))
+
+    def _get_date_range(self) -> Tuple[str, str, list]:
+        start_date: str = self.event_view.get('startTime')
+        end_date: str = self.event_view.get('endTime')
+        date_range = pd.date_range(start_date, end_date, freq=settings.PROPHET_TIME_GRAIN_MAP[self.time_particle],
+                                   tz='UTC').tolist()
+        return start_date, end_date, date_range
+
+    def _get_global_filters(self):
+        return self.event_view.get('filts') or []
+
+    async def _init_table(self):
+        """
+        Pull the table fields from redis and build the SQLAlchemy table objects.
+        """
+        res_json = await self.rdb.get(f'{self.game}_user')
+        columns = json.loads(res_json).keys()
+        metadata = sa.MetaData(schema=self.game)
+        self.user_tbl = sa.Table('user_view', metadata, *[sa.Column(column) for column in columns])
+
+        res_json = await self.rdb.get(f'{self.game}_event')
+        columns = json.loads(res_json).keys()
+        metadata = sa.MetaData(schema=self.game)
+        self.event_tbl = sa.Table('event', metadata, *[sa.Column(column) for column in columns])
+
+    def handler_filts(self, *ext_filters, g_f=True):
+        user_filter = []
+        event_filter = []
+        filters = (*self.global_filters, *ext_filters) if g_f else ext_filters
+        for item in filters:
+            if item['tableType'] == 'user':
+                where = user_filter
+            elif item['tableType'] == 'event':
+                where = event_filter
+            else:
+                continue
+
+            tbl = getattr(self, f'{item["tableType"]}_tbl')
+            col = getattr(tbl.c, item['columnName'])
+
+            comparator = item['comparator']
+            ftv = item['ftv']
+            if comparator == '==':
+                if len(ftv) > 1:
+                    where.append(or_(*[col == v for v in ftv]))
+                else:
+                    where.append(col == ftv[0])
+            elif comparator == '>=':
+                where.append(col >= ftv[0])
+            elif comparator == '<=':
+                where.append(col <= ftv[0])
+            elif comparator == '>':
+                where.append(col > ftv[0])
+            elif comparator == '<':
+                where.append(col < ftv[0])
+            elif comparator == 'is not null':
+                where.append(col.isnot(None))
+            elif comparator == 'is null':
+                where.append(col.is_(None))
+            elif comparator == '!=':
+                where.append(col != ftv[0])
+
+        return event_filter, user_filter
+
+    def retention_model_sql(self):
+        event_name_a = self.events[0]['eventName']
+        event_name_b = self.events[1]['eventName']
+        event_time_col = getattr(self.event_tbl.c, '#event_time')
+        event_name_col = getattr(self.event_tbl.c, '#event_name')
+        e_account_id_col = getattr(self.event_tbl.c, '#account_id')
+        u_account_id_col = getattr(self.user_tbl.c, '#account_id')
+        date_col = sa.Column('date')
+
+        selected = [func.toStartOfDay(func.addHours(event_time_col, self.zone_time)).label('date'),
+                    event_name_col.label('event_name'),
+                    *self.groupby,
+                    func.arrayDistinct(func.groupArray(e_account_id_col)).label('values'),
+                    func.length(sa.Column('values')).label('amount')
+                    ]
+        base_where = [
+            func.addHours(event_time_col, self.zone_time) >= self.start_date,
+            func.addHours(event_time_col, self.zone_time) <= self.end_date,
+            event_name_col.in_([event_name_a, event_name_b]),
+        ]
+
+        event_filter, user_filter = self.handler_filts()
+
+        groupby = [date_col, event_name_col] + self.groupby
+        orderby = [date_col]
+        if user_filter:
+            qry = sa.select(selected).select_from(
+                self.event_tbl.join(self.user_tbl, u_account_id_col == e_account_id_col)).where(
+                and_(*user_filter, *event_filter, *base_where)).group_by(*groupby).order_by(
+                *orderby).limit(10000)
+        else:
+            qry = sa.select(selected).where(and_(*base_where, *event_filter)).group_by(*groupby).order_by(
+                *orderby).limit(10000)
+        sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
+        print(sql)
+        return {'sql': sql,
+                'groupby': ['date', 'event_name'] + [i.key for i in self.groupby],
+                'date_range': self.date_range,
+                'event_name': [event_name_a, event_name_b],
+                'unit_num': self.unit_num
+                }
+
+    def event_model_sql(self):
+        sqls = []
+        event_time_col = getattr(self.event_tbl.c, '#event_time')
+        select_exprs = [
+            settings.TIME_GRAIN_EXPRESSIONS[self.time_particle](event_time_col, self.zone_time)]
+        select_exprs += self.groupby
+
+        for event in self.events:
+            event_name = event['event_name']
+            event_name_col = getattr(self.event_tbl.c, '#event_name')
+            base_where = [
+                func.addHours(event_time_col, self.zone_time) >= self.start_date,
+                func.addHours(event_time_col, self.zone_time) <= self.end_date,
+                event_name_col == event_name
+            ]
+            analysis = event['analysis']
+            event_filter, user_filter = self.handler_filts(*event['filts'])
+
+            u_account_id_col = getattr(self.user_tbl.c, '#account_id')
+            # Aggregation is per account id.
+            e_account_id_col = getattr(self.event_tbl.c, '#account_id')
+
+            # Aggregation mode.
+            if analysis == 'total_count':
+                selected = select_exprs + [func.count().label('values')]
+            elif analysis == 'touch_user_count':
+                selected = select_exprs + [func.count(sa.distinct(e_account_id_col)).label('values')]
+            elif analysis == 'touch_user_avg':
+                selected = select_exprs + [
+                    func.round((func.count() / func.count(sa.distinct(e_account_id_col))), 2).label(
+                        'values')]
+            elif analysis == 'distinct_count':
+                selected = select_exprs + [
+                    func.count(sa.distinct(getattr(self.event_tbl.c, event['event_attr_id']))).label('values')]
+            else:
+                selected = select_exprs + [
+                    func.round(getattr(func, analysis)(getattr(self.event_tbl.c, event['event_attr_id'])), 2).label(
+                        'values')]
+
+            if user_filter:
+                qry = sa.select(selected).select_from(
+                    self.event_tbl.join(self.user_tbl, u_account_id_col == e_account_id_col)).where(
+                    and_(*user_filter, *event_filter, *base_where))
+            else:
+                qry = sa.select(selected).where(and_(*event_filter, *base_where))
+
+            qry = qry.group_by(*select_exprs)
+            qry = qry.order_by(sa.Column('date'))
+            qry = qry.limit(1000)
+
+            sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
+            print(sql)
+            sqls.append({'sql': sql,
+                         'groupby': [i.key for i in self.groupby],
+                         'date_range': self.date_range,
+                         'event_name': event_name
+                         })
+
+        return sqls
+
+    def funnel_model_sql(self):
+        """
+        Target shape of the generated query:
+
+        SELECT level, count() AS values
+        FROM (SELECT `#account_id`,
+                     windowFunnel(864000)(`#event_time`, `#event_name` = 'create_role', `#event_name` = 'login') AS level
+              FROM event
+              WHERE (`#event_time` >= '2021-06-01 00:00:00')
+                AND (`#event_time` <= '2021-06-05 00:00:00')
+              GROUP BY `#account_id`)
+        GROUP BY level
+        ORDER BY level
+        """
+        windows_gap = self.event_view['windows_gap'] * 86400
+        event_time_col = getattr(self.event_tbl.c, '#event_time')
+        event_name_col = getattr(self.event_tbl.c, '#event_name')
+        e_account_id_col = getattr(self.event_tbl.c, '#account_id')
+        conds = []
+        for item in self.events:
+            event_filter, _ = self.handler_filts(*item['filts'], g_f=False)
+            conds.append(
+                and_(event_name_col == item['eventName'], *event_filter)
+            )
+        # ClickHouse's windowFunnel(gap)(...) takes a parameter list that SQLAlchemy
+        # cannot express, so compile a placeholder function name and splice the
+        # parameter list into the string afterwards.
+        subq = sa.select(func.windowFunnel_windows_gap__(event_time_col, *conds).label('level'))
+
+        g_event_filter, _ = self.handler_filts()
+        where = [
+            func.addHours(event_time_col, self.zone_time) >= self.start_date,
+            func.addHours(event_time_col, self.zone_time) <= self.end_date,
+            *g_event_filter
+        ]
+        subq = subq.where(and_(*where)).group_by(e_account_id_col)
+        subq = subq.subquery()
+
+        qry = sa.select(sa.Column('level'), func.count().label('values')).select_from(subq).group_by(
+            sa.Column('level')).order_by(sa.Column('level'))
+        sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
+        sql = sql.replace('_windows_gap__', f'({windows_gap})')
+        print(sql)
+        return sql
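
The placeholder trick in funnel_model_sql is the one spot where the compiled SQL is post-processed as text, so it is worth isolating. A standalone sketch; the gap value and condition are invented:

    import sqlalchemy as sa
    from sqlalchemy import func

    time_col = sa.Column('#event_time')
    name_col = sa.Column('#event_name')

    # Compile with a fake function name, then splice the parameter list in afterwards.
    expr = func.windowFunnel_windows_gap__(time_col, name_col == 'create_role').label('level')
    sql = str(sa.select(expr).compile(compile_kwargs={"literal_binds": True}))
    sql = sql.replace('_windows_gap__', '(864000)')
    # -> SELECT windowFunnel(864000)("#event_time", "#event_name" = 'create_role') AS level

Note the double underscore in the replaced token: matching only `_windows_gap_` would leave a stray underscore behind (`windowFunnel(864000)_`), which is why the splice above, and the one in funnel_model_sql, consumes both trailing underscores.
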
diff --git a/sql/留存.sql b/sql/留存.sql
new file mode 100644
index 0000000..610fe4f
--- /dev/null
+++ b/sql/留存.sql
@@ -0,0 +1,13 @@
+SELECT toStartOfDay(addHours(shjy.event."#event_time", 8)) AS date,
+       shjy.event."#event_name"                            AS event_name,
+       `app_name`,
+       arrayDistinct(groupArray(shjy.event."#account_id")) AS values,
+       length(values)                                      AS num
+
+FROM shjy.event
+WHERE addHours(shjy.event."#event_time", 8) >= '2021-05-10 00:00:00'
+  AND addHours(shjy.event."#event_time", 8) < '2021-06-08 23:59:59'
+  AND shjy.event."#event_name" IN ('create_role', 'login')
+GROUP BY toStartOfDay(addHours(shjy.event."#event_time", 8)), shjy.event."#event_name", `app_name`
+ORDER BY date
+LIMIT 1000
\ No newline at end of file
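
For reference, the request body the new endpoints parse via schemas.CkQuery, as inferred from the field accesses in BehaviorAnalysis — the key names follow the code, the values are illustrative only, and event-analysis events use event_name/analysis/event_attr_id instead of eventName:

    # Hypothetical payload for the retention_model / funnel endpoints.
    payload = {
        'eventView': {
            'startTime': '2021-05-10 00:00:00',
            'endTime': '2021-06-08 23:59:59',
            'timeParticleSize': 'P1D',  # optional; defaults to 'P1D'
            'zone_time': 8,             # hours added to #event_time; defaults to 8
            'unitNum': 7,               # number of retention periods to report
            'windows_gap': 1,           # funnel window, in days
            'groupBy': [{'columnName': 'app_name'}],
            'filts': [],                # items: {tableType, columnName, comparator, ftv}
        },
        'events': [
            {'eventName': 'create_role', 'filts': []},
            {'eventName': 'login', 'filts': []},
        ],
    }
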