# backend/api/api_v1/endpoints/query.py


import datetime
from collections import defaultdict
import mimetypes
from urllib.parse import quote
import os
from copy import deepcopy
import pandas as pd
import numpy as np
from fastapi import APIRouter, Depends, Request, File
from fastapi.encoders import jsonable_encoder
from motor.motor_asyncio import AsyncIOMotorDatabase
from fastapi.responses import StreamingResponse
# from datetime import datetime
import crud, schemas
from common import *
from api import deps
from db import get_database
from db.ckdb import get_ck_db, CKDrive
from db.redisdb import get_redis_pool, RedisDrive
from models.behavior_analysis import BehaviorAnalysis, CombinationEvent
from models.user_analysis import UserAnalysis
from models.x_analysis import XAnalysis
from utils import DfToStream, getEveryDay, Download_xlsx, jiange_insert, create_df, create_neidf
router = APIRouter()
@router.post("/sql")
async def query_sql(
request: Request,
game: str,
data_in: schemas.Sql,
ckdb: CKDrive = Depends(get_ck_db),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""原 sql 查询 """
sql = data_in.sql
sql1 = sql.lower()
if 'insert' not in sql1 and 'update' not in sql1 and 'delete' not in sql1 and 'select' in sql1:
sql = sql.replace('$game', game)
data = await ckdb.execute(sql)
return schemas.Msg(code=0, msg='ok', data=data)
else:
return schemas.Msg(code=0, msg='ok', data='当前只支持SQL查询语句')
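

# A note on the guard above: the substring check on the lowered SQL is a best-effort
# filter, not a parser; a SELECT whose text merely contains "update" (e.g. in a column
# name) is rejected, and crafted statements can still slip past it. A slightly stricter
# but still simple sketch (illustrative only; `_is_readonly_select` is an assumed helper
# name and is not referenced by the endpoints in this module):
def _is_readonly_select(sql: str) -> bool:
    """Accept a single statement that starts with SELECT/WITH and has no write keywords."""
    import re
    text = sql.strip().lower()
    if ';' in text.rstrip(';'):
        return False  # reject multi-statement input
    if not re.match(r'^(select|with)\b', text):
        return False
    return re.search(r'\b(insert|update|delete|alter|drop|truncate)\b', text) is None
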
@router.post("/sql_export")
async def query_sql_export(
request: Request,
game: str,
data_in: schemas.Sql,
ckdb: CKDrive = Depends(get_ck_db),
current_user: schemas.UserDB = Depends(deps.get_current_user)
):
"""sql 导出 """
file_name = quote(f'result.xlsx')
mime = mimetypes.guess_type(file_name)[0]
sql = data_in.sql
sql = sql.replace('$game', game)
df = await ckdb.query_dataframe(sql)
if df.empty:
return schemas.Msg(code=-9, msg='无数据', data=None)
df_to_stream = DfToStream((df, 'result'))
with df_to_stream as d:
export = d.to_stream()
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
@router.post("/event_model_sql")
async def event_model_sql(
request: Request,
game: str,
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
""" 事件分析模型 sql"""
await analysis.init(data_where=current_user.data_where)
data = await analysis.event_model_sql()
return schemas.Msg(code=0, msg='ok', data=data)
@router.post("/event_model_export")
async def event_model_export(request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
):
""" 事件分析模型 数据导出"""
await analysis.init(data_where=current_user.data_where)
sqls = await analysis.event_model_sql()
file_name = quote(f'{sqls[0]["report_name"]}.xlsx')
mime = mimetypes.guess_type(file_name)[0]
excels = []
for item in sqls:
if item.get('combination_event'):
continue
sql = item['sql']
event_name = item['event_name']
df = await ckdb.query_dataframe(sql)
if df.empty:
continue
if 'date' in df:
df.sort_values('date', inplace=True)
try:
df['date'] = df['date'].dt.tz_localize(None)
except:
pass
excels.append((df, event_name))
df_to_stream = DfToStream(*excels)
with df_to_stream as d:
export = d.to_stream()
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
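

# `tz_localize(None)` above only applies to tz-aware datetime columns, hence the broad
# try/except. A narrower helper sketch (illustrative only; `_drop_timezone` is an
# assumed name and is not called by the endpoints):
def _drop_timezone(df: pd.DataFrame, column: str = 'date') -> None:
    """Strip timezone info from a datetime column in place, if it carries any."""
    if column in df and pd.api.types.is_datetime64tz_dtype(df[column]):
        df[column] = df[column].dt.tz_localize(None)
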
@router.post("/event_model_pay")
async def event_model_pay(request: Request,
game: str,
data_in: schemas.Times,
ckdb: CKDrive = Depends(get_ck_db)
):
"""下载日充总额详细订单数据"""
sql = f"""select * FROM {game}.event WHERE addHours(`#event_time`, 8) >= '{data_in.start_time}' and addHours(`#event_time`, 8) <= '{data_in.end_time}' and `#event_name` = 'pay' and
orderid NOT LIKE '%GM%' order by `#event_time`"""
df = await ckdb.query_dataframe(sql)
    list_columns = list(df.columns.values)
    drop_list = []
    for i in list_columns:
        first = df[i][0]
        # drop columns whose first value is empty
        if first is None or first == [] or first == '':
            drop_list.append(i)
        elif 'time' in i:
            # cast time columns to str and strip the timezone suffix after '+'
            df[i] = df[i].astype(str)
            for nu in range(len(df)):
                df.replace(to_replace=df[i][nu], value=df[i][nu].split('+')[0], inplace=True)
    df.drop(drop_list, axis=1, inplace=True)
file_name = quote(f'订单详情.xlsx')
mime = mimetypes.guess_type(file_name)[0]
df_to_stream = DfToStream((df, '订单详情'))
with df_to_stream as d:
export = d.to_stream()
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
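

# The per-row df.replace loop above rewrites matching values across the whole frame and
# is quadratic in the number of rows. A vectorized equivalent for trimming the timezone
# suffix from a single time column would be (sketch only, assuming the same '+HH:MM'
# suffix format):
#
#     df[col] = df[col].astype(str).str.split('+').str[0]
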
# @router.get("/event_model_export")
# async def event_model_export(request: Request,
# game: str,
# report_id: str,
# ckdb: CKDrive = Depends(get_ck_db),
# # analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
# current_user: schemas.UserDB = Depends(deps.get_current_user)
# ):
# """ 事件分析模型 数据导出"""
# analysis = BehaviorAnalysis(game, schemas.CkQuery(report_id=report_id), get_redis_pool())
# await analysis.init(data_where=current_user.data_where)
# sqls = analysis.event_model_sql()
# res = []
# file_name = f'{sqls[0]["report_name"]}.xlsx'
# mime = mimetypes.guess_type(file_name)[0]
# for item in sqls[:1]:
# sql = item['sql']
# event_name = item['event_name']
# df = await ckdb.query_dataframe(sql)
# file = df_to_stream(df, event_name)
# return StreamingResponse(file, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
#
@router.post("/event_model")
async def event_model(
request: Request,
game: str,
data_in: schemas.CkQuery,
ckdb: CKDrive = Depends(get_ck_db),
db: AsyncIOMotorDatabase = Depends(get_database),
rdb: RedisDrive = Depends(get_redis_pool),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
""" 事件分析"""
await analysis.init(data_where=current_user.data_where)
try:
sqls = await analysis.event_model_sql()
except Exception as e:
return schemas.Msg(code=-9, msg='报表配置参数异常')
res = []
is_hide = []
group_label = {}
    for idx, item in enumerate(sqls):  # keep the index so hidden items can be dropped later
        if item.get('is_show') == False:
            is_hide.append(idx)
        # event_name: the event name, e.g. 日充总额 (total daily recharge)
        # format: the display format of the values, e.g. float
q = {
'groups': [],
'values': [],
'sum': [],
'avg': [],
'event_name': item['event_name'],
'format': item['format'],
'last_value': 0,
'start_date': item['start_date'],
'end_date': item['end_date'],
'time_particle': item['time_particle']
}
        # combination events: only handled when 'combination_event' is present, otherwise fall through to the plain SQL branch
if item.get('combination_event'):
combination_event = CombinationEvent(res, item.get('combination_event'), item['format'])
values, sum_, avg = combination_event.parse()
# q['values'].append(values)
# q['sum'].append(sum_)
q['avg'].append(avg)
q['date_range'] = item['date_range']
for last_value in values[::-1]:
if last_value > 0:
q['last_value'] = float(last_value)
break
if list(item.get('event_name'))[-1] == '':
for i in range(len(values)):
values[i] = str((values[i])) + '%'
q['values'].append(values)
q['sum'].append(str(sum_) + '%')
elif '' in item['event_name']:
for i in range(len(values)):
values[i] = str(int(float(values[i]) * 100)) + '%'
q['values'].append(values)
q['sum'].append(str(int(float(sum_) * 100)) + '%')
else:
q['values'].append(values)
q['sum'].append(sum_)
res.append(q)
continue
        # plain SQL branch
        sql = item['sql']
        groupby = item['groupby']
        date_range = item['date_range']  # every date that should appear in the result
        q['date_range'] = date_range  # expose the queried dates to the frontend
        df = await ckdb.query_dataframe(sql)  # run the SQL; df is a two-dimensional table
        df.fillna(0, inplace=True)  # fill missing values with 0
        # map ids to Chinese display names for the frontend
for i in groupby:
if i == 'svrindex':
if game == 'mfmh5':
game = 'mzmfmh5'
chinese = {}
resp = await crud.select_map.get_one(db, game, i)
for ii in resp:
chinese[ii['id']] = ii['title']
for k, v in chinese.items():
                # apply the mapping
df.loc[df[i] == k, i] = v
        # number of rows in the result
if df.shape[0] == 0:
df = pd.DataFrame({'date': date_range, 'values': 0 * len(date_range)})
# continue
# return schemas.Msg(code=0, msg='ok', data=[q])
if item['time_particle'] == 'total':
# for group, df_group in df.groupby(groupby):
# df_group.reset_index(drop=True, inplace=True)
q['groups'].append(groupby)
q['values'].append(df['values'].to_list())
q['sum'].append(round(float(df['values'].sum()), 2))
q['avg'].append(round(float(df['values'].mean()), 2))
for last_value in df['values'].values[::-1]:
if last_value > 0:
q['last_value'] = float(last_value)
break
if groupby and (set(groupby) & set(df) == set(groupby)):
q['date_range'] = [f'{i}' for i in df.set_index(groupby).index]
else:
q['date_range'] = ['合计']
            # for now only the "xiangsu" game gets billing-point aliases
if game == 'xiangsu':
if item['groupby'][0] == 'proid' and analysis.events[0]['event_name'] == 'pay':
                # return the Chinese names positionally matched to the queried ids
proid_dict = await crud.proid_map.get_all_show_name(db, game)
res_list = []
for i in q['date_range']:
try:
name = proid_dict[i]
res_list.append(name)
except:
pass
q['proid_name'] = res_list
                # combine proid with its price, compute the per-item revenue, and return it to the frontend
money_dict = await crud.proid_map.get_all_show_money(db, game)
                    add_money = []
                    number = q['values'][0]
                    pos = -1
                    for i in q['date_range']:
                        pos += 1
                        try:
                            money = money_dict[i]
                            add = number[pos] * money
                            add_money.append(add)
                        except:
                            pass
                    q['proid_money'] = add_money
# 首充金额分布
# if item['groupby'][0] == 'money' and analysis.events[0]['event_name'] == 'pay':
# # 将proid字段和金额money按对应关系组合成字典并算出对应的总额返回给前端
# money_dict = await crud.proid_map.get_all_show_money(db, game)
# add_money = []
# number = q['values'][0]
# next = -1
# for i in q['date_range']:
# next += 1
# mongey = money_dict[i]
# add = number[next] * mongey
# add_money.append(add)
# q['proid_money'] = add_money
res.append(q)
continue
if groupby and (set(groupby) & set(df)) == set(groupby):
columns = groupby[0]
df[columns] = df[columns].astype(str)
            # with group-by
for group, df_group in df.groupby(groupby):
                # reset the index in place and drop the old index column
df_group.reset_index(drop=True, inplace=True)
                # show group value "0" as 未知城市 (unknown city)
if str(group) == '0' and analysis.event_view['groupBy'][0]['columnDesc'] == '城市':
q['groups'].append('未知城市')
else:
if 'str' in str(type(group)):
q['groups'].append(str(group))
else:
q['groups'].append(str(list(group)))
concat_data = []
for i in set(date_range) - set(df_group['date']):
if len(groupby) > 1:
concat_data.append((i, *group, 0))
else:
concat_data.append((i, group, 0))
df_group = pd.concat([df_group, pd.DataFrame(concat_data, columns=df_group.columns)])
df_group.sort_values('date', inplace=True)
q['values'].append(df_group['values'].to_list())
q['sum'].append(round(float(df_group['values'].sum()), 2))
q['avg'].append(round(float(df_group['values'].mean()), 2))
for last_value in df['values'].values[::-1]:
if last_value > 0:
q['last_value'] = float(last_value)
break
else:
            # no group-by
concat_data = []
for i in set(date_range) - set(df['date']):
concat_data.append((i, 0))
            # concatenate the two frames vertically
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
            # sort in place by date
df.sort_values('date', inplace=True)
if len(df) >= 2:
q['chain_ratio'] = division((df.iloc[-1, 1] - df.iloc[-2, 1]) * 100, df.iloc[-2, 1], 2)
if len(df) >= 8:
q['wow'] = division((df.iloc[-1, 1] - df.iloc[-8, 1]) * 100, df.iloc[-8, 1], 2) or 0
q['values'].append(abs(df['values']).to_list())
for last_value in df['values'].values[::-1]:
if last_value > 0:
q['last_value'] = float(last_value)
break
            # sum of all values
q['sum'].append(round(abs(float(df['values'].sum())), 2))
            # mean of all values
q['avg'].append(round(float(df['values'].mean()), 2))
# q['eventNameDisplay']=item['event_name_display']
res.append(q)
        group_label = item['group_label']
    # sort each series by its total, descending
for item in res:
try:
            if item['time_particle'] in ('P1D', 'P1W'):  # format the dates according to the time granularity
item['date_range'] = [d.strftime('%Y-%m-%d') for d in item['date_range']]
elif item['time_particle'] in ('P1M',):
item['date_range'] = [d.strftime('%Y-%m') for d in item['date_range']]
else:
item['date_range'] = [d.strftime('%Y-%m-%d %H:%M:%S') for d in item['date_range']]
except:
pass
        sort_key = np.argsort(np.array(item['sum']))[::-1]  # indices that would sort 'sum' ascending, reversed to give descending order
if item.get('groups'):
item['groups'] = np.array(item['groups'])[sort_key].tolist()
groups = []
            groupbys = analysis.event_view.get('groupBy')
            groupby_list = [i['columnName'] for i in groupbys]
for gitem in item['groups']:
gb = []
if '(' in gitem or '[' in gitem:
gitem = gitem.replace('(', '').replace(')', '').replace(' ', '').replace("'", '') \
.replace('[', '').replace(']', '')
if isinstance(gitem, list):
true_list = gitem
else:
true_list = gitem.split(',')
for gstr in true_list:
gb.append(gstr)
                # a label (user tag) group item exists
if group_label:
for name, idx in group_label.items():
gb.insert(idx, name)
                # (old approach) strip the quotes from the group display:
                # appgb = str(gb).replace("'", '')
                # groups.append(appgb)
                # item['groups'] = groups
                # revised approach: return a {column_name: value} mapping per group
                by_dict = {}
                for i in range(len(gb)):
                    by_dict[groupby_list[i]] = gb[i]
groups.append(by_dict)
item['groups'] = groups
else:
if group_label:
groups = []
gb = []
for name, idx in group_label.items():
gb.insert(idx, name)
                # strip the quotes from the group display
appgb = str(gb).replace("'", '')
groups.append(appgb)
item['groups'] = groups
item['values'] = np.array(item['values'])[sort_key].tolist()
item['sum'] = np.array(item['sum'])[sort_key].tolist()
item['avg'] = np.array(item['avg'])[sort_key].tolist()
res = [item for idx, item in enumerate(res) if idx not in is_hide]
return schemas.Msg(code=0, msg='ok', data=res)
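

# event_model sorts several parallel lists (groups/values/sum/avg) with one argsort of
# the per-series totals. A standalone sketch of that pattern (illustrative only;
# `_sort_parallel_by_sum` is an assumed helper name, unused by the endpoints):
def _sort_parallel_by_sum(groups: list, values: list, sums: list) -> tuple:
    """Reorder groups/values/sums together so that sums ends up descending."""
    order = np.argsort(np.array(sums))[::-1]
    return ([groups[i] for i in order],
            [values[i] for i in order],
            [sums[i] for i in order])
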
@router.post("/retention_model_sql")
async def retention_model_sql(
request: Request,
game: str,
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""留存查询 sql"""
await analysis.init(data_where=current_user.data_where)
data = await analysis.retention_model_sql2()
return schemas.Msg(code=0, msg='ok', data=[data])
@router.post("/retention_model")
async def retention_model(request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
db: AsyncIOMotorDatabase = Depends(get_database),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Retention analysis model."""
    await analysis.init(data_where=current_user.data_where)
try:
        res = await analysis.retention_model_sql2()  # dict with the start/end dates and the SQL statement
except Exception as e:
return schemas.Msg(code=-9, msg='报表配置参数异常')
    sql = res['sql']  # the SQL statement
df = await ckdb.query_dataframe(sql)
if df.empty:
return schemas.Msg(code=-9, msg='无数据', data=None)
    date_range = res['date_range']  # list of dates
    unit_num = res['unit_num']  # int
    retention_n = res['retention_n']  # list of ints
    filter_item_type = res['filter_item_type']  # e.g. 'all'
    filter_item = res['filter_item']  # e.g. [0, 1, 3, 7, 14, 21, 30]
df.set_index('reg_date', inplace=True)
    # fill in dates that have no data
for d in set(res['date_range']) - set(df.index):
df.loc[d] = 0
df.sort_index(inplace=True)
summary_values = {'均值': {}}
max_retention_n = 1
    # retained users per interval
    avg = {}
    # churned users per interval
    avgo = {}
for date, v in df.T.items():
        # setdefault: reuse the existing entry for this date, otherwise start an empty dict
tmp = summary_values.setdefault(date, dict())
tmp['d0'] = int(v.cnt0)
tmp['p'] = []
tmp['n'] = []
tmp['p_outflow'] = []
tmp['n_outflow'] = []
for i in retention_n:
n = (pd.Timestamp.now().date() - date).days
if i > n:
continue
# max_retention_n = i if i > max_retention_n else max_retention_n
            # retained count
            avg[i] = avg.setdefault(i, 0) + v[f'cnt{i}']
            # churned count
            avgo[i] = avgo.setdefault(i, 0) + v[f'on{i}']
tmp['p'].append(v[f'p{i}'])
tmp['n'].append(v[f'cnt{i}'])
tmp['p_outflow'].append(v[f'op{i}'])
tmp['n_outflow'].append(v[f'on{i}'])
tmp = summary_values['均值']
retention_avg_dict = {}
for rn in retention_n:
for rt, rd in df.T.items():
if rt + datetime.timedelta(days=rn) <= pd.datetime.now().date():
retention_avg_dict.setdefault(rn, {'cnt0': 0, 'cntn': 0, 'o_cnt0': 0, 'o_cntn': 0})
retention_avg_dict[rn]['cnt0'] += rd['cnt0']
retention_avg_dict[rn]['cntn'] += rd[f'cnt{rn}']
retention_avg_dict[rn]['o_cnt0'] += rd['cnt0']
retention_avg_dict[rn]['o_cntn'] += rd[f'on{rn}']
    # compute the averages
tmp['p'] = []
tmp['n'] = []
tmp['p_outflow'] = []
tmp['n_outflow'] = []
tmp['d0'] = 0
for rt, rd in retention_avg_dict.items():
tmp['d0'] = int(df['cnt0'].sum())
n = round(rd['cntn'] * 100 / rd['cnt0'], 2)
n = 0 if np.isnan(n) else n
tmp['p'].append(n)
tmp['n'].append(rd['cntn'])
n = round(rd['o_cntn'] * 100 / rd['cnt0'], 2)
n = 0 if np.isnan(n) else n
tmp['p_outflow'].append(n)
tmp['n_outflow'].append(rd['o_cntn'])
    # column headers: date, user count, day-2 retention, then day N
    title = ['日期', '用户数', '次留', *[f'{i + 1}' for i in retention_n[1:]]]
    # pad intervals that have not been reached yet with '-'
retention_length = len(retention_n)
for _, items in summary_values.items():
for key in ['p', 'n', 'p_outflow', 'n_outflow']:
items[key].extend(['-'] * (retention_length - len(items[key])))
resp = {
'summary_values': summary_values,
# 'values': values,
'date_range': [d.strftime('%Y-%m-%d') for d in date_range],
'title': title,
'filter_item_type': filter_item_type,
'filter_item': filter_item,
'start_date': res['start_date'],
'end_date': res['end_date'],
'time_particle': res['time_particle']
}
return schemas.Msg(code=0, msg='ok', data=resp)
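

# The averaged rows above divide by rd['cnt0'] before the np.isnan guard runs, so a
# day-0 cohort of size 0 would raise ZeroDivisionError. A defensive percentage helper
# (illustrative sketch; `_safe_pct` is an assumed name, not wired into the endpoints):
def _safe_pct(numerator, denominator, ndigits: int = 2):
    """numerator/denominator as a percentage, returning 0 for a zero or NaN denominator."""
    if not denominator or np.isnan(denominator):
        return 0
    value = round(numerator * 100 / denominator, ndigits)
    return 0 if np.isnan(value) else value
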
@router.post("/retention_model_details")
async def retention_model_details(request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
db: AsyncIOMotorDatabase = Depends(get_database),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Retention analysis: per-group details."""
    await analysis.init(data_where=current_user.data_where)
try:
        res = await analysis.retention_model_sql3()  # dict with the start/end dates and the SQL statement
except Exception as e:
return schemas.Msg(code=-9, msg='报表配置参数异常')
    sql = res['sql']  # the SQL statement
df = await ckdb.query_dataframe(sql)
if df.empty:
return schemas.Msg(code=-9, msg='无数据', data=None)
    date_range = res['date_range']  # list of dates
    # unit_num = res['unit_num']  # int
    retention_n = res['retention_n']  # list of ints
    filter_item_type = res['filter_item_type']  # e.g. 'all'
    filter_item = res['filter_item']  # e.g. [0, 1, 3, 7, 14, 21, 30]
    # map ids to Chinese display names for the frontend
groupby_list = analysis.event_view.get('groupBy')
groupby = [i['columnName'] for i in groupby_list if i['tableType'] != 'user_label']
    true_group = []  # the actual bucketing scheme chosen for each group-by column
for g_data in groupby_list:
data_type = g_data['data_type']
        # not an int column
if data_type != "int":
true_group.append("str")
continue
        # user-defined intervals
if g_data['intervalType'] == 'user_defined':
int_range = analysis.event_view.get('groupBy')[0]['quotaIntervalArr']
chk_range = []
for index, value in enumerate(int_range):
                # first bucket
if index == 0:
chk_range.append(['-', value])
                    # only two boundaries in total
if len(int_range) >= 2:
chk_range.append([value, int_range[index + 1]])
continue
                # last bucket
if index + 1 >= len(int_range):
chk_range.append([value, '+'])
continue
                # middle buckets
chk_range.append([value, int_range[index + 1]])
true_group.append(chk_range)
        # default intervals
elif g_data['intervalType'] == 'def':
zidai = []
max_v = int(df[g_data['columnName']].max())
min_v = int(df[g_data['columnName']].min())
interval = (max_v - min_v) // 10 or 1
for i in range(min_v, max_v, interval):
zidai.append([i, i + interval])
true_group.append(zidai)
        # discrete values
else:
true_group.append('discrete')
if len(groupby_list) == 1:
max_v = int(df[groupby_list[0]['columnName']].max())
min_v = int(df[groupby_list[0]['columnName']].min())
for i in groupby:
if i == 'svrindex':
if game == 'mfmh5':
game = 'mzmfmh5'
chinese = {}
resp = await crud.select_map.get_one(db, game, i)
for ii in resp:
chinese[ii['id']] = ii['title']
for k, v in chinese.items():
            # apply the mapping
df.loc[df['svrindex'] == k, 'svrindex'] = v
times = df['reg_date'][0]
df.set_index(groupby, inplace=True)
# for d in set(res['date_range']) - set(df.index):
# df.loc[d] = 0
df.sort_index(inplace=True)
summary_values = {'均值': {}}
max_retention_n = 1
    # retained users per interval
    avg = {}
    # churned users per interval
    avgo = {}
for date, v in df.T.items():
        # setdefault: reuse the existing entry for this key, otherwise start an empty dict
tmp = summary_values.setdefault(date, dict())
tmp['d0'] = int(v.cnt0)
tmp['p'] = []
tmp['n'] = []
tmp['p_outflow'] = []
tmp['n_outflow'] = []
for i in retention_n:
n = (pd.Timestamp.now().date() - v[0]).days
if i > n:
continue
# max_retention_n = i if i > max_retention_n else max_retention_n
            # retained count
            avg[i] = avg.setdefault(i, 0) + v[f'cnt{i}']
            # churned count
            avgo[i] = avgo.setdefault(i, 0) + v[f'on{i}']
tmp['p'].append(v[f'p{i}'])
tmp['n'].append(v[f'cnt{i}'])
tmp['p_outflow'].append(v[f'op{i}'])
tmp['n_outflow'].append(v[f'on{i}'])
tmp = summary_values['均值']
retention_avg_dict = {}
group_label = res['group_label']
    # with multiple group-by items, merge the key into a list for display
if not group_label:
if len(groupby) > 1:
summary_valuess = {}
for k, v in summary_values.items():
if 'str' in str(type(k)):
summary_valuess[str([k])] = v
else:
summary_valuess[str(list(k))] = v
else:
summary_valuess = summary_values
    # label (user tag) group items are included
else:
summary_valuess = {}
for k, v in summary_values.items():
key = list(k)
            # insert the label group names into the key at their configured positions
for name, index in group_label.items():
key.insert(index, name)
summary_valuess[str(key)] = v
for rn in retention_n:
for rt, rd in df.T.items():
if times + datetime.timedelta(days=rn) <= pd.datetime.now().date():
retention_avg_dict.setdefault(rn, {'cnt0': 0, 'cntn': 0, 'o_cnt0': 0, 'o_cntn': 0})
retention_avg_dict[rn]['cnt0'] += rd['cnt0']
retention_avg_dict[rn]['cntn'] += rd[f'cnt{rn}']
retention_avg_dict[rn]['o_cnt0'] += rd['cnt0']
retention_avg_dict[rn]['o_cntn'] += rd[f'on{rn}']
tmp['p'] = []
tmp['n'] = []
tmp['p_outflow'] = []
tmp['n_outflow'] = []
tmp['d0'] = 0
for rt, rd in retention_avg_dict.items():
tmp['d0'] = int(df['cnt0'].sum())
n = round(rd['cntn'] * 100 / rd['cnt0'], 2)
n = 0 if np.isnan(n) else n
tmp['p'].append(n)
tmp['n'].append(rd['cntn'])
n = round(rd['o_cntn'] * 100 / rd['cnt0'], 2)
n = 0 if np.isnan(n) else n
tmp['p_outflow'].append(n)
tmp['n_outflow'].append(rd['o_cntn'])
    # if the group-by column is numeric, regroup the keys into the selected buckets
if '均值' in summary_valuess:
summary_valuess.pop('均值')
if "['均值']" in summary_valuess:
summary_valuess.pop("['均值']")
new_summary_valuess = {}
for group_key, group_data in summary_valuess.items():
key_list = eval(group_key)
        true_key = []  # the key after re-bucketing
for index, value in enumerate(key_list):
true_group_index = true_group[index]
            # default or user-defined interval bucket
if isinstance(true_group_index, list):
for defined_list in true_group_index:
defined_list_max = defined_list[1]
defined_list_min = defined_list[0]
if defined_list_min == '-':
if value < defined_list_max:
true_key.append(defined_list)
break
else:
continue
if defined_list_max == '+':
if value >= defined_list_min:
true_key.append(defined_list)
break
else:
continue
if defined_list_min <= value < defined_list_max:
true_key.append(defined_list)
break
continue
continue
            # string or discrete group: just use str(value)
if true_group_index in ['str', 'discrete']:
true_key.append(str(value))
continue
        # this re-bucketed group has not been seen yet:
if str(true_key) not in new_summary_valuess:
new_summary_valuess[str(true_key)] = group_data
continue
        # the group already exists: merge into it
        # add d0
        new_summary_valuess[str(true_key)]['d0'] += group_data['d0']
        # add n element-wise
n_list = new_summary_valuess[str(true_key)]['n']
n_list1 = group_data['n']
sum_n_lst = [x + y for x, y in zip(n_list, n_list1)]
new_summary_valuess[str(true_key)]['n'] = sum_n_lst
        # add n_outflow element-wise
n_outflow_list = new_summary_valuess[str(true_key)]['n_outflow']
n_outflow_list1 = group_data['n_outflow']
sum_n_ourflow_lst = [x + y for x, y in zip(n_outflow_list, n_outflow_list1)]
new_summary_valuess[str(true_key)]['n_outflow'] = sum_n_ourflow_lst
    # recompute the percentages from the merged counts
for key1, value1 in new_summary_valuess.items():
new_summary_valuess[key1]['p'] = [round(i / value1['d0'], 2) for i in value1['n']]
new_summary_valuess[key1]['p_outflow'] = [round(i1 / value1['d0'], 2) for i1 in value1['n_outflow']]
    title = ['分组项', '用户数', '次留', *[f'{i + 1}' for i in retention_n[1:]]]
    # pad intervals that have not been reached yet with '-'
retention_length = len(retention_n)
for _, items in new_summary_valuess.items():
for key in ['p', 'n', 'p_outflow', 'n_outflow']:
items[key].extend(['-'] * (retention_length - len(items[key])))
resp = {
'summary_values': new_summary_valuess,
# 'values': values,
'date_range': [d.strftime('%Y-%m-%d') for d in date_range],
'title': title,
'filter_item_type': filter_item_type,
'filter_item': filter_item,
'start_date': res['start_date'],
'end_date': res['end_date'],
'time_particle': res['time_particle']
}
return schemas.Msg(code=0, msg='ok', data=resp)
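

# retention_model_details re-buckets numeric group keys into intervals shaped like
# [['-', 10], [10, 20], [20, '+']], where '-' stands for -inf and '+' for +inf. A
# standalone sketch of that lookup (illustrative only; `_bucket_of` is an assumed name):
def _bucket_of(value, buckets):
    """Return the first bucket [low, high) that contains value, or None."""
    for low, high in buckets:
        if low == '-' and value < high:
            return [low, high]
        if high == '+' and value >= low:
            return [low, high]
        if low != '-' and high != '+' and low <= value < high:
            return [low, high]
    return None
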
async def retention_model01(request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
db: AsyncIOMotorDatabase = Depends(get_database),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
await analysis.init(data_where=current_user.data_where)
    res = await analysis.retention_model_sql2()  # dict with the start/end dates and the SQL statement
    sql = res['sql']  # the SQL statement
    df = await ckdb.query_dataframe(sql)
    if df.empty:
        return schemas.Msg(code=-9, msg='无数据', data=None)
    date_range = res['date_range']  # list of dates
    unit_num = res['unit_num']  # int
    retention_n = res['retention_n']  # list of ints
    filter_item_type = res['filter_item_type']  # e.g. 'all'
    filter_item = res['filter_item']  # e.g. [0, 1, 3, 7, 14, 21, 30]
df.set_index('reg_date', inplace=True)
for d in set(res['date_range']) - set(df.index):
df.loc[d] = 0
df.sort_index(inplace=True)
summary_values = {'均值': {}}
max_retention_n = 1
avg = {}
avgo = {}
for date, v in df.T.items():
        # setdefault: reuse the existing entry for this date, otherwise start an empty dict
tmp = summary_values.setdefault(date, dict())
tmp['d0'] = int(v.cnt0)
tmp['p'] = []
tmp['n'] = []
tmp['p_outflow'] = []
tmp['n_outflow'] = []
for i in retention_n:
n = (pd.Timestamp.now().date() - date).days
if i > n:
continue
# max_retention_n = i if i > max_retention_n else max_retention_n
avg[i] = avg.setdefault(i, 0) + v[f'cnt{i}']
avgo[i] = avgo.setdefault(i, 0) + v[f'on{i}']
tmp['p'].append(round(100 - v[f'p{i}'], 2))
# tmp['p'].append(v[f'p{i}'])
tmp['n'].append(v[f'cnt{i}'])
tmp['p_outflow'].append(v[f'op{i}'])
tmp['n_outflow'].append(v[f'on{i}'])
tmp = summary_values['均值']
retention_avg_dict = {}
for rn in retention_n:
for rt, rd in df.T.items():
if rt + datetime.timedelta(days=rn) <= pd.datetime.now().date():
retention_avg_dict.setdefault(rn, {'cnt0': 0, 'cntn': 0, 'o_cnt0': 0, 'o_cntn': 0})
retention_avg_dict[rn]['cnt0'] += rd['cnt0']
retention_avg_dict[rn]['cntn'] += rd[f'cnt{rn}']
retention_avg_dict[rn]['o_cnt0'] += rd['cnt0']
retention_avg_dict[rn]['o_cntn'] += rd[f'on{rn}']
tmp['p'] = []
tmp['n'] = []
tmp['p_outflow'] = []
tmp['n_outflow'] = []
tmp['d0'] = 0
for rt, rd in retention_avg_dict.items():
tmp['d0'] = int(df['cnt0'].sum())
n = round(100 - (rd['cntn'] * 100 / rd['cnt0']), 2)
# n = round(rd['cntn'] * 100 / rd['cnt0'],2)
n = 0 if np.isnan(n) else n
tmp['p'].append(n)
tmp['n'].append(rd['cntn'])
n = round(rd['o_cntn'] * 100 / rd['cnt0'], 2)
n = 0 if np.isnan(n) else n
tmp['p_outflow'].append(n)
tmp['n_outflow'].append(rd['o_cntn'])
title = ['日期', '用户数', '次流失', *[f'{i + 1}流失' for i in retention_n[1:]]]
    # pad intervals that have not been reached yet with '-'
retention_length = len(retention_n)
for _, items in summary_values.items():
for key in ['p', 'n', 'p_outflow', 'n_outflow']:
items[key].extend(['-'] * (retention_length - len(items[key])))
resp = {
'summary_values': summary_values,
# 'values': values,
'date_range': [d.strftime('%Y-%m-%d') for d in date_range],
'title': title,
'filter_item_type': filter_item_type,
'filter_item': filter_item,
'start_date': res['start_date'],
'end_date': res['end_date'],
'time_particle': res['time_particle']
}
return schemas.Msg(code=0, msg='ok', data=resp)
@router.post("/retention_model_export")
async def retention_model_export(request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
):
""" 留存分析模型 数据导出"""
await analysis.init(data_where=current_user.data_where)
data = await analysis.retention_model_sql2()
file_name = quote(f'留存分析.xlsx')
mime = mimetypes.guess_type(file_name)[0]
sql = data['sql']
df = await ckdb.query_dataframe(sql)
if df.empty:
return schemas.Msg(code=-9, msg='无数据', data=None)
df_to_stream = DfToStream((df, '留存分析'))
with df_to_stream as d:
export = d.to_stream()
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
@router.post("/retention_model_del", deprecated=True)
async def retention_model_del(
request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
db: AsyncIOMotorDatabase = Depends(get_database),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""留存数据模型"""
await analysis.init(data_where=current_user.data_where)
res = await analysis.retention_model_sql()
sql = res['sql']
date_range = res['date_range']
event_a, event_b = res['event_name']
unit_num = res['unit_num']
title = await crud.event_mana.get_show_name(db, game, event_a)
title = f'{title}用户数'
df = await ckdb.query_dataframe(sql)
if df.empty:
return schemas.Msg(code=-9, msg='无数据', data=None)
concat_data = []
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
df['date'] = df['date'].apply(lambda x: x.date())
    # compute the overall summary
summary_df = df.groupby('date')[['val_a', 'val_b', 'amount_a']].sum()
summary_values = {}
for i, d1 in enumerate(date_range):
a = set(summary_df.loc[d1]['val_a']) if d1 in summary_df.index else set()
if not a:
continue
key = d1.strftime('%Y-%m-%d')
for j, d2 in enumerate(date_range[i:]):
if j > unit_num:
break
b = set(summary_df.loc[d2]['val_b']) if d2 in summary_df.index else set()
tmp = summary_values.setdefault(key, {})
tmp.setdefault('d0', len(a))
tmp.setdefault('p', []).append(division(len(a & b) * 100, len(a)))
tmp.setdefault('n', []).append(len(a & b))
tmp.setdefault('p_outflow', []).append(round(100 - division(len(a & b) * 100, len(a)), 2))
tmp.setdefault('n_outflow', []).append(len(a) - len(a & b))
groups = set([tuple(i) for i in df[res['groupby']].values])
df.set_index(res['groupby'], inplace=True)
df.sort_index(inplace=True)
values = {}
days = [i for i in range((date_range[-1] - date_range[0]).days + 1)][:unit_num + 1]
for i, d1 in enumerate(date_range):
for g in groups:
if len(g) == 1:
continue
a = set(df.loc[g]['val_a']) if g in df.index else set()
if not a:
continue
key = d1.strftime("%Y-%m-%d")
tmp_g = values.setdefault(key, {})
for j, d2 in enumerate(date_range[i:]):
if j > unit_num:
break
b = set(df.loc[g]['val_b']) if g in df.index else set()
tmp = tmp_g.setdefault(','.join(g[1:]), {})
tmp.setdefault('d0', len(a))
tmp.setdefault('p', []).append(division(len(a & b) * 100, len(a)))
tmp.setdefault('n', []).append(len(a & b))
data = {
'summary_values': summary_values,
'values': values,
'days': days,
'date_range': [d.strftime('%Y-%m-%d') for d in date_range][:unit_num + 1],
'title': title,
'start_date': res['start_date'],
'end_date': res['end_date'],
'time_particle': res['time_particle']
}
return schemas.Msg(code=0, msg='ok', data=data)
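

# retention_model_del measures retention as the overlap between the day-0 user set and
# the day-N user set. A minimal sketch of that set arithmetic (illustrative only;
# `_set_retention` is an assumed name):
def _set_retention(day0_users: set, dayn_users: set) -> tuple:
    """Return (retained_count, retention_pct, churn_pct) for one cohort and one offset."""
    if not day0_users:
        return 0, 0.0, 0.0
    retained = len(day0_users & dayn_users)
    pct = round(retained * 100 / len(day0_users), 2)
    return retained, pct, round(100 - pct, 2)
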
@router.post("/funnel_model_sql")
async def funnel_model_sql(
request: Request,
game: str,
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""漏斗数据模型 sql"""
await analysis.init(data_where=current_user.data_where)
data = await analysis.funnel_model_sql()
return schemas.Msg(code=0, msg='ok', data=[data])
@router.post("/funnel_model")
async def funnel_model(
request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
db: AsyncIOMotorDatabase = Depends(get_database),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""漏斗数据模型"""
await analysis.init(data_where=current_user.data_where)
res = await analysis.funnel_model_sql()
sql = res['sql']
    # dates to query
date_range = res['date_range']
cond_level = res['cond_level']
groupby = res['groupby']
df = await ckdb.query_dataframe(sql)
if df.empty:
return schemas.Msg(code=-9, msg='无数据', data=None)
    # fill in missing funnel levels
concat_data = []
for key, tmp_df in df.groupby(['date'] + groupby):
not_exists_level = {i for i in range(1, len(cond_level) + 1)} - set(tmp_df['level'])
for item in not_exists_level:
key = key if isinstance(key, tuple) else (key,)
concat_data.append((*key, item, 0))
    # merge the padded rows back in
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
# df.set_index('date',inplace=True)
data_list = []
date_data = {}
if df.shape == (0, 0):
return schemas.Msg(code=0, msg='ok', data={'list': data_list, 'level': cond_level})
    tmp = {'title': '总体'}
    # sums grouped by funnel level
    tmp_df = df[['level', 'values']].groupby('level').sum()
    # sort the index in place
tmp_df.sort_index(inplace=True)
for i in tmp_df.index:
tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
tmp['n'] = tmp_df['values'].to_list()
tmp['p1'] = [100]
# tmp['p2'] = []
for i, v in tmp_df.loc[2:, 'values'].items():
tmp['p1'].append(round(v * 100 / tmp_df.loc[1, 'values'], 2))
# tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
data_list.append(tmp)
    # fill in missing dates
all_idx = {(dt, lv) for dt in date_range for lv in range(1, len(cond_level) + 1)}
concat_data = []
for i in all_idx - set(df.set_index(['date', 'level']).index):
concat_data.append((*i, 0))
summary_df = pd.concat(
[df[['date', 'level', 'values']], pd.DataFrame(concat_data, columns=['date', 'level', 'values'])])
for key, tmp_df in summary_df.groupby('date'):
tmp_df = tmp_df.groupby('level').sum()
tmp_df.sort_index(inplace=True)
for i in tmp_df.index:
tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
tmp = dict()
tmp['n'] = tmp_df['values'].to_list()
tmp['p1'] = [100]
# tmp['p2'] = []
for i, v in tmp_df.loc[2:, 'values'].items():
var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
var = 0 if np.isnan(var) else var
tmp['p1'].append(var)
# tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
_ = date_data.setdefault(key.strftime('%Y-%m-%d'), {})
_['总体'] = tmp
    # grouped funnels
    if groupby:
        # pad missing rows so every group has every level
concat_data = []
idx = set(df.set_index(['date'] + groupby).index)
all_idx = {(*j, i) for i in range(1, len(cond_level) + 1) for j in idx}
for i in all_idx - set(df.set_index(list(('date', *groupby, 'level'))).index):
concat_data.append((*i, 0))
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
# df.sort_values(list((*groupby, 'level')), inplace=True, ascending=False)
        # map ids to Chinese display names for the frontend
for i in groupby:
if i == 'svrindex':
if game == 'mfmh5':
game = 'mzmfmh5'
chinese = {}
resp = await crud.select_map.get_one(db, game, i)
for ii in resp:
chinese[ii['id']] = ii['title']
for k, v in chinese.items():
                # apply the mapping
df.loc[df[i] == k, i] = v
for key, tmp_df in df.groupby(groupby):
tmp = {'title': key}
tmp_df = tmp_df.groupby('level').sum()
tmp_df.sort_index(inplace=True)
for i in tmp_df.index:
tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
tmp['n'] = tmp_df['values'].to_list()
tmp['p1'] = [100]
# tmp['p2'] = []
for i, v in tmp_df.loc[2:, 'values'].items():
var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
var = 0 if np.isnan(var) else var
tmp['p1'].append(var)
# tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
data_list.append(tmp)
for key, tmp_df in df.groupby(['date'] + groupby):
tmp_df = tmp_df.groupby('level').sum()
tmp_df.sort_index(inplace=True)
for i in tmp_df.index:
tmp_df.loc[i, 'values'] = tmp_df.loc[i:]['values'].sum()
tmp = dict()
tmp['n'] = tmp_df['values'].to_list()
tmp['p1'] = [100]
# tmp['p2'] = []
for i, v in tmp_df.loc[2:, 'values'].items():
var = round(v * 100 / tmp_df.loc[1, 'values'], 2)
var = 0 if np.isnan(var) else var
tmp['p1'].append(var)
# tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))
_ = date_data.setdefault(key[0].strftime('%Y-%m-%d'), {})
_[key[1]] = tmp
title = (groupby or ['总体']) + cond_level
resp = {'list': data_list,
'date_data': date_data,
'title': title,
'level': cond_level,
'start_date': res['start_date'],
'end_date': res['end_date'],
'time_particle': res['time_particle']
}
return schemas.Msg(code=0, msg='ok', data=resp)
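

# Each funnel block above rewrites level i as the sum of levels i..N ("reached at least
# level i") and reports p1 as a percentage of level 1. A compact sketch of the same idea
# in plain Python (illustrative only; `_funnel_counts` is an assumed name):
def _funnel_counts(per_level: list) -> tuple:
    """per_level[i] = users whose deepest step was level i+1; return (n, p1)."""
    n = [sum(per_level[i:]) for i in range(len(per_level))]
    base = n[0] or 1  # avoid dividing by zero for an empty funnel
    p1 = [100.0] + [round(v * 100 / base, 2) for v in n[1:]]
    return n, p1
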
@router.post("/scatter_model_sql")
async def scatter_model_sql(
request: Request,
game: str,
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""分布分析 sql"""
await analysis.init(data_where=current_user.data_where)
data = await analysis.scatter_model_sql()
return schemas.Msg(code=0, msg='ok', data=[data])
@router.post("/scatter_model_export")
async def scatter_model_export(request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
):
""" 分布分析 数据导出"""
await analysis.init(data_where=current_user.data_where)
res = await analysis.scatter_model_sql()
file_name = quote(f'分布分析.xlsx')
mime = mimetypes.guess_type(file_name)[0]
sql = res['sql']
df = await ckdb.query_dataframe(sql)
if df.empty:
return schemas.Msg(code=-9, msg='无数据', data=None)
interval_type = res['interval_type']
analysis = res['analysis']
groupby = res['groupby']
quota_interval_arr = res['quota_interval_arr']
    # handle the 'total' granularity
    if res['time_particle'] == 'total':
        df['date'] = '合计'
if analysis != 'number_of_days' and interval_type != 'discrete':
max_v = int(df['values'].max())
min_v = int(df['values'].min())
interval = (max_v - min_v) // 10 or 1
resp = {'list': dict(),
'start_date': res['start_date'],
'end_date': res['end_date'],
'time_particle': res['time_particle']
}
if not quota_interval_arr:
resp['label'] = [f'[{i},{i + interval})' for i in range(min_v, max_v, interval)]
bins = [i for i in range(min_v, max_v + interval, interval)]
else:
quota_interval_arr = [-float('inf')] + quota_interval_arr + [float('inf')]
resp['label'] = []
bins = [quota_interval_arr[0]]
for i, v in enumerate(quota_interval_arr[1:]):
resp['label'].append(f'[{quota_interval_arr[i]},{v})')
bins.append(v)
        # overall distribution
for key, tmp_df in df.groupby('date'):
bins_s = pd.cut(tmp_df['values'], bins=bins,
right=False).value_counts()
bins_s.sort_index(inplace=True)
total = int(bins_s.sum())
if res['time_particle'] == 'total':
resp['list']['合计'] = dict()
resp['list']['合计']['总体'] = {'n': bins_s.to_list(), 'total': total,
'p': round(bins_s * 100 / total, 2).to_list(),
'title': '总体'}
else:
resp['list'][key.strftime('%Y-%m-%d')] = dict()
resp['list'][key.strftime('%Y-%m-%d')]['总体'] = {'n': bins_s.to_list(), 'total': total,
'p': round(bins_s * 100 / total, 2).to_list(),
'title': '总体'}
        # grouped distributions
        if groupby:
export_df = pd.DataFrame(columns=resp['label'])
for key, tmp_df in df.groupby(['date', *groupby]):
bins_s = pd.cut(tmp_df['values'], bins=bins,
right=False).value_counts()
bins_s.sort_index(inplace=True)
total = int(bins_s.sum())
title = '.'.join(key[1:])
date = key[0]
resp['list'][date.strftime('%Y-%m-%d')][title] = {'n': bins_s.to_list(), 'total': total,
'p': round(bins_s * 100 / total, 2).to_list(),
'title': title
}
export_df.loc[(date.strftime('%Y-%m-%d'), title)] = bins_s.to_list()
df_to_stream = DfToStream((export_df, '分布分析'), (df, '分布分析原始数据'), index=True)
with df_to_stream as d:
export = d.to_stream()
return StreamingResponse(export, media_type=mime,
headers={'Content-Disposition': f'filename="{file_name}"'})
# elif analysis == 'number_of_days':
else:
resp = {'list': {}, 'label': [],
'start_date': res['start_date'],
'end_date': res['end_date'],
'time_particle': res['time_particle']
}
labels = [str(i) for i in sorted(df['values'].unique())]
resp['label'] = labels
for key, tmp_df in df.groupby(['date']):
total = len(tmp_df)
if res['time_particle'] == 'total':
dt = '合计'
else:
dt = key.strftime('%Y-%m-%d')
labels_dict = {}
for key2, tmp_df2 in tmp_df.groupby('values'):
label = str(key2)
n = len(tmp_df2)
labels_dict[label] = n
resp['list'][dt] = {'总体': {'n': [labels_dict.get(i, 0) for i in labels], 'total': total,
'p': [round(labels_dict.get(i, 0) * 100 / total, 2) for i in labels]}}
export_df = pd.DataFrame(columns=resp['label'])
for d, v in resp['list'].items():
export_df.loc[d] = v['总体']['n']
df_to_stream = DfToStream((export_df, '分布分析'), (df, '分布分析原始数据'), index=True)
with df_to_stream as d:
export = d.to_stream()
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
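

# Both distribution endpoints bucket the metric with pd.cut and report per-bucket counts
# and percentages. A self-contained sketch of that step (illustrative only;
# `_distribution` is an assumed name):
def _distribution(values: list, bins: list) -> dict:
    """Count values into [low, high) bins and return counts plus percentages."""
    counts = pd.cut(pd.Series(values), bins=bins, right=False).value_counts().sort_index()
    total = int(counts.sum()) or 1
    return {'n': counts.to_list(), 'total': int(counts.sum()),
            'p': [round(c * 100 / total, 2) for c in counts]}
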
@router.post("/scatter_model")
async def scatter_model(
request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
db: AsyncIOMotorDatabase = Depends(get_database),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""分布分析 模型"""
await analysis.init(data_where=current_user.data_where)
event_type = analysis.events[0]['eventName']
try:
res = await analysis.scatter_model_sql()
except Exception as e:
return schemas.Msg(code=-9, msg='报表配置参数异常')
end_date = analysis.end_date
start_date = analysis.start_date
where = analysis.events[-1]['quotaname']
sql = res['sql']
# columnName = analysis.events[-1]['label_id']
# 查询买量渠道owner为kuaiyou3的日注册玩家等级分布
# sql_list=sql.split("GROUP BY")
# sql01 = """and xiangsu.event.owner_name='kuaiyou3'GROUP BY"""""
# new_sql=sql_list[0]+sql01+sql_list[1]
# if columnName != '':
# sql = sql.replace('SELECT', f'SELECT {columnName},', 1)
# sql += f',{columnName}'
df = await ckdb.query_dataframe(sql)
if df.empty:
return schemas.Msg(code=-9, msg='无数据', data=None)
if 'list' in str(type(df['values'][0])):
# f=lambda x:x[0]
# df['values']=df['values'].map(f)
df = df.explode("values").reset_index(drop=True)
# df['values']=df['values'].astype(str)
df.fillna(0, inplace=True)
    # cast the metric to int, unless we are counting distinct values
if analysis.events[-1].get('analysis') != 'uniqExact':
df['values'] = df['values'].astype(int)
else:
        df['values'] = df['values'].astype(str)  # keep values as str when using distinct counts
interval_type = res['interval_type']
analysi = res['analysis']
groupby = res['groupby']
quota_interval_arr = res['quota_interval_arr']
    # handle the 'total' granularity
    if res['time_particle'] == 'total':
        df['date'] = '合计'
if analysi != 'number_of_days' and interval_type != 'discrete':
try:
max_v = int(df['values'].max())
except Exception as e:
return schemas.Msg(code=-9, msg='请用离散数字', data=None)
min_v = int(df['values'].min())
interval = (max_v - min_v) // 10 or 1
resp = {'list': dict(),
'start_date': res['start_date'],
'end_date': res['end_date'],
'time_particle': res['time_particle']
}
if not quota_interval_arr:
resp['label'] = [f'[{i},{i + interval})' for i in range(min_v, max_v, interval)]
bins = [i for i in range(min_v, max_v + interval, interval)]
else:
quota_interval_arr = [-float('inf')] + quota_interval_arr + [float('inf')]
resp['label'] = []
bins = [quota_interval_arr[0]]
for i, v in enumerate(quota_interval_arr[1:]):
resp['label'].append(f'[{quota_interval_arr[i]},{v})')
bins.append(v)
        # overall distribution
for key, tmp_df in df.groupby('date'):
bins_s = pd.cut(tmp_df['values'], bins=bins,
right=False, include_lowest=True).value_counts()
bins_s.sort_index(inplace=True)
total = int(bins_s.sum())
if res['time_particle'] == 'total':
resp['list']['合计'] = dict()
p = list(round(bins_s * 100 / total, 2).to_list())
resp['list']['合计']['总体'] = {'n': bins_s.to_list(), 'total': total,
'p': [str(i) + '%' for i in p],
'title': '总体'}
else:
p = list(round(bins_s * 100 / total, 2).to_list())
resp['list'][key.strftime('%Y-%m-%d')] = dict()
resp['list'][key.strftime('%Y-%m-%d')]['总体'] = {'n': bins_s.to_list(), 'total': total,
'p': [str(i) + '%' for i in p],
'title': '总体'}
        # grouped distributions
# if groupby:
# for key, tmp_df in df.groupby(['date', *groupby]):
# bins_s = pd.cut(tmp_df['values'], bins=bins,
# right=False).value_counts()
# bins_s.sort_index(inplace=True)
# total = int(bins_s.sum())
# title = '.'.join(key[1:])
# date = key[0]
# resp['list'][date.strftime('%Y-%m-%d')][title] = {'n': bins_s.to_list(), 'total': total,
# 'p': round((bins_s * 100 / total).fillna(0),
# 2).to_list(),
# 'title': title
# }
download = analysis.event_view.get('download', '')
if download == 1:
creat_df = create_df(resp)
Download = Download_xlsx(creat_df, '分布分析')
return Download
return schemas.Msg(code=0, msg='ok', data=resp)
# elif analysis == 'number_of_days':
else:
        # discrete values
resp = {'list': {}, 'label': [],
'start_date': res['start_date'],
'end_date': res['end_date'],
'time_particle': res['time_particle']
}
labels = [str(i) for i in sorted(df['values'].unique())]
resp['label'] = labels
shaixuan = analysis.events[0].get('analysis')
for key, tmp_df in df.groupby(['date']):
if shaixuan == 'uniqExact':
total = len(set(tmp_df['uid']))
else:
total = len(tmp_df)
if res['time_particle'] == 'total':
dt = '合计'
else:
dt = key.strftime('%Y-%m-%d')
labels_dict = {}
for key2, tmp_df2 in tmp_df.groupby('values'):
label = str(key2)
n = len(tmp_df2)
labels_dict[label] = n
            if event_type == 'pay':
                # for pay events, accumulate the counts from each label onwards into labels_dict01
labels_dict01 = {}
v = -1
for i in labels:
v += 1
if int(i) == 1:
labels_dict01["1"] = labels_dict["1"]
else:
# for number in labels_dict.keys():
# if number >=i:
values = list(labels_dict.values())
n = sum(values[v:])
labels_dict01[i] = n
                # build the percentage strings
list_p = []
for i in labels:
number_int = round(labels_dict01.get(i, 0) * 100 / total, 2)
number_str = str(number_int) + '%'
list_p.append(number_str)
resp['list'][dt] = {'总体': {'n': [labels_dict01.get(i, 0) for i in labels], 'total': total,
'p': list_p}}
else:
list_p = []
for i in labels:
number_int = round(labels_dict.get(i, 0) * 100 / total, 2)
number_str = str(number_int) + '%'
list_p.append(number_str)
resp['list'][dt] = {'总体': {'n': [labels_dict.get(i, 0) for i in labels], 'total': total,
'p': list_p}}
# resp['list'][dt] = {'总体': {'n': [labels_dict.get(i, 0) for i in labels], 'total': total,
# 'p': [round(labels_dict.get(i, 0) * 100 / total, 2) for i in labels]}}
if where == "step_id" and event_type == "guide":
sql = f"""SELECT toDate(addHours({game}.event."#event_time", 8)) AS date, count(DISTINCT {game}.event."#account_id") AS values FROM {game}.event WHERE addHours({game}.event."#event_time", 8) >= '{start_date}' AND addHours({game}.event."#event_time", 8) <= '{end_date}' AND {game}.event."#event_name" = 'create_account' GROUP BY toDate(addHours({game}.event."#event_time", 8)) ORDER BY date"""
df = await ckdb.query_dataframe(sql)
for i in range(len(df)):
resp['list'][str(df['date'][i])]['总体']['total'] = int(df['values'][i])
        # download support
        download = analysis.event_view.get('download', '')
if download == 1:
creat_df = create_df(resp)
Download = Download_xlsx(creat_df, '分布分析')
return Download
return schemas.Msg(code=0, msg='ok', data=resp)
# bins_s = pd.cut(tmp_df['values'], bins=bins,
# right=False).value_counts()
# bins_s.sort_index(inplace=True)
# total = int(bins_s.sum())
# resp['list'][key.strftime('%Y-%m-%d')] = dict()
# resp['list'][key.strftime('%Y-%m-%d')]['总体'] = {'n': bins_s.to_list(), 'total': total,
# 'p': round(bins_s * 100 / total, 2).to_list(),
# 'title': '总体'}
@router.post("/scatter_model_details")
async def scatter_model_details(
request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
db: AsyncIOMotorDatabase = Depends(get_database),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""分布分析分组详情"""
await analysis.init(data_where=current_user.data_where)
try:
res = await analysis.scatter_model_sql()
except Exception as e:
return schemas.Msg(code=-9, msg='报表配置参数异常')
event_type = analysis.events[0]['eventName']
where = analysis.events[-1]['quotaname']
sql = res['sql']
group_by = analysis.event_view['groupBy']
    # exclude user-label columns
true_group = [i for i in group_by if i['data_type'] != "user_label"]
# columnName = true_group[-1]['columnName']
columnName = ''
    if true_group:
# if columnName != '':
# # 按天分组
# sql = sql.replace(f'toDate(addHours({game}.event."#event_time", 8)) AS date', f'`{columnName}` as va',
# 1)
# sql = sql.replace(f'toDate(addHours({game}.event."#event_time", 8))', columnName, 1)
# # 按周分组
# sql = sql.replace(f'toStartOfWeek(addHours({game}.event."#event_time", 8)) AS date',
# f'`{columnName}` as va',
# 1)
# sql = sql.replace(f'toStartOfWeek(addHours({game}.event."#event_time", 8))', columnName, 1)
# # 按月分组
# sql = sql.replace(f'toStartOfMonth(addHours({game}.event."#event_time", 8)) AS date',
# f'`{columnName}` as va',
# 1)
# sql = sql.replace(f'toStartOfMonth(addHours({game}.event."#event_time", 8))', columnName, 1)
# # 合计
# if analysis.event_view.get('timeParticleSize') == "total":
# sql = sql.replace(f'SELECT', f'SELECT {columnName} as va,', 1)
df = await ckdb.query_dataframe(sql)
if df.empty:
return schemas.Msg(code=-9, msg='无数据', data=None)
if 'list' in str(type(df['values'][0])):
# f = lambda x: x[0]
# df['values'] = df['values'].map(f)
df = df.explode("values").reset_index(drop=True)
df.fillna(0, inplace=True)
        # cast the metric to int, unless we are counting distinct values
if analysis.events[-1].get('analysis') != 'uniqExact':
df['values'] = df['values'].astype(int)
else:
            df['values'] = df['values'].astype(str)  # keep values as str when using distinct counts
interval_type = res['interval_type']
analysi = res['analysis']
groupby = res['groupby']
true_df = df.groupby(groupby).sum()
group_label = res['group_label']
quota_interval_arr = res['quota_interval_arr']
        # handle the 'total' granularity
# if res['time_particle'] == 'total':
# if len(groupby) > 0:
# df['va'] = '合计'
if analysi != 'number_of_days' and interval_type != 'discrete':
            # default intervals
max_v = int(true_df['values'].max())
min_v = int(true_df['values'].min())
interval = (max_v - min_v) // 10 or 1
resp = {'list': dict(),
'start_date': res['start_date'],
'end_date': res['end_date'],
'time_particle': res['time_particle'],
'biaotou': groupby
}
# if 'float' in str(df.dtypes['va']):
# df['va'] = df['va'].astype(int)
# for index, gi in enumerate(groupby):
# resp['list'][str(index)] = dict()
# if 'float' in str(df.dtypes[gi]):
# df[gi] = df[gi].astype(int)
# if 'list' in str(type(df[gi][0])):
# f = lambda x: x[0]
# df[gi] = df[gi].map(f)
if not quota_interval_arr:
resp['label'] = [f'[{i},{i + interval})' for i in range(min_v, max_v, interval)]
bins = [i for i in range(min_v, max_v + interval, interval)]
else:
quota_interval_arr = [-float('inf')] + quota_interval_arr + [float('inf')]
resp['label'] = []
bins = [quota_interval_arr[0]]
for i, v in enumerate(quota_interval_arr[1:]):
resp['label'].append(f'[{quota_interval_arr[i]},{v})')
bins.append(v)
# if 'float' in str(df.dtypes['va']):
# df['va'] = df['va'].astype(int)
# if 'list' in str(type(df['va'][0])):
# f = lambda x: x[0]
# df['va'] = df['va'].map(f)
            # per-group distributions
            for key, tmp_df in true_df.groupby(groupby):
bins_s = pd.cut(tmp_df['values'], bins=bins,
right=True, include_lowest=True).value_counts()
bins_s.sort_index(inplace=True)
total = int(bins_s.sum())
if group_label:
if isinstance(key, str):
key = [key]
key = list(key)
for name, idx in group_label.items():
key.insert(idx, name)
key = str(key)
if res['time_particle'] == 'total111':
resp['list']['合计'] = dict()
resp['list']['合计'] = {'n': bins_s.to_list(), 'total': total,
'p': round(bins_s * 100 / total, 2).to_list(),
'title': '总体'}
else:
p = round(bins_s * 100 / total, 2).to_list()
for i in range(len(p)):
if str(p[i]) == 'nan':
p[i] = 0
# 映射对应的埋点数据
# re = await crud.select_map.get_list(db, game)
# re_list = [i['attr_name'] for i in re]
# if gi in re_list:
# for i in re:
# if gi == i['attr_name']:
# for datas in i['map_']:
# if key == datas['id']:
# key = datas['title']
# break
# break
# if 'time' not in groupby:
resp['list'][str(key)] = dict()
resp['list'][str(key)] = {'n': bins_s.to_list(), 'total': total,
'p': [str(i) + '%' for i in p],
'title': '总体'}
# else:
# resp['list'][key.strftime('%Y-%m-%d %H:%M:%S')] = dict()
# resp['list'][key.strftime('%Y-%m-%d %H:%M:%S')] = {'n': bins_s.to_list(), 'total': total,
# 'p': [str(i) + '%' for i in p],
# 'title': '总体'}
            # download support
            download = analysis.event_view.get('download', '')
if download == 1:
create_df = create_neidf(resp, columnName)
Download = Download_xlsx(create_df, '分布分析')
return Download
if group_label:
for name, idx in group_label.items():
resp['biaotou'].insert(idx, name)
return schemas.Msg(code=0, msg='ok', data=resp)
else:
            # discrete values
resp = {'list': {}, 'label': [],
'start_date': res['start_date'],
'end_date': res['end_date'],
'time_particle': res['time_particle'],
'biaotou': groupby
}
labels = [str(i) for i in sorted(true_df['values'].unique())]
resp['label'] = labels
# for index, gi in enumerate(groupby):
# resp['list'][str(index)] = dict()
# if 'list' in str(type(df[gi][0])):
# f = lambda x: x[0]
# df[gi] = df[gi].map(f)
shaixuan = analysis.events[0].get('analysis')
for key, tmp_df in true_df.groupby(groupby):
if shaixuan == 'uniqExact':
total = len(set(tmp_df['uid']))
else:
total = len(tmp_df)
if res['time_particle'] == 'total11':
dt = '合计'
else:
# 映射对应的埋点数据
# re = await crud.select_map.get_list(db, game)
# re_list = [i['attr_name'] for i in re]
# if gi in re_list:
# for i in re:
# if gi == i['attr_name']:
# for datas in i['map_']:
# if key == datas['id']:
# key = datas['title']
# break
# break
dt = key
# dt = key.strftime('%Y-%m-%d')
# dt='合计'
                # a label group exists: insert its names into the key
if group_label:
if isinstance(dt, str):
dt = [dt]
dt = list(dt)
for name, idx in group_label.items():
dt.insert(idx, name)
dt = str(dt)
labels_dict = {}
for key2, tmp_df2 in tmp_df.groupby('values'):
label = str(key2)
n = len(tmp_df2)
labels_dict[label] = n
                if event_type == 'pay':
                    # for pay events, accumulate the counts from each label onwards into labels_dict01
labels_dict01 = {}
v = -1
for i in labels:
v += 1
if int(i) == 1:
labels_dict01["1"] = labels_dict["1"]
else:
# for number in labels_dict.keys():
# if number >=i:
values = list(labels_dict.values())
n = sum(values[v:])
labels_dict01[i] = n
                    # build the percentage strings
list_p = []
for i in labels:
number_int = round(labels_dict01.get(i, 0) * 100 / total, 2)
number_str = str(number_int) + '%'
list_p.append(number_str)
resp['list'][str(dt)] = {'n': [labels_dict01.get(i, 0) for i in labels], 'total': total,
'p': list_p}
else:
list_p = []
for i in labels:
number_int = round(labels_dict.get(i, 0) * 100 / total, 2)
number_str = str(number_int) + '%'
list_p.append(number_str)
resp['list'][str(dt)] = {'n': [labels_dict.get(i, 0) for i in labels], 'total': total,
'p': list_p}
# resp['list'][dt] = {'总体': {'n': [labels_dict.get(i, 0) for i in labels], 'total': total,
# 'p': [round(labels_dict.get(i, 0) * 100 / total, 2) for i in labels]}}
if where == "step_id" and event_type == "guide":
sql = f"""SELECT toDate(addHours({game}.event."#event_time", 8)) AS date, count(DISTINCT {game}.event."#account_id") AS values FROM {game}.event WHERE addHours({game}.event."#event_time", 8) >= '{start_date}' AND addHours({game}.event."#event_time", 8) <= '{end_date}' AND {game}.event."#event_name" = 'create_account' GROUP BY toDate(addHours({game}.event."#event_time", 8)) ORDER BY date"""
df = await ckdb.query_dataframe(sql)
for i in range(len(df)):
resp['list'][str(df['date'][i])]['total'] = int(df['values'][i])
            # download support
            download = analysis.event_view.get('download', '')
if download == 1:
create_df = create_neidf(resp, columnName)
Download = Download_xlsx(create_df, '分布分析')
return Download
if group_label:
for name, idx in group_label.items():
resp['biaotou'].insert(idx, name)
return schemas.Msg(code=0, msg='ok', data=resp)
else:
return schemas.Msg(code=-9, msg='没有添加分组项', data='')
@router.post("/trace_model_sql")
async def trace_model_sql(
request: Request,
game: str,
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""路径分析 sql"""
await analysis.init(data_where=current_user.data_where)
data = await analysis.trace_model_sql()
return schemas.Msg(code=0, msg='ok', data=[data])
@router.post("/trace_model")
async def trace_model(
request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""路径分析"""
await analysis.init(data_where=current_user.data_where)
res = await analysis.trace_model_sql()
sql = res['sql']
df = await ckdb.query_dataframe(sql)
if df.empty:
return schemas.Msg(code=-9, msg='无数据', data=None)
chain_dict = defaultdict(dict)
event_num_dict = {}
event_next_event = {}
nodes = {'流失'}
name_list = analysis.events['event_namesdes']
name_dict = {}
for i in name_list:
name_dict[i['event_name']] = i['event_desc']
for event_names, count in zip(df['event_chain'], df['values']):
fmt_keys = []
chain_len = len(event_names)
for i, event_name in enumerate(event_names):
if i >= 10:
continue
next_event = event_names[i + 1] if i < chain_len - 1 else '流失'
            # use the Chinese display name for each event
event_namess = name_dict.get(event_name, event_name)
next_eventss = name_dict.get(next_event, next_event)
key = (f'{event_namess}-{i}', f'{next_eventss}-{i + 1}')
# key = (f'{event_name}', f'{next_event}')
nodes.update(key)
chain_dict[i][key] = chain_dict[i].setdefault(key, 0) + count
keys = list(key)
for true_key in keys:
if true_key in fmt_keys:
continue
if true_key in event_num_dict:
event_num_dict[true_key] += count
else:
event_num_dict[true_key] = count
fmt_keys.append(true_key)
            # track which events can follow each event
if keys[0] in event_next_event:
event_next_event[keys[0]].append(keys[1])
event_next_event[keys[0]] = list(set(event_next_event[keys[0]]))
else:
event_next_event[keys[0]] = [keys[1]]
links = []
for _, items in chain_dict.items():
for keys, val in items.items():
links.append({
"source": keys[0],
"target": keys[1],
"value": val
})
node = [item for item in nodes]
node.sort()
    # fixed ordering: source-event nodes first, churn nodes last
first = []
trail = []
nodes = []
for i in node:
if analysis.events['source_event']['eventDesc'] in i:
first.append(i)
elif '流失' in i:
trail.append(i)
else:
nodes.append(i)
first.sort(reverse=True)
for i in first:
nodes.insert(0, i)
for i in trail:
nodes.append(i)
    # attach each follow-up event's count to event_next_event
event_new_next = {}
for key, key_list in event_next_event.items():
new_key_list = []
for key1 in key_list:
new_key_list.append({'event_name': key1, 'value': event_num_dict[key1]})
event_new_next[key] = new_key_list
data = {
'nodes': [{'name': item} for item in nodes],
'links': links,
'event_num': event_num_dict,
'event_next': event_new_next,
'start_date': res['start_date'],
'end_date': res['end_date'],
'time_particle': res['time_particle']
}
return schemas.Msg(code=0, msg='ok', data=data)
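# --- illustrative sketch (not called by the endpoints) -------------------------
# Hedged example of how one ordered event chain is turned into sankey links in
# trace_model above: step i links '<event>-<i>' to the next event (or '流失'
# when the chain ends), and only the first 10 steps are kept. The helper name
# is hypothetical.
def build_links_from_chain(event_chain, count, max_depth=10):
    links = []
    chain_len = len(event_chain)
    for i, name in enumerate(event_chain[:max_depth]):
        nxt = event_chain[i + 1] if i < chain_len - 1 else '流失'
        links.append({'source': f'{name}-{i}', 'target': f'{nxt}-{i + 1}', 'value': count})
    return links
# Example: build_links_from_chain(['login', 'pay'], 3) ->
# [{'source': 'login-0', 'target': 'pay-1', 'value': 3},
#  {'source': 'pay-1', 'target': '流失-2', 'value': 3}]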
@router.post("/trace_user_info_model")
async def trace_user_info_model(
request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""路径分析用户详情"""
await analysis.init(data_where=current_user.data_where)
res = await analysis.trace_model_sql()
sql = res['sql']
    # map event names to their configured display names
    name_dict = {i['event_name']: i['event_desc'] for i in analysis.events['event_namesdes']}
    # uids associated with each event
event_uid_list = await ckdb.query_data_trace(sql, name_dict)
if not event_uid_list:
return schemas.Msg(code=-9, msg='无数据', data=None)
event_name = analysis.events['event_name']
page = analysis.events['page']
event_next = analysis.events['event_next']
starindex = (page - 1) * 10
all_uid_list = []
    # user details for the aggregated follow-up events
if event_name.startswith('follow'):
for event_next_list in event_next:
true_event_name = event_next_list['event_name']
if true_event_name.startswith('流失'):
continue
all_uid_list += event_uid_list[true_event_name]
all_uid_list = list(set(all_uid_list))
    # user details for the 'more events' node
elif event_name.startswith('more'):
        # sort follow-up events by count and keep everything after the top 8
event_next_true_eventlist = sorted(event_next, key=lambda x: x['value'], reverse=True)[8:]
for event_next_list in event_next_true_eventlist:
true_event_name = event_next_list['event_name']
all_uid_list += event_uid_list[true_event_name]
all_uid_list = list(set(all_uid_list))
    # user details for a single node
else:
all_uid_list = event_uid_list[event_name]
user_num = len(all_uid_list)
account_id = all_uid_list[starindex:10 * page]
new_sql = f"""select `#account_id`,`#ip`,`#distinct_id`,rmbmoney,owner_name,lv,zhanli,channel,
channel,svrindex,maxmapid,name,`exp`,vip,jinbi,last_account_login_time,binduid from {game}.`user` where `#account_id` in ({account_id})"""
df1 = await ckdb.query_dataframe(new_sql)
new_values = df1.values.tolist()
for i in range(len(new_values)):
if str(new_values[i][6]) == 'nan':
new_values[i][6] = 0
res = {
'user_num': user_num,
'details_data': {
'new_columns': df1.columns.tolist(),
'new_values': new_values
}}
return schemas.Msg(code=0, msg='ok', data=res)
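# --- illustrative sketch (not called by the endpoints) -------------------------
# Minimal, hedged example of the 10-per-page slicing used in
# trace_user_info_model above; `paginate` is a hypothetical helper name.
def paginate(items, page, page_size=10):
    start = (page - 1) * page_size
    return items[start:start + page_size]
# Example: paginate(list(range(25)), 3) -> [20, 21, 22, 23, 24]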
@router.post("/user_property_model_sql")
async def user_property_sql(
request: Request,
game: str,
analysis: UserAnalysis = Depends(UserAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""用户属性sql"""
await analysis.init(data_where=current_user.data_where)
data = analysis.property_model()
return schemas.Msg(code=0, msg='ok', data=[data])
@router.post("/user_property_model_export")
async def user_property_model_export(
request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
analysis: UserAnalysis = Depends(UserAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
):
"""用户属性 导出"""
await analysis.init(data_where=current_user.data_where)
data = analysis.property_model()
file_name = quote(f'用户属性.xlsx')
mime = mimetypes.guess_type(file_name)[0]
sql = data['sql']
df = await ckdb.query_dataframe(sql)
if df.empty:
return schemas.Msg(code=-9, msg='无数据', data=None)
df_to_stream = DfToStream((df, '用户属性'))
with df_to_stream as d:
export = d.to_stream()
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
@router.post("/user_property_model")
async def user_property_model(
request: Request,
game: str,
analysis: UserAnalysis = Depends(UserAnalysis),
ckdb: CKDrive = Depends(get_ck_db),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""用户属性分析"""
await analysis.init(data_where=current_user.data_where)
res = analysis.property_model()
sql = res['sql']
quota = res['quota']
groupby = res['groupby']
df = await ckdb.query_dataframe(sql)
if df.empty:
return schemas.Msg(code=-9, msg='无数据', data=None)
    # no group-by: return a single overall value
data = {}
if not groupby:
data['总体'] = int(df['values'][0])
title = ['总体', quota]
else:
sum_s = df.groupby(groupby)['values'].sum()
for key, val in sum_s.items():
if isinstance(key, tuple):
key = ','.join([str(i) for i in key])
else:
key = str(key)
data[key] = val
title = ['.'.join(groupby), quota]
return schemas.Msg(code=0, msg='ok', data={
'value': data,
'title': title
})
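# --- illustrative sketch (not called by the endpoints) -------------------------
# Hedged example of the group flattening done in user_property_model above:
# multi-column group keys are joined with ',' so the frontend receives flat
# labels. `flatten_group_sums` is a hypothetical name; `pd` is the module-level
# pandas import.
def flatten_group_sums(df, groupby, value_col='values'):
    data = {}
    for key, val in df.groupby(groupby)[value_col].sum().items():
        label = ','.join(str(i) for i in key) if isinstance(key, tuple) else str(key)
        data[label] = val
    return data
# Example:
# flatten_group_sums(pd.DataFrame({'svr': [1, 1, 2], 'values': [3, 4, 5]}), ['svr'])
# -> {'1': 7, '2': 5}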
@router.post("/seek_user")
async def seek_user(
request: Request,
game: str,
data_in: schemas.Ck_seek_user,
ckdb: CKDrive = Depends(get_ck_db)
) -> schemas.Msg:
"""游戏用户搜索功能"""
# 判断的内容
data = data_in.condition
# 需要判断的字段
ziduan = data_in.user_arrt_title
# 筛选条件
tiaojian = data_in.comparator_id
if tiaojian == '==':
tiaojian = '='
    # datetime attributes are filtered by a time range
if data_in.user_arrt_type == 'datetime':
sql = f"""select `#account_id`,`{ziduan}` from {game}.`user` WHERE addHours(`{ziduan}`, 8) >= '{data_in.start_time}'
and addHours(`{ziduan}`, 8) <= '{data_in.end_time}' ORDER BY `#reg_time` LIMIT 10 OFFSET {(
data_in.pages - 1) * 10}"""
    # when querying '#account_id' itself, avoid returning the account id twice
elif ziduan == '#account_id':
sql = f"""select `{ziduan}`,name from {game}.`user` WHERE `{ziduan}` {tiaojian} '{data_in.condition}' ORDER BY `#reg_time` LIMIT 10 OFFSET {(
data_in.pages - 1) * 10} """
elif data_in.user_arrt_type == 'int':
sql = f"""select `#account_id`,`{ziduan}` from {game}.`user` WHERE `{ziduan}` {tiaojian} {data_in.condition} ORDER BY `#reg_time` LIMIT 10 OFFSET {(
data_in.pages - 1) * 10}"""
else:
sql = f"""select `#account_id`,`{ziduan}` from `{game}`.`user` WHERE `{ziduan}` {tiaojian} '{data}' ORDER BY `#reg_time` LIMIT 10 OFFSET {(
data_in.pages - 1) * 10}"""
    # run the query
    try:
        df = await ckdb.query_dataframe(sql)
    except Exception as e:
        return schemas.Msg(code=0, msg='查询参数不匹配', data=str(e))
if df.empty:
return schemas.Msg(code=-9, msg='查无数据')
    # return the result as lists
df.fillna(0, inplace=True)
account_id = list(df['#account_id'])
new_sql = f"""select `#account_id`,`#ip`,`#distinct_id`,rmbmoney,owner_name,lv,zhanli,channel,
channel,svrindex,maxmapid,name,`exp`,vip,jinbi,last_account_login_time,binduid from {game}.`user` where `#account_id` in ({account_id})"""
df1 = await ckdb.query_dataframe(new_sql)
new_values = df1.values.tolist()
for i in range(len(new_values)):
if str(new_values[i][6]) == 'nan':
new_values[i][6] = 0
res = {'refer': {
'columns': df.columns.tolist(),
'values': df.values.tolist()
},
'details_data': {
'new_columns': df1.columns.tolist(),
'new_values': new_values
}}
return schemas.Msg(code=0, msg='ok', data=res)
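# --- illustrative sketch (not called by the endpoints) -------------------------
# Hedged example of a shared helper for the user-detail lookup that is repeated
# inline in trace_user_info_model, seek_user and download_user above. The column
# list and the NaN fix on the 7th column (zhanli) mirror that inline code;
# `fetch_user_details` is a hypothetical name and is not wired into any route.
USER_DETAIL_COLUMNS = ("`#account_id`,`#ip`,`#distinct_id`,rmbmoney,owner_name,lv,zhanli,channel,"
                       "channel,svrindex,maxmapid,name,`exp`,vip,jinbi,last_account_login_time,binduid")
async def fetch_user_details(ckdb, game, account_ids):
    sql = f"""select {USER_DETAIL_COLUMNS} from {game}.`user` where `#account_id` in ({account_ids})"""
    df = await ckdb.query_dataframe(sql)
    values = df.values.tolist()
    for row in values:
        # zhanli can come back as NaN; normalise it to 0 as the endpoints do
        if str(row[6]) == 'nan':
            row[6] = 0
    return {'new_columns': df.columns.tolist(), 'new_values': values}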
@router.post("/seek_user_count")
async def seek_user_count(
request: Request,
game: str,
data_in: schemas.Ck_seek_user,
ckdb: CKDrive = Depends(get_ck_db)
) -> schemas.Msg:
"""游戏用户搜索功能查询到的数量"""
# 判断的内容
data = data_in.condition
# 需要判断的字段
ziduan = data_in.user_arrt_title
# 筛选条件
tiaojian = data_in.comparator_id
if tiaojian == '==':
tiaojian = '='
    # datetime attributes are filtered by a time range
if data_in.user_arrt_type == 'datetime':
sql = f"""select `#account_id`,`{ziduan}` from {game}.`user` WHERE addHours(`{ziduan}`, 8) >= '{data_in.start_time}'
and addHours(`{ziduan}`, 8) <= '{data_in.end_time}' ORDER BY `#reg_time`"""
    # when querying '#account_id' itself, avoid returning the account id twice
elif ziduan == '#account_id':
sql = f"""select `{ziduan}`,name from {game}.`user` WHERE `{ziduan}` {tiaojian} '{data_in.condition}' ORDER BY `#reg_time` """
elif data_in.user_arrt_type == 'int':
sql = f"""select `#account_id`,`{ziduan}` from {game}.`user` WHERE `{ziduan}` {tiaojian} {data_in.condition} ORDER BY `#reg_time`"""
else:
sql = f"""select `#account_id`,`{ziduan}` from `{game}`.`user` WHERE `{ziduan}` {tiaojian} '{data}' ORDER BY `#reg_time`"""
    # run the query
    try:
        df = await ckdb.query_dataframe(sql)
    except Exception as e:
        return schemas.Msg(code=0, msg='查询参数不匹配', data=str(e))
    # return the number of matching rows
res = len(df)
return schemas.Msg(code=0, msg='ok', data=res)
@router.post("/download_user")
async def download_user(
request: Request,
game: str,
data_in: schemas.Ck_seek_user,
ckdb: CKDrive = Depends(get_ck_db)
):
"""下载查询到的所有数据"""
# 判断的内容
data = data_in.condition
# 需要判断的字段
ziduan = data_in.user_arrt_title
# 筛选条件
tiaojian = data_in.comparator_id
if tiaojian == '==':
tiaojian = '='
    # datetime attributes are filtered by a time range
if data_in.user_arrt_type == 'datetime':
sql = f"""select `#account_id`,`{ziduan}` from {game}.`user` WHERE addHours(`{ziduan}`, 8) >= '{data_in.start_time}'
and addHours(`{ziduan}`, 8) <= '{data_in.end_time}' ORDER BY `#reg_time`"""
    # when querying '#account_id' itself, avoid returning the account id twice
elif ziduan == '#account_id':
sql = f"""select `{ziduan}`,name from {game}.`user` WHERE `{ziduan}` {tiaojian} '{data_in.condition}' ORDER BY `#reg_time` """
elif data_in.user_arrt_type == 'int':
sql = f"""select `#account_id`,`{ziduan}` from {game}.`user` WHERE `{ziduan}` {tiaojian} {data_in.condition} ORDER BY `#reg_time`"""
else:
sql = f"""select `#account_id`,`{ziduan}` from `{game}`.`user` WHERE `{ziduan}` {tiaojian} '{data}' ORDER BY `#reg_time`"""
    # run the query
    try:
        df = await ckdb.query_dataframe(sql)
    except Exception as e:
        return schemas.Msg(code=0, msg='查询参数不匹配', data=str(e))
if df.empty:
return schemas.Msg(code=-9, msg='无数据', data='')
account_id = list(df['#account_id'])
new_sql = f"""select `#account_id`,`#ip`,`#distinct_id`,rmbmoney,owner_name,lv,zhanli,channel,
channel,svrindex,maxmapid,name,`exp`,vip,jinbi,last_account_login_time,binduid from {game}.`user` where `#account_id` in ({account_id})"""
df1 = await ckdb.query_dataframe(new_sql)
file_name = quote(f'下载的用户搜索数据.xlsx')
mime = mimetypes.guess_type(file_name)[0]
df_to_stream = DfToStream((df1, '下载的用户搜索数据'))
with df_to_stream as d:
export = d.to_stream()
return StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
@router.post("/solo_user")
async def solo_user(
request: Request,
game: str,
data_in: schemas.Ck_solo_user,
ckdb: CKDrive = Depends(get_ck_db)
):
"""用户的详情"""
if data_in.event_list == []:
return schemas.Msg(code=-9, msg='请配置用户搜索模块事件', data=[])
event_dict = {}
for i in data_in.event_list:
event_dict[i['event']] = i['event_name']
# event_dict={'pay':'充值','create_account':'创建角色','login':'登录','ta_app_end':'离开游戏','guide':'新手引导','level_up':'玩家等级',
# 'vip_level':'vip等级','sign':'签到','summon':'招募','ask_for_join_guild':'加入联盟','leave_guild':'离开联盟','create_guild':'创建联盟'}
sql = f"""select `#account_id`,`#ip`,`#distinct_id`,rmbmoney,owner_name,lv,zhanli,channel,
channel,svrindex,maxmapid,name,`exp`,vip,jinbi,last_account_login_time,binduid from {game}.`user` where `#account_id` = '{data_in.account_id}'"""
    # basic user profile
df = await ckdb.query_dataframe(sql)
    # daily event counts for the user
start_times = data_in.start_time.split(' ')[0]
end_times = data_in.end_time.split(' ')[0]
event = list(event_dict.keys())
sql1 = f"""select toDate(addHours(`#event_time`, `#zone_offset`)) as date,count(`#event_name`) as v from {game}.event
where `date`>='{start_times}' and `date`<='{end_times}'
and `#account_id`='{data_in.account_id}' and `#event_name` in ({event}) group by date ORDER by date"""
df1 = await ckdb.query_dataframe(sql1)
    # days covered by the requested interval
if len(df1) > 0:
time_interval = getEveryDay(start_times, end_times)
        existing_days = [str(i) for i in df1['date']]
        # add a zero-count row for any day that has no events
        for i in time_interval:
            if i not in existing_days:
                df1.loc[len(df1.index)] = [i, 0]
df1[['date']] = df1[['date']].astype(str)
df1.sort_values('date', inplace=True)
data_list = list(df1['date'])
event_values = list(df1['v'])
else:
data_list = [] # getEveryDay(start_times,end_times)
event_values = []
    # full event details for the user
sql2 = f"""select * FROM {game}.event WHERE `#account_id`='{data_in.account_id}' and addHours(`#event_time`, `#zone_offset`) >='{data_in.start_time}' and
addHours(`#event_time`, `#zone_offset`) <= '{data_in.end_time}' and `#event_name` in ({event}) order by `#event_time`"""
df2 = await ckdb.query_dataframe(sql2)
if len(df2) > 0:
game_details = {}
        # split the events by day
days = list(df2['#event_time'])
day_set = set()
for i in days:
day_set.add(str(i).split(' ')[0])
        # all distinct days, sorted
day_list = list(day_set)
day_list.sort()
for day in day_list:
game_deta = []
for nu in range(len(df2)):
if day in str(df2['#event_time'][nu]):
                    # exact time of the event
game_detail = {}
time_s = str(df2['#event_time'][nu]).split('+')[0]
game_detail['time'] = time_s.split(' ')[1]
game_detail['event'] = event_dict[df2['#event_name'][nu]]
a_list = []
                    # get the dataframe's column names
columns = df2.columns.values
for col in columns:
a = str(df2[col][nu])
if a != 'None' and a != '' and a != 'nan' and a != '[]':
a_list.append({'title': col, 'val': a})
game_detail['xaingqing'] = a_list
game_deta.append(game_detail)
game_details[day] = game_deta
else:
game_details = {}
# event_count = await ckdb.yesterday_event_count(game)
    # share of each event in the total
sql3 = f"""select `#event_name` as a,count(`#event_name`) as v from {game}.event
where addHours(`#event_time`, `#zone_offset`)>='{data_in.start_time}' and addHours(`#event_time`, `#zone_offset`)<='{data_in.end_time}'
and `#account_id`='{data_in.account_id}' and `#event_name` in ({event}) group by `#event_name`"""
df3 = await ckdb.query_dataframe(sql3)
if len(df3) > 0:
zhanbi = []
sums = sum(list(df1['v']))
numbers = 0
for i in range(len(df3)):
shuju = {}
shuju['name'] = event_dict[df3['a'][i]]
shuju['value'] = int(df3['v'][i])
# if i != len(df3)-1:
# number1=round(int(df3['v'][i]) / sums, 2)
# number=round(number1*100,2)
# numbers+=number
# shuju['zhanbi'] = str(number) + '%'
# else:
# shuju['zhanbi']=str(100-numbers) + '%'
zhanbi.append(shuju)
else:
zhanbi = []
res = {
'details_data': {
'new_columns': df.columns.tolist(),
'new_values': df.values.tolist()},
'event_count': {
'date': data_list,
'event_values': event_values
},
'details_user': game_details,
'proportion': zhanbi
}
return schemas.Msg(code=0, msg='ok', data=res)
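# --- illustrative sketch (not called by the endpoints) -------------------------
# Hedged alternative to the getEveryDay-based gap filling in solo_user above:
# days missing from the query result get a 0 count so the chart has a
# continuous x-axis. `fill_missing_days` is a hypothetical name; `pd` is the
# module-level pandas import.
def fill_missing_days(df, start, end, date_col='date', value_col='v'):
    full = pd.DataFrame({date_col: pd.date_range(start, end).strftime('%Y-%m-%d')})
    df = df.copy()
    df[date_col] = df[date_col].astype(str)
    out = full.merge(df, on=date_col, how='left').fillna({value_col: 0})
    return list(out[date_col]), list(out[value_col].astype(int))
# Example: fill_missing_days(pd.DataFrame({'date': ['2023-01-02'], 'v': [5]}),
#                            '2023-01-01', '2023-01-03')
# -> (['2023-01-01', '2023-01-02', '2023-01-03'], [0, 5, 0])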
@router.get("/event_list")
async def event_list(
request: Request,
game: str,
db: AsyncIOMotorDatabase = Depends(get_database),
ckdb: CKDrive = Depends(get_ck_db),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""个人详情中的事件列表"""
    # fetch the configured event names
# event_list = await ckdb.distinct(game, 'event', '#event_name')
event_list = await crud.event_list.get_list(db, game)
    if not event_list:
return schemas.Msg(code=0, msg='请配置用户搜索模块事件', data=[])
else:
res = event_list[0]['details']
return schemas.Msg(code=0, msg='ok', data=res)
@router.post("/add_event_list")
async def add_event_list(
request: Request,
game: str,
file: bytes = File(...),
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""添加对应游戏事件选择映射"""
dfs = pd.read_excel(file, engine='openpyxl', sheet_name=None)
for attr_name, df in dfs.items():
        # skip empty sheets; the first two columns are the event id and its display name
if len(df) > 0:
ColNames = df.columns.tolist()
event = df.to_dict('records')
details = []
for i in event:
details_dict = {}
details_dict['event'] = i[ColNames[0]]
details_dict['event_name'] = i[ColNames[1]]
details.append(details_dict)
data_in = schemas.Event_list(game=game, details=details)
await crud.event_list.save(db, data_in)
return schemas.Msg(code=0, msg='ok', data=1)
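# --- illustrative sketch (not called by the endpoints) -------------------------
# Hedged example of the mapping built in add_event_list above: the first two
# columns of each sheet (event id, display name) become a list of
# {'event', 'event_name'} dicts. `sheet_to_details` is a hypothetical name;
# `pd` is the module-level pandas import.
def sheet_to_details(df):
    cols = df.columns.tolist()
    return [{'event': row[cols[0]], 'event_name': row[cols[1]]}
            for row in df.to_dict('records')]
# Example: sheet_to_details(pd.DataFrame({'event': ['pay'], 'name': ['充值']}))
# -> [{'event': 'pay', 'event_name': '充值'}]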