from typing import Tuple

import arrow
import sqlalchemy as sa
import json

from fastapi import Depends
import pandas as pd

from sqlalchemy import func, or_, and_, not_, MetaData

import crud
import schemas
from core.config import settings
from db import get_database
from models.user_label import UserClusterDef
from db.redisdb import get_redis_pool, RedisDrive


class XAnalysis:
    """Build the SQL text for an LTV (lifetime value) report of one game.

    Typical use: construct the object, ``await init()`` to resolve the query
    definition (from a saved report or from the request itself), then
    ``await ltv_model_sql()`` to obtain the SQL string plus metadata.

    The generated SQL uses functions such as ``uniqExact``, ``dateDiff`` and
    ``addHours`` -- presumably it targets ClickHouse; confirm against the
    code that executes it.
    """

    def __init__(self, data_in: schemas.CkQuery, game: str):
        """Store the request and initialise empty query state.

        :param data_in: query request; ``report_id``, ``eventView``,
            ``events`` and ``ext_filter`` are read from it.
        :param game: game identifier, used as the database name in the SQL.
        """
        self.data_in = data_in
        self.game = game
        self.event_view = dict()
        self.events = []
        self.zone_time: int = 0
        self.global_filters = []
        self.account_filters = []
        self.global_relation = 'and'
        self.date_range = []
        # (filter list, relation) pair taken from the request's ext_filter.
        self.ext_filters = (
            self.data_in.ext_filter.get('filts', []),
            self.data_in.ext_filter.get('relation', 'and'),
        )

    def _get_global_filters(self):
        """Return ``event_view['filts']`` (or ``[]``) with ``tableType`` set.

        Each filter dict is tagged in place: ``user_label`` filters get
        ``tableType = data_type``; every other filter gets
        ``tableType = table_type``.
        """
        filts = self.event_view.get('filts', [])
        for item in filts or []:
            if item['data_type'] == 'user_label':
                item['tableType'] = item['data_type']
            else:
                item['tableType'] = item['table_type']
        return filts

    async def init(self, *args, **kwargs):
        """Resolve the query definition and derived state.

        When ``data_in.report_id`` is set, the saved report is loaded and its
        start/end times are recomputed relative to today from ``s_days`` /
        ``e_days`` (or from the legacy ``recentDay`` value, split on ``'-'``).
        Otherwise the event view and events come straight from the request.

        :param kwargs: optional ``data_where`` mapping of game -> account
            filters; the entry for ``self.game`` is stored in
            ``self.account_filters``.
        """
        if self.data_in.report_id:
            db = get_database()
            report = await crud.report.get(db, id=self.data_in.report_id)
            self.event_view = report['query']['eventView']
            self.events = report['query']['events']
            try:
                e_days = self.event_view['e_days']
                s_days = self.event_view['s_days']
            except KeyError:
                # Backward compatibility: older reports only store 'recentDay'.
                e_days, s_days = self.event_view['recentDay'].split('-')

            today = arrow.get()
            self.event_view['endTime'] = today.shift(days=-int(e_days)).strftime('%Y-%m-%d 23:59:59')
            self.event_view['startTime'] = today.shift(days=-int(s_days)).strftime('%Y-%m-%d 00:00:00')
        else:
            self.event_view = self.data_in.eventView
            self.events = self.data_in.events

        # One date per day of the selected range (inclusive).
        self.date_range.extend(
            d.date()
            for d in pd.date_range(self.event_view['startTime'], self.event_view['endTime'], freq='D', tz='UTC')
        )

        self.global_filters = self._get_global_filters()
        self.global_relation = self.event_view.get('relation', 'and')

        # Filters attached to the user's own account/permissions.
        if 'data_where' in kwargs:
            self.account_filters = kwargs['data_where'].get(self.game, [])

    @staticmethod
    def _build_condition(col, comparator, ftv):
        """Translate one comparator + value list into a SQLAlchemy condition.

        Returns ``None`` for an unrecognised comparator so the caller can
        skip it.
        """
        if comparator == '==':
            if len(ftv) > 1:
                return or_(*[col == v for v in ftv])
            return col == ftv[0]
        if comparator == '>=':
            return col >= ftv[0]
        if comparator == '<=':
            return col <= ftv[0]
        if comparator == '>':
            return col > ftv[0]
        if comparator == '<':
            return col < ftv[0]
        if comparator == 'is not null':
            return col.isnot(None)
        if comparator == 'is null':
            return col.is_(None)
        if comparator == 'like':
            return col.like(f'%{ftv[0]}%')
        if comparator == 'not like':
            return col.notlike(f'%{ftv[0]}%')
        if comparator == 'in':
            return col.in_(ftv)
        if comparator == '!=':
            return col != ftv[0]
        return None

    async def handler_filts(self, *filters):
        """Convert filter groups into SQLAlchemy conditions on the event table.

        :param filters: any number of ``(filts, relation)`` pairs, where
            ``filts`` is a list of filter dicts (keys read: ``comparator``,
            ``tableType``, ``columnName``, ``ftv``) and ``relation`` is
            ``'and'``; any other value combines the group with OR.
        :return: list with one combined condition per group that produced at
            least one event-table condition.
        """
        event_filters = []
        for group in filters:
            filts, relation = group[0], group[1]
            # NOTE(review): conditions for tableType == 'user' are built but
            # never returned -- confirm this is intentional for this model.
            user_filter = []
            event_filter = []
            for item in filts:
                comparator = item['comparator']
                table_type = item['tableType']
                if table_type == 'user':
                    target = user_filter
                elif table_type == 'event':
                    target = event_filter
                elif table_type == 'user_label':
                    # A user label becomes an (NOT) IN sub-query on account id.
                    user_cluster_def = UserClusterDef(self.game, item['columnName'], self.account_filters)
                    await user_cluster_def.init()
                    sub_qry = user_cluster_def.to_sql_qry()
                    account_col = sa.Column('#account_id')
                    if comparator == 'in':
                        event_filter.append(account_col.in_(sub_qry))
                    else:
                        event_filter.append(account_col.notin_(sub_qry))
                    continue
                else:
                    continue

                col = sa.Column(item['columnName'])
                ftv = item['ftv']
                condition = self._build_condition(col, comparator, ftv)
                # SQLAlchemy clauses must not be truth-tested; compare to None.
                if condition is not None:
                    target.append(condition)

            if event_filter:
                combine = and_ if relation == 'and' else or_
                event_filters.append(combine(*event_filter))

        return event_filters

    async def ltv_model_sql(self):
        """Build the LTV query and return it together with its metadata.

        :return: dict with keys ``sql``, ``quota``, ``start_date``,
            ``end_date``, ``date_range`` and ``ltv_n`` (the list of day
            offsets for which an LTV column is produced).
        """
        quota = self.event_view['quota']

        # Day offsets with an LTV column: every day 1..60, then sparser steps.
        ltv_n = [*range(1, 61), 70, 75, 80, 85, 90, 95, 100, 110, 120, 150, 180, 210, 240, 270, 300, 360]
        select_ltv = []
        sumpay = []
        sum_money = []
        for i in ltv_n:
            # Shows '-' when the cohort is younger than the offset.
            select_ltv.append(
                f"if(dateDiff('day', reg.date, now())<{i - 1}, '-',toString(round(sumpay_{i} / cnt1, 2))) AS LTV{i}")
            sumpay.append(f"sum(if(dateDiff('day', a.date, b.date) < {i}, money, 0)) as sumpay_{i}")
            sum_money.append(f"sumpay_{i}")

        sumpay_str = ','.join(sumpay)
        select_ltv_str = ','.join(select_ltv)
        sum_money_str = ','.join(sum_money)

        # Date-range condition (plus new-device restriction) for the cohort.
        where = [
            sa.Column('date') >= self.event_view['startTime'].split(' ')[0],
            sa.Column('date') <= self.event_view['endTime'].split(' ')[0],
        ]
        if quota == '#distinct_id':
            where.append(sa.Column('is_new_device') == 1)
        cohort_sql = str(sa.select().where(*where).compile(compile_kwargs={"literal_binds": True}))
        where_str = cohort_sql.split('WHERE ')[1]

        where_order = await self.handler_filts((self.global_filters, self.global_relation))

        # '1' is an always-true placeholder when there are no global filters.
        where_order_str = '1'
        where_account_str = '1'
        if where_order:
            # The account condition is derived from the same filter list, so
            # the statement only needs to be compiled once.
            filter_sql = str(sa.select().where(*where_order).compile(compile_kwargs={"literal_binds": True}))
            pieces = filter_sql.split('WHERE ')
            where_order_str = 'WHERE '.join(pieces[1:])

            first_where = pieces[1]
            if "orderid" in first_where:
                # Filters mentioning orderid are not applied as the account condition.
                where_account_str = "1=1"
            else:
                # Only the text before the first 'AND' is kept.
                where_account_str = first_where.split('AND')[0]

        if self.game == 'huixie' and quota == '#distinct_id':
            event_n = 'new_device'
        else:
            event_n = 'create_account'

        # NOTE: the selected date range only defines the registration cohort;
        # the 'pay' sub-query is deliberately not restricted by it, so payments
        # are counted up to the current time.
        # NOTE(review): `quota` and `self.game` are interpolated into the SQL
        # text unescaped -- make sure callers validate them.
        sql = f"""SELECT reg.date as date,
       cnt1,
       {select_ltv_str},
       {sum_money_str}
FROM (SELECT toDate(addHours(`#event_time`, `#zone_offset`)) as date, uniqExact(`{quota}`) cnt1
      FROM {self.game}.event
      where `#event_name` = '{event_n}'
        AND {where_str} AND {where_account_str}
      GROUP BY toDate(addHours(`#event_time`, `#zone_offset`))) as reg
         left join
     (select a.date,
             {sumpay_str}
      from (SELECT toDate(addHours(`#event_time`, `#zone_offset`)) as date, `{quota}`
            FROM {self.game}.event
            where `#event_name` = '{event_n}'
              AND {where_str} AND {where_account_str} ) as a
               left join (select `{quota}`, unitPrice/100 as money, toDate(addHours(`#event_time`, `#zone_offset`)) as date
                          from {self.game}.event
                          where `#event_name` = 'pay'
                            and {where_order_str}
                            AND {where_account_str}) b
                         on a.`{quota}` = b.`{quota}`
      group by a.date) log on reg.date = log.date
      order by date
"""
        print(sql)
        return {'sql': sql, 'quota': quota,
                'start_date': self.event_view['startTime'][:10],
                'end_date': self.event_view['endTime'][:10],
                'date_range': self.date_range,
                'ltv_n': ltv_n
                }