筛选支持 or

This commit is contained in:
wuaho 2021-09-15 17:05:53 +08:00
parent 1a0b076ee6
commit c0f80a8e7e
4 changed files with 289 additions and 102 deletions

View File

@ -27,15 +27,39 @@ router = APIRouter()
@router.post("/sql") @router.post("/sql")
async def query_sql( async def query_sql(
request: Request, request: Request,
game: str,
data_in: schemas.Sql, data_in: schemas.Sql,
ckdb: CKDrive = Depends(get_ck_db), ckdb: CKDrive = Depends(get_ck_db),
current_user: schemas.UserDB = Depends(deps.get_current_user) current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg: ) -> schemas.Msg:
"""原 sql 查询 """ """原 sql 查询 """
data = await ckdb.execute(data_in.sql) sql = data_in.sql
sql = sql.replace('$game', game)
data = await ckdb.execute(sql)
return schemas.Msg(code=0, msg='ok', data=data) return schemas.Msg(code=0, msg='ok', data=data)
@router.post("/sql_export")
async def query_sql(
request: Request,
game: str,
data_in: schemas.Sql,
ckdb: CKDrive = Depends(get_ck_db),
current_user: schemas.UserDB = Depends(deps.get_current_user)
):
"""sql 导出 """
file_name = quote(f'result.xlsx')
mime = mimetypes.guess_type(file_name)[0]
sql = data_in.sql
sql = sql.replace('$game', game)
df = await ckdb.query_dataframe(sql)
df_to_stream = DfToStream((df, '留存分析'))
with df_to_stream as d:
export = d.to_stream()
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
@router.post("/event_model_sql") @router.post("/event_model_sql")
async def event_model_sql( async def event_model_sql(
request: Request, request: Request,
@ -145,7 +169,7 @@ async def event_model(
df = await ckdb.query_dataframe(sql) df = await ckdb.query_dataframe(sql)
df.fillna(0, inplace=True) df.fillna(0, inplace=True)
if df.shape[0] == 0: if df.shape[0] == 0:
df = pd.DataFrame({'date':date_range,'values':0*len(date_range)}) df = pd.DataFrame({'date': date_range, 'values': 0 * len(date_range)})
# continue # continue
# return schemas.Msg(code=0, msg='ok', data=[q]) # return schemas.Msg(code=0, msg='ok', data=[q])
if item['time_particle'] == 'total': if item['time_particle'] == 'total':
@ -212,12 +236,15 @@ async def event_model(
res.append(q) res.append(q)
# 按总和排序 # 按总和排序
for item in res: for item in res:
try:
if item['time_particle'] in ('P1D', 'P1W'): if item['time_particle'] in ('P1D', 'P1W'):
item['date_range'] = [d.strftime('%Y-%m-%d') for d in item['date_range']] item['date_range'] = [d.strftime('%Y-%m-%d') for d in item['date_range']]
elif item['time_particle'] in ('P1M',): elif item['time_particle'] in ('P1M',):
item['date_range'] = [d.strftime('%Y-%m') for d in item['date_range']] item['date_range'] = [d.strftime('%Y-%m') for d in item['date_range']]
else: else:
item['date_range'] = [d.strftime('%Y-%m-%d %H:%M:%S') for d in item['date_range']] item['date_range'] = [d.strftime('%Y-%m-%d %H:%M:%S') for d in item['date_range']]
except:
pass
sort_key = np.argsort(np.array(item['sum']))[::-1] sort_key = np.argsort(np.array(item['sum']))[::-1]
if item.get('groups'): if item.get('groups'):
@ -708,6 +735,7 @@ async def user_property_sql(
data = analysis.property_model() data = analysis.property_model()
return schemas.Msg(code=0, msg='ok', data=[data]) return schemas.Msg(code=0, msg='ok', data=[data])
@router.post("/user_property_model_export") @router.post("/user_property_model_export")
async def user_property_model_export( async def user_property_model_export(
request: Request, request: Request,
@ -730,7 +758,6 @@ async def user_property_model_export(
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'}) return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
@router.post("/user_property_model") @router.post("/user_property_model")
async def user_property_model( async def user_property_model(
request: Request, request: Request,

View File

@ -125,6 +125,8 @@ class BehaviorAnalysis:
self.unit_num = None self.unit_num = None
self.report_name = None self.report_name = None
self.combination_event = [] self.combination_event = []
self.ext_filters = (self.data_in.ext_filter.get('filts', []), self.data_in.ext_filter.get('relation', 'and'))
self.global_relation = 'and'
async def init(self, *args, **kwargs): async def init(self, *args, **kwargs):
@ -157,11 +159,12 @@ class BehaviorAnalysis:
self.global_filters = self._get_global_filters() self.global_filters = self._get_global_filters()
self.groupby = self._get_group_by() self.groupby = self._get_group_by()
self.unit_num = self._get_unit_num() self.unit_num = self._get_unit_num()
self.global_relation = self.event_view.get('relation', 'and')
# 用户自带过滤 # 用户自带过滤
if 'data_where' in kwargs: if 'data_where' in kwargs:
self.global_filters.extend(kwargs['data_where'].get(self.game, [])) self.global_filters.extend(kwargs['data_where'].get(self.game, []))
self.global_filters.extend(self.data_in.ext_filter) # self.global_filters.extend(self.data_in.ext_filter.get('filts', []))
def _get_time_particle_size(self): def _get_time_particle_size(self):
return self.event_view.get('timeParticleSize') or 'P1D' return self.event_view.get('timeParticleSize') or 'P1D'
@ -207,23 +210,88 @@ class BehaviorAnalysis:
# self.event_tbl = sa.Table('event_view', metadata, *[sa.Column(column) for column in columns]) # self.event_tbl = sa.Table('event_view', metadata, *[sa.Column(column) for column in columns])
self.event_tbl = sa.Table('event', metadata, *[sa.Column(column) for column in columns]) self.event_tbl = sa.Table('event', metadata, *[sa.Column(column) for column in columns])
def handler_filts(self, *ext_filters, g_f=True): #
# def handler_filts(self, *ext_filters, g_f=True, relation='and'):
# user_filter = []
# event_filter = []
# # filters = (*self.global_filters, *ext_filters) if g_f else (*ext_filters,)
# filters = []
# filters.extend(ext_filters)
# if g_f:
# filters.extend(self.global_filters)
#
# # filters = [] if filters == ([],) else filters
# for item in filters:
# if item['tableType'] == 'user':
# where = user_filter
# elif item['tableType'] == 'event':
# where = event_filter
# else:
# continue
#
# tbl = getattr(self, f'{item["tableType"]}_tbl')
# col = getattr(tbl.c, item['columnName'])
#
# comparator = item['comparator']
# ftv = item['ftv']
# if comparator == '==':
# if len(ftv) > 1:
# where.append(or_(*[col == v for v in ftv]))
# else:
# where.append(col == ftv[0])
# elif comparator == '>=':
# where.append(col >= ftv[0])
# elif comparator == '<=':
# where.append(col <= ftv[0])
# elif comparator == '>':
# where.append(col > ftv[0])
# elif comparator == '<':
# where.append(col < ftv[0])
#
# elif comparator == 'is not null':
# where.append(col.isnot(None))
# elif comparator == 'is null':
# where.append(col.is_(None))
#
# elif comparator == 'like':
# where.append(col.like(f'%{ftv[0]}%'))
#
# elif comparator == 'not like':
# where.append(col.notlike(f'%{ftv[0]}%'))
#
# elif comparator == 'in':
# where.append(col.in_(ftv))
#
# elif comparator == '!=':
# where.append(col != ftv[0])
# if relation == 'and':
# return event_filter, user_filter
# else:
# return or_(*event_filter), or_(*user_filter)
def handler_filts(self, *filters):
"""
:param filters: (filts:list,relation:str)
:param g_f:
:param relation:
:return:
"""
user_filters = []
event_filters = []
for filter in filters:
filts = filter[0]
relation = filter[1]
user_filter = [] user_filter = []
event_filter = [] event_filter = []
# filters = (*self.global_filters, *ext_filters) if g_f else (*ext_filters,) for item in filts:
filters = []
filters.extend(ext_filters)
if g_f:
filters.extend(self.global_filters)
# filters = [] if filters == ([],) else filters
for item in filters:
if item['tableType'] == 'user': if item['tableType'] == 'user':
where = user_filter where = user_filter
elif item['tableType'] == 'event': elif item['tableType'] == 'event':
where = event_filter where = event_filter
else: else:
continue where = event_filter
tbl = getattr(self, f'{item["tableType"]}_tbl') tbl = getattr(self, f'{item["tableType"]}_tbl')
col = getattr(tbl.c, item['columnName']) col = getattr(tbl.c, item['columnName'])
@ -258,12 +326,20 @@ class BehaviorAnalysis:
elif comparator == 'in': elif comparator == 'in':
where.append(col.in_(ftv)) where.append(col.in_(ftv))
elif comparator == '!=': elif comparator == '!=':
where.append(col != ftv[0]) where.append(col != ftv[0])
if relation == 'and':
if event_filter:
event_filters.append(and_(*event_filter))
if user_filter:
user_filters.append(and_(*user_filter)),
else:
if event_filter:
event_filters.append(or_(*event_filter))
if user_filter:
user_filters.append(or_(*user_filter))
return event_filter, user_filter return event_filters, user_filters
def retention_model_sql(self): def retention_model_sql(self):
event_name_a = self.events[0]['eventName'] event_name_a = self.events[0]['eventName']
@ -278,7 +354,8 @@ class BehaviorAnalysis:
if visit_name: if visit_name:
who_visit = getattr(self.event_tbl.c, visit_name) who_visit = getattr(self.event_tbl.c, visit_name)
filters, _ = self.handler_filts(*self.events[0].get('filts'), g_f=False) filters, _ = self.handler_filts((self.events[0]['filts'], self.events[0].get('relation')),
self.ext_filters)
filters = filters or [1] filters = filters or [1]
selectd = [func.toStartOfDay(func.addHours(event_time_col, self.zone_time)).label('date'), selectd = [func.toStartOfDay(func.addHours(event_time_col, self.zone_time)).label('date'),
*self.groupby, *self.groupby,
@ -305,7 +382,10 @@ class BehaviorAnalysis:
func.addHours(event_time_col, self.zone_time) <= self.end_date, func.addHours(event_time_col, self.zone_time) <= self.end_date,
] ]
event_filter, user_filter = self.handler_filts() event_filter, user_filter = self.handler_filts(
(self.global_filters, self.global_relation),
self.ext_filters
)
groupby = [date_col] + self.groupby groupby = [date_col] + self.groupby
oredrby = [date_col] oredrby = [date_col]
@ -357,7 +437,10 @@ class BehaviorAnalysis:
custom = CustomEvent(self.event_tbl, formula, format).parse() custom = CustomEvent(self.event_tbl, formula, format).parse()
event_name = custom['event_name'] event_name = custom['event_name']
where = [event_name_col.in_(event_name)] where = [event_name_col.in_(event_name)]
event_filter, _ = self.handler_filts(*event['filts']) event_filter, _ = self.handler_filts((event['filts'], event.get('relation')),
(self.global_filters, self.global_relation),
self.ext_filters
)
select_exprs.extend(self.groupby) select_exprs.extend(self.groupby)
qry = sa.select( qry = sa.select(
*select_exprs, *select_exprs,
@ -384,7 +467,11 @@ class BehaviorAnalysis:
base_where.append(event_name_col == event_name) base_where.append(event_name_col == event_name)
analysis = event['analysis'] analysis = event['analysis']
event_filter, user_filter = self.handler_filts(*event['filts']) event_filter, user_filter = self.handler_filts(
(event['filts'], event.get('relation', 'and')),
(self.global_filters, self.global_relation)
, self.ext_filters
)
u_account_id_col = getattr(self.user_tbl.c, '#account_id') u_account_id_col = getattr(self.user_tbl.c, '#account_id')
# 按账号聚合 # 按账号聚合
@ -459,7 +546,8 @@ ORDER BY level
conds = [] conds = []
cond_level = [] cond_level = []
for item in self.events: for item in self.events:
event_filter, _ = self.handler_filts(*item['filts'], g_f=False) event_filter, _ = self.handler_filts((item['filts'], item.get('relation', 'and'))
, self.ext_filters)
conds.append( conds.append(
and_(event_name_col == item['eventName'], *event_filter) and_(event_name_col == item['eventName'], *event_filter)
) )
@ -469,7 +557,8 @@ ORDER BY level
func.windowFunnel_windows_gap__(event_time_col, *conds).label('level')).select_from( func.windowFunnel_windows_gap__(event_time_col, *conds).label('level')).select_from(
self.event_tbl) self.event_tbl)
g_event_filter, _ = self.handler_filts() g_event_filter, _ = self.handler_filts((self.global_filters, self.global_relation)
, self.ext_filters)
where = [ where = [
func.addHours(event_time_col, self.zone_time) >= self.start_date, func.addHours(event_time_col, self.zone_time) >= self.start_date,
func.addHours(event_time_col, self.zone_time) <= self.end_date, func.addHours(event_time_col, self.zone_time) <= self.end_date,
@ -514,7 +603,9 @@ ORDER BY level
func.addHours(event_time_col, self.zone_time) <= self.end_date, func.addHours(event_time_col, self.zone_time) <= self.end_date,
event_name_col == event_name event_name_col == event_name
] ]
event_filter, _ = self.handler_filts(*self.events[0]['filts']) event_filter, _ = self.handler_filts((event['filts'], event.get('relation', 'and')),
(self.global_filters, self.global_relation)
, self.ext_filters)
where.extend(event_filter) where.extend(event_filter)
values_col = func.count().label('values') values_col = func.count().label('values')
if analysis in ['number_of_days', 'number_of_hours']: if analysis in ['number_of_days', 'number_of_hours']:
@ -677,14 +768,18 @@ ORDER BY values desc"""
visit_name = self.events[0].get('event_attr_id') visit_name = self.events[0].get('event_attr_id')
where, _ = self.handler_filts(*self.events[0].get('filts', [])) where, _ = self.handler_filts((self.events[0]['filts'], self.events[0].get('relation', 'and')),
(self.global_filters, self.global_relation)
, self.ext_filters)
where_a = '1' where_a = '1'
if where: if where:
qry = sa.select().where(*where) qry = sa.select().where(*where)
sql = str(qry.compile(compile_kwargs={"literal_binds": True})) sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
where_a = sql.split('WHERE ')[1] where_a = sql.split('WHERE ')[1]
where, _ = self.handler_filts(*self.events[1].get('filts', [])) where, _ = self.handler_filts((self.events[1]['filts'], self.events[1].get('relation', 'and')),
(self.global_filters, self.global_relation)
, self.ext_filters)
where_b = '1' where_b = '1'
if where: if where:
qry = sa.select().where(*where) qry = sa.select().where(*where)

View File

@ -26,6 +26,8 @@ class XAnalysis:
self.global_filters = [] self.global_filters = []
self.account_filters = [] self.account_filters = []
self.global_relation = 'and'
self.ext_filters = (self.data_in.ext_filter.get('filts', []), self.data_in.ext_filter.get('relation', 'and'))
def _get_global_filters(self): def _get_global_filters(self):
return self.event_view.get('filts') or [] return self.event_view.get('filts') or []
@ -47,20 +49,35 @@ class XAnalysis:
self.event_view['endTime'] = arrow.get().shift(days=-int(e_days)).strftime('%Y-%m-%d 23:59:59') self.event_view['endTime'] = arrow.get().shift(days=-int(e_days)).strftime('%Y-%m-%d 23:59:59')
self.event_view['startTime'] = arrow.get().shift(days=-int(s_days)).strftime('%Y-%m-%d 00:00:00') self.event_view['startTime'] = arrow.get().shift(days=-int(s_days)).strftime('%Y-%m-%d 00:00:00')
else: else:
self.event_view = self.data_in.eventView self.event_view = self.data_in.eventView
self.events = self.data_in.events self.events = self.data_in.events
self.global_filters = self._get_global_filters() self.global_filters = self._get_global_filters()
self.global_relation = self.event_view.get('relation', 'and')
# 用户自带过滤 # 用户自带过滤
if 'data_where' in kwargs: if 'data_where' in kwargs:
self.account_filters = kwargs['data_where'].get(self.game, []) self.account_filters = kwargs['data_where'].get(self.game, [])
self.global_filters.extend(self.data_in.ext_filter)
def handler_filts(self, filters):
where = [] def handler_filts(self, *filters):
for item in filters: """
:param filters: (filts:list,relation:str)
:param g_f:
:param relation:
:return:
"""
event_filters = []
for filter in filters:
filts = filter[0]
relation = filter[1]
event_filter = []
for item in filts:
where = event_filter
col = sa.Column(item['columnName']) col = sa.Column(item['columnName'])
@ -85,18 +102,66 @@ class XAnalysis:
elif comparator == 'is null': elif comparator == 'is null':
where.append(col.is_(None)) where.append(col.is_(None))
elif comparator == '!=':
where.append(col != ftv[0])
elif comparator == 'not like':
where.append(col.notlike(f'%{ftv[0]}%'))
elif comparator == 'like': elif comparator == 'like':
where.append(col.like(f'%{ftv[0]}%')) where.append(col.like(f'%{ftv[0]}%'))
elif comparator == 'not like':
where.append(col.notlike(f'%{ftv[0]}%'))
elif comparator == 'in': elif comparator == 'in':
where.append(col.in_(ftv)) where.append(col.in_(ftv))
return where elif comparator == '!=':
where.append(col != ftv[0])
if relation == 'and':
if event_filter:
event_filters.append(and_(*event_filter))
else:
if event_filter:
event_filters.append(or_(*event_filter))
return event_filters
# def handler_filts(self, filters):
# where = []
# for item in filters:
#
# col = sa.Column(item['columnName'])
#
# comparator = item['comparator']
# ftv = item['ftv']
# if comparator == '==':
# if len(ftv) > 1:
# where.append(or_(*[col == v for v in ftv]))
# else:
# where.append(col == ftv[0])
# elif comparator == '>=':
# where.append(col >= ftv[0])
# elif comparator == '<=':
# where.append(col <= ftv[0])
# elif comparator == '>':
# where.append(col > ftv[0])
# elif comparator == '<':
# where.append(col < ftv[0])
#
# elif comparator == 'is not null':
# where.append(col.isnot(None))
# elif comparator == 'is null':
# where.append(col.is_(None))
#
# elif comparator == '!=':
# where.append(col != ftv[0])
# elif comparator == 'not like':
# where.append(col.notlike(f'%{ftv[0]}%'))
#
# elif comparator == 'like':
# where.append(col.like(f'%{ftv[0]}%'))
#
# elif comparator == 'in':
# where.append(col.in_(ftv))
#
# return where
def ltv_model_sql(self): def ltv_model_sql(self):
@ -123,14 +188,14 @@ class XAnalysis:
sql = str(qry.compile(compile_kwargs={"literal_binds": True})) sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
where_str = sql.split('WHERE ')[1] where_str = sql.split('WHERE ')[1]
where_order = self.handler_filts(self.global_filters) where_order = self.handler_filts((self.global_filters, self.global_relation))
where_order_str = 1 where_order_str = 1
if where_order: if where_order:
qry = sa.select().where(*where_order) qry = sa.select().where(*where_order)
sql = str(qry.compile(compile_kwargs={"literal_binds": True})) sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
where_order_str = sql.split('WHERE ')[1] where_order_str = sql.split('WHERE ')[1]
where_account = self.handler_filts(self.account_filters) where_account = self.handler_filts((self.account_filters, 'and'),self.ext_filters)
where_account_str = 1 where_account_str = 1
if where_account: if where_account:
qry = sa.select().where(*where_account) qry = sa.select().where(*where_account)

View File

@ -1,4 +1,4 @@
from typing import List, Union from typing import List, Union, Dict
from pydantic import BaseModel from pydantic import BaseModel
from typing import Optional from typing import Optional
@ -12,4 +12,4 @@ class CkQuery(BaseModel):
eventView: dict = None eventView: dict = None
events: Union[List[dict], dict] = None events: Union[List[dict], dict] = None
report_id: str = None report_id: str = None
ext_filter: list = [] ext_filter: dict = dict()