diff --git a/api/api_v1/endpoints/query.py b/api/api_v1/endpoints/query.py
index 34273fa..45cc080 100644
--- a/api/api_v1/endpoints/query.py
+++ b/api/api_v1/endpoints/query.py
@@ -415,7 +415,7 @@ async def event_model(
                         appgb = str(gb).replace("'", '')
                         groups.append(appgb)
                     item['groups'] = groups
-                    #Revised approach
+                    # Revised approach
                     # by_dict={}
                     # for i in range(len(gb)):
                     #     by_dict[groupby_list[i]]=gb[i]
@@ -431,7 +431,7 @@ async def event_model(
                         appgb = str(gb).replace("'", '')
                         groups.append(appgb)
                     item['groups'] = groups
-                    item['groupby']=groupby
+                    item['groupby'] = groupby
                 item['values'] = np.array(item['values'])[sort_key].tolist()
                 item['sum'] = np.array(item['sum'])[sort_key].tolist()
                 item['avg'] = np.array(item['avg'])[sort_key].tolist()
@@ -587,7 +587,7 @@ async def retention_model(request: Request,
     # Map to the corresponding Chinese names for front-end display
     groupby_list = analysis.event_view.get('groupBy')
     groupby = [i['columnName'] for i in groupby_list if i['tableType'] != 'user_label']
-    true_group = [] # the group values actually selected
+    true_group = []  # the group values actually selected

     for g_data in groupby_list:
         data_type = g_data['data_type']
@@ -622,7 +622,7 @@ async def retention_model(request: Request,
                 max_v = int(df[g_data['columnName']].max())
                 min_v = int(df[g_data['columnName']].min())
                 interval = (max_v - min_v) // 10 or 1
-                for i in range(min_v, max_v+1, interval):
+                for i in range(min_v, max_v + 1, interval):
                     zidai.append([i, i + interval])
                 true_group.append(zidai)

@@ -795,8 +795,8 @@ async def retention_model(request: Request,

     # Compute retention probabilities
     for key1, value1 in new_summary_valuess.items():
-        new_summary_valuess[key1]['p'] = [round(i*100 / value1['d0'], 2) for i in value1['n']]
-        new_summary_valuess[key1]['p_outflow'] = [round(i1*100 / value1['d0'], 2) for i1 in value1['n_outflow']]
+        new_summary_valuess[key1]['p'] = [round(i * 100 / value1['d0'], 2) for i in value1['n']]
+        new_summary_valuess[key1]['p_outflow'] = [round(i1 * 100 / value1['d0'], 2) for i1 in value1['n_outflow']]

     title = ['分组项', '用户数', '次留', *[f'{i + 1}留' for i in retention_n[1:]]]

@@ -820,8 +820,8 @@ async def retention_model(request: Request,
         'start_date': res['start_date'],
         'end_date': res['end_date'],
         'time_particle': res['time_particle'],
-        'groupby':[i['columnName'] for i in groupby_list],
-        'groupby_data':groupby_data
+        'groupby': [i['columnName'] for i in groupby_list],
+        'groupby_data': groupby_data
     }
     return schemas.Msg(code=0, msg='ok', data=resp)

@@ -1178,7 +1178,7 @@ async def funnel_model(
             # tmp['p2'].append(round(v*100 / tmp_df.loc[i - 1, 'values'], 2))

         _ = date_data.setdefault(key[0].strftime('%Y-%m-%d'), {})
-        #[key[1]] = tmp
+        # [key[1]] = tmp
     title = (groupby or ['总体']) + cond_level
     resp = {'list': data_list,
             'date_data': date_data,
@@ -1520,6 +1520,58 @@ async def scatter_model(
     #             'title': '总体'}


+@router.post("/guide_model")
+async def guide_model(
+        request: Request,
+        game: str,
+        ckdb: CKDrive = Depends(get_ck_db),
+        db: AsyncIOMotorDatabase = Depends(get_database),
+        analysis: BehaviorAnalysis = Depends(BehaviorAnalysis),
+        current_user: schemas.UserDB = Depends(deps.get_current_user)
+) -> schemas.Msg:
+    """Distribution analysis model"""
+    await analysis.init(data_where=current_user.data_where)
+    event_type = analysis.events[0]['eventName']
+    try:
+        res = await analysis.guide_model_sql()
+    except Exception as e:
+        return schemas.Msg(code=-9, msg='报表配置参数异常')
+
+    sql = res['sql']
+    df = await ckdb.query_dataframe(sql)
+    if df.empty:
+        return schemas.Msg(code=-9, msg='无数据', data=None)
+
+    group_str = res['analysis']
+    # Cast to int
+    df[group_str] = df[group_str].astype(int)
+    step_list = [str(i) for i in sorted(df[group_str].unique())]
+    dict_k = {}
+    for k, nedf in df.groupby("date"):
+        ste_k = {}
+        for kk, ste_df in nedf.groupby(group_str):
+            ste_df.reset_index(drop=True, inplace=True)
+            ste_k[str(kk)] = int(ste_df['values'][0])
+        for ste in step_list:
+            if ste not in list(ste_k.keys()):
+                ste_k[ste] = 0
+        dict_k[str(k)] = ste_k
+    data = {}
+    for dict_key, dict_data in dict_k.items():
+        dict_data1 = deepcopy(dict_data)
+        dict_k1 = {int(k): v for k, v in dict_data1.items()}
+        sorted(dict_k1.keys())
+        data_values = list(dict_k1.values())
+        data_values.insert(0, dict_key)
+        data[dict_key] = data_values
+    step_list.insert(0, '日期')
+    res_msg = {
+        'level': step_list,
+        'list': data
+    }
+    return schemas.Msg(code=0, msg='ok', data=res_msg)
+
+
 @router.post("/scatter_model_details")
 async def scatter_model(
         request: Request,
@@ -1683,7 +1735,7 @@ async def scatter_model(
                 ac = ast.literal_eval(i)
                 ab = [str(ii) for ii in ac]
                 groupby_data.append(ab)
-            resp['groupby_data']=groupby_data
+            resp['groupby_data'] = groupby_data
             return schemas.Msg(code=0, msg='ok', data=resp)
         else:
             # Discrete numbers
@@ -1790,7 +1842,7 @@ async def scatter_model(
                 ac = ast.literal_eval(i)
                 ab = [str(ii) for ii in ac]
                 groupby_data.append(ab)
-            resp['groupby_data']=groupby_data
+            resp['groupby_data'] = groupby_data
             return schemas.Msg(code=0, msg='ok', data=resp)
         else:
             return schemas.Msg(code=-9, msg='没有添加分组项', data='')
diff --git a/models/behavior_analysis.py b/models/behavior_analysis.py
index 61ab716..48787c8 100644
--- a/models/behavior_analysis.py
+++ b/models/behavior_analysis.py
@@ -400,15 +400,15 @@ class BehaviorAnalysis:
             # Column name
             col = getattr(tbl.c, item['columnName'])
             # Check whether it is the same event
-            yuan_event=self.events[0].get('eventName') or self.events[0].get('event_name') # event name in the metric
-            biao_event=self.events[0].get('customEvent','').split('.')[0]
-            event= await get_event(item['columnName'],self.game) # get the corresponding event name or base property
+            yuan_event = self.events[0].get('eventName') or self.events[0].get('event_name')  # event name in the metric
+            biao_event = self.events[0].get('customEvent', '').split('.')[0]
+            event = await get_event(item['columnName'], self.game)  # get the corresponding event name or base property
             if event != yuan_event and event != biao_event and event != '基础属性' and self.game != 'debug':
                 event_time_col = getattr(self.event_tbl.c, '#event_time')
                 event_name_col = getattr(self.event_tbl.c, '#event_name')
                 base_where = [
                     func.addHours(event_time_col, self.zone_time) >= self.start_date,
-                    func.addHours(event_time_col, self.zone_time) <= self.end_date,]
+                    func.addHours(event_time_col, self.zone_time) <= self.end_date, ]
                 event_name_where = []
                 event_name_where.append(settings.CK_CALC_SYMBO['=='](event_name_col, event))
                 ftv = item['ftv']
@@ -440,8 +440,8 @@
                     event_name_where.append(col.in_(ftv))
                 elif comparator == '!=':
                     event_name_where.append(col != ftv[0])
-                sub_qry=sa.select(sa.Column('#account_id')).select_from(
-                    sa.select(sa.Column('#account_id')).where(and_(*base_where,*event_name_where))
+                sub_qry = sa.select(sa.Column('#account_id')).select_from(
+                    sa.select(sa.Column('#account_id')).where(and_(*base_where, *event_name_where))
                 )
                 event_filter.append(sa.Column('#account_id').in_(sub_qry))
                 continue
@@ -727,7 +727,7 @@ addHours("#event_time", 8) >= '{stat_date}' AND addHours("#event_time", 8) <= '{
 GROUP BY toDate(addHours("#event_time", 8))"""

             sqls.append({'sql': sql,
-                         'group_label':self.group_label,
+                         'group_label': self.group_label,
                          'groupby': [i.key for i in self.groupby],
                          'date_range': self.date_range,
                          'event_name': event_name_display or event_name,
@@ -826,7 +826,7 @@ ORDER BY level
         if time != None and time != '合计':
             timeParticleSize = self.event_view.get('timeParticleSize')  # weekly, monthly, total, etc. use different time ranges
             if timeParticleSize == 'P1W':  # by week
-                start_date , end_date = get_week(time)
+                start_date, end_date = get_week(time)
                 if start_date < strptime(self.start_date):  # start of the range
                     where = [
                         func.addHours(event_time_col, self.zone_time) >= self.start_date,
@@ -835,13 +835,13 @@ ORDER BY level
                 elif end_date < strptime(self.end_date):  # middle of the range
                     where = [
                         func.addHours(event_time_col, self.zone_time) >= start_date,
-                        func.addHours(event_time_col, self.zone_time) <= end_date,]
+                        func.addHours(event_time_col, self.zone_time) <= end_date, ]
                 else:  # end of the range
                     where = [
                         func.addHours(event_time_col, self.zone_time) >= start_date,
-                        func.addHours(event_time_col, self.zone_time) <= self.end_date,]
+                        func.addHours(event_time_col, self.zone_time) <= self.end_date, ]
             elif timeParticleSize == 'P1M':  # by month
-                start_date, end_date=start_end_month(time)
+                start_date, end_date = start_end_month(time)
                 if strptime(self.start_date) > strptime1(time):
                     where = [
                         func.addHours(event_time_col, self.zone_time) >= self.start_date,
@@ -855,7 +855,7 @@ ORDER BY level
             else:
                 where = [
                     func.addHours(event_time_col, self.zone_time) >= self.start_date,
-                    func.addHours(event_time_col, self.zone_time) <= self.end_date,]
+                    func.addHours(event_time_col, self.zone_time) <= self.end_date, ]
         else:
             where = [
                 func.addHours(event_time_col, self.zone_time) >= self.start_date,
@@ -873,7 +873,8 @@ ORDER BY level
         if analysis in ['number_of_days', 'number_of_hours']:
             values_col = func.count(func.distinct(e_account_id_col)).label('values')

-        if analysis in ['times', 'number_of_days', 'number_of_hours', 'sum', 'avg', 'median', 'max', 'min', 'distinct_count']:
+        if analysis in ['times', 'number_of_days', 'number_of_hours', 'sum', 'avg', 'median', 'max', 'min',
+                        'distinct_count']:
             if self.time_particle == 'total':
                 qry = sa.select(*self.groupby, values_col) \
                     .where(and_(*where)) \
@@ -940,6 +941,144 @@ ORDER BY level
             'end_date': self.end_date[:10],
         }

+    async def guide_model_sql(self):
+        # Generate SQL for the event steps
+        event = self.events[0]
+        event_name = event['eventName']
+        analysis = event['analysis']
+        if analysis in ['list_distinct', "set_distinct", "ele_distinct"]:
+            analysis = 'max'
+        e_account_id_col = getattr(self.event_tbl.c, '#account_id').label('uid')
+        u_account_id_col = getattr(self.user_tbl.c, '#account_id')
+        event_name_col = getattr(self.event_tbl.c, '#event_name')
+        event_time_col = getattr(self.event_tbl.c, '#event_time').label('date')
+        event_date_col = settings.TIME_GRAIN_EXPRESSIONS[self.time_particle](event_time_col, self.zone_time)
+
+        quota_interval_arr = event.get('quotaIntervalArr')
+        time = self.data_in.time
+        global where
+        # If this is the group-detail view of distribution analysis, adjust the time range; all other cases take the else branch
+        if time != None and time != '合计':
+            timeParticleSize = self.event_view.get('timeParticleSize')  # weekly, monthly, total, etc. use different time ranges
+            if timeParticleSize == 'P1W':  # by week
+                start_date, end_date = get_week(time)
+                if start_date < strptime(self.start_date):  # start of the range
+                    where = [
+                        func.addHours(event_time_col, self.zone_time) >= self.start_date,
+                        func.addHours(event_time_col, self.zone_time) <= end_date,
+                    ]
+                elif end_date < strptime(self.end_date):  # middle of the range
+                    where = [
+                        func.addHours(event_time_col, self.zone_time) >= start_date,
+                        func.addHours(event_time_col, self.zone_time) <= end_date, ]
+                else:  # end of the range
+                    where = [
+                        func.addHours(event_time_col, self.zone_time) >= start_date,
+                        func.addHours(event_time_col, self.zone_time) <= self.end_date, ]
+            elif timeParticleSize == 'P1M':  # by month
+                start_date, end_date = start_end_month(time)
+                if strptime(self.start_date) > strptime1(time):
+                    where = [
+                        func.addHours(event_time_col, self.zone_time) >= self.start_date,
+                        func.addHours(event_time_col, self.zone_time) <= end_date,
+                    ]
+                else:
+                    where = [
+                        func.addHours(event_time_col, self.zone_time) >= start_date,
+                        func.addHours(event_time_col, self.zone_time) <= self.end_date,
+                    ]
+            else:
+                where = [
+                    func.addHours(event_time_col, self.zone_time) >= self.start_date,
+                    func.addHours(event_time_col, self.zone_time) <= self.end_date, ]
+        else:
+            where = [
+                func.addHours(event_time_col, self.zone_time) >= self.start_date,
+                func.addHours(event_time_col, self.zone_time) <= self.end_date,
+            ]
+        if event_name != '*':
+            where.append(event_name_col == event_name)
+        event_filter, user_filter = await self.handler_filts((event['filts'], event.get('relation', 'and')),
+                                                             (self.global_filters, self.global_relation)
+                                                             , self.ext_filters)
+        if user_filter:
+            where.append(e_account_id_col.in_(sa.select(u_account_id_col).where(*user_filter)))
+        where.extend(event_filter)
+        values_col = func.count().label('values')
+        if analysis in ['number_of_days', 'number_of_hours']:
+            values_col = func.count(func.distinct(e_account_id_col)).label('values')
+
+        if analysis:
+            if self.time_particle == 'total':
+                qry = sa.select(*self.groupby, analysis, values_col) \
+                    .where(and_(*where)) \
+                    .group_by(*self.groupby, analysis, e_account_id_col)
+            else:
+                qry = sa.select(event_date_col, *self.groupby, values_col) \
+                    .where(and_(*where)) \
+                    .group_by(event_date_col, *self.groupby, e_account_id_col)
+
+            sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
+            print(sql)
+            sqla = sql.replace('SELECT', f'SELECT {analysis}, ', 1)
+            sqlb = sqla.replace('GROUP BY', f'GROUP BY {analysis}, ', 1)
+            sqlc = sqlb.replace('WHERE', f'WHERE {analysis} is not null AND ', 1)
+            print(sqlc)
+            return {
+                'sql': sqlc,
+                'group_label': self.group_label,
+                'interval_type': event['intervalType'],
+                'analysis': analysis,
+                'quota_interval_arr': quota_interval_arr,
+                'groupby': [i.key for i in self.groupby],
+                'time_particle': self.time_particle,
+                'start_date': self.start_date[:10],
+                'end_date': self.end_date[:10],
+            }
+        elif event.get('quota'):
+            event_attr_col = getattr(self.event_tbl.c, event['quota'])
+            if self.time_particle == 'total':
+                if analysis == 'uniqExact':
+                    # Distinct count, total
+                    qry = sa.select(e_account_id_col,
+                                    event_attr_col.label('values')) \
+                        .where(and_(*where)) \
+                        .group_by(*self.groupby, e_account_id_col, event_attr_col)
+                else:
+                    qry = sa.select(e_account_id_col,
+                                    settings.CK_FUNC[analysis](event_attr_col).label('values')) \
+                        .where(and_(*where)) \
+                        .group_by(*self.groupby, e_account_id_col)
+            else:
+                if analysis == 'uniqExact':
+                    # Distinct count
+                    qry = sa.select(event_date_col, e_account_id_col,
+                                    event_attr_col.label('values')) \
+                        .where(and_(*where)) \
+                        .group_by(event_date_col, e_account_id_col, event_attr_col)
+                else:
+                    qry = sa.select(event_date_col, e_account_id_col,
+                                    settings.CK_FUNC[analysis](event_attr_col).label('values')) \
+                        .where(and_(*where)) \
+                        .group_by(event_date_col, e_account_id_col)
+            sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
+            columnName = event.get('label_id', '')
+            if columnName != '':
+                sql = sql.replace('SELECT', f'SELECT {columnName},', 1)
+                sql += f',{columnName}'
+            print(sql)
+            return {
+                'sql': sql,
+                'group_label': self.group_label,
+                'interval_type': event['intervalType'],
+                'analysis': analysis,
+                'quota_interval_arr': quota_interval_arr,
+                'groupby': [i.key for i in self.groupby],
+                'time_particle': self.time_particle,
+                'start_date': self.start_date[:10],
+                'end_date': self.end_date[:10],
+            }
+
     async def trace_model_sql(self):
         # Generate SQL for path analysis
         session_interval = self.event_view.get('session_interval')
@@ -1164,7 +1303,7 @@ group by a.reg_date) log on reg.date=log.reg_date
         filter_item = self.event_view.get('filter_item')
         event_name_a = self.events[0]['eventName']  # initial event name
         event_name_b = self.events[1]['eventName']  # return-visit event name
-        groupby_list=self.event_view.get('groupBy')
+        groupby_list = self.event_view.get('groupBy')
         # Determine whether it is a base property or a user label
         groupby = [i['columnName'] for i in groupby_list if i['tableType'] != 'user_label']

@@ -1206,7 +1345,7 @@ group by a.reg_date) log on reg.date=log.reg_date
         if len(groupby) > 0:
             groupbys = ','.join([f"`{i}`" for i in groupby])
             groupby_on = ' and '.join([f"reg.{ii} = log.{ii}" for ii in [f"`{i}`" for i in groupby]])
-            sql=f"""
+            sql = f"""
 with '{event_name_a}' as start_event,
 {event_name_b} as retuen_visit,
 `{visit_name}` as visit,