1.修改底层生成SQL代码

2.新增游戏埋点,并为其配置新的表
This commit is contained in:
李伟 2022-07-07 09:59:24 +08:00
parent 7aaed98fdc
commit 317aa592a3
11 changed files with 494 additions and 52 deletions

View File

@ -394,8 +394,9 @@ async def event_model(
groups = [] groups = []
for gitem in item['groups']: for gitem in item['groups']:
gb = [] gb = []
if '(' in gitem: if '(' in gitem or '[' in gitem:
gitem = gitem.strip('(').strip(')').replace(' ', '').replace("'", '') gitem = gitem.replace('(', '').replace(')', '').replace(' ', '').replace("'", '') \
.replace('[', '').replace(']', '')
if isinstance(gitem, list): if isinstance(gitem, list):
true_list = gitem true_list = gitem
else: else:
@ -441,6 +442,7 @@ async def retention_model(request: Request,
current_user: schemas.UserDB = Depends(deps.get_current_user) current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg: ) -> schemas.Msg:
await analysis.init(data_where=current_user.data_where) await analysis.init(data_where=current_user.data_where)
"""留存分析模型"""
try: try:
res = await analysis.retention_model_sql2() # 初始化开始时间结束时间sql语句 字典 res = await analysis.retention_model_sql2() # 初始化开始时间结束时间sql语句 字典
except Exception as e: except Exception as e:
@ -456,6 +458,7 @@ async def retention_model(request: Request,
filter_item_type = res['filter_item_type'] # all filter_item_type = res['filter_item_type'] # all
filter_item = res['filter_item'] # 列表 0,1,3,7,14,21,30 filter_item = res['filter_item'] # 列表 0,1,3,7,14,21,30
df.set_index('reg_date', inplace=True) df.set_index('reg_date', inplace=True)
# 补齐没有数据的日期
for d in set(res['date_range']) - set(df.index): for d in set(res['date_range']) - set(df.index):
df.loc[d] = 0 df.loc[d] = 0
@ -498,6 +501,140 @@ async def retention_model(request: Request,
retention_avg_dict[rn]['cntn'] += rd[f'cnt{rn}'] retention_avg_dict[rn]['cntn'] += rd[f'cnt{rn}']
retention_avg_dict[rn]['o_cnt0'] += rd['cnt0'] retention_avg_dict[rn]['o_cnt0'] += rd['cnt0']
retention_avg_dict[rn]['o_cntn'] += rd[f'on{rn}'] retention_avg_dict[rn]['o_cntn'] += rd[f'on{rn}']
# 算均值
tmp['p'] = []
tmp['n'] = []
tmp['p_outflow'] = []
tmp['n_outflow'] = []
tmp['d0'] = 0
for rt, rd in retention_avg_dict.items():
tmp['d0'] = int(df['cnt0'].sum())
n = round(rd['cntn'] * 100 / rd['cnt0'], 2)
n = 0 if np.isnan(n) else n
tmp['p'].append(n)
tmp['n'].append(rd['cntn'])
n = round(rd['o_cntn'] * 100 / rd['cnt0'], 2)
n = 0 if np.isnan(n) else n
tmp['p_outflow'].append(n)
tmp['n_outflow'].append(rd['o_cntn'])
# 次留数
title = ['日期', '用户数', '次留', *[f'{i + 1}' for i in retention_n[1:]]]
# 未到达的日期需要补齐-
retention_length = len(retention_n)
for _, items in summary_values.items():
for key in ['p', 'n', 'p_outflow', 'n_outflow']:
items[key].extend(['-'] * (retention_length - len(items[key])))
resp = {
'summary_values': summary_values,
# 'values': values,
'date_range': [d.strftime('%Y-%m-%d') for d in date_range],
'title': title,
'filter_item_type': filter_item_type,
'filter_item': filter_item,
'start_date': res['start_date'],
'end_date': res['end_date'],
'time_particle': res['time_particle']
}
return schemas.Msg(code=0, msg='ok', data=resp)
@router.post("/retention_model_details")
async def retention_model(request: Request,
game: str,
ckdb: CKDrive = Depends(get_ck_db),
db: AsyncIOMotorDatabase = Depends(get_database),
analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
await analysis.init(data_where=current_user.data_where)
"""留存分析分组详情"""
try:
res = await analysis.retention_model_sql3() # 初始化开始时间结束时间sql语句 字典
except Exception as e:
return schemas.Msg(code=-9, msg='报表配置参数异常')
sql = res['sql'] # 获取到sql语句
df = await ckdb.query_dataframe(sql)
if df.empty:
return schemas.Msg(code=-9, msg='无数据', data=None)
date_range = res['date_range'] # 时间 列表
# unit_num = res['unit_num'] # int
retention_n = res['retention_n'] # 列表 int
filter_item_type = res['filter_item_type'] # all
filter_item = res['filter_item'] # 列表 0,1,3,7,14,21,30
# 映射对应中文返回给前端展示
groupby_list=analysis.event_view.get('groupBy')
groupby = [i['columnName'] for i in groupby_list if i['tableType'] != 'user_label']
if len(groupby_list) == 1:
max_v = int(df[groupby_list[0]['columnName']].max())
min_v = int(df[groupby_list[0]['columnName']].min())
for i in groupby:
if i == 'svrindex':
if game == 'mfmh5':
game = 'mzmfmh5'
chinese = {}
resp = await crud.select_map.get_one(db, game, i)
for ii in resp:
chinese[ii['id']] = ii['title']
for k, v in chinese.items():
# 开始映射
df.loc[df['svrindex'] == k, 'svrindex'] = v
times=df['reg_date'][0]
df.set_index(groupby, inplace=True)
# for d in set(res['date_range']) - set(df.index):
# df.loc[d] = 0
df.sort_index(inplace=True)
summary_values = {'均值': {}}
max_retention_n = 1
# 留存人数
avg = {}
# 流失人数
avgo = {}
for date, v in df.T.items():
# 字典中data存在时不替换否则将data替换成空字典
tmp = summary_values.setdefault(date, dict())
tmp['d0'] = int(v.cnt0)
tmp['p'] = []
tmp['n'] = []
tmp['p_outflow'] = []
tmp['n_outflow'] = []
for i in retention_n:
n = (pd.Timestamp.now().date() - v[0]).days
if i > n:
continue
# max_retention_n = i if i > max_retention_n else max_retention_n
# 留存的人数
avg[i] = avg.setdefault(i, 0) + v[f'cnt{i}']
# 流失的人数
avgo[i] = avgo.setdefault(i, 0) + v[f'on{i}']
tmp['p'].append(v[f'p{i}'])
tmp['n'].append(v[f'cnt{i}'])
tmp['p_outflow'].append(v[f'op{i}'])
tmp['n_outflow'].append(v[f'on{i}'])
tmp = summary_values['均值']
retention_avg_dict = {}
# 多个分组项时,合成列表返回
if len(groupby)>1:
summary_valuess = {}
for k, v in summary_values.items():
if 'str' in str(type(k)):
summary_valuess[str([k])] = v
else:
summary_valuess[str(list(k))] = v
else:
summary_valuess=summary_values
for rn in retention_n:
for rt, rd in df.T.items():
if times + datetime.timedelta(days=rn) <= pd.datetime.now().date():
retention_avg_dict.setdefault(rn, {'cnt0': 0, 'cntn': 0, 'o_cnt0': 0, 'o_cntn': 0})
retention_avg_dict[rn]['cnt0'] += rd['cnt0']
retention_avg_dict[rn]['cntn'] += rd[f'cnt{rn}']
retention_avg_dict[rn]['o_cnt0'] += rd['cnt0']
retention_avg_dict[rn]['o_cntn'] += rd[f'on{rn}']
tmp['p'] = [] tmp['p'] = []
tmp['n'] = [] tmp['n'] = []
@ -514,18 +651,66 @@ async def retention_model(request: Request,
n = 0 if np.isnan(n) else n n = 0 if np.isnan(n) else n
tmp['p_outflow'].append(n) tmp['p_outflow'].append(n)
tmp['n_outflow'].append(rd['o_cntn']) tmp['n_outflow'].append(rd['o_cntn'])
# 如果分组项是int类型按选择的分组
# 次留数 # 默认区间
title = ['日期', '用户数', '次留', *[f'{i + 1}' for i in retention_n[1:]]] if analysis.event_view.get('groupBy')[0]['intervalType'] == 'def':
summary_valuess.pop('均值')
interval = (max_v - min_v) // 10 or 1
lens = len(summary_valuess[max_v]['n'])
ress = {}
for i in range(min_v, max_v, interval):
d0 = 0
n1 = []
n_outflow1 = []
for k, v in summary_valuess.items():
if k >= i and k < i + interval:
d0 += v['d0']
n1.append(v['n'])
n_outflow1.append(v['n_outflow'])
if len(n1) > 0:
re_dict = {}
n = np.sum([ii for ii in n1], axis=0).tolist()
n_outflow = np.sum([iii for iii in n_outflow1], axis=0).tolist()
p = [round(nu*100 / d0, 2) for nu in n]
p_outflow = [round(num*100 / d0, 2) for num in n_outflow]
re_dict['d0'] = d0
re_dict['n'] = n
re_dict['n_outflow'] = n_outflow
re_dict['p'] = p
re_dict['p_outflow'] = p_outflow
ress[f"[{i},{i + interval})"] = re_dict
else:
re_dict = {'d0': 0}
n = []
n_outflow = []
p = []
p_outflow = []
for cishu in range(0, lens):
n.append(0)
n_outflow.append(0)
p.append(0)
p_outflow.append(0)
re_dict['n'] = n
re_dict['n_outflow'] = n_outflow
re_dict['p'] = p
re_dict['p_outflow'] = p_outflow
ress[f"[{i},{i + interval})"] = re_dict
summary_valuess=ress
# 自定义区间
elif analysis.event_view.get('groupBy')[0]['intervalType'] == 'user_defined':
pass
# 次留数
title = ['分组项', '用户数', '次留', *[f'{i + 1}' for i in retention_n[1:]]]
# 未到达的日期需要补齐- # 未到达的日期需要补齐-
retention_length = len(retention_n) retention_length = len(retention_n)
for _, items in summary_values.items(): for _, items in summary_valuess.items():
for key in ['p', 'n', 'p_outflow', 'n_outflow']: for key in ['p', 'n', 'p_outflow', 'n_outflow']:
items[key].extend(['-'] * (retention_length - len(items[key]))) items[key].extend(['-'] * (retention_length - len(items[key])))
resp = { resp = {
'summary_values': summary_values, 'summary_values': summary_valuess,
# 'values': values, # 'values': values,
'date_range': [d.strftime('%Y-%m-%d') for d in date_range], 'date_range': [d.strftime('%Y-%m-%d') for d in date_range],
'title': title, 'title': title,
@ -832,7 +1017,7 @@ async def funnel_model(
_ = date_data.setdefault(key.strftime('%Y-%m-%d'), {}) _ = date_data.setdefault(key.strftime('%Y-%m-%d'), {})
_['总体'] = tmp _['总体'] = tmp
# 分组
if groupby: if groupby:
# 补齐数据 # 补齐数据
concat_data = [] concat_data = []
@ -843,7 +1028,18 @@ async def funnel_model(
df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)]) df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)])
# df.sort_values(list((*groupby, 'level')), inplace=True, ascending=False) # df.sort_values(list((*groupby, 'level')), inplace=True, ascending=False)
# 映射对应中文返回给前端展示
for i in groupby:
if i == 'svrindex':
if game == 'mfmh5':
game = 'mzmfmh5'
chinese = {}
resp = await crud.select_map.get_one(db, game, i)
for ii in resp:
chinese[ii['id']] = ii['title']
for k, v in chinese.items():
# 开始映射
df.loc[df[i] == k, i] = v
for key, tmp_df in df.groupby(groupby): for key, tmp_df in df.groupby(groupby):
tmp = {'title': key} tmp = {'title': key}
tmp_df = tmp_df.groupby('level').sum() tmp_df = tmp_df.groupby('level').sum()
@ -1247,18 +1443,19 @@ async def scatter_model(
#按天分组 #按天分组
sql = sql.replace(f'toDate(addHours({game}.event."#event_time", 8)) AS date', f'`{columnName}` as va', sql = sql.replace(f'toDate(addHours({game}.event."#event_time", 8)) AS date', f'`{columnName}` as va',
1) 1)
sql = sql.replace(f'toDate(addHours({game}.event."#event_time", 8))', columnName, 1) sql = sql.replace(f'toDate(addHours({game}.event."#event_time", 8))', f'`{columnName}`', 1)
#按周分组 #按周分组
sql = sql.replace(f'toStartOfWeek(addHours({game}.event."#event_time", 8)) AS date', f'`{columnName}` as va', sql = sql.replace(f'toStartOfWeek(addHours({game}.event."#event_time", 8)) AS date', f'`{columnName}` as va',
1) 1)
sql = sql.replace(f'toStartOfWeek(addHours({game}.event."#event_time", 8))', columnName, 1) sql = sql.replace(f'toStartOfWeek(addHours({game}.event."#event_time", 8))', f'`{columnName}`', 1)
#按月分组 #按月分组
sql = sql.replace(f'toStartOfMonth(addHours({game}.event."#event_time", 8)) AS date', f'`{columnName}` as va', sql = sql.replace(f'toStartOfMonth(addHours({game}.event."#event_time", 8)) AS date', f'`{columnName}` as va',
1) 1)
sql = sql.replace(f'toStartOfMonth(addHours({game}.event."#event_time", 8))', columnName, 1) sql = sql.replace(f'toStartOfMonth(addHours({game}.event."#event_time", 8))', f'`{columnName}`', 1)
#合计 #合计
if analysis.event_view.get('timeParticleSize') == "total": if analysis.event_view.get('timeParticleSize') == "total":
sql = sql.replace(f'SELECT', f'SELECT {columnName} as va,', 1) sql = sql.replace(f'SELECT', f'SELECT `{columnName}` as va,', 1)
print(sql)
df = await ckdb.query_dataframe(sql) df = await ckdb.query_dataframe(sql)
if df.empty: if df.empty:
return schemas.Msg(code=-9, msg='无数据', data=None) return schemas.Msg(code=-9, msg='无数据', data=None)

View File

@ -399,21 +399,21 @@ class Settings(BaseSettings):
case_sensitive = True case_sensitive = True
class Debug(Settings):
MDB_HOST: str = '10.0.0.9'
MDB_PORT: int = 27017
MDB_USER: str = 'root'
MDB_PASSWORD: str = 'iamciniao'
MDB_DB: str = 'xdata'
DATABASE_URI = f'mongodb://{MDB_USER}:{MDB_PASSWORD}@{MDB_HOST}:{MDB_PORT}/admin'
#本地MongoDB的库测试
# class Debug(Settings): # class Debug(Settings):
# MDB_HOST: str = '127.0.0.1' # MDB_HOST: str = '10.0.0.9'
# MDB_PORT: int = 27017 # MDB_PORT: int = 27017
# MDB_USER: str = 'root'
# MDB_PASSWORD: str = 'iamciniao'
# MDB_DB: str = 'xdata' # MDB_DB: str = 'xdata'
# #
# DATABASE_URI = f'mongodb://{MDB_HOST}:{MDB_PORT}/admin' # DATABASE_URI = f'mongodb://{MDB_USER}:{MDB_PASSWORD}@{MDB_HOST}:{MDB_PORT}/admin'
#本地MongoDB的库测试
class Debug(Settings):
MDB_HOST: str = '127.0.0.1'
MDB_PORT: int = 27017
MDB_DB: str = 'xdata'
DATABASE_URI = f'mongodb://{MDB_HOST}:{MDB_PORT}/admin'
class Produce(Settings): class Produce(Settings):
MDB_HOST: str = '127.0.0.1' MDB_HOST: str = '127.0.0.1'

View File

@ -21,3 +21,4 @@ from .crud_url_list import url_list
from .crud_user_url import user_url from .crud_user_url import user_url
from .crud_api_module import api_module from .crud_api_module import api_module
from .crud_event_list import event_list from .crud_event_list import event_list
from .crud_event_point import event_point

29
crud/crud_event_point.py Normal file
View File

@ -0,0 +1,29 @@
from motor.motor_asyncio import AsyncIOMotorDatabase
import schemas
from crud.base import CRUDBase
__all__ = 'event_point',
from utils import get_uid
class CRUDProjectNumber(CRUDBase):
# 获取对应游戏的数据,默认把基础属性的数据也获取出来
async def all_event(self, db: AsyncIOMotorDatabase, game):
return await self.find_many(db, {'game': {'$in':[game,'basics_attr']}})
# # 修改数据
# async def update(self, db: AsyncIOMotorDatabase, data_in: schemas.AddProjectnumber):
# game = data_in.game
# add_ditch = []
# for member in data_in.ditch:
# add_ditch.append(member.dict())
# await self.update_one(db, {'game': game}, {'$set': {'ditch': add_ditch}})
#
# # 插入数据
# async def create(self, db: AsyncIOMotorDatabase, data_in: schemas.ProjectnumberInsert):
# # await self.update_one(db, {'xiangmu': data_in.xiangmu}, {'$set': data_in.dict()}, upsert=True)
# await self.update_one(db, {data_in.game, data_in.ditch}, upsert=True)
event_point = CRUDProjectNumber('event_point')

View File

@ -162,5 +162,5 @@ async def add_process_time_header(request: Request, call_next):
if __name__ == '__main__': if __name__ == '__main__':
#uvicorn.run(app='main:app', host="10.0.0.240", port=7899, reload=True, debug=True) uvicorn.run(app='main:app', host="10.0.0.240", port=7899, reload=True, debug=True)
uvicorn.run(app='main:app', host="0.0.0.0", port=7899, reload=True, debug=True) #uvicorn.run(app='main:app', host="0.0.0.0", port=7899, reload=True, debug=True)

View File

@ -19,7 +19,8 @@ from db import get_database
from db.redisdb import get_redis_pool, RedisDrive from db.redisdb import get_redis_pool, RedisDrive
from models.user_label import UserClusterDef from models.user_label import UserClusterDef
from utils import get_week, strptime, start_end_month, strptime1 from utils import get_week, strptime, start_end_month, strptime1, get_event
class CombinationEvent: class CombinationEvent:
def __init__(self, data, string, format): def __init__(self, data, string, format):
@ -255,7 +256,7 @@ class BehaviorAnalysis:
:param relation: :param relation:
:return: :return:
""" """
# 事件,留存分布漏斗分析生成sql经过
user_filters = [] user_filters = []
event_filters = [] event_filters = []
for filter in filters: for filter in filters:
@ -281,9 +282,56 @@ class BehaviorAnalysis:
continue continue
else: else:
continue continue
# 表名
tbl = getattr(self, f'{item["tableType"]}_tbl') tbl = getattr(self, f'{item["tableType"]}_tbl')
# 字段名
col = getattr(tbl.c, item['columnName']) col = getattr(tbl.c, item['columnName'])
# 判断是否是同一个事件
yuan_event=self.events[0].get('eventName') or self.events[0].get('event_name') # 指标中的事件名
biao_event=self.events[0].get('customEvent','').split('.')[0]
event= await get_event(item['columnName'],self.game) # 获取对应事件名或基础属性
if event != yuan_event and event != biao_event and event != '基础属性' and self.game != 'debug':
event_time_col = getattr(self.event_tbl.c, '#event_time')
event_name_col = getattr(self.event_tbl.c, '#event_name')
base_where = [
func.addHours(event_time_col, self.zone_time) >= self.start_date,
func.addHours(event_time_col, self.zone_time) <= self.end_date,]
event_name_where = []
event_name_where.append(settings.CK_CALC_SYMBO['=='](event_name_col, event))
ftv = item['ftv']
if comparator == '==':
if len(ftv) > 1:
event_name_where.append(or_(*[col == v for v in ftv]))
else:
event_name_where.append(col == ftv[0])
elif comparator == '>=':
event_name_where.append(col >= ftv[0])
elif comparator == '<=':
event_name_where.append(col <= ftv[0])
elif comparator == '>':
event_name_where.append(col > ftv[0])
elif comparator == '<':
event_name_where.append(col < ftv[0])
elif comparator == 'range':
event_name_where.append(col > ftv[0])
event_name_where.append(col <= ftv[1])
elif comparator == 'is not null':
event_name_where.append(col.isnot(None))
elif comparator == 'is null':
event_name_where.append(col.is_(None))
elif comparator == 'like':
event_name_where.append(col.like(f'%{ftv[0]}%'))
elif comparator == 'not like':
event_name_where.append(col.notlike(f'%{ftv[0]}%'))
elif comparator == 'in':
event_name_where.append(col.in_(ftv))
elif comparator == '!=':
event_name_where.append(col != ftv[0])
sub_qry=sa.select(sa.Column('#account_id')).select_from(
sa.select(sa.Column('#account_id')).where(and_(*base_where,*event_name_where))
)
event_filter.append(sa.Column('#account_id').in_(sub_qry))
continue
# 日期类型处理时区 # 日期类型处理时区
if item.get('data_type') == 'datetime': if item.get('data_type') == 'datetime':
col = func.addHours(col, self.zone_time) col = func.addHours(col, self.zone_time)
@ -404,6 +452,7 @@ class BehaviorAnalysis:
} }
async def event_model_sql(self): async def event_model_sql(self):
"""事件分析生成sql会经过"""
sqls = [] sqls = []
event_time_col = getattr(self.event_tbl.c, '#event_time') event_time_col = getattr(self.event_tbl.c, '#event_time')
for event in self.events: for event in self.events:
@ -776,19 +825,29 @@ ORDER BY level
} }
async def trace_model_sql(self): async def trace_model_sql(self):
# 路径分析生成SQL
session_interval = self.event_view.get('session_interval') session_interval = self.event_view.get('session_interval')
session_type = self.event_view.get('session_type') session_type = self.event_view.get('session_type')
session_type_map = { session_type_map = {
'minute': 60, 'minute': 60,
'second': 1, 'second': 1,
'hour': 3600 'hour': 3600
} }
interval_ts = session_interval * session_type_map.get(session_type, 60) interval_ts = session_interval * session_type_map.get(session_type, 60)
event_names = self.events.get('event_names') event_names = self.events.get('event_names')
source_event = self.events.get('source_event', {}).get('eventName') source_event = self.events.get('source_event', {}).get('eventName')
source_type = self.events.get('source_event', {}).get('source_type') source_type = self.events.get('source_event', {}).get('source_type')
wheres = self.events['user_filter']['filts']
sql_one=''
if wheres != []: # 有筛选条件的时候使用
columnName=wheres[0]['columnName'] # 获取字段名
event=await get_event(columnName,self.game) # 获取字段属于那个事件,或者是否是基础属性
if event == '基础属性':
sql_one=f""" and `{columnName}` {wheres[0]['comparator']} '{wheres[0]['ftv'][0]}' and `#event_name` in evnet_all) """
else:# 如果包含有事件则进入下面的逻辑
sql_one=f"""and `#account_id` in ( SELECT `#account_id` FROM {self.game}.event WHERE `#event_name`= '{event}' and `{columnName}` =
'{wheres[0]['ftv'][0]}' and addHours(`#event_time`, {self.zone_time}) >= start_data
and addHours(`#event_time`, {self.zone_time}) <= end_data ) and `#event_name` in evnet_all)"""
sql_a = f"""with sql_a = f"""with
'{source_event}' as start_event, '{source_event}' as start_event,
{tuple(event_names)} as evnet_all, {tuple(event_names)} as evnet_all,
@ -879,6 +938,8 @@ GROUP BY event_chain,`#account_id`
ORDER BY values desc""" ORDER BY values desc"""
sql = sql_a if source_type == 'initial_event' else sql_b sql = sql_a if source_type == 'initial_event' else sql_b
if sql_one != '':
sql= sql.replace('and `#event_name` in evnet_all)',sql_one,1)
print(sql) print(sql)
return { return {
'sql': sql, 'sql': sql,
@ -888,10 +949,11 @@ ORDER BY values desc"""
} }
async def retention_model_sql2(self): async def retention_model_sql2(self):
# 留存分析生成SQL
filter_item_type = self.event_view.get('filter_item_type') filter_item_type = self.event_view.get('filter_item_type')
filter_item = self.event_view.get('filter_item') filter_item = self.event_view.get('filter_item')
event_name_a = self.events[0]['eventName'] event_name_a = self.events[0]['eventName'] # 初始的事件名
event_name_b = self.events[1]['eventName'] event_name_b = self.events[1]['eventName'] # 回访的事件名
visit_name = self.events[0].get('event_attr_id') visit_name = self.events[0].get('event_attr_id')
@ -973,3 +1035,107 @@ group by a.reg_date) log on reg.date=log.reg_date
'start_date': self.start_date[:10], 'start_date': self.start_date[:10],
'end_date': self.end_date[:10], 'end_date': self.end_date[:10],
} }
async def retention_model_sql3(self):
# 留存分析分组详情生成SQL
filter_item_type = self.event_view.get('filter_item_type')
filter_item = self.event_view.get('filter_item')
event_name_a = self.events[0]['eventName'] # 初始的事件名
event_name_b = self.events[1]['eventName'] # 回访的事件名
groupby_list=self.event_view.get('groupBy')
# 判断是基础的还是标签
groupby = [i['columnName'] for i in groupby_list if i['tableType'] != 'user_label']
visit_name = self.events[0].get('event_attr_id')
where, _ = await self.handler_filts((self.events[0]['filts'], self.events[0].get('relation', 'and')),
(self.global_filters, self.global_relation)
, self.ext_filters)
where_a = '1'
if where:
qry = sa.select().where(*where)
sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
where_a = 'WHERE '.join(sql.split('WHERE ')[1:])
# 任意事件
event_name_b = 1 if event_name_b == '*' else f"`#event_name` = '{event_name_b}'"
days = (arrow.get(self.end_date).date() - arrow.get(self.start_date).date()).days
keep = []
cnt = []
retention_n = [*[k for k in range(1, 60)], 70 - 1, 75 - 1, 80 - 1, 85 - 1, 90 - 1, 95 - 1, 100 - 1, 110 - 1,
120 - 1, 150 - 1, 180 - 1, 210 - 1, 240 - 1, 270 - 1, 300 - 1,
360 - 1]
"""
cnt0-cnt1 as on1,
round(on1 * 100 / cnt0, 2) as `0p1`,
"""
for i in retention_n:
keep.append(
f"""cnt{i},
round(cnt{i} * 100 / cnt0, 2) as `p{i}`,
cnt0-cnt{i} as on{i},
round(on{i} * 100 / cnt0, 2) as `op{i}`
""")
cnt.append(f"""sum(if(dateDiff('day',a.reg_date,b.visit_date)={i},1,0)) as cnt{i}""")
keep_str = ','.join(keep)
cnt_str = ','.join(cnt)
if len(groupby) > 0:
groupbys = ','.join([f"`{i}`" for i in groupby])
groupby_on = ' and '.join([f"reg.{ii} = log.{ii}" for ii in [f"`{i}`" for i in groupby]])
sql=f"""
with '{event_name_a}' as start_event,
{event_name_b} as retuen_visit,
`{visit_name}` as visit,
'{self.start_date}' as start_data,
'{self.end_date}' as end_data,
toDate(addHours(`#event_time`, {self.zone_time})) as date
select reg_date,
{groupbys},
cnt0 ,
{keep_str}
from(select date,{groupbys},uniqExact(visit) as cnt0 from {self.game}.event
where `#event_name` = start_event and addHours(`#event_time`, {self.zone_time}) >= start_data and addHours(`#event_time`, {self.zone_time}) <= end_data and {where_a}
group by date,{groupbys}) reg left join
(select a.reg_date,{groupbys},
{cnt_str}
from (select {groupbys},date as reg_date, visit from {self.game}.event where `#event_name` = start_event and addHours(`#event_time`, {self.zone_time}) >= start_data and addHours(`#event_time`, {self.zone_time}) <= end_data and {where_a} group by reg_date, visit,{groupbys}) a
left join (select date as visit_date, visit from {self.game}.event where retuen_visit and addHours(`#event_time`, {self.zone_time}) >= start_data group by visit_date, visit) b on
a.visit = b.visit
group by a.reg_date,{groupbys}) log on reg.date=log.reg_date and {groupby_on}
"""
else:
sql = f"""
with '{event_name_a}' as start_event,
{event_name_b} as retuen_visit,
`{visit_name}` as visit,
'{self.start_date}' as start_data,
'{self.end_date}' as end_data,
toDate(addHours(`#event_time`, {self.zone_time})) as date
select reg_date,
cnt0 ,
{keep_str}
from(select date, uniqExact(visit) as cnt0 from {self.game}.event
where `#event_name` = start_event and addHours(`#event_time`, {self.zone_time}) >= start_data and addHours(`#event_time`, {self.zone_time}) <= end_data and {where_a}
group by date) reg left join
(select a.reg_date,
{cnt_str}
from (select date as reg_date, visit from {self.game}.event where `#event_name` = start_event and addHours(`#event_time`, {self.zone_time}) >= start_data and addHours(`#event_time`, {self.zone_time}) <= end_data and {where_a} group by reg_date, visit) a
left join (select date as visit_date, visit from {self.game}.event where retuen_visit and addHours(`#event_time`, {self.zone_time}) >= start_data group by visit_date, visit) b on
a.visit = b.visit
group by a.reg_date) log on reg.date=log.reg_date
"""
print(sql)
return {
'sql': sql,
'group_label': self.group_label,
'date_range': self.date_range,
'unit_num': self.unit_num,
'retention_n': retention_n,
'filter_item_type': filter_item_type,
'filter_item': filter_item,
'time_particle': self.time_particle,
'start_date': self.start_date[:10],
'end_date': self.end_date[:10],
}

View File

@ -43,7 +43,7 @@ class UserClusterDef:
res_json = await self.rdb.get(f'{self.game}_user') res_json = await self.rdb.get(f'{self.game}_user')
columns = json.loads(res_json).keys() columns = json.loads(res_json).keys()
metadata = sa.MetaData(schema=self.game) metadata = sa.MetaData(schema=self.game)
#self.user_tbl = sa.Table('user_view', metadata, *[sa.Column(column) for column in columns])修改了这里,这是原本的 # self.user_tbl = sa.Table('user_view', metadata, *[sa.Column(column) for column in columns]) # 修改了这里,这是原本的
self.user_tbl = sa.Table('event', metadata, *[sa.Column(column) for column in columns]) self.user_tbl = sa.Table('event', metadata, *[sa.Column(column) for column in columns])
self.u_account_id_col = getattr(self.user_tbl.c, '#account_id') self.u_account_id_col = getattr(self.user_tbl.c, '#account_id')
self.e_account_id_col = getattr(self.event_tbl.c, '#account_id') self.e_account_id_col = getattr(self.event_tbl.c, '#account_id')

View File

@ -15,6 +15,8 @@ import schemas
from core.config import settings from core.config import settings
from db import get_database from db import get_database
from db.redisdb import get_redis_pool, RedisDrive from db.redisdb import get_redis_pool, RedisDrive
from models.user_label import UserClusterDef
from utils import get_event
class XAnalysis: class XAnalysis:
@ -30,7 +32,11 @@ class XAnalysis:
self.date_range = [] self.date_range = []
self.ext_filters = (self.data_in.ext_filter.get('filts', []), self.data_in.ext_filter.get('relation', 'and')) self.ext_filters = (self.data_in.ext_filter.get('filts', []), self.data_in.ext_filter.get('relation', 'and'))
self.start_date = None
self.end_date = None
self.data_where=[]
self.event_tbl=None
self.zone_time:int = 0
def _get_global_filters(self): def _get_global_filters(self):
return self.event_view.get('filts') or [] #获取event_view字典里面filts的值或返回空列表 return self.event_view.get('filts') or [] #获取event_view字典里面filts的值或返回空列表
@ -124,8 +130,8 @@ class XAnalysis:
return event_filters return event_filters
def ltv_model_sql(self): def ltv_model_sql(self):
# ltv的生成sql
days = (arrow.get(self.event_view['endTime']).date() - arrow.get(self.event_view['startTime']).date()).days days = (arrow.get(self.event_view['endTime']).date() - arrow.get(self.event_view['startTime']).date()).days
quota = self.event_view['quota'] quota = self.event_view['quota']
select_ltv = [] select_ltv = []

View File

@ -25,3 +25,4 @@ from .url_list import *
from .user_url import * from .user_url import *
from .api_module import * from .api_module import *
from .event_list import * from .event_list import *
from .event_point import *

8
schemas/event_point.py Normal file
View File

@ -0,0 +1,8 @@
from pydantic import BaseModel
from typing import List
class Eventpoint(BaseModel):
game: str # 游戏名
event_name: str # 事件名
event_attr: List[str] # 事件属性

View File

@ -1,4 +1,3 @@
import random import random
import time import time
import datetime import datetime
@ -7,6 +6,12 @@ import pandas as pd
from datetime import timedelta from datetime import timedelta
from datetime import datetime as p1 from datetime import datetime as p1
import calendar import calendar
import crud
import schemas
from db import get_database
def get_uid(): def get_uid():
return hex(int(time.time() * 10 ** 7) + random.randint(0, 10000))[2:] return hex(int(time.time() * 10 ** 7) + random.randint(0, 10000))[2:]
@ -27,7 +32,9 @@ def estimate_data(data_type):
return "Nullable(DateTime('UTC'))" return "Nullable(DateTime('UTC'))"
else: else:
return "Nullable(String)" return "Nullable(String)"
#将字典变成字符串
# 将字典变成字符串
def dict_to_str(dic): def dict_to_str(dic):
c = str() c = str()
b = 0 b = 0
@ -41,18 +48,21 @@ def dict_to_str(dic):
c += "\"%s\":\"%s\"}" % (k, v) c += "\"%s\":\"%s\"}" % (k, v)
return c return c
def getEveryDay(begin_date,end_date):
def getEveryDay(begin_date, end_date):
# 前闭后闭 # 前闭后闭
date_list = [] date_list = []
begin_date = datetime.datetime.strptime(begin_date, "%Y-%m-%d") begin_date = datetime.datetime.strptime(begin_date, "%Y-%m-%d")
end_date = datetime.datetime.strptime(end_date,"%Y-%m-%d") end_date = datetime.datetime.strptime(end_date, "%Y-%m-%d")
while begin_date <= end_date: while begin_date <= end_date:
date_str = begin_date.strftime("%Y-%m-%d") date_str = begin_date.strftime("%Y-%m-%d")
date_list.append(date_str) date_list.append(date_str)
begin_date += datetime.timedelta(days=1) begin_date += datetime.timedelta(days=1)
return date_list return date_list
#print(getEveryDay('2016-01-01','2017-05-11'))
def Download_xlsx(df,name):
# print(getEveryDay('2016-01-01','2017-05-11'))
def Download_xlsx(df, name):
""" """
下载功能 下载功能
name为文件名 name为文件名
@ -61,14 +71,15 @@ def Download_xlsx(df,name):
import mimetypes import mimetypes
from utils import DfToStream from utils import DfToStream
from fastapi.responses import StreamingResponse from fastapi.responses import StreamingResponse
file_name=quote(f'{name}.xlsx') file_name = quote(f'{name}.xlsx')
mime = mimetypes.guess_type(file_name)[0] mime = mimetypes.guess_type(file_name)[0]
df_to_stream = DfToStream((df, name)) df_to_stream = DfToStream((df, name))
with df_to_stream as d: with df_to_stream as d:
export = d.to_stream() export = d.to_stream()
Download=StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'}) Download = StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
return Download return Download
def jiange_insert(list_date): def jiange_insert(list_date):
""" """
间隔1条插入一条数据插入数据 间隔1条插入一条数据插入数据
@ -81,6 +92,7 @@ def jiange_insert(list_date):
i += 2 i += 2
return list_date return list_date
def create_df(resp): def create_df(resp):
""" """
分布分析外部下载功能的df数据 分布分析外部下载功能的df数据
@ -118,7 +130,9 @@ def create_df(resp):
columns.insert(0, '事件发生时间') columns.insert(0, '事件发生时间')
df = pd.DataFrame(data=date, columns=columns) df = pd.DataFrame(data=date, columns=columns)
return df return df
def create_neidf(resp,columnName):
def create_neidf(resp, columnName):
""" """
分布分析内部下载功能的df数据 分布分析内部下载功能的df数据
""" """
@ -156,19 +170,22 @@ def create_neidf(resp,columnName):
df = pd.DataFrame(data=date, columns=columns) df = pd.DataFrame(data=date, columns=columns)
return df return df
def get_week(date_str=None): def get_week(date_str=None):
if date_str and isinstance(date_str, str): if date_str and isinstance(date_str, str):
now_time = (p1.strptime(date_str + " 00:00:00", "%Y-%m-%d %H:%M:%S")+ datetime.timedelta(days=1)).strftime("%Y-%m-%d %H:%M:%S") now_time = (p1.strptime(date_str + " 00:00:00", "%Y-%m-%d %H:%M:%S") + datetime.timedelta(days=1)).strftime(
"%Y-%m-%d %H:%M:%S")
else: else:
now_time = p1.now().replace(hour=0, minute=0, second=0, microsecond=0) now_time = p1.now().replace(hour=0, minute=0, second=0, microsecond=0)
now_time=strptime(now_time) now_time = strptime(now_time)
# 当前日期所在周的周一 # 当前日期所在周的周一
week_start_time = now_time - timedelta(days=now_time.weekday()+1, hours=now_time.hour, minutes=now_time.minute, week_start_time = now_time - timedelta(days=now_time.weekday() + 1, hours=now_time.hour, minutes=now_time.minute,
seconds=now_time.second) seconds=now_time.second)
# 当前日期所在周的周日 # 当前日期所在周的周日
week_end_time = week_start_time + timedelta(days=6, hours=23, minutes=59, seconds=59) week_end_time = week_start_time + timedelta(days=6, hours=23, minutes=59, seconds=59)
return week_start_time, week_end_time return week_start_time, week_end_time
def strptime(date_string): def strptime(date_string):
""" """
将字符串转换成datetime.datetime类型 将字符串转换成datetime.datetime类型
@ -177,6 +194,7 @@ def strptime(date_string):
""" """
return p1.strptime(date_string, '%Y-%m-%d %H:%M:%S') return p1.strptime(date_string, '%Y-%m-%d %H:%M:%S')
def strptime1(date_str): def strptime1(date_str):
""" """
将字符串转换成datetime.datetime类型 将字符串转换成datetime.datetime类型
@ -185,14 +203,30 @@ def strptime1(date_str):
""" """
return p1.strptime(date_str + " 00:00:00", "%Y-%m-%d %H:%M:%S") return p1.strptime(date_str + " 00:00:00", "%Y-%m-%d %H:%M:%S")
def start_end_month(time): def start_end_month(time):
""" """
获取某个月的起始时间和结束时间 获取某个月的起始时间和结束时间
:param time: '2022-05-29' :param time: '2022-05-29'
:return: :return:
""" """
now=p1.strptime(time + " 00:00:00", "%Y-%m-%d %H:%M:%S") now = p1.strptime(time + " 00:00:00", "%Y-%m-%d %H:%M:%S")
this_month_start = datetime.datetime(now.year, now.month, 1) this_month_start = datetime.datetime(now.year, now.month, 1)
this_month_end = datetime.datetime(now.year, now.month, calendar.monthrange(now.year, now.month)[1]) this_month_end = datetime.datetime(now.year, now.month, calendar.monthrange(now.year, now.month)[1])
this_month_end1=this_month_end+timedelta(hours=23, minutes=59, seconds=59) this_month_end1 = this_month_end + timedelta(hours=23, minutes=59, seconds=59)
return this_month_start,this_month_end1 return this_month_start, this_month_end1
async def get_event(attr, game):
    """
    Look up which event an attribute belongs to.

    :param attr: str, event attribute name
    :param game: str, game identifier
    :return: str, matching event name ('' when the attribute is unknown)
    """
    points = await crud.event_point.all_event(get_database(), game)
    # First event whose attribute list contains ``attr``; empty string otherwise.
    return next(
        (point['event_name'] for point in points if attr in point['event_attr']),
        '',
    )