# data_cleaning/task/summary3.py
# 2021-01-08 17:15:02 +08:00
#
# 524 lines
# 21 KiB
# Python

import time
from pydantic import Field, BaseModel
import pandas as pd
from .task import Task
from utils import *
import numpy as np
# Day offsets at which cumulative money (LTV) is tracked:
# every day 1-60, every 5th day 65-90, then 100/110/120, then every 30th day 150-360.
LTV_DAYS = [*range(1, 61), *range(65, 91, 5), 100, 110, 120, *range(150, 361, 30)]
# Day offsets at which retention (login) is tracked: the same grid, starting at day 2.
RETAIN_DAYS = [*range(2, 61), *range(65, 91, 5), 100, 110, 120, *range(150, 361, 30)]
def role_struct(cdate, owner, channel, platform):
    """Build the zero-initialised role summary document for one cohort.

    Args:
        cdate: stored under ``cdate``.
        owner: stored under ``_owner_name``.
        channel: stored under ``_channel_name``.
        platform: stored under ``_platform``.

    Returns:
        dict with the identifying fields, empty id lists, zero counters and one
        zeroed ``role_login_<n>`` / ``pay_role_login_<n>`` /
        ``role_all_money_<n>`` entry per tracked day.
    """
    record = {
        'cdate': cdate,
        '_channel_name': channel,
        '_owner_name': owner,
        '_platform': platform,
        'new_role_list': [],
        'new_role': 0,
        'now_pay_role_list': [],
        'now_pay_role': 0,
        'role_now_login': 0,
        'pay_role_now_login': 0,
        'now_pay_money': 0,
    }
    # NOTE(review): role_login_* is keyed by LTV_DAYS here, whereas the device
    # and account builders use RETAIN_DAYS for the same family — confirm intent.
    for day in LTV_DAYS:
        record[f'role_login_{day}'] = 0
    for day in RETAIN_DAYS:
        record[f'pay_role_login_{day}'] = 0
    for day in LTV_DAYS:
        record[f'role_all_money_{day}'] = 0
    return record
def device_struct(cdate, owner, channel, platform):
    """Build the zero-initialised device summary document for one cohort.

    Args:
        cdate: stored under ``cdate``.
        owner: stored under ``_owner_name``.
        channel: stored under ``_channel_name``.
        platform: stored under ``_platform``.

    Returns:
        dict with the identifying fields, empty id lists, zero counters and one
        zeroed ``device_login_<n>`` / ``pay_device_login_<n>`` (per
        RETAIN_DAYS) and ``device_all_money_<n>`` (per LTV_DAYS) entry.
    """
    record = {
        'cdate': cdate,
        '_channel_name': channel,
        '_owner_name': owner,
        '_platform': platform,
        'new_device_list': [],
        'new_device': 0,
        'now_pay_device_list': [],
        'now_pay_device': 0,
        'device_now_login': 0,
        'pay_device_now_login': 0,
        'now_pay_money': 0,
    }
    for day in RETAIN_DAYS:
        record[f'device_login_{day}'] = 0
    for day in RETAIN_DAYS:
        record[f'pay_device_login_{day}'] = 0
    for day in LTV_DAYS:
        record[f'device_all_money_{day}'] = 0
    return record
def account_struct(cdate, owner, channel, platform):
    """Build the zero-initialised account summary document for one cohort.

    Args:
        cdate: stored under ``cdate``.
        owner: stored under ``_owner_name``.
        channel: stored under ``_channel_name``.
        platform: stored under ``_platform``.

    Returns:
        dict with the identifying fields, empty id lists, zero counters and one
        zeroed ``account_login_<n>`` / ``pay_account_login_<n>`` (per
        RETAIN_DAYS) and ``account_all_money_<n>`` (per LTV_DAYS) entry.
    """
    record = {
        'cdate': cdate,
        '_channel_name': channel,
        '_owner_name': owner,
        '_platform': platform,
        'new_account_list': [],
        'new_account': 0,
        'now_pay_account_list': [],
        'now_pay_account': 0,
        'account_now_login': 0,
        'pay_account_now_login': 0,
        'now_pay_money': 0,
    }
    for day in RETAIN_DAYS:
        record[f'account_login_{day}'] = 0
    for day in RETAIN_DAYS:
        record[f'pay_account_login_{day}'] = 0
    for day in LTV_DAYS:
        record[f'account_all_money_{day}'] = 0
    return record
class HandlerWrapper:
    """Decorator that registers a handler for the full-summary pass.

    Each decorated function is appended, unwrapped, to the class-level
    ``handler_link`` list in decoration order. The decorated name itself is
    replaced by a ``HandlerWrapper`` instance that forwards calls to the
    original function.
    """
    # Registry shared by all instances: raw handler functions, in decoration order.
    handler_link = []

    def __init__(self, func):
        self.func = func
        HandlerWrapper.handler_link.append(func)

    def __call__(self, *args, **kwargs):
        # Forward the call and hand back the handler's result rather than
        # silently discarding it.
        return self.func(*args, **kwargs)
class HandlerWrapperInc:
    """Decorator that registers a handler for the incremental-summary pass.

    Each decorated function is appended, unwrapped, to the class-level
    ``handler_link`` list in decoration order. The decorated name itself is
    replaced by a ``HandlerWrapperInc`` instance that forwards calls to the
    original function.
    """
    # Registry shared by all instances: raw handler functions, in decoration order.
    handler_link = []

    def __init__(self, func):
        self.func = func
        HandlerWrapperInc.handler_link.append(func)

    def __call__(self, *args, **kwargs):
        # Forward the call and hand back the handler's result rather than
        # silently discarding it.
        return self.func(*args, **kwargs)
class Summary3(Task):
"""
Summary3
"""
handler_link = []
# Handle the day-level cursor
def get_cursor(self):
    """Seed ``self.cursor_st`` when it is unset, then run the parent logic.

    The start falls back first to ``task_info['cursor_et']`` and, if that is
    also falsy, to 24 hours before the current time.
    """
    if not self.cursor_st:
        # Prefer the recorded end of the previous run; otherwise start one day back.
        self.cursor_st = self.task_info.get('cursor_et') or int(time.time()) - 86400
    super().get_cursor()
def generate_cursor_time(self):
    """Split the cursor range into consecutive ``[st, et)`` windows.

    ``cursor_st`` and ``cursor_et`` (epoch seconds) are normalised to midnight
    in ``self.timezone`` and stepped with ``self.freq``.

    Returns:
        DataFrame indexed by window start with columns ``st`` and ``et``.
        Each window ends where the next begins; the last one ends at the
        normalised ``cursor_et``. When both ends fall on the same day the
        frame is empty (previously this raised a length-mismatch ValueError).
    """
    start_day = pd.Timestamp(self.cursor_st, unit='s', tz=self.timezone).normalize()
    end_day = pd.Timestamp(self.cursor_et, unit='s', tz=self.timezone).normalize()
    date_index = pd.date_range(start_day, end_day, freq=self.freq)
    # The final tick only closes the last window, it never opens one.
    starts = date_index[:-1]
    df = pd.DataFrame(index=starts)
    df['st'] = starts
    if len(starts):
        df['et'] = list(starts[1:]) + [end_day]
    else:
        # No window to emit: keep the column but do not append end_day, which
        # would not match the empty index.
        df['et'] = starts
    return df
class Model(BaseModel):
    """Fields read from the source collection for each role record.

    ``get_fields`` turns the declared aliases into the projection list that
    ``cleaning`` passes to ``find``.
    """
    game_role_id: str = Field(..., title="角色id", alias='_game_role_id')  # role id
    platform: str = Field(..., min_length=1, title="平台", alias='_platform')  # platform
    channel_name: str = Field(..., min_length=1, title="channel", alias='_channel_name')
    owner_name: str = Field(..., min_length=1, title="owner", alias='_owner_name')
    channel_uid: str = Field(..., min_length=1, title="channel_uid", alias='_channel_uid')
    device_id: str = Field(..., min_length=1, title='device_id', alias='_device_id')
    district_server_id: int = Field(..., title="区服id", alias='_district_server_id')  # district/server id
    role_create_time: int = Field(..., title="角色创建时间")  # role creation time
    is_new_device: int = Field(None, title="新设备")  # new-device flag
    is_new_channel_uid: int = Field(None, title="新账号")  # new-account flag

    @classmethod
    def get_fields(cls):
        """Return the ``alias`` of every entry in ``cls.__fields__``, in that order."""
        aliases = []
        for field in cls.__fields__.values():
            aliases.append(field.alias)
        return aliases
def cleaning(self, cursor_list):
    """Run the summary handlers for every non-empty cursor window.

    Args:
        cursor_list: iterable of dicts whose values are windows with
            ``cursor_st`` / ``cursor_et`` keys.

    For each window, the roles created inside it are loaded from
    ``self.source_coll`` and passed to ``handler_summary``; the incremental
    handlers run as well when ``self.is_inc`` is set; finally the cursor is
    stored through ``set_cursor``.
    """
    for cursor in cursor_list:  # type: dict
        for window in cursor.values():  # type: dict
            if window['cursor_st'] == window['cursor_et']:
                # Zero-length window: nothing to process.
                continue
            logger.info(f'开始处理{self.game_name} 处理 游标 {window}')
            # Roles created within this window, excluding internal owners.
            created_in_window = {
                '$gte': window['cursor_st'],
                '$lt': window['cursor_et'],
            }
            where = {
                'role_create_time': created_in_window,
                '_owner_name': {'$nin': ['dev', 'banshu', 'tishen']},
                '_platform': {'$exists': True},
                '_channel_name': {'$exists': True}
            }
            projection = self.Model.get_fields()
            records = self.local_db[self.source_coll].find(where, projection)
            user_df = pd.DataFrame(records)
            self.handler_summary(user_df, window['cursor_st'])
            if self.is_inc:
                self.handler_summary_inc(window['cursor_st'])
            self.set_cursor(cursor_st=window['cursor_st'], cursor_et=window['cursor_et'])
def handler_summary_inc(self, cdate):
    """Call every handler registered in ``HandlerWrapperInc.handler_link``.

    Handlers run in registration order and each receives ``(self, cdate)``.
    """
    for handler in HandlerWrapperInc.handler_link:
        handler(self, cdate)
@HandlerWrapperInc
def handler_inc_init(self, cdate):
    """
    Set initial values.

    For every role/account/device summary document whose ``cdate`` lies in
    ``[min_ts, cdate)``, copy the previous day's cumulative money into the
    field for the document's current age
    (``<cat>_all_money_<age_day>``), in both the local and remote databases.

    If the field for ``age_day - 1`` is missing, the most recent earlier
    ``<cat>_all_money_<n>`` present on the document is used instead of raising
    ``KeyError``; a document with no such field at all is skipped.
    """
    yesterday_ts = int(pd.Timestamp(cdate, unit='s', tz=self.timezone).normalize().timestamp())
    min_ts = yesterday_ts - max(RETAIN_DAYS + LTV_DAYS) * 86400

    def update(cat):
        cursor = self.local_db[f'summary_{cat}1'].find({'cdate': {'$gte': min_ts, '$lt': cdate}})
        for doc in cursor:
            age_day = (cdate - doc['cdate']) // 86400 + 1
            # Walk back from age_day - 1 to day 1 and take the first field that exists.
            prev_key = next(
                (key for key in (f'{cat}_all_money_{day}' for day in range(age_day - 1, 0, -1))
                 if key in doc),
                None,
            )
            if prev_key is None:
                continue
            data = {f'{cat}_all_money_{age_day}': doc[prev_key]}
            self.local_db[f'summary_{cat}1'].update_one({'_id': doc['_id']}, {'$set': data})
            self.remote_db[f'summary_{cat}1'].update_one({'_id': doc['_id']}, {'$set': data})

    update('role')
    update('account')
    update('device')
@HandlerWrapperInc
def handler_money_inc(self, cdate):
    """Fold the payments recorded for ``cdate``'s day into the cohort summaries.

    Steps, per category (role, account, device):
      1. For every id that paid on that day, find the summary document (within
         the tracked age range) that lists the id in ``new_<cat>_list`` and add
         it to ``now_pay_<cat>_list`` / ``now_pay_<cat>``.
      2. For every document touched, re-aggregate ``money`` from
         ``summary_pay`` between the document's ``cdate`` and ``cdate`` for its
         paying ids and store it as ``<cat>_all_money_<age_day>``.

    All writes go to both the local and the remote database.
    """
    yesterday_ts = int(pd.Timestamp(cdate, unit='s', tz=self.timezone).normalize().timestamp())
    df = pd.DataFrame(self.local_db['summary_pay'].find({'cdate': yesterday_ts}, {
        '_id': False,
        '_game_role_id': True,
        '_channel_uid': True,
        '_device_id': True,
        'role_create_time': True,
    }))
    if df.empty:
        return
    role_list = list(df['_game_role_id'].unique())
    account_list = list(df['_channel_uid'].unique())
    device_list = list(df['_device_id'].unique())
    min_ts = yesterday_ts - max(RETAIN_DAYS + LTV_DAYS) * 86400
    # summary_pay field holding the id for each category. These are the same
    # fields the id lists above were read from; the previous f'_game_{cat}_id'
    # only produced a real field name for 'role'.
    pay_id_fields = {
        'role': '_game_role_id',
        'account': '_channel_uid',
        'device': '_device_id',
    }

    def update(cat, cat_list):
        id_field = pay_id_fields[cat]
        update_doc = dict()
        for cid in cat_list:
            data = self.local_db[f'summary_{cat}1'].find_one(
                {'cdate': {'$gte': min_ts, '$lte': yesterday_ts}, f'new_{cat}_list': cid}, {
                    '_id': True,
                    'cdate': True,
                    f'now_pay_{cat}_list': True,
                })
            if not data:
                continue
            # Same dict object is mutated below, so update_doc sees the new list.
            update_doc[data['_id']] = data
            # Add the newly paying id
            data[f'now_pay_{cat}_list'] = list(set(data[f'now_pay_{cat}_list']) | {cid})
            data[f'now_pay_{cat}'] = len(data[f'now_pay_{cat}_list'])
            self.local_db[f'summary_{cat}1'].update_one({'_id': data['_id']}, {'$set': data})
            self.remote_db[f'summary_{cat}1'].update_one({'_id': data['_id']}, {'$set': data})
        # Recompute cumulative payments
        for id_, doc in update_doc.items():
            pipeline = [
                {
                    '$match': {
                        "cdate": {'$gte': doc['cdate'], '$lte': cdate},
                        id_field: {'$in': doc[f'now_pay_{cat}_list']},
                    }
                },
                {'$group': {'_id': None, 'sum_money': {'$sum': '$money'}}}
            ]
            try:
                sum_money = self.local_db['summary_pay'].aggregate(pipeline).next()
            except StopIteration:
                # No matching payments: leave the stored value untouched.
                continue
            age_day = (cdate - doc['cdate']) // 86400 + 1
            data = {f'{cat}_all_money_{age_day}': sum_money['sum_money']}
            self.local_db[f'summary_{cat}1'].update_one({'_id': id_}, {'$set': data})
            self.remote_db[f'summary_{cat}1'].update_one({'_id': id_}, {'$set': data})

    update('role', role_list)
    update('account', account_list)
    update('device', device_list)
@HandlerWrapperInc
def handler_login_inc(self, cdate):
    """Update per-age login counters from the logins recorded for ``cdate``'s day.

    Login rows with a positive ``role_create_time`` are bucketed by the
    midnight timestamp of that creation time. For every bucket and every
    category (role, account, device) the summary documents with that ``cdate``
    get ``pay_<cat>_login_<age_day>`` and ``<cat>_login_<age_day>`` set to the
    number of login rows whose id is in ``now_pay_<cat>_list`` /
    ``new_<cat>_list``. Zero counts are not written.
    """
    yesterday_ts = int(pd.Timestamp(cdate, unit='s', tz=self.timezone).normalize().timestamp())
    wanted_fields = {
        '_id': False,
        '_game_role_id': True,
        '_channel_uid': True,
        '_device_id': True,
        'role_create_time': True,
    }
    df = pd.DataFrame(self.local_db['summary_login'].find({'cdate': yesterday_ts}, wanted_fields))
    if df.shape == (0, 0):
        return
    df = df[df['role_create_time'] > 0]
    df['role_cdate'] = df['role_create_time'].apply(
        lambda x: int(pd.Timestamp(x, unit='s', tz=self.timezone).normalize().timestamp()))
    df.rename(columns={'_game_role_id': 'role_id', '_channel_uid': 'account_id', '_device_id': 'device_id'},
              inplace=True)

    def refresh(cat, cohort_ts):
        coll_name = f'summary_{cat}1'
        age_day = (cdate - cohort_ts) // 86400 + 1
        cohort_docs = self.local_db[coll_name].find(
            {'cdate': cohort_ts}, {
                '_id': True,
                f'now_pay_{cat}_list': True,
                f'new_{cat}_list': True,
            })
        for doc in cohort_docs:
            changes = dict()
            # len() keeps the counts as plain Python ints.
            paying_hits = len(df[df[f'{cat}_id'].isin(doc[f'now_pay_{cat}_list'])])
            if paying_hits:
                changes[f'pay_{cat}_login_{age_day}'] = paying_hits
            cohort_hits = len(df[df[f'{cat}_id'].isin(doc[f'new_{cat}_list'])])
            if cohort_hits:
                changes[f'{cat}_login_{age_day}'] = cohort_hits
            if not changes:
                continue
            self.local_db[coll_name].update_one({'_id': doc['_id']}, {'$set': changes})
            self.remote_db[coll_name].update_one({'_id': doc['_id']}, {'$set': changes})

    for raw_ts in df['role_cdate'].unique():
        cohort_ts = int(raw_ts)
        for cat in ('role', 'account', 'device'):
            refresh(cat, cohort_ts)
def handler_summary(self, user_df, cdate):
    """Build and process one summary per (owner, channel, platform) group.

    Rows sharing date, owner, channel and platform form one document. For each
    group, fresh role/device/account structures are created and every handler
    in ``HandlerWrapper.handler_link`` is called, in registration order, with
    ``(self, role_data, device_data, account_data, df, group, cdate)``.
    Returns immediately when ``user_df`` has shape ``(0, 0)``.
    """
    if user_df.shape == (0, 0):
        return
    grouped = user_df.groupby(['_owner_name', '_channel_name', '_platform'])
    for key in grouped.groups:
        members = grouped.get_group(key)
        role_data = role_struct(cdate, *key)
        device_data = device_struct(cdate, *key)
        account_data = account_struct(cdate, *key)
        for handler in HandlerWrapper.handler_link:
            handler(self, role_data, device_data, account_data, members, key, cdate)
@HandlerWrapper
def handler_now(self, role_data, device_data, account_data, df, group, cdate):
    """Fill the new-role / new-device / new-account id lists and their counts.

    Roles are all distinct ``_game_role_id`` values in ``df``; devices and
    accounts are the distinct ids on rows flagged ``is_new_device == 1`` and
    ``is_new_channel_uid == 1`` respectively.
    """
    new_roles = list(df['_game_role_id'].unique())
    role_data['new_role_list'] = new_roles
    role_data['new_role'] = len(new_roles)

    new_devices = list(df.loc[df['is_new_device'] == 1, '_device_id'].unique())
    device_data['new_device_list'] = new_devices
    device_data['new_device'] = len(new_devices)

    new_accounts = list(df.loc[df['is_new_channel_uid'] == 1, '_channel_uid'].unique())
    account_data['new_account_list'] = new_accounts
    account_data['new_account'] = len(new_accounts)
@HandlerWrapper
def handler_pay(self, role_data, device_data, account_data, df, group, cdate):
    """Record which of the cohort's new roles/accounts/devices have paid.

    The distinct payer ids in ``summary_pay`` with ``cdate >= cdate`` are
    intersected with the cohort's ``new_*_list``; the result and its size are
    stored as ``now_pay_<cat>_list`` / ``now_pay_<cat>``.
    """
    since_cohort_day = {'cdate': {'$gte': cdate}}
    pay_coll = self.local_db['summary_pay']
    paid_devices = pay_coll.distinct('_device_id', since_cohort_day)
    paid_accounts = pay_coll.distinct('_channel_uid', since_cohort_day)
    paid_roles = pay_coll.distinct('_game_role_id', since_cohort_day)

    cohort_paid_devices = set(paid_devices) & set(device_data['new_device_list'])
    cohort_paid_accounts = set(paid_accounts) & set(account_data['new_account_list'])
    cohort_paid_roles = set(paid_roles) & set(role_data['new_role_list'])

    role_data['now_pay_role_list'] = list(cohort_paid_roles)
    role_data['now_pay_role'] = len(role_data['now_pay_role_list'])
    account_data['now_pay_account_list'] = list(cohort_paid_accounts)
    account_data['now_pay_account'] = len(account_data['now_pay_account_list'])
    device_data['now_pay_device_list'] = list(cohort_paid_devices)
    device_data['now_pay_device'] = len(device_data['now_pay_device_list'])
@HandlerWrapper
def handler_pay_login(self, role_data, device_data, account_data, df, group, cdate):
    """Count ``summary_login`` rows of paying cohort members for the current day.

    The window is ``[midnight today, midnight tomorrow)`` in ``self.timezone``,
    derived from the wall clock rather than from ``cdate``. Results go to
    ``pay_<cat>_now_login``.
    """
    # NOTE(review): this counts today's logins while handler_login counts
    # yesterday's — confirm which day is intended.
    # NOTE(review): if local_db is a PyMongo database, Collection.count is
    # deprecated in favour of count_documents — confirm driver version.
    now = pd.Timestamp(int(time.time()), unit='s', tz=self.timezone)
    st = int(now.normalize().timestamp())
    et = st + 86400
    targets = (
        (role_data, 'role', '_game_role_id'),
        (account_data, 'account', '_channel_uid'),
        (device_data, 'device', '_first_device_id'),
    )
    for data, cat, id_field in targets:
        data[f'pay_{cat}_now_login'] = self.local_db['summary_login'].count({
            'cdate': {'$gte': st, '$lt': et},
            id_field: {'$in': data[f'now_pay_{cat}_list']},
        })
@HandlerWrapper
def handler_login(self, role_data, device_data, account_data, df, group, cdate):
    """Count ``summary_login`` rows of new cohort members for yesterday.

    "Yesterday" is midnight of the current wall-clock day in ``self.timezone``
    minus 86400 seconds; ``cdate`` is not used. Results go to
    ``<cat>_now_login``.
    """
    today_midnight = pd.Timestamp(int(time.time()), unit='s', tz=self.timezone).normalize()
    ts = int(today_midnight.timestamp()) - 86400
    targets = (
        (role_data, 'role', '_game_role_id'),
        (account_data, 'account', '_channel_uid'),
        (device_data, 'device', '_first_device_id'),
    )
    for data, cat, id_field in targets:
        data[f'{cat}_now_login'] = self.local_db['summary_login'].count({
            'cdate': ts,
            id_field: {'$in': data[f'new_{cat}_list']},
        })
@HandlerWrapper
def handler_login_day(self, role_data, device_data, account_data, df, group, cdate):
    """Fill ``<cat>_login_<day>`` for every retention day that has fully elapsed.

    Day ``n`` maps to the date ``cdate + (n - 1)`` days; dates that are today
    or later (wall clock, ``self.timezone``) are left at their current value.
    Each counter is the number of ``summary_login`` rows on that date whose id
    is in the cohort's ``new_<cat>_list``.
    """
    c_day = pd.Timestamp(cdate, unit='s', tz=self.timezone).normalize()
    today = pd.Timestamp(int(time.time()), unit='s', tz=self.timezone).normalize()
    targets = (
        (role_data, 'role', '_game_role_id'),
        (account_data, 'account', '_channel_uid'),
        (device_data, 'device', '_first_device_id'),
    )
    for day in RETAIN_DAYS:
        day_n = c_day + pd.Timedelta(days=(day - 1))
        if day_n < today:
            ts = int(day_n.timestamp())
            for data, cat, id_field in targets:
                data[f'{cat}_login_{day}'] = self.local_db['summary_login'].count({
                    'cdate': ts,
                    id_field: {'$in': data[f'new_{cat}_list']},
                })
@HandlerWrapper
def handler_login_day_pay(self, role_data, device_data, account_data, df, group, cdate):
    """Fill ``pay_<cat>_login_<day>`` for every retention day that has fully elapsed.

    Day ``n`` maps to the date ``cdate + (n - 1)`` days; dates that are today
    or later (wall clock, ``self.timezone``) are left at their current value.
    Each counter is the number of ``summary_login`` rows on that date whose id
    is in the cohort's ``now_pay_<cat>_list``.
    """
    c_day = pd.Timestamp(cdate, unit='s', tz=self.timezone).normalize()
    today = pd.Timestamp(int(time.time()), unit='s', tz=self.timezone).normalize()
    targets = (
        (role_data, 'role', '_game_role_id'),
        (account_data, 'account', '_channel_uid'),
        (device_data, 'device', '_first_device_id'),
    )
    for day in RETAIN_DAYS:
        day_n = c_day + pd.Timedelta(days=(day - 1))
        if day_n < today:
            ts = int(day_n.timestamp())
            for data, cat, id_field in targets:
                data[f'pay_{cat}_login_{day}'] = self.local_db['summary_login'].count({
                    'cdate': ts,
                    id_field: {'$in': data[f'now_pay_{cat}_list']},
                })
@HandlerWrapper
def handler_money_n(self, role_data, device_data, account_data, df, group, cdate):
    """Fill ``<cat>_all_money_<day>`` for every LTV day that has fully elapsed.

    Day ``n`` maps to the date ``cdate + (n - 1)`` days; dates that are today
    or later (wall clock, ``self.timezone``) are skipped. Each value is the sum
    of ``money`` in ``summary_pay`` from ``cdate`` through that date for the
    ids in ``now_pay_<cat>_list``, or 0 when nothing matches.
    """
    c_day = pd.Timestamp(cdate, unit='s', tz=self.timezone).normalize()
    today = pd.Timestamp(int(time.time()), unit='s', tz=self.timezone).normalize()
    targets = (
        (role_data, 'role', '_game_role_id'),
        (account_data, 'account', '_channel_uid'),
        (device_data, 'device', '_first_device_id'),
    )
    for day in LTV_DAYS:
        day_n = c_day + pd.Timedelta(days=(day - 1))
        if day_n >= today:
            continue
        ts = int(day_n.timestamp())
        for data, cat, id_field in targets:
            pipeline = [
                {'$match': {'cdate': {'$gte': cdate, '$lte': ts},
                            id_field: {'$in': data[f'now_pay_{cat}_list']}
                            }
                 },
                {'$group': {'_id': None, 'sum_money': {'$sum': '$money'}}}
            ]
            rows = list(self.local_db['summary_pay'].aggregate(pipeline))
            # An empty result means no payments matched.
            total = rows[0].get('sum_money', 0) if rows else 0
            data[f'{cat}_all_money_{day}'] = total
@HandlerWrapper
def save_today(self, role_data, device_data, account_data, df, group, cdate):
    """Upsert the three summary documents into the local and remote databases.

    A category is skipped when its ``new_<cat>`` count is falsy. Documents are
    keyed by owner, channel, platform (``group[0..2]``) and ``cdate``.
    """
    def doc_key():
        return {'_owner_name': group[0], '_channel_name': group[1], '_platform': group[2], 'cdate': cdate}

    def persist(cat, data):
        if not data.get(f'new_{cat}'):
            return
        coll_name = f'summary_{cat}1'
        result = self.local_db[coll_name].update_one(doc_key(), {'$set': data}, upsert=True)
        if result.upserted_id:
            # Carry the locally generated _id into the remote write
            # (presumably to keep both copies under the same _id).
            data['_id'] = result.upserted_id
        self.remote_db[coll_name].update_one(doc_key(), {'$set': data}, upsert=True)

    for cat, data in (('role', role_data), ('device', device_data), ('account', account_data)):
        persist(cat, data)