From bf55fafe46afb74e7900a80afe8240e655c7314b Mon Sep 17 00:00:00 2001 From: wuaho Date: Tue, 26 Oct 2021 20:35:54 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E4=B8=BA=E6=89=8B=E5=8A=A8=E6=8F=90?= =?UTF-8?q?=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- v2/consumer.py | 4 ++-- v2/transmitter.py | 5 +---- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/v2/consumer.py b/v2/consumer.py index 1c6c862..bf6cff8 100644 --- a/v2/consumer.py +++ b/v2/consumer.py @@ -6,11 +6,11 @@ from settings import settings __all__ = 'create_consumer', -def create_consumer(partition=-1): +def create_consumer(partition: int = -1): c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF) def consumer(): - if partition > 0: + if partition > -1: c.assign([TopicPartition(settings.SUBSCRIBE_TOPIC, partition)]) else: c.subscribe([settings.SUBSCRIBE_TOPIC]) diff --git a/v2/transmitter.py b/v2/transmitter.py index 298a377..7326eb7 100644 --- a/v2/transmitter.py +++ b/v2/transmitter.py @@ -5,7 +5,6 @@ import threading import time import traceback -from kafka import TopicPartition, OffsetAndMetadata from settings import settings from .valid_data import * @@ -47,7 +46,6 @@ class Transmitter: self.lock = lock self.event_attr = event_attr self.p = p - self.topic_p = TopicPartition(settings.SUBSCRIBE_TOPIC, p) def start_ping(self): t = Ping(self.db_client, self.p, self.log) @@ -144,7 +142,6 @@ class Transmitter: self.log.error(traceback.format_exc()) buffer.clear() try: - offsets = kafka_client.committed(self.topic_p) - kafka_client.commit({self.topic_p: OffsetAndMetadata(offsets, '')}) + kafka_client.commit() except Exception as e: self.log.error(f'进程:{self.p} error:{e}')