From ed2bdd1dead83375b2cad04a2eea3ea6a609a588 Mon Sep 17 00:00:00 2001 From: wu hao <15392746632@qq.com> Date: Fri, 2 Apr 2021 01:10:23 +0800 Subject: [PATCH] update --- common/__init__.py | 1 + common/verification.py | 23 + handler_data/__init__.py | 10 +- handler_data/handler_event.py | 7 - handler_data/handler_user.py | 20 - handler_data/legu_handler.py | 9 + handler_data/ta_handler.py | 49 ++ main.py | 26 +- models/__init__.py | 3 +- models/base.py | 107 +--- models/event.py | 70 --- models/user.py | 66 --- output/__init__.py | 9 + output/base.py | 7 + output/kafka_p.py | 34 ++ routers/event.py | 23 - routers/point.py | 68 +-- routers/user.py | 22 - settings.py | 101 +++- utils/ta_sdk.py | 930 ---------------------------------- 20 files changed, 285 insertions(+), 1300 deletions(-) create mode 100644 common/__init__.py create mode 100644 common/verification.py delete mode 100644 handler_data/handler_event.py delete mode 100644 handler_data/handler_user.py create mode 100644 handler_data/legu_handler.py create mode 100644 handler_data/ta_handler.py delete mode 100644 models/event.py delete mode 100644 models/user.py create mode 100644 output/__init__.py create mode 100644 output/base.py create mode 100644 output/kafka_p.py delete mode 100644 routers/event.py delete mode 100644 routers/user.py delete mode 100644 utils/ta_sdk.py diff --git a/common/__init__.py b/common/__init__.py new file mode 100644 index 0000000..fca89fd --- /dev/null +++ b/common/__init__.py @@ -0,0 +1 @@ +from verification import * diff --git a/common/verification.py b/common/verification.py new file mode 100644 index 0000000..da82360 --- /dev/null +++ b/common/verification.py @@ -0,0 +1,23 @@ +import hashlib + +from settings import settings + +__all__ = 'restore_field', 'sort_kv', 'check_sign' + + +def restore_field(data: dict) -> dict: + res = dict() + for k, v in data.items(): + res[settings.FIELD_MAP.get(k) or k] = v + return res + + +def sort_kv(*args: dict): + return ''.join(map(lambda item: ''.join(map(lambda x: f'{x[0]}{x[1]}', item.items())), args)) + + +def check_sign(sign: str, *args: dict): + s = sort_kv(*args) + settings.SALT + if hashlib.md5(s.encode()).hexdigest() != sign: + return False + return True diff --git a/handler_data/__init__.py b/handler_data/__init__.py index 3909da1..ffd5a40 100644 --- a/handler_data/__init__.py +++ b/handler_data/__init__.py @@ -1,2 +1,8 @@ -from .handler_user import HandlerUser -from .handler_event import HandlerEvent +from ta_handler import TaHandler + + +def data_factory(who): + f = { + 'ta': TaHandler + } + return f.get(who) diff --git a/handler_data/handler_event.py b/handler_data/handler_event.py deleted file mode 100644 index 2dcae22..0000000 --- a/handler_data/handler_event.py +++ /dev/null @@ -1,7 +0,0 @@ -class HandlerEvent: - handler_link = [] - - def __init__(self, func): - HandlerEvent.handler_link.append(func) - - diff --git a/handler_data/handler_user.py b/handler_data/handler_user.py deleted file mode 100644 index f558205..0000000 --- a/handler_data/handler_user.py +++ /dev/null @@ -1,20 +0,0 @@ -class HandlerUser: - handler_link = [] - - def __init__(self, func): - HandlerUser.handler_link.append(func) - - -@HandlerUser -async def device_label(rdb, item): - """ - 标记新设备 - :param rdb: - :param item: - :return: - """ - v = await rdb.execute('sadd', f'{item.game}.devices', item.properties.get('#device_id', '')) - if v: - item.properties['is_new_device'] = 1 - else: - item.properties['is_new_device'] = 0 \ No newline at end of file diff --git a/handler_data/legu_handler.py b/handler_data/legu_handler.py new file mode 100644 index 0000000..bb920ed --- /dev/null +++ b/handler_data/legu_handler.py @@ -0,0 +1,9 @@ +class LeGuHandler: + handler_link = [] + + def __init__(self, func): + LeGuHandler.handler_link.append(func) + + @staticmethod + def format_data(data: dict): + pass diff --git a/handler_data/ta_handler.py b/handler_data/ta_handler.py new file mode 100644 index 0000000..a783e27 --- /dev/null +++ b/handler_data/ta_handler.py @@ -0,0 +1,49 @@ +from settings import settings + + +class TaHandler: + handler_link = [] + + def __init__(self, func): + TaHandler.handler_link.append(func) + + @staticmethod + def format_data(data: dict): + msg = dict() + for k in settings.TA_OUTER: + v = data.get(k) + if v: + msg[k] = data.pop(k) + msg['properties'] = data + return msg + + +@TaHandler +async def add_ip(request, rdb, data): + """ + 添加源ip + :param request: + :param rdb: + :param data: + :return: + """ + ip = request.client.host + data['ip'] = data.get('ip') or ip + + +@TaHandler +async def device_label(request, rdb, data): + """ + 标记新设备 + :param request: + :param rdb: + :param data: + :return: + """ + # 条件 + if data.get('type') == 'user_add': + v = await rdb.execute('sadd', f'{data.game}.devices', data.properties.get('#device_id', '')) + if v: + data.properties['is_new_device'] = 1 + else: + data.properties['is_new_device'] = 0 diff --git a/main.py b/main.py index ea387cc..fc454f4 100644 --- a/main.py +++ b/main.py @@ -3,10 +3,11 @@ from aioredis import create_redis_pool from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware -from routers import point, user, event +from output import output_factory +from handler_data import data_factory +from routers import point from settings import settings -from utils.ta_sdk import TGAnalytics, ToKafka app = FastAPI() @@ -18,7 +19,8 @@ app.add_middleware( allow_headers=["*"], ) -def register_redis(app: FastAPI) -> None: + +def register_redis(app: FastAPI = app) -> None: @app.on_event('startup') async def startup_event(): app.state.redis = await create_redis_pool(**settings.REDIS_CONF) @@ -29,17 +31,23 @@ def register_redis(app: FastAPI) -> None: await app.state.redis.wait_closed() -def register_ta(app: FastAPI) -> None: +def register_output(app: FastAPI = app) -> None: @app.on_event('startup') def startup_event(): - app.state.ta = TGAnalytics(ToKafka(settings.KAFKA_CONF)) + app.state.output_factory = output_factory('kafka') + + +def register_handler_data(app: FastAPI = app) -> None: + @app.on_event('startup') + def startup_event(): + app.state.data_factory = data_factory('ta') app.include_router(point.router, prefix='/v1') -app.include_router(user.router, prefix='/v1') -app.include_router(event.router, prefix='/v1') -register_redis(app) -register_ta(app) + +register_redis() +register_output() +register_handler_data() if __name__ == '__main__': uvicorn.run(app='main:app', host="0.0.0.0", port=6666, reload=True, debug=True) diff --git a/models/__init__.py b/models/__init__.py index 064a3bc..5bfb18c 100644 --- a/models/__init__.py +++ b/models/__init__.py @@ -1,2 +1 @@ -from .user import * -from .event import * \ No newline at end of file +from .base import DataModel \ No newline at end of file diff --git a/models/base.py b/models/base.py index 034d9f7..95e3c50 100644 --- a/models/base.py +++ b/models/base.py @@ -1,107 +1,10 @@ -import hashlib -from enum import Enum +from typing import List, Dict from pydantic import Field -from pydantic import BaseModel, validator - -from settings import settings - -from ipaddress import IPv4Address - -FIELD_MAP = { - 'user_id': 'x01', - 'account_id': 'x02', - 'distinct_id': 'x03', - 'event_name': 'x04', - 'server_time':'x05', - - 'ip': 'a01', - 'country': 'a02', - 'country_code': 'a03', - 'province': 'a04', - 'city': 'a05', - 'os_version': 'a06', - 'manufacturer': 'a07', - 'os': 'a08', - 'device_id': 'a09', - 'screen_height': 'a10', - 'screen_width': 'a11', - 'device_model': 'a12', - 'app_version': 'a13', - 'bundle_id': 'a14', - 'lib': 'a15', - 'lib_version': 'a16', - 'network_type': 'a17', - 'carrier': 'a18', - 'browser': 'a19', - 'browser_version': 'a20', - 'duration': 'a21', - 'url': 'a22', - 'url_path': 'a23', - 'referrer': 'a24', - 'referrer_host': 'a25', - 'title': 'a26', - 'screen_name': 'a27', - 'element_id': 'a28', - 'element_type': 'a29', - 'resume_from_background': 'a30', - 'element_selector': 'a31', - 'element_position': 'a32', - 'element_content': 'a33', - 'scene': 'a34', - 'mp_platform': 'a35', - 'app_crashed_reason': 'a36', - 'zone_offset': 'a37', - - 'app_id':'b01', - 'event_time':'b06' -} +from pydantic import BaseModel -def to_alias(k: str) -> str: - return FIELD_MAP.get(k) or k - - -class IP4(str): - - @classmethod - def __get_validators__(cls): - yield cls.validate - - @classmethod - def validate(cls, v): - IPv4Address(v) - return str(v) - - -class ActEnum(str, Enum): - track = 'track' - user_set = 'user_set' - user_setOnce = 'user_setOnce' - user_add = 'user_add' - user_unset = 'user_unset' - user_append = 'user_append' - user_del = 'user_del' - - -class Base(BaseModel): - # sign = md5(game+act+ts+salt) - game: str = Field(..., title='游戏代号') - act: ActEnum = Field(..., title='操作', description='同ta一致') - preset: BaseModel - properties: dict = Field(..., title='自定义属性') - ts: int = Field(..., title='时间戳') - sign: str = Field(..., title='签名') - - @validator('sign') - def sign_validator(cls, v: str, values: dict): - s = f'{values.get("game", "")}{values.get("act", "")}{values.get("ts", "")}{settings.SALT}' - if hashlib.md5(s.encode()).hexdigest() == v: - return v - raise ValueError(f'sign {hashlib.md5(s.encode()).hexdigest()}') - - def dict(self, **kwargs): - kwargs.setdefault('exclude', {'preset', 'account_id', 'distinct_id', 'event_name'}) - self.properties.update(self.preset.dict(**kwargs)) - return super().dict(**kwargs) +class DataModel(BaseModel): + public: Dict = Field(..., title='公有属性', description='') + data: List = Field(..., title='数据属性', description='') diff --git a/models/event.py b/models/event.py deleted file mode 100644 index c89d690..0000000 --- a/models/event.py +++ /dev/null @@ -1,70 +0,0 @@ -from datetime import datetime - -from pydantic import BaseModel, Field - -from .base import Base, IP4, to_alias - -__all__ = ('EventModel',) - - -class Preset(BaseModel): - ip: IP4 = Field(None, title='ipv4',description='不传该字段默认使用源ip') - country: str = Field(None, title='国家',description='') - country_code: str = Field(None, title='国家代码',description='') - province: str = Field(None, title='省份',description='') - city: str = Field(None, title='城市',description='') - os_version: str = Field(None, title='操作系统版本',description='') - manufacturer: str = Field(None, title='设备制造商',description='') - os: str = Field(None, title='操作系统',description='') - device_id: str = Field(None, title='设备 ID',description='') - screen_height: int = Field(None, title='屏幕高度',description='') - screen_width: int = Field(None, title='屏幕宽度',description='') - device_model: str = Field(None, title='设备型号',description='') - app_version: str = Field(None, title='APP 版本',description='') - bundle_id: str = Field(None, title='APP包名',description='') - lib: str = Field(None, title='SDK 类型',description='') - lib_version: str = Field(None, title='SDK 版本',description='') - network_type: str = Field(None, title='网络状态',description='') - carrier: str = Field(None, title='网络运营商',description='') - browser: str = Field(None, title='浏览器类型',description='') - browser_version: str = Field(None, title='浏览器版本',description='') - duration: int = Field(None, title='事件时长',description='') - url: str = Field(None, title='页面地址',description='') - url_path: str = Field(None, title='页面路径',description='') - referrer: str = Field(None, title='前向地址',description='') - referrer_host: str = Field(None, title='前向路径',description='') - title: str = Field(None, title='页面标题',description='') - screen_name: str = Field(None, title='页面名称',description='') - element_id: str = Field(None, title='元素 ID',description='') - element_type: str = Field(None, title='元素类型',description='') - resume_from_background: str = Field(None, title='是否从后台唤醒',description='') - element_selector: str = Field(None, title='元素选择器',description='') - element_position: str = Field(None, title='元素位置',description='') - element_content: str = Field(None, title='元素内容',description='') - scene: str = Field(None, title='场景值',description='') - mp_platform: str = Field(None, title='小程序平台',description='') - app_crashed_reason: str = Field(None, title='异常信息',description='') - zone_offset: str = Field(None, title='时区偏移',description='') - - user_id: str = Field(..., title='用户唯一 ID',description='') - account_id: str = Field(..., title='账户 ID', description='') - distinct_id: str = Field(..., title='访客 ID',description='') - event_name: str = Field(..., title='事件名称',description='') - - # 事件 - app_id: str = Field(None, description='') - event_time: datetime = Field(None,title='事件时间', description='') - server_time: datetime = Field(None,title='服务端时间', description='') - - - def dict(self, **kwargs): - res = super().dict(**kwargs) - return {'#' + k: v for k, v in res.items() if v is not None} - - class Config: - alias_generator = to_alias - - -class EventModel(Base): - preset: Preset = Field(..., title='系统属性') - diff --git a/models/user.py b/models/user.py deleted file mode 100644 index 3877870..0000000 --- a/models/user.py +++ /dev/null @@ -1,66 +0,0 @@ -from datetime import datetime - -from pydantic import BaseModel, Field - -from .base import Base, IP4, to_alias - -__all__ = ('UserModel',) - - -class Preset(BaseModel): - ip: IP4 = Field(None, title='ipv4', description='不传该字段默认使用源ip') - country: str = Field(None, title='国家', description='') - country_code: str = Field(None, title='国家代码', description='') - province: str = Field(None, title='省份', description='') - city: str = Field(None, title='城市', description='') - os_version: str = Field(None, title='操作系统版本', description='') - manufacturer: str = Field(None, title='设备制造商', description='') - os: str = Field(None, title='操作系统', description='') - device_id: str = Field(None, title='设备 ID', description='') - screen_height: int = Field(None, title='屏幕高度', description='') - screen_width: int = Field(None, title='屏幕宽度', description='') - device_model: str = Field(None, title='设备型号', description='') - app_version: str = Field(None, title='APP 版本', description='') - bundle_id: str = Field(None, title='APP包名', description='') - lib: str = Field(None, title='SDK 类型', description='') - lib_version: str = Field(None, title='SDK 版本', description='') - network_type: str = Field(None, title='网络状态', description='') - carrier: str = Field(None, title='网络运营商', description='') - browser: str = Field(None, title='浏览器类型', description='') - browser_version: str = Field(None, title='浏览器版本', description='') - duration: int = Field(None, title='事件时长', description='') - url: str = Field(None, title='页面地址', description='') - url_path: str = Field(None, title='页面路径', description='') - referrer: str = Field(None, title='前向地址', description='') - referrer_host: str = Field(None, title='前向路径', description='') - title: str = Field(None, title='页面标题', description='') - screen_name: str = Field(None, title='页面名称', description='') - element_id: str = Field(None, title='元素 ID', description='') - element_type: str = Field(None, title='元素类型', description='') - resume_from_background: str = Field(None, title='是否从后台唤醒', description='') - element_selector: str = Field(None, title='元素选择器', description='') - element_position: str = Field(None, title='元素位置', description='') - element_content: str = Field(None, title='元素内容', description='') - scene: str = Field(None, title='场景值', description='') - mp_platform: str = Field(None, title='小程序平台', description='') - app_crashed_reason: str = Field(None, title='异常信息', description='') - zone_offset: str = Field(None, title='时区偏移', description='') - - user_id: str = Field(..., title='用户唯一 ID', description='') - account_id: str = Field(..., title='账户 ID', description='') - distinct_id: str = Field(..., title='访客 ID', description='') - # event_name: str = Field(..., title='事件名称',description='') - - # 用户 - server_time: datetime = Field(None, title='服务端时间', description='') - - def dict(self, **kwargs): - res = super().dict(**kwargs) - return {'#' + k: v for k, v in res.items() if v is not None} - - class Config: - alias_generator = to_alias - - -class UserModel(Base): - preset: Preset = Field(..., title='系统属性') diff --git a/output/__init__.py b/output/__init__.py new file mode 100644 index 0000000..5e9d7b4 --- /dev/null +++ b/output/__init__.py @@ -0,0 +1,9 @@ +from kafka_p import * +from settings import settings + + +def output_factory(who): + f = { + 'kafka': ToKafka(**settings.KAFKA_CONF) + } + return f.get(who) diff --git a/output/base.py b/output/base.py new file mode 100644 index 0000000..ed62d92 --- /dev/null +++ b/output/base.py @@ -0,0 +1,7 @@ +import abc + + +class BaseOutput(metaclass=abc.ABCMeta): + @abc.abstractmethod + def send(self, msg: dict): + pass diff --git a/output/kafka_p.py b/output/kafka_p.py new file mode 100644 index 0000000..1fbe2e8 --- /dev/null +++ b/output/kafka_p.py @@ -0,0 +1,34 @@ +import random + +from kafka import KafkaProducer +from .base import BaseOutput + +__all__ = ('ToKafka',) + + +class ToKafka(BaseOutput): + """ + 将数据发送到kafka + 注意 减少不必要的查询 分区固定设置16个 + """ + + def __init__(self, conf): + self.__topic_name = None + self.__producer = KafkaProducer(**conf) + self.partition = 15 + + @property + def topic_name(self): + return self.__topic_name + + @topic_name.setter + def topic_name(self, topic_name): + self.__topic_name = topic_name + # self.__producer.partitions_for(topic_name) + + def send(self, msg): + # msg="""{"#type": "user_set", "#ip": "192.168.1.1", "#time": "2021-04-01 17:54:28.084", "properties": {"herostate": [["fffgsa", 2, 3, 4, 5], ["ddd", 4, 5, 6, 8]], "lv": 60, "#user_id": "fvdsvgdsf"}, "#distinct_id": "ABCDEF123456", "#account_id": "TA10001"}""" + try: + self.__producer.send(self.__topic_name, msg, partition=random.randint(0, self.partition)) + except Exception as e: + print(e) diff --git a/routers/event.py b/routers/event.py deleted file mode 100644 index 1bf4b3d..0000000 --- a/routers/event.py +++ /dev/null @@ -1,23 +0,0 @@ -import asyncio - -from fastapi import APIRouter, Request - -from handler_data import HandlerEvent - -router = APIRouter() -from models import EventModel - - -@router.post("/event/") -async def event(request: Request, item: EventModel): - item.preset.ip = item.preset.ip or request.client.host - ta = getattr(request.app.state.ta, item.act) - # 将不同游戏发送到不同 topic_name - request.app.state.ta.consumer.topic_name = item.game - rdb = request.app.state.redis - await asyncio.gather(*map(lambda o: asyncio.create_task(o(rdb, item)), HandlerEvent.handler_link)) - - properties = item.dict()['properties'] - ta(item.preset.distinct_id, item.preset.account_id, item.preset.event_name, properties) - results = {"code": 0, 'msg': 'ok'} - return results diff --git a/routers/point.py b/routers/point.py index 8e5d8fe..e307688 100644 --- a/routers/point.py +++ b/routers/point.py @@ -1,58 +1,34 @@ import asyncio -import hashlib +import json from fastapi import APIRouter, Request -from pydantic import BaseModel, validator -from handler_data import HandlerUser, HandlerEvent -from settings import settings +from common import * +from models import DataModel router = APIRouter() -class Item(BaseModel): - # sign = md5(game+act+ts+salt) - distinct_id: str - game: str - account_id: str - act: str - event_name: str = None - properties: dict - ts: int - sign: str - - @validator('sign') - def sign_validator(cls, v: str, values: dict): - s = f'{values.get("game")}{values.get("act", "")}{values.get("ts", "")}{settings.SALT}' - if hashlib.md5(s.encode()).hexdigest() == v: - return v - raise ValueError(f'签名 {hashlib.md5(s.encode()).hexdigest()}') - - @router.post("/point/") -async def point(request: Request, item: Item): - ip = request.client.host - ta = getattr(request.app.state.ta, item.act) - # 将不同游戏发送到不同 topic_name - request.app.state.ta.consumer.topic_name = item.game - +async def point(request: Request, items: DataModel): + # 还原字段名 和 组装数据 + data_list = [] + public_data = restore_field(items.public) rdb = request.app.state.redis - if ta and item.event_name and item.act == 'track': - await asyncio.gather(*map(lambda o: asyncio.create_task(o(rdb, item)), HandlerEvent.handler_link)) - await track(ta, item) - else: - await asyncio.gather(*map(lambda o: asyncio.create_task(o(rdb, item)), HandlerUser.handler_link)) - await user_set(ta, item) - results = {"code": 0, 'msg': 'ok','ip':ip} - return results - - -async def track(ta, item): - ta(item.distinct_id, item.account_id, item.event_name, item.properties) - - -async def user_set(ta, item): - ta(item.distinct_id, item.account_id, item.properties) - + data_factory = request.app.state.data_factory + output_factory = request.app.state.output_factory + for item in items.data: + data = restore_field(item) + sign = data.pop('sign') + # 验签 + if not check_sign(sign, public_data, data): + return {"code": -1, 'msg': '签名错误'} + data.update(public_data) + data_list.append(data) + for item in data_list: + await asyncio.gather(*map(lambda o: asyncio.create_task(o(request, rdb, item)), data_factory.handler_link)) + msg = data_factory.format_data(item) + output_factory.send(msg) + return {"code": 0, 'msg': 'ok'} diff --git a/routers/user.py b/routers/user.py deleted file mode 100644 index cd219f5..0000000 --- a/routers/user.py +++ /dev/null @@ -1,22 +0,0 @@ -import asyncio - -from fastapi import APIRouter, Request - -from handler_data import HandlerUser - -router = APIRouter() -from models import UserModel - - -@router.post("/user/") -async def user(request: Request, item: UserModel): - item.preset.ip = item.preset.ip or request.client.host - ta = getattr(request.app.state.ta, item.act) - # 将不同游戏发送到不同 topic_name - request.app.state.ta.consumer.topic_name = item.game - rdb = request.app.state.redis - await asyncio.gather(*map(lambda o: asyncio.create_task(o(rdb, item)), HandlerUser.handler_link)) - properties = item.dict()['properties'] - ta(item.preset.distinct_id, item.preset.account_id, properties) - results = {"code": 0, 'msg': 'ok'} - return results diff --git a/settings.py b/settings.py index 165ea17..3e780d3 100644 --- a/settings.py +++ b/settings.py @@ -1,4 +1,103 @@ +import json + + class Config: + FIELD_MAP = { + "x01": "user_id", + "x02": "account_id", + "x03": "distinct_id", + "x04": "event_name", + "x05": "server_time", + "a01": "ip", + "a02": "country", + "a03": "country_code", + "a04": "province", + "a05": "city", + "a06": "os_version", + "a07": "manufacturer", + "a08": "os", + "a09": "device_id", + "a10": "screen_height", + "a11": "screen_width", + "a12": "device_model", + "a13": "app_version", + "a14": "bundle_id", + "a15": "lib", + "a16": "lib_version", + "a17": "network_type", + "a18": "carrier", + "a19": "browser", + "a20": "browser_version", + "a21": "duration", + "a22": "url", + "a23": "url_path", + "a24": "referrer", + "a25": "referrer_host", + "a26": "title", + "a27": "screen_name", + "a28": "element_id", + "a29": "element_type", + "a30": "resume_from_background", + "a31": "element_selector", + "a32": "element_position", + "a33": "element_content", + "a34": "scene", + "a35": "mp_platform", + "a36": "app_crashed_reason", + "a37": "zone_offset", + "b01": "app_id", + "b06": "event_time" + } + + TA_MAP = { + "user_id": "#user_id", + "account_id": "#account_id", + "distinct_id": "#distinct_id", + "event_name": "#event_name", + "server_time": "#server_time", + "ip": "#ip", + "country": "#country", + "country_code": "#country_code", + "province": "#province", + "city": "#city", + "os_version": "#os_version", + "manufacturer": "#manufacturer", + "os": "#os", + "device_id": "#device_id", + "screen_height": "#screen_height", + "screen_width": "#screen_width", + "device_model": "#device_model", + "app_version": "#app_version", + "bundle_id": "#bundle_id", + "lib": "#lib", + "lib_version": "#lib_version", + "network_type": "#network_type", + "carrier": "#carrier", + "browser": "#browser", + "browser_version": "#browser_version", + "duration": "#duration", + "url": "#url", + "url_path": "#url_path", + "referrer": "#referrer", + "referrer_host": "#referrer_host", + "title": "#title", + "screen_name": "#screen_name", + "element_id": "#element_id", + "element_type": "#element_type", + "resume_from_background": "#resume_from_background", + "element_selector": "#element_selector", + "element_position": "#element_position", + "element_content": "#element_content", + "scene": "#scene", + "mp_platform": "#mp_platform", + "app_crashed_reason": "#app_crashed_reason", + "zone_offset": "#zone_offset", + "app_id": "#app_id", + "event_time": "#event_time" + } + + TA_OUTER = {'#time', '#ip', '#type', '#distinct_id', '#account_id', 'event_name'} + REDIS_CONF = { 'address': ('192.168.0.161', 6379), 'password': 'd1Gh*zp5', @@ -7,7 +106,7 @@ class Config: SALT = '0r4X00mH' KAFKA_CONF = { 'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"], - 'value_serializer': lambda v: v.encode('utf-8'), + 'value_serializer': lambda v: json.dumps(v).encode('utf-8'), } diff --git a/utils/ta_sdk.py b/utils/ta_sdk.py deleted file mode 100644 index 7f8bc16..0000000 --- a/utils/ta_sdk.py +++ /dev/null @@ -1,930 +0,0 @@ -# encoding:utf-8 - -from __future__ import unicode_literals - -import datetime -import gzip -import json -import os -import random -import re -import threading -import time -import uuid - -import requests -from kafka import KafkaProducer -from requests import ConnectionError - -try: - import queue - from urllib.parse import urlparse -except ImportError: - import Queue as queue - from urlparse import urlparse -try: - isinstance("", basestring) - - - def is_str(s): - return isinstance(s, basestring) -except NameError: - def is_str(s): - return isinstance(s, str) -try: - isinstance(1, long) - - - def is_int(n): - return isinstance(n, int) or isinstance(n, long) -except NameError: - def is_int(n): - return isinstance(n, int) - -try: - from enum import Enum - - ROTATE_MODE = Enum('ROTATE_MODE', ('DAILY', 'HOURLY')) -except ImportError: - class ROTATE_MODE(object): - DAILY = 0 - HOURLY = 1 - - -class TGAException(Exception): - pass - - -class TGAIllegalDataException(TGAException): - """数据格式异常 - - 在发送的数据格式有误时,SDK 会抛出此异常,用户应当捕获并处理. - """ - pass - - -class TGANetworkException(TGAException): - """网络异常 - - 在因为网络或者不可预知的问题导致数据无法发送时,SDK会抛出此异常,用户应当捕获并处理. - """ - pass - - -__version__ = '1.6.0' - - -class TGAnalytics(object): - """TGAnalytics 实例是发送事件数据和用户属性数据的关键实例 - """ - - __NAME_PATTERN = re.compile(r"^(#[a-z][a-z0-9_]{0,49})|([a-z][a-z0-9_]{0,50})$", re.I) - - def __init__(self, consumer, enable_uuid=False): - """创建一个 TGAnalytics 实例 - - TGAanlytics 需要与指定的 Consumer 一起使用,可以使用以下任何一种: - - LoggingConsumer: 批量实时写本地文件,并与 LogBus 搭配 - - BatchConsumer: 批量实时地向TA服务器传输数据(同步阻塞),不需要搭配传输工具 - - AsyncBatchConsumer: 批量实时地向TA服务器传输数据(异步非阻塞),不需要搭配传输工具 - - DebugConsumer: 逐条发送数据,并对数据格式做严格校验 - - Args: - consumer: 指定的 Consumer - """ - self.__consumer = consumer - self.__enableUuid = enable_uuid - self.__super_properties = {} - self.clear_super_properties() - - @property - def consumer(self): - """ - 用了更换 kafka topic_name - :return: - """ - return self.__consumer - - def user_set(self, distinct_id=None, account_id=None, properties=None): - """设置用户属性 - - 对于一般的用户属性,您可以调用 user_set 来进行设置。使用该接口上传的属性将会覆盖原有的属性值,如果之前不存在该用户属性, - 则会新建该用户属性,类型与传入属性的类型一致. - - Args: - distinct_id: 访客 ID - account_id: 账户 ID - properties: dict 类型的用户属性 - """ - self.__add(distinct_id=distinct_id, account_id=account_id, send_type='user_set', properties_add=properties) - - def user_unset(self, distinct_id=None, account_id=None, properties=None): - """ - 删除某个用户的用户属性 - :param distinct_id: - :param account_id: - :param properties: - """ - if isinstance(properties, list): - properties = dict((key, 0) for key in properties) - self.__add(distinct_id=distinct_id, account_id=account_id, send_type='user_unset', properties_add=properties) - - def user_setOnce(self, distinct_id=None, account_id=None, properties=None): - """设置用户属性, 不覆盖已存在的用户属性 - - 如果您要上传的用户属性只要设置一次,则可以调用 user_setOnce 来进行设置,当该属性之前已经有值的时候,将会忽略这条信息. - - Args: - distinct_id: 访客 ID - account_id: 账户 ID - properties: dict 类型的用户属性 - """ - self.__add(distinct_id=distinct_id, account_id=account_id, send_type='user_setOnce', properties_add=properties) - - def user_add(self, distinct_id=None, account_id=None, properties=None): - """对指定的数值类型的用户属性进行累加操作 - - 当您要上传数值型的属性时,您可以调用 user_add 来对该属性进行累加操作. 如果该属性还未被设置,则会赋值0后再进行计算. - 可传入负值,等同于相减操作. - - Args: - distinct_id: 访客 ID - account_id: 账户 ID - properties: 数值类型的用户属性 - """ - self.__add(distinct_id=distinct_id, account_id=account_id, send_type='user_add', properties_add=properties) - - def user_append(self, distinct_id=None, account_id=None, properties=None): - """追加一个用户的某一个或者多个集合类型 - Args: - distinct_id: 访客 ID - account_id: 账户 ID - properties: 集合 - """ - self.__add(distinct_id=distinct_id, account_id=account_id, send_type='user_append', properties_add=properties) - - def user_del(self, distinct_id=None, account_id=None): - """删除用户 - - 如果您要删除某个用户,可以调用 user_del 将该用户删除。调用此函数后,将无法再查询该用户的用户属性, 但该用户产生的事件仍然可以被查询到. - - Args: - distinct_id: 访客 ID - account_id: 账户 ID - """ - self.__add(distinct_id=distinct_id, account_id=account_id, send_type='user_del') - - def track(self, distinct_id=None, account_id=None, event_name=None, properties=None): - """发送事件数据 - - 您可以调用 track 来上传事件,建议您根据先前梳理的文档来设置事件的属性以及发送信息的条件. 事件的名称只能以字母开头,可包含数字,字母和下划线“_”, - 长度最大为 50 个字符,对字母大小写不敏感. 事件的属性是一个 dict 对象,其中每个元素代表一个属性. - - Args: - distinct_id: 访客 ID - account_id: 账户 ID - event_name: 事件名称 - properties: 事件属性 - - Raises: - TGAIllegalDataException: 数据格式错误时会抛出此异常 - """ - all_properties = self._public_track_add(event_name) - - if properties: - all_properties.update(properties) - - self.__add(distinct_id=distinct_id, account_id=account_id, send_type='track', event_name=event_name, - properties_add=all_properties) - - def track_update(self, distinct_id=None, account_id=None, event_name=None, event_id=None, properties=None): - """发送可更新的事件数据 - - 您可以调用 track_update 来上传可更新的事件,建议您根据先前梳理的文档来设置事件的属性以及发送信息的条件. 事件的名称只能以字母开头,可包含数字,字母和下划线“_”, - 长度最大为 50 个字符,对字母大小写不敏感. 事件的属性是一个 dict 对象,其中每个元素代表一个属性. - - Args: - distinct_id: 访客 ID - account_id: 账户 ID - event_name: 事件名称 - event_id: 事件唯一ID - properties: 事件属性 - - Raises: - TGAIllegalDataException: 数据格式错误时会抛出此异常 - """ - all_properties = self._public_track_add(event_name) - - if properties: - all_properties.update(properties) - - self.__add(distinct_id=distinct_id, account_id=account_id, send_type='track_update', event_name=event_name, - event_id=event_id, properties_add=all_properties) - - def track_overwrite(self, distinct_id=None, account_id=None, event_name=None, event_id=None, properties=None): - """发送可覆盖的事件数据 - - 您可以调用 track_overwrite 来上传可全部覆盖的事件,建议您根据先前梳理的文档来设置事件的属性以及发送信息的条件. 事件的名称只能以字母开头,可包含数字,字母和下划线“_”, - 长度最大为 50 个字符,对字母大小写不敏感. 事件的属性是一个 dict 对象,其中每个元素代表一个属性. - - Args: - distinct_id: 访客 ID - account_id: 账户 ID - event_name: 事件名称 - event_id: 事件唯一ID - properties: 事件属性 - - Raises: - TGAIllegalDataException: 数据格式错误时会抛出此异常 - """ - all_properties = self._public_track_add(event_name) - - if properties: - all_properties.update(properties) - - self.__add(distinct_id=distinct_id, account_id=account_id, send_type='track_overwrite', event_name=event_name, - event_id=event_id, properties_add=all_properties) - - def flush(self): - """立即提交数据到相应的接收端 - """ - self.__consumer.flush() - - def close(self): - """关闭并退出 sdk - - 请在退出前调用本接口,以避免缓存内的数据丢失 - """ - self.__consumer.close() - - def _public_track_add(self, event_name): - if not is_str(event_name): - raise TGAIllegalDataException('a string type event_name is required for track') - - all_properties = { - '#lib': 'tga_python_sdk', - '#lib_version': __version__, - } - all_properties.update(self.__super_properties) - return all_properties - pass - - def __add(self, distinct_id, account_id, send_type, event_name=None, event_id=None, properties_add=None): - if distinct_id is None and account_id is None: - raise TGAException("Distinct_id and account_id must be set at least one") - - if properties_add: - properties = properties_add.copy() - else: - properties = {} - data = { - '#type': send_type - } - if "#ip" in properties.keys(): - data['#ip'] = properties.get("#ip") - del (properties['#ip']) - if "#first_check_id" in properties.keys(): - data['#first_check_id'] = properties.get("#first_check_id") - del (properties['#first_check_id']) - # 只支持UUID标准格式xxxxxxxx - xxxx - xxxx - xxxx - xxxxxxxxxxxx - if "#uuid" in properties.keys(): - data['#uuid'] = str(properties['#uuid']) - del (properties['#uuid']) - elif self.__enableUuid: - data['#uuid'] = str(uuid.uuid1()) - if "#app_id" in properties.keys(): - data['#app_id'] = properties.get("#app_id") - del (properties['#app_id']) - - self.__assert_properties(send_type, properties) - td_time = properties.get("#time") - data['#time'] = td_time - del (properties['#time']) - - data['properties'] = properties - - if event_name is not None: - data['#event_name'] = event_name - if event_id is not None: - data['#event_id'] = event_id - if distinct_id is not None: - data['#distinct_id'] = distinct_id - if account_id is not None: - data['#account_id'] = account_id - - self.__consumer.add(json.dumps(data)) - - def __assert_properties(self, action_type, properties): - if properties is not None: - if "#time" not in properties.keys(): - properties['#time'] = datetime.datetime.now() - else: - try: - time_temp = properties.get('#time') - if isinstance(time_temp, datetime.datetime) or isinstance(time_temp, datetime.date): - pass - else: - raise TGAIllegalDataException('Value of #time should be datetime.datetime or datetime.date') - except Exception as e: - raise TGAIllegalDataException(e) - - for key, value in properties.items(): - if not is_str(key): - raise TGAIllegalDataException("Property key must be a str. [key=%s]" % str(key)) - - if value is None: - continue - - if not self.__NAME_PATTERN.match(key): - raise TGAIllegalDataException( - "type[%s] property key must be a valid variable name. [key=%s]" % (action_type, str(key))) - - if not is_str(value) and not is_int(value) and not isinstance(value, float) \ - and not isinstance(value, bool) \ - and not isinstance(value, datetime.datetime) and not isinstance(value, datetime.date) \ - and not isinstance(value, list): - raise TGAIllegalDataException( - "property value must be a str/int/float/bool/datetime/date/list. [value=%s]" % type(value)) - - if 'user_add' == action_type.lower() and not self.__number(value) and not key.startswith('#'): - raise TGAIllegalDataException('user_add properties must be number type') - - if isinstance(value, datetime.datetime): - properties[key] = value.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] - elif isinstance(value, datetime.date): - properties[key] = value.strftime('%Y-%m-%d') - if isinstance(value, list): - i = 0 - for lvalue in value: - if isinstance(lvalue, datetime.datetime): - value[i] = lvalue.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] - i += 1 - - def __number(self, s): - if is_int(s): - return True - if isinstance(s, float): - return True - return False - - def clear_super_properties(self): - """删除所有已设置的事件公共属性 - """ - self.__super_properties = { - '#lib': 'tga_python_sdk', - '#lib_version': __version__, - } - - def set_super_properties(self, super_properties): - """设置公共事件属性 - - 公共事件属性是所有事件中的属性属性,建议您在发送事件前,先设置公共事件属性. 当 track 的 properties 和 - super properties 有相同的 key 时,track 的 properties 会覆盖公共事件属性的值. - - Args: - super_properties 公共属性 - """ - self.__super_properties.update(super_properties) - - -if os.name == 'nt': - import msvcrt - - - def _lock(file_): - try: - save_pos = file_.tell() - file_.seek(0) - try: - msvcrt.locking(file_.fileno(), msvcrt.LK_LOCK, 1) - except IOError as e: - raise TGAException(e) - finally: - if save_pos: - file_.seek(save_pos) - except IOError as e: - raise TGAException(e) - - - def _unlock(file_): - try: - save_pos = file_.tell() - if save_pos: - file_.seek(0) - try: - msvcrt.locking(file_.fileno(), msvcrt.LK_UNLCK, 1) - except IOError as e: - raise TGAException(e) - finally: - if save_pos: - file_.seek(save_pos) - except IOError as e: - raise TGAException(e) -elif os.name == 'posix': - import fcntl - - - def _lock(file_): - try: - fcntl.flock(file_.fileno(), fcntl.LOCK_EX) - except IOError as e: - raise TGAException(e) - - - def _unlock(file_): - fcntl.flock(file_.fileno(), fcntl.LOCK_UN) -else: - raise TGAException("Python SDK is defined for NT and POSIX system.") - - -class _TAFileLock(object): - def __init__(self, file_handler): - self._file_handler = file_handler - - def __enter__(self): - _lock(self._file_handler) - return self - - def __exit__(self, t, v, tb): - _unlock(self._file_handler) - - -class LoggingConsumer(object): - """数据批量实时写入本地文件 - - 创建指定文件存放目录的 LoggingConsumer, 将数据使用 logging 库输出到指定路径. 同时,需将 LogBus 的监听文件夹地址 - 设置为此处的地址,即可使用LogBus进行数据的监听上传. - """ - _mutex = queue.Queue() - _mutex.put(1) - - class _FileWriter(object): - _writers = {} - _writeMutex = queue.Queue() - _writeMutex.put(1) - - @classmethod - def instance(cls, filename): - cls._writeMutex.get(block=True, timeout=None) - try: - if filename in cls._writers.keys(): - result = cls._writers[filename] - result._count = result._count + 1 - else: - result = cls(filename) - cls._writers[filename] = result - return result - finally: - cls._writeMutex.put(1) - - def __init__(self, filename): - self._filename = filename - self._file = open(self._filename, 'a') - self._count = 1 - - def close(self): - LoggingConsumer._FileWriter._writeMutex.get(block=True, timeout=None) - try: - self._count = self._count - 1 - if self._count == 0: - self._file.close() - del LoggingConsumer._FileWriter._writers[self._filename] - finally: - LoggingConsumer._FileWriter._writeMutex.put(1) - - def is_valid(self, filename): - return self._filename == filename - - def write(self, messages): - with _TAFileLock(self._file): - for message in messages: - self._file.write(message) - self._file.write('\n') - self._file.flush() - - @classmethod - def construct_filename(cls, directory, date_suffix, file_size, file_prefix): - filename = file_prefix + ".log." + date_suffix \ - if file_prefix is not None else "log." + date_suffix - - if file_size > 0: - count = 0 - file_path = directory + filename + "_" + str(count) - while os.path.exists(file_path) and cls.file_size_out(file_path, file_size): - count = count + 1 - file_path = directory + filename + "_" + str(count) - return file_path - else: - return directory + filename - - @classmethod - def file_size_out(cls, file_path, file_size): - fsize = os.path.getsize(file_path) - fsize = fsize / float(1024 * 1024) - if fsize >= file_size: - return True - return False - - @classmethod - def unlock_logging_consumer(cls): - cls._mutex.put(1) - - @classmethod - def lock_logging_consumer(cls): - cls._mutex.get(block=True, timeout=None) - - def __init__(self, log_directory, log_size=0, buffer_size=8192, rotate_mode=ROTATE_MODE.DAILY, file_prefix=None): - """创建指定日志文件目录的 LoggingConsumer - - Args: - log_directory: 日志保存目录 - log_size: 单个日志文件的大小, 单位 MB, log_size <= 0 表示不限制单个文件大小 - buffer_size: 每次写入文件的大小, 单位 Byte, 默认 8K - rotate_mode: 日志切分模式,默认按天切分 - """ - if not os.path.exists(log_directory): - os.makedirs(log_directory) - self.log_directory = log_directory # log文件保存的目录 - self.sdf = '%Y-%m-%d-%H' if rotate_mode == ROTATE_MODE.HOURLY else '%Y-%m-%d' - self.suffix = datetime.datetime.now().strftime(self.sdf) - self._fileSize = log_size # 单个log文件的大小 - if not self.log_directory.endswith("/"): - self.log_directory = self.log_directory + "/" - - self._buffer = [] - self._buffer_size = buffer_size - self._file_prefix = file_prefix - self.lock_logging_consumer() - filename = LoggingConsumer.construct_filename(self.log_directory, self.suffix, self._fileSize, - self._file_prefix) - self._writer = LoggingConsumer._FileWriter.instance(filename) - self.unlock_logging_consumer() - - def add(self, msg): - messages = None - self.lock_logging_consumer() - self._buffer.append(msg) - if len(self._buffer) > self._buffer_size: - messages = self._buffer - date_suffix = datetime.datetime.now().strftime(self.sdf) - if self.suffix != date_suffix: - self.suffix = date_suffix - - filename = LoggingConsumer.construct_filename(self.log_directory, self.suffix, self._fileSize, - self._file_prefix) - if not self._writer.is_valid(filename): - self._writer.close() - self._writer = LoggingConsumer._FileWriter.instance(filename) - self._buffer = [] - if messages: - self._writer.write(messages) - self.unlock_logging_consumer() - - def flush_with_close(self, is_close): - messages = None - self.lock_logging_consumer() - if len(self._buffer) > 0: - messages = self._buffer - filename = LoggingConsumer.construct_filename(self.log_directory, self.suffix, self._fileSize, - self._file_prefix) - if not self._writer.is_valid(filename): - self._writer.close() - self._writer = LoggingConsumer._FileWriter.instance(filename) - self._buffer = [] - if messages: - self._writer.write(messages) - if is_close: - self._writer.close() - self.unlock_logging_consumer() - - def flush(self): - self.flush_with_close(False) - - def close(self): - self.flush_with_close(True) - - -class BatchConsumer(object): - """同步、批量地向 TA 服务器传输数据 - - 通过指定接收端地址和 APP ID,可以同步的向 TA 服务器传输数据. 此 Consumer 不需要搭配传输工具, - 但是存在网络不稳定等原因造成数据丢失的可能,因此不建议在生产环境中使用. - - 触发上报的时机为以下条件满足其中之一的时候: - 1. 数据条数大于预定义的最大值, 默认为 20 条 - 2. 数据发送间隔超过预定义的最大时间, 默认为 3 秒 - """ - _batchlock = threading.RLock() - _cachelock = threading.RLock() - - def __init__(self, server_uri, appid, batch=20, timeout=30000, interval=3, compress=True, maxCacheSize=50): - """创建 BatchConsumer - - Args: - server_uri: 服务器的 URL 地址 - appid: 项目的 APP ID - batch: 指定触发上传的数据条数, 默认为 20 条, 最大 200 条 - timeout: 请求的超时时间, 单位毫秒, 默认为 30000 ms - interval: 推送数据的最大时间间隔, 单位为秒, 默认为 3 秒 - """ - self.__interval = interval - self.__batch = min(batch, 200) - self.__message_channel = [] - self.__maxCacheSize = maxCacheSize - self.__cache_buffer = [] - self.__last_flush = time.time() - server_url = urlparse(server_uri) - self.__http_service = _HttpServices(server_url._replace(path='/sync_server').geturl(), appid, timeout) - self.__http_service.compress = compress - - def add(self, msg): - self._batchlock.acquire() - try: - self.__message_channel.append(msg) - finally: - self._batchlock.release() - if len(self.__message_channel) >= self.__batch \ - or len(self.__cache_buffer) > 0: - self.flush_once() - - def flush(self, throw_exception=True): - while len(self.__cache_buffer) > 0 or len(self.__message_channel) > 0: - try: - self.flush_once(throw_exception) - except TGAIllegalDataException: - continue - - def flush_once(self, throw_exception=True): - if len(self.__message_channel) == 0 and len(self.__cache_buffer) == 0: - return - - self._cachelock.acquire() - self._batchlock.acquire() - try: - try: - if len(self.__message_channel) == 0 and len(self.__cache_buffer) == 0: - return - if len(self.__cache_buffer) == 0 or len(self.__message_channel) >= self.__batch: - self.__cache_buffer.append(self.__message_channel) - self.__message_channel = [] - finally: - self._batchlock.release() - msg = self.__cache_buffer[0] - self.__http_service.send('[' + ','.join(msg) + ']', str(len(msg))) - self.__last_flush = time.time() - self.__cache_buffer = self.__cache_buffer[1:] - except TGANetworkException as e: - if throw_exception: - raise e - except TGAIllegalDataException as e: - self.__cache_buffer = self.__cache_buffer[1:] - if throw_exception: - raise e - finally: - self._cachelock.release() - - def close(self): - self.flush() - - pass - - -class AsyncBatchConsumer(object): - """异步、批量地向 TA 服务器发送数据的 - - AsyncBatchConsumer 使用独立的线程进行数据发送,当满足以下两个条件之一时触发数据上报: - 1. 数据条数大于预定义的最大值, 默认为 20 条 - 2. 数据发送间隔超过预定义的最大时间, 默认为 3 秒 - """ - - def __init__(self, server_uri, appid, interval=3, flush_size=20, queue_size=100000): - """创建 AsyncBatchConsumer - - Args: - server_uri: 服务器的 URL 地址 - appid: 项目的 APP ID - interval: 推送数据的最大时间间隔, 单位为秒, 默认为 3 秒 - flush_size: 队列缓存的阈值,超过此值将立即进行发送 - queue_size: 缓存队列的大小 - """ - server_url = urlparse(server_uri) - self.__http_service = _HttpServices(server_url._replace(path='/sync_server').geturl(), appid, 30000) - self.__batch = flush_size - self.__queue = queue.Queue(queue_size) - - # 初始化发送线程 - self.__flushing_thread = self._AsyncFlushThread(self, interval) - self.__flushing_thread.daemon = True - self.__flushing_thread.start() - - def add(self, msg): - try: - self.__queue.put_nowait(msg) - except queue.Full as e: - raise TGANetworkException(e) - - if self.__queue.qsize() > self.__batch: - self.flush() - - def flush(self): - self.__flushing_thread.flush() - - def close(self): - self.__flushing_thread.stop() - while not self.__queue.empty(): - self._perform_request() - - def _perform_request(self): - """同步的发送数据 - - 仅用于内部调用, 用户不应当调用此方法. - """ - flush_buffer = [] - while len(flush_buffer) < self.__batch: - try: - flush_buffer.append(str(self.__queue.get_nowait())) - except queue.Empty: - break - - if len(flush_buffer) > 0: - for i in range(3): # 网络异常情况下重试 3 次 - try: - self.__http_service.send('[' + ','.join(flush_buffer) + ']', str(len(flush_buffer))) - return True - except TGANetworkException: - pass - except TGAIllegalDataException: - break - - class _AsyncFlushThread(threading.Thread): - def __init__(self, consumer, interval): - threading.Thread.__init__(self) - self._consumer = consumer - self._interval = interval - - self._stop_event = threading.Event() - self._finished_event = threading.Event() - self._flush_event = threading.Event() - - def flush(self): - self._flush_event.set() - - def stop(self): - """停止线程 - - 退出时需调用此方法,以保证线程安全结束. - """ - self._stop_event.set() - self._finished_event.wait() - - def run(self): - while True: - # 如果 _flush_event 标志位为 True,或者等待超过 _interval 则继续执行 - self._flush_event.wait(self._interval) - self._consumer._perform_request() - self._flush_event.clear() - - # 发现 stop 标志位时安全退出 - if self._stop_event.isSet(): - break - self._finished_event.set() - - -def _gzip_string(data): - try: - return gzip.compress(data) - except AttributeError: - import StringIO - buf = StringIO.StringIO() - fd = gzip.GzipFile(fileobj=buf, mode="w") - fd.write(data) - fd.close() - return buf.getvalue() - - -class _HttpServices(object): - """内部类,用于发送网络请求 - - 指定接收端地址和项目 APP ID, 实现向接收端上传数据的接口. 发送前将数据默认使用 Gzip 压缩, - """ - - def __init__(self, server_uri, appid, timeout=30000): - self.url = server_uri - self.appid = appid - self.timeout = timeout - self.compress = True - - def send(self, data, length): - """使用 Requests 发送数据给服务器 - - Args: - data: 待发送的数据 - length - - Raises: - TGAIllegalDataException: 数据错误 - TGANetworkException: 网络错误 - """ - headers = {'appid': self.appid, 'TA-Integration-Type': 'python-sdk', 'TA-Integration-Version': __version__, - 'TA-Integration-Count': length} - try: - compress_type = 'gzip' - if self.compress: - data = _gzip_string(data.encode("utf-8")) - else: - compress_type = 'none' - data = data.encode("utf-8") - headers['compress'] = compress_type - response = requests.post(self.url, data=data, headers=headers, timeout=self.timeout) - if response.status_code == 200: - responseData = json.loads(response.text) - if responseData["code"] == 0: - return True - else: - raise TGAIllegalDataException("Unexpected result code: " + str(responseData["code"])) - else: - raise TGANetworkException("Unexpected Http status code " + str(response.status_code)) - except ConnectionError as e: - time.sleep(0.5) - raise TGANetworkException("Data transmission failed due to " + repr(e)) - - -class DebugConsumer(object): - """逐条、同步的发送数据给接收服务器 - - 服务端会对数据进行严格校验,当某个属性不符合规范时,整条数据都不会入库. 当数据格式错误时抛出包含详细原因的异常信息. - 建议首先使用此 Consumer 来调试埋点数据. - """ - - def __init__(self, server_uri, appid, timeout=30000, write_data=True): - """创建 DebugConsumer - - Args: - server_uri: 服务器的 URL 地址 - appid: 项目的 APP ID - timeout: 请求的超时时间, 单位毫秒, 默认为 30000 ms - """ - server_url = urlparse(server_uri) - debug_url = server_url._replace(path='/data_debug') - self.__server_uri = debug_url.geturl() - self.__appid = appid - self.__timeout = timeout - self.__writer_data = write_data - - def add(self, msg): - try: - dry_run = 0 - if not self.__writer_data: - dry_run = 1 - response = requests.post(self.__server_uri, - data={'source': 'server', 'appid': self.__appid, 'data': msg, 'dryRun': dry_run}, - timeout=self.__timeout) - if response.status_code == 200: - responseData = json.loads(response.text) - if responseData["errorLevel"] == 0: - return True - else: - print("Unexpected result : \n %s" % response.text) - else: - raise TGANetworkException("Unexpected http status code: " + str(response.status_code)) - except ConnectionError as e: - time.sleep(0.5) - raise TGANetworkException("Data transmission failed due to " + repr(e)) - - def flush(self, throw_exception=True): - pass - - def close(self): - pass - - -class ToKafka(object): - """ - 将数据发送到kafka - 注意 减少不必要的查询 分区固定设置16个 - """ - - def __init__(self, conf): - self.__topic_name = None - self.__producer = KafkaProducer(**conf) - - @property - def topic_name(self): - return self.__topic_name - - @topic_name.setter - def topic_name(self, topic_name): - self.__topic_name = topic_name - # self.__producer.partitions_for(topic_name) - - def add(self, msg): - try: - self.__producer.send(self.__topic_name, msg, partition=random.randint(0, 15)) - except Exception as e: - print(e) - - def flush(self, throw_exception=True): - pass - - def close(self): - pass