diff --git a/v2/consumer.py b/v2/consumer.py index 1c6c862..bf6cff8 100644 --- a/v2/consumer.py +++ b/v2/consumer.py @@ -6,11 +6,11 @@ from settings import settings __all__ = 'create_consumer', -def create_consumer(partition=-1): +def create_consumer(partition: int = -1): c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF) def consumer(): - if partition > 0: + if partition > -1: c.assign([TopicPartition(settings.SUBSCRIBE_TOPIC, partition)]) else: c.subscribe([settings.SUBSCRIBE_TOPIC]) diff --git a/v2/transmitter.py b/v2/transmitter.py index 298a377..7326eb7 100644 --- a/v2/transmitter.py +++ b/v2/transmitter.py @@ -5,7 +5,6 @@ import threading import time import traceback -from kafka import TopicPartition, OffsetAndMetadata from settings import settings from .valid_data import * @@ -47,7 +46,6 @@ class Transmitter: self.lock = lock self.event_attr = event_attr self.p = p - self.topic_p = TopicPartition(settings.SUBSCRIBE_TOPIC, p) def start_ping(self): t = Ping(self.db_client, self.p, self.log) @@ -144,7 +142,6 @@ class Transmitter: self.log.error(traceback.format_exc()) buffer.clear() try: - offsets = kafka_client.committed(self.topic_p) - kafka_client.commit({self.topic_p: OffsetAndMetadata(offsets, '')}) + kafka_client.commit() except Exception as e: self.log.error(f'进程:{self.p} error:{e}')