from kafka import KafkaConsumer
from kafka import TopicPartition

from settings import settings

__all__ = ('create_consumer',)


def create_consumer(partition: int = -1):
    """Build a Kafka consumer and a generator function that streams from it.

    The ``KafkaConsumer`` is constructed immediately from
    ``settings.KAFKA_CONSUMER_CONF``.  Binding it to
    ``settings.SUBSCRIBE_TOPIC`` is deferred: it happens inside the returned
    generator function, i.e. when the generator it produces is first
    advanced.

    Args:
        partition: When greater than ``-1``, the consumer is assigned to
            exactly this partition of the topic.  Otherwise (the default)
            the consumer subscribes to the topic as a whole.

    Returns:
        A ``(consumer, kafka_consumer)`` tuple.  ``consumer`` is a
        zero-argument generator function yielding each message object
        produced by iterating the Kafka consumer; ``kafka_consumer`` is the
        underlying ``KafkaConsumer`` instance shared by every generator
        created from ``consumer``.
    """
    kafka_consumer = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF)

    def consumer():
        """Attach to the configured topic, then yield messages one by one."""
        topic = settings.SUBSCRIBE_TOPIC

        if partition > -1:
            # Explicit partition requested: manual assignment.
            kafka_consumer.assign([TopicPartition(topic, partition)])
        else:
            # No partition requested: subscribe to the whole topic.
            kafka_consumer.subscribe([topic])

        # NOTE(review): kept as an explicit loop on purpose. `yield from`
        # would forward the generator's close() to the consumer object if it
        # defines one, which the original code does not do.
        for message in kafka_consumer:
            yield message

    return consumer, kafka_consumer