"""Command-line launcher that runs one configured task for every enabled game.

Usage: ``python <script> <task_name> <st> <et>``

The per-task settings are read from ``config.json`` under ``settings.ROOT_DIR``
and one worker process is started per enabled game.
"""
import json
import os
import sys
import time
import traceback
from importlib import import_module
from multiprocessing import Pool

from utils import logger, ddsend_msg
# NOTE(review): `settings` and `get_local_db` are not imported explicitly;
# presumably they are provided by this star import -- confirm.
from db import *


def get_game() -> list:
    """Return every document of the ``game`` collection in ``admin_game``."""
    return list(get_local_db('admin_game')['game'].find())


def run_task(kwargs):
    """Import, instantiate and run the task class described by ``kwargs``.

    ``kwargs['task_name']`` names a module inside the ``task`` package; the
    class to instantiate is the CamelCase form of that name (``foo_bar`` ->
    ``FooBar``).  The whole ``kwargs`` dict is passed to the class constructor
    as keyword arguments and ``run()`` is called on the resulting object.

    Any ``Exception`` is logged and reported through ``ddsend_msg`` instead of
    being propagated, so a failing task does not abort the other workers.
    The elapsed time (whole seconds) is always written to the debug log.
    """
    start_ts = int(time.time())
    try:
        module_name = kwargs.get('task_name')
        class_name = ''.join(part.capitalize() for part in module_name.split('_'))
        module = import_module(f'.{module_name}', package='task')
        task_cls = getattr(module, class_name)
        task_cls(**kwargs).run()
    except Exception:
        msg = traceback.format_exc()
        # Write the local log first so the traceback is kept even if the
        # notification call itself fails.
        logger.error(msg)
        ddsend_msg(f'异常退出 {msg}')
    finally:
        # No notification is sent on success; completion is only recorded here.
        logger.debug(f'任务结束。. {kwargs.get("task_name")} 耗时 {int(time.time()) - start_ts}')


def _build_params(game_list, task_settings, st, et):
    """Build one keyword-argument dict per enabled game."""
    params = []
    for game in game_list:
        # Games without an explicit flag are treated as enabled.
        if not game.get('is_enable', True):
            continue
        task_kwargs = {
            'game_name': game['id_name'],
            'timezone': game.get('timezone', 'Asia/Shanghai'),
            'st': st,
            'et': et,
        }
        # Task settings are applied last, so they override the per-game values.
        # NOTE(review): run_task reads 'task_name' from these kwargs, so the
        # config entry presumably supplies it -- confirm against config.json.
        task_kwargs.update(task_settings)
        params.append(task_kwargs)
    return params


def main(argv):
    """Parse ``<task_name> <st> <et>`` from ``argv`` and run the task per game."""
    if len(argv) != 3:
        raise SystemExit(f'usage: {sys.argv[0]} <task_name> <st> <et>')
    task_name, st, et = argv
    st, et = int(st), int(et)

    game_list = get_game()
    with open(os.path.join(settings.ROOT_DIR, 'config.json'), 'r', encoding='utf8') as f:
        task_conf = json.load(f)
    if task_name not in task_conf:
        raise SystemExit(f'unknown task {task_name!r}: not found in config.json')

    params = _build_params(game_list, task_conf[task_name], st, et)
    if not params:
        # Pool(0) raises ValueError, so there is nothing to start here.
        logger.debug(f'no enabled game found for task {task_name}')
        return

    # One worker per enabled game; disabled games no longer inflate the pool.
    with Pool(len(params)) as pool:
        pool.map(run_task, params)


if __name__ == '__main__':
    main(sys.argv[1:])