xbackend/models/user_analysis.py
2021-11-18 15:53:30 +08:00

224 lines
7.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from typing import Tuple
import arrow
import sqlalchemy as sa
import json
from fastapi import Depends
import pandas as pd
from sqlalchemy import func, or_, and_, not_
import crud
import schemas
from core.config import settings
from db import get_database
from db.redisdb import get_redis_pool, RedisDrive
class UserAnalysis:
def __init__(self, game: str, data_in: schemas.CkQuery, rdb: RedisDrive = Depends(get_redis_pool)):
self.game = game
self.rdb = rdb
self.user_tbl = None
self.event_view = data_in.eventView
self.events = data_in.events
self.zone_time: int = 0
self.data_in = data_in
self.global_filters = []
self.groupby = None
self.time_particle = None
self.date_range = None
self.unit_num = None
self.global_relation = 'and'
self.ext_filters = (self.data_in.ext_filter.get('filts', []), self.data_in.ext_filter.get('relation', 'and'))
async def init(self, *args, **kwargs):
if self.data_in.report_id:
db = get_database()
report = await crud.report.get(db, id=self.data_in.report_id)
self.event_view = report['query']['eventView']
self.events = report['query']['events']
else:
self.event_view = self.data_in.eventView
self.events = self.data_in.events
await self._init_table()
self.zone_time = self._get_zone_time()
self.time_particle = self._get_time_particle_size()
self.groupby = self._get_group_by()
self.unit_num = self._get_unit_num()
self.global_relation = self.event_view.get('relation', 'and')
# 用户自带过滤
if 'data_where' in kwargs:
self.global_filters.extend(kwargs['data_where'].get(self.game, []))
async def _init_table(self):
"""
从redis中取出表字段构建表结构
:return:
"""
res_json = await self.rdb.get(f'{self.game}_user')
columns = json.loads(res_json).keys()
metadata = sa.MetaData(schema=self.game)
self.user_tbl = sa.Table('user_view', metadata, *[sa.Column(column) for column in columns])
def _get_time_particle_size(self):
return self.event_view.get('timeParticleSize') or 'P1D'
def _get_unit_num(self):
return self.event_view.get('unitNum')
def _get_group_by(self):
return [getattr(self.user_tbl.c, item['columnName']) for item in self.event_view.get('groupBy', [])]
def _get_zone_time(self):
return int(self.event_view.get('zone_time', 8))
# def _get_filters(self, filters):
# tbl = self.user_tbl
# where = []
# for item in filters:
# col = getattr(tbl.c, item['columnName'])
#
# comparator = item['comparator']
# ftv = item['ftv']
# if comparator == '==':
# if len(ftv) > 1:
# where.append(or_(*[col == v for v in ftv]))
# else:
# where.append(col == ftv[0])
# elif comparator == '>=':
# where.append(col >= ftv[0])
# elif comparator == '<=':
# where.append(col <= ftv[0])
# elif comparator == '>':
# where.append(col > ftv[0])
# elif comparator == '<':
# where.append(col < ftv[0])
#
# elif comparator == 'is not null':
# where.append(col.isnot(None))
# elif comparator == 'is null':
# where.append(col.is_(None))
#
# elif comparator == '!=':
# where.append(col != ftv[0])
#
# elif comparator == 'like':
# where.append(col.like(f'%{ftv[0]}%'))
#
# elif comparator == 'not like':
# where.append(col.notlike(f'%{ftv[0]}%'))
#
# elif comparator == 'in':
# where.append(col.in_(ftv))
#
#
# return where
def handler_filts(self, *filters):
"""
:param filters: (filts:list,relation:str)
:param g_f:
:param relation:
:return:
"""
user_filters = []
for filter in filters:
filts = filter[0]
relation = filter[1]
user_filter = []
for item in filts:
where = user_filter
col = sa.Column(item['columnName'])
if item.get('data_type') == 'datetime':
col = func.addHours(col, self.zone_time)
comparator = item['comparator']
ftv = item['ftv']
if comparator == '==':
if len(ftv) > 1:
where.append(or_(*[col == v for v in ftv]))
else:
where.append(col == ftv[0])
elif comparator == '>=':
where.append(col >= ftv[0])
elif comparator == '<=':
where.append(col <= ftv[0])
elif comparator == '>':
where.append(col > ftv[0])
elif comparator == '<':
where.append(col < ftv[0])
elif comparator == 'is not null':
where.append(col.isnot(None))
elif comparator == 'is null':
where.append(col.is_(None))
elif comparator == 'like':
where.append(col.like(f'%{ftv[0]}%'))
elif comparator == 'not like':
where.append(col.notlike(f'%{ftv[0]}%'))
elif comparator == 'in':
where.append(col.in_(ftv))
elif comparator == '!=':
where.append(col != ftv[0])
if relation == 'and':
if user_filter:
user_filters.append(and_(*user_filter))
else:
if user_filter:
user_filters.append(or_(*user_filter))
return user_filters
def property_model(self):
event = self.events
selectd = getattr(self.user_tbl.c, event['quota'])
qry = sa.select(selectd)
account_id_col = getattr(self.user_tbl.c, '#account_id')
binduid_col = getattr(self.user_tbl.c, '#account_id')
# 聚合方式
analysis = event['analysis']
if analysis == 'trig_user_num':
selectd = [func.count().label('values')]
elif analysis == 'distinct_count':
selectd = [
func.count(sa.distinct(getattr(self.user_tbl.c, event['quota']))).label('values')]
else:
selectd = [
func.round(getattr(func, analysis)(getattr(self.user_tbl.c, event['quota'])), 2).label(
'values')]
where = self.handler_filts((event['filts'], event.get('relation')),
(self.global_filters, self.global_relation),
self.ext_filters
)
qry = sa.select((*self.groupby, *selectd)).where(*where)
qry = qry.group_by(*self.groupby)
qry = qry.order_by(sa.Column('values').desc())
qry = qry.limit(1000)
sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
print(sql)
result = {'sql': sql,
'groupby': [i.key for i in self.groupby],
'quota': event['quota']
}
return result