diff --git a/api/api_v1/endpoints/query.py b/api/api_v1/endpoints/query.py index e26f421..1c51414 100644 --- a/api/api_v1/endpoints/query.py +++ b/api/api_v1/endpoints/query.py @@ -394,8 +394,9 @@ async def event_model( groups = [] for gitem in item['groups']: gb = [] - if '(' in gitem: - gitem = gitem.strip('(').strip(')').replace(' ', '').replace("'", '') + if '(' in gitem or '[' in gitem: + gitem = gitem.replace('(', '').replace(')', '').replace(' ', '').replace("'", '') \ + .replace('[', '').replace(']', '') if isinstance(gitem, list): true_list = gitem else: @@ -441,6 +442,7 @@ async def retention_model(request: Request, current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> schemas.Msg: await analysis.init(data_where=current_user.data_where) + """留存分析模型""" try: res = await analysis.retention_model_sql2() # 初始化开始时间结束时间,sql语句 字典 except Exception as e: @@ -456,6 +458,7 @@ async def retention_model(request: Request, filter_item_type = res['filter_item_type'] # all filter_item = res['filter_item'] # 列表 0,1,3,7,14,21,30 df.set_index('reg_date', inplace=True) + # 补齐没有数据的日期 for d in set(res['date_range']) - set(df.index): df.loc[d] = 0 @@ -498,6 +501,140 @@ async def retention_model(request: Request, retention_avg_dict[rn]['cntn'] += rd[f'cnt{rn}'] retention_avg_dict[rn]['o_cnt0'] += rd['cnt0'] retention_avg_dict[rn]['o_cntn'] += rd[f'on{rn}'] + # 算均值 + tmp['p'] = [] + tmp['n'] = [] + tmp['p_outflow'] = [] + tmp['n_outflow'] = [] + tmp['d0'] = 0 + for rt, rd in retention_avg_dict.items(): + tmp['d0'] = int(df['cnt0'].sum()) + n = round(rd['cntn'] * 100 / rd['cnt0'], 2) + n = 0 if np.isnan(n) else n + tmp['p'].append(n) + tmp['n'].append(rd['cntn']) + n = round(rd['o_cntn'] * 100 / rd['cnt0'], 2) + n = 0 if np.isnan(n) else n + tmp['p_outflow'].append(n) + tmp['n_outflow'].append(rd['o_cntn']) + # 次留数 + title = ['日期', '用户数', '次留', *[f'{i + 1}留' for i in retention_n[1:]]] + + # 未到达的日期需要补齐- + retention_length = len(retention_n) + for _, items in summary_values.items(): + for key in ['p', 'n', 'p_outflow', 'n_outflow']: + items[key].extend(['-'] * (retention_length - len(items[key]))) + + resp = { + 'summary_values': summary_values, + # 'values': values, + 'date_range': [d.strftime('%Y-%m-%d') for d in date_range], + 'title': title, + 'filter_item_type': filter_item_type, + 'filter_item': filter_item, + 'start_date': res['start_date'], + 'end_date': res['end_date'], + 'time_particle': res['time_particle'] + + } + return schemas.Msg(code=0, msg='ok', data=resp) + + +@router.post("/retention_model_details") +async def retention_model(request: Request, + game: str, + ckdb: CKDrive = Depends(get_ck_db), + db: AsyncIOMotorDatabase = Depends(get_database), + analysis: BehaviorAnalysis = Depends(BehaviorAnalysis), + current_user: schemas.UserDB = Depends(deps.get_current_user) + ) -> schemas.Msg: + await analysis.init(data_where=current_user.data_where) + """留存分析分组详情""" + try: + res = await analysis.retention_model_sql3() # 初始化开始时间结束时间,sql语句 字典 + except Exception as e: + return schemas.Msg(code=-9, msg='报表配置参数异常') + sql = res['sql'] # 获取到sql语句 + df = await ckdb.query_dataframe(sql) + if df.empty: + return schemas.Msg(code=-9, msg='无数据', data=None) + date_range = res['date_range'] # 时间 列表 + # unit_num = res['unit_num'] # int + retention_n = res['retention_n'] # 列表 int + filter_item_type = res['filter_item_type'] # all + filter_item = res['filter_item'] # 列表 0,1,3,7,14,21,30 + # 映射对应中文返回给前端展示 + groupby_list=analysis.event_view.get('groupBy') + groupby = [i['columnName'] for i in groupby_list if i['tableType'] != 'user_label'] + if len(groupby_list) == 1: + max_v = int(df[groupby_list[0]['columnName']].max()) + min_v = int(df[groupby_list[0]['columnName']].min()) + for i in groupby: + if i == 'svrindex': + if game == 'mfmh5': + game = 'mzmfmh5' + chinese = {} + resp = await crud.select_map.get_one(db, game, i) + for ii in resp: + chinese[ii['id']] = ii['title'] + for k, v in chinese.items(): + # 开始映射 + df.loc[df['svrindex'] == k, 'svrindex'] = v + times=df['reg_date'][0] + df.set_index(groupby, inplace=True) + # for d in set(res['date_range']) - set(df.index): + # df.loc[d] = 0 + + df.sort_index(inplace=True) + summary_values = {'均值': {}} + max_retention_n = 1 + # 留存人数 + avg = {} + # 流失人数 + avgo = {} + for date, v in df.T.items(): + # 字典中data存在时不替换,否则将data替换成空字典 + tmp = summary_values.setdefault(date, dict()) + tmp['d0'] = int(v.cnt0) + tmp['p'] = [] + tmp['n'] = [] + tmp['p_outflow'] = [] + tmp['n_outflow'] = [] + for i in retention_n: + n = (pd.Timestamp.now().date() - v[0]).days + if i > n: + continue + # max_retention_n = i if i > max_retention_n else max_retention_n + # 留存的人数 + avg[i] = avg.setdefault(i, 0) + v[f'cnt{i}'] + # 流失的人数 + avgo[i] = avgo.setdefault(i, 0) + v[f'on{i}'] + tmp['p'].append(v[f'p{i}']) + tmp['n'].append(v[f'cnt{i}']) + tmp['p_outflow'].append(v[f'op{i}']) + tmp['n_outflow'].append(v[f'on{i}']) + tmp = summary_values['均值'] + retention_avg_dict = {} + # 多个分组项时,合成列表返回 + if len(groupby)>1: + summary_valuess = {} + for k, v in summary_values.items(): + if 'str' in str(type(k)): + summary_valuess[str([k])] = v + else: + summary_valuess[str(list(k))] = v + else: + summary_valuess=summary_values + + for rn in retention_n: + for rt, rd in df.T.items(): + if times + datetime.timedelta(days=rn) <= pd.datetime.now().date(): + retention_avg_dict.setdefault(rn, {'cnt0': 0, 'cntn': 0, 'o_cnt0': 0, 'o_cntn': 0}) + retention_avg_dict[rn]['cnt0'] += rd['cnt0'] + retention_avg_dict[rn]['cntn'] += rd[f'cnt{rn}'] + retention_avg_dict[rn]['o_cnt0'] += rd['cnt0'] + retention_avg_dict[rn]['o_cntn'] += rd[f'on{rn}'] tmp['p'] = [] tmp['n'] = [] @@ -514,18 +651,66 @@ async def retention_model(request: Request, n = 0 if np.isnan(n) else n tmp['p_outflow'].append(n) tmp['n_outflow'].append(rd['o_cntn']) + # 如果分组项是int类型按选择的分组 - # 次留数 - title = ['日期', '用户数', '次留', *[f'{i + 1}留' for i in retention_n[1:]]] + # 默认区间 + if analysis.event_view.get('groupBy')[0]['intervalType'] == 'def': + summary_valuess.pop('均值') + interval = (max_v - min_v) // 10 or 1 + lens = len(summary_valuess[max_v]['n']) + ress = {} + for i in range(min_v, max_v, interval): + d0 = 0 + n1 = [] + n_outflow1 = [] + for k, v in summary_valuess.items(): + if k >= i and k < i + interval: + d0 += v['d0'] + n1.append(v['n']) + n_outflow1.append(v['n_outflow']) + if len(n1) > 0: + re_dict = {} + n = np.sum([ii for ii in n1], axis=0).tolist() + n_outflow = np.sum([iii for iii in n_outflow1], axis=0).tolist() + p = [round(nu*100 / d0, 2) for nu in n] + p_outflow = [round(num*100 / d0, 2) for num in n_outflow] + re_dict['d0'] = d0 + re_dict['n'] = n + re_dict['n_outflow'] = n_outflow + re_dict['p'] = p + re_dict['p_outflow'] = p_outflow + ress[f"[{i},{i + interval})"] = re_dict + else: + re_dict = {'d0': 0} + n = [] + n_outflow = [] + p = [] + p_outflow = [] + for cishu in range(0, lens): + n.append(0) + n_outflow.append(0) + p.append(0) + p_outflow.append(0) + re_dict['n'] = n + re_dict['n_outflow'] = n_outflow + re_dict['p'] = p + re_dict['p_outflow'] = p_outflow + ress[f"[{i},{i + interval})"] = re_dict + summary_valuess=ress + # 自定义区间 + elif analysis.event_view.get('groupBy')[0]['intervalType'] == 'user_defined': + pass + # 次留数 + title = ['分组项', '用户数', '次留', *[f'{i + 1}留' for i in retention_n[1:]]] # 未到达的日期需要补齐- retention_length = len(retention_n) - for _, items in summary_values.items(): + for _, items in summary_valuess.items(): for key in ['p', 'n', 'p_outflow', 'n_outflow']: items[key].extend(['-'] * (retention_length - len(items[key]))) resp = { - 'summary_values': summary_values, + 'summary_values': summary_valuess, # 'values': values, 'date_range': [d.strftime('%Y-%m-%d') for d in date_range], 'title': title, @@ -832,7 +1017,7 @@ async def funnel_model( _ = date_data.setdefault(key.strftime('%Y-%m-%d'), {}) _['总体'] = tmp - + # 分组 if groupby: # 补齐数据 concat_data = [] @@ -843,7 +1028,18 @@ async def funnel_model( df = pd.concat([df, pd.DataFrame(concat_data, columns=df.columns)]) # df.sort_values(list((*groupby, 'level')), inplace=True, ascending=False) - + # 映射对应中文返回给前端展示 + for i in groupby: + if i == 'svrindex': + if game == 'mfmh5': + game = 'mzmfmh5' + chinese = {} + resp = await crud.select_map.get_one(db, game, i) + for ii in resp: + chinese[ii['id']] = ii['title'] + for k, v in chinese.items(): + # 开始映射 + df.loc[df[i] == k, i] = v for key, tmp_df in df.groupby(groupby): tmp = {'title': key} tmp_df = tmp_df.groupby('level').sum() @@ -1247,18 +1443,19 @@ async def scatter_model( #按天分组 sql = sql.replace(f'toDate(addHours({game}.event."#event_time", 8)) AS date', f'`{columnName}` as va', 1) - sql = sql.replace(f'toDate(addHours({game}.event."#event_time", 8))', columnName, 1) + sql = sql.replace(f'toDate(addHours({game}.event."#event_time", 8))', f'`{columnName}`', 1) #按周分组 sql = sql.replace(f'toStartOfWeek(addHours({game}.event."#event_time", 8)) AS date', f'`{columnName}` as va', 1) - sql = sql.replace(f'toStartOfWeek(addHours({game}.event."#event_time", 8))', columnName, 1) + sql = sql.replace(f'toStartOfWeek(addHours({game}.event."#event_time", 8))', f'`{columnName}`', 1) #按月分组 sql = sql.replace(f'toStartOfMonth(addHours({game}.event."#event_time", 8)) AS date', f'`{columnName}` as va', 1) - sql = sql.replace(f'toStartOfMonth(addHours({game}.event."#event_time", 8))', columnName, 1) + sql = sql.replace(f'toStartOfMonth(addHours({game}.event."#event_time", 8))', f'`{columnName}`', 1) #合计 if analysis.event_view.get('timeParticleSize') == "total": - sql = sql.replace(f'SELECT', f'SELECT {columnName} as va,', 1) + sql = sql.replace(f'SELECT', f'SELECT `{columnName}` as va,', 1) + print(sql) df = await ckdb.query_dataframe(sql) if df.empty: return schemas.Msg(code=-9, msg='无数据', data=None) diff --git a/core/config.py b/core/config.py index c19f647..2350f41 100644 --- a/core/config.py +++ b/core/config.py @@ -399,21 +399,21 @@ class Settings(BaseSettings): case_sensitive = True -class Debug(Settings): - MDB_HOST: str = '10.0.0.9' - MDB_PORT: int = 27017 - MDB_USER: str = 'root' - MDB_PASSWORD: str = 'iamciniao' - MDB_DB: str = 'xdata' - - DATABASE_URI = f'mongodb://{MDB_USER}:{MDB_PASSWORD}@{MDB_HOST}:{MDB_PORT}/admin' -#本地MongoDB的库测试 # class Debug(Settings): -# MDB_HOST: str = '127.0.0.1' +# MDB_HOST: str = '10.0.0.9' # MDB_PORT: int = 27017 +# MDB_USER: str = 'root' +# MDB_PASSWORD: str = 'iamciniao' # MDB_DB: str = 'xdata' # -# DATABASE_URI = f'mongodb://{MDB_HOST}:{MDB_PORT}/admin' +# DATABASE_URI = f'mongodb://{MDB_USER}:{MDB_PASSWORD}@{MDB_HOST}:{MDB_PORT}/admin' +#本地MongoDB的库测试 +class Debug(Settings): + MDB_HOST: str = '127.0.0.1' + MDB_PORT: int = 27017 + MDB_DB: str = 'xdata' + + DATABASE_URI = f'mongodb://{MDB_HOST}:{MDB_PORT}/admin' class Produce(Settings): MDB_HOST: str = '127.0.0.1' diff --git a/crud/__init__.py b/crud/__init__.py index 8094eaa..07a6219 100644 --- a/crud/__init__.py +++ b/crud/__init__.py @@ -20,4 +20,5 @@ from .crud_api_board import api_board from .crud_url_list import url_list from .crud_user_url import user_url from .crud_api_module import api_module -from .crud_event_list import event_list \ No newline at end of file +from .crud_event_list import event_list +from .crud_event_point import event_point \ No newline at end of file diff --git a/crud/crud_event_point.py b/crud/crud_event_point.py new file mode 100644 index 0000000..26e2409 --- /dev/null +++ b/crud/crud_event_point.py @@ -0,0 +1,29 @@ +from motor.motor_asyncio import AsyncIOMotorDatabase +import schemas +from crud.base import CRUDBase + +__all__ = 'event_point', + +from utils import get_uid + + +class CRUDProjectNumber(CRUDBase): + # 获取对应游戏的数据,默认把基础属性的数据也获取出来 + async def all_event(self, db: AsyncIOMotorDatabase, game): + return await self.find_many(db, {'game': {'$in':[game,'basics_attr']}}) + + # # 修改数据 + # async def update(self, db: AsyncIOMotorDatabase, data_in: schemas.AddProjectnumber): + # game = data_in.game + # add_ditch = [] + # for member in data_in.ditch: + # add_ditch.append(member.dict()) + # await self.update_one(db, {'game': game}, {'$set': {'ditch': add_ditch}}) + # + # # 插入数据 + # async def create(self, db: AsyncIOMotorDatabase, data_in: schemas.ProjectnumberInsert): + # # await self.update_one(db, {'xiangmu': data_in.xiangmu}, {'$set': data_in.dict()}, upsert=True) + # await self.update_one(db, {data_in.game, data_in.ditch}, upsert=True) + + +event_point = CRUDProjectNumber('event_point') diff --git a/main.py b/main.py index 551b5f4..0638ceb 100644 --- a/main.py +++ b/main.py @@ -162,5 +162,5 @@ async def add_process_time_header(request: Request, call_next): if __name__ == '__main__': - #uvicorn.run(app='main:app', host="10.0.0.240", port=7899, reload=True, debug=True) - uvicorn.run(app='main:app', host="0.0.0.0", port=7899, reload=True, debug=True) \ No newline at end of file + uvicorn.run(app='main:app', host="10.0.0.240", port=7899, reload=True, debug=True) + #uvicorn.run(app='main:app', host="0.0.0.0", port=7899, reload=True, debug=True) \ No newline at end of file diff --git a/models/behavior_analysis.py b/models/behavior_analysis.py index b2e30c7..b4a932a 100644 --- a/models/behavior_analysis.py +++ b/models/behavior_analysis.py @@ -19,7 +19,8 @@ from db import get_database from db.redisdb import get_redis_pool, RedisDrive from models.user_label import UserClusterDef -from utils import get_week, strptime, start_end_month, strptime1 +from utils import get_week, strptime, start_end_month, strptime1, get_event + class CombinationEvent: def __init__(self, data, string, format): @@ -255,7 +256,7 @@ class BehaviorAnalysis: :param relation: :return: """ - + # 事件,留存,分布,漏斗分析生成sql经过, user_filters = [] event_filters = [] for filter in filters: @@ -281,9 +282,56 @@ class BehaviorAnalysis: continue else: continue - + # 表名 tbl = getattr(self, f'{item["tableType"]}_tbl') + # 字段名 col = getattr(tbl.c, item['columnName']) + # 判断是否是同一个事件 + yuan_event=self.events[0].get('eventName') or self.events[0].get('event_name') # 指标中的事件名 + biao_event=self.events[0].get('customEvent','').split('.')[0] + event= await get_event(item['columnName'],self.game) # 获取对应事件名或基础属性 + if event != yuan_event and event != biao_event and event != '基础属性' and self.game != 'debug': + event_time_col = getattr(self.event_tbl.c, '#event_time') + event_name_col = getattr(self.event_tbl.c, '#event_name') + base_where = [ + func.addHours(event_time_col, self.zone_time) >= self.start_date, + func.addHours(event_time_col, self.zone_time) <= self.end_date,] + event_name_where = [] + event_name_where.append(settings.CK_CALC_SYMBO['=='](event_name_col, event)) + ftv = item['ftv'] + if comparator == '==': + if len(ftv) > 1: + event_name_where.append(or_(*[col == v for v in ftv])) + else: + event_name_where.append(col == ftv[0]) + elif comparator == '>=': + event_name_where.append(col >= ftv[0]) + elif comparator == '<=': + event_name_where.append(col <= ftv[0]) + elif comparator == '>': + event_name_where.append(col > ftv[0]) + elif comparator == '<': + event_name_where.append(col < ftv[0]) + elif comparator == 'range': + event_name_where.append(col > ftv[0]) + event_name_where.append(col <= ftv[1]) + elif comparator == 'is not null': + event_name_where.append(col.isnot(None)) + elif comparator == 'is null': + event_name_where.append(col.is_(None)) + elif comparator == 'like': + event_name_where.append(col.like(f'%{ftv[0]}%')) + elif comparator == 'not like': + event_name_where.append(col.notlike(f'%{ftv[0]}%')) + elif comparator == 'in': + event_name_where.append(col.in_(ftv)) + elif comparator == '!=': + event_name_where.append(col != ftv[0]) + sub_qry=sa.select(sa.Column('#account_id')).select_from( + sa.select(sa.Column('#account_id')).where(and_(*base_where,*event_name_where)) + ) + event_filter.append(sa.Column('#account_id').in_(sub_qry)) + continue # 日期类型处理时区 if item.get('data_type') == 'datetime': col = func.addHours(col, self.zone_time) @@ -404,6 +452,7 @@ class BehaviorAnalysis: } async def event_model_sql(self): + """事件分析生成sql会经过""" sqls = [] event_time_col = getattr(self.event_tbl.c, '#event_time') for event in self.events: @@ -776,19 +825,29 @@ ORDER BY level } async def trace_model_sql(self): + # 路径分析生成SQL session_interval = self.event_view.get('session_interval') session_type = self.event_view.get('session_type') session_type_map = { 'minute': 60, 'second': 1, 'hour': 3600 - } interval_ts = session_interval * session_type_map.get(session_type, 60) event_names = self.events.get('event_names') source_event = self.events.get('source_event', {}).get('eventName') source_type = self.events.get('source_event', {}).get('source_type') - + wheres = self.events['user_filter']['filts'] + sql_one='' + if wheres != []: # 有筛选条件的时候使用 + columnName=wheres[0]['columnName'] # 获取字段名 + event=await get_event(columnName,self.game) # 获取字段属于那个事件,或者是否是基础属性 + if event == '基础属性': + sql_one=f""" and `{columnName}` {wheres[0]['comparator']} '{wheres[0]['ftv'][0]}' and `#event_name` in evnet_all) """ + else:# 如果包含有事件则进入下面的逻辑 + sql_one=f"""and `#account_id` in ( SELECT `#account_id` FROM {self.game}.event WHERE `#event_name`= '{event}' and `{columnName}` = + '{wheres[0]['ftv'][0]}' and addHours(`#event_time`, {self.zone_time}) >= start_data + and addHours(`#event_time`, {self.zone_time}) <= end_data ) and `#event_name` in evnet_all)""" sql_a = f"""with '{source_event}' as start_event, {tuple(event_names)} as evnet_all, @@ -879,6 +938,8 @@ GROUP BY event_chain,`#account_id` ORDER BY values desc""" sql = sql_a if source_type == 'initial_event' else sql_b + if sql_one != '': + sql= sql.replace('and `#event_name` in evnet_all)',sql_one,1) print(sql) return { 'sql': sql, @@ -888,10 +949,11 @@ ORDER BY values desc""" } async def retention_model_sql2(self): + # 留存分析生成SQL filter_item_type = self.event_view.get('filter_item_type') filter_item = self.event_view.get('filter_item') - event_name_a = self.events[0]['eventName'] - event_name_b = self.events[1]['eventName'] + event_name_a = self.events[0]['eventName'] # 初始的事件名 + event_name_b = self.events[1]['eventName'] # 回访的事件名 visit_name = self.events[0].get('event_attr_id') @@ -973,3 +1035,107 @@ group by a.reg_date) log on reg.date=log.reg_date 'start_date': self.start_date[:10], 'end_date': self.end_date[:10], } + + async def retention_model_sql3(self): + # 留存分析分组详情生成SQL + filter_item_type = self.event_view.get('filter_item_type') + filter_item = self.event_view.get('filter_item') + event_name_a = self.events[0]['eventName'] # 初始的事件名 + event_name_b = self.events[1]['eventName'] # 回访的事件名 + groupby_list=self.event_view.get('groupBy') + # 判断是基础的还是标签 + groupby = [i['columnName'] for i in groupby_list if i['tableType'] != 'user_label'] + + visit_name = self.events[0].get('event_attr_id') + + where, _ = await self.handler_filts((self.events[0]['filts'], self.events[0].get('relation', 'and')), + (self.global_filters, self.global_relation) + , self.ext_filters) + where_a = '1' + if where: + qry = sa.select().where(*where) + sql = str(qry.compile(compile_kwargs={"literal_binds": True})) + where_a = 'WHERE '.join(sql.split('WHERE ')[1:]) + # 任意事件 + event_name_b = 1 if event_name_b == '*' else f"`#event_name` = '{event_name_b}'" + + days = (arrow.get(self.end_date).date() - arrow.get(self.start_date).date()).days + keep = [] + cnt = [] + retention_n = [*[k for k in range(1, 60)], 70 - 1, 75 - 1, 80 - 1, 85 - 1, 90 - 1, 95 - 1, 100 - 1, 110 - 1, + 120 - 1, 150 - 1, 180 - 1, 210 - 1, 240 - 1, 270 - 1, 300 - 1, + 360 - 1] + + """ + cnt0-cnt1 as on1, + round(on1 * 100 / cnt0, 2) as `0p1`, + """ + + for i in retention_n: + keep.append( + f"""cnt{i}, + round(cnt{i} * 100 / cnt0, 2) as `p{i}`, + cnt0-cnt{i} as on{i}, + round(on{i} * 100 / cnt0, 2) as `op{i}` + """) + cnt.append(f"""sum(if(dateDiff('day',a.reg_date,b.visit_date)={i},1,0)) as cnt{i}""") + keep_str = ','.join(keep) + cnt_str = ','.join(cnt) + if len(groupby) > 0: + groupbys = ','.join([f"`{i}`" for i in groupby]) + groupby_on = ' and '.join([f"reg.{ii} = log.{ii}" for ii in [f"`{i}`" for i in groupby]]) + sql=f""" + with '{event_name_a}' as start_event, + {event_name_b} as retuen_visit, + `{visit_name}` as visit, + '{self.start_date}' as start_data, + '{self.end_date}' as end_data, + toDate(addHours(`#event_time`, {self.zone_time})) as date + select reg_date, + {groupbys}, + cnt0 , + {keep_str} + from(select date,{groupbys},uniqExact(visit) as cnt0 from {self.game}.event + where `#event_name` = start_event and addHours(`#event_time`, {self.zone_time}) >= start_data and addHours(`#event_time`, {self.zone_time}) <= end_data and {where_a} + group by date,{groupbys}) reg left join + (select a.reg_date,{groupbys}, + {cnt_str} + from (select {groupbys},date as reg_date, visit from {self.game}.event where `#event_name` = start_event and addHours(`#event_time`, {self.zone_time}) >= start_data and addHours(`#event_time`, {self.zone_time}) <= end_data and {where_a} group by reg_date, visit,{groupbys}) a + left join (select date as visit_date, visit from {self.game}.event where retuen_visit and addHours(`#event_time`, {self.zone_time}) >= start_data group by visit_date, visit) b on + a.visit = b.visit + group by a.reg_date,{groupbys}) log on reg.date=log.reg_date and {groupby_on} + """ + else: + sql = f""" + with '{event_name_a}' as start_event, + {event_name_b} as retuen_visit, + `{visit_name}` as visit, + '{self.start_date}' as start_data, + '{self.end_date}' as end_data, + toDate(addHours(`#event_time`, {self.zone_time})) as date + select reg_date, + cnt0 , + {keep_str} + from(select date, uniqExact(visit) as cnt0 from {self.game}.event + where `#event_name` = start_event and addHours(`#event_time`, {self.zone_time}) >= start_data and addHours(`#event_time`, {self.zone_time}) <= end_data and {where_a} + group by date) reg left join + (select a.reg_date, + {cnt_str} + from (select date as reg_date, visit from {self.game}.event where `#event_name` = start_event and addHours(`#event_time`, {self.zone_time}) >= start_data and addHours(`#event_time`, {self.zone_time}) <= end_data and {where_a} group by reg_date, visit) a + left join (select date as visit_date, visit from {self.game}.event where retuen_visit and addHours(`#event_time`, {self.zone_time}) >= start_data group by visit_date, visit) b on + a.visit = b.visit + group by a.reg_date) log on reg.date=log.reg_date + """ + print(sql) + return { + 'sql': sql, + 'group_label': self.group_label, + 'date_range': self.date_range, + 'unit_num': self.unit_num, + 'retention_n': retention_n, + 'filter_item_type': filter_item_type, + 'filter_item': filter_item, + 'time_particle': self.time_particle, + 'start_date': self.start_date[:10], + 'end_date': self.end_date[:10], + } diff --git a/models/user_label.py b/models/user_label.py index a482dfb..a45dd89 100644 --- a/models/user_label.py +++ b/models/user_label.py @@ -43,7 +43,7 @@ class UserClusterDef: res_json = await self.rdb.get(f'{self.game}_user') columns = json.loads(res_json).keys() metadata = sa.MetaData(schema=self.game) - #self.user_tbl = sa.Table('user_view', metadata, *[sa.Column(column) for column in columns])修改了这里,这是原本的 + # self.user_tbl = sa.Table('user_view', metadata, *[sa.Column(column) for column in columns]) # 修改了这里,这是原本的 self.user_tbl = sa.Table('event', metadata, *[sa.Column(column) for column in columns]) self.u_account_id_col = getattr(self.user_tbl.c, '#account_id') self.e_account_id_col = getattr(self.event_tbl.c, '#account_id') diff --git a/models/x_analysis.py b/models/x_analysis.py index e6a4654..f9ac39c 100644 --- a/models/x_analysis.py +++ b/models/x_analysis.py @@ -15,6 +15,8 @@ import schemas from core.config import settings from db import get_database from db.redisdb import get_redis_pool, RedisDrive +from models.user_label import UserClusterDef +from utils import get_event class XAnalysis: @@ -30,7 +32,11 @@ class XAnalysis: self.date_range = [] self.ext_filters = (self.data_in.ext_filter.get('filts', []), self.data_in.ext_filter.get('relation', 'and')) - + self.start_date = None + self.end_date = None + self.data_where=[] + self.event_tbl=None + self.zone_time:int = 0 def _get_global_filters(self): return self.event_view.get('filts') or [] #获取event_view字典里面filts的值,或返回空列表 @@ -124,8 +130,8 @@ class XAnalysis: return event_filters - def ltv_model_sql(self): + # ltv的生成sql days = (arrow.get(self.event_view['endTime']).date() - arrow.get(self.event_view['startTime']).date()).days quota = self.event_view['quota'] select_ltv = [] diff --git a/schemas/__init__.py b/schemas/__init__.py index c297ede..9d51027 100644 --- a/schemas/__init__.py +++ b/schemas/__init__.py @@ -24,4 +24,5 @@ from .api_board import * from .url_list import * from .user_url import * from .api_module import * -from .event_list import * \ No newline at end of file +from .event_list import * +from .event_point import * \ No newline at end of file diff --git a/schemas/event_point.py b/schemas/event_point.py new file mode 100644 index 0000000..5aefb4e --- /dev/null +++ b/schemas/event_point.py @@ -0,0 +1,8 @@ +from pydantic import BaseModel +from typing import List + + +class Eventpoint(BaseModel): + game: str # 游戏名 + event_name: str # 事件名 + event_attr: List[str] # 事件属性 diff --git a/utils/func.py b/utils/func.py index 2f33561..bc5d898 100644 --- a/utils/func.py +++ b/utils/func.py @@ -1,4 +1,3 @@ - import random import time import datetime @@ -7,6 +6,12 @@ import pandas as pd from datetime import timedelta from datetime import datetime as p1 import calendar + +import crud +import schemas +from db import get_database + + def get_uid(): return hex(int(time.time() * 10 ** 7) + random.randint(0, 10000))[2:] @@ -27,7 +32,9 @@ def estimate_data(data_type): return "Nullable(DateTime('UTC'))" else: return "Nullable(String)" -#将字典变成字符串 + + +# 将字典变成字符串 def dict_to_str(dic): c = str() b = 0 @@ -41,18 +48,21 @@ def dict_to_str(dic): c += "\"%s\":\"%s\"}" % (k, v) return c -def getEveryDay(begin_date,end_date): + +def getEveryDay(begin_date, end_date): # 前闭后闭 date_list = [] begin_date = datetime.datetime.strptime(begin_date, "%Y-%m-%d") - end_date = datetime.datetime.strptime(end_date,"%Y-%m-%d") + end_date = datetime.datetime.strptime(end_date, "%Y-%m-%d") while begin_date <= end_date: date_str = begin_date.strftime("%Y-%m-%d") date_list.append(date_str) begin_date += datetime.timedelta(days=1) return date_list -#print(getEveryDay('2016-01-01','2017-05-11')) -def Download_xlsx(df,name): + + +# print(getEveryDay('2016-01-01','2017-05-11')) +def Download_xlsx(df, name): """ 下载功能 name为文件名 @@ -61,14 +71,15 @@ def Download_xlsx(df,name): import mimetypes from utils import DfToStream from fastapi.responses import StreamingResponse - file_name=quote(f'{name}.xlsx') + file_name = quote(f'{name}.xlsx') mime = mimetypes.guess_type(file_name)[0] df_to_stream = DfToStream((df, name)) with df_to_stream as d: export = d.to_stream() - Download=StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'}) + Download = StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'}) return Download + def jiange_insert(list_date): """ 间隔1条插入一条数据插入数据 @@ -81,6 +92,7 @@ def jiange_insert(list_date): i += 2 return list_date + def create_df(resp): """ 分布分析外部下载功能的df数据 @@ -118,7 +130,9 @@ def create_df(resp): columns.insert(0, '事件发生时间') df = pd.DataFrame(data=date, columns=columns) return df -def create_neidf(resp,columnName): + + +def create_neidf(resp, columnName): """ 分布分析内部下载功能的df数据 """ @@ -156,19 +170,22 @@ def create_neidf(resp,columnName): df = pd.DataFrame(data=date, columns=columns) return df + def get_week(date_str=None): if date_str and isinstance(date_str, str): - now_time = (p1.strptime(date_str + " 00:00:00", "%Y-%m-%d %H:%M:%S")+ datetime.timedelta(days=1)).strftime("%Y-%m-%d %H:%M:%S") + now_time = (p1.strptime(date_str + " 00:00:00", "%Y-%m-%d %H:%M:%S") + datetime.timedelta(days=1)).strftime( + "%Y-%m-%d %H:%M:%S") else: now_time = p1.now().replace(hour=0, minute=0, second=0, microsecond=0) - now_time=strptime(now_time) + now_time = strptime(now_time) # 当前日期所在周的周一 - week_start_time = now_time - timedelta(days=now_time.weekday()+1, hours=now_time.hour, minutes=now_time.minute, + week_start_time = now_time - timedelta(days=now_time.weekday() + 1, hours=now_time.hour, minutes=now_time.minute, seconds=now_time.second) # 当前日期所在周的周日 week_end_time = week_start_time + timedelta(days=6, hours=23, minutes=59, seconds=59) return week_start_time, week_end_time + def strptime(date_string): """ 将字符串转换成datetime.datetime类型 @@ -177,6 +194,7 @@ def strptime(date_string): """ return p1.strptime(date_string, '%Y-%m-%d %H:%M:%S') + def strptime1(date_str): """ 将字符串转换成datetime.datetime类型 @@ -185,14 +203,30 @@ def strptime1(date_str): """ return p1.strptime(date_str + " 00:00:00", "%Y-%m-%d %H:%M:%S") + def start_end_month(time): """ 获取某个月的起始时间和结束时间 :param time: '2022-05-29' :return: """ - now=p1.strptime(time + " 00:00:00", "%Y-%m-%d %H:%M:%S") + now = p1.strptime(time + " 00:00:00", "%Y-%m-%d %H:%M:%S") this_month_start = datetime.datetime(now.year, now.month, 1) this_month_end = datetime.datetime(now.year, now.month, calendar.monthrange(now.year, now.month)[1]) - this_month_end1=this_month_end+timedelta(hours=23, minutes=59, seconds=59) - return this_month_start,this_month_end1 \ No newline at end of file + this_month_end1 = this_month_end + timedelta(hours=23, minutes=59, seconds=59) + return this_month_start, this_month_end1 + + +async def get_event(attr, game): + """ + :param attr: str 事件属性 + :param game: str 游戏名 + :return: str 事件名 + """ + res = await crud.event_point.all_event(get_database(),game) + event_name='' + for i in res: + if attr in i['event_attr']: + event_name = i['event_name'] + break + return event_name