From fcba7425aa2adfbd3dddc1024d16a0a41571e527 Mon Sep 17 00:00:00 2001 From: wuaho Date: Wed, 26 May 2021 21:07:53 +0800 Subject: [PATCH] 1 --- api/api_v1/endpoints/dashboard.py | 1 + api/api_v1/endpoints/data_auth.py | 24 +++++++++++++--- api/api_v1/endpoints/query.py | 46 +++++++++++++++++++++++++++-- api/api_v1/endpoints/report.py | 14 ++++++--- core/config.py | 2 +- crud/crud_report.py | 8 ++---- main2.py | 48 +++++++++++++++++++------------ models/to_sql.py | 44 +++++++++++++++++++--------- schemas/base.py | 1 + schemas/report.py | 13 +++++---- 10 files changed, 147 insertions(+), 54 deletions(-) diff --git a/api/api_v1/endpoints/dashboard.py b/api/api_v1/endpoints/dashboard.py index b9f0a04..ef3da79 100644 --- a/api/api_v1/endpoints/dashboard.py +++ b/api/api_v1/endpoints/dashboard.py @@ -56,6 +56,7 @@ async def move( @router.post("/add_report") async def add_report(data_in: schemas.AddReport, + game:str, db: AsyncIOMotorDatabase = Depends(get_database), current_user: schemas.UserDB = Depends(deps.get_current_user) ): diff --git a/api/api_v1/endpoints/data_auth.py b/api/api_v1/endpoints/data_auth.py index 73ae46d..9f7b9d4 100644 --- a/api/api_v1/endpoints/data_auth.py +++ b/api/api_v1/endpoints/data_auth.py @@ -106,22 +106,34 @@ async def my_event(request: Request, 'id': item, 'data_type': settings.CK_TYPE_DICT.get(all_filed.get(item)), 'title': data_attr.get(item, {}).get('show_name') or item, + 'category': settings.CK_FILTER.get(settings.CK_TYPE_DICT.get(all_filed.get(item))) or [] } for item in all_filed] + + # filter_by = [{ + # 'id': item, + # 'data_type': settings.CK_TYPE_DICT.get(all_filed.get(item)), + # 'title': data_attr.get(item, {}).get('show_name') or item, + # 'category': settings.CK_FILTER.get(settings.CK_TYPE_DICT.get(all_filed.get(item))) or [] + # } for item in all_filed] + for k, v in event_dict.items(): event_attr = [{ - 'id': 'total_count', + 'id': '*', 'data_type': None, + 'analysis': 'total_count', 'title': '总次数', 'category': [] }, { - 'id': 'touch_user_count', + 'id': '*', + 'analysis': 'touch_user_count', 'data_type': None, 'title': '触发用户数', 'category': [] }, { - 'id': 'touch_user_avg', + 'id': '*', + 'analysis': 'touch_user_avg', 'data_type': None, 'title': '人均次数', 'category': [] @@ -158,7 +170,11 @@ async def my_event(request: Request, 'title': '默认分组', 'category': event }], - 'group_by': group_by + 'group_by': [{ + 'id': 'event', + 'title': '默认分组', + 'category': group_by + }] } return schemas.Msg(code=0, msg='ok', data=res) diff --git a/api/api_v1/endpoints/query.py b/api/api_v1/endpoints/query.py index a1dc266..db3a9c0 100644 --- a/api/api_v1/endpoints/query.py +++ b/api/api_v1/endpoints/query.py @@ -57,7 +57,47 @@ async def event_model( to_sql = ToSql(data_in.dict(), game, 'event', columns.keys()) sqls = to_sql.get_sql_query_event_model() res = [] - for sql in sqls: - data = await ckdb.execute(sql) - res.append(data) + for item in sqls: + q = { + 'groups': [], + 'values': [], + 'event_name': item['event_name'] + } + sql = item['sql'] + groupby = item['groupby'][1:] + date_range = item['date_range'] + q['date_range'] = date_range + df = await ckdb.query_dataframe(sql) + if df.shape[0]==0: + return schemas.Msg(code=0, msg='ok', data=q) + df['date'] = df['date'].apply(lambda x: str(x)) + # todo 时间粒度 暂时按天 + df['date'] = df['date'].apply( + lambda x: pd.Timestamp(year=int(x[:4]), month=int(x[4:6]), day=int(x[6:])).strftime('%Y-%m-%d')) + + if groupby: + # 有分组 + for group, df_group in df.groupby(groupby): + df_group.reset_index(drop=True, inplace=True) + q['groups'].append(group) + + concat_data = [] + for i in set(date_range) - set(df_group['date']): + if len(groupby) > 1: + concat_data.append((i, *group, 0)) + else: + concat_data.append((i, group, 0)) + df_group = pd.concat([df_group, pd.DataFrame(concat_data, columns=df_group.columns)]) + df_group.sort_values('date', inplace=True) + q['values'].append(df_group['values'].to_list()) + + else: + # 无分组 + concat_data = [] + for i in set(date_range) - set(df['date']): + concat_data.append((i, 0)) + df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)]) + q['values'].append(df['values'].to_list()) + + res.append(q) return schemas.Msg(code=0, msg='ok', data=res) diff --git a/api/api_v1/endpoints/report.py b/api/api_v1/endpoints/report.py index 6e9b706..c9b2fe3 100644 --- a/api/api_v1/endpoints/report.py +++ b/api/api_v1/endpoints/report.py @@ -1,7 +1,7 @@ from typing import Any import pymongo -from fastapi import APIRouter, Depends +from fastapi import APIRouter, Depends, Request from motor.motor_asyncio import AsyncIOMotorDatabase import crud, schemas @@ -13,13 +13,15 @@ router = APIRouter() @router.post("/create") async def create( + request: Request, data_in: schemas.ReportCreate, + game: str, db: AsyncIOMotorDatabase = Depends(get_database), current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> schemas.Msg: """新建报表""" try: - await crud.report.create(db, data_in, user_id=current_user.id) + await crud.report.create(db, data_in, user_id=request.user.id) except pymongo.errors.DuplicateKeyError: return schemas.Msg(code=-1, msg='error', data='报表已存在') @@ -28,19 +30,23 @@ async def create( @router.post("/read_report") async def read_report( + request: Request, data_in: schemas.ReportRead, + game: str, db: AsyncIOMotorDatabase = Depends(get_database), current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> Any: """获取已建报表""" - res = await crud.report.read_report(db, user_id=current_user.id, project_id=data_in.project_id) - return res + data = await crud.report.read_report(db, user_id=request.user.id, project_id=data_in.project_id) + return schemas.Msg(code=0, msg='ok', data=data) @router.post("/delete") async def delete( + request: Request, data_in: schemas.ReportDelete, + game: str, db: AsyncIOMotorDatabase = Depends(get_database), current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> schemas.Msg: diff --git a/core/config.py b/core/config.py index 85c1b91..bdb32e6 100644 --- a/core/config.py +++ b/core/config.py @@ -151,7 +151,7 @@ class Settings(BaseSettings): 'id': '>', 'title': '大于' }, { - 'id': 'not null', + 'id': 'is not null', 'title': '有值' }, { 'id': 'is null', diff --git a/crud/crud_report.py b/crud/crud_report.py index 2cfe07b..5f58c1e 100644 --- a/crud/crud_report.py +++ b/crud/crud_report.py @@ -12,9 +12,7 @@ class CRUDReport(CRUDBase): async def create(self, db: AsyncIOMotorDatabase, obj_in: ReportCreate, user_id: str): db_obj = ReportDB( **obj_in.dict(), user_id=user_id, - _id=uuid.uuid1().hex, - members=[user_id] - + _id=uuid.uuid1().hex ) await db[self.coll_name].insert_one(db_obj.dict(by_alias=True)) @@ -23,8 +21,8 @@ class CRUDReport(CRUDBase): [('project_id', pymongo.DESCENDING), ('name', pymongo.DESCENDING), ('user_id', pymongo.DESCENDING)], unique=True) - async def read_report(self, db, user_id): - res = await self.read_have(db, user_id) + async def read_report(self, db, user_id, project_id): + res = await self.find_many(db, user_id=user_id, project_id=project_id) return res diff --git a/main2.py b/main2.py index db2b1f9..a0a5b37 100644 --- a/main2.py +++ b/main2.py @@ -1,33 +1,31 @@ -import base64 import binascii +import time import uvicorn -from fastapi import FastAPI -import casbin - -from api.deps import get_current_user2 -from core.config import settings +from fastapi import FastAPI, Request from starlette.middleware.cors import CORSMiddleware from starlette.authentication import AuthenticationBackend, AuthenticationError, AuthCredentials, BaseUser, SimpleUser from starlette.middleware.authentication import AuthenticationMiddleware -from fastapi_authz import CasbinMiddleware +from middleware import CasbinMiddleware from db import connect_to_mongo, close_mongo_connection, get_database +from db.ckdb_utils import connect_to_ck, close_ck_connection +from db.redisdb_utils import connect_to_redis, close_redis_connection from utils import * +from api.api_v1.api import api_router +from core.config import settings +from api.deps import get_current_user2 app = FastAPI(title=settings.PROJECT_NAME) +app.include_router(api_router, prefix=settings.API_V1_STR) -if settings.BACKEND_CORS_ORIGINS: - app.add_middleware( - CORSMiddleware, - allow_origins=['*'], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], - ) app.add_event_handler("startup", connect_to_mongo) +app.add_event_handler("startup", connect_to_redis) +app.add_event_handler("startup", connect_to_ck) app.add_event_handler("shutdown", close_mongo_connection) +app.add_event_handler("shutdown", close_redis_connection) +app.add_event_handler("shutdown", close_ck_connection) class CurrentUser(BaseUser): @@ -67,9 +65,23 @@ class BasicAuth(AuthenticationBackend): app.add_middleware(CasbinMiddleware, enforcer=casbin_enforcer) app.add_middleware(AuthenticationMiddleware, backend=BasicAuth()) -from api.api_v1.api import api_router -app.include_router(api_router, prefix=settings.API_V1_STR) +app.add_middleware( + CORSMiddleware, + allow_origins=['*'], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + + +@app.middleware("http") +async def add_process_time_header(request: Request, call_next): + start_time = int(time.time()*1000) + response = await call_next(request) + process_time = int(time.time()*1000) - start_time + response.headers["X-Process-Time"] = str(process_time) + return response if __name__ == '__main__': - uvicorn.run(app='main2:app', host="0.0.0.0", port=8899, reload=True, debug=True) + uvicorn.run(app='main:app', host="0.0.0.0", port=7889, reload=True, debug=True) diff --git a/models/to_sql.py b/models/to_sql.py index 2922d86..cfd0dee 100644 --- a/models/to_sql.py +++ b/models/to_sql.py @@ -2,6 +2,7 @@ from typing import List, Tuple import sqlalchemy as sa from sqlalchemy.sql import func from sqlalchemy import create_engine, column, and_, desc, table, or_ +import pandas as pd class ToSql: @@ -12,7 +13,7 @@ class ToSql: self.event_view = data.get('eventView') self.events = data.get('events') - self.table = sa.table(table_name, *self.columns, schema=self.db_name) + self.table = sa.table(table_name, *self.columns.values(), schema=self.db_name) def gen_columns(self, columns): return {col: column(col) for col in columns} @@ -27,7 +28,7 @@ class ToSql: def get_group_by(self): # return self.event_view.get('groupBy') or [] - return [item['columnName'] for item in self.event_view.get('groupBy')] + return [item['column_id'] for item in self.event_view.get('groupBy')] def get_time_particle_size(self): return self.event_view.get('timeParticleSize') or 'day' @@ -38,10 +39,12 @@ class ToSql: select_exprs = self.get_group_by() select_exprs = [self.columns.get(item) for item in select_exprs] time_particle_size = self.get_time_particle_size() - if time_particle_size == 'day': - select_exprs.append(func.toYYYYMMDD(self.columns['#event_time']).label('date')) - start_data, end_data = self.get_date_range() + date_range = [] + if time_particle_size == 'day': + select_exprs.insert(0, func.toYYYYMMDD(self.columns['#event_time']).label('date')) + date_range = pd.date_range(start_data, end_data, freq='D').strftime('%Y-%m-%d').tolist() + groupby = [item.name for item in select_exprs] for event in self.events: event_name = event['event_name'] @@ -69,29 +72,44 @@ class ToSql: where.append(col > ftv[0]) elif comparator == '<': where.append(col < ftv[0]) + + elif comparator == 'is not null': + where.append(col.isnot(None)) + elif comparator == 'is null': + where.append(col.is_(None)) + elif comparator == '!=': where.append(col != ftv[0]) if analysis == 'total_count': - qry = sa.select(select_exprs + [func.count()]) + qry = sa.select(select_exprs + [func.count().label('values')]) elif analysis == 'touch_user_count': - qry = sa.select(select_exprs + [func.count(sa.distinct(self.columns[event['#account_id']]))]) + qry = sa.select(select_exprs + [func.count(sa.distinct(self.columns['#account_id'])).label('values')]) elif analysis == 'touch_user_avg': - qry = sa.select(select_exprs + [func.count(func.avg(self.columns[event['#account_id']]))]) + qry = sa.select(select_exprs + [func.avg(self.columns['#account_id']).label('values')]) elif analysis == 'distinct_count': - qry = sa.select(select_exprs + [func.count(sa.distinct(self.columns[event['event_attr_id']]))]) + qry = sa.select( + select_exprs + [func.count(sa.distinct(self.columns[event['event_attr_id']])).label('values')]) else: - qry = sa.select(select_exprs + [getattr(func, analysis)(self.columns[event['event_attr_id']])]) + qry = sa.select( + select_exprs + [getattr(func, analysis)(self.columns[event['event_attr_id']]).label('values')]) qry = qry.where(and_(*where)) qry = qry.group_by(*select_exprs) - if time_particle_size == 'day': - qry = qry.order_by(column('date')) + qry = qry.order_by(column('date')) + qry = qry.limit(1000) qry = qry.select_from(self.table) - sqls.append(str(qry.compile(self.engine, compile_kwargs={"literal_binds": True}))) + sql = str(qry.compile(self.engine, compile_kwargs={"literal_binds": True})) + print(sql) + sqls.append({'sql': sql, + 'groupby': groupby, + 'date_range': date_range, + 'event_name': event_name + + }) return sqls diff --git a/schemas/base.py b/schemas/base.py index c648806..477f335 100644 --- a/schemas/base.py +++ b/schemas/base.py @@ -1,3 +1,4 @@ +import uuid from typing import Optional, Union from bson import ObjectId diff --git a/schemas/report.py b/schemas/report.py index 47cc59e..d751aae 100644 --- a/schemas/report.py +++ b/schemas/report.py @@ -17,16 +17,17 @@ class ReportBase(BaseModel): class ReportCreate(ReportBase): name: str + desc: str project_id: str - query: Json - sql: str + query: dict + cat: str class ReportDelete(DBBase): pass -class ReportRead(DBBase): +class ReportRead(BaseModel): project_id: str @@ -36,7 +37,7 @@ class ReportDB(DBBase): name: str user_id: str project_id: str - # cat: Category - members: List[str] = [] - pid: str + desc: str + query: dict + cat: str create_date: datetime = datetime.now()