Compare commits
6 Commits
b30a18a2fe
...
6cb5391eb0
Author | SHA1 | Date | |
---|---|---|---|
6cb5391eb0 | |||
c445cc0cb8 | |||
019b16a39a | |||
1f289c5ca9 | |||
1091240d45 | |||
fbf375bd45 |
@ -15,7 +15,7 @@
|
||||
"task_name": "repair_gunfu"
|
||||
},
|
||||
"summary_login": {
|
||||
"source_coll": "event",
|
||||
"source_coll": "user",
|
||||
"dest_coll": "summary_login",
|
||||
"task_name": "summary_login"
|
||||
},
|
||||
@ -43,7 +43,8 @@
|
||||
"source_coll": "user",
|
||||
"dest_coll": "",
|
||||
"task_name": "summary3",
|
||||
"freq": "D"
|
||||
"freq": "D",
|
||||
"is_inc": true
|
||||
},
|
||||
"sync_user": {
|
||||
"source_coll": "user",
|
||||
|
@ -1,6 +1,6 @@
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from model.field_type import MdbObjectId
|
||||
from model.field_type import MdbObjectId, IntStr
|
||||
|
||||
|
||||
class GBaseModel(BaseModel):
|
||||
@ -16,7 +16,7 @@ class GBaseModel(BaseModel):
|
||||
role_create_time: int = Field(None, title="角色创建时间")
|
||||
role_level: int = Field(None, title="角色等级")
|
||||
role_vip: int = Field(None, title="角色vip等级")
|
||||
role_stage: int = Field(None, title="关卡")
|
||||
role_stage: IntStr = Field(None, title="关卡")
|
||||
|
||||
@classmethod
|
||||
def get_fields(cls):
|
||||
|
@ -75,9 +75,28 @@ class AddUserFlag(Task):
|
||||
bulk_data.append(
|
||||
UpdateOne({'_game_role_id': model.game_role_id},
|
||||
{'$set': {'is_new_device': 1}}))
|
||||
except Exception as e:
|
||||
logger.error(f'msg:{e}')
|
||||
pass
|
||||
|
||||
# 记录第一次登录设备id
|
||||
where = {
|
||||
'role_create_time': {
|
||||
'$gte': ts['cursor_st'],
|
||||
'$lt': ts['cursor_et'],
|
||||
},
|
||||
'_first_device_id': ''
|
||||
}
|
||||
for item in self.local_db[source_coll].find(where, projection):
|
||||
try:
|
||||
# 新设备
|
||||
model = self.Model(**item)
|
||||
bulk_data.append(
|
||||
UpdateOne({'_game_role_id': model.game_role_id}, {'$set': {'_first_device_id': model.device_id}}))
|
||||
except Exception as e:
|
||||
logger.error(f'msg:{e}')
|
||||
# pass
|
||||
|
||||
if bulk_data:
|
||||
self.local_db[self.dest_coll].bulk_write(bulk_data, ordered=False)
|
||||
self.remote_db[self.dest_coll].bulk_write(bulk_data, ordered=False)
|
||||
|
521
task/summary3.py
Normal file
521
task/summary3.py
Normal file
@ -0,0 +1,521 @@
|
||||
import time
|
||||
|
||||
from pydantic import Field, BaseModel
|
||||
import pandas as pd
|
||||
|
||||
from .task import Task
|
||||
from utils import *
|
||||
import numpy as np
|
||||
|
||||
# Day offsets (1 = cohort creation day) at which cumulative money is tracked:
# every day up to 60, every 5th day up to 90, then 100/110/120, then every
# 30th day up to 360.
LTV_DAYS = [*range(1, 61), *range(65, 91, 5), 100, 110, 120, *range(150, 361, 30)]

# Day offsets at which retention (login) is tracked: same schedule as
# LTV_DAYS without day 1.
RETAIN_DAYS = LTV_DAYS[1:]
|
||||
|
||||
|
||||
def role_struct(cdate, owner, channel, platform):
    """Build the zero-initialised role summary document for one cohort.

    Args:
        cdate: value stored under ``cdate``.
        owner: value stored under ``_owner_name``.
        channel: value stored under ``_channel_name``.
        platform: value stored under ``_platform``.

    Returns:
        dict with the identifying fields, empty id lists, zeroed counters and
        one zeroed ``role_login_N`` / ``pay_role_login_N`` /
        ``role_all_money_N`` entry per tracked day.
    """
    doc = {
        'cdate': cdate,
        '_channel_name': channel,
        '_owner_name': owner,
        '_platform': platform,
        'new_role_list': [],
        'new_role': 0,
        'now_pay_role_list': [],
        'now_pay_role': 0,
        'role_now_login': 0,
        'pay_role_now_login': 0,
        'now_pay_money': 0,
    }

    # NOTE(review): role_login_* is keyed by LTV_DAYS here, whereas
    # device_struct/account_struct key their *_login_* counters by
    # RETAIN_DAYS -- confirm this difference is intentional.
    per_day_counters = (
        ('role_login_', LTV_DAYS),
        ('pay_role_login_', RETAIN_DAYS),
        ('role_all_money_', LTV_DAYS),
    )
    for prefix, days in per_day_counters:
        for day in days:
            doc[prefix + str(day)] = 0

    return doc
|
||||
|
||||
|
||||
def device_struct(cdate, owner, channel, platform):
    """Build the zero-initialised device summary document for one cohort.

    Args:
        cdate: value stored under ``cdate``.
        owner: value stored under ``_owner_name``.
        channel: value stored under ``_channel_name``.
        platform: value stored under ``_platform``.

    Returns:
        dict with the identifying fields, empty id lists, zeroed counters and
        one zeroed ``device_login_N`` / ``pay_device_login_N`` (per
        ``RETAIN_DAYS``) and ``device_all_money_N`` (per ``LTV_DAYS``) entry.
    """
    doc = {
        'cdate': cdate,
        '_channel_name': channel,
        '_owner_name': owner,
        '_platform': platform,
        'new_device_list': [],
        'new_device': 0,
        'now_pay_device_list': [],
        'now_pay_device': 0,
        'device_now_login': 0,
        'pay_device_now_login': 0,
        'now_pay_money': 0,
    }

    per_day_counters = (
        ('device_login_', RETAIN_DAYS),
        ('pay_device_login_', RETAIN_DAYS),
        ('device_all_money_', LTV_DAYS),
    )
    for prefix, days in per_day_counters:
        for day in days:
            doc[prefix + str(day)] = 0

    return doc
|
||||
|
||||
|
||||
def account_struct(cdate, owner, channel, platform):
    """Build the zero-initialised account summary document for one cohort.

    Args:
        cdate: value stored under ``cdate``.
        owner: value stored under ``_owner_name``.
        channel: value stored under ``_channel_name``.
        platform: value stored under ``_platform``.

    Returns:
        dict with the identifying fields, empty id lists, zeroed counters and
        one zeroed ``account_login_N`` / ``pay_account_login_N`` (per
        ``RETAIN_DAYS``) and ``account_all_money_N`` (per ``LTV_DAYS``) entry.
    """
    doc = {
        'cdate': cdate,
        '_channel_name': channel,
        '_owner_name': owner,
        '_platform': platform,
        'new_account_list': [],
        'new_account': 0,
        'now_pay_account_list': [],
        'now_pay_account': 0,
        'account_now_login': 0,
        'pay_account_now_login': 0,
        'now_pay_money': 0,
    }

    per_day_counters = (
        ('account_login_', RETAIN_DAYS),
        ('pay_account_login_', RETAIN_DAYS),
        ('account_all_money_', LTV_DAYS),
    )
    for prefix, days in per_day_counters:
        for day in days:
            doc[prefix + str(day)] = 0

    return doc
|
||||
|
||||
|
||||
class HandlerWrapper:
    """Decorator that registers a function in ``HandlerWrapper.handler_link``.

    The undecorated function is appended to the class-level registry at
    decoration time, so the registry preserves definition order.

    When used on a function defined in a class body, the resulting attribute
    is a ``HandlerWrapper`` instance; it defines no ``__get__``, so it is not
    bound to instances and ``self`` must be passed explicitly by the caller.
    """

    # Shared by all decorated functions; holds the raw (unwrapped) callables.
    handler_link = []

    def __init__(self, func):
        self.func = func
        HandlerWrapper.handler_link.append(func)

    def __call__(self, *args, **kwargs):
        # Propagate the wrapped function's result instead of discarding it.
        return self.func(*args, **kwargs)
|
||||
|
||||
|
||||
class HandlerWrapperInc:
    """Decorator that registers a function in ``HandlerWrapperInc.handler_link``.

    The undecorated function is appended to the class-level registry at
    decoration time, so the registry preserves definition order.

    When used on a function defined in a class body, the resulting attribute
    is a ``HandlerWrapperInc`` instance; it defines no ``__get__``, so it is
    not bound to instances and ``self`` must be passed explicitly by the
    caller.
    """

    # Shared by all decorated functions; holds the raw (unwrapped) callables.
    handler_link = []

    def __init__(self, func):
        self.func = func
        HandlerWrapperInc.handler_link.append(func)

    def __call__(self, *args, **kwargs):
        # Propagate the wrapped function's result instead of discarding it.
        return self.func(*args, **kwargs)
|
||||
|
||||
|
||||
class Summary3(Task):
|
||||
"""
|
||||
Summary3
|
||||
"""
|
||||
|
||||
handler_link = []
|
||||
|
||||
# Day-granularity cursor handling.
def get_cursor(self):
    """Default the start cursor to 24 hours ago, then run the parent's logic.

    ``self.cursor_st`` is only replaced when it is falsy (unset or 0).
    """
    self.cursor_st = self.cursor_st or int(time.time()) - 86400
    super().get_cursor()
|
||||
|
||||
def generate_cursor_time(self):
    """Split the cursor range into consecutive ``[st, et)`` windows.

    ``self.cursor_st`` and ``self.cursor_et`` (epoch seconds) are normalised
    to midnight in ``self.timezone`` and the span between them is cut at
    ``self.freq``.

    Returns:
        pandas.DataFrame indexed by window start, with columns ``st`` and
        ``et``. The last window always ends at the normalised end cursor.
        The frame is empty when the range yields fewer than two boundaries
        (e.g. both cursors fall on the same day).
    """
    start = pd.Timestamp(self.cursor_st, unit='s', tz=self.timezone).normalize()
    end = pd.Timestamp(self.cursor_et, unit='s', tz=self.timezone).normalize()
    date_index = pd.date_range(start, end, freq=self.freq)

    # Every boundary except the last one opens a window.
    df = pd.DataFrame(index=date_index[:-1])
    df['st'] = df.index
    if len(df.index):
        # A window closes where the next one opens; the final one closes at
        # the normalised end cursor.
        df['et'] = np.append(df.index[1:], [end])
    else:
        # No window at all: unconditionally appending ``end`` would produce
        # one value for zero rows and raise a length-mismatch ValueError.
        df['et'] = df.index
    return df
|
||||
|
||||
class Model(BaseModel):
    """Validated view of one user document; aliases are the stored field names."""

    # Required identity / attribution fields.
    game_role_id: str = Field(..., title="角色id", alias='_game_role_id')
    platform: str = Field(..., min_length=1, title="平台", alias='_platform')
    channel_name: str = Field(..., min_length=1, title="channel", alias='_channel_name')
    owner_name: str = Field(..., min_length=1, title="owner", alias='_owner_name')
    channel_uid: str = Field(..., min_length=1, title="channel_uid", alias='_channel_uid')
    device_id: str = Field(..., min_length=1, title='device_id', alias='_device_id')
    district_server_id: int = Field(..., title="区服id", alias='_district_server_id')
    role_create_time: int = Field(..., title="角色创建时间")

    # Optional flags (default None).
    is_new_device: int = Field(None, title="新设备")
    is_new_channel_uid: int = Field(None, title="新账号")

    @classmethod
    def get_fields(cls):
        """Return the alias of every declared field, in declaration order."""
        return [field.alias for field in cls.__fields__.values()]
|
||||
|
||||
def cleaning(self, cursor_list):
    """Run the incremental summary handlers for every cursor window.

    Args:
        cursor_list: list of dicts whose values are windows of the form
            ``{'cursor_st': ..., 'cursor_et': ...}``. The dict keys are not
            used here.

    For each non-empty window the incremental handlers are run (only when the
    task was configured with a truthy ``is_inc``) and the cursor is then
    advanced with ``set_cursor``.

    The full rebuild (``handler_summary``) is disabled in this method. The
    query that used to feed it is therefore no longer executed; it selected
    users whose ``role_create_time`` lies in the window, excluding owners
    ``dev`` / ``banshu`` / ``tishen`` and requiring ``_platform`` and
    ``_channel_name`` to exist, projected to ``self.Model.get_fields()``.
    """
    for cursor in cursor_list:  # type: dict
        for ts in cursor.values():  # type: dict
            window_st, window_et = ts['cursor_st'], ts['cursor_et']
            if window_st == window_et:
                # Empty window: nothing to process, cursor stays where it is.
                continue
            logger.info(f'开始处理{self.game_name} 处理 游标 {ts}')

            # ``is_inc`` only exists when the task config provides it.
            if getattr(self, 'is_inc', False):
                self.handler_summary_inc(window_st)

            self.set_cursor(cursor_st=window_st, cursor_et=window_et)
|
||||
|
||||
def handler_summary_inc(self, cdate):
    """Call every handler registered via ``HandlerWrapperInc`` with ``(self, cdate)``.

    Handlers run in registration order; their return values are ignored.
    """
    for handler in HandlerWrapperInc.handler_link:
        handler(self, cdate)
|
||||
|
||||
@HandlerWrapperInc
def handler_inc_init(self, cdate):
    """Seed each cohort's cumulative-money column for ``cdate``.

    For every ``summary_{role,account,device}1`` document whose ``cdate`` lies
    within the tracked look-back period and before ``cdate``, the column
    ``{cat}_all_money_{age_day}`` is set to the value of the most recent
    earlier day column present in the document, and the change is written to
    both the local and the remote database.

    Args:
        cdate: start of the processed window, epoch seconds.
    """
    day_start = int(pd.Timestamp(cdate, unit='s', tz=self.timezone).normalize().timestamp())
    oldest_cdate = day_start - max(RETAIN_DAYS + LTV_DAYS) * 86400

    def carry_forward(cat):
        local_coll = self.local_db[f'summary_{cat}1']
        remote_coll = self.remote_db[f'summary_{cat}1']
        for doc in local_coll.find({'cdate': {'$gte': oldest_cdate, '$lt': cdate}}):
            age_day = (cdate - doc['cdate']) // 86400 + 1
            # Normally the previous day's column exists. After a gap in runs
            # it may not, so walk back to the latest column that does instead
            # of failing with KeyError.
            earlier_keys = (f'{cat}_all_money_{day}' for day in range(age_day - 1, 0, -1))
            source_key = next((key for key in earlier_keys if key in doc), None)
            if source_key is None:
                continue
            data = {f'{cat}_all_money_{age_day}': doc[source_key]}
            local_coll.update_one({'_id': doc['_id']}, {'$set': data})
            remote_coll.update_one({'_id': doc['_id']}, {'$set': data})

    for cat in ('role', 'account', 'device'):
        carry_forward(cat)
|
||||
|
||||
@HandlerWrapperInc
def handler_money_inc(self, cdate):
    """Fold the day's payments into the cohort summary documents.

    Reads the ``summary_pay`` rows whose ``cdate`` equals the normalised day
    of ``cdate``. For each paying role / account / device it finds the cohort
    document that lists the id as new, adds the id to that document's
    ``now_pay_{cat}_list``, and then recomputes the cohort's cumulative money
    for the current age day. Every write goes to both the local and the
    remote database.

    Args:
        cdate: start of the processed window, epoch seconds.
    """
    day_start = int(pd.Timestamp(cdate, unit='s', tz=self.timezone).normalize().timestamp())

    pay_df = pd.DataFrame(self.local_db['summary_pay'].find({'cdate': day_start}, {
        '_id': False,
        '_game_role_id': True,
        '_channel_uid': True,
        '_device_id': True,
        'role_create_time': True,
    }))
    if pay_df.empty:
        return

    oldest_cdate = day_start - max(RETAIN_DAYS + LTV_DAYS) * 86400

    # summary_pay field that identifies each kind of entity. The previous
    # code derived the field as f'_game_{cat}_id', which only exists for
    # roles, so account and device totals never matched any payment.
    # NOTE(review): handler_money_n matches devices on '_first_device_id';
    # '_device_id' is used here because the payer ids above come from that
    # column -- confirm which one is intended.
    id_fields = (
        ('role', '_game_role_id'),
        ('account', '_channel_uid'),
        ('device', '_device_id'),
    )

    def update(cat, id_field):
        if id_field not in pay_df.columns:
            return
        local_coll = self.local_db[f'summary_{cat}1']
        remote_coll = self.remote_db[f'summary_{cat}1']
        list_key = f'now_pay_{cat}_list'

        # Cohort documents touched today, keyed by _id.
        touched = {}
        for cid in pay_df[id_field].dropna().unique().tolist():
            cohort = local_coll.find_one(
                {'cdate': {'$gte': oldest_cdate, '$lte': day_start}, f'new_{cat}_list': cid},
                {'_id': True, 'cdate': True, list_key: True})
            if not cohort:
                continue

            # Register the id as a paying member of its cohort.
            payers = list(set(cohort.get(list_key, [])) | {cid})
            changes = {list_key: payers, f'now_pay_{cat}': len(payers)}
            local_coll.update_one({'_id': cohort['_id']}, {'$set': changes})
            remote_coll.update_one({'_id': cohort['_id']}, {'$set': changes})

            cohort[list_key] = payers
            touched[cohort['_id']] = cohort

        # Recompute cumulative money for every touched cohort.
        for doc_id, cohort in touched.items():
            pipeline = [
                {
                    '$match': {
                        'cdate': {'$gte': cohort['cdate'], '$lte': cdate},
                        id_field: {'$in': cohort[list_key]},
                    }
                },
                {'$group': {'_id': None, 'sum_money': {'$sum': '$money'}}},
            ]
            total = next(self.local_db['summary_pay'].aggregate(pipeline), None)
            if not total:
                continue
            age_day = (cdate - cohort['cdate']) // 86400 + 1
            data = {f'{cat}_all_money_{age_day}': total['sum_money']}
            local_coll.update_one({'_id': doc_id}, {'$set': data})
            remote_coll.update_one({'_id': doc_id}, {'$set': data})

    for cat, id_field in id_fields:
        update(cat, id_field)
|
||||
|
||||
@HandlerWrapperInc
def handler_login_inc(self, cdate):
    """Record the day's logins on the matching cohort summary documents.

    Reads the ``summary_login`` rows whose ``cdate`` equals the normalised day
    of ``cdate``, groups them by the creation day of the role, and for every
    ``summary_{role,account,device}1`` document of that creation day stores
    how many of today's login rows belong to its new members
    (``{cat}_login_{age_day}``) and to its paying members
    (``pay_{cat}_login_{age_day}``). Zero counts are not written. Every write
    goes to both the local and the remote database.

    Args:
        cdate: start of the processed window, epoch seconds.
    """
    day_start = int(pd.Timestamp(cdate, unit='s', tz=self.timezone).normalize().timestamp())

    login_df = pd.DataFrame(self.local_db['summary_login'].find({'cdate': day_start}, {
        '_id': False,
        '_game_role_id': True,
        '_channel_uid': True,
        '_device_id': True,
        'role_create_time': True,
    }))
    if login_df.empty:
        return

    # Copy so that the column assignment below works on an independent frame
    # rather than on a slice of the query result.
    login_df = login_df[login_df['role_create_time'] > 0].copy()
    login_df['role_cdate'] = login_df['role_create_time'].apply(
        lambda x: int(pd.Timestamp(x, unit='s', tz=self.timezone).normalize().timestamp()))
    login_df.rename(
        columns={'_game_role_id': 'role_id', '_channel_uid': 'account_id', '_device_id': 'device_id'},
        inplace=True)

    def update(cat, role_cdate):
        local_coll = self.local_db[f'summary_{cat}1']
        remote_coll = self.remote_db[f'summary_{cat}1']
        age_day = (cdate - role_cdate) // 86400 + 1
        ids_today = login_df[f'{cat}_id']
        cohorts = local_coll.find({'cdate': role_cdate}, {
            '_id': True,
            f'now_pay_{cat}_list': True,
            f'new_{cat}_list': True,
        })
        for doc in cohorts:
            data = {}
            # NOTE(review): these count login rows, not distinct ids --
            # confirm that is the intended metric for accounts and devices.
            pay_logins = int(ids_today.isin(doc.get(f'now_pay_{cat}_list', [])).sum())
            if pay_logins:
                data[f'pay_{cat}_login_{age_day}'] = pay_logins

            new_logins = int(ids_today.isin(doc.get(f'new_{cat}_list', [])).sum())
            if new_logins:
                data[f'{cat}_login_{age_day}'] = new_logins

            if data:
                local_coll.update_one({'_id': doc['_id']}, {'$set': data})
                remote_coll.update_one({'_id': doc['_id']}, {'$set': data})

    for role_cdate in login_df['role_cdate'].unique():
        # Pass the creation day explicitly (as a plain int) instead of
        # relying on the closure picking up the loop variable.
        role_cdate = int(role_cdate)
        for cat in ('role', 'account', 'device'):
            update(cat, role_cdate)
|
||||
|
||||
def handler_summary(self, user_df, cdate):
    """Run every ``HandlerWrapper`` handler once per owner/channel/platform group.

    Users sharing the same ``_owner_name``, ``_channel_name`` and
    ``_platform`` (for the given ``cdate``) form one summary document. For
    each group fresh role/device/account documents are created and handed,
    together with the group's rows, to the registered handlers in order.

    Args:
        user_df: DataFrame of user rows; a frame of shape ``(0, 0)`` is a no-op.
        cdate: value stored as ``cdate`` in the generated documents.
    """
    if user_df.shape == (0, 0):
        return

    grouped = user_df.groupby(['_owner_name', '_channel_name', '_platform'])
    for group in grouped.groups:
        members = grouped.get_group(group)
        role_data = role_struct(cdate, *group)
        device_data = device_struct(cdate, *group)
        account_data = account_struct(cdate, *group)
        for handler in HandlerWrapper.handler_link:
            handler(self, role_data, device_data, account_data, members, group, cdate)
|
||||
|
||||
@HandlerWrapper
def handler_now(self, role_data, device_data, account_data, df, group, cdate):
    """Fill the ``new_*_list`` / ``new_*`` fields from the group's user rows.

    Roles: all distinct ``_game_role_id`` values. Devices: distinct
    ``_device_id`` of rows with ``is_new_device == 1``. Accounts: distinct
    ``_channel_uid`` of rows with ``is_new_channel_uid == 1``.
    """
    new_roles = list(df['_game_role_id'].unique())
    role_data['new_role_list'] = new_roles
    role_data['new_role'] = len(new_roles)

    new_devices = list(df.loc[df['is_new_device'] == 1, '_device_id'].unique())
    device_data['new_device_list'] = new_devices
    device_data['new_device'] = len(new_devices)

    new_accounts = list(df.loc[df['is_new_channel_uid'] == 1, '_channel_uid'].unique())
    account_data['new_account_list'] = new_accounts
    account_data['new_account'] = len(new_accounts)
|
||||
|
||||
@HandlerWrapper
def handler_pay(self, role_data, device_data, account_data, df, group, cdate):
    """Fill ``now_pay_*_list`` / ``now_pay_*`` with the cohort members that paid.

    A member counts as paying when its id appears in any ``summary_pay`` row
    with ``cdate`` greater than or equal to the cohort's ``cdate``.
    """
    since_cohort_day = {'cdate': {'$gte': cdate}}
    pay_coll = self.local_db['summary_pay']

    targets = (
        ('device', '_device_id', device_data),
        ('account', '_channel_uid', account_data),
        ('role', '_game_role_id', role_data),
    )
    for cat, id_field, data in targets:
        every_payer = set(pay_coll.distinct(id_field, since_cohort_day))
        cohort_payers = every_payer & set(data[f'new_{cat}_list'])
        data[f'now_pay_{cat}_list'] = list(cohort_payers)
        data[f'now_pay_{cat}'] = len(data[f'now_pay_{cat}_list'])
|
||||
|
||||
@HandlerWrapper
def handler_pay_login(self, role_data, device_data, account_data, df, group, cdate):
    """Count the ``summary_login`` rows of the current day for paying members.

    The day is the current wall-clock day in ``self.timezone``
    (``[midnight, midnight + 24h)``), independent of ``cdate``. Results are
    stored under ``pay_{role,account,device}_now_login``.
    """
    st = int(pd.Timestamp(int(time.time()), unit='s', tz=self.timezone).normalize().timestamp())
    et = st + 86400
    login_coll = self.local_db['summary_login']

    targets = (
        ('role', '_game_role_id', role_data),
        ('account', '_channel_uid', account_data),
        ('device', '_first_device_id', device_data),
    )
    for cat, id_field, data in targets:
        # Collection.count() is deprecated and removed in PyMongo 4;
        # count_documents() is the supported equivalent for a filter.
        data[f'pay_{cat}_now_login'] = login_coll.count_documents({
            'cdate': {'$gte': st, '$lt': et},
            id_field: {'$in': data[f'now_pay_{cat}_list']},
        })
|
||||
|
||||
@HandlerWrapper
def handler_login(self, role_data, device_data, account_data, df, group, cdate):
    """Count the ``summary_login`` rows of the previous day for new members.

    The day is the one before the current wall-clock day in
    ``self.timezone``, independent of ``cdate``. Results are stored under
    ``{role,account,device}_now_login``.
    """
    today = int(pd.Timestamp(int(time.time()), unit='s', tz=self.timezone).normalize().timestamp())
    ts = today - 86400
    login_coll = self.local_db['summary_login']

    targets = (
        ('role', '_game_role_id', role_data),
        ('account', '_channel_uid', account_data),
        ('device', '_first_device_id', device_data),
    )
    for cat, id_field, data in targets:
        # Collection.count() is deprecated and removed in PyMongo 4;
        # count_documents() is the supported equivalent for a filter.
        data[f'{cat}_now_login'] = login_coll.count_documents({
            'cdate': ts,
            id_field: {'$in': data[f'new_{cat}_list']},
        })
|
||||
|
||||
@HandlerWrapper
def handler_login_day(self, role_data, device_data, account_data, df, group, cdate):
    """Fill ``{cat}_login_{day}`` for every retention day that has already passed.

    For each ``day`` in ``RETAIN_DAYS`` the target date is the cohort day plus
    ``day - 1`` days; dates that are today or later are skipped. The value is
    the number of ``summary_login`` rows on that date belonging to the
    cohort's new members.
    """
    c_day = pd.Timestamp(cdate, unit='s', tz=self.timezone).normalize()
    today = pd.Timestamp(int(time.time()), unit='s', tz=self.timezone).normalize()
    login_coll = self.local_db['summary_login']

    targets = (
        ('role', '_game_role_id', role_data),
        ('account', '_channel_uid', account_data),
        ('device', '_first_device_id', device_data),
    )
    for day in RETAIN_DAYS:
        day_n = c_day + pd.Timedelta(days=day - 1)
        if day_n >= today:
            continue

        ts = int(day_n.timestamp())
        for cat, id_field, data in targets:
            # Collection.count() is deprecated and removed in PyMongo 4;
            # count_documents() is the supported equivalent for a filter.
            data[f'{cat}_login_{day}'] = login_coll.count_documents({
                'cdate': ts,
                id_field: {'$in': data[f'new_{cat}_list']},
            })
|
||||
|
||||
@HandlerWrapper
def handler_login_day_pay(self, role_data, device_data, account_data, df, group, cdate):
    """Fill ``pay_{cat}_login_{day}`` for every retention day that has already passed.

    For each ``day`` in ``RETAIN_DAYS`` the target date is the cohort day plus
    ``day - 1`` days; dates that are today or later are skipped. The value is
    the number of ``summary_login`` rows on that date belonging to the
    cohort's paying members.
    """
    c_day = pd.Timestamp(cdate, unit='s', tz=self.timezone).normalize()
    today = pd.Timestamp(int(time.time()), unit='s', tz=self.timezone).normalize()
    login_coll = self.local_db['summary_login']

    targets = (
        ('role', '_game_role_id', role_data),
        ('account', '_channel_uid', account_data),
        ('device', '_first_device_id', device_data),
    )
    for day in RETAIN_DAYS:
        day_n = c_day + pd.Timedelta(days=day - 1)
        if day_n >= today:
            continue

        ts = int(day_n.timestamp())
        for cat, id_field, data in targets:
            # Collection.count() is deprecated and removed in PyMongo 4;
            # count_documents() is the supported equivalent for a filter.
            data[f'pay_{cat}_login_{day}'] = login_coll.count_documents({
                'cdate': ts,
                id_field: {'$in': data[f'now_pay_{cat}_list']},
            })
|
||||
|
||||
@HandlerWrapper
def handler_money_n(self, role_data, device_data, account_data, df, group, cdate):
    """Fill ``{cat}_all_money_{day}`` for every LTV day that has already passed.

    For each ``day`` in ``LTV_DAYS`` the target date is the cohort day plus
    ``day - 1`` days; dates that are today or later are skipped. The value is
    the sum of ``money`` over ``summary_pay`` rows from ``cdate`` up to and
    including the target date that belong to the cohort's paying members
    (0 when nothing matches).
    """
    c_day = pd.Timestamp(cdate, unit='s', tz=self.timezone).normalize()
    today = pd.Timestamp(int(time.time()), unit='s', tz=self.timezone).normalize()
    pay_coll = self.local_db['summary_pay']

    def money_until(ts, id_field, ids):
        # Total paid by ``ids`` in the closed range [cdate, ts].
        pipeline = [
            {'$match': {'cdate': {'$gte': cdate, '$lte': ts},
                        id_field: {'$in': ids}
                        }
             },
            {'$group': {'_id': None, 'sum_money': {'$sum': '$money'}}}
        ]
        rows = list(pay_coll.aggregate(pipeline)) or [{}]
        return rows[0].get('sum_money', 0)

    targets = (
        ('role', '_game_role_id', role_data),
        ('account', '_channel_uid', account_data),
        ('device', '_first_device_id', device_data),
    )
    for day in LTV_DAYS:
        day_n = c_day + pd.Timedelta(days=day - 1)
        if day_n >= today:
            continue

        ts = int(day_n.timestamp())
        for cat, id_field, data in targets:
            data[f'{cat}_all_money_{day}'] = money_until(ts, id_field, data[f'now_pay_{cat}_list'])
|
||||
|
||||
@HandlerWrapper
def save_today(self, role_data, device_data, account_data, df, group, cdate):
    """Upsert the group's summary documents into the local and remote databases.

    A document is skipped when its ``new_{cat}`` counter is falsy. Documents
    are keyed by owner, channel, platform and ``cdate``. When the remote
    document has to be created it receives the same ``_id`` as the local one;
    an already existing remote document keeps its own ``_id``.

    Args:
        role_data, device_data, account_data: documents to store (not mutated).
        df: unused here; part of the common handler signature.
        group: ``(owner, channel, platform)`` tuple.
        cdate: cohort day stored in / used to key the documents.
    """
    owner, channel, platform = group

    def update(cat, data):
        if not data.get(f'new_{cat}'):
            return

        key = {'_owner_name': owner, '_channel_name': channel, '_platform': platform, 'cdate': cdate}
        # Never push '_id' through $set: it is immutable on existing documents.
        fields = {name: value for name, value in data.items() if name != '_id'}

        local_coll = self.local_db[f'summary_{cat}1']
        result = local_coll.update_one(key, {'$set': fields}, upsert=True)

        # Resolve the local _id whether the document was inserted or updated.
        local_id = result.upserted_id
        if local_id is None:
            existing = local_coll.find_one(key, {'_id': True})
            local_id = existing['_id'] if existing else None

        remote_update = {'$set': fields}
        if local_id is not None:
            # Only applied when the remote upsert inserts a new document.
            remote_update['$setOnInsert'] = {'_id': local_id}
        self.remote_db[f'summary_{cat}1'].update_one(key, remote_update, upsert=True)

    update('role', role_data)
    update('device', device_data)
    update('account', account_data)
|
@ -14,7 +14,7 @@ class SummaryLogin(Task):
|
||||
|
||||
class Model(GBaseModel):
|
||||
cdate: int = Field(..., title='当天0点')
|
||||
# first_device_id: str = Field(None, title='第一次登录设备id', alias='_first_device_id')
|
||||
first_device_id: str = Field(None, title='第一次登录设备id', alias='_first_device_id')
|
||||
manufacturer: str = Field(None, title='设备品牌', alias='_manufacturer')
|
||||
model: str = Field(None, title='型号', alias='_model')
|
||||
os_version: str = Field(None, title='系统版本', alias='_os_version')
|
||||
@ -29,11 +29,10 @@ class SummaryLogin(Task):
|
||||
continue
|
||||
logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}')
|
||||
where = {
|
||||
'_event_time': {
|
||||
'_ut': {
|
||||
'$gte': ts['cursor_st'],
|
||||
'$lt': ts['cursor_et'],
|
||||
},
|
||||
'_event_name': 'TimeSpending' # 在线时长打点 30s
|
||||
}
|
||||
}
|
||||
|
||||
projection = self.Model.get_fields()
|
||||
@ -47,14 +46,15 @@ class SummaryLogin(Task):
|
||||
|
||||
# 还没有记录的
|
||||
role_set = set(role_list) - set(exists_role_list)
|
||||
for role_id in role_set:
|
||||
item = self.local_db[source_coll].find_one(
|
||||
{'_game_role_id': role_id, '_event_name': 'TimeSpending'}, projection)
|
||||
for item in self.local_db[source_coll].find({'_game_role_id': {'$in': list(role_set)}}, projection):
|
||||
try:
|
||||
item['cdate'] = cdate
|
||||
model = self.Model(**item)
|
||||
data = model.dict(by_alias=True)
|
||||
bulk_data.append(UpdateOne({'_id': data['_id']}, {'$set': data}, upsert=True))
|
||||
data.pop('_id')
|
||||
bulk_data.append(
|
||||
UpdateOne({'cdate': cdate, '_game_role_id': data['_game_role_id']}, {'$set': data},
|
||||
upsert=True))
|
||||
except Exception as e:
|
||||
logger.error(f'msg:{e}')
|
||||
# pass
|
||||
|
10
task/task.py
10
task/task.py
@ -16,13 +16,16 @@ from utils import *
|
||||
class Task(metaclass=abc.ABCMeta):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.game_name = kwargs.get('game_name')
|
||||
self.game_db = f'game_{self.game_name}'
|
||||
self.source_coll = kwargs.get('source_coll')
|
||||
self.dest_coll = kwargs.get('dest_coll')
|
||||
self.cursor_st = kwargs.get('st')
|
||||
self.cursor_et = kwargs.get('et')
|
||||
self.timezone = kwargs.get('timezone')
|
||||
self.task_name = kwargs.get("task_name")
|
||||
self.freq = kwargs.get('freq', '30T')
|
||||
for k, v in kwargs.items():
|
||||
self.__setattr__(k, v)
|
||||
self.game_db = f'game_{self.game_name}'
|
||||
self.local_db = get_local_db(self.game_db)
|
||||
self.remote_db = get_remote_db(self.game_db)
|
||||
self.task_coll = self.local_db['task2']
|
||||
@ -31,8 +34,6 @@ class Task(metaclass=abc.ABCMeta):
|
||||
}
|
||||
self.task_info = self.get_task_info()
|
||||
|
||||
self.freq = kwargs.get('freq', '30T')
|
||||
|
||||
def get_task_info(self):
|
||||
task_info = self.task_coll.find_one(self.task_where) or {}
|
||||
return task_info
|
||||
@ -109,7 +110,8 @@ class Task(metaclass=abc.ABCMeta):
|
||||
def get_event_coll(self) -> list:
|
||||
"""
|
||||
根据游标时间戳 返回要处理的集合
|
||||
:return: [{'event_2020-12-10': {'cursor_st': 1607608848, 'cursor_et': 1607610648}}, {'event_2020-12-10': {'cursor_st': 1607610648, 'cursor_et': 1607610791}}]
|
||||
:return: [{'event_2020-12-10': {'cursor_st': 1607608848, 'cursor_et': 1607610648}},
|
||||
{'event_2020-12-10': {'cursor_st': 1607610648, 'cursor_et': 1607610791}}]
|
||||
"""
|
||||
df = self.generate_cursor_time()
|
||||
df['event_coll_s'] = df['st'].apply(lambda x: f'event_{x.date().strftime("%Y-%m-%d")}')
|
||||
|
Loading…
Reference in New Issue
Block a user