import copy

from .valid_data import *


class Sketch:
    """Track table structures and add missing columns on demand.

    Keeps two in-memory maps, both keyed by ``f'{db}_{tb}'``:

    * the struct map: column name -> column type string;
    * the type map: column type string -> set of column names having it.

    NOTE(review): the generated SQL (``Nullable(...)``, ``LIMIT 1 by``)
    looks like ClickHouse syntax, and ``db_client`` is presumably a
    clickhouse-driver style client -- confirm against the caller.
    """

    def __init__(self, db_client, struct_cache=None):
        """Store the collaborators and start with empty structure maps.

        :param db_client: object exposing ``execute(sql, ...)``.
        :param struct_cache: optional object exposing
            ``update(db, tb, data)``; skipped when falsy.
        """
        self.db_client = db_client
        self.struct_cache = struct_cache
        self.__type_dict = dict()
        self.__struct_dict = dict()

    @property
    def type_dict(self):
        """Per-table map of column type -> set of column names."""
        return self.__type_dict

    @property
    def struct_dict(self):
        """Per-table map of column name -> column type."""
        return self.__struct_dict

    def up_tb_struct(self, db, tb, data):
        """Rebuild the type index for a table and push ``data`` to the cache.

        The type index is derived from the structure already stored for
        ``db``/``tb`` in the struct map, not from ``data``; ``data`` is only
        forwarded to ``struct_cache.update``.

        :param db: database name.
        :param tb: table name.
        :param data: structure passed through to the external cache.
        """
        struct_dict = self.__struct_dict.get(f'{db}_{tb}', {})
        # Reverse index (type -> column names), used for type conversion.
        type_dict = self.__type_dict.setdefault(f'{db}_{tb}', {})
        for column, column_type in struct_dict.items():
            type_dict.setdefault(column_type, set()).add(column)
        # Update the external structure record, if one is configured.
        if self.struct_cache:
            self.struct_cache.update(db, tb, data)

    def get_tb_struct_cache(self, db, tb):
        """Return the column -> type map for a table, loading it if needed.

        On a cache miss one row is selected from the table with
        ``with_column_types=True`` and the returned column descriptions
        (``item[0]`` as name, ``item[1]`` as type) are stored and indexed.

        :param db: database name.
        :param tb: table name.
        :return: dict of column name -> type string. This is the cached
            object itself, so mutations by the caller are visible here.
        """
        cached = self.__struct_dict.get(f'{db}_{tb}')
        if cached:
            return cached
        sql = f'select * from {db}.{tb} limit 1'
        _, columns = self.db_client.execute(sql, with_column_types=True)
        res = {item[0]: item[1] for item in columns}
        self.__struct_dict[f'{db}_{tb}'] = res
        self.up_tb_struct(db, tb, res)
        return res

    def update_user_view(self, db, tb):
        """Recreate ``{db}.user_view``; does nothing unless ``tb == 'user'``.

        The view is dropped and created again so that it picks up the
        current columns of ``{db}.user``.

        :param db: database name.
        :param tb: table name that was just altered.
        """
        if tb != 'user':
            return
        sql = f'drop table if exists {db}.user_view'
        self.db_client.execute(sql)
        sql = (
            f"create view {db}.user_view as select * from {db}.user "
            f"order by `#reg_time` desc LIMIT 1 by `#account_id`"
        )
        self.db_client.execute(sql)

    @staticmethod
    def _infer_column_type(value):
        """Map a Python value to the column type used for a new field.

        ``bool`` is tested before ``int`` because ``bool`` is a subclass of
        ``int``.

        :param value: first value seen for the new field.
        :return: column type string, or ``None`` when the value's type is
            not supported (for example ``None`` or ``dict``).
        """
        if isinstance(value, bool):
            return 'Nullable(UInt8)'
        if isinstance(value, str):
            if is_valid_date(value):
                return "Nullable(DateTime('UTC'))"
            return 'Nullable(String)'
        if isinstance(value, int):
            return 'Nullable(UInt64)'
        if isinstance(value, float):
            return 'Nullable(Float32)'
        if isinstance(value, list):
            return 'Array(String)'
        return None

    def alter_table(self, db, tb, data):
        """Add a column for every key of ``data`` the table does not have.

        A new column gets the type inferred from the first value seen for
        it. Existing columns are never altered: changing a column type
        raises type-conversion problems, so per the original note that has
        to be handled by stopping the program and dropping the column.

        Keys whose value type cannot be mapped to a column type are skipped
        rather than raising ``KeyError``; they can still be added later when
        a typed value shows up.

        :param db: database name.
        :param tb: table name.
        :param data: one record, mapping field name -> value.
        """
        default_field = self.get_tb_struct_cache(db, tb)
        keys = set(default_field)
        for k, v in data.items():
            if k in ('#type',) or k in default_field:
                continue
            column_type = self._infer_column_type(v)
            if column_type is None:
                # No usable type information in this value; skip the key.
                continue
            default_field[k] = column_type
            sql = f'alter table {db}.{tb} add column `{k}` {default_field[k]}'
            print(sql)
            try:
                self.db_client.execute(sql)
            except Exception:
                print(f'添加字段 {k} 失败')
                # Roll back so the struct map keeps matching the real table.
                default_field.pop(k)
        # Refresh derived state only when at least one column was added.
        if set(default_field) - keys:
            self.up_tb_struct(db, tb, default_field)
            self.update_user_view(db, tb)