From 9ddbe7871cf9eceefc86017e581445bb411a2765 Mon Sep 17 00:00:00 2001 From: kf_wuhao <15392746632@qq.com> Date: Fri, 2 Apr 2021 13:44:47 +0800 Subject: [PATCH] update --- .gitignore | 1 + common/__init__.py | 2 +- common/verification.py | 6 +- handler_data/__init__.py | 2 +- handler_data/ta_handler.py | 9 ++- output/__init__.py | 4 +- output/kafka_p.py | 4 +- routers/point.py | 15 ++-- settings.py | 144 +++++++++++++------------------------ 9 files changed, 78 insertions(+), 109 deletions(-) diff --git a/.gitignore b/.gitignore index 24d07bf..a3e7ea1 100644 --- a/.gitignore +++ b/.gitignore @@ -131,4 +131,5 @@ dmypy.json test .idea +utils/ta_sdk.py diff --git a/common/__init__.py b/common/__init__.py index fca89fd..bcf3767 100644 --- a/common/__init__.py +++ b/common/__init__.py @@ -1 +1 @@ -from verification import * +from .verification import * diff --git a/common/verification.py b/common/verification.py index da82360..71d7b72 100644 --- a/common/verification.py +++ b/common/verification.py @@ -13,11 +13,11 @@ def restore_field(data: dict) -> dict: def sort_kv(*args: dict): - return ''.join(map(lambda item: ''.join(map(lambda x: f'{x[0]}{x[1]}', item.items())), args)) + return '&'.join(map(lambda item: '&'.join(map(lambda x: f'{x[0]}={x[1]}', sorted(item.items()))), args)) -def check_sign(sign: str, *args: dict): - s = sort_kv(*args) + settings.SALT +def check_sign(sign: str, salt: str, *args: dict): + s = sort_kv(*args) + salt if hashlib.md5(s.encode()).hexdigest() != sign: return False return True diff --git a/handler_data/__init__.py b/handler_data/__init__.py index ffd5a40..a010dc3 100644 --- a/handler_data/__init__.py +++ b/handler_data/__init__.py @@ -1,4 +1,4 @@ -from ta_handler import TaHandler +from .ta_handler import TaHandler def data_factory(who): diff --git a/handler_data/ta_handler.py b/handler_data/ta_handler.py index a783e27..faec6ce 100644 --- a/handler_data/ta_handler.py +++ b/handler_data/ta_handler.py @@ -14,7 +14,12 @@ class TaHandler: v = data.get(k) if v: msg[k] = data.pop(k) - msg['properties'] = data + msg['properties'] = data.pop('properties') + if msg['#type'] == 'track': + msg.update(data) + elif data['#type'] == 'user': + data['#type'] = 'user' + data['#event_name'] + msg['properties']['#user_id'] = data['#user_id'] return msg @@ -28,7 +33,7 @@ async def add_ip(request, rdb, data): :return: """ ip = request.client.host - data['ip'] = data.get('ip') or ip + data['#ip'] = data.get('#ip') or ip @TaHandler diff --git a/output/__init__.py b/output/__init__.py index 5e9d7b4..9c999c0 100644 --- a/output/__init__.py +++ b/output/__init__.py @@ -1,9 +1,9 @@ -from kafka_p import * +from .kafka_p import * from settings import settings def output_factory(who): f = { - 'kafka': ToKafka(**settings.KAFKA_CONF) + 'kafka': ToKafka(settings.KAFKA_CONF) } return f.get(who) diff --git a/output/kafka_p.py b/output/kafka_p.py index c90e597..e1b1a90 100644 --- a/output/kafka_p.py +++ b/output/kafka_p.py @@ -13,13 +13,13 @@ class ToKafka(BaseOutput): """ def __init__(self, conf): - self.topic_name = None + self.name = None self.__producer = KafkaProducer(**conf) self.__partition = 15 def send(self, msg): # msg="""{"#type": "user_set", "#ip": "192.168.1.1", "#time": "2021-04-01 17:54:28.084", "properties": {"herostate": [["fffgsa", 2, 3, 4, 5], ["ddd", 4, 5, 6, 8]], "lv": 60, "#user_id": "fvdsvgdsf"}, "#distinct_id": "ABCDEF123456", "#account_id": "TA10001"}""" try: - self.__producer.send(self.topic_name, msg, partition=random.randint(0, self.__partition)) + self.__producer.send(self.name, msg, partition=random.randint(0, self.__partition)) except Exception as e: print(e) diff --git a/routers/point.py b/routers/point.py index 83226ed..6f4b08b 100644 --- a/routers/point.py +++ b/routers/point.py @@ -4,22 +4,27 @@ from fastapi import APIRouter, Request from common import * from models import DataModel +from settings import settings router = APIRouter() -@router.post("/point/") +@router.post("/point") async def point(request: Request, items: DataModel): - public_data = restore_field(items.public) + public_data: dict = restore_field(items.public) rdb = request.app.state.redis data_factory = request.app.state.data_factory output_factory = request.app.state.output_factory + appid = items.public['b01'] + output_factory.name = settings.OUTPUT_NAME[appid] for item in items.data: - data = restore_field(item) + data: dict = restore_field(item) sign = data.pop('sign') - if not check_sign(sign, public_data, data): + properties = data.pop('properties') + if not check_sign(sign, settings.SALT.get('appid',''), data, properties): + # continue return {"code": -1, 'msg': '签名错误'} - single_data = public_data | data + single_data = dict(**public_data, **data, properties=properties) await asyncio.gather( *map(lambda o: asyncio.create_task(o(request, rdb, single_data)), data_factory.handler_link)) msg = data_factory.format_data(single_data) diff --git a/settings.py b/settings.py index 3e780d3..eaa673d 100644 --- a/settings.py +++ b/settings.py @@ -3,107 +3,65 @@ import json class Config: FIELD_MAP = { - "x01": "user_id", - "x02": "account_id", - "x03": "distinct_id", - "x04": "event_name", - "x05": "server_time", - "a01": "ip", - "a02": "country", - "a03": "country_code", - "a04": "province", - "a05": "city", - "a06": "os_version", - "a07": "manufacturer", - "a08": "os", - "a09": "device_id", - "a10": "screen_height", - "a11": "screen_width", - "a12": "device_model", - "a13": "app_version", - "a14": "bundle_id", - "a15": "lib", - "a16": "lib_version", - "a17": "network_type", - "a18": "carrier", - "a19": "browser", - "a20": "browser_version", - "a21": "duration", - "a22": "url", - "a23": "url_path", - "a24": "referrer", - "a25": "referrer_host", - "a26": "title", - "a27": "screen_name", - "a28": "element_id", - "a29": "element_type", - "a30": "resume_from_background", - "a31": "element_selector", - "a32": "element_position", - "a33": "element_content", - "a34": "scene", - "a35": "mp_platform", - "a36": "app_crashed_reason", - "a37": "zone_offset", - "b01": "app_id", - "b06": "event_time" + "x01": "#user_id", + "x02": "#account_id", + "x03": "#distinct_id", + "x04": "#event_name", + "x05": "#server_time", + "a01": "#ip", + "a02": "#country", + "a03": "#country_code", + "a04": "#province", + "a05": "#city", + "a06": "#os_version", + "a07": "#manufacturer", + "a08": "#os", + "a09": "#device_id", + "a10": "#screen_height", + "a11": "#screen_width", + "a12": "#device_model", + "a13": "#app_version", + "a14": "#bundle_id", + "a15": "#lib", + "a16": "#lib_version", + "a17": "#network_type", + "a18": "#carrier", + "a19": "#browser", + "a20": "#browser_version", + "a21": "#duration", + "a22": "#url", + "a23": "#url_path", + "a24": "#referrer", + "a25": "#referrer_host", + "a26": "#title", + "a27": "#screen_name", + "a28": "#element_id", + "a29": "#element_type", + "a30": "#resume_from_background", + "a31": "#element_selector", + "a32": "#element_position", + "a33": "#element_content", + "a34": "#scene", + "a35": "#mp_platform", + "a36": "#app_crashed_reason", + "a37": "#zone_offset", + "b01": "#app_id", + "b06": "#event_time" } - TA_MAP = { - "user_id": "#user_id", - "account_id": "#account_id", - "distinct_id": "#distinct_id", - "event_name": "#event_name", - "server_time": "#server_time", - "ip": "#ip", - "country": "#country", - "country_code": "#country_code", - "province": "#province", - "city": "#city", - "os_version": "#os_version", - "manufacturer": "#manufacturer", - "os": "#os", - "device_id": "#device_id", - "screen_height": "#screen_height", - "screen_width": "#screen_width", - "device_model": "#device_model", - "app_version": "#app_version", - "bundle_id": "#bundle_id", - "lib": "#lib", - "lib_version": "#lib_version", - "network_type": "#network_type", - "carrier": "#carrier", - "browser": "#browser", - "browser_version": "#browser_version", - "duration": "#duration", - "url": "#url", - "url_path": "#url_path", - "referrer": "#referrer", - "referrer_host": "#referrer_host", - "title": "#title", - "screen_name": "#screen_name", - "element_id": "#element_id", - "element_type": "#element_type", - "resume_from_background": "#resume_from_background", - "element_selector": "#element_selector", - "element_position": "#element_position", - "element_content": "#element_content", - "scene": "#scene", - "mp_platform": "#mp_platform", - "app_crashed_reason": "#app_crashed_reason", - "zone_offset": "#zone_offset", - "app_id": "#app_id", - "event_time": "#event_time" - } - - TA_OUTER = {'#time', '#ip', '#type', '#distinct_id', '#account_id', 'event_name'} + TA_OUTER = {'#time', '#ip', '#type', '#distinct_id', '#account_id', '#event_name'} REDIS_CONF = { 'address': ('192.168.0.161', 6379), 'password': 'd1Gh*zp5', 'db': 1 } - SALT = '0r4X00mH' + SALT = { + '3F9AdWKZGhhNS2': 's4epprEG8DdyG5' + } + OUTPUT_NAME = { + '3F9AdWKZGhhNS2': 'test' + } KAFKA_CONF = { 'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"], 'value_serializer': lambda v: json.dumps(v).encode('utf-8'),