From 5c0e2bf60eeb709f04e3a6b3db50e15dfb2c0edf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E4=BC=9F?= <250213850@qq.com> Date: Mon, 13 Dec 2021 13:59:08 +0800 Subject: [PATCH] =?UTF-8?q?1.=E4=BF=AE=E6=94=B9=E8=AE=BE=E5=A4=87LTV?= =?UTF-8?q?=E8=AE=A1=E7=AE=97=E5=85=AC=E5=BC=8F=202.=E4=BF=AE=E6=94=B9?= =?UTF-8?q?=E5=A4=9A=E6=AC=A1=E4=BB=98=E8=B4=B9=E5=8D=A0=E6=AF=94=E5=85=AC?= =?UTF-8?q?=E5=BC=8F=203.=E4=BC=98=E5=8C=96=E8=B6=85=E9=99=90=E8=8C=83?= =?UTF-8?q?=E5=9B=B4=E7=9A=84=E5=BC=82=E5=B8=B8=E9=97=AE=E9=A2=98=204.?= =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=B8=8B=E8=BD=BD=E5=A4=9A=E6=AC=A1=E4=BB=98?= =?UTF-8?q?=E8=B4=B9=E5=8D=A0=E6=AF=94=E6=95=B0=E6=8D=AE=E6=B2=A1=E6=9C=89?= =?UTF-8?q?=E7=99=BE=E5=88=86=E6=AF=94=E7=9A=84=E9=97=AE=E9=A2=98=205.?= =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=9C=8B=E6=9D=BF=E8=AE=BE=E7=BD=AE=E8=B0=83?= =?UTF-8?q?=E6=95=B4=E5=90=8E=E6=8F=90=E7=A4=BA=E6=96=87=E6=9C=AC=E5=BC=82?= =?UTF-8?q?=E5=B8=B8=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/api_v1/authz/authz.py | 3 +- api/api_v1/endpoints/query.py | 81 +++++++++++++++++++------ api/api_v1/endpoints/report.py | 3 +- api/api_v1/endpoints/xquery.py | 104 +++++++++++++++++++++++++-------- main.py | 3 +- models/x_analysis.py | 9 +-- 6 files changed, 154 insertions(+), 49 deletions(-) diff --git a/api/api_v1/authz/authz.py b/api/api_v1/authz/authz.py index e10abd4..64c833b 100644 --- a/api/api_v1/authz/authz.py +++ b/api/api_v1/authz/authz.py @@ -46,9 +46,10 @@ async def get_permissions_for_user_in_domain( """ 获取域内用户或角色的权限 """ - + #data为列表 data = casbin_enforcer.get_permissions_for_user_in_domain(data_in.role_id, data_in.game) paths = {i[2] for i in data} + #列表形式的coll_name all_api = await crud.api_list.all_api(db) for item in all_api: if item['path'] in paths: diff --git a/api/api_v1/endpoints/query.py b/api/api_v1/endpoints/query.py index 40ea2f7..0bb43be 100644 --- a/api/api_v1/endpoints/query.py +++ b/api/api_v1/endpoints/query.py @@ -151,10 +151,11 @@ async def event_model( res = [] is_hide = [] - for idx, item in enumerate(sqls): + for idx, item in enumerate(sqls): #列出索引下标 if item.get('is_show') == False: is_hide.append(idx) - + #event_name:事件名,日充总额 + #format:float浮点型 q = { 'groups': [], 'values': [], @@ -167,7 +168,7 @@ async def event_model( 'end_date': item['end_date'], 'time_particle': item['time_particle'] } - # 处理组合问题 + # 处理组合问题,如combination_event不存在则跳过 if item.get('combination_event'): combination_event = CombinationEvent(res, item.get('combination_event'), item['format']) values, sum_, avg = combination_event.parse() @@ -181,13 +182,14 @@ async def event_model( break res.append(q) continue - + #sql语句 sql = item['sql'] groupby = item['groupby'] - date_range = item['date_range'] - q['date_range'] = date_range - df = await ckdb.query_dataframe(sql) - df.fillna(0, inplace=True) + date_range = item['date_range'] #获取的要查询的每一天的时间 + q['date_range'] = date_range #把要查询的时间加入q字典中 + df = await ckdb.query_dataframe(sql) #以sql语句查出数据,df是二维列表 + df.fillna(0, inplace=True)#以0填补空数据 + #获取第一矩阵的长度 if df.shape[0] == 0: df = pd.DataFrame({'date': date_range, 'values': 0 * len(date_range)}) # continue @@ -214,6 +216,7 @@ async def event_model( if groupby and (set(groupby) & set(df)) == set(groupby): # 有分组 for group, df_group in df.groupby(groupby): + #在原数据上将索引重新转换为列,新索引的列删除 df_group.reset_index(drop=True, inplace=True) q['groups'].append(str(group)) @@ -238,7 +241,9 @@ async def event_model( concat_data = [] for i in set(date_range) - set(df['date']): concat_data.append((i, 0)) + #纵向拼接两个表 df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)]) + #在原数据上按data排序 df.sort_values('date', inplace=True) if len(df) >= 2: q['chain_ratio'] = division((df.iloc[-1, 1] - df.iloc[-2, 1]) * 100, df.iloc[-2, 1], 2) @@ -249,7 +254,9 @@ async def event_model( if last_value > 0: q['last_value'] = float(last_value) break + #求所有值的和 q['sum'].append(round(float(df['values'].sum()), 2)) + #求平均值 q['avg'].append(round(float(df['values'].mean()), 2)) # q['eventNameDisplay']=item['event_name_display'] @@ -257,7 +264,7 @@ async def event_model( # 按总和排序 for item in res: try: - if item['time_particle'] in ('P1D', 'P1W'): + if item['time_particle'] in ('P1D', 'P1W'): #按格式修改年月日 item['date_range'] = [d.strftime('%Y-%m-%d') for d in item['date_range']] elif item['time_particle'] in ('P1M',): item['date_range'] = [d.strftime('%Y-%m') for d in item['date_range']] @@ -266,7 +273,7 @@ async def event_model( except: pass - sort_key = np.argsort(np.array(item['sum']))[::-1] + sort_key = np.argsort(np.array(item['sum']))[::-1]#将sum中的元素从小到大排列后的结果,提取其对应的索引,然后倒着输出到变量之中 if item.get('groups'): item['groups'] = np.array(item['groups'])[sort_key].tolist() item['values'] = np.array(item['values'])[sort_key].tolist() @@ -299,17 +306,17 @@ async def retention_model(request: Request, current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> schemas.Msg: await analysis.init(data_where=current_user.data_where) - res = await analysis.retention_model_sql2() - sql = res['sql'] + res = await analysis.retention_model_sql2() #初始化开始时间结束时间,sql语句 字典 + sql = res['sql'] #获取到sql语句 df = await ckdb.query_dataframe(sql) if df.empty: return schemas.Msg(code=-9, msg='无数据', data=None) - date_range = res['date_range'] - unit_num = res['unit_num'] - retention_n = res['retention_n'] - filter_item_type = res['filter_item_type'] - filter_item = res['filter_item'] + date_range = res['date_range'] #时间 列表 + unit_num = res['unit_num'] #int + retention_n = res['retention_n'] #列表 int + filter_item_type = res['filter_item_type'] #all + filter_item = res['filter_item'] #列表 0,1,3,7,14,21,30 df.set_index('reg_date', inplace=True) for d in set(res['date_range']) - set(df.index): df.loc[d] = 0 @@ -320,6 +327,7 @@ async def retention_model(request: Request, avg = {} avgo = {} for date, v in df.T.items(): + #字典中data存在时不替换,否则将data替换成空字典 tmp = summary_values.setdefault(date, dict()) tmp['d0'] = int(v.cnt0) tmp['p'] = [] @@ -516,6 +524,7 @@ async def funnel_model( await analysis.init(data_where=current_user.data_where) res = await analysis.funnel_model_sql() sql = res['sql'] + #查询的时间 date_range = res['date_range'] cond_level = res['cond_level'] groupby = res['groupby'] @@ -530,6 +539,7 @@ async def funnel_model( for item in not_exists_level: key = key if isinstance(key, tuple) else (key,) concat_data.append((*key, item, 0)) + #合并数据 df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)]) # df.set_index('date',inplace=True) @@ -539,7 +549,9 @@ async def funnel_model( return schemas.Msg(code=0, msg='ok', data={'list': data_list, 'level': cond_level}) tmp = {'title': '总体'} + #以level分组后的和 tmp_df = df[['level', 'values']].groupby('level').sum() + #在原数据上对索引进行排序 tmp_df.sort_index(inplace=True) for i in tmp_df.index: tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum() @@ -785,10 +797,18 @@ async def scatter_model( await analysis.init(data_where=current_user.data_where) res = await analysis.scatter_model_sql() sql = res['sql'] + + #查询买量渠道owner为kuaiyou3的日注册玩家等级分布 + # sql_list=sql.split("GROUP BY") + # sql01 = """and xiangsu.event.owner_name='kuaiyou3'GROUP BY""""" + # new_sql=sql_list[0]+sql01+sql_list[1] + + df = await ckdb.query_dataframe(sql) if df.empty: return schemas.Msg(code=-9, msg='无数据', data=None) df.fillna(0, inplace=True) + #转换数据类型为int df['values'] = df['values'].astype(int) interval_type = res['interval_type'] analysis = res['analysis'] @@ -866,14 +886,37 @@ async def scatter_model( dt = '合计' else: dt = key.strftime('%Y-%m-%d') + labels_dict = {} for key2, tmp_df2 in tmp_df.groupby('values'): label = str(key2) n = len(tmp_df2) labels_dict[label] = n - resp['list'][dt] = {'总体': {'n': [labels_dict.get(i, 0) for i in labels], 'total': total, - 'p': [round(labels_dict.get(i, 0) * 100 / total, 2) for i in labels]}} + #如需要2之后所有之和,则执行下面代码,返回值为字典的labels_dict01 + labels_dict01={} + v=-1 + for i in labels: + v +=1 + if int(i) == 1: + labels_dict01["1"]=labels_dict["1"] + else: + # for number in labels_dict.keys(): + # if number >=i: + values=list(labels_dict.values()) + n=sum(values[v:]) + labels_dict01[i]=n + #传入百分比数据 + list_p=[] + for i in labels: + number_int=round(labels_dict01.get(i, 0) * 100 / total, 2) + number_str=str(number_int)+'%' + list_p.append(number_str) + + resp['list'][dt] = {'总体': {'n': [labels_dict01.get(i, 0) for i in labels], 'total': total, + 'p': list_p}} + # resp['list'][dt] = {'总体': {'n': [labels_dict.get(i, 0) for i in labels], 'total': total, + # 'p': [round(labels_dict.get(i, 0) * 100 / total, 2) for i in labels]}} return schemas.Msg(code=0, msg='ok', data=resp) # bins_s = pd.cut(tmp_df['values'], bins=bins, diff --git a/api/api_v1/endpoints/report.py b/api/api_v1/endpoints/report.py index dc7e4aa..ec31f8f 100644 --- a/api/api_v1/endpoints/report.py +++ b/api/api_v1/endpoints/report.py @@ -40,7 +40,8 @@ async def edit( """编辑报表""" res = await crud.report.update_one(db, {'_id': data_in.report_id, 'user_id': request.user.id}, {'$set': {'query': data_in.query, 'name': data_in.name, 'desc': data_in.desc}}) - if not res.matched_count: + #if not res.matched_count: + if res.matched_count: return schemas.Msg(code=-1, msg='只能报表所有者编辑') return schemas.Msg(code=0, msg='ok', data='编辑成功') diff --git a/api/api_v1/endpoints/xquery.py b/api/api_v1/endpoints/xquery.py index f0fe3c2..ff9a6c2 100644 --- a/api/api_v1/endpoints/xquery.py +++ b/api/api_v1/endpoints/xquery.py @@ -1,11 +1,14 @@ +import datetime import mimetypes from collections import defaultdict +import time from urllib.parse import quote import pandas as pd import numpy as np from fastapi import APIRouter, Depends, Request from motor.motor_asyncio import AsyncIOMotorDatabase +from pandas import DataFrame from starlette.responses import StreamingResponse import crud, schemas @@ -50,50 +53,105 @@ async def ltv_model_sql( res = analysis.ltv_model_sql() sql = res['sql'] - quota = res['quota'] + quota = res['quota'] #字段名 ltv_n = res['ltv_n'] df = await ckdb.query_dataframe(sql) if df.empty: return schemas.Msg(code=-9, msg='查无数据') - df.fillna(0, inplace=True) + df.fillna(0, inplace=True) #修改原对象,以0填补空缺值 + + # for d in set(res['date_range']) - set(df['date']): # 时间的差集运算 最后为空 + # df.loc[len(df)] = 0 + # df.loc[len(df) - 1, 'date'] = d + # days = (pd.Timestamp.now().date() - d).days # 时间差 + # # if days + 2 >= ltv_len: + # # continue + # df.iloc[len(df) - 1, days + 3:] = '-' + # df.sort_values('date', inplace=True) # 根据date进行倒叙排序 + for d in set(res['date_range']) - set(df['date']): - df.loc[len(df)] = 0 + #在有效日期最后一行补充行数据(值都为0),补充的行数为两个集合的差集长度 + df.loc[len(df)] = '-' + #在date此列补充多行数据(值为两个集合差集的子元素) df.loc[len(df) - 1, 'date'] = d - days = (pd.Timestamp.now().date() - d).days - # if days + 2 >= ltv_len: - # continue - df.iloc[len(df) - 1, days+3:] = '-' + # days = (d-pd.Timestamp.now().date()).days + # # if days + 2 >= ltv_len: + # # continue + # if days>0: + # df.iloc[len(df) - 1, 1:] = '-' + df.sort_values('date', inplace=True) - - df.rename(columns={'date': '注册日期'}, inplace=True) + df.rename(columns={'date': '注册日期'}, inplace=True) #True为将结果返回赋值给原变量,修改原对象,columns为列名 cat = '角色数' - if quota == '#distinct_id': + if quota == '#distinct_id': #如果字段名=字段名 cat = '设备数' - df.rename(columns={'cnt1': cat}, inplace=True) - - df1 = df[['注册日期', cat, *[f'LTV{i}' for i in ltv_n]]] + df.rename(columns={'cnt1': cat}, inplace=True) #原数据基础上修改df里面列名为cnt1为设备数 + df1 = df[['注册日期', cat, *[f'LTV{i}' for i in ltv_n]]] #1, 2, 3, 4, 5, 6, 7, 8, 9, ~~到360 df2 = df[['注册日期', cat, *[f'sumpay_{i}' for i in ltv_n]]] - df2.replace('-', 0, inplace=True) - avg_ltv = (df2[[f'sumpay_{i}' for i in ltv_n]].sum() / df2[cat].sum()).round(2) - df1.loc[len(df1)] = ['均值', df2[cat].sum(), *avg_ltv] + df2.replace('-', 0, inplace=True) #True改变原数据,前面是需要替换的值,后面是替换后的值。 在原数据把下划线替换成0 + #修改下面代码 + #求相差天数 + str_time = str(res['date_range'][0]) + split_time = str_time.split('-') + now_time = time.strftime("%Y-%m-%d", time.localtime()) + split_now_time = now_time.split('-') + today = datetime.datetime(int(split_time[0]), int(split_time[1]), int(split_time[2])) + now_day = datetime.datetime(int(split_now_time[0]), int(split_now_time[1]), int(split_now_time[2])) + newday = (now_day - today).days + 1 + #计算方法运算每个LTV的均值 + _listData = {} + for i in ltv_n: + if i <=newday: + avgLtv=(df2[[f'sumpay_{i}']][0:newday+1-i].sum()/df2[cat][0:newday+1-i].sum()).round(2) + + new_avgLtv=str(avgLtv).split('\n')[0].split(' ') + new_avgLtv01=new_avgLtv[len(new_avgLtv)-1] + + _listData[f'sumpay_{i}'] = new_avgLtv01 + else: + _listData[f'sumpay_{i}']=0 + avgLtvlist = pd.Series(_listData) + _listname=[] + + #计算总累计LTV最后一个值 + for k, v in _listData.items(): + if v !=0: + _listname.append(k) + max_nmu=max(_listname) + max_num=(df2[[max_nmu]].sum()/df2[cat].sum()).round(2) + max_number=str(max_num[0]) + df1.loc[len(df1)] = ['均值', df2[cat].sum(), *avgLtvlist] + + # avg_ltv = (df2[[f'sumpay_{i}' for i in ltv_n]].sum() / df2[cat].sum()).round(2) + #df1.loc[len(df1)] = ['均值', df2[cat].sum(), *avg_ltv] df1.insert(2, '累计LTV', 0) - - - last_ltv = [] for items in df1.values: for item in items[::-1]: if item != '-': last_ltv.append(item) break - df1['累计LTV'] = last_ltv - days = (pd.Timestamp.now().date() - pd.to_datetime(res['start_date']).date()).days + #修改累计LTV中最后一个值 + last_ltv[-1]=max_number + + df1['累计LTV'] = last_ltv + + + #把列中累计LTV等于0的值改成'-' + df1.loc[df1['累计LTV']==0, '累计LTV'] = '-' + #剔除行,列的累计LTV=='-'的剔除出去 + df3 = df1.drop(df1[(df1.累计LTV=='-')].index) + + days = (pd.Timestamp.now().date() - pd.to_datetime(res['start_date']).date()).days df1.iloc[len(df1) - 1, days + 4:] = '-' data = { - 'title': df1.columns.tolist(), - 'rows': df1.values.tolist(), + #'title': df1.columns.tolist(), + #'rows': df1.values.tolist(), + 'title': df3.columns.tolist(), + 'rows': df3.values.tolist(), + 'start_date': res['start_date'], 'end_date': res['end_date'] } diff --git a/main.py b/main.py index fd43eea..f743f87 100644 --- a/main.py +++ b/main.py @@ -115,4 +115,5 @@ async def add_process_time_header(request: Request, call_next): if __name__ == '__main__': - uvicorn.run(app='main:app', host="0.0.0.0", port=7899, reload=True, debug=True) + #uvicorn.run(app='main:app', host="10.0.0.240", port=7899, reload=True, debug=True) + uvicorn.run(app='main:app', host="0.0.0.0", port=7899, reload=True, debug=True) \ No newline at end of file diff --git a/models/x_analysis.py b/models/x_analysis.py index 6f91aee..059f3c7 100644 --- a/models/x_analysis.py +++ b/models/x_analysis.py @@ -32,7 +32,7 @@ class XAnalysis: self.ext_filters = (self.data_in.ext_filter.get('filts', []), self.data_in.ext_filter.get('relation', 'and')) def _get_global_filters(self): - return self.event_view.get('filts') or [] + return self.event_view.get('filts') or [] #获取event_view字典里面filts的值,或返回空列表 async def init(self, *args, **kwargs): if self.data_in.report_id: @@ -152,11 +152,12 @@ class XAnalysis: ] if quota == '#distinct_id': where.append(sa.Column('is_new_device') == 1) + qry = sa.select().where(*where) sql = str(qry.compile(compile_kwargs={"literal_binds": True})) where_str = sql.split('WHERE ')[1] - where_order = self.handler_filts((self.global_filters, self.global_relation)) + where_order = self.handler_filts((self.global_filters, self.global_relation)) #global_relation就是 and where_order_str = 1 if where_order: qry = sa.select().where(*where_order) @@ -176,9 +177,9 @@ class XAnalysis: {select_ltv_str}, {sum_money_str} FROM (SELECT toDate(addHours(`#event_time`, `#zone_offset`)) as date, uniqExact(`{quota}`) cnt1 - FROM {self.game}.event + FROM {self.game}.event where `#event_name` = 'create_account' - AND {where_str} AND {where_account_str} + AND {where_str} AND {where_account_str} GROUP BY toDate(addHours(`#event_time`, `#zone_offset`))) as reg left join (select a.date,