# 本script文件作于脚本提醒文件 import datetime import json import sched import threading from script.settings import mdb import time # # 构造一个sched.scheduler类 # scheduler = sched.scheduler(time.time, time.sleep) # counter = 0 # def increment_counter(**kwargs): # global counter # print('EVENT:', time.time(), kwargs['uid']) # counter += 1 # print('NOW:', counter) # print('START:', time.time()) # e1 = scheduler.enter(30, 1, increment_counter,kwargs={'uid':'qwe1'} ) # e2 = scheduler.enter(30, 1, increment_counter,kwargs={'uid':'qwe2'} ) # # 开始一个线程执行事件 # t = threading.Thread(target=scheduler.run) # t.start() # # 在主线程,取消第一个预定事件 # #scheduler.cancel(e1) # a=scheduler.empty() # b=scheduler.queue # for i in b: # ac=i.kwargs # if ac['uid'] == 'qwe1': # scheduler.cancel(i) # scheduler.enter(31, 1, increment_counter,kwargs={'uid':'qwe3'} ) # t = threading.Thread(target=scheduler.run) # t.start() # c=1 # scheduler.enter(32, 1, increment_counter,kwargs={'uid':'qwe4'} ) # from datetime import datetime # from pymongo import MongoClient # from apscheduler.schedulers.blocking import BlockingScheduler # from apscheduler.jobstores.memory import MemoryJobStore # from apscheduler.jobstores.mongodb import MongoDBJobStore # from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor # # # MongoDB 参数 # host = '127.0.0.1' # port = 27017 # client = MongoClient(host, port) # # # # 输出时间 # def job(): # print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) # # # # 存储方式 # jobstores = { # 'mongo': MongoDBJobStore(collection='job', database='test', client=client), # # 'default': MemoryJobStore() # } # executors = { # 'default': ThreadPoolExecutor(20), # # 'processpool': ProcessPoolExecutor(3) # } # job_defaults = { # 'coalesce': True, # 宕机后开始执行的,只会执行最后一次的时间 # 'max_instances': 3 # 每个job在同一时刻能够运行的最大实例数,默认情况下为1个,可以指定为更大值,这样即使上个job还没运行完同一个job又被调度的话也能够再开一个线程执行。 # } # scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults) # scheduler.add_job(job, 'interval', seconds=5, jobstore='mongo') # scheduler.start() # {"uid": "sdfsd"} # 读数据 # with open('task.json', 'r', encoding='utf-8') as f: # d = json.load(f) # d['ac'] = 'fhdh' # jsontext = json.dumps(d) # # 写数据 # with open('task.json', 'w', encoding='utf-8') as f: # f.write(jsontext) scheduler = sched.scheduler(time.time, time.sleep) def write_task(jsontext): with open('task.json', 'w', encoding='utf-8') as f: f.write(jsontext) def hours(): """ 获取当前时间加24小时之后转成时间戳 :return: """ times = str(datetime.datetime.now()).split('.')[0] times = datetime.datetime.strptime(times, "%Y-%m-%d %H:%M:%S") times = str(times + datetime.timedelta(hours=24)) timearray = time.strptime(times, '%Y-%m-%d %H:%M:%S') return int(time.mktime(timearray)) def task(**kwargs): # 执行推送 #Sample.create_task(kwargs['subject'], kwargs['creator_id'], kwargs['description'], kwargs['executor_ids']) print(kwargs) # 执行完任务,把配置里面的任务取消掉 with open('task.json', 'r', encoding='utf-8') as f: data = json.load(f) # 判断任务为空 if data != {}: data.pop(kwargs['uid']) jsontext = json.dumps(data) # 写数据 write_task(jsontext) def run_task(data): """ 重启服务 data为读取的json文件数据 """ res = scheduler.queue uid = [i.kwargs['uid'] for i in res] # 取所有的任务uid for k, v in data.items(): if k not in uid: # 在现有任务里面不在json文件里面,则启动json文件里面的一个任务 now = str(time.time()).split('.')[0] end_time = v['times'] - int(now) #没有过时的才会重启任务 if end_time >0: scheduler.enter(end_time, 1, task, kwargs=v) t = threading.Thread(target=scheduler.run) t.start() def judge(**kwarg): # 没有任务 if scheduler.empty(): with open('task.json', 'r', encoding='utf-8') as f: ff=f.read() if ff == '': data={} else: data = json.loads(ff) if data == {}: # 创建一个任务 scheduler.enter(3600, 1, task, kwargs=kwarg) data[kwarg['uid']] = kwarg jsontext = json.dumps(data) else: # 重启所有服务 run_task(data) # 再添加这次的服务 scheduler.enter(3600, 1, task, kwargs=kwarg) data[kwarg['uid']] = kwarg jsontext = json.dumps(data) write_task(jsontext) # 开启线程 t = threading.Thread(target=scheduler.run) t.start() else: # 查询创建了的任务 res = scheduler.queue uid = [i.kwargs['uid'] for i in res]#在执行中的任务uid if not kwarg['types']: # 换新的任务 if kwarg['uid'] in uid:#如果新开的任务在执行的任务中 for i in res: # 如存在同样的求职者id,取消老任务 if i.kwargs['uid'] == kwarg['uid']: scheduler.cancel(i) with open('task.json', 'r', encoding='utf-8') as f: # 取消json任务记录 data = json.load(f) for k, v in data.items(): if v['uid'] == kwarg['uid']: data[kwarg['uid']]=kwarg scheduler.enter(3600, 1, task, kwargs=kwarg) # 新任务 data[kwarg['uid']] = kwarg else:#不在 with open('task.json', 'r', encoding='utf-8') as f: # 添加json任务记录 data = json.load(f) data[kwarg['uid']]=kwarg scheduler.enter(3600, 1, task, kwargs=kwarg) # 新任务 jsontext = json.dumps(data) write_task(jsontext) else: # 删除任务 for i in res: # 如存在同样的求职者id,取消老任务 if i.kwargs['uid'] == kwarg['uid']: scheduler.cancel(i) with open('task.json', 'r', encoding='utf-8') as f: # 取消json任务记录 data = json.load(f) for k, v in data.items(): if v['uid'] == kwarg['uid']: data.pop(v['uid']) jsontext = json.dumps(data) write_task(jsontext) break if __name__ == '__main__': kwarg = {"uid": "sdfsd", "subject": "推荐通知", "creator_id": "创建者", "description": "待办内容", "executor_ids": "执行者", "types": False, "times": 1666232200} judge(**kwarg) kwargs = {"uid": "sdfsd1", "subject": "推荐通知2", "creator_id": "创建者", "description": "待办内容", "executor_ids": "执行者", "types": True, "times": 1666232300} judge(**kwargs)