shoumeng_xiangsu/utils/consumer.py
2021-10-09 10:36:52 +08:00

21 lines
495 B
Python

from kafka import KafkaConsumer
from kafka import TopicPartition
__all__ = 'create_consumer',
from core import settings
def create_consumer(partition=-1):
client = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF)
def consumer():
if partition > 0:
client.assign([TopicPartition(settings.SUBSCRIBE_TOPIC, partition)])
else:
client.subscribe([settings.SUBSCRIBE_TOPIC])
for msg in client:
yield msg
return consumer, client