import abc import time import pandas as pd import numpy as np from db import * from utils import * """" 考虑游标过大,切分为30分钟块写入 采用批量无序操作提高写性能 bulk_write """ class Task(metaclass=abc.ABCMeta): def __init__(self, *args, **kwargs): self.game_name = kwargs.get('game_name') self.source_coll = kwargs.get('source_coll') self.dest_coll = kwargs.get('dest_coll') self.cursor_st = kwargs.get('st') self.cursor_et = kwargs.get('et') self.timezone = kwargs.get('timezone') self.task_name = kwargs.get("task_name") self.freq = kwargs.get('freq', '30T') for k, v in kwargs.items(): self.__setattr__(k, v) self.game_db = f'game_{self.game_name}' self.local_db = get_local_db(self.game_db) self.remote_db = get_remote_db(self.game_db) self.task_coll = self.local_db['task2'] self.task_where = { 'name': self.task_name } self.task_info = self.get_task_info() logger.debug(f'初始化完成 当前{settings.run_model}模式') def get_task_info(self): task_info = self.task_coll.find_one(self.task_where) or {} return task_info def check_run(self) -> bool: is_run = self.task_info.get('is_run') last_ts = self.task_info.get('run_ts', 0) time_out = self.task_info.get('time_out', 86400) if not last_ts and not is_run: # 第一次运行 return True if not is_run: # 可以运行 return True elif int(time.time()) - last_ts > time_out: # 任务超时 ddsend_msg(f'{self.game_name} {self.task_name} 任务超时') logger.info(f'{self.game_name} 钉钉通知') return False else: # 正在运行没超时 logger.info(f'{self.game_name} 正在运行没超时') ddsend_msg(f'{self.game_name} {self.task_name} 已运行 {time.time() - last_ts}s') return False def set_run_ts(self): self.task_coll.update_one(self.task_where, { '$set': {'run_ts': int(time.time())} }, upsert=True) def get_cursor(self): """ 没有手动设置游标,从taskinfo接着上次执行 任务第一次执行取当天0点 :return: """ if not self.cursor_st: self.cursor_st = self.task_info.get('cursor_et') if not self.cursor_st: self.cursor_st = int(pd.Timestamp(time.time(), unit='s', tz=self.timezone).normalize().timestamp()) if not self.cursor_et: self.cursor_et = int(time.time()) def set_cursor(self, **kwargs): """ 本次任务完成设置游标 :return: None """ if kwargs and set(kwargs) > {'cursor_et', 'cursor_st'}: raise ValueError('设置游标不合理') kwargs['run_finish_ts'] = int(time.time()) self.task_coll.update_one(self.task_where, { '$set': kwargs}, upsert=True) def generate_cursor_time(self): date_index = pd.date_range(pd.Timestamp(self.cursor_st, unit='s', tz=self.timezone), pd.Timestamp(self.cursor_et, unit='s', tz=self.timezone), freq=self.freq) df = pd.DataFrame(index=date_index) df['st'] = df.index df['et'] = np.append(df.index[1:], [pd.Timestamp(self.cursor_et, unit='s', tz=self.timezone)]) return df def get_single_coll(self) -> list: df = self.generate_cursor_time() cursor_list = [] for k, item in df.T.items(): cursor_list.append({self.source_coll: { 'cursor_st': int(item['st'].timestamp()), 'cursor_et': int(item['et'].timestamp()), }} ) return cursor_list def get_event_coll(self) -> list: """ 根据游标时间戳 返回要处理的集合 :return: [{'event_2020-12-10': {'cursor_st': 1607608848, 'cursor_et': 1607610648}}, {'event_2020-12-10': {'cursor_st': 1607610648, 'cursor_et': 1607610791}}] """ df = self.generate_cursor_time() df['event_coll_s'] = df['st'].apply(lambda x: f'event_{x.date().strftime("%Y-%m-%d")}') df['event_coll_e'] = df['et'].apply(lambda x: f'event_{x.date().strftime("%Y-%m-%d")}') cursor_list = [] for k, item in df.T.items(): data = {} if item['event_coll_s'] != item['event_coll_e']: data[item['event_coll_s']] = { 'cursor_st': int(item['st'].timestamp()), 'cursor_et': int(item['et'].normalize().timestamp()), } data[item['event_coll_e']] = { 'cursor_st': int(item['et'].normalize().timestamp()), 'cursor_et': int(item['et'].timestamp()), } else: data[item['event_coll_s']] = { 'cursor_st': int(item['st'].timestamp()), 'cursor_et': int(item['et'].timestamp()), } cursor_list.append(data) return cursor_list def get_source_coll(self) -> list: if self.source_coll == 'event': return self.get_event_coll() else: return self.get_single_coll() def set_run_status(self, status: bool): """ 设置运行状态 :param status: :return: """ self.task_coll.update_one(self.task_where, {'$set': {'is_run': status}}, upsert=True) @abc.abstractmethod def cleaning(self, cursor_list): pass def run(self): if settings.run_model == 'production' and not self.check_run(): return '运行中...' self.set_run_ts() self.set_run_status(True) self.get_cursor() cursor_list = self.get_source_coll() self.cleaning(cursor_list) self.set_run_status(False)