"""Kafka helpers: a consuming generator, a producing function and a dict flattener."""
import random

from kafka import KafkaConsumer
from kafka import KafkaProducer

from settings import settings

__all__ = 'flat_data', 'producer', 'consumer'

# Single producer instance, built at import time from the project settings
# and reused by every call to ``producer``.
producer_client = KafkaProducer(**settings.KAFKA_PRODUCER_CONF)
# NOTE(review): not referenced anywhere in this module; presumably filled in
# by code that imports it -- confirm before removing.
struct_dict = {}


def consumer():
    """Yield ``(topic, value)`` for each message received from Kafka.

    A new ``KafkaConsumer`` is built from ``settings.KAFKA_CONSUMER_CONF``
    and subscribed to ``settings.SUBSCRIBE_TOPIC`` on the first iteration.

    The consumer is closed when the generator finishes, is closed by the
    caller, is garbage-collected, or when iteration raises, so the client is
    not leaked.

    :return: generator of ``(msg.topic, msg.value)`` tuples
    """
    c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF)
    try:
        c.subscribe(settings.SUBSCRIBE_TOPIC)
        for msg in c:
            yield msg.topic, msg.value
    finally:
        # Runs on normal exhaustion, on errors, and on GeneratorExit.
        c.close()


def producer(topic, data):
    """Send ``data`` to ``topic`` on a randomly chosen partition.

    The partition is drawn uniformly from 0..15 inclusive.
    NOTE(review): this presumes every target topic has at least 16
    partitions -- confirm against the cluster configuration.

    :param topic: name of the topic to send to
    :param data: payload handed unchanged to ``producer_client.send``
    """
    producer_client.send(topic, data, partition=random.randint(0, 15))


def flat_data(data: dict) -> dict:
    """Flatten nested dicts into one level of key/value pairs.

    * A single leading ``#`` is stripped from every key.
    * A value that is itself a dict is flattened recursively and merged into
      the result (its own key is dropped); on a key clash the entry merged
      last wins.
    * The ``uuid`` key is removed from the result.
    * If ``event_time`` is absent but ``time`` is present, ``event_time`` is
      set to the value of ``time``.

    The last two rules are applied at every recursion level, not only at the
    top, so a nested dict that has ``time`` but no ``event_time`` contributes
    an ``event_time`` entry that can overwrite one set earlier by its parent.

    :param data: mapping to flatten; keys must support ``startswith``
    :return: a new flat dict; ``data`` itself is not modified
    """
    res = dict()
    for k, v in data.items():
        if k.startswith('#'):
            k = k[1:]
        if isinstance(v, dict):
            res.update(flat_data(v))
        else:
            res[k] = v

    if 'uuid' in res:
        del res['uuid']
    if 'event_time' not in res and 'time' in res:
        res['event_time'] = res['time']
    return res