check_xdata_xbackend/apis/check_data/service.py
2021-09-23 12:04:11 +08:00

93 lines
3.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding:utf-8
import copy
import re
from collections import namedtuple
from ipaddress import IPv4Address
import numpy as np
import clickhouse_driver
import schemas
from core import settings
from db import ck_client
from db.mongo import check_template_coll
# Type names that may appear in a check template, mapped to the classes that
# values read back from the database are tested against with isinstance().
Type = namedtuple('Type', 'string integer array ipv4')
type_map = Type(
    string=str,
    integer=np.number,
    array=list,
    ipv4=IPv4Address,
)
def _quote_identifier(name):
    """Return *name* as a backtick-quoted, backslash-escaped SQL identifier."""
    escaped = str(name).replace('\\', '\\\\').replace('`', '\\`')
    return f'`{escaped}`'


def _quote_literal(value):
    """Return *value* as a single-quoted, backslash-escaped SQL string literal."""
    escaped = str(value).replace('\\', '\\\\').replace("'", "\\'")
    return f"'{escaped}'"


async def check_data(db, game, data_in: schemas.CheckData):
    """Query the latest matching events and type-check their fields.

    Builds a ``select`` over ``{db}.event`` restricted to ``game``,
    ``data_in.event_name`` and every key/value pair in ``data_in.where``,
    newest ``#event_time`` first, limited to 10 rows.  The first returned row
    is then checked field by field against the type names given in
    ``data_in.props`` merged with ``data_in.default_field``.

    Args:
        db: Database name the ``event`` table is read from.
        game: Value matched against the ``game`` column.
        data_in: Check request; ``event_name``, ``is_unique``, ``props``,
            ``where`` and ``default_field`` are read from it.

    Returns:
        A dict ``{'fail_list': [...], 'pass_list': [...]}`` whose entries are
        HTML ``<p>`` snippets describing each failed or passed check.
    """
    limit = 10

    # Work on a copy so the caller's props mapping is not mutated by update().
    check_type = copy.deepcopy(data_in.props)
    check_type.update(data_in.default_field)

    # Every identifier and value is quoted/escaped: they originate from the
    # request and must not be able to break out of the generated statement.
    select = ','.join(_quote_identifier(field) for field in check_type)
    conditions = [
        f'game={_quote_literal(game)}',
        f'`#event_name`={_quote_literal(data_in.event_name)}',
    ]
    conditions.extend(
        f'{_quote_identifier(column)}={_quote_literal(value)}'
        for column, value in data_in.where.items()
    )
    where_clause = ' and '.join(conditions)
    sql = (
        f'select {select} from {_quote_identifier(db)}.event'
        f' where {where_clause}'
        f' order by `#event_time` desc'
        f' limit {limit}'
    )
    print(sql)

    report = {'fail_list': [],
              'pass_list': []}
    fail_list = report['fail_list']
    pass_list = report['pass_list']

    try:
        df = await ck_client.query_dataframe(sql)
    except clickhouse_driver.errors.ServerException as e:
        if e.code == 47:
            # Code 47 is handled as "missing columns"; pull the column names
            # out of the server message when it has the expected shape.
            msg = re.match(r"""DB::Exception: Missing columns: '(.*)' while processing query""", e.message)
            field = '未知'
            if msg:
                field = msg.group(1)
            fail_list.append(f'<p style="color:red;font-size:17px;">数据库不存在字段-> {field}</p>')
        else:
            fail_list.append('<p style="color:red;font-size:17px;">数据库查询未知错误</p>')
        return report

    if df.empty:
        fail_list.append('<p style="color:blue;font-size:17px;">根据过滤条件未查到任何数据也有可能是数据未及时入库。3分钟后还没查到说明存在问题</p>')
        return report

    if data_in.is_unique and len(df) > 1:
        fail_list.append('<p style="color:yellow;font-size:17px;">警告:记录数大于一条</p>')

    # A plain dict lookup (instead of getattr on the namedtuple) means an
    # unknown type name is reported as a failed check rather than raising, and
    # tuple attributes such as ``count``/``index`` can never be picked up.
    expected_types = type_map._asdict()
    for k, t in check_type.items():
        expected = expected_types.get(t)
        if expected is None:
            fail_list.append(f'<p style="color:red;font-size:17px;">错误:字段{k} 未知的期望类型->{t}</p>')
            continue
        value = df[k][0]  # only the newest row is type-checked
        if not isinstance(value, expected):
            actual = re.findall("'(.*)'>", str(type(value)))[0]
            fail_list.append(f"""<p style="color:red;font-size:17px;">错误:字段{k} 期望{t}类型,得到->{actual}</p>""")
        else:
            pass_list.append(f'<p style="color:green;font-size:17px;">通过:字段{k} 是期望的类型</p>')

    return report
async def save_template(data_in: schemas.AddTemplate,
                        game: str,
                        db_name: str = 'debug'):
    """Upsert the check template identified by ``data_in.title``.

    The document whose ``title`` matches is updated (or inserted when absent)
    with ``game``, ``db_name`` and ``data_in.check.dict()`` under ``check``.

    Returns:
        Always ``True``.
    """
    selector = {'title': data_in.title}
    fields = {
        'game': game,
        'db_name': db_name,
        'check': data_in.check.dict(),
    }
    await check_template_coll.update_one(selector, {'$set': fields}, upsert=True)
    return True
async def get_template(*args, **kwargs):
    """Return all check templates matching a filter, without their ``_id``.

    Args:
        *args: Optionally one positional query filter for
            ``check_template_coll.find``.
        **kwargs: Extra keyword arguments forwarded to ``find``; a ``filter``
            keyword is honoured when no positional filter is given.

    Returns:
        A list of the matching documents.
    """
    if not args:
        # Without a positional filter the projection below would slide into
        # the filter slot and the query would look for ``_id == False``.
        # Supply an explicit filter (match everything by default) instead.
        args = (kwargs.pop('filter', None) or {},)
    cursor = check_template_coll.find(*args, {'_id': False}, **kwargs)
    return [doc async for doc in cursor]
def get_default_field():
    """Return the value of ``settings.DEFAULT_FIELD``."""
    default_field = settings.DEFAULT_FIELD
    return default_field