diff --git a/main.py b/main.py index 8a95451..7de7015 100644 --- a/main.py +++ b/main.py @@ -135,6 +135,7 @@ async def run_task(request: Request, call_next): data = json.loads(ff) res = scheduler.queue uid = [i.kwargs['uid'] for i in res] # 取所有的任务uid + _del=[] for k, v in data.items(): if k not in uid: # 在现有任务里面不在json文件里面,则启动json文件里面的一个任务 now = str(time.time()).split('.')[0] @@ -143,11 +144,15 @@ async def run_task(request: Request, call_next): if end_time > 0: scheduler.enter(end_time, 1, task, kwargs=v) else: - data.pop(k) - jsontext = json.dumps(data) - write_task(jsontext) + _del.append(k) + if _del != []: + for i in _del: + data.pop(i) + jsontext = json.dumps(data) + write_task(jsontext) t = threading.Thread(target=scheduler.run) t.start() + return response app.add_middleware(AuthenticationMiddleware, backend=BasicAuth(), on_error=login_expired) app.add_middleware( diff --git a/script/__init__.py b/script/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/script/send_interviewee.py b/script/send_interviewee.py deleted file mode 100644 index 99f54f6..0000000 --- a/script/send_interviewee.py +++ /dev/null @@ -1,210 +0,0 @@ -# 本script文件作于脚本提醒文件 -import datetime -import json -import sched -import threading - -from script.settings import mdb -import time - -# # 构造一个sched.scheduler类 -# scheduler = sched.scheduler(time.time, time.sleep) -# counter = 0 -# def increment_counter(**kwargs): -# global counter -# print('EVENT:', time.time(), kwargs['uid']) -# counter += 1 -# print('NOW:', counter) -# print('START:', time.time()) -# e1 = scheduler.enter(30, 1, increment_counter,kwargs={'uid':'qwe1'} ) -# e2 = scheduler.enter(30, 1, increment_counter,kwargs={'uid':'qwe2'} ) -# # 开始一个线程执行事件 -# t = threading.Thread(target=scheduler.run) -# t.start() -# # 在主线程,取消第一个预定事件 -# #scheduler.cancel(e1) -# a=scheduler.empty() -# b=scheduler.queue -# for i in b: -# ac=i.kwargs -# if ac['uid'] == 'qwe1': -# scheduler.cancel(i) -# scheduler.enter(31, 1, increment_counter,kwargs={'uid':'qwe3'} ) -# t = threading.Thread(target=scheduler.run) -# t.start() -# c=1 -# scheduler.enter(32, 1, increment_counter,kwargs={'uid':'qwe4'} ) - -# from datetime import datetime -# from pymongo import MongoClient -# from apscheduler.schedulers.blocking import BlockingScheduler -# from apscheduler.jobstores.memory import MemoryJobStore -# from apscheduler.jobstores.mongodb import MongoDBJobStore -# from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor -# -# # MongoDB 参数 -# host = '127.0.0.1' -# port = 27017 -# client = MongoClient(host, port) -# -# -# # 输出时间 -# def job(): -# print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) -# -# -# # 存储方式 -# jobstores = { -# 'mongo': MongoDBJobStore(collection='job', database='test', client=client), -# # 'default': MemoryJobStore() -# } -# executors = { -# 'default': ThreadPoolExecutor(20), -# # 'processpool': ProcessPoolExecutor(3) -# } -# job_defaults = { -# 'coalesce': True, # 宕机后开始执行的,只会执行最后一次的时间 -# 'max_instances': 3 # 每个job在同一时刻能够运行的最大实例数,默认情况下为1个,可以指定为更大值,这样即使上个job还没运行完同一个job又被调度的话也能够再开一个线程执行。 -# } -# scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults) -# scheduler.add_job(job, 'interval', seconds=5, jobstore='mongo') -# scheduler.start() -# {"uid": "sdfsd"} -# 读数据 -# with open('task.json', 'r', encoding='utf-8') as f: -# d = json.load(f) -# d['ac'] = 'fhdh' -# jsontext = json.dumps(d) -# # 写数据 -# with open('task.json', 'w', encoding='utf-8') as f: -# f.write(jsontext) -scheduler = sched.scheduler(time.time, time.sleep) -def write_task(jsontext): - with open('task.json', 'w', encoding='utf-8') as f: - f.write(jsontext) - - -def hours(): - """ - 获取当前时间加24小时之后转成时间戳 - :return: - """ - times = str(datetime.datetime.now()).split('.')[0] - times = datetime.datetime.strptime(times, "%Y-%m-%d %H:%M:%S") - times = str(times + datetime.timedelta(hours=24)) - timearray = time.strptime(times, '%Y-%m-%d %H:%M:%S') - return int(time.mktime(timearray)) - - -def task(**kwargs): - # 执行推送 - #Sample.create_task(kwargs['subject'], kwargs['creator_id'], kwargs['description'], kwargs['executor_ids']) - print(kwargs) - # 执行完任务,把配置里面的任务取消掉 - with open('task.json', 'r', encoding='utf-8') as f: - data = json.load(f) - # 判断任务为空 - if data != {}: - data.pop(kwargs['uid']) - jsontext = json.dumps(data) - # 写数据 - write_task(jsontext) - - -def run_task(data): - """ - 重启服务 - data为读取的json文件数据 - """ - res = scheduler.queue - uid = [i.kwargs['uid'] for i in res] # 取所有的任务uid - for k, v in data.items(): - if k not in uid: # 在现有任务里面不在json文件里面,则启动json文件里面的一个任务 - now = str(time.time()).split('.')[0] - end_time = v['times'] - int(now) - #没有过时的才会重启任务 - if end_time >0: - scheduler.enter(end_time, 1, task, kwargs=v) - t = threading.Thread(target=scheduler.run) - t.start() - - -def judge(**kwarg): - # 没有任务 - if scheduler.empty(): - with open('task.json', 'r', encoding='utf-8') as f: - ff=f.read() - if ff == '': - data={} - else: - data = json.loads(ff) - if data == {}: - # 创建一个任务 - scheduler.enter(3600, 1, task, kwargs=kwarg) - data[kwarg['uid']] = kwarg - jsontext = json.dumps(data) - else: # 重启所有服务 - run_task(data) - # 再添加这次的服务 - scheduler.enter(3600, 1, task, kwargs=kwarg) - data[kwarg['uid']] = kwarg - jsontext = json.dumps(data) - write_task(jsontext) - # 开启线程 - t = threading.Thread(target=scheduler.run) - t.start() - else: - # 查询创建了的任务 - res = scheduler.queue - uid = [i.kwargs['uid'] for i in res]#在执行中的任务uid - if not kwarg['types']: # 换新的任务 - if kwarg['uid'] in uid:#如果新开的任务在执行的任务中 - for i in res: - # 如存在同样的求职者id,取消老任务 - if i.kwargs['uid'] == kwarg['uid']: - scheduler.cancel(i) - with open('task.json', 'r', encoding='utf-8') as f: # 取消json任务记录 - data = json.load(f) - for k, v in data.items(): - if v['uid'] == kwarg['uid']: - data[kwarg['uid']]=kwarg - scheduler.enter(3600, 1, task, kwargs=kwarg) # 新任务 - data[kwarg['uid']] = kwarg - else:#不在 - with open('task.json', 'r', encoding='utf-8') as f: # 添加json任务记录 - data = json.load(f) - data[kwarg['uid']]=kwarg - scheduler.enter(3600, 1, task, kwargs=kwarg) # 新任务 - jsontext = json.dumps(data) - write_task(jsontext) - else: # 删除任务 - for i in res: - # 如存在同样的求职者id,取消老任务 - if i.kwargs['uid'] == kwarg['uid']: - scheduler.cancel(i) - with open('task.json', 'r', encoding='utf-8') as f: # 取消json任务记录 - data = json.load(f) - for k, v in data.items(): - if v['uid'] == kwarg['uid']: - data.pop(v['uid']) - jsontext = json.dumps(data) - write_task(jsontext) - break - -if __name__ == '__main__': - kwarg = {"uid": "sdfsd", - "subject": "推荐通知", - "creator_id": "创建者", - "description": "待办内容", - "executor_ids": "执行者", - "types": False, - "times": 1666232200} - judge(**kwarg) - kwargs = {"uid": "sdfsd1", - "subject": "推荐通知2", - "creator_id": "创建者", - "description": "待办内容", - "executor_ids": "执行者", - "types": True, - "times": 1666232300} - judge(**kwargs) diff --git a/script/settings.py b/script/settings.py deleted file mode 100644 index fd00fe5..0000000 --- a/script/settings.py +++ /dev/null @@ -1,15 +0,0 @@ -import pymongo -from clickhouse_driver import Client - -# 线上 -ckdb = Client(host='139.159.159.3', port='9654', user='legu', password='gncPASUwpYrc') - -mdb = pymongo.MongoClient(host='139.159.159.3', port=27017, username='root', password='iamciniao') -db = mdb['hr_system'] -mdb = db.department -# 本地 -# ckdb = Client(host='139.159.159.3', port='9654', user='legu', password='gncPASUwpYrc') -# mdb = pymongo.MongoClient(host='10.0.0.240', port=27017) -import asyncio - - diff --git a/task.json b/task.json new file mode 100644 index 0000000..9e26dfe --- /dev/null +++ b/task.json @@ -0,0 +1 @@ +{} \ No newline at end of file