diff --git a/api/api_v1/endpoints/query.py b/api/api_v1/endpoints/query.py index e314ac0..e26f421 100644 --- a/api/api_v1/endpoints/query.py +++ b/api/api_v1/endpoints/query.py @@ -1065,7 +1065,10 @@ async def scatter_model( # df['values']=df['values'].astype(str) df.fillna(0, inplace=True) # 转换数据类型为int - df['values'] = df['values'].astype(int) + if analysis.events[-1].get('analysis') != 'uniqExact': + df['values'] = df['values'].astype(int) + else: + df['values'] = df['values'].astype(str) # 统一声明使用去重数的时候为str interval_type = res['interval_type'] analysi = res['analysis'] groupby = res['groupby'] @@ -1075,7 +1078,10 @@ async def scatter_model( df['date'] = '合计' if analysi != 'number_of_days' and interval_type != 'discrete': - max_v = int(df['values'].max()) + try: + max_v = int(df['values'].max()) + except Exception as e: + return schemas.Msg(code=-9, msg='请用离散数字', data=None) min_v = int(df['values'].min()) interval = (max_v - min_v) // 10 or 1 resp = {'list': dict(), @@ -1098,7 +1104,7 @@ async def scatter_model( # 这是整体的 for key, tmp_df in df.groupby('date'): bins_s = pd.cut(tmp_df['values'], bins=bins, - right=False).value_counts() + right=False,include_lowest=True).value_counts() bins_s.sort_index(inplace=True) total = int(bins_s.sum()) if res['time_particle'] == 'total': @@ -1136,6 +1142,7 @@ async def scatter_model( # elif analysis == 'number_of_days': else: + # 离散数字 resp = {'list': {}, 'label': [], 'start_date': res['start_date'], 'end_date': res['end_date'], @@ -1143,8 +1150,12 @@ async def scatter_model( } labels = [str(i) for i in sorted(df['values'].unique())] resp['label'] = labels + shaixuan = analysis.events[0].get('analysis') for key, tmp_df in df.groupby(['date']): - total = len(tmp_df) + if shaixuan == 'uniqExact': + total = len(set(tmp_df['uid'])) + else: + total = len(tmp_df) if res['time_particle'] == 'total': dt = '合计' else: @@ -1220,6 +1231,7 @@ async def scatter_model( analysis: BehaviorAnalysis = Depends(BehaviorAnalysis), current_user: 
schemas.UserDB = Depends(deps.get_current_user) ) -> schemas.Msg: + """分布分析分组详情""" await analysis.init(data_where=current_user.data_where) try: res = await analysis.scatter_model_sql() @@ -1232,9 +1244,21 @@ async def scatter_model( if analysis.event_view['groupBy'] != []: if columnName != '': - sql = sql.replace(f'toDate(addHours({game}.event."#event_time", 8)) AS date', f'{columnName} as va', + #按天分组 + sql = sql.replace(f'toDate(addHours({game}.event."#event_time", 8)) AS date', f'`{columnName}` as va', 1) sql = sql.replace(f'toDate(addHours({game}.event."#event_time", 8))', columnName, 1) + #按周分组 + sql = sql.replace(f'toStartOfWeek(addHours({game}.event."#event_time", 8)) AS date', f'`{columnName}` as va', + 1) + sql = sql.replace(f'toStartOfWeek(addHours({game}.event."#event_time", 8))', columnName, 1) + #按月分组 + sql = sql.replace(f'toStartOfMonth(addHours({game}.event."#event_time", 8)) AS date', f'`{columnName}` as va', + 1) + sql = sql.replace(f'toStartOfMonth(addHours({game}.event."#event_time", 8))', columnName, 1) + #合计 + if analysis.event_view.get('timeParticleSize') == "total": + sql = sql.replace(f'SELECT', f'SELECT {columnName} as va,', 1) df = await ckdb.query_dataframe(sql) if df.empty: return schemas.Msg(code=-9, msg='无数据', data=None) @@ -1244,14 +1268,18 @@ async def scatter_model( df = df.explode("values").reset_index(drop=True) df.fillna(0, inplace=True) # 转换数据类型为int - df['values'] = df['values'].astype(int) + if analysis.events[-1].get('analysis') != 'uniqExact': + df['values'] = df['values'].astype(int) + else: + df['values'] = df['values'].astype(str) # 统一声明使用去重数的时候为str interval_type = res['interval_type'] analysi = res['analysis'] groupby = res['groupby'] quota_interval_arr = res['quota_interval_arr'] # 兼容合计的 - if res['time_particle'] == 'total': - df['date'] = '合计' + # if res['time_particle'] == 'total': + # if len(groupby) > 0: + # df['va'] = '合计' if analysi != 'number_of_days' and interval_type != 'discrete': # 默认区间 @@ -1291,7 +1319,7 @@ 
async def scatter_model( right=True, include_lowest=True).value_counts() bins_s.sort_index(inplace=True) total = int(bins_s.sum()) - if res['time_particle'] == 'total': + if res['time_particle'] == 'total111': resp['list']['合计'] = dict() resp['list']['合计'] = {'n': bins_s.to_list(), 'total': total, @@ -1343,9 +1371,13 @@ async def scatter_model( if 'list' in str(type(df['va'][0])): f = lambda x: x[0] df['va'] = df['va'].map(f) + shaixuan = analysis.events[0].get('analysis') for key, tmp_df in df.groupby(['va']): - total = len(tmp_df) - if res['time_particle'] == 'total': + if shaixuan == 'uniqExact': + total = len(set(tmp_df['uid'])) + else: + total = len(tmp_df) + if res['time_particle'] == 'total11': dt = '合计' else: # 映射对应的埋点数据 diff --git a/models/behavior_analysis.py b/models/behavior_analysis.py index 673c507..b2e30c7 100644 --- a/models/behavior_analysis.py +++ b/models/behavior_analysis.py @@ -19,7 +19,7 @@ from db import get_database from db.redisdb import get_redis_pool, RedisDrive from models.user_label import UserClusterDef - +from utils import get_week, strptime, start_end_month, strptime1 class CombinationEvent: def __init__(self, data, string, format): @@ -118,7 +118,7 @@ class BehaviorAnalysis: self.group_label = {} self.event_view = dict() self.events = [dict()] - + self.time = None self.zone_time: int = 0 self.start_date = None self.end_date = None @@ -644,6 +644,7 @@ ORDER BY level } async def scatter_model_sql(self): + # 分布分析生成sql event = self.events[0] event_name = event['eventName'] analysis = event['analysis'] @@ -656,14 +657,47 @@ ORDER BY level event_date_col = settings.TIME_GRAIN_EXPRESSIONS[self.time_particle](event_time_col, self.zone_time) quota_interval_arr = event.get('quotaIntervalArr') - - where = [ - # event_date_col >= self.start_date, - # event_date_col <= self.end_date, - func.addHours(event_time_col, self.zone_time) >= self.start_date, - func.addHours(event_time_col, self.zone_time) <= self.end_date, - - ] + time = 
self.data_in.time + global where + # 判断是分布分析里面的分组详情,改时间范围,其他情况都走else + if time != None and time != '合计': + timeParticleSize = self.event_view.get('timeParticleSize') # 筛选是按周,按月,合计等情况,用不同的时间 + if timeParticleSize == 'P1W': # 按周 + start_date , end_date = get_week(time) + if start_date < strptime(self.start_date): # 开头的时间 + where = [ + func.addHours(event_time_col, self.zone_time) >= self.start_date, + func.addHours(event_time_col, self.zone_time) <= end_date, + ] + elif end_date < strptime(self.end_date): # 中间的时间 + where = [ + func.addHours(event_time_col, self.zone_time) >= start_date, + func.addHours(event_time_col, self.zone_time) <= end_date,] + else: # 结尾的时间 + where = [ + func.addHours(event_time_col, self.zone_time) >= start_date, + func.addHours(event_time_col, self.zone_time) <= self.end_date,] + elif timeParticleSize == 'P1M': # 按月 + start_date, end_date=start_end_month(time) + if strptime(self.start_date) > strptime1(time): + where = [ + func.addHours(event_time_col, self.zone_time) >= self.start_date, + func.addHours(event_time_col, self.zone_time) <= end_date, + ] + else: + where = [ + func.addHours(event_time_col, self.zone_time) >= start_date, + func.addHours(event_time_col, self.zone_time) <= self.end_date, + ] + else: + where = [ + func.addHours(event_time_col, self.zone_time) >= self.start_date, + func.addHours(event_time_col, self.zone_time) <= self.end_date,] + else: + where = [ + func.addHours(event_time_col, self.zone_time) >= self.start_date, + func.addHours(event_time_col, self.zone_time) <= self.end_date, + ] if event_name != '*': where.append(event_name_col == event_name) event_filter, user_filter = await self.handler_filts((event['filts'], event.get('relation', 'and')), @@ -701,19 +735,29 @@ ORDER BY level elif event.get('quota'): event_attr_col = getattr(self.event_tbl.c, event['quota']) if self.time_particle == 'total': - qry = sa.select(e_account_id_col, - settings.CK_FUNC[analysis](event_attr_col).label('values')) \ - 
.where(and_(*where)) \ - .group_by(*self.groupby, e_account_id_col) + if analysis == 'uniqExact': + # 去重数 合计 + qry = sa.select(e_account_id_col, + event_attr_col.label('values')) \ + .where(and_(*where)) \ + .group_by(*self.groupby, e_account_id_col, event_attr_col) + else: + qry = sa.select(e_account_id_col, + settings.CK_FUNC[analysis](event_attr_col).label('values')) \ + .where(and_(*where)) \ + .group_by(*self.groupby, e_account_id_col) else: - # qry = sa.select(event_date_col, e_account_id_col, - # settings.CK_FUNC[analysis](event_attr_col).label('values')) \ - # .where(and_(*where)) \ - # .group_by(event_date_col, *self.groupby, e_account_id_col) - qry = sa.select(event_date_col, e_account_id_col, - settings.CK_FUNC[analysis](event_attr_col).label('values')) \ - .where(and_(*where)) \ - .group_by(event_date_col, e_account_id_col) + if analysis == 'uniqExact': + # 去重数 + qry = sa.select(event_date_col, e_account_id_col, + event_attr_col.label('values')) \ + .where(and_(*where)) \ + .group_by(event_date_col, e_account_id_col, event_attr_col) + else: + qry = sa.select(event_date_col, e_account_id_col, + settings.CK_FUNC[analysis](event_attr_col).label('values')) \ + .where(and_(*where)) \ + .group_by(event_date_col, e_account_id_col) sql = str(qry.compile(compile_kwargs={"literal_binds": True})) columnName = event.get('label_id', '') if columnName != '': diff --git a/models/user_label.py b/models/user_label.py index 88b4075..a482dfb 100644 --- a/models/user_label.py +++ b/models/user_label.py @@ -43,8 +43,8 @@ class UserClusterDef: res_json = await self.rdb.get(f'{self.game}_user') columns = json.loads(res_json).keys() metadata = sa.MetaData(schema=self.game) - self.user_tbl = sa.Table('user_view', metadata, *[sa.Column(column) for column in columns]) - + #self.user_tbl = sa.Table('user_view', metadata, *[sa.Column(column) for column in columns])修改了这里,这是原本的 + self.user_tbl = sa.Table('event', metadata, *[sa.Column(column) for column in columns]) 
self.u_account_id_col = getattr(self.user_tbl.c, '#account_id') self.e_account_id_col = getattr(self.event_tbl.c, '#account_id') self.account_id_col = sa.Column('#account_id') @@ -185,10 +185,16 @@ class UserClusterDef: func.round(getattr(func, analysis)(getattr(self.event_tbl.c, quota)), 2).label( 'values') ] - qry_tmp = sa.select(self.account_id_col).select_from( - sa.select(selectd).where(*date_where, *event_name_where, *data_where).group_by( - self.e_account_id_col).having( - settings.CK_CALC_SYMBO[uce_calcu_symbol](sa.Column('values'), *num))) + if len(num) >1:#处理区间筛选的问题 + qry_tmp = sa.select(self.account_id_col).select_from( + sa.select(selectd).where(*date_where, *event_name_where, *data_where).group_by( + self.e_account_id_col).having(sa.and_(sa.Column('values') > num[0],sa.Column('values') <= num[1]) + )) + else: + qry_tmp = sa.select(self.account_id_col).select_from( + sa.select(selectd).where(*date_where, *event_name_where, *data_where).group_by( + self.e_account_id_col).having( + settings.CK_CALC_SYMBO[uce_calcu_symbol](sa.Column('values'), *num))) else: selectd = [self.account_id_col] qry_tmp = sa.select(self.account_id_col).select_from( diff --git a/schemas/sql.py b/schemas/sql.py index 84613d7..6a4ddc5 100644 --- a/schemas/sql.py +++ b/schemas/sql.py @@ -13,6 +13,7 @@ class CkQuery(BaseModel): events: Union[List[dict], dict] = None report_id: str = None ext_filter: dict = dict() + time : str = None class Ck_seek_user(BaseModel): user_arrt_title: str # 用户属性 diff --git a/utils/func.py b/utils/func.py index 8dee789..2f33561 100644 --- a/utils/func.py +++ b/utils/func.py @@ -1,7 +1,12 @@ + import random import time import datetime import pandas as pd + +from datetime import timedelta +from datetime import datetime as p1 +import calendar def get_uid(): return hex(int(time.time() * 10 ** 7) + random.randint(0, 10000))[2:] @@ -149,4 +154,45 @@ def create_neidf(resp,columnName): columns.insert(0, '全部用户数') columns.insert(0, columnName) df = 
def get_week(date_str=None):
    """Return the (start, end) datetimes of the Sunday-based week containing *date_str*.

    The week runs Sunday 00:00:00 .. Saturday 23:59:59 — this matches
    ClickHouse's default ``toStartOfWeek`` bucketing (mode 0, weeks start on
    Sunday), which is what callers in ``scatter_model_sql`` rely on.

    :param date_str: date string 'YYYY-MM-DD'; defaults to "now" when omitted
    :return: tuple ``(week_start_datetime, week_end_datetime)``
    """
    if date_str and isinstance(date_str, str):
        # Shift one day forward so the Monday-based weekday() arithmetic
        # below lands on the preceding (or same) Sunday.  The original code
        # round-tripped datetime -> str -> datetime here for no benefit.
        now_time = p1.strptime(date_str + " 00:00:00", "%Y-%m-%d %H:%M:%S") + timedelta(days=1)
    else:
        # Fix: the original fed this datetime back through strptime(), which
        # expects a str and would raise TypeError on the no-argument path.
        now_time = p1.now().replace(hour=0, minute=0, second=0, microsecond=0)
    # Sunday that starts the week containing the (shifted) date; the
    # hour/minute/second terms zero out any time-of-day component.
    week_start_time = now_time - timedelta(days=now_time.weekday() + 1,
                                           hours=now_time.hour,
                                           minutes=now_time.minute,
                                           seconds=now_time.second)
    # Last second of the same week: Saturday 23:59:59.
    week_end_time = week_start_time + timedelta(days=6, hours=23, minutes=59, seconds=59)
    return week_start_time, week_end_time


def strptime(date_string):
    """Parse a full timestamp string into a ``datetime.datetime``.

    :param date_string: e.g. '2022-05-29 23:59:59'
    :return: datetime(2022, 5, 29, 23, 59, 59)
    """
    return p1.strptime(date_string, '%Y-%m-%d %H:%M:%S')


def strptime1(date_str):
    """Parse a date-only string into a ``datetime.datetime`` at midnight.

    :param date_str: e.g. '2022-05-29'
    :return: datetime(2022, 5, 29, 0, 0, 0)
    """
    return p1.strptime(date_str + " 00:00:00", "%Y-%m-%d %H:%M:%S")


def start_end_month(time):
    """Return the first and last second of the month containing *time*.

    :param time: date string 'YYYY-MM-DD'
    :return: tuple ``(first-of-month 00:00:00, last-of-month 23:59:59)``
    """
    now = p1.strptime(time + " 00:00:00", "%Y-%m-%d %H:%M:%S")
    this_month_start = datetime.datetime(now.year, now.month, 1)
    # calendar.monthrange -> (weekday of day 1, number of days in month);
    # index [1] gives the month length, correct for leap years too.
    last_day = calendar.monthrange(now.year, now.month)[1]
    this_month_end = datetime.datetime(now.year, now.month, last_day) + timedelta(hours=23, minutes=59, seconds=59)
    return this_month_start, this_month_end