diff --git a/api/api_v1/endpoints/query.py b/api/api_v1/endpoints/query.py index e926ef3..6ac467d 100644 --- a/api/api_v1/endpoints/query.py +++ b/api/api_v1/endpoints/query.py @@ -1061,112 +1061,51 @@ async def funnel_model( date_range = res['date_range'] cond_level = res['cond_level'] groupby = res['groupby'] + switch_test = analysis.event_view.switchTest + if switch_test: + df = await ckdb.query_dataframe(sql) + if df.empty: + return schemas.Msg(code=-9, msg='无数据', data=None) + # 补齐level数据 + concat_data = [] + for key, tmp_df in df.groupby(['date'] + groupby): + not_exists_level = {i for i in range(1, len(cond_level) + 1)} - set(tmp_df['level']) + for item in not_exists_level: + key = key if isinstance(key, tuple) else (key,) + concat_data.append((*key, item, 0)) + # 合并数据 + df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)]) - df = await ckdb.query_dataframe(sql) - if df.empty: - return schemas.Msg(code=-9, msg='无数据', data=None) - # 补齐level数据 - concat_data = [] - for key, tmp_df in df.groupby(['date'] + groupby): - not_exists_level = {i for i in range(1, len(cond_level) + 1)} - set(tmp_df['level']) - for item in not_exists_level: - key = key if isinstance(key, tuple) else (key,) - concat_data.append((*key, item, 0)) - # 合并数据 - df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)]) + # df.set_index('date',inplace=True) + data_list = [] + date_data = {} + if df.shape == (0, 0): + return schemas.Msg(code=0, msg='ok', data={'list': data_list, 'level': cond_level}) - # df.set_index('date',inplace=True) - data_list = [] - date_data = {} - if df.shape == (0, 0): - return schemas.Msg(code=0, msg='ok', data={'list': data_list, 'level': cond_level}) - - tmp = {'title': '总体'} - # 以level分组后的和 - tmp_df = df[['level', 'values']].groupby('level').sum() - # 在原数据上对索引进行排序 - tmp_df.sort_index(inplace=True) - for i in tmp_df.index: - tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum() - - tmp['n'] = tmp_df['values'].to_list() - tmp['p1'] = [100] - # tmp['p2'] = [] - for i, v in tmp_df.loc[2:, 'values'].items(): - tmp['p1'].append(round(v * 100 / tmp_df.loc[1, 'values'], 2)) - # tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2)) - data_list.append(tmp) - - # 补齐日期 - all_idx = {(dt, lv) for dt in date_range for lv in range(1, len(cond_level) + 1)} - concat_data = [] - for i in all_idx - set(df.set_index(['date', 'level']).index): - concat_data.append((*i, 0)) - summary_df = pd.concat( - [df[['date', 'level', 'values']], pd.DataFrame(concat_data, columns=['date', 'level', 'values'])]) - for key, tmp_df in summary_df.groupby('date'): - tmp_df = tmp_df.groupby('level').sum() + tmp = {'title': '总体'} + # 以level分组后的和 + tmp_df = df[['level', 'values']].groupby('level').sum() + # 在原数据上对索引进行排序 tmp_df.sort_index(inplace=True) for i in tmp_df.index: tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum() - tmp = dict() - tmp['n'] = tmp_df['values'].to_list() tmp['p1'] = [100] # tmp['p2'] = [] for i, v in tmp_df.loc[2:, 'values'].items(): - var = round(v * 100 / tmp_df.loc[1, 'values'], 2) - var = 0 if np.isnan(var) else var - tmp['p1'].append(var) + tmp['p1'].append(round(v * 100 / tmp_df.loc[1, 'values'], 2)) # tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2)) + data_list.append(tmp) - _ = date_data.setdefault(key.strftime('%Y-%m-%d'), {}) - _['总体'] = tmp - # 分组 - if groupby: - # 补齐数据 + # 补齐日期 + all_idx = {(dt, lv) for dt in date_range for lv in range(1, len(cond_level) + 1)} concat_data = [] - idx = set(df.set_index(['date'] + groupby).index) - all_idx = {(*j, i) for i in range(1, len(cond_level) + 1) for j in idx} - for i in all_idx - set(df.set_index(list(('date', *groupby, 'level'))).index): + for i in all_idx - set(df.set_index(['date', 'level']).index): concat_data.append((*i, 0)) - - df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)]) - # df.sort_values(list((*groupby, 'level')), inplace=True, ascending=False) - # 映射对应中文返回给前端展示 - for i in groupby: - if i == 'svrindex': - if game == 'mfmh5': - game = 'mzmfmh5' - chinese = {} - resp = await crud.select_map.get_one(db, game, i) - if not resp: - continue - for ii in resp: - chinese[ii['id']] = ii['title'] - for k, v in chinese.items(): - # 开始映射 - df.loc[df[i] == k, i] = v - for key, tmp_df in df.groupby(groupby): - tmp = {'title': key} - tmp_df = tmp_df.groupby('level').sum() - tmp_df.sort_index(inplace=True) - for i in tmp_df.index: - tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum() - - tmp['n'] = tmp_df['values'].to_list() - tmp['p1'] = [100] - # tmp['p2'] = [] - for i, v in tmp_df.loc[2:, 'values'].items(): - var = round(v * 100 / tmp_df.loc[1, 'values'], 2) - var = 0 if np.isnan(var) else var - tmp['p1'].append(var) - # tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2)) - data_list.append(tmp) - - for key, tmp_df in df.groupby(['date'] + groupby): - + summary_df = pd.concat( + [df[['date', 'level', 'values']], pd.DataFrame(concat_data, columns=['date', 'level', 'values'])]) + for key, tmp_df in summary_df.groupby('date'): tmp_df = tmp_df.groupby('level').sum() tmp_df.sort_index(inplace=True) for i in tmp_df.index: @@ -1183,18 +1122,130 @@ async def funnel_model( tmp['p1'].append(var) # tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2)) - _ = date_data.setdefault(key[0].strftime('%Y-%m-%d'), {}) - # [key[1]] = tmp - title = (groupby or ['总体']) + cond_level - resp = {'list': data_list, - 'date_data': date_data, - 'title': title, - 'level': cond_level, - 'start_date': res['start_date'], - 'end_date': res['end_date'], - 'time_particle': res['time_particle'] - } - return schemas.Msg(code=0, msg='ok', data=resp) + _ = date_data.setdefault(key.strftime('%Y-%m-%d'), {}) + _['总体'] = tmp + # 分组 + if groupby: + # 补齐数据 + concat_data = [] + idx = set(df.set_index(['date'] + groupby).index) + all_idx = {(*j, i) for i in range(1, len(cond_level) + 1) for j in idx} + for i in all_idx - set(df.set_index(list(('date', *groupby, 'level'))).index): + concat_data.append((*i, 0)) + + df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)]) + # df.sort_values(list((*groupby, 'level')), inplace=True, ascending=False) + # 映射对应中文返回给前端展示 + for i in groupby: + if i == 'svrindex': + if game == 'mfmh5': + game = 'mzmfmh5' + chinese = {} + resp = await crud.select_map.get_one(db, game, i) + if not resp: + continue + for ii in resp: + chinese[ii['id']] = ii['title'] + for k, v in chinese.items(): + # 开始映射 + df.loc[df[i] == k, i] = v + for key, tmp_df in df.groupby(groupby): + tmp = {'title': key} + tmp_df = tmp_df.groupby('level').sum() + tmp_df.sort_index(inplace=True) + for i in tmp_df.index: + tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum() + + tmp['n'] = tmp_df['values'].to_list() + tmp['p1'] = [100] + # tmp['p2'] = [] + for i, v in tmp_df.loc[2:, 'values'].items(): + var = round(v * 100 / tmp_df.loc[1, 'values'], 2) + var = 0 if np.isnan(var) else var + tmp['p1'].append(var) + # tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2)) + data_list.append(tmp) + + for key, tmp_df in df.groupby(['date'] + groupby): + + tmp_df = tmp_df.groupby('level').sum() + tmp_df.sort_index(inplace=True) + for i in tmp_df.index: + tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum() + + tmp = dict() + + tmp['n'] = tmp_df['values'].to_list() + tmp['p1'] = [100] + # tmp['p2'] = [] + for i, v in tmp_df.loc[2:, 'values'].items(): + var = round(v * 100 / tmp_df.loc[1, 'values'], 2) + var = 0 if np.isnan(var) else var + tmp['p1'].append(var) + # tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2)) + + _ = date_data.setdefault(key[0].strftime('%Y-%m-%d'), {}) + # [key[1]] = tmp + title = (groupby or ['总体']) + cond_level + resp = {'list': data_list, + 'date_data': date_data, + 'title': title, + 'level': cond_level, + 'switch_test': switch_test, + 'start_date': res['start_date'], + 'end_date': res['end_date'], + 'time_particle': res['time_particle'] + } + return schemas.Msg(code=0, msg='ok', data=resp) + else: + try: + res = await analysis.guide_model_sql() + except Exception as e: + return schemas.Msg(code=-9, msg='报表配置参数异常') + + sql = res['sql'] + df = await ckdb.query_dataframe(sql) + if df.empty: + return schemas.Msg(code=-9, msg='无数据', data=None) + + group_str = res['analysis'] + # 转int + df[group_str] = df[group_str].astype(int) + step_list = [str(i) for i in sorted(df[group_str].unique())] + dict_k = {} + for k, nedf in df.groupby("date"): + ste_k = {} + for kk, ste_df in nedf.groupby(group_str): + value_list = ste_df.iloc[:, -1].to_list() + ste_k[str(kk)] = int(sum(value_list)) + for ste in step_list: + if ste not in list(ste_k.keys()): + ste_k[ste] = 0 + dict_k[str(k)] = ste_k + p_data = {} + data = {} + for dict_key, dict_data in dict_k.items(): + dict_data1 = deepcopy(dict_data) + dict_k1 = {int(k): v for k, v in dict_data1.items()} + sorted(dict_k1.keys()) + data_values = list(dict_k1.values()) + p_values = [round(i / sum(data_values), 2) or 0 for i in data_values] + p_values.insert(0, dict_key) + data_values.insert(0, dict_key) + data[dict_key] = data_values + p_data[dict_key] = p_values + + step_list.insert(0, '日期') + resp = {'list': data, + 'date_data': p_data, + 'title': '1', + 'level': step_list, + 'switch_test': switch_test, + 'start_date': res['start_date'], + 'end_date': res['end_date'], + 'time_particle': res['time_particle'] + } + return schemas.Msg(code=0, msg='ok', data=resp) @router.post("/scatter_model_sql")