diff --git a/config.json b/config.json index 2278e5a..b39b35b 100644 --- a/config.json +++ b/config.json @@ -18,6 +18,11 @@ "source_coll": "event", "dest_coll": "summary_login", "task_name": "summary_login" + }, + "add_user_flag": { + "source_coll": "user", + "dest_coll": "user", + "task_name": "add_user_flag" } } \ No newline at end of file diff --git a/task/add_user_flag.py b/task/add_user_flag.py new file mode 100644 index 0000000..1a2e8c8 --- /dev/null +++ b/task/add_user_flag.py @@ -0,0 +1,84 @@ +import pymongo +from pymongo import UpdateOne +from pydantic import BaseModel, Field + +from .task import Task +from utils import * + + +class AddUserFlag(Task): + """ + 添加新设备标记 + """ + + class Model(BaseModel): + game_role_id: str = Field(..., title="角色id", alias='_game_role_id') + device_id: str = Field(..., title='设备id', alias='_device_id') + channel_uid: str = Field(..., title="channel_uid", alias='_channel_uid') + role_create_time: int = Field(..., title="注册时间戳") + + @classmethod + def get_fields(cls): + return [v.alias for v in cls.__fields__.values()] + + def cleaning(self, cursor_list): + for cursor in cursor_list: # type:dict + for source_coll, ts in cursor.items(): # type:str,dict + if ts['cursor_st'] == ts['cursor_et']: + continue + logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}') + projection = self.Model.get_fields() + bulk_data = [] + # 处理新账号 + where = { + 'role_create_time': { + '$gte': ts['cursor_st'], + '$lt': ts['cursor_et'], + }, + 'is_new_channel_uid': {'$exists': False} + } + + for item in self.local_db[source_coll].find(where, projection).sort('role_create_time', + pymongo.ASCENDING): + try: + # 新账号 + model = self.Model(**item) + # 查找该channel_uid在之前有么 + role_cnt = self.local_db[source_coll].count( + {'_channel_uid': model.channel_uid, 'role_create_time': {'$lt': model.role_create_time}}) + if not role_cnt: + bulk_data.append( + UpdateOne({'_game_role_id': model.game_role_id}, + {'$set': {'is_new_channel_uid': 1}})) + except Exception as e: + logger.error(f'msg:{e}') + # pass + + + # 处理设备 + where = { + 'role_create_time': { + '$gte': ts['cursor_st'], + '$lt': ts['cursor_et'], + }, + 'is_new_device': {'$exists': False} + } + for item in self.local_db[source_coll].find(where, projection).sort('role_create_time', + pymongo.ASCENDING): + try: + # 新设备 + model = self.Model(**item) + # 查找该_device_id在之前有么 + role_cnt = self.local_db[source_coll].count( + {'_device_id': model.device_id, 'role_create_time': {'$lt': model.role_create_time}}) + if not role_cnt: + bulk_data.append( + UpdateOne({'_game_role_id': model.game_role_id}, + {'$set': {'is_new_device': 1}})) + except Exception as e: + logger.error(f'msg:{e}') + # pass + if bulk_data: + self.local_db[self.dest_coll].bulk_write(bulk_data, ordered=False) + self.remote_db[self.dest_coll].bulk_write(bulk_data, ordered=False) + self.set_cursor(cursor_st=ts['cursor_st'], cursor_et=ts['cursor_et'])