用户标签

This commit is contained in:
wuaho 2021-10-21 19:13:13 +08:00
parent 9375114519
commit 9adee8f27a
15 changed files with 507 additions and 109 deletions

View File

@ -14,6 +14,7 @@ from .endpoints import event_mana
from .endpoints import test
from .authz import authz
from .check_data import controller as check_data
from .user_label import controller as user_label
api_router = APIRouter()
api_router.include_router(test.router, tags=["test"], prefix='/test')
@ -36,3 +37,4 @@ api_router.include_router(xquery.router, tags=["xck"], prefix='/ck')
api_router.include_router(authz.router, tags=["api接口管理"], prefix='/authz')
api_router.include_router(check_data.router, tags=["打点验证"], prefix='/check_data')
api_router.include_router(user_label.router, tags=["用户标签"], prefix='/user_label')

View File

@ -276,6 +276,15 @@ async def load_filter_props(request: Request,
}
user_props.append(user_prop)
user_label_props = []
user_label_docs = await crud.user_label.find_many(db, {'game': game}, {'qp': 0})
for item in user_label_docs:
tmp = {
'id': item['_id'],
'data_type': 'user_label',
'title': item['display_name'],
}
user_label_props.append(tmp)
res = [
{
'title': '事件属性',
@ -286,6 +295,11 @@ async def load_filter_props(request: Request,
'title': '用户属性',
'id': 'user',
'category': user_props
},
{
'title': '用户标签',
'id': 'user_label',
'category': user_label_props
}
]

View File

@ -70,7 +70,7 @@ async def event_model_sql(
""" 事件分析模型 sql"""
await analysis.init(data_where=current_user.data_where)
data = analysis.event_model_sql()
data = await analysis.event_model_sql()
return schemas.Msg(code=0, msg='ok', data=data)
@ -83,7 +83,7 @@ async def event_model_export(request: Request,
):
""" 事件分析模型 数据导出"""
await analysis.init(data_where=current_user.data_where)
sqls = analysis.event_model_sql()
sqls = await analysis.event_model_sql()
file_name = quote(f'{sqls[0]["report_name"]}.xlsx')
mime = mimetypes.guess_type(file_name)[0]
excels = []
@ -142,7 +142,7 @@ async def event_model(
) -> schemas.Msg:
""" 事件分析"""
await analysis.init(data_where=current_user.data_where)
sqls = analysis.event_model_sql()
sqls = await analysis.event_model_sql()
res = []
for item in sqls:
q = {
@ -271,7 +271,7 @@ async def retention_model_sql(
) -> schemas.Msg:
"""留存查询 sql"""
await analysis.init(data_where=current_user.data_where)
data = analysis.retention_model_sql2()
data = await analysis.retention_model_sql2()
return schemas.Msg(code=0, msg='ok', data=[data])
@ -284,7 +284,7 @@ async def retention_model(request: Request,
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
await analysis.init(data_where=current_user.data_where)
res = analysis.retention_model_sql2()
res = await analysis.retention_model_sql2()
sql = res['sql']
df = await ckdb.query_dataframe(sql)
if len(df) == 0:
@ -347,7 +347,7 @@ async def retention_model_export(request: Request,
):
""" 留存分析模型 数据导出"""
await analysis.init(data_where=current_user.data_where)
data = analysis.retention_model_sql2()
data = await analysis.retention_model_sql2()
file_name = quote(f'留存分析.xlsx')
mime = mimetypes.guess_type(file_name)[0]
@ -370,7 +370,7 @@ async def retention_model_del(
) -> schemas.Msg:
"""留存数据模型"""
await analysis.init(data_where=current_user.data_where)
res = analysis.retention_model_sql()
res = await analysis.retention_model_sql()
sql = res['sql']
date_range = res['date_range']
event_a, event_b = res['event_name']
@ -443,7 +443,7 @@ async def funnel_model_sql(
) -> schemas.Msg:
"""漏斗数据模型 sql"""
await analysis.init(data_where=current_user.data_where)
data = analysis.funnel_model_sql()
data = await analysis.funnel_model_sql()
return schemas.Msg(code=0, msg='ok', data=[data])
@ -458,7 +458,7 @@ async def funnel_model(
) -> schemas.Msg:
"""漏斗数据模型"""
await analysis.init(data_where=current_user.data_where)
res = analysis.funnel_model_sql()
res = await analysis.funnel_model_sql()
sql = res['sql']
date_range = res['date_range']
cond_level = res['cond_level']
@ -590,7 +590,7 @@ async def scatter_model_sql(
) -> schemas.Msg:
"""分布分析 sql"""
await analysis.init(data_where=current_user.data_where)
data = analysis.scatter_model_sql()
data = await analysis.scatter_model_sql()
return schemas.Msg(code=0, msg='ok', data=[data])
@ -603,7 +603,7 @@ async def retention_model_export(request: Request,
):
""" 分布分析 数据导出"""
await analysis.init(data_where=current_user.data_where)
res = analysis.scatter_model_sql()
res = await analysis.scatter_model_sql()
file_name = quote(f'分布分析.xlsx')
mime = mimetypes.guess_type(file_name)[0]
sql = res['sql']
@ -723,7 +723,7 @@ async def scatter_model(
) -> schemas.Msg:
"""分布分析 模型"""
await analysis.init(data_where=current_user.data_where)
res = analysis.scatter_model_sql()
res = await analysis.scatter_model_sql()
sql = res['sql']
df = await ckdb.query_dataframe(sql)
df.fillna(0, inplace=True)
@ -784,10 +784,9 @@ async def scatter_model(
title = '.'.join(key[1:])
date = key[0]
resp['list'][date.strftime('%Y-%m-%d')][title] = {'n': bins_s.to_list(), 'total': total,
'p': round(bins_s * 100 / total, 2).to_list(),
'p': round((bins_s * 100 / total).fillna(0), 2).to_list(),
'title': title
}
return schemas.Msg(code=0, msg='ok', data=resp)
# elif analysis == 'number_of_days':
@ -834,7 +833,7 @@ async def trace_model_sql(
) -> schemas.Msg:
"""路径分析 sql"""
await analysis.init(data_where=current_user.data_where)
data = analysis.trace_model_sql()
data = await analysis.trace_model_sql()
return schemas.Msg(code=0, msg='ok', data=[data])
@ -848,7 +847,7 @@ async def trace_model_sql(
) -> schemas.Msg:
"""路径分析"""
await analysis.init(data_where=current_user.data_where)
res = analysis.trace_model_sql()
res = await analysis.trace_model_sql()
sql = res['sql']
df = await ckdb.query_dataframe(sql)
chain_dict = defaultdict(dict)

View File

View File

@ -0,0 +1,81 @@
from fastapi import APIRouter, Request, Depends
from motor.motor_asyncio import AsyncIOMotorDatabase
import schemas
from api import deps
from api.api_v1.user_label import service
from db import get_database
router = APIRouter()
@router.post("/save")
async def save(request: Request,
data_in: schemas.UserLabelSave,
game: str,
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""用户标签保存"""
await service.save(db, data_in, request.user.username, game)
return schemas.Msg(code=0, msg='ok')
@router.get("/list")
async def get_list(request: Request,
project_id: str,
game: str,
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""读取项目保存的用户标签"""
data = await service.get_list(db, project_id)
return schemas.Msg(code=0, msg='ok', data=data)
@router.post("/detail")
async def get_detail(request: Request,
game: str,
data_id: schemas.UserLabelDetail,
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""读取用户标签详细"""
data = await service.get_detail(db, data_id.label_id)
return schemas.Msg(code=0, msg='ok', data=data)
@router.post("/del")
async def delete(request: Request,
game: str,
data_id: schemas.UserLabelDel,
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""删除用户标签"""
data = await service.delete(db, data_id.label_id)
return schemas.Msg(code=0, msg='ok', data=data)
@router.post("/sql")
async def sql(request: Request,
data_in: schemas.UserLabelJson2Sql,
game: str,
# db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""自定义用户标签 sql测试"""
data = await service.json2sql(game, data_in.label_id)
return schemas.Msg(code=0, msg='ok', data=data)
@router.post("/cluster_user")
async def cluster_user(request: Request,
data_in: schemas.UserLabelJson2Sql,
game: str,
# db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""获取该标签用户"""
data = await service.get_cluster_user(game, data_in.label_id)
return schemas.Msg(code=0, msg='ok', data=data)

View File

@ -0,0 +1,36 @@
import crud
from models.user_label import UserClusterDef
async def save(db, data_in, act_user, game):
return await crud.user_label.save(db, data_in, act_user, game)
async def read(db, data_in):
return await crud.user_label.read(db, data_in)
async def get_list(db, project_id):
return await crud.user_label.get_list(db, project_id)
async def get_detail(db, label_id):
return await crud.user_label.get(db, label_id)
async def delete(db, label_id):
await crud.user_label.delete_id(db, label_id)
return True
async def json2sql(game, date_in):
user_cluster_def = UserClusterDef(game, date_in)
await user_cluster_def.init()
return user_cluster_def.to_sql()
async def get_cluster_user(game, date_in):
user_cluster_def = UserClusterDef(game, date_in)
await user_cluster_def.init()
sql = user_cluster_def.cluster_user()
return sql

View File

@ -2,7 +2,7 @@ import sys
from typing import Any, Dict, List, Optional, Union
from pydantic import AnyHttpUrl, BaseSettings, EmailStr, HttpUrl, validator
from sqlalchemy import func
from sqlalchemy import func, and_
class Settings(BaseSettings):
@ -39,17 +39,20 @@ class Settings(BaseSettings):
}
CK_CALC_SYMBO = {
'==': lambda col, val: col == val,
'>=': lambda col, val: col >= val,
'<=': lambda col, val: col <= val,
'>': lambda col, val: col > val,
'<': lambda col, val: col < val,
'is not null': lambda col: col.isnot(None),
'is null': lambda col: col.is_(None),
'like': lambda col, val: col.like(f'%{val}%'),
'not like': lambda col, val: col.notlike(f'%{val}%'),
'in': lambda col, val: col.in_(val),
'!=': lambda col, val: col != val,
'==': lambda col, *val: col == val[0],
'>=': lambda col, *val: col >= val[0],
'<=': lambda col, *val: col <= val[0],
'>': lambda col, *val: col > val[0],
'<': lambda col, *val: col < val[0],
'is not null': lambda col, *val: col.isnot(None),
'is null': lambda col, *val: col.is_(None),
'like': lambda col, *val: col.like(f'%{val[0]}%'),
'not like': lambda col, *val: col.notlike(f'%{val[0]}%'),
'in': lambda col, *val: col.in_(val[0]),
'not in': lambda col, *val: col.notin_(val[0]),
'!=': lambda col, *val: col != val[0],
'range': lambda col, *val: and_(col >= val[0], col <= val[1])
}
CK_TYPE_DICT = {"DateTime('UTC')": 'datetime',
@ -275,6 +278,16 @@ class Settings(BaseSettings):
'title': '无值'
},
],
'user_label': [
{
'id': 'in',
'title': ''
},
{
'id': 'not in',
'title': '不是'
},
]
}
ARITHMETIC = {
'+': lambda x, y: x + y,

View File

@ -11,4 +11,5 @@ from .crud_api_log import api_log
from .crud_event_mana import event_mana
from .crud_api_list import api_list
from .crud_role import role
from .crud_check_data import check_data
from .crud_check_data import check_data
from .user_label import user_label

View File

@ -8,8 +8,8 @@ class CRUDBase:
def __init__(self, coll_name):
self.coll_name = coll_name
async def get(self, db, id: Union[ObjectId, str]) -> dict:
return (await db[self.coll_name].find_one({'_id': id})) or dict()
async def get(self, db, id: Union[ObjectId, str], *args, **kwargs) -> dict:
return (await db[self.coll_name].find_one({'_id': id}, *args, **kwargs)) or dict()
async def insert_one(self, db, document):
return await db[self.coll_name].insert_one(document)

34
crud/user_label.py Normal file
View File

@ -0,0 +1,34 @@
from motor.motor_asyncio import AsyncIOMotorDatabase
import schemas
from crud.base import CRUDBase
__all__ = 'user_label',
from utils import get_uid
class CRUDUserLabel(CRUDBase):
async def save(self, db: AsyncIOMotorDatabase, data_in: schemas.UserLabelSave, act_name, game):
where = {'project_id': data_in.project_id, 'cluster_name': data_in.cluster_name,
'game': game}
is_exists = await self.find_one(db, where)
data = data_in.dict(skip_defaults=True)
data['act_name'] = act_name
if not is_exists:
data = {'$set': {**data, '_id': get_uid()}}
return await self.update_one(db, where, data, upsert=True)
return await self.update_one(db, where, {'$set': data}, upsert=True)
async def read(self, db: AsyncIOMotorDatabase, data_in: schemas.UserLabelRead):
where = data_in.dict(skip_defaults=True)
res = await self.find_many(db, where)
return res
async def get_list(self, db: AsyncIOMotorDatabase, project_id: str):
where = {'project_id': project_id}
res = await self.find_many(db, where, {'qp': 0})
return res
user_label = CRUDUserLabel('user_label')

View File

@ -115,4 +115,4 @@ async def add_process_time_header(request: Request, call_next):
if __name__ == '__main__':
uvicorn.run(app='main:app', host="0.0.0.0", port=7889, reload=True, debug=True)
uvicorn.run(app='main:app', host="0.0.0.0", port=7899, reload=True, debug=True)

View File

@ -17,6 +17,7 @@ from core.config import settings
from db import get_database
from db.redisdb import get_redis_pool, RedisDrive
from models.user_label import UserClusterDef
class CombinationEvent:
@ -127,6 +128,7 @@ class BehaviorAnalysis:
self.combination_event = []
self.ext_filters = (self.data_in.ext_filter.get('filts', []), self.data_in.ext_filter.get('relation', 'and'))
self.global_relation = 'and'
self.data_where = []
async def init(self, *args, **kwargs):
@ -166,7 +168,8 @@ class BehaviorAnalysis:
# 用户自带过滤
if 'data_where' in kwargs:
self.global_filters.extend(kwargs['data_where'].get(self.game, []))
self.data_where = kwargs['data_where'].get(self.game, [])
self.global_filters.extend(self.data_where)
# self.global_filters.extend(self.data_in.ext_filter.get('filts', []))
def _get_time_particle_size(self):
@ -217,66 +220,8 @@ class BehaviorAnalysis:
# self.event_tbl = sa.Table('event_view', metadata, *[sa.Column(column) for column in columns])
self.event_tbl = sa.Table('event', metadata, *[sa.Column(column) for column in columns])
#
# def handler_filts(self, *ext_filters, g_f=True, relation='and'):
# user_filter = []
# event_filter = []
# # filters = (*self.global_filters, *ext_filters) if g_f else (*ext_filters,)
# filters = []
# filters.extend(ext_filters)
# if g_f:
# filters.extend(self.global_filters)
#
# # filters = [] if filters == ([],) else filters
# for item in filters:
# if item['tableType'] == 'user':
# where = user_filter
# elif item['tableType'] == 'event':
# where = event_filter
# else:
# continue
#
# tbl = getattr(self, f'{item["tableType"]}_tbl')
# col = getattr(tbl.c, item['columnName'])
#
# comparator = item['comparator']
# ftv = item['ftv']
# if comparator == '==':
# if len(ftv) > 1:
# where.append(or_(*[col == v for v in ftv]))
# else:
# where.append(col == ftv[0])
# elif comparator == '>=':
# where.append(col >= ftv[0])
# elif comparator == '<=':
# where.append(col <= ftv[0])
# elif comparator == '>':
# where.append(col > ftv[0])
# elif comparator == '<':
# where.append(col < ftv[0])
#
# elif comparator == 'is not null':
# where.append(col.isnot(None))
# elif comparator == 'is null':
# where.append(col.is_(None))
#
# elif comparator == 'like':
# where.append(col.like(f'%{ftv[0]}%'))
#
# elif comparator == 'not like':
# where.append(col.notlike(f'%{ftv[0]}%'))
#
# elif comparator == 'in':
# where.append(col.in_(ftv))
#
# elif comparator == '!=':
# where.append(col != ftv[0])
# if relation == 'and':
# return event_filter, user_filter
# else:
# return or_(*event_filter), or_(*user_filter)
def handler_filts(self, *filters):
async def handler_filts(self, *filters):
"""
:param filters: (filts:list,relation:str)
@ -293,17 +238,29 @@ class BehaviorAnalysis:
user_filter = []
event_filter = []
for item in filts:
comparator = item['comparator']
if item['tableType'] == 'user':
where = user_filter
elif item['tableType'] == 'event':
where = event_filter
elif item['tableType'] == 'user_label':
user_cluster_def=UserClusterDef(self.game,item['columnName'],self.data_where)
await user_cluster_def.init()
sub_qry = user_cluster_def.to_sql_qry()
if comparator == 'in':
event_filter.append(sa.Column('#account_id').in_(sub_qry))
else:
event_filter.append(sa.Column('#account_id').notin_(sub_qry))
continue
else:
where = event_filter
continue
tbl = getattr(self, f'{item["tableType"]}_tbl')
col = getattr(tbl.c, item['columnName'])
comparator = item['comparator']
ftv = item['ftv']
if comparator == '==':
if len(ftv) > 1:
@ -348,7 +305,7 @@ class BehaviorAnalysis:
return event_filters, user_filters
def retention_model_sql(self):
async def retention_model_sql(self):
event_name_a = self.events[0]['eventName']
event_name_b = self.events[1]['eventName']
visit_name = self.events[0].get('event_attr_id')
@ -361,7 +318,7 @@ class BehaviorAnalysis:
if visit_name:
who_visit = getattr(self.event_tbl.c, visit_name)
filters, _ = self.handler_filts((self.events[0]['filts'], self.events[0].get('relation')),
filters, _ = await self.handler_filts((self.events[0]['filts'], self.events[0].get('relation')),
self.ext_filters)
filters = filters or [1]
selectd = [func.toStartOfDay(func.addHours(event_time_col, self.zone_time)).label('date'),
@ -389,7 +346,7 @@ class BehaviorAnalysis:
func.addHours(event_time_col, self.zone_time) <= self.end_date,
]
event_filter, user_filter = self.handler_filts(
event_filter, user_filter = await self.handler_filts(
(self.global_filters, self.global_relation),
self.ext_filters
)
@ -416,7 +373,7 @@ class BehaviorAnalysis:
'end_date': self.end_date[:10],
}
def event_model_sql(self):
async def event_model_sql(self):
sqls = []
event_time_col = getattr(self.event_tbl.c, '#event_time')
@ -445,7 +402,7 @@ class BehaviorAnalysis:
custom = CustomEvent(self.event_tbl, formula, format).parse()
event_name = custom['event_name']
where = [event_name_col.in_(event_name)]
event_filter, _ = self.handler_filts((event['filts'], event.get('relation')),
event_filter, _ = await self.handler_filts((event['filts'], event.get('relation')),
(self.global_filters, self.global_relation),
self.ext_filters
)
@ -475,7 +432,7 @@ class BehaviorAnalysis:
base_where.append(event_name_col == event_name)
analysis = event['analysis']
event_filter, user_filter = self.handler_filts(
event_filter, user_filter = await self.handler_filts(
(event['filts'], event.get('relation', 'and')),
(self.global_filters, self.global_relation)
, self.ext_filters
@ -530,7 +487,7 @@ class BehaviorAnalysis:
return sqls
def funnel_model_sql(self):
async def funnel_model_sql(self):
"""
SELECT level, count(*) AS values
FROM (SELECT windowFunnel(86400)(shjy.event."#event_time", shjy.event."#event_name" = 'create_role',
@ -554,7 +511,7 @@ ORDER BY level
conds = []
cond_level = []
for item in self.events:
event_filter, _ = self.handler_filts((item['filts'], item.get('relation', 'and'))
event_filter, _ = await self.handler_filts((item['filts'], item.get('relation', 'and'))
, self.ext_filters)
conds.append(
and_(event_name_col == item['eventName'], *event_filter)
@ -565,7 +522,7 @@ ORDER BY level
func.windowFunnel_windows_gap__(event_time_col, *conds).label('level')).select_from(
self.event_tbl)
g_event_filter, _ = self.handler_filts((self.global_filters, self.global_relation)
g_event_filter, _ = await self.handler_filts((self.global_filters, self.global_relation)
, self.ext_filters)
where = [
func.addHours(event_time_col, self.zone_time) >= self.start_date,
@ -593,7 +550,7 @@ ORDER BY level
'end_date': self.end_date[:10],
}
def scatter_model_sql(self):
async def scatter_model_sql(self):
event = self.events[0]
event_name = event['eventName']
analysis = event['analysis']
@ -614,7 +571,7 @@ ORDER BY level
]
if event_name != '*':
where.append(event_name_col == event_name)
event_filter, user_filter = self.handler_filts((event['filts'], event.get('relation', 'and')),
event_filter, user_filter = await self.handler_filts((event['filts'], event.get('relation', 'and')),
(self.global_filters, self.global_relation)
, self.ext_filters)
if user_filter:
@ -783,7 +740,7 @@ ORDER BY values desc"""
'end_date': self.end_date[:10],
}
def retention_model_sql2(self):
async def retention_model_sql2(self):
filter_item_type = self.event_view.get('filter_item_type')
filter_item = self.event_view.get('filter_item')
event_name_a = self.events[0]['eventName']
@ -791,7 +748,7 @@ ORDER BY values desc"""
visit_name = self.events[0].get('event_attr_id')
where, _ = self.handler_filts((self.events[0]['filts'], self.events[0].get('relation', 'and')),
where, _ = await self.handler_filts((self.events[0]['filts'], self.events[0].get('relation', 'and')),
(self.global_filters, self.global_relation)
, self.ext_filters)
where_a = '1'
@ -800,7 +757,7 @@ ORDER BY values desc"""
sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
where_a = sql.split('WHERE ')[1]
where, _ = self.handler_filts((self.events[1]['filts'], self.events[1].get('relation', 'and')),
where, _ = await self.handler_filts((self.events[1]['filts'], self.events[1].get('relation', 'and')),
(self.global_filters, self.global_relation)
, self.ext_filters)
where_b = '1'

208
models/user_label.py Normal file
View File

@ -0,0 +1,208 @@
"""
本质查出符合条件的用户id
得到sql 查uid
"""
import re
from typing import Tuple
import arrow
import sqlalchemy as sa
import json
from fastapi import Depends
import pandas as pd
from sqlalchemy import func, or_, and_, not_
import crud
import schemas
from core.config import settings
from db import get_database
from db.redisdb import get_redis_pool, RedisDrive
class UserClusterDef:
def __init__(self, game: str, label_id: str, data_where: list = None, rdb: RedisDrive = get_redis_pool()):
self.game = game
self.rdb = rdb
self.label_id = label_id
self.event_tbl = None
self.data_where = data_where or []
async def _init_tal(self):
res_json = await self.rdb.get(f'{self.game}_event')
columns = json.loads(res_json).keys()
metadata = sa.MetaData(schema=self.game)
self.event_tbl = sa.Table('event', metadata, *[sa.Column(column) for column in columns])
res_json = await self.rdb.get(f'{self.game}_user')
columns = json.loads(res_json).keys()
metadata = sa.MetaData(schema=self.game)
self.user_tbl = sa.Table('user_view', metadata, *[sa.Column(column) for column in columns])
self.u_account_id_col = getattr(self.user_tbl.c, '#account_id')
self.e_account_id_col = getattr(self.event_tbl.c, '#account_id')
self.account_id_col = sa.Column('#account_id')
async def init(self):
self.data_in = (await crud.user_label.get(get_database(), self.label_id, {'qp': 1})).get('qp')
await self._init_tal()
self.events = self.data_in['user_cluster_def']['events']
self.event_relation = self.data_in['user_cluster_def']['event_relation']
async def handler_filts(self, *filters):
"""
:param filters: (filts:list,relation:str)
:param g_f:
:param relation:
:return:
"""
user_filters = []
event_filters = []
for filter in filters:
filts = filter[0]
relation = filter[1]
user_filter = []
event_filter = []
for item in filts:
comparator = item['comparator']
if item['tableType'] == 'user':
where = user_filter
elif item['tableType'] == 'event':
where = event_filter
else:
continue
tbl = getattr(self, f'{item["tableType"]}_tbl')
col = getattr(tbl.c, item['columnName'])
ftv = item['ftv']
if comparator == '==':
if len(ftv) > 1:
where.append(or_(*[col == v for v in ftv]))
else:
where.append(col == ftv[0])
elif comparator == '>=':
where.append(col >= ftv[0])
elif comparator == '<=':
where.append(col <= ftv[0])
elif comparator == '>':
where.append(col > ftv[0])
elif comparator == '<':
where.append(col < ftv[0])
elif comparator == 'is not null':
where.append(col.isnot(None))
elif comparator == 'is null':
where.append(col.is_(None))
elif comparator == 'like':
where.append(col.like(f'%{ftv[0]}%'))
elif comparator == 'not like':
where.append(col.notlike(f'%{ftv[0]}%'))
elif comparator == 'in':
where.append(col.in_(ftv))
elif comparator == '!=':
where.append(col != ftv[0])
if relation == 'and':
if event_filter:
event_filters.append(and_(*event_filter))
if user_filter:
user_filters.append(and_(*user_filter)),
else:
if event_filter:
event_filters.append(or_(*event_filter))
if user_filter:
user_filters.append(or_(*user_filter))
return event_filters, user_filters
def to_sql_qry(self):
qry = None
for event in self.events:
event_name = event['event_name']
event_name_col = getattr(self.event_tbl.c, '#event_name')
analysis = event['prop_quota']['analysis']
quota = event['prop_quota']['quota']
num = event['num'].split(',')
date_type = event.get('date_type','dynamic')
e_days = event.get('e_days')
s_days = event.get('s_days')
filts = event['filts']
zone = event.get('zone', 8)
# 账号数据过滤
data_where = []
filters = []
filters.extend(self.data_where)
for item in filters:
tmp = settings.CK_CALC_SYMBO[item['comparator']](sa.Column(item['columnName']), item['ftv'])
data_where.append(tmp)
event_time_col = func.addHours(getattr(self.event_tbl.c, '#event_time'), zone)
date_where = []
if date_type == 'static':
start_time = event['start_time']
end_time = event['end_time']
date_where.extend(
[settings.CK_CALC_SYMBO['>='](event_time_col, start_time),
settings.CK_CALC_SYMBO['<='](event_time_col, end_time)]
)
elif date_type == 'dynamic':
start_time = arrow.get().shift(days=-int(s_days)).strftime('%Y-%m-%d 00:00:00')
end_time = arrow.get().shift(days=-int(e_days)).strftime('%Y-%m-%d 23:59:59')
date_where.extend(
[settings.CK_CALC_SYMBO['>='](event_time_col, start_time),
settings.CK_CALC_SYMBO['<='](event_time_col, end_time)]
)
else:
# 所有时间
pass
uce_calcu_symbol = event['uce_calcu_symbol']
event_name_where = []
if event_name != '*':
# 任意事件
event_name_where.append(settings.CK_CALC_SYMBO['=='](event_name_col, event_name))
selectd = [self.account_id_col,
func.round(getattr(func, analysis)(getattr(self.event_tbl.c, quota)), 2).label(
'values')
]
qry_tmp = sa.select(self.account_id_col).select_from(
sa.select(selectd).where(*date_where, *event_name_where, *data_where).group_by(
self.e_account_id_col).having(
settings.CK_CALC_SYMBO[uce_calcu_symbol](sa.Column('values'), *num)))
if qry is None:
qry = qry_tmp
else:
if self.event_relation == 'and':
qry = sa.select(self.account_id_col).select_from(
sa.join(qry, qry_tmp, getattr(qry.c, '#account_id') == getattr(qry_tmp.c, '#account_id')))
elif self.event_relation == 'or':
qry = sa.select(sa.distinct(self.account_id_col)).select_from(sa.union_all(qry, qry_tmp))
return qry
def to_sql(self):
qry = self.to_sql_qry()
sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
print(sql)
return sql
def cluster_user(self):
sub_qry = self.to_sql_qry()
qry = sa.select('*').where(self.u_account_id_col.in_(sub_qry))
sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
print(sql)
return sql

View File

@ -15,4 +15,5 @@ from .event_mana import *
from .xquery import *
from .api_list import *
from .role import *
from .check_data import *
from .check_data import *
from .userlabel import *

52
schemas/userlabel.py Normal file
View File

@ -0,0 +1,52 @@
from typing import Union
from typing import Optional
from pydantic import BaseModel
#
# class QP(BaseModel):
# qp: dict
class UserLabelSave(BaseModel):
project_id: str
cluster_name: str
display_name: str
qp: dict
cluster_type: str
remarks: str
class UserLabelDetail(BaseModel):
label_id: str
class UserLabelDel(BaseModel):
label_id: str
# class UserLabelJson2Sql(BaseModel):
# project_id: str
# cluster_name: str
# display_name: str
# qp: dict
# cluster_type: str
# remarks: str
class UserLabelJson2Sql(BaseModel):
label_id: str
class UserLabelRead(BaseModel):
project_id: str
cluster_name: Optional[str]
label_id: Optional[str]
# class UserLabel(BaseModel):
# project_id: str
# cluster_name: str
# display_name: str
# qp: dict
# cluster_type: str
# remarks: str