data_cleaning/task/summary_join_hd.py
2021-01-11 20:12:21 +08:00

73 lines
3.4 KiB
Python

import time
import traceback
from typing import List, Dict, Any
from pymongo import UpdateOne
from pydantic import Field
import pandas as pd
from .task import Task
from utils import *
from model import BaseModel, MdbObjectId, IntStr
class SummaryJoinHd(Task):
    """Cleaning task for "participate in activity" events.

    Scans raw event collections for `hdgetprize` result events, validates
    each one against :class:`Model`, and upserts the cleaned records into
    the destination collection of both the local and the remote database.
    """

    class Model(BaseModel):
        # Schema of one cleaned record. Fields declared with an alias are
        # written out under that underscore-prefixed name, because the
        # record is dumped with ``by_alias=True`` in ``cleaning``.

        # Identity and attribution of the event (all required).
        id: MdbObjectId = Field(..., title='id', alias='_id')
        channel_name: str = Field(..., min_length=1, title='channel', alias='_channel_name')
        owner_name: str = Field(..., min_length=1, title='owner', alias='_owner_name')
        channel_uid: str = Field(..., min_length=1, title='channel_uid', alias='_channel_uid')
        device_id: str = Field(..., min_length=1, title='device_id', alias='_device_id')
        district_server_id: int = Field(..., title='区服id', alias='_district_server_id')
        game_role_id: str = Field(..., min_length=1, title='角色id', alias='_game_role_id')
        event_time: int = Field(..., title='事件时间', alias='_event_time')

        # Role state at the time of the event (default to None when absent).
        role_level: int = Field(None, title='角色等级')
        role_vip: int = Field(None, title='角色vip等级')
        role_stage: IntStr = Field(None, title='关卡')

        # Activity details; htype and hd_idx are filled in by ``cleaning``.
        prize: List[Dict] = Field(None, title='奖励')
        need: List[Dict] = Field(None, title='消耗')
        htype: str = Field(None, title='活动类型')
        hd_idx: Any = Field(None, title='档位')

        # Midnight timestamp of the day, filled in by ``cleaning`` (required).
        cdate: int = Field(..., title='当天0点')

    def cleaning(self, cursor_list):
        """Clean `hdgetprize` events for every cursor window in *cursor_list*.

        Args:
            cursor_list: iterable of dicts mapping a source collection name
                to a dict holding the window bounds under the keys
                ``'cursor_st'`` (inclusive) and ``'cursor_et'`` (exclusive).

        For each window whose bounds differ, matching documents are read
        from ``self.local_db``, converted through :class:`Model`, and
        upserted by ``_id`` into ``self.dest_coll`` of both ``self.local_db``
        and ``self.remote_db``. A record that raises during conversion is
        reported through ``ddsend_msg`` and ``logger.error`` and skipped.
        """
        for cursor in cursor_list:
            # cursor: dict of {source collection name: window bounds dict}
            for source_coll, ts in cursor.items():
                if ts['cursor_st'] == ts['cursor_et']:
                    # Zero-width window: nothing to scan.
                    continue

                logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}')

                # Half-open time window [cursor_st, cursor_et).
                time_window = {
                    '$gte': ts['cursor_st'],
                    '$lt': ts['cursor_et'],
                }
                query = {
                    '_event_name': 'res',
                    'function': 'hdgetprize',
                    '_event_time': time_window,
                }

                # Midnight (in self.timezone) of the day containing the
                # window start, as an integer epoch-seconds value.
                # NOTE(review): every record in the window gets this same
                # cdate; it is derived from cursor_st, not from each event's
                # own time. Confirm windows never span midnight.
                window_start = pd.Timestamp(ts['cursor_st'], unit='s', tz=self.timezone)
                cdate = int(window_start.normalize().timestamp())

                # Model fields plus the two raw fields that htype / hd_idx
                # are derived from below.
                # NOTE(review): extend() mutates the object returned by
                # get_fields(); verify that helper returns a fresh list.
                projection = self.Model.get_fields()
                projection.extend(('function_data', 'function_detail'))

                upserts = []
                for doc in self.local_db[source_coll].find(query, projection):
                    try:
                        doc['cdate'] = cdate
                        doc['htype'] = str(doc['function_detail'])
                        raw_data = doc['function_data']
                        # hd_idx falls back to 0 when function_data is not a
                        # dict or carries no 'idx' key.
                        doc['hd_idx'] = raw_data.get('idx', 0) if isinstance(raw_data, dict) else 0

                        record = self.Model(**doc).dict(by_alias=True)
                        upserts.append(
                            UpdateOne({'_id': record['_id']}, {'$set': record}, upsert=True)
                        )
                    except Exception as e:
                        # Best effort: report the bad record and keep going
                        # with the rest of the window.
                        msg = traceback.format_exc()
                        ddsend_msg(f'{self.game_name}.{source_coll}字段异常 {msg}')
                        logger.error(repr(e))

                if upserts:
                    # Same batch goes to both databases, local first.
                    for database in (self.local_db, self.remote_db):
                        database[self.dest_coll].bulk_write(upserts, ordered=False)

                # NOTE(review): the cursor is recorded for every scanned
                # window, including ones that produced no writes — confirm
                # this matches the intended nesting.
                self.set_cursor(cursor_st=ts['cursor_st'], cursor_et=ts['cursor_et'])