to_ck/v2/sketch.py
2021-09-26 19:07:51 +08:00

128 lines
4.0 KiB
Python

from settings import settings
from .valid_data import *
class Sketch:
    """Keep an in-memory copy of table structures and add missing columns.

    The structure of a table is read from ``system.columns`` through
    ``db_client`` and cached under the key ``f'{db}_{tb}'``.  Two mappings are
    maintained per table:

    * ``struct_dict``: ``{column_name: column_type}``
    * ``type_dict``:   ``{column_type: {column_name, ...}}`` (used for type
      conversion by callers)

    NOTE(review): ``db``, ``tb`` and column names are interpolated into SQL
    text without escaping, so callers must only pass trusted identifiers.

    :param db_client: object exposing ``execute(sql, **kwargs)``.  When called
        with ``with_column_types=True, columnar=True`` it must return a
        ``(data, columns)`` pair where ``data[0]`` holds the column names and
        ``data[1]`` the matching column types.
    :param struct_cache: optional object exposing ``update(db, tb, data)``;
        it is notified every time a table structure is (re)recorded.
    """

    def __init__(self, db_client, struct_cache=None):
        self.db_client = db_client
        self.struct_cache = struct_cache
        self.__type_dict = dict()
        self.__struct_dict = dict()
        # Eagerly load the default tables so the first write needs no lookup.
        self.init_tb_struct()

    @property
    def type_dict(self):
        """Mapping ``{'db_tb': {column_type: set_of_column_names}}``."""
        return self.__type_dict

    @property
    def struct_dict(self):
        """Mapping ``{'db_tb': {column_name: column_type}}``."""
        return self.__struct_dict

    def up_tb_struct(self, db, tb, data):
        """Rebuild the type index of a table and notify ``struct_cache``.

        The type index is derived from the structure already cached for
        ``(db, tb)``; ``data`` is only forwarded to ``struct_cache.update``.

        :param db: database name
        :param tb: table name
        :param data: ``{column_name: column_type}`` passed to the cache
        :return: None
        """
        key = f'{db}_{tb}'
        columns = self.__struct_dict.get(key, {})
        # Index used for type conversion: column type -> column names.
        names_by_type = self.__type_dict.setdefault(key, {})
        for name, column_type in columns.items():
            names_by_type.setdefault(column_type, set()).add(name)
        # Propagate the structure record to the optional external cache.
        if self.struct_cache:
            self.struct_cache.update(db, tb, data)

    def init_tb_struct_cache(self, db, tb):
        """Read the structure of ``db.tb`` from the database and cache it.

        :param db: database name
        :param tb: table name
        :return: ``{column_name: column_type}``; empty when the query returns
            no columns
        """
        sql = f"select name,type from system.columns where database='{db}' and table='{tb}'"
        data, columns = self.db_client.execute(sql, with_column_types=True, columnar=True)
        # An empty result has no name/type columns to index into, so guard
        # against it instead of failing with IndexError.
        res = dict(zip(data[0], data[1])) if data else {}
        self.__struct_dict[f'{db}_{tb}'] = res
        self.up_tb_struct(db, tb, res)
        return res

    def init_tb_struct(self):
        """Load the ``event`` and ``user`` tables of ``settings.GAME``."""
        self.init_tb_struct_cache(settings.GAME, 'event')
        self.init_tb_struct_cache(settings.GAME, 'user')

    def get_tb_struct_cache(self, db, tb):
        """Return the columns and their types for ``db.tb``.

        The cached structure is returned when it is non-empty; otherwise the
        structure is read from the database.

        :param db: database name
        :param tb: table name
        :return: ``{column_name: column_type}`` (the cached dict itself, so
            in-place changes are visible through ``struct_dict``)
        """
        cached = self.__struct_dict.get(f'{db}_{tb}')
        if cached:
            return cached
        return self.init_tb_struct_cache(db, tb)

    def update_user_view(self, db, tb):
        """Recreate ``{db}.user_view``; does nothing unless ``tb == 'user'``.

        :param db: database name
        :param tb: table name that was just altered
        :return: None
        """
        if tb != 'user':
            return
        self.db_client.execute(f'drop table if exists {db}.user_view')
        # The view keeps one row per `#account_id`, newest `#reg_time` first.
        create_sql = (
            f"create view {db}.user_view as select *\n"
            f"from {db}.user\n"
            "order by `#reg_time` desc\n"
            "LIMIT 1 by `#account_id`"
        )
        self.db_client.execute(create_sql)

    @staticmethod
    def _infer_column_type(value):
        """Return the column type for a first-seen value, or None if unsupported."""
        # bool is a subclass of int, so it has to be tested before int.
        if isinstance(value, bool):
            return 'Nullable(UInt8)'
        if isinstance(value, str):
            if is_valid_date(value):
                return "Nullable(DateTime('UTC'))"
            return 'Nullable(String)'
        if isinstance(value, int):
            return 'Nullable(Int64)'
        if isinstance(value, float):
            return 'Nullable(Float32)'
        if isinstance(value, list):
            return 'Array(String)'
        return None

    def alter_table(self, db, tb, data, try_cnt=10):
        """Check the fields of ``data`` against the table and add new columns.

        A new column gets the type inferred from the first value seen for it.
        Changing the type of an existing column is not handled here because
        of type-conversion problems: stop the program and drop the column
        instead.

        Keys whose value type is not supported (for example ``None`` or a
        dict) are skipped.  When adding a column fails, the structure of
        ``db.tb`` is re-read from the database and the whole check is run
        again; after the retry budget is used up the database error is
        re-raised.  If at least one column was added, the type index is
        rebuilt and, for the ``user`` table, the user view is recreated.

        :param db: database name
        :param tb: table name
        :param data: one record, ``{field_name: value}``; ``'#type'`` is ignored
        :param try_cnt: retry budget; a failure is retried while the remaining
            budget is not negative
        :return: None
        :raises Exception: the error raised by ``db_client.execute`` once the
            retry budget is exhausted
        """
        retries_left = try_cnt
        added = False
        while True:
            fields = self.get_tb_struct_cache(db, tb)
            for name, value in data.items():
                if name in ('#type',) or name in fields:
                    continue
                column_type = self._infer_column_type(value)
                if column_type is None:
                    # No column type can be derived from this value.
                    continue
                sql = f'alter table {db}.{tb} add column `{name}` {column_type}'
                print(sql)
                try:
                    self.db_client.execute(sql)
                except Exception:
                    print(f'添加字段 {name} 失败,同步数据库表结构')
                    # Re-read the structure of the table that failed so the
                    # next attempt works from what the database really has.
                    self.init_tb_struct_cache(db, tb)
                    if retries_left < 0:
                        raise
                    retries_left -= 1
                    break
                # Record the column only after the database accepted it.
                fields[name] = column_type
                added = True
            else:
                # Every field was processed without a failure.
                break
        if added:
            self.up_tb_struct(db, tb, fields)
            self.update_user_view(db, tb)