224 lines
7.4 KiB
Python
224 lines
7.4 KiB
Python
from typing import Tuple
|
||
|
||
import arrow
|
||
import sqlalchemy as sa
|
||
import json
|
||
|
||
from fastapi import Depends
|
||
|
||
import pandas as pd
|
||
|
||
from sqlalchemy import func, or_, and_, not_
|
||
|
||
import crud
|
||
import schemas
|
||
from core.config import settings
|
||
from db import get_database
|
||
from db.redisdb import get_redis_pool, RedisDrive
|
||
|
||
|
||
class UserAnalysis:
    """Build an aggregate SQL query over a game's ``user_view`` table.

    Typical use: construct the object, ``await init()`` (which loads the
    table columns and the query settings), then call ``property_model()``
    to obtain the compiled SQL text.  ``user_tbl`` and ``groupby`` stay
    ``None`` until ``init()`` has run, so ``property_model()`` must not be
    called before it.
    """

    def __init__(self, game: str, data_in: schemas.CkQuery, rdb: RedisDrive = Depends(get_redis_pool)):
        """Store the request and set defaults; no I/O happens here.

        :param game: game identifier; used as the table schema name, as part
            of the redis key and as the key into ``data_where``.
        :param data_in: the incoming query (reads ``eventView``, ``events``,
            ``ext_filter`` and, later, ``report_id``).
        :param rdb: redis handle, by default resolved through ``Depends``.
        """
        self.game = game
        self.rdb = rdb
        self.user_tbl = None  # built by _init_table()
        self.event_view = data_in.eventView
        self.events = data_in.events

        self.zone_time: int = 0
        self.data_in = data_in

        self.global_filters = []
        self.groupby = None
        self.time_particle = None
        self.date_range = None  # never assigned anywhere else in this class
        self.unit_num = None
        self.global_relation = 'and'

        # Tolerate a missing/None ext_filter instead of failing on ``.get``.
        ext_filter = self.data_in.ext_filter or {}
        self.ext_filters = (ext_filter.get('filts', []), ext_filter.get('relation', 'and'))

    async def init(self, *args, **kwargs):
        """Load the query definition and table metadata.

        When ``data_in.report_id`` is set, ``eventView`` and ``events`` are
        taken from the stored report's ``query``; otherwise from ``data_in``.

        :param kwargs: optional ``data_where`` mapping of game -> list of
            filter items, appended to ``global_filters``.
        """
        if self.data_in.report_id:
            db = get_database()
            report = await crud.report.get(db, id=self.data_in.report_id)
            self.event_view = report['query']['eventView']
            self.events = report['query']['events']
        else:
            self.event_view = self.data_in.eventView
            self.events = self.data_in.events

        await self._init_table()
        self.zone_time = self._get_zone_time()
        self.time_particle = self._get_time_particle_size()
        self.groupby = self._get_group_by()
        self.unit_num = self._get_unit_num()
        self.global_relation = self.event_view.get('relation', 'and')

        # Filters attached to the user, supplied by the caller per game.
        if 'data_where' in kwargs:
            self.global_filters.extend(kwargs['data_where'].get(self.game, []))

    async def _init_table(self):
        """Fetch the table's column names from redis and build the table.

        Reads the JSON object stored under ``'<game>_user'``; its keys become
        the columns of ``user_view`` in the schema named after the game.
        """
        res_json = await self.rdb.get(f'{self.game}_user')
        columns = json.loads(res_json).keys()
        metadata = sa.MetaData(schema=self.game)
        self.user_tbl = sa.Table('user_view', metadata, *[sa.Column(column) for column in columns])

    def _get_time_particle_size(self):
        """Return ``eventView['timeParticleSize']``, or ``'P1D'`` if unset/empty."""
        return self.event_view.get('timeParticleSize') or 'P1D'

    def _get_unit_num(self):
        """Return ``eventView['unitNum']`` (``None`` when absent)."""
        return self.event_view.get('unitNum')

    def _get_group_by(self):
        """Return the ``user_tbl`` columns named by ``eventView['groupBy']``."""
        return [getattr(self.user_tbl.c, item['columnName']) for item in self.event_view.get('groupBy', [])]

    def _get_zone_time(self):
        """Return ``eventView['zone_time']`` as an int, defaulting to 8."""
        return int(self.event_view.get('zone_time', 8))

    @staticmethod
    def _build_condition(col, comparator, ftv):
        """Translate one comparator + value list into a SQL condition.

        :return: the condition, or ``None`` for a comparator that is not
            recognised (such items are skipped by the caller).
        """
        if comparator == '==':
            # Several values for equality mean "equal to any of them".
            if len(ftv) > 1:
                return or_(*[col == v for v in ftv])
            return col == ftv[0]
        if comparator == '>=':
            return col >= ftv[0]
        if comparator == '<=':
            return col <= ftv[0]
        if comparator == '>':
            return col > ftv[0]
        if comparator == '<':
            return col < ftv[0]
        if comparator == 'is not null':
            return col.isnot(None)
        if comparator == 'is null':
            return col.is_(None)
        if comparator == 'like':
            return col.like(f'%{ftv[0]}%')
        if comparator == 'not like':
            return col.notlike(f'%{ftv[0]}%')
        if comparator == 'in':
            return col.in_(ftv)
        if comparator == '!=':
            return col != ftv[0]
        return None

    def handler_filts(self, *filters: Tuple[list, str]) -> list:
        """Turn filter groups into SQL conditions, one per non-empty group.

        :param filters: any number of ``(filts, relation)`` pairs. ``filts``
            is a list of dicts with ``columnName``, ``comparator``, ``ftv``
            and optionally ``data_type``. The items of a group are joined
            with AND when ``relation == 'and'`` and with OR for any other
            value (including ``None``).
        :return: list of combined conditions; empty groups contribute nothing.
        """
        user_filters = []
        for filts, relation in (group[:2] for group in filters):
            conditions = []
            for item in filts:
                col = sa.Column(item['columnName'])
                if item.get('data_type') == 'datetime':
                    # Shift datetime columns by the configured hour offset
                    # (rendered as the SQL function addHours).
                    col = func.addHours(col, self.zone_time)

                condition = self._build_condition(col, item['comparator'], item['ftv'])
                if condition is not None:
                    conditions.append(condition)

            if not conditions:
                continue
            combine = and_ if relation == 'and' else or_
            user_filters.append(combine(*conditions))

        return user_filters

    def property_model(self):
        """Compile the aggregate query described by ``self.events``.

        ``events['analysis']`` selects the aggregate: ``'trig_user_num'`` is a
        plain row count, ``'distinct_count'`` counts distinct values of the
        quota column, and any other value is used as the name of a SQL
        function applied to the quota column and rounded to 2 places.

        :return: dict with ``sql`` (statement text with literals inlined),
            ``groupby`` (keys of the group-by columns) and ``quota``.
        """
        event = self.events
        quota = event['quota']
        # Aggregation mode.
        analysis = event['analysis']

        if analysis == 'trig_user_num':
            value = func.count()
        elif analysis == 'distinct_count':
            value = func.count(sa.distinct(getattr(self.user_tbl.c, quota)))
        else:
            # NOTE(review): ``analysis`` is used verbatim as a SQL function
            # name and the statement is compiled with literal binds below;
            # confirm that the request is validated upstream.
            value = func.round(getattr(func, analysis)(getattr(self.user_tbl.c, quota)), 2)
        value = value.label('values')

        # NOTE(review): a missing event 'relation' yields None, which
        # handler_filts treats as OR, while the other two groups default to
        # 'and' -- confirm this is intended.
        where = self.handler_filts(
            (event['filts'], event.get('relation')),
            (self.global_filters, self.global_relation),
            self.ext_filters,
        )

        qry = sa.select((*self.groupby, value)).where(*where)
        qry = qry.group_by(*self.groupby)
        qry = qry.order_by(sa.Column('values').desc())
        qry = qry.limit(1000)

        sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
        print(sql)

        return {
            'sql': sql,
            'groupby': [i.key for i in self.groupby],
            'quota': quota,
        }
|