ltv 模型
This commit is contained in:
parent
a2d08b35e9
commit
db2262ec0b
@ -20,13 +20,36 @@ from models.x_analysis import XAnalysis
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.post("/overview")
|
||||
async def event_model_sql(
|
||||
@router.post("/ltv_model_sql")
|
||||
async def ltv_model_sql(
|
||||
request: Request,
|
||||
game: str,
|
||||
analysis: XAnalysis = Depends(XAnalysis),
|
||||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||||
) -> schemas.Msg:
|
||||
""" 数据总览 """
|
||||
sql = analysis.to_sql()
|
||||
""" ltv模型sql """
|
||||
await analysis.init()
|
||||
sql = analysis.ltv_model_sql()
|
||||
return schemas.Msg(code=0, msg='ok', data=sql)
|
||||
|
||||
|
||||
@router.post("/ltv_model")
|
||||
async def ltv_model_sql(
|
||||
request: Request,
|
||||
game: str,
|
||||
analysis: XAnalysis = Depends(XAnalysis),
|
||||
ckdb: CKDrive = Depends(get_ck_db),
|
||||
current_user: schemas.UserDB = Depends(deps.get_current_user)
|
||||
) -> schemas.Msg:
|
||||
""" ltv模型sql """
|
||||
await analysis.init()
|
||||
sql = analysis.ltv_model_sql()
|
||||
df = await ckdb.query_dataframe(sql)
|
||||
df.fillna(0, inplace=True)
|
||||
df.rename(columns={'date': '注册日期', 'cnt1': '角色数'}, inplace=True)
|
||||
data = {
|
||||
'titel': df.columns.tolist(),
|
||||
'rows': df.values.tolist()
|
||||
}
|
||||
|
||||
return schemas.Msg(code=0, msg='ok', data=data)
|
||||
|
@ -1,5 +1,6 @@
|
||||
from typing import Tuple
|
||||
|
||||
import arrow
|
||||
import sqlalchemy as sa
|
||||
import json
|
||||
|
||||
@ -9,68 +10,120 @@ import pandas as pd
|
||||
|
||||
from sqlalchemy import func, or_, and_, not_, MetaData
|
||||
|
||||
import crud
|
||||
import schemas
|
||||
from core.config import settings
|
||||
from db import get_database
|
||||
from db.redisdb import get_redis_pool, RedisDrive
|
||||
|
||||
|
||||
class XAnalysis:
|
||||
def __init__(self, data_in: schemas.Overview, game: str):
|
||||
def __init__(self, data_in: schemas.CkQuery, game: str):
|
||||
self.data_in = data_in
|
||||
self.game = game
|
||||
self.event_view = dict()
|
||||
self.events = []
|
||||
|
||||
def to_sql(self):
|
||||
# 构建表
|
||||
metadata = sa.MetaData(schema=self.game)
|
||||
tbl = sa.Table(f'overview', metadata,
|
||||
sa.Column('date'),
|
||||
sa.Column('#os'),
|
||||
sa.Column('#bundle_id'),
|
||||
sa.Column('owner_name'),
|
||||
sa.Column('channel'),
|
||||
sa.Column('active_num'),
|
||||
sa.Column('new_account_num'),
|
||||
sa.Column('money'),
|
||||
sa.Column('pay_account_num'),
|
||||
sa.Column('new_pay_account_num'),
|
||||
sa.Column('new_money'),
|
||||
)
|
||||
# bundle_id_col = sa.Column('#bundle_id')
|
||||
# channel_col = sa.Column('channel')
|
||||
# owner_name_col = sa.Column('owner_name')
|
||||
# os_col = sa.Column('#os')
|
||||
# date_col = sa.Column('date')
|
||||
# active_num_col = sa.Column('active_num')
|
||||
# new_account_num_col = sa.Column('new_account_num')
|
||||
self.global_filters = []
|
||||
|
||||
def _get_global_filters(self):
|
||||
return self.event_view.get('filts') or []
|
||||
|
||||
async def init(self):
|
||||
if self.data_in.report_id:
|
||||
db = get_database()
|
||||
report = await crud.report.get(db, id=self.data_in.report_id)
|
||||
self.event_view = report['query']['eventView']
|
||||
self.events = report['query']['events']
|
||||
e_days, s_days = self.event_view['recentDay'].split('-')
|
||||
self.event_view['endTime'] = arrow.get().shift(days=-int(e_days)).strftime('%Y-%m-%d 23:59:59')
|
||||
self.event_view['startTime'] = arrow.get().shift(days=-int(s_days)).strftime('%Y-%m-%d 00:00:00')
|
||||
|
||||
|
||||
else:
|
||||
self.event_view = self.data_in.eventView
|
||||
self.events = self.data_in.events
|
||||
|
||||
self.global_filters = self._get_global_filters()
|
||||
|
||||
def handler_filts(self):
|
||||
where = []
|
||||
for item in self.global_filters:
|
||||
|
||||
col = sa.Column(item['columnName'])
|
||||
|
||||
comparator = item['comparator']
|
||||
ftv = item['ftv']
|
||||
if comparator == '==':
|
||||
if len(ftv) > 1:
|
||||
where.append(or_(*[col == v for v in ftv]))
|
||||
else:
|
||||
where.append(col == ftv[0])
|
||||
elif comparator == '>=':
|
||||
where.append(col >= ftv[0])
|
||||
elif comparator == '<=':
|
||||
where.append(col <= ftv[0])
|
||||
elif comparator == '>':
|
||||
where.append(col > ftv[0])
|
||||
elif comparator == '<':
|
||||
where.append(col < ftv[0])
|
||||
|
||||
elif comparator == 'is not null':
|
||||
where.append(col.isnot(None))
|
||||
elif comparator == 'is null':
|
||||
where.append(col.is_(None))
|
||||
|
||||
elif comparator == '!=':
|
||||
where.append(col != ftv[0])
|
||||
|
||||
return where
|
||||
|
||||
def ltv_model_sql(self):
|
||||
|
||||
days = (arrow.get(self.event_view['endTime']).date() - arrow.get(self.event_view['startTime']).date()).days
|
||||
|
||||
select_ltv = []
|
||||
sumpay = []
|
||||
for i in range(1, days):
|
||||
# select_ltv.append(func.round(sa.Column(f'sumpay_{i}') / sa.Column('cnt1'), 2).label(f'LTV{i}'))
|
||||
select_ltv.append(
|
||||
f"if(dateDiff('day', reg.date, now())<{i - 1}, '-',toString(round(sumpay_{i} / cnt1, 2))) AS LTV{i}")
|
||||
sumpay.append(f"sum(if(dateDiff('day', a.date, b.date) < {i}, money, 0)) as sumpay_{i}")
|
||||
# qry = sa.select(*select_ltv)
|
||||
# select_ltv_str = str(qry.compile(compile_kwargs={"literal_binds": True}))
|
||||
# select_ltv_str = select_ltv_str.split('SELECT ')[1]
|
||||
sumpay_str = ','.join(sumpay)
|
||||
select_ltv_str = ','.join(select_ltv)
|
||||
|
||||
where = [
|
||||
getattr(tbl.c, 'date') >= self.data_in.sdate,
|
||||
getattr(tbl.c, 'date') <= self.data_in.edate,
|
||||
sa.Column('date') > self.event_view['startTime'].split(' ')[0],
|
||||
sa.Column('date') <= self.event_view['endTime'].split(' ')[0]
|
||||
]
|
||||
|
||||
if self.data_in.bundle_id:
|
||||
where.append(getattr(tbl.c, '#bundle_id').in_(self.data_in.bundle_id))
|
||||
|
||||
if self.data_in.channel:
|
||||
where.append(getattr(tbl.c, '#bundle_id').in_(self.data_in.channel))
|
||||
|
||||
if self.data_in.owner_name:
|
||||
where.append(getattr(tbl.c, 'owner_name').in_(self.data_in.owner_name))
|
||||
|
||||
if self.data_in.os:
|
||||
where.append(getattr(tbl.c, '#os').in_(self.data_in.os))
|
||||
|
||||
qry = sa.select(getattr(tbl.c, 'date'), getattr(tbl.c, '#bundle_id'),
|
||||
func.sum(getattr(tbl.c, 'active_num')).label('active_num'),
|
||||
func.sum(getattr(tbl.c, 'new_account_num')).label('new_account_num'),
|
||||
func.sum(getattr(tbl.c, 'pay_account_num')).label('pay_account_num'),
|
||||
func.sum(getattr(tbl.c, 'money')).label('money'),
|
||||
func.sum(getattr(tbl.c, 'new_pay_account_num')).label('new_pay_account_num'),
|
||||
func.sum(getattr(tbl.c, 'new_money')).label('new_money'),
|
||||
|
||||
) \
|
||||
.where(*where) \
|
||||
.group_by(getattr(tbl.c, 'date'), getattr(tbl.c, '#bundle_id'))
|
||||
where_ext = self.handler_filts()
|
||||
qry = sa.select().where(*where, *where_ext)
|
||||
sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
|
||||
where_str = sql.split('WHERE ')[1]
|
||||
sql = f"""SELECT toYYYYMMDD(reg.date) as date,
|
||||
cnt1,
|
||||
{select_ltv_str}
|
||||
FROM (SELECT toDate(addHours(`#event_time`, `#zone_offset`)) as date, uniqExact(`#account_id`) cnt1
|
||||
FROM {self.game}.event
|
||||
where `#event_name` = 'create_role'
|
||||
AND {where_str}
|
||||
GROUP BY toDate(addHours(`#event_time`, `#zone_offset`))) as reg
|
||||
left join
|
||||
(select a.date,
|
||||
{sumpay_str}
|
||||
from (SELECT toDate(addHours(`#event_time`, `#zone_offset`)) as date, `#account_id`
|
||||
FROM {self.game}.event
|
||||
where `#event_name` = 'create_role'
|
||||
and {where_str}) as a
|
||||
left join (select `#account_id`, money, toDate(addHours(`#event_time`, `#zone_offset`)) as date
|
||||
from {self.game}.event
|
||||
where `#event_name` = 'rechargeGame') b
|
||||
on a.`#account_id` = b.`#account_id`
|
||||
group by a.date) log on reg.date = log.date
|
||||
order by date
|
||||
"""
|
||||
print(sql)
|
||||
return sql
|
||||
|
Loading…
Reference in New Issue
Block a user