data_cleaning/task/summary_online_time.py
2021-01-11 09:58:33 +08:00

97 lines
4.3 KiB
Python

import time
import traceback
from pymongo import UpdateOne
from pydantic import Field
import pandas as pd
import numpy as np
from model import BaseModel
from .task import Task
from utils import *
class SummaryOnlineTime(Task):
    """Summarise per-role online time for each cursor window.

    For every cursor window the task sums the ``ts`` field of ``TimeSpending``
    events per ``_game_role_id`` in the source collection, looks the roles up
    in the local ``user`` collection and upserts one document per
    (``_game_role_id``, ``cdate``) into ``self.dest_coll`` on both the local
    and the remote database.
    """

    class Model(BaseModel):
        """Fields taken from a ``user`` document for the summary document."""

        # Midnight (in the task timezone) of the cursor start, epoch seconds.
        cdate: int = Field(..., title='当天0点')
        platform: str = Field(..., min_length=1, title="平台", alias='_platform')
        channel_name: str = Field(..., min_length=1, title="channel", alias='_channel_name')
        owner_name: str = Field(..., min_length=1, title="owner", alias='_owner_name')
        channel_uid: str = Field(..., min_length=1, title="channel_uid", alias='_channel_uid')
        device_id: str = Field(..., min_length=1, title='device_id', alias='_device_id')
        district_server_id: int = Field(..., title="区服id", alias='_district_server_id')
        game_role_id: str = Field(..., min_length=1, title="角色id", alias='_game_role_id')

    # Day-granular cursor handling.
    def get_cursor(self):
        """Choose ``self.cursor_st`` for a day-based run, then defer to ``Task``.

        When no start cursor is set, the ``cursor_et`` stored in
        ``self.task_info`` is used; if that value is at or after today's
        midnight (in ``self.timezone``) it is moved back by one day. If there
        is still no start cursor, it defaults to 24 hours before now.
        """
        # NOTE(review): the original file's indentation was lost; the
        # "today or later" check is reconstructed as applying only to the
        # cursor restored from task_info -- confirm against the upstream file.
        if not self.cursor_st:
            self.cursor_st = self.task_info.get('cursor_et')
            if self.cursor_st:
                today_midnight = int(
                    pd.Timestamp(time.time(), unit='s', tz=self.timezone).normalize().timestamp())
                if self.cursor_st >= today_midnight:
                    # Step back one day (86400 s) so the range still spans a day.
                    self.cursor_st -= 86400
        if not self.cursor_st:
            self.cursor_st = int(time.time()) - 86400
        super().get_cursor()

    def generate_cursor_time(self):
        """Split the cursor range into consecutive ``st``/``et`` windows.

        Both ``self.cursor_st`` and ``self.cursor_et`` are normalised to
        midnight in ``self.timezone`` and stepped with ``self.freq``.

        Returns:
            pandas.DataFrame indexed by window start, with columns ``st``
            (window start) and ``et`` (next boundary; the last window ends at
            the normalised ``cursor_et``). The frame is empty when the range
            yields fewer than two boundaries (e.g. start and end fall on the
            same day).
        """
        range_start = pd.Timestamp(self.cursor_st, unit='s', tz=self.timezone).normalize()
        range_end = pd.Timestamp(self.cursor_et, unit='s', tz=self.timezone).normalize()
        date_index = pd.date_range(range_start, range_end, freq=self.freq)
        df = pd.DataFrame(index=date_index[:-1])
        df['st'] = df.index
        if df.index.empty:
            # No complete window: appending the end boundary would produce a
            # one-element array for a zero-row frame and raise ValueError.
            df['et'] = df.index
            return df
        df['et'] = np.append(df.index[1:], [range_end])
        return df

    def cleaning(self, cursor_list):
        """Aggregate online time per role and upsert it for every cursor.

        Args:
            cursor_list: iterable of dicts mapping a source collection name to
                a dict holding ``cursor_st`` and ``cursor_et``.

        Per-document failures are reported through ``ddsend_msg`` and logged,
        and do not stop the remaining documents from being processed.
        """
        for cursor in cursor_list:  # type:dict
            for source_coll, ts in cursor.items():  # type:str,dict
                if ts['cursor_st'] == ts['cursor_et']:
                    # Zero-length window: nothing to summarise.
                    continue
                logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}')
                # Total the reported online time per role.
                # NOTE(review): the aggregation is not restricted to the
                # cursor window; presumably each source collection already
                # holds only one window's events -- confirm.
                pipeline = [
                    {
                        '$match': {
                            '_event_name': 'TimeSpending'
                        }
                    }, {
                        '$group': {
                            '_id': '$_game_role_id',
                            'ts': {
                                '$sum': '$ts'
                            }
                        }
                    }
                ]
                group_ts = self.local_db[source_coll].aggregate(pipeline=pipeline)
                role_ts = {role['_id']: role['ts'] for role in group_ts}
                projection = self.Model.get_fields()
                where = {
                    '_game_role_id': {'$in': list(role_ts)}
                }
                bulk_data = []
                # Every document of this window is keyed by the midnight of
                # the cursor start in the task timezone.
                cdate = int(pd.Timestamp(ts['cursor_st'], unit='s', tz=self.timezone).normalize().timestamp())
                for item in self.local_db['user'].find(where, projection):
                    try:
                        item['cdate'] = cdate
                        model = self.Model(**item)
                        data = model.dict(by_alias=True)
                        data['ts'] = role_ts[data['_game_role_id']]
                        bulk_data.append(
                            UpdateOne({'_game_role_id': data['_game_role_id'], 'cdate': data['cdate']},
                                      {'$set': data},
                                      upsert=True))
                        logger.debug(f'处理 {data["_game_role_id"]}')
                    except Exception as e:
                        # Best effort: report the bad document and carry on.
                        msg = traceback.format_exc()
                        ddsend_msg(f'{self.game_name}.{source_coll}字段异常 {msg}')
                        logger.error(repr(e))
                if bulk_data:
                    logger.debug(f'准备写入{len(bulk_data)}')
                    self.local_db[self.dest_coll].bulk_write(bulk_data, ordered=False)
                    self.remote_db[self.dest_coll].bulk_write(bulk_data, ordered=False)
                    logger.debug('写入完成')
                # Record progress even when the window produced no writes.
                self.set_cursor(cursor_st=ts['cursor_st'], cursor_et=ts['cursor_et'])