data_cleaning/main.py
2021-01-13 20:11:39 +08:00

62 lines
1.7 KiB
Python

import json
import os
import sys
import time
import traceback
from utils import logger, ddsend_msg
from multiprocessing import Pool
from importlib import import_module
from db import *
def get_game() -> list:
local_db = get_local_db('admin_game')
games = list(local_db['game'].find())
return games
def run_task(kwargs):
start_ts = int(time.time())
try:
module_name = kwargs.get('task_name')
class_name = ''.join([s.capitalize() for s in module_name.split('_')])
module = import_module(f'.{module_name}', package='task')
c_obj = getattr(module, class_name)
obj = c_obj(**kwargs)
obj.run()
except Exception as e:
msg = traceback.format_exc()
ddsend_msg(f'异常退出 {msg}')
logger.error(msg)
finally:
# ddsend_msg(f'正常完成任务 {kwargs.get("task_name")} 耗时 {int(time.time()) - start_ts}')
logger.debug(f'任务结束。 {kwargs.get("task_name")} 耗时 {int(time.time()) - start_ts}')
if __name__ == '__main__':
# eg: summary_func 0 0
# eg: first_recharge 0 0
# eg: repair_gunfu 0 0
task_name, st, et = sys.argv[1:]
st, et = int(st), int(et)
game_list = get_game()
with open(os.path.join(settings.ROOT_DIR, 'config.json'), 'r', encoding='utf8') as f:
task_conf = json.load(f)
params = []
for item in game_list:
if not item.get('is_enable', True):
continue
p = {'game_name': item['id_name'],
'timezone': item.get('timezone', 'Asia/Shanghai'),
'st': st,
'et': et
}
p.update(task_conf[task_name])
params.append(p)
with Pool(len(game_list)) as p:
p.map(run_task, params)