"""Route module: accepts reported data, verifies each item's signature and forwards it to the output."""
import asyncio
import traceback

from fastapi import APIRouter, Request

from common import *
from models import DataModel
from settings import settings

router = APIRouter()


def dest_partition(s: str) -> int:
    """Map an identifier to a partition number in the range 0..15.

    The last 18 characters of ``s`` are parsed as a hexadecimal integer and
    reduced modulo 16.

    Args:
        s: Identifier expected to end in hexadecimal digits. The caller may
            pass a non-string fallback (e.g. ``0``) when the field is absent.

    Returns:
        The partition index, or ``0`` when ``s`` cannot be sliced or parsed
        as hexadecimal.
    """
    try:
        return int(s[-18:], 16) % 16
    except (TypeError, ValueError, KeyError):
        # Best-effort fallback: anything that is not a hex-terminated string
        # lands on partition 0 instead of failing the request.
        return 0


@router.post("/")
async def point(request: Request, items: DataModel):
    """Handle one report request.

    Flow, for a request whose public section carries an ``b01`` appid that
    has a configured output topic:

    1. restore the public fields once;
    2. for every entry of ``items.data``: restore its fields, verify its
       signature with the appid's salt, merge it with the public fields,
       run ``check_preset`` and every handler in
       ``data_factory.handler_link`` concurrently, then format the record
       and send it to the topic/partition.

    ``restore_field``, ``check_sign`` and ``check_preset`` are not defined in
    this module; presumably they come from ``from common import *``.

    Args:
        request: Incoming request; ``request.app.state`` must expose
            ``redis``, ``data_factory`` and ``output_factory``.
        items: Parsed request body with ``public`` and ``data`` members.

    Returns:
        ``{"code": 0, "msg": "ok"}`` on success, otherwise a dict with
        ``code`` ``-1`` and a message. Processing stops at the first item
        whose signature check fails; items sent before it are not rolled
        back. Any exception is reported the same way with ``str(e)`` as the
        message.
    """
    try:
        print(items.json())
        public_data, _ = restore_field(items.public)

        state = request.app.state
        rdb = state.redis
        data_factory = state.data_factory
        output_factory = state.output_factory

        appid = items.public.get('b01')
        if not appid:
            return {'code': -1, 'msg': '必须要appid'}

        topic = settings.OUTPUT_NAME.get(appid)
        if not topic:
            return {'code': -1, 'msg': '没有匹配到appid'}

        # 'x02' selects the destination partition; a missing/empty value
        # falls back to partition 0 inside dest_partition.
        partition = dest_partition(items.public.get('x02') or 0)
        salt = settings.SALT.get(appid, '')

        for item in items.data:
            data, sign_data = restore_field(item)

            # The signature must not be part of the forwarded record.
            data.pop('sign')
            properties = data.pop('properties')

            # The signature is checked against the second dict returned by
            # restore_field, with 'sign' and 'properties' split out of it.
            sign = sign_data.pop('sign')
            sign_properties = sign_data.pop('properties')
            if not check_sign(sign, salt, sign_data, sign_properties):
                print('签名错误')
                print(items)
                return {"code": -1, 'msg': '签名错误'}

            single_data = dict(**public_data, **data, properties=properties)
            check_preset(single_data)

            # Run every handler for this record concurrently and wait for
            # all of them before the record is formatted and sent.
            await asyncio.gather(
                *(
                    asyncio.create_task(handler(request, rdb, single_data))
                    for handler in data_factory.handler_link
                )
            )

            msg = data_factory.format_data(single_data)
            output_factory.send(topic, msg, partition)

        return {"code": 0, 'msg': 'ok'}
    except Exception as e:
        # Emit the traceback; format_exc() alone only builds a string that
        # was being thrown away.
        traceback.print_exc()
        return {"code": -1, 'msg': str(e)}