From 22fcd5e651d0a1a98ff34f73c125b2920445cd2c Mon Sep 17 00:00:00 2001 From: wuaho Date: Tue, 26 Oct 2021 10:40:44 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E4=B8=BA=E6=89=8B=E5=8A=A8=E6=8F=90?= =?UTF-8?q?=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app.py | 2 +- v2/transmitter.py | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/app.py b/app.py index d50ea77..ef8733a 100644 --- a/app.py +++ b/app.py @@ -79,7 +79,7 @@ class XProcess(Process): else: continue - transmitter.run() + transmitter.run(kafka_client) while True: time.sleep(5) diff --git a/v2/transmitter.py b/v2/transmitter.py index b739e27..447b880 100644 --- a/v2/transmitter.py +++ b/v2/transmitter.py @@ -130,7 +130,7 @@ class Transmitter: for key in del_keys: del item[key] - def run(self): + def run(self, kafka_client): for tb, buffer in self.check_send(): try: data = [self.flat_data(x) for x in buffer.values()] @@ -141,3 +141,7 @@ class Transmitter: except Exception as e: self.log.error(traceback.format_exc()) buffer.clear() + try: + kafka_client.commit() + except Exception as e: + self.log.error(e)