class SummaryOpenHd(Task):
    """Summarise "Activity" click events that carry an ``htype`` field.

    Matching documents are read from each source collection, validated
    through :class:`Model`, and upserted into ``self.dest_coll`` on both the
    local and the remote database. (Original note: "open activity".)
    """

    class Model(GBaseModel):
        # Document id, exposed under the Mongo ``_id`` alias.
        id: MdbObjectId = Field(..., title="id", alias='_id')
        # Activity type; ``cleaning`` stringifies it before validation.
        htype: str = Field(None, title='活动类型')
        # Midnight of the day the cursor window starts in (epoch seconds).
        cdate: int = Field(..., title='当天0点')

    def cleaning(self, cursor_list):
        """Process every cursor window and upsert the matching events.

        Args:
            cursor_list: iterable of dicts mapping a source collection name
                to a dict holding ``cursor_st`` and ``cursor_et``. The start
                value is interpreted as epoch seconds when deriving the day.

        Documents that fail conversion or validation are reported through
        ``ddsend_msg`` and the logger, then skipped; the cursor is still
        advanced for the window.
        """
        # Walk all (collection, window) pairs lazily, in the original order.
        windows = (pair for cursor_map in cursor_list for pair in cursor_map.items())
        for source_coll, ts in windows:
            window_start = ts['cursor_st']
            window_end = ts['cursor_et']
            if window_start == window_end:
                # Empty window: nothing to read and no cursor to move.
                continue

            logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}')

            time_range = {'$gte': window_start, '$lt': window_end}
            query = {
                '_event_name': 'Activity',
                'act': 'click',
                'htype': {"$exists": 1},
                '_event_time': time_range,
            }
            wanted_fields = self.Model.get_fields()
            operations = []

            # Every document in this window is stamped with the local-midnight
            # timestamp of the window start.
            window_day = pd.Timestamp(window_start, unit='s', tz=self.timezone).normalize()
            day_start = int(window_day.timestamp())

            for doc in self.local_db[source_coll].find(query, wanted_fields):
                try:
                    doc['cdate'] = day_start
                    doc['htype'] = str(doc['htype'])
                    payload = self.Model(**doc).dict(by_alias=True)
                    operations.append(
                        UpdateOne({'_id': payload['_id']}, {'$set': payload}, upsert=True)
                    )
                except Exception as exc:
                    # Best effort: report the bad document and keep going.
                    msg = traceback.format_exc()
                    ddsend_msg(f'{self.game_name}.{source_coll}字段异常 {msg}')
                    logger.error(repr(exc))

            if operations:
                # Local write first, then mirror the same batch remotely.
                self.local_db[self.dest_coll].bulk_write(operations, ordered=False)
                self.remote_db[self.dest_coll].bulk_write(operations, ordered=False)

            self.set_cursor(cursor_st=window_start, cursor_et=window_end)