获取事件模型拆分,api日志

This commit is contained in:
wuaho 2021-06-01 09:49:03 +08:00
parent fa9cb62683
commit 8f565aa4b2
12 changed files with 315 additions and 134 deletions

View File

@ -87,14 +87,9 @@ async def my_event(request: Request,
) -> schemas.Msg:
"""获取自己的事件权限"""
all_filed = await rdb.get(f'{game}_event')
all_filed = json.loads(all_filed)
data_attr = await crud.data_attr.find_many(db, game=game, cat='event')
data_attr = {item['name']: item for item in data_attr}
data_auth_id = await crud.authority.get_data_auth_id(db, game, request.user.username)
my_data_auth = []
if data_auth_id:
# 所有数据权限
if data_auth_id == '*':
@ -107,92 +102,141 @@ async def my_event(request: Request,
if not my_data_auth:
return schemas.Msg(code=0, msg='ok', data=[])
key_prefix = f'{game}_event_'
event_dict = await rdb.smembers_keys(*my_data_auth, prefix=key_prefix)
else:
event_dict = {}
event = []
event_list = []
event_list.append({'id': 'event', 'title': '全部事件', 'category': []})
for item in my_data_auth:
event_list[-1]['category'].append({
'event_name': item,
# todo 后面添加事件属性管理
'event_desc': item
})
group_by = [{
'id': item,
'data_type': settings.CK_TYPE_DICT.get(all_filed.get(item)),
'title': data_attr.get(item, {}).get('show_name') or item,
'category': settings.CK_FILTER.get(settings.CK_TYPE_DICT.get(all_filed.get(item))) or []
} for item in all_filed]
return schemas.Msg(code=0, msg='ok', data=event_list)
deserialization = {
'event_attr': {},
'event_filter': {}
}
for k, v in event_dict.items():
event_attr = [{
'id': '*',
'data_type': None,
'analysis': 'total_count',
'title': '总次数',
'category': []
},
{
'id': '*',
'analysis': 'touch_user_count',
'data_type': None,
'title': '触发用户数',
'category': []
},
{
'id': '*',
'analysis': 'touch_user_avg',
'data_type': None,
'title': '人均次数',
'category': []
}
]
event_filter = []
for item in sorted(v):
@router.post('/load_prop_quotas')
async def load_prop_quotas(request: Request,
game: str,
data_in: schemas.LoadProQuotas,
db: AsyncIOMotorDatabase = Depends(get_database),
rdb: RedisDrive = Depends(get_redis_pool),
ck: CKDrive = Depends(get_ck_db),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""事件属性 聚合条件"""
key = f'{game}_event_{data_in.event_name}'
event_prop_set = await rdb.smembers(key)
event_prop_list = sorted(event_prop_set)
all_filed = await rdb.get(f'{game}_event')
all_filed = json.loads(all_filed)
data_attr = await crud.data_attr.find_many(db, game=game, cat='event')
data_attr = {item['name']: item for item in data_attr}
event_props = []
for item in event_prop_list:
data_type = settings.CK_TYPE_DICT.get(all_filed.get(item))
title = data_attr.get(item, {}).get('show_name') or item
event_attr.append(
{
event_prop = {
'id': item,
'data_type': data_type,
'title': title,
'category': settings.CK_OPERATOR.get(data_type) or []
}
)
event_props.append(event_prop)
event_filter.append({
staid_quots = [
{
"id": "*",
"data_type": None,
"analysis": "total_count",
"title": "总次数",
"category": []
},
{
"id": "*",
"analysis": "touch_user_count",
"data_type": None,
"title": "触发用户数",
"category": []
},
{
"id": "*",
"analysis": "touch_user_avg",
"data_type": None,
"title": "人均次数",
"category": []
},
]
res = {
'props': event_props,
'staid_quots': staid_quots
}
return schemas.Msg(code=0, msg='ok', data=res)
@router.post('/load_filter_props')
async def load_filter_props(request: Request,
game: str,
data_in: schemas.LoadProQuotas,
db: AsyncIOMotorDatabase = Depends(get_database),
rdb: RedisDrive = Depends(get_redis_pool),
ck: CKDrive = Depends(get_ck_db),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""事件属性 过滤条件"""
key = f'{game}_event_{data_in.event_name}'
event_prop_set = await rdb.smembers(key)
event_prop_list = sorted(event_prop_set)
key = f'{game}_user'
user_prop_set = await rdb.get(key)
user_prop_list = sorted(event_prop_set)
all_filed = await rdb.get(f'{game}_event')
all_filed = json.loads(all_filed)
data_attr = await crud.data_attr.find_many(db, game=game, cat='event')
data_attr = {item['name']: item for item in data_attr}
event_props = []
for item in event_prop_list:
data_type = settings.CK_TYPE_DICT.get(all_filed.get(item))
title = data_attr.get(item, {}).get('show_name') or item
event_prop = {
'id': item,
'data_type': data_type,
'title': title,
'category': settings.CK_FILTER.get(data_type) or []
})
deserialization['event_attr'][k] = [{'id': 'event', 'title': '事件属性', 'category': event_attr}]
deserialization['event_filter'][k] = [{'id': 'event', 'title': '事件属性', 'category': event_filter}]
event.append({
'event_name': k,
'event_attr': [{'id': 'event', 'title': '事件属性', 'category': event_attr}],
'event_filter': [{'id': 'event', 'title': '事件属性', 'category': event_filter}],
}
)
event_props.append(event_prop)
res = {
'operator': settings.CK_OPERATOR,
'filter': settings.CK_FILTER,
'deserialization': deserialization,
data_attr = await crud.data_attr.find_many(db, game=game, cat='user')
data_attr = {item['name']: item for item in data_attr}
user_props = []
for item in user_prop_list:
data_type = settings.CK_TYPE_DICT.get(all_filed.get(item))
title = data_attr.get(item, {}).get('show_name') or item
user_prop = {
'id': item,
'data_type': data_type,
'title': title,
'category': settings.CK_FILTER.get(data_type) or []
}
user_props.append(user_prop)
'analysis': [{'id': 'event',
'title': '默认分组',
'category': event
}],
'group_by': [{
res = [
{
'title': '事件属性',
'id': 'event',
'title': '默认分组',
'category': group_by
}]
'category': event_props
},
{
'title': '用户属性',
'id': 'user',
'category': user_props
}
]
return schemas.Msg(code=0, msg='ok', data=res)

View File

@ -9,6 +9,7 @@ import crud, schemas
from api import deps
from core.config import settings
from db import get_database
from db.ckdb import CKDrive, get_ck_db
from db.redisdb import get_redis_pool
router = APIRouter()
@ -16,7 +17,7 @@ router = APIRouter()
__all__ = 'router',
@router.get("/list")
@router.get("/event_attr_list")
async def read_data_attr(
request: Request,
game: str,
@ -25,7 +26,7 @@ async def read_data_attr(
rdb: Redis = Depends(get_redis_pool),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""属性列表"""
"""事件属性列表"""
data = await rdb.get(f'{game}_{cat}')
data = json.loads(data)
res = []
@ -46,7 +47,7 @@ async def read_data_attr(
return schemas.Msg(code=0, msg='ok', data=res)
@router.post("/edit")
@router.post("/event_attr_edit")
async def edit_data_attr(
request: Request,
game: str,
@ -55,6 +56,43 @@ async def edit_data_attr(
rdb: Redis = Depends(get_redis_pool),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""编辑属性"""
"""编辑事件属性"""
await crud.data_attr.edit_data_attr(db, game, data_in)
return schemas.Msg(code=0, msg='ok', data=data_in)
@router.get("/event_list")
async def event_list(
request: Request,
game: str,
cat: str,
db: AsyncIOMotorDatabase = Depends(get_database),
ckdb: CKDrive = Depends(get_ck_db),
rdb: Redis = Depends(get_redis_pool),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""事件列表"""
event_list = await ckdb.distinct(game, 'event', '#event_name')
pass
# make_event_list = await crud.event_map
res = []
for name in event_list():
res.append(
)
return schemas.Msg(code=0, msg='ok', data=res)
@router.post("/event_edit")
async def event_edit(
request: Request,
game: str,
data_in: schemas.DataAttrEdit,
db: AsyncIOMotorDatabase = Depends(get_database),
rdb: Redis = Depends(get_redis_pool),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""编辑事件"""
await crud.data_attr.edit_data_attr(db, game, data_in)
return schemas.Msg(code=0, msg='ok', data=data_in)

View File

@ -36,8 +36,11 @@ async def event_model_sql(
""" 事件分析模型 sql"""
columns_json = await rdb.get(f'{game}_event')
columns = json.loads(columns_json)
to_sql = ToSql(data_in.dict(), game, 'event', columns.keys())
event_columns = json.loads(columns_json)
columns_json = await rdb.get(f'{game}_event')
user_columns = json.loads(columns_json)
to_sql = ToSql(data_in.dict(), game, event_columns.keys(), user_columns.keys())
res = to_sql.get_sql_query_event_model()
return schemas.Msg(code=0, msg='ok', data=res)
@ -53,8 +56,12 @@ async def event_model(
) -> schemas.Msg:
""" 事件分析"""
columns_json = await rdb.get(f'{game}_event')
columns = json.loads(columns_json)
to_sql = ToSql(data_in.dict(), game, 'event', columns.keys())
event_columns = json.loads(columns_json)
columns_json = await rdb.get(f'{game}_event')
user_columns = json.loads(columns_json)
to_sql = ToSql(data_in.dict(), game, event_columns.keys(), user_columns.keys())
sqls = to_sql.get_sql_query_event_model()
res = []
for item in sqls:

View File

@ -2,6 +2,7 @@ import sys
from typing import Any, Dict, List, Optional, Union
from pydantic import AnyHttpUrl, BaseSettings, EmailStr, HttpUrl, validator
from sqlalchemy import func
class Settings(BaseSettings):
@ -235,6 +236,31 @@ class Settings(BaseSettings):
],
}
PROPHET_TIME_GRAIN_MAP = {
"PT1S": "S",
"PT1M": "min",
"PT5M": "5min",
"PT10M": "10min",
"PT15M": "15min",
"PT0.5H": "30min",
"PT1H": "H",
"P1D": "D",
"P1W": "W",
"P1M": "M",
}
TIME_GRAIN_EXPRESSIONS = {
'PT1S': lambda col, zone: func.toStartOfSecond(func.addHours(col, zone)).label('date'),
'PT1M': lambda col, zone: func.toStartOfMinute(func.addHours(col, zone)).label('date'),
'PT5M': lambda col, zone: func.toStartOfFiveMinute(func.addHours(col, zone)).label('date'),
'PT10M': lambda col, zone: func.toStartOfTenMinutes(func.addHours(col, zone)).label('date'),
'PT15M': lambda col, zone: func.toStartOfFifteenMinutes(func.addHours(col, zone)).label('date'),
'PT1H': lambda col, zone: func.toStartOfHour(func.addHours(col, zone)).label('date'),
'P1D': lambda col, zone: func.toStartOfDay(func.addHours(col, zone)).label('date'),
'P1W': lambda col, zone: func.toMonday(func.addHours(col, zone)).label('date'),
'P1M': lambda col, zone: func.toStartOfMonth(func.addHours(col, zone)).label('date'),
}
class Config:
case_sensitive = True

View File

@ -7,3 +7,4 @@ from .crud_report import report
from .crud_authority import authority
from .crud_data_auth import data_auth
from .crud_data_attr import data_attr
from .crud_api_log import api_log

14
crud/crud_api_log.py Normal file
View File

@ -0,0 +1,14 @@
from motor.motor_asyncio import AsyncIOMotorDatabase
import schemas
from crud.base import CRUDBase
__all__ = 'api_log',
class CRUDApiLog(CRUDBase):
async def insert_log(self, db: AsyncIOMotorDatabase, data_in: schemas.ApiLogInsert):
await db[self.coll_name].insert_one(data_in.dict())
api_log = CRUDApiLog('api_log')

24
crud/crud_event_map.py Normal file
View File

@ -0,0 +1,24 @@
import pymongo
from bson import ObjectId
from motor.motor_asyncio import AsyncIOMotorDatabase
import schemas
from crud.base import CRUDBase
from schemas import *
__all__ = 'event_map',
class CRUDEventMap(CRUDBase):
async def edit_data_attr(self, db: AsyncIOMotorDatabase, game: str, data_id: schemas.EventMapEdit):
await self.update_one(db, {'game': game, 'cat': data_id.cat, 'name': data_id.name}, {'$set': data_id.dict()},
upsert=True)
async def create_index(self, db: AsyncIOMotorDatabase):
await db[self.coll_name].create_index(
[('game', pymongo.DESCENDING), ('cat', pymongo.DESCENDING), ('name', pymongo.DESCENDING)],
unique=True)
event_map = CRUDEventMap('event_map')

25
main.py
View File

@ -3,9 +3,15 @@ import time
import uvicorn
from fastapi import FastAPI, Request
from fastapi.exceptions import RequestValidationError
from starlette.middleware.cors import CORSMiddleware
from starlette.authentication import AuthenticationBackend, AuthenticationError, AuthCredentials, BaseUser, SimpleUser
from starlette.middleware.authentication import AuthenticationMiddleware
from starlette.responses import Response
import crud
import schemas
from db.redisdb import get_redis_pool
from middleware import CasbinMiddleware
from db import connect_to_mongo, close_mongo_connection, get_database
@ -74,12 +80,27 @@ app.add_middleware(
)
# @app.exception_handler(RequestValidationError)
# async def validation_exception_handler(request, exc):
# return Response(schemas.Msg(code='-4', msg='服务器错误', data=str(exc)), status_code=200)
# @app.exception_handler(Exception)
# async def http_exception_handler(request, exc):
# return Response(schemas.Msg(code='-3', msg='服务器错误'), status_code=200)
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
start_time = int(time.time()*1000)
start_time = int(time.time() * 1000)
response = await call_next(request)
process_time = int(time.time()*1000) - start_time
process_time = int(time.time() * 1000) - start_time
response.headers["X-Process-Time"] = str(process_time)
await crud.api_log.insert_log(get_database(), schemas.ApiLogInsert(
api=str(request.url),
ms=process_time,
user_id=request.user.id
))
return response

View File

@ -4,42 +4,21 @@ from sqlalchemy.sql import func
from sqlalchemy import create_engine, column, and_, desc, table, or_
import pandas as pd
PROPHET_TIME_GRAIN_MAP = {
"PT1S": "S",
"PT1M": "min",
"PT5M": "5min",
"PT10M": "10min",
"PT15M": "15min",
"PT0.5H": "30min",
"PT1H": "H",
"P1D": "D",
"P1W": "W",
"P1M": "M",
}
TIME_GRAIN_EXPRESSIONS = {
'PT1S': lambda col, zone: func.toStartOfSecond(func.addHours(col, zone)).label('date'),
'PT1M': lambda col, zone: func.toStartOfMinute(func.addHours(col, zone)).label('date'),
'PT5M': lambda col, zone: func.toStartOfFiveMinute(func.addHours(col, zone)).label('date'),
'PT10M': lambda col, zone: func.toStartOfTenMinutes(func.addHours(col, zone)).label('date'),
'PT15M': lambda col, zone: func.toStartOfFifteenMinutes(func.addHours(col, zone)).label('date'),
# 'PT0.5H': lambda col, zone: func.toStartOfMinute(func.addHours(col, zone)).label('date'),
'PT1H': lambda col, zone: func.toStartOfHour(func.addHours(col, zone)).label('date'),
'P1D': lambda col, zone: func.toStartOfDay(func.addHours(col, zone)).label('date'),
'P1W': lambda col, zone: func.toMonday(func.addHours(col, zone)).label('date'),
'P1M': lambda col, zone: func.toStartOfMonth(func.addHours(col, zone)).label('date'),
}
from core.config import settings
class ToSql:
def __init__(self, data: dict, db_name: str, table_name: str, columns: List[str]):
def __init__(self, data: dict, db_name: str, event_columns: List[str], user_columns: List[str]):
self.db_name = db_name
self.engine = create_engine('clickhouse://')
self.columns = self.gen_columns(columns)
self.event_view = data.get('eventView')
self.events = data.get('events')
self.table = sa.table(table_name, *self.columns.values(), schema=self.db_name)
self.event_columns = self.gen_columns(event_columns)
self.user_columns = self.gen_columns(user_columns)
self.event_table = sa.table('event', *self.event_columns.values(), schema=self.db_name)
self.user_table = sa.table('user_view', *self.user_columns.values(), schema=self.db_name)
def gen_columns(self, columns):
return {col: column(col) for col in columns}
@ -64,28 +43,34 @@ class ToSql:
return self.event_view.get('timeParticleSize') or 'P1D'
def get_sql_query_event_model(self):
"""只是查event表"""
is_join_user = False
sqls = []
select_exprs = self.get_group_by()
select_exprs = [self.columns.get(item) for item in select_exprs]
select_exprs = [self.event_columns.get(item) for item in select_exprs]
time_particle_size = self.get_time_particle_size()
start_data, end_data = self.get_date_range()
time_zone = self.get_zone_time()
select_exprs.insert(0, TIME_GRAIN_EXPRESSIONS[time_particle_size](self.columns['#event_time'], time_zone))
date_range = pd.date_range(start_data, end_data, freq=PROPHET_TIME_GRAIN_MAP[time_particle_size]).tolist()
select_exprs.insert(0, settings.TIME_GRAIN_EXPRESSIONS[time_particle_size](self.event_columns['#event_time'],
time_zone))
date_range = pd.date_range(start_data, end_data, freq=settings.PROPHET_TIME_GRAIN_MAP[time_particle_size],
tz='UTC').tolist()
groupby = [item.name for item in select_exprs]
for event in self.events:
event_name = event['event_name']
where = [
func.addHours(self.columns['#event_time'], time_zone) >= start_data,
func.addHours(self.columns['#event_time'], time_zone) <= end_data,
self.columns['#event_name'] == event_name
func.addHours(self.event_columns['#event_time'], time_zone) >= start_data,
func.addHours(self.event_columns['#event_time'], time_zone) <= end_data,
self.event_columns['#event_name'] == event_name
]
analysis = event['analysis']
filters = event['filters'] + self.get_global_filters()
for item in filters:
col = self.columns.get(item['column_id'])
if item['table_type'] == 'user':
is_join_user = True
col = getattr(self, f'{item["table_type"]}_columns').get(item['column_id'])
comparator = item['comparator_id']
ftv = item['ftv']
if comparator == '==':
@ -113,19 +98,22 @@ class ToSql:
if analysis == 'total_count':
qry = sa.select(select_exprs + [func.count().label('values')])
elif analysis == 'touch_user_count':
qry = sa.select(select_exprs + [func.count(sa.distinct(self.columns['#account_id'])).label('values')])
qry = sa.select(
select_exprs + [func.count(sa.distinct(self.event_columns['#account_id'])).label('values')])
elif analysis == 'touch_user_avg':
qry = sa.select(select_exprs + [
func.round((func.count() / func.count(sa.distinct(self.columns['#account_id']))), 2).label(
func.round((func.count() / func.count(sa.distinct(self.event_columns['#account_id']))), 2).label(
'values')])
elif analysis == 'distinct_count':
qry = sa.select(
select_exprs + [func.count(sa.distinct(self.columns[event['event_attr_id']])).label('values')])
select_exprs + [
func.count(sa.distinct(self.event_columns[event['event_attr_id']])).label('values')])
else:
qry = sa.select(
select_exprs + [
func.round(getattr(func, analysis)(self.columns[event['event_attr_id']]), 2).label('values')])
func.round(getattr(func, analysis)(self.event_columns[event['event_attr_id']]), 2).label(
'values')])
qry = qry.where(and_(*where))
@ -134,7 +122,10 @@ class ToSql:
qry = qry.order_by(column('date'))
qry = qry.limit(1000)
qry = qry.select_from(self.table)
qry = qry.select_from(self.event_table)
if is_join_user:
qry = qry.join(self.user_table,
and_(self.event_columns.get('#account_id') == self.user_columns.get('#account_id')))
sql = str(qry.compile(self.engine, compile_kwargs={"literal_binds": True}))
print(sql)
sqls.append({'sql': sql,

View File

@ -10,3 +10,4 @@ from .table_struct import *
from .data_auth import *
from .data_attr import *
from .sql import *
from .api_log import *

9
schemas/api_log.py Normal file
View File

@ -0,0 +1,9 @@
from typing import Any
from pydantic import BaseModel
class ApiLogInsert(BaseModel):
api: str
ms: int
user_id: str

View File

@ -17,3 +17,8 @@ class DataAuthEdit(BaseModel):
class DataAuthSet(BaseModel):
username: str
data_auth_id: str
class LoadProQuotas(BaseModel):
event_name: str