from kafka import KafkaConsumer
from kafka import TopicPartition

from settings import settings

__all__ = 'create_consumer',


def create_consumer(partition=-1):
    """Build a Kafka consumer and a generator function that reads from it.

    Args:
        partition: Partition id of ``settings.SUBSCRIBE_TOPIC`` to pin the
            consumer to. Any negative value (the default, ``-1``) means
            "no specific partition": the consumer subscribes to the whole
            topic instead.

    Returns:
        A ``(consumer, c)`` tuple where ``consumer`` is a zero-argument
        generator function yielding ``(topic, value)`` pairs and ``c`` is the
        underlying ``KafkaConsumer`` created from
        ``settings.KAFKA_CONSUMER_CONF``.
    """
    c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF)
    # Only messages whose properties.owner_name is one of these are yielded.
    allowed_owners = ('gmhdgdt', 'gmhdtt')

    def consumer():
        """Yield ``(topic, value)`` for each message from an allowed owner.

        The assign/subscribe call happens on first iteration of the returned
        generator, not when ``create_consumer`` is called.
        """
        # Partition ids start at 0, so 0 must select manual assignment;
        # only negative values fall back to topic subscription.
        if partition >= 0:
            c.assign([TopicPartition(settings.SUBSCRIBE_TOPIC, partition)])
        else:
            c.subscribe([settings.SUBSCRIBE_TOPIC])

        for msg in c:
            val = msg.value
            # NOTE(review): assumes msg.value is already a dict (presumably
            # via a value_deserializer in KAFKA_CONSUMER_CONF) -- confirm.
            owner = val.get('properties', {}).get('owner_name')
            if owner not in allowed_owners:
                continue
            yield msg.topic, val

    return consumer, c