diff --git a/task/summary_login.py b/task/summary_login.py index 4f79e58..94011f9 100644 --- a/task/summary_login.py +++ b/task/summary_login.py @@ -29,11 +29,10 @@ class SummaryLogin(Task): continue logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}') where = { - '_event_time': { + '_ut': { '$gte': ts['cursor_st'], '$lt': ts['cursor_et'], - }, - '_event_name': 'TimeSpending' # 在线时长打点 30s + } } projection = self.Model.get_fields() @@ -47,14 +46,15 @@ class SummaryLogin(Task): # 还没有记录的 role_set = set(role_list) - set(exists_role_list) - for role_id in role_set: - item = self.local_db[source_coll].find_one( - {'_game_role_id': role_id, '_event_name': 'TimeSpending'}, projection) + for item in self.local_db[source_coll].find({'_game_role_id': {'$in': list(role_set)}}, projection): try: item['cdate'] = cdate model = self.Model(**item) data = model.dict(by_alias=True) - bulk_data.append(UpdateOne({'_id': data['_id']}, {'$set': data}, upsert=True)) + data.pop('_id') + bulk_data.append( + UpdateOne({'cdate': cdate, '_game_role_id': data['_game_role_id']}, {'$set': data}, + upsert=True)) except Exception as e: logger.error(f'msg:{e}') # pass