diff --git a/api/api_v1/endpoints/query.py b/api/api_v1/endpoints/query.py index c023568..6833187 100644 --- a/api/api_v1/endpoints/query.py +++ b/api/api_v1/endpoints/query.py @@ -27,15 +27,39 @@ router = APIRouter() @router.post("/sql") async def query_sql( request: Request, + game: str, data_in: schemas.Sql, ckdb: CKDrive = Depends(get_ck_db), current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> schemas.Msg: """原 sql 查询 """ - data = await ckdb.execute(data_in.sql) + sql = data_in.sql + sql = sql.replace('$game', game) + data = await ckdb.execute(sql) return schemas.Msg(code=0, msg='ok', data=data) +@router.post("/sql_export") +async def query_sql( + request: Request, + game: str, + data_in: schemas.Sql, + ckdb: CKDrive = Depends(get_ck_db), + current_user: schemas.UserDB = Depends(deps.get_current_user) +): + """sql 导出 """ + file_name = quote(f'result.xlsx') + mime = mimetypes.guess_type(file_name)[0] + + sql = data_in.sql + sql = sql.replace('$game', game) + df = await ckdb.query_dataframe(sql) + df_to_stream = DfToStream((df, '留存分析')) + with df_to_stream as d: + export = d.to_stream() + return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'}) + + @router.post("/event_model_sql") async def event_model_sql( request: Request, @@ -145,7 +169,7 @@ async def event_model( df = await ckdb.query_dataframe(sql) df.fillna(0, inplace=True) if df.shape[0] == 0: - df = pd.DataFrame({'date':date_range,'values':0*len(date_range)}) + df = pd.DataFrame({'date': date_range, 'values': 0 * len(date_range)}) # continue # return schemas.Msg(code=0, msg='ok', data=[q]) if item['time_particle'] == 'total': @@ -212,12 +236,15 @@ async def event_model( res.append(q) # 按总和排序 for item in res: - if item['time_particle'] in ('P1D', 'P1W'): - item['date_range'] = [d.strftime('%Y-%m-%d') for d in item['date_range']] - elif item['time_particle'] in ('P1M',): - item['date_range'] = [d.strftime('%Y-%m') for d in item['date_range']] - else: - item['date_range'] = [d.strftime('%Y-%m-%d %H:%M:%S') for d in item['date_range']] + try: + if item['time_particle'] in ('P1D', 'P1W'): + item['date_range'] = [d.strftime('%Y-%m-%d') for d in item['date_range']] + elif item['time_particle'] in ('P1M',): + item['date_range'] = [d.strftime('%Y-%m') for d in item['date_range']] + else: + item['date_range'] = [d.strftime('%Y-%m-%d %H:%M:%S') for d in item['date_range']] + except: + pass sort_key = np.argsort(np.array(item['sum']))[::-1] if item.get('groups'): @@ -708,6 +735,7 @@ async def user_property_sql( data = analysis.property_model() return schemas.Msg(code=0, msg='ok', data=[data]) + @router.post("/user_property_model_export") async def user_property_model_export( request: Request, @@ -730,7 +758,6 @@ async def user_property_model_export( return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'}) - @router.post("/user_property_model") async def user_property_model( request: Request, diff --git a/models/behavior_analysis.py b/models/behavior_analysis.py index 0af4435..2a297cd 100644 --- a/models/behavior_analysis.py +++ b/models/behavior_analysis.py @@ -125,6 +125,8 @@ class BehaviorAnalysis: self.unit_num = None self.report_name = None self.combination_event = [] + self.ext_filters = (self.data_in.ext_filter.get('filts', []), self.data_in.ext_filter.get('relation', 'and')) + self.global_relation = 'and' async def init(self, *args, **kwargs): @@ -157,11 +159,12 @@ class BehaviorAnalysis: self.global_filters = self._get_global_filters() self.groupby = self._get_group_by() self.unit_num = self._get_unit_num() + self.global_relation = self.event_view.get('relation', 'and') # 用户自带过滤 if 'data_where' in kwargs: self.global_filters.extend(kwargs['data_where'].get(self.game, [])) - self.global_filters.extend(self.data_in.ext_filter) + # self.global_filters.extend(self.data_in.ext_filter.get('filts', [])) def _get_time_particle_size(self): return self.event_view.get('timeParticleSize') or 'P1D' @@ -207,63 +210,136 @@ class BehaviorAnalysis: # self.event_tbl = sa.Table('event_view', metadata, *[sa.Column(column) for column in columns]) self.event_tbl = sa.Table('event', metadata, *[sa.Column(column) for column in columns]) - def handler_filts(self, *ext_filters, g_f=True): - user_filter = [] - event_filter = [] - # filters = (*self.global_filters, *ext_filters) if g_f else (*ext_filters,) - filters = [] - filters.extend(ext_filters) - if g_f: - filters.extend(self.global_filters) + # + # def handler_filts(self, *ext_filters, g_f=True, relation='and'): + # user_filter = [] + # event_filter = [] + # # filters = (*self.global_filters, *ext_filters) if g_f else (*ext_filters,) + # filters = [] + # filters.extend(ext_filters) + # if g_f: + # filters.extend(self.global_filters) + # + # # filters = [] if filters == ([],) else filters + # for item in filters: + # if item['tableType'] == 'user': + # where = user_filter + # elif item['tableType'] == 'event': + # where = event_filter + # else: + # continue + # + # tbl = getattr(self, f'{item["tableType"]}_tbl') + # col = getattr(tbl.c, item['columnName']) + # + # comparator = item['comparator'] + # ftv = item['ftv'] + # if comparator == '==': + # if len(ftv) > 1: + # where.append(or_(*[col == v for v in ftv])) + # else: + # where.append(col == ftv[0]) + # elif comparator == '>=': + # where.append(col >= ftv[0]) + # elif comparator == '<=': + # where.append(col <= ftv[0]) + # elif comparator == '>': + # where.append(col > ftv[0]) + # elif comparator == '<': + # where.append(col < ftv[0]) + # + # elif comparator == 'is not null': + # where.append(col.isnot(None)) + # elif comparator == 'is null': + # where.append(col.is_(None)) + # + # elif comparator == 'like': + # where.append(col.like(f'%{ftv[0]}%')) + # + # elif comparator == 'not like': + # where.append(col.notlike(f'%{ftv[0]}%')) + # + # elif comparator == 'in': + # where.append(col.in_(ftv)) + # + # elif comparator == '!=': + # where.append(col != ftv[0]) + # if relation == 'and': + # return event_filter, user_filter + # else: + # return or_(*event_filter), or_(*user_filter) - # filters = [] if filters == ([],) else filters - for item in filters: - if item['tableType'] == 'user': - where = user_filter - elif item['tableType'] == 'event': - where = event_filter - else: - continue + def handler_filts(self, *filters): + """ - tbl = getattr(self, f'{item["tableType"]}_tbl') - col = getattr(tbl.c, item['columnName']) + :param filters: (filts:list,relation:str) + :param g_f: + :param relation: + :return: + """ - comparator = item['comparator'] - ftv = item['ftv'] - if comparator == '==': - if len(ftv) > 1: - where.append(or_(*[col == v for v in ftv])) + user_filters = [] + event_filters = [] + for filter in filters: + filts = filter[0] + relation = filter[1] + user_filter = [] + event_filter = [] + for item in filts: + if item['tableType'] == 'user': + where = user_filter + elif item['tableType'] == 'event': + where = event_filter else: - where.append(col == ftv[0]) - elif comparator == '>=': - where.append(col >= ftv[0]) - elif comparator == '<=': - where.append(col <= ftv[0]) - elif comparator == '>': - where.append(col > ftv[0]) - elif comparator == '<': - where.append(col < ftv[0]) + where = event_filter - elif comparator == 'is not null': - where.append(col.isnot(None)) - elif comparator == 'is null': - where.append(col.is_(None)) + tbl = getattr(self, f'{item["tableType"]}_tbl') + col = getattr(tbl.c, item['columnName']) - elif comparator == 'like': - where.append(col.like(f'%{ftv[0]}%')) + comparator = item['comparator'] + ftv = item['ftv'] + if comparator == '==': + if len(ftv) > 1: + where.append(or_(*[col == v for v in ftv])) + else: + where.append(col == ftv[0]) + elif comparator == '>=': + where.append(col >= ftv[0]) + elif comparator == '<=': + where.append(col <= ftv[0]) + elif comparator == '>': + where.append(col > ftv[0]) + elif comparator == '<': + where.append(col < ftv[0]) - elif comparator == 'not like': - where.append(col.notlike(f'%{ftv[0]}%')) + elif comparator == 'is not null': + where.append(col.isnot(None)) + elif comparator == 'is null': + where.append(col.is_(None)) - elif comparator == 'in': - where.append(col.in_(ftv)) + elif comparator == 'like': + where.append(col.like(f'%{ftv[0]}%')) + elif comparator == 'not like': + where.append(col.notlike(f'%{ftv[0]}%')) + elif comparator == 'in': + where.append(col.in_(ftv)) - elif comparator == '!=': - where.append(col != ftv[0]) + elif comparator == '!=': + where.append(col != ftv[0]) + if relation == 'and': + if event_filter: + event_filters.append(and_(*event_filter)) + if user_filter: + user_filters.append(and_(*user_filter)), + else: + if event_filter: + event_filters.append(or_(*event_filter)) + if user_filter: + user_filters.append(or_(*user_filter)) - return event_filter, user_filter + return event_filters, user_filters def retention_model_sql(self): event_name_a = self.events[0]['eventName'] @@ -278,7 +354,8 @@ class BehaviorAnalysis: if visit_name: who_visit = getattr(self.event_tbl.c, visit_name) - filters, _ = self.handler_filts(*self.events[0].get('filts'), g_f=False) + filters, _ = self.handler_filts((self.events[0]['filts'], self.events[0].get('relation')), + self.ext_filters) filters = filters or [1] selectd = [func.toStartOfDay(func.addHours(event_time_col, self.zone_time)).label('date'), *self.groupby, @@ -305,7 +382,10 @@ class BehaviorAnalysis: func.addHours(event_time_col, self.zone_time) <= self.end_date, ] - event_filter, user_filter = self.handler_filts() + event_filter, user_filter = self.handler_filts( + (self.global_filters, self.global_relation), + self.ext_filters + ) groupby = [date_col] + self.groupby oredrby = [date_col] @@ -357,7 +437,10 @@ class BehaviorAnalysis: custom = CustomEvent(self.event_tbl, formula, format).parse() event_name = custom['event_name'] where = [event_name_col.in_(event_name)] - event_filter, _ = self.handler_filts(*event['filts']) + event_filter, _ = self.handler_filts((event['filts'], event.get('relation')), + (self.global_filters, self.global_relation), + self.ext_filters + ) select_exprs.extend(self.groupby) qry = sa.select( *select_exprs, @@ -384,7 +467,11 @@ class BehaviorAnalysis: base_where.append(event_name_col == event_name) analysis = event['analysis'] - event_filter, user_filter = self.handler_filts(*event['filts']) + event_filter, user_filter = self.handler_filts( + (event['filts'], event.get('relation', 'and')), + (self.global_filters, self.global_relation) + , self.ext_filters + ) u_account_id_col = getattr(self.user_tbl.c, '#account_id') # 按账号聚合 @@ -459,7 +546,8 @@ ORDER BY level conds = [] cond_level = [] for item in self.events: - event_filter, _ = self.handler_filts(*item['filts'], g_f=False) + event_filter, _ = self.handler_filts((item['filts'], item.get('relation', 'and')) + , self.ext_filters) conds.append( and_(event_name_col == item['eventName'], *event_filter) ) @@ -469,7 +557,8 @@ ORDER BY level func.windowFunnel_windows_gap__(event_time_col, *conds).label('level')).select_from( self.event_tbl) - g_event_filter, _ = self.handler_filts() + g_event_filter, _ = self.handler_filts((self.global_filters, self.global_relation) + , self.ext_filters) where = [ func.addHours(event_time_col, self.zone_time) >= self.start_date, func.addHours(event_time_col, self.zone_time) <= self.end_date, @@ -514,7 +603,9 @@ ORDER BY level func.addHours(event_time_col, self.zone_time) <= self.end_date, event_name_col == event_name ] - event_filter, _ = self.handler_filts(*self.events[0]['filts']) + event_filter, _ = self.handler_filts((event['filts'], event.get('relation', 'and')), + (self.global_filters, self.global_relation) + , self.ext_filters) where.extend(event_filter) values_col = func.count().label('values') if analysis in ['number_of_days', 'number_of_hours']: @@ -677,14 +768,18 @@ ORDER BY values desc""" visit_name = self.events[0].get('event_attr_id') - where, _ = self.handler_filts(*self.events[0].get('filts', [])) + where, _ = self.handler_filts((self.events[0]['filts'], self.events[0].get('relation', 'and')), + (self.global_filters, self.global_relation) + , self.ext_filters) where_a = '1' if where: qry = sa.select().where(*where) sql = str(qry.compile(compile_kwargs={"literal_binds": True})) where_a = sql.split('WHERE ')[1] - where, _ = self.handler_filts(*self.events[1].get('filts', [])) + where, _ = self.handler_filts((self.events[1]['filts'], self.events[1].get('relation', 'and')), + (self.global_filters, self.global_relation) + , self.ext_filters) where_b = '1' if where: qry = sa.select().where(*where) diff --git a/models/x_analysis.py b/models/x_analysis.py index a7272dd..8713757 100644 --- a/models/x_analysis.py +++ b/models/x_analysis.py @@ -26,6 +26,8 @@ class XAnalysis: self.global_filters = [] self.account_filters = [] + self.global_relation = 'and' + self.ext_filters = (self.data_in.ext_filter.get('filts', []), self.data_in.ext_filter.get('relation', 'and')) def _get_global_filters(self): return self.event_view.get('filts') or [] @@ -47,56 +49,119 @@ class XAnalysis: self.event_view['endTime'] = arrow.get().shift(days=-int(e_days)).strftime('%Y-%m-%d 23:59:59') self.event_view['startTime'] = arrow.get().shift(days=-int(s_days)).strftime('%Y-%m-%d 00:00:00') - else: self.event_view = self.data_in.eventView self.events = self.data_in.events self.global_filters = self._get_global_filters() + self.global_relation = self.event_view.get('relation', 'and') + # 用户自带过滤 if 'data_where' in kwargs: self.account_filters = kwargs['data_where'].get(self.game, []) - self.global_filters.extend(self.data_in.ext_filter) - def handler_filts(self, filters): - where = [] - for item in filters: - col = sa.Column(item['columnName']) - comparator = item['comparator'] - ftv = item['ftv'] - if comparator == '==': - if len(ftv) > 1: - where.append(or_(*[col == v for v in ftv])) - else: - where.append(col == ftv[0]) - elif comparator == '>=': - where.append(col >= ftv[0]) - elif comparator == '<=': - where.append(col <= ftv[0]) - elif comparator == '>': - where.append(col > ftv[0]) - elif comparator == '<': - where.append(col < ftv[0]) + def handler_filts(self, *filters): + """ + :param filters: (filts:list,relation:str) + :param g_f: + :param relation: + :return: + """ - elif comparator == 'is not null': - where.append(col.isnot(None)) - elif comparator == 'is null': - where.append(col.is_(None)) + event_filters = [] + for filter in filters: + filts = filter[0] + relation = filter[1] + event_filter = [] + for item in filts: - elif comparator == '!=': - where.append(col != ftv[0]) - elif comparator == 'not like': - where.append(col.notlike(f'%{ftv[0]}%')) + where = event_filter - elif comparator == 'like': - where.append(col.like(f'%{ftv[0]}%')) + col = sa.Column(item['columnName']) - elif comparator == 'in': - where.append(col.in_(ftv)) + comparator = item['comparator'] + ftv = item['ftv'] + if comparator == '==': + if len(ftv) > 1: + where.append(or_(*[col == v for v in ftv])) + else: + where.append(col == ftv[0]) + elif comparator == '>=': + where.append(col >= ftv[0]) + elif comparator == '<=': + where.append(col <= ftv[0]) + elif comparator == '>': + where.append(col > ftv[0]) + elif comparator == '<': + where.append(col < ftv[0]) - return where + elif comparator == 'is not null': + where.append(col.isnot(None)) + elif comparator == 'is null': + where.append(col.is_(None)) + + elif comparator == 'like': + where.append(col.like(f'%{ftv[0]}%')) + + elif comparator == 'not like': + where.append(col.notlike(f'%{ftv[0]}%')) + + elif comparator == 'in': + where.append(col.in_(ftv)) + + elif comparator == '!=': + where.append(col != ftv[0]) + if relation == 'and': + if event_filter: + event_filters.append(and_(*event_filter)) + else: + if event_filter: + event_filters.append(or_(*event_filter)) + + + return event_filters + + # def handler_filts(self, filters): + # where = [] + # for item in filters: + # + # col = sa.Column(item['columnName']) + # + # comparator = item['comparator'] + # ftv = item['ftv'] + # if comparator == '==': + # if len(ftv) > 1: + # where.append(or_(*[col == v for v in ftv])) + # else: + # where.append(col == ftv[0]) + # elif comparator == '>=': + # where.append(col >= ftv[0]) + # elif comparator == '<=': + # where.append(col <= ftv[0]) + # elif comparator == '>': + # where.append(col > ftv[0]) + # elif comparator == '<': + # where.append(col < ftv[0]) + # + # elif comparator == 'is not null': + # where.append(col.isnot(None)) + # elif comparator == 'is null': + # where.append(col.is_(None)) + # + # elif comparator == '!=': + # where.append(col != ftv[0]) + # elif comparator == 'not like': + # where.append(col.notlike(f'%{ftv[0]}%')) + # + # elif comparator == 'like': + # where.append(col.like(f'%{ftv[0]}%')) + # + # elif comparator == 'in': + # where.append(col.in_(ftv)) + # + # return where def ltv_model_sql(self): @@ -123,14 +188,14 @@ class XAnalysis: sql = str(qry.compile(compile_kwargs={"literal_binds": True})) where_str = sql.split('WHERE ')[1] - where_order = self.handler_filts(self.global_filters) + where_order = self.handler_filts((self.global_filters, self.global_relation)) where_order_str = 1 if where_order: qry = sa.select().where(*where_order) sql = str(qry.compile(compile_kwargs={"literal_binds": True})) where_order_str = sql.split('WHERE ')[1] - where_account = self.handler_filts(self.account_filters) + where_account = self.handler_filts((self.account_filters, 'and'),self.ext_filters) where_account_str = 1 if where_account: qry = sa.select().where(*where_account) diff --git a/schemas/sql.py b/schemas/sql.py index f82244c..070e5e4 100644 --- a/schemas/sql.py +++ b/schemas/sql.py @@ -1,4 +1,4 @@ -from typing import List, Union +from typing import List, Union, Dict from pydantic import BaseModel from typing import Optional @@ -12,4 +12,4 @@ class CkQuery(BaseModel): eventView: dict = None events: Union[List[dict], dict] = None report_id: str = None - ext_filter: list = [] + ext_filter: dict = dict()