"""Analytics query endpoints.

Each endpoint pairs a ``*_sql`` route (returns the generated SQL for
inspection) with a model route that executes the SQL on ClickHouse and
reshapes the resulting DataFrame for the frontend charts.
"""
import datetime
from collections import defaultdict

import pandas as pd
import numpy as np
from fastapi import APIRouter, Depends, Request
from motor.motor_asyncio import AsyncIOMotorDatabase

import crud, schemas
from common import *

from api import deps
from db import get_database
from db.ckdb import get_ck_db, CKDrive
from db.redisdb import get_redis_pool, RedisDrive

from models.behavior_analysis import BehaviorAnalysis
from models.user_analysis import UserAnalysis

router = APIRouter()


@router.post("/sql")
async def query_sql(
        request: Request,
        data_in: schemas.Sql,
        ckdb: CKDrive = Depends(get_ck_db),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Execute a raw SQL query against ClickHouse and return the rows."""
    data = await ckdb.execute(data_in.sql)
    return schemas.Msg(code=0, msg='ok', data=data)


@router.post("/event_model_sql")
async def event_model_sql(
        request: Request,
        game: str,
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Return the generated SQL for the event analysis model (no execution)."""
    await analysis.init()
    data = analysis.event_model_sql()
    return schemas.Msg(code=0, msg='ok', data=data)


@router.post("/event_model")
async def event_model(
        request: Request,
        game: str,
        data_in: schemas.CkQuery,
        ckdb: CKDrive = Depends(get_ck_db),
        rdb: RedisDrive = Depends(get_redis_pool),
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Event analysis: run one query per event and build per-group time series.

    For each generated query the result is padded so every date in the
    requested range appears (missing dates get a 0 value), then values and
    per-series sums are collected into ``q``.
    """
    await analysis.init()
    sqls = analysis.event_model_sql()
    res = []
    for item in sqls:
        q = {
            'groups': [],
            'values': [],
            'sum': [],
            'event_name': item['event_name'],
            'format': item['format']
        }
        sql = item['sql']
        groupby = item['groupby']
        date_range = item['date_range']
        q['date_range'] = date_range
        df = await ckdb.query_dataframe(sql)
        df.fillna(0, inplace=True)
        if df.shape[0] == 0:
            # No rows for this event: skip it entirely.
            continue
            # return schemas.Msg(code=0, msg='ok', data=[q])
        if item['time_particle'] == 'total':
            # 'total' aggregates over the whole period; the label axis is the
            # (svrindex, name) index rather than dates.
            # for group, df_group in df.groupby(groupby):
            #     df_group.reset_index(drop=True, inplace=True)
            q['groups'].append(groupby)
            q['values'].append(df['values'].to_list())
            q['sum'].append(int(df['values'].sum()))
            q['date_range'] = [f'{i[0]}-{i[1]}' for i in df.set_index(['svrindex', 'name']).index]
            res.append(q)
            continue

        if groupby:  # grouped series: one values list per group key
            for group, df_group in df.groupby(groupby):
                df_group.reset_index(drop=True, inplace=True)
                q['groups'].append(str(group))
                # Pad missing dates with zero rows so all series align.
                concat_data = []
                for i in set(date_range) - set(df_group['date']):
                    if len(groupby) > 1:
                        concat_data.append((i, *group, 0))
                    else:
                        concat_data.append((i, group, 0))
                df_group = pd.concat([df_group, pd.DataFrame(concat_data, columns=df_group.columns)])
                df_group.sort_values('date', inplace=True)
                q['values'].append(df_group['values'].to_list())
                q['sum'].append(int(df_group['values'].sum()))
        else:  # ungrouped: a single overall series
            concat_data = []
            for i in set(date_range) - set(df['date']):
                concat_data.append((i, 0))
            df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
            df.sort_values('date', inplace=True)
            q['values'].append(df['values'].to_list())
            q['sum'].append(int(df['values'].sum()))

        # Format the date axis according to the time granularity.
        if item['time_particle'] in ('P1D', 'P1W'):
            q['date_range'] = [d.strftime('%Y-%m-%d') for d in q['date_range']]
        elif item['time_particle'] in ('P1M',):
            q['date_range'] = [d.strftime('%Y-%m') for d in q['date_range']]
        else:
            q['date_range'] = [d.strftime('%Y-%m-%d %H:%M:%S') for d in q['date_range']]
        # q['eventNameDisplay']=item['event_name_display']
        res.append(q)
    return schemas.Msg(code=0, msg='ok', data=res)


@router.post("/retention_model_sql")
async def retention_model_sql(
        request: Request,
        game: str,
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Return the generated SQL for the retention model (no execution)."""
    await analysis.init()
    data = analysis.retention_model_sql2()
    return schemas.Msg(code=0, msg='ok', data=[data])


@router.post("/retention_model")
async def retention_model(request: Request,
                          game: str,
                          ckdb: CKDrive = Depends(get_ck_db),
                          db: AsyncIOMotorDatabase = Depends(get_database),
                          analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
                          current_user: schemas.UserDB = Depends(deps.get_current_user)
                          ) -> schemas.Msg:
    """Retention analysis: per registration date, report day-N retained
    counts/rates (``n``/``p``) and their outflow complements.

    The query is expected to yield one row per ``reg_date`` with columns
    ``cnt0`` (cohort size) and ``cnt{i}`` / ``p{i}`` for each offset day.
    """
    await analysis.init()
    res = analysis.retention_model_sql2()
    sql = res['sql']
    df = await ckdb.query_dataframe(sql)
    title = f'用户数'
    date_range = res['date_range']
    unit_num = res['unit_num']
    df.set_index('reg_date', inplace=True)
    # Pad missing registration dates with all-zero rows.
    for i in set(date_range) - set(df.index):
        df.loc[i] = 0
    df.sort_index(inplace=True)
    days = [i for i in range(unit_num + 1)]
    summary_values = {}
    today = datetime.datetime.today().date()
    for date, value in df.T.items():
        tmp = summary_values.setdefault(date.strftime('%Y-%m-%d'), dict())
        tmp['d0'] = int(value.cnt0)
        tmp['p'] = []
        tmp['n'] = []
        tmp['p_outflow'] = []
        tmp['n_outflow'] = []
        # Only offsets that have already elapsed (up to today) are reported.
        for i in range((today - date).days + 1):
            if i > unit_num:
                break
            p = float(getattr(value, f'p{i + 1}'))
            n = int(getattr(value, f'cnt{i + 1}'))
            p_outflow = round(100 - p, 2)
            n_outflow = value.cnt0 - n
            tmp['p'].append(p)
            tmp['n'].append(n)
            tmp['p_outflow'].append(p_outflow)
            tmp['n_outflow'].append(n_outflow)
    resp = {
        'summary_values': summary_values,
        # 'values': values,
        'days': days,
        'date_range': [d.strftime('%Y-%m-%d') for d in date_range][:unit_num + 1],
        'title': title
    }
    return schemas.Msg(code=0, msg='ok', data=resp)


@router.post("/retention_model_del", deprecated=True)
async def retention_model_del(
        request: Request,
        game: str,
        ckdb: CKDrive = Depends(get_ck_db),
        db: AsyncIOMotorDatabase = Depends(get_database),
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Deprecated retention model (set-intersection based).

    Renamed from ``retention_model`` to avoid shadowing the live
    ``/retention_model`` handler above (flake8 F811); the route path is
    unchanged.
    """
    await analysis.init()
    res = analysis.retention_model_sql()
    sql = res['sql']
    date_range = res['date_range']
    event_a, event_b = res['event_name']
    unit_num = res['unit_num']
    title = await crud.event_mana.get_show_name(db, game, event_a)
    title = f'{title}用户数'
    df = await ckdb.query_dataframe(sql)
    concat_data = []
    df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
    df['date'] = df['date'].apply(lambda x: x.date())
    # Overall retention: users doing event A on d1 who also did B on d2.
    summary_df = df.groupby('date')[['val_a', 'val_b', 'amount_a']].sum()
    summary_values = {}
    for i, d1 in enumerate(date_range):
        a = set(summary_df.loc[d1]['val_a']) if d1 in summary_df.index else set()
        if not a:
            continue
        key = d1.strftime('%Y-%m-%d')
        for j, d2 in enumerate(date_range[i:]):
            if j > unit_num:
                break
            b = set(summary_df.loc[d2]['val_b']) if d2 in summary_df.index else set()
            tmp = summary_values.setdefault(key, {})
            tmp.setdefault('d0', len(a))
            tmp.setdefault('p', []).append(division(len(a & b) * 100, len(a)))
            tmp.setdefault('n', []).append(len(a & b))
            tmp.setdefault('p_outflow', []).append(round(100 - division(len(a & b) * 100, len(a)), 2))
            tmp.setdefault('n_outflow', []).append(len(a) - len(a & b))
    # Per-group retention, keyed by the non-date components of the group.
    groups = set([tuple(i) for i in df[res['groupby']].values])
    df.set_index(res['groupby'], inplace=True)
    df.sort_index(inplace=True)
    values = {}
    days = [i for i in range((date_range[-1] - date_range[0]).days + 1)][:unit_num + 1]
    for i, d1 in enumerate(date_range):
        for g in groups:
            if len(g) == 1:
                continue
            a = set(df.loc[g]['val_a']) if g in df.index else set()
            if not a:
                continue
            key = d1.strftime("%Y-%m-%d")
            tmp_g = values.setdefault(key, {})
            for j, d2 in enumerate(date_range[i:]):
                if j > unit_num:
                    break
                b = set(df.loc[g]['val_b']) if g in df.index else set()
                tmp = tmp_g.setdefault(','.join(g[1:]), {})
                tmp.setdefault('d0', len(a))
                tmp.setdefault('p', []).append(division(len(a & b) * 100, len(a)))
                tmp.setdefault('n', []).append(len(a & b))
    data = {
        'summary_values': summary_values,
        'values': values,
        'days': days,
        'date_range': [d.strftime('%Y-%m-%d') for d in date_range][:unit_num + 1],
        'title': title
    }
    return schemas.Msg(code=0, msg='ok', data=data)


@router.post("/funnel_model_sql")
async def funnel_model_sql(
        request: Request,
        game: str,
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Return the generated SQL for the funnel model (no execution)."""
    await analysis.init()
    data = analysis.funnel_model_sql()
    return schemas.Msg(code=0, msg='ok', data=[data])


@router.post("/funnel_model")
async def funnel_model(
        request: Request,
        game: str,
        ckdb: CKDrive = Depends(get_ck_db),
        db: AsyncIOMotorDatabase = Depends(get_database),
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Funnel analysis: conversion counts (``n``) and rates vs. step 1
    (``p1``), overall, per date, and per group.

    Counts are cumulative from the bottom: a user at level i is counted at
    every level <= i (hence the ``loc[i:].sum()`` roll-up).
    """
    await analysis.init()
    res = analysis.funnel_model_sql()
    sql = res['sql']
    date_range = res['date_range']
    cond_level = res['cond_level']
    groupby = res['groupby']
    df = await ckdb.query_dataframe(sql)
    # Pad missing funnel levels with zero rows per (date, group).
    concat_data = []
    for key, tmp_df in df.groupby(['date'] + groupby):
        not_exists_level = {i for i in range(1, len(cond_level) + 1)} - set(tmp_df['level'])
        for item in not_exists_level:
            key = key if isinstance(key, tuple) else (key,)
            concat_data.append((*key, item, 0))
    df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
    # df.set_index('date',inplace=True)
    data_list = []
    date_data = {}
    if df.shape == (0, 0):
        return schemas.Msg(code=0, msg='ok', data={'list': data_list, 'level': cond_level})

    # Overall funnel across all dates/groups.
    tmp = {'title': '总体'}
    tmp_df = df[['level', 'values']].groupby('level').sum()
    tmp_df.sort_index(inplace=True)
    for i in tmp_df.index:
        # Roll up: level i count includes every deeper level.
        tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
    tmp['n'] = tmp_df['values'].to_list()
    tmp['p1'] = [100]
    # tmp['p2'] = []
    for i, v in tmp_df.loc[2:, 'values'].items():
        tmp['p1'].append(round(v * 100 / tmp_df.loc[1, 'values'], 2))
        # tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
    data_list.append(tmp)

    # Pad missing (date, level) pairs, then compute per-date overall funnels.
    all_idx = {(dt, lv) for dt in date_range for lv in range(1, len(cond_level) + 1)}
    concat_data = []
    for i in all_idx - set(df.set_index(['date', 'level']).index):
        concat_data.append((*i, 0))
    summary_df = pd.concat(
        [df[['date', 'level', 'values']], pd.DataFrame(concat_data, columns=['date', 'level', 'values'])])
    for key, tmp_df in summary_df.groupby('date'):
        tmp_df = tmp_df.groupby('level').sum()
        tmp_df.sort_index(inplace=True)
        for i in tmp_df.index:
            tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
        tmp = dict()
        tmp['n'] = tmp_df['values'].to_list()
        tmp['p1'] = [100]
        # tmp['p2'] = []
        for i, v in tmp_df.loc[2:, 'values'].items():
            var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
            var = 0 if np.isnan(var) else var
            tmp['p1'].append(var)
            # tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
        _ = date_data.setdefault(key.strftime('%Y-%m-%d'), {})
        _['总体'] = tmp

    if groupby:
        # Pad missing (date, group, level) combinations.
        concat_data = []
        idx = set(df.set_index(['date'] + groupby).index)
        all_idx = {(*j, i) for i in range(1, len(cond_level) + 1) for j in idx}
        for i in all_idx - set(df.set_index(list(('date', *groupby, 'level'))).index):
            concat_data.append((*i, 0))
        df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
        # df.sort_values(list((*groupby, 'level')), inplace=True, ascending=False)

        # Per-group funnel across all dates.
        for key, tmp_df in df.groupby(groupby):
            tmp = {'title': key}
            tmp_df = tmp_df.groupby('level').sum()
            tmp_df.sort_index(inplace=True)
            for i in tmp_df.index:
                tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
            tmp['n'] = tmp_df['values'].to_list()
            tmp['p1'] = [100]
            # tmp['p2'] = []
            for i, v in tmp_df.loc[2:, 'values'].items():
                var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
                var = 0 if np.isnan(var) else var
                tmp['p1'].append(var)
                # tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
            data_list.append(tmp)

        # Per-(date, group) funnel.
        for key, tmp_df in df.groupby(['date'] + groupby):
            tmp_df = tmp_df.groupby('level').sum()
            tmp_df.sort_index(inplace=True)
            for i in tmp_df.index:
                tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
            tmp = dict()
            tmp['n'] = tmp_df['values'].to_list()
            tmp['p1'] = [100]
            # tmp['p2'] = []
            for i, v in tmp_df.loc[2:, 'values'].items():
                var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
                var = 0 if np.isnan(var) else var
                tmp['p1'].append(var)
                # tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
            _ = date_data.setdefault(key[0].strftime('%Y-%m-%d'), {})
            _[key[1]] = tmp

    title = (groupby or ['总体']) + cond_level
    resp = {'list': data_list,
            'date_data': date_data,
            'title': title,
            'level': cond_level
            }
    return schemas.Msg(code=0, msg='ok', data=resp)


@router.post("/scatter_model_sql")
async def scatter_model_sql(
        request: Request,
        game: str,
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Return the generated SQL for the distribution model (no execution)."""
    await analysis.init()
    data = analysis.scatter_model_sql()
    return schemas.Msg(code=0, msg='ok', data=[data])


@router.post("/scatter_model")
async def scatter_model(
        request: Request,
        game: str,
        ckdb: CKDrive = Depends(get_ck_db),
        db: AsyncIOMotorDatabase = Depends(get_database),
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Distribution analysis: bucket values into intervals per date (and
    optionally per group) and report counts and percentages per bucket.
    """
    await analysis.init()
    res = analysis.scatter_model_sql()
    sql = res['sql']
    df = await ckdb.query_dataframe(sql)
    interval_type = res['interval_type']
    # Renamed from `analysis` so the injected BehaviorAnalysis dependency is
    # not shadowed by this string.
    analysis_type = res['analysis']
    groupby = res['groupby']
    quota_interval_arr = res['quota_interval_arr']
    if analysis_type != 'number_of_days':
        max_v = int(df['values'].max())
        min_v = int(df['values'].min())
        interval = (max_v - min_v) // 10 or 1
        resp = {'list': dict()}
        if not quota_interval_arr:
            # Auto bins: ~10 equal-width, left-closed intervals.
            resp['label'] = [f'[{i},{i + interval})' for i in range(min_v, max_v, interval)]
            bins = [i for i in range(min_v, max_v + interval, interval)]
        else:
            # User-supplied bin edges, extended to cover (-inf, +inf).
            quota_interval_arr = [-float('inf')] + quota_interval_arr + [float('inf')]
            resp['label'] = []
            bins = [quota_interval_arr[0]]
            for i, v in enumerate(quota_interval_arr[1:]):
                resp['label'].append(f'[{quota_interval_arr[i]},{v})')
                bins.append(v)

        # Overall distribution per date.
        for key, tmp_df in df.groupby('date'):
            bins_s = pd.cut(tmp_df['values'], bins=bins, right=False).value_counts()
            bins_s.sort_index(inplace=True)
            total = int(bins_s.sum())
            resp['list'][key.strftime('%Y-%m-%d')] = dict()
            resp['list'][key.strftime('%Y-%m-%d')]['总体'] = {'n': bins_s.to_list(), 'total': total,
                                                            'p': round(bins_s * 100 / total, 2).to_list(),
                                                            'title': '总体'}
        # Per-group distribution per date.
        if groupby:
            for key, tmp_df in df.groupby(['date', *groupby]):
                bins_s = pd.cut(tmp_df['values'], bins=bins, right=False).value_counts()
                bins_s.sort_index(inplace=True)
                total = int(bins_s.sum())
                title = '.'.join(key[1:])
                date = key[0]
                resp['list'][date.strftime('%Y-%m-%d')][title] = {'n': bins_s.to_list(), 'total': total,
                                                                  'p': round(bins_s * 100 / total, 2).to_list(),
                                                                  'title': title
                                                                  }
        return schemas.Msg(code=0, msg='ok', data=resp)

    if interval_type == 'def' and analysis_type == 'number_of_days':
        resp = {'list': {}}
        for key, tmp_df in df.groupby('date'):
            total = int(tmp_df['values'].sum())
            resp['list'][key.strftime('%Y-%m-%d')] = {'n': total, 'total': total, 'p': 100}
        return schemas.Msg(code=0, msg='ok', data=resp)
    # NOTE(review): falls through and implicitly returns None when
    # analysis_type == 'number_of_days' but interval_type != 'def' —
    # preserved as-is; confirm whether this combination can occur.


@router.post("/trace_model_sql")
async def trace_model_sql(
        request: Request,
        game: str,
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Return the generated SQL for the path (trace) model (no execution)."""
    await analysis.init()
    data = analysis.trace_model_sql()
    return schemas.Msg(code=0, msg='ok', data=[data])


@router.post("/trace_model")
async def trace_model(
        request: Request,
        game: str,
        ckdb: CKDrive = Depends(get_ck_db),
        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Path analysis: build a Sankey-style nodes/links graph from event chains.

    Renamed from ``trace_model_sql`` to avoid shadowing the
    ``/trace_model_sql`` handler above (flake8 F811); the route path is
    unchanged. Node names are ``f'{event}{position}'`` so the same event at
    different chain depths stays distinct; chains ending before the next step
    link to the churn node ('流失'). Positions beyond 10 are skipped.
    """
    await analysis.init()
    res = analysis.trace_model_sql()
    sql = res['sql']
    df = await ckdb.query_dataframe(sql)
    chain_dict = defaultdict(dict)
    nodes = {'流失'}
    for event_names, count in zip(df['event_chain'], df['values']):
        chain_len = len(event_names)
        for i, event_name in enumerate(event_names):
            if i >= 10:
                continue
            next_event = event_names[i + 1] if i < chain_len - 1 else '流失'
            key = (f'{event_name}{i}', f'{next_event}{i + 1}')
            nodes.update(key)
            chain_dict[i][key] = chain_dict[i].setdefault(key, 0) + count
    links = []
    for _, items in chain_dict.items():
        for keys, val in items.items():
            links.append({
                "source": keys[0],
                "target": keys[1],
                "value": val
            })
    # nodes = set()
    # for item in links:
    #     nodes.update((
    #         item['source'],
    #         item['target'])
    #     )
    data = {
        'nodes': [{'name': item} for item in nodes],
        'links': links
    }
    return schemas.Msg(code=0, msg='ok', data=data)


@router.post("/user_property_model_sql")
async def user_property_sql(
        request: Request,
        game: str,
        analysis: UserAnalysis = Depends(UserAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Return the generated SQL for the user-property model (no execution)."""
    await analysis.init()
    data = analysis.property_model()
    return schemas.Msg(code=0, msg='ok', data=[data])


@router.post("/user_property_model")
async def user_property_model(
        request: Request,
        game: str,
        analysis: UserAnalysis = Depends(UserAnalysis),
        ckdb: CKDrive = Depends(get_ck_db),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """User property analysis: sum the quota overall or per group."""
    await analysis.init()
    res = analysis.property_model()
    sql = res['sql']
    quota = res['quota']
    groupby = res['groupby']
    df = await ckdb.query_dataframe(sql)
    data = {'groupby': groupby}
    title = []
    if not groupby:
        # No grouping: a single overall value.
        data['总体'] = int(df['values'][0])
        title = ['总体', quota]
    else:
        # Grouped: one summed value per (stringified) group key.
        sum_s = df.groupby(groupby)['values'].sum()
        data = dict()
        for key, val in sum_s.items():
            if isinstance(key, tuple):
                key = ','.join([str(i) for i in key])
            else:
                key = str(key)
            data[key] = val
        title = ['.'.join(groupby), quota]
    return schemas.Msg(code=0, msg='ok', data={
        'value': data,
        'title': title
    })