import datetime
from collections import defaultdict
import mimetypes
from urllib.parse import quote

import pandas as pd
import numpy as np
from fastapi import APIRouter, Depends, Request
from fastapi.encoders import jsonable_encoder
from motor.motor_asyncio import AsyncIOMotorDatabase
from fastapi.responses import StreamingResponse

import crud, schemas
from common import *

from api import deps
from db import get_database
from db.ckdb import get_ck_db, CKDrive
from db.redisdb import get_redis_pool, RedisDrive

from models.behavior_analysis import BehaviorAnalysis, CombinationEvent
from models.user_analysis import UserAnalysis
from utils import DfToStream

router = APIRouter()


@router.post("/sql")
async def query_sql(
        request: Request,
        game: str,
        data_in: schemas.Sql,
        ckdb: CKDrive = Depends(get_ck_db),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Raw SQL query."""
    sql = data_in.sql
    sql = sql.replace('$game', game)
    data = await ckdb.execute(sql)
    return schemas.Msg(code=0, msg='ok', data=data)


@router.post("/sql_export")
async def query_sql_export(
        request: Request,
        game: str,
        data_in: schemas.Sql,
        ckdb: CKDrive = Depends(get_ck_db),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
):
    """Export a raw SQL query result as an Excel file."""
    file_name = quote('result.xlsx')
    mime = mimetypes.guess_type(file_name)[0]
    sql = data_in.sql
    sql = sql.replace('$game', game)
    df = await ckdb.query_dataframe(sql)
    if df.empty:
        return schemas.Msg(code=-9, msg='无数据', data=None)
    df_to_stream = DfToStream((df, 'result'))
    with df_to_stream as d:
        export = d.to_stream()
    return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})


@router.post("/event_model_sql")
async def event_model_sql(
        request: Request,
        game: str,
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Event analysis model: generated SQL."""
    await analysis.init(data_where=current_user.data_where)
    data = await analysis.event_model_sql()
    return schemas.Msg(code=0, msg='ok', data=data)


@router.post("/event_model_export")
async def event_model_export(request: Request,
                             game: str,
                             ckdb: CKDrive = Depends(get_ck_db),
                             analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
                             current_user: schemas.UserDB = Depends(deps.get_current_user)
                             ):
    """Event analysis model: data export."""
    await analysis.init(data_where=current_user.data_where)
    sqls = await analysis.event_model_sql()
    file_name = quote(f'{sqls[0]["report_name"]}.xlsx')
    mime = mimetypes.guess_type(file_name)[0]
    excels = []
    for item in sqls:
        if item.get('combination_event'):
            continue
        sql = item['sql']
        event_name = item['event_name']
        df = await ckdb.query_dataframe(sql)
        if df.empty:
            continue
        if 'date' in df:
            df.sort_values('date', inplace=True)
            try:
                df['date'] = df['date'].dt.tz_localize(None)
            except Exception:
                pass
        excels.append((df, event_name))
    df_to_stream = DfToStream(*excels)
    with df_to_stream as d:
        export = d.to_stream()
    return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})


# @router.get("/event_model_export")
# async def event_model_export(request: Request,
#                              game: str,
#                              report_id: str,
#                              ckdb: CKDrive = Depends(get_ck_db),
#                              # analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
#                              current_user: schemas.UserDB = Depends(deps.get_current_user)
#                              ):
#     """事件分析模型 数据导出"""
#     analysis = BehaviorAnalysis(game, schemas.CkQuery(report_id=report_id), get_redis_pool())
#     await analysis.init(data_where=current_user.data_where)
#     sqls = analysis.event_model_sql()
#     res = []
#     file_name = f'{sqls[0]["report_name"]}.xlsx'
#     mime = mimetypes.guess_type(file_name)[0]
#     for item in sqls[:1]:
#         sql = item['sql']
#         event_name = item['event_name']
#         df = await ckdb.query_dataframe(sql)
#         file = df_to_stream(df, event_name)
#         return StreamingResponse(file, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})


@router.post("/event_model")
async def event_model(
        request: Request,
        game: str,
        data_in: schemas.CkQuery,
        ckdb: CKDrive = Depends(get_ck_db),
        rdb: RedisDrive = Depends(get_redis_pool),
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Event analysis."""
    await analysis.init(data_where=current_user.data_where)
    sqls = await analysis.event_model_sql()
    res = []
    is_hide = []
    for idx, item in enumerate(sqls):  # keep the index so hidden indicators can be dropped later
        if item.get('is_show') == False:
            is_hide.append(idx)
        # event_name: indicator name, e.g. daily recharge total
        # format: value format, e.g. float
        q = {
            'groups': [],
            'values': [],
            'sum': [],
            'avg': [],
            'event_name': item['event_name'],
            'format': item['format'],
            'last_value': 0,
            'start_date': item['start_date'],
            'end_date': item['end_date'],
            'time_particle': item['time_particle']
        }
        # Combination indicators are derived from the already computed results instead of a new query.
        if item.get('combination_event'):
            combination_event = CombinationEvent(res, item.get('combination_event'), item['format'])
            values, sum_, avg = combination_event.parse()
            # q['values'].append(values)
            # q['sum'].append(sum_)
            q['avg'].append(avg)
            q['date_range'] = item['date_range']
            for last_value in values[::-1]:
                if last_value > 0:
                    q['last_value'] = float(last_value)
                    break
            for i in range(len(values)):
                values[i] = str(values[i]) + '%'
            q['values'].append(values)
            q['sum'].append(str(sum_) + '%')
            res.append(q)
            continue

        sql = item['sql']
        groupby = item['groupby']
        date_range = item['date_range']  # every date that has to be reported
        q['date_range'] = date_range  # keep the requested dates in the response
        df = await ckdb.query_dataframe(sql)  # run the SQL, the result comes back as a DataFrame
        df.fillna(0, inplace=True)  # fill missing values with 0
        # no rows returned: fall back to an all-zero series over the requested dates
        if df.shape[0] == 0:
            df = pd.DataFrame({'date': date_range, 'values': [0] * len(date_range)})
            # continue
            # return schemas.Msg(code=0, msg='ok', data=[q])

        if item['time_particle'] == 'total':
            # for group, df_group in df.groupby(groupby):
            #     df_group.reset_index(drop=True, inplace=True)
            q['groups'].append(groupby)
            q['values'].append(df['values'].to_list())
            q['sum'].append(round(float(df['values'].sum()), 2))
            q['avg'].append(round(float(df['values'].mean()), 2))
            for last_value in df['values'].values[::-1]:
                if last_value > 0:
                    q['last_value'] = float(last_value)
                    break
            if groupby and (set(groupby) & set(df) == set(groupby)):
                q['date_range'] = [f'{i}' for i in df.set_index(groupby).index]
            else:
                q['date_range'] = ['合计']
            res.append(q)
            continue

        if groupby and (set(groupby) & set(df)) == set(groupby):
            # grouped
            for group, df_group in df.groupby(groupby):
                # turn the index back into columns, dropping the old index
                df_group.reset_index(drop=True, inplace=True)
                q['groups'].append(str(group))
                concat_data = []
                for i in set(date_range) - set(df_group['date']):
                    if len(groupby) > 1:
                        concat_data.append((i, *group, 0))
                    else:
                        concat_data.append((i, group, 0))
                df_group = pd.concat([df_group, pd.DataFrame(concat_data, columns=df_group.columns)])
                df_group.sort_values('date', inplace=True)
                q['values'].append(df_group['values'].to_list())
                q['sum'].append(round(float(df_group['values'].sum()), 2))
                q['avg'].append(round(float(df_group['values'].mean()), 2))
                for last_value in df['values'].values[::-1]:
                    if last_value > 0:
                        q['last_value'] = float(last_value)
                        break
        else:
            # not grouped
            concat_data = []
            for i in set(date_range) - set(df['date']):
                concat_data.append((i, 0))
            # append the missing dates to the frame
            df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
            # sort by date in place
            df.sort_values('date', inplace=True)
            if len(df) >= 2:
                q['chain_ratio'] = division((df.iloc[-1, 1] - df.iloc[-2, 1]) * 100, df.iloc[-2, 1], 2)
            if len(df) >= 8:
                q['wow'] = division((df.iloc[-1, 1] - df.iloc[-8, 1]) * 100, df.iloc[-8, 1], 2) or 0
            q['values'].append(df['values'].to_list())
            for last_value in df['values'].values[::-1]:
                if last_value > 0:
                    q['last_value'] = float(last_value)
                    break
            # sum of all values
            q['sum'].append(round(float(df['values'].sum()), 2))
            # mean of all values
            q['avg'].append(round(float(df['values'].mean()), 2))
        # q['eventNameDisplay'] = item['event_name_display']
        res.append(q)

    # sort by total, descending
    for item in res:
        try:
            if item['time_particle'] in ('P1D', 'P1W'):
                # format the dates according to the granularity
                item['date_range'] = [d.strftime('%Y-%m-%d') for d in item['date_range']]
            elif item['time_particle'] in ('P1M',):
                item['date_range'] = [d.strftime('%Y-%m') for d in item['date_range']]
            else:
                item['date_range'] = [d.strftime('%Y-%m-%d %H:%M:%S') for d in item['date_range']]
        except Exception:
            pass

        # indices that would sort `sum` ascending, reversed so the largest total comes first
        sort_key = np.argsort(np.array(item['sum']))[::-1]
        if item.get('groups'):
            item['groups'] = np.array(item['groups'])[sort_key].tolist()
        item['values'] = np.array(item['values'])[sort_key].tolist()
        item['sum'] = np.array(item['sum'])[sort_key].tolist()
        item['avg'] = np.array(item['avg'])[sort_key].tolist()

    res = [item for idx, item in enumerate(res) if idx not in is_hide]
    return schemas.Msg(code=0, msg='ok', data=res)


@router.post("/retention_model_sql")
async def retention_model_sql(
        request: Request,
        game: str,
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Retention query: generated SQL."""
    await analysis.init(data_where=current_user.data_where)
    data = await analysis.retention_model_sql2()
    return schemas.Msg(code=0, msg='ok', data=[data])


@router.post("/retention_model")
async def retention_model(request: Request,
                          game: str,
                          ckdb: CKDrive = Depends(get_ck_db),
                          db: AsyncIOMotorDatabase = Depends(get_database),
                          analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
                          current_user: schemas.UserDB = Depends(deps.get_current_user)
                          ) -> schemas.Msg:
    """Retention analysis."""
    await analysis.init(data_where=current_user.data_where)
    res = await analysis.retention_model_sql2()  # dict with start/end dates and the SQL statement
    sql = res['sql']
    df = await ckdb.query_dataframe(sql)
    if df.empty:
        return schemas.Msg(code=-9, msg='无数据', data=None)
    date_range = res['date_range']  # list of dates
    unit_num = res['unit_num']  # int
    retention_n = res['retention_n']  # list of ints
    filter_item_type = res['filter_item_type']  # e.g. 'all'
    filter_item = res['filter_item']  # e.g. [0, 1, 3, 7, 14, 21, 30]
    df.set_index('reg_date', inplace=True)
    for d in set(res['date_range']) - set(df.index):
        df.loc[d] = 0
    df.sort_index(inplace=True)
    summary_values = {'均值': {}}
    max_retention_n = 1
    # retained users per offset
    avg = {}
    # churned users per offset
    avgo = {}
    for date, v in df.T.items():
        # reuse the per-date dict if it already exists, otherwise start an empty one
        tmp = summary_values.setdefault(date, dict())
        tmp['d0'] = int(v.cnt0)
        tmp['p'] = []
        tmp['n'] = []
        tmp['p_outflow'] = []
        tmp['n_outflow'] = []
        for i in retention_n:
            n = (pd.Timestamp.now().date() - date).days
            if i > n:
                continue
            # max_retention_n = i if i > max_retention_n else max_retention_n
            # retained users
            avg[i] = avg.setdefault(i, 0) + v[f'cnt{i}']
            # churned users
            avgo[i] = avgo.setdefault(i, 0) + v[f'on{i}']
            tmp['p'].append(v[f'p{i}'])
            tmp['n'].append(v[f'cnt{i}'])
            tmp['p_outflow'].append(v[f'op{i}'])
            tmp['n_outflow'].append(v[f'on{i}'])

    tmp = summary_values['均值']
    retention_avg_dict = {}
    for rn in retention_n:
        for rt, rd in df.T.items():
            if rt + datetime.timedelta(days=rn) <= pd.Timestamp.now().date():
                retention_avg_dict.setdefault(rn, {'cnt0': 0, 'cntn': 0, 'o_cnt0': 0, 'o_cntn': 0})
                retention_avg_dict[rn]['cnt0'] += rd['cnt0']
                retention_avg_dict[rn]['cntn'] += rd[f'cnt{rn}']
                retention_avg_dict[rn]['o_cnt0'] += rd['cnt0']
                retention_avg_dict[rn]['o_cntn'] += rd[f'on{rn}']

    tmp['p'] = []
    tmp['n'] = []
    tmp['p_outflow'] = []
    tmp['n_outflow'] = []
    tmp['d0'] = 0
    for rt, rd in retention_avg_dict.items():
        tmp['d0'] = int(df['cnt0'].sum())
        n = round(rd['cntn'] * 100 / rd['cnt0'], 2)
        n = 0 if np.isnan(n) else n
        tmp['p'].append(n)
        tmp['n'].append(rd['cntn'])
        n = round(rd['o_cntn'] * 100 / rd['cnt0'], 2)
        n = 0 if np.isnan(n) else n
        tmp['p_outflow'].append(n)
        tmp['n_outflow'].append(rd['o_cntn'])

    # column titles: date, user count, next-day retention, then the remaining offsets
    title = ['日期', '用户数', '次留', *[f'{i + 1}留' for i in retention_n[1:]]]

    # offsets that have not been reached yet are padded with '-'
    retention_length = len(retention_n)
    for _, items in summary_values.items():
        for key in ['p', 'n', 'p_outflow', 'n_outflow']:
            items[key].extend(['-'] * (retention_length - len(items[key])))

    resp = {
        'summary_values': summary_values,
        # 'values': values,
        'date_range': [d.strftime('%Y-%m-%d') for d in date_range],
        'title': title,
        'filter_item_type': filter_item_type,
        'filter_item': filter_item,
        'start_date': res['start_date'],
        'end_date': res['end_date'],
        'time_particle': res['time_particle']
    }
    return schemas.Msg(code=0, msg='ok', data=resp)


async def retention_model01(request: Request,
                            game: str,
                            ckdb: CKDrive = Depends(get_ck_db),
                            db: AsyncIOMotorDatabase = Depends(get_database),
                            analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
                            current_user: schemas.UserDB = Depends(deps.get_current_user)
                            ) -> schemas.Msg:
    """Churn-oriented variant of retention_model (no route is registered for it)."""
    await analysis.init(data_where=current_user.data_where)
    res = await analysis.retention_model_sql2()  # dict with start/end dates and the SQL statement
    sql = res['sql']
    df = await ckdb.query_dataframe(sql)
    if df.empty:
        return schemas.Msg(code=-9, msg='无数据', data=None)
    date_range = res['date_range']  # list of dates
    unit_num = res['unit_num']  # int
    retention_n = res['retention_n']  # list of ints
    filter_item_type = res['filter_item_type']  # e.g. 'all'
    filter_item = res['filter_item']  # e.g. [0, 1, 3, 7, 14, 21, 30]
    df.set_index('reg_date', inplace=True)
    for d in set(res['date_range']) - set(df.index):
        df.loc[d] = 0
    df.sort_index(inplace=True)
    summary_values = {'均值': {}}
    max_retention_n = 1
    avg = {}
    avgo = {}
    for date, v in df.T.items():
        # reuse the per-date dict if it already exists, otherwise start an empty one
        tmp = summary_values.setdefault(date, dict())
        tmp['d0'] = int(v.cnt0)
        tmp['p'] = []
        tmp['n'] = []
        tmp['p_outflow'] = []
        tmp['n_outflow'] = []
        for i in retention_n:
            n = (pd.Timestamp.now().date() - date).days
            if i > n:
                continue
            # max_retention_n = i if i > max_retention_n else max_retention_n
            avg[i] = avg.setdefault(i, 0) + v[f'cnt{i}']
            avgo[i] = avgo.setdefault(i, 0) + v[f'on{i}']
            tmp['p'].append(round(100 - v[f'p{i}'], 2))
            # tmp['p'].append(v[f'p{i}'])
            tmp['n'].append(v[f'cnt{i}'])
            tmp['p_outflow'].append(v[f'op{i}'])
            tmp['n_outflow'].append(v[f'on{i}'])

    tmp = summary_values['均值']
    retention_avg_dict = {}
    for rn in retention_n:
        for rt, rd in df.T.items():
            if rt + datetime.timedelta(days=rn) <= pd.Timestamp.now().date():
                retention_avg_dict.setdefault(rn, {'cnt0': 0, 'cntn': 0, 'o_cnt0': 0, 'o_cntn': 0})
                retention_avg_dict[rn]['cnt0'] += rd['cnt0']
                retention_avg_dict[rn]['cntn'] += rd[f'cnt{rn}']
                retention_avg_dict[rn]['o_cnt0'] += rd['cnt0']
                retention_avg_dict[rn]['o_cntn'] += rd[f'on{rn}']

    tmp['p'] = []
    tmp['n'] = []
    tmp['p_outflow'] = []
    tmp['n_outflow'] = []
    tmp['d0'] = 0
    for rt, rd in retention_avg_dict.items():
        tmp['d0'] = int(df['cnt0'].sum())
        n = round(100 - (rd['cntn'] * 100 / rd['cnt0']), 2)
        # n = round(rd['cntn'] * 100 / rd['cnt0'], 2)
        n = 0 if np.isnan(n) else n
        tmp['p'].append(n)
        tmp['n'].append(rd['cntn'])
        n = round(rd['o_cntn'] * 100 / rd['cnt0'], 2)
        n = 0 if np.isnan(n) else n
        tmp['p_outflow'].append(n)
        tmp['n_outflow'].append(rd['o_cntn'])

    title = ['日期', '用户数', '次流失', *[f'{i + 1}流失' for i in retention_n[1:]]]

    # offsets that have not been reached yet are padded with '-'
    retention_length = len(retention_n)
    for _, items in summary_values.items():
        for key in ['p', 'n', 'p_outflow', 'n_outflow']:
            items[key].extend(['-'] * (retention_length - len(items[key])))

    resp = {
        'summary_values': summary_values,
        # 'values': values,
        'date_range': [d.strftime('%Y-%m-%d') for d in date_range],
        'title': title,
        'filter_item_type': filter_item_type,
        'filter_item': filter_item,
        'start_date': res['start_date'],
        'end_date': res['end_date'],
        'time_particle': res['time_particle']
    }
    return schemas.Msg(code=0, msg='ok', data=resp)


@router.post("/retention_model_export")
async def retention_model_export(request: Request,
                                 game: str,
                                 ckdb: CKDrive = Depends(get_ck_db),
                                 analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
                                 current_user: schemas.UserDB = Depends(deps.get_current_user)
                                 ):
    """Retention analysis model: data export."""
    await analysis.init(data_where=current_user.data_where)
    data = await analysis.retention_model_sql2()
    file_name = quote('留存分析.xlsx')
    mime = mimetypes.guess_type(file_name)[0]
    sql = data['sql']
    df = await ckdb.query_dataframe(sql)
    if df.empty:
        return schemas.Msg(code=-9, msg='无数据', data=None)
    df_to_stream = DfToStream((df, '留存分析'))
    with df_to_stream as d:
        export = d.to_stream()
    return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})


@router.post("/retention_model_del", deprecated=True)
async def retention_model_del(
        request: Request,
        game: str,
        ckdb: CKDrive = Depends(get_ck_db),
        db: AsyncIOMotorDatabase = Depends(get_database),
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Retention data model."""
    await analysis.init(data_where=current_user.data_where)
    res = await analysis.retention_model_sql()
    sql = res['sql']
    date_range = res['date_range']
    event_a, event_b = res['event_name']
    unit_num = res['unit_num']
    title = await crud.event_mana.get_show_name(db, game, event_a)
    title = f'{title}用户数'
    df = await ckdb.query_dataframe(sql)
    if df.empty:
        return schemas.Msg(code=-9, msg='无数据', data=None)
    concat_data = []
    df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
    df['date'] = df['date'].apply(lambda x: x.date())
    # overall totals
    summary_df = df.groupby('date')[['val_a', 'val_b', 'amount_a']].sum()
    summary_values = {}
    for i, d1 in enumerate(date_range):
        a = set(summary_df.loc[d1]['val_a']) if d1 in summary_df.index else set()
        if not a:
            continue
        key = d1.strftime('%Y-%m-%d')
        for j, d2 in enumerate(date_range[i:]):
            if j > unit_num:
                break
            b = set(summary_df.loc[d2]['val_b']) if d2 in summary_df.index else set()
            tmp = summary_values.setdefault(key, {})
            tmp.setdefault('d0', len(a))
            tmp.setdefault('p', []).append(division(len(a & b) * 100, len(a)))
            tmp.setdefault('n', []).append(len(a & b))
            tmp.setdefault('p_outflow', []).append(round(100 - division(len(a & b) * 100, len(a)), 2))
            tmp.setdefault('n_outflow', []).append(len(a) - len(a & b))
    groups = set([tuple(i) for i in df[res['groupby']].values])
    df.set_index(res['groupby'], inplace=True)
    df.sort_index(inplace=True)
    values = {}
    days = [i for i in range((date_range[-1] - date_range[0]).days + 1)][:unit_num + 1]
    for i, d1 in enumerate(date_range):
        for g in groups:
            if len(g) == 1:
                continue
            a = set(df.loc[g]['val_a']) if g in df.index else set()
            if not a:
                continue
            key = d1.strftime("%Y-%m-%d")
            tmp_g = values.setdefault(key, {})
            for j, d2 in enumerate(date_range[i:]):
                if j > unit_num:
                    break
                b = set(df.loc[g]['val_b']) if g in df.index else set()
                tmp = tmp_g.setdefault(','.join(g[1:]), {})
                tmp.setdefault('d0', len(a))
                tmp.setdefault('p', []).append(division(len(a & b) * 100, len(a)))
                tmp.setdefault('n', []).append(len(a & b))
    data = {
        'summary_values': summary_values,
        'values': values,
        'days': days,
        'date_range': [d.strftime('%Y-%m-%d') for d in date_range][:unit_num + 1],
        'title': title,
        'start_date': res['start_date'],
        'end_date': res['end_date'],
        'time_particle': res['time_particle']
    }
    return schemas.Msg(code=0, msg='ok', data=data)


@router.post("/funnel_model_sql")
async def funnel_model_sql(
        request: Request,
        game: str,
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Funnel model: generated SQL."""
    await analysis.init(data_where=current_user.data_where)
    data = await analysis.funnel_model_sql()
    return schemas.Msg(code=0, msg='ok', data=[data])


@router.post("/funnel_model")
async def funnel_model(
        request: Request,
        game: str,
        ckdb: CKDrive = Depends(get_ck_db),
        db: AsyncIOMotorDatabase = Depends(get_database),
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Funnel model."""
    await analysis.init(data_where=current_user.data_where)
    res = await analysis.funnel_model_sql()
    sql = res['sql']
    # dates to query
    date_range = res['date_range']
    cond_level = res['cond_level']
    groupby = res['groupby']

    df = await ckdb.query_dataframe(sql)
    if df.empty:
        return schemas.Msg(code=-9, msg='无数据', data=None)
    # pad missing levels
    concat_data = []
    for key, tmp_df in df.groupby(['date'] + groupby):
        not_exists_level = {i for i in range(1, len(cond_level) + 1)} - set(tmp_df['level'])
        for item in not_exists_level:
            key = key if isinstance(key, tuple) else (key,)
            concat_data.append((*key, item, 0))
    # merge the padding rows
    df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])

    # df.set_index('date',inplace=True)
    data_list = []
    date_data = {}
    if df.shape == (0, 0):
        return schemas.Msg(code=0, msg='ok', data={'list': data_list, 'level': cond_level})

    tmp = {'title': '总体'}
    # sum grouped by level
    tmp_df = df[['level', 'values']].groupby('level').sum()
    # sort the index in place
    tmp_df.sort_index(inplace=True)
    for i in tmp_df.index:
        tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()

    tmp['n'] = tmp_df['values'].to_list()
    tmp['p1'] = [100]
    # tmp['p2'] = []
    for i, v in tmp_df.loc[2:, 'values'].items():
        tmp['p1'].append(round(v * 100 / tmp_df.loc[1, 'values'], 2))
        # tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
    data_list.append(tmp)

    # pad missing dates
    all_idx = {(dt, lv) for dt in date_range for lv in range(1, len(cond_level) + 1)}
    concat_data = []
    for i in all_idx - set(df.set_index(['date', 'level']).index):
        concat_data.append((*i, 0))
    summary_df = pd.concat(
        [df[['date', 'level', 'values']], pd.DataFrame(concat_data, columns=['date', 'level', 'values'])])
    for key, tmp_df in summary_df.groupby('date'):
        tmp_df = tmp_df.groupby('level').sum()
        tmp_df.sort_index(inplace=True)
        for i in tmp_df.index:
            tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()

        tmp = dict()
        tmp['n'] = tmp_df['values'].to_list()
        tmp['p1'] = [100]
        # tmp['p2'] = []
        for i, v in tmp_df.loc[2:, 'values'].items():
            var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
            var = 0 if np.isnan(var) else var
            tmp['p1'].append(var)
            # tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))

        _ = date_data.setdefault(key.strftime('%Y-%m-%d'), {})
        _['总体'] = tmp

    if groupby:
        # pad missing combinations
        concat_data = []
        idx = set(df.set_index(['date'] + groupby).index)
        all_idx = {(*j, i) for i in range(1, len(cond_level) + 1) for j in idx}
        for i in all_idx - set(df.set_index(list(('date', *groupby, 'level'))).index):
            concat_data.append((*i, 0))

        df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
        # df.sort_values(list((*groupby, 'level')), inplace=True, ascending=False)

        for key, tmp_df in df.groupby(groupby):
            tmp = {'title': key}
            tmp_df = tmp_df.groupby('level').sum()
            tmp_df.sort_index(inplace=True)
            for i in tmp_df.index:
                tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()

            tmp['n'] = tmp_df['values'].to_list()
            tmp['p1'] = [100]
            # tmp['p2'] = []
            for i, v in tmp_df.loc[2:, 'values'].items():
                var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
                var = 0 if np.isnan(var) else var
                tmp['p1'].append(var)
                # tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
            data_list.append(tmp)

        for key, tmp_df in df.groupby(['date'] + groupby):
            tmp_df = tmp_df.groupby('level').sum()
            tmp_df.sort_index(inplace=True)
            for i in tmp_df.index:
                tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()

            tmp = dict()
            tmp['n'] = tmp_df['values'].to_list()
            tmp['p1'] = [100]
            # tmp['p2'] = []
            for i, v in tmp_df.loc[2:, 'values'].items():
                var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
                var = 0 if np.isnan(var) else var
                tmp['p1'].append(var)
                # tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
            _ = date_data.setdefault(key[0].strftime('%Y-%m-%d'), {})
            _[key[1]] = tmp

    title = (groupby or ['总体']) + cond_level
    resp = {'list': data_list,
            'date_data': date_data,
            'title': title,
            'level': cond_level,
            'start_date': res['start_date'],
            'end_date': res['end_date'],
            'time_particle': res['time_particle']
            }
    return schemas.Msg(code=0, msg='ok', data=resp)


@router.post("/scatter_model_sql")
async def scatter_model_sql(
        request: Request,
        game: str,
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Distribution analysis: generated SQL."""
    await analysis.init(data_where=current_user.data_where)
    data = await analysis.scatter_model_sql()
    return schemas.Msg(code=0, msg='ok', data=[data])


@router.post("/scatter_model_export")
async def scatter_model_export(request: Request,
                               game: str,
                               ckdb: CKDrive = Depends(get_ck_db),
                               analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
                               current_user: schemas.UserDB = Depends(deps.get_current_user)
                               ):
    """Distribution analysis: data export."""
    await analysis.init(data_where=current_user.data_where)
    res = await analysis.scatter_model_sql()
    file_name = quote('分布分析.xlsx')
    mime = mimetypes.guess_type(file_name)[0]
    sql = res['sql']
    df = await ckdb.query_dataframe(sql)
    if df.empty:
        return schemas.Msg(code=-9, msg='无数据', data=None)
    interval_type = res['interval_type']
    analysis = res['analysis']
    groupby = res['groupby']
    quota_interval_arr = res['quota_interval_arr']
    # compatibility for the 'total' granularity
    if res['time_particle'] == 'total':
        df['date'] = '合计'

    if analysis != 'number_of_days' and interval_type != 'discrete':
        max_v = int(df['values'].max())
        min_v = int(df['values'].min())
        interval = (max_v - min_v) // 10 or 1
        resp = {'list': dict(),
                'start_date': res['start_date'],
                'end_date': res['end_date'],
                'time_particle': res['time_particle']
                }

        if not quota_interval_arr:
            resp['label'] = [f'[{i},{i + interval})' for i in range(min_v, max_v, interval)]
            bins = [i for i in range(min_v, max_v + interval, interval)]
        else:
            quota_interval_arr = [-float('inf')] + quota_interval_arr + [float('inf')]
            resp['label'] = []
            bins = [quota_interval_arr[0]]
            for i, v in enumerate(quota_interval_arr[1:]):
                resp['label'].append(f'[{quota_interval_arr[i]},{v})')
                bins.append(v)

        # overall distribution
        for key, tmp_df in df.groupby('date'):
            bins_s = pd.cut(tmp_df['values'], bins=bins,
                            right=False).value_counts()
            bins_s.sort_index(inplace=True)
            total = int(bins_s.sum())
            if res['time_particle'] == 'total':
                resp['list']['合计'] = dict()
                resp['list']['合计']['总体'] = {'n': bins_s.to_list(), 'total': total,
                                            'p': round(bins_s * 100 / total, 2).to_list(),
                                            'title': '总体'}
            else:
                resp['list'][key.strftime('%Y-%m-%d')] = dict()
                resp['list'][key.strftime('%Y-%m-%d')]['总体'] = {'n': bins_s.to_list(), 'total': total,
                                                                'p': round(bins_s * 100 / total, 2).to_list(),
                                                                'title': '总体'}
        # grouped distribution
        if groupby:
            export_df = pd.DataFrame(columns=resp['label'])
            for key, tmp_df in df.groupby(['date', *groupby]):
                bins_s = pd.cut(tmp_df['values'], bins=bins,
                                right=False).value_counts()
                bins_s.sort_index(inplace=True)
                total = int(bins_s.sum())
                title = '.'.join(key[1:])
                date = key[0]
                resp['list'][date.strftime('%Y-%m-%d')][title] = {'n': bins_s.to_list(), 'total': total,
                                                                  'p': round(bins_s * 100 / total, 2).to_list(),
                                                                  'title': title
                                                                  }
                export_df.loc[(date.strftime('%Y-%m-%d'), title)] = bins_s.to_list()

            df_to_stream = DfToStream((export_df, '分布分析'), (df, '分布分析原始数据'), index=True)
            with df_to_stream as d:
                export = d.to_stream()
            return StreamingResponse(export, media_type=mime,
                                     headers={'Content-Disposition': f'filename="{file_name}"'})

    # elif analysis == 'number_of_days':
    else:
        resp = {'list': {}, 'label': [],
                'start_date': res['start_date'],
                'end_date': res['end_date'],
                'time_particle': res['time_particle']
                }
        labels = [str(i) for i in sorted(df['values'].unique())]
        resp['label'] = labels
        for key, tmp_df in df.groupby(['date']):
            total = len(tmp_df)
            if res['time_particle'] == 'total':
                dt = '合计'
            else:
                dt = key.strftime('%Y-%m-%d')

            labels_dict = {}
            for key2, tmp_df2 in tmp_df.groupby('values'):
                label = str(key2)
                n = len(tmp_df2)
                labels_dict[label] = n

            resp['list'][dt] = {'总体': {'n': [labels_dict.get(i, 0) for i in labels], 'total': total,
                                       'p': [round(labels_dict.get(i, 0) * 100 / total, 2) for i in labels]}}

        export_df = pd.DataFrame(columns=resp['label'])
        for d, v in resp['list'].items():
            export_df.loc[d] = v['总体']['n']

        df_to_stream = DfToStream((export_df, '分布分析'), (df, '分布分析原始数据'), index=True)
        with df_to_stream as d:
            export = d.to_stream()
        return StreamingResponse(export, media_type=mime,
                                 headers={'Content-Disposition': f'filename="{file_name}"'})


@router.post("/scatter_model")
async def scatter_model(
        request: Request,
        game: str,
        ckdb: CKDrive = Depends(get_ck_db),
        db: AsyncIOMotorDatabase = Depends(get_database),
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Distribution analysis model."""
    await analysis.init(data_where=current_user.data_where)
    event_type = analysis.events[0]['eventName']
    res = await analysis.scatter_model_sql()
    sql = res['sql']
    # level distribution of daily registered players whose acquisition channel owner is kuaiyou3 (kept for reference)
    # sql_list=sql.split("GROUP BY")
    # sql01 = """and xiangsu.event.owner_name='kuaiyou3'GROUP BY"""""
    # new_sql=sql_list[0]+sql01+sql_list[1]
    df = await ckdb.query_dataframe(sql)
    if df.empty:
        return schemas.Msg(code=-9, msg='无数据', data=None)
    df.fillna(0, inplace=True)
    # cast values to int
    df['values'] = df['values'].astype(int)
    interval_type = res['interval_type']
    analysis = res['analysis']
    groupby = res['groupby']
    quota_interval_arr = res['quota_interval_arr']
    # compatibility for the 'total' granularity
    if res['time_particle'] == 'total':
        df['date'] = '合计'

    if analysis != 'number_of_days' and interval_type != 'discrete':
        max_v = int(df['values'].max())
        min_v = int(df['values'].min())
        interval = (max_v - min_v) // 10 or 1
        resp = {'list': dict(),
                'start_date': res['start_date'],
                'end_date': res['end_date'],
                'time_particle': res['time_particle']
                }

        if not quota_interval_arr:
            resp['label'] = [f'[{i},{i + interval})' for i in range(min_v, max_v, interval)]
            bins = [i for i in range(min_v, max_v + interval, interval)]
        else:
            quota_interval_arr = [-float('inf')] + quota_interval_arr + [float('inf')]
            resp['label'] = []
            bins = [quota_interval_arr[0]]
            for i, v in enumerate(quota_interval_arr[1:]):
                resp['label'].append(f'[{quota_interval_arr[i]},{v})')
                bins.append(v)

        # overall distribution
        for key, tmp_df in df.groupby('date'):
            bins_s = pd.cut(tmp_df['values'], bins=bins,
                            right=False).value_counts()
            bins_s.sort_index(inplace=True)
            total = int(bins_s.sum())
            if res['time_particle'] == 'total':
                resp['list']['合计'] = dict()
                resp['list']['合计']['总体'] = {'n': bins_s.to_list(), 'total': total,
                                            'p': round(bins_s * 100 / total, 2).to_list(),
                                            'title': '总体'}
            else:
                resp['list'][key.strftime('%Y-%m-%d')] = dict()
                resp['list'][key.strftime('%Y-%m-%d')]['总体'] = {'n': bins_s.to_list(), 'total': total,
                                                                'p': round(bins_s * 100 / total, 2).to_list(),
                                                                'title': '总体'}
        # grouped distribution
        if groupby:
            for key, tmp_df in df.groupby(['date', *groupby]):
                bins_s = pd.cut(tmp_df['values'], bins=bins,
                                right=False).value_counts()
                bins_s.sort_index(inplace=True)
                total = int(bins_s.sum())
                title = '.'.join(key[1:])
                date = key[0]
                resp['list'][date.strftime('%Y-%m-%d')][title] = {'n': bins_s.to_list(), 'total': total,
                                                                  'p': round((bins_s * 100 / total).fillna(0),
                                                                             2).to_list(),
                                                                  'title': title
                                                                  }
        return schemas.Msg(code=0, msg='ok', data=resp)
    # elif analysis == 'number_of_days':
    else:
        resp = {'list': {}, 'label': [],
                'start_date': res['start_date'],
                'end_date': res['end_date'],
                'time_particle': res['time_particle']
                }
        labels = [str(i) for i in sorted(df['values'].unique())]
        resp['label'] = labels
        for key, tmp_df in df.groupby(['date']):
            total = len(tmp_df)
            if res['time_particle'] == 'total':
                dt = '合计'
            else:
                dt = key.strftime('%Y-%m-%d')

            labels_dict = {}
            for key2, tmp_df2 in tmp_df.groupby('values'):
                label = str(key2)
                n = len(tmp_df2)
                labels_dict[label] = n

            if event_type == 'pay':
                # for pay events, report cumulative counts: each label gets the sum of its own count
                # and the counts of every larger value (labels_dict01)
                labels_dict01 = {}
                v = -1
                for i in labels:
                    v += 1
                    if int(i) == 1:
                        labels_dict01["1"] = labels_dict["1"]
                    else:
                        # for number in labels_dict.keys():
                        #     if number >= i:
                        values = list(labels_dict.values())
                        n = sum(values[v:])
                        labels_dict01[i] = n
                # percentages as formatted strings
                list_p = []
                for i in labels:
                    number_int = round(labels_dict01.get(i, 0) * 100 / total, 2)
                    number_str = str(number_int) + '%'
                    list_p.append(number_str)
                resp['list'][dt] = {'总体': {'n': [labels_dict01.get(i, 0) for i in labels], 'total': total,
                                           'p': list_p}}
            else:
                list_p = []
                for i in labels:
                    number_int = round(labels_dict.get(i, 0) * 100 / total, 2)
                    number_str = str(number_int) + '%'
                    list_p.append(number_str)
                resp['list'][dt] = {'总体': {'n': [labels_dict.get(i, 0) for i in labels], 'total': total,
                                           'p': [round(labels_dict.get(i, 0) * 100 / total, 2) for i in labels]}}

        return schemas.Msg(code=0, msg='ok', data=resp)
        # bins_s = pd.cut(tmp_df['values'], bins=bins,
        #                 right=False).value_counts()
        # bins_s.sort_index(inplace=True)
        # total = int(bins_s.sum())
        # resp['list'][key.strftime('%Y-%m-%d')] = dict()
        # resp['list'][key.strftime('%Y-%m-%d')]['总体'] = {'n': bins_s.to_list(), 'total': total,
        #                                                 'p': round(bins_s * 100 / total, 2).to_list(),
        #                                                 'title': '总体'}


@router.post("/trace_model_sql")
async def trace_model_sql(
        request: Request,
        game: str,
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Path analysis: generated SQL."""
    await analysis.init(data_where=current_user.data_where)
    data = await analysis.trace_model_sql()
    return schemas.Msg(code=0, msg='ok', data=[data])


@router.post("/trace_model")
async def trace_model(
        request: Request,
        game: str,
        ckdb: CKDrive = Depends(get_ck_db),
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Path analysis."""
    await analysis.init(data_where=current_user.data_where)
    res = await analysis.trace_model_sql()
    sql = res['sql']
    df = await ckdb.query_dataframe(sql)
    if df.empty:
        return schemas.Msg(code=-9, msg='无数据', data=None)
    chain_dict = defaultdict(dict)
    nodes = {'流失'}
    for event_names, count in zip(df['event_chain'], df['values']):
        chain_len = len(event_names)
        for i, event_name in enumerate(event_names):
            if i >= 10:
                continue
            next_event = event_names[i + 1] if i < chain_len - 1 else '流失'
            key = (f'{event_name}{i}', f'{next_event}{i + 1}')
            nodes.update(key)
            chain_dict[i][key] = chain_dict[i].setdefault(key, 0) + count

    links = []
    for _, items in chain_dict.items():
        for keys, val in items.items():
            links.append({
                "source": keys[0],
                "target": keys[1],
                "value": val
            })

    # nodes = set()
    # for item in links:
    #     nodes.update((
    #         item['source'],
    #         item['target'])
    #     )
    data = {
        'nodes': [{'name': item} for item in nodes],
        'links': links,
        'start_date': res['start_date'],
        'end_date': res['end_date'],
        'time_particle': res['time_particle']
    }
    return schemas.Msg(code=0, msg='ok', data=data)


@router.post("/user_property_model_sql")
async def user_property_sql(
        request: Request,
        game: str,
        analysis: UserAnalysis = Depends(UserAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """User property analysis: generated SQL."""
    await analysis.init(data_where=current_user.data_where)
    data = analysis.property_model()
    return schemas.Msg(code=0, msg='ok', data=[data])


@router.post("/user_property_model_export")
async def user_property_model_export(
        request: Request,
        game: str,
        ckdb: CKDrive = Depends(get_ck_db),
        analysis: UserAnalysis = Depends(UserAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
):
    """User property analysis: data export."""
    await analysis.init(data_where=current_user.data_where)
    data = analysis.property_model()
    file_name = quote('用户属性.xlsx')
    mime = mimetypes.guess_type(file_name)[0]
    sql = data['sql']
    df = await ckdb.query_dataframe(sql)
    if df.empty:
        return schemas.Msg(code=-9, msg='无数据', data=None)
    df_to_stream = DfToStream((df, '用户属性'))
    with df_to_stream as d:
        export = d.to_stream()
    return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})


@router.post("/user_property_model")
async def user_property_model(
        request: Request,
        game: str,
        analysis: UserAnalysis = Depends(UserAnalysis),
        ckdb: CKDrive = Depends(get_ck_db),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """User property analysis."""
    await analysis.init(data_where=current_user.data_where)
    res = analysis.property_model()
    sql = res['sql']
    quota = res['quota']
    groupby = res['groupby']
    df = await ckdb.query_dataframe(sql)
    if df.empty:
        return schemas.Msg(code=-9, msg='无数据', data=None)
    # no grouping
    data = {}
    if not groupby:
        data['总体'] = int(df['values'][0])
        title = ['总体', quota]
    else:
        sum_s = df.groupby(groupby)['values'].sum()
        for key, val in sum_s.items():
            if isinstance(key, tuple):
                key = ','.join([str(i) for i in key])
            else:
                key = str(key)
            data[key] = val
        title = ['.'.join(groupby), quota]

    return schemas.Msg(code=0, msg='ok', data={
        'value': data,
        'title': title
    })