From 713ed2d7e22b8de4fb40e18856548a92d1387a87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E4=BC=9F?= <250213850@qq.com> Date: Mon, 20 Dec 2021 17:50:51 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0LTV=E5=85=A8=E5=B1=80?= =?UTF-8?q?=E7=AD=9B=E9=80=89=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/api_v1/endpoints/project.py | 26 ++++++- api/api_v1/endpoints/query.py | 117 ++++++++++++++++++++++++++++++-- api/api_v1/endpoints/xquery.py | 69 ++++++++++++++++--- crud/__init__.py | 3 +- crud/crud_project_number.py | 33 +++++++++ schemas/__init__.py | 3 +- schemas/project.py | 2 +- schemas/project_number.py | 17 +++++ utils/func.py | 8 +++ 9 files changed, 257 insertions(+), 21 deletions(-) create mode 100644 crud/crud_project_number.py create mode 100644 schemas/project_number.py diff --git a/api/api_v1/endpoints/project.py b/api/api_v1/endpoints/project.py index 8a284fd..e34ccc2 100644 --- a/api/api_v1/endpoints/project.py +++ b/api/api_v1/endpoints/project.py @@ -5,7 +5,6 @@ from motor.motor_asyncio import AsyncIOMotorDatabase import crud, schemas from api import deps from core.config import settings - from db import get_database from db.ckdb import CKDrive, get_ck_db from schemas.project import ProjectCreate @@ -25,6 +24,7 @@ async def create( """创建项目""" try: res_project = await crud.project.create(db, data_in, current_user=request.user) + await crud.project_number.createxiangmu(db, data_in) except pymongo.errors.DuplicateKeyError: return schemas.Msg(code=-1, msg='项目名已存在', data='项目名已存在') @@ -74,8 +74,28 @@ async def read_project(request: Request, resp = await crud.project.get_my_game(db, game_list) return schemas.Msg(code=0, msg='ok', data=resp) - - +#获取项目名和渠道名project_name +@router.get("/project_name") +async def project_name(request: Request, + db: AsyncIOMotorDatabase = Depends(get_database), + current_user: schemas.UserDB = Depends(deps.get_current_user) + ): + if request.user.username == 'root': + res = await crud.project_number.all_xiangmu(db) + for i in res: + i['_id'] = str(i['_id']) + return schemas.Msg(code=0,msg='ok',data=res) +#添加项目名,渠道名 +@router.post("/add_project_name") +async def add_project_name(request: Request, + data_in: schemas.ProjectnumberInsert, + db: AsyncIOMotorDatabase = Depends(get_database), + current_user: schemas.UserDB = Depends(deps.get_current_user)): + #插入数据 + #await crud.project_number.create(db, data_in) + #修改数据 + await crud.project_number.update(db, data_in) + return schemas.Msg(code=0, msg='修改成功', data=True) @router.get("/detail") async def detail(request: Request, game: str, diff --git a/api/api_v1/endpoints/query.py b/api/api_v1/endpoints/query.py index 0bb43be..533a518 100644 --- a/api/api_v1/endpoints/query.py +++ b/api/api_v1/endpoints/query.py @@ -321,6 +321,112 @@ async def retention_model(request: Request, for d in set(res['date_range']) - set(df.index): df.loc[d] = 0 + df.sort_index(inplace=True) + summary_values = {'均值': {}} + max_retention_n = 1 + #留存人数 + avg = {} + #流失人数 + avgo = {} + for date, v in df.T.items(): + #字典中data存在时不替换,否则将data替换成空字典 + tmp = summary_values.setdefault(date, dict()) + tmp['d0'] = int(v.cnt0) + tmp['p'] = [] + tmp['n'] = [] + tmp['p_outflow'] = [] + tmp['n_outflow'] = [] + for i in retention_n: + n = (pd.Timestamp.now().date() - date).days + if i > n: + continue + # max_retention_n = i if i > max_retention_n else max_retention_n + #留存的人数 + avg[i] = avg.setdefault(i, 0) + v[f'cnt{i}'] + #流失的人数 + avgo[i] = avgo.setdefault(i, 0) + v[f'on{i}'] + tmp['p'].append(v[f'p{i}']) + tmp['n'].append(v[f'cnt{i}']) + tmp['p_outflow'].append(v[f'op{i}']) + tmp['n_outflow'].append(v[f'on{i}']) + tmp = summary_values['均值'] + retention_avg_dict = {} + + for rn in retention_n: + for rt, rd in df.T.items(): + if rt + datetime.timedelta(days=rn) <= pd.datetime.now().date(): + retention_avg_dict.setdefault(rn, {'cnt0': 0, 'cntn': 0,'o_cnt0':0,'o_cntn':0}) + retention_avg_dict[rn]['cnt0'] += rd['cnt0'] + retention_avg_dict[rn]['cntn'] += rd[f'cnt{rn}'] + retention_avg_dict[rn]['o_cnt0'] += rd['cnt0'] + retention_avg_dict[rn]['o_cntn'] += rd[f'on{rn}'] + + tmp['p'] = [] + tmp['n'] = [] + tmp['p_outflow'] = [] + tmp['n_outflow'] = [] + tmp['d0'] = 0 + for rt, rd in retention_avg_dict.items(): + tmp['d0'] = int(df['cnt0'].sum()) + n = round(rd['cntn'] * 100 / rd['cnt0'],2) + n = 0 if np.isnan(n) else n + tmp['p'].append(n) + tmp['n'].append(rd['cntn']) + n = round(rd['o_cntn'] * 100 / rd['cnt0'],2) + n = 0 if np.isnan(n) else n + tmp['p_outflow'].append(n) + tmp['n_outflow'].append(rd['o_cntn']) + + #次留数 + title = ['日期', '用户数', '次留', *[f'{i + 1}留' for i in retention_n[1:]]] + + # 未到达的日期需要补齐- + retention_length = len(retention_n) + for _, items in summary_values.items(): + for key in ['p', 'n', 'p_outflow', 'n_outflow']: + items[key].extend(['-'] * (retention_length - len(items[key]))) + + resp = { + 'summary_values': summary_values, + # 'values': values, + 'date_range': [d.strftime('%Y-%m-%d') for d in date_range], + 'title': title, + 'filter_item_type': filter_item_type, + 'filter_item': filter_item, + 'start_date': res['start_date'], + 'end_date': res['end_date'], + 'time_particle': res['time_particle'] + + } + return schemas.Msg(code=0, msg='ok', data=resp) + +#计算流失率 +# retention_model +# retention_model_loss +#@router.post("/retention_model") +async def retention_model01(request: Request, + game: str, + ckdb: CKDrive = Depends(get_ck_db), + db: AsyncIOMotorDatabase = Depends(get_database), + analysis: BehaviorAnalysis = Depends(BehaviorAnalysis), + current_user: schemas.UserDB = Depends(deps.get_current_user) + ) -> schemas.Msg: + await analysis.init(data_where=current_user.data_where) + res = await analysis.retention_model_sql2() #初始化开始时间结束时间,sql语句 字典 + sql = res['sql'] #获取到sql语句 + df = await ckdb.query_dataframe(sql) + if df.empty: + return schemas.Msg(code=-9, msg='无数据', data=None) + + date_range = res['date_range'] #时间 列表 + unit_num = res['unit_num'] #int + retention_n = res['retention_n'] #列表 int + filter_item_type = res['filter_item_type'] #all + filter_item = res['filter_item'] #列表 0,1,3,7,14,21,30 + df.set_index('reg_date', inplace=True) + for d in set(res['date_range']) - set(df.index): + df.loc[d] = 0 + df.sort_index(inplace=True) summary_values = {'均值': {}} max_retention_n = 1 @@ -341,7 +447,8 @@ async def retention_model(request: Request, # max_retention_n = i if i > max_retention_n else max_retention_n avg[i] = avg.setdefault(i, 0) + v[f'cnt{i}'] avgo[i] = avgo.setdefault(i, 0) + v[f'on{i}'] - tmp['p'].append(v[f'p{i}']) + tmp['p'].append(round(100-v[f'p{i}'],2)) + #tmp['p'].append(v[f'p{i}']) tmp['n'].append(v[f'cnt{i}']) tmp['p_outflow'].append(v[f'op{i}']) tmp['n_outflow'].append(v[f'on{i}']) @@ -365,7 +472,8 @@ async def retention_model(request: Request, tmp['d0'] = 0 for rt, rd in retention_avg_dict.items(): tmp['d0'] = int(df['cnt0'].sum()) - n = round(rd['cntn'] * 100 / rd['cnt0'],2) + n = round(100-(rd['cntn'] * 100 / rd['cnt0']), 2) + #n = round(rd['cntn'] * 100 / rd['cnt0'],2) n = 0 if np.isnan(n) else n tmp['p'].append(n) tmp['n'].append(rd['cntn']) @@ -375,7 +483,7 @@ async def retention_model(request: Request, tmp['n_outflow'].append(rd['o_cntn']) - title = ['日期', '用户数', '次留', *[f'{i + 1}留' for i in retention_n[1:]]] + title = ['日期', '用户数', '次流失', *[f'{i + 1}流失' for i in retention_n[1:]]] # 未到达的日期需要补齐- retention_length = len(retention_n) @@ -398,6 +506,7 @@ async def retention_model(request: Request, return schemas.Msg(code=0, msg='ok', data=resp) + @router.post("/retention_model_export") async def retention_model_export(request: Request, game: str, @@ -899,7 +1008,7 @@ async def scatter_model( for i in labels: v +=1 if int(i) == 1: - labels_dict01["1"]=labels_dict["1"] + labels_dict01["1"]=labels_dict['1'] else: # for number in labels_dict.keys(): # if number >=i: diff --git a/api/api_v1/endpoints/xquery.py b/api/api_v1/endpoints/xquery.py index a872064..023d8a9 100644 --- a/api/api_v1/endpoints/xquery.py +++ b/api/api_v1/endpoints/xquery.py @@ -22,7 +22,7 @@ from db.redisdb import get_redis_pool, RedisDrive from models.behavior_analysis import BehaviorAnalysis from models.user_analysis import UserAnalysis from models.x_analysis import XAnalysis -from utils import DfToStream +from utils import DfToStream, get_bijiao router = APIRouter() @@ -50,12 +50,35 @@ async def ltv_model_sql( ) -> schemas.Msg: """ ltv模型sql """ await analysis.init(data_where=current_user.data_where) - res = analysis.ltv_model_sql() sql = res['sql'] + #仅一条筛选条件则是把GM过滤后获取全部数据 + if len(analysis.global_filters)==1 and analysis.global_filters[0]['strftv']=='GM': + df = await ckdb.query_dataframe(sql) + #多条筛选条件则合成新的sql + else: + new_sql="""""" + #拆分sql + split_sql = sql.split('AND 1') + #获取每一条筛选条件 + for i in analysis.global_filters: + #剔除GM + if i['strftv'] != 'GM': + #获取筛选条件的包含关系 + bijiao=get_bijiao(i["comparator"]) + #获取筛选条件的值 + condition=tuple(i['ftv']) + #获取事件名 + columnName=i['columnName'] + dd = f""" AND {game}.event.{columnName} {bijiao} {condition}""" + new_sql+=dd + split_="""AND 1 """ + news_sql = split_sql[0] + split_+new_sql + split_sql[1] + split_+new_sql+ split_sql[2]+split_+split_sql[3] + df = await ckdb.query_dataframe(news_sql) + quota = res['quota'] #字段名 ltv_n = res['ltv_n'] - df = await ckdb.query_dataframe(sql) + #df = await ckdb.query_dataframe(sql) if df.empty: return schemas.Msg(code=-9, msg='查无数据') df.fillna(0, inplace=True) #修改原对象,以0填补空缺值 @@ -70,7 +93,7 @@ async def ltv_model_sql( # df.sort_values('date', inplace=True) # 根据date进行倒叙排序 for d in set(res['date_range']) - set(df['date']): - #在有效日期最后一行补充行数据(值都为0),补充的行数为两个集合的差集长度 + #在有效日期最后一行补充行数据(值都为'-'),补充的行数为两个集合的差集长度 df.loc[len(df)] = '-' #在date此列补充多行数据(值为两个集合差集的子元素) df.loc[len(df) - 1, 'date'] = d @@ -90,9 +113,16 @@ async def ltv_model_sql( df2 = df[['注册日期', cat, *[f'sumpay_{i}' for i in ltv_n]]] df2.replace('-', 0, inplace=True) #True改变原数据,前面是需要替换的值,后面是替换后的值。 在原数据把下划线替换成0 #修改下面代码 + # 去除sumpay_1的值为0的列 + new_df2 = (df2.drop(df2[(df2.sumpay_2 == 0)].index)) + #为new_df2排序 + new_df2=new_df2.reset_index(drop=True) #求相差天数 - str_time = str(res['date_range'][0]) - split_time = str_time.split('-') + str_time =new_df2['注册日期'][0] + str_time01=str(str_time) + split_time = str_time01.split('-') + #str_time = str(res['date_range'][0]) + # split_time = str_time.split('-') now_time = time.strftime("%Y-%m-%d", time.localtime()) split_now_time = now_time.split('-') today = datetime.datetime(int(split_time[0]), int(split_time[1]), int(split_time[2])) @@ -102,14 +132,26 @@ async def ltv_model_sql( _listData = {} for i in ltv_n: if i <=newday: - avgLtv=(df2[[f'sumpay_{i}']][0:newday+1-i].sum()/df2[cat][0:newday+1-i].sum()).round(2) - + #计算均值 + #avgLtv = (new_df2[[f'sumpay_{i}']][0:newday + 1 - i].sum() / new_df2[cat][0:newday + 1 - i].sum()).round(2) + #12.20号计算LTV均值的时候分母包括当天未充值新增设备数,比剔除掉的计算值偏小 + avgLtv = (df2[[f'sumpay_{i}']][0:newday + 1 - i].sum() / df2[cat][0:newday + 1 - i].sum()).round(2) + #取出均值 new_avgLtv=str(avgLtv).split('\n')[0].split(' ') new_avgLtv01=new_avgLtv[len(new_avgLtv)-1] if new_avgLtv01 == 'NaN': _listData[f'sumpay_{i}'] = '-' else: _listData[f'sumpay_{i}'] = new_avgLtv01 + + #原代码 + # avgLtv=(df2[[f'sumpay_{i}']][0:newday+1-i].sum()/df2[cat][0:newday+1-i].sum()).round(2) + # new_avgLtv=str(avgLtv).split('\n')[0].split(' ') + # new_avgLtv01=new_avgLtv[len(new_avgLtv)-1] + # if new_avgLtv01 == 'NaN': + # _listData[f'sumpay_{i}'] = '-' + # else: + # _listData[f'sumpay_{i}'] = new_avgLtv01 else: _listData[f'sumpay_{i}']='-' avgLtvlist = pd.Series(_listData) @@ -117,12 +159,16 @@ async def ltv_model_sql( _listname=[] #计算总累计LTV最后一个值 for k, v in _listData.items(): - if v !=0: + if v != 0 or v!= '-': + # if v !=0: _listname.append(k) max_nmu=max(_listname) + #max_num = (new_df2[[max_nmu]].sum() / new_df2[cat].sum()).round(2) max_num=(df2[[max_nmu]].sum()/df2[cat].sum()).round(2) max_number=str(max_num[0]) df1.loc[len(df1)] = ['均值', df2[cat].sum(), *avgLtvlist] + #原代码 + #df1.loc[len(df1)] = ['均值', df2[cat].sum(), *avgLtvlist] # avg_ltv = (df2[[f'sumpay_{i}' for i in ltv_n]].sum() / df2[cat].sum()).round(2) #df1.loc[len(df1)] = ['均值', df2[cat].sum(), *avg_ltv] @@ -141,9 +187,10 @@ async def ltv_model_sql( #把列中累计LTV等于0的值改成'-' - df1.loc[df1['累计LTV']==0, '累计LTV'] = '-' + #df1.loc[df1['累计LTV']==0, '累计LTV'] = '-' #剔除行,列的累计LTV=='-'的剔除出去 - df3 = df1.drop(df1[(df1.累计LTV=='-')].index) + df3 = df1.drop(df1[(df1.LTV1 == '-')].index) + #df3 = df1.drop(df1[(df1.累计LTV=='-')].index) days = (pd.Timestamp.now().date() - pd.to_datetime(res['start_date']).date()).days df1.iloc[len(df1) - 1, days + 4:] = '-' diff --git a/crud/__init__.py b/crud/__init__.py index 2cef920..78b2e7b 100644 --- a/crud/__init__.py +++ b/crud/__init__.py @@ -13,4 +13,5 @@ from .crud_api_list import api_list from .crud_role import role from .crud_check_data import check_data from .user_label import user_label -from .select_map import select_map \ No newline at end of file +from .select_map import select_map +from .crud_project_number import project_number \ No newline at end of file diff --git a/crud/crud_project_number.py b/crud/crud_project_number.py new file mode 100644 index 0000000..b375841 --- /dev/null +++ b/crud/crud_project_number.py @@ -0,0 +1,33 @@ +from motor.motor_asyncio import AsyncIOMotorDatabase +import schemas +from crud.base import CRUDBase + +__all__ = 'project_number', + +from utils import get_uid + + +class CRUDProjectNumber(CRUDBase): + # 获取所有数据 + async def all_xiangmu(self, db: AsyncIOMotorDatabase): + return await self.find_many(db, {}) + + # 修改数据 + async def update(self, db: AsyncIOMotorDatabase, data_in: schemas.AddProjectnumber): + game = data_in.game + add_ditch = [] + for member in data_in.ditch: + add_ditch.append(member.dict()) + await self.update_one(db, {'game': game}, {'$set': {'ditch': add_ditch}}) + + # 插入数据 + async def create(self, db: AsyncIOMotorDatabase, data_in: schemas.ProjectnumberInsert): + # await self.update_one(db, {'xiangmu': data_in.xiangmu}, {'$set': data_in.dict()}, upsert=True) + await self.update_one(db, {data_in.game, data_in.ditch}, upsert=True) + + # 同步插入项目 + async def createxiangmu(self, db: AsyncIOMotorDatabase, data_in: schemas.ProjectnumberInsert): + await self.insert_one(db, data_in.dict()) + + +project_number = CRUDProjectNumber('project_number') diff --git a/schemas/__init__.py b/schemas/__init__.py index 61f46a7..e0dfbbf 100644 --- a/schemas/__init__.py +++ b/schemas/__init__.py @@ -17,4 +17,5 @@ from .api_list import * from .role import * from .check_data import * from .userlabel import * -from .select_map import * \ No newline at end of file +from .select_map import * +from .project_number import * \ No newline at end of file diff --git a/schemas/project.py b/schemas/project.py index 41845d4..99d4758 100644 --- a/schemas/project.py +++ b/schemas/project.py @@ -51,7 +51,7 @@ class ProjectDelMember(BaseModel): class ProjectCreate(ProjectBase): name: str = Field(..., title='项目名') game: str = Field(..., title='游戏代号') - + #qudao:str = Field(...,title='渠道') # 查询某个项目看板 class ProjectKanban(DBBase): diff --git a/schemas/project_number.py b/schemas/project_number.py new file mode 100644 index 0000000..6fda869 --- /dev/null +++ b/schemas/project_number.py @@ -0,0 +1,17 @@ +from pydantic import BaseModel +from typing import List + + +class ProjectnumberList(BaseModel): + main_channel: str + ditch: str + + +class ProjectnumberInsert(BaseModel): + game: str + ditch: List[ProjectnumberList] + name: str + +class AddProjectnumber(BaseModel): + game: str + ditch: List[ProjectnumberInsert] diff --git a/utils/func.py b/utils/func.py index 2f9f747..e920529 100644 --- a/utils/func.py +++ b/utils/func.py @@ -4,3 +4,11 @@ import time def get_uid(): return hex(int(time.time() * 10 ** 7) + random.randint(0, 10000))[2:] + + +# 获取筛选条件的包含关系 +def get_bijiao(bijiao): + if bijiao == '==' or bijiao == 'in' or bijiao == 'like' or bijiao == 'is not null': + return "IN" + elif bijiao == '!=' or bijiao == 'not like' or bijiao == 'is null': + return 'NOT LIKE'