diff --git a/config.json b/config.json index b39b35b..7fd61b7 100644 --- a/config.json +++ b/config.json @@ -23,6 +23,11 @@ "source_coll": "user", "dest_coll": "user", "task_name": "add_user_flag" + }, + "summary_pay": { + "source_coll": "paylist", + "dest_coll": "summary_pay", + "task_name": "summary_pay" } } \ No newline at end of file diff --git a/task/summary_pay.py b/task/summary_pay.py new file mode 100644 index 0000000..b527238 --- /dev/null +++ b/task/summary_pay.py @@ -0,0 +1,67 @@ +from pymongo import UpdateOne +from pydantic import BaseModel, Field +import pandas as pd + +from .task import Task +from utils import * +from model import IntFloat, GBaseModel + + + +class SummaryPay(Task): + """ + SummaryPay + """ + + class Model(GBaseModel): + money: IntFloat = Field(..., title='金额') + orderid: str = Field(..., title='订单号') + proid: str = Field(..., title='计费点') + first_device_id: str = Field(None, title='注册设备', alias='_first_device_id') + user_name: str = Field(None, title='用户名') + screen_width: int = Field(None, title='屏幕宽', alias='_screen_width') + screen_height: int = Field(None, title='屏幕高', alias='_screen_height') + manufacturer: str = Field(None, title='设备品牌', alias='_manufacturer') + model: str = Field(None, title='型号', alias='_model') + os_version: str = Field(None, title='系统版本', alias='_os_version') + app_name: str = Field(None, title='游戏版本', alias='_app_name') + cdate: int = Field(..., title='当天0点') + + @classmethod + def get_fields(cls): + return [v.alias for v in cls.__fields__.values()] + + def cleaning(self, cursor_list): + for cursor in cursor_list: # type:dict + for source_coll, ts in cursor.items(): # type:str,dict + if ts['cursor_st'] == ts['cursor_et']: + continue + logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}') + where = { + '_event_time': { + '$gte': ts['cursor_st'], + '$lt': ts['cursor_et'], + } + } + cdate = int(pd.Timestamp(ts['cursor_st'], unit='s', tz=self.timezone) \ + .normalize().timestamp()) + projection = self.Model.get_fields() + bulk_data = [] + for item in self.local_db[source_coll].find(where, projection): + try: + item['cdate'] = cdate + user_info = self.local_db['user'].find_one({'_game_role_id': item['_game_role_id']}, projection) + for k, v in user_info.items(): + item[k] = item.get(k) or user_info[k] + model = self.Model(**item) + data = model.dict(by_alias=True) + _game_role_id = data.pop('_game_role_id') + data.pop('_id') + bulk_data.append(UpdateOne({'orderid': model.orderid}, {'$set': data}, upsert=True)) + except Exception as e: + logger.error(f'msg:{e}') + # pass + if bulk_data: + self.local_db[self.dest_coll].bulk_write(bulk_data, ordered=False) + self.remote_db[self.dest_coll].bulk_write(bulk_data, ordered=False) + self.set_cursor(cursor_st=ts['cursor_st'], cursor_et=ts['cursor_et'])