diff --git a/config.json b/config.json
new file mode 100644
index 0000000..22aff81
--- /dev/null
+++ b/config.json
@@ -0,0 +1,12 @@
+{
+  "summary_func": {
+    "source_coll": "event",
+    "dest_coll": "summary_func",
+    "task_name": "summary_func"
+  },
+  "first_recharge": {
+    "source_coll": "paylist",
+    "dest_coll": "user",
+    "task_name": "first_recharge"
+  }
+}
\ No newline at end of file
diff --git a/main.py b/main.py
index 8f50d67..0ffdea9 100644
--- a/main.py
+++ b/main.py
@@ -1,3 +1,4 @@
+import json
 import sys
 from multiprocessing import Pool
 
@@ -14,7 +15,7 @@ def get_game() -> list:
 
 def run_task(kwargs):
     module_name = kwargs.get('task_name')
-    class_name = ''.join([s.capitalize() for s in task_name.split('_')])
+    class_name = ''.join([s.capitalize() for s in module_name.split('_')])
     module = import_module(f'.{module_name}', package='task')
     c_obj = getattr(module, class_name)
     obj = c_obj(**kwargs)
@@ -23,15 +24,27 @@ def run_task(kwargs):
 
 if __name__ == '__main__':
     # eg: summary_func 0 0
+    # eg: first_recharge 0 0
     task_name, st, et = sys.argv[1:]
     st, et = int(st), int(et)
     game_list = get_game()
-    params = [{'game_name': item['id_name'],
-               'task_name': task_name,
-               'timezone': item.get('timezone', 'Asia/Shanghai'),
-               'st': st,
-               'et': et
-               }
-              for item in game_list]
-    with Pool(len(game_list)) as p:
-        p.map(run_task, params)
+    with open('config.json', 'r', encoding='utf8') as f:
+        task_conf = json.load(f)
+    # Fail with a readable message instead of a bare KeyError when the
+    # task given on the command line has no entry in config.json.
+    if task_name not in task_conf:
+        sys.exit(f'unknown task {task_name!r}; expected one of {sorted(task_conf)}')
+    # Per-game arguments come first; the task's config entry is spread last so
+    # its keys (source_coll, dest_coll, task_name) win on any collision.
+    params = [{'game_name': item['id_name'],
+               'task_name': task_name,
+               'timezone': item.get('timezone', 'Asia/Shanghai'),
+               'st': st,
+               'et': et,
+               **task_conf[task_name]}
+              for item in game_list]
+
+    # Pool(0) raises ValueError, so skip the pool when there are no games.
+    if params:
+        with Pool(len(params)) as p:
+            p.map(run_task, params)
diff --git a/task/task.py b/task/task.py
index c6232b1..8381dda 100644
--- a/task/task.py
+++ b/task/task.py
@@ -17,6 +17,8 @@ class Task(metaclass=abc.ABCMeta):
     def __init__(self, *args, **kwargs):
         self.game_name = kwargs.get('game_name')
         self.game_db = f'game_{self.game_name}'
+        self.source_coll = kwargs.get('source_coll')
+        self.dest_coll = kwargs.get('dest_coll')
         self.cursor_st = kwargs.get('st')
         self.cursor_et = kwargs.get('et')
         self.timezone = kwargs.get('timezone')
@@ -83,16 +85,38 @@ class Task(metaclass=abc.ABCMeta):
         self.task_coll.update_one(self.task_where, {
             '$set': kwargs}, upsert=True)
 
-    def get_event_coll(self) -> list:
-        """
-        根据游标时间戳 返回要处理的集合
-        :return: [{'event_2020-12-10': {'cursor_st': 1607608848, 'cursor_et': 1607610648}}, {'event_2020-12-10': {'cursor_st': 1607610648, 'cursor_et': 1607610791}}]
-        """
+    def generate_cursor_time(self):
+        """
+        Split [cursor_st, cursor_et] into consecutive windows of at most 30 minutes.
+        :return: DataFrame with one row per window; columns 'st' and 'et' hold
+                 the window bounds as timestamps localized to self.timezone
+        """
         date_index = pd.date_range(pd.Timestamp(self.cursor_st, unit='s', tz=self.timezone),
                                    pd.Timestamp(self.cursor_et, unit='s', tz=self.timezone), freq='30T')
         df = pd.DataFrame(index=date_index)
         df['st'] = df.index
         df['et'] = np.append(df.index[1:], [pd.Timestamp(self.cursor_et, unit='s', tz=self.timezone)])
+        return df
+
+    def get_single_coll(self) -> list:
+        """
+        Build the cursor list for a task that reads from one fixed collection.
+        :return: [{self.source_coll: {'cursor_st': <epoch seconds>, 'cursor_et': <epoch seconds>}}, ...]
+        """
+        df = self.generate_cursor_time()
+        # Pair the window bounds column-wise; no need to transpose the frame
+        # just to walk its rows.
+        return [{self.source_coll: {
+            'cursor_st': int(st.timestamp()),
+            'cursor_et': int(et.timestamp()),
+        }} for st, et in zip(df['st'], df['et'])]
+
+    def get_event_coll(self) -> list:
+        """
+        Return the collections to process, based on the cursor timestamps.
+        :return: [{'event_2020-12-10': {'cursor_st': 1607608848, 'cursor_et': 1607610648}}, {'event_2020-12-10': {'cursor_st': 1607610648, 'cursor_et': 1607610791}}]
+        """
+        df = self.generate_cursor_time()
         df['event_coll_s'] = df['st'].apply(lambda x: f'event_{x.date().strftime("%Y-%m-%d")}')
         df['event_coll_e'] = df['et'].apply(lambda x: f'event_{x.date().strftime("%Y-%m-%d")}')
         cursor_list = []
@@ -116,6 +140,16 @@ class Task(metaclass=abc.ABCMeta):
             cursor_list.append(data)
         return cursor_list
 
+    def get_source_coll(self) -> list:
+        """
+        Choose the cursor builder that matches the configured source collection.
+        :return: result of get_event_coll() when source_coll is 'event',
+                 otherwise result of get_single_coll()
+        """
+        if self.source_coll == 'event':
+            return self.get_event_coll()
+        return self.get_single_coll()
+
     def set_run_status(self, status: bool):
         """
         设置运行状态
@@ -134,6 +168,6 @@ class Task(metaclass=abc.ABCMeta):
         self.set_run_ts()
         self.set_run_status(True)
         self.get_cursor()
-        cursor_list = self.get_event_coll()
+        cursor_list = self.get_source_coll()
         self.cleaning(cursor_list)
         self.set_run_status(False)