数据检查
This commit is contained in:
parent
55277e5389
commit
eeaaca7dba
@ -13,6 +13,7 @@ from .endpoints import data_auth
|
||||
from .endpoints import event_mana
|
||||
from .endpoints import test
|
||||
from .authz import authz
|
||||
from .check_data import controller as check_data
|
||||
|
||||
api_router = APIRouter()
|
||||
api_router.include_router(test.router, tags=["test"], prefix='/test')
|
||||
@ -34,3 +35,4 @@ api_router.include_router(query.router, tags=["ck"], prefix='/ck')
|
||||
api_router.include_router(xquery.router, tags=["xck"], prefix='/ck')
|
||||
|
||||
api_router.include_router(authz.router, tags=["api接口管理"], prefix='/authz')
|
||||
api_router.include_router(check_data.router, tags=["打点验证"], prefix='/check_data')
|
||||
|
1
api/api_v1/check_data/__init__.py
Normal file
1
api/api_v1/check_data/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
from .controller import router
|
43
api/api_v1/check_data/controller.py
Normal file
43
api/api_v1/check_data/controller.py
Normal file
@ -0,0 +1,43 @@
|
||||
from fastapi import APIRouter, Request
|
||||
|
||||
import schemas
|
||||
from api.api_v1.check_data import service
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.post("/check")
|
||||
async def check(request: Request,
|
||||
data_in: schemas.CheckData,
|
||||
game: str,
|
||||
) -> schemas.Msg:
|
||||
res = await service.check_data( game, data_in)
|
||||
return schemas.Msg(code=0, msg='ok', data=res)
|
||||
|
||||
|
||||
@router.post("/save")
|
||||
async def save(request: Request,
|
||||
data_in: schemas.AddTemplate,
|
||||
game: str,
|
||||
db_name: str = 'debug'
|
||||
) -> schemas.Msg:
|
||||
res = await service.save_template(data_in, game, db_name)
|
||||
return schemas.Msg(code=0, msg='ok', data=res)
|
||||
|
||||
|
||||
@router.get('/template')
|
||||
async def template(request: Request, game: str) -> schemas.Msg:
|
||||
data = await service.get_template(dict(request.query_params))
|
||||
return schemas.Msg(code=0, msg='ok', data=data)
|
||||
|
||||
|
||||
@router.post('/del_template')
|
||||
async def del_template(request: Request, game: str, data_in: schemas.DelTemplate) -> schemas.Msg:
|
||||
data = await service.del_template(data_in)
|
||||
return schemas.Msg(code=0, msg='ok', data=data)
|
||||
|
||||
|
||||
@router.get('/default_field')
|
||||
async def template(request: Request, game: str) -> schemas.Msg:
|
||||
data = service.get_default_field()
|
||||
return schemas.Msg(code=0, msg='ok', data=data)
|
110
api/api_v1/check_data/service.py
Normal file
110
api/api_v1/check_data/service.py
Normal file
@ -0,0 +1,110 @@
|
||||
# coding:utf-8
|
||||
|
||||
import copy
|
||||
import json
|
||||
import re
|
||||
from collections import namedtuple
|
||||
from ipaddress import IPv4Address
|
||||
|
||||
import numpy as np
|
||||
|
||||
import clickhouse_driver
|
||||
|
||||
import schemas
|
||||
from core.config import settings
|
||||
from db.ckdb import ckdb as ck_client
|
||||
import crud
|
||||
|
||||
Type = namedtuple('Type', ['string', 'integer', 'array', 'ipv4'])
|
||||
type_map = Type(string=str, integer=np.number, array=list, ipv4=IPv4Address)
|
||||
|
||||
|
||||
async def check_data(game, data_in: schemas.CheckData):
|
||||
db = data_in.db_name
|
||||
event_name = data_in.event_name
|
||||
is_unique = data_in.is_unique
|
||||
props = data_in.props
|
||||
where = data_in.where
|
||||
limit = 5
|
||||
check_type = copy.deepcopy(props)
|
||||
check_type.update(data_in.default_field)
|
||||
|
||||
select = ','.join([f'`{field}`' for field in check_type.keys()])
|
||||
sql = f"""select {select} from {db}.event where game='{game}' and `#event_name`='{event_name}'"""
|
||||
for k, v in where.items():
|
||||
sql += f""" and `{k}`='{v}'"""
|
||||
|
||||
sql += f""" order by `#event_time` desc"""
|
||||
sql += f""" limit {limit}"""
|
||||
|
||||
print(sql)
|
||||
# pass_list: [], fail_list: []
|
||||
# sql = 'show databases'
|
||||
report = {'fail_list': [],
|
||||
'pass_list': []}
|
||||
fail_list = report['fail_list']
|
||||
pass_list = report['pass_list']
|
||||
try:
|
||||
df = await ck_client.query_dataframe(sql)
|
||||
report['title'] = df.columns.tolist()
|
||||
report['data'] = df.values.astype(str).tolist()
|
||||
|
||||
except clickhouse_driver.errors.ServerException as e:
|
||||
if e.code == 47:
|
||||
msg = re.match(r"""DB::Exception: Missing columns: '(.*)' while processing query""", e.message)
|
||||
filed = '未知'
|
||||
if msg:
|
||||
filed = msg.group(1)
|
||||
fail_list.append(f'<p style="color:red;font-size:17px;">数据库不存在字段-> {filed}</p>')
|
||||
else:
|
||||
fail_list.append('<p style="color:red;font-size:17px;">数据库查询未知错误</p>')
|
||||
|
||||
return report
|
||||
|
||||
if df.empty:
|
||||
fail_list.append('<p style="color:blue;font-size:17px;">根据过滤条件未查到任何数据,也有可能是数据未及时入库。(3分钟后还没查到说明存在问题)</p>')
|
||||
return report
|
||||
if is_unique and len(df) > 1:
|
||||
fail_list.append('<p style="color:yellow;font-size:17px;">警告:记录数大于一条</p>')
|
||||
|
||||
for k, t in check_type.items():
|
||||
if isinstance(df[k][0], str) and t == 'json':
|
||||
try:
|
||||
json.loads(df[k][0])
|
||||
except:
|
||||
fail_list.append(
|
||||
f"""<p style="color:red;font-size:17px;">错误:字段{k} 期望{t}类型,不是json格式</p>""")
|
||||
continue
|
||||
if not isinstance(df[k][0], getattr(type_map, t)):
|
||||
fail_list.append(
|
||||
f"""<p style="color:red;font-size:17px;">错误:字段{k} 期望{t}类型,得到->{re.findall("'(.*)'>", str(type(df[k][0])))[0]}</p>""")
|
||||
else:
|
||||
pass_list.append(f'<p style="color:green;font-size:17px;">通过:字段{k} 是期望的类型</p>')
|
||||
|
||||
return report
|
||||
|
||||
|
||||
async def save_template(data_in: schemas.AddTemplate,
|
||||
game: str,
|
||||
):
|
||||
res = await crud.check_data.update_one({'title': data_in.title},
|
||||
{'$set': {'game': game,
|
||||
'check': data_in.check.dict()}},
|
||||
upsert=True)
|
||||
return True
|
||||
|
||||
|
||||
async def get_template(*args, **kwargs):
|
||||
res = []
|
||||
async for doc in crud.check_data.find(*args, {'_id': False}, **kwargs):
|
||||
res.append(doc)
|
||||
return res
|
||||
|
||||
|
||||
def get_default_field():
|
||||
return settings.DEFAULT_FIELD
|
||||
|
||||
|
||||
async def del_template(data_id: schemas.DelTemplate):
|
||||
await crud.check_data.delete_one(data_id.dict())
|
||||
return True
|
@ -289,6 +289,60 @@ class Settings(BaseSettings):
|
||||
'P1M': lambda col, zone: func.toStartOfMonth(func.addHours(col, zone)).label('date'),
|
||||
}
|
||||
|
||||
|
||||
DEFAULT_FIELD: dict = {
|
||||
'#ip': 'ipv4',
|
||||
'#country': 'string',
|
||||
'#province': 'string',
|
||||
'#city': 'string',
|
||||
'#os': 'string',
|
||||
'#device_id': 'string',
|
||||
'#screen_height': 'integer',
|
||||
'#screen_width': 'integer',
|
||||
'#device_model': 'string',
|
||||
'#app_version': 'string',
|
||||
'#bundle_id': 'string',
|
||||
'#app_name': 'string',
|
||||
'#game_version': 'string',
|
||||
'#os_version': 'string',
|
||||
'#network_type': 'string',
|
||||
'#carrier': 'string',
|
||||
'#manufacturer': 'string',
|
||||
'#app_id': 'string',
|
||||
'#account_id': 'string',
|
||||
'#distinct_id': 'string',
|
||||
'binduid': 'string',
|
||||
'channel': 'string',
|
||||
'owner_name': 'string',
|
||||
'role_name': 'string',
|
||||
'exp': 'integer',
|
||||
'zhanli': 'integer',
|
||||
'maxmapid': 'integer',
|
||||
'mapid': 'integer',
|
||||
'ghid': 'string',
|
||||
'rmbmoney': 'integer',
|
||||
'jinbi': 'integer',
|
||||
'svrindex': 'string',
|
||||
'lv': 'integer',
|
||||
'vip': 'integer',
|
||||
'game': 'string',
|
||||
|
||||
# 'unitPrice': 'integer',
|
||||
# 'money': 'string',
|
||||
# 'isdangrishouci': 'integer',
|
||||
# 'islishishouci': 'integer',
|
||||
# 'is_today_reg': 'integer',
|
||||
# 'orderid': 'string',
|
||||
# 'proid': 'string',
|
||||
#
|
||||
# 'step_id': 'integer',
|
||||
# 'step_group': 'integer',
|
||||
# 'guide_start_time': 'integer',
|
||||
#
|
||||
# 'online_ts': 'integer'
|
||||
}
|
||||
|
||||
|
||||
class Config:
|
||||
case_sensitive = True
|
||||
|
||||
|
@ -11,3 +11,4 @@ from .crud_api_log import api_log
|
||||
from .crud_event_mana import event_mana
|
||||
from .crud_api_list import api_list
|
||||
from .crud_role import role
|
||||
from .crud_check_data import check_data
|
15
crud/crud_check_data.py
Normal file
15
crud/crud_check_data.py
Normal file
@ -0,0 +1,15 @@
|
||||
from motor.motor_asyncio import AsyncIOMotorDatabase
|
||||
|
||||
import schemas
|
||||
from crud.base import CRUDBase
|
||||
|
||||
__all__ = 'check_data',
|
||||
|
||||
|
||||
|
||||
|
||||
class CRUDCheckData(CRUDBase):
|
||||
pass
|
||||
|
||||
|
||||
check_data = CRUDCheckData('check_data')
|
@ -4,7 +4,7 @@ from core.config import settings
|
||||
from .ckdb import CKDrive
|
||||
|
||||
|
||||
async def connect_to_ck(pool_size=100):
|
||||
async def connect_to_ck(pool_size=15):
|
||||
for i in range(pool_size):
|
||||
client = Client(**settings.CK_CONFIG)
|
||||
CKDrive.ClientPool.add(client)
|
||||
|
@ -14,4 +14,5 @@ from .api_log import *
|
||||
from .event_mana import *
|
||||
from .xquery import *
|
||||
from .api_list import *
|
||||
from .role import *
|
||||
from .role import *
|
||||
from .check_data import *
|
19
schemas/check_data.py
Normal file
19
schemas/check_data.py
Normal file
@ -0,0 +1,19 @@
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class CheckData(BaseModel):
|
||||
db_name: str
|
||||
event_name: str
|
||||
is_unique: bool
|
||||
props: dict
|
||||
default_field: dict = dict()
|
||||
where: dict = dict()
|
||||
|
||||
|
||||
class AddTemplate(BaseModel):
|
||||
check: CheckData
|
||||
title: str
|
||||
|
||||
|
||||
class DelTemplate(BaseModel):
|
||||
title: str
|
Loading…
Reference in New Issue
Block a user