diff --git a/api/api_v1/endpoints/interview.py b/api/api_v1/endpoints/interview.py index 992e7e6..78e1c42 100644 --- a/api/api_v1/endpoints/interview.py +++ b/api/api_v1/endpoints/interview.py @@ -2397,7 +2397,7 @@ async def send_interviewee( send = f"""【推荐通知】\n{current_user.nickname}给你推荐了一位面试者\n面试者姓名:{res[0]['name']}\n岗位:{res[0]['job_names']}""" unionid_list = [i['unionid'] for i in user['hr_name']] # send_dates(send, user_list) #工作通知 - # Sample.create_task('推荐通知', current_user.unionid, send, unionid_list) # 待办通知 + Sample.create_task('推荐通知', current_user.unionid, send, unionid_list) # 待办通知 # 存一份推荐,如到一定时间没有处理推荐,则返回通知推荐人处理 datas = {'uid': data_in.uid, 'name': res[0]['name'], 'job_name': res[0]['job_names'], 'hr_name': user['hr_name'], diff --git a/script/send_interviewee.py b/script/send_interviewee.py index b0875bc..99f54f6 100644 --- a/script/send_interviewee.py +++ b/script/send_interviewee.py @@ -1,32 +1,210 @@ +# 本script文件作于脚本提醒文件 +import datetime +import json +import sched +import threading + +from script.settings import mdb +import time + +# # 构造一个sched.scheduler类 +# scheduler = sched.scheduler(time.time, time.sleep) +# counter = 0 +# def increment_counter(**kwargs): +# global counter +# print('EVENT:', time.time(), kwargs['uid']) +# counter += 1 +# print('NOW:', counter) +# print('START:', time.time()) +# e1 = scheduler.enter(30, 1, increment_counter,kwargs={'uid':'qwe1'} ) +# e2 = scheduler.enter(30, 1, increment_counter,kwargs={'uid':'qwe2'} ) +# # 开始一个线程执行事件 +# t = threading.Thread(target=scheduler.run) +# t.start() +# # 在主线程,取消第一个预定事件 +# #scheduler.cancel(e1) +# a=scheduler.empty() +# b=scheduler.queue +# for i in b: +# ac=i.kwargs +# if ac['uid'] == 'qwe1': +# scheduler.cancel(i) +# scheduler.enter(31, 1, increment_counter,kwargs={'uid':'qwe3'} ) +# t = threading.Thread(target=scheduler.run) +# t.start() +# c=1 +# scheduler.enter(32, 1, increment_counter,kwargs={'uid':'qwe4'} ) + +# from datetime import datetime +# from pymongo import MongoClient +# from apscheduler.schedulers.blocking import BlockingScheduler +# from apscheduler.jobstores.memory import MemoryJobStore +# from apscheduler.jobstores.mongodb import MongoDBJobStore +# from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor +# +# # MongoDB 参数 +# host = '127.0.0.1' +# port = 27017 +# client = MongoClient(host, port) +# +# +# # 输出时间 +# def job(): +# print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) +# +# +# # 存储方式 +# jobstores = { +# 'mongo': MongoDBJobStore(collection='job', database='test', client=client), +# # 'default': MemoryJobStore() +# } +# executors = { +# 'default': ThreadPoolExecutor(20), +# # 'processpool': ProcessPoolExecutor(3) +# } +# job_defaults = { +# 'coalesce': True, # 宕机后开始执行的,只会执行最后一次的时间 +# 'max_instances': 3 # 每个job在同一时刻能够运行的最大实例数,默认情况下为1个,可以指定为更大值,这样即使上个job还没运行完同一个job又被调度的话也能够再开一个线程执行。 +# } +# scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults) +# scheduler.add_job(job, 'interval', seconds=5, jobstore='mongo') +# scheduler.start() +# {"uid": "sdfsd"} +# 读数据 +# with open('task.json', 'r', encoding='utf-8') as f: +# d = json.load(f) +# d['ac'] = 'fhdh' +# jsontext = json.dumps(d) +# # 写数据 +# with open('task.json', 'w', encoding='utf-8') as f: +# f.write(jsontext) +scheduler = sched.scheduler(time.time, time.sleep) +def write_task(jsontext): + with open('task.json', 'w', encoding='utf-8') as f: + f.write(jsontext) +def hours(): + """ + 获取当前时间加24小时之后转成时间戳 + :return: + """ + times = str(datetime.datetime.now()).split('.')[0] + times = datetime.datetime.strptime(times, "%Y-%m-%d %H:%M:%S") + times = str(times + datetime.timedelta(hours=24)) + timearray = time.strptime(times, '%Y-%m-%d %H:%M:%S') + return int(time.mktime(timearray)) +def task(**kwargs): + # 执行推送 + #Sample.create_task(kwargs['subject'], kwargs['creator_id'], kwargs['description'], kwargs['executor_ids']) + print(kwargs) + # 执行完任务,把配置里面的任务取消掉 + with open('task.json', 'r', encoding='utf-8') as f: + data = json.load(f) + # 判断任务为空 + if data != {}: + data.pop(kwargs['uid']) + jsontext = json.dumps(data) + # 写数据 + write_task(jsontext) -def send_interviewee(): - - ... - - - - - - - - - - - - - - - - - - - - +def run_task(data): + """ + 重启服务 + data为读取的json文件数据 + """ + res = scheduler.queue + uid = [i.kwargs['uid'] for i in res] # 取所有的任务uid + for k, v in data.items(): + if k not in uid: # 在现有任务里面不在json文件里面,则启动json文件里面的一个任务 + now = str(time.time()).split('.')[0] + end_time = v['times'] - int(now) + #没有过时的才会重启任务 + if end_time >0: + scheduler.enter(end_time, 1, task, kwargs=v) + t = threading.Thread(target=scheduler.run) + t.start() +def judge(**kwarg): + # 没有任务 + if scheduler.empty(): + with open('task.json', 'r', encoding='utf-8') as f: + ff=f.read() + if ff == '': + data={} + else: + data = json.loads(ff) + if data == {}: + # 创建一个任务 + scheduler.enter(3600, 1, task, kwargs=kwarg) + data[kwarg['uid']] = kwarg + jsontext = json.dumps(data) + else: # 重启所有服务 + run_task(data) + # 再添加这次的服务 + scheduler.enter(3600, 1, task, kwargs=kwarg) + data[kwarg['uid']] = kwarg + jsontext = json.dumps(data) + write_task(jsontext) + # 开启线程 + t = threading.Thread(target=scheduler.run) + t.start() + else: + # 查询创建了的任务 + res = scheduler.queue + uid = [i.kwargs['uid'] for i in res]#在执行中的任务uid + if not kwarg['types']: # 换新的任务 + if kwarg['uid'] in uid:#如果新开的任务在执行的任务中 + for i in res: + # 如存在同样的求职者id,取消老任务 + if i.kwargs['uid'] == kwarg['uid']: + scheduler.cancel(i) + with open('task.json', 'r', encoding='utf-8') as f: # 取消json任务记录 + data = json.load(f) + for k, v in data.items(): + if v['uid'] == kwarg['uid']: + data[kwarg['uid']]=kwarg + scheduler.enter(3600, 1, task, kwargs=kwarg) # 新任务 + data[kwarg['uid']] = kwarg + else:#不在 + with open('task.json', 'r', encoding='utf-8') as f: # 添加json任务记录 + data = json.load(f) + data[kwarg['uid']]=kwarg + scheduler.enter(3600, 1, task, kwargs=kwarg) # 新任务 + jsontext = json.dumps(data) + write_task(jsontext) + else: # 删除任务 + for i in res: + # 如存在同样的求职者id,取消老任务 + if i.kwargs['uid'] == kwarg['uid']: + scheduler.cancel(i) + with open('task.json', 'r', encoding='utf-8') as f: # 取消json任务记录 + data = json.load(f) + for k, v in data.items(): + if v['uid'] == kwarg['uid']: + data.pop(v['uid']) + jsontext = json.dumps(data) + write_task(jsontext) + break +if __name__ == '__main__': + kwarg = {"uid": "sdfsd", + "subject": "推荐通知", + "creator_id": "创建者", + "description": "待办内容", + "executor_ids": "执行者", + "types": False, + "times": 1666232200} + judge(**kwarg) + kwargs = {"uid": "sdfsd1", + "subject": "推荐通知2", + "creator_id": "创建者", + "description": "待办内容", + "executor_ids": "执行者", + "types": True, + "times": 1666232300} + judge(**kwargs) diff --git a/script/settings.py b/script/settings.py index d1ff979..fd00fe5 100644 --- a/script/settings.py +++ b/script/settings.py @@ -6,7 +6,10 @@ ckdb = Client(host='139.159.159.3', port='9654', user='legu', password='gncPASUw mdb = pymongo.MongoClient(host='139.159.159.3', port=27017, username='root', password='iamciniao') db = mdb['hr_system'] -mdb = db.userinfo +mdb = db.department # 本地 # ckdb = Client(host='139.159.159.3', port='9654', user='legu', password='gncPASUwpYrc') # mdb = pymongo.MongoClient(host='10.0.0.240', port=27017) +import asyncio + + diff --git a/utils/func.py b/utils/func.py index c9fc18f..a5b93d6 100644 --- a/utils/func.py +++ b/utils/func.py @@ -1,7 +1,9 @@ #!/usr/bin/python # coding:utf-8 +import json import os import random +import threading import time import datetime from math import ceil @@ -14,6 +16,7 @@ from datetime import timedelta from datetime import datetime as p1 import calendar from core.config import Settings +from utils.dingding import Sample def get_uid(): @@ -376,6 +379,12 @@ def png2pdf(dir_path, filename): doc.save(res_path) return res_path, new_filename + +def write_task(jsontext): + with open('task.json', 'w', encoding='utf-8') as f: + f.write(jsontext) + + def get_msec(): """ 获取当前日期的毫秒级时间 @@ -384,9 +393,120 @@ def get_msec(): today = datetime.date.today().strftime('%Y-%m-%d 23:59:59') t = int(time.mktime(time.strptime(today, "%Y-%m-%d %H:%M:%S"))) return int(round(t * 1000)) + + +def hours(): + """ + 获取当前时间加24小时之后转成时间戳 + :return: + """ + times = str(datetime.datetime.now()).split('.')[0] + times = datetime.datetime.strptime(times, "%Y-%m-%d %H:%M:%S") + times = str(times + datetime.timedelta(hours=24)) + timearray = time.strptime(times, '%Y-%m-%d %H:%M:%S') + return int(time.mktime(timearray)) + + +def task(**kwargs): + # 执行推送 + Sample.create_task(kwargs['subject'], kwargs['creator_id'], kwargs['description'], kwargs['executor_ids']) + # 执行完任务,把配置里面的任务取消掉 + with open('task.json', 'r', encoding='utf-8') as f: + data = json.load(f) + # 判断任务为空 + if data != {}: + data.pop(kwargs['uid']) + jsontext = json.dumps(data) + # 写数据 + write_task(jsontext) + + +def run_task(data): + """ + 重启服务 + data为读取的json文件数据 + """ + res = Settings.scheduler.queue + uid = [i.kwargs['uid'] for i in res] # 取所有的任务uid + for k, v in data.items(): + if k not in uid: # 在现有任务里面不在json文件里面,则启动json文件里面的一个任务 + now = str(time.time()).split('.')[0] + end_time = v['times'] - int(now) + #没有过时的才会重启任务 + if end_time >0: + Settings.scheduler.enter(end_time, 1, task, kwargs=v) + t = threading.Thread(target=Settings.scheduler.run) + t.start() + + +def judge(**kwarg): + # 没有任务 + if Settings.scheduler.empty(): + with open('task.json', 'r', encoding='utf-8') as f: + ff=f.read() + if ff == '': + data={} + else: + data = json.loads(ff) + if data == {}: + # 创建一个任务 + Settings.scheduler.enter(3600, 1, task, kwargs=kwarg) + data[kwarg['uid']] = kwarg + jsontext = json.dumps(data) + else: # 重启所有服务 + run_task(data) + # 再添加这次的服务 + Settings.scheduler.enter(3600, 1, task, kwargs=kwarg) + data[kwarg['uid']] = kwarg + jsontext = json.dumps(data) + write_task(jsontext) + # 开启线程 + t = threading.Thread(target=Settings.scheduler.run) + t.start() + else: + # 查询创建了的任务 + res = Settings.scheduler.queue + uid = [i.kwargs['uid'] for i in res]#在执行中的任务uid + if not kwarg['types']: # 换新的任务 + if kwarg['uid'] in uid:#如果新开的任务在执行的任务中 + for i in res: + # 如存在同样的求职者id,取消老任务 + if i.kwargs['uid'] == kwarg['uid']: + Settings.scheduler.cancel(i) + with open('task.json', 'r', encoding='utf-8') as f: # 取消json任务记录 + data = json.load(f) + for k, v in data.items(): + if v['uid'] == kwarg['uid']: + data[kwarg['uid']]=kwarg + Settings.scheduler.enter(3600, 1, task, kwargs=kwarg) # 新任务 + data[kwarg['uid']] = kwarg + else:#不在 + with open('task.json', 'r', encoding='utf-8') as f: # 添加json任务记录 + data = json.load(f) + data['uid'] = kwarg['uid'] + Settings.scheduler.enter(3600, 1, task, kwargs=kwarg) # 新任务 + jsontext = json.dumps(data) + write_task(jsontext) + else: # 删除任务 + for i in res: + # 如存在同样的求职者id,取消老任务 + if i.kwargs['uid'] == kwarg['uid']: + Settings.scheduler.cancel(i) + with open('task.json', 'r', encoding='utf-8') as f: # 取消json任务记录 + data = json.load(f) + for k, v in data.items(): + if v['uid'] == kwarg['uid']: + data.pop(v['uid']) + jsontext = json.dumps(data) + write_task(jsontext) + break + if __name__ == '__main__': - pass - # fn=r'C:\Users\Administrator\Desktop\面试简历1\智联招聘_张双琪_Web开发工程师_中文.doc' - # path_data=r'C:\Users\Administrator\Desktop\面试简历1\\' - # filename='智联招聘_张双琪_Web开发工程师_中文.doc' - # doc2pdf(fn, path_data, filename) + kwarg = {"uid": "sdfsd", + "subject": "推荐通知", + "creator_id": "创建者", + "description": "待办内容", + "executor_ids": "执行者", + "types": False, + "times": 1666231200} + judge(**kwarg)