168 lines
5.8 KiB
Python
168 lines
5.8 KiB
Python
import abc
|
||
import time
|
||
|
||
import pandas as pd
|
||
import numpy as np
|
||
|
||
from db import *
|
||
from utils import *
|
||
|
||
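"""
The cursor range may be too large to process in one pass, so it is split into
30-minute chunks that are written one at a time.
Unordered bulk operations (bulk_write) are used to improve write performance.
"""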
|
||
|
||
|
||
class Task(metaclass=abc.ABCMeta):
|
||
def __init__(self, *args, **kwargs):
|
||
self.game_name = kwargs.get('game_name')
|
||
self.source_coll = kwargs.get('source_coll')
|
||
self.dest_coll = kwargs.get('dest_coll')
|
||
self.cursor_st = kwargs.get('st')
|
||
self.cursor_et = kwargs.get('et')
|
||
self.timezone = kwargs.get('timezone')
|
||
self.task_name = kwargs.get("task_name")
|
||
self.freq = kwargs.get('freq', '30T')
|
||
for k, v in kwargs.items():
|
||
self.__setattr__(k, v)
|
||
self.game_db = f'game_{self.game_name}'
|
||
self.local_db = get_local_db(self.game_db)
|
||
self.remote_db = get_remote_db(self.game_db)
|
||
self.task_coll = self.local_db['task2']
|
||
self.task_where = {
|
||
'name': self.task_name
|
||
}
|
||
self.task_info = self.get_task_info()
|
||
|
||
def get_task_info(self):
|
||
task_info = self.task_coll.find_one(self.task_where) or {}
|
||
return task_info
|
||
|
||
def check_run(self) -> bool:
|
||
is_run = self.task_info.get('is_run')
|
||
last_ts = self.task_info.get('run_ts', 0)
|
||
time_out = self.task_info.get('time_out', 86400)
|
||
if not last_ts and not is_run:
|
||
# 第一次运行
|
||
return True
|
||
if not is_run:
|
||
# 可以运行
|
||
return True
|
||
elif int(time.time()) - last_ts > time_out:
|
||
# 任务超时
|
||
ddsend_msg(f'{self.game_name} {self.task_name} 任务超时')
|
||
logger.info(f'{self.game_name} 钉钉通知')
|
||
return False
|
||
else:
|
||
# 正在运行没超时
|
||
logger.info(f'{self.game_name} 正在运行没超时')
|
||
ddsend_msg(f'{self.game_name} {self.task_name} 已运行 {time.time()-last_ts}s')
|
||
return False
|
||
|
||
def set_run_ts(self):
|
||
self.task_coll.update_one(self.task_where, {
|
||
'$set': {'run_ts': int(time.time())}
|
||
}, upsert=True)
|
||
|
||
def get_cursor(self):
|
||
"""
|
||
没有手动设置游标,从taskinfo接着上次执行
|
||
任务第一次执行取当天0点
|
||
:return:
|
||
"""
|
||
if not self.cursor_st:
|
||
self.cursor_st = self.task_info.get('cursor_et')
|
||
|
||
if not self.cursor_st:
|
||
self.cursor_st = int(pd.Timestamp(time.time(), unit='s', tz=self.timezone).normalize().timestamp())
|
||
|
||
if not self.cursor_et:
|
||
self.cursor_et = int(time.time())
|
||
|
||
def set_cursor(self, **kwargs):
|
||
"""
|
||
本次任务完成设置游标
|
||
:return: None
|
||
"""
|
||
if kwargs and set(kwargs) > {'cursor_et', 'cursor_st'}:
|
||
raise ValueError('设置游标不合理')
|
||
self.task_coll.update_one(self.task_where, {
|
||
'$set': kwargs}, upsert=True)
|
||
|
||
def generate_cursor_time(self):
|
||
date_index = pd.date_range(pd.Timestamp(self.cursor_st, unit='s', tz=self.timezone),
|
||
pd.Timestamp(self.cursor_et, unit='s', tz=self.timezone), freq=self.freq)
|
||
df = pd.DataFrame(index=date_index)
|
||
df['st'] = df.index
|
||
df['et'] = np.append(df.index[1:], [pd.Timestamp(self.cursor_et, unit='s', tz=self.timezone)])
|
||
return df
|
||
|
||
def get_single_coll(self) -> list:
|
||
df = self.generate_cursor_time()
|
||
cursor_list = []
|
||
for k, item in df.T.items():
|
||
cursor_list.append({self.source_coll: {
|
||
'cursor_st': int(item['st'].timestamp()),
|
||
'cursor_et': int(item['et'].timestamp()),
|
||
}}
|
||
)
|
||
return cursor_list
|
||
|
||
def get_event_coll(self) -> list:
|
||
"""
|
||
根据游标时间戳 返回要处理的集合
|
||
:return: [{'event_2020-12-10': {'cursor_st': 1607608848, 'cursor_et': 1607610648}},
|
||
{'event_2020-12-10': {'cursor_st': 1607610648, 'cursor_et': 1607610791}}]
|
||
"""
|
||
df = self.generate_cursor_time()
|
||
df['event_coll_s'] = df['st'].apply(lambda x: f'event_{x.date().strftime("%Y-%m-%d")}')
|
||
df['event_coll_e'] = df['et'].apply(lambda x: f'event_{x.date().strftime("%Y-%m-%d")}')
|
||
cursor_list = []
|
||
for k, item in df.T.items():
|
||
data = {}
|
||
if item['event_coll_s'] != item['event_coll_e']:
|
||
data[item['event_coll_s']] = {
|
||
'cursor_st': int(item['st'].timestamp()),
|
||
'cursor_et': int(item['et'].normalize().timestamp()),
|
||
}
|
||
data[item['event_coll_e']] = {
|
||
'cursor_st': int(item['et'].normalize().timestamp()),
|
||
'cursor_et': int(item['et'].timestamp()),
|
||
}
|
||
else:
|
||
data[item['event_coll_s']] = {
|
||
'cursor_st': int(item['st'].timestamp()),
|
||
'cursor_et': int(item['et'].timestamp()),
|
||
}
|
||
|
||
cursor_list.append(data)
|
||
return cursor_list
|
||
|
||
def get_source_coll(self) -> list:
|
||
if self.source_coll == 'event':
|
||
return self.get_event_coll()
|
||
else:
|
||
return self.get_single_coll()
|
||
|
||
def set_run_status(self, status: bool):
|
||
"""
|
||
设置运行状态
|
||
:param status:
|
||
:return:
|
||
"""
|
||
self.task_coll.update_one(self.task_where, {'$set': {'is_run': status}}, upsert=True)
|
||
|
||
@abc.abstractmethod
|
||
def cleaning(self, cursor_list):
|
||
pass
|
||
|
||
def run(self):
|
||
if not self.check_run():
|
||
return '运行中...'
|
||
self.set_run_ts()
|
||
self.set_run_status(True)
|
||
self.get_cursor()
|
||
cursor_list = self.get_source_coll()
|
||
self.cleaning(cursor_list)
|
||
self.set_run_status(False)
|