diff --git a/main.py b/main.py index fc454f4..4b7e92c 100644 --- a/main.py +++ b/main.py @@ -50,4 +50,4 @@ register_output() register_handler_data() if __name__ == '__main__': - uvicorn.run(app='main:app', host="0.0.0.0", port=6666, reload=True, debug=True) + uvicorn.run(app='main:app', host="0.0.0.0", port=6789, reload=True, debug=True) diff --git a/output/kafka_p.py b/output/kafka_p.py index fed074b..eacf4f9 100644 --- a/output/kafka_p.py +++ b/output/kafka_p.py @@ -6,19 +6,18 @@ from .base import BaseOutput __all__ = ('ToKafka',) -class ToKafka(BaseOutput): +class ToKafka: """ 将数据发送到kafka 注意 减少不必要的查询 分区固定设置16个 """ def __init__(self, conf): - self.name = None self.__producer = KafkaProducer(**conf) - def send(self, msg, partition): + def send(self, topic, msg, partition): # msg="""{"#type": "user_set", "#ip": "192.168.1.1", "#time": "2021-04-01 17:54:28.084", "properties": {"herostate": [["fffgsa", 2, 3, 4, 5], ["ddd", 4, 5, 6, 8]], "lv": 60, "#user_id": "fvdsvgdsf"}, "#distinct_id": "ABCDEF123456", "#account_id": "TA10001"}""" try: - self.__producer.send(self.name, msg, partition=partition) + self.__producer.send(topic, msg, partition=partition) except Exception as e: print(e) diff --git a/routers/point.py b/routers/point.py index e898027..b30787e 100644 --- a/routers/point.py +++ b/routers/point.py @@ -27,10 +27,10 @@ async def point(request: Request, items: DataModel): appid = items.public.get('b01') if not appid: return {'code': -1, 'msg': '必须要appid'} - output_factory.name = settings.OUTPUT_NAME.get(appid) - partition = dest_partition(items.public.get('x02') or '0') + topic = settings.OUTPUT_NAME.get(appid) + partition = dest_partition(items.public.get('x02') or 0) - if not output_factory.name: + if not topic: return {'code': -1, 'msg': '没有匹配到appid'} for item in items.data: data, sign_data = restore_field(item) @@ -49,7 +49,7 @@ async def point(request: Request, items: DataModel): await asyncio.gather( *map(lambda o: asyncio.create_task(o(request, rdb, single_data)), data_factory.handler_link)) msg = data_factory.format_data(single_data) - output_factory.send(msg, partition) + output_factory.send(topic, msg, partition) return {"code": 0, 'msg': 'ok'} except Exception as e: