#!/usr/bin/python # coding:utf-8 import os import random import time import datetime from math import ceil import pandas as pd import smtplib from email.mime.text import MIMEText from email.utils import formataddr from fitz import fitz from datetime import timedelta from datetime import datetime as p1 import calendar from core.config import Settings def get_uid(): return hex(int(time.time() * 10 ** 7) + random.randint(0, 10000))[2:] # 获取筛选条件的包含关系 def get_bijiao(bijiao): if bijiao == '==' or bijiao == 'in' or bijiao == 'like' or bijiao == 'is not null': return "IN" elif bijiao == '!=' or bijiao == 'not like' or bijiao == 'is null': return 'NOT LIKE' # 判断传入的数据类型 def estimate_data(data_type): if data_type == 'int': return "Nullable(Int64)" elif data_type == 'ip': return "Nullable(DateTime('UTC'))" else: return "Nullable(String)" # 将字典变成字符串 def dict_to_str(dic): c = str() b = 0 for k, v in dic.items(): b += 1 if b == 1: c += "{\"%s\":\"%s\"," % (k, v) elif b != len(dic): c += "\"%s\":\"%s\"," % (k, v) else: c += "\"%s\":\"%s\"}" % (k, v) return c def getEveryDay(begin_date, end_date): # 前闭后闭 date_list = [] begin_date = datetime.datetime.strptime(begin_date, "%Y-%m-%d") end_date = datetime.datetime.strptime(end_date, "%Y-%m-%d") while begin_date <= end_date: date_str = begin_date.strftime("%Y-%m-%d") date_list.append(date_str) begin_date += datetime.timedelta(days=1) return date_list # print(getEveryDay('2016-01-01','2017-05-11')) def Download_xlsx(df, name): """ 下载功能 name为文件名 """ from urllib.parse import quote import mimetypes from utils import DfToStream from fastapi.responses import StreamingResponse file_name = quote(f'{name}.xlsx') mime = mimetypes.guess_type(file_name)[0] df_to_stream = DfToStream((df, name)) with df_to_stream as d: export = d.to_stream() Download = StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'}) return Download def jiange_insert(list_date): """ 间隔1条插入一条数据插入数据 :param day: list数据 :return: list """ i = 1 while i <= len(list_date): list_date.insert(i, '-') i += 2 return list_date def create_df(resp): """ 分布分析外部下载功能的df数据 """ columns = resp['label'] day = list(resp['list'].keys()) jiange_insert(day) date = [] day_nu = 0 for i in day: if i == '-': av = day[day_nu - 1] day_date = resp['list'][av]['总体'] else: day_date = resp['list'][i]['总体'] date_dict = {} n = 0 p = 0 if i == '-': date_dict['事件发生时间'] = '-' date_dict['总人数'] = '-' for nu in range(len(columns)): date_dict[columns[nu]] = day_date['p'][p] p += 1 date.append(date_dict) else: date_dict['事件发生时间'] = i date_dict['总人数'] = day_date['total'] for nu in range(len(columns)): date_dict[columns[nu]] = day_date['n'][n] n += 1 date.append(date_dict) day_nu += 1 columns.insert(0, '总人数') columns.insert(0, '事件发生时间') df = pd.DataFrame(data=date, columns=columns) return df def create_neidf(resp, columnName): """ 分布分析内部下载功能的df数据 """ columns = resp['label'] day = list(resp['list'].keys()) jiange_insert(day) date = [] day_nu = 0 for i in day: if i == '-': av = day[day_nu - 1] day_date = resp['list'][av] else: day_date = resp['list'][i] date_dict = {} n = 0 p = 0 if i == '-': date_dict[columnName] = '-' date_dict['全部用户数'] = '-' for nu in range(len(columns)): date_dict[columns[nu]] = day_date['p'][p] p += 1 date.append(date_dict) else: date_dict[columnName] = i date_dict['全部用户数'] = day_date['total'] for nu in range(len(columns)): date_dict[columns[nu]] = day_date['n'][n] n += 1 date.append(date_dict) day_nu += 1 columns.insert(0, '全部用户数') columns.insert(0, columnName) df = pd.DataFrame(data=date, columns=columns) return df def random_hex(): """ 生成16位随机数 :return: 随机数 """ result = hex(random.randint(0, 16 ** 16)).replace('0x', '').upper() if (len(result) < 16): result = '0' * (16 - len(result)) + result return result def get_time(fmt: str = '%Y-%m-%d') -> str: ''' 获取当前时间 ''' ts = time.time() ta = time.localtime(ts) t = time.strftime(fmt, ta) return t def import_excel(data, columns, name): """ 导出数据到xlsx表里面 :param data: 需要导出的数据 :param columns: df的表名 例:['a','b','c'] :param name: 文件名。例:'随机数' :return: """ zh = pd.DataFrame(data, columns=columns) zh.to_excel(f'{name}.xlsx', index=False, header=True) def qujian_time(start_time, end_time): """ 把两个时间变成区间 :param start_time: '2022-07-01 10:00:00' :param end_time: '2022-07-01 10:30:00' :return: '2022-07-01 10:00:00~10:30:00' """ timess = str(end_time).split(' ')[-1] return str(start_time) + '~' + timess # 发送的文本邮件 def send_str_mail(str_msg, my_user): """ 发送的文本邮件 :param str_msg: 发送的邮件内容 :param my_user: 接收者的邮箱 :return: bool """ msg = MIMEText(str_msg, 'plain', 'utf-8') msg['From'] = formataddr(["乐谷游戏人事", Settings.my_sender]) # 括号里的对应发件人邮箱昵称,发件人邮箱账号 msg['To'] = formataddr(["FK", my_user]) # 括号里的对应收件人邮箱昵称,收件人邮箱账号 msg['Subject'] = Settings.subject # 邮件的主题,也可以说是标题 server = smtplib.SMTP_SSL("smtp.qq.com", 465) # 发件人邮箱中的SMTP服务器,端口是25 server.login(Settings.my_sender, Settings.my_pass) # 括号中对应的是发件人邮箱账号,邮箱密码 # server.sendmail(my_sender, [my_user, ], msg.as_string()) # 括号中对应的是发件人邮箱账号,收件人邮箱账号,发送邮件 server.sendmail(Settings.my_sender, [my_user, ], msg.as_string()) # 括号中对应的是发件人邮箱账号,收件人邮箱账号,发送邮件 server.quit() # 关闭连接 def get_week(date_str=None): if date_str and isinstance(date_str, str): now_time = (p1.strptime(date_str + " 00:00:00", "%Y-%m-%d %H:%M:%S") + datetime.timedelta(days=1)).strftime( "%Y-%m-%d %H:%M:%S") else: now_time = p1.now().replace(hour=0, minute=0, second=0, microsecond=0) now_time = strptime(now_time) # 当前日期所在周的周一 week_start_time = now_time - timedelta(days=now_time.weekday() + 1, hours=now_time.hour, minutes=now_time.minute, seconds=now_time.second) # 当前日期所在周的周日 week_end_time = week_start_time + timedelta(days=6, hours=23, minutes=59, seconds=59) return week_start_time, week_end_time def strptime(date_string): """ 将字符串转换成datetime.datetime类型 :param date_string: '2022-05-29 23:59:59' :return: 2022-05-29 23:59:59 """ return p1.strptime(date_string, '%Y-%m-%d %H:%M:%S') def strptime1(date_str): """ 将字符串转换成datetime.datetime类型 :param date_string: '2022-05-29' :return: 2022-05-29 00:00:00 """ return p1.strptime(date_str + " 00:00:00", "%Y-%m-%d %H:%M:%S") def start_end_month(time): """ 获取某个月的起始时间和结束时间 :param time: '2022-05-29' :return: """ now = p1.strptime(time + " 00:00:00", "%Y-%m-%d %H:%M:%S") this_month_start = datetime.datetime(now.year, now.month, 1) this_month_end = datetime.datetime(now.year, now.month, calendar.monthrange(now.year, now.month)[1]) this_month_end1 = this_month_end + timedelta(hours=23, minutes=59, seconds=59) return this_month_start, this_month_end1 # 获取两个日期之间的所有天数 def get_every_days(sdate, edate): days = [] start_date = datetime.datetime.strptime(sdate, '%Y-%m-%d') end_date = datetime.datetime.strptime(edate, '%Y-%m-%d') different_day = end_date - start_date day_num = different_day.days for i in range(day_num + 1): day = start_date + timedelta(days=i) true_day = datetime.datetime.strftime(day, '%Y-%m-%d') days.append(true_day) return days # 获取两个日期之间的所有月份 def get_every_months(sdate, edate): start_date = datetime.datetime.strptime(sdate, '%Y-%m-%d') end_date = datetime.datetime.strptime(edate, '%Y-%m-%d') months = (start_date.year - end_date.year) * 12 + end_date.month - start_date.month month_range = ['%04d-%02d' % (int(start_date.year + mon // 12), int(mon % 12 + 1)) for mon in range(start_date.month - 1, start_date.month + months)] return month_range # 获取两个日期之间所有的周日日期 def get_every_weeks(sdate, edate): start_date = datetime.datetime.strptime(sdate, '%Y-%m-%d') end_date = datetime.datetime.strptime(edate, '%Y-%m-%d') different_day = end_date - start_date day_num = different_day.days weeks = int(ceil(day_num / 7)) week_days = [] # 当前周的周末的日期 start_day = get_current_week(start_date) for i in range(weeks + 1): day = start_day + timedelta(days=i * 7) true_day = datetime.datetime.strftime(day, '%Y-%m-%d') week_days.append(true_day) return week_days # 获取当前日期的周日日期 def get_current_week(s_date): sunday = s_date one_day = datetime.timedelta(days=1) while sunday.weekday() != 6: sunday += one_day return sunday from win32com import client # 转换doc/docx为pdf def doc2pdf(fn, path_data, filename): # """ # :param fn: 'C:\Users\86173\Desktop\陈超峰.docx' # :param path_data:'C:\Users\86173\Desktop\\' # :param filename:陈超峰.docx # :return: 'C:\Users\86173\Desktop\陈超峰.pdf' # """ new_filename = filename.split('.')[0] word = client.Dispatch("Word.Application") # 打开word应用程序 doc = word.Documents.Open(fn) # 打开word文件 doc.SaveAs("{}/{}.pdf".format(path_data, new_filename), 17) # 另存为后缀为".pdf"的文件,其中参数17表示为pdf doc.Close() # 关闭原来word文件 word.Quit() return path_data + '/' + new_filename + '.pdf', new_filename + '.pdf' # 图片转pdf def png2pdf(dir_path, filename): """ :param dir_path: 图片所在目录路径 :param filename: 图片文件名 :return: 转化成的dpf的绝对路径 """ img_path = dir_path + '/' + filename img_type = filename.split('.')[-1] new_filename = os.path.basename(img_path).replace(img_type, 'pdf') doc = fitz.open() img_doc = fitz.open(img_path) pdf_bytes = img_doc.convert_to_pdf() img_pdf = fitz.open('pdf', pdf_bytes) doc.insert_pdf(img_pdf) res_path = dir_path + '/' + new_filename doc.save(res_path) return res_path, new_filename if __name__ == '__main__': pass # fn=r'C:\Users\Administrator\Desktop\面试简历1\智联招聘_张双琪_Web开发工程师_中文.doc' # path_data=r'C:\Users\Administrator\Desktop\面试简历1\\' # filename='智联招聘_张双琪_Web开发工程师_中文.doc' # doc2pdf(fn, path_data, filename)