diff --git a/task/summary_shopbuy.py b/task/summary_shopbuy.py index b399bef..cb74abf 100644 --- a/task/summary_shopbuy.py +++ b/task/summary_shopbuy.py @@ -21,7 +21,7 @@ class SummaryShopbuy(Task): prize: List[dict] = Field(None, title='奖励') need: List[dict] = Field(None, title='消耗') user_name: str = Field(None, title='昵称') - stype: IntStr = Field(None, title='商店id') + stype: int = Field(None, title='商店id') needa: str = Field(...) needn: int = Field(...) needt: str = Field(...) @@ -35,27 +35,40 @@ class SummaryShopbuy(Task): if ts['cursor_st'] == ts['cursor_et']: continue logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}') - where = { - '_event_name': 'Shop', - "act": "buy", - '_event_time': { - '$gte': ts['cursor_st'], - '$lt': ts['cursor_et'], - } - } - projection = self.Model.get_fields() + + if self.game_name in ['shenghuajiyuan']: + where = { + "_event_name": "shopBuy", + '_event_time': { + '$gte': ts['cursor_st'], + '$lt': ts['cursor_et'], + } + } + projection.append('shopType') + + else: + where = { + '_event_name': 'Shop', + "act": "buy", + '_event_time': { + '$gte': ts['cursor_st'], + '$lt': ts['cursor_et'], + } + } + bulk_data = [] for item in self.local_db[source_coll].find(where, projection): try: item['cdate'] = int(pd.Timestamp(item['_event_time'], unit='s', tz=self.timezone) \ - .normalize().timestamp()) + .normalize().timestamp()) item['needa'] = item['need'][0]['a'] item['needt'] = item['need'][0]['t'] item['needn'] = item['need'][0]['n'] item['prizea'] = item['prize'][0]['a'] item['prizet'] = item['prize'][0]['t'] item['prizen'] = item['prize'][0]['n'] + item['stype'] = item.get('stype') or item.get('shopType', '255') model = self.Model(**item) data = model.dict(by_alias=True) bulk_data.append(UpdateOne({'_id': data['_id']}, {'$set': data}, upsert=True))