diff --git a/config.json b/config.json index fedecf8..0575023 100644 --- a/config.json +++ b/config.json @@ -61,5 +61,10 @@ "dest_coll": "summary_online_time", "task_name": "summary_online_time", "freq": "D" + }, + "summary_join_hd": { + "source_coll": "event", + "dest_coll": "summary_join_hd", + "task_name": "summary_join_hd" } } \ No newline at end of file diff --git a/model/model.py b/model/model.py index 8bb864e..aa40e2f 100644 --- a/model/model.py +++ b/model/model.py @@ -1,10 +1,16 @@ -from pydantic import BaseModel, Field +from pydantic import BaseModel as BModel +from pydantic import Field from model.field_type import IntStr -class GBaseModel(BaseModel): +class BaseModel(BModel): + @classmethod + def get_fields(cls): + return [v.alias for v in cls.__fields__.values()] + +class GBaseModel(BaseModel): platform: str = Field(None, title="平台", alias='_platform') channel_name: str = Field(None, title="channel", alias='_channel_name') owner_name: str = Field(None, title="owner", alias='_owner_name') @@ -18,15 +24,6 @@ class GBaseModel(BaseModel): role_vip: int = Field(None, title="角色vip等级") role_stage: IntStr = Field(None, title="关卡") - @classmethod - def get_fields(cls): - return [v.alias for v in cls.__fields__.values()] - - -class BaseModel(BaseModel): - @classmethod - def get_fields(cls): - return [v.alias for v in cls.__fields__.values()] if __name__ == '__main__': diff --git a/task/summary_join_hd.py b/task/summary_join_hd.py new file mode 100644 index 0000000..637c8de --- /dev/null +++ b/task/summary_join_hd.py @@ -0,0 +1,73 @@ +import time +import traceback +from typing import List, Dict, Any + +from pymongo import UpdateOne +from pydantic import Field +import pandas as pd + +from .task import Task +from utils import * +from model import BaseModel, MdbObjectId, IntStr + + +class SummaryJoinHd(Task): + """ + 参与活动 + """ + + class Model(BaseModel): + id: MdbObjectId = Field(..., title="id", alias='_id') + channel_name: str = Field(None, title="channel", alias='_channel_name') + owner_name: str = Field(None, title="owner", alias='_owner_name') + channel_uid: str = Field(None, title="channel_uid", alias='_channel_uid') + device_id: str = Field(None, title='device_id', alias='_device_id') + district_server_id: int = Field(None, title="区服id", alias='_district_server_id') + game_role_id: str = Field(None, title="角色id", alias='_game_role_id') + event_time: int = Field(..., title="事件时间", alias='_event_time') + role_create_time: int = Field(None, title="角色创建时间") + role_level: int = Field(None, title="角色等级") + role_vip: int = Field(None, title="角色vip等级") + role_stage: IntStr = Field(None, title="关卡") + prize: List[Dict] = Field(None, title='奖励') + need: List[Dict] = Field(None, title='消耗') + htype: str = Field(None, title='活动类型') + hd_idx: Any = Field(None, title='档位') + cdate: int = Field(..., title='当天0点') + + def cleaning(self, cursor_list): + for cursor in cursor_list: # type:dict + for source_coll, ts in cursor.items(): # type:str,dict + if ts['cursor_st'] == ts['cursor_et']: + continue + logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}') + where = { + '_event_name': 'res', + 'function': 'hdgetprize', + '_event_time': { + '$gte': ts['cursor_st'], + '$lt': ts['cursor_et'], + } + } + cdate = int(pd.Timestamp(ts['cursor_st'], unit='s', tz=self.timezone).normalize().timestamp()) + projection = self.Model.get_fields() + projection.extend(['function_data', 'function_detail']) + bulk_data = [] + for item in self.local_db[source_coll].find(where, projection): + try: + item['cdate'] = cdate + item['htype'] = str(item['function_detail']) + item['hd_idx'] = 0 + if isinstance(item['function_data'], dict): + item['hd_idx'] = item['function_data'].get('idx', 0) + model = self.Model(**item) + data = model.dict(by_alias=True) + bulk_data.append(UpdateOne({'_id': data['_id']}, {'$set': data}, upsert=True)) + except Exception as e: + msg = traceback.format_exc() + ddsend_msg(f'{self.game_name}.{source_coll}字段异常 {msg}') + logger.error(repr(e)) + if bulk_data: + self.local_db[self.dest_coll].bulk_write(bulk_data, ordered=False) + self.remote_db[self.dest_coll].bulk_write(bulk_data, ordered=False) + self.set_cursor(cursor_st=ts['cursor_st'], cursor_et=ts['cursor_et']) diff --git a/task/task.py b/task/task.py index 7f6bd94..b102b91 100644 --- a/task/task.py +++ b/task/task.py @@ -87,6 +87,7 @@ class Task(metaclass=abc.ABCMeta): """ if kwargs and set(kwargs) > {'cursor_et', 'cursor_st'}: raise ValueError('设置游标不合理') + kwargs['run_finish_ts'] = int(time.time()) self.task_coll.update_one(self.task_where, { '$set': kwargs}, upsert=True)