diff --git a/api/api_v1/endpoints/data_auth.py b/api/api_v1/endpoints/data_auth.py index e6fe954..81f7c5c 100644 --- a/api/api_v1/endpoints/data_auth.py +++ b/api/api_v1/endpoints/data_auth.py @@ -87,14 +87,9 @@ async def my_event(request: Request, ) -> schemas.Msg: """获取自己的事件权限""" - all_filed = await rdb.get(f'{game}_event') - all_filed = json.loads(all_filed) - data_attr = await crud.data_attr.find_many(db, game=game, cat='event') - data_attr = {item['name']: item for item in data_attr} - data_auth_id = await crud.authority.get_data_auth_id(db, game, request.user.username) - + my_data_auth = [] if data_auth_id: # 所有数据权限 if data_auth_id == '*': @@ -107,92 +102,141 @@ async def my_event(request: Request, if not my_data_auth: return schemas.Msg(code=0, msg='ok', data=[]) - key_prefix = f'{game}_event_' - event_dict = await rdb.smembers_keys(*my_data_auth, prefix=key_prefix) - else: - event_dict = {} - event = [] + event_list = [] + event_list.append({'id': 'event', 'title': '全部事件', 'category': []}) + for item in my_data_auth: + event_list[-1]['category'].append({ + 'event_name': item, + # todo 后面添加事件属性管理 + 'event_desc': item + }) - group_by = [{ - 'id': item, - 'data_type': settings.CK_TYPE_DICT.get(all_filed.get(item)), - 'title': data_attr.get(item, {}).get('show_name') or item, - 'category': settings.CK_FILTER.get(settings.CK_TYPE_DICT.get(all_filed.get(item))) or [] - } for item in all_filed] + return schemas.Msg(code=0, msg='ok', data=event_list) - deserialization = { - 'event_attr': {}, - 'event_filter': {} - } - for k, v in event_dict.items(): - event_attr = [{ - 'id': '*', - 'data_type': None, - 'analysis': 'total_count', - 'title': '总次数', - 'category': [] - }, - { - 'id': '*', - 'analysis': 'touch_user_count', - 'data_type': None, - 'title': '触发用户数', - 'category': [] - }, - { - 'id': '*', - 'analysis': 'touch_user_avg', - 'data_type': None, - 'title': '人均次数', - 'category': [] - } - ] - event_filter = [] - for item in sorted(v): - data_type = settings.CK_TYPE_DICT.get(all_filed.get(item)) - title = data_attr.get(item, {}).get('show_name') or item - event_attr.append( - { - 'id': item, - 'data_type': data_type, - 'title': title, - 'category': settings.CK_OPERATOR.get(data_type) or [] - } - ) +@router.post('/load_prop_quotas') +async def load_prop_quotas(request: Request, + game: str, + data_in: schemas.LoadProQuotas, + db: AsyncIOMotorDatabase = Depends(get_database), + rdb: RedisDrive = Depends(get_redis_pool), + ck: CKDrive = Depends(get_ck_db), + current_user: schemas.UserDB = Depends(deps.get_current_user) + ) -> schemas.Msg: + """事件属性 聚合条件""" + key = f'{game}_event_{data_in.event_name}' + event_prop_set = await rdb.smembers(key) + event_prop_list = sorted(event_prop_set) - event_filter.append({ - 'id': item, - 'data_type': data_type, - 'title': title, - 'category': settings.CK_FILTER.get(data_type) or [] - }) - - deserialization['event_attr'][k] = [{'id': 'event', 'title': '事件属性', 'category': event_attr}] - deserialization['event_filter'][k] = [{'id': 'event', 'title': '事件属性', 'category': event_filter}] - - event.append({ - 'event_name': k, - 'event_attr': [{'id': 'event', 'title': '事件属性', 'category': event_attr}], - 'event_filter': [{'id': 'event', 'title': '事件属性', 'category': event_filter}], + all_filed = await rdb.get(f'{game}_event') + all_filed = json.loads(all_filed) + data_attr = await crud.data_attr.find_many(db, game=game, cat='event') + data_attr = {item['name']: item for item in data_attr} + event_props = [] + for item in event_prop_list: + data_type = settings.CK_TYPE_DICT.get(all_filed.get(item)) + title = data_attr.get(item, {}).get('show_name') or item + event_prop = { + 'id': item, + 'data_type': data_type, + 'title': title, + 'category': settings.CK_OPERATOR.get(data_type) or [] } - ) + event_props.append(event_prop) + + staid_quots = [ + { + "id": "*", + "data_type": None, + "analysis": "total_count", + "title": "总次数", + "category": [] + }, + { + "id": "*", + "analysis": "touch_user_count", + "data_type": None, + "title": "触发用户数", + "category": [] + }, + { + "id": "*", + "analysis": "touch_user_avg", + "data_type": None, + "title": "人均次数", + "category": [] + }, + ] res = { - 'operator': settings.CK_OPERATOR, - 'filter': settings.CK_FILTER, - 'deserialization': deserialization, + 'props': event_props, + 'staid_quots': staid_quots - 'analysis': [{'id': 'event', - 'title': '默认分组', - 'category': event - }], - 'group_by': [{ - 'id': 'event', - 'title': '默认分组', - 'category': group_by - }] } return schemas.Msg(code=0, msg='ok', data=res) + + +@router.post('/load_filter_props') +async def load_filter_props(request: Request, + game: str, + data_in: schemas.LoadProQuotas, + db: AsyncIOMotorDatabase = Depends(get_database), + rdb: RedisDrive = Depends(get_redis_pool), + ck: CKDrive = Depends(get_ck_db), + current_user: schemas.UserDB = Depends(deps.get_current_user) + ) -> schemas.Msg: + """事件属性 过滤条件""" + key = f'{game}_event_{data_in.event_name}' + event_prop_set = await rdb.smembers(key) + event_prop_list = sorted(event_prop_set) + + key = f'{game}_user' + user_prop_set = await rdb.get(key) + user_prop_list = sorted(event_prop_set) + + all_filed = await rdb.get(f'{game}_event') + all_filed = json.loads(all_filed) + data_attr = await crud.data_attr.find_many(db, game=game, cat='event') + data_attr = {item['name']: item for item in data_attr} + event_props = [] + for item in event_prop_list: + data_type = settings.CK_TYPE_DICT.get(all_filed.get(item)) + title = data_attr.get(item, {}).get('show_name') or item + event_prop = { + 'id': item, + 'data_type': data_type, + 'title': title, + 'category': settings.CK_FILTER.get(data_type) or [] + } + event_props.append(event_prop) + + data_attr = await crud.data_attr.find_many(db, game=game, cat='user') + data_attr = {item['name']: item for item in data_attr} + user_props = [] + for item in user_prop_list: + data_type = settings.CK_TYPE_DICT.get(all_filed.get(item)) + title = data_attr.get(item, {}).get('show_name') or item + user_prop = { + 'id': item, + 'data_type': data_type, + 'title': title, + 'category': settings.CK_FILTER.get(data_type) or [] + } + user_props.append(user_prop) + + res = [ + { + 'title': '事件属性', + 'id': 'event', + 'category': event_props + }, + { + 'title': '用户属性', + 'id': 'user', + 'category': user_props + } + ] + + return schemas.Msg(code=0, msg='ok', data=res) diff --git a/api/api_v1/endpoints/data_mana.py b/api/api_v1/endpoints/data_mana.py index 91c52ec..7f08d66 100644 --- a/api/api_v1/endpoints/data_mana.py +++ b/api/api_v1/endpoints/data_mana.py @@ -9,6 +9,7 @@ import crud, schemas from api import deps from core.config import settings from db import get_database +from db.ckdb import CKDrive, get_ck_db from db.redisdb import get_redis_pool router = APIRouter() @@ -16,7 +17,7 @@ router = APIRouter() __all__ = 'router', -@router.get("/list") +@router.get("/event_attr_list") async def read_data_attr( request: Request, game: str, @@ -25,7 +26,7 @@ async def read_data_attr( rdb: Redis = Depends(get_redis_pool), current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> schemas.Msg: - """属性列表""" + """事件属性列表""" data = await rdb.get(f'{game}_{cat}') data = json.loads(data) res = [] @@ -46,7 +47,7 @@ async def read_data_attr( return schemas.Msg(code=0, msg='ok', data=res) -@router.post("/edit") +@router.post("/event_attr_edit") async def edit_data_attr( request: Request, game: str, @@ -55,6 +56,43 @@ async def edit_data_attr( rdb: Redis = Depends(get_redis_pool), current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> schemas.Msg: - """编辑属性""" + """编辑事件属性""" + await crud.data_attr.edit_data_attr(db, game, data_in) + return schemas.Msg(code=0, msg='ok', data=data_in) + + +@router.get("/event_list") +async def event_list( + request: Request, + game: str, + cat: str, + db: AsyncIOMotorDatabase = Depends(get_database), + ckdb: CKDrive = Depends(get_ck_db), + rdb: Redis = Depends(get_redis_pool), + current_user: schemas.UserDB = Depends(deps.get_current_user) +) -> schemas.Msg: + """事件列表""" + + event_list = await ckdb.distinct(game, 'event', '#event_name') + pass + # make_event_list = await crud.event_map + res = [] + for name in event_list(): + res.append( + + ) + return schemas.Msg(code=0, msg='ok', data=res) + + +@router.post("/event_edit") +async def event_edit( + request: Request, + game: str, + data_in: schemas.DataAttrEdit, + db: AsyncIOMotorDatabase = Depends(get_database), + rdb: Redis = Depends(get_redis_pool), + current_user: schemas.UserDB = Depends(deps.get_current_user) +) -> schemas.Msg: + """编辑事件""" await crud.data_attr.edit_data_attr(db, game, data_in) return schemas.Msg(code=0, msg='ok', data=data_in) diff --git a/api/api_v1/endpoints/query.py b/api/api_v1/endpoints/query.py index 2edb391..048d712 100644 --- a/api/api_v1/endpoints/query.py +++ b/api/api_v1/endpoints/query.py @@ -36,8 +36,11 @@ async def event_model_sql( """ 事件分析模型 sql""" columns_json = await rdb.get(f'{game}_event') - columns = json.loads(columns_json) - to_sql = ToSql(data_in.dict(), game, 'event', columns.keys()) + event_columns = json.loads(columns_json) + + columns_json = await rdb.get(f'{game}_event') + user_columns = json.loads(columns_json) + to_sql = ToSql(data_in.dict(), game, event_columns.keys(), user_columns.keys()) res = to_sql.get_sql_query_event_model() return schemas.Msg(code=0, msg='ok', data=res) @@ -53,8 +56,12 @@ async def event_model( ) -> schemas.Msg: """ 事件分析""" columns_json = await rdb.get(f'{game}_event') - columns = json.loads(columns_json) - to_sql = ToSql(data_in.dict(), game, 'event', columns.keys()) + event_columns = json.loads(columns_json) + + columns_json = await rdb.get(f'{game}_event') + user_columns = json.loads(columns_json) + + to_sql = ToSql(data_in.dict(), game, event_columns.keys(), user_columns.keys()) sqls = to_sql.get_sql_query_event_model() res = [] for item in sqls: diff --git a/core/config.py b/core/config.py index 32ae1ec..01c2ad2 100644 --- a/core/config.py +++ b/core/config.py @@ -2,6 +2,7 @@ import sys from typing import Any, Dict, List, Optional, Union from pydantic import AnyHttpUrl, BaseSettings, EmailStr, HttpUrl, validator +from sqlalchemy import func class Settings(BaseSettings): @@ -235,6 +236,31 @@ class Settings(BaseSettings): ], } + PROPHET_TIME_GRAIN_MAP = { + "PT1S": "S", + "PT1M": "min", + "PT5M": "5min", + "PT10M": "10min", + "PT15M": "15min", + "PT0.5H": "30min", + "PT1H": "H", + "P1D": "D", + "P1W": "W", + "P1M": "M", + } + + TIME_GRAIN_EXPRESSIONS = { + 'PT1S': lambda col, zone: func.toStartOfSecond(func.addHours(col, zone)).label('date'), + 'PT1M': lambda col, zone: func.toStartOfMinute(func.addHours(col, zone)).label('date'), + 'PT5M': lambda col, zone: func.toStartOfFiveMinute(func.addHours(col, zone)).label('date'), + 'PT10M': lambda col, zone: func.toStartOfTenMinutes(func.addHours(col, zone)).label('date'), + 'PT15M': lambda col, zone: func.toStartOfFifteenMinutes(func.addHours(col, zone)).label('date'), + 'PT1H': lambda col, zone: func.toStartOfHour(func.addHours(col, zone)).label('date'), + 'P1D': lambda col, zone: func.toStartOfDay(func.addHours(col, zone)).label('date'), + 'P1W': lambda col, zone: func.toMonday(func.addHours(col, zone)).label('date'), + 'P1M': lambda col, zone: func.toStartOfMonth(func.addHours(col, zone)).label('date'), + } + class Config: case_sensitive = True diff --git a/crud/__init__.py b/crud/__init__.py index 31dc7e2..35fa7ca 100644 --- a/crud/__init__.py +++ b/crud/__init__.py @@ -7,3 +7,4 @@ from .crud_report import report from .crud_authority import authority from .crud_data_auth import data_auth from .crud_data_attr import data_attr +from .crud_api_log import api_log diff --git a/crud/crud_api_log.py b/crud/crud_api_log.py new file mode 100644 index 0000000..cc4bfa4 --- /dev/null +++ b/crud/crud_api_log.py @@ -0,0 +1,14 @@ +from motor.motor_asyncio import AsyncIOMotorDatabase + +import schemas +from crud.base import CRUDBase + +__all__ = 'api_log', + + +class CRUDApiLog(CRUDBase): + async def insert_log(self, db: AsyncIOMotorDatabase, data_in: schemas.ApiLogInsert): + await db[self.coll_name].insert_one(data_in.dict()) + + +api_log = CRUDApiLog('api_log') diff --git a/crud/crud_event_map.py b/crud/crud_event_map.py new file mode 100644 index 0000000..df08df6 --- /dev/null +++ b/crud/crud_event_map.py @@ -0,0 +1,24 @@ +import pymongo +from bson import ObjectId +from motor.motor_asyncio import AsyncIOMotorDatabase + +import schemas +from crud.base import CRUDBase +from schemas import * + +__all__ = 'event_map', + + +class CRUDEventMap(CRUDBase): + + async def edit_data_attr(self, db: AsyncIOMotorDatabase, game: str, data_id: schemas.EventMapEdit): + await self.update_one(db, {'game': game, 'cat': data_id.cat, 'name': data_id.name}, {'$set': data_id.dict()}, + upsert=True) + + async def create_index(self, db: AsyncIOMotorDatabase): + await db[self.coll_name].create_index( + [('game', pymongo.DESCENDING), ('cat', pymongo.DESCENDING), ('name', pymongo.DESCENDING)], + unique=True) + + +event_map = CRUDEventMap('event_map') diff --git a/main.py b/main.py index a0a5b37..6d90c60 100644 --- a/main.py +++ b/main.py @@ -3,9 +3,15 @@ import time import uvicorn from fastapi import FastAPI, Request +from fastapi.exceptions import RequestValidationError from starlette.middleware.cors import CORSMiddleware from starlette.authentication import AuthenticationBackend, AuthenticationError, AuthCredentials, BaseUser, SimpleUser from starlette.middleware.authentication import AuthenticationMiddleware +from starlette.responses import Response + +import crud +import schemas +from db.redisdb import get_redis_pool from middleware import CasbinMiddleware from db import connect_to_mongo, close_mongo_connection, get_database @@ -74,12 +80,27 @@ app.add_middleware( ) +# @app.exception_handler(RequestValidationError) +# async def validation_exception_handler(request, exc): +# return Response(schemas.Msg(code='-4', msg='服务器错误', data=str(exc)), status_code=200) + + +# @app.exception_handler(Exception) +# async def http_exception_handler(request, exc): +# return Response(schemas.Msg(code='-3', msg='服务器错误'), status_code=200) + + @app.middleware("http") async def add_process_time_header(request: Request, call_next): - start_time = int(time.time()*1000) + start_time = int(time.time() * 1000) response = await call_next(request) - process_time = int(time.time()*1000) - start_time + process_time = int(time.time() * 1000) - start_time response.headers["X-Process-Time"] = str(process_time) + await crud.api_log.insert_log(get_database(), schemas.ApiLogInsert( + api=str(request.url), + ms=process_time, + user_id=request.user.id + )) return response diff --git a/models/to_sql.py b/models/to_sql.py index eca1542..1722cfd 100644 --- a/models/to_sql.py +++ b/models/to_sql.py @@ -4,42 +4,21 @@ from sqlalchemy.sql import func from sqlalchemy import create_engine, column, and_, desc, table, or_ import pandas as pd -PROPHET_TIME_GRAIN_MAP = { - "PT1S": "S", - "PT1M": "min", - "PT5M": "5min", - "PT10M": "10min", - "PT15M": "15min", - "PT0.5H": "30min", - "PT1H": "H", - "P1D": "D", - "P1W": "W", - "P1M": "M", -} - -TIME_GRAIN_EXPRESSIONS = { - 'PT1S': lambda col, zone: func.toStartOfSecond(func.addHours(col, zone)).label('date'), - 'PT1M': lambda col, zone: func.toStartOfMinute(func.addHours(col, zone)).label('date'), - 'PT5M': lambda col, zone: func.toStartOfFiveMinute(func.addHours(col, zone)).label('date'), - 'PT10M': lambda col, zone: func.toStartOfTenMinutes(func.addHours(col, zone)).label('date'), - 'PT15M': lambda col, zone: func.toStartOfFifteenMinutes(func.addHours(col, zone)).label('date'), - # 'PT0.5H': lambda col, zone: func.toStartOfMinute(func.addHours(col, zone)).label('date'), - 'PT1H': lambda col, zone: func.toStartOfHour(func.addHours(col, zone)).label('date'), - 'P1D': lambda col, zone: func.toStartOfDay(func.addHours(col, zone)).label('date'), - 'P1W': lambda col, zone: func.toMonday(func.addHours(col, zone)).label('date'), - 'P1M': lambda col, zone: func.toStartOfMonth(func.addHours(col, zone)).label('date'), -} +from core.config import settings class ToSql: - def __init__(self, data: dict, db_name: str, table_name: str, columns: List[str]): + def __init__(self, data: dict, db_name: str, event_columns: List[str], user_columns: List[str]): self.db_name = db_name self.engine = create_engine('clickhouse://') - self.columns = self.gen_columns(columns) self.event_view = data.get('eventView') self.events = data.get('events') - self.table = sa.table(table_name, *self.columns.values(), schema=self.db_name) + self.event_columns = self.gen_columns(event_columns) + self.user_columns = self.gen_columns(user_columns) + + self.event_table = sa.table('event', *self.event_columns.values(), schema=self.db_name) + self.user_table = sa.table('user_view', *self.user_columns.values(), schema=self.db_name) def gen_columns(self, columns): return {col: column(col) for col in columns} @@ -64,28 +43,34 @@ class ToSql: return self.event_view.get('timeParticleSize') or 'P1D' def get_sql_query_event_model(self): - """只是查event表""" + is_join_user = False sqls = [] select_exprs = self.get_group_by() - select_exprs = [self.columns.get(item) for item in select_exprs] + select_exprs = [self.event_columns.get(item) for item in select_exprs] time_particle_size = self.get_time_particle_size() start_data, end_data = self.get_date_range() time_zone = self.get_zone_time() - select_exprs.insert(0, TIME_GRAIN_EXPRESSIONS[time_particle_size](self.columns['#event_time'], time_zone)) - date_range = pd.date_range(start_data, end_data, freq=PROPHET_TIME_GRAIN_MAP[time_particle_size]).tolist() + select_exprs.insert(0, settings.TIME_GRAIN_EXPRESSIONS[time_particle_size](self.event_columns['#event_time'], + time_zone)) + date_range = pd.date_range(start_data, end_data, freq=settings.PROPHET_TIME_GRAIN_MAP[time_particle_size], + tz='UTC').tolist() groupby = [item.name for item in select_exprs] for event in self.events: event_name = event['event_name'] where = [ - func.addHours(self.columns['#event_time'], time_zone) >= start_data, - func.addHours(self.columns['#event_time'], time_zone) <= end_data, - self.columns['#event_name'] == event_name + func.addHours(self.event_columns['#event_time'], time_zone) >= start_data, + func.addHours(self.event_columns['#event_time'], time_zone) <= end_data, + self.event_columns['#event_name'] == event_name ] analysis = event['analysis'] filters = event['filters'] + self.get_global_filters() for item in filters: - col = self.columns.get(item['column_id']) + if item['table_type'] == 'user': + is_join_user = True + + col = getattr(self, f'{item["table_type"]}_columns').get(item['column_id']) + comparator = item['comparator_id'] ftv = item['ftv'] if comparator == '==': @@ -113,19 +98,22 @@ class ToSql: if analysis == 'total_count': qry = sa.select(select_exprs + [func.count().label('values')]) elif analysis == 'touch_user_count': - qry = sa.select(select_exprs + [func.count(sa.distinct(self.columns['#account_id'])).label('values')]) + qry = sa.select( + select_exprs + [func.count(sa.distinct(self.event_columns['#account_id'])).label('values')]) elif analysis == 'touch_user_avg': qry = sa.select(select_exprs + [ - func.round((func.count() / func.count(sa.distinct(self.columns['#account_id']))), 2).label( + func.round((func.count() / func.count(sa.distinct(self.event_columns['#account_id']))), 2).label( 'values')]) elif analysis == 'distinct_count': qry = sa.select( - select_exprs + [func.count(sa.distinct(self.columns[event['event_attr_id']])).label('values')]) + select_exprs + [ + func.count(sa.distinct(self.event_columns[event['event_attr_id']])).label('values')]) else: qry = sa.select( select_exprs + [ - func.round(getattr(func, analysis)(self.columns[event['event_attr_id']]), 2).label('values')]) + func.round(getattr(func, analysis)(self.event_columns[event['event_attr_id']]), 2).label( + 'values')]) qry = qry.where(and_(*where)) @@ -134,7 +122,10 @@ class ToSql: qry = qry.order_by(column('date')) qry = qry.limit(1000) - qry = qry.select_from(self.table) + qry = qry.select_from(self.event_table) + if is_join_user: + qry = qry.join(self.user_table, + and_(self.event_columns.get('#account_id') == self.user_columns.get('#account_id'))) sql = str(qry.compile(self.engine, compile_kwargs={"literal_binds": True})) print(sql) sqls.append({'sql': sql, diff --git a/schemas/__init__.py b/schemas/__init__.py index 8b14fea..11ba086 100644 --- a/schemas/__init__.py +++ b/schemas/__init__.py @@ -10,3 +10,4 @@ from .table_struct import * from .data_auth import * from .data_attr import * from .sql import * +from .api_log import * diff --git a/schemas/api_log.py b/schemas/api_log.py new file mode 100644 index 0000000..ae08eb4 --- /dev/null +++ b/schemas/api_log.py @@ -0,0 +1,9 @@ +from typing import Any + +from pydantic import BaseModel + + +class ApiLogInsert(BaseModel): + api: str + ms: int + user_id: str diff --git a/schemas/data_auth.py b/schemas/data_auth.py index 608a13c..dfd1abd 100644 --- a/schemas/data_auth.py +++ b/schemas/data_auth.py @@ -17,3 +17,8 @@ class DataAuthEdit(BaseModel): class DataAuthSet(BaseModel): username: str data_auth_id: str + + +class LoadProQuotas(BaseModel): + event_name: str +