ltv加工

This commit is contained in:
wuaho 2021-10-25 17:08:33 +08:00
parent 74c365e1a1
commit f93a120700
7 changed files with 135 additions and 56 deletions

View File

@ -51,26 +51,43 @@ async def ltv_model_sql(
res = analysis.ltv_model_sql()
sql = res['sql']
quota = res['quota']
ltv_len = res['ltv_len']
df = await ckdb.query_dataframe(sql)
if df.empty:
return schemas.Msg(code=-1, msg='查无数据')
df.fillna(0, inplace=True)
for d in set(res['date_range']) - set(df['date']):
df.loc[len(df)] = '0'
df.loc[len(df)] = 0
df.loc[len(df) - 1, 'date'] = d
days = (pd.Timestamp.now().date() - d).days
if days + 2 >= res['ltv_len']:
if days + 2 >= ltv_len:
continue
df.loc[len(df) - 1, f'LTV{days + 2}':] = '-'
df.sort_values('date', inplace=True)
df.rename(columns={'date': '注册日期'}, inplace=True)
if quota == '#account_id':
df.rename(columns={'cnt1': '角色数'}, inplace=True)
elif quota in ('#distinct_id',):
df.rename(columns={'cnt1': '设备数'}, inplace=True)
cat = '角色数'
if quota == '#distinct_id':
cat = '设备数'
df.rename(columns={'cnt1': cat}, inplace=True)
df1 = df[['注册日期', cat, *[f'LTV{i}' for i in range(1, ltv_len + 1)]]]
df2 = df[['注册日期', cat, *[f'sumpay_{i}' for i in range(1, ltv_len + 1)]]]
df2.replace('-', 0, inplace=True)
avg_ltv = (df2[[f'sumpay_{i}' for i in range(1, ltv_len + 1)]].sum() / df2[cat].sum()).round(2)
df1.loc[len(df1)] = ['均值', df2[cat].sum(), *avg_ltv]
df1.insert(2, '累计LTV', 0)
last_ltv = []
for items in df1.values:
for item in items[::-1]:
if item != '-':
last_ltv.append(item)
break
df1['累计LTV'] = last_ltv
data = {
'title': df.columns.tolist(),
'rows': df.values.tolist(),
'title': df1.columns.tolist(),
'rows': df1.values.tolist(),
'start_date': res['start_date'],
'end_date': res['end_date']
}

View File

@ -23,13 +23,13 @@ async def save(request: Request,
@router.get("/list")
async def get_list(request: Request,
project_id: str,
# project_id: str,
game: str,
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""读取项目保存的用户标签"""
data = await service.get_list(db, project_id)
data = await service.get_list(db, game)
return schemas.Msg(code=0, msg='ok', data=data)
@ -47,11 +47,11 @@ async def get_detail(request: Request,
@router.post("/del")
async def delete(request: Request,
game: str,
data_id: schemas.UserLabelDel,
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
game: str,
data_id: schemas.UserLabelDel,
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""删除用户标签"""
data = await service.delete(db, data_id.label_id)
return schemas.Msg(code=0, msg='ok', data=data)
@ -65,27 +65,40 @@ async def sql(request: Request,
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""自定义用户标签 sql测试"""
data = await service.json2sql(game, data_in.label_id)
data = await service.json2sql(game, data_in.cluster_name)
return schemas.Msg(code=0, msg='ok', data=data)
@router.post("/cluster_user_list")
async def cluster_user(request: Request,
data_in: schemas.UserLabelJson2Sql,
game: str,
# db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""获取该标签用户"""
data = await service.get_cluster_user(game, data_in.label_id)
async def cluster_user_list(request: Request,
                            game: str,
                            data_id: schemas.ReadClusterUser,
                            current_user: schemas.UserDB = Depends(deps.get_current_user)
                            ) -> schemas.Msg:
    """Fetch the list of users that belong to the requested label.

    The cluster name, page and limit from the request body are passed
    straight through to ``service.get_cluster_user``; whatever it returns
    is wrapped in a successful ``schemas.Msg``.
    """
    users = await service.get_cluster_user(
        game,
        data_id.cluster_name,
        data_id.page,
        data_id.limit,
    )
    return schemas.Msg(code=0, msg='ok', data=users)
@router.post("/cluster_user_count")
async def cluster_user_count(request: Request,
data_in: schemas.UserLabelJson2Sql,
game: str,
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
data_in: schemas.UserLabelJson2Sql,
game: str,
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""获取该标签用户数量"""
data = await service.get_cluster_user_count(game, data_in.label_id)
data = await service.get_cluster_user_count(game, data_in.cluster_name)
return schemas.Msg(code=0, msg='ok', data=data)
@router.post("/copy")
async def copy(request: Request,
               data_in: schemas.UserLabelCopy,
               game: str,
               current_user: schemas.UserDB = Depends(deps.get_current_user)
               ) -> schemas.Msg:
    """Copy the selected user labels into another project (game).

    ``data_in.label_id_list`` names the labels to copy and ``data_in.to_game``
    the destination; the acting user's name is recorded by the service layer.
    """
    # Fix: the attribute was misspelled as `usernam`, which raised
    # AttributeError on every call before any label could be copied.
    await service.copy_to(data_in.to_game, data_in.label_id_list, request.user.username)
    return schemas.Msg(code=0, msg='ok')

View File

@ -2,6 +2,8 @@ import pandas as pd
import numpy as np
import crud
import schemas
from db import get_database
from db.ckdb import get_ck_db
from models.user_label import UserClusterDef
@ -14,8 +16,8 @@ async def read(db, data_in):
return await crud.user_label.read(db, data_in)
async def get_list(db, project_id):
return await crud.user_label.get_list(db, project_id)
async def get_list(db, game):
    """Return what ``crud.user_label.get_list`` yields for ``game``."""
    labels = await crud.user_label.get_list(db, game)
    return labels
async def get_detail(db, label_id):
@ -33,11 +35,17 @@ async def json2sql(game, date_in):
return user_cluster_def.to_sql()
async def get_cluster_user(game, date_in):
user_cluster_def = UserClusterDef(game, date_in)
async def get_cluster_user(game, cluster_name, page, limit):
user_cluster_def = UserClusterDef(game, cluster_name, page=page, limit=limit)
await user_cluster_def.init()
sql = user_cluster_def.cluster_user()
return sql
sql = user_cluster_def.cluster_user_list()
ckdb = get_ck_db()
df = await ckdb.query_dataframe(sql)
df.fillna(0, inplace=True)
return {
'columns': df.columns.tolist(),
'values': df.values.tolist()
}
async def get_cluster_user_count(game, date_in):
@ -46,3 +54,13 @@ async def get_cluster_user_count(game, date_in):
sql = user_cluster_def.cluster_user_count()
ckdb = get_ck_db()
df = await ckdb.query_dataframe(sql)
return {'num': int(df.loc[0, 'values'])}
async def copy_to(to_game, ids, act_name):
    """Re-save the label documents identified by ``ids`` under ``to_game``.

    Each document returned by ``crud.user_label.find_ids`` is validated
    through ``schemas.UserLabelSave`` and then saved for the destination
    game with ``act_name`` as the acting user. Always returns ``True``.
    """
    database = get_database()
    for doc in await crud.user_label.find_ids(database, *ids):
        label = schemas.UserLabelSave(**doc)
        await crud.user_label.save(database, label, act_name, to_game)
    return True

View File

@ -10,8 +10,7 @@ from utils import get_uid
class CRUDUserLabel(CRUDBase):
async def save(self, db: AsyncIOMotorDatabase, data_in: schemas.UserLabelSave, act_name, game):
where = {'project_id': data_in.project_id, 'cluster_name': data_in.cluster_name,
'game': game}
where = {'cluster_name': data_in.cluster_name, 'game': game}
is_exists = await self.find_one(db, where)
data = data_in.dict(skip_defaults=True)
data['act_name'] = act_name
@ -25,8 +24,8 @@ class CRUDUserLabel(CRUDBase):
res = await self.find_many(db, where)
return res
async def get_list(self, db: AsyncIOMotorDatabase, project_id: str):
where = {'project_id': project_id}
async def get_list(self, db: AsyncIOMotorDatabase, game: str):
    """Look up the label documents whose ``game`` field equals ``game``."""
    # The {'qp': 0} argument looks like a projection that omits the stored
    # query payload — TODO confirm against CRUDBase.find_many.
    return await self.find_many(db, {'game': game}, {'qp': 0})

View File

@ -25,12 +25,14 @@ from db.redisdb import get_redis_pool, RedisDrive
class UserClusterDef:
def __init__(self, game: str, cluster_name: str, data_where: list = None,rdb: RedisDrive = get_redis_pool()):
def __init__(self, game: str, cluster_name: str, data_where: list = None, rdb: RedisDrive = get_redis_pool(),
**kwargs):
self.game = game
self.rdb = rdb
self.cluster_name = cluster_name
self.event_tbl = None
self.data_where = data_where or []
self.kwargs = kwargs
async def _init_tal(self):
res_json = await self.rdb.get(f'{self.game}_event')
@ -49,7 +51,8 @@ class UserClusterDef:
async def init(self):
self.data_in = (await crud.user_label.find_one(get_database(), {'cluster_name':self.cluster_name}, {'qp': 1})).get('qp')
self.data_in = (
await crud.user_label.find_one(get_database(), {'cluster_name': self.cluster_name}, {'qp': 1})).get('qp')
await self._init_tal()
self.events = self.data_in['user_cluster_def']['events']
self.event_relation = self.data_in['user_cluster_def']['event_relation']
@ -137,6 +140,7 @@ class UserClusterDef:
date_type = event.get('date_type', 'dynamic')
e_days = event.get('e_days')
s_days = event.get('s_days')
is_touch = event.get('is_touch',True)
filts = event['filts']
zone = event.get('zone', 8)
@ -175,14 +179,20 @@ class UserClusterDef:
if event_name != '*':
# 任意事件
event_name_where.append(settings.CK_CALC_SYMBO['=='](event_name_col, event_name))
selectd = [self.account_id_col,
func.round(getattr(func, analysis)(getattr(self.event_tbl.c, quota)), 2).label(
'values')
]
qry_tmp = sa.select(self.account_id_col).select_from(
sa.select(selectd).where(*date_where, *event_name_where, *data_where).group_by(
self.e_account_id_col).having(
settings.CK_CALC_SYMBO[uce_calcu_symbol](sa.Column('values'), *num)))
if quota != '*':
selectd = [self.account_id_col,
func.round(getattr(func, analysis)(getattr(self.event_tbl.c, quota)), 2).label(
'values')
]
qry_tmp = sa.select(self.account_id_col).select_from(
sa.select(selectd).where(*date_where, *event_name_where, *data_where).group_by(
self.e_account_id_col).having(
settings.CK_CALC_SYMBO[uce_calcu_symbol](sa.Column('values'), *num)))
else:
selectd = [self.account_id_col]
qry_tmp = sa.select(self.account_id_col).select_from(
sa.select(selectd).where(*date_where, *event_name_where, *data_where))
if qry is None:
qry = qry_tmp
else:
@ -191,6 +201,9 @@ class UserClusterDef:
sa.join(qry, qry_tmp, getattr(qry.c, '#account_id') == getattr(qry_tmp.c, '#account_id')))
elif self.event_relation == 'or':
qry = sa.select(sa.distinct(self.account_id_col)).select_from(sa.union_all(qry, qry_tmp))
# 处理没做过
if not is_touch:
qry = sa.select(self.u_account_id_col).where(self.u_account_id_col.notin_(qry))
return qry
@ -200,16 +213,21 @@ class UserClusterDef:
print(sql)
return sql
def cluster_user(self):
def cluster_user_list(self):
sub_qry = self.to_sql_qry()
qry = sa.select('*').where(self.u_account_id_col.in_(sub_qry))
page = self.kwargs.get('page') or 1
page -= 1
limit = self.kwargs.get('limit', 50)
qry = sa.select('*').where(self.u_account_id_col.in_(sub_qry)).order_by(sa.Column('#reg_time')) \
.offset(page * limit) \
.limit(limit)
sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
print(sql)
return sql
def cluster_user_count(self):
sub_qry = self.to_sql_qry()
qry= sa.select(func.count()).select(sub_qry)
qry = sa.select(func.count(self.account_id_col).label('values')).select_from(sub_qry)
sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
print(sql)
return sql

View File

@ -130,16 +130,19 @@ class XAnalysis:
quota = self.event_view['quota']
select_ltv = []
sumpay = []
sum_money = []
for i in range(1, days + 2):
# select_ltv.append(func.round(sa.Column(f'sumpay_{i}') / sa.Column('cnt1'), 2).label(f'LTV{i}'))
select_ltv.append(
f"if(dateDiff('day', reg.date, now())<{i - 1}, '-',toString(round(sumpay_{i} / cnt1, 2))) AS LTV{i}")
sumpay.append(f"sum(if(dateDiff('day', a.date, b.date) < {i}, money, 0)) as sumpay_{i}")
sum_money.append(f"sumpay_{i}")
# qry = sa.select(*select_ltv)
# select_ltv_str = str(qry.compile(compile_kwargs={"literal_binds": True}))
# select_ltv_str = select_ltv_str.split('SELECT ')[1]
sumpay_str = ','.join(sumpay)
select_ltv_str = ','.join(select_ltv)
sum_money_str = ','.join(sum_money)
where = [
sa.Column('date') >= self.event_view['startTime'].split(' ')[0],
@ -165,7 +168,8 @@ class XAnalysis:
sql = f"""SELECT reg.date as date,
cnt1,
{select_ltv_str}
{select_ltv_str},
{sum_money_str}
FROM (SELECT toDate(addHours(`#event_time`, `#zone_offset`)) as date, uniqExact(`{quota}`) cnt1
FROM {self.game}.event
where `#event_name` = 'create_account'

View File

@ -1,4 +1,4 @@
from typing import Union
from typing import Union, List
from typing import Optional
from pydantic import BaseModel
@ -10,7 +10,7 @@ from pydantic import BaseModel
class UserLabelSave(BaseModel):
project_id: str
# project_id: str
cluster_name: str
display_name: str
qp: dict
@ -35,7 +35,17 @@ class UserLabelDel(BaseModel):
# remarks: str
class UserLabelJson2Sql(BaseModel):
label_id: str
cluster_name: str
# Request body for reading the users of one cluster (label) page by page.
class ReadClusterUser(BaseModel):
# Name of the cluster (label) whose users are requested.
cluster_name: str
# Page number, defaults to 1.
# NOTE(review): looks 1-based (the query builder subtracts one before computing the offset) — confirm.
page: int = 1
# Maximum number of rows per page, defaults to 50.
limit: int = 50
# Request body for copying saved labels into another game.
class UserLabelCopy(BaseModel):
# Identifiers of the labels to copy.
label_id_list: List[str]
# Destination game the copies are saved under.
to_game: str
class UserLabelRead(BaseModel):