From dad6c0b072ded98d803254d6768d5e5058870ec6 Mon Sep 17 00:00:00 2001 From: wuaho Date: Tue, 26 Oct 2021 19:55:54 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E4=B8=BA=E6=89=8B=E5=8A=A8=E6=8F=90?= =?UTF-8?q?=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- v2/transmitter.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/v2/transmitter.py b/v2/transmitter.py index 9f03419..298a377 100644 --- a/v2/transmitter.py +++ b/v2/transmitter.py @@ -5,6 +5,9 @@ import threading import time import traceback +from kafka import TopicPartition, OffsetAndMetadata + +from settings import settings from .valid_data import * __all__ = 'Transmitter', @@ -44,6 +47,7 @@ class Transmitter: self.lock = lock self.event_attr = event_attr self.p = p + self.topic_p = TopicPartition(settings.SUBSCRIBE_TOPIC, p) def start_ping(self): t = Ping(self.db_client, self.p, self.log) @@ -106,8 +110,6 @@ class Transmitter: def check_table(self, db, tb, data): [self.sketch.alter_table(db, tb, item) for item in data] - - def collect_event(self, db, tb, data): if tb != 'event': return @@ -141,7 +143,8 @@ class Transmitter: except Exception as e: self.log.error(traceback.format_exc()) buffer.clear() - # try: - # kafka_client.commit() - # except Exception as e: - # self.log.error(f'进程:{self.p} error:{e}') + try: + offsets = kafka_client.committed(self.topic_p) + kafka_client.commit({self.topic_p: OffsetAndMetadata(offsets, '')}) + except Exception as e: + self.log.error(f'进程:{self.p} error:{e}')