分配 分区
This commit is contained in:
parent
808e8842f7
commit
cf8015f805
@ -3,5 +3,5 @@ import abc
|
||||
|
||||
class BaseOutput(metaclass=abc.ABCMeta):
|
||||
@abc.abstractmethod
|
||||
def send(self, msg: dict):
|
||||
def send(self, msg: dict, partition: int):
|
||||
pass
|
||||
|
@ -15,11 +15,10 @@ class ToKafka(BaseOutput):
|
||||
def __init__(self, conf):
|
||||
self.name = None
|
||||
self.__producer = KafkaProducer(**conf)
|
||||
self.__partition = 15
|
||||
|
||||
def send(self, msg):
|
||||
def send(self, msg, partition):
|
||||
# msg="""{"#type": "user_set", "#ip": "192.168.1.1", "#time": "2021-04-01 17:54:28.084", "properties": {"herostate": [["fffgsa", 2, 3, 4, 5], ["ddd", 4, 5, 6, 8]], "lv": 60, "#user_id": "fvdsvgdsf"}, "#distinct_id": "ABCDEF123456", "#account_id": "TA10001"}"""
|
||||
try:
|
||||
self.__producer.send(self.name, msg, partition=random.randint(0, self.__partition))
|
||||
self.__producer.send(self.name, msg, partition=partition)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
@ -10,6 +10,13 @@ from settings import settings
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
def dest_partition(s: str) -> int:
|
||||
try:
|
||||
return int(s[-8:], 16) % 16
|
||||
except:
|
||||
return 0
|
||||
|
||||
|
||||
@router.post("/")
|
||||
async def point(request: Request, items: DataModel):
|
||||
try:
|
||||
@ -21,6 +28,8 @@ async def point(request: Request, items: DataModel):
|
||||
if not appid:
|
||||
return {'code': -1, 'msg': '必须要appid'}
|
||||
output_factory.name = settings.OUTPUT_NAME.get(appid)
|
||||
partition = dest_partition(items.public.get('x02') or '0')
|
||||
|
||||
if not output_factory.name:
|
||||
return {'code': -1, 'msg': '没有匹配到appid'}
|
||||
for item in items.data:
|
||||
@ -40,7 +49,7 @@ async def point(request: Request, items: DataModel):
|
||||
await asyncio.gather(
|
||||
*map(lambda o: asyncio.create_task(o(request, rdb, single_data)), data_factory.handler_link))
|
||||
msg = data_factory.format_data(single_data)
|
||||
output_factory.send(msg)
|
||||
output_factory.send(msg, partition)
|
||||
|
||||
return {"code": 0, 'msg': 'ok'}
|
||||
except Exception as e:
|
||||
|
Loading…
Reference in New Issue
Block a user