prs_server/utils/func.py
2022-11-02 17:50:07 +08:00

561 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/python
# coding:utf-8
import json
import os
import random
import sched
import threading
import time
import datetime
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from math import ceil
import pandas as pd
import smtplib
from email.mime.text import MIMEText
from email.utils import formataddr
from fitz import fitz
from datetime import timedelta
from datetime import datetime as p1
import calendar
from core.config import Settings
from utils.dingding import Sample
# Module-wide sched.scheduler instance used to run the delayed
# "recommendation notice" tasks scheduled by the functions in this file.
scheduler = sched.scheduler(time.time, time.sleep)
# NOTE(review): the mail account and its password are hard-coded in source;
# consider loading them from configuration or the environment instead.
my_sender = '250213850@qq.com'  # sender mailbox account
my_pass = 'whrsugtgkstibjdj'  # sender mailbox password
subject = '入职通知'  # subject line of the mail built by send_str_mail
mail_host = 'smtp.qq.com'  # SMTP server host name
def get_uid():
    """Return a hexadecimal id derived from the current time.

    The epoch time is scaled by 10**7, a random integer in [0, 10000] is
    added, and the sum is rendered as lowercase hex without the ``0x`` prefix.
    """
    scaled_now = int(time.time() * 10 ** 7)
    jitter = random.randint(0, 10000)
    return format(scaled_now + jitter, 'x')
def get_bijiao(bijiao):
    """Map a filter comparison operator onto its inclusion keyword.

    :param bijiao: comparison operator text
    :return: "IN" for '==', 'in', 'like' and 'is not null'; 'NOT LIKE' for
             '!=', 'not like' and 'is null'; None for anything else
    """
    if bijiao in ('==', 'in', 'like', 'is not null'):
        return "IN"
    if bijiao in ('!=', 'not like', 'is null'):
        return 'NOT LIKE'
    return None
def estimate_data(data_type):
    """Return the nullable column type declaration for a data type name.

    :param data_type: type name such as 'int' or 'ip'
    :return: "Nullable(Int64)" for 'int', "Nullable(DateTime('UTC'))" for 'ip',
             "Nullable(String)" for every other value
    """
    # NOTE(review): 'ip' mapping to a DateTime column looks surprising - confirm intent.
    if data_type == 'int':
        return "Nullable(Int64)"
    if data_type == 'ip':
        return "Nullable(DateTime('UTC'))"
    return "Nullable(String)"
def dict_to_str(dic):
    """Render a dict as a JSON-like string with every key and value quoted.

    Keys and values are inserted with ``%s`` formatting and wrapped in double
    quotes; nothing is escaped, so the result is only valid JSON when neither
    keys nor values contain quotes or backslashes.

    :param dic: mapping to render
    :return: e.g. '{"a":"1","b":"2"}'; an empty string for an empty mapping
    """
    if not dic:
        # An empty mapping has always produced an empty string; keep that.
        return str()
    # Joining the pairs closes the brace correctly for a single-entry dict too
    # (the counter-based version left '{"k":"v",' without its closing brace).
    pairs = ['"%s":"%s"' % (key, value) for key, value in dic.items()]
    return "{" + ",".join(pairs) + "}"
def getEveryDay(begin_date, end_date):
    """List every date from begin_date to end_date, both ends included.

    :param begin_date: first day as 'YYYY-MM-DD'
    :param end_date: last day as 'YYYY-MM-DD'
    :return: list of 'YYYY-MM-DD' strings; empty when begin_date is after
             end_date. Example: getEveryDay('2016-01-01', '2016-01-03') gives
             ['2016-01-01', '2016-01-02', '2016-01-03'].
    """
    day_format = "%Y-%m-%d"
    first = datetime.datetime.strptime(begin_date, day_format)
    last = datetime.datetime.strptime(end_date, day_format)
    span = (last - first).days
    return [(first + datetime.timedelta(days=offset)).strftime(day_format)
            for offset in range(span + 1)]
def Download_xlsx(df, name):
    """Build a streaming download response that carries df as an .xlsx file.

    :param df: data handed to DfToStream together with name
    :param name: file name without the extension
    :return: fastapi StreamingResponse whose Content-Disposition header holds
             the URL-quoted file name
    """
    from urllib.parse import quote
    import mimetypes
    from utils import DfToStream
    from fastapi.responses import StreamingResponse

    quoted_name = quote(f'{name}.xlsx')
    # The media type is guessed from the ".xlsx" extension of the quoted name.
    media_type = mimetypes.guess_type(quoted_name)[0]
    with DfToStream((df, name)) as stream_writer:
        payload = stream_writer.to_stream()
    disposition = {'Content-Disposition': f'filename="{quoted_name}"'}
    return StreamingResponse(payload, media_type=media_type, headers=disposition)
def jiange_insert(list_date):
    """Insert a '-' placeholder after every element of list_date, in place.

    :param list_date: list to modify
    :return: the same list object, e.g. ['a', 'b'] becomes ['a', '-', 'b', '-']
    """
    interleaved = []
    for item in list_date:
        interleaved.extend((item, '-'))
    # Slice assignment keeps the caller's list object and only swaps its content.
    list_date[:] = interleaved
    return list_date
def create_df(resp):
    """Build the DataFrame for the external download of the distribution analysis.

    Every key of resp['list'] yields two consecutive rows taken from its
    '总体' entry: one labelled with the key that holds 'total' and the 'n'
    values, followed by a '-' row that holds the 'p' values
    (presumably counts and percentages - confirm with the producer of resp).

    :param resp: dict with 'label' (list of column names) and 'list'
                 (mapping of key -> {'总体': {'total': ..., 'n': [...], 'p': [...]}})
    :return: DataFrame with columns '事件发生时间', '总人数' followed by the labels
    :raises IndexError: when 'n' or 'p' holds fewer values than there are labels
    """
    # Work on a copy: inserting the two header names into resp['label'] itself
    # would leak them to the caller and pile up on every further call.
    labels = list(resp['label'])
    rows = []
    for day, groups in resp['list'].items():
        overall = groups['总体']
        count_row = {'事件发生时间': day, '总人数': overall['total']}
        share_row = {'事件发生时间': '-', '总人数': '-'}
        for position, label in enumerate(labels):
            count_row[label] = overall['n'][position]
            share_row[label] = overall['p'][position]
        rows.append(count_row)
        rows.append(share_row)
    return pd.DataFrame(data=rows, columns=['事件发生时间', '总人数'] + labels)
def create_neidf(resp, columnName):
    """Build the DataFrame for the internal download of the distribution analysis.

    Every key of resp['list'] yields two consecutive rows: one labelled with
    the key that holds 'total' and the 'n' values, followed by a '-' row that
    holds the 'p' values (presumably counts and percentages - confirm with
    the producer of resp).

    :param resp: dict with 'label' (list of column names) and 'list'
                 (mapping of key -> {'total': ..., 'n': [...], 'p': [...]})
    :param columnName: header of the first column, which holds the keys
    :return: DataFrame with columns columnName, '全部用户数' followed by the labels
    :raises IndexError: when 'n' or 'p' holds fewer values than there are labels
    """
    # Work on a copy: inserting the two header names into resp['label'] itself
    # would leak them to the caller and pile up on every further call.
    labels = list(resp['label'])
    rows = []
    for key, stats in resp['list'].items():
        count_row = {columnName: key, '全部用户数': stats['total']}
        share_row = {columnName: '-', '全部用户数': '-'}
        for position, label in enumerate(labels):
            count_row[label] = stats['n'][position]
            share_row[label] = stats['p'][position]
        rows.append(count_row)
        rows.append(share_row)
    return pd.DataFrame(data=rows, columns=[columnName, '全部用户数'] + labels)
def random_hex():
    """Generate a random 16-digit uppercase hexadecimal string.

    :return: 16 characters from 0-9A-F, left-padded with zeros
    """
    # randrange excludes its upper bound, so the value always fits in 16 hex
    # digits (an inclusive bound of 16 ** 16 could yield a 17-digit result).
    return format(random.randrange(16 ** 16), '016X')
def get_time(fmt: str = '%Y-%m-%d') -> str:
    """Return the current local time formatted with ``fmt``.

    :param fmt: strftime format string
    :return: formatted local time
    """
    return time.strftime(fmt, time.localtime(time.time()))
def import_excel(data, columns, name):
    """Write data to an .xlsx file (despite the name, this exports).

    :param data: rows to export
    :param columns: column names of the sheet, e.g. ['a', 'b', 'c']
    :param name: target file name without the '.xlsx' extension
    :return: None
    """
    frame = pd.DataFrame(data, columns=columns)
    target = f'{name}.xlsx'
    frame.to_excel(target, index=False, header=True)
def qujian_time(start_time, end_time):
    """Join two timestamps into a single interval label.

    :param start_time: e.g. '2022-07-01 10:00:00'
    :param end_time: e.g. '2022-07-01 10:30:00'
    :return: start_time followed by '~' and the part of end_time after its
             last space, e.g. '2022-07-01 10:00:00~10:30:00'
    """
    clock_part = str(end_time).rpartition(' ')[2]
    return '~'.join((str(start_time), clock_part))
def send_str_mail(str_msg, my_user):
    """Send a plain-text mail from the module's sender account.

    :param str_msg: body text of the mail
    :param my_user: mailbox address of the recipient
    :return: None; smtplib errors propagate to the caller
    """
    msg = MIMEText(str_msg, 'plain', 'utf-8')
    msg['From'] = formataddr(["乐谷游戏人事", my_sender])  # display name, sender address
    msg['To'] = formataddr(["FK", my_user])  # display name, recipient address
    msg['Subject'] = subject
    # SMTP over SSL on port 465. The context manager sends QUIT and closes the
    # connection even when login or delivery raises.
    with smtplib.SMTP_SSL(mail_host, 465) as server:
        server.login(my_sender, my_pass)
        server.sendmail(my_sender, [my_user], msg.as_string())
def send_affix_mail(str_msg: str, send_to: list, file_name: str = None):
    """Send a mail with a text body and an optional attachment.

    :param str_msg: body text of the mail
    :param send_to: list of recipient mailbox addresses
    :param file_name: absolute path of the file to attach; None sends text only
    :return: None; I/O and smtplib errors propagate to the caller
    """
    msg = MIMEMultipart()
    msg["Subject"] = "面试通知"
    msg["From"] = my_sender
    msg["To"] = ','.join(send_to)
    # --- text part ---
    msg.attach(MIMEText(str_msg))
    # --- attachment part ---
    if file_name is not None:
        # Read through a context manager so the file handle is always released.
        with open(file_name, 'rb') as attachment_file:
            part = MIMEApplication(attachment_file.read())
        part.add_header('Content-Disposition', 'attachment',
                        filename=os.path.basename(file_name))
        msg.attach(part)
    # NOTE(review): the third positional argument of SMTP_SSL is local_hostname,
    # so 'utf-8' is sent as the HELO/EHLO name - probably unintended, confirm
    # before removing.
    with smtplib.SMTP_SSL(mail_host, 465, 'utf-8') as smpt:
        smpt.login(my_sender, my_pass)
        smpt.sendmail(my_sender, send_to, msg.as_string())
def get_week(date_str=None):
    """Return the first and last second of the week that contains a date.

    Weeks run from Sunday 00:00:00 to Saturday 23:59:59, which is what this
    function has always returned for string input.

    :param date_str: day as 'YYYY-MM-DD'; any other value (None, empty string,
                     non-string) means today
    :return: tuple (week_start_time, week_end_time) of datetime objects
    """
    if date_str and isinstance(date_str, str):
        day = p1.strptime(date_str + " 00:00:00", "%Y-%m-%d %H:%M:%S")
    else:
        # Already a datetime: it must not be parsed again (doing so raised
        # TypeError, so calling without a date never worked).
        day = p1.now().replace(hour=0, minute=0, second=0, microsecond=0)
    # weekday() is 0 for Monday ... 6 for Sunday; shift so that Sunday maps to 0.
    days_since_sunday = (day.weekday() + 1) % 7
    week_start_time = day - timedelta(days=days_since_sunday)
    week_end_time = week_start_time + timedelta(days=6, hours=23, minutes=59, seconds=59)
    return week_start_time, week_end_time
def strptime(date_string):
    """Parse a 'YYYY-MM-DD HH:MM:SS' string into a datetime.datetime.

    :param date_string: e.g. '2022-05-29 23:59:59'
    :return: datetime.datetime(2022, 5, 29, 23, 59, 59) for the example above
    """
    timestamp_format = '%Y-%m-%d %H:%M:%S'
    return datetime.datetime.strptime(date_string, timestamp_format)
def strptime1(date_str):
    """Parse a 'YYYY-MM-DD' string into a datetime.datetime at midnight.

    :param date_str: e.g. '2022-05-29'
    :return: datetime.datetime(2022, 5, 29, 0, 0) for the example above
    """
    midnight_stamp = date_str + " 00:00:00"
    return datetime.datetime.strptime(midnight_stamp, "%Y-%m-%d %H:%M:%S")
def start_end_month(time):
    """Return the first and last second of the month that contains a day.

    :param time: day as 'YYYY-MM-DD', e.g. '2022-05-29'
    :return: tuple (first day of the month at 00:00:00,
             last day of the month at 23:59:59) as datetime objects
    """
    parsed = p1.strptime(time + " 00:00:00", "%Y-%m-%d %H:%M:%S")
    # monthrange returns (weekday of day 1, number of days in the month).
    last_day = calendar.monthrange(parsed.year, parsed.month)[1]
    month_start = parsed.replace(day=1)
    month_end = parsed.replace(day=last_day, hour=23, minute=59, second=59)
    return month_start, month_end
def get_every_days(sdate, edate):
    """List every day between two dates, both ends included.

    :param sdate: first day as 'YYYY-MM-DD'
    :param edate: last day as 'YYYY-MM-DD'
    :return: list of 'YYYY-MM-DD' strings; empty when sdate is after edate
    """
    cursor = datetime.datetime.strptime(sdate, '%Y-%m-%d')
    last = datetime.datetime.strptime(edate, '%Y-%m-%d')
    one_day = timedelta(days=1)
    days = []
    while cursor <= last:
        days.append(cursor.strftime('%Y-%m-%d'))
        cursor += one_day
    return days
def get_every_months(sdate, edate):
    """List every month between two dates, both ends included.

    :param sdate: start day as 'YYYY-MM-DD'
    :param edate: end day as 'YYYY-MM-DD'
    :return: list of 'YYYY-MM' strings; empty when the month of sdate is
             after the month of edate
    """
    start_date = datetime.datetime.strptime(sdate, '%Y-%m-%d')
    end_date = datetime.datetime.strptime(edate, '%Y-%m-%d')
    # Number each month as year * 12 + (month - 1) so ranges that cross a year
    # boundary are counted correctly (the year difference used to be taken as
    # start - end, which emptied or truncated such ranges).
    first_index = start_date.year * 12 + start_date.month - 1
    last_index = end_date.year * 12 + end_date.month - 1
    return ['%04d-%02d' % (index // 12, index % 12 + 1)
            for index in range(first_index, last_index + 1)]
def get_every_weeks(sdate, edate):
    """List the Sunday dates of the weeks spanned by two dates.

    The first entry is the Sunday on or after sdate; ceil(days / 7) further
    Sundays follow, where days is the distance between the two dates.

    :param sdate: start day as 'YYYY-MM-DD'
    :param edate: end day as 'YYYY-MM-DD'
    :return: list of 'YYYY-MM-DD' strings
    """
    first = datetime.datetime.strptime(sdate, '%Y-%m-%d')
    last = datetime.datetime.strptime(edate, '%Y-%m-%d')
    week_count = int(ceil((last - first).days / 7))
    # Sunday that closes the week of the start date.
    first_sunday = get_current_week(first)
    return [(first_sunday + timedelta(days=7 * offset)).strftime('%Y-%m-%d')
            for offset in range(week_count + 1)]
def get_current_week(s_date):
    """Return the Sunday on or after s_date.

    :param s_date: date or datetime; any time-of-day part is kept
    :return: s_date moved forward to the next Sunday (unchanged when it
             already falls on a Sunday)
    """
    # weekday() is 0 for Monday ... 6 for Sunday.
    days_to_sunday = 6 - s_date.weekday()
    return s_date + datetime.timedelta(days=days_to_sunday)
from win32com import client
def doc2pdf(fn, path_data, filename):
    """Convert a doc/docx file to PDF through the Word COM application.

    :param fn: full path of the source document, e.g. 'C:/Users/me/Desktop/resume.docx'
    :param path_data: directory that receives the PDF, e.g. 'C:/Users/me/Desktop'
    :param filename: file name of the source document, e.g. 'resume.docx'
    :return: tuple (path of the PDF, file name of the PDF)
    """
    # splitext drops only the final extension, so 'cv.v2.docx' becomes
    # 'cv.v2.pdf' instead of being cut at the first dot.
    new_filename = os.path.splitext(filename)[0]
    pdf_name = new_filename + '.pdf'
    word = client.Dispatch("Word.Application")  # start the Word application
    try:
        doc = word.Documents.Open(fn)
        try:
            # File format 17 makes Word save the document as PDF.
            doc.SaveAs("{}/{}.pdf".format(path_data, new_filename), 17)
        finally:
            doc.Close()
    finally:
        # Always shut Word down so a failed conversion leaves no process behind.
        word.Quit()
    return path_data + '/' + pdf_name, pdf_name
def png2pdf(dir_path, filename):
    """Convert an image file into a single PDF stored in the same directory.

    :param dir_path: directory that contains the image
    :param filename: file name of the image
    :return: tuple (path of the PDF, file name of the PDF)
    """
    img_path = dir_path + '/' + filename
    # Swap only the trailing extension; a plain str.replace would also rewrite
    # the extension text wherever else it occurs in the name
    # ('png_scan.png' became 'pdf_scan.pdf').
    new_filename = os.path.splitext(os.path.basename(img_path))[0] + '.pdf'
    res_path = dir_path + '/' + new_filename
    img_doc = fitz.open(img_path)
    try:
        pdf_bytes = img_doc.convert_to_pdf()
    finally:
        img_doc.close()
    img_pdf = fitz.open('pdf', pdf_bytes)
    doc = fitz.open()  # new, empty PDF that receives the converted page(s)
    try:
        doc.insert_pdf(img_pdf)
        doc.save(res_path)
    finally:
        doc.close()
        img_pdf.close()
    return res_path, new_filename
def pdf_to_text(path):
    """Extract the text of every page of a PDF.

    :param path: path of the PDF file
    :return: text of all pages concatenated in page order
    """
    doc = fitz.open(path)
    try:
        texts = []
        for page in doc:
            # Newer PyMuPDF releases name the method get_text; older ones only
            # provide getText. Use whichever this installation offers.
            extract = getattr(page, 'get_text', None) or page.getText
            texts.append(extract())
        return ''.join(texts)
    finally:
        # Release the file handle even when extraction fails.
        doc.close()
def write_task(jsontext):
    """Overwrite task.json (relative to the working directory) with jsontext.

    :param jsontext: text to store, written as UTF-8
    """
    with open('task.json', mode='w', encoding='utf-8') as task_file:
        task_file.write(jsontext)
def get_msec():
    """Return today's 23:59:59 local time as an epoch timestamp in milliseconds.

    :return: int, always a multiple of 1000
    """
    end_of_day = datetime.datetime.combine(datetime.date.today(), datetime.time(23, 59, 59))
    seconds = int(time.mktime(end_of_day.timetuple()))
    return seconds * 1000
def hours():
    """Return the epoch timestamp of the local time 24 hours from now.

    :return: int seconds; the fractional part of the current time is dropped
    """
    tomorrow = datetime.datetime.now().replace(microsecond=0) + datetime.timedelta(hours=24)
    return int(time.mktime(tomorrow.timetuple()))
def task(**kwargs):
    """Scheduler action: push one notice, then drop its record from task.json.

    :param kwargs: must contain 'subject', 'creator_id', 'description',
                   'executor_ids' (forwarded to Sample.create_task) and 'uid'
                   (key of the persisted record)
    """
    # Send the push.
    Sample.create_task(kwargs['subject'], kwargs['creator_id'], kwargs['description'], kwargs['executor_ids'])
    # The task has run, so remove it from the persisted configuration.
    with open('task.json', 'r', encoding='utf-8') as f:
        raw = f.read()
    # An empty file counts as "no records" instead of failing to parse.
    data = json.loads(raw) if raw.strip() else {}
    # The record may already be gone; only rewrite the file when something was
    # removed, and only after the read handle has been closed.
    if kwargs['uid'] in data:
        del data[kwargs['uid']]
        write_task(json.dumps(data))
def run_task(data):
    """Re-schedule persisted tasks that are not in the scheduler queue yet.

    :param data: content of task.json, a mapping of uid -> task kwargs; each
                 record's 'times' is read as the epoch second at which the
                 task is due
    """
    queued_uids = {event.kwargs['uid'] for event in scheduler.queue}
    now = int(time.time())
    restored = False
    for uid, record in data.items():
        # Skip tasks that are already queued, and entries that are not task
        # records at all (they cannot be scheduled).
        if uid in queued_uids or not isinstance(record, dict):
            continue
        delay = record['times'] - now
        # Only tasks that are still due in the future are restored.
        if delay > 0:
            scheduler.enter(delay, 1, task, kwargs=record)
            restored = True
    if restored:
        # One runner thread drains the whole queue; there is no need to start
        # one per restored task.
        threading.Thread(target=scheduler.run).start()
def _load_task_records():
    """Read task.json; a missing or empty file counts as no records."""
    try:
        with open('task.json', 'r', encoding='utf-8') as f:
            raw = f.read()
    except FileNotFoundError:
        return {}
    return json.loads(raw) if raw.strip() else {}


def judge(**kwarg):
    """Create, replace or delete the delayed notice task of kwarg['uid'].

    A falsy 'types' schedules the task to run in 3600 seconds, replacing any
    queued task with the same uid; a truthy 'types' deletes it. task.json is
    updated to match. When the scheduler queue is empty, the other records
    still stored in task.json are restored through run_task first and a
    runner thread is started.

    :param kwarg: task description; 'uid' is required, 'types' selects
                  replace (falsy or absent) or delete (truthy); the whole dict
                  is stored in task.json and later passed to task()
    """
    uid = kwarg['uid']
    deleting = bool(kwarg.get('types'))
    data = _load_task_records()
    scheduler_was_idle = scheduler.empty()

    if scheduler_was_idle:
        # Nothing is queued: bring back the other persisted tasks. The record
        # of this uid is left out because this request supersedes it, and
        # entries that are not task records cannot be scheduled.
        others = {key: record for key, record in data.items()
                  if key != uid and isinstance(record, dict)}
        if others:
            run_task(others)

    # Queued tasks of the same uid are superseded by this request.
    stale_events = [event for event in scheduler.queue if event.kwargs['uid'] == uid]

    if deleting:
        # pop() with a default: removing an entry while looping over
        # data.items() raises RuntimeError.
        data.pop(uid, None)
    else:
        # Queue the new task before cancelling the old one so the queue never
        # runs empty in between.
        scheduler.enter(3600, 1, task, kwargs=kwarg)
        # Records are keyed by uid (not by the literal key 'uid').
        data[uid] = kwarg
    for event in stale_events:
        scheduler.cancel(event)

    write_task(json.dumps(data))

    if scheduler_was_idle and not scheduler.empty():
        # The queue was empty on entry, so start a runner thread for it.
        threading.Thread(target=scheduler.run).start()
if __name__ == '__main__':
    # Manual check: hand one sample notice request to judge().
    judge(
        uid="sdfsd",
        subject="推荐通知",
        creator_id="创建者",
        description="待办内容",
        executor_ids="执行者",
        types=False,
        times=1666231200,
    )