""" 本质查出符合条件的用户id 得到sql 查uid """ import re from typing import Tuple import arrow import sqlalchemy as sa import json from fastapi import Depends import pandas as pd from sqlalchemy import func, or_, and_, not_ import crud import schemas from core.config import settings from db import get_database from db.redisdb import get_redis_pool, RedisDrive class UserClusterDef: def __init__(self, game: str, cluster_name: str, data_where: list = None, rdb: RedisDrive = get_redis_pool(), **kwargs): self.game = game self.rdb = rdb self.cluster_name = cluster_name self.event_tbl = None self.data_where = data_where or [] self.kwargs = kwargs async def _init_tal(self): res_json = await self.rdb.get(f'{self.game}_event') columns = json.loads(res_json).keys() metadata = sa.MetaData(schema=self.game) self.event_tbl = sa.Table('event', metadata, *[sa.Column(column) for column in columns]) res_json = await self.rdb.get(f'{self.game}_user') columns = json.loads(res_json).keys() metadata = sa.MetaData(schema=self.game) # self.user_tbl = sa.Table('user_view', metadata, *[sa.Column(column) for column in columns]) # 修改了这里,这是原本的 self.user_tbl = sa.Table('event', metadata, *[sa.Column(column) for column in columns]) self.u_account_id_col = getattr(self.user_tbl.c, '#account_id') self.e_account_id_col = getattr(self.event_tbl.c, '#account_id') self.account_id_col = sa.Column('#account_id') async def init(self): self.data_in = ( await crud.user_label.find_one(get_database(), {'cluster_name': self.cluster_name, 'game': self.game}, {'qp': 1})).get('qp') await self._init_tal() self.events = self.data_in['user_cluster_def']['events'] self.event_relation = self.data_in['user_cluster_def']['event_relation'] async def handler_filts(self, *filters): """ :param filters: (filts:list,relation:str) :param g_f: :param relation: :return: """ user_filters = [] event_filters = [] for filter in filters: filts = filter[0] relation = filter[1] user_filter = [] event_filter = [] for item in filts: comparator = item['comparator'] if item['tableType'] == 'user': where = user_filter elif item['tableType'] == 'event': where = event_filter else: continue tbl = getattr(self, f'{item["tableType"]}_tbl') col = getattr(tbl.c, item['columnName']) ftv = item['ftv'] if comparator == '==': if len(ftv) > 1: where.append(or_(*[col == v for v in ftv])) else: where.append(col == ftv[0]) elif comparator == '>=': where.append(col >= ftv[0]) elif comparator == '<=': where.append(col <= ftv[0]) elif comparator == '>': where.append(col > ftv[0]) elif comparator == '<': where.append(col < ftv[0]) elif comparator == 'is not null': where.append(col.isnot(None)) elif comparator == 'is null': where.append(col.is_(None)) elif comparator == 'like': where.append(col.like(f'%{ftv[0]}%')) elif comparator == 'not like': where.append(col.notlike(f'%{ftv[0]}%')) elif comparator == 'in': where.append(col.in_(ftv)) elif comparator == '!=': where.append(col != ftv[0]) if relation == 'and': if event_filter: event_filters.append(and_(*event_filter)) if user_filter: user_filters.append(and_(*user_filter)), else: if event_filter: event_filters.append(or_(*event_filter)) if user_filter: user_filters.append(or_(*user_filter)) return event_filters, user_filters def to_sql_qry(self): qry = None for event in self.events: event_name = event['event_name'] event_name_col = getattr(self.event_tbl.c, '#event_name') analysis = event['prop_quota']['analysis'] quota = event['prop_quota']['quota'] num = event['num'].split(',') date_type = event.get('date_type', 'dynamic') e_days = event.get('e_days') s_days = event.get('s_days') is_touch = event.get('is_touch', True) filts = event['filts'] zone = event.get('zone', 8) # 账号数据过滤 data_where = [] filters = [] filters.extend(self.data_where) for item in filters: tmp = settings.CK_CALC_SYMBO[item['comparator']](sa.Column(item['columnName']), item['ftv']) data_where.append(tmp) event_time_col = func.addHours(getattr(self.event_tbl.c, '#event_time'), zone) date_where = [] if date_type == 'static': start_time = event['start_time'] end_time = event['end_time'] date_where.extend( [settings.CK_CALC_SYMBO['>='](event_time_col, start_time), settings.CK_CALC_SYMBO['<='](event_time_col, end_time)] ) elif date_type == 'dynamic': start_time = arrow.get().shift(days=-int(s_days)).strftime('%Y-%m-%d 00:00:00') end_time = arrow.get().shift(days=-int(e_days)).strftime('%Y-%m-%d 23:59:59') date_where.extend( [settings.CK_CALC_SYMBO['>='](event_time_col, start_time), settings.CK_CALC_SYMBO['<='](event_time_col, end_time)] ) else: # 所有时间 pass uce_calcu_symbol = event['uce_calcu_symbol'] event_name_where = [] if event_name != '*': # 任意事件 event_name_where.append(settings.CK_CALC_SYMBO['=='](event_name_col, event_name)) if quota != '*': selectd = [self.account_id_col, func.round(getattr(func, analysis)(getattr(self.event_tbl.c, quota)), 2).label( 'values') ] if len(num) >1:#处理区间筛选的问题 qry_tmp = sa.select(self.account_id_col).select_from( sa.select(selectd).where(*date_where, *event_name_where, *data_where).group_by( self.e_account_id_col).having(sa.and_(sa.Column('values') > num[0],sa.Column('values') <= num[1]) )) else: qry_tmp = sa.select(self.account_id_col).select_from( sa.select(selectd).where(*date_where, *event_name_where, *data_where).group_by( self.e_account_id_col).having( settings.CK_CALC_SYMBO[uce_calcu_symbol](sa.Column('values'), *num))) else: selectd = [self.account_id_col] qry_tmp = sa.select(self.account_id_col).select_from( sa.select(selectd).where(*date_where, *event_name_where, *data_where)) if qry is None: qry = qry_tmp else: if self.event_relation == 'and': qry = sa.select(self.account_id_col).select_from( sa.join(qry, qry_tmp, getattr(qry.c, '#account_id') == getattr(qry_tmp.c, '#account_id'))) elif self.event_relation == 'or': qry = sa.select(sa.distinct(self.account_id_col)).select_from(sa.union_all(qry, qry_tmp)) # 处理没做过 if not is_touch: qry = sa.select(self.u_account_id_col).where(self.u_account_id_col.notin_(qry)) return qry def to_sql(self): qry = self.to_sql_qry() sql = str(qry.compile(compile_kwargs={"literal_binds": True})) print(sql) return sql def cluster_user_list(self): sub_qry = self.to_sql_qry() page = self.kwargs.get('page') or 1 page -= 1 limit = self.kwargs.get('limit', 50) qry = sa.select('*').where(self.u_account_id_col.in_(sub_qry)).order_by(sa.Column('#reg_time')) \ .offset(page * limit) \ .limit(limit) sql = str(qry.compile(compile_kwargs={"literal_binds": True})) print(sql) return sql def cluster_user_count(self): sub_qry = self.to_sql_qry() qry = sa.select(func.count(self.account_id_col).label('values')).select_from(sub_qry) sql = str(qry.compile(compile_kwargs={"literal_binds": True})) print(sql) return sql