This commit is contained in:
kf_wuhao 2021-01-13 13:48:46 +08:00
parent 9257424091
commit 7143d44e32

View File

@ -202,13 +202,13 @@ class Summary3(Task):
min_ts = yesterday_ts - max(RETAIN_DAYS + LTV_DAYS) * 86400 min_ts = yesterday_ts - max(RETAIN_DAYS + LTV_DAYS) * 86400
def update(cat): def update(cat):
cursor = self.local_db[f'summary_{cat}1'].find({'cdate': {'$gte': min_ts, '$lt': cdate}}) cursor = self.local_db[f'summary_{cat}'].find({'cdate': {'$gte': min_ts, '$lt': cdate}})
for doc in cursor: for doc in cursor:
doc_date = doc['cdate'] doc_date = doc['cdate']
age_day = (cdate - doc_date) // 86400 + 1 age_day = (cdate - doc_date) // 86400 + 1
data = {f'{cat}_all_money_{age_day}': doc[f'{cat}_all_money_{age_day - 1}']} data = {f'{cat}_all_money_{age_day}': doc[f'{cat}_all_money_{age_day - 1}']}
self.local_db[f'summary_{cat}1'].update_one({'_id': doc['_id']}, {'$set': data}) self.local_db[f'summary_{cat}'].update_one({'_id': doc['_id']}, {'$set': data})
self.remote_db[f'summary_{cat}1'].update_one({'_id': doc['_id']}, {'$set': data}) self.remote_db[f'summary_{cat}'].update_one({'_id': doc['_id']}, {'$set': data})
update('role') update('role')
update('account') update('account')
@ -238,7 +238,7 @@ class Summary3(Task):
def update(cat, cat_list): def update(cat, cat_list):
update_doc = dict() update_doc = dict()
for cid in cat_list: for cid in cat_list:
data = self.local_db[f'summary_{cat}1'].find_one( data = self.local_db[f'summary_{cat}'].find_one(
{'cdate': {'$gte': min_ts, '$lte': yesterday_ts}, f'new_{cat}_list': cid}, { {'cdate': {'$gte': min_ts, '$lte': yesterday_ts}, f'new_{cat}_list': cid}, {
'_id': True, '_id': True,
'cdate': True, 'cdate': True,
@ -251,8 +251,8 @@ class Summary3(Task):
# 添加新充值用户 # 添加新充值用户
data[f'now_pay_{cat}_list'] = list(set(data[f'now_pay_{cat}_list']) | {cid}) data[f'now_pay_{cat}_list'] = list(set(data[f'now_pay_{cat}_list']) | {cid})
data[f'now_pay_{cat}'] = len(data[f'now_pay_{cat}_list']) data[f'now_pay_{cat}'] = len(data[f'now_pay_{cat}_list'])
self.local_db[f'summary_{cat}1'].update_one({'_id': data['_id']}, {'$set': data}) self.local_db[f'summary_{cat}'].update_one({'_id': data['_id']}, {'$set': data})
self.remote_db[f'summary_{cat}1'].update_one({'_id': data['_id']}, {'$set': data}) self.remote_db[f'summary_{cat}'].update_one({'_id': data['_id']}, {'$set': data})
# 计算累计充值 # 计算累计充值
for id_, doc in update_doc.items(): for id_, doc in update_doc.items():
pipeline = [ pipeline = [
@ -271,8 +271,8 @@ class Summary3(Task):
age_day = (cdate - doc['cdate']) // 86400 + 1 age_day = (cdate - doc['cdate']) // 86400 + 1
if sum_money: if sum_money:
data = {f'{cat}_all_money_{age_day}': sum_money['sum_money']} data = {f'{cat}_all_money_{age_day}': sum_money['sum_money']}
self.local_db[f'summary_{cat}1'].update_one({'_id': id_}, {'$set': data}) self.local_db[f'summary_{cat}'].update_one({'_id': id_}, {'$set': data})
self.remote_db[f'summary_{cat}1'].update_one({'_id': id_}, {'$set': data}) self.remote_db[f'summary_{cat}'].update_one({'_id': id_}, {'$set': data})
update('role', role_list) update('role', role_list)
update('account', account_list) update('account', account_list)
@ -300,7 +300,7 @@ class Summary3(Task):
inplace=True) inplace=True)
def update(cat): def update(cat):
cursor = self.local_db[f'summary_{cat}1'].find( cursor = self.local_db[f'summary_{cat}'].find(
{'cdate': role_cdate}, { {'cdate': role_cdate}, {
'_id': True, '_id': True,
f'now_pay_{cat}_list': True, f'now_pay_{cat}_list': True,
@ -317,8 +317,8 @@ class Summary3(Task):
if n: if n:
data[f'{cat}_login_{age_day}'] = n data[f'{cat}_login_{age_day}'] = n
if data: if data:
self.local_db[f'summary_{cat}1'].update_one({'_id': doc['_id']}, {'$set': data}) self.local_db[f'summary_{cat}'].update_one({'_id': doc['_id']}, {'$set': data})
self.remote_db[f'summary_{cat}1'].update_one({'_id': doc['_id']}, {'$set': data}) self.remote_db[f'summary_{cat}'].update_one({'_id': doc['_id']}, {'$set': data})
for role_cdate in df['role_cdate'].unique(): for role_cdate in df['role_cdate'].unique():
role_cdate = int(role_cdate) role_cdate = int(role_cdate)
@ -508,12 +508,12 @@ class Summary3(Task):
def update(cat, data): def update(cat, data):
if not data.get(f'new_{cat}'): if not data.get(f'new_{cat}'):
return return
obj_id = self.local_db[f'summary_{cat}1'].update_one( obj_id = self.local_db[f'summary_{cat}'].update_one(
{'_owner_name': group[0], '_channel_name': group[1], '_platform': group[2], 'cdate': cdate}, {'_owner_name': group[0], '_channel_name': group[1], '_platform': group[2], 'cdate': cdate},
{'$set': data}, upsert=True) {'$set': data}, upsert=True)
if obj_id.upserted_id: if obj_id.upserted_id:
data['_id'] = obj_id.upserted_id data['_id'] = obj_id.upserted_id
self.remote_db[f'summary_{cat}1'].update_one( self.remote_db[f'summary_{cat}'].update_one(
{'_owner_name': group[0], '_channel_name': group[1], '_platform': group[2], 'cdate': cdate}, {'_owner_name': group[0], '_channel_name': group[1], '_platform': group[2], 'cdate': cdate},
{'$set': data}, upsert=True) {'$set': data}, upsert=True)