xbackend/models/user_label.py
2021-10-25 17:08:33 +08:00

234 lines
8.6 KiB
Python

"""
本质查出符合条件的用户id
得到sql 查uid
"""
import re
from typing import Tuple
import arrow
import sqlalchemy as sa
import json
from fastapi import Depends
import pandas as pd
from sqlalchemy import func, or_, and_, not_
import crud
import schemas
from core.config import settings
from db import get_database
from db.redisdb import get_redis_pool, RedisDrive
class UserClusterDef:
    """Translate a stored user-cluster definition into SQL selecting ``#account_id`` values.

    Queries are built over ``<game>.event`` and ``<game>.user_view``. Their column lists are read
    from redis by ``init``. ``await init()`` must be called before any ``to_sql*`` /
    ``cluster_user_*`` method.
    """

    def __init__(self, game: str, cluster_name: str, data_where: list = None, rdb: RedisDrive = get_redis_pool(),
                 **kwargs):
        """
        :param game: game name; used as the SQL schema and as the redis key prefix.
        :param cluster_name: ``cluster_name`` of the stored definition loaded by ``init``.
        :param data_where: account-level filters (dicts with ``comparator``/``columnName``/``ftv``),
            applied to every event sub-query.
        :param rdb: redis client used to read the table column lists.
            NOTE(review): the default is evaluated once, at import time, and shared by all instances.
        :param kwargs: paging options ``page`` (1-based) and ``limit`` read by ``cluster_user_list``.
        """
        self.game = game
        self.rdb = rdb
        self.cluster_name = cluster_name
        self.event_tbl = None
        self.user_tbl = None
        self.data_where = data_where or []
        self.kwargs = kwargs

    async def _load_table(self, redis_key: str, table_name: str) -> sa.Table:
        """Build a column-only table named ``table_name`` from the JSON object's keys at ``redis_key``."""
        res_json = await self.rdb.get(redis_key)
        columns = json.loads(res_json).keys()
        metadata = sa.MetaData(schema=self.game)
        return sa.Table(table_name, metadata, *[sa.Column(column) for column in columns])

    async def _init_tal(self):
        """Load the event/user table definitions and cache the ``#account_id`` columns."""
        self.event_tbl = await self._load_table(f'{self.game}_event', 'event')
        self.user_tbl = await self._load_table(f'{self.game}_user', 'user_view')
        self.u_account_id_col = getattr(self.user_tbl.c, '#account_id')
        self.e_account_id_col = getattr(self.event_tbl.c, '#account_id')
        # Table-less column: renders as a bare "#account_id", so it can name sub-query output.
        self.account_id_col = sa.Column('#account_id')

    async def init(self):
        """Load the cluster definition named ``self.cluster_name`` and prepare the table objects.

        :raises ValueError: if no stored definition has that ``cluster_name``.
        """
        doc = await crud.user_label.find_one(get_database(), {'cluster_name': self.cluster_name}, {'qp': 1})
        if doc is None:
            raise ValueError(f'user cluster {self.cluster_name!r} not found')
        self.data_in = doc.get('qp')
        await self._init_tal()
        cluster_def = self.data_in['user_cluster_def']
        self.events = cluster_def['events']
        self.event_relation = cluster_def['event_relation']

    @staticmethod
    def _filter_clause(col, comparator: str, ftv: list):
        """Map one front-end filter onto a SQLAlchemy clause; return None for unknown comparators."""
        if comparator == '==':
            return or_(*[col == v for v in ftv]) if len(ftv) > 1 else col == ftv[0]
        if comparator == '>=':
            return col >= ftv[0]
        if comparator == '<=':
            return col <= ftv[0]
        if comparator == '>':
            return col > ftv[0]
        if comparator == '<':
            return col < ftv[0]
        if comparator == 'is not null':
            return col.isnot(None)
        if comparator == 'is null':
            return col.is_(None)
        if comparator == 'like':
            return col.like(f'%{ftv[0]}%')
        if comparator == 'not like':
            return col.notlike(f'%{ftv[0]}%')
        if comparator == 'in':
            return col.in_(ftv)
        if comparator == '!=':
            return col != ftv[0]
        return None

    async def handler_filts(self, *filters):
        """Split filter groups into event-table and user-table clauses.

        :param filters: groups shaped ``(filts, relation)``, where ``filts`` is a list of dicts with
            ``comparator``, ``tableType`` ('user' or 'event'), ``columnName`` and ``ftv``, and
            ``relation`` is 'and' (AND the group's clauses) or anything else (OR them).
            Items with another ``tableType`` or an unknown comparator are ignored.
        :return: ``(event_filters, user_filters)``: one combined clause per non-empty group.
        """
        user_filters = []
        event_filters = []
        for group in filters:
            filts, relation = group[0], group[1]
            grouped = {'user': [], 'event': []}
            for item in filts:
                comparator = item['comparator']
                table_type = item['tableType']
                if table_type not in grouped:
                    continue
                tbl = getattr(self, f'{table_type}_tbl')
                col = getattr(tbl.c, item['columnName'])
                clause = self._filter_clause(col, comparator, item['ftv'])
                if clause is not None:
                    grouped[table_type].append(clause)
            combine = and_ if relation == 'and' else or_
            if grouped['event']:
                event_filters.append(combine(*grouped['event']))
            if grouped['user']:
                user_filters.append(combine(*grouped['user']))
        return event_filters, user_filters

    @staticmethod
    def _event_date_where(event: dict, event_time_col, zone) -> list:
        """Return the time-range clauses for one event ('static', 'dynamic', or no restriction)."""
        date_type = event.get('date_type', 'dynamic')
        if date_type == 'static':
            start_time = event['start_time']
            end_time = event['end_time']
        elif date_type == 'dynamic':
            # event_time_col is shifted by ``zone`` hours, so "today" must be taken in that
            # same zone; arrow.utcnow() alone gives the UTC date and is off by a day near midnight.
            now = arrow.utcnow().shift(hours=zone)
            start_time = now.shift(days=-int(event.get('s_days'))).strftime('%Y-%m-%d 00:00:00')
            end_time = now.shift(days=-int(event.get('e_days'))).strftime('%Y-%m-%d 23:59:59')
        else:
            # All time: no date restriction.
            return []
        return [settings.CK_CALC_SYMBO['>='](event_time_col, start_time),
                settings.CK_CALC_SYMBO['<='](event_time_col, end_time)]

    def _event_account_qry(self, event: dict, event_name_col, data_where: list):
        """Build the sub-query selecting the account ids that satisfy one event condition."""
        # NOTE(review): event['filts'] is part of the definition but is not applied here.
        analysis = event['prop_quota']['analysis']
        quota = event['prop_quota']['quota']
        num = event['num'].split(',')
        zone = event.get('zone', 8)
        event_time_col = func.addHours(getattr(self.event_tbl.c, '#event_time'), zone)
        date_where = self._event_date_where(event, event_time_col, zone)
        uce_calcu_symbol = event['uce_calcu_symbol']

        event_name = event['event_name']
        event_name_where = []
        # '*' means "any event": only restrict the event name otherwise.
        if event_name != '*':
            event_name_where.append(settings.CK_CALC_SYMBO['=='](event_name_col, event_name))

        if quota == '*':
            # No metric: every account with a matching event row qualifies.
            inner = sa.select([self.account_id_col]).where(*date_where, *event_name_where, *data_where)
            return sa.select(self.account_id_col).select_from(inner)

        # Aggregate the metric per account, keep accounts whose value passes the comparator.
        selectd = [self.account_id_col,
                   func.round(getattr(func, analysis)(getattr(self.event_tbl.c, quota)), 2).label('values')]
        inner = (sa.select(selectd)
                 .where(*date_where, *event_name_where, *data_where)
                 .group_by(self.e_account_id_col)
                 .having(settings.CK_CALC_SYMBO[uce_calcu_symbol](sa.Column('values'), *num)))
        return sa.select(self.account_id_col).select_from(inner)

    def to_sql_qry(self):
        """Combine every event condition into one query selecting ``#account_id``.

        Per-event sub-queries are joined on ``#account_id`` when ``event_relation`` is 'and', and
        unioned (distinct) when it is 'or'. An event with ``is_touch`` false contributes the users
        who did NOT match it. Returns None when the cluster defines no events.
        """
        if not self.events:
            return None
        event_name_col = getattr(self.event_tbl.c, '#event_name')
        # Account-level filters are identical for every event, so build them once.
        data_where = [settings.CK_CALC_SYMBO[item['comparator']](sa.Column(item['columnName']), item['ftv'])
                      for item in self.data_where]
        qry = None
        for event in self.events:
            qry_tmp = self._event_account_qry(event, event_name_col, data_where)
            if not event.get('is_touch', True):
                # "Did not do": complement of THIS event's account set, taken over all users,
                # before it is combined with the other events.
                qry_tmp = sa.select(self.u_account_id_col).where(self.u_account_id_col.notin_(qry_tmp))
            if qry is None:
                qry = qry_tmp
            elif self.event_relation == 'and':
                qry = sa.select(self.account_id_col).select_from(
                    sa.join(qry, qry_tmp, getattr(qry.c, '#account_id') == getattr(qry_tmp.c, '#account_id')))
            elif self.event_relation == 'or':
                qry = sa.select(sa.distinct(self.account_id_col)).select_from(sa.union_all(qry, qry_tmp))
        return qry

    @staticmethod
    def _compile_sql(qry) -> str:
        """Render ``qry`` with inlined literal values, print it (debug output) and return it."""
        sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
        print(sql)
        return sql

    def to_sql(self):
        """Return the SQL selecting the account ids of every user in the cluster."""
        return self._compile_sql(self.to_sql_qry())

    def cluster_user_list(self):
        """Return SQL for one page of ``user_view`` rows in the cluster, ordered by ``#reg_time``."""
        sub_qry = self.to_sql_qry()
        # ``page`` is 1-based; missing/0/negative values fall back to the first page.
        page = max((self.kwargs.get('page') or 1) - 1, 0)
        limit = self.kwargs.get('limit', 50)
        qry = (sa.select('*')
               .where(self.u_account_id_col.in_(sub_qry))
               .order_by(sa.Column('#reg_time'))
               .offset(page * limit)
               .limit(limit))
        return self._compile_sql(qry)

    def cluster_user_count(self):
        """Return SQL counting the cluster's account ids (result column ``values``)."""
        sub_qry = self.to_sql_qry()
        qry = sa.select(func.count(self.account_id_col).label('values')).select_from(sub_qry)
        return self._compile_sql(qry)