From 17644de3288ea8a1991464445128795d30f8a551 Mon Sep 17 00:00:00 2001
From: wuaho
Date: Tue, 25 May 2021 14:22:01 +0800
Subject: [PATCH] 1

---
 api/api_v1/endpoints/data_auth.py | 46 ++++++++++++---
 api/api_v1/endpoints/query.py     | 37 ++++++++----
 core/config.py                    | 63 ++++++++++----------
 models/__init__.py                |  1 +
 models/to_sql.py                  | 97 +++++++++++++++++++++++++++++++
 schemas/report.py                 |  1 +
 schemas/sql.py                    |  3 +-
 7 files changed, 199 insertions(+), 49 deletions(-)
 create mode 100644 models/__init__.py
 create mode 100644 models/to_sql.py

diff --git a/api/api_v1/endpoints/data_auth.py b/api/api_v1/endpoints/data_auth.py
index 8846985..73ae46d 100644
--- a/api/api_v1/endpoints/data_auth.py
+++ b/api/api_v1/endpoints/data_auth.py
@@ -100,11 +100,36 @@ async def my_event(request: Request,
     key_prefix = f'{game}_event_'
     event_dict = await rdb.smembers_keys(*my_data_auth['data'], prefix=key_prefix)
-    res = []
+    event = []
+
+    group_by = [{
+        'id': item,
+        'data_type': settings.CK_TYPE_DICT.get(all_filed.get(item)),
+        'title': data_attr.get(item, {}).get('show_name') or item,
+    } for item in all_filed]
 
     for k, v in event_dict.items():
-        event_attr = []
+        event_attr = [{
+            'id': 'total_count',
+            'data_type': None,
+            'title': '总次数',
+            'category': []
+        },
+            {
+                'id': 'touch_user_count',
+                'data_type': None,
+                'title': '触发用户数',
+                'category': []
+            },
+            {
+                'id': 'touch_user_avg',
+                'data_type': None,
+                'title': '人均次数',
+                'category': []
+            }
+
+        ]
         event_filter = []
-        for item in v:
+        for item in sorted(v):
             data_type = settings.CK_TYPE_DICT.get(all_filed.get(item))
             title = data_attr.get(item, {}).get('show_name') or item
             event_attr.append(
@@ -121,14 +146,19 @@ async def my_event(request: Request,
                     'title': title,
                     'category': settings.CK_FILTER.get(data_type) or []
                 })
-        res.append({
+        event.append({
             'event_name': k,
             'event_attr': [{'id': 'event', 'title': '事件属性', 'category': event_attr}],
             'event_filter': [{'id': 'event', 'title': '事件属性', 'category': event_filter}],
         }
         )
-    return schemas.Msg(code=0, msg='ok', data=[{'id': 'event',
-                                                'title': '默认分组',
-                                                'category': res
-                                                }])
+    res = {
+        'analysis': [{'id': 'event',
+                      'title': '默认分组',
+                      'category': event
+                      }],
+        'group_by': group_by
+    }
+
+    return schemas.Msg(code=0, msg='ok', data=res)
diff --git a/api/api_v1/endpoints/query.py b/api/api_v1/endpoints/query.py
index c7248e1..a1dc266 100644
--- a/api/api_v1/endpoints/query.py
+++ b/api/api_v1/endpoints/query.py
@@ -1,9 +1,13 @@
+import json
+
 import pandas as pd
 from fastapi import APIRouter, Depends, Request
 
 import crud, schemas
 from api import deps
 from db.ckdb import get_ck_db, CKDrive
+from db.redisdb import get_redis_pool, RedisDrive
+from models import ToSql
 
 
 router = APIRouter()
@@ -20,27 +24,40 @@ async def query_sql(
     return schemas.Msg(code=0, msg='ok', data=data)
 
 
-@router.post("/query")
-async def query(
+@router.post("/event_model_sql")
+async def event_model_sql(
         request: Request,
+        game: str,
         data_in: schemas.CkQuery,
         ckdb: CKDrive = Depends(get_ck_db),
+        rdb: RedisDrive = Depends(get_redis_pool),
         current_user: schemas.UserDB = Depends(deps.get_current_user)
 ) -> schemas.Msg:
-    """ json解析 sql 查询"""
-    # data, columns = await ckdb.execute(data_in.sql, with_column_types=True, columnar=True)
-    # df = pd.DataFrame({col[0]: d for d, col in zip(data, columns)})
-    return schemas.Msg(code=0, msg='ok', data=data_in)
+    """ 事件分析模型 sql"""
+
+    columns_json = await rdb.get(f'{game}_event')
+    columns = json.loads(columns_json)
+    to_sql = ToSql(data_in.dict(), game, 'event', columns.keys())
+    res = to_sql.get_sql_query_event_model()
+    return schemas.Msg(code=0, msg='ok', data=res)
 
 
 @router.post("/event_model")
 async def event_model(
         request: Request,
+        game: str,
         data_in: schemas.CkQuery,
         ckdb: CKDrive = Depends(get_ck_db),
+        rdb: RedisDrive = Depends(get_redis_pool),
         current_user: schemas.UserDB = Depends(deps.get_current_user)
 ) -> schemas.Msg:
-    """ json解析 sql 查询"""
-    # data, columns = await ckdb.execute(data_in.sql, with_column_types=True, columnar=True)
-    # df = pd.DataFrame({col[0]: d for d, col in zip(data, columns)})
-    return schemas.Msg(code=0, msg='ok', data=data_in)
+    """ 事件分析"""
+    columns_json = await rdb.get(f'{game}_event')
+    columns = json.loads(columns_json)
+    to_sql = ToSql(data_in.dict(), game, 'event', columns.keys())
+    sqls = to_sql.get_sql_query_event_model()
+    res = []
+    for sql in sqls:
+        data = await ckdb.execute(sql)
+        res.append(data)
+    return schemas.Msg(code=0, msg='ok', data=res)
diff --git a/core/config.py b/core/config.py
index 9233485..85c1b91 100644
--- a/core/config.py
+++ b/core/config.py
@@ -91,13 +91,13 @@ class Settings(BaseSettings):
             'id': 'min',
             'title': '最小值'
         }, {
-            'id': 'distinct',
+            'id': 'distinct_count',
             'title': '去重数'
         },
         ],
         'string': [{
-            'id': 'distinct',
+            'id': 'distinct_count',
             'title': '去重数'
         }],
         'float': [{
@@ -116,22 +116,22 @@ class Settings(BaseSettings):
             'id': 'min',
             'title': '最小值'
         }, {
-            'id': 'distinct',
+            'id': 'distinct_count',
             'title': '去重数'
         },
         ],
         'array': [
             {
-                'id': 'distinct1',
+                'id': 'list_distinct',
                 'title': '列表去重数'
            },
             {
-                'id': 'distinct2',
+                'id': 'set_distinct',
                 'title': '集合去重数'
             },
             {
-                'id': 'distinct3',
+                'id': 'ele_distinct',
                 'title': '元素去重数'
             },
         ]
     }
@@ -139,7 +139,7 @@ class Settings(BaseSettings):
     CK_FILTER = {
         'int': [{
-            'id': '=',
+            'id': '==',
             'title': '等于'
         }, {
             'id': '!=',
             'title': '不等于'
         }, {
             'id': '<',
             'title': '小于'
         }, {
             'id': '>',
             'title': '大于'
         },
         ],
         'string': [{
-            'id': '=',
+            'id': '==',
             'title': '等于'
         }, {
             'id': '!=',
             'title': '不等于'
         }, {
-            'id': 'in',
-            'title': '包括'
+            'id': 'like',
+            'title': '包含'
         }, {
-            'id': 'not in',
-            'title': '不包括'
+            'id': 'not like',
+            'title': '不包含'
         }, {
-            'id': 'not null',
+            'id': 'is not null',
             'title': '有值'
         }, {
             'id': 'is null',
             'title': '无值'
-        }, {
-            'id': 'regex',
-            'title': '正则匹配'
-        }, {
-            'id': 'not regex',
-            'title': '正则不匹配'
-        },
+        },
+        # {
+        #     'id': 'regex',
+        #     'title': '正则匹配'
+        # }, {
+        #     'id': 'not regex',
+        #     'title': '正则不匹配'
+        # },
         ],
         'float': [{
-            'id': '=',
+            'id': '==',
             'title': '等于'
         }, {
             'id': '!=',
             'title': '不等于'
         }, {
             'id': '<',
             'title': '小于'
         }, {
             'id': '>',
             'title': '大于'
         }, {
-            'id': 'not null',
+            'id': 'is not null',
             'title': '有值'
         }, {
             'id': 'is null',
             'title': '无值'
-        }, {
-            'id': 'range',
-            'title': '区间'
-        }, ],
+        },
+        # {
+        #     'id': 'range',
+        #     'title': '区间'
+        # },
+        ],
         'datetime': [
             {
-                'id': '=',
+                'id': '==',
                 'title': '绝对时间'
             },
             {
-                'id': '=',
+                'id': '==',
                 'title': '相对当前日期'
             },
             {
-                'id': '=',
+                'id': '==',
                 'title': '相对事件发生时刻'
             },
             {
-                'id': '=',
+                'id': 'is not null',
                 'title': '有值'
             },
             {
-                'id': '=',
+                'id': 'is null',
                 'title': '无值'
             },
         ],
diff --git a/models/__init__.py b/models/__init__.py
new file mode 100644
index 0000000..ae73002
--- /dev/null
+++ b/models/__init__.py
@@ -0,0 +1 @@
+from .to_sql import ToSql
diff --git a/models/to_sql.py b/models/to_sql.py
new file mode 100644
index 0000000..2922d86
--- /dev/null
+++ b/models/to_sql.py
@@ -0,0 +1,97 @@
+from typing import List, Tuple
+import sqlalchemy as sa
+from sqlalchemy.sql import func
+from sqlalchemy import create_engine, column, and_, desc, table, or_
+
+
+class ToSql:
+    def __init__(self, data: dict, db_name: str, table_name: str, columns: List[str]):
+        self.db_name = db_name
+        self.engine = create_engine('clickhouse://')
+        self.columns = self.gen_columns(columns)
+        self.event_view = data.get('eventView')
+        self.events = data.get('events')
+
+        self.table = sa.table(table_name, *self.columns.values(), schema=self.db_name)
+
+    def gen_columns(self, columns):
+        return {col: column(col) for col in columns}
+
+    def get_date_range(self) -> Tuple[str, str]:
+        start_data: str = self.event_view.get('startTime')
+        end_data: str = self.event_view.get('endTime')
+        return start_data, end_data
+
+    def get_global_filters(self):
+        return self.event_view.get('filters') or []
+
+    def get_group_by(self):
+        # return self.event_view.get('groupBy') or []
+        return [item['columnName'] for item in self.event_view.get('groupBy')]
+
+    def get_time_particle_size(self):
+        return self.event_view.get('timeParticleSize') or 'day'
+
+    def get_sql_query_event_model(self):
+        """只是查event表"""
+        sqls = []
+        select_exprs = self.get_group_by()
+        select_exprs = [self.columns.get(item) for item in select_exprs]
+        time_particle_size = self.get_time_particle_size()
+        if time_particle_size == 'day':
+            select_exprs.append(func.toYYYYMMDD(self.columns['#event_time']).label('date'))
+
+        start_data, end_data = self.get_date_range()
+
+        for event in self.events:
+            event_name = event['event_name']
+            where = [
+                self.columns['#event_time'] >= start_data,
+                self.columns['#event_time'] <= end_data,
+                self.columns['#event_name'] == event_name
+            ]
+            analysis = event['analysis']
+            filters = event['filters'] + self.get_global_filters()
+            for item in filters:
+                col = self.columns.get(item['column_id'])
+                comparator = item['comparator_id']
+                ftv = item['ftv']
+                if comparator == '==':
+                    if len(ftv) > 1:
+                        where.append(or_(*[col == v for v in ftv]))
+                    else:
+                        where.append(col == ftv[0])
+                elif comparator == '>=':
+                    where.append(col >= ftv[0])
+                elif comparator == '<=':
+                    where.append(col <= ftv[0])
+                elif comparator == '>':
+                    where.append(col > ftv[0])
+                elif comparator == '<':
+                    where.append(col < ftv[0])
+                elif comparator == '!=':
+                    where.append(col != ftv[0])
+
+            if analysis == 'total_count':
+                qry = sa.select(select_exprs + [func.count()])
+            elif analysis == 'touch_user_count':
+                qry = sa.select(select_exprs + [func.count(sa.distinct(self.columns['#account_id']))])
+            elif analysis == 'touch_user_avg':
+                qry = sa.select(select_exprs + [func.count() / func.count(sa.distinct(self.columns['#account_id']))])
+
+            elif analysis == 'distinct_count':
+                qry = sa.select(select_exprs + [func.count(sa.distinct(self.columns[event['event_attr_id']]))])
+            else:
+                qry = sa.select(select_exprs + [getattr(func, analysis)(self.columns[event['event_attr_id']])])
+
+            qry = qry.where(and_(*where))
+
+            qry = qry.group_by(*select_exprs)
+
+            if time_particle_size == 'day':
+                qry = qry.order_by(column('date'))
+
+            qry = qry.select_from(self.table)
+            sqls.append(str(qry.compile(self.engine, compile_kwargs={"literal_binds": True})))
+
+        return sqls
diff --git a/schemas/report.py b/schemas/report.py
index 2e431e2..47cc59e 100644
--- a/schemas/report.py
+++ b/schemas/report.py
@@ -19,6 +19,7 @@ class ReportCreate(ReportBase):
     name: str
     project_id: str
     query: Json
+    sql: str
 
 
 class ReportDelete(DBBase):
diff --git a/schemas/sql.py b/schemas/sql.py
index 4c617da..6c816e7 100644
--- a/schemas/sql.py
+++ b/schemas/sql.py
@@ -8,4 +8,5 @@ class Sql(BaseModel):
 
 
 class CkQuery(BaseModel):
-    report_id: List[str]
+    eventView: dict
+    events: List[dict]
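
A quick local smoke test for the new ToSql class, outside the FastAPI endpoints. The payload mirrors the keys that get_sql_query_event_model() actually reads (eventView / events); every concrete value in it (demo_game, login, pay, os, lv, money, the dates) is invented for illustration, and running the snippet assumes the project root is on PYTHONPATH and a ClickHouse SQLAlchemy dialect such as clickhouse-sqlalchemy is installed so that create_engine('clickhouse://') can resolve.

from models import ToSql

payload = {
    'eventView': {
        'startTime': '2021-05-01 00:00:00',
        'endTime': '2021-05-25 00:00:00',
        'filters': [],                      # global filters, merged into every event's own filters
        'groupBy': [{'columnName': 'os'}],  # becomes a plain column in SELECT / GROUP BY
        'timeParticleSize': 'day',          # adds toYYYYMMDD(#event_time) AS date and ORDER BY date
    },
    'events': [
        {   # total_count -> count(*) per group
            'event_name': 'login',
            'analysis': 'total_count',
            'filters': [{'column_id': 'lv', 'comparator_id': '>=', 'ftv': [10]}],
        },
        {   # any other analysis falls through to getattr(func, analysis), e.g. sum(money)
            'event_name': 'pay',
            'analysis': 'sum',
            'event_attr_id': 'money',
            'filters': [],
        },
    ],
}

# The column list normally comes from redis ({game}_event); hard-coded here for the test.
columns = ['#event_time', '#event_name', '#account_id', 'os', 'lv', 'money']

to_sql = ToSql(payload, db_name='demo_game', table_name='event', columns=columns)
for sql in to_sql.get_sql_query_event_model():
    print(sql, end='\n\n')  # one SELECT statement per entry in payload['events']

# Roughly, the first statement should come out looking like:
#   SELECT os, toYYYYMMDD("#event_time") AS date, count(*)
#   FROM demo_game.event
#   WHERE "#event_time" >= '2021-05-01 00:00:00' AND "#event_time" <= '2021-05-25 00:00:00'
#     AND "#event_name" = 'login' AND lv >= 10
#   GROUP BY os, toYYYYMMDD("#event_time") ORDER BY date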