更新任务调用

This commit is contained in:
kf_wuhao 2020-12-23 13:35:20 +08:00
parent 9912567004
commit a6e9845062
3 changed files with 56 additions and 14 deletions

12
config.json Normal file
View File

@ -0,0 +1,12 @@
{
"summary_func": {
"source_coll": "event",
"dest_coll": "summary_func",
"task_name": "summary_func"
},
"first_recharge": {
"source_coll": "paylist",
"dest_coll": "user",
"task_name": "first_recharge"
}
}

23
main.py
View File

@ -1,3 +1,4 @@
import json
import sys
from multiprocessing import Pool
@ -14,7 +15,7 @@ def get_game() -> list:
def run_task(kwargs):
module_name = kwargs.get('task_name')
class_name = ''.join([s.capitalize() for s in task_name.split('_')])
class_name = ''.join([s.capitalize() for s in module_name.split('_')])
module = import_module(f'.{module_name}', package='task')
c_obj = getattr(module, class_name)
obj = c_obj(**kwargs)
@ -23,15 +24,21 @@ def run_task(kwargs):
if __name__ == '__main__':
# eg: summary_func 0 0
# eg: first_recharge 0 0
task_name, st, et = sys.argv[1:]
st, et = int(st), int(et)
game_list = get_game()
params = [{'game_name': item['id_name'],
'task_name': task_name,
'timezone': item.get('timezone', 'Asia/Shanghai'),
'st': st,
'et': et
}
for item in game_list]
with open('config.json', 'r', encoding='utf8') as f:
task_conf = json.load(f)
params = []
for item in game_list:
p = {'game_name': item['id_name'],
'timezone': item.get('timezone', 'Asia/Shanghai'),
'st': st,
'et': et
}
p.update(task_conf[task_name])
params.append(p)
with Pool(len(game_list)) as p:
p.map(run_task, params)

View File

@ -17,6 +17,8 @@ class Task(metaclass=abc.ABCMeta):
def __init__(self, *args, **kwargs):
self.game_name = kwargs.get('game_name')
self.game_db = f'game_{self.game_name}'
self.source_coll = kwargs.get('source_coll')
self.dest_coll = kwargs.get('dest_coll')
self.cursor_st = kwargs.get('st')
self.cursor_et = kwargs.get('et')
self.timezone = kwargs.get('timezone')
@ -83,16 +85,31 @@ class Task(metaclass=abc.ABCMeta):
self.task_coll.update_one(self.task_where, {
'$set': kwargs}, upsert=True)
def get_event_coll(self) -> list:
"""
根据游标时间戳 返回要处理的集合
:return: [{'event_2020-12-10': {'cursor_st': 1607608848, 'cursor_et': 1607610648}}, {'event_2020-12-10': {'cursor_st': 1607610648, 'cursor_et': 1607610791}}]
"""
def generate_cursor_time(self):
    """
    Split the cursor range into consecutive windows of at most 30 minutes.

    Reads ``self.cursor_st`` and ``self.cursor_et`` (epoch seconds) and
    localizes them to ``self.timezone``.

    :return: DataFrame indexed by window start with two columns:
             ``st`` (window start) and ``et`` (window end). Each ``et`` is
             the next window's ``st``; the last ``et`` is ``cursor_et``.
    """
    start = pd.Timestamp(self.cursor_st, unit='s', tz=self.timezone)
    end = pd.Timestamp(self.cursor_et, unit='s', tz=self.timezone)
    # Use '30min' instead of the deprecated 'T' minute alias; '30min' is
    # accepted by both old and new pandas releases.
    date_index = pd.date_range(start, end, freq='30min')
    df = pd.DataFrame(index=date_index)
    df['st'] = df.index
    # Shift the starts by one to get the ends, closing the final window
    # with the cursor end itself.
    df['et'] = np.append(df.index[1:], [end])
    return df
def get_single_coll(self) -> list:
    """
    Build the cursor windows for a task that reads one fixed collection.

    :return: one dict per window from ``generate_cursor_time()``, each of
             the form ``{self.source_coll: {'cursor_st': int, 'cursor_et': int}}``
             with the bounds expressed as integer epoch seconds.
    """
    windows = self.generate_cursor_time()
    return [
        {
            self.source_coll: {
                'cursor_st': int(begin.timestamp()),
                'cursor_et': int(finish.timestamp()),
            }
        }
        for begin, finish in zip(windows['st'], windows['et'])
    ]
def get_event_coll(self) -> list:
"""
根据游标时间戳 返回要处理的集合
:return: [{'event_2020-12-10': {'cursor_st': 1607608848, 'cursor_et': 1607610648}}, {'event_2020-12-10': {'cursor_st': 1607610648, 'cursor_et': 1607610791}}]
"""
df = self.generate_cursor_time()
df['event_coll_s'] = df['st'].apply(lambda x: f'event_{x.date().strftime("%Y-%m-%d")}')
df['event_coll_e'] = df['et'].apply(lambda x: f'event_{x.date().strftime("%Y-%m-%d")}')
cursor_list = []
@ -116,6 +133,12 @@ class Task(metaclass=abc.ABCMeta):
cursor_list.append(data)
return cursor_list
def get_source_coll(self) -> list:
    """
    Return the cursor windows for this task's source collection.

    Delegates to ``get_event_coll()`` when ``self.source_coll`` is
    ``'event'`` and to ``get_single_coll()`` for any other value.
    """
    if self.source_coll != 'event':
        return self.get_single_coll()
    return self.get_event_coll()
def set_run_status(self, status: bool):
"""
设置运行状态
@ -134,6 +157,6 @@ class Task(metaclass=abc.ABCMeta):
self.set_run_ts()
self.set_run_status(True)
self.get_cursor()
cursor_list = self.get_event_coll()
cursor_list = self.get_source_coll()
self.cleaning(cursor_list)
self.set_run_status(False)