117 lines
4.0 KiB
Python
117 lines
4.0 KiB
Python
# coding:utf-8
|
||
|
||
import copy
|
||
import json
|
||
import re
|
||
from collections import namedtuple
|
||
from ipaddress import IPv4Address
|
||
|
||
import numpy as np
|
||
|
||
import clickhouse_driver
|
||
|
||
import schemas
|
||
from core.config import settings
|
||
from db import get_database
|
||
from db.ckdb import ckdb as ck_client
|
||
import crud
|
||
|
||
Type = namedtuple('Type', ['string', 'integer', 'array', 'ipv4'])
|
||
type_map = Type(string=str, integer=np.number, array=list, ipv4=IPv4Address)
|
||
|
||
|
||
async def check_data(game, data_in: schemas.CheckData):
|
||
db = data_in.db_name
|
||
event_name = data_in.event_name
|
||
is_unique = data_in.is_unique
|
||
props = data_in.props
|
||
where = data_in.where
|
||
limit = 5
|
||
check_type = copy.deepcopy(props)
|
||
check_type.update(data_in.default_field)
|
||
|
||
select = ','.join([f'`{field}`' for field in check_type.keys()])
|
||
sql = f"""select {select} from {db}.event where game='{game}' and `#event_name`='{event_name}'"""
|
||
for k, v in where.items():
|
||
sql += f""" and `{k}`='{v}'"""
|
||
|
||
sql += f""" order by `#event_time` desc"""
|
||
sql += f""" limit {limit}"""
|
||
|
||
print(sql)
|
||
# pass_list: [], fail_list: []
|
||
# sql = 'show databases'
|
||
report = {'fail_list': [],
|
||
'pass_list': []}
|
||
fail_list = report['fail_list']
|
||
pass_list = report['pass_list']
|
||
try:
|
||
df = await ck_client.query_dataframe(sql)
|
||
report['title'] = df.columns.tolist()
|
||
report['data'] = []
|
||
for item in df.values:
|
||
report['data'].append([])
|
||
report['data'][-1] = [str(i) for i in item]
|
||
|
||
except clickhouse_driver.errors.ServerException as e:
|
||
if e.code == 47:
|
||
msg = re.match(r"""DB::Exception: Missing columns: '(.*)' while processing query""", e.message)
|
||
filed = '未知'
|
||
if msg:
|
||
filed = msg.group(1)
|
||
fail_list.append(f'<p style="color:red;font-size:17px;">数据库不存在字段-> {filed}</p>')
|
||
else:
|
||
fail_list.append('<p style="color:red;font-size:17px;">数据库查询未知错误</p>')
|
||
|
||
return report
|
||
|
||
if df.empty:
|
||
fail_list.append('<p style="color:blue;font-size:17px;">根据过滤条件未查到任何数据,也有可能是数据未及时入库。(3分钟后还没查到说明存在问题)</p>')
|
||
return report
|
||
if is_unique and len(df) > 1:
|
||
fail_list.append('<p style="color:yellow;font-size:17px;">警告:记录数大于一条</p>')
|
||
|
||
for k, t in check_type.items():
|
||
if isinstance(df[k][0], str) and t == 'json':
|
||
try:
|
||
json.loads(df[k][0])
|
||
pass_list.append(f'<p style="color:green;font-size:17px;">通过:字段{k} 是期望的类型</p>')
|
||
continue
|
||
except:
|
||
fail_list.append(
|
||
f"""<p style="color:red;font-size:17px;">错误:字段{k} 期望{t}类型,不是json格式</p>""")
|
||
continue
|
||
if not isinstance(df[k][0], getattr(type_map, t)):
|
||
fail_list.append(
|
||
f"""<p style="color:red;font-size:17px;">错误:字段{k} 期望{t}类型,得到->{re.findall("'(.*)'>", str(type(df[k][0])))[0]}</p>""")
|
||
else:
|
||
pass_list.append(f'<p style="color:green;font-size:17px;">通过:字段{k} 是期望的类型</p>')
|
||
|
||
return report
|
||
|
||
|
||
async def save_template(db, data_in: schemas.AddTemplate,
|
||
game: str,
|
||
):
|
||
res = await crud.check_data.update_one(db, {'title': data_in.title, 'game': game},
|
||
{'$set': {'game': game,
|
||
'check': data_in.check.dict()}},
|
||
upsert=True)
|
||
return True
|
||
|
||
|
||
async def get_template(db, game, **kwargs):
|
||
res = []
|
||
async for doc in crud.check_data.find(db, {'game': game}, {'_id': False}, **kwargs):
|
||
res.append(doc)
|
||
return res
|
||
|
||
|
||
def get_default_field():
|
||
return settings.DEFAULT_FIELD
|
||
|
||
|
||
async def del_template(db, data_id: schemas.DelTemplate):
|
||
await crud.check_data.delete(db, data_id.dict())
|
||
return True
|