From 6cb5391eb0486f283aec9cdc3cceea5bc85ba276 Mon Sep 17 00:00:00 2001 From: kf_wuhao <15392746632@qq.com> Date: Fri, 8 Jan 2021 15:41:28 +0800 Subject: [PATCH] add summary3.py --- config.json | 3 +- task/summary3.py | 521 +++++++++++++++++++++++++++++++++++++++++++++++ task/task.py | 10 +- 3 files changed, 529 insertions(+), 5 deletions(-) create mode 100644 task/summary3.py diff --git a/config.json b/config.json index b7131c5..b22c5f1 100644 --- a/config.json +++ b/config.json @@ -43,7 +43,8 @@ "source_coll": "user", "dest_coll": "", "task_name": "summary3", - "freq": "D" + "freq": "D", + "is_inc": true }, "sync_user": { "source_coll": "user", diff --git a/task/summary3.py b/task/summary3.py new file mode 100644 index 0000000..05481e7 --- /dev/null +++ b/task/summary3.py @@ -0,0 +1,521 @@ +import time + +from pydantic import Field, BaseModel +import pandas as pd + +from .task import Task +from utils import * +import numpy as np + +# LTV 天数 +LTV_DAYS = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, + 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, + 57, 58, 59, 60, 65, 70, 75, 80, 85, 90, 100, 110, 120, 150, 180, 210, 240, 270, 300, 330, 360] + +# 留存的天数 +RETAIN_DAYS = [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, + 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, + 56, + 57, 58, 59, 60, 65, 70, 75, 80, 85, 90, 100, 110, 120, 150, 180, 210, 240, 270, 300, 330, 360] + + +def role_struct(cdate, owner, channel, platform): + data = { + 'cdate': cdate, + '_channel_name': channel, + '_owner_name': owner, + '_platform': platform, + 'new_role_list': [], + 'new_role': 0, + 'now_pay_role_list': [], + 'now_pay_role': 0, + 'role_now_login': 0, + 'pay_role_now_login': 0, + 'now_pay_money': 0, + } + + role_login_n = {'role_login_' + str(day): 0 for day in LTV_DAYS} + pay_role_login_n = {'pay_role_login_' + str(day): 0 for day in RETAIN_DAYS} + + role_all_money_n = {'role_all_money_' + str(day): 0 for day in LTV_DAYS} + + data.update(role_login_n) + data.update(pay_role_login_n) + data.update(role_all_money_n) + + return data + + +def device_struct(cdate, owner, channel, platform): + data = { + 'cdate': cdate, + '_channel_name': channel, + '_owner_name': owner, + '_platform': platform, + + 'new_device_list': [], + 'new_device': 0, + 'now_pay_device_list': [], + 'now_pay_device': 0, + 'device_now_login': 0, + + 'pay_device_now_login': 0, + 'now_pay_money': 0, + + } + + device_login_n = {'device_login_' + str(day): 0 for day in RETAIN_DAYS} + + pay_device_login_n = {'pay_device_login_' + str(day): 0 for day in RETAIN_DAYS} + + device_all_money_n = {'device_all_money_' + str(day): 0 for day in LTV_DAYS} + + data.update(device_login_n) + data.update(pay_device_login_n) + data.update(device_all_money_n) + + return data + + +def account_struct(cdate, owner, channel, platform): + data = { + 'cdate': cdate, + '_channel_name': channel, + '_owner_name': owner, + '_platform': platform, + 'new_account_list': [], + 'new_account': 0, + 'now_pay_account_list': [], + 'now_pay_account': 0, + 'account_now_login': 0, + 'pay_account_now_login': 0, + 'now_pay_money': 0, + + } + + account_login_n = {'account_login_' + str(day): 0 for day in RETAIN_DAYS} + pay_account_login_n = {'pay_account_login_' + str(day): 0 for day in RETAIN_DAYS} + account_all_money_n = {'account_all_money_' + str(day): 0 for day in LTV_DAYS} + + data.update(account_login_n) + data.update(pay_account_login_n) + data.update(account_all_money_n) + + return data + + +class HandlerWrapper: + handler_link = [] + + def __init__(self, func): + self.func = func + HandlerWrapper.handler_link.append(func) + + def __call__(self, *args, **kwargs): + self.func(*args, **kwargs) + + +class HandlerWrapperInc: + handler_link = [] + + def __init__(self, func): + self.func = func + HandlerWrapperInc.handler_link.append(func) + + def __call__(self, *args, **kwargs): + self.func(*args, **kwargs) + + +class Summary3(Task): + """ + Summary3 + """ + + handler_link = [] + + # 处理天游标 + def get_cursor(self): + if not self.cursor_st: + self.cursor_st = int(time.time()) - 86400 + super().get_cursor() + + def generate_cursor_time(self): + date_index = pd.date_range(pd.Timestamp(self.cursor_st, unit='s', tz=self.timezone).normalize(), + pd.Timestamp(self.cursor_et, unit='s', tz=self.timezone).normalize(), freq=self.freq) + df = pd.DataFrame(index=date_index[:-1]) + df['st'] = df.index + df['et'] = np.append(df.index[1:], [pd.Timestamp(self.cursor_et, unit='s', tz=self.timezone).normalize()]) + return df + + class Model(BaseModel): + game_role_id: str = Field(..., title="角色id", alias='_game_role_id') + platform: str = Field(..., min_length=1, title="平台", alias='_platform') + channel_name: str = Field(..., min_length=1, title="channel", alias='_channel_name') + owner_name: str = Field(..., min_length=1, title="owner", alias='_owner_name') + channel_uid: str = Field(..., min_length=1, title="channel_uid", alias='_channel_uid') + device_id: str = Field(..., min_length=1, title='device_id', alias='_device_id') + district_server_id: int = Field(..., title="区服id", alias='_district_server_id') + role_create_time: int = Field(..., title="角色创建时间") + is_new_device: int = Field(None, title="新设备") + is_new_channel_uid: int = Field(None, title="新账号") + + @classmethod + def get_fields(cls): + return [v.alias for v in cls.__fields__.values()] + + def cleaning(self, cursor_list): + for cursor in cursor_list: # type:dict + for source_coll, ts in cursor.items(): # type:str,dict + if ts['cursor_st'] == ts['cursor_et']: + continue + logger.info(f'开始处理{self.game_name} 处理 游标 {ts}') + # 这一天新用户 + where = { + 'role_create_time': { + '$gte': ts['cursor_st'], + '$lt': ts['cursor_et'], + }, + '_owner_name': {'$nin': ['dev', 'banshu', 'tishen']}, + '_platform': {'$exists': True}, + '_channel_name': {'$exists': True} + } + projection = self.Model.get_fields() + user_df = pd.DataFrame(self.local_db[self.source_coll].find(where, projection)) + + # self.handler_summary(user_df, ts['cursor_st']) + if self.is_inc: + self.handler_summary_inc(ts['cursor_st']) + + self.set_cursor(cursor_st=ts['cursor_st'], cursor_et=ts['cursor_et']) + + def handler_summary_inc(self, cdate): + params = (self, cdate) + list(map(lambda handler: handler(*params), HandlerWrapperInc.handler_link)) + + @HandlerWrapperInc + def handler_inc_init(self, cdate): + """ + 设置初始值 + """ + yesterday_ts = int(pd.Timestamp(cdate, unit='s', tz=self.timezone).normalize().timestamp()) + min_ts = yesterday_ts - max(RETAIN_DAYS + LTV_DAYS) * 86400 + + def update(cat): + cursor = self.local_db[f'summary_{cat}1'].find({'cdate': {'$gte': min_ts, '$lt': cdate}}) + for doc in cursor: + doc_date = doc['cdate'] + age_day = (cdate - doc_date) // 86400 + 1 + data = {f'{cat}_all_money_{age_day}': doc[f'{cat}_all_money_{age_day - 1}']} + self.local_db[f'summary_{cat}1'].update_one({'_id': doc['_id']}, {'$set': data}) + self.remote_db[f'summary_{cat}1'].update_one({'_id': doc['_id']}, {'$set': data}) + + update('role') + update('account') + update('device') + + @HandlerWrapperInc + def handler_money_inc(self, cdate): + yesterday_ts = int(pd.Timestamp(cdate, unit='s', + tz=self.timezone).normalize().timestamp()) + + df = pd.DataFrame(self.local_db['summary_pay'].find({'cdate': yesterday_ts}, { + '_id': False, + '_game_role_id': True, + '_channel_uid': True, + '_device_id': True, + 'role_create_time': True, + })) + + if df.shape == (0, 0): + return + + role_list = list(df['_game_role_id'].unique()) + account_list = list(df['_channel_uid'].unique()) + device_list = list(df['_device_id'].unique()) + min_ts = yesterday_ts - max(RETAIN_DAYS + LTV_DAYS) * 86400 + + def update(cat, cat_list): + update_doc = dict() + for cid in cat_list: + data = self.local_db[f'summary_{cat}1'].find_one( + {'cdate': {'$gte': min_ts, '$lte': yesterday_ts}, f'new_{cat}_list': cid}, { + '_id': True, + 'cdate': True, + f'now_pay_{cat}_list': True, + }) + if not data: + continue + + update_doc[data['_id']] = data + # 添加新充值用户 + data[f'now_pay_{cat}_list'] = list(set(data[f'now_pay_{cat}_list']) | {cid}) + data[f'now_pay_{cat}'] = len(data[f'now_pay_{cat}_list']) + self.local_db[f'summary_{cat}1'].update_one({'_id': data['_id']}, {'$set': data}) + self.remote_db[f'summary_{cat}1'].update_one({'_id': data['_id']}, {'$set': data}) + # 计算累计充值 + for id_, doc in update_doc.items(): + pipeline = [ + { + '$match': { + "cdate": {'$gte': doc['cdate'], '$lte': cdate}, + f'_game_{cat}_id': {'$in': doc[f'now_pay_{cat}_list']}, + } + }, + {'$group': {'_id': None, 'sum_money': {'$sum': '$money'}}} + ] + try: + sum_money = self.local_db['summary_pay'].aggregate(pipeline).next() + except StopIteration: + continue + age_day = (cdate - doc['cdate']) // 86400 + 1 + if sum_money: + data = {f'{cat}_all_money_{age_day}': sum_money['sum_money']} + self.local_db[f'summary_{cat}1'].update_one({'_id': id_}, {'$set': data}) + self.remote_db[f'summary_{cat}1'].update_one({'_id': id_}, {'$set': data}) + + update('role', role_list) + update('account', account_list) + update('device', device_list) + + @HandlerWrapperInc + def handler_login_inc(self, cdate): + yesterday_ts = int(pd.Timestamp(cdate, unit='s', tz=self.timezone).normalize().timestamp()) + + df = pd.DataFrame(self.local_db['summary_login'].find({'cdate': yesterday_ts}, { + '_id': False, + '_game_role_id': True, + '_channel_uid': True, + '_device_id': True, + 'role_create_time': True, + })) + + if df.shape == (0, 0): + return + df = df[df['role_create_time'] > 0] + df['role_cdate'] = df['role_create_time'].apply( + lambda x: int(pd.Timestamp(x, unit='s', tz=self.timezone).normalize().timestamp())) + + df.rename(columns={'_game_role_id': 'role_id', '_channel_uid': 'account_id', '_device_id': 'device_id'}, + inplace=True) + + def update(cat): + cursor = self.local_db[f'summary_{cat}1'].find( + {'cdate': role_cdate}, { + '_id': True, + f'now_pay_{cat}_list': True, + f'new_{cat}_list': True, + }) + for doc in cursor: + data = dict() + age_day = (cdate - role_cdate) // 86400 + 1 + n = df[df[f'{cat}_id'].isin(doc[f'now_pay_{cat}_list'])].shape[0] + if n: + data[f'pay_{cat}_login_{age_day}'] = n + + n = df[df[f'{cat}_id'].isin(doc[f'new_{cat}_list'])].shape[0] + if n: + data[f'{cat}_login_{age_day}'] = n + if data: + self.local_db[f'summary_{cat}1'].update_one({'_id': doc['_id']}, {'$set': data}) + self.remote_db[f'summary_{cat}1'].update_one({'_id': doc['_id']}, {'$set': data}) + + for role_cdate in df['role_cdate'].unique(): + role_cdate = int(role_cdate) + update('role') + update('account') + update('device') + + def handler_summary(self, user_df, cdate): + # 日期-owner-channel-platform 相同为一个文档 + # + if user_df.shape == (0, 0): + return + user_group = user_df.groupby(['_owner_name', '_channel_name', '_platform']) + for group in user_group.groups: + df = user_group.get_group(group) + role_data = role_struct(cdate, *group) + device_data = device_struct(cdate, *group) + account_data = account_struct(cdate, *group) + params = (self, role_data, device_data, account_data, df, group, cdate) + list(map(lambda handler: handler(*params), HandlerWrapper.handler_link)) + + @HandlerWrapper + def handler_now(self, role_data, device_data, account_data, df, group, cdate): + + role_data['new_role_list'] = list(df['_game_role_id'].unique()) + role_data['new_role'] = len(role_data['new_role_list']) + + device_data['new_device_list'] = list(df[df['is_new_device'] == 1]['_device_id'].unique()) + device_data['new_device'] = len(device_data['new_device_list']) + + account_data['new_account_list'] = list(df[df['is_new_channel_uid'] == 1]['_channel_uid'].unique()) + account_data['new_account'] = len(account_data['new_account_list']) + + @HandlerWrapper + def handler_pay(self, role_data, device_data, account_data, df, group, cdate): + where = { + 'cdate': {'$gte': cdate} + } + all_pay_device_list = self.local_db['summary_pay'].distinct('_device_id', where) + all_pay_account_list = self.local_db['summary_pay'].distinct('_channel_uid', where) + all_pay_role_list = self.local_db['summary_pay'].distinct('_game_role_id', where) + + pay_device_list = set(all_pay_device_list) & set(device_data['new_device_list']) + pay_account_list = set(all_pay_account_list) & set(account_data['new_account_list']) + pay_role_list = set(all_pay_role_list) & set(role_data['new_role_list']) + + role_data['now_pay_role_list'] = list(pay_role_list) + role_data['now_pay_role'] = len(role_data['now_pay_role_list']) + account_data['now_pay_account_list'] = list(pay_account_list) + account_data['now_pay_account'] = len(account_data['now_pay_account_list']) + device_data['now_pay_device_list'] = list(pay_device_list) + device_data['now_pay_device'] = len(device_data['now_pay_device_list']) + + @HandlerWrapper + def handler_pay_login(self, role_data, device_data, account_data, df, group, cdate): + st = int(pd.Timestamp(int(time.time()), unit='s', tz=self.timezone).normalize().timestamp()) + et = st + 86400 + role_data['pay_role_now_login'] = self.local_db['summary_login'].count({ + 'cdate': {'$gte': st, '$lt': et}, + '_game_role_id': {'$in': role_data['now_pay_role_list']} + }) + + account_data['pay_account_now_login'] = self.local_db['summary_login'].count({ + 'cdate': {'$gte': st, '$lt': et}, + '_channel_uid': {'$in': account_data['now_pay_account_list']} + }) + + device_data['pay_device_now_login'] = self.local_db['summary_login'].count({ + 'cdate': {'$gte': st, '$lt': et}, + '_first_device_id': {'$in': device_data['now_pay_device_list']} + }) + + @HandlerWrapper + def handler_login(self, role_data, device_data, account_data, df, group, cdate): + ts = int(pd.Timestamp(int(time.time()), unit='s', tz=self.timezone).normalize().timestamp()) + ts -= 86400 + role_data['role_now_login'] = self.local_db['summary_login'].count({ + 'cdate': ts, + '_game_role_id': {'$in': role_data['new_role_list']} + }) + + account_data['account_now_login'] = self.local_db['summary_login'].count({ + 'cdate': ts, + '_channel_uid': {'$in': account_data['new_account_list']} + }) + + device_data['device_now_login'] = self.local_db['summary_login'].count({ + 'cdate': ts, + '_first_device_id': {'$in': device_data['new_device_list']} + }) + + @HandlerWrapper + def handler_login_day(self, role_data, device_data, account_data, df, group, cdate): + c_day = pd.Timestamp(cdate, unit='s', tz=self.timezone).normalize() + today = pd.Timestamp(int(time.time()), unit='s', tz=self.timezone).normalize() + for day in RETAIN_DAYS: + day_n = c_day + pd.Timedelta(days=(day - 1)) + if day_n >= today: + continue + + ts = int(day_n.timestamp()) + + role_data[f'role_login_{day}'] = self.local_db['summary_login'].count({ + 'cdate': ts, + '_game_role_id': {'$in': role_data['new_role_list']} + }) + + account_data[f'account_login_{day}'] = self.local_db['summary_login'].count({ + 'cdate': ts, + '_channel_uid': {'$in': account_data['new_account_list']} + }) + + device_data[f'device_login_{day}'] = self.local_db['summary_login'].count({ + 'cdate': ts, + '_first_device_id': {'$in': device_data['new_device_list']} + }) + + @HandlerWrapper + def handler_login_day_pay(self, role_data, device_data, account_data, df, group, cdate): + c_day = pd.Timestamp(cdate, unit='s', tz=self.timezone).normalize() + today = pd.Timestamp(int(time.time()), unit='s', tz=self.timezone).normalize() + for day in RETAIN_DAYS: + day_n = c_day + pd.Timedelta(days=(day - 1)) + if day_n >= today: + continue + + ts = int(day_n.timestamp()) + + role_data[f'pay_role_login_{day}'] = self.local_db['summary_login'].count({ + 'cdate': ts, + '_game_role_id': {'$in': role_data['now_pay_role_list']} + }) + + account_data[f'pay_account_login_{day}'] = self.local_db['summary_login'].count({ + 'cdate': ts, + '_channel_uid': {'$in': account_data['now_pay_account_list']} + }) + + device_data[f'pay_device_login_{day}'] = self.local_db['summary_login'].count({ + 'cdate': ts, + '_first_device_id': {'$in': device_data['now_pay_device_list']} + }) + + @HandlerWrapper + def handler_money_n(self, role_data, device_data, account_data, df, group, cdate): + c_day = pd.Timestamp(cdate, unit='s', tz=self.timezone).normalize() + today = pd.Timestamp(int(time.time()), unit='s', tz=self.timezone).normalize() + for day in LTV_DAYS: + day_n = c_day + pd.Timedelta(days=(day - 1)) + if day_n >= today: + continue + + ts = int(day_n.timestamp()) + + pipeline = [ + {'$match': {'cdate': {'$gte': cdate, '$lte': ts}, + '_game_role_id': {'$in': role_data['now_pay_role_list']} + } + }, + {'$group': {'_id': None, 'sum_money': {'$sum': '$money'}}} + ] + tmp = list(self.local_db['summary_pay'].aggregate(pipeline)) or [{}] + role_data[f'role_all_money_{day}'] = tmp[0].get('sum_money', 0) + + pipeline = [ + {'$match': {'cdate': {'$gte': cdate, '$lte': ts}, + '_channel_uid': {'$in': account_data['now_pay_account_list']} + } + }, + {'$group': {'_id': None, 'sum_money': {'$sum': '$money'}}} + ] + tmp = list(self.local_db['summary_pay'].aggregate(pipeline)) or [{}] + account_data[f'account_all_money_{day}'] = tmp[0].get('sum_money', 0) + + pipeline = [ + {'$match': {'cdate': {'$gte': cdate, '$lte': ts}, + '_first_device_id': {'$in': device_data['now_pay_device_list']} + } + }, + {'$group': {'_id': None, 'sum_money': {'$sum': '$money'}}} + ] + tmp = list(self.local_db['summary_pay'].aggregate(pipeline)) or [{}] + device_data[f'device_all_money_{day}'] = tmp[0].get('sum_money', 0) + + @HandlerWrapper + def save_today(self, role_data, device_data, account_data, df, group, cdate): + def update(cat, data): + if not data.get(f'new_{cat}'): + return + obj_id = self.local_db[f'summary_{cat}1'].update_one( + {'_owner_name': group[0], '_channel_name': group[1], '_platform': group[2], 'cdate': cdate}, + {'$set': data}, upsert=True) + if obj_id.upserted_id: + data['_id'] = obj_id.upserted_id + self.remote_db[f'summary_{cat}1'].update_one( + {'_owner_name': group[0], '_channel_name': group[1], '_platform': group[2], 'cdate': cdate}, + {'$set': data}, upsert=True) + + update('role', role_data) + update('device', device_data) + update('account', account_data) diff --git a/task/task.py b/task/task.py index 711c7cf..82e6990 100644 --- a/task/task.py +++ b/task/task.py @@ -16,13 +16,16 @@ from utils import * class Task(metaclass=abc.ABCMeta): def __init__(self, *args, **kwargs): self.game_name = kwargs.get('game_name') - self.game_db = f'game_{self.game_name}' self.source_coll = kwargs.get('source_coll') self.dest_coll = kwargs.get('dest_coll') self.cursor_st = kwargs.get('st') self.cursor_et = kwargs.get('et') self.timezone = kwargs.get('timezone') self.task_name = kwargs.get("task_name") + self.freq = kwargs.get('freq', '30T') + for k, v in kwargs.items(): + self.__setattr__(k, v) + self.game_db = f'game_{self.game_name}' self.local_db = get_local_db(self.game_db) self.remote_db = get_remote_db(self.game_db) self.task_coll = self.local_db['task2'] @@ -31,8 +34,6 @@ class Task(metaclass=abc.ABCMeta): } self.task_info = self.get_task_info() - self.freq = kwargs.get('freq', '30T') - def get_task_info(self): task_info = self.task_coll.find_one(self.task_where) or {} return task_info @@ -109,7 +110,8 @@ class Task(metaclass=abc.ABCMeta): def get_event_coll(self) -> list: """ 根据游标时间戳 返回要处理的集合 - :return: [{'event_2020-12-10': {'cursor_st': 1607608848, 'cursor_et': 1607610648}}, {'event_2020-12-10': {'cursor_st': 1607610648, 'cursor_et': 1607610791}}] + :return: [{'event_2020-12-10': {'cursor_st': 1607608848, 'cursor_et': 1607610648}}, + {'event_2020-12-10': {'cursor_st': 1607610648, 'cursor_et': 1607610791}}] """ df = self.generate_cursor_time() df['event_coll_s'] = df['st'].apply(lambda x: f'event_{x.date().strftime("%Y-%m-%d")}')