90 lines
3.1 KiB
Python
90 lines
3.1 KiB
Python
# coding:utf-8
|
||
|
||
import copy
|
||
import re
|
||
from collections import namedtuple
|
||
from ipaddress import IPv4Address
|
||
|
||
import numpy as np
|
||
|
||
import clickhouse_driver
|
||
|
||
import schemas
|
||
from core import settings
|
||
from db import ck_client
|
||
from db.mongo import check_template_coll
|
||
|
||
Type = namedtuple('Type', ['string', 'integer', 'array', 'ipv4'])
|
||
type_map = Type(string=str, integer=np.number, array=list, ipv4=IPv4Address)
|
||
|
||
|
||
async def check_data(db, game, data_in: schemas.CheckData):
|
||
event_name = data_in.event_name
|
||
is_unique = data_in.is_unique
|
||
props = data_in.props
|
||
where = data_in.where
|
||
report = []
|
||
limit = 10
|
||
check_type = copy.deepcopy(props)
|
||
check_type.update(settings.DEFAULT_FIELD)
|
||
|
||
select = ','.join([f'`{field}`' for field in check_type.keys()])
|
||
sql = f"""select {select} from {db}.event where game='{game}' and `#event_name`='{event_name}'"""
|
||
for k, v in where.items():
|
||
sql += f""" and `{k}`='{v}'"""
|
||
|
||
sql += f""" order by `#event_time` desc"""
|
||
sql += f""" limit {limit}"""
|
||
|
||
print(sql)
|
||
# pass_list: [], fail_list: []
|
||
# sql = 'show databases'
|
||
report = {'fail_list': [],
|
||
'pass_list': []}
|
||
fail_list = report['fail_list']
|
||
pass_list = report['pass_list']
|
||
try:
|
||
df = await ck_client.query_dataframe(sql)
|
||
except clickhouse_driver.errors.ServerException as e:
|
||
if e.code == 47:
|
||
msg = re.match(r"""DB::Exception: Missing columns: '(.*)' while processing query""", e.message)
|
||
filed = '未知'
|
||
if msg:
|
||
filed = msg.group(1)
|
||
fail_list.append(f'<p style="color:red;font-size:17px;">数据库不存在字段-> {filed}</p>')
|
||
else:
|
||
fail_list.append('<p style="color:red;font-size:17px;">数据库查询未知错误</p>')
|
||
|
||
return report
|
||
|
||
if df.empty:
|
||
fail_list.append('<p style="color:blue;font-size:17px;">根据过滤条件未查到任何数据,也有可能是数据未及时入库。(3分钟后还没查到说明存在问题)</p>')
|
||
return report
|
||
if is_unique and len(df) > 1:
|
||
fail_list.append('<p style="color:yellow;font-size:17px;">警告:记录数大于一条</p>')
|
||
|
||
for k, t in check_type.items():
|
||
if not isinstance(df[k][0], getattr(type_map, t)):
|
||
fail_list.append(f'<p style="color:red;font-size:17px;">错误:字段{k} 期望{t}类型,得到{type(df[k][0])}</p>')
|
||
else:
|
||
pass_list.append(f'<p style="color:green;font-size:17px;">通过:字段{k} 是期望的类型</p>')
|
||
|
||
return report
|
||
|
||
|
||
async def save_template(data_in: schemas.AddTemplate,
|
||
game: str,
|
||
db_name: str = 'debug'):
|
||
res = await check_template_coll.update_one({'title': data_in.title},
|
||
{'$set': {'game': game, 'db_name': db_name, 'check': data_in.dict()}},
|
||
upsert=True)
|
||
return True
|
||
|
||
|
||
async def get_template(*args, **kwargs):
|
||
doc_group = {}
|
||
async for doc in check_template_coll.find(*args, **kwargs):
|
||
doc_group.setdefault(doc['game'], []).append(doc)
|
||
|
||
return doc_group
|