From 9adee8f27af4162738bdb732cd5bd5149df4e840 Mon Sep 17 00:00:00 2001 From: wuaho Date: Thu, 21 Oct 2021 19:13:13 +0800 Subject: [PATCH] =?UTF-8?q?=E7=94=A8=E6=88=B7=E6=A0=87=E7=AD=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/api_v1/api.py | 2 + api/api_v1/endpoints/data_auth.py | 14 ++ api/api_v1/endpoints/query.py | 31 ++--- api/api_v1/user_label/__init__.py | 0 api/api_v1/user_label/controller.py | 81 +++++++++++ api/api_v1/user_label/service.py | 36 +++++ core/config.py | 37 +++-- crud/__init__.py | 3 +- crud/base.py | 4 +- crud/user_label.py | 34 +++++ main.py | 2 +- models/behavior_analysis.py | 109 +++++---------- models/user_label.py | 208 ++++++++++++++++++++++++++++ schemas/__init__.py | 3 +- schemas/userlabel.py | 52 +++++++ 15 files changed, 507 insertions(+), 109 deletions(-) create mode 100644 api/api_v1/user_label/__init__.py create mode 100644 api/api_v1/user_label/controller.py create mode 100644 api/api_v1/user_label/service.py create mode 100644 crud/user_label.py create mode 100644 models/user_label.py create mode 100644 schemas/userlabel.py diff --git a/api/api_v1/api.py b/api/api_v1/api.py index 7f20515..490546e 100644 --- a/api/api_v1/api.py +++ b/api/api_v1/api.py @@ -14,6 +14,7 @@ from .endpoints import event_mana from .endpoints import test from .authz import authz from .check_data import controller as check_data +from .user_label import controller as user_label api_router = APIRouter() api_router.include_router(test.router, tags=["test"], prefix='/test') @@ -36,3 +37,4 @@ api_router.include_router(xquery.router, tags=["xck"], prefix='/ck') api_router.include_router(authz.router, tags=["api接口管理"], prefix='/authz') api_router.include_router(check_data.router, tags=["打点验证"], prefix='/check_data') +api_router.include_router(user_label.router, tags=["用户标签"], prefix='/user_label') diff --git a/api/api_v1/endpoints/data_auth.py b/api/api_v1/endpoints/data_auth.py index b7509e4..5605445 100644 --- a/api/api_v1/endpoints/data_auth.py +++ b/api/api_v1/endpoints/data_auth.py @@ -276,6 +276,15 @@ async def load_filter_props(request: Request, } user_props.append(user_prop) + user_label_props = [] + user_label_docs = await crud.user_label.find_many(db, {'game': game}, {'qp': 0}) + for item in user_label_docs: + tmp = { + 'id': item['_id'], + 'data_type': 'user_label', + 'title': item['display_name'], + } + user_label_props.append(tmp) res = [ { 'title': '事件属性', @@ -286,6 +295,11 @@ async def load_filter_props(request: Request, 'title': '用户属性', 'id': 'user', 'category': user_props + }, + { + 'title': '用户标签', + 'id': 'user_label', + 'category': user_label_props } ] diff --git a/api/api_v1/endpoints/query.py b/api/api_v1/endpoints/query.py index 02828bb..34cb552 100644 --- a/api/api_v1/endpoints/query.py +++ b/api/api_v1/endpoints/query.py @@ -70,7 +70,7 @@ async def event_model_sql( """ 事件分析模型 sql""" await analysis.init(data_where=current_user.data_where) - data = analysis.event_model_sql() + data = await analysis.event_model_sql() return schemas.Msg(code=0, msg='ok', data=data) @@ -83,7 +83,7 @@ async def event_model_export(request: Request, ): """ 事件分析模型 数据导出""" await analysis.init(data_where=current_user.data_where) - sqls = analysis.event_model_sql() + sqls = await analysis.event_model_sql() file_name = quote(f'{sqls[0]["report_name"]}.xlsx') mime = mimetypes.guess_type(file_name)[0] excels = [] @@ -142,7 +142,7 @@ async def event_model( ) -> schemas.Msg: """ 事件分析""" await analysis.init(data_where=current_user.data_where) - sqls = analysis.event_model_sql() + sqls = await analysis.event_model_sql() res = [] for item in sqls: q = { @@ -271,7 +271,7 @@ async def retention_model_sql( ) -> schemas.Msg: """留存查询 sql""" await analysis.init(data_where=current_user.data_where) - data = analysis.retention_model_sql2() + data = await analysis.retention_model_sql2() return schemas.Msg(code=0, msg='ok', data=[data]) @@ -284,7 +284,7 @@ async def retention_model(request: Request, current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> schemas.Msg: await analysis.init(data_where=current_user.data_where) - res = analysis.retention_model_sql2() + res = await analysis.retention_model_sql2() sql = res['sql'] df = await ckdb.query_dataframe(sql) if len(df) == 0: @@ -347,7 +347,7 @@ async def retention_model_export(request: Request, ): """ 留存分析模型 数据导出""" await analysis.init(data_where=current_user.data_where) - data = analysis.retention_model_sql2() + data = await analysis.retention_model_sql2() file_name = quote(f'留存分析.xlsx') mime = mimetypes.guess_type(file_name)[0] @@ -370,7 +370,7 @@ async def retention_model_del( ) -> schemas.Msg: """留存数据模型""" await analysis.init(data_where=current_user.data_where) - res = analysis.retention_model_sql() + res = await analysis.retention_model_sql() sql = res['sql'] date_range = res['date_range'] event_a, event_b = res['event_name'] @@ -443,7 +443,7 @@ async def funnel_model_sql( ) -> schemas.Msg: """漏斗数据模型 sql""" await analysis.init(data_where=current_user.data_where) - data = analysis.funnel_model_sql() + data = await analysis.funnel_model_sql() return schemas.Msg(code=0, msg='ok', data=[data]) @@ -458,7 +458,7 @@ async def funnel_model( ) -> schemas.Msg: """漏斗数据模型""" await analysis.init(data_where=current_user.data_where) - res = analysis.funnel_model_sql() + res = await analysis.funnel_model_sql() sql = res['sql'] date_range = res['date_range'] cond_level = res['cond_level'] @@ -590,7 +590,7 @@ async def scatter_model_sql( ) -> schemas.Msg: """分布分析 sql""" await analysis.init(data_where=current_user.data_where) - data = analysis.scatter_model_sql() + data = await analysis.scatter_model_sql() return schemas.Msg(code=0, msg='ok', data=[data]) @@ -603,7 +603,7 @@ async def retention_model_export(request: Request, ): """ 分布分析 数据导出""" await analysis.init(data_where=current_user.data_where) - res = analysis.scatter_model_sql() + res = await analysis.scatter_model_sql() file_name = quote(f'分布分析.xlsx') mime = mimetypes.guess_type(file_name)[0] sql = res['sql'] @@ -723,7 +723,7 @@ async def scatter_model( ) -> schemas.Msg: """分布分析 模型""" await analysis.init(data_where=current_user.data_where) - res = analysis.scatter_model_sql() + res = await analysis.scatter_model_sql() sql = res['sql'] df = await ckdb.query_dataframe(sql) df.fillna(0, inplace=True) @@ -784,10 +784,9 @@ async def scatter_model( title = '.'.join(key[1:]) date = key[0] resp['list'][date.strftime('%Y-%m-%d')][title] = {'n': bins_s.to_list(), 'total': total, - 'p': round(bins_s * 100 / total, 2).to_list(), + 'p': round((bins_s * 100 / total).fillna(0), 2).to_list(), 'title': title } - return schemas.Msg(code=0, msg='ok', data=resp) # elif analysis == 'number_of_days': @@ -834,7 +833,7 @@ async def trace_model_sql( ) -> schemas.Msg: """路径分析 sql""" await analysis.init(data_where=current_user.data_where) - data = analysis.trace_model_sql() + data = await analysis.trace_model_sql() return schemas.Msg(code=0, msg='ok', data=[data]) @@ -848,7 +847,7 @@ async def trace_model_sql( ) -> schemas.Msg: """路径分析""" await analysis.init(data_where=current_user.data_where) - res = analysis.trace_model_sql() + res = await analysis.trace_model_sql() sql = res['sql'] df = await ckdb.query_dataframe(sql) chain_dict = defaultdict(dict) diff --git a/api/api_v1/user_label/__init__.py b/api/api_v1/user_label/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/api/api_v1/user_label/controller.py b/api/api_v1/user_label/controller.py new file mode 100644 index 0000000..80df6d9 --- /dev/null +++ b/api/api_v1/user_label/controller.py @@ -0,0 +1,81 @@ +from fastapi import APIRouter, Request, Depends +from motor.motor_asyncio import AsyncIOMotorDatabase + +import schemas +from api import deps +from api.api_v1.user_label import service +from db import get_database + +router = APIRouter() + + +@router.post("/save") +async def save(request: Request, + data_in: schemas.UserLabelSave, + game: str, + db: AsyncIOMotorDatabase = Depends(get_database), + current_user: schemas.UserDB = Depends(deps.get_current_user) + ) -> schemas.Msg: + """用户标签保存""" + await service.save(db, data_in, request.user.username, game) + return schemas.Msg(code=0, msg='ok') + + +@router.get("/list") +async def get_list(request: Request, + project_id: str, + game: str, + db: AsyncIOMotorDatabase = Depends(get_database), + current_user: schemas.UserDB = Depends(deps.get_current_user) + ) -> schemas.Msg: + """读取项目保存的用户标签""" + data = await service.get_list(db, project_id) + return schemas.Msg(code=0, msg='ok', data=data) + + +@router.post("/detail") +async def get_detail(request: Request, + game: str, + data_id: schemas.UserLabelDetail, + db: AsyncIOMotorDatabase = Depends(get_database), + current_user: schemas.UserDB = Depends(deps.get_current_user) + ) -> schemas.Msg: + """读取用户标签详细""" + data = await service.get_detail(db, data_id.label_id) + return schemas.Msg(code=0, msg='ok', data=data) + + +@router.post("/del") +async def delete(request: Request, + game: str, + data_id: schemas.UserLabelDel, + db: AsyncIOMotorDatabase = Depends(get_database), + current_user: schemas.UserDB = Depends(deps.get_current_user) + ) -> schemas.Msg: + """删除用户标签""" + data = await service.delete(db, data_id.label_id) + return schemas.Msg(code=0, msg='ok', data=data) + + +@router.post("/sql") +async def sql(request: Request, + data_in: schemas.UserLabelJson2Sql, + game: str, + # db: AsyncIOMotorDatabase = Depends(get_database), + current_user: schemas.UserDB = Depends(deps.get_current_user) + ) -> schemas.Msg: + """自定义用户标签 sql测试""" + data = await service.json2sql(game, data_in.label_id) + return schemas.Msg(code=0, msg='ok', data=data) + + +@router.post("/cluster_user") +async def cluster_user(request: Request, + data_in: schemas.UserLabelJson2Sql, + game: str, + # db: AsyncIOMotorDatabase = Depends(get_database), + current_user: schemas.UserDB = Depends(deps.get_current_user) + ) -> schemas.Msg: + """获取该标签用户""" + data = await service.get_cluster_user(game, data_in.label_id) + return schemas.Msg(code=0, msg='ok', data=data) diff --git a/api/api_v1/user_label/service.py b/api/api_v1/user_label/service.py new file mode 100644 index 0000000..6a66296 --- /dev/null +++ b/api/api_v1/user_label/service.py @@ -0,0 +1,36 @@ +import crud +from models.user_label import UserClusterDef + + +async def save(db, data_in, act_user, game): + return await crud.user_label.save(db, data_in, act_user, game) + + +async def read(db, data_in): + return await crud.user_label.read(db, data_in) + + +async def get_list(db, project_id): + return await crud.user_label.get_list(db, project_id) + + +async def get_detail(db, label_id): + return await crud.user_label.get(db, label_id) + + +async def delete(db, label_id): + await crud.user_label.delete_id(db, label_id) + return True + + +async def json2sql(game, date_in): + user_cluster_def = UserClusterDef(game, date_in) + await user_cluster_def.init() + return user_cluster_def.to_sql() + + +async def get_cluster_user(game, date_in): + user_cluster_def = UserClusterDef(game, date_in) + await user_cluster_def.init() + sql = user_cluster_def.cluster_user() + return sql diff --git a/core/config.py b/core/config.py index 58a1530..e181788 100644 --- a/core/config.py +++ b/core/config.py @@ -2,7 +2,7 @@ import sys from typing import Any, Dict, List, Optional, Union from pydantic import AnyHttpUrl, BaseSettings, EmailStr, HttpUrl, validator -from sqlalchemy import func +from sqlalchemy import func, and_ class Settings(BaseSettings): @@ -39,17 +39,20 @@ class Settings(BaseSettings): } CK_CALC_SYMBO = { - '==': lambda col, val: col == val, - '>=': lambda col, val: col >= val, - '<=': lambda col, val: col <= val, - '>': lambda col, val: col > val, - '<': lambda col, val: col < val, - 'is not null': lambda col: col.isnot(None), - 'is null': lambda col: col.is_(None), - 'like': lambda col, val: col.like(f'%{val}%'), - 'not like': lambda col, val: col.notlike(f'%{val}%'), - 'in': lambda col, val: col.in_(val), - '!=': lambda col, val: col != val, + '==': lambda col, *val: col == val[0], + '>=': lambda col, *val: col >= val[0], + '<=': lambda col, *val: col <= val[0], + '>': lambda col, *val: col > val[0], + '<': lambda col, *val: col < val[0], + 'is not null': lambda col, *val: col.isnot(None), + 'is null': lambda col, *val: col.is_(None), + 'like': lambda col, *val: col.like(f'%{val[0]}%'), + 'not like': lambda col, *val: col.notlike(f'%{val[0]}%'), + 'in': lambda col, *val: col.in_(val[0]), + 'not in': lambda col, *val: col.notin_(val[0]), + '!=': lambda col, *val: col != val[0], + 'range': lambda col, *val: and_(col >= val[0], col <= val[1]) + } CK_TYPE_DICT = {"DateTime('UTC')": 'datetime', @@ -275,6 +278,16 @@ class Settings(BaseSettings): 'title': '无值' }, ], + 'user_label': [ + { + 'id': 'in', + 'title': '是' + }, + { + 'id': 'not in', + 'title': '不是' + }, + ] } ARITHMETIC = { '+': lambda x, y: x + y, diff --git a/crud/__init__.py b/crud/__init__.py index ed3a5cb..a66d533 100644 --- a/crud/__init__.py +++ b/crud/__init__.py @@ -11,4 +11,5 @@ from .crud_api_log import api_log from .crud_event_mana import event_mana from .crud_api_list import api_list from .crud_role import role -from .crud_check_data import check_data \ No newline at end of file +from .crud_check_data import check_data +from .user_label import user_label \ No newline at end of file diff --git a/crud/base.py b/crud/base.py index ad1060a..a9d1af4 100644 --- a/crud/base.py +++ b/crud/base.py @@ -8,8 +8,8 @@ class CRUDBase: def __init__(self, coll_name): self.coll_name = coll_name - async def get(self, db, id: Union[ObjectId, str]) -> dict: - return (await db[self.coll_name].find_one({'_id': id})) or dict() + async def get(self, db, id: Union[ObjectId, str], *args, **kwargs) -> dict: + return (await db[self.coll_name].find_one({'_id': id}, *args, **kwargs)) or dict() async def insert_one(self, db, document): return await db[self.coll_name].insert_one(document) diff --git a/crud/user_label.py b/crud/user_label.py new file mode 100644 index 0000000..8330836 --- /dev/null +++ b/crud/user_label.py @@ -0,0 +1,34 @@ +from motor.motor_asyncio import AsyncIOMotorDatabase + +import schemas +from crud.base import CRUDBase + +__all__ = 'user_label', + +from utils import get_uid + + +class CRUDUserLabel(CRUDBase): + async def save(self, db: AsyncIOMotorDatabase, data_in: schemas.UserLabelSave, act_name, game): + where = {'project_id': data_in.project_id, 'cluster_name': data_in.cluster_name, + 'game': game} + is_exists = await self.find_one(db, where) + data = data_in.dict(skip_defaults=True) + data['act_name'] = act_name + if not is_exists: + data = {'$set': {**data, '_id': get_uid()}} + return await self.update_one(db, where, data, upsert=True) + return await self.update_one(db, where, {'$set': data}, upsert=True) + + async def read(self, db: AsyncIOMotorDatabase, data_in: schemas.UserLabelRead): + where = data_in.dict(skip_defaults=True) + res = await self.find_many(db, where) + return res + + async def get_list(self, db: AsyncIOMotorDatabase, project_id: str): + where = {'project_id': project_id} + res = await self.find_many(db, where, {'qp': 0}) + return res + + +user_label = CRUDUserLabel('user_label') diff --git a/main.py b/main.py index 5b790ab..fd43eea 100644 --- a/main.py +++ b/main.py @@ -115,4 +115,4 @@ async def add_process_time_header(request: Request, call_next): if __name__ == '__main__': - uvicorn.run(app='main:app', host="0.0.0.0", port=7889, reload=True, debug=True) + uvicorn.run(app='main:app', host="0.0.0.0", port=7899, reload=True, debug=True) diff --git a/models/behavior_analysis.py b/models/behavior_analysis.py index fecd30e..214608d 100644 --- a/models/behavior_analysis.py +++ b/models/behavior_analysis.py @@ -17,6 +17,7 @@ from core.config import settings from db import get_database from db.redisdb import get_redis_pool, RedisDrive +from models.user_label import UserClusterDef class CombinationEvent: @@ -127,6 +128,7 @@ class BehaviorAnalysis: self.combination_event = [] self.ext_filters = (self.data_in.ext_filter.get('filts', []), self.data_in.ext_filter.get('relation', 'and')) self.global_relation = 'and' + self.data_where = [] async def init(self, *args, **kwargs): @@ -166,7 +168,8 @@ class BehaviorAnalysis: # 用户自带过滤 if 'data_where' in kwargs: - self.global_filters.extend(kwargs['data_where'].get(self.game, [])) + self.data_where = kwargs['data_where'].get(self.game, []) + self.global_filters.extend(self.data_where) # self.global_filters.extend(self.data_in.ext_filter.get('filts', [])) def _get_time_particle_size(self): @@ -217,66 +220,8 @@ class BehaviorAnalysis: # self.event_tbl = sa.Table('event_view', metadata, *[sa.Column(column) for column in columns]) self.event_tbl = sa.Table('event', metadata, *[sa.Column(column) for column in columns]) - # - # def handler_filts(self, *ext_filters, g_f=True, relation='and'): - # user_filter = [] - # event_filter = [] - # # filters = (*self.global_filters, *ext_filters) if g_f else (*ext_filters,) - # filters = [] - # filters.extend(ext_filters) - # if g_f: - # filters.extend(self.global_filters) - # - # # filters = [] if filters == ([],) else filters - # for item in filters: - # if item['tableType'] == 'user': - # where = user_filter - # elif item['tableType'] == 'event': - # where = event_filter - # else: - # continue - # - # tbl = getattr(self, f'{item["tableType"]}_tbl') - # col = getattr(tbl.c, item['columnName']) - # - # comparator = item['comparator'] - # ftv = item['ftv'] - # if comparator == '==': - # if len(ftv) > 1: - # where.append(or_(*[col == v for v in ftv])) - # else: - # where.append(col == ftv[0]) - # elif comparator == '>=': - # where.append(col >= ftv[0]) - # elif comparator == '<=': - # where.append(col <= ftv[0]) - # elif comparator == '>': - # where.append(col > ftv[0]) - # elif comparator == '<': - # where.append(col < ftv[0]) - # - # elif comparator == 'is not null': - # where.append(col.isnot(None)) - # elif comparator == 'is null': - # where.append(col.is_(None)) - # - # elif comparator == 'like': - # where.append(col.like(f'%{ftv[0]}%')) - # - # elif comparator == 'not like': - # where.append(col.notlike(f'%{ftv[0]}%')) - # - # elif comparator == 'in': - # where.append(col.in_(ftv)) - # - # elif comparator == '!=': - # where.append(col != ftv[0]) - # if relation == 'and': - # return event_filter, user_filter - # else: - # return or_(*event_filter), or_(*user_filter) - def handler_filts(self, *filters): + async def handler_filts(self, *filters): """ :param filters: (filts:list,relation:str) @@ -293,17 +238,29 @@ class BehaviorAnalysis: user_filter = [] event_filter = [] for item in filts: + comparator = item['comparator'] if item['tableType'] == 'user': where = user_filter elif item['tableType'] == 'event': where = event_filter + elif item['tableType'] == 'user_label': + user_cluster_def=UserClusterDef(self.game,item['columnName'],self.data_where) + await user_cluster_def.init() + sub_qry = user_cluster_def.to_sql_qry() + if comparator == 'in': + event_filter.append(sa.Column('#account_id').in_(sub_qry)) + else: + event_filter.append(sa.Column('#account_id').notin_(sub_qry)) + + + continue else: - where = event_filter + continue tbl = getattr(self, f'{item["tableType"]}_tbl') col = getattr(tbl.c, item['columnName']) - comparator = item['comparator'] + ftv = item['ftv'] if comparator == '==': if len(ftv) > 1: @@ -348,7 +305,7 @@ class BehaviorAnalysis: return event_filters, user_filters - def retention_model_sql(self): + async def retention_model_sql(self): event_name_a = self.events[0]['eventName'] event_name_b = self.events[1]['eventName'] visit_name = self.events[0].get('event_attr_id') @@ -361,7 +318,7 @@ class BehaviorAnalysis: if visit_name: who_visit = getattr(self.event_tbl.c, visit_name) - filters, _ = self.handler_filts((self.events[0]['filts'], self.events[0].get('relation')), + filters, _ = await self.handler_filts((self.events[0]['filts'], self.events[0].get('relation')), self.ext_filters) filters = filters or [1] selectd = [func.toStartOfDay(func.addHours(event_time_col, self.zone_time)).label('date'), @@ -389,7 +346,7 @@ class BehaviorAnalysis: func.addHours(event_time_col, self.zone_time) <= self.end_date, ] - event_filter, user_filter = self.handler_filts( + event_filter, user_filter = await self.handler_filts( (self.global_filters, self.global_relation), self.ext_filters ) @@ -416,7 +373,7 @@ class BehaviorAnalysis: 'end_date': self.end_date[:10], } - def event_model_sql(self): + async def event_model_sql(self): sqls = [] event_time_col = getattr(self.event_tbl.c, '#event_time') @@ -445,7 +402,7 @@ class BehaviorAnalysis: custom = CustomEvent(self.event_tbl, formula, format).parse() event_name = custom['event_name'] where = [event_name_col.in_(event_name)] - event_filter, _ = self.handler_filts((event['filts'], event.get('relation')), + event_filter, _ = await self.handler_filts((event['filts'], event.get('relation')), (self.global_filters, self.global_relation), self.ext_filters ) @@ -475,7 +432,7 @@ class BehaviorAnalysis: base_where.append(event_name_col == event_name) analysis = event['analysis'] - event_filter, user_filter = self.handler_filts( + event_filter, user_filter = await self.handler_filts( (event['filts'], event.get('relation', 'and')), (self.global_filters, self.global_relation) , self.ext_filters @@ -530,7 +487,7 @@ class BehaviorAnalysis: return sqls - def funnel_model_sql(self): + async def funnel_model_sql(self): """ SELECT level, count(*) AS values FROM (SELECT windowFunnel(86400)(shjy.event."#event_time", shjy.event."#event_name" = 'create_role', @@ -554,7 +511,7 @@ ORDER BY level conds = [] cond_level = [] for item in self.events: - event_filter, _ = self.handler_filts((item['filts'], item.get('relation', 'and')) + event_filter, _ = await self.handler_filts((item['filts'], item.get('relation', 'and')) , self.ext_filters) conds.append( and_(event_name_col == item['eventName'], *event_filter) @@ -565,7 +522,7 @@ ORDER BY level func.windowFunnel_windows_gap__(event_time_col, *conds).label('level')).select_from( self.event_tbl) - g_event_filter, _ = self.handler_filts((self.global_filters, self.global_relation) + g_event_filter, _ = await self.handler_filts((self.global_filters, self.global_relation) , self.ext_filters) where = [ func.addHours(event_time_col, self.zone_time) >= self.start_date, @@ -593,7 +550,7 @@ ORDER BY level 'end_date': self.end_date[:10], } - def scatter_model_sql(self): + async def scatter_model_sql(self): event = self.events[0] event_name = event['eventName'] analysis = event['analysis'] @@ -614,7 +571,7 @@ ORDER BY level ] if event_name != '*': where.append(event_name_col == event_name) - event_filter, user_filter = self.handler_filts((event['filts'], event.get('relation', 'and')), + event_filter, user_filter = await self.handler_filts((event['filts'], event.get('relation', 'and')), (self.global_filters, self.global_relation) , self.ext_filters) if user_filter: @@ -783,7 +740,7 @@ ORDER BY values desc""" 'end_date': self.end_date[:10], } - def retention_model_sql2(self): + async def retention_model_sql2(self): filter_item_type = self.event_view.get('filter_item_type') filter_item = self.event_view.get('filter_item') event_name_a = self.events[0]['eventName'] @@ -791,7 +748,7 @@ ORDER BY values desc""" visit_name = self.events[0].get('event_attr_id') - where, _ = self.handler_filts((self.events[0]['filts'], self.events[0].get('relation', 'and')), + where, _ = await self.handler_filts((self.events[0]['filts'], self.events[0].get('relation', 'and')), (self.global_filters, self.global_relation) , self.ext_filters) where_a = '1' @@ -800,7 +757,7 @@ ORDER BY values desc""" sql = str(qry.compile(compile_kwargs={"literal_binds": True})) where_a = sql.split('WHERE ')[1] - where, _ = self.handler_filts((self.events[1]['filts'], self.events[1].get('relation', 'and')), + where, _ = await self.handler_filts((self.events[1]['filts'], self.events[1].get('relation', 'and')), (self.global_filters, self.global_relation) , self.ext_filters) where_b = '1' diff --git a/models/user_label.py b/models/user_label.py new file mode 100644 index 0000000..295d860 --- /dev/null +++ b/models/user_label.py @@ -0,0 +1,208 @@ +""" +本质查出符合条件的用户id +得到sql 查uid +""" + +import re +from typing import Tuple + +import arrow +import sqlalchemy as sa +import json + +from fastapi import Depends + +import pandas as pd + +from sqlalchemy import func, or_, and_, not_ + +import crud +import schemas +from core.config import settings +from db import get_database + +from db.redisdb import get_redis_pool, RedisDrive + + +class UserClusterDef: + def __init__(self, game: str, label_id: str, data_where: list = None, rdb: RedisDrive = get_redis_pool()): + self.game = game + self.rdb = rdb + self.label_id = label_id + self.event_tbl = None + self.data_where = data_where or [] + + async def _init_tal(self): + res_json = await self.rdb.get(f'{self.game}_event') + columns = json.loads(res_json).keys() + metadata = sa.MetaData(schema=self.game) + self.event_tbl = sa.Table('event', metadata, *[sa.Column(column) for column in columns]) + + res_json = await self.rdb.get(f'{self.game}_user') + columns = json.loads(res_json).keys() + metadata = sa.MetaData(schema=self.game) + self.user_tbl = sa.Table('user_view', metadata, *[sa.Column(column) for column in columns]) + + self.u_account_id_col = getattr(self.user_tbl.c, '#account_id') + self.e_account_id_col = getattr(self.event_tbl.c, '#account_id') + self.account_id_col = sa.Column('#account_id') + + async def init(self): + + self.data_in = (await crud.user_label.get(get_database(), self.label_id, {'qp': 1})).get('qp') + await self._init_tal() + self.events = self.data_in['user_cluster_def']['events'] + self.event_relation = self.data_in['user_cluster_def']['event_relation'] + + async def handler_filts(self, *filters): + """ + + :param filters: (filts:list,relation:str) + :param g_f: + :param relation: + :return: + """ + + user_filters = [] + event_filters = [] + for filter in filters: + filts = filter[0] + relation = filter[1] + user_filter = [] + event_filter = [] + for item in filts: + comparator = item['comparator'] + if item['tableType'] == 'user': + where = user_filter + elif item['tableType'] == 'event': + where = event_filter + else: + continue + + tbl = getattr(self, f'{item["tableType"]}_tbl') + col = getattr(tbl.c, item['columnName']) + + ftv = item['ftv'] + if comparator == '==': + if len(ftv) > 1: + where.append(or_(*[col == v for v in ftv])) + else: + where.append(col == ftv[0]) + elif comparator == '>=': + where.append(col >= ftv[0]) + elif comparator == '<=': + where.append(col <= ftv[0]) + elif comparator == '>': + where.append(col > ftv[0]) + elif comparator == '<': + where.append(col < ftv[0]) + + elif comparator == 'is not null': + where.append(col.isnot(None)) + elif comparator == 'is null': + where.append(col.is_(None)) + + elif comparator == 'like': + where.append(col.like(f'%{ftv[0]}%')) + + elif comparator == 'not like': + where.append(col.notlike(f'%{ftv[0]}%')) + + elif comparator == 'in': + where.append(col.in_(ftv)) + + elif comparator == '!=': + where.append(col != ftv[0]) + if relation == 'and': + if event_filter: + event_filters.append(and_(*event_filter)) + if user_filter: + user_filters.append(and_(*user_filter)), + else: + if event_filter: + event_filters.append(or_(*event_filter)) + if user_filter: + user_filters.append(or_(*user_filter)) + + return event_filters, user_filters + + def to_sql_qry(self): + qry = None + for event in self.events: + event_name = event['event_name'] + event_name_col = getattr(self.event_tbl.c, '#event_name') + analysis = event['prop_quota']['analysis'] + quota = event['prop_quota']['quota'] + num = event['num'].split(',') + date_type = event.get('date_type','dynamic') + e_days = event.get('e_days') + s_days = event.get('s_days') + + filts = event['filts'] + zone = event.get('zone', 8) + + # 账号数据过滤 + data_where = [] + filters = [] + filters.extend(self.data_where) + for item in filters: + tmp = settings.CK_CALC_SYMBO[item['comparator']](sa.Column(item['columnName']), item['ftv']) + data_where.append(tmp) + + event_time_col = func.addHours(getattr(self.event_tbl.c, '#event_time'), zone) + date_where = [] + if date_type == 'static': + start_time = event['start_time'] + end_time = event['end_time'] + date_where.extend( + [settings.CK_CALC_SYMBO['>='](event_time_col, start_time), + settings.CK_CALC_SYMBO['<='](event_time_col, end_time)] + ) + elif date_type == 'dynamic': + start_time = arrow.get().shift(days=-int(s_days)).strftime('%Y-%m-%d 00:00:00') + end_time = arrow.get().shift(days=-int(e_days)).strftime('%Y-%m-%d 23:59:59') + date_where.extend( + [settings.CK_CALC_SYMBO['>='](event_time_col, start_time), + settings.CK_CALC_SYMBO['<='](event_time_col, end_time)] + ) + else: + # 所有时间 + pass + + uce_calcu_symbol = event['uce_calcu_symbol'] + + event_name_where = [] + if event_name != '*': + # 任意事件 + event_name_where.append(settings.CK_CALC_SYMBO['=='](event_name_col, event_name)) + selectd = [self.account_id_col, + func.round(getattr(func, analysis)(getattr(self.event_tbl.c, quota)), 2).label( + 'values') + ] + qry_tmp = sa.select(self.account_id_col).select_from( + sa.select(selectd).where(*date_where, *event_name_where, *data_where).group_by( + self.e_account_id_col).having( + settings.CK_CALC_SYMBO[uce_calcu_symbol](sa.Column('values'), *num))) + if qry is None: + qry = qry_tmp + else: + if self.event_relation == 'and': + qry = sa.select(self.account_id_col).select_from( + sa.join(qry, qry_tmp, getattr(qry.c, '#account_id') == getattr(qry_tmp.c, '#account_id'))) + elif self.event_relation == 'or': + qry = sa.select(sa.distinct(self.account_id_col)).select_from(sa.union_all(qry, qry_tmp)) + + return qry + + def to_sql(self): + qry = self.to_sql_qry() + sql = str(qry.compile(compile_kwargs={"literal_binds": True})) + print(sql) + return sql + + def cluster_user(self): + sub_qry = self.to_sql_qry() + qry = sa.select('*').where(self.u_account_id_col.in_(sub_qry)) + sql = str(qry.compile(compile_kwargs={"literal_binds": True})) + print(sql) + return sql diff --git a/schemas/__init__.py b/schemas/__init__.py index 2b4b7f7..d47a4fe 100644 --- a/schemas/__init__.py +++ b/schemas/__init__.py @@ -15,4 +15,5 @@ from .event_mana import * from .xquery import * from .api_list import * from .role import * -from .check_data import * \ No newline at end of file +from .check_data import * +from .userlabel import * \ No newline at end of file diff --git a/schemas/userlabel.py b/schemas/userlabel.py new file mode 100644 index 0000000..4b2edbe --- /dev/null +++ b/schemas/userlabel.py @@ -0,0 +1,52 @@ +from typing import Union +from typing import Optional + +from pydantic import BaseModel + + +# +# class QP(BaseModel): +# qp: dict + + +class UserLabelSave(BaseModel): + project_id: str + cluster_name: str + display_name: str + qp: dict + cluster_type: str + remarks: str + + +class UserLabelDetail(BaseModel): + label_id: str + + +class UserLabelDel(BaseModel): + label_id: str + + +# class UserLabelJson2Sql(BaseModel): +# project_id: str +# cluster_name: str +# display_name: str +# qp: dict +# cluster_type: str +# remarks: str + +class UserLabelJson2Sql(BaseModel): + label_id: str + + +class UserLabelRead(BaseModel): + project_id: str + cluster_name: Optional[str] + label_id: Optional[str] + +# class UserLabel(BaseModel): +# project_id: str +# cluster_name: str +# display_name: str +# qp: dict +# cluster_type: str +# remarks: str