改为手动提交

This commit is contained in:
wuaho 2021-10-26 20:35:54 +08:00
parent dad6c0b072
commit bf55fafe46
2 changed files with 3 additions and 6 deletions

View File

@ -6,11 +6,11 @@ from settings import settings
__all__ = 'create_consumer', __all__ = 'create_consumer',
def create_consumer(partition=-1): def create_consumer(partition: int = -1):
c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF) c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF)
def consumer(): def consumer():
if partition > 0: if partition > -1:
c.assign([TopicPartition(settings.SUBSCRIBE_TOPIC, partition)]) c.assign([TopicPartition(settings.SUBSCRIBE_TOPIC, partition)])
else: else:
c.subscribe([settings.SUBSCRIBE_TOPIC]) c.subscribe([settings.SUBSCRIBE_TOPIC])

View File

@ -5,7 +5,6 @@ import threading
import time import time
import traceback import traceback
from kafka import TopicPartition, OffsetAndMetadata
from settings import settings from settings import settings
from .valid_data import * from .valid_data import *
@ -47,7 +46,6 @@ class Transmitter:
self.lock = lock self.lock = lock
self.event_attr = event_attr self.event_attr = event_attr
self.p = p self.p = p
self.topic_p = TopicPartition(settings.SUBSCRIBE_TOPIC, p)
def start_ping(self): def start_ping(self):
t = Ping(self.db_client, self.p, self.log) t = Ping(self.db_client, self.p, self.log)
@ -144,7 +142,6 @@ class Transmitter:
self.log.error(traceback.format_exc()) self.log.error(traceback.format_exc())
buffer.clear() buffer.clear()
try: try:
offsets = kafka_client.committed(self.topic_p) kafka_client.commit()
kafka_client.commit({self.topic_p: OffsetAndMetadata(offsets, '')})
except Exception as e: except Exception as e:
self.log.error(f'进程:{self.p} error:{e}') self.log.error(f'进程:{self.p} error:{e}')