This commit is contained in:
wuaho 2021-07-19 14:20:09 +08:00
parent c3eff67546
commit d5487e6e64
6 changed files with 231 additions and 11 deletions

View File

@ -43,6 +43,23 @@ async def edit_data_auth(request: Request,
await crud.data_auth.edit_data_auth(db, data_id)
return schemas.Msg(code=0, msg='ok', data=data_id)
@router.get("/quotas_map")
async def quotas_map(
        request: Request,
        game: str,
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Return the static aggregation-operator map from settings (CK_OPERATOR)."""
    operator_map = settings.CK_OPERATOR
    return schemas.Msg(code=0, msg='ok', data=operator_map)
@router.get("/filter_map")
async def filter_map(
        request: Request,
        game: str,
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Return the static filter-comparator map from settings (CK_FILTER)."""
    comparator_map = settings.CK_FILTER
    return schemas.Msg(code=0, msg='ok', data=comparator_map)
@router.get('/all_event')
async def all_event(request: Request,
@ -116,6 +133,38 @@ async def my_event(request: Request,
return schemas.Msg(code=0, msg='ok', data=event_list)
@router.get("/user_property")
async def user_property(request: Request,
                        game: str,
                        db: AsyncIOMotorDatabase = Depends(get_database),
                        rdb: RedisDrive = Depends(get_redis_pool),
                        ck: CKDrive = Depends(get_ck_db),
                        current_user: schemas.UserDB = Depends(deps.get_current_user)
                        ) -> schemas.Msg:
    """Return the user-property list for *game*.

    Reads the cached ``{game}_user`` field map (field name -> ClickHouse
    type string) from Redis, maps each type through
    ``settings.CK_TYPE_DICT``, joins with the ``data_attr`` collection
    (cat='user') for display names, and returns the entries sorted by
    ``show_name``.
    """
    raw = await rdb.get(f'{game}_user')
    if not raw:
        # BUG FIX: a Redis cache miss previously crashed json.loads(None);
        # an empty property list is the honest answer here.
        return schemas.Msg(code=0, msg='ok', data=[])
    fields = json.loads(raw)
    data_attr = await crud.data_attr.find_many(db, game=game, cat='user')
    data_attr = {item['name']: item for item in data_attr}
    propertys = [
        {
            'name': name,
            'data_type': settings.CK_TYPE_DICT.get(ck_type),
            'show_name': data_attr.get(name, {}).get('show_name', ''),
        }
        for name, ck_type in fields.items()
    ]
    propertys.sort(key=lambda x: x['show_name'])
    return schemas.Msg(code=0, msg='ok', data=propertys)
@router.post('/load_prop_quotas')
async def load_prop_quotas(request: Request,
game: str,
@ -218,11 +267,13 @@ async def load_filter_props(request: Request,
event_prop_list = sorted(event_prop_set)
key = f'{game}_user'
user_prop_set = await rdb.get(key)
user_prop_list = sorted(event_prop_set)
user_prop_dict = await rdb.get(key)
user_prop_dict = json.loads(user_prop_dict)
user_prop_list = sorted(user_prop_dict.keys())
all_filed = await rdb.get(f'{game}_event')
all_filed = json.loads(all_filed)
data_attr = await crud.data_attr.find_many(db, game=game, cat='event')
data_attr = {item['name']: item for item in data_attr}
event_props = []
@ -241,7 +292,7 @@ async def load_filter_props(request: Request,
data_attr = {item['name']: item for item in data_attr}
user_props = []
for item in user_prop_list:
data_type = settings.CK_TYPE_DICT.get(all_filed.get(item))
data_type = settings.CK_TYPE_DICT.get(user_prop_dict.get(item))
title = data_attr.get(item, {}).get('show_name') or item
user_prop = {
'id': item,

View File

@ -14,6 +14,7 @@ from db.ckdb import get_ck_db, CKDrive
from db.redisdb import get_redis_pool, RedisDrive
from models.behavior_analysis import BehaviorAnalysis
from models.user_analysis import UserAnalysis
router = APIRouter()
@ -466,3 +467,40 @@ async def trace_model_sql(
'links': links
}
return schemas.Msg(code=0, msg='ok', data=data)
@router.post("/user_property_sql")
async def user_property_sql(
        request: Request,
        game: str,
        analysis: UserAnalysis = Depends(UserAnalysis),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Build and return the user-property analysis SQL (without running it)."""
    await analysis.init()
    result = analysis.property_model()
    return schemas.Msg(code=0, msg='ok', data=[result])
@router.post("/user_property_model")
async def user_property_model(
        request: Request,
        game: str,
        analysis: UserAnalysis = Depends(UserAnalysis),
        ckdb: CKDrive = Depends(get_ck_db),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Run the user-property analysis query and shape the result.

    Without a groupby, the single aggregate value is returned under the
    '总体' (overall) key; with a groupby, the per-group sums are merged
    into the response alongside the 'groupby' key.
    """
    await analysis.init()
    res = analysis.property_model()
    sql = res['sql']
    groupby = res['groupby']
    df = await ckdb.query_dataframe(sql)
    data = {'groupby': groupby}
    if not groupby:
        # single aggregate row; guard against an empty result set
        data['总体'] = df['values'][0] if len(df) else 0
    else:
        # BUG FIX: `data` used to be reassigned here, silently dropping
        # the 'groupby' key it was initialized with above.
        data.update(df.groupby(groupby).sum().to_dict())
    return schemas.Msg(code=0, msg='ok', data=data)

View File

@ -31,8 +31,8 @@ class Settings(BaseSettings):
'decode_responses': 'utf-8',
}
CK_CONFIG = {'host': '119.29.176.224',
'send_receive_timeout': 30}
CK_CONFIG = {'host': '139.159.159.3',
'port': 9654}
CK_TYPE_DICT = {"DateTime('UTC')": 'datetime',
"Nullable(DateTime('UTC'))": 'datetime',
@ -102,7 +102,11 @@ class Settings(BaseSettings):
],
'string': [{
'id': 'distinct_count',
'id': 'uniqCombined',
'title': '去重数'
}],
'datetime': [{
'id': 'uniqCombined',
'title': '去重数'
}],
'float': [{
@ -281,7 +285,7 @@ class Debug(Settings):
class Produce(Settings):
MDB_HOST: str = '119.29.176.224'
MDB_HOST: str = '127.0.0.1'
MDB_PORT: int = 27017
MDB_USER: str = 'root'
MDB_PASSWORD: str = 'iamciniao'

View File

@ -48,6 +48,7 @@ class BehaviorAnalysis:
return self.event_view.get('unitNum')
def _get_group_by(self):
return [getattr(self.event_tbl.c, item['columnName']) for item in self.event_view.get('groupBy', [])]
def _get_zone_time(self):

127
models/user_analysis.py Normal file
View File

@ -0,0 +1,127 @@
from typing import Tuple
import sqlalchemy as sa
import json
from fastapi import Depends
import pandas as pd
from sqlalchemy import func, or_, and_, not_
import schemas
from core.config import settings
from db.redisdb import get_redis_pool, RedisDrive
class UserAnalysis:
    """Builds the SQLAlchemy query for user-property analysis.

    The table structure is reconstructed at runtime from the field map
    cached in Redis under ``{game}_user``; ``init()`` must be awaited
    before ``property_model()`` is called.
    """

    def __init__(self, game: str, data_in: schemas.CkQuery, rdb: RedisDrive = Depends(get_redis_pool)):
        self.game = game
        self.rdb = rdb
        self.user_tbl = None  # sa.Table, built lazily by _init_table()
        self.event_view = data_in.eventView
        self.events = data_in.events
        self.zone_time: int = 0
        self.global_filters = None
        self.groupby = None
        self.time_particle = None
        self.date_range = None
        self.unit_num = None

    async def init(self):
        """Load the table metadata from Redis and parse the event view."""
        await self._init_table()
        self.zone_time = self._get_zone_time()
        self.time_particle = self._get_time_particle_size()
        self.groupby = self._get_group_by()
        self.unit_num = self._get_unit_num()

    async def _init_table(self):
        """Build the table structure from the field map cached in Redis."""
        res_json = await self.rdb.get(f'{self.game}_user')
        columns = json.loads(res_json).keys()
        metadata = sa.MetaData(schema=self.game)
        self.user_tbl = sa.Table('user_view', metadata, *[sa.Column(column) for column in columns])

    def _get_time_particle_size(self):
        # default time granularity: one day
        return self.event_view.get('timeParticleSize') or 'P1D'

    def _get_unit_num(self):
        return self.event_view.get('unitNum')

    def _get_group_by(self):
        """Resolve the view's groupBy specs to columns on the user table."""
        return [getattr(self.user_tbl.c, item['columnName']) for item in self.event_view.get('groupBy', [])]

    def _get_zone_time(self):
        # default timezone offset is UTC+8
        return int(self.event_view.get('zone_time', 8))

    def _get_filters(self, filters):
        """Translate filter specs into a list of SQLAlchemy where-clauses.

        Each spec carries 'columnName', 'comparator' and 'ftv' (the
        filter values). Multiple values with '==' are OR-ed together;
        every other comparator uses only ftv[0].
        """
        tbl = self.user_tbl
        where = []
        for item in filters:
            col = getattr(tbl.c, item['columnName'])
            comparator = item['comparator']
            ftv = item['ftv']
            if comparator == '==':
                if len(ftv) > 1:
                    where.append(or_(*[col == v for v in ftv]))
                else:
                    where.append(col == ftv[0])
            elif comparator == '>=':
                where.append(col >= ftv[0])
            elif comparator == '<=':
                where.append(col <= ftv[0])
            elif comparator == '>':
                where.append(col > ftv[0])
            elif comparator == '<':
                where.append(col < ftv[0])
            elif comparator == 'is not null':
                where.append(col.isnot(None))
            elif comparator == 'is null':
                where.append(col.is_(None))
            elif comparator == '!=':
                where.append(col != ftv[0])
        return where

    def property_model(self):
        """Build the aggregate query for the first requested event/quota.

        Returns ``{'sql': <compiled SQL string>, 'groupby': [column names]}``.
        (Removed dead code from the original: an initial select that was
        immediately overwritten and two unused '#account_id' column refs.)
        """
        event = self.events[0]
        quota_col = getattr(self.user_tbl.c, event['quota'])
        # aggregation method
        analysis = event['analysis']
        if analysis == 'trig_user_num':
            selected = [func.count().label('values')]
        elif analysis == 'distinct_count':
            selected = [func.count(sa.distinct(quota_col)).label('values')]
        else:
            # any other analysis name is applied as a SQL aggregate function
            selected = [func.round(getattr(func, analysis)(quota_col), 2).label('values')]
        where = self._get_filters(event['filts'])
        qry = sa.select((*self.groupby, *selected)).where(*where)
        qry = qry.group_by(*self.groupby)
        qry = qry.order_by(sa.Column('values').desc())
        qry = qry.limit(1000)
        sql = str(qry.compile(compile_kwargs={"literal_binds": True}))
        print(sql)
        result = {'sql': sql,
                  'groupby': [col.key for col in self.groupby],
                  }
        return result

View File

@ -1,5 +1,4 @@
select groupArray((date,login_account)) from (with groupArray(distinct binduid) as login_account,
toDate(addHours(`#event_time`, 8)) as date
select date, login_account
select toDate(addHours(`#event_time`, 8))
from zhengba.event
group by date)
where role_idx = 1
group by `binduid`