topic没按预期发送问题
This commit is contained in:
parent
c71bf72436
commit
96e25fe2e3
2
main.py
2
main.py
@ -50,4 +50,4 @@ register_output()
|
|||||||
register_handler_data()
|
register_handler_data()
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
uvicorn.run(app='main:app', host="0.0.0.0", port=6666, reload=True, debug=True)
|
uvicorn.run(app='main:app', host="0.0.0.0", port=6789, reload=True, debug=True)
|
||||||
|
@ -6,19 +6,18 @@ from .base import BaseOutput
|
|||||||
__all__ = ('ToKafka',)
|
__all__ = ('ToKafka',)
|
||||||
|
|
||||||
|
|
||||||
class ToKafka(BaseOutput):
|
class ToKafka:
|
||||||
"""
|
"""
|
||||||
将数据发送到kafka
|
将数据发送到kafka
|
||||||
注意 减少不必要的查询 分区固定设置16个
|
注意 减少不必要的查询 分区固定设置16个
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, conf):
|
def __init__(self, conf):
|
||||||
self.name = None
|
|
||||||
self.__producer = KafkaProducer(**conf)
|
self.__producer = KafkaProducer(**conf)
|
||||||
|
|
||||||
def send(self, msg, partition):
|
def send(self, topic, msg, partition):
|
||||||
# msg="""{"#type": "user_set", "#ip": "192.168.1.1", "#time": "2021-04-01 17:54:28.084", "properties": {"herostate": [["fffgsa", 2, 3, 4, 5], ["ddd", 4, 5, 6, 8]], "lv": 60, "#user_id": "fvdsvgdsf"}, "#distinct_id": "ABCDEF123456", "#account_id": "TA10001"}"""
|
# msg="""{"#type": "user_set", "#ip": "192.168.1.1", "#time": "2021-04-01 17:54:28.084", "properties": {"herostate": [["fffgsa", 2, 3, 4, 5], ["ddd", 4, 5, 6, 8]], "lv": 60, "#user_id": "fvdsvgdsf"}, "#distinct_id": "ABCDEF123456", "#account_id": "TA10001"}"""
|
||||||
try:
|
try:
|
||||||
self.__producer.send(self.name, msg, partition=partition)
|
self.__producer.send(topic, msg, partition=partition)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(e)
|
print(e)
|
||||||
|
@ -27,10 +27,10 @@ async def point(request: Request, items: DataModel):
|
|||||||
appid = items.public.get('b01')
|
appid = items.public.get('b01')
|
||||||
if not appid:
|
if not appid:
|
||||||
return {'code': -1, 'msg': '必须要appid'}
|
return {'code': -1, 'msg': '必须要appid'}
|
||||||
output_factory.name = settings.OUTPUT_NAME.get(appid)
|
topic = settings.OUTPUT_NAME.get(appid)
|
||||||
partition = dest_partition(items.public.get('x02') or '0')
|
partition = dest_partition(items.public.get('x02') or 0)
|
||||||
|
|
||||||
if not output_factory.name:
|
if not topic:
|
||||||
return {'code': -1, 'msg': '没有匹配到appid'}
|
return {'code': -1, 'msg': '没有匹配到appid'}
|
||||||
for item in items.data:
|
for item in items.data:
|
||||||
data, sign_data = restore_field(item)
|
data, sign_data = restore_field(item)
|
||||||
@ -49,7 +49,7 @@ async def point(request: Request, items: DataModel):
|
|||||||
await asyncio.gather(
|
await asyncio.gather(
|
||||||
*map(lambda o: asyncio.create_task(o(request, rdb, single_data)), data_factory.handler_link))
|
*map(lambda o: asyncio.create_task(o(request, rdb, single_data)), data_factory.handler_link))
|
||||||
msg = data_factory.format_data(single_data)
|
msg = data_factory.format_data(single_data)
|
||||||
output_factory.send(msg, partition)
|
output_factory.send(topic, msg, partition)
|
||||||
|
|
||||||
return {"code": 0, 'msg': 'ok'}
|
return {"code": 0, 'msg': 'ok'}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
Loading…
Reference in New Issue
Block a user