import time
import traceback
from pymongo import UpdateOne
from pydantic import Field
import pandas as pd
import numpy as np
from model import BaseModel
from .task import Task
from utils import *  # provides logger and ddsend_msg used below


class SummaryOnlineTime(Task):
    """Daily online duration per game role."""

    class Model(BaseModel):
        cdate: int = Field(..., title='midnight (00:00) of the day')
        platform: str = Field(..., min_length=1, title="platform", alias='_platform')
        channel_name: str = Field(..., min_length=1, title="channel", alias='_channel_name')
        owner_name: str = Field(..., min_length=1, title="owner", alias='_owner_name')
        channel_uid: str = Field(..., min_length=1, title="channel_uid", alias='_channel_uid')
        device_id: str = Field(..., min_length=1, title='device_id', alias='_device_id')
        district_server_id: int = Field(..., title="district/server id", alias='_district_server_id')
        game_role_id: str = Field(..., min_length=1, title="role id", alias='_game_role_id')

    # Handle the daily cursor
    def get_cursor(self):
        if not self.cursor_st:
            # Resume from where the previous run stopped.
            self.cursor_st = self.task_info.get('cursor_et')
            if self.cursor_st and self.cursor_st >= int(
                    pd.Timestamp(time.time(), unit='s', tz=self.timezone).normalize().timestamp()):
                # The saved cursor has already reached today (local time): step back one day.
                self.cursor_st -= 86400
        if not self.cursor_st:
            # No saved cursor (first run): start from 24 hours ago.
            self.cursor_st = int(time.time()) - 86400
        super().get_cursor()

    def generate_cursor_time(self):
        # One row per period: st is the period start, et the next period start.
        # The last et is midnight (local time) of the day containing cursor_et.
        start = pd.Timestamp(self.cursor_st, unit='s', tz=self.timezone).normalize()
        end = pd.Timestamp(self.cursor_et, unit='s', tz=self.timezone).normalize()
        date_index = pd.date_range(start, end, freq=self.freq)
        df = pd.DataFrame(index=date_index[:-1])
        df['st'] = df.index
        df['et'] = np.append(df.index[1:], [end])
        return df

    def cleaning(self, cursor_list):
        for cursor in cursor_list:  # type: dict
            for source_coll, ts in cursor.items():  # type: str, dict
                if ts['cursor_st'] == ts['cursor_et']:
                    continue  # empty window, nothing to process
                logger.info(f'Start processing {self.game_name}: {source_coll} cursor {ts}')
                # Find the roles active in this cursor and sum their time spent (ts).
                # No time filter is applied here, so every TimeSpending event in
                # source_coll is summed; the cursor is only used below to derive cdate.
                pipeline = [
                    {'$match': {'_event_name': 'TimeSpending'}},
                    {'$group': {'_id': '$_game_role_id', 'ts': {'$sum': '$ts'}}},
                ]
                group_ts = self.local_db[source_coll].aggregate(pipeline=pipeline)
                role_ts = {role['_id']: role['ts'] for role in group_ts}

                # Load the profile of every active role from the user collection.
                projection = self.Model.get_fields()
                where = {'_game_role_id': {'$in': list(role_ts)}}
                bulk_data = []
                # Midnight (local time) of the day this cursor starts in.
                cdate = int(pd.Timestamp(ts['cursor_st'], unit='s', tz=self.timezone).normalize().timestamp())
                for item in self.local_db['user'].find(where, projection):
                    try:
                        item['cdate'] = cdate
                        model = self.Model(**item)  # validates the required fields
                        data = model.dict(by_alias=True)
                        data['ts'] = role_ts[data['_game_role_id']]
                        # Upsert one document per (role, day), so rerunning a cursor
                        # overwrites the existing row instead of duplicating it.
                        bulk_data.append(
                            UpdateOne({'_game_role_id': data['_game_role_id'], 'cdate': data['cdate']},
                                      {'$set': data}, upsert=True))
                        logger.debug(f'Processed {data["_game_role_id"]}')
                    except Exception as e:
                        msg = traceback.format_exc()
                        ddsend_msg(f'{self.game_name}.{source_coll} field error {msg}')
                        logger.error(repr(e))
                if bulk_data:
                    logger.debug(f'Preparing to write {len(bulk_data)} records')
                    # ordered=False: the server attempts every operation even if some fail.
                    self.local_db[self.dest_coll].bulk_write(bulk_data, ordered=False)
                    self.remote_db[self.dest_coll].bulk_write(bulk_data, ordered=False)
                    logger.debug('Write complete')
                self.set_cursor(cursor_st=ts['cursor_st'], cursor_et=ts['cursor_et'])
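

# Illustrative sketch (not part of the original task): the daily windowing that
# generate_cursor_time() performs with freq='D', rewritten with the standard
# library only. sketch_daily_windows is a hypothetical helper; it assumes the
# cursor is given as Unix seconds and the timezone as a tzinfo object. Like the
# pandas version, each local day from midnight(cursor_st) up to, but excluding,
# midnight(cursor_et) becomes one [st, et) window.
from datetime import datetime, timedelta, tzinfo
from typing import List, Tuple


def sketch_daily_windows(cursor_st: int, cursor_et: int, tz: tzinfo) -> List[Tuple[int, int]]:
    """Return (st, et) pairs of Unix seconds, one per complete local day."""
    def midnight(ts: int) -> datetime:
        # Convert to local time, then truncate to 00:00 of that day.
        return datetime.fromtimestamp(ts, tz).replace(hour=0, minute=0, second=0, microsecond=0)

    day, end = midnight(cursor_st), midnight(cursor_et)
    windows = []
    while day < end:
        nxt = day + timedelta(days=1)  # wall-clock arithmetic: the next local midnight
        windows.append((int(day.timestamp()), int(nxt.timestamp())))
        day = nxt
    return windows

# Usage: with tz = UTC+8, a cursor running from 2023-11-14 10:00 to 2023-11-16 09:00
# local time yields two windows (11-14 and 11-15); the partial day 11-16 is left
# for a later run, matching how date_index[:-1] drops the final day.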
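

# Illustrative sketch (not part of the original task): the data flow of cleaning()
# without MongoDB. It sums ts per role over TimeSpending events (the $match +
# $group stages), keeps only users whose role appears in those sums (the $in
# filter), and emits one upsert per (role, cdate). sketch_build_daily_rows and
# SKETCH_REQUIRED_FIELDS are hypothetical names; events and users are assumed to
# be plain dicts shaped like the Mongo documents, and the returned
# (filter, update) pairs stand in for pymongo UpdateOne operations.
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

SKETCH_REQUIRED_FIELDS = ('_platform', '_channel_name', '_owner_name', '_channel_uid',
                          '_device_id', '_district_server_id', '_game_role_id')


def sketch_build_daily_rows(events: Iterable[dict], users: Iterable[dict],
                            cdate: int) -> List[Tuple[dict, dict]]:
    # Equivalent of {'$match': {'_event_name': 'TimeSpending'}} followed by
    # {'$group': {'_id': '$_game_role_id', 'ts': {'$sum': '$ts'}}}; a missing ts counts as 0.
    role_ts: Dict[str, int] = defaultdict(int)
    for ev in events:
        if ev.get('_event_name') == 'TimeSpending':
            role_ts[ev['_game_role_id']] += ev.get('ts', 0)

    ops = []
    for user in users:
        role_id = user.get('_game_role_id')
        if role_id not in role_ts:
            continue  # same effect as the {'$in': list(role_ts)} query filter
        if any(user.get(f) in (None, '') for f in SKETCH_REQUIRED_FIELDS):
            continue  # rough stand-in for the pydantic validation, which rejects such rows
        doc = {f: user[f] for f in SKETCH_REQUIRED_FIELDS}
        doc.update(cdate=cdate, ts=role_ts[role_id])
        # Filter on (role, day) so reprocessing the same day overwrites the row.
        ops.append(({'_game_role_id': role_id, 'cdate': cdate}, {'$set': doc}))
    return ops

# Design note: keying the upsert on (_game_role_id, cdate) makes each run
# idempotent per day, which matters because get_cursor() may step back one day
# and reprocess it.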