wuaho 2021-05-26 21:07:53 +08:00
parent 17644de328
commit fcba7425aa
10 changed files with 147 additions and 54 deletions

View File

@@ -56,6 +56,7 @@ async def move(
@router.post("/add_report")
async def add_report(data_in: schemas.AddReport,
game:str,
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)
):

View File

@@ -106,22 +106,34 @@ async def my_event(request: Request,
'id': item,
'data_type': settings.CK_TYPE_DICT.get(all_filed.get(item)),
'title': data_attr.get(item, {}).get('show_name') or item,
'category': settings.CK_FILTER.get(settings.CK_TYPE_DICT.get(all_filed.get(item))) or []
} for item in all_filed]
# filter_by = [{
# 'id': item,
# 'data_type': settings.CK_TYPE_DICT.get(all_filed.get(item)),
# 'title': data_attr.get(item, {}).get('show_name') or item,
# 'category': settings.CK_FILTER.get(settings.CK_TYPE_DICT.get(all_filed.get(item))) or []
# } for item in all_filed]
for k, v in event_dict.items():
event_attr = [{
'id': 'total_count',
'id': '*',
'data_type': None,
'analysis': 'total_count',
'title': '总次数',
'category': []
},
{
'id': 'touch_user_count',
'id': '*',
'analysis': 'touch_user_count',
'data_type': None,
'title': '触发用户数',
'category': []
},
{
'id': 'touch_user_avg',
'id': '*',
'analysis': 'touch_user_avg',
'data_type': None,
'title': '人均次数',
'category': []
@@ -158,7 +170,11 @@ async def my_event(request: Request,
'title': '默认分组',
'category': event
}],
'group_by': group_by
'group_by': [{
'id': 'event',
'title': '默认分组',
'category': group_by
}]
}
return schemas.Msg(code=0, msg='ok', data=res)
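Note: the preset metrics above now use '*' as the column id and move the aggregation name into a separate 'analysis' field. Those names ('total_count', 'touch_user_count', 'touch_user_avg') are the same keys the ToSql builder later in this commit dispatches on; roughly, as an illustrative mapping that mirrors the if/elif chain in to_sql.py below (not the project's actual code):

import sqlalchemy as sa
from sqlalchemy import column
from sqlalchemy.sql import func

analysis_exprs = {
    'total_count': func.count(),
    'touch_user_count': func.count(sa.distinct(column('#account_id'))),
    'touch_user_avg': func.avg(column('#account_id')),
}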

View File

@@ -57,7 +57,47 @@ async def event_model(
to_sql = ToSql(data_in.dict(), game, 'event', columns.keys())
sqls = to_sql.get_sql_query_event_model()
res = []
for sql in sqls:
data = await ckdb.execute(sql)
res.append(data)
for item in sqls:
q = {
'groups': [],
'values': [],
'event_name': item['event_name']
}
sql = item['sql']
groupby = item['groupby'][1:]
date_range = item['date_range']
q['date_range'] = date_range
df = await ckdb.query_dataframe(sql)
if df.shape[0] == 0:
return schemas.Msg(code=0, msg='ok', data=q)
df['date'] = df['date'].apply(lambda x: str(x))
# todo: time granularity - only daily is handled for now
df['date'] = df['date'].apply(
lambda x: pd.Timestamp(year=int(x[:4]), month=int(x[4:6]), day=int(x[6:])).strftime('%Y-%m-%d'))
if groupby:
# with grouping
for group, df_group in df.groupby(groupby):
df_group.reset_index(drop=True, inplace=True)
q['groups'].append(group)
concat_data = []
for i in set(date_range) - set(df_group['date']):
if len(groupby) > 1:
concat_data.append((i, *group, 0))
else:
concat_data.append((i, group, 0))
df_group = pd.concat([df_group, pd.DataFrame(concat_data, columns=df_group.columns)])
df_group.sort_values('date', inplace=True)
q['values'].append(df_group['values'].to_list())
else:
# no grouping
concat_data = []
for i in set(date_range) - set(df['date']):
concat_data.append((i, 0))
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
q['values'].append(df['values'].to_list())
res.append(q)
return schemas.Msg(code=0, msg='ok', data=res)
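Each item in sqls is now a dict (built in to_sql.py below) that carries the compiled SQL together with its group-by columns, the expected daily date_range, and the event name, so this handler can pad the days that returned no rows and emit one zero-filled series per event/group. A minimal standalone sketch of that gap-filling step, assuming a daily date_range of 'YYYY-MM-DD' strings (sample values, not real data):

import pandas as pd

date_range = pd.date_range('2021-05-01', '2021-05-05', freq='D').strftime('%Y-%m-%d').tolist()
df = pd.DataFrame({'date': ['2021-05-01', '2021-05-03'], 'values': [12, 7]})

# pad the days that produced no rows with zeros, then sort by date
missing = [(d, 0) for d in set(date_range) - set(df['date'])]
df = pd.concat([df, pd.DataFrame(missing, columns=df.columns)])
df.sort_values('date', inplace=True)

print(df['values'].to_list())  # one aligned series, e.g. [12, 0, 7, 0, 0]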

View File

@@ -1,7 +1,7 @@
from typing import Any
import pymongo
from fastapi import APIRouter, Depends
from fastapi import APIRouter, Depends, Request
from motor.motor_asyncio import AsyncIOMotorDatabase
import crud, schemas
@@ -13,13 +13,15 @@ router = APIRouter()
@router.post("/create")
async def create(
request: Request,
data_in: schemas.ReportCreate,
game: str,
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""新建报表"""
try:
await crud.report.create(db, data_in, user_id=current_user.id)
await crud.report.create(db, data_in, user_id=request.user.id)
except pymongo.errors.DuplicateKeyError:
return schemas.Msg(code=-1, msg='error', data='报表已存在')
@@ -28,19 +30,23 @@ async def create(
@router.post("/read_report")
async def read_report(
request: Request,
data_in: schemas.ReportRead,
game: str,
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> Any:
"""获取已建报表"""
res = await crud.report.read_report(db, user_id=current_user.id, project_id=data_in.project_id)
return res
data = await crud.report.read_report(db, user_id=request.user.id, project_id=data_in.project_id)
return schemas.Msg(code=0, msg='ok', data=data)
@router.post("/delete")
async def delete(
request: Request,
data_in: schemas.ReportDelete,
game: str,
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
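These handlers now read the caller from request.user instead of the get_current_user dependency. That attribute is populated by Starlette's AuthenticationMiddleware configured in main.py below; a rough sketch of the contract, assuming the project's BasicAuth backend resolves a user object exposing an id attribute (not the exact project code):

from starlette.authentication import AuthenticationBackend, AuthCredentials, BaseUser

class CurrentUser(BaseUser):
    def __init__(self, user_id: str):
        self.id = user_id

    @property
    def is_authenticated(self) -> bool:
        return True

    @property
    def display_name(self) -> str:
        return self.id

class BasicAuth(AuthenticationBackend):
    async def authenticate(self, conn):
        user_id = "..."  # placeholder: real token/header lookup omitted
        return AuthCredentials(["authenticated"]), CurrentUser(user_id)

# With AuthenticationMiddleware(backend=BasicAuth()) installed, any endpoint
# can read request.user.id without the get_current_user dependency.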

View File

@@ -151,7 +151,7 @@ class Settings(BaseSettings):
'id': '>',
'title': '大于'
}, {
'id': 'not null',
'id': 'is not null',
'title': '有值'
}, {
'id': 'is null',

View File

@@ -12,9 +12,7 @@ class CRUDReport(CRUDBase):
async def create(self, db: AsyncIOMotorDatabase, obj_in: ReportCreate, user_id: str):
db_obj = ReportDB(
**obj_in.dict(), user_id=user_id,
_id=uuid.uuid1().hex,
members=[user_id]
_id=uuid.uuid1().hex
)
await db[self.coll_name].insert_one(db_obj.dict(by_alias=True))
@@ -23,8 +21,8 @@ class CRUDReport(CRUDBase):
[('project_id', pymongo.DESCENDING), ('name', pymongo.DESCENDING), ('user_id', pymongo.DESCENDING)],
unique=True)
async def read_report(self, db, user_id):
res = await self.read_have(db, user_id)
async def read_report(self, db, user_id, project_id):
res = await self.find_many(db, user_id=user_id, project_id=project_id)
return res
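read_report now filters by both user_id and project_id through find_many. CRUDBase.find_many is not part of this diff; with Motor it is presumably a thin wrapper over find() plus to_list(), something like the following (an assumption, not the project's code):

async def find_many(self, db, **filters):
    cursor = db[self.coll_name].find(filters)
    return await cursor.to_list(length=1000)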

View File

@@ -1,33 +1,31 @@
import base64
import binascii
import time
import uvicorn
from fastapi import FastAPI
import casbin
from api.deps import get_current_user2
from core.config import settings
from fastapi import FastAPI, Request
from starlette.middleware.cors import CORSMiddleware
from starlette.authentication import AuthenticationBackend, AuthenticationError, AuthCredentials, BaseUser, SimpleUser
from starlette.middleware.authentication import AuthenticationMiddleware
from fastapi_authz import CasbinMiddleware
from middleware import CasbinMiddleware
from db import connect_to_mongo, close_mongo_connection, get_database
from db.ckdb_utils import connect_to_ck, close_ck_connection
from db.redisdb_utils import connect_to_redis, close_redis_connection
from utils import *
from api.api_v1.api import api_router
from core.config import settings
from api.deps import get_current_user2
app = FastAPI(title=settings.PROJECT_NAME)
app.include_router(api_router, prefix=settings.API_V1_STR)
if settings.BACKEND_CORS_ORIGINS:
app.add_middleware(
CORSMiddleware,
allow_origins=['*'],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.add_event_handler("startup", connect_to_mongo)
app.add_event_handler("startup", connect_to_redis)
app.add_event_handler("startup", connect_to_ck)
app.add_event_handler("shutdown", close_mongo_connection)
app.add_event_handler("shutdown", close_redis_connection)
app.add_event_handler("shutdown", close_ck_connection)
class CurrentUser(BaseUser):
@@ -67,9 +65,23 @@ class BasicAuth(AuthenticationBackend):
app.add_middleware(CasbinMiddleware, enforcer=casbin_enforcer)
app.add_middleware(AuthenticationMiddleware, backend=BasicAuth())
from api.api_v1.api import api_router
app.include_router(api_router, prefix=settings.API_V1_STR)
app.add_middleware(
CORSMiddleware,
allow_origins=['*'],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
start_time = int(time.time()*1000)
response = await call_next(request)
process_time = int(time.time()*1000) - start_time
response.headers["X-Process-Time"] = str(process_time)
return response
if __name__ == '__main__':
uvicorn.run(app='main2:app', host="0.0.0.0", port=8899, reload=True, debug=True)
uvicorn.run(app='main:app', host="0.0.0.0", port=7889, reload=True, debug=True)
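Starlette applies the most recently registered middleware outermost, so with the registrations above a request likely passes the timing hook and CORS first, then BasicAuth authentication, then Casbin enforcement, before reaching a route; that ordering is what makes request.user available to the report endpoints and request.auth visible to the enforcer. One quick way to inspect the assembled stack (illustrative, not part of the commit):

for m in app.user_middleware:
    print(m.cls.__name__)  # outermost first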

View File

@@ -2,6 +2,7 @@ from typing import List, Tuple
import sqlalchemy as sa
from sqlalchemy.sql import func
from sqlalchemy import create_engine, column, and_, desc, table, or_
import pandas as pd
class ToSql:
@@ -12,7 +13,7 @@ class ToSql:
self.event_view = data.get('eventView')
self.events = data.get('events')
self.table = sa.table(table_name, *self.columns, schema=self.db_name)
self.table = sa.table(table_name, *self.columns.values(), schema=self.db_name)
def gen_columns(self, columns):
return {col: column(col) for col in columns}
@@ -27,7 +28,7 @@ class ToSql:
def get_group_by(self):
# return self.event_view.get('groupBy') or []
return [item['columnName'] for item in self.event_view.get('groupBy')]
return [item['column_id'] for item in self.event_view.get('groupBy')]
def get_time_particle_size(self):
return self.event_view.get('timeParticleSize') or 'day'
@@ -38,10 +39,12 @@ class ToSql:
select_exprs = self.get_group_by()
select_exprs = [self.columns.get(item) for item in select_exprs]
time_particle_size = self.get_time_particle_size()
if time_particle_size == 'day':
select_exprs.append(func.toYYYYMMDD(self.columns['#event_time']).label('date'))
start_data, end_data = self.get_date_range()
date_range = []
if time_particle_size == 'day':
select_exprs.insert(0, func.toYYYYMMDD(self.columns['#event_time']).label('date'))
date_range = pd.date_range(start_data, end_data, freq='D').strftime('%Y-%m-%d').tolist()
groupby = [item.name for item in select_exprs]
for event in self.events:
event_name = event['event_name']
@@ -69,29 +72,44 @@ class ToSql:
where.append(col > ftv[0])
elif comparator == '<':
where.append(col < ftv[0])
elif comparator == 'is not null':
where.append(col.isnot(None))
elif comparator == 'is null':
where.append(col.is_(None))
elif comparator == '!=':
where.append(col != ftv[0])
if analysis == 'total_count':
qry = sa.select(select_exprs + [func.count()])
qry = sa.select(select_exprs + [func.count().label('values')])
elif analysis == 'touch_user_count':
qry = sa.select(select_exprs + [func.count(sa.distinct(self.columns[event['#account_id']]))])
qry = sa.select(select_exprs + [func.count(sa.distinct(self.columns['#account_id'])).label('values')])
elif analysis == 'touch_user_avg':
qry = sa.select(select_exprs + [func.count(func.avg(self.columns[event['#account_id']]))])
qry = sa.select(select_exprs + [func.avg(self.columns['#account_id']).label('values')])
elif analysis == 'distinct_count':
qry = sa.select(select_exprs + [func.count(sa.distinct(self.columns[event['event_attr_id']]))])
qry = sa.select(
select_exprs + [func.count(sa.distinct(self.columns[event['event_attr_id']])).label('values')])
else:
qry = sa.select(select_exprs + [getattr(func, analysis)(self.columns[event['event_attr_id']])])
qry = sa.select(
select_exprs + [getattr(func, analysis)(self.columns[event['event_attr_id']]).label('values')])
qry = qry.where(and_(*where))
qry = qry.group_by(*select_exprs)
if time_particle_size == 'day':
qry = qry.order_by(column('date'))
qry = qry.order_by(column('date'))
qry = qry.limit(1000)
qry = qry.select_from(self.table)
sqls.append(str(qry.compile(self.engine, compile_kwargs={"literal_binds": True})))
sql = str(qry.compile(self.engine, compile_kwargs={"literal_binds": True}))
print(sql)
sqls.append({'sql': sql,
'groupby': groupby,
'date_range': date_range,
'event_name': event_name
})
return sqls
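get_sql_query_event_model now returns one dict per event instead of a bare SQL string, bundling the compiled query with the metadata the analysis endpoint needs to label and zero-fill each series. Illustrative shape of one element (sample values, not real output):

{
    'sql': "SELECT toYYYYMMDD(`#event_time`) AS date, count(*) AS values FROM ... LIMIT 1000",
    'groupby': ['date'],                        # the endpoint strips the leading 'date' and groups by the rest
    'date_range': ['2021-05-01', '2021-05-02'],
    'event_name': 'login'                       # taken from the event definition; sample value
}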

View File

@@ -1,3 +1,4 @@
import uuid
from typing import Optional, Union
from bson import ObjectId

View File

@@ -17,16 +17,17 @@ class ReportBase(BaseModel):
class ReportCreate(ReportBase):
name: str
desc: str
project_id: str
query: Json
sql: str
query: dict
cat: str
class ReportDelete(DBBase):
pass
class ReportRead(DBBase):
class ReportRead(BaseModel):
project_id: str
@@ -36,7 +37,7 @@ class ReportDB(DBBase):
name: str
user_id: str
project_id: str
# cat: Category
members: List[str] = []
pid: str
desc: str
query: dict
cat: str
create_date: datetime = datetime.now()