import time

import arrow

from settings import settings


class TaHandler:
    """Registry of async pre-processing handlers, plus the final formatter.

    The class is used as a decorator: ``@TaHandler`` appends the decorated
    coroutine function to ``handler_link``.

    NOTE: decorating rebinds the module-level name to a ``TaHandler``
    instance, and this class defines no ``__call__``. The original functions
    are therefore only reachable through ``TaHandler.handler_link``, in
    definition order.
    """

    # Shared registry of handler functions, filled at import time.
    handler_link = []

    def __init__(self, func):
        """Register *func* in the shared ``handler_link`` list."""
        TaHandler.handler_link.append(func)

    @staticmethod
    def format_data(data: dict):
        """Split *data* into outer fields and ``properties``.

        *data* is mutated: every outer key and ``properties`` are popped
        from it.

        :param data: event dict; must contain ``properties`` and ``#type``
            (``#type`` presumably listed in ``settings.TA_OUTER`` - confirm).
        :return: a new dict holding the outer fields and ``properties``.
        :raises KeyError: if ``properties`` or ``#type`` is missing, or if a
            ``user`` message has no ``#event_name``.
        """
        msg = {}
        # Move every outer field that has a non-None value into the message.
        for k in settings.TA_OUTER:
            if data.get(k) is not None:
                msg[k] = data.pop(k)

        msg['properties'] = data.pop('properties')  # expected to be a dict

        if msg['#type'] in ('track', 'track_update', 'track_overwrite'):
            # Whatever is left in data is folded into the event properties.
            msg['properties'].update(data)
            if 'event_id' in msg['properties']:
                msg['#event_id'] = msg['properties'].pop('event_id')
        elif msg['#type'] == 'user':
            # The event name becomes part of the type (e.g. 'user_' + name);
            # the event name and event time are then dropped from the message.
            msg['#type'] = 'user_' + msg.pop('#event_name')
            msg.pop('#event_time', None)
        return msg


@TaHandler
async def add_ip(request, rdb, data):
    """Add the source IP to the event.

    Keeps a truthy ``#ip`` already present in *data*; otherwise uses
    ``request.client.host``.

    :param request: incoming request exposing ``client.host``.
    :param rdb: Redis client (unused here).
    :param data: event dict, modified in place.
    :return: None
    """
    ip = request.client.host
    data['#ip'] = data.get('#ip') or ip


@TaHandler
async def set_appid(request, rdb, data):
    """Translate ``#app_id`` through ``settings.APPID_MAP``.

    Ids without a mapping are left unchanged.

    :param request: incoming request (unused here).
    :param rdb: Redis client (unused here).
    :param data: event dict, modified in place; must contain ``#app_id``.
    :return: None
    """
    app_id = data['#app_id']
    data['#app_id'] = settings.APPID_MAP.get(app_id, app_id)


@TaHandler
async def to_date(request, rdb, data):
    """Convert the ``#time`` timestamp to a date string.

    ``#time`` is parsed with ``arrow`` using the ``Asia/Shanghai`` time zone
    and rewritten as ``YYYY-MM-DD HH:MM:SS``. The same string is copied to
    ``#event_time``.

    :param request: incoming request (unused here).
    :param rdb: Redis client (unused here).
    :param data: event dict, modified in place; must contain ``#time``.
    :return: None
    """
    moment = arrow.get(data['#time'], tzinfo='Asia/Shanghai')
    data['#time'] = moment.strftime('%Y-%m-%d %H:%M:%S')
    data['#event_time'] = data['#time']


@TaHandler
async def order_idx(request, rdb, data):
    """Number a completed recharge order within its account.

    Applies only to ``track_update`` events named ``order_event`` that carry
    ``order_complete_time``, ``event_id``, ``order_id`` and ``#account_id``.
    ``order_id`` is removed from the properties and added to the Redis set
    ``<app_id>_<account_id>_orders``. When it is new to the set, the set size
    is stored as ``order_num``.

    :param request: incoming request (unused here).
    :param rdb: Redis client exposing an awaitable ``execute``.
    :param data: event dict, modified in place.
    :return: None
    """
    # Condition: only completed-order update events are numbered.
    if data.get('#type') != 'track_update' or data.get('#event_name') != 'order_event':
        return

    props = data.get('properties', {})
    order_complete_time = props.get('order_complete_time')
    event_id = props.get('event_id')
    order_id = props.get('order_id')
    account_id = data.get('#account_id')
    if not (order_complete_time and account_id and order_id and event_id):
        return

    del data['properties']['order_id']
    # key = <app_id>_<account_id>_orders, members are order ids
    key = f'{data["#app_id"]}_{account_id}_orders'
    added = await rdb.execute('sadd', key, order_id)
    # Order number: assigned only the first time this order id is seen.
    if added == 1:
        data['properties']['order_num'] = await rdb.execute('scard', key)


@TaHandler
async def role_idx(request, rdb, data):
    """Number a newly created role and mark new devices.

    Applies only to ``track`` events named ``create_account``:

    * With both ``binduid`` and ``#account_id`` present, the account id is
      added to the Redis set ``<app_id>_<binduid>_roles``. When it is new to
      the set, the set size is stored as ``role_idx``.
    * With ``#distinct_id`` present, ``<app_id>_<distinct_id>`` is written
      with SET NX. When the key did not exist, ``is_new_device`` is set to 1.

    :param request: incoming request (unused here).
    :param rdb: Redis client exposing an awaitable ``execute``.
    :param data: event dict, modified in place.
    :return: None
    """
    # Condition: only account-creation events are processed.
    if data.get('#type') != 'track' or data.get('#event_name') != 'create_account':
        return

    account = data.get("properties", {}).get("binduid")
    account_id = data.get("#account_id")
    distinct_id = data.get("#distinct_id")

    if account and account_id:
        # key = <app_id>_<binduid>_roles, members are account ids
        key = f'{data["#app_id"]}_{account}_roles'
        added = await rdb.execute('sadd', key, account_id)
        # New role number: assigned only the first time this role is seen.
        if added == 1:
            data['properties']['role_idx'] = await rdb.execute('scard', key)

    # Skip the device marker when there is no device id. Otherwise a junk
    # "<app_id>_None" key is written and the event is flagged as a new device.
    if distinct_id:
        device_key = f'{data["#app_id"]}_{distinct_id}'
        res = await rdb.execute('set', device_key, 1, 'Nx')
        # NOTE(review): this assumes the client returns raw bytes replies;
        # a client configured to decode responses would return 'OK' instead.
        if res == b'OK':
            # setdefault mirrors the tolerant .get("properties", {}) read above.
            data.setdefault('properties', {})['is_new_device'] = 1

    # Previous set-based variant, kept for reference (original note: "sadd O(n)"):
    # if distinct_id:
    #     key = f'{data["#app_id"]}_devices'
    #     v = await rdb.execute('sadd', key, distinct_id)
    #     if v:
    #         data['properties']['is_new_device'] = 1
    #     else:
    #         data['properties']['is_new_device'] = 0

# @TaHandler
# async def device_label(request, rdb, data):
#     """
#     Mark new devices
#     :param request:
#     :param rdb:
#     :param data:
#     :return:
#     """
#     # Condition
#     if data.get('#type') == 'user' and data.get('#event_name') == 'add':
#         v = await rdb.execute('sadd', f'{data["#app_id"]}.devices', data.get('#device_id', ''))
#         if v:
#             data['properties']['is_new_device'] = 1
#         else:
#             data['properties']['is_new_device'] = 0