pointapi/output/kafka_p.py
2021-04-02 01:30:28 +08:00

26 lines
835 B
Python

import random
from kafka import KafkaProducer
from .base import BaseOutput
__all__ = ('ToKafka',)
class ToKafka(BaseOutput):
"""
将数据发送到kafka
注意 减少不必要的查询 分区固定设置16个
"""
def __init__(self, conf):
self.topic_name = None
self.__producer = KafkaProducer(**conf)
self.__partition = 15
def send(self, msg):
# msg="""{"#type": "user_set", "#ip": "192.168.1.1", "#time": "2021-04-01 17:54:28.084", "properties": {"herostate": [["fffgsa", 2, 3, 4, 5], ["ddd", 4, 5, 6, 8]], "lv": 60, "#user_id": "fvdsvgdsf"}, "#distinct_id": "ABCDEF123456", "#account_id": "TA10001"}"""
try:
self.__producer.send(self.topic_name, msg, partition=random.randint(0, self.__partition))
except Exception as e:
print(e)