import random from kafka import KafkaProducer from .base import BaseOutput __all__ = ('ToKafka',) class ToKafka: """ 将数据发送到kafka 注意 减少不必要的查询 分区固定设置16个 """ def __init__(self, conf): self.__producer = KafkaProducer(**conf) def send(self, topic, msg, partition): # msg="""{"#type": "user_set", "#ip": "192.168.1.1", "#time": "2021-04-01 17:54:28.084", "properties": {"herostate": [["fffgsa", 2, 3, 4, 5], ["ddd", 4, 5, 6, 8]], "lv": 60, "#user_id": "fvdsvgdsf"}, "#distinct_id": "ABCDEF123456", "#account_id": "TA10001"}""" try: self.__producer.send(topic, msg, partition=partition) except Exception as e: print(e)