From f078d04279697897cd0aefd06ab567d71ff0c218 Mon Sep 17 00:00:00 2001 From: wuaho Date: Wed, 28 Jul 2021 19:32:49 +0800 Subject: [PATCH] =?UTF-8?q?=E5=90=8C=E6=AD=A5=E8=A1=A8=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- v2/consumer.py | 4 ++-- v2/sketch.py | 9 +++++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/v2/consumer.py b/v2/consumer.py index 747cef4..ff12e4f 100644 --- a/v2/consumer.py +++ b/v2/consumer.py @@ -17,8 +17,8 @@ def create_consumer(partition=-1): # print(msg) topic = msg.topic val = msg.value - if val.get('properties',{}).get('owner_name') not in ('gmhdgdt', 'gmhdtt'): - continue + # if val.get('properties',{}).get('owner_name') not in ('gmhdgdt', 'gmhdtt'): + # continue yield topic, val return consumer diff --git a/v2/sketch.py b/v2/sketch.py index 00c8836..350827c 100644 --- a/v2/sketch.py +++ b/v2/sketch.py @@ -74,7 +74,7 @@ class Sketch: LIMIT 1 by `#account_id`""" self.db_client.execute(sql) - def alter_table(self, db, tb, data): + def alter_table(self, db, tb, data, try_cnt=10): """ 数据库字段检查 添加新字段为第一次出现类型 @@ -114,8 +114,13 @@ class Sketch: try: self.db_client.execute(sql) except Exception as e: - print(f'添加字段 {k} 失败') + print(f'添加字段 {k} 失败,同步数据库表结构') + # 读取数据库表结构并设置 + self.init_tb_struct() default_field.pop(k) + if try_cnt < 0: + raise e + return self.alter_table(db, tb, data, try_cnt=try_cnt - 1) if set(default_field) - keys: self.up_tb_struct(db, tb, default_field)