import json
from typing import Tuple

import arrow
import pandas as pd
import sqlalchemy as sa
from fastapi import Depends
from sqlalchemy import func, or_, and_, not_

import crud
import schemas
from core.config import settings
from db import get_database
from db.redisdb import get_redis_pool, RedisDrive


class UserAnalysis:
    """Build an aggregate SQL statement over a game's ``user_view`` table.

    Typical use: construct the object, ``await init()`` to load the query
    definition and table columns, then call ``property_model()`` to get the
    compiled SQL string together with the group-by keys and quota name.
    """

    def __init__(self, game: str, data_in: schemas.CkQuery, rdb: RedisDrive = Depends(get_redis_pool)):
        """Store the request and set defaults; real setup happens in ``init``.

        :param game: game identifier; used as the table schema and as the
            prefix of the redis key holding the column list.
        :param data_in: incoming query; ``eventView``, ``events``,
            ``report_id`` and ``ext_filter`` are read from it.
        :param rdb: redis driver used to fetch the table columns.
        """
        self.game = game
        self.rdb = rdb
        self.user_tbl = None
        self.event_view = data_in.eventView
        self.events = data_in.events
        self.zone_time: int = 0
        self.data_in = data_in
        self.global_filters = []
        self.groupby = None
        self.time_particle = None
        self.date_range = None
        self.unit_num = None
        self.global_relation = 'and'
        # (filter items, relation) pair in the shape handler_filts expects.
        self.ext_filters = (
            self.data_in.ext_filter.get('filts', []),
            self.data_in.ext_filter.get('relation', 'and'),
        )

    async def init(self, *args, **kwargs):
        """Load the query definition and derive all per-query settings.

        When ``data_in.report_id`` is set, ``eventView`` and ``events`` are
        taken from the stored report's ``query``; otherwise they come from
        ``data_in`` itself.

        :param kwargs: may contain ``data_where``, a mapping whose entry for
            ``self.game`` (if any) is appended to ``self.global_filters``.
        """
        if self.data_in.report_id:
            db = get_database()
            report = await crud.report.get(db, id=self.data_in.report_id)
            self.event_view = report['query']['eventView']
            self.events = report['query']['events']
        else:
            self.event_view = self.data_in.eventView
            self.events = self.data_in.events

        await self._init_table()
        self.zone_time = self._get_zone_time()
        self.time_particle = self._get_time_particle_size()
        self.groupby = self._get_group_by()
        self.unit_num = self._get_unit_num()
        self.global_relation = self.event_view.get('relation', 'and')

        # Filters supplied on behalf of the user via the ``data_where`` kwarg.
        if 'data_where' in kwargs:
            self.global_filters.extend(kwargs['data_where'].get(self.game, []))

    async def _init_table(self):
        """Fetch the table's column names from redis and build the table.

        Reads the JSON object stored under ``'<game>_user'``; its keys become
        the columns of ``self.user_tbl`` (``user_view`` in schema ``game``).
        """
        res_json = await self.rdb.get(f'{self.game}_user')
        columns = json.loads(res_json).keys()
        metadata = sa.MetaData(schema=self.game)
        self.user_tbl = sa.Table('user_view', metadata, *[sa.Column(column) for column in columns])

    def _get_time_particle_size(self):
        """Return ``timeParticleSize`` from the event view, or ``'P1D'`` if unset/empty."""
        return self.event_view.get('timeParticleSize') or 'P1D'

    def _get_unit_num(self):
        """Return ``unitNum`` from the event view (``None`` when absent)."""
        return self.event_view.get('unitNum')

    def _get_group_by(self):
        """Return the ``user_tbl`` columns named by the event view's ``groupBy`` items."""
        return [getattr(self.user_tbl.c, item['columnName']) for item in self.event_view.get('groupBy', [])]

    def _get_zone_time(self):
        """Return the event view's ``zone_time`` as an int, defaulting to 8."""
        return int(self.event_view.get('zone_time', 8))

    @staticmethod
    def _build_condition(col, comparator, ftv):
        """Translate one comparator into a SQL expression on ``col``.

        :param col: column (or column expression) to compare.
        :param comparator: comparator string from the filter item.
        :param ftv: list of filter values; only the first is used except for
            ``'=='`` with several values and ``'in'``.
        :return: the expression, or ``None`` for an unrecognised comparator.
        """
        if comparator == '==':
            # Several values under '==' mean "equals any of them".
            if len(ftv) > 1:
                return or_(*[col == v for v in ftv])
            return col == ftv[0]
        if comparator == '>=':
            return col >= ftv[0]
        if comparator == '<=':
            return col <= ftv[0]
        if comparator == '>':
            return col > ftv[0]
        if comparator == '<':
            return col < ftv[0]
        if comparator == 'is not null':
            return col.isnot(None)
        if comparator == 'is null':
            return col.is_(None)
        if comparator == 'like':
            return col.like(f'%{ftv[0]}%')
        if comparator == 'not like':
            return col.notlike(f'%{ftv[0]}%')
        if comparator == 'in':
            return col.in_(ftv)
        if comparator == '!=':
            return col != ftv[0]
        return None

    def handler_filts(self, *filters):
        """Convert filter groups into SQL boolean clauses.

        :param filters: any number of ``(filts, relation)`` pairs. ``filts``
            is a list of dicts with ``columnName``, ``comparator``, ``ftv``
            and optionally ``data_type``; ``relation`` joins the items of
            that group with AND when it equals ``'and'`` and with OR for any
            other value.
        :return: list with one combined clause per group that produced at
            least one condition; empty groups contribute nothing.
        """
        user_filters = []
        for filts, relation in ((group[0], group[1]) for group in filters):
            conditions = []
            for item in filts:
                col = sa.Column(item['columnName'])
                if item.get('data_type') == 'datetime':
                    # Datetime columns are compared after applying the SQL
                    # function addHours with the configured zone_time.
                    col = func.addHours(col, self.zone_time)
                condition = self._build_condition(col, item['comparator'], item['ftv'])
                # Compare with None explicitly: SQL expressions have no
                # usable truth value. Unknown comparators are skipped.
                if condition is not None:
                    conditions.append(condition)

            if not conditions:
                continue
            combine = and_ if relation == 'and' else or_
            user_filters.append(combine(*conditions))
        return user_filters

    def property_model(self):
        """Compile the aggregate query described by ``self.events``.

        The aggregate is selected by ``event['analysis']``:
        ``'trig_user_num'`` counts rows, ``'distinct_count'`` counts distinct
        quota values, and any other value is used as the name of a SQL
        function applied to the quota column and rounded to 2 places. Rows
        are grouped by ``self.groupby``, ordered by the aggregate descending
        and limited to 1000.

        :return: dict with ``sql`` (statement with literal binds),
            ``groupby`` (keys of the group-by columns) and ``quota``.
        :raises AttributeError: if ``event['quota']`` is not a column of
            ``user_tbl``.
        """
        event = self.events
        # Resolved up front so an unknown quota fails for every analysis type.
        quota_col = getattr(self.user_tbl.c, event['quota'])

        # Aggregation method
        analysis = event['analysis']
        if analysis == 'trig_user_num':
            selectd = [func.count().label('values')]
        elif analysis == 'distinct_count':
            selectd = [func.count(sa.distinct(quota_col)).label('values')]
        else:
            selectd = [func.round(getattr(func, analysis)(quota_col), 2).label('values')]

        # A missing event relation defaults to 'and', matching the defaults
        # used for the global and ext filters.
        conditions = self.handler_filts(
            (event['filts'], event.get('relation', 'and')),
            (self.global_filters, self.global_relation),
            self.ext_filters,
        )

        qry = sa.select((*self.groupby, *selectd)).where(*conditions)
        qry = qry.group_by(*self.groupby)
        qry = qry.order_by(sa.Column('values').desc())
        qry = qry.limit(1000)

        sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
        print(sql)
        return {
            'sql': sql,
            'groupby': [i.key for i in self.groupby],
            'quota': event['quota'],
        }