import calendar import datetime import re from typing import Tuple from copy import deepcopy import arrow import sqlalchemy as sa import json from fastapi import Depends import pandas as pd import numpy as np from sqlalchemy import func, or_, and_, not_ import crud import schemas from core.config import settings from db import get_database from db.redisdb import get_redis_pool, RedisDrive from models.user_label import UserClusterDef from utils import get_week, strptime, start_end_month, strptime1, get_event, get_time class CombinationEvent: def __init__(self, data, string, format): self.data = data self.string = string self.pattern = re.compile('[+\-*/]') self.format = format self.events_name = [] def parse(self): opts = self.pattern.findall(self.string) factors = self.pattern.split(self.string) result = pd.Series(self.data[int(factors[0])]['values'][0]) for i, opt in enumerate(opts): b = pd.Series(self.data[int(factors[i + 1])]['values'][0]) result = settings.ARITHMETIC[opt](result, b).fillna(0) if self.format == 'percent': result = round(result * 100, 2) elif self.format == 'float': result = round(result, 2) elif self.format == 'integer': result = result.astype(int) result.replace(np.inf, 0, inplace=True) return result.to_list(), round(result.sum(), 2), round(result.mean(), 2) class CustomEvent: def __init__(self, tbl, string, format): self.tbl = tbl self.string = string self.pattern = re.compile('[+\-*/]') self.format = format self.events_name = [] def _parse(self, s): m = s.split('.') if len(m) == 3: event_name, attr, comp = m self.events_name.append(event_name) return getattr(func, comp)(getattr(func, 'if')(getattr(self.tbl.c, '#event_name') == event_name, getattr(self.tbl.c, attr), 0)) elif len(m) == 2: event_name, comp = m self.events_name.append(event_name) # 总次数 if comp == 'total_count': return func.sum(getattr(func, 'if')(getattr(self.tbl.c, '#event_name') == event_name, 1, 0)) elif comp == 'touch_user_count' or comp == 'touch_device_count': return 
func.uniqCombined(getattr(func, 'if')(getattr(self.tbl.c, '#event_name') == event_name, getattr(self.tbl.c, '#account_id'), None)) elif comp == 'touch_user_avg': return func.divide( func.sum(getattr(func, 'if')(getattr(self.tbl.c, '#event_name') == event_name, 1, 0)), func.uniqCombined(getattr(func, 'if')(getattr(self.tbl.c, '#event_name') == event_name, getattr(self.tbl.c, '#account_id'), None))) elif len(m) == 1: n = int(m[0]) return n def str2obj(self, factors, opts): sel = None for i, factor in enumerate(factors): if i == 0: sel = self._parse(factor) else: tmp = self._parse(factor) sel = settings.ARITHMETIC[opts[i - 1]](sel, tmp) return sel def parse(self): factors = self.pattern.split(self.string) opts = self.pattern.findall(self.string) sel = self.str2obj(factors, opts) decimal = 2 if self.format == 'percent': sel = sel * 100 elif format == 'integer': decimal = 0 elif format == 'float': decimal = 2 sel = func.round(sel, decimal).label('values') res = { 'event_name': self.events_name, 'select': sel } return res class BehaviorAnalysis: def __init__(self, game: str, data_in: schemas.CkQuery, rdb: RedisDrive = Depends(get_redis_pool)): self.game = game self.rdb = rdb self.user_tbl = None self.event_tbl = None self.data_in = data_in self.group_label = {} self.event_view = dict() self.events = [dict()] self.time = None self.zone_time: int = 0 self.start_date = None self.end_date = None self.global_filters = None self.groupby = None self.time_particle = None self.date_range = None self.unit_num = None self.report_name = None self.combination_event = [] self.ext_filters = (self.data_in.ext_filter.get('filts', []), self.data_in.ext_filter.get('relation', 'and')) self.global_relation = 'and' self.data_where = [] async def init(self, *args, **kwargs): # 在自动获取看板和报表信息的时候会走这里 if self.data_in.report_id: db = get_database() report = await crud.report.get(db, id=self.data_in.report_id) self.event_view = report['query']['eventView'] self.events = report['query']['events'] if 
self.event_view.get('date_type') == 'static': pass else: try: e_days = self.event_view['e_days'] s_days = self.event_view['s_days'] except: # 兼容以前的 e_days, s_days = self.event_view['recentDay'].split('-') # 根据本地电脑时间,按时间范围自动适应当前的时间范围 #self.event_view['endTime'] = arrow.get().shift(days=-int(e_days)).strftime('%Y-%m-%d 23:59:59') #self.event_view['startTime'] = arrow.get().shift(days=-int(s_days)).strftime('%Y-%m-%d 00:00:00') self.event_view['endTime'] = arrow.now('Asia/Shanghai').shift(days=-int(e_days)).strftime('%Y-%m-%d 23:59:59') self.event_view['startTime'] = arrow.now('Asia/Shanghai').shift(days=-int(s_days)).strftime('%Y-%m-%d 00:00:00') self.event_view['startTime'] = self.data_in.ext_filter.get('startTime') or self.event_view['startTime'] self.event_view['endTime'] = self.data_in.ext_filter.get('endTime') or self.event_view['endTime'] self.report_name = report["name"] else: self.event_view = self.data_in.eventView self.events = self.data_in.events await self._init_table() self.zone_time = self._get_zone_time() self.time_particle = self._get_time_particle_size() self.start_date, self.end_date, self.date_range = self._get_date_range() self.groupby = self._get_group_by() self.global_filters = self._get_global_filters() self.unit_num = self._get_unit_num() self.global_relation = self.event_view.get('relation', 'and') # 用户自带过滤 if 'data_where' in kwargs: self.data_where = kwargs['data_where'].get(self.game, []) self.global_filters.extend(self.data_where) # self.global_filters.extend(self.data_in.ext_filter.get('filts', [])) def _get_time_particle_size(self): return self.event_view.get('timeParticleSize') or 'P1D' def _get_unit_num(self): return self.event_view.get('unitNum') def _get_group_by(self): res = [] # 存在删选条件 groupBy = self.event_view.get('groupBy', []) if not groupBy: return [] for idx, item in enumerate(groupBy): # 如果是标签 if item['data_type'] == 'user_label': item.update({ 'comparator': "in", 'comparator_name': "是", 'tableType': "user_label" }) # 加入分组标签 
self.group_label.update({item['columnDesc']: idx}) # 事件分析,分布分析 if self.event_view['cksql'] in ['event', 'scatter']: # 加入events中每个event的filts条件中 if self.events: for i in self.events: i['filts'].append(item) # 留存分析,漏斗分析 if self.event_view['cksql'] in ['retention', 'funnel']: # 加入全局删选filts条件中 self.event_view['filts'].append(item) continue # 不是标签加入分组项中 res.append(getattr(self.event_tbl.c, item['columnName'])) return res # return [getattr(self.event_tbl.c, item['columnName']) for item in self.event_view.get('groupBy', [])] def _get_zone_time(self): return int(self.event_view.get('zone_time', 8)) def _get_date_range(self) -> Tuple[str, str, list]: start_date: str = self.event_view.get('startTime') end_date: str = self.event_view.get('endTime') if self.time_particle == 'HOUR': date_range = [i for i in range(24)] return start_date, end_date, date_range date_range = pd.date_range(start_date, end_date, freq=settings.PROPHET_TIME_GRAIN_MAP[self.time_particle], tz='UTC').tolist() if self.time_particle in ('P1D', 'P1W', 'P1M'): date_range = [item.date() for item in date_range] # start_date = date_range[0].strftime('%Y-%m-%d') # end_date = date_range[-1].strftime('%Y-%m-%d') return start_date, end_date, date_range def _get_global_filters(self): _res = [] if self.event_view.get('filts'): _res = self.event_view.get('filts') # 漏斗分析,特殊处理 if self.event_view['cksql'] in ['funnel']: _trueRes = [] for i in _res: if 'comparator' in i: trueI = deepcopy(i) _trueRes.append(trueI) continue comparator = i.pop('comparator_id') comparatorName = i.pop('comparator_name') i.update({ 'comparator': comparator, 'comparatorName': comparatorName, 'tableType': "user_label" }) trueI = deepcopy(i) _trueRes.append(trueI) _res = _trueRes return _res async def _init_table(self): """ 从redis中取出表字段,构建表结构 :return: """ res_json = await self.rdb.get(f'{self.game}_user') columns = json.loads(res_json).keys() metadata = sa.MetaData(schema=self.game) self.user_tbl = sa.Table('user_view', metadata, *[sa.Column(column) 
for column in columns]) res_json = await self.rdb.get(f'{self.game}_event') columns = json.loads(res_json).keys() metadata = sa.MetaData(schema=self.game) # self.event_tbl = sa.Table('event_view', metadata, *[sa.Column(column) for column in columns]) self.event_tbl = sa.Table('event', metadata, *[sa.Column(column) for column in columns]) async def handler_trace_filts(self, *filters): """ :param filters: (filts:list,relation:str) :param g_f: :param relation: :return: """ user_filters = [] event_filters = [] for filter in filters: user_filter = [] event_filter = [] for item in filter: comparator = item['comparator_id'] if item['tableType'] == 'user': where = user_filter elif item['tableType'] == 'event': where = event_filter elif item['tableType'] == 'user_label': user_cluster_def = UserClusterDef(self.game, item['columnName'], self.data_where) await user_cluster_def.init() sub_qry = user_cluster_def.to_sql_qry() if comparator == 'in': event_filter.append(sa.Column('#account_id').in_(sub_qry)) else: event_filter.append(sa.Column('#account_id').notin_(sub_qry)) continue else: continue tbl = getattr(self, f'{item["tableType"]}_tbl') col = getattr(tbl.c, item['columnName']) # 日期类型处理时区 if item.get('data_type') == 'datetime': col = func.addHours(col, self.zone_time) ftv = item['ftv'] if comparator == '==': if len(ftv) > 1: where.append(or_(*[col == v for v in ftv])) else: where.append(col == ftv[0]) elif comparator == '>=': where.append(col >= ftv[0]) elif comparator == '<=': where.append(col <= ftv[0]) elif comparator == '>': where.append(col > ftv[0]) elif comparator == '<': where.append(col < ftv[0]) elif comparator == 'range': where.append(col > ftv[0]) where.append(col <= ftv[1]) elif comparator == 'is not null': where.append(col.isnot(None)) elif comparator == 'is null': where.append(col.is_(None)) elif comparator == 'like': where.append(col.like(f'%{ftv[0]}%')) elif comparator == 'not like': where.append(col.notlike(f'%{ftv[0]}%')) elif comparator == 'in': 
where.append(col.in_(ftv)) elif comparator == '!=': where.append(col != ftv[0]) else: if event_filter: event_filters.append(and_(*event_filter)) if user_filter: user_filters.append(and_(*user_filter)) return event_filters, user_filters # 根据筛选的where条件生成对应SQL async def handler_filts(self, *filters, nu=0): """ :param filters: (filts:list,relation:str) :param g_f: :param relation: :return: """ # 事件,留存,分布,漏斗分析生成sql经过, user_filters = [] event_filters = [] for filter in filters: filts = filter[0] relation = filter[1] user_filter = [] event_filter = [] for item in filts: comparator = item['comparator'] if item['tableType'] == 'user': where = user_filter elif item['tableType'] == 'event': where = event_filter elif item['tableType'] == 'user_label': user_cluster_def = UserClusterDef(self.game, item['columnName'], self.data_where) await user_cluster_def.init() sub_qry = user_cluster_def.to_sql_qry() if comparator == 'in': event_filter.append(sa.Column('#account_id').in_(sub_qry)) else: event_filter.append(sa.Column('#account_id').notin_(sub_qry)) continue else: continue # 表名 tbl = getattr(self, f'{item["tableType"]}_tbl') # 字段名 col = getattr(tbl.c, item['columnName']) # 判断是否是同一个事件 yuan_event = self.events[nu].get('eventName') or self.events[nu].get('event_name') # 指标中的事件名 biao_event = self.events[nu].get('customEvent', '').split('.')[0] event = await get_event(item['columnName'], self.game) # 获取对应事件名或基础属性 if event != yuan_event and event != biao_event and event != '基础属性' and self.game != 'debug': event_time_col = getattr(self.event_tbl.c, '#event_time') event_name_col = getattr(self.event_tbl.c, '#event_name') base_where = [ func.addHours(event_time_col, self.zone_time) >= self.start_date, func.addHours(event_time_col, self.zone_time) <= self.end_date, ] event_name_where = [] event_name_where.append(settings.CK_CALC_SYMBO['=='](event_name_col, event)) ftv = item['ftv'] if comparator == '==': if len(ftv) > 1: event_name_where.append(or_(*[col == v for v in ftv])) else: 
event_name_where.append(col == ftv[0]) elif comparator == '>=': event_name_where.append(col >= ftv[0]) elif comparator == '<=': event_name_where.append(col <= ftv[0]) elif comparator == '>': event_name_where.append(col > ftv[0]) elif comparator == '<': event_name_where.append(col < ftv[0]) elif comparator == 'range': event_name_where.append(col > ftv[0]) event_name_where.append(col <= ftv[1]) elif comparator == 'is not null': event_name_where.append(col.isnot(None)) elif comparator == 'is null': event_name_where.append(col.is_(None)) elif comparator == 'like': event_name_where.append(col.like(f'%{ftv[0]}%')) elif comparator == 'not like': event_name_where.append(col.notlike(f'%{ftv[0]}%')) elif comparator == 'in': event_name_where.append(col.in_(ftv)) elif comparator == '!=': event_name_where.append(col != ftv[0]) sub_qry = sa.select(sa.Column('#account_id')).select_from( sa.select(sa.Column('#account_id')).where(and_(*base_where, *event_name_where)) ) event_filter.append(sa.Column('#account_id').in_(sub_qry)) continue # 日期类型处理时区 if item.get('data_type') == 'datetime': col = func.addHours(col, self.zone_time) ftv = item['ftv'] if comparator == '==': if len(ftv) > 1: where.append(or_(*[col == v for v in ftv])) else: where.append(col == ftv[0]) elif comparator == '>=': where.append(col >= ftv[0]) elif comparator == '<=': where.append(col <= ftv[0]) elif comparator == '>': where.append(col > ftv[0]) elif comparator == '<': where.append(col < ftv[0]) elif comparator == 'range': where.append(col > ftv[0]) where.append(col <= ftv[1]) elif comparator == 'is not null': where.append(col.isnot(None)) elif comparator == 'is null': where.append(col.is_(None)) elif comparator == 'like': where.append(col.like(f'%{ftv[0]}%')) elif comparator == 'not like': where.append(col.notlike(f'%{ftv[0]}%')) elif comparator == 'in': where.append(col.in_(ftv)) elif comparator == '!=': where.append(col != ftv[0]) if relation == 'and': if event_filter: event_filters.append(and_(*event_filter)) 
if user_filter: user_filters.append(and_(*user_filter)), else: if event_filter: event_filters.append(or_(*event_filter)) if user_filter: user_filters.append(or_(*user_filter)) return event_filters, user_filters async def retention_model_sql(self): event_name_a = self.events[0]['eventName'] event_name_b = self.events[1]['eventName'] visit_name = self.events[0].get('event_attr_id') event_time_col = getattr(self.event_tbl.c, '#event_time') event_name_col = getattr(self.event_tbl.c, '#event_name') e_account_id_col = getattr(self.event_tbl.c, '#account_id') u_account_id_col = getattr(self.user_tbl.c, '#account_id') date_col = sa.Column('date') who_visit = e_account_id_col if visit_name: who_visit = getattr(self.event_tbl.c, visit_name) filters, _ = await self.handler_filts((self.events[0]['filts'], self.events[0].get('relation')), self.ext_filters) filters = filters or [1] selectd = [func.toStartOfDay(func.addHours(event_time_col, self.zone_time)).label('date'), *self.groupby, func.arrayDistinct( (func.groupArray( func.if_(func.and_(event_name_col == event_name_a, *filters), who_visit, None)))).label( 'val_a'), func.length(sa.Column('val_a')).label('amount_a'), func.length(sa.Column('val_b')).label('amount_b'), ] if event_name_b == '*': val_b = func.arrayDistinct( (func.groupArray(func.if_(1, who_visit, None)))).label('val_b'), selectd.insert(-2, *val_b) else: val_b = func.arrayDistinct( (func.groupArray(func.if_(event_name_col == event_name_b, who_visit, None)))).label('val_b'), selectd.insert(-2, *val_b) base_where = [ func.addHours(event_time_col, self.zone_time) >= self.start_date, func.addHours(event_time_col, self.zone_time) <= self.end_date, ] event_filter, user_filter = await self.handler_filts( (self.global_filters, self.global_relation), self.ext_filters ) groupby = [date_col] + self.groupby oredrby = [date_col] if user_filter: qry = sa.select(selectd).select_from( self.event_tbl.join(self.user_tbl, u_account_id_col == e_account_id_col)).where( 
and_(*user_filter, *event_filter, *base_where)).group_by(*groupby).order_by( *oredrby).limit(100000) else: qry = sa.select(selectd).where(and_(*base_where, *event_filter)).group_by(*groupby).order_by( *oredrby).limit(100000) sql = str(qry.compile(compile_kwargs={"literal_binds": True})) print(sql) return {'sql': sql, 'groupby': ['date'] + [i.key for i in self.groupby], 'date_range': self.date_range, 'event_name': [event_name_a, event_name_b], 'unit_num': self.unit_num, 'time_particle': self.time_particle, 'start_date': self.start_date[:10], 'end_date': self.end_date[:10], } async def event_model_sql(self): """事件分析生成sql会经过""" sqls = [] event_time_col = getattr(self.event_tbl.c, '#event_time') event_date_col = settings.TIME_GRAIN_EXPRESSIONS['P1M'](event_time_col, self.zone_time)#按月的 for idx, event in enumerate(self.events): operator_ = event.get('operator_val', '') # 排头显示名 event_name_display = event.get('eventNameDisplay') is_show = event.get('is_show', True) select_exprs = [] if self.time_particle != 'total': select_exprs.append( settings.TIME_GRAIN_EXPRESSIONS[self.time_particle](event_time_col, self.zone_time)) base_where = [ func.addHours(event_time_col, self.zone_time) >= self.start_date, func.addHours(event_time_col, self.zone_time) <= self.end_date, ] event_name_col = getattr(self.event_tbl.c, '#event_name') format = event.get('format') or 'float' # 兼容以前的结构 if event.get('customEvent'): event['customType'] = event.get('customType') or 'formula' if event.get('customType') == 'formula': if event.get('customEvent'): # 组合公式的内容 formula = event.get('customEvent') custom = CustomEvent(self.event_tbl, formula, format).parse() event_name = custom['event_name'] where = [event_name_col.in_(event_name)] event_filter, _ = await self.handler_filts((event['filts'], event.get('relation')), (self.global_filters, self.global_relation), self.ext_filters, nu=idx ) select_exprs.extend(self.groupby) if event_name_display == '充值总额': qry = sa.select( custom['select'] 
).where(*where,*event_filter) elif '月充总额' in event_name_display: times = self.start_date.split('-') start_date = times[0] + '-' + times[1] + '-' + '01 00:00:00' now = get_time('%Y-%m-%d %H:%M:%S') _sp_end = self.end_date[0:7] _now = now[0:7] if self.end_date < now: if _sp_end == _now: # 如果结束时间和现在时间是一个月,则取今天为结束时间 times = now.split(' ') end_date = times[0] + ' 23:59:59' else: # 如果是之前月份,则算出那个月的最后一天 data = self.end_date.split(' ')[0].split('-') year = int(data[0]) month = int(data[1]) weekDay, monthCountDay = calendar.monthrange(year, month) time = str(datetime.date(year, month, day=monthCountDay)) end_date = time + ' 23:59:59' else: # 结束时间大于现在的时间,则取今天的时间 times = now.split(' ') end_date = times[0] + ' 23:59:59' base_time = [ func.addHours(event_time_col, self.zone_time) >= start_date, func.addHours(event_time_col, self.zone_time) <= end_date, ] qry = sa.select(event_date_col, custom['select'] ).where(*base_time, *where, *event_filter)#.group_by(event_date_col) # qry = sa.select(event_date_col, *self.groupby, values_col) \ # .where(and_(*where)) \ # .group_by(event_date_col, *self.groupby, e_account_id_col) else: qry = sa.select( *select_exprs, custom['select'] ).where(*base_where, *where, *event_filter) # 指标组合计算 elif event.get('customType') == 'combination': sqls.append({'combination_event': event.get('customEvent'), 'time_particle': self.time_particle, 'start_date': self.start_date[:10], 'end_date': self.end_date[:10], 'event_name': event.get('eventNameDisplay'), 'format': event.get('format') or 'float', 'date_range': self.date_range, 'is_show': is_show, } ) continue else: event_name = event['event_name'] select_exprs += self.groupby if event_name != '*': base_where.append(event_name_col == event_name) analysis = event['analysis'] if analysis == 'distinct_count': # 事件分析-int类型去重数用到 analysis='uniqExact' event_filter, user_filter = await self.handler_filts( (event['filts'], event.get('relation', 'and')), (self.global_filters, self.global_relation) , self.ext_filters, 
nu=idx ) u_account_id_col = getattr(self.user_tbl.c, '#account_id') # 按账号聚合 e_account_id_col = getattr(self.event_tbl.c, '#account_id') if operator_ == '': # 聚合方式 if analysis == 'total_count': selectd = select_exprs + [func.count().label('values')] elif analysis == 'touch_user_count' or analysis == 'touch_device_count': selectd = select_exprs + [func.count(sa.distinct(e_account_id_col)).label('values')] elif analysis == 'touch_user_avg': selectd = select_exprs + [ func.round((func.count() / func.count(sa.distinct(e_account_id_col))), 2).label( 'values')] else: selectd = select_exprs + [ func.round(getattr(func, analysis)(getattr(self.event_tbl.c, event['event_attr_id'])), 2).label( 'values')] else: operator_val = int(operator_) operator = event['operator'] # 运算符号 if analysis == 'total_count': selectd = select_exprs + [ settings.ARITHMETIC[operator](func.count(), operator_val).label('values')] elif analysis == 'touch_user_count' or analysis == 'touch_device_count': selectd = select_exprs + [ settings.ARITHMETIC[operator](func.count(sa.distinct(e_account_id_col)), operator_val).label('values')] elif analysis == 'touch_user_avg': selectd = select_exprs + [ settings.ARITHMETIC[operator]( func.round((func.count() / func.count(sa.distinct(e_account_id_col))), 2), operator_val).label( 'values')] else: selectd = select_exprs + [ settings.ARITHMETIC[operator]( func.round(getattr(func, analysis)(getattr(self.event_tbl.c, event['event_attr_id'])), 2), operator_val).label( 'values')] if user_filter: qry = sa.select(selectd).select_from( self.event_tbl.join(self.user_tbl, u_account_id_col == e_account_id_col)).where( and_(*user_filter, *event_filter, *base_where)) else: qry = sa.select(selectd).where(and_(*event_filter, *base_where)) if '月充总额' in event_name_display: qry = qry.group_by(event_date_col) else: qry = qry.group_by(*select_exprs) if self.time_particle != 'total': qry = qry.order_by(sa.Column('date')) else: qry = qry.order_by(sa.Column('values').desc()) qry = 
qry.limit(100000) sql = str(qry.compile(compile_kwargs={"literal_binds": True})) # 新增付费设备数单独进行sql处理 if event_name_display == '新增付费设备数': if "huixie" in self.game: check_event = "new_device" else: check_event = "create_account" stat_date = self.start_date end_date = self.end_date game = self.game # 截取原始sql中动态条件部分 aa = r'anon_1(.*?)addHours' dt_where = re.findall(aa, str(sql)) dt_where2 = (dt_where[0]).split("AND ") if len(dt_where2) > 2: bb = "" for i in range(len(dt_where2) - 1): if i > 0: bb += " AND " + dt_where2[i] else: bb = "" sql = f""" select p.date ,round(uniqExact(p."#distinct_id"),2) AS values from (select toDate(addHours({game}.event."#event_time", 8)) as date,"#distinct_id" from {game}.event where `#event_name` = 'pay' and addHours({game}.event.`#event_time`, 8) >= '{stat_date}' and addHours({game}.event.`#event_time`, 8) <= '{end_date}' {bb} group by date, "#distinct_id") p inner join (SELECT toDate(addHours({game}.event."#event_time", 8)) as date,"#distinct_id" FROM {game}.event WHERE addHours({game}.event."#event_time", 8) >= '{stat_date}' AND addHours({game}.event."#event_time", 8) <= '{end_date}' AND {game}.event."#event_name" = '{check_event}' AND {game}.event.is_new_device = '1' {bb}) d on p."#distinct_id" = d."#distinct_id" and p.date = d.date group by date """ #充值总额 if event_name_display == '充值总额': sql=sql.replace("""GROUP BY toDate(addHours(huixie.event."#event_time", 8)) ORDER BY date LIMIT 100000""",'',1) print(sql) # 单独付费率的拿出来 if event.get('customEvent') == 'pay.touch_user_count/login.touch_user_count': stat_date = self.start_date end_date = self.end_date game = self.game if len(event_filter) > 0: where_fil = str(and_(*event_filter, *base_where).compile(compile_kwargs={"literal_binds": True})) sql = f""" select aa.date as date,round((a/b)*100,2) as values from (select toDate(addHours({game}.event."#event_time", 8)) AS date,uniqCombined(if({game}.event."#event_name" = 'pay', {game}.event."#account_id", NULL)) as a from {game}.event WHERE 
{where_fil} AND {game}.event."#event_name"='pay' and orderid NOT LIKE '%GM%' GROUP BY toDate(addHours({game}.event."#event_time", 8))) as aa LEFT join (SELECT toDate(addHours({game}.event."#event_time", 8)) AS date, round(uniqExact({game}.event."#account_id"), 2) AS b FROM {game}.event WHERE {where_fil} GROUP BY toDate(addHours({game}.event."#event_time", 8))) as bb on aa.date = bb.date ORDER by date """ else: sql = f""" select aa.date as date,round((a/b)*100,2) as values from (select toDate(addHours({game}.event."#event_time", 8)) AS date,uniqCombined(if({game}.event."#event_name" = 'pay', {game}.event."#account_id", NULL)) as a from {game}.event WHERE addHours({game}.event."#event_time", 8) >= '{stat_date}' AND addHours({game}.event."#event_time", 8) <= '{end_date}' AND {game}.event."#event_name"='pay' and orderid NOT LIKE '%GM%' GROUP BY toDate(addHours({game}.event."#event_time", 8))) as aa LEFT join (SELECT toDate(addHours({game}.event."#event_time", 8)) AS date, round(uniqExact({game}.event."#account_id"), 2) AS b FROM {game}.event WHERE addHours({game}.event."#event_time", 8) >= '{stat_date}' AND addHours({game}.event."#event_time", 8) <= '{end_date}' GROUP BY toDate(addHours({game}.event."#event_time", 8))) as bb on aa.date = bb.date ORDER by date """ # 单独设备付费率的拿出来 if event.get('customEvent') == 'pay.touch_device_count/login.touch_device_count': stat_date = self.start_date end_date = self.end_date game = self.game if len(event_filter) > 0: where_fil = str(and_(*event_filter, *base_where).compile(compile_kwargs={"literal_binds": True})) sql = f""" select toDate(addHours({game}.event."#event_time", 8)) AS date,round((uniqCombined(if({game}.event."#event_name" = 'pay',{game}.event.`#distinct_id`,NULL)) / uniqCombined(if({game}.event."#event_name" = 'login',{game}.event."#distinct_id",NULL))) * 100,2) AS values FROM {game}.event WHERE {where_fil} AND {game}.event."#event_name" IN ('pay', 'login') GROUP BY toDate(addHours({game}.event."#event_time", 8)) ORDER BY 
date LIMIT 100000 """ else: sql = f""" select toDate(addHours({game}.event."#event_time", 8)) AS date,round((uniqCombined(if({game}.event."#event_name" = 'pay',{game}.event.`#distinct_id`,NULL)) / uniqCombined(if({game}.event."#event_name" = 'login',{game}.event."#distinct_id",NULL))) * 100,2) AS values FROM {game}.event WHERE addHours({game}.event."#event_time", 8) >= '{stat_date}' AND addHours({game}.event."#event_time", 8) <= '{end_date}' AND {game}.event."#event_name" IN ('pay', 'login') GROUP BY toDate(addHours({game}.event."#event_time", 8)) ORDER BY date LIMIT 100000 """ # 单独把新增付费人数(以设备为维度)拿出来 if event.get('event_attr') == '触发用户数' and ['is_new_device', 'orderid'] == [i['columnName'] for i in event.get('filts')]: stat_date = self.start_date end_date = self.end_date game = self.game sql = f"""SELECT toDate(addHours("#event_time", 8)) as date, round(uniqExact("#distinct_id"), 2) AS values FROM (SELECT toDate(addHours("#event_time", 8)) as date,"#event_time",`#event_name`,`#distinct_id`,`#account_id` from {game}.event WHERE addHours("#event_time", 8) >= '{stat_date}' AND addHours("#event_time", 8) <= '{end_date}' and `#event_name` = 'pay' and orderid NOT LIKE '%GM%') a inner join (SELECT toDate(addHours("#event_time", 8)) as date,"#event_time",is_new_device,`#distinct_id`,`#event_name`,`#account_id` from {game}.event WHERE addHours("#event_time", 8) >= '{stat_date}' AND addHours("#event_time", 8) <= '{end_date}' and `#event_name` = 'create_account' and is_new_device = 1) b on a.`#distinct_id`= b.`#distinct_id` and a.date = b.date GROUP BY toDate(addHours("#event_time", 8))""" # 排行榜前100名提取优化处理 if event_name_display == '充值排行': game = self.game # 截取原始sql中动态条件部分 aaa = r'WHERE(.*?)GROUP' bbb = re.findall(aaa, str(sql)) dt_where = bbb[0] sql = f""" SELECT toDate(addHours({game}.event."#event_time", 8)) AS date,{game}.event."#account_id", round(sum(if({game}.event."#event_name" = 'pay', {game}.event."unitPrice", 0)) / 100,2) AS values FROM {game}.event WHERE {dt_where} 
and {game}.event.`#account_id` in ( SELECT {game}.event."#account_id" FROM {game}.event WHERE {dt_where} GROUP BY {game}.event."#account_id" ORDER BY round(sum(if({game}.event."#event_name" = 'pay', {game}.event."unitPrice", 0)) / 100,2) AS values desc LIMIT 100) GROUP BY toDate(addHours({game}.event."#event_time", 8)),{game}.event."#account_id" ORDER BY date LIMIT 100000""" sqls.append({'sql': sql, 'group_label': self.group_label, 'groupby': [i.key for i in self.groupby], 'date_range': self.date_range, 'event_name': event_name_display or event_name, 'format': format, 'report_name': self.report_name or 'temp', 'time_particle': self.time_particle, 'start_date': self.start_date[:10], 'end_date': self.end_date[:10], 'is_show': is_show, }) return sqls # 在漏斗分析,事件分析模型里面都有用到这块 async def funnel_model_sql(self): """ SELECT level, count(*) AS values FROM (SELECT windowFunnel(86400)(shjy.event."#event_time", shjy.event."#event_name" = 'create_role', shjy.event."#event_name" = 'login') AS level FROM shjy.event WHERE addHours(shjy.event."#event_time", 8) >= '2021-05-16 00:00:00' AND addHours(shjy.event."#event_time", 8) <= '2021-06-14 23:59:59' GROUP BY shjy.event."#account_id") AS anon_1 GROUP BY level ORDER BY level :return: """ windows_gap = self.event_view['windows_gap'] * 86400 event_time_col = getattr(self.event_tbl.c, '#event_time') event_name_col = getattr(self.event_tbl.c, '#event_name') date_col = func.toStartOfDay(func.addHours(event_time_col, self.zone_time)).label('date') e_account_id_col = getattr(self.event_tbl.c, '#account_id') sub_group = [date_col, *self.groupby, e_account_id_col] conds = [] cond_level = [] for item in self.events: event_filter, _ = await self.handler_filts((item['filts'], item.get('relation', 'and')) , self.ext_filters) conds.append( and_(event_name_col == item['eventName'], *event_filter) ) cond_level.append(item['eventName']) # todo 替换 _windows_gap_ subq = sa.select(*[sa.Column(i.key) for i in self.groupby], date_col, 
            func.windowFunnel_windows_gap__(event_time_col, *conds).label('level')).select_from(
            self.event_tbl)
        # Global filters (plus ext filters) also restrict the funnel sub-query.
        g_event_filter, _ = await self.handler_filts((self.global_filters, self.global_relation)
                                                     , self.ext_filters)
        where = [
            # Shift raw event time into the project's zone before range comparison.
            func.addHours(event_time_col, self.zone_time) >= self.start_date,
            func.addHours(event_time_col, self.zone_time) <= self.end_date,
            *g_event_filter
        ]
        subq = subq.where(and_(*where)).group_by(*sub_group)
        subq = subq.subquery()

        # Outer query: count users per (date, group..., funnel level), keeping only level > 0.
        qry = sa.select(sa.Column('date'), *[sa.Column(i.key) for i in self.groupby], sa.Column('level'),
                        func.count().label('values')).select_from(subq) \
            .where(sa.Column('level') > 0) \
            .group_by(sa.Column('date'), *[sa.Column(i.key) for i in self.groupby], sa.Column('level')) \
            .order_by(sa.Column('date'), *[sa.Column(i.key) for i in self.groupby], sa.Column('level'))
        sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
        # The '_windows_gap_' placeholder is rewritten into the real ClickHouse
        # windowFunnel(<gap>) parameter list after compilation.
        # sql = sql.replace('_windows_gap_', f"({windows_gap},'strict_increase')")
        sql = sql.replace('_windows_gap_', f"({windows_gap})")
        print(sql)
        return {'sql': sql,
                'group_label': self.group_label,
                'groupby': [i.key for i in self.groupby],
                'date_range': self.date_range,
                'cond_level': cond_level,
                'time_particle': self.time_particle,
                'start_date': self.start_date[:10],
                'end_date': self.end_date[:10],
                }

    async def scatter_model_sql(self) -> dict:
        """Build the SQL for distribution (scatter) analysis.

        Uses the first configured event; depending on ``analysis`` it either
        counts per-user occurrences (times/days/hours/...) or aggregates a
        quota attribute with a ClickHouse aggregate from ``settings.CK_FUNC``.
        Returns a dict with the compiled SQL plus metadata the caller needs
        to post-process the result.
        """
        event = self.events[0]
        event_name = event['eventName']
        analysis = event['analysis']
        # List-valued distinct analyses are reduced with max().
        if analysis in ['list_distinct', "set_distinct", "ele_distinct"]:
            analysis = 'max'
        e_account_id_col = getattr(self.event_tbl.c, '#account_id').label('uid')
        u_account_id_col = getattr(self.user_tbl.c, '#account_id')
        event_name_col = getattr(self.event_tbl.c, '#event_name')
        event_time_col = getattr(self.event_tbl.c, '#event_time').label('date')
        event_date_col = settings.TIME_GRAIN_EXPRESSIONS[self.time_particle](event_time_col, self.zone_time)
        quota_interval_arr = event.get('quotaIntervalArr')
        time = self.data_in.time
        # NOTE(review): `where` is a module-level global shared across calls —
        # presumably a workaround so other code can see it; confirm there is
        # no concurrent use, otherwise this is a race.
        global where
        # If this is the group-detail drill-down of a scatter report, narrow
        # the time range to the clicked bucket; every other case uses the
        # report's full [start_date, end_date] range (the final else).
        if time != None and time != '合计':
            timeParticleSize = self.event_view.get('timeParticleSize')
            # Pick different boundaries for weekly / monthly / other grouping.
            if timeParticleSize == 'P1W':  # weekly bucket
                start_date, end_date = get_week(time)
                if start_date < strptime(self.start_date):  # first (leading) period
                    where = [
                        func.addHours(event_time_col, self.zone_time) >= self.start_date,
                        func.addHours(event_time_col, self.zone_time) <= end_date,
                    ]
                elif end_date < strptime(self.end_date):  # middle periods
                    where = [
                        func.addHours(event_time_col, self.zone_time) >= start_date,
                        func.addHours(event_time_col, self.zone_time) <= end_date,
                    ]
                else:  # last (trailing) period
                    where = [
                        func.addHours(event_time_col, self.zone_time) >= start_date,
                        func.addHours(event_time_col, self.zone_time) <= self.end_date,
                    ]
            elif timeParticleSize == 'P1M':  # monthly bucket
                start_date, end_date = start_end_month(time)
                if strptime(self.start_date) > strptime1(time):
                    where = [
                        func.addHours(event_time_col, self.zone_time) >= self.start_date,
                        func.addHours(event_time_col, self.zone_time) <= end_date,
                    ]
                else:
                    where = [
                        func.addHours(event_time_col, self.zone_time) >= start_date,
                        func.addHours(event_time_col, self.zone_time) <= self.end_date,
                    ]
            else:
                where = [
                    func.addHours(event_time_col, self.zone_time) >= self.start_date,
                    func.addHours(event_time_col, self.zone_time) <= self.end_date,
                ]
        else:
            where = [
                func.addHours(event_time_col, self.zone_time) >= self.start_date,
                func.addHours(event_time_col, self.zone_time) <= self.end_date,
            ]
        # '*' means "any event": skip the event-name filter.
        if event_name != '*':
            where.append(event_name_col == event_name)
        event_filter, user_filter = await self.handler_filts((event['filts'], event.get('relation', 'and')),
                                                             (self.global_filters, self.global_relation)
                                                             , self.ext_filters)
        # User-table filters are applied through a semi-join on account id.
        if user_filter:
            where.append(e_account_id_col.in_(sa.select(u_account_id_col).where(*user_filter)))
        where.extend(event_filter)
        values_col = func.count().label('values')
        if analysis in ['number_of_days', 'number_of_hours']:
            values_col = func.count(func.distinct(e_account_id_col)).label('values')
        if analysis in ['times', 'number_of_days', 'number_of_hours', 'sum', 'avg',
                        'median', 'max', 'min', 'distinct_count']:
            # Per-user occurrence counts, optionally bucketed by time grain.
            if self.time_particle == 'total':
                qry = sa.select(*self.groupby, values_col) \
                    .where(and_(*where)) \
                    .group_by(*self.groupby, e_account_id_col)
            else:
                qry = sa.select(event_date_col, *self.groupby, values_col) \
                    .where(and_(*where)) \
                    .group_by(event_date_col, *self.groupby, e_account_id_col)
            sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
            print(sql)
            return {
                'sql': sql,
                'group_label': self.group_label,
                'interval_type': event['intervalType'],
                'analysis': analysis,
                'quota_interval_arr': quota_interval_arr,
                'groupby': [i.key for i in self.groupby],
                'time_particle': self.time_particle,
                'start_date': self.start_date[:10],
                'end_date': self.end_date[:10],
            }
        elif event.get('quota'):
            # Aggregate a numeric quota attribute per user.
            event_attr_col = getattr(self.event_tbl.c, event['quota'])
            if self.time_particle == 'total':
                if analysis == 'uniqExact':
                    # distinct count, totals view
                    qry = sa.select(e_account_id_col, event_attr_col.label('values')) \
                        .where(and_(*where)) \
                        .group_by(*self.groupby, e_account_id_col, event_attr_col)
                else:
                    qry = sa.select(e_account_id_col,
                                    settings.CK_FUNC[analysis](event_attr_col).label('values')) \
                        .where(and_(*where)) \
                        .group_by(*self.groupby, e_account_id_col)
            else:
                if analysis == 'uniqExact':
                    # distinct count
                    qry = sa.select(event_date_col, e_account_id_col, event_attr_col.label('values')) \
                        .where(and_(*where)) \
                        .group_by(event_date_col, e_account_id_col, event_attr_col)
                else:
                    qry = sa.select(event_date_col, e_account_id_col,
                                    settings.CK_FUNC[analysis](event_attr_col).label('values')) \
                        .where(and_(*where)) \
                        .group_by(event_date_col, e_account_id_col)
            sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
            # When a user-label column is requested, splice it into the
            # compiled SQL text (prepend to SELECT, append to GROUP BY).
            columnName = event.get('label_id', '')
            if columnName != '':
                sql = sql.replace('SELECT', f'SELECT {columnName},', 1)
                sql += f',{columnName}'
            print(sql)
            return {
                'sql': sql,
                'group_label': self.group_label,
                'interval_type': event['intervalType'],
                'analysis': analysis,
                'quota_interval_arr': quota_interval_arr,
                'groupby': [i.key for i in self.groupby],
                'time_particle': self.time_particle,
                'start_date': self.start_date[:10],
                'end_date': self.end_date[:10],
            }

    async def first_event_model_sql(self) -> dict:
        """Build SQL measuring the gap between a user's initial event and a
        first-time follow-up event (events[0] -> events[1]), grouped by the
        dateDiff bucket chosen via ``event_diff`` in the event view.
        """
        event_start = self.events[0]  # initial event
        event_end = self.events[1]  # queried (follow-up) event
        start_event_name = event_start.get('eventName', 'create_account')
        end_event_name = event_end.get('eventName', 'pay')
        # Maps the view's granularity code to a ClickHouse dateDiff unit.
        # NOTE(review): 'P1M'/'P1D'/'PT1H' all map to 'year' — looks
        # suspicious; confirm this is intentional.
        diff_dict = {
            'year': 'year',
            'P1M': 'year',
            'P1D': 'year',
            'PT1H': 'year',
            'PT1M': 'minute',
            'second': 'second'
        }
        view_diff = self.event_view.get('event_diff', 'PT1M')
        # granularity choice: 'year','month','day','hour','minute','second'
        sql_diff = diff_dict.get(view_diff, 'minute')
        # end_where = event_end.get('filts', [])
        # end_where_str = ''
        # if end_where:
        #     for i in end_where:
        """
        /*目前版本为clickhouse 20.12.5.14
        * dateDiff函数是将两个日期相减后得到精确的的时间差值,后上取整
        * */
        SELECT /*计算 年份 这里两个日期差值大于0年但小于1年 结果为1*/
               dateDiff('year', toDateTime('2020-11-01 00:00:00'), toDateTime('2021-02-01 00:00:30')) AS year,

               /*计算 月份 这里两个日期差值大于1个月但未到2个月 结果为2*/
               dateDiff('month', toDateTime('2021-01-21 00:00:00'), toDateTime('2021-03-02 00:00:30')) AS month,

               /*计算 天 这里两个日期差值大于0天但未到1天 结果为1*/
               dateDiff('day', toDateTime('2021-01-01 10:00:00'), toDateTime('2021-01-02 01:00:30')) AS day,

               /*计算 小时 这里两个日期差值大于1小时但未到1小时 结果为2*/
               dateDiff('hour', toDateTime('2021-01-01 00:40:00'), toDateTime('2021-01-01 02:10:30')) AS hour,

               /*计算 分钟 这里两个日期差值大于1分钟但未到2分钟 结果为2*/
               dateDiff('minute', toDateTime('2021-01-01 10:10:40'), toDateTime('2021-01-01 10:12:12')) AS minute,

               /*计算 毫秒 这里两个日期差值 结果为-10*/
               dateDiff('second', toDateTime('2021-01-01 00:00:40'), toDateTime('2021-01-01 00:00:30')) AS second;
        """
        # TODO: optimize (self-join over the whole event table)
        sql = f"""select dateDiff({sql_diff}, a.`#event_time`, b.`#event_time`) as dff_time, count() as values
from(select `#event_time`, `#account_id` from {self.game}.event where `#event_name` == '{start_event_name}') a,
(select `#event_time`, `#account_id` from {self.game}.event where `#event_name` == '{end_event_name}' and addHours(`#event_time`,
{self.zone_time}) >= '{self.start_date}' and addHours(`#event_time`, {self.zone_time}) <= '{self.end_date}' and islishishouci == 1) b
where a.`#account_id` == b.`#account_id` group by dff_time
"""
        print(sql)
        return {
            'sql': sql,
            'start_date': self.start_date[:10],
            'end_date': self.end_date[:10]
        }

    async def guide_model_sql(self) -> dict:
        """Build SQL for guided-step (event step) analysis.

        Same preamble as :meth:`scatter_model_sql`; when ``analysis`` names a
        step column, the compiled SQL is post-edited by text replacement to
        also select/group by ``#account_id`` and the step column.
        """
        event = self.events[0]
        event_name = event['eventName']
        analysis = event['analysis']
        if analysis in ['list_distinct', "set_distinct", "ele_distinct"]:
            analysis = 'max'
        e_account_id_col = getattr(self.event_tbl.c, '#account_id').label('uid')
        u_account_id_col = getattr(self.user_tbl.c, '#account_id')
        event_name_col = getattr(self.event_tbl.c, '#event_name')
        event_time_col = getattr(self.event_tbl.c, '#event_time').label('date')
        event_date_col = settings.TIME_GRAIN_EXPRESSIONS[self.time_particle](event_time_col, self.zone_time)
        quota_interval_arr = event.get('quotaIntervalArr')
        time = self.data_in.time
        # NOTE(review): shares the module-level `where` global with
        # scatter_model_sql — see the note there.
        global where
        # Group-detail drill-down narrows the time range; other cases use the
        # full report range (final else).
        if time != None and time != '合计':
            timeParticleSize = self.event_view.get('timeParticleSize')
            # Different boundaries for weekly / monthly / other grouping.
            if timeParticleSize == 'P1W':  # weekly bucket
                start_date, end_date = get_week(time)
                if start_date < strptime(self.start_date):  # first (leading) period
                    where = [
                        func.addHours(event_time_col, self.zone_time) >= self.start_date,
                        func.addHours(event_time_col, self.zone_time) <= end_date,
                    ]
                elif end_date < strptime(self.end_date):  # middle periods
                    where = [
                        func.addHours(event_time_col, self.zone_time) >= start_date,
                        func.addHours(event_time_col, self.zone_time) <= end_date,
                    ]
                else:  # last (trailing) period
                    where = [
                        func.addHours(event_time_col, self.zone_time) >= start_date,
                        func.addHours(event_time_col, self.zone_time) <= self.end_date,
                    ]
            elif timeParticleSize == 'P1M':  # monthly bucket
                start_date, end_date = start_end_month(time)
                if strptime(self.start_date) > strptime1(time):
                    where = [
                        func.addHours(event_time_col, self.zone_time) >= self.start_date,
                        func.addHours(event_time_col, self.zone_time) <= end_date,
                    ]
                else:
                    where = [
                        func.addHours(event_time_col, self.zone_time) >= start_date,
                        func.addHours(event_time_col, self.zone_time) <= self.end_date,
                    ]
            else:
                where = [
                    func.addHours(event_time_col, self.zone_time) >= self.start_date,
                    func.addHours(event_time_col, self.zone_time) <= self.end_date,
                ]
        else:
            where = [
                func.addHours(event_time_col, self.zone_time) >= self.start_date,
                func.addHours(event_time_col, self.zone_time) <= self.end_date,
            ]
        if event_name != '*':
            where.append(event_name_col == event_name)
        event_filter, user_filter = await self.handler_filts((event['filts'], event.get('relation', 'and')),
                                                             (self.global_filters, self.global_relation)
                                                             , self.ext_filters)
        if user_filter:
            where.append(e_account_id_col.in_(sa.select(u_account_id_col).where(*user_filter)))
        where.extend(event_filter)
        values_col = func.count().label('values')
        if analysis in ['number_of_days', 'number_of_hours']:
            values_col = func.count(func.distinct(e_account_id_col)).label('values')
        if analysis:
            # NOTE(review): `analysis` (a string) is passed straight into
            # sa.select / group_by here — presumably a raw column name;
            # confirm it is validated upstream.
            if self.time_particle == 'total':
                qry = sa.select(*self.groupby, analysis, values_col) \
                    .where(and_(*where)) \
                    .group_by(*self.groupby, analysis, e_account_id_col)
            else:
                qry = sa.select(event_date_col, *self.groupby, values_col) \
                    .where(and_(*where)) \
                    .group_by(event_date_col, *self.groupby, e_account_id_col)
            sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
            print(sql)
            # Text-splice the account id and step column into the compiled
            # SQL (first SELECT / GROUP BY / WHERE occurrences only).
            sqla = sql.replace('SELECT', f'SELECT `#account_id`, {analysis}, ', 1)
            sqlb = sqla.replace('GROUP BY', f'GROUP BY `#account_id`, {analysis}, ', 1)
            sqlc = sqlb.replace('WHERE', f'WHERE {analysis} is not null AND ', 1)
            print(sqlc)
            return {
                'sql': sqlc,
                'group_label': self.group_label,
                'interval_type': event['intervalType'],
                'analysis': analysis,
                'quota_interval_arr': quota_interval_arr,
                'groupby': [i.key for i in self.groupby],
                'time_particle': self.time_particle,
                'start_date': self.start_date[:10],
                'end_date': self.end_date[:10],
            }
        elif event.get('quota'):
            event_attr_col = getattr(self.event_tbl.c, event['quota'])
            if self.time_particle == 'total':
                if analysis == 'uniqExact':
                    # distinct count, totals view
                    qry = sa.select(e_account_id_col, event_attr_col.label('values')) \
                        .where(and_(*where)) \
                        .group_by(*self.groupby, e_account_id_col, event_attr_col)
                else:
                    qry = sa.select(e_account_id_col,
                                    settings.CK_FUNC[analysis](event_attr_col).label('values')) \
                        .where(and_(*where)) \
                        .group_by(*self.groupby, e_account_id_col)
            else:
                if analysis == 'uniqExact':
                    # distinct count
                    qry = sa.select(event_date_col, e_account_id_col, event_attr_col.label('values')) \
                        .where(and_(*where)) \
                        .group_by(event_date_col, e_account_id_col, event_attr_col)
                else:
                    qry = sa.select(event_date_col, e_account_id_col,
                                    settings.CK_FUNC[analysis](event_attr_col).label('values')) \
                        .where(and_(*where)) \
                        .group_by(event_date_col, e_account_id_col)
            sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
            columnName = event.get('label_id', '')
            if columnName != '':
                sql = sql.replace('SELECT', f'SELECT {columnName},', 1)
                sql += f',{columnName}'
            print(sql)
            return {
                'sql': sql,
                'group_label': self.group_label,
                'interval_type': event['intervalType'],
                'analysis': analysis,
                'quota_interval_arr': quota_interval_arr,
                'groupby': [i.key for i in self.groupby],
                'time_particle': self.time_particle,
                'start_date': self.start_date[:10],
                'end_date': self.end_date[:10],
            }

    async def trace_model_sql(self) -> dict:
        """Build SQL for path (trace) analysis.

        Sessions are split whenever the gap between consecutive events
        exceeds ``session_interval`` (converted to seconds). Depending on
        ``source_type`` the chain is anchored forward from a source event
        (sql_a) or backward to an end event (sql_b).
        """
        session_interval = self.event_view.get('session_interval')
        session_type = self.event_view.get('session_type')
        session_type_map = {
            'minute': 60,
            'second': 1,
            'hour': 3600
        }
        # Session gap threshold in seconds (defaults to minutes).
        interval_ts = session_interval * session_type_map.get(session_type, 60)
        event_names = self.events.get('event_names')
        source_event = self.events.get('source_event', {}).get('eventName')
        source_type = self.events.get('source_event', {}).get('source_type')
        filters = self.events.get('user_filter', {}).get('filts', [])
        event_filter, user_filter = await self.handler_trace_filts(filters)
        # Compile the filter expressions alone and scrape the WHERE clause
        # text out, to be spliced into the raw ClickHouse SQL below.
        where_a = '1'
        if event_filter:
            qry = sa.select().where(*event_filter)
            sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
            where_a = 'WHERE '.join(sql.split('WHERE ')[1:])
        where_b = '1'
        if user_filter:
            qry = sa.select().where(*user_filter)
            sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
            where_b = sql.split('WHERE ')[1]
        sql_a = f"""with
'{source_event}' as start_event,
{tuple(event_names)} as evnet_all,
'{self.start_date}' as start_data,
'{self.end_date}' as end_data
select event_chain,`#account_id`,
       count() as values
from (with
          toUInt32(minIf(`#event_time`, `#event_name` = start_event)) AS start_event_ts,
          arraySort(
                  x -> x.1,
                  arrayFilter(
                          x -> x.1 >= start_event_ts,
                          groupArray((toUInt32(`#event_time`), `#event_name`))
                      )
              ) AS sorted_events,
          arrayEnumerate(sorted_events) AS event_idxs,
          arrayFilter(
                  (x, y, z) -> z.1 >= start_event_ts and ((z.2 = start_event and y > {interval_ts}) or y > {interval_ts}) ,
                  event_idxs,
                  arrayDifference(sorted_events.1),
                  sorted_events
              ) AS gap_idxs,
          arrayMap(x -> x, gap_idxs) AS gap_idxs_,
          arrayMap(x -> if(has(gap_idxs_, x), 1, 0), event_idxs) AS gap_masks,
          arraySplit((x, y) -> y, sorted_events, gap_masks) AS split_events
      select `#account_id`,
             arrayJoin(split_events)          AS event_chain_,
             arrayMap(x -> x.2, event_chain_) AS event_chain,
             has(event_chain, start_event)    AS has_midway_hit
      from (select `#event_time`, `#event_name`, `#account_id`
            from {self.game}.event
            where addHours(`#event_time`, {self.zone_time}) >= start_data
              and addHours(`#event_time`, {self.zone_time}) <= end_data
              and `#event_name` in evnet_all and {where_a})
      group by `#account_id`
      HAVING has_midway_hit = 1
         )
where arrayElement(event_chain, 1) = start_event
  and "#account_id" IN (SELECT "#account_id" FROM (SELECT "#account_id" FROM {self.game}.user_view WHERE {where_b}) AS anon_b)
GROUP BY event_chain,`#account_id`
ORDER BY values desc
"""
        sql_b = f"""with
'{source_event}' as end_event,
{tuple(event_names)} as evnet_all,
'{self.start_date}' as start_data,
'{self.end_date}' as end_data
select event_chain,`#account_id`,
       count() as values
from (with
          toUInt32(maxIf(`#event_time`, `#event_name` = end_event)) AS end_event_ts,
          arraySort(
                  x -> x.1,
                  arrayFilter(
                          x -> x.1 <= end_event_ts,
                          groupArray((toUInt32(`#event_time`), `#event_name`))
                      )
              ) AS sorted_events,
          arrayEnumerate(sorted_events) AS event_idxs,
          arrayFilter(
                  (x, y, z) -> z.1 <= end_event_ts and (z.2 = end_event and y>{interval_ts}) OR y > {interval_ts},
                  event_idxs,
                  arrayDifference(sorted_events.1),
                  sorted_events
              ) AS gap_idxs,
          arrayMap(x -> x+1, gap_idxs) AS gap_idxs_,
          arrayMap(x -> if(has(gap_idxs_, x), 1,0), event_idxs) AS gap_masks,
          arraySplit((x, y) -> y, sorted_events, gap_masks) AS split_events
      select `#account_id`,
             arrayJoin(split_events)          AS event_chain_,
             arrayMap(x -> x.2, event_chain_) AS event_chain,
             has(event_chain, end_event)      AS has_midway_hit
      from (select `#event_time`, `#event_name`, `#account_id`
            from {self.game}.event
            where addHours(`#event_time`, {self.zone_time}) >= start_data
              and addHours(`#event_time`, {self.zone_time}) <= end_data
              and `#event_name` in evnet_all and {where_a})
      group by `#account_id`
      HAVING has_midway_hit = 1
         )
where arrayElement(event_chain, -1) = end_event
  and "#account_id" IN (SELECT "#account_id" FROM (SELECT "#account_id" FROM {self.game}.user_view WHERE {where_b}) AS anon_b)
GROUP BY event_chain,`#account_id`
ORDER BY values desc"""
        # 'initial_event' anchors chains forward from the source event;
        # otherwise chains end at the (end) event.
        sql = sql_a if source_type == 'initial_event' else sql_b
        print(sql)
        return {
            'sql': sql,
            'time_particle': self.time_particle,
            'start_date': self.start_date[:10],
            'end_date': self.end_date[:10],
        }

    async def retention_model_sql2(self) -> dict:
        """Build SQL for retention analysis.

        events[0] is the initial event, events[1] the return-visit event.
        For each registration date it computes cnt0 (cohort size) and, for
        every offset in ``retention_n``, retained count / rate and churned
        count / rate.
        """
        filter_item_type = self.event_view.get('filter_item_type')
        filter_item = self.event_view.get('filter_item')
        event_name_a = self.events[0]['eventName']  # initial event name
        event_name_b = self.events[1]['eventName']  # return-visit event name
        visit_name = self.events[0].get('event_attr_id')
        where, _ = await self.handler_filts((self.events[0]['filts'], self.events[0].get('relation', 'and')),
                                            (self.global_filters, self.global_relation)
                                            , self.ext_filters)
        # Scrape compiled WHERE text for splicing into the raw SQL below.
        where_a = '1'
        if where:
            qry = sa.select().where(*where)
            sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
            where_a = 'WHERE '.join(sql.split('WHERE ')[1:])
        where, _ = await self.handler_filts((self.events[1]['filts'], self.events[1].get('relation', 'and')),
                                            (self.global_filters, self.global_relation)
                                            , self.ext_filters)
        where_b = '1'
        if where:
            qry = sa.select().where(*where)
            sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
            where_b = sql.split('WHERE ')[1]
        # '*' means any event counts as a return visit (constant-true 1).
        event_name_b = 1 if event_name_b == '*' else f"`#event_name` = '{event_name_b}'"
        # NOTE(review): `days` is computed but never used below.
        days = (arrow.get(self.end_date).date() - arrow.get(self.start_date).date()).days
        keep = []
        cnt = []
        # Daily offsets 1-59, then sparse checkpoints up to day 359.
        retention_n = [*[k for k in range(1, 60)], 70 - 1, 75 - 1, 80 - 1, 85 - 1, 90 - 1, 95 - 1, 100 - 1, 110 - 1,
                       120 - 1, 150 - 1, 180 - 1, 210 - 1, 240 - 1, 270 - 1, 300 - 1, 360 - 1]
        """
        cnt0-cnt1 as on1,
        round(on1 * 100 / cnt0, 2) as `0p1`,
        """
        for i in retention_n:
            keep.append(
                f"""cnt{i},
round(cnt{i} * 100 / cnt0, 2) as `p{i}`,
cnt0-cnt{i} as on{i},
round(on{i} * 100 / cnt0, 2) as `op{i}`
""")
            cnt.append(f"""sum(if(dateDiff('day',a.reg_date,b.visit_date)={i},1,0)) as cnt{i}""")
        keep_str = ','.join(keep)
        cnt_str = ','.join(cnt)
        # NOTE(review): `visit_name` may be None (event_attr_id missing);
        # the `in` check below would then raise — confirm callers always set it.
        if "pay" in event_name_a and "distinct_id" in visit_name:
            # Device-based pay retention: restrict the cohort to new devices
            # via an inner join on the account-creation/new-device event.
            if "huixie" in self.game:
                check_event = "new_device"
            else:
                check_event = "create_account"
            # NOTE(review): this branch hard-codes addHours(..., 8) instead of
            # {self.zone_time} — looks like a UTC+8 assumption; confirm.
            sql = f"""
with '{event_name_a}' as start_event,
{event_name_b} as retuen_visit,
`{visit_name}` as visit,
'{self.start_date}' as start_data,
'{self.end_date}' as end_data,
toDate(addHours(`#event_time`, {self.zone_time})) as date
select reg_date,
       cnt0 ,
       {keep_str}
from(select p.reg_date, uniqExact(p.visit) as cnt0 from (
select date as reg_date,visit from {self.game}.event
where `#event_name` = start_event and addHours(`#event_time`, {self.zone_time}) >= start_data and addHours(`#event_time`, {self.zone_time}) <= end_data
group by reg_date,visit
) p
inner join(SELECT date as reg_date,visit FROM {self.game}.event WHERE addHours({self.game}.event."#event_time", 8) >=start_data AND addHours({self.game}.event."#event_time", 8) <=end_data AND {self.game}.event."#event_name" = '{check_event}' AND {self.game}.event.is_new_device = '1')d
on p.visit = d.visit and p.reg_date = d.reg_date
group by reg_date ) reg
left join
(select a.reg_date,
        {cnt_str}
 from (select p.* from (select date as reg_date, visit
       from {self.game}.event
       where `#event_name` = start_event
         and addHours(`#event_time`, {self.zone_time}) >= start_data
         and addHours(`#event_time`, {self.zone_time}) <= end_data
       group by reg_date, visit) p
       inner join (SELECT date as reg_date,visit FROM {self.game}.event WHERE addHours({self.game}.event."#event_time", 8) >=start_data AND addHours({self.game}.event."#event_time", 8) <=end_data AND {self.game}.event."#event_name" = '{check_event}' AND {self.game}.event.is_new_device = '1') d
       on p.visit = d.visit and p.reg_date = d.reg_date) a
          left join (select date as visit_date, visit
                     from {self.game}.event
                     where retuen_visit
                       and addHours(`#event_time`, {self.zone_time}) >= start_data
                     group by visit_date, visit) b on a.visit = b.visit
 group by a.reg_date) log on reg.reg_date=log.reg_date
"""
            print(sql)
        else:
            sql = f"""
with '{event_name_a}' as start_event,
{event_name_b} as retuen_visit,
`{visit_name}` as visit,
'{self.start_date}' as start_data,
'{self.end_date}' as end_data,
toDate(addHours(`#event_time`, {self.zone_time})) as date
select reg_date,
       cnt0 ,
       {keep_str}
from(select date, uniqExact(visit) as cnt0
     from {self.game}.event
     where `#event_name` = start_event
       and addHours(`#event_time`, {self.zone_time}) >= start_data
       and addHours(`#event_time`, {self.zone_time}) <= end_data
       and {where_a}
     group by date) reg
left join
(select a.reg_date,
        {cnt_str}
 from (select date as reg_date, visit
       from {self.game}.event
       where `#event_name` = start_event
         and addHours(`#event_time`, {self.zone_time}) >= start_data
         and addHours(`#event_time`, {self.zone_time}) <= end_data
         and {where_a}
       group by reg_date, visit) a
          left join (select date as visit_date, visit
                     from {self.game}.event
                     where retuen_visit
                       and addHours(`#event_time`, {self.zone_time}) >= start_data
                     group by visit_date, visit) b on a.visit = b.visit
 group by a.reg_date) log on reg.date=log.reg_date
"""
            print(sql)
        return {
            'sql': sql,
            'date_range': self.date_range,
            'unit_num': self.unit_num,
            'retention_n': retention_n,
            'filter_item_type': filter_item_type,
            'filter_item': filter_item,
            'time_particle': self.time_particle,
            'start_date': self.start_date[:10],
            'end_date': self.end_date[:10],
        }

    async def retention_model_sql3(self) -> dict:
        """Build SQL for retention analysis with group-by detail.

        Like :meth:`retention_model_sql2` but additionally grouped by the
        event-view's non-user-label group-by columns (user-label columns are
        filtered out and handled elsewhere).
        """
        filter_item_type = self.event_view.get('filter_item_type')
        filter_item = self.event_view.get('filter_item')
        event_name_a = self.events[0]['eventName']  # initial event name
        event_name_b = self.events[1]['eventName']  # return-visit event name
        groupby_list = self.event_view.get('groupBy')
        # Keep only base (non user-label) group-by columns.
        groupby = [i['columnName'] for i in groupby_list if i['tableType'] != 'user_label']
        visit_name = self.events[0].get('event_attr_id')
        where, _ = await self.handler_filts((self.events[0]['filts'], self.events[0].get('relation', 'and')),
                                            (self.global_filters, self.global_relation)
                                            , self.ext_filters)
        where_a = '1'
        if where:
            qry = sa.select().where(*where)
            sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
            where_a = 'WHERE '.join(sql.split('WHERE ')[1:])
        # '*' means any event counts as a return visit (constant-true 1).
        event_name_b = 1 if event_name_b == '*' else f"`#event_name` = '{event_name_b}'"
        # NOTE(review): `days` is computed but never used below.
        days = (arrow.get(self.end_date).date() - arrow.get(self.start_date).date()).days
        keep = []
        cnt = []
        # Daily offsets 1-59, then sparse checkpoints up to day 359.
        retention_n = [*[k for k in range(1, 60)], 70 - 1, 75 - 1, 80 - 1, 85 - 1, 90 - 1, 95 - 1, 100 - 1, 110 - 1,
                       120 - 1, 150 - 1, 180 - 1, 210 - 1, 240 - 1, 270 - 1, 300 - 1, 360 - 1]
        """
        cnt0-cnt1 as on1,
        round(on1 * 100 / cnt0, 2) as `0p1`,
        """
        for i in retention_n:
            keep.append(
                f"""cnt{i},
round(cnt{i} * 100 / cnt0, 2) as `p{i}`,
cnt0-cnt{i} as on{i},
round(on{i} * 100 / cnt0, 2) as `op{i}`
""")
            cnt.append(f"""sum(if(dateDiff('day',a.reg_date,b.visit_date)={i},1,0)) as cnt{i}""")
        keep_str = ','.join(keep)
        cnt_str = ','.join(cnt)
        if len(groupby) > 0:
            # Quote each group-by column and build the extra join condition.
            groupbys = ','.join([f"`{i}`" for i in groupby])
            groupby_on = ' and '.join([f"reg.{ii} = log.{ii}" for ii in [f"`{i}`" for i in groupby]])
            sql = f"""
with '{event_name_a}' as start_event,
{event_name_b} as retuen_visit,
`{visit_name}` as visit,
'{self.start_date}' as start_data,
'{self.end_date}' as end_data,
toDate(addHours(`#event_time`, {self.zone_time})) as date
select reg_date,
       {groupbys},
       cnt0 ,
       {keep_str}
from(select date,{groupbys},uniqExact(visit) as cnt0
     from {self.game}.event
     where `#event_name` = start_event
       and addHours(`#event_time`, {self.zone_time}) >= start_data
       and addHours(`#event_time`, {self.zone_time}) <= end_data
       and {where_a}
     group by date,{groupbys}) reg
left join
(select a.reg_date,{groupbys},
        {cnt_str}
 from (select {groupbys},date as reg_date, visit
       from {self.game}.event
       where `#event_name` = start_event
         and addHours(`#event_time`, {self.zone_time}) >= start_data
         and addHours(`#event_time`, {self.zone_time}) <= end_data
         and {where_a}
       group by reg_date, visit,{groupbys}) a
          left join (select date as visit_date, visit
                     from {self.game}.event
                     where retuen_visit
                       and addHours(`#event_time`, {self.zone_time}) >= start_data
                     group by visit_date, visit) b on a.visit = b.visit
 group by a.reg_date,{groupbys}) log on reg.date=log.reg_date and {groupby_on}
"""
        else:
            sql = f"""
with '{event_name_a}' as start_event,
{event_name_b} as retuen_visit,
`{visit_name}` as visit,
'{self.start_date}' as start_data,
'{self.end_date}' as end_data,
toDate(addHours(`#event_time`, {self.zone_time})) as date
select reg_date,
       cnt0 ,
       {keep_str}
from(select date, uniqExact(visit) as cnt0
     from {self.game}.event
     where `#event_name` = start_event
       and addHours(`#event_time`, {self.zone_time}) >= start_data
       and addHours(`#event_time`, {self.zone_time}) <= end_data
       and {where_a}
     group by date) reg
left join
(select a.reg_date,
        {cnt_str}
 from (select date as reg_date, visit
       from {self.game}.event
       where `#event_name` = start_event
         and addHours(`#event_time`, {self.zone_time}) >= start_data
         and addHours(`#event_time`, {self.zone_time}) <= end_data
         and {where_a}
       group by reg_date, visit) a
          left join (select date as visit_date, visit
                     from {self.game}.event
                     where retuen_visit
                       and addHours(`#event_time`, {self.zone_time}) >= start_data
                     group by visit_date, visit) b on a.visit = b.visit
 group by a.reg_date) log on reg.date=log.reg_date
"""
        print(sql)
        return {
            'sql': sql,
            'group_label': self.group_label,
            'date_range': self.date_range,
            'unit_num': self.unit_num,
            'retention_n': retention_n,
            'filter_item_type': filter_item_type,
            'filter_item': filter_item,
            'time_particle': self.time_particle,
            'start_date': self.start_date[:10],
            'end_date': self.end_date[:10],
        }