sync userinfo

This commit is contained in:
kf_wuhao 2020-12-25 11:32:03 +08:00
parent fd92f3fad8
commit 041910a07b
3 changed files with 50 additions and 1 deletions

View File

@ -38,5 +38,16 @@
"source_coll": "user",
"dest_coll": "attr",
"task_name": "owner_channel_server"
},
"summary3": {
"source_coll": "user",
"dest_coll": "",
"task_name": "summary3",
"freq": "D"
},
"sync_user": {
"source_coll": "user",
"dest_coll": "user",
"task_name": "sync_user"
}
}

36
task/sync_user.py Normal file
View File

@ -0,0 +1,36 @@
import pymongo
from pymongo import UpdateOne
from pydantic import BaseModel, Field
from .task import Task
from utils import *
class SyncUser(Task):
"""
同步user info
"""
def cleaning(self, cursor_list):
for cursor in cursor_list: # type:dict
for source_coll, ts in cursor.items(): # type:str,dict
if ts['cursor_st'] == ts['cursor_et']:
continue
logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}')
where = {
'_ut': {
'$gte': ts['cursor_st'],
'$lt': ts['cursor_et'],
}
}
bulk_data = []
for item in self.local_db[source_coll].find(where, {'_id': False}):
try:
bulk_data.append(
UpdateOne({'_game_role_id': item['_game_role_id']},
{'$set': item}, upsert=True))
except Exception as e:
logger.error(f'msg:{e}')
# pass
if bulk_data:
self.remote_db[self.dest_coll].bulk_write(bulk_data, ordered=False)
self.set_cursor(cursor_st=ts['cursor_st'], cursor_et=ts['cursor_et'])

View File

@ -31,6 +31,8 @@ class Task(metaclass=abc.ABCMeta):
}
self.task_info = self.get_task_info()
self.freq = kwargs.get('freq', '30T')
def get_task_info(self):
task_info = self.task_coll.find_one(self.task_where) or {}
return task_info
@ -87,7 +89,7 @@ class Task(metaclass=abc.ABCMeta):
def generate_cursor_time(self):
date_index = pd.date_range(pd.Timestamp(self.cursor_st, unit='s', tz=self.timezone),
pd.Timestamp(self.cursor_et, unit='s', tz=self.timezone), freq='30T')
pd.Timestamp(self.cursor_et, unit='s', tz=self.timezone), freq=self.freq)
df = pd.DataFrame(index=date_index)
df['st'] = df.index
df['et'] = np.append(df.index[1:], [pd.Timestamp(self.cursor_et, unit='s', tz=self.timezone)])