diff --git a/output/base.py b/output/base.py index ed62d92..ba39faa 100644 --- a/output/base.py +++ b/output/base.py @@ -3,5 +3,5 @@ import abc class BaseOutput(metaclass=abc.ABCMeta): @abc.abstractmethod - def send(self, msg: dict): + def send(self, msg: dict, partition: int): pass diff --git a/output/kafka_p.py b/output/kafka_p.py index e1b1a90..fed074b 100644 --- a/output/kafka_p.py +++ b/output/kafka_p.py @@ -15,11 +15,10 @@ class ToKafka(BaseOutput): def __init__(self, conf): self.name = None self.__producer = KafkaProducer(**conf) - self.__partition = 15 - def send(self, msg): + def send(self, msg, partition): # msg="""{"#type": "user_set", "#ip": "192.168.1.1", "#time": "2021-04-01 17:54:28.084", "properties": {"herostate": [["fffgsa", 2, 3, 4, 5], ["ddd", 4, 5, 6, 8]], "lv": 60, "#user_id": "fvdsvgdsf"}, "#distinct_id": "ABCDEF123456", "#account_id": "TA10001"}""" try: - self.__producer.send(self.name, msg, partition=random.randint(0, self.__partition)) + self.__producer.send(self.name, msg, partition=partition) except Exception as e: print(e) diff --git a/routers/point.py b/routers/point.py index 2987d87..cd0aea2 100644 --- a/routers/point.py +++ b/routers/point.py @@ -10,6 +10,13 @@ from settings import settings router = APIRouter() +def dest_partition(s: str) -> int: + try: + return int(s[-8:], 16) % 16 + except: + return 0 + + @router.post("/") async def point(request: Request, items: DataModel): try: @@ -21,6 +28,8 @@ async def point(request: Request, items: DataModel): if not appid: return {'code': -1, 'msg': '必须要appid'} output_factory.name = settings.OUTPUT_NAME.get(appid) + partition = dest_partition(items.public.get('x02') or '0') + if not output_factory.name: return {'code': -1, 'msg': '没有匹配到appid'} for item in items.data: @@ -40,7 +49,7 @@ async def point(request: Request, items: DataModel): await asyncio.gather( *map(lambda o: asyncio.create_task(o(request, rdb, single_data)), data_factory.handler_link)) msg = data_factory.format_data(single_data) - output_factory.send(msg) + output_factory.send(msg, partition) return {"code": 0, 'msg': 'ok'} except Exception as e: