From 49c7668169d7924412654d4862651392c3b855ac Mon Sep 17 00:00:00 2001
From: 李宗振
Date: Thu, 7 Jul 2022 15:56:41 +0800
Subject: [PATCH] Label activation logic
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 api/api_v1/endpoints/query.py  | 196 ++++++++++++++++++++-------------
 api/api_v1/endpoints/xquery.py |  53 ++++-----
 models/behavior_analysis.py    | 166 ++++++++++++++++++++++++----
 models/x_analysis.py           | 118 +++++++++++++++++---
 4 files changed, 393 insertions(+), 140 deletions(-)

diff --git a/api/api_v1/endpoints/query.py b/api/api_v1/endpoints/query.py
index 1c51414..f3febbd 100644
--- a/api/api_v1/endpoints/query.py
+++ b/api/api_v1/endpoints/query.py
@@ -616,16 +616,29 @@
             tmp['n_outflow'].append(v[f'on{i}'])
     tmp = summary_values['均值']
     retention_avg_dict = {}
+    group_label = res['group_label']
+
     # with multiple group-by items, merge the key into a list for the response
-    if len(groupby)>1:
+    if not group_label:
+        if len(groupby) > 1:
+            summary_valuess = {}
+            for k, v in summary_values.items():
+                if 'str' in str(type(k)):
+                    summary_valuess[str([k])] = v
+                else:
+                    summary_valuess[str(list(k))] = v
+
+        else:
+            summary_valuess = summary_values
+    # a label group-by item is present
+    else:
         summary_valuess = {}
         for k, v in summary_values.items():
-            if 'str' in str(type(k)):
-                summary_valuess[str([k])] = v
-            else:
-                summary_valuess[str(list(k))] = v
-    else:
-        summary_valuess=summary_values
+            key = list(k)
+            # splice each label name into its recorded position in the key
+            for name, index in group_label.items():
+                key.insert(index, name)
+            summary_valuess[str(key)] = v
 
     for rn in retention_n:
         for rt, rd in df.T.items():
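Note: the retention branch above splices each label name into the composite group key at the position recorded while the group-by items were parsed (group_label maps label name to index, as built in BehaviorAnalysis). A minimal standalone sketch of that pattern, with illustrative data:

    def splice_labels(key, group_label):
        # normalise the pandas group key: one column yields a scalar,
        # several columns yield a tuple
        key = [key] if isinstance(key, str) else list(key)
        for name, index in group_label.items():
            key.insert(index, name)
        return str(key)

    # '付费用户' stands in for a user-label column removed from the SQL group-by
    assert splice_labels(('android', 'cn'), {'付费用户': 1}) == "['android', '付费用户', 'cn']"
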
@@ -1436,26 +1449,29 @@
     event_type = analysis.events[0]['eventName']
     where = analysis.events[-1]['quotaname']
     sql = res['sql']
-    columnName = analysis.event_view['groupBy'][-1]['columnName']
-    if analysis.event_view['groupBy'] != []:
-
-        if columnName != '':
-            # group by day
-            sql = sql.replace(f'toDate(addHours({game}.event."#event_time", 8)) AS date', f'`{columnName}` as va',
-                              1)
-            sql = sql.replace(f'toDate(addHours({game}.event."#event_time", 8))', f'`{columnName}`', 1)
-            # group by week
-            sql = sql.replace(f'toStartOfWeek(addHours({game}.event."#event_time", 8)) AS date', f'`{columnName}` as va',
-                              1)
-            sql = sql.replace(f'toStartOfWeek(addHours({game}.event."#event_time", 8))', f'`{columnName}`', 1)
-            # group by month
-            sql = sql.replace(f'toStartOfMonth(addHours({game}.event."#event_time", 8)) AS date', f'`{columnName}` as va',
-                              1)
-            sql = sql.replace(f'toStartOfMonth(addHours({game}.event."#event_time", 8))', f'`{columnName}`', 1)
-            # total
-            if analysis.event_view.get('timeParticleSize') == "total":
-                sql = sql.replace(f'SELECT', f'SELECT `{columnName}` as va,', 1)
-            print(sql)
+    group_by = analysis.event_view['groupBy']
+    # exclude label group-by items
+    true_group = [i for i in group_by if i['data_type'] != "user_label"]
+    columnName = true_group[-1]['columnName']
+    if true_group != []:
+        # if columnName != '':
+        #     # group by day
+        #     sql = sql.replace(f'toDate(addHours({game}.event."#event_time", 8)) AS date', f'`{columnName}` as va',
+        #                       1)
+        #     sql = sql.replace(f'toDate(addHours({game}.event."#event_time", 8))', columnName, 1)
+        #     # group by week
+        #     sql = sql.replace(f'toStartOfWeek(addHours({game}.event."#event_time", 8)) AS date',
+        #                       f'`{columnName}` as va',
+        #                       1)
+        #     sql = sql.replace(f'toStartOfWeek(addHours({game}.event."#event_time", 8))', columnName, 1)
+        #     # group by month
+        #     sql = sql.replace(f'toStartOfMonth(addHours({game}.event."#event_time", 8)) AS date',
+        #                       f'`{columnName}` as va',
+        #                       1)
+        #     sql = sql.replace(f'toStartOfMonth(addHours({game}.event."#event_time", 8))', columnName, 1)
+        #     # total
+        #     if analysis.event_view.get('timeParticleSize') == "total":
+        #         sql = sql.replace(f'SELECT', f'SELECT {columnName} as va,', 1)
     df = await ckdb.query_dataframe(sql)
     if df.empty:
         return schemas.Msg(code=-9, msg='无数据', data=None)
@@ -1472,6 +1488,8 @@
     interval_type = res['interval_type']
     analysi = res['analysis']
     groupby = res['groupby']
+    true_df = df.groupby(groupby).sum()
+    group_label = res['group_label']
     quota_interval_arr = res['quota_interval_arr']
     # compatibility for "total"
     # if res['time_particle'] == 'total':
@@ -1480,20 +1498,25 @@
 
     if analysi != 'number_of_days' and interval_type != 'discrete':
         # default interval
-        max_v = int(df['values'].max())
-        min_v = int(df['values'].min())
+        max_v = int(true_df['values'].max())
+        min_v = int(true_df['values'].min())
         interval = (max_v - min_v) // 10 or 1
         resp = {'list': dict(),
                 'start_date': res['start_date'],
                 'end_date': res['end_date'],
                 'time_particle': res['time_particle'],
-                'biaotou': columnName
+                'biaotou': groupby
                 }
-        if 'float' in str(df.dtypes['va']):
-            df['va'] = df['va'].astype(int)
-        if 'list' in str(type(df['va'][0])):
-            f = lambda x: x[0]
-            df['va'] = df['va'].map(f)
+
+        # if 'float' in str(df.dtypes['va']):
+        #     df['va'] = df['va'].astype(int)
+        # for index, gi in enumerate(groupby):
+        #     resp['list'][str(index)] = dict()
+        #     if 'float' in str(df.dtypes[gi]):
+        #         df[gi] = df[gi].astype(int)
+        #     if 'list' in str(type(df[gi][0])):
+        #         f = lambda x: x[0]
+        #         df[gi] = df[gi].map(f)
         if not quota_interval_arr:
             resp['label'] = [f'[{i},{i + interval})' for i in range(min_v, max_v, interval)]
             bins = [i for i in range(min_v, max_v + interval, interval)]
@@ -1510,12 +1533,19 @@
         # f = lambda x: x[0]
         # df['va'] = df['va'].map(f)
         # grouped case
-        for key, tmp_df in df.groupby('va'):
+        for key, tmp_df in true_df.groupby(groupby):
             bins_s = pd.cut(tmp_df['values'], bins=bins,
                             right=True,
                             include_lowest=True).value_counts()
             bins_s.sort_index(inplace=True)
             total = int(bins_s.sum())
+            if group_label:
+                if isinstance(key, str):
+                    key = [key]
+                key = list(key)
+                for name, idx in group_label.items():
+                    key.insert(idx, name)
+                key = str(key)
             if res['time_particle'] == 'total111':
                 resp['list']['合计'] = dict()
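Note: the continuous branch buckets each group's values with pandas.cut and turns the bucket counts into percentage strings. A self-contained sketch of the same binning, with made-up numbers:

    import pandas as pd

    values = pd.Series([1, 3, 4, 7, 9, 10])
    min_v, max_v = int(values.min()), int(values.max())
    interval = (max_v - min_v) // 10 or 1            # same fallback as above
    bins = list(range(min_v, max_v + interval, interval))

    bins_s = pd.cut(values, bins=bins, right=True,
                    include_lowest=True).value_counts()
    bins_s.sort_index(inplace=True)
    total = int(bins_s.sum())
    p = [f'{round(n * 100 / total, 2)}%' for n in bins_s]   # e.g. '16.67%'
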
@@ -1528,32 +1558,36 @@
             if str(p[i]) == 'nan':
                 p[i] = 0
             # map to the corresponding tracked-event metadata
-            re = await crud.select_map.get_list(db, game)
-            re_list = [i['attr_name'] for i in re]
-            if columnName in re_list:
-                for i in re:
-                    if columnName == i['attr_name']:
-                        for datas in i['map_']:
-                            if key == datas['id']:
-                                key = datas['title']
-                                break
-                        break
-            if 'time' not in columnName:
-                resp['list'][key] = dict()
-                resp['list'][key] = {'n': bins_s.to_list(), 'total': total,
-                                     'p': [str(i) + '%' for i in p],
-                                     'title': '总体'}
-            else:
-                resp['list'][key.strftime('%Y-%m-%d %H:%M:%S')] = dict()
-                resp['list'][key.strftime('%Y-%m-%d %H:%M:%S')] = {'n': bins_s.to_list(), 'total': total,
-                                                                   'p': [str(i) + '%' for i in p],
-                                                                   'title': '总体'}
+            # re = await crud.select_map.get_list(db, game)
+            # re_list = [i['attr_name'] for i in re]
+            # if gi in re_list:
+            #     for i in re:
+            #         if gi == i['attr_name']:
+            #             for datas in i['map_']:
+            #                 if key == datas['id']:
+            #                     key = datas['title']
+            #                     break
+            #             break
+            # if 'time' not in groupby:
+            resp['list'][str(key)] = dict()
+            resp['list'][str(key)] = {'n': bins_s.to_list(), 'total': total,
+                                      'p': [str(i) + '%' for i in p],
+                                      'title': '总体'}
+            # else:
+            #     resp['list'][key.strftime('%Y-%m-%d %H:%M:%S')] = dict()
+            #     resp['list'][key.strftime('%Y-%m-%d %H:%M:%S')] = {'n': bins_s.to_list(), 'total': total,
+            #                                                        'p': [str(i) + '%' for i in p],
+            #                                                        'title': '总体'}
         # compatibility with the download feature
         download = analysis.event_view.get('download', '')
         if download == 1:
             create_df = create_neidf(resp, columnName)
             Download = Download_xlsx(create_df, '分布分析')
             return Download
+
+        if group_label:
+            for name, idx in group_label.items():
+                resp['biaotou'].insert(idx, name)
         return schemas.Msg(code=0, msg='ok', data=resp)
     else:
         # discrete numbers
@@ -1561,15 +1595,17 @@
         resp = {'list': {},
                 'start_date': res['start_date'],
                 'end_date': res['end_date'],
                 'time_particle': res['time_particle'],
-                'biaotou': columnName
+                'biaotou': groupby
                 }
-        labels = [str(i) for i in sorted(df['values'].unique())]
+        labels = [str(i) for i in sorted(true_df['values'].unique())]
         resp['label'] = labels
-        if 'list' in str(type(df['va'][0])):
-            f = lambda x: x[0]
-            df['va'] = df['va'].map(f)
+        # for index, gi in enumerate(groupby):
+        #     resp['list'][str(index)] = dict()
+        #     if 'list' in str(type(df[gi][0])):
+        #         f = lambda x: x[0]
+        #         df[gi] = df[gi].map(f)
         shaixuan = analysis.events[0].get('analysis')
-        for key, tmp_df in df.groupby(['va']):
+        for key, tmp_df in true_df.groupby(groupby):
             if shaixuan == 'uniqExact':
                 total = len(set(tmp_df['uid']))
             else:
@@ -1578,20 +1614,28 @@
                 dt = '合计'
             else:
                 # map to the corresponding tracked-event metadata
-                re = await crud.select_map.get_list(db, game)
-                re_list = [i['attr_name'] for i in re]
-                if columnName in re_list:
-                    for i in re:
-                        if columnName == i['attr_name']:
-                            for datas in i['map_']:
-                                if key == datas['id']:
-                                    key = datas['title']
-                                    break
-                            break
+                # re = await crud.select_map.get_list(db, game)
+                # re_list = [i['attr_name'] for i in re]
+                # if gi in re_list:
+                #     for i in re:
+                #         if gi == i['attr_name']:
+                #             for datas in i['map_']:
+                #                 if key == datas['id']:
+                #                     key = datas['title']
+                #                     break
+                #             break
                 dt = key
                 # dt = key.strftime('%Y-%m-%d')
                 # dt='合计'
+            # a label group-by exists
+            if group_label:
+                if isinstance(dt, str):
+                    dt = [dt]
+                dt = list(dt)
+                for name, idx in group_label.items():
+                    dt.insert(idx, name)
+                dt = str(dt)
             labels_dict = {}
             for key2, tmp_df2 in tmp_df.groupby('values'):
                 label = str(key2)
@@ -1618,7 +1662,7 @@
                     number_str = str(number_int) + '%'
                     list_p.append(number_str)
 
-                resp['list'][dt] = {'n': [labels_dict01.get(i, 0) for i in labels], 'total': total,
+                resp['list'][str(dt)] = {'n': [labels_dict01.get(i, 0) for i in labels], 'total': total,
                                     'p': list_p}
             else:
                 list_p = []
@@ -1626,7 +1670,10 @@
                     number_int = round(labels_dict.get(i, 0) * 100 / total, 2)
                     number_str = str(number_int) + '%'
                     list_p.append(number_str)
-                resp['list'][dt] = {'n': [labels_dict.get(i, 0) for i in labels], 'total': total,
+                resp['list'][str(dt)] = {'n': [labels_dict.get(i, 0) for i in labels], 'total': total,
                                     'p': list_p}
         # resp['list'][dt] = {'总体': {'n': [labels_dict.get(i, 0) for i in labels], 'total': total,
         #                            'p': [round(labels_dict.get(i, 0) * 100 / total, 2) for i in labels]}}
@@ -1641,6 +1685,10 @@
             create_df = create_neidf(resp, columnName)
             Download = Download_xlsx(create_df, '分布分析')
             return Download
+
+        if group_label:
+            for name, idx in group_label.items():
+                resp['biaotou'].insert(idx, name)
         return schemas.Msg(code=0, msg='ok', data=resp)
     else:
         return schemas.Msg(code=-9, msg='没有添加分组项', data='')
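Note: with the key coming from true_df.groupby(groupby), a single group column can hand back a bare scalar while several columns give a tuple, which is why both branches normalise key/dt before splicing labels in. A small illustration (data made up; the exact key shape also depends on the installed pandas version):

    import pandas as pd

    df = pd.DataFrame({'os': ['ios', 'ios', 'android'], 'values': [1, 2, 2]})
    for key, tmp_df in df.groupby(['os']):
        if isinstance(key, str):   # older pandas yields 'ios', newer ('ios',)
            key = [key]
        key = list(key)
        print(str(key), len(tmp_df))
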
diff --git a/api/api_v1/endpoints/xquery.py b/api/api_v1/endpoints/xquery.py
index 44e84f0..67d6a48 100644
--- a/api/api_v1/endpoints/xquery.py
+++ b/api/api_v1/endpoints/xquery.py
@@ -51,34 +51,34 @@ async def ltv_model_sql(
 ) -> schemas.Msg:
     """ LTV model sql """
     await analysis.init(data_where=current_user.data_where)
-    res = analysis.ltv_model_sql()
-    sql = res['sql']
+    res = await analysis.ltv_model_sql()
+    sql = res['sql'].replace('\n', '').replace('[', '').replace(']', '')
     # a single filter meant stripping the GM filter and fetching everything
-    if len(analysis.global_filters)==1 and analysis.global_filters[0]['strftv']=='GM':
-        try:
-            df = await ckdb.query_dataframe(sql)
-        except Exception as e:
-            return schemas.Msg(code=-9, msg='报表配置参数异常')
+    # if len(analysis.global_filters)==1 and analysis.global_filters[0]['strftv']=='GM':
+    try:
+        df = await ckdb.query_dataframe(sql)
+    except Exception as e:
+        return schemas.Msg(code=-9, msg='报表配置参数异常')
     # multiple filters used to be spliced into a new sql by hand
-    else:
-        new_sql=""""""
-        # split the sql
-        split_sql = sql.split('AND 1')
-        # walk every filter condition
-        for i in analysis.global_filters:
-            # drop GM
-            if i['strftv'] != 'GM':
-                # comparator of the filter
-                bijiao=get_bijiao(i["comparator"])
-                # value of the filter
-                condition=tuple(i['ftv'])
-                # event/column name
-                columnName=i['columnName']
-                dd = f""" AND {game}.event.{columnName} {bijiao} {condition}"""
-                new_sql+=dd
-        split_="""AND 1 """
-        news_sql = split_sql[0] + split_+new_sql + split_sql[1] + split_+new_sql+ split_sql[2]+split_+split_sql[3]
-        df = await ckdb.query_dataframe(news_sql)
+    # else:
+    #     new_sql=""""""
+    #     # split the sql
+    #     split_sql = sql.split('AND 1')
+    #     # walk every filter condition
+    #     for i in analysis.global_filters:
+    #         # drop GM
+    #         if i['strftv'] != 'GM':
+    #             # comparator of the filter
+    #             bijiao=get_bijiao(i["comparator"])
+    #             # value of the filter
+    #             condition=tuple(i['ftv'])
+    #             # event/column name
+    #             columnName=i['columnName']
+    #             dd = f""" AND {game}.event.{columnName} {bijiao} {condition}"""
+    #             new_sql+=dd
+    #     split_="""AND 1 """
+    #     news_sql = split_sql[0] + split_+new_sql + split_sql[1] + split_+new_sql+ split_sql[2]+split_+split_sql[3]
+    #     df = await ckdb.query_dataframe(news_sql)
     # # handle data from before Nov 23
     # list_data_range=analysis.date_range
     # liststr_data_range=[]
@@ -248,6 +248,7 @@ async def ltv_model_sql(
 
     return schemas.Msg(code=0, msg='ok', data=data)
 
 
+
 @router.post("/ltv_model_export")
 async def ltv_model_export(request: Request,
                            game: str,
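Note: the rewritten endpoint now always runs the generated SQL inside one try/except and maps driver errors to the uniform business response, instead of branching on the GM filter. A hedged sketch of that guard (ckdb and schemas.Msg are the project's own objects; the helper name is hypothetical):

    async def safe_query_dataframe(ckdb, sql):
        # returns (df, None) on success, (None, error Msg) on a bad report config
        try:
            return await ckdb.query_dataframe(sql), None
        except Exception:
            return None, schemas.Msg(code=-9, msg='报表配置参数异常')
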
diff --git a/models/behavior_analysis.py b/models/behavior_analysis.py
index b4a932a..228d95c 100644
--- a/models/behavior_analysis.py
+++ b/models/behavior_analysis.py
@@ -1,6 +1,6 @@
 import re
 from typing import Tuple
-
+from copy import deepcopy
 import arrow
 import sqlalchemy as sa
 import json
@@ -196,14 +196,24 @@ class BehaviorAnalysis:
             if item['data_type'] == 'user_label':
                 item.update({
                     'comparator': "in",
-                    'comparator_name': "是"
+                    'comparator_name': "是",
+                    'tableType': "user_label"
                 })
                 # record the label group-by
                 self.group_label.update({item['columnDesc']: idx})
-                # append to the filts of every event in events
-                if self.events:
-                    for i in self.events:
-                        i['filts'].append(item)
+
+                # event analysis, distribution analysis
+                if self.event_view['cksql'] in ['event', 'scatter']:
+                    # append to the filts of every event in events
+                    if self.events:
+                        for i in self.events:
+                            i['filts'].append(item)
+
+                # retention analysis, funnel analysis
+                if self.event_view['cksql'] in ['retention', 'funnel']:
+                    # append to the global filter filts
+                    self.event_view['filts'].append(item)
                 continue
             # not a label: add it to the group-by items
             res.append(getattr(self.event_tbl.c, item['columnName']))
@@ -230,7 +240,29 @@ class BehaviorAnalysis:
         return start_date, end_date, date_range
 
     def _get_global_filters(self):
-        return self.event_view.get('filts') or []
+        _res = []
+
+        if self.event_view.get('filts'):
+            _res = self.event_view.get('filts')
+        # funnel analysis gets special handling
+        if self.event_view['cksql'] in ['funnel']:
+            _trueRes = []
+            for i in _res:
+                if 'comparator' in i:
+                    trueI = deepcopy(i)
+                    _trueRes.append(trueI)
+                    continue
+                comparator = i.pop('comparator_id')
+                comparatorName = i.pop('comparator_name')
+                i.update({
+                    'comparator': comparator,
+                    'comparatorName': comparatorName,
+                    'tableType': "user_label"
+                })
+                trueI = deepcopy(i)
+                _trueRes.append(trueI)
+            _res = _trueRes
+        return _res
 
     async def _init_table(self):
         """
@@ -248,6 +280,87 @@ class BehaviorAnalysis:
         # self.event_tbl = sa.Table('event_view', metadata, *[sa.Column(column) for column in columns])
         self.event_tbl = sa.Table('event', metadata, *[sa.Column(column) for column in columns])
 
+    async def handler_trace_filts(self, *filters):
+        """
+
+        :param filters: (filts:list,relation:str)
+        :param g_f:
+        :param relation:
+        :return:
+        """
+
+        user_filters = []
+        event_filters = []
+        for filter in filters:
+            user_filter = []
+            event_filter = []
+            for item in filter:
+                comparator = item['comparator_id']
+                if item['tableType'] == 'user':
+                    where = user_filter
+                elif item['tableType'] == 'event':
+                    where = event_filter
+                elif item['tableType'] == 'user_label':
+                    user_cluster_def = UserClusterDef(self.game, item['columnName'], self.data_where)
+                    await user_cluster_def.init()
+                    sub_qry = user_cluster_def.to_sql_qry()
+                    if comparator == 'in':
+                        event_filter.append(sa.Column('#account_id').in_(sub_qry))
+                    else:
+                        event_filter.append(sa.Column('#account_id').notin_(sub_qry))
+                    continue
+                else:
+                    continue
+
+                tbl = getattr(self, f'{item["tableType"]}_tbl')
+                col = getattr(tbl.c, item['columnName'])
+                # datetime columns: apply the timezone offset
+                if item.get('data_type') == 'datetime':
+                    col = func.addHours(col, self.zone_time)
+
+                ftv = item['ftv']
+                if comparator == '==':
+                    if len(ftv) > 1:
+                        where.append(or_(*[col == v for v in ftv]))
+                    else:
+                        where.append(col == ftv[0])
+                elif comparator == '>=':
+                    where.append(col >= ftv[0])
+                elif comparator == '<=':
+                    where.append(col <= ftv[0])
+                elif comparator == '>':
+                    where.append(col > ftv[0])
+                elif comparator == '<':
+                    where.append(col < ftv[0])
+                elif comparator == 'range':
+                    where.append(col > ftv[0])
+                    where.append(col <= ftv[1])
+
+                elif comparator == 'is not null':
+                    where.append(col.isnot(None))
+                elif comparator == 'is null':
+                    where.append(col.is_(None))
+
+                elif comparator == 'like':
+                    where.append(col.like(f'%{ftv[0]}%'))
+
+                elif comparator == 'not like':
+                    where.append(col.notlike(f'%{ftv[0]}%'))
+
+                elif comparator == 'in':
+                    where.append(col.in_(ftv))
+
+                elif comparator == '!=':
+                    where.append(col != ftv[0])
+            else:
+                if event_filter:
+                    event_filters.append(and_(*event_filter))
+                if user_filter:
+                    user_filters.append(and_(*user_filter))
+
+        return event_filters, user_filters
+
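Note: handler_trace_filts routes each filter item by tableType: plain user/event columns become SQLAlchemy comparisons, while user_label items are resolved through UserClusterDef into an `#account_id` IN (subquery) condition on the event side. A usage sketch under those assumptions (the payload values are illustrative, not from the patch):

    filter_items = [
        {'tableType': 'event', 'columnName': 'owner_name',
         'comparator_id': '==', 'ftv': ['gm']},
        {'tableType': 'user_label', 'columnName': 'pay_users',
         'comparator_id': 'in', 'ftv': []},
    ]

    async def build_trace_filters(analysis):
        # analysis: an initialised BehaviorAnalysis instance
        event_filters, user_filters = await analysis.handler_trace_filts(filter_items)
        return event_filters, user_filters
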
     async def handler_filts(self, *filters):
         """
 
@@ -684,6 +797,7 @@ ORDER BY level
         sql = sql.replace('_windows_gap_', f"({windows_gap})")
         print(sql)
         return {'sql': sql,
+                'group_label': self.group_label,
                 'groupby': [i.key for i in self.groupby],
                 'date_range': self.date_range,
                 'cond_level': cond_level,
@@ -773,6 +887,7 @@ ORDER BY level
         print(sql)
         return {
             'sql': sql,
+            'group_label': self.group_label,
             'interval_type': event['intervalType'],
             'analysis': analysis,
             'quota_interval_arr': quota_interval_arr,
@@ -815,6 +930,7 @@ ORDER BY level
         print(sql)
         return {
             'sql': sql,
+            'group_label': self.group_label,
             'interval_type': event['intervalType'],
             'analysis': analysis,
             'quota_interval_arr': quota_interval_arr,
@@ -837,17 +953,19 @@ ORDER BY level
         event_names = self.events.get('event_names')
         source_event = self.events.get('source_event', {}).get('eventName')
         source_type = self.events.get('source_event', {}).get('source_type')
-        wheres = self.events['user_filter']['filts']
-        sql_one=''
-        if wheres != []:  # used when filter conditions exist
-            columnName=wheres[0]['columnName']  # column name
-            event=await get_event(columnName,self.game)  # which event the column belongs to, or whether it is a base attribute
-            if event == '基础属性':
-                sql_one=f""" and `{columnName}` {wheres[0]['comparator']} '{wheres[0]['ftv'][0]}' and `#event_name` in evnet_all) """
-            else:  # an event is involved, use the logic below
-                sql_one=f"""and `#account_id` in ( SELECT `#account_id` FROM {self.game}.event WHERE `#event_name`= '{event}' and `{columnName}` =
-                '{wheres[0]['ftv'][0]}' and addHours(`#event_time`, {self.zone_time}) >= start_data
-                and addHours(`#event_time`, {self.zone_time}) <= end_data ) and `#event_name` in evnet_all)"""
+        filters = self.events.get('user_filter', {}).get('filts', [])
+        event_filter, user_filter = await self.handler_trace_filts(filters)
+        where_a = '1'
+        if event_filter:
+            qry = sa.select().where(*event_filter)
+            sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
+            where_a = 'WHERE '.join(sql.split('WHERE ')[1:])
+        where_b = '1'
+        if user_filter:
+            qry = sa.select().where(*user_filter)
+            sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
+            where_b = sql.split('WHERE ')[1]
+
         sql_a = f"""with '{source_event}' as start_event,
        {tuple(event_names)} as evnet_all,
 from (with
         group by `#account_id`
         HAVING has_midway_hit = 1
     )
-where arrayElement(event_chain, 1) = start_event
+where arrayElement(event_chain, 1) = start_event and {where_a} and "#account_id" IN (SELECT "#account_id"
+FROM (SELECT "#account_id"
+FROM {self.game}.user_view
+WHERE {where_b}) AS anon_b)
 GROUP BY event_chain,`#account_id`
 ORDER BY values desc
 """
@@ -933,13 +1054,14 @@
 from (with
         group by `#account_id`
         HAVING has_midway_hit = 1
     )
-where arrayElement(event_chain, -1) = end_event
+where arrayElement(event_chain, -1) = end_event and {where_a} and "#account_id" IN (SELECT "#account_id"
+FROM (SELECT "#account_id"
+FROM {self.game}.user_view
+WHERE {where_b}) AS anon_b)
 GROUP BY event_chain,`#account_id`
 ORDER BY values desc"""
 
         sql = sql_a if source_type == 'initial_event' else sql_b
-        if sql_one != '':
-            sql= sql.replace('and `#event_name` in evnet_all)',sql_one,1)
         print(sql)
         return {
             'sql': sql,
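Note: both trace queries inject their conditions by compiling a SELECT with literal binds and taking the text after 'WHERE '. Joining the split parts back together, rather than taking split(...)[1], keeps a label subquery's own WHERE intact; where_b can use the plain split because the user_view conditions never nest. A sketch of the difference (column names made up; the patch itself compiles a column-less sa.select() the same way, which needs a SQLAlchemy version that accepts it):

    import sqlalchemy as sa

    inner = sa.select(sa.column('#account_id')).where(sa.column('tag') == 1)
    qry = sa.select(sa.column('x')).where(sa.column('#account_id').in_(inner))
    sql = str(qry.compile(compile_kwargs={"literal_binds": True}))

    truncated = sql.split('WHERE ')[1]                # stops at the nested WHERE
    where_a = 'WHERE '.join(sql.split('WHERE ')[1:])  # keeps the subquery whole
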
diff --git a/models/x_analysis.py b/models/x_analysis.py
index f9ac39c..ce20433 100644
--- a/models/x_analysis.py
+++ b/models/x_analysis.py
@@ -14,9 +14,8 @@ import crud
 import schemas
 from core.config import settings
 from db import get_database
-from db.redisdb import get_redis_pool, RedisDrive
 from models.user_label import UserClusterDef
-from utils import get_event
+from db.redisdb import get_redis_pool, RedisDrive
 
 
 class XAnalysis:
@@ -25,20 +24,28 @@ class XAnalysis:
         self.game = game
         self.event_view = dict()
         self.events = []
-
+        self.zone_time: int = 0
         self.global_filters = []
         self.account_filters = []
         self.global_relation = 'and'
         self.date_range = []
         self.ext_filters = (self.data_in.ext_filter.get('filts', []), self.data_in.ext_filter.get('relation', 'and'))
-        self.start_date = None
-        self.end_date = None
-        self.data_where=[]
-        self.event_tbl=None
-        self.zone_time:int = 0
 
     def _get_global_filters(self):
-        return self.event_view.get('filts') or []  # take filts from event_view, or return an empty list
+        _res = self.event_view.get('filts', [])
+        if _res:
+            for idx, item in enumerate(_res):
+                if item['data_type'] == 'user_label':
+                    _res[idx].update({
+                        'tableType': item['data_type'],
+                    })
+                else:
+                    _res[idx].update({
+                        'tableType': item['table_type'],
+                    })
+
+        return _res  # take filts from event_view, or return an empty list
 
     async def init(self, *args, **kwargs):
         if self.data_in.report_id:
@@ -70,8 +77,69 @@ class XAnalysis:
         if 'data_where' in kwargs:
             self.account_filters = kwargs['data_where'].get(self.game, [])
 
-    def handler_filts(self, *filters):
+    # def handler_filts(self, *filters):
+    #     """
+    #     :param filters: (filts:list,relation:str)
+    #     :param g_f:
+    #     :param relation:
+    #     :return:
+    #     """
+    #
+    #     event_filters = []
+    #     for filter in filters:
+    #         filts = filter[0]
+    #         relation = filter[1]
+    #         event_filter = []
+    #         for item in filts:
+    #
+    #             where = event_filter
+    #
+    #             col = sa.Column(item['columnName'])
+    #
+    #             comparator = item['comparator']
+    #             ftv = item['ftv']
+    #             if comparator == '==':
+    #                 if len(ftv) > 1:
+    #                     where.append(or_(*[col == v for v in ftv]))
+    #                 else:
+    #                     where.append(col == ftv[0])
+    #             elif comparator == '>=':
+    #                 where.append(col >= ftv[0])
+    #             elif comparator == '<=':
+    #                 where.append(col <= ftv[0])
+    #             elif comparator == '>':
+    #                 where.append(col > ftv[0])
+    #             elif comparator == '<':
+    #                 where.append(col < ftv[0])
+    #
+    #             elif comparator == 'is not null':
+    #                 where.append(col.isnot(None))
+    #             elif comparator == 'is null':
+    #                 where.append(col.is_(None))
+    #
+    #             elif comparator == 'like':
+    #                 where.append(col.like(f'%{ftv[0]}%'))
+    #
+    #             elif comparator == 'not like':
+    #                 where.append(col.notlike(f'%{ftv[0]}%'))
+    #
+    #             elif comparator == 'in':
+    #                 where.append(col.in_(ftv))
+    #
+    #             elif comparator == '!=':
+    #                 where.append(col != ftv[0])
+    #         if relation == 'and':
+    #             if event_filter:
+    #                 event_filters.append(and_(*event_filter))
+    #         else:
+    #             if event_filter:
+    #                 event_filters.append(or_(*event_filter))
+    #
+    #     return event_filters
+
+    async def handler_filts(self, *filters):
         """
+
         :param filters: (filts:list,relation:str)
         :param g_f:
         :param relation:
         :return:
         """
@@ -82,10 +150,26 @@ class XAnalysis:
         event_filters = []
         for filter in filters:
             filts = filter[0]
             relation = filter[1]
+            user_filter = []
             event_filter = []
             for item in filts:
+                comparator = item['comparator']
+                if item['tableType'] == 'user':
+                    where = user_filter
+                elif item['tableType'] == 'event':
+                    where = event_filter
+                elif item['tableType'] == 'user_label':
+                    user_cluster_def = UserClusterDef(self.game, item['columnName'], self.account_filters)
+                    await user_cluster_def.init()
+                    sub_qry = user_cluster_def.to_sql_qry()
+                    if comparator == 'in':
+                        event_filter.append(sa.Column('#account_id').in_(sub_qry))
+                    else:
+                        event_filter.append(sa.Column('#account_id').notin_(sub_qry))
+                    continue
+                else:
+                    continue
 
-                where = event_filter
 
                 col = sa.Column(item['columnName'])
 
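Note: the new handler_filts keeps its comparator dispatch as a long if/elif chain. Read as a table, the mapping looks like this; an equivalent restatement for reference, not a drop-in replacement:

    import sqlalchemy as sa
    from sqlalchemy import and_, or_

    COMPARATORS = {
        '==': lambda col, ftv: or_(*[col == v for v in ftv]),
        '!=': lambda col, ftv: col != ftv[0],
        '>=': lambda col, ftv: col >= ftv[0],
        '<=': lambda col, ftv: col <= ftv[0],
        '>': lambda col, ftv: col > ftv[0],
        '<': lambda col, ftv: col < ftv[0],
        'in': lambda col, ftv: col.in_(ftv),
        'like': lambda col, ftv: col.like(f'%{ftv[0]}%'),
        'not like': lambda col, ftv: col.notlike(f'%{ftv[0]}%'),
        'is null': lambda col, ftv: col.is_(None),
        'is not null': lambda col, ftv: col.isnot(None),
    }

    cond = COMPARATORS['>='](sa.Column('lv'), [10])
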
@@ -130,8 +214,7 @@ class XAnalysis:
 
         return event_filters
 
-    def ltv_model_sql(self):
-        # generate the ltv sql
+    async def ltv_model_sql(self):
         days = (arrow.get(self.event_view['endTime']).date() - arrow.get(self.event_view['startTime']).date()).days
         quota = self.event_view['quota']
         select_ltv = []
@@ -163,17 +246,16 @@ class XAnalysis:
         sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
         where_str = sql.split('WHERE ')[1]
 
-        where_order = self.handler_filts((self.global_filters, self.global_relation))  # global_relation is 'and'
+        where_order = await self.handler_filts((self.global_filters, self.global_relation))  # global_relation is 'and'
         where_order_str = 1
         if where_order:
             qry = sa.select().where(*where_order)
             sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
-            where_order_str = sql.split('WHERE ')[1]
+            where_order_str = 'WHERE '.join(sql.split('WHERE ')[1:])
 
-        where_account = self.handler_filts((self.account_filters, 'and'), self.ext_filters)
+        where_account = await self.handler_filts((self.account_filters, 'and'), self.ext_filters)
         where_account_str = 1
         if where_account:
             qry = sa.select().where(*where_account)
             sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
             where_account_str = sql.split('WHERE ')[1]
@@ -206,4 +288,4 @@ class XAnalysis:
             'end_date': self.event_view['endTime'][:10],
             'date_range': self.date_range,
             'ltv_n': ltv_n
-        }
\ No newline at end of file
+        }
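Note: when a filter group is empty, ltv_model_sql leaves the placeholder as the integer 1, so the f-string template emits `WHERE 1` / `AND 1`, a condition ClickHouse treats as always true. A toy sketch of that convention (names made up):

    def render_where(fragments):
        # fragments: already-rendered SQL condition strings
        return ' AND '.join(fragments) if fragments else '1'

    assert render_where([]) == '1'
    assert render_where(["lv > 10", "os = 'ios'"]) == "lv > 10 AND os = 'ios'"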