# coding:utf-8 import copy import re from collections import namedtuple from ipaddress import IPv4Address import numpy as np import clickhouse_driver import schemas from core import settings from db import ck_client from db.mongo import check_template_coll Type = namedtuple('Type', ['string', 'integer', 'array', 'ipv4']) type_map = Type(string=str, integer=np.number, array=list, ipv4=IPv4Address) async def check_data(db, game, data_in: schemas.CheckData): event_name = data_in.event_name is_unique = data_in.is_unique props = data_in.props where = data_in.where limit = 10 check_type = copy.deepcopy(props) check_type.update(data_in.default_field) select = ','.join([f'`{field}`' for field in check_type.keys()]) sql = f"""select {select} from {db}.event where game='{game}' and `#event_name`='{event_name}'""" for k, v in where.items(): sql += f""" and `{k}`='{v}'""" sql += f""" order by `#event_time` desc""" sql += f""" limit {limit}""" print(sql) # pass_list: [], fail_list: [] # sql = 'show databases' report = {'fail_list': [], 'pass_list': []} fail_list = report['fail_list'] pass_list = report['pass_list'] try: df = await ck_client.query_dataframe(sql) except clickhouse_driver.errors.ServerException as e: if e.code == 47: msg = re.match(r"""DB::Exception: Missing columns: '(.*)' while processing query""", e.message) filed = '未知' if msg: filed = msg.group(1) fail_list.append(f'

数据库不存在字段-> {filed}

') else: fail_list.append('

数据库查询未知错误

') return report if df.empty: fail_list.append('

根据过滤条件未查到任何数据,也有可能是数据未及时入库。(3分钟后还没查到说明存在问题)

') return report if is_unique and len(df) > 1: fail_list.append('

警告:记录数大于一条

') for k, t in check_type.items(): if not isinstance(df[k][0], getattr(type_map, t)): fail_list.append(f"""

错误:字段{k} 期望{t}类型,得到->{re.findall("'(.*)'>",str(type(df[k][0])))[0]}

""") else: pass_list.append(f'

通过:字段{k} 是期望的类型

') return report async def save_template(data_in: schemas.AddTemplate, game: str, db_name: str = 'debug'): res = await check_template_coll.update_one({'title': data_in.title}, {'$set': {'game': game, 'db_name': db_name, 'check': data_in.check.dict()}}, upsert=True) return True async def get_template(*args, **kwargs): res = [] async for doc in check_template_coll.find(*args, {'_id': False}, **kwargs): res.append(doc) return res def get_default_field(): return settings.DEFAULT_FIELD