data_cleaning/task/task.py
2021-01-11 15:04:47 +08:00

170 lines
5.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import abc
import time
import pandas as pd
import numpy as np
from db import *
from utils import *
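"""
When the cursor range is too large, split it into 30-minute chunks before writing.
Use unordered bulk operations (bulk_write) to improve write performance.
"""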
class Task(metaclass=abc.ABCMeta):
    """Abstract base for a cleaning task driven by a time cursor.

    A task reads its persisted state (run flag, last run timestamp, last
    cursor) from the ``task2`` collection of the local database, splits the
    time range ``[cursor_st, cursor_et]`` into ``freq``-sized windows and
    hands the resulting list to :meth:`cleaning`, which subclasses implement.

    Keyword arguments read by ``__init__``:
        game_name: used to build the database name ``game_<game_name>``.
        source_coll: source collection name; the value ``'event'`` selects
            the per-day ``event_YYYY-MM-DD`` collections.
        dest_coll: destination collection name (not used by this base class).
        st / et: optional cursor start / end as Unix timestamps in seconds.
        timezone: timezone passed to ``pd.Timestamp`` when building windows.
        task_name: value of the ``name`` field identifying the task document.
        freq: pandas frequency string for the window size (default 30 minutes).

    Every keyword argument is additionally stored as an attribute of the
    same name.
    """

    # Keys that set_cursor() accepts from callers.
    _CURSOR_KEYS = frozenset({'cursor_st', 'cursor_et'})

    def __init__(self, *args, **kwargs):
        self.game_name = kwargs.get('game_name')
        self.source_coll = kwargs.get('source_coll')
        self.dest_coll = kwargs.get('dest_coll')
        self.cursor_st = kwargs.get('st')
        self.cursor_et = kwargs.get('et')
        self.timezone = kwargs.get('timezone')
        self.task_name = kwargs.get("task_name")
        # '30min' is the non-deprecated spelling of the former '30T' alias.
        self.freq = kwargs.get('freq', '30min')
        # Expose every keyword argument as an attribute of the same name.
        for key, value in kwargs.items():
            setattr(self, key, value)
        self.game_db = f'game_{self.game_name}'
        self.local_db = get_local_db(self.game_db)
        self.remote_db = get_remote_db(self.game_db)
        self.task_coll = self.local_db['task2']
        self.task_where = {
            'name': self.task_name
        }
        self.task_info = self.get_task_info()
        logger.debug(f'初始化完成 当前{settings.run_model}模式')

    def get_task_info(self):
        """Return the stored task document, or an empty dict if none exists."""
        return self.task_coll.find_one(self.task_where) or {}

    def check_run(self) -> bool:
        """Decide whether the task may start now.

        :return: True when the stored ``is_run`` flag is falsy (this also
            covers the very first run, when no task document exists yet).
            False when the task is flagged as running; a notification is
            sent in that case, worded differently depending on whether the
            stored ``time_out`` (default 86400 seconds) has been exceeded.
        """
        if not self.task_info.get('is_run'):
            # Not running (or never ran before): free to start.
            return True

        # Treat a missing or None run_ts as 0 so the subtraction cannot fail.
        last_ts = self.task_info.get('run_ts') or 0
        time_out = self.task_info.get('time_out', 86400)
        if int(time.time()) - last_ts > time_out:
            # Flagged as running for longer than the timeout.
            ddsend_msg(f'{self.game_name} {self.task_name} 任务超时')
            logger.info(f'{self.game_name} 钉钉通知')
            return False

        # Still running and within the timeout.
        logger.info(f'{self.game_name} 正在运行没超时')
        ddsend_msg(f'{self.game_name} {self.task_name} 已运行 {time.time() - last_ts}s')
        return False

    def set_run_ts(self):
        """Store the current Unix time as ``run_ts`` on the task document."""
        self.task_coll.update_one(self.task_where, {
            '$set': {'run_ts': int(time.time())}
        }, upsert=True)

    def get_cursor(self):
        """Fill in ``cursor_st`` / ``cursor_et`` when they were not supplied.

        ``cursor_st`` falls back to the ``cursor_et`` stored in the task
        document (continue where the previous run stopped) and then to
        today's midnight in ``self.timezone`` (first run of the task).
        ``cursor_et`` falls back to the current time.

        :return: None
        """
        if not self.cursor_st:
            self.cursor_st = self.task_info.get('cursor_et')
        if not self.cursor_st:
            midnight = pd.Timestamp(time.time(), unit='s', tz=self.timezone).normalize()
            self.cursor_st = int(midnight.timestamp())
        if not self.cursor_et:
            self.cursor_et = int(time.time())

    def set_cursor(self, **kwargs):
        """Persist the cursor once a run has finished.

        Only ``cursor_st`` and ``cursor_et`` are accepted; the current time
        is stored alongside them as ``run_finish_ts``.

        :raises ValueError: if any keyword other than ``cursor_st`` /
            ``cursor_et`` is passed.
        :return: None
        """
        # Reject every key outside the allowed set. The previous
        # strict-superset comparison let unrelated keys such as
        # set_cursor(foo=1) through.
        if set(kwargs) - self._CURSOR_KEYS:
            raise ValueError('设置游标不合理')
        kwargs['run_finish_ts'] = int(time.time())
        self.task_coll.update_one(self.task_where, {
            '$set': kwargs}, upsert=True)

    def generate_cursor_time(self):
        """Split ``[cursor_st, cursor_et]`` into ``freq``-sized windows.

        :return: DataFrame indexed by window start with columns ``st`` and
            ``et``. Each window ends where the next one starts and the last
            window ends at ``cursor_et``. When ``cursor_et`` falls exactly on
            the grid, the last window has zero length.
        :raises ValueError: if ``cursor_st`` is later than ``cursor_et``.
        """
        start = pd.Timestamp(self.cursor_st, unit='s', tz=self.timezone)
        end = pd.Timestamp(self.cursor_et, unit='s', tz=self.timezone)
        if start > end:
            raise ValueError(
                f'cursor_st ({self.cursor_st}) is later than cursor_et ({self.cursor_et})')
        starts = pd.date_range(start, end, freq=self.freq)
        # Plain Python lists keep every element a pandas Timestamp; no
        # dependence on how NumPy coerces datetime data to object arrays.
        ends = list(starts[1:]) + [end]
        return pd.DataFrame({'st': starts, 'et': ends}, index=starts)

    def get_single_coll(self) -> list:
        """Return one ``{source_coll: {'cursor_st', 'cursor_et'}}`` dict per window."""
        df = self.generate_cursor_time()
        return [
            {self.source_coll: {
                'cursor_st': int(st.timestamp()),
                'cursor_et': int(et.timestamp()),
            }}
            for st, et in zip(df['st'], df['et'])
        ]

    @staticmethod
    def _event_coll_name(ts) -> str:
        """Return the per-day event collection name for timestamp ``ts``."""
        return 'event_' + ts.strftime('%Y-%m-%d')

    def get_event_coll(self) -> list:
        """Map each window onto the per-day event collection(s) it touches.

        A window whose start and end fall on different calendar days is split
        at the midnight of the end day, giving one entry per collection.
        Only a single split is made per window.

        :return: [{'event_2020-12-10': {'cursor_st': 1607608848, 'cursor_et': 1607610648}},
                  {'event_2020-12-10': {'cursor_st': 1607610648, 'cursor_et': 1607610791}}]
        """
        df = self.generate_cursor_time()
        cursor_list = []
        for st, et in zip(df['st'], df['et']):
            start_coll = self._event_coll_name(st)
            end_coll = self._event_coll_name(et)
            start_ts = int(st.timestamp())
            end_ts = int(et.timestamp())
            if start_coll == end_coll:
                data = {start_coll: {'cursor_st': start_ts, 'cursor_et': end_ts}}
            else:
                # The window crosses midnight: split it at the day boundary.
                midnight_ts = int(et.normalize().timestamp())
                data = {
                    start_coll: {'cursor_st': start_ts, 'cursor_et': midnight_ts},
                    end_coll: {'cursor_st': midnight_ts, 'cursor_et': end_ts},
                }
            cursor_list.append(data)
        return cursor_list

    def get_source_coll(self) -> list:
        """Return the window list for the configured source collection."""
        if self.source_coll == 'event':
            return self.get_event_coll()
        return self.get_single_coll()

    def set_run_status(self, status: bool):
        """Store the running flag ``is_run`` on the task document.

        :param status: True while the task is running, False otherwise.
        :return: None
        """
        self.task_coll.update_one(self.task_where, {'$set': {'is_run': status}}, upsert=True)

    @abc.abstractmethod
    def cleaning(self, cursor_list):
        """Process the windows in ``cursor_list``; implemented by subclasses."""
        pass

    def run(self):
        """Run the task once.

        In ``production`` mode the run is skipped when :meth:`check_run`
        refuses it, and the string ``'运行中...'`` is returned. Otherwise the
        run timestamp and running flag are stored, the cursor windows are
        built and passed to :meth:`cleaning`.

        :return: ``'运行中...'`` when skipped, otherwise None.
        """
        if settings.run_model == 'production' and not self.check_run():
            return '运行中...'
        self.set_run_ts()
        self.set_run_status(True)
        try:
            self.get_cursor()
            cursor_list = self.get_source_coll()
            self.cleaning(cursor_list)
        finally:
            # Always clear the flag: a failure must not leave the task marked
            # as running, which would make check_run() refuse later runs.
            self.set_run_status(False)