From 0db503885b03921e20d3b00797d72b12049460ea Mon Sep 17 00:00:00 2001 From: kf_wuhao <15392746632@qq.com> Date: Wed, 23 Dec 2020 13:35:59 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E9=A6=96=E6=AC=A1=E4=BB=98?= =?UTF-8?q?=E8=B4=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- task/first_recharge.py | 59 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 task/first_recharge.py diff --git a/task/first_recharge.py b/task/first_recharge.py new file mode 100644 index 0000000..20c8622 --- /dev/null +++ b/task/first_recharge.py @@ -0,0 +1,59 @@ +from pymongo import UpdateOne +from pydantic import BaseModel, Field, validator +import pandas as pd + +from .task import Task +from utils import * + + +class FirstRecharge(Task): + """ + 首次充值记录 + """ + + class Model(BaseModel): + role_level: int = Field(None, title='角色等级') + role_vip: int = Field(None, title='vip等级') + role_stage: IntStr = Field(None, title='关卡') + money: IntFloat = Field(..., title='金额') + game_role_id: str = Field(..., title='角色id', alias='_game_role_id') + orderid: str = Field(..., title='订单号') + proid: str = Field(..., title='计费点') + cdate: int = Field(..., title='当天0点') + + @classmethod + def get_fields(cls): + return [v.alias for v in cls.__fields__.values()] + + def cleaning(self, cursor_list): + for cursor in cursor_list: # type:dict + for event_coll, ts in cursor.items(): # type:str,dict + if ts['cursor_st'] == ts['cursor_et']: + continue + logger.info(f'开始处理{self.game_name} 处理 {event_coll} 游标 {ts}') + where = { + '_event_time': { + '$gte': ts['cursor_st'], + '$lt': ts['cursor_et'], + } + } + + projection = self.Model.get_fields() + bulk_data = [] + for item in self.local_db[event_coll].find(where, projection): + try: + item['cdate'] = int(pd.Timestamp(ts['cursor_st'], unit='s', tz=self.timezone) \ + .normalize().timestamp()) + model = self.Model(**item) + data = model.dict(by_alias=True) + _game_role_id = data.pop('_game_role_id') + bulk_data.append( + UpdateOne({'_game_role_id': _game_role_id, 'is_recharge': {'$exists': False}}, + {'$set': {'is_recharge': data}})) + except Exception as e: + logger.error(f'msg:{e}') + # pass + if bulk_data: + self.remote_db[self.dest_coll].bulk_write(bulk_data, ordered=False) + self.local_db[self.dest_coll].bulk_write(bulk_data, ordered=False) + self.set_cursor(cursor_st=ts['cursor_st'], cursor_et=ts['cursor_et'])