wu hao 2021-04-02 01:10:23 +08:00
parent 6e677eda58
commit ed2bdd1dea
20 changed files with 285 additions and 1300 deletions

common/__init__.py

@ -0,0 +1 @@
from .verification import *

common/verification.py

@ -0,0 +1,23 @@
import hashlib

from settings import settings

__all__ = ('restore_field', 'sort_kv', 'check_sign')


def restore_field(data: dict) -> dict:
    """Map the short wire codes (x01, a01, ...) back to their full field names via settings.FIELD_MAP."""
    res = dict()
    for k, v in data.items():
        res[settings.FIELD_MAP.get(k) or k] = v
    return res


def sort_kv(*args: dict):
    """Concatenate every key/value pair of every dict, in insertion order."""
    return ''.join(map(lambda item: ''.join(map(lambda x: f'{x[0]}{x[1]}', item.items())), args))


def check_sign(sign: str, *args: dict):
    """Return True if sign equals md5(sort_kv(*args) + SALT)."""
    s = sort_kv(*args) + settings.SALT
    if hashlib.md5(s.encode()).hexdigest() != sign:
        return False
    return True
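For reference, a minimal signing sketch that mirrors sort_kv and check_sign above (the payload values are hypothetical; the salt is the one defined in settings later in this commit). Note that the digest depends on the dicts' key order, so client and server must build them identically:

import hashlib

SALT = '0r4X00mH'  # settings.Config.SALT in this commit

def make_sign(*parts: dict) -> str:
    # mirror sort_kv(): concatenate every key/value pair in insertion order,
    # append the shared salt, and take the md5 hex digest
    joined = ''.join(f'{k}{v}' for part in parts for k, v in part.items())
    return hashlib.md5((joined + SALT).encode()).hexdigest()

# check_sign() runs after restore_field(), so the signature covers the long
# field names, not the x01/a01 wire codes (hypothetical values below)
public = {'app_id': 'demo'}
item = {'event_name': 'login', 'account_id': 'TA10001'}
sign = make_sign(public, item)  # check_sign(sign, public, item) -> True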

View File

@ -1,2 +1,8 @@
-from .handler_user import HandlerUser
-from .handler_event import HandlerEvent
+from .ta_handler import TaHandler
+def data_factory(who):
+    f = {
+        'ta': TaHandler
+    }
+    return f.get(who)
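As a quick usage note (a sketch, not part of the commit): the factory returns the handler class itself, whose handler_link holds the coroutines registered by the decorator further down.

from handler_data import data_factory

handler_cls = data_factory('ta')      # -> TaHandler; unknown keys return None
print(len(handler_cls.handler_link))  # coroutines registered via @TaHandler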

View File

@ -1,7 +0,0 @@
class HandlerEvent:
handler_link = []
def __init__(self, func):
HandlerEvent.handler_link.append(func)

View File

@ -1,20 +0,0 @@
class HandlerUser:
handler_link = []
def __init__(self, func):
HandlerUser.handler_link.append(func)
@HandlerUser
async def device_label(rdb, item):
"""
Flag new devices.
:param rdb:
:param item:
:return:
"""
v = await rdb.execute('sadd', f'{item.game}.devices', item.properties.get('#device_id', ''))
if v:
item.properties['is_new_device'] = 1
else:
item.properties['is_new_device'] = 0

View File

@ -0,0 +1,9 @@
class LeGuHandler:
    handler_link = []

    def __init__(self, func):
        LeGuHandler.handler_link.append(func)

    @staticmethod
    def format_data(data: dict):
        pass

View File

@ -0,0 +1,49 @@
from settings import settings


class TaHandler:
    handler_link = []

    def __init__(self, func):
        TaHandler.handler_link.append(func)

    @staticmethod
    def format_data(data: dict):
        """Move the TA top-level keys (settings.TA_OUTER) out of the flat dict; the rest becomes properties."""
        msg = dict()
        for k in settings.TA_OUTER:
            v = data.get(k)
            if v:
                msg[k] = data.pop(k)
        msg['properties'] = data
        return msg


@TaHandler
async def add_ip(request, rdb, data):
    """
    Attach the source IP when the payload does not carry one.
    :param request:
    :param rdb:
    :param data:
    :return:
    """
    ip = request.client.host
    data['ip'] = data.get('ip') or ip


@TaHandler
async def device_label(request, rdb, data):
    """
    Flag new devices.
    :param request:
    :param rdb:
    :param data:
    :return:
    """
    # only for user_add events
    if data.get('type') == 'user_add':
        # data is a plain dict here, so read and write keys rather than attributes
        v = await rdb.execute('sadd', f'{data.get("game", "")}.devices', data.get('#device_id', ''))
        data['is_new_device'] = 1 if v else 0
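A short illustration of what format_data returns ('#type' and '#account_id' are among the keys listed in settings.TA_OUTER below; 'lv' is a made-up custom property):

from handler_data import TaHandler

data = {'#type': 'track', '#account_id': 'TA10001', 'lv': 60}
msg = TaHandler.format_data(data)
# msg == {'#type': 'track', '#account_id': 'TA10001', 'properties': {'lv': 60}}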

main.py

@ -3,10 +3,11 @@ from aioredis import create_redis_pool
 from fastapi import FastAPI
 from fastapi.middleware.cors import CORSMiddleware
-from routers import point, user, event
+from output import output_factory
+from handler_data import data_factory
+from routers import point
 from settings import settings
-from utils.ta_sdk import TGAnalytics, ToKafka
 app = FastAPI()
@ -18,7 +19,8 @@ app.add_middleware(
     allow_headers=["*"],
 )
-def register_redis(app: FastAPI) -> None:
+def register_redis(app: FastAPI = app) -> None:
     @app.on_event('startup')
     async def startup_event():
         app.state.redis = await create_redis_pool(**settings.REDIS_CONF)
@ -29,17 +31,23 @@ def register_redis(app: FastAPI) -> None:
         await app.state.redis.wait_closed()
-def register_ta(app: FastAPI) -> None:
+def register_output(app: FastAPI = app) -> None:
     @app.on_event('startup')
     def startup_event():
-        app.state.ta = TGAnalytics(ToKafka(settings.KAFKA_CONF))
+        app.state.output_factory = output_factory('kafka')
+def register_handler_data(app: FastAPI = app) -> None:
+    @app.on_event('startup')
+    def startup_event():
+        app.state.data_factory = data_factory('ta')
 app.include_router(point.router, prefix='/v1')
-app.include_router(user.router, prefix='/v1')
-app.include_router(event.router, prefix='/v1')
-register_redis(app)
-register_ta(app)
+register_redis()
+register_output()
+register_handler_data()
 if __name__ == '__main__':
     uvicorn.run(app='main:app', host="0.0.0.0", port=6666, reload=True, debug=True)
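One way to sanity-check this startup wiring (a sketch, not part of the commit; it needs the Redis and Kafka hosts from settings to be reachable) is FastAPI's TestClient, which fires the startup events when used as a context manager:

from fastapi.testclient import TestClient
from main import app

with TestClient(app):                            # runs startup, then shutdown on exit
    assert app.state.redis is not None           # aioredis pool from REDIS_CONF
    assert app.state.output_factory is not None  # ToKafka built by output_factory('kafka')
    assert app.state.data_factory is not None    # TaHandler returned by data_factory('ta')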

View File

@ -1,2 +1 @@
-from .user import *
-from .event import *
+from .base import DataModel

View File

@ -1,107 +1,10 @@
-import hashlib
-from enum import Enum
+from typing import List, Dict
 from pydantic import Field
-from pydantic import BaseModel, validator
+from pydantic import BaseModel
-from settings import settings
-from ipaddress import IPv4Address
FIELD_MAP = {
'user_id': 'x01',
'account_id': 'x02',
'distinct_id': 'x03',
'event_name': 'x04',
'server_time':'x05',
'ip': 'a01',
'country': 'a02',
'country_code': 'a03',
'province': 'a04',
'city': 'a05',
'os_version': 'a06',
'manufacturer': 'a07',
'os': 'a08',
'device_id': 'a09',
'screen_height': 'a10',
'screen_width': 'a11',
'device_model': 'a12',
'app_version': 'a13',
'bundle_id': 'a14',
'lib': 'a15',
'lib_version': 'a16',
'network_type': 'a17',
'carrier': 'a18',
'browser': 'a19',
'browser_version': 'a20',
'duration': 'a21',
'url': 'a22',
'url_path': 'a23',
'referrer': 'a24',
'referrer_host': 'a25',
'title': 'a26',
'screen_name': 'a27',
'element_id': 'a28',
'element_type': 'a29',
'resume_from_background': 'a30',
'element_selector': 'a31',
'element_position': 'a32',
'element_content': 'a33',
'scene': 'a34',
'mp_platform': 'a35',
'app_crashed_reason': 'a36',
'zone_offset': 'a37',
'app_id':'b01',
'event_time':'b06'
}
-def to_alias(k: str) -> str:
-    return FIELD_MAP.get(k) or k
+class DataModel(BaseModel):
+    public: Dict = Field(..., title='public properties', description='')
+    data: List = Field(..., title='data items', description='')
class IP4(str):
@classmethod
def __get_validators__(cls):
yield cls.validate
@classmethod
def validate(cls, v):
IPv4Address(v)
return str(v)
class ActEnum(str, Enum):
track = 'track'
user_set = 'user_set'
user_setOnce = 'user_setOnce'
user_add = 'user_add'
user_unset = 'user_unset'
user_append = 'user_append'
user_del = 'user_del'
class Base(BaseModel):
# sign = md5(game+act+ts+salt)
game: str = Field(..., title='游戏代号')
act: ActEnum = Field(..., title='操作', description='同ta一致')
preset: BaseModel
properties: dict = Field(..., title='自定义属性')
ts: int = Field(..., title='时间戳')
sign: str = Field(..., title='签名')
@validator('sign')
def sign_validator(cls, v: str, values: dict):
s = f'{values.get("game", "")}{values.get("act", "")}{values.get("ts", "")}{settings.SALT}'
if hashlib.md5(s.encode()).hexdigest() == v:
return v
raise ValueError(f'sign {hashlib.md5(s.encode()).hexdigest()}')
def dict(self, **kwargs):
kwargs.setdefault('exclude', {'preset', 'account_id', 'distinct_id', 'event_name'})
self.properties.update(self.preset.dict(**kwargs))
return super().dict(**kwargs)

View File

@ -1,70 +0,0 @@
from datetime import datetime
from pydantic import BaseModel, Field
from .base import Base, IP4, to_alias
__all__ = ('EventModel',)
class Preset(BaseModel):
ip: IP4 = Field(None, title='ipv4',description='不传该字段默认使用源ip')
country: str = Field(None, title='国家',description='')
country_code: str = Field(None, title='国家代码',description='')
province: str = Field(None, title='省份',description='')
city: str = Field(None, title='城市',description='')
os_version: str = Field(None, title='操作系统版本',description='')
manufacturer: str = Field(None, title='设备制造商',description='')
os: str = Field(None, title='操作系统',description='')
device_id: str = Field(None, title='设备 ID',description='')
screen_height: int = Field(None, title='屏幕高度',description='')
screen_width: int = Field(None, title='屏幕宽度',description='')
device_model: str = Field(None, title='设备型号',description='')
app_version: str = Field(None, title='APP 版本',description='')
bundle_id: str = Field(None, title='APP包名',description='')
lib: str = Field(None, title='SDK 类型',description='')
lib_version: str = Field(None, title='SDK 版本',description='')
network_type: str = Field(None, title='网络状态',description='')
carrier: str = Field(None, title='网络运营商',description='')
browser: str = Field(None, title='浏览器类型',description='')
browser_version: str = Field(None, title='浏览器版本',description='')
duration: int = Field(None, title='事件时长',description='')
url: str = Field(None, title='页面地址',description='')
url_path: str = Field(None, title='页面路径',description='')
referrer: str = Field(None, title='前向地址',description='')
referrer_host: str = Field(None, title='前向路径',description='')
title: str = Field(None, title='页面标题',description='')
screen_name: str = Field(None, title='页面名称',description='')
element_id: str = Field(None, title='元素 ID',description='')
element_type: str = Field(None, title='元素类型',description='')
resume_from_background: str = Field(None, title='是否从后台唤醒',description='')
element_selector: str = Field(None, title='元素选择器',description='')
element_position: str = Field(None, title='元素位置',description='')
element_content: str = Field(None, title='元素内容',description='')
scene: str = Field(None, title='场景值',description='')
mp_platform: str = Field(None, title='小程序平台',description='')
app_crashed_reason: str = Field(None, title='异常信息',description='')
zone_offset: str = Field(None, title='时区偏移',description='')
user_id: str = Field(..., title='用户唯一 ID',description='')
account_id: str = Field(..., title='账户 ID', description='')
distinct_id: str = Field(..., title='访客 ID',description='')
event_name: str = Field(..., title='事件名称',description='')
# 事件
app_id: str = Field(None, description='')
event_time: datetime = Field(None,title='事件时间', description='')
server_time: datetime = Field(None,title='服务端时间', description='')
def dict(self, **kwargs):
res = super().dict(**kwargs)
return {'#' + k: v for k, v in res.items() if v is not None}
class Config:
alias_generator = to_alias
class EventModel(Base):
preset: Preset = Field(..., title='系统属性')

View File

@ -1,66 +0,0 @@
from datetime import datetime
from pydantic import BaseModel, Field
from .base import Base, IP4, to_alias
__all__ = ('UserModel',)
class Preset(BaseModel):
ip: IP4 = Field(None, title='ipv4', description='不传该字段默认使用源ip')
country: str = Field(None, title='国家', description='')
country_code: str = Field(None, title='国家代码', description='')
province: str = Field(None, title='省份', description='')
city: str = Field(None, title='城市', description='')
os_version: str = Field(None, title='操作系统版本', description='')
manufacturer: str = Field(None, title='设备制造商', description='')
os: str = Field(None, title='操作系统', description='')
device_id: str = Field(None, title='设备 ID', description='')
screen_height: int = Field(None, title='屏幕高度', description='')
screen_width: int = Field(None, title='屏幕宽度', description='')
device_model: str = Field(None, title='设备型号', description='')
app_version: str = Field(None, title='APP 版本', description='')
bundle_id: str = Field(None, title='APP包名', description='')
lib: str = Field(None, title='SDK 类型', description='')
lib_version: str = Field(None, title='SDK 版本', description='')
network_type: str = Field(None, title='网络状态', description='')
carrier: str = Field(None, title='网络运营商', description='')
browser: str = Field(None, title='浏览器类型', description='')
browser_version: str = Field(None, title='浏览器版本', description='')
duration: int = Field(None, title='事件时长', description='')
url: str = Field(None, title='页面地址', description='')
url_path: str = Field(None, title='页面路径', description='')
referrer: str = Field(None, title='前向地址', description='')
referrer_host: str = Field(None, title='前向路径', description='')
title: str = Field(None, title='页面标题', description='')
screen_name: str = Field(None, title='页面名称', description='')
element_id: str = Field(None, title='元素 ID', description='')
element_type: str = Field(None, title='元素类型', description='')
resume_from_background: str = Field(None, title='是否从后台唤醒', description='')
element_selector: str = Field(None, title='元素选择器', description='')
element_position: str = Field(None, title='元素位置', description='')
element_content: str = Field(None, title='元素内容', description='')
scene: str = Field(None, title='场景值', description='')
mp_platform: str = Field(None, title='小程序平台', description='')
app_crashed_reason: str = Field(None, title='异常信息', description='')
zone_offset: str = Field(None, title='时区偏移', description='')
user_id: str = Field(..., title='用户唯一 ID', description='')
account_id: str = Field(..., title='账户 ID', description='')
distinct_id: str = Field(..., title='访客 ID', description='')
# event_name: str = Field(..., title='事件名称',description='')
# 用户
server_time: datetime = Field(None, title='服务端时间', description='')
def dict(self, **kwargs):
res = super().dict(**kwargs)
return {'#' + k: v for k, v in res.items() if v is not None}
class Config:
alias_generator = to_alias
class UserModel(Base):
preset: Preset = Field(..., title='系统属性')

output/__init__.py

@ -0,0 +1,9 @@
from .kafka_p import *
from settings import settings


def output_factory(who):
    # each known backend is constructed here; unknown keys fall through to None
    f = {
        'kafka': ToKafka(settings.KAFKA_CONF)
    }
    return f.get(who)

output/base.py

@ -0,0 +1,7 @@
import abc


class BaseOutput(metaclass=abc.ABCMeta):
    @abc.abstractmethod
    def send(self, msg: dict):
        pass
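A minimal sketch of an alternative sink that satisfies this contract (the class name is hypothetical; it is handy for local testing without Kafka):

import json
from output.base import BaseOutput

class ToStdout(BaseOutput):
    """Debug output: print each message instead of producing to Kafka."""
    def send(self, msg: dict):
        print(json.dumps(msg, ensure_ascii=False))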

output/kafka_p.py

@ -0,0 +1,34 @@
import random

from kafka import KafkaProducer

from .base import BaseOutput

__all__ = ('ToKafka',)


class ToKafka(BaseOutput):
    """
    Send data to Kafka.
    Note: to avoid unnecessary lookups, the partition count is fixed at 16.
    """

    def __init__(self, conf):
        self.__topic_name = None
        self.__producer = KafkaProducer(**conf)
        self.partition = 15

    @property
    def topic_name(self):
        return self.__topic_name

    @topic_name.setter
    def topic_name(self, topic_name):
        self.__topic_name = topic_name
        # self.__producer.partitions_for(topic_name)

    def send(self, msg):
        # msg="""{"#type": "user_set", "#ip": "192.168.1.1", "#time": "2021-04-01 17:54:28.084", "properties": {"herostate": [["fffgsa", 2, 3, 4, 5], ["ddd", 4, 5, 6, 8]], "lv": 60, "#user_id": "fvdsvgdsf"}, "#distinct_id": "ABCDEF123456", "#account_id": "TA10001"}"""
        try:
            self.__producer.send(self.__topic_name, msg, partition=random.randint(0, self.partition))
        except Exception as e:
            print(e)
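A hedged usage sketch (the brokers in settings.KAFKA_CONF must be reachable): the producer is built once, but topic_name defaults to None, so a caller has to set it (for example per game) before send(), which then routes the message to a random partition 0-15 and serializes it with the configured value_serializer:

from output import output_factory

out = output_factory('kafka')   # ToKafka built from settings.KAFKA_CONF
out.topic_name = 'demo_game'    # hypothetical topic; must be set before send()
out.send({'#type': 'track', 'properties': {'lv': 60}})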

View File

@ -1,23 +0,0 @@
import asyncio
from fastapi import APIRouter, Request
from handler_data import HandlerEvent
router = APIRouter()
from models import EventModel
@router.post("/event/")
async def event(request: Request, item: EventModel):
item.preset.ip = item.preset.ip or request.client.host
ta = getattr(request.app.state.ta, item.act)
# send each game to its own topic_name
request.app.state.ta.consumer.topic_name = item.game
rdb = request.app.state.redis
await asyncio.gather(*map(lambda o: asyncio.create_task(o(rdb, item)), HandlerEvent.handler_link))
properties = item.dict()['properties']
ta(item.preset.distinct_id, item.preset.account_id, item.preset.event_name, properties)
results = {"code": 0, 'msg': 'ok'}
return results

View File

@ -1,58 +1,34 @@
 import asyncio
-import hashlib
+import json
 from fastapi import APIRouter, Request
-from pydantic import BaseModel, validator
-from handler_data import HandlerUser, HandlerEvent
-from settings import settings
+from common import *
+from models import DataModel
 router = APIRouter()
-class Item(BaseModel):
-    # sign = md5(game+act+ts+salt)
-    distinct_id: str
-    game: str
-    account_id: str
-    act: str
-    event_name: str = None
-    properties: dict
-    ts: int
-    sign: str
-    @validator('sign')
-    def sign_validator(cls, v: str, values: dict):
-        s = f'{values.get("game")}{values.get("act", "")}{values.get("ts", "")}{settings.SALT}'
-        if hashlib.md5(s.encode()).hexdigest() == v:
-            return v
-        raise ValueError(f'sign {hashlib.md5(s.encode()).hexdigest()}')
 @router.post("/point/")
-async def point(request: Request, item: Item):
-    ip = request.client.host
-    ta = getattr(request.app.state.ta, item.act)
-    # send each game to its own topic_name
-    request.app.state.ta.consumer.topic_name = item.game
+async def point(request: Request, items: DataModel):
+    # restore field names and assemble the data
+    data_list = []
+    public_data = restore_field(items.public)
     rdb = request.app.state.redis
-    if ta and item.event_name and item.act == 'track':
-        await asyncio.gather(*map(lambda o: asyncio.create_task(o(rdb, item)), HandlerEvent.handler_link))
-        await track(ta, item)
-    else:
-        await asyncio.gather(*map(lambda o: asyncio.create_task(o(rdb, item)), HandlerUser.handler_link))
-        await user_set(ta, item)
-    results = {"code": 0, 'msg': 'ok', 'ip': ip}
-    return results
-async def track(ta, item):
-    ta(item.distinct_id, item.account_id, item.event_name, item.properties)
-async def user_set(ta, item):
-    ta(item.distinct_id, item.account_id, item.properties)
+    data_factory = request.app.state.data_factory
+    output_factory = request.app.state.output_factory
+    for item in items.data:
+        data = restore_field(item)
+        sign = data.pop('sign')
+        # verify the signature
+        if not check_sign(sign, public_data, data):
+            return {"code": -1, 'msg': 'invalid signature'}
+        data.update(public_data)
+        data_list.append(data)
+    for item in data_list:
+        await asyncio.gather(*map(lambda o: asyncio.create_task(o(request, rdb, item)), data_factory.handler_link))
+        msg = data_factory.format_data(item)
+        output_factory.send(msg)
+    return {"code": 0, 'msg': 'ok'}

View File

@ -1,22 +0,0 @@
import asyncio
from fastapi import APIRouter, Request
from handler_data import HandlerUser
router = APIRouter()
from models import UserModel
@router.post("/user/")
async def user(request: Request, item: UserModel):
item.preset.ip = item.preset.ip or request.client.host
ta = getattr(request.app.state.ta, item.act)
# send each game to its own topic_name
request.app.state.ta.consumer.topic_name = item.game
rdb = request.app.state.redis
await asyncio.gather(*map(lambda o: asyncio.create_task(o(rdb, item)), HandlerUser.handler_link))
properties = item.dict()['properties']
ta(item.preset.distinct_id, item.preset.account_id, properties)
results = {"code": 0, 'msg': 'ok'}
return results

View File

@ -1,4 +1,103 @@
import json
class Config:
FIELD_MAP = {
"x01": "user_id",
"x02": "account_id",
"x03": "distinct_id",
"x04": "event_name",
"x05": "server_time",
"a01": "ip",
"a02": "country",
"a03": "country_code",
"a04": "province",
"a05": "city",
"a06": "os_version",
"a07": "manufacturer",
"a08": "os",
"a09": "device_id",
"a10": "screen_height",
"a11": "screen_width",
"a12": "device_model",
"a13": "app_version",
"a14": "bundle_id",
"a15": "lib",
"a16": "lib_version",
"a17": "network_type",
"a18": "carrier",
"a19": "browser",
"a20": "browser_version",
"a21": "duration",
"a22": "url",
"a23": "url_path",
"a24": "referrer",
"a25": "referrer_host",
"a26": "title",
"a27": "screen_name",
"a28": "element_id",
"a29": "element_type",
"a30": "resume_from_background",
"a31": "element_selector",
"a32": "element_position",
"a33": "element_content",
"a34": "scene",
"a35": "mp_platform",
"a36": "app_crashed_reason",
"a37": "zone_offset",
"b01": "app_id",
"b06": "event_time"
}
TA_MAP = {
"user_id": "#user_id",
"account_id": "#account_id",
"distinct_id": "#distinct_id",
"event_name": "#event_name",
"server_time": "#server_time",
"ip": "#ip",
"country": "#country",
"country_code": "#country_code",
"province": "#province",
"city": "#city",
"os_version": "#os_version",
"manufacturer": "#manufacturer",
"os": "#os",
"device_id": "#device_id",
"screen_height": "#screen_height",
"screen_width": "#screen_width",
"device_model": "#device_model",
"app_version": "#app_version",
"bundle_id": "#bundle_id",
"lib": "#lib",
"lib_version": "#lib_version",
"network_type": "#network_type",
"carrier": "#carrier",
"browser": "#browser",
"browser_version": "#browser_version",
"duration": "#duration",
"url": "#url",
"url_path": "#url_path",
"referrer": "#referrer",
"referrer_host": "#referrer_host",
"title": "#title",
"screen_name": "#screen_name",
"element_id": "#element_id",
"element_type": "#element_type",
"resume_from_background": "#resume_from_background",
"element_selector": "#element_selector",
"element_position": "#element_position",
"element_content": "#element_content",
"scene": "#scene",
"mp_platform": "#mp_platform",
"app_crashed_reason": "#app_crashed_reason",
"zone_offset": "#zone_offset",
"app_id": "#app_id",
"event_time": "#event_time"
}
TA_OUTER = {'#time', '#ip', '#type', '#distinct_id', '#account_id', 'event_name'}
    REDIS_CONF = {
        'address': ('192.168.0.161', 6379),
        'password': 'd1Gh*zp5',
@ -7,7 +106,7 @@ class Config:
    SALT = '0r4X00mH'
    KAFKA_CONF = {
        'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"],
-        'value_serializer': lambda v: v.encode('utf-8'),
+        'value_serializer': lambda v: json.dumps(v).encode('utf-8'),
    }
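For reference, FIELD_MAP is what drives restore_field() in common/verification.py; a quick sketch of the decoding (unknown keys pass through unchanged):

from common import restore_field

restore_field({'x01': 'u-1', 'a01': '1.2.3.4', 'custom_lv': 60})
# -> {'user_id': 'u-1', 'ip': '1.2.3.4', 'custom_lv': 60}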

View File

@ -1,930 +0,0 @@
# encoding:utf-8
from __future__ import unicode_literals
import datetime
import gzip
import json
import os
import random
import re
import threading
import time
import uuid
import requests
from kafka import KafkaProducer
from requests import ConnectionError
try:
import queue
from urllib.parse import urlparse
except ImportError:
import Queue as queue
from urlparse import urlparse
try:
isinstance("", basestring)
def is_str(s):
return isinstance(s, basestring)
except NameError:
def is_str(s):
return isinstance(s, str)
try:
isinstance(1, long)
def is_int(n):
return isinstance(n, int) or isinstance(n, long)
except NameError:
def is_int(n):
return isinstance(n, int)
try:
from enum import Enum
ROTATE_MODE = Enum('ROTATE_MODE', ('DAILY', 'HOURLY'))
except ImportError:
class ROTATE_MODE(object):
DAILY = 0
HOURLY = 1
class TGAException(Exception):
pass
class TGAIllegalDataException(TGAException):
"""数据格式异常
在发送的数据格式有误时SDK 会抛出此异常用户应当捕获并处理.
"""
pass
class TGANetworkException(TGAException):
"""网络异常
在因为网络或者不可预知的问题导致数据无法发送时SDK会抛出此异常用户应当捕获并处理.
"""
pass
__version__ = '1.6.0'
class TGAnalytics(object):
"""TGAnalytics 实例是发送事件数据和用户属性数据的关键实例
"""
__NAME_PATTERN = re.compile(r"^(#[a-z][a-z0-9_]{0,49})|([a-z][a-z0-9_]{0,50})$", re.I)
def __init__(self, consumer, enable_uuid=False):
"""创建一个 TGAnalytics 实例
TGAanlytics 需要与指定的 Consumer 一起使用可以使用以下任何一种:
- LoggingConsumer: 批量实时写本地文件并与 LogBus 搭配
- BatchConsumer: 批量实时地向TA服务器传输数据同步阻塞不需要搭配传输工具
- AsyncBatchConsumer: 批量实时地向TA服务器传输数据异步非阻塞不需要搭配传输工具
- DebugConsumer: 逐条发送数据并对数据格式做严格校验
Args:
consumer: 指定的 Consumer
"""
self.__consumer = consumer
self.__enableUuid = enable_uuid
self.__super_properties = {}
self.clear_super_properties()
@property
def consumer(self):
"""
用了更换 kafka topic_name
:return:
"""
return self.__consumer
def user_set(self, distinct_id=None, account_id=None, properties=None):
"""设置用户属性
对于一般的用户属性您可以调用 user_set 来进行设置使用该接口上传的属性将会覆盖原有的属性值如果之前不存在该用户属性
则会新建该用户属性类型与传入属性的类型一致.
Args:
distinct_id: 访客 ID
account_id: 账户 ID
properties: dict 类型的用户属性
"""
self.__add(distinct_id=distinct_id, account_id=account_id, send_type='user_set', properties_add=properties)
def user_unset(self, distinct_id=None, account_id=None, properties=None):
"""
删除某个用户的用户属性
:param distinct_id:
:param account_id:
:param properties:
"""
if isinstance(properties, list):
properties = dict((key, 0) for key in properties)
self.__add(distinct_id=distinct_id, account_id=account_id, send_type='user_unset', properties_add=properties)
def user_setOnce(self, distinct_id=None, account_id=None, properties=None):
"""设置用户属性, 不覆盖已存在的用户属性
如果您要上传的用户属性只要设置一次则可以调用 user_setOnce 来进行设置当该属性之前已经有值的时候将会忽略这条信息.
Args:
distinct_id: 访客 ID
account_id: 账户 ID
properties: dict 类型的用户属性
"""
self.__add(distinct_id=distinct_id, account_id=account_id, send_type='user_setOnce', properties_add=properties)
def user_add(self, distinct_id=None, account_id=None, properties=None):
"""对指定的数值类型的用户属性进行累加操作
当您要上传数值型的属性时您可以调用 user_add 来对该属性进行累加操作. 如果该属性还未被设置则会赋值0后再进行计算.
可传入负值等同于相减操作.
Args:
distinct_id: 访客 ID
account_id: 账户 ID
properties: 数值类型的用户属性
"""
self.__add(distinct_id=distinct_id, account_id=account_id, send_type='user_add', properties_add=properties)
def user_append(self, distinct_id=None, account_id=None, properties=None):
"""追加一个用户的某一个或者多个集合类型
Args:
distinct_id: 访客 ID
account_id: 账户 ID
properties: 集合
"""
self.__add(distinct_id=distinct_id, account_id=account_id, send_type='user_append', properties_add=properties)
def user_del(self, distinct_id=None, account_id=None):
"""删除用户
如果您要删除某个用户可以调用 user_del 将该用户删除调用此函数后将无法再查询该用户的用户属性, 但该用户产生的事件仍然可以被查询到.
Args:
distinct_id: 访客 ID
account_id: 账户 ID
"""
self.__add(distinct_id=distinct_id, account_id=account_id, send_type='user_del')
def track(self, distinct_id=None, account_id=None, event_name=None, properties=None):
"""发送事件数据
您可以调用 track 来上传事件建议您根据先前梳理的文档来设置事件的属性以及发送信息的条件. 事件的名称只能以字母开头可包含数字字母和下划线_
长度最大为 50 个字符对字母大小写不敏感. 事件的属性是一个 dict 对象其中每个元素代表一个属性.
Args:
distinct_id: 访客 ID
account_id: 账户 ID
event_name: 事件名称
properties: 事件属性
Raises:
TGAIllegalDataException: 数据格式错误时会抛出此异常
"""
all_properties = self._public_track_add(event_name)
if properties:
all_properties.update(properties)
self.__add(distinct_id=distinct_id, account_id=account_id, send_type='track', event_name=event_name,
properties_add=all_properties)
def track_update(self, distinct_id=None, account_id=None, event_name=None, event_id=None, properties=None):
"""发送可更新的事件数据
您可以调用 track_update 来上传可更新的事件建议您根据先前梳理的文档来设置事件的属性以及发送信息的条件. 事件的名称只能以字母开头可包含数字字母和下划线_
长度最大为 50 个字符对字母大小写不敏感. 事件的属性是一个 dict 对象其中每个元素代表一个属性.
Args:
distinct_id: 访客 ID
account_id: 账户 ID
event_name: 事件名称
event_id: 事件唯一ID
properties: 事件属性
Raises:
TGAIllegalDataException: 数据格式错误时会抛出此异常
"""
all_properties = self._public_track_add(event_name)
if properties:
all_properties.update(properties)
self.__add(distinct_id=distinct_id, account_id=account_id, send_type='track_update', event_name=event_name,
event_id=event_id, properties_add=all_properties)
def track_overwrite(self, distinct_id=None, account_id=None, event_name=None, event_id=None, properties=None):
"""发送可覆盖的事件数据
您可以调用 track_overwrite 来上传可全部覆盖的事件建议您根据先前梳理的文档来设置事件的属性以及发送信息的条件. 事件的名称只能以字母开头可包含数字字母和下划线_
长度最大为 50 个字符对字母大小写不敏感. 事件的属性是一个 dict 对象其中每个元素代表一个属性.
Args:
distinct_id: 访客 ID
account_id: 账户 ID
event_name: 事件名称
event_id: 事件唯一ID
properties: 事件属性
Raises:
TGAIllegalDataException: 数据格式错误时会抛出此异常
"""
all_properties = self._public_track_add(event_name)
if properties:
all_properties.update(properties)
self.__add(distinct_id=distinct_id, account_id=account_id, send_type='track_overwrite', event_name=event_name,
event_id=event_id, properties_add=all_properties)
def flush(self):
"""立即提交数据到相应的接收端
"""
self.__consumer.flush()
def close(self):
"""关闭并退出 sdk
请在退出前调用本接口以避免缓存内的数据丢失
"""
self.__consumer.close()
def _public_track_add(self, event_name):
if not is_str(event_name):
raise TGAIllegalDataException('a string type event_name is required for track')
all_properties = {
'#lib': 'tga_python_sdk',
'#lib_version': __version__,
}
all_properties.update(self.__super_properties)
return all_properties
pass
def __add(self, distinct_id, account_id, send_type, event_name=None, event_id=None, properties_add=None):
if distinct_id is None and account_id is None:
raise TGAException("Distinct_id and account_id must be set at least one")
if properties_add:
properties = properties_add.copy()
else:
properties = {}
data = {
'#type': send_type
}
if "#ip" in properties.keys():
data['#ip'] = properties.get("#ip")
del (properties['#ip'])
if "#first_check_id" in properties.keys():
data['#first_check_id'] = properties.get("#first_check_id")
del (properties['#first_check_id'])
# 只支持UUID标准格式xxxxxxxx - xxxx - xxxx - xxxx - xxxxxxxxxxxx
if "#uuid" in properties.keys():
data['#uuid'] = str(properties['#uuid'])
del (properties['#uuid'])
elif self.__enableUuid:
data['#uuid'] = str(uuid.uuid1())
if "#app_id" in properties.keys():
data['#app_id'] = properties.get("#app_id")
del (properties['#app_id'])
self.__assert_properties(send_type, properties)
td_time = properties.get("#time")
data['#time'] = td_time
del (properties['#time'])
data['properties'] = properties
if event_name is not None:
data['#event_name'] = event_name
if event_id is not None:
data['#event_id'] = event_id
if distinct_id is not None:
data['#distinct_id'] = distinct_id
if account_id is not None:
data['#account_id'] = account_id
self.__consumer.add(json.dumps(data))
def __assert_properties(self, action_type, properties):
if properties is not None:
if "#time" not in properties.keys():
properties['#time'] = datetime.datetime.now()
else:
try:
time_temp = properties.get('#time')
if isinstance(time_temp, datetime.datetime) or isinstance(time_temp, datetime.date):
pass
else:
raise TGAIllegalDataException('Value of #time should be datetime.datetime or datetime.date')
except Exception as e:
raise TGAIllegalDataException(e)
for key, value in properties.items():
if not is_str(key):
raise TGAIllegalDataException("Property key must be a str. [key=%s]" % str(key))
if value is None:
continue
if not self.__NAME_PATTERN.match(key):
raise TGAIllegalDataException(
"type[%s] property key must be a valid variable name. [key=%s]" % (action_type, str(key)))
if not is_str(value) and not is_int(value) and not isinstance(value, float) \
and not isinstance(value, bool) \
and not isinstance(value, datetime.datetime) and not isinstance(value, datetime.date) \
and not isinstance(value, list):
raise TGAIllegalDataException(
"property value must be a str/int/float/bool/datetime/date/list. [value=%s]" % type(value))
if 'user_add' == action_type.lower() and not self.__number(value) and not key.startswith('#'):
raise TGAIllegalDataException('user_add properties must be number type')
if isinstance(value, datetime.datetime):
properties[key] = value.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
elif isinstance(value, datetime.date):
properties[key] = value.strftime('%Y-%m-%d')
if isinstance(value, list):
i = 0
for lvalue in value:
if isinstance(lvalue, datetime.datetime):
value[i] = lvalue.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
i += 1
def __number(self, s):
if is_int(s):
return True
if isinstance(s, float):
return True
return False
def clear_super_properties(self):
"""删除所有已设置的事件公共属性
"""
self.__super_properties = {
'#lib': 'tga_python_sdk',
'#lib_version': __version__,
}
def set_super_properties(self, super_properties):
"""设置公共事件属性
公共事件属性是所有事件中的属性属性建议您在发送事件前先设置公共事件属性. track properties
super properties 有相同的 key track properties 会覆盖公共事件属性的值.
Args:
super_properties 公共属性
"""
self.__super_properties.update(super_properties)
if os.name == 'nt':
import msvcrt
def _lock(file_):
try:
save_pos = file_.tell()
file_.seek(0)
try:
msvcrt.locking(file_.fileno(), msvcrt.LK_LOCK, 1)
except IOError as e:
raise TGAException(e)
finally:
if save_pos:
file_.seek(save_pos)
except IOError as e:
raise TGAException(e)
def _unlock(file_):
try:
save_pos = file_.tell()
if save_pos:
file_.seek(0)
try:
msvcrt.locking(file_.fileno(), msvcrt.LK_UNLCK, 1)
except IOError as e:
raise TGAException(e)
finally:
if save_pos:
file_.seek(save_pos)
except IOError as e:
raise TGAException(e)
elif os.name == 'posix':
import fcntl
def _lock(file_):
try:
fcntl.flock(file_.fileno(), fcntl.LOCK_EX)
except IOError as e:
raise TGAException(e)
def _unlock(file_):
fcntl.flock(file_.fileno(), fcntl.LOCK_UN)
else:
raise TGAException("Python SDK is defined for NT and POSIX system.")
class _TAFileLock(object):
def __init__(self, file_handler):
self._file_handler = file_handler
def __enter__(self):
_lock(self._file_handler)
return self
def __exit__(self, t, v, tb):
_unlock(self._file_handler)
class LoggingConsumer(object):
"""数据批量实时写入本地文件
创建指定文件存放目录的 LoggingConsumer, 将数据使用 logging 库输出到指定路径. 同时需将 LogBus 的监听文件夹地址
设置为此处的地址即可使用LogBus进行数据的监听上传.
"""
_mutex = queue.Queue()
_mutex.put(1)
class _FileWriter(object):
_writers = {}
_writeMutex = queue.Queue()
_writeMutex.put(1)
@classmethod
def instance(cls, filename):
cls._writeMutex.get(block=True, timeout=None)
try:
if filename in cls._writers.keys():
result = cls._writers[filename]
result._count = result._count + 1
else:
result = cls(filename)
cls._writers[filename] = result
return result
finally:
cls._writeMutex.put(1)
def __init__(self, filename):
self._filename = filename
self._file = open(self._filename, 'a')
self._count = 1
def close(self):
LoggingConsumer._FileWriter._writeMutex.get(block=True, timeout=None)
try:
self._count = self._count - 1
if self._count == 0:
self._file.close()
del LoggingConsumer._FileWriter._writers[self._filename]
finally:
LoggingConsumer._FileWriter._writeMutex.put(1)
def is_valid(self, filename):
return self._filename == filename
def write(self, messages):
with _TAFileLock(self._file):
for message in messages:
self._file.write(message)
self._file.write('\n')
self._file.flush()
@classmethod
def construct_filename(cls, directory, date_suffix, file_size, file_prefix):
filename = file_prefix + ".log." + date_suffix \
if file_prefix is not None else "log." + date_suffix
if file_size > 0:
count = 0
file_path = directory + filename + "_" + str(count)
while os.path.exists(file_path) and cls.file_size_out(file_path, file_size):
count = count + 1
file_path = directory + filename + "_" + str(count)
return file_path
else:
return directory + filename
@classmethod
def file_size_out(cls, file_path, file_size):
fsize = os.path.getsize(file_path)
fsize = fsize / float(1024 * 1024)
if fsize >= file_size:
return True
return False
@classmethod
def unlock_logging_consumer(cls):
cls._mutex.put(1)
@classmethod
def lock_logging_consumer(cls):
cls._mutex.get(block=True, timeout=None)
def __init__(self, log_directory, log_size=0, buffer_size=8192, rotate_mode=ROTATE_MODE.DAILY, file_prefix=None):
"""创建指定日志文件目录的 LoggingConsumer
Args:
log_directory: 日志保存目录
log_size: 单个日志文件的大小, 单位 MB, log_size <= 0 表示不限制单个文件大小
buffer_size: 每次写入文件的大小, 单位 Byte, 默认 8K
rotate_mode: 日志切分模式默认按天切分
"""
if not os.path.exists(log_directory):
os.makedirs(log_directory)
self.log_directory = log_directory # log文件保存的目录
self.sdf = '%Y-%m-%d-%H' if rotate_mode == ROTATE_MODE.HOURLY else '%Y-%m-%d'
self.suffix = datetime.datetime.now().strftime(self.sdf)
self._fileSize = log_size # 单个log文件的大小
if not self.log_directory.endswith("/"):
self.log_directory = self.log_directory + "/"
self._buffer = []
self._buffer_size = buffer_size
self._file_prefix = file_prefix
self.lock_logging_consumer()
filename = LoggingConsumer.construct_filename(self.log_directory, self.suffix, self._fileSize,
self._file_prefix)
self._writer = LoggingConsumer._FileWriter.instance(filename)
self.unlock_logging_consumer()
def add(self, msg):
messages = None
self.lock_logging_consumer()
self._buffer.append(msg)
if len(self._buffer) > self._buffer_size:
messages = self._buffer
date_suffix = datetime.datetime.now().strftime(self.sdf)
if self.suffix != date_suffix:
self.suffix = date_suffix
filename = LoggingConsumer.construct_filename(self.log_directory, self.suffix, self._fileSize,
self._file_prefix)
if not self._writer.is_valid(filename):
self._writer.close()
self._writer = LoggingConsumer._FileWriter.instance(filename)
self._buffer = []
if messages:
self._writer.write(messages)
self.unlock_logging_consumer()
def flush_with_close(self, is_close):
messages = None
self.lock_logging_consumer()
if len(self._buffer) > 0:
messages = self._buffer
filename = LoggingConsumer.construct_filename(self.log_directory, self.suffix, self._fileSize,
self._file_prefix)
if not self._writer.is_valid(filename):
self._writer.close()
self._writer = LoggingConsumer._FileWriter.instance(filename)
self._buffer = []
if messages:
self._writer.write(messages)
if is_close:
self._writer.close()
self.unlock_logging_consumer()
def flush(self):
self.flush_with_close(False)
def close(self):
self.flush_with_close(True)
class BatchConsumer(object):
"""同步、批量地向 TA 服务器传输数据
通过指定接收端地址和 APP ID可以同步的向 TA 服务器传输数据. Consumer 不需要搭配传输工具,
但是存在网络不稳定等原因造成数据丢失的可能因此不建议在生产环境中使用.
触发上报的时机为以下条件满足其中之一的时候:
1. 数据条数大于预定义的最大值, 默认为 20
2. 数据发送间隔超过预定义的最大时间, 默认为 3
"""
_batchlock = threading.RLock()
_cachelock = threading.RLock()
def __init__(self, server_uri, appid, batch=20, timeout=30000, interval=3, compress=True, maxCacheSize=50):
"""创建 BatchConsumer
Args:
server_uri: 服务器的 URL 地址
appid: 项目的 APP ID
batch: 指定触发上传的数据条数, 默认为 20 , 最大 200
timeout: 请求的超时时间, 单位毫秒, 默认为 30000 ms
interval: 推送数据的最大时间间隔, 单位为秒, 默认为 3
"""
self.__interval = interval
self.__batch = min(batch, 200)
self.__message_channel = []
self.__maxCacheSize = maxCacheSize
self.__cache_buffer = []
self.__last_flush = time.time()
server_url = urlparse(server_uri)
self.__http_service = _HttpServices(server_url._replace(path='/sync_server').geturl(), appid, timeout)
self.__http_service.compress = compress
def add(self, msg):
self._batchlock.acquire()
try:
self.__message_channel.append(msg)
finally:
self._batchlock.release()
if len(self.__message_channel) >= self.__batch \
or len(self.__cache_buffer) > 0:
self.flush_once()
def flush(self, throw_exception=True):
while len(self.__cache_buffer) > 0 or len(self.__message_channel) > 0:
try:
self.flush_once(throw_exception)
except TGAIllegalDataException:
continue
def flush_once(self, throw_exception=True):
if len(self.__message_channel) == 0 and len(self.__cache_buffer) == 0:
return
self._cachelock.acquire()
self._batchlock.acquire()
try:
try:
if len(self.__message_channel) == 0 and len(self.__cache_buffer) == 0:
return
if len(self.__cache_buffer) == 0 or len(self.__message_channel) >= self.__batch:
self.__cache_buffer.append(self.__message_channel)
self.__message_channel = []
finally:
self._batchlock.release()
msg = self.__cache_buffer[0]
self.__http_service.send('[' + ','.join(msg) + ']', str(len(msg)))
self.__last_flush = time.time()
self.__cache_buffer = self.__cache_buffer[1:]
except TGANetworkException as e:
if throw_exception:
raise e
except TGAIllegalDataException as e:
self.__cache_buffer = self.__cache_buffer[1:]
if throw_exception:
raise e
finally:
self._cachelock.release()
def close(self):
self.flush()
pass
class AsyncBatchConsumer(object):
"""异步、批量地向 TA 服务器发送数据的
AsyncBatchConsumer 使用独立的线程进行数据发送当满足以下两个条件之一时触发数据上报:
1. 数据条数大于预定义的最大值, 默认为 20
2. 数据发送间隔超过预定义的最大时间, 默认为 3
"""
def __init__(self, server_uri, appid, interval=3, flush_size=20, queue_size=100000):
"""创建 AsyncBatchConsumer
Args:
server_uri: 服务器的 URL 地址
appid: 项目的 APP ID
interval: 推送数据的最大时间间隔, 单位为秒, 默认为 3
flush_size: 队列缓存的阈值超过此值将立即进行发送
queue_size: 缓存队列的大小
"""
server_url = urlparse(server_uri)
self.__http_service = _HttpServices(server_url._replace(path='/sync_server').geturl(), appid, 30000)
self.__batch = flush_size
self.__queue = queue.Queue(queue_size)
# 初始化发送线程
self.__flushing_thread = self._AsyncFlushThread(self, interval)
self.__flushing_thread.daemon = True
self.__flushing_thread.start()
def add(self, msg):
try:
self.__queue.put_nowait(msg)
except queue.Full as e:
raise TGANetworkException(e)
if self.__queue.qsize() > self.__batch:
self.flush()
def flush(self):
self.__flushing_thread.flush()
def close(self):
self.__flushing_thread.stop()
while not self.__queue.empty():
self._perform_request()
def _perform_request(self):
"""同步的发送数据
仅用于内部调用, 用户不应当调用此方法.
"""
flush_buffer = []
while len(flush_buffer) < self.__batch:
try:
flush_buffer.append(str(self.__queue.get_nowait()))
except queue.Empty:
break
if len(flush_buffer) > 0:
for i in range(3): # 网络异常情况下重试 3 次
try:
self.__http_service.send('[' + ','.join(flush_buffer) + ']', str(len(flush_buffer)))
return True
except TGANetworkException:
pass
except TGAIllegalDataException:
break
class _AsyncFlushThread(threading.Thread):
def __init__(self, consumer, interval):
threading.Thread.__init__(self)
self._consumer = consumer
self._interval = interval
self._stop_event = threading.Event()
self._finished_event = threading.Event()
self._flush_event = threading.Event()
def flush(self):
self._flush_event.set()
def stop(self):
"""停止线程
退出时需调用此方法以保证线程安全结束.
"""
self._stop_event.set()
self._finished_event.wait()
def run(self):
while True:
# 如果 _flush_event 标志位为 True或者等待超过 _interval 则继续执行
self._flush_event.wait(self._interval)
self._consumer._perform_request()
self._flush_event.clear()
# 发现 stop 标志位时安全退出
if self._stop_event.isSet():
break
self._finished_event.set()
def _gzip_string(data):
try:
return gzip.compress(data)
except AttributeError:
import StringIO
buf = StringIO.StringIO()
fd = gzip.GzipFile(fileobj=buf, mode="w")
fd.write(data)
fd.close()
return buf.getvalue()
class _HttpServices(object):
"""内部类,用于发送网络请求
指定接收端地址和项目 APP ID, 实现向接收端上传数据的接口. 发送前将数据默认使用 Gzip 压缩,
"""
def __init__(self, server_uri, appid, timeout=30000):
self.url = server_uri
self.appid = appid
self.timeout = timeout
self.compress = True
def send(self, data, length):
"""使用 Requests 发送数据给服务器
Args:
data: 待发送的数据
length
Raises:
TGAIllegalDataException: 数据错误
TGANetworkException: 网络错误
"""
headers = {'appid': self.appid, 'TA-Integration-Type': 'python-sdk', 'TA-Integration-Version': __version__,
'TA-Integration-Count': length}
try:
compress_type = 'gzip'
if self.compress:
data = _gzip_string(data.encode("utf-8"))
else:
compress_type = 'none'
data = data.encode("utf-8")
headers['compress'] = compress_type
response = requests.post(self.url, data=data, headers=headers, timeout=self.timeout)
if response.status_code == 200:
responseData = json.loads(response.text)
if responseData["code"] == 0:
return True
else:
raise TGAIllegalDataException("Unexpected result code: " + str(responseData["code"]))
else:
raise TGANetworkException("Unexpected Http status code " + str(response.status_code))
except ConnectionError as e:
time.sleep(0.5)
raise TGANetworkException("Data transmission failed due to " + repr(e))
class DebugConsumer(object):
"""逐条、同步的发送数据给接收服务器
服务端会对数据进行严格校验当某个属性不符合规范时整条数据都不会入库. 当数据格式错误时抛出包含详细原因的异常信息.
建议首先使用此 Consumer 来调试埋点数据.
"""
def __init__(self, server_uri, appid, timeout=30000, write_data=True):
"""创建 DebugConsumer
Args:
server_uri: 服务器的 URL 地址
appid: 项目的 APP ID
timeout: 请求的超时时间, 单位毫秒, 默认为 30000 ms
"""
server_url = urlparse(server_uri)
debug_url = server_url._replace(path='/data_debug')
self.__server_uri = debug_url.geturl()
self.__appid = appid
self.__timeout = timeout
self.__writer_data = write_data
def add(self, msg):
try:
dry_run = 0
if not self.__writer_data:
dry_run = 1
response = requests.post(self.__server_uri,
data={'source': 'server', 'appid': self.__appid, 'data': msg, 'dryRun': dry_run},
timeout=self.__timeout)
if response.status_code == 200:
responseData = json.loads(response.text)
if responseData["errorLevel"] == 0:
return True
else:
print("Unexpected result : \n %s" % response.text)
else:
raise TGANetworkException("Unexpected http status code: " + str(response.status_code))
except ConnectionError as e:
time.sleep(0.5)
raise TGANetworkException("Data transmission failed due to " + repr(e))
def flush(self, throw_exception=True):
pass
def close(self):
pass
class ToKafka(object):
"""
将数据发送到kafka
注意 减少不必要的查询 分区固定设置16个
"""
def __init__(self, conf):
self.__topic_name = None
self.__producer = KafkaProducer(**conf)
@property
def topic_name(self):
return self.__topic_name
@topic_name.setter
def topic_name(self, topic_name):
self.__topic_name = topic_name
# self.__producer.partitions_for(topic_name)
def add(self, msg):
try:
self.__producer.send(self.__topic_name, msg, partition=random.randint(0, 15))
except Exception as e:
print(e)
def flush(self, throw_exception=True):
pass
def close(self):
pass