diff --git a/api/api_v1/endpoints/data_auth.py b/api/api_v1/endpoints/data_auth.py index 28c8768..ec0777b 100644 --- a/api/api_v1/endpoints/data_auth.py +++ b/api/api_v1/endpoints/data_auth.py @@ -43,6 +43,23 @@ async def edit_data_auth(request: Request, await crud.data_auth.edit_data_auth(db, data_id) return schemas.Msg(code=0, msg='ok', data=data_id) +@router.get("/quotas_map") +async def quotas_map( + request: Request, + game: str, + current_user: schemas.UserDB = Depends(deps.get_current_user) +) -> schemas.Msg: + return schemas.Msg(code=0, msg='ok', data=settings.CK_OPERATOR) + + +@router.get("/filter_map") +async def filter_map( + request: Request, + game: str, + current_user: schemas.UserDB = Depends(deps.get_current_user) +) -> schemas.Msg: + return schemas.Msg(code=0, msg='ok', data=settings.CK_FILTER) + @router.get('/all_event') async def all_event(request: Request, @@ -116,6 +133,38 @@ async def my_event(request: Request, return schemas.Msg(code=0, msg='ok', data=event_list) +@router.get("/user_property") +async def user_property(request: Request, + game: str, + db: AsyncIOMotorDatabase = Depends(get_database), + rdb: RedisDrive = Depends(get_redis_pool), + ck: CKDrive = Depends(get_ck_db), + current_user: schemas.UserDB = Depends(deps.get_current_user) + ) -> schemas.Msg: + """获取用户属性""" + data = await rdb.get(f'{game}_user') + data = json.loads(data) + propertys = [] + + data_attr = await crud.data_attr.find_many(db, game=game, cat='user') + data_attr = {item['name']: item for item in data_attr} + + for k, v in data.items(): + data_type = settings.CK_TYPE_DICT.get(v) + propertys.append( + {'name': k, + 'data_type': data_type, + 'show_name': data_attr.get(k, {}).get('show_name', ''), + } + ) + propertys = sorted(propertys, key=lambda x: x['show_name']) + + return schemas.Msg(code=0, msg='ok', data=propertys) + + + + + @router.post('/load_prop_quotas') async def load_prop_quotas(request: Request, game: str, @@ -218,11 +267,13 @@ async def load_filter_props(request: Request, event_prop_list = sorted(event_prop_set) key = f'{game}_user' - user_prop_set = await rdb.get(key) - user_prop_list = sorted(event_prop_set) + user_prop_dict = await rdb.get(key) + user_prop_dict = json.loads(user_prop_dict) + user_prop_list = sorted(user_prop_dict.keys()) all_filed = await rdb.get(f'{game}_event') all_filed = json.loads(all_filed) + data_attr = await crud.data_attr.find_many(db, game=game, cat='event') data_attr = {item['name']: item for item in data_attr} event_props = [] @@ -241,7 +292,7 @@ async def load_filter_props(request: Request, data_attr = {item['name']: item for item in data_attr} user_props = [] for item in user_prop_list: - data_type = settings.CK_TYPE_DICT.get(all_filed.get(item)) + data_type = settings.CK_TYPE_DICT.get(user_prop_dict.get(item)) title = data_attr.get(item, {}).get('show_name') or item user_prop = { 'id': item, diff --git a/api/api_v1/endpoints/query.py b/api/api_v1/endpoints/query.py index fcdaf52..97bd6d7 100644 --- a/api/api_v1/endpoints/query.py +++ b/api/api_v1/endpoints/query.py @@ -14,6 +14,7 @@ from db.ckdb import get_ck_db, CKDrive from db.redisdb import get_redis_pool, RedisDrive from models.behavior_analysis import BehaviorAnalysis +from models.user_analysis import UserAnalysis router = APIRouter() @@ -466,3 +467,40 @@ async def trace_model_sql( 'links': links } return schemas.Msg(code=0, msg='ok', data=data) + + +@router.post("/user_property_sql") +async def user_property_sql( + request: Request, + game: str, + analysis: UserAnalysis = Depends(UserAnalysis), + current_user: schemas.UserDB = Depends(deps.get_current_user) +) -> schemas.Msg: + """用户属性sql""" + await analysis.init() + data = analysis.property_model() + return schemas.Msg(code=0, msg='ok', data=[data]) + + +@router.post("/user_property_model") +async def user_property_model( + request: Request, + game: str, + analysis: UserAnalysis = Depends(UserAnalysis), + ckdb: CKDrive = Depends(get_ck_db), + current_user: schemas.UserDB = Depends(deps.get_current_user) +) -> schemas.Msg: + """用户属性分析""" + await analysis.init() + res = analysis.property_model() + sql = res['sql'] + groupby = res['groupby'] + df = await ckdb.query_dataframe(sql) + # 没有分组 + data = {'groupby': groupby} + if not groupby: + data['总体'] = df['values'][0] + else: + data = df.groupby(groupby).sum().to_dict() + + return schemas.Msg(code=0, msg='ok', data=data) diff --git a/core/config.py b/core/config.py index c497a3a..e7761ec 100644 --- a/core/config.py +++ b/core/config.py @@ -31,8 +31,8 @@ class Settings(BaseSettings): 'decode_responses': 'utf-8', } - CK_CONFIG = {'host': '119.29.176.224', - 'send_receive_timeout': 30} + CK_CONFIG = {'host': '139.159.159.3', + 'port': 9654} CK_TYPE_DICT = {"DateTime('UTC')": 'datetime', "Nullable(DateTime('UTC'))": 'datetime', @@ -102,7 +102,11 @@ class Settings(BaseSettings): ], 'string': [{ - 'id': 'distinct_count', + 'id': 'uniqCombined', + 'title': '去重数' + }], + 'datetime': [{ + 'id': 'uniqCombined', 'title': '去重数' }], 'float': [{ @@ -281,7 +285,7 @@ class Debug(Settings): class Produce(Settings): - MDB_HOST: str = '119.29.176.224' + MDB_HOST: str = '127.0.0.1' MDB_PORT: int = 27017 MDB_USER: str = 'root' MDB_PASSWORD: str = 'iamciniao' diff --git a/models/behavior_analysis.py b/models/behavior_analysis.py index dc3a204..3ee3cc3 100644 --- a/models/behavior_analysis.py +++ b/models/behavior_analysis.py @@ -48,6 +48,7 @@ class BehaviorAnalysis: return self.event_view.get('unitNum') def _get_group_by(self): + return [getattr(self.event_tbl.c, item['columnName']) for item in self.event_view.get('groupBy', [])] def _get_zone_time(self): diff --git a/models/user_analysis.py b/models/user_analysis.py new file mode 100644 index 0000000..5326e7a --- /dev/null +++ b/models/user_analysis.py @@ -0,0 +1,127 @@ +from typing import Tuple + +import sqlalchemy as sa +import json + +from fastapi import Depends + +import pandas as pd + +from sqlalchemy import func, or_, and_, not_ + +import schemas +from core.config import settings +from db.redisdb import get_redis_pool, RedisDrive + + +class UserAnalysis: + def __init__(self, game: str, data_in: schemas.CkQuery, rdb: RedisDrive = Depends(get_redis_pool)): + self.game = game + self.rdb = rdb + self.user_tbl = None + self.event_view = data_in.eventView + self.events = data_in.events + + self.zone_time: int = 0 + + self.global_filters = None + self.groupby = None + self.time_particle = None + self.date_range = None + self.unit_num = None + + async def init(self): + await self._init_table() + self.zone_time = self._get_zone_time() + self.time_particle = self._get_time_particle_size() + self.groupby = self._get_group_by() + self.unit_num = self._get_unit_num() + + async def _init_table(self): + """ + 从redis中取出表字段,构建表结构 + :return: + """ + res_json = await self.rdb.get(f'{self.game}_user') + columns = json.loads(res_json).keys() + metadata = sa.MetaData(schema=self.game) + self.user_tbl = sa.Table('user_view', metadata, *[sa.Column(column) for column in columns]) + + def _get_time_particle_size(self): + return self.event_view.get('timeParticleSize') or 'P1D' + + def _get_unit_num(self): + return self.event_view.get('unitNum') + + def _get_group_by(self): + return [getattr(self.user_tbl.c, item['columnName']) for item in self.event_view.get('groupBy', [])] + + def _get_zone_time(self): + return int(self.event_view.get('zone_time', 8)) + + def _get_filters(self, filters): + tbl = self.user_tbl + where = [] + for item in filters: + col = getattr(tbl.c, item['columnName']) + + comparator = item['comparator'] + ftv = item['ftv'] + if comparator == '==': + if len(ftv) > 1: + where.append(or_(*[col == v for v in ftv])) + else: + where.append(col == ftv[0]) + elif comparator == '>=': + where.append(col >= ftv[0]) + elif comparator == '<=': + where.append(col <= ftv[0]) + elif comparator == '>': + where.append(col > ftv[0]) + elif comparator == '<': + where.append(col < ftv[0]) + + elif comparator == 'is not null': + where.append(col.isnot(None)) + elif comparator == 'is null': + where.append(col.is_(None)) + + elif comparator == '!=': + where.append(col != ftv[0]) + + return where + + def property_model(self): + event = self.events[0] + selectd = getattr(self.user_tbl.c, event['quota']) + qry = sa.select(selectd) + + account_id_col = getattr(self.user_tbl.c, '#account_id') + binduid_col = getattr(self.user_tbl.c, '#account_id') + # 聚合方式 + analysis = event['analysis'] + + if analysis == 'trig_user_num': + selectd = [func.count().label('values')] + elif analysis == 'distinct_count': + selectd = [ + func.count(sa.distinct(getattr(self.user_tbl.c, event['quota']))).label('values')] + + else: + selectd = [ + func.round(getattr(func, analysis)(getattr(self.user_tbl.c, event['quota'])), 2).label( + 'values')] + + where = self._get_filters(event['filts']) + qry = sa.select((*self.groupby, *selectd)).where(*where) + + qry = qry.group_by(*self.groupby) + qry = qry.order_by(sa.Column('values').desc()) + qry = qry.limit(1000) + sql = str(qry.compile(compile_kwargs={"literal_binds": True})) + print(sql) + result = {'sql': sql, + 'groupby': [i.key for i in self.groupby], + } + + return result diff --git a/sql/留存4.sql b/sql/留存4.sql index 24ef821..1d31e15 100644 --- a/sql/留存4.sql +++ b/sql/留存4.sql @@ -1,5 +1,4 @@ -select groupArray((date,login_account)) from (with groupArray(distinct binduid) as login_account, - toDate(addHours(`#event_time`, 8)) as date - select date, login_account +select toDate(addHours(`#event_time`, 8)) from zhengba.event -group by date) \ No newline at end of file +where role_idx = 1 +group by `binduid`