import json
import re
import time
from datetime import datetime

import arrow
from clickhouse_driver.client import Client

from settings import settings
from .struct_cache import StructCacheFile
from .robot import DDRobot

__all__ = ('CK',)

# Sentinel meaning "argument not supplied". It lets CK.__init__ build its
# collaborators per instance while callers can still pass None explicitly
# (e.g. robot=None disables notifications).
_DEFAULT = object()


class DataHandler:
    """Decorator that registers a function as a row-processing step.

    Every decorated function is appended to ``DataHandler.handler_link``;
    ``CK.send`` calls each one as ``func(ck, db, tb, data)`` in registration
    order. The list is class-level, so it is shared by all ``CK`` instances.
    """

    handler_link = []

    def __init__(self, func):
        self.func = func
        DataHandler.handler_link.append(func)

    def __get__(self, instance, owner=None):
        # Without this the decorated class attribute would be a plain,
        # non-callable DataHandler object; delegating to the wrapped function's
        # descriptor keeps e.g. ``ck.date2utc(db, tb, data)`` usable as a method.
        return self.func.__get__(instance, owner)


class CK:
    """Buffered writer that inserts dict rows into ClickHouse tables.

    ``send`` adds any missing columns to the target table, runs the registered
    ``DataHandler`` steps on the row, then buffers it as a JSON string. Buffers
    are written with ``INSERT ... FORMAT JSONEachRow`` once a table's buffer
    reaches ``bulk_max`` rows, or on the first ``send`` after
    ``flush_interval`` seconds have passed (there is no background timer; call
    ``flush()`` before shutdown to write what is still buffered).
    """

    def __init__(self, client=_DEFAULT, struct_cache=_DEFAULT, robot=_DEFAULT,
                 bulk_max=1000, flush_interval=60):
        """
        :param client: clickhouse_driver Client; defaults to one built from settings.CK_CONFIG
        :param struct_cache: table-structure cache; defaults to a new StructCacheFile()
        :param robot: notifier with a ``send`` method; defaults to DDRobot(), None disables it
        :param bulk_max: rows per table buffer that trigger an immediate insert
        :param flush_interval: seconds after which all buffers are flushed on the next send
        """
        # These defaults used to be evaluated once at class-definition time,
        # so every instance shared one client/cache/robot created on import.
        self.client = Client(**settings.CK_CONFIG) if client is _DEFAULT else client
        self.struct_cache = StructCacheFile() if struct_cache is _DEFAULT else struct_cache
        self.robot = DDRobot() if robot is _DEFAULT else robot
        # '{db}_{tb}' -> {column type: set of column names}, used for conversions
        self.type_dict = dict()
        # '{db}_{tb}' -> {column name: column type}
        self.struct_dict = dict()
        # db -> {tb: [JSON row strings waiting to be inserted]}
        self.bulk_data = dict()
        self.bulk_max = bulk_max
        self.flush_interval = flush_interval
        self.__send_ts = int(time.time())

    def set_connect(self, client):
        """Replace the ClickHouse client used for all queries."""
        self.client = client

    def up_tb_struct(self, db, tb, data):
        """Refresh the type index for ``db.tb`` and persist its structure.

        :param data: {column name: column type} passed to the structure cache
        """
        struct_dict = self.struct_dict.get(f'{db}_{tb}', {})
        # Inverted index (type -> columns) used by the type-conversion handlers.
        type_dict = self.type_dict.setdefault(f'{db}_{tb}', {})
        for k, v in struct_dict.items():
            type_dict.setdefault(v, set()).add(k)
        # Update the persisted structure record.
        self.struct_cache.update(db, tb, data)

    @DataHandler
    def date2utc(self, db, tb, data: dict):
        """Shift DateTime('UTC') column values from local time to UTC.

        The offset in hours is read from ``data['zone_offset']`` (default 8)
        and the result is formatted as 'YYYY-MM-DD HH:mm:ss'. Values that are
        absent from ``data`` or cannot be parsed are left untouched.
        """
        types = self.type_dict.get(f'{db}_{tb}', {})
        date_columns = (types.get('Nullable(DateTime(\'UTC\'))', set())
                        | types.get('DateTime(\'UTC\')', set()))
        for k in date_columns:
            if k not in data:
                continue
            try:
                data[k] = arrow.get(data[k]).shift(
                    hours=-data.get('zone_offset', 8)).format('YYYY-MM-DD HH:mm:ss')
            except Exception:
                # Best effort: leave unparsable values as they are.
                pass

    @DataHandler
    def array2str(self, db, tb, data: dict):
        """Convert every element of Array(String) column values to ``str``.

        Only list/tuple values are converted; iterating a scalar string would
        otherwise split it into single characters.
        """
        for k in self.type_dict.get(f'{db}_{tb}', {}).get('Array(String)', set()):
            value = data.get(k)
            if isinstance(value, (list, tuple)):
                data[k] = [str(v) for v in value]

    @DataHandler
    def up_user(self, db, tb, data: dict):
        """Fill fields missing from a ``user`` row with the latest values in ``{db}.user_view``.

        Only runs for ``tb == 'user'`` rows that carry an ``account_id``.
        Fields already present in ``data`` win; datetime values read from the
        view are formatted as '%Y-%m-%d %H:%M:%S'.
        """
        if tb != 'user':
            return
        account_id = data.get('account_id')
        if account_id is None:
            return
        # account_id is passed as a driver-side parameter (escaped) rather than
        # interpolated; it is cast to str so it is still compared as a quoted string.
        sql = f"SELECT * from {db}.user_view where account_id=%(account_id)s"
        rows, columns = self.client.execute(
            sql, {'account_id': str(account_id)}, with_column_types=True)
        if not rows:
            return
        for value, (column, _) in zip(rows[0], columns):
            if column not in data:
                data[column] = (value.strftime('%Y-%m-%d %H:%M:%S')
                                if isinstance(value, datetime) else value)

    def __notify(self, *args, **kwargs):
        """Forward a notification to the robot, if one is configured."""
        if self.robot:
            self.robot.send(*args, **kwargs)

    def get_tb_struct_cache(self, db, tb):
        """Return {column name: column type} for ``db.tb``, querying it once.

        The structure is read from the column metadata of a ``select * ...
        limit 1`` query and cached in ``self.struct_dict``.
        """
        cached = self.struct_dict.get(f'{db}_{tb}')
        if cached:
            return cached
        sql = f'select * from {db}.{tb} limit 1'
        _, columns = self.client.execute(sql, with_column_types=True)
        res = {name: ck_type for name, ck_type in columns}
        self.struct_dict[f'{db}_{tb}'] = res
        self.up_tb_struct(db, tb, res)
        return res

    @staticmethod
    def is_valid_date(date: str):
        """Return the parsed ``arrow`` object if ``date`` parses, else False."""
        try:
            return arrow.get(date)
        except Exception:
            return False

    @classmethod
    def _infer_column_type(cls, value):
        """Return the ClickHouse type for a new column holding ``value``, or None if unknown."""
        # bool must be checked before int because bool is a subclass of int.
        if isinstance(value, bool):
            return 'Nullable(UInt8)'
        if isinstance(value, int):
            return 'Nullable(UInt64)'
        if isinstance(value, float):
            return 'Nullable(Float32)'
        if isinstance(value, list):
            return 'Array(String)'
        if isinstance(value, str):
            return "Nullable(DateTime('UTC'))" if cls.is_valid_date(value) else 'Nullable(String)'
        return None

    def update_user_view(self, db, tb):
        """Recreate ``{db}.user_view`` (latest row per account_id) after ``user`` changes.

        ``select *`` in a view is expanded when the view is created, so the
        view must be rebuilt after columns are added to ``{db}.user``.
        """
        if tb != 'user':
            return
        # ClickHouse rejects multi-statement queries, so run them separately.
        self.client.execute(f'drop table if exists {db}.user_view')
        self.client.execute(
            f"""create view {db}.user_view as select * from {db}.user order by role_create_time desc LIMIT 1 by account_id""")

    def alter_table(self, db, tb, data):
        """Add a column to ``db.tb`` for every key of ``data`` the table lacks.

        A new column gets the type inferred from the first value seen for it.
        Changing an existing column's type is not handled; that would require
        stopping the program and dropping the column manually. Keys whose type
        cannot be inferred (e.g. None) are skipped until a usable value arrives.
        """
        default_field = self.get_tb_struct_cache(db, tb)
        keys = set(default_field)
        for k, v in data.items():
            if k in default_field:
                continue
            ck_type = self._infer_column_type(v)
            if ck_type is None:
                if v is not None:
                    print(f'cannot infer column type for {k} ({type(v).__name__}), skipped')
                continue
            default_field[k] = ck_type
            sql = f'alter table {db}.{tb} add column `{k}` {ck_type}'
            print(sql)
            try:
                self.client.execute(sql)
            except Exception as e:
                print(f'添加字段 {k} 失败: {e}')
                default_field.pop(k)
            else:
                self.update_user_view(db, tb)
        if set(default_field) - keys:
            self.up_tb_struct(db, tb, default_field)

    @staticmethod
    def _parse_insert_error(exc):
        """Return (error message, 0-based index of the failing row or None) for an insert error."""
        message = getattr(exc, 'message', None) or str(exc)
        if getattr(exc, 'code', None) != 26:
            return message, None
        # Keep only the text before the server stack trace (which may span lines).
        head = re.search(r'(.*?)Stack trace', message, re.S)
        if not head:
            return message, None
        error_msg = head.group(1)
        count = re.search(r'errors out of (\d+) rows', error_msg)
        if not count:
            return error_msg, None
        # Assumes the row that tripped the limit is the last one counted (1-based).
        return error_msg, int(count.group(1)) - 1

    def __send(self, db, tb, data):
        """Insert the buffered JSON rows into ``db.tb`` and empty the buffer.

        Rows are written with JSONEachRow, so field names travel with the data.
        The insert tolerates up to 20% bad rows. When it still fails with a
        parse error (code 26) that names the offending row, typically a type
        mismatch, that row is dropped and reported, and the rest is retried.
        Any other failure is reported and the whole batch is dropped.

        :param data: list of JSON row strings; it is cleared before returning
        """
        if not data:
            return
        try:
            while data:
                sql = f'INSERT INTO {db}.{tb} FORMAT JSONEachRow ' + '\n'.join(data)
                try:
                    # Allow a 20% error rate.
                    self.client.execute(sql, settings={'input_format_allow_errors_ratio': 0.2})
                except Exception as e:
                    error_msg, bad_row = self._parse_insert_error(e)
                    if bad_row is None or not 0 <= bad_row < len(data):
                        print(f'{db}.{tb} insert failed, dropped {len(data)} rows: {error_msg}')
                        self.__notify(error_msg, f'{db}.{tb} dropped {len(data)} rows')
                        return
                    # Drop the offending row and retry with the rest.
                    error_data = data.pop(bad_row)
                    self.__notify(error_msg, error_data)
                else:
                    print(f'{db}.{tb}插入{len(data)}条')
                    return
        finally:
            data.clear()

    def flush(self):
        """Insert every buffered row for every database/table now."""
        self.__send_ts = int(time.time())
        for db, tables in self.bulk_data.items():
            for tb, rows in tables.items():
                self.__send(db, tb, rows)

    def __add(self, db, tb, msg):
        """Buffer ``msg`` as JSON and write buffers when a threshold is met.

        A table whose buffer reaches ``bulk_max`` rows is inserted at once.
        Once ``flush_interval`` seconds have passed since the last timed
        flush, all buffers are flushed, so rarely used tables are not starved.
        """
        tables = self.bulk_data.setdefault(f'{db}', {'event': [], 'user': []})
        rows = tables.setdefault(tb, [])
        rows.append(json.dumps(msg))
        now = int(time.time())
        if self.__send_ts + self.flush_interval <= now:
            self.flush()
        elif len(rows) >= self.bulk_max:
            self.__send(db, tb, rows)

    def send(self, db, tb, msg):
        """Sync the table schema with ``msg``, run the handler chain on it, then buffer it."""
        params = (db, tb, msg)
        self.alter_table(*params)
        # Data-processing chain: each registered handler mutates msg in place.
        for handler in DataHandler.handler_link:
            handler(self, *params)
        self.__add(*params)


if __name__ == '__main__':
    ck_client = CK()