From c62ae3af9c3b1aef5aa95ddcf1f7bdeb9a2e67f3 Mon Sep 17 00:00:00 2001 From: wuaho Date: Fri, 23 Jul 2021 13:37:50 +0800 Subject: [PATCH] update --- api/api_v1/endpoints/dashboard.py | 31 +++++--- api/api_v1/endpoints/project.py | 2 +- api/api_v1/endpoints/report.py | 21 ++++- core/config.py | 8 +- crud/base.py | 8 +- crud/crud_report.py | 4 +- models/behavior_analysis.py | 126 +++++++++++++++++++++--------- schemas/dashboard.py | 16 +++- schemas/report.py | 2 + schemas/sql.py | 5 +- 10 files changed, 158 insertions(+), 65 deletions(-) diff --git a/api/api_v1/endpoints/dashboard.py b/api/api_v1/endpoints/dashboard.py index ef3da79..a00ced5 100644 --- a/api/api_v1/endpoints/dashboard.py +++ b/api/api_v1/endpoints/dashboard.py @@ -1,5 +1,5 @@ import pymongo -from fastapi import APIRouter, Depends +from fastapi import APIRouter, Depends, Request from motor.motor_asyncio import AsyncIOMotorDatabase import crud, schemas @@ -56,33 +56,38 @@ async def move( @router.post("/add_report") async def add_report(data_in: schemas.AddReport, - game:str, + game: str, db: AsyncIOMotorDatabase = Depends(get_database), current_user: schemas.UserDB = Depends(deps.get_current_user) ): """添加报表""" + reports = [item.dict() for item in data_in.report_ids] res = await crud.dashboard.update_one(db, {'_id': data_in.id}, - {'$push': {'reports': {'$each': data_in.report_ids}}}) + {'$push': {'reports': {'$each': reports}}}) return schemas.Msg(code=0, msg='ok', data='ok') @router.post("/del_report") -async def del_report(data_in: schemas.DelReport, - db: AsyncIOMotorDatabase = Depends(get_database), - current_user: schemas.UserDB = Depends(deps.get_current_user) - ): +async def del_report( + game: str, + data_in: schemas.DelReport, + db: AsyncIOMotorDatabase = Depends(get_database), + current_user: schemas.UserDB = Depends(deps.get_current_user) +): """删除报表""" - for item in data_in.report_ids: - await crud.dashboard.update_one(db, {'_id': data_in.id}, {'$pull': {'reports': item}}) + del_item = {'report_id': data_in.report_id} + await crud.dashboard.update_one(db, {'_id': data_in.id}, {'$pull': {'reports': del_item}}) return schemas.Msg(code=0, msg='ok', data='ok') -@router.get("/") -async def dashboards(_id: str, +@router.post("/") +async def dashboards(request: Request, + game: str, + data_in: schemas.ReadDashboard, db: AsyncIOMotorDatabase = Depends(get_database), current_user: schemas.UserDB = Depends(deps.get_current_user) ): """获取一个看板""" - res = await crud.dashboard.get(db, id=_id) - res['reports'] = await crud.report.find_many(db, **{'$in': {'_id': res.get('reports')}}) + res = await crud.dashboard.get(db, id=data_in.id) + # res['reports'] = await crud.report.find_many(db, **{'_id': {'$in': [item['report_id'] for item in res.get('reports')]}}) return schemas.Msg(code=0, msg='ok', data=res['reports']) diff --git a/api/api_v1/endpoints/project.py b/api/api_v1/endpoints/project.py index fe389e6..44af6b3 100644 --- a/api/api_v1/endpoints/project.py +++ b/api/api_v1/endpoints/project.py @@ -223,7 +223,7 @@ async def read_kanban( for d in dashboards: res['kanban'][-1]['children'].append({ 'name': d['name'], - '_id': item['_id'] + '_id': d['_id'] }) # 我的空间 diff --git a/api/api_v1/endpoints/report.py b/api/api_v1/endpoints/report.py index c9b2fe3..4f99c64 100644 --- a/api/api_v1/endpoints/report.py +++ b/api/api_v1/endpoints/report.py @@ -11,6 +11,8 @@ from api import deps router = APIRouter() + + @router.post("/create") async def create( request: Request, @@ -37,9 +39,24 @@ async def read_report( current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> Any: """获取已建报表""" + ext_where = dict() + dashboard = dict() + if data_in.report_id: + ext_where = {'_id': {'$in': data_in.report_id}} + if data_in.dashboard_id: + dashboard = await crud.dashboard.get(db, id=data_in.dashboard_id) + # projection = {'query': False} + projection = None + reports = await crud.report.read_report(db, user_id=request.user.id, project_id=data_in.project_id, + projection=projection, **ext_where) - data = await crud.report.read_report(db, user_id=request.user.id, project_id=data_in.project_id) - return schemas.Msg(code=0, msg='ok', data=data) + for item in reports: + item['added'] = False + added_ids = [item['report_id'] for item in dashboard.get('reports', [])] + if item['_id'] in added_ids: + item['added'] = True + + return schemas.Msg(code=0, msg='ok', data=reports) @router.post("/delete") diff --git a/core/config.py b/core/config.py index e7761ec..a04b493 100644 --- a/core/config.py +++ b/core/config.py @@ -76,7 +76,7 @@ class Settings(BaseSettings): 'median': lambda x: func.median(x), 'max': lambda x: func.max(x), 'min': lambda x: func.min(x), - 'distinct_count': lambda x: func.count(func.distinct(x)), + 'distinct_count': lambda x: func.uniqCombined(x), } CK_OPERATOR = { @@ -244,6 +244,12 @@ class Settings(BaseSettings): }, ], } + ARITHMETIC = { + '+': lambda x, y: x + y, + '-': lambda x, y: x - y, + 'x': lambda x, y: x * y, + '/': lambda x, y: x / y, + } PROPHET_TIME_GRAIN_MAP = { "PT1S": "S", diff --git a/crud/base.py b/crud/base.py index eab987a..00fdfa4 100644 --- a/crud/base.py +++ b/crud/base.py @@ -10,8 +10,8 @@ class CRUDBase: async def get(self, db, id: Union[ObjectId, str]) -> dict: return (await db[self.coll_name].find_one({'_id': id})) or dict() - async def find_one(self, db, filter=None, *args, **kwargs): - return (await db[self.coll_name].find_one(filter, *args, **kwargs)) or dict() + async def find_one(self, db, projection=None, *args, **kwargs): + return (await db[self.coll_name].find_one(projection, *args, **kwargs)) or dict() async def read_have(self, db, v: str, **kwargs): where = {'members': v} @@ -19,8 +19,8 @@ class CRUDBase: cursor = db[self.coll_name].find(where) return await cursor.to_list(length=9999) - async def find_many(self, db, **kwargs): - cursor = db[self.coll_name].find(kwargs) + async def find_many(self, db, projection=None, **kwargs): + cursor = db[self.coll_name].find(kwargs, projection) return await cursor.to_list(length=9999) def find(self, db, *args, **kwargs): diff --git a/crud/crud_report.py b/crud/crud_report.py index 5f58c1e..4e0d2cd 100644 --- a/crud/crud_report.py +++ b/crud/crud_report.py @@ -21,8 +21,8 @@ class CRUDReport(CRUDBase): [('project_id', pymongo.DESCENDING), ('name', pymongo.DESCENDING), ('user_id', pymongo.DESCENDING)], unique=True) - async def read_report(self, db, user_id, project_id): - res = await self.find_many(db, user_id=user_id, project_id=project_id) + async def read_report(self, db, user_id, project_id, projection=None, **kwargs): + res = await self.find_many(db, user_id=user_id, project_id=project_id,projection=projection, **kwargs) return res diff --git a/models/behavior_analysis.py b/models/behavior_analysis.py index 3ee3cc3..a2d2e61 100644 --- a/models/behavior_analysis.py +++ b/models/behavior_analysis.py @@ -9,8 +9,11 @@ import pandas as pd from sqlalchemy import func, or_, and_, not_ +import crud import schemas from core.config import settings +from db import get_database + from db.redisdb import get_redis_pool, RedisDrive @@ -20,8 +23,9 @@ class BehaviorAnalysis: self.rdb = rdb self.user_tbl = None self.event_tbl = None - self.event_view = data_in.eventView - self.events = data_in.events + self.data_in = data_in + self.event_view = dict() + self.events = [dict()] self.zone_time: int = 0 self.start_date = None @@ -33,6 +37,15 @@ class BehaviorAnalysis: self.unit_num = None async def init(self): + if self.data_in.report_id: + db = get_database() + report = await crud.report.get(db, id=self.data_in.report_id) + self.event_view = report['query']['eventView'] + self.events = report['query']['events'] + else: + self.event_view = self.data_in.eventView + self.events = self.data_in.events + await self._init_table() self.zone_time = self._get_zone_time() self.time_particle = self._get_time_particle_size() @@ -164,53 +177,92 @@ class BehaviorAnalysis: 'unit_num': self.unit_num } + def custom_event(self, s): + def f(m): + if len(m) == 3: + event_name, attr, comp = m + return getattr(func, comp)(getattr(func, 'if')(getattr(self.event_tbl.c, '#event_name') == event_name, + getattr(self.event_tbl.c, attr), 0)) + elif len(m) == 2: + event_name, comp = m + # 总次数 + if comp == 'total_count': + return func.sum(getattr(func, 'if')(getattr(self.event_tbl.c, '#event_name') == event_name, 1, 0)) + elif comp == 'touch_user_count': + return func.uniqCombined(getattr(func, 'if')(getattr(self.event_tbl.c, '#event_name') == event_name, + getattr(self.event_tbl.c, 'binduid'), 0)) + elif comp == 'touch_user_avg': + return func.divide( + func.sum(getattr(func, 'if')(getattr(self.event_tbl.c, '#event_name') == event_name, 1, 0)), + func.uniqCombined(getattr(func, 'if')(getattr(self.event_tbl.c, '#event_name') == event_name, + getattr(self.event_tbl.c, 'binduid'), 0))) + + opt = ({'+', '-', '*', '/'} & set(s)).pop() + a, b = s.split(opt) + r1 = a.split('.') + r2 = b.split('.') + return {'event_name': [r1[0], r2[0]], + 'select': (settings.ARITHMETIC[opt](f(r1), f(r2))).label('values') + } + def event_model_sql(self): sqls = [] event_time_col = getattr(self.event_tbl.c, '#event_time') - select_exprs = [ - settings.TIME_GRAIN_EXPRESSIONS[self.time_particle](event_time_col, self.zone_time)] - select_exprs += self.groupby for event in self.events: - event_name = event['event_name'] - event_name_col = getattr(self.event_tbl.c, '#event_name') + select_exprs = [ + settings.TIME_GRAIN_EXPRESSIONS[self.time_particle](event_time_col, self.zone_time)] + base_where = [ func.addHours(event_time_col, self.zone_time) >= self.start_date, func.addHours(event_time_col, self.zone_time) <= self.end_date, - event_name_col == event_name ] - analysis = event['analysis'] - event_filter, user_filter = self.handler_filts(*event['filts']) - - u_account_id_col = getattr(self.user_tbl.c, '#account_id') - # 按账号聚合 - e_account_id_col = getattr(self.event_tbl.c, '#account_id') - - # 聚合方式 - if analysis == 'total_count': - selectd = select_exprs + [func.count().label('values')] - elif analysis == 'touch_user_count': - selectd = select_exprs + [func.count(sa.distinct(e_account_id_col)).label('values')] - elif analysis == 'touch_user_avg': - selectd = select_exprs + [ - func.round((func.count() / func.count(sa.distinct(e_account_id_col))), 2).label( - 'values')] - - elif analysis == 'distinct_count': - selectd = select_exprs + [ - func.count(sa.distinct(getattr(self.event_tbl.c, event['event_attr_id']))).label('values')] + event_name_col = getattr(self.event_tbl.c, '#event_name') + if event.get('customEvent'): + formula = event.get('customEvent') + custom = self.custom_event(formula) + event_name = custom['event_name'] + where = [event_name_col.in_(event_name)] + event_filter, _ = self.handler_filts(*event['filts']) + qry = sa.select( + *select_exprs, + custom['select'] + ).where(*base_where, *where, *event_filter) else: - selectd = select_exprs + [ - func.round(getattr(func, analysis)(getattr(self.event_tbl.c, event['event_attr_id'])), 2).label( - 'values')] + event_name = event['event_name'] - if user_filter: - qry = sa.select(selectd).select_from( - self.event_tbl.join(self.user_tbl, u_account_id_col == e_account_id_col)).where( - and_(*user_filter, *event_filter, *base_where)) + select_exprs += self.groupby - else: - qry = sa.select(selectd).where(and_(*event_filter, *base_where)) + base_where.append(event_name_col == event_name) + + analysis = event['analysis'] + event_filter, user_filter = self.handler_filts(*event['filts']) + + u_account_id_col = getattr(self.user_tbl.c, '#account_id') + # 按账号聚合 + e_account_id_col = getattr(self.event_tbl.c, '#account_id') + + # 聚合方式 + if analysis == 'total_count': + selectd = select_exprs + [func.count().label('values')] + elif analysis == 'touch_user_count': + selectd = select_exprs + [func.count(sa.distinct(e_account_id_col)).label('values')] + elif analysis == 'touch_user_avg': + selectd = select_exprs + [ + func.round((func.count() / func.count(sa.distinct(e_account_id_col))), 2).label( + 'values')] + else: + selectd = select_exprs + [ + func.round(getattr(func, analysis)(getattr(self.event_tbl.c, event['event_attr_id'])), 2).label( + 'values')] + + if user_filter: + qry = sa.select(selectd).select_from( + self.event_tbl.join(self.user_tbl, u_account_id_col == e_account_id_col)).where( + and_(*user_filter, *event_filter, *base_where)) + + else: + qry = sa.select(selectd).where(and_(*event_filter, *base_where)) qry = qry.group_by(*select_exprs) qry = qry.order_by(sa.Column('date')) diff --git a/schemas/dashboard.py b/schemas/dashboard.py index 30d87dc..3c988ea 100644 --- a/schemas/dashboard.py +++ b/schemas/dashboard.py @@ -1,7 +1,7 @@ import uuid from datetime import datetime from enum import Enum -from typing import List +from typing import List, Dict from pydantic import BaseModel @@ -20,6 +20,10 @@ class DashboardCreate(DashboardBase): pid: str +class ReadDashboard(BaseModel): + id: str + + class DashboardDelete(DBBase): pass @@ -35,12 +39,18 @@ class DashboardMove(BaseModel): cat: Category +class Report(BaseModel): + report_id: str + graph_type: str + model: str + + class AddReport(DBBase): - report_ids: List[str] + report_ids: List[Report] class DelReport(DBBase): - report_ids: List[str] + report_id: str # -------------------------------------------------------------- diff --git a/schemas/report.py b/schemas/report.py index d751aae..1231e0d 100644 --- a/schemas/report.py +++ b/schemas/report.py @@ -29,6 +29,8 @@ class ReportDelete(DBBase): class ReportRead(BaseModel): project_id: str + report_id: List = [] + dashboard_id: str = None # -------------------------------------------------------------- diff --git a/schemas/sql.py b/schemas/sql.py index 91f3628..6b21972 100644 --- a/schemas/sql.py +++ b/schemas/sql.py @@ -9,5 +9,6 @@ class Sql(BaseModel): class CkQuery(BaseModel): - eventView: dict - events: Union[List[dict],dict] + eventView: dict = None + events: Union[List[dict], dict] = None + report_id: str = None