From c3eff67546e06d83d39f3e1408bae8e75a987ccb Mon Sep 17 00:00:00 2001 From: wuaho Date: Fri, 9 Jul 2021 16:55:45 +0800 Subject: [PATCH] update --- api/api_v1/endpoints/project.py | 4 +- api/api_v1/endpoints/query.py | 64 ++++++++++++++++++ crud/crud_authority.py | 6 +- models/behavior_analysis.py | 111 +++++++++++++++++++++++++++++++- schemas/sql.py | 5 +- sql/end_chain.sql | 44 +++++++++++++ sql/start_chain.sql | 43 +++++++++++++ sql/新增用户.sql | 4 ++ sql/活跃用户.sql | 4 ++ sql/留存2.sql | 17 +++++ sql/留存3.sql | 7 ++ sql/留存4.sql | 5 ++ 12 files changed, 308 insertions(+), 6 deletions(-) create mode 100644 sql/end_chain.sql create mode 100644 sql/start_chain.sql create mode 100644 sql/新增用户.sql create mode 100644 sql/活跃用户.sql create mode 100644 sql/留存2.sql create mode 100644 sql/留存3.sql create mode 100644 sql/留存4.sql diff --git a/api/api_v1/endpoints/project.py b/api/api_v1/endpoints/project.py index 049af7d..fe389e6 100644 --- a/api/api_v1/endpoints/project.py +++ b/api/api_v1/endpoints/project.py @@ -53,7 +53,9 @@ async def create( await crud.authority.create(db, 'p', role_name, role_dom, '*', '*') # 添加角色 await crud.authority.create(db, 'g', settings.SUPERUSER_NAME, role_name, '*', '*', role_name='系统项目管理员', game='*') - + # 添加数据权限 + await crud.authority.set_data_auth(db, schemas.DataAuthSet(username=request.user.username, data_auth_id='*'), + game=data_in.game,v1=role_name) return schemas.Msg(code=0, msg='创建成功') diff --git a/api/api_v1/endpoints/query.py b/api/api_v1/endpoints/query.py index 27a6974..fcdaf52 100644 --- a/api/api_v1/endpoints/query.py +++ b/api/api_v1/endpoints/query.py @@ -1,3 +1,5 @@ +from collections import defaultdict + import pandas as pd import numpy as np from fastapi import APIRouter, Depends, Request @@ -60,6 +62,7 @@ async def event_model( q = { 'groups': [], 'values': [], + 'sum': [], 'event_name': item['event_name'] } sql = item['sql'] @@ -85,6 +88,7 @@ async def event_model( df_group = pd.concat([df_group, pd.DataFrame(concat_data, columns=df_group.columns)]) df_group.sort_values('date', inplace=True) q['values'].append(df_group['values'].to_list()) + q['sum'].append(int(df['values'].sum())) else: # 无分组 @@ -94,6 +98,7 @@ async def event_model( df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)]) df.sort_values('date', inplace=True) q['values'].append(df['values'].to_list()) + q['sum'].append(int(df['values'].sum())) q['date_range'] = [d.strftime('%Y-%m-%d %H:%M:%S') for d in q['date_range']] res.append(q) return schemas.Msg(code=0, msg='ok', data=res) @@ -402,3 +407,62 @@ async def scatter_model( total = int(tmp_df['values'].sum()) resp['list'][key.strftime('%Y-%m-%d')] = {'n': total, 'total': total, 'p': 100} return schemas.Msg(code=0, msg='ok', data=resp) + + +@router.post("/trace_model_sql") +async def trace_model_sql( + request: Request, + game: str, + analysis: BehaviorAnalysis = Depends(BehaviorAnalysis), + current_user: schemas.UserDB = Depends(deps.get_current_user) +) -> schemas.Msg: + """路径分析 sql""" + await analysis.init() + data = analysis.trace_model_sql() + return schemas.Msg(code=0, msg='ok', data=[data]) + + +@router.post("/trace_model") +async def trace_model_sql( + request: Request, + game: str, + ckdb: CKDrive = Depends(get_ck_db), + analysis: BehaviorAnalysis = Depends(BehaviorAnalysis), + current_user: schemas.UserDB = Depends(deps.get_current_user) +) -> schemas.Msg: + """路径分析""" + await analysis.init() + res = analysis.trace_model_sql() + sql = res['sql'] + df = await ckdb.query_dataframe(sql) + chain_dict = defaultdict(dict) + nodes = {'流失'} + for event_names, count in zip(df['event_chain'], df['values']): + chain_len = len(event_names) + for i, event_name in enumerate(event_names): + if i >= 10: + continue + next_event = event_names[i + 1] if i < chain_len - 1 else '流失' + key = (f'{event_name}{i}', f'{next_event}{i + 1}') + nodes.update(key) + chain_dict[i][key] = chain_dict[i].setdefault(key, 0) + count + + links = [] + for _, items in chain_dict.items(): + for keys, val in items.items(): + links.append({ + "source": keys[0], + "target": keys[1], + "value": val + }) + # nodes = set() + # for item in links: + # nodes.update(( + # item['source'], + # item['target']) + # ) + data = { + 'nodes': [{'name': item} for item in nodes], + 'links': links + } + return schemas.Msg(code=0, msg='ok', data=data) diff --git a/crud/crud_authority.py b/crud/crud_authority.py index 69d48c9..e828b27 100644 --- a/crud/crud_authority.py +++ b/crud/crud_authority.py @@ -56,11 +56,13 @@ class CRUDAuthority(CRUDBase): res[item['title']].append(item) return res - async def set_data_auth(self, db: AsyncIOMotorDatabase, data_in, game): + async def set_data_auth(self, db: AsyncIOMotorDatabase, data_in, game, **kwargs): v0 = data_in.username v2 = game data_auth_id = data_in.data_auth_id - await self.update_one(db, {'ptype': 'g', 'v0': v0, 'v2': v2}, {'$set': {'data_auth_id': data_auth_id}}, + set_data = {'data_auth_id': data_auth_id} + set_data.update(kwargs) + await self.update_one(db, {'ptype': 'g', 'v0': v0, 'v2': v2}, {'$set': set_data}, upsert=True) async def get_data_auth(self, db, username, game): diff --git a/models/behavior_analysis.py b/models/behavior_analysis.py index 01a2442..dc3a204 100644 --- a/models/behavior_analysis.py +++ b/models/behavior_analysis.py @@ -48,7 +48,7 @@ class BehaviorAnalysis: return self.event_view.get('unitNum') def _get_group_by(self): - return [getattr(self.event_tbl.c, item['columnName']) for item in self.event_view.get('groupBy')] + return [getattr(self.event_tbl.c, item['columnName']) for item in self.event_view.get('groupBy', [])] def _get_zone_time(self): return int(self.event_view.get('zone_time', 8)) @@ -335,3 +335,112 @@ ORDER BY level 'quota_interval_arr': quota_interval_arr, 'groupby': [i.key for i in self.groupby] } + + def trace_model_sql(self): + session_interval = self.event_view.get('session_interval') + session_type = self.event_view.get('session_type') + session_type_map = { + 'minute': 60, + 'second': 1, + 'hour': 3600 + + } + interval_ts = session_interval * session_type_map.get(session_type, 60) + event_names = self.events.get('event_names') + source_event = self.events.get('source_event', {}).get('eventName') + source_type = self.events.get('source_event', {}).get('source_type') + + sql_a = f"""with + '{source_event}' as start_event, + {tuple(event_names)} as evnet_all, + '{self.start_date}' as start_data, + '{self.end_date}' as end_data +select event_chain, + count() as values +from (with + toUInt32(minIf(`#event_time`, `#event_name` = start_event)) AS start_event_ts, + arraySort( + x -> + x.1, + arrayFilter( + x -> x.1 >= start_event_ts, + groupArray((toUInt32(`#event_time`), `#event_name`)) + ) + ) AS sorted_events, + arrayEnumerate(sorted_events) AS event_idxs, + arrayFilter( + (x, y, z) -> z.1 >= start_event_ts and ((z.2 = start_event and y > {interval_ts}) or y > {interval_ts}) , + event_idxs, + arrayDifference(sorted_events.1), + sorted_events + ) AS gap_idxs, + arrayMap(x -> x, gap_idxs) AS gap_idxs_, + arrayMap(x -> if(has(gap_idxs_, x), 1, 0), event_idxs) AS gap_masks, + arraySplit((x, y) -> y, sorted_events, gap_masks) AS split_events + select `#account_id`, + arrayJoin(split_events) AS event_chain_, + arrayMap(x -> + x.2, event_chain_) AS event_chain, + has(event_chain, start_event) AS has_midway_hit + + from (select `#event_time`, `#event_name`, `#account_id` + from {self.game}.event + where addHours(`#event_time`, {self.zone_time}) >= start_data + and addHours(`#event_time`, {self.zone_time}) <= end_data + and `#event_name` in evnet_all) + group by `#account_id` + HAVING has_midway_hit = 1 + ) +where arrayElement(event_chain, 1) = start_event +GROUP BY event_chain +ORDER BY values desc +""" + sql_b = f"""with + '{source_event}' as end_event, + {tuple(event_names)} as evnet_all, + '{self.start_date}' as start_data, + '{self.end_date}' as end_data +select event_chain, + count() as values +from (with + toUInt32(maxIf(`#event_time`, `#event_name` = end_event)) AS end_event_ts, + arraySort( + x -> + x.1, + arrayFilter( + x -> x.1 <= end_event_ts, + groupArray((toUInt32(`#event_time`), `#event_name`)) + ) + ) AS sorted_events, + arrayEnumerate(sorted_events) AS event_idxs, + arrayFilter( + (x, y, z) -> z.1 <= end_event_ts and (z.2 = end_event and y>{interval_ts}) OR y > {interval_ts}, + event_idxs, + arrayDifference(sorted_events.1), + sorted_events + ) AS gap_idxs, + arrayMap(x -> x+1, gap_idxs) AS gap_idxs_, + arrayMap(x -> if(has(gap_idxs_, x), 1,0), event_idxs) AS gap_masks, + arraySplit((x, y) -> y, sorted_events, gap_masks) AS split_events + select `#account_id`, + arrayJoin(split_events) AS event_chain_, + arrayMap(x -> + x.2, event_chain_) AS event_chain, + has(event_chain, end_event) AS has_midway_hit + from (select `#event_time`, `#event_name`, `#account_id` + from {self.game}.event + where addHours(`#event_time`, {self.zone_time}) >= start_data + and addHours(`#event_time`, {self.zone_time}) <= end_data + and `#event_name` in evnet_all) + group by `#account_id` + HAVING has_midway_hit = 1 + ) +where arrayElement(event_chain, -1) = end_event +GROUP BY event_chain +ORDER BY values desc""" + + sql = sql_a if source_type == 'initial_event' else sql_b + print(sql) + return { + 'sql': sql, + } diff --git a/schemas/sql.py b/schemas/sql.py index 6c816e7..91f3628 100644 --- a/schemas/sql.py +++ b/schemas/sql.py @@ -1,6 +1,7 @@ -from typing import List +from typing import List, Union from pydantic import BaseModel +from typing import Optional class Sql(BaseModel): @@ -9,4 +10,4 @@ class Sql(BaseModel): class CkQuery(BaseModel): eventView: dict - events: List[dict] + events: Union[List[dict],dict] diff --git a/sql/end_chain.sql b/sql/end_chain.sql new file mode 100644 index 0000000..f66421c --- /dev/null +++ b/sql/end_chain.sql @@ -0,0 +1,44 @@ +with + 'pvp_end' as end_event, + ('pvp_end', 'login') as evnet_all, + '2021-07-01 00:00:00' as start_data, + '2021-07-07 23:59:59' as end_data +select event_chain, + count() as values +from (with + toUInt32(maxIf(`#event_time`, `#event_name` = end_event)) AS end_event_ts, + arraySort( + x -> + x.1, + arrayFilter( + x -> x.1 <= end_event_ts, + groupArray((toUInt32(`#event_time`), `#event_name`)) + ) + ) AS sorted_events, + arrayEnumerate(sorted_events) AS event_idxs, + arrayFilter( + (x, y, z) -> z.1 <= end_event_ts and (z.2 = end_event and y>1800) OR y > 1800, + event_idxs, + arrayDifference(sorted_events.1), + sorted_events + ) AS gap_idxs, + arrayMap(x -> x+1, gap_idxs) AS gap_idxs_, + arrayMap(x -> if(has(gap_idxs_, x), 1,0), event_idxs) AS gap_masks, + arraySplit((x, y) -> y, sorted_events, gap_masks) AS split_events + select `#account_id`, + arrayJoin(split_events) AS event_chain_, + arrayMap(x -> + x.2, event_chain_) AS event_chain, + has(event_chain, end_event) AS has_midway_hit + from (select `#event_time`, `#event_name`, `#account_id` + from shjy.event + where addHours(`#event_time`, 8) >= start_data + and addHours(`#event_time`, 8) <= end_data + and `#event_name` in evnet_all) + group by `#account_id` + HAVING has_midway_hit = 1 + ) +where arrayElement(event_chain, -1) = end_event +GROUP BY event_chain +ORDER BY values desc + diff --git a/sql/start_chain.sql b/sql/start_chain.sql new file mode 100644 index 0000000..b69fa2e --- /dev/null +++ b/sql/start_chain.sql @@ -0,0 +1,43 @@ +with + 'create_role' as start_event, + ('create_role', 'pvp_end') as evnet_all, + '2021-06-30 00:00:00' as start_data, + '2021-07-06 23:59:59' as end_data +select event_chain, + count() as values +from (with + toUInt32(minIf(`#event_time`, `#event_name` = start_event)) AS start_event_ts, + arraySort( + x -> + x.1, + arrayFilter( + x -> x.1 >= start_event_ts, + groupArray((toUInt32(`#event_time`), `#event_name`)) + ) + ) AS sorted_events, + arrayEnumerate(sorted_events) AS event_idxs, + arrayFilter( + (x, y, z) -> z.1 >= start_event_ts and (z.2 = start_event OR y > 1800), + event_idxs, + arrayDifference(sorted_events.1), + sorted_events + ) AS gap_idxs, + arrayMap(x -> x, gap_idxs) AS gap_idxs_, + arrayMap(x -> if(has(gap_idxs_, x), 1, 0), event_idxs) AS gap_masks, + arraySplit((x, y) -> y, sorted_events, gap_masks) AS split_events + select `#account_id`, + arrayJoin(split_events) AS event_chain_, + arrayMap(x -> + x.2, event_chain_) AS event_chain, + has(event_chain, start_event) AS has_midway_hit + from (select `#event_time`, `#event_name`, `#account_id` + from shjy.event + where addHours(`#event_time`, 8) >= start_data + and addHours(`#event_time`, 8) <= end_data + and `#event_name` in evnet_all) + group by `#account_id` + HAVING has_midway_hit = 1 + ) +where arrayElement(event_chain, 1) = start_event +GROUP BY event_chain +ORDER BY values desc diff --git a/sql/新增用户.sql b/sql/新增用户.sql new file mode 100644 index 0000000..dd535e6 --- /dev/null +++ b/sql/新增用户.sql @@ -0,0 +1,4 @@ +select toDate(addHours(`#event_time`,8)), groupArray(`binduid`) as account,length(account) as num +from zhengba.event +where role_idx = 1 +group by toDate(addHours(`#event_time`,8)) \ No newline at end of file diff --git a/sql/活跃用户.sql b/sql/活跃用户.sql new file mode 100644 index 0000000..09d63b9 --- /dev/null +++ b/sql/活跃用户.sql @@ -0,0 +1,4 @@ +select toDate(addHours(`#event_time`, 8)), + uniqCombined(binduid), groupArray(distinct binduid) as bids +from zhengba.event +group by toDate(addHours(`#event_time`, 8)) \ No newline at end of file diff --git a/sql/留存2.sql b/sql/留存2.sql new file mode 100644 index 0000000..eba649e --- /dev/null +++ b/sql/留存2.sql @@ -0,0 +1,17 @@ +select date, account, login_date, +arrayMap((x,y)->x,account,login_date) + from (with groupArray(`binduid`) as account, + toDate(addHours(`#event_time`, 8)) as date + select date, + account +-- length(account) as num + from zhengba.event + where role_idx = 1 + group by date) as tb_a + left join (select arrayJoin(groupArray(date)) as dd, + groupArray((date, login_account)) as login_date + from (with groupArray(distinct binduid) as login_account, + toDate(addHours(`#event_time`, 8)) as date + select date, login_account + from zhengba.event + group by date)) as tb_b on tb_a.date = tb_b.dd \ No newline at end of file diff --git a/sql/留存3.sql b/sql/留存3.sql new file mode 100644 index 0000000..7fff510 --- /dev/null +++ b/sql/留存3.sql @@ -0,0 +1,7 @@ +select arrayJoin(groupArray(date)) as dd, + groupArray((date, login_account)) +from (with groupArray(distinct binduid) as login_account, + toDate(addHours(`#event_time`, 8)) as date + select date, login_account + from zhengba.event + group by date) \ No newline at end of file diff --git a/sql/留存4.sql b/sql/留存4.sql new file mode 100644 index 0000000..24ef821 --- /dev/null +++ b/sql/留存4.sql @@ -0,0 +1,5 @@ +select groupArray((date,login_account)) from (with groupArray(distinct binduid) as login_account, + toDate(addHours(`#event_time`, 8)) as date + select date, login_account +from zhengba.event +group by date) \ No newline at end of file