561 lines
18 KiB
Python
561 lines
18 KiB
Python
#!/usr/bin/python
|
||
# coding:utf-8
|
||
import json
|
||
import os
|
||
import random
|
||
import sched
|
||
import threading
|
||
import time
|
||
import datetime
|
||
from email.mime.application import MIMEApplication
|
||
from email.mime.multipart import MIMEMultipart
|
||
from math import ceil
|
||
import pandas as pd
|
||
import smtplib
|
||
from email.mime.text import MIMEText
|
||
from email.utils import formataddr
|
||
from fitz import fitz
|
||
from datetime import timedelta
|
||
from datetime import datetime as p1
|
||
import calendar
|
||
from core.config import Settings
|
||
from utils.dingding import Sample
|
||
|
||
# 构造一个sched.scheduler类,用于给推荐通知做定时任务
|
||
scheduler = sched.scheduler(time.time, time.sleep)
|
||
|
||
my_sender = '250213850@qq.com' # 发件人邮箱账号
|
||
my_pass = 'whrsugtgkstibjdj' # 发件人邮箱密码
|
||
subject = '入职通知' # 邮件的主题,也可以说是标题
|
||
mail_host = 'smtp.qq.com'
|
||
|
||
|
||
def get_uid():
|
||
return hex(int(time.time() * 10 ** 7) + random.randint(0, 10000))[2:]
|
||
|
||
|
||
# 获取筛选条件的包含关系
|
||
def get_bijiao(bijiao):
|
||
if bijiao == '==' or bijiao == 'in' or bijiao == 'like' or bijiao == 'is not null':
|
||
return "IN"
|
||
elif bijiao == '!=' or bijiao == 'not like' or bijiao == 'is null':
|
||
return 'NOT LIKE'
|
||
|
||
|
||
# 判断传入的数据类型
|
||
def estimate_data(data_type):
|
||
if data_type == 'int':
|
||
return "Nullable(Int64)"
|
||
elif data_type == 'ip':
|
||
return "Nullable(DateTime('UTC'))"
|
||
else:
|
||
return "Nullable(String)"
|
||
|
||
|
||
# 将字典变成字符串
|
||
def dict_to_str(dic):
|
||
c = str()
|
||
b = 0
|
||
for k, v in dic.items():
|
||
b += 1
|
||
if b == 1:
|
||
c += "{\"%s\":\"%s\"," % (k, v)
|
||
elif b != len(dic):
|
||
c += "\"%s\":\"%s\"," % (k, v)
|
||
else:
|
||
c += "\"%s\":\"%s\"}" % (k, v)
|
||
return c
|
||
|
||
|
||
def getEveryDay(begin_date, end_date):
|
||
# 前闭后闭
|
||
date_list = []
|
||
begin_date = datetime.datetime.strptime(begin_date, "%Y-%m-%d")
|
||
end_date = datetime.datetime.strptime(end_date, "%Y-%m-%d")
|
||
while begin_date <= end_date:
|
||
date_str = begin_date.strftime("%Y-%m-%d")
|
||
date_list.append(date_str)
|
||
begin_date += datetime.timedelta(days=1)
|
||
return date_list
|
||
|
||
|
||
# print(getEveryDay('2016-01-01','2017-05-11'))
|
||
def Download_xlsx(df, name):
|
||
"""
|
||
下载功能
|
||
name为文件名
|
||
"""
|
||
from urllib.parse import quote
|
||
import mimetypes
|
||
from utils import DfToStream
|
||
from fastapi.responses import StreamingResponse
|
||
file_name = quote(f'{name}.xlsx')
|
||
mime = mimetypes.guess_type(file_name)[0]
|
||
df_to_stream = DfToStream((df, name))
|
||
with df_to_stream as d:
|
||
export = d.to_stream()
|
||
Download = StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
|
||
return Download
|
||
|
||
|
||
def jiange_insert(list_date):
|
||
"""
|
||
间隔1条插入一条数据插入数据
|
||
:param day: list数据
|
||
:return: list
|
||
"""
|
||
i = 1
|
||
while i <= len(list_date):
|
||
list_date.insert(i, '-')
|
||
i += 2
|
||
return list_date
|
||
|
||
|
||
def create_df(resp):
|
||
"""
|
||
分布分析外部下载功能的df数据
|
||
"""
|
||
columns = resp['label']
|
||
day = list(resp['list'].keys())
|
||
jiange_insert(day)
|
||
date = []
|
||
day_nu = 0
|
||
for i in day:
|
||
if i == '-':
|
||
av = day[day_nu - 1]
|
||
day_date = resp['list'][av]['总体']
|
||
else:
|
||
day_date = resp['list'][i]['总体']
|
||
date_dict = {}
|
||
n = 0
|
||
p = 0
|
||
if i == '-':
|
||
date_dict['事件发生时间'] = '-'
|
||
date_dict['总人数'] = '-'
|
||
for nu in range(len(columns)):
|
||
date_dict[columns[nu]] = day_date['p'][p]
|
||
p += 1
|
||
date.append(date_dict)
|
||
else:
|
||
date_dict['事件发生时间'] = i
|
||
date_dict['总人数'] = day_date['total']
|
||
for nu in range(len(columns)):
|
||
date_dict[columns[nu]] = day_date['n'][n]
|
||
n += 1
|
||
date.append(date_dict)
|
||
day_nu += 1
|
||
columns.insert(0, '总人数')
|
||
columns.insert(0, '事件发生时间')
|
||
df = pd.DataFrame(data=date, columns=columns)
|
||
return df
|
||
|
||
|
||
def create_neidf(resp, columnName):
|
||
"""
|
||
分布分析内部下载功能的df数据
|
||
"""
|
||
columns = resp['label']
|
||
day = list(resp['list'].keys())
|
||
jiange_insert(day)
|
||
date = []
|
||
day_nu = 0
|
||
for i in day:
|
||
if i == '-':
|
||
av = day[day_nu - 1]
|
||
day_date = resp['list'][av]
|
||
else:
|
||
day_date = resp['list'][i]
|
||
date_dict = {}
|
||
n = 0
|
||
p = 0
|
||
if i == '-':
|
||
date_dict[columnName] = '-'
|
||
date_dict['全部用户数'] = '-'
|
||
for nu in range(len(columns)):
|
||
date_dict[columns[nu]] = day_date['p'][p]
|
||
p += 1
|
||
date.append(date_dict)
|
||
else:
|
||
date_dict[columnName] = i
|
||
date_dict['全部用户数'] = day_date['total']
|
||
for nu in range(len(columns)):
|
||
date_dict[columns[nu]] = day_date['n'][n]
|
||
n += 1
|
||
date.append(date_dict)
|
||
day_nu += 1
|
||
columns.insert(0, '全部用户数')
|
||
columns.insert(0, columnName)
|
||
df = pd.DataFrame(data=date, columns=columns)
|
||
return df
|
||
|
||
|
||
def random_hex():
|
||
"""
|
||
生成16位随机数
|
||
:return: 随机数
|
||
"""
|
||
result = hex(random.randint(0, 16 ** 16)).replace('0x', '').upper()
|
||
if (len(result) < 16):
|
||
result = '0' * (16 - len(result)) + result
|
||
return result
|
||
|
||
|
||
def get_time(fmt: str = '%Y-%m-%d') -> str:
|
||
'''
|
||
获取当前时间
|
||
'''
|
||
ts = time.time()
|
||
ta = time.localtime(ts)
|
||
t = time.strftime(fmt, ta)
|
||
return t
|
||
|
||
|
||
def import_excel(data, columns, name):
|
||
"""
|
||
导出数据到xlsx表里面
|
||
:param data: 需要导出的数据
|
||
:param columns: df的表名 例:['a','b','c']
|
||
:param name: 文件名。例:'随机数'
|
||
:return:
|
||
"""
|
||
zh = pd.DataFrame(data, columns=columns)
|
||
zh.to_excel(f'{name}.xlsx', index=False, header=True)
|
||
|
||
|
||
def qujian_time(start_time, end_time):
|
||
"""
|
||
把两个时间变成区间
|
||
:param start_time: '2022-07-01 10:00:00'
|
||
:param end_time: '2022-07-01 10:30:00'
|
||
:return: '2022-07-01 10:00:00~10:30:00'
|
||
"""
|
||
timess = str(end_time).split(' ')[-1]
|
||
return str(start_time) + '~' + timess
|
||
|
||
|
||
# 发送的文本邮件
|
||
def send_str_mail(str_msg, my_user):
|
||
"""
|
||
发送的文本邮件
|
||
:param str_msg: 发送的邮件内容
|
||
:param my_user: 接收者的邮箱
|
||
:return: bool
|
||
"""
|
||
msg = MIMEText(str_msg, 'plain', 'utf-8')
|
||
msg['From'] = formataddr(["乐谷游戏人事", my_sender]) # 括号里的对应发件人邮箱昵称,发件人邮箱账号
|
||
msg['To'] = formataddr(["FK", my_user]) # 括号里的对应收件人邮箱昵称,收件人邮箱账号
|
||
msg['Subject'] = subject # 邮件的主题,也可以说是标题
|
||
|
||
server = smtplib.SMTP_SSL("smtp.qq.com", 465) # 发件人邮箱中的SMTP服务器,端口是25
|
||
server.login(my_sender, my_pass) # 括号中对应的是发件人邮箱账号,邮箱密码
|
||
# server.sendmail(my_sender, [my_user, ], msg.as_string()) # 括号中对应的是发件人邮箱账号,收件人邮箱账号,发送邮件
|
||
server.sendmail(my_sender, [my_user, ], msg.as_string()) # 括号中对应的是发件人邮箱账号,收件人邮箱账号,发送邮件
|
||
server.quit() # 关闭连接
|
||
|
||
|
||
def send_affix_mail(str_msg: str, send_to: list, file_name: str = None):
|
||
"""
|
||
发送邮件,包含文本或和附件
|
||
:param str_msg: 发送的邮件内容
|
||
:param send_to: 接收者的邮箱
|
||
:param file_name: 上传的附件名,绝对路径
|
||
"""
|
||
msg = MIMEMultipart() # 创建一个带附件的实例
|
||
msg["Subject"] = "面试通知"
|
||
msg["From"] = my_sender
|
||
msg["To"] = ','.join(send_to)
|
||
# ---文字部分---
|
||
part = MIMEText(str_msg)
|
||
msg.attach(part)
|
||
# ---附件部分---
|
||
if file_name != None:
|
||
part = MIMEApplication(open(file_name, 'rb').read())
|
||
filename = file_name.split('/')[-1]
|
||
part.add_header('Content-Disposition', 'attachment', filename=filename)
|
||
msg.attach(part)
|
||
smpt = smtplib.SMTP_SSL(mail_host, 465, 'utf-8')
|
||
smpt.login(my_sender, my_pass)
|
||
smpt.sendmail(my_sender, send_to, msg.as_string())
|
||
smpt.quit()
|
||
|
||
|
||
def get_week(date_str=None):
|
||
if date_str and isinstance(date_str, str):
|
||
now_time = (p1.strptime(date_str + " 00:00:00", "%Y-%m-%d %H:%M:%S") + datetime.timedelta(days=1)).strftime(
|
||
"%Y-%m-%d %H:%M:%S")
|
||
else:
|
||
now_time = p1.now().replace(hour=0, minute=0, second=0, microsecond=0)
|
||
now_time = strptime(now_time)
|
||
# 当前日期所在周的周一
|
||
week_start_time = now_time - timedelta(days=now_time.weekday() + 1, hours=now_time.hour, minutes=now_time.minute,
|
||
seconds=now_time.second)
|
||
# 当前日期所在周的周日
|
||
week_end_time = week_start_time + timedelta(days=6, hours=23, minutes=59, seconds=59)
|
||
return week_start_time, week_end_time
|
||
|
||
|
||
def strptime(date_string):
|
||
"""
|
||
将字符串转换成datetime.datetime类型
|
||
:param date_string: '2022-05-29 23:59:59'
|
||
:return: 2022-05-29 23:59:59
|
||
"""
|
||
return p1.strptime(date_string, '%Y-%m-%d %H:%M:%S')
|
||
|
||
|
||
def strptime1(date_str):
|
||
"""
|
||
将字符串转换成datetime.datetime类型
|
||
:param date_string: '2022-05-29'
|
||
:return: 2022-05-29 00:00:00
|
||
"""
|
||
return p1.strptime(date_str + " 00:00:00", "%Y-%m-%d %H:%M:%S")
|
||
|
||
|
||
def start_end_month(time):
|
||
"""
|
||
获取某个月的起始时间和结束时间
|
||
:param time: '2022-05-29'
|
||
:return:
|
||
"""
|
||
now = p1.strptime(time + " 00:00:00", "%Y-%m-%d %H:%M:%S")
|
||
this_month_start = datetime.datetime(now.year, now.month, 1)
|
||
this_month_end = datetime.datetime(now.year, now.month, calendar.monthrange(now.year, now.month)[1])
|
||
this_month_end1 = this_month_end + timedelta(hours=23, minutes=59, seconds=59)
|
||
return this_month_start, this_month_end1
|
||
|
||
|
||
# 获取两个日期之间的所有天数
|
||
def get_every_days(sdate, edate):
|
||
days = []
|
||
start_date = datetime.datetime.strptime(sdate, '%Y-%m-%d')
|
||
end_date = datetime.datetime.strptime(edate, '%Y-%m-%d')
|
||
different_day = end_date - start_date
|
||
day_num = different_day.days
|
||
for i in range(day_num + 1):
|
||
day = start_date + timedelta(days=i)
|
||
true_day = datetime.datetime.strftime(day, '%Y-%m-%d')
|
||
days.append(true_day)
|
||
return days
|
||
|
||
|
||
# 获取两个日期之间的所有月份
|
||
def get_every_months(sdate, edate):
|
||
start_date = datetime.datetime.strptime(sdate, '%Y-%m-%d')
|
||
end_date = datetime.datetime.strptime(edate, '%Y-%m-%d')
|
||
months = (start_date.year - end_date.year) * 12 + end_date.month - start_date.month
|
||
month_range = ['%04d-%02d' % (int(start_date.year + mon // 12), int(mon % 12 + 1))
|
||
for mon in range(start_date.month - 1, start_date.month + months)]
|
||
|
||
return month_range
|
||
|
||
|
||
# 获取两个日期之间所有的周日日期
|
||
def get_every_weeks(sdate, edate):
|
||
start_date = datetime.datetime.strptime(sdate, '%Y-%m-%d')
|
||
end_date = datetime.datetime.strptime(edate, '%Y-%m-%d')
|
||
different_day = end_date - start_date
|
||
day_num = different_day.days
|
||
weeks = int(ceil(day_num / 7))
|
||
week_days = []
|
||
# 当前周的周末的日期
|
||
start_day = get_current_week(start_date)
|
||
for i in range(weeks + 1):
|
||
day = start_day + timedelta(days=i * 7)
|
||
true_day = datetime.datetime.strftime(day, '%Y-%m-%d')
|
||
week_days.append(true_day)
|
||
return week_days
|
||
|
||
|
||
# 获取当前日期的周日日期
|
||
def get_current_week(s_date):
|
||
sunday = s_date
|
||
one_day = datetime.timedelta(days=1)
|
||
while sunday.weekday() != 6:
|
||
sunday += one_day
|
||
|
||
return sunday
|
||
|
||
|
||
from win32com import client
|
||
|
||
|
||
# 转换doc/docx为pdf
|
||
def doc2pdf(fn, path_data, filename):
|
||
# """
|
||
# :param fn: 'C:\Users\86173\Desktop\陈超峰.docx'
|
||
# :param path_data:'C:\Users\86173\Desktop\\'
|
||
# :param filename:陈超峰.docx
|
||
# :return: 'C:\Users\86173\Desktop\陈超峰.pdf'
|
||
# """
|
||
new_filename = filename.split('.')[0]
|
||
word = client.Dispatch("Word.Application") # 打开word应用程序
|
||
doc = word.Documents.Open(fn) # 打开word文件
|
||
doc.SaveAs("{}/{}.pdf".format(path_data, new_filename), 17) # 另存为后缀为".pdf"的文件,其中参数17表示为pdf
|
||
doc.Close() # 关闭原来word文件
|
||
word.Quit()
|
||
return path_data + '/' + new_filename + '.pdf', new_filename + '.pdf'
|
||
|
||
|
||
# 图片转pdf
|
||
def png2pdf(dir_path, filename):
|
||
"""
|
||
:param dir_path: 图片所在目录路径
|
||
:param filename: 图片文件名
|
||
:return: 转化成的dpf的绝对路径
|
||
"""
|
||
img_path = dir_path + '/' + filename
|
||
img_type = filename.split('.')[-1]
|
||
new_filename = os.path.basename(img_path).replace(img_type, 'pdf')
|
||
doc = fitz.open()
|
||
img_doc = fitz.open(img_path)
|
||
pdf_bytes = img_doc.convert_to_pdf()
|
||
img_pdf = fitz.open('pdf', pdf_bytes)
|
||
doc.insert_pdf(img_pdf)
|
||
res_path = dir_path + '/' + new_filename
|
||
doc.save(res_path)
|
||
return res_path, new_filename
|
||
|
||
|
||
# pdf转换为txt文字
|
||
def pdf_to_text(path):
|
||
res = ''
|
||
doc = fitz.open(path)
|
||
for page in doc:
|
||
text = page.getText()
|
||
res += text
|
||
return res
|
||
|
||
|
||
def write_task(jsontext):
|
||
with open('task.json', 'w', encoding='utf-8') as f:
|
||
f.write(jsontext)
|
||
|
||
|
||
def get_msec():
|
||
"""
|
||
获取当前日期的毫秒级时间
|
||
:return:
|
||
"""
|
||
today = datetime.date.today().strftime('%Y-%m-%d 23:59:59')
|
||
t = int(time.mktime(time.strptime(today, "%Y-%m-%d %H:%M:%S")))
|
||
return int(round(t * 1000))
|
||
|
||
|
||
def hours():
|
||
"""
|
||
获取当前时间加24小时之后转成时间戳
|
||
:return:
|
||
"""
|
||
times = str(datetime.datetime.now()).split('.')[0]
|
||
times = datetime.datetime.strptime(times, "%Y-%m-%d %H:%M:%S")
|
||
times = str(times + datetime.timedelta(hours=24))
|
||
timearray = time.strptime(times, '%Y-%m-%d %H:%M:%S')
|
||
return int(time.mktime(timearray))
|
||
|
||
|
||
def task(**kwargs):
|
||
# 执行推送
|
||
Sample.create_task(kwargs['subject'], kwargs['creator_id'], kwargs['description'], kwargs['executor_ids'])
|
||
# 执行完任务,把配置里面的任务取消掉
|
||
with open('task.json', 'r', encoding='utf-8') as f:
|
||
data = json.load(f)
|
||
# 判断任务为空
|
||
if data != {}:
|
||
data.pop(kwargs['uid'])
|
||
jsontext = json.dumps(data)
|
||
# 写数据
|
||
write_task(jsontext)
|
||
|
||
|
||
def run_task(data):
|
||
"""
|
||
重启服务
|
||
data为读取的json文件数据
|
||
"""
|
||
res = scheduler.queue
|
||
uid = [i.kwargs['uid'] for i in res] # 取所有的任务uid
|
||
for k, v in data.items():
|
||
if k not in uid: # 在现有任务里面不在json文件里面,则启动json文件里面的一个任务
|
||
now = str(time.time()).split('.')[0]
|
||
end_time = v['times'] - int(now)
|
||
# 没有过时的才会重启任务
|
||
if end_time > 0:
|
||
scheduler.enter(end_time, 1, task, kwargs=v)
|
||
t = threading.Thread(target=scheduler.run)
|
||
t.start()
|
||
|
||
|
||
def judge(**kwarg):
|
||
# 没有任务
|
||
if scheduler.empty():
|
||
with open('task.json', 'r', encoding='utf-8') as f:
|
||
ff = f.read()
|
||
if ff == '':
|
||
data = {}
|
||
else:
|
||
data = json.loads(ff)
|
||
if data == {}:
|
||
# 创建一个任务
|
||
scheduler.enter(3600, 1, task, kwargs=kwarg)
|
||
data[kwarg['uid']] = kwarg
|
||
jsontext = json.dumps(data)
|
||
else: # 重启所有服务
|
||
run_task(data)
|
||
# 再添加这次的服务
|
||
scheduler.enter(3600, 1, task, kwargs=kwarg)
|
||
data[kwarg['uid']] = kwarg
|
||
jsontext = json.dumps(data)
|
||
write_task(jsontext)
|
||
# 开启线程
|
||
t = threading.Thread(target=scheduler.run)
|
||
t.start()
|
||
else:
|
||
# 查询创建了的任务
|
||
res = scheduler.queue
|
||
uid = [i.kwargs['uid'] for i in res] # 在执行中的任务uid
|
||
if not kwarg['types']: # 换新的任务
|
||
if kwarg['uid'] in uid: # 如果新开的任务在执行的任务中
|
||
for i in res:
|
||
# 如存在同样的求职者id,取消老任务
|
||
if i.kwargs['uid'] == kwarg['uid']:
|
||
scheduler.cancel(i)
|
||
with open('task.json', 'r', encoding='utf-8') as f: # 取消json任务记录
|
||
data = json.load(f)
|
||
for k, v in data.items():
|
||
if v['uid'] == kwarg['uid']:
|
||
data[kwarg['uid']] = kwarg
|
||
scheduler.enter(3600, 1, task, kwargs=kwarg) # 新任务
|
||
data[kwarg['uid']] = kwarg
|
||
else: # 不在
|
||
with open('task.json', 'r', encoding='utf-8') as f: # 添加json任务记录
|
||
data = json.load(f)
|
||
data['uid'] = kwarg['uid']
|
||
scheduler.enter(3600, 1, task, kwargs=kwarg) # 新任务
|
||
jsontext = json.dumps(data)
|
||
write_task(jsontext)
|
||
else: # 删除任务
|
||
for i in res:
|
||
# 如存在同样的求职者id,取消老任务
|
||
if i.kwargs['uid'] == kwarg['uid']:
|
||
scheduler.cancel(i)
|
||
break
|
||
with open('task.json', 'r', encoding='utf-8') as f: # 取消json任务记录
|
||
data = json.load(f)
|
||
for k, v in data.items():
|
||
if v['uid'] == kwarg['uid']:
|
||
data.pop(v['uid'])
|
||
jsontext = json.dumps(data)
|
||
write_task(jsontext)
|
||
|
||
|
||
if __name__ == '__main__':
|
||
kwarg = {"uid": "sdfsd",
|
||
"subject": "推荐通知",
|
||
"creator_id": "创建者",
|
||
"description": "待办内容",
|
||
"executor_ids": "执行者",
|
||
"types": False,
|
||
"times": 1666231200}
|
||
judge(**kwarg)
|