ta2legu/common.py
2021-04-21 18:05:14 +08:00

49 lines
1001 B
Python

import random
from kafka import KafkaConsumer
from kafka import KafkaProducer
from settings import settings
__all__ = 'flat_data', 'producer', 'consumer'
producer_client = KafkaProducer(**settings.KAFKA_PRODUCER_CONF)
struct_dict = {}
def consumer():
c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF)
c.subscribe(settings.SUBSCRIBE_TOPIC)
for msg in c:
topic = msg.topic
val = msg.value
yield topic, val
def producer(topic, data):
producer_client.send(topic, data, partition=random.randint(0, 15))
def flat_data(data: dict):
"""
一层kv 去掉#
:param data:
:return:
"""
res = dict()
for k, v in data.items():
if k.startswith('#'):
k = k[1:]
if isinstance(v, dict):
res.update(flat_data(v))
else:
res[k] = v
if 'uuid' in res:
del res['uuid']
if 'event_time' not in res and 'time' in res:
res['event_time'] = res['time']
return res