# Patch overview (reviewer preamble; text before the first "diff --git" header is not part of any hunk).
#
# NOTE(review): the line breaks and indentation of this unified diff have been collapsed, so the
# hunks below cannot be applied as-is; the payload is left untouched here.
#
# Files touched, as named by the three "diff --git" headers:
#   1. api/api_v1/endpoints/query.py
#   2. db/ckdb.py
#   3. models/behavior_analysis.py
#
# 1. api/api_v1/endpoints/query.py
#    - Adds the import `from copy import deepcopy`.
#      NOTE(review): no added line in this patch uses `deepcopy` -- confirm it is needed.
#    - Inside the existing `trace_model_sql` handler:
#        * introduces `event_next_event`, a dict mapping `keys[0]` to a de-duplicated list of the
#          `keys[1]` values seen for it (de-duplication is done with `list(set(...))`);
#        * builds `event_new_next`, which turns every entry of those lists into
#          `{'event_name': key1, 'value': event_num_dict[key1]}`;
#          NOTE(review): `event_num_dict[key1]` is indexed without a membership check -- confirm
#          every `keys[1]` is also stored in `event_num_dict`, otherwise this raises KeyError;
#        * the response `data` now carries `'event_num'` (previously commented out) and the new
#          `'event_next'` entry.
#    - Adds a POST route "/trace_user_info_model" (its docstring is in Chinese; the handler returns
#      per-user detail rows for a path-analysis node). The handler:
#        * awaits `analysis.init(...)` and `analysis.trace_model_sql()` and reads `res['sql']`;
#        * builds `name_dict` (event_name -> event_desc) from `analysis.events['event_namesdes']`;
#        * awaits `ckdb.query_data_trace(sql, name_dict)` and returns `code=-9` when the result is
#          empty;
#        * reads `event_name`, `page` and `event_next` from `analysis.events`;
#        * picks the uid list by the prefix of `event_name`:
#            'follow' -> concatenates the uids of every `event_next` entry whose name does not start
#                        with '流失', de-duplicated through a set;
#            'more'   -> sorts `event_next` by 'value' in descending order, uses the entries from
#                        index 8 onward, concatenates their uids, de-duplicated through a set;
#            other    -> `event_uid_list[event_name]`;
#        * reports `user_num = len(all_uid_list)` and slices one page of the list with
#          `[(page - 1) * 10 : 10 * page]`, i.e. 10 ids per page;
#        * selects user columns from {game}.`user` for that page of ids via
#          `ckdb.query_dataframe`;
#        * replaces values whose `str(...)` is 'nan' with 0 at position 6 of every result row
#          (presumably the `zhanli` column, going by the select order -- confirm);
#        * returns `{'user_num': ..., 'details_data': {'new_columns': ..., 'new_values': ...}}`.
#      NOTE(review): the new handler is declared as `async def trace_model_sql`, the same function
#      name the hunk headers show for the existing handler; the later definition rebinds the
#      module-level name -- consider a distinct name.
#      NOTE(review): `account_id` (a Python list) and `game` (a request parameter) are interpolated
#      directly into the SQL text through an f-string -- confirm the rendered `in (...)` clause is
#      what ClickHouse expects (including for an empty page) and that `game` is validated upstream.
#      NOTE(review): pages are cut from lists produced by `list(set(...))`, whose order is not
#      defined -- confirm page contents are stable between requests.
#      NOTE(review): `event_uid_list[...]` is indexed without a membership check -- confirm the
#      requested names always exist in the mapping.
#      NOTE(review): `channel` appears twice in the select list.
#
# 2. db/ckdb.py
#    - Adds `CKDrive.query_data_trace(sql, name_dict)`, which:
#        * runs `self._execute(sql, with_column_types=True, columnar=True)`;
#        * walks every entry of the first result column `data[0]`; each entry is iterated as a
#          sequence of event names (presumably the `event_chain` array -- confirm);
#        * for positions 0..9 of an entry builds the labels `'<name>-<i>'` and
#          `'<next name>-<i + 1>'`, translating names through `name_dict` and using '流失' as the
#          "next" name after the last element; positions >= 10 are skipped;
#        * maps every distinct label of the entry to `data[1][index]`, the value of the second
#          result column in the same row (presumably `#account_id`, given the SQL change below --
#          confirm), keeping the per-label lists de-duplicated;
#        * returns that label -> list mapping (an empty dict when `data` is falsy).
#      NOTE(review): `list(set(...))` is re-applied to the whole list after every append; a set per
#      label converted once at the end would avoid the repeated work.
#
# 3. models/behavior_analysis.py
#    - In both `sql_a` and `sql_b`, adds `#account_id` to the select list and to the GROUP BY
#      clause, so the query is grouped by (event_chain, `#account_id`) instead of event_chain only.
#      NOTE(review): the pre-existing `trace_model_sql` handler consumes the same SQL -- confirm its
#      per-chain counts are still aggregated as intended with the finer grouping.
diff --git a/api/api_v1/endpoints/query.py b/api/api_v1/endpoints/query.py index 740aaff..ea434a6 100644 --- a/api/api_v1/endpoints/query.py +++ b/api/api_v1/endpoints/query.py @@ -3,6 +3,7 @@ from collections import defaultdict import mimetypes from urllib.parse import quote import os +from copy import deepcopy import pandas as pd import numpy as np from fastapi import APIRouter, Depends, Request,File @@ -1402,6 +1403,7 @@ async def trace_model_sql( return schemas.Msg(code=-9, msg='无数据', data=None) chain_dict = defaultdict(dict) event_num_dict={} + event_next_event={} nodes = {'流失'} name_list=analysis.events['event_namesdes'] name_dict={} @@ -1431,6 +1433,13 @@ async def trace_model_sql( event_num_dict[true_key] = count fmt_keys.append(true_key) + # 检测事件的后续事件有哪些 + if keys[0] in event_next_event: + event_next_event[keys[0]].append(keys[1]) + event_next_event[keys[0]] = list(set(event_next_event[keys[0]])) + else: + event_next_event[keys[0]] = [keys[1]] + links = [] for _, items in chain_dict.items(): for keys, val in items.items(): @@ -1457,12 +1466,20 @@ async def trace_model_sql( nodes.insert(0, i) for i in trail: nodes.append(i) + # 处理event_next_event + event_new_next = {} + for key, key_list in event_next_event.items(): + new_key_list = [] + for key1 in key_list: + new_key_list.append({'event_name': key1, 'value': event_num_dict[key1]}) + event_new_next[key] = new_key_list data = { #'nodes': [{'name': item} for item in nodes], 'nodes': [{'name': item} for item in nodes], 'links': links, - #'event_num':event_num_dict, + 'event_num':event_num_dict, + 'event_next':event_new_next, 'start_date': res['start_date'], 'end_date': res['end_date'], 'time_particle': res['time_particle'] @@ -1470,6 +1487,73 @@ async def trace_model_sql( return schemas.Msg(code=0, msg='ok', data=data) +@router.post("/trace_user_info_model") +async def trace_model_sql( + request: Request, + game: str, + ckdb: CKDrive = Depends(get_ck_db), + analysis: BehaviorAnalysis = 
Depends(BehaviorAnalysis), + current_user: schemas.UserDB = Depends(deps.get_current_user) +) -> schemas.Msg: + """路径分析用户详情""" + await analysis.init(data_where=current_user.data_where) + res = await analysis.trace_model_sql() + sql = res['sql'] + name_list = analysis.events['event_namesdes'] + name_dict = {} + for i in name_list: + name_dict[i['event_name']] = i['event_desc'] + # 获取事件对应的uid + event_uid_list = await ckdb.query_data_trace(sql, name_dict) + if not event_uid_list: + return schemas.Msg(code=-9, msg='无数据', data=None) + event_name = analysis.events['event_name'] + page = analysis.events['page'] + event_next = analysis.events['event_next'] + starindex = (page - 1) * 10 + all_uid_list = [] + + # 后续事件统计的用户详情 + if event_name.startswith('follow'): + for event_next_list in event_next: + true_event_name = event_next_list['event_name'] + if true_event_name.startswith('流失'): + continue + all_uid_list += event_uid_list[true_event_name] + all_uid_list = list(set(all_uid_list)) + + # 更多事件的用户详情 + elif event_name.startswith('more'): + # 后续事件排序后取第8个开始 + event_next_true_eventlist = sorted(event_next, key=lambda x: x['value'], reverse=True)[8:] + for event_next_list in event_next_true_eventlist: + true_event_name = event_next_list['event_name'] + all_uid_list += event_uid_list[true_event_name] + all_uid_list = list(set(all_uid_list)) + + # 单个节点的用户详情 + else: + all_uid_list = event_uid_list[event_name] + + user_num = len(all_uid_list) + account_id = all_uid_list[starindex:10 * page] + new_sql = f"""select `#account_id`,`#ip`,`#distinct_id`,rmbmoney,owner_name,lv,zhanli,channel, + channel,svrindex,maxmapid,name,`exp`,vip,jinbi,last_account_login_time,binduid from {game}.`user` where `#account_id` in ({account_id})""" + df1 = await ckdb.query_dataframe(new_sql) + new_values = df1.values.tolist() + for i in range(len(new_values)): + if str(new_values[i][6]) == 'nan': + new_values[i][6] = 0 + res = { + 'user_num': user_num, + 'details_data': { + 'new_columns': 
df1.columns.tolist(), + 'new_values': new_values + }} + + return schemas.Msg(code=0, msg='ok', data=res) + + @router.post("/user_property_model_sql") async def user_property_sql( request: Request, diff --git a/db/ckdb.py b/db/ckdb.py index dd7c624..deeac77 100644 --- a/db/ckdb.py +++ b/db/ckdb.py @@ -38,6 +38,34 @@ class CKDrive: df = pd.DataFrame({col[0]: d for d, col in zip(data, columns)}) return df + async def query_data_trace(self, sql, name_dict): + data, columns = await self._execute(sql, with_column_types=True, columnar=True) + res_dict = {} + if data: + for index, value in enumerate(data[0]): + event_next_list = [] + value_len = len(value) + for i, key in enumerate(value): + if i >= 10: + continue + next_event = value[i + 1] if i < value_len - 1 else '流失' + # 按对应的中文名显示 + event_namess = name_dict.get(key, key) + next_eventss = name_dict.get(next_event, next_event) + key = (f'{event_namess}-{i}', f'{next_eventss}-{i + 1}') + keys = list(key) + for next_key in keys: + if next_key in event_next_list: + continue + event_next_list.append(next_key) + for event_name in event_next_list: + if event_name not in res_dict: + res_dict[event_name] = [data[1][index]] + continue + res_dict[event_name].append(data[1][index]) + res_dict[event_name] = list(set(res_dict[event_name])) + return res_dict + async def count(self, db: str, tb: str): sql = f'select count() as `count` from {db}.{tb}' res = await self.execute(sql) diff --git a/models/behavior_analysis.py b/models/behavior_analysis.py index ef73997..80ac374 100644 --- a/models/behavior_analysis.py +++ b/models/behavior_analysis.py @@ -714,7 +714,7 @@ ORDER BY level {tuple(event_names)} as evnet_all, '{self.start_date}' as start_data, '{self.end_date}' as end_data -select event_chain, +select event_chain,`#account_id`, count() as values from (with toUInt32(minIf(`#event_time`, `#event_name` = start_event)) AS start_event_ts, @@ -751,7 +751,7 @@ from (with HAVING has_midway_hit = 1 ) where arrayElement(event_chain, 1) = 
start_event -GROUP BY event_chain +GROUP BY event_chain,`#account_id` ORDER BY values desc """ sql_b = f"""with @@ -759,7 +759,7 @@ ORDER BY values desc {tuple(event_names)} as evnet_all, '{self.start_date}' as start_data, '{self.end_date}' as end_data -select event_chain, +select event_chain,`#account_id`, count() as values from (with toUInt32(maxIf(`#event_time`, `#event_name` = end_event)) AS end_event_ts, @@ -795,7 +795,7 @@ from (with HAVING has_midway_hit = 1 ) where arrayElement(event_chain, -1) = end_event -GROUP BY event_chain +GROUP BY event_chain,`#account_id` ORDER BY values desc""" sql = sql_a if source_type == 'initial_event' else sql_b