diff --git a/api/api_v1/endpoints/xquery.py b/api/api_v1/endpoints/xquery.py index 36ea713..bbc3f9d 100644 --- a/api/api_v1/endpoints/xquery.py +++ b/api/api_v1/endpoints/xquery.py @@ -51,26 +51,43 @@ async def ltv_model_sql( res = analysis.ltv_model_sql() sql = res['sql'] quota = res['quota'] + ltv_len = res['ltv_len'] df = await ckdb.query_dataframe(sql) if df.empty: return schemas.Msg(code=-1, msg='查无数据') df.fillna(0, inplace=True) for d in set(res['date_range']) - set(df['date']): - df.loc[len(df)] = '0' + df.loc[len(df)] = 0 df.loc[len(df) - 1, 'date'] = d days = (pd.Timestamp.now().date() - d).days - if days + 2 >= res['ltv_len']: + if days + 2 >= ltv_len: continue df.loc[len(df) - 1, f'LTV{days + 2}':] = '-' df.sort_values('date', inplace=True) + df.rename(columns={'date': '注册日期'}, inplace=True) - if quota == '#account_id': - df.rename(columns={'cnt1': '角色数'}, inplace=True) - elif quota in ('#distinct_id',): - df.rename(columns={'cnt1': '设备数'}, inplace=True) + cat = '角色数' + if quota == '#distinct_id': + cat = '设备数' + df.rename(columns={'cnt1': cat}, inplace=True) + + df1 = df[['注册日期', cat, *[f'LTV{i}' for i in range(1, ltv_len + 1)]]] + df2 = df[['注册日期', cat, *[f'sumpay_{i}' for i in range(1, ltv_len + 1)]]] + df2.replace('-', 0, inplace=True) + avg_ltv = (df2[[f'sumpay_{i}' for i in range(1, ltv_len + 1)]].sum() / df2[cat].sum()).round(2) + df1.loc[len(df1)] = ['均值', df2[cat].sum(), *avg_ltv] + df1.insert(2, '累计LTV', 0) + last_ltv = [] + for items in df1.values: + for item in items[::-1]: + if item != '-': + last_ltv.append(item) + break + df1['累计LTV'] = last_ltv + data = { - 'title': df.columns.tolist(), - 'rows': df.values.tolist(), + 'title': df1.columns.tolist(), + 'rows': df1.values.tolist(), 'start_date': res['start_date'], 'end_date': res['end_date'] } diff --git a/api/api_v1/user_label/controller.py b/api/api_v1/user_label/controller.py index 4e807ec..82e04a3 100644 --- a/api/api_v1/user_label/controller.py +++ b/api/api_v1/user_label/controller.py 
@@ -23,13 +23,13 @@ async def save(request: Request, @router.get("/list") async def get_list(request: Request, - project_id: str, + # project_id: str, game: str, db: AsyncIOMotorDatabase = Depends(get_database), current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> schemas.Msg: """读取项目保存的用户标签""" - data = await service.get_list(db, project_id) + data = await service.get_list(db, game) return schemas.Msg(code=0, msg='ok', data=data) @@ -47,11 +47,11 @@ async def get_detail(request: Request, @router.post("/del") async def delete(request: Request, - game: str, - data_id: schemas.UserLabelDel, - db: AsyncIOMotorDatabase = Depends(get_database), - current_user: schemas.UserDB = Depends(deps.get_current_user) - ) -> schemas.Msg: + game: str, + data_id: schemas.UserLabelDel, + db: AsyncIOMotorDatabase = Depends(get_database), + current_user: schemas.UserDB = Depends(deps.get_current_user) + ) -> schemas.Msg: """删除用户标签""" data = await service.delete(db, data_id.label_id) return schemas.Msg(code=0, msg='ok', data=data) @@ -65,27 +65,40 @@ async def sql(request: Request, current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> schemas.Msg: """自定义用户标签 sql测试""" - data = await service.json2sql(game, data_in.label_id) + data = await service.json2sql(game, data_in.cluster_name) return schemas.Msg(code=0, msg='ok', data=data) @router.post("/cluster_user_list") -async def cluster_user(request: Request, - data_in: schemas.UserLabelJson2Sql, - game: str, - # db: AsyncIOMotorDatabase = Depends(get_database), - current_user: schemas.UserDB = Depends(deps.get_current_user) - ) -> schemas.Msg: - """获取该标签用户""" - data = await service.get_cluster_user(game, data_in.label_id) +async def cluster_user_list(request: Request, + game: str, + data_id: schemas.ReadClusterUser, + + # db: AsyncIOMotorDatabase = Depends(get_database), + current_user: schemas.UserDB = Depends(deps.get_current_user) + ) -> schemas.Msg: + """获取该标签用户列表""" + data = await service.get_cluster_user(game, 
data_id.cluster_name, data_id.page, data_id.limit) return schemas.Msg(code=0, msg='ok', data=data) + @router.post("/cluster_user_count") async def cluster_user_count(request: Request, - data_in: schemas.UserLabelJson2Sql, - game: str, - current_user: schemas.UserDB = Depends(deps.get_current_user) - ) -> schemas.Msg: + data_in: schemas.UserLabelJson2Sql, + game: str, + current_user: schemas.UserDB = Depends(deps.get_current_user) + ) -> schemas.Msg: """获取该标签用户数量""" - data = await service.get_cluster_user_count(game, data_in.label_id) + data = await service.get_cluster_user_count(game, data_in.cluster_name) return schemas.Msg(code=0, msg='ok', data=data) + + +@router.post("/copy") +async def copy(request: Request, + data_in: schemas.UserLabelCopy, + game: str, + current_user: schemas.UserDB = Depends(deps.get_current_user) + ) -> schemas.Msg: + """复制标签到其他项目""" + await service.copy_to(data_in.to_game, data_in.label_id_list, request.user.username) + return schemas.Msg(code=0, msg='ok') diff --git a/api/api_v1/user_label/service.py b/api/api_v1/user_label/service.py index c587b37..ebe91a9 100644 --- a/api/api_v1/user_label/service.py +++ b/api/api_v1/user_label/service.py @@ -2,6 +2,8 @@ import pandas as pd import numpy as np import crud +import schemas +from db import get_database from db.ckdb import get_ck_db from models.user_label import UserClusterDef @@ -14,8 +16,8 @@ async def read(db, data_in): return await crud.user_label.read(db, data_in) -async def get_list(db, project_id): - return await crud.user_label.get_list(db, project_id) +async def get_list(db, game): + return await crud.user_label.get_list(db, game) async def get_detail(db, label_id): @@ -33,11 +35,17 @@ async def json2sql(game, date_in): return user_cluster_def.to_sql() -async def get_cluster_user(game, date_in): - user_cluster_def = UserClusterDef(game, date_in) +async def get_cluster_user(game, cluster_name, page, limit): + user_cluster_def = UserClusterDef(game, cluster_name, page=page, 
limit=limit) await user_cluster_def.init() - sql = user_cluster_def.cluster_user() - return sql + sql = user_cluster_def.cluster_user_list() + ckdb = get_ck_db() + df = await ckdb.query_dataframe(sql) + df.fillna(0, inplace=True) + return { + 'columns': df.columns.tolist(), + 'values': df.values.tolist() + } async def get_cluster_user_count(game, date_in): @@ -46,3 +54,13 @@ async def get_cluster_user_count(game, date_in): sql = user_cluster_def.cluster_user_count() ckdb = get_ck_db() df = await ckdb.query_dataframe(sql) + return {'num': int(df.loc[0, 'values'])} + + +async def copy_to(to_game, ids, act_name): + db = get_database() + docs = await crud.user_label.find_ids(db, *ids) + for item in docs: + data = schemas.UserLabelSave(**item) + await crud.user_label.save(db, data, act_name, to_game) + return True diff --git a/crud/user_label.py b/crud/user_label.py index 8330836..7d6c005 100644 --- a/crud/user_label.py +++ b/crud/user_label.py @@ -10,8 +10,7 @@ from utils import get_uid class CRUDUserLabel(CRUDBase): async def save(self, db: AsyncIOMotorDatabase, data_in: schemas.UserLabelSave, act_name, game): - where = {'project_id': data_in.project_id, 'cluster_name': data_in.cluster_name, - 'game': game} + where = {'cluster_name': data_in.cluster_name, 'game': game} is_exists = await self.find_one(db, where) data = data_in.dict(skip_defaults=True) data['act_name'] = act_name @@ -25,8 +24,8 @@ class CRUDUserLabel(CRUDBase): res = await self.find_many(db, where) return res - async def get_list(self, db: AsyncIOMotorDatabase, project_id: str): - where = {'project_id': project_id} + async def get_list(self, db: AsyncIOMotorDatabase, game: str): + where = {'game': game} res = await self.find_many(db, where, {'qp': 0}) return res diff --git a/models/user_label.py b/models/user_label.py index a6eddd6..104ddb0 100644 --- a/models/user_label.py +++ b/models/user_label.py @@ -25,12 +25,14 @@ from db.redisdb import get_redis_pool, RedisDrive class UserClusterDef: - def 
__init__(self, game: str, cluster_name: str, data_where: list = None,rdb: RedisDrive = get_redis_pool()): + def __init__(self, game: str, cluster_name: str, data_where: list = None, rdb: RedisDrive = get_redis_pool(), + **kwargs): self.game = game self.rdb = rdb self.cluster_name = cluster_name self.event_tbl = None self.data_where = data_where or [] + self.kwargs = kwargs async def _init_tal(self): res_json = await self.rdb.get(f'{self.game}_event') @@ -49,7 +51,8 @@ class UserClusterDef: async def init(self): - self.data_in = (await crud.user_label.find_one(get_database(), {'cluster_name':self.cluster_name}, {'qp': 1})).get('qp') + self.data_in = ( + await crud.user_label.find_one(get_database(), {'cluster_name': self.cluster_name}, {'qp': 1})).get('qp') await self._init_tal() self.events = self.data_in['user_cluster_def']['events'] self.event_relation = self.data_in['user_cluster_def']['event_relation'] @@ -137,6 +140,7 @@ class UserClusterDef: date_type = event.get('date_type', 'dynamic') e_days = event.get('e_days') s_days = event.get('s_days') + is_touch = event.get('is_touch',True) filts = event['filts'] zone = event.get('zone', 8) @@ -175,14 +179,20 @@ class UserClusterDef: if event_name != '*': # 任意事件 event_name_where.append(settings.CK_CALC_SYMBO['=='](event_name_col, event_name)) - selectd = [self.account_id_col, - func.round(getattr(func, analysis)(getattr(self.event_tbl.c, quota)), 2).label( - 'values') - ] - qry_tmp = sa.select(self.account_id_col).select_from( - sa.select(selectd).where(*date_where, *event_name_where, *data_where).group_by( - self.e_account_id_col).having( - settings.CK_CALC_SYMBO[uce_calcu_symbol](sa.Column('values'), *num))) + if quota != '*': + selectd = [self.account_id_col, + func.round(getattr(func, analysis)(getattr(self.event_tbl.c, quota)), 2).label( + 'values') + ] + qry_tmp = sa.select(self.account_id_col).select_from( + sa.select(selectd).where(*date_where, *event_name_where, *data_where).group_by( + 
self.e_account_id_col).having( + settings.CK_CALC_SYMBO[uce_calcu_symbol](sa.Column('values'), *num))) + else: + selectd = [self.account_id_col] + qry_tmp = sa.select(self.account_id_col).select_from( + sa.select(selectd).where(*date_where, *event_name_where, *data_where)) + if qry is None: qry = qry_tmp else: @@ -191,6 +201,9 @@ class UserClusterDef: sa.join(qry, qry_tmp, getattr(qry.c, '#account_id') == getattr(qry_tmp.c, '#account_id'))) elif self.event_relation == 'or': qry = sa.select(sa.distinct(self.account_id_col)).select_from(sa.union_all(qry, qry_tmp)) + # 处理没做过 + if not is_touch: + qry = sa.select(self.u_account_id_col).where(self.u_account_id_col.notin_(qry)) return qry @@ -200,16 +213,21 @@ class UserClusterDef: print(sql) return sql - def cluster_user(self): + def cluster_user_list(self): sub_qry = self.to_sql_qry() - qry = sa.select('*').where(self.u_account_id_col.in_(sub_qry)) + page = self.kwargs.get('page') or 1 + page -= 1 + limit = self.kwargs.get('limit', 50) + qry = sa.select('*').where(self.u_account_id_col.in_(sub_qry)).order_by(sa.Column('#reg_time')) \ + .offset(page * limit) \ + .limit(limit) sql = str(qry.compile(compile_kwargs={"literal_binds": True})) print(sql) return sql def cluster_user_count(self): sub_qry = self.to_sql_qry() - qry= sa.select(func.count()).select(sub_qry) + qry = sa.select(func.count(self.account_id_col).label('values')).select_from(sub_qry) sql = str(qry.compile(compile_kwargs={"literal_binds": True})) print(sql) return sql diff --git a/models/x_analysis.py b/models/x_analysis.py index fb43c15..56c445d 100644 --- a/models/x_analysis.py +++ b/models/x_analysis.py @@ -130,16 +130,19 @@ class XAnalysis: quota = self.event_view['quota'] select_ltv = [] sumpay = [] + sum_money = [] for i in range(1, days + 2): # select_ltv.append(func.round(sa.Column(f'sumpay_{i}') / sa.Column('cnt1'), 2).label(f'LTV{i}')) select_ltv.append( f"if(dateDiff('day', reg.date, now())<{i - 1}, '-',toString(round(sumpay_{i} / cnt1, 2))) AS 
LTV{i}") sumpay.append(f"sum(if(dateDiff('day', a.date, b.date) < {i}, money, 0)) as sumpay_{i}") + sum_money.append(f"sumpay_{i}") # qry = sa.select(*select_ltv) # select_ltv_str = str(qry.compile(compile_kwargs={"literal_binds": True})) # select_ltv_str = select_ltv_str.split('SELECT ')[1] sumpay_str = ','.join(sumpay) select_ltv_str = ','.join(select_ltv) + sum_money_str = ','.join(sum_money) where = [ sa.Column('date') >= self.event_view['startTime'].split(' ')[0], @@ -165,7 +168,8 @@ class XAnalysis: sql = f"""SELECT reg.date as date, cnt1, - {select_ltv_str} + {select_ltv_str}, + {sum_money_str} FROM (SELECT toDate(addHours(`#event_time`, `#zone_offset`)) as date, uniqExact(`{quota}`) cnt1 FROM {self.game}.event where `#event_name` = 'create_account' diff --git a/schemas/userlabel.py b/schemas/userlabel.py index 4b2edbe..05acf90 100644 --- a/schemas/userlabel.py +++ b/schemas/userlabel.py @@ -1,4 +1,4 @@ -from typing import Union +from typing import Union, List from typing import Optional from pydantic import BaseModel @@ -10,7 +10,7 @@ from pydantic import BaseModel class UserLabelSave(BaseModel): - project_id: str + # project_id: str cluster_name: str display_name: str qp: dict @@ -35,7 +35,17 @@ class UserLabelDel(BaseModel): # remarks: str class UserLabelJson2Sql(BaseModel): - label_id: str + cluster_name: str + +class ReadClusterUser(BaseModel): + cluster_name: str + page: int = 1 + limit: int = 50 + + +class UserLabelCopy(BaseModel): + label_id_list: List[str] + to_game: str class UserLabelRead(BaseModel):