From db2262ec0bf0936c18b100ab1be5480d2dd80940 Mon Sep 17 00:00:00 2001 From: wuaho Date: Tue, 27 Jul 2021 14:30:23 +0800 Subject: [PATCH] =?UTF-8?q?ltv=20=E6=A8=A1=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/api_v1/endpoints/xquery.py | 31 ++++++- models/x_analysis.py | 153 ++++++++++++++++++++++----------- 2 files changed, 130 insertions(+), 54 deletions(-) diff --git a/api/api_v1/endpoints/xquery.py b/api/api_v1/endpoints/xquery.py index 4304b21..af9be2c 100644 --- a/api/api_v1/endpoints/xquery.py +++ b/api/api_v1/endpoints/xquery.py @@ -20,13 +20,36 @@ from models.x_analysis import XAnalysis router = APIRouter() -@router.post("/overview") -async def event_model_sql( +@router.post("/ltv_model_sql") +async def ltv_model_sql( request: Request, game: str, analysis: XAnalysis = Depends(XAnalysis), current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> schemas.Msg: - """ 数据总览 """ - sql = analysis.to_sql() + """ ltv模型sql """ + await analysis.init() + sql = analysis.ltv_model_sql() return schemas.Msg(code=0, msg='ok', data=sql) + + +@router.post("/ltv_model") +async def ltv_model_sql( + request: Request, + game: str, + analysis: XAnalysis = Depends(XAnalysis), + ckdb: CKDrive = Depends(get_ck_db), + current_user: schemas.UserDB = Depends(deps.get_current_user) +) -> schemas.Msg: + """ ltv模型sql """ + await analysis.init() + sql = analysis.ltv_model_sql() + df = await ckdb.query_dataframe(sql) + df.fillna(0, inplace=True) + df.rename(columns={'date': '注册日期', 'cnt1': '角色数'}, inplace=True) + data = { + 'titel': df.columns.tolist(), + 'rows': df.values.tolist() + } + + return schemas.Msg(code=0, msg='ok', data=data) diff --git a/models/x_analysis.py b/models/x_analysis.py index 3317bea..30f4f3f 100644 --- a/models/x_analysis.py +++ b/models/x_analysis.py @@ -1,5 +1,6 @@ from typing import Tuple +import arrow import sqlalchemy as sa import json @@ -9,68 +10,120 @@ import pandas as pd from sqlalchemy import func, or_, and_, not_, MetaData +import crud import schemas from core.config import settings +from db import get_database from db.redisdb import get_redis_pool, RedisDrive class XAnalysis: - def __init__(self, data_in: schemas.Overview, game: str): + def __init__(self, data_in: schemas.CkQuery, game: str): self.data_in = data_in self.game = game + self.event_view = dict() + self.events = [] - def to_sql(self): - # 构建表 - metadata = sa.MetaData(schema=self.game) - tbl = sa.Table(f'overview', metadata, - sa.Column('date'), - sa.Column('#os'), - sa.Column('#bundle_id'), - sa.Column('owner_name'), - sa.Column('channel'), - sa.Column('active_num'), - sa.Column('new_account_num'), - sa.Column('money'), - sa.Column('pay_account_num'), - sa.Column('new_pay_account_num'), - sa.Column('new_money'), - ) - # bundle_id_col = sa.Column('#bundle_id') - # channel_col = sa.Column('channel') - # owner_name_col = sa.Column('owner_name') - # os_col = sa.Column('#os') - # date_col = sa.Column('date') - # active_num_col = sa.Column('active_num') - # new_account_num_col = sa.Column('new_account_num') + self.global_filters = [] + + def _get_global_filters(self): + return self.event_view.get('filts') or [] + + async def init(self): + if self.data_in.report_id: + db = get_database() + report = await crud.report.get(db, id=self.data_in.report_id) + self.event_view = report['query']['eventView'] + self.events = report['query']['events'] + e_days, s_days = self.event_view['recentDay'].split('-') + self.event_view['endTime'] = arrow.get().shift(days=-int(e_days)).strftime('%Y-%m-%d 23:59:59') + self.event_view['startTime'] = arrow.get().shift(days=-int(s_days)).strftime('%Y-%m-%d 00:00:00') + + + else: + self.event_view = self.data_in.eventView + self.events = self.data_in.events + + self.global_filters = self._get_global_filters() + + def handler_filts(self): + where = [] + for item in self.global_filters: + + col = sa.Column(item['columnName']) + + comparator = item['comparator'] + ftv = item['ftv'] + if comparator == '==': + if len(ftv) > 1: + where.append(or_(*[col == v for v in ftv])) + else: + where.append(col == ftv[0]) + elif comparator == '>=': + where.append(col >= ftv[0]) + elif comparator == '<=': + where.append(col <= ftv[0]) + elif comparator == '>': + where.append(col > ftv[0]) + elif comparator == '<': + where.append(col < ftv[0]) + + elif comparator == 'is not null': + where.append(col.isnot(None)) + elif comparator == 'is null': + where.append(col.is_(None)) + + elif comparator == '!=': + where.append(col != ftv[0]) + + return where + + def ltv_model_sql(self): + + days = (arrow.get(self.event_view['endTime']).date() - arrow.get(self.event_view['startTime']).date()).days + + select_ltv = [] + sumpay = [] + for i in range(1, days): + # select_ltv.append(func.round(sa.Column(f'sumpay_{i}') / sa.Column('cnt1'), 2).label(f'LTV{i}')) + select_ltv.append( + f"if(dateDiff('day', reg.date, now())<{i - 1}, '-',toString(round(sumpay_{i} / cnt1, 2))) AS LTV{i}") + sumpay.append(f"sum(if(dateDiff('day', a.date, b.date) < {i}, money, 0)) as sumpay_{i}") + # qry = sa.select(*select_ltv) + # select_ltv_str = str(qry.compile(compile_kwargs={"literal_binds": True})) + # select_ltv_str = select_ltv_str.split('SELECT ')[1] + sumpay_str = ','.join(sumpay) + select_ltv_str = ','.join(select_ltv) where = [ - getattr(tbl.c, 'date') >= self.data_in.sdate, - getattr(tbl.c, 'date') <= self.data_in.edate, + sa.Column('date') > self.event_view['startTime'].split(' ')[0], + sa.Column('date') <= self.event_view['endTime'].split(' ')[0] ] - - if self.data_in.bundle_id: - where.append(getattr(tbl.c, '#bundle_id').in_(self.data_in.bundle_id)) - - if self.data_in.channel: - where.append(getattr(tbl.c, '#bundle_id').in_(self.data_in.channel)) - - if self.data_in.owner_name: - where.append(getattr(tbl.c, 'owner_name').in_(self.data_in.owner_name)) - - if self.data_in.os: - where.append(getattr(tbl.c, '#os').in_(self.data_in.os)) - - qry = sa.select(getattr(tbl.c, 'date'), getattr(tbl.c, '#bundle_id'), - func.sum(getattr(tbl.c, 'active_num')).label('active_num'), - func.sum(getattr(tbl.c, 'new_account_num')).label('new_account_num'), - func.sum(getattr(tbl.c, 'pay_account_num')).label('pay_account_num'), - func.sum(getattr(tbl.c, 'money')).label('money'), - func.sum(getattr(tbl.c, 'new_pay_account_num')).label('new_pay_account_num'), - func.sum(getattr(tbl.c, 'new_money')).label('new_money'), - - ) \ - .where(*where) \ - .group_by(getattr(tbl.c, 'date'), getattr(tbl.c, '#bundle_id')) + where_ext = self.handler_filts() + qry = sa.select().where(*where, *where_ext) sql = str(qry.compile(compile_kwargs={"literal_binds": True})) + where_str = sql.split('WHERE ')[1] + sql = f"""SELECT toYYYYMMDD(reg.date) as date, + cnt1, + {select_ltv_str} + FROM (SELECT toDate(addHours(`#event_time`, `#zone_offset`)) as date, uniqExact(`#account_id`) cnt1 + FROM {self.game}.event + where `#event_name` = 'create_role' + AND {where_str} + GROUP BY toDate(addHours(`#event_time`, `#zone_offset`))) as reg + left join + (select a.date, + {sumpay_str} + from (SELECT toDate(addHours(`#event_time`, `#zone_offset`)) as date, `#account_id` + FROM {self.game}.event + where `#event_name` = 'create_role' + and {where_str}) as a + left join (select `#account_id`, money, toDate(addHours(`#event_time`, `#zone_offset`)) as date + from {self.game}.event + where `#event_name` = 'rechargeGame') b + on a.`#account_id` = b.`#account_id` + group by a.date) log on reg.date = log.date + order by date + """ print(sql) return sql