235 lines
8.6 KiB
Python
235 lines
8.6 KiB
Python
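"""
Build SQL that selects the IDs of the users who match a user-cluster definition.

The generated SQL is then used to query user IDs (uid).
"""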
|
|
|
|
import re
|
|
from typing import Tuple
|
|
|
|
import arrow
|
|
import sqlalchemy as sa
|
|
import json
|
|
|
|
from fastapi import Depends
|
|
|
|
import pandas as pd
|
|
|
|
from sqlalchemy import func, or_, and_, not_
|
|
|
|
import crud
|
|
import schemas
|
|
from core.config import settings
|
|
from db import get_database
|
|
|
|
from db.redisdb import get_redis_pool, RedisDrive
|
|
|
|
|
|
class UserClusterDef:
    """Build SQL selecting the ``#account_id`` of every user that belongs to a stored user cluster.

    The cluster definition (a list of event conditions plus the relation combining them) is loaded
    by :meth:`init`, which must be awaited before any ``to_sql*`` / ``cluster_user_*`` method is used.
    The generated SQL uses ClickHouse functions such as ``addHours``.
    """

    def __init__(self, game: str, cluster_name: str, data_where: list = None, rdb: RedisDrive = get_redis_pool(),
                 **kwargs):
        """
        :param game: game identifier; used as the table schema and as the Redis key prefix.
        :param cluster_name: name of the cluster definition loaded from ``user_label`` in :meth:`init`.
        :param data_where: extra account-level filters (dicts with ``comparator``, ``columnName`` and
            ``ftv``) that are applied to every event sub-query.
        :param rdb: Redis client used to read table column lists. NOTE: this default is evaluated once,
            when the class is defined, so every instance that omits it shares the same client.
        :param kwargs: extra options; ``page`` and ``limit`` are read by :meth:`cluster_user_list`.
        """
        self.game = game
        self.rdb = rdb
        self.cluster_name = cluster_name
        self.event_tbl = None
        self.user_tbl = None
        self.data_where = data_where or []
        self.kwargs = kwargs

    async def _init_tal(self):
        """Build lightweight ``Table`` objects for ``<game>.event`` and ``<game>.user_view``.

        Column names are the keys of the JSON objects cached in Redis under ``<game>_event`` and
        ``<game>_user``.
        """
        self.event_tbl = await self._table_from_redis(f'{self.game}_event', 'event')
        self.user_tbl = await self._table_from_redis(f'{self.game}_user', 'user_view')

        self.u_account_id_col = getattr(self.user_tbl.c, '#account_id')
        self.e_account_id_col = getattr(self.event_tbl.c, '#account_id')
        # Not bound to any table: renders as a bare "#account_id", so it can be selected from sub-queries.
        self.account_id_col = sa.Column('#account_id')

    async def _table_from_redis(self, redis_key: str, table_name: str) -> sa.Table:
        """Return table *table_name* in schema ``self.game`` whose columns are the keys of the JSON at *redis_key*."""
        res_json = await self.rdb.get(redis_key)
        columns = json.loads(res_json).keys()
        metadata = sa.MetaData(schema=self.game)
        return sa.Table(table_name, metadata, *[sa.Column(column) for column in columns])

    async def init(self):
        """Load the cluster definition (its ``qp`` field) and the table metadata."""
        doc = await crud.user_label.find_one(get_database(), {'cluster_name': self.cluster_name, 'game': self.game},
                                             {'qp': 1})
        self.data_in = doc.get('qp')
        await self._init_tal()
        self.events = self.data_in['user_cluster_def']['events']
        self.event_relation = self.data_in['user_cluster_def']['event_relation']

    async def handler_filts(self, *filters):
        """Translate front-end filter groups into SQLAlchemy conditions.

        :param filters: any number of ``(filts, relation)`` pairs. ``filts`` is a list of dicts with
            ``comparator``, ``tableType`` (``'user'`` or ``'event'``; other values are skipped),
            ``columnName`` and ``ftv`` (list of operand values). When ``relation`` is ``'and'`` the
            group's conditions are ANDed together, otherwise they are ORed. Unknown comparators are skipped.
        :return: ``(event_filters, user_filters)`` with one combined condition per group and table type;
            a group with no conditions for a table type contributes nothing to that list.
        """
        user_filters = []
        event_filters = []
        for group in filters:
            filts = group[0]
            relation = group[1]
            user_filter = []
            event_filter = []
            for item in filts:
                comparator = item['comparator']
                table_type = item['tableType']
                if table_type == 'user':
                    where = user_filter
                elif table_type == 'event':
                    where = event_filter
                else:
                    continue

                tbl = getattr(self, f'{table_type}_tbl')
                col = getattr(tbl.c, item['columnName'])

                ftv = item['ftv']
                if comparator == '==':
                    if len(ftv) > 1:
                        where.append(or_(*[col == v for v in ftv]))
                    else:
                        where.append(col == ftv[0])
                elif comparator == '>=':
                    where.append(col >= ftv[0])
                elif comparator == '<=':
                    where.append(col <= ftv[0])
                elif comparator == '>':
                    where.append(col > ftv[0])
                elif comparator == '<':
                    where.append(col < ftv[0])
                elif comparator == 'is not null':
                    where.append(col.isnot(None))
                elif comparator == 'is null':
                    where.append(col.is_(None))
                elif comparator == 'like':
                    where.append(col.like(f'%{ftv[0]}%'))
                elif comparator == 'not like':
                    where.append(col.notlike(f'%{ftv[0]}%'))
                elif comparator == 'in':
                    where.append(col.in_(ftv))
                elif comparator == '!=':
                    where.append(col != ftv[0])

            combine = and_ if relation == 'and' else or_
            if event_filter:
                event_filters.append(combine(*event_filter))
            if user_filter:
                user_filters.append(combine(*user_filter))

        return event_filters, user_filters

    @staticmethod
    def _date_range(event: dict):
        """Return ``(start_time, end_time)`` strings bounding the event's date window, or ``None`` for all time.

        ``date_type`` defaults to ``'dynamic'``: ``s_days`` / ``e_days`` are day offsets back from today
        (``arrow.get()`` with no arguments is the current UTC time). ``'static'`` uses the stored
        ``start_time`` / ``end_time`` as-is; any other value means "all time".
        """
        date_type = event.get('date_type', 'dynamic')
        if date_type == 'static':
            return event['start_time'], event['end_time']
        if date_type == 'dynamic':
            now = arrow.get()  # one timestamp, so both bounds agree even across midnight
            start_time = now.shift(days=-int(event.get('s_days'))).strftime('%Y-%m-%d 00:00:00')
            end_time = now.shift(days=-int(event.get('e_days'))).strftime('%Y-%m-%d 23:59:59')
            return start_time, end_time
        return None

    def _event_qry(self, event: dict, data_where: list):
        """Build the query selecting ``#account_id`` (one row per account) for a single event condition.

        :param event: one entry of the cluster definition's ``events`` list.
        :param data_where: pre-built account-level conditions shared by all events.
        """
        event_name = event['event_name']
        analysis = event['prop_quota']['analysis']
        quota = event['prop_quota']['quota']
        num = event['num'].split(',')
        uce_calcu_symbol = event['uce_calcu_symbol']
        # Hours added to #event_time before comparing; presumably a UTC offset (default 8) — confirm.
        zone = event.get('zone', 8)
        # NOTE(review): event['filts'] (per-event property filters) is not applied here — confirm intent.

        event_time_col = func.addHours(getattr(self.event_tbl.c, '#event_time'), zone)
        where = []
        window = self._date_range(event)
        if window is not None:
            start_time, end_time = window
            where.append(settings.CK_CALC_SYMBO['>='](event_time_col, start_time))
            where.append(settings.CK_CALC_SYMBO['<='](event_time_col, end_time))
        if event_name != '*':  # '*' means "any event": no event-name restriction
            where.append(settings.CK_CALC_SYMBO['=='](getattr(self.event_tbl.c, '#event_name'), event_name))
        where.extend(data_where)

        # select_from makes the FROM explicit: without it, an "all time" + "any event" condition with no
        # data filters would produce a SELECT with no FROM clause at all.
        if quota != '*':
            # Aggregate the quota column per account and keep accounts whose rounded aggregate passes the
            # comparison; ``num`` may carry several comma-separated operands (e.g. for a range).
            value = func.round(getattr(func, analysis)(getattr(self.event_tbl.c, quota)), 2).label('values')
            inner = (sa.select(self.account_id_col, value)
                     .select_from(self.event_tbl)
                     .where(*where)
                     .group_by(self.e_account_id_col)
                     .having(settings.CK_CALC_SYMBO[uce_calcu_symbol](sa.Column('values'), *num)))
        else:
            # Group per account as in the quota branch, so each account appears once; otherwise the
            # query yields one row per event and counts/joins built on it are inflated.
            inner = (sa.select(self.account_id_col)
                     .select_from(self.event_tbl)
                     .where(*where)
                     .group_by(self.e_account_id_col))
        return sa.select(self.account_id_col).select_from(inner.subquery())

    def to_sql_qry(self):
        """Combine the per-event queries into one query selecting the matching ``#account_id`` values.

        With ``event_relation == 'and'`` the queries are JOINed on ``#account_id``; with ``'or'`` they
        are combined with ``UNION ALL`` and ``DISTINCT``; with any other relation only the first
        event's query is kept. Returns ``None`` when the definition has no events.
        """
        # Account-level filters do not depend on the event, so build them once.
        data_where = [settings.CK_CALC_SYMBO[item['comparator']](sa.Column(item['columnName']), item['ftv'])
                      for item in self.data_where]
        qry = None
        for event in self.events:
            qry_tmp = self._event_qry(event, data_where)
            if qry is None:
                qry = qry_tmp
            elif self.event_relation == 'and':
                left, right = qry.subquery(), qry_tmp.subquery()
                qry = sa.select(self.account_id_col).select_from(
                    sa.join(left, right, left.c['#account_id'] == right.c['#account_id']))
            elif self.event_relation == 'or':
                qry = sa.select(sa.distinct(self.account_id_col)).select_from(
                    sa.union_all(qry, qry_tmp).subquery())
            # "Has not done": keep the users whose account id is NOT in the query built so far.
            # NOTE(review): this negates the accumulated query, not only this event's query — confirm intent.
            if not event.get('is_touch', True):
                qry = sa.select(self.u_account_id_col).where(self.u_account_id_col.notin_(qry))
        return qry

    @staticmethod
    def _compile_sql(qry) -> str:
        """Render *qry* as SQL with bound values inlined, echo it to stdout and return it."""
        sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
        print(sql)
        return sql

    def to_sql(self):
        """Return the SQL selecting the cluster's account ids."""
        return self._compile_sql(self.to_sql_qry())

    def cluster_user_list(self):
        """Return SQL listing full user rows of the cluster, ordered by ``#reg_time`` and paginated.

        ``self.kwargs['page']`` is 1-based (missing, falsy or below 1 -> 1);
        ``self.kwargs['limit']`` defaults to 50 when missing or ``None``.
        """
        sub_qry = self.to_sql_qry()
        page = max(self.kwargs.get('page') or 1, 1)
        limit = self.kwargs.get('limit')
        if limit is None:
            limit = 50
        qry = (sa.select('*')
               .where(self.u_account_id_col.in_(sub_qry))
               .order_by(sa.Column('#reg_time'))
               .offset((page - 1) * limit)
               .limit(limit))
        return self._compile_sql(qry)

    def cluster_user_count(self):
        """Return SQL counting the cluster's account ids (result column ``values``)."""
        sub_qry = self.to_sql_qry()
        qry = sa.select(func.count(self.account_id_col).label('values')).select_from(sub_qry.subquery())
        return self._compile_sql(qry)
|